#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <pthread.h>
#include <stdbool.h>
#include <assert.h>
#include <sys/time.h>
#include <errno.h>
/************ queue manage ***********/
typedef struct Node
{
void *data;
struct Node *next;
}queue_node_t;
#define BUFFER_POOL_SIZE (640 * 480)
#define BUFFER_POOL_NUM 5
struct pbuf{
uint32_t len;
uint8_t payload[BUFFER_POOL_SIZE];
};
typedef struct QueueList
{
int sizeOfQueue;
uint16_t memSize;
queue_node_t *head;
queue_node_t *tail;
}queue_t;
/* to inform consumer_thread */
static pthread_cond_t cap_cond;
static pthread_mutex_t cap_mutex;
/* stream queue for communicate between two threads */
static queue_t strm_queue;
static pthread_mutex_t strmq_mutex;
int strm_queue_init(){
queue_t *q = &strm_queue;
q->sizeOfQueue = 0;
q->memSize = 0;
q->head = q->tail = NULL;
if (pthread_cond_init(&cap_cond, NULL) != 0) {
printf("pthread_cond_init failed\n");
exit(-1);
}
if (pthread_mutex_init(&cap_mutex, NULL) != 0) {
printf("pthread_mutex_init failed\n");
pthread_cond_destroy(&cap_cond);
exit(-1);
}
if (pthread_mutex_init(&strmq_mutex, NULL) != 0) {
printf("pthread_mutex_init failed\n");
exit(-1);
}
return 0;
}
int malloc_node(queue_node_t **newNode);
int frame_enqueue(void *data, uint32_t len){
queue_t *q = &strm_queue;
queue_node_t *newNode = NULL;
struct pbuf *p;
assert(len < BUFFER_POOL_SIZE);
if (malloc_node(&newNode) != true) {
//printf("no node available!\n");
return -1;
}
p = (struct pbuf *)newNode->data;
memcpy(p->payload, data, len);
p->len = len;
pthread_mutex_lock(&strmq_mutex);
if (q->sizeOfQueue == 0) {
q->head = q->tail = newNode;
} else {
q->tail->next = newNode;
q->tail = newNode;
}
q->sizeOfQueue++;
pthread_mutex_unlock(&strmq_mutex);
pthread_mutex_lock(&cap_mutex);
pthread_cond_signal(&cap_cond);
pthread_mutex_unlock(&cap_mutex);
return 0;
}
int frame_dequeue(queue_node_t **newNode){
queue_t *q = &strm_queue;
bool ret = false;
pthread_mutex_lock(&strmq_mutex);
if (q->sizeOfQueue <= 0) {
pthread_mutex_unlock(&strmq_mutex);
*newNode = NULL;
ret = false;
}else{
*newNode = q->head;
if (q->sizeOfQueue > 1) {
q->head = q->head->next;
} else {
q->head = NULL;
q->tail = NULL;
}
q->sizeOfQueue--;
ret = true;
}
pthread_mutex_unlock(&strmq_mutex);
return ret;
}
/***** node pool manage *****/
static struct pbuf buffer_pool[BUFFER_POOL_NUM];
static queue_node_t queue_node[BUFFER_POOL_NUM];
/* mutex for free_queue when malloc and release node */
pthread_mutex_t freeq_mutex;
queue_t free_queue;
int node_pool_init(){
int i;
queue_t *q = &free_queue;
queue_node_t *newNode;
newNode = &queue_node[0];
newNode->data = (void *)&buffer_pool[0];
q->head = q->tail = newNode;
for (i = 1; i < BUFFER_POOL_NUM; i++) {
newNode = &queue_node[i];
newNode->data = (void *)&buffer_pool[i];
q->tail->next = newNode;
q->tail = newNode;
}
q->sizeOfQueue = i;
if (pthread_mutex_init(&freeq_mutex, NULL) != 0) {
printf("pthread_mutex_init failed\n");
exit(-1);
}
return 0;
}
/* dequeue from free queue */
int malloc_node(queue_node_t **newNode){
queue_t *q = &free_queue;
bool ret = false;
pthread_mutex_lock(&freeq_mutex);
if (q->sizeOfQueue <= 0) {
pthread_mutex_unlock(&freeq_mutex);
*newNode = NULL;
ret = false;
}else{
*newNode = q->head;
if (q->sizeOfQueue > 1) {
q->head = q->head->next;
} else {
q->head = NULL;
q->tail = NULL;
}
q->sizeOfQueue--;
ret = true;
}
pthread_mutex_unlock(&freeq_mutex);
return ret;
}
/* tailed to free queue */
int release_node(queue_node_t *newNode ){
queue_t *q = &free_queue;
newNode->next = NULL;
pthread_mutex_lock(&freeq_mutex);
if (q->sizeOfQueue == 0) {
q->head = q->tail = newNode;
} else {
q->tail->next = newNode;
q->tail = newNode;
}
q->sizeOfQueue++;
pthread_mutex_unlock(&freeq_mutex);
return 0;
}
/**************** test for communication between threads *************/
void* producer_thread(void *argv[]){
char message[100];
printf("producer start!\n");
for(int i = 0; i < 100; i++){
usleep(100000);
sprintf(message, "Hello %d", i);
if(0 != frame_enqueue((void *)message, strlen(message))){
printf("drop message: %s!\n", message);
};
}
printf("producer stop!\n");
}
void* consumer_thread(void *argv[]){
queue_node_t *newNode;
int i = 0;
struct timeval cur_tv;
struct timespec to;
int timeout_msec = 5000;
int ret;
while(1){
gettimeofday(&cur_tv, NULL);
to.tv_sec = cur_tv.tv_sec + timeout_msec / 1000;
to.tv_nsec = cur_tv.tv_usec * 1000 + (timeout_msec % 1000) * 1000000;
printf("consumer wait!\n");
pthread_mutex_lock(&cap_mutex);
ret = pthread_cond_timedwait(&cap_cond, &cap_mutex, &to);
pthread_mutex_unlock(&cap_mutex);
if((ret == ETIMEDOUT)){
printf("condition wait timeout!\n");
break;
}
while(frame_dequeue(&newNode)){
/* do some process */
printf("%s\n", ((struct pbuf *)(newNode->data))->payload);
usleep(500000);
release_node(newNode);
i++;
}
}
printf("consumer get %d message!\n", i);
printf("consumer exit!\n");
}
int main(int argc, char *argv[])
{
int i;
pthread_t pid1, pid2;
printf("\n--- test for communication between threads ---\n");
node_pool_init();
strm_queue_init();
if (pthread_create(&pid2, NULL, consumer_thread, NULL)){
printf("create consumer failed!\n");
}
if (pthread_create(&pid1, NULL, producer_thread, NULL)) {
printf("create producer failed!\n");
return -1;
}
pthread_join(pid1, NULL);
pthread_join(pid2, NULL);
printf("\n--- test exit ---!\n");
return 0;
}