- Created by Axel Bonet, last modified by Dominique Lucas on Feb 04, 2016
You are viewing an old version of this page. View the current version.
Compare with Current View Page History
« Previous Version 4 Next »
An ecFlow Python client can be used to implement a fault-tolerance requirement: for example, when it is enough for three out of five families or tasks to complete before carrying on and submitting new jobs.
ecflow3of5.py Expand source
#!/usr/bin/env python
"""Check whether enough children of an ecFlow node are complete.

Usage:
    ./ecflow3of5.py 63 /e_41r2/main/12/prod

The first argument is the number of children that must be complete, the
second one is the absolute path of the parent node.  The server is taken
from the ECF_HOST and ECF_PORT environment variables (localhost and 31415
when they are unset).

Exit status:
    0   enough children are complete
    1   node not found, node has fewer children than requested, or every
        child is complete or aborted without reaching the threshold
    2   usage error
    -1  threshold still reachable but WAIT is disabled
"""
from __future__ import print_function  # same print semantics on Python 2 and 3

import os
import sys
import time

import ecflow

WAIT = True    # keep polling while the threshold is still reachable
INTERVAL = 30  # seconds between two polls


def stop(msg, num):
    """Print ``msg`` and terminate the process with exit status ``num``."""
    print(msg)
    sys.exit(num)


def main(argv):
    """Poll the server until a verdict can be given, then exit through stop().

    ``argv`` is the full argument vector (program name first).  Arguments
    beyond the node path are ignored.  This function never returns normally.
    """
    if len(argv) < 3:
        stop("usage: %s <num> <path_to_node>" % argv[0], 2)
    try:
        outof5 = int(argv[1])
    except ValueError:
        stop("# not a number: %s" % argv[1], 2)
    node_path = argv[2]

    host = os.getenv("ECF_HOST", "localhost")
    port = os.getenv("ECF_PORT", 31415)
    client = ecflow.Client(host, port)

    while True:
        tot = 0      # children seen complete during this poll
        count = 0    # children seen during this poll
        done = True  # stays True when every child is complete or aborted

        client.sync_local()
        node = client.get_defs().find_abs_node(node_path)
        if node is None:
            stop("node not found!!!", 1)

        for item in node.nodes:
            count += 1
            status = "%s" % item.get_state()
            if status == "complete":
                tot += 1
                if tot >= outof5:
                    stop("# OK", 0)
            elif status == "aborted":
                pass
            else:
                # any other state means this child may still complete
                done = False

        if count < outof5:
            stop("# Impossible: %d < %d" % (count, outof5), 1)
        if done:
            stop("# KO %d" % tot, 1)

        # no newline: the progress figures (or the final stop) end the line
        print("# still possible", end=" ")
        if WAIT:
            print("...", tot, outof5, count)
            time.sleep(INTERVAL)
        else:
            stop("", -1)


if __name__ == "__main__":
    main(sys.argv)
Such a client can also be used as an ecFlow task wrapper, with a few more lines.
tasked client Expand source
#!/usr/bin/env python
"""
an example for a simple python client to help ecFlow scheduling
+ unit test
+ task loading in the suite

$manual
example
$end
$comment
a comment in a comment
$end
"""
from __future__ import print_function  # same print semantics on Python 2 and 3

import ecflow
import getopt
import os
import sys
import time
import unittest

sys.path.append("/home/ma/emos/def/o/def")  # ecf can be imported ...

MICRO = "$$"  # keep ecFlow pleased with micro character balance


def usage():
    """Print the command-line help and the received arguments, then exit(2)."""
    print(sys.argv[0], """
  -n <num>
  -p <path_to_node>
  -h # help
  -i <number> # sleep interval for sleep mode
  -t # unit test
  -r # load the task definition in a suite, associated with -p option
  -w # activate sleep mode
""")
    for num, val in enumerate(sys.argv):
        print(num, val)
    sys.exit(2)


def stop(msg, num, child=None):
    """Print ``msg``, forward it to ``child`` when given, and exit with ``num``.

    With ``num == 0`` the child is additionally asked to report completion
    (``child.report("stop")``).  Both standard streams are flushed before
    the process exits.
    """
    print(msg)
    if child:
        child.report(msg)
        if num == 0:
            child.report("stop")
    sys.stdout.flush()
    sys.stderr.flush()
    sys.exit(num)


