ecFlow's documentation is now on readthedocs!

A few questions were about starting a project from scratch: creating a "template suite", or suite skeleton, and where to define tasks targeting one destination or another at the centre. The script below generates a simple task wrapper and links the trap.h from emos.

It is also common to design a suite with several families working at different frequencies (daily, monthly, yearly). This template provides such a structure as an example.

Another purpose of this example is to show that the "action" attribute was not preserved in ecFlow, although it can simply be replaced with a one-liner task. This use case is not advocated: it can be attractive for a simple "one-liner" command, but the error message, should there be any, may not be clear "when things go wrong".

ecflow_suite.py
 #!/usr/local/apps/python/current/bin/python
import getopt
import os
import pwd
# import sh
import sys
sys.path.append("/opt/cray/sdb/1.0-1.0502.50558.6.21.ari/lib64/py")
def get_username():
    """Return the login name of the user running this process."""
    entry = pwd.getpwuid(os.getuid())
    return entry.pw_name
def get_uid():
    """Return the numeric user id of the user running this process.

    The id is read straight from the process instead of being looked up
    again in the password database through the user name, so it also
    works for a uid that has no passwd entry.
    """
    return os.getuid()
"""
python docstring: demonstrate python cli + ecFlow task wrapper + task loader
$manual
DESCRIPTION:
sweeper:
find out min/max among modeleps_nemo tasks
OPERATORS: please, set complete if problematic
ANALYST: example as pure python task - job
$end
$comment
  comments can be added ...
$end
"""
# Directory put first on sys.path before the ecflow module is imported.
# NOTE: PATH is rebound to None by the command line section at the bottom
# of this script.
PATH = "/usr/local/apps/ecflow/4.0.9/lib/python2.7/site-packages/ecflow"
sys.path.insert(0, PATH)
import ecflow as ec
# MICRO[0] is used by Seed to detect a host name that still contains the
# micro character.
MICRO = "$$" # double dollar to please ecFlow micro character balance
# ksh task wrapper written to simple.sms by loader().
# The backslash of "\rm" is doubled on purpose: in a regular (non-raw)
# python string "\r" is a carriage return, which would corrupt the command
# written to the wrapper.
task_content = """#!/bin/ksh
#!/bin/bash # NOK while typeset -Z2 may be found... if [[ var =  @(v1) ]]; then ...
%include <trap.h>
inc=%ECF_HOME%/%SUITE%/%FAMILY%/preproc.job1
if [[ -f $inc ]]; then
  . $inc
# deterministic runs are lost:
\\rm -f $inc || :
fi
xevent 1
i=0; while ((i < 100)) ; do xmeter step $i; ((i+=1)); done
xlabel info "done"
trap 0; xcomplete; exit 0
"""
# Wrapper content written to preproc.sms by loader(): it calls xevent 2
# and sets the "info" label.
# NOTE(review): presumably the job generated from it is the preproc.job1
# file that the simple task wrapper sources when present -- confirm.
inc_content = """# request from Blazej?
xevent 2
xlabel info "%SUITE% %TASK%"
"""
def create_wrapper(name, content):
    """Write a task wrapper file.

    Arguments:
      name    -- path of the file to create (an existing file is overwritten)
      content -- text of the wrapper; a trailing newline is appended

    The file is closed even when the write fails.
    """
    with open(name, "w") as wrapper:
        wrapper.write(content)
        wrapper.write("\n")
