__all__ = [
    # Names exported by a star-import of this module.
    # misc riptable utility funcs
    # public .sds methods
    "load_sds",
    "save_sds",
    "load_sds_mem",
    "sds_tree",
    "sds_info",
    "sds_flatten",
    "sds_concat",
    # private .sds methods
    # NOTE(review): exported even though labelled private, presumably for use by
    # other modules of the package -- confirm before removing any of them.
    "save_struct",
    "container_from_filetype",
    "compress_dataset_internal",
    "decompress_dataset_internal",
    # debugging: toggles for the module-level SDSMakeDirs / SDSVerbose / SDSRebuildRoot flags
    "SDSMakeDirsOn",
    "SDSMakeDirsOff",
    "SDSVerboseOn",
    "SDSVerboseOff",
    "SDSRebuildRootOn",
    "SDSRebuildRootOff",
]
import os
import shutil
# from pathlib import Path
import sys
import warnings
from typing import (
TYPE_CHECKING,
Any,
AnyStr,
Callable,
List,
Optional,
Sequence,
Tuple,
Union,
)
import numpy as np
import riptide_cpp as rc
from .rt_enum import (
INVALID_DICT,
INVALID_FILE_CHARS,
SDS_EXTENSION,
CategoryMode,
CompressionMode,
CompressionType,
SDSFileType,
SDSFlag,
TypeRegister,
)
from .rt_grouping import merge_cats
from .rt_numpy import arange, empty, ismember, zeros
from .rt_timers import utcnow
from .rt_utils import h5io_to_struct
from .Utils.rt_metadata import MetaData
# import logging
if TYPE_CHECKING:
from .rt_dataset import Dataset
from .rt_datetime import TimeSpan
from .rt_struct import Struct
AnyPath = Union[AnyStr, os.PathLike]
"""Type annotation for types accepted for use as filesystem paths."""
# TODO: Replace these two COMPRESSION_TYPE_ constants with the CompressionType enum?
COMPRESSION_TYPE_NONE: int = 0
"""Designator for not using compression when saving data to SDS file."""
COMPRESSION_TYPE_ZSTD: int = 1
"""Designator for using ZSTD compression when saving data to SDS file."""
# TODO: Consider a global mode variable to be used when a directory is created
SDSMakeDirs = True
"""
When ``SDSMakeDirs`` is set to ``True``, ``rt_sds`` will call ``os.makedirs`` to make one or more subdirectories,
otherwise ``rt_sds`` will create a directory one level deep when there is nesting.
"""
SDSVerbose = False
"""
If enabled, the SDS module will include verbose logging.
"""
SDSRebuildRoot = False
"""
If enabled, SDS will rebuild the root SDS file, ``_root.sds``, in the event that a dataset is saved to an existing
directory that was part of a previous ``Struct`` save.
"""
# -----------------------------------------------------------------------------------------
[docs]
def SDSMakeDirsOn() -> None:
    """
    Set the module-level ``SDSMakeDirs`` flag to ``True``.

    See Also
    --------
    SDSMakeDirsOff : Sets the flag back to ``False``.
    """
    global SDSMakeDirs
    SDSMakeDirs = True
[docs]
def SDSMakeDirsOff() -> None:
    """
    Set the module-level ``SDSMakeDirs`` flag to ``False``.

    See Also
    --------
    SDSMakeDirsOn : Sets the flag to ``True``.
    """
    global SDSMakeDirs
    SDSMakeDirs = False
# -----------------------------------------------------------------------------------------
[docs]
def SDSVerboseOn() -> None:
    """
    Set the module-level ``SDSVerbose`` flag to ``True``.

    See Also
    --------
    SDSVerboseOff : Sets the flag back to ``False``.
    """
    global SDSVerbose
    SDSVerbose = True
[docs]
def SDSVerboseOff() -> None:
    """
    Set the module-level ``SDSVerbose`` flag to ``False``.

    See Also
    --------
    SDSVerboseOn : Sets the flag to ``True``.
    """
    global SDSVerbose
    SDSVerbose = False
def VerbosePrint(s: str, time: bool = True) -> Optional["TimeSpan"]:
    """
    Print the message `s` with ``print``, optionally prefixed by the current time.

    Parameters
    ----------
    s : str
        Message to print.
    time : bool
        When true (the default), the value of ``utcnow().hour_span`` is printed in
        front of the message and returned to the caller.

    Returns
    -------
    TimeSpan, optional
        The timestamp that was printed, or ``None`` when `time` is false.

    See Also
    --------
    VerbosePrintElapsed : Prints a message along with the elapsed time.
    """
    # Plain message: nothing to time-stamp, nothing to hand back.
    if not time:
        print(s)
        return None

    stamp = utcnow().hour_span
    print(f"{stamp} " + s)
    return stamp
def VerbosePrintElapsed(s: str, start: "TimeSpan") -> None:
    """
    Print the message `s` together with the time elapsed since `start`.

    Parameters
    ----------
    s : str
        Message to print.
    start : TimeSpan
        Start time; it is subtracted from the current ``utcnow().hour_span`` to
        obtain the elapsed span.

    See Also
    --------
    VerbosePrint : Prints a message and an optional timestamp.
    """
    now = utcnow().hour_span
    # ``seconds`` is indexed, so only its first element is reported.
    elapsed_seconds = (now - start).seconds[0]
    print(f"{now} " + s + f" elapsed time: {elapsed_seconds} seconds")
# -----------------------------------------------------------------------------------------
[docs]
def SDSRebuildRootOn() -> None:
    """
    Set the module-level ``SDSRebuildRoot`` flag to ``True``.

    The previous value of the flag is printed before it is changed.
    """
    global SDSRebuildRoot
    previous = SDSRebuildRoot
    print(f"Setting SDSRebuildRoot to True. Currently set to {previous}.")
    SDSRebuildRoot = True
[docs]
def SDSRebuildRootOff() -> None:
    """
    Set the module-level ``SDSRebuildRoot`` flag to ``False``.

    The previous value of the flag is printed before it is changed.
    """
    global SDSRebuildRoot
    previous = SDSRebuildRoot
    print(f"Setting SDSRebuildRoot to False. Currently set to {previous}.")
    SDSRebuildRoot = False
def _anypath_to_bytes(s: AnyPath) -> bytes:
"""Normalize `str`, `bytes`, or `os.PathLike` to a `bytes`."""
return os.fsencode(s)
def _anypath_to_str(s: AnyPath) -> str:
"""Normalize `str`, `bytes`, or `os.PathLike` to a `str`."""
return os.fsdecode(s)
# -----------------------------------------------------------------------------------------
def sds_os(func: Callable, path: AnyPath) -> Any:
    """
    Call `func` on `path`, logging the call and its duration when SDS verbose mode is on.

    Used as a thin wrapper around ``os`` / ``os.path`` functions so that every
    filesystem query made by this module can be traced.

    Parameters
    ----------
    func : callable
        Function to call; it must accept a single path-like argument.
    path : str or bytes or os.PathLike
        The pathname that `func` operates on.

    Returns
    -------
    Any
        Whatever `func` returns.

    See Also
    --------
    SDSVerboseOn : Enables SDS Verbose mode.
    SDSVerboseOff : Disables SDS Verbose mode.
    VerbosePrintElapsed : Prints a message along with the elapsed time.
    VerbosePrint : Prints a message and an optional timestamp.
    """
    # Read the flag once. If it were re-read after the call and had been switched
    # on in the meantime, `start` would be unbound when reporting the elapsed time.
    verbose = SDSVerbose
    if verbose:
        start = VerbosePrint(f"calling {func.__name__} on {path}")
    result = func(path)
    if verbose:
        VerbosePrintElapsed(f"finished {func.__name__}", start)
    return result
def sds_isdir(path: AnyPath) -> bool:
    """
    Report whether `path` refers to an existing directory.

    Parameters
    ----------
    path : str or bytes or os.PathLike

    Returns
    -------
    bool
        Result of ``os.path.isdir(path)``.

    See Also
    --------
    SDSVerboseOn : Enables SDS Verbose mode.
    SDSVerboseOff : Disables SDS Verbose mode.

    Notes
    -----
    The call is routed through ``sds_os``, so it is logged when SDS verbose mode is on.
    """
    is_directory = sds_os(os.path.isdir, path)
    return is_directory
def sds_isfile(path: AnyPath) -> bool:
    """
    Report whether `path` refers to an existing regular file.

    Parameters
    ----------
    path : str or bytes or os.PathLike

    Returns
    -------
    bool
        Result of ``os.path.isfile(path)``.

    See Also
    --------
    SDSVerboseOn : Enables SDS Verbose mode.
    SDSVerboseOff : Disables SDS Verbose mode.

    Notes
    -----
    The call is routed through ``sds_os``, so it is logged when SDS verbose mode is on.
    """
    is_regular_file = sds_os(os.path.isfile, path)
    return is_regular_file
def sds_exists(path: AnyPath) -> bool:
    """
    Report whether `path` exists.

    Parameters
    ----------
    path : str or bytes or os.PathLike

    Returns
    -------
    bool
        Result of ``os.path.exists(path)``.

    See Also
    --------
    SDSVerboseOn : Enables SDS Verbose mode.
    SDSVerboseOff : Disables SDS Verbose mode.

    Notes
    -----
    The call is routed through ``sds_os``, so it is logged when SDS verbose mode is on.
    """
    found = sds_os(os.path.exists, path)
    return found
def sds_listdir(path: AnyPath) -> List[str]:
    """
    List the entries of the directory `path`.

    Parameters
    ----------
    path : str or bytes or os.PathLike

    Returns
    -------
    list of str
        Result of ``os.listdir(path)`` (entry names, not full paths).

    See Also
    --------
    SDSVerboseOn : Enables SDS Verbose mode.
    SDSVerboseOff : Disables SDS Verbose mode.

    Notes
    -----
    The call is routed through ``sds_os``, so it is logged when SDS verbose mode is on.
    """
    entries = sds_os(os.listdir, path)
    return entries
def sds_endswith(path: Union[bytes, str, List[Union[bytes, str]]], add: bool = False) -> Union[bool, str, List[str]]:
    """
    Test whether `path` carries the SDS extension, or return it with the extension ensured.

    Parameters
    ----------
    path : bytes, str, or list of str or bytes
        Pathname or list of pathnames. ``bytes`` input is decoded to ``str`` first.
    add : bool, default False
        When false, a ``bool`` is returned. When true, the pathname is returned,
        with the SDS extension appended if it was missing.

    Returns
    -------
    bool or str or list of str

    Notes
    -----
    The extension check is case-insensitive. A list of pathnames is not inspected:
    it is assumed to already hold SDS pathnames, so it yields ``True`` (or the list
    itself when `add` is set).
    """
    if isinstance(path, bytes):
        path = path.decode()

    # Lists are trusted as-is; only single pathnames are actually checked.
    has_extension = isinstance(path, list) or path.lower().endswith(SDS_EXTENSION)

    if not add:
        return has_extension
    if has_extension:
        return path
    return path + SDS_EXTENSION
# -----------------------------------------------------------------------------------------
[docs]
def sds_flatten(rootpath: AnyPath) -> None:
    r"""
    `sds_flatten` brings all structs and nested structures in sub-directories into the main directory.

    Every file ``<rootpath>/<sub>/<name>.sds`` is renamed to ``<rootpath>/<sub>!<name>.sds``,
    ``<rootpath>/<sub>/_root.sds`` becomes ``<rootpath>/<sub>.sds``, and the emptied
    sub-directory is removed.

    Parameters
    ----------
    rootpath : str or bytes or os.PathLike
        The pathname to the SDS root directory.

    Examples
    --------
    >>> sds_flatten(r'D:\junk\PYTHON_SDS')

    Notes
    -----
    - The current implementation of `sds_flatten` crawls one subdirectory level.
    - If a nested directory contains items that are not sds files (or contains directories), the flatten
      will be skipped for the nested directory.
    - If there is a name conflict with items already in the base directory, the flatten will be skipped for
      the nested directory.
    - For each nested directory, no files are moved or renamed until all of its conflicts are checked.
    - If there were directories that couldn't be flattened, they are printed at the end.
    """
    # TODO: make this recursive
    # TODO: check file permissions for the final move before starting, plus any other
    #       safeguard that stops a directory from being left half-flattened

    # Normalize str / bytes / path-like to str: the paths below are built by
    # concatenating with the str ``os.sep``, which fails for bytes paths.
    rootpath = _anypath_to_str(rootpath)
    ext_len = len(SDS_EXTENSION)

    root_entries = [rootpath + os.sep + fname for fname in sds_listdir(rootpath)]
    nested = [entry for entry in root_entries if sds_isdir(entry)]
    flatten_fail = []

    # main loop over all subdirectories
    for dirpath in nested:
        move_files = []
        dirlist = sds_listdir(dirpath)

        # make sure directory contains only .sds files
        # TODO: if nested subdirectory, recurse
        not_flat = any(not sds_endswith(fname) or sds_isdir(dirpath + os.sep + fname) for fname in dirlist)
        if not_flat:
            warnings.warn(f"{dirpath} contained items that were not .sds files or directories. Could not flatten")
            flatten_fail.append(dirpath)
            continue

        oldroot = None
        # move the _root.sds file to the base directory
        if "_root.sds" in dirlist:
            # might have same name as directory, so use a temp name
            oldroot = dirpath + os.sep + "_root.sds"
            newroot = rootpath + os.sep + "_root.sds_temp"
            # BUG (known, unchanged): the final root path is not checked for conflicts,
            # only the temporary one.
            if sds_exists(newroot):
                warnings.warn(f"temp file {newroot} already in root container, could not flatten {dirpath}.")
                flatten_fail.append(dirpath)
                continue
            # add to final moving list
            move_files.append((oldroot, newroot))
            dirlist.remove("_root.sds")

        # strip .sds for renaming
        prefix = dirpath[:-ext_len] if sds_endswith(dirpath) else dirpath
        prefix = prefix + "!"

        # plan the move of all remaining .sds files to the base directory
        conflict = False
        for fname in dirlist:
            old = dirpath + os.sep + fname
            new = prefix + fname
            if sds_exists(new):
                # Must be an f-string so the offending paths are actually interpolated.
                warnings.warn(f"{new} was already found in base directory. Could not flatten {dirpath}.")
                conflict = True
                break
            move_files.append((old, new))
        if conflict:
            flatten_fail.append(dirpath)
            continue

        # all checks passed: perform the moves, then drop the emptied directory
        for old, new in move_files:
            os.rename(old, new)
        shutil.rmtree(dirpath)

        # rename temp _root if necessary
        if oldroot is not None:
            # strip ! from end of prefix
            finalroot = prefix[:-1] + SDS_EXTENSION
            os.rename(newroot, finalroot)

    if len(flatten_fail) > 0:
        print("Failed to flatten subdirectories:")
        for dname in flatten_fail:
            print(dname)
