Tag: SO2 – 2021 – 7.b – Sincronizacao – prod-cons v.11.pdf
sessão 6b – Semáforos (produtor/consumidor)
N produtores M consumidores
buffer circular (parte 1)
um buffer circular é um array em que as pontas estão ligadas
se estiver cheio o array e precisarmos de preencher, voltamos à posição zero
podemos usar de forma infinita o array, estando a sobrepor os índices anteriores
cuidados com o buffer circular:
a leitura
a escrita
sincronização necessária
garantir que o consumidor só lê se existir informação para ser lida
se o produtor produzir muita informação pode preencher dados que o consumidor ainda não leu
assim o produtor só pode escrever numa posição escrita quando souber que o consumidor já a leu
usamos assim dois semáforos e dois mutexes (os mutexes resolver a questão da exclusão mutua)
os semáforos:
um para saber quantas posições do buffer circular estão preenchidas
e um para saber quantas posições do buffer circular estão vazias
na prática:
criar dois semáforos
produtor
no produtor: while (COND) { item_p = produz_item(); esperar(&sem_vazios); esperar(&sem_mutex_p); buffer[in] = item_p; in = (in + 1) % DIM; //incrementar a posição de escrita assinalar(&sem_mutex_p); assinalar(&sem_itens); //indica que existem coisas para ler } no consumidor: while (COND) { esperar(&sem_itens); //controlar as posições ocupadas esperar(&sem_mutex_c); // item_c = buffer[out]; //ler o buffer out = (0Ut + 1) % DIM; //incrementar a posição de leitura assinalar(&sem_mutex_c); assinalar(&sem_vazios); //indica que já existe uma posição vaiza trata_item(item_c); }
dois mutexes
para termos N produtores e M consumidores
os produtores não podem todos ocupar a mesma posição
os semáforos só garantem que que se existirem posições vazias os produtores podem ter a possibilidade popular essas posições
mas não garante a sincronização, e isso é feito com a exclusão mutua
então surge o mutex, para garantir não haver problema ao acesso concorrente
cada consumidor vai ler uma posição, a sua própria posição.
exemplo:
5 produtores são 5 processos
5 consumidores são 5 processos
podem existir
5 produtores 0 consumidores, sendo que quando o array estiver cheio eles não devem produzir mais e ficam à espera
0 produtores 5 consumidores, sendo que quando o array estiver vazio eles não devem consumir mais e ficam à espera
assim fica o produtor:
#include <Windows.h> #include <tchar.h> #include <math.h> #include <stdio.h> #include <fcntl.h> #include <io.h> #include <time.h> #define TAM_BUFFER 10 //tamanho do buffer circular typedef struct { int id; int valor; } CelulaBuffer; //a memória partilhada typedef struct{ int nP; //numero de produtores int nC; //numero de consumidores int posE; //proxima posição de escrita int posL; //proxima posição de leitura CelulaBuffer buffer[TAM_BUFFER]; //estrutura do buffer circular } BufferCircular; //lidar com a informação dos handles semáforos, threads.. typedef struct { BufferCircular * memoriaPartilhada; //hanelde para cada uma dos semaforoes HANDLE hSemEscrita; //controla as posições vazias (sem_vazios) HANDLE hSemLeitura; // controla as posições que estão para ser lidos (sem_intens) HANDLE hMutex; //mutex para controlar as exclusões mutuas (contexto: um exclusivo para todos os produtoes e um para todos os consumidores) int terminar; //flag para indicar a thread para lidar quando é que ela termina, 1 para sair, 0 para o contrário //id do produtor int id; } DadosThreads; int num_aletaorios(int min, int max) { //gerar os valores aleatorios return rand() % (max - min + 1) + min; } //thread da logica do produtor DWORD WINAPI ThreadProdutor(LPVOID param) { DadosThreads* dados=(DadosThreads*)param; int contador = 0; CelulaBuffer cel; //celula do buffer circular que vai ser preenchida while(!dados->terminar) { //logica do produtor cel.id = dados->id; //um valor aleatorio cel.valor = num_aletaorios(10,99); //escrever no buffer cicular, se existir uma posição de escrita WaitForSingleObject(dados->hSemEscrita, INFINITE); //vão ter 10 posições //aceder a essa posição em exclusão mutua WaitForSingleObject(dados->hMutex, INFINITE); //os dois waits foram desbloqueados então vamos escrever no buffer circular CopyMemory( &dados->memoriaPartilhada->buffer[dados->memoriaPartilhada->posE], //ponteiro (&) para onde vamos copiar &cel, //origem sizeof(CelulaBuffer)//quantidade de informação que vai ser copiada ); //a proxima posição de escrita tem que ser incremantada dados->memoriaPartilhada->posE++; //se eu atingir o tamanho do buffer circular, pode ser feito com a logica da divisão por zero if(dados->memoriaPartilhada->posE == TAM_BUFFER) { dados->memoriaPartilhada->posE = 0; } //libertar o mutex (entre os produtores) ReleaseMutex(dados->hMutex); //libertar o semaforo: libertar UMA posição de leitura, para o consumidor poder ler //o produto espera por uma posição de escrita e liberta uma posição de leitura //o consumidor espera por uma posição de leitura e liberta uma posição de escrita ReleaseSemaphore(dados->hSemLeitura, 1, NULL); contador++; //quantidade de item produzidos em cada iteração _tprintf(TEXT("\nProdutor %d, produziu %d"), dados->id, cel.valor); Sleep(num_aletaorios(2,4)*1000); } _tprintf(TEXT("\nProdutor %d, produziu %d"), dados->id, contador); return 0; } int _tmain(int argc, TCHAR* argv[]) { //criar a estrutua de memoria partilhada HANDLE hFileMap; //estrutura de dados para a memoria partilhada DadosThreads dados; // BOOL primeirProcesso = FALSE; //handle para a thread HANDLE hThread; TCHAR comando[100]; #ifdef UNICODE _setmode(_fileno(stdin), _O_WTEXT); _setmode(_fileno(stdout), _O_WTEXT); _setmode(_fileno(stderr), _O_WTEXT); #endif //para o aletorio srand((unsigned int)time(NULL)); //criar ou abrir os semaforos antes das funções .. para não usar unmaps dados.hSemEscrita = CreateSemaphore( NULL, //segurança TAM_BUFFER,//iniciais no maximo - começa com a totalidade das poisções libertas TAM_BUFFER,//finais no maximo - começa com a totalidade das poisções libertas TEXT("SO_SEM_ESCRITA")//nome do semaforo ); //controlar as posições que podem ser lidas dados.hSemLeitura = CreateSemaphore( NULL, //segurança 0,//iniciais nenhumas, não existe nada para ser lido TAM_BUFFER,//finais, o maximo, a totalidade TEXT("SO_SEM_LEITURA")//nome do semaforo ); //criar os mutexs para a exclusão mutua dos produtores dados.hMutex = CreateMutex( NULL, FALSE, TEXT("SO_MUTEX_PRODUTORES") ); //testar os três ultimos if(dados.hSemEscrita == NULL || dados.hSemLeitura == NULL || dados.hMutex == NULL) { _tprintf(TEXT("\nErro no CreateSemaphore OU no CreateMutex")); return 1; } hFileMap = OpenFileMapping( FILE_MAP_ALL_ACCESS, //aceder FALSE, //não vai haver processos TEXT("SO2_MEM_PARTILHADA") //nome igual ao create file ); if(hFileMap == NULL) { primeirProcesso = TRUE; //existe um poblema no openFileMapping //então criamos.. hFileMap = CreateFileMapping( INVALID_HANDLE_VALUE, //handle para o FILE que vai ser criado pelo sistema operativo NULL, //atributos de segurança PAGE_READWRITE, //nivel de protecção, tipo de acesso , normal ser leitura/escrita 0, //dimensão da memoria partilhada (mais significativa e menos significativa, a menos fica a 0) sizeof(BufferCircular), //parte menos significativa fica com o tamanho da memoria partilhada TEXT("SO2_MEM_PARTILHADA")//nome para o file map ); //verificação ao file mapping if (hFileMap == NULL) { _tprintf(TEXT("\nErro no CreateFileMapping")); return 1; } } //fazer o mapeamento da memoria partilhada dados.memoriaPartilhada = (BufferCircular *) MapViewOfFile( hFileMap, //o handle para o file mapping FILE_MAP_ALL_ACCESS, //permissões escrita/leitura porque tanto o consumidor e produtor fazem as duas coisas 0, //off-set, a partir do local onde queremos mapear a memoria partilhada 0, 0 //tudo a zero porque queremos mapear desde o inicio até ao final.. ); //deve haver um cast para o MapViewOfFile, porque se correr bem será um ponteiro para void (BufferCircular *) //verificação do MapViewOfFile if (dados.memoriaPartilhada == NULL) { _tprintf(TEXT("\nErro no MapViewOfFile")); return 1; } //temos que incializar as variaveis que estão associados à estrutra circular //int nP; //numero de produtores //int nC; //numero de consumidores //int posE; //proxima posição de escrita //int posL; //proxima posição de leitura //mas estas variaveis só devem ser carregadas quando o primeiro produtor arrancar, apenas e só! if(primeirProcesso == TRUE){ dados.memoriaPartilhada->nC = 0; dados.memoriaPartilhada->nP = 0; dados.memoriaPartilhada->posE = 0; dados.memoriaPartilhada->posL = 0; } dados.terminar = 0; //incrementar o numero de produtores WaitForSingleObject(dados.hMutex, INFINITE); dados.memoriaPartilhada->nP++; dados.id = dados.memoriaPartilhada->nP; //libertar o mutex ReleaseMutex(dados.hMutex); //a thread hThread = CreateThread(NULL, 0, ThreadProdutor, &dados, 0, NULL); if(hThread != NULL) { //thread criada _tprintf(TEXT("\nEscreva qualquer coisa para sair..")); _getts_s(comando, 100); dados.terminar = 1; WaitForSingleObject(hThread, INFINITE); } //memoria partilhada UnmapViewOfFile(dados.memoriaPartilhada); //close dos handles: termina quando processo termina, não é preciso criar isto! return 0; }
assim fica o consumidor:
#include <Windows.h> #include <tchar.h> #include <math.h> #include <stdio.h> #include <fcntl.h> #include <io.h> #include <time.h> #define TAM_BUFFER 10 typedef struct { int id; int valor; } CelulaBuffer; typedef struct { int nP; int nC; int posE; int posL; CelulaBuffer buffer[TAM_BUFFER]; } BufferCircular; typedef struct { BufferCircular* memoriaPartilhada; HANDLE hSemEscrita; HANDLE hSemLeitura; HANDLE hMutex; int terminar; int id; } DadosThreads; DWORD WINAPI ThreadConsumidor(LPVOID param) { DadosThreads* dados = (DadosThreads*)param; int contador = 0; //consumidor int soma = 0; CelulaBuffer cel; while (!dados->terminar) { //bloquer no semaforo de leitura //consumidor WaitForSingleObject(dados->hSemLeitura, INFINITE); WaitForSingleObject(dados->hMutex, INFINITE); //consumidor CopyMemory( &cel, //variavel interna, local &dados->memoriaPartilhada->buffer[dados->memoriaPartilhada->posL], //origem, proxima posição de leitura sizeof(CelulaBuffer) ); //consumidor dados->memoriaPartilhada->posL++; //consumidor if (dados->memoriaPartilhada->posL == TAM_BUFFER) { dados->memoriaPartilhada->posL = 0; } ReleaseMutex(dados->hMutex); //consumidor ReleaseSemaphore(dados->hSemEscrita, 1, NULL); contador++; //consumidor soma += cel.valor; _tprintf(TEXT("\nConsumidor %d, consumiu %d"), dados->id, cel.valor); } _tprintf(TEXT("\nConsumidor %d, somando um valor %d"), dados->id, soma); return 0; } int _tmain(int argc, TCHAR* argv[]) { //criar a estrutua de memoria partilhada HANDLE hFileMap; //estrutura de dados para a memoria partilhada DadosThreads dados; // BOOL primeirProcesso = FALSE; //handle para a thread HANDLE hThread; TCHAR comando[100]; #ifdef UNICODE _setmode(_fileno(stdin), _O_WTEXT); _setmode(_fileno(stdout), _O_WTEXT); _setmode(_fileno(stderr), _O_WTEXT); #endif //para o aletorio srand((unsigned int)time(NULL)); //criar ou abrir os semaforos antes das funções .. para não usar unmaps dados.hSemEscrita = CreateSemaphore( NULL, //segurança TAM_BUFFER,//iniciais no maximo - começa com a totalidade das poisções libertas TAM_BUFFER,//finais no maximo - começa com a totalidade das poisções libertas TEXT("SO_SEM_ESCRITA")//nome do semaforo ); //controlar as posições que podem ser lidas dados.hSemLeitura = CreateSemaphore( NULL, //segurança 0,//iniciais nenhumas, não existe nada para ser lido TAM_BUFFER,//finais, o maximo, a totalidade TEXT("SO_SEM_LEITURA")//nome do semaforo ); //consumidor dados.hMutex = CreateMutex( NULL, FALSE, TEXT("SO_MUTEX_CONSUMIDORES") ); //testar os três ultimos if (dados.hSemEscrita == NULL || dados.hSemLeitura == NULL || dados.hMutex == NULL) { _tprintf(TEXT("\nErro no CreateSemaphore OU no CreateMutex")); return 1; } hFileMap = OpenFileMapping( FILE_MAP_ALL_ACCESS, //aceder FALSE, //não vai haver processos TEXT("SO2_MEM_PARTILHADA") //nome igual ao create file ); if (hFileMap == NULL) { primeirProcesso = TRUE; //existe um poblema no openFileMapping //então criamos.. hFileMap = CreateFileMapping( INVALID_HANDLE_VALUE, //handle para o FILE que vai ser criado pelo sistema operativo NULL, //atributos de segurança PAGE_READWRITE, //nivel de protecção, tipo de acesso , normal ser leitura/escrita 0, //dimensão da memoria partilhada (mais significativa e menos significativa, a menos fica a 0) sizeof(BufferCircular), //parte menos significativa fica com o tamanho da memoria partilhada TEXT("SO2_MEM_PARTILHADA")//nome para o file map ); //verificação ao file mapping if (hFileMap == NULL) { _tprintf(TEXT("\nErro no CreateFileMapping")); return 1; } } //fazer o mapeamento da memoria partilhada dados.memoriaPartilhada = (BufferCircular*)MapViewOfFile( hFileMap, //o handle para o file mapping FILE_MAP_ALL_ACCESS, //permissões escrita/leitura porque tanto o consumidor e produtor fazem as duas coisas 0, //off-set, a partir do local onde queremos mapear a memoria partilhada 0, 0 //tudo a zero porque queremos mapear desde o inicio até ao final.. ); //deve haver um cast para o MapViewOfFile, porque se correr bem será um ponteiro para void (BufferCircular *) //verificação do MapViewOfFile if (dados.memoriaPartilhada == NULL) { _tprintf(TEXT("\nErro no MapViewOfFile")); return 1; } //temos que incializar as variaveis que estão associados à estrutra circular //int nP; //numero de produtores //int nC; //numero de consumidores //int posE; //proxima posição de escrita //int posL; //proxima posição de leitura //mas estas variaveis só devem ser carregadas quando o primeiro produtor arrancar, apenas e só! if (primeirProcesso == TRUE) { dados.memoriaPartilhada->nC = 0; dados.memoriaPartilhada->nP = 0; dados.memoriaPartilhada->posE = 0; dados.memoriaPartilhada->posL = 0; } dados.terminar = 0; //consumidor WaitForSingleObject(dados.hMutex, INFINITE); dados.memoriaPartilhada->nC++; dados.id = dados.memoriaPartilhada->nC; //libertar o mutex ReleaseMutex(dados.hMutex); //a thread hThread = CreateThread(NULL, 0, ThreadConsumidor, &dados, 0, NULL); if (hThread != NULL) { //thread criada _tprintf(TEXT("\nEscreva qualquer coisa para sair..")); _getts_s(comando, 100); dados.terminar = 1; WaitForSingleObject(hThread, INFINITE); } //memoria partilhada UnmapViewOfFile(dados.memoriaPartilhada); //close dos handles: termina quando processo termina, não é preciso criar isto! return 0; }
para relembrar a versão light do semáforo:
#include <Windows.h> #include <tchar.h> #include <math.h> #include <stdio.h> #include <fcntl.h> #include <io.h> #include <shlwapi.h> //memória partilhada entre os processos //usar mecanismos de sincronização, para garantir exclusão mutua nesse bloco de memtória partilhada //em windows: mapeamento de ficheiros #define NUM_CARACTERES 100 typedef struct { //ponteiro para memoria partilhada TCHAR* fileViewMap; //handle para o evento HANDLE hEvent; //handle para o mutex HANDLE hMutex; //flag para controlar as threads (aquela que escreve o fim) int terminar; } ThreadDados; DWORD WINAPI ThreadLer(LPVOID param) { ThreadDados* dados = (ThreadDados*)param; while (1) { //bloquear à espera do evento evitar a espera ativa WaitForSingleObject(dados->hEvent, INFINITE); if (dados->terminar) break; WaitForSingleObject(dados->hMutex, INFINITE); //desbloqueou: e aqui deve ser o codigo mais curto possivel!!! //é uma zona critica _tprintf(TEXT("\nmensagem recebida: %s"), dados->fileViewMap); //libertar o mutex ReleaseMutex(dados->hMutex); Sleep(1000); } return 0; } DWORD WINAPI ThreadEscrever(LPVOID param) { ThreadDados* dados = (ThreadDados*)param; TCHAR msg[NUM_CARACTERES]; while (!(dados->terminar)) { _fgetts(msg, NUM_CARACTERES, stdin); msg[_tcslen(msg) - 1] = '\0'; if (_tcscmp(msg, TEXT("fim")) == 0) { dados->terminar = 1; } //bloquear à espera do evento evitar a espera ativa WaitForSingleObject(dados->hMutex, INFINITE); //limpar a memoria partilhada ZeroMemory(dados->fileViewMap, NUM_CARACTERES * sizeof(TCHAR)); //copiar o conteudo para a memoria partilhada CopyMemory(dados->fileViewMap, msg, _tcslen(msg) * sizeof(TCHAR)); //libertar o mutex ReleaseMutex(dados->hMutex); SetEvent(dados->hEvent); Sleep(500); ResetEvent(dados->hEvent); } return 0; } int _tmain(int argc, TCHAR* argv[]) { HANDLE hfileMap; ThreadDados dados; //criar as threads HANDLE hThreads[2]; //1 de escrita e 1 de leitura //semaforos, é uma generalziação do mutex HANDLE hSemaforo; #ifdef UNICODE _setmode(_fileno(stdin), _O_WTEXT); _setmode(_fileno(stdout), _O_WTEXT); _setmode(_fileno(stderr), _O_WTEXT); #endif //criar o semaforo antes da memoria partilhada hSemaforo = CreateSemaphore(NULL, 2, //qts começam disponiveis de inicio 2, //qt são ao mesmo tempo TEXT("SO2_SEMAFORO") ); if (hSemaforo == NULL) { _tprintf(TEXT("\nErro no CreateSemaphore")); return 1; } //esperar por uma slote _tprintf(TEXT("\naguardar por uma slote..")); WaitForSingleObject(hSemaforo, INFINITE); //quem não tem espaço aguarda. _tprintf(TEXT("\nchat..")); hfileMap = CreateFileMapping( INVALID_HANDLE_VALUE, //sistema operativo que faça a gestao NULL, PAGE_READWRITE, 0, NUM_CARACTERES * sizeof(TCHAR), //tamanho do file mapping TEXT("SO2_MEM_PART") //nome do file mapping ); if (hfileMap == NULL) { _tprintf(TEXT("\nErro no CreateFileMapping")); return 1; } dados.fileViewMap = (TCHAR*)MapViewOfFile( hfileMap, FILE_MAP_ALL_ACCESS, 0, 0, 0 ); if (dados.fileViewMap == NULL) { _tprintf(TEXT("\nErro no fileViewMap")); return 1; } //o evento dados.hEvent = CreateEvent( NULL, TRUE, //reset manual FALSE, TEXT("SO2_EVENTO") ); if (dados.hEvent == NULL) { _tprintf(TEXT("\nErro no CreateEvent")); UnmapViewOfFile(dados.fileViewMap); //pois ja temos e memoria partilhada return 1; } dados.hMutex = CreateMutex( NULL, FALSE, TEXT("SO2_MUTEX") ); //sincronização da memoria partilhada if (dados.hMutex == NULL) { _tprintf(TEXT("\nErro no CreateMutex")); UnmapViewOfFile(dados.fileViewMap); //pois ja temos e memoria partilhada return 1; } dados.terminar = 0; //criar as threads (a de ler e a de escrever) hThreads[0] = CreateThread(NULL, 0, ThreadLer, &dados, 0, NULL); hThreads[1] = CreateThread(NULL, 0, ThreadEscrever, &dados, 0, NULL); if (hThreads[0] != NULL && hThreads[1] != NULL) { WaitForMultipleObjects(2, hThreads, TRUE, INFINITE); } //lidar com os semaforos ReleaseSemaphore(hSemaforo, 1, NULL); UnmapViewOfFile(dados.fileViewMap); CloseHandle(hfileMap); return 0; }