# Copyright 2016-2021 Doug Latornell, 43ravens
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""NEMO_Nowcast worker launch scheduler.
"""
import logging
import logging.config
import os
import signal
import time
import schedule
import sentry_sdk
import zmq
import zmq.log.handlers
from nemo_nowcast import CommandLineInterface, Config, NextWorker
NAME = "scheduler"
logger = logging.getLogger(NAME)
context = zmq.Context()
[docs]
def main():
"""Set up and run the nowcast system worker launch scheduler.
Set-up includes:
* Building the command-line parser, and parsing the command-line used
to launch the scheduler
* Reading and parsing the configuration file given on the command-line
* Configuring the logging system as specified in the configuration file
* Logging the scheduler's PID, and the file path/name that was used to
configure it.
* Install signal handlers for hangup, interrupt, and kill signals.
The set-up is repeated if the scheduler process receives a HUP signal
so that the configuration can be re-loaded without having to stop and
re-start the scheduler.
After the set-up is complete, start the scheduled worker launching loop.
See :command:`python -m nowcast.scheduler --help`
for details of the command-line interface.
"""
cli = CommandLineInterface(NAME, package="nemo_nowcast", description=__doc__)
cli.build_parser()
parsed_args = cli.parser.parse_args()
config = Config()
config.load(parsed_args.config_file)
msg = _configure_logging(config)
logger.info(f"running in process {os.getpid()}")
logger.info(f"read config from {config.file}")
logger.info(msg)
_install_signal_handlers()
run(config)
def _configure_logging(config):
"""Configure the scheduler's logging system interface.
:param config: Nowcast system configuration.
:type config: :py:class:`nemo_nowcast.config.Config`
"""
# Initialize exception logging to Sentry with client DSN URL from SENTRY_DSN envvar;
# does nothing if SENTRY_DSN does not exist, is empty, or is not recognized by Sentry
sentry_sdk.init()
if "publisher" in config["logging"]:
# Publish log messages to distributed logging aggregator
logging_config = config["logging"]["publisher"]
logging_config["handlers"]["zmq_pub"]["context"] = context
host = config["zmq"]["host"]
port = config["zmq"]["ports"]["logging"][NAME]
addr = f"tcp://*:{port}"
logging_config["handlers"]["zmq_pub"]["interface_or_socket"] = addr
logging.config.dictConfig(logging_config)
for handler in logger.root.handlers:
if isinstance(handler, zmq.log.handlers.PUBHandler):
handler.root_topic = NAME
handler.formatters = {
logging.DEBUG: logging.Formatter("%(message)s\n"),
logging.INFO: logging.Formatter("%(message)s\n"),
logging.WARNING: logging.Formatter("%(message)s\n"),
logging.ERROR: logging.Formatter("%(message)s\n"),
logging.CRITICAL: logging.Formatter("%(message)s\n"),
}
# Not sure why, but we need a brief pause before we start logging
# messages
time.sleep(0.25)
msg = f"publishing logging messages to {addr}"
else:
# Write log messages to local file system
#
# Replace logging RotatingFileHandlers with WatchedFileHandlers so
# that we notice when log files are rotated and switch to writing
# to the new ones
logging_config = config["logging"]
logging_handlers = logging_config["handlers"]
rotating_handler = "logging.handlers.RotatingFileHandler"
watched_handler = "logging.handlers.WatchedFileHandler"
for handler in logging_handlers:
if logging_handlers[handler]["class"] == rotating_handler:
logging_handlers[handler]["class"] = watched_handler
del logging_handlers[handler]["backupCount"]
logging.config.dictConfig(logging_config)
msg = "writing logging messages to local file system"
return msg
def run(config):
"""Run the nowcast system worker launch scheduler.
* Prepare the schedule as specified in the configuration file.
* Loop forever, periodically checking to see if it is time to launch the
scheduled workers.
:param config: Nowcast system configuration.
:type config: :py:class:`nemo_nowcast.config.Config`
"""
sleep_seconds = _prep_schedule(config)
while True:
schedule.run_pending()
time.sleep(sleep_seconds)
def _prep_schedule(config):
"""Create the schedule to launch workers and set how often it is checked.
:param config: Nowcast system configuration.
:type config: :py:class:`nemo_nowcast.config.Config`
"""
sleep_seconds = 60
try:
for sched_item in config["scheduled workers"]:
worker_module = list(sched_item.keys())[0]
_create_scheduled_job(worker_module, sched_item[worker_module], config)
except (AttributeError, KeyError):
# Do nothing if scheduled workers config section is missing or empty
pass
return sleep_seconds
def _create_scheduled_job(worker_module, params, config):
try:
args = params["cmd line opts"].split()
except KeyError:
args = []
worker = NextWorker(worker_module, args)
job = (
schedule.every()
.__getattribute__(params["every"])
.at(params["at"])
.do(worker.launch, config, NAME)
)
return job
def _install_signal_handlers():
"""Set up hangup, interrupt, and kill signal handlers.
"""
def sighup_handler(signal, frame):
logger.info("hangup signal (SIGHUP) received; reloading configuration")
main()
signal.signal(signal.SIGHUP, sighup_handler)
def sigint_handler(signal, frame):
logger.info("interrupt signal (SIGINT or Ctrl-C) received; shutting down")
raise SystemExit
signal.signal(signal.SIGINT, sigint_handler)
def sigterm_handler(signal, frame):
logger.info("termination signal (SIGTERM) received; shutting down")
raise SystemExit
signal.signal(signal.SIGTERM, sigterm_handler)
if __name__ == "__main__":
main() # pragma: no cover