0
点赞
收藏
分享

微信扫一扫

log&& buffevent&&内存池 2

  基于套接字的 bufferevent使用 libevent 的底层事件机制来检测底层网络套接字是否已经就绪,可以进行读写操作,并且使用底层网络调用(如 readv 、 writev)来发送和接收数据。

libevent的bufferevent 是基于event_base 的基础上再次封装一层!其本质还是离不开event和event_base,分析bufferevent结构体可以看出这一点!!

/**
Shared implementation of a bufferevent.

This type is exposed only because it was exposed in previous versions,
and some people's code may rely on manipulating it. Otherwise, you
should really not rely on the layout, size, or contents of this structure:
it is fairly volatile, and WILL change in future versions of the code.
**/
struct bufferevent {
/** Event base for which this bufferevent was created. */
struct event_base *ev_base;
/** Pointer to a table of function pointers to set up how this
bufferevent behaves. */
const struct bufferevent_ops *be_ops;

/** A read event that triggers when a timeout has happened or a socket
is ready to read data. Only used by some subtypes of
bufferevent. */
struct event ev_read; //读事件 event
/** A write event that triggers when a timeout has happened or a socket
is ready to write data. Only used by some subtypes of
bufferevent. */
struct event ev_write;// 写事件

/** An input buffer. Only the bufferevent is allowed to add data to
this buffer, though the user is allowed to drain it. */
struct evbuffer *input;//读缓冲区

/** An input buffer. Only the bufferevent is allowed to drain data
from this buffer, though the user is allowed to add it. */
struct evbuffer *output;// 写缓冲区

struct event_watermark wm_read;//读水位
struct event_watermark wm_write;//写水位

bufferevent_data_cb readcb; //可读时回调函数指针
bufferevent_data_cb writecb;//可写时回调函数指针
/* This should be called 'eventcb', but renaming it would break
* backward compatibility */
bufferevent_event_cb errorcb;
void *cbarg;

struct timeval timeout_read; // 读事件event的超时值
struct timeval timeout_write;//写事件event的超时值

/** Events that are currently enabled: currently EV_READ and EV_WRITE
are supported. */
short enabled;
};

  bufferevent 相当于一个公共部分,那么 肯定有使用bufferevent进行更一步适配业务扩展的结构!bufferevent_private 是 bufferevent的一个子集

/** Parts of the bufferevent structure that are shared among all bufferevent
* types, but not exposed in bufferevent_struct.h. */
struct bufferevent_private {
/** The underlying bufferevent structure. */
struct bufferevent bev;

/** Evbuffer callback to enforce watermarks on input. */
// 设置 input evbuff 的高水位时 需要一个evbuffer 回调函数
struct evbuffer_cb_entry *read_watermarks_cb;

/** If set, we should free the lock when we free the bufferevent. */
unsigned own_lock : 1;

/** Flag: set if we have deferred callbacks and a read callback is
* pending. */
unsigned readcb_pending : 1;
/** Flag: set if we have deferred callbacks and a write callback is
* pending. */
unsigned writecb_pending : 1;
/** Flag: set if we are currently busy connecting. */
unsigned connecting : 1; // socket 连接状态
/** Flag: set if a connect failed prematurely; this is a hack for
* getting around the bufferevent abstraction. */
unsigned connection_refused : 1; // socket connect failed 连接失败
/** Set to the events pending if we have deferred callbacks and
* an events callback is pending. */
short eventcb_pending;

/** If set, read is suspended until one or more conditions are over.
* The actual value here is a bitfield of those conditions; see the
* BEV_SUSPEND_* flags above. */
bufferevent_suspend_flags read_suspended;// 标志是什么原因把 读 挂起来

/** If set, writing is suspended until one or more conditions are over.
* The actual value here is a bitfield of those conditions; see the
* BEV_SUSPEND_* flags above. */
bufferevent_suspend_flags write_suspended; // 标志是什么原因吧 写 挂起来

/** Set to the current socket errno if we have deferred callbacks and
* an events callback is pending. */
int errno_pending;

/** The DNS error code for bufferevent_socket_connect_hostname */
int dns_error;

/** Used to implement deferred callbacks */
struct event_callback deferred;

/** The options this bufferevent was constructed with */
enum bufferevent_options options;

/** Current reference count for this bufferevent. */
int refcnt;

/** Lock for this bufferevent. Shared by the inbuf and the outbuf.
* If NULL, locking is disabled. */
void *lock;

/** No matter how big our bucket gets, don't try to read more than this
* much in a single read operation. */
ev_ssize_t max_single_read;

/** No matter how big our bucket gets, don't try to write more than this
* much in a single write operation. */
ev_ssize_t max_single_write;

/** Rate-limiting information for this bufferevent */
struct bufferevent_rate_limit *rate_limiting;

