0
点赞
收藏
分享

微信扫一扫

nginx&http 第四章 ngx http代理 upstream input filter

upstream直接转发响应时的具体处理流程,主要是上面subrequest_memory为1的场景,此时该请求属于一个子请求。
分析默认的input_filter的处理方法,如果读取包头时同时读到了包体信息,会调用input_filter方法处理:

/*
将u->buffer.last - u->buffer.pos之间的数据放到u->out_bufs发送缓冲去链表里面。这样可写的时候就会发送给客户端。
ngx_http_upstream_process_non_buffered_request函数会读取out_bufs里面的数据,然后调用输出过滤链接进行发送的。
*/ //buffering方式,为ngx_http_fastcgi_input_filter 非buffering方式为ngx_http_upstream_non_buffered_filter
static ngx_int_t
ngx_http_upstream_non_buffered_filter(void *data, ssize_t bytes)
{
ngx_http_request_t *r = data;

ngx_buf_t *b;
ngx_chain_t *cl, **ll;
ngx_http_upstream_t *u;

u = r->upstream;

for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) { //遍历u->out_bufs
ll = &cl->next;
}

cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);//分配一个空闲的chain buff
if (cl == NULL) {
return NGX_ERROR;
}

*ll = cl; //将新申请的缓存链接进来。

cl->buf->flush = 1;
cl->buf->memory = 1;

b = &u->buffer; //去除将要发送的这个数据,应该是客户端的返回数据体。将其放入

cl->buf->pos = b->last;
b->last += bytes;
cl->buf->last = b->last;
cl->buf->tag = u->output.tag;

if (u->length == -1) {//u->length表示将要发送的数据大小如果为-1,则说明后端协议并没有指定需要发送的大小(例如chunk方式),此时我们只需要发送我们接收到的.
return NGX_OK;
}

u->length -= bytes;//更新将要发送的数据大小

return NGX_OK;
}

继续向下分析,process_header调用input_filter处理完包体后,最后调用的函数时ngx_http_upstream_process_body_in_memory,
该函数实际上会接收上游服务器的包体内容

static void ngx_http_upstream_process_body_in_memory(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
......
// 获取到上游的连接信息
c = u->peer.connection;
// 获取该连接的读事件,判断是否发生了读事件的超时,如果超时,则直接结束连接
rev = c->read;
if (rev->timedout) {
ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_GATEWAY_TIME_OUT);
return;
}
// buffer为存储上游响应包体的缓冲区
b = &u->buffer;

for ( ;; ) {
// 计算剩余空闲缓冲区的大小
size = b->end - b->last;
......
// 如果还有空闲的空间,调用recv方法继续读取响应
n = c->recv(c, b->last, size);
// 此处NGX_AGAIN代表需要等待下一次的读事件
if (n == NGX_AGAIN) {
break;
}
// 如果上游主动关闭连接,或者读取出现错误,则直接关闭连接
if (n == 0 || n == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, n);
return;
}
// 更新读到的响应包体的长度
u->state->response_length += n;
// 处理读到的包体内容
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}

