#!/usr/bin/env python
import os
import sys
import time
sys.path.append("/usr/local/apps/ecflow/current/lib/python2.7/site-packages/ecflow")
from errno import ENOENT
from stat import S_IFDIR, S_IFREG, S_IFBLK
from sys import argv, exit
import ecflow
from ecflow import Client
from fuse import FUSE, Operations, LoggingMixIn, FuseOSError, fuse_get_context
# Size, in bytes, that getattr() reports for every regular (pseudo-)file.
FILESIZE = 10000

# Refresh policy read by FuseEcflow.refresh(). With "always" the client
# re-syncs with the server on every call; any other value (e.g. "time")
# makes refresh() re-sync only once its time threshold has elapsed.
UPDATE = "always"

# File extensions that read() answers with the "-empty-" placeholder.
ignore = ("bdmv", "inf")

# Pseudo-file extension -> file kind passed to Client.get_file().
# A value of None marks extensions that are not fetched through get_file()
# ("nop" is the marker for "no extension", "att" is the attribute dump).
exts = dict(
    ecf="script",
    sms="script",
    man="manual",
    out="jobout",
    nop=None,
    att=None,
    job="job",
)

# Numeric status code -> status name.
# NOTE(review): not referenced in the visible part of this file; presumably
# these are the server's integer status codes -- confirm before relying on it.
statuses = {
    -1: "unknown",
    0: "unknown",
    1: "suspended",
    2: "complete",
    3: "queued",
    4: "submitted",
    5: "active",
    6: "aborted",
    7: "shutdown",
    8: "halted",
    9: "unknown",
}

# Three-letter status suffixes; read() answers them with "-empty-".
state3 = (
    "unk",
    "que",
    "sub",
    "com",
    "abo",
    "sus",
    "hal",
    "shu",
)
def list_dir(node):
    """Return the pseudo-file names to list for the children of *node*.

    For every child in ``node.nodes`` the result contains
    ``<name>.<first three letters of the child's state>`` and ``<name>.att``.
    Children that are ``ecflow.Task`` instances additionally get
    ``<name>.job``, ``<name>.ecf``, ``<name>.out`` and ``<name>.man``.

    The names are returned as a flat list, grouped per child in iteration
    order.
    """
    task_suffixes = (".job", ".ecf", ".out", ".man")
    entries = []
    for child in node.nodes:
        base = child.name()
        # "." plus the state text, cut to four characters: ".queued" -> ".que".
        status_suffix = (".%03s" % child.get_state())[:4]
        entries.append(base + status_suffix)
        entries.append(base + ".att")
        if isinstance(child, ecflow.Task):
            entries.extend(base + suffix for suffix in task_suffixes)
    return entries
def _format_expression(keyword, expression):
    """Render every part of a trigger/complete expression as text.

    Each part becomes one line: ``keyword``, a space, ``-a `` and/or ``-o ``
    when the part reports ``and_expr()`` / ``or_expr()``, then the part's
    expression text and a newline. The lines are concatenated in order.
    Returns an empty string when the expression has no parts.
    """
    lines = []
    for part in expression.parts:
        prefix = keyword + " "
        if part.and_expr():
            prefix += "-a "
        if part.or_expr():
            prefix += "-o "
        lines.append(prefix + " %s" % part.get_expression() + "\n")
    return "".join(lines)


