079096d86c1699e5dcc2bbe85dd14902cf0bfb86
[oweals/gnunet.git] / src / testbed / gnunet-service-testbed_barriers.c
1 /*
2   This file is part of GNUnet.
3   (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file testbed/gnunet-service-testbed_barriers.c
23  * @brief barrier handling at the testbed controller
24  * @author Sree Harsha Totakura <sreeharsha@totakura.in> 
25  */
26
27 #include "gnunet-service-testbed.h"
28
29 /**
30  * timeout for outgoing message transmissions in seconds
31  */
32 #define MESSAGE_SEND_TIMEOUT(s) \
33   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
34
35
36 /**
37  * Barrier
38  */
39 struct Barrier;
40
41
42 /**
43  * Message queue for transmitting messages
44  */
45 struct MessageQueue
46 {
47   /**
48    * next pointer for DLL
49    */
50   struct MessageQueue *next;
51
52   /**
53    * prev pointer for DLL
54    */
55   struct MessageQueue *prev;
56
57   /**
58    * The message to be sent
59    */
60   struct GNUNET_MessageHeader *msg;
61 };
62
63 /**
64  * Context to be associated with each client
65  */
66 struct ClientCtx
67 {
68   /**
69    * The barrier this client is waiting for
70    */
71   struct Barrier *barrier;
72
73   /**
74    * DLL next ptr
75    */
76   struct ClientCtx *next;
77
78   /**
79    * DLL prev ptr
80    */
81   struct ClientCtx *prev;
82
83   /**
84    * The client handle
85    */
86   struct GNUNET_SERVER_Client *client;
87
88   /**
89    * the transmission handle
90    */
91   struct GNUNET_SERVER_TransmitHandle *tx;
92
93   /**
94    * message queue head
95    */
96   struct MessageQueue *mq_head;
97
98   /**
99    * message queue tail
100    */
101   struct MessageQueue *mq_tail;
102 };
103
104
105 /**
106  * Barrier
107  */
108 struct Barrier
109 {
110   /**
111    * The hashcode of the barrier name
112    */
113   struct GNUNET_HashCode hash;
114
115   /**
116    * The name of the barrier
117    */
118   char *name;
119
120   /**
121    * DLL head for the list of clients waiting for this barrier
122    */
123   struct ClientCtx *head;
124
125   /**
126    * DLL tail for the list of clients waiting for this barrier
127    */
128   struct ClientCtx *tail;
129
130   /**
131    * Number of peers which have reached this barrier
132    */
133   unsigned int nreached;
134
135   /**
136    * Number of slaves we have initialised this barrier
137    */
138   unsigned int nslaves;
139
140   /**
141    * Quorum percentage to be reached
142    */
143   uint8_t quorum;
144   
145   /**
146    * Was there a timeout while propagating initialisation
147    */
148   uint8_t timedout;
149 };
150
151
152 /**
153  * Hashtable handle for storing initialised barriers
154  */
155 static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
156
157 /**
158  * Service context
159  */
160 static struct GNUNET_SERVICE_Context *ctx;
161
162
163 /**
164  * Function called to notify a client about the connection
165  * begin ready to queue more data.  "buf" will be
166  * NULL and "size" zero if the connection was closed for
167  * writing in the meantime.
168  *
169  * @param cls client context
170  * @param size number of bytes available in buf
171  * @param buf where the callee should write the message
172  * @return number of bytes written to buf
173  */
174 static size_t 
175 transmit_ready_cb (void *cls, size_t size, void *buf)
176 {
177   struct ClientCtx *ctx = cls;
178   struct GNUNET_SERVER_Client *client = ctx->client;
179   struct MessageQueue *mq;
180   struct GNUNET_MessageHeader *msg;
181   size_t wrote;
182
183   ctx->tx = NULL;
184   wrote = 0;
185   if ((0 == size) || (NULL == buf))
186   {
187     GNUNET_assert (NULL != ctx->client);
188     GNUNET_SERVER_client_drop (ctx->client);
189     ctx->client = NULL;    
190     return 0;
191   }
192   mq = ctx->mq_head;
193   msg = mq->msg;
194   wrote = ntohs (msg->size);
195   GNUNET_assert (size >= wrote);
196   (void) memcpy (buf, msg, wrote);
197   GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
198   GNUNET_free (mq->msg);
199   GNUNET_free (mq);
200   if (NULL != (mq = ctx->mq_head))
201     ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
202                                                   MESSAGE_SEND_TIMEOUT (30),
203                                                   &transmit_ready_cb, ctx);
204   return wrote;
205 }
206
207
208 /**
209  * Queue a message into a clients message queue
210  *
211  * @param ctx the context associated with the client
212  * @param msg the message to queue.  Will be consumed
213  * @param suspended is the client suspended at the time of calling queue_message
214  */
215 static void
216 queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
217 {
218   struct MessageQueue *mq;
219   struct GNUNET_SERVER_Client *client = ctx->client;
220   
221   mq = GNUNET_malloc (sizeof (struct MessageQueue));
222   mq->msg = msg;
223   GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
224   if (NULL == ctx->tx)
225    ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
226                                                   MESSAGE_SEND_TIMEOUT (30),
227                                                   &transmit_ready_cb, ctx);
228 }
229
230
231 #if 0
232 /**
233  * Function to remove a barrier from the barrier map and cleanup resources
234  * occupied by a barrier
235  *
236  * @param barrier the barrier handle
237  */
238 static void
239 remove_barrier (struct Barrier *barrier)
240 {
241   GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
242                                                                     &barrier->hash,
243                                                                     barrier));
244   GNUNET_free (barrier->name);
245   GNUNET_free (barrier);
246 }
247
248
249 /**
250  * Function called upon timeout while waiting for a response from the
251  * subcontrollers to barrier init message
252  *
253  * @param 
254  * @return 
255  */
256 static void
257 fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
258 {
259   struct ForwardedOperationContext *foctx = cls;
260   struct Barrier *barrier = foctx->cls;
261   
262   barrier->nslaves--;
263   barrier->timedout = GNUNET_YES;
264   if (0 == barrier->nslaves)
265   {
266     GST_send_operation_fail_msg (foctx->client, foctx->operation_id,
267                                  "Timeout while contacting a slave controller");
268     remove_barrier (barrier);
269   }
270 }
271 #endif
272
273 /**
274  * Task for sending barrier crossed notifications to waiting client
275  *
276  * @param cls the barrier which is crossed
277  * @param tc scheduler task context
278  */
279 static void
280 notify_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
281 {
282   struct Barrier *barrier = cls;
283   struct ClientCtx *client_ctx;
284   struct GNUNET_TESTBED_BarrierStatus *msg;
285   struct GNUNET_MessageHeader *dup_msg;
286   uint16_t name_len;
287   uint16_t msize;
288
289   name_len = strlen (barrier->name) + 1;
290   msize = sizeof (struct GNUNET_TESTBED_BarrierStatus) + name_len;  
291   msg = GNUNET_malloc (msize);
292   msg->header.size = htons (msize);
293   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
294   msg->status = 0;
295   msg->name_len = htons (name_len);
296   (void) memcpy (msg->data, barrier->name, name_len);
297   msg->data[name_len] = '\0';
298   while (NULL != (client_ctx = barrier->head))
299   {
300     dup_msg = GNUNET_copy_message (&msg->header);
301     queue_message (client_ctx, dup_msg);
302     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
303     GNUNET_SERVER_client_set_user_context_ (client_ctx->client, NULL, 0);
304     GNUNET_free (client_ctx);
305   }
306 }
307
308
309 /**
310  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages.  This
311  * message should come from peers or a shared helper service using the
312  * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
313  *
314  * This handler is queued in the main service and will handle the messages sent
315  * either from the testbed driver or from a high level controller
316  *
317  * @param cls NULL
318  * @param client identification of the client
319  * @param message the actual message
320  */
321 static void
322 handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
323                      const struct GNUNET_MessageHeader *message)
324 {
325   const struct GNUNET_TESTBED_BarrierWait *msg;
326   struct Barrier *barrier;
327   char *name;
328   struct ClientCtx *client_ctx;
329   struct GNUNET_HashCode key;
330   size_t name_len;
331   uint16_t msize;
332   
333   msize = ntohs (message->size);
334   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
335   {
336     GNUNET_break_op (0);
337     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
338     return;
339   }
340   if (NULL == barrier_map)
341   {
342     GNUNET_break (0);
343     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
344     return;
345   }
346   msg = (const struct GNUNET_TESTBED_BarrierWait *) message;
347   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
348   name = GNUNET_malloc (name_len + 1);
349   name[name_len] = '\0';
350   (void) memcpy (name, msg->name, name_len);
351   GNUNET_CRYPTO_hash (name, name_len - 1, &key);
352   if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
353   {
354     GNUNET_break (0);
355     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
356     GNUNET_free (name);
357     return;
358   }
359   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
360   if (NULL == client_ctx)
361   {
362     client_ctx = GNUNET_malloc (sizeof (struct ClientCtx));
363     client_ctx->client = client;
364     GNUNET_SERVER_client_keep (client);
365     client_ctx->barrier = barrier;
366     GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
367     barrier->nreached++;
368     if ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
369       notify_task_cb (barrier, NULL);
370   }
371   GNUNET_SERVER_receive_done (client, GNUNET_OK);
372 }
373
374
375 /**
376  * Functions with this signature are called whenever a client
377  * is disconnected on the network level.
378  *
379  * @param cls closure
380  * @param client identification of the client; NULL
381  *        for the last call when the server is destroyed
382  */
383 static void
384 disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
385 {
386   struct ClientCtx *client_ctx;
387   struct Barrier *barrier;
388   
389   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
390   if (NULL == client_ctx)
391     return;
392   barrier = client_ctx->barrier;
393   GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
394   if (NULL != client_ctx->tx)
395     GNUNET_SERVER_notify_transmit_ready_cancel (client_ctx->tx);
396   
397 }
398
399
400 /**
401  * Function to initialise barrriers component
402  */
403 void
404 GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
405 {
406   static const struct GNUNET_SERVER_MessageHandler message_handlers[] = {
407     {&handle_barrier_wait, NULL, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT, 0},
408     {NULL, NULL, 0, 0}
409   };
410   struct GNUNET_SERVER_Handle *srv;
411
412   barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES);
413   ctx = GNUNET_SERVICE_start ("testbed-barrier", cfg,
414                               GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN);
415   srv = GNUNET_SERVICE_get_server (ctx);
416   GNUNET_SERVER_add_handlers (srv, message_handlers);
417   GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);  
418 }
419
420
421 /**
422  * Function to stop the barrier service
423  */
424 void
425 GST_barriers_stop ()
426 {
427   GNUNET_assert (NULL != ctx);
428   GNUNET_SERVICE_stop (ctx);
429 }
430
431
432 /**
433  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages.  This
434  * message should always come from a parent controller or the testbed API if we
435  * are the root controller.
436  *
437  * This handler is queued in the main service and will handle the messages sent
438  * either from the testbed driver or from a high level controller
439  *
440  * @param cls NULL
441  * @param client identification of the client
442  * @param message the actual message
443  */
444 void
445 GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
446                          const struct GNUNET_MessageHeader *message)
447 {
448   const struct GNUNET_TESTBED_BarrierInit *msg;
449   const char *name;
450   struct Barrier *barrier;
451   struct Slave *slave;
452   struct GNUNET_HashCode hash;
453   size_t name_len;
454   uint64_t op_id;
455   unsigned int cnt;
456   uint16_t msize;
457   
458   if (NULL == GST_context)
459   {
460     GNUNET_break_op (0);
461     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
462     return;
463   }
464   if (client != GST_context->client)
465   {
466     GNUNET_break_op (0);
467     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
468     return;
469   }
470   msize = ntohs (message->size);
471   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierInit))
472   {
473     GNUNET_break_op (0);
474     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
475     return;
476   }
477   msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
478   op_id = GNUNET_ntohll (msg->op_id);
479   name = msg->name;
480   name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
481   GNUNET_CRYPTO_hash (name, name_len, &hash);
482   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
483   {
484     GST_send_operation_fail_msg (client, op_id, "Barrier already initialised");
485     GNUNET_SERVER_receive_done (client, GNUNET_OK);
486     return;
487   }
488   barrier = GNUNET_malloc (sizeof (struct Barrier));
489   (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
490   barrier->quorum = msg->quorum;
491   barrier->name = GNUNET_malloc (name_len + 1);
492   barrier->name[name_len] = '\0';
493   (void) memcpy (barrier->name, name, name_len);
494   GNUNET_assert (GNUNET_OK ==
495                  GNUNET_CONTAINER_multihashmap_put (barrier_map,
496                                                     &barrier->hash,
497                                                     barrier,
498                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
499   GNUNET_SERVER_receive_done (client, GNUNET_OK);
500   /* Propagate barrier init to subcontrollers */
501   for (cnt = 0; cnt < GST_slave_list_size; cnt++)
502   {
503     if (NULL == (slave = GST_slave_list[cnt]))
504       continue;
505     if (NULL == slave->controller)
506     {
507       GNUNET_break (0);/* May happen when we are connecting to the controller */
508       continue;
509     }    
510     GNUNET_break (0);           /* FIXME */
511   }
512 }