if (!rev->ready) {
break;
}
}
// 如果包体长度没有设置,则可以直接结束请求了
if (u->length == 0) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
// 将读事件增加到Epoll中
if (ngx_handle_read_event(rev, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
// 将读事件同时添加到定时器中,超时时间为配置的read_timeout,避免长时间等待
if (rev->active) {
ngx_add_timer(rev, u->conf->read_timeout);

} else if (rev->timer_set) {
ngx_del_timer(rev);
}
}

上面流程很容易看出一个问题,那就是读取响应头的Buffer的空间可能不足,导致处理出现问题。使用时关键还在于Input_filter方法中对buffer的管理。
分析完不转发响应的过程后,继续看一下转发响应的两种实现方式,下游网速优先和上游网速优先的实现。上游网速优先的方式,实现较为复杂,先看一下下游网速优先的方式,即采用固定的内存大小,作为响应的缓冲区。代码上也删减不必要的逻辑。

不管上游网速优先还是下游网速优先,响应的转发都是通过ngx_http_upstream_send_response函数进行的

static void ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
// 发送设置到r->headers_out中的响应头部
rc = ngx_http_send_header(r);
......
// 如果客户端的请求携带了包体,且包体已经保存到了临时文件中,则清理临时文件,前面分析过了
if (r->request_body && r->request_body->temp_file) {
ngx_pool_run_cleanup_file(r->pool, r->request_body->temp_file->file.fd);
r->request_body->temp_file->file.fd = NGX_INVALID_FILE;
}
......
// buffering为1时走的是上游网速优先的流程,为0时走的是下游网速优先的流程
if (!u->buffering) {
......
return ;
}

/* TODO: preallocate event_pipe bufs, look "Content-Length" */

// pipe的内存在upstream启动时已经分配了,这里直接使用,对pipe进行初始化
p = u->pipe;
// 设置向下游发送响应的方法
p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter;
// 将pipe的output_ctx指向ngx_http_request_t结构,后续传入的参数都是pipe,通过pipe->output_ctx找到ngx_http_request_t
p->output_ctx = r;
// 设置转发响应时启用的每个缓冲区的tag标志位
p->tag = u->output.tag;
// bufs指定了内存缓冲区的限制
p->bufs = u->conf->bufs;
// 设置busy缓冲区中待发送的响应长度触发值
p->busy_size = u->conf->busy_buffers_size;
// upstream指向nginx与上游服务器的连接
p->upstream = u->peer.connection;
// downstream指向nginx与客户端之间的连接
p->downstream = c;
// 初始化用于分配内存缓冲区的内存池
p->pool = r->pool;
// 初始化用于记录日志的log成员
p->log = c->log;
// 初始化速率阀值
p->limit_rate = u->conf->limit_rate;
// 记录当前的时间
p->start_sec = ngx_time();
// 记录是否进行文件缓存
p->cacheable = u->cacheable || u->store;
// 申请临时文件结构
p->temp_file = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t));
if (p->temp_file == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
// 初始化临时文件的结构信息
p->temp_file->file.fd = NGX_INVALID_FILE;
p->temp_file->file.log = c->log;
p->temp_file->path = u->conf->temp_path;
p->temp_file->pool = r->pool;
......
// 设置临时存放上游响应的单个缓存文件的最大长度
p->max_temp_file_size = u->conf->max_temp_file_size;
// 设置一次写入临时文件时写入的最大长度
p->temp_file_write_size = u->conf->temp_file_write_size;
// 申请预读缓冲区链表,该链表的缓冲区不会分配内存来存放上游的响应内容,而用ngx_buf_t指向实际存放包体的内容
p->preread_bufs = ngx_alloc_chain_link(r->pool);
if (p->preread_bufs == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
// 初始化预读缓冲区的链表,(预读是在读取包头时,同时读到了包体的情况)
p->preread_bufs->buf = &u->buffer;
p->preread_bufs->next = NULL;
u->buffer.recycled = 1;
p->preread_size = u->buffer.last - u->buffer.pos;
.......
// 设置读取上游服务器响应的超时时间
p->read_timeout = u->conf->read_timeout;
// 设置发送到下游客户端的超时时间
p->send_timeout = clcf->send_timeout;
// 设置向客户端发送响应时TCP的send_lowat选项
p->send_lowat = clcf->send_lowat;
.......
// 设置处理上游读事件的回调
u->read_event_handler = ngx_http_upstream_process_upstream;
// 设置处理下游写事件的回调
r->write_event_handler = ngx_http_upstream_process_downstream;
// 处理上游发来的响应包体
ngx_http_upstream_process_upstream(r, u);
}

不管是读取上游的响应事件process_upstream,还是向客户端写数据的process_downstream,最终都是通过ngx_event_pipe实现缓存转发响应的

/*
这是在有buffering的情况下使用的函数。
ngx_http_upstream_send_response 调用这里发动一下数据读取。以后有数据可读的时候也会调用这里的读取后端数据。设置到了u->read_event_handler了。
*/
static void
ngx_http_upstream_process_upstream(ngx_http_request_t *r,
ngx_http_upstream_t *u)
//buffring模式通过ngx_http_upstream_process_upstream该函数处理,非buffring模式通过ngx_http_upstream_process_non_buffered_downstream处理
{ //注意走到这里的时候,后端发送的头部行信息已经在前面的ngx_http_upstream_send_response->ngx_http_send_header已经把头部行部分发送给客户端了
ngx_event_t *rev;
ngx_event_pipe_t *p;
ngx_connection_t *c;

c = u->peer.connection;
p = u->pipe;
rev = c->read;

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process upstream");

c->log->action = "reading upstream";

if (rev->timedout) { //在发送请求到后端的时候,我们需要等待对方应答,因此设置了读超时定时器,见ngx_http_upstream_send_request

if (rev->delayed) {

rev->timedout = 0;
rev->delayed = 0;

if (!rev->ready) {
ngx_add_timer(rev, p->read_timeout, NGX_FUNC_LINE);

if (ngx_handle_read_event(rev, 0, NGX_FUNC_LINE) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
}

return;
}

if (ngx_event_pipe(p, 0) == NGX_ABORT) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}

} else {
p->upstream_error = 1;
ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
}

} else {//请求没有超时,那么对后端,处理一下读事件。ngx_event_pipe开始处理

if (rev->delayed) {

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream delayed");

if (ngx_handle_read_event(rev, 0, NGX_FUNC_LINE) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
}

return;
}

if (ngx_event_pipe(p, 0) == NGX_ABORT) { //注意这里的do_write为0
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
}
//注意走到这里的时候,后端发送的头部行信息已经在前面的ngx_http_upstream_send_response->ngx_http_send_header已经把头部行部分发送给客户端了
//该函数处理的只是后端放回过来的网页包体部分

ngx_http_upstream_process_request(r, u);
}

 

/*
在有buffering的时候,使用event_pipe进行数据的转发,调用ngx_event_pipe_write_to_downstream函数读取数据,或者发送数据给客户端。
ngx_event_pipe将upstream响应发送回客户端。do_write代表是否要往客户端发送,写数据。
如果设置了,那么会先发给客户端,再读upstream数据,当然,如果读取了数据,也会调用这里的。
*/ //ngx_event_pipe->ngx_event_pipe_write_to_downstream
ngx_int_t
ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
{//注意走到这里的时候,后端发送的头部行信息已经在前面的ngx_http_upstream_send_response->ngx_http_send_header已经把头部行部分发送给客户端了
//该函数处理的只是后端放回过来的网页包体部分
ngx_int_t rc;
ngx_uint_t flags;
ngx_event_t *rev, *wev;

//这个for循环是不断的用ngx_event_pipe_read_upstream读取客户端数据,然后调用ngx_event_pipe_write_to_downstream
for ( ;; ) {
if (do_write) { //注意这里的do_write,为1是先写后读,以此循环。为0是先读后写,以此循环
p->log->action = "sending to client";

rc = ngx_event_pipe_write_to_downstream(p);


if (rc == NGX_ABORT) {
return NGX_ABORT;
}

if (rc == NGX_BUSY) {
return NGX_OK;
}
}

p->read = 0;
p->upstream_blocked = 0;

p->log->action = "reading upstream";

//从upstream读取数据到chain的链表里面,然后整块整块的调用input_filter进行协议的解析,并将HTTP结果存放在p->in,p->last_in的链表里面。
if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
return NGX_ABORT;
}

/* 非cachable方式下,指定内存用完了数据还没有读完的情况下,或者是后端包体读取完毕,则会从这里返回,其他情况下都会在这里面一直循环 */

//p->read的值可以参考ngx_event_pipe_read_upstream->p->upstream->recv_chain()->ngx_readv_chain里面是否赋值为0
//upstream_blocked是在ngx_event_pipe_read_upstream里面设置的变量,代表是否有数据已经从upstream读取了。
if (!p->read && !p->upstream_blocked) { //内核缓冲区数据已经读完,或者本地指定内存已经用完,则推出
break; //读取后端返回NGX_AGAIN则read置0
}

do_write = 1;//还要写。因为我这次读到了一些数据
}

if (p->upstream->fd != (ngx_socket_t) -1) {
rev = p->upstream->read;

flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;

//得到这个连接的读写事件结构,如果其发生了错误,那么将其读写事件注册删除掉,否则保存原样。
if (ngx_handle_read_event(rev, flags, NGX_FUNC_LINE) != NGX_OK) {
return NGX_ABORT;
}

if (!rev->delayed) {
if (rev->active && !rev->ready) {//没有读写数据了,那就设置一个读超时定时器
ngx_add_timer(rev, p->read_timeout, NGX_FUNC_LINE); //本轮读取后端数据完毕,添加超时定时器,继续读,如果时间到还没数据,表示超时

} else if (rev->timer_set) {
/*
这里删除的定时器是发送数据到后端后,需要等待后端应答,在
ngx_http_upstream_send_request->ngx_add_timer(c->read, u->conf->read_timeout, NGX_FUNC_LINE); 中添加的定时器
*/
ngx_del_timer(rev, NGX_FUNC_LINE);
}
}
}

if (p->downstream->fd != (ngx_socket_t) -1
&& p->downstream->data == p->output_ctx)
{
wev = p->downstream->write;
if (ngx_handle_write_event(wev, p->send_lowat, NGX_FUNC_LINE) != NGX_OK) {
return NGX_ABORT;
}

if (!wev->delayed) {
if (wev->active && !wev->ready) { //想客户端的写超时设置
ngx_add_timer(wev, p->send_timeout, NGX_FUNC_LINE);

} else if (wev->timer_set) {
ngx_del_timer(wev, NGX_FUNC_LINE);
}
}
}

return NGX_OK;
}

