#!/usr/bin/env python
""" an example for a simple python client to help ecFlow scheduling
+ a unit test
+ loading this task into a suite
$manual
example
$end
$comment
a comment in a comment
$end
"""
import ecflow
import getopt
import os
import sys
import time
import unittest
# Make the site-specific ``ecf`` module (Task, Suite, Defs, ... builders)
# importable; task_check, Test3outof5 and replace import it lazily.
sys.path.append("/home/ma/emos/def/o/def") # ecf can be imported ...
# ecFlow micro character (used as ECF_MICRO in task_check). It is written
# doubled so that ecFlow pre-processing of this very file sees a balanced
# pair on this line; the code only ever reads MICRO[0].
MICRO = "$$" # keep ecFlow pleased with micro character balance
def usage():
    """Print the command-line synopsis, echo every argument, then exit.

    The process always terminates with exit status 2 (also for -h).
    """
    synopsis = """ -n <num> -p <path_to_node>
-h # help
-i <number> # sleep interval for sleep mode
-t # unit test
-r # load the task definition in a suite, associated with -p option
-w # activate sleep mode
"""
    print("%s %s" % (sys.argv[0], synopsis))
    # Dump each argument with its index to help diagnose bad command lines.
    print("\n".join("%d %s" % pair for pair in enumerate(sys.argv)))
    sys.exit(2)
def stop(msg, num, child=None):
    """Print msg, tell the ecFlow server how the check ended, and exit.

    :param msg: text printed to stdout and reported through child.report().
    :param num: process exit status. 0 marks the task complete via
        child.report("stop"). Any other value aborts the task with msg as
        the reason. A plain non-zero exit sends no child command, so without
        this the server would never learn that the job had failed.
    :param child: optional Child-like object; ignored when falsy. The abort
        is only sent when it has a truthy ``client`` attribute (i.e. it is
        not in cli mode).
    :raises SystemExit: always, with status num.
    """
    print(msg)
    if child:
        child.report(msg)
        if num == 0:
            child.report("stop")
        else:
            client = getattr(child, "client", None)
            if client:
                client.child_abort(msg)
                # Mirror report("stop"): no further child commands.
                child.client = None
    sys.stdout.flush()
    sys.stderr.flush()
    sys.exit(num)
class Child(object):
    """ecFlow child-command helper for this script when it runs as a task.

    If the ECF_* variable references in ``env`` were substituted (ECF_PORT no
    longer contains the micro character), the constructor sets up an
    ecflow.Client for child commands and installs signal handlers that abort
    the task. Otherwise it stays in "cli mode": ``client`` is None and every
    report is a no-op.

    Example invocation:
    /var/tmp/map/work/p4/metapps/suites/o/admin/ecflow3of5_prod.py -i 30 -w -p /test3outof5/fam -n 3
    """

    def __init__(self):
        import signal
        # ecFlow variable references; when the script is started by hand
        # they still contain the micro character (see the cli-mode test).
        env = {
            "ECF_NODE": "$ECF_NODE$", # check can be on a server, child on another...
            "ECF_PASS": "$ECF_PASS$",
            "ECF_NAME": "$ECF_NAME$",
            "ECF_PORT": "$ECF_PORT$",
            "ECF_TRYNO": "$ECF_TRYNO$", }
        self.client = None
        if MICRO[0] in env["ECF_PORT"]:
            print("# cli mode")
            return
        print("#MSG: will communicate with server...")
        print("#MSG: kill: ssh %s kill -15 %d" % (os.getenv("HOST", "localhost"),
                                                 os.getpid()))
        self.client = ecflow.Client()
        self.client.set_host_port(env["ECF_NODE"], int(env["ECF_PORT"]))
        self.client.set_child_pid(os.getpid())
        self.client.set_child_path(env["ECF_NAME"])
        self.client.set_child_password(env["ECF_PASS"])
        self.client.set_child_try_no(int(env["ECF_TRYNO"]))
        self.client.child_init()
        self.client.set_child_timeout(20)
        # Looked up by name: some of these (e.g. SIGPWR) do not exist on
        # every platform, and a missing one must not crash the task.
        for name in ("SIGINT", "SIGHUP", "SIGQUIT", "SIGILL", "SIGTRAP",
                     "SIGIOT", "SIGBUS", "SIGFPE", "SIGUSR1", "SIGUSR2",
                     "SIGPIPE", "SIGTERM", "SIGXCPU", "SIGPWR"):
            sig = getattr(signal, name, None)
            if sig is not None:
                signal.signal(sig, self.signal_handler)

    def signal_handler(self, signum, frame):
        """Abort the task (when connected) and exit with status 0."""
        print("%s %s" % ('Aborting: Signal handler called with signal ', signum))
        if self.client:
            self.client.child_abort("Signal handler called with signal " + str(signum))
        sys.exit(0)

    def __exit__(self, exc_type, exc_value, traceback):
        """Abort the task (when connected) and exit with status 0.

        NOTE(review): Child defines no __enter__, so a ``with`` statement
        cannot use it; this only runs when called explicitly.
        """
        if self.client:
            self.client.child_abort()
        sys.exit(0)

    def report(self, msg, meter=None):
        """Communicate with the ecFlow server; no-op without a client.

        * meter given (including 0): set the meter named msg to meter;
        * msg == "stop": mark the task complete and drop the client, so
          later reports are ignored;
        * otherwise: set the "info" label to msg.
        """
        if not self.client:
            return
        if meter is not None:
            self.client.child_meter(msg, meter)
        elif msg == "stop":
            self.client.child_complete()
            self.client = None
        else:
            self.client.child_label("info", msg)
