Standalone client
In this example we want to monitor a particular task. If this task is aborted for any reason, we ask the
server for the job output. This could be mailed to the user.
import ecflow
import time
import sys
def monitor_critical_task(ci, path_to_task):
    """Poll the server once for the state of a single task.

    Does nothing when the server reports no changes since the last poll.
    Otherwise the local definition is synchronised and inspected; the process
    is terminated with exit status 0 when monitoring can stop: the server
    holds no suites, the task does not exist, or the task has aborted (its
    job output is fetched first).

    Args:
        ci: an ecflow.Client connected to the server.
        path_to_task: absolute node path of the task to watch.

    Raises:
        SystemExit: with code 0, in the three cases listed above.
    """
    # Query the server for any changes
    if not ci.news_local():
        return

    # Get the incremental changes, and merge with defs stored on the client
    ci.sync_local()

    # Check to see if a definition exists in the server.
    # The None test comes first: len(None) would raise TypeError.
    defs = ci.get_defs()
    if defs is None or len(defs) == 0:
        sys.exit(0)  # server has no suites

    # Find the task we are interested in
    critical_task = defs.find_abs_node(path_to_task)
    if critical_task is None:
        # No such task
        sys.exit(0)

    # Check to see if task was aborted, if it was email me the job output
    if critical_task.get_state() == ecflow.State.aborted:
        # Get the job output
        the_aborted_task_output = ci.get_file(path_to_task, 'jobout')
        # email(the_aborted_task_output)
        sys.exit(0)
try:
    # Create the client, connecting to the explicitly given host and port
    ci = ecflow.Client("localhost", "4143")

    # Continually monitor the suite
    while True:
        monitor_critical_task(ci, "/suite/critical_node")

        # Sleep for 5 minutes.
        # To avoid overloading server ensure sleep is > 60 seconds
        time.sleep(300)
except RuntimeError as e:
    # 'as' form and single-argument print() behave the same on Python 2.6+ and 3
    print(str(e))
Display ecFlow server content as a file system
Another example uses an ecFlow Python client to expose the server content as a mounted file system.
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
virtualenv venv
. ./venv/bin/activate
pip install fusepy |
The following script shows that task wrappers, manual page, job, output or even node attributes may be provided as files.
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
#!/usr/bin/env python
"""Expose the content of an ecFlow server as a read-only FUSE file system.

Suites, families and tasks appear as directories.  Next to each node the
listing shows a "<name>.<state>" entry (first three letters of the node
state) and a "<name>.att" entry with its attributes; tasks additionally get
".job", ".ecf", ".out" and ".man" entries served through Client.get_file().

Usage: ecflow_fuse.py <host> <port> <mountpoint>
"""
import os
import pprint
import sys
import time
sys.path.append("/usr/local/apps/ecflow/current/lib/python2.7/site-packages/ecflow")
from errno import ENOENT
from stat import S_IFDIR, S_IFREG, S_IFBLK
from sys import argv, exit
import ecflow
from ecflow import Client
from fuse import FUSE, Operations, LoggingMixIn, FuseOSError, fuse_get_context

# Size reported by getattr() for every virtual file.
FILESIZE = 10000
# "always": re-sync with the server on every read/readdir.
# "time": re-sync only once REFRESH_SECONDS have elapsed since the last sync.
UPDATE = "always"
REFRESH_SECONDS = 300

# Extensions that are answered with a placeholder instead of a server query.
ignore = ("bdmv", "inf", )

# File extension -> file type requested from Client.get_file().
# None marks extensions that are answered from the definition instead.
exts = {
    "ecf": "script",
    "sms": "script",
    "man": "manual",
    "out": "jobout",
    "nop": None,
    "att": None,
    "job": "job",
}

statuses = {
    -1: "unknown",
    0: "unknown",
    1: "suspended",
    2: "complete",
    3: "queued",
    4: "submitted",
    5: "active",
    6: "aborted",
    7: "shutdown",
    8: "halted",
    9: "unknown",
}

# Three-letter state extensions produced by list_dir(); "act" is included
# because list_dir() generates it for active nodes.
state3 = ("unk", "que", "sub", "com", "abo", "sus", "hal", "shu", "act", )


def _split_ext(path):
    """Split *path* at its last dot into (path, extension).

    Returns (path, "nop") when there is no dot.  Only the last dot is used so
    that a path containing several dots does not raise ValueError.
    """
    if "." in path:
        stem, ext = path.rsplit(".", 1)
        return stem, ext
    return path, "nop"


