Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Section


Column

(Expanded) Definition file:

It uses the following keywords to define nodes and their attributes:

  • suite family task endsuite endfamily endtask
  • autocancel automigrate autorestore clock complete cron date day defstatus edit event extern inlimit label late limit meter repeat time today trigger
  • on a line, text beyond # is a comment

 Compared with SMS, the keywords text and owner have been left behind.

autocancel
      for a node to be deleted automatically
      autocancel +01:00  # cancel one hour after complete
      autocancel 10      # cancel 10 days after complete
      autocancel 0       # cancel immediately after being complete
clock
      clock real # hybrid may be used in test mode
complete
      for a node, to be recursively forced complete from a condition
      complete t1:1 or t1==complete
cron
    to run a task regularly, task is requeued as soon as complete is received
    ie no trigger on the parent task complete shall be used
    task can only become complete, thanks to inherited defstatus or complete attribute
    cron 23:00                     # at next 23:00
    cron 10:00 20:00 01:00 # every hour from 10am to 8pm
date
    date 25.12.2012
    date 01.*.*
day
    day monday # sunday,monday,tuesday,wednesday,thursday,friday,saturday
defstatus
    defstatus complete #unknown,suspended,queued,submitted,active,aborted
edit
    to attach a variable definition to a node
    edit variable value
    # variables to be found and replaced in a task wrapper
    edit COMMAND  "echo OK" # %COMMAND:sleep 1%
    edit TRIGGER  "t1==complete" # ecflow_client --wait="%TRIGGER:1==1%"
event
      event 1 # may fit a call in task.ecf to 'ecflow_client --event=1'
      event ready # ecflow_client --event=ready
extern
      extern /path/to/a/external/node # to allow path's use in trigger/complete
inlimit
      register the node and its kids to a limit
      inlimit /limits:hpc
      inlimit /suite/limits:hpc
      inlimit /suite/limits:hpc 10
label
      label name "default message" # task.ecf: ecflow_client --label=name "label update"
late
     late -s +00:15 -a 20:00 -c +02:00
limit
     limit hpc 500
meter
     meter name -1 100 90 # 90 is threshold (optional) # task.ecf: ecflow_client --meter=name 30
repeat
      repeat is incremented when all nodes below are complete
      an aborted task DOES prevent repeat from incrementing
      an Operator/Analyst/dedicated task can help carry on
      repeat day        step [ENDDATE]     # only for suites
      repeat integer    VARIABLE start end [step]
      repeat enumerated VARIABLE first [second [third ...]]
      repeat string     VARIABLE str1 [str2 ...]
      repeat date       VARIABLE yyyymmdd yyyymmdd [delta]
time
    task becomes complete ONLY when the time range is over
    better not to use such task in a trigger expression
    time 23:00                # at next 23:00
    time 10:00 20:00 01:00    # every hour from 10am to 8pm
    time +00:01               # one minute after the suite begins
    time +00:10 01:00 00:05   # 10-60 min after begin every 5 min
today
    with such an attribute, the task will start straight away when loaded/replaced after the given time
    while time attribute would make it wait the next day
    today 3:00                          # today at 3:00
    today 10:00 20:00 01:00    # every hour from 10am to 8pm
trigger      
    for a task to wait for the right condition (step/meter/status/variable(int)) before starting


Column

Py-Def

As soon as a definition file grows beyond a few hundred lines, or even earlier when obviously repeated patterns appear, a language like Python should be used to generate it. At the Centre, a Python module is used in both research and operations to reduce verbosity in suite definitions (/home/ma/emos/def/o/def/ecf.py).

Code Block
#!/usr/bin/env python
"""Minimal suite definition written with the site helper module ecf.py.

Builds a suite "test" holding one family of tasks and, when run as a
script, loads it into the server on localhost at port 1500 + <uid>.
"""
# os is used below (getenv, getuid): import it explicitly instead of
# relying on the star import to provide it
import os
import pwd
import sys

