Because Python needs no include mechanism to merge code, a single Python script can be used unchanged both on the command line and as an ecFlow task wrapper; the same script can also load the task into the expected server/suite, for operation or for test.
meter sweeper Expand source
#!/usr/bin/env python
""" python docstring: demonstrate python cli + ecFlow task wrapper + task loader
$manual
DESCRIPTION:
  sweeper: find out min/max among modeleps_nemo tasks

OPERATORS: please, set complete if problematic

ANALYST: example as pure python task - job
$end
$comment
  comments can be added ...
$end
"""
# NOTE: this file is also pre-processed by ecFlow as a task wrapper with the
# micro character taken from MICRO below: keep that character balanced on
# every line, comments included.
import getopt
import os.path
import signal
import sys
import time

PATH = "/usr/local/apps/ecflow/4.0.9/lib/python2.7/site-packages/ecflow"
sys.path.insert(0, PATH)
try:
    import ecflow as ec
except ImportError:
    # help, option parsing and the loader do not use this binding; the code
    # that does calls _require_ecflow() first and fails there, clearly
    ec = None

try:
    _STRING_TYPES = (basestring,)  # python 2: str and unicode
except NameError:
    _STRING_TYPES = (str,)

MICRO = "$$"  # double dollar to please ecFlow micro character balance
TARGET_TASK = "modeleps_nemo"  # use option -t to overwrite
TARGET_METER = "step"  # would this need an option???
MC_STEPS_LIST = ("/mc/main/12/legA/fc",  # use option -p for single path
                 "/mc/main/12/legB/fc",
                 "/mc/main/18bc/legA/fc",
                 "/mc/main/00/legA/fc",
                 "/mc/main/00/legB/fc",
                 "/mc/main/06bc/legA/fc",
                 "/mofc/thu/01/legC/fc",
                 "/mofc/thu/hind/14/back",
                 "/mofc/mon/01/legC/fc",
                 "/mofc/mon/hind/14/back",)

# node names process() dives into; please update for external use...
FOCUS_NAMES = ("00", "12", "18bc", "06bc",
               "legA", "legB", "pf", "cf", "fc", "main")
# directory appended to sys.path before importing the site module "ecf"
ECF_DEF_DIR = "/home/ma/emos/o/def"


def _require_ecflow():
    """ raise ImportError when the ecflow python binding could not be loaded """
    if ec is None:
        raise ImportError("ecflow python module not found, looked in " + PATH)


def _import_ecf():
    """ import and return the site module "ecf" (found below ECF_DEF_DIR) """
    if ECF_DEF_DIR not in sys.path:  # do not pile up duplicates on each call
        sys.path.append(ECF_DEF_DIR)
    import ecf
    return ecf


def focus_below(node):
    """ continue down below nodes which name is expected ;
    please update for external use...

    node: any object with a name() method
    return: True when the node name is one of FOCUS_NAMES
    """
    return node.name() in FOCUS_NAMES


def process(item, path=None, low=999, high=-1, task=None):
    """ track nodes to follow and update min/max

    item: definition file name, ec.Client, ec.Defs, ec.Family or ec.Task
    path: absolute node path, its first element selects the suite to scan
          when item resolves to a definition; None scans every suite
    low, high: min/max of the TARGET_METER values met so far
    task: only tasks whose name contains this string are considered,
          None stands for TARGET_TASK
    return: the updated (low, high)
    """
    _require_ecflow()
    if task is None:  # "None in name" would raise TypeError
        task = TARGET_TASK

    if isinstance(item, _STRING_TYPES):
        defs = ec.Defs(item)
    elif isinstance(item, ec.Client):
        item.sync_local()
        defs = item.get_defs()
    elif isinstance(item, ec.Defs):
        defs = item
    else:
        defs = None

    if defs:
        suite_name = str(path.split('/')[1]) if path else None
        for suite in defs.suites:
            if suite_name is not None and suite.name() != suite_name:
                continue
            status = "%s" % suite.get_state()
            if suite.is_suspended() or status == "unknown":
                continue
            for child in suite.nodes:
                low, high = process(child, path, low, high, task)

    elif isinstance(item, ec.Family):
        for node in item.nodes:
            if focus_below(node):
                # NOTE(review): the first expected child ends the scan of
                # this family, its later siblings are not visited - kept
                # as found, confirm this is intended
                return process(node, path, low, high, task)
            low, high = process(node, path, low, high, task)

    elif isinstance(item, ec.Task):
        path2node = item.get_abs_node_path()
        # ignore cal-val # ecmwf specific
        if "/cv/" in path2node or task not in item.name():
            return low, high
        try:
            for att in item.meters:
                if att.name() != TARGET_METER:
                    continue
                value = att.value()
                # two independent updates: a single value is both the
                # minimum and the maximum
                high = max(high, value)
                low = min(low, value)
        except Exception as excpt:  # best effort: report and carry on
            print("#! problem with line: %s" % excpt)

    else:
        print(type(item))
    return low, high


