DKVS: Distributed Key-Value Store --- Parallelism
Introduction
The aim of this week's work is to add some parallelism, of a different kind in each case, to both our clients and servers.
Up to now, clients sent their messages and waited for the responses serially, one after the other. This simplifies the code but prevents the servers from working concurrently, which limits the overall performance. In this last week, we're going to change that and allow clients to first send all their messages, and then retrieve and process the replies as they arrive, so that the servers can work concurrently.
Similarly, each server also responds serially to all the possible clients that might contact it. This is not much of an issue in our current project since server replies are very quick, but let's also take the opportunity to parallelize this part via threads.
Notice however that, in order to lower the workload, we do not ask you to multithread the client.
Client allowing concurrent servers
Regarding the client, we would now like the "put" operation to first send its N messages, without worrying about responses, and then (afterwards) declare a success as soon as W positive replies have been received (within the allotted time linked to the opening of the corresponding socket; otherwise, as before, it's a failure; this point doesn't change).
Similarly, the "get" operation will also first send its N messages, without worrying about replies, and then declare a success as soon as R identical replies have been received (under the same delay conditions as above).
However, it's quite possible that a (typically slow) server will continue to respond to a request that has already been completed, perhaps interfering with another request from the same client. To avoid such a collision, you now have to open a new UDP socket for each "put" or "get" operation performed (but not for each server contacted for the same operation!). This will prevent UDP messages from an old operation being received by a new one.
In concrete terms, you will need to revise network.c, since it's now up to network_get() and network_put() to open a new socket each time they're called, and then change the communication policy as explained above.
We also take the opportunity to check that we actually got a reply from one of our known servers: check (using the last argument of udp_read()) that a received reply indeed comes from a server of our ring.
Notes:
- client.[ch]: the client no longer needs a single socket associated with it, since sockets will now be associated with operations ("get" or "put"); however, for backward compatibility, we will still keep it in the client_t structure, and you may still have that (now useless) socket opened by the client (client_init());
- we don't ask you to change dkvs-dump-ring;
- we don't ask you to do any multi-threading in the client.
Multithreaded server
The main problem with the current server design is that we open only one single socket for communication and that this socket is blocking: only one single communication can occur at a time. Although not really harmful in our project, since server operations are very quick, this is not convenient in general. You can actually see it for yourself by arbitrarily slowing down the server: add a usleep(500000); (or more) before any server reply...
The most advanced way to solve this kind of problem is to poll non-blocking connections (using poll(), or even epoll() for larger servers).
In this project, we choose to implement a simpler way, which also illustrates the lectures you recently had: multithreaded blocking connections.
Each client request will be handled in a new thread, thus allowing several parallel communications with the server.
But then, of course, all access to the hash-table must be protected: any interaction with it must be locked against the other threads and unlocked as soon as the interaction with the hash-table is over.
Lock hash-table access
In dkvs-server.c, lock all access to the hash-table:
- declare a global variable of type pthread_mutex_t;
- initialize it in main() (see the pthread_mutex_init() man-page) and release it at the end (see pthread_mutex_destroy());
- add a lock (pthread_mutex_lock()) and an unlock around every access to the hash-table (a priori three in your code).
Multi-threaded reply to clients
It's the reply to the client which will be threaded, i.e. the part of your code that handles a client's request and replies to it. So first, if you haven't already, you have to make this part a function. Since the generic prototype of a threaded function is
void* to_do(void* arguments);
you first have to create a data structure (e.g. struct client_data_t) to store all the required data as the new thread's argument. These are:
- the client address (type struct sockaddr_in);
- the socket used (type int);
- the read/write buffer (an array of MAX_MSG_SIZE + 1 characters);
- the size of its (used) content (type ssize_t); this will correspond to the former return value of udp_read();
- the hash-table used (type Htable_t*).
Then, if you don't have one already, define the void* handle_client(void* arguments);
function which regroups all your former code about replying to the client (calls to server_dump()
, server_get()
or server_put()
depending on the content of buffer read from the client).
Of course, you have to cast the received void* arguments to a struct client_data_t* to properly get all the required information from it.
Before making that function a new thread, maybe give it a first try to check that you didn't break anything so far.
Once this is checked, you can proceed to making it multithreaded.
We thus first have to create a thread in the "udp_receive()" part (depending on how you coded it, either the infinite loop in your main() or the part just after the udp_read() from the client):
- have the content of your client_data_t (the one to be passed to the handle_client() function) stored on the heap; and, of course, release it whenever needed (in handle_client(); don't forget error cases);
- create some pthread attributes and initialize them to PTHREAD_CREATE_DETACHED; see the pthread_attr_init() and pthread_attr_setdetachstate() man-pages; notice that "detached" threads automatically release their resources on exit (but then there is no way to get their return value; we'll ignore them; you can thus simply return NULL);
- create a thread (see pthread_create()) that will run handle_client() with the proper client_data_t as a parameter;
- don't forget to release the pthread_attr_t with pthread_attr_destroy().
Note: this is a practice exercise for programming threads in C. There is thus a part of understanding on your side here: reviewing your lecture slides, reading man-pages(, asking questions).
We also recommend, for the sake of tracing/debugging, adding a debug_printf() with an appropriate message at the beginning and at the end of handle_client(), typically indicating the thread id with pthread_self(). You could also print the corresponding error message, ERR_MSG(error), if you got some error.
Signal handling
Now that handle_client()
is multi-threaded, we don't want the SIGTERM
and SIGINT
signals to be intercepted by it (but leave them to the main thread).
For this, simply add this code at the beginning of handle_client()
:
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGINT );
sigaddset(&mask, SIGTERM);
pthread_sigmask(SIG_BLOCK, &mask, NULL);
Testing with many servers and many client requests at the same time
Test the multithreaded approach by launching several client requests at the same time. You can do this for instance by:
- Launching all the servers, maybe with some artificial reply delay (usleep()) as pointed out in the beginning:
killall dkvs-server
cut -d' ' -f 1-2 servers.txt | sort -u | while read line; do i=$(($i + 1)); ./dkvs-server $line >LOG$i.txt 2>&1 & done
- then have a few (2-5) terminals ready; in one of them, launch a hundred "put" requests:
for i in $(seq 100); do ./dkvs-client put -- key$i value$i; done
while in each of the others you launch a hundred "get" requests:
for i in $(seq 100); do ./dkvs-client get -- key$i; done
See how this behaves differently with or without multithreading (without multithreading, some of these requests are slowed down by the others; the more terminals in parallel, the better you'll see it).
Also have a look at the number of threads a server has launched (in the multithreaded version). For instance:
grep thread_start LOG1.txt | sort -n | uniq -c
if you used the word "thread_start" somewhere in your debugging message at the beginning of handle_client(). (This can, of course, be any unique text; what matters is that those lines contain the thread id.)
Final submission
So this is the end! Next week will indeed be "free": no new content, only time to finalize your project before the deadline, which is:
SUNDAY JUNE 01, 11:59pm
For this deadline, there is nothing special to be done, except to commit and push, and to provide a (short) README.md
file which must contain:
- what you did and what you did not in the project (= up to where you went);
- any particular remark about your project (specific aspect, changes in the conception, ...);
- anything else you want us to know (about the project).
Don't forget to push everything before the above deadline. The content of your project will be the state of your main
branch at the deadline (in case this is relevant for you: thus don't forget to merge your branch(es) into the main
branch).
IMPORTANT NOTE: don't push the slow version of your dkvs-server: don't leave any usleep() (nor sleep()) in your final version!