# -----------------------------------------------------------------------------------------
def _sds_path_multi(path, share=None, overwrite=True):
"""
Checks for existence of directory for saving multiple .sds files.
If directory exists, asks user if it should be used (potentially overwriting existing .sds files inside)
Returns True if okay to proceed with save.
"""
# path will never get checked/created if saving to shared memory
if share is None:
# prompt user for overwrite
if sds_exists(path):
if overwrite is False:
prompt = f"{path} already exists. Possibly overwrite .sds files in directory? (subdirectories will remain intact) (y/n) "
overwrite = False
while True:
choice = input(prompt)
if choice in ["Y", "y"]:
overwrite = True
break
elif choice in ["N", "n"]:
break
if overwrite is False:
print(f"No file was saved.")
return False
else:
pass
# don't remove the entire tree by default
# shutil.rmtree(path)
else:
# possible TODO: call chmod after this so permissions are correct
# or maybe use os.umask before creating the directory?
if SDSVerbose:
VerbosePrint(f"calling makedirs")
if SDSMakeDirs:
os.makedirs(path)
else:
os.mkdir(path)
# raise ValueError(f'Directory {path!r} does not exist. SDSMakeDirs global variable must be set to auto create sub directories.')
return True
# -----------------------------------------------------------------------------------------
def _sds_path_single(path, share=None, overwrite=True, name=None, append=None):
    """
    Resolve the final path of a single .sds file and decide whether the save may proceed.

    Parameters
    ----------
    path : str or bytes
        Target path. Used as the file path itself when `name` is None, otherwise as
        the directory that `name` is placed in. ``bytes`` is decoded to ``str``.
    share : str, optional
        Shared memory name. When given, no filesystem checks are made and nothing is created.
    overwrite : bool, default True
        When ``False`` and the target file exists, the user is prompted on stdin.
    name : str, optional
        Item name; passed through ``_parse_nested_name`` and appended to `path`.
    append : str, optional
        When not None the caller is appending to a file, so overwriting is implied.

    Returns
    -------
    path : str
        Full path for the final save, with the SDS extension ensured.
    name : str
        Item name without the SDS extension.
    status : bool
        True if okay to proceed with the save, False if the user declined to overwrite.

    Raises
    ------
    TypeError
        If the target path exists and is a directory.

    Notes
    -----
    If the target does not exist, its containing directory is created when missing.
    NOTE: TJD overwrite changed to True on Aug, 2019
    """
    # TODO: add this routine to Dataset.save()
    if isinstance(path, bytes):
        path = path.decode()

    # possibly add extension
    if name is None:
        name = os.path.basename(os.path.normpath(path))
    else:
        name = _parse_nested_name(name)
        path = path + os.sep + name
    if sds_endswith(name):
        name = name[: -len(SDS_EXTENSION)]
    else:
        path += SDS_EXTENSION

    # if the user is appending to a file, overwrite is expected
    if append is not None:
        overwrite = True

    # shared-memory saves never look at (or create anything on) the filesystem
    if share is not None:
        return path, name, True

    if sds_isfile(path):
        if overwrite is False:
            prompt = f"{path} already exists. Overwrite? (y/n) "
            while True:
                choice = input(prompt)
                # Compare against whole answers: a substring test such as
                # ``choice in "Yy"`` is true for the empty string, which would
                # treat a bare <Enter> as consent to overwrite.
                if choice in ("Y", "y"):
                    overwrite = True
                    break
                if choice in ("N", "n"):
                    break
            if overwrite is False:
                print("No file was saved.")
                return path, name, False
        # overwriting files is allowed; the existing file is replaced by the later write
        return path, name, True

    # for now, don't allow overwrite if name.sds is a directory
    if sds_isdir(path):
        raise TypeError(f"{path} already existed and was a directory.")

    # the file does not exist: possibly create the containing directory
    basename_len = len(os.path.basename(os.path.normpath(path)))
    parent = path[:-basename_len]
    # don't make directory if empty string (bare filename, current directory)
    if len(parent) > 0 and not sds_isdir(parent):
        if SDSMakeDirs:
            os.makedirs(parent)
        else:
            os.mkdir(parent)

    return path, name, True
# -----------------------------------------------------------------------------------------
def _sds_save_single(
    item,
    path,
    share=None,
    overwrite=True,
    compress=True,
    name=None,
    onefile=False,
    bandsize=None,
    append=None,
    complevel=None,
):
    """
    Fast track for saving a single item in an .sds file.

    Called by ``save_sds()`` when the item is a single array or FastArray subclass.
    The target path is resolved with ``_sds_path_single``; if that reports that the
    save must not proceed, nothing is written and ``None`` is returned.
    """
    resolved_path, item_name, may_save = _sds_path_single(
        path, share=share, overwrite=overwrite, name=name, append=append
    )
    if may_save is False:
        return None

    # wrap in struct, struct build meta will call item build meta if necessary
    wrapped = TypeRegister.Struct({item_name: item})
    _write_to_sds(
        wrapped,
        path=resolved_path,
        name=None,
        compress=compress,
        sharename=share,
        fileType=SDSFileType.Array,
        onefile=onefile,
        bandsize=bandsize,
        append=append,
        complevel=complevel,
    )
# -----------------------------------------------------------------------------------------
def _sds_load_single(meta, arrays, meta_tups, info=False):
    """
    Load the single item stored in an .sds file of filetype ``SDSFileType.Array``.

    The file content is first rebuilt as a Struct (which also rebuilds any FastArray
    subclass); the first value of that Struct is returned. `info` is accepted but
    not used here.
    """
    container = TypeRegister.Struct._load_from_sds_meta_data(meta, arrays, meta_tups)
    values = list(container.values())
    return values[0]
# -----------------------------------------------------------------------------------------
def save_sds_uncompressed(
    filepath: AnyPath, item: Union[np.ndarray, "Dataset", "Struct"], overwrite: bool = True, name: Optional[str] = None
) -> None:
    """
    Save an item without compression.

    Equivalent to ``save_sds(filepath, item, compress=False, overwrite=overwrite, name=name)``.

    Parameters
    ----------
    filepath : str or bytes or os.PathLike
        Path to directory for ``Struct``, path to ``.sds`` file for ``Dataset`` or array
        (where SDS extension will be added if necessary).
    item : Struct, Dataset, ndarray, or ndarray subclass
        The object to store.
    overwrite : bool, default True
        If ``True``, do not prompt the user when overwriting an existing ``.sds`` file
        (mainly useful for ``Struct.save()``, which may call ``Dataset.save()`` multiple times).
    name : str, optional
        Name of the sds file (default None).

    Raises
    ------
    TypeError
        If `item` type cannot be saved.

    See Also
    --------
    save_sds : Save with all options available.
    """
    options = {"compress": False, "overwrite": overwrite, "name": name}
    save_sds(filepath, item, **options)
# -----------------------------------------------------------------------------------------
[docs]
def save_sds(
    filepath: AnyPath,
    item: Union[np.ndarray, "Dataset", "Struct"],
    share: Optional[str] = None,
    compress: bool = True,
    overwrite: bool = True,
    name: Optional[str] = None,
    onefile: bool = False,
    bandsize: Optional[int] = None,
    append: Optional[str] = None,
    complevel: Optional[int] = None,
) -> None:
    r"""
    Save a Struct, Dataset or array in SDS format.

    Datasets and arrays will be saved into a single .sds file.
    Structs will create a directory of ``.sds`` files for potential nested structures.

    Parameters
    ----------
    filepath : str or bytes or os.PathLike
        Path to directory for Struct, path to ``.sds`` file for Dataset/array (extension will be added if necessary).
    item : Struct, dataset, array, or array subclass
    share : str, optional
        If the shared memory name is set, `item` will be saved to shared memory and NOT to disk. When shared memory
        is specified, a filename must be included in path. Only this will be used, the rest of the path will be discarded.
        For Windows make sure SE_CREATE_GLOBAL_NAME flag is set.
    compress : bool, default True
        Use compression when saving the file (shared memory is always saved uncompressed)
    overwrite : bool, default True
        If ``True``, do not prompt the user when overwriting an existing ``.sds`` file (mainly useful for ``Struct.save()``,
        which may call ``Dataset.save()`` multiple times)
    name : str, optional
        Name of the sds file.
    onefile : bool, default False
        If True will flatten() a nested struct before saving to make it one file.
        Not forwarded when `item` is a plain array.
    bandsize : int, optional
        If set to an integer greater than 10000 it will compress column datas every `bandsize` rows.
        Not forwarded when `item` is a plain array.
    append : str, optional
        If set to a string it will append to the file with the section name.
        Only forwarded to the write when `item` is a Dataset.
    complevel : int, optional
        Compression level from 0 to 9. 2 (default) is average. 1 is faster, less compressed, 3 is slower, more compressed.

    Raises
    ------
    TypeError
        If `item` type cannot be saved

    Notes
    -----
    ``save()`` can also be called from a ``Struct`` or ``Dataset`` object.

    Examples
    --------
    Saving a Struct:

    >>> st = Struct({ \
        'a': Struct({ \
            'arr' : arange(10), \
            'a2'  : Dataset({ 'col1': arange(5) }) \
        }), \
        'b': Struct({ \
            'ds1' : Dataset({ 'ds1col': arange(6) }), \
            'ds2' : Dataset({ 'ds2col' : arange(7) }) \
        }), \
    })
    >>> st.tree()
    Struct
        ├──── a (Struct)
        │     ├──── arr int32 (10,) 4
        │     └──── a2 (Dataset)
        │           └──── col1 int32 (5,) 4
        └──── b (Struct)
              ├──── ds1 (Dataset)
              │     └──── ds1col int32 (6,) 4
              └──── ds2 (Dataset)
                    └──── ds2col int32 (7,) 4
    >>> save_sds(r'D:\junk\nested', st)
    >>> os.listdir(r'D:\junk\nested')
    _root.sds
    a!a2.sds
    a.sds
    b!ds1.sds
    b!ds2.sds

    Saving a Dataset:

    >>> ds = Dataset({'col_'+str(i):arange(5) for i in range(5)})
    >>> save_sds(r'D:\junk\test', ds)
    >>> os.listdir(r'D:\junk')
    test.sds

    Saving an Array:

    >>> a = arange(100)
    >>> save_sds('D:\\junk\\test_arr', a)
    >>> os.listdir('D:\\junk')
    test_arr.sds

    Saving an Array Subclass:

    >>> c = Categorical(np.random.choice(['a','b','c'],500))
    >>> save_sds(r'D:\junk\cat', c)
    >>> os.listdir(r'D:\junk')
    cat.sds
    """
    # Convert path-like objects to str/bytes here.
    # This should be removed once we're sure all functions in this module
    # are doing their own normalization (as needed).
    filepath = os.fspath(filepath)

    # Dataset is tested before Struct, exactly as in the dispatch order callers rely on.
    if isinstance(item, TypeRegister.Dataset):
        # keep name and path as-is, extension added later; only the go/no-go status is needed
        _, _, may_save = _sds_path_single(filepath, share=share, overwrite=overwrite, name=name, append=append)
        if may_save is False:
            return
        _write_to_sds(
            item,
            filepath,
            name=name,
            compress=compress,
            sharename=share,
            onefile=onefile,
            bandsize=bandsize,
            append=append,
            complevel=complevel,
        )
        # if it exists, add this dataset to the folder's _root.sds file for future loads
        # maybe stick rebuild in _write_to_sds... this could also take care of single array saves
        _rebuild_rootfile(filepath, sharename=share, bandsize=bandsize, compress=compress, complevel=complevel)
        return

    if isinstance(item, TypeRegister.Struct):
        save_struct(
            item,
            filepath,
            name=name,
            sharename=share,
            overwrite=overwrite,
            compress=compress,
            onefile=onefile,
            bandsize=bandsize,
            complevel=complevel,
        )
        return

    if isinstance(item, np.ndarray):
        # pack array into struct for save (will handle subclasses)
        _sds_save_single(
            item, filepath, share=share, compress=compress, overwrite=overwrite, name=name, complevel=complevel
        )
        return

    raise TypeError(f"save_sds() can only save Structs, Datasets, or single arrays. Got {type(item)}")
# -----------------------------------------------------------------------------------------
def _sds_raw_info(
    filepath: AnyPath,
    share: Optional[Union[bytes, str]] = None,
    sections: Optional[List[str]] = None,
    threads: Optional[int] = None,
) -> List[tuple]:
    """
    Resolve `filepath` to concrete .sds file name(s) and fetch their info.

    A single path (``str``, ``bytes`` or ``os.PathLike``) is resolved directly; any
    other value is treated as a sequence of paths and each element is resolved.
    The resolved name(s) are handed to ``decompress_dataset_internal`` with ``info=True``.

    Returns
    -------
    a list of sds tuples

    Raises
    ------
    ValueError
        If a path ends with the SDS extension but is a directory.
    """

    def _normalize_path(one_path):
        # Normalize any os.PathLike or bytes paths to str
        one_path = _anypath_to_str(one_path)
        has_extension = sds_endswith(one_path)
        is_directory = sds_isdir(one_path)

        if has_extension:
            if is_directory:
                raise ValueError(
                    f"The filename {one_path} is a directory and ends with .sds so sds_info will not work. Consider sds_tree(filepath) instead."
                )
            return one_path

        if is_directory:
            # a Struct directory: describe its root file
            # should we return an error also?
            return one_path + os.sep + "_root.sds"
        return one_path + SDS_EXTENSION

    if isinstance(filepath, (str, bytes, os.PathLike)):
        resolved = _normalize_path(filepath)
    else:
        # Assume this is a Sequence[AnyPath]
        resolved = [_normalize_path(entry) for entry in filepath]

    return decompress_dataset_internal(resolved, sharename=share, info=True, sections=sections, threads=threads)
# -----------------------------------------------------------------------------------------
def sds_dir(filepath: AnyPath, share: Optional[str] = None) -> List[str]:
    r"""
    Returns list of ``Dataset`` or ``Struct`` item names as strings.

    Only returns top level item names of ``Struct`` directory: the names whose flags
    include ``SDSFlag.OriginalContainer`` in the first info tuple of the file.

    Parameters
    ----------
    filepath : str or bytes or os.PathLike
        Path to directory for Struct, path to ``.sds`` file for Dataset/array (extension will be added if necessary).
    share : str, optional
        Shared memory name, forwarded to the info lookup.

    Returns
    -------
    List of str

    Examples
    --------
    >>> ds = Dataset({'col_'+str(i):arange(5) for i in range(5)})
    >>> ds.save(r'D:\junk\test')
    >>> sds_dir(r'D:\junk\test')
    ['col_0', 'col_1', 'col_2', 'col_3', 'col_4']
    """
    raw_info = _sds_raw_info(filepath, share=share)
    # each entry is (meta, info, name/flag tuples, file header); only the tuples are needed
    _meta, _info, name_flag_tups, _fileheader = raw_info[0]
    return [tup[0].decode() for tup in name_flag_tups if tup[1] & SDSFlag.OriginalContainer]
# -----------------------------------------------------------------------------------------
[docs]
def sds_info(
    filepath: Union[AnyPath, Sequence[AnyPath]],
    share: Optional[Union[bytes, str]] = None,
    sections: Optional[List[str]] = None,
    threads: Optional[int] = None,
):
    """
    Return the raw info for one or more .sds files.

    Thin public wrapper around ``_sds_raw_info``: the path(s) are resolved to .sds
    file names there and the info is requested with ``info=True``.

    Parameters
    ----------
    filepath : str or bytes or os.PathLike, or a sequence of those
    share : str or bytes, optional
        Shared memory name.
    sections : list of str, optional
    threads : int, optional

    Returns
    -------
    Whatever ``_sds_raw_info`` returns (annotated there as a list of sds tuples).
    """
    # TODO: match the Matlab output (should it look the same, or print more information from array info?)
    raw = _sds_raw_info(filepath, share=share, sections=sections, threads=threads)
    return raw
# -----------------------------------------------------------------------------------------
[docs]
def sds_tree(filepath: AnyPath, threads: Optional[int] = None):
    r"""
    Explicitly display a tree of data for .sds file or directory.

    Only loads info, not data (delegates to ``_load_sds_internal`` with ``info=True``).

    Parameters
    ----------
    filepath : str or bytes or os.PathLike
    threads : int, optional

    Examples
    --------
    >>> ds = Dataset({'col_'+str(i):arange(5) for i in range(5)})
    >>> ds.save(r'D:\junk\treeds')
    >>> sds_tree(r'D:\junk\treeds')
    treeds
     ├──── col_0 FA  (5,)  int32  i4
     ├──── col_1 FA  (5,)  int32  i4
     ├──── col_2 FA  (5,)  int32  i4
     ├──── col_3 FA  (5,)  int32  i4
     └──── col_4 FA  (5,)  int32  i4
    """
    tree = _load_sds_internal(filepath, info=True, threads=threads)
    return tree