  bufferevent的结构体中有两个event,分别用来监听同一个fd的可读事件以及可写事件!! 为什么不用同一个fd-event同时监听一个可读和可写???

创建基于套接字的bufferevent

  可以使用 bufferevent_socket_new()创建基于套接字的 bufferevent。

struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options)
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
int options)
{
struct bufferevent_private *bufev_p;
struct bufferevent *bufev;
// alloc bufev_p for bufferevent
if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)
return NULL;
// 设置 新建一个新的输入 输出 缓存区
if (bufferevent_init_common_(bufev_p, base, &bufferevent_ops_socket,
options) < 0) {
mm_free(bufev_p);
return NULL;
}
bufev = &bufev_p->bev;
//设置 evbuff 的数据 想fd
evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
// 将fd 和 event相关联 同一个fd 关联两个event 每次 只会监听fd 读事件或者写事件中的一个----->
// 这个 相当于 socket 的
event_assign(&bufev->ev_read, bufev->ev_base, fd,
EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd,
EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev);

// 设置evbuff的回调函数 外界给写缓冲区添加数据时
evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

evbuffer_freeze(bufev->input, 0); //设置读写标志
evbuffer_freeze(bufev->output, 1);// 设置读写标志

return bufev;
}

  函数在最开始的时候 会冻结两个缓冲区,但是在读写数据之前,会解冻---解冻读写数据完成后,又会去冻结数据, 这样是为了 防止意外操作数据

int
bufferevent_init_common_(struct bufferevent_private *bufev_private,
struct event_base *base,
const struct bufferevent_ops *ops,
enum bufferevent_options options)
{
struct bufferevent *bufev = &bufev_private->bev;

if (!bufev->input) {//分配输入缓冲区
if ((bufev->input = evbuffer_new()) == NULL)
return -1;
}

if (!bufev->output) {//分配输出冲区
if ((bufev->output = evbuffer_new()) == NULL) {
evbuffer_free(bufev->input);
return -1;
}
}

bufev_private->refcnt = 1;// 引用次数
bufev->ev_base = base;

/* Disable timeouts. 读写event 没有开始 超时 */

evutil_timerclear(&bufev->timeout_read);
evutil_timerclear(&bufev->timeout_write);

bufev->be_ops = ops;

bufferevent_ratelim_init_(bufev_private);

/*
* Set to EV_WRITE so that using bufferevent_write is going to
* trigger a callback. Reading needs to be explicitly enabled
* because otherwise no data will be available.
*/
bufev->enabled = EV_WRITE;//默认是支持写操作

#ifndef EVENT__DISABLE_THREAD_SUPPORT
if (options & BEV_OPT_THREADSAFE) {
if (bufferevent_enable_locking_(bufev, NULL) < 0) {
/* cleanup */
evbuffer_free(bufev->input);
evbuffer_free(bufev->output);
bufev->input = NULL;
bufev->output = NULL;
return -1;
}
}
#endif
if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
== BEV_OPT_UNLOCK_CALLBACKS) {
event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
return -1;
}
if (options & BEV_OPT_UNLOCK_CALLBACKS)
event_deferred_cb_init_(
&bufev_private->deferred,
event_base_get_npriorities(base) / 2,
bufferevent_run_deferred_callbacks_unlocked,
bufev_private);
else
event_deferred_cb_init_(
&bufev_private->deferred,
event_base_get_npriorities(base) / 2,
bufferevent_run_deferred_callbacks_locked,
bufev_private);