def list_att(node):
    """Return a pretty-printed dump of the attributes of *node*.

    The attributes are collected into a dict (status, defstatus when it is
    not ``queued``, autocancel, repeat, late, complete and trigger
    expressions, variables under ``'edit'``, and the meter/event/label/
    limit/inlimits/time/today/date/day/cron lists) and formatted with
    ``pprint.pformat``. The dict is also pretty-printed to stdout as a
    debugging aid.

    Returns:
        str: the ``pprint.pformat`` rendering of the attribute dict.
    """
    import pprint

    attr = dict()

    # The previous code read the clock from an undefined global ``suite``;
    # the resulting NameError was swallowed by a bare ``except`` so the clock
    # was never reported. Ask the node itself instead.
    # NOTE(review): presumably only suite nodes provide get_clock(); nodes
    # without it (or failing the call) are simply reported without a clock.
    try:
        clock = node.get_clock()
    except (AttributeError, RuntimeError):
        clock = None
    if clock:
        attr['clock'] = "%s" % clock

    attr['status'] = "%s" % node.get_state()

    defstatus = node.get_defstatus()
    if defstatus != ecflow.DState.queued:
        attr['defstatus'] = "%s" % defstatus

    autocancel = node.get_autocancel()
    if autocancel:
        attr['autocancel'] = "%s" % autocancel

    repeat = node.get_repeat()
    if not repeat.empty():
        attr['repeat'] = "%s # value:%s" % (repeat, repeat.value())

    late = node.get_late()
    if late:
        attr['late'] = "%s" % late

    # Keep every part of a multi-part expression; previously each part
    # overwrote the one before it, so only the last part was shown.
    complete_expr = node.get_complete()
    if complete_expr:
        text = _format_expression("complete", complete_expr)
        if text:
            attr['complete'] = text

    trigger_expr = node.get_trigger()
    if trigger_expr:
        text = _format_expression("trigger", trigger_expr)
        if text:
            attr['trigger'] = text

    attr['edit'] = [(item.name(), item.value()) for item in node.variables]

    addit(node.meters, attr, 'meter')
    addit(node.events, attr, 'event')
    addit(node.labels, attr, 'label')
    addit(node.limits, attr, 'limit')
    addit(node.inlimits, attr, 'inlimits')
    addit(node.times, attr, 'time')
    addit(node.todays, attr, 'today')
    addit(node.dates, attr, 'date')
    addit(node.days, attr, "day")
    addit(node.crons, attr, "cron")

    # Debug trace on stdout (indent 4); the returned text uses pformat's
    # default layout, exactly as before.
    pprint.PrettyPrinter(indent=4).pprint(attr)
    return pprint.pformat(attr)
def addit(array, cont, name):
    """Store the string form of every item of *array* under ``cont[name]``.

    Args:
        array: iterable of items; each one is formatted with ``"%s"``.
        cont: dict updated in place.
        name: key under which the list of strings is stored.

    The key is always written, so an empty *array* stores an empty list.
    Returns None.
    """
    cont[name] = ["%s" % element for element in array]
