Tiny Server

Introduction

After 3 days of study and code, today I will explain to you the source code TinyServer .
In my opion, TinyServer functions like modifying some files with file descriptors.
We have to make the thread pool for this program, and have a knowledge about thread lock.

Basic knowledge

File descriptor

what exactly is FD in Linux? 1
what exactly is FD in Linux? 2
A file descriptor (file descriptor) is an index created by the kernel to **efficiently manage** these open files. It is a non-negative integer (usually a small integer) used to represent open files. All system calls to perform I/O operations are implemented through a file descriptor.

Epoll

Epoll reference article 1
Wikipedia Epoll
The full name of epoll is eventpoll, which is an implementation of Linux kernel to implement IO multiplexing (IO multiplexing). IO multiplexing means listening to multiple input and output sources at the same time in one operation, returning when one or more of the input and output sources are available, and then performing read and write operations on them.

Thread lock

Lock: an object that can only be owned by a single thread at any given time (C++ class std::mutex). Operations on a lock:

  • lock: mark the lock as owned by the current thread; if some other thread already owns the lock then first wait until the lock is free. Lock typically includes a queue to keep track of waiting threads.
  • unlock: mark the lock as free (it must currently be owned by the calling thread).

Too much milk solution with locks (using C++ library APIs):

std::mutex mutex;
...
mutex.lock();
if (milk == 0) {
  buy_milk();
}
mutex.unlock();

Producer / Consumer

  • Producers add characters to a buffer
  • Consumers remove characters from the buffer
  • Characters will be removed in the same order added

Version 1

class Pipe {
    Pipe() {}
    void put(char c);
    char get();

    std::mutex mutex;
    char buffer[SIZE];
    int count = 0;
    int nextPut = 0;
    int nextGet = 0;
};

void Pipe::put(char c) {
    mutex.lock();
    count++;
    buffer[nextPut] = c;
    nextPut++;
    if (nextPut == SIZE) {
        nextPut = 0;
    }
    mutex.unlock();
}

char Pipe::get() {
    char c;
    mutex.lock();
    count--;
    c = buffer[nextGet];
    nextGet++;
    if (nextGet == SIZE) {
        nextGet = 0;
    }
    mutex.unlock();
    return c;
}

Version2 handle full and empty situations

class Pipe {
    Pipe() {}
    void put(char c);
    char get();

    std::mutex mutex;
    char buffer[SIZE];
    int count = 0;
    int nextPut = 0;
    int nextGet = 0;
};

void Pipe::put(char c) {
    mutex.lock();
    while (count == SIZE) {
        mutex.unlock();
        mutex.lock();
    }
    count++;
    buffer[nextPut] = c;
    nextPut++;
    if (nextPut == SIZE) {
        nextPut = 0;
    }
    mutex.unlock();
}

char Pipe::get() {
    char c;
    mutex.lock();
    while (count == 0) {
        mutex.unlock();
        mutex.lock();
    }
    count--;
    c = buffer[nextGet];
    nextGet++;
    if (nextGet == SIZE) {
        nextGet = 0;
    }
    mutex.unlock();
    return c;
}

Version3 with condition variables

  • notify_one(): if any threads are waiting on condition, wake up one of them.
  • notify_all(): same as notify, except wake up all waiting threads.
class Pipe {
     Pipe() {}
     void put(char c);
     char get();

     std::mutex mutex;
     std::condition_variable charAdded, charRemoved;
     char buffer[SIZE];
     int count = 0;
     int nextPut = 0;
     int nextGet = 0;
 };

 void Pipe::put(char c) {
     mutex.lock();
     while (count == SIZE) {
         charRemoved.wait(mutex);
     }
     count++;
     buffer[nextPut] = c;
     nextPut++;
     if (nextPut == SIZE) {
         nextPut = 0;
     }
     charAdded.notify_one();
     mutex.unlock();
 }

 char Pipe::get() {
     char c;
     mutex.lock();
     while (count == 0) {
         charAdded.wait(mutex);
     }
     count--;
     c = buffer[nextGet];
     nextGet++;
     if (nextGet == SIZE) {
         nextGet = 0;
     }
     charRemoved.notify_one();
     mutex.unlock();
     return c;
 }

Thread pool(C++11)

The easy thread pool main function are as follows:

#include <vector>
#include <queue>
using namespace std;

class ThreadPool{
public:
	void startup();
	void QueueJob(const std::function<void()& job);
	void stop();
	void busy();

private:
	void ThreadLoop();
	bool terminate = false;
	mutex queue_mutex;
	vector<thread> threads;
	queue<function<void()>> jobs;

}

1. startup

Each thread should be running its own infinite loop, constantly waiting for new tasks to grab and run.

void ThreadPool::start()
{
    // Max # of threads the system supports
    const uint32_t num_threads = std::thread::hardware_concurrency();
    threads.resize(num_threads);
    for(uint32_t i = 0; i < num_threads; i++)
        threads.at(i) = std::thread(ThreadLoop);
}