class Child(object):
    """Child-command connection to the ecFlow server, when run as a task.

    Example invocation:
    /var/tmp/map/work/p4/metapps/suites/o/admin/ecflow3of5_prod.py -i 30 -w -p /test3outof5/fam -n 3

    ``self.client`` stays None in command-line mode, in which case
    report() is a no-op.
    """

    def __init__(self):
        """Open the child connection, unless the script was not pre-processed."""
        import signal

        # These placeholders are expected to be substituted before the
        # script runs as a task (presumably by ecFlow job pre-processing --
        # confirm).
        env = {
            "ECF_NODE": "$ECF_NODE$",  # check can be on a server, child on another...
            "ECF_PASS": "$ECF_PASS$",
            "ECF_NAME": "$ECF_NAME$",
            "ECF_PORT": "$ECF_PORT$",
            "ECF_TRYNO": "$ECF_TRYNO$",
        }
        self.client = None
        if MICRO[0] in env["ECF_PORT"]:
            # placeholder still present: started by hand, no task to report to
            print("# cli mode")
            return

        print("#MSG: will communicate with server...")
        print("#MSG: kill: ssh %s kill -15 %d" % (os.getenv("HOST", "localhost"),
                                                   os.getpid()))
        self.client = ecflow.Client()
        self.client.set_host_port(env["ECF_NODE"], int(env["ECF_PORT"]))
        self.client.set_child_pid(os.getpid())
        self.client.set_child_path(env["ECF_NAME"])
        self.client.set_child_password(env["ECF_PASS"])
        self.client.set_child_try_no(int(env["ECF_TRYNO"]))
        self.client.child_init()
        self.client.set_child_timeout(20)

        for sig in (signal.SIGINT, signal.SIGHUP, signal.SIGQUIT,
                    signal.SIGILL, signal.SIGTRAP, signal.SIGIOT,
                    signal.SIGBUS, signal.SIGFPE, signal.SIGUSR1,
                    signal.SIGUSR2, signal.SIGPIPE, signal.SIGTERM,
                    signal.SIGXCPU, signal.SIGPWR):
            signal.signal(sig, self.signal_handler)

    def signal_handler(self, signum, frame):
        """ catch signal """
        # NOTE(review): the handler reports the abort but does not terminate
        # the process, so the advertised "kill -15" does not stop the polling
        # loop -- confirm whether that is intended.
        print('Aborting: Signal handler called with signal ', signum)
        if self.client:
            self.client.child_abort("Signal handler called with signal " + str(signum))

    def __exit__(self, exc_type, exc_value, traceback):
        # NOTE(review): there is no matching __enter__ and no caller in this
        # script uses Child in a "with" statement, so this abort is never
        # sent; a non-zero stop() therefore exits without child_abort().
        if self.client:
            self.client.child_abort()

    def report(self, msg, meter=None):
        """ communicate with ecFlow server

        With ``meter`` set, ``msg`` is the meter name; ``msg == "stop"``
        sends child_complete and drops the connection; any other message
        updates the "info" label.  Does nothing in command-line mode.
        """
        if not self.client:
            return
        elif meter:
            self.client.child_meter(msg, meter)
        elif msg == "stop":
            self.client.child_complete()
            self.client = None
        else:
            self.client.child_label("info", msg)


def check3of5(outof5, node_path, wait=True, interval=30):
    """Poll ``node_path`` until ``outof5`` of its children are complete.

    outof5    -- number of complete children required
    node_path -- absolute path of the parent node
    wait      -- keep polling while the threshold is still reachable
    interval  -- seconds between two polls

    Never returns: every outcome leaves through stop() with exit status
    0 (enough children complete), 1 (node not found, too few children, or
    all children finished below the threshold) or -1 (still possible but
    ``wait`` is false).
    """
    host = os.getenv("ECF_HOST", "localhost")
    port = os.getenv("ECF_PORT", 31415)
    client = ecflow.Client(host, port)
    # Register the suite the node belongs to (first path component).
    # Presumably this restricts sync_local() to that suite -- confirm
    # against the ecFlow API.
    client.ch_register(False, [str(node_path.split('/')[1])])
    child = Child()  # choice? might be global variable ???

    while True:
        tot = 0      # children seen complete during this poll
        count = 0    # children seen during this poll
        done = True  # stays True when every child is complete or aborted

        client.sync_local()
        node = client.get_defs().find_abs_node(node_path)
        if node is None:
            stop("node not found!!!", 1, child)

        for item in node.nodes:
            count += 1
            status = "%s" % item.get_state()
            if status == "complete":
                tot += 1
                if tot >= outof5:
                    stop("# OK", 0, child)
            elif status == "aborted":
                pass
            else:
                # any other state means this child may still complete
                done = False

        if count < outof5:
            stop("# Impossible %d %d" % (count, outof5), 1, child)
        if done:
            stop("# KO %d %d %d" % (tot, outof5, count), 1, child)

        # no newline: the progress figures (or the final stop) end the line
        print("# still possible", end=" ")
        if wait:
            print("...", tot, outof5, count)
            child.report("... %d %d %d" % (tot, outof5, count))
            time.sleep(interval)
        else:
            stop("", -1, child)


