This commit is contained in:
p-w-rs
2022-09-21 12:04:48 -05:00
parent 9e1ad48346
commit 6d881b3edd
75 changed files with 3681 additions and 113 deletions

View File

@@ -0,0 +1,12 @@
CC=gcc
CFLAGS=-O0 -Wall -Wextra -g -pthread
BINS=$(patsubst prod_con%.c,prod_con%,$(wildcard prod_con*.c))
all: $(BINS)
clean:
rm -f $(BINS)
%: %.c
$(CC) $(CFLAGS) -o $@ $<

View File

@@ -0,0 +1,38 @@
# Producer/Consumer with condition variable examples
CS3841 examples for producer/consumer that motivate the need for condition variables
* prod\_con1.c - Producer and consumer with unlimited sized buffer
Works with a single semaphore to count full slots when only a single producer and consumer is used
* prod\_con2.c - Producer and consumer with unlimited sized buffer
Shows a problem that can arise when multiple consumers are consuming elements from the shared space at the same time
With no controlled access between consumers, the number of elements in the shared space become incorrect
* prod\_con3.c - Producer and consumer with unlimited sized buffer
Fixes the problem when multiple producers and consumers modify a shared space by using a mutex lock to prevent threads from modifying the space at the same time
* prod\_con4.c - Producer and consumer with unlimited sized buffer
Shows a problem that can arise when a consumer wants to consume multiple items from the shared space. Since a mutex lock is used to lock out other threads, it is possible for a consumer to remove elements from an empty shared space
* prod\_con5.c - Producer and consumer with unlimited sized buffer
An attempt to fix the problem of a consumer removing from an empty shared space when more than one item is to be consumed by waiting in a while loop until enough elements are in the space
Unfortunately causes deadlock since the consumer is holding the the mutex lock while waiting for more items
* prod\_con6.c - Producer and consumer with unlimited sized buffer
An attempt to fix the problem of a consumer removing from an empty shared space when more than one item is to be consumed by waiting in a while loop until enough elements are in the space.
Attemps to fix the deadlock provlem in prod\_cons5.c by releasing the lock
Unfortunately the consumer releasing the lock in the while loop causes undefined behavior
* prod\_con7.c - Producer and consumer with unlimited sized buffer
An attempt to fix the problem of a consumer removing from an empty shared space when more than one item is to be consumed by releasing the lock on the shared space and then waiting on a semaphore until there are more items.
Unfortunately, there is know way for the consumer to get back into the "head of the line" after more items are produced. Another consumer can skip ahead
* prod\_con\_cond1.c - Producer and consumer with unlimited sized buffer
Uses a condition variable to ensure proper order of the consumers when they wait for more items to be consumed
Unfortunately, using a single condition variable without additional mutual exculsion does not maintain the consumers order in line
* prod\_con\_cond2.c - Producer and consumer with unlimited sized buffer
Uses a condition variable to ensure proper order of the consumers when they wait for more items to be consumed
Also uses a mutex to control the ordering of consumers
This example provides correct behavior when there are multiple producers and multiple consumers and each consumer orders multiple items.

View File

@@ -0,0 +1,95 @@
/*
* prod_con1.c - Producer and consumer with unlimited sized buffer
* Works with a single semaphore to count full slots
* when only a single producer and consumer is used
*/
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
#define ELEMENTS 100
int count = 0;
// Semaphores for controlling access
sem_t fullCount;
// Start flag
volatile int start = 0;
// Producer Routine
void* producer()
{
// wait for start from master thread
while(!start);
for(int i = 0; i < ELEMENTS; i++) {
// Add an element
count++;
printf("Producer %lu added, count: %d\n", pthread_self(), count);
// Signal the consumer that there something to consume
sem_post(&fullCount);
usleep(200);
}
return NULL;
}
// Consumer Routine
void* consumer()
{
// wait for start from master thread
while(!start);
for(int i = 0; i < ELEMENTS; i++) {
// Wait for element to consume
sem_wait(&fullCount);
// Remove an element from the buffer
if(count == 0) {
printf("ERROR: Customer %lu removed with no elements\n", pthread_self());
exit(EXIT_FAILURE);
}
count--;
usleep(50);
}
return NULL;
}
// Main thread
int main()
{
sem_init(&fullCount, 0, 0);
pthread_t prod;
pthread_t cons;
if(pthread_create(&prod, NULL, producer, NULL) == -1) {
printf("COULD NOT CREATE PRODUCER\n");
exit(EXIT_FAILURE);
}
if(pthread_create(&cons, NULL, consumer, NULL) == -1) {
printf("COULD NOT CREATE CONSUMER\n");
exit(EXIT_FAILURE);
}
start = 1;
pthread_join(prod, NULL);
pthread_join(cons, NULL);
printf("Final count is: %d, expected 0\n", count);
sem_destroy(&fullCount);
return EXIT_SUCCESS;
}