/*
1.从preread_bufs,free_raw_bufs或者ngx_create_temp_buf寻找一块空闲的或部分空闲的内存;
2.调用p->upstream->recv_chain==ngx_readv_chain,用writev的方式读取FCGI的数据,填充chain。
3.对于整块buf都满了的chain节点调用input_filter(ngx_http_fastcgi_input_filter)进行upstream协议解析,比如FCGI协议,解析后的结果放入p->in里面;
4.对于没有填充满的buffer节点,放入free_raw_bufs以待下次进入时从后面进行追加。
5.当然了,如果对端发送完数据FIN了,那就直接调用input_filter处理free_raw_bufs这块数据。
*/
/*
buffering方式,读数据前首先开辟一块大空间,在ngx_event_pipe_read_upstream->ngx_readv_chain中开辟一个ngx_buf_t(buf1)结构指向读到的数据,
然后在读取数据到in链表的时候,在ngx_http_fastcgi_input_filter会重新创建一个ngx_buf_t(buf1),这里面设置buf1->shadow=buf2->shadow
buf2->shadow=buf1->shadow。同时把buf2添加到p->in中。当通过ngx_http_write_filter发送数据的时候会把p->in中的数据添加到ngx_http_request_t->out,然后发送,
如果一次没有发送完成,则属于的数据会留在ngx_http_request_t->out中,由写事件触发再次发送。当数据通过p->output_filter(p->output_ctx, out)发送后,buf2
会被添加到p->free中,buf1会被添加到free_raw_bufs中,见ngx_event_pipe_write_to_downstream
*/
static ngx_int_t
ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
//ngx_event_pipe_write_to_downstream写数据到客户端,ngx_event_pipe_read_upstream从后端读取数据
{//ngx_event_pipe调用这里读取后端的数据。
off_t limit;
ssize_t n, size;
ngx_int_t rc;
ngx_buf_t *b;
ngx_msec_t delay;
ngx_chain_t *chain, *cl, *ln;
int upstream_eof = 0;
int upstream_error = 0;
int single_buf = 0;
int leftsize = 0;
int upstream_done = 0;
ngx_chain_t *free_raw_bufs = NULL;

if (p->upstream_eof || p->upstream_error || p->upstream_done) {
return NGX_OK;
}

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe read upstream, read ready: %d", p->upstream->read->ready);

for ( ;; ) {
//数据读取完毕,或者出错,直接退出循环
if (p->upstream_eof || p->upstream_error || p->upstream_done) {
break;
}

//如果没有预读数据,并且跟upstream的连接还没有read,那就可以退出了,因为没数据可读。
if (p->preread_bufs == NULL && !p->upstream->read->ready) { //如果后端协议栈数据读取完毕,返回NGX_AGAIN,则ready会置0
break;
}

/*
下面这个大的if-else就干一件事情: 寻找一块空闲的内存缓冲区,用来待会存放读取进来的upstream的数据。
如果preread_bufs不为空,就先用之,否则看看free_raw_bufs有没有,或者申请一块
*/
if (p->preread_bufs) {

/* use the pre-read bufs if they exist */

chain = p->preread_bufs;
p->preread_bufs = NULL;
n = p->preread_size;

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe preread: %z", n); //这是读取头部行等信息的时候顺带读取到的包体长度

if (n) {
p->read = 1; //表示有读到客户端包体
}
} else {
if (p->limit_rate) {
if (p->upstream->read->delayed) {
break;
}

limit = (off_t) p->limit_rate * (ngx_time() - p->start_sec + 1)
- p->read_length;

if (limit <= 0) {
p->upstream->read->delayed = 1;
delay = (ngx_msec_t) (- limit * 1000 / p->limit_rate + 1);
ngx_add_timer(p->upstream->read, delay, NGX_FUNC_LINE);
break;
}

} else {
limit = 0;
}

if (p->free_raw_bufs) { //上次分配了chain->buf后,调用ngx_readv_chain读取数据的时候返回NGX_AGAIN,则这次新的epoll读事件触发后,直接使用上次没有用的chain来重新读取数据
//当后面的n = p->upstream->recv_chain返回NGX_AGAIN,下次epoll再次触发读的时候,直接用free_raw_bufs

/* use the free bufs if they exist */

chain = p->free_raw_bufs;
if (p->single_buf) { //如果设置了NGX_USE_AIO_EVENT标志, the posted aio operation may currupt a shadow buffer
p->free_raw_bufs = p->free_raw_bufs->next;
chain->next = NULL;
} else { //如果不是AIO,那么可以用多块内存一次用readv读取的。
p->free_raw_bufs = NULL;
}

} else if (p->allocated < p->bufs.num) {

/* allocate a new buf if it's still allowed */
/*
如果没有超过fastcgi_buffers等指令的限制,那么申请一块内存吧。因为现在没有空闲内存了。
allocate a new buf if it's still allowed申请一个ngx_buf_t以及size大小的数据。用来存储从FCGI读取的数据。
*/
b = ngx_create_temp_buf(p->pool, p->bufs.size);
if (b == NULL) {
return NGX_ABORT;
}

p->allocated++;

chain = ngx_alloc_chain_link(p->pool);
if (chain == NULL) {
return NGX_ABORT;
}

chain->buf = b;
chain->next = NULL;

} else if (!p->cacheable
&& p->downstream->data == p->output_ctx
&& p->downstream->write->ready
&& !p->downstream->write->delayed)
{
//没有开启换成,并且前面已经开辟了5个3Kbuf已经都开辟了,不能在分配空间了
//到这里,那说明没法申请内存了,但是配置里面没要求必须先保留在cache里,那我们可以吧当前的数据发送给客户端了。跳出循环进行write操作,然后就会空余处空间来继续读。
/*
* if the bufs are not needed to be saved in a cache and
* a downstream is ready then write the bufs to a downstream
*/

p->upstream_blocked = 1;

ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe downstream ready");

break;

} else if (p->cacheable
|| p->temp_file->offset < p->max_temp_file_size) //如果后端内容超过了max_temp_file_size,则不缓存

/* 当前fastcgi_buffers 和fastcgi_buffer_size配置的空间都已经用完了,则需要把读取到(就是fastcgi_buffers 和
fastcgi_buffer_size指定的空间中保存的读取数据)的数据写道临时文件中去 */

{//必须缓存,而且当前的缓存文件的位移,其大小小于可允许的大小,那good,可以写入文件了。
//这里可以看出,在开启cache的时候,只有前面的fastcgi_buffers 5 3K都已经用完了,才会写入临时文件中去//下面将r->in的数据写到临时文件
/*
* if it is allowed, then save some bufs from p->in
* to a temporary file, and add them to a p->out chain
*/

rc = ngx_event_pipe_write_chain_to_temp_file(p);//下面将r->in的数据写到临时文件

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe temp offset: %O", p->temp_file->offset);

if (rc == NGX_BUSY) {
break;
}

if (rc == NGX_AGAIN) {
if (ngx_event_flags & NGX_USE_LEVEL_EVENT
&& p->upstream->read->active
&& p->upstream->read->ready)
{
if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
== NGX_ERROR)
{
return NGX_ABORT;
}
}
}

if (rc != NGX_OK) {
return rc;
}

chain = p->free_raw_bufs;
if (p->single_buf) {
p->free_raw_bufs = p->free_raw_bufs->next;
chain->next = NULL;
} else {
p->free_raw_bufs = NULL;
}

} else {

/* there are no bufs to read in */

ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"no pipe bufs to read in");

break;
}

//到这里,肯定是找到空闲的buf了,chain指向之了。ngx_readv_chain .调用readv不断的读取连接的数据。放入chain的链表里面这里的
//chain是不是只有一块? 其next成员为空呢,不一定,如果free_raw_bufs不为空,上面的获取空闲buf只要没有使用AIO的话,就可能有多个buffer链表的。
//注意:这里面只是把读到的数据放入了chain->buf中,但是没有移动尾部last指针,实际上该函数返回后pos和last都还是指向读取数据的头部的
n = p->upstream->recv_chain(p->upstream, chain, limit); //chain->buf空间用来存储recv_chain从后端接收到的数据

leftsize = chain->buf->end - chain->buf->last;
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe recv chain: %z, left-size:%d", n, leftsize);

if (p->free_raw_bufs) { //free_raw_bufs不为空,那就将chain指向的这块放到free_raw_bufs头部。
chain->next = p->free_raw_bufs;
}
p->free_raw_bufs = chain; //把读取到的存有后端数据的chain赋值给free_raw_bufs

if (n == NGX_ERROR) {
p->upstream_error = 1;
return NGX_ERROR;
}

if (n == NGX_AGAIN) { //循环回去通过epoll读事件触发继续读,一般都是把内核缓冲区数据读完后从这里返回
if (p->single_buf) {
ngx_event_pipe_remove_shadow_links(chain->buf);
}

single_buf = p->single_buf;
/*
2025/04/27 00:40:55[ ngx_readv_chain, 179] [debug] 22653#22653: *3 readv() not ready (11: Resource temporarily unavailable)
2025/04/27 00:40:55[ ngx_event_pipe_read_upstream, 337] [debug] 22653#22653: *3 pipe recv chain: -2
*/
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"ngx_event_pipe_read_upstream recv return ngx_again, single_buf:%d ", single_buf);
break; //当下次epoll再次触发读的时候,就直接使用p->free_raw_bufs
}

