0
点赞
收藏
分享

微信扫一扫

对pipe downstream的思考&分析

花明 2022-06-24 阅读 16

     回到ngx_http_upstream_send_response,如果是buffering,就会进入后面的处理过程,准备一个ngx_event_pipe_t结构的数据,这个结构可以通过upstream的u->pipe进行索引找到。首先设置p->output_filter输出过滤函数为ngx_http_output_filter,用来进行输出过滤并发送数据;将p->upstream 设置为跟后端 u->peer.connection; p->downstream设置为跟客户端的连接;

  之后便是拷贝了u->buffer到p->preread_bufs,这个u->buf是读取上游返回的数据的缓冲区,也就是proxy;这里面有http头部,也可能有body部分;然后便是设置upstream的读回调函数read_event_handler为read_event_handler,跟客户端的连接的写回调函数write_event_handler为ngx_http_upstream_process_downstream;这2个回调函数一个处理跟upstream的读取数据,一个处理跟客户端连接的发送数据,这是重点。设置完后就调用ngx_http_upstream_process_upstream尝试读取upstream的数据。

p = u->pipe;
//设置filter,可以看到就是http的输出filter
p->output_filter = ngx_http_upstream_output_filter;
p->output_ctx = r;
p->tag = u->output.tag;
p->bufs = u->conf->bufs;//设置bufs,它就是upstream中设置的bufs.u == &flcf->upstream;
p->busy_size = u->conf->busy_buffers_size;
p->upstream = u->peer.connection;//赋值跟后端upstream的连接。
p->downstream = c;//赋值跟客户端的连接。
p->pool = r->pool;
p->log = c->log;
p->limit_rate = u->conf->limit_rate;
p->start_sec = ngx_time();

p->cacheable = u->cacheable || u->store;
-------------------------------------------------------
//下面申请一个缓冲链接节点,来存储刚才我们再读取后端的包,为了得到HTTP headers的时候不小心多读取到的数据。
//其实只要FCGI等后端发给后端的包中,有一个包的前半部分是header,后一部分是body,就会有预读数据。

p->preread_bufs = ngx_alloc_chain_link(r->pool);
if (p->preread_bufs == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
p->preread_bufs->buf = &u->buffer;
p->preread_bufs->next = NULL;
u->buffer.recycled = 1;
p->preread_size = u->buffer.last - u->buffer.pos;
------------------------------------------
ngx_http_upstream_process_upstream

ngx_http_upstream_process_upstream 用来读取后端cgi等数据,然后调用pipe转发到客户端;下面来看看ngx_event_pipe。

 

/*在有buffering的时候,使用event_pipe进行数据的转发,调用 ngx_event_pipe_*
函数读取数据,或者发送数据给客户端。
ngx_event_pipe将upstream响应发送回客户端。do_write代表是否要往客户端发送,写数据。
如果设置了,那么会先发给客户端,再读upstream数据,当然,如果读取了数据,也会调用这里的。
*/
ngx_int_t
ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
{
ngx_int_t rc;
ngx_uint_t flags;
ngx_event_t *rev, *wev;
//不断的用ngx_event_pipe_read_upstream读取客户端数据,然后调用ngx_event_pipe_write_to_downstream
for ( ;; ) {
if (do_write) {// do_write为1,向下游发送响应包体,并检查其返回值
p->log->action = "sending to client";

rc = ngx_event_pipe_write_to_downstream(p);
        // 返回NGX_OK时继续读取上游的响应事件,返回其他值需要终止ngx_event_pipe函数
if (rc == NGX_ABORT) {
return NGX_ABORT;
}

if (rc == NGX_BUSY) {
return NGX_OK;
}
}

p->read = 0;
p->upstream_blocked = 0;

p->log->action = "reading upstream";
//从upstream读取数据到chain的链表里面,然后整块整块的调用input_filter进行协议的解析,
//并将HTTP结果存放在p->in,p->last_in的链表里面。// 从上游读取响应数据
if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
return NGX_ABORT;
}
//upstream_blocked是在ngx_event_pipe_read_upstream里面设置的变量,代表是否有数据已经从upstream读取了

//当没有读取到响应数据,并且也不需要暂停读取响应的读取时,跳出当前循环,即不对do_write进行设置
if (!p->read && !p->upstream_blocked) {
break;
}
//还要转发到后端。 因为当读到的响应数据,或者需要暂停读取数据,先给客户端发送响应以释放缓冲区时,设置do_write进行响应的发送
do_write = 1;
}
if (p->upstream->fd != (ngx_socket_t) -1) {
rev = p->upstream->read;
flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;
          // 将上游读事件添加到epoll中
if (ngx_handle_read_event(rev, flags) != NGX_OK) {
return NGX_ABORT;
}
if (!rev->delayed) { // 同时设置读事件的超时定时器
if (rev->active && !rev->ready) {
ngx_add_timer(rev, p->read_timeout);

} else if (rev->timer_set) {
ngx_del_timer(rev);
}
}
}
// 将下游的写事件添加到epoll中,并且设置写事件的定时器
if (p->downstream->fd != (ngx_socket_t) -1
&& p->downstream->data == p->output_ctx)
{
wev = p->downstream->write;
if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
return NGX_ABORT;
}

if (!wev->delayed) {
if (wev->active && !wev->ready) {
ngx_add_timer(wev, p->send_timeout);

} else if (wev->timer_set) {
ngx_del_timer(wev);
}
}
}