def loader(test=1, host=None, port=None, path=None):
    """Build the template suite, write its task wrappers and load it.

    The suite definition is created with the ``ecf`` helper module, the
    ``simple.sms`` and ``preproc.sms`` wrappers are written below the
    user's ``~/ecflow_server/smsfiles`` directory, and the node ``path``
    is then replaced on the target ecFlow server.

    Arguments:
      test -- when true the server is taken from ECF_TEST_HOST and
              ECF_TEST_PORT, otherwise from ECF_OPER_HOST and ECF_OPER_PORT
      host -- server host, overrides the environment when set
      port -- server port, overrides the environment when set
      path -- node path to replace, default "/<SUITE>/submit"; its first
              component is used as suite name

    Relies on the module-level SUITE name, which is set by the command
    line section of this script.

    Raises RuntimeError when the selected host is not "eurus".
    """
    sys.path.append("/home/ma/emos/def/o/def")
    import ecf
    from ecf import (Suite, Family, Task, Variables, Trigger, Complete,
                     Repeat, Defstatus, Event, Meter, Label, Limit, Inlimit)

    if test:
        env = {"host": os.getenv("ECF_TEST_HOST", None),
               "port": os.getenv("ECF_TEST_PORT", None)}
    else:
        env = {"host": os.getenv("ECF_OPER_HOST", None),
               "port": os.getenv("ECF_OPER_PORT", None)}
    env["path"] = "/" + SUITE + "/submit"
    defs = ecf.Defs()

    if host:
        env["host"] = host
    if port:
        env["port"] = port
    # Safety guard kept from the original script: only "eurus" may be the
    # target. It used to be a bare "raise", which fails with an unrelated
    # error message.
    # NOTE(review): the documented "-n localhost" invocation is rejected by
    # this guard -- confirm whether it is still wanted.
    if env["host"] != "eurus":
        raise RuntimeError(
            "refusing to load on host %r (host argument: %r): "
            "only 'eurus' is accepted" % (env["host"], host))

    if path is None:
        path = env["path"]
    try:
        sname = str(path.split("/")[1])
    except (AttributeError, IndexError):
        # path without a second component (or not a string)
        sname = "suite"

    user = get_username()
    submit = "/home/ma/emos/bin/trimurti.41r2"
    # every variable is enclosed in a balanced pair of '%' micro characters
    rid = submit + " %USER% %HOST% %ECF_RID:0% %ECF_JOB% %ECF_JOBOUT% "
    ecf_home = pwd.getpwnam(user).pw_dir + "/ecflow_server"
    ecf_files = ecf_home + "/smsfiles"
    ecf_include = ecf_home + "/include"

    client = " ECF_PASS=%ECF_PASS% ECF_NAME=%ECF_NAME% ecflow_client "
    hosts = ["cca", "ccb", "cct", "ecgb", "lxc", "opensuse131", "localhost"]
    # YYYYMM values from 201601 to 203212, valid months (01..12) only
    months = ["%d" % ym
              for ym in range(201601, 203212 + 1)
              if 1 <= ym % 100 <= 12]

    suite = ecf.Suite(sname).add(
        Defstatus("suspended"),
        Variables(ECF_JOB_CMD=submit + " %USER% %HOST% %ECF_JOB% %ECF_JOBOUT%",
                  ECF_KILL_CMD=rid + "kill",
                  ECF_STATUS_CMD=rid + "status",
                  ECF_CHECK_CMD=rid + "check",
                  USER=user,
                  ECF_HOME=ecf_home,
                  ECF_FILES=ecf_files,
                  ECF_INCLUDE=ecf_include,
                  ECF_EXTN=".sms",
                  # trap.h
                  HOST="localhost", LOGDIR="%ECF_HOME%", SMSOUT="%ECF_HOME%",
                  ),
        Family("limits").add(
            Defstatus("complete"),
            Label("memo", "use click-mouse-3-Edit to leave a message"),
            Label("begin", "Edit-Pref-Admin + suite-node-click3-Begin"),
            Label("example", "four cases: no job, no submit - no job but submit - job created but no submit - job + submit"),
            Limit("tasks", 10)),
        Inlimit("/%s/limits:tasks" % sname),

        Task("dummy").add(Variables(ECF_DUMMY_TASK=1)),
        Family("ssh_login").add(
            Label("one_liner", "dummy script, no need for job, execute ECF_JOB_CMD"),
            Repeat("HOST", start=hosts, kind="enumerated"),
            Variables(ECF_JOB_CMD=client + "--init;" +
                      # "xterm -T %HOST% -e ssh %HOST%;" +
                      "ssh %HOST% pwd;" +
                      client + "--complete;"),
            Task("simple"),
        ),
        Family("preprocess").add(
            Task("preproc").add(
                Variables(ECF_JOB_CMD=client + " --init;"
                          + client + "--complete")),
            Task("simple").add(
                Label("info", ""),
                Event(2),
                Trigger("preproc eq complete")),
        ),
        Family("submit").add(
            Trigger(["ssh_login", ]),
            # one family per target host; the loop variable is not called
            # "host" so the argument of the same name is left untouched
            [Family(target).add(
                Variables(HOST=target),
                Task("simple").add(Event(1),
                                   Meter("step", -1, 100, 90),
                                   Label("info", "")),
            ) for target in hosts]
        ),
        Family("process").add(
            Label("info", "outer-follower-family + inner-run-sometime example"),
            Label("stop", "at first abort"),
            Trigger("./process ne aborted"),
            Family("daily").add(
                Repeat("YMD", 20160101, 20321212, kind="date"),
                Task("simple"),
                Family("decade").add(
                    Label("info", "Show-Icons-Complete"),
                    Complete("../daily:YMD % 10 ne 0"),
                    Task("simple")),
            ),
            Family("monthly").add(
                Trigger("monthly:YM lt daily:YMD / 100 or daily eq complete"),
                Repeat(name="YM", start=months, kind="enum"),
                Task("simple"),
                Family("odd").add(
                    Complete("../monthly:YM % 2 eq 0"),
                    Task("simple")),
            ),
            Family("yearly").add(
                Repeat("Y", 2016, 2032, kind="integer"),
                Trigger("yearly:Y lt daily:YMD / 10000 or daily eq complete"),
                Task("simple"),
                Family("decade").add(
                    Complete("../yearly:Y % 10 ne 0"),
                    Task("simple")),
                Family("century").add(
                    Complete("../yearly:Y % 100 ne 0"),
                    Task("simple")),
            )
        )
    )

    # Prepare the wrapper and include directories. The shell is needed for
    # the ~emos expansion; "mkdir -p" creates missing parents and does not
    # complain when the directory is already there.
    os.system("".join([
        "mkdir -p %s;" % ecf_files,
        "mkdir -p %s;" % ecf_include,
        # trap + init + lxop .running
        "ln -sf ~emos/def/o/include/trap.h %s;" % ecf_include,
        # rcp.h and env_setup.h are created empty; linking
        # ~emos/def/o/include/env_setup.h (link PBS job to .running) and
        # ~emos/def/o/include/rcp.h (rapatriate remote job@completion)
        # is the alternative
        "touch %s/rcp.h;" % ecf_include,
        "touch %s/env_setup.h;" % ecf_include,
    ]))
    create_wrapper(ecf_files + "/simple.sms", task_content)
    create_wrapper(ecf_files + "/preproc.sms", inc_content)
    defs.add_suite(suite)
    print("#MSG: replacing %s %s %s" % (path, env["host"], env["port"]))
    ecf.Client(env["host"], env["port"]).replace(path, defs, 1, 1)
