PATR_Curs_7

30
PROGRAMAREA APLICATIILOR IN TIMP REAL Curs 7 Comunicatia prin intermediul mesajelor

description

Programarea Aplicatiilor in timp real

Transcript of PATR_Curs_7

Page 1: PATR_Curs_7

PROGRAMAREA APLICATIILOR IN TIMP REAL

Curs 7

Comunicatia prin intermediul mesajelor

Page 2: PATR_Curs_7

Comunicatia intre procese:

● QNX Neutrino suporta mai multe mecanisme de comunicatie intre procese (IPC):– mesaje Neutrino native– pulsuri– semnale– memorie partajata– pipe-uri (buffere dinamice)– cozi de mesaje POSIX

Page 3: PATR_Curs_7

Transmitere de mesaje:

● transmiterea mesajelor se poate face:– direct intre procese sau fire de executie, prin specificarea

destinatiei, respectiv a sursei– indirect, prin intermediul cutiilor postale sau canalelor de

comunicatie (e.g. sub QNX)– cu ajutorul pipe-urilor

● in general, functia de transmitere de mesaje este implementata sub forma unei perechi de primitive de tipul:

send(destinatie, mesaj) in procesul senderreceive(sursa, mesaj) in procesul receiver

Page 4: PATR_Curs_7

Transmitere de mesaje - aspecte:

● sincronizare prin transmitere de mesaje– transmiterea de mesaje intre procese (fire de executie) implica un anumit nivel de

sincronizare

● un proces care receptioneaza date (receiver) poate receptiona un mesaj doar atunci cand mesajul a fost transmis de procesul transmitator (sender)

● dupa efectuarea primitivelor send, respectiv receive exista doua posibilitati de comportament al procesului, adica ...

Page 5: PATR_Curs_7

Transmitere de mesaje - aspecte:

● dupa apelul unei primitive de tip send:● procesul sender se blocheaza pana cand mesajul este receptionat

la destinatie, sau● procesul sender nu se blocheaza

● dupa apelul unei primitive de tip receive:– daca mesajul fusese transmis anterior executarii primitivei receive in

taskul de destinatie, el va fi receptionat de catre acest task si executia taskului continua

– in cazul in care mesajul nu a fost inca transmis in momentul executarii primitivei receive,

● taskul care a efectuat receive se blocheaza pana la sosirea mesajului, sau

● taskul receptor isi continua executia, abandonand incercarea de a receptiona mesajul

Page 6: PATR_Curs_7

Transmitere de mesaje - aspecte:

In acest mod, atat taskul sender, cat si taskul receiver, pot fi cu blocare (engl. blocking sender and/or receiver) sau fara blocare (engl. non-blocking sender and/or receiver). Exista trei situatii posibile:

● blocking send, blocking receive. Atat taskul sender, cat si taskul receiver se blocheaza pana la transmiterea, respectiv receptionarea mesajului. Se realizeaza un rendez-vous, sau sincronizare puternica (engl. tight syncronization).

● non-blocking send, blocking receive. Taskul sender poate continua sa transmita mesaje, chiar daca receptorul se blocheaza pana la receptionarea mesajului asteptat. Este cea mai utila combinatie, care permite unui proces sa transmita unul sau mai multe mesaje la destinatii diferite in timp foarte scurt (e.g. un proces server care are rolul de a furniza servicii catre alte procese). Este implementarea cea mai uzuala pentru programarea concurenta

● non-blocking send, non-blocking receive.

Page 7: PATR_Curs_7

Transmitere de mesaje - aspecte:

● adresarea:– intotdeauna este necesar ca in primitiva send sa fie specificat

procesul care trebuie sa receptioneze mesajul si– in procesul receiver trebuie sa se specifice sursa mesajului

care trebuie receptionat

● adresare directa– primitiva send include un identificator specific al procesului receiver– pentru primitiva receive este necesar ca procesul sa specifice

procesul sender

● adresare indirecta– mesajele nu vor fi trimise direct de la taskul sender la taskul

receiver, ci sunt transmise la o structura de date partajata (e.g. cozi care pot tine temporar mesaje).

● astfel de cozi poarta denumirea de cutii postale– in acest caz, comunicatia intre doua procese presupune ca

procesul transmitator sa trimita un mesaj la cutia postala corespunzatoare, de unde procesul receptor va ridica mesajul

Page 8: PATR_Curs_7

Transmitere de mesaje - aspecte:● adresare indirecta

– rela\ia [ntre senders ]i receivers poate fi

(a) one-to-one canal privat de comunicatie

(b) many-to-many

(c) many-to-one

one-to-many

ProcesP1

ProcesPn