p->read = 1; //表示有读到数据,并且可以继续读

if (n == 0) {
p->upstream_eof = 1;//upstream_eof表示内核协议栈已经读取完毕,内核协议栈已经没有数据了,需要再次epoll触发读操作
break; //跳出循环
}
} //从上面for循环刚开始的if (p->preread_bufs) {到这里,都在寻找一个空闲的缓冲区,然后读取数据填充chain。够长的。
//读取了数据,下面要进行FCGI协议解析,保存了。

delay = p->limit_rate ? (ngx_msec_t) n * 1000 / p->limit_rate : 0;

p->read_length += n; //表示读取到的后端包体部分数据长度增加n字节
cl = chain; //cl保存了前面ngx_readv_chain的时候读取的数据
p->free_raw_bufs = NULL;

while (cl && n > 0) {

//下面的函数将c->buf中用shadow指针连接起来的链表中所有节点的recycled,temporary,shadow成员置空。
ngx_event_pipe_remove_shadow_links(cl->buf);

/* 前面的n = p->upstream->recv_chain()读取数据后,没有移动last指针,实际上该函数返回后pos和last都还是指向读取数据的头部的 */
size = cl->buf->end - cl->buf->last; //buf中剩余的空间

if (n >= size) { //读取的数据比第一块cl->buf(也就是chain->buf)多,说明读到的数据可以把第一个buf塞满
cl->buf->last = cl->buf->end; //把这坨全部用了,readv填充了数据。

/* STUB */ cl->buf->num = p->num++; //第几块,cl链中(cl->next)中的第几块

//主要功能就是解析fastcgi格式包体,解析出包体后,把对应的buf加入到p->in
//FCGI为ngx_http_fastcgi_input_filter,其他为ngx_event_pipe_copy_input_filter 。用来解析特定格式数据
if (p->input_filter(p, cl->buf) == NGX_ERROR) { //整块buffer的调用协议解析句柄
//这里面,如果cl->buf这块数据解析出来了DATA数据,那么cl->buf->shadow成员指向一个链表,
//通过shadow成员链接起来的链表,每个成员就是零散的fcgi data数据部分。

return NGX_ABORT;
}

n -= size;

//继续处理下一块,并释放这个节点。
ln = cl;
cl = cl->next;
ngx_free_chain(p->pool, ln);

} else { //说明本次读到的n字节数据不能装满一个buf,则移动last指针,同时返回出去继续读

//如果这个节点的空闲内存数目大于剩下要处理的,就将剩下的存放在这里。 通过后面的if (p->free_raw_bufs && p->length != -1){}执行p->input_filter(p, cl->buf)
/*
啥意思,不用调用input_filter了吗,不是。是这样的,如果剩下的这块数据还不够塞满当前这个cl的缓存大小,
那就先存起来,怎么存呢: 别释放cl了,只是移动其大小,然后n=0使循环退出。然后在下面几行的if (cl) {里面可以检测到这种情况
于是在下面的if里面会将这个ln处的数据放入free_raw_bufs的头部。不过这里会有多个连接吗? 可能有的。
*/
cl->buf->last += n;
n = 0;
}
}

