sessão 6b – Semáforos (produtor/consumidor)

N produtores M consumidores
buffer circular (parte 1)
um buffer circular é um array em que as pontas estão ligadas
se estiver cheio o array e precisarmos de preencher, voltamos à posição zero
podemos usar de forma infinita o array, estando a sobrepor os índices anteriores
cuidados com o buffer circular:
a leitura
a escrita
sincronização necessária
garantir que o consumidor só lê se existir informação para ser lida
se o produtor produzir muita informação pode preencher dados que o consumidor ainda não leu
assim o produtor só pode escrever numa posição escrita quando souber que o consumidor já a leu
usamos assim dois semáforos e dois mutexes (os mutexes resolver a questão da exclusão mutua)
os semáforos:
um para saber quantas posições do buffer circular estão preenchidas
e um para saber quantas posições do buffer circular estão vazias
na prática:
criar dois semáforos
produtor

no produtor:
while (COND) {
	item_p = produz_item();
	esperar(&sem_vazios);
	esperar(&sem_mutex_p);
	buffer[in] = item_p;
	in = (in + 1) % DIM;  //incrementar a posição de escrita
	assinalar(&sem_mutex_p);
	assinalar(&sem_itens); //indica que existem coisas para ler
}
no consumidor:
while (COND) {
	esperar(&sem_itens); //controlar as posições ocupadas
	esperar(&sem_mutex_c); //
	item_c = buffer[out]; //ler o buffer
	out = (0Ut + 1) % DIM; //incrementar a posição de leitura
	assinalar(&sem_mutex_c);
	assinalar(&sem_vazios); //indica que já existe uma posição vaiza
	trata_item(item_c);	
}

dois mutexes
para termos N produtores e M consumidores
os produtores não podem todos ocupar a mesma posição
os semáforos só garantem que que se existirem posições vazias os produtores podem ter a possibilidade popular essas posições
mas não garante a sincronização, e isso é feito com a exclusão mutua
então surge o mutex, para garantir não haver problema ao acesso concorrente
cada consumidor vai ler uma posição, a sua própria posição.
exemplo:
5 produtores são 5 processos
5 consumidores são 5 processos
podem existir
5 produtores 0 consumidores, sendo que quando o array estiver cheio eles não devem produzir mais e ficam à espera
0 produtores 5 consumidores, sendo que quando o array estiver vazio eles não devem consumir mais e ficam à espera

assim fica o produtor:

#include <Windows.h>
#include <tchar.h>
#include <math.h>
#include <stdio.h>
#include <fcntl.h>
#include <io.h>
#include <time.h>

#define TAM_BUFFER 10 //tamanho do buffer circular

typedef struct
{
    int id;
    int valor;
} CelulaBuffer;

//a memória partilhada
typedef struct{
    int nP; //numero de produtores
    int nC; //numero de consumidores
    int posE; //proxima posição de escrita
    int posL; //proxima posição de leitura
    CelulaBuffer buffer[TAM_BUFFER]; //estrutura do buffer circular
} BufferCircular;

//lidar com a informação dos handles semáforos, threads..
typedef struct
{
    BufferCircular * memoriaPartilhada;
	//hanelde para cada uma dos semaforoes
    HANDLE hSemEscrita; //controla as posições vazias (sem_vazios)
    HANDLE hSemLeitura; // controla as posições que estão para ser lidos (sem_intens)
    HANDLE hMutex; //mutex para controlar as exclusões mutuas (contexto: um exclusivo para todos os produtoes e um para todos os consumidores)
    int terminar; //flag para indicar a thread para lidar quando é que ela termina, 1 para sair, 0 para o contrário
	//id do produtor
	int id;
} DadosThreads;

int num_aletaorios(int min, int max)
{
    //gerar os valores aleatorios
    return rand() % (max - min + 1) + min;
}

