Skip to content

FIFO

fifo

Classes:

Name Description
AlignedFifoBuffer

Multichannel first-in first-out buffer that aligns asynchronous temporally-lossy samples across channels.

BufferInterface

Interface for the multi-channel FIFO buffer.

NonOverflowingCounterAlignedFifoBuffer

Multichannel first-in first-out buffer that aligns asynchronous temporally-lossy samples across channels by supplied correlated overflowing counter.

NonOverflowingCounterConverter

A counter value converter from overflowing fixed range to non-overflowing values, starting at 0 for the first received sample.

TimestampAlignedFifoBuffer

Multichannel first-in first-out buffer that aligns asynchronous temporally-lossy samples across channels by supplied correlated timestamps.

TimestampToCounterConverter

A counter value converter from overflowing fixed range timestamp to non-overflowing counter values, starting at 0 for the first received sample.

AlignedFifoBuffer

Bases: BufferInterface

Multichannel first-in first-out buffer that aligns asynchronous temporally-lossy samples across channels.

Receives asynchronous samples for each channel, aligns them, and returns to the user an aligned snapshot across all channels. Allows yeeting from buffer if some keys have been empty for a while (disconnection or out of range), while others continue producing.

By default uses dynamically-growing Deque for the buffer, approprate for the sample rate of IMUs. maxlen offers possibility to turn into a fixed-length ring buffer, to avoid unnecessary memory allocations for higher performance, at the cost of lost data in case of slow consumers.

Updates only on yeet to discard stale sample that arrived too late. Adds counter into the data payload to retreive on the reader. (Useful for time->counter converted buffer). If the snapshot had not been read, even if the measurement is stale (arrived later than specified), still adds it to the buffer. Empty pads if some intermediate timesteps did not recieve a packet for a specific key. If buffer contents are valid, moves snapshot into the output Queue. Update the frame counter to keep track of removed data to discard stale late arrivals.

Methods:

Name Description
__init__

Constructor of the AlignedFifoBuffer.

flush

Allow to evict all present data because no new samples will be captured.

yeet

Attempts to synchronously retrieve the oldest set of samples from all channels of the buffer with a timeout.

__init__

__init__(keys: Iterable, timesteps_before_stale: int, maxlen: int | None = None)

Parameters:

Name Type Description Default
keys
Iterable

Set of uniquely identifying channel keys.

required
timesteps_before_stale
int

The number of samples in other channels after which a missing sample in a channel is marked missing.

required
maxlen
int | None

Fixed length of preallocated ring buffer. Defaults to None.

None

_put_output_queue

_put_output_queue(packet: dict) -> None

Places a ready to consume complete snapshot onto the output queue.

Parameters:

Name Type Description Default
packet
dict

Temporally aligned snapshot mapping unique channel-identifying keys to the corresponding sample.

required

flush

flush() -> None

Allow to evict all present data because no new samples will be captured.

yeet

yeet(timeout: float = 10.0) -> dict | None

Attempts to synchronously retrieve the oldest set of samples from all channels of the buffer with a timeout.

Parameters:

Name Type Description Default
timeout
float

How long to wait for new snapshot. Defaults to 10.0.

10.0

Returns:

Type Description
dict | None

dict | None: Multi-channel vector of the oldest sample or None if no new data became available until timeout.

BufferInterface

Bases: ABC

Interface for the multi-channel FIFO buffer.

Methods:

Name Description
plop

Asynchronously adds an element to the specified channel of the buffer.

yeet

Synchronously retrieves the oldest set of samples from all channels of the buffer.

plop abstractmethod

plop(key: str, data: dict) -> None

Asynchronously adds an element to the specified channel of the buffer.

Parameters:

Name Type Description Default
key
str

Unique identifier of the channel to add the data to.

required
data
dict

Usecase-specific nested dictionary of data to add.

required

yeet abstractmethod

yeet() -> Any

Synchronously retrieves the oldest set of samples from all channels of the buffer.

Returns:

