Creating Nowcast Worker Modules

Nowcast workers are Python modules that can be imported from nowcast.workers. They are composed of some standard code to enable them to interface with the nowcast system messaging and logging framework, and one or more functions to execute their task in the nowcast system. Most of the standard code is centred around setup of a NowcastWorker object and executing method calls on it. The worker object is an instance of the nemo_nowcast.worker.NowcastWorker class.

Skeleton Worker Example

Here is a skeleton example of a worker module showing the standard code. It is explained, line by line, below. Actual (and obviously, more complicated) worker modules can be found in:

 1"""NEMO Nowcast worker to ...
 2
 3...
 4"""
 5import logging
 6
 7from nemo_nowcast import NowcastWorker
 8
 9
10NAME = 'worker_name'
11logger = logging.getLogger(NAME)
12
13
14def main():
15    """Set up and run the worker.
16
17    For command-line usage see:
18
19    :command:`python -m nemo_nowcast.workers.worker_name --help`
20    """
21    worker = NowcastWorker(NAME, description=__doc__)
22    worker.init_cli()
23    worker.run(worker_func, success, failure)
24
25
26def success(parsed_args):
27    logger.info('success message')
28    msg_type = 'success'
29    return msg_type
30
31
32def failure(parsed_args):
33    logger.critical('failure message')
34    msg_type = 'failure'
35    return msg_type
36
37
38def worker_func(parsed_args, config, tell_manager):
39    ...
40    return checklist
41
42
43if __name__ == '__main__':
44    main()

Lines 1 through 5 are the module’s triple-quoted docstring. It will appear in auto-generated documentation of the module. For convenience we will also use the docstring as the description element of the worker’s command-line help message, although that can easily be changed if you prefer to put more details in the docstring than you want to appear in the help text.

The minimum set of imports that a worker needs are:

import logging

from nemo_nowcast import NowcastWorker

The logging module is a Python standard library module that provides the mechanism that we use to print output about the worker’s progress and status to the log file or the screen, effectively developer-approved print statements on steroids :-) The NowcastWorker class provides the interface to the nowcast framework.

Obviously you will need to import whatever other modules your worker needs for its task.

Next up, on lines 12 and 13, are 2 module level variables:

NAME = 'worker_name'
logger = logging.getLogger(NAME)

NAME is used to identify the source of logging messages, and messages exchanged between the worker and the nowcast manager process.

logger is our interface to the Python standard library logging framework and we give this module’s instance the worker’s name.

Python scoping rules make module level variables available for use in any functions in the module without passing them as arguments but assigning new values to them elsewhere in the module will surely mess things up.

The main() Function

The main() function is where our worker gets down to work. It is called when the worker is run from the command line by virtue of the

if __name__ == '__main__':
    main()

stanza at the end of the module.

The minimum possible main() function is shown in lines 14 to 23:

def main():
    """Set up and run the worker.

    For command-line usage see:

    :command:`python -m nemo_nowcast.workers.worker_name --help`
    """
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    worker.run(worker_func, success, failure)

The main() function docstring will appear in auto-generated documentation of the module.

First, we create an instance of the NowcastWorker class that we call, by convention, worker. The NowcastWorker constructor takes 2 arguments:

  • the NAME that we defined as a module-level variable above

  • a description string that is used as the description element of the worker’s command-line help message; here we use the worker’s module docstring (that is automatically stored in the __doc__ module-level variable)

    The description part of the help message is the paragraph after the usage, for example:

    (nowcast)$ python -m nowcast.workers.download_weather --help
    
    usage: python -m nowcast.workers.download_weather
           [-h] [--debug] [--yesterday] config_file {18,00,12,06}
    
    Salish Sea NEMO nowcast weather model dataset download worker. Download the
    GRIB2 files from today's 00, 06, 12, or 18 EC GEM 2.5km HRDPS operational
    model forecast.
    
    ...
    

See the NEMO_Nowcast.worker.NowcastWorker documentation for details of the NowcastWorker object’s contructor arguments, other attributes, and methods.