ProcesQ1

ProcesQm

CUTIEPOSTALA

ProcesP1

ProcesPn

ProcesQ1PORT

Page 9: PATR_Curs_7

Transmitere de mesaje - aspecte:

● relatie sender receiver de tip many-to many / one-to-many– asocierea proceselor la cutiile postale poate fi statica sau dinamica– porturile sunt de cele mai multe ori asociate static unui anumit proces (dupa creare,

portul este asociat permanent procesului)– cand exista mai multe procese sender, asocierea unui sender la o cutie postala poate

fi facuta in mod dinamic (se folosesc primitive de tip connect si disconnect) ● exemplu: canale de comunicatie in QNX

Cine detine o cutie postala?● in cazul unui port, acesta este in mod normal creat si detinut de procesul receiver (atunci

cand procesul este distrus, va fi distrus si portul); ● in cazul general al cutiilor postale, sistemul de operare poate oferi un servicu special de

creare a cutiilor postale

ProcesP1

ProcesPn

ProcesQ1PORT

Page 10: PATR_Curs_7

Transmitere de mesaje - aspecte:

● modalitatea de inregistrare a mesajelor la cutia postala– cea simpla metoda este cea de tip FIFO

● nu este indicata daca unele mesaje sunt mai urgente decat altele

– se aloca o prioritate mesajelor, in functie de tipul mesajului sau de sender

– i se poate permite receiver-ului sa inspecteze coada de mesaje si sa selecteze mesajul pe care il va receptiona

Page 11: PATR_Curs_7

Sincronizarea cu ajutorul mesajelor:

● se considera doua taskuri care trebuie sa-si coordoneze activitatile, astfel incat Task 1 sa-si poata continua executia doar dupa executarea activitatii 2 (dupa sosirea mesajului de la Task 2)

Task 1 Task 2

activitate_1() activitate_2()receive(Task2, mesaj) send(Task1, mesaj)activitate_3() activitate_4()

● functioneaza corect daca folosim primitive de tipul non-blocking send / blocking receive

● starile initiale ale cutiilor postale sunt foarte importante– pentru exemplul de mai sus, cutia postala trebuie de fie vida initial

● analogie cu primitivele P si V pe semafor binar, initial valSB

= 0 Task 1 Task 2

activitate_1() activitate_2()P(SB) V(SB)activitate_3() activitate_4()

Page 12: PATR_Curs_7

Sincronizarea cu ajutorul mesajelor:

● analogie intre semafoare binare si cutii postale:

Cutie vida Semafor binar = 0Cutie nevida Semafor binar = 1Primitiva receive Primitiva PPrimitiva send Primitiva V

● deoarece cutia postala poate contine mai multe mesaje, analogia corecta este cu semafoarele generalizate, astfel:– numarul de mesaje din cutia postala este echivalentul valorii semaforului generalizat– operatia de extragere de mesaje din este echivalenta cu decrementarea valorii

semaforului generalizat– operatia de depunere de mesaje in cutie este echivalenta cu incrementarea valorii

semaforului generalizat● impreuna cu eventuala asteptare a taskului care face extragerea, deci practic

efectele sunt similare cu cele ale primitivelor P si V pe un semafor generalizat

rezulta modul in care se executa operatiile se sincronizare, excludere mutuala, etc.

Page 13: PATR_Curs_7

Excluderea mutuala cu ajutorul mesajelor:

● se utilizeaza primitive blocking receive si non - blocking send● se asociaza fiecarei excluderi mutuale dintre doua sau mai multe taskuri o cutie postala (asa cum

se asociaza un singur semafor pentru primitivele P si V)– prezenta unui mesaj in cutia postala este asociata cu accesul liber in sectiunea critica– absenta mesajului in cutia postala este asociata cu accesul interzis in sectiunea critica

● mecanismul de functionare: – o multime de procese concurente utilizeaza in comun o cutie postala (cp) - este utilizata de

catre toate procesele pentru a transmite si receptiona mesaje– cutia postala va fi initializata cu un singur mesaj, cu continut NULL

Task i, i = 1…n Task initializare