//thread da logica do produtor
DWORD WINAPI ThreadProdutor(LPVOID param)
{
    DadosThreads* dados=(DadosThreads*)param;
    int contador = 0;
	
    CelulaBuffer cel; //celula do buffer circular que vai ser preenchida
	while(!dados->terminar)
	{
		//logica do produtor
        cel.id = dados->id;
		//um valor aleatorio
        cel.valor = num_aletaorios(10,99);

		//escrever no buffer cicular, se existir uma posição de escrita
        WaitForSingleObject(dados->hSemEscrita, INFINITE); //vão ter 10 posições
        //aceder a essa posição em exclusão mutua
        WaitForSingleObject(dados->hMutex, INFINITE);
		//os dois waits foram desbloqueados então vamos escrever no buffer circular
        CopyMemory(
            &dados->memoriaPartilhada->buffer[dados->memoriaPartilhada->posE], //ponteiro (&) para onde vamos copiar
            &cel, //origem
            sizeof(CelulaBuffer)//quantidade de informação que vai ser copiada
            );
		//a proxima posição de escrita tem que ser incremantada
        dados->memoriaPartilhada->posE++;
		//se eu atingir o tamanho do buffer circular, pode ser feito com a logica da divisão por zero
		if(dados->memoriaPartilhada->posE == TAM_BUFFER)
		{
            dados->memoriaPartilhada->posE = 0;
		}
		//libertar o mutex (entre os produtores)
        ReleaseMutex(dados->hMutex);
		//libertar o semaforo: libertar UMA posição de leitura, para o consumidor poder ler
		//o produto espera por uma posição de escrita e liberta uma posição de leitura
		//o consumidor espera por uma posição de leitura e liberta uma posição de escrita
        ReleaseSemaphore(dados->hSemLeitura, 1, NULL);
        contador++; //quantidade de item produzidos em cada iteração
        _tprintf(TEXT("\nProdutor %d, produziu %d"), dados->id, cel.valor);
		
        Sleep(num_aletaorios(2,4)*1000);
	}
    _tprintf(TEXT("\nProdutor %d, produziu %d"), dados->id, contador);
    return 0;
}

