0
点赞
收藏
分享

微信扫一扫

epoll oneshot EPOLLONESHOT

钎探穗 2022-06-23 阅读 46

/*
 * Epoll private bits inside the event mask.
 *
 * These are the flag bits that ep_poll_callback() masks out
 * (events & ~EP_PRIVATE_BITS) when it checks whether an item still has any
 * poll(2) event left to watch.
 */
#define EP_PRIVATE_BITS (EPOLLWAKEUP | EPOLLONESHOT | EPOLLET | EPOLLEXCLUSIVE)

主要是看下:惊群源:

1、socket wake_up 

2、epoll_wait 中wake_up 

 

目前data ready的时候调用sk_data_ready 唤醒进程,此时唤醒进程选择了  只唤醒一个

 

epoll oneshot  EPOLLONESHOT_#includeepoll oneshot  EPOLLONESHOT_数据_02

/ nr_exclusive是1  

/*
 * Walk every entry on wait queue `q` and call its wake function with
 * (entry, mode, wake_flags, key).
 *
 * `nr_exclusive` is decremented only for entries that carry
 * WQ_FLAG_EXCLUSIVE and whose wake function returned non-zero; the walk
 * stops as soon as that counter reaches zero.
 */
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,

int nr_exclusive, int wake_flags, void *key)

{

wait_queue_t *curr, *next;

list_for_each_entry_safe(curr, next, &q->task_list, task_list) {

/* Snapshot the flags before the wake function runs. */
unsigned flags = curr->flags;

/*
 * Evaluation order matters: the wake function is always called; the
 * exclusive test and the decrement only happen when it returned non-zero.
 */
if (curr->func(curr, mode, wake_flags, key) &&

(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)

break;

}

}

View Code

 

(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)

传进来的nr_exclusive是1, 所以flags & WQ_FLAG_EXCLUSIVE为真的时候,执行一次,就会跳出循环。

Epoll_create()在fork子进程之前

所有进程都共享一个 epfd, 所以data ready 唤醒进程的时候即使加上 nr_exclusive = 1 只唤醒一个进程, 那么唤醒哪一个呢?

也就是当连接到来时,我们需要选择一个进程来accept,这个时候,任何一个accept都是可以的。当连接建立以后,后续的读写事件,却与进程有了关联。一个请求与a进程建立连接后,后续的读写也应该由a进程来做。

当读写事件发生时,应该通知哪个进程呢?Epoll并不知道,因此,事件有可能错误通知另一个进程处理

Epoll_create()在fork子进程之后

每个进程的读写事件,只注册在自己进程的epoll中。所以不会出现竞争

但是accept呢???

目前有的内核版本会出现惊群,有的不会!!!

这就需要看内核版本实现了 无非就是唤醒的时候加上一些标志。。。当然使用reuseport 一劳永逸!!

如果不是使用reuseport实现只唤醒一个进程,那么wake_up的时候就是唤醒等待队列的头一个。。那怎么做到负载均衡呢???

所以还是reuseport好!!!!

epoll oneshot  EPOLLONESHOT_#includeepoll oneshot  EPOLLONESHOT_数据_02

*/
/*
 * Invoke the wake function of every entry queued on `q`.
 *
 * An entry counts against `nr_exclusive` only if its wake function returned
 * non-zero AND it is flagged WQ_FLAG_EXCLUSIVE. The walk ends when the
 * counter drops to zero.
 */
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
			int nr_exclusive, int wake_flags, void *key)
{
	wait_queue_t *entry, *tmp;

	list_for_each_entry_safe(entry, tmp, &q->task_list, task_list) {
		/* Read the flags before the wake function is called. */
		unsigned entry_flags = entry->flags;

		/* The wake function is called unconditionally. */
		if (!entry->func(entry, mode, wake_flags, key))
			continue;

		/* Non-exclusive waiters never consume the budget. */
		if (!(entry_flags & WQ_FLAG_EXCLUSIVE))
			continue;

		if (--nr_exclusive == 0)
			break;
	}
}

/*
 * Locked front end for __wake_up_common(): takes q->lock with interrupts
 * saved, runs the wake-up walk with wake_flags == 0, then restores.
 */