View File

@@ -0,0 +1,108 @@
/*
* prod_con2.c - Producer and consumer with unlimited sized buffer
* Shows a problem that can arise when multiple consumers
* are consuming elements from the shared space at the same time
* With no controlled access between consumers, the number
* of elements in the shared space become incorrect
*/
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
#define ELEMENTS 100
#define PRODUCER_COUNT 10
#define CONSUMER_COUNT 5
int count = 0;
// Semaphores for controlling access
sem_t fullCount;
// Start flag
volatile int start = 0;
// Producer Routine
void* producer()
{
// wait for start from master thread
while(!start);
for(int i = 0; i < ELEMENTS; i++) {
// Add an element
count++;
printf("Producer %lu added, count: %d\n", pthread_self(), count);
// Signal the consumer that there something to consume
sem_post(&fullCount);
usleep(200);
}
return NULL;
}
// Consumer Routine
void* consumer()
{
// wait for start from master thread
while(!start);
for(int i = 0; i < ELEMENTS; i++) {
// Wait for element to consume
sem_wait(&fullCount);
// Remove an element from the buffer
if(count == 0) {
printf("ERROR: Customer %lu removed with no elements\n", pthread_self());
exit(EXIT_FAILURE);
}
count--;
usleep(50);
}
return NULL;
}
// Main thread
int main()
{
sem_init(&fullCount, 0, 0);
pthread_t prod[PRODUCER_COUNT];
pthread_t cons[CONSUMER_COUNT];
for(int i = 0; i < PRODUCER_COUNT; i++) {
if(pthread_create(&prod[i], NULL, producer, NULL) == -1) {
printf("COULD NOT CREATE PRODUCER\n");
exit(EXIT_FAILURE);
}
}
for(int i = 0; i < CONSUMER_COUNT; i++) {
if(pthread_create(&cons[i], NULL, consumer, NULL) == -1) {
printf("COULD NOT CREATE CONSUMER\n");
exit(EXIT_FAILURE);
}
}
start = 1;
for(int i = 0; i < PRODUCER_COUNT; i++) {
pthread_join(prod[i], NULL);
}
for(int i = 0; i < CONSUMER_COUNT; i++) {
pthread_join(cons[i], NULL);
}
int expected_final_count = PRODUCER_COUNT*ELEMENTS - CONSUMER_COUNT*ELEMENTS;
printf("Final count is: %d, expected %d\n", count, expected_final_count);
sem_destroy(&fullCount);
return EXIT_SUCCESS;
}

View File