return NGX_OK;
}

 

 

 

整体流程是:

ngx_process_cycle循环调用ngx_process_events_and_timers,后者调用ngx_epoll_process_events处理读写事件;
1.c->read->handler = ngx_http_upstream_handler();SOCK连接最基础的读写回调handler,
2.u->read_event_handler = ngx_http_upstream_process_upstream();
3.ngx_event_pipe();
4.ngx_event_pipe_read_upstream() 进入主要读取处理函数。

ngx_event_pipe_read_upstream函数完成下面几个功能:


0.从preread_bufs,free_raw_bufs或者ngx_create_temp_buf寻找一块空闲的或部分空闲的内存;
1.调用p->upstream->recv_chain==ngx_readv_chain,用writev的方式读取FCGI的数据,填充chain。
2.对于整块buf都满了的chain节点调用input_filter(ngx_http_fastcgi_input_filter)进行upstream协议解析,比如FCGI协议,解析后的结果放入p->in里面;
3.对于没有填充满的buffer节点,放入free_raw_bufs以待下次进入时从后面进行追加。
4.当然了,如果对端发送完数据FIN了,那就直接调用input_filter处理free_raw_bufs这块数据


简单来说就是:设置fd 回调,事件被唤醒,循环进行数据读取、解析、保存、转发;然后根据业务场景需求 处理异常逻辑

ngx_event_pipe_t结构,这个结构维护着上下游间转发的响应包体,用于解决内存复制的问题

 

