- more barrier stuff
[oweals/gnunet.git] / src / testbed / gnunet-service-testbed_barriers.c
1 /*
2   This file is part of GNUnet.
3   (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file testbed/gnunet-service-testbed_barriers.c
23  * @brief barrier handling at the testbed controller
24  * @author Sree Harsha Totakura <sreeharsha@totakura.in> 
25  */
26
27 #include "gnunet-service-testbed.h"
28
29 /**
30  * timeout for outgoing message transmissions in seconds
31  */
32 #define MESSAGE_SEND_TIMEOUT(s) \
33   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
34
35
36 /**
37  * Test to see if local peers have reached the required quorum of a barrier
38  */
39 #define LOCAL_QUORUM_REACHED(barrier)           \
40   ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
41
42
43 /**
44  * Barrier
45  */
46 struct Barrier;
47
48
49 /**
50  * Message queue for transmitting messages
51  */
52 struct MessageQueue
53 {
54   /**
55    * next pointer for DLL
56    */
57   struct MessageQueue *next;
58
59   /**
60    * prev pointer for DLL
61    */
62   struct MessageQueue *prev;
63
64   /**
65    * The message to be sent
66    */
67   struct GNUNET_MessageHeader *msg;
68 };
69
70
71 /**
72  * Context to be associated with each client
73  */
74 struct ClientCtx
75 {
76   /**
77    * The barrier this client is waiting for
78    */
79   struct Barrier *barrier;
80
81   /**
82    * DLL next ptr
83    */
84   struct ClientCtx *next;
85
86   /**
87    * DLL prev ptr
88    */
89   struct ClientCtx *prev;
90
91   /**
92    * The client handle
93    */
94   struct GNUNET_SERVER_Client *client;
95
96   /**
97    * the transmission handle
98    */
99   struct GNUNET_SERVER_TransmitHandle *tx;
100
101   /**
102    * message queue head
103    */
104   struct MessageQueue *mq_head;
105
106   /**
107    * message queue tail
108    */
109   struct MessageQueue *mq_tail;
110 };
111
112
113 /**
114  * Wrapper around Barrier handle
115  */
116 struct WBarrier
117 {
118   /**
119    * DLL next pointer
120    */
121   struct WBarrier *next;
122
123   /**
124    * DLL prev pointer
125    */
126   struct WBarrier *prev;
127
128   /**
129    * The local barrier associated with the creation of this wrapper
130    */
131   struct Barrier *barrier;
132
133   /**
134    * The barrier handle from API
135    */
136   struct GNUNET_TESTBED_Barrier *hbarrier;
137
138   /**
139    * Has this barrier been crossed?
140    */
141   uint8_t reached;
142 };
143
144
145 /**
146  * Barrier
147  */
148 struct Barrier
149 {
150   /**
151    * The hashcode of the barrier name
152    */
153   struct GNUNET_HashCode hash;
154
155   /**
156    * The client handle to the master controller
157    */
158   struct GNUNET_SERVER_Client *client;
159
160   /**
161    * The name of the barrier
162    */
163   char *name;
164
165   /**
166    * DLL head for the list of clients waiting for this barrier
167    */
168   struct ClientCtx *head;
169
170   /**
171    * DLL tail for the list of clients waiting for this barrier
172    */
173   struct ClientCtx *tail;
174
175   /**
176    * DLL head for the list of barrier handles
177    */
178   struct WBarrier *whead;
179
180   /**
181    * DLL tail for the list of barrier handles
182    */
183   struct WBarrier *wtail;
184
185   /**
186    * Identifier for the timeout task
187    */
188   GNUNET_SCHEDULER_TaskIdentifier tout_task;
189   
190   /**
191    * The status of this barrier
192    */
193   enum GNUNET_TESTBED_BarrierStatus status;
194   
195   /**
196    * Number of barriers wrapped in the above DLL
197    */
198   unsigned int num_wbarriers;
199
200   /**
201    * Number of wrapped barriers reached so far
202    */
203   unsigned int num_wbarriers_reached;
204
205   /**
206    * Number of wrapped barrier initialised so far
207    */
208   unsigned int num_wbarriers_inited;
209
210   /**
211    * Number of peers which have reached this barrier
212    */
213   unsigned int nreached;
214
215   /**
216    * Number of slaves we have initialised this barrier
217    */
218   unsigned int nslaves;
219
220   /**
221    * Quorum percentage to be reached
222    */
223   uint8_t quorum;
224   
225   /**
226    * Was there a timeout while propagating initialisation
227    */
228   uint8_t timedout;
229 };
230
231
232 /**
233  * Hashtable handle for storing initialised barriers
234  */
235 static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
236
237 /**
238  * Service context
239  */
240 static struct GNUNET_SERVICE_Context *ctx;
241
242
243 /**
244  * Function called to notify a client about the connection
245  * begin ready to queue more data.  "buf" will be
246  * NULL and "size" zero if the connection was closed for
247  * writing in the meantime.
248  *
249  * @param cls client context
250  * @param size number of bytes available in buf
251  * @param buf where the callee should write the message
252  * @return number of bytes written to buf
253  */
254 static size_t 
255 transmit_ready_cb (void *cls, size_t size, void *buf)
256 {
257   struct ClientCtx *ctx = cls;
258   struct GNUNET_SERVER_Client *client = ctx->client;
259   struct MessageQueue *mq;
260   struct GNUNET_MessageHeader *msg;
261   size_t wrote;
262
263   ctx->tx = NULL;
264   wrote = 0;
265   if ((0 == size) || (NULL == buf))
266   {
267     GNUNET_assert (NULL != ctx->client);
268     GNUNET_SERVER_client_drop (ctx->client);
269     ctx->client = NULL;    
270     return 0;
271   }
272   mq = ctx->mq_head;
273   msg = mq->msg;
274   wrote = ntohs (msg->size);
275   GNUNET_assert (size >= wrote);
276   (void) memcpy (buf, msg, wrote);
277   GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
278   GNUNET_free (mq->msg);
279   GNUNET_free (mq);
280   if (NULL != (mq = ctx->mq_head))
281     ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
282                                                   MESSAGE_SEND_TIMEOUT (30),
283                                                   &transmit_ready_cb, ctx);
284   return wrote;
285 }
286
287
288 /**
289  * Queue a message into a clients message queue
290  *
291  * @param ctx the context associated with the client
292  * @param msg the message to queue.  Will be consumed
293  */
294 static void
295 queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
296 {
297   struct MessageQueue *mq;
298   struct GNUNET_SERVER_Client *client = ctx->client;
299   
300   mq = GNUNET_malloc (sizeof (struct MessageQueue));
301   mq->msg = msg;
302   GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
303   if (NULL == ctx->tx)
304    ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
305                                                   MESSAGE_SEND_TIMEOUT (30),
306                                                   &transmit_ready_cb, ctx);
307 }
308
309
310 /**
311  * Function to remove a barrier from the barrier map and cleanup resources
312  * occupied by a barrier
313  *
314  * @param barrier the barrier handle
315  */
316 static void
317 remove_barrier (struct Barrier *barrier)
318 {
319   GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
320                                                                     &barrier->hash,
321                                                                     barrier));
322   GNUNET_free (barrier->name);
323   if (NULL != barrier->client)
324     GNUNET_SERVER_client_drop (barrier->client);
325   GNUNET_free (barrier);
326 }
327
328
329 /**
330  * Cancels all subcontroller barrier handles
331  *
332  * @param barrier the local barrier
333  */
334 static void
335 cancel_wrappers (struct Barrier *barrier)
336 {
337   struct WBarrier *wrapper;
338
339   while (NULL != (wrapper = barrier->whead))
340   {
341     GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
342     GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
343     GNUNET_free (wrapper);
344   }
345 }
346
347
348 /**
349  * Sends a barrier failed message
350  *
351  * @param barrier the corresponding barrier
352  * @param status the status of the barrier
353  * @param emsg the error message; should be non-NULL for
354  *   status=BARRIER_STATUS_ERROR 
355  */
356 static void
357 send_barrier_status_msg (struct Barrier *barrier, 
358                          enum GNUNET_TESTBED_BarrierStatus status,
359                          const char *emsg)
360 {
361   struct GNUNET_TESTBED_BarrierStatusMsg *msg;
362   size_t name_len;
363   uint16_t msize;
364
365   GNUNET_assert ((NULL == emsg) || (BARRIER_STATUS_ERROR == status));
366   name_len = strlen (barrier->name) + 1;
367   msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
368       + name_len
369       + (NULL == emsg) ? 0 : strlen (emsg) + 1;
370   msg = GNUNET_malloc (msize);
371   msg->status = htons (status);
372   msg->name_len = htons (name_len);
373   (void) memcpy (msg->data, barrier->name, name_len);
374   if (NULL != emsg)
375     (void) memcpy (msg->data + name_len, emsg, strlen (emsg) + 1);
376   GST_queue_message (barrier->client, &msg->header);
377 }
378
379
380
381 /**
382  * Task for sending barrier crossed notifications to waiting client
383  *
384  * @param cls the barrier which is crossed
385  * @param tc scheduler task context
386  */
387 static void
388 notify_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
389 {
390   struct Barrier *barrier = cls;
391   struct ClientCtx *client_ctx;
392   struct GNUNET_TESTBED_BarrierStatusMsg *msg;
393   struct GNUNET_MessageHeader *dup_msg;
394   uint16_t name_len;
395   uint16_t msize;
396
397   name_len = strlen (barrier->name) + 1;
398   msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len;  
399   msg = GNUNET_malloc (msize);
400   msg->header.size = htons (msize);
401   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
402   msg->status = 0;
403   msg->name_len = htons (name_len);
404   (void) memcpy (msg->data, barrier->name, name_len);
405   msg->data[name_len] = '\0';
406   while (NULL != (client_ctx = barrier->head))
407   {
408     dup_msg = GNUNET_copy_message (&msg->header);
409     queue_message (client_ctx, dup_msg);
410     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
411     GNUNET_SERVER_client_set_user_context_ (client_ctx->client, NULL, 0);
412     GNUNET_free (client_ctx);
413   }
414 }
415
416
417 /**
418  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages.  This
419  * message should come from peers or a shared helper service using the
420  * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
421  *
422  * This handler is queued in the main service and will handle the messages sent
423  * either from the testbed driver or from a high level controller
424  *
425  * @param cls NULL
426  * @param client identification of the client
427  * @param message the actual message
428  */
429 static void
430 handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
431                      const struct GNUNET_MessageHeader *message)
432 {
433   const struct GNUNET_TESTBED_BarrierWait *msg;
434   struct Barrier *barrier;
435   char *name;
436   struct ClientCtx *client_ctx;
437   struct GNUNET_HashCode key;
438   size_t name_len;
439   uint16_t msize;
440   
441   msize = ntohs (message->size);
442   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
443   {
444     GNUNET_break_op (0);
445     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
446     return;
447   }
448   if (NULL == barrier_map)
449   {
450     GNUNET_break (0);
451     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
452     return;
453   }
454   msg = (const struct GNUNET_TESTBED_BarrierWait *) message;
455   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
456   name = GNUNET_malloc (name_len + 1);
457   name[name_len] = '\0';
458   (void) memcpy (name, msg->name, name_len);
459   GNUNET_CRYPTO_hash (name, name_len - 1, &key);
460   if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
461   {
462     GNUNET_break (0);
463     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
464     GNUNET_free (name);
465     return;
466   }
467   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
468   if (NULL == client_ctx)
469   {
470     client_ctx = GNUNET_malloc (sizeof (struct ClientCtx));
471     client_ctx->client = client;
472     GNUNET_SERVER_client_keep (client);
473     client_ctx->barrier = barrier;
474     GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
475     barrier->nreached++;
476     if (LOCAL_QUORUM_REACHED (barrier))
477       notify_task_cb (barrier, NULL);
478   }
479   GNUNET_SERVER_receive_done (client, GNUNET_OK);
480 }
481
482
483 /**
484  * Functions with this signature are called whenever a client
485  * is disconnected on the network level.
486  *
487  * @param cls closure
488  * @param client identification of the client; NULL
489  *        for the last call when the server is destroyed
490  */
491 static void
492 disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
493 {
494   struct ClientCtx *client_ctx;
495   struct Barrier *barrier;
496   
497   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
498   if (NULL == client_ctx)
499     return;
500   barrier = client_ctx->barrier;
501   GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
502   if (NULL != client_ctx->tx)
503     GNUNET_SERVER_notify_transmit_ready_cancel (client_ctx->tx);
504   
505 }
506
507
508 /**
509  * Function to initialise barrriers component
510  */
511 void
512 GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
513 {
514   static const struct GNUNET_SERVER_MessageHandler message_handlers[] = {
515     {&handle_barrier_wait, NULL, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT, 0},
516     {NULL, NULL, 0, 0}
517   };
518   struct GNUNET_SERVER_Handle *srv;
519
520   barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES);
521   ctx = GNUNET_SERVICE_start ("testbed-barrier", cfg,
522                               GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN);
523   srv = GNUNET_SERVICE_get_server (ctx);
524   GNUNET_SERVER_add_handlers (srv, message_handlers);
525   GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);  
526 }
527
528
529 /**
530  * Function to stop the barrier service
531  */
532 void
533 GST_barriers_stop ()
534 {
535   GNUNET_assert (NULL != barrier_map);
536   GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
537   GNUNET_assert (NULL != ctx);
538   GNUNET_SERVICE_stop (ctx);
539 }
540
541
542 /**
543  * Functions of this type are to be given as callback argument to
544  * GNUNET_TESTBED_barrier_init().  The callback will be called when status
545  * information is available for the barrier.
546  *
547  * @param cls the closure given to GNUNET_TESTBED_barrier_init()
548  * @param name the name of the barrier
549  * @param barrier the barrier handle
550  * @param status status of the barrier; GNUNET_OK if the barrier is crossed;
551  *   GNUNET_SYSERR upon error
552  * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the
553  *   error messsage
554  */
555 static void 
556 wbarrier_status_cb (void *cls, const char *name,
557                     struct GNUNET_TESTBED_Barrier *b_,
558                     enum GNUNET_TESTBED_BarrierStatus status,
559                     const char *emsg)
560 {
561   struct WBarrier *wrapper = cls;
562   struct Barrier *barrier = wrapper->barrier;
563
564   GNUNET_assert (b_ == wrapper->hbarrier);
565   wrapper->hbarrier = NULL;
566   GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
567   GNUNET_free (wrapper);
568   if (BARRIER_STATUS_ERROR == status)
569   {
570     LOG (GNUNET_ERROR_TYPE_ERROR,
571          "Initialising barrier `%s' failed at a sub-controller: %s\n",
572          barrier->name, (NULL != emsg) ? emsg : "NULL");
573     cancel_wrappers (barrier);
574     if (NULL == emsg)
575       emsg = "Initialisation failed at a sub-controller";
576     send_barrier_status_msg (barrier, BARRIER_STATUS_ERROR, emsg);
577     return;
578   }
579   switch (status)
580   {
581   case BARRIER_STATUS_CROSSED:
582     barrier->num_wbarriers_reached++;
583     if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
584         && (LOCAL_QUORUM_REACHED (barrier)))
585       send_barrier_status_msg (barrier, BARRIER_STATUS_CROSSED, NULL);
586     break;
587   case BARRIER_STATUS_INITIALISED:
588     barrier->num_wbarriers_inited++;
589     if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
590       send_barrier_status_msg (barrier, BARRIER_STATUS_INITIALISED, NULL);
591     break;
592   case BARRIER_STATUS_ERROR:
593     GNUNET_assert (0);
594   }
595   return;
596 }
597
598
599 /**
600  * Function called upon timeout while waiting for a response from the
601  * subcontrollers to barrier init message
602  *
603  * @param 
604  * @return 
605  */
606 static void
607 fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
608 {
609   struct Barrier *barrier = cls;
610   
611   barrier->nslaves--;
612   barrier->timedout = GNUNET_YES;
613   cancel_wrappers (barrier);
614   send_barrier_status_msg (barrier, BARRIER_STATUS_ERROR,
615                            "Timedout while propagating barrier initialisation\n");
616   remove_barrier (barrier);
617 }
618
619
620 /**
621  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages.  This
622  * message should always come from a parent controller or the testbed API if we
623  * are the root controller.
624  *
625  * This handler is queued in the main service and will handle the messages sent
626  * either from the testbed driver or from a high level controller
627  *
628  * @param cls NULL
629  * @param client identification of the client
630  * @param message the actual message
631  */
632 void
633 GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
634                          const struct GNUNET_MessageHeader *message)
635 {
636   const struct GNUNET_TESTBED_BarrierInit *msg;
637   const char *name;
638   struct Barrier *barrier;
639   struct Slave *slave;
640   struct WBarrier *wrapper;
641   struct GNUNET_HashCode hash;
642   size_t name_len;
643   uint64_t op_id;
644   unsigned int cnt;
645   uint16_t msize;
646   
647   if (NULL == GST_context)
648   {
649     GNUNET_break_op (0);
650     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
651     return;
652   }
653   if (client != GST_context->client)
654   {
655     GNUNET_break_op (0);
656     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
657     return;
658   }
659   msize = ntohs (message->size);
660   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierInit))
661   {
662     GNUNET_break_op (0);
663     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
664     return;
665   }
666   msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
667   op_id = GNUNET_ntohll (msg->op_id);
668   name = msg->name;
669   name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
670   GNUNET_CRYPTO_hash (name, name_len, &hash);
671   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
672   {
673     GST_send_operation_fail_msg (client, op_id, "Barrier already initialised");
674     GNUNET_SERVER_receive_done (client, GNUNET_OK);
675     return;
676   }
677   barrier = GNUNET_malloc (sizeof (struct Barrier));
678   (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
679   barrier->quorum = msg->quorum;
680   barrier->name = GNUNET_malloc (name_len + 1);
681   barrier->name[name_len] = '\0';
682   (void) memcpy (barrier->name, name, name_len);
683   barrier->client = client;
684   GNUNET_SERVER_client_keep (client);
685   GNUNET_assert (GNUNET_OK ==
686                  GNUNET_CONTAINER_multihashmap_put (barrier_map,
687                                                     &barrier->hash,
688                                                     barrier,
689                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
690   GNUNET_SERVER_receive_done (client, GNUNET_OK);
691   /* Propagate barrier init to subcontrollers */
692   for (cnt = 0; cnt < GST_slave_list_size; cnt++)
693   {
694     if (NULL == (slave = GST_slave_list[cnt]))
695       continue;
696     if (NULL == slave->controller)
697     {
698       GNUNET_break (0);/* May happen when we are connecting to the controller */
699       continue;
700     }    
701     wrapper = GNUNET_malloc (sizeof (struct WBarrier));
702     wrapper->barrier = barrier;
703     GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper);
704     wrapper->hbarrier = GNUNET_TESTBED_barrier_init (slave->controller,
705                                                      barrier->name,
706                                                      barrier->quorum,
707                                                      &wbarrier_status_cb,
708                                                      wrapper);    
709   }
710   if (NULL == barrier->whead)   /* No further propagation */
711     send_barrier_status_msg (barrier, BARRIER_STATUS_INITIALISED, NULL);
712   else
713     barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
714                                                        &fwd_tout_barrier_init,
715                                                        barrier);
716 }
717
718 /* end of gnunet-service-testbed_barriers.c */