
hermes.base.storage

The module is responsible for high-performance asynchronous file IO. It periodically launches coroutines so that long IO operations on different files run concurrently instead of blocking one another.
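Here is a minimal sketch of that idea. It assumes each file has its own blocking write function: each blocking call is offloaded to a worker thread with `asyncio.to_thread`, and all of them are awaited together with `asyncio.gather`, so a slow write to one file does not serialize writes to the others. The helpers `write_lines` and `write_all` are hypothetical and not part of hermes.

```python
import asyncio
import os
import tempfile


def write_lines(path: str, lines: list[str]) -> int:
    """Blocking write of one file (hypothetical stand-in for a per-format writer)."""
    with open(path, "w", encoding="utf-8") as f:
        for line in lines:
            f.write(line + "\n")
    return len(lines)


async def write_all(jobs: dict[str, list[str]]) -> dict[str, int]:
    """Run one blocking write per file concurrently in the default thread pool."""
    paths = list(jobs)
    counts = await asyncio.gather(
        *(asyncio.to_thread(write_lines, p, jobs[p]) for p in paths)
    )
    return dict(zip(paths, counts))


if __name__ == "__main__":
    out_dir = tempfile.mkdtemp()
    jobs = {
        os.path.join(out_dir, "imu.csv"): ["t,ax", "0.0,0.1"],
        os.path.join(out_dir, "emg.csv"): ["t,ch0", "0.0,1.5", "0.1,1.4"],
    }
    print(asyncio.run(write_all(jobs)))
```

Threads are used here because ordinary file writes are blocking calls that asyncio cannot suspend on its own.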

storage_interface - base interface for the Storage component.

storage - concrete FSM-based Storage with streaming and dumping features.

storage_states - FSM states of the Storage.

storage_interface

Classes:

Name Description
StorageInterface

Interface for the AsyncIO Storage component.

StorageInterface

Bases: ABC

Interface for the AsyncIO Storage component.

_initialize abstractmethod

_initialize(streams: OrderedDict[str, Stream]) -> None

Initializes files and indices for write pointer tracking.

Parameters:

Name Type Description Default
streams
OrderedDict[str, Stream]

Reference to the Stream objects to flush to disk.

required

_is_to_dump abstractmethod

_is_to_dump() -> bool

Check if any streams were configured to record.

Returns:

Name Type Description
bool bool

Whether there are any streams configured to dump recorded data.

_is_to_stream abstractmethod

_is_to_stream() -> bool

Check if any streams were configured to stream.

Returns:

Name Type Description
bool bool

Whether there are any streams configured to stream data.
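The source does not show how these checks are implemented. Assuming they reduce the boolean `stream_*` and `dump_*` flags of `Storage.__init__`, they can be sketched as follows. The `StorageFlags` dataclass is hypothetical.

```python
from dataclasses import dataclass


@dataclass
class StorageFlags:
    """Hypothetical container mirroring the stream_*/dump_* flags of Storage.__init__."""
    stream_csv: bool = False
    stream_hdf5: bool = False
    stream_video: bool = False
    stream_audio: bool = False
    dump_csv: bool = False
    dump_hdf5: bool = False
    dump_video: bool = False
    dump_audio: bool = False

    def is_to_stream(self) -> bool:
        # True if at least one output format is configured for periodic streaming.
        return any((self.stream_csv, self.stream_hdf5, self.stream_video, self.stream_audio))

    def is_to_dump(self) -> bool:
        # True if at least one output format is configured for a one-off dump.
        return any((self.dump_csv, self.dump_hdf5, self.dump_video, self.dump_audio))


flags = StorageFlags(stream_hdf5=True, stream_video=True)
print(flags.is_to_stream(), flags.is_to_dump())  # True False
```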

_log_data abstractmethod async

_log_data() -> None

Main AsyncIO loop for Storage to write files concurrently to disk.

_release_thread_pool abstractmethod

_release_thread_pool() -> None

Trigger release of AsyncIO resources used for writing files.

_set_state abstractmethod

_set_state(state) -> None

User-defined logic for FSM state transition.

Parameters:

Name Type Description Default
state
Any

New state to transition to.

required

_start_dump_logging abstractmethod

_start_dump_logging() -> None

Initialize passive recording that dumps data once, upon termination.

_start_stream_logging abstractmethod

_start_stream_logging() -> None

Set up AV/HDF5 file writers for stream-logging, if desired.

_stop_stream_logging abstractmethod

_stop_stream_logging() -> None

Trigger termination and flushing of accumulated streamed data.

Will wait for the thread to finish before returning.

_wait_till_flush abstractmethod

_wait_till_flush() -> None

Sleep until the Storage is triggered to terminate after flushing.

storage

Classes:

Name Description
Storage

Manages IO operations of all stream data.

Storage

Bases: StorageInterface

Manages IO operations of all stream data.

During continuous operation, periodically flushes data to disk and clears it from memory to reduce RAM usage. Alternatively, dumps all data at once if the user guarantees enough memory. In continuous mode, flushes any leftover data once the program is stopped.

Logs video and audio data with FFmpeg to MKV/MP4 and MP3, respectively. Logs all other sensor data in a single hierarchical HDF5 file. CSV format is also supported but discouraged, because it creates a separate file per sub-stream.

If sub-stream elements contain a burst of samples of sample_size, the burst is automatically unrolled into individual samples.
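Here is a minimal sketch of such unrolling. It assumes a burst is stored as a flat list whose length is a multiple of `sample_size`, where `sample_size` is taken to be an integer count of values per sample. In hermes, `sample_size` may instead be a shape. The helper `unroll_bursts` is hypothetical.

```python
def unroll_bursts(elements: list[list[float]], sample_size: int) -> list[list[float]]:
    """Split each element holding one or more samples into one row per sample."""
    rows = []
    for element in elements:
        if len(element) % sample_size != 0:
            raise ValueError("element length is not a multiple of sample_size")
        # A single sample passes through as one row; a burst yields several rows.
        for start in range(0, len(element), sample_size):
            rows.append(element[start:start + sample_size])
    return rows


print(unroll_bursts([[1, 2, 3, 4], [5, 6]], 2))  # [[1, 2], [3, 4], [5, 6]]
```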

Will fail if FFmpeg is not installed on the system.

Methods:

Name Description
__call__

Callable that runs main FSM loop.

__init__

Constructor of the Storage component responsible for all IO.

cleanup

Stop stream-logging and wait for it to finish.

__call__

__call__(streams: OrderedDict[str, Stream]) -> None

Callable that runs main FSM loop.

Runs continuously, ignoring Ctrl+C interrupts, until the owner Node triggers an exit.

Parameters:

Name Type Description Default
streams
OrderedDict[str, Stream]

Reference to the Stream objects to flush to disk.

required
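Here is a minimal sketch of a loop that ignores Ctrl+C until an exit is requested. It assumes the exit is signalled through a `threading.Event`, and `run_until_exit` is hypothetical. The SIGINT handler is swapped for `signal.SIG_IGN` only when running on the main thread, because `signal.signal()` is restricted to the main thread.

```python
import signal
import threading
from typing import Callable


def run_until_exit(step: Callable[[], None], exit_event: threading.Event) -> int:
    """Call step() repeatedly, ignoring Ctrl+C, until exit_event is set.

    Returns the number of iterations performed.
    """
    # signal.signal() may only be called from the main thread of the main interpreter.
    in_main = threading.current_thread() is threading.main_thread()
    previous = signal.signal(signal.SIGINT, signal.SIG_IGN) if in_main else None
    try:
        iterations = 0
        while not exit_event.is_set():
            step()
            iterations += 1
        return iterations
    finally:
        # Restore the previous SIGINT handler, if one was recorded.
        if previous is not None:
            signal.signal(signal.SIGINT, previous)


exit_event = threading.Event()
ticks = []


def step() -> None:
    ticks.append(len(ticks))
    if len(ticks) == 3:
        exit_event.set()  # stands in for the owner Node triggering an exit


print(run_until_exit(step, exit_event))  # 3
```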

__init__

__init__(
    log_tag: str,
    log_dir: str,
    log_time_s: float,
    experiment: dict[str, str],
    stream_csv: bool = False,
    stream_hdf5: bool = False,
    stream_video: bool = False,
    stream_audio: bool = False,
    dump_csv: bool = False,
    dump_hdf5: bool = False,
    dump_video: bool = False,
    dump_audio: bool = False,
    video_codec: VideoCodecDict | None = None,
    video_codec_num_cpu: int = 1,
    audio_codec: AudioCodecDict | None = None,
    audio_codec_num_cpu: int = 1,
    stream_period_s: float = 30.0,
    **_,
)

Parameters:

Name Type Description Default
log_tag
str

Filename prefix.

required
log_dir
str

Path to the directory on disk to flush data to.

required
log_time_s
float

Start time of saving data.

required
experiment
dict[str, str]

Nested setup definition of Nodes across distributed hosts.

required
stream_csv
bool

Whether to stream data into CSV files. Defaults to False.

False
stream_hdf5
bool

Whether to stream data into HDF5 files. Defaults to False.

False
stream_video
bool

Whether to stream video data into MP4/MKV files. Defaults to False.

False
stream_audio
bool

Whether to stream audio data into MP3/WAV files. Defaults to False.

False
dump_csv
bool

Whether to dump in-memory recorded data in CSV files. Defaults to False.

False
dump_hdf5
bool

Whether to dump in-memory recorded data in HDF5 files. Defaults to False.

False
dump_video
bool

Whether to dump in-memory recorded video data in MP4/MKV files. Defaults to False.

False
dump_audio
bool

Whether to dump in-memory recorded audio data in MP3/WAV files. Defaults to False.

False
video_codec
VideoCodecDict | None

Definition of the video codec to use for FFmpeg. Defaults to None.

None
video_codec_num_cpu
int

Maximum number of CPU cores that an FFmpeg process may use for video writing. Defaults to 1.

1
audio_codec
AudioCodecDict | None

Definition of the audio codec to use for FFmpeg. Defaults to None.

None
audio_codec_num_cpu
int

Maximum number of CPU cores that an FFmpeg process may use for audio writing. Defaults to 1.

1
stream_period_s
float

Duration of periods over which to flush streamed accumulated data from memory to disk. Defaults to 30.0.

30.0

_close_files

_close_files() -> None

Convenience method to close all file writers.

_close_files_audio

_close_files_audio() -> None

Flush/close the audio file writers.

_close_files_csv

_close_files_csv() -> None

Flush/close the CSV file writers.

_close_files_hdf5

_close_files_hdf5() -> None

Flush/close the HDF5 file writer.

Resizes datasets to remove extra empty rows.
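This cleanup implies a pre-allocate-then-trim pattern: rows are reserved in chunks, a write pointer tracks the next free row, and the unused tail is cut off on close. If the HDF5 writer is h5py (an assumption), this would typically use a chunked dataset created with `maxshape=(None, ...)` and shrunk with `Dataset.resize`. To stay dependency-free, the sketch below is a pure-Python analog of that pattern, not the hermes implementation. `GrowableBuffer` is hypothetical.

```python
class GrowableBuffer:
    """Pure-Python analog of a chunk-preallocated dataset with a write pointer.

    Capacity grows one chunk at a time; trim() drops the unused tail,
    similar to resizing an HDF5 dataset when the file is closed.
    """

    def __init__(self, chunk_rows: int = 4) -> None:
        self._chunk_rows = chunk_rows
        self._rows = [None] * chunk_rows  # preallocated, initially empty rows
        self._write_idx = 0  # index of the next free row

    def append(self, row) -> None:
        if self._write_idx == len(self._rows):
            self._rows.extend([None] * self._chunk_rows)  # grow by one chunk
        self._rows[self._write_idx] = row
        self._write_idx += 1

    @property
    def capacity(self) -> int:
        return len(self._rows)

    def trim(self) -> list:
        # Remove the empty rows past the write pointer, as done on close.
        del self._rows[self._write_idx:]
        return self._rows


buf = GrowableBuffer(chunk_rows=4)
for i in range(5):
    buf.append(i)
print(buf.capacity, buf.trim(), buf.capacity)  # 8 [0, 1, 2, 3, 4] 5
```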

_close_files_video

_close_files_video() -> None

Flush/close the video file writers.

_init_files_audio

_init_files_audio() -> int

[Not implemented] Create and initialize audio writers, one for each device.

TODO: implement audio streaming info on the Stream object.

Will fail if FFmpeg is not installed.

Raises:

Type Description
ValueError

When no supported codec specification was provided in the config file.

Returns:

Name Type Description
int int

Number of initialized writers.

_init_files_csv

_init_files_csv() -> int

Create and initialize CSV files.

Will have a separate file for each stream of each device. Currently assumes that device names are unique across all streamers.

Returns:

Name Type Description
int int

Number of initialized writers.
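Here is a minimal sketch of the one-file-per-stream layout. It assumes a mapping from device name to sub-stream names and their column headers. The file naming scheme `{log_tag}_{device}_{stream}.csv` and the helper `init_csv_files` are hypothetical. Files are keyed by device name without the Node, which is why device names must be unique across streamers.

```python
import csv
import os
import tempfile
from io import TextIOWrapper


def init_csv_files(
    log_dir: str, log_tag: str, streams: dict[str, dict[str, list[str]]]
) -> dict[tuple[str, str], TextIOWrapper]:
    """Open one CSV file per (device, stream) pair and write its header row.

    streams maps device_name -> {stream_name: [column names]}.
    """
    files = {}
    for device_name, sub_streams in streams.items():
        for stream_name, columns in sub_streams.items():
            # Only the device name identifies the file, hence the uniqueness assumption.
            path = os.path.join(log_dir, f"{log_tag}_{device_name}_{stream_name}.csv")
            f = open(path, "w", newline="", encoding="utf-8")
            csv.writer(f).writerow(columns)
            files[(device_name, stream_name)] = f
    return files


log_dir = tempfile.mkdtemp()
files = init_csv_files(
    log_dir, "exp01", {"imu0": {"acc": ["t", "ax", "ay", "az"], "gyro": ["t", "gx", "gy", "gz"]}}
)
print(len(files))  # 2, the kind of count _init_files_csv returns
for f in files.values():
    f.close()
```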

_init_files_hdf5

_init_files_hdf5() -> int

Create and initialize a hierarchical HDF5 file.

Will have a single file for all streams from all devices. Currently assumes that device names are unique across all streamers.

Returns:

Name Type Description
int int

Number of initialized writers.

_init_files_video

_init_files_video() -> int

Create and initialize video writers, one for each device.

Will fail if FFmpeg is not installed.

Raises:

Type Description
ValueError

When no supported codec specification was provided in the config file.

Returns:

Name Type Description
int int

Number of initialized writers.
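A video writer of this kind is usually an FFmpeg subprocess that reads raw frames from stdin. Here is a sketch of building such a command line. The pixel format and encoder defaults are illustrative placeholders for whatever a `VideoCodecDict` specifies. Mapping `video_codec_num_cpu` to FFmpeg's `-threads` option is an assumption, since the source does not say how the limit is enforced. `build_ffmpeg_video_cmd` is hypothetical.

```python
import shlex


def build_ffmpeg_video_cmd(
    out_path: str,
    width: int,
    height: int,
    fps: float,
    pix_fmt: str = "bgr24",
    codec: str = "libx264",
    num_cpu: int = 1,
) -> list[str]:
    """Build an FFmpeg command that encodes raw frames piped through stdin."""
    return [
        "ffmpeg",
        "-y",                       # overwrite the output file if it exists
        "-f", "rawvideo",           # input is an unframed raw byte stream
        "-pix_fmt", pix_fmt,        # byte layout of each input frame
        "-s", f"{width}x{height}",  # input frame size
        "-r", str(fps),             # input frame rate
        "-i", "pipe:0",             # read frames from stdin
        "-c:v", codec,              # output video encoder
        "-threads", str(num_cpu),   # cap encoder threads (cf. video_codec_num_cpu)
        out_path,
    ]


cmd = build_ffmpeg_video_cmd("cam0.mkv", 640, 480, 30, num_cpu=2)
print(shlex.join(cmd))
# writer = subprocess.Popen(cmd, stdin=subprocess.PIPE)  # requires FFmpeg on PATH
```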

_init_log_indices

_init_log_indices() -> None

Initialize the data indices to fetch for logging.

Will record the next data indices that should be fetched for each stream, and the number of timesteps that each streamer needs before data is solidified.
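Here is a minimal sketch of this bookkeeping. It assumes each stream's buffer only grows (no eviction) and that "solidified" means the newest `lag` samples may still change and must be held back until a later fetch or the final flush. `LogIndexTracker` is hypothetical.

```python
class LogIndexTracker:
    """Hypothetical tracker of the next sample index to fetch for each stream.

    lags maps a stream key to the number of newest samples that are not yet
    solidified and must be held back until a later fetch (or the final flush).
    """

    def __init__(self, lags: dict[str, int]) -> None:
        self._lags = dict(lags)
        self._next_idx = {key: 0 for key in lags}

    def fetch_new(self, key: str, buffer: list, is_final: bool = False) -> list:
        start = self._next_idx[key]
        # Hold back the newest samples that may still change, unless flushing at exit.
        lag = 0 if is_final else self._lags[key]
        end = max(start, len(buffer) - lag)
        self._next_idx[key] = end  # advance the pointer past what was returned
        return buffer[start:end]


tracker = LogIndexTracker({"imu0/acc": 1})
data = [1, 2, 3]
print(tracker.fetch_new("imu0/acc", data))  # [1, 2]
data.append(4)
print(tracker.fetch_new("imu0/acc", data, is_final=True))  # [3, 4]
```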

_log_data async

_log_data() -> None

Main AsyncIO loop for Storage to write files concurrently to disk.

Polls data from each Node periodically or all at once.

The poll period is set by self._stream_period_s.

Will loop until self._is_streaming is False, and then will do one final fetch/log if self._is_flush is True.

To stream, set self._is_streaming to True and self._is_flush to False. To finish, set self._is_streaming to False and self._is_flush to True.

To dump the recording, set self._is_streaming to False and self._is_flush to True. The thread stays inactive until terminated. The user is responsible for provisioning sufficient memory.
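The flag protocol above can be sketched as follows. The loop mirrors the described behavior: flush once per `stream_period_s` while streaming, then do one final write if a flush was requested. Running the loop with `asyncio.run` in a background thread, and the `PeriodicFlusher` class with its `flush` callback, are hypothetical.

```python
import asyncio
import threading
from typing import Callable


class PeriodicFlusher:
    """Hypothetical sketch of the _log_data flag protocol.

    flush() stands in for fetching new data and writing it with every writer.
    """

    def __init__(self, flush: Callable[[], None], stream_period_s: float = 30.0,
                 streaming: bool = True) -> None:
        self._flush = flush
        self._stream_period_s = stream_period_s
        self._is_streaming = streaming
        self._is_flush = False
        self._thread = threading.Thread(target=self._run)
        if streaming:
            self._thread.start()  # stream mode: start periodic flushing now

    def _run(self) -> None:
        asyncio.run(self._log_data())

    async def _log_data(self) -> None:
        # Flush once per period for as long as streaming is enabled.
        while self._is_streaming:
            await asyncio.sleep(self._stream_period_s)
            self._flush()
        # One final fetch/log of whatever is left (or of everything, in dump mode).
        if self._is_flush:
            self._flush()

    def cleanup(self) -> None:
        self._is_flush = True       # request the final write first,
        self._is_streaming = False  # then let the periodic loop exit
        if not self._thread.is_alive():
            self._thread.start()    # dump mode: the thread only runs now, once
        self._thread.join()         # wait until everything is written


writes = []
dumper = PeriodicFlusher(lambda: writes.append("dump"), streaming=False)
# ... recording happens here; nothing is written yet ...
dumper.cleanup()
print(writes)  # ['dump']
```

In this sketch, `cleanup` can block for up to one `stream_period_s` while the streaming loop finishes its current sleep.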

_log_metadata

_log_metadata()

Convenience method to add metadata to all file types.

Will include device-level metadata and any lower-level data notes.

_log_metadata_audio

_log_metadata_audio() -> None

Add experiment metadata on the audio files.

Dummy method: metadata is written on instantiation.

_log_metadata_csv

_log_metadata_csv() -> None

Add experiment metadata on the CSV files.

TODO: validate logic.

_log_metadata_hdf5

_log_metadata_hdf5() -> None

Add experiment metadata on the HDF5 file.

_log_metadata_video

_log_metadata_video() -> None

Add experiment metadata on the video files.

Dummy method: metadata is written on instantiation.

_sync_write_audio

_sync_write_audio(
    audio_writer: Popen, node_name: str, device_name: str, stream_name: str
)

Write provided data to the audio files.

Can be called during streaming (periodic writing) and post-experiment dumping.

Parameters:

Name Type Description Default
audio_writer
Popen

FFmpeg writer corresponding to the audio.

required
node_name
str

Valid unique tag of the Node owning the data.

required
device_name
str

Valid device tree name.

required
stream_name
str

Valid sub-stream name.

required

_sync_write_csv

_sync_write_csv(
    csv_writer: TextIOWrapper, node_name: str, device_name: str, stream_name: str
) -> None

Write provided data to the CSV file.

Can be called during streaming (periodic writing) and post-experiment dumping.

Parameters:

Name Type Description Default
csv_writer
TextIOWrapper

Text file writer corresponding to the stream.

required
node_name
str

Valid unique tag of the Node owning the data.

required
device_name
str

Valid device tree name.

required
stream_name
str

Valid sub-stream name.

required
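Here is a minimal sketch of appending rows to an already-open CSV text file. It assumes one row per sample, prefixed by its timestamp. The row layout and the helper `sync_write_csv` are hypothetical. An `io.StringIO` stands in for the file so the example runs without touching disk.

```python
import csv
import io


def sync_write_csv(csv_file: io.TextIOBase, timestamps: list[float],
                   samples: list[list[float]]) -> None:
    """Append one row per sample, prefixed by its timestamp."""
    writer = csv.writer(csv_file)
    for t, sample in zip(timestamps, samples):
        writer.writerow([t, *sample])
    # Push buffered rows out so less data is lost if the process dies later.
    csv_file.flush()


buf = io.StringIO()
sync_write_csv(buf, [0.0, 0.01], [[0.1, 9.8], [0.2, 9.7]])
print(buf.getvalue())
```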

_sync_write_hdf5

_sync_write_hdf5(node_name: str, device_name: str, stream_name: str) -> None

Write provided data to the HDF5 file.

Can be called during streaming (periodic writing) and post-experiment dumping.

Parameters:

Name Type Description Default
node_name
str

Valid unique tag of the Node owning the data.

required
device_name
str

Valid device tree name.

required
stream_name
str

Valid sub-stream name.

required

_sync_write_video

_sync_write_video(
    video_writer: Popen, node_name: str, device_name: str, stream_name: str
)

Write provided data to the video files.

Can be called during streaming (periodic writing) and post-experiment dumping.

Parameters:

Name Type Description Default
video_writer
Popen

FFmpeg writer corresponding to the video.

required
node_name
str

Valid unique tag of the Node owning the data.

required
device_name
str

Valid device tree name.

required
stream_name
str

Valid sub-stream name.

required
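Writing to a `Popen` writer amounts to pushing raw frame bytes into the child's stdin. FFmpeg consumes them when started with `-i pipe:0`. So that the sketch runs without FFmpeg installed, a small Python child process that counts the bytes it receives stands in for the encoder. `sync_write_frames` and `COUNT_STDIN_BYTES` are hypothetical.

```python
import subprocess
import sys

# Stand-in for an FFmpeg writer: a child Python process that counts stdin bytes.
COUNT_STDIN_BYTES = "import sys; print(len(sys.stdin.buffer.read()))"


def sync_write_frames(writer: subprocess.Popen, frames: list[bytes]) -> None:
    """Write raw frame bytes to the writer's stdin, one frame after another."""
    for frame in frames:
        writer.stdin.write(frame)
    writer.stdin.flush()


writer = subprocess.Popen(
    [sys.executable, "-c", COUNT_STDIN_BYTES],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
)
frame = bytes(4 * 3 * 3)  # one 4x3 frame of zeros, 3 bytes per pixel (BGR)
sync_write_frames(writer, [frame, frame])
out, _ = writer.communicate()  # closes stdin (EOF ends the stream) and waits
print(out.decode().strip())  # 72
```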

_write_audio async

_write_audio(audio_writer: Popen, node_name: str, device_name: str, stream_name: str)

Coroutine asynchronous wrapper for multiple audio streams.

Wraps synchronous IO of each audio file into an asynchronous pool of coroutines.

Parameters:

Name Type Description Default
audio_writer
Popen

FFmpeg writer corresponding to the audio.

required
node_name
str

Valid unique tag of the Node owning the data.

required
device_name
str

Valid device tree name.

required
stream_name
str

Valid sub-stream name.

required

_write_csv async

_write_csv(
    csv_writer: TextIOWrapper, node_name: str, device_name: str, stream_name: str
)

Coroutine asynchronous wrapper for multiple CSV streams.

Wraps synchronous IO of each CSV file into an asynchronous pool of coroutines.

Parameters:

Name Type Description Default
csv_writer
TextIOWrapper

Text file writer corresponding to the stream.

required
node_name
str

Valid unique tag of the Node owning the data.

required
device_name
str

Valid device tree name.

required
stream_name
str

Valid sub-stream name.

required

_write_files_audio async

_write_files_audio()

Convenience method to asynchronously write all audio data to files.

Will launch a coroutine concurrently with other IO writer types.

_write_files_csv async

_write_files_csv()

Convenience method to asynchronously write all CSV data to files.

Will launch a coroutine concurrently with other IO writer types.

_write_files_hdf5 async

_write_files_hdf5()

Convenience method to asynchronously write all HDF5 data to file.

Will launch a coroutine concurrently with other IO writer types.

_write_files_video async

_write_files_video()

Convenience method to asynchronously write all video data to files.

Will launch coroutines concurrently with other IO writer types.

_write_hdf5

_write_hdf5() -> None

Convenience wrapper to write new data for multiple HDF5 streams.

Writes all new text data to a single HDF5 file.

_write_video async

_write_video(video_writer: Popen, node_name: str, device_name: str, stream_name: str)

Coroutine asynchronous wrapper for multiple video streams.

Wraps synchronous IO of each video file into an asynchronous pool of coroutines.

Parameters:

Name Type Description Default
video_writer
Popen

FFmpeg writer corresponding to the video.

required
node_name
str

Valid unique tag of the Node owning the data.

required
device_name
str

Valid device tree name.

required
stream_name
str

Valid sub-stream name.

required

cleanup

cleanup() -> None

Stop stream-logging and wait for it to finish.

Will stop stream-logging, if it is active, and will trigger a data dump, if configured. The Node pushing data to the Stream should stop adding new data before cleaning up the Logger.

storage_states

Classes:

Name Description
AbstractStorageState

Abstract class for the Storage component.

DumpState

Passive recording state of the Storage component.

StartState

Startup state that initializes Storage.

StreamState

Streaming state of the Storage component.
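Here is a minimal sketch of how these states could hand control to one another through the context, in the style of the classic State pattern. StartState moving straight to StreamState follows the description below. Handing over to DumpState on exit, only when dumping was configured, is an assumption. All classes here are hypothetical stand-ins: `StorageContext.set_state` plays the role of `_set_state`.

```python
from abc import ABC, abstractmethod


class State(ABC):
    """Hypothetical base state holding a reference to its context (the Storage)."""

    def __init__(self, context: "StorageContext") -> None:
        self._context = context

    @abstractmethod
    def run(self) -> None:
        """Do one step of work; may switch the context to another state."""


class StartState(State):
    def run(self) -> None:
        # Initialize files and indices here, then move straight on to streaming.
        self._context.set_state(StreamState(self._context))


class StreamState(State):
    def run(self) -> None:
        if not self._context.is_exiting:
            return  # periodic flushing would happen here, if streaming is configured
        # Assumption: on exit, hand over to DumpState only if dumping was configured.
        if self._context.to_dump:
            self._context.set_state(DumpState(self._context))
        else:
            self._context.done = True


class DumpState(State):
    def run(self) -> None:
        # Write all recorded data once, then finish.
        self._context.done = True


class StorageContext:
    """Hypothetical stand-in for Storage: owns the current state."""

    def __init__(self, to_dump: bool) -> None:
        self.to_dump = to_dump
        self.is_exiting = False
        self.done = False
        self.trace = []
        self.set_state(StartState(self))

    def set_state(self, state: State) -> None:
        self.trace.append(type(state).__name__)
        self.state = state

    def step(self) -> None:
        self.state.run()


storage = StorageContext(to_dump=True)
storage.step()             # Start -> Stream
storage.is_exiting = True  # the owner Node requests an exit
storage.step()             # Stream -> Dump
storage.step()             # Dump writes once and finishes
print(storage.trace, storage.done)  # ['StartState', 'StreamState', 'DumpState'] True
```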

AbstractStorageState

Bases: StateInterface

Abstract class for the Storage component.

Methods:

Name Description
__init__

Constructor of the AbstractStorageState.

__init__

Parameters:

Name Type Description Default
context
StorageInterface

Reference to the Storage object.

required

DumpState

Bases: AbstractStorageState

Passive recording state of the Storage component.

Will flush data to disk once upon system termination.

May run out of memory if the recording is long or the user doesn't provision sufficient memory.

Configuring some streams to stream and others to dump results in undefined behavior.

Methods:

Name Description
__init__

Constructor of the DumpState.

__init__

Parameters:

Name Type Description Default
context
StorageInterface

Reference to the Storage object.

required

StartState

Bases: AbstractStorageState

Startup state that initializes Storage.

Will immediately transition into StreamState after initialization.

Methods:

Name Description
__init__

Constructor of the StartState.

__init__

__init__(context: StorageInterface, streams: OrderedDict[str, Stream])

Parameters:

Name Type Description Default
context
StorageInterface

Reference to the Storage object.

required
streams
OrderedDict[str, Stream]

Reference to the mapping between uniquely identifying Node keys and the corresponding Stream data structures.

required

StreamState

Bases: AbstractStorageState

Streaming state of the Storage component.

Periodically flushes data to disk to clear memory, if any streams were configured to stream.

Will flush remaining data to disk on exit.

Configuring some streams to stream and others to dump results in undefined behavior.

Methods:

Name Description
__init__

Constructor of the StreamState.

__init__

Parameters:

Name Type Description Default
context
StorageInterface

Reference to the Storage object.

required