hermes.base.nodes
The module is responsible for wrapping common functionality of Producer Nodes. It defines a unified interface to the ZeroMQ message exchange layer for all concrete Producers.
producer_interface - base interface for Producer function.
producer - abstract Producer with common data generation logic to subclass.
    Classes:
| Name | Description | 
|---|---|
| ProducerInterface | Interface for the Producer Node component. | 
    
              Bases: NodeInterface
Interface for the Producer Node component.
Methods:
| Name | Description | 
|---|---|
| create_stream | Instantiate Stream datastructure object specific to this Pipeline. | 
abstractmethod property
    Read-only property indicating whether the Node has completed its operation.
Returns:
| Name | Type | Description | 
|---|---|---|
| bool | bool | Whether the Node completed its function. | 
abstractmethod
    Start listening for new data from other Nodes.
abstractmethod
    Start listening for the KILL signal on the special PUB/SUB socket that coordinates program termination.
abstractmethod
    Connect to device via its corresponding backend.
Returns:
| Name | Type | Description | 
|---|---|---|
| bool | bool | Whether connection to the device succeeded. | 
abstractmethod
    Stop listening for the KILL signal.
abstractmethod
    Get reference to the socket used for synchronization of the Node to its master Broker.
Returns:
| Type | Description | 
|---|---|
| SyncSocket | zmq.SyncSocket: ZeroMQ socket of the Node connected to the local master Broker. | 
abstractmethod
    Node-specific initialization procedure.
Pre-run setup of the backend specific to the Node implementation. Generic setup should be run first.
abstractmethod
    Node-specific, externally triggered function to start keeping streamed data in memory.
abstractmethod classmethod
    Read-only property uniquely identifying the Node.
Returns:
| Name | Type | Description | 
|---|---|---|
| str | str | Unique key identifying the Node in the data exchange. | 
abstractmethod
_on_poll(poll_res: tuple[list[SyncSocket], list[int]]) -> None
Callback to perform some logic every time data transactions are received by the Poller.
Generic entry-point for all types of Nodes, based on their active Poller settings. NOTE: if the Node is in JoinState, the kill socket is no longer in the Poller and only higher-level logic is triggered.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| poll_res | tuple[list[SyncSocket], list[int]] | Reference to the complete captured result of listening by the Poller. | required |
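The `poll_res` argument pairs each ready socket with its event mask. As a rough illustration of how an `_on_poll` implementation might dispatch on that tuple, the sketch below uses plain strings in place of sockets so it runs without a ZeroMQ context. The `dispatch_poll_result` helper and the handler mappings are hypothetical and not part of the library; the flag values are assumed to match `zmq.POLLIN` and `zmq.POLLOUT`.

```python
from typing import Callable

# ZeroMQ event flags; the values are assumed to match zmq.POLLIN and zmq.POLLOUT.
POLLIN = 1
POLLOUT = 2


def dispatch_poll_result(
    poll_res: tuple[list[str], list[int]],
    on_readable: dict[str, Callable[[], None]],
    on_writable: dict[str, Callable[[], None]],
) -> None:
    """Call the handler registered for each socket that reported an event.

    Hypothetical helper: sockets are represented by string keys here so the
    example runs without a ZeroMQ context.
    """
    sockets, events = poll_res
    for sock, event in zip(sockets, events):
        if event & POLLIN and sock in on_readable:
            on_readable[sock]()
        if event & POLLOUT and sock in on_writable:
            on_writable[sock]()


calls: list[str] = []
dispatch_poll_result(
    (["killsig", "data"], [POLLIN, POLLIN | POLLOUT]),
    on_readable={
        "killsig": lambda: calls.append("kill"),
        "data": lambda: calls.append("recv"),
    },
    on_writable={"data": lambda: calls.append("send")},
)
```

Keeping the dispatch table separate from the poll loop means a Node in JoinState could simply drop the kill-socket entry while the rest of the handling stays unchanged.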
abstractmethod
    Callback to perform some logic after synchronization of Nodes is completed and indicated by the Broker.
abstractmethod
    Device-specific procedure for round-trip time estimation.
A concrete Producer implementation must override this method if it needs to measure transmission delay for real-time or post-processing alignment of modalities that don't support system clock synchronization.
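One common way to approximate transmission delay is to time several request/acknowledge round trips and halve the median. The sketch below illustrates that idea only; `estimate_one_way_delay` and the `ping` callable are hypothetical names, and the halving assumes a roughly symmetric link, which may not hold for every device.

```python
import statistics
import time
from typing import Callable


def estimate_one_way_delay(ping: Callable[[], None], n_samples: int = 5) -> float:
    """Estimate one-way transmission delay in seconds as half the median RTT.

    `ping` is a hypothetical blocking call that returns once the device has
    acknowledged a request. Assumes the link is roughly symmetric.
    """
    round_trips = []
    for _ in range(n_samples):
        start = time.perf_counter()
        ping()
        round_trips.append(time.perf_counter() - start)
    return statistics.median(round_trips) / 2.0


# Stand-in for a device round trip that takes about 10 ms.
delay_s = estimate_one_way_delay(lambda: time.sleep(0.01))
```

The median is used instead of the mean so that a single slow round trip does not skew the estimate.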
abstractmethod
    Block until new ZeroMQ data is collected at the Poller.
Listens for events when new data is received from or when new data can be written to sockets, based on the active Poller settings of the Node implementation.
Returns:
| Type | Description | 
|---|---|
| tuple[list[SyncSocket], list[int]] | tuple[list[zmq.SyncSocket], list[int]]: Result of listening on the sockets registered by the Poller. | 
abstractmethod
    Main iteration loop logic for the Node during its running phase.
Acquire data from your sensor as desired at each timestep. The SDK thread pushes data into a shared memory space; this thread pulls the data and does all the processing, ensuring that lost packets are the responsibility of the slow consumer.
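The hand-off described above can be sketched with a bounded buffer that the writer never blocks on. This is an illustration under assumptions, not the library's implementation: `sdk_thread`, `node_loop`, and the buffer size are hypothetical, and the writer is joined before reading only to simulate a reader that has fallen behind.

```python
import threading
import time
from collections import deque

# Bounded buffer shared between the SDK thread and the Node thread.
# With maxlen set, appending to a full deque silently discards the oldest
# item, so a slow reader loses packets instead of stalling the writer.
buffer: deque = deque(maxlen=4)
done = threading.Event()


def sdk_thread() -> None:
    """Hypothetical SDK callback thread: pushes samples and never blocks."""
    for sample in range(10):
        buffer.append(sample)
    done.set()


def node_loop() -> list:
    """Hypothetical Node loop: pulls samples and does all the processing."""
    processed = []
    while not (done.is_set() and not buffer):
        try:
            processed.append(buffer.popleft())
        except IndexError:
            time.sleep(0.001)  # nothing buffered yet
    return processed


writer = threading.Thread(target=sdk_thread)
writer.start()
writer.join()  # let the writer finish first to simulate a slow reader
processed = node_loop()
```

Because the reader starts only after ten samples were pushed into a four-slot buffer, it sees just the four most recent samples; the earlier ones were dropped on the reader's side of the hand-off.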
abstractmethod
    Send a slave KILL signal to the local Broker in case program termination by the slave Node is recorded.
abstractmethod
_set_state(state) -> None
User-defined logic for FSM state transition.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| state | Any | New state to transition to. | required |
abstractmethod
    Stop sampling data and continue sending already captured data until none is left.
abstractmethod
    Trigger the Node's internal procedures and background threads to wrap up gracefully.
Producer: stops sampling data and continues sending already captured data until none is left, with the last message labeled 'END'. Consumer: continues listening for data until each subscribed Producer has sent its last message. Pipeline: continues listening for data and producing results until each data source has sent its last message, and then labels its own last message with 'END'.
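For the Producer case, the wrap-up amounts to draining what was already captured and then sending a final marker. The sketch below shows that sequence under assumptions: `drain_and_finish`, the `send` callable, and the `(label, payload)` message shape are hypothetical stand-ins, not the library's wire format.

```python
import queue

END = "END"


def drain_and_finish(captured: queue.Queue, send) -> int:
    """Send everything already captured, then a final empty 'END' message.

    `captured` holds samples taken before sampling stopped; `send` is a
    hypothetical callable standing in for the Node's publish step.
    Returns the number of data messages sent.
    """
    n_sent = 0
    while True:
        try:
            sample = captured.get_nowait()
        except queue.Empty:
            break
        send(("data", sample))
        n_sent += 1
    send((END, None))  # last message tells subscribers this source is done
    return n_sent


captured: queue.Queue = queue.Queue()
for sample in (1, 2, 3):
    captured.put(sample)

sent: list = []
n_sent = drain_and_finish(captured, sent.append)
```

A Consumer following the same convention would keep listening until it has seen one 'END' message from each Producer it subscribed to.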
abstractmethod classmethod
create_stream(stream_spec: dict) -> Stream
Instantiate Stream datastructure object specific to this Pipeline.
Should also be a class method to create Stream objects on consumers.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| stream_spec | dict | Mapping of corresponding Stream object parameters to user-defined configuration values. | required |
Returns:
| Name | Type | Description | 
|---|---|---|
| Stream | Stream | Datastructure object of the corresponding Node, configured according to the user-provided specification. | 
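Declaring `create_stream` as an abstract class method lets other components build a matching Stream without instantiating the Node. The sketch below illustrates the pattern with stand-in classes; `NodeSketch`, `ImuProducerSketch`, `ImuStream`, and this minimal `Stream` are hypothetical, and unpacking `stream_spec` into the constructor is an assumption about how a concrete Node might use it.

```python
from abc import ABC, abstractmethod


class Stream:
    """Hypothetical stand-in for the library's Stream data structure."""

    def __init__(self, sampling_rate_hz: float) -> None:
        self.sampling_rate_hz = sampling_rate_hz


class NodeSketch(ABC):
    """Hypothetical interface showing only the factory method."""

    @classmethod
    @abstractmethod
    def create_stream(cls, stream_spec: dict) -> Stream:
        """Build the Stream that matches this Node type."""


class ImuStream(Stream):
    """Hypothetical device-specific Stream."""


class ImuProducerSketch(NodeSketch):
    @classmethod
    def create_stream(cls, stream_spec: dict) -> Stream:
        # Unpack the user configuration into the Stream constructor.
        return ImuStream(**stream_spec)


# No Producer instance is needed, so a consumer can build a matching Stream.
stream = ImuProducerSketch.create_stream({"sampling_rate_hz": 100.0})
```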
    Classes:
| Name | Description | 
|---|---|
| Producer | An abstract class wrapping an interface with a particular device into a Producer Node. | 
    
              Bases: ProducerInterface, Node
An abstract class wrapping an interface with a particular device into a Producer Node.
Methods:
| Name | Description | 
|---|---|
| __call__ | Node objects are callable to start the FSM as entry-point. | 
| __init__ | Constructor of the Producer parent class. | 
| create_stream | Instantiate Stream datastructure object specific to this Pipeline. | 
__init__(
    host_ip: str,
    stream_out_spec: dict,
    logging_spec: dict,
    sampling_rate_hz: float = float("nan"),
    port_pub: str = PORT_BACKEND,
    port_sync: str = PORT_SYNC_HOST,
    port_killsig: str = PORT_KILL,
    transmit_delay_sample_period_s: float = float("nan"),
) -> None
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| host_ip | str | IP address of the local master Broker. | required |
| stream_out_spec | dict | Mapping of corresponding Stream object parameters to user-defined configuration values. | required |
| logging_spec | dict | Mapping of Storage object parameters to user-defined configuration values. | required |
| sampling_rate_hz | float | Expected sample rate of the device. Defaults to float('nan'). | float('nan') |
| port_pub | str | Local port to publish to for local master Broker to relay. Defaults to PORT_BACKEND. | PORT_BACKEND |
| port_sync | str | Local port to listen to for local master Broker's startup coordination. Defaults to PORT_SYNC_HOST. | PORT_SYNC_HOST |
| port_killsig | str | Local port to listen to for local master Broker's termination signal. Defaults to PORT_KILL. | PORT_KILL |
| transmit_delay_sample_period_s | float | Duration of the period over which to estimate propagation delay of measurements from the corresponding device. Defaults to float('nan'). | float('nan') |
abstractmethod
    Connect to device via its corresponding backend.
Returns:
| Name | Type | Description | 
|---|---|---|
| bool | bool | Whether connection to the device succeeded. | 
abstractmethod
    Node-specific, externally triggered function to start keeping streamed data in memory.
abstractmethod classmethod
    Read-only property uniquely identifying the Node.
Returns:
| Name | Type | Description | 
|---|---|---|
| str | str | Unique key identifying the Node in the data exchange. | 
abstractmethod
    Device-specific procedure for round-trip time estimation.
A concrete Producer implementation must override this method if it needs to measure transmission delay for real-time or post-processing alignment of modalities that don't support system clock synchronization.
abstractmethod
    Main iteration loop logic for the Node during its running phase.
Acquire data from your sensor as desired at each timestep. The SDK thread pushes data into a shared memory space; this thread pulls the data and does all the processing, ensuring that lost packets are the responsibility of the slow consumer.
_publish(tag: str, **kwargs) -> None
Common method to save and publish the captured sample.
It is best to deal with the data structure (threading primitives) AFTER handing off the packet to ZeroMQ. That way the network thread can already start processing the packet.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| tag | str | Uniquely identifying key for the modality to label data for message exchange. | required |
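The ordering advice for publishing (hand the packet off first, touch lock-protected state second) can be sketched as follows. This is an illustration under assumptions only: `PublishSketch`, its `send` callable, and the JSON encoding are hypothetical and say nothing about how the library actually serializes or stores data.

```python
import json
import threading


class PublishSketch:
    """Hypothetical publisher: hands the packet off before touching shared state."""

    def __init__(self, send) -> None:
        self._send = send  # stands in for a ZeroMQ socket send call
        self._lock = threading.Lock()
        self._stored: list = []
        self.order: list = []  # records the order of operations for illustration

    def publish(self, tag: str, **kwargs) -> None:
        # JSON is used here only as a placeholder serialization.
        packet = json.dumps({"tag": tag, "data": kwargs}).encode()
        # 1. Hand off to the network layer first so it can start working.
        self._send(packet)
        self.order.append("sent")
        # 2. Only then take the lock and update the local data structure.
        with self._lock:
            self._stored.append(kwargs)
        self.order.append("stored")


wire: list = []
node = PublishSketch(send=wire.append)
node.publish("imu", acc=[0.0, 0.0, 9.81])
```

Sending before acquiring the lock keeps lock contention on the local store from delaying the network hand-off.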
    Send an empty 'END' packet and label the Node as done to safely finish and exit the process and its threads.
abstractmethod
    Stop sampling data and continue sending already captured data until none is left.
_store_and_broadcast(tag: str, **kwargs) -> None
Place captured data into the corresponding Stream datastructure and transmit serialized ZeroMQ packets to subscribers.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| tag | str | Uniquely identifying key for the modality to label data for message exchange. | required |
abstractmethod classmethod
create_stream(stream_spec: dict) -> Stream
Instantiate Stream datastructure object specific to this Pipeline.
Should also be a class method to create Stream objects on consumers.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| stream_spec | dict | Mapping of corresponding Stream object parameters to user-defined configuration values. | required |
Returns:
| Name | Type | Description | 
|---|---|---|
| Stream | Stream | Datastructure object of the corresponding Node, configured according to the user-provided specification. |