Async socket

I’m trying to build a streaming server using sockets in C.

First I tried Linux and io_uring. It works OK, but only for small files (~2 MB). With io_uring I’ve managed to do async read/write on the socket.

Then I read about FreeBSD and kqueue, so I decided to give it a try, and I need some help understanding how to do async read/write on a socket. As I understand it, kqueue lets you async-accept new connections, but how do you do async read/write on a socket on FreeBSD? Maybe aio_read() and aio_write() would help in this case, or is there a better way?
 
I had nearly the same program to develop in C: a server application that listens on a socket (HTTP(S) port) and processes requests from the clients. After some research I decided that the best solution for my case was kqueue+kevent to handle incoming connections, plus a pool of threads (pthreads) which pick up connections from the task queue, process the request and send data to the client. I think there is no need to use async IO because each connection is handled by a separate thread and it won't block the other connections.

If you have a similar task, I can share some example code if you need it.
 
There was a similar discussion on the forums not too long ago; you might find some parts of it useful:

https://forums.freebsd.org/threads/a-tiny-socket-to-tls-tunnel.88872/

Personally I would go for async/non-blocking sockets first and then think about adding threads on top of the async code if you need to deal with a very high number of connecting clients.

With threads on their own, you are basically adding contention during context switching, so you start losing performance once more clients connect than you have cores on your machine.
 
With threads on their own, you are basically adding contention during context switching, so you start losing performance once more clients connect than you have cores on your machine.
Well, it mainly depends on what needs to be done for each client connection. In my case, it processes an image file and sends it to the client. File processing is a SYNC operation: I cannot reply to the client until the processed file is ready to send. So, processing files in ASYNC mode is useless or even redundant in my case.

Yes, threads perform context switches. But the time for a context switch is very low compared with the file processing operation. According to my tests, async file processing actually slows my program down a little.
 
How big is your server? How many concurrent connections? Is there heavy interleaving between connections?

Here's what I would do: The simplest thing possible. A synchronous server, one thread per connection, each thread blocking. It might just work. It is probably a bit inefficient. But with modern CPUs being so fast, and at moderate workloads, it might make no difference in practice.

I would even go further: If you get to pick the protocol, find an existing RPC framework, and just use it.
 
How big is your server? How many concurrent connections? Is there heavy interleaving between connections?
Here is my project, already written in TCL, which works perfectly in production under high load: https://github.com/iron-udjin/tcl-img-proxy
I'm trying to rewrite it in C with graphics/vips as a conversion backend.
Here's what I would do: The simplest thing possible. A synchronous server, one thread per connection, each thread blocking. It might just work. It is probably a bit inefficient. But with modern CPUs being so fast, and at moderate workloads, it might make no difference in practice.
I use a different approach: incoming connections are handled by kqueue, and each connection is processed in a separate thread which puts a conversion job into the conversion queue. A thread from the pool of threads picks the job up, processes the image file and writes the result to the socket. The thread pool allows flexible management of server resources such as CPU load and memory consumption.
Everything described above is already implemented, works perfectly, and is 40% faster than my previous implementation with graphics/ImageMagick7 as the conversion backend.
I would even go further: If you get to pick the protocol, find an existing RPC framework, and just use it.
I don't want to use any frameworks or libraries like devel/libevent or devel/libuv. My program has to be as small as possible, without extra dependencies, and follow the KISS principle.
And yes, I know there are a lot of tools already doing the things I'm trying to implement, but they don't fit my needs. Ultimately I'm doing this just for fun, to study C and OS internals (kqueue, pthreads, etc.).
 
it processes an image file and sends it to the client. File processing is a SYNC operation: I cannot reply to the client until the processed file is ready to send. So, processing files in ASYNC mode is useless or even redundant in my case.
If possible I would recommend re-architecting your image processing code to be asynchronous, even if it only works on small parts of the job within each timeslice.

Imagine you have 1000 connecting clients, you would not want to start 1000 threads, each running the blocking image processing function. This is not so scalable. Instead I would make it all asynchronous but then (if you are looking at massive amounts of clients) create a threadpool of size $MAX_CORES to share the work.

If realistically, your server only needs to handle a couple of clients at a time, then yes ignore the above. The slight waste from potential over-utilisation of threads per connection is likely worth the greatly simplified codebase. Basically agreeing with ralphbsz.
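For what it's worth, sizing that pool to the machine's core count is cheap to do at startup. A rough sketch (worker_main here is a hypothetical thread function, not something from this thread):

C:
#include <pthread.h>
#include <unistd.h>

/* Create one worker per online core, so heavy image jobs never
 * oversubscribe the CPU no matter how many clients connect. */
void start_worker_pool(void *(*worker_main)(void *)) {
    long ncores = sysconf(_SC_NPROCESSORS_ONLN);
    if (ncores < 1)
        ncores = 1;

    for (long i = 0; i < ncores; i++) {
        pthread_t tid;
        if (pthread_create(&tid, NULL, worker_main, NULL) == 0)
            pthread_detach(tid);
    }
}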
 
If possible I would recommend re-architecting your image processing code to be asynchronous, even if it only works on small parts of the job within each timeslice.

Imagine you have 1000 connecting clients, you would not want to start 1000 threads, each running the blocking image processing function. This is not so scalable. Instead I would make it all asynchronous but then (if you are looking at massive amounts of clients) create a threadpool of size $MAX_CORES to share the work.
The thread for an incoming connection is only there to process the request (do some basic URL checks and parse parameters) and to put the conversion task into the task queue. It doesn't impact any other part of the application. After that, connections are processed according to the thread pool size. In that case I won't have any overflow or server overload.
If realistically, your server only needs to handle a couple of clients at a time, then yes ignore the above. The slight waste from potential over-utilisation of threads per connection is likely worth the greatly simplified codebase. Basically agreeing with ralphbsz.
My application doesn't handle connections from clients directly. So, there is no problem using a thread per connection, because there is www/nginx between the application and the client which does all the validation of incoming connections. It also caches the requested images.
 
The thread for an incoming connection is only there to process the request (do some basic URL checks and parse parameters) and to put the conversion task into the task queue.
Oh right; unless I am misunderstanding, that seems to contradict your previous description:

Well, it mainly depends on what needs to be done for each client connection. In my case, it processes an image file and sends it to the client. File processing is a SYNC operation

So if each client is *not* doing any synchronous operations and is instead simply adding stuff to a queue for later processing (perhaps on another thread), it really does seem wasteful to spawn a thread for each one. Instead, non-blocking sockets (i.e. fcntl(sockfd, F_SETFL, O_NONBLOCK) with select(2) or poll(2)) do seem like the correct choice. There is some info on this in Beej's guide.
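To illustrate, a minimal sketch of making a socket non-blocking (sockfd is just a placeholder, nothing from this thread's code):

C:
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>

/* Switch an already-open socket to non-blocking mode; recv()/send()
 * then return -1 with EAGAIN/EWOULDBLOCK instead of blocking when the
 * socket isn't ready. */
int set_nonblocking(int sockfd) {
    int flags = fcntl(sockfd, F_GETFL, 0);
    if (flags < 0)
        return -1;
    return fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
}

On EAGAIN you simply go back to select(2)/poll(2) and retry once the socket is reported readable or writable.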

My application doesn't handle connections from clients directly. So, there is no problem using a thread per connection, because there is www/nginx between the application and the client which does all the validation of incoming connections. It also caches the requested images.
In this case, your "client" connection is nginx. I don't think that changes too much (unless you are using e.g. inetd(8), which changes a fair amount ;)).
 
So if each client is *not* doing any synchronous operations and is instead simply adding stuff to a queue for later processing (perhaps on another thread), it really does seem wasteful to spawn a thread for each one. Instead, non-blocking sockets (i.e. fcntl(sockfd, F_SETFL, O_NONBLOCK) with select(2) or poll(2)) do seem like the correct choice. There is some info on this in Beej's guide.
Even if the incoming-connection processing thread doesn't perform the image conversion operation, it will still block the accept queue until it finishes processing the current connection.
Possibly I could replace processing the connection in a separate thread (the handle_client function) with some other non-blocking call.
In the connection read loop it sends the request to a thread and after that it's free to handle other requests.
Here is the code:

C:
int main(int argc, char **argv) {
    struct sockaddr_in sockaddr_in;
    struct sockaddr_un sockaddr_un;
    int kq;

    // compile regex
    if (!compile_regex()) {
        printf("Regex compilation failed!");
    }

    init_vars();

    if (parse_cmdline_params(argc, argv)) {
        return 1;
    }

    signal(SIGINT, signal_handler);

    start_conv_threads();
    // Create TCP socket
    if (unix_socket[0] != '\0') {
        listen_sock = socket(AF_UNIX, SOCK_STREAM, 0);
    } else {
        listen_sock = socket(AF_INET, SOCK_STREAM, 0);
        // Set socket options to reuse address and port
        int optval = 1;
        struct linger timeout;
        timeout.l_onoff = 1;
        timeout.l_linger = 1;

        setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
        setsockopt(listen_sock, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
        setsockopt(listen_sock, SOL_SOCKET, SO_LINGER, &timeout, sizeof(timeout));
        setsockopt(listen_sock, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)); // SO_KEEPALIVE takes an int flag
    }
    if (listen_sock < 0) {
        perror("socket");
        exit(1);
    }

    if (unix_socket[0] != '\0') {
        // Bind Unix Socket
        memset(&sockaddr_un, 0, sizeof(sockaddr_un));
        sockaddr_un.sun_family = AF_UNIX;
        strncpy(sockaddr_un.sun_path, unix_socket, sizeof(sockaddr_un.sun_path) - 1);
    } else {
        // Bind socket to port
        memset(&sockaddr_in, 0, sizeof(sockaddr_in));
        sockaddr_in.sin_family = AF_INET;
        sockaddr_in.sin_port = htons(port);
        sockaddr_in.sin_addr.s_addr = inet_addr(ip);
    }
    
    if (unix_socket[0] != '\0') {
        if (bind(listen_sock, (struct sockaddr *)&sockaddr_un, strlen(sockaddr_un.sun_path) +
                    sizeof(sockaddr_un.sun_len) + sizeof (sockaddr_un.sun_family)) < 0) {
            perror("bind");
            exit(EXIT_FAILURE);
        }
        mode_t socket_permissions = 0666;
        if (chmod(unix_socket, socket_permissions) == -1) {
            perror("chmod");
            exit(EXIT_FAILURE);
        }
    } else {
        if (bind(listen_sock, (struct sockaddr *)&sockaddr_in, sizeof(sockaddr_in)) < 0) {
            perror("bind");
            exit(EXIT_FAILURE);
        }
    }

    // Listen for incoming connections
    if (listen(listen_sock, BACKLOG) < 0) {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    // Create kqueue
    kq = kqueue();
    if (kq < 0) {
        perror("kqueue");
        exit(EXIT_FAILURE);
    }

    // Register server socket with kqueue for read events
    struct kevent ev;
    EV_SET(&ev, listen_sock, EVFILT_READ, EV_ADD, 0, 0, NULL);
    if (kevent(kq, &ev, 1, NULL, 0, NULL) < 0) {
        perror("kevent");
        exit(1);
    }

    // Loop for handling events
    while (1) {
        struct kevent event;
        int n = kevent(kq, NULL, 0, &event, 1, NULL);

        if (n < 0) {
            perror("kevent");
            exit(1);
        }
        int *arg;
        if (event.ident == listen_sock) {
            int client_socket = accept(listen_sock, NULL, NULL);
            if (client_socket < 0) {
                perror("accept");
                exit(1);
            }
            arg = malloc(sizeof(*arg));
            *arg = client_socket;
            pthread_t tid;
            if (pthread_create(&tid, NULL, handle_client, arg) != 0) {
                perror("pthread_create");
                free(arg);
                continue;
            }
            pthread_detach(tid);
        } else if (event.filter == EVFILT_READ) {
            pthread_join(*(pthread_t*)event.udata, NULL);
        }
    }
}

In this case, your "client" connection is nginx. I don't think that changes too much (unless you are using e.g. inetd(8), which changes a fair amount ;)).
It changes a lot, because 99% of incoming file requests are served from the nginx cache. Also, for security, in nginx I can cache 404s and limit the number of connections from a single IP, which helps protect my application from DoS and other types of attacks. (Please see the application logic at the GitHub repo linked above.)
 
Slightly tongue-in-cheek (only slightly). Since this is BSD, why all this talk about threads? Isn't the traditional model of forking a new process for each incoming connection the BSD way? I mean, resident size after fork shouldn't increase by that much, you still have all your file descriptors, and each client environment is isolated better than when using threads.
 
Slightly tongue-in-cheek (only slightly). Since this is BSD, why all this talk about threads? Isn't the traditional model of forking a new process for each incoming connection the BSD way? I mean, resident size after fork shouldn't increase by that much, you still have all your file descriptors, and each client environment is isolated better than when using threads.
Forking copies the main process for each request. The copy is not a cheap operation and is redundant in my case.
For each file conversion operation I need to prepare variables and an environment for graphics/vips and then process the image file. So the best way is to pre-spawn the threads in the pool when the program starts, with the conversion environment ready, and handle conversion operations there.
 
Even if the incoming-connection processing thread doesn't perform the image conversion operation, it will still block the accept queue until it finishes processing the current connection.
I think one issue I see is that you are blocking on accept() and using threads to get around this. Instead, either set the socket non-blocking so it returns -1 with EWOULDBLOCK / EAGAIN, or make sure to select() / poll() for readability before calling accept().

Other than that, it is really what is in the handle_client function that may make the difference (either needing a refactor or indeed genuinely wanting to be on another thread). Also, if it uses blocking calls (i.e. recv()) without making those sockets non-blocking, it might be a little tricky to stop the threads cleanly when the application exits.

A small network daemon I wrote not too long ago is convoluted as hell to use as an example, but you can see the non-blocking listen socket approach here.
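Roughly, the non-blocking accept pattern looks like this (a sketch only, assuming O_NONBLOCK has already been set on listen_sock; handle_new_client is a hypothetical hand-off function):

C:
#include <errno.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>

/* Drain every connection currently waiting in the backlog of a
 * non-blocking listening socket; returns when accept(2) would block. */
void accept_all(int listen_sock, void (*handle_new_client)(int)) {
    for (;;) {
        int client = accept(listen_sock, NULL, NULL);
        if (client < 0) {
            if (errno == EWOULDBLOCK || errno == EAGAIN)
                return;              /* backlog is empty, nothing blocked */
            perror("accept");
            return;
        }
        handle_new_client(client);   /* queue it, register it, etc. */
    }
}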
It changes a lot, because 99% of incoming file requests are served from the nginx cache. Also, for security, in nginx I can cache 404s and limit the number of connections from a single IP, which helps protect my application from DoS and other types of attacks. (Please see the application logic at the GitHub repo linked above.)
From a code point of view, whether the request has gone through a load balancer, cache, etc., typically shouldn't change the design too much.
 
I think one issue I see is that you are blocking on accept() and using threads to get around this. Instead, either set the socket non-blocking so it returns -1 with EWOULDBLOCK / EAGAIN, or make sure to select() / poll() for readability first.
Where do I have a blocking accept()? In the code above I subscribe to the listening socket event. When an event comes in, I send the request for processing in a separate thread.
Other than that, it is really what is in the handle_client function that may need a refactor.
What do you mean? handle_client is running in a separate thread. Whatever runs there doesn't affect other parts of the program. The only thing that could be improved, as I see it, is to replace the client request handling thread with something lighter and non-blocking which doesn't do a context switch. What could I potentially use there?
From a code point of view, whether the request has gone through a load balancer, cache, etc., typically shouldn't change the design too much.
Typically, yes. But in my case nginx is an important part of the application in general, because it takes on the job of caching and filtering incoming requests.
 
Where do I have a blocking accept()? In the code above I subscribe to the listening socket event. When an event comes in, I send the request for processing in a separate thread.
Good point, I was looking for FD_SET but you have the equivalent via the EV_SET(&ev, listen_sock, EVFILT_READ) stuff. Indeed this is fine, but you then block on the kevent() instead. That time might be better spent waiting for read/write on your other sockets too (I discuss this a little further down).
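For example, each accepted client socket could be registered with the same kqueue, so the single kevent() loop reports both new connections and readable clients. A rough sketch (watch_client is a made-up helper, not part of the code above):

C:
#include <fcntl.h>
#include <sys/types.h>
#include <sys/event.h>

/* After accept(): make the client socket non-blocking and register it
 * with the existing kqueue, so the same kevent() loop that watches the
 * listening socket also reports when this client becomes readable. */
void watch_client(int kq, int client_socket) {
    fcntl(client_socket, F_SETFL,
          fcntl(client_socket, F_GETFL, 0) | O_NONBLOCK);

    struct kevent ev;
    EV_SET(&ev, client_socket, EVFILT_READ, EV_ADD, 0, 0, NULL);
    kevent(kq, &ev, 1, NULL, 0, NULL);
}

In the event loop, event.ident then tells you which descriptor fired: the listening socket means "call accept()", any other ident means "recv() from that client won't block".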

What do you mean? handle_client is running in a separate thread.
I am suggesting that, depending on what is inside handle_client, it might not warrant spawning a thread in the first place. If all it is doing is adding something to, say, a queue, then it may not deserve its own thread being created. Especially since you can avoid recv()/write() blocking.

to replace the client request handling thread with something lighter and non-blocking which doesn't do a context switch. What could I potentially use there?
Possibly the clients could be added to a simple array and then an FD_SET built up to poll/select them for read/write. Only when they can be read (similar to what you are doing with kevent() and accept() further up) do you call recv(), knowing that it won't block (especially if the O_NONBLOCK attribute has been set on the sockfd beforehand).

I do this in the wait() function in this code example. It means that every socket (including the listening one) gets polled at once and can then be processed asynchronously without needing a separate thread to handle it. Something like this, *combined* with a thread pool, could manage many more clients than threads alone (obviously most of our software doesn't need to do that!).
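Something along those lines, as a sketch only (clients[] and nclients are hypothetical bookkeeping kept by the accept path):

C:
#include <sys/types.h>
#include <sys/select.h>
#include <sys/socket.h>

/* Build an fd_set from the already-accepted client sockets and recv()
 * only from the ones select() reports as readable. */
void poll_clients(const int *clients, int nclients) {
    fd_set readfds;
    FD_ZERO(&readfds);

    int maxfd = -1;
    for (int i = 0; i < nclients; i++) {
        FD_SET(clients[i], &readfds);
        if (clients[i] > maxfd)
            maxfd = clients[i];
    }

    struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };  /* 1 s timeout */
    if (select(maxfd + 1, &readfds, NULL, NULL, &tv) <= 0)
        return;

    for (int i = 0; i < nclients; i++) {
        if (FD_ISSET(clients[i], &readfds)) {
            char buf[4096];
            ssize_t n = recv(clients[i], buf, sizeof(buf), 0);
            (void)n;  /* hand the data to the request parser here */
        }
    }
}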
 
I am suggesting that, depending on what is inside handle_client, it might not warrant spawning a thread in the first place. If all it is doing is adding something to, say, a queue, then it may not deserve its own thread being created.
parse_request() - parses the HTTP request, extracts and validates parameters, decrypts the hash with the encrypted parameters, and so on. It's too heavy to process without a separate thread.
C:
void *handle_client(void *arg) {
    int socket = *(int *)arg;
    free(arg);
    char request[BUFFER_SIZE];
    client_data * data;
    memset(request, 0, BUFFER_SIZE);

    // Read request from client
    ssize_t n = recv(socket, request, BUFFER_SIZE - 1, 0);
    if (n < 0) {
        perror("recv");
        close(socket);
        return NULL;
    } else if (n == 0) {
        printf("Connection closed\n");
        close(socket);
    } else {
        data = parse_request(request);
        if (data == NULL) {
            send_http_headers(404, socket);
        } else {
            data->socket = socket;
            submitTask(data);
        }        
    }
    return NULL;
}
submitTask() - adds the parsed and validated data to the queue. data is a pointer to a struct with the conversion parameters.
C:
typedef struct {
    char * url;
    char * filename;
    convert_params params;
    int socket;
} client_data;

client_data *taskQueue[1024];
pthread_cond_t condQueue;
pthread_mutex_t mutexQueue;

void submitTask(client_data *data) {
    pthread_mutex_lock(&mutexQueue);
    taskQueue[taskCount] = data;      // append to the fixed 1024-entry queue (no overflow check here)
    taskCount++;
    pthread_mutex_unlock(&mutexQueue);
    pthread_cond_signal(&condQueue);  // wake one waiting converter thread
}

ConverterThread() is a thread from the thread pool:
C:
void* ConverterThread(void* args) {
    client_data *data;
    char name[16];
    pthread_getname_np(pthread_self(), name, sizeof(name));
    // initialization of some variables for image conversion operations

    while (1) {
        pthread_testcancel();
        // ----------------Begin Critical section --------------------
        pthread_mutex_lock(&mutexQueue);
        while (taskCount == 0) {
            pthread_cond_wait(&condQueue, &mutexQueue);
        }

        data = taskQueue[0];
        //int i;
        for (register int i = 0; i < taskCount - 1; i++) {
            taskQueue[i] = taskQueue[i + 1];
        }
        taskCount--;
        pthread_mutex_unlock(&mutexQueue);
        // ----------------End Critical section -----------------------
      
        // here is converison operation
        // ...
    }
}
As you can see, the thread pool logic is very simple. Before submitTask() adds a job to the queue, it locks the queue with a pthread_mutex_lock() call. When the job has been added, it unlocks the queue and signals the thread pool that there is a new job in the queue. One of the free ConverterThread() workers picks the job up and does it.
 
Slightly tongue-in-cheek (only slightly). Since this is BSD, why all this talk about threads? Isn't the traditional model of forking a new process for each incoming connection the BSD way? I mean, resident size after fork shouldn't increase by that much, you still have all your file descriptors, and each client environment is isolated better than when using threads.
That's not "the BSD way", that's "the ancient way". Servers were implemented that way before sane and standardized APIs for both threads and non-blocking/async I/O appeared. It comes with a huge overhead and additionally makes it unnecessarily complicated if the processes serving two different clients must communicate. It won't scale well, creating processes does come with a cost, and you can't have an unlimited amount of them.

I'm almost done finally releasing a lib that contains the async socket code I've developed myself and have been using for a few years now. It uses pselect(). So, it won't scale to a huge number of concurrent(!) connections either (it probably starts to perform worse with more than 100 and stops working at all with more than 1000), but on the plus side, it's perfectly portable to any POSIX system, still small without extra dependencies and still works a lot better than forking (or having a separate thread for each client). If you're interested, here's the source (with a link to API docs): https://github.com/Zirias/poser

If you indeed need to scale to a huge number of concurrent connections, then indeed, platform-specific event-based APIs (like FreeBSD's kqueue) are the way to go. There are libraries abstracting them like e.g. libev.
 
parse_request() - parses the HTTP request, extracts and validates parameters, decrypts the hash with the encrypted parameters, and so on. It's too heavy to process without a separate thread.
I'm not sure; imagine you had 100 clients connecting per second. Spawning 100 threads per second might end up being more expensive than a thread pool of, say, 8, doing non-blocking reads on ~12 clients each.

And then up to 1000 clients a second and... well you would probably have load balancers and multiple servers by that point anyway. ;)

How many clients are you looking to support? I may just be making pointless noise with my random suggestions, haha!

The converter thread is generally fine, especially if the algorithm used to process the images isn't easily made resumable or incremental. The mutexes aren't held for longer than they need to be (I see the processing happens at the end, after the job has been taken from the queue and the mutex released).

Out of interest, why are you using the register keyword for your for loop?
 
If you're interested, here's the source (with a link to API docs): https://github.com/Zirias/poser
Thank you, will take a look.
If you indeed need to scale to a huge number of concurrent connections, then indeed, platform-specific event-based APIs (like FreeBSD's kqueue) are the way to go.
That's what I use for handling incoming connections: kqueue (see the code above).
There are libraries abstracting them like e.g. libev.
I'd rather not depend on any of those libraries. It's not such a big project, and it's easy to use the platform-specific mechanisms for efficient event (connection) handling.
If I ever need Linux support, I can easily add epoll() event handling.
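For reference, the epoll counterpart of the kqueue registration above would look roughly like this (a sketch only; setup_epoll is a made-up name):

C:
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>

/* Linux counterpart of kqueue()/EV_SET/kevent(): create an epoll
 * instance and register the listening socket for readability. The
 * event loop then calls epoll_wait() instead of kevent(). */
int setup_epoll(int listen_sock) {
    int epfd = epoll_create1(0);
    if (epfd < 0) {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }

    struct epoll_event ev = { .events = EPOLLIN, .data.fd = listen_sock };
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sock, &ev) < 0) {
        perror("epoll_ctl");
        exit(EXIT_FAILURE);
    }
    return epfd;
}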
 
Does the overhead for whatever concurrency scheme on the webserving side of things really matter if the operation performed on any HTTP request is a full image conversion? Is that conversion via a C library or via fork/exec of a commandline program?

It's not as if the caching is handled by this program anyway.

And why can't you use sendfile(2)? That should be fastest, assuming you have the output file on disk and the filesystem is not ZFS.
 
I'm not sure; imagine you had 100 clients connecting per second. Spawning 100 threads per second might end up being more expensive than a thread pool of, say, 8, doing non-blocking reads on ~12 clients each.

And then up to 1000 clients a second and... well you would probably have load balancers and multiple servers by that point anyway. ;)

How many clients are you looking to support? I may just be making pointless noise with my random suggestions, haha!
The TCL version of the converter handled around 300 concurrent conversion requests at peak, but sometimes it jumps up to 1000 RPS. (It's a popular mass-media project.)
Out of interest, why are you using the register keyword for your for loop?
When I use register for short-lived and intensively used variables (a loop counter being one of them), it increases performance a little bit because the variable stays in a register instead of on the stack.
I know that's not a big enough loop for it to matter there. It's just a habit.
 
Does the overhead for whatever concurrency scheme on the webserving side of things really matter if the operation performed on any HTTP request is a full image conversion?
According to my tests and workload, yes, it matters and increases overall application speed by 70% under high load.
Is that conversion via a C library or via fork/exec of a commandline program?
In my TCL version (link above), yes, it uses the command-line magick program. But I want to rewrite it in C (just for fun) and use libvips as the conversion backend via library calls (not the command line, of course). According to my tests, under high load libvips consumes 30% less RAM and is 40% faster than the same operations performed by ImageMagick (the comparison was between MagickResizeImage() and vips_thumbnail()).
And why can't you use sendfile(2)? That should be fastest, assuming you have the output file on disk and the filesystem is not ZFS.
If I'm not mistaken, sendfile() only helps when you need to send a file that is stored on disk, without modifications. In my case, I read the file, perform operations on it and send the result into the socket. It seems there is no benefit for me from sendfile().
 
sendfile() on FreeBSD was largely reworked by Netflix for Netflix :) since they do video streaming. It avoids copying huge files from kernel space to userspace. So the use cases for sendfile() are video/file hosting. For small files there is no benefit; better to use nginx's aio for them.

P.S.: As far as I remember, sendfile() doesn't work well with ZFS. Is this still the case, or has something changed?
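For completeness, a minimal FreeBSD sendfile(2) call looks roughly like this (file_fd and sock are hypothetical descriptors; nbytes = 0 means "send until end of file"):

C:
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>

/* Send an on-disk file straight to a socket without copying it through
 * userspace; sbytes reports how many bytes were actually sent. */
int send_whole_file(int file_fd, int sock) {
    off_t sbytes = 0;
    if (sendfile(file_fd, sock, 0, 0, NULL, &sbytes, 0) < 0) {
        perror("sendfile");
        return -1;
    }
    return 0;
}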
 