Programare Concurenta in Python Continuare

10
2015/03/22 15:26 1/10 Programare concurentă în Python (continuare) ASC Wiki - http://cs.curs.pub.ro/wiki/asc/ Programare concurentă în Python (continuare) Obiective Vom continua în cadrul acestui laborator prezentarea elementelor de sincronizare oferite de Python și nu numai. Obiecte de sincronizare Event Event-urile sunt obiecte simple de sincronizare care permit mai multor thread-uri blocarea voluntară până la apariția unui eveniment semnalat de un alt thread (ex: o condiție a devenit adevărată). Intern, un obiect Event conține un flag setat inițial la valoarea false. El oferă două operații de bază: set(), care setează valoarea flag-ului pe true, și wait(), care blochează execuția thread-urilor apelante până când flag-ul devine true. Dacă flag-ul este deja true în momentul apelării lui wait() aceasta nu va bloca execuția. În momentul setării flag-ului pe true toate thread-urile blocate în wait() vor fi deblocate și își vor continua execuția. Event-ul mai oferă și alte două operații: clear(), care setează flag-ul intern la valoarea false (resetează evenimentul) și is_set(), care oferă posibilitatea de interogare a valorii curente a flag-ului. Refolosirea obiectelor Event pentru a semnaliza un eveniment (cu metoda clear()) trebuie făcută cu grijă, deoarece clear() poate șterge a doua setare a flag-ului pe true înainte ca thread-ul care dorește să aștepte evenimentul să facă wait() pentru a doua oară. Acest lucru va duce cel mai probabil la blocarea nelimitată a acelui thread (deadlock). O altă problemă care poate apărea în cazul refolosirii unui obiect Event este că thread-ul care dorește așteptarea își poate continua execuția fără ca evenimentul să fie semnalizat a doua oară. Această situație apare atunci când flagul nu este resetat cu metoda clear() înainte de al doilea wait(), el rămânând astfel la valoarea true. Al doilea wait() nu va bloca execuția în acestă situație precum se dorește. Testarea valorii flag-ului cu is_set() într-o buclă, fără a executa calcule utile în acea buclă sau fără a apela metode blocante, reprezintă o formă de busy-waiting și trebuie evitată, deoarece, ca orice

description

jkb kj j