struct ngx_event_pipe_s {
ngx_connection_t *upstream; // 与上游服务器间的连接
ngx_connection_t *downstream; // 与下游客户端间的连接

ngx_chain_t *free_raw_bufs; // 用于接收上游服务器响应的缓冲区链表,新收到的响应向链表头部插入
ngx_chain_t *in; // 接收到上游响应的缓冲区,ngx_event_pipe_copy_input_filter将buffer中的数据设置到in中
ngx_chain_t **last_in; // 指向刚刚接收到的缓冲区

ngx_chain_t *out; // 将要发给客户端的缓冲区链表,
ngx_chain_t *free; // 等待释放的缓冲区
ngx_chain_t *busy; // 表示上次发送响应时未发完的缓冲区链表,下一次发送时会合并到out链表中

/*
* the input filter i.e. that moves HTTP/1.1 chunks
* from the raw bufs to an incoming chain
*/

ngx_event_pipe_input_filter_pt input_filter; // 处理接收到的来自上游服务器的缓冲区,接收响应的处理方法
void *input_ctx; // input_filter函数的参数,通常设置为ngx_http_request_t

ngx_event_pipe_output_filter_pt output_filter; // 向下游发送响应的方法,默认为ngx_http_output_filter
void *output_ctx; // output_filter函数的参数,通常设置为ngx_http_request_t

unsigned read:1; // 为1表示当前已经读到来自上游的响应
unsigned cacheable:1; // 为1时表示启用文件缓存
unsigned single_buf:1; // 为1时表示接收上游的响应时一次只能接收一个ngx_buf_t缓冲区
unsigned free_bufs:1; // 为1时表示当不再接收上游的响应包体时,尽可能快的释放缓冲区
unsigned upstream_done:1; // input_filter中用到的标识位,表示Nginx与上游间的交互已经结束
unsigned upstream_error:1; // 与上游连接出现错误时,将该标识为置为1,比如超时,解析错误等
unsigned upstream_eof:1; // 与上游的连接已经关闭时,该标志位置为1
unsigned upstream_blocked:1; // 表示暂时阻塞读取上游响应的流程,先发送响应,再用释放的缓冲区接收响应
unsigned downstream_done:1; // 为1时表示与下游的交互已经结束
unsigned downstream_error:1; // 与下游连接出现错误时,设置为1
unsigned cyclic_temp_file:1; // 为1时会试图复用临时文件中曾用过的空间

ngx_int_t allocated; // 表示已经分配的缓冲区的数目,其受bufs.num成员的限制
ngx_bufs_t bufs; // 记录了接收上游响应的内存缓冲区的大小,bufs.size记录每个缓冲区大小,bufs.num记录缓冲区个数
ngx_buf_tag_t tag; // 用于设置、比较缓冲区链表中ngx_buf_t结构体的tag标志位

ssize_t busy_size;

off_t read_length; // 已经接收到上游响应包体长度
off_t length; // 表示临时文件的最大长度

off_t max_temp_file_size; // 表示临时文件的最大长度
ssize_t temp_file_write_size; // 表示一次写入文件时的最大长度

ngx_msec_t read_timeout; // 读取上游响应的超时时间
ngx_msec_t send_timeout; // 向下游发送响应的超时时间
ssize_t send_lowat; // 向下游发送响应时,TCP连接中设置的参数

ngx_pool_t *pool; // 用于分配内存缓冲区的连接池对象
ngx_log_t *log; // 用于记录日志的ngx_log_t对象

ngx_chain_t *preread_bufs; // 表示接收上游服务器响应头部的阶段,已经读到的响应包体
size_t preread_size; // 表示接收上游服务器响应头部的阶段,已经读到的响应包体长度
ngx_buf_t *buf_to_file; //

size_t limit_rate; // 发送速率的限制
time_t start_sec; // 连接的启动时间

ngx_temp_file_t *temp_file; // 存放上游响应的临时文件

/* STUB */ int num; // 已经使用的ngx_buf_t的数目
}

 