class Seed(object):
    """Hold the ecFlow clients a pure python task uses to report progress.

    Originally described as "find min/max meter value below a node"; the
    class as written only sets up the clients and the reporting helpers.

    Attributes:
      clt  -- client created from host and port, None when either is unknown
      cl2  -- child client (init, meter, label, complete, abort), None
              unless both ECF_NODE and ECF_NAME are known
      path -- tuple of node paths
      task -- task name

    The "$NAME$" literals are meant to be substituted by ecFlow when this
    file is processed into a job; they keep their name when the file is
    not processed, in which case the process environment is used instead.
    They must therefore stay written as plain literals.
    """

    def __init__(self, host=None, port=None, path=None, task=None):
        import signal
        if host is None:
            host = os.getenv("ECF_NODE", "$ECF_NODE$")
        if port is None and MICRO[0] not in host:
            try:
                port = int(os.getenv("ECF_PORT", "$ECF_PORT$"))
            except ValueError:
                # unset variable and unprocessed placeholder
                port = 31415
        if host and port:
            self.clt = ec.Client(host, port)
        else:
            self.clt = None
        self.cl2 = None

        # NOTE(review): legs, MC_STEPS_LIST and TARGET_TASK are expected at
        # module level; they are not defined in the visible part of the file.
        if path is not None:
            self.path = (path, )
        elif "$SUITE$" != "/admin" and "SUITE" not in "$SUITE$":
            self.path = ("/$SUITE$/$FAMILY$/%s" % legs[0],
                         "/$SUITE$/$FAMILY$/%s" % legs[1],
                         )
        else:
            self.path = MC_STEPS_LIST
        if task is None:
            self.task = TARGET_TASK
        else:
            self.task = task

        # shall make sense when processed into job by ecflow
        # name remains when not processed...
        ecfv = {"ECF_NODE": "$ECF_NODE$",
                "ECF_PASS": "$ECF_PASS$",
                "ECF_NAME": "$ECF_NAME$",
                "ECF_PORT": "$ECF_PORT$",
                "ECF_TRYNO": "$ECF_TRYNO$", }
        # a value still holding its own name was not substituted: fall back
        # to the environment (iterate over a copy, values are reassigned)
        for key, val in list(ecfv.items()):
            if key in val:
                ecfv[key] = os.getenv(key, None)

        if ecfv["ECF_NODE"] and ecfv["ECF_NAME"]:
            print("#MSG will communicate with server...")
            print("#kill: ssh %s kill -15 %d" % (ecfv["ECF_NODE"], os.getpid()))
            self.cl2 = ec.Client()
            self.cl2.set_host_port(ecfv["ECF_NODE"], ecfv["ECF_PORT"])
            self.cl2.set_child_pid(os.getpid())
            self.cl2.set_child_path(ecfv["ECF_NAME"])
            self.cl2.set_child_password(ecfv["ECF_PASS"])
            # NOTE(review): ECF_TRYNO is not checked above, int() fails
            # when it is neither substituted nor in the environment
            self.cl2.set_child_try_no(int(ecfv["ECF_TRYNO"]))
            self.cl2.child_init()
            self.cl2.set_child_timeout(20)
            # report an abort to the server when the job is signalled
            for sig in (signal.SIGINT,
                        signal.SIGHUP,
                        signal.SIGQUIT,
                        signal.SIGILL,
                        signal.SIGTRAP,
                        signal.SIGIOT,
                        signal.SIGBUS,
                        signal.SIGFPE,
                        signal.SIGUSR1,
                        signal.SIGUSR2,
                        signal.SIGPIPE,
                        signal.SIGTERM,
                        signal.SIGXCPU,
                        signal.SIGPWR):
                signal.signal(sig, self.signal_handler)

    def signal_handler(self, signum, frame):
        """ catch signal: tell the server that the task aborted """
        # two spaces before the number: same output as the former
        # print statement
        print("Aborting: Signal handler called with signal  %s" % signum)
        self.cl2.child_abort("Signal handler called with signal " + str(signum))

    def report(self, msg, meter=None):
        """ communicate with ecFlow server

        Nothing is done without a child client. Otherwise:
          - meter given (0 included): msg is the meter name, meter its value
          - msg == "stop": the task is set complete and the process exits
          - anything else: msg is written to the "info" label
        """
        if not self.cl2:
            return
        # "is not None": a meter value of 0 must be sent as a meter too
        if meter is not None:
            self.cl2.child_meter(msg, meter)
        elif msg == "stop":
            self.cl2.child_complete()
            sys.exit(0)
        else:
            self.cl2.child_label("info", msg)
