Aviso is a system developed at ECMWF with the aim of:
- Notifying for data availability
- Real-Time Model Output Data
- Product dissemination via ECPDS
- Meant for automated systems
- Based on an Event system supporting a mechanism When <this> … Do <that> …
It allows users to:
- Define which events to be notified of.
- Define the triggers to be executed once a notification is received.
- Dispatch notifications to the notification server.
Content
Install AVISO and configure for ECMWF events
Aviso can be used as a Python API or as Command-Line Interface (CLI) application. Here a few steps to quickly get a working configuration listening to notifications.
Note, this guide assumes the user to have a ECMWF account.
- Install AVISO following instructions on: https://pyaviso.readthedocs.io/en/latest/guide/getting_started.html#installing
- Configure MARS credentials following instructions on Access MARS
Create a configuration file in the default location ~/.aviso/config.yaml
username_file: ~/.marsrc/mars.email key_file: ~/.marsrc/mars.token notification_engine: type: etcd_rest host: aviso.ecmwf.int port: 443 https: true configuration_engine: type: etcd_rest host: aviso.ecmwf.int port: 443 https: true schema_parser: ecmwf remote_schema: True auth_type: ecmwf
- Create ~/.marsrc/mars.email and ~/.marsrc/mars.token files. mars.email file should contain “email” listed here: https://api.ecmwf.int/v1/key and mars.token file should contain “key” shown on the same page.
- Create your listener configuration file(s) with following content:
- Example for dissemination:Note thelistener_diss.yaml
listeners: - event: dissemination request: destination: <user_destination> class: od expver: 1 domain: g stream: enfo step: [1,2,3] triggers: - type: echo
dissemination
event listener.request
describes for which dissemination event users want to execute the triggers. It is made of a set of fields. Users have to specify only the fields they wants to use as filters.destination
is a mandatory field and it is associated to one or more destinations which are linked to the user's ECMWF account. Only the notifications complying with all the fields defined will execute the trigger. The trigger in this example isecho
. This will simply print out the notification to the console output. - Example for MARS:listener_mars.yaml
listeners: - event: mars request: class: od expver: 1 domain: g stream: enfo step: [1,2,3] triggers: - type: echo
Note, in the MARS event listener, destination field is not present.Aviso will notify when data exists in MARS, but most users will not be able to retrieve this data before the scheduled time (Dissemination schedule). Triggering an automated retrieval of data will require adding logic to ensure you only retrieve data when you have respective permissions to do so. For automation of downstream time-critical workflows, this should mainly be based on dissemination events.
- Example for dissemination:
- If AVISO is installed in Python virtual environment, activate the environment first:
source {PATH_TO_MY_ENV}/myenv/bin/activate
- Launch the AVISO application to listen ECMWF defined in your configuration file:
aviso listen listener_diss.yaml #or aviso listen listener_mars.yaml
Once in execution this command will create a process waiting for notifications. Users can terminate the application by typing CTRL+C .
Note, the configuration file is only read at start time, therefore every time users make changes to it they need to restart the listening process.
Defining your listeners
Aviso configuration file allows the definition of multiple listeners. Alternatively, the listeners configuration can be indicated as an independent file or multiple files:
aviso listen listener1.yaml listener2.yaml
Regardless where is defined, each listener is composed of:
- an
event
type - a
request
block - a
triggers
block
Event
Aviso offers notifications for the following types of events:
- The
dissemination
event is submitted by the product generation. The related listener configuration must define thedestination
field. A notification related to adissemination
event will have the fieldlocation
containing the URL to the product notified. The
mars
event is designed for real-time data from the model output. The related listener configuration has no mandatory fields. Moreover the related notification will not contain thelocation
field because users will have to access to it by the conventional MARS API.
Request
The table below shows the full list of fields accepted in a request
block. These fields represent a subset on the MARS language.
Field | Type | Event | Optional/Mandatory |
---|---|---|---|
destination | String, uppercase | dissemination | Mandatory |
target | String | dissemination | Optional |
class | Enum | All | Optional |
stream | Enum | All | Optional |
domain | Enum | All | Optional |
expver | Integer | All | Optional |
date | Date (e.g.20190810) | All | Optional |
time | Values: [0,6,12,18] | All | Optional |
step | Integer | All | Optional |
Triggers
The triggers
block accepts a sequence of triggers. Each trigger will result in an independent process executed every time a notification is received. This sections shows the type of triggers currently available.
Echo
This is the simplest trigger as it prints the notification to the console output. It is used for testing and it does not accept any extra parameters.
triggers: - type: echo
Log
This trigger logs the event to a log file. It is useful for recording the received event. Note, it will fail if the directory does not exist.
triggers: - type: log path: testLog.log
Command
This trigger allows the user to define a shell command to work with the notification.
triggers: - type: command working_dir: $HOME/aviso/examples command: ./script.sh --stream ${request.stream} --date ${request.date} --time ${request.time} --step ${request.step} environment: STEP: ${request.step} TIME: "The time is ${request.time}"
command
is the command that will be executed for each notification received. This is a mandatory field.environment
is a user defined list of local variables that will be passed to the command shell. This is an optional field.working_dir
defines the working directory that will be set before executing the command. This is an optional field.
Moreover, the system performs a parameter substitution in the command
and environment
fields, for every sequence of the pattern:
-
${name}
, it replaces it with the value associated to the corresponding key found in the notification received. ${json}
, it replaces it with the whole notification formatted as a JSON inline string.${jsonpath}
, it replaces it with the file name of a JSON file containing the notification.
A notification is a dictionary whose keys can be used in the parameter substitution mechanism described above. Here an example of a notification:
{ "event": "dissemination", "request": { "class": "od", "date": "20191112", "destination": "FOO", "domain": "g", "expver": "0001", "step": "001", "stream": "enfo", "time": "18", "target": "E1" }, "location": "/home/ecpds/xxx.xx" }
Here an example file of a listener command triggering a bash script executing a MARS request.
Post
This trigger allows the user to send as HTTP POST message the notification received and formatted accordingly to the CloudEvents specification
triggers: - type: post protocol: type: cloudevents_http url: http://my.notification.system/api
This is the basic configuration. More parameters can be specified to customise the CloudEvents message. More info the reference documentation.
The CloudEvents message sent would look like the following:
{ "type" : "aviso", # this is customisable by the user "data": { # this is aviso specific "event": "dissemination", "request": { "target": "E1", "class": "od", "date": "20190810", "destination": "FOO", "domain": "g", "expver": "1", "step": "1", "stream": "enfo", "time": "0", }, "location": "/home/diss/foo/bar/20190810/xyz", # location on ceph or s3 }, "datacontenttype": "application/json", "id": "0c02fdc5-148c-43b5-b2fa-cb1f590369ff", # UUID random generated by aviso "source": "https://aviso.ecmwf.int", # this is customisable by the user "specversion": "1.0", "time": "2020-03-02T13:34:40.245Z", # Timestamp of when this message is created }
Aviso can also post events to a AWS Simple Notification Service (SNS) topic. See the relevant documentation page for more details: https://pyaviso.readthedocs.io/en/latest/reference/triggers.html#post
Aviso as a Python API
Aviso can be used as a Python API. This is intended for users that want to integrate Aviso in a bigger workflow written in Python or that simply have their trigger defined as a Python function. Below an example of a python script that defines a function to be executed once a notification is received, creates a listener that references to this function trigger and finally passes it to aviso to execute.
from pyaviso import NotificationManager # define function to be called def do_something(notification): print(f"Notification for step {notification['request']['step']} received") # now do something useful with it ... # define the trigger trigger = {"type": "function", "function": do_something} # create a event listener request that uses that trigger request = {"class": "od", "stream": "oper", "expver": 1, "domain": "g", "step": 1} listeners = {"listeners": [{"event": "mars", "request": request, "triggers": [trigger]}]} # run it aviso = NotificationManager() aviso.listen(listeners=listeners)
Here an example file of a Python script running Aviso and executing a MARS request after a notification is received.
Dealing with past notifications
Before listening to new notifications, Aviso by default checks what was the last notification received and it will then return all the notifications that have been missed since. It will then carry on by listening to new ones. The first ever time the application runs however no previous notification will be returned. This behaviour allows users not to miss any notifications in case of machine reboots.
To override this behaviour by ignoring the missed notifications while listening only to the new ones, run the following:
aviso listen --now
This command will also reset the previous history.
Users can also explicitly replay past notifications. Aviso can deliver notifications from the ECMWF server up to 14 days in the past. This can also be used to test the listener configuration with real notifications.
Here an example, launch Aviso with the following options:
aviso listen --from 2020-01-20T00:00:00.0Z --to 2020-01-21T00:00:00.0Z
It will replay all the notifications sent from 20 January to 21 January and the ones complying with the listener request will execute the triggers.
Note, the dates must be in the past and --to
can only be defined together with --from
. The dates are defined in ISO format and they are in UTC.
In absence of --to
, the system after having retrieved the past notifications, it will continue listening to future notifications. If --to
is defined Aviso will terminate once retrieved all the past notifications.
Examples
There is a GitHub repository with collection of example Python scripts and listener configuration files specifically designed for the AVISO software and notifications generated by ECMWF. These examples are intended to illustrate how AVISO can be effectively used with data distributed by ECMWF and we strongly recommend going through the examples if you intend to use AVISO to get notifications about availability of ECMWF data.
Upgrading AVISO installation
To upgrade Aviso to a newer version once it is available run the following command:
pip3 install --upgrade pyaviso
Running as a service on Linux machines
Aviso can be executed as a system service. This helps automating its restart in case of machine reboots. The following steps help to configure Aviso to run as a service that automatically restart:
Create a system service unit, by creating the following file in /etc/systemd/system/aviso.service:
[Unit] Description=Aviso [Service] User=<username> (if omitted it will run as root) Group=<groupname> (optional) WorkingDirectory= <home_directory> (optional) ExecStart=/opt/anaconda3/bin/aviso listen Restart=always [Install] WantedBy=multi-user.target
Enable the aviso service:
sudo systemctl enable aviso.service
Reload systemd:
sudo systemctl daemon-reload
Start the service:
sudo systemctl start aviso.service
Note, if users change Aviso configuration, Aviso service must be restarted otherwise the change will be ineffective.