Name Type Description
Any Any

Multi-channel vector of the oldest sample.

NonOverflowingCounterAlignedFifoBuffer

Bases: AlignedFifoBuffer

Multichannel first-in first-out buffer that aligns asynchronous temporally-lossy samples across channels by supplied correlated overflowing counter.

Methods:

Name Description
__init__

Constructor of the NonOverflowingCounterAlignedFifoBuffer.

flush

Allow to evict all present data because no new samples will be captured.

yeet

Attempts to synchronously retrieve the oldest set of samples from all channels of the buffer with a timeout.

__init__

__init__(
    keys: Iterable,
    timesteps_before_stale: int,
    num_bits_timestamp: int,
    maxlen: int | None = None,
)

Parameters:

Name Type Description Default
keys
Iterable

Set of uniquely identifying channel keys.

required
timesteps_before_stale
int

The number of samples in other channels after which a missing sample in a channel is marked missing.

required
num_bits_timestamp
int

The fixed-width of the counter generating the data.

required
maxlen
int | None

Fixed length of preallocated ring buffer. Defaults to None.

None

_put_output_queue

_put_output_queue(packet: dict) -> None

Places a ready to consume complete snapshot onto the output queue.

Parameters:

Name Type Description Default
packet
dict

Temporally aligned snapshot mapping unique channel-identifying keys to the corresponding sample.

required

flush

flush() -> None

Allow to evict all present data because no new samples will be captured.

yeet

yeet(timeout: float = 10.0) -> dict | None

Attempts to synchronously retrieve the oldest set of samples from all channels of the buffer with a timeout.

Parameters:

Name Type Description Default
timeout
float

How long to wait for new snapshot. Defaults to 10.0.

10.0

Returns:

Type Description
dict | None

dict | None: Multi-channel vector of the oldest sample or None if no new data became available until timeout.

NonOverflowingCounterConverter

A counter value converter from overflowing fixed range to non-overflowing values, starting at 0 for the first received sample.

Converts overflowing monotonically increasing counter from a sensor into a non-overflowing value, starting counting from 0, regardless of device's actual onboard counter.

Methods:

Name Description
__init__

Constructor of the NonOverflowingCounterConverter.

__init__

__init__(keys: Iterable[Any], num_bits_counter: int)

Parameters:

Name Type Description Default
keys
Iterable[Any]

Set of uniquely identifying channel keys.

required
num_bits_counter
int

The fixed-width of the counter generating the data.

required

_bar

_bar(key: Any, counter: int) -> int | None

Optimized counter converter function for continuous steady-state operation.

Previous counters are guaranteed to be non-0 after this function is activated.

Parameters:

Name Type Description Default
key
Any

The unique identifier of the channel.

required
counter
int

Monotonically increasing overflowing integer from a sensor.

required

Returns:

Type Description
int | None

int | None: Converted counter value starting at 0 counter value.

_foo

_foo(key: Any, counter: int) -> int | None

Startup multi-channel function that converts correlated over-flowing counter values from asynchronous sensors to a non-overflowing counter.

Switches to the continuous function when all channels have provided a starting sample. Else, reuses this function with branch conditional logic. The branched version is not ideal for continuous use, because other conditions never happen after the first iteration of the module on the channel.

Parameters:

Name Type Description Default
key
Any

The unique identifier of the channel.

required
counter
int

Monotonically increasing overflowing integer from a sensor.

required

Returns:

Type Description
int | None

int | None: Converted counter value starting at 0 counter value.

TimestampAlignedFifoBuffer

Bases: AlignedFifoBuffer

Multichannel first-in first-out buffer that aligns asynchronous temporally-lossy samples across channels by supplied correlated timestamps.

Allows yeeting from buffer if some keys have been empty for a while, while others continue producing.

Methods:

Name Description
__init__

Constructor of the TimestampAlignedFifoBuffer.

flush

Allow to evict all present data because no new samples will be captured.

yeet

