sessão 6b – Semáforos (produtor/consumidor)
N produtores M consumidores
buffer circular (parte 1)
um buffer circular é um array em que as pontas estão ligadas
se estiver cheio o array e precisarmos de preencher, voltamos à posição zero
podemos usar de forma infinita o array, estando a sobrepor os índices anteriores
cuidados com o buffer circular:
a leitura
a escrita
sincronização necessária
garantir que o consumidor só lê se existir informação para ser lida
se o produtor produzir muita informação pode preencher dados que o consumidor ainda não leu
assim o produtor só pode escrever numa posição escrita quando souber que o consumidor já a leu
usamos assim dois semáforos e dois mutexes (os mutexes resolver a questão da exclusão mutua)
os semáforos:
um para saber quantas posições do buffer circular estão preenchidas
e um para saber quantas posições do buffer circular estão vazias
na prática:
criar dois semáforos
produtor
no produtor:
while (COND) {
item_p = produz_item();
esperar(&sem_vazios);
esperar(&sem_mutex_p);
buffer[in] = item_p;
in = (in + 1) % DIM; //incrementar a posição de escrita
assinalar(&sem_mutex_p);
assinalar(&sem_itens); //indica que existem coisas para ler
}
no consumidor:
while (COND) {
esperar(&sem_itens); //controlar as posições ocupadas
esperar(&sem_mutex_c); //
item_c = buffer[out]; //ler o buffer
out = (0Ut + 1) % DIM; //incrementar a posição de leitura
assinalar(&sem_mutex_c);
assinalar(&sem_vazios); //indica que já existe uma posição vaiza
trata_item(item_c);
}
dois mutexes
para termos N produtores e M consumidores
os produtores não podem todos ocupar a mesma posição
os semáforos só garantem que que se existirem posições vazias os produtores podem ter a possibilidade popular essas posições
mas não garante a sincronização, e isso é feito com a exclusão mutua
então surge o mutex, para garantir não haver problema ao acesso concorrente
cada consumidor vai ler uma posição, a sua própria posição.
exemplo:
5 produtores são 5 processos
5 consumidores são 5 processos
podem existir
5 produtores 0 consumidores, sendo que quando o array estiver cheio eles não devem produzir mais e ficam à espera
0 produtores 5 consumidores, sendo que quando o array estiver vazio eles não devem consumir mais e ficam à espera
assim fica o produtor:
#include <Windows.h>
#include <tchar.h>
#include <math.h>
#include <stdio.h>
#include <fcntl.h>
#include <io.h>
#include <time.h>
#define TAM_BUFFER 10 //tamanho do buffer circular
typedef struct
{
int id;
int valor;
} CelulaBuffer;
//a memória partilhada
typedef struct{
int nP; //numero de produtores
int nC; //numero de consumidores
int posE; //proxima posição de escrita
int posL; //proxima posição de leitura
CelulaBuffer buffer[TAM_BUFFER]; //estrutura do buffer circular
} BufferCircular;
//lidar com a informação dos handles semáforos, threads..
typedef struct
{
BufferCircular * memoriaPartilhada;
//hanelde para cada uma dos semaforoes
HANDLE hSemEscrita; //controla as posições vazias (sem_vazios)
HANDLE hSemLeitura; // controla as posições que estão para ser lidos (sem_intens)
HANDLE hMutex; //mutex para controlar as exclusões mutuas (contexto: um exclusivo para todos os produtoes e um para todos os consumidores)
int terminar; //flag para indicar a thread para lidar quando é que ela termina, 1 para sair, 0 para o contrário
//id do produtor
int id;
} DadosThreads;
int num_aletaorios(int min, int max)
{
//gerar os valores aleatorios
return rand() % (max - min + 1) + min;
}
//thread da logica do produtor
DWORD WINAPI ThreadProdutor(LPVOID param)
{
DadosThreads* dados=(DadosThreads*)param;
int contador = 0;
CelulaBuffer cel; //celula do buffer circular que vai ser preenchida
while(!dados->terminar)
{
//logica do produtor
cel.id = dados->id;
//um valor aleatorio
cel.valor = num_aletaorios(10,99);
//escrever no buffer cicular, se existir uma posição de escrita
WaitForSingleObject(dados->hSemEscrita, INFINITE); //vão ter 10 posições
//aceder a essa posição em exclusão mutua
WaitForSingleObject(dados->hMutex, INFINITE);
//os dois waits foram desbloqueados então vamos escrever no buffer circular
CopyMemory(
&dados->memoriaPartilhada->buffer[dados->memoriaPartilhada->posE], //ponteiro (&) para onde vamos copiar
&cel, //origem
sizeof(CelulaBuffer)//quantidade de informação que vai ser copiada
);
//a proxima posição de escrita tem que ser incremantada
dados->memoriaPartilhada->posE++;
//se eu atingir o tamanho do buffer circular, pode ser feito com a logica da divisão por zero
if(dados->memoriaPartilhada->posE == TAM_BUFFER)
{
dados->memoriaPartilhada->posE = 0;
}
//libertar o mutex (entre os produtores)
ReleaseMutex(dados->hMutex);
//libertar o semaforo: libertar UMA posição de leitura, para o consumidor poder ler
//o produto espera por uma posição de escrita e liberta uma posição de leitura
//o consumidor espera por uma posição de leitura e liberta uma posição de escrita
ReleaseSemaphore(dados->hSemLeitura, 1, NULL);
contador++; //quantidade de item produzidos em cada iteração
_tprintf(TEXT("\nProdutor %d, produziu %d"), dados->id, cel.valor);
Sleep(num_aletaorios(2,4)*1000);
}
_tprintf(TEXT("\nProdutor %d, produziu %d"), dados->id, contador);
return 0;
}
int _tmain(int argc, TCHAR* argv[]) {
//criar a estrutua de memoria partilhada
HANDLE hFileMap;
//estrutura de dados para a memoria partilhada
DadosThreads dados;
//
BOOL primeirProcesso = FALSE;
//handle para a thread
HANDLE hThread;
TCHAR comando[100];
#ifdef UNICODE
_setmode(_fileno(stdin), _O_WTEXT);
_setmode(_fileno(stdout), _O_WTEXT);
_setmode(_fileno(stderr), _O_WTEXT);
#endif
//para o aletorio
srand((unsigned int)time(NULL));
//criar ou abrir os semaforos antes das funções .. para não usar unmaps
dados.hSemEscrita = CreateSemaphore(
NULL, //segurança
TAM_BUFFER,//iniciais no maximo - começa com a totalidade das poisções libertas
TAM_BUFFER,//finais no maximo - começa com a totalidade das poisções libertas
TEXT("SO_SEM_ESCRITA")//nome do semaforo
);
//controlar as posições que podem ser lidas
dados.hSemLeitura = CreateSemaphore(
NULL, //segurança
0,//iniciais nenhumas, não existe nada para ser lido
TAM_BUFFER,//finais, o maximo, a totalidade
TEXT("SO_SEM_LEITURA")//nome do semaforo
);
//criar os mutexs para a exclusão mutua dos produtores
dados.hMutex = CreateMutex(
NULL,
FALSE,
TEXT("SO_MUTEX_PRODUTORES")
);
//testar os três ultimos
if(dados.hSemEscrita == NULL || dados.hSemLeitura == NULL || dados.hMutex == NULL)
{
_tprintf(TEXT("\nErro no CreateSemaphore OU no CreateMutex"));
return 1;
}
hFileMap = OpenFileMapping(
FILE_MAP_ALL_ACCESS, //aceder
FALSE, //não vai haver processos
TEXT("SO2_MEM_PARTILHADA") //nome igual ao create file
);
if(hFileMap == NULL)
{
primeirProcesso = TRUE;
//existe um poblema no openFileMapping
//então criamos..
hFileMap = CreateFileMapping(
INVALID_HANDLE_VALUE, //handle para o FILE que vai ser criado pelo sistema operativo
NULL, //atributos de segurança
PAGE_READWRITE, //nivel de protecção, tipo de acesso , normal ser leitura/escrita
0, //dimensão da memoria partilhada (mais significativa e menos significativa, a menos fica a 0)
sizeof(BufferCircular), //parte menos significativa fica com o tamanho da memoria partilhada
TEXT("SO2_MEM_PARTILHADA")//nome para o file map
);
//verificação ao file mapping
if (hFileMap == NULL)
{
_tprintf(TEXT("\nErro no CreateFileMapping"));
return 1;
}
}
//fazer o mapeamento da memoria partilhada
dados.memoriaPartilhada = (BufferCircular *) MapViewOfFile(
hFileMap, //o handle para o file mapping
FILE_MAP_ALL_ACCESS, //permissões escrita/leitura porque tanto o consumidor e produtor fazem as duas coisas
0, //off-set, a partir do local onde queremos mapear a memoria partilhada
0,
0 //tudo a zero porque queremos mapear desde o inicio até ao final..
);
//deve haver um cast para o MapViewOfFile, porque se correr bem será um ponteiro para void (BufferCircular *)
//verificação do MapViewOfFile
if (dados.memoriaPartilhada == NULL)
{
_tprintf(TEXT("\nErro no MapViewOfFile"));
return 1;
}
//temos que incializar as variaveis que estão associados à estrutra circular
//int nP; //numero de produtores
//int nC; //numero de consumidores
//int posE; //proxima posição de escrita
//int posL; //proxima posição de leitura
//mas estas variaveis só devem ser carregadas quando o primeiro produtor arrancar, apenas e só!
if(primeirProcesso == TRUE){
dados.memoriaPartilhada->nC = 0;
dados.memoriaPartilhada->nP = 0;
dados.memoriaPartilhada->posE = 0;
dados.memoriaPartilhada->posL = 0;
}
dados.terminar = 0;
//incrementar o numero de produtores
WaitForSingleObject(dados.hMutex, INFINITE);
dados.memoriaPartilhada->nP++;
dados.id = dados.memoriaPartilhada->nP;
//libertar o mutex
ReleaseMutex(dados.hMutex);
//a thread
hThread = CreateThread(NULL, 0, ThreadProdutor, &dados, 0, NULL);
if(hThread != NULL)
{
//thread criada
_tprintf(TEXT("\nEscreva qualquer coisa para sair.."));
_getts_s(comando, 100);
dados.terminar = 1;
WaitForSingleObject(hThread, INFINITE);
}
//memoria partilhada
UnmapViewOfFile(dados.memoriaPartilhada);
//close dos handles: termina quando processo termina, não é preciso criar isto!
return 0;
}
assim fica o consumidor:
#include <Windows.h>
#include <tchar.h>
#include <math.h>
#include <stdio.h>
#include <fcntl.h>
#include <io.h>
#include <time.h>
#define TAM_BUFFER 10
typedef struct
{
int id;
int valor;
} CelulaBuffer;
typedef struct {
int nP;
int nC;
int posE;
int posL;
CelulaBuffer buffer[TAM_BUFFER];
} BufferCircular;
typedef struct
{
BufferCircular* memoriaPartilhada;
HANDLE hSemEscrita;
HANDLE hSemLeitura;
HANDLE hMutex;
int terminar;
int id;
} DadosThreads;
DWORD WINAPI ThreadConsumidor(LPVOID param)
{
DadosThreads* dados = (DadosThreads*)param;
int contador = 0;
//consumidor
int soma = 0;
CelulaBuffer cel;
while (!dados->terminar)
{
//bloquer no semaforo de leitura
//consumidor
WaitForSingleObject(dados->hSemLeitura, INFINITE);
WaitForSingleObject(dados->hMutex, INFINITE);
//consumidor
CopyMemory(
&cel, //variavel interna, local
&dados->memoriaPartilhada->buffer[dados->memoriaPartilhada->posL], //origem, proxima posição de leitura
sizeof(CelulaBuffer)
);
//consumidor
dados->memoriaPartilhada->posL++;
//consumidor
if (dados->memoriaPartilhada->posL == TAM_BUFFER)
{
dados->memoriaPartilhada->posL = 0;
}
ReleaseMutex(dados->hMutex);
//consumidor
ReleaseSemaphore(dados->hSemEscrita, 1, NULL);
contador++;
//consumidor
soma += cel.valor;
_tprintf(TEXT("\nConsumidor %d, consumiu %d"), dados->id, cel.valor);
}
_tprintf(TEXT("\nConsumidor %d, somando um valor %d"), dados->id, soma);
return 0;
}
int _tmain(int argc, TCHAR* argv[]) {
//criar a estrutua de memoria partilhada
HANDLE hFileMap;
//estrutura de dados para a memoria partilhada
DadosThreads dados;
//
BOOL primeirProcesso = FALSE;
//handle para a thread
HANDLE hThread;
TCHAR comando[100];
#ifdef UNICODE
_setmode(_fileno(stdin), _O_WTEXT);
_setmode(_fileno(stdout), _O_WTEXT);
_setmode(_fileno(stderr), _O_WTEXT);
#endif
//para o aletorio
srand((unsigned int)time(NULL));
//criar ou abrir os semaforos antes das funções .. para não usar unmaps
dados.hSemEscrita = CreateSemaphore(
NULL, //segurança
TAM_BUFFER,//iniciais no maximo - começa com a totalidade das poisções libertas
TAM_BUFFER,//finais no maximo - começa com a totalidade das poisções libertas
TEXT("SO_SEM_ESCRITA")//nome do semaforo
);
//controlar as posições que podem ser lidas
dados.hSemLeitura = CreateSemaphore(
NULL, //segurança
0,//iniciais nenhumas, não existe nada para ser lido
TAM_BUFFER,//finais, o maximo, a totalidade
TEXT("SO_SEM_LEITURA")//nome do semaforo
);
//consumidor
dados.hMutex = CreateMutex(
NULL,
FALSE,
TEXT("SO_MUTEX_CONSUMIDORES")
);
//testar os três ultimos
if (dados.hSemEscrita == NULL || dados.hSemLeitura == NULL || dados.hMutex == NULL)
{
_tprintf(TEXT("\nErro no CreateSemaphore OU no CreateMutex"));
return 1;
}
hFileMap = OpenFileMapping(
FILE_MAP_ALL_ACCESS, //aceder
FALSE, //não vai haver processos
TEXT("SO2_MEM_PARTILHADA") //nome igual ao create file
);
if (hFileMap == NULL)
{
primeirProcesso = TRUE;
//existe um poblema no openFileMapping
//então criamos..
hFileMap = CreateFileMapping(
INVALID_HANDLE_VALUE, //handle para o FILE que vai ser criado pelo sistema operativo
NULL, //atributos de segurança
PAGE_READWRITE, //nivel de protecção, tipo de acesso , normal ser leitura/escrita
0, //dimensão da memoria partilhada (mais significativa e menos significativa, a menos fica a 0)
sizeof(BufferCircular), //parte menos significativa fica com o tamanho da memoria partilhada
TEXT("SO2_MEM_PARTILHADA")//nome para o file map
);
//verificação ao file mapping
if (hFileMap == NULL)
{
_tprintf(TEXT("\nErro no CreateFileMapping"));
return 1;
}
}
//fazer o mapeamento da memoria partilhada
dados.memoriaPartilhada = (BufferCircular*)MapViewOfFile(
hFileMap, //o handle para o file mapping
FILE_MAP_ALL_ACCESS, //permissões escrita/leitura porque tanto o consumidor e produtor fazem as duas coisas
0, //off-set, a partir do local onde queremos mapear a memoria partilhada
0,
0 //tudo a zero porque queremos mapear desde o inicio até ao final..
);
//deve haver um cast para o MapViewOfFile, porque se correr bem será um ponteiro para void (BufferCircular *)
//verificação do MapViewOfFile
if (dados.memoriaPartilhada == NULL)
{
_tprintf(TEXT("\nErro no MapViewOfFile"));
return 1;
}
//temos que incializar as variaveis que estão associados à estrutra circular
//int nP; //numero de produtores
//int nC; //numero de consumidores
//int posE; //proxima posição de escrita
//int posL; //proxima posição de leitura
//mas estas variaveis só devem ser carregadas quando o primeiro produtor arrancar, apenas e só!
if (primeirProcesso == TRUE) {
dados.memoriaPartilhada->nC = 0;
dados.memoriaPartilhada->nP = 0;
dados.memoriaPartilhada->posE = 0;
dados.memoriaPartilhada->posL = 0;
}
dados.terminar = 0;
//consumidor
WaitForSingleObject(dados.hMutex, INFINITE);
dados.memoriaPartilhada->nC++;
dados.id = dados.memoriaPartilhada->nC;
//libertar o mutex
ReleaseMutex(dados.hMutex);
//a thread
hThread = CreateThread(NULL, 0, ThreadConsumidor, &dados, 0, NULL);
if (hThread != NULL)
{
//thread criada
_tprintf(TEXT("\nEscreva qualquer coisa para sair.."));
_getts_s(comando, 100);
dados.terminar = 1;
WaitForSingleObject(hThread, INFINITE);
}
//memoria partilhada
UnmapViewOfFile(dados.memoriaPartilhada);
//close dos handles: termina quando processo termina, não é preciso criar isto!
return 0;
}
para relembrar a versão light do semáforo:
#include <Windows.h>
#include <tchar.h>
#include <math.h>
#include <stdio.h>
#include <fcntl.h>
#include <io.h>
#include <shlwapi.h>
//memória partilhada entre os processos
//usar mecanismos de sincronização, para garantir exclusão mutua nesse bloco de memtória partilhada
//em windows: mapeamento de ficheiros
#define NUM_CARACTERES 100
typedef struct
{
//ponteiro para memoria partilhada
TCHAR* fileViewMap;
//handle para o evento
HANDLE hEvent;
//handle para o mutex
HANDLE hMutex;
//flag para controlar as threads (aquela que escreve o fim)
int terminar;
} ThreadDados;
DWORD WINAPI ThreadLer(LPVOID param)
{
ThreadDados* dados = (ThreadDados*)param;
while (1)
{
//bloquear à espera do evento evitar a espera ativa
WaitForSingleObject(dados->hEvent, INFINITE);
if (dados->terminar)
break;
WaitForSingleObject(dados->hMutex, INFINITE);
//desbloqueou: e aqui deve ser o codigo mais curto possivel!!!
//é uma zona critica
_tprintf(TEXT("\nmensagem recebida: %s"), dados->fileViewMap);
//libertar o mutex
ReleaseMutex(dados->hMutex);
Sleep(1000);
}
return 0;
}
DWORD WINAPI ThreadEscrever(LPVOID param)
{
ThreadDados* dados = (ThreadDados*)param;
TCHAR msg[NUM_CARACTERES];
while (!(dados->terminar))
{
_fgetts(msg, NUM_CARACTERES, stdin);
msg[_tcslen(msg) - 1] = '\0';
if (_tcscmp(msg, TEXT("fim")) == 0)
{
dados->terminar = 1;
}
//bloquear à espera do evento evitar a espera ativa
WaitForSingleObject(dados->hMutex, INFINITE);
//limpar a memoria partilhada
ZeroMemory(dados->fileViewMap, NUM_CARACTERES * sizeof(TCHAR));
//copiar o conteudo para a memoria partilhada
CopyMemory(dados->fileViewMap, msg, _tcslen(msg) * sizeof(TCHAR));
//libertar o mutex
ReleaseMutex(dados->hMutex);
SetEvent(dados->hEvent);
Sleep(500);
ResetEvent(dados->hEvent);
}
return 0;
}
int _tmain(int argc, TCHAR* argv[]) {
HANDLE hfileMap;
ThreadDados dados;
//criar as threads
HANDLE hThreads[2]; //1 de escrita e 1 de leitura
//semaforos, é uma generalziação do mutex
HANDLE hSemaforo;
#ifdef UNICODE
_setmode(_fileno(stdin), _O_WTEXT);
_setmode(_fileno(stdout), _O_WTEXT);
_setmode(_fileno(stderr), _O_WTEXT);
#endif
//criar o semaforo antes da memoria partilhada
hSemaforo = CreateSemaphore(NULL,
2, //qts começam disponiveis de inicio
2, //qt são ao mesmo tempo
TEXT("SO2_SEMAFORO")
);
if (hSemaforo == NULL)
{
_tprintf(TEXT("\nErro no CreateSemaphore"));
return 1;
}
//esperar por uma slote
_tprintf(TEXT("\naguardar por uma slote.."));
WaitForSingleObject(hSemaforo, INFINITE); //quem não tem espaço aguarda.
_tprintf(TEXT("\nchat.."));
hfileMap = CreateFileMapping(
INVALID_HANDLE_VALUE, //sistema operativo que faça a gestao
NULL,
PAGE_READWRITE,
0,
NUM_CARACTERES * sizeof(TCHAR), //tamanho do file mapping
TEXT("SO2_MEM_PART") //nome do file mapping
);
if (hfileMap == NULL)
{
_tprintf(TEXT("\nErro no CreateFileMapping"));
return 1;
}
dados.fileViewMap = (TCHAR*)MapViewOfFile(
hfileMap,
FILE_MAP_ALL_ACCESS,
0,
0,
0
);
if (dados.fileViewMap == NULL)
{
_tprintf(TEXT("\nErro no fileViewMap"));
return 1;
}
//o evento
dados.hEvent = CreateEvent(
NULL,
TRUE, //reset manual
FALSE,
TEXT("SO2_EVENTO")
);
if (dados.hEvent == NULL)
{
_tprintf(TEXT("\nErro no CreateEvent"));
UnmapViewOfFile(dados.fileViewMap); //pois ja temos e memoria partilhada
return 1;
}
dados.hMutex = CreateMutex(
NULL,
FALSE,
TEXT("SO2_MUTEX")
);
//sincronização da memoria partilhada
if (dados.hMutex == NULL)
{
_tprintf(TEXT("\nErro no CreateMutex"));
UnmapViewOfFile(dados.fileViewMap); //pois ja temos e memoria partilhada
return 1;
}
dados.terminar = 0;
//criar as threads (a de ler e a de escrever)
hThreads[0] = CreateThread(NULL, 0, ThreadLer, &dados, 0, NULL);
hThreads[1] = CreateThread(NULL, 0, ThreadEscrever, &dados, 0, NULL);
if (hThreads[0] != NULL && hThreads[1] != NULL)
{
WaitForMultipleObjects(2, hThreads, TRUE, INFINITE);
}
//lidar com os semaforos
ReleaseSemaphore(hSemaforo, 1, NULL);
UnmapViewOfFile(dados.fileViewMap);
CloseHandle(hfileMap);
return 0;
}
0 thoughts on “sessão 6b – Semáforos (produtor/consumidor)”