Source code for nemo_nowcast.worker

# Copyright 2016-2021 Doug Latornell, 43ravens

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#    http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""NEMO_Nowcast worker classes.
"""
import logging
import logging.config
import os
import signal
import socket
import subprocess
import time

import attr
import requests
import sentry_sdk
import zmq
import zmq.log.handlers

from nemo_nowcast import CommandLineInterface, Config, Message

class WorkerError(Exception):
    """Raised when a worker encounters an error or exception that it can't
    recover from.
    """
@attr.s
class NextWorker:
    """Construct a :py:class:`nemo_nowcast.worker.NextWorker` instance.

    Intended for use in a nowcast system implementation's
    :py:mod:`nowcast.next_workers` module where
    :py:func:`after_worker_name` functions return lists of
    :py:class:`nemo_nowcast.worker.NextWorker` instances that provide
    the sequence of workers and their arguments that are to be launched next.
    """

    #: Name of the worker module including its package path,
    #: in dotted notation;
    #: e.g. :kbd:`nowcast.workers.download_weather`.
    module = attr.ib()
    #: Arguments to use when the worker is launched.
    #: Defaults to an empty list.
    args = attr.ib(default=attr.Factory(list))
    #: Host to launch the worker on.
    #: Defaults to :kbd:`localhost`
    host = attr.ib(default="localhost")

    def launch(self, config, logger_name):
        """Use a subprocess to launch worker on host with args as the worker's
        command-line arguments.

        For :kbd:`localhost` the worker module is run with the interpreter
        given by :kbd:`config["python"]` and the configuration file that
        :kbd:`config` was loaded from.
        For any other host the command is run via :command:`ssh`,
        using the :kbd:`envvars`, :kbd:`python`, and :kbd:`config file`
        items of that host's entry in
        :kbd:`config["run"]["enabled hosts"]`.

        This method *does not* wait for the subprocess to complete.

        :arg config: Nowcast system configuration that was read from the
                     configuration file.
        :type config: :py:class:`nemo_nowcast.config.Config`

        :arg str logger_name: Name of the logger to emit messages on.
        """
        logger = logging.getLogger(logger_name)
        if self.host == "localhost":
            cmd = [config["python"], "-m"]
            config_file = config.file
        else:
            enabled_host_config = config["run"]["enabled hosts"][self.host]
            # Load the remote host's environment before running its Python
            cmd = [
                "ssh",
                self.host,
                "source",
                enabled_host_config["envvars"],
                ";",
                enabled_host_config["python"],
                "-m",
            ]
            config_file = enabled_host_config["config file"]
        cmd.extend([self.module, config_file])
        if self.args:
            cmd.extend(self.args)
        logger.info(f"launching {self}", extra={"worker": self})
        logger.debug(f"cmd = {cmd}", extra={"cmd": cmd})
        # Fire and forget: the launched worker reports back via messaging
        subprocess.Popen(cmd)
@attr.s
class NowcastWorker:
    """Construct a :py:class:`nemo_nowcast.worker.NowcastWorker` instance.

    Typical use is to construct an instance,
    call :py:meth:`~nemo_nowcast.worker.NowcastWorker.init_cli`,
    optionally add worker-specific command-line arguments,
    and then call :py:meth:`~nemo_nowcast.worker.NowcastWorker.run`.
    """

    #: The name of the worker instance.
    #: Used in the nowcast messaging system and for logging.
    name = attr.ib()
    #: Description of the worker.
    #: Used in the command-line interface.
    #: Typically the worker module docstring;
    #: i.e. :kbd:`description=__doc__`.
    description = attr.ib()
    #: Name of the package that the worker is part of;
    #: used to build the usage message.
    #: Use dotted notation;
    #: e.g. :kbd:`nowcast.workers`.
    package = attr.ib(default="nowcast.workers")
    #: :py:class:`nemo_nowcast.config.Config` object that holds
    #: the nowcast system configuration that is loaded from the configuration
    #: file in the :py:meth:`~nemo_nowcast.worker.NowcastWorker.run` method.
    config = attr.ib(default=attr.Factory(Config))
    #: Logger for the worker.
    #: Configured from the :kbd:`logging` section of the configuration file
    #: in the :py:meth:`~nemo_nowcast.worker.NowcastWorker.run` method.
    logger = attr.ib(default=None)
    #: :py:class:`nemo_nowcast.cli.CommandLineInterface` object configured
    #: in the :py:meth:`~nemo_nowcast.worker.NowcastWorker.init_cli` method
    #: to provide the default worker command-line interface that requires
    #: a nowcast config file name,
    #: and provides :kbd:`--debug`,
    #: :kbd:`--help`,
    #: and :kbd:`-h` options.
    cli = attr.ib(default=None)
    #: Function to be called to do the worker's job.
    #: Called with the worker's parsed command-line arguments
    #: :py:class:`argparse.Namespace` instance,
    #: the worker's configuration dict,
    #: and the :py:meth:`~nemo_nowcast.worker.NowcastWorker.tell_manager`
    #: method.
    #: Passed as an argument to the
    #: :py:meth:`~nemo_nowcast.worker.NowcastWorker.run` method.
    worker_func = attr.ib(default=None)
    #: Function to be called when the worker finishes successfully.
    #: Called with the worker's parsed command-line arguments
    #: :py:class:`argparse.Namespace` instance.
    #: Must return a string whose value is a success message type defined
    #: for the worker in the nowcast configuration file.
    #: Passed as an argument to the
    #: :py:meth:`~nemo_nowcast.worker.NowcastWorker.run` method.
    success = attr.ib(default=None)
    #: Function to be called when the worker fails. Called with the
    #: worker's parsed command-line arguments
    #: :py:class:`argparse.Namespace` instance.
    #: Must return a string whose value is a failure message type defined
    #: for the worker in the nowcast configuration file.
    #: Passed as an argument to the
    #: :py:meth:`~nemo_nowcast.worker.NowcastWorker.run` method.
    failure = attr.ib(default=None)
    #: :py:class:`argparse.Namespace` instance containing the arguments
    #: and option flags and values parsed from the command-line when the
    #: :py:meth:`~nemo_nowcast.worker.NowcastWorker.run` method is called.
    _parsed_args = attr.ib(default=None)
    #: :py:class:`zmq.Context` instance that provides the basis for the
    #: nowcast messaging system.
    _context = attr.ib(default=attr.Factory(zmq.Context))
    #: :py:class:`zmq.Context.socket` instance that is connected to the
    #: message broker to enable nowcast system messages to be exchanged
    #: with manager process.
    #: Created when the
    #: :py:meth:`~nemo_nowcast.worker.NowcastWorker.run` method is called;
    #: remains :py:obj:`None` in debug mode.
    _socket = attr.ib(default=None)

    def init_cli(self):
        """Initialize the worker's command-line interface.

        The default worker command-line interface requires a nowcast config
        file name, and provides :kbd:`--debug`, :kbd:`--help`,
        and :kbd:`-h` options.

        Use the
        :py:meth:`~nemo_nowcast.cli.CommandLineInterface.add_argument`
        method to add worker-specific arguments and/or options to the
        interface.
        The
        :py:meth:`~nemo_nowcast.cli.CommandLineInterface.add_date_option`
        method is also available for the common task of adding a date option
        (e.g. :kbd:`--run-date`) to the interface.
        """
        self.cli = CommandLineInterface(
            self.name, description=self.description, package=self.package
        )
        self.cli.build_parser()
        self.cli.parser.add_argument(
            "--debug",
            action="store_true",
            help="""
            Send logging output to the console instead of the log file,
            and suppress messages to the nowcast manager process.
            Nowcast system messages that would normally be sent to
            the manager are logged to the console,
            suppressing interactions with the manager such as launching
            other workers.
            Intended only for use when the worker is run in foreground
            from the command-line.
            """,
        )

    def run(self, worker_func, success, failure):
        """Prepare the worker to do its work, then do it.

        Preparations include:

        * Parsing the worker's command-line argument into a
          :py:class:`argparse.Namespace` instance

        * Reading the nowcast configuration file named on the command
          line to a dict

        * Configuring the worker's logging interface

        * Installing handlers for signals from the operating system
          (skipped in debug mode)

        * Configuring the worker's interface to the nowcast messaging
          framework

        :arg worker_func: Function to be called to do the worker's job.
                          Called with the worker's parsed command-line
                          arguments :py:class:`argparse.Namespace`
                          instance,
                          the worker's configuration dict,
                          and the
                          :py:meth:`~nemo_nowcast.worker.NowcastWorker.tell_manager`
                          method.
        :type worker_func: Python function

        :arg success: Function to be called when the worker finishes
                      successfully.
                      Called with the worker's parsed command-line
                      arguments :py:class:`argparse.Namespace` instance.
                      Must return a string whose value is a success
                      message type defined for the worker in the nowcast
                      configuration file.
        :type success: Python function

        :arg failure: Function to be called when the worker fails.
                      Called with the worker's parsed command-line
                      arguments :py:class:`argparse.Namespace` instance.
                      Must return a string whose value is a failure
                      message type defined for the worker in the nowcast
                      configuration file.
        :type failure: Python function
        """
        self.worker_func = worker_func
        self.success, self.failure = success, failure
        self._parsed_args = self.cli.parser.parse_args()
        self.config.load(self._parsed_args.config_file)
        msg = self._configure_logging()
        self.logger.info(f"running in process {os.getpid()}")
        self.logger.info(f"read config from {self.config.file}")
        # Report where log messages are going now that logging is configured
        self.logger.info(msg)
        if not self._parsed_args.debug:
            self._install_signal_handlers()
        self._init_zmq_interface()
        self._do_work()

    def _configure_logging(self):
        """Configure the worker's logging system interface.

        If the :kbd:`logging` section of the configuration contains a
        :kbd:`publisher` item, log messages are published on a ZeroMQ
        socket; otherwise the :kbd:`logging` section is used directly as
        the logging configuration.
        In debug mode the :kbd:`console` handler is set to the debug level
        and the levels of all other handlers are raised.

        :returns: Message describing where log messages are being sent.
        :rtype: str

        :raises: :py:exc:`nemo_nowcast.worker.WorkerError` if no usable
                 port for publishing log messages is found,
                 or if the worker's logging addresses use different ports
                 on different hosts.
        """
        # Initialize exception logging to Sentry with client DSN URL from
        # SENTRY_DSN envvar;
        # does nothing if SENTRY_DSN does not exist, is empty,
        # or is not recognized by Sentry
        if not self._parsed_args.debug:
            sentry_sdk.init()
        self.logger = logging.getLogger(self.name)
        if "publisher" in self.config["logging"]:
            # Publish log messages to distributed logging aggregator
            logging_config = self.config["logging"]["publisher"]
            zmq_pub_config = logging_config["handlers"]["zmq_pub"]
            zmq_pub_config["context"] = self._context
            if self.name in self.config["zmq"]["ports"]["logging"]:
                addrs = self.config["zmq"]["ports"]["logging"][self.name]
                addrs = addrs if isinstance(addrs, list) else [addrs]
                hosts, ports = set(), set()
                for addr in addrs:
                    try:
                        # host:port
                        host, port = addr.split(":")
                        port = int(port)
                    except AttributeError:
                        # Not a string, so treat the item as a bare port
                        host = None
                        port = addr
                    if hosts and host not in hosts:
                        if ports and port not in ports:
                            raise WorkerError(
                                f"workers on difference hosts must use the same port number: "
                                f"{self.name}: {addrs}"
                            )
                    else:
                        if host:
                            hosts.add(host)
                        ports.add(port)
            else:
                ports = self.config["zmq"]["ports"]["logging"]["workers"]
            # Use the first port that the publishing handler can be
            # configured on
            for port in ports:
                try:
                    addr = f"tcp://*:{port}"
                    zmq_pub_config["interface_or_socket"] = addr
                    logging.config.dictConfig(logging_config)
                    break
                except (zmq.ZMQError, ValueError):
                    continue
            else:
                raise WorkerError("unable for find port to publish log messages to")
            for handler in self.logger.root.handlers:
                if isinstance(handler, zmq.log.handlers.PUBHandler):
                    handler.root_topic = self.name
                    handler.formatters = {
                        logging.DEBUG: logging.Formatter("%(message)s\n"),
                        logging.INFO: logging.Formatter("%(message)s\n"),
                        logging.WARNING: logging.Formatter("%(message)s\n"),
                        logging.ERROR: logging.Formatter("%(message)s\n"),
                        logging.CRITICAL: logging.Formatter("%(message)s\n"),
                    }
            # Not sure why, but we need a brief pause before we start logging
            # messages
            time.sleep(0.25)
            msg = f"publishing log messages to {addr}"
        else:
            # Write log messages to local file system
            logging_config = self.config["logging"]
            logging.config.dictConfig(logging_config)
            msg = "writing log messages to local file system"
        if self._parsed_args.debug:
            for handler in self.logger.root.handlers:
                if handler.name == "console":
                    # Activate console logging handler at the debug level
                    handler.setLevel(logging.DEBUG)
                else:
                    # Deactivate other logging handlers by setting their
                    # levels very high
                    try:
                        # Assumes that the console logging level in the
                        # config is set to a value >logging.DEBUG
                        console_level = logging_config["handlers"]["console"]["level"]
                    except (KeyError, TypeError):
                        console_level = 100
                    handler.setLevel(console_level)
            msg = "**debug mode** writing log messages to console"
        return msg

    def _install_signal_handlers(self):
        """Set up interrupt and kill signal handlers.

        Both handlers log the signal, close the messaging socket if it has
        been created, and raise :py:exc:`SystemExit`.
        """

        def sigint_handler(signum, frame):
            self.logger.info(
                "interrupt signal (SIGINT or Ctrl-C) received; shutting down"
            )
            # Handlers are installed before the socket is created,
            # so it may still be None when a signal arrives
            if self._socket is not None:
                self._socket.close()
            raise SystemExit

        signal.signal(signal.SIGINT, sigint_handler)

        def sigterm_handler(signum, frame):
            self.logger.info("termination signal (SIGTERM) received; shutting down")
            if self._socket is not None:
                self._socket.close()
            raise SystemExit

        signal.signal(signal.SIGTERM, sigterm_handler)

    def _init_zmq_interface(self):
        """Initialize a ZeroMQ request/reply (REQ/REP) interface.

        The REQ socket is stored on the instance and connected to the host
        and :kbd:`workers` port given in the :kbd:`zmq` section of the
        configuration.
        Nothing is created in debug mode.
        """
        if self._parsed_args.debug:
            self.logger.debug("**debug mode** no connection to manager")
            return
        self._socket = self._context.socket(zmq.REQ)
        zmq_host = self.config["zmq"]["host"]
        zmq_port = self.config["zmq"]["ports"]["workers"]
        self._socket.setsockopt(zmq.TCP_KEEPALIVE, 1)
        self._socket.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 900)
        self._socket.connect(f"tcp://{zmq_host}:{zmq_port}")
        self.logger.info(f"connected to {zmq_host} port {zmq_port}")

    def _do_work(self):
        """Execute the worker function,
        communicate its success or failure to the nowcast manager via the
        messaging framework,
        and handle any exceptions it raises.

        The ZeroMQ context is destroyed before returning.
        """
        try:
            checklist = self.worker_func(
                self._parsed_args, self.config, self.tell_manager
            )
            msg_type = self.success(self._parsed_args)
            self.tell_manager(msg_type, checklist)
        except WorkerError:
            msg_type = self.failure(self._parsed_args)
            self.tell_manager(msg_type)
        except SystemExit:
            # Normal termination
            pass
        except BaseException:
            # Top-level boundary: anything else is reported as a crash
            self.logger.critical("unhandled exception:", exc_info=True)
            self.tell_manager("crash")
        self.logger.debug("shutting down", extra={"logger_name": self.name})
        self._context.destroy()

    def tell_manager(self, msg_type, payload=None):
        """Exchange messages with the nowcast manager process.

        Message is composed of worker's name, msg_type, and payload.
        Acknowledgement message from manager process is logged and returned.

        In debug mode the message is only logged;
        nothing is sent and :py:obj:`None` is returned.

        :arg str msg_type: Key of the message type to send;
                           must be defined for worker name in the
                           configuration data structure.

        :arg payload: Data object to send in the message;
                      e.g. dict containing worker's checklist of
                      accomplishments.

        :returns: Acknowledgement message from manager process.

        :raises: :py:exc:`nemo_nowcast.worker.WorkerError` if the worker,
                 the message type,
                 or the manager's reply message type is not found in the
                 message registry.
        """
        try:
            worker_msgs = self.config["message registry"]["workers"][self.name]
        except (KeyError, TypeError):
            raise WorkerError(
                f"worker not found in {self.config.file} message registry: {self.name}"
            )
        try:
            msg_words = worker_msgs[msg_type]
        except (KeyError, TypeError):
            raise WorkerError(
                f"message type not found for {self.name} worker in {self.config.file} "
                f"message registry: {msg_type}"
            )
        if self._parsed_args.debug:
            self.logger.debug(
                f"**debug mode** message that would have been sent to manager: ({msg_type} {msg_words})"
            )
            return
        # Send message to nowcast manager
        message = Message(self.name, msg_type, payload).serialize()
        self._socket.send_string(message)
        self.logger.debug(
            f"sent message: ({msg_type}) {worker_msgs[msg_type]}",
            extra={"logger_name": self.name},
        )
        # Wait for and process response
        msg = self._socket.recv_string()
        message = Message.deserialize(msg)
        mgr_msgs = self.config["message registry"]["manager"]
        try:
            msg_words = mgr_msgs[message.type]
        except KeyError:
            raise WorkerError(
                f"message type not found for manager in {self.config.file} message registry: {message.type}"
            )
        self.logger.debug(
            f"received message from {message.source}: ({message.type}) {msg_words}",
            extra={"logger_name": self.name},
        )
        return message
def get_web_data(
    file_url,
    logger_name,
    filepath=None,
    session=None,
    chunk_size=100 * 1024,
    wait_exponential_multiplier=2,
    wait_retry_max=256,
    wait_exponential_max=60 * 60,
):
    """Download content from file_url and store it in filepath.

    If the first download attempt fails with a connection, HTTP, or socket
    error, retry at exponentially increasing intervals.
    The first retry occurs after wait_exponential_multiplier seconds.
    The delay until the next retry is calculated by multiplying the previous
    delay by wait_exponential_multiplier, capped at wait_retry_max seconds.
    Retries stop once the running total of wait times reaches
    wait_exponential_max.

    So, with the default argument values, the first retry will occur
    2 seconds after the download fails, and subsequent retries will occur
    4, 8, 16, 32, 64, 128, 256, 256, ... seconds after each failure.

    Exceptions other than connection, HTTP, and socket errors are not
    retried; they propagate to the caller.

    :param str file_url: URL to download content from.

    :param str logger_name: Name of the :py:class:`logging.Logger` to emit
                            messages on.

    :param filepath: File path/name at which to store the downloaded content.
                     If :py:class:`None` (the default) the content is
                     returned.
    :type filepath: :py:class:`pathlib.Path`

    :param session: Session object to use for TCP connection pooling
                    to improve performance for multiple requests to the same
                    host.
                    Defaults to :py:obj:`None` for simplicity,
                    in which case a session is created within the function
                    and closed before the function returns.
                    If the function is called within loop,
                    the recommended use pattern is to create the session
                    outside the loop as a context manager:

                    .. code-block:: python

                        with requests.Session() as session:
                            for thing in iterable:
                                nemo_nowcast.worker.get_web_data(
                                    file_url, logger_name, filepath, session)

    :type session: :py:class:`requests.Session`

    :param chunk_size: Maximum number of bytes to read into memory at a time
                       and write to disk as the download proceeds.
                       The default value gives performance comparable to
                       :command:`curl` when downloading weather forecast
                       files from the Environment Canada collaboration FTP
                       server.
                       Tuning may be required for downloads from other
                       sources.

    :param wait_exponential_multiplier: Multiplicative factor that increases
                                        the time interval between retries.
                                        Also the number of seconds to wait
                                        before the first retry,
                                        and the starting value of the
                                        running total of wait times.
    :type wait_exponential_multiplier: int or float

    :param wait_retry_max: Maximum number of seconds to wait between retries.
                           This caps the exponential growth of the wait time
                           between retries,
                           but retries continue with this period until
                           wait_exponential_max is reached.
    :type wait_retry_max: int or float

    :param wait_exponential_max: Limit in seconds on the running total of
                                 retry wait times.
                                 No further retry is attempted once the
                                 total reaches this value.
    :type wait_exponential_max: int or float

    :return: :py:class:`requests.Response.content` when filepath is
             :py:obj:`None`;
             otherwise :py:obj:`None` after the content has been written to
             filepath.
    :rtype: bytes or None

    :raises: :py:exc:`nemo_nowcast.worker.WorkerError` when the download
             still fails after the final retry.
    """
    logger = logging.getLogger(logger_name)
    # Only close the session if it was created here; a caller-supplied
    # session stays open for the caller's later requests
    owns_session = session is None
    if owns_session:
        session = requests.Session()

    def _get_data():
        """Make one download attempt; log and re-raise retryable errors."""
        try:
            response = session.get(file_url, stream=True)
            response.raise_for_status()
            if filepath is None:
                return response.content
            with filepath.open("wb") as f:
                for block in response.iter_content(chunk_size=chunk_size):
                    if not block:
                        break
                    f.write(block)
        except (
            requests.exceptions.ConnectionError,
            requests.exceptions.HTTPError,
            socket.error,
        ) as e:
            logger.debug(f"received {e} from {file_url}")
            raise

    try:
        try:
            return _get_data()
        except (
            requests.exceptions.ConnectionError,
            requests.exceptions.HTTPError,
            socket.error,
        ):
            # Retry only the same errors as the retry loop below, so that
            # SystemExit, KeyboardInterrupt, and programming errors
            # propagate immediately instead of being swallowed
            pass
        wait_seconds = wait_exponential_multiplier
        total_seconds = wait_exponential_multiplier
        retries = 0
        while total_seconds < wait_exponential_max:
            sleep_seconds = min(wait_seconds, wait_retry_max)
            logger.debug(f"waiting {sleep_seconds} seconds until retry {retries + 1}")
            time.sleep(sleep_seconds)
            try:
                return _get_data()
            except (
                requests.exceptions.ConnectionError,
                requests.exceptions.HTTPError,
                socket.error,
            ):
                wait_seconds *= wait_exponential_multiplier
                total_seconds += sleep_seconds
                retries += 1
        logger.error(f"giving up; download from {file_url} failed {retries + 1} times")
        raise WorkerError
    finally:
        if owns_session:
            session.close()