hermes.base.broker

The module is responsible for managing the lifecycle of all locally-hosted Nodes. It is also the program's entrypoint to the continuous sensing and processing.

broker_interface - base interface for Broker function.

broker - concrete FSM-based Broker with distributed LAN connectivity.

broker_states - FSM states of the Broker.

broker_interface

Classes:

Name Description
BrokerInterface

Interface for the Broker component.

BrokerInterface

Bases: ABC

Interface for the Broker component.

_activate_pubsub_poller abstractmethod

_activate_pubsub_poller() -> None

Register PUB-SUB sockets on both Broker interfaces for polling.

_add_brokered_node abstractmethod

_add_brokered_node(topic: str) -> None

Add a unique local Node identifier joining the exchange via the Broker.

Nodes uniquely self-identify by the data topic they produce.

Parameters:

Name Type Description Default
topic
str

Unique identifier of the Node.

required
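
A minimal sketch of how a Broker implementation might track these identifiers, assuming a plain set keyed by topic. The class name NodeRegistry and the topic strings are hypothetical and not part of hermes; the real bookkeeping may differ.

```python
from __future__ import annotations


class NodeRegistry:
    """Hypothetical bookkeeping of locally brokered Nodes, keyed by topic."""

    def __init__(self) -> None:
        self._topics: set[str] = set()

    def add(self, topic: str) -> None:
        # A set ignores duplicates, so each topic is stored at most once.
        self._topics.add(topic)

    def remove(self, topic: str) -> None:
        # discard() is a no-op when the topic was never registered.
        self._topics.discard(topic)

    def get(self) -> set[str]:
        # Return a copy so callers cannot mutate the internal set.
        return set(self._topics)


registry = NodeRegistry()
for topic in ("imu", "camera", "imu"):  # example topics, not real Node names
    registry.add(topic)
registry.remove("camera")
remaining = registry.get()
```

Because the topic doubles as the identifier, registering the same topic twice leaves a single entry.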

_broker_packets abstractmethod

_broker_packets(
    poll_res: ZMQResult,
    on_data_received: Callable[[list[bytes]], None] = lambda _: None,
    on_subscription_changed: Callable[[list[bytes]], None] = lambda _: None,
) -> None

Move packets between publishers and subscribers, local and remote.

Parameters:

Name Type Description Default
poll_res
ZMQResult

New ZeroMQ packets from PUB or SUB interfaces.

required
on_data_received
Callable[[list[bytes]], None]

Callback for data packets. Defaults to a no-op, lambda _: None.

lambda _: None
on_subscription_changed
Callable[[list[bytes]], None]

Callback for subscription status packets. Defaults to a no-op, lambda _: None.

lambda _: None
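
The two optional callbacks let a caller observe traffic without changing the forwarding itself. The sketch below illustrates that hook pattern only: broker_packets here is a hypothetical free function that takes already-received multipart messages instead of a ZMQResult, and it does no real forwarding. The example subscription frame follows the ZeroMQ XPUB convention of a leading 0x01 byte followed by the topic.

```python
from __future__ import annotations

from typing import Callable

Packet = list[bytes]  # one multipart message, as a list of frames


def broker_packets(
    data_packets: list[Packet],
    subscription_packets: list[Packet],
    on_data_received: Callable[[Packet], None] = lambda _: None,
    on_subscription_changed: Callable[[Packet], None] = lambda _: None,
) -> None:
    for msg in data_packets:
        # A real Broker would forward msg towards its subscribers here.
        on_data_received(msg)
    for msg in subscription_packets:
        # A real Broker would forward msg towards its publishers here.
        on_subscription_changed(msg)


seen_topics: list[bytes] = []
subscribed: list[bytes] = []
broker_packets(
    data_packets=[[b"imu", b"payload"]],
    subscription_packets=[[b"\x01imu"]],
    on_data_received=lambda msg: seen_topics.append(msg[0]),
    # Strip the leading subscribe byte to recover the topic.
    on_subscription_changed=lambda msg: subscribed.append(msg[0][1:]),
)
```

Leaving both callbacks at their no-op defaults gives plain brokering with no side effects.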

_check_for_kill abstractmethod

_check_for_kill(poll_res: ZMQResult) -> bool

Check if received packets contain a kill signal from a downstream Broker.

Parameters:

Name Type Description Default
poll_res
ZMQResult

New ZeroMQ packets from PUB or SUB interfaces.

required

Returns:

Name Type Description
bool bool

Whether a KILL message is contained in messages.
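
A minimal sketch of such a check, assuming the polled messages are already available as lists of frames and that the kill signal is a dedicated frame. KILL_FRAME and contains_kill are hypothetical names; the marker actually used by hermes may differ.

```python
from __future__ import annotations

KILL_FRAME = b"KILL"  # hypothetical marker, not the real hermes constant


def contains_kill(messages: list[list[bytes]]) -> bool:
    # Match whole frames only, so payloads that merely contain the
    # bytes "KILL" are not mistaken for a kill signal.
    return any(KILL_FRAME in frames for frames in messages)


has_kill = contains_kill([[b"imu", b"data"], [b"cmd", KILL_FRAME]])
no_kill = contains_kill([[b"imu", b"KILLER"]])
```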

_deactivate_pubsub_poller abstractmethod

_deactivate_pubsub_poller() -> None

Stop listening on the PUB or SUB interfaces for new data packets.

_get_brokered_nodes abstractmethod

_get_brokered_nodes() -> set[str]

Get the set of unique local Node identifiers that Broker manages.

Returns:

Type Description
set[str]

set[str]: Set of unique identifiers.

_get_duration abstractmethod

_get_duration() -> float | None

Get the user-requested active duration of the capture/streaming.

Returns:

Type Description
float | None

float | None: Time in seconds, if specified on Broker launch.

_get_host_ip abstractmethod

_get_host_ip() -> str

Get the Broker's host machine LAN IP address.

Returns:

Name Type Description
str str

Host's IP address.

_get_is_master_broker abstractmethod

_get_is_master_broker() -> bool

Get the master status of this Broker in a distributed setup.

Returns:

Name Type Description
bool bool

Whether this Broker is the master.

_get_node_addresses abstractmethod

_get_node_addresses() -> dict[str, bytes]

Bulk-get socket identifiers and unique Node identifiers for Broker's local Nodes.

Returns:

Type Description
dict[str, bytes]

dict[str, bytes]: Mapping of unique Node topics and their ZeroMQ socket identifiers.

_get_num_backends abstractmethod

_get_num_backends() -> int

Get the number of XSUB interfaces to remote hosts that the Broker listens to.

Returns:

Name Type Description
int int

Number of sockets.

_get_num_frontends abstractmethod

_get_num_frontends() -> int

Get the number of XPUB interfaces through which remote hosts listen to the Broker.

Returns:

Name Type Description
int int

Number of sockets.

_get_num_local_nodes abstractmethod

_get_num_local_nodes() -> int

Get the number of Nodes hosted and managed by this Broker.

Returns:

Name Type Description
int int

Number of locally attached Nodes.

_get_poller abstractmethod

_get_poller() -> Poller

Get the ZeroMQ Poller object responsible for socket management.

Returns:

Type Description
Poller

zmq.Poller: ZeroMQ poller to (de)activate listening on an interface.

_get_remote_broker_addresses abstractmethod

_get_remote_broker_addresses() -> dict[str, bytes]

Bulk-get socket identifiers and IP addresses of remote Brokers.

Returns:

Type Description
dict[str, bytes]

dict[str, bytes]: Mapping of remote IPs and their ZeroMQ socket identifiers.

_get_remote_pub_brokers abstractmethod

_get_remote_pub_brokers() -> list[str]

Get the list of remote publishing Brokers' IPs.

Returns:

Type Description
list[str]

list[str]: List of IP addresses.

_get_remote_sub_brokers abstractmethod

_get_remote_sub_brokers() -> list[str]

Get the list of remote subscribing Brokers' IPs.

Returns:

Type Description
list[str]

list[str]: List of IP addresses.

_get_start_time abstractmethod

_get_start_time() -> float

Get the start time when the Broker set everything up and started streaming.

Useful for measuring run time of the experiment, excluding the lengthy setup process.

Returns:

Name Type Description
float float

Time in seconds since epoch.

_get_sync_host_socket abstractmethod

_get_sync_host_socket() -> SyncSocket

Get the reference to the RCV socket for syncing local Nodes.

Returns:

Type Description
SyncSocket

zmq.SyncSocket: ZeroMQ socket used to communicate SYNC process.

_get_sync_remote_socket abstractmethod

_get_sync_remote_socket() -> SyncSocket

Get the reference to the RCV socket for syncing remote Brokers.

Returns:

Type Description
SyncSocket

zmq.SyncSocket: ZeroMQ socket used to communicate SYNC process.

_log_source_tag abstractmethod classmethod

_log_source_tag() -> str

Read-only class property identifying the component.

Returns:

Name Type Description
str str

Unique identifier.

_poll abstractmethod

_poll(timeout_ms: int) -> ZMQResult

Block until any new packets are available on PUB or SUB Broker interfaces.

Parameters:

Name Type Description Default
timeout_ms
int

Polling timeout in milliseconds, after which the check for manual CLI termination is re-evaluated.

required

Returns:

Name Type Description
ZMQResult ZMQResult

New ZeroMQ packets from PUB or SUB interfaces.
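
The timeout keeps the loop responsive: a blocking wait with no timeout could never notice a termination request while no packets arrive. The sketch below shows the pattern with the standard library only, using queue.Queue in place of a ZeroMQ poller and threading.Event as the quit flag. All names in it are hypothetical.

```python
from __future__ import annotations

import queue
import threading


def poll(inbox: queue.Queue, timeout_ms: int) -> list[bytes] | None:
    # Block for at most timeout_ms; None signals that nothing arrived.
    try:
        return inbox.get(timeout=timeout_ms / 1000)
    except queue.Empty:
        return None


def run_loop(inbox: queue.Queue, is_quit: threading.Event, timeout_ms: int = 10) -> list[list[bytes]]:
    received = []
    while not is_quit.is_set():
        msg = poll(inbox, timeout_ms)
        if msg is None:
            continue  # timed out: the loop condition re-checks the quit flag
        received.append(msg)
    return received


inbox: queue.Queue = queue.Queue()
inbox.put([b"imu", b"1"])
inbox.put([b"imu", b"2"])
is_quit = threading.Event()
# Stand-in for an external termination request arriving later.
threading.Timer(0.2, is_quit.set).start()
received = run_loop(inbox, is_quit)
```

A shorter timeout reacts to termination faster at the cost of more wake-ups while idle.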

_publish_kill abstractmethod

_publish_kill()

Send kill signals to upstream Brokers and local Nodes.

_remove_brokered_node abstractmethod

_remove_brokered_node(topic: str) -> None

Remove the existing local Node identifier from the exchange via the Broker.

Parameters:

Name Type Description Default
topic
str

Unique identifier of the Node.

required

_set_node_addresses abstractmethod

_set_node_addresses(node_addresses: dict[str, bytes]) -> None

Bulk-set socket identifiers and unique Node identifiers for Broker's local Nodes.

Parameters:

Name Type Description Default
node_addresses
dict[str, bytes]

Mapping of unique Node topics and their ZeroMQ socket identifiers.

required

_set_remote_broker_addresses abstractmethod

_set_remote_broker_addresses(remote_brokers: dict[str, bytes]) -> None

Bulk-set socket identifiers and IP addresses of remote Brokers for this Broker.

Parameters:

Name Type Description Default
remote_brokers
dict[str, bytes]

Mapping of remote IPs and their ZeroMQ socket identifiers.

required

_set_state abstractmethod

_set_state(state) -> None

User-defined logic for FSM state transition.

Parameters:

Name Type Description Default
state
Any

New state to transition to.

required

_start_local_nodes abstractmethod

_start_local_nodes() -> None

Spawn specified locally hosted Nodes, each in a separate process.

broker

Classes:

Name Description
Broker

Manager of the lifecycle of all connected local Nodes and data broker.

Broker

Bases: BrokerInterface

Manager of the lifecycle of all connected local Nodes and data broker.

Facilitates high-performance message exchange using ZeroMQ zero-copy communication across distributed sensing and computing hosts. Passes data over sockets to external Brokers or over shared memory for local Nodes.

Hosts the control logic of the interactive proxy/server. Launches, destroys, and connects to Nodes on creation and ad hoc, using a separate process for each streamer and consumer. Each Node connects only to its local Broker, which then exposes its data to outside LAN subscribers.

Uses fixed ports for communication under zmq_utils.py. Use of other ports is discouraged.

Macro-defined ports are preferred for consistency in a distributed, multi-host setup.

Uses hierarchical coordination for synchronizing and setting up distributed hosts. Each Broker first starts up and syncs its local Nodes, then syncs with all expected remote hosts until all are ready. Once all are ready, the master Broker sends the trigger signal to start streaming.

Methods:

Name Description
__call__

The main FSM loop of the Broker.

__init__

Constructor of the Broker component responsible for the lifecycle of all local Nodes and for message exchange across them and distributed hosts.

connect_to_remote_broker

Connects to a known address and port of an external LAN data broker.

expose_to_remote_broker

Exposes a known address and port to remote networked subscribers if configured.

set_is_quit

External asynchronous trigger to indicate termination to the Broker.

subscribe_to_killsig

Subscribes to external kill signal of another host as master.

__call__

__call__(duration_s: float | None = None) -> None

The main FSM loop of the Broker.

Runs continuously until the user ends the experiment or the specified duration elapses. The duration starts counting only after all Nodes have established communication and synced.

Parameters:

Name Type Description Default
duration_s
float | None

Duration of data capturing/streaming. Defaults to None.

None
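
A minimal sketch of the duration check this implies, assuming the start time is recorded as seconds since epoch once syncing completes (as _get_start_time describes). The helper name is_duration_elapsed and its now parameter are hypothetical, added so the logic can be exercised without waiting.

```python
from __future__ import annotations

import time


def is_duration_elapsed(
    start_time: float,
    duration_s: float | None,
    now: float | None = None,
) -> bool:
    # No duration requested: run until the user stops the experiment.
    if duration_s is None:
        return False
    current = time.time() if now is None else now
    # Measured from the synced start time, so setup time is excluded.
    return current - start_time >= duration_s


still_running = is_duration_elapsed(start_time=100.0, duration_s=5.0, now=104.0)
expired = is_duration_elapsed(start_time=100.0, duration_s=5.0, now=105.0)
unbounded = is_duration_elapsed(start_time=100.0, duration_s=None, now=1e9)
```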

__init__

__init__(
    host_ip: str,
    node_specs: list[dict],
    port_backend: str = PORT_BACKEND,
    port_frontend: str = PORT_FRONTEND,
    port_sync_host: str = PORT_SYNC_HOST,
    port_sync_remote: str = PORT_SYNC_REMOTE,
    port_killsig: str = PORT_KILL,
    is_master_broker: bool = False,
) -> None

Parameters:

Name Type Description Default
host_ip
str

Public LAN IP address of this host.

required
node_specs
list[dict]

List of to-be-created Node specification dictionaries.

required
port_backend
str

XSUB port of the Broker. Defaults to PORT_BACKEND.

PORT_BACKEND
port_frontend
str

XPUB port of the Broker. Defaults to PORT_FRONTEND.

PORT_FRONTEND
port_sync_host
str

Port for SYNC socket to coordinate startup of local Nodes. Defaults to PORT_SYNC_HOST.

PORT_SYNC_HOST
port_sync_remote
str

Port for SYNC socket to coordinate startup across remote hosts. Defaults to PORT_SYNC_REMOTE.

PORT_SYNC_REMOTE
port_killsig
str

Port of the KILL signal this Broker announces from. Defaults to PORT_KILL.

PORT_KILL
is_master_broker
bool

Whether this Broker is the master in the distributed host setup. Defaults to False.

False

_stop

_stop() -> None

Gracefully exit after all local subprocesses terminate, and cleanup.

connect_to_remote_broker

connect_to_remote_broker(addr: str, port_pub: str = PORT_FRONTEND) -> None

Connects to a known address and port of an external LAN data broker.

Parameters:

Name Type Description Default
addr
str

Remote host IP to connect to as a listener.

required
port_pub
str

Port number on which remote host publishes local Nodes' data. Defaults to PORT_FRONTEND.

PORT_FRONTEND

expose_to_remote_broker

expose_to_remote_broker(addr: list[str]) -> None

Exposes a known address and port to remote networked subscribers if configured.

Parameters:

Name Type Description Default
addr
list[str]

List of IP addresses of remote hosts (other Brokers).

required

set_is_quit

set_is_quit() -> None

External asynchronous trigger to indicate termination to the Broker.

subscribe_to_killsig

subscribe_to_killsig(addr: str, port_killsig: str = PORT_KILL) -> None

Subscribes to external kill signal of another host as master.

Parameters:

Name Type Description Default
addr
str

IP address of the master Broker in a distributed setting.

required
port_killsig
str

Port of the remote Broker to listen to for the termination signal. Defaults to PORT_KILL.

PORT_KILL

broker_states

Classes:

Name Description
AbstractBrokerState

Abstract class for the Broker FSM.

InitState

Initialization state of the Broker to launch local Nodes.

JoinBrokerBarrierState

Waits until all remote Brokers, if any, have their local Nodes gracefully terminated.

JoinNodeBarrierState

Waits until all local Nodes send final packets then quits itself.

KillState

Received 1 of 3 possible KILL signals to terminate.

RunningState

Steady-state capturing or streaming state of the Broker.

StartState

Start state of the Broker that waits or initiates distributed launch.

SyncBrokerBarrierState

Synchronization state of the Broker that waits until all remote Brokers are set up.

SyncNodeBarrierState

Synchronization state of the Broker that waits until all local Nodes are set up.
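
The table lists the states alphabetically. Read together with the Broker description, they suggest the lifecycle order sketched below. The order is inferred from the docstrings, not taken from the hermes source, so treat the mapping as an assumption, the relative order of the two join states in particular.

```python
from __future__ import annotations

# Assumed successor of each state; None marks the end of the lifecycle.
# KillState can also be entered from any other state on a kill signal.
NEXT_STATE: dict[str, str | None] = {
    "InitState": "SyncNodeBarrierState",
    "SyncNodeBarrierState": "SyncBrokerBarrierState",
    "SyncBrokerBarrierState": "StartState",
    "StartState": "RunningState",
    "RunningState": "KillState",
    "KillState": "JoinNodeBarrierState",
    "JoinNodeBarrierState": "JoinBrokerBarrierState",
    "JoinBrokerBarrierState": None,
}


def walk(start: str) -> list[str]:
    # Follow successors from the given state until the lifecycle ends.
    path = []
    state: str | None = start
    while state is not None:
        path.append(state)
        state = NEXT_STATE[state]
    return path


lifecycle = walk("InitState")
shutdown = walk("KillState")
```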

AbstractBrokerState

Bases: StateInterface

Abstract class for the Broker FSM.

Can be externally triggered into the KILL state from any child State class.

Methods:

Name Description
__init__

Constructor of the AbstractBrokerState class.

run

Run the logic of the currently selected state.

__init__

Parameters:

Name Type Description Default
context
BrokerInterface

Reference to the Broker object.

required

run abstractmethod

run() -> None

Run the logic of the currently selected state.
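
The state classes follow the classic State pattern: each state holds a reference to its context, does its work in run(), and asks the context to switch to the next state. The sketch below shows that mechanism in isolation. Every name prefixed with Sketch is hypothetical, and the three-state chain is a simplification, not the real Broker FSM.

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class SketchContext:
    """Hypothetical stand-in for the Broker that owns the current state."""

    def __init__(self) -> None:
        self.state: SketchState | None = None
        self.history: list[str] = []

    def set_state(self, state: SketchState | None) -> None:
        self.state = state

    def loop(self) -> None:
        # Keep running whichever state is current until none is left.
        while self.state is not None:
            self.history.append(type(self.state).__name__)
            self.state.run()


class SketchState(ABC):
    def __init__(self, context: SketchContext) -> None:
        self._context = context

    @abstractmethod
    def run(self) -> None:
        """Do this state's work, then hand over to the next state."""


class SketchInit(SketchState):
    def run(self) -> None:
        self._context.set_state(SketchRunning(self._context))


class SketchRunning(SketchState):
    def run(self) -> None:
        self._context.set_state(SketchKill(self._context))


class SketchKill(SketchState):
    def run(self) -> None:
        # Setting no successor ends the loop.
        self._context.set_state(None)


context = SketchContext()
context.set_state(SketchInit(context))
context.loop()
history = context.history
```

Keeping transition decisions inside the states means the context's loop stays the same regardless of how many states exist.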

InitState

Bases: AbstractBrokerState

Initialization state of the Broker to launch local Nodes.

Activates the Broker's poller sockets and moves on to the sync state to wait for local Nodes to start up.

Methods:

Name Description
__init__

Constructor of the AbstractBrokerState class.

__init__

Parameters:

Name Type Description Default
context
BrokerInterface

Reference to the Broker object.

required

JoinBrokerBarrierState

Bases: AbstractBrokerState

Waits until all remote Brokers, if any, have their local Nodes gracefully terminated.

Wait for all dependent remote Brokers to send acknowledgement messages that they no longer depend on this Broker's data.

Continue brokering packets until signalled by all remote hosts that there will be no more packets. Use the remote SYNC socket to coordinate when every Broker can exit.

JoinNodeBarrierState

Bases: AbstractBrokerState

Waits until all local Nodes send final packets then quits itself.

Wait for all processes (local and remote) to send the last messages before closing. Continue brokering packets until signalled by all publishers that there will be no more packets. Append a frame to the ZeroMQ message that indicates the last message from the sensor.

_check_host_sync_socket

_check_host_sync_socket(poll_res: ZMQResult) -> None

Check if a local Node sent a request on the SYNC socket to indicate its closure.

Can be triggered by all local Nodes: Producer, Consumer, or Pipeline, sending 'EXIT?' request.

_is_finished

_is_finished() -> bool

Convenience method indicating if any local Nodes remain active.

Will wait until all local Nodes finish.

Returns:

Name Type Description
bool bool

Whether all local Nodes gracefully terminated.

_on_is_end_packet

_on_is_end_packet(msg: list[bytes]) -> None

Callback to track brokering of last packets of local Producers and Pipelines.

Will get triggered at most once per Node because publishing Nodes send it only once.

_release_local_node

_release_local_node(topic: str) -> None

Release the local Node from the list of active Nodes of the Broker.
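
Taken together, these helpers amount to a join barrier: the state keeps a set of still-active local Nodes, releases one whenever its final packet or exit request is seen, and finishes when the set is empty. The sketch below illustrates that idea under stated assumptions: the first frame carries the topic, the end marker is a trailing frame, and END_FRAME and JoinBarrier are hypothetical names.

```python
from __future__ import annotations

END_FRAME = b"END"  # hypothetical end-of-stream marker, not the real constant


class JoinBarrier:
    """Hypothetical tracker of local Nodes that have not yet finished."""

    def __init__(self, topics: set[str]) -> None:
        self._active = set(topics)

    def on_packet(self, msg: list[bytes]) -> None:
        # Release the publishing Node only when the trailing frame
        # marks this message as its last one.
        if len(msg) > 1 and msg[-1] == END_FRAME:
            self.release(msg[0].decode())

    def release(self, topic: str) -> None:
        self._active.discard(topic)

    def is_finished(self) -> bool:
        return not self._active


barrier = JoinBarrier({"imu", "camera"})
barrier.on_packet([b"imu", b"payload"])             # ordinary data, nothing released
barrier.on_packet([b"imu", b"payload", END_FRAME])  # last packet of "imu"
finished_early = barrier.is_finished()
barrier.release("camera")  # e.g. after an exit request on the SYNC socket
finished = barrier.is_finished()
```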

KillState

Bases: AbstractBrokerState

Received 1 of 3 possible KILL signals to terminate.

The signal can come from the local keyboard interrupt, from the master Broker, or from the GUI. The state relays it to all Nodes and Brokers and wraps up gracefully.

Methods:

Name Description
__init__

Constructor of the AbstractBrokerState class.

__init__

Parameters:

Name Type Description Default
context
BrokerInterface

Reference to the Broker object.

required

RunningState

Bases: AbstractBrokerState

Steady-state capturing or streaming state of the Broker.

Will run until the experiment is stopped or after a fixed period, if provided.

_on_subscription_added

_on_subscription_added(msg: list[bytes]) -> None

Update a list on the Broker that keeps track of which Nodes are being brokered for.

StartState

Bases: AbstractBrokerState

Start state of the Broker that waits or initiates distributed launch.

Trigger local Nodes to start logging when the agreed start time arrives.

Methods:

Name Description
__init__

Constructor of the AbstractBrokerState class.

__init__

Parameters:

Name Type Description Default
context
BrokerInterface

Reference to the Broker object.

required

SyncBrokerBarrierState

Bases: AbstractBrokerState

Synchronization state of the Broker that waits until all remote Brokers are set up.

Communicate to other Brokers that every local device is ready.

SyncNodeBarrierState

Bases: AbstractBrokerState

Synchronization state of the Broker that waits until all local Nodes are set up.

Waits until all local Nodes signalled that they are initialized and ready to go.

Methods:

Name Description
__init__

Constructor of the AbstractBrokerState class.

__init__

Parameters:

Name Type Description Default
context
BrokerInterface

Reference to the Broker object.

required