Standalone client
In this example we want to monitor a particular task. If this task is aborted for any reason, we ask the
server for the job output. This could be mailed to the user.
#!/usr/bin/env python2.7
import ecflow
import time
import sys
def monitor_critical_task(ci, path_to_task):
    """Check one task on the server and fetch its job output if it aborted.

    Args:
        ci: the ecflow client used to talk to the server.
        path_to_task: absolute path of the task to watch,
            e.g. "/suite/critical_node".

    Returns:
        None when the server reports no changes, or when the task exists
        and is not aborted.

    Exits the process with status 0 (via sys.exit) when the server holds no
    definition, when the task cannot be found, or after the job output of an
    aborted task has been retrieved.
    """
    # Query the server for any changes; nothing to do when there are none.
    if not ci.news_local():
        return

    # Get the incremental changes and merge them with the defs stored on
    # the client.
    ci.sync_local()

    # Check that a definition with at least one suite exists in the server.
    defs = ci.get_defs()
    if defs is None or len(defs) == 0:
        sys.exit(0)  # server has no suites

    # Find the task we are interested in.
    critical_task = defs.find_abs_node(path_to_task)
    if critical_task is None:
        sys.exit(0)  # no such task

    # If the task was aborted, get its job output so that it can be mailed.
    if critical_task.get_state() == ecflow.State.aborted:
        the_aborted_task_output = ci.get_file(path_to_task, 'jobout')
        # email(the_aborted_task_output)
        sys.exit(0)
try:
    # Create the client, connecting to an explicitly given host and port.
    ci = ecflow.Client("localhost", "4143")

    # Continually monitor the suite.
    while True:
        monitor_critical_task(ci, "/suite/critical_node")

        # Sleep for 5 minutes.
        # To avoid overloading the server, ensure the sleep is > 60 seconds.
        time.sleep(300)

except RuntimeError as e:
    # Report the failure text and let the script end.
    print(str(e))
Display
...
ecFlow server content as a file system
Another example uses an ecFlow Python client to make the server content visible as a mounted file system.
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
#!/usr/bin/env python
"""Expose the content of an ecFlow server as a read-only FUSE file system.

Usage: <this script> <host> <port> <mountpoint>

Every node appears as a directory.  Next to each child node the directory
listing also contains generated entries: "<name>.<sta>" (the first three
letters of the node state), "<name>.att" (the node attributes) and, for
tasks, "<name>.job", "<name>.ecf", "<name>.out" and "<name>.man".
"""
import os
import pprint
import sys
import time

sys.path.append("/usr/local/apps/ecflow/current/lib/python2.7/site-packages/ecflow")

from errno import ENOENT
from stat import S_IFDIR, S_IFREG, S_IFBLK
from sys import argv, exit

import ecflow
from ecflow import Client
from fuse import FUSE, Operations, LoggingMixIn, FuseOSError, fuse_get_context

# Size reported by getattr() for every generated file.
FILESIZE = 10000

# Refresh policy: "always" syncs with the server on every read/readdir;
# any other value (e.g. "time") syncs at most once per REFRESH_SECONDS.
UPDATE = "always"
REFRESH_SECONDS = 5 * 60

# Extensions answered with "-empty-" without contacting the server.
ignore = ("bdmv", "inf", )

# File extension -> file type requested from the server (None: not a server file).
exts = {
    "ecf": "script",
    "sms": "script",
    "man": "manual",
    "out": "jobout",
    "nop": None,
    "att": None,
    "job": "job",
}

statuses = {
    -1: "unknown",
    0: "unknown",
    1: "suspended",
    2: "complete",
    3: "queued",
    4: "submitted",
    5: "active",
    6: "aborted",
    7: "shutdown",
    8: "halted",
    9: "unknown",
}

# Three-letter state extensions as produced by list_dir(); "act" is included
# so that reading a "<name>.act" entry is not reported as an unknown extension.
state3 = ("unk", "que", "sub", "com", "abo", "sus", "hal", "shu", "act", )


def _split_ext(path):
    """Split "<node path>.<ext>" at the last dot.

    Returns (path, ext); ext is "nop" when path contains no dot.
    """
    if "." in path:
        base, ext = path.rsplit(".", 1)
        return base, ext
    return path, "nop"