int _tmain(int argc, TCHAR* argv[]) {
	//criar a estrutua de memoria partilhada
    HANDLE hFileMap;
    //estrutura de dados para a memoria partilhada
    DadosThreads dados;
    //
    BOOL primeirProcesso = FALSE;

	//handle para a thread
    HANDLE hThread;
    TCHAR comando[100];
	
#ifdef UNICODE 
    _setmode(_fileno(stdin), _O_WTEXT);
    _setmode(_fileno(stdout), _O_WTEXT);
    _setmode(_fileno(stderr), _O_WTEXT);
#endif
    //para o aletorio
    srand((unsigned int)time(NULL));

	
    //criar ou abrir os semaforos antes das funções .. para não usar unmaps
	dados.hSemEscrita = CreateSemaphore(
		NULL, //segurança
        TAM_BUFFER,//iniciais no maximo - começa com a totalidade das poisções libertas
        TAM_BUFFER,//finais no maximo - começa com a totalidade das poisções libertas
        TEXT("SO_SEM_ESCRITA")//nome do semaforo
    );
	//controlar as posições que podem ser lidas
    dados.hSemLeitura = CreateSemaphore(
        NULL, //segurança
        0,//iniciais nenhumas, não existe nada para ser lido
        TAM_BUFFER,//finais, o maximo, a totalidade
        TEXT("SO_SEM_LEITURA")//nome do semaforo
    );
    //criar os mutexs para a exclusão mutua dos produtores
    dados.hMutex = CreateMutex(
        NULL,
        FALSE,
        TEXT("SO_MUTEX_PRODUTORES")
    );
	//testar os três ultimos
	if(dados.hSemEscrita == NULL || dados.hSemLeitura == NULL || dados.hMutex == NULL)
	{
        _tprintf(TEXT("\nErro no CreateSemaphore OU no CreateMutex"));
        return 1;
	}
	
    hFileMap = OpenFileMapping(
    FILE_MAP_ALL_ACCESS, //aceder 
        FALSE, //não vai haver processos
        TEXT("SO2_MEM_PARTILHADA") //nome igual ao create file
        );
	if(hFileMap == NULL)
	{
        primeirProcesso = TRUE;
        //existe um poblema no openFileMapping
		//então criamos..
		hFileMap = CreateFileMapping(
        INVALID_HANDLE_VALUE, //handle para o FILE que vai ser criado pelo sistema operativo
            NULL, //atributos de segurança
            PAGE_READWRITE, //nivel de protecção, tipo de acesso , normal ser leitura/escrita
            0, //dimensão da memoria partilhada (mais significativa e menos significativa, a menos fica a 0)
            sizeof(BufferCircular), //parte menos significativa fica com o tamanho da memoria partilhada
            TEXT("SO2_MEM_PARTILHADA")//nome para o file map
            );
            //verificação ao file mapping
            if (hFileMap == NULL)
            {
                _tprintf(TEXT("\nErro no CreateFileMapping"));
                return 1;
            }
	}
    
	//fazer o mapeamento da memoria partilhada
    dados.memoriaPartilhada = (BufferCircular *) MapViewOfFile(
        hFileMap, //o handle para o file mapping
        FILE_MAP_ALL_ACCESS, //permissões escrita/leitura porque tanto o consumidor e produtor fazem as duas coisas
        0, //off-set, a partir do local onde queremos mapear a memoria partilhada
        0,
        0 //tudo a zero porque queremos mapear desde o inicio até ao final..
    );
    //deve haver um cast para o MapViewOfFile, porque se correr bem será um ponteiro para void (BufferCircular *)
    //verificação do MapViewOfFile
    if (dados.memoriaPartilhada == NULL)
    {
        _tprintf(TEXT("\nErro no MapViewOfFile"));
        return 1;
    }
	//temos que incializar as variaveis que estão associados à estrutra circular
    //int nP; //numero de produtores
	//int nC; //numero de consumidores
	//int posE; //proxima posição de escrita
	//int posL; //proxima posição de leitura

	//mas estas variaveis só devem ser carregadas quando o primeiro produtor arrancar, apenas e só!
	if(primeirProcesso == TRUE){
	    dados.memoriaPartilhada->nC = 0;
	    dados.memoriaPartilhada->nP = 0;
	    dados.memoriaPartilhada->posE = 0;
	    dados.memoriaPartilhada->posL = 0;
    }

    dados.terminar = 0;

	//incrementar o numero de produtores
    WaitForSingleObject(dados.hMutex, INFINITE);
    dados.memoriaPartilhada->nP++;
    dados.id = dados.memoriaPartilhada->nP;
	//libertar o mutex
    ReleaseMutex(dados.hMutex);

	//a thread
    hThread = CreateThread(NULL, 0, ThreadProdutor, &dados, 0, NULL);
	if(hThread != NULL)
	{
		//thread criada
        _tprintf(TEXT("\nEscreva qualquer coisa para sair.."));
        _getts_s(comando, 100);
        dados.terminar = 1;
        WaitForSingleObject(hThread, INFINITE);
	}

    //memoria partilhada
    UnmapViewOfFile(dados.memoriaPartilhada);
	//close dos handles: termina quando processo termina, não é preciso criar isto!
	
    return 0;
}

assim fica o consumidor:

#include <Windows.h>
#include <tchar.h>
#include <math.h>
#include <stdio.h>
#include <fcntl.h>
#include <io.h>
#include <time.h>

#define TAM_BUFFER 10

typedef struct
{
    int id;
    int valor;
} CelulaBuffer;

typedef struct {
    int nP; 
    int nC; 
    int posE; 
    int posL; 
    CelulaBuffer buffer[TAM_BUFFER];
} BufferCircular;

typedef struct
{
    BufferCircular* memoriaPartilhada;
    HANDLE hSemEscrita; 
    HANDLE hSemLeitura;
    HANDLE hMutex; 
    int terminar; 
    int id;
} DadosThreads;