void __wake_up(wait_queue_head_t *q, unsigned int mode,
			int nr_exclusive, void *key)
{
	unsigned long irq_state;

	spin_lock_irqsave(&q->lock, irq_state);
	__wake_up_common(q, mode, nr_exclusive, 0, key);
	spin_unlock_irqrestore(&q->lock, irq_state);
}


/*
* This is the callback that is passed to the wait queue wakeup
* mechanism. It is called by the stored file descriptors when they
* have events to report.
*
* Under ep->lock it either drops the event (item disabled or event mask
* mismatch), chains the item on ep->ovflist, or links it on ep->rdllist and
* wakes ep->wq. This variant always returns 1.
*/
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
/* Set when ep->poll_wait has waiters; acted on after the lock is dropped. */
int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;

spin_lock_irqsave(&ep->lock, flags);

/*
* If the event mask does not contain any poll(2) event, we consider the
* descriptor to be disabled. This condition is likely the effect of the
* EPOLLONESHOT bit that disables the descriptor when an event is received,
* until the next EPOLL_CTL_MOD will be issued.
*/
if (!(epi->event.events & ~EP_PRIVATE_BITS))
goto out_unlock;

/*
* Check the events coming with the callback. At this stage, not
* every device reports the events in the "key" parameter of the
* callback. We need to be able to handle both cases here, hence the
* test for "key" != NULL before the event match test.
*/
if (key && !((unsigned long) key & epi->event.events))
goto out_unlock;

/*
* If we are transferring events to userspace, we can hold no locks
* (because we're accessing user memory, and because of linux f_op->poll()
* semantics). All the events that happen during that period of time are
* chained in ep->ovflist and requeued later on.
*/
if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
/* Chain the item only if it is not already on the overflow list. */
if (epi->next == EP_UNACTIVE_PTR) {
epi->next = ep->ovflist;
ep->ovflist = epi;
}
goto out_unlock;
}

/* Link the item on the ready list unless it is already linked there */
if (!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist);

/*
* Wake up ( if active ) both the eventpoll wait list and the ->poll()
* wait list.
*/
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;

out_unlock:
spin_unlock_irqrestore(&ep->lock, flags);

/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);

return 1;
}




/*
* Wait queue wakeup callback for an epoll item; this variant additionally
* handles POLLFREE, wakeup sources and EPOLLEXCLUSIVE.
*
* Return value:
*  - item registered with EPOLLEXCLUSIVE: returns ewake. ewake becomes 1 only
*    when the item reached the wake-up stage with ep->wq active, the key did
*    not carry POLLFREE, and the key's EPOLLINOUT_BITS matched the item's
*    mask (or were 0). Every early "goto out_unlock" leaves it at 0.
*  - any other item: returns 1.
*/
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
/* Set when ep->poll_wait has waiters; acted on after the lock is dropped. */
int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;
/* Result reported for EPOLLEXCLUSIVE items (see header comment). */
int ewake = 0;

if ((unsigned long)key & POLLFREE) {
ep_pwq_from_wait(wait)->whead = NULL;
/*
* whead = NULL above can race with ep_remove_wait_queue()
* which can do another remove_wait_queue() after us, so we
* can't use __remove_wait_queue(). whead->lock is held by
* the caller.
*/
list_del_init(&wait->task_list);
}

spin_lock_irqsave(&ep->lock, flags);

/*
* If the event mask does not contain any poll(2) event, we consider the
* descriptor to be disabled. This condition is likely the effect of the
* EPOLLONESHOT bit that disables the descriptor when an event is received,
* until the next EPOLL_CTL_MOD will be issued.
*/
if (!(epi->event.events & ~EP_PRIVATE_BITS))
goto out_unlock;

/*
* Check the events coming with the callback. At this stage, not
* every device reports the events in the "key" parameter of the
* callback. We need to be able to handle both cases here, hence the
* test for "key" != NULL before the event match test.
*/
if (key && !((unsigned long) key & epi->event.events))
goto out_unlock;