def list_dir(node):
    """Return the generated file names for every child of node."""
    res = []
    for item in node.nodes:
        name = item.name()
        state = ".%03s" % item.get_state()
        res.append(name + state[:4])  # "." plus the first three letters of the state
        res.append(name + ".att")
        if isinstance(item, ecflow.Task):
            res.extend([name + ".job", name + ".ecf", name + ".out", name + ".man"])
    return res


def _expression_text(keyword, expr):
    """Render every part of a trigger/complete expression, one line per part."""
    lines = []
    for part_expr in expr.parts:
        prefix = keyword + " "
        if part_expr.and_expr():
            prefix = prefix + "-a "
        if part_expr.or_expr():
            prefix = prefix + "-o "
        lines.append("%s %s\n" % (prefix, part_expr.get_expression()))
    return "".join(lines)


def list_att(node):
    """Return a pretty-printed text describing the attributes of node.

    The attribute dictionary is also printed on standard output.
    """
    attr = dict()

    # Only suites carry a clock.
    if isinstance(node, ecflow.Suite):
        clock = node.get_clock()
        if clock:
            attr['clock'] = "%s" % clock

    attr['status'] = "%s" % node.get_state()

    defstatus = node.get_defstatus()
    if defstatus != ecflow.DState.queued:
        attr['defstatus'] = "%s" % defstatus

    autocancel = node.get_autocancel()
    if autocancel:
        attr['autocancel'] = "%s" % autocancel

    repeat = node.get_repeat()
    if not repeat.empty():
        attr['repeat'] = "%s # value:%s" % (repeat, repeat.value())

    late = node.get_late()
    if late:
        attr['late'] = "%s" % late

    complete_expr = node.get_complete()
    if complete_expr:
        attr['complete'] = _expression_text("complete", complete_expr)

    trigger_expr = node.get_trigger()
    if trigger_expr:
        attr['trigger'] = _expression_text("trigger", trigger_expr)

    attr['edit'] = [(item.name(), item.value()) for item in node.variables]

    addit(node.meters, attr, 'meter')
    addit(node.events, attr, 'event')
    addit(node.labels, attr, 'label')
    addit(node.limits, attr, 'limit')
    addit(node.inlimits, attr, 'inlimits')
    addit(node.times, attr, 'time')
    addit(node.todays, attr, 'today')
    addit(node.dates, attr, 'date')
    addit(node.days, attr, "day")
    addit(node.crons, attr, "cron")

    pprint.PrettyPrinter(indent=4).pprint(attr)
    return pprint.pformat(attr)


def addit(array, cont, name):
    """Store the string form of every item of array under cont[name].

    The key is always set, to an empty list when array has no items.
    """
    cont[name] = ["%s" % item for item in array]