DWORD WINAPI ThreadConsumidor(LPVOID param)
{
    DadosThreads* dados = (DadosThreads*)param;
    int contador = 0;
	//consumidor
    int soma = 0;

    CelulaBuffer cel;
    while (!dados->terminar)
    {
    	//bloquer no semaforo de leitura
    	//consumidor
        WaitForSingleObject(dados->hSemLeitura, INFINITE); 
        WaitForSingleObject(dados->hMutex, INFINITE);
    	//consumidor
        CopyMemory(
            &cel, //variavel interna, local
            &dados->memoriaPartilhada->buffer[dados->memoriaPartilhada->posL], //origem, proxima posição de leitura
            sizeof(CelulaBuffer)
        );
        //consumidor
        dados->memoriaPartilhada->posL++;
        //consumidor
        if (dados->memoriaPartilhada->posL == TAM_BUFFER)
        {
            dados->memoriaPartilhada->posL = 0;
        }
        ReleaseMutex(dados->hMutex);
    	//consumidor
        ReleaseSemaphore(dados->hSemEscrita, 1, NULL);
        contador++;
    	//consumidor
        soma += cel.valor;
        _tprintf(TEXT("\nConsumidor %d, consumiu %d"), dados->id, cel.valor);
    }
    _tprintf(TEXT("\nConsumidor %d, somando um valor %d"), dados->id, soma);
    return 0;
}

int _tmain(int argc, TCHAR* argv[]) {
    //criar a estrutua de memoria partilhada
    HANDLE hFileMap;
    //estrutura de dados para a memoria partilhada
    DadosThreads dados;
    //
    BOOL primeirProcesso = FALSE;

    //handle para a thread
    HANDLE hThread;
    TCHAR comando[100];

#ifdef UNICODE 
    _setmode(_fileno(stdin), _O_WTEXT);
    _setmode(_fileno(stdout), _O_WTEXT);
    _setmode(_fileno(stderr), _O_WTEXT);
#endif
    //para o aletorio
    srand((unsigned int)time(NULL));

    //criar ou abrir os semaforos antes das funções .. para não usar unmaps
    dados.hSemEscrita = CreateSemaphore(
        NULL, //segurança
        TAM_BUFFER,//iniciais no maximo - começa com a totalidade das poisções libertas
        TAM_BUFFER,//finais no maximo - começa com a totalidade das poisções libertas
        TEXT("SO_SEM_ESCRITA")//nome do semaforo
    );
    //controlar as posições que podem ser lidas
    dados.hSemLeitura = CreateSemaphore(
        NULL, //segurança
        0,//iniciais nenhumas, não existe nada para ser lido
        TAM_BUFFER,//finais, o maximo, a totalidade
        TEXT("SO_SEM_LEITURA")//nome do semaforo
    );
    //consumidor
    dados.hMutex = CreateMutex(
        NULL,
        FALSE,
        TEXT("SO_MUTEX_CONSUMIDORES")
    );
    //testar os três ultimos
    if (dados.hSemEscrita == NULL || dados.hSemLeitura == NULL || dados.hMutex == NULL)
    {
        _tprintf(TEXT("\nErro no CreateSemaphore OU no CreateMutex"));
        return 1;
    }

    hFileMap = OpenFileMapping(
        FILE_MAP_ALL_ACCESS, //aceder 
        FALSE, //não vai haver processos
        TEXT("SO2_MEM_PARTILHADA") //nome igual ao create file
    );
    if (hFileMap == NULL)
    {
        primeirProcesso = TRUE;
        //existe um poblema no openFileMapping
        //então criamos..
        hFileMap = CreateFileMapping(
            INVALID_HANDLE_VALUE, //handle para o FILE que vai ser criado pelo sistema operativo
            NULL, //atributos de segurança
            PAGE_READWRITE, //nivel de protecção, tipo de acesso , normal ser leitura/escrita
            0, //dimensão da memoria partilhada (mais significativa e menos significativa, a menos fica a 0)
            sizeof(BufferCircular), //parte menos significativa fica com o tamanho da memoria partilhada
            TEXT("SO2_MEM_PARTILHADA")//nome para o file map
        );
        //verificação ao file mapping
        if (hFileMap == NULL)
        {
            _tprintf(TEXT("\nErro no CreateFileMapping"));
            return 1;
        }
    }

    //fazer o mapeamento da memoria partilhada
    dados.memoriaPartilhada = (BufferCircular*)MapViewOfFile(
        hFileMap, //o handle para o file mapping
        FILE_MAP_ALL_ACCESS, //permissões escrita/leitura porque tanto o consumidor e produtor fazem as duas coisas
        0, //off-set, a partir do local onde queremos mapear a memoria partilhada
        0,
        0 //tudo a zero porque queremos mapear desde o inicio até ao final..
    );
    //deve haver um cast para o MapViewOfFile, porque se correr bem será um ponteiro para void (BufferCircular *)
    //verificação do MapViewOfFile
    if (dados.memoriaPartilhada == NULL)
    {
        _tprintf(TEXT("\nErro no MapViewOfFile"));
        return 1;
    }
    //temos que incializar as variaveis que estão associados à estrutra circular
    //int nP; //numero de produtores
    //int nC; //numero de consumidores
    //int posE; //proxima posição de escrita
    //int posL; //proxima posição de leitura

    //mas estas variaveis só devem ser carregadas quando o primeiro produtor arrancar, apenas e só!
    if (primeirProcesso == TRUE) {
        dados.memoriaPartilhada->nC = 0;
        dados.memoriaPartilhada->nP = 0;
        dados.memoriaPartilhada->posE = 0;
        dados.memoriaPartilhada->posL = 0;
    }

    dados.terminar = 0;

    //consumidor
    WaitForSingleObject(dados.hMutex, INFINITE);
    dados.memoriaPartilhada->nC++;
    dados.id = dados.memoriaPartilhada->nC;
    //libertar o mutex
    ReleaseMutex(dados.hMutex);

    //a thread
    hThread = CreateThread(NULL, 0, ThreadConsumidor, &dados, 0, NULL);
    if (hThread != NULL)
    {
        //thread criada
        _tprintf(TEXT("\nEscreva qualquer coisa para sair.."));
        _getts_s(comando, 100);
        dados.terminar = 1;
        WaitForSingleObject(hThread, INFINITE);
    }

    //memoria partilhada
    UnmapViewOfFile(dados.memoriaPartilhada);
    //close dos handles: termina quando processo termina, não é preciso criar isto!

    return 0;
}