Transcript of Programare Concurenta in Python Continuare

  • 2015/03/22 15:26 1/10 Programare concurent n Python (continuare)

    ASC Wiki - http://cs.curs.pub.ro/wiki/asc/

    Programare concurent n Python(continuare)

    Obiective

    Vom continua n cadrul acestui laborator prezentarea elementelor de sincronizare oferite de Python inu numai.

    Obiecte de sincronizare

    Event

    Event-urile sunt obiecte simple de sincronizare care permit mai multor thread-uri blocarea voluntarpn la apariia unui eveniment semnalat de un alt thread (ex: o condiie a devenit adevrat). Intern,un obiect Event conine un flag setat iniial la valoarea false. El ofer dou operaii de baz: set(),care seteaz valoarea flag-ului pe true, i wait(), care blocheaz execuia thread-urilor apelante pncnd flag-ul devine true. Dac flag-ul este deja true n momentul apelrii lui wait() aceasta nu vabloca execuia. n momentul setrii flag-ului pe true toate thread-urile blocate n wait() vor fideblocate i i vor continua execuia.

    Event-ul mai ofer i alte dou operaii: clear(), care seteaz flag-ul intern la valoarea false(reseteaz evenimentul) i is_set(), care ofer posibilitatea de interogare a valorii curente a flag-ului.

    Refolosirea obiectelor Event pentru a semnaliza un eveniment (cu metoda clear()) trebuie fcut cugrij, deoarece clear() poate terge a doua setare a flag-ului pe true nainte ca thread-ul care doretes atepte evenimentul s fac wait() pentru a doua oar. Acest lucru va duce cel mai probabil lablocarea nelimitat a acelui thread (deadlock).

    O alt problem care poate aprea n cazul refolosirii unui obiect Event este c thread-ul care doreteateptarea i poate continua execuia fr ca evenimentul s fie semnalizat a doua oar. Aceastsituaie apare atunci cnd flagul nu este resetat cu metoda clear() nainte de al doilea wait(), elrmnnd astfel la valoarea true. Al doilea wait() nu va bloca execuia n acest situaie precum sedorete.

    Testarea valorii flag-ului cu is_set() ntr-o bucl, fr a executa calcule utile n acea bucl sau fr aapela metode blocante, reprezint o form de busy-waiting i trebuie evitat, deoarece, ca orice

  • Last update: 2015/03/22 00:56 asc:lab3:index http://cs.curs.pub.ro/wiki/asc/asc:lab3:index

    http://cs.curs.pub.ro/wiki/asc/ Printed on 2015/03/22 15:26

    busy-waiting, irosete timp de procesor, care ar putea fi altfel folosit de celelate thread-uri.

    Mai jos este prezentat un exemplu care folosete obiecte Event att pentru a atepta ndeplinireaunei condiii (work_available i result_available), ct i doar pentru a testa un flag (terminate).work_available este folosit pentru a bloca worker-ul atunci cnd nu are task-uri de procesat, iarresult_available este folosit pentru a bloca masterul ct timp worker-ul i proceseaz task-ul.terminate este folosit pentru a semnaliza worker-ului c trebuie s-i ncheie execuia. Observai cdei se folosete metoda is_set(), n bucl exist alte apeluri blocante, care fac worker-ul s testezeflagul doar la apariia unui task, deci nu putem spune c avem de-a face cu busy-waiting.

    event.py

    from threading import enumerate, Event, Thread class Master(Thread): def __init__(self, max_work, work_available, result_available): Thread.__init__(self, name = "Master") self.max_work = max_work self.work_available = work_available self.result_available = result_available def set_worker(self, worker): self.worker = worker def run(self): for i in xrange(self.max_work): # generate work self.work = i # notify worker self.work_available.set() # get result self.result_available.wait() self.result_available.clear() if self.get_work() + 1 != self.worker.get_result(): print "oops", print "%d -> %d" % (self.work, self.worker.get_result()) def get_work(self): return self.work class Worker(Thread): def __init__(self, terminate, work_available, result_available): Thread.__init__(self, name = "Worker") self.terminate = terminate self.work_available = work_available self.result_available = result_available def set_master(self, master): self.master = master def run(self):

  • 2015/03/22 15:26 3/10 Programare concurent n Python (continuare)

    ASC Wiki - http://cs.curs.pub.ro/wiki/asc/

    while(True): # wait work self.work_available.wait() self.work_available.clear() if(terminate.is_set()): break # generate result self.result = self.master.get_work() + 1 # notify master self.result_available.set() def get_result(self): return self.result if __name__ == "__main__": # create shared objects terminate = Event() work_available = Event() result_available = Event() # start worker and master w = Worker(terminate, work_available, result_available) m = Master(10, work_available, result_available) w.set_master(m) m.set_worker(w) w.start() m.start() # wait for master m.join() # wait for worker terminate.set() work_available.set() w.join() # print running threads for verification print enumerate()

    Condition

    Condition (sau variabil condiie) este un obiect de sincronizare care permite mai multor thread-uriblocarea voluntar pn la apariia unei condiii semnalate de un alt thread, asementor Event-urilor.Spre deosebire de acestea ns, un obiect Condition ofer un set de operaii diferit i este asociatntotdeauna cu un lock. Lock-ul este creat implicit la instanierea obiectului Condition sau poate fipasat prin intermediul constructorului dac mai multe obiecte Condition trebuie s partajeze acelailock.

  • Last update: 2015/03/22 00:56 asc:lab3:index http://cs.curs.pub.ro/wiki/asc/asc:lab3:index

    http://cs.curs.pub.ro/wiki/asc/ Printed on 2015/03/22 15:26

    Un obiect Condition ofer operaiile acquire() i release() care vor bloca, respectiv, elibera lock-ulasociat i operaiile specifice: wait(), notify() i notify_all(). Aceste ultime trei operaii trebuientotdeauna apelate doar dup blocarea prealabil a lock-ului asociat. wait() va cauza blocareathread-ul apelant pn la semnalizarea condiiei de ctre alt thread. nainte de blocarea thread-uluiapelant, wait() va debloca lock-ul asociat, iar dup semnalizarea condiiei, metoda wait() va ateptapreluarea lock-ului nainte de terminare. Toi aceti trei pai ai metodei wait() sunt efectuai n modatomic, thread-ul apelant fiind lsat n aceeai stare ca nainte de apel (cu lock-ul asociat blocat).Semnalizarea unei condiii se face cu metodele notify() sau notify_all(). Diferena dintre aceste doumetode este numrul de thread-uri deblocate n momentul apelului: notify() va debloca un singurthread, iar notify_all() va debloca toate thread-urile.

    Cele trei operaii: wait(), notify() i notify_all() vor lsa lock-ul asociat n starea blocat, deblocareaacestuia fcndu-se manual cu metoda release(). De remarcat c dup un notify() sau notify_all(),thread-urile blocate n wait() nu vor continua imediat, ele trebuind s atepte pn cnd lock-ulasociat devine i el disponibil.

    Un obiect Condition este folosit atunci cnd pe lng semnalizarea unei condiii este necesar i unlock pentru a sincroniza accesul la o resurs partajat. n acest caz un obiect Condition este depreferat unui Event deoarece ofer acest lock n mod implicit, revenirea din wait() n momentulsemnalizrii condiiei fcndu-se cu lock-ul blocat.

    Funcionarea unui Condition este asemntoare cu a monitorului asociat fiecrui obiect Java,metodele wait(), notify() i notify_all() / notifyAll() avnd aceeai semantic n ambele limbaje.Apelarea metodelor acquire() i release() este nlocuit n Java de blocul synchronize. Pentru oasemnare mai mare cu Java, n Python putei folosi instruciunea with pentru apelarea automat ametodelor acquire() i release(), precum n exemplul urmtor:

    Java Pythonsynchronize(c) { while(!check()) c.wait();}

    with c: while(not check()): c.wait()

    Un exemplu de folosire a obiectelor Condition poate fi gsit n seciunea Bariere unde, pe lngnotificarea thread-urilor de ndeplinirea unei condiii, avem i o resurs partajat (contorul dethread-uri blocate) care trebuie protejat de un lock.

    Queue

    Cozile sincronizate sunt implementate n Python n modulul Queue n clasele Queue, LifoQueue iPriorityQueue. Obiectele de acest tipuri sunt folosite pentru implementarea comunicrii ntrethreaduri, dup modelul productori-consumatori.

    Metodele oferite de aceste clase permit adugarea i scoaterea de elemente ntr-un mod sincronizat,put i get, i interogarea strii cozii, empty, qsize i full. n plus fa de acestea, putem implementa

  • 2015/03/22 15:26 5/10 Programare concurent n Python (continuare)

    ASC Wiki - http://cs.curs.pub.ro/wiki/asc/

    comportamentul productor-consumator sau master-worker folosind metodele task_done i join, ca nexemplul din documentaie.

    Bariere

    De multe ori un grup de thread-uri sau procese trebuie s ajung toate ntr-un anumit punct i numaidup aceea execuia poate continua. Mecanismul de sincronizare potrivit pentru asemenea cazurieste bariera. In Python 2.7 nu avem un obiect de sincronizare care s implementeze comportamentulunei bariere, dar putem s-o implementam cu ajutorul celorlalte mecanisme de sincronizare. ncepndcu versiunea Python 3.2 s-a introdus clasa Barrier n modulul threading, acesta fiind o barierreentrant implementat folosind variabile condiie (cod surs).

    Ce trebuie s ofere o barier?

    un mecanism de blocare/deblocare a thread-urilorlun contor al numrului de thread-uri care au ajuns/mai trebuie s ajung la barierldeblocarea tuturor thread-urilor atunci cnd a ajuns i ultimul dintre ele la barierl

    Bariera ne-reentrant

    Putem implementa o barier folosind un semafor iniializat cu 0 i un contor al numrului dethread-uri care mai trebuie s ajung la barier, iniializat cu numrul de thread-uri utilizate.Semaforul este folosit pentru a bloca execuia thread-urilor. Contorul este decrementat de fiecarethread care ajunge la barier i reprezint numrul de thread-uri care au mai rmas de ajuns. Fiind ovariabil partajat modificarea lui trebuie bineneles protejat de un lock. n momentul n care ultimulthread decrementeaz contorul, acesta va avea valoarea 0, semnaliznd faptul c toate thread-urileau ajuns la barier. Ultimul thread va incrementa astfel semaforul i va debloca toate thread-urileblocate pn acum.

    simple_barrier.py

    from threading import * class SimpleBarrier(): def __init__(self, num_threads): self.num_threads = num_threads self.count_threads = self.num_threads # contorizeaza numarulde thread-uri ramase self.counter_lock = Lock() # protejeazaaccesarea/modificarea contorului self.threads_sem = Semaphore(0) # blocheazathread-urile ajunse def wait(self): with self.counter_lock: self.count_threads -= 1

  • Last update: 2015/03/22 00:56 asc:lab3:index http://cs.curs.pub.ro/wiki/asc/asc:lab3:index

    http://cs.curs.pub.ro/wiki/asc/ Printed on 2015/03/22 15:26

    if self.count_threads == 0: # a ajuns la barierasi ultimul thread for i in range(self.num_threads): self.threads_sem.release() # incrementeazasemaforul pentru a debloca num_threads thread-uri self.threads_sem.acquire() # num_threads-1threaduri se blocheaza aici # contorul semaforuluise decrementeaza de num_threads ori class MyThread(Thread): def __init__(self, tid, barrier): Thread.__init__(self) self.tid = tid self.barrier = barrier def run(self): print "I'm Thread " + str(self.tid) + " before\n", self.barrier.wait() print "I'm Thread " + str(self.tid) + " after barrier\n",

    De ce nu este reentrant bariera cu un semafor?

    Fie cazul n care avem N thread-uri, iar acestea trebuie sincronizate prin barier de mai multe ori.

    N-1 thread-uri fac acquirelultimul thread face release de N de oril

    unul din release-uri este pt elmN-1 thread-uri se deblocheaz, ultimul thread ar trebui s fac acquire i s nu se blochezel(semaforul fiind 1).rulnd n bucl, unul din thread-urile deblocate poate ajunge sa fac acquire din nou, nainte calultimul thread s treac de acquire.

    ultimul thread rmne blocat la acquirem

    Bariera reentrant

    Barierele reentrante (eng. reusable barrier) sunt utile n prelucrri 'step-by-step' i/sau bucle. Uneleaplicaii pot necesita ca thread-urile s execute anumite operaii n bucl, cu rezultatele tuturorthread-urilor din iteraia curent necesare pentru nceperea iteraiei urmtoare. n acest caz, dupfiecare iteraie, se folosete o sincronizare cu barier.

    Pentru a adapta bariera din seciunea anterioar astfel nct s poat fi folosit de mai multe ori,avem nevoie de nc un semafor. Soluia aceasta se bazeaz pe necesitatea ca toate cele N thread-uris treac de acquire() nainte ca vreunul s revin la barier. Astfel, partea de sincronizare estecompus din dou etape, fiecare folosind cte un semafor.

    Folosind implementarea de mai jos, garantm c thread-urile ajung s se blocheze din nou pe primulsemafor doar dup ce toate au trecut n prealabil de acesta.

  • 2015/03/22 15:26 7/10 Programare concurent n Python (continuare)

    ASC Wiki - http://cs.curs.pub.ro/wiki/asc/

    N-1 thread-uri vor face acquire pe semaforul 1lultimul thread face release de N de ori pe semaforul 1lN-1 thread-uri se deblocheaz i fac acquire pe semaforul 2lultimul thread face acquire pe semaforul 1 i trece de acestalultimul thread face release de N de ori pe semaforul 2lN-1 thread-uri se deblocheaz i fac acquire pe semaforul 1l

    s.a.m.d.

    reentrant_barrier.py

    from threading import * class ReusableBarrierSem(): def __init__(self, num_threads): self.num_threads = num_threads self.count_threads1 = self.num_threads self.count_threads2 = self.num_threads self.counter_lock = Lock() # protejamaccesarea/modificarea contoarelor self.threads_sem1 = Semaphore(0) # blocam thread-urilein prima etapa self.threads_sem2 = Semaphore(0) # blocam thread-urilein a doua etapa def wait(self): self.phase1() self.phase2() def phase1(self): with self.counter_lock: self.count_threads1 -= 1 if self.count_threads1 == 0: for i in range(self.num_threads): self.threads_sem1.release() self.count_threads1 = self.num_threads self.threads_sem1.acquire() def phase2(self): with self.counter_lock: self.count_threads2 -= 1 if self.count_threads2 == 0: for i in range(self.num_threads): self.threads_sem2.release() self.count_threads2 = self.num_threads self.threads_sem2.acquire() class MyThread(Thread): def __init__(self, tid, barrier):

  • Last update: 2015/03/22 00:56 asc:lab3:index http://cs.curs.pub.ro/wiki/asc/asc:lab3:index

    http://cs.curs.pub.ro/wiki/asc/ Printed on 2015/03/22 15:26

    Thread.__init__(self) self.tid = tid self.barrier = barrier def run(self): for i in xrange(10): self.barrier.wait() print "I'm Thread " + str(self.tid) + " after barrier, instep " + str(i) + "\n",

    Soluia de mai sus este necesar deoarece avem nevoie de un mecanism de a notifica toatethread-urile c pot rencepe execuia. O alt modalitate de implementare este folosind un obiectCondition care s trezeasc toate thread-urile care ateapt la barier. Deoarece obiectele Conditionconin un lock, nu mai avem nevoie de alt lock pentru protejarea decrementrii contorului.

    cond_barrier.py

    from threading import Condition class ReusableBarrierCond(): def __init__(self, num_threads): self.num_threads = num_threads self.count_threads = self.num_threads self.cond = Condition() #blocheaza/deblocheaza thread-urile # protejeazamodificarea contorului def wait(self): self.cond.acquire() # intra in regiuneacritica self.count_threads -= 1; if self.count_threads == 0: self.cond.notify_all() # deblocheaza toatethread-urile self.count_threads = self.num_threads else: self.cond.wait(); # blocheaza thread-uleliberand in acelasi timp lock-ul self.cond.release(); # iese din regiuneacritica

    Exerciii(2p) Pornind de la exemplul de folosire a obiectelor Event din fiierul event.py modificai1.metodele get_work i get_result astfel nct fiecare metod s afieze numele ei, obiectul self cu

  • 2015/03/22 15:26 9/10 Programare concurent n Python (continuare)

    ASC Wiki - http://cs.curs.pub.ro/wiki/asc/

    care a fost apelat i thread-ul curent pe care ruleaz. Ce legtura observai ntre obiectul self ithread-ul curent? Justificai.(4p) Rulai fiierul broken-event.py. ncercai diferite valori pentru sleep()-ul de la linia 47.2.ncercai eliminarea apelului sleep(). Ce observai? Ce intercalare a thread-urilor genereazcomportamentul observat?Folosii Ctrl+\ pentru a opri un program blocat.mFolosii sleep() pentru a fora diferite intercalri ale thread-urilor.mFolosii instruciuni print naintea metodelor care lucreaz cu Event-uri pentru a avea o ideemasupra ordinii operaiilor. Datorit intercalrilor thread-urilor este posibil ca print-urile s nu reflecte ordinea exact am

    operaiilor. Rulai de mai multe ori pentru a putea prinde problema.(4p) Implementai n fiierul gossiping.py o propagare ciclic de tip gossiping folosind bariere.3.Se consider N noduri (obiecte de tip Thread), cu indeci 0N-1.mFiecare nod ine o valoare generat random.mCalculai valoarea minim folosind urmtorul procedeu:m

    nodurile ruleaz n ciclurinntr-un ciclu, fiecare nod comunic cu un subset de alte noduri pentru a obine valoareanacestora i a o compara cu a sa

    ca subset considerai random 3 noduri din lista de noduri primit n constructor i obineilvaloarea acestora (metoda get_value)

    dup terminarea unui ciclu, fiecare nod va avea ca valoare minimul obinut n ciclul anteriornla finalul iteraiei toate nodurile vor conine valoarea minimn

    Folosii una din barierele reentrante din modulul barrier din scheletul de laborator.mPentru a simplifica implementarea, e folosit un numr fix de cicluri, negarantnd astfelmconvergena tuturor nodurilor la acelai minim.

    ResurseResponsabilii acestui laborator: Adriana Drghici, Dan DragomirlPDF laboratorlSchelet laboratorlSoluie laboratorl

    Referinemodulul threading - Thread, Lock, Semaphore, Condition, Eventlmodulul QueuelLittle book of semaphores - capitolele 3.5 Barrier i 3.6 Reusable Barrierl

    From:http://cs.curs.pub.ro/wiki/asc/ - ASC Wiki

    Permanent link:http://cs.curs.pub.ro/wiki/asc/asc:lab3:index

    Last update: 2015/03/22 00:56

  • Last update: 2015/03/22 00:56 asc:lab3:index http://cs.curs.pub.ro/wiki/asc/asc:lab3:index

    http://cs.curs.pub.ro/wiki/asc/ Printed on 2015/03/22 15:26

    Programare concurent n Python (continuare)ObiectiveObiecte de sincronizareEventConditionQueue

    BariereBariera ne-reentrantBariera reentrant

    ExerciiiResurseReferine