2. ThreadLoop

The infinite loop function. This is a while(true) loop waiting for the task queue to open up.

void ThreadPool::ThreadLoop() {
    while (true) {
        std::function<void()> job;
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            mutex_condition.wait(lock, [this] {
                return !jobs.empty() || should_terminate;
            });
            if (should_terminate) {
                return;
            }
            job = jobs.front();
            jobs.pop();
        }
        job();
    }
}

3. QueueJob

Add a new job to the pool; use a lock so that there isn’t a data race.

void ThreadPool::QueueJob(const std::function<void()>& job) {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        jobs.push(job);
    }
    mutex_condition.notify_one();
}

4. busy

The busy()function can be used in a while loop, such that the main thread can wait the threadpool to complete all the tasks before calling the threadpool destructor.

void ThreadPool::busy() {
    bool poolbusy;
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        poolbusy = jobs.empty();
    }
    return poolbusy;
}

5. stop

void ThreadPool::Stop() {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        should_terminate = true;
    }
    mutex_condition.notify_all();
    for (std::thread& active_thread : threads) {
        active_thread.join();
    }
    threads.clear();
}

Analysis source code

suggested reading order:
Basic function code
MutexLock.h encapsulates from pthread_mutex
epoll.h/epoll.cpp encapsulates from sys/epoll.h
Utils.h/Utils.cpp the basic server functions in these files.
Timer.h/Timer.cpp
Log.h/Log.cppOutput the data to Terminal with colors.

**Advanced **
**ThreadPool.h/ThreadPool.cpp**** **The threadpool from server
HttpHandler.h/HttpHandler.cpp The core of the server.
Main.cpp Server Start

MutexLock

simplify, encapsulate, and use

Epoll

Epoll operation: isValid, create, add, modify, del, wait

isVaild determine whether epoll fd is normal

create function is encapsulated from epoll_create1

add, modify, del functions are the same, the root is epoll_ctl.
Only diffierent by arguments:

#define EPOLL_CTL_ADD 1
// Valid opcodes ( "op" parameter ) to issue to epoll_ctl().
// Add a file descriptor to the interface.

#define EPOLL_CTL_DEL 2
// Remove a file descriptor from the interface.

#define EPOLL_CTL_MOD 3
// Change file descriptor epoll_event structure.

Utils

socket bind && listen

int socket_bind_and_listen(int port)
{
    int listen_fd = 0;
    // AF_INET      : IPv4 Internet protocols
    // SOCK_STREAM  : TCP socket
    if((listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0)) == -1)
        return -1;

    sockaddr_in server_addr;
    memset(&server_addr, '\0', sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons((unsigned short)port);
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);


    int opt = 1;
    if(setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1)
        return -1;
    if(bind(listen_fd, (sockaddr*)&server_addr, sizeof(server_addr)) == -1)
        return -1;
    if(listen(listen_fd, 1024) == -1)
        return -1;

    return listen_fd;
}

Readn read the data

ssize_t readn(int fd, void* buf, size_t len)
{
    char *pos = (char*)buf;
    size_t leftNum = len;
    ssize_t readNum = 0;
    while(leftNum > 0)
    {
        ssize_t tmpRead = read(fd, pos, leftNum);
        if(tmpRead < 0)
        {
            if(errno == EINTR)
                tmpRead = 0;
            else if (errno == EAGAIN)
                return readNum;
            else
                return -1;
        }
        // tmpread == 0 ,the connection is close
        if(tmpRead == 0)
            break;
        readNum += tmpRead;
        pos += tmpRead;

        leftNum -= tmpRead;
    }
    return readNum;
}

writen

ssize_t writen(int fd, const void* buf, size_t len, bool isWrite)
{
    char *pos = (char*)buf;
    size_t leftNum = len;
    ssize_t writtenNum = 0;
    while(leftNum > 0)
    {
        ssize_t tmpWrite = 0;

        if(isWrite)
            tmpWrite = write(fd, pos, leftNum);
        else
            tmpWrite = send(fd, pos, leftNum, 0);

        if(tmpWrite < 0)
        {
            if(errno == EINTR || errno == EAGAIN)
                tmpWrite = 0;
            else
                return -1;
        }
        if(tmpWrite == 0)
            break;
        writtenNum += tmpWrite;
        pos += tmpWrite;
        leftNum -= tmpWrite;
    }
    return writtenNum;
}

escapeStr Traversal the string, and replace the char which is unprintable.

is_path_parent Determine whether there is a directory traversal vulnerability

if(child_p == strstr(child_p, parent_p)) {
        // The parent is in the child, so the child [parent.len] will not cross the boundary
        separator = child_p[strlen(parent_p)];
        if (separator == '\0' || separator == '/')
            return true;
    }

Advanced functions have comments in the source code, you can find them from
https://github.com/k3lpi3b4nsh33/SKR-WebServer