
hermes.base.stream

Stream is a high-performance, parallel, thread-safe streaming data structure on which the rest of HERMES is built. It coordinates information between the Broker, the Producer, Consumer, and Pipeline Nodes, and the Storage component.

stream

Classes:

Stream: An abstract class to store data of a Node.

Stream

Bases: ABC

An abstract class to store data of a Node.

Tree-like structure of FIFO buffers. May contain multiple sub-streams for a single device, e.g. acceleration and gyroscope of an IMU.

Data for sub-streams under the same device tree arrives as a single packet. Packets containing decoupled data are better treated as independent device trees.

Uses one lock per device tree to arbitrate access to the newest end of each FIFO, where samples are appended and peeked. This keeps parallel acquisition, processing, and logging blocks fast, while Storage saves and discards samples from the oldest end of the FIFO.
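The locking scheme above can be sketched as follows. This is a minimal illustration under stated assumptions, not the HERMES implementation: `ToyStream`, its methods, and the use of `collections.deque` as the FIFO are all hypothetical. One `threading.Lock` guards each device tree, so the samples of one packet land in all its sub-streams together, while the oldest end is drained with `deque.popleft()`, which the Python documentation describes as thread-safe with respect to appends.

```python
import threading
from collections import deque
from typing import Any


class ToyStream:
    """Hypothetical model of per-device-tree locking (not the HERMES class)."""

    def __init__(self) -> None:
        self._buffers: dict[str, dict[str, deque]] = {}
        self._locks: dict[str, threading.Lock] = {}

    def add(self, device: str, stream: str) -> None:
        # The first sub-stream of a device creates its tree and its lock.
        self._locks.setdefault(device, threading.Lock())
        self._buffers.setdefault(device, {})[stream] = deque()

    def append(self, device: str, packet: dict[str, Any]) -> None:
        # One lock per device tree: all sub-streams of a packet are written together,
        # and writers to different device trees never contend.
        with self._locks[device]:
            for stream, value in packet.items():
                self._buffers[device][stream].append(value)

    def pop_oldest(self, device: str, stream: str) -> Any:
        # Drains the oldest end without the lock; deque appends and pops are thread-safe.
        return self._buffers[device][stream].popleft()

    def size(self, device: str, stream: str) -> int:
        return len(self._buffers[device][stream])
```

For example, four producer threads each appending 1000 IMU packets of `{"acc": ..., "gyro": ...}` leave exactly 4000 samples in both sub-streams, because each packet is written under its device tree's lock.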

Will store the class name of each sensor in HDF5 metadata.

Can periodically clear old data (if needed).

Methods:

add_stream: Add a new sub-stream to an existing device tree, or create the device tree if it does not exist.
append_data: Thread-safe append of a new sample to the stream.
clear_data: Clear data in a sub-stream.
clear_data_all: Clear all sub-streams from all device trees.
get_device_names: Get the names of the asynchronous device trees.
get_fps: Get the effective frame rate of the captured data, per sub-stream.
get_num_devices: Get the number of asynchronous device trees.
get_stream_info: Get metadata of a sub-stream.
get_stream_info_all: Get metadata of all sub-streams.
get_stream_names: Get the names of sub-streams in a device tree.
peek_data_new: Wrap the N newest samples in an iterator to peek.
pop_data: Wrap all samples that are ready to be popped in an iterator, ordered oldest to newest.

_add_stream

_add_stream(
    device_name: str,
    stream_name: str,
    data_type: str,
    sample_size: Iterable[int],
    sampling_rate_hz: float = 0.0,
    is_measure_rate_hz: bool = False,
    data_notes: Mapping[str, str] = {},
    is_video: bool = False,
    color_format: str | None = None,
    is_audio: bool = False,
    timesteps_before_solidified: int = 0,
    extra_data_info: ExtraDataInfoDict = {},
) -> None

[Internal] Underlying logic for adding a stream.

Raises:

KeyError: If the supplied color format is not supported or is misspelled.

_append

_append(device_name: str, stream_name: str, data: Any) -> None

[Internal] Non-thread-safe append of a new sample.

_get_fps

_get_fps(device_name: str, stream_name: str) -> float | None

[Internal] Retrieve the effective sampling rate of a sub-stream, if it is being measured.

Rolling statistics are recorded and refreshed on each append to the data structure, over 1-second windows.

Parameters:

device_name (str, required): Valid device tree name.
stream_name (str, required): Valid sub-stream name.

Returns:

float | None: Measured acquisition sampling rate of the sub-stream.

add_stream

add_stream(
    device_name: str,
    stream_name: str,
    data_type: str,
    sample_size: Iterable[int],
    sampling_rate_hz: float = 0.0,
    is_measure_rate_hz: bool = False,
    data_notes: Mapping[str, str] = {},
    is_video: bool = False,
    color_format: str | None = None,
    is_audio: bool = False,
    timesteps_before_solidified: int = 0,
    extra_data_info: ExtraDataInfoDict = {},
) -> None

Add a new sub-stream to an existing device tree, or create the device tree if it does not exist.

By default, also adds a sub-stream to each device tree that marks every captured sample with the host's time-of-arrival.

Parameters:

device_name (str, required): Device tree name. Created automatically if it doesn't exist.
stream_name (str, required): Unique sub-stream name under this device tree.
data_type (str, required): Fixed data type expected in the sub-stream.
sample_size (Iterable[int], required): An iterable of the dimensions of the given data type in each captured sample.
sampling_rate_hz (float, default 0.0): Expected sampling frequency of the signal.
is_measure_rate_hz (bool, default False): Whether to compute the effective sampling frequency.
data_notes (Mapping[str, str], default {}): Mapping of streams to notes for Storage to use in file metadata.
is_video (bool, default False): Whether it is a video stream.
color_format (str | None, default None): One of the supported VIDEO_FORMAT identifiers.
is_audio (bool, default False): Whether it is an audio stream.
timesteps_before_solidified (int, default 0): How many of the most recent samples to keep in memory before they can be flushed.
extra_data_info (ExtraDataInfoDict, default {}): Additional mapping streamed along with the data, with at least 'data_type' and 'sample_size'.

Raises:

ValueError: If the stream name is not unique or is reserved.
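The naming rules of add_stream can be modeled with a small registry. This is a hypothetical sketch: `ToyRegistry`, the reserved name `"toa_s"` for the auto-added time-of-arrival sub-stream, and its metadata values are placeholders chosen for illustration, and only a subset of the documented parameters is kept.

```python
from collections.abc import Iterable
from typing import Any

TOA_STREAM = "toa_s"  # hypothetical name of the auto-added time-of-arrival sub-stream


class ToyRegistry:
    """Hypothetical registry mimicking add_stream's naming rules."""

    def __init__(self) -> None:
        self.info: dict[str, dict[str, dict[str, Any]]] = {}

    def add_stream(
        self,
        device_name: str,
        stream_name: str,
        data_type: str,
        sample_size: Iterable[int],
        sampling_rate_hz: float = 0.0,
    ) -> None:
        if stream_name == TOA_STREAM:
            raise ValueError(f"'{stream_name}' is a reserved sub-stream name")
        # Auto-create the device tree, seeded with its time-of-arrival sub-stream.
        device = self.info.setdefault(
            device_name,
            {TOA_STREAM: {"data_type": "float64", "sample_size": [1], "sampling_rate_hz": 0.0}},
        )
        if stream_name in device:
            raise ValueError(f"'{stream_name}' already exists under '{device_name}'")
        device[stream_name] = {
            "data_type": data_type,
            "sample_size": list(sample_size),
            "sampling_rate_hz": sampling_rate_hz,
        }
```

Registering `acc` and `gyro` under `imu` yields three sub-streams (`acc`, `gyro`, and the time-of-arrival one), and registering `acc` a second time raises `ValueError`.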

append_data

append_data(process_time_s: float, data: NewDataDict) -> None

Thread-safe append of new sample to the stream.

Locks the device tree of the sub-stream, so that the GUI thread's reverse iterator, used to peek the N newest samples, is not invalidated by new samples being written at the same time.

Parameters:

process_time_s (float, required): Time-of-processing of the sample.
data (NewDataDict, required): Newly processed sample.
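The failure that this lock prevents is easy to reproduce if one assumes the sub-stream buffers are `collections.deque` objects (the page does not name the container): appending while a reverse iterator is live makes the iterator's next step raise `RuntimeError`. The function name below is hypothetical.

```python
from collections import deque


def append_during_reverse_peek() -> bool:
    """Return True if an unsynchronized append breaks a live reverse iterator."""
    buffer = deque([1, 2, 3])
    try:
        for _sample in reversed(buffer):  # e.g. a GUI thread peeking the newest samples
            buffer.append(4)  # stands in for a producer thread appending mid-iteration
    except RuntimeError:
        return True  # CPython detects the mutation and invalidates the iterator
    return False
```

Holding the same device-tree lock around both the append and the peek serializes the two, so the iterator always finishes before the buffer changes.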

clear_data

clear_data(
    device_name: str, stream_name: str, num_oldest_to_clear: int | None = None
) -> None

Clear data in a stream.

Initializes the sub-stream if it doesn't exist. Optionally clears only the N oldest samples.

Parameters:

device_name (str, required): Valid device tree name.
stream_name (str, required): Valid sub-stream name.
num_oldest_to_clear (int | None, default None): Number of oldest samples to clear.

clear_data_all

clear_data_all() -> None

Clear all sub-streams from all device trees.

get_device_names

get_device_names() -> list[str]

Get the names of the asynchronous device trees.

Returns:

list[str]: Names of device trees.

get_fps abstractmethod

get_fps() -> dict[str, float | None]

Get the effective frame rate of the captured data, per sub-stream.

Affected by transmission delay and throughput limitations, since it is computed from how fast data becomes available to the data structure. Used to measure the performance of the system, for both local and remote nodes.

Returns:

dict[str, float | None]: Mapping of stream names to measured FPS.

get_num_devices

get_num_devices() -> int

Get the number of asynchronous device trees.

Returns:

int: Number of device trees.

get_stream_info

get_stream_info(device_name: str, stream_name: str) -> Dict[str, Any]

Get metadata of a sub-stream.

Parameters:

device_name (str, required): Valid device tree name.
stream_name (str, required): Valid sub-stream name.

Returns:

Dict[str, Any]: Metadata dictionary with the keys data_type, is_video, is_audio, sample_size, sampling_rate_hz, timesteps_before_solidified, extra_data_info, and data_notes. If is_measure_rate_hz is set, it also contains actual_rate_hz, dt_circular_buffer, dt_circular_index, dt_running_sum, and old_toa.

get_stream_info_all

get_stream_info_all() -> StreamInfoDict

Get metadata of all sub-streams.

Returns:

StreamInfoDict: Nested dictionary of metadata, keyed first by device tree and then by sub-stream.

get_stream_names

get_stream_names(device_name: str | None = None) -> list[str]

Get the names of sub-streams in a device tree.

If device_name is None, assumes that every device tree has the same sub-streams.

Parameters:

device_name (str | None, default None): Name of the device tree to query.

Returns:

list[str]: Names of sub-streams in the device tree.

peek_data_new

peek_data_new(
    device_name: str, stream_name: str, num_newest_to_peek: int | None = None
) -> Iterator[Any]

Wrap N newest samples in an iterator to peek.

Locks the device tree of the sub-stream so that appends cannot mutate the buffer while it is being iterated, while still allowing the oldest data to be popped (e.g. by Storage when flushing). timesteps_before_solidified keeps the peeking and popping ranges from overlapping.

Parameters:

device_name (str, required): Valid device tree name.
stream_name (str, required): Valid sub-stream name.
num_newest_to_peek (int | None, default None): Number of newest samples to peek, if less than timesteps_before_solidified.

Yields:

Iterator[Any]: Iterator over the peekable newest samples.

pop_data

pop_data(
    device_name: str,
    stream_name: str,
    num_oldest_to_pop: int | None = None,
    is_flush: bool = False,
) -> Iterator[Any]

Wrap all samples that are ready to be popped in an iterator, ordered oldest to newest.

Used by Storage to flush data to disk. Popped data is cleared from memory. Safe to call while new data is being appended, without taking the lock.

Parameters:

device_name (str, required): Valid device tree name.
stream_name (str, required): Valid sub-stream name.
num_oldest_to_pop (int | None, default None): Number of samples to pop.
is_flush (bool, default False): Whether to pop all data in the stream, regardless of timesteps_before_solidified.

Yields:

Iterator[Any]: Iterator over the poppable samples, ordered oldest to newest.