
TCP服务器
锁:Lock.hpp
代码
#pragma once
#include <iostream>
#include <pthread.h>
/// Minimal owning wrapper around a POSIX `pthread_mutex_t`.
///
/// The mutex is initialised with default attributes in the constructor and
/// destroyed in the destructor. Copying is disabled: a copied
/// `pthread_mutex_t` is not a valid mutex, and two wrappers would otherwise
/// destroy the same underlying object twice.
class Mutex
{
public:
    Mutex()
    {
        pthread_mutex_init(&lock_, nullptr);
    }

    Mutex(const Mutex &) = delete;
    Mutex &operator=(const Mutex &) = delete;

    /// Blocks until the calling thread owns the mutex.
    void lock()
    {
        pthread_mutex_lock(&lock_);
    }

    /// Releases the mutex; must be called by the thread that locked it.
    void unlock()
    {
        pthread_mutex_unlock(&lock_);
    }

    ~Mutex()
    {
        pthread_mutex_destroy(&lock_);
    }

private:
    pthread_mutex_t lock_;
};
/// Scope guard for `Mutex`: locks in the constructor, unlocks in the destructor.
///
/// A trace line is printed to `std::cout` after locking and after unlocking.
/// The guard stores a non-owning pointer, so the `Mutex` must outlive it.
/// Copying is disabled because each copy would unlock the same mutex again
/// when it is destroyed.
class LockGuard
{
public:
    /// @param mutex  Mutex to hold for the guard's lifetime; must not be null.
    LockGuard(Mutex *mutex) : mutex_(mutex)
    {
        mutex_->lock();
        std::cout << "加锁成功..." << std::endl;
    }

    LockGuard(const LockGuard &) = delete;
    LockGuard &operator=(const LockGuard &) = delete;

    ~LockGuard()
    {
        mutex_->unlock();
        std::cout << "解锁成功...." << std::endl;
    }

private:
    Mutex *mutex_; // not owned
};
介绍
守护进程:daemonize.hpp
代码
#pragma once
#include <cstdio>
#include <iostream>
#include <signal.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
/// Detaches the calling process from its terminal and runs it in the background.
///
/// Steps performed:
///   1. Ignore SIGPIPE.
///   2. fork(); the parent exits with status 0 and the child continues, so the
///      process calling setsid() is never a process-group leader.
///   3. setsid() to start a new session.
///   4. Redirect stdin/stdout/stderr to /dev/null.
///
/// The working directory is NOT changed, so relative paths keep resolving
/// against the directory the program was started from.
///
/// On fork() or setsid() failure an error is printed with perror() and the
/// process exits with status 1.
void daemonize()
{
    signal(SIGPIPE, SIG_IGN);

    const pid_t pid = fork();
    if (pid < 0)
    {
        // Without this check a failed fork would be treated like the child path.
        perror("fork");
        exit(1);
    }
    if (pid > 0)
    {
        // Parent: the daemon was launched successfully, so report success.
        exit(0);
    }

    if (setsid() < 0)
    {
        perror("setsid");
        exit(1);
    }

    const int fd = open("/dev/null", O_RDWR);
    if (fd != -1)
    {
        dup2(fd, STDIN_FILENO);
        dup2(fd, STDOUT_FILENO);
        dup2(fd, STDERR_FILENO);
        // Only close the descriptor when it is not itself one of 0/1/2.
        if (fd > STDERR_FILENO)
            close(fd);
    }
}
说明
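- 具体来说,这段代码中的 daemonize() 函数实现了以下步骤:
- 忽略 SIGPIPE 信号:调用 signal(SIGPIPE, SIG_IGN),避免向已经关闭的连接写数据时进程被该信号终止。
- 更改进程的工作目录:注意,上面的代码并没有调用 chdir(),进程的工作目录保持不变(日志文件使用的是相对路径)。
- 不要成为进程组组长:调用 fork() 后父进程退出,由子进程继续执行,从而保证调用 setsid() 的进程不是进程组组长。
- 设置独立会话:调用 setsid() 创建新的会话,使进程脱离原来的控制终端。
- 重定向标准输入、输出和错误:打开 /dev/null,并用 dup2() 把它复制到 0、1、2 号文件描述符上。
- 关闭不需要的文件描述符:重定向完成后,如果打开 /dev/null 得到的 fd 大于 STDERR_FILENO,就把它关闭。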
日志文件:log.hpp
代码
#pragma once
#include <cstdio>
#include <ctime>
#include <cstdarg>
#include <cassert>
#include <cassert>
#include <cstring>
#include <cerrno>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
// Log severity levels. Each value is also the index of its printable name in
// log_level[] below. (The spelling WARINING is kept because other code
// references the macro by this name.)
#define DEBUG 0
#define NOTICE 1
#define WARINING 2
#define FATAL 3
// Printable names of the severity levels, indexed by the level value.
const char *log_level[] = {"DEBUG", "NOTICE", "WARINING", "FATAL"};
// File that logMessage() appends to; a relative path, so it resolves against
// the process's current working directory.
#define LOGFILE "serverTcp.log"
/// Appends one formatted record to LOGFILE.
///
/// Record layout: `<LEVEL> | <unix time> | <$USER or "unknow"> | <message>\n`.
///
/// @param level   One of DEBUG, NOTICE, WARINING, FATAL. Out-of-range values
///                trip an assert in debug builds and are clamped otherwise.
/// @param format  printf-style format string for the message part.
/// @param ...     Arguments consumed by @p format.
///
/// The record is written straight to the log file's descriptor with write(),
/// so the process's stdout/stderr are left untouched (the previous version
/// dup2()'d the log file over fds 1 and 2 on every call). If the log file
/// cannot be opened, the record goes to stderr (FATAL) or stdout (other
/// levels) instead of aborting.
///
/// NOTE(review): umask(0) is kept so the file is created with mode 0666, but
/// it changes the umask of the whole process — confirm that this is intended.
void logMessage(int level, const char *format, ...)
{
    assert(level >= DEBUG);
    assert(level <= FATAL);
    // Keep the log_level[] lookup in bounds even when asserts are compiled out.
    if (level < DEBUG)
        level = DEBUG;
    else if (level > FATAL)
        level = FATAL;

    const char *name = getenv("USER");

    char logInfo[1024];
    va_list ap;
    va_start(ap, format);
    vsnprintf(logInfo, sizeof(logInfo), format, ap);
    va_end(ap);

    // Build the whole record first so it reaches the file in one append.
    char record[1280];
    int len = snprintf(record, sizeof(record), "%s | %u | %s | %s\n",
                       log_level[level],
                       (unsigned int)time(nullptr),
                       name == nullptr ? "unknow" : name,
                       logInfo);
    if (len < 0)
        return;
    if ((size_t)len >= sizeof(record))
    {
        // Truncated: keep the record newline-terminated.
        len = (int)sizeof(record) - 1;
        record[len - 1] = '\n';
    }

    umask(0);
    const int fd = open(LOGFILE, O_WRONLY | O_CREAT | O_APPEND, 0666);
    if (fd < 0)
    {
        FILE *out = (level == FATAL) ? stderr : stdout;
        fwrite(record, 1, (size_t)len, out);
        fflush(out);
        return;
    }

    const char *cursor = record;
    size_t remaining = (size_t)len;
    while (remaining > 0)
    {
        const ssize_t written = write(fd, cursor, remaining);
        if (written < 0)
        {
            if (errno == EINTR)
                continue; // interrupted before anything was written: retry
            break;
        }
        cursor += written;
        remaining -= (size_t)written;
    }

    fsync(fd);
    close(fd);
}
说明
任务处理 Task.hpp
代码
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
#include "log.hpp"
class Task
{
public:
using callback_t = std::function<void (int, std::string, uint16_t)>;
private:
int sock_;
uint16_t port_;
std::string ip_;
callback_t func_;
public:
Task():sock_(-1), port_(-1)
{}
Task(int sock, std::string ip, uint16_t port, callback_t func)
: sock_(sock), ip_(ip), port_(port), func_(func)
{}
void operator () ()
{
logMessage(DEBUG, "线程ID[%p]处理%s:%d的请求 开始啦...",\
pthread_self(), ip_.c_str(), port_);
func_(sock_, ip_, port_);
logMessage(DEBUG, "线程ID[%p]处理%s:%d的请求 结束啦...",\
pthread_self(), ip_.c_str(), port_);
}
~Task()
{}
};
说明
线程池 ThreadPool.hpp
代码
#pragma once
#include <iostream>
#include <cassert>
#include <queue>
#include <memory>
#include <cstdlib>
#include <pthread.h>
#include <unistd.h>
#include <sys/prctl.h>
#include "Lock.hpp"
using namespace std;
// Default worker-thread count: used as the default argument of ThreadPool's constructor.
int gThreadNum = 15;
/// Fixed-size pool of detached worker threads consuming a FIFO task queue.
///
/// `T` must be copyable and callable with no arguments (`t()`).
/// The pool is a per-`T` singleton obtained through getInstance(); it is
/// created on first use and never deleted.
template <class T>
class ThreadPool
{
private:
    /// @param threadNum  Number of worker threads start() will create (> 0).
    ThreadPool(int threadNum = gThreadNum) : isStart_(false), threadNum_(threadNum)
    {
        assert(threadNum_ > 0);
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }
    ThreadPool(const ThreadPool<T> &) = delete;
    void operator=(const ThreadPool<T> &) = delete;

public:
    /// Returns the singleton, creating it on the first call.
    ///
    /// The lock is taken on every call. The previous unlocked first check read
    /// `instance` while another thread could be writing it, which is a data
    /// race on a plain pointer. Callers are expected to cache the returned
    /// pointer, so the extra lock is not on a hot path.
    static ThreadPool<T> *getInstance()
    {
        static Mutex mutex;
        LockGuard lockguard(&mutex);
        if (nullptr == instance)
        {
            instance = new ThreadPool<T>();
        }
        return instance;
    }

    /// Worker entry point: detaches itself, then loops forever taking one task
    /// at a time from the queue and running it outside the queue lock.
    /// @param args  The owning ThreadPool<T>*.
    static void *threadRoutine(void *args)
    {
        pthread_detach(pthread_self());
        ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
        while (1)
        {
            tp->lockQueue();
            // Loop, not `if`: the predicate is re-checked after every wakeup.
            while (!tp->haveTask())
            {
                tp->waitForTask();
            }
            T t = tp->pop();
            tp->unlockQueue();
            t(); // run the task without holding the queue lock
        }
    }

    /// Creates the worker threads. Must be called at most once.
    /// A thread that fails to start is reported on std::cerr and skipped.
    void start()
    {
        assert(!isStart_);
        for (int i = 0; i < threadNum_; i++)
        {
            pthread_t temp;
            const int rc = pthread_create(&temp, nullptr, threadRoutine, this);
            if (rc != 0)
            {
                std::cerr << "ThreadPool: pthread_create failed, error code " << rc << std::endl;
            }
        }
        isStart_ = true;
    }

    /// Enqueues a copy of @p in and wakes one waiting worker.
    void push(const T &in)
    {
        lockQueue();
        taskQueue_.push(in);
        choiceThreadForHandler();
        unlockQueue();
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&cond_);
    }

    /// Configured number of worker threads.
    int threadNum()
    {
        return threadNum_;
    }

private:
    void lockQueue() { pthread_mutex_lock(&mutex_); }
    void unlockQueue() { pthread_mutex_unlock(&mutex_); }
    bool haveTask() { return !taskQueue_.empty(); }
    void waitForTask() { pthread_cond_wait(&cond_, &mutex_); }
    void choiceThreadForHandler() { pthread_cond_signal(&cond_); }

    /// Removes and returns the oldest task; the caller must hold the queue lock.
    T pop()
    {
        T temp = taskQueue_.front();
        taskQueue_.pop();
        return temp;
    }

private:
    bool isStart_;
    int threadNum_;
    std::queue<T> taskQueue_;
    pthread_mutex_t mutex_; // guards taskQueue_
    pthread_cond_t cond_;   // signalled by push() when a task is added
    static ThreadPool<T> *instance;
};

template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;
说明
客户端 TCPClient.cc
代码
#include "util.hpp"
// Controls the client's input loop in main(): set to true once the user types "quit".
volatile bool quit = false;
/// Prints the client's command-line help to std::cerr.
/// @param proc  Program name to show in the help text (normally argv[0]).
static void Usage(std::string proc)
{
    std::string help = "Usage:\n\t";
    help += proc;
    help += " serverIp serverPort\n";
    help += "Example:\n\t";
    help += proc;
    help += " 127.0.0.1 8081\n\n";
    std::cerr << help << std::flush;
}
/// TCP client entry point.
///
/// Usage: `<prog> serverIp serverPort`. Connects to the server, then
/// repeatedly reads a line from stdin, sends it, and prints the server's
/// reply. Typing "quit" (case-insensitive) sends that word and ends the
/// session.
///
/// Exit codes: USAGE_ERR for bad arguments, SOCKET_ERR if the socket cannot be
/// created, CONN_ERR if connect() fails, 0 otherwise.
int main(int argc, char *argv[])
{
    if (argc != 3)
    {
        Usage(argv[0]);
        exit(USAGE_ERR);
    }
    std::string serverIp = argv[1];

    // Reject non-numeric or out-of-range ports instead of silently truncating.
    char *portEnd = nullptr;
    const long parsedPort = strtol(argv[2], &portEnd, 10);
    if (portEnd == argv[2] || *portEnd != '\0' || parsedPort <= 0 || parsedPort > 65535)
    {
        Usage(argv[0]);
        exit(USAGE_ERR);
    }
    uint16_t serverPort = static_cast<uint16_t>(parsedPort);

    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0)
    {
        std::cerr << "socket: " << strerror(errno) << std::endl;
        exit(SOCKET_ERR);
    }

    struct sockaddr_in server;
    memset(&server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_port = htons(serverPort);
    if (inet_aton(serverIp.c_str(), &server.sin_addr) == 0)
    {
        std::cerr << "invalid IPv4 address: " << serverIp << std::endl;
        close(sock);
        exit(USAGE_ERR);
    }

    if (connect(sock, (const struct sockaddr *)&server, sizeof(server)) != 0)
    {
        std::cerr << "connect: " << strerror(errno) << std::endl;
        exit(CONN_ERR);
    }
    std::cout << "info : connect success: " << sock << std::endl;

    std::string message;
    char reply[1024];
    while (!quit)
    {
        message.clear();
        std::cout << "请输入你的消息>>> ";
        if (!std::getline(std::cin, message))
            break; // EOF or input error on stdin
        if (message.empty())
            continue; // nothing to send; waiting for a reply would block forever

        if (strcasecmp(message.c_str(), "quit") == 0)
            quit = true;

        const ssize_t sent = write(sock, message.c_str(), message.size());
        if (sent <= 0)
        {
            std::cerr << "write: " << strerror(errno) << std::endl;
            break;
        }
        if (quit)
            break; // no reply is awaited after "quit"

        const ssize_t received = read(sock, reply, sizeof(reply));
        if (received < 0)
        {
            std::cerr << "read: " << strerror(errno) << std::endl;
            break;
        }
        if (received == 0)
        {
            std::cerr << "server closed the connection" << std::endl;
            break;
        }
        // Print exactly the bytes that were received.
        std::cout << "Server Echo>>> " << std::string(reply, static_cast<size_t>(received)) << std::endl;
    }
    close(sock);
    return 0;
}
说明
服务器 TCPServer.cc
代码
#include "util.hpp"
#include "Task.hpp"
#include "Threadpool.hpp"
#include "daemonize.hpp"
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <pthread.h>
class ServerTcp;
void transService(int sock, const std::string &clientIp, uint16_t clientPort)
{
assert(sock >= 0);
assert(!clientIp.empty());
assert(clientPort >= 1024);
char inbuffer[BUFFER_SIZE];
while (true)
{
ssize_t s = read(sock, inbuffer, sizeof(inbuffer) - 1);
if (s > 0)
{
inbuffer[s] = '\0';
if (strcasecmp(inbuffer, "quit") == 0)
{
logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort);
break;
}
logMessage(DEBUG, "trans before: %s[%d]>>> %s", clientIp.c_str(), clientPort, inbuffer);
for (int i = 0; i < s; i++)
{
if (isalpha(inbuffer[i]) && islower(inbuffer[i]))
inbuffer[i] = toupper(inbuffer[i]);
}
logMessage(DEBUG, "trans after: %s[%d]>>> %s", clientIp.c_str(), clientPort, inbuffer);
write(sock, inbuffer, strlen(inbuffer));
}
else if (s == 0)
{
logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort);
break;
}
else
{
logMessage(DEBUG, "%s[%d] - read: %s", clientIp.c_str(), clientPort, strerror(errno));
break;
}
}
close(sock);
logMessage(DEBUG, "server close %d done", sock);
}
/// Remote command service for one client connection.
///
/// Each chunk read from @p sock is treated as a shell command, run with
/// popen(), and its standard output is streamed back to the client. Returns
/// when the client disconnects, a command is rejected, popen() fails, or an
/// I/O error occurs. The socket is closed before returning.
///
/// NOTE(security): the command text comes straight from the network and is
/// executed by the shell. The "rm"/"unlink" substring filter below is not a
/// security boundary (it is trivially bypassed and also rejects harmless
/// commands that merely contain those letters). Only expose this service to
/// trusted clients.
///
/// @param sock        Connected client socket (>= 0); closed by this function.
/// @param clientIp    Peer IP in text form, used for logging.
/// @param clientPort  Peer port, used for logging.
void execCommand(int sock, const std::string &clientIp, uint16_t clientPort)
{
    assert(sock >= 0);
    assert(!clientIp.empty());
    // The peer's port is chosen by the peer and is deliberately not asserted
    // on: a client bound to a port below 1024 would otherwise abort the server.

    char command[BUFFER_SIZE];
    while (true)
    {
        const ssize_t s = read(sock, command, sizeof(command) - 1);
        if (s > 0)
        {
            command[s] = '\0';
            logMessage(DEBUG, "[%s:%d] exec [%s]", clientIp.c_str(), clientPort, command);

            const std::string safe = command;
            if ((std::string::npos != safe.find("rm")) || (std::string::npos != safe.find("unlink")))
            {
                logMessage(WARINING, "[%s:%d] rejected command [%s]", clientIp.c_str(), clientPort, command);
                break;
            }

            FILE *fp = popen(command, "r");
            if (fp == nullptr)
            {
                logMessage(WARINING, "exec %s failed, beacuse: %s", command, strerror(errno));
                break;
            }

            char line[1024];
            bool peerAlive = true;
            int sendErrno = 0;
            while (peerAlive && fgets(line, sizeof(line), fp) != nullptr)
            {
                const size_t len = strlen(line);
                size_t off = 0;
                while (off < len)
                {
                    const ssize_t n = write(sock, line + off, len - off);
                    if (n < 0)
                    {
                        if (errno == EINTR)
                            continue;
                        sendErrno = errno;
                        peerAlive = false; // stop streaming to a dead connection
                        break;
                    }
                    off += static_cast<size_t>(n);
                }
            }
            pclose(fp);

            if (!peerAlive)
            {
                logMessage(WARINING, "[%s:%d] write: %s", clientIp.c_str(), clientPort, strerror(sendErrno));
                break;
            }
            logMessage(DEBUG, "[%s:%d] exec [%s] ... done", clientIp.c_str(), clientPort, command);
        }
        else if (s == 0)
        {
            logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort);
            break;
        }
        else
        {
            logMessage(DEBUG, "%s[%d] - read: %s", clientIp.c_str(), clientPort, strerror(errno));
            break;
        }
    }
    close(sock);
    logMessage(DEBUG, "server close %d done", sock);
}
/// Plain bundle of per-connection values: peer port, peer IP, connected
/// socket and a pointer to a ServerTcp. All members are public.
/// NOTE(review): nothing in the visible code constructs or reads this class.
class ThreadData
{
public:
    ThreadData(uint16_t port, std::string ip, int sock, ServerTcp *ts)
        : clientPort_{port},
          clinetIp_{ip},
          sock_{sock},
          this_{ts}
    {}

    uint16_t clientPort_;  // peer port
    std::string clinetIp_; // peer IP in text form (member name spelling kept as-is)
    int sock_;             // connected socket descriptor
    ServerTcp *this_;      // non-owning pointer to a ServerTcp
};
/// Listening TCP server that hands each accepted connection to the shared
/// ThreadPool<Task>.
///
/// Typical use: construct, call init() once, then loop() (which never returns).
/// The object owns the listening socket and closes it on destruction, so it is
/// non-copyable.
class ServerTcp
{
public:
    /// @param port  Port to listen on (host byte order).
    /// @param ip    IPv4 address to bind to; empty means all interfaces.
    ServerTcp(uint16_t port, const std::string &ip = "")
        : listenSock_(-1), // initializers follow the member declaration order
          port_(port),
          ip_(ip),
          tp_(nullptr)
    {
    }

    ServerTcp(const ServerTcp &) = delete;
    ServerTcp &operator=(const ServerTcp &) = delete;

    ~ServerTcp()
    {
        if (listenSock_ >= 0)
            close(listenSock_);
    }

public:
    /// Creates, binds and listens on the socket, then fetches the thread pool.
    /// Any failure is logged at FATAL level and the process exits with
    /// SOCKET_ERR, BIND_ERR or LISTEN_ERR.
    void init()
    {
        listenSock_ = socket(PF_INET, SOCK_STREAM, 0);
        if (listenSock_ < 0)
        {
            logMessage(FATAL, "socket: %s", strerror(errno));
            exit(SOCKET_ERR);
        }
        logMessage(DEBUG, "socket: %s, %d", strerror(errno), listenSock_);

        struct sockaddr_in local;
        memset(&local, 0, sizeof local);
        local.sin_family = AF_INET;
        local.sin_port = htons(port_);
        if (ip_.empty())
        {
            local.sin_addr.s_addr = htonl(INADDR_ANY);
        }
        else if (inet_aton(ip_.c_str(), &local.sin_addr) == 0)
        {
            // Previously an unparsable address was ignored, leaving sin_addr
            // zeroed and silently binding to every interface.
            logMessage(FATAL, "inet_aton: invalid IPv4 address: %s", ip_.c_str());
            exit(BIND_ERR);
        }

        if (bind(listenSock_, (const struct sockaddr *)&local, sizeof local) < 0)
        {
            logMessage(FATAL, "bind: %s", strerror(errno));
            exit(BIND_ERR);
        }
        logMessage(DEBUG, "bind: %s, %d", strerror(errno), listenSock_);

        if (listen(listenSock_, 5) < 0)
        {
            logMessage(FATAL, "listen: %s", strerror(errno));
            exit(LISTEN_ERR);
        }
        logMessage(DEBUG, "listen: %s, %d", strerror(errno), listenSock_);

        tp_ = ThreadPool<Task>::getInstance();
    }

    /// Starts the pool's workers and accepts connections forever. Every
    /// accepted socket is wrapped in a Task that runs execCommand and is
    /// pushed to the pool. Requires a prior call to init().
    void loop()
    {
        tp_->start();
        logMessage(DEBUG, "thread pool start success, thread num: %d", tp_->threadNum());
        while (true)
        {
            struct sockaddr_in peer;
            socklen_t len = sizeof(peer);
            int serviceSock = accept(listenSock_, (struct sockaddr *)&peer, &len);
            if (serviceSock < 0)
            {
                logMessage(WARINING, "accept: %s[%d]", strerror(errno), serviceSock);
                continue;
            }
            uint16_t peerPort = ntohs(peer.sin_port);
            std::string peerIp = inet_ntoa(peer.sin_addr);
            logMessage(DEBUG, "accept: %s | %s[%d], socket fd: %d",
                       strerror(errno), peerIp.c_str(), peerPort, serviceSock);

            Task t(serviceSock, peerIp, peerPort, execCommand);
            tp_->push(t);
        }
    }

private:
    int listenSock_;        // listening socket, -1 until init() succeeds
    uint16_t port_;         // port to listen on, host byte order
    std::string ip_;        // bind address; empty means any
    ThreadPool<Task> *tp_;  // shared pool singleton, not owned
};
/// Prints the server's command-line help to std::cerr.
/// @param proc  Program name to show in the help text (normally argv[0]).
static void Usage(std::string proc)
{
    std::string help = "Usage:\n\t";
    help += proc;
    help += " port ip\n";
    help += "example:\n\t";
    help += proc;
    help += " 8080 127.0.0.1\n\n";
    std::cerr << help << std::flush;
}
/// TCP server entry point.
///
/// Usage: `<prog> port [ip]`. Validates the arguments, turns the process into
/// a daemon, then initialises and runs the server (loop() does not return).
/// Exits with USAGE_ERR when the arguments are malformed.
int main(int argc, char *argv[])
{
    if (argc != 2 && argc != 3)
    {
        Usage(argv[0]);
        exit(USAGE_ERR);
    }

    // atoi() would turn "abc" into 0 and wrap "99999" into a different port;
    // reject anything that is not a whole number in 1..65535.
    char *portEnd = nullptr;
    const long parsedPort = strtol(argv[1], &portEnd, 10);
    if (portEnd == argv[1] || *portEnd != '\0' || parsedPort <= 0 || parsedPort > 65535)
    {
        Usage(argv[0]);
        exit(USAGE_ERR);
    }
    uint16_t port = static_cast<uint16_t>(parsedPort);

    std::string ip;
    if (argc == 3)
        ip = argv[2];

    // Argument errors are reported above, while the terminal is still attached.
    daemonize();

    ServerTcp svr(port, ip);
    svr.init();
    svr.loop();
    return 0;
}
说明
头文件包 util.hpp
代码
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cstdlib>
#include <cassert>
#include <ctype.h>
#include <unistd.h>
#include <strings.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.hpp"
// Process exit codes passed to exit() by the client and the server.
#define SOCKET_ERR 1 // socket() failed
#define BIND_ERR 2   // bind() failed
#define LISTEN_ERR 3 // listen() failed
#define USAGE_ERR 4  // wrong command-line arguments
#define CONN_ERR 5   // connect() failed (client)
// Size in bytes of the server's per-connection receive buffers.
#define BUFFER_SIZE 1024
Makefile
# Default target: build both the client and the server.
# NOTE(review): make requires every recipe line to start with a TAB; the
# indentation appears to have been lost in this listing — confirm in the real file.
# NOTE(review): the .hpp headers are not listed as prerequisites, so editing a
# header alone does not trigger a rebuild.
.PHONY:all
all:TCPClient TCPServer
# Client binary: compiled from TCPClient.cc as C++11 and linked with pthread.
TCPClient:TCPClient.cc
g++ -o $@ $^ -std=c++11 -lpthread
# Server binary: compiled from TCPServer.cc as C++11 and linked with pthread.
TCPServer:TCPServer.cc
g++ -o $@ $^ -std=c++11 -lpthread
# Remove the built binaries and the server's log file.
.PHONY:clean
clean:
rm -f TCPClient TCPServer serverTcp.log