/*
* If we are transferring events to userspace, we can hold no locks
* (because we're accessing user memory, and because of linux f_op->poll()
* semantics). All the events that happen during that period of time are
* chained in ep->ovflist and requeued later on.
*/
if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
/* Chain the item only if it is not already on the overflow list. */
if (epi->next == EP_UNACTIVE_PTR) {
epi->next = ep->ovflist;
ep->ovflist = epi;
if (epi->ws) {
/*
* Activate ep->ws since epi->ws may get
* deactivated at any time.
*/
__pm_stay_awake(ep->ws);
}

}
goto out_unlock;
}

/* Link the item on the ready list unless it is already linked there */
if (!ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake_rcu(epi);
}

/*
* Wake up ( if active ) both the eventpoll wait list and the ->poll()
* wait list.
*/
if (waitqueue_active(&ep->wq)) {
if ((epi->event.events & EPOLLEXCLUSIVE) &&
!((unsigned long)key & POLLFREE)) {
/*
* Only three values of (key & EPOLLINOUT_BITS) can set ewake:
* exactly POLLIN, exactly POLLOUT, or 0. Any other combination
* matches no case and leaves ewake at 0.
* NOTE(review): EPOLLINOUT_BITS is defined outside this excerpt;
* presumably it covers the read/write readiness bits - confirm.
*/
switch ((unsigned long)key & EPOLLINOUT_BITS) {
case POLLIN:
if (epi->event.events & POLLIN)
ewake = 1;
break;
case POLLOUT:
if (epi->event.events & POLLOUT)
ewake = 1;
break;
case 0:
ewake = 1;
break;
}
}
wake_up_locked(&ep->wq);
}
if (waitqueue_active(&ep->poll_wait))
pwake++;

out_unlock:
spin_unlock_irqrestore(&ep->lock, flags);

/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);

/* Exclusive items report whether they actually consumed the wake-up. */
if (epi->event.events & EPOLLEXCLUSIVE)
return ewake;

return 1;
}

View Code

/*
 * poll_table queueing hook: attaches the epoll item behind `pt` to the
 * wait queue head `whead` of the file being watched, with
 * ep_poll_callback() as the wake function.
 *
 * Items registered with EPOLLEXCLUSIVE are queued with
 * add_wait_queue_exclusive(), all others with add_wait_queue().
 * On allocation failure (or if a failure was already recorded, i.e.
 * nwait < 0) the item's nwait is set to -1 to signal the error.
 */
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
				 poll_table *pt)
{
	struct epitem *item = ep_item_from_epqueue(pt);
	struct eppoll_entry *entry = NULL;

	/* Do not even try to allocate once an error has been recorded. */
	if (item->nwait >= 0)
		entry = kmem_cache_alloc(pwq_cache, GFP_KERNEL);

	if (!entry) {
		/* We have to signal that an error occurred */
		item->nwait = -1;
		return;
	}

	init_waitqueue_func_entry(&entry->wait, ep_poll_callback);
	entry->whead = whead;
	entry->base = item;

	if (item->event.events & EPOLLEXCLUSIVE)
		add_wait_queue_exclusive(whead, &entry->wait);
	else
		add_wait_queue(whead, &entry->wait);

	list_add_tail(&entry->llink, &item->pwqlist);
	item->nwait++;
}

根据EPOLLEXCLUSIVE 加入到不同的唤醒 队列add_wait_queue_exclusive  add_wait_queue

在wake_up的时候 通过 nr_exclusive 控制只唤醒几个;但是要想 break 跳出循环,还需要 ep_poll_callback 返回 true;

 

对于epoll_oneshot 

在epoll_wait后  send fd 到user时,

if (epi->event.events & EPOLLONESHOT)  //不会重复添加进去 导致后续链式唤醒
epi->event.events &= EP_PRIVATE_BITS;
else if (!(epi->event.events & EPOLLET)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}
epi->event.events &= EP_PRIVATE_BITS;  将设置了 EPOLLONESHOT 的fd 需要监听的事件掩码全部清除 只保留 EP_PRIVATE_BITS 中的私有标志位(其中包括 EPOLLONESHOT)
同时不会执行list_add_tail 把它重新添加到ep的就绪链表(rdllist)中

同时在 ep_poll_callback 的时候也会继续检查事件掩码(EPOLLONESHOT 触发后只剩下私有标志位),防止 epoll_wait 返回后、线程还在处理 data 的过程中,fd 又有数据需要响应,此时多线程中别的线程也可以响应。。。。乱序了!!!