bufev_private->options = options;
// 设置 evbuffer 和bufferevent 相关联
evbuffer_set_parent_(bufev->input, bufev);
evbuffer_set_parent_(bufev->output, bufev);

return 0;
}
event_assign 只是将fd 和read/write 回调函数关联起来, 此时的回调函数主要是从fd recv数据 或者 write数据到fd;类似于TCP/IP分层看;这只是L2 的逻辑;
我们再看L3 逻辑, ip_rcv 结束后 就会调用IP_proto_cb函数--tcp_rcv; 同样此时就会调用 evbuff的回调;evbuff_read/write_cb;
那么此时L3-evbuff的回调是啥呢??
bufferevent_setcb(bufev, readcb, writecb, eventcb, cbarg);
void
bufferevent_setcb(struct bufferevent *bufev,
bufferevent_data_cb readcb, bufferevent_data_cb writecb,
bufferevent_event_cb eventcb, void *cbarg)
{
BEV_LOCK(bufev);

bufev->readcb = readcb;
bufev->writecb = writecb;
bufev->errorcb = eventcb;

bufev->cbarg = cbarg;
BEV_UNLOCK(bufev);
}

  设置 L3 的回调处理!!

但是调用了bufferevent_socket_new和bufferevent_setcb,这个bufferevent还是不能工作,必须调用bufferevent_enable。此时需要 将fd add insert 到 epoll_wait里面去

int
bufferevent_enable(struct bufferevent *bufev, short event)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
short impl_events = event;
int r = 0;
//增加引用并加锁
//增加引用是为了防止其他线程调用bufferevent_free,释放了bufferevent
_bufferevent_incref_and_lock(bufev);
//挂起了读,此时不能监听读事件
if (bufev_private->read_suspended)
impl_events &= ~EV_READ;
//挂起了写,此时不能监听写事情
if (bufev_private->write_suspended)
impl_events &= ~EV_WRITE;
bufev->enabled |= event;
//调用对应类型的enbale函数。因为不同类型的bufferevent有不同的enable函数
if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
r = -1;
//减少引用并解锁
_bufferevent_decref_and_unlock(bufev);
return r;
}

 

最终会调用对应bufferevent类型的enable函数,对于socket bufferevent,其enable函数是be_socket_enable,代码如下:

static int
be_socket_enable(struct bufferevent *bufev, short event)
{
if (event & EV_READ) {
if (be_socket_add(&bufev->ev_read,&bufev->timeout_read) == -1)
return -1;
}
if (event & EV_WRITE) {
if (be_socket_add(&bufev->ev_write,&bufev->timeout_write) == -1)
return -1;
}
return 0;
}

 

int
_bufferevent_add_event(struct event *ev, const struct timeval *tv)
{
if (tv->tv_sec == 0 && tv->tv_usec == 0)
return event_add(ev, NULL);
else
return event_add(ev, tv);
}
//bufferevent_sock.c文件
#define be_socket_add(ev, t) \
_bufferevent_add_event((ev), (t))

读事件:

  虽然控制了evbuffer的大小,但socket fd可能还有数据。有数据就会触发可读事件,但处理可读的时候,又会发现设置了高水位,不能读取数据evbuffer。socket fd的数据没有被读完,相当于是个矛盾体!!   实际上是不会,因为Libevent发现evbuffer的数据量到达高水位后,就会把可读事件给挂起来,让它不能再触发了。Libevent使用函数bufferevent_wm_suspend_read把监听读事件的event挂起来

void
bufferevent_suspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
BEV_LOCK(bufev);
if (!bufev_private->read_suspended)//不能挂多次
bufev->be_ops->disable(bufev, EV_READ);//实际调用be_socket_disable函数
bufev_private->read_suspended |= what;//因何而被挂起
BEV_UNLOCK(bufev);
}
//bufferevent_sock.c文件
static int
be_socket_disable(struct bufferevent *bufev, short event)
{
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (event & EV_READ) {
if (event_del(&bufev->ev_read) == -1)
return -1;
}
/* Don't actually disable the write if we are trying to connect. */
if ((event & EV_WRITE) && ! bufev_p->connecting) {
if (event_del(&bufev->ev_write) == -1)//删掉这个event
return -1;
}
return 0;
}

  是直接删除这个监听读/写事件的event