… main() {

while(true){ creaza_mailbox(cp) …. send(cp, NULL) receive(cp, mesaj) lanseaza_taskuri() // executa sectiunea critica } send(cp, mesaj) // alta secventa de program …

}

Page 14: PATR_Curs_7

Excluderea mutuala cu ajutorul mesajelor:

● solutia cu primitive de tip blocking receive si (non) blocking send asigura ca atunci cand mai multe procese executa operatia receive in mod concurent:– daca este un mesaj in cutie, el va fi transmis unui singur proces, iar

celelalte procese se blocheaza– atunci cand coada de mesaje este vida, toate procesele vor fi

blocate; – atunci cand este disponibil un mesaj, un singur proces va fi activat si

va primi mesajul

Page 15: PATR_Curs_7

Excluderea mutuala cu ajutorul mesajelor - exemplu:

● problema producator – consumator cu buffer finit– se utilizeaza doua cutii postale, mayproduce si mayconsume– cap = capacitatea bufferului mayconsume– mayconsume este organizata ca un buffer, datele din buffer sunt organizate ca o coada

de mesaje– mayproduce este umpluta initial cu un numar de mesaje nule egal cu capacitatea

bufferului mayconsume● numarul de mesaje din mayproduce descreste cu fiecare producere de date si

creste cu fiecare consumare de date

Page 16: PATR_Curs_7

Excluderea mutuala cu ajutorul mesajelor - exemplu:

Task initializare

create_mailbox(mayproduce); create_mailbox(mayconsume);for(i=1; i < cap; i++) send(mayproduce, NULL);fork(producator, consumator);

Task producator Task consumator

while(true){ while(true){

receive(mayproduce, pmsg); receive(mayconsume, cmsg); pmsg = produce_date(); consuma(cmsg);send(mayconsume, pmsg); send(mayproduce,NULL);} }

Page 17: PATR_Curs_7

Mesaje – QNX: www.qnx.com

● pentru studiu individual, toata informatia referitoare la mesaje continuta in aceasta prezentare este disponibila free pe site-ul QNX, in sectiunea documentatie

● sincronizarea prin intermediul schimbului de mesaje reprezinta principala forma de IPC pusa se dispozitie de nucleul Neutrino

● schimbul de mesaje este implementat prin functiile specifice de tip MsgSend(), MsgReceive() ]i MsgReply()

● functiile de tip send si receive sunt cu blocare – blocking send si blocking receive

● blocarea inerenta sincronizeaza executia firului care trimite mesajul, acesta se blocheaza pana la confirmarea receptiei, in timp ce firul de executie care primeste mesajul este programat pentru executie

Page 18: PATR_Curs_7

Mesaje – QNX:

RECEIVE-blocked

SEND-blocked

READY

REPLY-blocked

MsgReply() sauMsgError()

MsgSend()

MsgReceive()

MsgReply() sauMsgError()

MsgSend()

MsgReceive()

Acest thread

Alt thread

Page 19: PATR_Curs_7

Interfata C/POSIX pentru transmitere de mesaje

● "Programarea aplicatiilor in timp-real", capitolul 4, pag. 139

Page 20: PATR_Curs_7

Mesaje – QNX: canale de comunicatie

● schimbul de mesaje asociat nucleului de TR Neutrino nu se realizeaza direct, ci prin intermediul canalelor si conexiunilor

● un fir de execute care doreste sa primeasca un mesaj creaza mai intai un canal de comunicatie

● un alt fir de executie care doreste sa ii trimita un mesaj trebuie sa creeze o conexiune, prin ”atasarea” la canalul respectiv

Server

Server

Canal

Canal

Conexiune

ConexiuneClient

Page 21: PATR_Curs_7

Mesaje - QNX:

● canalele sunt folosite de servere pentru a receptiona mesaje

– canalele si conexiunile sunt reprezentate in cadrul procesului printr-un identificator de tip small integer

– conexiunile clientilor sunt mapate direct in descriptori de fisier, acest lucru eliminand necesitatea determinarii destinatarului mesajului bazandu-ne pe descriptorul de fisier (de exemplu, prin read(fd))

Page 22: PATR_Curs_7

Mesaje - QNX:

● conexiunile sunt folosite de firele de executie client, prin atasarea lor la canalele puse la dispozitie de server

Page 23: PATR_Curs_7

Mesaje – QNX: exemplu

#include <pthread.h>#include <stdio.h>#include <stdlib.h>#include <errno.h>#include <string.h>#include <unistd.h>#include <sys/neutrino.h>#include <process.h>int ChID;int PIDProces;struct mesaj{

char body[50];};

Page 24: PATR_Curs_7