class FuseEcflow(LoggingMixIn, Operations):
    ''' A simple Ecflow python client example '''

    def __init__(self, host="localhost", port=31415, path='.'):
        self.client = Client(host, port)
        self.client.sync_local()
        self.update = time.time()  # time of the last sync, in seconds
        self.defs = self.client.get_defs()
        self.root = path
        print("#MSG: connected to %s %s" % (host, port))

    def chmod(self, path, mode):
        raise FuseOSError(ENOENT)

    def chown(self, path, uid, gid):
        raise FuseOSError(ENOENT)

    def create(self, path, mode):
        raise FuseOSError(ENOENT)

    def destroy(self, path):
        pass

    def getattr(self, path, fh=None):
        """Return the stat dictionary for path.

        The root and any path without a dot are reported as directories,
        any path containing a dot as a regular file of FILESIZE bytes.
        """
        cur = os.lstat(".")
        st = dict((key, getattr(cur, key)) for key in (
            'st_atime', 'st_gid', 'st_mode', 'st_mtime', 'st_size', 'st_uid'))

        if path == '/':
            st['st_mode'] = (S_IFDIR | 0o555)
            st['st_nlink'] = 2
        elif '.' in path:
            st['st_mode'] = (S_IFREG | 0o444)
            st['st_size'] = FILESIZE
        else:
            st['st_mode'] = (S_IFDIR | 0o444)
            st['st_size'] = 1

        st['st_ctime'] = st['st_mtime'] = st['st_atime'] = time.time()
        return st

    def mkdir(self, path, mode):
        raise FuseOSError(ENOENT)

    def refresh(self):
        """Sync with the server, on every call or once the refresh delay has elapsed."""
        now = time.time()
        if UPDATE == "always" or now - self.update > REFRESH_SECONDS:
            self.client.sync_local()
            self.update = now

    def read(self, path, size, offset, fh):
        """Return the text content of a generated file.

        NOTE(review): offset is not applied to the returned text.
        """
        path, ext = _split_ext(path)
        self.refresh()
        print("read %s %s %s %s %s" % (path, size, offset, fh, ext))

        if ext in state3 or ext in ignore:
            return "-empty-"
        if ext not in exts:
            print("what? %s" % ext)
            return "-empty-"

        if ext not in ("nop", "att"):
            # Script, manual, job or job output: ask the server for the file.
            res = "%s" % self.client.get_file(str(path), exts[ext])
            if len(res) > size:
                return "#TRUNCATED\n" + res[:size-30] + "\n#TRUNCATED\n"
            return res

        defs = self.client.get_defs()
        node = defs.find_abs_node(str(path))
        if node:
            print("%s %s" % (node.name(), node.get_abs_node_path()))

        if path == '/':
            return "".join("%s " % s.name() for s in defs.suites)
        if node is None:
            return "-empty-"
        if ext == "att":
            return list_att(node)
        return "".join("%s\n" % s.name() for s in node.nodes)

    def readdir(self, path='/', fh=None):
        """Return the directory entries for path."""
        path, ext = _split_ext(path)
        if ext not in exts:
            print("readdir: what? %s" % ext)
            return ["/"]

        print("readdir %s %s %s %s" % (path, fh, path, ext))
        self.refresh()

        if ext != "nop":
            return [".", "..", "ok", path.replace("/", "_"), ext]

        defs = self.client.get_defs()
        if path == '/':
            return ['.', '..'] + ["%s" % s.name() for s in defs.suites]

        node = defs.find_abs_node(str(path))
        if node is None:
            return ["-empty-"]
        return ['.', '..'] + ["%s" % n.name() for n in node.nodes] + list_dir(node)

    def readlink(self, path):
        raise FuseOSError(ENOENT)

    def rename(self, old, new):
        raise FuseOSError(ENOENT)

    def rmdir(self, path):
        raise FuseOSError(ENOENT)

    def symlink(self, target, source):
        raise FuseOSError(ENOENT)

    def truncate(self, path, length, fh=None):
        raise FuseOSError(ENOENT)

    def unlink(self, path):
        raise FuseOSError(ENOENT)

    def utimens(self, path, times=None):
        raise FuseOSError(ENOENT)

    def write(self, path, data, offset, fh):
        raise FuseOSError(ENOENT)


if __name__ == '__main__':
    if len(argv) != 4:
        print('usage: %s <host> <port> <mountpoint>' % argv[0])
        exit(1)

    fuse = FUSE(FuseEcflow(argv[1], argv[2]), argv[3],
                foreground=True, nothreads=True)
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
# The script expects exactly three arguments: <host> <port> <mountpoint>
# (ECF_PORT is for example 31415).
mkdir -p mnt
python ./pyecflow_fuse_ecflow.py $ECF_NODEHOST $ECF_PORT mnt
In this example, node attributes are visible as a .att file; each node's status is encoded as a three-character extension (.com for complete, .que for queued, .abo for aborted ...). Note that the suspended status is not displayed.
Standard command-line utilities (midnight commander, find, ls, cat, ...) can then be used:
Code Block | ||||
---|---|---|---|---|
| ||||
# server eod3 was mounted
ls eod3/emc_41r2/thu
# 01 01.att 01.que hind hind.att hind.que verify verify.att verify.com

# colour the entries according to their state extension
LS_COLORS+="*.abo=41:*.com=43:*.sus=01;45:*.que=44:*.sub=01;46:*.unk=47:*.act=01;42:"
ls eod3/emc_41r2/thu

cat eod3/emc_41r2/main/18bc/sv/getini.ecf
cat eod3/emc_41r2/main/18bc/sv/getini.man
cat eod3/emc_41r2/main/18bc/sv/getini.out

kdirstat eod3/emc_41r2/main/
find eod3 -name "*.abo"
mc eod3
Center |
---|