/*
* If the event mask does not contain any poll(2) event, we consider the descriptor to be disabled. This condition is likely the effect of the
* EPOLLONESHOT bit that disables the descriptor when an event is received,* until the next EPOLL_CTL_MOD will be issued.
*/
if (!(epi->event.events & ~EP_PRIVATE_BITS))
goto out_unlock;

之前 event 被设置为   EP_PRIVATE_BITS  所以现在直接  goto unlock;

  

  即使使用ET模式,一个socket上的某个事件还是可能被触发多次。比如:一个线程在读取完某个socket上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有新数据可读(EPOLLIN再次被触发),此时另外一个线程被唤醒用来读取这些新的数据。于是就出现了两个线程同时操作一个socket的局面。而我们希望一个socket连接在任一时刻都只能被一个线程处理,这就可以通过EPOLLONESHOT事件实现。
  对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个事件,且只触发一次,除非我们使用epoll_ctl函数重置该文件描述符上注册的EPOLLONESHOT事件。这样,在一个线程使用socket时,其他线程无法操作socket。因此,线程在处理完该socket之后,须立即重置该socket的EPOLLONESHOT事件,以确保这个socket在下次可读时,其EPOLLIN事件能够被触发,进而让其他线程有机会操作这个socket。

/* EPOLLONESHOT 事件的使用
** 运行命令: g++ filename.cpp -lpthread; ./a.out 127.0.0.1 6666
** 可以使用telnet连接该服务器(telnet 127.0.0.1 6666)
*/


#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>

#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <sys/epoll.h>
#include <sys/sendfile.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
using namespace std;

/* Capacity of the epoll_event array handed to epoll_wait() in main(). */
const int MAX_EVENT_NUMBER = 1024;
/* Size in bytes of the per-thread receive buffer used by worker(). */
const int BUF_SIZE = 1024;

/* Argument bundle passed from main() to a worker() thread. */
struct fds{
/* epoll instance the socket is registered with; worker() re-arms on it. */
int epoll_fd;
/* Connected socket the worker reads from. */
int sock_fd;
};

/* Print the source line number at which a failure was detected, then flush. */
void err( int line ) {
    cout << "error_line: ";
    cout << line;
    cout << endl;
}

/*
 * Put `fd` into non-blocking mode.
 *
 * Returns the file status flags that were in effect before the call, or -1
 * if either fcntl() call fails (errno is left as set by fcntl()). A failed
 * F_GETFL is no longer OR-ed with O_NONBLOCK and written back as flags.
 */
int setnonblocking( int fd ) {
    int old_option = fcntl( fd, F_GETFL );
    if ( old_option < 0 ) {
        return -1;
    }

    if ( fcntl( fd, F_SETFL, old_option | O_NONBLOCK ) < 0 ) {
        return -1;
    }

    return old_option;
}

/*
 * Register `fd` with the epoll instance `epoll_fd` for edge-triggered read
 * events and switch it to non-blocking mode.
 *
 * epoll_fd: epoll instance to register on.
 * fd:       descriptor to watch.
 * oneshot:  when true, EPOLLONESHOT is added so the descriptor is disabled
 *           after one event until it is re-armed with EPOLL_CTL_MOD.
 *
 * Failures are reported on stderr via perror(); the function still returns
 * void, so callers are unaffected.
 */
void addfd( int epoll_fd, int fd, bool oneshot ) {
    /* Make the descriptor non-blocking before it can be reported ready. */
    if ( setnonblocking( fd ) < 0 ) {
        perror( "fcntl(O_NONBLOCK)" );
    }

    epoll_event event;
    memset( &event, 0, sizeof( event ) );  /* data is a union: clear all of it */
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    if ( oneshot ) {
        event.events |= EPOLLONESHOT;
    }

    if ( epoll_ctl( epoll_fd, EPOLL_CTL_ADD, fd, &event ) < 0 ) {
        perror( "epoll_ctl(EPOLL_CTL_ADD)" );
    }
}

/*
 * Re-arm `fd` on `epoll_fd` after an EPOLLONESHOT event was delivered.
 *
 * Issues EPOLL_CTL_MOD with EPOLLIN | EPOLLET | EPOLLONESHOT so the next
 * readable event on the descriptor is reported again (once).
 * A failing epoll_ctl() is reported on stderr via perror().
 */