def list_dir(node):
    """Return the virtual file names listed for the children of *node*."""
    res = []
    for item in node.nodes:
        name = item.name()
        # ".complete"[:4] -> ".com", ".queued"[:4] -> ".que", ...
        state = ".%03s" % item.get_state()
        res.append(name + state[:4])
        res.append(name + ".att")
        if isinstance(item, ecflow.Task):
            res.extend([name + ".job", name + ".ecf", name + ".out", name + ".man"])
    return res


def _expression_text(keyword, expr):
    """Render every part of a trigger/complete expression, one line per part."""
    lines = []
    for part in expr.parts:
        prefix = keyword + " "
        if part.and_expr():
            prefix += "-a "
        if part.or_expr():
            prefix += "-o "
        lines.append("%s %s\n" % (prefix, part.get_expression()))
    return "".join(lines)


def list_att(node):
    """Return a pretty-printed text dump of the attributes of *node*.

    The attribute dictionary is also printed to stdout as a debugging aid.
    """
    attr = dict()

    # NOTE(review): presumably only Suite nodes provide get_clock(); other
    # node types are skipped rather than raising AttributeError - confirm.
    get_clock = getattr(node, "get_clock", None)
    if get_clock is not None:
        clock = get_clock()
        if clock:
            attr['clock'] = "%s" % clock

    attr['status'] = "%s" % node.get_state()

    defstatus = node.get_defstatus()
    if defstatus != ecflow.DState.queued:
        attr['defstatus'] = "%s" % defstatus

    autocancel = node.get_autocancel()
    if autocancel:
        attr['autocancel'] = "%s" % autocancel

    repeat = node.get_repeat()
    if not repeat.empty():
        attr['repeat'] = "%s # value:%s" % (repeat, repeat.value())

    late = node.get_late()
    if late:
        attr['late'] = "%s" % late

    # Keep every part of a multi-part expression, not only the last one.
    complete_expr = node.get_complete()
    if complete_expr:
        attr['complete'] = _expression_text("complete", complete_expr)

    trigger_expr = node.get_trigger()
    if trigger_expr:
        attr['trigger'] = _expression_text("trigger", trigger_expr)

    attr['edit'] = [(item.name(), item.value()) for item in node.variables]

    addit(node.meters, attr, 'meter')
    addit(node.events, attr, 'event')
    addit(node.labels, attr, 'label')
    addit(node.limits, attr, 'limit')
    addit(node.inlimits, attr, 'inlimits')
    addit(node.times, attr, 'time')
    addit(node.todays, attr, 'today')
    addit(node.dates, attr, 'date')
    addit(node.days, attr, "day")
    addit(node.crons, attr, "cron")

    pprint.PrettyPrinter(indent=4).pprint(attr)
    return pprint.pformat(attr)


def addit(array, cont, name):
    """Store the string form of each item of *array* under cont[name].

    Nothing is stored when *array* yields no items.
    """
    rc = ["%s" % item for item in array]
    if rc:
        cont[name] = rc