para relembrar a versão light do semáforo:

#include <Windows.h>
#include <tchar.h>
#include <math.h>
#include <stdio.h>
#include <fcntl.h>
#include <io.h>
#include <shlwapi.h>

//memória partilhada entre os processos
//usar mecanismos de sincronização, para garantir exclusão mutua nesse bloco de memtória partilhada
//em windows: mapeamento de ficheiros

#define NUM_CARACTERES 100

typedef struct
{
	//ponteiro para memoria partilhada
	TCHAR* fileViewMap;
	//handle para o evento
	HANDLE hEvent;
	//handle para o mutex
	HANDLE hMutex;
	//flag para controlar as threads (aquela que escreve o fim)
	int terminar;
} ThreadDados;

DWORD WINAPI ThreadLer(LPVOID param)
{
	ThreadDados* dados = (ThreadDados*)param;

	while (1)
	{
		//bloquear à espera do evento evitar a espera ativa
		WaitForSingleObject(dados->hEvent, INFINITE);

		if (dados->terminar)
			break;

		WaitForSingleObject(dados->hMutex, INFINITE);

		//desbloqueou: e aqui deve ser o codigo mais curto possivel!!!
		//é uma zona critica
		_tprintf(TEXT("\nmensagem recebida: %s"), dados->fileViewMap);

		//libertar o mutex
		ReleaseMutex(dados->hMutex);
		Sleep(1000);
	}
	return 0;
}