# -----------------------------------------------------------------------------------------
[docs]
def load_sds_mem(
    filepath: AnyPath,
    share: str,
    include: Optional[List[str]] = None,
    threads: Optional[int] = None,
    filter: Optional[np.ndarray] = None,
):
    """
    Explicitly load data from shared memory.

    Parameters
    ----------
    filepath : str or bytes or os.PathLike
        Name of sds file or directory. If no .sds extension, _load_sds will look for _root.sds;
        if no _root.sds is found, extension will be added and shared memory will be checked again.
    share : str
        Shared memory name. For Windows make sure ``SE_CREATE_GLOBAL_NAME`` flag is set.
    include : list of str, optional
    threads : int, optional, defaults to None
        How many threads to use.
    filter : int array or bool array, optional, defaults to None

    Returns
    -------
    Struct, Dataset or array loaded from shared memory.

    Notes
    -----
    To load a single dataset that belongs to a struct, the extension must be included. Otherwise,
    the path is assumed to be a directory, and the entire Struct is loaded.
    """
    loaded = _load_sds_internal(filepath, share=share, include=include, threads=threads, filter=filter)
    return loaded
# -----------------------------------------------------------------------------------------
def _sds_dir_from_file_list(filenames: Sequence[AnyPath], share: Optional[AnyStr] = None, mustexist: bool = False):
# files might only be in shared memory
if share is not None:
raise NotImplementedError
names = []
single_sds = []
badlist = []
hasdir = False
hasfile = False
for f in filenames:
# Normalize path-like and bytes to str (if needed).
f = os.fspath(f)
if isinstance(f, bytes):
f = f.decode()
# TODO: this has been written before... pull the complete version from somewhere else
if sds_isdir(f):
names.append(f)
hasdir = True
else:
fnew = sds_endswith(f, add=True)
if sds_isfile(fnew):
names.append(None)
single_sds.append(fnew)
hasfile = True
else:
badlist.append(fnew)
if mustexist is True:
raise ValueError(f"Could not find file named {f} and mustexist is True.")
else:
warnings.warn(f"Could not find file named {f}")
# names is list of [ None, None, path, None ]
# where None is a single file
# single_sds is a list of fullpaths to single sds files
# also returns flags to save a pass for calling function
return names, single_sds, badlist, hasdir, hasfile
# -----------------------------------------------------------------------------------------
def _sds_load_from_list(
    files: Sequence[Optional[AnyPath]],
    single_sds,
    share=None,
    info=False,
    include=None,
    threads=None,
    folders=None,
    filter=None,
    mustexist=False,
    sections=None,
):
    """
    Called by load_sds(), for loading an explicit list of .sds files or directories.

    Parameters
    ----------
    files : sequence of optional paths
        a list of [ None, None, directory/path, None ] where None is a placeholder for a single load to
        maintain file order. The list is filled in place with the loaded items.
    single_sds : a list of single .sds files
    share : **not implemented
    info : **not implemented
    include : skips items in single .sds loads, or files in directory, behaves the same way as include keyword in load_sds()
    filter : optional: boolean or fancy index filter (only rows in the filter will be added)
    sections : optional: list of strings with sections to load (file must have been saved with append=)

    Notes
    -----
    All loose .sds files are loaded in one decompress call; each directory entry is
    loaded through its own ``load_sds`` call.
    *future optimization: reduce this to 1 call for all files (gets tricky with nested structures)

    Returns
    -------
    list of datasets/structs/arrays, or the raw decompress result when it is not a tuple/list.
    """
    # load all loose sds files at the same time
    multiload = decompress_dataset_internal(
        single_sds,
        sharename=share,
        info=info,
        include=include,
        threads=threads,
        filter=filter,
        mustexist=mustexist,
        folders=folders,
        sections=sections,
    )

    # check for autodetect on appended to or concat: anything that is not a
    # tuple/list is passed straight back to the caller
    if not isinstance(multiload, (tuple, list)):
        return multiload

    next_single = 0
    for position, entry in enumerate(files):
        if entry is not None:
            # a directory (or other explicit path): regular load
            files[position] = load_sds(
                entry,
                share=share,
                info=info,
                include=include,
                threads=threads,
                filter=filter,
                mustexist=mustexist,
                sections=sections,
            )
            continue

        # placeholder for a loose file: consume the next pre-loaded result
        files[position] = _read_sds(
            "",
            sharename=share,
            info=info,
            include=include,
            multiload=multiload[next_single],
            filter=filter,
            mustexist=mustexist,
            sections=sections,
        )
        next_single += 1

    return files
# -----------------------------------------------------------------------------------------
def _multistack_categoricals(spec_name, meta_list, indices, listcats, idx_cutoffs, unique_cutoffs):
    """
    Rebuild a single Categorical from the stacked pieces of multiple SDS files.

    Call when loading multiple SDS files.
    Assumes meta_list is a list of dictionaries, each with an ``"instance_vars"`` entry.
    `indices` may be modified in place (invalid values are reset to 0).

    Returns a Categorical.

    Raises
    ------
    TypeError
        If the pieces were saved with different modes, or (for non-mapping modes)
        with different base indexes.
    """
    if idx_cutoffs is not None and len(idx_cutoffs) > 1:
        # pull the firstkey and check for invalids
        probe = indices[idx_cutoffs[1:] - 1]
        sentinel = INVALID_DICT[probe.dtype.num]
        bad_count = np.sum(probe == sentinel)
        if bad_count > 0 or indices[0] == sentinel:
            warnings.warn(
                f"!! {bad_count} Bad indices in categorical {spec_name}. May have been gap filled. Setting invalids to bin 0."
            )
            indices[indices == sentinel] = 0

    if SDSVerbose:
        verbose_start = VerbosePrint(f"start reconstructing categorical {spec_name}")

    # every piece must agree on the category mode
    modes = {m["instance_vars"]["mode"] for m in meta_list}
    if len(modes) != 1:
        raise TypeError(f"Categoricals had different modes! {list(modes)}")
    mode = CategoryMode(list(modes)[0])

    # for other properties, use the ones from the first item
    first_vars = meta_list[0]["instance_vars"]
    ordered = first_vars["ordered"]
    sort_display = first_vars["sort_gb"]

    # ------------------------- start rebuild here
    if mode in (CategoryMode.Dictionary, CategoryMode.IntEnum):
        base_index = None
        indices, listcats = merge_cats(
            indices, listcats, unique_cutoffs=unique_cutoffs, from_mapping=True, ordered=ordered, verbose=SDSVerbose
        )
        # TJD added check
        # This check works even if the arrays returned in `listcats` are empty.
        if np.issubdtype(listcats[0].dtype, np.integer):
            # integers came back first: zip the second array as keys, the first as values
            keys, values = listcats[1], listcats[0]
        else:
            keys, values = listcats[0], listcats[1]
        newcats = dict(zip(keys, values))
    else:
        base_indexes = {m["instance_vars"]["base_index"] for m in meta_list}
        if len(base_indexes) != 1:
            raise TypeError(f"Categoricals had different base index {base_indexes}, cannot be stacked!")
        base_index = CategoryMode(list(base_indexes)[0])
        indices, newcats = merge_cats(
            indices,
            listcats,
            idx_cutoffs=idx_cutoffs,
            unique_cutoffs=unique_cutoffs,
            verbose=SDSVerbose,
            base_index=base_index,
            ordered=ordered,
        )

    grouping = TypeRegister.Grouping(
        indices, categories=newcats, _trusted=True, base_index=base_index, ordered=ordered, sort_display=sort_display
    )
    result = TypeRegister.Categorical(grouping)

    if SDSVerbose:
        VerbosePrintElapsed(f"finished reconstructing categorical {spec_name}", verbose_start)
    return result
# -----------------------------------------------------------------------------------------
def _multistack_onefile(arrays, nameflag_tup, cutoffs, meta, sep="/"):
    """
    Internal routine to rebuild a nested container from a multistacked, flattened (onefile) load.

    Returns
    -------
    tuple
        ``(s, data_cutoffs)``: the object produced by ``Struct._flatten_undo`` and a
        dict mapping the trailing name component (text after the last `sep`) of each
        ``OriginalContainer`` item to its cutoffs.
    """

    def _build_meta(metastrings):
        # this routine does not get meta for a categorical that appears later
        # we would have to go through all meta data to discover new items
        # the meta strings are stacked: take the first non-empty one
        # (falls back to the last string when all of them are empty)
        chosen = metastrings[0]
        if len(chosen) > 0:
            return chosen
        for position in range(1, len(metastrings)):
            chosen = metastrings[position]
            if len(chosen) > 0:
                break
        return chosen

    # first pass, build an array of (colname, value, arrayflag)
    data_cutoffs = {}
    packed = np.empty(len(nameflag_tup), dtype="O")
    for idx, (raw_name, flag) in enumerate(nameflag_tup):
        value = arrays[idx]
        colname = raw_name.decode()

        if flag & SDSFlag.Nested:
            # this is an entry point such as 'data/'
            # the array is a list of meta byte strings
            # now find the first valid meta string
            value = _build_meta(value)
        packed[idx] = (colname, value, flag)

        # regular item, or underlying array for FastArray subclass
        if flag & SDSFlag.OriginalContainer:
            # find sep char to get name
            sep_at = colname.rfind(sep)
            if sep_at >= 0:
                data_cutoffs[colname[sep_at + 1 :]] = cutoffs[idx]

    # the root meta
    root_meta = _build_meta(meta)
    rebuilt = TypeRegister.Struct._flatten_undo(sep, 0, "", packed, meta=root_meta, cutoffs=cutoffs)
    return rebuilt, data_cutoffs
# -----------------------------------------------------------------------------------------
def _multistack_items(arrays, meta_tups, cutoffs, meta):
    """
    Internal routine to rebuild any FastArray subclasses from a multistacked load.

    Parameters
    ----------
    arrays : sequence
        Loaded arrays, positionally aligned with `meta_tups`.
    meta_tups : sequence of tuple
        ``(name, SDSFlag bitmask)`` entries; names are bytes.
    cutoffs : sequence
        Cutoffs, positionally aligned with `meta_tups`.
    meta : sequence
        Meta data for the stacked files; ``None`` and empty entries are skipped.

    Returns
    -------
    tuple
        ``(data, data_cutoffs)``: dict of item name -> rebuilt item, and dict of
        item name -> cutoffs for every ``SDSFlag.OriginalContainer`` item.
    """
    data = {}
    data_cutoffs = {}
    spec_items = {}
    spec_cutoffs = {}
    spec_meta = {}

    # Collect, per column name, every item meta found across all meta data.
    # The first definition of each name ends up first in its list.
    for raw_meta in meta:
        if raw_meta is not None and len(raw_meta) != 0:
            for entry in MetaData(raw_meta).get("item_meta", []):
                entry = MetaData(entry)
                spec_meta.setdefault(entry["name"], []).append(entry)

    for item_idx, tup in enumerate(meta_tups):
        itemname = tup[0].decode()
        itemenum = tup[1]

        if not (itemenum & SDSFlag.OriginalContainer):
            # auxiliary item (categorical uniques, etc.) - python only
            # spec_items = {itemname: [arr1, arr2, arr3...]}, keyed by the text before '!'
            spec_name = itemname[: itemname.find("!")]
            spec_items.setdefault(spec_name, []).append(arrays[item_idx])
            # save cutoffs for categorical fixup
            spec_cutoffs.setdefault(spec_name, []).append(cutoffs[item_idx])
            continue

        # regular item, or underlying array for FastArray subclass
        underlying = arrays[item_idx]
        # TODO: change this loop, or move elsewhere to NOT be hard-coded for categorical
        if itemname in spec_meta:
            # non-binned subclasses are rebuilt right away using the first meta seen;
            # categoricals have to wait until all extra arrays are loaded
            i_meta = spec_meta[itemname][0]
            i_class = i_meta.itemclass
            if not TypeRegister.is_binned_type(i_class):
                underlying = i_class._load_from_sds_meta_data(itemname, underlying, [], i_meta)
                del spec_meta[itemname]
        data[itemname] = underlying
        data_cutoffs[itemname] = cutoffs[item_idx]

    # whatever is left in spec_meta was deferred above: categoricals only
    for spec_name, meta_list in spec_meta.items():
        underlying = data.get(spec_name, None)
        if underlying is None:
            # the underlying was not loaded (may not have been in include list)
            continue
        data[spec_name] = _multistack_categoricals(
            spec_name,
            meta_list,
            underlying,
            spec_items[spec_name],
            data_cutoffs[spec_name],
            spec_cutoffs[spec_name],
        )
    return data, data_cutoffs
# -----------------------------------------------------------------------------------------
# internal routine to resolve metadata after a stacked load
def _stacked(filenames, result, folders):
    """
    Internal routine to resolve metadata after a stacked load.

    Parameters
    ----------
    filenames : sequence
        The filenames that were requested.
    result : tuple
        Six items: ``(arrays, meta_tups, cutoffs, meta, loadedpaths, fileheader)``.
    folders : list of str or None
        When the data is in onefile form and comes back as a Struct, the first folder name
        (which has a trailing slash) selects the nested item to return.

    Returns
    -------
    PDataset, or whatever the onefile rebuild produced when that is not a Dataset.
    """
    arrays, meta_tups, cutoffs, meta, loadedpaths, fileheader = result

    # check which files were loaded, warn with list of load failures
    found, _ = ismember(filenames, loadedpaths)
    badlist = [filenames[idx] for idx, f in enumerate(found) if not f]
    if badlist:
        warnings.warn(f"Error loading files: {badlist}")

    # fix special array subclasses and create a partitioned dataset
    if SDSVerbose:
        verbose_start = VerbosePrint(f"starting _multistack_items")

    # onefile stacking vs normal stacking: only a onefile carries the Meta flag,
    # or a Nested entry whose name contains a slash
    isonefile = any(
        (entry[1] & SDSFlag.Meta) or (entry[1] & SDSFlag.Nested and b"/" in entry[0]) for entry in meta_tups
    )

    if not isonefile:
        data, cutoffs = _multistack_items(arrays, meta_tups, cutoffs, meta)
    else:
        data, allcutoffs = _multistack_onefile(arrays, meta_tups, cutoffs, meta, sep="/")

        # the data might be in a Struct: walk down one level per component of the first folder name
        if folders is not None and isinstance(data, TypeRegister.Struct):
            # the folder name has a trailing slash, so the final split piece is dropped
            for subfolder in folders[0].split("/")[:-1]:
                data = data[subfolder]

        if not isinstance(data, TypeRegister.Dataset):
            return data

        # fix cutoffs (trim down to just what we have)
        cutoffs = {colname: allcutoffs[colname] for colname in data}

    if SDSVerbose:
        VerbosePrintElapsed(f"finished _multistack_items", verbose_start)

    return TypeRegister.PDataset(data, cutoffs=cutoffs, filenames=loadedpaths)