def usage():
    """ help: print the command line options on standard output """
    print("""client
  -o: operational node is to be replaced, default is test node,
      provide this option BEFORE -r
  -r: replace task node
  -s: suite name, default is start, provide this option BEFORE -r
  -t: path of the node to replace, default is /<suite>/submit,
      provide this option BEFORE -r
  -n: server host, default is taken from ECF_TEST_HOST (ECF_OPER_HOST
      with -o), provide this option BEFORE -r
  -p: server port, default is taken from ECF_TEST_PORT (ECF_OPER_PORT
      with -o), provide this option BEFORE -r
  -h: this help
ECF_NODE=localhost ECF_PORT=31415 ./ecflow_suite.py -s start
""")
if __name__ == '__main__':
    # Command line entry point. Options are processed in the order given
    # and -r acts immediately, so every other option must come before it.
    try:
        # long options taking a value need the trailing "="
        OPTS, ARGS = getopt.getopt(
            sys.argv[1:], "hros:t:n:p:",
            ["help", "replace", "oper", "task=", "suite=", "node=", "port="])
    except getopt.GetoptError as err:
        print("# what? %s" % err)
        usage()
        sys.exit(2)
    TEST = 1
    PATH = None
    TASK = None
    SUITE = "start"  # read by loader() as a module-level name
    PORT = None
    NODE = None
    for o, a in OPTS:
        if o in ("-h", "--help", ):
            usage()
            sys.exit(0)
        elif o in ("-r", "--replace", ):
            loader(TEST, host=NODE, port=PORT, path=TASK)
            sys.exit(0)
        elif o in ("-o", "--oper", ):
            TEST = 0
        elif o in ("-t", "--task", ):
            TASK = a
        elif o in ("-s", "--suite", ):
            SUITE = a
        elif o in ("-n", "--node", ):
            NODE = a
        elif o in ("-p", "--port", ):
            PORT = a
"""
python ecflow_suite.py -s suite -p 1630 -n eurus -r 
"""

This script can be run from emos def:

load suite
python /home/ma/emos/def/o/admin/ecflow_suite.py -s suite -p $(($(id -u) + 1500)) -n localhost -r 
This page contains macros or features from a plugin which requires a valid license.

You will need to contact your administrator.

suite template


Another example is for seasonal suites, where the forecast date is the first of the month but the run takes place a few days later (on the 4th, in this example), and where some families may have to work with the previous month's date.

            Family("seas").add(
                Label("info", "micro as tilde"),
                Variables(ECF_MICRO= "~"),
                Repeat(name="YM", start=
                       [ "%d" % ( YM) 
                         for YM in range(201601, 203212+1) 
                         if (YM % 100) < 13],
                       kind="enum"),
                Family("assim").add(
                    Variables(YMD= "$(if [[ ~YM~ = *01 ]]; then echo $((~YM~ - 89)); else echo $((~YM~ -1)); fi)01", ),
                    Label("info", "back one month"),
                    Task("impossible").add(Trigger("1==0")), # replace me
                ),
                Family("fc").add(
                    Variables(YMD= "~YM~01"),
                    Trigger("assim eq complete"),
                    Task("impossible").add(Trigger("1==0")), # replace me
                ),
                Family("hold").add(
                    Time("10:00"),
                    Date("4.*.*"),
                    Task("always").add(
                        Trigger("1==0"),
                        Complete("1==1")),
                ),