sys.path.append('/home/ma/emos/def/o/def')  # location of ecf.py
# ipython # import ecf; help(ecf.<tab>)
from ecf import *

defs = Defs()


def fill():  # functions can generate tasks/families
    """Return the tasks t01 .. t10, each with an event, a meter and a label."""
    return [Task("t%02d" % i).add(Event(1),
                                  Meter("step", -1, 100),
                                  Label("info", ""))
            for i in range(1, 10 + 1)]  # LIST COMPREHENSION


home = os.getenv("HOME") + "/ecflow_server"
top = Suite("test").add(
    Edit(ECF_HOME=home,                  # where job, local .out go
         ECF_FILES=home + "/files",      # where .ecf are found
         ECF_INCLUDE=home + "/include",  # where .h are found
         # output remote/local location, create missing directories...
         # (the "/" separator was missing, giving "<home>out")
         ECF_OUT=home + "/out",
         ),
    Family("fam").add(
        Task("t00").add(
            Trigger("t01==complete"),
            Complete("t02:1 or t02==complete")),
        fill()))

if __name__ == '__main__':
    uid = pwd.getpwnam(pwd.getpwuid(os.getuid())[0]).pw_uid
    host = "localhost"
    # was "1500+ui": an undefined name, the script failed with NameError
    client = Client(host, 1500 + uid)
    path = "/test"
    defs.add_suite(top)
    client.replace(path, defs)

Task-Wrapper (ecf-file) and header-files

Code Block
languagebash
titleecf-file
#!/usr/bin/env ksh
# Task wrapper template. Keep the micro character out of added comments:
# the preprocessor would try to expand it there too.
%manual
  DESCRIPTION: ...
    input(s): ...
    output(s): ...
  OPERATORS: ...
  ANALYST: ...
%end
%comment 
  # ...
%end
%include <qsub.h>
%include <head.h>
# main section
 %COMMAND:printenv% # a variable may contain a command

# or an embedded blocking/trigger condition
ecflow_client --wait="%TRIGGER:1==1%"

# child commands: the event, meter and label names must match the
# attributes attached to the task in the suite definition
ecflow_client --event=1
ecflow_client --meter=step 30
# the label command takes the label name first, then the new text
ecflow_client --label=info "updating"
%nopp
  # no preprocessing, here
%end
# a directive to include a file without preprocessing
cat > test.pl <<\EOF
%includenopp <test.pl>
EOF
%ecfmicro @
# from now _at_ is the micro character for directives and variables
# ...
# and revert to percent:
@ecfmicro %
%include <tail.h>


Code Block
languagebash
titlehead.h
#!/bin/ksh
# Job header: export the settings needed for any communication with ECF,
# install the error trap, then report the job start.
export ECF_PORT=%ECF_PORT%    # The server port number
export ECF_HOST=%ECF_HOST%    # The name of ecf host that issued this task
export ECF_NAME=%ECF_NAME%    # The name of this current task
# A unique password. It can be set as FREE on the server with the menu
# ecFlowUI=>Special=>FreePassword, to accept communication with a
# "zombie" holding an invalid pass.
export ECF_PASS=%ECF_PASS%
export ECF_RID=$$             # record the process id. Used for zombie detection

set -eux
export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH

# Error handler: report the abort to the server and end the script.
ERROR() {
   set +e                       # do not stop on errors inside the handler
   wait                         # wait for background processes
   ecflow_client --abort=trap
   trap 0                       # remove the exit trap before leaving
   exit 0
}

# run the handler on script exit (0) and on the listed signals
trap '{ ERROR ; }' 0 1 2 3 4 5 6 7 8 10 12 13 15

ecflow_client --init=$$


Code Block
languagebash
titletail.h
# Job trailer: normal end of the task.
wait                        # wait for background processes
ecflow_client --complete    # report completion
trap 0                      # remove the exit trap
exit 0



