0
点赞
收藏
分享

微信扫一扫

[并发编程]设置互斥锁的健壮性

互斥锁的健壮性

互斥锁的健壮性,指的是当持有锁的线程/进程在未释放锁的情况下退出时,其他线程/进程仍可以通过特定操作将锁恢复到正常状态。可以通过设置锁的PTHREAD_MUTEX_ROBUST属性来达到该目的。

相关接口

#include <pthread.h>

// Set / query the robustness attribute of a mutex attribute object.
// __attr: the mutex attribute object
// __robustness: PTHREAD_MUTEX_STALLED (the default) or PTHREAD_MUTEX_ROBUST
// NOTE: like the other pthread_* functions these return 0 on success or an
// error number on failure -- not -1 -- so callers should test the result
// with `!= 0`.
int pthread_mutexattr_setrobust (pthread_mutexattr_t *__attr,
int __robustness)
;
// Stores the attribute's current robustness setting into *__robustness.
int pthread_mutexattr_getrobust (const pthread_mutexattr_t *__attr,
int *__robustness)
;

// When pthread_mutex_lock on a robust mutex returns EOWNERDEAD, the caller
// now owns the mutex and should call this function on it to restore the
// mutex to a consistent state before using (or unlocking) it.
int pthread_mutex_consistent (pthread_mutex_t *__mutex);

PTHREAD_MUTEX_STALLED

默认属性,如果用PTHREAD_MUTEX_STALLED属性初始化互斥体,并且它的所有者没有解锁它就死了,互斥体之后仍然是锁定的,以后任何在互斥体上调用pthread_mutex_lock (3)的尝试都将永久陷入阻塞。

PTHREAD_MUTEX_ROBUST

如果使用PTHREAD_MUTEX_ROBUST属性初始化互斥体,并且其所有者在未解锁的情况下死亡,则以后对此互斥体调用pthread_mutex_lock (3)的任何尝试都将成功,并返回 EOWNERDEAD 以指示原始所有者已不存在,互斥体处于不一致状态。通常在 EOWNERDEAD 返回后,下一个拥有者应该在获取的互斥体上调用pthread_mutex_consistent (3),以使其再次一致,然后再使用它。

如果下一个所有者在使互斥体恢复一致之前就使用pthread_mutex_unlock (3)解锁互斥体,则互斥体将永久不可用,随后任何使用pthread_mutex_lock (3)锁定该互斥体的尝试都将失败,错误为ENOTRECOVERABLE。在这样的互斥体上唯一允许的操作是pthread_mutex_destroy (3)。如果下一个所有者在调用pthread_mutex_consistent (3)之前终止,则此后在该互斥体上的pthread_mutex_lock (3)操作仍将返回EOWNERDEAD。

使用实例

下面是我们在实际项目开发中遇到的一个例子。为了进行跨进程同步,我们将互斥锁和条件变量都创建在共享内存上,并设置了互斥锁的PTHREAD_PROCESS_SHARED属性。但是,当进程A在持有锁的时候死掉,而进程B正好在等待加锁时,进程B将永久阻塞(死锁)。我们可以通过设置互斥锁的PTHREAD_MUTEX_ROBUST属性来解决这个问题:出现上述情况时,pthread_mutex_lock会返回EOWNERDEAD,此时可以通过pthread_mutex_consistent将互斥锁恢复正常。

核心代码如下:

        // 
pthread_mutexattr_setrobust(&mutexattr,PTHREAD_MUTEX_ROBUST);

int ret = pthread_mutex_lock(&_shm_mutex->_mutex);
if (ret != 0)
{
if (ret == EOWNERDEAD)
{
if (pthread_mutex_consistent(&_shm_mutex->_mutex) == -1)
{
exit_on_error(true,pthread_condattr_destroy failed);
}
else
{
printf(pthread_mutex_consistent OK\r\n);
}
}
else
{
printf(pthread_mutex_lock failed,code:%d\r\n,ret);
exit_on_error(true,pthread_mutex_lock failed!);
}
}

完整代码示例

测试步骤:

  1. 编译代码
  2. 先启动productor,它会在持有锁的情况下休眠20s,然后再解锁
  3. 再运行consumer,它会去尝试获取锁
  4. 当productor在持有锁的过程中,将productor kill掉
  5. 此时可以看到consumer通过pthread_mutex_consistent将锁恢复。