@@ -0,0 +1,116 @@
/*
* prod_con3.c - Producer and consumer with unlimited sized buffer
* Fixes the problem when multiple producers and
* consumers modify a shared space by using
* a mutex lock to prevent threads from modifying
* the space at the same time
*/
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
#define ELEMENTS 100
#define PRODUCER_COUNT 10
#define CONSUMER_COUNT 5
int count = 0;
// Semaphores for controlling access
sem_t fullCount;
pthread_mutex_t element_mutex = PTHREAD_MUTEX_INITIALIZER;
// Start flag
volatile int start = 0;
// Producer Routine
void* producer()
{
// wait for start from master thread
while(!start);
for(int i = 0; i < ELEMENTS; i++) {
pthread_mutex_lock(&element_mutex);
// Add an element
count++;
printf("Producer %lu added, count: %d\n", pthread_self(), count);
// Signal the consumer that there something to consume
sem_post(&fullCount);
pthread_mutex_unlock(&element_mutex);
usleep(200);
}
return NULL;
}
// Consumer Routine
void* consumer()
{
// wait for start from master thread
while(!start);
for(int i = 0; i < ELEMENTS; i++) {
// Wait for element to consume
sem_wait(&fullCount);
// Remove an element from the buffer
pthread_mutex_lock(&element_mutex);
if(count == 0) {
printf("ERROR: Customer %lu removed with no elements\n", pthread_self());
exit(EXIT_FAILURE);
}
count--;
pthread_mutex_unlock(&element_mutex);
usleep(50);
}
return NULL;
}
// Main thread
int main()
{
sem_init(&fullCount, 0, 0);
pthread_t prod[PRODUCER_COUNT];
pthread_t cons[CONSUMER_COUNT];
for(int i = 0; i < PRODUCER_COUNT; i++) {
if(pthread_create(&prod[i], NULL, producer, NULL) == -1) {
printf("COULD NOT CREATE PRODUCER\n");
exit(EXIT_FAILURE);
}
}
for(int i = 0; i < CONSUMER_COUNT; i++) {
if(pthread_create(&cons[i], NULL, consumer, NULL) == -1) {
printf("COULD NOT CREATE CONSUMER\n");
exit(EXIT_FAILURE);
}
}
start = 1;
for(int i = 0; i < PRODUCER_COUNT; i++) {
pthread_join(prod[i], NULL);
}
for(int i = 0; i < CONSUMER_COUNT; i++) {
pthread_join(cons[i], NULL);
}
int expected_final_count = PRODUCER_COUNT*ELEMENTS - CONSUMER_COUNT*ELEMENTS;
printf("Final count is: %d, expected %d\n", count, expected_final_count);
sem_destroy(&fullCount);
pthread_mutex_destroy(&element_mutex);
return EXIT_SUCCESS;
}

View File

@@ -0,0 +1,119 @@
/*
* prod_con4.c - Producer and consumer with unlimited sized buffer
* Shows a problem that can arise when a consumer
* wants to consume multiple items from the shared
* space. Since a mutex lock is used to lock out
* other threads, it is possible for a consumer
* to remove elements from an empty shared space
*/
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
#define ELEMENTS 100
#define PRODUCER_COUNT 10
#define CONSUMER_COUNT 5
#define ORDER_COUNT 2
int count = 0;
// Semaphores for controlling access
sem_t fullCount;
pthread_mutex_t element_mutex = PTHREAD_MUTEX_INITIALIZER;
// Start flag
volatile int start = 0;
// Producer Routine
void* producer()
{
// wait for start from master thread
while(!start);
for(int i = 0; i < ELEMENTS; i++) {
pthread_mutex_lock(&element_mutex);
// Add an element
count++;
printf("Producer %lu added, count: %d\n", pthread_self(), count);
// Signal the consumer that there something to consume
sem_post(&fullCount);
pthread_mutex_unlock(&element_mutex);
usleep(200);
}
return NULL;
}
// Consumer Routine
void* consumer()
{
// wait for start from master thread
while(!start);
for(int i = 0; i < ELEMENTS; i++) {
// Wait for element to consume
sem_wait(&fullCount);
pthread_mutex_lock(&element_mutex);
for(int j = 0; j < ORDER_COUNT; j++) {
// Remove an element from the buffer
if(count == 0) {
printf("ERROR: Customer %lu removed with no elements\n", pthread_self());
exit(EXIT_FAILURE);
}
count--;
}
pthread_mutex_unlock(&element_mutex);
usleep(50);
}
return NULL;
}
// Main thread
int main()
{
sem_init(&fullCount, 0, 0);
pthread_t prod[PRODUCER_COUNT];
pthread_t cons[CONSUMER_COUNT];
for(int i = 0; i < PRODUCER_COUNT; i++) {
if(pthread_create(&prod[i], NULL, producer, NULL) == -1) {
printf("COULD NOT CREATE PRODUCER\n");
exit(EXIT_FAILURE);
}
}
for(int i = 0; i < CONSUMER_COUNT; i++) {
if(pthread_create(&cons[i], NULL, consumer, NULL) == -1) {
printf("COULD NOT CREATE CONSUMER\n");
exit(EXIT_FAILURE);
}
}
start = 1;
for(int i = 0; i < PRODUCER_COUNT; i++) {
pthread_join(prod[i], NULL);
}
for(int i = 0; i < CONSUMER_COUNT; i++) {
pthread_join(cons[i], NULL);
}
int expected_final_count = PRODUCER_COUNT*ELEMENTS - CONSUMER_COUNT*ELEMENTS*ORDER_COUNT;
printf("Final count is: %d, expected %d\n", count, expected_final_count);
sem_destroy(&fullCount);
return EXIT_SUCCESS;
}