# -----------------------------------------------------------------------------------------
def _convert_to_mask(filter):
if not isinstance(filter, np.ndarray):
filter = np.atleast_1d(filter)
if isinstance(filter, np.ndarray) and filter.dtype.char != "O":
if filter.dtype.char == "?" and len(filter) != 0:
# no more bool to fancy
# filter = bool_to_fancy(filter)
return filter
else:
if filter.dtype.num > 10:
raise TypeError(f"The filter must be a numpy array of booleans or integers not {filter.dtype}.")
# convert fancy index to bool
if len(filter) > 0:
maxval = np.max(filter)
mask = zeros(maxval + 1, dtype=bool)
mask[filter] = True
else:
mask = zeros(0, dtype=bool)
return mask
else:
raise TypeError(f"The filter must be a numpy array of booleans or integers not {type(filter)}.")
# -----------------------------------------------------------------------------------------
def _stack_sds_files(
    filenames,
    share=None,
    info=False,
    include=None,
    folders=None,
    threads: Optional[int] = None,
    filter=None,
    mustexist=False,
    sections=None,
    reserve=0.0,
):
    """
    Internal routine for a single list of filenames (datasets or structs) to be stacked.
    Called by stack_sds() and _stack_sds_dirs().

    Only supports datasets (no structs/nesting).

    Parameters
    ----------
    filenames : list
        Files to stack; must not be empty.
    share, info
        Accepted but not used by this routine.
    include, folders, mustexist, sections, reserve
        Forwarded unchanged to ``rc.MultiStackFiles``.
    threads : int, optional
        When given, the thread wake-up count is set for the duration of the call and restored after.
    filter : array-like, optional
        Boolean mask or fancy index; always converted to a boolean mask before the load.

    Returns
    -------
    The value of ``_stacked(filenames, result, folders)``.

    Raises
    ------
    ValueError
        If `filenames` is empty or the low-level load returns None.
    """
    # files that werent found were not passed in, but may have raised an error during multiload call
    savethreads = rc.SetThreadWakeUp(threads) if threads is not None else None
    try:
        if len(filenames) == 0:
            raise ValueError(f"MultiStack list was empty. No files existed in original list.")

        if SDSVerbose:
            verbose_start = VerbosePrint(
                f"calling rc.MultiStackFiles with {len(filenames)} files first: {filenames[0]} last: {filenames[-1]} include: {include}"
            )

        # Always use boolean mask now, so the `filter` keyword handed down is always None
        mask = None if filter is None else _convert_to_mask(filter)
        result = rc.MultiStackFiles(
            filenames,
            include=include,
            folders=folders,
            filter=None,
            mask=mask,
            mustexist=mustexist,
            sections=sections,
            reserve=reserve,
        )
        if result is None:
            raise ValueError(f"There was a problem when trying to stack the files {filenames}. No data was returned.")

        if SDSVerbose:
            VerbosePrintElapsed(f"finished rc.MultiStackFiles", verbose_start)
    finally:
        # restore the previous thread wake-up setting even when the load failed
        if savethreads is not None:
            rc.SetThreadWakeUp(savethreads)

    return _stacked(filenames, result, folders)
# -----------------------------------------------------------------------------------------
def _stack_sds_dirs(filenames, share=None, info: bool = False, include=[], folders=[], sections=None, threads=None):
    r"""
    Stack same-named ``.sds`` files found across several struct directories.

    For every name in `folders` (or `include`, when `folders` is None) one list of file paths is
    built, holding that file inside each directory. Each list is then stacked with
    `_stack_sds_files`.

    Given

    >>> dirs = ['D:\junk\foobar\20190201', 'D:\junk\foobar\20190204', 'D:\junk\foobar\20190205']
    >>> include = ['zz', 'qq', 'MM']

    this routine will build the following dictionary::

        include_dict = {
            'zz': ['D:\junk\foobar\20190201\zz.sds',
                   'D:\junk\foobar\20190204\zz.sds',
                   'D:\junk\foobar\20190205\zz.sds'],
            'qq': ['D:\junk\foobar\20190201\qq.sds',
                   'D:\junk\foobar\20190204\qq.sds',
                   'D:\junk\foobar\20190205\qq.sds'],
            'MM': ['D:\junk\foobar\20190201\MM.sds',
                   'D:\junk\foobar\20190204\MM.sds',
                   'D:\junk\foobar\20190205\MM.sds']
        }

    and the stacking routine will be called 3 times, once per dictionary value. The Struct
    returned then has the three keys ``'zz'``, ``'qq'`` and ``'MM'``, each holding the stacked result.

    Parameters
    ----------
    filenames : sequence of str
        Directory paths. ``os.sep`` is appended to each one.
    share, info, sections
        Forwarded to `_stack_sds_files`.
    include : list of str or bytes
        Used as the list of file names only when `folders` is None.
    folders : list of str or bytes
        File names (with or without the SDS extension) to stack across the directories.
        This is the new way; `include` is really for column names now.
    threads : int, optional
        Forwarded to `_stack_sds_files` so the requested thread count applies to every stack.

    Returns
    -------
    Struct
        One entry per file name (extension removed) holding the stacked result.
    """
    # The list defaults are kept for signature compatibility; they are never mutated here.
    # onefile will NOT take this path
    if folders is None:
        folders = include

    include_dict = {}
    for path in filenames:
        path = path + os.sep
        # treat each item as the name of a file within the struct directory
        for inc in folders:
            if isinstance(inc, bytes):
                inc = inc.decode()
            inc = sds_endswith(inc, add=True)
            # don't put .sds in result item name (drops the last 4 characters)
            name = inc[:-4]
            # don't check for existence of file, CPP loader will just skip it
            include_dict.setdefault(name, []).append(path + inc)

    # build single dataset for each include item
    # Fix: `threads` used to be accepted but dropped here, so it had no effect on the load.
    stacked = {
        name: _stack_sds_files(files, share=share, info=info, sections=sections, threads=threads)
        for name, files in include_dict.items()
    }

    # return all items in struct container (even when there is just one)
    return TypeRegister.Struct(stacked)
# -----------------------------------------------------------------------------------------
def _load_sds_internal(
    filepath: Union[AnyPath, Sequence[AnyPath]],
    share: Optional[str] = None,
    info: bool = False,
    include_all_sds: bool = False,
    include: Optional[List[str]] = None,
    stack: Optional[bool] = None,
    name: Optional[str] = None,
    threads: Optional[int] = None,
    folders: Optional[List[str]] = None,
    filter: Optional[np.ndarray] = None,
    mustexist: bool = False,
    sections: Optional[List[str]] = None,
    reserve: float = 0.0,
):
    """
    All explicit `load_sds` calls will be funneled into this routine.
    See docstrings for load_sds(), load_sds_mem(), sds_tree(), sds_info()

    Dispatch, in order:

    - ``stack`` truthy: the paths are classified into directories and files. Directories are
      stacked per name via `_stack_sds_dirs`; files are stacked via `_stack_sds_files`
      (once per folder, returned in a Struct, when `folders` is given).
    - `filepath` is a list: loaded through `_sds_load_from_list`.
    - otherwise a single `_load_sds` call. When the path lacks the SDS extension and no share
      name is given, the load is tried as-is and then retried with the extension appended.

    Raises
    ------
    ValueError
        If `folders` is not a list (after wrapping a single str/bytes), if directories are
        stacked without `include` or `folders`, or if both load attempts fail.
    TypeError
        If directories and files are mixed when stacking.
    """
    # a single name is treated as a list of one
    if isinstance(include, (str, bytes)):
        include = [include]

    if folders is not None:
        if isinstance(folders, (str, bytes)):
            folders = [folders]
        if not isinstance(folders, list):
            raise ValueError(
                f"The folders kwarg must be a list of strings of dataset or struct names to include. {folders}"
            )

    if stack:
        if isinstance(filepath, (str, bytes, os.PathLike)):
            filepath = [filepath]

        # Convert path-like objects to str/bytes (for compatibility with code below).
        filepath = [os.fspath(x) for x in filepath]

        files, sds_filelist, badlist, hasdir, hasfile = _sds_dir_from_file_list(
            filepath, share=share, mustexist=mustexist
        )

        if hasdir and hasfile:
            raise TypeError(f"List of files must contain only directories or only .sds files. {filepath}")

        if hasdir:
            # only directories
            if include is None and folders is None:
                raise ValueError(
                    f"SDS stacking only implemented for Datasets. Must provide folders list if loading from multiple Struct directories."
                )
            return _stack_sds_dirs(
                files, share=share, info=info, include=include, folders=folders, sections=sections, threads=threads
            )

        # only files
        # TODO: Check if stacking with onefile (have to read file type of first file??)
        # TODO folders= must be preserved
        stack_kwargs = dict(
            share=share,
            info=info,
            include=include,
            sections=sections,
            threads=threads,
            filter=filter,
            reserve=reserve,
        )
        if folders is None:
            return _stack_sds_files(sds_filelist, folders=folders, mustexist=mustexist, **stack_kwargs)

        # All folder names have to end in /
        folders = [f if f.endswith("/") else f + "/" for f in folders]

        # assume onefile mode: one stacked result per folder, keyed by the name without the slash
        # NOTE(review): `mustexist` is not forwarded on this per-folder path - confirm that is intended.
        return TypeRegister.Struct(
            {f[:-1]: _stack_sds_files(sds_filelist, folders=[f], **stack_kwargs) for f in folders}
        )

    # not stacked
    # string-only operations until final load
    if isinstance(filepath, os.PathLike):
        filepath = os.fspath(filepath)

    if isinstance(filepath, bytes):
        filepath = filepath.decode()
    elif isinstance(filepath, list):
        # list of full filepaths provided
        files, single_sds, _, _, _ = _sds_dir_from_file_list(filepath, mustexist=mustexist)
        return _sds_load_from_list(
            files,
            single_sds,
            share=share,
            info=info,
            include=include,
            threads=threads,
            filter=filter,
            mustexist=mustexist,
            folders=folders,
            sections=sections,
        )

    # keyword arguments shared by every _load_sds attempt below
    load_kwargs = dict(
        sharename=share,
        info=info,
        include_all_sds=include_all_sds,
        include=include,
        name=name,
        stack=stack,
        threads=threads,
        filter=filter,
        folders=folders,
        sections=sections,
    )

    if sds_endswith(filepath) or share is not None:
        # do not have a try
        result = _load_sds(filepath, mustexist=mustexist, **load_kwargs)
    else:
        # change so only one routine (_load_sds) attempts to fix file
        # do this when shared memory load gets forked
        # try to load with extension and without (due to people naming directories with .sds extensions)
        # NOTE(review): `mustexist` is not forwarded to either attempt below - confirm that is intended.
        try:
            result = _load_sds(filepath, **load_kwargs)
        except Exception as first_failure:
            origerror = first_failure
        else:
            origerror = None

        if origerror is not None:
            # try again with extension
            filepath = filepath + SDS_EXTENSION
            try:
                result = _load_sds(filepath, **load_kwargs)
            except Exception:
                raise ValueError(
                    f"Could not load item with filepath {filepath!r} and shared name {share!r}. First error: {origerror!r}. Second error {sys.exc_info()[1]}"
                )

    # tree from struct, otherwise single string from array
    if info and isinstance(result, TypeRegister.Struct):
        result = TypeRegister.Struct._info_tree(filepath, result)
    return result
# -----------------------------------------------------------------------------------------
[docs]
def load_sds(
    filepath: Union[AnyPath, Sequence[AnyPath]],
    share: Optional[str] = None,
    info: bool = False,
    include_all_sds: bool = False,
    include: Optional[List[str]] = None,
    name: Optional[str] = None,
    threads: Optional[int] = None,
    stack: Optional[bool] = None,
    folders: Optional[List[str]] = None,
    sections: Optional[List[str]] = None,
    filter: Optional[np.ndarray] = None,
    mustexist: bool = False,
    verbose: bool = False,
    reserve: float = 0.0,
) -> "Struct":
    r"""
    Load a dataset from single ``.sds`` file or struct from directory of ``.sds`` files.

    When ``stack=True``, generic loader for a single ``.sds`` file or directory of multiple ``.sds`` files.

    Parameters
    ----------
    filepath : str or bytes or os.PathLike or sequence of str
        Full path to file or directory.
        When `stack` is ``True`` can be list of ``.sds`` files to stack
        When `stack` is ``True`` list of directories containing ``.sds`` files to stack (must also use kwarg `include`)
    share : str, optional
        The shared memory name. loader will check for dataset in shared memory first and if it's not there, the
        data (if the filepath is found on disk) will be loaded into the user's workspace AND shared memory.
        A sharename must be accompanied by a file name. The rest of a full path will be trimmed off internally.
        Defaults to None. For Windows make sure SE_CREATE_GLOBAL_NAME flag is set.
    info : bool
        No item data will be loaded, the hierarchy will be displayed in a tree (defaults to False).
    include_all_sds : bool
        If ``True``, any extra files in saved struct's directory will be loaded into final struct (skips user prompt) (defaults to False).
    include : list of str, optional
        A list of strings of which columns to load, e.g. ``['Ask','Bid']``.
        When `stack` is ``True`` and directories passed, list of filenames to stack across each directory (defaults to None).
    name : str, optional
        Optionally specify the name of the struct being loaded. This might be different than directory (defaults to None).
    threads : int, optional
        How many threads to read, stack, and decompress with (defaults to None).
    stack : bool, optional
        Set to ``True`` to stack array data before loading into python (see docstring for `stack_sds`).
        Set to ``False`` when appending many files into one and want columns flattening.
        This parameter is not compatible with the `share` or `info` parameters (defaults to None).
    folders : list of str, optional
        A list of strings on which folders to include e.g., ``['zz/','xtra/']`` (must be saved with ``onefile=True``) (defaults to None).
    sections : list of str, optional
        A list of strings on which sections to include (must be saved with ``append="name"``) (defaults to None).
    filter : ndarray, optional
        Optional fancy index or boolean array. Does not work with ``stack=True``.
        Designed to read in contiguous sections; for example, ``filter=arange(10)`` to read first 10 elements (defaults to None).
    mustexist : bool
        Set to True to ensure that all files exist or raise an exception (defaults to False).
    verbose : bool
        Prints time related data to stdout (defaults to False).
        Every call switches the module-wide verbose setting on or off to match this flag.
    reserve : float
        When set greater than 0.0 and less than 1.0, this is how much extra room is reserved when stacking.
        If set to 0.10, it will allocate 10% more memory for future partitions.
        Defaults to 0.0.

    Returns
    -------
    Struct

    Raises
    ------
    ValueError
        If `stack` is ``True`` together with `info` or `share`, or if `stack` is ``False``
        and `reserve` is not 0.0.

    Notes
    -----
    When `stack` is ``True``:

    - columns with the same name must have matching types or upcastable types
    - bytestring widths will be fixed internally
    - numeric types will be upcast appropriately
    - missing columns will be filled with the invalid value for the column type

    Examples
    --------
    Stacking multiple files together while loading:

    >>> files = [ r'D:\dir1\ds1.sds', r'D:\dir2\ds1.sds' ]
    >>> load_sds(files, stack=True)
    #   col_0   col_1   col_2   col_3   col_4
    -   -----   -----   -----   -----   -----
    0    0.71    0.86    0.44    0.97    0.47
    1    0.89    0.40    0.10    0.94    0.66
    2    0.03    0.56    0.80    0.85    0.30

    Stacking multiple files together while loading, explicitly specifying the
    list of columns to be loaded.

    >>> files = [ r'D:\dir1\ds1.sds', r'D:\dir2\ds1.sds' ]
    >>> include = ['col_0', 'col_1', 'col_4']
    >>> load_sds(files, include=include, stack=True)
    #   col_0   col_1   col_4
    -   -----   -----   -----
    0    0.71    0.86    0.47
    1    0.89    0.40    0.66
    2    0.03    0.56    0.30

    Stacking multiple directories together while loading, explicitly specifying
    the list of `Dataset` objects to load (from each directory, then stack together).

    >>> files = [ r'D:\dir1', r'D:\dir2' ]
    >>> include = [ 'ds1', 'ds2', 'ds3' ]
    >>> load_sds(files, include=include, stack=True)
    #   Name   Type      Size                0   1   2
    -   ----   -------   -----------------   -   -   -
    0   ds1    Dataset   20 rows x 10 cols
    1   ds2    Dataset   20 rows x 10 cols
    2   ds3    Dataset   20 rows x 10 cols

    See Also
    --------
    sds_tree
    sds_info
    """
    # the verbose flag is applied module-wide on every call, in both directions
    switch_verbose = SDSVerboseOn if verbose else SDSVerboseOff
    switch_verbose()

    # argument combinations that only make sense for one stacking mode
    if stack is True:
        if info:
            raise ValueError("sds: info cannot be set when stack=True")
        if share is not None:
            raise ValueError("sds: share cannot be set when stack =True")
    elif stack is False and reserve != 0.0:
        raise ValueError("sds: reserve cannot be set when stack=False")

    return _load_sds_internal(
        filepath,
        share=share,
        info=info,
        include_all_sds=include_all_sds,
        include=include,
        folders=folders,
        sections=sections,
        stack=stack,
        name=name,
        threads=threads,
        filter=filter,
        mustexist=mustexist,
        reserve=reserve,
    )
