#!/usr/bin/env python
# -*- coding= UTF-8 -*-
""" course 2014
sample suite to spawn jobs on main available cpu facilities
at ECMWF
"""
from __future__ import with_statement
import sys, os, pwd, getopt
sys.path.append('/home/ma/emos/def/o/def')
import inc_emos as ic
from inc_emos import (Family, Task, VariablesEdit, Meter, Event, \
Label, Limit, If, Complete, Time, Trigger, Client)
from ecf import get_username, Clock, Defstatus, Inlimit, Repeat, get_uid, Defs
# import ecf; ecf.USE_TRIGGER = 0 # DEBUG MODE: not to load triggers
def create_task():
""" create example tasks wrappers and include files"""
### task
content = """#!/bin/ksh
%manual
manual - this task is automatically created by cray.py
%end
%include <qsub.h>
if [[ $ARCH = hp* ]]; then export PATH=/usr/local/bin:$PATH; fi
if [[ $HOST = lxop* ]]; then export PATH=/usr/local/apps/ecflow/current:/usr/local/apps/sms/bin:$PATH; fi
%include <trap.h>
SLEEP=%SLEEP:0%
echo OK
case %ECF_PORT:0% in
0) base=900000; LOGPORT=%LOGPORT:0%;;
*) base=1000; LOGPORT=%LOGPORT:0%;;
esac
base=1000
case $ARCH in
cray)xlabel info $(printenv | grep -E '(SUBMIT_|EC_)') ;;
*) xlabel info OK
esac
printenv | sort
xevent 1
step=0
while (( $step <= 12 )); do
xmeter step $step
((step = step + 1))
sleep $SLEEP
done
%include <endt.h>
"""
create_wrapper("test.sms", content)
### PYTHON TASK
content = """$include <python_header.h>
# header files are located in the same directory as wrapper (quotes)
for step in range(0,101):
print step
xmeter("step", step)
else:
print 'the loop is over'
xevent("1")
xlabel("info", "news from pure python world")
$include <python_endt.h>
"""
create_wrapper("python.sms", content)
content = """#!/usr/bin/env python
import os
import sys
import signal
ECF_PORT=$ECF_PORT:0$
XECF="/usr/local/apps/ecflow/current/bin/ecflow_client ";
# --port =$ECF_PORT:0$ --host =$ECF_NODE:0$ ";
def SigHandler(signum, frame):
print "caught signal " + signum
xabort()
sys.exit(0);
return
print ECF_PORT
# set -eux ?
# trap 0 ?
# time stamp per executed line
import atexit
early_exit = True
@atexit.register
def goodbye():
if early_exit:
print "too early"
xabort()
else:
xcomplete()
# TIME_STAMP
# http://shop.oreilly.com/product/9780596007973.do # recipie p436
import syslog, time
class FunctionFileLikeWrapper():
def __init__(self, func): self.func = func
def write(self, msg): self.func(msg)
def flush(self): pass
class TimeStamper(object):
msg_format = "[%y%m%d %H:%M:%S]", time.gmtime, "%s: %s"
msg_format = "+ %H:%M:%S", time.gmtime, "%s %s"
def __call__(self, msg):
tfmt, tfun, gfmt = self.msg_format
return "%s %s\\\n" % (time.strftime(tfmt, tfun()), msg)
class TeeFileLikeWrapper():
def __init__(self, *files): self.files = files
def write(self, msg):
for f in self.files: f.write(timestamp(msg.strip()))
class FlushingWrapper:
def __init__(self, *files): self.files = files
def write(self, msg):
for f in self.files:
f.write(timestamp(msg))
# f.write(timestamp(msg.strip()))
f.flush()
def logto(*files):
# sys.stdout = TeeFileLikeWrapper(*files)
sys.stdout = FlushingWrapper(*files)
syslogger = syslog.syslog
syslogfile = FunctionFileLikeWrapper(syslogger)
timestamp = TimeStamper()
logto(sys.stdout, syslogfile, open("log.tmp", "w"))
# end time stamp
if ECF_PORT > 0:
os.environ['ECF_PORT'] = "$ECF_PORT:0$"
os.environ['ECF_NAME'] = "$ECF_NAME:0$"
os.environ['ECF_NODE'] = "$ECF_NODE:0$"
os.environ['ECF_PASS'] = "$ECF_PASS:0$"
def xinit():
os.system(XECF + " --init " + str(os.getpid()))
print "init"
def xabort():
os.system(XECF + " --abort")
def xcomplete():
os.system(XECF + " --complete")
def xmeter(name, step):
os.system(XECF + " --meter " + name + " " + str(step))
def xevent(name):
os.system(XECF + " --event " + name)
def xlabel(name, msg):
os.system(XECF + " --label " + name + " '%s'" % msg)
else:
os.environ['SMS_PROG'] = "$SMS_PROG:0$"
os.environ['SMSNAME'] = "$SMSNAME:0$"
os.environ['SMSNODE'] = "$SMSNODE:0$"
os.environ['SMSPASS'] = "$SMSPASS:0$"
def xinit():
os.system('smsinit ' + str(os.getpid()))
def xabort():
os.system('smsabort')
def xcomplete():
os.system('smscomplete')
def xmeter(name, step):
os.system("smsmeter " + name + " " + str(step))
def xevent(name):
os.system("smsevent " + name)
def xlabel(name, msg):
os.system("smslabel " + name + " '%s'" % msg)
signal.signal (signal.SIGHUP, SigHandler)
signal.signal (signal.SIGINT, SigHandler)
signal.signal (signal.SIGQUIT, SigHandler)
signal.signal (signal.SIGILL, SigHandler)
signal.signal (signal.SIGTRAP, SigHandler)
signal.signal (signal.SIGIOT, SigHandler)
signal.signal (signal.SIGBUS, SigHandler)
signal.signal (signal.SIGFPE, SigHandler)
"""
create_wrapper("python_header.h", content)
content = """# os.system('smscomplete')
early_exit = False
# xcomplete() # managed with atexit
# os.system('/usr/local/apps/sms/bin/ecflow/bin/ecf_client --complete')
"""
create_wrapper("python_endt.h", content)
### PERL TASK
create_wrapper("perl.sms", """#!/usr/bin/perl -w
^include <perl_header.h>
# header files are located in the same directory as wrapper (quotes)
print "Pure perl SMS task";
for ( my $step=1; $step <= 100 ; $step++ ) {
print "this is the number $step\\\n";
xmeter("step", $step);
}
xevent("1");
xlabel("info", "news from pure perl world");
^include <perl_endt.h>
""")
# create_wrapper("perl.sms", content)
source = """use strict;
my $xmeter = "smsmeter"; my $arg_m = "";
my $xlabel = "smslabel"; my $arg_l = "";
my $xevent = "smsevent"; my $arg_e = "";
my $xcomplete = "smscomplete"; my $arg_c = "";
my $xabort = "smsabort";
if (^ECF_PORT:0^ != 0) {
$ENV{'ECF_PORT'} = "^ECF_PORT:0^" ; # ecFlow port number
$ENV{'ECF_NODE'} = "^ECF_NODE:0^" ; # ecFlow host
$ENV{'ECF_NAME'} = "^ECF_NAME:0^" ; # task path into the suite
$ENV{'ECF_PASS'} = "^ECF_PASS:0^" ; # password for the job
$ENV{'ECF_TRYNO'} = "^ECF_TRYNO:0^" ; # job occurenceoccurrence number
my $client = "/usr/local/apps/ecflow/current/bin/ecflow_client";
$xmeter = $client; $arg_m = "--meter";
$xlabel = $client; $arg_l = "--label";
$xevent = $client; $arg_e = "--event";
$xcomplete = $client; $arg_c = "--complete";
$xabort = $client;
system($client, "--init", "$$");
} else {
$ENV{'SMS_PROG'} = "^SMS_PROG:0^" ; # SMS Program Number
$ENV{'SMSNODE'} = "^SMSNODE:0^" ; # SMS host
$ENV{'SMSNAME'} = "^SMSNAME:0^" ; # task path into the suite
$ENV{'SMSPASS'} = "^SMSPASS:0^" ; # password for the job occurenceoccurrence
$ENV{'SMSTRYNO'} = "^SMSTRYNO:0^" ; # job occurenceoccurrence number
}
sub xmeter($$){ my ($name, $step) = @_;
system($xmeter, $arg_m, $name, $step); }
sub xevent($){ my ($name) = @_;
system($xevent, $arg_e, $name); }
sub xlabel($$){ my ($name, $msg) = @_;
system($xlabel, $arg_l, $name, $msg); }
sub xabort(){ system($xabort); }
sub xcomplete(){ system($xcomplete, $arg_c, "$$"); }
print "start";
eval '
"""
create_wrapper("perl_header.h", source)
source = """';
if ($@){
print "caught signal: $@\n";
xabort();
exit;
}
print "the job is now complete\n";
xcomplete();
exit;"""
create_wrapper("perl_endt.h", source)
def create_wrapper(name, content):
""" wrapper creation """
print "#MSG: creating file %s/" % wdir + name
wrapper = open(wdir + "/%s" % name, 'w')
print >> wrapper, content
wrapper.close()
##############################################
def dummy():
""" create test task"""
return Task("test")
def limits():
"""create limits famly"""
return (Family("limits").add(
Defstatus("complete"),
Limit("lim", 10),
Limit("test", 10),
Limit("hpc", 10),
Limit("cca", 10),
Limit("c2a", 10),
Inlimit(ic.psel() + "/limits:lim")))
HCRAY = "cct"
SUBMIT = ic.SUBM + " %USER% %SCHOST% %ECF_JOB% %ECF_JOBOUT% submit"
GSUB = SUBMIT.replace("%SCHOST%", "%SCHOST% %ECF_RID%")
KILL = GSUB.replace(" submit", " kill")
STATUS = GSUB.replace(" submit", " status")
CHECK = STATUS
def unit(name, schost, queue, rdir, account="", leaf=True):
""" course suite tree leave family"""
return Family(name).add(
If(account != "",
VariablesEdit(ACCOUNT= account)),
VariablesEdit(QUEUE= queue,
SCHOST= schost,
ECF_OUT= rdir,
LOGDIR= rdir,),
If (leaf, Task("test").add(
Event(1),
Meter("step", -1, 120, 100),
Label("info", "nop"))))
def call_python():
""" pure python task example"""
return Task("python").add(
VariablesEdit(ECF_MICRO= "$",
ECF_INCLUDE= wdir,
SCHOST= "localhost",
ECF_JOB_CMD= "$ECF_JOB$ > $ECF_JOBOUT$ 2>&1 &"),
Event("1"),
Label("info", "micro is $"),
Meter("step", -1, 100),
# Defstatus("complete"),
)
def call_perl():
""" pure perl task example"""
return Task("perl").add(
Label("todo", "time-stamp PS4, set eux, exit 0 1"),
VariablesEdit(ECF_MICRO= "^",
ECF_INCLUDE= wdir,
SCHOST= "localhost",
ECF_JOB_CMD= "^ECF_JOB^ > ^ECF_JOBOUT^ 2>&1 &"),
Event("1"),
Label("info", "micro is ^"),
Meter("step", -1, 100),
)
##########################################################################
class Course(ic.SeedOD):
""" example class for a suite suite definition"""
def __init__(self):
super(Course, self).__init__()
def setup(self, node):
account = "UNSET"
global user, host, SUITE
self.defs.add_extern("/o/main:YMD")
node.add(
Clock("real"),
Defstatus("suspended"),
Event("1"),
Meter("step", -1, 100),
Label("info", "click edit to update manually"),
limits(),
VariablesEdit(ECF_JOB_CMD= SUBMIT,
ECF_KILL_CMD= KILL,
ECF_STATUS_CMD= STATUS,
ECF_CHECK_CMD= CHECK,
ECF_EXTN= ".sms",
ACCOUNT= "UNSET",
SCHOST= "localhost",
ECF_INCLUDE= idir,
ECF_HOME= jdir,
ECF_FILES= wdir,
QUEUE= "ns", USER= user, LOGDIR= "/tmp",
TOPATH= udir + "/logs",
SLEEP= 1, # x120
ECF_TRIES= 1,),
call_python(),
call_perl(),
)
msg = "have you? setup ssh login, created remote directory"
msg += ", considered the need for a logserver"
node.family("main").add(
Repeat(name="YMD", kind="date",
start=20010101, end=20991231, step=1),
Family("00").add(
Task("REPLACE_ME").add(Defstatus("complete"))
),
Family("loop").add(
Label("info", "prevent direct repeat increment"),
Trigger("./00==complete"),
Complete("/%s:1" % node.name()),
Time("12:00")))
node = node.family("submit").add(
VariablesEdit(USER= user,
ACCOUNT= account),
self.submits(),
)
def submits(self):
# global HOST, user
ct1logs = "/sc1/sb/%s/logs" % user
out = [ unit("cca", "cca", "ns", ct1logs) ]
out += (unit("localhost", "localhost", "ns", "/tmp/%s/logs" % user),
unit("lxab", "lxab", "serial", udir + "/logs"),
unit("ecgb", "ecgb", "ns", udir + "/logs"),
)
return out
def usage():
print sys.argv[0] + ''' -h -u [user] -n [node] -s [suite-name] \
-e: ecflow'''
if __name__ == "__main__":
global user, udir, ecflow, HOST
SUITES = {"course": Course,
}
user = get_username()
try: HOST = sys.argv[-1]
except: HOST = "localhost"
ECFLOW = True
SUITE = None
user = get_username()
opts, args = getopt.getopt(
sys.argv[1:], "hp:u:es:n:p:",
["help", "port", "user", "ecflow", "suite", "node", "path"])
output = None
verbose = False
for o, a in opts:
if o in ("-n", "--node"):
HOST = a
elif o in ("-h", "--help"):
usage()
sys.exit()
elif o in ("-s", "--suite"):
SUITE = a
elif o in ("-p", "--path"):
path = a
elif o in ("-e", "--ecflow"):
ecflow = True
elif o in ("-u", "--user"):
user = a
elif o in ("-p", "--port"):
port = a
else: print "#ERR: what?", o, a; assert False, "unhandled option"
########### SETTINGS ################ WRAPPERS
global wdir, rdir, fdef, jdir
udir = pwd.getpwnam(user).pw_dir
wdir = udir + "/ecflow_server/course"
jdir = "/tmp/%s/ecflow" % user
ddef = udir + "/ecflow_server"
fdef = ddef + "/course.def"
idir = "$HOME/ecflow_server/course/include"
try:
os.makedirs(wdir)
except:
pass
try:
os.makedirs(jdir)
except:
pass
create_task()
########### SETTINGS ################ LOAD
if 1:
client = Client(sys.argv[3], 1000 + int(get_uid()))
top = Course().suite()
if 1:
out = file("%s.exp" % top.name(), 'w'); print >>out, top
defs = Defs()
defs.add_suite(top)
client.replace(sys.argv[2], defs)
sys.exit(0)
|