Introduction
After three days of studying and coding, today I will walk you through the source code of TinyServer.
In my opinion, TinyServer works much like a program that modifies files through file descriptors.
To build it we need a thread pool, and we need some knowledge of thread locks.
Basic knowledge
File descriptor
what exactly is FD in Linux? 1
what exactly is FD in Linux? 2
A file descriptor (FD) is an index created by the kernel to **efficiently manage** open files. It is a non-negative integer (usually a small one) that represents an open file. All system calls that perform I/O operate on a file descriptor.
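To make this concrete, here is a minimal sketch of I/O going through nothing but integer descriptors. The helper name roundtrip_through_pipe is hypothetical (it is not part of TinyServer), and the sketch assumes the message is small enough to fit in the kernel's pipe buffer.

```cpp
#include <cassert>
#include <string>
#include <unistd.h>  // pipe, read, write, close

// Hypothetical helper: pushes `msg` through a kernel pipe and returns what
// comes out of the other end. Assumes `msg` fits in the pipe buffer.
std::string roundtrip_through_pipe(const std::string& msg) {
    int fds[2];  // fds[0] = read end, fds[1] = write end
    if (pipe(fds) == -1) return "";
    assert(fds[0] >= 0 && fds[1] >= 0);  // descriptors are non-negative integers

    ssize_t written = write(fds[1], msg.data(), msg.size());
    close(fds[1]);  // closing the write end lets read() report end-of-file
    if (written < 0) {
        close(fds[0]);
        return "";
    }

    std::string out;
    char buf[64];
    ssize_t n;
    while ((n = read(fds[0], buf, sizeof(buf))) > 0) {
        out.append(buf, static_cast<size_t>(n));
    }
    close(fds[0]);
    return out;
}
```

Calling roundtrip_through_pipe("hello fd") should hand back the same string. Sockets are used the same way, which is why the server code later on can treat a client connection as just another fd.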
Epoll
Epoll reference article 1
Wikipedia Epoll
The full name of epoll is eventpoll; it is the Linux kernel's implementation of I/O multiplexing. I/O multiplexing means monitoring multiple input and output sources in a single operation, returning when one or more of them are ready, and then performing read and write operations on them.
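As a rough illustration of the create / register / wait cycle, the sketch below watches a single descriptor. The function name wait_readable is hypothetical; a real server would keep one epoll instance alive and register many descriptors on it instead of creating one per call.

```cpp
#include <cassert>
#include <sys/epoll.h>  // epoll_create1, epoll_ctl, epoll_wait (Linux only)
#include <unistd.h>     // close

// Hypothetical helper: returns true if `fd` becomes readable within
// `timeout_ms` milliseconds (0 = just poll, do not block).
bool wait_readable(int fd, int timeout_ms) {
    assert(fd >= 0);
    int epfd = epoll_create1(0);
    if (epfd == -1) return false;

    epoll_event ev{};
    ev.events = EPOLLIN;  // we only care about "data available to read"
    ev.data.fd = fd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == -1) {
        close(epfd);
        return false;
    }

    epoll_event ready[1];
    int n = epoll_wait(epfd, ready, 1, timeout_ms);
    bool readable = (n == 1) && (ready[0].data.fd == fd) &&
                    (ready[0].events & EPOLLIN);
    close(epfd);
    return readable;
}
```

On an empty pipe, wait_readable(read_end, 0) should report false, and it should report true once something has been written to the other end.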
Thread lock
Lock: an object that can only be owned by a single thread at any given time (C++ class std::mutex). Operations on a lock:
- lock: mark the lock as owned by the current thread; if some other thread already owns the lock then first wait until the lock is free. Lock typically includes a queue to keep track of waiting threads.
- unlock: mark the lock as free (it must currently be owned by the calling thread).
Too much milk solution with locks (using C++ library APIs):
std::mutex mutex;
...
mutex.lock();
if (milk == 0) {
    buy_milk();
}
mutex.unlock();
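The same idea can be written with std::lock_guard, which unlocks automatically when it goes out of scope. This is only a sketch: the Fridge struct and run_roommates function are hypothetical names invented for the example.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical shared state for the "too much milk" problem.
struct Fridge {
    std::mutex mutex;
    int milk = 0;
    int purchases = 0;

    void buy_milk_if_needed() {
        // The guard locks here and unlocks when the function returns
        std::lock_guard<std::mutex> guard(mutex);
        if (milk == 0) {
            milk = 1;
            ++purchases;
        }
    }
};

// Lets `n` roommates check the fridge concurrently and returns how many
// times milk was actually bought.
int run_roommates(int n) {
    assert(n > 0);
    Fridge fridge;
    std::vector<std::thread> roommates;
    for (int i = 0; i < n; ++i) {
        roommates.emplace_back([&fridge] { fridge.buy_milk_if_needed(); });
    }
    for (std::thread& t : roommates) {
        t.join();
    }
    return fridge.purchases;
}
```

However many roommates run, the check and the purchase happen under the same lock, so milk should be bought exactly once.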
Producer / Consumer
- Producers add characters to a buffer
- Consumers remove characters from the buffer
- Characters will be removed in the same order added
Version 1
class Pipe {
Pipe() {}
void put(char c);
char get();
std::mutex mutex;
char buffer[SIZE];
int count = 0;
int nextPut = 0;
int nextGet = 0;
};
void Pipe::put(char c) {
mutex.lock();
count++;
buffer[nextPut] = c;
nextPut++;
if (nextPut == SIZE) {
nextPut = 0;
}
mutex.unlock();
}
char Pipe::get() {
char c;
mutex.lock();
count--;
c = buffer[nextGet];
nextGet++;
if (nextGet == SIZE) {
nextGet = 0;
}
mutex.unlock();
return c;
}
Version 2: handle full and empty situations
class Pipe {
Pipe() {}
void put(char c);
char get();
std::mutex mutex;
char buffer[SIZE];
int count = 0;
int nextPut = 0;
int nextGet = 0;
};
void Pipe::put(char c) {
mutex.lock();
while (count == SIZE) {
mutex.unlock();
mutex.lock();
}
count++;
buffer[nextPut] = c;
nextPut++;
if (nextPut == SIZE) {
nextPut = 0;
}
mutex.unlock();
}
char Pipe::get() {
char c;
mutex.lock();
while (count == 0) {
mutex.unlock();
mutex.lock();
}
count--;
c = buffer[nextGet];
nextGet++;
if (nextGet == SIZE) {
nextGet = 0;
}
mutex.unlock();
return c;
}
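Version 3: with condition variables
- wait(lock): atomically release the lock and put the thread to sleep until it is notified; the lock is reacquired before wait returns.
- notify_one(): if any threads are waiting on the condition, wake up one of them.
- notify_all(): same as notify_one(), except wake up all waiting threads.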
class Pipe {
Pipe() {}
void put(char c);
char get();
std::mutex mutex;
std::condition_variable charAdded, charRemoved;
char buffer[SIZE];
int count = 0;
int nextPut = 0;
int nextGet = 0;
};
void Pipe::put(char c) {
    // std::condition_variable::wait takes a std::unique_lock, not a bare mutex.
    // The unique_lock also releases the mutex when it goes out of scope.
    std::unique_lock<std::mutex> lock(mutex);
    while (count == SIZE) {
        charRemoved.wait(lock);
    }
    count++;
    buffer[nextPut] = c;
    nextPut++;
    if (nextPut == SIZE) {
        nextPut = 0;
    }
    charAdded.notify_one();
}
char Pipe::get() {
    char c;
    std::unique_lock<std::mutex> lock(mutex);
    while (count == 0) {
        charAdded.wait(lock);
    }
    count--;
    c = buffer[nextGet];
    nextGet++;
    if (nextGet == SIZE) {
        nextGet = 0;
    }
    charRemoved.notify_one();
    return c;
}
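To see the condition-variable version working with a real producer and consumer, here is a self-contained sketch. BoundedBuffer and transfer are hypothetical names, and the sketch uses the predicate overload of wait, which performs the while loop internally.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical bounded buffer: same idea as Pipe, written with predicate waits.
class BoundedBuffer {
public:
    explicit BoundedBuffer(std::size_t capacity) : buffer_(capacity) {
        assert(capacity > 0);
    }

    void put(char c) {
        std::unique_lock<std::mutex> lock(mutex_);
        // wait(lock, pred) keeps sleeping until pred() returns true
        char_removed_.wait(lock, [this] { return count_ < buffer_.size(); });
        buffer_[next_put_] = c;
        next_put_ = (next_put_ + 1) % buffer_.size();
        ++count_;
        char_added_.notify_one();
    }

    char get() {
        std::unique_lock<std::mutex> lock(mutex_);
        char_added_.wait(lock, [this] { return count_ > 0; });
        char c = buffer_[next_get_];
        next_get_ = (next_get_ + 1) % buffer_.size();
        --count_;
        char_removed_.notify_one();
        return c;
    }

private:
    std::mutex mutex_;
    std::condition_variable char_added_, char_removed_;
    std::vector<char> buffer_;
    std::size_t count_ = 0, next_put_ = 0, next_get_ = 0;
};

// Sends `text` through the buffer from a producer thread to the caller.
std::string transfer(const std::string& text, std::size_t capacity) {
    BoundedBuffer pipe(capacity);
    std::thread producer([&] {
        for (char c : text) pipe.put(c);
    });
    std::string received;
    for (std::size_t i = 0; i < text.size(); ++i) {
        received.push_back(pipe.get());
    }
    producer.join();
    return received;
}
```

With a capacity smaller than the message, the producer has to block several times, yet the characters should still arrive in the order they were added.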
Thread pool (C++11)
The main functions of a simple thread pool are as follows:
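#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
using namespace std;
class ThreadPool {
public:
    void startup();
    void QueueJob(const function<void()>& job);
    void stop();
    bool busy();
private:
    void ThreadLoop();
    bool should_terminate = false;       // tells the workers to stop looking for jobs
    mutex queue_mutex;                   // protects the job queue
    condition_variable mutex_condition;  // lets workers wait for new jobs or termination
    vector<thread> threads;
    queue<function<void()>> jobs;
};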
1. startup
Each thread should be running its own infinite loop, constantly waiting for new tasks to grab and run.
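void ThreadPool::startup()
{
    // Number of concurrent threads the hardware supports (only a hint; may be 0)
    const uint32_t num_threads = std::thread::hardware_concurrency();
    threads.resize(num_threads);
    // ThreadLoop is a non-static member function, so pass its address and the object
    for (uint32_t i = 0; i < num_threads; i++)
        threads.at(i) = std::thread(&ThreadPool::ThreadLoop, this);
}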
2. ThreadLoop
The infinite loop function. This is a while (true) loop that waits until a job appears in the task queue.
void ThreadPool::ThreadLoop() {
    while (true) {
        std::function<void()> job;
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            mutex_condition.wait(lock, [this] {
                return !jobs.empty() || should_terminate;
            });
            if (should_terminate) {
                return;
            }
            job = jobs.front();
            jobs.pop();
        }
        job();
    }
}
3. QueueJob
Add a new job to the pool; use a lock so that there isn’t a data race.
void ThreadPool::QueueJob(const std::function<void()>& job) {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        jobs.push(job);
    }
    mutex_condition.notify_one();
}
4. busy
The busy() function can be used in a while loop so that the main thread waits for the thread pool to drain its job queue before calling the thread pool destructor.
bool ThreadPool::busy() {
    bool poolbusy;
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        // The pool is busy while jobs are still waiting in the queue
        poolbusy = !jobs.empty();
    }
    return poolbusy;
}
5. stop
void ThreadPool::stop() {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        should_terminate = true;
    }
    // Wake every worker so each one sees should_terminate and returns
    mutex_condition.notify_all();
    for (std::thread& active_thread : threads) {
        active_thread.join();
    }
    threads.clear();
}
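The five pieces above can be assembled and exercised roughly as follows. This is a sketch under stated assumptions, not TinyServer's actual ThreadPool: the names SketchPool and count_with_pool are hypothetical, and the pool falls back to one worker when hardware_concurrency() returns 0.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical pool that mirrors the start / loop / queue / stop steps.
class SketchPool {
public:
    void start() {
        // hardware_concurrency() may return 0, so keep at least one worker
        const unsigned n = std::max(1u, std::thread::hardware_concurrency());
        for (unsigned i = 0; i < n; ++i) {
            threads_.emplace_back(&SketchPool::loop, this);
        }
    }

    void queue_job(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobs_.push(std::move(job));
        }
        cv_.notify_one();
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            should_terminate_ = true;
        }
        cv_.notify_all();
        for (std::thread& t : threads_) t.join();
        threads_.clear();
    }

private:
    void loop() {
        while (true) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return !jobs_.empty() || should_terminate_; });
                if (should_terminate_) return;
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            job();  // run outside the lock so other workers can take jobs
        }
    }

    bool should_terminate_ = false;
    std::mutex mutex_;
    std::condition_variable cv_;
    std::vector<std::thread> threads_;
    std::queue<std::function<void()>> jobs_;
};

// Runs `n` increment jobs on the pool and returns the final counter value.
int count_with_pool(int n) {
    assert(n >= 0);
    SketchPool pool;
    std::atomic<int> counter{0};
    pool.start();
    for (int i = 0; i < n; ++i) {
        pool.queue_job([&counter] { ++counter; });
    }
    // Wait until every job has actually run, not merely left the queue
    while (counter.load() < n) std::this_thread::yield();
    pool.stop();
    return counter.load();
}
```

Note that an empty queue does not mean the last jobs have finished running, which is why the usage function waits on the counter itself before stopping the pool.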
Source code analysis
Suggested reading order:
Basic function code
- MutexLock.h: encapsulates pthread_mutex
- epoll.h/epoll.cpp: encapsulates sys/epoll.h
- Utils.h/Utils.cpp: the basic server functions are in these files
- Timer.h/Timer.cpp
- Log.h/Log.cpp: outputs data to the terminal with colors
Advanced
- ThreadPool.h/ThreadPool.cpp: the server's thread pool
- HttpHandler.h/HttpHandler.cpp: the core of the server
- Main.cpp: server start
MutexLock
Wraps pthread_mutex in a class so that it is simpler to use.
Epoll
Epoll operations: isValid, create, add, modify, del, wait
isValid
checks whether the epoll fd is valid
create
wraps epoll_create1
add, modify, del
these three functions are the same at the root: each one calls epoll_ctl.
They differ only in the opcode argument:
// Valid opcodes ( "op" parameter ) to issue to epoll_ctl().
#define EPOLL_CTL_ADD 1 // Add a file descriptor to the interface.
#define EPOLL_CTL_DEL 2 // Remove a file descriptor from the interface.
#define EPOLL_CTL_MOD 3 // Change file descriptor epoll_event structure.
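A minimal sketch of how three wrappers can share one epoll_ctl call is shown here. The function names (epoll_ctl_op, sketch_add, sketch_modify, sketch_del) are hypothetical and are not the signatures used in epoll.h/epoll.cpp.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <sys/epoll.h>  // epoll_ctl and the EPOLL_CTL_* opcodes (Linux only)
#include <unistd.h>

// Hypothetical shared helper: only `op` changes between add, modify and del.
int epoll_ctl_op(int epfd, int op, int fd, std::uint32_t events) {
    assert(op == EPOLL_CTL_ADD || op == EPOLL_CTL_MOD || op == EPOLL_CTL_DEL);
    epoll_event ev{};
    ev.events = events;
    ev.data.fd = fd;
    return epoll_ctl(epfd, op, fd, &ev);  // 0 on success, -1 with errno set
}

int sketch_add(int epfd, int fd, std::uint32_t events) {
    return epoll_ctl_op(epfd, EPOLL_CTL_ADD, fd, events);
}

int sketch_modify(int epfd, int fd, std::uint32_t events) {
    return epoll_ctl_op(epfd, EPOLL_CTL_MOD, fd, events);
}

int sketch_del(int epfd, int fd) {
    // The event argument is ignored for EPOLL_CTL_DEL
    return epoll_ctl_op(epfd, EPOLL_CTL_DEL, fd, 0);
}
```

Adding a descriptor that is already registered fails with EEXIST, and modifying or deleting one that is not registered fails with ENOENT, so callers should check the return value.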
Utils
socket bind && listen
int socket_bind_and_listen(int port)
{
int listen_fd = 0;
// AF_INET : IPv4 Internet protocols
// SOCK_STREAM : TCP socket
if((listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0)) == -1)
return -1;
sockaddr_in server_addr;
memset(&server_addr, '\0', sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons((unsigned short)port);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int opt = 1;
if(setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1)
return -1;
if(bind(listen_fd, (sockaddr*)&server_addr, sizeof(server_addr)) == -1)
return -1;
if(listen(listen_fd, 1024) == -1)
return -1;
return listen_fd;
}
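readn
Reads up to len bytes from fd, looping until the buffer is full, the peer closes the connection, or the read would block (EAGAIN). Returns the number of bytes read, or -1 on error.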
ssize_t readn(int fd, void* buf, size_t len)
{
char *pos = (char*)buf;
size_t leftNum = len;
ssize_t readNum = 0;
while(leftNum > 0)
{
ssize_t tmpRead = read(fd, pos, leftNum);
if(tmpRead < 0)
{
if(errno == EINTR)
tmpRead = 0;
else if (errno == EAGAIN)
return readNum;
else
return -1;
}
// tmpRead == 0: the connection is closed (an interrupted read also ends up here)
if(tmpRead == 0)
break;
readNum += tmpRead;
pos += tmpRead;
leftNum -= tmpRead;
}
return readNum;
}
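One design choice worth noting: in readn an EINTR sets tmpRead to 0 and therefore leaves the loop. A possible variation, sketched below under the hypothetical name read_retrying, retries the read instead. This is an alternative for comparison, not the code used in Utils.cpp.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <unistd.h>  // read, ssize_t

// Hypothetical variant of readn: an interrupted read() is retried.
ssize_t read_retrying(int fd, void* buf, size_t len) {
    assert(buf != nullptr || len == 0);
    char* pos = static_cast<char*>(buf);
    size_t left = len;
    while (left > 0) {
        ssize_t n = read(fd, pos, left);
        if (n < 0) {
            if (errno == EINTR) continue;  // interrupted by a signal: try again
            if (errno == EAGAIN || errno == EWOULDBLOCK) break;  // nothing to read now
            return -1;                     // real error
        }
        if (n == 0) break;                 // end of file or peer closed
        pos += n;
        left -= static_cast<size_t>(n);
    }
    return static_cast<ssize_t>(len - left);
}
```

For example, if 10 bytes are written into a pipe and the write end is closed, read_retrying with a 16-byte buffer should return 10.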
writen
ssize_t writen(int fd, const void* buf, size_t len, bool isWrite)
{
char *pos = (char*)buf;
size_t leftNum = len;
ssize_t writtenNum = 0;
while(leftNum > 0)
{
ssize_t tmpWrite = 0;
if(isWrite)
tmpWrite = write(fd, pos, leftNum);
else
tmpWrite = send(fd, pos, leftNum, 0);
if(tmpWrite < 0)
{
if(errno == EINTR || errno == EAGAIN)
tmpWrite = 0;
else
return -1;
}
if(tmpWrite == 0)
break;
writtenNum += tmpWrite;
pos += tmpWrite;
leftNum -= tmpWrite;
}
return writtenNum;
}
escapeStr
Traverses the string and replaces every unprintable character.
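A minimal sketch of this kind of escaping is shown below. The function name escape_unprintable and the '?' replacement character are assumptions made for the example; the real escapeStr may substitute something else.

```cpp
#include <cassert>
#include <cctype>
#include <string>

// Hypothetical sketch: copies `in`, replacing each unprintable character
// with `replacement` ('?' is an assumption, not TinyServer's choice).
std::string escape_unprintable(const std::string& in, char replacement = '?') {
    assert(std::isprint(static_cast<unsigned char>(replacement)));
    std::string out;
    out.reserve(in.size());
    for (char ch : in) {
        // Cast to unsigned char: std::isprint is undefined for negative values
        bool printable = std::isprint(static_cast<unsigned char>(ch)) != 0;
        out.push_back(printable ? ch : replacement);
    }
    return out;
}
```

This is mainly useful before logging request data, so that control characters sent by a client cannot garble the terminal output.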
is_path_parent
Checks whether the child path lies inside the parent path, which guards against directory traversal.
if(child_p == strstr(child_p, parent_p)) {
// The parent is in the child, so the child [parent.len] will not cross the boundary
separator = child_p[strlen(parent_p)];
if (separator == '\0' || separator == '/')
return true;
}
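The fragment above can be turned into a complete function along these lines. This is a sketch using std::string under the hypothetical name path_is_under, and it assumes both paths have already been canonicalized (for example with realpath) and that the parent has no trailing slash.

```cpp
#include <cassert>
#include <string>

// Hypothetical sketch of the prefix-plus-separator check.
// Assumes canonical paths: no "..", no trailing '/' on `parent`.
bool path_is_under(const std::string& parent, const std::string& child) {
    assert(!parent.empty());
    // The child must start with the parent path
    if (child.compare(0, parent.size(), parent) != 0) return false;
    // Identical paths count as "inside"
    if (child.size() == parent.size()) return true;
    // Otherwise the next character must be a separator, so that
    // "/var/www-secret" is not treated as being under "/var/www"
    return child[parent.size()] == '/';
}
```

The separator check is the important part: a plain prefix comparison would accept sibling directories whose names merely begin with the parent's name.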
The advanced functions are commented in the source code, which you can find at
https://github.com/k3lpi3b4nsh33/SKR-WebServer