那么什么时候取消挂起,让bufferevent可以继续读socket 数据呢?从高水位的意义来说,当然是当evbuffer里面的数据量小于高水位时,就能再次读取socket数据了!!当evbuffer里面的数据添加或者删除时,是会触发一些回调函数的。so用户移除evbuffer的一些数据量时,Libevent就会检查这个evbuffer的数据量是否小于高水位,如果小于的话,那么就恢复 读事件。

void
bufferevent_setwatermark(struct bufferevent *bufev, short events,
size_t lowmark, size_t highmark)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
BEV_LOCK(bufev);
if (events & EV_READ) {
bufev->wm_read.low = lowmark;
bufev->wm_read.high = highmark;
if (highmark) {//高水位
/* There is now a new high-water mark for read.
enable the callback if needed, and see if we should
suspend/bufferevent_wm_unsuspend. */
//还没设置高水位的回调函数
if (bufev_private->read_watermarks_cb == NULL) {
bufev_private->read_watermarks_cb =
evbuffer_add_cb(bufev->input,
bufferevent_inbuf_wm_cb,
bufev);//添加回调函数
}
evbuffer_cb_set_flags(bufev->input,
bufev_private->read_watermarks_cb,
EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER);
//设置(修改)高水位时,evbuffer的数据量已经超过了水位值
//可能是把之前的高水位调高或者调低
//挂起操作和取消挂起操作都是幂等的(即多次挂起的作用等同于挂起一次)
if (evbuffer_get_length(bufev->input) > highmark)
bufferevent_wm_suspend_read(bufev);
else if (evbuffer_get_length(bufev->input) < highmark)//调低了
bufferevent_wm_unsuspend_read(bufev);
} else {
//高水位值等于0,那么就要取消挂起 读事件
//取消挂起操作是幂等的
/* There is now no high-water mark for read. */
if (bufev_private->read_watermarks_cb)
evbuffer_cb_clear_flags(bufev->input,
bufev_private->read_watermarks_cb,
EVBUFFER_CB_ENABLED);
bufferevent_wm_unsuspend_read(bufev);
}
}
BEV_UNLOCK(bufev);
}

 现在假设用户移除了一些evbuffer的数据,进而触发了evbuffer的回调函数,当然也就调用了函数bufferevent_inbuf_wm_cb

static void
bufferevent_inbuf_wm_cb(struct evbuffer *buf,
const struct evbuffer_cb_info *cbinfo,
void *arg)
{
struct bufferevent *bufev = arg;
size_t size;
size = evbuffer_get_length(buf);
if (size >= bufev->wm_read.high)
bufferevent_wm_suspend_read(bufev);
else
bufferevent_wm_unsuspend_read(bufev);
}
//bufferevent-internal.h文件
#define bufferevent_wm_unsuspend_read(b) \
bufferevent_unsuspend_read((b), BEV_SUSPEND_WM)
//bufferevent.c文件
void
bufferevent_unsuspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
BEV_LOCK(bufev);
bufev_private->read_suspended &= ~what;
if (!bufev_private->read_suspended && (bufev->enabled & EV_READ))
bufev->be_ops->enable(bufev, EV_READ);//重新把event插入到event_base中
BEV_UNLOCK(bufev);
}

bufferevent_setcb:

  当socket有数据可读时,Libevent就会监听到,然后调用bufferevent_readcb函数处理。该函数会调用evbuffer_read函数,把数据从socket fd中读取到evbuffer中。然后再调用用户在bufferevent_setcb函数中设置的读事件回调函数。所以,当用户的读事件回调函数被调用时,数据已经在evbuffer中了;