Next, we call the init_cli() method on the worker to initialize the worker’s command-line interface (CLI). The default worker command-line interface requires a nowcast config file name, and provides --debug, --help, and -h options. The worker’s CLI can be extended with additional command-line arguments and/or options. Please see Extending the Command Line Interface for details.

Finally, we call the run() method on the worker to do the actual work. The run() method takes 3 function names as arguments:

  • worker_func is the name of the function that does the worker’s job

  • success is the name of the function to be called when the worker finishes successfully

  • failure is the name of the function to be called when the worker fails

All 3 functions must be defined in the worker module. Their required call signatures and return values are described below.

success() and failure() Functions

The success() function is called when the worker successfully completes its task. It is used to generate the message that is sent to the nowcast manager process to indicate the worker’s success so that the nowcast automation can proceed to the next appropriate worker(s). A minimal success() function is shown in lines 26 through 29:

def success(parsed_args):
    logger.info('success message')
    msg_type = 'success'
    return msg_type

The name of the function is success() by convention, but it could be anything provided that it is the 2nd argument passed to the worker.run() method.

The success() function must accept exactly 1 argument, named parsed_args by convention. It is an argparse.Namespace object that has the worker’s command-line argument names and values as attributes. Even if your success() function does not use parsed_args it must still be included in the function definition.

The success() function should send a message via logger.info() to the logging system that describes the worker’s success.

The success() function must return a string that is a key registered for the worker in the Message Registry section of the Nowcast Configuration File. The returned key specifies the message type that is sent to the Manager process to indicate the worker’s success.

Here is a more sophisticated example of a success() function from the GoMSS Nowcast package download_weather

worker:

def success(parsed_args):
    logger.info(
        '{date} weather forecast file downloads complete'
        .format(date=parsed_args.forecast_date.format('YYYY-MM-DD')))
    msg_type = 'success'
    return msg_type

The failure() function is very similar to the success() function except that it is called if the worker fails to complete its task. It is used to generate the message that is sent to the nowcast manager process to indicate the worker’s failure so that appropriate notifications can be produced and/or remedial action(s) taken. A minimal failure() function is shown on lines 32 through 35:

def failure(parsed_args):
    logger.critical('failure message')
    msg_type = 'failure'
    return msg_type

The name of the function is failure() by convention, but it could be anything provided that it is the 3rd argument passed to the worker.run() method.

Like the success() function, the failure() function must accept exactly 1 argument, named parsed_args by convention. It is an argparse.Namespace object that has the worker’s command-line argument names and values as attributes. Even if your failure() function does not use parsed_args it must still be included in the function definition.

The failure() function should send a message via logger.critical() to the logging system that describes the worker’s failure.

The failure() function must return a string that is a key registered for the worker in the Message Registry section of the Nowcast Configuration File. The returned key specifies the message type that is sent to the nowcast manager process to indicate the worker’s failure.

Doing the Work

Lines 38 through 40 show the required call signature and return value for the function that is called to do the worker’s task:

def worker_func(parsed_args, config, tell_manager):
    ...
    return checklist

The name of the function can be anything provided that it is the 1st argument passed to the worker.run() method. Ideally, the function name should be descriptive of the worker’s task. If you can’t think of anything else, you can use the name of the worker module.

The function must accept exactly 3 arguments:

  • The 1st argument is named parsed_args by convention. It is an argparse.Namespace object that has the worker’s command-line argument names and values as attributes. Even if your function does not use parsed_args it must still be included in the function definition.

  • The 2nd argument is named config by convention. It is a nemo_nowcast.config.Config object that provides dict-like access to the nowcast system configuration loaded from the Nowcast Configuration File. Even if your function does not use config it must still be included in the function definition.

  • The 3rd argument is named tell_manager by convention. It is the worker’s nemo_nowcast.worker.NowcastWorker.tell_manager() method. That method provides a mechanism for the exchange of messages with the nowcast manager process. Few workers need to do that, so the tell_manager is often replaced by *args in the function signature:

    def worker_func(parsed_args, config, *args):
    

    Please see the SalishSeaNowcast package watch_NEMO worker for examples of the use of tell_manager.

The function must return a Python dict, known as checklist by convention. checklist must contain at least 1 key/value pair that provides information about the worker’s successful completion. checklist is sent to the nowcast manager process as the payload of the worker’s success message. A simple example of a checklist from the GoMSS Nowcast package download_weather worker is:

checklist = {
    '{date} forecast'
    .format(date=date=parsed_args.forecast_date.format('YYYY-MM-DD'))): True}

which indicates that the particular forecast download was successful. A more sophisticated checklist such as the one produced by the SalishSeaNowcast package get_NeahBay_ssh worker contains multiple keys and lists of filenames.

The function whose name is passed as the 1st argument to the worker.run() method can be a driver function that calls other functions in the worker module to subdivide the worker task into smaller, more readable, and more testable sections. By convention, such “2nd level” functions are marked as private by prefixing their names with the _ (underscore) character; e.g. _calc_date(). This is in line with the Python language convention that functions and methods that start with an underscore should not be called outside the module in which they are defined.

The worker should send messages to the logging system that indicate its progress. Messages sent via logger.info() appear in the nowcast.log file. Info level logging should be used for “high level” progress messages, and preferably not used inside loops. Messages logged via logger.debug() can be used for more detailed logging. Those messages appear in the nowcast.debug.log file.

If a worker function encounters an expected error condition (a file download failure or timeout, for example) it should send a message to the logging system via logger.critical() and raise a nemo_nowcast.worker.WorkerError exception. Here is an example that handles an empty downloaded file in the SalishSeaNowcast package download_weather worker:

if size == 0:
    logger.critical('Problem, 0 size file {}'.format(fileURL))
    raise WorkerError

This section has only outlined the basic code structure and conventions for writing nowcast workers. The best way to learn now to write a new worker is by studying the code in existing worker modules, for example:

Extending the Command Line Interface

Generic Arguments

If you need to add a command-line argument to a worker you can do so by calling the worker.cli.add_argument() method. Here is an example from the SalishSeaNowcast package get_NeahBay_ssh worker:

def main():
    """For command-line usage see:

    :command:`python -m nowcast.workers.watch_NEMO --help`
    """
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    worker.cli.add_argument("host_name", help="Name of the host to monitor the run on")
    worker.cli.add_argument(
        "run_type",
        choices={"nowcast", "nowcast-green", "nowcast-dev", "forecast", "forecast2"},
        help="""
        Type of run to monitor:
        'nowcast' means nowcast physics run,
        'nowcast-green' means nowcast green ocean run,
        'forecast' means updated forecast run,
        'forecast2' means preliminary forecast run,
        """,
    )
    worker.run(watch_NEMO, success, failure)

The worker.cli.add_argument() method is documented at nemo_nowcast.cli.CommandLineInterface.add_argument(). It takes the same arguments as the Python standard library argparse.ArgumentParser.add_argument() method.

Note

The worker.init_cli() method initialized the worker’s command-line interface to provide help messages, and handle the config_file argument, and the --debug option.

Date Options

The fairly common need to add a date option to a worker’s CLI is simplified by the worker.cli.add_date_option(). Here is an example from the GoMSS Nowcast package download_weather worker:

def main():
    """Set up and run the worker.

    For command-line usage see:

    :command:`python -m nowcast.workers.download_weather --help`
    """
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    worker.cli.add_date_option(
        '--forecast-date', default=arrow.now().floor('day'),
        help='Date for which to download the weather forecast.')
    worker.run(download_weather, success, failure)

This adds a --forecast-date option to the CLI. It’s default value is an Arrow object whose value is midnight on the current date. It will be available in the worker functions as parsed_args.forecast_date. The help message for the option is:

Date for which to download the weather forecast. Use YYYY-MM-DD format. Defaults to {default}.

where {default} is the value of default passed into worker.cli.add_date_option() formatted as YYYY-MM-DD.

The worker.cli.add_date_option() method is documented at nemo_nowcast.cli.CommandLineInterface.add_date_option().

Note

The Arrow object produced by worker.cli.add_date_option() is timezone-aware and its timezone is set to UTC. That is typically fine when working with just the date. If you need to do time calculations in a worker you may need to set the correct timezone. That is typically done by calling the to() method on the Arrow object with 'local' as its argument; e.g. parsed_args.forecast_date.to('local').