hermes.base.stream¶
Stream is a high-performance parallel thread-safe streaming data structure, which the rest of HERMES is based on. It is used to coordinate information between Broker, Producer/Consumer/Pipeline Nodes, and Storage component.
stream
¶
Classes:
| Name | Description |
|---|---|
Stream |
An abstract class to store data of a Node. |
Stream
¶
Bases: ABC
An abstract class to store data of a Node.
Tree-like structure of FIFO buffers. May contain multiple sub-streams for a single device, e.g. acceleration and gyroscope of an IMU.
Data for sub-streams under the same device tree arrives as a single packet. Packets containing decoupled data are better treated as independent device trees.
Uses a lock for each device tree to delegate access to the start of the FIFO: ensures high performance from parallel acquisition, processing, and logging blocks. This allows the end of the FIFO to be saved and discarded by the Storage.
Will store the class name of each sensor in HDF5 metadata.
Can periodically clear old data (if needed).
Methods:
| Name | Description |
|---|---|
add_stream |
Add a new sub-stream to an existing device tree or creates new. |
append_data |
Thread-safe append of new sample to the stream. |
clear_data |
Clear data in a stream. |
clear_data_all |
Clear all sub-streams from all device trees. |
get_device_names |
Get the names of the asynchronous device trees. |
get_fps |
Get effective frame rate of this unique stream's captured data. |
get_num_devices |
Get the number of asynchronous device trees. |
get_stream_info |
Get metadata of a sub-stream. |
get_stream_info_all |
Get metadata of all sub-streams. |
get_stream_names |
Get the names of sub-streams in a device tree. |
peek_data_new |
Wrap N newest samples in an iterator to peek. |
pop_data |
Wrap all samples ready to be popped in an iterator oldest->newest. |
_add_stream
¶
_add_stream(
device_name: str,
stream_name: str,
data_type: str,
sample_size: Iterable[int],
sampling_rate_hz: float = 0.0,
is_measure_rate_hz: bool = False,
data_notes: Mapping[str, str] = {},
is_video: bool = False,
color_format: str | None = None,
is_audio: bool = False,
timesteps_before_solidified: int = 0,
extra_data_info: ExtraDataInfoDict = {},
) -> None
[Internal] Underlying logic for adding a stream.
Raises:
| Type | Description |
|---|---|
KeyError
|
If supplied color format is not supported or misspelled. |
_append
¶
[Internal] Non thread-safe append of new sample.
_get_fps
¶
_get_fps(device_name: str, stream_name: str) -> float | None
[Internal] Retrieve the effective sampling rate of a signal, if recorded.
Records and refreshes rolling statistics on each data structure append over 1-second windows.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
str
|
Valid device tree name. |
required |
|
str
|
Valid sub-stream name. |
required |
Returns:
| Type | Description |
|---|---|
float | None
|
float | None: Measured acquisition sampling rate of the sub-stream. |
add_stream
¶
add_stream(
device_name: str,
stream_name: str,
data_type: str,
sample_size: Iterable[int],
sampling_rate_hz: float = 0.0,
is_measure_rate_hz: bool = False,
data_notes: Mapping[str, str] = {},
is_video: bool = False,
color_format: str | None = None,
is_audio: bool = False,
timesteps_before_solidified: int = 0,
extra_data_info: ExtraDataInfoDict = {},
) -> None
Add a new sub-stream to an existing device tree or creates new.
Will by default add a stream for each device to mark each captured sample with the host's time-of-arrival.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
str
|
Device tree name. Will autocreate if doesn't exist. |
required |
|
str
|
Unique sub-stream name under this device tree. |
required |
|
str
|
Fixed data type expected in the sub-stream. |
required |
|
Iterable[int]
|
An interable of dimensions of given data type in each captured sample. |
required |
|
float
|
Expected sampling frequency of the signal. Defaults to 0.0. |
0.0
|
|
bool
|
Whether to compute the effective sampling frequency. Defaults to False. |
False
|
|
Mapping[str, str]
|
Mapping of streams to notes for Storage to use in file metadata. Defaults to {}. |
{}
|
|
bool
|
Whether it is a video stream. Defaults to False. |
False
|
|
str | None
|
One of the supported VIDEO_FORMAT identifiers. Defaults to None. |
None
|
|
bool
|
Whether it is an audio stream. Defaults to False. |
False
|
|
int
|
How many most recent samples to keep in memory before flushing. Defaults to 0. |
0
|
|
ExtraDataInfoDict
|
Additional mapping that will be streamed along with data, with at least 'data_type' and 'sample_size'. Defaults to {}. |
{}
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If stream name is not unique or is reserved. |
append_data
¶
append_data(process_time_s: float, data: NewDataDict) -> None
Thread-safe append of new sample to the stream.
Locks the device tree of the sub-stream, to avoid immutability error in reverse iterator of the GUI thread when trying to peek N newest samples of the stream while new are written.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
float
|
Time-of-processing of the sample. |
required |
|
NewDataDict
|
Newly processed sample. |
required |
clear_data
¶
clear_data(
device_name: str, stream_name: str, num_oldest_to_clear: int | None = None
) -> None
Clear data in a stream.
Initializes the sub-stream if it doesn't exist. Optionally can clear N oldest samples.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
str
|
Valid device tree name. |
required |
|
str
|
Valid sub-stream name. |
required |
|
int | None
|
Number of oldest samples to clear. Defaults to None. |
None
|
get_device_names
¶
Get the names of the asynchronous device trees.
Returns:
| Type | Description |
|---|---|
list[str]
|
list[str]: Names of device trees. |
get_fps
abstractmethod
¶
Get effective frame rate of this unique stream's captured data.
Subject to expected transmission delay and throughput limitation. Computed based on how fast data becomes available to the data structure. Used to measure the performance of the system - local or remote nodes.
Returns:
| Type | Description |
|---|---|
dict[str, float | None]
|
dict[str, float | None]: Mapping of measured FPS to stream names. |
get_num_devices
¶
Get the number of asynchronous device trees.
Returns:
| Name | Type | Description |
|---|---|---|
int |
int
|
Number of device trees. |
get_stream_info
¶
get_stream_info(device_name: str, stream_name: str) -> Dict[str, Any]
Get metadata of a sub-stream.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
str
|
Valid device tree name. |
required |
|
str
|
Valid sub-stream name. |
required |
Returns:
| Type | Description |
|---|---|
Dict[str, Any]
|
Dict[str, Any]: Metadata dictionary with keys: data_type is_video is_audio sample_size sampling_rate_hz timesteps_before_solidified extra_data_info data_notes if is_measure_rate_hz: actual_rate_hz dt_circular_buffer dt_circular_index dt_running_sum old_toa |
get_stream_info_all
¶
Get metadata of all sub-streams.
Returns:
| Name | Type | Description |
|---|---|---|
StreamInfoDict |
StreamInfoDict
|
Nested dictionary of metadata, with device trees and sub-streams as keys. |
get_stream_names
¶
get_stream_names(device_name: str | None = None) -> list[str]
Get the names of sub-streams in a device tree.
If device_name is None, will assume streams are the same for every device.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
str | None
|
Name of the device tree to query. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
list[str]
|
list[str]: Names of sub-streams in a device tree. |
peek_data_new
¶
peek_data_new(
device_name: str, stream_name: str, num_newest_to_peek: int | None = None
) -> Iterator[Any]
Wrap N newest samples in an iterator to peek.
Will lock the device tree of the sub-stream to prevent appends muttating the iterator.
Will allow popping of the oldest data (e.g. for Storage to flush).
Peeking and popping ranges are protected by timesteps_before_solidified.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
str
|
Valid device tree name. |
required |
|
str
|
Valid sub-stream name. |
required |
|
int | None
|
Number of samples to peek, if less than |
None
|
Yields:
| Type | Description |
|---|---|
Any
|
Iterator[Any]: Iterator over peekable newest samples. |
pop_data
¶
pop_data(
device_name: str,
stream_name: str,
num_oldest_to_pop: int | None = None,
is_flush: bool = False,
) -> Iterator[Any]
Wrap all samples ready to be popped in an iterator oldest->newest.
Used by Storage to flush data to disk. Popped data is cleared from memory. Thread-safe without locks while appending new data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
|
str
|
Valid device tree name. |
required |
|
str
|
Valid sub-stream name. |
required |
|
int | None
|
Number of samples to pop. Defaults to None. |
None
|
|
bool
|
Whether to pop all data in the stream, regardless of timesteps_before_solidified. Defaults to False. |
False
|
Yields:
| Type | Description |
|---|---|
Any
|
Iterator[Any]: Iterator over poppable oldest->newest samples. |