View File

@@ -0,0 +1,123 @@
/*
* prod_con5.c - Producer and consumer with unlimited sized buffer
* An attempt to fix the problem of a consumer removing
* from an empty shared space when more than one item
* is to be consumed by waiting in a while loop until
* enough elements are in the space
* Unfortunately causes deadlock since the consumer is
* holding the the mutex lock while waiting for more items
*/
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
#define ELEMENTS 100
#define PRODUCER_COUNT 10
#define CONSUMER_COUNT 5
#define ORDER_COUNT 2
int count = 0;
// Semaphores for controlling access
sem_t fullCount;
pthread_mutex_t element_mutex = PTHREAD_MUTEX_INITIALIZER;
// Start flag
volatile int start = 0;
// Producer Routine
void* producer()
{
// wait for start from master thread
while(!start);
for(int i = 0; i < ELEMENTS; i++) {
pthread_mutex_lock(&element_mutex);
// Add an element
count++;
printf("Producer %lu added, count: %d\n", pthread_self(), count);
// Signal the consumer that there something to consume
sem_post(&fullCount);
pthread_mutex_unlock(&element_mutex);
usleep(200);
}
return NULL;
}
// Consumer Routine
void* consumer()
{
// wait for start from master thread
while(!start);
for(int i = 0; i < ELEMENTS; i++) {
// Wait for element to consume
sem_wait(&fullCount);
pthread_mutex_lock(&element_mutex);
while(count < ORDER_COUNT);
for(int j = 0; j < ORDER_COUNT; j++) {
// Remove an element from the buffer
if(count == 0) {
printf("ERROR: Customer %lu removed with no elements\n", pthread_self());
exit(EXIT_FAILURE);
}
count--;
}
pthread_mutex_unlock(&element_mutex);
usleep(50);
}
return NULL;
}
// Main thread
int main()
{
sem_init(&fullCount, 0, 0);
pthread_t prod[PRODUCER_COUNT];
pthread_t cons[CONSUMER_COUNT];
for(int i = 0; i < PRODUCER_COUNT; i++) {
if(pthread_create(&prod[i], NULL, producer, NULL) == -1) {
printf("COULD NOT CREATE PRODUCER\n");
exit(EXIT_FAILURE);
}
}
for(int i = 0; i < CONSUMER_COUNT; i++) {
if(pthread_create(&cons[i], NULL, consumer, NULL) == -1) {
printf("COULD NOT CREATE CONSUMER\n");
exit(EXIT_FAILURE);
}
}
start = 1;
for(int i = 0; i < PRODUCER_COUNT; i++) {
pthread_join(prod[i], NULL);
}
for(int i = 0; i < CONSUMER_COUNT; i++) {
pthread_join(cons[i], NULL);
}
int expected_final_count = PRODUCER_COUNT*ELEMENTS - CONSUMER_COUNT*ELEMENTS;
printf("Final count is: %d, expected %d\n", count, expected_final_count);
sem_destroy(&fullCount);
pthread_mutex_destroy(&element_mutex);
return EXIT_SUCCESS;
}

View File