if (cl) {
//将上面没有填满一块内存块的数据链接放到free_raw_bufs的前面。注意上面修改了cl->buf->last,后续的读入数据不会
//覆盖这些数据的。看ngx_readv_chain然后继续读
for (ln = cl; ln->next; ln = ln->next) { /* void */ }

ln->next = p->free_raw_bufs;
p->free_raw_bufs = cl;
}

if (delay > 0) {
p->upstream->read->delayed = 1;
ngx_add_timer(p->upstream->read, delay, NGX_FUNC_LINE);
break;
}
}//注意这里是for循环,只有满足p->upstream_eof || p->upstream_error || p->upstream_done才推出

#if (NGX_DEBUG)

for (cl = p->busy; cl; cl = cl->next) {
ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe buf busy s:%d t:%d f:%d "
"%p, pos %p, size: %z "
"file: %O, size: %O",
(cl->buf->shadow ? 1 : 0),
cl->buf->temporary, cl->buf->in_file,
cl->buf->start, cl->buf->pos,
cl->buf->last - cl->buf->pos,
cl->buf->file_pos,
cl->buf->file_last - cl->buf->file_pos);
}

for (cl = p->out; cl; cl = cl->next) {
ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe buf out s:%d t:%d f:%d "
"%p, pos %p, size: %z "
"file: %O, size: %O",
(cl->buf->shadow ? 1 : 0),
cl->buf->temporary, cl->buf->in_file,
cl->buf->start, cl->buf->pos,
cl->buf->last - cl->buf->pos,
cl->buf->file_pos,
cl->buf->file_last - cl->buf->file_pos);
}

