hermes.base.storage
The module is responsible for high-performance asynchronous file IO. It periodically spins up coroutines that write different files concurrently, so that long IO operations overlap instead of running one after another.
- storage_interface - base interface for the Storage component.
- storage - concrete FSM-based Storage with streaming and dumping features.
- storage_states - FSM states of the Storage.
storage_interface
Classes:
| Name | Description |
|---|---|
| StorageInterface | Interface for the AsyncIO Storage component. |
StorageInterface
Bases: ABC
Interface for the AsyncIO Storage component.
_initialize
abstractmethod
_is_to_dump
abstractmethod
Check if any streams were configured to record.
Returns:
| Type | Description |
|---|---|
| bool | Whether there are any streams configured to dump recorded data. |
_is_to_stream
abstractmethod
Check if any streams were configured to stream.
Returns:
| Type | Description |
|---|---|
| bool | Whether there are any streams configured to stream data. |
_log_data
abstractmethod
async
Main AsyncIO loop for Storage to write files concurrently to disk.
_release_thread_pool
abstractmethod
Trigger release of AsyncIO resources used for writing files.
_set_state
abstractmethod
_set_state(state) -> None
User-defined logic for FSM state transition.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | Any | New state to transition to. | required |
_start_dump_logging
abstractmethod
Initialize passive recording, which keeps data in memory until termination and then dumps it once.
_start_stream_logging
abstractmethod
Set up AV/HDF5 file writers for stream-logging, if desired.
_stop_stream_logging
abstractmethod
Trigger termination and flushing of accumulated streamed data.
Will wait for the thread to finish before returning.
_wait_till_flush
abstractmethod
Sleep until the Storage is triggered to terminate after flushing.
storage
Classes:
| Name | Description |
|---|---|
| Storage | Manages IO operations of all stream data. |
Storage
Bases: StorageInterface
Manages IO operations of all stream data.
In continuous (streaming) mode, flushes data to disk periodically and clears it from memory to reduce RAM usage; otherwise, flushes all data at once, provided the user guarantees enough memory. In continuous mode, leftover data is also flushed once the program is stopped.
Logs video and audio data with FFmpeg to MKV/MP4 and MP3 files, respectively, and all other sensor data to a single hierarchical HDF5 file. CSV format is also supported but discouraged, because it creates a separate file per sub-stream.
If sub-stream elements contain a burst of sample_size samples, the burst is automatically unrolled into individual samples.
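As a sketch of the unrolling described above, assuming each sub-stream element is a sequence holding exactly sample_size samples, the following flattens bursts into one row per sample. The helper name unroll_bursts is hypothetical and not part of the Storage API.

```python
from typing import Any, Sequence


def unroll_bursts(elements: Sequence[Any], sample_size: int) -> list[Any]:
    """Flatten burst elements into one row per sample (hypothetical helper)."""
    if sample_size == 1:
        # No bursts: each element already is a single sample.
        return list(elements)
    unrolled: list[Any] = []
    for burst in elements:
        if len(burst) != sample_size:
            raise ValueError(f"expected {sample_size} samples per burst, got {len(burst)}")
        # Each sample of the burst becomes its own row.
        unrolled.extend(burst)
    return unrolled


# Two bursts of 3 readings each become 6 individual rows.
print(unroll_bursts([[1, 2, 3], [4, 5, 6]], sample_size=3))  # [1, 2, 3, 4, 5, 6]
```

Because the burst check is on the outer sequence only, individual samples may themselves be vectors (for example, a 3-axis reading) without being split further.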
Will fail if no FFmpeg is installed on the system.
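Since writer setup depends on an FFmpeg binary, a pre-flight check can fail fast with a clear message. This is a sketch using the standard-library shutil.which; require_ffmpeg is a hypothetical helper, and the Storage component may detect a missing FFmpeg differently.

```python
import shutil


def require_ffmpeg(executable: str = "ffmpeg") -> str:
    """Return the full path of the FFmpeg executable, or raise if it is missing."""
    path = shutil.which(executable)  # searches PATH the way a shell would
    if path is None:
        raise RuntimeError(
            f"{executable!r} not found on PATH; install FFmpeg before logging video/audio."
        )
    return path
```

Calling it once before constructing Storage surfaces the problem at startup rather than when the first writer is created.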
Methods:
| Name | Description |
|---|---|
| __call__ | Callable that runs the main FSM loop. |
| __init__ | Constructor of the Storage component responsible for all IO. |
| cleanup | Stop stream-logging and wait for it to finish. |
__call__
__init__
__init__(
    log_tag: str,
    log_dir: str,
    log_time_s: float,
    experiment: dict[str, str],
    stream_csv: bool = False,
    stream_hdf5: bool = False,
    stream_video: bool = False,
    stream_audio: bool = False,
    dump_csv: bool = False,
    dump_hdf5: bool = False,
    dump_video: bool = False,
    dump_audio: bool = False,
    video_codec: VideoCodecDict | None = None,
    video_codec_num_cpu: int = 1,
    audio_codec: AudioCodecDict | None = None,
    audio_codec_num_cpu: int = 1,
    stream_period_s: float = 30.0,
    **_,
)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| log_tag | str | Filename prefix. | required |
| log_dir | str | Path to the directory on disk to flush data to. | required |
| log_time_s | float | Start time of saving data. | required |
| experiment | dict[str, str] | Nested setup definition of Nodes across distributed hosts. | required |
| stream_csv | bool | Whether to stream data into CSV files. | False |
| stream_hdf5 | bool | Whether to stream data into HDF5 files. | False |
| stream_video | bool | Whether to stream video data into MP4/MKV files. | False |
| stream_audio | bool | Whether to stream audio data into MP3/WAV files. | False |
| dump_csv | bool | Whether to dump in-memory recorded data into CSV files. | False |
| dump_hdf5 | bool | Whether to dump in-memory recorded data into HDF5 files. | False |
| dump_video | bool | Whether to dump in-memory recorded video data into MP4/MKV files. | False |
| dump_audio | bool | Whether to dump in-memory recorded audio data into MP3/WAV files. | False |
| video_codec | VideoCodecDict \| None | Definition of the video codec to use for FFmpeg. | None |
| video_codec_num_cpu | int | Number of CPU cores to limit an FFmpeg process to for video writing. | 1 |
| audio_codec | AudioCodecDict \| None | Definition of the audio codec to use for FFmpeg. | None |
| audio_codec_num_cpu | int | Number of CPU cores to limit an FFmpeg process to for audio writing. | 1 |
| stream_period_s | float | Duration, in seconds, of the periods over which streamed data accumulated in memory is flushed to disk. | 30.0 |
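Because the state descriptions warn that configuring some streams to stream and others to dump has undefined behavior, it can help to classify a keyword configuration before passing it to the constructor. This is a sketch: storage_mode is a hypothetical helper, and the example values for log_tag, log_dir, log_time_s and experiment are placeholders, not recommended settings.

```python
import warnings

# Flag names taken from the __init__ signature.
STREAM_FLAGS = ("stream_csv", "stream_hdf5", "stream_video", "stream_audio")
DUMP_FLAGS = ("dump_csv", "dump_hdf5", "dump_video", "dump_audio")


def storage_mode(**config: object) -> str:
    """Classify Storage keyword arguments as 'stream', 'dump', 'mixed', or 'none'."""
    streaming = [name for name in STREAM_FLAGS if config.get(name, False)]
    dumping = [name for name in DUMP_FLAGS if config.get(name, False)]
    if streaming and dumping:
        # Per the DumpState/StreamState notes, mixing modes has undefined behavior.
        warnings.warn(f"mixing {streaming} with {dumping} has undefined behavior")
        return "mixed"
    if streaming:
        return "stream"
    if dumping:
        return "dump"
    return "none"


# Placeholder configuration: stream HDF5 and video, flushing every 30 s.
config = dict(
    log_tag="session01",
    log_dir="./data",
    log_time_s=0.0,
    experiment={},
    stream_hdf5=True,
    stream_video=True,
    stream_period_s=30.0,
)
print(storage_mode(**config))  # stream
```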
_close_files_hdf5
Flush/close the HDF5 file writer.
Resizes datasets to remove extra empty rows.
_init_files_audio
[Not implemented] Create and initialize audio writers, one for each device.
TODO: implement audio streaming info on the Stream object.
Will fail if FFmpeg is not installed.
Raises:
| Type | Description |
|---|---|
| ValueError | When no supported codec specification was provided in the config file. |
Returns:
| Type | Description |
|---|---|
| int | Number of initialized writers. |
_init_files_csv
Create and initialize CSV files.
Will have a separate file for each stream of each device. Currently assumes that device names are unique across all streamers.
Returns:
| Type | Description |
|---|---|
| int | Number of initialized writers. |
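A minimal sketch of the one-file-per-stream layout, assuming a hypothetical mapping of device name to stream name to column names and a hypothetical file naming scheme {log_tag}_{device}_{stream}.csv; the real method derives both from the Stream objects, which are not shown here.

```python
import csv
import os
from typing import TextIO


def init_files_csv(
    log_dir: str, log_tag: str, streams: dict[str, dict[str, list[str]]]
) -> dict[tuple[str, str], TextIO]:
    """Open one CSV file per (device, stream) and write its header row.

    `streams` maps device name -> stream name -> column names (hypothetical layout).
    """
    os.makedirs(log_dir, exist_ok=True)
    files: dict[tuple[str, str], TextIO] = {}
    for device, device_streams in streams.items():
        for stream, columns in device_streams.items():
            # Keyed only by (device, stream), so device names must be unique
            # across all streamers, matching the assumption stated above.
            path = os.path.join(log_dir, f"{log_tag}_{device}_{stream}.csv")
            f = open(path, "w", newline="")  # newline="" as the csv module recommends
            csv.writer(f).writerow(columns)
            files[(device, stream)] = f
    return files
```

The number of initialized writers then corresponds to len(files), and every file must be closed once logging ends.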
_init_files_hdf5
Create and initialize a hierarchical HDF5 file.
Will have a single file for all streams from all devices. Currently assumes that device names are unique across all streamers.
Returns:
| Type | Description |
|---|---|
| int | Number of initialized writers. |
_init_files_video
Create and initialize video writers, one for each device.
Will fail if FFmpeg is not installed.
Raises:
| Type | Description |
|---|---|
| ValueError | When no supported codec specification was provided in the config file. |
Returns:
| Type | Description |
|---|---|
| int | Number of initialized writers. |
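A sketch of how one raw-frame FFmpeg writer per device could be launched with subprocess.Popen. It assumes BGR24 frames piped to stdin and maps video_codec_num_cpu onto FFmpeg's -threads option, which caps encoder threads rather than pinning cores. The keys of VideoCodecDict are not documented here, so codec and pix_fmt are plain illustrative parameters, and both function names are hypothetical.

```python
import subprocess


def build_ffmpeg_video_cmd(
    output_path: str,
    width: int,
    height: int,
    fps: float,
    codec: str = "libx264",
    pix_fmt: str = "bgr24",
    num_cpu: int = 1,
) -> list[str]:
    """Build an FFmpeg command that encodes raw frames read from stdin."""
    return [
        "ffmpeg",
        "-y",                       # overwrite the output file without prompting
        "-f", "rawvideo",           # input is raw, uncompressed frames
        "-pix_fmt", pix_fmt,        # byte layout of each input frame
        "-s", f"{width}x{height}",  # input frame size
        "-r", str(fps),             # input frame rate
        "-i", "-",                  # read frames from stdin
        "-c:v", codec,              # output video encoder
        "-threads", str(num_cpu),   # cap encoder threads (approximates a CPU limit)
        output_path,
    ]


def open_video_writer(cmd: list[str]) -> subprocess.Popen:
    # Frames are written to proc.stdin as bytes; closing stdin finalizes the file.
    return subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.DEVNULL)
```

With bgr24, each frame written to proc.stdin must be exactly width * height * 3 bytes; after the last frame, close proc.stdin and call proc.wait() so FFmpeg can finalize the container.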
_init_log_indices
Initialize the data indices to fetch for logging.
Will record the next data indices that should be fetched for each stream, and the number of timesteps that each streamer needs before data is solidified.
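One possible reading of this bookkeeping, sketched under the assumption that a streamer may still revise its most recent lag timesteps, so only older samples are fetched until the final flush. LogIndices, lag, to_fetch and to_flush are hypothetical names.

```python
from dataclasses import dataclass, field


@dataclass
class LogIndices:
    """Hypothetical per-stream cursor of the next sample index to log."""

    # Timesteps each stream needs before a sample is considered solidified.
    lag: dict[str, int]
    next_index: dict[str, int] = field(default_factory=dict)

    def to_fetch(self, stream: str, num_available: int) -> range:
        """Indices that are new and already solidified; advances the cursor."""
        start = self.next_index.get(stream, 0)
        stop = max(start, num_available - self.lag.get(stream, 0))
        self.next_index[stream] = stop
        return range(start, stop)

    def to_flush(self, stream: str, num_available: int) -> range:
        """On the final flush, everything not yet logged is fetched regardless of lag."""
        start = self.next_index.get(stream, 0)
        self.next_index[stream] = num_available
        return range(start, num_available)


indices = LogIndices(lag={"imu": 2})
print(list(indices.to_fetch("imu", 5)))  # [0, 1, 2]
print(list(indices.to_fetch("imu", 7)))  # [3, 4]
print(list(indices.to_flush("imu", 7)))  # [5, 6]
```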
_log_data
async
Main AsyncIO loop for Storage to write files concurrently to disk.
Polls data from each Node periodically or all at once.
The poll period is set by self._stream_period_s.
Will loop until self._is_streaming is False, and then will do one final fetch/log if self._is_flush is True.
To stream, assert self._is_streaming and deassert self._is_flush. To finish, deassert self._is_streaming (set it to False) and assert self._is_flush.
To dump the recording, deassert self._is_streaming and assert self._is_flush. The thread will be inactive until terminated. The user is responsible for provisioning sufficient memory.
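The streaming path of this flag protocol can be sketched as an asyncio loop running in its own thread, with stop() deasserting the streaming flag, asserting the flush flag, and joining the thread. PeriodicFlusher is a hypothetical stand-in, not the Storage class; it omits the dump path and fetching from Nodes, and delegates each blocking flush to a worker thread via asyncio.to_thread.

```python
import asyncio
import threading
from collections.abc import Callable
from typing import Optional


class PeriodicFlusher:
    """Hypothetical stand-in for the streaming path of Storage._log_data."""

    def __init__(self, flush: Callable[[], None], stream_period_s: float = 30.0) -> None:
        self._flush = flush  # blocking callable that writes pending data to disk
        self._stream_period_s = stream_period_s
        self._is_streaming = False
        self._is_flush = False
        self._thread: Optional[threading.Thread] = None

    async def _log_data(self) -> None:
        # Flush once per period while streaming is asserted.
        while self._is_streaming:
            await asyncio.sleep(self._stream_period_s)
            await asyncio.to_thread(self._flush)
        # One final fetch/log of leftover data once streaming is deasserted.
        if self._is_flush:
            await asyncio.to_thread(self._flush)

    def start(self) -> None:
        # Streaming: assert _is_streaming and deassert _is_flush.
        self._is_streaming, self._is_flush = True, False
        # The event loop runs in its own thread so data producers are not blocked.
        self._thread = threading.Thread(target=lambda: asyncio.run(self._log_data()))
        self._thread.start()

    def stop(self) -> None:
        # Finish: deassert _is_streaming, assert _is_flush, then wait for the thread.
        self._is_streaming, self._is_flush = False, True
        if self._thread is not None:
            self._thread.join()
```

In this sketch, stop() can block for up to one stream_period_s, because the loop only notices the cleared flag after its current sleep ends.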
_log_metadata
Convenience method to add metadata to all file types.
Will include device-level metadata and any lower-level data notes.
_log_metadata_audio
Add experiment metadata to the audio files.
Dummy method: metadata is written on instantiation.
_log_metadata_csv
Add experiment metadata to the CSV files.
TODO: validate logic.
_log_metadata_video
Add experiment metadata to the video files.
Dummy method: metadata is written on instantiation.
_sync_write_audio
_sync_write_audio(
    audio_writer: Popen, node_name: str, device_name: str, stream_name: str
)
Write provided data to the audio files.
Can be called during streaming (periodic writing) and post-experiment dumping.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| audio_writer | Popen | FFmpeg writer corresponding to the audio. | required |
| node_name | str | Valid unique tag of the Node owning the data. | required |
| device_name | str | Valid device tree name. | required |
| stream_name | str | Valid sub-stream name. | required |
_sync_write_csv
_sync_write_csv(
    csv_writer: TextIOWrapper, node_name: str, device_name: str, stream_name: str
) -> None
Write provided data to the CSV file.
Can be called during streaming (periodic writing) and post-experiment dumping.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| csv_writer | TextIOWrapper | Text file writer corresponding to the stream. | required |
| node_name | str | Valid unique tag of the Node owning the data. | required |
| device_name | str | Valid device tree name. | required |
| stream_name | str | Valid sub-stream name. | required |
_sync_write_hdf5
_sync_write_hdf5(node_name: str, device_name: str, stream_name: str) -> None
Write provided data to the HDF5 file.
Can be called during streaming (periodic writing) and post-experiment dumping.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| node_name | str | Valid unique tag of the Node owning the data. | required |
| device_name | str | Valid device tree name. | required |
| stream_name | str | Valid sub-stream name. | required |
_sync_write_video
_sync_write_video(
    video_writer: Popen, node_name: str, device_name: str, stream_name: str
)
Write provided data to the video files.
Can be called during streaming (periodic writing) and post-experiment dumping.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| video_writer | Popen | FFmpeg writer corresponding to the video. | required |
| node_name | str | Valid unique tag of the Node owning the data. | required |
| device_name | str | Valid device tree name. | required |
| stream_name | str | Valid sub-stream name. | required |
_write_audio
async
_write_audio(audio_writer: Popen, node_name: str, device_name: str, stream_name: str)
Coroutine asynchronous wrapper for multiple audio streams.
Wraps synchronous IO of each audio file into an asynchronous pool of coroutines.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| audio_writer | Popen | FFmpeg writer corresponding to the audio. | required |
| node_name | str | Valid unique tag of the Node owning the data. | required |
| device_name | str | Valid device tree name. | required |
| stream_name | str | Valid sub-stream name. | required |
_write_csv
async
_write_csv(
    csv_writer: TextIOWrapper, node_name: str, device_name: str, stream_name: str
)
Coroutine asynchronous wrapper for multiple CSV streams.
Wraps synchronous IO of each CSV file into an asynchronous pool of coroutines.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| csv_writer | TextIOWrapper | Text file writer corresponding to the stream. | required |
| node_name | str | Valid unique tag of the Node owning the data. | required |
| device_name | str | Valid device tree name. | required |
| stream_name | str | Valid sub-stream name. | required |
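The pattern of wrapping synchronous file IO into coroutines can be sketched with asyncio.to_thread, which runs each blocking write in a worker thread, and asyncio.gather, which runs one such coroutine per file concurrently. The three functions mirror the documented _sync_write_csv, _write_csv and _write_files_csv roles but are simplified stand-ins with hypothetical signatures: rows are passed in directly instead of being fetched from a Node.

```python
import asyncio
import csv
from typing import TextIO


def sync_write_csv(csv_file: TextIO, rows: list[list[object]]) -> None:
    """Blocking write of new rows (stand-in for _sync_write_csv)."""
    csv.writer(csv_file).writerows(rows)
    csv_file.flush()  # push buffered text to the OS before returning


async def write_csv(csv_file: TextIO, rows: list[list[object]]) -> None:
    """Coroutine wrapper (stand-in for _write_csv): run the blocking write in a worker thread."""
    await asyncio.to_thread(sync_write_csv, csv_file, rows)


async def write_files_csv(batches: dict[TextIO, list[list[object]]]) -> None:
    """Launch one coroutine per file and wait for all of them (stand-in for _write_files_csv)."""
    await asyncio.gather(*(write_csv(f, rows) for f, rows in batches.items()))
```

For example, asyncio.run(write_files_csv({file_a: [[1, 2]], file_b: [[3, 4], [5, 6]]})) writes both files concurrently. The same gather call could combine the per-type coroutines (CSV, HDF5, video, audio) one level up, which is how the _write_files_* methods are described as running alongside each other.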
_write_files_audio
async
Convenience method to asynchronously write all audio data to files.
Will launch coroutine concurrently with other IO writer types.
_write_files_csv
async
Convenience method to asynchronously write all CSV data to files.
Will launch coroutine concurrently with other IO writer types.
_write_files_hdf5
async
Convenience method to asynchronously write all HDF5 data to file.
Will launch coroutine concurrently with other IO writer types.
_write_files_video
async
Convenience method to asynchronously write all video data to files.
Will launch coroutines concurrently with other IO writer types.
_write_hdf5
Convenience wrapper to write new data for multiple HDF5 streams.
Writes all new text data to a single HDF5 file.
_write_video
async
_write_video(video_writer: Popen, node_name: str, device_name: str, stream_name: str)
Coroutine asynchronous wrapper for multiple video streams.
Wraps synchronous IO of each video file into an asynchronous pool of coroutines.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| video_writer | Popen | FFmpeg writer corresponding to the video. | required |
| node_name | str | Valid unique tag of the Node owning the data. | required |
| device_name | str | Valid device tree name. | required |
| stream_name | str | Valid sub-stream name. | required |
cleanup
Stop stream-logging and wait for it to finish.
Stops stream-logging, if it is active, and triggers the data dump, if configured. Nodes pushing data to the Stream should stop adding new data before the Storage (Logger) is cleaned up.
storage_states
Classes:
| Name | Description |
|---|---|
| AbstractStorageState | Abstract class for the Storage component. |
| DumpState | Passive recording state of the Storage component. |
| StartState | Startup state that initializes Storage. |
| StreamState | Streaming state of the Storage component. |
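To illustrate how the Storage context and its states interact, here is a minimal state-pattern sketch that follows the documented StartState to StreamState transition. Everything in it is hypothetical: run() is an assumed method name, ToyStorage stands in for Storage, StateInterface is a simplified local definition, and DumpState is left out because its transition is not described here.

```python
from abc import ABC, abstractmethod
from typing import Optional


class StateInterface(ABC):
    """Hypothetical minimal state interface; the real StateInterface may differ."""

    @abstractmethod
    def run(self) -> None: ...


class ToyStorage:
    """Context object: holds the current state and the actions states call."""

    def __init__(self, stream: bool) -> None:
        self._stream = stream
        self._state: Optional[StateInterface] = None
        self.trace: list[str] = []  # records actions for illustration

    def _set_state(self, state: Optional[StateInterface]) -> None:
        self._state = state

    def _initialize(self) -> None:
        self.trace.append("initialize")

    def _is_to_stream(self) -> bool:
        return self._stream

    def _start_stream_logging(self) -> None:
        self.trace.append("start_stream_logging")

    def __call__(self) -> None:
        # Main FSM loop: run the current state until it hands over to None.
        self._set_state(StartState(self))
        while self._state is not None:
            self._state.run()


class StartState(StateInterface):
    def __init__(self, context: ToyStorage) -> None:
        self._context = context

    def run(self) -> None:
        self._context._initialize()
        # Per the docs, StartState immediately transitions into StreamState.
        self._context._set_state(StreamState(self._context))


class StreamState(StateInterface):
    def __init__(self, context: ToyStorage) -> None:
        self._context = context

    def run(self) -> None:
        # Only start periodic flushing if any stream was configured to stream.
        if self._context._is_to_stream():
            self._context._start_stream_logging()
        # Toy termination; the real state keeps running until the system stops.
        self._context._set_state(None)
```

Keeping transitions inside the states, with the context only exposing _set_state and the actions, lets each state stay small while the context's __call__ loop remains generic.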
AbstractStorageState
Bases: StateInterface
Abstract class for the Storage component.
Methods:
| Name | Description |
|---|---|
| __init__ | Constructor of the AbstractStorageState. |
__init__
__init__(context: StorageInterface)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | StorageInterface | Reference to the Storage object. | required |
DumpState
Bases: AbstractStorageState
Passive recording state of the Storage component.
Will flush data to disk once upon system termination.
May run out of memory if the recording is long or the user does not provision sufficient memory.
Configuring some streams to stream and others to dump has undefined behavior.
Methods:
| Name | Description |
|---|---|
| __init__ | Constructor of the DumpState. |
__init__
__init__(context: StorageInterface)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | StorageInterface | Reference to the Storage object. | required |
StartState
Bases: AbstractStorageState
Startup state that initializes Storage.
Will immediately transition into StreamState after initialization.
Methods:
| Name | Description |
|---|---|
| __init__ | Constructor of the StartState. |
__init__
__init__(context: StorageInterface, streams: OrderedDict[str, Stream])
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | StorageInterface | Reference to the Storage object. | required |
| streams | OrderedDict[str, Stream] | Reference to the mapping between uniquely identifying Node keys and the corresponding Stream data structures. | required |
StreamState
Bases: AbstractStorageState
Streaming state of the Storage component.
Will periodically flush data to disk, to clear memory, if any streams were specified to stream.
Will flush remaining data to disk on exit.
Configuring some streams to stream and others to dump has undefined behavior.
Methods:
| Name | Description |
|---|---|
| __init__ | Constructor of the StreamState. |
__init__
__init__(context: StorageInterface)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | StorageInterface | Reference to the Storage object. | required |