#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/types.h>
#include <fcntl.h>
#include <string.h>
#include <sys/stat.h>
#include <semaphore.h>
#include <sys/time.h>
#define BUFFER_SIZE 256
/* --- Structs --- */
/*
 * State shared by the three worker threads. A single instance lives in
 * main() and its address is handed to every thread.
 */
typedef struct ThreadParams {
    /* Descriptors filled in by pipe(): [0] is read by ThreadB,
     * [1] is written by ThreadA. */
    int pipeFile[2];
    /* Posted by ThreadC, waited on by ThreadA. */
    sem_t sem_A;
    /* Posted by ThreadA, waited on by ThreadB. */
    sem_t sem_B;
    /* Posted by ThreadB, waited on by ThreadC. */
    sem_t sem_C;
    /* Line buffer used by all three threads; an empty string is the
     * end-of-data marker that ThreadC checks for. */
    char message[BUFFER_SIZE];
    /* Stream opened on data.txt by initializeData(); read by ThreadA. */
    FILE *inputFile;
} ThreadParams;
/* Thread attributes shared by all workers: initialised (joinable) in
 * initializeData() and passed to every pthread_create() call in main(). */
pthread_attr_t attr;
/* --- Prototypes --- */
/*
 * Prepares everything the threads need: initialises the three semaphores
 * to 0, sets up the global thread attributes, opens data.txt for reading
 * and creates the pipe. Calls exit(1) if the file cannot be opened or the
 * pipe cannot be created.
 */
void initializeData(ThreadParams *params);
/*
 * Thread entry point (params points to the shared ThreadParams).
 * Reads data.txt line by line into the shared message buffer, writes the
 * buffer to the pipe, posts sem_B and waits on sem_A before the next line.
 * At end of file it posts sem_B once more and closes the pipe's write end
 * and the input file.
 */
void* ThreadA(void *params);
/*
 * Thread entry point (params points to the shared ThreadParams).
 * Waits on sem_B, reads from the pipe into the shared message buffer and
 * posts sem_C. When the read returns 0 bytes it stores an empty string in
 * the buffer, posts sem_C one last time and closes the pipe's read end.
 */
void* ThreadB(void *params);
/*
 * Thread entry point (params points to the shared ThreadParams).
 * Opens output.txt, then repeatedly waits on sem_C, appends the shared
 * message to the file and posts sem_A. An empty message ends the loop and
 * closes the file.
 */
void* ThreadC(void *params);
/* --- Main Code --- */
int main(int argc, char const *argv[]) {
int err;
pthread_t tid[3];
ThreadParams params;
// Initialization
initializeData(¶ms);
// Create Threads
pthread_create(&(tid[0]), &attr, &ThreadA, (void*)(¶ms));
pthread_create(&(tid[1]), &attr, &ThreadB, (void*)(¶ms));
pthread_create(&(tid[2]), &attr, &ThreadC, (void*)(¶ms));
// Wait on threads to finish
pthread_join(tid[0], NULL);
pthread_join(tid[1], NULL);
pthread_join(tid[2], NULL);
// Clean up
sem_destroy(¶ms.sem_A);
sem_destroy(¶ms.sem_B);
sem_destroy(¶ms.sem_C);
return 0;
}
/*
 * Prepares the shared state used by the worker threads.
 *
 * params: shared structure to fill in; must not be NULL.
 *
 * Initialises sem_A, sem_B and sem_C to 0 (process-private), empties the
 * message buffer, sets up the global thread attributes as joinable, opens
 * data.txt for reading and creates the pipe.
 *
 * Any failure is reported on stderr and terminates the process with
 * exit(1); on return every resource is ready for use.
 */
void initializeData(ThreadParams *params) {
    sem_t *const sems[] = { &params->sem_A, &params->sem_B, &params->sem_C };
    int err;

    // Initialize Semaphores
    for (size_t i = 0; i < sizeof sems / sizeof sems[0]; i++) {
        /* Unnamed semaphores are not available on every platform, so a
         * failure here must not be ignored. */
        if (sem_init(sems[i], 0, 0) == -1) {
            perror("Failed to initialize semaphore");
            exit(1);
        }
    }

    /* Start with an empty, fully defined message buffer. */
    memset(params->message, 0, sizeof params->message);

    // Initialize thread attributes
    err = pthread_attr_init(&attr);
    if (err == 0) {
        err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    }
    if (err != 0) {
        /* pthread_attr_* functions return the error number directly. */
        fprintf(stderr, "Failed to initialize thread attributes: %s\n", strerror(err));
        exit(1);
    }

    // Open data.txt and create a pipe
    params->inputFile = fopen("data.txt", "r");
    if (params->inputFile == NULL) {
        perror("Failed to open data.txt");
        exit(1);
    }
    if (pipe(params->pipeFile) == -1) {
        /* Report first: fclose() may overwrite errno. */
        perror("Failed to create a pipe");
        fclose(params->inputFile);
        exit(1);
    }
}
/*
 * Thread entry point: reads the input file and feeds it into the pipe.
 *
 * params: pointer to the shared ThreadParams.
 *
 * For every line read from inputFile, one fixed-size record of BUFFER_SIZE
 * bytes is written to the pipe, sem_B is posted and the thread waits on
 * sem_A before reading the next line. At end of input the pipe's write end
 * is closed and sem_B is posted a final time so the reader sees end-of-file.
 *
 * Returns NULL. A failed write is reported on stderr and terminates the
 * process with exit(1).
 */
