Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Confirmed.

...

Code Block
languagepy
themeEclipse
titleecflow_suite.py
collapsetrue
 #!/usr/local/apps/python/current/bin/python
import getopt
import os
import pwd
# import sh
import sys
sys.path.append("/opt/cray/sdb/1.0-1.0502.50558.6.21.ari/lib64/py")
def get_username():
    return pwd.getpwuid( os.getuid() )[ 0 ]
def get_uid():
    return pwd.getpwnam(get_username()).pw_uid
"""
python docstring: demonstrate python cli + ecFlow task wrapper + task loader
$manual
DESCRIPTION:
sweeper:
find out min/max among modeleps_nemo tasks
OPERATORS: please, set complete if problematic
ANALYST: example as pure python task - job
$end
$comment
  comments can be added ...
$end
"""
PATH = "/usr/local/apps/ecflow/4.0.9/lib/python2.7/site-packages/ecflow"
sys.path.insert(0, PATH)
import ecflow as ec
MICRO = "$$" # double dollar to please ecFlow micro character balance
task_content = """#!/bin/ksh
#!/bin/bash # NOK while typeset -Z2 may be found... if [[ var =  @(v1) ]]; then ...
%include <trap.h>
inc=%ECF_HOME%/%SUITE%/%FAMILY%/preproc.job1
if [[ -f $inc ]]; then
  . $inc
# deterministic runs are lost:
\rm -f $inc || :
fi
xevent 1
i=0; while ((i < 100)) ; do xmeter step $i; ((i+=1)); done
xlabel info "done"
trap 0; xcomplete; exit 0
"""
inc_content = """# request from Blazej?
xevent 2
xlabel info "%SUITE% %TASK%"
"""
def create_wrapper(name, content):
    wrapper = open(name, "w")
    print >>wrapper, content
    wrapper.close()