如果在创建锁时不设置PTHREAD_MUTEX_ROBUST属性,并屏蔽加锁处对pthread_mutex_consistent的调用,然后重复上面的第1、2、3、4步,将看到consumer永远阻塞在等待锁的地方。

// shm.h
#pragma once

#include <assert.h> // for assert
#include <errno.h>
#include <fcntl.h> /* For O_* constants */
#include <pthread.h> // for pthread_xx
#include <stdint.h> // for int32_t
#include <stdio.h> // for printf / snprintf
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h> // for shm_open
#include <sys/stat.h> /* For mode constants */
#include <sys/time.h> // for gettimeofday
#include <sys/types.h> // for pid_t, off_t
#include <unistd.h>

#include "sys.h"

// Synchronisation header placed at offset 0 of the shared-memory mapping;
// the user data area starts immediately after it (see SHM_SYNC_COND::init).
// Both members are initialised with PTHREAD_PROCESS_SHARED attributes by the
// process that creates the object. Every process maps this same layout, so it
// must be identical in all of them.
struct SHM_MUTEX
{
pthread_mutex_t _mutex; // process-shared, robust mutex paired with _cond
pthread_cond_t _cond;   // process-shared condition variable used by notify()/wait()
};

// Name of the POSIX shared-memory object shared by the producer and consumer.
// The pointer itself is const so no translation unit can repoint it.
// NOTE(review): POSIX only guarantees portable shm_open() behaviour for names
// starting with '/'; glibc accepts this form, and it is kept unchanged so
// existing objects under /dev/shm/shm_cond_name are still found.
static const char *const shm_cond_name = "shm_cond_name";

// Cross-process condition variable built on POSIX shared memory.
//
// Layout of the mapped object: a SHM_MUTEX header (process-shared robust
// mutex + process-shared condition variable), followed by a data area of
// elm_size * eml_count bytes returned by data_buf().
//
// Because the mutex is PTHREAD_MUTEX_ROBUST, a process that dies while
// holding it does not leave the others blocked forever. The next locker gets
// EOWNERDEAD and calls pthread_mutex_consistent() before using the mutex.
//
// Errors are reported through exit_on_error() from sys.h. Presumably it
// terminates the process when its first argument is true -- TODO confirm.
class SHM_SYNC_COND
{
public:
    SHM_SYNC_COND() = default;

    // Creates or opens the shared-memory object `shm_name` and maps
    // sizeof(SHM_MUTEX) + elm_size * eml_count bytes of it. Only the process
    // that actually created the object sizes it and initialises the mutex
    // and condition variable.
    //
    // shm_name:  object name passed to shm_open()
    // elm_size:  size in bytes of one data element
    // eml_count: number of data elements stored after the header
    // returns:   true (failures are routed through exit_on_error())
    bool init(const char *shm_name, size_t elm_size, size_t eml_count)
    {
        assert(_data == nullptr);
        assert(_shm_mutex == nullptr);

        // Decide "did I create it?" atomically with O_EXCL. A stat() followed
        // by shm_open() lets two processes both believe they are the creator
        // and both (re)initialise a mutex that may already be in use.
        bool first_create = true;
        int shm_fd = shm_open(shm_name, O_CREAT | O_EXCL | O_RDWR, 0666);
        if (shm_fd < 0 && errno == EEXIST)
        {
            first_create = false;
            shm_fd = shm_open(shm_name, O_RDWR, 0666);
        }
        exit_on_error(shm_fd < 0, "shm_open failed!");
        printf("shm name:%s\r\n", shm_name);
        printf("first create shm file:%s\r\n", first_create ? "true" : "false");

        size_t s = sizeof(SHM_MUTEX) + elm_size * eml_count; // total mapping size
        if (first_create)
        {
            printf("fisrt to truncate shm file\r\n");
            int ret = ftruncate(shm_fd, (off_t)s);
            exit_on_error(ret < 0, "ftruncate failed!");
        }
        // NOTE(review): a process that opens an existing object can still map
        // it before the creator has finished ftruncate() and the mutex/cond
        // initialisation below. Closing that window completely would need an
        // "initialised" flag in SHM_MUTEX.

        void *addr = mmap(NULL, s, PROT_WRITE | PROT_READ, MAP_SHARED, shm_fd, 0);
        exit_on_error(addr == MAP_FAILED, "mmap failed");
        close(shm_fd); // the mapping stays valid after its descriptor is closed

        _shm_mutex = (SHM_MUTEX *)addr;
        _data = (char *)addr + sizeof(SHM_MUTEX);

        if (first_create)
        {
            init_sync_objects();
        }
        return true;
    }

