NEMO_Nowcast Package API

System Configuration

NEMO_Nowcast framework system configuration object.

Provides dict-like access to the configuration loaded from the YAML system configuration file.

class nemo_nowcast.config.Config[source]

Construct a nemo_nowcast.config.Config instance.

file

Path/name of YAML configuration file for the NEMO nowcast system. Assigned when load() method is called.

load(config_file)[source]

Load the YAML config_file.

The value of config_file is stored on the nemo_nowcast.config.Config.file attribute.

Parameters

config_file (pathlib.Path or str) – Path/name of YAML configuration file for the NEMO nowcast system.

Message

NEMO_Nowcast framework message object.

class nemo_nowcast.message.Message(source, type, payload=None)[source]

Construct a nemo_nowcast.message.Message instance.

classmethod deserialize(message)[source]

Transform received message from str to message data structure.

Parameters

message (str) – Message dict serialized using YAML.

Returns

nemo_nowcast.message.Message instance

payload

Content of message; must be serializable by YAML such that it can be deserialized by yaml.safe_load().

serialize()[source]

Construct a message data structure and transform it into a string suitable for sending.

Returns

Message data structure serialized using YAML.

source

Name of the worker or manager sending the message.

type

Key of a message type that is defined for source in the message registry section of the configuration data structure.
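
The serialize/deserialize round trip described above can be illustrated with a minimal stand-alone sketch. It is not the framework's implementation: the namedtuple and the two functions are hypothetical stand-ins, and JSON from the standard library is swapped in for YAML so that the block runs without extra packages (for simple structures like this one, the JSON text is generally also loadable as YAML).

```python
import json
from collections import namedtuple

# Hypothetical stand-in for the framework's message data structure.
Message = namedtuple("Message", "source type payload")


def serialize(message):
    """Transform a message into a string suitable for sending."""
    return json.dumps(
        {
            "source": message.source,
            "type": message.type,
            "payload": message.payload,
        }
    )


def deserialize(text):
    """Transform a received string back into a message data structure."""
    data = json.loads(text)
    return Message(data["source"], data["type"], data["payload"])


# Round trip: what the sender serializes, the receiver recovers unchanged.
sent = Message(source="sleep", type="success", payload={"sleep time": 5})
received = deserialize(serialize(sent))
```

The payload restriction noted above exists for the same reason it does in this sketch: only plain data (dicts, lists, strings, numbers) survives the trip through text.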

Message Broker

NEMO_Nowcast ZeroMQ message broker.

This broker provides the static point in the nowcast messaging framework, allowing the nowcast manager to be restarted more or less at will.

nemo_nowcast.message_broker.main()[source]

Set up and run the nowcast system message broker.

Set-up includes:

  • Building the command-line parser, and parsing the command-line used to launch the message broker

  • Reading and parsing the configuration file given on the command-line

  • Configuring the logging system as specified in the configuration file

  • Logging the message broker’s PID, and the file path/name that was used to configure it.

The set-up is repeated if the message broker process receives a HUP signal so that the configuration can be re-loaded without having to stop and re-start the message broker.

After the set-up is complete, launch the broker message queuing process.

See python -m nemo_nowcast.message_broker --help for details of the command-line interface.

Manager

NEMO_Nowcast manager.

class nemo_nowcast.manager.NowcastManager(name='manager', config=_Nothing.NOTHING, logger=None, checklist=_Nothing.NOTHING, parsed_args=None, msg_registry=None, race_condition_mgmt=_Nothing.NOTHING, next_workers_module=None, context=_Nothing.NOTHING, socket=None)[source]

Construct a nemo_nowcast.manager.NowcastManager instance.

checklist

Nowcast system checklist: dict containing the present state of the nowcast system.

config

nemo_nowcast.config.Config object that holds the nowcast system configuration that is loaded from the configuration file in the setup() method.

logger

Logger for the manager. Configured from the logging section of the configuration file in the setup() method.

name

The name of the manager instance. Used in the nowcast messaging system and for logging.

run()[source]

Run the nowcast system manager:

  • Create the zmq.Context.socket for communication with the worker processes and connect it to the message broker.

  • Install signal handlers for hangup, interrupt, and kill signals.

  • Launch the manager’s message processing loop

setup()[source]

Set up the nowcast system manager process including:

  • Building the command-line parser, and parsing the command-line used to launch the manager

  • Reading and parsing the configuration file given on the command-line

  • Configuring the logging system as specified in the configuration file

  • Logging the manager’s PID, and the file path/name that was used to configure it.

  • Importing the next_workers module specified in the configuration file.

The set-up is repeated if the manager process receives a HUP signal so that the configuration can be re-loaded without having to stop and re-start the manager.
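
The reload-on-HUP behaviour can be sketched with the standard library signal module. This is an illustration under stated assumptions rather than the manager's actual code: the class name, the setup_count attribute, and the body of setup() are hypothetical.

```python
import signal


class ReloadableService:
    """Hypothetical long-running process that repeats its set-up on SIGHUP."""

    def __init__(self, config_file):
        self.config_file = config_file
        self.setup_count = 0

    def setup(self):
        # Stand-in for the real set-up steps: parse the command-line,
        # load the configuration file, and configure logging.
        self.setup_count += 1

    def _handle_hup(self, signum, frame):
        # Repeat the set-up so that configuration changes take effect
        # without stopping and re-starting the process.
        self.setup()

    def install_signal_handlers(self):
        # SIGHUP does not exist on every platform (e.g. Windows).
        if hasattr(signal, "SIGHUP"):
            signal.signal(signal.SIGHUP, self._handle_hup)
```

With a handler like this installed, an administrator could send the signal with kill -HUP followed by the process's PID, which is one reason the PID is logged during set-up.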

nemo_nowcast.manager.main()[source]

Set up and run the nowcast system manager.

See python -m nemo_nowcast.manager --help for details of the command-line interface.

Classes, Exceptions and Functions for Building Workers

NEMO_Nowcast worker classes.

class nemo_nowcast.worker.NextWorker(module, args=_Nothing.NOTHING, host='localhost')[source]

Construct a nemo_nowcast.worker.NextWorker instance.

Intended for use in a nowcast system implementation’s nowcast.next_workers module where after_worker_name() functions return lists of nemo_nowcast.worker.NextWorker instances that provide the sequence of workers and their arguments that are to be launched next.

args

Arguments to use when the worker is launched. Defaults to an empty list.

host

Host to launch the worker on. Defaults to localhost.

launch(config, logger_name)[source]

Use a subprocess to launch worker on host with args as the worker’s command-line arguments.

Parameters
  • config (nemo_nowcast.config.Config) – Nowcast system configuration that was read from the configuration file.

  • logger_name (str) – Name of the logger to emit messages on.

This method does not wait for the subprocess to complete.

module

Name of the worker module including its package path, in dotted notation; e.g. nowcast.workers.download_weather.

class nemo_nowcast.worker.NowcastWorker(name, description, package='nowcast.workers', config=_Nothing.NOTHING, logger=None, cli=None, worker_func=None, success=None, failure=None, parsed_args=None, context=_Nothing.NOTHING, socket=None)[source]

Construct a nemo_nowcast.worker.NowcastWorker instance.

cli

nemo_nowcast.cli.CommandLineInterface object configured in the run() method to provide the default worker command-line interface that requires a nowcast config file name, and provides --debug, --help, and -h options.

config

nemo_nowcast.config.Config object that holds the nowcast system configuration that is loaded from the configuration file in the run() method.

description

Description of the worker. Used in the command-line interface. Typically the worker module docstring; i.e. description=__doc__.

failure

Function to be called when the worker fails. Called with the worker’s parsed command-line arguments argparse.Namespace instance. Must return a string whose value is a failure message type defined for the worker in the nowcast configuration file. Passed as an argument to the run() method.

init_cli()[source]

Initialize the worker’s command-line interface.

The default worker command-line interface requires a nowcast config file name, and provides --debug, --help, and -h options.

Use the add_argument() method to add worker-specific arguments and/or options to the interface. The add_date_option() method is also available for the common task of adding a date option (e.g. --run-date) to the interface.

logger

Logger for the worker. Configured from the logging section of the configuration file in the run() method.

name

The name of the worker instance. Used in the nowcast messaging system and for logging.

package

Name of the package that the worker is part of; used to build the usage message. Use dotted notation; e.g. nowcast.workers.

run(worker_func, success, failure)[source]

Prepare the worker to do its work, then do it.

Preparations include:

  • Parsing the worker’s command-line arguments into an argparse.Namespace instance

  • Reading the nowcast configuration file named on the command line to a dict

  • Configuring the worker’s logging interface

  • Installing handlers for signals from the operating system

  • Configuring the worker’s interface to the nowcast messaging framework

Parameters
  • worker_func (Python function) – Function to be called to do the worker’s job. Called with the worker’s parsed command-line arguments argparse.Namespace instance, the worker’s configuration dict, and the tell_manager() method.

  • success (Python function) – Function to be called when the worker finishes successfully. Called with the worker’s parsed command-line arguments argparse.Namespace instance. Must return a string whose value is a success message type defined for the worker in the nowcast configuration file.

  • failure (Python function) – Function to be called when the worker fails. Called with the worker’s parsed command-line arguments argparse.Namespace instance. Must return a string whose value is a failure message type defined for the worker in the nowcast configuration file.

success

Function to be called when the worker finishes successfully. Called with the worker’s parsed command-line arguments argparse.Namespace instance. Must return a string whose value is a success message type defined for the worker in the nowcast configuration file. Passed as an argument to the run() method.

tell_manager(msg_type, payload=None)[source]

Exchange messages with the nowcast manager process.

Message is composed of worker’s name, msg_type, and payload. Acknowledgement message from manager process is logged and returned.

Parameters
  • msg_type (str) – Key of the message type to send; must be defined for worker name in the configuration data structure.

  • payload – Data object to send in the message; e.g. dict containing worker’s checklist of accomplishments.

Returns

Acknowledgement message from manager process.

worker_func

Function to be called to do the worker’s job. Called with the worker’s parsed command-line arguments argparse.Namespace instance, the worker’s configuration dict, and the tell_manager() method. Passed as an argument to the run() method.
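
A worker module built from the pieces documented above might be laid out as in the following sketch. The worker name, the --sleep-time option, the "success" and "failure" message type names, and the dict returned by the worker function are assumptions made for illustration; the message types would have to be defined for the worker in the nowcast configuration file.

```python
"""Hypothetical example worker that sleeps for a given number of seconds."""
import argparse
import time

NAME = "sleep_example"  # hypothetical worker name


def main():
    # Imported inside main() so that the callbacks below can be exercised
    # without the framework installed.
    from nemo_nowcast.worker import NowcastWorker

    worker = NowcastWorker(NAME, description=__doc__, package="nowcast.workers")
    worker.init_cli()
    worker.cli.add_argument(
        "--sleep-time", type=float, default=5, help="Seconds to sleep for."
    )
    worker.run(sleep, success, failure)


def success(parsed_args):
    # Must return a success message type defined for this worker.
    return "success"


def failure(parsed_args):
    # Must return a failure message type defined for this worker.
    return "failure"


def sleep(parsed_args, config, tell_manager):
    # The worker's job; returning a dict of accomplishments is an assumption.
    time.sleep(parsed_args.sleep_time)
    return {"sleep time": parsed_args.sleep_time}
```

Keeping success() and failure() as small separate functions means the message type can depend on the parsed arguments, for example to report a different type per run type.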

exception nemo_nowcast.worker.WorkerError[source]

Raised when a worker encounters an error or exception that it can’t recover from.

nemo_nowcast.worker.get_web_data(file_url, logger_name, filepath=None, session=None, chunk_size=102400, wait_exponential_multiplier=2, wait_retry_max=256, wait_exponential_max=3600)[source]

Download content from file_url and store it in filepath.

If the first download attempt fails, retry at exponentially increasing intervals until wait_exponential_max is exceeded. The first retry occurs after wait_exponential_multiplier seconds. The delay until the next retry is calculated by multiplying the previous delay by wait_exponential_multiplier.

So, with the default argument values, the first retry will occur 2 seconds after the download fails, and subsequent retries will occur at 4, 8, 16, 32, 64, …, 256, 256, …, 3582 seconds after each failure.
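
The retry schedule can be worked through with a short calculation. The sketch below is not the function's actual retry code; it assumes that each delay is capped at wait_retry_max and that retries stop once the accumulated waiting time would exceed wait_exponential_max. Under that assumption the default values give a total of 3582 seconds of waiting, which matches the last figure quoted above.

```python
def retry_delays(
    wait_exponential_multiplier=2, wait_retry_max=256, wait_exponential_max=3600
):
    """Return the list of waits (in seconds) before each retry.

    Assumption: wait_exponential_max limits the accumulated waiting time.
    """
    if wait_exponential_multiplier <= 0:
        raise ValueError("wait_exponential_multiplier must be positive")
    delays = []
    delay = wait_exponential_multiplier
    total = 0
    while total + delay <= wait_exponential_max:
        delays.append(delay)
        total += delay
        # Grow the delay geometrically, but never beyond the cap.
        delay = min(delay * wait_exponential_multiplier, wait_retry_max)
    return delays


delays = retry_delays()
# delays starts 2, 4, 8, ..., 256 and then repeats 256 until the limit.
total_wait = sum(delays)
```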

Parameters
  • file_url (str) – URL to download content from.

  • logger_name (str) – Name of the logging.Logger to emit messages on.

  • filepath (pathlib.Path) – File path/name at which to store the downloaded content. If None (the default) the content is returned.

  • session (requests.Session) –

    Session object to use for TCP connection pooling to improve performance for multiple requests to the same host. Defaults to None for simplicity, in which case a session is created within the function. If the function is called within a loop, the recommended use pattern is to create the session outside the loop as a context manager:

    with requests.Session() as session:
        for thing in iterable:
            nemo_nowcast.worker.get_web_data(
                file_url, logger_name, filepath, session)
    

  • chunk_size – Maximum number of bytes to read into memory at a time and write to disk as the download proceeds. The default value gives performance comparable to curl when downloading weather forecast files from the Environment Canada collaboration FTP server. Tuning may be required for downloads from other sources.

  • wait_exponential_multiplier (int or float) – Multiplicative factor that increases the time interval between retries. Also the number of seconds to wait before the first retry.

  • wait_retry_max (int or float) – Maximum number of seconds to wait between retries. This caps the exponential growth of the wait time between retries, but retries continue with this period until wait_exponential_max is reached.

  • wait_exponential_max (int or float) – Maximum number of seconds for the final retry wait interval. The actual wait time is less than or equal to the limit so it may be significantly less than the limit; e.g. with the default argument values the final retry wait interval will be 2048 seconds.

Returns

requests.Response.content

Return type

bytes

Raises

nemo_nowcast.worker.WorkerError

Command-line Interface

NEMO_Nowcast framework command-line interface.

Provides a command-line interface argument parser for nowcast system components. The parser includes handling for the always-required config_file argument.

class nemo_nowcast.cli.CommandLineInterface(module_name, package='nowcast', description=None)[source]

Construct a nemo_nowcast.cli.CommandLineInterface instance.

add_argument(*args, **kwargs)[source]

Add an argument to the CLI parser.

This is a thin wrapper around argparse.ArgumentParser.add_argument() that accepts that method’s arguments.

add_date_option(name, default, help)[source]

Add a date option to the CLI parser.

The stored date is an arrow.Arrow object.

This is a thin wrapper around argparse.ArgumentParser.add_argument() that sets the type of the option to nemo_nowcast.cli.CommandLineInterface.arrow_date(), and appends information about the option’s format and default value to the help message.

Parameters
  • name (str) – Option name/flag; e.g. --forecast-date.

  • default (arrow.Arrow) – Date to use when the option is not included on the command-line; typically arrow.now().floor('day').

  • help (str) – Help message. The words “Use YYYY-MM-DD format. Defaults to {default}.” are appended to the message provided, where “{default}” is the value of default formatted as YYYY-MM-DD.

static arrow_date(string)[source]

Convert a YYYY-MM-DD string to a UTC arrow object or raise argparse.ArgumentTypeError.

The time part of the resulting arrow object is set to 00:00:00.

Parameters

string (str) – YYYY-MM-DD string to convert.

Returns

Date string converted to a UTC arrow.Arrow object.

Raises

argparse.ArgumentTypeError
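
The date option pattern described for add_date_option() and arrow_date() can be sketched with the standard library alone. In this illustration datetime is swapped in for arrow so that the block has no third-party dependencies, and the function names utc_date and add_date_option are hypothetical stand-ins rather than the framework's methods.

```python
import argparse
from datetime import datetime, timezone


def utc_date(string):
    """Convert a YYYY-MM-DD string to a UTC datetime at 00:00:00."""
    try:
        parsed = datetime.strptime(string, "%Y-%m-%d")
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"unrecognized date format: {string} - please use YYYY-MM-DD"
        )
    return parsed.replace(tzinfo=timezone.utc)


def add_date_option(parser, name, default, help):
    """Add a date option whose help text documents its format and default."""
    help_text = f"{help} Use YYYY-MM-DD format. Defaults to {default:%Y-%m-%d}."
    parser.add_argument(name, type=utc_date, default=default, help=help_text)


parser = argparse.ArgumentParser()
add_date_option(
    parser,
    "--run-date",
    default=datetime(2024, 1, 1, tzinfo=timezone.utc),
    help="Date of the run.",
)
args = parser.parse_args(["--run-date", "2024-03-05"])
```

Raising argparse.ArgumentTypeError from the type function lets argparse report a bad date as a normal usage error instead of a traceback.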

build_parser(add_help=True)[source]

Return a command-line argument parser with its description and usage messages set, and config_file as a required argument.

Parameters

add_help (boolean) – Add a -h/--help option to the parser. Disable this if you are going to use the returned parser as a parent parser to facilitate adding more args/options.

Returns

argparse.ArgumentParser object
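
The parent parser use mentioned for add_help can be illustrated with plain argparse. In this sketch base_parser is a hypothetical stand-in for the parser that build_parser(add_help=False) returns; the program name and the --debug option are likewise only for illustration.

```python
import argparse

# Stand-in for the parser returned by build_parser(add_help=False):
# it carries the required config_file argument but no -h/--help option,
# so the child parser can add its own without a conflict.
base_parser = argparse.ArgumentParser(add_help=False)
base_parser.add_argument("config_file", help="Path/name of YAML configuration file.")

# The child parser inherits config_file and adds its own options.
parser = argparse.ArgumentParser(
    prog="python -m nowcast.workers.my_worker",
    parents=[base_parser],
    description="Hypothetical worker with extra options.",
)
parser.add_argument("--debug", action="store_true", help="Send logging to the console.")

args = parser.parse_args(["nowcast.yaml", "--debug"])
```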

description

Brief description of what the module does that will be displayed in the help messages.

module_name

Name of the module that the parser is for; used to build the usage message.

package

Name of the package that the module is part of; used to build the usage message. Use dotted notation; e.g. nowcast.workers.

parser

Argument parser; created by calling the build_parser() method.

Worker Launch Scheduler

NEMO_Nowcast worker launch scheduler.

nemo_nowcast.scheduler.main()[source]

Set up and run the nowcast system worker launch scheduler.

Set-up includes:

  • Building the command-line parser, and parsing the command-line used to launch the scheduler

  • Reading and parsing the configuration file given on the command-line

  • Configuring the logging system as specified in the configuration file

  • Logging the scheduler’s PID, and the file path/name that was used to configure it.

  • Installing signal handlers for hangup, interrupt, and kill signals.

The set-up is repeated if the scheduler process receives a HUP signal so that the configuration can be re-loaded without having to stop and re-start the scheduler.

After the set-up is complete, start the scheduled worker launching loop.

See python -m nemo_nowcast.scheduler --help for details of the command-line interface.

Log Aggregator

NEMO_Nowcast distributed logging aggregator.

This logging aggregator subscribes to a ZeroMQ port to collect logging messages published by other processes. It is useful for nowcast systems in which workers run on hosts other than the one that the manager and message broker run on.

nemo_nowcast.log_aggregator.main()[source]

Set up and run the nowcast system logging aggregator.

Set-up includes:

  • Building the command-line parser, and parsing the command-line used to launch the log aggregator

  • Reading and parsing the configuration file given on the command-line

  • Configuring the logging system as specified in the configuration file

  • Logging the log aggregator’s PID, and the file path/name that was used to configure it.

The set-up is repeated if the log aggregator process receives a HUP signal so that the configuration can be re-loaded without having to stop and re-start the log aggregator.

After the set-up is complete, start the log message processing loop.

See python -m nemo_nowcast.log_aggregator --help for details of the command-line interface.

Built-in Workers

The framework provides a few worker modules for tasks that are generic enough that they are likely to be required in most nowcast systems. Please see Built-in Workers for descriptions of these workers and their intended use.

NEMO_Nowcast framework rotate_logs worker.

Iterate through the nowcast system logging handlers, calling the doRollover() method on any that are instances of logging.handlers.RotatingFileHandler.
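
The rollover step can be sketched with the standard library logging package. The function name and its return value are hypothetical; the sketch only shows the idea of forcing a rollover on each rotating file handler attached to a logger, and is not the worker's actual code.

```python
import logging
import logging.handlers


def rotate_logs(logger):
    """Force a rollover on every RotatingFileHandler attached to logger.

    Returns the list of log file names that were rotated.
    """
    rotated = []
    for handler in logger.handlers:
        # Other handler types (console, ZeroMQ, etc.) are left alone.
        if isinstance(handler, logging.handlers.RotatingFileHandler):
            handler.doRollover()
            rotated.append(handler.baseFilename)
    return rotated
```

Note that doRollover() only keeps the old log content when the handler was created with a backupCount greater than zero; otherwise the file is simply truncated.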

This worker is normally launched in automation at the end of a nowcast processing cycle (e.g. end of the day).

It can also be launched from the command-line by the nowcast administrator as necessary for system maintenance.

nemo_nowcast.workers.rotate_logs.main()[source]

Set up and run the worker.

For command-line usage see:

python -m nemo_nowcast.workers.rotate_logs --help

NEMO_Nowcast framework clear_checklist worker.

Send a message to the nowcast system manager requesting that it clear its system state checklist.

This worker is normally launched in automation at the end of a nowcast processing cycle (e.g. end of the day), just prior to launching the nemo_nowcast.workers.rotate_logs worker.

It can also be launched from the command-line by the nowcast administrator as necessary for system maintenance.

nemo_nowcast.workers.clear_checklist.main()[source]

Set up and run the worker.

For command-line usage see:

python -m nemo_nowcast.workers.clear_checklist --help

Example Workers

NEMO_Nowcast framework sleep worker example.

An example implementation of a worker module that does nothing other than sleep for a specified number of seconds.

nemo_nowcast.workers.sleep.main()[source]

Set up and run the worker.

For command-line usage see:

python -m nemo_nowcast.workers.sleep --help

NEMO_Nowcast framework awaken worker example.

An example implementation of a worker module that does nothing other than send messages to the manager. This worker is intended to demonstrate how a worker is launched after the sleep example worker finishes successfully.

nemo_nowcast.workers.awaken.main()[source]

Set up and run the worker.

For command-line usage see:

python -m nemo_nowcast.workers.awaken --help

Example next_workers Module

Example next_workers module.

This should be implemented as nowcast.next_workers in a nowcast system package that is built on top of the NEMO_Nowcast package. Please see the documentation at https://nemo-nowcast.readthedocs.io/en/latest/nowcast_system/index.html.

Functions to calculate lists of workers to launch after previous workers end their work.

Function names must be of the form after_worker_name().
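
An after_worker_name() function might look like the following sketch. The message type names ("crash", "failure", "success") are assumptions, and the Message namedtuple is a hypothetical stand-in for the nowcast system message. So that the block runs without the framework installed, it falls back to a minimal stand-in class when nemo_nowcast.worker.NextWorker cannot be imported.

```python
from collections import namedtuple

try:
    from nemo_nowcast.worker import NextWorker
except ImportError:
    # Minimal stand-in with the documented attributes, used only when
    # the framework is not installed.
    class NextWorker:
        def __init__(self, module, args=None, host="localhost"):
            self.module = module
            self.args = [] if args is None else args
            self.host = host


# Hypothetical stand-in for the nowcast system message.
Message = namedtuple("Message", "source type payload")


def after_sleep(msg, config, checklist):
    """Calculate the list of workers to launch after the sleep worker ends."""
    next_workers = {
        # Assumed message types; no follow-on work after a crash or failure.
        "crash": [],
        "failure": [],
        "success": [NextWorker("nemo_nowcast.workers.awaken")],
    }
    return next_workers[msg.type]
```

Mapping each message type to a list keeps the decision table readable, and the config and checklist arguments remain available when the choice of next workers depends on system state.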

nemo_nowcast.next_workers.after_awaken(msg, config, checklist)[source]

Calculate the list of workers to launch after the awaken example worker ends.

Parameters
  • msg (collections.namedtuple()) – Nowcast system message.

  • config (nemo_nowcast.config.Config) – dict-like object that holds the nowcast system configuration that is loaded from the system configuration file.

  • checklist (dict) – System checklist: data structure containing the present state of the nowcast system.

Returns

Sequence of nemo_nowcast.worker.NextWorker instances for worker(s) to launch next.

nemo_nowcast.next_workers.after_sleep(msg, config, checklist)[source]

Calculate the list of workers to launch after the sleep example worker ends.

Parameters
  • msg (collections.namedtuple()) – Nowcast system message.

  • config (nemo_nowcast.config.Config) – dict-like object that holds the nowcast system configuration that is loaded from the system configuration file.

  • checklist (dict) – System checklist: data structure containing the present state of the nowcast system.

Returns

Sequence of nemo_nowcast.worker.NextWorker instances for worker(s) to launch next.

Return type

list