对于ET的fd recv----->对用户的读事件回调函数的触发是边缘触发的。这也就要求,在回调函数中,用户应该尽可能地把evbuffer的所有数据都读出来,如果想等到下一次回调时再读,那么需要等到下一次socketfd接收到数据才会触发用户的回调函数。如果之后socket fd一直收不到任何数据,那么即使evbuffer还有数据,用户的回调函数也不会被调用了。

 写事件

  对于可写就是fd的写缓冲区(这个缓冲区在内核)还没满,可以往里面放数据。这就有一个问题------>如果写缓冲区没有满,那么就一直是可写状态。如果一个event监听了可写事件,那么这个event就会一直被触发,因为一般情况下,如果不是发大量的数据这个写缓冲区是不会满的。

所以一般 写事件都是 先send 数据, 发送失败后,缓冲区写满了,还有数据没有发送,就将fd insert epoll wait 可写!!Libevent的做法是:当我们确实要写入数据时,才监听可写事件。也就是说我们调用bufferevent_write写入数据时,Libevent才会把监听可写事件的那个event注册到event_base中。当Libevent把数据都写入到fd的缓冲区后,Libevent又会把这个event从event_base中删除。比较麻烦,感觉此时这样设计有点不好。   

ps:调用connect时,可能还没连接上就返回了。对于非阻塞socket fd,一般是通过判断这个socket是否可写,从而得知这个socket是否已经连接上服务器。如果可写,那么它就已经成功连接上服务器了!!

log&& buffevent&&内存池 2_数据log&& buffevent&&内存池 2_数据_02

static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
struct bufferevent *bufev = arg;
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
int res = 0;
short what = BEV_EVENT_WRITING;
int connected = 0;
ev_ssize_t atmost = -1;
_bufferevent_incref_and_lock(bufev);
if (event == EV_TIMEOUT) {
/* Note that we only check for event==EV_TIMEOUT. If
* event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
* timeout, since a read has occurred */
what |= BEV_EVENT_TIMEOUT;
goto error;
}
...//判断这个socket是否已经连接上服务器了
//用户可能设置了限速,如果没有限速,那么atmost将返回16384(16K)
atmost = _bufferevent_get_write_max(bufev_p);
//一些原因导致写被挂起来了
if (bufev_p->write_suspended)
goto done;
//如果evbuffer有数据可以写到sockfd中
if (evbuffer_get_length(bufev->output)) {
//解冻链表头
evbuffer_unfreeze(bufev->output, 1);
//将output这个evbuffer的数据写到socket fd 的缓冲区中
//会把已经写到socket fd缓冲区的数据,从evbuffer中删除
res = evbuffer_write_atmost(bufev->output, fd, atmost);
evbuffer_freeze(bufev->output, 1);
if (res == -1) {
int err = evutil_socket_geterror(fd);
if (EVUTIL_ERR_RW_RETRIABLE(err))//可以恢复的错误。一般是EINTR或者EAGAIN
goto reschedule;
what |= BEV_EVENT_ERROR;
} else if (res == 0) {//该socket已经断开连接了
what |= BEV_EVENT_EOF;
}
if (res <= 0)
goto error;
}
//如果把写缓冲区的数据都写完成了。为了防止event_base不断地触发可写
//事件,此时要把这个监听可写的event删除。
//前面的atmost限制了一次最大的可写数据。如果还没写所有的数据
//那么就不能delete这个event,而是要继续监听可写事情,知道把所有的
//数据都写到socket fd中。
if (evbuffer_get_length(bufev->output) == 0) {
event_del(&bufev->ev_write);
}
//如果evbuffer里面的数据量已经写得七七八八了,小于设置的低水位值,那么
//就会调用用户设置的写事件回调函数
if ((res || !connected) &&
evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
_bufferevent_run_writecb(bufev);
}
goto done;
reschedule:
if (evbuffer_get_length(bufev->output) == 0) {
event_del(&bufev->ev_write);
}
goto done;
error:
bufferevent_disable(bufev, EV_WRITE);//有错误。把这个写event删除
_bufferevent_run_eventcb(bufev, what);
done:
_bufferevent_decref_and_unlock(bufev);
}

View Code

  调用evbuffer_write_atmost函数把数据从evbuffer中send 到fd 中;还要判断evbuffer的数据是否已经全部写到socket 的缓冲区了。如果没有写完, 那就继续监听fd 是否可写了!!!用户一般是通过bufferevent_write函数把数据写入到evbuffer,写入evbuffer后,接着就会被写入socket