# -----------------------------------------------------------------------------------------
def _make_zero_length(sdsresult):
    """
    Internal routine that walks each array returned and creates a 0 length version.

    Parameters
    ----------
    sdsresult : iterable of tuple
        One 4-item entry per SDS file: ``(meta, arrays, (name, flags) tuples, infodict)``.

    Returns
    -------
    list
        The same entries. For Dataset and Array file types every array whose name has no ``!``
        is replaced by an empty array with a first dimension of 0 (same dtype, same trailing
        dimensions); all other entries are passed through unchanged.
    """
    # we just read in first row and now we have to return arrays of 0 length in the first dim
    shrinkable = (SDSFileType.Dataset, SDSFileType.Array)
    rebuilt = []

    for sds in sdsresult:
        filetype = sds[3]["FileType"]
        if filetype == shrinkable[0] or filetype == shrinkable[1]:
            emptied = []
            for a, t in zip(sds[1], sds[2]):
                # names with a bang are for categoricals (we dont touch them)
                if t[0].find(b"!") == -1:
                    # make zero length array for first dim
                    dims = list(a.shape)
                    dims[0] = 0
                    a = empty(tuple(dims), dtype=a.dtype)
                emptied.append(a)

            # rebuild sds with the replaced arrays
            sds = (sds[0], tuple(emptied), sds[2], sds[3])
        rebuilt.append(sds)

    return rebuilt
# -----------------------------------------------------------------------------------------
# TODO - PEP484 What type does sds_concat return?
[docs]
def sds_concat(
    filenames: Sequence[Union[str, os.PathLike]],
    output: Optional[Union[str, os.PathLike]] = None,
    include: List[str] = None,
):
    """
    Append several SDS files into a single output file.

    Parameters
    ----------
    filenames : sequence of str or os.PathLike.
        List of fully qualified pathnames
    output : str or os.PathLike, optional
        Single string of the filename to create (defaults to None).
    include : list of str, optional
        A list of strings indicating which columns to include in the load (currently not supported).
        Defaults to None.

    Returns
    -------
    None
        A new file is created with the name in `output`. This output file has all the filenames appended.

    Raises
    ------
    ValueError
        If output filename is not specified.

    Notes
    -----
    The `include` parameter is not currently implemented.

    Examples
    --------
    >>> flist=['/nfs/file1.sds', '/nfs/file2.sds', '/nfs/file3.sds']
    >>> sds_concat(flist, output='/nfs/mydata/concattest.sds')
    >>> load_sds('/nfs/mydata/concattest.sds', stack=True)
    """
    if output is None:
        raise ValueError(f"The output kwarg must be specified and be a valid filename to create.")

    # Convert path-like objects to bytes/str before passing to the C++ layer.
    rc.MultiConcatFiles([os.fspath(x) for x in filenames], output=os.fspath(output), include=include)
# -----------------------------------------------------------------------------------------
[docs]
def decompress_dataset_internal(
    filename: Union[AnyPath, Sequence[AnyPath]],
    mode: CompressionMode = CompressionMode.DecompressFile,
    sharename: Optional[AnyStr] = None,
    info: bool = False,
    include: Optional[Union[AnyStr, Sequence[AnyStr]]] = None,
    stack: Optional[bool] = None,
    threads: Optional[int] = None,
    folders: Optional[List[str]] = None,
    sections: Optional[List[str]] = None,
    filter: Optional[np.ndarray] = None,
    mustexist: bool = False,
    goodfiles: Optional[Tuple[List[str], AnyPath]] = None,
) -> List[Tuple[bytes, List[np.ndarray], List[tuple]]]:
    r"""
    Low-level SDS read: normalizes the arguments, then calls into ``riptide_cpp`` to load
    one or more files (or a shared memory location).

    Parameters
    ----------
    filename : str or bytes or os.PathLike or sequence of str
        A string (or list of strings) of fully qualified path name, or shared memory location (e.g., ``Global\...``)
    mode : CompressionMode
        When set to `CompressionMode.Info`, tup2 is replaced with a tuple of numpy attributes (shape, dtype,
        flags, itemsize) (defaults to `CompressionMode.DecompressFile`).
    sharename : str, or bytes, optional
        Unique bytestring for shared memory location. Prevents mistakenly overwriting data in shared memory (defaults to None).
        When given, only the first entry of `filename` is loaded.
    include : str, bytes, or list of str
        Which items to include in the load. If items were omitted, tuples will still appear, but None will
        be loaded as their corresponding data (defaults to None).
    stack : bool, optional
        Set to ``True`` to stack array data before loading into python (see docstring for `stack_sds`).
        Set to ``False`` when appending many files into one and want columns flattening.
        Defaults to None.
    threads : int, optional
        How many threads to read, stack, and decompress with (defaults to None).
    info : boolean
        Instead of decompressing numpy arrays, return a summary of each one's contents (shape/dtype/itemsize/etc.)
        When true, `mode` is overridden with `CompressionMode.Info`.
    folders : str, bytes, or list of strings, optional
        When saving with ``onefile=True`` (will filter out only those subfolders) list of strings (defaults to None)
    filter : ndarray, optional
        A boolean or fancy index filter (only rows in the filter will be added) (defaults to None).
        Always converted to a boolean mask before the load.
    mustexist : bool
        When true will raise exception if any file is missing.
    sections : list of str, optional
        List of strings with sections to load (file must have been saved with ``append=``) (defaults to None).
    goodfiles : list of str, optional
        Tuples of two objects (list of filenames, path the files came from) -- often from ``os.walk`` (defaults to None).
        Files listed here are used as-is, without the existence/extension check.

    Returns
    -------
    list of tuples, optional
        tup1: json metadata in a bytestring
        tup2: list of numpy arrays or tuple of (shape, dtype, flags, itemsize) if info mode
        tup3: list of tuples containing (itemname, SDSFlags bitmask) for all items in container (might not correspond with 2nd item's arrays)
        tup4: dictionary of file header meta data

        When a single file turns out to be an appended (sds_concat style) file, the value returned by
        `_stacked` is returned instead of this list.

    Raises
    ------
    ValueError
        If `include` is not a list of column names.
        If the result doesn't contain any data.
    """

    # -------------------------------------------
    def _add_sds_ext(filename):
        """If a filename does not exist or is not a file, and has no extension, add extension"""
        try_add = False
        if sds_exists(filename):
            pass
        else:
            root, ext = os.path.splitext(filename)
            if len(ext) == 0:
                try_add = True
        if try_add:
            filename = sds_endswith(filename, add=True)
        return filename

    # -----------------------------------------------------------------------------------------
    def _include_as_dict(include):
        """
        If include list is specified, converts to dictionary of names->None
        """
        if include is None or isinstance(include, dict):
            pass
        else:
            include = {item: None for item in include}
        return include

    # -------------------------------------------
    # even if one string, convert to a list of one string
    if isinstance(filename, (str, bytes, os.PathLike)):
        filename = [filename]

    # Normalize any Path-like objects to str/bytes, then convert bytes to str.
    filename = [_anypath_to_str(x) for x in filename]

    # sharename still passed (to riptide_cpp) as bytes. Convert str to bytes if needed.
    if isinstance(sharename, str):
        sharename = sharename.encode()

    if include is not None:
        # a single name is wrapped into a list; anything else must already be a list
        if isinstance(include, (str, bytes)):
            include = [include]
        if not isinstance(include, list):
            raise ValueError(f"The include kwarg must be a list of column names to include. {include}")

    # info=True takes precedence over whatever `mode` was passed
    if info:
        mode = CompressionMode.Info

    # print('***filename',filename)
    # print('mode',mode)
    # print('sharename',sharename)
    # print('include',include)

    # until the low-level routine does this, put the final extension check here
    # all SDS loads will hit this block
    # user can also pass in a list of known good filenames
    if goodfiles is not None:
        flist = goodfiles[0]
        fullpath = _anypath_to_str(goodfiles[1])
        # build a good dictionary to avoid checking for file existence
        gooddict = {}
        for file in flist:
            gooddict[os.path.join(fullpath, file)] = True
        for pfname, fname in enumerate(filename):
            if gooddict.get(fname, False):
                # known good file: keep the name exactly as given
                filename[pfname] = fname
            else:
                filename[pfname] = _add_sds_ext(fname)
    else:
        for pfname, fname in enumerate(filename):
            filename[pfname] = _add_sds_ext(fname)

    # check mask
    # if mask is not None:
    #    if filter is not None:
    #        raise ValueError('Both "mask" and "filter" are set.  Only one can be set.')
    #    if not isinstance(mask, np.ndarray):
    #        mask = np.atleast_1d(mask)
    #    if isinstance(mask, np.ndarray) and mask.dtype.char != '?':
    #        pass

    zerofilter = False
    mask = None

    # check filter
    if filter is not None:
        # Always use boolean mask now
        mask = _convert_to_mask(filter)
        # from here on `filter` is None, so every `filter=filter` below passes None
        filter = None

        if len(mask) == 0:
            # special handling of zero filters
            zerofilter = True
            warnings.warn(f"Zero length filter for sds detected.")

    # remember the previous thread setting so it can be restored in the finally blocks
    savethreads = None if threads is None else rc.SetThreadWakeUp(threads)

    if sharename is None:
        try:
            # TODO:
            # When len(filename) > 1000 we need to loop over MultiDecompressFiles and read 1000 files at a time
            # to avoid hitting open file limits on various operating systems
            if SDSVerbose:
                verbose_start = VerbosePrint(
                    f"calling rc.MultiDecompressFiles first: {filename[0]} last: {filename[-1]}"
                )

            if stack is not False and info is False and len(filename) == 1:
                # this path checks for just one file.  this file may have been appended to and so contains multple files inside
                # if we detect the appended file ('StackType'==1), we default to stacking unless a forced stack=False
                result = rc.MultiPossiblyStackFiles(
                    filename,
                    include=include,
                    folders=folders,
                    filter=filter,
                    mask=mask,
                    mustexist=mustexist,
                    sections=sections,
                )
                if result is None:
                    raise ValueError(f"No data was found in the file {filename} with the specified parameters.")

                # the length of the return result will tell use what type of file this was
                if len(result) > 0 and len(result[0]) != 4:
                    # this is an sds_concat file and we assume it is stacked
                    # NOTE: this early return skips the zero-length filter fixup at the bottom
                    return _stacked(filename, result, folders)

                # TO BE DELETED WHEN PASSES TESTS
                ## arrays, meta_tups, cutoffs, meta, loadedpaths, fileheader_dict
                # try:
                #    # TODO: Future improvement for split style loading
                #    # reader checks the header and calls one of the two style loads
                #    result = rc.MultiStackFiles(filename, include=include, folders=folders, filter=filter, mustexist=mustexist, sections=sections)
                #    arrs, meta_tups, cutoffs, meta, loadedpaths, fileheader_dict = result
                #    if fileheader_dict['StackType'] == 1:
                #        # this is an appended file
                #        return _stacked(filename, result, folders)
                #    # else convert back to as if normal MultiDecomp was called
                #    result = ((meta[0], arrs, meta_tups, fileheader_dict),)
                # except Exception:
                #    # fallback to old style read
                #    result = rc.MultiDecompressFiles(filename, mode, include=include, folders=folders, sections=sections, filter=filter, mustexist=mustexist)
            else:
                # meta, arrs, meta_tups, fileheader_dict
                result = rc.MultiDecompressFiles(
                    filename,
                    mode,
                    include=include,
                    folders=folders,
                    sections=sections,
                    filter=filter,
                    mask=mask,
                    mustexist=mustexist,
                )

            if SDSVerbose:
                VerbosePrintElapsed(f"finished rc.MultiDecompressFiles", verbose_start)
        finally:
            if savethreads is not None:
                rc.SetThreadWakeUp(savethreads)
    else:
        # call it the old way for now
        # shared memory path: only the first filename is used, passed down as bytes
        filename = filename[0]
        if isinstance(filename, str):
            filename = filename.encode()

        if include is not None:
            include = _include_as_dict(include)

        try:
            if SDSVerbose:
                verbose_start = VerbosePrint(f"calling rc.DecompressFiles filename: {filename} sharename: {sharename}")

            # return a list of one to normalize return values
            # NOTE(review): `mask` is not passed on this path and `filter` is None by now,
            # so a filter has no effect on a shared memory load - confirm that is intended.
            result = [
                rc.DecompressFile(
                    filename,
                    mode,
                    sharename,
                    include=include,
                    folders=folders,
                    sections=sections,
                    filter=filter,
                    mustexist=mustexist,
                )
            ]

            if SDSVerbose:
                VerbosePrintElapsed(
                    f"calling rc.MultiDecompressFiles filename: {filename} sharename: {sharename}", verbose_start
                )
        finally:
            if savethreads is not None:
                rc.SetThreadWakeUp(savethreads)

    if zerofilter:
        # we just read in first row and now we have to return arrays of 0 length in the first dim
        result = _make_zero_length(result)

    return result
# ------------------------------------------------------------------------------------
[docs]
def compress_dataset_internal(
    filename: AnyPath,
    metadata: bytes,
    listarrays: List[np.ndarray],
    meta_tups: Optional[List[Tuple[str, SDSFlag]]] = None,
    comptype: CompressionType = CompressionType.ZStd,
    complevel: Optional[int] = 2,
    fileType=0,
    sharename: Optional[AnyStr] = None,
    bandsize=None,
    append=None,
) -> None:
    """
    Last Python-side stop of every SDS save: normalize the arguments and call ``rc.CompressFile``.

    Parameters
    ----------
    filename : str or bytes or os.PathLike
        Fully qualified filename (path has already been checked by save_sds wrapper).
        The SDS extension is appended here when it is missing.
    metadata : bytes
        JSON metadata as a bytestring. A ``str`` is tolerated and encoded to bytes.
    listarrays : list of numpy arrays
        Must be a ``list``; anything else raises ``TypeError``.
    meta_tups : list of (itemname, SDSFlag), optional
        See SDSFlag enum in rt_enum.py. ``None`` is replaced by an empty list.
    comptype : CompressionType
        Specify the type of compression to use when saving the Dataset.
    complevel : int, optional
        Compression level. 2 (default) is average. 1 is faster, less compressed, 3 is slower, more compressed.
        ``None`` is treated as 2.
    fileType : SDSFileType
        See SDSFileType in rt_enum.py - distinguishes between Struct, Dataset, Single item, or Matlab Table
    sharename : str or bytes, optional
        If provided, data will be saved (uncompressed) into shared memory. No file will be saved to disk.
    bandsize : optional
        Forwarded unchanged to ``rc.CompressFile`` as ``bandsize``.
    append : optional
        Forwarded unchanged to ``rc.CompressFile`` as ``section``.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If `listarrays` is not a ``list``.
    """
    level = 2 if complevel is None else complevel
    tups = [] if meta_tups is None else meta_tups

    # Normalize str or path-like to bytes, then make sure the extension is present.
    # Until the low-level routine does this, all SDS saves rely on this check.
    target = _anypath_to_bytes(filename)
    extension = SDS_EXTENSION.encode()
    if not target.endswith(extension):
        target += extension

    # Metadata should already be bytes; accept str as a convenience.
    if not isinstance(metadata, bytes):
        metadata = metadata.encode()

    if not isinstance(listarrays, list):
        raise TypeError(f"Input must be list of numpy arrays. Got {type(listarrays)}")

    # TODO - a gateway check could go here
    positional = [target, metadata, listarrays, tups, comptype, level, fileType]
    to_disk = sharename is None
    if not to_disk:
        # the shared memory name travels as one extra positional argument
        positional.append(sharename.encode() if isinstance(sharename, str) else sharename)

    # verbose tracing is only emitted for on-disk saves
    if to_disk and SDSVerbose:
        print(f"calling rc.CompressFile {target}")
    rc.CompressFile(*positional, bandsize=bandsize, section=append)
    if to_disk and SDSVerbose:
        print(f"finished rc.CompressFile")