class FuseEcflow(LoggingMixIn, Operations):
    '''
    Read-only FUSE view of the node tree served by an ecFlow server.

    Paths without a dot are treated as nodes and shown as directories.
    Paths containing a dot are treated as "<node path>.<extension>"
    pseudo-files: extensions listed in ``exts`` with a non-None value are
    fetched from the server with Client.get_file(), "att" yields the
    attribute dump produced by list_att(), and status/ignored/unknown
    extensions yield the "-empty-" placeholder.

    Because a dot anywhere in the path marks a pseudo-file, a node whose
    own name contains a dot cannot be browsed as a directory.

    Every modifying operation is refused with ENOENT.
    '''

    # Seconds between two syncs with the server when UPDATE is not "always".
    REFRESH_SECONDS = 5 * 60

    def __init__(self, host="localhost", port=31415, path='.'):
        """Connect to the ecFlow server at host/port and load its definition.

        ``path`` is only stored as ``self.root``.
        """
        self.client = Client(host, port)
        self.client.sync_local()
        # Time of the last sync, in seconds since the epoch (see refresh()).
        self.update = time.time()
        self.defs = self.client.get_defs()
        self.root = path
        # Format both values together: "..." % host + port raised TypeError
        # whenever port was an int, e.g. the default value.
        print("#MSG: connected to %s %s" % (host, port))

    def chmod(self, path, mode):
        raise FuseOSError(ENOENT)

    def chown(self, path, uid, gid):
        raise FuseOSError(ENOENT)

    def create(self, path, mode):
        raise FuseOSError(ENOENT)

    def destroy(self, path):
        pass

    def getattr(self, path, fh=None):
        """Return the stat dict for *path*.

        Owner and group are copied from the current working directory.
        "/" and every path without a dot are directories; every path
        containing a dot is a read-only regular file of FILESIZE bytes.
        All three timestamps are set to the current time.
        """
        cur = os.lstat(".")
        st = dict((key, getattr(cur, key)) for key in (
            'st_atime', 'st_gid',
            'st_mode', 'st_mtime', 'st_size', 'st_uid'))
        if path == '/':
            st['st_mode'] = (S_IFDIR | 0o555)
            st['st_nlink'] = 2
        elif '.' in path:
            st['st_mode'] = (S_IFREG | 0o444)
            st['st_size'] = FILESIZE
        else:
            # Same mode as the root directory: the search (x) bits are
            # needed for a directory to be traversable under permission
            # checks; these entries used to be 0444.
            st['st_mode'] = (S_IFDIR | 0o555)
            st['st_size'] = 1
        st['st_ctime'] = st['st_mtime'] = st['st_atime'] = time.time()
        return st

    def mkdir(self, path, mode):
        raise FuseOSError(ENOENT)

    def refresh(self):
        """Re-sync the local definition with the server when it is due.

        With UPDATE == "always" every call syncs; otherwise a sync happens
        once more than REFRESH_SECONDS have passed since the last one.
        """
        # Compare epoch seconds: the former HHMM integers were not a minute
        # difference and went negative after midnight, which stopped all
        # further refreshes.
        now = time.time()
        if UPDATE == "always" or now - self.update > self.REFRESH_SECONDS:
            self.client.sync_local()
            self.update = now

    def _content(self, path, ext):
        """Return the complete text behind pseudo-file ``path`` + ``ext``.

        ``path`` is the node path with the extension already removed and
        ``ext`` the extension ("nop" when the path had none).
        """
        if ext in state3 or ext in ignore:
            return "-empty-"
        if ext not in exts:
            print("what? %s" % ext)
            return "-empty-"
        if ext not in ("nop", "att"):
            return "%s" % self.client.get_file(str(path), exts[ext])

        # Always ask the client for the current definition, as readdir()
        # does; the object cached in __init__ may predate later syncs.
        defs = self.client.get_defs()
        node = defs.find_abs_node(str(path))
        if node is not None:
            print("%s %s" % (node.name(), node.get_abs_node_path()))
        if path == '/':
            return "".join("%s " % s.name() for s in defs.suites)
        if node is None:
            return "-empty-"
        if ext == "att":
            return list_att(node)
        return "".join("%s\n" % s.name() for s in node.nodes)

    def read(self, path, size, offset, fh):
        """Return up to *size* bytes of *path*, starting at *offset*.

        Content longer than FILESIZE (the size getattr() advertises) is cut
        and wrapped in "#TRUNCATED" marker lines so the cut stays visible.
        """
        ext = "nop"
        if "." in path:
            # Split on the last dot only; a plain split(".") raised
            # ValueError for paths containing more than one dot.
            path, ext = path.rsplit(".", 1)
        self.refresh()
        print("read %s %s %s %s %s" % (path, size, offset, fh, ext))
        text = self._content(path, ext)
        if len(text) > FILESIZE:
            text = "#TRUNCATED\n" + text[:FILESIZE - 30] + "\n#TRUNCATED\n"
        # Honour the requested window; returning the start of the content
        # for every offset made each later chunk repeat the beginning.
        return text[offset:offset + size]

    def readdir(self, path='/', fh=None):
        """Return the directory entries for *path*.

        The root lists the suites; a node lists its children followed by
        the pseudo-files from list_dir(). A path that does not resolve to a
        node yields ["-empty-"], an unknown extension yields ["/"].
        """
        ext = "nop"
        if "." in path:
            # Last dot only, for the same reason as in read().
            path, ext = path.rsplit(".", 1)
        if ext not in exts:
            print("readdir: what? %s" % ext)
            return ["/"]
        print("readdir %s %s %s %s" % (path, fh, path, ext))
        self.refresh()
        if ext != "nop":
            return [".", "..", "ok", path.replace("/", "_"), ext]
        defs = self.client.get_defs()
        if path == '/':
            return ['.', '..'] + ["%s" % s.name() for s in defs.suites]
        node = defs.find_abs_node(str(path))
        if node is None:
            return ["-empty-"]
        children = ["%s" % n.name() for n in node.nodes]
        return ['.', '..'] + children + list_dir(node)

    def readlink(self, path):
        raise FuseOSError(ENOENT)

    def rename(self, old, new):
        raise FuseOSError(ENOENT)

    def rmdir(self, path):
        raise FuseOSError(ENOENT)

    def symlink(self, target, source):
        raise FuseOSError(ENOENT)

    def truncate(self, path, length, fh=None):
        raise FuseOSError(ENOENT)

    def unlink(self, path):
        raise FuseOSError(ENOENT)

    def utimens(self, path, times=None):
        raise FuseOSError(ENOENT)

    def write(self, path, data, offset, fh):
        raise FuseOSError(ENOENT)
if __name__ == '__main__':
    # Command line: <host> <port> <mountpoint>. Any other argument count
    # prints the usage line and exits with status 1.
    if len(argv) == 4:
        # Mount the ecFlow view in the foreground, without FUSE threads.
        fuse = FUSE(FuseEcflow(argv[1], argv[2]), argv[3],
                    foreground=True, nothreads=True)
    else:
        print('usage: %s <host> <port> <mountpoint>' % argv[0])
        exit(1)
|