As data providers start to add WIGOS ids to their data, the need has arisen to test how NWP models cope with them.
To this end, a Python 3 program has been created to add WIGOS ids to the SYNOP messages currently received at ECMWF.
...
The outline of this page is:
1) Problem description
2) Program flow
3) Test data file and caveats
The predefined data set covers the dates 2019-10-15 to 2019-10-17.
1) Problem description
The WIGOS id consists of four parts, in the form 0-2XXXX-0-YYYYY:
wigosIdentifierSeries | Issuer of Identifier | Issue Number | LocalIdentifier |
---|---|---|---|
0 | 2XXXX | 0 | YYYYY |
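As an illustration of this layout, the following minimal sketch splits a WIGOS id string into its four parts. The `WigosId` type and the `parse_wigos_id` helper are hypothetical names introduced here; they are not part of ecCodes or of the program described on this page, and the sample id is made up following the 0-2XXXX-0-YYYYY pattern.

```python
from typing import NamedTuple


class WigosId(NamedTuple):
    """The four parts of a WIGOS id (hypothetical helper type)."""
    series: int            # wigosIdentifierSeries
    issuer: int            # Issuer of Identifier
    issue_number: int      # Issue Number
    local_identifier: str  # LocalIdentifier, kept as text to preserve leading zeros


def parse_wigos_id(text: str) -> WigosId:
    """Split 'series-issuer-issue-local' into its four parts."""
    parts = text.split("-", 3)
    if len(parts) != 4:
        raise ValueError("expected four dash-separated parts: {0!r}".format(text))
    series, issuer, issue_number, local_identifier = parts
    return WigosId(int(series), int(issuer), int(issue_number), local_identifier)


# Made-up sample id, for illustration only.
print(parse_wigos_id("0-20000-0-03772"))
```

The local identifier is deliberately left as a string, since converting it to an integer would drop leading zeros.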
The OSCAR REST API ("https://oscar.wmo.int/surface/rest/api/search/station?") was used to obtain a list of all the WIGOS ids available at the moment (18 Oct).
From this information only the surface observations 0-20000-0-YYYYY were used.
The last part of the WIGOS id (the local identifier) matches the current BUFR message identifier (the concatenation of blockNumber and stationNumber) and is used to map
the old station identifiers to their WIGOS ids.
2) Program description
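The mapping described above can be sketched without ecCodes as follows. This is only an illustration: the station records are made-up examples that mimic the `wigosStationIdentifiers` structure used by the program, and `station_ident` and `old_id_from_wigos` are hypothetical helper names.

```python
import re

# Surface ids of the form 0-20XXX-0-YYYYY (same pattern as in the program).
SURFACE_ID = re.compile(r"0-20\d{3}-0-\d{5}")


def station_ident(block_number, station_number):
    """Concatenate blockNumber (2 digits) and stationNumber (3 digits)."""
    return "{0:02d}{1:03d}".format(block_number, station_number)


def old_id_from_wigos(wigos_id):
    """Return the last five characters of the local identifier, or None
    when the id is not a surface id of the expected form."""
    if not SURFACE_ID.match(wigos_id):
        return None
    return wigos_id.split("-")[3][-5:]


# Made-up records shaped like OSCAR entries (illustrative only).
stations = [
    {"name": "A", "wigosStationIdentifiers": [
        {"primary": True, "wigosStationIdentifier": "0-20000-0-03772"}]},
    {"name": "B", "wigosStationIdentifiers": [
        {"primary": True, "wigosStationIdentifier": "0-356-20-1234567"}]},
]

# Map the old five-digit identifier to the primary WIGOS id.
mapping = {}
for station in stations:
    for entry in station["wigosStationIdentifiers"]:
        if entry["primary"]:
            old_id = old_id_from_wigos(entry["wigosStationIdentifier"])
            if old_id is not None:
                mapping[old_id] = entry["wigosStationIdentifier"]

# Look up the WIGOS id for a message with blockNumber=3, stationNumber=772.
print(mapping.get(station_ident(3, 772)))
```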
```python
'''
Created on 22 Oct 2019
# Copyright 2005-2018 ECMWF.
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction

This is a test program to encode Wigos Synop
requires
1) ecCodes version 2.14.1 or above (available at https://confluence.ecmwf.int/display/ECC/Releases)
2) python3.6.8-01

To run the program
./addWigosProg.py -i synop_multi_subset.bufr -o out_synop_multisubset.bufr -m web -l <logfile>

Uses BUFR version 4 template and adds the WIGOS Identifier 301150
REQUIRES masterTablesVersionNumber 28 or above

Author : Roberto Ribas Garcia ECMWF 28/10/2019

Modifications
performance improvement ( uses skipExtraKeyAttributes) and codes_clone 04/11/2019
changes for SYNOP and TEMP messages 05/11/2019
fixed codes_clone issue 05/11/2019
'''
from eccodes import *
import argparse
import json
import re
import pandas as pd
import numpy as np
import logging
import requests
import os


def read_cmd_line():
    p = argparse.ArgumentParser()
    p.add_argument("-i", "--input", help="input bufr file")
    p.add_argument("-o", "--output", help="output bufr file with wigos")
    p.add_argument("-m", "--mode", choices=["web", "json"], help=" wigos source [ json file or web ]")
    p.add_argument("-l", "--logfile", help="log file ")
    args = p.parse_args()
    return args


def read_oscar_json(jsonFile):
    with open(jsonFile, "r") as f:
        jtext = json.load(f)
    return jtext


def read_oscar_web(oscarURL="https://oscar.wmo.int/surface/rest/api/search/station?"):
    r = requests.get(oscarURL)
    jtext = json.loads(r.text)
    return jtext


def parse_json_into_dataframe(jtext):
    '''
    parses the JSON from the file wigosJsonFile
    filters the stations by wigosStationIdentifiers key in the dictionaries
    '''
    wigosStations = []
    nowigosStations = []
    for d in jtext:
        if "wigosStationIdentifiers" in d.keys():
            wigosStations.append(d)
        else:
            nowigosStations.append(d)
    # uses only the wigos 0-20XXX-0-YYYYY (surface)
    p = re.compile(r"0-20\d{3}-0-\d{5}")
    fwigosStations = []
    for d in wigosStations:
        wigosInfo = d["wigosStationIdentifiers"]
        for e in wigosInfo:
            if e["primary"] == True:
                wigosId = e["wigosStationIdentifier"]
                if p.match(wigosId):
                    wigosParts = wigosId.split("-")
                    d["wigosIdentifierSeries"] = wigosParts[0]
                    d["wigosIssuerOfIdentifier"] = wigosParts[1]
                    d["wigosIssueNumber"] = wigosParts[2]
                    d["wigosLocalIdentifierCharacter"] = wigosParts[3]
                    # last five characters = blockNumber + stationNumber
                    d["oldID"] = wigosParts[3][-5:]
                    fwigosStations.append(d)
    df = pd.DataFrame(fwigosStations)
    df = df[["longitude", "latitude", "name", "wigosStationIdentifiers",
             "wigosIdentifierSeries", "wigosIssuerOfIdentifier", "wigosIssueNumber",
             "wigosLocalIdentifierCharacter", "oldID"]]
    return df


def get_ident(bid):
    '''
    gets the ident of the message by combining blockNumber and stationNumber keys from the input BUFR file
    the ident may be single valued or multivalued ( only single valued are considered further)
    '''
    ident = None
    if codes_is_defined(bid, "blockNumber") and codes_is_defined(bid, "stationNumber"):
        blockNumber = codes_get_array(bid, "blockNumber")
        stationNumber = codes_get_array(bid, "stationNumber")
        if len(blockNumber) == 1 and len(stationNumber) == 1:
            ident = "{0:02d}{1:03d}".format(int(blockNumber[0]), int(stationNumber[0]))
        elif len(blockNumber) == 1 and len(stationNumber) != 1:
            blockNumber = np.repeat(blockNumber, len(stationNumber))
            ident = ["{0:02d}{1:03d}".format(int(b), int(s)) for b, s in zip(blockNumber, stationNumber)
                     if b != CODES_MISSING_LONG and s != CODES_MISSING_LONG]
        elif len(blockNumber) != 1 and len(stationNumber) != 1:
            ident = ["{0:02d}{1:03d}".format(int(b), int(s)) for b, s in zip(blockNumber, stationNumber)
                     if b != CODES_MISSING_LONG and s != CODES_MISSING_LONG]
        # here only the first element of the list is returned to the main program
        # this avoids lists being used in the dataframe query and breaking the logic
        if isinstance(ident, list):
            ident = ident[0] if ident else None
    return ident


def add_wigos_info(ident, bid, odf, obid):
    '''
    add the wigos information to the message ident pointed by bid
    the odf contains the WIGOS information for ident
    obid is the output handle
    '''
    if codes_is_defined(bid, "shortDelayedDescriptorReplicationFactor"):
        shortDelayed = codes_get_array(bid, "shortDelayedDescriptorReplicationFactor")
    else:
        shortDelayed = None
    if codes_is_defined(bid, "delayedDescriptorReplicationFactor"):
        delayedDesc = codes_get_array(bid, "delayedDescriptorReplicationFactor")
    else:
        delayedDesc = None
    if codes_is_defined(bid, "extendedDelayedDescriptorReplicationFactor"):
        extDelayedDesc = codes_get_array(bid, "extendedDelayedDescriptorReplicationFactor")
    else:
        extDelayedDesc = None
    nsubsets = codes_get(bid, "numberOfSubsets")
    compressed = codes_get(bid, "compressedData")
    masterTablesVersionNumber = codes_get(bid, "masterTablesVersionNumber")
    if masterTablesVersionNumber < 28:
        masterTablesVersionNumber = 28
    unexpandedDescriptors = codes_get_array(bid, "unexpandedDescriptors")
    outUD = list(unexpandedDescriptors)
    outUD.insert(0, 301150)
    # only treat the uncompressed messages with 1 subset
    # for future add treatment of compressed messages with more than 1 subset
    if compressed == 0 and nsubsets == 1:
        # IMPORTANT, takes into account delayed replications ( all possible cases)
        # to accommodate SYNOP + TEMP messages
        if shortDelayed is not None:
            codes_set_array(obid, "inputShortDelayedDescriptorReplicationFactor", shortDelayed)
        if delayedDesc is not None:
            codes_set_array(obid, "inputDelayedDescriptorReplicationFactor", delayedDesc)
        if extDelayedDesc is not None:
            codes_set_array(obid, "inputExtendedDelayedDescriptorReplicationFactor", extDelayedDesc)
        codes_set(obid, "masterTablesVersionNumber", masterTablesVersionNumber)
        codes_set(obid, "numberOfSubsets", nsubsets)
        codes_set_array(obid, "unexpandedDescriptors", outUD)
        # if several rows match the ident, the first one is used
        wis = odf["wigosIdentifierSeries"].values[0]
        codes_set(obid, "wigosIdentifierSeries", int(wis))
        wid = odf["wigosIssuerOfIdentifier"].values[0]
        codes_set(obid, "wigosIssuerOfIdentifier", int(wid))
        win = odf["wigosIssueNumber"].values[0]
        codes_set(obid, "wigosIssueNumber", int(win))
        # the local identifier is a character string, not a number
        wlid = odf["wigosLocalIdentifierCharacter"].values
        wlid = "{0:5}".format(wlid[0])
        logging.info(" wlid here {0}".format(wlid))
        codes_set(obid, "wigosLocalIdentifierCharacter", str(wlid))
        codes_bufr_copy_data(bid, obid)
    else:
        logging.info(" skipping compressed message id {0} with {1} subsets ".format(ident, nsubsets))
    return


def main():
    print("ecCodes version {0}".format(codes_get_api_version()))
    args = read_cmd_line()
    logfile = args.logfile
    logging.basicConfig(filename=logfile, level=logging.INFO, filemode="w")
    infile = args.input
    outfile = args.output
    mode = args.mode
    cdirectory = os.getcwd()
    oscarFile = os.path.join(cdirectory, "oscar.json")
    if mode == "web":
        # download the OSCAR information and cache it in oscar.json
        jtext = read_oscar_web()
        with open(oscarFile, "w") as f:
            json.dump(jtext, f)
    else:
        # reuse the cached oscar.json
        jtext = read_oscar_json(oscarFile)
    wigosDf = parse_json_into_dataframe(jtext)
    f = open(infile, "rb")
    nmsg = codes_count_in_file(f)
    fout = open(outfile, "wb")
    for i in range(0, nmsg):
        bid = codes_bufr_new_from_file(f)
        obid = codes_clone(bid)
        codes_set(bid, 'skipExtraKeyAttributes', 1)
        codes_set(bid, "unpack", 1)
        ident = get_ident(bid)
        if ident:
            logging.info(" \t message {0} ident {1} ".format(i + 1, ident))
            odf = wigosDf.query("oldID=='{0}'".format(ident))
            if not odf.empty:
                add_wigos_info(ident, bid, odf, obid)
                codes_write(obid, fout)
            else:
                logging.info(" wigos information is empty for ident {0}".format(ident))
        else:
            logging.info("message {0} rejected ".format(i + 1))
        codes_release(obid)
        codes_release(bid)
    f.close()
    fout.close()
    print(" finished")


if __name__ == '__main__':
    main()
```
The program can be called with the following arguments:
-i input BUFR file containing SYNOP messages without WIGOS ids
-o output BUFR file that will contain the SYNOP messages with WIGOS ids
-m mode: 'web' makes the program connect to the OSCAR server, 'json' makes it use a JSON file containing the same information as the OSCAR server. The 'json' mode was added to speed up development by avoiding reloading the OSCAR data from the web.
-l log file to which the progress of the conversion is written
The program flow is the following:
1) Read the command line arguments.
2) Read the OSCAR information from the web or from a JSON file. The two functions read_oscar_web and read_oscar_json return a JSON list of dictionaries,
which is filtered to retain only the surface observations with issuer of identifier 20000. A pandas DataFrame (wigosDf) is then used to store this information and to make the mapping easy to query.
3) Open the input BUFR file and read each individual message.
4) For each message, create the message identifier (concatenation of blockNumber and stationNumber). If the ident matches an entry in wigosDf, the function add_wigos_info adds the WIGOS information to the message,
provided the message is uncompressed (compressedData=0) and has a single subset (numberOfSubsets=1). To do so, add_wigos_info sets the delayed descriptor replication factors in the output message when they are present in the input, adds the WIGOS information retrieved from wigosDf, and copies the rest of the data to the output message.
5) If the get_ident function finds several idents in a message, it returns only the first one.
During program execution a log file is generated containing information about the processing.
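The selection logic in steps 4 and 5 can be sketched with plain Python values standing in for BUFR messages. This is only an illustration under stated assumptions: no ecCodes calls are made, the messages are made-up dictionaries, a plain dictionary replaces the pandas DataFrame, and `first_ident` and `select_wigos_id` are hypothetical names.

```python
def first_ident(block_numbers, station_numbers):
    """Build idents from block/station numbers and return the first, or None."""
    if len(block_numbers) == 1:
        # A single block number is shared by all stations in the message.
        block_numbers = block_numbers * len(station_numbers)
    idents = ["{0:02d}{1:03d}".format(b, s)
              for b, s in zip(block_numbers, station_numbers)]
    return idents[0] if idents else None


def select_wigos_id(message, wigos_by_old_id):
    """Return the WIGOS id to add to the message, or None if none is added."""
    ident = first_ident(message["blockNumber"], message["stationNumber"])
    if ident is None or ident not in wigos_by_old_id:
        return None  # no ident, or no WIGOS id known for this station
    if message["compressedData"] != 0 or message["numberOfSubsets"] != 1:
        return None  # compressed or multi-subset messages are not treated
    return wigos_by_old_id[ident]


# Made-up mapping and messages, for illustration only.
wigos_by_old_id = {"03772": "0-20000-0-03772"}
messages = [
    {"blockNumber": [3], "stationNumber": [772],
     "compressedData": 0, "numberOfSubsets": 1},
    {"blockNumber": [3], "stationNumber": [772, 774],
     "compressedData": 1, "numberOfSubsets": 2},
    {"blockNumber": [10], "stationNumber": [1],
     "compressedData": 0, "numberOfSubsets": 1},
]

results = [select_wigos_id(m, wigos_by_old_id) for m in messages]
print(results)
```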
At this point some caveats are needed:
- Only uncompressed (compressedData=0), single-subset (numberOfSubsets=1) messages are considered.
- The OSCAR information retrieved from the web server has to be cleaned before the program can use it. This is the job of the function parse_json_into_dataframe, which uses a regular expression to filter the WIGOS ids.
- When setting the WIGOS information it is important to preserve the data types; for example, "wigosLocalIdentifierCharacter" is a character string.
- The masterTablesVersionNumber must be at least 28, otherwise no WIGOS ids can be added. The add_wigos_info function takes care of this by raising the table version number to 28 for each processed message where it is lower.
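The last two caveats can be illustrated with a small sketch that prepares the values before they would be set on a message. No ecCodes call is made here: `build_wigos_keys` is a hypothetical helper, the input values are made up, and 307080 merely stands in for whatever descriptors the original message carries.

```python
WIGOS_SEQUENCE = 301150         # WIGOS identifier sequence added by the program
MIN_MASTER_TABLES_VERSION = 28  # lowest table version that allows WIGOS ids


def build_wigos_keys(wigos_id, master_tables_version, unexpanded_descriptors):
    """Return the key/value pairs to set, each with the expected data type."""
    series, issuer, issue_number, local_identifier = wigos_id.split("-", 3)
    return {
        # raise the table version to 28 when the input message is older
        "masterTablesVersionNumber": max(master_tables_version,
                                         MIN_MASTER_TABLES_VERSION),
        # the WIGOS sequence goes in front of the original descriptors
        "unexpandedDescriptors": [WIGOS_SEQUENCE] + list(unexpanded_descriptors),
        # the first three parts are integers
        "wigosIdentifierSeries": int(series),
        "wigosIssuerOfIdentifier": int(issuer),
        "wigosIssueNumber": int(issue_number),
        # the local identifier stays a character string
        "wigosLocalIdentifierCharacter": "{0:5}".format(local_identifier),
    }


# Made-up input values, for illustration only.
keys = build_wigos_keys("0-20000-0-03772", 26, [307080])
for name, value in keys.items():
    print(name, repr(value))
```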
Results
The output file contains 19543 SYNOP messages obtained by running the program on an input BUFR file containing raw SYNOP data received through the GTS.
A second output file contains 7 TEMP messages obtained by running the program on a BUFR file containing raw TEMP messages.
...