...
The outline of this page is:
1) Problem description
2) Program flow
3) Test data file and caveats
The predefined data set covers the dates 2019-10-15 to 2019-10-17.
1) Description
The WIGOS identifier consists of four parts, for example 0-2XXXX-0-YYYYY,
...
old stations and their WIGOS ids.
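The split of a WIGOS identifier into the four parts used by the program can be sketched in plain Python. The identifier 0-20000-0-06610 is only an illustrative example, and `split_wigos_id` is a hypothetical helper. The way it derives the old 5-digit station id (the last five characters of the local identifier) follows `parse_json_into_dataframe` in the script below.

```python
def split_wigos_id(wigos_id):
    """Split a WIGOS station identifier into its four parts (hypothetical helper).

    The oldID (block number + station number) is taken as the last five
    characters of the local identifier, as parse_json_into_dataframe does.
    """
    series, issuer, issue_number, local_id = wigos_id.split("-")
    return {
        "wigosIdentifierSeries": int(series),
        "wigosIssuerOfIdentifier": int(issuer),
        "wigosIssueNumber": int(issue_number),
        # kept as a string: it is encoded as a character element in BUFR
        "wigosLocalIdentifierCharacter": local_id,
        "oldID": local_id[-5:],
    }


print(split_wigos_id("0-20000-0-06610"))
```

The three numeric parts are converted to integers here because the program sets them as integer keys. The local identifier stays a string.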
2) Program description
```python
'''
Created on 22 Oct 2019

# Copyright 2005-2018 ECMWF.
#
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
#
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.

This is a test program to encode WIGOS SYNOP messages.
Requires
  1) ecCodes version 2.14.81 or above (available at https://confluence.ecmwf.int/display/ECC/Releases)
  2) python3.6.8-01

To run the program:
  ./addWigosProg.py -i <input BUFR file> -m <mode [web|json]> -l <logFile> -o <output BUFR file>

Uses the BUFR version 4 template and adds the WIGOS identifier sequence 301150.
REQUIRES masterTablesVersionNumber 28 or above.

Author : Roberto Ribas Garcia ECMWF 28/10/2019

Modifications
  04/11/2019 performance improvement (uses skipExtraKeyAttributes and codes_clone)
  05/11/2019 changes for SYNOP and TEMP messages
  05/11/2019 fixed codes_clone issue
'''
import argparse
import json
import logging
import os
import re

import numpy as np
import pandas as pd
import requests
from eccodes import *


def read_cmd_line():
    p = argparse.ArgumentParser()
    p.add_argument("-i", "--input", help="input bufr file")
    p.add_argument("-o", "--output", help="output bufr file with wigos")
    p.add_argument("-m", "--mode", choices=["web", "json"], help="wigos source [json file or web]")
    p.add_argument("-l", "--logfile", help="log file")
    return p.parse_args()


def read_oscar_json(jsonFile):
    with open(jsonFile, "r") as f:
        return json.load(f)


def read_oscar_web(oscarURL="https://oscar.wmo.int/surface/rest/api/search/station?"):
    r = requests.get(oscarURL)
    r.raise_for_status()  # fail early on an HTTP error instead of parsing an error page
    return r.json()


def parse_json_into_dataframe(jtext):
    '''
    parses the OSCAR JSON (a list of station dictionaries),
    keeps the stations that have the wigosStationIdentifiers key and,
    among their primary ids, only the surface ones 0-20XXX-0-YYYYY
    '''
    p = re.compile(r"0-20\d{3}-0-\d{5}")
    fwigosStations = []
    for d in jtext:
        if "wigosStationIdentifiers" not in d:
            continue
        for e in d["wigosStationIdentifiers"]:
            if e.get("primary"):
                wigosId = e["wigosStationIdentifier"]
                if p.match(wigosId):
                    wigosParts = wigosId.split("-")
                    d["wigosIdentifierSeries"] = wigosParts[0]
                    d["wigosIssuerOfIdentifier"] = wigosParts[1]
                    d["wigosIssueNumber"] = wigosParts[2]
                    d["wigosLocalIdentifierCharacter"] = wigosParts[3]
                    # the old WMO id (block + station number) is the last 5 characters
                    d["oldID"] = wigosParts[3][-5:]
                    fwigosStations.append(d)
    df = pd.DataFrame(fwigosStations)
    df = df[["longitude", "latitude", "name", "wigosStationIdentifiers",
             "wigosIdentifierSeries", "wigosIssuerOfIdentifier", "wigosIssueNumber",
             "wigosLocalIdentifierCharacter", "oldID"]]
    return df


def get_ident(bid):
    '''
    gets the ident of the message by combining the blockNumber and stationNumber
    keys of the input BUFR message
    the ident may be single valued or multivalued (only single valued are considered further)
    '''
    ident = None
    if codes_is_defined(bid, "blockNumber") and codes_is_defined(bid, "stationNumber"):
        blockNumber = codes_get_array(bid, "blockNumber")
        stationNumber = codes_get_array(bid, "stationNumber")
        if len(blockNumber) == 1 and len(stationNumber) == 1:
            ident = "{0:02d}{1:03d}".format(int(blockNumber[0]), int(stationNumber[0]))
        elif len(stationNumber) != 1:
            if len(blockNumber) == 1:
                # one block number shared by all the stations
                blockNumber = np.repeat(blockNumber, len(stationNumber))
            ident = ["{0:02d}{1:03d}".format(b, s)
                     for b, s in zip(blockNumber, stationNumber)
                     if b != CODES_MISSING_LONG and s != CODES_MISSING_LONG]
    # only the first element of the list is returned to the main program;
    # this avoids lists being used in the dataframe query and breaking the logic
    if isinstance(ident, list):
        ident = ident[0] if ident else None
    return ident


def add_wigos_info(ident, bid, odf, obid):
    '''
    adds the WIGOS information to the message ident pointed by bid
    odf contains the WIGOS information for ident
    obid is the output handle
    '''
    shortDelayed = delayedDesc = extDelayedDesc = None
    if codes_is_defined(bid, "shortDelayedDescriptorReplicationFactor"):
        shortDelayed = codes_get_array(bid, "shortDelayedDescriptorReplicationFactor")
    if codes_is_defined(bid, "delayedDescriptorReplicationFactor"):
        delayedDesc = codes_get_array(bid, "delayedDescriptorReplicationFactor")
    if codes_is_defined(bid, "extendedDelayedDescriptorReplicationFactor"):
        extDelayedDesc = codes_get_array(bid, "extendedDelayedDescriptorReplicationFactor")
    nsubsets = codes_get(bid, "numberOfSubsets")
    compressed = codes_get(bid, "compressedData")
    # the WIGOS sequence 301150 needs masterTablesVersionNumber 28 or above
    masterTablesVersionNumber = max(codes_get(bid, "masterTablesVersionNumber"), 28)
    # prepend 301150 to the descriptors of the input message
    outUD = list(codes_get_array(bid, "unexpandedDescriptors"))
    outUD.insert(0, 301150)

    # only uncompressed messages with 1 subset are treated;
    # compressed messages with more than 1 subset are left for the future
    if compressed == 0 and nsubsets == 1:
        # IMPORTANT: set every kind of delayed replication (all possible cases)
        # to accommodate SYNOP + TEMP messages
        if shortDelayed is not None:
            codes_set_array(obid, "inputShortDelayedDescriptorReplicationFactor", shortDelayed)
        if delayedDesc is not None:
            codes_set_array(obid, "inputDelayedDescriptorReplicationFactor", delayedDesc)
        if extDelayedDesc is not None:
            codes_set_array(obid, "inputExtendedDelayedDescriptorReplicationFactor", extDelayedDesc)
        codes_set(obid, "masterTablesVersionNumber", masterTablesVersionNumber)
        codes_set(obid, "numberOfSubsets", nsubsets)
        codes_set_array(obid, "unexpandedDescriptors", outUD)
        # if several stations share the ident, the first one is used;
        # the first three parts are integers, the local identifier is a string
        row = odf.iloc[0]
        codes_set(obid, "wigosIdentifierSeries", int(row["wigosIdentifierSeries"]))
        codes_set(obid, "wigosIssuerOfIdentifier", int(row["wigosIssuerOfIdentifier"]))
        codes_set(obid, "wigosIssueNumber", int(row["wigosIssueNumber"]))
        wlid = "{0:5}".format(row["wigosLocalIdentifierCharacter"])
        logging.info(" wlid here {0}".format(wlid))
        codes_set(obid, "wigosLocalIdentifierCharacter", str(wlid))
        # copy the data values of the input message into the new template
        codes_bufr_copy_data(bid, obid)
    else:
        # obid is left untouched, so main() writes an unchanged copy of the input
        logging.info(" skipping compressed message id {0} with {1} subsets ".format(ident, nsubsets))
    return obid


def main():
    print("ecCodes version {0}".format(codes_get_api_version()))
    args = read_cmd_line()
    logging.basicConfig(filename=args.logfile, level=logging.INFO, filemode="w")

    # the OSCAR station list is kept in ./oscar.json:
    # "web" downloads and saves it, "json" reads the saved copy
    oscarFile = os.path.join(os.getcwd(), "oscar.json")
    if args.mode == "web":
        jtext = read_oscar_web()
        with open(oscarFile, "w") as f:
            json.dump(jtext, f)
    else:
        jtext = read_oscar_json(oscarFile)
    wigosDf = parse_json_into_dataframe(jtext)

    with open(args.input, "rb") as f, open(args.output, "wb") as fout:
        nmsg = codes_count_in_file(f)
        for i in range(nmsg):
            bid = codes_bufr_new_from_file(f)
            # the output message starts as a full copy of the input (header included)
            obid = codes_clone(bid)
            # skipping the extra key attributes speeds up unpacking
            codes_set(bid, "skipExtraKeyAttributes", 1)
            codes_set(bid, "unpack", 1)
            ident = get_ident(bid)
            if ident:
                logging.info(" \t message {0} ident {1} ".format(i + 1, ident))
                odf = wigosDf.query("oldID=='{0}'".format(ident))
                if not odf.empty:
                    add_wigos_info(ident, bid, odf, obid)
                    codes_write(obid, fout)
                else:
                    logging.info(" wigos {0} is empty for ident {1}".format(
                        ident, odf["wigosLocalIdentifierCharacter"].values))
            else:
                logging.info("message {0} rejected ".format(i + 1))
            codes_release(obid)
            codes_release(bid)
    print(" finished")


if __name__ == '__main__':
    main()
```
The program can be called with the following arguments:
...
that are uncompressed (compressed=0) and contain a single subset (numberOfSubsets=1), provided that their ident matches one of the oldID values in wigosDf.
5) If the get_ident function finds several idents in a message, it returns only the first one.
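The ident logic of get_ident can be tried without ecCodes on plain lists of block and station numbers. This is a sketch only. `build_ident` is a hypothetical helper. `MISSING_LONG` is assumed to equal ecCodes' `CODES_MISSING_LONG` (2147483647); the real script imports that constant from the eccodes module.

```python
# Assumed stand-in for eccodes.CODES_MISSING_LONG so the sketch runs without ecCodes.
MISSING_LONG = 2147483647


def build_ident(block_numbers, station_numbers, missing=MISSING_LONG):
    """Return the first 5-digit ident built from block/station numbers, or None."""
    block_numbers = list(block_numbers)
    station_numbers = list(station_numbers)
    if len(block_numbers) == 1 and len(station_numbers) == 1:
        # single-valued case: get_ident does no missing-value check here
        return "{0:02d}{1:03d}".format(block_numbers[0], station_numbers[0])
    if len(station_numbers) == 1:
        return None  # many blocks but one station: get_ident leaves ident unset
    if len(block_numbers) == 1:
        # one block number shared by all stations (np.repeat in get_ident)
        block_numbers = block_numbers * len(station_numbers)
    idents = [
        "{0:02d}{1:03d}".format(b, s)
        for b, s in zip(block_numbers, station_numbers)
        if b != missing and s != missing
    ]
    # only the first ident is kept; an empty list gives None
    return idents[0] if idents else None


print(build_ident([6], [610]))                      # 06610
print(build_ident([6], [MISSING_LONG, 700]))        # 06700
print(build_ident([7, 7], [MISSING_LONG] * 2))      # None
```

Returning a single string rather than a list is what keeps the `oldID=='...'` query in main() well formed.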
During execution, the program writes a log file (given with -l) containing information about the processing of each message.
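To get a quick overview of a run, the log can be summarised by counting the lines written for each outcome. This sketch is not part of the program. `summarize_log` and the outcome labels are hypothetical. The patterns assume the message texts logged by main() and add_wigos_info. Each line also carries the `INFO:root:` prefix of `logging.basicConfig`'s default format, which `re.search` simply ignores.

```python
import re
from collections import Counter

# Patterns matching the messages logged by main() and add_wigos_info.
OUTCOMES = {
    "with ident": re.compile(r"message \d+ ident \S+"),
    "rejected (no ident)": re.compile(r"message \d+ rejected"),
    "skipped (compressed or multi-subset)": re.compile(r"skipping compressed message"),
    "no WIGOS id found": re.compile(r"is empty for ident"),
}


def summarize_log(lines):
    """Count how many log lines match each outcome."""
    counts = Counter()
    for line in lines:
        for label, pattern in OUTCOMES.items():
            if pattern.search(line):
                counts[label] += 1
    return counts


# Illustrative lines in the default "LEVEL:logger:message" format.
sample = [
    "INFO:root: \t message 1 ident 06610 ",
    "INFO:root:message 2 rejected ",
    "INFO:root: skipping compressed message id 06700 with 12 subsets ",
    "INFO:root: wigos 01001 is empty for ident []",
]
print(summarize_log(sample))
```

For a real run, pass the open log file instead of `sample`, e.g. `with open(logfile) as log: summarize_log(log)`.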
...
- Only uncompressed messages (compressed=0) with a single subset (numberOfSubsets=1) are considered.
- The OSCAR information retrieved from the web server has to be cleaned before this program can use it. This is the purpose of the function parse_json_into_dataframe, which uses a regular expression to keep only the primary surface WIGOS identifiers (0-20XXX-0-YYYYY).
- When setting the WIGOS information, it is important to preserve the data types. For example, "wigosLocalIdentifierCharacter" is a character string, while the other three parts are integers.
- The masterTablesVersionNumber must be 28 or above, otherwise no WIGOS identifiers can be added. The add_wigos_info function takes care of this by raising masterTablesVersionNumber to 28 for every processed message that uses a lower version.
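The regular-expression filter and the table-version rule from the caveats above can be checked in isolation. This is a minimal sketch. The helper names and the station record are hypothetical. The record only mimics the keys the program reads from the OSCAR JSON (`wigosStationIdentifiers`, `wigosStationIdentifier`, `primary`). The minimum version 28 is taken from add_wigos_info.

```python
import re

# Same pattern as in parse_json_into_dataframe: primary surface WIGOS ids.
# re.match() anchors only at the start of the string, as in the program.
SURFACE_WIGOS = re.compile(r"0-20\d{3}-0-\d{5}")
# Minimum masterTablesVersionNumber enforced by add_wigos_info.
MIN_MASTER_TABLES_VERSION = 28


def primary_surface_ids(station):
    """Return the primary surface WIGOS ids of one OSCAR-like station record."""
    return [
        e["wigosStationIdentifier"]
        for e in station.get("wigosStationIdentifiers", [])
        if e.get("primary") and SURFACE_WIGOS.match(e["wigosStationIdentifier"])
    ]


def output_master_tables_version(input_version):
    """Raise the input table version to the minimum used for 301150."""
    return max(input_version, MIN_MASTER_TABLES_VERSION)


# Hypothetical record, for illustration only.
record = {
    "name": "EXAMPLE",
    "wigosStationIdentifiers": [
        {"wigosStationIdentifier": "0-20000-0-06610", "primary": True},
        {"wigosStationIdentifier": "0-20000-0-99999", "primary": False},
    ],
}
print(primary_surface_ids(record))          # ['0-20000-0-06610']
print(output_master_tables_version(26))     # 28
print(output_master_tables_version(31))     # 31
```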
Results
The output file contains 19543 SYNOP messages, obtained by running the program on an input BUFR file containing raw SYNOP data received through the GTS.
This file contains 7 TEMP messages, obtained by running the program on a BUFR file containing raw TEMP messages.