int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
{
if (evbuffer_add(bufev->output, data, size) == -1)
return (-1);
return 0;
}
//buffer.c文件
int
evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen)
{
...
out:
evbuffer_invoke_callbacks(buf);//调用回调函数
result = 0;
done:
return result;
}

前面的bufferevent_socket_new函数吗?该函数里面会有

evbuffer_add_cb(bufev->output,bufferevent_socket_outbuf_cb, bufev);

 当bufferevent的写缓冲区output的数据发生变化时,函数bufferevent_socket_outbuf_cb就会被调用;

bufferevent_socket_connect(struct bufferevent *bev,
struct sockaddr *sa, int socklen)
{

r = evutil_socket_connect(&fd, sa, socklen);//非阻塞connect

//为bufferevent里面的两个event设置监听的fd
//后面会调用bufferevent_enable
bufferevent_setfd(bev, fd);
if (r == 0) {//暂时还没连接上,因为fd是非阻塞的
//此时需要监听可写事件,当可写了,并且没有错误的话,就成功连接上了
if (! be_socket_enable(bev, EV_WRITE)) {
bufev_p->connecting = 1;//标志这个sockfd正在连接
result = 0;
goto done;
}
} else if (r == 1) {//已经连接上了
/* The connect succeeded already. How very BSD of it. */
result = 0;
bufev_p->connecting = 1;
event_active(&bev->ev_write, EV_WRITE, 1);//手动激活这个event
} else {// connection refused
/* The connect failed already. How very BSD of it. */
bufev_p->connection_refused = 1;
bufev_p->connecting = 1;
result = 0;
event_active(&bev->ev_write, EV_WRITE, 1);//手动激活这个event
}
goto done;
freesock:
_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);//出现错误
if (ownfd)
evutil_closesocket(fd);
done:
_bufferevent_decref_and_unlock(bev);
return result;
}

  即使connect的时候被拒绝,或者已经连接上了,都会手动激活这个event。一个event即使没有加入event_base,也是可以手动激活的!! 这个激活是指放入 event active 队列; ev_base 执行dispatch的时候会遍历event active 队列,这些队列表示 有可读 可写 定时器事件需要处理!!!   已经连接上了为啥 要去激活呢?? 还有连接失败了 直接处理错误不好吗!!这个和自己预想的逻辑有点区别,不过这个思想相当与统一了模型, 事件唤醒先不处理,都post到active  queue,后续最后统一处理,或者根据active queue 优先级 依次处理!!!

  对于libevent的应用可以看起le-proxy 写的比较经典!!!附上代码

log&& buffevent&&内存池 2_数据log&& buffevent&&内存池 2_数据_02

/*
This example code shows how to write an (optionally encrypting) SSL proxy
with Libevent's bufferevent layer.

XXX It's a little ugly and should probably be cleaned up.
*/

// Get rid of OSX 10.7 and greater deprecation warnings.
#if defined(__APPLE__) && defined(__clang__)
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#endif

#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <sys/socket.h>
#include <netinet/in.h>
#endif

#include <event2/bufferevent_ssl.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/listener.h>
#include <event2/util.h>

#include <openssl/ssl.h>
#include <openssl/err.h>
#include <openssl/rand.h>

static struct event_base *base;
static struct sockaddr_storage listen_on_addr;
static struct sockaddr_storage connect_to_addr;
static int connect_to_addrlen;
static int use_wrapper = 1;

static SSL_CTX *ssl_ctx = NULL;

#define MAX_OUTPUT (512*1024)

static void drained_writecb(struct bufferevent *bev, void *ctx);
static void eventcb(struct bufferevent *bev, short what, void *ctx);