Mesaje – QNX: exempluvoid* Client(void* arg) {

int srv, cID, i; char *mesaj_send_client = "Mesaj SEND de la client! \n"; struct mesaj *Cmsg, *Smsg;

Cmsg = (struct mesaj*)malloc(sizeof(struct mesaj));Smsg = (struct mesaj*)malloc(sizeof(struct mesaj));

printf("\tClient: ChID este %d\n", ChID);

cID = ConnectAttach(0, PIDProces, ChID, _NTO_SIDE_CHANNEL, 0);

if (cID == -1)perror("\tClient: eroare conectare la canal de comunicatie.\n");

printf("Client: return conectare = %d\n", cID);

strcpy(Cmsg->body, mesaj_send_client);printf("\tClient: trimit mesajul: ... %s\n", Cmsg->body);

for(i=0; i<3; i++){ srv = MsgSend(cID, Cmsg, sizeof(struct mesaj), Smsg, sizeof(struct mesaj));

printf("\tClient: am primit mesajul: ... - %s\n", Smsg->body); sleep(3); } ConnectDetach(cID);}

Page 25: PATR_Curs_7

Mesaje – QNX: exempluint main() {

pthread_t Fir_Client; int rcvID, i; struct mesaj *Cmsg, *Rmsg;char *mesaj_reply_server = "Mesaj REPLY de la server. \n";

Cmsg = (struct mesaj*)malloc(sizeof(struct mesaj));Rmsg = (struct mesaj*)malloc(sizeof(struct mesaj));

PIDProces = getpid();

ChID = ChannelCreate(0);printf("\nMain Server: am creat canal de comunicatie cu ChID = %d\n", ChID);

pthread_create(&Fir_Client, &attr, (void*)&Client, NULL);

rcvID = MsgReceive( ChID, Cmsg, sizeof(struct mesaj), NULL);

strcpy(Rmsg->body, mesaj_reply_server);

MsgReply( rcvID, 0, Rmsg, sizeof(struct mesaj));

for(i=0; i<2;i++){rcvID = MsgReceive( ChID, Cmsg, sizeof(struct mesaj), NULL);printf("\n\nServer: MsgReceive pe rcvID = %d\n", rcvID);printf("\nServer - mesajul de la client este: %s \n", Cmsg-> body);strcpy(Rmsg->body, mesaj_reply_server);MsgReply( rcvID, 0, Rmsg, sizeof(struct mesaj));

}

pthread_join(Fir_Client, NULL);ChannelDestroy(ChID);pthread_exit(NULL);

}

Page 26: PATR_Curs_7

Output Program:

Client: am primit mesajul: Mesaj REPLY de la server.

Server: MsgReceive pe rcvID = 65539

Server: mesaj de la client este: Mesaj SEND de la client!

Client: am primit mesajul: Mesaj REPLY de la server.

Server: MsgReceive pe rcvID = 65539

Server: mesaj de la client este: Mesaj SEND de la client!

Client: am primit mesajul: Mesaj REPLY de la server.

Page 27: PATR_Curs_7

Exemplu:

Enunt problema:

Sa se scrie o aplicatie multitasking pentru monitorizarea accesului mai multor procese la 2 imprimante (de tip A sau B). Se presupune ca exista 3 tipuri de procese care pot utiliza imprimantele:

● procese care utilizeaza doar imprimanta A;● procese care utilizeaza doar imprimanta B;

Sa se scrie o aplicatie multitasking formata din 3 tipuri de taskuri, astfel incat:

● sa existe taskuri care pot accesa cele doua tipuri de imprimanta – sunt procese de tip client care vor face cereri de alocare / dealocare resursa (imprimanta);

● sa existe un proces de tip server care monitoriza alocarea imprimantelor pentru procesele de tip client.

Observatie: se va considera faptul ca un proces de tip client poate elibera imprimanta pe care o foloseste (dealocare resursa) la un anumit moment de timp.

Page 28: PATR_Curs_7

Exemplu:

Cerinte:

● sa se analizeze problema si sa se propuna o solutie pentru implementare

● sa se defineasca taskurile● sa se organizeze aplicatia sub forma organigramelor● sa se aleaga un mecanism de sincronizare / comunicare (sub

QNX) pentru implementare

● sa se schiteze programul cu functii specifice sub QNX

Page 29: PATR_Curs_7

Task server

initializare; creare canal de comunicatie

asteapta mesaj

mesaj

A_READY

A_READY = 0

transmiteA_FREE

transmiteA_BUSY

A_READY = 1

transmiteA_FREE

mesaj necunoscut

A_FREE

(B_FREE)

A_REQ

(B_REQ)

1 0

eliberare resursa cerere resursa

Page 30: PATR_Curs_7

Task client (A sau B)

initializare; conectare la canal decomunicatie

transmitere cerere A_REQ(B_REQ)

asteapta mesaj raspuns

mesajraspuns

mesajnecunoscut

ERRORmesaj resursa

ocupata

mutex_lock(&A_printer)

utilizare resursa

mutex_unlock(&A_printer)

transmite A_FREE

EXIT

A_FREE

(B_FREE)

A_BUSY(B_BUSY)default