def loader(test=1, # host=os.getenv("ECF_TEST_HOST", None), 
           host=None, port=None, path=None):
    """ replace sweeper task in test/oper mode """
    sys.path.append("/home/ma/emos/def/o/def")
    import ecf
    from ecf import Suite, Family, Task, Variables, Trigger, Complete, Repeat, Defstatus, Event, Meter, Label, Limit, Inlimit
    if test:
        env = {"host": os.getenv("ECF_TEST_HOST", None),
               "port": os.getenv("ECF_TEST_PORT", None),
               "path": '/' + SUITE + "/submit", }
    else:
        env = {"host": os.getenv("ECF_OPER_HOST", None),
               "port": os.getenv("ECF_OPER_PORT", None),
               "path": '/' + SUITE + "/submit", }
    defs = ecf.Defs()
    
    if host: env["host"] = host
    if port: env["port"] = port
    if env["host"] != "eurus": print host, env["host"]; raise
    if path is None: path = env["path"]
    try: sname = str(path.split('/')[1])
    except: sname = "suite"
    try: fname = str(path.split('/')[2])
    except: fname = "submit"
    user = get_username()
    submit = "/home/ma/emos/bin/trimurti.41r2"
    rid = submit + " %USER% %HOST% %ECF_RID:0% %ECF_JOB% $ECF_JOBOUT% "    
    ecf_home = pwd.getpwnam(user).pw_dir + "/ecflow_server"
    ecf_files = ecf_home + "/smsfiles"        
    ecf_include = ecf_home + "/include"
              
    client=" ECF_PASS=%ECF_PASS% ECF_NAME=%ECF_NAME% ecflow_client "
    hosts = ["cca", "ccb", "cct", "ecgb", "lxc", "opensuse131", "localhost"]
    suite = ecf.Suite(sname).add(
        Defstatus("suspended"),
        Variables(ECF_JOB_CMD= submit + " %USER% %HOST% %ECF_JOB% %ECF_JOBOUT%",
                  ECF_KILL_CMD= rid + "kill",
                  ECF_STATUS_CMD= rid + "status",
                  ECF_CHECK_CMD= rid + "check",
                  USER = user,
                  ECF_HOME= ecf_home,
                  ECF_FILES= ecf_files,
                  ECF_INCLUDE= ecf_include,
                  ECF_EXTN= ".sms",
                  HOST="localhost", LOGDIR= "%ECF_HOME%", SMSOUT= "%ECF_HOME%", # trap.h
                  ),
        Family("limits").add(Defstatus("complete"),
                             Label("memo", "use click-mouse-3-Edit to leave a message"),
                             Label("begin", "Edit-Pref-Admin + suite-node-click3-Begin"),
                             Label("example", "four cases: no job, no submit - no job but submit - job created but no submit - job + submit"),
                             Limit("tasks", 10)),
        Inlimit("/%s/limits:tasks" % sname),
        
        Task("dummy").add(Variables(ECF_DUMMY_TASK= 1)),
        Family("ssh_login").add(
            Label("one_liner", "dummy script, no need for job, execute ECF_JOB_CMD"),
            Repeat("HOST", start=hosts, kind="enumerated"),
            Variables(ECF_JOB_CMD= client + "--init;" +
                      # "xterm -T %HOST% -e ssh %HOST%;" + 
                      "ssh %HOST% pwd;" +
                      client + "--complete;"),
            Task("simple"),
        ),
        Family("preprocess").add(
            Task("preproc").add(
                Variables(ECF_JOB_CMD= client + " --init;"
                          + client + "--complete")),
            Task("simple").add(
                Label("info", ""),
                Event(2),
                Trigger("preproc eq complete")),
        ),
        Family("submit").add(
            Trigger(["ssh_login", ]),
            [ Family(host).add(
                Variables(HOST= host),
                Task("simple").add(Event(1),
                                   Meter("step", -1, 100, 90),
                                   Label("info", "")),
            ) for host in hosts ]
        ),
        Family("process").add(
            Label("info", "outer-follower-family + inner-run-sometime example"),
            Label("stop", "at first abort"),
            Trigger("./process ne aborted"),
            Family("daily").add(
                Repeat("YMD", 20160101, 20321212, kind="date"),
                Task("simple"),
                Family("decade").add(
                    Label("info", "Show-Icons-Complete"),
                    Complete("../daily:YMD % 10 ne 0"),
                    Task("simple")),
            ),
            Family("monthly").add(
                Trigger("monthly:YM lt daily:YMD / 100 or daily eq complete"),
                Repeat(name="YM", start=
                       [ "%d" % ( YM) 
                         for YM in range(201601, 2013212203212+1) 
                         if (YM % 100) < 13],
                       kind="enum"),
                Task("simple"),
                Family("odd").add(
                    Complete("../monthly:YM % 2 eq 0"),
                    Task("simple")),
),
            Family("yearly").add(
                Repeat("Y", 2016, 2032, kind="integer"),
                Trigger("yearly:Y lt daily:YMD / 10000 or daily eq complete"),
                Task("simple"),
                Family("decade").add(
                    Complete("../yearly:Y % 10 ne 0"),
                    Task("simple")),
                Family("century").add(
                    Complete("../yearly:Y % 100 ne 0"),
                    Task("simple")),
            )
        )
    )
    # sh.mkdir(ecf_files);     sh.mkdir( ecf_include);     sh.ln("-sf ~emos/def/o/trap.h %s" % ecf_include)
    os.system(""
+ "mkdir %s;" % ecf_files 
+ "mkdir %s;" % ecf_include 
+ "ln -sf ~emos/def/o/include/trap.h %s;" % ecf_include # trap + init + lxop .running
              + "touch %s/rcp.h;" % ecf_include 
              + "touch %s/env_setup.h;" % ecf_include 
# +"ln -sf ~emos/def/o/include/env_setup.h %s;" % ecf_include # link PBS job to .running
# +"ln -sf ~emos/def/o/include/rcp.h %s;" % ecf_include # rapatriate remote job@completion
    )
    create_wrapper(ecf_files + "/simple.sms", task_content)
    create_wrapper(ecf_files + "/preproc.sms", inc_content)
    defs.add_suite(suite)
    print "#MSG: replacing", path, env["host"], env["port"]
    ecf.Client(env["host"], env["port"]).replace(path, defs, 1, 1)