for (cl = p->in; cl; cl = cl->next) {
ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe buf in s:%d t:%d f:%d "
"%p, pos %p, size: %z "
"file: %O, size: %O",
(cl->buf->shadow ? 1 : 0),
cl->buf->temporary, cl->buf->in_file,
cl->buf->start, cl->buf->pos,
cl->buf->last - cl->buf->pos,
cl->buf->file_pos,
cl->buf->file_last - cl->buf->file_pos);
}

for (cl = p->free_raw_bufs; cl; cl = cl->next) {
ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe buf free s:%d t:%d f:%d "
"%p, pos %p, size: %z "
"file: %O, size: %O",
(cl->buf->shadow ? 1 : 0),
cl->buf->temporary, cl->buf->in_file,
cl->buf->start, cl->buf->pos,
cl->buf->last - cl->buf->pos,
cl->buf->file_pos,
cl->buf->file_last - cl->buf->file_pos);
}

upstream_eof = p->upstream_eof;
upstream_error = p->upstream_error;
free_raw_bufs = p->free_raw_bufs;
upstream_done = p->upstream_done;
ngx_log_debug5(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe length: %O, p->upstream_eof:%d, p->upstream_error:%d, p->free_raw_bufs:%p, upstream_done:%d",
p->length, upstream_eof, upstream_error, free_raw_bufs, upstream_done);

#endif

if (p->free_raw_bufs && p->length != -1) { //注意前面已经把读取到的chain数据加入到了free_raw_bufs
cl = p->free_raw_bufs;

if (cl->buf->last - cl->buf->pos >= p->length) { //包体读取完毕

p->free_raw_bufs = cl->next;

/* STUB */ cl->buf->num = p->num++;

//主要功能就是解析fastcgi格式包体,解析出包体后,把对应的buf加入到p->in
//FCGI为ngx_http_fastcgi_input_filter,其他为ngx_event_pipe_copy_input_filter 。用来解析特定格式数据
if (p->input_filter(p, cl->buf) == NGX_ERROR) {
return NGX_ABORT;
}

ngx_free_chain(p->pool, cl);
}
}

if (p->length == 0) { //后端页面包体数据读取完毕或者本来就没有包体,把upstream_done置1
p->upstream_done = 1;
p->read = 1;
}

//upstream_eof表示内核协议栈已经读取完毕,内核协议栈已经没有数据了,需要再次epoll触发读操作 //注意前面已经把读取到的chain数据加入到了free_raw_bufs
if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {//没办法了,都快到头了,或者出现错误了,所以处理一下这块不完整的buffer

/* STUB */ p->free_raw_bufs->buf->num = p->num++;
//如果数据读取完毕了,或者后端出现问题了,并且,free_raw_bufs不为空,后面还有一部分数据,
//当然只可能有一块。那就调用input_filter处理它。FCGI为ngx_http_fastcgi_input_filter 在ngx_http_fastcgi_handler里面设置的

//这里考虑一种情况: 这是最后一块数据了,没满,里面没有data数据,所以ngx_http_fastcgi_input_filter会调用ngx_event_pipe_add_free_buf函数,
//将这块内存放入free_raw_bufs的前面,可是君不知,这最后一块不存在数据部分的内存正好等于free_raw_bufs,因为free_raw_bufs还没来得及改变。
//所以,就把自己给替换掉了。这种情况会发生吗?
if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
return NGX_ABORT;
}

p->free_raw_bufs = p->free_raw_bufs->next;

if (p->free_bufs && p->buf_to_file == NULL) {
for (cl = p->free_raw_bufs; cl; cl = cl->next) {
if (cl->buf->shadow == NULL) {
//这个shadow成员指向由我这块buf产生的小FCGI数据块buf的指针列表。如果为NULL,就说明这块buf没有data,可以释放了。
ngx_pfree(p->pool, cl->buf->start);
}
}
}
}

if (p->cacheable && (p->in || p->buf_to_file)) {

ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe write chain");

if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
return NGX_ABORT;
}
}

upstream_done = p->upstream_done;
if(upstream_done)
ngx_log_debugall(p->log, 0, "pipe read upstream upstream_done:%d", upstream_done);

return NGX_OK;
}