@@ -0,0 +1,127 @@
/*
* prod_con6.c - Producer and consumer with unlimited sized buffer
* An attempt to fix the problem of a consumer removing
* from an empty shared space when more than one item
* is to be consumed by waiting in a while loop until
* enough elements are in the space. Attemps to fix the
* deadlock provlem in prod_cons5.c by releasing the lock
* Unfortunately the consumer releasing the lock in the
* while loop causes undefined behavior
*/
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
#define ELEMENTS 100
#define PRODUCER_COUNT 10
#define CONSUMER_COUNT 5
#define ORDER_COUNT 2
int count = 0;
// Semaphores for controlling access
sem_t fullCount;
pthread_mutex_t element_mutex = PTHREAD_MUTEX_INITIALIZER;
// Start flag
volatile int start = 0;
// Producer Routine
void* producer()
{
// wait for start from master thread
while(!start);
for(int i = 0; i < ELEMENTS; i++) {
pthread_mutex_lock(&element_mutex);
// Add an element
count++;
printf("Producer %lu added, count: %d\n", pthread_self(), count);
// Signal the consumer that there something to consume
sem_post(&fullCount);
pthread_mutex_unlock(&element_mutex);
usleep(200);
}
return NULL;
}
// Consumer Routine
void* consumer()
{
// wait for start from master thread
while(!start);
for(int i = 0; i < ELEMENTS; i++) {
// Wait for element to consume
sem_wait(&fullCount);
pthread_mutex_lock(&element_mutex);
while(count < ORDER_COUNT) {
pthread_mutex_unlock(&element_mutex);
}
pthread_mutex_lock(&element_mutex);
for(int j = 0; j < ORDER_COUNT; j++) {
// Remove an element from the buffer
if(count == 0) {
printf("ERROR: Customer %lu removed with no elements\n", pthread_self());
exit(EXIT_FAILURE);
}
count--;
}
pthread_mutex_unlock(&element_mutex);
usleep(50);
}
return NULL;
}
// Main thread
int main()
{
sem_init(&fullCount, 0, 0);
pthread_t prod[PRODUCER_COUNT];
pthread_t cons[CONSUMER_COUNT];
for(int i = 0; i < PRODUCER_COUNT; i++) {
if(pthread_create(&prod[i], NULL, producer, NULL) == -1) {
printf("COULD NOT CREATE PRODUCER\n");
exit(EXIT_FAILURE);
}
}
for(int i = 0; i < CONSUMER_COUNT; i++) {
if(pthread_create(&cons[i], NULL, consumer, NULL) == -1) {
printf("COULD NOT CREATE CONSUMER\n");
exit(EXIT_FAILURE);
}
}
start = 1;
for(int i = 0; i < PRODUCER_COUNT; i++) {
pthread_join(prod[i], NULL);
}
for(int i = 0; i < CONSUMER_COUNT; i++) {
pthread_join(cons[i], NULL);
}
int expected_final_count = PRODUCER_COUNT*ELEMENTS - CONSUMER_COUNT*ELEMENTS;
printf("Final count is: %d, expected %d\n", count, expected_final_count);
sem_destroy(&fullCount);
pthread_mutex_destroy(&element_mutex);
return EXIT_SUCCESS;
}

View File

