- more barrier stuff
[oweals/gnunet.git] / src / testbed / gnunet-service-testbed_barriers.c
1 /*
2   This file is part of GNUnet.
3   (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file testbed/gnunet-service-testbed_barriers.c
23  * @brief barrier handling at the testbed controller
24  * @author Sree Harsha Totakura <sreeharsha@totakura.in> 
25  */
26
27 #include "gnunet-service-testbed.h"
28
29 /**
30  * timeout for outgoing message transmissions in seconds
31  */
32 #define MESSAGE_SEND_TIMEOUT(s) \
33   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
34
35
36 /**
37  * Test to see if local peers have reached the required quorum of a barrier
38  */
39 #define LOCAL_QUORUM_REACHED(barrier)           \
40   ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
41
42
43 /**
44  * Barrier
45  */
46 struct Barrier;
47
48
49 /**
50  * Message queue for transmitting messages
51  */
52 struct MessageQueue
53 {
54   /**
55    * next pointer for DLL
56    */
57   struct MessageQueue *next;
58
59   /**
60    * prev pointer for DLL
61    */
62   struct MessageQueue *prev;
63
64   /**
65    * The message to be sent
66    */
67   struct GNUNET_MessageHeader *msg;
68 };
69
70
71 /**
72  * Context to be associated with each client
73  */
74 struct ClientCtx
75 {
76   /**
77    * The barrier this client is waiting for
78    */
79   struct Barrier *barrier;
80
81   /**
82    * DLL next ptr
83    */
84   struct ClientCtx *next;
85
86   /**
87    * DLL prev ptr
88    */
89   struct ClientCtx *prev;
90
91   /**
92    * The client handle
93    */
94   struct GNUNET_SERVER_Client *client;
95
96   /**
97    * the transmission handle
98    */
99   struct GNUNET_SERVER_TransmitHandle *tx;
100
101   /**
102    * message queue head
103    */
104   struct MessageQueue *mq_head;
105
106   /**
107    * message queue tail
108    */
109   struct MessageQueue *mq_tail;
110 };
111
112
113 /**
114  * Wrapper around Barrier handle
115  */
116 struct WBarrier
117 {
118   /**
119    * DLL next pointer
120    */
121   struct WBarrier *next;
122
123   /**
124    * DLL prev pointer
125    */
126   struct WBarrier *prev;
127
128   /**
129    * The local barrier associated with the creation of this wrapper
130    */
131   struct Barrier *barrier;
132
133   /**
134    * The barrier handle from API
135    */
136   struct GNUNET_TESTBED_Barrier *hbarrier;
137
138   /**
139    * Has this barrier been crossed?
140    */
141   uint8_t reached;
142 };
143
144
145 /**
146  * Barrier
147  */
148 struct Barrier
149 {
150   /**
151    * The hashcode of the barrier name
152    */
153   struct GNUNET_HashCode hash;
154
155   /**
156    * The name of the barrier
157    */
158   char *name;
159
160   /**
161    * DLL head for the list of clients waiting for this barrier
162    */
163   struct ClientCtx *head;
164
165   /**
166    * DLL tail for the list of clients waiting for this barrier
167    */
168   struct ClientCtx *tail;
169
170   /**
171    * DLL head for the list of barrier handles
172    */
173   struct WBarrier *whead;
174
175   /**
176    * DLL tail for the list of barrier handles
177    */
178   struct WBarrier *wtail;
179
180   /**
181    * Number of barriers wrapped in the above DLL
182    */
183   unsigned int num_wbarriers;
184
185   /**
186    * Number of wrapped barriers reached so far
187    */
188   unsigned int num_wbarriers_reached;
189
190   /**
191    * Number of peers which have reached this barrier
192    */
193   unsigned int nreached;
194
195   /**
196    * Number of slaves we have initialised this barrier
197    */
198   unsigned int nslaves;
199
200   /**
201    * Quorum percentage to be reached
202    */
203   uint8_t quorum;
204   
205   /**
206    * Was there a timeout while propagating initialisation
207    */
208   uint8_t timedout;
209 };
210
211
212 /**
213  * Hashtable handle for storing initialised barriers
214  */
215 static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
216
217 /**
218  * Service context
219  */
220 static struct GNUNET_SERVICE_Context *ctx;
221
222
223 /**
224  * Function called to notify a client about the connection
225  * begin ready to queue more data.  "buf" will be
226  * NULL and "size" zero if the connection was closed for
227  * writing in the meantime.
228  *
229  * @param cls client context
230  * @param size number of bytes available in buf
231  * @param buf where the callee should write the message
232  * @return number of bytes written to buf
233  */
234 static size_t 
235 transmit_ready_cb (void *cls, size_t size, void *buf)
236 {
237   struct ClientCtx *ctx = cls;
238   struct GNUNET_SERVER_Client *client = ctx->client;
239   struct MessageQueue *mq;
240   struct GNUNET_MessageHeader *msg;
241   size_t wrote;
242
243   ctx->tx = NULL;
244   wrote = 0;
245   if ((0 == size) || (NULL == buf))
246   {
247     GNUNET_assert (NULL != ctx->client);
248     GNUNET_SERVER_client_drop (ctx->client);
249     ctx->client = NULL;    
250     return 0;
251   }
252   mq = ctx->mq_head;
253   msg = mq->msg;
254   wrote = ntohs (msg->size);
255   GNUNET_assert (size >= wrote);
256   (void) memcpy (buf, msg, wrote);
257   GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
258   GNUNET_free (mq->msg);
259   GNUNET_free (mq);
260   if (NULL != (mq = ctx->mq_head))
261     ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
262                                                   MESSAGE_SEND_TIMEOUT (30),
263                                                   &transmit_ready_cb, ctx);
264   return wrote;
265 }
266
267
268 /**
269  * Queue a message into a clients message queue
270  *
271  * @param ctx the context associated with the client
272  * @param msg the message to queue.  Will be consumed
273  */
274 static void
275 queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
276 {
277   struct MessageQueue *mq;
278   struct GNUNET_SERVER_Client *client = ctx->client;
279   
280   mq = GNUNET_malloc (sizeof (struct MessageQueue));
281   mq->msg = msg;
282   GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
283   if (NULL == ctx->tx)
284    ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
285                                                   MESSAGE_SEND_TIMEOUT (30),
286                                                   &transmit_ready_cb, ctx);
287 }
288
289
290 #if 0
291 /**
292  * Function to remove a barrier from the barrier map and cleanup resources
293  * occupied by a barrier
294  *
295  * @param barrier the barrier handle
296  */
297 static void
298 remove_barrier (struct Barrier *barrier)
299 {
300   GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
301                                                                     &barrier->hash,
302                                                                     barrier));
303   GNUNET_free (barrier->name);
304   GNUNET_free (barrier);
305 }
306
307
308 /**
309  * Function called upon timeout while waiting for a response from the
310  * subcontrollers to barrier init message
311  *
312  * @param 
313  * @return 
314  */
315 static void
316 fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
317 {
318   struct ForwardedOperationContext *foctx = cls;
319   struct Barrier *barrier = foctx->cls;
320   
321   barrier->nslaves--;
322   barrier->timedout = GNUNET_YES;
323   if (0 == barrier->nslaves)
324   {
325     GST_send_operation_fail_msg (foctx->client, foctx->operation_id,
326                                  "Timeout while contacting a slave controller");
327     remove_barrier (barrier);
328   }
329 }
330 #endif
331
332 /**
333  * Task for sending barrier crossed notifications to waiting client
334  *
335  * @param cls the barrier which is crossed
336  * @param tc scheduler task context
337  */
338 static void
339 notify_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
340 {
341   struct Barrier *barrier = cls;
342   struct ClientCtx *client_ctx;
343   struct GNUNET_TESTBED_BarrierStatus *msg;
344   struct GNUNET_MessageHeader *dup_msg;
345   uint16_t name_len;
346   uint16_t msize;
347
348   name_len = strlen (barrier->name) + 1;
349   msize = sizeof (struct GNUNET_TESTBED_BarrierStatus) + name_len;  
350   msg = GNUNET_malloc (msize);
351   msg->header.size = htons (msize);
352   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
353   msg->status = 0;
354   msg->name_len = htons (name_len);
355   (void) memcpy (msg->data, barrier->name, name_len);
356   msg->data[name_len] = '\0';
357   while (NULL != (client_ctx = barrier->head))
358   {
359     dup_msg = GNUNET_copy_message (&msg->header);
360     queue_message (client_ctx, dup_msg);
361     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
362     GNUNET_SERVER_client_set_user_context_ (client_ctx->client, NULL, 0);
363     GNUNET_free (client_ctx);
364   }
365 }
366
367
368 /**
369  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages.  This
370  * message should come from peers or a shared helper service using the
371  * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
372  *
373  * This handler is queued in the main service and will handle the messages sent
374  * either from the testbed driver or from a high level controller
375  *
376  * @param cls NULL
377  * @param client identification of the client
378  * @param message the actual message
379  */
380 static void
381 handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
382                      const struct GNUNET_MessageHeader *message)
383 {
384   const struct GNUNET_TESTBED_BarrierWait *msg;
385   struct Barrier *barrier;
386   char *name;
387   struct ClientCtx *client_ctx;
388   struct GNUNET_HashCode key;
389   size_t name_len;
390   uint16_t msize;
391   
392   msize = ntohs (message->size);
393   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
394   {
395     GNUNET_break_op (0);
396     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
397     return;
398   }
399   if (NULL == barrier_map)
400   {
401     GNUNET_break (0);
402     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
403     return;
404   }
405   msg = (const struct GNUNET_TESTBED_BarrierWait *) message;
406   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
407   name = GNUNET_malloc (name_len + 1);
408   name[name_len] = '\0';
409   (void) memcpy (name, msg->name, name_len);
410   GNUNET_CRYPTO_hash (name, name_len - 1, &key);
411   if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
412   {
413     GNUNET_break (0);
414     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
415     GNUNET_free (name);
416     return;
417   }
418   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
419   if (NULL == client_ctx)
420   {
421     client_ctx = GNUNET_malloc (sizeof (struct ClientCtx));
422     client_ctx->client = client;
423     GNUNET_SERVER_client_keep (client);
424     client_ctx->barrier = barrier;
425     GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
426     barrier->nreached++;
427     if (LOCAL_QUORUM_REACHED (barrier))
428       notify_task_cb (barrier, NULL);
429   }
430   GNUNET_SERVER_receive_done (client, GNUNET_OK);
431 }
432
433
434 /**
435  * Functions with this signature are called whenever a client
436  * is disconnected on the network level.
437  *
438  * @param cls closure
439  * @param client identification of the client; NULL
440  *        for the last call when the server is destroyed
441  */
442 static void
443 disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
444 {
445   struct ClientCtx *client_ctx;
446   struct Barrier *barrier;
447   
448   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
449   if (NULL == client_ctx)
450     return;
451   barrier = client_ctx->barrier;
452   GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
453   if (NULL != client_ctx->tx)
454     GNUNET_SERVER_notify_transmit_ready_cancel (client_ctx->tx);
455   
456 }
457
458
459 /**
460  * Function to initialise barrriers component
461  */
462 void
463 GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
464 {
465   static const struct GNUNET_SERVER_MessageHandler message_handlers[] = {
466     {&handle_barrier_wait, NULL, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT, 0},
467     {NULL, NULL, 0, 0}
468   };
469   struct GNUNET_SERVER_Handle *srv;
470
471   barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES);
472   ctx = GNUNET_SERVICE_start ("testbed-barrier", cfg,
473                               GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN);
474   srv = GNUNET_SERVICE_get_server (ctx);
475   GNUNET_SERVER_add_handlers (srv, message_handlers);
476   GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);  
477 }
478
479
480 /**
481  * Function to stop the barrier service
482  */
483 void
484 GST_barriers_stop ()
485 {
486   GNUNET_assert (NULL != barrier_map);
487   GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
488   GNUNET_assert (NULL != ctx);
489   GNUNET_SERVICE_stop (ctx);
490 }
491
492
493 /**
494  * Functions of this type are to be given as callback argument to
495  * GNUNET_TESTBED_barrier_init().  The callback will be called when status
496  * information is available for the barrier.
497  *
498  * @param cls the closure given to GNUNET_TESTBED_barrier_init()
499  * @param name the name of the barrier
500  * @param barrier the barrier handle
501  * @param status status of the barrier; GNUNET_OK if the barrier is crossed;
502  *   GNUNET_SYSERR upon error
503  * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the
504  *   error messsage
505  */
506 static void 
507 wbarrier_status_cb (void *cls, const char *name,
508                    struct GNUNET_TESTBED_Barrier *b_,
509                    int status, const char *emsg)
510 {
511   struct WBarrier *wrapper = cls;
512   struct Barrier *barrier = wrapper->barrier;
513
514   GNUNET_assert (b_ == wrapper->hbarrier);
515   wrapper->hbarrier = NULL;
516   GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
517   GNUNET_free (wrapper);
518   if (GNUNET_SYSERR == status)
519   {
520     LOG (GNUNET_ERROR_TYPE_ERROR,
521          "Initialising barrier (%s) failed at a sub-controller: %s\n",
522          barrier->name, (NULL != emsg) ? emsg : "NULL");
523     while (NULL != (wrapper = barrier->whead))
524     {
525       GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
526       GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
527       GNUNET_free (wrapper);
528     }
529     /* Send parent controller failure message */
530     GNUNET_break (0);
531   }
532   barrier->num_wbarriers_reached++;
533   if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
534       && (LOCAL_QUORUM_REACHED (barrier)))
535   {
536     /* Send parent controller success status message */
537     GNUNET_break (0);    
538   }
539   return;
540 }
541
542
543 /**
544  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages.  This
545  * message should always come from a parent controller or the testbed API if we
546  * are the root controller.
547  *
548  * This handler is queued in the main service and will handle the messages sent
549  * either from the testbed driver or from a high level controller
550  *
551  * @param cls NULL
552  * @param client identification of the client
553  * @param message the actual message
554  */
555 void
556 GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
557                          const struct GNUNET_MessageHeader *message)
558 {
559   const struct GNUNET_TESTBED_BarrierInit *msg;
560   const char *name;
561   struct Barrier *barrier;
562   struct Slave *slave;
563   struct WBarrier *wrapper;
564   struct GNUNET_HashCode hash;
565   size_t name_len;
566   uint64_t op_id;
567   unsigned int cnt;
568   uint16_t msize;
569   
570   if (NULL == GST_context)
571   {
572     GNUNET_break_op (0);
573     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
574     return;
575   }
576   if (client != GST_context->client)
577   {
578     GNUNET_break_op (0);
579     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
580     return;
581   }
582   msize = ntohs (message->size);
583   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierInit))
584   {
585     GNUNET_break_op (0);
586     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
587     return;
588   }
589   msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
590   op_id = GNUNET_ntohll (msg->op_id);
591   name = msg->name;
592   name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
593   GNUNET_CRYPTO_hash (name, name_len, &hash);
594   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
595   {
596     GST_send_operation_fail_msg (client, op_id, "Barrier already initialised");
597     GNUNET_SERVER_receive_done (client, GNUNET_OK);
598     return;
599   }
600   barrier = GNUNET_malloc (sizeof (struct Barrier));
601   (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
602   barrier->quorum = msg->quorum;
603   barrier->name = GNUNET_malloc (name_len + 1);
604   barrier->name[name_len] = '\0';
605   (void) memcpy (barrier->name, name, name_len);
606   GNUNET_assert (GNUNET_OK ==
607                  GNUNET_CONTAINER_multihashmap_put (barrier_map,
608                                                     &barrier->hash,
609                                                     barrier,
610                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
611   GNUNET_SERVER_receive_done (client, GNUNET_OK);
612   /* Propagate barrier init to subcontrollers */
613   for (cnt = 0; cnt < GST_slave_list_size; cnt++)
614   {
615     if (NULL == (slave = GST_slave_list[cnt]))
616       continue;
617     if (NULL == slave->controller)
618     {
619       GNUNET_break (0);/* May happen when we are connecting to the controller */
620       continue;
621     }    
622     wrapper = GNUNET_malloc (sizeof (struct WBarrier));
623     wrapper->barrier = barrier;
624     wrapper->hbarrier = GNUNET_TESTBED_barrier_init (slave->controller,
625                                                      barrier->name,
626                                                      barrier->quorum,
627                                                      &wbarrier_status_cb,
628                                                      wrapper);
629   }
630 }