changes to scheduler
[oweals/gnunet.git] / src / experimentation / gnunet-daemon-experimentation_nodes.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file experimentation/gnunet-daemon-experimentation_nodes.c
23  * @brief experimentation daemon: node management
24  * @author Christian Grothoff
25  * @author Matthias Wachs
26  */
27 #include "platform.h"
28 #include "gnunet_getopt_lib.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet-daemon-experimentation.h"
33
34
35 #define FAST_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
36 /**
37  * Core handle
38  */
39 static struct GNUNET_CORE_Handle *ch;
40
41
42 /**
43  * Peer's own identity
44  */
45 static struct GNUNET_PeerIdentity me;
46
47
48 /**
49  * Nodes with a pending request
50  */
51 struct GNUNET_CONTAINER_MultiHashMap *nodes_requested;
52
53
54 /**
55  * Active experimentation nodes
56  */
57 struct GNUNET_CONTAINER_MultiHashMap *nodes_active;
58
59
60 /**
61  * Inactive experimentation nodes
62  * To be excluded from future requests
63  */
64 struct GNUNET_CONTAINER_MultiHashMap *nodes_inactive;
65
66
67 /**
68  * Update statistics
69  *
70  * @param m hashmap to update values from
71  */
72 static void update_stats (struct GNUNET_CONTAINER_MultiHashMap *m)
73 {
74         GNUNET_assert (NULL != m);
75         GNUNET_assert (NULL != GED_stats);
76
77         if (m == nodes_active)
78         {
79                         GNUNET_STATISTICS_set (GED_stats, "# nodes active",
80                                         GNUNET_CONTAINER_multihashmap_size(m), GNUNET_NO);
81         }
82         else if (m == nodes_inactive)
83         {
84                         GNUNET_STATISTICS_set (GED_stats, "# nodes inactive",
85                                         GNUNET_CONTAINER_multihashmap_size(m), GNUNET_NO);
86         }
87         else if (m == nodes_requested)
88         {
89                         GNUNET_STATISTICS_set (GED_stats, "# nodes requested",
90                                         GNUNET_CONTAINER_multihashmap_size(m), GNUNET_NO);
91         }
92         else
93                 GNUNET_break (0);
94
95 }
96
97
98 /**
99  * Clean up nodes
100  *
101  * @param cls the hashmap to clean up
102  * @param key key of the current node
103  * @param value related node object
104  * @return always GNUNET_OK
105  */
106 static int
107 cleanup_nodes (void *cls,
108                                                          const struct GNUNET_HashCode * key,
109                                                          void *value)
110 {
111         struct Node *n;
112         struct GNUNET_CONTAINER_MultiHashMap *cur = cls;
113
114         n = value;
115         if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
116         {
117                 GNUNET_SCHEDULER_cancel (n->timeout_task);
118                 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
119         }
120         if (NULL != n->cth)
121         {
122                 GNUNET_CORE_notify_transmit_ready_cancel (n->cth);
123                 n->cth = NULL;
124         }
125         GNUNET_free_non_null (n->issuer_id);
126
127         GNUNET_CONTAINER_multihashmap_remove (cur, key, value);
128         GNUNET_free (value);
129         return GNUNET_OK;
130 }
131
132
133 /**
134  * Check if id passed is my id
135  *
136  * @param id the id to check
137  * @return GNUNET_YES or GNUNET_NO
138  */
139 static int is_me (const struct GNUNET_PeerIdentity *id)
140 {
141         if (0 == memcmp (&me, id, sizeof (me)))
142                 return GNUNET_YES;
143         else
144                 return GNUNET_NO;
145 }
146
147 /**
148  * Core startup callback
149  *
150  * @param cls unused
151  * @param server core service's server handle
152  * @param my_identity my id
153  */
154 static void
155 core_startup_handler (void *cls,
156                                                                                         struct GNUNET_CORE_Handle *server,
157                       const struct GNUNET_PeerIdentity *my_identity)
158 {
159         me = *my_identity;
160 }
161
162
163 /**
164  * Remove experimentation request due to timeout
165  *
166  * @param cls the related node
167  * @param tc scheduler's task context
168  */
169 static void
170 remove_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
171 {
172         struct Node *n = cls;
173
174         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Removing request for peer %s due to timeout\n"),
175                         GNUNET_i2s (&n->id));
176
177         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_requested, &n->id.hashPubKey))
178         {
179                         GNUNET_CONTAINER_multihashmap_remove (nodes_requested, &n->id.hashPubKey, n);
180                         update_stats (nodes_requested);
181                         GNUNET_CONTAINER_multihashmap_put (nodes_inactive, &n->id.hashPubKey, n,
182                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
183                         update_stats (nodes_inactive);
184         }
185
186         n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
187         if (NULL != n->cth)
188         {
189                 GNUNET_CORE_notify_transmit_ready_cancel (n->cth);
190                 n->cth = NULL;
191         }
192 }
193
194
195 /**
196  * Core's transmit notify callback to send request
197  *
198  * @param cls the related node
199  * @param bufsize buffer size
200  * @param buf the buffer to copy to
201  * @return bytes passed
202  */
203 size_t send_experimentation_request_cb (void *cls, size_t bufsize, void *buf)
204 {
205         struct Node *n = cls;
206         struct Experimentation_Request msg;
207         size_t msg_size = sizeof (msg);
208         size_t ri_size = sizeof (struct Experimentation_Issuer) * GSE_my_issuer_count;
209         size_t total_size = msg_size + ri_size;
210
211         memset (buf, '0', bufsize);
212         n->cth = NULL;
213   if (buf == NULL)
214   {
215     /* client disconnected */
216     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected\n");
217     if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
218                 GNUNET_SCHEDULER_cancel (n->timeout_task);
219     GNUNET_SCHEDULER_add_now (&remove_request, n);
220     return 0;
221   }
222   GNUNET_assert (bufsize >= total_size);
223
224         msg.msg.size = htons (total_size);
225         msg.msg.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_REQUEST);
226         msg.capabilities = htonl (GSE_node_capabilities);
227         msg.issuer_count = htonl (GSE_my_issuer_count);
228         memcpy (buf, &msg, msg_size);
229         memcpy (&((char *) buf)[msg_size], GSE_my_issuer, ri_size);
230
231         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending request to peer %s\n"),
232                         GNUNET_i2s (&n->id));
233         return total_size;
234 }
235
236
237 /**
238  * Send request to peer to start add him to to the set of experimentation nodes
239  *
240  * @param peer the peer to send to
241  */
242 static void send_experimentation_request (const struct GNUNET_PeerIdentity *peer)
243 {
244         struct Node *n;
245         size_t size;
246         size_t c_issuers;
247
248         c_issuers = GSE_my_issuer_count;
249
250         size = sizeof (struct Experimentation_Request) +
251                                  c_issuers * sizeof (struct Experimentation_Issuer);
252         n = GNUNET_malloc (sizeof (struct Node));
253         n->id = *peer;
254         n->timeout_task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &remove_request, n);
255         n->capabilities = NONE;
256         n->cth = GNUNET_CORE_notify_transmit_ready(ch, GNUNET_NO, 0,
257                                                                 GNUNET_TIME_relative_get_forever_(),
258                                                                 peer, size, send_experimentation_request_cb, n);
259
260         GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (nodes_requested,
261                         &peer->hashPubKey, n, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
262         update_stats (nodes_requested);
263 }
264
265
266 /**
267  * Core's transmit notify callback to send response
268  *
269  * @param cls the related node
270  * @param bufsize buffer size
271  * @param buf the buffer to copy to
272  * @return bytes passed
273  */
274 size_t send_response_cb (void *cls, size_t bufsize, void *buf)
275 {
276         struct Node *n = cls;
277         struct Experimentation_Response msg;
278         size_t ri_size = GSE_my_issuer_count * sizeof (struct Experimentation_Issuer);
279         size_t msg_size = sizeof (msg);
280         size_t total_size = msg_size + ri_size;
281
282         n->cth = NULL;
283   if (buf == NULL)
284   {
285     /* client disconnected */
286     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected\n");
287     return 0;
288   }
289   GNUNET_assert (bufsize >= total_size);
290
291         msg.msg.size = htons (total_size);
292         msg.msg.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_RESPONSE);
293         msg.capabilities = htonl (GSE_node_capabilities);
294         msg.issuer_count = htonl (GSE_my_issuer_count);
295         memcpy (buf, &msg, msg_size);
296         memcpy (&((char *) buf)[msg_size], GSE_my_issuer, ri_size);
297
298         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending response to peer %s\n"),
299                         GNUNET_i2s (&n->id));
300         return total_size;
301 }
302
303
304 static void
305 get_experiments_cb (struct Node *n, struct Experiment *e)
306 {
307         static int counter = 0;
308         if (NULL == e)
309         {
310                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Added %u experiments for peer %s\n"),
311                                         counter, GNUNET_i2s (&n->id));
312                         return;
313         }
314
315         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Starting experiment `%s' with peer %s\n"),
316                         e->name,
317                         GNUNET_i2s (&n->id));
318
319         /* Tell the scheduler to add a node with an experiment */
320         GED_scheduler_add (n, e, GNUNET_YES);
321         counter ++;
322 }
323
324 struct Node *
325 get_node (const struct GNUNET_PeerIdentity *id)
326 {
327         struct Node * res;
328         struct Node * tmp;
329
330         res = NULL;
331         tmp = NULL;
332         tmp = GNUNET_CONTAINER_multihashmap_get (nodes_active, &id->hashPubKey);
333         if (res == NULL)
334                 res = tmp;
335
336         tmp = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &id->hashPubKey);
337         if (res == NULL)
338                 res = tmp;
339         else
340                 GNUNET_break (0); /* Multiple instances */
341
342         tmp = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &id->hashPubKey);
343         if (res == NULL)
344                 res = tmp;
345         else
346                 GNUNET_break (0); /* Multiple instances */
347
348         return res;
349 }
350
351
352 /**
353  * Set a specific node as active
354  *
355  * @param n the node
356  */
357 static void node_make_active (struct Node *n)
358 {
359         int c1;
360   GNUNET_CONTAINER_multihashmap_put (nodes_active,
361                         &n->id.hashPubKey, n, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
362         update_stats (nodes_active);
363         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Added peer `%s' as active node\n"),
364                         GNUNET_i2s (&n->id));
365
366         /* Request experiments for this node to start them */
367         for (c1 = 0; c1 < n->issuer_count; c1++)
368         {
369
370                 GED_experiments_get (n, &n->issuer_id[c1], &get_experiments_cb);
371         }
372 }
373
374
375 /**
376  * Handle a request and send a response
377  *
378  * @param peer the source
379  * @param message the message
380  */
381 static void handle_request (const struct GNUNET_PeerIdentity *peer,
382                                                                                                                 const struct GNUNET_MessageHeader *message)
383 {
384         struct Node *n;
385         struct Experimentation_Request *rm = (struct Experimentation_Request *) message;
386         struct Experimentation_Issuer *rmi = (struct Experimentation_Issuer *) &rm[1];
387         int c1;
388         int c2;
389         uint32_t ic;
390         uint32_t ic_accepted;
391         int make_active;
392
393         if (ntohs (message->size) < sizeof (struct Experimentation_Request))
394         {
395                 GNUNET_break (0);
396                 return;
397         }
398         ic = ntohl (rm->issuer_count);
399         if (ntohs (message->size) != sizeof (struct Experimentation_Request) + ic * sizeof (struct Experimentation_Issuer))
400         {
401                 GNUNET_break (0);
402                 return;
403         }
404
405         make_active = GNUNET_NO;
406         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_active, &peer->hashPubKey)))
407         {
408                         /* Nothing to do */
409         }
410         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &peer->hashPubKey)))
411         {
412                         GNUNET_CONTAINER_multihashmap_remove (nodes_requested, &peer->hashPubKey, n);
413                         if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
414                         {
415                                 GNUNET_SCHEDULER_cancel (n->timeout_task);
416                                 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
417                         }
418                         if (NULL != n->cth)
419                         {
420                                 GNUNET_CORE_notify_transmit_ready_cancel (n->cth);
421                                 n->cth = NULL;
422                         }
423                         update_stats (nodes_requested);
424                         make_active = GNUNET_YES;
425         }
426         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &peer->hashPubKey)))
427         {
428                         GNUNET_CONTAINER_multihashmap_remove (nodes_inactive, &peer->hashPubKey, n);
429                         update_stats (nodes_inactive);
430                         make_active = GNUNET_YES;
431         }
432         else
433         {
434                         /* Create new node */
435                         n = GNUNET_malloc (sizeof (struct Node));
436                         n->id = *peer;
437                         n->capabilities = NONE;
438                         make_active = GNUNET_YES;
439         }
440
441         /* Update node */
442         n->capabilities = ntohl (rm->capabilities);
443
444         /* Filter accepted issuer */
445         ic_accepted = 0;
446         for (c1 = 0; c1 < ic; c1++)
447         {
448                 if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
449                         ic_accepted ++;
450         }
451         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Request from peer `%s' with %u issuers, we accepted %u issuer \n"),
452                         GNUNET_i2s (peer), ic, ic_accepted);
453         GNUNET_free_non_null (n->issuer_id);
454         n->issuer_id = GNUNET_malloc (ic_accepted * sizeof (struct GNUNET_PeerIdentity));
455         c2 = 0;
456         for (c1 = 0; c1 < ic; c1++)
457         {
458                         if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
459                         {
460                                 n->issuer_id[c2] = rmi[c1].issuer_id;
461                                 c2 ++;
462                         }
463         }
464         n->issuer_count = ic_accepted;
465
466         if (GNUNET_YES == make_active)
467                 node_make_active (n);
468
469         /* Send response */
470         n->cth = GNUNET_CORE_notify_transmit_ready (ch, GNUNET_NO, 0,
471                                                                 GNUNET_TIME_relative_get_forever_(),
472                                                                 peer,
473                                                                 sizeof (struct Experimentation_Response) +
474                                                                 GSE_my_issuer_count * sizeof (struct Experimentation_Issuer),
475                                                                 send_response_cb, n);
476 }
477
478
479 /**
480  * Handle a response
481  *
482  * @param peer the source
483  * @param message the message
484  */
485 static void handle_response (const struct GNUNET_PeerIdentity *peer,
486                                                                                                                  const struct GNUNET_MessageHeader *message)
487 {
488         struct Node *n;
489         struct Experimentation_Response *rm = (struct Experimentation_Response *) message;
490         struct Experimentation_Issuer *rmi = (struct Experimentation_Issuer *) &rm[1];
491         uint32_t ic;
492         uint32_t ic_accepted;
493         int make_active;
494         unsigned int c1;
495         unsigned int c2;
496
497
498         if (ntohs (message->size) < sizeof (struct Experimentation_Response))
499         {
500                 GNUNET_break (0);
501                 return;
502         }
503         ic = ntohl (rm->issuer_count);
504         if (ntohs (message->size) != sizeof (struct Experimentation_Response) + ic * sizeof (struct Experimentation_Issuer))
505         {
506                 GNUNET_break (0);
507                 return;
508         }
509
510         make_active = GNUNET_NO;
511         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_active, &peer->hashPubKey)))
512         {
513                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Received %s from %s peer `%s'\n"),
514                                         "RESPONSE", "active", GNUNET_i2s (peer));
515         }
516         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &peer->hashPubKey)))
517         {
518                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Received %s from %s peer `%s'\n"),
519                                         "RESPONSE", "requested", GNUNET_i2s (peer));
520                         GNUNET_CONTAINER_multihashmap_remove (nodes_requested, &peer->hashPubKey, n);
521                         if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
522                         {
523                                 GNUNET_SCHEDULER_cancel (n->timeout_task);
524                                 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
525                         }
526                         if (NULL != n->cth)
527                         {
528                                 GNUNET_CORE_notify_transmit_ready_cancel (n->cth);
529                                 n->cth = NULL;
530                         }
531                         update_stats (nodes_requested);
532                         make_active = GNUNET_YES;
533         }
534         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &peer->hashPubKey)))
535         {
536                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Received %s from peer `%s'\n"),
537                                         "RESPONSE", "inactive", GNUNET_i2s (peer));
538                         GNUNET_CONTAINER_multihashmap_remove (nodes_inactive, &peer->hashPubKey, n);
539                         update_stats (nodes_inactive);
540                         make_active = GNUNET_YES;
541         }
542         else
543         {
544                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Received %s from %s peer `%s'\n"),
545                                         "RESPONSE", "unknown", GNUNET_i2s (peer));
546                         return;
547         }
548
549         /* Update */
550         n->capabilities = ntohl (rm->capabilities);
551
552         /* Filter accepted issuer */
553         ic_accepted = 0;
554         for (c1 = 0; c1 < ic; c1++)
555         {
556                 if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
557                         ic_accepted ++;
558         }
559         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Response from peer `%s' with %u issuers, we accepted %u issuer \n"),
560                         GNUNET_i2s (peer), ic, ic_accepted);
561         GNUNET_free_non_null (n->issuer_id);
562         n->issuer_id = GNUNET_malloc (ic_accepted * sizeof (struct GNUNET_PeerIdentity));
563         c2 = 0;
564         for (c1 = 0; c1 < ic; c1++)
565         {
566                         if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
567                         {
568                                 n->issuer_id[c2] = rmi[c1].issuer_id;
569                                 c2 ++;
570                         }
571         }
572         n->issuer_count = ic_accepted;
573
574         if (GNUNET_YES == make_active)
575                 node_make_active (n);
576 }
577
578 /**
579  * Handle a response
580  *
581  * @param peer the source
582  * @param message the message
583  */
584 static void handle_start (const struct GNUNET_PeerIdentity *peer,
585                                                                                                                  const struct GNUNET_MessageHeader *message)
586 {
587         uint16_t size;
588         uint32_t name_len;
589         const struct GED_start_message *msg;
590         const char *name;
591         struct Node *n;
592         struct Experiment *e;
593
594         if (NULL == peer)
595         {
596                 GNUNET_break (0);
597                 return;
598         }
599         if (NULL == message)
600         {
601                 GNUNET_break (0);
602                 return;
603         }
604
605         size = ntohs (message->size);
606         if (size < sizeof (struct GED_start_message))
607         {
608                 GNUNET_break (0);
609                 return;
610         }
611         msg = (const struct GED_start_message *) message;
612         name_len = ntohl (msg->len_name);
613         if (size != sizeof (struct GED_start_message) + name_len)
614         {
615                 GNUNET_break (0);
616                 return;
617         }
618
619         n = get_node (peer);
620         if (NULL == n)
621         {
622                 GNUNET_break (0);
623                 return;
624         }
625         name = (const char *) &msg[1];
626         if (name[name_len-1] != '\0')
627         {
628                 GNUNET_break (0);
629                 return;
630         }
631
632         if (name_len != strlen (name) + 1)
633         {
634                 GNUNET_break (0);
635                 return;
636         }
637
638         e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
639         if (NULL == e)
640         {
641                 GNUNET_break (0);
642                 return;
643         }
644
645         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"),
646                         "START", GNUNET_i2s (peer), name);
647
648         GED_scheduler_handle_start (n, e);
649 }
650
651 /**
652  * Handle a response
653  *
654  * @param peer the source
655  * @param message the message
656  */
657 static void handle_start_ack (const struct GNUNET_PeerIdentity *peer,
658                                                                                                                  const struct GNUNET_MessageHeader *message)
659 {
660         uint16_t size;
661         uint32_t name_len;
662         const struct GED_start_ack_message *msg;
663         const char *name;
664         struct Node *n;
665         struct Experiment *e;
666
667         if (NULL == peer)
668         {
669                 GNUNET_break (0);
670                 return;
671         }
672         if (NULL == message)
673         {
674                 GNUNET_break (0);
675                 return;
676         }
677
678         size = ntohs (message->size);
679         if (size < sizeof (struct GED_start_ack_message))
680         {
681                 GNUNET_break (0);
682                 return;
683         }
684         msg = (const struct GED_start_ack_message *) message;
685         name_len = ntohl (msg->len_name);
686         if (size != sizeof (struct GED_start_message) + name_len)
687         {
688                 GNUNET_break (0);
689                 return;
690         }
691
692         n = get_node (peer);
693         if (NULL == n)
694         {
695                 GNUNET_break (0);
696                 return;
697         }
698         name = (const char *) &msg[1];
699         if (name[name_len-1] != '\0')
700         {
701                 GNUNET_break (0);
702                 return;
703         }
704
705         if (name_len != strlen (name) + 1)
706         {
707                 GNUNET_break (0);
708                 return;
709         }
710
711         e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
712         if (NULL == e)
713         {
714                 GNUNET_break (0);
715                 return;
716         }
717
718         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"),
719                         "START_ACK", GNUNET_i2s (peer), name);
720         GED_scheduler_handle_start_ack (n, e);
721 }
722
723 /**
724  * Handle a response
725  *
726  * @param peer the source
727  * @param message the message
728  */
729 static void handle_stop (const struct GNUNET_PeerIdentity *peer,
730                                                                                                  const struct GNUNET_MessageHeader *message)
731 {
732         uint16_t size;
733         uint32_t name_len;
734         const struct GED_stop_message *msg;
735         const char *name;
736         struct Node *n;
737         struct Experiment *e;
738
739         if (NULL == peer)
740         {
741                 GNUNET_break (0);
742                 return;
743         }
744         if (NULL == message)
745         {
746                 GNUNET_break (0);
747                 return;
748         }
749
750         size = ntohs (message->size);
751         if (size < sizeof (struct GED_stop_message))
752         {
753                 GNUNET_break (0);
754                 return;
755         }
756         msg = (const struct GED_stop_message *) message;
757         name_len = ntohl (msg->len_name);
758         if (size != sizeof (struct GED_start_message) + name_len)
759         {
760                 GNUNET_break (0);
761                 return;
762         }
763
764         n = get_node (peer);
765         if (NULL == n)
766         {
767                 GNUNET_break (0);
768                 return;
769         }
770         name = (const char *) &msg[1];
771         if (name[name_len-1] != '\0')
772         {
773                 GNUNET_break (0);
774                 return;
775         }
776
777         if (name_len != strlen (name) + 1)
778         {
779                 GNUNET_break (0);
780                 return;
781         }
782
783         e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
784         if (NULL == e)
785         {
786                 GNUNET_break (0);
787                 return;
788         }
789         GED_scheduler_handle_stop (n, e);
790 }
791
792 /**
793  * Method called whenever a given peer connects.
794  *
795  * @param cls closure
796  * @param peer peer identity this notification is about
797  */
798 void core_connect_handler (void *cls,
799                            const struct GNUNET_PeerIdentity *peer)
800 {
801         if (GNUNET_YES == is_me(peer))
802                 return;
803
804         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Connected to peer %s\n"),
805                         GNUNET_i2s (peer));
806
807         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_requested, &peer->hashPubKey))
808                 return; /* We already sent a request */
809
810         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_active, &peer->hashPubKey))
811                 return; /* This peer is known as active  */
812
813         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_inactive, &peer->hashPubKey))
814                 return; /* This peer is known as inactive  */
815
816         send_experimentation_request (peer);
817
818 }
819
820
821 /**
822  * Method called whenever a given peer disconnects.
823  *
824  * @param cls closure
825  * @param peer peer identity this notification is about
826  */
827 void core_disconnect_handler (void *cls,
828                            const struct GNUNET_PeerIdentity * peer)
829 {
830         if (GNUNET_YES == is_me(peer))
831                 return;
832
833         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Disconnected from peer %s\n"),
834                         GNUNET_i2s (peer));
835
836 }
837
838
839 /**
840  * Handle a request and send a response
841  *
842  * @param cls unused
843  * @param other the sender
844  * @param message the message
845  * @return GNUNET_OK to keep connection, GNUNET_SYSERR on error
846  */
847 static int
848 core_receive_handler (void *cls,
849                                                                                         const struct GNUNET_PeerIdentity *other,
850                                                                                         const struct GNUNET_MessageHeader *message)
851 {
852         if (ntohs (message->size) < sizeof (struct GNUNET_MessageHeader))
853         {
854                         GNUNET_break (0);
855                         return GNUNET_SYSERR;
856         }
857
858         switch (ntohs (message->type)) {
859                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_REQUEST:
860                         handle_request (other, message);
861                         break;
862                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_RESPONSE:
863                         handle_response (other, message);
864                         break;
865                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START:
866                         handle_start (other, message);
867                         break;
868                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START_ACK:
869                         handle_start_ack (other, message);
870                         break;
871                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_STOP:
872                         handle_stop (other, message);
873                         break;
874                 default:
875                         break;
876         }
877
878         return GNUNET_OK;
879 }
880
881 struct ExperimentStartCtx
882 {
883         struct ExperimentStartCtx *prev;
884         struct ExperimentStartCtx *next;
885
886         struct Node *n;
887         struct Experiment *e;
888 };
889
890 size_t node_experiment_start_cb (void *cls, size_t bufsize, void *buf)
891 {
892         struct ExperimentStartCtx *e_ctx = cls;
893         struct GED_start_message *msg;
894         size_t name_len;
895         size_t size;
896
897         GNUNET_CONTAINER_DLL_remove (e_ctx->n->e_req_head, e_ctx->n->e_req_tail, e_ctx);
898         e_ctx->n->cth = NULL;
899         if (NULL == buf)
900         {
901                 GNUNET_free (e_ctx);
902                 return 0;
903         }
904
905         name_len = strlen(e_ctx->e->name) + 1;
906         size = sizeof (struct GED_start_message) + name_len;
907
908         msg = GNUNET_malloc (size);
909         msg->header.size = htons (size);
910         msg->header.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START);
911         msg->issuer = e_ctx->e->issuer;
912         msg->version_nbo = GNUNET_TIME_absolute_hton(e_ctx->e->version);
913         msg->len_name = htonl (name_len);
914         memcpy (&msg[1], e_ctx->e->name, name_len);
915
916         memcpy (buf, msg, size);
917         GNUNET_free (msg);
918         GNUNET_free (e_ctx);
919         return size;
920 }
921
922
923 int
924 GED_nodes_rts (struct Node *n)
925 {
926         if (NULL == n->cth)
927                 return GNUNET_YES;
928         else
929                 return GNUNET_NO;
930
931 }
932
933 /**
934  * Request a experiment to start with a node
935  *
936  * @return GNUNET_NO if core was busy with sending, GNUNET_OK otherwise
937  */
938 int
939 GED_nodes_request_start (struct Node *n, struct Experiment *e)
940 {
941         struct ExperimentStartCtx *e_ctx;
942
943         if (NULL != n->cth)
944         {
945                 GNUNET_break (0); /* should call rts before */
946                 return GNUNET_NO;
947         }
948
949         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending experiment start request to peer `%s' for experiment `%s'\n"),
950                         GNUNET_i2s(&n->id), e->name);
951
952         e_ctx = GNUNET_malloc (sizeof (struct ExperimentStartCtx));
953         e_ctx->n = n;
954         e_ctx->e = e;
955         n->cth = GNUNET_CORE_notify_transmit_ready (ch, GNUNET_NO, 0, FAST_TIMEOUT, &n->id,
956                         sizeof (struct GED_start_message) + strlen (e->name) + 1,
957                         &node_experiment_start_cb, e_ctx);
958         if (NULL == n->cth)
959         {
960                 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Cannot send experiment start request to peer `%s' for experiment `%s'\n"),
961                                 GNUNET_i2s(&n->id), e->name);
962                 GNUNET_free (e_ctx);
963         }
964         GNUNET_CONTAINER_DLL_insert (n->e_req_head, n->e_req_tail, e_ctx);
965
966         return GNUNET_OK;
967 }
968
969
970 /**
971  * Start the nodes management
972  */
973 void
974 GED_nodes_start ()
975 {
976         /* Connecting to core service to find partners */
977         ch = GNUNET_CORE_connect (GED_cfg, NULL,
978                                                                                                                 &core_startup_handler,
979                                                                                                                 &core_connect_handler,
980                                                                                                                 &core_disconnect_handler,
981                                                                                                                 &core_receive_handler,
982                                                                                                                 GNUNET_NO, NULL, GNUNET_NO, NULL);
983         if (NULL == ch)
984         {
985                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Failed to connect to CORE service!\n"));
986                         return;
987         }
988
989         nodes_requested = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
990         nodes_active = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
991         nodes_inactive = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
992 }
993
994
995 /**
996  * Stop the nodes management
997  */
998 void
999 GED_nodes_stop ()
1000 {
1001   if (NULL != ch)
1002   {
1003                 GNUNET_CORE_disconnect (ch);
1004                 ch = NULL;
1005   }
1006
1007   if (NULL != nodes_requested)
1008   {
1009                 GNUNET_CONTAINER_multihashmap_iterate (nodes_requested,
1010                                                                                                                                                                          &cleanup_nodes,
1011                                                                                                                                                                          nodes_requested);
1012                 update_stats (nodes_requested);
1013                 GNUNET_CONTAINER_multihashmap_destroy (nodes_requested);
1014                 nodes_requested = NULL;
1015   }
1016
1017   if (NULL != nodes_active)
1018   {
1019                 GNUNET_CONTAINER_multihashmap_iterate (nodes_active,
1020                                                                                                                                                                          &cleanup_nodes,
1021                                                                                                                                                                          nodes_active);
1022                 update_stats (nodes_active);
1023                 GNUNET_CONTAINER_multihashmap_destroy (nodes_active);
1024                 nodes_active = NULL;
1025   }
1026
1027   if (NULL != nodes_inactive)
1028   {
1029                 GNUNET_CONTAINER_multihashmap_iterate (nodes_inactive,
1030                                                                                                                                                                          &cleanup_nodes,
1031                                                                                                                                                                          nodes_inactive);
1032                 update_stats (nodes_inactive);
1033                 GNUNET_CONTAINER_multihashmap_destroy (nodes_inactive);
1034                 nodes_inactive = NULL;
1035   }
1036 }
1037
1038 /* end of gnunet-daemon-experimentation_nodes.c */