hermes.base.broker
The module manages the lifecycle of all locally hosted Nodes. It is also the program's entry point for continuous sensing and processing.
broker_interface - base interface for Broker function.
broker - concrete FSM-based Broker with distributed LAN connectivity.
broker_states - FSM states of the Broker.
broker_interface
Classes:
| Name | Description |
|---|---|
| BrokerInterface | Interface for the Broker component. |
BrokerInterface
Bases: ABC
Interface for the Broker component.
_activate_pubsub_poller
abstractmethod
Register PUB-SUB sockets on both Broker interfaces for polling.
_add_brokered_node
abstractmethod
_add_brokered_node(topic: str) -> None
Add a unique local Node identifier joining the exchange via the Broker.
Nodes uniquely self-identify by the data topic they produce.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | Unique identifier of the Node. | required |
_broker_packets
abstractmethod
_broker_packets(
poll_res: ZMQResult,
on_data_received: Callable[[list[bytes]], None] = lambda _: None,
on_subscription_changed: Callable[[list[bytes]], None] = lambda _: None,
) -> None
Move packets between publishers and subscribers, local and remote.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| poll_res | ZMQResult | New ZeroMQ packets from PUB or SUB interfaces. | required |
| on_data_received | Callable[[list[bytes]], None] | Callback for data packets. Defaults to lambda _: None. | lambda _: None |
| on_subscription_changed | Callable[[list[bytes]], None] | Callback for subscription status packets. Defaults to lambda _: None. | lambda _: None |
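As an illustration of how the two callbacks might be used, the sketch below dispatches packets according to the interface they arrived on. It is not the library's implementation: `PollResult`, the `"backend"`/`"frontend"` keys, and `broker_packets` are hypothetical stand-ins, and ZeroMQ sockets are replaced by plain lists so that the example runs with the standard library alone.

```python
from typing import Callable

# Hypothetical stand-in for ZMQResult: packets grouped by the interface they arrived on.
PollResult = dict[str, list[list[bytes]]]


def broker_packets(
    poll_res: PollResult,
    on_data_received: Callable[[list[bytes]], None] = lambda _: None,
    on_subscription_changed: Callable[[list[bytes]], None] = lambda _: None,
) -> list[tuple[str, list[bytes]]]:
    """Forward packets and notify the callbacks; return what was forwarded and where."""
    forwarded = []
    # Data from publishers arrives on the backend and is forwarded to the frontend.
    for msg in poll_res.get("backend", []):
        forwarded.append(("frontend", msg))
        on_data_received(msg)
    # Subscription changes arrive on the frontend and are forwarded to the backend.
    for msg in poll_res.get("frontend", []):
        forwarded.append(("backend", msg))
        on_subscription_changed(msg)
    return forwarded


seen_topics = []
result = broker_packets(
    {"backend": [[b"imu", b"payload"]], "frontend": [[b"\x01imu"]]},
    on_data_received=lambda msg: seen_topics.append(msg[0]),
)
```

The no-op `lambda _: None` defaults let a caller pass only the callback it cares about, as the usage above does for data packets.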
_check_for_kill
abstractmethod
_check_for_kill(poll_res: ZMQResult) -> bool
Check if received packets contain a kill signal from a downstream Broker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| poll_res | ZMQResult | New ZeroMQ packets from PUB or SUB interfaces. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| bool | bool | Whether a KILL message is contained in messages. |
_deactivate_pubsub_poller
abstractmethod
Stop listening on the PUB or SUB interfaces for new data packets.
_get_brokered_nodes
abstractmethod
Get the set of unique local Node identifiers that Broker manages.
Returns:
| Type | Description |
|---|---|
| set[str] | set[str]: Set of unique identifiers. |
_get_duration
abstractmethod
Get the user-requested active duration of the capture/streaming.
Returns:
| Type | Description |
|---|---|
| float \| None | float \| None: Time in seconds, if specified on Broker launch. |
_get_host_ip
abstractmethod
Get the Broker's host machine LAN IP address.
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | Host's IP address. |
_get_is_master_broker
abstractmethod
Get the master status of this Broker in a distributed setup.
Returns:
| Name | Type | Description |
|---|---|---|
| bool | bool | Whether this Broker is the master. |
_get_node_addresses
abstractmethod
Bulk-get socket identifiers and unique Node identifiers for Broker's local Nodes.
Returns:
| Type | Description |
|---|---|
| dict[str, bytes] | dict[str, bytes]: Mapping of unique Node topics and their ZeroMQ socket identifiers. |
_get_num_backends
abstractmethod
Get the number of XSUB interfaces to remote hosts that the Broker listens to.
Returns:
| Name | Type | Description |
|---|---|---|
| int | int | Number of sockets. |
_get_num_frontends
abstractmethod
Get the number of XPUB interfaces on which remote hosts listen to the Broker.
Returns:
| Name | Type | Description |
|---|---|---|
| int | int | Number of sockets. |
_get_num_local_nodes
abstractmethod
Get the number of Nodes hosted and managed by this Broker.
Returns:
| Name | Type | Description |
|---|---|---|
| int | int | Number of locally attached Nodes. |
_get_poller
abstractmethod
Get the ZeroMQ Poller object responsible for socket management.
Returns:
| Type | Description |
|---|---|
| Poller | zmq.Poller: ZeroMQ poller to (de)activate listening on an interface. |
_get_remote_broker_addresses
abstractmethod
Bulk-get socket identifiers and IP addresses of remote Brokers.
Returns:
| Type | Description |
|---|---|
| dict[str, bytes] | dict[str, bytes]: Mapping of remote IPs and their ZeroMQ socket identifiers. |
_get_remote_pub_brokers
abstractmethod
Get the list of remote publishing Brokers' IPs.
Returns:
| Type | Description |
|---|---|
| list[str] | list[str]: List of IP addresses. |
_get_remote_sub_brokers
abstractmethod
Get the list of remote subscribing Brokers' IPs.
Returns:
| Type | Description |
|---|---|
| list[str] | list[str]: List of IP addresses. |
_get_start_time
abstractmethod
Get the time at which the Broker finished setting everything up and started streaming.
Useful for measuring the run time of the experiment, excluding the lengthy setup process.
Returns:
| Name | Type | Description |
|---|---|---|
| float | float | Time in seconds since epoch. |
_get_sync_host_socket
abstractmethod
Get the reference to the RCV socket for syncing local Nodes.
Returns:
| Type | Description |
|---|---|
| SyncSocket | zmq.SyncSocket: ZeroMQ socket used to communicate SYNC process. |
_get_sync_remote_socket
abstractmethod
Get the reference to the RCV socket for syncing remote Brokers.
Returns:
| Type | Description |
|---|---|
| SyncSocket | zmq.SyncSocket: ZeroMQ socket used to communicate SYNC process. |
_log_source_tag
abstractmethod
classmethod
Read-only class property identifying the component.
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | Unique identifier. |
_poll
abstractmethod
_poll(timeout_ms: int) -> ZMQResult
Block until any new packets are available on PUB or SUB Broker interfaces.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout_ms | int | Polling timeout in milliseconds, after which the check for manual CLI termination is re-evaluated. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| ZMQResult | ZMQResult | New ZeroMQ packets from PUB or SUB interfaces. |
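The purpose of the timeout can be sketched without ZeroMQ: a bounded wait lets the loop return regularly to check whether termination was requested. In the example below, `queue.Queue` stands in for the poller, and `poll`, `run_until_quit`, and the `b"STOP"` marker are hypothetical names chosen only for the demonstration.

```python
import queue
import threading


def poll(inbox: queue.Queue, timeout_ms: int) -> list[bytes]:
    """Block for at most timeout_ms; return an empty list if nothing arrived."""
    try:
        return [inbox.get(timeout=timeout_ms / 1000)]
    except queue.Empty:
        return []


def run_until_quit(inbox: queue.Queue, is_quit: threading.Event, timeout_ms: int = 10) -> list[bytes]:
    received = []
    # The timeout bounds how long a quit request can go unnoticed.
    while not is_quit.is_set():
        packets = poll(inbox, timeout_ms)
        received.extend(packets)
        if b"STOP" in packets:  # hypothetical marker, used here only to end the demo
            is_quit.set()
    return received


inbox = queue.Queue()
for packet in (b"a", b"b", b"STOP"):
    inbox.put(packet)
received = run_until_quit(inbox, threading.Event())
```

A shorter timeout makes termination more responsive at the cost of more frequent wake-ups when no packets are flowing.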
_publish_kill
abstractmethod
Send kill signals to upstream Brokers and local Nodes.
_remove_brokered_node
abstractmethod
_remove_brokered_node(topic: str) -> None
Remove the existing local Node identifier from the exchange via the Broker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | Unique identifier of the Node. | required |
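One plausible way to back the add, remove, and get operations on brokered Nodes is a plain set keyed by topic. The `NodeRegistry` class below is a hypothetical illustration of that bookkeeping, not the Broker's actual storage.

```python
class NodeRegistry:
    """Hypothetical stand-in for the Broker's bookkeeping of brokered Nodes."""

    def __init__(self) -> None:
        self._brokered_nodes: set[str] = set()

    def add_brokered_node(self, topic: str) -> None:
        # A set makes repeated registration of the same topic harmless.
        self._brokered_nodes.add(topic)

    def remove_brokered_node(self, topic: str) -> None:
        # discard() does not raise if the topic was never registered.
        self._brokered_nodes.discard(topic)

    def get_brokered_nodes(self) -> set[str]:
        # Return a copy so callers cannot mutate the internal state.
        return set(self._brokered_nodes)


registry = NodeRegistry()
registry.add_brokered_node("imu")
registry.add_brokered_node("camera")
registry.add_brokered_node("imu")
registry.remove_brokered_node("camera")
```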
_set_node_addresses
abstractmethod
_set_node_addresses(node_addresses: dict[str, bytes]) -> None
Bulk-set socket identifiers and unique Node identifiers for Broker's local Nodes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| node_addresses | dict[str, bytes] | Mapping of unique Node topics and their ZeroMQ socket identifiers. | required |
_set_remote_broker_addresses
abstractmethod
_set_remote_broker_addresses(remote_brokers: dict[str, bytes]) -> None
Bulk-set socket identifiers and IP addresses of remote Brokers for this Broker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| remote_brokers | dict[str, bytes] | Mapping of remote IPs and their ZeroMQ socket identifiers. | required |
_set_state
abstractmethod
_set_state(state) -> None
User-defined logic for FSM state transition.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | Any | New state to transition to. | required |
_start_local_nodes
abstractmethod
Spawn specified locally hosted Nodes, each in a separate process.
broker
Classes:
| Name | Description |
|---|---|
| Broker | Manager of the lifecycle of all connected local Nodes and data broker. |
Broker
Bases: BrokerInterface
Manager of the lifecycle of all connected local Nodes and data broker.
Facilitates high-performance message exchange using ZeroMQ zero-copy communication across distributed sensing and computing hosts. Passes data over sockets to external Brokers or over shared memory for local Nodes.
Hosts control logic of interactive proxy/server. Will launch/destroy/connect to Nodes on creation and ad-hoc. Will use a separate process for each streamer and consumer. Each Node connects only to its local Broker, which then exposes its data to outside LAN subscribers.
Uses the fixed communication ports defined in zmq_utils.py.
Use of other ports is discouraged.
Macro-defined ports are preferred for consistency in a distributed, multi-host setup.
Uses hierarchical coordination of distributed host synchronization/setup. Each Broker first starts up and syncs its local Nodes. It then syncs with all expected remote hosts until all are ready. Once all are ready, the master Broker communicates the trigger signal to start streaming.
Methods:
| Name | Description |
|---|---|
| __call__ | The main FSM loop of the Broker. |
| __init__ | Constructor of the Broker component responsible for the lifecycle of all local Nodes and for message exchange across them and distributed hosts. |
| connect_to_remote_broker | Connects to a known address and port of external LAN data broker. |
| expose_to_remote_broker | Exposes a known address and port to remote networked subscribers if configured. |
| set_is_quit | External asynchronous trigger to indicate termination to the Broker. |
| subscribe_to_killsig | Subscribes to external kill signal of another host as master. |
__call__
__call__(duration_s: float | None = None) -> None
The main FSM loop of the Broker.
Runs continuously until the user ends the experiment or the specified duration elapses. The duration starts counting only after all Nodes have established communication and synced.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| duration_s | float \| None | Duration of data capturing/streaming. Defaults to None. | None |
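The duration rule can be sketched as follows, assuming the clock is read only once synchronization has finished. The helpers `is_expired` and `run` are hypothetical, `max_iterations` exists only to keep the demo bounded, and `time.monotonic` is used here for robustness against clock adjustments, whereas the interface above reports its start time in seconds since epoch.

```python
import time
from typing import Callable, Optional


def is_expired(start_time_s: float, duration_s: Optional[float], now_s: float) -> bool:
    """True once the requested duration has elapsed; never true when no duration is set."""
    return duration_s is not None and now_s - start_time_s >= duration_s


def run(duration_s: Optional[float], sync: Callable[[], None], max_iterations: int = 1000) -> int:
    sync()  # setup and synchronization are not counted towards the duration
    start_time_s = time.monotonic()  # the clock starts only after syncing
    iterations = 0
    # max_iterations is a demo safeguard standing in for a user-issued stop.
    while iterations < max_iterations and not is_expired(start_time_s, duration_s, time.monotonic()):
        iterations += 1
        time.sleep(0.001)
    return iterations


iterations = run(duration_s=0.02, sync=lambda: time.sleep(0.01))
```

With `duration_s=None` the expiry check never fires, so only an external stop ends the loop.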
__init__
__init__(
host_ip: str,
node_specs: list[dict],
port_backend: str = PORT_BACKEND,
port_frontend: str = PORT_FRONTEND,
port_sync_host: str = PORT_SYNC_HOST,
port_sync_remote: str = PORT_SYNC_REMOTE,
port_killsig: str = PORT_KILL,
is_master_broker: bool = False,
) -> None
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host_ip | str | Public LAN IP address of this host. | required |
| node_specs | list[dict] | List of to-be-created Node specification dictionaries. | required |
| port_backend | str | XSUB port of the Broker. Defaults to PORT_BACKEND. | PORT_BACKEND |
| port_frontend | str | XPUB port of the Broker. Defaults to PORT_FRONTEND. | PORT_FRONTEND |
| port_sync_host | str | Port for SYNC socket to coordinate startup of local Nodes. Defaults to PORT_SYNC_HOST. | PORT_SYNC_HOST |
| port_sync_remote | str | Port for SYNC socket to coordinate startup across remote hosts. Defaults to PORT_SYNC_REMOTE. | PORT_SYNC_REMOTE |
| port_killsig | str | Port of the KILL signal this Broker announces from. Defaults to PORT_KILL. | PORT_KILL |
| is_master_broker | bool | Whether this Broker is the master in the distributed host setup. Defaults to False. | False |
connect_to_remote_broker
expose_to_remote_broker
expose_to_remote_broker(addr: list[str]) -> None
Exposes a known address and port to remote networked subscribers if configured.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| addr | list[str] | List of IP addresses of remote hosts (other Brokers). | required |
set_is_quit
External asynchronous trigger to indicate termination to the Broker.
subscribe_to_killsig
subscribe_to_killsig(addr: str, port_killsig: str = PORT_KILL) -> None
Subscribes to external kill signal of another host as master.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| addr | str | IP address of the master Broker in a distributed setting. | required |
| port_killsig | str | Port of the remote Broker to listen to for the termination signal. Defaults to PORT_KILL. | PORT_KILL |
broker_states
Classes:
| Name | Description |
|---|---|
| AbstractBrokerState | Abstract class for the Broker FSM. |
| InitState | Initialization state of the Broker to launch local Nodes. |
| JoinBrokerBarrierState | Waits until all remote Brokers, if any, have their local Nodes gracefully terminated. |
| JoinNodeBarrierState | Waits until all local Nodes send final packets then quits itself. |
| KillState | Received 1 of 3 possible KILL signals to terminate. |
| RunningState | Steady-state capturing or streaming state of the Broker. |
| StartState | Start state of the Broker that waits or initiates distributed launch. |
| SyncBrokerBarrierState | Synchronization state of the Broker that waits until all remote Brokers are set up. |
| SyncNodeBarrierState | Synchronization state of the Broker that waits until all local Nodes are set up. |
AbstractBrokerState
Bases: StateInterface
Abstract class for the Broker FSM.
Can be externally triggered into the KILL state from any child State class.
Methods:
| Name | Description |
|---|---|
| __init__ | Constructor of the AbstractBrokerState class. |
| run | Run the logic of the currently selected state. |
__init__
__init__(context: BrokerInterface)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | BrokerInterface | Reference to the Broker object. | required |
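The state pattern described here, in which each state holds a reference to its context and any state can be diverted to a kill state, can be sketched as below. All class names are simplified, hypothetical stand-ins, and the transition order shown is an assumption for illustration rather than the library's actual FSM.

```python
from abc import ABC, abstractmethod


class Context:
    """Hypothetical stand-in for a BrokerInterface implementation."""

    def __init__(self) -> None:
        self.is_quit = False
        self.state = Init(self)

    def set_state(self, state: "State") -> None:
        self.state = state


class State(ABC):
    def __init__(self, context: Context) -> None:
        self._context = context

    def run(self) -> None:
        # The base class diverts any child state to Kill on an external trigger.
        if self._context.is_quit and not isinstance(self, Kill):
            self._context.set_state(Kill(self._context))
        else:
            self._context.set_state(self.next_state())

    @abstractmethod
    def next_state(self) -> "State":
        """Return the state to transition to when no kill is pending."""


class Init(State):
    def next_state(self) -> State:
        return SyncNodes(self._context)


class SyncNodes(State):
    def next_state(self) -> State:
        return Running(self._context)


class Running(State):
    def next_state(self) -> State:
        return self  # steady state: stays put until killed


class Kill(State):
    def next_state(self) -> State:
        return self  # terminal in this sketch


broker = Context()
broker.state.run()      # Init -> SyncNodes
broker.state.run()      # SyncNodes -> Running
broker.state.run()      # Running -> Running
broker.is_quit = True   # external trigger, e.g. a keyboard interrupt
broker.state.run()      # Running -> Kill
```

Placing the kill check in the base class means child states only describe their normal transition and inherit the external-termination behaviour.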
InitState
Bases: AbstractBrokerState
Initialization state of the Broker to launch local Nodes.
Activates broker poller sockets and goes in sync to wait for local Nodes to start up.
Methods:
| Name | Description |
|---|---|
| __init__ | Constructor of the AbstractBrokerState class. |
__init__
__init__(context: BrokerInterface)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | BrokerInterface | Reference to the Broker object. | required |
JoinBrokerBarrierState
Bases: AbstractBrokerState
Waits until all remote Brokers, if any, have their local Nodes gracefully terminated.
Wait for all dependent remote Brokers to send acknowledgement messages that they no longer depend on this Broker's data.
Continue brokering packets until signalled by all remote hosts that there will be no more packets. Use the remote SYNC socket to coordinate when every Broker can exit.
JoinNodeBarrierState
Bases: AbstractBrokerState
Waits until all local Nodes send final packets then quits itself.
Wait for all processes (local and remote) to send the last messages before closing. Continue brokering packets until signalled by all publishers that there will be no more packets. Append a frame to the ZeroMQ message that indicates the last message from the sensor.
_check_host_sync_socket
Check if a local Node sent a request on the SYNC socket to indicate its closure.
Can be triggered by all local Nodes: Producer, Consumer, or Pipeline, sending 'EXIT?' request.
_is_finished
Convenience method indicating if any local Nodes remain active.
Will wait until all local Nodes finish.
Returns:
| Name | Type | Description |
|---|---|---|
| bool | bool | Whether all local Nodes gracefully terminated. |
_on_is_end_packet
Callback to track brokering of last packets of local Producers and Pipelines.
Will be triggered at most once per Node because publishing Nodes send it only once.
_release_local_node
Release the local Node from the list of active Nodes of the Broker.
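The bookkeeping behind such a join barrier might look like the sketch below. It assumes that a publishing Node is considered finished once its final packet has been brokered and it has asked to exit, while a consumer only needs to ask to exit. `JoinTracker` and its method names are hypothetical, and the actual release conditions may differ.

```python
class JoinTracker:
    """Hypothetical bookkeeping for a join barrier over local Nodes."""

    def __init__(self, publishers: set[str], consumers: set[str]) -> None:
        # Publishers must deliver an end packet; every Node must request to exit.
        self._awaiting_end_packet = set(publishers)
        self._awaiting_exit_request = set(publishers) | set(consumers)

    def on_end_packet(self, topic: str) -> None:
        # Called when the last data packet of a publishing Node has been brokered.
        self._awaiting_end_packet.discard(topic)

    def on_exit_request(self, node_id: str) -> None:
        # Called when a Node signals its closure over the sync channel.
        self._awaiting_exit_request.discard(node_id)

    def is_finished(self) -> bool:
        return not self._awaiting_end_packet and not self._awaiting_exit_request


tracker = JoinTracker(publishers={"imu"}, consumers={"logger"})
tracker.on_end_packet("imu")
tracker.on_exit_request("imu")
finished_early = tracker.is_finished()  # the consumer is still active
tracker.on_exit_request("logger")
finished = tracker.is_finished()
```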
KillState
Bases: AbstractBrokerState
Received 1 of 3 possible KILL signals to terminate.
Relays it to all Nodes and Brokers and wraps up gracefully. The signal can come from the local keyboard interrupt, from the master Broker, or from the GUI.
Methods:
| Name | Description |
|---|---|
| __init__ | Constructor of the AbstractBrokerState class. |
__init__
__init__(context: BrokerInterface)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | BrokerInterface | Reference to the Broker object. | required |
RunningState
Bases: AbstractBrokerState
Steady-state capturing or streaming state of the Broker.
Will run until the experiment is stopped or after a fixed period, if provided.
_on_subscription_added
Update a list on the Broker that keeps track of which Nodes are being brokered for.
StartState
Bases: AbstractBrokerState
Start state of the Broker that waits or initiates distributed launch.
Trigger local Nodes to start logging when the agreed start time arrives.
Methods:
| Name | Description |
|---|---|
| __init__ | Constructor of the AbstractBrokerState class. |
__init__
__init__(context: BrokerInterface)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | BrokerInterface | Reference to the Broker object. | required |
SyncBrokerBarrierState
Bases: AbstractBrokerState
Synchronization state of the Broker that waits until all remote Brokers are set up.
Communicate to other Brokers that every local device is ready.
SyncNodeBarrierState
Bases: AbstractBrokerState
Synchronization state of the Broker that waits until all local Nodes are set up.
Waits until all local Nodes have signalled that they are initialized and ready to go.
Methods:
| Name | Description |
|---|---|
| __init__ | Constructor of the AbstractBrokerState class. |
__init__
__init__(context: BrokerInterface)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | BrokerInterface | Reference to the Broker object. | required |
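A startup barrier over local Nodes can be sketched as collecting one ready request per expected Node and recording where each came from. The function `sync_node_barrier` and the `(identity, topic)` request shape are assumptions made for this example. The resulting mapping mirrors the topic-to-socket-identifier mapping described for `_set_node_addresses`.

```python
from typing import Iterable


def sync_node_barrier(
    expected_topics: set[str], requests: Iterable[tuple[bytes, str]]
) -> dict[str, bytes]:
    """Consume (socket identity, topic) requests until every expected Node has reported."""
    node_addresses: dict[str, bytes] = {}
    for identity, topic in requests:
        if topic in expected_topics:
            # Remember which socket identity belongs to which Node.
            node_addresses[topic] = identity
        if set(node_addresses) == expected_topics:
            return node_addresses  # barrier released: all local Nodes are ready
    raise TimeoutError("not all Nodes reported ready")


addresses = sync_node_barrier(
    {"imu", "camera"},
    [(b"\x00a", "imu"), (b"\x00x", "unknown"), (b"\x00b", "camera")],
)
```

Requests from unexpected topics are ignored in this sketch, so a stray Node cannot release the barrier early.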