Standalone client
In this example we want to monitor a particular task.
server for the job output. This could be mailed to the user.
#!/usr/bin/env python2.7
import ecflow
import time
import sys
def monitor_critical_task(ci, path_to_task):
# Query the server for any changes
if ci.news_local():
# get the incremental changes, and merge with defs stored on the Client
ci.sync_local()
# check to see if definition exists in the server
defs = ci.get_defs()
if len(defs) == None0 :
sys.exit(0) # return server has no suites
# find the task we are interested in
critical_task = defs.find_abs_node(path_to_task)
if critical_task ==is None:
# No such task
sys.exit(0) # return
# Check to see if task was aborted, if it was email me the job output
if critical_task.get_state() == ecflow.State.aborted:
# Get the job output
the_aborted_task_output = ci.get_file(path_to_task,'jobout')
# email(the_aborted_task_output)
sys.exit(0)
try:
# Create the client. This will read the default environment variables
ci = ecflow.Client("localhost", "4143")
# Continually monitor the suite
while 1:
monitor_critical_task(ci, "/suite/critical_node")
# Sleep for 5 minutes.
# To avoid overloading server ensure sleep is > 60 seconds
time.sleep(300)
except RuntimeError, e:
print str(e)
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
#!/usr/bin/env python import os import sys import time sys.path.append("/usr/local/apps/ecflow/current/lib/python2.7/site-packages/ecflow") from errno import ENOENT from stat import S_IFDIR, S_IFREG, S_IFBLK from sys import argv, exit import ecflow from ecflow import Client from fuse import FUSE, Operations, LoggingMixIn, FuseOSError, fuse_get_context FILESIZE = 10000 UPDATE = "time" UPDATE = "always" ignore = ("bdmv", "inf", ) exts = { "ecf": "script", "sms": "script", "man": "manual", "out": "jobout", "nop": None, "att": None, "job": "job" } statuses = { -1: "unknown", 0: "unknown", 1: "suspended", 2: "complete", 3: "queued", 4: "submitted", 5: "active", 6: "aborted", 7: "shutdown", 8: "halted", 9: "unknown", } state3 = ("unk", "que", "sub", "com", "abo", "sus", "hal", "shu", ) def list_dir(node): res = [] for item in node.nodes: name = item.name() state = ".%03s" % item.get_state() res.extend([ name + state[:4]]) res.extend([ name + ".att"]) if isinstance(item, ecflow.Task): res.extend([ name + ".job", name + ".ecf", name + ".out", name + ".man",]) return res def list_att(node): attr = dict() try: clock = suite.get_clock() attr = dict() if clock: attr['clock']= "%s" % clock except: pass attr['status'] = "%s" % node.get_state() defstatus = node.get_defstatus() if defstatus != ecflow.DState.queued: attr['defstatus'] = "%s" % defstatus autocancel = node.get_autocancel() if autocancel: attr['autocancel']= "%s" % autocancel repeat = node.get_repeat() if not repeat.empty(): attr['repeat']= "%s # value:%s" % (repeat, repeat.value()) late = node.get_late() if late: attr['late']= "%s" % late complete_expr = node.get_complete() if complete_expr: for part_expr in complete_expr.parts: trig = "complete " if part_expr.and_expr(): trig = trig + "-a " if part_expr.or_expr(): trig = trig + "-o " attr['complete']= "%s" % trig + " %s" % \ part_expr.get_expression() + "\n" trigger_expr = node.get_trigger() if trigger_expr: for part_expr in trigger_expr.parts: trig = "trigger " if part_expr.and_expr(): trig = trig + "-a " if part_expr.or_expr(): trig = trig + "-o " attr['trigger'] = "%s" % trig + " %s" % \ part_expr.get_expression() + "\n" attr['edit'] = [ (item.name(), item.value()) for item in node.variables ] addit(node.meters, attr, 'meter') addit(node.events, attr, 'event') addit(node.labels, attr, 'label') addit(node.limits, attr, 'limit') addit(node.inlimits, attr, 'inlimits') addit(node.times, attr, 'time') addit(node.todays, attr, 'today') addit(node.dates, attr, 'date') addit(node.days, attr, "day") addit(node.crons, attr, "cron") import pprint pp = pprint.PrettyPrinter(indent=4) pp.pprint(attr) return pprint.pformat(attr) def addit(array, cont, name): rc = [] for item in array: rc.append("%s" % item) if rc is not None: cont[name] = rc class FuseEcflow(LoggingMixIn, Operations): ''' A simple Ecflow python client example ''' def __init__(self, host="localhost", port=31415, path='.'): self.client = Client(host, port) self.client.sync_local() self.update = int(time.strftime("%H%M")) self.defs = self.client.get_defs() self.root = path print "#MSG: connected to %s " % host + port if 0: for s in self.defs.suites: print s.name(), print def chmod(self, path, mode): raise FuseOSError(ENOENT) def chown(self, path, uid, gid): raise FuseOSError(ENOENT) def create(self, path, mode): raise FuseOSError(ENOENT) def destroy(self, path): pass def getattr(self, path, fh=None): uid, gid, pid = fuse_get_context() cur = os.lstat(".") st = dict((key, getattr(cur, key)) for key in ( 'st_atime', 'st_gid', 'st_mode', 'st_mtime', 'st_size', 'st_uid')) if path == '/': st['st_mode'] = (S_IFDIR | 0555) st['st_nlink']=2 elif '.' in path: st['st_mode'] = (S_IFREG | 0444) st['st_size'] = FILESIZE elif 1: st['st_mode'] = (S_IFDIR | 0444) st['st_size'] = 1 else: raise FuseOSError(ENOENT) st['st_ctime'] = st['st_mtime'] = st['st_atime'] = time.time() return st def mkdir(self, path, mode): raise FuseOSError(ENOENT) def refresh(self): curr = int(time.strftime("%H%M")) if curr - self.update > 5 or UPDATE == "always": self.client.sync_local() self.update = curr def read(self, path, size, offset, fh): ext = "nop" if "." in path: path, ext = path.split(".") self.refresh() print "read", path, size, offset, fh, ext if ext in state3: return "-empty-" elif ext in ignore: return "-empty-" elif not ext in exts.keys(): print "what?", ext; return "-empty-" elif ext not in ("nop", "att"): res = "%s" % self.client.get_file(str(path), exts[ext]) if len(res) > size: return "#TRUNCATED\n" + res[:size-30] + "\n#TRUNCATED\n" return res else: node = self.client.get_defs().find_abs_node(str(path)) res = "" if node: print node.name(), node.get_abs_node_path() if path == '/': for s in self.defs.suites: res += "%s " % s.name() return res elif node is None: return "-empty-" elif ext == "att": return list_att(node) elif 1: for s in node.nodes: res += "%s\n" % s.name() return res else: raise FuseOSError(ENOENT) f = open(path) f.seek(offset, 0) buf = f.read(size) f.close() return buf def readdir(self, path='/', fh=None): ext = "nop" if "." in path: path, ext = path.split(".") if not ext in exts.keys(): print "readdir: what?", ext; return [ "/" ] print "readdir", path, fh, path, ext self.refresh() if ext != "nop": return [ ".", "..", "ok", path.replace("/", "_"), ext ] else: node = self.client.get_defs().find_abs_node(str(path)) res = [] if path == '/': node = self.client.get_defs() return ['.', '..'] + [ "%s" % s.name() for s in node.suites ] elif node is None: return ["-empty-" ] else: res = ['.', '..'] + ["%s" % n.name() for n in node.nodes ] res += list_dir(node) return res def readlink(self, path): raise FuseOSError(ENOENT) def rename(self, old, new): raise FuseOSError(ENOENT) def rmdir(self, path): raise FuseOSError(ENOENT) def symlink(self, target, source): raise FuseOSError(ENOENT) def truncate(self, path, length, fh=None): raise FuseOSError(ENOENT) def unlink(self, path): raise FuseOSError(ENOENT) def utimens(self, path, times=None): raise FuseOSError(ENOENT) def write(self, path, data, offset, fh): raise FuseOSError(ENOENT) if __name__ == '__main__': if len(argv) != 4: print('usage: %s <host> <port> <mountpoint>' % argv[0]) exit(1) fuse = FUSE(FuseEcflow(argv[1], argv[2]), argv[3], foreground=True, nothreads=True) |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
mkdir -p mnt; python ./pyecflow_fuse_ecflow.py $ECF_HOST $ECF_PORT mnt; |
...