- distribute peers equally among island nodes on SuperMUC
[oweals/gnunet.git] / src / testbed / gnunet-service-testbed_peers.c
1 /*
2   This file is part of GNUnet.
3   (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 2, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21
22 /**
23  * @file testbed/gnunet-service-testbed_peers.c
24  * @brief implementation of TESTBED service that deals with peer management
25  * @author Sree Harsha Totakura <sreeharsha@totakura.in> 
26  */
27
28 #include "gnunet-service-testbed.h"
29 #include "gnunet_arm_service.h"
30 #include <zlib.h>
31
32
33 /**
34  * A list of peers we know about
35  */
36 struct Peer **GST_peer_list;
37
38
39 /**
40  * Context information to manage peers' services
41  */
42 struct ManageServiceContext
43 {
44   /**
45    * DLL next ptr
46    */
47   struct ManageServiceContext *next;
48
49   /**
50    * DLL prev ptr
51    */
52   struct ManageServiceContext *prev;
53
54   /**
55    * The ARM handle of the peer
56    */
57   struct GNUNET_ARM_Handle *ah;
58
59   /**
60    * peer whose service has to be managed
61    */
62   struct Peer *peer;
63
64   /**
65    * The client which requested to manage the peer's service
66    */
67   struct GNUNET_SERVER_Client *client;
68   
69   /**
70    * The operation id of the associated request
71    */
72   uint64_t op_id;
73   
74   /**
75    * 1 if the service at the peer has to be started; 0 if it has to be stopped
76    */
77   uint8_t start;
78
79   /**
80    * Is this context expired?  Do not work on this context if it is set to
81    * GNUNET_YES
82    */
83   uint8_t expired;  
84 };
85
86
87 /**
88  * Context information for peer re-configure operations
89  */
90 struct PeerReconfigureContext
91 {
92   /**
93    * DLL next for inclusoin in peer reconfigure operations list
94    */
95   struct PeerReconfigureContext *next;
96
97   /**
98    * DLL prev
99    */
100   struct PeerReconfigureContext *prev;
101
102   /**
103    * The client which gave this operation to us
104    */
105   struct GNUNET_SERVER_Client *client;
106
107   /**
108    * The configuration handle to use as the new template
109    */
110   struct GNUNET_CONFIGURATION_Handle *cfg;
111
112   /**
113    * The id of the operation
114    */
115   uint64_t op_id;
116
117   /**
118    * The id of the peer which has to be reconfigured
119    */
120   uint32_t peer_id;
121
122   /**
123    * The the peer stopped?  Used while cleaning up this context to decide
124    * whether the asynchronous stop request through Testing/ARM API has to be
125    * cancelled
126    */
127   uint8_t stopped;
128 };
129
130 /**
131  * The DLL head for the peer reconfigure list
132  */
133 static struct PeerReconfigureContext *prc_head;
134
135 /**
136  * The DLL tail for the peer reconfigure list
137  */
138 static struct PeerReconfigureContext *prc_tail;
139
140
141
142 /**
143  * DLL head for queue of manage service requests
144  */
145 static struct ManageServiceContext *mctx_head;
146
147 /**
148  * DLL tail for queue of manage service requests
149  */
150 static struct ManageServiceContext *mctx_tail;
151
152
153 /**
154  * Adds a peer to the peer array
155  *
156  * @param peer the peer to add
157  */
158 static void
159 peer_list_add (struct Peer *peer)
160 {
161   if (peer->id >= GST_peer_list_size)
162     GST_array_grow_large_enough (GST_peer_list, GST_peer_list_size, peer->id);
163   GNUNET_assert (NULL == GST_peer_list[peer->id]);
164   GST_peer_list[peer->id] = peer;
165 }
166
167
168 /**
169  * Removes a the give peer from the peer array
170  *
171  * @param peer the peer to be removed
172  */
173 static void
174 peer_list_remove (struct Peer *peer)
175 {
176   unsigned int orig_size;
177   uint32_t id;
178
179   GST_peer_list[peer->id] = NULL;
180   orig_size = GST_peer_list_size;
181   while (GST_peer_list_size >= LIST_GROW_STEP)
182   {
183     for (id = GST_peer_list_size - 1;
184          (id >= GST_peer_list_size - LIST_GROW_STEP) && (id != UINT32_MAX);
185          id--)
186       if (NULL != GST_peer_list[id])
187         break;
188     if (id != ((GST_peer_list_size - LIST_GROW_STEP) - 1))
189       break;
190     GST_peer_list_size -= LIST_GROW_STEP;
191   }
192   if (orig_size == GST_peer_list_size)
193     return;
194   GST_peer_list =
195       GNUNET_realloc (GST_peer_list,
196                       sizeof (struct Peer *) * GST_peer_list_size);
197 }
198
199
200 /**
201  * The task to be executed if the forwarded peer create operation has been
202  * timed out
203  *
204  * @param cls the FowardedOperationContext
205  * @param tc the TaskContext from the scheduler
206  */
207 static void
208 peer_create_forward_timeout (void *cls,
209                              const struct GNUNET_SCHEDULER_TaskContext *tc)
210 {
211   struct ForwardedOperationContext *fopc = cls;
212
213   GNUNET_free (fopc->cls);
214   GST_forwarded_operation_timeout (fopc, tc);
215 }
216
217
218 /**
219  * Callback to be called when forwarded peer create operation is successfull. We
220  * have to relay the reply msg back to the client
221  *
222  * @param cls ForwardedOperationContext
223  * @param msg the peer create success message
224  */
225 static void
226 peer_create_success_cb (void *cls, const struct GNUNET_MessageHeader *msg)
227 {
228   struct ForwardedOperationContext *fopc = cls;
229   struct Peer *remote_peer;
230
231   if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS)
232   {
233     GNUNET_assert (NULL != fopc->cls);
234     remote_peer = fopc->cls;
235     peer_list_add (remote_peer);
236   }
237   GST_forwarded_operation_reply_relay (fopc, msg);
238 }
239
240
241 /**
242  * Function to destroy a peer
243  *
244  * @param peer the peer structure to destroy
245  */
246 void
247 GST_destroy_peer (struct Peer *peer)
248 {
249   GNUNET_break (0 == peer->reference_cnt);
250   if (GNUNET_YES == peer->is_remote)
251   {
252     peer_list_remove (peer);
253     GNUNET_free (peer);
254     return;
255   }
256   if (GNUNET_YES == peer->details.local.is_running)
257   {
258     GNUNET_TESTING_peer_stop (peer->details.local.peer);
259     peer->details.local.is_running = GNUNET_NO;
260   }
261   GNUNET_TESTING_peer_destroy (peer->details.local.peer);
262   GNUNET_CONFIGURATION_destroy (peer->details.local.cfg);
263   peer_list_remove (peer);
264   GNUNET_free (peer);
265 }
266
267
268 /**
269  * Callback to be called when forwarded peer destroy operation is successfull. We
270  * have to relay the reply msg back to the client
271  *
272  * @param cls ForwardedOperationContext
273  * @param msg the peer create success message
274  */
275 static void
276 peer_destroy_success_cb (void *cls, const struct GNUNET_MessageHeader *msg)
277 {
278   struct ForwardedOperationContext *fopc = cls;
279   struct Peer *remote_peer;
280
281   if (GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS ==
282       ntohs (msg->type))
283   {
284     remote_peer = fopc->cls;
285     GNUNET_assert (NULL != remote_peer);
286     remote_peer->destroy_flag = GNUNET_YES;
287     if (0 == remote_peer->reference_cnt)
288       GST_destroy_peer (remote_peer);
289   }
290   GST_forwarded_operation_reply_relay (fopc, msg);
291 }
292
293
294 /**
295  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_CREATEPEER messages
296  *
297  * @param cls NULL
298  * @param client identification of the client
299  * @param message the actual message
300  */
301 void
302 GST_handle_peer_create (void *cls, struct GNUNET_SERVER_Client *client,
303                         const struct GNUNET_MessageHeader *message)
304 {
305   const struct GNUNET_TESTBED_PeerCreateMessage *msg;
306   struct GNUNET_TESTBED_PeerCreateSuccessEventMessage *reply;
307   struct GNUNET_CONFIGURATION_Handle *cfg;
308   struct ForwardedOperationContext *fo_ctxt;
309   struct Route *route;
310   struct Peer *peer;
311   char *emsg;
312   uint32_t host_id;
313   uint32_t peer_id;
314   uint16_t msize;
315
316
317   msize = ntohs (message->size);
318   if (msize <= sizeof (struct GNUNET_TESTBED_PeerCreateMessage))
319   {
320     GNUNET_break (0);           /* We need configuration */
321     GNUNET_SERVER_receive_done (client, GNUNET_OK);
322     return;
323   }
324   msg = (const struct GNUNET_TESTBED_PeerCreateMessage *) message;
325   host_id = ntohl (msg->host_id);
326   peer_id = ntohl (msg->peer_id);
327   if (VALID_PEER_ID (peer_id))
328   {
329     (void) GNUNET_asprintf (&emsg, "Peer with ID %u already exists", peer_id);
330     GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id),
331                                  emsg);
332     GNUNET_free (emsg);
333     GNUNET_SERVER_receive_done (client, GNUNET_OK);
334     return;
335   }
336   if (UINT32_MAX == peer_id)
337   {    
338     GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id),
339                                  "Cannot create peer with given ID");
340     GNUNET_SERVER_receive_done (client, GNUNET_OK);
341     return;
342   }
343   if (host_id == GST_context->host_id)
344   {
345     /* We are responsible for this peer */
346     cfg = GNUNET_TESTBED_extract_config_ (message);
347     if (NULL == cfg)
348     {
349       GNUNET_break (0);
350       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
351       return;
352     }
353     GNUNET_CONFIGURATION_set_value_number (cfg, "TESTBED", "PEERID",
354                                            (unsigned long long) peer_id);
355     peer = GNUNET_malloc (sizeof (struct Peer));
356     peer->is_remote = GNUNET_NO;
357     peer->details.local.cfg = cfg;
358     peer->id = peer_id;
359     LOG_DEBUG ("Creating peer with id: %u\n", (unsigned int) peer->id);
360     peer->details.local.peer =
361         GNUNET_TESTING_peer_configure (GST_context->system,
362                                        peer->details.local.cfg, peer->id,
363                                        NULL /* Peer id */ ,
364                                        &emsg);
365     if (NULL == peer->details.local.peer)
366     {
367       LOG (GNUNET_ERROR_TYPE_WARNING, "Configuring peer failed: %s\n", emsg);
368       GNUNET_free (emsg);
369       GNUNET_free (peer);
370       GNUNET_break (0);
371       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
372       return;
373     }
374     peer->details.local.is_running = GNUNET_NO;
375     peer_list_add (peer);
376     reply =
377         GNUNET_malloc (sizeof
378                        (struct GNUNET_TESTBED_PeerCreateSuccessEventMessage));
379     reply->header.size =
380         htons (sizeof (struct GNUNET_TESTBED_PeerCreateSuccessEventMessage));
381     reply->header.type =
382         htons (GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS);
383     reply->peer_id = msg->peer_id;
384     reply->operation_id = msg->operation_id;
385     GST_queue_message (client, &reply->header);
386     GNUNET_SERVER_receive_done (client, GNUNET_OK);
387     return;
388   }
389
390   /* Forward peer create request */
391   route = GST_find_dest_route (host_id);
392   if (NULL == route)
393   {
394     GNUNET_break (0);
395     GNUNET_SERVER_receive_done (client, GNUNET_OK);
396     return;
397   }
398   peer = GNUNET_malloc (sizeof (struct Peer));
399   peer->is_remote = GNUNET_YES;
400   peer->id = peer_id;
401   peer->details.remote.slave = GST_slave_list[route->dest];
402   peer->details.remote.remote_host_id = host_id;
403   fo_ctxt = GNUNET_malloc (sizeof (struct ForwardedOperationContext));
404   GNUNET_SERVER_client_keep (client);
405   fo_ctxt->client = client;
406   fo_ctxt->operation_id = GNUNET_ntohll (msg->operation_id);
407   fo_ctxt->cls = peer;
408   fo_ctxt->type = OP_PEER_CREATE;
409   fo_ctxt->opc =
410       GNUNET_TESTBED_forward_operation_msg_ (GST_slave_list
411                                              [route->dest]->controller,
412                                              fo_ctxt->operation_id,
413                                              &msg->header,
414                                              peer_create_success_cb, fo_ctxt);
415   fo_ctxt->timeout_task =
416       GNUNET_SCHEDULER_add_delayed (GST_timeout, &peer_create_forward_timeout,
417                                     fo_ctxt);
418   GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fo_ctxt);
419   GNUNET_SERVER_receive_done (client, GNUNET_OK);
420 }
421
422
423 /**
424  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_DESTROYPEER messages
425  *
426  * @param cls NULL
427  * @param client identification of the client
428  * @param message the actual message
429  */
430 void
431 GST_handle_peer_destroy (void *cls, struct GNUNET_SERVER_Client *client,
432                          const struct GNUNET_MessageHeader *message)
433 {
434   const struct GNUNET_TESTBED_PeerDestroyMessage *msg;
435   struct ForwardedOperationContext *fopc;
436   struct Peer *peer;
437   uint32_t peer_id;
438
439   msg = (const struct GNUNET_TESTBED_PeerDestroyMessage *) message;
440   peer_id = ntohl (msg->peer_id);
441   LOG_DEBUG ("Received peer destory on peer: %u and operation id: %ul\n",
442              peer_id, GNUNET_ntohll (msg->operation_id));
443   if (!VALID_PEER_ID (peer_id))
444   {
445     LOG (GNUNET_ERROR_TYPE_ERROR,
446          "Asked to destroy a non existent peer with id: %u\n", peer_id);
447     GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id),
448                                  "Peer doesn't exist");
449     GNUNET_SERVER_receive_done (client, GNUNET_OK);
450     return;
451   }
452   peer = GST_peer_list[peer_id];
453   if (GNUNET_YES == peer->is_remote)
454   {
455     /* Forward the destory message to sub controller */
456     fopc = GNUNET_malloc (sizeof (struct ForwardedOperationContext));
457     GNUNET_SERVER_client_keep (client);
458     fopc->client = client;
459     fopc->cls = peer;
460     fopc->type = OP_PEER_DESTROY;
461     fopc->operation_id = GNUNET_ntohll (msg->operation_id);
462     fopc->opc =
463         GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote.
464                                                slave->controller,
465                                                fopc->operation_id, &msg->header,
466                                                &peer_destroy_success_cb, fopc);
467     fopc->timeout_task =
468         GNUNET_SCHEDULER_add_delayed (GST_timeout, &GST_forwarded_operation_timeout,
469                                       fopc);
470     GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc);
471     GNUNET_SERVER_receive_done (client, GNUNET_OK);
472     return;
473   }
474   peer->destroy_flag = GNUNET_YES;
475   if (0 == peer->reference_cnt)
476     GST_destroy_peer (peer);
477   else
478     LOG (GNUNET_ERROR_TYPE_DEBUG,
479          "Delaying peer destroy as peer is currently in use\n");
480   GST_send_operation_success_msg (client, GNUNET_ntohll (msg->operation_id));
481   GNUNET_SERVER_receive_done (client, GNUNET_OK);
482 }
483
484
485 /**
486  * Stats a peer
487  *
488  * @param peer the peer to start
489  * @return GNUNET_OK upon success; GNUNET_SYSERR upon failure
490  */
491 static int
492 start_peer (struct Peer *peer)
493 {
494   GNUNET_assert (GNUNET_NO == peer->is_remote);
495   if (GNUNET_OK != GNUNET_TESTING_peer_start (peer->details.local.peer))
496     return GNUNET_SYSERR;
497   peer->details.local.is_running = GNUNET_YES;
498   return GNUNET_OK;
499 }
500
501
502 /**
503  * Stops a peer
504  *
505  * @param peer the peer to stop
506  * @return GNUNET_OK upon success; GNUNET_SYSERR upon failure
507  */
508 static int
509 stop_peer (struct Peer *peer)
510 {
511   GNUNET_assert (GNUNET_NO == peer->is_remote);
512   if (GNUNET_OK != GNUNET_TESTING_peer_kill (peer->details.local.peer))
513     return GNUNET_SYSERR;
514   peer->details.local.is_running = GNUNET_NO;
515   return GNUNET_OK;
516 }
517
518
519 /**
520  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_DESTROYPEER messages
521  *
522  * @param cls NULL
523  * @param client identification of the client
524  * @param message the actual message
525  */
526 void
527 GST_handle_peer_start (void *cls, struct GNUNET_SERVER_Client *client,
528                        const struct GNUNET_MessageHeader *message)
529 {
530   const struct GNUNET_TESTBED_PeerStartMessage *msg;
531   struct GNUNET_TESTBED_PeerEventMessage *reply;
532   struct ForwardedOperationContext *fopc;
533   struct Peer *peer;
534   uint32_t peer_id;
535
536   msg = (const struct GNUNET_TESTBED_PeerStartMessage *) message;
537   peer_id = ntohl (msg->peer_id);
538   if (!VALID_PEER_ID (peer_id))
539   {
540     GNUNET_break (0);
541     LOG (GNUNET_ERROR_TYPE_ERROR,
542          "Asked to start a non existent peer with id: %u\n", peer_id);
543     GNUNET_SERVER_receive_done (client, GNUNET_OK);
544     return;
545   }
546   peer = GST_peer_list[peer_id];
547   if (GNUNET_YES == peer->is_remote)
548   {
549     fopc = GNUNET_malloc (sizeof (struct ForwardedOperationContext));
550     GNUNET_SERVER_client_keep (client);
551     fopc->client = client;
552     fopc->operation_id = GNUNET_ntohll (msg->operation_id);
553     fopc->type = OP_PEER_START;
554     fopc->opc =
555         GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote.
556                                                slave->controller,
557                                                fopc->operation_id, &msg->header,
558                                                &GST_forwarded_operation_reply_relay,
559                                                fopc);
560     fopc->timeout_task =
561         GNUNET_SCHEDULER_add_delayed (GST_timeout, &GST_forwarded_operation_timeout,
562                                       fopc);
563     GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc);
564     GNUNET_SERVER_receive_done (client, GNUNET_OK);
565     return;
566   }
567   if (GNUNET_OK != start_peer (peer))
568   {
569     GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id),
570                                  "Failed to start");
571     GNUNET_SERVER_receive_done (client, GNUNET_OK);
572     return;
573   }
574   reply = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_PeerEventMessage));
575   reply->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT);
576   reply->header.size = htons (sizeof (struct GNUNET_TESTBED_PeerEventMessage));
577   reply->event_type = htonl (GNUNET_TESTBED_ET_PEER_START);
578   reply->host_id = htonl (GST_context->host_id);
579   reply->peer_id = msg->peer_id;
580   reply->operation_id = msg->operation_id;
581   GST_queue_message (client, &reply->header);
582   GNUNET_SERVER_receive_done (client, GNUNET_OK);
583 }
584
585
586 /**
587  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_DESTROYPEER messages
588  *
589  * @param cls NULL
590  * @param client identification of the client
591  * @param message the actual message
592  */
593 void
594 GST_handle_peer_stop (void *cls, struct GNUNET_SERVER_Client *client,
595                       const struct GNUNET_MessageHeader *message)
596 {
597   const struct GNUNET_TESTBED_PeerStopMessage *msg;
598   struct GNUNET_TESTBED_PeerEventMessage *reply;
599   struct ForwardedOperationContext *fopc;
600   struct Peer *peer;
601   uint32_t peer_id;
602
603   msg = (const struct GNUNET_TESTBED_PeerStopMessage *) message;
604   peer_id = ntohl (msg->peer_id);
605   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PEER_STOP for peer %u\n", peer_id);
606   if (!VALID_PEER_ID (peer_id))
607   {
608     GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id),
609                                  "Peer not found");
610     GNUNET_SERVER_receive_done (client, GNUNET_OK);
611     return;
612   }
613   peer = GST_peer_list[peer_id];
614   if (GNUNET_YES == peer->is_remote)
615   {
616     LOG (GNUNET_ERROR_TYPE_DEBUG, "Forwarding PEER_STOP for peer %u\n",
617          peer_id);
618     fopc = GNUNET_malloc (sizeof (struct ForwardedOperationContext));
619     GNUNET_SERVER_client_keep (client);
620     fopc->client = client;
621     fopc->operation_id = GNUNET_ntohll (msg->operation_id);
622     fopc->type = OP_PEER_STOP;
623     fopc->opc =
624         GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote.
625                                                slave->controller,
626                                                fopc->operation_id, &msg->header,
627                                                &GST_forwarded_operation_reply_relay,
628                                                fopc);
629     fopc->timeout_task =
630         GNUNET_SCHEDULER_add_delayed (GST_timeout, &GST_forwarded_operation_timeout,
631                                       fopc);
632     GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc);
633     GNUNET_SERVER_receive_done (client, GNUNET_OK);
634     return;
635   }
636   if (GNUNET_OK != stop_peer (peer))
637   {
638     LOG (GNUNET_ERROR_TYPE_WARNING, "Stopping peer %u failed\n", peer_id);
639     GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id),
640                                  "Peer not running");
641     GNUNET_SERVER_receive_done (client, GNUNET_OK);
642     return;
643   }
644   LOG (GNUNET_ERROR_TYPE_DEBUG, "Peer %u successfully stopped\n", peer_id);
645   reply = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_PeerEventMessage));
646   reply->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT);
647   reply->header.size = htons (sizeof (struct GNUNET_TESTBED_PeerEventMessage));
648   reply->event_type = htonl (GNUNET_TESTBED_ET_PEER_STOP);
649   reply->host_id = htonl (GST_context->host_id);
650   reply->peer_id = msg->peer_id;
651   reply->operation_id = msg->operation_id;
652   GST_queue_message (client, &reply->header);
653   GNUNET_SERVER_receive_done (client, GNUNET_OK);
654   GNUNET_TESTING_peer_wait (peer->details.local.peer);
655 }
656
657
658 /**
659  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_GETPEERCONFIG messages
660  *
661  * @param cls NULL
662  * @param client identification of the client
663  * @param message the actual message
664  */
665 void
666 GST_handle_peer_get_config (void *cls, struct GNUNET_SERVER_Client *client,
667                             const struct GNUNET_MessageHeader *message)
668 {
669   const struct GNUNET_TESTBED_PeerGetConfigurationMessage *msg;
670   struct GNUNET_TESTBED_PeerConfigurationInformationMessage *reply;
671   struct ForwardedOperationContext *fopc;
672   struct Peer *peer;
673   char *config;
674   char *xconfig;
675   size_t c_size;
676   size_t xc_size;
677   uint32_t peer_id;
678   uint16_t msize;
679
680   msg = (const struct GNUNET_TESTBED_PeerGetConfigurationMessage *) message;
681   peer_id = ntohl (msg->peer_id);
682   LOG_DEBUG ("Received GET_CONFIG for peer %u\n", peer_id);
683   if (!VALID_PEER_ID (peer_id))
684   {
685     GST_send_operation_fail_msg (client, GNUNET_ntohll (msg->operation_id),
686                                  "Peer not found");
687     GNUNET_SERVER_receive_done (client, GNUNET_OK);
688     return;
689   }
690   peer = GST_peer_list[peer_id];
691   if (GNUNET_YES == peer->is_remote)
692   {
693     LOG_DEBUG ("Forwarding PEER_GET_CONFIG for peer: %u\n", peer_id);
694     fopc = GNUNET_malloc (sizeof (struct ForwardedOperationContext));
695     GNUNET_SERVER_client_keep (client);
696     fopc->client = client;
697     fopc->operation_id = GNUNET_ntohll (msg->operation_id);
698     fopc->type = OP_PEER_INFO;
699     fopc->opc =
700         GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote.
701                                                slave->controller,
702                                                fopc->operation_id, &msg->header,
703                                                &GST_forwarded_operation_reply_relay,
704                                                fopc);
705     fopc->timeout_task =
706         GNUNET_SCHEDULER_add_delayed (GST_timeout, &GST_forwarded_operation_timeout,
707                                       fopc);
708     GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc);
709     GNUNET_SERVER_receive_done (client, GNUNET_OK);
710     return;
711   }
712   LOG_DEBUG ("Received PEER_GET_CONFIG for peer: %u\n", peer_id);
713   config =
714       GNUNET_CONFIGURATION_serialize (GST_peer_list[peer_id]->details.local.cfg,
715                                       &c_size);
716   xc_size = GNUNET_TESTBED_compress_config_ (config, c_size, &xconfig);
717   GNUNET_free (config);
718   msize =
719       xc_size +
720       sizeof (struct GNUNET_TESTBED_PeerConfigurationInformationMessage);
721   reply = GNUNET_realloc (xconfig, msize);
722   (void) memmove (&reply[1], reply, xc_size);
723   reply->header.size = htons (msize);
724   reply->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONFIGURATION);
725   reply->peer_id = msg->peer_id;
726   reply->operation_id = msg->operation_id;
727   GNUNET_TESTING_peer_get_identity (GST_peer_list[peer_id]->details.local.peer,
728                                     &reply->peer_identity);
729   reply->config_size = htons ((uint16_t) c_size);
730   GST_queue_message (client, &reply->header);
731   GNUNET_SERVER_receive_done (client, GNUNET_OK);
732 }
733
734
735 /**
736  * Cleans up the given PeerReconfigureContext
737  *
738  * @param prc the PeerReconfigureContext
739  */
740 static void
741 cleanup_prc (struct PeerReconfigureContext *prc)
742 {
743   struct Peer *peer;
744
745   if (VALID_PEER_ID (prc->peer_id))
746   {
747     peer = GST_peer_list [prc->peer_id];
748     if (1 != prc->stopped)
749     {
750       GNUNET_TESTING_peer_stop_async_cancel (peer->details.local.peer);
751       stop_peer (peer);         /* Stop the peer synchronously */
752     }
753   }
754   if (NULL != prc->cfg)
755     GNUNET_CONFIGURATION_destroy (prc->cfg);
756   GNUNET_SERVER_client_drop (prc->client);
757   GNUNET_CONTAINER_DLL_remove (prc_head, prc_tail, prc);
758   GNUNET_free (prc);
759 }
760
761
762 /**
763  * Cleans up the Peer reconfigure context list
764  */
765 void
766 GST_free_prcq ()
767 {
768   while (NULL != prc_head)
769     cleanup_prc (prc_head);
770 }
771
772
773 /**
774  * Callback to inform whether the peer is running or stopped.
775  *
776  * @param cls the closure given to GNUNET_TESTING_peer_stop_async()
777  * @param peer the respective peer whose status is being reported
778  * @param success GNUNET_YES if the peer is stopped; GNUNET_SYSERR upon any
779  *          error
780  */
781 static void
782 prc_stop_cb (void *cls, struct GNUNET_TESTING_Peer *p, int success)
783 {
784   struct PeerReconfigureContext *prc = cls;
785   struct Peer *peer;
786   char *emsg;
787   
788   GNUNET_assert (VALID_PEER_ID (prc->peer_id));
789   peer = GST_peer_list [prc->peer_id];
790   GNUNET_assert (GNUNET_NO == peer->is_remote);
791   GNUNET_TESTING_peer_destroy (peer->details.local.peer);
792   GNUNET_CONFIGURATION_destroy (peer->details.local.cfg);
793   peer->details.local.cfg = prc->cfg;
794   prc->cfg = NULL;
795   prc->stopped = 1;
796   emsg = NULL;
797   peer->details.local.peer
798       = GNUNET_TESTING_peer_configure (GST_context->system,
799                                        peer->details.local.cfg, peer->id,
800                                        NULL /* Peer id */ ,
801                                        &emsg);
802   if (NULL == peer->details.local.peer)
803   {
804     GST_send_operation_fail_msg (prc->client, prc->op_id, emsg);
805     goto cleanup;
806   }
807   if (GNUNET_OK != start_peer (peer))
808   {
809     
810     GST_send_operation_fail_msg (prc->client, prc->op_id,
811                                  "Failed to start reconfigured peer");
812     goto cleanup;
813   }
814   GST_send_operation_success_msg (prc->client, prc->op_id);
815   
816  cleanup:
817   cleanup_prc (prc);
818   return;
819 }
820
821
822 /**
823  * Handler for GNUNET_MESSAGE_TYPDE_TESTBED_RECONFIGURE_PEER type messages.
824  * Should stop the peer asyncronously, destroy it and create it again with the
825  * new configuration.
826  *
827  * @param cls NULL
828  * @param client identification of the client
829  * @param message the actual message
830  */
831 void
832 GST_handle_peer_reconfigure (void *cls, struct GNUNET_SERVER_Client *client,
833                              const struct GNUNET_MessageHeader *message)
834 {
835   const struct GNUNET_TESTBED_PeerReconfigureMessage *msg;
836   struct Peer *peer;
837   struct GNUNET_CONFIGURATION_Handle *cfg;
838   struct ForwardedOperationContext *fopc;
839   struct PeerReconfigureContext *prc;
840   uint64_t op_id;
841   uint32_t peer_id;
842   uint16_t msize;
843   
844   msize = ntohs (message->size);
845   if (msize <= sizeof (struct GNUNET_TESTBED_PeerReconfigureMessage))
846   {
847     GNUNET_break_op (0);
848     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
849     return;
850   }
851   msg = (const struct GNUNET_TESTBED_PeerReconfigureMessage *) message;
852   peer_id = ntohl (msg->peer_id);
853   op_id = GNUNET_ntohll (msg->operation_id);
854   if (!VALID_PEER_ID (peer_id))
855   {
856     GNUNET_break (0);
857     GST_send_operation_fail_msg (client, op_id, "Peer not found");
858     GNUNET_SERVER_receive_done (client, GNUNET_OK);
859     return;
860   }
861   peer = GST_peer_list[peer_id];
862   if (GNUNET_YES == peer->is_remote)
863   {
864     LOG_DEBUG ("Forwarding PEER_RECONFIGURE for peer: %u\n", peer_id);
865     fopc = GNUNET_malloc (sizeof (struct ForwardedOperationContext));
866     GNUNET_SERVER_client_keep (client);
867     fopc->client = client;
868     fopc->operation_id = op_id;
869     fopc->type = OP_PEER_RECONFIGURE;
870     fopc->opc =
871         GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote.
872                                                slave->controller,
873                                                fopc->operation_id, &msg->header,
874                                                &GST_forwarded_operation_reply_relay,
875                                                fopc);
876     fopc->timeout_task =
877         GNUNET_SCHEDULER_add_delayed (GST_timeout, &GST_forwarded_operation_timeout,
878                                       fopc);
879     GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc);
880     GNUNET_SERVER_receive_done (client, GNUNET_OK);
881     return;
882   }
883   LOG_DEBUG ("Received PEER_RECONFIGURE for peer %u\n", peer_id);
884   if (0 < peer->reference_cnt)
885   {
886     GNUNET_break (0);
887     GST_send_operation_fail_msg (client, op_id, "Peer in use");
888     GNUNET_SERVER_receive_done (client, GNUNET_OK);
889     return;
890   }
891   if (GNUNET_YES == peer->destroy_flag)
892   {
893     GNUNET_break (0);
894     GST_send_operation_fail_msg (client, op_id, "Peer is being destroyed");
895     GNUNET_SERVER_receive_done (client, GNUNET_OK);
896     return;
897   }
898   cfg = GNUNET_TESTBED_extract_config_ (message);
899   if (NULL == cfg)
900   {
901     GNUNET_break (0);    
902     GST_send_operation_fail_msg (client, op_id, "Compression error");
903     GNUNET_SERVER_receive_done (client, GNUNET_OK);
904     return;
905   }
906   prc = GNUNET_malloc (sizeof (struct PeerReconfigureContext));
907   prc->cfg = cfg;
908   prc->peer_id = peer_id;
909   prc->op_id = op_id;
910   prc->client = client;  
911   GNUNET_SERVER_client_keep (client);
912   GNUNET_CONTAINER_DLL_insert_tail (prc_head, prc_tail, prc);
913   GNUNET_TESTING_peer_stop_async (peer->details.local.peer, prc_stop_cb, prc);
914   GNUNET_SERVER_receive_done (client, GNUNET_OK);
915 }
916
917
918 /**
919  * Cleanup the context information created for managing a peer's service
920  *
921  * @param mctx the ManageServiceContext
922  */
923 static void
924 cleanup_mctx (struct ManageServiceContext *mctx)
925 {
926   mctx->expired = GNUNET_YES;
927   GNUNET_CONTAINER_DLL_remove (mctx_head, mctx_tail, mctx);
928   GNUNET_SERVER_client_drop (mctx->client);
929   GNUNET_ARM_disconnect_and_free (mctx->ah);
930   GNUNET_assert (0 < mctx->peer->reference_cnt);
931   mctx->peer->reference_cnt--;
932   if ( (GNUNET_YES == mctx->peer->destroy_flag)
933        && (0 == mctx->peer->reference_cnt) )
934     GST_destroy_peer (mctx->peer);
935   GNUNET_free (mctx);
936 }
937
938
939 /**
940  * Frees the ManageServiceContext queue
941  */
942 void
943 GST_free_mctxq ()
944 {
945   while (NULL != mctx_head)
946     cleanup_mctx (mctx_head);
947 }
948
949
950 /**
951  * Returns a string interpretation of 'rs'
952  *
953  * @param rs the request status from ARM
954  * @return a string interpretation of the request status
955  */
956 static const char *
957 arm_req_string (enum GNUNET_ARM_RequestStatus rs)
958 {
959   switch (rs)
960   {
961   case GNUNET_ARM_REQUEST_SENT_OK:
962     return _("Message was sent successfully");
963   case GNUNET_ARM_REQUEST_CONFIGURATION_ERROR:
964     return _("Misconfiguration (can't connect to the ARM service)");
965   case GNUNET_ARM_REQUEST_DISCONNECTED:
966     return _("We disconnected from ARM before we could send a request");
967   case GNUNET_ARM_REQUEST_BUSY:
968     return _("ARM API is busy");
969   case GNUNET_ARM_REQUEST_TOO_LONG:
970     return _("Request doesn't fit into a message");
971   case GNUNET_ARM_REQUEST_TIMEOUT:
972     return _("Request timed out");
973   }
974   return _("Unknown request status");
975 }
976
977
978 /**
979  * Returns a string interpretation of the 'result'
980  *
981  * @param result the arm result
982  * @return a string interpretation
983  */
984 static const char *
985 arm_ret_string (enum GNUNET_ARM_Result result)
986 {
987   switch (result)
988   {
989   case GNUNET_ARM_RESULT_STOPPED:
990     return _("%s is stopped");
991   case GNUNET_ARM_RESULT_STARTING:
992     return _("%s is starting");
993   case GNUNET_ARM_RESULT_STOPPING:
994     return _("%s is stopping");
995   case GNUNET_ARM_RESULT_IS_STARTING_ALREADY:
996     return _("%s is starting already");
997   case GNUNET_ARM_RESULT_IS_STOPPING_ALREADY:
998     return _("%s is stopping already");
999   case GNUNET_ARM_RESULT_IS_STARTED_ALREADY:
1000     return _("%s is started already");
1001   case GNUNET_ARM_RESULT_IS_STOPPED_ALREADY:
1002     return _("%s is stopped already");
1003   case GNUNET_ARM_RESULT_IS_NOT_KNOWN:
1004     return _("%s service is not known to ARM");
1005   case GNUNET_ARM_RESULT_START_FAILED:
1006     return _("%s service failed to start");
1007   case GNUNET_ARM_RESULT_IN_SHUTDOWN:
1008     return _("%s service can't be started because ARM is shutting down");
1009   }
1010   return _("%.s Unknown result code.");
1011 }
1012
1013
1014 /**
1015  * Function called in response to a start/stop request.
1016  * Will be called when request was not sent successfully,
1017  * or when a reply comes. If the request was not sent successfully,
1018  * 'rs' will indicate that, and 'service' and 'result' will be undefined.
1019  *
1020  * @param cls ManageServiceContext
1021  * @param rs status of the request
1022  * @param service service name
1023  * @param result result of the operation
1024  */
1025 static void
1026 service_manage_result_cb (void *cls, 
1027                           enum GNUNET_ARM_RequestStatus rs, 
1028                           const char *service, enum GNUNET_ARM_Result result)
1029 {
1030   struct ManageServiceContext *mctx = cls;
1031   char *emsg;
1032
1033   emsg = NULL;
1034   if (GNUNET_YES == mctx->expired)
1035     return;
1036   if (GNUNET_ARM_REQUEST_SENT_OK != rs)
1037   {
1038     GNUNET_asprintf (&emsg, "Error communicating with Peer %u's ARM: %s",
1039                      mctx->peer->id, arm_req_string (rs));
1040     goto ret;
1041   }
1042   if (1 == mctx->start)
1043     goto service_start_check;
1044   if (! ((GNUNET_ARM_RESULT_STOPPED == result)
1045             || (GNUNET_ARM_RESULT_STOPPING == result)
1046             || (GNUNET_ARM_RESULT_IS_STOPPING_ALREADY == result)
1047             || (GNUNET_ARM_RESULT_IS_STOPPED_ALREADY == result)) )
1048   {
1049     /* stopping a service failed */
1050     GNUNET_asprintf (&emsg, arm_ret_string (result), service);
1051     goto ret;
1052   }
1053   /* service stopped successfully */
1054   goto ret;
1055
1056  service_start_check:
1057   if (! ((GNUNET_ARM_RESULT_STARTING == result)
1058             || (GNUNET_ARM_RESULT_IS_STARTING_ALREADY == result)
1059             || (GNUNET_ARM_RESULT_IS_STARTED_ALREADY == result)) )
1060   {
1061     /* starting a service failed */
1062     GNUNET_asprintf (&emsg, arm_ret_string (result), service);
1063     goto ret;
1064   }
1065   /* service started successfully */
1066   
1067  ret:
1068   if (NULL != emsg)
1069   {
1070     LOG_DEBUG ("%s\n", emsg);
1071     GST_send_operation_fail_msg (mctx->client, mctx->op_id, emsg);
1072   }
1073   else
1074     GST_send_operation_success_msg (mctx->client, mctx->op_id);
1075   GNUNET_free_non_null (emsg);
1076   cleanup_mctx (mctx);
1077 }
1078
1079
1080 /**
1081  * Handler for GNUNET_TESTBED_ManagePeerServiceMessage message
1082  *
1083  * @param cls NULL
1084  * @param client identification of client
1085  * @param message the actual message
1086  */
1087 void
1088 GST_handle_manage_peer_service (void *cls, struct GNUNET_SERVER_Client *client,
1089                                 const struct GNUNET_MessageHeader *message)
1090 {
1091   const struct GNUNET_TESTBED_ManagePeerServiceMessage *msg;
1092   const char* service;
1093   struct Peer *peer;
1094   char *emsg;
1095   struct GNUNET_ARM_Handle *ah;
1096   struct ManageServiceContext *mctx;
1097   struct ForwardedOperationContext *fopc;
1098   uint64_t op_id;
1099   uint32_t peer_id;
1100   uint16_t msize;
1101   
1102
1103   msize = ntohs (message->size);
1104   if (msize <= sizeof (struct GNUNET_TESTBED_ManagePeerServiceMessage))
1105   {
1106     GNUNET_break_op (0);  
1107     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1108     return;
1109   }
1110   msg = (const struct GNUNET_TESTBED_ManagePeerServiceMessage *) message;
1111   service = (const char *) &msg[1];  
1112   if ('\0' != service[msize - sizeof
1113                       (struct GNUNET_TESTBED_ManagePeerServiceMessage) - 1])
1114   {
1115     GNUNET_break_op (0);
1116     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1117     return;
1118   }
1119   if (1 < msg->start)
1120   {
1121     GNUNET_break_op (0);
1122     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1123     return;    
1124   }
1125   peer_id = ntohl (msg->peer_id);
1126   op_id = GNUNET_ntohll (msg->operation_id);
1127   LOG_DEBUG ("Received request to manage service %s on peer %u\n",
1128              service, (unsigned int) peer_id);
1129   if ((GST_peer_list_size <= peer_id)
1130       || (NULL == (peer = GST_peer_list[peer_id])))
1131   {
1132     GNUNET_asprintf (&emsg, "Asked to manage service of a non existent peer "
1133                      "with id: %u", peer_id);
1134     goto err_ret;
1135   }
1136   if (0 == strcasecmp ("arm", service))
1137   {
1138     emsg = GNUNET_strdup ("Cannot start/stop peer's ARM service.  "
1139                           "Use peer start/stop for that");
1140     goto err_ret;
1141   }
1142   if (GNUNET_YES == peer->is_remote)
1143   {
1144     /* Forward the destory message to sub controller */
1145     fopc = GNUNET_malloc (sizeof (struct ForwardedOperationContext));
1146     GNUNET_SERVER_client_keep (client);
1147     fopc->client = client;
1148     fopc->cls = peer;
1149     fopc->type = OP_MANAGE_SERVICE;
1150     fopc->operation_id = op_id;
1151     fopc->opc =
1152         GNUNET_TESTBED_forward_operation_msg_ (peer->details.remote.
1153                                                slave->controller,
1154                                                fopc->operation_id, &msg->header,
1155                                                &GST_forwarded_operation_reply_relay,
1156                                                fopc);
1157     fopc->timeout_task =
1158         GNUNET_SCHEDULER_add_delayed (GST_timeout, &GST_forwarded_operation_timeout,
1159                                       fopc);
1160     GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fopc);
1161     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1162     return;
1163   }
1164   if ((0 != peer->reference_cnt)
1165       && ( (0 == strcasecmp ("core", service))
1166            || (0 == strcasecmp ("transport", service)) )  )
1167   {
1168     GNUNET_asprintf (&emsg, "Cannot stop %s service of peer with id: %u "
1169                      "since it is required by existing operations",
1170                      service, peer_id);
1171     goto err_ret;
1172   }
1173   ah = GNUNET_ARM_connect (peer->details.local.cfg, NULL, NULL);
1174   if (NULL == ah)
1175   {
1176     GNUNET_asprintf (&emsg,
1177                      "Cannot connect to ARM service of peer with id: %u",
1178                      peer_id);
1179     goto err_ret;
1180   }
1181   mctx = GNUNET_malloc (sizeof (struct ManageServiceContext));
1182   mctx->peer = peer;
1183   peer->reference_cnt++;
1184   mctx->op_id = op_id;
1185   mctx->ah = ah;
1186   GNUNET_SERVER_client_keep (client);
1187   mctx->client = client;
1188   mctx->start = msg->start;
1189   GNUNET_CONTAINER_DLL_insert_tail (mctx_head, mctx_tail, mctx);
1190   if (1 == mctx->start)
1191     GNUNET_ARM_request_service_start (mctx->ah, service,
1192                                       GNUNET_OS_INHERIT_STD_ERR,
1193                                       GST_timeout,
1194                                       service_manage_result_cb,
1195                                       mctx);
1196   else
1197     GNUNET_ARM_request_service_stop (mctx->ah, service,
1198                                      GST_timeout,
1199                                      service_manage_result_cb,
1200                                      mctx);
1201   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1202   return;
1203
1204  err_ret:
1205   LOG (GNUNET_ERROR_TYPE_ERROR, "%s\n", emsg);
1206   GST_send_operation_fail_msg (client, op_id, emsg);
1207   GNUNET_free (emsg);
1208   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1209 }
1210
1211
1212 /**
1213  * Stops and destroys all peers
1214  */
1215 void
1216 GST_destroy_peers ()
1217 {
1218   struct Peer *peer;
1219   unsigned int id;
1220
1221   if (NULL == GST_peer_list)
1222     return;
1223   for (id = 0; id < GST_peer_list_size; id++)
1224   {
1225     peer = GST_peer_list[id];
1226     if (NULL == peer)
1227       continue;
1228     /* If destroy flag is set it means that this peer should have been
1229      * destroyed by a context which we destroy before */
1230     GNUNET_break (GNUNET_NO == peer->destroy_flag);
1231     /* counter should be zero as we free all contexts before */
1232     GNUNET_break (0 == peer->reference_cnt);
1233     if ((GNUNET_NO == peer->is_remote) &&
1234         (GNUNET_YES == peer->details.local.is_running))
1235       GNUNET_TESTING_peer_kill (peer->details.local.peer);
1236   }
1237   for (id = 0; id < GST_peer_list_size; id++)
1238   {
1239     peer = GST_peer_list[id];
1240     if (NULL == peer)
1241       continue;    
1242     if (GNUNET_NO == peer->is_remote)
1243     {
1244       if (GNUNET_YES == peer->details.local.is_running)
1245         GNUNET_TESTING_peer_wait (peer->details.local.peer);
1246       GNUNET_TESTING_peer_destroy (peer->details.local.peer);
1247       GNUNET_CONFIGURATION_destroy (peer->details.local.cfg);
1248     }
1249     GNUNET_free (peer);
1250   }
1251   GNUNET_free_non_null (GST_peer_list);
1252   GST_peer_list = NULL;
1253   GST_peer_list_size = 0;
1254 }
1255
1256
1257 /**
1258  * Task run upon timeout of forwarded SHUTDOWN_PEERS operation
1259  *
1260  * @param cls the ForwardedOperationContext
1261  * @param tc the scheduler task context
1262  */
1263 static void
1264 shutdown_peers_timeout_cb (void *cls,
1265                            const struct GNUNET_SCHEDULER_TaskContext *tc)
1266 {
1267   struct ForwardedOperationContext *fo_ctxt = cls;
1268   struct HandlerContext_ShutdownPeers *hc;
1269
1270   fo_ctxt->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1271   hc = fo_ctxt->cls;
1272   hc->timeout = GNUNET_YES;
1273   GNUNET_assert (0 < hc->nslaves);
1274   hc->nslaves--;
1275   if (0 == hc->nslaves)
1276     GST_send_operation_fail_msg (fo_ctxt->client, fo_ctxt->operation_id,
1277                                  "Timeout at a slave controller");
1278   GNUNET_TESTBED_forward_operation_msg_cancel_ (fo_ctxt->opc);  
1279   GNUNET_SERVER_client_drop (fo_ctxt->client);
1280   GNUNET_CONTAINER_DLL_remove (fopcq_head, fopcq_tail, fo_ctxt);
1281   GNUNET_free (fo_ctxt);
1282 }
1283
1284
1285 /**
1286  * The reply msg handler forwarded SHUTDOWN_PEERS operation.  Checks if a
1287  * success reply is received from all clients and then sends the success message
1288  * to the client
1289  *
1290  * @param cls ForwardedOperationContext
1291  * @param msg the message to relay
1292  */
1293 static void
1294 shutdown_peers_reply_cb (void *cls,
1295                          const struct GNUNET_MessageHeader *msg)
1296 {
1297   struct ForwardedOperationContext *fo_ctxt = cls;
1298   struct HandlerContext_ShutdownPeers *hc;
1299   
1300   hc = fo_ctxt->cls;
1301   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != fo_ctxt->timeout_task);
1302   GNUNET_SCHEDULER_cancel (fo_ctxt->timeout_task);
1303   fo_ctxt->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1304   GNUNET_assert (0 < hc->nslaves);
1305   hc->nslaves--;
1306   if (GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS != 
1307       ntohs (msg->type))
1308     hc->timeout = GNUNET_YES;
1309   if (0 == hc->nslaves)
1310   {
1311     if (GNUNET_YES == hc->timeout)
1312       GST_send_operation_fail_msg (fo_ctxt->client, fo_ctxt->operation_id,
1313                                    "Timeout at a slave controller");
1314     else
1315       GST_send_operation_success_msg (fo_ctxt->client, fo_ctxt->operation_id);
1316   }
1317   GNUNET_SERVER_client_drop (fo_ctxt->client);
1318   GNUNET_CONTAINER_DLL_remove (fopcq_head, fopcq_tail, fo_ctxt);
1319   GNUNET_free (fo_ctxt);
1320 }
1321
1322
1323 /**
1324  * Handler for GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS messages
1325  *
1326  * @param cls NULL
1327  * @param client identification of the client
1328  * @param message the actual message
1329  */
1330 void
1331 GST_handle_shutdown_peers (void *cls, struct GNUNET_SERVER_Client *client,
1332                            const struct GNUNET_MessageHeader *message)
1333 {
1334   const struct GNUNET_TESTBED_ShutdownPeersMessage *msg;
1335   struct HandlerContext_ShutdownPeers *hc;
1336   struct Slave *slave;
1337   struct ForwardedOperationContext *fo_ctxt;
1338   uint64_t op_id;
1339   unsigned int cnt;
1340
1341   msg = (const struct GNUNET_TESTBED_ShutdownPeersMessage *) message;
1342   LOG_DEBUG ("Received SHUTDOWN_PEERS\n");
1343     /* Stop and destroy all peers */
1344   GST_free_mctxq ();
1345   GST_free_occq ();
1346   GST_free_roccq ();
1347   GST_clear_fopcq ();
1348   /* Forward to all slaves which we have started */
1349   op_id = GNUNET_ntohll (msg->operation_id);
1350   hc = GNUNET_malloc (sizeof (struct HandlerContext_ShutdownPeers));
1351   /* FIXME: have a better implementation where we track which slaves are
1352      started by this controller */
1353   for (cnt = 0; cnt < GST_slave_list_size; cnt++)
1354   {
1355     slave = GST_slave_list[cnt];
1356     if (NULL == slave)
1357       continue;
1358     if (NULL == slave->controller_proc) /* We didn't start the slave */
1359       continue;
1360     LOG_DEBUG ("Forwarding SHUTDOWN_PEERS\n");
1361     hc->nslaves++;
1362     fo_ctxt = GNUNET_malloc (sizeof (struct ForwardedOperationContext));
1363     GNUNET_SERVER_client_keep (client);
1364     fo_ctxt->client = client;
1365     fo_ctxt->operation_id = op_id;
1366     fo_ctxt->cls = hc;
1367     fo_ctxt->type = OP_SHUTDOWN_PEERS;
1368     fo_ctxt->opc =
1369         GNUNET_TESTBED_forward_operation_msg_ (slave->controller,
1370                                                fo_ctxt->operation_id,
1371                                                &msg->header,
1372                                                shutdown_peers_reply_cb,
1373                                                fo_ctxt);
1374     fo_ctxt->timeout_task =
1375         GNUNET_SCHEDULER_add_delayed (GST_timeout, &shutdown_peers_timeout_cb,
1376                                       fo_ctxt);
1377     GNUNET_CONTAINER_DLL_insert_tail (fopcq_head, fopcq_tail, fo_ctxt);
1378   }
1379   LOG_DEBUG ("Shutting down peers\n");
1380   GST_destroy_peers ();
1381   if (0 == hc->nslaves)
1382   {
1383     GST_send_operation_success_msg (client, op_id);
1384     GNUNET_free (hc);
1385   }
1386   GNUNET_SERVER_receive_done (client, GNUNET_OK);  
1387 }