接收上游响应函数处理

 

ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
{
off_t limit;
ssize_t n, size;
ngx_int_t rc;
ngx_buf_t *b;
ngx_msec_t delay;
ngx_chain_t *chain, *cl, *ln;

if (p->upstream_eof || p->upstream_error || p->upstream_done) {
return NGX_OK;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe read upstream: %d", p->upstream->read->ready);

for ( ;; ) {
      // 检查上游连接是否结束,如果已经结束,不再接收新的响应,跳出循环
if (p->upstream_eof || p->upstream_error || p->upstream_done) {
break;
}
  // 如果preread_bufs为NULL代表读包头时没有读到包体信息或者已经处理完成,ready为0表示没有上游响应可以接收,跳出循环
if (p->preread_bufs == NULL && !p->upstream->read->ready) {
break;
}
    // preread_bufs存放着接收包头时可能读取到的包体信息,如果不为空,则先要优先处理这部分包体信息
if (p->preread_bufs) {
/* use the pre-read bufs if they exist 已经读取的数据 */
chain = p->preread_bufs;// 用chain保存待处理的缓冲区,重置preread_bufs,下次循环则不会再走到该逻辑
p->preread_bufs = NULL;
n = p->preread_size;if (n) {
p->read = 1; // 有待处理的包体信息,将read设置为1,表示接收到的包体待处理
}
} else {
if (p->limit_rate) {
if (p->upstream->read->delayed) {
break;
}

limit = (off_t) p->limit_rate * (ngx_time() - p->start_sec + 1)
- p->read_length;

if (limit <= 0) {
p->upstream->read->delayed = 1;
delay = (ngx_msec_t) (- limit * 1000 / p->limit_rate + 1);
ngx_add_timer(p->upstream->read, delay);
break;
}

} else {
limit = 0;
}
// free_raw_bufs用于表示一次ngx_event_pipe_read_upstream方法调用过程中接收到的上游响应
if (p->free_raw_bufs) {
/* use the free bufs if they exist */
chain = p->free_raw_bufs;
if (p->single_buf) {
p->free_raw_bufs = p->free_raw_bufs->next;
chain->next = NULL;
} else {
p->free_raw_bufs = NULL;
}// 判断当前已分配的缓冲区的数量是否超过了bufs.num,没有超过时可以继续分配
} else if (p->allocated < p->bufs.num) {
/* allocate a new buf if it's still allowed */
b = ngx_create_temp_buf(p->pool, p->bufs.size);
if (b == NULL) {
return NGX_ABORT;
}
p->allocated++;
chain = ngx_alloc_chain_link(p->pool);
if (chain == NULL) {
return NGX_ABORT;
}
chain->buf = b;
chain->next = NULL;
} else if (/*---- 缓冲区已经达到上限,如果写事件的ready为1时表示可以向下游发送响应,而delay为0代表并不是由于限速的原因导致写事件就
当ready为1,且delay为0时,可以向下游发送响应来释放缓冲区了 ----------- * if the bufs are not needed to be saved in a cache and
* a downstream is ready then write the bufs to a downstream
*/
}


n = p->upstream->recv_chain(p->upstream, chain, limit);

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe recv chain: %z", n);

if (p->free_raw_bufs) { // 将新接收到的缓冲区放置到free_raw_bufs链表的最后
chain->next = p->free_raw_bufs;
}
p->free_raw_bufs = chain;

if (n == NGX_ERROR) {
p->upstream_error = 1;
break;
}
if (n == NGX_AGAIN) {
if (p->single_buf) {
ngx_event_pipe_remove_shadow_links(chain->buf);
}
break;
}
p->read = 1;
if (n == 0) {//没有读到数据,肯定upstream发送了FIN包,那就读取完成了。
p->upstream_eof = 1;
break;
}
}
delay = p->limit_rate ? (ngx_msec_t) n * 1000 / p->limit_rate : 0;
p->read_length += n;
cl = chain;
p->free_raw_bufs = NULL;
/*
首先cl = chain;用cl指向刚刚读取的或预读的数据,n代表数据的大小,
于是循环的将cl指向的链表里面的数据(总大小为n)进行协议解析,我们知道,
这个协议解析会调用FCGI协议或者其他协议的回调的,那就是:input_filter,
对于FCGI是ngx_http_fastcgi_input_filter,其他可能为ngx_event_pipe_copy_input_filter。
FCGI这个回调是在ngx_http_fastcgi_handler函数里面初始化时设置的。
*/
while (cl && n > 0) {/*如果还有链表数据并且长度不为0,也就是这次的还没有处理完
那如果之前保留有一部分数据呢?
不会的,如果之前预读了数据,那么上面的大if语句else里面进不去,
就是此时的n肯定等于preread_bufs的长度preread_size。
//如果之前没有预读数据,但free_raw_bufs不为空,那也没关系,
free_raw_bufs里面的数据肯定已经在下面几行处理过了。
*/
ngx_event_pipe_remove_shadow_links(cl->buf);

size = cl->buf->end - cl->buf->last;

if (n >= size) {//缓冲区已满,需要调用input_filter函数处理
cl->buf->last = cl->buf->end;
/* STUB */ cl->buf->num = p->num++;
// 当前缓冲区已满,需要处理,
//下面的input_filter方法是ngx_event_pipe_copy_input_filter函数,
//其主要在in链表中增加这个缓冲区

if (p->input_filter(p, cl->buf) == NGX_ERROR) {

return NGX_ABORT;
} // 更新待处理的包体的长度,释放已经处理的缓冲区
n -= size;
ln = cl;
cl = cl->next;
//继续处理下一块,并释放这个节点。
ngx_free_chain(p->pool, ln);

} else {//如果这个节点的空闲内存数目大于剩下要处理的,就将剩下的存放在这里。
cl->buf->last += n;
n = 0;
}
}

if (cl) { //将上面没有填满一块内存块的数据链接放到free_raw_bufs的前面。
//注意上面修改了cl->buf->last,后续的读入数据不会覆盖这些数据的
for (ln = cl; ln->next; ln = ln->next) {
/* void foreach last */
}
// 走到这里时cl的链表中一定有缓冲区没有用满(最后一个?),此时cl不为NULL;或者cl的所有缓冲区都已经被处理回收了,此时cl为NULL
ln->next = p->free_raw_bufs;
p->free_raw_bufs = cl;
}

if (delay > 0) {
p->upstream->read->delayed = 1;
ngx_add_timer(p->upstream->read, delay);
break;
}
}


/*upstream数据发送完毕了,那么upstream_eof会被设置为1,在函数最后会进行扫尾工作,
把半满的free_raw_bufs数据进行解析。这里我们可以看到buffering的含义就在这里:
nginx会尽量读取upstream的数据,直到填满一块buffer,由fastcgi_buffers等参数决定的大小,
才会发送给客户端。千万别误解为读取完所有的数据才发送,而是读取了一块buffe*/
if (p->free_raw_bufs && p->length != -1) {
cl = p->free_raw_bufs;

if (cl->buf->last - cl->buf->pos >= p->length) {

p->free_raw_bufs = cl->next;

/* STUB */ cl->buf->num = p->num++;

if (p->input_filter(p, cl->buf) == NGX_ERROR) {
return NGX_ABORT;
}

ngx_free_chain(p->pool, cl);
}
}

if (p->length == 0) {
p->upstream_done = 1;
p->read = 1;
}
// upstream_eof为1时表示上游服务器关闭了连接,upstream_error表示处理过程中出现了错误,而free_raw_bufs不为空代表还有需要处理的包体信息
if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {

/* STUB */ p->free_raw_bufs->buf->num = p->num++;
// 调用input_filter处理剩余的包体信息
if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
return NGX_ABORT;
}
p->free_raw_bufs = p->free_raw_bufs->next;
// free_bufs为1时代表需要尽快释放缓冲区中用到内存,此时应该调用ngx_pfree尽快释放shadow域为空的缓冲区
if (p->free_bufs && p->buf_to_file == NULL) {
for (cl = p->free_raw_bufs; cl; cl = cl->next) {
if (cl->buf->shadow == NULL) {
ngx_pfree(p->pool, cl->buf->start);
}
}
}
}

if (p->cacheable && (p->in || p->buf_to_file)) {

ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write chain");

rc = ngx_event_pipe_write_chain_to_temp_file(p);

if (rc != NGX_OK) {
return rc;
}
}

return NGX_OK;
}