def check3of5(outof5, node_path, wait=True, interval=30):
    """Wait until at least outof5 direct children of node_path are complete.

    The node is polled every interval seconds and the process always ends
    through stop():

    * "# OK" (status 0) as soon as outof5 children are complete;
    * "node not found!!!" (status 1) when node_path is not on the server;
    * "# Impossible <count> <outof5>" (status 1) with fewer than outof5
      children;
    * "# KO <complete> <outof5> <count>" (status 1) when every child is
      complete or aborted but fewer than outof5 completed;
    * an empty message (status -1) after one inconclusive pass when wait
      is false.

    The server comes from ECF_HOST / ECF_PORT (defaults localhost, 31415).
    """
    host = os.getenv("ECF_HOST", "localhost")
    port = os.getenv("ECF_PORT", 31415)
    client = ecflow.Client(host, port)
    # Register a client handle for the suite containing node_path
    # (presumably so sync_local only fetches that suite). For an absolute
    # path this is the first component; strip('/') also tolerates a
    # missing leading slash.
    suite = str(node_path.strip('/').split('/')[0])
    client.ch_register(False, [suite])
    try:
        child = Child()
        while True:
            client.sync_local()
            node = client.get_defs().find_abs_node(node_path)
            if node is None:
                stop("node not found!!!", 1, child)
            tot = 0      # children whose state is "complete"
            count = 0    # all direct children
            done = True  # stays True while every child is complete/aborted
            for item in node.nodes:
                count += 1
                status = "%s" % item.get_state()
                if status == "complete":
                    tot += 1
                    if tot >= outof5:
                        stop("# OK", 0, child)
                elif status != "aborted":
                    done = False
            if count < outof5:
                stop("# Impossible %d %d" % (count, outof5), 1, child)
            if done:
                stop("# KO %d %d %d" % (tot, outof5, count), 1, child)
            if not wait:
                sys.stdout.write("# still possible ")
                stop("", -1, child)
            print("# still possible ... %s %s %s" % (tot, outof5, count))
            child.report("... %d %d %d" % (tot, outof5, count))
            time.sleep(interval)
    finally:
        # stop() leaves through sys.exit(), so this runs on every exit path:
        # release the handle registered above instead of leaking it on the
        # server. Best effort only -- a server error here must not replace
        # the exit status chosen by stop().
        try:
            client.ch_drop()
        except RuntimeError:
            pass
def task_check():
    """Return an ecf Task that runs this script as the 3-out-of-5 check.

    The task is named after the script file, with the directory and
    everything from the first '.' removed. Its variables:
    * point ECF_FILES and ECF_HOME at the current directory;
    * set ECF_EXTN to ".py" and ECF_MICRO to MICRO[0];
    * set ECF_JOB_CMD to run the job with python in wait mode against
      /<suite>/fam, requiring 3 completions, with output going to the job
      output file.
    """
    from ecf import Task, Label, Variables
    cwd = os.getcwd()
    script = sys.argv[0].split('/')[-1].split('.')[0]
    task = Task(script)
    info = Label("info", "this task uses a python-script directly, no job...")
    # INTERVAL is declared on the task, but ECF_JOB_CMD does not pass it
    # with -i, so the job falls back to the script's -i default.
    settings = Variables(ECF_MICRO=MICRO[0],
                         ECF_FILES=cwd,
                         ECF_HOME=cwd,
                         ECF_EXTN=".py",
                         ECF_PASS="FREE",
                         ECF_JOB_CMD=("python $ECF_JOB$ -w"
                                      " -n 3 -p /$SUITE$/fam"
                                      " > $ECF_JOBOUT$ 2>&1"),
                         INTERVAL=30)
    return task.add(info, settings)
