- Created by Axel Bonet, last modified on Apr 14, 2014
You are viewing an old version of this page. View the current version.
Compare with Current View Page History
« Previous Version 7 Next »
The environment is set up by calling
module load ecflow
When the module command is not available on a platform, "use" is the original way to set the PATH and PYTHONPATH variables
use ecflow
It is possible to setup a specific version with
module unload ecflow; module load ecflow/4.0.2
Server can be started with
ecflow_start.sh
Client command can be called to get the self-contained documentation
ecflow_client --help
and the graphical interface is started with
ecflowview
# The command below adds localhost to the ecflowview->Servers list,
# unless an entry for it is already present.
# Fixes: the echo string is now closed, and the entry is appended to the
# same file that grep inspects ($HOME/.ecflowrc/servers).
grep localhost $HOME/.ecflowrc/servers || \
  echo "localhost $(uname -n) $(( 1500 + $(id -g) ))" \
  >> $HOME/.ecflowrc/servers
The server administration directory is $HOME/ecflow_server/, which will contain the server log file and the checkpoint file (a binary snapshot of the server content). It is defined as the variable ECF_HOME on the top node.
The next step is to load a suite into the server. The following Python script can be used to define the suite, to expand it into a file, and to load it into the server, as shown in the ecflowview snapshot below
#!/usr/bin/env python # -*- coding= UTF-8 -*- """ course 2014 sample suite to spawn jobs on main available cpu facilities at ECMWF """ from __future__ import with_statement import sys, os, pwd, getopt sys.path.append('/home/ma/emos/def/o/def') import inc_emos as ic from inc_emos import Family, Task, Variables, Meter, Event, \ Label, Limit, If, Complete, Time, Trigger, Client from ecf import get_username, Clock, Defstatus, Inlimit, Repeat, get_uid, Defs # import ecf; ecf.USE_TRIGGER = 0 # DEBUG MODE: not to load triggers def create_task(): """ create example tasks wrappers and include files""" ### task content = """#!/bin/ksh %manual manual - this task is automatically created by cray.py %end %include <qsub.h> if [[ $ARCH = hp* ]]; then export PATH=/usr/local/bin:$PATH; fi if [[ $HOST = lxop* ]]; then export PATH=/usr/local/apps/ecflow/current:/usr/local/apps/sms/bin:$PATH; fi %include <trap.h> SLEEP=%SLEEP:0% echo OK case %ECF_PORT:0% in 0) base=900000; LOGPORT=%LOGPORT:0%;; *) base=1000; LOGPORT=%LOGPORT:0%;; esac base=1000 case $ARCH in cray)xlabel info $(printenv | grep -E '(SUBMIT_|EC_)') ;; *) xlabel info OK esac printenv | sort xevent 1 step=0 while (( $step <= 12 )); do xmeter step $step ((step = step + 1)) sleep $SLEEP done %include <endt.h> """ create_wrapper("test.sms", content) ### PYTHON TASK content = """$include <python_header.h> # header files are located in the same directory as wrapper (quotes) for step in range(0,101): print step xmeter("step", step) else: print 'the loop is over' xevent("1") xlabel("info", "news from pure python world") $include <python_endt.h> """ create_wrapper("python.sms", content) content = """#!/usr/bin/env python import os import sys import signal ECF_PORT=$ECF_PORT:0$ XECF="/usr/local/apps/ecflow/current/bin/ecflow_client "; # --port $ECF_PORT:0$ --host $ECF_NODE:0$ "; def SigHandler(signum, frame): print "caught signal " + signum xabort() sys.exit(0); return print ECF_PORT # set -eux ? # trap 0 ? 
# time stamp per executed line import atexit early_exit = True @atexit.register def goodbye(): if early_exit: print "too early" xabort() else: xcomplete() # TIME_STAMP # http://shop.oreilly.com/product/9780596007973.do # recipie p436 import syslog, time class FunctionFileLikeWrapper(): def __init__(self, func): self.func = func def write(self, msg): self.func(msg) def flush(self): pass class TimeStamper(object): msg_format = "[%y%m%d %H:%M:%S]", time.gmtime, "%s: %s" msg_format = "+ %H:%M:%S", time.gmtime, "%s %s" def __call__(self, msg): tfmt, tfun, gfmt = self.msg_format return "%s %s\\\n" % (time.strftime(tfmt, tfun()), msg) class TeeFileLikeWrapper(): def __init__(self, *files): self.files = files def write(self, msg): for f in self.files: f.write(timestamp(msg.strip())) class FlushingWrapper: def __init__(self, *files): self.files = files def write(self, msg): for f in self.files: f.write(timestamp(msg)) # f.write(timestamp(msg.strip())) f.flush() def logto(*files): # sys.stdout = TeeFileLikeWrapper(*files) sys.stdout = FlushingWrapper(*files) syslogger = syslog.syslog syslogfile = FunctionFileLikeWrapper(syslogger) timestamp = TimeStamper() logto(sys.stdout, syslogfile, open("log.tmp", "w")) # end time stamp if ECF_PORT > 0: os.environ['ECF_PORT'] = "$ECF_PORT:0$" os.environ['ECF_NAME'] = "$ECF_NAME:0$" os.environ['ECF_NODE'] = "$ECF_NODE:0$" os.environ['ECF_PASS'] = "$ECF_PASS:0$" def xinit(): os.system(XECF + " --init " + str(os.getpid())) print "init" def xabort(): os.system(XECF + " --abort") def xcomplete(): os.system(XECF + " --complete") def xmeter(name, step): os.system(XECF + " --meter " + name + " " + str(step)) def xevent(name): os.system(XECF + " --event " + name) def xlabel(name, msg): os.system(XECF + " --label " + name + " '%s'" % msg) else: os.environ['SMS_PROG'] = "$SMS_PROG:0$" os.environ['SMSNAME'] = "$SMSNAME:0$" os.environ['SMSNODE'] = "$SMSNODE:0$" os.environ['SMSPASS'] = "$SMSPASS:0$" def xinit(): os.system('smsinit ' + 
str(os.getpid())) def xabort(): os.system('smsabort') def xcomplete(): os.system('smscomplete') def xmeter(name, step): os.system("smsmeter " + name + " " + str(step)) def xevent(name): os.system("smsevent " + name) def xlabel(name, msg): os.system("smslabel " + name + " '%s'" % msg) signal.signal (signal.SIGHUP, SigHandler) signal.signal (signal.SIGINT, SigHandler) signal.signal (signal.SIGQUIT, SigHandler) signal.signal (signal.SIGILL, SigHandler) signal.signal (signal.SIGTRAP, SigHandler) signal.signal (signal.SIGIOT, SigHandler) signal.signal (signal.SIGBUS, SigHandler) signal.signal (signal.SIGFPE, SigHandler) """ create_wrapper("python_header.h", content) content = """# os.system('smscomplete') early_exit = False # xcomplete() # managed with atexit # os.system('/usr/local/apps/sms/bin/ecflow/bin/ecf_client --complete') """ create_wrapper("python_endt.h", content) ### PERL TASK create_wrapper("perl.sms", """#!/usr/bin/perl -w ^include <perl_header.h> # header files are located in the same directory as wrapper (quotes) print "Pure perl SMS task"; for ( my $step=1; $step <= 100 ; $step++ ) { print "this is the number $step\\\n"; xmeter("step", $step); } xevent("1"); xlabel("info", "news from pure perl world"); ^include <perl_endt.h> """) # create_wrapper("perl.sms", content) source = """use strict; my $xmeter = "smsmeter"; my $arg_m = ""; my $xlabel = "smslabel"; my $arg_l = ""; my $xevent = "smsevent"; my $arg_e = ""; my $xcomplete = "smscomplete"; my $arg_c = ""; my $xabort = "smsabort"; if (^ECF_PORT:0^ != 0) { $ENV{'ECF_PORT'} = "^ECF_PORT:0^" ; # ecFlow port number $ENV{'ECF_NODE'} = "^ECF_NODE:0^" ; # ecFlow host $ENV{'ECF_NAME'} = "^ECF_NAME:0^" ; # task path into the suite $ENV{'ECF_PASS'} = "^ECF_PASS:0^" ; # password for the job $ENV{'ECF_TRYNO'} = "^ECF_TRYNO:0^" ; # job occurence number my $client = "/usr/local/apps/ecflow/current/bin/ecflow_client"; $xmeter = $client; $arg_m = "--meter"; $xlabel = $client; $arg_l = "--label"; $xevent = $client; 
$arg_e = "--event"; $xcomplete = $client; $arg_c = "--complete"; $xabort = $client; system($client, "--init", "$$"); } else { $ENV{'SMS_PROG'} = "^SMS_PROG:0^" ; # SMS Program Number $ENV{'SMSNODE'} = "^SMSNODE:0^" ; # SMS host $ENV{'SMSNAME'} = "^SMSNAME:0^" ; # task path into the suite $ENV{'SMSPASS'} = "^SMSPASS:0^" ; # password for the job occurence $ENV{'SMSTRYNO'} = "^SMSTRYNO:0^" ; # job occurence number } sub xmeter($$){ my ($name, $step) = @_; system($xmeter, $arg_m, $name, $step); } sub xevent($){ my ($name) = @_; system($xevent, $arg_e, $name); } sub xlabel($$){ my ($name, $msg) = @_; system($xlabel, $arg_l, $name, $msg); } sub xabort(){ system($xabort); } sub xcomplete(){ system($xcomplete, $arg_c, "$$"); } print "start"; eval ' """ create_wrapper("perl_header.h", source) source = """'; if ($@){ print "caught signal: $@\n"; xabort(); exit; } print "the job is now complete\n"; xcomplete(); exit;""" create_wrapper("perl_endt.h", source) def create_wrapper(name, content): """ wrapper creation """ print "#MSG: creating file %s/" % wdir + name wrapper = open(wdir + "/%s" % name, 'w') print >> wrapper, content wrapper.close() ############################################## def dummy(): """ create test task""" return Task("test") def limits(): """create limits famly""" return (Family("limits").add( Defstatus("complete"), Limit("lim", 10), Limit("test", 10), Limit("hpc", 10), Limit("cca", 10), Limit("c2a", 10), Inlimit(ic.psel() + "/limits:lim"))) HCRAY = "cct" SUBMIT = ic.SUBM + " %USER% %SCHOST% %ECF_JOB% %ECF_JOBOUT% submit" GSUB = SUBMIT.replace("%SCHOST%", "%SCHOST% %ECF_RID%") KILL = GSUB.replace(" submit", " kill") STATUS = GSUB.replace(" submit", " status") CHECK = STATUS def unit(name, schost, queue, rdir, account="", leaf=True): """ course suite tree leave family""" return Family(name).add( If(account != "", Variables(ACCOUNT= account)), Variables(QUEUE= queue, SCHOST= schost, ECF_OUT= rdir, LOGDIR= rdir,), If (leaf, Task("test").add( Event(1), 
Meter("step", -1, 120, 100), Label("info", "nop")))) def call_python(): """ pure python task example""" return Task("python").add( Variables(ECF_MICRO= "$", ECF_INCLUDE= wdir, SCHOST= "localhost", ECF_JOB_CMD= "$ECF_JOB$ > $ECF_JOBOUT$ 2>&1 &"), Event("1"), Label("info", "micro is $"), Meter("step", -1, 100), # Defstatus("complete"), ) def call_perl(): """ pure perl task example""" return Task("perl").add( Label("todo", "time-stamp PS4, set eux, exit 0 1"), Variables(ECF_MICRO= "^", ECF_INCLUDE= wdir, SCHOST= "localhost", ECF_JOB_CMD= "^ECF_JOB^ > ^ECF_JOBOUT^ 2>&1 &"), Event("1"), Label("info", "micro is ^"), Meter("step", -1, 100), ) ########################################################################## class Course(ic.SeedOD): """ example class for a suite suite definition""" def __init__(self): super(Course, self).__init__() def setup(self, node): account = "UNSET" global user, host, SUITE self.defs.add_extern("/o/main:YMD") node.add( Clock("real"), Defstatus("suspended"), Event("1"), Meter("step", -1, 100), Label("info", "click edit to update manually"), limits(), Variables(ECF_JOB_CMD= SUBMIT, ECF_KILL_CMD= KILL, ECF_STATUS_CMD= STATUS, ECF_CHECK_CMD= CHECK, ECF_EXTN= ".sms", ACCOUNT= "UNSET", SCHOST= "localhost", ECF_INCLUDE= idir, ECF_HOME= jdir, ECF_FILES= wdir, QUEUE= "ns", USER= user, LOGDIR= "/tmp", TOPATH= udir + "/logs", SLEEP= 1, # x120 ECF_TRIES= 1,), call_python(), call_perl(), ) msg = "have you? 
setup ssh login, created remote directory" msg += ", considered the need for a logserver" node.family("main").add( Repeat(name="YMD", kind="date", start=20010101, end=20991231, step=1), Family("00").add( Task("REPLACE_ME").add(Defstatus("complete")) ), Family("loop").add( Label("info", "prevent direct repeat increment"), Trigger("./00==complete"), Complete("/%s:1" % node.name()), Time("12:00"))) node = node.family("submit").add( Variables(USER= user, ACCOUNT= account), self.submits(), ) def submits(self): # global HOST, user ct1logs = "/sc1/sb/%s/logs" % user out = [ unit("cca", "cca", "ns", ct1logs) ] out += (unit("localhost", "localhost", "ns", "/tmp/%s/logs" % user), unit("lxab", "lxab", "serial", udir + "/logs"), unit("ecgb", "ecgb", "ns", udir + "/logs"), ) return out def usage(): print sys.argv[0] + ''' -h -u [user] -n [node] -s [suite-name] \ -e: ecflow''' if __name__ == "__main__": global user, udir, ecflow, HOST SUITES = {"course": Course, } user = get_username() try: HOST = sys.argv[-1] except: HOST = "localhost" ECFLOW = True SUITE = None user = get_username() opts, args = getopt.getopt( sys.argv[1:], "hp:u:es:n:p:", ["help", "port", "user", "ecflow", "suite", "node", "path"]) output = None verbose = False for o, a in opts: if o in ("-n", "--node"): HOST = a elif o in ("-h", "--help"): usage() sys.exit() elif o in ("-s", "--suite"): SUITE = a elif o in ("-p", "--path"): path = a elif o in ("-e", "--ecflow"): ecflow = True elif o in ("-u", "--user"): user = a elif o in ("-p", "--port"): port = a else: print "#ERR: what?", o, a; assert False, "unhandled option" ########### SETTINGS ################ WRAPPERS global wdir, rdir, fdef, jdir udir = pwd.getpwnam(user).pw_dir wdir = udir + "/ecflow_server/course" jdir = "/tmp/%s/ecflow" % user ddef = udir + "/ecflow_server" fdef = ddef + "/course.def" idir = "$HOME/ecflow_server/course/include" try: os.makedirs(wdir) except: pass try: os.makedirs(jdir) except: pass create_task() ########### SETTINGS 
################ LOAD if 1: client = Client(sys.argv[3], 1000 + int(get_uid())) top = Course().suite() if 1: out = file("%s.exp" % top.name(), 'w'); print >>out, top defs = Defs() defs.add_suite(top) client.replace(sys.argv[2], defs) sys.exit(0)
- No labels