Attempts to synchronously retrieve the oldest set of samples from all channels of the buffer with a timeout.

__init__

__init__(
    keys: Iterable,
    timesteps_before_stale: int,
    sampling_period: int,
    counter_limit: int,
    maxlen: int | None = None,
)

Parameters:

Name Type Description Default
keys
Iterable

Set of uniquely identifying channel keys.

required
timesteps_before_stale
int

The number of samples in other channels after which a missing sample in a channel is marked missing.

required
sampling_period
int

Sampling period in the same units as timestamp limit and timestamps.

required
counter_limit
int

The upper counting limit of the sensor's timestamp.

required
maxlen
int | None

Fixed length of preallocated ring buffer. Defaults to None.

None

_put_output_queue

_put_output_queue(packet: dict) -> None

Places a ready to consume complete snapshot onto the output queue.

Parameters:

Name Type Description Default
packet
dict

Temporally aligned snapshot mapping unique channel-identifying keys to the corresponding sample.

required

flush

flush() -> None

Allow to evict all present data because no new samples will be captured.

yeet

yeet(timeout: float = 10.0) -> dict | None

Attempts to synchronously retrieve the oldest set of samples from all channels of the buffer with a timeout.

Parameters:

Name Type Description Default
timeout
float

How long to wait for new snapshot. Defaults to 10.0.

10.0

Returns:

Type Description
dict | None

dict | None: Multi-channel vector of the oldest sample or None if no new data became available until timeout.

TimestampToCounterConverter

A counter value converter from overflowing fixed range timestamp to non-overflowing counter values, starting at 0 for the first received sample.

Converts overflowing monotonically increasing timestamp of a certain sample rate from a sensor into a non-overflowing counter value, starting counting from 0, regardless of device's actual onboard timestamp.

Methods:

Name Description
__init__

Constructor of the TimestampToCounterConverter.

__init__

__init__(keys: Iterable[Any], sampling_period: int, counter_limit: int)

Parameters:

Name Type Description Default
keys
Iterable[Any]

Set of uniquely identifying channel keys.

required
sampling_period
int

Sampling period in the same units as timestamp limit and timestamps.

required
counter_limit
int

The upper counting limit of the sensor's timestamp.

required

_bar

_bar(key: Any, timestamp: int | float) -> int | float | None

Optimized counter converter function for continuous steady-state operation.

Measures the change in time between 2 measurements w.r.t. sensor device time and the max value before overlow. dt > 0 always thanks to modulo, even if sensor on-board clock overflows. Converts to the number of sample periods in the measured time delta window, allowing for slight skew. Rolling correlation using sample rate, previous and current time is more accurate than averaging over whole timelife.

Parameters:

Name Type Description Default
key
Any

The unique identifier of the channel.

required
timestamp
int

Monotonically increasing overflowing integer from a sensor.

required

Returns:

Type Description
int | float | None

int | None: Converted counter value starting at 0 counter value.

_foo

_foo(key: Any, timestamp: int | float) -> int | float | None

Startup multi-channel function that converts correlated over-flowing timestamp values from asynchronous sensors to a non-overflowing counter.

Sets the start time according to the first received packet and switches to the monotone calculation routine after. Has some tolerance to temporally skewed samples, when the skew is less than hald a sampling period. If the channel sample is the first in the overall buffer, will use it as reference starting point onward. Will return 0 start counter at the end of the function.

If it's not the very first packet, but first reading for this device, records if the capture was during or after the start reference. If the measurement taken during or after the reference measurement and no chance for overflow, will return 0 start counter at the end of the function. If the measurement taken after the overflow of the on-sensor clock and effectively after the reference measurement, will return 0 start counter at the end of the function. Will discard the sample as stale to ensure alignment otherwise.

Parameters:

Name Type Description Default
key
Any

The unique identifier of the channel.

required
timestamp
int

Monotonically increasing overflowing integer from a sensor.

required

Returns:

Type Description
int | float | None

int | None: Converted counter value starting at 0 counter value.