class FuseEcflow(LoggingMixIn, Operations):
    '''
    A simple Ecflow python client example

    Every modifying operation is refused with ENOENT; only getattr, read and
    readdir are answered from the server.
    '''

    def __init__(self, host="localhost", port=31415, path='.'):
        """Connect to the server at host:port and fetch the full definition."""
        self.client = Client(host, port)
        self.client.sync_local()
        # Time of the last sync, in seconds since the epoch.
        self.update = time.time()
        self.defs = self.client.get_defs()
        self.root = path
        # Tuple formatting so that an integer port does not raise TypeError.
        print("#MSG: connected to %s %s" % (host, port))

    def chmod(self, path, mode):
        raise FuseOSError(ENOENT)

    def chown(self, path, uid, gid):
        raise FuseOSError(ENOENT)

    def create(self, path, mode):
        raise FuseOSError(ENOENT)

    def destroy(self, path):
        pass

    def getattr(self, path, fh=None):
        """Return stat fields: any path containing a dot is a regular file,
        every other path is a directory."""
        cur = os.lstat(".")
        st = dict((key, getattr(cur, key)) for key in (
            'st_atime', 'st_gid', 'st_mode', 'st_mtime', 'st_size', 'st_uid'))
        if path == '/':
            st['st_mode'] = (S_IFDIR | 0o555)
            st['st_nlink'] = 2
        elif '.' in path:
            st['st_mode'] = (S_IFREG | 0o444)
            st['st_size'] = FILESIZE
        else:
            st['st_mode'] = (S_IFDIR | 0o444)
            st['st_size'] = 1
        st['st_ctime'] = st['st_mtime'] = st['st_atime'] = time.time()
        return st

    def mkdir(self, path, mode):
        raise FuseOSError(ENOENT)

    def refresh(self):
        """Re-sync the local definition with the server when it is due."""
        # Elapsed seconds are compared instead of HHMM integers, which
        # misbehave across hour and midnight boundaries.
        now = time.time()
        if UPDATE == "always" or now - self.update > REFRESH_SECONDS:
            self.client.sync_local()
            self.update = now

    def read(self, path, size, offset, fh):
        """Return the content of the virtual file at *path*.

        NOTE(review): *offset* is ignored, as in the original example; the
        content is always served from its start.
        NOTE(review): a str is returned; fusepy under Python 3 presumably
        expects bytes - confirm before porting.
        """
        path, ext = _split_ext(path)
        self.refresh()
        print("read %s %s %s %s %s" % (path, size, offset, fh, ext))

        if ext in state3 or ext in ignore:
            return "-empty-"
        if ext not in exts:
            print("what? %s" % ext)
            return "-empty-"

        if ext not in ("nop", "att"):
            res = "%s" % self.client.get_file(str(path), exts[ext])
            if len(res) > size:
                # Leave room for the two markers; never use a negative slice.
                return "#TRUNCATED\n" + res[:max(size - 30, 0)] + "\n#TRUNCATED\n"
            return res

        # Use the client's current definition rather than the copy taken in
        # __init__, so the answer reflects the latest sync.
        defs = self.client.get_defs()
        node = defs.find_abs_node(str(path))
        if node is not None:
            print("%s %s" % (node.name(), node.get_abs_node_path()))
        if path == '/':
            return "".join("%s " % s.name() for s in defs.suites)
        if node is None:
            return "-empty-"
        if ext == "att":
            return list_att(node)
        return "".join("%s\n" % s.name() for s in node.nodes)

    def readdir(self, path='/', fh=None):
        """Return the directory entries for *path*."""
        path, ext = _split_ext(path)
        if ext not in exts:
            print("readdir: what? %s" % ext)
            return ["/"]
        print("readdir %s %s %s %s" % (path, fh, path, ext))
        self.refresh()

        if ext != "nop":
            return [".", "..", "ok", path.replace("/", "_"), ext]

        defs = self.client.get_defs()
        if path == '/':
            return ['.', '..'] + ["%s" % s.name() for s in defs.suites]
        node = defs.find_abs_node(str(path))
        if node is None:
            return ["-empty-"]
        return ['.', '..'] + ["%s" % n.name() for n in node.nodes] + list_dir(node)

    def readlink(self, path):
        raise FuseOSError(ENOENT)

    def rename(self, old, new):
        raise FuseOSError(ENOENT)

    def rmdir(self, path):
        raise FuseOSError(ENOENT)

    def symlink(self, target, source):
        raise FuseOSError(ENOENT)

    def truncate(self, path, length, fh=None):
        raise FuseOSError(ENOENT)

    def unlink(self, path):
        raise FuseOSError(ENOENT)

    def utimens(self, path, times=None):
        raise FuseOSError(ENOENT)

    def write(self, path, data, offset, fh):
        raise FuseOSError(ENOENT)


if __name__ == '__main__':
    if len(argv) != 4:
        print('usage: %s <host> <port> <mountpoint>' % argv[0])
        exit(1)
    fuse = FUSE(FuseEcflow(argv[1], argv[2]), argv[3], foreground=True, nothreads=True)
The client is then started as follows:
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
mkdir -p mnt; python ./ecflow_fuse.py $ECF_HOST $ECF_PORT mnt; |
In this example, each node status is encoded as a three-character extension (.com for complete, .que for queued, .abo for aborted ...). Note that the suspended status is not displayed.
Standard command line utilities (midnight commander, find, ls, cat, ...) can then be used:
Code Block | ||||
---|---|---|---|---|
| ||||
# server eod3 was mounted
ls eod3/emc_41r2/thu
# 01 01.att 01.que hind hind.att hind.que verify verify.att verify.com
LS_COLORS+="*.abo=41:*.com=43:*.sus=01;45:*.que=44:*.sub=01;46:*.unk=47:*.act=01;42:"
ls eod3/emc_41r2/thu
cat eod3/emc_41r2/main/18bc/sv/getini.ecf
cat eod3/emc_41r2/main/18bc/sv/getini.man
cat eod3/emc_41r2/main/18bc/sv/getini.out
kdirstat eod3/emc_41r2/main/
find eod3 -name "*.abo"
mc eod3 |
Center |
---|