- Created by Axel Bonet on Dec 03, 2015
You are viewing an old version of this page. View the current version.
Compare with Current View Page History
Version 1 Next »
A few questions concerned starting a project from scratch: creating a "template suite" (a suite skeleton) in which to define tasks targeting one destination or another at the Centre. The script below generates a simple task wrapper and links trap.h from emos.
It is also common to design a suite with several families running at different frequencies (daily, monthly, yearly). This template provides such a structure as an example.
This example also shows that, although the "action" attribute was not carried over to ecFlow, it can simply be replaced with a one-liner task. This use case is not advocated: it can be attractive for a simple one-liner command, but when things go wrong, the error messages (if there are any) may not be clear.
#!/usr/local/apps/python/current/bin/python import getopt import os import pwd # import sh import sys sys.path.append("/opt/cray/sdb/1.0-1.0502.50558.6.21.ari/lib64/py") def get_username(): return pwd.getpwuid( os.getuid() )[ 0 ] def get_uid(): return pwd.getpwnam(get_username()).pw_uid """ python docstring: demonstrate python cli + ecFlow task wrapper + task loader $manual DESCRIPTION: sweeper: find out min/max among modeleps_nemo tasks OPERATORS: please, set complete if problematic ANALYST: example as pure python task - job $end $comment comments can be added ... $end """ PATH = "/usr/local/apps/ecflow/4.0.9/lib/python2.7/site-packages/ecflow" sys.path.insert(0, PATH) import ecflow as ec MICRO = "$$" # double dollar to please ecFlow micro character balance task_content = """#!/bin/ksh #!/bin/bash # NOK while typeset -Z2 may be found... if [[ var = @(v1) ]]; then ... %include <trap.h> inc=%ECF_HOME%/%SUITE%/%FAMILY%/preproc.job1 if [[ -f $inc ]]; then . $inc # deterministic runs are lost: \rm -f $inc || : fi xevent 1 i=0; while ((i < 100)) ; do xmeter step $i; ((i+=1)); done xlabel info "done" trap 0; xcomplete; exit 0 """ inc_content = """# request from Blazej? 
xevent 2 xlabel info "%SUITE% %TASK%" """ def create_wrapper(name, content): wrapper = open(name, "w") print >>wrapper, content wrapper.close() def loader(test=1, # host=os.getenv("ECF_TEST_HOST", None), host=None, port=None, path=None): """ replace sweeper task in test/oper mode """ sys.path.append("/home/ma/emos/def/o/def") import ecf from ecf import Suite, Family, Task, Variables, Trigger, Complete, Repeat, Defstatus, Event, Meter, Label, Limit, Inlimit if test: env = {"host": os.getenv("ECF_TEST_HOST", None), "port": os.getenv("ECF_TEST_PORT", None), "path": '/' + SUITE + "/submit", } else: env = {"host": os.getenv("ECF_OPER_HOST", None), "port": os.getenv("ECF_OPER_PORT", None), "path": '/' + SUITE + "/submit", } defs = ecf.Defs() if host: env["host"] = host if port: env["port"] = port if env["host"] != "eurus": print host, env["host"]; raise if path is None: path = env["path"] try: sname = str(path.split('/')[1]) except: sname = "suite" try: fname = str(path.split('/')[2]) except: fname = "submit" user = get_username() submit = "/home/ma/emos/bin/trimurti.41r2" rid = submit + " %USER% %HOST% %ECF_RID:0% %ECF_JOB% $ECF_JOBOUT% " ecf_home = pwd.getpwnam(user).pw_dir + "/ecflow_server" ecf_files = ecf_home + "/smsfiles" ecf_include = ecf_home + "/include" client=" ECF_PASS=%ECF_PASS% ECF_NAME=%ECF_NAME% ecflow_client " hosts = ["cca", "ccb", "cct", "ecgb", "lxc", "opensuse131", "localhost"] suite = ecf.Suite(sname).add( Defstatus("suspended"), Variables(ECF_JOB_CMD= submit + " %USER% %HOST% %ECF_JOB% %ECF_JOBOUT%", ECF_KILL_CMD= rid + "kill", ECF_STATUS_CMD= rid + "status", ECF_CHECK_CMD= rid + "check", USER = user, ECF_HOME= ecf_home, ECF_FILES= ecf_files, ECF_INCLUDE= ecf_include, ECF_EXTN= ".sms", HOST="localhost", LOGDIR= "%ECF_HOME%", SMSOUT= "%ECF_HOME%", # trap.h ), Family("limits").add(Defstatus("complete"), Label("memo", "use click-mouse-3-Edit to leave a message"), Label("begin", "Edit-Pref-Admin + suite-node-click3-Begin"), Label("example", "four 
cases: no job, no submit - no job but submit - job created but no submit - job + submit"), Limit("tasks", 10)), Inlimit("/%s/limits:tasks" % sname), Task("dummy").add(Variables(ECF_DUMMY_TASK= 1)), Family("ssh_login").add( Label("one_liner", "dummy script, no need for job, execute ECF_JOB_CMD"), Repeat("HOST", start=hosts, kind="enumerated"), Variables(ECF_JOB_CMD= client + "--init;" + # "xterm -T %HOST% -e ssh %HOST%;" + "ssh %HOST% pwd;" + client + "--complete;"), Task("simple"), ), Family("preprocess").add( Task("preproc").add( Variables(ECF_JOB_CMD= client + " --init;" + client + "--complete")), Task("simple").add( Label("info", ""), Event(2), Trigger("preproc eq complete")), ), Family("submit").add( Trigger(["ssh_login", ]), [ Family(host).add( Variables(HOST= host), Task("simple").add(Event(1), Meter("step", -1, 100, 90), Label("info", "")), ) for host in hosts ] ), Family("process").add( Label("info", "outer-follower-family + inner-run-sometime example"), Label("stop", "at first abort"), Trigger("./process ne aborted"), Family("daily").add( Repeat("YMD", 20160101, 20321212, kind="date"), Task("simple"), Family("decade").add( Label("info", "Show-Icons-Complete"), Complete("../daily:YMD % 10 ne 0"), Task("simple")), ), Family("monthly").add( Trigger("monthly:YM lt daily:YMD / 100 or daily eq complete"), Repeat(name="YM", start= [ "%d" % ( YM) for YM in range(201601, 2013212+1) if (YM % 100) < 13], kind="enum"), Task("simple"), Family("odd").add( Complete("../monthly:YM % 2 eq 0"), Task("simple")), ), Family("yearly").add( Repeat("Y", 2016, 2032, kind="integer"), Trigger("yearly:Y lt daily:YMD / 10000 or daily eq complete"), Task("simple"), Family("decade").add( Complete("../yearly:Y % 10 ne 0"), Task("simple")), Family("century").add( Complete("../yearly:Y % 100 ne 0"), Task("simple")), ) ) ) # sh.mkdir(ecf_files); sh.mkdir( ecf_include); sh.ln("-sf ~emos/def/o/trap.h %s" % ecf_include) os.system("" + "mkdir %s;" % ecf_files + "mkdir %s;" % ecf_include + "ln 
-sf ~emos/def/o/include/trap.h %s;" % ecf_include # trap + init + lxop .running + "touch %s/rcp.h;" % ecf_include + "touch %s/env_setup.h;" % ecf_include # +"ln -sf ~emos/def/o/include/env_setup.h %s;" % ecf_include # link PBS job to .running # +"ln -sf ~emos/def/o/include/rcp.h %s;" % ecf_include # rapatriate remote job@completion ) create_wrapper(ecf_files + "/simple.sms", task_content) create_wrapper(ecf_files + "/preproc.sms", inc_content) defs.add_suite(suite) print "#MSG: replacing", path, env["host"], env["port"] ecf.Client(env["host"], env["port"]).replace(path, defs, 1, 1) class Seed(object): """ find min/max meter value below a node """ def __init__(self, host=None, port=None, path=None, task=None): import signal if host is None: host = os.getenv("ECF_NODE", "$ECF_NODE$") if port is None and not MICRO[0] in host: try: port = int(os.getenv("ECF_PORT", "$ECF_PORT$")) except: port = 31415 if host and port: self.clt = ec.Client(host, port) else: self.clt = None self.cl2 = None if path is None: if "$SUITE$" != "/admin" and not "SUITE" in "$SUITE$": self.path = ( "/$SUITE$/$FAMILY$/%s" % legs[0], "/$SUITE$/$FAMILY$/%s" % legs[1], ) else: self.path = MC_STEPS_LIST else: self.path = (path, ) if task is None: self.task = TARGET_TASK else: self.task = task # shall make sense when processed into job by ecflow # name remains when not processed... ecfv = {"ECF_NODE": "$ECF_NODE$", "ECF_PASS": "$ECF_PASS$", "ECF_NAME": "$ECF_NAME$", "ECF_PORT": "$ECF_PORT$", "ECF_TRYNO": "$ECF_TRYNO$", } for key, val in ecfv.items(): if key in val: ecfv[key] = os.getenv(key, None) if ecfv["ECF_NODE"] and ecfv["ECF_NAME"]: print "#MSG will communicate with server..." 
print "#kill: ssh %s kill -15 %d" % (ecfv["ECF_NODE"], os.getpid()) self.cl2 = ec.Client() self.cl2.set_host_port(ecfv["ECF_NODE"], ecfv["ECF_PORT"]) self.cl2.set_child_pid(os.getpid()) self.cl2.set_child_path(ecfv["ECF_NAME"]) self.cl2.set_child_password(ecfv["ECF_PASS"]) self.cl2.set_child_try_no(int(ecfv["ECF_TRYNO"])) self.cl2.child_init() self.cl2.set_child_timeout(20) for sig in (signal.SIGINT, signal.SIGHUP, signal.SIGQUIT, signal.SIGILL, signal.SIGTRAP, signal.SIGIOT, signal.SIGBUS, signal.SIGFPE, signal.SIGUSR1, signal.SIGUSR2, signal.SIGPIPE, signal.SIGTERM, signal.SIGXCPU, signal.SIGPWR): signal.signal(sig, self.signal_handler) def signal_handler(self, signum, frame): """ catch signal """ print 'Aborting: Signal handler called with signal ', signum self.cl2.child_abort("Signal handler called with signal " + str(signum)) def report(self, msg, meter=None): """ communicate with ecFlow server """ if not self.cl2: return if meter: self.cl2.child_meter(msg, meter) elif msg == "stop": self.cl2.child_complete() sys.exit(0) else: self.cl2.child_label("info", msg) def usage(): """ help """ print """client -o: operational node is to be replaced, default is test node, provide this option BEFORE -r -r: replace task node -h: this help ECF_NODE=localhost ECF_PORT=31415 ./ecflow_suite.py -s start """ if __name__ == '__main__': try: OPTS, ARGS = getopt.getopt( sys.argv[1:], "hros:t:n:p:", ["help", "replace", "oper", "task", "suite", "node", "port"]) except getopt.GetoptError as err: print "# what?", usage() sys.exit(2) TEST = 1 PATH = None TASK = None SUITE = "start" PORT = None NODE = None for o, a in OPTS: if o in ("-r", "--replace", ): loader(TEST, host=NODE, port=PORT, path=TASK) sys.exit(0) # elif o in ("-p", "--path", ): PATH = a elif o in ("-o", "--oper", ): TEST = 0 elif o in ("-t", "--task", ): TASK = a elif o in ("-s", "--suite", ): SUITE = a elif o in ("-n", "--node", ): NODE = a elif o in ("-p", "--port", ): PORT = a """ python ecflow_suite.py -s suite -p 
1630 -n eurus -r """
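This script can be run from the emos def tree. Note that, as written, the loader aborts when the target server is anything other than "eurus", so adjust that check before using `-n localhost`: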
python /home/ma/emos/def/o/admin/ecflow_suite.py -s suite -p $(($(id -u) + 1500)) -n localhost -r
You will need to contact your administrator.
- No labels