
hermes.base.nodes

This module wraps common functionality of Pipeline Nodes. It defines a unified interface to the ZeroMQ message exchange layer for all concrete Pipelines.

pipeline_interface - base interface for Pipeline function.

pipeline - abstract Pipeline with common data generation and ingestion logic to subclass.

pipeline_interface

Classes:

Name Description
PipelineInterface

Interface for the Pipeline Node component.

PipelineInterface

Bases: NodeInterface

Interface for the Pipeline Node component.

Methods:

Name Description
create_stream

Instantiate Stream datastructure object specific to this Pipeline.

_is_done abstractmethod property

_is_done: bool

Read-only property identifying if the Node completed operation.

Returns:

Name Type Description
bool bool

Whether the Node completed its function.

_activate_data_poller abstractmethod

_activate_data_poller() -> None

Start listening for new data from other Nodes.

_activate_kill_poller abstractmethod

_activate_kill_poller() -> None

Start listening for the KILL signal on the special PUB/SUB socket that coordinates program termination.

_deactivate_kill_poller abstractmethod

_deactivate_kill_poller() -> None

Stop listening for the KILL signal.

_get_sync_socket abstractmethod

_get_sync_socket() -> SyncSocket

Get reference to the socket used for synchronization of the Node to its master Broker.

Returns:

Type Description
SyncSocket

zmq.SyncSocket: ZeroMQ socket of the Node connected to the local master Broker.

_initialize abstractmethod

_initialize() -> None

Node-specific initialization procedure.

Pre-run setup of the backend specific to the Node implementation. Generic setup should run first.

_log_source_tag abstractmethod classmethod

_log_source_tag() -> str

Read-only property uniquely identifying the Node.

Returns:

Name Type Description
str str

Unique key identifying the Node in the data exchange.

_on_poll abstractmethod

_on_poll(poll_res: tuple[list[SyncSocket], list[int]]) -> None

Callback invoked every time data transactions are received by the Poller.

Generic entry point for all types of Nodes, based on their active Poller settings. NOTE: if the Node is in JoinState, the kill socket is no longer registered with the Poller and only higher-level logic is triggered.

Parameters:

Name Type Description Default
poll_res
tuple[list[SyncSocket], list[int]]

Reference to the complete captured result of listening by the Poller.

required
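As a rough illustration of the dispatch this callback performs, the sketch below pairs each returned socket with its event flag and routes it to the matching handler. The socket attributes, handler bodies, and `SketchNode` class are hypothetical stand-ins, not the actual hermes API.

```python
# Hypothetical sketch: dispatching a poll result to per-socket handlers.
# POLLIN mirrors the zmq constant; the sockets and handlers are
# placeholders, not real hermes internals.
POLLIN = 1

class SketchNode:
    def __init__(self):
        self.data_socket = object()   # stand-in for the SUB data socket
        self.kill_socket = object()   # stand-in for the KILL SUB socket
        self.events = []

    def _on_poll(self, poll_res):
        sockets, flags = poll_res
        for sock, flag in zip(sockets, flags):
            if sock is self.kill_socket and flag & POLLIN:
                self.events.append("kill")   # would begin graceful shutdown
            elif sock is self.data_socket and flag & POLLIN:
                self.events.append("data")   # would read and process packets

node = SketchNode()
node._on_poll(([node.data_socket, node.kill_socket], [POLLIN, POLLIN]))
```

In JoinState, per the note above, only the data branch would remain active because the kill socket is unregistered.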

_on_sync_complete abstractmethod

_on_sync_complete() -> None

Callback to perform some logic after synchronization of Nodes is completed and indicated by the Broker.

_poll abstractmethod

_poll() -> tuple[list[SyncSocket], list[int]]

Block for new ZeroMQ data to collect at the Poller.

Listens for events indicating that new data can be read from, or written to, sockets, based on the active Poller settings of the Node implementation.

Returns:

Type Description
tuple[list[SyncSocket], list[int]]

tuple[list[zmq.SyncSocket], list[int]]: Result of listening on the sockets registered by the Poller.

_process_data abstractmethod

_process_data(topic: str, msg: dict) -> None

Main iteration loop logic for the Node during its running phase.

The contained logic must handle multiple asynchronous modalities and must end with a call to _send_end_packet.

Parameters:

Name Type Description Default
topic
str

Unique identifier for the modality of the contained data.

required
msg
dict

Received data of the corresponding modality.

required
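A minimal sketch of what a concrete `_process_data` might look like, buffering two asynchronous modalities and fusing them once both have pending samples. The modality names, the fusion step, and the `FusionSketch` class are assumptions for illustration only; the real contract is simply that the logic ends by calling `_send_end_packet`.

```python
# Hypothetical sketch of a _process_data implementation aligning two
# asynchronous modalities; names and fusion logic are illustrative.
from collections import deque

class FusionSketch:
    def __init__(self):
        self._buffers = {"imu": deque(), "camera": deque()}
        self.results = []
        self._stopping = False
        self.done = False

    def _process_data(self, topic: str, msg: dict) -> None:
        self._buffers[topic].append(msg)
        # Fuse once every modality has at least one pending sample.
        while all(self._buffers.values()):
            fused = {t: b.popleft() for t, b in self._buffers.items()}
            self.results.append(fused)
        # Per the contract, finish by sending the END packet once stopping
        # and no buffered data remains.
        if self._stopping and not any(self._buffers.values()):
            self._send_end_packet()

    def _send_end_packet(self) -> None:
        self.done = True

p = FusionSketch()
p._process_data("imu", {"t": 0})
p._process_data("camera", {"t": 1})
```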

_send_kill_to_broker abstractmethod

_send_kill_to_broker() -> None

Send a slave KILL signal to the local Broker when the slave Node records program termination.

_set_state abstractmethod

_set_state(state) -> None

User-defined logic for FSM state transition.

Parameters:

Name Type Description Default
state
Any

New state to transition to.

required

_stop_new_data abstractmethod

_stop_new_data() -> None

Stop sampling new data; continue sending already captured data until none is left.

_trigger_stop abstractmethod

_trigger_stop() -> None

Trigger the Node's internal procedures and background threads to gracefully wrap up.

Producer: stops sampling data and continues sending already captured data until none is left, labeling the last message 'END'. Consumer: continues listening for data until each subscribed Producer has sent its last message. Pipeline: continues listening for data and producing results until each data source has sent its last message, then labels its own last message 'END'.

create_stream abstractmethod classmethod

create_stream(stream_spec: dict) -> Stream

Instantiate Stream datastructure object specific to this Pipeline.

Must be a class method so that Stream objects can also be created on consumers.

Parameters:

Name Type Description Default
stream_spec
dict

Mapping of corresponding Stream object parameters to user-defined configuration values.

required

Returns:

Name Type Description
Stream Stream

Datastructure object of the corresponding Node, configured according to the user-provided specification.
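A sketch of what a concrete `create_stream` override might look like. The `Stream` fields and the `PoseEstimatorSketch` class are assumptions for illustration; the actual Stream datastructure is defined elsewhere in the framework.

```python
# Hypothetical sketch: a concrete Pipeline's create_stream mapping a
# user spec dict onto a Stream-like dataclass. Field names are assumed.
from dataclasses import dataclass, field

@dataclass
class Stream:
    sampling_rate_hz: float
    buffer: list = field(default_factory=list)

class PoseEstimatorSketch:
    @classmethod
    def create_stream(cls, stream_spec: dict) -> Stream:
        # Map user-configured values onto the Stream's parameters,
        # falling back to an assumed default rate.
        return Stream(sampling_rate_hz=stream_spec.get("sampling_rate_hz", 30.0))

stream = PoseEstimatorSketch.create_stream({"sampling_rate_hz": 60.0})
```

Because it is a class method, a consumer can build the same Stream object from the spec without instantiating the Pipeline itself.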

pipeline

Classes:

Name Description
Pipeline

An abstract class to interface with a data-producing worker.

Pipeline

Bases: PipelineInterface, Node

An abstract class to interface with a data-producing worker.

Methods:

Name Description
__call__

Node objects are callable to start the FSM as entry-point.

__init__

Constructor of the Pipeline parent class.

create_stream

Instantiate Stream datastructure object specific to this Pipeline.

__call__

__call__()

Node objects are callable to start the FSM as entry-point.

__init__

__init__(
    host_ip: str,
    stream_out_spec: dict,
    stream_in_specs: list[dict],
    logging_spec: dict,
    port_pub: str = PORT_BACKEND,
    port_sub: str = PORT_FRONTEND,
    port_sync: str = PORT_SYNC_HOST,
    port_killsig: str = PORT_KILL,
) -> None

Parameters:

Name Type Description Default
host_ip
str

IP address of the local master Broker.

required
stream_out_spec
dict

Mapping of corresponding Stream object parameters to user-defined configuration values.

required
stream_in_specs
list[dict]

List of mappings of user-configured incoming modalities.

required
logging_spec
dict

Mapping of Storage object parameters to user-defined configuration values.

required
port_pub
str

Local port to publish to for local master Broker to relay. Defaults to PORT_BACKEND.

PORT_BACKEND
port_sub
str

Local port to subscribe to for incoming relayed data from the local master Broker. Defaults to PORT_FRONTEND.

PORT_FRONTEND
port_sync
str

Local port to listen on for the local master Broker's startup coordination. Defaults to PORT_SYNC_HOST.

PORT_SYNC_HOST
port_killsig
str

Local port to listen on for the local master Broker's termination signal. Defaults to PORT_KILL.

PORT_KILL
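A hedged sketch of how the constructor arguments might be assembled. The key names inside the spec dicts, the log directory, and `MyPipeline` are all hypothetical; consult the concrete subclass and the Stream/Storage definitions for the real parameter names.

```python
# Hypothetical constructor arguments for a concrete Pipeline subclass.
# All dict keys below are illustrative assumptions, not documented names.
host_ip = "127.0.0.1"
stream_out_spec = {"sampling_rate_hz": 30.0}         # outgoing Stream parameters
stream_in_specs = [                                  # one mapping per incoming modality
    {"topic": "imu"},
    {"topic": "camera"},
]
logging_spec = {"log_dir": "/tmp/hermes-logs"}       # Storage parameters

# pipeline = MyPipeline(
#     host_ip=host_ip,
#     stream_out_spec=stream_out_spec,
#     stream_in_specs=stream_in_specs,
#     logging_spec=logging_spec,
# )   # ports fall back to PORT_BACKEND / PORT_FRONTEND / PORT_SYNC_HOST / PORT_KILL
```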

_log_source_tag abstractmethod classmethod

_log_source_tag() -> str

Read-only property uniquely identifying the Node.

Returns:

Name Type Description
str str

Unique key identifying the Node in the data exchange.

_poll_data_packets

_poll_data_packets() -> None

Receive data packets in a steady state.

Called every time one of the requested modalities produces new data. In normal operation, all messages are 2-part.

_poll_ending_data_packets

_poll_ending_data_packets() -> None

Receive data packets from producers and monitor for end-of-stream signal.

When the system triggers a safe exit, the Pipeline receives a mix of normal 2-part messages and a 3-part 'END' message from each Producer that safely exited. Dynamically switching the callback is more efficient than checking every message.

Processes packets on each modality until all data sources have sent their 'END' packet. Once triggered to stop and no data remains, sends an empty 'END' packet and joins.
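The end-of-stream bookkeeping described here can be sketched as follows: a 3-part frame marks a Producer's 'END', and the Pipeline sends its own 'END' once every tracked producer has finished. The frame layout, topic set, and class name are assumptions for illustration.

```python
# Hypothetical sketch of the ending-phase bookkeeping: normal messages
# have 2 parts, 'END' messages 3. Names and frame layout are assumed.
class EndingPollSketch:
    def __init__(self, producers):
        self._pending = set(producers)   # producers that have not yet ended
        self.done = False

    def _poll_ending_data_packets(self, frames: list) -> None:
        topic = frames[0].decode()
        if len(frames) == 3:             # 3-part message carries the END label
            self._pending.discard(topic)
            if not self._pending:        # every data source has sent 'END'
                self._send_end_packet()
        else:                            # normal 2-part data message
            pass                         # would call self._process_data(...) here

    def _send_end_packet(self) -> None:
        self.done = True                 # label the Node as done and join

node = EndingPollSketch({"imu", "camera"})
node._poll_ending_data_packets([b"imu", b"{}", b"END"])
node._poll_ending_data_packets([b"camera", b"{}", b"END"])
```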

_process_data abstractmethod

_process_data(topic: str, msg: dict) -> None

Main iteration loop logic for the Node during its running phase.

The contained logic must handle multiple asynchronous modalities and must end with a call to _send_end_packet.

Parameters:

Name Type Description Default
topic
str

Unique identifier for the modality of the contained data.

required
msg
dict

Received data of the corresponding modality.

required

_publish

_publish(tag: str, **kwargs) -> None

Pass generated data to the ZeroMQ message exchange layer.

Parameters:

Name Type Description Default
tag
str

Uniquely identifying key for the data generated by the Node.

required

_send_end_packet

_send_end_packet() -> None

Send an empty 'END' packet and mark the Node as done, so the process and its threads can finish and exit safely.

_stop_new_data abstractmethod

_stop_new_data() -> None

Stop sampling new data; continue sending already captured data until none is left.

_store_and_broadcast

_store_and_broadcast(tag: str, **kwargs) -> None

Place captured data into the corresponding Stream datastructure and transmit serialized ZeroMQ packets to subscribers.

Parameters:

Name Type Description Default
tag
str

Uniquely identifying key for the modality to label data for message exchange.

required
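The store-then-transmit behavior can be sketched like this, with a per-tag buffer standing in for the Stream datastructure and a `[topic, payload]` multipart frame standing in for the ZeroMQ packet. The serialization format and class name are assumptions; JSON is used here purely for illustration.

```python
# Hypothetical sketch: buffer captured data per tag, then build a
# serialized multipart frame for subscribers. JSON is illustrative only.
import json

class StoreBroadcastSketch:
    def __init__(self):
        self._streams = {}   # stand-in for per-modality Stream objects
        self.sent = []       # stand-in for the PUB socket's outgoing frames

    def _store_and_broadcast(self, tag: str, **kwargs) -> None:
        # Place the sample into the buffer for this modality...
        self._streams.setdefault(tag, []).append(kwargs)
        # ...then transmit a [topic, payload] multipart packet.
        self.sent.append([tag.encode(), json.dumps(kwargs).encode()])

node = StoreBroadcastSketch()
node._store_and_broadcast("pose", t=0.0, x=1.0)
```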

create_stream abstractmethod classmethod

create_stream(stream_spec: dict) -> Stream

Instantiate Stream datastructure object specific to this Pipeline.

Must be a class method so that Stream objects can also be created on consumers.

Parameters:

Name Type Description Default
stream_spec
dict

Mapping of corresponding Stream object parameters to user-defined configuration values.

required

Returns:

Name Type Description
Stream Stream

Datastructure object of the corresponding Node, configured according to the user-provided specification.