# ------------------------------------------------------------------------------------
[docs]
def container_from_filetype(filetype: SDSFileType) -> type:
    """
    Map the ``SDSFileType`` stored in an SDS file header to the container class used to rebuild it.

    ``SDSFileType.Dataset`` and ``SDSFileType.Table`` map to ``TypeRegister.Dataset``; every other
    value maps to ``TypeRegister.Struct``. Older files where the file type is not set will default
    to 0, and therefore to ``Struct``.

    Parameters
    ----------
    filetype : SDSFileType

    Returns
    -------
    type
    """
    dataset_like = (SDSFileType.Dataset, SDSFileType.Table)
    return TypeRegister.Dataset if filetype in dataset_like else TypeRegister.Struct
# ------------------------------------------------------------------------------------
def _rebuild_rootfile(path: AnyPath, sharename=None, compress=True, bandsize=None, complevel=None):
    """
    Register a newly saved container in an existing ``_root.sds`` next to it.

    If a dataset is saved to an existing directory that was part of a previous struct save,
    the ``_root.sds`` file is resaved to include the added dataset for future loads.
    Nothing happens when ``SDSRebuildRoot`` is off, when no ``_root.sds`` exists in the
    directory of `path`, or when the root file already lists an item with the same name.

    Parameters
    ----------
    path : str or bytes or os.PathLike
        Path of the container that was just saved; a trailing ``.sds`` is ignored.
    sharename : optional
        Forwarded to the decompress/compress calls.
    compress : bool
        Selects ``CompressionType.ZStd`` (True) or ``CompressionType.Uncompressed`` (False).
    bandsize, complevel : optional
        Forwarded to ``compress_dataset_internal``.

    Notes
    -----
    A failure while rewriting the root file is reported as a warning, not raised.
    """
    if not SDSRebuildRoot:
        return

    path = _anypath_to_str(path)

    # check for root file in directory
    rootpath = os.path.join(os.path.dirname(path), "_root.sds")
    if not os.path.isfile(rootpath):
        return

    if path.endswith(".sds"):
        path = path[:-4]

    # decompress only the _root.sds file, not the full schema
    meta, arrays, sdsflags, fileheader = decompress_dataset_internal(rootpath, sharename=sharename)[0]

    # check if the container already exists in the root file
    # if the new save has the same name as an array in root, array will be kept (no rewrite)
    dsname = os.path.basename(path).encode()
    if any(t[0] == dsname for t in sdsflags):
        return

    # add tuple with name, container flag to sds tuples (copy so any sequence type works)
    flag = SDSFlag.Nested + SDSFlag.OriginalContainer
    sdsflags = list(sdsflags)
    sdsflags.append((dsname, flag))

    # containers save None as a placeholder
    arrays = list(arrays)
    arrays.append(None)

    comptype = CompressionType.ZStd if compress else CompressionType.Uncompressed

    # send raw data back to compressor in same format as original save
    try:
        compress_dataset_internal(
            rootpath,
            meta,
            arrays,
            meta_tups=sdsflags,
            comptype=comptype,
            complevel=complevel,
            fileType=fileheader["FileType"],
            sharename=sharename,
            bandsize=bandsize,
        )
    except Exception:
        # best effort: the data file itself was already written, so only warn.
        # ``Exception`` (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
        warnings.warn(f"Could not add {dsname} to {rootpath}.")