    // Producer side. Takes the shared lock and holds it for 20 s, long enough
    // for the demo to kill this process while it owns the mutex. It then
    // broadcasts the condition variable and unlocks.
    void notify()
    {
        assert(_data != nullptr);
        assert(_shm_mutex != nullptr);
        static int32_t index = 0;
        lock_robust();
        printf("sleep 20s to notify %d..\r\n", ++index);
        sleep(20);
        exit_on_error(pthread_cond_broadcast(&_shm_mutex->_cond) != 0, "pthread_cond_broadcast failed");
        exit_on_error(pthread_mutex_unlock(&_shm_mutex->_mutex) != 0, "pthread_mutex_unlock failed");
    }

    // Consumer side. Waits on the condition variable for at most `wait_sec`
    // seconds of wall-clock time. It returns after a broadcast, the timeout,
    // or a spurious wakeup; there is no predicate, so these cannot be told
    // apart.
    void wait(int32_t wait_sec)
    {
        assert(_data != nullptr);
        assert(_shm_mutex != nullptr);

        // pthread_cond_timedwait takes an absolute CLOCK_REALTIME deadline.
        struct timeval now;
        struct timespec abstime;
        gettimeofday(&now, NULL);
        printf("wait sec:%ld,nsec:%ld,wait_sec:%d\r\n", (long)now.tv_sec, (long)now.tv_usec * 1000, wait_sec);
        abstime.tv_nsec = now.tv_usec * 1000;
        abstime.tv_sec = now.tv_sec + wait_sec;

        lock_robust();
        printf("wait sec:%ld,nsec:%ld\r\n", (long)abstime.tv_sec, (long)abstime.tv_nsec);
        int ret = pthread_cond_timedwait(&_shm_mutex->_cond, &_shm_mutex->_mutex, &abstime);
        if (ret == EOWNERDEAD)
        {
            // timedwait re-acquired the mutex from a holder that died (e.g. the
            // producer killed inside notify()). Without this, the unlock below
            // would leave the mutex permanently ENOTRECOVERABLE.
            make_consistent();
        }
        else
        {
            exit_on_error(ret != 0 && ret != ETIMEDOUT, "pthread_cond_timedwait failed");
        }
        exit_on_error(pthread_mutex_unlock(&_shm_mutex->_mutex) != 0, "pthread_mutex_unlock failed");
    }

    // Start of the data area that follows the SHM_MUTEX header.
    void *data_buf() const
    {
        assert(_data != nullptr);
        return _data;
    }

private:
    // Initialises the robust, process-shared mutex and the process-shared
    // condition variable inside a freshly created mapping. pthread_*
    // functions return 0 or a positive error number (never -1), so every
    // result is checked with `!= 0`.
    void init_sync_objects()
    {
        printf("init mutex and cond\r\n");
        pthread_mutexattr_t mutexattr;
        int ret = pthread_mutexattr_init(&mutexattr);
        exit_on_error(ret != 0, "pthread_mutexattr_init failed");
        ret = pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED);
        exit_on_error(ret != 0, "pthread_mutexattr_setpshared failed");
        ret = pthread_mutexattr_setrobust(&mutexattr, PTHREAD_MUTEX_ROBUST);
        exit_on_error(ret != 0, "pthread_mutexattr_setrobust failed");
        ret = pthread_mutex_init(&_shm_mutex->_mutex, &mutexattr);
        exit_on_error(ret != 0, "pthread_mutex_init failed");
        ret = pthread_mutexattr_destroy(&mutexattr);
        exit_on_error(ret != 0, "pthread_mutexattr_destroy failed");