@@ -0,0 +1,131 @@
/*
* prod_con7.c - Producer and consumer with unlimited sized buffer
* An attempt to fix the problem of a consumer removing
* from an empty shared space when more than one item
* is to be consumed by releasing the lock on the shared
* space and then waiting on a semaphore until there
* are more items.
* Unfortunately, there is know way for the consumer
* to get back into the "head of the line" after more
* items are produced. Another consumer can skip ahead
* and take the items.
*/
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
#define ELEMENTS 100
#define PRODUCER_COUNT 10
#define CONSUMER_COUNT 5
#define ORDER_COUNT 2
int count = 0;
// Semaphores for controlling access
sem_t fullCount;
pthread_mutex_t element_mutex = PTHREAD_MUTEX_INITIALIZER;
// Start flag
volatile int start = 0;
// Producer Routine
void* producer()
{
// wait for start from master thread
while(!start);
for(int i = 0; i < ELEMENTS; i++) {
pthread_mutex_lock(&element_mutex);
// Add an element
count++;
printf("Producer %lu added, count: %d\n", pthread_self(), count);
// Signal the consumer that there something to consume
sem_post(&fullCount);
pthread_mutex_unlock(&element_mutex);
usleep(200);
}
return NULL;
}
// Consumer Routine
void* consumer()
{
// wait for start from master thread
while(!start);
for(int i = 0; i < ELEMENTS; i++) {
// Wait for element to consume
sem_wait(&fullCount);
pthread_mutex_lock(&element_mutex);
for(int j = 0; j < ORDER_COUNT; j++) {
if(count == 0) {
pthread_mutex_unlock(&element_mutex);
sem_wait(&fullCount);
pthread_mutex_lock(&element_mutex);
}
// Remove an element from the buffer
if(count == 0) {
printf("ERROR: Customer %lu removed with no elements\n", pthread_self());
exit(EXIT_FAILURE);
}
count--;
}
pthread_mutex_unlock(&element_mutex);
usleep(50);
}
return NULL;
}
// Main thread
int main()
{
sem_init(&fullCount, 0, 0);
pthread_t prod[PRODUCER_COUNT];
pthread_t cons[CONSUMER_COUNT];
for(int i = 0; i < PRODUCER_COUNT; i++) {
if(pthread_create(&prod[i], NULL, producer, NULL) == -1) {
printf("COULD NOT CREATE PRODUCER\n");
exit(EXIT_FAILURE);
}
}
for(int i = 0; i < CONSUMER_COUNT; i++) {
if(pthread_create(&cons[i], NULL, consumer, NULL) == -1) {
printf("COULD NOT CREATE CONSUMER\n");
exit(EXIT_FAILURE);
}
}
start = 1;
for(int i = 0; i < PRODUCER_COUNT; i++) {
pthread_join(prod[i], NULL);
}
for(int i = 0; i < CONSUMER_COUNT; i++) {
pthread_join(cons[i], NULL);
}
int expected_final_count = PRODUCER_COUNT*ELEMENTS - CONSUMER_COUNT*ELEMENTS;
printf("Final count is: %d, expected %d\n", count, expected_final_count);
sem_destroy(&fullCount);
pthread_mutex_destroy(&element_mutex);
return EXIT_SUCCESS;
}

View File

@@ -0,0 +1,121 @@
/*
* prod_con_cond1.c - Producer and consumer with unlimited sized buffer
* Uses a condition variable to ensure proper
* order of the consumers when they wait for more
* items to be consumed.
* Unfortunately, using a single condition variable
* without additional mutual exculsion does not maintain
* the consumers order in line
*/
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#define ELEMENTS 100
#define PRODUCER_COUNT 10
#define CONSUMER_COUNT 5
#define ORDER_COUNT 2
int count = 0;
pthread_mutex_t element_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t has_elements = PTHREAD_COND_INITIALIZER;
// Start flag
volatile int start = 0;
// Producer Routine
void* producer()
{
// wait for start from master thread
while(!start);
for(int i = 0; i < ELEMENTS; i++) {
pthread_mutex_lock(&element_mutex);
// Add an element
count++;
printf("Producer %lu added, count: %d\n", pthread_self(), count);
// Signal the consumer that there is something to consume
pthread_cond_signal(&has_elements);
pthread_mutex_unlock(&element_mutex);
usleep(200);
}
return NULL;
}
// Consumer Routine
void* consumer()
{
// wait for start from master thread
while(!start);
for(int i = 0; i < ELEMENTS; i++) {
pthread_mutex_lock(&element_mutex);
for(int j = 0; j < ORDER_COUNT; j++) {
// Wait for element to consume
if(count == 0) {
pthread_cond_wait(&has_elements, &element_mutex);
}
// Remove an element
if(count == 0) {
printf("ERROR: Customer %lu removed with no elements\n", pthread_self());
exit(EXIT_FAILURE);
}
count--;
}
pthread_mutex_unlock(&element_mutex);
usleep(50);
}
return NULL;
}
// Main thread
int main()
{
pthread_t prod[PRODUCER_COUNT];
pthread_t cons[CONSUMER_COUNT];
for(int i = 0; i < PRODUCER_COUNT; i++) {
if(pthread_create(&prod[i], NULL, producer, NULL) == -1) {
printf("COULD NOT CREATE PRODUCER\n");
exit(EXIT_FAILURE);
}
}
for(int i = 0; i < 5; i++) {
if(pthread_create(&cons[i], NULL, consumer, NULL) == -1) {
printf("COULD NOT CREATE CONSUMER\n");
exit(EXIT_FAILURE);
}
}
start = 1;
for(int i = 0; i < PRODUCER_COUNT; i++) {
pthread_join(prod[i], NULL);
}
for(int i = 0; i < CONSUMER_COUNT; i++) {
pthread_join(cons[i], NULL);
}
printf("Final count is: %d\n", count);
pthread_cond_destroy(&has_elements);
pthread_mutex_destroy(&element_mutex);
return EXIT_SUCCESS;
}

