libpq服务端代码存放于src/backend/libpq/pqcomm.c,顶层接口函数如下所示:
* setup/teardown:
* StreamServerPort - Open postmaster's server port
* StreamConnection - Create new connection with client
* StreamClose - Close a client/backend connection
* pq_init - initialize libpq at backend startup
* pq_comm_reset - reset libpq during error recovery
pq_init
pq_init在后端进程启动过程中初始化libpq,它主要在子进程的BackendInitialize中被调用。注意此时MyProcPort已经指向postmaster传递过来的Port结构体(见下面调用链中的MyProcPort = port)。
static int ServerLoop(void){
| -- nSockets = initMasks(&readmask);
| -- for (;;){
| -- if (pmState == PM_WAIT_DEAD_END)
| -- else
| -- selres = select(nSockets, &rmask, NULL, NULL, &timeout); // 监听新连接
| -- if (selres > 0)
| -- for (i = 0; i < MAXLISTEN; i++)
| -- port = ConnCreate(ListenSocket[i]);
| -- if (port){
| -- BackendStartup(Port *port)
| -- pid = fork_process();
| -- if (pid == 0){
| -- InitPostmasterChild();
| -- ClosePostmasterPorts(false);
| -- BackendInitialize(Port *port)
| -- MyProcPort = port;
| -- port->remote_host = "";
| -- port->remote_port = "";
| -- pq_init(); /* initialize libpq to talk to client */
| -- whereToSendOutput = DestRemote; /* now safe to ereport to client */
在后端进程中(fork之后立即),我们以非阻塞模式操作底层套接字(MyProcPort->sock,即postmaster传入的Port中的sock),并在需要阻塞语义时借助闩锁(latch)来实现,这使得读写操作可以被安全地中断。失败时使用 COMMERROR 而不是 ERROR,因为 ERROR 会尝试把错误发送给客户端,而这可能需要再次切换套接字模式,从而导致无限递归。In backends (as soon as forked) we operate the underlying socket in nonblocking mode and use latches to implement blocking semantics if needed. That allows us to provide safely interruptible reads and writes. Use COMMERROR on failure, because ERROR would try to send the error to the client, which might require changing the mode again, leading to infinite recursion. 关于闩锁可参考PG服务进程(Postgres)——WaitEventSet。
/*
 * pq_init -- set up libpq for a freshly started backend.
 *
 * Resets the send/receive buffer state, registers socket_close() as a
 * process-exit hook, switches the client socket (MyProcPort->sock) to
 * nonblocking mode, and builds FeBeWaitSet, the wait-event set holding the
 * client socket (writability), the process latch and postmaster death.
 */
void
pq_init(void)
{
	/* Allocate the send buffer in TopMemoryContext. */
	PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
	PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);

	/* Rewind every receive and send cursor: nothing is buffered yet. */
	PqRecvLength = 0;
	PqRecvPointer = 0;
	PqSendStart = 0;
	PqSendPointer = 0;

	/* Clear the busy, reading-a-message and old-style COPY OUT flags. */
	PqCommBusy = false;
	PqCommReadingMsg = false;
	DoingCopyOut = false;

	/* Arrange for socket_close() to shut the connection down at exit. */
	on_proc_exit(socket_close, 0);

	/*
	 * Backends drive the socket in nonblocking mode and build blocking
	 * semantics on top of latches, which keeps reads and writes safely
	 * interruptible.  Report failure with COMMERROR rather than ERROR:
	 * ERROR would try to ship the message to the client, which might need
	 * the socket mode changed again and recurse without end.
	 */
	if (!pg_set_noblock(MyProcPort->sock))
		ereport(COMMERROR,
				(errmsg("could not set socket to nonblocking mode: %m")));

	/*
	 * Capacity 3 matches the three events registered below.  The socket is
	 * registered for writability only.
	 */
	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
	AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
					  NULL, NULL);
	AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
	AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
}
从上面的代码可以看出,pq_init 创建 FeBeWaitSet 时只注册了三类事件:socket 可写(WL_SOCKET_WRITEABLE)、闩锁被设置(WL_LATCH_SET)以及 postmaster 退出(WL_POSTMASTER_DEATH)。下面是可以唤醒 WaitLatch()、WaitLatchOrSocket() 或 WaitEventSetWait() 的事件位掩码定义。
/*
 * Bitmask of events that can wake up WaitLatch(), WaitLatchOrSocket() or
 * WaitEventSetWait().
 */
#define WL_LATCH_SET		 (1 << 0)
#define WL_SOCKET_READABLE	 (1 << 1)	/* socket is readable */
#define WL_SOCKET_WRITEABLE  (1 << 2)	/* socket is writable */
#define WL_TIMEOUT			 (1 << 3)	/* not for WaitEventSetWait() */
#define WL_POSTMASTER_DEATH  (1 << 4)
#define WL_EXIT_ON_PM_DEATH	 (1 << 5)
#ifdef WIN32
/*
 * WinSock reports completion of a nonblocking connect() as its own network
 * event (FD_CONNECT), distinct from writability (FD_WRITE), so it needs a
 * separate bit here.
 */
#define WL_SOCKET_CONNECTED  (1 << 6)
#else
/* avoid having to deal with case on platforms not requiring it */
#define WL_SOCKET_CONNECTED  WL_SOCKET_WRITEABLE
#endif
/* all socket-related events */
#define WL_SOCKET_MASK		(WL_SOCKET_READABLE | \
							 WL_SOCKET_WRITEABLE | \
							 WL_SOCKET_CONNECTED)
socket_close函数在后端进程退出时关闭libpq连接;pq_init通过on_proc_exit(socket_close, 0)将它注册为进程退出回调(pg_on_exit_callback类型)。
/*
 * socket_close -- process-exit hook that shuts down this backend's client
 * connection.
 *
 * Registered by pq_init() via on_proc_exit(socket_close, 0); code and arg
 * follow that callback signature and are not used by the code shown here.
 *
 * In a standalone backend MyProcPort is NULL and nothing is done.  Otherwise
 * the GSSAPI state (only when built with ENABLE_GSS; elided in this excerpt)
 * and the SSL layer are shut down.  The socket descriptor is then
 * invalidated but intentionally NOT closed; see the comment in the body.
 */
static void socket_close(int code, Datum arg) {
if (MyProcPort != NULL) { /* Nothing to do in a standalone backend, where MyProcPort is NULL. */
#ifdef ENABLE_GSS
...
#endif /* ENABLE_GSS */
/* Cleanly shut down SSL layer. Nowhere else does a postmaster child call this, so this is safe when interrupting BackendInitialize(). */
secure_close(MyProcPort);
/* Formerly we did an explicit close() here, but it seems better to leave the socket open until the process dies. This allows clients to perform a "synchronous close" if they care --- wait till the transport layer reports connection closure, and you can be sure the backend has exited.
* We do set sock to PGINVALID_SOCKET to prevent any further I/O, though. */
MyProcPort->sock = PGINVALID_SOCKET;
}
}
pq_comm_reset
pq_comm_reset在postgres后端进程的错误处理longjmp恢复点被调用,用于复位libpq的状态。它实际上是一个宏,通过PqCommMethods函数表调用其中的comm_reset方法(定义如下)。
// src/include/libpq/libpq.h
/*
 * PQcommMethods -- dispatch table for the backend's low-level
 * frontend/backend communication routines.  Callers reach the active table
 * through the PqCommMethods pointer, which is declared elsewhere (not shown).
 * NOTE(review): presumably this indirection lets an alternative transport
 * install its own table; confirm against the callers.
 */
typedef struct {
void (*comm_reset) (void); /* reset libpq state during error recovery; invoked through pq_comm_reset() */
int (*flush) (void); /* presumably flushes buffered output and returns a status code -- confirm */
int (*flush_if_writable) (void); /* presumably flushes only as much as the socket accepts without waiting -- confirm */
bool (*is_send_pending) (void); /* presumably true while buffered output remains unsent -- confirm */
int (*putmessage) (char msgtype, const char *s, size_t len); /* presumably emits one whole message of type msgtype with len-byte body s (cf. pq_putmessage) -- confirm */
void (*putmessage_noblock) (char msgtype, const char *s, size_t len); /* non-blocking variant of putmessage, judging by the name -- confirm */
void (*startcopyout) (void); /* presumably enters old-style COPY OUT mode (DoingCopyOut) -- confirm */
void (*endcopyout) (bool errorAbort); /* presumably leaves COPY OUT; errorAbort likely set when aborted by ereport(ERROR) -- confirm */
} PQcommMethods;
/* Reset libpq by dispatching to comm_reset of the active method table. */
#define pq_comm_reset() (PqCommMethods->comm_reset())
StreamConnection
StreamConnection函数由postmaster进程在为新客户端创建Port结构(ConnCreate)时调用。
static int ServerLoop(void){
| -- nSockets = initMasks(&readmask);
| -- for (;;){
| -- if (pmState == PM_WAIT_DEAD_END)
| -- else
| -- selres = select(nSockets, &rmask, NULL, NULL, &timeout); // 监听新连接
| -- if (selres > 0)
| -- for (i = 0; i < MAXLISTEN; i++)
| -- port = ConnCreate(ListenSocket[i]);
| -- if (!(port = (Port *) calloc(1, sizeof(Port))))
| -- if (StreamConnection(serverFd, port) != STATUS_OK)
该函数在服务器监听端口上接受与客户端的新连接,并将 port->sock 设置为新连接的文件描述符。这里无需使用非阻塞模式,因为 postmaster 会先用 select() 判断监听套接字何时已准备好 accept()。返回值:STATUS_OK 或 STATUS_ERROR。
/*
 * StreamConnection -- accept a new client connection on server_fd.
 *
 * Fills in port->sock (the accepted descriptor), port->raddr (client
 * address) and port->laddr (local address).  For non-Unix-domain sockets it
 * also turns on TCP_NODELAY (when the platform defines it) and SO_KEEPALIVE,
 * then applies the keepalive and user-timeout GUCs on a best-effort basis.
 *
 * A blocking accept() is acceptable: the postmaster only calls this after
 * select() has reported the listen socket as ready.
 *
 * Returns STATUS_OK on success, STATUS_ERROR otherwise.  On the error paths
 * that follow a successful accept(), port->sock still holds the accepted
 * descriptor; this function does not close it.
 */
int
StreamConnection(pgsocket server_fd, Port *port)
{
	int			on;

	/* Accept the connection; accept() also reports the client's address. */
	port->raddr.salen = sizeof(port->raddr.addr);
	port->sock = accept(server_fd,
						(struct sockaddr *) &port->raddr.addr,
						&port->raddr.salen);
	if (port->sock == PGINVALID_SOCKET)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not accept new connection: %m")));

		/*
		 * postmaster.c will still see the listen socket as read-ready and
		 * retry immediately.  Back off for 0.1 s so that a persistent
		 * failure (most likely an exhausted kernel file table) does not burn
		 * CPU in a tight loop.
		 */
		pg_usleep(100000L);
		return STATUS_ERROR;
	}

	/* Record the server-side (local) address of the new connection. */
	port->laddr.salen = sizeof(port->laddr.addr);
	if (getsockname(port->sock,
					(struct sockaddr *) &port->laddr.addr,
					&port->laddr.salen) < 0)
	{
		elog(LOG, "getsockname() failed: %m");
		return STATUS_ERROR;
	}

	/* Unix-domain sockets get no TCP tuning. */
	if (IS_AF_UNIX(port->laddr.addr.ss_family))
		return STATUS_OK;

#ifdef TCP_NODELAY
	/* Turn on TCP_NODELAY (i.e. disable Nagle's algorithm). */
	on = 1;
	if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
				   (char *) &on, sizeof(on)) < 0)
	{
		elog(LOG, "setsockopt(%s) failed: %m", "TCP_NODELAY");
		return STATUS_ERROR;
	}
#endif

	/* Turn on SO_KEEPALIVE. */
	on = 1;
	if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
				   (char *) &on, sizeof(on)) < 0)
	{
		elog(LOG, "setsockopt(%s) failed: %m", "SO_KEEPALIVE");
		return STATUS_ERROR;
	}

	/*
	 * Best effort: push the current keepalive and user-timeout GUC values to
	 * the socket.  Failures are deliberately ignored because these settings
	 * are not universally supported.  The GUC show hooks report the actual
	 * values, so the GUCs need not be reset when a setting fails.
	 */
	(void) pq_setkeepalivesidle(tcp_keepalives_idle, port);
	(void) pq_setkeepalivesinterval(tcp_keepalives_interval, port);
	(void) pq_setkeepalivescount(tcp_keepalives_count, port);
	(void) pq_settcpusertimeout(tcp_user_timeout, port);

	return STATUS_OK;
}
pqcomm.c提供的这些例程用于处理前端和后端之间通信的底层细节。它们只是把数据推送到通信通道上,并不关心数据的语义——本应如此,可惜旧的 COPY OUT 协议在设计上存在严重缺陷,破坏了这一点:COPY OUT 被设计为独占通信通道(它直接传输数据,而不把数据包装成消息)。COPY OUT 进行期间不能发送其他消息;如果复制被 ereport(ERROR) 中止,我们需要结束这次复制,以便前端重新恢复同步。因此,这些例程必须知道 COPY OUT 的状态。(新的 COPY OUT 是基于消息的,不会设置 DoingCopyOut 标志。)
 注意:通常,直接使用 pq_putbytes() 发出传出消息是个坏主意,特别是如果消息需要多次调用才能发送。相反,使用 pqformat.c 中的例程在缓冲区中构造消息,然后在一次调用 pq_putmessage 时发出它。这确保了如果执行在消息中途被 ereport(ERROR) 中止,则通道不会被不完整的消息阻塞。唯一应该直接调用 pq_putbytes 的非 libpq 代码是老式的 COPY OUT。
 曾经,libpq 由前端和后端共享,但现在后端的“backend/libpq”与“interfaces/libpq”已完全分开。两者如今只剩下名字相似,容易让粗心的人混淆……
 These routines handle the low-level details of communication between frontend and backend. They just shove data across the communication channel, and are ignorant of the semantics of the data — or would be, except for major brain damage in the design of the old COPY OUT protocol. Unfortunately, COPY OUT was designed to commandeer the communication channel (it just transfers data without wrapping it into messages). No other messages can be sent while COPY OUT is in progress; and if the copy is aborted by an ereport(ERROR), we need to close out the copy so that the frontend gets back into sync. Therefore, these routines have to be aware of COPY OUT state. (New COPY-OUT is message-based and does not set the DoingCopyOut flag.)
NOTE: generally, it’s a bad idea to emit outgoing messages directly with pq_putbytes(), especially if the message would require multiple calls to send. Instead, use the routines in pqformat.c to construct the message in a buffer and then emit it in one call to pq_putmessage. This ensures that the channel will not be clogged by an incomplete message if execution is aborted by ereport(ERROR) partway through the message. The only non-libpq code that should call pq_putbytes directly is old-style COPY OUT.
At one time, libpq was shared between frontend and backend, but now the backend’s “backend/libpq” is quite separate from “interfaces/libpq”. All that remains is similarities of names to trap the unwary…
                