void reset_oneshot( int epoll_fd, int fd ) {
    epoll_event event;
    memset( &event, 0, sizeof( event ) );  /* data is a union: clear all of it */
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;

    if ( epoll_ctl( epoll_fd, EPOLL_CTL_MOD, fd, &event ) < 0 ) {
        perror( "epoll_ctl(EPOLL_CTL_MOD)" );
    }
}

void * worker( void * arg ) {
int sock_fd = ( (fds *)arg )->sock_fd;
int epoll_fd = ( (fds *)arg )->epoll_fd;
printf("start new thread to receive data on fd: %d\n", sock_fd);
char buf[BUF_SIZE];
memset( buf, 0, sizeof( buf ) );

while( true ) {
int ret = recv( sock_fd, buf, BUF_SIZE - 1, 0 );
if ( !ret ) {
close( sock_fd );
cout << "foreiner closed the connection\n";
break;
} else if ( ret < 0 ) {
if ( errno == EAGAIN ) { //数据未读完,需要再次读取
reset_oneshot( epoll_fd, sock_fd );
cout << "read later\n";
break;
}
} else {
printf( "got %d bytes of data: %s\n", ret, buf );
sleep(5);
}
}
printf("end thread receiving data on fd: %d\n", sock_fd);
}

int main( int argc, char * argv[] ) {
if ( argc < 3 ) {
printf( "usage: ./file ip_number port_number\n" );
return 1;
}

const char * ip = argv[1];
const int port = atoi( argv[2] );

struct sockaddr_in address;
memset( &address, 0, sizeof( address ) );
address.sin_family = AF_INET;
address.sin_port = htons( port );
inet_pton( AF_INET, ip, &address.sin_addr );

int sock_fd = socket( AF_INET, SOCK_STREAM, 0 );
if ( sock_fd < 0 ) {
err( __LINE__ );
}

int ret = bind( sock_fd, ( struct sockaddr * )&address, sizeof( address ) );
if ( sock_fd < 0 ) {
err( __LINE__ );
}

ret = listen( sock_fd, 5 );
if ( sock_fd < 0 ) {
err( __LINE__ );
}

epoll_event events[MAX_EVENT_NUMBER];
int epoll_fd = epoll_create( 5 );
if ( epoll_fd < 0 ) {
err( __LINE__ );
}

/* 监听socket的 sock_fd 不能注册 EPOLLONESHOT 事件,否则程序只能处理一个客户连接
** 因为后续的客户连接请求将不再触发sock_fd的 EPOLLIN 事件
*/
addfd( epoll_fd, sock_fd, false );

while( true ) {
ret = epoll_wait( epoll_fd, events, MAX_EVENT_NUMBER, -1 ); //等待事件发生
if ( ret < 0 ) {
printf( "epoll failure\n" );
break;
}

for ( int i = 0; i < ret; i++ ) {
int fd = events[i].data.fd;
if ( fd == sock_fd ) { //有新的连接请求
struct sockaddr_in client;
socklen_t client_length = sizeof( client );
int conn_fd = accept( sock_fd, ( struct sockaddr * )&client,
&client_length );

//对每个非监听文件描述符都注册 EPOLLONESHOT 事件
//添加的是刚accept的fd
addfd( epoll_fd, conn_fd, true );
} else if ( events[i].events & EPOLLIN ) { //有可读取数据
pthread_t thread;
fds fds_for_new_worker;
fds_for_new_worker.epoll_fd = epoll_fd;
fds_for_new_worker.sock_fd = fd; //内核事件表中的fd,不要搞混

//新启动一个线程为sock_fd服务
pthread_create( &thread, NULL, worker, (void *)&fds_for_new_worker );
} else {
printf( "something else happened\n" );
}
}
}

close(sock_fd);

return 0;
}

 

http代理服务器(3-4-7层代理)-网络事件库公共组件、内核kernel驱动 摄像头驱动 tcpip网络协议栈、netfilter、bridge 好像看过!!!! 但行好事 莫问前程 --身高体重180的胖子

举报

相关推荐

0 条评论