View File

@@ -0,0 +1,133 @@
/*
* prod_con_cond2.c - Producer and consumer with unlimited sized buffer
* Uses a condition variable to ensure proper
* order of the consumers when they wait for more
* items to be consumed.
* Also uses a mutex to control the ordering of consumers
* This example provides correct behavior when there
* are multiple producers and multiple consumers and each
* consumer orders multiple items.
*/
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#define ELEMENTS 100
#define PRODUCER_COUNT 10
#define CONSUMER_COUNT 5
#define ORDER_COUNT 2
int count = 0;
pthread_mutex_t consumer_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t element_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t has_elements = PTHREAD_COND_INITIALIZER;
// Start flag
volatile int start = 0;
// Producer Routine
void* producer()
{
// wait for start from master thread
while(!start);
for(int i = 0; i < ELEMENTS; i++) {
pthread_mutex_lock(&element_mutex);
// Add an element
count++;
printf("Producer %lu added, count: %d\n", pthread_self(), count);
// Signal the consumer that there is something to consume
pthread_cond_signal(&has_elements);
pthread_mutex_unlock(&element_mutex);
usleep(200);
}
return NULL;
}
// Consumer Routine
void* consumer()
{
// wait for start from master thread
while(!start);
for(int i = 0; i < ELEMENTS; i++) {
// All consumers line up in order
pthread_mutex_lock(&consumer_mutex);
// Only one consumer gets to be at the head of the line
pthread_mutex_lock(&element_mutex);
for(int j = 0; j < ORDER_COUNT; j++) {
// Wait for element to consume
if(count == 0) {
pthread_cond_wait(&has_elements, &element_mutex);
}
// Remove an element
if(count == 0) {
printf("ERROR: Customer %lu removed with no elements\n", pthread_self());
exit(EXIT_FAILURE);
}
count--;
}
// Let more consumers and producers run
pthread_mutex_unlock(&element_mutex);
// Let the next consumer at the head of the line order items
pthread_mutex_unlock(&consumer_mutex);
usleep(50);
}
return NULL;
}
// Main thread
int main()
{
pthread_t prod[PRODUCER_COUNT];
pthread_t cons[CONSUMER_COUNT];
for(int i = 0; i < PRODUCER_COUNT; i++) {
if(pthread_create(&prod[i], NULL, producer, NULL) == -1) {
printf("COULD NOT CREATE PRODUCER\n");
exit(EXIT_FAILURE);
}
}
for(int i = 0; i < 5; i++) {
if(pthread_create(&cons[i], NULL, consumer, NULL) == -1) {
printf("COULD NOT CREATE CONSUMER\n");
exit(EXIT_FAILURE);
}
}
start = 1;
for(int i = 0; i < PRODUCER_COUNT; i++) {
pthread_join(prod[i], NULL);
}
for(int i = 0; i < CONSUMER_COUNT; i++) {
pthread_join(cons[i], NULL);
}
printf("Final count is: %d\n", count);
pthread_cond_destroy(&has_elements);
pthread_mutex_destroy(&element_mutex);
pthread_mutex_destroy(&consumer_mutex);
return EXIT_SUCCESS;
}