A Multi-Process Producer-Consumer Implementation Based on Linux Shared Memory


How it works:

This program models how the cores of a heterogeneous multi-core processor communicate with one another, as well as how multiple processes on Linux can share data directly. The principle is shown in the figure below:

[Figure: producer and consumer processes communicating through a shared-memory ring buffer]
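
The shared segment created by the server holds the ring buffer control header followed by the data pool. Since the two processes may map the segment at different virtual addresses, the header stores the pool's offset rather than a raw pointer. A sketch of the layout the code below assumes:

/* One shmget() segment, attached by both processes:
 *
 *   +---------------------------+------------------------------+
 *   | struct ringbuffer header  | data pool (ring storage)     |
 *   +---------------------------+------------------------------+
 *   ^ shmat() return value      ^ header + sizeof(struct ringbuffer)
 *
 * buffer_ptr holds the pool's OFFSET from the header, so each process
 * recomputes the pool address inside its own mapping:
 *   pool = (uint8_t *)rb + (unsigned long)rb->buffer_ptr;
 */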

1. Server side (the producer)

#include <sys/msg.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/ipc.h>
#include <stdio.h>
#include <stdlib.h>
#include "ringbuffer.h"

union semun
{
    int val;                    /* value for SETVAL */
    struct semid_ds *buf;       /* buffer for IPC_STAT, IPC_SET */
    unsigned short int *array;  /* array for GETALL, SETALL */
    struct seminfo *__buf;      /* buffer for IPC_INFO */
};

int main()
{
    struct ringbuffer *rb;
    key_t shm_key = ftok(".", 0x5a);
    /* Segment size: the ringbuffer header plus the data pool. */
    int shm_id = shmget(shm_key, 1024*1024 + sizeof(struct ringbuffer), IPC_CREAT|IPC_EXCL|0755);
    if(shm_id == -1)
    {
        perror("shmget");
        exit(1);
    }
    printf("%s line %d, shm_id=%d, shm_key=0x%x.\n", __func__, __LINE__, shm_id, shm_key);
    char *address = (char*)shmat(shm_id, NULL, 0);
    if((void*)address == (void*)-1)
    {
        perror("shmat");
        shmctl(shm_id, IPC_RMID, 0);
        exit(1);
    }

    rb = (struct ringbuffer*)address;

    /* Pass the pool as an OFFSET from the header (cast to a pointer), so the
     * client can locate the pool inside its own, possibly different, mapping. */
    ringbuffer_init(rb, (void*)sizeof(struct ringbuffer), 1024);

    key_t sem_key = ftok(".", 0x6b);
    int sem_id = semget(sem_key, 2, IPC_CREAT|IPC_EXCL|0755);
    if(sem_id == -1)
    {
        perror("semget");
        exit(1);
    }
    /* SETALL reads the array member of the union, so initialize both
     * semaphores to 0 through an array (passing val here would be wrong). */
    unsigned short init_vals[2] = {0, 0};
    union semun info;
    info.array = init_vals;
    semctl(sem_id, 0, SETALL, info);

    /* P on semaphore 0 (wait for the consumer), V on semaphore 1 (signal it). */
    struct sembuf p = {0, -1, 0}, v = {1, 1, 0};
    unsigned char w = 0;
    while(1)
    {
        if(ringbuffer_put(rb, &w, 1) == 1)
        {
            w++;
            printf("server: %s line %d. r %d, w %d.\n", __func__, __LINE__, rb->read_index, rb->write_index);
        }
        semop(sem_id, &v, 1);   /* signal: one byte produced */
        semop(sem_id, &p, 1);   /* wait: until the client has consumed it */
    }
    /* Unreachable with the endless loop above; kept for completeness.
     * Note: a semaphore set is removed with semctl, not shmctl. */
    shmdt(address);
    shmctl(shm_id, IPC_RMID, 0);
    semctl(sem_id, 0, IPC_RMID);
    return 0;
}
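
The two System V semaphores implement a strict ping-pong handshake between the processes. Conceptually (this sketch just restates what the loops above and below do):

/* sem[1] counts "bytes produced, not yet consumed"
 * sem[0] counts "bytes consumed, slot free again"
 *
 *   server loop:  ringbuffer_put() -> V(sem[1]) -> P(sem[0])
 *   client loop:  P(sem[1]) -> ringbuffer_get() -> V(sem[0])
 *
 * Each side therefore advances exactly one step per handshake. */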

2. Client side (the consumer)

#include <sys/msg.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/ipc.h>
#include <stdio.h>
#include <stdlib.h>
#include "ringbuffer.h"

union semun
{
    int val;                    /* value for SETVAL */
    struct semid_ds *buf;       /* buffer for IPC_STAT, IPC_SET */
    unsigned short int *array;  /* array for GETALL, SETALL */
    struct seminfo *__buf;      /* buffer for IPC_INFO */
};

int main()
{
    struct ringbuffer *rb;
    key_t shm_key = ftok(".", 0x5a);
    /* Attach to the segment the server has already created. */
    int shm_id = shmget(shm_key, 0, 0);
    if(shm_id == -1)
    {
        perror("shmget");
        exit(1);
    }
    printf("%s line %d, shm_id=%d, shm_key=0x%x.\n", __func__, __LINE__, shm_id, shm_key);
    char *address = (char*)shmat(shm_id, NULL, 0); /* NULL: let the kernel pick the attach address */
    if((void*)address == (void*)-1)
    {
        perror("shmat");
        exit(1);
    }

    printf("address = %p\n", address);
    rb = (struct ringbuffer*)address;
    /* No ringbuffer_init() here: the server already initialized the shared header. */

    key_t sem_key = ftok(".", 0x6b);
    int sem_id = semget(sem_key, 0, 0);
    if(sem_id == -1)
    {
        perror("semget");
        exit(1);
    }
    /* P on semaphore 1 (wait for the producer), V on semaphore 0 (acknowledge). */
    struct sembuf p = {1, -1, 0}, v = {0, 1, 0};
    unsigned char r = 0;
    while(1)
    {
        printf("client: %s line %d. w %d, r %d\n", __func__, __LINE__, rb->write_index, rb->read_index);
        semop(sem_id, &p, 1);   /* wait: until one byte is available */
        if (ringbuffer_get(rb, &r, 1) == 1)
        {
            printf("---%x---\n", r);
        }
        semop(sem_id, &v, 1);   /* signal: the byte was consumed */
    }
    /* Unreachable with the endless loop above; kept for completeness. */
    shmdt(address);
    return 0;
}

3. RingBuffer implementation

ringbuffer.c

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "ringbuffer.h"

void ringbuffer_init(struct ringbuffer *rb, uint8_t *pool, int16_t size)
{
    if(rb == NULL)
    {
        return;
    }

    rb->read_mirror = rb->read_index = 0;
    rb->write_mirror = rb->write_index = 0;

    /* Callers pass the pool as an offset from rb (cast to a pointer), so the
     * ring buffer works across processes with different mappings. */
    rb->buffer_ptr = pool;
    rb->buffer_size = size;

    /* The mutex lives inside shared memory, so it must be process-shared;
     * it is also marked robust so the death of a lock holder is detectable. */
    pthread_mutexattr_t attr;
    if(pthread_mutexattr_init(&attr) != 0)
    {
        printf("%s line %d, fatal error.\n", __func__, __LINE__);
        while(1);   /* hang on purpose so the failure is obvious */
    }

    if(pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) != 0)
    {
        printf("%s line %d, fatal error.\n", __func__, __LINE__);
        while(1);
    }

    if(pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST) != 0)
    {
        printf("%s line %d, fatal error.\n", __func__, __LINE__);
        while(1);
    }

    pthread_mutex_init(&rb->mutex_lock, &attr);

    return;
}


void ringbuffer_reset(struct ringbuffer *rb)
{
    if(rb == NULL)
    {
        return;
    }

    rb->read_mirror = 0;
    rb->read_index = 0;
    rb->write_mirror = 0;
    rb->write_index = 0;

    return;
}


int16_t ringbuffer_put(struct ringbuffer *rb, const uint8_t *ptr, int16_t length)
{
    if(rb == NULL || length == 0)
    {
        return 0;
    }

    int16_t size;
    /* buffer_ptr stores the pool's offset from the header; turn it back
     * into a pointer valid in this process' mapping. */
    uint8_t *buffer_ptr = (uint8_t *)rb + (unsigned long)rb->buffer_ptr;

    /* Hold the lock across the whole check-copy-update sequence so a
     * concurrent reader cannot observe a half-updated ring. */
    pthread_mutex_lock(&rb->mutex_lock);
    size = ringbuffer_space_len(rb);

    if(size == 0)
    {
        pthread_mutex_unlock(&rb->mutex_lock);
        return 0;
    }

    if (size < length)
    {
        length = size;  /* drop the bytes that do not fit */
    }

    if (rb->buffer_size - rb->write_index > length)
    {
        /* No wrap-around: one contiguous copy is enough. */
        memcpy(&buffer_ptr[rb->write_index], ptr, length);
        rb->write_index += length;
        pthread_mutex_unlock(&rb->mutex_lock);
        return length;
    }

    /* Wrap-around: copy up to the end of the pool, then the rest at the front. */
    memcpy(&buffer_ptr[rb->write_index], &ptr[0], rb->buffer_size - rb->write_index);
    memcpy(&buffer_ptr[0], &ptr[rb->buffer_size - rb->write_index],
           length - (rb->buffer_size - rb->write_index));

    rb->write_mirror = ~rb->write_mirror;   /* flip the mirror bit on wrap */
    rb->write_index = length - (rb->buffer_size - rb->write_index);
    pthread_mutex_unlock(&rb->mutex_lock);

    return length;
}

int16_t ringbuffer_get(struct ringbuffer *rb, uint8_t *ptr, int16_t length)
{
    if(rb == NULL || length == 0)
    {
        return 0;
    }

    int16_t size;
    /* Recompute the pool address from the stored offset (see ringbuffer_put). */
    uint8_t *buffer_ptr = (uint8_t *)rb + (unsigned long)rb->buffer_ptr;

    /* Hold the lock across the whole check-copy-update sequence. */
    pthread_mutex_lock(&rb->mutex_lock);
    size = ringbuffer_data_len(rb);

    if (size == 0)
    {
        pthread_mutex_unlock(&rb->mutex_lock);
        return 0;
    }

    if (size < length)
    {
        length = size;  /* only return what is actually available */
    }

    if (rb->buffer_size - rb->read_index > length)
    {
        /* No wrap-around: one contiguous copy is enough. */
        memcpy(ptr, &buffer_ptr[rb->read_index], length);
        rb->read_index += length;
        pthread_mutex_unlock(&rb->mutex_lock);
        return length;
    }

    /* Wrap-around: copy up to the end of the pool, then the rest from the front. */
    memcpy(&ptr[0], &buffer_ptr[rb->read_index], rb->buffer_size - rb->read_index);
    memcpy(&ptr[rb->buffer_size - rb->read_index], &buffer_ptr[0],
           length - (rb->buffer_size - rb->read_index));

    rb->read_mirror = ~rb->read_mirror;     /* flip the mirror bit on wrap */
    rb->read_index = length - (rb->buffer_size - rb->read_index);
    pthread_mutex_unlock(&rb->mutex_lock);

    return length;
}

enum ringbuffer_state ringbuffer_status(struct ringbuffer *rb)
{
    if (rb->read_index == rb->write_index)
    {
        /* Equal indexes mean empty if the mirror bits agree, and full if
         * they differ (one side has wrapped around once more than the other). */
        if (rb->read_mirror == rb->write_mirror)
        {
            return RINGBUFFER_EMPTY;
        }
        else
        {
            return RINGBUFFER_FULL;
        }
    }

    return RINGBUFFER_HALFFULL;
}

int16_t ringbuffer_data_len(struct ringbuffer *rb)
{
    switch (ringbuffer_status(rb))
    {
    case RINGBUFFER_EMPTY:
        return 0;
    case RINGBUFFER_FULL:
        return rb->buffer_size;
    case RINGBUFFER_HALFFULL:
    default:
        if (rb->write_index > rb->read_index)
            return rb->write_index - rb->read_index;
        else
            return rb->buffer_size - (rb->read_index - rb->write_index);
    }
}
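
One thing the sources above do not handle: because the mutex is created with PTHREAD_MUTEX_ROBUST, pthread_mutex_lock can return EOWNERDEAD if the other process dies while holding the lock. A minimal sketch of a wrapper that recovers from this (a hypothetical helper, not part of the original code):

#include <errno.h>
#include <pthread.h>

/* Hypothetical helper: lock a robust mutex, recovering it if the previous
 * owner died while holding it. The caller may need to re-validate the
 * protected ring state after such a recovery. */
static int robust_lock(pthread_mutex_t *m)
{
    int ret = pthread_mutex_lock(m);
    if (ret == EOWNERDEAD)
    {
        /* Mark the mutex consistent again so later locks keep working. */
        pthread_mutex_consistent(m);
        ret = 0;
    }
    return ret;
}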

ringbuffer.h

#ifndef __RING_BUFFER__
#define __RING_BUFFER__
#include <stdint.h>
#include <stddef.h>
#include <pthread.h>
#include <unistd.h>

struct ringbuffer
{
    /* Offset of the data pool relative to this header, stored as a
     * pointer-sized value so it survives different process mappings. */
    uint8_t *buffer_ptr;

    /* The mirror bits flip each time the corresponding index wraps around,
     * which is how "full" is told apart from "empty" when the indexes match. */
    uint16_t read_mirror : 1;
    uint16_t read_index : 15;

    uint16_t write_mirror : 1;
    uint16_t write_index : 15;

    int16_t buffer_size;
    pthread_mutex_t mutex_lock;     /* process-shared, robust */
};

enum ringbuffer_state
{
    RINGBUFFER_EMPTY,
    RINGBUFFER_FULL,
    RINGBUFFER_HALFFULL,
    RINGBUFFER_INVALID,
};

void ringbuffer_init(struct ringbuffer *rb, uint8_t *pool, int16_t size);
void ringbuffer_reset(struct ringbuffer *rb);
int16_t ringbuffer_put(struct ringbuffer *rb, const uint8_t *ptr, int16_t length);
int16_t ringbuffer_get(struct ringbuffer *rb, uint8_t *ptr, int16_t length);
int16_t ringbuffer_data_len(struct ringbuffer *rb);

/* Declared but not implemented in this post. */
struct ringbuffer* ringbuffer_create(int16_t length);
void ringbuffer_destroy(struct ringbuffer *rb);

static inline int16_t ringbuffer_get_size(struct ringbuffer *rb)
{
    return rb->buffer_size;
}

#define ringbuffer_space_len(rb) ((rb)->buffer_size - ringbuffer_data_len(rb))

#endif
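
To make the mirror-bit trick concrete, here is a tiny worked example (buffer_size = 4; values assumed for illustration):

/* start:        read_index=0, write_index=0, mirrors equal      -> EMPTY
 * put 4 bytes:  write_index wraps back to 0, write_mirror flips -> indexes
 *               equal but mirrors differ                        -> FULL
 * get 4 bytes:  read_index wraps back to 0, read_mirror flips   -> mirrors
 *               equal again                                     -> EMPTY
 */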

Makefile

all:
	gcc server.c ringbuffer.c -g -o server -lpthread
	gcc client.c ringbuffer.c -g -o client -lpthread
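
Build and run, starting the server first since it creates the shared memory segment and the semaphore set:

$ make
$ ./server &      # or run it in a separate terminal
$ ./client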

Testing:

Run the server first, then the client.

[Screenshots: server and client console output]

After the programs exit, delete the semaphore set and the shared memory segment, so that the next launch does not fail (shmget/semget use IPC_EXCL and will report an error if stale objects remain).


To find them, run ipcs:

[Screenshot: ipcs output listing the shared memory segment and semaphore set]

Here that means deleting semid 13 and shmid 65574. Be sure to match the resource IDs against the keys derived from ftok's second argument in the code (0x5a for the shared memory, 0x6b for the semaphores) so you do not remove the wrong objects.

Run ipcrm -s <semid> to delete the semaphore set.

Run ipcrm -m <shmid> to delete the shared memory segment (note: -m, not -s, for shared memory).
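
With the IDs from the ipcs output above, that would be:

$ ipcrm -s 13      # remove the semaphore set
$ ipcrm -m 65574   # remove the shared memory segment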


To check that pthread_mutex_lock really synchronizes across processes, modify ringbuffer_put so that it sleeps for one second inside the critical section.
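
A sketch of that change (the exact placement of the sleep within the locked region is assumed; the rest of the function is unchanged):

/* ringbuffer_put(), modified for the experiment: hold the mutex for one
 * second so the other process visibly blocks on the same lock. */
    pthread_mutex_lock(&rb->mutex_lock);
    sleep(1);                           /* added for the experiment */
    size = ringbuffer_space_len(rb);
    /* ... rest of ringbuffer_put() unchanged ... */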


The two processes can then be seen ping-ponging with a one-second period:

[Screenshot: server and client alternating once per second]

Done!