...

Section
bordertrue

Custom GUI

Column

Tkinter can be used to set up a simple GUI client. The suite traverser is described in the cookbook.

As an example, overview.py is provided, while SMS/CDP has a similar command available.

Code Block
languagepy
themeEclipse
titleoverview
linenumberstrue
collapsetrue
#!/usr/bin/env python
""" tkinter use example with ecflow
  from a NCEP request, while overview was part of CDP commands
"""

import Tkinter as tki
import ecflow  as ec
from threading import Thread
import Queue, sys, time, os, pwd
from scrolledlist import ScrolledList
# thanks to NMT 
# from=http://infohost.nmt.edu/tcc/help/lang/python/examples/scrolledlist/
# firefox $from ; wget $from/scrolledlist.py

# window title
PROGRAM_NAME = "ecflowview-overview"
# fonts, as Tk (family, size[, style]) tuples
BUTTON_FONT = ('times', 12)
MONO_FONT = ('lucidatypewriter', 14, 'bold')
DEBUG = 0

# node status name -> colour; used as the caption background of a TaskList
COLORS = {
    "aborted": "red",
    "active": "green",
    "submitted": "cyan",
    "complete": "yellow",
    "suspended": "orange",
    "queued": "blue",
    "unknown": "grey",  # key was misspelt "unknwon"
}

class Label(object):
    """Holder for one shared text variable, replacing a former global."""
    inst = None

    def __init__(self, item):
        """Register *item* (an object with a ``set`` method) as the shared one."""
        Label.inst = item

    @classmethod
    def update(cls):
        """Store the current local time, formatted, in the registered item.

        Does nothing while no item has been registered.
        """
        target = Label.inst
        if target is not None:
            stamp = time.strftime("%a, %d %b %Y %H:%M:%S")
            target.set(stamp)

class MenuBar(tki.Frame):
    """Button row holding an 'Update' button and a 'Help' drop-down menu."""

    def __init__(self, parent):
        """Create both buttons.

        parent -- the enclosing widget; its ``update`` attribute is used as
                  the command of the 'Update' button.
        """
        tki.Frame.__init__(self, parent)
        self.__helpButton = self.__createHelp()
        self.__helpButton.grid(row=0, column=3)
        self.__updateButton = tki.Button(
            self, text='Update',
            font=BUTTON_FONT,
            command=parent.update)
        self.__updateButton.grid(row=0, column=2)

    def __createHelp(self):
        """Return the 'Help' menu button with its two documentation links."""
        mb = tki.Menubutton(self, font=BUTTON_FONT,
                            relief=tki.RAISED,
                            text='Help')
        menu = tki.Menu(mb)
        mb['menu'] = menu
        # the host name read "softwareconfluence.ecmwf.int": two host names
        # fused together, which cannot be resolved
        url = "https://confluence.ecmwf.int/wiki/display/ECFLOW/Documentation"

        def url1(): self.__url(url=url)

        def url2(): self.__url(url="http://effbot.org/tkinterbook/")
        menu.add_command(command=url1, label="confluence tutorial?")
        menu.add_command(command=url2, label="tkinter?")
        return mb

    def __url(self, url=None):
        """Open *url* in a web browser; do nothing when *url* is None."""
        if url is None:
            return
        # local import keeps this block self-contained
        import webbrowser
        # os.system("firefox " + url) waits for the command to return, which
        # can freeze the GUI, and it requires firefox to be on the PATH
        webbrowser.open(url)