# ------------------------------------------------------------------------------------
def skeleton_from_meta_data(container_type, filepath, meta, arrays, meta_tups, file_header):
    """
    Build an info-only description ("skeleton") of one SDS container from its header data.

    Parameters
    ----------
    container_type : type
        Container class; only its ``__name__`` is recorded, under ``"Type_"``.
    filepath : str
        Recorded as ``"FilePath_"`` on the container and on every item, so an item can later
        be loaded individually (``load_sds`` can be sent the item name in ``include``).
    meta : MetaData or input accepted by ``MetaData(...)``
        Container meta data; ``meta["name"]`` and, when present, ``meta["item_meta"]`` are read.
    arrays : sequence
        Per-item info tuples, parallel to `meta_tups`. Each entry is indexed as
        ``(shape, dtype number, numpy flags, itemsize)``.
    meta_tups : sequence of (bytes, SDSFlag)
        Item name / flag pairs.
    file_header : dict
        Accepted for signature compatibility; not used here.

    Returns
    -------
    dict
        Keys ``Name_``, ``Type_``, ``FilePath_``, ``MetaData_``, ``Shape_`` followed by one
        ``Struct`` of info per scalar/array item. ``Shape_`` is ``(nrows, ncols)``; ``nrows`` is
        taken from the first column for a Dataset and is 0 otherwise (or when there are no columns).
    """
    # Known limitation: a nested struct that has no file of its own (only containers) is not
    # handled. For a folder such as
    #     st1 /
    #         st2!ds1.sds
    #         st2!ds2.sds
    #         ds3.sds
    # load_sds(r'st1/st2') should work; the general loader would need to also look for files
    # that start with the end of the path. It is also unclear whether this is hit when
    # onefile = True; the onefile info load would need the same meta as a multiple file info load.
    if not isinstance(meta, MetaData):
        meta = MetaData(meta)

    data = {}
    specitems = {}

    # store all sds container info in a separate struct
    # BUG: the columns are sorted later... these meta items are not in the meta data
    # sort appears to be checking number of columns first
    data["Name_"] = meta["name"]
    data["Type_"] = container_type.__name__
    data["FilePath_"] = filepath
    data["MetaData_"] = meta
    # placeholder for shape
    data["Shape_"] = None
    num_infofields = len(data)

    def get_dtype(dtypenum, itemsize):
        # bytestrings
        if dtypenum == 18:
            return np.dtype("S" + str(itemsize))
        # unicode: itemsize is in bytes, 4 bytes per character
        elif dtypenum == 19:
            return np.dtype("U" + str(itemsize // 4))
        else:
            return np.dtype(np.sctypeDict[dtypenum])

    def info_to_struct(f):
        # decorator: adds the common Name_/SDSFlags_ fields and wraps the result in a Struct
        def wrapper(item_tup, sds_tup, filepath):
            iteminfo = {}
            iteminfo["Name_"] = sds_tup[0].decode()
            iteminfo = f(iteminfo, item_tup, sds_tup, filepath)
            iteminfo["SDSFlags_"] = sds_tup[1]
            iteminfo = TypeRegister.Struct(iteminfo)
            return iteminfo

        return wrapper

    @info_to_struct
    def arrinfo_to_struct(arrinfo, array_tup, sds_tup, filepath):
        # also store container filepath so this array can be sniped
        arrinfo["Type_"] = "FastArray"
        arrinfo["Shape_"] = TypeRegister.FastArray(array_tup[0])
        arrinfo["Dtype_"] = get_dtype(array_tup[1], array_tup[3])
        arrinfo["NumpyFlags_"] = array_tup[2]
        arrinfo["Itemsize_"] = array_tup[3]
        arrinfo["FilePath_"] = filepath
        return arrinfo

    @info_to_struct
    def scalarinfo_to_struct(scinfo, scalar_tup, sds_tup, filepath):
        dtype = get_dtype(scalar_tup[1], scalar_tup[3])
        scinfo["Type_"] = dtype.type.__name__
        scinfo["FilePath_"] = filepath
        return scinfo

    for idx, tup in enumerate(meta_tups):
        name = tup[0].decode()
        itemenum = tup[1]
        if itemenum & SDSFlag.OriginalContainer:
            if itemenum & SDSFlag.Scalar:
                data[name] = scalarinfo_to_struct(arrays[idx], tup, filepath)
            elif itemenum & SDSFlag.Nested:
                # nested container will add its own info
                pass
            else:
                # store array info tuple
                data[name] = arrinfo_to_struct(arrays[idx], tup, filepath)
        else:
            # extra array belonging to a special item: keep it out of the main dict and
            # group it under the owning item's name (the part before '!')
            specname = name[: name.find("!")]
            specitems.setdefault(specname, []).append((arrays[idx], tup))

    # add more info for special items
    try:
        for imeta in meta["item_meta"]:
            imeta = MetaData(imeta)
            name = imeta["name"]
            arrinfo = data[name]
            # change type, attach the item's own meta data
            arrinfo["Type_"] = imeta["classname"]
            arrinfo["MetaData_"] = imeta

            # special item types should probably have their own routine to repair their info
            # similar to _load_from_sds_meta_data
            extra_arrays = specitems.get(name, None)
            if extra_arrays is not None:
                extrastart = len(name) + 1
                # these are tuples of (array tup, sds tup)
                for extra in extra_arrays:
                    # trim prefix off
                    extraname = extra[1][0][extrastart:]
                    sdstup = (extraname, extra[1][1])
                    # generate same info for extra array, just as if it were in a container
                    arrinfo[extraname] = arrinfo_to_struct(extra[0], sdstup, filepath)
    except Exception:
        # Deliberately best effort (this happens with matlab saves?): item meta is optional
        # decoration, so a failure here must not abort the info load.
        pass

    # get container shape
    columns = list(data.values())[num_infofields:]
    if data["Type_"] == "Dataset" and columns:
        # get nrows from first column (same for all in dataset)
        nrows = columns[0]["Shape_"][0]
    else:
        # non-Dataset containers, and Datasets without any column, report zero rows
        nrows = 0
    ncols = len(columns)
    data["Shape_"] = (nrows, ncols)
    return data
# ------------------------------------------------------------------------------------
def _init_root_container(path, name, sharename=None, info=False, include=None, threads=None):
    """
    Load ``<path>/<name>.sds`` and build the container that acts as the root of a directory load.

    Parameters
    ----------
    path : str
        Directory holding the file.
    name : str
        File name without the SDS extension (for example ``"_root"``).
    sharename, include, threads : optional
        Forwarded to ``decompress_dataset_internal``.
    info : bool
        When True an info skeleton (see ``skeleton_from_meta_data``) is built instead of
        loading data.

    Returns
    -------
    root_struct
        Skeleton dict when `info` is True, otherwise the result of the container class'
        ``_load_from_sds_meta_data``.
    final_meta : MetaData or None
        ``None`` when the file's meta data could not be wrapped in ``MetaData``.
    meta_tups
        Item name / flag tuples as returned by the decompressor.
    """
    fullpath = path + os.path.sep + name + SDS_EXTENSION
    firstsds = decompress_dataset_internal(fullpath, sharename=sharename, info=info, include=include, threads=threads)
    meta, arrays, meta_tups, fileheader_dict = firstsds[0]
    container_type = container_from_filetype(fileheader_dict["FileType"])

    # possibly non-json meta data (Matlab, older SDS format)
    try:
        final_meta = MetaData(meta)
    except Exception:
        # unreadable meta data is tolerated; callers treat None as "no root meta"
        final_meta = None

    if info:
        # pass in the path as path to the folder
        root_struct = skeleton_from_meta_data(container_type, path, meta, arrays, meta_tups, fileheader_dict)
    else:
        root_struct = container_type._load_from_sds_meta_data(meta, arrays, meta_tups, fileheader_dict)
    return root_struct, final_meta, meta_tups
# ------------------------------------------------------------------------------------
def _include_extra_sds_files(schema, final_sort, include_all_sds=False, include=None):
"""
If additional .sds files are found in a directory, the user can optionally load all or individually.
If additional files are loaded, the user can choose to rewrite the _root.sds file to always include these.
Parameters
----------
schema : nested dictionary build based on the files in a saved Struct's directory
final_sort : if a _root.sds file was found, a list of its item names
"""
include_extra = False
extra_items = []
# keep track of all items not in root struct
for k, v in schema.items():
if k not in final_sort:
extra_items.append(k)
# extra items found in root struct
if len(extra_items) > 0:
# skip prompt if flag is set
if include_all_sds:
include_extra = True
else:
# Change for Blair April 2019
warnings.warn(f"Found extra .sds information for items {extra_items}. They will not be included")
# prompt = f"Found extra .sds information for items {extra_items}. Would you like to include any? (y/n/a) "
# while(True):
# choice = input(prompt)
# if choice in 'Yy':
# include_extra = True
# break
# elif choice in 'Nn':
# print("No extra items will be included.")
# break
# elif choice in 'Aa':
# include_extra = True
# include_all_sds = True
# break
# extra items will be appended to sortlist
if include_extra:
for item in extra_items:
# fast track to include all extra files
if include_all_sds:
print(f"Including {item}")
final_sort.append(item)
# ask about each item until they set all flag or finish list
else:
prompt = f"Include {item}? (y/n/a) "
while True:
choice = input(prompt)
if choice in "Yy":
final_sort.append(item)
break
elif choice in "Nn":
del schema[item]
break
elif choice in "Aa":
final_sort.append(item)
include_all_sds = True
break
# exclude all extra files
else:
for item in extra_items:
del schema[item]
return include_extra
# ------------------------------------------------------------------------------------
def _build_schema_shared(
    root_tups: Sequence[Tuple[AnyStr, SDSFlag]],
    sharename: AnyStr,
    prefix: str = "",
    dirlist: Optional[List[str]] = None,
    threads: Optional[int] = None,
):
    """
    Returns nested schema and directory list for shared memory.

    Because shared memory has no directory call, .sds file info needs to be expanded to check
    for nested structures: every item flagged as a nested original container is opened in
    info mode, and Structs are expanded recursively.

    Parameters
    ----------
    root_tups : sequence of (bytes, SDSFlag)
        Sequence of (itemname, SDSFlag) tuples.
    sharename : str or bytes
        shared memory name
    prefix : str
        used to recursively build .sds file names for nested structures
    dirlist : list of str, optional
        List gets passed through recursion to avoid generating in a separate pass.
        After recursion, will be identical to directory list in non-sharedmemory load.
    threads : int, optional
        Forwarded to ``decompress_dataset_internal``.

    Returns
    -------
    schema : dict
        Nested dictionaries keyed by item name; leaves are empty dicts.
    dirlist : list of str
        File names that were opened successfully.
    """
    schema = {}
    if dirlist is None:
        dirlist = []

    for item in root_tups:
        itemname = item[0].decode()
        itemenum = item[1]
        if itemenum & SDSFlag.OriginalContainer and itemenum & SDSFlag.Nested:
            filename = prefix + itemname + SDS_EXTENSION
            # check for file for nested container
            try:
                # only pull info to check for nested containers
                # replace with some sort of shared memory specific dir() call
                firstsds = decompress_dataset_internal(filename, sharename=sharename, info=True, threads=threads)
                meta, arrays, meta_tups, fileheader = firstsds[0]
                # only structs can have nested containers
                if fileheader["FileType"] == SDSFileType.Struct:
                    # Chain off the prefix for the children only. The loop's own ``prefix``
                    # must stay untouched, otherwise every later sibling would be looked up
                    # under this item's name (e.g. 'a!b.sds' instead of 'b.sds').
                    child_prefix = prefix + itemname + "!"
                    schema[itemname], dirlist = _build_schema_shared(
                        meta_tups, sharename, child_prefix, dirlist, threads
                    )
                # wasn't a struct, stop the schema here
                else:
                    schema[itemname] = {}
                dirlist.append(filename)
            except Exception:
                warnings.warn(f"Could not find {filename} in sharename {sharename}.")
    return schema, dirlist
# ------------------------------------------------------------------------------------
def _load_sds_mem(path: str, name=None, sharename=None, info=False, include_all_sds=False, include=None, threads=None):
    """
    Gather the directory listing and schema for a shared memory load.

    Shared memory checks directory differently for different operating systems.
    On non-Windows platforms the listing comes from ``TypeRegister.SharedMemory.listdir`` and
    the root container is left for the caller to initialize (``checkdir`` is True).
    On Windows there is no listing mechanism (**not implemented), so a ``_root.sds`` file must
    exist: it is loaded here and the schema is derived from its item tuples.
    Split into a separate routine for readability in the main _load_sds routine.

    Returns
    -------
    tuple
        ``(dir, schema, checkdir, root_struct, meta, meta_tups)``.
    """
    if sys.platform == "win32":
        # for now, can only load struct from shared memory if a _root.sds file exists
        root_struct, meta, meta_tups = _init_root_container(
            path, "_root", sharename=sharename, info=info, include=include, threads=threads
        )
        schema, listing = _build_schema_shared(meta_tups, sharename, threads=threads)
        return listing, schema, False, root_struct, meta, meta_tups

    listing = TypeRegister.SharedMemory.listdir(sharename)
    schema = _build_schema(path, listing)
    return listing, schema, True, None, None, None
# ------------------------------------------------------------------------------------
def _load_sds(
    path: str,
    name: Optional[str] = None,
    sharename: Optional[str] = None,
    info: bool = False,
    stack: bool = None,
    include_all_sds: bool = False,
    include: Optional[List[str]] = None,
    threads: Optional[int] = None,
    folders: Optional[List[str]] = None,
    mustexist: bool = False,
    sections: Optional[List[str]] = None,
    filter: Optional[np.ndarray] = None,
) -> Union["Struct", "Dataset"]:
    """
    Build a tree (nested dictionaries) using the SDS file names in the provided directory.

    If path is a file, it will be loaded directly.
    Rebuild containers and numpy arrays after possibly decompressing SDS files. Any nodes without
    SDS files are assumed to be Structs.

    Parameters
    ----------
    path : str
        Full path to root directory, or to a single file. When neither exists as given, the
        SDS extension is removed (to look for a directory) or added (to look for a file).
    name : str, optional
        Optionally specify the name of the struct being loaded. This might be different than directory,
        however the _build_schema routine will be able to pull it generically. Tiers may be
        separated by ``/``; they are mapped to the ``!`` used in file names.
    sharename : str, optional
        Shared memory name. On Windows, and for paths with the SDS extension, the load is
        handed straight to ``_read_sds``.
    info : bool
        Load header information instead of data.
    stack, threads, folders, sections, filter : optional
        Forwarded to the underlying read calls.
    include_all_sds : bool
        Include .sds files found in the directory that are not listed in the root file.
    include : list of str, optional
        Restrict a directory load to these top-level items.
    mustexist : bool
        Accepted for signature compatibility; not used by this routine.

    Returns
    -------
    Struct or Dataset
        When a `Struct` is returned, it may have nested data from all SDS files.

    Raises
    ------
    ValueError
        If `path` cannot be resolved to a file or directory, or if `name` has no .sds file
        in the directory.
    """
    has_ext = sds_endswith(path, add=False)
    goodfiles = None

    # TODO: only use this routine for non-shared memory loads
    if sharename is None:
        checkdir = False
        if sds_isfile(path):
            loadpath = path
        elif sds_isdir(path):
            checkdir = True
        elif has_ext:
            # if has extension, remove and check for dir again
            loadpath = path[: -len(SDS_EXTENSION)]
            if not sds_isdir(loadpath):
                raise ValueError(
                    f"Failed to load. {path} was not a file or directory and {loadpath} was not a directory."
                )
            path = loadpath
            checkdir = True
        else:
            # if no extension, add and check file again
            loadpath = path + SDS_EXTENSION
            if not sds_isfile(loadpath):
                raise ValueError(f"Failed to load. {path} was not a file or directory and {loadpath} was not a file.")

        if not checkdir:
            return _read_sds(
                loadpath,
                sharename=sharename,
                info=info,
                include=include,
                stack=stack,
                threads=threads,
                folders=folders,
                sections=sections,
                filter=filter,
            )

        # only directories will hit this
        # remember the file list, because we know all these files exist (and it can be large)
        filelist = [f for root, dirs, files in os.walk(path) for f in files]
        schema = _build_schema(path, filelist, nodirs=True)
        goodfiles = (filelist, path)
        root_struct = None
        meta = None
        meta_tups = None
    elif sys.platform == "win32":
        # NOTE: windows shared memory does not support dataset nesting via a struct currently..
        # but it could with a little more work
        return _read_sds(
            path,
            sharename=sharename,
            info=info,
            include=include,
            stack=stack,
            threads=threads,
            folders=folders,
            sections=sections,
            filter=filter,
        )
    else:
        # shared memory path
        # TJD this path needs to be tested more
        if has_ext:
            return _read_sds(
                path,
                sharename=sharename,
                info=info,
                include=include,
                stack=stack,
                sections=sections,
                threads=threads,
                filter=filter,
            )
        filelist, schema, checkdir, root_struct, meta, meta_tups = _load_sds_mem(
            path,
            name=name,
            sharename=sharename,
            info=info,
            include_all_sds=include_all_sds,
            include=include,
            threads=threads,
        )

    # Membership in the listing is tested once per schema node; a set keeps that cheap when
    # the directory holds many thousands of files.
    known_files = set(filelist)

    # root struct still needs to be initialized when checkdir is set
    file_prefix = None
    if checkdir:
        if name is None:
            if "_root.sds" in known_files:
                # build the initial struct from root sds
                del schema["_root"]
                root_struct, meta, meta_tups = _init_root_container(
                    path, "_root", sharename=sharename, info=info, include=include, threads=threads
                )
            else:
                # directories with SDS and no _root are pretty common, so no warning here
                root_struct = TypeRegister.Struct({})
                meta = None
        else:
            # tiers can be separated by /, but files will be named with !
            name = name.replace("/", "!")
            if sds_endswith(name, add=False):
                name = name[:-4]
            # use name keyword to snipe one dataset or struct
            if name + SDS_EXTENSION not in known_files:
                raise ValueError(f"Could not find .sds file for {name} in {path}")
            root_struct, meta, meta_tups = _init_root_container(
                path, name, sharename=sharename, info=info, include=include, threads=threads
            )
            file_prefix = name
            # climb down tiers
            for tier in name.split("!"):
                schema = schema[tier]
    # TODO: write something to handle name keyword in shared memory

    final_sort = None
    root_file_found = meta is not None

    # possibly load from extra files in directory
    include_extra = False
    if root_file_found:
        final_sort = _order_from_meta(root_struct, meta, meta_tups)
        # all items will be included if no root file was found
        if final_sort is not None:
            # check for extra files, see if user wants to include
            include_extra = _include_extra_sds_files(schema, final_sort, include_all_sds, include=include)

    # the recursive loader crawls into containers but stops at this leaf type
    nocrawl = str if info else np.ndarray

    def needs_load(key):
        # True when `key` is absent from the root container or is only a None placeholder
        # (None marks an item that was initialized from the root file but not loaded yet;
        # the placeholder preserves item order).
        try:
            return root_struct[key] is None
        except Exception:
            return True

    def load_item(key, **extra):
        # one top-level schema entry, loaded (or, in multiload mode, collected) recursively
        return _sds_load_from_schema(
            schema,
            path,
            known_files,
            filename=file_prefix,
            root=key,
            sharename=sharename,
            include=None,
            info=info,
            nocrawl=nocrawl,
            **extra,
        )

    # The multi-file load is not supported for shared memory, and the include keyword behaves
    # differently than with an individual file load, so both take the file-by-file path.
    if sharename is not None or include is not None:
        # ---- main load for entire directory
        for k in schema:
            if include is not None and k not in include:
                continue
            if needs_load(k):
                root_struct[k] = load_item(k, threads=threads)
            else:
                warnings.warn(
                    f"Found .sds file for item {k}, but was already in struct as {root_struct[k]}. Skipping .sds load.",
                    stacklevel=2,
                )
    else:
        # ---- multi-file load (include is None on this path, so nothing is filtered)
        # first pass, collect all filepaths
        # TODO: fold this into one pass, store return index in some kind of nested dictionary?
        filepaths = []
        for k in schema:
            if needs_load(k):
                load_item(k, multiload=filepaths)

        # load everything in one call, passing in the list of known good files
        loaded = decompress_dataset_internal(
            filepaths,
            sharename=sharename,
            info=info,
            include=include,
            stack=stack,
            threads=threads,
            filter=filter,
            goodfiles=goodfiles,
        )

        # second pass, build nested containers
        # one-element list used as a mutable counter into `loaded`, restores correct hierarchy
        multiload_idx = [0]
        for k in schema:
            if needs_load(k):
                root_struct[k] = load_item(k, multiload=loaded, multiload_idx=multiload_idx)

    # if root file found for metadata, sort items in root struct
    # if no root file found, order will be same as directory order
    if root_file_found and final_sort is not None and include is None:
        # items from the root file that never received data are reported and dropped from the sort
        missing = []
        for item in final_sort:
            if needs_load(item):
                warnings.warn(f"Could not load data for item {item}, file for this item may be missing.")
                missing.append(item)
        for item in missing:
            final_sort.remove(item)
        root_struct = root_struct[final_sort]

    # if extra files were added to root struct, optionally rebuild _root.sds
    # if all extra files were included, skip the prompt, but don't rewrite the _root.sds file
    if include_extra and not include_all_sds:
        prompt = f"Include extra items in root struct for future loads? (_root.sds will be rebuilt) (y/n) "
        while True:
            # require an explicit answer: an empty line must not count as "yes"
            choice = input(prompt).strip().lower()
            if choice == "y":
                _write_to_sds(root_struct, path, name="_root", compress=True, sharename=sharename)
                break
            if choice == "n":
                break

    return root_struct
# ------------------------------------------------------------------------------------
def _sds_load_from_schema(
    schema,
    path,
    dir,
    filename=None,
    root=None,
    sharename=None,
    include=None,
    info=False,
    nocrawl=np.ndarray,
    multiload=None,
    multiload_idx=None,
    multiload_schema=None,
    threads=None,
):
    r"""
    Recursively load data (or info) for one node of an .sds directory schema.

    The node ``schema[root]`` corresponds to the file ``<filename>!<root>.sds`` (or
    ``<root>.sds`` at the top level). The routine works in three modes:

    - ``multiload is None``: the node's file, if listed in `dir`, is read directly.
    - ``multiload`` given, ``multiload_idx is None``: collection pass. The file's full path is
      appended to `multiload` and nothing is read.
    - both given: build pass. `multiload` holds preloaded results and ``multiload_idx[0]`` is the
      position of the next one to consume.

    Children are visited in schema order. Outside the collection pass, a node that did not
    come back as a ``Struct`` (for instance because it has no .sds file) is wrapped in one.
    `multiload_schema` is accepted but not used.

    Example
    -------
    Nested structures are stored:

    >>> st = Struct({ 'a': Struct({ 'arr' : arange(10),
                                    'a2'  : Dataset({ 'col1': arange(5) }) }),
                      'b': Struct({ 'ds1' : Dataset({ 'ds1col': arange(6) }),
                                    'ds2' : Dataset({ 'ds2col' : arange(7) }) }),
                    })
    >>> st.tree()
    Struct
    ├──── a (Struct)
    │     ├──── arr int32 (10,) 4
    │     └──── a2 (Dataset)
    │           └──── col1 int32 (5,) 4
    └──── b (Struct)
          ├──── ds1 (Dataset)
          │     └──── ds1col int32 (6,) 4
          └──── ds2 (Dataset)
                └──── ds2col int32 (7,) 4
    >>> st.save(r'D:\junk\morejunk')
    >>> os.listdir(r'D:\junk\morejunk')
    _root.sds
    a!a2.sds
    a.sds
    b!ds1.sds
    b!ds2.sds
    """
    subtree = schema[root]
    # root level items use their own name, deeper tiers are chained with '!'
    filename = root if filename is None else filename + "!" + root

    # default container in case nested .sds file doesn't exist
    struct_cls = TypeRegister.Struct
    collecting = multiload is not None and multiload_idx is None

    data = {}
    sds_file = filename + SDS_EXTENSION
    if sds_file in dir:
        fullpath = path + os.path.sep + sds_file
        if multiload is None:
            # load container or array
            data = _read_sds(fullpath, sharename=sharename, include=include, info=info)
        elif collecting:
            # add full path for final multiload call
            multiload.append(fullpath)
        else:
            # pass the preloaded data to final constructor
            preloaded = multiload[multiload_idx[0]]
            data = _read_sds(fullpath, sharename=sharename, include=include, info=info, multiload=preloaded)
            multiload_idx[0] += 1

    # leaves are returned as they are; only containers are crawled
    if isinstance(data, nocrawl):
        return data

    # TJD Feb 2020 - this code is slow when many files in the directory > 10000+
    # TODO improve the speed of this
    for child in subtree:
        data[child] = _sds_load_from_schema(
            subtree,
            path,
            dir,
            filename=filename,
            root=child,
            sharename=sharename,
            include=include,
            info=info,
            nocrawl=nocrawl,
            multiload=multiload,
            multiload_idx=multiload_idx,
            threads=threads,
        )

    # nested structures might not have .sds files, flip to default container type (Struct)
    if not collecting and not isinstance(data, struct_cls):
        data = struct_cls(data)
    return data
# ------------------------------------------------------------------------------------
def _order_from_meta(data, meta, meta_tups):
"""
Restore the order of container items based on meta data.
Meta tuples with (itemname, SDSFlag) will be checked first.
If there is a mismatch (possibly older SDS version), json meta data will be used.
If meta data is not valid, order will not change.
"""
if meta_tups is None:
return None
numitems = len(data)
success = False
# first try with tuples only
order = []
for t in meta_tups:
if t[1] & SDSFlag.OriginalContainer:
order.append(t[0].decode())
# OLD: sds is no longer dependent on item_names in python meta data
if len(order) != numitems:
if isinstance(meta, MetaData):
# items still might be missing
try:
order = meta["item_names"]
except:
order = None
else:
order = None
return order
# ------------------------------------------------------------------------------------
def _build_schema(path: str, dir, share_prefix="", nodirs=False) -> dict:
    """
    Turn a listing of SDS file names into a tree of nested dictionaries.

    Each name is stripped of the leading `share_prefix` characters and of its last four
    characters (the extension), then split on ``!``; every piece becomes one level of the
    tree. Leaf nodes are empty dictionaries.

    Parameters
    ----------
    path : str
        Full path to root directory. Only used to test whether an entry is a directory.
    dir : iterable of str
        File names to consider; entries that do not pass ``sds_endswith`` are ignored.
    share_prefix : str
        Prefix whose length is cut from the front of every name.
    nodirs : bool, defaults to False
        Set when the caller already stripped directories from `dir`; skips the per-entry
        directory check (which is slow when there are many files).

    Returns
    -------
    schema : dict
        Nested dictionary.
    """
    skip = len(share_prefix)

    def is_sds_file(fname):
        if not sds_endswith(fname, add=False):
            return False
        # unless the caller vouches for it, make sure the entry is not a directory
        return nodirs or not sds_isdir(os.path.join(path, fname))

    tree = {}
    for fname in dir:
        if not is_sds_file(fname):
            continue
        branch = tree
        for node in fname[skip:-4].split("!"):
            branch = branch.setdefault(node, {})
    return tree
def _parse_nested_name(name):
"""
For Struct/Dataset save when a name is provided. Turns the '/' into '!' for the correct
file prefix.
"""
if isinstance(name, bytes):
name = name.decode()
name = name.replace("/", "!")
return name
# ------------------------------------------------------------------------------------
[docs]
def save_struct(
    data=None,
    path=None,
    sharename=None,
    name=None,
    overwrite=True,
    compress=True,
    onefile=False,
    bandsize=None,
    complevel=None,
):
    """
    Save a struct either as a directory of .sds files or as a single .sds file.

    A struct with nested containers (and ``onefile`` not set) is written as a directory: a
    root file plus one file per nested container. Anything else is written as one file; with
    ``onefile`` a nested struct is flattened first and saved with ``SDSFileType.OneFile``.
    Afterwards, when `name` is set or ``onefile`` is used, ``_rebuild_rootfile`` is given the
    chance to register the save in an existing ``_root.sds``.

    Parameters
    ----------
    data
        Object to save; must provide ``has_nested_containers``.
    path : str or bytes or os.PathLike, optional
        Target location, normalized to ``str`` when given.
    sharename : optional
        Shared memory name, forwarded to the path checks and writers.
    name : str, optional
        ``!``-separated name of the struct within a larger save.
    overwrite : bool
        Forwarded to the path checks. When they report that the save should not proceed,
        the function returns without writing.
    compress : bool
        Use ZSTD compression when True, none otherwise.
    onefile : bool
        Store nested containers inside a single file.
    bandsize, complevel : optional
        Forwarded to the writers.
    """
    if path is not None:
        path = _anypath_to_str(path)

    nested = data.has_nested_containers

    if nested and not onefile:
        # only create a directory if the struct has nested containers
        # add this change when matlab save changes:
        # to mirror the save, strip .sds if the user added the extension
        # (.sds should only be on files, not directories)
        if not _sds_path_multi(path, share=sharename, overwrite=overwrite):
            return

        # all structs with nested containers will get a _root.sds (helps maintain order)
        rootname = name.split("!")[0] if name is not None else "_root"
        _write_to_sds(
            data,
            path,
            name=rootname,
            compress=compress,
            sharename=sharename,
            onefile=onefile,
            bandsize=bandsize,
            complevel=complevel,
        )
        _sds_from_tree(data, path, name=name, sharename=sharename, compress=compress)
    else:
        # otherwise, save as a single .sds file, possibly overwrite
        path, name, status = _sds_path_single(path, share=sharename, overwrite=overwrite)
        if status is False:
            return

        if nested and onefile:
            # we have nesting and onefile is true
            flat = data.flatten()
            meta = flat.metastring
            del flat.metastring
            arrayflags = flat.arrayflags
            del flat.arrayflags
            arrays = list(flat.values())
            meta_tups = [(key.encode(), flag) for key, flag in zip(flat.keys(), arrayflags)]
            filetype = SDSFileType.OneFile
        else:
            meta, arrays, meta_tups = data._build_sds_meta_data(name)
            meta = meta.string
            filetype = SDSFileType.Struct

        comptype = COMPRESSION_TYPE_ZSTD if compress else COMPRESSION_TYPE_NONE
        compress_dataset_internal(
            path,
            meta,
            arrays,
            sharename=sharename,
            meta_tups=meta_tups,
            comptype=comptype,
            fileType=filetype,
            bandsize=bandsize,
            complevel=complevel,
        )

    # most structs will create a new directory when they save
    # possibly append to existing _root.sds file if this struct was appended to another
    if name is None and not onefile:
        return
    if not onefile:
        # get first name in sequence like 'st1!nested1!ds2'
        path = os.path.join(path, name.split("!")[0])
    _rebuild_rootfile(path, sharename=sharename, compress=compress, bandsize=bandsize, complevel=complevel)
# ------------------------------------------------------------------------------------
def _escape_filename(name: str, dname: str) -> Tuple[str, str]:
    """
    Validate the leaf container name of an SDS file name.

    Escaping is not implemented yet (TODO: replace with an escape string); for now the
    function only rejects names and otherwise hands both strings back untouched.

    Parameters
    ----------
    name : str
        Full name of the individual file - includes ``!`` to separate tiers of riptable
        container classes. Only used in the error message.
    dname : str
        Leaf (new container) name that is scanned for invalid characters.

    Returns
    -------
    name : str
        The full name, currently returned unchanged.
    dname : str
        The container name, currently returned unchanged.

    Raises
    ------
    ValueError
        If any entry of ``INVALID_FILE_CHARS`` occurs in `dname`; the first such entry,
        in ``INVALID_FILE_CHARS`` iteration order, is reported.
    """
    # first forbidden character present in the leaf name, or None when the name is clean
    invalid = next((char for char in INVALID_FILE_CHARS if char in dname), None)
    if invalid is None:
        return name, dname
    raise ValueError(f"Invalid character {invalid} found in file name {name}.")
# ------------------------------------------------------------------------------------
def _read_sds(
    path: str,
    sharename: Optional[str] = None,
    info: bool = False,
    include: Optional[List[str]] = None,
    stack: Optional[bool] = None,
    multiload=None,
    threads: Optional[int] = None,
    folders: Optional[List[str]] = None,
    sections=None,
    filter: Optional[np.ndarray] = None,
    mustexist: bool = False,
):
    """
    Wrapper around a single .sds file load. Will return the appropriate item type based on the file header.

    Parameters
    ----------
    path : str
        full path to .sds file or filename in shared memory (will always be trimmed to name.sds in shared).
        A single ``str``/``bytes`` is wrapped into a one-element list before loading.
    sharename : str, optional
        specify a shared memory name instead of loading from disk
    info : bool
        When True, array header information will be returned, stored in a struct.
    include : list of str, optional
        if not None, list of column names to selectively load
    stack : bool, optional
        Forwarded unchanged to ``decompress_dataset_internal``.
    multiload : tuple, optional
        If not None, an already decompressed ``(meta, arrs, meta_tups, fileheader_dict)`` tuple.
        It is used directly and ``decompress_dataset_internal`` is not called.
    threads : int, optional
        Forwarded unchanged to ``decompress_dataset_internal``.
    folders : list of str, optional
        list of strings containing folder names. Only valid when file was saved with ``onefile=True``.
    sections : optional
        Forwarded unchanged to ``decompress_dataset_internal``.
    filter : np.ndarray, optional
        Forwarded unchanged to ``decompress_dataset_internal``.
    mustexist : bool
        Forwarded unchanged to ``decompress_dataset_internal``.

    Returns
    -------
    object
        Depends on what was loaded:

        * whatever ``decompress_dataset_internal`` returned, when that is not a list/tuple;
        * the result of ``_sds_load_single`` for ``SDSFileType.Array`` files;
        * the result of ``Struct._from_sds_onefile`` for ``SDSFileType.OneFile`` files;
        * the result of ``skeleton_from_meta_data`` when `info` is True (other file types);
        * otherwise the container built by the container type's ``_load_from_sds_meta_data``.

        For a single path holding several appended sections with section names, the result is one
        Dataset when every section is a Dataset of the same length and the column names could be
        made unique, else a Struct keyed by section name when no section names collided, else an
        object array of the loaded sections sorted by their ``SectionOffset``.

    Raises
    ------
    NotImplementedError
        If `info` is True and the file type is ``SDSFileType.OneFile``.
    """
    # even if one string, convert to a list of one string
    if isinstance(path, (str, bytes)):
        path = [path]

    # firstsds is normalized - it is what SDS decompress return (always a list of tuples)
    if multiload is None:
        firstsds = decompress_dataset_internal(
            path,
            sharename=sharename,
            info=info,
            include=include,
            stack=stack,
            threads=threads,
            folders=folders,
            sections=sections,
            filter=filter,
            mustexist=mustexist,
        )
        # check if file was concat
        if not isinstance(firstsds, (list, tuple)):
            return firstsds
        meta, arrs, meta_tups, fileheader_dict = firstsds[0]
        # NOTE: when len(firstsds) > 1 the file was appended to. The header's "StackType" is
        # currently 1 if sds_concat was called and 0 if it was manually appended; nothing
        # consumes it yet (to be used in the future).
    else:
        meta, arrs, meta_tups, fileheader_dict = multiload

    ftype = fileheader_dict.get("FileType", None)

    if ftype == SDSFileType.Array:
        # TODO: determine what to do with include for single item - need to know filetype before entire load
        # if include is not None:
        #     warnings.warn(f'Found single item in .sds file, but include was not None: {include}. Ignoring include keyword, Loading single item.')
        #     include = None
        result = _sds_load_single(meta, arrs, meta_tups, info=info)

    elif ftype == SDSFileType.OneFile:
        if info:
            raise NotImplementedError("SDS info struct not yet supported for onefile saves.")
        return TypeRegister.Struct._from_sds_onefile(arrs, meta_tups, meta=meta, folders=folders)

    else:
        container_type = container_from_filetype(ftype)
        if info:
            # NOTE(review): this loader is never called on the info path; the lookup is kept so a
            # container type lacking the method still raises AttributeError exactly as before.
            loader = getattr(container_type, "_tree_from_sds_meta_data")
            # test skeleton struct
            return skeleton_from_meta_data(container_type, path, meta, arrs, meta_tups, fileheader_dict)
        loader = getattr(container_type, "_load_from_sds_meta_data")

        # a preloaded (multiload) tuple is always treated as a single section
        section_count = len(firstsds) if multiload is None else 1

        # since ver 4.5, sds can be appended to
        if len(path) == 1 and section_count > 1:
            result = np.empty(section_count, dtype="O")
            offsets = np.empty(section_count, dtype=np.int64)
            # local name on purpose: do not shadow the ``sections`` argument
            section_names = None

            # walk all possible datasets, structs
            for i, sds in enumerate(firstsds):
                meta, arrays, meta_tups, fileheader_dict = sds
                result[i] = loader(meta, arrays, meta_tups, fileheader_dict)
                # find which one has the Sections (this is random due to threading)
                offsets[i] = fileheader_dict.get("SectionOffset", None)
                found = fileheader_dict.get("Sections", None)
                if found:
                    section_names = found

            # sort by the order appended by
            sortorder = np.lexsort([offsets])
            result = result[sortorder]

            # we should always find the section names
            if section_names:
                set_type = set()
                len_type = set()

                # now try a named Struct
                # this will work for one file that was appended with sections
                # if we have multiple files with sections appended we need another routine
                resultStruct = TypeRegister.Struct({})
                for s, r in zip(section_names, result):
                    resultStruct[s] = r
                    set_type.add(type(r))
                    try:
                        len_type.add(len(r))
                    except Exception:
                        # col not found and is likely None
                        len_type.add(0)

                # check if all the section were the same type and same length
                if len(set_type) == 1 and len(len_type) == 1:
                    only_type = set_type.pop()
                    if only_type == TypeRegister.Dataset:
                        # try to make one dataset
                        arr_dict = {}
                        counter = 0
                        fail = False
                        for ds in result:
                            for k, v in ds.items():
                                # try (small effort) for no name clashes
                                if k in arr_dict:
                                    k = k + "_" + str(counter)
                                    # give up if still not unique
                                    if k in arr_dict:
                                        fail = True
                                        break
                                    counter = counter + 1
                                arr_dict[k] = v
                            if fail:
                                # ``break`` above only left the inner loop; stop the outer one too
                                # instead of merging columns into a dict that will be discarded
                                break
                        # return a Dataset if we can
                        if not fail:
                            return TypeRegister.Dataset(arr_dict)

                # try to return a Struct if no information loss
                if len(resultStruct) == len(result):
                    return resultStruct
        else:
            result = loader(meta, arrs, meta_tups, fileheader_dict)

    return result
# ------------------------------------------------------------------------------------
def _write_to_sds(
    data,
    path,
    name=None,
    compress=True,
    sharename=None,
    fileType=None,
    onefile=False,
    bandsize=None,
    append=None,
    complevel=None,
):
    """
    Writes a data structure's metadata, python objects, and numpy arrays to an SDS file
    with optional compression.

    :param data: Struct/Dataset/Multiset (must have _build_sds_meta_data() method; must have
        flatten() when ``onefile`` is true)
    :param path: Full path to directory where all SDS files are stored. When ``name`` is None,
        ``path`` itself is the file that is written.
    :param name: Name of SDS file. Hierarchy separated by !
    :param compress: Truthy selects ``CompressionType.ZStd``, otherwise ``CompressionType.Uncompressed``.
    :param sharename: Passed through to ``compress_dataset_internal``.
    :param fileType: SDS file type; when None it is ``SDSFileType.Dataset`` for a Dataset and
        ``SDSFileType.Struct`` for anything else. Always ``SDSFileType.OneFile`` when ``onefile`` is true.
    :param onefile: When true the container is flattened and written as one file.
    :param bandsize: Passed through to ``compress_dataset_internal``.
    :param append: Passed through to ``compress_dataset_internal``.
    :param complevel: Passed through to ``compress_dataset_internal``.
    :return None:
    """
    # what do we do about other containers/subclasses?
    if fileType is None:
        is_dataset = isinstance(data, TypeRegister.Dataset)
        fileType = SDSFileType.Dataset if is_dataset else SDSFileType.Struct

    if name is not None:
        # leaf is the text after the last "!" (the full string when there is no "!")
        leaf = name[name.rfind("!") + 1 :]
        name, leaf = _escape_filename(name, leaf)
        fullpath = path + os.path.sep + name
    else:
        # path IS the single file
        leaf = None
        fullpath = path

    if compress:
        comptype = CompressionType.ZStd
    else:
        comptype = CompressionType.Uncompressed

    if onefile:
        # NOTE: this routine is similar to savestruct
        # TODO: Reduce to one routine
        flat = data.flatten()
        # pull the bookkeeping attributes off the flattened struct before listing its values
        meta_string = flat.metastring
        del flat.metastring
        flags = flat.arrayflags
        del flat.arrayflags
        arrays = list(flat.values())
        meta_tups = [(key.encode(), flag) for key, flag in zip(flat.keys(), flags)]
        fileType = SDSFileType.OneFile
    else:
        meta, arrays, meta_tups = data._build_sds_meta_data(name=leaf)
        meta_string = meta.string

    # both layouts are handed to the writer with the same keyword arguments
    compress_dataset_internal(
        fullpath,
        meta_string,
        arrays,
        meta_tups=meta_tups,
        comptype=comptype,
        sharename=sharename,
        fileType=fileType,
        bandsize=bandsize,
        append=append,
        complevel=complevel,
    )
# ------------------------------------------------------------------------------------
def _sds_from_tree(data, path, name=None, compress=True, sharename=None):
"""
:param data: Struct/Dataset/Multiset
:param path: Full path to directory where SDS files will be stored
:param name: Name of possible SDS file. Hierarchy separated by !
Recursively crawls through containers within a Struct, generating SDS files as necessary.
If a container only holds other containers, no SDS file will be generated. When loaded, it
it will be reconstructed as a Struct.
:return None:
"""
# in shared memory, always generate .sds file, even for struct that only has containers
needs_sds = sharename is not None
if len(data) == 0:
# empty container, like empty struct will also be written
needs_sds = True
for k, v in data.items():
# flip hdf5.io objects to riptable containers
try:
if v.__module__ == "hdf5.io":
v = h5io_to_struct(v)
except:
pass
# if the item is an riptable container class, crawl it
if hasattr(v, "items") and hasattr(v, "_build_sds_meta_data"):
if name is None:
new_name = k
else:
new_name = name + "!" + k
_sds_from_tree(v, path, name=new_name, compress=compress, sharename=sharename)
# needs_sds = True
# if the item contains things other than containers (arrays, python objects), it needs an sds file
else:
needs_sds = True
if needs_sds:
if name is not None:
_write_to_sds(data, path, name=name, compress=compress, sharename=sharename)