Programare Concurenta in Python Continuare

Post on 27-Sep-2015

24 views 0 download

description

jkb kj j

Transcript of Programare Concurenta in Python Continuare

  • 2015/03/22 15:26 1/10 Programare concurent n Python (continuare)

    ASC Wiki - http://cs.curs.pub.ro/wiki/asc/

    Programare concurent n Python(continuare)

    Obiective

    Vom continua n cadrul acestui laborator prezentarea elementelor de sincronizare oferite de Python inu numai.

    Obiecte de sincronizare

    Event

    Event-urile sunt obiecte simple de sincronizare care permit mai multor thread-uri blocarea voluntarpn la apariia unui eveniment semnalat de un alt thread (ex: o condiie a devenit adevrat). Intern,un obiect Event conine un flag setat iniial la valoarea false. El ofer dou operaii de baz: set(),care seteaz valoarea flag-ului pe true, i wait(), care blocheaz execuia thread-urilor apelante pncnd flag-ul devine true. Dac flag-ul este deja true n momentul apelrii lui wait() aceasta nu vabloca execuia. n momentul setrii flag-ului pe true toate thread-urile blocate n wait() vor fideblocate i i vor continua execuia.

    Event-ul mai ofer i alte dou operaii: clear(), care seteaz flag-ul intern la valoarea false(reseteaz evenimentul) i is_set(), care ofer posibilitatea de interogare a valorii curente a flag-ului.

    Refolosirea obiectelor Event pentru a semnaliza un eveniment (cu metoda clear()) trebuie fcut cugrij, deoarece clear() poate terge a doua setare a flag-ului pe true nainte ca thread-ul care doretes atepte evenimentul s fac wait() pentru a doua oar. Acest lucru va duce cel mai probabil lablocarea nelimitat a acelui thread (deadlock).

    O alt problem care poate aprea n cazul refolosirii unui obiect Event este c thread-ul care doreteateptarea i poate continua execuia fr ca evenimentul s fie semnalizat a doua oar. Aceastsituaie apare atunci cnd flagul nu este resetat cu metoda clear() nainte de al doilea wait(), elrmnnd astfel la valoarea true. Al doilea wait() nu va bloca execuia n acest situaie precum sedorete.

    Testarea valorii flag-ului cu is_set() ntr-o bucl, fr a executa calcule utile n acea bucl sau fr aapela metode blocante, reprezint o form de busy-waiting i trebuie evitat, deoarece, ca orice

  • Last update: 2015/03/22 00:56 asc:lab3:index http://cs.curs.pub.ro/wiki/asc/asc:lab3:index

    http://cs.curs.pub.ro/wiki/asc/ Printed on 2015/03/22 15:26

    busy-waiting, irosete timp de procesor, care ar putea fi altfel folosit de celelate thread-uri.

    Mai jos este prezentat un exemplu care folosete obiecte Event att pentru a atepta ndeplinireaunei condiii (work_available i result_available), ct i doar pentru a testa un flag (terminate).work_available este folosit pentru a bloca worker-ul atunci cnd nu are task-uri de procesat, iarresult_available este folosit pentru a bloca masterul ct timp worker-ul i proceseaz task-ul.terminate este folosit pentru a semnaliza worker-ului c trebuie s-i ncheie execuia. Observai cdei se folosete metoda is_set(), n bucl exist alte apeluri blocante, care fac worker-ul s testezeflagul doar la apariia unui task, deci nu putem spune c avem de-a face cu busy-waiting.

    event.py

    from threading import enumerate, Event, Thread class Master(Thread): def __init__(self, max_work, work_available, result_available): Thread.__init__(self, name = "Master") self.max_work = max_work self.work_available = work_available self.result_available = result_available def set_worker(self, worker): self.worker = worker def run(self): for i in xrange(self.max_work): # generate work self.work = i # notify worker self.work_available.set() # get result self.result_available.wait() self.result_available.clear() if self.get_work() + 1 != self.worker.get_result(): print "oops", print "%d -> %d" % (self.work, self.worker.get_result()) def get_work(self): return self.work class Worker(Thread): def __init__(self, terminate, work_available, result_available): Thread.__init__(self, name = "Worker") self.terminate = terminate self.work_available = work_available self.result_available = result_available def set_master(self, master): self.master = master def run(self):

  • 2015/03/22 15:26 3/10 Programare concurent n Python (continuare)

    ASC Wiki - http://cs.curs.pub.ro/wiki/asc/

    while(True): # wait work self.work_available.wait() self.work_available.clear() if(terminate.is_set()): break # generate result self.result = self.master.get_work() + 1 # notify master self.result_available.set() def get_result(self): return self.result if __name__ == "__main__": # create shared objects terminate = Event() work_available = Event() result_available = Event() # start worker and master w = Worker(terminate, work_available, result_available) m = Master(10, work_available, result_available) w.set_master(m) m.set_worker(w) w.start() m.start() # wait for master m.join() # wait for worker terminate.set() work_available.set() w.join() # print running threads for verification print enumerate()

    Condition

    Condition (sau variabil condiie) este un obiect de sincronizare care permite mai multor thread-uriblocarea voluntar pn la apariia unei condiii semnalate de un alt thread, asementor Event-urilor.Spre deosebire de acestea ns, un obiect Condition ofer un set de operaii diferit i este asociatntotdeauna cu un lock. Lock-ul este creat implicit la instanierea obiectului Condition sau poate fipasat prin intermediul constructorului dac mai multe obiecte Condition trebuie s partajeze acelailock.

  • Last update: 2015/03/22 00:56 asc:lab3:index http://cs.curs.pub.ro/wiki/asc/asc:lab3:index

    http://cs.curs.pub.ro/wiki/asc/ Printed on 2015/03/22 15:26

    Un obiect Condition ofer operaiile acquire() i release() care vor bloca, respectiv, elibera lock-ulasociat i operaiile specifice: wait(), notify() i notify_all(). Aceste ultime trei operaii trebuientotdeauna apelate doar dup blocarea prealabil a lock-ului asociat. wait() va cauza blocareathread-ul apelant pn la semnalizarea condiiei de ctre alt thread. nainte de blocarea thread-uluiapelant, wait() va debloca lock-ul asociat, iar dup semnalizarea condiiei, metoda wait() va ateptapreluarea lock-ului nainte de terminare. Toi aceti trei pai ai metodei wait() sunt efectuai n modatomic, thread-ul apelant fiind lsat n aceeai stare ca nainte de apel (cu lock-ul asociat blocat).Semnalizarea unei condiii se face cu metodele notify() sau notify_all(). Diferena dintre aceste doumetode este numrul de thread-uri deblocate n momentul apelului: notify() va debloca un singurthread, iar notify_all() va debloca toate thread-urile.

    Cele trei operaii: wait(), notify() i notify_all() vor lsa lock-ul asociat n starea blocat, deblocareaacestuia fcndu-se manual cu metoda release(). De remarcat c dup un notify() sau notify_all(),thread-urile blocate n wait() nu vor continua imediat, ele trebuind s atepte pn cnd lock-ulasociat devine i el disponibil.

    Un obiect Condition este folosit atunci cnd pe lng semnalizarea unei condiii este necesar i unlock pentru a sincroniza accesul la o resurs partajat. n acest caz un obiect Condition este depreferat unui Event deoarece ofer acest lock n mod implicit, revenirea din wait() n momentulsemnalizrii condiiei fcndu-se cu lock-ul blocat.

    Funcionarea unui Condition este asemntoare cu a monitorului asociat fiecrui obiect Java,metodele wait(), notify() i notify_all() / notifyAll() avnd aceeai semantic n ambele limbaje.Apelarea metodelor acquire() i release() este nlocuit n Java de blocul synchronize. Pentru oasemnare mai mare cu Java, n Python putei folosi instruciunea with pentru apelarea automat ametodelor acquire() i release(), precum n exemplul urmtor:

    Java Pythonsynchronize(c) { while(!check()) c.wait();}

    with c: while(not check()): c.wait()

    Un exemplu de folosire a obiectelor Condition poate fi gsit n seciunea Bariere unde, pe lngnotificarea thread-urilor de ndeplinirea unei condiii, avem i o resurs partajat (contorul dethread-uri blocate) care trebuie protejat de un lock.

    Queue

    Cozile sincronizate sunt implementate n Python n modulul Queue n clasele Queue, LifoQueue iPriorityQueue. Obiectele de acest tipuri sunt folosite pentru implementarea comunicrii ntrethreaduri, dup modelul productori-consumatori.

    Metodele oferite de aceste clase permit adugarea i scoaterea de elemente ntr-un mod sincronizat,put i get, i interogarea strii cozii, empty, qsize i full. n plus fa de acestea, putem implementa

  • 2015/03/22 15:26 5/10 Programare concurent n Python (continuare)

    ASC Wiki - http://cs.curs.pub.ro/wiki/asc/

    comportamentul productor-consumator sau master-worker folosind metodele task_done i join, ca nexemplul din documentaie.

    Bariere

    De multe ori un grup de thread-uri sau procese trebuie s ajung toate ntr-un anumit punct i numaidup aceea execuia poate continua. Mecanismul de sincronizare potrivit pentru asemenea cazurieste bariera. In Python 2.7 nu avem un obiect de sincronizare care s implementeze comportamentulunei bariere, dar putem s-o implementam cu ajutorul celorlalte mecanisme de sincronizare. ncepndcu versiunea Python 3.2 s-a introdus clasa Barrier n modulul threading, acesta fiind o barierreentrant implementat folosind variabile condiie (cod surs).

    Ce trebuie s ofere o barier?

    un mecanism de blocare/deblocare a thread-urilorlun contor al numrului de thread-uri care au ajuns/mai trebuie s ajung la barierldeblocarea tuturor thread-urilor atunci cnd a ajuns i ultimul dintre ele la barierl

    Bariera ne-reentrant

    Putem implementa o barier folosind un semafor iniializat cu 0 i un contor al numrului dethread-uri care mai trebuie s ajung la barier, iniializat cu numrul de thread-uri utilizate.Semaforul este folosit pentru a bloca execuia thread-urilor. Contorul este decrementat de fiecarethread care ajunge la barier i reprezint numrul de thread-uri care au mai rmas de ajuns. Fiind ovariabil partajat modificarea lui trebuie bineneles protejat de un lock. n momentul n care ultimulthread decrementeaz contorul, acesta va avea valoarea 0, semnaliznd faptul c toate thread-urileau ajuns la barier. Ultimul thread va incrementa astfel semaforul i va debloca toate thread-urileblocate pn acum.

    simple_barrier.py

    from threading import * class SimpleBarrier(): def __init__(self, num_threads): self.num_threads = num_threads self.count_threads = self.num_threads # contorizeaza numarulde thread-uri ramase self.counter_lock = Lock() # protejeazaaccesarea/modificarea contorului self.threads_sem = Semaphore(0) # blocheazathread-urile ajunse def wait(self): with self.counter_lock: self.count_threads -= 1

  • Last update: 2015/03/22 00:56 asc:lab3:index http://cs.curs.pub.ro/wiki/asc/asc:lab3:index

    http://cs.curs.pub.ro/wiki/asc/ Printed on 2015/03/22 15:26

    if self.count_threads == 0: # a ajuns la barierasi ultimul thread for i in range(self.num_threads): self.threads_sem.release() # incrementeazasemaforul pentru a debloca num_threads thread-uri self.threads_sem.acquire() # num_threads-1threaduri se blocheaza aici # contorul semaforuluise decrementeaza de num_threads ori class MyThread(Thread): def __init__(self, tid, barrier): Thread.__init__(self) self.tid = tid self.barrier = barrier def run(self): print "I'm Thread " + str(self.tid) + " before\n", self.barrier.wait() print "I'm Thread " + str(self.tid) + " after barrier\n",

    De ce nu este reentrant bariera cu un semafor?

    Fie cazul n care avem N thread-uri, iar acestea trebuie sincronizate prin barier de mai multe ori.

    N-1 thread-uri fac acquirelultimul thread face release de N de oril

    unul din release-uri este pt elmN-1 thread-uri se deblocheaz, ultimul thread ar trebui s fac acquire i s nu se blochezel(semaforul fiind 1).rulnd n bucl, unul din thread-urile deblocate poate ajunge sa fac acquire din nou, nainte calultimul thread s treac de acquire.

    ultimul thread rmne blocat la acquirem

    Bariera reentrant

    Barierele reentrante (eng. reusable barrier) sunt utile n prelucrri 'step-by-step' i/sau bucle. Uneleaplicaii pot necesita ca thread-urile s execute anumite operaii n bucl, cu rezultatele tuturorthread-urilor din iteraia curent necesare pentru nceperea iteraiei urmtoare. n acest caz, dupfiecare iteraie, se folosete o sincronizare cu barier.

    Pentru a adapta bariera din seciunea anterioar astfel nct s poat fi folosit de mai multe ori,avem nevoie de nc un semafor. Soluia aceasta se bazeaz pe necesitatea ca toate cele N thread-uris treac de acquire() nainte ca vreunul s revin la barier. Astfel, partea de sincronizare estecompus din dou etape, fiecare folosind cte un semafor.

    Folosind implementarea de mai jos, garantm c thread-urile ajung s se blocheze din nou pe primulsemafor doar dup ce toate au trecut n prealabil de acesta.

  • 2015/03/22 15:26 7/10 Programare concurent n Python (continuare)

    ASC Wiki - http://cs.curs.pub.ro/wiki/asc/

    N-1 thread-uri vor face acquire pe semaforul 1lultimul thread face release de N de ori pe semaforul 1lN-1 thread-uri se deblocheaz i fac acquire pe semaforul 2lultimul thread face acquire pe semaforul 1 i trece de acestalultimul thread face release de N de ori pe semaforul 2lN-1 thread-uri se deblocheaz i fac acquire pe semaforul 1l

    s.a.m.d.

    reentrant_barrier.py

    from threading import * class ReusableBarrierSem(): def __init__(self, num_threads): self.num_threads = num_threads self.count_threads1 = self.num_threads self.count_threads2 = self.num_threads self.counter_lock = Lock() # protejamaccesarea/modificarea contoarelor self.threads_sem1 = Semaphore(0) # blocam thread-urilein prima etapa self.threads_sem2 = Semaphore(0) # blocam thread-urilein a doua etapa def wait(self): self.phase1() self.phase2() def phase1(self): with self.counter_lock: self.count_threads1 -= 1 if self.count_threads1 == 0: for i in range(self.num_threads): self.threads_sem1.release() self.count_threads1 = self.num_threads self.threads_sem1.acquire() def phase2(self): with self.counter_lock: self.count_threads2 -= 1 if self.count_threads2 == 0: for i in range(self.num_threads): self.threads_sem2.release() self.count_threads2 = self.num_threads self.threads_sem2.acquire() class MyThread(Thread): def __init__(self, tid, barrier):

  • Last update: 2015/03/22 00:56 asc:lab3:index http://cs.curs.pub.ro/wiki/asc/asc:lab3:index

    http://cs.curs.pub.ro/wiki/asc/ Printed on 2015/03/22 15:26

    Thread.__init__(self) self.tid = tid self.barrier = barrier def run(self): for i in xrange(10): self.barrier.wait() print "I'm Thread " + str(self.tid) + " after barrier, instep " + str(i) + "\n",

    Soluia de mai sus este necesar deoarece avem nevoie de un mecanism de a notifica toatethread-urile c pot rencepe execuia. O alt modalitate de implementare este folosind un obiectCondition care s trezeasc toate thread-urile care ateapt la barier. Deoarece obiectele Conditionconin un lock, nu mai avem nevoie de alt lock pentru protejarea decrementrii contorului.

    cond_barrier.py

    from threading import Condition class ReusableBarrierCond(): def __init__(self, num_threads): self.num_threads = num_threads self.count_threads = self.num_threads self.cond = Condition() #blocheaza/deblocheaza thread-urile # protejeazamodificarea contorului def wait(self): self.cond.acquire() # intra in regiuneacritica self.count_threads -= 1; if self.count_threads == 0: self.cond.notify_all() # deblocheaza toatethread-urile self.count_threads = self.num_threads else: self.cond.wait(); # blocheaza thread-uleliberand in acelasi timp lock-ul self.cond.release(); # iese din regiuneacritica

    Exerciii(2p) Pornind de la exemplul de folosire a obiectelor Event din fiierul event.py modificai1.metodele get_work i get_result astfel nct fiecare metod s afieze numele ei, obiectul self cu

  • 2015/03/22 15:26 9/10 Programare concurent n Python (continuare)

    ASC Wiki - http://cs.curs.pub.ro/wiki/asc/

    care a fost apelat i thread-ul curent pe care ruleaz. Ce legtura observai ntre obiectul self ithread-ul curent? Justificai.(4p) Rulai fiierul broken-event.py. ncercai diferite valori pentru sleep()-ul de la linia 47.2.ncercai eliminarea apelului sleep(). Ce observai? Ce intercalare a thread-urilor genereazcomportamentul observat?Folosii Ctrl+\ pentru a opri un program blocat.mFolosii sleep() pentru a fora diferite intercalri ale thread-urilor.mFolosii instruciuni print naintea metodelor care lucreaz cu Event-uri pentru a avea o ideemasupra ordinii operaiilor. Datorit intercalrilor thread-urilor este posibil ca print-urile s nu reflecte ordinea exact am

    operaiilor. Rulai de mai multe ori pentru a putea prinde problema.(4p) Implementai n fiierul gossiping.py o propagare ciclic de tip gossiping folosind bariere.3.Se consider N noduri (obiecte de tip Thread), cu indeci 0N-1.mFiecare nod ine o valoare generat random.mCalculai valoarea minim folosind urmtorul procedeu:m

    nodurile ruleaz n ciclurinntr-un ciclu, fiecare nod comunic cu un subset de alte noduri pentru a obine valoareanacestora i a o compara cu a sa

    ca subset considerai random 3 noduri din lista de noduri primit n constructor i obineilvaloarea acestora (metoda get_value)

    dup terminarea unui ciclu, fiecare nod va avea ca valoare minimul obinut n ciclul anteriornla finalul iteraiei toate nodurile vor conine valoarea minimn

    Folosii una din barierele reentrante din modulul barrier din scheletul de laborator.mPentru a simplifica implementarea, e folosit un numr fix de cicluri, negarantnd astfelmconvergena tuturor nodurilor la acelai minim.

    ResurseResponsabilii acestui laborator: Adriana Drghici, Dan DragomirlPDF laboratorlSchelet laboratorlSoluie laboratorl

    Referinemodulul threading - Thread, Lock, Semaphore, Condition, Eventlmodulul QueuelLittle book of semaphores - capitolele 3.5 Barrier i 3.6 Reusable Barrierl

    From:http://cs.curs.pub.ro/wiki/asc/ - ASC Wiki

    Permanent link:http://cs.curs.pub.ro/wiki/asc/asc:lab3:index

    Last update: 2015/03/22 00:56

  • Last update: 2015/03/22 00:56 asc:lab3:index http://cs.curs.pub.ro/wiki/asc/asc:lab3:index

    http://cs.curs.pub.ro/wiki/asc/ Printed on 2015/03/22 15:26

    Programare concurent n Python (continuare)ObiectiveObiecte de sincronizareEventConditionQueue

    BariereBariera ne-reentrantBariera reentrant

    ExerciiiResurseReferine