def create_task():
    """ add ecFlow task into a family...

    return: the "sweeper" task with its variables, label and min/max meters
    """
    ecf = _import_ecf()
    steps = 360
    return ecf.Task("sweeper").add(
        ecf.Variables(
            ECF_FILES=os.getenv("ECF_FDI", None),
            ECF_EXTN=".py",
            ECF_JOB_CMD="ssh -x $WSHOST$ $ECF_JOB$ > $ECF_JOBOUT$ 2>&1 &",
            ECF_MICRO=MICRO[0], ),
        ecf.Label("info", ""),
        ecf.Defcomplete(),
        ecf.Meter("min", -1, steps, steps),
        ecf.Meter("max", -1, steps, steps),)


def loader(test=1, host=os.getenv("ECF_TEST_HOST", None)):
    """ replace sweeper task in test/oper mode

    test: true value targets ECF_TEST_HOST/ECF_TEST_PORT, false value
          targets ECF_OPER_HOST/ECF_OPER_PORT
    host: value given to the WSHOST variable of the task
    """
    ecf = _import_ecf()
    if test:
        env = {"host": os.getenv("ECF_TEST_HOST", None),
               "port": os.getenv("ECF_TEST_PORT", None),
               "path": "/admine/steps", }
    else:
        env = {"host": os.getenv("ECF_OPER_HOST", None),
               "port": os.getenv("ECF_OPER_PORT", None),
               "path": "/admin/steps", }

    defs = ecf.Defs()
    path = env["path"]
    sname = str(path.split('/')[1])
    fname = str(path.split('/')[2])
    suite = ecf.Suite(sname).add(
        ecf.Extern(MC_STEPS_LIST, defs),
        ecf.Family(fname).add(create_task().add(
            ecf.Cron("04:30 23:59 03:00"),
            ecf.Trigger("==active or ".join(MC_STEPS_LIST) + "==active"),
            ecf.Variables(WSHOST=host, QUEUE="test"),
        )))
    defs.add_suite(suite)
    print("#MSG: replacing %s %s %s" % (path, env["host"], env["port"]))
    ecf.Client(env["host"], env["port"]).replace(path, defs, 1, 1)


class Sweep(object):
    """ find min/max meter value below a node """

    def __init__(self, host=None, port=None, path=None, task=None):
        """ host, port: server to poll, default from ECF_NODE / ECF_PORT
        path: one node path, or a sequence of them, default MC_STEPS_LIST
        task: task name to look for, default TARGET_TASK
        """
        _require_ecflow()
        if host is None:
            host = os.getenv("ECF_NODE", "$ECF_NODE$")
        if port is None:
            port = int(os.getenv("ECF_PORT", "$ECF_PORT$"))
        self.clt = ec.Client(host, port)  # used to read the server tree
        self.cl2 = None  # child client, only set when run as a job
        self.path = self.as_paths(path)
        self.task = TARGET_TASK if task is None else task

        # shall make sense when processed into job by ecflow
        # name remains when not processed...
        ecfv = {"ECF_NODE": "$ECF_NODE$",
                "ECF_PASS": "$ECF_PASS$",
                "ECF_NAME": "$ECF_NAME$",
                "ECF_PORT": "$ECF_PORT$",
                "ECF_TRYNO": "$ECF_TRYNO$", }
        for key, val in list(ecfv.items()):
            if key in val:  # not substituted: fall back on the environment
                ecfv[key] = os.getenv(key, None)

        if ecfv["ECF_NODE"] and ecfv["ECF_NAME"]:
            print("#MSG will communicated with server...")
            print("#kill: ssh %s kill -15 %d" % (ecfv["ECF_NODE"],
                                                 os.getpid()))
            self.cl2 = ec.Client()
            self.cl2.set_host_port(ecfv["ECF_NODE"], ecfv["ECF_PORT"])
            self.cl2.set_child_pid(os.getpid())
            self.cl2.set_child_path(ecfv["ECF_NAME"])
            self.cl2.set_child_password(ecfv["ECF_PASS"])
            self.cl2.set_child_try_no(int(ecfv["ECF_TRYNO"]))
            self.cl2.child_init()
            self.cl2.set_child_timeout(20)
            # NOTE(review): handlers assumed to be installed only in job
            # mode, as signal_handler needs the child client - confirm
            for sig in (signal.SIGINT, signal.SIGHUP, signal.SIGQUIT,
                        signal.SIGILL, signal.SIGTRAP, signal.SIGIOT,
                        signal.SIGBUS, signal.SIGFPE, signal.SIGUSR1,
                        signal.SIGUSR2, signal.SIGPIPE, signal.SIGTERM,
                        signal.SIGXCPU, signal.SIGPWR):
                signal.signal(sig, self.signal_handler)

    @staticmethod
    def as_paths(path):
        """ return the node paths to sweep as a tuple

        None gives MC_STEPS_LIST, a single string gives a one element
        tuple (a bare string would be swept character by character)
        """
        if path is None:
            return MC_STEPS_LIST
        if isinstance(path, _STRING_TYPES):
            return (path,)
        return tuple(path)

    def signal_handler(self, signum, frame):
        """ catch signal: tell the server the task aborted """
        print("Aborting: Signal handler called with signal  %s" % signum)
        if self.cl2:
            self.cl2.child_abort(
                "Signal handler called with signal " + str(signum))

    def report(self, msg, meter=None):
        """ communicate with ecFlow server

        msg: meter name when meter is given, "stop" to complete the task
             and exit, else the text for the "info" label
        meter: meter value; 0 is a valid value
        Nothing is sent when there is no child client (command line use).
        """
        if not self.cl2:
            return
        if meter is not None:  # "if meter" would turn value 0 into a label
            self.cl2.child_meter(msg, meter)
        elif msg == "stop":
            self.cl2.child_complete()
            sys.exit(0)
        else:
            self.cl2.child_label("info", msg)

    def update(self):
        """ refresh status tree asking ecFlow server

        Each path of self.path is polled until its node is complete, or
        is queued/aborted with min equal to max, or cannot be found;
        meanwhile the info label and the min/max meters are reported.
        """
        num = 20
        leg = "legA"
        for path in self.path:
            first = 1
            low = 0
            high = 0
            node = None
            sleep = 30
            self.clt.ch_register(False, [str(path.split('/')[1])])
            if "/back" in path:
                path += "/%02d/%s" % (num, leg)

            while 1:
                clt = self.clt
                status = ""
                try:
                    # NOTE(review): nesting below rebuilt from a page that
                    # lost its indentation - 30s after a refresh, 90s
                    # when the server did not change; confirm
                    if clt.news_local() or first:  # has the server changed
                        clt.sync_local()
                        first = 0
                        defs = clt.get_defs()
                        node = defs.find_abs_node(path)
                        if node:
                            status = "%s" % node.get_state()
                            low, high = process(node, path,
                                                999, -1, self.task)
                        sleep = 30
                    else:
                        sleep = 90

                    msg = "... %ds %s %s %d %d" % (sleep, path, status,
                                                   low, high)
                    print(msg)
                    if type(low) != type(high):
                        print("%s %s" % (type(low), type(high)))

                    if status == "complete":
                        # next "/back" path looks at the following leg
                        if leg == "legB":
                            num -= 1
                            leg = "legA"
                        else:
                            leg = "legB"
                        break
                    elif ((low == high and
                           status in ("queued", "aborted", "complete"))
                          or node is None):
                        if node:
                            msg = "# %s %s" % (node.get_abs_node_path(),
                                               status)
                            print(msg)
                            self.report(msg)
                        break
                    else:
                        self.report(msg)
                        self.report("min", low)
                        self.report("max", high)
                    sys.stdout.flush()
                    time.sleep(sleep)

                except RuntimeError as exp:
                    print(str(exp))
                    sys.stdout.flush()
                    # wait before retrying: do not hammer the server
                    time.sleep(sleep)