void* ThreadA(void *params) {
    printf("Thread A start\n");
    ThreadParams *threadParams = (ThreadParams *) params;
    char *msg = threadParams->message;

    while (fgets(msg, BUFFER_SIZE, threadParams->inputFile) != NULL) {
        printf("Thread A while\n");

        /* Zero everything after the terminator so the record sent through
         * the pipe contains no stale or uninitialised bytes. */
        size_t len = strlen(msg);
        memset(msg + len, 0, BUFFER_SIZE - len);

        /* BUFFER_SIZE is below the POSIX minimum for PIPE_BUF, so the
         * record is written atomically: all of it or an error. */
        ssize_t written = write(threadParams->pipeFile[1], msg, BUFFER_SIZE);
        if (written < 0) {
            perror("ThreadA: failed to write to pipe");
            exit(1);
        }

        sem_post(&threadParams->sem_B);
        /* Wait until the line has travelled through the whole pipeline
         * before overwriting the shared buffer. */
        sem_wait(&threadParams->sem_A);
    }

    /* Close the write end before the final wake-up so the reader's next
     * read() returns 0 immediately. */
    close(threadParams->pipeFile[1]);
    sem_post(&threadParams->sem_B);
    fclose(threadParams->inputFile);
    return NULL;
}
/*
 * Thread entry point: moves data from the pipe into the shared buffer.
 *
 * params: pointer to the shared ThreadParams.
 *
 * Each time sem_B is posted, up to BUFFER_SIZE bytes are read from the
 * pipe into message and sem_C is posted. When read() returns 0 (write end
 * closed), message is set to the empty string, sem_C is posted one last
 * time and the pipe's read end is closed.
 *
 * Returns NULL. A failed read is reported on stderr and terminates the
 * process with exit(1) instead of being forwarded as if it were data.
 */
void* ThreadB(void *params) {
    printf("Thread B start\n");
    ThreadParams *p = (ThreadParams *) params;

    for (;;) {
        printf("Thread B while\n");
        // Wait for ThreadA to send data to pipe
        sem_wait(&p->sem_B);

        // Read from pipe (read() returns ssize_t, -1 on error)
        ssize_t bytes_read = read(p->pipeFile[0], p->message, BUFFER_SIZE);
        if (bytes_read < 0) {
            perror("ThreadB: failed to read from pipe");
            exit(1);
        }

        // Check if all data has been processed
        if (bytes_read == 0) {
            /* Empty string is the end-of-data marker for ThreadC. */
            p->message[0] = '\0';
            sem_post(&p->sem_C);
            break;
        }

        printf("\nThreadB: Forwarding message to ThreadC\n");
        sem_post(&p->sem_C);
    }

    close(p->pipeFile[0]);
    return NULL;
}
/*
 * Thread entry point: writes forwarded messages to output.txt.
 *
 * params: pointer to the shared ThreadParams.
 *
 * Each time sem_C is posted, the shared message is appended to output.txt
 * and sem_A is posted. An empty message marks the end of the data: sem_A
 * is posted once more and the file is closed.
 *
 * Returns NULL. Failure to open, write or close output.txt is reported on
 * stderr and terminates the process with exit(1).
 */
void* ThreadC(void *params) {
    printf("Thread C start\n");
    ThreadParams *p = (ThreadParams *) params;

    FILE *fOut = fopen("output.txt", "w+");
    if (fOut == NULL) {
        /* Without this check the first write would dereference NULL. */
        perror("Failed to open output.txt");
        exit(1);
    }

    for (;;) {
        printf("Thread C while\n");
        // Wait for ThreadB to write data to shared variable
        sem_wait(&p->sem_C);

        // Check if all data has been processed
        if (p->message[0] == '\0') {
            sem_post(&p->sem_A);
            break;
        }

        if (fputs(p->message, fOut) == EOF) {
            perror("ThreadC: failed to write to output.txt");
            fclose(fOut);
            exit(1);
        }
        sem_post(&p->sem_A);
    }

    /* fclose() flushes buffered output, so this is where a late write
     * error would surface. */
    if (fclose(fOut) == EOF) {
        perror("ThreadC: failed to close output.txt");
        exit(1);
    }
    return NULL;
}
/*
 * Sample input (presumably the contents of data.txt):
 *
 * This is the first line
 * This is the second line
 * etc.
 */