- Created by Axel Bonet, last modified on Jan 25, 2017
You are viewing an old version of this page. View the current version.
Compare with Current View Page History
« Previous Version 2 Next »
From one simple task which both produces and consumes "steps", to the extreme parallel version with one task per step to process.
Alternatives Expand source
beg = 0 fin = 48 by = 3 def not_consumer(): return Variables(CONSUME= "no") def not_producer(): return Variables(PRODUCE= "no") def ev():return (Event("p"), Event("c")) def call_task(name, start, stop, inc): meter = None if start != stop: meter = Meter( "step", -1, int(stop)) return Task(name).add( ev(), Variables(BEG= start, FIN= stop, BY= inc), meter) def call_consumer(SELECTION): # defstatus suspended # avoid early start def consume1(leap = 1, leap_nb = 3): return [ Family("%d" % leap).add( call_task( "consume", "'%STEP%'", "'%STEP%'", by * leap_nb).add( Repeat("STEP", beg + by * (leap - 1), fin, by * leap_nb, kind="integer"), # same trigger as consume0 # Trigger("(%s and (consume:STEP le %s1/produce:STEP)) or " % (lead, prod) + # "(not %s and (consume:STEP le %s0/produce:step))" % (lead, prod)) Trigger("(consume:STEP le %s1/produce:STEP)" % prod), )) for leap in xrange(1, leap_nb) ] def consume2(beg, fin): return [ Family("%03d" % idx).add( call_task( "consume", idx, idx, by).add( Variables(STEP= idx), # Same trigger as consume0 # Trigger("(%s and (consume:STEP le %s1/produce:STEP)) or " % (lead, prod) + # "(not %s and (consume:STEP le %s0/produce:step))" % (lead, prod)))) Trigger("consume:STEP le %s1/produce:STEP" % prod), )) for idx in xrange(beg, fin+1, by )] lead = ic.psel() + "/consumer/admin/leader:1" prod = ic.psel() + "/consumer/produce" if 0: return Family("consumer").add(Defcomplete()) # FIXME return Family("consumer").add( Defcomplete(), Variables(SLEEP= 10, PRODUCE= "no", CONSUME= "no"), Family("limit").add( Defcomplete(), Limit("consume", 7),), Family("admin").add( # set manually with Xcdp or alter the event 1 so # that producer 1 becomes leader # default is producer0 leads Task("leader").add( Event("1"), Defcomplete())), # text this task is dummy task not designed to run # default : task does both : Variables(PRODUCE= "yes", CONSUME= "yes"), # this task will do both, ie serial call_task( "produce", beg, fin, by).add( Label("info", "do both at once"),), # this will loop inside the task, reporting # its processed step as a meter indication Family("produce0").add( not_consumer(), call_task( "produce", beg, fin, by )), # here, choice is to push a new task for each step Family("produce1").add( not_consumer(), call_task( "produce", '%STEP%', "%STEP%", by ).add( Repeat("STEP", beg, fin, by , kind="integer"), )), # PRB edit FIN '$((%STEP% + %BY%))' Family("consume").add( not_producer(), Inlimit("limit:consume"), Variables(CALL_WAITER= 1, # $step will be interpreted in the job! TRIGGER= "../produce:step -gt consume:$step or ../produce eq complete"), call_task( "consume", beg, fin, by ).add( )), Family("consume0or1").add( not_producer(), Inlimit("limit:consume"), call_task( "consume", "%STEP%", "%STEP%", by, ), Repeat( "STEP", beg, fin, by, kind="integer"), Trigger("(%s and (consume0or1:STEP le %s1/produce:STEP)) or " % (lead, prod) + "(not %s and (consume0or1:STEP le %s0/produce:step))" % (lead, prod))), Family("consume1").add( not_producer(), Inlimit("limit:consume"), consume1()), Family("consume2").add( # loop is ``exploded'' # consume limit may be changed manually with Xcdp # to reduce or increase the load Inlimit("limit:consume"), not_producer(), consume2(beg, fin)))
- No labels