class TaskList(tki.Frame):
    """A coloured caption above a scrolled list, for one node status."""
    NAME_WIDTH = 40
    NAME_LINES = 80

    def __init__(self, parent, kind):
        """Build the caption and the list.

        parent -- enclosing widget
        kind   -- status name; used as caption text and as key into COLORS
        """
        tki.Frame.__init__(self, parent)
        self.__kind = kind
        self.__callback = None  # passed to the list as its callback

        caption = tki.Label(self, text=kind,
                            background=COLORS[kind],
                            font=BUTTON_FONT)
        caption.grid(row=0, column=0, sticky=tki.W)
        self.__label = caption

        listing = ScrolledList(self,
                               width=self.NAME_WIDTH,
                               height=self.NAME_LINES,
                               callback=self.__callback)
        listing.grid(row=1, column=0)
        self.__scrolledList = listing

    def insert(self, path):
        """Append *path* to the list."""
        self.__scrolledList.append(path)

    def clear(self):
        """Remove every entry from the list."""
        self.__scrolledList.clear()

# shared stop flag: set running[0] to False to end the PaceKeeper thread
running = [True]


class PaceKeeper(object):
    """Background timer that queues a refresh request every PACE seconds."""
    PACE = 60

    def __init__(self, item, queue):
        """Start the timer thread.

        item  -- object whose ``update`` method is put on the queue
        queue -- object with a ``put`` method, read by the consumer
        """
        self._item = item
        thr = Thread(target=self.process,
                     args=(queue, running))
        # daemon thread: it must not keep the process alive once the main
        # thread has finished
        thr.daemon = True
        thr.start()

    def process(self, queue, running):
        """Queue ``item.update`` every PACE seconds while running[0] is true.

        running -- one-element list used as a mutable flag. The list itself
                   is always true, so its first element has to be tested.
        """
        while running[0]:
            queue.put(self._item.update)
            time.sleep(self.PACE)

    def run(self): self.update()

    def update(self, verbose=False):
        """Call ``item.update`` directly, forever, every PACE seconds.

        verbose -- unused
        """
        while True:
            print(time.clock())
            self._item.update()
            time.sleep(self.PACE)
    
class Client(object):
    """A class to focus on client/ecFlow-server communication.

    A server is designated by a string "[nick-]host@port", for instance
    "local-localhost@31415". Without a nick, "host@port" is used as nick.
    """

    def __init__(self, one="local-localhost@31415"):
        """Parse *one* and create the underlying ecflow client.

        Anything that cannot be split as host@port falls back to
        localhost, port 31415.
        """
        try:
            nick, hhh = one.split("-")
        except (AttributeError, ValueError):
            hhh = one
            nick = None
        try:
            host, port = hhh.split("@")
        except (AttributeError, ValueError):
            host = "localhost"
            port = 31415

        if nick is None:
            # %s, not %d: a port parsed from the string is still a string.
            # The derived nick used to be overwritten with None afterwards.
            nick = "%s@%s" % (host, port)
        print("# client creation %s %s %s" % (nick, host, port))
        self.nick = nick
        # kept for the message in process(), which reads them
        self.host = host
        self.port = port
        self.client = ec.Client(host, port)

    def process(self, win):
        """Fetch the server definition and list each task under its status.

        win -- dict mapping a status name to an object with ``insert``
        """
        Label.update()
        self.client.sync_local()
        defs = self.client.get_defs()
        Label.update()
        if defs is None:
            # the format used to be "%s-%:", which is not a valid format
            print("# %s-%s: empty content" % (self.host, self.port))
            return  # nothing to walk through
        for suite in defs.suites:
            self.process_nc(suite, win)

    def process_nc(self, node, win):
        """Walk the children of *node*, processing every task found."""
        for item in node.nodes:
            if isinstance(item, ec.Task):
                self.process_node(item, win)
            else:
                self.process_nc(item, win)

    def process_node(self, node, wins):
        """Insert "nick:path" in the entry of *wins* matching the node status."""
        status = "%s" % node.get_state()
        if status in wins:
            wins[status].insert(
                "%s:%s" % (self.nick, node.get_abs_node_path()))

