NEMO_Nowcast Package API
System Configuration
NEMO_Nowcast framework system configuration object.
Provides dict-like access to the configuration loaded from the YAML system configuration file.
- class nemo_nowcast.config.Config[source]
Construct a nemo_nowcast.config.Config instance.
- file
Path/name of YAML configuration file for the NEMO nowcast system. Assigned when the load() method is called.
- load(config_file)[source]
Load the YAML config_file.
The value of config_file is stored on the nemo_nowcast.config.Config.file attribute.
- Parameters
config_file (pathlib.Path or str) – Path/name of YAML configuration file for the NEMO nowcast system.
Message
NEMO_Nowcast framework message object.
- class nemo_nowcast.message.Message(source, type, payload=None)[source]
Construct a nemo_nowcast.message.Message instance.
- classmethod deserialize(message)[source]
Transform received message from str to message data structure.
- Parameters
message (str) – Message dict serialized using YAML.
- Returns
nemo_nowcast.message.Message instance
- payload
Content of message; must be serializable by YAML such that it can be deserialized by yaml.safe_load().
- serialize()[source]
Construct a message data structure and transform it into a string suitable for sending.
- Returns
Message data structure serialized using YAML.
- source
Name of the worker or manager sending the message.
- type
Key of a message type that is defined for source in the message registry section of the configuration data structure.
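To illustrate the serialize/deserialize round trip described above, here is a minimal sketch that uses only the standard library. It is not the nemo_nowcast implementation: SketchMessage is a hypothetical stand-in class, json is used in place of YAML so that the example runs without third-party packages, and the "sleep" and "success" names are assumptions made for illustration.

```python
import json
from dataclasses import dataclass
from typing import Any


@dataclass
class SketchMessage:
    """Hypothetical stand-in for a message with source, type and payload."""

    source: str
    type: str
    payload: Any = None

    def serialize(self) -> str:
        # Build a plain mapping and turn it into a string for sending.
        # The real class uses YAML; json is an assumption made for this sketch.
        return json.dumps(
            {"source": self.source, "type": self.type, "payload": self.payload}
        )

    @classmethod
    def deserialize(cls, message: str) -> "SketchMessage":
        # Transform the received string back into a message object.
        data = json.loads(message)
        return cls(data["source"], data["type"], data["payload"])


sent = SketchMessage("sleep", "success", {"sleep": True})
received = SketchMessage.deserialize(sent.serialize())
```

The payload survives the round trip only because it is built from plain mappings, strings and booleans, which is the same kind of restriction that the payload attribute places on content sent through YAML.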
Message Broker
NEMO_Nowcast ZeroMQ message broker.
This broker provides the static point in the nowcast messaging framework, allowing the nowcast manager to be restarted more or less at will.
- nemo_nowcast.message_broker.main()[source]
Set up and run the nowcast system message broker.
Set-up includes:
Building the command-line parser, and parsing the command-line used to launch the message broker
Reading and parsing the configuration file given on the command-line
Configuring the logging system as specified in the configuration file
Logging the message broker’s PID, and the file path/name that was used to configure it.
The set-up is repeated if the message broker process receives a HUP signal so that the configuration can be re-loaded without having to stop and re-start the message broker.
After the set-up is complete, launch the broker message queuing process.
See python -m nemo_nowcast.message_broker --help for details of the command-line interface.
Manager
NEMO_Nowcast manager.
- class nemo_nowcast.manager.NowcastManager(name='manager', config=_Nothing.NOTHING, logger=None, checklist=_Nothing.NOTHING, parsed_args=None, msg_registry=None, race_condition_mgmt=_Nothing.NOTHING, next_workers_module=None, context=_Nothing.NOTHING, socket=None)[source]
Construct a nemo_nowcast.manager.NowcastManager instance.
- config
nemo_nowcast.config.Config object that holds the nowcast system configuration that is loaded from the configuration file in the setup() method.
- logger
Logger for the manager. Configured from the logging section of the configuration file in the setup() method.
- name
The name of the manager instance. Used in the nowcast messaging system and for logging.
- run()[source]
Run the nowcast system manager:
Create the zmq.Context.socket for communication with the worker processes and connect it to the message broker.
Install signal handlers for hangup, interrupt, and kill signals.
Launch the manager’s message processing loop.
- setup()[source]
Set up the nowcast system manager process including:
Building the command-line parser, and parsing the command-line used to launch the manager
Reading and parsing the configuration file given on the command-line
Configuring the logging system as specified in the configuration file
Logging the manager’s PID, and the file path/name that was used to configure it.
Importing the next_workers module specified in the configuration file.
The set-up is repeated if the manager process receives a HUP signal so that the configuration can be re-loaded without having to stop and re-start the manager.
Classes, Exceptions and Functions for Building Workers
NEMO_Nowcast worker classes.
- class nemo_nowcast.worker.NextWorker(module, args=_Nothing.NOTHING, host='localhost')[source]
Construct a nemo_nowcast.worker.NextWorker instance.
Intended for use in a nowcast system implementation’s nowcast.next_workers module where after_worker_name() functions return lists of nemo_nowcast.worker.NextWorker instances that provide the sequence of workers and their arguments that are to be launched next.
- args
Arguments to use when the worker is launched. Defaults to an empty list.
- host
Host to launch the worker on. Defaults to localhost.
- launch(config, logger_name)[source]
Use a subprocess to launch worker on host with args as the worker’s command-line arguments.
- Parameters
config (nemo_nowcast.config.Config) – Nowcast system configuration that was read from the configuration file.
logger_name (str) – Name of the logger to emit messages on.
This method does not wait for the subprocess to complete.
- module
Name of the worker module including its package path, in dotted notation; e.g. nowcast.workers.download_weather.
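The launch() behaviour described above (start a subprocess and do not wait for it to complete) can be sketched with the standard library. The command layout used here, python -m followed by the module, a configuration file and the worker arguments, is an assumption for illustration. build_command and launch_without_waiting are hypothetical helpers that are not part of nemo_nowcast, nowcast.yaml is a made-up file name, and launching on a remote host is left out.

```python
import subprocess
import sys


def build_command(module, config_file, args=()):
    """Return the argument list for running a worker module (assumed layout)."""
    return [sys.executable, "-m", module, str(config_file), *args]


def launch_without_waiting(cmd):
    """Start the subprocess and return immediately; the caller does not block."""
    return subprocess.Popen(cmd)


cmd = build_command(
    "nowcast.workers.download_weather", "nowcast.yaml", ["--debug"]
)
```

Because subprocess.Popen returns as soon as the child process has started, the caller is free to launch the next worker in the list without waiting for the first one to finish.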
- class nemo_nowcast.worker.NowcastWorker(name, description, package='nowcast.workers', config=_Nothing.NOTHING, logger=None, cli=None, worker_func=None, success=None, failure=None, parsed_args=None, context=_Nothing.NOTHING, socket=None)[source]
Construct a nemo_nowcast.worker.NowcastWorker instance.
- cli
nemo_nowcast.cli.CommandLineInterface object configured in the run() method to provide the default worker command-line interface that requires a nowcast config file name, and provides --debug, --help, and -h options.
- config
nemo_nowcast.config.Config object that holds the nowcast system configuration that is loaded from the configuration file in the run() method.
- description
Description of the worker. Used in the command-line interface. Typically the worker module docstring; i.e. description=__doc__.
- failure
Function to be called when the worker fails. Called with the worker’s parsed command-line arguments argparse.Namespace instance. Must return a string whose value is a failure message type defined for the worker in the nowcast configuration file. Passed as an argument to the run() method.
- init_cli()[source]
Initialize the worker’s command-line interface.
The default worker command-line interface requires a nowcast config file name, and provides --debug, --help, and -h options.
Use the add_argument() method to add worker-specific arguments and/or options to the interface. The add_date_option() method is also available for the common task of adding a date option (e.g. --run-date) to the interface.
- logger
Logger for the worker. Configured from the logging section of the configuration file in the run() method.
- name
The name of the worker instance. Used in the nowcast messaging system and for logging.
- package
Name of the package that the worker is part of; used to build the usage message. Use dotted notation; e.g. nowcast.workers.
- run(worker_func, success, failure)[source]
Prepare the worker to do its work, then do it.
Preparations include:
Parsing the worker’s command-line arguments into an argparse.Namespace instance
Reading the nowcast configuration file named on the command line to a dict
Configuring the worker’s logging interface
Installing handlers for signals from the operating system
Configuring the worker’s interface to the nowcast messaging framework
- Parameters
worker_func (Python function) – Function to be called to do the worker’s job. Called with the worker’s parsed command-line arguments argparse.Namespace instance, and the worker’s configuration dict.
success (Python function) – Function to be called when the worker finishes successfully. Called with the worker’s parsed command-line arguments argparse.Namespace instance. Must return a string whose value is a success message type defined for the worker in the nowcast configuration file.
failure (Python function) – Function to be called when the worker fails. Called with the worker’s parsed command-line arguments argparse.Namespace instance. Must return a string whose value is a failure message type defined for the worker in the nowcast configuration file.
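The contract between run() and the three functions passed to it can be sketched as follows. This is a simplified stand-in rather than the framework's code: drive, SketchWorkerError, sleep_worker, the sleep_time argument and the "success" and "failure" message type names are all hypothetical, and worker_func is given only the two arguments listed in the parameter description above.

```python
import argparse


class SketchWorkerError(Exception):
    """Hypothetical stand-in for an unrecoverable worker error."""


def sleep_worker(parsed_args, config):
    # Do the worker's job and return a checklist of what was accomplished.
    if parsed_args.sleep_time < 0:
        raise SketchWorkerError("sleep time must not be negative")
    return {"sleep time": parsed_args.sleep_time}


def success(parsed_args):
    # Must return a success message type (the name here is an assumption).
    return "success"


def failure(parsed_args):
    # Must return a failure message type (the name here is an assumption).
    return "failure"


def drive(worker_func, success, failure, parsed_args, config):
    """Call worker_func, then pick the message type from success or failure."""
    try:
        checklist = worker_func(parsed_args, config)
    except SketchWorkerError:
        return failure(parsed_args), None
    return success(parsed_args), checklist


ok = drive(sleep_worker, success, failure, argparse.Namespace(sleep_time=5), {})
bad = drive(sleep_worker, success, failure, argparse.Namespace(sleep_time=-1), {})
```

Keeping success and failure as separate functions lets each worker choose its message type from its own command-line arguments, for example a different type per forecast run, without changing the code that drives it.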
- success
Function to be called when the worker finishes successfully. Called with the worker’s parsed command-line arguments argparse.Namespace instance. Must return a string whose value is a success message type defined for the worker in the nowcast configuration file. Passed as an argument to the run() method.
- tell_manager(msg_type, payload=None)[source]
Exchange messages with the nowcast manager process.
Message is composed of worker’s name, msg_type, and payload. Acknowledgement message from manager process is logged and returned.
- Parameters
msg_type (str) – Key of the message type to send; must be defined for worker name in the configuration data structure.
payload – Data object to send in the message; e.g. dict containing worker’s checklist of accomplishments.
- Returns
Acknowledgement message from manager process.
- worker_func
Function to be called to do the worker’s job. Called with the worker’s parsed command-line arguments argparse.Namespace instance, the worker’s configuration dict, and the tell_manager() method. Passed as an argument to the run() method.
- exception nemo_nowcast.worker.WorkerError[source]
Raised when a worker encounters an error or exception that it can’t recover from.
- nemo_nowcast.worker.get_web_data(file_url, logger_name, filepath=None, session=None, chunk_size=102400, wait_exponential_multiplier=2, wait_retry_max=256, wait_exponential_max=3600)[source]
Download content from file_url and store it in filepath.
If the first download attempt fails, retry at exponentially increasing intervals until wait_exponential_max is exceeded. The first retry occurs after wait_exponential_multiplier seconds. The delay until the next retry is calculated by multiplying the previous delay by wait_exponential_multiplier.
So, with the default argument values, the first retry will occur 2 seconds after the download fails, and subsequent retries will occur at 4, 8, 16, 32, 64, …, 256, 256, …, 3582 seconds after each failure.
- Parameters
file_url (str) – URL to download content from.
logger_name (str) – Name of the logging.Logger to emit messages on.
filepath (pathlib.Path) – File path/name at which to store the downloaded content. If None (the default) the content is returned.
session (requests.Session) – Session object to use for TCP connection pooling to improve performance for multiple requests to the same host. Defaults to None for simplicity, in which case a session is created within the function. If the function is called within a loop, the recommended use pattern is to create the session outside the loop as a context manager:

    with requests.Session() as session:
        for thing in iterable:
            nemo_nowcast.worker.get_web_data(
                file_url, logger_name, filepath, session)

chunk_size – Maximum number of bytes to read into memory at a time and write to disk as the download proceeds. The default value gives performance comparable to curl when downloading weather forecast files from the Environment Canada collaboration FTP server. Tuning may be required for downloads from other sources.
wait_exponential_multiplier (int or float) – Multiplicative factor that increases the time interval between retries. Also the number of seconds to wait before the first retry.
wait_retry_max (int or float) – Maximum number of seconds to wait between retries. This caps the exponential growth of the wait time between retries, but retries continue with this period until wait_exponential_max is reached.
wait_exponential_max (int or float) – Maximum number of seconds for the final retry wait interval. The actual wait time is less than or equal to the limit so it may be significantly less than the limit; e.g. with the default argument values the final retry wait interval will be 2048 seconds.
- Returns
requests.Response.content
- Return type
- Raises
nemo_nowcast.worker.WorkerError
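The retry timing described for get_web_data() can be made concrete with a small calculation. This sketch is one reading of the description and not the library's retry code: it assumes that wait_exponential_max bounds the cumulative waiting time, which is consistent with the 3582 second figure quoted in the description, although other readings are possible. retry_delays is a hypothetical helper.

```python
def retry_delays(multiplier=2, retry_max=256, total_max=3600):
    """Yield successive wait times in seconds under the assumed reading."""
    delay = multiplier
    elapsed = 0
    # Stop once the next wait would push the cumulative time past total_max.
    while elapsed + delay <= total_max:
        yield delay
        elapsed += delay
        # Grow the delay exponentially, but never beyond retry_max.
        delay = min(delay * multiplier, retry_max)


delays = list(retry_delays())
```

With the default values this yields waits of 2, 4, 8, 16, 32, 64, 128 and 256 seconds, followed by twelve more waits of 256 seconds, for a cumulative total of 3582 seconds.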
Command-line Interface
NEMO_Nowcast framework command-line interface.
Provides a command-line interface argument parser for nowcast system components. The parser includes handling for the always-required config_file argument.
- class nemo_nowcast.cli.CommandLineInterface(module_name, package='nowcast', description=None)[source]
Construct a
nemo_nowcast.cli.CommandLineInterface
instance.- add_argument(*args, **kwargs)[source]
Add an argument to the CLI parser.
This is a thin wrapper around argparse.ArgumentParser.add_argument() that accepts that method’s arguments.
- add_date_option(name, default, help)[source]
Add a date option to the CLI parser.
The stored date is an arrow.Arrow object.
This is a thin wrapper around argparse.ArgumentParser.add_argument() that sets the type of the option to nemo_nowcast.cli.CommandLineInterface.arrow_date(), and appends information about the option’s format and default value to the help message.
- Parameters
name (str) – Option name/flag; e.g. --forecast-date.
default (arrow.Arrow) – Date to use when the option is not included on the command-line; typically arrow.now().floor('day').
help (str) – Help message. The words “Use YYYY-MM-DD format. Defaults to {default}.” are appended to the message provided, where “{default}” is the value of default formatted as YYYY-MM-DD.
- static arrow_date(string)[source]
Convert a YYYY-MM-DD string to a UTC arrow object or raise argparse.ArgumentTypeError.
The time part of the resulting arrow object is set to 00:00:00.
- Parameters
string (str) – YYYY-MM-DD string to convert.
- Returns
Date string converted to a UTC arrow.Arrow object.
- Raises
argparse.ArgumentTypeError
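The pattern behind arrow_date(), a type function that either converts the option string or raises argparse.ArgumentTypeError, can be sketched with the standard library alone. This is an illustration under stated assumptions and not the nemo_nowcast code: utc_date is a hypothetical function, and datetime stands in for arrow.Arrow so that the example has no third-party dependencies.

```python
import argparse
from datetime import datetime, timezone


def utc_date(string):
    """Convert a YYYY-MM-DD string to a UTC datetime at 00:00:00."""
    try:
        parsed = datetime.strptime(string, "%Y-%m-%d")
    except ValueError:
        # argparse turns this exception into a usage error for the user.
        raise argparse.ArgumentTypeError(
            f"unrecognized date format: {string!r}; use YYYY-MM-DD"
        )
    return parsed.replace(tzinfo=timezone.utc)


parser = argparse.ArgumentParser()
parser.add_argument("--run-date", type=utc_date)
args = parser.parse_args(["--run-date", "2016-11-29"])
```

Raising argparse.ArgumentTypeError rather than letting the ValueError propagate means that a badly formatted date produces the parser's normal error message instead of a traceback.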
- build_parser(add_help=True)[source]
Return a command-line argument parser with its description and usage messages set, and config_file as a required argument.
- Parameters
add_help (boolean) – Add a -h/--help option to the parser. Disable this if you are going to use the returned parser as a parent parser to facilitate adding more args/options.
- Returns
argparse.ArgumentParser object
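The reason for disabling add_help when the result is used as a parent parser can be seen in a plain argparse sketch. Here base stands in for the parser that build_parser(add_help=False) would return; the example_worker name, the description text and the nowcast.yaml file name are hypothetical.

```python
import argparse

# Stand-in for the parser that build_parser(add_help=False) would return.
base = argparse.ArgumentParser(add_help=False)
base.add_argument("config_file", help="path/name of the YAML configuration file")

# The child parser inherits config_file and adds its own option and -h/--help.
parser = argparse.ArgumentParser(
    prog="example_worker", parents=[base], description="Hypothetical worker."
)
parser.add_argument("--debug", action="store_true")

args = parser.parse_args(["nowcast.yaml", "--debug"])
```

If the parent parser kept its own -h/--help option, argparse would report a conflicting option string when the child parser tried to add the same option.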
- description
Brief description of what the module does that will be displayed in the help messages.
- module_name
Name of the module that the parser is for; used to build the usage message.
- package
Name of the package that the module is part of; used to build the usage message. Use dotted notation; e.g. nowcast.workers.
- parser
Argument parser; created by calling the build_parser() method.
Worker Launch Scheduler
NEMO_Nowcast worker launch scheduler.
- nemo_nowcast.scheduler.main()[source]
Set up and run the nowcast system worker launch scheduler.
Set-up includes:
Building the command-line parser, and parsing the command-line used to launch the scheduler
Reading and parsing the configuration file given on the command-line
Configuring the logging system as specified in the configuration file
Logging the scheduler’s PID, and the file path/name that was used to configure it.
Installing signal handlers for hangup, interrupt, and kill signals.
The set-up is repeated if the scheduler process receives a HUP signal so that the configuration can be re-loaded without having to stop and re-start the scheduler.
After the set-up is complete, start the scheduled worker launching loop.
See python -m nemo_nowcast.scheduler --help for details of the command-line interface.
Log Aggregator
NEMO_Nowcast distributed logging aggregator.
This logging aggregator subscribes to a ZeroMQ port to collect logging messages published by other processes. It is useful for nowcast systems in which workers run on hosts other than the one that the manager and message broker run on.
- nemo_nowcast.log_aggregator.main()[source]
Set up and run the nowcast system logging aggregator.
Set-up includes:
Building the command-line parser, and parsing the command-line used to launch the log aggregator
Reading and parsing the configuration file given on the command-line
Configuring the logging system as specified in the configuration file
Logging the log aggregator’s PID, and the file path/name that was used to configure it.
The set-up is repeated if the log aggregator process receives a HUP signal so that the configuration can be re-loaded without having to stop and re-start the log aggregator.
After the set-up is complete, start the log message processing loop.
See python -m nemo_nowcast.log_aggregator --help for details of the command-line interface.
Built-in Workers
The framework provides a few worker modules for tasks that are generic enough that they are likely to be required in most nowcast systems. Please see Built-in Workers for descriptions of these workers and their intended use.
NEMO_Nowcast framework rotate_logs worker.
Iterate through the nowcast system logging handlers, calling the doRollover() method on any that are instances of logging.handlers.RotatingFileHandler.
This worker is normally launched in automation at the end of a nowcast processing cycle (e.g. end of the day).
It can also be launched from the command-line by the nowcast administrator as necessary for system maintenance.
- nemo_nowcast.workers.rotate_logs.main()[source]
Set up and run the worker.
For command-line usage see:
python -m nemo_nowcast.workers.rotate_logs --help
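The handler iteration that the rotate_logs worker is described as doing can be sketched with the standard logging module. This is a minimal illustration and not the worker's code: rotate_logs here is a hypothetical function, and taking a list of logger names as its argument is an assumption about how the handlers are found.

```python
import logging
import logging.handlers


def rotate_logs(logger_names):
    """Roll over every RotatingFileHandler attached to the named loggers."""
    rotated = []
    for name in logger_names:
        for handler in logging.getLogger(name).handlers:
            # Other handler types (console, sockets, ...) are left alone.
            if isinstance(handler, logging.handlers.RotatingFileHandler):
                handler.doRollover()
                rotated.append(handler.baseFilename)
    return rotated
```

Calling doRollover() directly forces a rotation at a time of the administrator's choosing, rather than waiting for the handler's size limit to be reached. The handler needs a backupCount greater than zero for the previous log file to be kept.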
NEMO_Nowcast framework clear_checklist worker.
Send a message to the nowcast system manager requesting that it clear its system state checklist.
This worker is normally launched in automation at the end of a nowcast processing cycle (e.g. end of the day), just prior to launching the nemo_nowcast.workers.rotate_logs worker.
It can also be launched from the command-line by the nowcast administrator as necessary for system maintenance.
Example Workers
NEMO_Nowcast framework sleep worker example.
An example implementation of a worker module that does nothing other than sleep for a specified number of seconds.
- nemo_nowcast.workers.sleep.main()[source]
Set up and run the worker.
For command-line usage see:
python -m nemo_nowcast.workers.sleep --help
NEMO_Nowcast framework awaken worker example.
An example implementation of a worker module that does nothing other than send messages to the manager. This worker is intended to demonstrate how a worker is launched after the sleep example worker finishes successfully.
Example next_workers Module
Example next_workers module.
This should be implemented as nowcast.next_workers in a nowcast system package that is built on top of the NEMO_Nowcast package.
Please see the documentation at
https://nemo-nowcast.readthedocs.io/en/latest/nowcast_system/index.html.
Functions to calculate lists of workers to launch after previous workers end their work.
Function names must be of the form after_worker_name().
- nemo_nowcast.next_workers.after_awaken(msg, config, checklist)[source]
Calculate the list of workers to launch after the awaken example worker ends.
- Parameters
msg (collections.namedtuple()) – Nowcast system message.
config (nemo_nowcast.config.Config) – dict-like object that holds the nowcast system configuration that is loaded from the system configuration file.
checklist (dict) – System checklist: data structure containing the present state of the nowcast system.
- Returns
Sequence of nemo_nowcast.worker.NextWorker instances for worker(s) to launch next.
- nemo_nowcast.next_workers.after_sleep(msg, config, checklist)[source]
Calculate the list of workers to launch after the sleep example worker ends.
- Parameters
msg (collections.namedtuple()) – Nowcast system message.
config (nemo_nowcast.config.Config) – dict-like object that holds the nowcast system configuration that is loaded from the system configuration file.
checklist (dict) – System checklist: data structure containing the present state of the nowcast system.
- Returns
Sequence of nemo_nowcast.worker.NextWorker instances for worker(s) to launch next.
- Return type
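The shape of an after_worker_name() function can be sketched as follows. This is an illustration under stated assumptions and not the example module's code: SketchMessage and SketchNextWorker are hypothetical stand-ins that let the sketch run without nemo_nowcast, and the "crash", "failure" and "success" message types and the nowcast.workers.awaken module path are assumed names.

```python
from collections import namedtuple
from dataclasses import dataclass, field

# Hypothetical stand-ins so that the sketch runs without nemo_nowcast.
SketchMessage = namedtuple("SketchMessage", "source type payload")


@dataclass
class SketchNextWorker:
    module: str
    args: list = field(default_factory=list)
    host: str = "localhost"


def after_sleep(msg, config, checklist):
    """Return the workers to launch after the sleep worker ends."""
    next_workers = {
        "crash": [],
        "failure": [],
        "success": [SketchNextWorker("nowcast.workers.awaken")],
    }
    # Message types that are not listed launch nothing.
    return next_workers.get(msg.type, [])


launch = after_sleep(SketchMessage("sleep", "success", None), {}, {})
```

Mapping each message type to a list keeps the decision table readable, and returning an empty list is a simple way to say that nothing should be launched after a failure.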