- distribute peers equally among island nodes on SuperMUC
[oweals/gnunet.git] / src / testbed / testbed_api.c
1 /*
2       This file is part of GNUnet
3       (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file testbed/testbed_api.c
23  * @brief API for accessing the GNUnet testing service.
24  *        This library is supposed to make it easier to write
25  *        testcases and script large-scale benchmarks.
26  * @author Christian Grothoff
27  * @author Sree Harsha Totakura
28  */
29
30
31 #include "platform.h"
32 #include "gnunet_testbed_service.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_constants.h"
35 #include "gnunet_transport_service.h"
36 #include "gnunet_hello_lib.h"
37 #include <zlib.h>
38
39 #include "testbed.h"
40 #include "testbed_api.h"
41 #include "testbed_api_hosts.h"
42 #include "testbed_api_peers.h"
43 #include "testbed_api_operations.h"
44 #include "testbed_api_sd.h"
45
46 /**
47  * Generic logging shorthand
48  */
49 #define LOG(kind, ...)                          \
50   GNUNET_log_from (kind, "testbed-api", __VA_ARGS__)
51
52 /**
53  * Debug logging
54  */
55 #define LOG_DEBUG(...)                          \
56   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
57
58 /**
59  * Relative time seconds shorthand
60  */
61 #define TIME_REL_SECS(sec) \
62   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
63
64
65 /**
66  * Default server message sending retry timeout
67  */
68 #define TIMEOUT_REL TIME_REL_SECS(1)
69
70
71 /**
72  * The message queue for sending messages to the controller service
73  */
74 struct MessageQueue
75 {
76   /**
77    * The message to be sent
78    */
79   struct GNUNET_MessageHeader *msg;
80
81   /**
82    * next pointer for DLL
83    */
84   struct MessageQueue *next;
85
86   /**
87    * prev pointer for DLL
88    */
89   struct MessageQueue *prev;
90 };
91
92
93 /**
94  * Context data for forwarded Operation
95  */
96 struct ForwardedOperationData
97 {
98
99   /**
100    * The callback to call when reply is available
101    */
102   GNUNET_CLIENT_MessageHandler cc;
103
104   /**
105    * The closure for the above callback
106    */
107   void *cc_cls;
108
109 };
110
111
112 /**
113  * Context data for get slave config operations
114  */
115 struct GetSlaveConfigData
116 {
117   /**
118    * The id of the slave controller
119    */
120   uint32_t slave_id;
121
122 };
123
124
125 /**
126  * Context data for controller link operations
127  */
128 struct ControllerLinkData
129 {
130   /**
131    * The controller link message
132    */
133   struct GNUNET_TESTBED_ControllerLinkRequest *msg;
134
135   /**
136    * The id of the host which is hosting the controller to be linked
137    */
138   uint32_t host_id;
139
140 };
141
142
143 /**
144  * Date context for OP_SHUTDOWN_PEERS operations
145  */
146 struct ShutdownPeersData
147 {
148   /**
149    * The operation completion callback to call
150    */
151   GNUNET_TESTBED_OperationCompletionCallback cb;
152
153   /**
154    * The closure for the above callback
155    */
156   void *cb_cls;
157 };
158
159
160 /**
161  * An entry in the stack for keeping operations which are about to expire
162  */
163 struct ExpireOperationEntry
164 {
165   /**
166    * DLL head; new entries are to be inserted here
167    */
168   struct ExpireOperationEntry *next;
169
170   /**
171    * DLL tail; entries are deleted from here
172    */
173   struct ExpireOperationEntry *prev;
174
175   /**
176    * The operation.  This will be a dangling pointer when the operation is freed
177    */
178   const struct GNUNET_TESTBED_Operation *op;
179 };
180
181
182 /**
183  * DLL head for list of operations marked for expiry
184  */
185 static struct ExpireOperationEntry *exop_head;
186
187 /**
188  * DLL tail for list of operation marked for expiry
189  */
190 static struct ExpireOperationEntry *exop_tail;
191
192
193 /**
194  * Inserts an operation into the list of operations marked for expiry
195  *
196  * @param op the operation to insert
197  */
198 static void
199 exop_insert (struct GNUNET_TESTBED_Operation *op)
200 {
201   struct ExpireOperationEntry *entry;
202
203   entry = GNUNET_malloc (sizeof (struct ExpireOperationEntry));
204   entry->op = op;
205   GNUNET_CONTAINER_DLL_insert_tail (exop_head, exop_tail, entry);  
206 }
207
208
209 /**
210  * Checks if an operation is present in the list of operations marked for
211  * expiry.  If the operation is found, it and the tail of operations after it
212  * are removed from the list.
213  *
214  * @param op the operation to check
215  * @return GNUNET_NO if the operation is not present in the list; GNUNET_YES if
216  *           the operation is found in the list (the operation is then removed
217  *           from the list -- calling this function again with the same
218  *           paramenter will return GNUNET_NO)
219  */
220 static int
221 exop_check (const struct GNUNET_TESTBED_Operation *const op)
222 {
223   struct ExpireOperationEntry *entry;
224   struct ExpireOperationEntry *entry2;
225   int found;
226
227   found = GNUNET_NO;
228   entry = exop_head;
229   while (NULL != entry)
230   {
231     if (op == entry->op)
232     {
233       found = GNUNET_YES;
234       break;
235     }
236     entry = entry->next;
237   }
238   if (GNUNET_NO == found)
239     return GNUNET_NO;
240   /* Truncate the tail */
241   while (NULL != entry)
242   {
243     entry2 = entry->next;
244     GNUNET_CONTAINER_DLL_remove (exop_head, exop_tail, entry);
245     GNUNET_free (entry);
246     entry = entry2;
247   }
248   return GNUNET_YES;
249 }
250
251
252 /**
253  * Context information to be used while searching for operation contexts
254  */
255 struct SearchContext
256 {
257   /**
258    * The result of the search
259    */
260   struct OperationContext *opc;
261
262   /**
263    * The id of the operation context we are searching for
264    */
265   uint64_t id;
266 };
267
268
269 /**
270  * Search iterator for searching an operation context
271  *
272  * @param cls the serach context
273  * @param key current key code
274  * @param value value in the hash map
275  * @return GNUNET_YES if we should continue to
276  *         iterate,
277  *         GNUNET_NO if not.
278  */
279 static int
280 opc_search_iterator (void *cls, uint32_t key, void *value)
281 {
282   struct SearchContext *sc = cls;
283   struct OperationContext *opc = value;
284   
285   GNUNET_assert (NULL != opc);
286   GNUNET_assert (NULL == sc->opc);
287   if (opc->id != sc->id)
288     return GNUNET_YES;
289   sc->opc = opc;
290   return GNUNET_NO;
291 }
292
293
294 /**
295  * Returns the operation context with the given id if found in the Operation
296  * context queues of the controller
297  *
298  * @param c the controller whose operation context map is searched
299  * @param id the id which has to be checked
300  * @return the matching operation context; NULL if no match found
301  */
302 static struct OperationContext *
303 find_opc (const struct GNUNET_TESTBED_Controller *c, const uint64_t id)
304 {
305   struct SearchContext sc;
306
307   sc.id = id;
308   sc.opc = NULL;
309   GNUNET_assert (NULL != c->opc_map);
310   if (GNUNET_SYSERR != 
311       GNUNET_CONTAINER_multihashmap32_get_multiple (c->opc_map, (uint32_t) id,
312                                                     &opc_search_iterator, &sc))
313     return NULL;
314   return sc.opc;
315 }
316
317
318 /**
319  * Inserts the given operation context into the operation context map of the
320  * given controller.  Creates the operation context map if one does not exist
321  * for the controller
322  *
323  * @param c the controller
324  * @param opc the operation context to be inserted
325  */
326 void
327 GNUNET_TESTBED_insert_opc_ (struct GNUNET_TESTBED_Controller *c,
328                             struct OperationContext *opc)
329 {
330   if (NULL == c->opc_map)
331     c->opc_map = GNUNET_CONTAINER_multihashmap32_create (256);
332   GNUNET_assert (GNUNET_OK ==
333                  GNUNET_CONTAINER_multihashmap32_put (c->opc_map, 
334                                                       (uint32_t) opc->id, opc,
335                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
336 }
337
338
339 /**
340  * Removes the given operation context from the operation context map of the
341  * given controller
342  *
343  * @param c the controller
344  * @param opc the operation context to remove 
345  */
346 void
347 GNUNET_TESTBED_remove_opc_ (const struct GNUNET_TESTBED_Controller *c,
348                             struct OperationContext *opc)
349 {
350   GNUNET_assert (NULL != c->opc_map);
351   GNUNET_assert (GNUNET_YES ==
352                  GNUNET_CONTAINER_multihashmap32_remove (c->opc_map,
353                                                          (uint32_t) opc->id,
354                                                          opc));
355 }
356
357
358 /**
359  * Handler for forwarded operations
360  *
361  * @param c the controller handle
362  * @param opc the opearation context
363  * @param msg the message
364  */
365 static void
366 handle_forwarded_operation_msg (struct GNUNET_TESTBED_Controller *c,
367                                 struct OperationContext *opc,
368                                 const struct GNUNET_MessageHeader *msg)
369 {
370   struct ForwardedOperationData *fo_data;
371
372   fo_data = opc->data;
373   if (NULL != fo_data->cc)
374     fo_data->cc (fo_data->cc_cls, msg);
375   GNUNET_TESTBED_remove_opc_ (c, opc);
376   GNUNET_free (fo_data);
377   GNUNET_free (opc);
378 }
379
380
381 /**
382  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message from
383  * controller (testbed service)
384  *
385  * @param c the controller handler
386  * @param msg message received
387  * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
388  *           not
389  */
390 static int
391 handle_opsuccess (struct GNUNET_TESTBED_Controller *c,
392                   const struct
393                   GNUNET_TESTBED_GenericOperationSuccessEventMessage *msg)
394 {
395   struct OperationContext *opc;
396   GNUNET_TESTBED_OperationCompletionCallback op_comp_cb;
397   void *op_comp_cb_cls;
398   struct GNUNET_TESTBED_EventInformation event;
399   uint64_t op_id;
400
401   op_id = GNUNET_ntohll (msg->operation_id);
402   LOG_DEBUG ("Operation %lu successful\n", op_id);
403   if (NULL == (opc = find_opc (c, op_id)))
404   {
405     LOG_DEBUG ("Operation not found\n");
406     return GNUNET_YES;
407   }
408   event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED;
409   event.op = opc->op;
410   event.op_cls = opc->op_cls;
411   event.details.operation_finished.emsg = NULL;
412   event.details.operation_finished.generic = NULL;
413   op_comp_cb = NULL;
414   op_comp_cb_cls = NULL;
415   switch (opc->type)
416   {
417   case OP_FORWARDED:
418   {
419     handle_forwarded_operation_msg (c, opc,
420                                     (const struct GNUNET_MessageHeader *) msg);
421     return GNUNET_YES;
422   }
423     break;
424   case OP_PEER_DESTROY:
425   {
426     struct GNUNET_TESTBED_Peer *peer;
427
428     peer = opc->data;
429     GNUNET_TESTBED_peer_deregister_ (peer);
430     GNUNET_free (peer);
431     opc->data = NULL;
432     //PEERDESTROYDATA
433   }
434     break;
435   case OP_SHUTDOWN_PEERS:
436   {
437     struct ShutdownPeersData *data;
438
439     data = opc->data;
440     op_comp_cb = data->cb;
441     op_comp_cb_cls = data->cb_cls;
442     GNUNET_free (data);
443     opc->data = NULL;
444     GNUNET_TESTBED_cleanup_peers_ ();
445   }
446     break;
447   case OP_MANAGE_SERVICE:
448   {
449     struct ManageServiceData *data;
450
451     GNUNET_assert (NULL != (data = opc->data));
452     op_comp_cb = data->cb;
453     op_comp_cb_cls = data->cb_cls;
454     GNUNET_free (data);
455     opc->data = NULL;
456   }
457     break;
458   case OP_PEER_RECONFIGURE:
459     break;
460   default:
461     GNUNET_assert (0);
462   }
463   GNUNET_TESTBED_remove_opc_ (opc->c, opc);
464   opc->state = OPC_STATE_FINISHED;
465   exop_insert (event.op);  
466   if (0 != (c->event_mask & (1L << GNUNET_TESTBED_ET_OPERATION_FINISHED)))
467   {
468     if (NULL != c->cc)
469       c->cc (c->cc_cls, &event);
470     if (GNUNET_NO == exop_check (event.op))
471       return GNUNET_YES;
472   }
473   else
474     LOG_DEBUG ("Not calling callback\n");
475   if (NULL != op_comp_cb)
476     op_comp_cb (op_comp_cb_cls, event.op, NULL);
477    /* You could have marked the operation as done by now */
478   GNUNET_break (GNUNET_NO == exop_check (event.op));
479   return GNUNET_YES;
480 }
481
482
483 /**
484  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCREATESUCCESS message from
485  * controller (testbed service)
486  *
487  * @param c the controller handle
488  * @param msg message received
489  * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
490  *           not
491  */
492 static int
493 handle_peer_create_success (struct GNUNET_TESTBED_Controller *c,
494                             const struct
495                             GNUNET_TESTBED_PeerCreateSuccessEventMessage *msg)
496 {
497   struct OperationContext *opc;
498   struct PeerCreateData *data;
499   struct GNUNET_TESTBED_Peer *peer;
500   struct GNUNET_TESTBED_Operation *op;
501   GNUNET_TESTBED_PeerCreateCallback cb;
502   void *cls;
503   uint64_t op_id;
504
505   GNUNET_assert (sizeof (struct GNUNET_TESTBED_PeerCreateSuccessEventMessage) ==
506                  ntohs (msg->header.size));
507   op_id = GNUNET_ntohll (msg->operation_id);
508   if (NULL == (opc = find_opc (c, op_id)))
509   {
510     LOG_DEBUG ("Operation context for PeerCreateSuccessEvent not found\n");
511     return GNUNET_YES;
512   }
513   if (OP_FORWARDED == opc->type)
514   {
515     handle_forwarded_operation_msg (c, opc,
516                                     (const struct GNUNET_MessageHeader *) msg);
517     return GNUNET_YES;
518   }
519   GNUNET_assert (OP_PEER_CREATE == opc->type);
520   GNUNET_assert (NULL != opc->data);
521   data = opc->data;
522   GNUNET_assert (NULL != data->peer);
523   peer = data->peer;
524   GNUNET_assert (peer->unique_id == ntohl (msg->peer_id));
525   peer->state = PS_CREATED;
526   GNUNET_TESTBED_peer_register_ (peer);
527   cb = data->cb;
528   cls = data->cls;
529   op = opc->op;
530   GNUNET_free (opc->data);
531   GNUNET_TESTBED_remove_opc_ (opc->c, opc);
532   opc->state = OPC_STATE_FINISHED;
533   exop_insert (op);
534   if (NULL != cb)
535     cb (cls, peer, NULL);
536    /* You could have marked the operation as done by now */
537   GNUNET_break (GNUNET_NO == exop_check (op));
538   return GNUNET_YES;
539 }
540
541
542 /**
543  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEEREVENT message from
544  * controller (testbed service)
545  *
546  * @param c the controller handler
547  * @param msg message received
548  * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
549  *           not
550  */
551 static int
552 handle_peer_event (struct GNUNET_TESTBED_Controller *c,
553                    const struct GNUNET_TESTBED_PeerEventMessage *msg)
554 {
555   struct OperationContext *opc;
556   struct GNUNET_TESTBED_Peer *peer;
557   struct PeerEventData *data;
558   GNUNET_TESTBED_PeerChurnCallback pcc;
559   void *pcc_cls;
560   struct GNUNET_TESTBED_EventInformation event;
561   uint64_t op_id;
562
563   GNUNET_assert (sizeof (struct GNUNET_TESTBED_PeerEventMessage) ==
564                  ntohs (msg->header.size));
565   op_id = GNUNET_ntohll (msg->operation_id);
566   if (NULL == (opc = find_opc (c, op_id)))
567   {
568     LOG_DEBUG ("Operation not found\n");
569     return GNUNET_YES;
570   }
571   if (OP_FORWARDED == opc->type)
572   {
573     handle_forwarded_operation_msg (c, opc,
574                                     (const struct GNUNET_MessageHeader *) msg);
575     return GNUNET_YES;
576   }
577   GNUNET_assert ((OP_PEER_START == opc->type) || (OP_PEER_STOP == opc->type));
578   data = opc->data;
579   GNUNET_assert (NULL != data);
580   peer = data->peer;
581   GNUNET_assert (NULL != peer);
582   event.type = (enum GNUNET_TESTBED_EventType) ntohl (msg->event_type);
583   event.op = opc->op;
584   event.op_cls = opc->op_cls;
585   switch (event.type)
586   {
587   case GNUNET_TESTBED_ET_PEER_START:
588     peer->state = PS_STARTED;
589     event.details.peer_start.host = peer->host;
590     event.details.peer_start.peer = peer;
591     break;
592   case GNUNET_TESTBED_ET_PEER_STOP:
593     peer->state = PS_STOPPED;
594     event.details.peer_stop.peer = peer;
595     break;
596   default:
597     GNUNET_assert (0);          /* We should never reach this state */
598   }
599   pcc = data->pcc;
600   pcc_cls = data->pcc_cls;
601   GNUNET_free (data);
602   GNUNET_TESTBED_remove_opc_ (opc->c, opc);
603   opc->state = OPC_STATE_FINISHED;
604   exop_insert (event.op);
605   if (0 !=
606       ((GNUNET_TESTBED_ET_PEER_START | GNUNET_TESTBED_ET_PEER_STOP) &
607        c->event_mask))
608   {
609     if (NULL != c->cc)
610       c->cc (c->cc_cls, &event);    
611     if (GNUNET_NO == exop_check (event.op))
612       return GNUNET_YES;
613   }
614   if (NULL != pcc)
615     pcc (pcc_cls, NULL);
616    /* You could have marked the operation as done by now */
617   GNUNET_break (GNUNET_NO == exop_check (event.op));
618   return GNUNET_YES;
619 }
620
621
622 /**
623  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCONEVENT message from
624  * controller (testbed service)
625  *
626  * @param c the controller handler
627  * @param msg message received
628  * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
629  *           not
630  */
631 static int
632 handle_peer_conevent (struct GNUNET_TESTBED_Controller *c,
633                       const struct GNUNET_TESTBED_ConnectionEventMessage *msg)
634 {
635   struct OperationContext *opc;
636   struct OverlayConnectData *data;
637   GNUNET_TESTBED_OperationCompletionCallback cb;
638   void *cb_cls;
639   struct GNUNET_TESTBED_EventInformation event;
640   uint64_t op_id;
641
642   op_id = GNUNET_ntohll (msg->operation_id);
643   if (NULL == (opc = find_opc (c, op_id)))
644   {
645     LOG_DEBUG ("Operation not found\n");
646     return GNUNET_YES;
647   }
648   if (OP_FORWARDED == opc->type)
649   {
650     handle_forwarded_operation_msg (c, opc,
651                                     (const struct GNUNET_MessageHeader *) msg);
652     return GNUNET_YES;
653   }
654   GNUNET_assert (OP_OVERLAY_CONNECT == opc->type);
655   GNUNET_assert (NULL != (data = opc->data));
656   GNUNET_assert ((ntohl (msg->peer1) == data->p1->unique_id) &&
657                  (ntohl (msg->peer2) == data->p2->unique_id));
658   event.type = (enum GNUNET_TESTBED_EventType) ntohl (msg->event_type);
659   event.op = opc->op;
660   event.op_cls = opc->op_cls;
661   switch (event.type)
662   {
663   case GNUNET_TESTBED_ET_CONNECT:
664     event.details.peer_connect.peer1 = data->p1;
665     event.details.peer_connect.peer2 = data->p2;
666     break;
667   case GNUNET_TESTBED_ET_DISCONNECT:
668     GNUNET_assert (0);          /* FIXME: implement */
669     break;
670   default:
671     GNUNET_assert (0);          /* Should never reach here */
672     break;
673   }
674   cb = data->cb;
675   cb_cls = data->cb_cls;
676   GNUNET_TESTBED_remove_opc_ (opc->c, opc);
677   opc->state = OPC_STATE_FINISHED;
678   exop_insert (event.op);
679   if (0 !=
680       ((GNUNET_TESTBED_ET_CONNECT | GNUNET_TESTBED_ET_DISCONNECT) &
681        c->event_mask))
682   {
683     if (NULL != c->cc)
684       c->cc (c->cc_cls, &event);
685     if (GNUNET_NO == exop_check (event.op))
686       return GNUNET_YES;
687   }  
688   if (NULL != cb)
689     cb (cb_cls, opc->op, NULL);
690    /* You could have marked the operation as done by now */
691   GNUNET_break (GNUNET_NO == exop_check (event.op));
692   return GNUNET_YES;
693 }
694
695
696 /**
697  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCONFIG message from
698  * controller (testbed service)
699  *
700  * @param c the controller handler
701  * @param msg message received
702  * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
703  *           not
704  */
705 static int
706 handle_peer_config (struct GNUNET_TESTBED_Controller *c,
707                     const struct
708                     GNUNET_TESTBED_PeerConfigurationInformationMessage *msg)
709 {
710   struct OperationContext *opc;
711   struct GNUNET_TESTBED_Peer *peer;
712   struct PeerInfoData *data;
713   struct GNUNET_TESTBED_PeerInformation *pinfo;
714   GNUNET_TESTBED_PeerInfoCallback cb;
715   void *cb_cls;
716   uint64_t op_id;
717
718   op_id = GNUNET_ntohll (msg->operation_id);
719   if (NULL == (opc = find_opc (c, op_id)))
720   {
721     LOG_DEBUG ("Operation not found\n");
722     return GNUNET_YES;
723   }
724   if (OP_FORWARDED == opc->type)
725   {
726     handle_forwarded_operation_msg (c, opc,
727                                     (const struct GNUNET_MessageHeader *) msg);
728     return GNUNET_YES;
729   }
730   data = opc->data;
731   GNUNET_assert (NULL != data);
732   peer = data->peer;
733   GNUNET_assert (NULL != peer);
734   GNUNET_assert (ntohl (msg->peer_id) == peer->unique_id);
735   pinfo = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_PeerInformation));
736   pinfo->pit = data->pit;  
737   cb = data->cb;
738   cb_cls = data->cb_cls;
739   GNUNET_assert (NULL != cb);
740   GNUNET_free (data);
741   opc->data = NULL;  
742   switch (pinfo->pit)
743   {
744   case GNUNET_TESTBED_PIT_IDENTITY:
745     pinfo->result.id = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
746     (void) memcpy (pinfo->result.id, &msg->peer_identity,
747                    sizeof (struct GNUNET_PeerIdentity));
748     break;
749   case GNUNET_TESTBED_PIT_CONFIGURATION:
750     pinfo->result.cfg =         /* Freed in oprelease_peer_getinfo */
751         GNUNET_TESTBED_extract_config_ (&msg->header);
752     break;
753   case GNUNET_TESTBED_PIT_GENERIC:
754     GNUNET_assert (0);          /* never reach here */
755     break;
756   }
757   opc->data = pinfo;
758   GNUNET_TESTBED_remove_opc_ (opc->c, opc);  
759   opc->state = OPC_STATE_FINISHED;
760   cb (cb_cls, opc->op, pinfo, NULL);
761   /* We dont check whether the operation is marked as done here as the
762      operation contains data (cfg/identify) which will be freed at a later point
763   */
764   return GNUNET_YES;
765 }
766
767
768 /**
769  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_OPERATIONFAILEVENT message from
770  * controller (testbed service)
771  *
772  * @param c the controller handler
773  * @param msg message received
774  * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
775  *           not
776  */
777 static int
778 handle_op_fail_event (struct GNUNET_TESTBED_Controller *c,
779                       const struct GNUNET_TESTBED_OperationFailureEventMessage
780                       *msg)
781 {
782   struct OperationContext *opc;
783   const char *emsg;
784   uint64_t op_id;
785   uint64_t mask;
786   struct GNUNET_TESTBED_EventInformation event;
787
788   op_id = GNUNET_ntohll (msg->operation_id);
789   if (NULL == (opc = find_opc (c, op_id)))
790   {
791     LOG_DEBUG ("Operation not found\n");
792     return GNUNET_YES;
793   }
794   if (OP_FORWARDED == opc->type)
795   {
796     handle_forwarded_operation_msg (c, opc,
797                                     (const struct GNUNET_MessageHeader *) msg);
798     return GNUNET_YES;
799   }
800   GNUNET_TESTBED_remove_opc_ (opc->c, opc);
801   opc->state = OPC_STATE_FINISHED;
802   emsg = GNUNET_TESTBED_parse_error_string_ (msg);
803   if (NULL == emsg)
804     emsg = "Unknown error";
805   if (OP_PEER_INFO == opc->type)
806   {
807     struct PeerInfoData *data;
808
809     data = opc->data;
810     if (NULL != data->cb)
811       data->cb (data->cb_cls, opc->op, NULL, emsg);
812     GNUNET_free (data);
813     return GNUNET_YES;          /* We do not call controller callback for peer info */
814   }
815   event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED;
816   event.op = opc->op;
817   event.op_cls = opc->op_cls;
818   event.details.operation_finished.emsg = emsg;
819   event.details.operation_finished.generic = NULL;
820   mask = (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED);
821   if ((0 != (mask & c->event_mask)) && (NULL != c->cc))
822   {
823     exop_insert (event.op);
824     c->cc (c->cc_cls, &event);
825     if (GNUNET_NO == exop_check (event.op))
826       return GNUNET_YES;
827   }
828   switch (opc->type)
829   {
830   case OP_PEER_CREATE:
831   {
832     struct PeerCreateData *data;
833
834     data = opc->data;
835     GNUNET_free (data->peer);
836     if (NULL != data->cb)
837       data->cb (data->cls, NULL, emsg);
838     GNUNET_free (data);
839   }
840     break;
841   case OP_PEER_START:
842   case OP_PEER_STOP:
843   {
844     struct PeerEventData *data;
845
846     data = opc->data;
847     if (NULL != data->pcc)
848       data->pcc (data->pcc_cls, emsg);
849     GNUNET_free (data);
850   }
851     break;
852   case OP_PEER_DESTROY:
853     break;
854   case OP_PEER_INFO:
855     GNUNET_assert (0);
856   case OP_OVERLAY_CONNECT:
857   {
858     struct OverlayConnectData *data;
859
860     data = opc->data;
861     data->failed = GNUNET_YES;
862     if (NULL != data->cb)
863       data->cb (data->cb_cls, opc->op, emsg);
864   }
865     break;
866   case OP_FORWARDED:
867     GNUNET_assert (0);
868   case OP_LINK_CONTROLLERS:    /* No secondary callback */
869     break;    
870   case OP_SHUTDOWN_PEERS:
871   {
872     struct ShutdownPeersData *data;
873
874     data = opc->data;
875     GNUNET_free (data);         /* FIXME: Decide whether we call data->op_cb */
876     opc->data = NULL;
877   }
878     break;
879   case OP_MANAGE_SERVICE:
880   {
881     struct ManageServiceData *data = opc->data;
882       GNUNET_TESTBED_OperationCompletionCallback cb;
883     void *cb_cls;
884
885     GNUNET_assert (NULL != data);
886     cb = data->cb;
887     cb_cls = data->cb_cls;
888     GNUNET_free (data);
889     opc->data = NULL;
890     exop_insert (event.op);
891     if (NULL != cb)
892       cb (cb_cls, opc->op, emsg);
893     /* You could have marked the operation as done by now */
894     GNUNET_break (GNUNET_NO == exop_check (event.op));
895   }
896     break;
897   default:
898     GNUNET_break (0);
899   }
900   return GNUNET_YES;
901 }
902
903
904 /**
905  * Function to build GET_SLAVE_CONFIG message
906  *
907  * @param op_id the id this message should contain in its operation id field
908  * @param slave_id the id this message should contain in its slave id field
909  * @return newly allocated SlaveGetConfigurationMessage
910  */
911 static struct GNUNET_TESTBED_SlaveGetConfigurationMessage *
912 GNUNET_TESTBED_generate_slavegetconfig_msg_ (uint64_t op_id, uint32_t slave_id)
913 {
914   struct GNUNET_TESTBED_SlaveGetConfigurationMessage *msg;
915   uint16_t msize;
916
917   msize = sizeof (struct GNUNET_TESTBED_SlaveGetConfigurationMessage);
918   msg = GNUNET_malloc (msize);
919   msg->header.size = htons (msize);
920   msg->header.type =
921       htons (GNUNET_MESSAGE_TYPE_TESTBED_GET_SLAVE_CONFIGURATION);
922   msg->operation_id = GNUNET_htonll (op_id);
923   msg->slave_id = htonl (slave_id);
924   return msg;
925 }
926
927
928 /**
929  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_SLAVECONFIG message from controller
930  * (testbed service)
931  *
932  * @param c the controller handler
933  * @param msg message received
934  * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
935  *           not
936  */
937 static int
938 handle_slave_config (struct GNUNET_TESTBED_Controller *c,
939                      const struct GNUNET_TESTBED_SlaveConfiguration *msg)
940 {
941   struct OperationContext *opc;
942   uint64_t op_id;
943   struct GNUNET_TESTBED_EventInformation event;
944
945   op_id = GNUNET_ntohll (msg->operation_id);
946   if (NULL == (opc = find_opc (c, op_id)))
947   {
948     LOG_DEBUG ("Operation not found\n");
949     return GNUNET_YES;
950   }
951   if (OP_GET_SLAVE_CONFIG != opc->type)
952   {
953     GNUNET_break (0);
954     return GNUNET_YES;
955   }
956   opc->state = OPC_STATE_FINISHED;
957   GNUNET_TESTBED_remove_opc_ (opc->c, opc);
958   if ((0 != (GNUNET_TESTBED_ET_OPERATION_FINISHED & c->event_mask)) &&
959       (NULL != c->cc))
960   {
961     opc->data = GNUNET_TESTBED_extract_config_ (&msg->header);
962     event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED;
963     event.op = opc->op;
964     event.op_cls = opc->op_cls;
965     event.details.operation_finished.generic = opc->data;
966     event.details.operation_finished.emsg = NULL;
967     c->cc (c->cc_cls, &event);
968   }
969   return GNUNET_YES;
970 }
971
972
973 /**
974  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_SLAVECONFIG message from controller
975  * (testbed service)
976  *
977  * @param c the controller handler
978  * @param msg message received
979  * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
980  *           not
981  */
982 static int
983 handle_link_controllers_result (struct GNUNET_TESTBED_Controller *c,
984                                 const struct
985                                 GNUNET_TESTBED_ControllerLinkResponse *msg)
986 {
987   struct OperationContext *opc;
988   struct ControllerLinkData *data;
989   struct GNUNET_CONFIGURATION_Handle *cfg;
990   struct GNUNET_TESTBED_Host *host;
991   char *emsg;
992   uint64_t op_id;
993   struct GNUNET_TESTBED_EventInformation event;
994
995   op_id = GNUNET_ntohll (msg->operation_id);
996   if (NULL == (opc = find_opc (c, op_id)))
997   {
998     LOG_DEBUG ("Operation not found\n");
999     return GNUNET_YES;
1000   }
1001   if (OP_FORWARDED == opc->type)
1002   {
1003     handle_forwarded_operation_msg (c, opc,
1004                                     (const struct GNUNET_MessageHeader *) msg);
1005     return GNUNET_YES;
1006   }
1007   if (OP_LINK_CONTROLLERS != opc->type)
1008   {
1009     GNUNET_break (0);
1010     return GNUNET_YES;
1011   }
1012   GNUNET_assert (NULL != (data = opc->data));
1013   host = GNUNET_TESTBED_host_lookup_by_id_ (data->host_id);
1014   GNUNET_assert (NULL != host);
1015   GNUNET_free (data);
1016   opc->data = NULL;
1017   opc->state = OPC_STATE_FINISHED;
1018   GNUNET_TESTBED_remove_opc_ (opc->c, opc);
1019   event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED;
1020   event.op = opc->op;
1021   event.op_cls = opc->op_cls;
1022   event.details.operation_finished.emsg = NULL;
1023   event.details.operation_finished.generic = NULL;
1024   emsg = NULL;
1025   cfg = NULL;
1026   if (GNUNET_NO == ntohs (msg->success))
1027   {
1028     emsg = GNUNET_malloc (ntohs (msg->header.size)
1029                           - sizeof (struct
1030                                     GNUNET_TESTBED_ControllerLinkResponse) + 1);
1031     memcpy (emsg, &msg[1], ntohs (msg->header.size)
1032                           - sizeof (struct
1033                                     GNUNET_TESTBED_ControllerLinkResponse));
1034     event.details.operation_finished.emsg = emsg;
1035   }
1036   else
1037   {
1038     if (0 != ntohs (msg->config_size))
1039     {
1040       cfg = GNUNET_TESTBED_extract_config_ ((const struct GNUNET_MessageHeader *) msg);
1041       GNUNET_assert (NULL != cfg);
1042       GNUNET_TESTBED_host_replace_cfg_ (host, cfg);
1043     }
1044   }
1045   if (0 != (c->event_mask & (1L << GNUNET_TESTBED_ET_OPERATION_FINISHED)))
1046   {
1047     if (NULL != c->cc)
1048       c->cc (c->cc_cls, &event);
1049   }
1050   else
1051     LOG_DEBUG ("Not calling callback\n");
1052   if (NULL != cfg)
1053     GNUNET_CONFIGURATION_destroy (cfg);
1054   GNUNET_free_non_null (emsg);
1055   return GNUNET_YES;
1056 }
1057
1058
1059 /**
1060  * Handler for messages from controller (testbed service)
1061  *
1062  * @param cls the controller handler
1063  * @param msg message received, NULL on timeout or fatal error
1064  */
1065 static void
1066 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
1067 {
1068   struct GNUNET_TESTBED_Controller *c = cls;
1069   int status;
1070   uint16_t msize;
1071
1072   c->in_receive = GNUNET_NO;
1073   /* FIXME: Add checks for message integrity */
1074   if (NULL == msg)
1075   {
1076     LOG_DEBUG ("Receive timed out or connection to service dropped\n");
1077     return;
1078   }
1079   status = GNUNET_OK;
1080   msize = ntohs (msg->size);
1081   switch (ntohs (msg->type))
1082   {
1083   case GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS:
1084     GNUNET_assert (msize >=
1085                    sizeof (struct GNUNET_TESTBED_HostConfirmedMessage));
1086     status =
1087         GNUNET_TESTBED_host_handle_addhostconfirm_
1088         (c, (const struct GNUNET_TESTBED_HostConfirmedMessage*) msg);
1089     break;
1090   case GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS:
1091     GNUNET_assert (msize ==
1092                    sizeof (struct
1093                            GNUNET_TESTBED_GenericOperationSuccessEventMessage));
1094     status =
1095         handle_opsuccess (c,
1096                           (const struct
1097                            GNUNET_TESTBED_GenericOperationSuccessEventMessage *)
1098                           msg);
1099     break;
1100   case GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT:
1101     GNUNET_assert (msize >=
1102                    sizeof (struct GNUNET_TESTBED_OperationFailureEventMessage));
1103     status =
1104         handle_op_fail_event (c,
1105                               (const struct
1106                                GNUNET_TESTBED_OperationFailureEventMessage *)
1107                               msg);
1108     break;
1109   case GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS:
1110     GNUNET_assert (msize ==
1111                    sizeof (struct
1112                            GNUNET_TESTBED_PeerCreateSuccessEventMessage));
1113     status =
1114         handle_peer_create_success (c,
1115                                     (const struct
1116                                      GNUNET_TESTBED_PeerCreateSuccessEventMessage
1117                                      *) msg);
1118     break;
1119   case GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT:
1120     GNUNET_assert (msize == sizeof (struct GNUNET_TESTBED_PeerEventMessage));
1121     status =
1122         handle_peer_event (c,
1123                            (const struct GNUNET_TESTBED_PeerEventMessage *)
1124                            msg);
1125
1126     break;
1127   case GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONFIGURATION:
1128     GNUNET_assert (msize >=
1129                    sizeof (struct
1130                            GNUNET_TESTBED_PeerConfigurationInformationMessage));
1131     status =
1132         handle_peer_config (c,
1133                             (const struct
1134                              GNUNET_TESTBED_PeerConfigurationInformationMessage
1135                              *) msg);
1136     break;
1137   case GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT:
1138     GNUNET_assert (msize ==
1139                    sizeof (struct GNUNET_TESTBED_ConnectionEventMessage));
1140     status =
1141         handle_peer_conevent (c,
1142                               (const struct
1143                                GNUNET_TESTBED_ConnectionEventMessage *) msg);
1144     break;
1145   case GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION:
1146     GNUNET_assert (msize > sizeof (struct GNUNET_TESTBED_SlaveConfiguration));
1147     status =
1148         handle_slave_config (c,
1149                              (const struct GNUNET_TESTBED_SlaveConfiguration *)
1150                              msg);
1151     break;
1152   case GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT:
1153     status = 
1154         handle_link_controllers_result (c,
1155                                         (const struct
1156                                          GNUNET_TESTBED_ControllerLinkResponse
1157                                          *) msg);
1158     break;
1159   default:
1160     GNUNET_assert (0);
1161   }
1162   if ((GNUNET_OK == status) && (GNUNET_NO == c->in_receive))
1163   {
1164     c->in_receive = GNUNET_YES;
1165     GNUNET_CLIENT_receive (c->client, &message_handler, c,
1166                            GNUNET_TIME_UNIT_FOREVER_REL);
1167   }
1168 }
1169
1170
1171 /**
1172  * Function called to notify a client about the connection begin ready to queue
1173  * more data.  "buf" will be NULL and "size" zero if the connection was closed
1174  * for writing in the meantime.
1175  *
1176  * @param cls closure
1177  * @param size number of bytes available in buf
1178  * @param buf where the callee should write the message
1179  * @return number of bytes written to buf
1180  */
1181 static size_t
1182 transmit_ready_notify (void *cls, size_t size, void *buf)
1183 {
1184   struct GNUNET_TESTBED_Controller *c = cls;
1185   struct MessageQueue *mq_entry;
1186
1187   c->th = NULL;
1188   mq_entry = c->mq_head;
1189   GNUNET_assert (NULL != mq_entry);
1190   if ((0 == size) && (NULL == buf))     /* Timeout */
1191   {
1192     LOG_DEBUG ("Message sending timed out -- retrying\n");
1193     c->th =
1194         GNUNET_CLIENT_notify_transmit_ready (c->client,
1195                                              ntohs (mq_entry->msg->size),
1196                                              TIMEOUT_REL, GNUNET_YES,
1197                                              &transmit_ready_notify, c);
1198     return 0;
1199   }
1200   GNUNET_assert (ntohs (mq_entry->msg->size) <= size);
1201   size = ntohs (mq_entry->msg->size);
1202   memcpy (buf, mq_entry->msg, size);
1203   LOG_DEBUG ("Message of type: %u and size: %u sent\n",
1204              ntohs (mq_entry->msg->type), size);
1205   GNUNET_free (mq_entry->msg);
1206   GNUNET_CONTAINER_DLL_remove (c->mq_head, c->mq_tail, mq_entry);
1207   GNUNET_free (mq_entry);
1208   mq_entry = c->mq_head;
1209   if (NULL != mq_entry)
1210     c->th =
1211         GNUNET_CLIENT_notify_transmit_ready (c->client,
1212                                              ntohs (mq_entry->msg->size),
1213                                              TIMEOUT_REL, GNUNET_YES,
1214                                              &transmit_ready_notify, c);
1215   if (GNUNET_NO == c->in_receive)
1216   {
1217     c->in_receive = GNUNET_YES;
1218     GNUNET_CLIENT_receive (c->client, &message_handler, c,
1219                            GNUNET_TIME_UNIT_FOREVER_REL);
1220   }
1221   return size;
1222 }
1223
1224
1225 /**
1226  * Queues a message in send queue for sending to the service
1227  *
1228  * @param controller the handle to the controller
1229  * @param msg the message to queue
1230  */
1231 void
1232 GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller,
1233                                struct GNUNET_MessageHeader *msg)
1234 {
1235   struct MessageQueue *mq_entry;
1236   uint16_t type;
1237   uint16_t size;
1238
1239   type = ntohs (msg->type);
1240   size = ntohs (msg->size);
1241   GNUNET_assert ((GNUNET_MESSAGE_TYPE_TESTBED_INIT <= type) &&
1242                  (GNUNET_MESSAGE_TYPE_TESTBED_MAX > type));
1243   mq_entry = GNUNET_malloc (sizeof (struct MessageQueue));
1244   mq_entry->msg = msg;
1245   LOG (GNUNET_ERROR_TYPE_DEBUG,
1246        "Queueing message of type %u, size %u for sending\n", type,
1247        ntohs (msg->size));
1248   GNUNET_CONTAINER_DLL_insert_tail (controller->mq_head, controller->mq_tail,
1249                                     mq_entry);
1250   if (NULL == controller->th)
1251     controller->th =
1252         GNUNET_CLIENT_notify_transmit_ready (controller->client, size,
1253                                              TIMEOUT_REL, GNUNET_YES,
1254                                              &transmit_ready_notify,
1255                                              controller);
1256 }
1257
1258
1259 /**
1260  * Sends the given message as an operation. The given callback is called when a
1261  * reply for the operation is available.  Call
1262  * GNUNET_TESTBED_forward_operation_msg_cancel_() to cleanup the returned
1263  * operation context if the cc hasn't been called
1264  *
1265  * @param controller the controller to which the message has to be sent
1266  * @param operation_id the operation id of the message
1267  * @param msg the message to send
1268  * @param cc the callback to call when reply is available
1269  * @param cc_cls the closure for the above callback
1270  * @return the operation context which can be used to cancel the forwarded
1271  *           operation
1272  */
1273 struct OperationContext *
1274 GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller
1275                                        *controller, uint64_t operation_id,
1276                                        const struct GNUNET_MessageHeader *msg,
1277                                        GNUNET_CLIENT_MessageHandler cc,
1278                                        void *cc_cls)
1279 {
1280   struct OperationContext *opc;
1281   struct ForwardedOperationData *data;
1282   struct GNUNET_MessageHeader *dup_msg;
1283   uint16_t msize;
1284
1285   data = GNUNET_malloc (sizeof (struct ForwardedOperationData));
1286   data->cc = cc;
1287   data->cc_cls = cc_cls;
1288   opc = GNUNET_malloc (sizeof (struct OperationContext));
1289   opc->c = controller;
1290   opc->type = OP_FORWARDED;
1291   opc->data = data;
1292   opc->id = operation_id;
1293   msize = ntohs (msg->size);
1294   dup_msg = GNUNET_malloc (msize);
1295   (void) memcpy (dup_msg, msg, msize);
1296   GNUNET_TESTBED_queue_message_ (opc->c, dup_msg);
1297   GNUNET_TESTBED_insert_opc_ (controller, opc);
1298   return opc;
1299 }
1300
1301
1302 /**
1303  * Function to cancel an operation created by simply forwarding an operation
1304  * message.
1305  *
1306  * @param opc the operation context from GNUNET_TESTBED_forward_operation_msg_()
1307  */
1308 void
1309 GNUNET_TESTBED_forward_operation_msg_cancel_ (struct OperationContext *opc)
1310 {
1311   GNUNET_TESTBED_remove_opc_ (opc->c, opc);
1312   GNUNET_free (opc->data);
1313   GNUNET_free (opc);
1314 }
1315
1316
1317 /**
1318  * Function to call to start a link-controllers type operation once all queues
1319  * the operation is part of declare that the operation can be activated.
1320  *
1321  * @param cls the closure from GNUNET_TESTBED_operation_create_()
1322  */
1323 static void
1324 opstart_link_controllers (void *cls)
1325 {
1326   struct OperationContext *opc = cls;
1327   struct ControllerLinkData *data;
1328   struct GNUNET_TESTBED_ControllerLinkRequest *msg;
1329
1330   GNUNET_assert (NULL != opc->data);
1331   data = opc->data;
1332   msg = data->msg;
1333   data->msg = NULL;
1334   opc->state = OPC_STATE_STARTED;
1335   GNUNET_TESTBED_insert_opc_ (opc->c, opc);
1336   GNUNET_TESTBED_queue_message_ (opc->c, &msg->header);
1337 }
1338
1339
1340 /**
1341  * Callback which will be called when link-controllers type operation is released
1342  *
1343  * @param cls the closure from GNUNET_TESTBED_operation_create_()
1344  */
1345 static void
1346 oprelease_link_controllers (void *cls)
1347 {
1348   struct OperationContext *opc = cls;
1349   struct ControllerLinkData *data;
1350
1351   data = opc->data;
1352   switch (opc->state)
1353   {
1354   case OPC_STATE_INIT:
1355     GNUNET_free (data->msg);    
1356     break;
1357   case OPC_STATE_STARTED:
1358     GNUNET_TESTBED_remove_opc_ (opc->c, opc);
1359     break;
1360   case OPC_STATE_FINISHED:
1361     break;
1362   }
1363   GNUNET_free_non_null (data);
1364   GNUNET_free (opc);
1365 }
1366
1367
1368 /**
1369  * Function to be called when get slave config operation is ready
1370  *
1371  * @param cls the OperationContext of type OP_GET_SLAVE_CONFIG
1372  */
1373 static void
1374 opstart_get_slave_config (void *cls)
1375 {
1376   struct OperationContext *opc = cls;
1377   struct GetSlaveConfigData *data = opc->data;
1378   struct GNUNET_TESTBED_SlaveGetConfigurationMessage *msg;
1379
1380   GNUNET_assert (NULL != data);
1381   msg = GNUNET_TESTBED_generate_slavegetconfig_msg_ (opc->id, data->slave_id);
1382   GNUNET_free (opc->data);
1383   data = NULL;
1384   opc->data = NULL;
1385   GNUNET_TESTBED_insert_opc_ (opc->c, opc);
1386   GNUNET_TESTBED_queue_message_ (opc->c, &msg->header);
1387   opc->state = OPC_STATE_STARTED;
1388 }
1389
1390
1391 /**
1392  * Function to be called when get slave config operation is cancelled or finished
1393  *
1394  * @param cls the OperationContext of type OP_GET_SLAVE_CONFIG
1395  */
1396 static void
1397 oprelease_get_slave_config (void *cls)
1398 {
1399   struct OperationContext *opc = cls;
1400
1401   switch (opc->state)
1402   {
1403   case OPC_STATE_INIT:
1404     GNUNET_free (opc->data);
1405     break;
1406   case OPC_STATE_STARTED:
1407     GNUNET_TESTBED_remove_opc_ (opc->c, opc);
1408     break;
1409   case OPC_STATE_FINISHED:
1410     if (NULL != opc->data)
1411       GNUNET_CONFIGURATION_destroy (opc->data);
1412     break;
1413   }
1414   GNUNET_free (opc);
1415 }
1416
1417
1418 /**
1419  * Start a controller process using the given configuration at the
1420  * given host.
1421  *
1422  * @param host host to run the controller on; This should be the same host if
1423  *          the controller was previously started with
1424  *          GNUNET_TESTBED_controller_start()
1425  * @param event_mask bit mask with set of events to call 'cc' for;
1426  *                   or-ed values of "1LL" shifted by the
1427  *                   respective 'enum GNUNET_TESTBED_EventType'
1428  *                   (i.e.  "(1LL << GNUNET_TESTBED_ET_CONNECT) | ...")
1429  * @param cc controller callback to invoke on events
1430  * @param cc_cls closure for cc
1431  * @return handle to the controller
1432  */
1433 struct GNUNET_TESTBED_Controller *
1434 GNUNET_TESTBED_controller_connect (struct GNUNET_TESTBED_Host *host,
1435                                    uint64_t event_mask,
1436                                    GNUNET_TESTBED_ControllerCallback cc,
1437                                    void *cc_cls)
1438 {
1439   struct GNUNET_TESTBED_Controller *c;
1440   struct GNUNET_TESTBED_InitMessage *msg;
1441   const struct GNUNET_CONFIGURATION_Handle *cfg;
1442   unsigned long long max_parallel_operations;
1443   unsigned long long max_parallel_service_connections;
1444   unsigned long long max_parallel_topology_config_operations;
1445
1446   GNUNET_assert (NULL != (cfg = GNUNET_TESTBED_host_get_cfg_ (host)));
1447   if (GNUNET_OK !=
1448       GNUNET_CONFIGURATION_get_value_number (cfg, "testbed",
1449                                              "MAX_PARALLEL_OPERATIONS",
1450                                              &max_parallel_operations))
1451   {
1452     GNUNET_break (0);
1453     return NULL;
1454   }
1455   if (GNUNET_OK !=
1456       GNUNET_CONFIGURATION_get_value_number (cfg, "testbed",
1457                                              "MAX_PARALLEL_SERVICE_CONNECTIONS",
1458                                              &max_parallel_service_connections))
1459   {
1460     GNUNET_break (0);
1461     return NULL;
1462   }
1463   if (GNUNET_OK !=
1464       GNUNET_CONFIGURATION_get_value_number (cfg, "testbed",
1465                                              "MAX_PARALLEL_TOPOLOGY_CONFIG_OPERATIONS",
1466                                              &max_parallel_topology_config_operations))
1467   {
1468     GNUNET_break (0);
1469     return NULL;
1470   }
1471   c = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_Controller));
1472   c->cc = cc;
1473   c->cc_cls = cc_cls;
1474   c->event_mask = event_mask;
1475   c->cfg = GNUNET_CONFIGURATION_dup (cfg);
1476   c->client = GNUNET_CLIENT_connect ("testbed", c->cfg);
1477   if (NULL == c->client)
1478   {
1479     GNUNET_TESTBED_controller_disconnect (c);
1480     return NULL;
1481   }
1482   GNUNET_TESTBED_mark_host_registered_at_ (host, c);
1483   c->host = host;
1484   c->opq_parallel_operations =
1485       GNUNET_TESTBED_operation_queue_create_ ((unsigned int)
1486                                               max_parallel_operations);
1487   c->opq_parallel_service_connections =
1488       GNUNET_TESTBED_operation_queue_create_ ((unsigned int)
1489                                               max_parallel_service_connections);
1490   c->opq_parallel_topology_config_operations =
1491       GNUNET_TESTBED_operation_queue_create_ ((unsigned int)
1492                                               max_parallel_topology_config_operations);
1493   msg = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_InitMessage));
1494   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_INIT);
1495   msg->header.size = htons (sizeof (struct GNUNET_TESTBED_InitMessage));
1496   msg->host_id = htonl (GNUNET_TESTBED_host_get_id_ (host));
1497   msg->event_mask = GNUNET_htonll (c->event_mask);
1498   GNUNET_TESTBED_queue_message_ (c, &msg->header);
1499   return c;
1500 }
1501
1502
1503 /**
1504  * Iterator to free opc map entries
1505  *
1506  * @param cls closure
1507  * @param key current key code
1508  * @param value value in the hash map
1509  * @return GNUNET_YES if we should continue to
1510  *         iterate,
1511  *         GNUNET_NO if not.
1512  */
1513 static int
1514 opc_free_iterator (void *cls, uint32_t key, void *value)
1515 {
1516   struct GNUNET_CONTAINER_MultiHashMap32 *map = cls;
1517   struct OperationContext *opc = value;
1518
1519   GNUNET_assert (NULL != opc);
1520   GNUNET_break (0);
1521   GNUNET_free (opc);
1522   GNUNET_assert (GNUNET_YES == 
1523                  GNUNET_CONTAINER_multihashmap32_remove (map, key, value));
1524   return GNUNET_YES;
1525 }
1526
1527
1528 /**
1529  * Stop the given controller (also will terminate all peers and
1530  * controllers dependent on this controller).  This function
1531  * blocks until the testbed has been fully terminated (!).
1532  *
1533  * @param c handle to controller to stop
1534  */
1535 void
1536 GNUNET_TESTBED_controller_disconnect (struct GNUNET_TESTBED_Controller
1537                                       *c)
1538 {
1539   struct MessageQueue *mq_entry;
1540
1541   if (NULL != c->th)
1542     GNUNET_CLIENT_notify_transmit_ready_cancel (c->th);
1543   /* Clear the message queue */
1544   while (NULL != (mq_entry = c->mq_head))
1545   {
1546     GNUNET_CONTAINER_DLL_remove (c->mq_head, c->mq_tail,
1547                                  mq_entry);
1548     GNUNET_free (mq_entry->msg);
1549     GNUNET_free (mq_entry);
1550   }
1551   if (NULL != c->client)
1552     GNUNET_CLIENT_disconnect (c->client);
1553   if (NULL != c->host)
1554     GNUNET_TESTBED_deregister_host_at_ (c->host, c);
1555   GNUNET_CONFIGURATION_destroy (c->cfg);
1556   GNUNET_TESTBED_operation_queue_destroy_ (c->opq_parallel_operations);
1557   GNUNET_TESTBED_operation_queue_destroy_
1558       (c->opq_parallel_service_connections);
1559   GNUNET_TESTBED_operation_queue_destroy_
1560       (c->opq_parallel_topology_config_operations);
1561   if (NULL != c->opc_map)
1562   {
1563     GNUNET_assert (GNUNET_SYSERR !=
1564                    GNUNET_CONTAINER_multihashmap32_iterate (c->opc_map,
1565                                                             &opc_free_iterator,
1566                                                             c->opc_map));
1567     GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap32_size (c->opc_map));
1568     GNUNET_CONTAINER_multihashmap32_destroy (c->opc_map);
1569   }
1570   GNUNET_free (c);
1571 }
1572
1573
1574 /**
1575  * Compresses given configuration using zlib compress
1576  *
1577  * @param config the serialized configuration
1578  * @param size the size of config
1579  * @param xconfig will be set to the compressed configuration (memory is fresly
1580  *          allocated)
1581  * @return the size of the xconfig
1582  */
1583 size_t
1584 GNUNET_TESTBED_compress_config_ (const char *config, size_t size,
1585                                  char **xconfig)
1586 {
1587   size_t xsize;
1588
1589   xsize = compressBound ((uLong) size);
1590   *xconfig = GNUNET_malloc (xsize);
1591   GNUNET_assert (Z_OK ==
1592                  compress2 ((Bytef *) * xconfig, (uLongf *) & xsize,
1593                             (const Bytef *) config, (uLongf) size,
1594                             Z_BEST_SPEED));
1595   return xsize;
1596 }
1597
1598
1599 /**
1600  * Function to serialize and compress using zlib a configuration through a
1601  * configuration handle
1602  *
1603  * @param cfg the configuration
1604  * @param size the size of configuration when serialize.  Will be set on success.
1605  * @param xsize the sizeo of the compressed configuration.  Will be set on success.
1606  * @return the serialized and compressed configuration
1607  */
1608 char *
1609 GNUNET_TESTBED_compress_cfg_ (const struct GNUNET_CONFIGURATION_Handle *cfg,
1610                               size_t *size, size_t *xsize)
1611 {
1612   char *config;
1613   char *xconfig;
1614   size_t size_;
1615   size_t xsize_;
1616   
1617   config = GNUNET_CONFIGURATION_serialize (cfg, &size_);
1618   xsize_ = GNUNET_TESTBED_compress_config_ (config, size_, &xconfig);
1619   GNUNET_free (config);
1620   *size = size_;
1621   *xsize = xsize_;
1622   return xconfig;
1623 }
1624
1625
1626 /**
1627  * Create a link from slave controller to delegated controller. Whenever the
1628  * master controller is asked to start a peer at the delegated controller the
1629  * request will be routed towards slave controller (if a route exists). The
1630  * slave controller will then route it to the delegated controller. The
1631  * configuration of the delegated controller is given and is used to either
1632  * create the delegated controller or to connect to an existing controller. Note
1633  * that while starting the delegated controller the configuration will be
1634  * modified to accommodate available free ports.  the 'is_subordinate' specifies
1635  * if the given delegated controller should be started and managed by the slave
1636  * controller, or if the delegated controller already has a master and the slave
1637  * controller connects to it as a non master controller. The success or failure
1638  * of this operation will be signalled through the
1639  * GNUNET_TESTBED_ControllerCallback() with an event of type
1640  * GNUNET_TESTBED_ET_OPERATION_FINISHED
1641  *
1642  * @param op_cls the operation closure for the event which is generated to
1643  *          signal success or failure of this operation
1644  * @param master handle to the master controller who creates the association
1645  * @param delegated_host requests to which host should be delegated; cannot be NULL
1646  * @param slave_host which host is used to run the slave controller; use NULL to
1647  *          make the master controller connect to the delegated host
1648  * @param is_subordinate GNUNET_YES if the controller at delegated_host should
1649  *          be started by the slave controller; GNUNET_NO if the slave
1650  *          controller has to connect to the already started delegated
1651  *          controller via TCP/IP
1652  * @return the operation handle
1653  */
1654 struct GNUNET_TESTBED_Operation *
1655 GNUNET_TESTBED_controller_link (void *op_cls,
1656                                 struct GNUNET_TESTBED_Controller *master,
1657                                 struct GNUNET_TESTBED_Host *delegated_host,
1658                                 struct GNUNET_TESTBED_Host *slave_host,
1659                                 int is_subordinate)
1660 {
1661   struct OperationContext *opc;
1662   struct GNUNET_TESTBED_ControllerLinkRequest *msg;
1663   struct ControllerLinkData *data;
1664   uint32_t slave_host_id;
1665   uint32_t delegated_host_id;
1666   uint16_t msg_size;
1667   
1668   GNUNET_assert (GNUNET_YES ==
1669                  GNUNET_TESTBED_is_host_registered_ (delegated_host, master));
1670   slave_host_id =
1671       GNUNET_TESTBED_host_get_id_ ((NULL !=
1672                                     slave_host) ? slave_host : master->host);
1673   delegated_host_id = GNUNET_TESTBED_host_get_id_ (delegated_host);
1674   if ((NULL != slave_host) && (0 != slave_host_id))
1675     GNUNET_assert (GNUNET_YES ==
1676                    GNUNET_TESTBED_is_host_registered_ (slave_host, master));
1677   msg_size = sizeof (struct GNUNET_TESTBED_ControllerLinkRequest);
1678   msg = GNUNET_malloc (msg_size);
1679   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS);
1680   msg->header.size = htons (msg_size);
1681   msg->delegated_host_id = htonl (delegated_host_id);
1682   msg->slave_host_id = htonl (slave_host_id);
1683   msg->is_subordinate = (GNUNET_YES == is_subordinate) ? 1 : 0;
1684   data = GNUNET_malloc (sizeof (struct ControllerLinkData));
1685   data->msg = msg;
1686   data->host_id = delegated_host_id;
1687   opc = GNUNET_malloc (sizeof (struct OperationContext));
1688   opc->c = master;
1689   opc->data = data;
1690   opc->type = OP_LINK_CONTROLLERS;
1691   opc->id = GNUNET_TESTBED_get_next_op_id (opc->c);
1692   opc->state = OPC_STATE_INIT;
1693   opc->op_cls = op_cls;
1694   msg->operation_id = GNUNET_htonll (opc->id);
1695   opc->op =
1696       GNUNET_TESTBED_operation_create_ (opc, &opstart_link_controllers,
1697                                         &oprelease_link_controllers);
1698   GNUNET_TESTBED_operation_queue_insert_ (master->opq_parallel_operations,
1699                                           opc->op);
1700   GNUNET_TESTBED_operation_begin_wait_ (opc->op);
1701   return opc->op;
1702 }
1703
1704
1705 /**
1706  * Like GNUNET_TESTBED_get_slave_config(), however without the host registration
1707  * check. Another difference is that this function takes the id of the slave
1708  * host.
1709  *
1710  * @param op_cls the closure for the operation
1711  * @param master the handle to master controller
1712  * @param slave_host_id id of the host where the slave controller is running to
1713  *          the slave_host should remain valid until this operation is cancelled
1714  *          or marked as finished
1715  * @return the operation handle;
1716  */
1717 struct GNUNET_TESTBED_Operation *
1718 GNUNET_TESTBED_get_slave_config_ (void *op_cls,
1719                                   struct GNUNET_TESTBED_Controller *master,
1720                                   uint32_t slave_host_id)
1721 {
1722   struct OperationContext *opc;
1723   struct GetSlaveConfigData *data;
1724
1725   data = GNUNET_malloc (sizeof (struct GetSlaveConfigData));
1726   data->slave_id = slave_host_id;
1727   opc = GNUNET_malloc (sizeof (struct OperationContext));
1728   opc->state = OPC_STATE_INIT;
1729   opc->c = master;
1730   opc->id = GNUNET_TESTBED_get_next_op_id (master);
1731   opc->type = OP_GET_SLAVE_CONFIG;
1732   opc->data = data;
1733   opc->op_cls = op_cls;
1734   opc->op =
1735       GNUNET_TESTBED_operation_create_ (opc, &opstart_get_slave_config,
1736                                         &oprelease_get_slave_config);
1737   GNUNET_TESTBED_operation_queue_insert_ (master->opq_parallel_operations,
1738                                           opc->op);
1739   GNUNET_TESTBED_operation_begin_wait_ (opc->op);
1740   return opc->op;
1741 }
1742
1743
1744 /**
1745  * Function to acquire the configuration of a running slave controller. The
1746  * completion of the operation is signalled through the controller_cb from
1747  * GNUNET_TESTBED_controller_connect(). If the operation is successful the
1748  * handle to the configuration is available in the generic pointer of
1749  * operation_finished field of struct GNUNET_TESTBED_EventInformation.
1750  *
1751  * @param op_cls the closure for the operation
1752  * @param master the handle to master controller
1753  * @param slave_host the host where the slave controller is running; the handle
1754  *          to the slave_host should remain valid until this operation is
1755  *          cancelled or marked as finished
1756  * @return the operation handle; NULL if the slave_host is not registered at
1757  *           master
1758  */
1759 struct GNUNET_TESTBED_Operation *
1760 GNUNET_TESTBED_get_slave_config (void *op_cls,
1761                                  struct GNUNET_TESTBED_Controller *master,
1762                                  struct GNUNET_TESTBED_Host *slave_host)
1763 {
1764   if (GNUNET_NO == GNUNET_TESTBED_is_host_registered_ (slave_host, master))
1765     return NULL;
1766   return GNUNET_TESTBED_get_slave_config_ (op_cls, master,
1767                                            GNUNET_TESTBED_host_get_id_
1768                                            (slave_host));
1769 }
1770
1771
1772 /**
1773  * Ask the testbed controller to write the current overlay topology to
1774  * a file.  Naturally, the file will only contain a snapshot as the
1775  * topology may evolve all the time.
1776  *
1777  * @param controller overlay controller to inspect
1778  * @param filename name of the file the topology should
1779  *        be written to.
1780  */
1781 void
1782 GNUNET_TESTBED_overlay_write_topology_to_file (struct GNUNET_TESTBED_Controller
1783                                                *controller,
1784                                                const char *filename)
1785 {
1786   GNUNET_break (0);
1787 }
1788
1789
1790 /**
1791  * Creates a helper initialization message. This function is here because we
1792  * want to use this in testing
1793  *
1794  * @param trusted_ip the ip address of the controller which will be set as TRUSTED
1795  *          HOST(all connections form this ip are permitted by the testbed) when
1796  *          starting testbed controller at host. This can either be a single ip
1797  *          address or a network address in CIDR notation.
1798  * @param hostname the hostname of the destination this message is intended for
1799  * @param cfg the configuration that has to used to start the testbed service
1800  *          thru helper
1801  * @return the initialization message
1802  */
1803 struct GNUNET_TESTBED_HelperInit *
1804 GNUNET_TESTBED_create_helper_init_msg_ (const char *trusted_ip,
1805                                         const char *hostname,
1806                                         const struct GNUNET_CONFIGURATION_Handle
1807                                         *cfg)
1808 {
1809   struct GNUNET_TESTBED_HelperInit *msg;
1810   char *config;
1811   char *xconfig;
1812   size_t config_size;
1813   size_t xconfig_size;
1814   uint16_t trusted_ip_len;
1815   uint16_t hostname_len;
1816   uint16_t msg_size;
1817
1818   config = GNUNET_CONFIGURATION_serialize (cfg, &config_size);
1819   GNUNET_assert (NULL != config);
1820   xconfig_size =
1821       GNUNET_TESTBED_compress_config_ (config, config_size, &xconfig);
1822   GNUNET_free (config);
1823   trusted_ip_len = strlen (trusted_ip);
1824   hostname_len = (NULL == hostname) ? 0 : strlen (hostname);
1825   msg_size =
1826       xconfig_size + trusted_ip_len + 1 +
1827       sizeof (struct GNUNET_TESTBED_HelperInit);
1828   msg_size += hostname_len;
1829   msg = GNUNET_realloc (xconfig, msg_size);
1830   (void) memmove (((void *) &msg[1]) + trusted_ip_len + 1 + hostname_len, msg,
1831                   xconfig_size);
1832   msg->header.size = htons (msg_size);
1833   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_HELPER_INIT);
1834   msg->trusted_ip_size = htons (trusted_ip_len);
1835   msg->hostname_size = htons (hostname_len);
1836   msg->config_size = htons (config_size);
1837   (void) strcpy ((char *) &msg[1], trusted_ip);
1838   if (0 != hostname_len)
1839     (void) strncpy (((char *) &msg[1]) + trusted_ip_len + 1, hostname,
1840                     hostname_len);
1841   return msg;
1842 }
1843
1844
1845 /**
1846  * Signal that the information from an operation has been fully
1847  * processed.  This function MUST be called for each event
1848  * of type 'operation_finished' to fully remove the operation
1849  * from the operation queue.  After calling this function, the
1850  * 'op_result' becomes invalid (!).
1851  *
1852  * @param operation operation to signal completion for
1853  */
1854 void
1855 GNUNET_TESTBED_operation_done (struct GNUNET_TESTBED_Operation *operation)
1856 {
1857   (void) exop_check (operation);
1858   GNUNET_TESTBED_operation_release_ (operation);
1859 }
1860
1861
1862 /**
1863  * Generates configuration by uncompressing configuration in given message. The
1864  * given message should be of the following types:
1865  * GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONFIGURATION,
1866  * GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION,
1867  * GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST,
1868  * GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS,
1869  * GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT,
1870  *
1871  * @param msg the message containing compressed configuration
1872  * @return handle to the parsed configuration; NULL upon error while parsing the message
1873  */
1874 struct GNUNET_CONFIGURATION_Handle *
1875 GNUNET_TESTBED_extract_config_ (const struct GNUNET_MessageHeader *msg)
1876 {
1877   struct GNUNET_CONFIGURATION_Handle *cfg;
1878   Bytef *data;
1879   const Bytef *xdata;
1880   uLong data_len;
1881   uLong xdata_len;
1882   int ret;
1883
1884   switch (ntohs (msg->type))
1885   {
1886   case GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONFIGURATION:
1887   {
1888     const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *imsg;
1889
1890     imsg =
1891         (const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *) msg;
1892     data_len = (uLong) ntohs (imsg->config_size);
1893     xdata_len =
1894         ntohs (imsg->header.size) -
1895         sizeof (struct GNUNET_TESTBED_PeerConfigurationInformationMessage);
1896     xdata = (const Bytef *) &imsg[1];
1897   }
1898     break;
1899   case GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION:
1900   {
1901     const struct GNUNET_TESTBED_SlaveConfiguration *imsg;
1902
1903     imsg = (const struct GNUNET_TESTBED_SlaveConfiguration *) msg;
1904     data_len = (uLong) ntohs (imsg->config_size);
1905     xdata_len =
1906         ntohs (imsg->header.size) -
1907         sizeof (struct GNUNET_TESTBED_SlaveConfiguration);
1908     xdata = (const Bytef *) &imsg[1];
1909   }
1910   break;
1911   case GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST:
1912     {
1913       const struct GNUNET_TESTBED_AddHostMessage *imsg;
1914       uint16_t osize;
1915       
1916       imsg = (const struct GNUNET_TESTBED_AddHostMessage *) msg;
1917       data_len = (uLong) ntohs (imsg->config_size);
1918       osize = sizeof (struct GNUNET_TESTBED_AddHostMessage) +
1919           ntohs (imsg->username_length) + ntohs (imsg->hostname_length); 
1920       xdata_len = ntohs (imsg->header.size) - osize;
1921       xdata = (const Bytef *) ((const void *) imsg + osize);
1922     }
1923     break;
1924   case GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT:
1925     {
1926       const struct GNUNET_TESTBED_ControllerLinkResponse *imsg;
1927       
1928       imsg = (const struct GNUNET_TESTBED_ControllerLinkResponse *) msg;
1929       data_len = ntohs (imsg->config_size);
1930       xdata_len = ntohs (imsg->header.size) - 
1931           sizeof (const struct GNUNET_TESTBED_ControllerLinkResponse);
1932       xdata = (const Bytef *) &imsg[1];
1933     }
1934     break;
1935   case GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER:
1936     {
1937       const struct GNUNET_TESTBED_PeerCreateMessage *imsg;
1938
1939       imsg = (const struct GNUNET_TESTBED_PeerCreateMessage *) msg;
1940       data_len = ntohs (imsg->config_size);
1941       xdata_len = ntohs (imsg->header.size) -
1942           sizeof (struct GNUNET_TESTBED_PeerCreateMessage);
1943       xdata = (const Bytef *) &imsg[1];
1944     }
1945     break;
1946   case GNUNET_MESSAGE_TYPE_TESTBED_RECONFIGURE_PEER:
1947     {
1948       const struct GNUNET_TESTBED_PeerReconfigureMessage *imsg;
1949
1950       imsg = (const struct GNUNET_TESTBED_PeerReconfigureMessage *) msg;
1951       data_len =  ntohs (imsg->config_size);
1952       xdata_len = ntohs (imsg->header.size) -
1953           sizeof (struct GNUNET_TESTBED_PeerReconfigureMessage);
1954       xdata = (const Bytef *) &imsg[1];
1955     }
1956     break;
1957   default:
1958     GNUNET_assert (0);
1959   }
1960   data = GNUNET_malloc (data_len);
1961   if (Z_OK != (ret = uncompress (data, &data_len, xdata, xdata_len)))
1962   {
1963     GNUNET_free (data);
1964     GNUNET_break_op (0);        /* Un-compression failure */
1965     return NULL;
1966   }
1967   cfg = GNUNET_CONFIGURATION_create ();
1968   if (GNUNET_OK !=
1969       GNUNET_CONFIGURATION_deserialize (cfg, (const char *) data,
1970                                         (size_t) data_len,
1971                                         GNUNET_NO))
1972   {
1973     GNUNET_free (data);
1974     GNUNET_break_op (0);        /* De-serialization failure */
1975     return NULL;
1976   }
1977   GNUNET_free (data);
1978   return cfg;
1979 }
1980
1981
1982 /**
1983  * Checks the integrity of the OperationFailureEventMessage and if good returns
1984  * the error message it contains.
1985  *
1986  * @param msg the OperationFailureEventMessage
1987  * @return the error message
1988  */
1989 const char *
1990 GNUNET_TESTBED_parse_error_string_ (const struct
1991                                     GNUNET_TESTBED_OperationFailureEventMessage
1992                                     *msg)
1993 {
1994   uint16_t msize;
1995   const char *emsg;
1996
1997   msize = ntohs (msg->header.size);
1998   if (sizeof (struct GNUNET_TESTBED_OperationFailureEventMessage) >= msize)
1999     return NULL;
2000   msize -= sizeof (struct GNUNET_TESTBED_OperationFailureEventMessage);
2001   emsg = (const char *) &msg[1];
2002   if ('\0' != emsg[msize - 1])
2003   {
2004     GNUNET_break (0);
2005     return NULL;
2006   }
2007   return emsg;
2008 }
2009
2010
2011 /**
2012  * Function to return the operation id for a controller. The operation id is
2013  * created from the controllers host id and its internal operation counter.
2014  *
2015  * @param controller the handle to the controller whose operation id has to be incremented
2016  * @return the incremented operation id.
2017  */
2018 uint64_t
2019 GNUNET_TESTBED_get_next_op_id (struct GNUNET_TESTBED_Controller * controller)
2020 {
2021   uint64_t op_id;
2022
2023   op_id = (uint64_t) GNUNET_TESTBED_host_get_id_ (controller->host);
2024   op_id = op_id << 32;
2025   op_id |= (uint64_t) controller->operation_counter++;
2026   return op_id;
2027 }
2028
2029
2030 /**
2031  * Function called when a shutdown peers operation is ready
2032  *
2033  * @param cls the closure from GNUNET_TESTBED_operation_create_()
2034  */
2035 static void
2036 opstart_shutdown_peers (void *cls)
2037 {
2038   struct OperationContext *opc = cls;
2039   struct GNUNET_TESTBED_ShutdownPeersMessage *msg;
2040
2041   opc->state = OPC_STATE_STARTED;
2042   msg = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_ShutdownPeersMessage));
2043   msg->header.size = 
2044       htons (sizeof (struct GNUNET_TESTBED_ShutdownPeersMessage));
2045   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS);
2046   msg->operation_id = GNUNET_htonll (opc->id);
2047   GNUNET_TESTBED_insert_opc_ (opc->c, opc);
2048   GNUNET_TESTBED_queue_message_ (opc->c, &msg->header);
2049 }
2050
2051
2052 /**
2053  * Callback which will be called when shutdown peers operation is released
2054  *
2055  * @param cls the closure from GNUNET_TESTBED_operation_create_()
2056  */
2057 static void
2058 oprelease_shutdown_peers (void *cls)
2059 {
2060   struct OperationContext *opc = cls;
2061
2062   switch (opc->state)
2063   {
2064   case OPC_STATE_STARTED:
2065     GNUNET_TESTBED_remove_opc_ (opc->c, opc);
2066     /* no break; continue */
2067   case OPC_STATE_INIT:
2068     GNUNET_free (opc->data);
2069     break;
2070   case OPC_STATE_FINISHED:
2071     break;
2072   }
2073   GNUNET_free (opc);
2074 }
2075
2076
2077 /**
2078  * Stops and destroys all peers.  Is equivalent of calling
2079  * GNUNET_TESTBED_peer_stop() and GNUNET_TESTBED_peer_destroy() on all peers,
2080  * except that the peer stop event and operation finished event corresponding to
2081  * the respective functions are not generated.  This function should be called
2082  * when there are no other pending operations.  If there are pending operations,
2083  * it will return NULL
2084  *
2085  * @param c the controller to send this message to
2086  * @param op_cls closure for the operation
2087  * @param cb the callback to call when all peers are stopped and destroyed
2088  * @param cb_cls the closure for the callback
2089  * @return operation handle on success; NULL if any pending operations are
2090  *           present
2091  */
2092 struct GNUNET_TESTBED_Operation *
2093 GNUNET_TESTBED_shutdown_peers (struct GNUNET_TESTBED_Controller *c,
2094                                void *op_cls,
2095                                GNUNET_TESTBED_OperationCompletionCallback cb,
2096                                void *cb_cls)
2097 {
2098   struct OperationContext *opc;
2099   struct ShutdownPeersData *data;
2100
2101   if (0 != GNUNET_CONTAINER_multihashmap32_size (c->opc_map))
2102     return NULL;
2103   data = GNUNET_malloc (sizeof (struct ShutdownPeersData));
2104   data->cb = cb;
2105   data->cb_cls = cb_cls;
2106   opc = GNUNET_malloc (sizeof (struct OperationContext));
2107   opc->c = c;
2108   opc->op_cls = op_cls;
2109   opc->data = data;
2110   opc->id =  GNUNET_TESTBED_get_next_op_id (c);
2111   opc->type = OP_SHUTDOWN_PEERS;
2112   opc->state = OPC_STATE_INIT;  
2113   opc->op = GNUNET_TESTBED_operation_create_ (opc, &opstart_shutdown_peers,
2114                                               &oprelease_shutdown_peers);
2115   GNUNET_TESTBED_operation_queue_insert_ (opc->c->opq_parallel_operations,
2116                                         opc->op);
2117   GNUNET_TESTBED_operation_begin_wait_ (opc->op);
2118   return opc->op;
2119 }
2120
2121
2122 /* end of testbed_api.c */