static void
readcb(struct bufferevent *bev, void *ctx)
{
struct bufferevent *partner = ctx;
struct evbuffer *src, *dst;
size_t len;
src = bufferevent_get_input(bev);
len = evbuffer_get_length(src);
if (!partner) {
evbuffer_drain(src, len);
return;
}
dst = bufferevent_get_output(partner);
evbuffer_add_buffer(dst, src);

if (evbuffer_get_length(dst) >= MAX_OUTPUT) {
/* We're giving the other side data faster than it can
* pass it on. Stop reading here until we have drained the
* other side to MAX_OUTPUT/2 bytes. */
bufferevent_setcb(partner, readcb, drained_writecb,
eventcb, bev);
bufferevent_setwatermark(partner, EV_WRITE, MAX_OUTPUT/2,
MAX_OUTPUT);
bufferevent_disable(bev, EV_READ);
}
}

static void
drained_writecb(struct bufferevent *bev, void *ctx)
{
struct bufferevent *partner = ctx;

/* We were choking the other side until we drained our outbuf a bit.
* Now it seems drained. */
bufferevent_setcb(bev, readcb, NULL, eventcb, partner);
bufferevent_setwatermark(bev, EV_WRITE, 0, 0);
if (partner)
bufferevent_enable(partner, EV_READ);
}

static void
close_on_finished_writecb(struct bufferevent *bev, void *ctx)
{
struct evbuffer *b = bufferevent_get_output(bev);

if (evbuffer_get_length(b) == 0) {
bufferevent_free(bev);
}
}

static void
eventcb(struct bufferevent *bev, short what, void *ctx)
{
struct bufferevent *partner = ctx;

if (what & (BEV_EVENT_EOF|BEV_EVENT_ERROR)) {
if (what & BEV_EVENT_ERROR) {
unsigned long err;
while ((err = (bufferevent_get_openssl_error(bev)))) {
const char *msg = (const char*)
ERR_reason_error_string(err);
const char *lib = (const char*)
ERR_lib_error_string(err);
const char *func = (const char*)
ERR_func_error_string(err);
fprintf(stderr,
"%s in %s %s\n", msg, lib, func);
}
if (errno)
perror("connection error");
}

if (partner) {
/* Flush all pending data */
readcb(bev, ctx);

if (evbuffer_get_length(
bufferevent_get_output(partner))) {
/* We still have to flush data from the other
* side, but when that's done, close the other
* side. */
bufferevent_setcb(partner,
NULL, close_on_finished_writecb,
eventcb, NULL);
bufferevent_disable(partner, EV_READ);
} else {
/* We have nothing left to say to the other
* side; close it. */
bufferevent_free(partner);
}
}
bufferevent_free(bev);
}
}

static void
syntax(void)
{
fputs("Syntax:\n", stderr);
fputs(" le-proxy [-s] [-W] <listen-on-addr> <connect-to-addr>\n", stderr);
fputs("Example:\n", stderr);
fputs(" le-proxy 127.0.0.1:8888 1.2.3.4:80\n", stderr);

exit(1);
}

static void
accept_cb(struct evconnlistener *listener, evutil_socket_t fd,
struct sockaddr *a, int slen, void *p)
{
struct bufferevent *b_out, *b_in;
/* Create two linked bufferevent objects: one to connect, one for the
* new connection */
b_in = bufferevent_socket_new(base, fd,
BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS);

if (!ssl_ctx || use_wrapper)
b_out = bufferevent_socket_new(base, -1,
BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS);
else {
SSL *ssl = SSL_new(ssl_ctx);
b_out = bufferevent_openssl_socket_new(base, -1, ssl,
BUFFEREVENT_SSL_CONNECTING,
BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS);
}

assert(b_in && b_out);

if (bufferevent_socket_connect(b_out,
(struct sockaddr*)&connect_to_addr, connect_to_addrlen)<0) {
perror("bufferevent_socket_connect");
bufferevent_free(b_out);
bufferevent_free(b_in);
return;
}

if (ssl_ctx && use_wrapper) {
struct bufferevent *b_ssl;
SSL *ssl = SSL_new(ssl_ctx);
b_ssl = bufferevent_openssl_filter_new(base,
b_out, ssl, BUFFEREVENT_SSL_CONNECTING,
BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS);
if (!b_ssl) {
perror("Bufferevent_openssl_new");
bufferevent_free(b_out);
bufferevent_free(b_in);
}
b_out = b_ssl;
}

bufferevent_setcb(b_in, readcb, NULL, eventcb, b_out);
bufferevent_setcb(b_out, readcb, NULL, eventcb, b_in);

bufferevent_enable(b_in, EV_READ|EV_WRITE);
bufferevent_enable(b_out, EV_READ|EV_WRITE);
}