/*
buffering方式,读数据前首先开辟一块大空间,在ngx_event_pipe_read_upstream->ngx_readv_chain中开辟一个ngx_buf_t(buf1)结构指向读到的数据,
然后在读取数据到in链表的时候,在ngx_http_fastcgi_input_filter会重新创建一个ngx_buf_t(buf1),这里面设置buf1->shadow=buf2->shadow
buf2->shadow=buf1->shadow。同时把buf2添加到p->in中。当通过ngx_http_write_filter发送数据的时候会把p->in中的数据添加到ngx_http_request_t->out,然后发送,
如果一次没有发送完成,则属于的数据会留在ngx_http_request_t->out中,由写事件触发再次发送。当数据通过p->output_filter(p->output_ctx, out)发送后,buf2
会被添加到p->free中,buf1会被添加到free_raw_bufs中,见ngx_event_pipe_write_to_downstream
*/
//ngx_event_pipe_write_to_downstream写数据到客户端,ngx_event_pipe_read_upstream从后端读取数据
static ngx_int_t
ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
{//ngx_event_pipe调用这里进行数据发送给客户端,数据已经准备在p->out,p->in里面了。
u_char *prev;
size_t bsize;
ngx_int_t rc;
ngx_uint_t flush, flushed, prev_last_shadow;
ngx_chain_t *out, **ll, *cl;
ngx_connection_t *downstream;

downstream = p->downstream; //与客户端的连接信息

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream, write ready: %d", downstream->write->ready);

flushed = 0;

for ( ;; ) {
if (p->downstream_error) { //如果客户端连接出错了。drain=排水;流干,
return ngx_event_pipe_drain_chains(p);//清空upstream发过来的,解析过格式后的HTML数据。将其放入free_raw_bufs里面。
}

/*
ngx_event_pipe_write_to_downstream
if (p->upstream_eof || p->upstream_error || p->upstream_done) {
p->output_filter(p->output_ctx, p->out);
}
*/

//upstream_eof表示内核缓冲区数据已经读完 如果upstream的连接已经关闭了,或出问题了,或者发送完毕了,那就可以发送了。
if (p->upstream_eof || p->upstream_error || p->upstream_done) {
//实际上在接受完后端数据后,在想客户端发送包体部分的时候,会两次调用该函数,一次是ngx_event_pipe_write_to_downstream-> p->output_filter(),
//另一次是ngx_http_upstream_finalize_request->ngx_http_send_special,

/* pass the p->out and p->in chains to the output filter */

for (cl = p->busy; cl; cl = cl->next) {
cl->buf->recycled = 0;//不需要回收重复利用了,因为upstream_done了,不会再给我发送数据了。
}


/*
发送缓存文件中内容到客户端过程:
ngx_http_file_cache_open->ngx_http_file_cache_read->ngx_http_file_cache_aio_read这个流程获取文件中前面的头部信息相关内容,并获取整个
文件stat信息,例如文件大小等。
头部部分在ngx_http_cache_send->ngx_http_send_header发送,
缓存文件后面的包体部分在ngx_http_cache_send后半部代码中触发在filter模块中发送

接收后端数据并转发到客户端触发数据发送过程:
ngx_event_pipe_write_to_downstream中的
if (p->upstream_eof || p->upstream_error || p->upstream_done) {
遍历p->in 或者遍历p->out,然后执行输出
p->output_filter(p->output_ctx, p->out);
}
*/
//如果没有开启缓存,数据不会写入临时文件中,p->out = NULL
if (p->out) { //和临时文件相关,如果换成存在与临时文件中,走这里
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream flush out");

for (cl = p->out; cl; cl = cl->next) {
cl->buf->recycled = 0;
}

//下面,因为p->out的链表里面一块块都是解析后的后端服务器页面数据,所以直接调用ngx_http_output_filter进行数据发送就行了。
//注意: 没有发送完毕的数据会保存到ngx_http_request_t->out中,HTTP框架会触发再次把r->out写出去,而不是存在p->out中的
rc = p->output_filter(p->output_ctx, p->out);

if (rc == NGX_ERROR) {
p->downstream_error = 1;
return ngx_event_pipe_drain_chains(p);
}

p->out = NULL;
}

//ngx_event_pipe_read_upstream读取数据后通过ngx_http_fastcgi_input_filter把读取到的数据加入到p->in链表
//如果开启缓存,则数据写入临时文件中,p->in=NULL
if (p->in) { //跟out同理。简单调用ngx_http_output_filter进入各个filter发送过程中。
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream flush in");

for (cl = p->in; cl; cl = cl->next) {
cl->buf->recycled = 0; //已经是最后的了,不需要回收了
}

//注意下面的发送不是真的writev了,得看具体情况比如是否需要recycled,是否是最后一块等。ngx_http_write_filter会判断这个的。
rc = p->output_filter(p->output_ctx, p->in);//调用ngx_http_output_filter发送,最后一个是ngx_http_write_filter

if (rc == NGX_ERROR) {
p->downstream_error = 1;
return ngx_event_pipe_drain_chains(p);
}

p->in = NULL; //在执行上面的output_filter()后,p->in中的数据会添加到r->out中
}

ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe write downstream done");

/* TODO: free unused bufs */

p->downstream_done = 1;
break; //这里会退出循环
}

//否则upstream数据还没有发送完毕。
if (downstream->data != p->output_ctx
|| !downstream->write->ready
|| downstream->write->delayed)
{
break;
}

/* bsize is the size of the busy recycled bufs */

prev = NULL;
bsize = 0;

//这里遍历需要busy这个正在发送,已经调用过output_filter的buf链表,计算一下那些可以回收重复利用的buf
//计算这些buf的总容量,注意这里不是计算busy中还有多少数据没有真正writev出去,而是他们总共的最大容量
for (cl = p->busy; cl; cl = cl->next) {

if (cl->buf->recycled) {
if (prev == cl->buf->start) {
continue;
}

bsize += cl->buf->end - cl->buf->start; //计算还没有发送出去的ngx_buf_t所指向所有空间的大小
prev = cl->buf->start;
}
}

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write busy: %uz", bsize);

out = NULL;

//busy_size为fastcgi_busy_buffers_size 指令设置的大小,指最大待发送的busy状态的内存总大小。
//如果大于这个大小,nginx会尝试去发送新的数据并回收这些busy状态的buf。
if (bsize >= (size_t) p->busy_size) {
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
"break while(), bsize:%uz >= (size_t) p->busy_size: %uz", bsize, (size_t) p->busy_size);
flush = 1;
goto flush;
}

flush = 0;
ll = NULL;
prev_last_shadow = 1; //标记上一个节点是不是正好是一块FCGI buffer的最后一个数据节点。

//遍历p->out,p->in里面的未发送数据,将他们放到out链表后面,注意这里发送的数据不超过busy_size因为配置限制了。
for ( ;; ) {
//循环,这个循环的终止后,我们就能获得几块HTML数据节点,并且他们跨越了1个以上的FCGI数据块的并以最后一块带有last_shadow结束。
if (p->out) { //buf到tempfile的数据会放到out里面。一次read后端服务端数据返回NGX_AGIAN后开始发送缓存中的内容
//说明数据缓存到了临时文件中
cl = p->out;

if (cl->buf->recycled) {
ngx_log_error(NGX_LOG_ALERT, p->log, 0,
"recycled buffer in pipe out chain");
}

p->out = p->out->next;

} else if (!p->cacheable && p->in) { //说明数据时缓存到内存中的
cl = p->in;

ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write buf ls:%d %p %z",
cl->buf->last_shadow, //
cl->buf->pos,
cl->buf->last - cl->buf->pos);


//1.对于在in里面的数据,如果其需要回收;
//2.并且又是某一块大FCGI buf的最后一个有效html数据节点;
//3.而且当前的没法送的大小大于busy_size, 那就需要回收一下了,因为我们有buffer机制
if (cl->buf->recycled && prev_last_shadow) {
if (bsize + cl->buf->end - cl->buf->start > p->busy_size) {
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
"break while(), bsize + cl->buf->end - cl->buf->start:%uz > p->busy_size: %uz",
bsize, (size_t) p->busy_size);
flush = 1;//超过了大小,标记一下待会是需要真正发送的。不过这个好像没发挥多少作用,因为后面不怎么判断、
break;//停止处理后面的内存块,因为这里已经大于busy_size了。
}

bsize += cl->buf->end - cl->buf->start;
}

prev_last_shadow = cl->buf->last_shadow;

p->in = p->in->next;

} else {
break; //一般
}