DWORD WINAPI ThreadEscrever(LPVOID param)
{
	ThreadDados* dados = (ThreadDados*)param;
	TCHAR msg[NUM_CARACTERES];

	while (!(dados->terminar))
	{
		_fgetts(msg, NUM_CARACTERES, stdin);
		msg[_tcslen(msg) - 1] = '\0';

		if (_tcscmp(msg, TEXT("fim")) == 0)
		{
			dados->terminar = 1;
		}

		//bloquear à espera do evento evitar a espera ativa
		WaitForSingleObject(dados->hMutex, INFINITE);

		//limpar a memoria partilhada
		ZeroMemory(dados->fileViewMap, NUM_CARACTERES * sizeof(TCHAR));

		//copiar o conteudo para a memoria partilhada
		CopyMemory(dados->fileViewMap, msg, _tcslen(msg) * sizeof(TCHAR));
		//libertar o mutex
		ReleaseMutex(dados->hMutex);

		SetEvent(dados->hEvent);
		Sleep(500);
		ResetEvent(dados->hEvent);
	}
	return 0;
}

int _tmain(int argc, TCHAR* argv[]) {
	HANDLE hfileMap;

	ThreadDados dados;

	//criar as threads
	HANDLE hThreads[2]; //1 de escrita  e 1 de leitura

	//semaforos, é uma generalziação do mutex
	HANDLE hSemaforo;

#ifdef UNICODE 
	_setmode(_fileno(stdin), _O_WTEXT);
	_setmode(_fileno(stdout), _O_WTEXT);
	_setmode(_fileno(stderr), _O_WTEXT);
#endif

	//criar o semaforo antes da memoria partilhada
	hSemaforo = CreateSemaphore(NULL, 
		2, //qts começam disponiveis de inicio
		2, //qt são ao mesmo tempo
		TEXT("SO2_SEMAFORO")
	);
	
	if (hSemaforo == NULL)
	{
		_tprintf(TEXT("\nErro no CreateSemaphore"));
		return 1;
	}
	//esperar por uma slote
	_tprintf(TEXT("\naguardar por uma slote.."));
	WaitForSingleObject(hSemaforo, INFINITE); //quem não tem espaço aguarda.
	_tprintf(TEXT("\nchat.."));
	
	hfileMap = CreateFileMapping(
		INVALID_HANDLE_VALUE, //sistema operativo que faça a gestao
		NULL,
		PAGE_READWRITE,
		0,
		NUM_CARACTERES * sizeof(TCHAR), //tamanho do file mapping
		TEXT("SO2_MEM_PART") //nome do file mapping
	);
	if (hfileMap == NULL)
	{
		_tprintf(TEXT("\nErro no CreateFileMapping"));
		return 1;
	}
	dados.fileViewMap = (TCHAR*)MapViewOfFile(
		hfileMap,
		FILE_MAP_ALL_ACCESS,
		0,
		0,
		0
	);

	if (dados.fileViewMap == NULL)
	{
		_tprintf(TEXT("\nErro no fileViewMap"));
		return 1;
	}

	//o evento
	dados.hEvent = CreateEvent(
		NULL,
		TRUE, //reset manual
		FALSE,
		TEXT("SO2_EVENTO")
	);

	if (dados.hEvent == NULL)
	{
		_tprintf(TEXT("\nErro no CreateEvent"));
		UnmapViewOfFile(dados.fileViewMap); //pois ja temos e memoria partilhada
		return 1;
	}

	dados.hMutex = CreateMutex(
		NULL,
		FALSE,
		TEXT("SO2_MUTEX")
	);

	//sincronização da memoria partilhada
	if (dados.hMutex == NULL)
	{
		_tprintf(TEXT("\nErro no CreateMutex"));
		UnmapViewOfFile(dados.fileViewMap); //pois ja temos e memoria partilhada
		return 1;
	}

	dados.terminar = 0;
	//criar as threads (a de ler e a de escrever)
	hThreads[0] = CreateThread(NULL, 0, ThreadLer, &dados, 0, NULL);
	hThreads[1] = CreateThread(NULL, 0, ThreadEscrever, &dados, 0, NULL);

	if (hThreads[0] != NULL && hThreads[1] != NULL)
	{
		WaitForMultipleObjects(2, hThreads, TRUE, INFINITE);
	}

	//lidar com os semaforos
	ReleaseSemaphore(hSemaforo, 1, NULL);

	UnmapViewOfFile(dados.fileViewMap);
	CloseHandle(hfileMap);

	return 0;
}
Tags : , , ,

0 thoughts on “sessão 6b – Semáforos (produtor/consumidor)”

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.