        pthread_condattr_t condattr;
        ret = pthread_condattr_init(&condattr);
        exit_on_error(ret != 0, "pthread_condattr_init failed");
        ret = pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED);
        exit_on_error(ret != 0, "pthread_condattr_setpshared failed");
        ret = pthread_cond_init(&_shm_mutex->_cond, &condattr);
        exit_on_error(ret != 0, "pthread_cond_init failed");
        ret = pthread_condattr_destroy(&condattr);
        exit_on_error(ret != 0, "pthread_condattr_destroy failed");
    }

    // Locks the shared mutex. If the previous owner died while holding it
    // (EOWNERDEAD), the mutex is marked consistent so it stays usable.
    void lock_robust()
    {
        int ret = pthread_mutex_lock(&_shm_mutex->_mutex);
        if (ret == EOWNERDEAD)
        {
            make_consistent();
            return;
        }
        if (ret != 0)
        {
            printf("pthread_mutex_lock failed,code:%d\r\n", ret);
            exit_on_error(true, "pthread_mutex_lock failed!");
        }
    }

    // Must be called while holding a mutex that was acquired with EOWNERDEAD.
    void make_consistent()
    {
        int ret = pthread_mutex_consistent(&_shm_mutex->_mutex);
        exit_on_error(ret != 0, "pthread_mutex_consistent failed");
        printf("pthread_mutex_consistent OK\r\n");
    }

    SHM_MUTEX *_shm_mutex{nullptr}; // header at the start of the mapping
    void *_data{nullptr};           // data area right after the header
};

// One message record stored in the shared data area. The demo programs use
// a single element.
struct IPC_DATA
{
    pid_t _pid = 0;  // pid of the process that last wrote the record
    char _msg[256];  // text written by the producer with snprintf()
};
//consumer.cc
#include <sys/time.h>
#include <sys/types.h>
#include <sys/shm.h>
#include <stdlib.h>
#include <stdio.h>
#include sys.h
#include <unistd.h>
#include ./shm.h



// Consumer: attaches to the shared condition variable and repeatedly waits
// for the producer's notification, printing the message it left behind.
int main()
{
    const char *shm_name = shm_cond_name; // shared-memory object name
    SHM_SYNC_COND shm_cond;
    size_t elm_size = sizeof(IPC_DATA);
    size_t elm_count = 1;
    bool ret = shm_cond.init(shm_name, elm_size, elm_count); // create/open the shared lock and map the memory
    exit_on_error(ret == false, "create shm cond failed");
    IPC_DATA *datas = (IPC_DATA *)shm_cond.data_buf(); // address of the mapped data area

    for (size_t index = 0;; index++)
    {
        printf("consumer wait for msg index:%zu\r\n", index);
        // Times out after 5 s if no producer calls notify(). Waking up earlier
        // shows that the condition variable works across processes.
        shm_cond.wait(5);
        // NOTE(review): _msg is read after wait() has released the shared
        // mutex, so it can race with the producer's next write.
        printf("wait for index:%zu,msg:%s", index, datas[0]._msg);
    }
    return 0;
}
// productor.cc
#include <sys/time.h>
#include <sys/types.h>
#include <sys/shm.h>
#include <stdlib.h>
#include <stdio.h>
#include sys.h
#include <unistd.h>

#include ./shm.h

int main()
{
const char *shm_name = shm_cond_name; // 共享内存文件名
SHM_SYNC_COND shm_cond;
size_t elm_size = sizeof(IPC_DATA);
size_t elm_count = 1;
shm_cond.init(shm_name, elm_size,elm_count);// 创建共享锁以及内存映射
IPC_DATA *datas = (IPC_DATA *)shm_cond.data_buf();// 获取开辟的内存映射数据地址

while(true) // 生产者,每隔5s发送一次数据,让消费者在被kill的时候阻塞在条件变量上
{
static size_t index = 0;
usleep(1000*1000*1);
datas[0]._pid = getpid();
snprintf(datas[0]._msg,sizeof(datas[0]._msg),[productor]write msg,pid:%d,index:%lu\r\n,getpid(),index);
shm_cond.notify();
index++;
}

return 0;
}
cmake_minimum_required(VERSION 3.1)
project(shm-mutex-cond-robust)

# Link pthreads through the imported Threads::Threads target instead of
# overwriting CMAKE_CXX_FLAGS. Overwriting it discarded any flags (-O2, -g,
# sanitizers) supplied by the user or the toolchain.
# THREADS_PREFER_PTHREAD_FLAG makes CMake use -pthread, as the old setting did.
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)

set(consumer_src consumer.cc)
set(productor_src productor.cc)
# Target names (including the "robost" spelling) are kept so existing
# scripts that run these binaries keep working.
add_executable(consumer-robost ${consumer_src})
add_executable(productor-robost ${productor_src})

# librt provides shm_open/shm_unlink on older glibc versions.
target_link_libraries(consumer-robost Threads::Threads rt)
target_link_libraries(productor-robost Threads::Threads rt)
举报

相关推荐

0 条评论