接收响应的处理完后,立即就是发送响应处理流程 :

 

ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
{
downstream = p->downstream;
flushed = 0;

for ( ;; ) {
if (p->downstream_error) { //往请求端发送出错
//busy, out, in 三个缓冲chain释放 同时释放shadow缓冲并将空闲的buffer加入到pipe中
return ngx_event_pipe_drain_chains(p);
}
// 检查与上游的连接是否结束
if (p->upstream_eof || p->upstream_error || p->upstream_done) {

/* pass the p->out and p->in chains to the output filter */

for (cl = p->busy; cl; cl = cl->next) {
cl->buf->recycled = 0;
}
// 发送out链表中的缓冲区给客户端
if (p->out) {
-------------------
for (cl = p->out; cl; cl = cl->next) {
cl->buf->recycled = 0;
}
rc = p->output_filter(p->output_ctx, p->out);
------------------------------
}

if (p->writing) {//还有往请求端写入的缓冲链
break;
}

if (p->in) { // 发送in链表中的缓冲区给客户端

for (cl = p->in; cl; cl = cl->next) {
cl->buf->recycled = 0;
}
rc = p->output_filter(p->output_ctx, p->in);
--------------------------------------------
}
/* TODO: free unused bufs */
// 标识需要向下游发送的响应已经完成
p->downstream_done = 1;
break;
}
-------------------------
/* bsize is the size of the busy recycled bufs */
prev = NULL;
bsize = 0; // 计算busy缓冲区中待发送的响应长度
for (cl = p->busy; cl; cl = cl->next) {

if (cl->buf->recycled) {
if (prev == cl->buf->start) {
continue;
}
bsize += cl->buf->end - cl->buf->start;
prev = cl->buf->start;
}
}


out = NULL;
// 检查是否超过了busy_size的配置,当超过配置值时跳转至flush处检查和发送out缓冲区
if (bsize >= (size_t) p->busy_size) {
flush = 1;
goto flush;
}

--------------------------

for ( ;; ) {
if (p->out) { // 先检查out链表是否为NULL,不为空则先发送out链表的缓冲区
cl = p->out;

if (cl->buf->recycled) {
ngx_log_error(NGX_LOG_ALERT, p->log, 0,
"recycled buffer in pipe out chain");
}

p->out = p->out->next;

} else if (!p->cacheable && !p->writing && p->in) {
cl = p->in;// 当out链表中的数据被处理完成后,开始处理in链表中的数据

if (cl->buf->recycled && prev_last_shadow) {
if (bsize + cl->buf->end - cl->buf->start > p->busy_size) {
flush = 1;
break;
}
bsize += cl->buf->end - cl->buf->start;
}
prev_last_shadow = cl->buf->last_shadow;
p->in = p->in->next;
} else {
break;
}

cl->next = NULL;

if (out) {
*ll = cl;
} else {
out = cl;
}
ll = &cl->next;
}

flush:

--------------------------------------
// 发送响应给客户端
rc = p->output_filter(p->output_ctx, out);
// 更新free、busy和out缓冲区
ngx_chain_update_chains(p->pool, &p->free, &p->busy, &out, p->tag);

if (rc == NGX_ERROR) {
p->downstream_error = 1;
return ngx_event_pipe_drain_chains(p);
}

for (cl = p->free; cl; cl = cl->next) {
// 遍历free链表中的缓冲区,释放缓冲区中shadow域
----------------------------

/* TODO: free buf if p->free_bufs && upstream done */
/* add the free shadow raw buf to p->free_raw_bufs */
if (cl->buf->last_shadow) {
if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
return NGX_ABORT;
}
cl->buf->last_shadow = 0;
}
cl->buf->shadow = NULL;
}
}

return NGX_OK;
}

看完pipe down 发现其主要是解决读取数据包 数据如何缓存拷贝问题。主要是要分清楚 上游读 下游写 交错经行时的数据报文拷贝

 

http代理服务器(3-4-7层代理)-网络事件库公共组件、内核kernel驱动 摄像头驱动 tcpip网络协议栈、netfilter、bridge 好像看过!!!! 但行好事 莫问前程 --身高体重180的胖子

举报

相关推荐

0 条评论