class Test3outof5(unittest.TestCase):
    """unittest scenario for the 3-out-of-5 check (run with -t).

    It makes no assertions of its own. It loads a small suite on the server
    and forces the states of the five tasks in family fam, so that the
    check task (built by task_check) has something to evaluate.
    """
    def test_1(self, test_ok=1):
        """Load suite test3outof5 and force the states of fam/a..e.

        a and c are forced complete and b aborted. With test_ok true (the
        default, which is what unittest uses) d is forced active and e
        complete, so three tasks end complete. Otherwise d and e are both
        forced aborted, leaving only two complete.
        """
        import ecf
        from ecf import Task, Label, Variables, Suite, Defs, Family, Trigger
        defs = Defs()
        sname = "test3outof5"
        # Task that runs this script as the check.
        cargo = task_check()
        # Suite layout:
        #   fam   - family with Defstatus("suspended") holding tasks a..e
        #   cargo - the check task, triggered by "fam != queued"
        #   user  - family triggered when the check task is complete, with
        #           a dummy task whose complete expression is "1==1"
        # NOTE: cargo.name() is evaluated before Suite.add() attaches cargo
        # to the suite; keep this nested call order intact.
        defs.add_suite(
            Suite(sname).add(
                # ecf.Defstatus("suspended"),
                Family("fam").add(
                    ecf.Defstatus("suspended"),
                    [ Task(name) for name in ("a", "b", "c", "d", "e") ]),
                cargo.add(Trigger("fam != queued")), # trigger as string
                Family("user").add(
                    # trigger with python == and obj:
                    Trigger(cargo.name() == ecf.COMPLETE),
                    Task("dummy").add(
                        ecf.Complete("1==1"))
                )
            )
        )
        # replace() loads defs at /test3outof5 and returns its client.
        client = replace("/%s" % sname, defs)
        client.begin_suite(sname)
        client.force_state("/%s/fam/a" % sname, ecflow.State.complete)
        client.force_state("/%s/fam/b" % sname, ecflow.State.aborted)
        client.force_state("/%s/fam/c" % sname, ecflow.State.complete)
        if test_ok: client.force_state("/%s/fam/d" % sname, ecflow.State.active)
        else: client.force_state("/%s/fam/d" % sname, ecflow.State.aborted)
        if test_ok: client.force_state("/%s/fam/e" % sname, ecflow.State.complete)
        else: client.force_state("/%s/fam/e" % sname, ecflow.State.aborted)
    # def test_2(self): self.test_1(0)
def replace(path, defs=None):
    """Replace the node at path on the ecFlow server using defs.

    The server address comes from ECF_HOST / ECF_PORT (defaults localhost
    and 31415). The ecf.Client that performed the replace is returned so
    the caller can keep using it.
    """
    import ecf
    client = ecf.Client(os.getenv("ECF_HOST", "localhost"),
                        os.getenv("ECF_PORT", 31415))
    # The two trailing 1s are forwarded unchanged; presumably the
    # "create parents" and "force" flags of ecflow.Client.replace --
    # TODO confirm against the ecf wrapper.
    client.replace(path, defs, 1, 1)
    return client
if __name__ == '__main__':
    # Command-line entry point; see usage() for the options.
    try:
        # Long options that take a value need a trailing "=", the getopt
        # counterpart of the ":" after i, n and p in the short list. Without
        # it, "--interval 10" yields an empty value and int('') fails.
        OPTS, ARGS = getopt.getopt(
            sys.argv[1:], "hi:n:p:rtw",
            ["help", "interval=", "number=", "path=", "replace", "test",
             "wait"])
    except getopt.GetoptError as err:
        print("#what? %s" % err)
        usage()  # exits with status 2
    INTERVAL = 30
    WAIT = False
    PATH = None
    NUM = None
    REPLACE = False
    for o, a in OPTS:
        if o in ("-h", "--help"):
            usage()
        elif o in ("-i", "--interval"):
            INTERVAL = int(a)
        elif o in ("-n", "--number"):
            NUM = int(a)
        elif o in ("-p", "--path"):
            PATH = a
        elif o in ("-r", "--replace"):
            REPLACE = True
        elif o in ("-t", "--test"):
            # unittest.main() exits by itself; sys.exit(0) is a fallback.
            unittest.main(argv=[sys.argv[0]])
            sys.exit(0)
        elif o in ("-w", "--wait"):
            WAIT = True
    if NUM and PATH:
        check3of5(NUM, PATH, WAIT, INTERVAL)
    elif PATH and REPLACE:
        # NOTE(review): no defs is passed, so replace() receives None, while
        # usage() describes -r as loading the task definition -- confirm
        # whether task_check() was meant to be loaded here.
        replace(PATH)
    else:
        usage()