class Application(tki.Frame):
    """Main frame: menu bar, clock label and one TaskList per status.

    ``update`` keeps its name because MenuBar and PaceKeeper refer to it;
    note that it shadows the Tkinter widget method of the same name.
    """

    def __init__(self, master=None, client=None, queue=None):
        """Build the widgets and start polling the queue.

        client -- None (one default Client), a single client, or a
                  list/tuple/set of clients (one nested level is flattened)
        queue  -- queue of refresh requests; no polling when None
        """
        tki.Frame.__init__(self, master)
        if client is None:
            # Client() relies on the class default designation; the
            # nick-less "localhost@31415" form could fail on creation
            entries = [Client()]
        elif isinstance(client, (list, tuple, set)):
            entries = client
        else:
            entries = [client]
        # keep one flat list, so that update() treats every client alike
        self.__clients = []
        for entry in entries:
            if isinstance(entry, (list, tuple, set)):
                self.__clients.extend(entry)
            else:
                self.__clients.append(entry)

        self.__queue = queue
        width = 640
        height = 780
        # the canvas is only used for its after() timer
        self.canvas = tki.Canvas(width=width, height=height, bg='black')
        self.grid()
        self.createWidgets()
        if queue is not None:
            self.canvas.after(50, self.check_queue)

    def createWidgets(self):
        """Create the menu bar, the clock label and the task lists."""
        rowx = 1
        Label(tki.StringVar(self))  # registers the shared clock variable
        root = self
        self.__menuBar = MenuBar(root)
        self.__menuBar.grid(row=0, column=0, sticky=tki.W)
        self.label = tki.Label(root, textvariable=Label.inst)
        self.label.grid(row=0, column=2, sticky=tki.E)
        self.__wins = dict()
        rowx += 1
        colx = 0
        kinds = ("active", "aborted", "submitted")
        for kind in kinds:
            self.__wins[kind] = TaskList(root, kind)
            self.__wins[kind].grid(row=rowx, column=colx,
                                   sticky=tki.S + tki.E + tki.W)
            colx += 1
        self.update()

    def check_queue(self):
        """Refresh when a request is waiting, then poll again in 50 ms."""
        try:
            self.__queue.get(block=False)
        except Queue.Empty:
            pass
        else:
            self.update()
        self.canvas.after(50, self.check_queue)

    def update(self):
        """Clear every list and fill it again from every client."""
        Label.update()
        for win in self.__wins.values():
            win.clear()
        for client in self.__clients:
            # best effort: a failing client must not stop the others,
            # but the failure is reported instead of being hidden
            try:
                client.process(self.__wins)
            except Exception as exc:
                sys.stderr.write("# %s: update failed: %s\n" % (
                    getattr(client, "nick", client), exc))

def get_username():
    """Return the first field (the login name) of the password database
    entry for the real user id of the process."""
    entry = pwd.getpwuid(os.getuid())
    return entry[0]
def get_uid():
    """Return the user id recorded in the password database for the name
    given by get_username()."""
    name = get_username()
    return pwd.getpwnam(name).pw_uid

if __name__ == '__main__':
    # usage: overview.py [[nick-]host@port ...]
    # without argument, a server on localhost at port 1500 + uid is used
    try:
        port = 1500 + int(get_uid())
    except (KeyError, TypeError, ValueError):
        port = 31415

    # sys.argv always holds the program name, so "len(sys.argv) == 0" was
    # never true and a run without argument created no client at all
    targets = sys.argv[1:]
    if not targets:
        # "nick-host@port": the former "localhost%d" had no "@" separator,
        # so the computed port was ignored
        clients = [Client("local-localhost@%d" % port)]
    else:
        clients = [Client(target) for target in targets]

    queue = Queue.Queue()
    # app = AppSms(host, port, queue)
    app = Application(client=clients, queue=queue)
    app.master.title(PROGRAM_NAME)
    app.columnconfigure(0, weight=1)
    app.rowconfigure(0, weight=1)

    PaceKeeper(app, queue)
    app.mainloop()



...