# Default STEP range used by the builders below:
# first value, last value and increment.
beg, fin, by = 0, 48, 3
def not_consumer():
    """Return a ``Variables`` attribute that sets CONSUME to "no"."""
    return Variables(CONSUME="no")
def not_producer():
    """Return a ``Variables`` attribute that sets PRODUCE to "no"."""
    return Variables(PRODUCE="no")
def ev():
    """Return a 2-tuple holding the events named "p" and "c", in that order."""
    event_p = Event("p")
    event_c = Event("c")
    return (event_p, event_c)
def call_task(name, start, stop, inc):
    """Build the task *name* with its events, step variables and meter.

    The task is given, in this order:

    * the pair of events returned by ``ev()``;
    * the variables BEG, FIN and BY, set to *start*, *stop* and *inc*;
    * ``Meter("step", -1, int(stop))`` when *start* differs from *stop*,
      otherwise ``None``.

    Returns whatever ``Task(name).add(...)`` returns.
    """
    # A meter is only attached when the task covers more than one step;
    # callers passing the same placeholder string twice get no meter.
    meter = Meter("step", -1, int(stop)) if start != stop else None
    task = Task(name)
    return task.add(ev(), Variables(BEG=start, FIN=stop, BY=inc), meter)
def call_consumer(SELECTION):
    """Build the "consumer" family holding several producer/consumer layouts.

    The returned family is marked with ``Defcomplete()`` and contains:

    * ``limit``       -- family carrying the limit "consume" (value 7).
    * ``admin``       -- family with the task "leader" and its event "1",
                         which the ``consume0or1`` trigger tests.
    * ``produce``     -- task with PRODUCE and CONSUME both set to "yes".
    * ``produce0``    -- one task called with the whole beg..fin range.
    * ``produce1``    -- one task carrying an integer repeat on STEP.
    * ``consume``     -- one task called with the whole range; the family
                         sets the CALL_WAITER and TRIGGER variables.
    * ``consume0or1`` -- family repeated on STEP whose trigger follows
                         produce1 when the leader event is set and
                         produce0 otherwise.
    * ``consume1``    -- the families returned by ``consume1()``.
    * ``consume2``    -- the families returned by ``consume2(beg, fin)``.

    Args:
        SELECTION: not used by this function; kept so existing callers
            keep working.

    Returns:
        The ``Family("consumer")`` node with all children added.
    """
    # defstatus suspended  # avoid early start

    # Node paths used to build the trigger expressions below.
    lead = ic.psel() + "/consumer/admin/leader:1"
    prod = ic.psel() + "/consumer/produce"

    def consume1(leap=1, leap_nb=3):
        """Return families "1", "2", ... each with its own STEP repeat.

        Every family holds one "consume" task repeated on STEP with a
        stride of ``by * leap_nb``, starting at ``beg + by * (n - 1)``
        for family number ``n``.

        ``leap`` is kept for signature compatibility only: the loop
        variable has always shadowed it, so its value has no effect.
        """
        # NOTE(review): range(1, leap_nb) yields leap_nb - 1 families, so
        # with a stride of by * leap_nb the steps starting at
        # beg + by * (leap_nb - 1) are not covered -- confirm intended.
        #
        # Only the produce1 branch of the consume0or1 trigger is used
        # here; the leader-aware variant is deliberately not applied.
        return [
            Family("%d" % leap).add(
                call_task("consume", "'%STEP%'", "'%STEP%'",
                          by * leap_nb).add(
                    Repeat("STEP", beg + by * (leap - 1), fin, by * leap_nb,
                           kind="integer"),
                    Trigger("(consume:STEP le %s1/produce:STEP)" % prod),
                ))
            # range() instead of xrange(): identical iteration on
            # Python 2, and does not raise NameError on Python 3.
            for leap in range(1, leap_nb)
        ]

    def consume2(beg, fin):
        """Return one family per step from *beg* to *fin* inclusive.

        Each family is named after the zero-padded step ("%03d") and
        holds one "consume" task with STEP set to that step.
        """
        # Only the produce1 branch of the consume0or1 trigger is used
        # here; the leader-aware variant is deliberately not applied.
        return [
            Family("%03d" % idx).add(
                call_task("consume", idx, idx, by).add(
                    Variables(STEP=idx),
                    Trigger("consume:STEP le %s1/produce:STEP" % prod),
                ))
            for idx in range(beg, fin + 1, by)
        ]

    # Disabled short-circuit: flip to 1 to return an empty, complete
    # "consumer" family instead of the full tree.
    if 0:
        return Family("consumer").add(Defcomplete())  # FIXME

    return Family("consumer").add(
        Defcomplete(),
        Variables(SLEEP=10,
                  PRODUCE="no",
                  CONSUME="no"),

        Family("limit").add(
            Defcomplete(),
            Limit("consume", 7),
        ),

        Family("admin").add(
            # Set manually with Xcdp, or alter the event 1, so that
            # producer 1 becomes leader; by default producer0 leads.
            Task("leader").add(
                Event("1"),
                Defcomplete())),

        # "leader" above is a dummy task, not designed to run.

        # Default: a task does both producing and consuming.
        Variables(PRODUCE="yes",
                  CONSUME="yes"),

        # This task will do both, i.e. serially.
        call_task("produce", beg, fin, by).add(
            Label("info", "do both at once"),
        ),

        # This will loop inside the task, reporting its processed step
        # as a meter indication.
        Family("produce0").add(
            not_consumer(),
            call_task("produce", beg, fin, by)),

        # Here, the choice is to push a new task for each step.
        Family("produce1").add(
            not_consumer(),
            call_task("produce", '%STEP%', "%STEP%", by).add(
                Repeat("STEP", beg, fin, by, kind="integer"),
            )),

        # PRB edit FIN '$((%STEP% + %BY%))'
        Family("consume").add(
            not_producer(),
            Inlimit("limit:consume"),
            Variables(
                CALL_WAITER=1,
                # $step will be interpreted in the job!
                TRIGGER="../produce:step -gt consume:$step or ../produce eq complete"),
            call_task("consume", beg, fin, by).add(
            )),

        Family("consume0or1").add(
            not_producer(),
            Inlimit("limit:consume"),
            call_task("consume", "%STEP%", "%STEP%", by),
            Repeat("STEP", beg, fin, by, kind="integer"),
            # Follow produce1's STEP when the leader event is set,
            # produce0's "step" meter otherwise.
            Trigger("(%s and (consume0or1:STEP le %s1/produce:STEP)) or "
                    % (lead, prod) +
                    "(not %s and (consume0or1:STEP le %s0/produce:step))"
                    % (lead, prod))),

        Family("consume1").add(
            not_producer(),
            Inlimit("limit:consume"),
            consume1()),

        Family("consume2").add(
            # The loop is "exploded" into one family per step.
            # The consume limit may be changed manually with Xcdp to
            # reduce or increase the load.
            Inlimit("limit:consume"),
            not_producer(),
            consume2(beg, fin)))
|