cl->next = NULL;

if (out) {
*ll = cl;
} else {
out = cl;//指向第一块数据
}
ll = &cl->next;
}

//到这里后,out指针指向一个链表,其里面的数据是从p->out,p->in来的要发送的数据。见ngx_http_output_filter
flush:

ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write: out:%p, flush:%d", out, flush);

//下面将out指针指向的内存调用output_filter,进入filter过程。
//如果后端数据有写入临时文件,则out=NULL,只有在获取到全部后端数据并写入临时文件后,才会通过前面的if (p->upstream_eof || p->upstream_error || p->upstream_done) {p->output_filter()}发送出去
if (out == NULL) { //在下面的ngx_chain_update_chains中有可能置为NULL,表示out链上的数据发送完毕

if (!flush) {
break;
}

/* a workaround for AIO */
if (flushed++ > 10) { //最多循环10次,从而可以达到异步的效果,避免在里面反复循环
return NGX_BUSY;
}
}

rc = p->output_filter(p->output_ctx, out);//简单调用ngx_http_output_filter进入各个filter发送过程中。

/*
读数据前首先开辟一块大空间,在ngx_event_pipe_read_upstream->ngx_readv_chain中开辟一个ngx_buf_t(buf1)结构指向读到的数据,
然后在读取数据到in链表的时候,在ngx_http_fastcgi_input_filter会重新创建一个ngx_buf_t(buf1),这里面设置buf1->shadow=buf2->shadow
buf2->shadow=buf1->shadow。同时把buf2添加到p->in中。当通过ngx_http_write_filter发送数据的时候会把p->in中的数据添加到ngx_http_request_t->out,然后发送,
如果一次没有发送完成,则剩余的数据会留在p->out中。当数据通过p->output_filter(p->output_ctx, out)发送后,buf2会被添加到p->free中,
buf1会被添加到free_raw_bufs中,见ngx_event_pipe_write_to_downstream
*/
//将没有全部发送的buf(last != end)加入到busy,已经全部处理了的buf(end = last)放入free中
//实际上p->busy最终指向的是ngx_http_write_filter中未发送完的r->out中保存的数据,见ngx_http_write_filter
/*实际上p->busy最终指向的是ngx_http_write_filter中未发送完的r->out中保存的数据,这部分数据始终在r->out的最前面,后面在读到数据后在
ngx_http_write_filter中会把新来的数据加到r->out后面,也就是未发送的数据在r->out前面新数据在链后面,所以实际write是之前未发送的先发送出去*/
ngx_chain_update_chains(p->pool, &p->free, &p->busy, &out, p->tag);

if (rc == NGX_ERROR) {
p->downstream_error = 1;
return ngx_event_pipe_drain_chains(p);
}

for (cl = p->free; cl; cl = cl->next) {

if (cl->buf->temp_file) {
if (p->cacheable || !p->cyclic_temp_file) {
continue;
}

/* reset p->temp_offset if all bufs had been sent */

if (cl->buf->file_last == p->temp_file->offset) {
p->temp_file->offset = 0;
}
}

/* TODO: free buf if p->free_bufs && upstream done */

/* add the free shadow raw buf to p->free_raw_bufs */

if (cl->buf->last_shadow) {
if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) { //配合参考ngx_http_fastcgi_input_filter阅读
//也就是在读取后端数据的时候创建的ngx_buf_t(读取数据时创建的第一个ngx_buf_t)放入free_raw_bufs
return NGX_ABORT;
}

cl->buf->last_shadow = 0;
}

cl->buf->shadow = NULL;
}
}

return NGX_OK;
}

 

http代理服务器(3-4-7层代理)-网络事件库公共组件、内核kernel驱动 摄像头驱动 tcpip网络协议栈、netfilter、bridge 好像看过!!!! 但行好事 莫问前程 --身高体重180的胖子

举报

相关推荐

0 条评论