def usage():
    """ help """
    print("""client
  -o: operational node is to be replaced, default is test node,
      provide this option BEFORE -r
  -r: replace task node
  -p: path to look below,
      by default internal list MC_STEPS_LIST will be used
  -t: task name to look for, by default TARGET_TASK
  -e: sweep then exit, without sending completion to the server
  -h: this help

ECF_NODE=localhost ECF_PORT=31415 ./client.py --path /mc/main/18bc/legA/fc -e
""")


def parse_options(argv):
    """ turn command line arguments into (action, test, path, task)

    action: "help", "ens" or "replace" for the first of -h, -e, -r met,
            else "run"; options given after it are ignored, so -o, -p
            and -t must come first
    test: 1, or 0 once -o was met
    path, task: values of -p and -t, or None
    raise: getopt.GetoptError on unknown option or missing value
    """
    # long options taking a value need the trailing "="
    opts, _ = getopt.getopt(
        argv, "hep:rot:",
        ["help", "ens", "path=", "replace", "oper", "task="])
    test = 1
    path = None
    task = None
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            return ("help", test, path, task)
        if opt in ("-e", "--ens"):
            return ("ens", test, path, task)
        if opt in ("-r", "--replace"):
            return ("replace", test, path, task)
        if opt in ("-p", "--path"):
            path = arg
        elif opt in ("-o", "--oper"):
            test = 0
        elif opt in ("-t", "--task"):
            task = arg
    return ("run", test, path, task)


def main(argv=None):
    """ command line and job entry point """
    if argv is None:
        argv = sys.argv[1:]
    try:
        action, test, path, task = parse_options(argv)
    except getopt.GetoptError as err:
        print("# what? %s" % err)
        usage()
        sys.exit(2)

    if action == "help":
        usage()
        sys.exit(0)
    if action == "replace":
        loader(test)
        sys.exit(0)

    clt = Sweep(path=path, task=task)
    clt.update()
    if action == "ens":
        sys.exit(0)
    clt.report("stop")


if __name__ == '__main__':
    main()
Sweeper is used in operation to start product generation earlier.
This page contains macros or features from a plugin which requires a valid license.
You will need to contact your administrator.