int
main(int argc, char **argv)
{
int i;
int socklen;

int use_ssl = 0;
struct evconnlistener *listener;

if (argc < 3)
syntax();

for (i=1; i < argc; ++i) {
if (!strcmp(argv[i], "-s")) {
use_ssl = 1;
} else if (!strcmp(argv[i], "-W")) {
use_wrapper = 0;
} else if (argv[i][0] == '-') {
syntax();
} else
break;
}

if (i+2 != argc)
syntax();

memset(&listen_on_addr, 0, sizeof(listen_on_addr));
socklen = sizeof(listen_on_addr);
if (evutil_parse_sockaddr_port(argv[i],
(struct sockaddr*)&listen_on_addr, &socklen)<0) {
int p = atoi(argv[i]);
struct sockaddr_in *sin = (struct sockaddr_in*)&listen_on_addr;
if (p < 1 || p > 65535)
syntax();
sin->sin_port = htons(p);
sin->sin_addr.s_addr = htonl(0x7f000001);
sin->sin_family = AF_INET;
socklen = sizeof(struct sockaddr_in);
}

memset(&connect_to_addr, 0, sizeof(connect_to_addr));
connect_to_addrlen = sizeof(connect_to_addr);
if (evutil_parse_sockaddr_port(argv[i+1],
(struct sockaddr*)&connect_to_addr, &connect_to_addrlen)<0)
syntax();

base = event_base_new();
if (!base) {
perror("event_base_new()");
return 1;
}

if (use_ssl) {
int r;
SSL_library_init();
ERR_load_crypto_strings();
SSL_load_error_strings();
OpenSSL_add_all_algorithms();
r = RAND_poll();
if (r == 0) {
fprintf(stderr, "RAND_poll() failed.\n");
return 1;
}
ssl_ctx = SSL_CTX_new(SSLv23_method());
}

listener = evconnlistener_new_bind(base, accept_cb, NULL,
LEV_OPT_CLOSE_ON_FREE|LEV_OPT_CLOSE_ON_EXEC|LEV_OPT_REUSEABLE,
-1, (struct sockaddr*)&listen_on_addr, socklen);

if (! listener) {
fprintf(stderr, "Couldn't open listener.\n");
event_base_free(base);
return 1;
}
event_base_dispatch(base);

evconnlistener_free(listener);
event_base_free(base);

return 0;
}

View Code

 注意看:

static void
readcb(struct bufferevent *bev, void *ctx)
{
struct bufferevent *partner = ctx;
struct evbuffer *src, *dst;
size_t len;
src = bufferevent_get_input(bev);
len = evbuffer_get_length(src);
if (!partner) {
evbuffer_drain(src, len);
return;
}
dst = bufferevent_get_output(partner);
evbuffer_add_buffer(dst, src);

if (evbuffer_get_length(dst) >= MAX_OUTPUT) {
/* We're giving the other side data faster than it can
* pass it on. Stop reading here until we have drained the
* other side to MAX_OUTPUT/2 bytes. */
bufferevent_setcb(partner, readcb, drained_writecb,
eventcb, bev);
bufferevent_setwatermark(partner, EV_WRITE, MAX_OUTPUT/2,
MAX_OUTPUT);
bufferevent_disable(bev, EV_READ);
}
}

  也就是:recv 的数据 远大于 send的数据时候,也就是生产者速率大于消费者速率, 当生产者累计的数据 超过一定数量的时候, 此时不会继续recv 数据;直到 生产者数据被消耗完时,才会重新打开recv 数据。注意 evbuffer 设置的回调函数  会根据不同标志设置不同回调以及何时enable -disable  fd read!!!

 

http代理服务器(3-4-7层代理)-网络事件库公共组件、内核kernel驱动 摄像头驱动 tcpip网络协议栈、netfilter、bridge 好像看过!!!! 但行好事 莫问前程 --身高体重180的胖子

举报

相关推荐

0 条评论