- Created by Daniel Varela Santoalla, last modified by Axel Bonet on Jul 02, 2015
You are viewing an old version of this page. View the current version.
Compare with Current View Page History
« Previous Version 8 Current »
Here is an example of a Python Suite Definition, hosting a task that might be helpful for simple SMS suite translation to ecFlow:
:: cd ~map/course/201303/ecflow; python course.py
#!/usr/bin/env python """ suite builder example for ecFlow course """ import sys, os sys.path.append('/home/ma/emos/def/ecflow') import ecf as ec from ecf import * import inc_emos as ie # provides Seed class + system related dependencies # cd ~map/course/201303/ecflow; python course.py # task wrappers underneath # consume: choices for a family matching the producer-consumer pattern # local family + remote family + BEWARE ECF_OUT + log-server example # barber: an example of a "dynamical suite", with a "family producer task" # perl + python: example that task wrapper may not be ksh/bash scripts import time from datetime import date import argparse today = str(date.today()).replace('-', '') ############################################################################ class GenericFamily(object): """ provide structure for derived classes""" def make(self, node): return node def main(self, node): return BaseException # return node def arch(self, node): return node ############################################################################ class NativePerl(GenericFamily): """ ksh is not the only language for task wrappers""" def __init__(self): self.name = "perl" def main(self, node): tsk = Task(self.name).add( Variables(ECF_MICRO= "^", ECF_JOB_CMD= "^ECF_JOB^ > ^ECF_JOBOUT^ 2>&1"), Meter("step", -1, 100), Event("1"), Event("2"), Label("info", "none"), ) node.add(tsk) ############################################################################ class NativePython(NativePerl): """ ksh is not the only language for task wrappers""" def __init__(self): super(NativePerl, self).__init__() self.name = "python" ############################################################################ def _kind(prod=1, cons=1): return Variables(CONSUME= cons, PRODUCE=prod) def _evt(): return (Event("p"), Event("c")) def _leaf(name="produce", init=0, stop=100, step=1): add = None if type(stop) == int: add = Meter("step", -1, int(stop)) return Task("%s" % name).add( _evt(), add, Variables(INIT= init, STOP= stop, STEP= step),) ############################################################################ class FallBack(GenericFamily): """ in some situation, user may want its family to continue, and repeat to increment, even when some tasks abort """ def main(self, node=None): return Family("daily").repeat(name="YMD", start=today, end=DATE_STOP).add( Task("action").add(Time("10:00")), Family("loop").add( Time("11:00"), Task("dummy").add( TriggerImpossible(), Complete("1==1"))), Task("fallback").add( Label("info", "force complete when action is aborted"), Time("10:55"), Trigger("action eq aborted"), Complete("action eq complete"))) class DailyInc(GenericFamily): """ anopther method to have daily repeat increment, with aborted tasks""" def main(self, node=None): return Family("daily_inc").repeat(name="YMD", start=today, end=DATE_STOP).add( Label("info", "requeue will reset repeat attribute!"), Complete("daily_inc/loop/dummy eq complete"), Task("action").add(Time("10:00")), Family("loop").add( Time("11:00"), Task("dummy").add(TriggerImpossible(), Complete("1==1")))) class Consume(GenericFamily): """ producer-consumer pattern can be implemented in many ways""" def __init__(self): self.init = 0 self.stop = 48 self.step = 3 def main(self, node): """ pass the parent node, so that absolute paths can be used with triggers""" path = node.fullname() top = node.family("consume").add( Variables(SLEEP= 10, PRODUCE= 1, # default: tasks will do both CONSUME= 1), Family("limits").add(Defcomplete(), Limit("consume", 7)), Task("leader").add( Label("info", "set event to get produce1 leader"), Event("1"), # set/cleared by user Defcomplete()), # task does both, ie serial ########################### _leaf("produce", self.init, self.stop, self.step).add( Label("info", "both produce and consume in a task")), # meter will report about producer progress ########### Family("produce0").add( Label("info", "only produce"), _kind(1, 0), _leaf("produce", self.init, self.stop, self.step)), # serialy produced, create a new task for each step ### Family("produce1").add( _kind(1, 0), Label("info", "repeat, one job per step"), _leaf("produce", init="%STEP%", stop="%STEP%", step=1).add( Meter("step", -1, 100)))\ .repeat(kind="integer", name="STEP", start=self.init, end =self.stop, step =self.step).add() ) top.defstatus("suspended") fam = Family("produce2").add( # parallel _kind(1, 0), Label("info", "limited, one task per step, step by 3"), Limit("prod", 5), InLimit("produce2:prod")) top.add(fam) for step in xrange(self.init, self.stop, self.step): fam.add(Family("%02d" % step).add( Variables(STEP= step), _leaf("produce", step, step, 1))) ###################### lead = path + "/consume/leader:1" prod = path + "/consume/produce" top.add( ### trigger may be inside a task _leaf("consume", self.init, self.stop, self.step).add( Label("info", "trigger may be inside a task"), _kind(0, 1), InLimit("limits:consume"), Variables(CALL_WAITER= 1, SLEEP= 3, # sleep less than producer TRIGGER_EXPRESSION= prod + ":step ge $step or " + prod + " eq complete",)), Family("consume1").add( Label("info", "explicit trigger, follow faster"), _kind(0, 1), Trigger("(consume1:STEP lt %s1:STEP and %s) or " % (prod, lead) + "(consume1:STEP lt %s0/produce:step and not %s) or " % (prod, lead) + # lt while both are repeat "(%s1 eq complete and %s0 eq complete)" % (prod, prod) ), InLimit("limits:consume"), _leaf("consume", "%STEP%", "%STEP%", 1), ).repeat(kind="integer", name="STEP", start=self.init, end=self.stop, step=self.step)) fam = Family("consume2").add( # parallel Label("info", "one task per step, step by three"), _kind(0, 1), Limit("consume", 5), InLimit("consume2:consume")) top.add(fam) for step in xrange(self.init, self.stop, self.step): fam.add(Family("%02d" % step).add( Variables(STEP = step), Trigger("(%02d:STEP le %s1:STEP and %s) or " % (step, prod, lead) + "(%02d:STEP le %s0/produce:step and not %s)" % (step, prod, lead)), _leaf("consume", init=step, stop=step, step=1))) ############################################################################ class Barber(GenericFamily): """ a 'barber shop' example with families created by a task """ def _passer_by(self): """ generator """ return Task("passby").add( Time("00:00 23:59 00:05"), Variables(ID=0), Label("info", ""), Label("rem", "this task alters its variable ID, " + "aliases won't work natively"), InLimit("limits:passby")) def _client(self, node, position): """ python version of the family created initialy attention: raw definition file is located in passby task wrapper""" path = node.fullname() + "/limits" fam = node.family("list").family("%s" % position).add( AutoCancel(1), Task("cut").inlimit(path + ":barbers"), Task("pay").add( Trigger("cut eq complete"), InLimit(path + ":barbers"), InLimit(path + ":cashiers")), Task("leave").add( Label("info", ""), Trigger(["cut", "pay"]))) fam.defstatus("complete") def _shop(self, node): fam = node.family("shop").defstatus("suspended").add( Variables(NB_CHAIRS= 4), Family("limits").add(Defcomplete(), Limit("passby", 1), Limit("barbers", 2), Limit("cashiers", 1)), self._passer_by(), ) self._client(fam, 1), def main(self, node): self._shop(node) ############################################################################ def user(): return os.getlogin() def locate_scripts(): pwd = os.getcwd() return Variables( ECF_HOME= "/tmp/%s/ecflow/" % user(), # pwd, ECF_FILES= pwd + "/scripts", ECF_INCLUDE= pwd + "/include", ) DATE_STOP = 20300115 class Course(ie.Seed): """ host families together """ def __init__(self): super(Course, self).__init__() self.name = "course" def suite(self): """ define limits (empty) """ node = Suite(user()) node.defstatus("suspended").add( Variables(USER= user()), locate_scripts()) # self.top(node) fp = open("/tmp/%s/" % user() + self.name + ".def", "w") print >> fp, node return node def top(self, node): barber_shop = Barber() perl = NativePerl() python = NativePython() consume = Consume() with node.family(self.name) as node: node.add(FallBack().main(), DailyInc().main()) barber_shop.main(node) perl.main(node) python.main(node) consume.main(node) return node ############################################################################### class Admin(Course): """ host newlog task + logsvr start/stop/check task -- server logs can be renewed with a ecflowview menu command also """ def __init__(self): self.name = "admin" def top(self, node): with node.family("admin") as node: node.add(self.main()).repeat(name="YMD", start=today, end=DATE_STOP) node.defstatus("suspended") return node def main(self): """ return self contained Family/Task, without absolute node references or with relative path triggers""" remote_submit = "rsh -l %USER% %HOST% %ECF_JOB% > %ECF_JOBOUT% 2>&1" logpath = "/home/ma/map/course/201303/ecflow" return ( Task("newlog").add( Label("info", "renew server log-file"), Time("08:00")), Task("logsvr").add( Defcomplete(), Variables(HOST= "pikachu", ECF_LOGPORT=9316, ECF_LOGPATH= logpath, ECF_LOGMAP= logpath + ":" + logpath, ECF_JOB_CMD= remote_submit), Label("info", "(re)start the logsvr on HOST"), Time("08:00")), Family("loop").add( Time("08:30"), Family("dummy").add(# TriggerImpossible(), Complete("1==1")))) ############################################################################### class EcEvents(Admin): """ connecting to third party software as event generator to update a suite variable, and enable daily family run """ def top(self, node): node = node.family("ecevents") node.add( Label("info", "use web... menu"), Defcomplete(), Variables( URL= "http://eccmd.ecmwf.int:8090/#Mainpanel", ECF_URL_CMD= "${BROWSER:=firefox} -remote 'openURL(%URL%")) self.main(node) return node def main(self, node): for mode in ["list", "register", "delete", "register_all", "delete_all"]: added = None if "list" in mode: added = Label("regs", "") fam = Family(mode).add( Variables(MODE= mode), Task("ecjobs").add(Label("info", "none"), added)) node.add(fam) if "_all" in mode: fam.add(Variables(EVENT= "_all_")) elif mode in ("register", "delete"): fam.add(Variables(EVENT= "an12h000"), Label("info", "update EVENT variable")) event = "an00h000" node.family("ref").defstatus("complete").add( Task(event).add(Variables(YMD= today), Label("YMD", today))) node.family("user").family(event).repeat( name="YMD", start=today, end=DATE_STOP).add( Label("info", "extern cannot be used anymore for " + "intra-suite reference triggers"), Variables(SLEEP= 1), Trigger(event + ":YMD le %s/ref/%s:YMD" % (node.fullname(), event)), _leaf("consume")) return node ############################################################################### class SerialTask(object): """ add trigger on the previous task """ def __init__(self): self.prev = None def add(self, name): fam = Family(name).add( Variables(MODE= name), Task("to_ecflow")) if self.prev != None: fam.add(Trigger("./%s eq complete" % self.prev)) self.prev = name return fam class Reload(Admin): """ a simple task to download SMS content and translate it to ecFlow """ def top(self, node): node = node.family("reload") node.add( Label("info", "from sms to ecFlow"), Defcomplete(), Variables( URL= "https://software.ecmwf.int/wiki/display/ECFLOW/Home", ECF_URL_CMD= "${BROWSER:=firefox} -remote 'openURL(%URL%"), self.main()) return node def main(self, node=None): fam = Family("reload") serial = SerialTask() fam.add( serial.add("get").add( Variables( SMS_SUITE= "eras", # SMS_PROG= 314159, SMS_NODE= "localhost", # SMS_PROG= 314199, SMS_NODE= "marhaus", # eras SMS_PROG= 314197, SMS_NODE= "marhaus", # eras2 ),), serial.add("translate"), serial.add("edit"), serial.add("mail"), serial.add("load"), serial.add("bench").add(Defcomplete())) if node is not None: return node.add(fam) return fam ############################################################################ if __name__ == '__main__': import cli_proc, sys parser = argparse.ArgumentParser( description=DESC, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument("--host", default="localhost", help= "server hostname") parser.add_argument("--port", default="3141", help= "server port number") if len(sys.argv) > 1: suites = { "course": Course(), } argv = cli_proc.process(suites, compare=False) sys.exit(0) args = parser.parse_args() clt = ec.Client(args.host, args.port) try: clt.ping() except: clt = ec.Client("localhost", 1000 + os.geteuid()) try: clt.ping() except: clt = ec.Client("localhost", 31415) try: clt.ping() except: clt = ec.Client("localhost", 3199) defs = ec.ecflow.Defs() course = Course() suite = course.suite().add(Reload().main()) if 1: # enable/disable remote version of 'consume' rem = suite.family("remote").add( Task("mk_remote").add( Defcomplete(), Label("info", "do not forget to create directory structure" + "on remote host for job output creation"), ie.onws(host = "class01")), Variables(ECF_HOME= "/tmp/%s/ecflow" % user()), ie.onws(host = "class02")) course.top(rem) # print clt.stats() Admin().top(suite) EcEvents().top(suite) defs.add_suite(suite) path = "/%s" % user() # path = "/%s/course" % user() # path = "/%s/admin" % user() # path = "/%s/ecevents" % user() # print defs clt.replace(path, defs) sys.exit (0)
Some may benefit from the file ecf.py that shall be distributed with ecflow package to facilitate large suite definition.
#!/usr/bin/env python # This software is provided under the ECMWF standard software license agreement. """ a layer over raw ecflow api use 'export ECF_DEBUG_LEVEL=10' to remove warning message related to variables overwrite """ import pwd, os, unittest try: from ecflow import TimeSlot from ecflow import JobCreationCtrl as JobCreationCtrl from ecflow import ChildCmdType from ecflow import ZombieType, ZombieAttr, ZombieUserActionType except: print "# ecf.py cannot import few types" import ecflow ecflow.Ecf.set_debug_level(3) DEBUG = 0 # DECORATE = "ONLY_TRIGGER" # DECORATE = "NO_TRIGGER" # DECORATE = "ONLY_EVENT" # DECORATE = "NO_ATTRIBUTE" DECORATE = "ALL" USE_TIME = True USE_LATE = False USE_TRIGGER = True USE_LIMIT = True if DECORATE == "NO_TRIGGER": USE_TRIGGER = False USE_LIMIT = True USE_EVENT = True elif DECORATE == "ONLY_TRIGGER": USE_TRIGGER = True USE_LIMIT = False USE_EVENT = False elif DECORATE == "NO_ATTRIBUTE": USE_TRIGGER = False USE_LIMIT = False USE_EVENT = False elif DECORATE == "ONLY_EVENT": USE_TRIGGER = False USE_LIMIT = False USE_EVENT = True elif DECORATE == "ALL": USE_TRIGGER = True USE_LIMIT = True USE_EVENT = True else: raise BaseException def get_username(): return pwd.getpwuid( os.getuid() )[ 0 ] def get_uid(): return pwd.getpwnam(get_username()).pw_uid def translate(name, value=None): """ Translate from sms to ecflow """ import sms2ecf, sys def is_sms(): if sys.argv[-1] in ( # "localhost" "sms", "od3", "ode","pikachu","ibis","map", "od", "od2", ): sms2ecf.ECF_MODE = "sms" return True return sms2ecf.ECF_MODE == "sms" if is_sms(): sms2ecf.ECF_MODE = "sms" return sms2ecf.translate(name, value) return name, value class State: """ this class aims at affording a user the possibility to add Triggers as t1 = Task("t1") Task("ts").add(Trigger(t1 == COMPLETE)) SUBMITTED, ACTIVE, SUSPENDED, ABORTED, QUEUED, COMPLETE, UNKNOWN are instance of this class. """ def __init__(self, state): """ store the status """ self.state = str(state) def __str__(self): """ translate into string """ return "%s" % self.state def __eq__(self, arg): """ when == is used, we should care about task name starting with 0-9""" if type(arg) == str: add = "" if type(arg[0]) == int: add = "./" return add + arg + " == " + self.state elif isinstance(arg, ecflow.Node): return arg.get_abs_node_path() + " == " + self.state return False def __ne__(self, arg): """ aka != """ if type(arg) == str: add = "" if type(arg[0]) == int: add = "./" return add + arg + " != " + self.state elif isinstance(arg, ecflow.Node): return arg.get_abs_node_path() + " != " + self.state return False def value(self): """ return state """ return self.state def eval(self, node): """ return state """ return self.state == node.get_state() SUBMITTED = State("submitted") ACTIVE = State("active") SUSPENDED = State("suspended") ABORTED = State("aborted") QUEUED = State("queued") COMPLETE = State("complete") UNKNOWN = State("unknown") class Attribute(object): """ generic attribute to be attached to a node""" def add_to(self, node): """ use polymorphism to attach attribute to a node""" pass class Variable(Attribute, ecflow.Variable): """ filter variables to be attached to a node, may alter its name SMS-ECF""" def add_to(self, node): """ add_variable""" keyt, valt = translate(self.name(), self.value()) if "/tc1/emos_es" in valt: raise BaseException # FIXME node.add_variable(keyt, valt) class Label(Attribute, ecflow.Label): """ wrap around label""" def add_to(self, node): """ add_label""" node.add_label(self) class Meter(Attribute, ecflow.Meter): """ wrap around meter""" def add_to(self, node): """ add_meter""" node.add_meter(self) class Event(Attribute, ecflow.Event): """ wrap around event""" def add_to(self, node): """ add_event""" node.add_event(self) class InLimit(Attribute): """ a class to host a path for a limit silently ignore if USE_LIMIT is False, (in debug mode) """ def __init__(self, fullpath): self.data = None if USE_LIMIT: try: path, name = fullpath.split(":") except: name = fullpath; path = "" if name is None: raise BaseException self.data = ecflow.InLimit(name, path) self.path_ = path self.name_ = name def add_to(self, node): """ add_inlimit""" if USE_LIMIT and self.data is None: raise BaseException if USE_LIMIT and self.data is not None: node.add_inlimit(self.data) def value(self): """ get limit fullpath-name """ return self.path_ + ":" + self.name_ def name(self): """ get limit name """ return self.name_ class Inlimit(InLimit): def __init__(self, fullpath): super(Inlimit, self).__init__(fullpath) class Trigger(Attribute): """ add trigger (string, list of task names, or directly expression and: and'ed (True) or or'ed (False) unk: add or [name]==unknown for RD """ def __init__(self, expr, unk=False, anded=True): self.expr = None if expr is None: return if expr == "": return if type(expr) == str: # self.expr = ecflow.Expression(expr) self.expr = expr return if type(expr) == tuple: prep = list(expr) expr = prep if type(expr) == list: for index, name in enumerate(expr): if name == None: continue pre = "" if type(name) in (Node, Task, Family, Suite): fullname = name.fullname() name = fullname elif type(name) in (ecflow.Task, ecflow.Family): name = name.name() else: pass if name[0].isdigit(): pre = "./" # "0123456789": if ':' in name: item = pre + name else: item = pre + "%s == complete" % name if unk: item += " or %s%s==unknown" % (pre, name) else: pass if item is None: self.expr = None elif index == 0 or self.expr is None: self.expr = item elif item is None: return else: self.expr += " and %s" % item elif type(expr) in (ecflow.Expression, ecflow.PartExpression): self.expr = ecflow.Expression(str(item)) else: print type(expr) raise Exception("what? trigger?") def add_to(self, node): if not USE_TRIGGER: return if self.expr is None: return if node.get_trigger() is None: node.add_trigger(self.expr) else: node.add_part_trigger(ecflow.PartExpression(self.expr, True)) class TriggerAnd(Trigger): def __init__(self, expr, unk=False, anded=True): self.expr = expr if (not ":" in expr and not " eq " in expr and not "==" in expr): self.expr += " eq complete" def add_to(self, node): node.add_part_trigger(ecflow.PartExpression(self.expr, True)) class TriggerImpossible(Trigger): """ attribute to be added to node when it is not expected to run any task""" def __init__(self): """ add an 'impossible trigger', for a task not to run """ super(TriggerImpossible, self).__init__("1==0") class TriggerAlways(Trigger): """ attribute to be added to node when it is not expected to run any task""" def __init__(self): """ add an 'impossible trigger', for a task not to run """ super(TriggerAlways, self).__init__("1==1") class Complete(Trigger): """ class to host complete expression, added later to a node""" def __init__(self, expression, unk=False, anded=False): super(Complete, self).__init__(expression, unk, anded) def add_to(self, node): if USE_TRIGGER and self.expr is not None: node.add_complete(self.expr) class Clock(Attribute): """ wrapper to add clock """ def __init__(self, arg): if type(arg) == str: hybrid = "hybrid" in arg hhh, mmm, sss = [0, 0, 0] try: hhh, mmm, sss = arg.split(':') self.data = ecflow.Clock(hhh, mmm, sss, hybrid) except: self.data = ecflow.Clock(hybrid) else: self.data = ecflow.Clock(arg) def add_to(self, node): if type(node) != Suite: print "WAR: clock can only be attached to suite node, " print "WAR: clock is ignored" return node.add_clock(self.data) class AutoCancel(Attribute): """ wrapper to add time """ def __init__(self, arg): if type(arg) == str: hhh, mmm = arg.split(':') rel = '+' in arg self.data = ecflow.Autocancel(int(hhh), int(mmm), rel) else: self.data = ecflow.Autocancel(arg) def add_to(self, node): node.add_autocancel(self.data) class Time(Attribute): """ wrapper to add time """ def __init__(self, arg): self.data = arg def add_to(self, node): if USE_TIME and self.data is not None: node.add_time(self.data) class Today(Time): """ wrapper to add time """ def __init__(self, arg): self.data = arg def add_to(self, node): if USE_TIME and self.data is not None: node.add_today(self.data) class Cron(Time): """ wrapper to add time """ def __init__(self, bes, wdays=None, days=None, months=None): self.data = ecflow.Cron() if not ("-w" in bes or "-m" in bes or "-d" in bes): self.data.set_time_series(bes); return import argparse parser = argparse.ArgumentParser() parser.add_argument("-w", nargs='?', default=0, help= "weekdays") parser.add_argument("-d", nargs='?', default=0, help= "days") parser.add_argument("-m", nargs='?', default=0, help= "months") parser.add_argument("arg", type=str, help= "begin end step") parsed = parser.parse_args(bes.split()) if parsed.w: self.data.set_week_days([int(x) for x in parsed.w.split(',')]) if parsed.d: self.data.set_week_days([int(x) for x in parsed.d.split(',')]) if parsed.m: self.data.set_months([int(x) for x in parsed.m.split(',')]) self.data.set_time_series(parsed.arg) def add_to(self, node): if USE_TIME and self.data is not None: node.add_cron(self.data) else: print "#WAR: ignoring: %s" % self.data class Date(Time): """ wrapper to add date """ def __init__(self, arg, mask=False): super(Date, self).__init__(arg) self.mask = mask def add_to(self, node): if USE_TIME and self.data is not None: ### ??? FIX, emos avoids dates, datasvc would not if self.mask: node.add_variable("DATEMASK", self.data) else: ddd, mmm, yyy = self.data.split('.') if ddd == '*': ddd = 0 if mmm == '*': mmm = 0 if yyy == '*': yyy = 0 node.add_date(int(ddd), int(mmm), int(yyy)) # node.add_date(self.data) class Day(Date): """ wrapper to add day """ def add_to(self, node): if USE_TIME and self.data is not None: if isinstance(self.data, str): days = { "monday": ecflow.Days.monday, "sunday": ecflow.Days.sunday, "tuesday": ecflow.Days.tuesday, "wednesday": ecflow.Days.wednesday, "thursday": ecflow.Days.thursday, "saturday": ecflow.Days.saturday, "friday": ecflow.Days.friday, } # node.add_date(self.data) ### FIX node.add_variable("WEEKDAY", self.data) node.add_day(ecflow.Days(days[self.data])) else: node.add_day(ecflow.Days(self.data)) class Defcomplete(Attribute): """ wrapper to add defstatus complete """ def __init__(self): pass def add_to(self, node): node.defstatus("complete") class Defstatus(Defcomplete): """ add defstatus attribute""" def __init__(self, kind): if type(kind) == str: kinds = {"suspended": ecflow.DState.suspended, "aborted": ecflow.DState.aborted, "complete": ecflow.DState.complete, "active": ecflow.DState.active, "submitted": ecflow.DState.submitted, "unknown": ecflow.DState.unknown, "queued": ecflow.DState.queued, } self.data = kinds[kind] else: self.data = kind def add_to(self, node): node.add_defstatus(self.data) class DefcompleteIf(Defcomplete): """ wrapper to add conditional defstatus complete just change name to make it explicit """ def __init__(self, arg=True): # super(DefcompleteIf, self).__init__() self.data = arg def add_to(self, node): if self.data: node.defstatus("complete") # else: node.defstatus("queued") # in comment to prevent # overwrite when using multiple defcomplete class Limit(Attribute): """ wrapper to add limit """ # name = None; size = 1 def __init__(self, name=None, size=1): self.name = name self.size = size def add_to(self, node): if USE_LIMIT and self.name is not None: if type(self.name) is dict: for name, size in self.name.items(): node.add_limit(name, size) else: node.add_limit(self.name, self.size) class Late(Attribute): """ wrapper around late, to be add'ed to families and tasks """ def __init__(self, arg): self.data = None if not USE_LATE: if DEBUG: print "#MSG: late is disabled" return sub = False act = False com = False rel = False self.data = ecflow.Late() for item in arg.split(" "): if item == "-s": sub = True elif item == "-c": com = True elif item == "-a": act = True else: hour, mins = item.split(":") rel = "+" in hour if "+" in hour: hour= hour[1:] if sub: self._add_sub(hour, mins) elif com: self._add_com(hour, mins, rel) elif act: self._add_act(hour, mins) sub = False act = False com = False def _add_sub(self, hour, mins): """ submitted""" self.data.submitted(ecflow.TimeSlot(int(hour), int(mins))) def _add_com(self, hour, mins, rel): """ complete""" self.data.complete(ecflow.TimeSlot(int(hour), int(mins)), rel) def _add_act(self, hour, mins): """ active""" self.data.active(ecflow.TimeSlot(int(hour), int(mins))) def add_to(self, node): if USE_LATE and self.data is not None: node.add_late(self.data) class Variables(Attribute): """ dedicated class to enable variable addition with different syntax """ def _set_tvar(self, key, val): """ facilitate to load a ecflow suite to SMS, translating variable names""" keyt, valt = translate(str(key), str(val)) if self.data is None: self.data = Variable(keyt, valt) else: next = self.next self.next = Variables(keyt, valt, next) def __init__(self, __a=None, __b=None, __next=None, *args, **kwargs): self.data = None self.next = __next if len(args) > 0: if type(args) == list: for item in args.iteritems(): self._set_tvar(item.name(), item.value()) elif type(args) == tuple: for key, val in args.items(): self._set_tvar(key, val) else: raise BaseException() if len(kwargs) > 0: for key, val in kwargs.items(): self._set_tvar(key, val) if type(__a) == dict: for key, val in __a.items(): self._set_tvar(key, val) elif type(__a) == tuple: raise BaseException() # for key, val in __a.items(): self._set_tvar(key, val) elif type(__a) == list: raise BaseException() elif type(__a) == Variable: self.data = __a elif __a is not None and __b is not None: self._set_tvar(__a, __b) elif __a is None and __b is None: pass else: raise BaseException(__a, __b, __next, args, kwargs) def add_to(self, node): if self.data is not None: node.add_variable(self.data) edit = "%s" % self.data try: # FIXME Christian's request if "ECF_JOB_CMD" in edit: if "%WSHOST%" in edit: node.add_label("infopcmd", "WSHOST") elif "%SCHOST%" in edit: node.add_label("infopcmd", "SCHOST") elif "%HOST%" in edit: node.add_label("infopcmd", "HOST") else: node.add_label("infopcmd", edit) elif "edit WSHOST " in edit: node.add_label("infopws", edit.replace("edit WSHOST ", "")) elif "edit SCHOST " in edit: node.add_label("infopsc", edit.replace("edit SCHOST ", "")) elif "edit HOST " in edit: node.add_label("infophs", edit.replace("edit HOST ", "")) except: pass if self.next is not None: self.next.add_to(node) def add(self, what): raise baseException(what.fullname()) class Limits(Attribute): """ dedicated class to enable limits addition with different syntax """ def _set_tvar(self, key, val): """ append limits """ if self.data is None: self.data = ecflow.Limit(key, val) else: next = self.next self.next = Limits(key, val, next) def __init__(self, __a=None, __b=None, __next=None, *args, **kwargs): self.data = None self.next = __next if len(args) > 0: if type(args) == list: for item in args.iteritems(): self._set_tvar(item.name(), item.value()) elif type(args) == tuple: for key, val in args.items(): self._set_tvar(key, val) elif len(kwargs) > 0: for key, val in kwargs.items(): self._set_tvar(key, val) elif type(__a) == dict: for key in sorted(__a.iterkeys()): self._set_tvar(key, __a[key]) if __a is not None and __b is not None: self._set_tvar(__a, __b) def add_to(self, node): if self.data is not None: node.add_limit(self.data) if self.next is not None: self.next.add_to(node) class Repeat(Attribute): def __init__(self, name="YMD", start=20120101, end=21010101, step=1, kind="date"): if kind == "date": # print "# repeat", start, end, step, name, kind self.data = ecflow.RepeatDate(name, int(start), int(end), int(step)) elif "int" in kind: self.data = ecflow.RepeatInteger(name, int(start), int(end), int(step)) elif kind == "string": self.data = ecflow.RepeatString(name, start) elif "enum" in kind: self.data = ecflow.RepeatEnumerated(name, start) elif kind == "day": self.data = ecflow.RepeatDay(step) else: self.data = None def add_to(self, node): if self.data is not None: node.add_repeat(self.data) def If(test=True, then=None, otow=None): """ enable Task("t1").add(If(test= (1==1), then= Variables(ONE=1), otow= Variables(TWO=2))) appreciate that both branches are evaluated, using this If class ie there is no 'dead code' as it is with python language 'if' structure using If to distinguish od/rd mode request that both users share the variables (parameter.py) and ecf.py otow: on the other way? """ if test: return then return otow class Root(object): # from where Suite and Node derive """ generic tree node """ def __str__(self): if isinstance(self, ecflow.Node): return self.fullname() return str(self) def __eq__(self, node): if isinstance(self, ecflow.Node): return "%s == " % self.fullname() + str(node) return False def __ne__(self, node): if isinstance(self, ecflow.Node): return "%s != " % self.fullname() + str(node) return False def __and__(self, node): if isinstance(self, ecflow.Node): return "%s and " % self.fullname() + str(node) return False def __or__(self, node): if isinstance(self, ecflow.Node): return "%s or " % self.fullname() + str(node) return False def fullname(self): """ simple syntax """ if isinstance(self, ecflow.Node): return self.get_abs_node_path() return str(self) def repeat(self, name="YMD", start=20120101, end=20321212, step=1, kind="date"): """ add repeat attribute""" if kind == "date": self.add_repeat(ecflow.RepeatDate(name, int(start), int(end), int(step))) elif kind == "integer": self.add_repeat(ecflow.RepeatInteger(name, int(start), int(end), int(step))) elif kind == "string": self.add_repeat(ecflow.RepeatString(name, start)) elif kind == "enumerated": self.add_repeat(ecflow.RepeatEnumerated(name, start)) elif kind == "day": self.add_repeat(ecflow.RepeatDay(step)) else: raise BaseException return self def defstatus(self, kind): """ add defstatus attribute""" status = kind if type(kind) == str: kinds = {"suspended": ecflow.DState.suspended, "aborted": ecflow.DState.aborted, "complete": ecflow.DState.complete, "active": ecflow.DState.active, "submitted": ecflow.DState.submitted, "unknown": ecflow.DState.unknown, "queued": ecflow.DState.queued, } status = kinds[kind] self.add_defstatus(status) return self def add(self, item=None, *args): """ add a task, a family or an attribute """ if DEBUG: print self.fullname(), item, args if item is not None: if type(item) == tuple: for val in item: self.add(val) elif type(item) == list: for val in item: self.add(val) else: if DEBUG: if type(item) in (Task, Family): print item.fullname() try: item.add_to(self) except Exception, exc: print item raise BaseException("not yet", self, type(item), exc) if len(args) > 0: if type(args) == tuple: for val in args: self.add(val) elif type(args) == list: for val in args: self.add(val) else: raise BaseException() if not isinstance(self, ecflow.Node): raise BaseException( "you don't want that") return self def limit(self, name, size): """ add limit attribute""" if name is None: raise BaseException self.add_limit(name, size) return self def inlimit(self, full_path): """ add inlimit attribute""" if not USE_LIMIT: return self path, name = full_path.split(":") if name is None: raise BaseException() if path is None: raise BaseException() self.add_inlimit(name, path) return self class Node(Root): # from where Task and Family derive """ Node class is shared by family and task """ def add_limits(self, __a = None, __b = None, **kwargs): """ add limit dependency""" if isinstance(__a, basestring): self.add_limit(__a, __b) elif isinstance(__a, dict): assert __b is None for key, val in __a.items(): self.add_limit(key, val) for key, val in kwargs.items(): self.add_limit(key, val) return self def meter(self, name, start, end, threshold=None): """ add meter attribute""" if threshold == None: threshold = end self.add_meter(name, start, end, threshold) return self def label(self, name, default=""): """ add label attribute""" self.add_label(name, default) return self def event(self, name=1): """ add event attribute""" if USE_EVENT: self.add_event(name) return self def cron(self, time, dom=False, wdays=False, month=False): """ wrapper for add_cron """ cron = ecflow.Cron() cron.set_time_series(time) if wdays is not False: cron.set_week_days(wdays) if month is not False: cron.set_months(month) if dom is not False: cron.set_day_of_month(dom) self.add_cron(cron) return self def today(self, hhmm): """ wrapper around time """ self.time(hhmm) return self # ??? def time(self, hhmm): """ wrapper around time, None argument is silently ignored """ if hhmm is not None: self.add_time(hhmm) return self def trigger(self, arg): """ add trigger attribute""" if USE_TRIGGER and arg is not None: self.add_trigger(arg) return self def trigger_and(self, arg): """ append to existing trigger""" if USE_TRIGGER and arg is not None: self.add_part_trigger(ecflow.PartExpression(arg, True)) return self def trigger_or(self, arg): """ append to existing trigger""" if USE_TRIGGER and arg is not None: self.add_part_trigger(ecflow.PartExpression(arg, False)) return self def complete(self, arg): """ add complete attribute""" if USE_TRIGGER and arg is not None: self.add_complete(arg) return self def complete_and(self, arg): """ append to existing complete""" if USE_TRIGGER and arg is not None: self.add_part_complete(ecflow.PartExpression(arg, True)) return self def complete_or(self, arg): """ append to existing complete""" if USE_TRIGGER and arg is not None: self.add_part_complete(ecflow.PartExpression(arg, False)) return self def up(self): """ get parent, one level up""" return self.get_parent() class Defs(ecflow.Defs): """ wrapper for the definition """ def add(self, suite): """ add suite """ self.add_suite(suite) return suite def suite(self, name): """ add suite providing its name """ suite = Suite(name) self.add(suite) return suite class Client(ecflow.Client): """ wrapper around client """ def __init__(self, host="localhost", port="31415"): if "@" in host: host, port = host.split("@") # super(Client, self).__init__(host, int(port)) super(Client, self).__init__() self.set_host_port(host, int(port)) else: super(Client, self).__init__() self.set_host_port(host, int(port)) # super(Client, self).__init__(host, "%s" % port) self.host = host self.port = port def __str__(self): return "ecflow client %s@%s v%s" % (self.host, self.port, self.version()) class Suite(ecflow.Suite, Root): """ wrapper for a suite """ def family(self, name): """ add family """ fam = Family(name) self.add_family(fam) return fam def task(self, name): """ add family """ tsk = Task(name) self.add_task(tsk) return tsk # def __enter__(self): return self # def __exit__(self, *args): pass class Family(ecflow.Family, Node, Attribute): """ wrapper around family """ def family(self, name): """ add a family """ fam = Family(name) self.add_family(fam) return fam def task(self, name): """ add a task """ tsk = Task(name) self.add_task(tsk) return tsk def add_to(self, node): node.add_family(self) # def __enter__(self): return self # def __exit__(self, *args): pass class Task(ecflow.Task, Node, Attribute): """ wrapper around task """ def __setattr__(self, key, val): # assert key.isupper() if key.isupper(): key, val = translate(key, val) self.add_variable(key, val) def add_to(self, node): # self.add_label("infop", "def") node.add_task(self) def add_family(self, node): raise BaseException() def display(defs, fname=None): """ print defs""" if fname is None: pass # print defs else: fop = open(fname, "w") print >> fop, defs class TestEcf(unittest.TestCase): """ a test case """ def test_xxx(self): """ a test """ suite = Suite ("a_suite") suite.defstatus("suspended") fam = Family("a_family") tsk = Task("a_task") ft2 = Task("a_fam") ft2.add_to(fam) tsk.VAR = "VALUE" # edit VAR "VALUE" tsk.add(Late("-s 00:05 -c 01:00")) fam.add(tsk, (Task("1"), Task("2")), [Task("11"), Task("12")], Task("111"), Task("211"), Task("t2").add(Trigger(tsk == COMPLETE), Time("01:00")) ) fam.add(Task("t3").add( If(test= (1==1), then=Variables(ADD_ONE=1), otow=Variables(ADD_TWO=1)), If(test= (1==0), then=Variables(ADD_ONE=0), otow=Variables(ADD_TWO=0)), Trigger(tsk != ABORTED), Complete(tsk == COMPLETE))) # longer fam.add( Task("2t"), Task("t4").add( Trigger(tsk.name() != COMPLETE)), Late("-s 00:05 -c 01:00"), Variables(VAR="VALUE"), Task("t5").add(Trigger(["t4", "t3", "t2"])), Task("t6").add(Trigger("2t" == COMPLETE)), Task("t7").add(Trigger("2t eq complete")), ) tsk.add(Limit("a_limit", 10), InLimit("a_task:a_limit"), Meter("step", -1, 100), Label("info", "none"), Event(1), Event("a"), Defcomplete()) tsk.add(Variables({"A": "a", "B": "b"})) tsk.add(Variables(D="d", E="e")) tsk.add(Variables("C", "c")) suite.add(fam) fam.family("another").add(DefcompleteIf(True)) defs = Defs() defs.add(suite) another = defs.suite("another") another.defstatus("suspended") another.task("tn") afam = another.family("another_fam") afam.task("t2n") display(defs, fname="test_ecf.tmp") if __name__ == '__main__': unittest.main()
Also, a simple converter from expended definition file to a python file is hereby provided as an example (it is known to face intrinsic python language limitations in large complete suites case).
#!/usr/bin/env python # This software is provided under the ECMWF standard software license agreement. import sys try: import ecflow except: sys.path.append("/usr/local/apps/ecflow/current/lib/python2.7/site-packages/ecflow") import ecflow import ecflow as ec import sys """ a simple program to convert an expanded definition file to py script using ecf.py""" class Indent: """This class manages indentation, for use with context manager It is used to correctly indent the definition node tree hierarchy """ _pos = 0 _step = 3 def __init__(self): Indent._pos += Indent._step def __del__(self): Indent._pos -= Indent._step @classmethod def indent(cls, loc=''): for i in range(Indent._pos): if loc is None: print ' ' elif type(loc) == str: loc += ' ' else: loc.write(' ') if type(loc) == str: return loc def adds(line=0): if line is None: return "" return Indent.indent() + line + "\n" def add(line, echo=0): if line is None: return elif echo: print Indent.indent() + line else: return Indent.indent() + line + "\n" class DefFormat(object): def __init__(self, defs): self.defs = defs def process_attr(self, node, end=False): res = "" ind = Indent() defstatus = node.get_defstatus() if defstatus: if defstatus != ec.DState.queued: res += add("Defstatus('%s')," % defstatus) item = node.get_autocancel() if item: line = "%s" % item line = line.replace("autocancel ", "") res += add("Autocancel('%s')," % line) item = node.get_repeat() if not item.empty(): line = "%s" % item full = line.split() kind = full[1] name = full[2] try: beg = full[3] try: end = full[4] except: end = beg if "#" in end: end = beg except: beg = 1; end = 1 by = 1 if len(full) == 6: by = full[5] if kind in ("integer", "date"): res += add( "Repeat(kind='%s', name='%s', start=%s, end=%s, step=%s)," % ( kind, name, beg, end, by)) elif kind in ("day", ): res += add( "Repeat(kind='%s', name='%s', step=%s)," % ( kind, name, by)) elif kind in ("string", "enumerated"): line = "%s" % item line = line.replace("repeat %s %s" % (kind, name), "") line.replace('"', '') res += add("Repeat(kind='%s', name='%s', start=%s)," % ( kind, name, line.split())) # FIXME string enum item = node.get_late() if item: line = "%s" % item line = line.replace("late ", "") res += add("Late('%s')," % line) item = node.get_complete() if item: res += add("Complete('%s')," % item) item = node.get_trigger() if item: res += add("Trigger('%s')," % item) for item in node.meters: line = "%s" % item dummy, name, beg, end, thr = line.split(" ") res += add("Meter('%s', %s, %s, %s)," % (name, beg, end, thr)) for item in node.events: line = "%s" % item line = line.replace("event ", "").strip() res += add("Event('%s')," % line) for item in node.labels: res += add("Label('%s', '%s')," % (item.name(), item.value())) for item in node.limits: val = item.value() if val == "0" or val == 0: val = "1" res += add("Limit('%s', %d)," % ( item.name(), int(val))) for item in node.inlimits: line = "%s" % item line = line.replace("inlimit ", "") res += add("InLimit('%s')," % line) for item in node.times: line = "%s" % item line = line.replace("time ", "") res += add("Time('%s')," % line) for item in node.todays: line = "%s" % item line = line.replace("today ", "") res += add("Today('%s')," % line) for item in node.dates: line = "%s" % item line = line.replace("date ", "") res += add("Date('%s')," % line) for item in node.days: line = "%s" % item line = line.replace("day ", "") res += add("Day('%s')," % item) for item in node.crons: line = "%s" % item line = line.replace("cron ", "") res += add("Cron('%s')," % line) var = "Variables(" vind = Indent() num = 0 for item in node.variables: sep="'" if sep in item.value(): sep="\"" var += ind.indent("\n") + item.name() + \ "= %s%s%s," % (sep, item.value(), sep) num += 1 del vind if num: res += add(var + "),") del ind if len(res) == 0: return None return res def process(self, node=None, inc=0): STREAM = 1 if node is None: num = 0 print "# ecf.ECF_MODE = 'sms'" print "suites = []" for suite in self.defs.suites: if STREAM: print "suite%d = Suite('%s').add(" % (num, suite.name()) else: print "s = Suite('%s')"% (suite.name()) + "; suites.append(s);" self.process(suite) print ")" num += 1 if num > 1: if 0: raise BaseException("no more than one suite at a time") print "### WARNING: more than one suite defined" elif isinstance(node, ec.Suite): if not STREAM: print "s.add(\n" add( self.process_attr(node),1 ) if STREAM: ind = Indent() for kid in node.nodes: self.process(kid) if STREAM: del ind elif isinstance(node, ec.Family): many = ""; one = ""; post = "" # circumvent limitation to 255 items if inc % 200 == 0: one += "("; post = ")," if inc > 200: many = ")," if STREAM: add( many + one + "Family('%s').add(" % node.name(), 1) res = self.process_attr(node) else: print ")\ncur = Family('%s')" % node.name() print "fam.add(cur); fam = cur; fam.add(" res = self.process_attr(node) if STREAM: if res is not None: print Indent.indent(res) ind = Indent() num = 0 for kid in node.nodes: self.process(kid, num) ; num += 1 if STREAM: del ind add( Indent.indent(post + "), # endfamily %s" % node.name()), 1 ) print "# ", num elif isinstance(node, ec.Task): res = self.process_attr(node) if res is None: add( "Task('%s')," % node.name(), 1) else: add( "Task('%s').add(\n" % node.name() + "%s%s)," % (res, Indent.indent()), 1) def main(self): self.header() self.process() self.footer() def header(self): print """#!/usr/bin/env python # NOTICE: this file was originally created with def2def.py import sys # sys.path.append('../o/def') # sys.path.append('.') sys.path.append('/home/ma/emos/def/o/def') from ecf import * import ecf as ecf # import inc_emos as ie """ def footer(self): print """ if __name__ == '__main__': defs = Defs() defs.add(suite0); defs.auto_add_externs(True) if 0: import cli_proc, ecf cli_proc.process(ie.Seed(defs), compare=False) else: port = 1500 + int(get_uid()) # when started with ecflow_start.sh # ECMWF if 0: # test job creation job_ctrl = ecflow.JobCreationCtrl() defs.check_job_creation(job_ctrl) print "loading on localhost@%s" % port client = ecf.Client("localhost", port) client.replace("/%s" % suite0.name(), defs) """ if __name__ == '__main__': filename = sys.argv[1] defs = ec.Defs(filename) defs.auto_add_externs(True) if 0: print defs DefFormat(defs).main()
- No labels