fix 2893: Move adaptive parallelisation mechanism to operation queues
[oweals/gnunet.git] / src / testbed / testbed_api.c
1 /*
2       This file is part of GNUnet
3       (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file testbed/testbed_api.c
23  * @brief API for accessing the GNUnet testing service.
24  *        This library is supposed to make it easier to write
25  *        testcases and script large-scale benchmarks.
26  * @author Christian Grothoff
27  * @author Sree Harsha Totakura
28  */
29
30
31 #include "platform.h"
32 #include "gnunet_testbed_service.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_constants.h"
35 #include "gnunet_transport_service.h"
36 #include "gnunet_hello_lib.h"
37 #include <zlib.h>
38
39 #include "testbed.h"
40 #include "testbed_api.h"
41 #include "testbed_api_hosts.h"
42 #include "testbed_api_peers.h"
43 #include "testbed_api_operations.h"
44 #include "testbed_api_sd.h"
45
46 /**
47  * Generic logging shorthand
48  */
49 #define LOG(kind, ...)                          \
50   GNUNET_log_from (kind, "testbed-api", __VA_ARGS__)
51
52 /**
53  * Debug logging
54  */
55 #define LOG_DEBUG(...)                          \
56   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
57
58 /**
59  * Relative time seconds shorthand
60  */
61 #define TIME_REL_SECS(sec) \
62   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
63
64
65 /**
66  * Default server message sending retry timeout
67  */
68 #define TIMEOUT_REL TIME_REL_SECS(1)
69
70
71 /**
72  * The message queue for sending messages to the controller service
73  */
74 struct MessageQueue
75 {
76   /**
77    * The message to be sent
78    */
79   struct GNUNET_MessageHeader *msg;
80
81   /**
82    * next pointer for DLL
83    */
84   struct MessageQueue *next;
85
86   /**
87    * prev pointer for DLL
88    */
89   struct MessageQueue *prev;
90 };
91
92
93 /**
94  * Context data for forwarded Operation
95  */
96 struct ForwardedOperationData
97 {
98
99   /**
100    * The callback to call when reply is available
101    */
102   GNUNET_CLIENT_MessageHandler cc;
103
104   /**
105    * The closure for the above callback
106    */
107   void *cc_cls;
108
109 };
110
111
112 /**
113  * Context data for get slave config operations
114  */
115 struct GetSlaveConfigData
116 {
117   /**
118    * The id of the slave controller
119    */
120   uint32_t slave_id;
121
122 };
123
124
125 /**
126  * Context data for controller link operations
127  */
128 struct ControllerLinkData
129 {
130   /**
131    * The controller link message
132    */
133   struct GNUNET_TESTBED_ControllerLinkRequest *msg;
134
135   /**
136    * The id of the host which is hosting the controller to be linked
137    */
138   uint32_t host_id;
139
140 };
141
142
143 /**
144  * Date context for OP_SHUTDOWN_PEERS operations
145  */
146 struct ShutdownPeersData
147 {
148   /**
149    * The operation completion callback to call
150    */
151   GNUNET_TESTBED_OperationCompletionCallback cb;
152
153   /**
154    * The closure for the above callback
155    */
156   void *cb_cls;
157 };
158
159
160 /**
161  * An entry in the stack for keeping operations which are about to expire
162  */
163 struct ExpireOperationEntry
164 {
165   /**
166    * DLL head; new entries are to be inserted here
167    */
168   struct ExpireOperationEntry *next;
169
170   /**
171    * DLL tail; entries are deleted from here
172    */
173   struct ExpireOperationEntry *prev;
174
175   /**
176    * The operation.  This will be a dangling pointer when the operation is freed
177    */
178   const struct GNUNET_TESTBED_Operation *op;
179 };
180
181
182 /**
183  * DLL head for list of operations marked for expiry
184  */
185 static struct ExpireOperationEntry *exop_head;
186
187 /**
188  * DLL tail for list of operation marked for expiry
189  */
190 static struct ExpireOperationEntry *exop_tail;
191
192
193 /**
194  * Inserts an operation into the list of operations marked for expiry
195  *
196  * @param op the operation to insert
197  */
198 static void
199 exop_insert (struct GNUNET_TESTBED_Operation *op)
200 {
201   struct ExpireOperationEntry *entry;
202
203   entry = GNUNET_malloc (sizeof (struct ExpireOperationEntry));
204   entry->op = op;
205   GNUNET_CONTAINER_DLL_insert_tail (exop_head, exop_tail, entry);  
206 }
207
208
209 /**
210  * Checks if an operation is present in the list of operations marked for
211  * expiry.  If the operation is found, it and the tail of operations after it
212  * are removed from the list.
213  *
214  * @param op the operation to check
215  * @return GNUNET_NO if the operation is not present in the list; GNUNET_YES if
216  *           the operation is found in the list (the operation is then removed
217  *           from the list -- calling this function again with the same
218  *           paramenter will return GNUNET_NO)
219  */
220 static int
221 exop_check (const struct GNUNET_TESTBED_Operation *const op)
222 {
223   struct ExpireOperationEntry *entry;
224   struct ExpireOperationEntry *entry2;
225   int found;
226
227   found = GNUNET_NO;
228   entry = exop_head;
229   while (NULL != entry)
230   {
231     if (op == entry->op)
232     {
233       found = GNUNET_YES;
234       break;
235     }
236     entry = entry->next;
237   }
238   if (GNUNET_NO == found)
239     return GNUNET_NO;
240   /* Truncate the tail */
241   while (NULL != entry)
242   {
243     entry2 = entry->next;
244     GNUNET_CONTAINER_DLL_remove (exop_head, exop_tail, entry);
245     GNUNET_free (entry);
246     entry = entry2;
247   }
248   return GNUNET_YES;
249 }
250
251
252 /**
253  * Context information to be used while searching for operation contexts
254  */
255 struct SearchContext
256 {
257   /**
258    * The result of the search
259    */
260   struct OperationContext *opc;
261
262   /**
263    * The id of the operation context we are searching for
264    */
265   uint64_t id;
266 };
267
268
269 /**
270  * Search iterator for searching an operation context
271  *
272  * @param cls the serach context
273  * @param key current key code
274  * @param value value in the hash map
275  * @return GNUNET_YES if we should continue to
276  *         iterate,
277  *         GNUNET_NO if not.
278  */
279 static int
280 opc_search_iterator (void *cls, uint32_t key, void *value)
281 {
282   struct SearchContext *sc = cls;
283   struct OperationContext *opc = value;
284   
285   GNUNET_assert (NULL != opc);
286   GNUNET_assert (NULL == sc->opc);
287   if (opc->id != sc->id)
288     return GNUNET_YES;
289   sc->opc = opc;
290   return GNUNET_NO;
291 }
292
293
294 /**
295  * Returns the operation context with the given id if found in the Operation
296  * context queues of the controller
297  *
298  * @param c the controller whose operation context map is searched
299  * @param id the id which has to be checked
300  * @return the matching operation context; NULL if no match found
301  */
302 static struct OperationContext *
303 find_opc (const struct GNUNET_TESTBED_Controller *c, const uint64_t id)
304 {
305   struct SearchContext sc;
306
307   sc.id = id;
308   sc.opc = NULL;
309   GNUNET_assert (NULL != c->opc_map);
310   if (GNUNET_SYSERR != 
311       GNUNET_CONTAINER_multihashmap32_get_multiple (c->opc_map, (uint32_t) id,
312                                                     &opc_search_iterator, &sc))
313     return NULL;
314   return sc.opc;
315 }
316
317
318 /**
319  * Inserts the given operation context into the operation context map of the
320  * given controller.  Creates the operation context map if one does not exist
321  * for the controller
322  *
323  * @param c the controller
324  * @param opc the operation context to be inserted
325  */
326 void
327 GNUNET_TESTBED_insert_opc_ (struct GNUNET_TESTBED_Controller *c,
328                             struct OperationContext *opc)
329 {
330   if (NULL == c->opc_map)
331     c->opc_map = GNUNET_CONTAINER_multihashmap32_create (256);
332   GNUNET_assert (GNUNET_OK ==
333                  GNUNET_CONTAINER_multihashmap32_put (c->opc_map, 
334                                                       (uint32_t) opc->id, opc,
335                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
336 }
337
338
339 /**
340  * Removes the given operation context from the operation context map of the
341  * given controller
342  *
343  * @param c the controller
344  * @param opc the operation context to remove 
345  */
346 void
347 GNUNET_TESTBED_remove_opc_ (const struct GNUNET_TESTBED_Controller *c,
348                             struct OperationContext *opc)
349 {
350   GNUNET_assert (NULL != c->opc_map);
351   GNUNET_assert (GNUNET_YES ==
352                  GNUNET_CONTAINER_multihashmap32_remove (c->opc_map,
353                                                          (uint32_t) opc->id,
354                                                          opc));
355 }
356
357
358 /**
359  * Handler for forwarded operations
360  *
361  * @param c the controller handle
362  * @param opc the opearation context
363  * @param msg the message
364  */
365 static void
366 handle_forwarded_operation_msg (struct GNUNET_TESTBED_Controller *c,
367                                 struct OperationContext *opc,
368                                 const struct GNUNET_MessageHeader *msg)
369 {
370   struct ForwardedOperationData *fo_data;
371
372   fo_data = opc->data;
373   if (NULL != fo_data->cc)
374     fo_data->cc (fo_data->cc_cls, msg);
375   GNUNET_TESTBED_remove_opc_ (c, opc);
376   GNUNET_free (fo_data);
377   GNUNET_free (opc);
378 }
379
380
381 /**
382  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message from
383  * controller (testbed service)
384  *
385  * @param c the controller handler
386  * @param msg message received
387  * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
388  *           not
389  */
390 static int
391 handle_opsuccess (struct GNUNET_TESTBED_Controller *c,
392                   const struct
393                   GNUNET_TESTBED_GenericOperationSuccessEventMessage *msg)
394 {
395   struct OperationContext *opc;
396   GNUNET_TESTBED_OperationCompletionCallback op_comp_cb;
397   void *op_comp_cb_cls;
398   struct GNUNET_TESTBED_EventInformation event;
399   uint64_t op_id;
400
401   op_id = GNUNET_ntohll (msg->operation_id);
402   LOG_DEBUG ("Operation %lu successful\n", op_id);
403   if (NULL == (opc = find_opc (c, op_id)))
404   {
405     LOG_DEBUG ("Operation not found\n");
406     return GNUNET_YES;
407   }
408   event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED;
409   event.op = opc->op;
410   event.op_cls = opc->op_cls;
411   event.details.operation_finished.emsg = NULL;
412   event.details.operation_finished.generic = NULL;
413   op_comp_cb = NULL;
414   op_comp_cb_cls = NULL;
415   switch (opc->type)
416   {
417   case OP_FORWARDED:
418   {
419     handle_forwarded_operation_msg (c, opc,
420                                     (const struct GNUNET_MessageHeader *) msg);
421     return GNUNET_YES;
422   }
423     break;
424   case OP_PEER_DESTROY:
425   {
426     struct GNUNET_TESTBED_Peer *peer;
427
428     peer = opc->data;
429     GNUNET_TESTBED_peer_deregister_ (peer);
430     GNUNET_free (peer);
431     opc->data = NULL;
432     //PEERDESTROYDATA
433   }
434     break;
435   case OP_SHUTDOWN_PEERS:
436   {
437     struct ShutdownPeersData *data;
438
439     data = opc->data;
440     op_comp_cb = data->cb;
441     op_comp_cb_cls = data->cb_cls;
442     GNUNET_free (data);
443     opc->data = NULL;
444     GNUNET_TESTBED_cleanup_peers_ ();
445   }
446     break;
447   case OP_MANAGE_SERVICE:
448   {
449     struct ManageServiceData *data;
450
451     GNUNET_assert (NULL != (data = opc->data));
452     op_comp_cb = data->cb;
453     op_comp_cb_cls = data->cb_cls;
454     GNUNET_free (data);
455     opc->data = NULL;
456   }
457     break;
458   case OP_PEER_RECONFIGURE:
459     break;
460   default:
461     GNUNET_assert (0);
462   }
463   GNUNET_TESTBED_remove_opc_ (opc->c, opc);
464   opc->state = OPC_STATE_FINISHED;
465   exop_insert (event.op);  
466   if (0 != (c->event_mask & (1L << GNUNET_TESTBED_ET_OPERATION_FINISHED)))
467   {
468     if (NULL != c->cc)
469       c->cc (c->cc_cls, &event);
470     if (GNUNET_NO == exop_check (event.op))
471       return GNUNET_YES;
472   }
473   else
474     LOG_DEBUG ("Not calling callback\n");
475   if (NULL != op_comp_cb)
476     op_comp_cb (op_comp_cb_cls, event.op, NULL);
477    /* You could have marked the operation as done by now */
478   GNUNET_break (GNUNET_NO == exop_check (event.op));
479   return GNUNET_YES;
480 }
481
482
483 /**
484  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCREATESUCCESS message from
485  * controller (testbed service)
486  *
487  * @param c the controller handle
488  * @param msg message received
489  * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
490  *           not
491  */
492 static int
493 handle_peer_create_success (struct GNUNET_TESTBED_Controller *c,
494                             const struct
495                             GNUNET_TESTBED_PeerCreateSuccessEventMessage *msg)
496 {
497   struct OperationContext *opc;
498   struct PeerCreateData *data;
499   struct GNUNET_TESTBED_Peer *peer;
500   struct GNUNET_TESTBED_Operation *op;
501   GNUNET_TESTBED_PeerCreateCallback cb;
502   void *cls;
503   uint64_t op_id;
504
505   GNUNET_assert (sizeof (struct GNUNET_TESTBED_PeerCreateSuccessEventMessage) ==
506                  ntohs (msg->header.size));
507   op_id = GNUNET_ntohll (msg->operation_id);
508   if (NULL == (opc = find_opc (c, op_id)))
509   {
510     LOG_DEBUG ("Operation context for PeerCreateSuccessEvent not found\n");
511     return GNUNET_YES;
512   }
513   if (OP_FORWARDED == opc->type)
514   {
515     handle_forwarded_operation_msg (c, opc,
516                                     (const struct GNUNET_MessageHeader *) msg);
517     return GNUNET_YES;
518   }
519   GNUNET_assert (OP_PEER_CREATE == opc->type);
520   GNUNET_assert (NULL != opc->data);
521   data = opc->data;
522   GNUNET_assert (NULL != data->peer);
523   peer = data->peer;
524   GNUNET_assert (peer->unique_id == ntohl (msg->peer_id));
525   peer->state = PS_CREATED;
526   GNUNET_TESTBED_peer_register_ (peer);
527   cb = data->cb;
528   cls = data->cls;
529   op = opc->op;
530   GNUNET_free (opc->data);
531   GNUNET_TESTBED_remove_opc_ (opc->c, opc);
532   opc->state = OPC_STATE_FINISHED;
533   exop_insert (op);
534   if (NULL != cb)
535     cb (cls, peer, NULL);
536    /* You could have marked the operation as done by now */
537   GNUNET_break (GNUNET_NO == exop_check (op));
538   return GNUNET_YES;
539 }
540
541
542 /**
543  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEEREVENT message from
544  * controller (testbed service)
545  *
546  * @param c the controller handler
547  * @param msg message received
548  * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
549  *           not
550  */
551 static int
552 handle_peer_event (struct GNUNET_TESTBED_Controller *c,
553                    const struct GNUNET_TESTBED_PeerEventMessage *msg)
554 {
555   struct OperationContext *opc;
556   struct GNUNET_TESTBED_Peer *peer;
557   struct PeerEventData *data;
558   GNUNET_TESTBED_PeerChurnCallback pcc;
559   void *pcc_cls;
560   struct GNUNET_TESTBED_EventInformation event;
561   uint64_t op_id;
562   uint64_t mask;
563
564   GNUNET_assert (sizeof (struct GNUNET_TESTBED_PeerEventMessage) ==
565                  ntohs (msg->header.size));
566   op_id = GNUNET_ntohll (msg->operation_id);
567   if (NULL == (opc = find_opc (c, op_id)))
568   {
569     LOG_DEBUG ("Operation not found\n");
570     return GNUNET_YES;
571   }
572   if (OP_FORWARDED == opc->type)
573   {
574     handle_forwarded_operation_msg (c, opc,
575                                     (const struct GNUNET_MessageHeader *) msg);
576     return GNUNET_YES;
577   }
578   GNUNET_assert ((OP_PEER_START == opc->type) || (OP_PEER_STOP == opc->type));
579   data = opc->data;
580   GNUNET_assert (NULL != data);
581   peer = data->peer;
582   GNUNET_assert (NULL != peer);
583   event.type = (enum GNUNET_TESTBED_EventType) ntohl (msg->event_type);
584   event.op = opc->op;
585   event.op_cls = opc->op_cls;
586   switch (event.type)
587   {
588   case GNUNET_TESTBED_ET_PEER_START:
589     peer->state = PS_STARTED;
590     event.details.peer_start.host = peer->host;
591     event.details.peer_start.peer = peer;
592     break;
593   case GNUNET_TESTBED_ET_PEER_STOP:
594     peer->state = PS_STOPPED;
595     event.details.peer_stop.peer = peer;
596     break;
597   default:
598     GNUNET_assert (0);          /* We should never reach this state */
599   }
600   pcc = data->pcc;
601   pcc_cls = data->pcc_cls;
602   GNUNET_free (data);
603   GNUNET_TESTBED_remove_opc_ (opc->c, opc);
604   opc->state = OPC_STATE_FINISHED;
605   exop_insert (event.op);
606   mask = 1LL << GNUNET_TESTBED_ET_PEER_START;
607   mask |= 1LL << GNUNET_TESTBED_ET_PEER_STOP;
608   if (0 != (mask & c->event_mask))
609   {
610     if (NULL != c->cc)
611       c->cc (c->cc_cls, &event);    
612     if (GNUNET_NO == exop_check (event.op))
613       return GNUNET_YES;
614   }
615   if (NULL != pcc)
616     pcc (pcc_cls, NULL);
617    /* You could have marked the operation as done by now */
618   GNUNET_break (GNUNET_NO == exop_check (event.op));
619   return GNUNET_YES;
620 }
621
622
623 /**
624  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCONEVENT message from
625  * controller (testbed service)
626  *
627  * @param c the controller handler
628  * @param msg message received
629  * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
630  *           not
631  */
632 static int
633 handle_peer_conevent (struct GNUNET_TESTBED_Controller *c,
634                       const struct GNUNET_TESTBED_ConnectionEventMessage *msg)
635 {
636   struct OperationContext *opc;
637   struct OverlayConnectData *data;
638   GNUNET_TESTBED_OperationCompletionCallback cb;
639   void *cb_cls;
640   struct GNUNET_TESTBED_EventInformation event;
641   uint64_t op_id;
642   uint64_t mask;
643
644   op_id = GNUNET_ntohll (msg->operation_id);
645   if (NULL == (opc = find_opc (c, op_id)))
646   {
647     LOG_DEBUG ("Operation not found\n");
648     return GNUNET_YES;
649   }
650   if (OP_FORWARDED == opc->type)
651   {
652     handle_forwarded_operation_msg (c, opc,
653                                     (const struct GNUNET_MessageHeader *) msg);
654     return GNUNET_YES;
655   }
656   GNUNET_assert (OP_OVERLAY_CONNECT == opc->type);
657   GNUNET_assert (NULL != (data = opc->data));
658   GNUNET_assert ((ntohl (msg->peer1) == data->p1->unique_id) &&
659                  (ntohl (msg->peer2) == data->p2->unique_id));
660   event.type = (enum GNUNET_TESTBED_EventType) ntohl (msg->event_type);
661   event.op = opc->op;
662   event.op_cls = opc->op_cls;
663   switch (event.type)
664   {
665   case GNUNET_TESTBED_ET_CONNECT:
666     event.details.peer_connect.peer1 = data->p1;
667     event.details.peer_connect.peer2 = data->p2;
668     break;
669   case GNUNET_TESTBED_ET_DISCONNECT:
670     GNUNET_assert (0);          /* FIXME: implement */
671     break;
672   default:
673     GNUNET_assert (0);          /* Should never reach here */
674     break;
675   }
676   cb = data->cb;
677   cb_cls = data->cb_cls;
678   GNUNET_TESTBED_remove_opc_ (opc->c, opc);
679   opc->state = OPC_STATE_FINISHED;
680   exop_insert (event.op);
681   mask = 1LL << GNUNET_TESTBED_ET_CONNECT;
682   mask |= 1LL << GNUNET_TESTBED_ET_DISCONNECT;
683   if (0 != (mask & c->event_mask))
684   {
685     if (NULL != c->cc)
686       c->cc (c->cc_cls, &event);
687     if (GNUNET_NO == exop_check (event.op))
688       return GNUNET_YES;
689   }  
690   if (NULL != cb)
691     cb (cb_cls, opc->op, NULL);
692    /* You could have marked the operation as done by now */
693   GNUNET_break (GNUNET_NO == exop_check (event.op));
694   return GNUNET_YES;
695 }
696
697
698 /**
699  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCONFIG message from
700  * controller (testbed service)
701  *
702  * @param c the controller handler
703  * @param msg message received
704  * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
705  *           not
706  */
707 static int
708 handle_peer_config (struct GNUNET_TESTBED_Controller *c,
709                     const struct
710                     GNUNET_TESTBED_PeerConfigurationInformationMessage *msg)
711 {
712   struct OperationContext *opc;
713   struct GNUNET_TESTBED_Peer *peer;
714   struct PeerInfoData *data;
715   struct GNUNET_TESTBED_PeerInformation *pinfo;
716   GNUNET_TESTBED_PeerInfoCallback cb;
717   void *cb_cls;
718   uint64_t op_id;
719
720   op_id = GNUNET_ntohll (msg->operation_id);
721   if (NULL == (opc = find_opc (c, op_id)))
722   {
723     LOG_DEBUG ("Operation not found\n");
724     return GNUNET_YES;
725   }
726   if (OP_FORWARDED == opc->type)
727   {
728     handle_forwarded_operation_msg (c, opc,
729                                     (const struct GNUNET_MessageHeader *) msg);
730     return GNUNET_YES;
731   }
732   data = opc->data;
733   GNUNET_assert (NULL != data);
734   peer = data->peer;
735   GNUNET_assert (NULL != peer);
736   GNUNET_assert (ntohl (msg->peer_id) == peer->unique_id);
737   pinfo = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_PeerInformation));
738   pinfo->pit = data->pit;  
739   cb = data->cb;
740   cb_cls = data->cb_cls;
741   GNUNET_assert (NULL != cb);
742   GNUNET_free (data);
743   opc->data = NULL;  
744   switch (pinfo->pit)
745   {
746   case GNUNET_TESTBED_PIT_IDENTITY:
747     pinfo->result.id = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
748     (void) memcpy (pinfo->result.id, &msg->peer_identity,
749                    sizeof (struct GNUNET_PeerIdentity));
750     break;
751   case GNUNET_TESTBED_PIT_CONFIGURATION:
752     pinfo->result.cfg =         /* Freed in oprelease_peer_getinfo */
753         GNUNET_TESTBED_extract_config_ (&msg->header);
754     break;
755   case GNUNET_TESTBED_PIT_GENERIC:
756     GNUNET_assert (0);          /* never reach here */
757     break;
758   }
759   opc->data = pinfo;
760   GNUNET_TESTBED_remove_opc_ (opc->c, opc);  
761   opc->state = OPC_STATE_FINISHED;
762   cb (cb_cls, opc->op, pinfo, NULL);
763   /* We dont check whether the operation is marked as done here as the
764      operation contains data (cfg/identify) which will be freed at a later point
765   */
766   return GNUNET_YES;
767 }
768
769
770 /**
771  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_OPERATIONFAILEVENT message from
772  * controller (testbed service)
773  *
774  * @param c the controller handler
775  * @param msg message received
776  * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
777  *           not
778  */
779 static int
780 handle_op_fail_event (struct GNUNET_TESTBED_Controller *c,
781                       const struct GNUNET_TESTBED_OperationFailureEventMessage
782                       *msg)
783 {
784   struct OperationContext *opc;
785   const char *emsg;
786   uint64_t op_id;
787   uint64_t mask;
788   struct GNUNET_TESTBED_EventInformation event;
789
790   op_id = GNUNET_ntohll (msg->operation_id);
791   if (NULL == (opc = find_opc (c, op_id)))
792   {
793     LOG_DEBUG ("Operation not found\n");
794     return GNUNET_YES;
795   }
796   if (OP_FORWARDED == opc->type)
797   {
798     handle_forwarded_operation_msg (c, opc,
799                                     (const struct GNUNET_MessageHeader *) msg);
800     return GNUNET_YES;
801   }
802   GNUNET_TESTBED_remove_opc_ (opc->c, opc);
803   opc->state = OPC_STATE_FINISHED;
804   emsg = GNUNET_TESTBED_parse_error_string_ (msg);
805   if (NULL == emsg)
806     emsg = "Unknown error";
807   if (OP_PEER_INFO == opc->type)
808   {
809     struct PeerInfoData *data;
810
811     data = opc->data;
812     if (NULL != data->cb)
813       data->cb (data->cb_cls, opc->op, NULL, emsg);
814     GNUNET_free (data);
815     return GNUNET_YES;          /* We do not call controller callback for peer info */
816   }
817   event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED;
818   event.op = opc->op;
819   event.op_cls = opc->op_cls;
820   event.details.operation_finished.emsg = emsg;
821   event.details.operation_finished.generic = NULL;
822   mask = (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED);
823   if ((0 != (mask & c->event_mask)) && (NULL != c->cc))
824   {
825     exop_insert (event.op);
826     c->cc (c->cc_cls, &event);
827     if (GNUNET_NO == exop_check (event.op))
828       return GNUNET_YES;
829   }
830   switch (opc->type)
831   {
832   case OP_PEER_CREATE:
833   {
834     struct PeerCreateData *data;
835
836     data = opc->data;
837     GNUNET_free (data->peer);
838     if (NULL != data->cb)
839       data->cb (data->cls, NULL, emsg);
840     GNUNET_free (data);
841   }
842     break;
843   case OP_PEER_START:
844   case OP_PEER_STOP:
845   {
846     struct PeerEventData *data;
847
848     data = opc->data;
849     if (NULL != data->pcc)
850       data->pcc (data->pcc_cls, emsg);
851     GNUNET_free (data);
852   }
853     break;
854   case OP_PEER_DESTROY:
855     break;
856   case OP_PEER_INFO:
857     GNUNET_assert (0);
858   case OP_OVERLAY_CONNECT:
859   {
860     struct OverlayConnectData *data;
861
862     data = opc->data;
863     GNUNET_TESTBED_operation_mark_failed (opc->op);
864     if (NULL != data->cb)
865       data->cb (data->cb_cls, opc->op, emsg);
866   }
867     break;
868   case OP_FORWARDED:
869     GNUNET_assert (0);
870   case OP_LINK_CONTROLLERS:    /* No secondary callback */
871     break;    
872   case OP_SHUTDOWN_PEERS:
873   {
874     struct ShutdownPeersData *data;
875
876     data = opc->data;
877     GNUNET_free (data);         /* FIXME: Decide whether we call data->op_cb */
878     opc->data = NULL;
879   }
880     break;
881   case OP_MANAGE_SERVICE:
882   {
883     struct ManageServiceData *data = opc->data;
884       GNUNET_TESTBED_OperationCompletionCallback cb;
885     void *cb_cls;
886
887     GNUNET_assert (NULL != data);
888     cb = data->cb;
889     cb_cls = data->cb_cls;
890     GNUNET_free (data);
891     opc->data = NULL;
892     exop_insert (event.op);
893     if (NULL != cb)
894       cb (cb_cls, opc->op, emsg);
895     /* You could have marked the operation as done by now */
896     GNUNET_break (GNUNET_NO == exop_check (event.op));
897   }
898     break;
899   default:
900     GNUNET_break (0);
901   }
902   return GNUNET_YES;
903 }
904
905
906 /**
907  * Function to build GET_SLAVE_CONFIG message
908  *
909  * @param op_id the id this message should contain in its operation id field
910  * @param slave_id the id this message should contain in its slave id field
911  * @return newly allocated SlaveGetConfigurationMessage
912  */
913 static struct GNUNET_TESTBED_SlaveGetConfigurationMessage *
914 GNUNET_TESTBED_generate_slavegetconfig_msg_ (uint64_t op_id, uint32_t slave_id)
915 {
916   struct GNUNET_TESTBED_SlaveGetConfigurationMessage *msg;
917   uint16_t msize;
918
919   msize = sizeof (struct GNUNET_TESTBED_SlaveGetConfigurationMessage);
920   msg = GNUNET_malloc (msize);
921   msg->header.size = htons (msize);
922   msg->header.type =
923       htons (GNUNET_MESSAGE_TYPE_TESTBED_GET_SLAVE_CONFIGURATION);
924   msg->operation_id = GNUNET_htonll (op_id);
925   msg->slave_id = htonl (slave_id);
926   return msg;
927 }
928
929
930 /**
931  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_SLAVECONFIG message from controller
932  * (testbed service)
933  *
934  * @param c the controller handler
935  * @param msg message received
936  * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
937  *           not
938  */
939 static int
940 handle_slave_config (struct GNUNET_TESTBED_Controller *c,
941                      const struct GNUNET_TESTBED_SlaveConfiguration *msg)
942 {
943   struct OperationContext *opc;
944   uint64_t op_id;
945   uint64_t mask;
946   struct GNUNET_TESTBED_EventInformation event;
947
948   op_id = GNUNET_ntohll (msg->operation_id);
949   if (NULL == (opc = find_opc (c, op_id)))
950   {
951     LOG_DEBUG ("Operation not found\n");
952     return GNUNET_YES;
953   }
954   if (OP_GET_SLAVE_CONFIG != opc->type)
955   {
956     GNUNET_break (0);
957     return GNUNET_YES;
958   }
959   opc->state = OPC_STATE_FINISHED;
960   GNUNET_TESTBED_remove_opc_ (opc->c, opc);
961   mask = 1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED;
962   if ((0 != (mask & c->event_mask)) &&
963       (NULL != c->cc))
964   {
965     opc->data = GNUNET_TESTBED_extract_config_ (&msg->header);
966     event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED;
967     event.op = opc->op;
968     event.op_cls = opc->op_cls;
969     event.details.operation_finished.generic = opc->data;
970     event.details.operation_finished.emsg = NULL;
971     c->cc (c->cc_cls, &event);
972   }
973   return GNUNET_YES;
974 }
975
976
977 /**
978  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_SLAVECONFIG message from controller
979  * (testbed service)
980  *
981  * @param c the controller handler
982  * @param msg message received
983  * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
984  *           not
985  */
986 static int
987 handle_link_controllers_result (struct GNUNET_TESTBED_Controller *c,
988                                 const struct
989                                 GNUNET_TESTBED_ControllerLinkResponse *msg)
990 {
991   struct OperationContext *opc;
992   struct ControllerLinkData *data;
993   struct GNUNET_CONFIGURATION_Handle *cfg;
994   struct GNUNET_TESTBED_Host *host;
995   char *emsg;
996   uint64_t op_id;
997   struct GNUNET_TESTBED_EventInformation event;
998
999   op_id = GNUNET_ntohll (msg->operation_id);
1000   if (NULL == (opc = find_opc (c, op_id)))
1001   {
1002     LOG_DEBUG ("Operation not found\n");
1003     return GNUNET_YES;
1004   }
1005   if (OP_FORWARDED == opc->type)
1006   {
1007     handle_forwarded_operation_msg (c, opc,
1008                                     (const struct GNUNET_MessageHeader *) msg);
1009     return GNUNET_YES;
1010   }
1011   if (OP_LINK_CONTROLLERS != opc->type)
1012   {
1013     GNUNET_break (0);
1014     return GNUNET_YES;
1015   }
1016   GNUNET_assert (NULL != (data = opc->data));
1017   host = GNUNET_TESTBED_host_lookup_by_id_ (data->host_id);
1018   GNUNET_assert (NULL != host);
1019   GNUNET_free (data);
1020   opc->data = NULL;
1021   opc->state = OPC_STATE_FINISHED;
1022   GNUNET_TESTBED_remove_opc_ (opc->c, opc);
1023   event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED;
1024   event.op = opc->op;
1025   event.op_cls = opc->op_cls;
1026   event.details.operation_finished.emsg = NULL;
1027   event.details.operation_finished.generic = NULL;
1028   emsg = NULL;
1029   cfg = NULL;
1030   if (GNUNET_NO == ntohs (msg->success))
1031   {
1032     emsg = GNUNET_malloc (ntohs (msg->header.size)
1033                           - sizeof (struct
1034                                     GNUNET_TESTBED_ControllerLinkResponse) + 1);
1035     memcpy (emsg, &msg[1], ntohs (msg->header.size)
1036                           - sizeof (struct
1037                                     GNUNET_TESTBED_ControllerLinkResponse));
1038     event.details.operation_finished.emsg = emsg;
1039   }
1040   else
1041   {
1042     if (0 != ntohs (msg->config_size))
1043     {
1044       cfg = GNUNET_TESTBED_extract_config_ ((const struct GNUNET_MessageHeader *) msg);
1045       GNUNET_assert (NULL != cfg);
1046       GNUNET_TESTBED_host_replace_cfg_ (host, cfg);
1047     }
1048   }
1049   if (0 != (c->event_mask & (1L << GNUNET_TESTBED_ET_OPERATION_FINISHED)))
1050   {
1051     if (NULL != c->cc)
1052       c->cc (c->cc_cls, &event);
1053   }
1054   else
1055     LOG_DEBUG ("Not calling callback\n");
1056   if (NULL != cfg)
1057     GNUNET_CONFIGURATION_destroy (cfg);
1058   GNUNET_free_non_null (emsg);
1059   return GNUNET_YES;
1060 }
1061
1062
1063 /**
1064  * Handler for messages from controller (testbed service)
1065  *
1066  * @param cls the controller handler
1067  * @param msg message received, NULL on timeout or fatal error
1068  */
1069 static void
1070 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
1071 {
1072   struct GNUNET_TESTBED_Controller *c = cls;
1073   int status;
1074   uint16_t msize;
1075
1076   c->in_receive = GNUNET_NO;
1077   /* FIXME: Add checks for message integrity */
1078   if (NULL == msg)
1079   {
1080     LOG_DEBUG ("Receive timed out or connection to service dropped\n");
1081     return;
1082   }
1083   msize = ntohs (msg->size);
1084   switch (ntohs (msg->type))
1085   {
1086   case GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS:
1087     GNUNET_assert (msize >=
1088                    sizeof (struct GNUNET_TESTBED_HostConfirmedMessage));
1089     status =
1090         GNUNET_TESTBED_host_handle_addhostconfirm_
1091         (c, (const struct GNUNET_TESTBED_HostConfirmedMessage*) msg);
1092     break;
1093   case GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS:
1094     GNUNET_assert (msize ==
1095                    sizeof (struct
1096                            GNUNET_TESTBED_GenericOperationSuccessEventMessage));
1097     status =
1098         handle_opsuccess (c,
1099                           (const struct
1100                            GNUNET_TESTBED_GenericOperationSuccessEventMessage *)
1101                           msg);
1102     break;
1103   case GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT:
1104     GNUNET_assert (msize >=
1105                    sizeof (struct GNUNET_TESTBED_OperationFailureEventMessage));
1106     status =
1107         handle_op_fail_event (c,
1108                               (const struct
1109                                GNUNET_TESTBED_OperationFailureEventMessage *)
1110                               msg);
1111     break;
1112   case GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS:
1113     GNUNET_assert (msize ==
1114                    sizeof (struct
1115                            GNUNET_TESTBED_PeerCreateSuccessEventMessage));
1116     status =
1117         handle_peer_create_success (c,
1118                                     (const struct
1119                                      GNUNET_TESTBED_PeerCreateSuccessEventMessage
1120                                      *) msg);
1121     break;
1122   case GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT:
1123     GNUNET_assert (msize == sizeof (struct GNUNET_TESTBED_PeerEventMessage));
1124     status =
1125         handle_peer_event (c,
1126                            (const struct GNUNET_TESTBED_PeerEventMessage *)
1127                            msg);
1128
1129     break;
1130   case GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION:
1131     GNUNET_assert (msize >=
1132                    sizeof (struct
1133                            GNUNET_TESTBED_PeerConfigurationInformationMessage));
1134     status =
1135         handle_peer_config (c,
1136                             (const struct
1137                              GNUNET_TESTBED_PeerConfigurationInformationMessage
1138                              *) msg);
1139     break;
1140   case GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT:
1141     GNUNET_assert (msize ==
1142                    sizeof (struct GNUNET_TESTBED_ConnectionEventMessage));
1143     status =
1144         handle_peer_conevent (c,
1145                               (const struct
1146                                GNUNET_TESTBED_ConnectionEventMessage *) msg);
1147     break;
1148   case GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION:
1149     GNUNET_assert (msize > sizeof (struct GNUNET_TESTBED_SlaveConfiguration));
1150     status =
1151         handle_slave_config (c,
1152                              (const struct GNUNET_TESTBED_SlaveConfiguration *)
1153                              msg);
1154     break;
1155   case GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT:
1156     status = 
1157         handle_link_controllers_result (c,
1158                                         (const struct
1159                                          GNUNET_TESTBED_ControllerLinkResponse
1160                                          *) msg);
1161     break;
1162   default:
1163     GNUNET_assert (0);
1164   }
1165   if ((GNUNET_OK == status) && (GNUNET_NO == c->in_receive))
1166   {
1167     c->in_receive = GNUNET_YES;
1168     GNUNET_CLIENT_receive (c->client, &message_handler, c,
1169                            GNUNET_TIME_UNIT_FOREVER_REL);
1170   }
1171 }
1172
1173
1174 /**
1175  * Function called to notify a client about the connection begin ready to queue
1176  * more data.  "buf" will be NULL and "size" zero if the connection was closed
1177  * for writing in the meantime.
1178  *
1179  * @param cls closure
1180  * @param size number of bytes available in buf
1181  * @param buf where the callee should write the message
1182  * @return number of bytes written to buf
1183  */
1184 static size_t
1185 transmit_ready_notify (void *cls, size_t size, void *buf)
1186 {
1187   struct GNUNET_TESTBED_Controller *c = cls;
1188   struct MessageQueue *mq_entry;
1189
1190   c->th = NULL;
1191   mq_entry = c->mq_head;
1192   GNUNET_assert (NULL != mq_entry);
1193   if ((0 == size) && (NULL == buf))     /* Timeout */
1194   {
1195     LOG_DEBUG ("Message sending timed out -- retrying\n");
1196     c->th =
1197         GNUNET_CLIENT_notify_transmit_ready (c->client,
1198                                              ntohs (mq_entry->msg->size),
1199                                              TIMEOUT_REL, GNUNET_YES,
1200                                              &transmit_ready_notify, c);
1201     return 0;
1202   }
1203   GNUNET_assert (ntohs (mq_entry->msg->size) <= size);
1204   size = ntohs (mq_entry->msg->size);
1205   memcpy (buf, mq_entry->msg, size);
1206   LOG_DEBUG ("Message of type: %u and size: %u sent\n",
1207              ntohs (mq_entry->msg->type), size);
1208   GNUNET_free (mq_entry->msg);
1209   GNUNET_CONTAINER_DLL_remove (c->mq_head, c->mq_tail, mq_entry);
1210   GNUNET_free (mq_entry);
1211   mq_entry = c->mq_head;
1212   if (NULL != mq_entry)
1213     c->th =
1214         GNUNET_CLIENT_notify_transmit_ready (c->client,
1215                                              ntohs (mq_entry->msg->size),
1216                                              TIMEOUT_REL, GNUNET_YES,
1217                                              &transmit_ready_notify, c);
1218   if (GNUNET_NO == c->in_receive)
1219   {
1220     c->in_receive = GNUNET_YES;
1221     GNUNET_CLIENT_receive (c->client, &message_handler, c,
1222                            GNUNET_TIME_UNIT_FOREVER_REL);
1223   }
1224   return size;
1225 }
1226
1227
1228 /**
1229  * Queues a message in send queue for sending to the service
1230  *
1231  * @param controller the handle to the controller
1232  * @param msg the message to queue
1233  */
1234 void
1235 GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller,
1236                                struct GNUNET_MessageHeader *msg)
1237 {
1238   struct MessageQueue *mq_entry;
1239   uint16_t type;
1240   uint16_t size;
1241
1242   type = ntohs (msg->type);
1243   size = ntohs (msg->size);
1244   GNUNET_assert ((GNUNET_MESSAGE_TYPE_TESTBED_INIT <= type) &&
1245                  (GNUNET_MESSAGE_TYPE_TESTBED_MAX > type));
1246   mq_entry = GNUNET_malloc (sizeof (struct MessageQueue));
1247   mq_entry->msg = msg;
1248   LOG (GNUNET_ERROR_TYPE_DEBUG,
1249        "Queueing message of type %u, size %u for sending\n", type,
1250        ntohs (msg->size));
1251   GNUNET_CONTAINER_DLL_insert_tail (controller->mq_head, controller->mq_tail,
1252                                     mq_entry);
1253   if (NULL == controller->th)
1254     controller->th =
1255         GNUNET_CLIENT_notify_transmit_ready (controller->client, size,
1256                                              TIMEOUT_REL, GNUNET_YES,
1257                                              &transmit_ready_notify,
1258                                              controller);
1259 }
1260
1261
1262 /**
1263  * Sends the given message as an operation. The given callback is called when a
1264  * reply for the operation is available.  Call
1265  * GNUNET_TESTBED_forward_operation_msg_cancel_() to cleanup the returned
1266  * operation context if the cc hasn't been called
1267  *
1268  * @param controller the controller to which the message has to be sent
1269  * @param operation_id the operation id of the message
1270  * @param msg the message to send
1271  * @param cc the callback to call when reply is available
1272  * @param cc_cls the closure for the above callback
1273  * @return the operation context which can be used to cancel the forwarded
1274  *           operation
1275  */
1276 struct OperationContext *
1277 GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller
1278                                        *controller, uint64_t operation_id,
1279                                        const struct GNUNET_MessageHeader *msg,
1280                                        GNUNET_CLIENT_MessageHandler cc,
1281                                        void *cc_cls)
1282 {
1283   struct OperationContext *opc;
1284   struct ForwardedOperationData *data;
1285   struct GNUNET_MessageHeader *dup_msg;
1286   uint16_t msize;
1287
1288   data = GNUNET_malloc (sizeof (struct ForwardedOperationData));
1289   data->cc = cc;
1290   data->cc_cls = cc_cls;
1291   opc = GNUNET_malloc (sizeof (struct OperationContext));
1292   opc->c = controller;
1293   opc->type = OP_FORWARDED;
1294   opc->data = data;
1295   opc->id = operation_id;
1296   msize = ntohs (msg->size);
1297   dup_msg = GNUNET_malloc (msize);
1298   (void) memcpy (dup_msg, msg, msize);
1299   GNUNET_TESTBED_queue_message_ (opc->c, dup_msg);
1300   GNUNET_TESTBED_insert_opc_ (controller, opc);
1301   return opc;
1302 }
1303
1304
1305 /**
1306  * Function to cancel an operation created by simply forwarding an operation
1307  * message.
1308  *
1309  * @param opc the operation context from GNUNET_TESTBED_forward_operation_msg_()
1310  */
1311 void
1312 GNUNET_TESTBED_forward_operation_msg_cancel_ (struct OperationContext *opc)
1313 {
1314   GNUNET_TESTBED_remove_opc_ (opc->c, opc);
1315   GNUNET_free (opc->data);
1316   GNUNET_free (opc);
1317 }
1318
1319
1320 /**
1321  * Function to call to start a link-controllers type operation once all queues
1322  * the operation is part of declare that the operation can be activated.
1323  *
1324  * @param cls the closure from GNUNET_TESTBED_operation_create_()
1325  */
1326 static void
1327 opstart_link_controllers (void *cls)
1328 {
1329   struct OperationContext *opc = cls;
1330   struct ControllerLinkData *data;
1331   struct GNUNET_TESTBED_ControllerLinkRequest *msg;
1332
1333   GNUNET_assert (NULL != opc->data);
1334   data = opc->data;
1335   msg = data->msg;
1336   data->msg = NULL;
1337   opc->state = OPC_STATE_STARTED;
1338   GNUNET_TESTBED_insert_opc_ (opc->c, opc);
1339   GNUNET_TESTBED_queue_message_ (opc->c, &msg->header);
1340 }
1341
1342
1343 /**
1344  * Callback which will be called when link-controllers type operation is released
1345  *
1346  * @param cls the closure from GNUNET_TESTBED_operation_create_()
1347  */
1348 static void
1349 oprelease_link_controllers (void *cls)
1350 {
1351   struct OperationContext *opc = cls;
1352   struct ControllerLinkData *data;
1353
1354   data = opc->data;
1355   switch (opc->state)
1356   {
1357   case OPC_STATE_INIT:
1358     GNUNET_free (data->msg);    
1359     break;
1360   case OPC_STATE_STARTED:
1361     GNUNET_TESTBED_remove_opc_ (opc->c, opc);
1362     break;
1363   case OPC_STATE_FINISHED:
1364     break;
1365   }
1366   GNUNET_free_non_null (data);
1367   GNUNET_free (opc);
1368 }
1369
1370
1371 /**
1372  * Function to be called when get slave config operation is ready
1373  *
1374  * @param cls the OperationContext of type OP_GET_SLAVE_CONFIG
1375  */
1376 static void
1377 opstart_get_slave_config (void *cls)
1378 {
1379   struct OperationContext *opc = cls;
1380   struct GetSlaveConfigData *data = opc->data;
1381   struct GNUNET_TESTBED_SlaveGetConfigurationMessage *msg;
1382
1383   GNUNET_assert (NULL != data);
1384   msg = GNUNET_TESTBED_generate_slavegetconfig_msg_ (opc->id, data->slave_id);
1385   GNUNET_free (opc->data);
1386   data = NULL;
1387   opc->data = NULL;
1388   GNUNET_TESTBED_insert_opc_ (opc->c, opc);
1389   GNUNET_TESTBED_queue_message_ (opc->c, &msg->header);
1390   opc->state = OPC_STATE_STARTED;
1391 }
1392
1393
1394 /**
1395  * Function to be called when get slave config operation is cancelled or finished
1396  *
1397  * @param cls the OperationContext of type OP_GET_SLAVE_CONFIG
1398  */
1399 static void
1400 oprelease_get_slave_config (void *cls)
1401 {
1402   struct OperationContext *opc = cls;
1403
1404   switch (opc->state)
1405   {
1406   case OPC_STATE_INIT:
1407     GNUNET_free (opc->data);
1408     break;
1409   case OPC_STATE_STARTED:
1410     GNUNET_TESTBED_remove_opc_ (opc->c, opc);
1411     break;
1412   case OPC_STATE_FINISHED:
1413     if (NULL != opc->data)
1414       GNUNET_CONFIGURATION_destroy (opc->data);
1415     break;
1416   }
1417   GNUNET_free (opc);
1418 }
1419
1420
1421 /**
1422  * Start a controller process using the given configuration at the
1423  * given host.
1424  *
1425  * @param host host to run the controller on; This should be the same host if
1426  *          the controller was previously started with
1427  *          GNUNET_TESTBED_controller_start()
1428  * @param event_mask bit mask with set of events to call 'cc' for;
1429  *                   or-ed values of "1LL" shifted by the
1430  *                   respective 'enum GNUNET_TESTBED_EventType'
1431  *                   (i.e.  "(1LL << GNUNET_TESTBED_ET_CONNECT) | ...")
1432  * @param cc controller callback to invoke on events
1433  * @param cc_cls closure for cc
1434  * @return handle to the controller
1435  */
1436 struct GNUNET_TESTBED_Controller *
1437 GNUNET_TESTBED_controller_connect (struct GNUNET_TESTBED_Host *host,
1438                                    uint64_t event_mask,
1439                                    GNUNET_TESTBED_ControllerCallback cc,
1440                                    void *cc_cls)
1441 {
1442   struct GNUNET_TESTBED_Controller *controller;
1443   struct GNUNET_TESTBED_InitMessage *msg;
1444   const struct GNUNET_CONFIGURATION_Handle *cfg;
1445   const char *controller_hostname;
1446   unsigned long long max_parallel_operations;
1447   unsigned long long max_parallel_service_connections;
1448   unsigned long long max_parallel_topology_config_operations;
1449
1450   GNUNET_assert (NULL != (cfg = GNUNET_TESTBED_host_get_cfg_ (host)));
1451   if (GNUNET_OK !=
1452       GNUNET_CONFIGURATION_get_value_number (cfg, "testbed",
1453                                              "MAX_PARALLEL_OPERATIONS",
1454                                              &max_parallel_operations))
1455   {
1456     GNUNET_break (0);
1457     return NULL;
1458   }
1459   if (GNUNET_OK !=
1460       GNUNET_CONFIGURATION_get_value_number (cfg, "testbed",
1461                                              "MAX_PARALLEL_SERVICE_CONNECTIONS",
1462                                              &max_parallel_service_connections))
1463   {
1464     GNUNET_break (0);
1465     return NULL;
1466   }
1467   if (GNUNET_OK !=
1468       GNUNET_CONFIGURATION_get_value_number (cfg, "testbed",
1469                                              "MAX_PARALLEL_TOPOLOGY_CONFIG_OPERATIONS",
1470                                              &max_parallel_topology_config_operations))
1471   {
1472     GNUNET_break (0);
1473     return NULL;
1474   }
1475   controller = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_Controller));
1476   controller->cc = cc;
1477   controller->cc_cls = cc_cls;
1478   controller->event_mask = event_mask;
1479   controller->cfg = GNUNET_CONFIGURATION_dup (cfg);
1480   controller->client = GNUNET_CLIENT_connect ("testbed", controller->cfg);
1481   if (NULL == controller->client)
1482   {
1483     GNUNET_TESTBED_controller_disconnect (controller);
1484     return NULL;
1485   }
1486   GNUNET_TESTBED_mark_host_registered_at_ (host, controller);
1487   controller->host = host;
1488   controller->opq_parallel_operations =
1489       GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED,
1490                                               (unsigned int) max_parallel_operations);
1491   controller->opq_parallel_service_connections =
1492       GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED,
1493                                               (unsigned int)
1494                                               max_parallel_service_connections);
1495   controller->opq_parallel_topology_config_operations =
1496       GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED,
1497                                               (unsigned int)
1498                                               max_parallel_topology_config_operations);
1499   controller_hostname = GNUNET_TESTBED_host_get_hostname (host);
1500   if (NULL == controller_hostname)
1501     controller_hostname = "127.0.0.1";
1502   msg =
1503       GNUNET_malloc (sizeof (struct GNUNET_TESTBED_InitMessage) +
1504                      strlen (controller_hostname) + 1);
1505   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_INIT);
1506   msg->header.size =
1507       htons (sizeof (struct GNUNET_TESTBED_InitMessage) +
1508              strlen (controller_hostname) + 1);
1509   msg->host_id = htonl (GNUNET_TESTBED_host_get_id_ (host));
1510   msg->event_mask = GNUNET_htonll (controller->event_mask);
1511   strcpy ((char *) &msg[1], controller_hostname);
1512   GNUNET_TESTBED_queue_message_ (controller,
1513                                  (struct GNUNET_MessageHeader *) msg);
1514   return controller;
1515 }
1516
1517
1518 /**
1519  * Iterator to free opc map entries
1520  *
1521  * @param cls closure
1522  * @param key current key code
1523  * @param value value in the hash map
1524  * @return GNUNET_YES if we should continue to
1525  *         iterate,
1526  *         GNUNET_NO if not.
1527  */
1528 static int
1529 opc_free_iterator (void *cls, uint32_t key, void *value)
1530 {
1531   struct GNUNET_CONTAINER_MultiHashMap32 *map = cls;
1532   struct OperationContext *opc = value;
1533
1534   GNUNET_assert (NULL != opc);
1535   GNUNET_break (0);
1536   GNUNET_assert (GNUNET_YES == 
1537                  GNUNET_CONTAINER_multihashmap32_remove (map, key, value));
1538   GNUNET_free (opc);
1539   return GNUNET_YES;
1540 }
1541
1542
1543 /**
1544  * Stop the given controller (also will terminate all peers and
1545  * controllers dependent on this controller).  This function
1546  * blocks until the testbed has been fully terminated (!).
1547  *
1548  * @param c handle to controller to stop
1549  */
1550 void
1551 GNUNET_TESTBED_controller_disconnect (struct GNUNET_TESTBED_Controller
1552                                       *c)
1553 {
1554   struct MessageQueue *mq_entry;
1555
1556   if (NULL != c->th)
1557     GNUNET_CLIENT_notify_transmit_ready_cancel (c->th);
1558   /* Clear the message queue */
1559   while (NULL != (mq_entry = c->mq_head))
1560   {
1561     GNUNET_CONTAINER_DLL_remove (c->mq_head, c->mq_tail,
1562                                  mq_entry);
1563     GNUNET_free (mq_entry->msg);
1564     GNUNET_free (mq_entry);
1565   }
1566   if (NULL != c->client)
1567     GNUNET_CLIENT_disconnect (c->client);
1568   if (NULL != c->host)
1569     GNUNET_TESTBED_deregister_host_at_ (c->host, c);
1570   GNUNET_CONFIGURATION_destroy (c->cfg);
1571   GNUNET_TESTBED_operation_queue_destroy_ (c->opq_parallel_operations);
1572   GNUNET_TESTBED_operation_queue_destroy_
1573       (c->opq_parallel_service_connections);
1574   GNUNET_TESTBED_operation_queue_destroy_
1575       (c->opq_parallel_topology_config_operations);
1576   if (NULL != c->opc_map)
1577   {
1578     GNUNET_assert (GNUNET_SYSERR !=
1579                    GNUNET_CONTAINER_multihashmap32_iterate (c->opc_map,
1580                                                             &opc_free_iterator,
1581                                                             c->opc_map));
1582     GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap32_size (c->opc_map));
1583     GNUNET_CONTAINER_multihashmap32_destroy (c->opc_map);
1584   }
1585   GNUNET_free (c);
1586 }
1587
1588
1589 /**
1590  * Compresses given configuration using zlib compress
1591  *
1592  * @param config the serialized configuration
1593  * @param size the size of config
1594  * @param xconfig will be set to the compressed configuration (memory is fresly
1595  *          allocated)
1596  * @return the size of the xconfig
1597  */
1598 size_t
1599 GNUNET_TESTBED_compress_config_ (const char *config, size_t size,
1600                                  char **xconfig)
1601 {
1602   size_t xsize;
1603
1604   xsize = compressBound ((uLong) size);
1605   *xconfig = GNUNET_malloc (xsize);
1606   GNUNET_assert (Z_OK ==
1607                  compress2 ((Bytef *) * xconfig, (uLongf *) & xsize,
1608                             (const Bytef *) config, (uLongf) size,
1609                             Z_BEST_SPEED));
1610   return xsize;
1611 }
1612
1613
1614 /**
1615  * Function to serialize and compress using zlib a configuration through a
1616  * configuration handle
1617  *
1618  * @param cfg the configuration
1619  * @param size the size of configuration when serialize.  Will be set on success.
1620  * @param xsize the sizeo of the compressed configuration.  Will be set on success.
1621  * @return the serialized and compressed configuration
1622  */
1623 char *
1624 GNUNET_TESTBED_compress_cfg_ (const struct GNUNET_CONFIGURATION_Handle *cfg,
1625                               size_t *size, size_t *xsize)
1626 {
1627   char *config;
1628   char *xconfig;
1629   size_t size_;
1630   size_t xsize_;
1631   
1632   config = GNUNET_CONFIGURATION_serialize (cfg, &size_);
1633   xsize_ = GNUNET_TESTBED_compress_config_ (config, size_, &xconfig);
1634   GNUNET_free (config);
1635   *size = size_;
1636   *xsize = xsize_;
1637   return xconfig;
1638 }
1639
1640
1641 /**
1642  * Create a link from slave controller to delegated controller. Whenever the
1643  * master controller is asked to start a peer at the delegated controller the
1644  * request will be routed towards slave controller (if a route exists). The
1645  * slave controller will then route it to the delegated controller. The
1646  * configuration of the delegated controller is given and is used to either
1647  * create the delegated controller or to connect to an existing controller. Note
1648  * that while starting the delegated controller the configuration will be
1649  * modified to accommodate available free ports.  the 'is_subordinate' specifies
1650  * if the given delegated controller should be started and managed by the slave
1651  * controller, or if the delegated controller already has a master and the slave
1652  * controller connects to it as a non master controller. The success or failure
1653  * of this operation will be signalled through the
1654  * GNUNET_TESTBED_ControllerCallback() with an event of type
1655  * GNUNET_TESTBED_ET_OPERATION_FINISHED
1656  *
1657  * @param op_cls the operation closure for the event which is generated to
1658  *          signal success or failure of this operation
1659  * @param master handle to the master controller who creates the association
1660  * @param delegated_host requests to which host should be delegated; cannot be NULL
1661  * @param slave_host which host is used to run the slave controller; use NULL to
1662  *          make the master controller connect to the delegated host
1663  * @param is_subordinate GNUNET_YES if the controller at delegated_host should
1664  *          be started by the slave controller; GNUNET_NO if the slave
1665  *          controller has to connect to the already started delegated
1666  *          controller via TCP/IP
1667  * @return the operation handle
1668  */
1669 struct GNUNET_TESTBED_Operation *
1670 GNUNET_TESTBED_controller_link (void *op_cls,
1671                                 struct GNUNET_TESTBED_Controller *master,
1672                                 struct GNUNET_TESTBED_Host *delegated_host,
1673                                 struct GNUNET_TESTBED_Host *slave_host,
1674                                 int is_subordinate)
1675 {
1676   struct OperationContext *opc;
1677   struct GNUNET_TESTBED_ControllerLinkRequest *msg;
1678   struct ControllerLinkData *data;
1679   uint32_t slave_host_id;
1680   uint32_t delegated_host_id;
1681   uint16_t msg_size;
1682   
1683   GNUNET_assert (GNUNET_YES ==
1684                  GNUNET_TESTBED_is_host_registered_ (delegated_host, master));
1685   slave_host_id =
1686       GNUNET_TESTBED_host_get_id_ ((NULL !=
1687                                     slave_host) ? slave_host : master->host);
1688   delegated_host_id = GNUNET_TESTBED_host_get_id_ (delegated_host);
1689   if ((NULL != slave_host) && (0 != slave_host_id))
1690     GNUNET_assert (GNUNET_YES ==
1691                    GNUNET_TESTBED_is_host_registered_ (slave_host, master));
1692   msg_size = sizeof (struct GNUNET_TESTBED_ControllerLinkRequest);
1693   msg = GNUNET_malloc (msg_size);
1694   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS);
1695   msg->header.size = htons (msg_size);
1696   msg->delegated_host_id = htonl (delegated_host_id);
1697   msg->slave_host_id = htonl (slave_host_id);
1698   msg->is_subordinate = (GNUNET_YES == is_subordinate) ? 1 : 0;
1699   data = GNUNET_malloc (sizeof (struct ControllerLinkData));
1700   data->msg = msg;
1701   data->host_id = delegated_host_id;
1702   opc = GNUNET_malloc (sizeof (struct OperationContext));
1703   opc->c = master;
1704   opc->data = data;
1705   opc->type = OP_LINK_CONTROLLERS;
1706   opc->id = GNUNET_TESTBED_get_next_op_id (opc->c);
1707   opc->state = OPC_STATE_INIT;
1708   opc->op_cls = op_cls;
1709   msg->operation_id = GNUNET_htonll (opc->id);
1710   opc->op =
1711       GNUNET_TESTBED_operation_create_ (opc, &opstart_link_controllers,
1712                                         &oprelease_link_controllers);
1713   GNUNET_TESTBED_operation_queue_insert_ (master->opq_parallel_operations,
1714                                           opc->op);
1715   GNUNET_TESTBED_operation_begin_wait_ (opc->op);
1716   return opc->op;
1717 }
1718
1719
1720 /**
1721  * Like GNUNET_TESTBED_get_slave_config(), however without the host registration
1722  * check. Another difference is that this function takes the id of the slave
1723  * host.
1724  *
1725  * @param op_cls the closure for the operation
1726  * @param master the handle to master controller
1727  * @param slave_host_id id of the host where the slave controller is running to
1728  *          the slave_host should remain valid until this operation is cancelled
1729  *          or marked as finished
1730  * @return the operation handle;
1731  */
1732 struct GNUNET_TESTBED_Operation *
1733 GNUNET_TESTBED_get_slave_config_ (void *op_cls,
1734                                   struct GNUNET_TESTBED_Controller *master,
1735                                   uint32_t slave_host_id)
1736 {
1737   struct OperationContext *opc;
1738   struct GetSlaveConfigData *data;
1739
1740   data = GNUNET_malloc (sizeof (struct GetSlaveConfigData));
1741   data->slave_id = slave_host_id;
1742   opc = GNUNET_malloc (sizeof (struct OperationContext));
1743   opc->state = OPC_STATE_INIT;
1744   opc->c = master;
1745   opc->id = GNUNET_TESTBED_get_next_op_id (master);
1746   opc->type = OP_GET_SLAVE_CONFIG;
1747   opc->data = data;
1748   opc->op_cls = op_cls;
1749   opc->op =
1750       GNUNET_TESTBED_operation_create_ (opc, &opstart_get_slave_config,
1751                                         &oprelease_get_slave_config);
1752   GNUNET_TESTBED_operation_queue_insert_ (master->opq_parallel_operations,
1753                                           opc->op);
1754   GNUNET_TESTBED_operation_begin_wait_ (opc->op);
1755   return opc->op;
1756 }
1757
1758
1759 /**
1760  * Function to acquire the configuration of a running slave controller. The
1761  * completion of the operation is signalled through the controller_cb from
1762  * GNUNET_TESTBED_controller_connect(). If the operation is successful the
1763  * handle to the configuration is available in the generic pointer of
1764  * operation_finished field of struct GNUNET_TESTBED_EventInformation.
1765  *
1766  * @param op_cls the closure for the operation
1767  * @param master the handle to master controller
1768  * @param slave_host the host where the slave controller is running; the handle
1769  *          to the slave_host should remain valid until this operation is
1770  *          cancelled or marked as finished
1771  * @return the operation handle; NULL if the slave_host is not registered at
1772  *           master
1773  */
1774 struct GNUNET_TESTBED_Operation *
1775 GNUNET_TESTBED_get_slave_config (void *op_cls,
1776                                  struct GNUNET_TESTBED_Controller *master,
1777                                  struct GNUNET_TESTBED_Host *slave_host)
1778 {
1779   if (GNUNET_NO == GNUNET_TESTBED_is_host_registered_ (slave_host, master))
1780     return NULL;
1781   return GNUNET_TESTBED_get_slave_config_ (op_cls, master,
1782                                            GNUNET_TESTBED_host_get_id_
1783                                            (slave_host));
1784 }
1785
1786
1787 /**
1788  * Ask the testbed controller to write the current overlay topology to
1789  * a file.  Naturally, the file will only contain a snapshot as the
1790  * topology may evolve all the time.
1791  *
1792  * @param controller overlay controller to inspect
1793  * @param filename name of the file the topology should
1794  *        be written to.
1795  */
1796 void
1797 GNUNET_TESTBED_overlay_write_topology_to_file (struct GNUNET_TESTBED_Controller
1798                                                *controller,
1799                                                const char *filename)
1800 {
1801   GNUNET_break (0);
1802 }
1803
1804
1805 /**
1806  * Creates a helper initialization message. This function is here because we
1807  * want to use this in testing
1808  *
1809  * @param trusted_ip the ip address of the controller which will be set as TRUSTED
1810  *          HOST(all connections form this ip are permitted by the testbed) when
1811  *          starting testbed controller at host. This can either be a single ip
1812  *          address or a network address in CIDR notation.
1813  * @param hostname the hostname of the destination this message is intended for
1814  * @param cfg the configuration that has to used to start the testbed service
1815  *          thru helper
1816  * @return the initialization message
1817  */
1818 struct GNUNET_TESTBED_HelperInit *
1819 GNUNET_TESTBED_create_helper_init_msg_ (const char *trusted_ip,
1820                                         const char *hostname,
1821                                         const struct GNUNET_CONFIGURATION_Handle
1822                                         *cfg)
1823 {
1824   struct GNUNET_TESTBED_HelperInit *msg;
1825   char *config;
1826   char *xconfig;
1827   size_t config_size;
1828   size_t xconfig_size;
1829   uint16_t trusted_ip_len;
1830   uint16_t hostname_len;
1831   uint16_t msg_size;
1832
1833   config = GNUNET_CONFIGURATION_serialize (cfg, &config_size);
1834   GNUNET_assert (NULL != config);
1835   xconfig_size =
1836       GNUNET_TESTBED_compress_config_ (config, config_size, &xconfig);
1837   GNUNET_free (config);
1838   trusted_ip_len = strlen (trusted_ip);
1839   hostname_len = (NULL == hostname) ? 0 : strlen (hostname);
1840   msg_size =
1841       xconfig_size + trusted_ip_len + 1 +
1842       sizeof (struct GNUNET_TESTBED_HelperInit);
1843   msg_size += hostname_len;
1844   msg = GNUNET_realloc (xconfig, msg_size);
1845   (void) memmove (((void *) &msg[1]) + trusted_ip_len + 1 + hostname_len, msg,
1846                   xconfig_size);
1847   msg->header.size = htons (msg_size);
1848   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_HELPER_INIT);
1849   msg->trusted_ip_size = htons (trusted_ip_len);
1850   msg->hostname_size = htons (hostname_len);
1851   msg->config_size = htons (config_size);
1852   (void) strcpy ((char *) &msg[1], trusted_ip);
1853   if (0 != hostname_len)
1854     (void) strncpy (((char *) &msg[1]) + trusted_ip_len + 1, hostname,
1855                     hostname_len);
1856   return msg;
1857 }
1858
1859
1860 /**
1861  * This function is used to signal that the event information (struct
1862  * GNUNET_TESTBED_EventInformation) from an operation has been fully processed
1863  * i.e. if the event callback is ever called for this operation. If the event
1864  * callback for this operation has not yet been called, calling this function
1865  * cancels the operation, frees its resources and ensures the no event is
1866  * generated with respect to this operation. Note that however cancelling an
1867  * operation does NOT guarantee that the operation will be fully undone (or that
1868  * nothing ever happened). 
1869  *
1870  * This function MUST be called for every operation to fully remove the
1871  * operation from the operation queue.  After calling this function, if
1872  * operation is completed and its event information is of type
1873  * GNUNET_TESTBED_ET_OPERATION_FINISHED, the 'op_result' becomes invalid (!).
1874
1875  * If the operation is generated from GNUNET_TESTBED_service_connect() then
1876  * calling this function on such as operation calls the disconnect adapter if
1877  * the connect adapter was ever called.
1878  *
1879  * @param operation operation to signal completion or cancellation
1880  */
1881 void
1882 GNUNET_TESTBED_operation_done (struct GNUNET_TESTBED_Operation *operation)
1883 {
1884   (void) exop_check (operation);
1885   GNUNET_TESTBED_operation_release_ (operation);
1886 }
1887
1888
1889 /**
1890  * Generates configuration by uncompressing configuration in given message. The
1891  * given message should be of the following types:
1892  * GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION,
1893  * GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION,
1894  * GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST,
1895  * GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS,
1896  * GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT,
1897  *
1898  * @param msg the message containing compressed configuration
1899  * @return handle to the parsed configuration; NULL upon error while parsing the message
1900  */
1901 struct GNUNET_CONFIGURATION_Handle *
1902 GNUNET_TESTBED_extract_config_ (const struct GNUNET_MessageHeader *msg)
1903 {
1904   struct GNUNET_CONFIGURATION_Handle *cfg;
1905   Bytef *data;
1906   const Bytef *xdata;
1907   uLong data_len;
1908   uLong xdata_len;
1909   int ret;
1910
1911   switch (ntohs (msg->type))
1912   {
1913   case GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION:
1914   {
1915     const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *imsg;
1916
1917     imsg =
1918         (const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *) msg;
1919     data_len = (uLong) ntohs (imsg->config_size);
1920     xdata_len =
1921         ntohs (imsg->header.size) -
1922         sizeof (struct GNUNET_TESTBED_PeerConfigurationInformationMessage);
1923     xdata = (const Bytef *) &imsg[1];
1924   }
1925     break;
1926   case GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION:
1927   {
1928     const struct GNUNET_TESTBED_SlaveConfiguration *imsg;
1929
1930     imsg = (const struct GNUNET_TESTBED_SlaveConfiguration *) msg;
1931     data_len = (uLong) ntohs (imsg->config_size);
1932     xdata_len =
1933         ntohs (imsg->header.size) -
1934         sizeof (struct GNUNET_TESTBED_SlaveConfiguration);
1935     xdata = (const Bytef *) &imsg[1];
1936   }
1937   break;
1938   case GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST:
1939     {
1940       const struct GNUNET_TESTBED_AddHostMessage *imsg;
1941       uint16_t osize;
1942       
1943       imsg = (const struct GNUNET_TESTBED_AddHostMessage *) msg;
1944       data_len = (uLong) ntohs (imsg->config_size);
1945       osize = sizeof (struct GNUNET_TESTBED_AddHostMessage) +
1946           ntohs (imsg->username_length) + ntohs (imsg->hostname_length); 
1947       xdata_len = ntohs (imsg->header.size) - osize;
1948       xdata = (const Bytef *) ((const void *) imsg + osize);
1949     }
1950     break;
1951   case GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT:
1952     {
1953       const struct GNUNET_TESTBED_ControllerLinkResponse *imsg;
1954       
1955       imsg = (const struct GNUNET_TESTBED_ControllerLinkResponse *) msg;
1956       data_len = ntohs (imsg->config_size);
1957       xdata_len = ntohs (imsg->header.size) - 
1958           sizeof (const struct GNUNET_TESTBED_ControllerLinkResponse);
1959       xdata = (const Bytef *) &imsg[1];
1960     }
1961     break;
1962   case GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER:
1963     {
1964       const struct GNUNET_TESTBED_PeerCreateMessage *imsg;
1965
1966       imsg = (const struct GNUNET_TESTBED_PeerCreateMessage *) msg;
1967       data_len = ntohs (imsg->config_size);
1968       xdata_len = ntohs (imsg->header.size) -
1969           sizeof (struct GNUNET_TESTBED_PeerCreateMessage);
1970       xdata = (const Bytef *) &imsg[1];
1971     }
1972     break;
1973   case GNUNET_MESSAGE_TYPE_TESTBED_RECONFIGURE_PEER:
1974     {
1975       const struct GNUNET_TESTBED_PeerReconfigureMessage *imsg;
1976
1977       imsg = (const struct GNUNET_TESTBED_PeerReconfigureMessage *) msg;
1978       data_len =  ntohs (imsg->config_size);
1979       xdata_len = ntohs (imsg->header.size) -
1980           sizeof (struct GNUNET_TESTBED_PeerReconfigureMessage);
1981       xdata = (const Bytef *) &imsg[1];
1982     }
1983     break;
1984   default:
1985     GNUNET_assert (0);
1986   }
1987   data = GNUNET_malloc (data_len);
1988   if (Z_OK != (ret = uncompress (data, &data_len, xdata, xdata_len)))
1989   {
1990     GNUNET_free (data);
1991     GNUNET_break_op (0);        /* Un-compression failure */
1992     return NULL;
1993   }
1994   cfg = GNUNET_CONFIGURATION_create ();
1995   if (GNUNET_OK !=
1996       GNUNET_CONFIGURATION_deserialize (cfg, (const char *) data,
1997                                         (size_t) data_len,
1998                                         GNUNET_NO))
1999   {
2000     GNUNET_free (data);
2001     GNUNET_break_op (0);        /* De-serialization failure */
2002     return NULL;
2003   }
2004   GNUNET_free (data);
2005   return cfg;
2006 }
2007
2008
2009 /**
2010  * Checks the integrity of the OperationFailureEventMessage and if good returns
2011  * the error message it contains.
2012  *
2013  * @param msg the OperationFailureEventMessage
2014  * @return the error message
2015  */
2016 const char *
2017 GNUNET_TESTBED_parse_error_string_ (const struct
2018                                     GNUNET_TESTBED_OperationFailureEventMessage
2019                                     *msg)
2020 {
2021   uint16_t msize;
2022   const char *emsg;
2023
2024   msize = ntohs (msg->header.size);
2025   if (sizeof (struct GNUNET_TESTBED_OperationFailureEventMessage) >= msize)
2026     return NULL;
2027   msize -= sizeof (struct GNUNET_TESTBED_OperationFailureEventMessage);
2028   emsg = (const char *) &msg[1];
2029   if ('\0' != emsg[msize - 1])
2030   {
2031     GNUNET_break (0);
2032     return NULL;
2033   }
2034   return emsg;
2035 }
2036
2037
2038 /**
2039  * Function to return the operation id for a controller. The operation id is
2040  * created from the controllers host id and its internal operation counter.
2041  *
2042  * @param controller the handle to the controller whose operation id has to be incremented
2043  * @return the incremented operation id.
2044  */
2045 uint64_t
2046 GNUNET_TESTBED_get_next_op_id (struct GNUNET_TESTBED_Controller * controller)
2047 {
2048   uint64_t op_id;
2049
2050   op_id = (uint64_t) GNUNET_TESTBED_host_get_id_ (controller->host);
2051   op_id = op_id << 32;
2052   op_id |= (uint64_t) controller->operation_counter++;
2053   return op_id;
2054 }
2055
2056
2057 /**
2058  * Function called when a shutdown peers operation is ready
2059  *
2060  * @param cls the closure from GNUNET_TESTBED_operation_create_()
2061  */
2062 static void
2063 opstart_shutdown_peers (void *cls)
2064 {
2065   struct OperationContext *opc = cls;
2066   struct GNUNET_TESTBED_ShutdownPeersMessage *msg;
2067
2068   opc->state = OPC_STATE_STARTED;
2069   msg = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_ShutdownPeersMessage));
2070   msg->header.size = 
2071       htons (sizeof (struct GNUNET_TESTBED_ShutdownPeersMessage));
2072   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS);
2073   msg->operation_id = GNUNET_htonll (opc->id);
2074   GNUNET_TESTBED_insert_opc_ (opc->c, opc);
2075   GNUNET_TESTBED_queue_message_ (opc->c, &msg->header);
2076 }
2077
2078
2079 /**
2080  * Callback which will be called when shutdown peers operation is released
2081  *
2082  * @param cls the closure from GNUNET_TESTBED_operation_create_()
2083  */
2084 static void
2085 oprelease_shutdown_peers (void *cls)
2086 {
2087   struct OperationContext *opc = cls;
2088
2089   switch (opc->state)
2090   {
2091   case OPC_STATE_STARTED:
2092     GNUNET_TESTBED_remove_opc_ (opc->c, opc);
2093     /* no break; continue */
2094   case OPC_STATE_INIT:
2095     GNUNET_free (opc->data);
2096     break;
2097   case OPC_STATE_FINISHED:
2098     break;
2099   }
2100   GNUNET_free (opc);
2101 }
2102
2103
2104 /**
2105  * Stops and destroys all peers.  Is equivalent of calling
2106  * GNUNET_TESTBED_peer_stop() and GNUNET_TESTBED_peer_destroy() on all peers,
2107  * except that the peer stop event and operation finished event corresponding to
2108  * the respective functions are not generated.  This function should be called
2109  * when there are no other pending operations.  If there are pending operations,
2110  * it will return NULL
2111  *
2112  * @param c the controller to send this message to
2113  * @param op_cls closure for the operation
2114  * @param cb the callback to call when all peers are stopped and destroyed
2115  * @param cb_cls the closure for the callback
2116  * @return operation handle on success; NULL if any pending operations are
2117  *           present
2118  */
2119 struct GNUNET_TESTBED_Operation *
2120 GNUNET_TESTBED_shutdown_peers (struct GNUNET_TESTBED_Controller *c,
2121                                void *op_cls,
2122                                GNUNET_TESTBED_OperationCompletionCallback cb,
2123                                void *cb_cls)
2124 {
2125   struct OperationContext *opc;
2126   struct ShutdownPeersData *data;
2127
2128   if (0 != GNUNET_CONTAINER_multihashmap32_size (c->opc_map))
2129     return NULL;
2130   data = GNUNET_malloc (sizeof (struct ShutdownPeersData));
2131   data->cb = cb;
2132   data->cb_cls = cb_cls;
2133   opc = GNUNET_malloc (sizeof (struct OperationContext));
2134   opc->c = c;
2135   opc->op_cls = op_cls;
2136   opc->data = data;
2137   opc->id =  GNUNET_TESTBED_get_next_op_id (c);
2138   opc->type = OP_SHUTDOWN_PEERS;
2139   opc->state = OPC_STATE_INIT;  
2140   opc->op = GNUNET_TESTBED_operation_create_ (opc, &opstart_shutdown_peers,
2141                                               &oprelease_shutdown_peers);
2142   GNUNET_TESTBED_operation_queue_insert_ (opc->c->opq_parallel_operations,
2143                                         opc->op);
2144   GNUNET_TESTBED_operation_begin_wait_ (opc->op);
2145   return opc->op;
2146 }
2147
2148
2149 /* end of testbed_api.c */