Source code for obstools.broker_listener

# -*- coding: utf-8 -*-
#
#  This file is part of LDTObserverTools.
#
#   This Source Code Form is subject to the terms of the Mozilla Public
#   License, v. 2.0. If a copy of the MPL was not distributed with this
#   file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
#  Created on 31-May-2023
#
#  @author: tbowers

"""Apache ActiveMQ Broker Listener Module

LDTObserverTools contains python ports of various LDT Observer Tools

Lowell Discovery Telescope (Lowell Observatory: Flagstaff, AZ)
https://lowell.edu

This file contains the ``ActiveMQ_Listener`` class used by the
``deveny_collfocus`` routine for retrieving current values from the LDT's
ActiveMQ broker.  This is kept separate so that an ``ImportError`` on ``stomp``
simply causes this module to not import and the calling module can go about its
business.
"""

# Built-In Libraries

# 3rd-Party Libraries
import stomp
import yaml
import xmltodict

# Local Libraries


class ActiveMQ_Listener:
    """Broadcast (status) message listening class

    This class listens to the ActiveMQ broker and places a parsed version of
    the desired broker messages into attributes accessible by the calling
    function.

    This class is adapted from similar ones used with the LORAX project,
    being developed at Lowell Observatory.

    Parameters
    ----------
    config_file : :obj:`str` or :obj:`pathlib.Path`
        The location of the configuration file to read

    Attributes
    ----------
    mounttemp_from_broker : :obj:`dict`
        Most recent parsed ``TCSTelemetry`` message (empty until one arrives).
    grangle_from_broker : :obj:`dict`
        Most recent parsed ``DevenyTelemetry`` message (empty until one
        arrives).
    config : :obj:`dict`
        Contents of the YAML configuration file.  The keys read by this class
        are ``broker_hosts``, ``mounttemp_incoming_topic`` and
        ``grangle_incoming_topic``.

    Raises
    ------
    yaml.YAMLError
        If the configuration file cannot be parsed.
    """

    # Data sources whose ``<name>_incoming_topic`` config entries are
    # subscribed to, in subscription order.
    _DATA_SOURCES = ("mounttemp", "grangle")

    def __init__(self, config_file):
        # Initialize the containers filled in asynchronously by the listener
        self.mounttemp_from_broker = {}
        self.grangle_from_broker = {}

        # Read the config file.
        with open(config_file, "r", encoding="utf-8") as stream:
            try:
                self.config = yaml.safe_load(stream)
            except yaml.YAMLError as exc:
                # Without a configuration nothing below can work; report the
                # parse problem and propagate it rather than failing later
                # with an unrelated AttributeError on ``self.config``.
                print(exc)
                raise

        # Get the broker host from the configuration.
        # Make a connection to the broker.
        self.hosts = [tuple(self.config["broker_hosts"])]
        try:
            # Get a connection handle.
            self.conn = stomp.Connection(host_and_ports=self.hosts)
            # Set up a listener and connect.
            self.conn.set_listener("", self.MyListener(self))
            self.conn.connect(wait=True)
        except stomp.exception.StompException:
            # If we cannot connect to the broker, return now and don't subscribe
            return

        # Subscribe to specified topics.  Each subscription on a connection
        # needs its own id, so number them rather than reusing a single value.
        for sub_id, data_source in enumerate(self._DATA_SOURCES, start=1):
            self.conn.subscribe(
                id=sub_id,
                destination="/topic/" + self.config[f"{data_source}_incoming_topic"],
                headers={},
            )

    class MyListener(stomp.ConnectionListener):
        """Class for listening to the STOMPing

        This listener runs asynchronously from the main body of the program
        through stomp's threading functionality.  When a broker message is
        received, it is parsed and placed into the ``parent`` attribute(s) to
        be accessed whenever the main code needs it.

        Parameters
        ----------
        parent : :obj:`class`
            The parent class into whose attributes values from the broker
            message are placed.
        """

        def __init__(self, parent):
            # Set parent for communication upward
            self.parent = parent

        def on_error(self, message):
            """Report an error frame received from the broker

            Parameters
            ----------
            message
                The error frame delivered by stomp; it is only printed.
            """
            # TODO: This really needs to do something substantial
            print(f'received an error "{message}"')

        def on_message(self, message):
            """Parse an incoming broker message into the parent's attributes

            Parameters
            ----------
            message
                The frame delivered by stomp.  Its ``headers["destination"]``
                selects which parent attribute is updated and its ``body`` is
                parsed as XML.
            """
            # When a message is received, parse out and place into parent
            # attributes

            # Get the topic
            topic = message.headers["destination"]

            if self.parent.config["mounttemp_incoming_topic"] in topic:
                status = xmltodict.parse(message.body)["TCSTelemetry"]
                self.parent.mounttemp_from_broker = self.parent.parse_deveny(status)

            if self.parent.config["grangle_incoming_topic"] in topic:
                status = xmltodict.parse(message.body)["DevenyTelemetry"]
                self.parent.grangle_from_broker = self.parent.parse_deveny(status)

    def parse_deveny(self, status):
        """Parse an XMLTODICT status message

        Parameters
        ----------
        status : :obj:`dict`
            Translated XML status message

        Returns
        -------
        :obj:`dict`
            The parsed status dictionary for this particular use case.  Every
            value that ``float()`` accepts is converted to :obj:`float`; all
            other values are passed through unchanged.
        """
        # Return empty dictionary if no status
        if status is None:
            return {}

        ret_dict = {}
        # Convert to float, if possible
        for key, val in status.items():
            try:
                ret_dict[key] = float(val)
            except (TypeError, ValueError):
                # ValueError: non-numeric text.  TypeError: values that are
                # not strings or numbers at all (e.g. ``None`` or a nested
                # mapping); keep those as-is instead of raising.
                ret_dict[key] = val

        # Return
        return ret_dict