- Created by Axel Bonet, last modified on Apr 14, 2014
You are viewing an old version of this page. View the current version.
Compare with Current View Page History
« Previous Version 9 Next »
Setting the environment is done calling
module load ecflow
When module is not available on a platform, use is the original way to set PATH and PYTHONPATH variables
use ecflow
It is possible to setup a specific version with
module unload ecflow; module load ecflow/4.0.2
Server can be started with
ecflow_start.sh
Client command can be called to get the self-contained documentation
ecflow_client --help
and the graphical interface is started with
ecflowview; # line below shall add localhost as part the ecflowview->Servers list grep localhost $HOME/.ecflowrc/servers || \ echo "localhost $(uname -n) $(( 1500 + $(id -g))) \ >> $HOME/ecflowrc/servers
Server administrator directory is $HOME/ecflow_server/ which will contain the server log file, the check point file (binary snapshot of the server content). It is defined as variable ECF_HOME on the top node.
Next step is to load a suite into the server. The following python script can be used for suite definition, to expand the suite into a file, and to load it into the serve, as shown in the ecflowview snapshot below
#!/usr/bin/env python # -*- coding= UTF-8 -*- """ course 2014 sample suite to spawn jobs on main available cpu facilities at ECMWF """ from __future__ import with_statement import sys, os, pwd, getopt sys.path.append('/home/ma/emos/def/o/def') import inc_emos as ic from inc_emos import Family, Task, Variables, Meter, Event, \ Label, Limit, If, Complete, Time, Trigger, Client from ecf import get_username, Clock, Defstatus, Inlimit, Repeat, get_uid, Defs # import ecf; ecf.USE_TRIGGER = 0 # DEBUG MODE: not to load triggers def create_task(): """ create example tasks wrappers and include files""" ### task content = """#!/bin/ksh %manual manual - this task is automatically created by cray.py %end %include <qsub.h> if [[ $ARCH = hp* ]]; then export PATH=/usr/local/bin:$PATH; fi if [[ $HOST = lxop* ]]; then export PATH=/usr/local/apps/ecflow/current:/usr/local/apps/sms/bin:$PATH; fi %include <trap.h> SLEEP=%SLEEP:0% echo OK case %ECF_PORT:0% in 0) base=900000; LOGPORT=%LOGPORT:0%;; *) base=1000; LOGPORT=%LOGPORT:0%;; esac base=1000 case $ARCH in cray)xlabel info $(printenv | grep -E '(SUBMIT_|EC_)') ;; *) xlabel info OK esac printenv | sort xevent 1 step=0 while (( $step <= 12 )); do xmeter step $step ((step = step + 1)) sleep $SLEEP done %include <endt.h> """ create_wrapper("test.sms", content) ### PYTHON TASK content = """$include <python_header.h> # header files are located in the same directory as wrapper (quotes) for step in range(0,101): print step xmeter("step", step) else: print 'the loop is over' xevent("1") xlabel("info", "news from pure python world") $include <python_endt.h> """ create_wrapper("python.sms", content) content = """#!/usr/bin/env python import os import sys import signal ECF_PORT=$ECF_PORT:0$ XECF="/usr/local/apps/ecflow/current/bin/ecflow_client "; # --port $ECF_PORT:0$ --host $ECF_NODE:0$ "; def SigHandler(signum, frame): print "caught signal " + signum xabort() sys.exit(0); return print ECF_PORT # set -eux ? # trap 0 ? # time stamp per executed line import atexit early_exit = True @atexit.register def goodbye(): if early_exit: print "too early" xabort() else: xcomplete() # TIME_STAMP # http://shop.oreilly.com/product/9780596007973.do # recipie p436 import syslog, time class FunctionFileLikeWrapper(): def __init__(self, func): self.func = func def write(self, msg): self.func(msg) def flush(self): pass class TimeStamper(object): msg_format = "[%y%m%d %H:%M:%S]", time.gmtime, "%s: %s" msg_format = "+ %H:%M:%S", time.gmtime, "%s %s" def __call__(self, msg): tfmt, tfun, gfmt = self.msg_format return "%s %s\\\n" % (time.strftime(tfmt, tfun()), msg) class TeeFileLikeWrapper(): def __init__(self, *files): self.files = files def write(self, msg): for f in self.files: f.write(timestamp(msg.strip())) class FlushingWrapper: def __init__(self, *files): self.files = files def write(self, msg): for f in self.files: f.write(timestamp(msg)) # f.write(timestamp(msg.strip())) f.flush() def logto(*files): # sys.stdout = TeeFileLikeWrapper(*files) sys.stdout = FlushingWrapper(*files) syslogger = syslog.syslog syslogfile = FunctionFileLikeWrapper(syslogger) timestamp = TimeStamper() logto(sys.stdout, syslogfile, open("log.tmp", "w")) # end time stamp if ECF_PORT > 0: os.environ['ECF_PORT'] = "$ECF_PORT:0$" os.environ['ECF_NAME'] = "$ECF_NAME:0$" os.environ['ECF_NODE'] = "$ECF_NODE:0$" os.environ['ECF_PASS'] = "$ECF_PASS:0$" def xinit(): os.system(XECF + " --init " + str(os.getpid())) print "init" def xabort(): os.system(XECF + " --abort") def xcomplete(): os.system(XECF + " --complete") def xmeter(name, step): os.system(XECF + " --meter " + name + " " + str(step)) def xevent(name): os.system(XECF + " --event " + name) def xlabel(name, msg): os.system(XECF + " --label " + name + " '%s'" % msg) else: os.environ['SMS_PROG'] = "$SMS_PROG:0$" os.environ['SMSNAME'] = "$SMSNAME:0$" os.environ['SMSNODE'] = "$SMSNODE:0$" os.environ['SMSPASS'] = "$SMSPASS:0$" def xinit(): os.system('smsinit ' + str(os.getpid())) def xabort(): os.system('smsabort') def xcomplete(): os.system('smscomplete') def xmeter(name, step): os.system("smsmeter " + name + " " + str(step)) def xevent(name): os.system("smsevent " + name) def xlabel(name, msg): os.system("smslabel " + name + " '%s'" % msg) signal.signal (signal.SIGHUP, SigHandler) signal.signal (signal.SIGINT, SigHandler) signal.signal (signal.SIGQUIT, SigHandler) signal.signal (signal.SIGILL, SigHandler) signal.signal (signal.SIGTRAP, SigHandler) signal.signal (signal.SIGIOT, SigHandler) signal.signal (signal.SIGBUS, SigHandler) signal.signal (signal.SIGFPE, SigHandler) """ create_wrapper("python_header.h", content) content = """# os.system('smscomplete') early_exit = False # xcomplete() # managed with atexit # os.system('/usr/local/apps/sms/bin/ecflow/bin/ecf_client --complete') """ create_wrapper("python_endt.h", content) ### PERL TASK create_wrapper("perl.sms", """#!/usr/bin/perl -w ^include <perl_header.h> # header files are located in the same directory as wrapper (quotes) print "Pure perl SMS task"; for ( my $step=1; $step <= 100 ; $step++ ) { print "this is the number $step\\\n"; xmeter("step", $step); } xevent("1"); xlabel("info", "news from pure perl world"); ^include <perl_endt.h> """) # create_wrapper("perl.sms", content) source = """use strict; my $xmeter = "smsmeter"; my $arg_m = ""; my $xlabel = "smslabel"; my $arg_l = ""; my $xevent = "smsevent"; my $arg_e = ""; my $xcomplete = "smscomplete"; my $arg_c = ""; my $xabort = "smsabort"; if (^ECF_PORT:0^ != 0) { $ENV{'ECF_PORT'} = "^ECF_PORT:0^" ; # ecFlow port number $ENV{'ECF_NODE'} = "^ECF_NODE:0^" ; # ecFlow host $ENV{'ECF_NAME'} = "^ECF_NAME:0^" ; # task path into the suite $ENV{'ECF_PASS'} = "^ECF_PASS:0^" ; # password for the job $ENV{'ECF_TRYNO'} = "^ECF_TRYNO:0^" ; # job occurence number my $client = "/usr/local/apps/ecflow/current/bin/ecflow_client"; $xmeter = $client; $arg_m = "--meter"; $xlabel = $client; $arg_l = "--label"; $xevent = $client; $arg_e = "--event"; $xcomplete = $client; $arg_c = "--complete"; $xabort = $client; system($client, "--init", "$$"); } else { $ENV{'SMS_PROG'} = "^SMS_PROG:0^" ; # SMS Program Number $ENV{'SMSNODE'} = "^SMSNODE:0^" ; # SMS host $ENV{'SMSNAME'} = "^SMSNAME:0^" ; # task path into the suite $ENV{'SMSPASS'} = "^SMSPASS:0^" ; # password for the job occurence $ENV{'SMSTRYNO'} = "^SMSTRYNO:0^" ; # job occurence number } sub xmeter($$){ my ($name, $step) = @_; system($xmeter, $arg_m, $name, $step); } sub xevent($){ my ($name) = @_; system($xevent, $arg_e, $name); } sub xlabel($$){ my ($name, $msg) = @_; system($xlabel, $arg_l, $name, $msg); } sub xabort(){ system($xabort); } sub xcomplete(){ system($xcomplete, $arg_c, "$$"); } print "start"; eval ' """ create_wrapper("perl_header.h", source) source = """'; if ($@){ print "caught signal: $@\n"; xabort(); exit; } print "the job is now complete\n"; xcomplete(); exit;""" create_wrapper("perl_endt.h", source) def create_wrapper(name, content): """ wrapper creation """ print "#MSG: creating file %s/" % wdir + name wrapper = open(wdir + "/%s" % name, 'w') print >> wrapper, content wrapper.close() ############################################## def dummy(): """ create test task""" return Task("test") def limits(): """create limits famly""" return (Family("limits").add( Defstatus("complete"), Limit("lim", 10), Limit("test", 10), Limit("hpc", 10), Limit("cca", 10), Limit("c2a", 10), Inlimit(ic.psel() + "/limits:lim"))) HCRAY = "cct" SUBMIT = ic.SUBM + " %USER% %SCHOST% %ECF_JOB% %ECF_JOBOUT% submit" GSUB = SUBMIT.replace("%SCHOST%", "%SCHOST% %ECF_RID%") KILL = GSUB.replace(" submit", " kill") STATUS = GSUB.replace(" submit", " status") CHECK = STATUS def unit(name, schost, queue, rdir, account="", leaf=True): """ course suite tree leave family""" return Family(name).add( If(account != "", Variables(ACCOUNT= account)), Variables(QUEUE= queue, SCHOST= schost, ECF_OUT= rdir, LOGDIR= rdir,), If (leaf, Task("test").add( Event(1), Meter("step", -1, 120, 100), Label("info", "nop")))) def call_python(): """ pure python task example""" return Task("python").add( Variables(ECF_MICRO= "$", ECF_INCLUDE= wdir, SCHOST= "localhost", ECF_JOB_CMD= "$ECF_JOB$ > $ECF_JOBOUT$ 2>&1 &"), Event("1"), Label("info", "micro is $"), Meter("step", -1, 100), # Defstatus("complete"), ) def call_perl(): """ pure perl task example""" return Task("perl").add( Label("todo", "time-stamp PS4, set eux, exit 0 1"), Variables(ECF_MICRO= "^", ECF_INCLUDE= wdir, SCHOST= "localhost", ECF_JOB_CMD= "^ECF_JOB^ > ^ECF_JOBOUT^ 2>&1 &"), Event("1"), Label("info", "micro is ^"), Meter("step", -1, 100), ) ########################################################################## class Course(ic.SeedOD): """ example class for a suite suite definition""" def __init__(self): super(Course, self).__init__() def setup(self, node): account = "UNSET" global user, host, SUITE self.defs.add_extern("/o/main:YMD") node.add( Clock("real"), Defstatus("suspended"), Event("1"), Meter("step", -1, 100), Label("info", "click edit to update manually"), limits(), Variables(ECF_JOB_CMD= SUBMIT, ECF_KILL_CMD= KILL, ECF_STATUS_CMD= STATUS, ECF_CHECK_CMD= CHECK, ECF_EXTN= ".sms", ACCOUNT= "UNSET", SCHOST= "localhost", ECF_INCLUDE= idir, ECF_HOME= jdir, ECF_FILES= wdir, QUEUE= "ns", USER= user, LOGDIR= "/tmp", TOPATH= udir + "/logs", SLEEP= 1, # x120 ECF_TRIES= 1,), call_python(), call_perl(), ) msg = "have you? setup ssh login, created remote directory" msg += ", considered the need for a logserver" node.family("main").add( Repeat(name="YMD", kind="date", start=20010101, end=20991231, step=1), Family("00").add( Task("REPLACE_ME").add(Defstatus("complete")) ), Family("loop").add( Label("info", "prevent direct repeat increment"), Trigger("./00==complete"), Complete("/%s:1" % node.name()), Time("12:00"))) node = node.family("submit").add( Variables(USER= user, ACCOUNT= account), self.submits(), ) def submits(self): # global HOST, user ct1logs = "/sc1/sb/%s/logs" % user out = [ unit("cca", "cca", "ns", ct1logs) ] out += (unit("localhost", "localhost", "ns", "/tmp/%s/logs" % user), unit("lxab", "lxab", "serial", udir + "/logs"), unit("ecgb", "ecgb", "ns", udir + "/logs"), ) return out def usage(): print sys.argv[0] + ''' -h -u [user] -n [node] -s [suite-name] \ -e: ecflow''' if __name__ == "__main__": global user, udir, ecflow, HOST SUITES = {"course": Course, } user = get_username() try: HOST = sys.argv[-1] except: HOST = "localhost" ECFLOW = True SUITE = None user = get_username() opts, args = getopt.getopt( sys.argv[1:], "hp:u:es:n:p:", ["help", "port", "user", "ecflow", "suite", "node", "path"]) output = None verbose = False for o, a in opts: if o in ("-n", "--node"): HOST = a elif o in ("-h", "--help"): usage() sys.exit() elif o in ("-s", "--suite"): SUITE = a elif o in ("-p", "--path"): path = a elif o in ("-e", "--ecflow"): ecflow = True elif o in ("-u", "--user"): user = a elif o in ("-p", "--port"): port = a else: print "#ERR: what?", o, a; assert False, "unhandled option" ########### SETTINGS ################ WRAPPERS global wdir, rdir, fdef, jdir udir = pwd.getpwnam(user).pw_dir wdir = udir + "/ecflow_server/course" jdir = "/tmp/%s/ecflow" % user ddef = udir + "/ecflow_server" fdef = ddef + "/course.def" idir = "$HOME/ecflow_server/course/include" try: os.makedirs(wdir) except: pass try: os.makedirs(jdir) except: pass create_task() ########### SETTINGS ################ LOAD if 1: client = Client(sys.argv[3], 1000 + int(get_uid())) top = Course().suite() if 1: out = file("%s.exp" % top.name(), 'w'); print >>out, top defs = Defs() defs.add_suite(top) client.replace(sys.argv[2], defs) sys.exit(0)
Clicking on each task, we can check the presence of task wrapper script (ECF_FILES defined properly), then Edit it and preprocess it (ECF_INCLUDE defined properly, no micro character on its own), and submit it.
When the task does not reach the active status, we shall check ECF_OUT directory is existing on the remote host, check that rsh or ssh connection does not request password anymore, or query the queuing system while the directive may not be directly valid (user account, queue)
When the submit family is working for the expected remote host, time to fill the main family with relevant tasks. Enjoy!
- No labels