def task_check():
    """Build the task definition that runs this very script as its job.

    The task is named after the script file (directory and extension
    stripped) and uses the current working directory for ECF_FILES and
    ECF_HOME.
    """
    from ecf import Task, Label, Variables

    pwd = os.getcwd()
    name = sys.argv[0]
    if '/' in name:
        name = name.split('/')[-1]
    if '.' in name:
        name = name.split('.')[0]
    return Task(name).add(
        Label("info", "this task uses a python-script directly, no job..."),
        Variables(ECF_MICRO=MICRO[0],
                  ECF_FILES=pwd,
                  ECF_HOME=pwd,
                  ECF_EXTN=".py",
                  ECF_PASS="FREE",
                  # ECF_JOB_CMD= pwd + "/%s -i $INTERVAL$ -w" % name
                  # ECF_JOB_CMD= "python $ECF_JOB$ -i $INTERVAL$ -w"
                  ECF_JOB_CMD="python $ECF_JOB$ -w" +
                  " -n 3 -p /$SUITE$/fam" +
                  " > $ECF_JOBOUT$ 2>&1",
                  INTERVAL=30,
                  ),
    )


class Test3outof5(unittest.TestCase):
    """ a test case """

    def test_1(self, test_ok=1):
        """ a test

        Loads the suite "test3outof5" on the server and forces the states
        of the five tasks of family "fam": a and c complete, b aborted,
        d and e depending on ``test_ok``.  Requires a reachable server.
        """
        import ecf
        from ecf import Task, Suite, Defs, Family, Trigger

        defs = Defs()
        sname = "test3outof5"
        cargo = task_check()
        defs.add_suite(
            Suite(sname).add(
                # ecf.Defstatus("suspended"),
                Family("fam").add(
                    ecf.Defstatus("suspended"),
                    [Task(name) for name in ("a", "b", "c", "d", "e")]),
                cargo.add(Trigger("fam != queued")),  # trigger as string
                Family("user").add(
                    # trigger with python == and obj:
                    Trigger(cargo.name() == ecf.COMPLETE),
                    Task("dummy").add(ecf.Complete("1==1")))
            )
        )
        client = replace("/%s" % sname, defs)
        client.begin_suite(sname)

        client.force_state("/%s/fam/a" % sname, ecflow.State.complete)
        client.force_state("/%s/fam/b" % sname, ecflow.State.aborted)
        client.force_state("/%s/fam/c" % sname, ecflow.State.complete)
        if test_ok:
            client.force_state("/%s/fam/d" % sname, ecflow.State.active)
        else:
            client.force_state("/%s/fam/d" % sname, ecflow.State.aborted)
        if test_ok:
            client.force_state("/%s/fam/e" % sname, ecflow.State.complete)
        else:
            client.force_state("/%s/fam/e" % sname, ecflow.State.aborted)

    # NOTE(review): the page layout does not tell whether the following
    # test was live or commented out; it is kept disabled here.
    # def test_2(self): self.test_1(0)


def replace(path, defs=None):
    """Replace ``path`` on the server with ``defs`` and return the client.

    The server is taken from ECF_HOST / ECF_PORT (localhost and 31415
    when unset).
    """
    import ecf

    host = os.getenv("ECF_HOST", "localhost")
    port = os.getenv("ECF_PORT", 31415)
    client = ecf.Client(host, port)
    # ecf.Client(host, port).replace(path, defs, 1, 1)
    client.replace(path, defs, 1, 1)
    return client


if __name__ == '__main__':
    try:
        # Long options that take a value need the trailing "=", otherwise
        # getopt rejects "--interval 30", "--number 3" and "--path /x".
        OPTS, ARGS = getopt.getopt(
            sys.argv[1:], "hi:n:p:rtw",
            ["help", "interval=", "number=", "path=",
             "replace", "test", "wait"])
    except getopt.GetoptError as err:
        print("#what?", err)
        usage()  # exits with status 2

    INTERVAL = 30
    WAIT = False
    PATH = None
    NUM = None
    REPLACE = False

    for o, a in OPTS:
        if o in ("-h", "--help"):
            usage()
        elif o in ("-i", "--interval"):
            INTERVAL = int(a)
        elif o in ("-n", "--number"):
            NUM = int(a)
        elif o in ("-p", "--path"):
            PATH = a
        elif o in ("-r", "--replace"):
            # was "REPLACE = False", which made the replace branch unreachable
            REPLACE = True
        elif o in ("-t", "--test"):
            unittest.main(argv=[sys.argv[0]])
            sys.exit(0)
        elif o in ("-w", "--wait"):
            WAIT = True

    if NUM and PATH:
        check3of5(NUM, PATH, WAIT, INTERVAL)
    elif PATH and REPLACE:
        replace(PATH)
    else:
        usage()
This page contains macros or features from a plugin which requires a valid license.
You will need to contact your administrator.
- No labels