class Seed(object):
    """ find min/max meter value below a node """
    def __init__(self, host=None, port=None, path=None, task=None):
        import signal
        if host is None:
            host = os.getenv("ECF_NODE", "$ECF_NODE$")
        if port is None and not MICRO[0] in host:
            try: port = int(os.getenv("ECF_PORT", "$ECF_PORT$"))
            except: port = 31415
        if host and port:
            self.clt = ec.Client(host, port)
        else: self.clt = None
        self.cl2 = None
        if path is None:
            if "$SUITE$" != "/admin" and not "SUITE" in "$SUITE$":
                self.path = ( "/$SUITE$/$FAMILY$/%s" % legs[0],
                              "/$SUITE$/$FAMILY$/%s" % legs[1],
                          )
            else: self.path = MC_STEPS_LIST
        else: self.path = (path, )
        if task is None:
            self.task = TARGET_TASK
        else: self.task = task
        # shall make sense when processed into job by ecflow
        # name remains when not processed...
        ecfv = {"ECF_NODE": "$ECF_NODE$",
                "ECF_PASS": "$ECF_PASS$",
                "ECF_NAME": "$ECF_NAME$",
                "ECF_PORT": "$ECF_PORT$",
                "ECF_TRYNO": "$ECF_TRYNO$",        }
        for key, val in ecfv.items():
            if key in val:
                ecfv[key] = os.getenv(key, None)
        if ecfv["ECF_NODE"] and ecfv["ECF_NAME"]:
            print "#MSG will communicate with server..."
            print "#kill: ssh %s kill -15 %d" % (ecfv["ECF_NODE"], os.getpid())
            self.cl2 = ec.Client()
            self.cl2.set_host_port(ecfv["ECF_NODE"], ecfv["ECF_PORT"])
            self.cl2.set_child_pid(os.getpid())
            self.cl2.set_child_path(ecfv["ECF_NAME"])
            self.cl2.set_child_password(ecfv["ECF_PASS"])
            self.cl2.set_child_try_no(int(ecfv["ECF_TRYNO"]))
            self.cl2.child_init()
            self.cl2.set_child_timeout(20)
            for sig in (signal.SIGINT,
                        signal.SIGHUP,
                        signal.SIGQUIT,
                        signal.SIGILL,
                        signal.SIGTRAP,
                        signal.SIGIOT,
                        signal.SIGBUS,
                        signal.SIGFPE,
                        signal.SIGUSR1,
                        signal.SIGUSR2,
                        signal.SIGPIPE,
                        signal.SIGTERM,
                        signal.SIGXCPU,
                        signal.SIGPWR):
                signal.signal(sig, self.signal_handler)
    def signal_handler(self, signum, frame):
        """ catch signal """
        print 'Aborting: Signal handler called with signal ', signum
        self.cl2.child_abort("Signal handler called with signal " + str(signum))
    def report(self, msg, meter=None):
        """ communicate with ecFlow server """
        if not self.cl2:
            return
        if meter:
            self.cl2.child_meter(msg, meter)
        elif msg == "stop":
            self.cl2.child_complete()
            sys.exit(0)
        else: self.cl2.child_label("info", msg)
def usage():
    """ help """
    print """client
  -o: operational node is to be replaced, default is test node,
      provide this option BEFORE -r
  -r: replace task node
  -h: this help
ECF_NODE=localhost ECF_PORT=31415 ./ecflow_suite.py -s start
"""
if __name__ == '__main__':
    try:
        OPTS, ARGS = getopt.getopt(
            sys.argv[1:], "hros:t:n:p:",
            ["help", "replace", "oper", "task", "suite", "node", "port"])
    except getopt.GetoptError as err:
        print "# what?", usage()
        sys.exit(2)
    TEST = 1
    PATH = None
    TASK = None
    SUITE = "start"
    PORT = None
    NODE = None
    for o, a in OPTS:
        if o in ("-r", "--replace", ):
            loader(TEST, host=NODE, port=PORT, path=TASK)
            sys.exit(0)
        # elif o in ("-p", "--path", ): PATH = a
        elif o in ("-o", "--oper", ):
            TEST = 0
        elif o in ("-t", "--task", ):
            TASK = a
        elif o in ("-s", "--suite", ): SUITE = a
        elif o in ("-n", "--node", ): NODE = a
        elif o in ("-p", "--port", ): PORT = a
"""
python ecflow_suite.py -s suite -p 1630 -n eurus -r 
"""

...

Code Block
languagebash
themeEclipse
titleload suite
python /home/ma/emos/def/o/admin/ecflow_suite.py -s suite -p $(($(id -u) + 1500)) -n localhost -r 


Center

suite template

...


Another example would be for seasonal suites, where forecast is on the first of the month, run few days after (here the 4th, in this example), where families may have to work with previous month date.

Code Block
            Family("seas").add(
                Label("info", "micro as tilde"),
                Variables(ECF_MICRO= "~"),
                Repeat(name="YM", start=
                       [ "%d" % ( YM) 
                         for YM in range(201601, 203212+1) 
                         if (YM % 100) < 13],
                       kind="enum"),
                Family("assim").add(
                    Variables(YMD= "$(if [[ ~YM~ = *01 ]]; then echo $((~YM~ - 89)); else echo $((~YM~ -1)); fi)01", ),
                    Label("info", "back one month"),
                    Task("impossible").add(Trigger("1==0")), # replace me
                ),
                Family("fc").add(
                    Variables(YMD= "~YM~01"),
                    Trigger("assim eq complete"),
                    Task("impossible").add(Trigger("1==0")), # replace me
                ),
                Family("hold").add(
                    Time("10:00"),
                    Date("4.*.*"),
                    Task("always").add(
                        Trigger("1==0"),
                        Complete("1==1")),
                ),

 

...



Center