DKVS: Distributed Key-Value Store --- Parallelism

Introduction

The aim of this week's work is to add a bit of parallelism, in different forms, to both our clients and our servers.

Up to now, clients have sent their messages and waited for the responses serially, one after the other. This simplifies the code but prevents the servers from working concurrently, which limits overall performance. In this last week, we're going to change that and allow clients to first send all their messages, and then retrieve and process replies as they arrive, so that the servers can now work concurrently.

Similarly, each server also responds serially to all the possible clients that might contact it. This is not much of an issue in our current project since server replies are very quick, but let's also take the opportunity to parallelize this part via threads.

Notice however that, in order to lower the workload, we do not ask you to multithread the client.

Client allowing concurrent servers

Regarding the client, we would now like the "put" operation to first send its N messages, without worrying about responses, and only afterwards declare a success as soon as W positive replies have been received. These replies must arrive within the allotted time linked to the opening of the corresponding socket; otherwise, as before, the operation is a failure (this point doesn't change).

Similarly, the "get" operation will also first send its N messages, without worrying about replies, and then declare a success as soon as R identical replies have been received (under the same delay conditions as above).
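As an illustration of the "as soon as R identical replies" rule, here is a minimal sketch of a reply tally. It is not part of the provided code: the names tally_init() and tally_add(), as well as the two size bounds, are hypothetical and chosen only for this example. For "put", the same idea reduces to counting positive replies until W is reached.

```c
#include <stddef.h>
#include <string.h>

#define TALLY_MAX_DISTINCT 16   /* hypothetical bound on distinct replies */
#define TALLY_MAX_VALUE    256  /* hypothetical bound on a reply's size   */

/* One distinct reply value and the number of servers that sent it. */
struct tally_entry {
    char   value[TALLY_MAX_VALUE];
    size_t len;
    size_t count;
};

struct tally {
    struct tally_entry entries[TALLY_MAX_DISTINCT];
    size_t used;
};

void tally_init(struct tally *t) { t->used = 0; }

/* Records one reply. Returns 1 as soon as this value has been seen
   `quorum` times, 0 if not yet, -1 if the reply cannot be stored. */
int tally_add(struct tally *t, const char *value, size_t len, size_t quorum)
{
    if (len > TALLY_MAX_VALUE) return -1;

    /* Already seen this value? Then just bump its counter. */
    for (size_t i = 0; i < t->used; ++i) {
        struct tally_entry *e = &t->entries[i];
        if (e->len == len && memcmp(e->value, value, len) == 0) {
            ++e->count;
            return e->count >= quorum ? 1 : 0;
        }
    }

    /* First time we see this value: store it. */
    if (t->used == TALLY_MAX_DISTINCT) return -1;
    struct tally_entry *e = &t->entries[t->used++];
    memcpy(e->value, value, len);
    e->len = len;
    e->count = 1;
    return e->count >= quorum ? 1 : 0;
}
```

The receive loop would call tally_add() once per accepted reply and stop reading as soon as it returns 1, or when the read times out.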

However, it's quite possible that a (typically slow) server will continue to respond to a request that has already been completed, perhaps interfering with another request from the same client. To avoid such a collision, you now have to open a new UDP socket for each "put" or "get" operation performed (but not for each server contacted for the same operation!). This will prevent UDP messages from an old operation being received by a new one.

In concrete terms, you will need to revise network.c, since it's now up to network_get() and network_put() to open a new socket each time they're called, then change the communication policy as explained above.
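Opening one socket per operation could look like the sketch below, which uses plain POSIX calls only. The helper name open_operation_socket() is hypothetical; if your project already has a socket-creation helper that takes a timeout, calling that one from network_get() and network_put() is the natural choice.

```c
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>

/* Opens a fresh UDP socket whose reads give up after timeout_s seconds.
   Returns the descriptor, or -1 on error. */
int open_operation_socket(long timeout_s)
{
    int fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0) return -1;

    /* SO_RCVTIMEO bounds each individual read, not the whole operation. */
    struct timeval tv = { .tv_sec = timeout_s, .tv_usec = 0 };
    if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) != 0) {
        close(fd);
        return -1;
    }
    return fd;
}
```

Whatever helper you use, remember to close() the socket on every exit path of the operation, including the error ones; late replies from slow servers are then simply dropped by the system.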

We also take the opportunity now to check that we actually got a reply from one of our known servers: check (using the last argument of udp_read()) that a received reply indeed belongs to our ring.
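The membership check can be sketched as follows, assuming the ring's server addresses are available as an array of struct sockaddr_in. The function names is_known_server() and make_addr() are hypothetical, and how you actually iterate over your ring depends on your own data structures.

```c
#include <stddef.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>

/* Returns 1 if `from` matches the IP and port of one of the n servers, else 0. */
int is_known_server(const struct sockaddr_in *from,
                    const struct sockaddr_in *servers, size_t n)
{
    for (size_t i = 0; i < n; ++i) {
        if (servers[i].sin_addr.s_addr == from->sin_addr.s_addr &&
            servers[i].sin_port == from->sin_port) {
            return 1;
        }
    }
    return 0;
}

/* Helper for the example: builds an IPv4 address from text and a host-order port. */
struct sockaddr_in make_addr(const char *ip, unsigned short port)
{
    struct sockaddr_in a;
    memset(&a, 0, sizeof(a));
    a.sin_family = AF_INET;
    a.sin_port = htons(port);
    inet_pton(AF_INET, ip, &a.sin_addr);
    return a;
}
```

Comparing the two fields explicitly, rather than using memcmp() on the whole structure, avoids false mismatches caused by padding bytes. A reply from an unknown sender would simply be ignored and not counted towards W or R.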

Notes:

  1. client.[ch]: the client no longer needs a single socket associated with it, since sockets will now be associated with operations ("get" or "put"); however, for backward compatibility, we will still keep it in the client_t structure, and client_init() may still open that (now useless) socket;
  2. we don't ask you to change dkvs-dump-ring;
  3. we don't ask you to do any multi-threading in the client.

Multithreaded server

The main problem with the current server design is that we open only one single socket for the communication and that this socket is blocking: only one single communication can occur at a time. Although not really harmful in our project since server operations are very quick, this is not convenient in general. You can see the effect for yourself by artificially slowing down the server, adding a usleep(500000); (or more) before any server reply.

The most advanced way to solve this kind of problem is to poll non-blocking connections (using poll(), or even epoll for larger servers). In this project, we choose a simpler way, which also illustrates the lectures you recently had: multithreaded blocking connections. Each reply to a client will be handled in a new thread, thus allowing several parallel communications with the server.

But then, of course, all access to the hash-table must be protected by a lock: any interaction with it must lock out the other threads, and the lock must be released as soon as the interaction with the hash-table is over.

Lock hash-table access

In dkvs-server.c, lock all access to the hash-table:

  1. declare a global variable of type pthread_mutex_t;
  2. initialize it in main() (see pthread_mutex_init() man-page) and destroy it at the end (see pthread_mutex_destroy());
  3. add a lock (pthread_mutex_lock()) and an unlock (pthread_mutex_unlock()) around every access to the hash-table (a priori three in your code).
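The three steps above can be sketched as follows. To keep the example self-contained, a single string slot stands in for the real hash-table; table_slot, locked_put() and locked_get() are hypothetical names, and in your code the protected section would contain the calls to your own hash-table functions instead.

```c
#include <pthread.h>
#include <stddef.h>
#include <string.h>

/* Stand-in for the real hash-table: one slot, just enough to show locking. */
static char table_slot[64];

/* Step 1: a global mutex protecting the table. */
static pthread_mutex_t table_mutex;

/* Step 2: to be called at the start and at the end of main(). */
int table_lock_init(void)    { return pthread_mutex_init(&table_mutex, NULL); }
int table_lock_destroy(void) { return pthread_mutex_destroy(&table_mutex); }

/* Step 3: every access is bracketed by lock/unlock. */
void locked_put(const char *value)
{
    pthread_mutex_lock(&table_mutex);
    strncpy(table_slot, value, sizeof(table_slot) - 1);
    table_slot[sizeof(table_slot) - 1] = '\0';
    pthread_mutex_unlock(&table_mutex);
}

void locked_get(char *out, size_t out_size)
{
    if (out_size == 0) return;
    pthread_mutex_lock(&table_mutex);
    /* Copy the value out while still holding the lock. */
    strncpy(out, table_slot, out_size - 1);
    out[out_size - 1] = '\0';
    pthread_mutex_unlock(&table_mutex);
}
```

Note that the reader copies the value before unlocking: a pointer into the table kept after the unlock could be invalidated by another thread's "put".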

Multi-threaded reply to clients

It's the reply to the client which will be threaded, i.e. the part of your code that handles a client's request and replies to it. So first, if you haven't already, you have to make this part a function. Since the generic prototype of a threaded function is

void* to_do(void* arguments);

you first have to create a data structure (e.g. struct client_data_t) to store all the required data as the new thread's argument. These are:

  • the client address (type struct sockaddr_in);
  • the socket used (type int);
  • the read/write buffer (an array of MAX_MSG_SIZE + 1 characters);
  • the size of its (used) content (type ssize_t); this will correspond to the former return value of udp_read();
  • the hash-table used (type Htable_t*).
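A possible layout for this structure is sketched below. The field names and the client_data_new() helper are hypothetical; MAX_MSG_SIZE and Htable_t come from the project, and are replaced here by placeholders only so that the example compiles on its own.

```c
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <netinet/in.h>

#ifndef MAX_MSG_SIZE
#define MAX_MSG_SIZE 1024        /* placeholder: the project defines the real value */
#endif

typedef struct Htable Htable_t;  /* placeholder: opaque stand-in for the project's type */

struct client_data_t {
    struct sockaddr_in client_addr;              /* who sent the request        */
    int                sock;                     /* socket to reply on          */
    char               buffer[MAX_MSG_SIZE + 1]; /* request, then reply         */
    ssize_t            length;                   /* bytes used in buffer        */
    Htable_t          *table;                    /* shared hash-table           */
};

/* Copies one received request into a heap block that the new thread will own.
   Returns NULL if the length is invalid or the allocation fails. */
struct client_data_t *client_data_new(const struct sockaddr_in *addr, int sock,
                                      const char *msg, ssize_t length,
                                      Htable_t *table)
{
    if (length < 0 || length > MAX_MSG_SIZE) return NULL;

    struct client_data_t *d = calloc(1, sizeof(*d));
    if (d == NULL) return NULL;

    d->client_addr = *addr;
    d->sock = sock;
    memcpy(d->buffer, msg, (size_t) length);
    d->length = length;
    d->table = table;
    return d;
}
```

Copying the message into the structure matters: the main loop reuses its own buffer for the next udp_read(), so a thread holding only a pointer to that buffer could see it overwritten.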

Then, if you don't have one already, define the void* handle_client(void* arguments); function which gathers all your former code about replying to the client (calls to server_dump(), server_get() or server_put() depending on the content of the buffer read from the client).

Of course you have to cast the received void* arguments to a struct client_data_t* (a pointer) to properly get all the required information from it.

Before running that function in a new thread, it may be worth calling it directly to check that you didn't break anything so far.
Once this is checked, you can proceed to making it multithreaded.

So we first have to create a thread in the "udp_receive()" part (depending on how you coded it, either the infinite loop in your main() or the part just after the udp_read() from the client):

  1. store the content of your client_data_t (the data to be passed to the handle_client() function) on the heap; and, of course, release it whenever needed (in handle_client(); don't forget error cases);
  2. create some pthread attributes and set their detach state to PTHREAD_CREATE_DETACHED; see pthread_attr_init() and pthread_attr_setdetachstate() man-pages; notice that "detached" threads automatically release their resources on exit (but then there is no way to get their return value; we'll ignore it; you can thus simply return NULL);
  3. create a thread (see pthread_create()) that will run handle_client() with the proper client_data_t as a parameter;
  4. don't forget to release the pthread_attr_t with pthread_attr_destroy().
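Steps 2 to 4 above could be grouped as in the following sketch. All names in it (spawn_detached(), demo_worker(), wait_for_workers()) are hypothetical; demo_worker() only stands in for handle_client() to show that the thread, not the spawner, frees its heap-allocated argument.

```c
#include <pthread.h>
#include <stdlib.h>

/* Runs fn(arg) in a detached thread. Returns 0 on success, or a pthread
   error code; on failure the caller still owns arg and must free it. */
int spawn_detached(void *(*fn)(void *), void *arg)
{
    pthread_attr_t attr;
    int err = pthread_attr_init(&attr);
    if (err != 0) return err;

    err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (err == 0) {
        pthread_t tid;  /* not kept: a detached thread is never joined */
        err = pthread_create(&tid, &attr, fn, arg);
    }
    pthread_attr_destroy(&attr);
    return err;
}

/* Bookkeeping used only by this example to observe that workers ran. */
static pthread_mutex_t done_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  done_cond  = PTHREAD_COND_INITIALIZER;
static int done_count = 0;

/* Stand-in for handle_client(): owns and frees its argument. */
void *demo_worker(void *arguments)
{
    int *value = arguments;
    free(value);

    pthread_mutex_lock(&done_mutex);
    ++done_count;
    pthread_cond_signal(&done_cond);
    pthread_mutex_unlock(&done_mutex);
    return NULL;
}

/* Blocks until at least `expected` workers have finished. */
void wait_for_workers(int expected)
{
    pthread_mutex_lock(&done_mutex);
    while (done_count < expected) {
        pthread_cond_wait(&done_cond, &done_mutex);
    }
    pthread_mutex_unlock(&done_mutex);
}
```

In the server, the call would be made right after a successful udp_read(); if it returns an error, the main loop frees the client data itself, since no thread will.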

Note: this is a practice exercise for programming threads in C. Part of the work is thus understanding it on your side: reviewing your lecture slides, reading man-pages (and asking questions).

We also recommend, for the sake of tracing/debugging, to add an appropriate debug_printf() message at the beginning and at the end of handle_client(), typically indicating the thread id with pthread_self(). You could also print the corresponding error message ERR_MSG(error) if you got some error.
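One way to build such a trace line is sketched below. The function name format_thread_trace() and the message layout are hypothetical; the resulting string would then be passed to debug_printf(). The cast of pthread_t to an integer is an assumption that holds on Linux with glibc but is not guaranteed by POSIX.

```c
#include <pthread.h>
#include <stdio.h>

/* Writes "<event> thread=<id>" into buf and returns snprintf()'s result. */
int format_thread_trace(char *buf, size_t size, const char *event)
{
    /* Assumption: pthread_t converts to an integer (true on Linux/glibc). */
    unsigned long id = (unsigned long) pthread_self();
    return snprintf(buf, size, "%s thread=%lu", event, id);
}
```

Calling it with one fixed word at the start of handle_client() and another at the end makes the log easy to filter with grep afterwards.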

Signal handling

Now that handle_client() is multi-threaded, we don't want the SIGTERM and SIGINT signals to be intercepted by it (but leave them to the main thread). For this, simply add this code at the beginning of handle_client():

    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, SIGINT );
    sigaddset(&mask, SIGTERM);
    pthread_sigmask(SIG_BLOCK, &mask, NULL);

Testing with many servers and many client requests at the same time

Test the multithreaded approach by launching several client requests at the same time. You can do this for instance by:

  1. launching all the servers, maybe with some artificial reply delay (usleep()) as pointed out in the beginning:

     killall dkvs-server
     cut -d' ' -f 1-2 servers.txt | sort -u | while read line; do i=$(($i + 1)); ./dkvs-server $line >LOG$i.txt 2>&1 & done

  2. then having a few (2-5) terminals ready; in one of them, launch a hundred "put" requests:

     for i in $(seq 100); do ./dkvs-client put -- key$i value$i; done

     while in each of the others you launch a hundred "get" requests:

     for i in $(seq 100); do ./dkvs-client get -- key$i; done

See how this behaves differently with or without multithreading (without multithreading some of these requests are slowed by the others; the more terminals in parallel the better you'll see that).

Also have a look at the number of threads a server has launched (in the multithreaded version). For instance:

grep thread_start LOG1.txt | sort -n | uniq -c

if you used the word "thread_start" somewhere in the debugging message at the beginning of handle_client(). (This can, of course, be any unique text; what matters is that those lines contain the thread id.)

Final submission

So this is the end! Next week will indeed be "free": no new content, only time to finalize your project before the deadline, which is:
SUNDAY JUNE 01, 11:59pm

For this deadline, there is nothing special to be done, except to commit and push, and to provide a (short) README.md file which must contain:

  • what you did and what you did not do in the project (= up to where you went);
  • any particular remark about your project (specific aspect, changes in the design, ...);
  • anything else you want us to know (about the project).

Don't forget to push everything before the above deadline. The content of your project will be the state of your main branch at the deadline (in case this is relevant for you: thus don't forget to merge your branch(es) into the main branch).

IMPORTANT NOTE: don't push the slow version of your dkvs-server: don't leave any usleep() (nor sleep()) in your final version!