-towards fixing FTBFS in experimentation
[oweals/gnunet.git] / src / experimentation / gnunet-daemon-experimentation_nodes.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file experimentation/gnunet-daemon-experimentation_nodes.c
23  * @brief experimentation daemon: node management
24  * @author Christian Grothoff
25  * @author Matthias Wachs
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_core_service.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet-daemon-experimentation.h"
32
33
34 #define FAST_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
35 /**
36  * Core handle
37  */
38 static struct GNUNET_CORE_Handle *ch;
39
40
41 /**
42  * Peer's own identity
43  */
44 static struct GNUNET_PeerIdentity me;
45
46
47 /**
48  * Nodes with a pending request
49  */
50 struct GNUNET_CONTAINER_MultiHashMap *nodes_requested;
51
52
53 /**
54  * Active experimentation nodes
55  */
56 struct GNUNET_CONTAINER_MultiHashMap *nodes_active;
57
58
59 /**
60  * Inactive experimentation nodes
61  * To be excluded from future requests
62  */
63 struct GNUNET_CONTAINER_MultiHashMap *nodes_inactive;
64
65 struct NodeComCtx
66 {
67         struct NodeComCtx *prev;
68         struct NodeComCtx *next;
69
70         struct Node *n;
71         struct Experiment *e;
72
73         size_t size;
74         GNUNET_CONNECTION_TransmitReadyNotify notify;
75         void *notify_cls;
76 };
77
78
79 /**
80  * Update statistics
81  *
82  * @param m hashmap to update values from
83  */
84 static void update_stats (struct GNUNET_CONTAINER_MultiHashMap *m)
85 {
86         GNUNET_assert (NULL != m);
87         GNUNET_assert (NULL != GED_stats);
88
89         if (m == nodes_active)
90         {
91                         GNUNET_STATISTICS_set (GED_stats, "# nodes active",
92                                         GNUNET_CONTAINER_multihashmap_size(m), GNUNET_NO);
93         }
94         else if (m == nodes_inactive)
95         {
96                         GNUNET_STATISTICS_set (GED_stats, "# nodes inactive",
97                                         GNUNET_CONTAINER_multihashmap_size(m), GNUNET_NO);
98         }
99         else if (m == nodes_requested)
100         {
101                         GNUNET_STATISTICS_set (GED_stats, "# nodes requested",
102                                         GNUNET_CONTAINER_multihashmap_size(m), GNUNET_NO);
103         }
104         else
105                 GNUNET_break (0);
106
107 }
108
109
110 /**
111  * Clean up node
112  *
113  * @param cls the hashmap to clean up
114  * @param key key of the current node
115  * @param value related node object
116  * @return always GNUNET_OK
117  */
118 static int
119 cleanup_node (void *cls,
120                                                          const struct GNUNET_HashCode * key,
121                                                          void *value)
122 {
123         struct Node *n;
124         struct NodeComCtx *e_cur;
125         struct NodeComCtx *e_next;
126         struct GNUNET_CONTAINER_MultiHashMap *cur = cls;
127
128         n = value;
129         if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
130         {
131                 GNUNET_SCHEDULER_cancel (n->timeout_task);
132                 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
133         }
134
135         if (NULL != n->cth)
136         {
137                 GNUNET_CORE_notify_transmit_ready_cancel (n->cth);
138                 n->cth = NULL;
139         }
140         e_next = n->e_req_head;
141         while (NULL != (e_cur = e_next))
142         {
143                 e_next = e_cur->next;
144                 GNUNET_CONTAINER_DLL_remove (n->e_req_head, n->e_req_tail, e_cur);
145                 GNUNET_free (e_cur);
146         }
147
148         GNUNET_free_non_null (n->issuer_id);
149
150         GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_remove (cur, key, value));
151         GNUNET_free (value);
152         return GNUNET_OK;
153 }
154
155
156 /**
157  * Check if id passed is my id
158  *
159  * @param id the id to check
160  * @return GNUNET_YES or GNUNET_NO
161  */
162 static int is_me (const struct GNUNET_PeerIdentity *id)
163 {
164         if (0 == memcmp (&me, id, sizeof (me)))
165                 return GNUNET_YES;
166         else
167                 return GNUNET_NO;
168 }
169
170 /**
171  * Core startup callback
172  *
173  * @param cls unused
174  * @param my_identity my id
175  */
176 static void
177 core_startup_handler (void *cls,
178                       const struct GNUNET_PeerIdentity *my_identity)
179 {
180   me = *my_identity;
181 }
182
183
184 static void
185 schedule_transmisson (struct NodeComCtx *e_ctx);
186
187
188 static size_t
189 transmit_read_wrapper (void *cls, size_t bufsize, void *buf)
190 {
191         struct NodeComCtx *e_ctx = cls;
192         struct NodeComCtx *next = NULL;
193
194         size_t res = e_ctx->notify (e_ctx->notify_cls, bufsize, buf);
195         e_ctx->n->cth = NULL;
196
197         GNUNET_CONTAINER_DLL_remove (e_ctx->n->e_req_head, e_ctx->n->e_req_tail, e_ctx);
198         next = e_ctx->n->e_req_head;
199         GNUNET_free (e_ctx);
200
201         if (NULL != next)
202         {
203                 /* Schedule next message */
204                 schedule_transmisson (next);
205         }
206         return res;
207 }
208
209
210 static void
211 schedule_transmisson (struct NodeComCtx *e_ctx)
212 {
213         if (NULL != e_ctx->n->cth)
214                 return;
215
216         e_ctx->n->cth = GNUNET_CORE_notify_transmit_ready (ch, GNUNET_NO, 0, FAST_TIMEOUT,
217                         &e_ctx->n->id, e_ctx->size, transmit_read_wrapper, e_ctx);
218         if (NULL == e_ctx->n->cth)
219         {
220                 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Cannot send message to peer `%s' for experiment `%s'\n"),
221                                 GNUNET_i2s(&e_ctx->n->id), e_ctx->e->name);
222                 GNUNET_free (e_ctx);
223         }
224
225 }
226
227
228 /**
229  * Remove experimentation request due to timeout
230  *
231  * @param cls the related node
232  * @param tc scheduler's task context
233  */
234 static void
235 remove_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
236 {
237         struct Node *n = cls;
238
239         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Removing request for peer %s due to timeout\n",
240                         GNUNET_i2s (&n->id));
241
242         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_requested, &n->id.hashPubKey))
243         {
244                         GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_remove (nodes_requested, &n->id.hashPubKey, n));
245                         update_stats (nodes_requested);
246                         GNUNET_CONTAINER_multihashmap_put (nodes_inactive, &n->id.hashPubKey, n,
247                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
248                         update_stats (nodes_inactive);
249         }
250         n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
251 }
252
253
254 /**
255  * Core's transmit notify callback to send request
256  *
257  * @param cls the related node
258  * @param bufsize buffer size
259  * @param buf the buffer to copy to
260  * @return bytes passed
261  */
262 static size_t 
263 send_experimentation_request_cb (void *cls, size_t bufsize, void *buf)
264 {
265   struct Node *n = cls;
266   struct Experimentation_Request msg;
267   size_t msg_size = sizeof (msg);
268   size_t ri_size = sizeof (struct Experimentation_Issuer) * GSE_my_issuer_count;
269   size_t total_size = msg_size + ri_size;
270         
271   n->cth = NULL;
272   if (NULL == buf)
273   {
274     /* client disconnected */
275     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected\n");
276     if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
277                 GNUNET_SCHEDULER_cancel (n->timeout_task);
278     GNUNET_SCHEDULER_add_now (&remove_request, n);
279     return 0;
280   }
281   memset (buf, '\0', bufsize);
282   GNUNET_assert (bufsize >= total_size);
283
284         msg.msg.size = htons (total_size);
285         msg.msg.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_REQUEST);
286         msg.capabilities = htonl (GSE_node_capabilities);
287         msg.issuer_count = htonl (GSE_my_issuer_count);
288         memcpy (buf, &msg, msg_size);
289         memcpy (&((char *) buf)[msg_size], GSE_my_issuer, ri_size);
290
291         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending experimentation request to peer %s\n"),
292                         GNUNET_i2s (&n->id));
293         return total_size;
294 }
295
296
297 /**
298  * Send request to peer to start add him to to the set of experimentation nodes
299  *
300  * @param peer the peer to send to
301  */
302 static void 
303 send_experimentation_request (const struct GNUNET_PeerIdentity *peer)
304 {
305         struct Node *n;
306         struct NodeComCtx *e_ctx;
307         size_t size;
308         size_t c_issuers;
309
310         c_issuers = GSE_my_issuer_count;
311
312         size = sizeof (struct Experimentation_Request) +
313                                  c_issuers * sizeof (struct Experimentation_Issuer);
314         n = GNUNET_malloc (sizeof (struct Node));
315         n->id = *peer;
316         n->timeout_task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &remove_request, n);
317         n->capabilities = NONE;
318
319         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
320         e_ctx->n = n;
321         e_ctx->e = NULL;
322         e_ctx->size = size;
323         e_ctx->notify = &send_experimentation_request_cb;
324         e_ctx->notify_cls = n;
325         GNUNET_CONTAINER_DLL_insert_tail(n->e_req_head, n->e_req_tail, e_ctx);
326         schedule_transmisson (e_ctx);
327
328         GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (nodes_requested,
329                         &peer->hashPubKey, n, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
330         update_stats (nodes_requested);
331 }
332
333
334 /**
335  * Core's transmit notify callback to send response
336  *
337  * @param cls the related node
338  * @param bufsize buffer size
339  * @param buf the buffer to copy to
340  * @return bytes passed
341  */
342 size_t send_response_cb (void *cls, size_t bufsize, void *buf)
343 {
344         struct Node *n = cls;
345         struct Experimentation_Response msg;
346         size_t ri_size = GSE_my_issuer_count * sizeof (struct Experimentation_Issuer);
347         size_t msg_size = sizeof (msg);
348         size_t total_size = msg_size + ri_size;
349
350         n->cth = NULL;
351   if (buf == NULL)
352   {
353     /* client disconnected */
354     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected\n");
355     return 0;
356   }
357   GNUNET_assert (bufsize >= total_size);
358
359         msg.msg.size = htons (total_size);
360         msg.msg.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_RESPONSE);
361         msg.capabilities = htonl (GSE_node_capabilities);
362         msg.issuer_count = htonl (GSE_my_issuer_count);
363         memcpy (buf, &msg, msg_size);
364         memcpy (&((char *) buf)[msg_size], GSE_my_issuer, ri_size);
365
366         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending response to peer %s\n",
367                         GNUNET_i2s (&n->id));
368         return total_size;
369 }
370
371
372 static void
373 get_experiments_cb (struct Node *n, struct Experiment *e)
374 {
375         static int counter = 0;
376         if (NULL == e)
377                         return; /* Done */
378
379         /* Tell the scheduler to add a node with an experiment */
380         GED_scheduler_add (n, e, GNUNET_YES);
381         counter ++;
382 }
383
384 struct Node *
385 get_node (const struct GNUNET_PeerIdentity *id)
386 {
387         struct Node * res;
388         struct Node * tmp;
389
390         res = NULL;
391         tmp = NULL;
392         tmp = GNUNET_CONTAINER_multihashmap_get (nodes_active, &id->hashPubKey);
393         if (res == NULL)
394                 res = tmp;
395
396         tmp = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &id->hashPubKey);
397         if (res == NULL)
398                 res = tmp;
399         else
400                 GNUNET_break (0); /* Multiple instances */
401
402         tmp = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &id->hashPubKey);
403         if (res == NULL)
404                 res = tmp;
405         else
406                 GNUNET_break (0); /* Multiple instances */
407
408         return res;
409 }
410
411
412 /**
413  * Set a specific node as active
414  *
415  * @param n the node
416  */
417 static void node_make_active (struct Node *n)
418 {
419         int c1;
420   GNUNET_CONTAINER_multihashmap_put (nodes_active,
421                         &n->id.hashPubKey, n, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
422         update_stats (nodes_active);
423         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Added peer `%s' as active node\n"),
424                         GNUNET_i2s (&n->id));
425         /* Request experiments for this node to start them */
426         for (c1 = 0; c1 < n->issuer_count; c1++)
427         {
428
429                 GED_experiments_get (n, &n->issuer_id[c1], &get_experiments_cb);
430         }
431 }
432
433
434 /**
435  * Handle a request and send a response
436  *
437  * @param peer the source
438  * @param message the message
439  */
440 static void handle_request (const struct GNUNET_PeerIdentity *peer,
441                                                                                                                 const struct GNUNET_MessageHeader *message)
442 {
443         struct Node *n;
444         struct NodeComCtx *e_ctx;
445         struct Experimentation_Request *rm = (struct Experimentation_Request *) message;
446         struct Experimentation_Issuer *rmi = (struct Experimentation_Issuer *) &rm[1];
447         int c1;
448         int c2;
449         uint32_t ic;
450         uint32_t ic_accepted;
451         int make_active;
452
453         if (ntohs (message->size) < sizeof (struct Experimentation_Request))
454         {
455                 GNUNET_break (0);
456                 return;
457         }
458         ic = ntohl (rm->issuer_count);
459         if (ntohs (message->size) != sizeof (struct Experimentation_Request) + ic * sizeof (struct Experimentation_Issuer))
460         {
461                 GNUNET_break (0);
462                 return;
463         }
464
465         make_active = GNUNET_NO;
466         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_active, &peer->hashPubKey)))
467         {
468                         /* Nothing to do */
469         }
470         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &peer->hashPubKey)))
471         {
472                         GNUNET_CONTAINER_multihashmap_remove (nodes_requested, &peer->hashPubKey, n);
473                         if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
474                         {
475                                 GNUNET_SCHEDULER_cancel (n->timeout_task);
476                                 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
477                         }
478                         update_stats (nodes_requested);
479                         make_active = GNUNET_YES;
480         }
481         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &peer->hashPubKey)))
482         {
483                   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_remove (nodes_inactive, &peer->hashPubKey, n));
484                         update_stats (nodes_inactive);
485                         make_active = GNUNET_YES;
486         }
487         else
488         {
489                         /* Create new node */
490                         n = GNUNET_malloc (sizeof (struct Node));
491                         n->id = *peer;
492                         n->capabilities = NONE;
493                         make_active = GNUNET_YES;
494         }
495
496         /* Update node */
497         n->capabilities = ntohl (rm->capabilities);
498
499         /* Filter accepted issuer */
500         ic_accepted = 0;
501         for (c1 = 0; c1 < ic; c1++)
502         {
503                 if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
504                         ic_accepted ++;
505         }
506         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Request from peer `%s' with %u issuers, we accepted %u issuer \n",
507                         GNUNET_i2s (peer), ic, ic_accepted);
508         GNUNET_free_non_null (n->issuer_id);
509         n->issuer_id = GNUNET_malloc (ic_accepted * sizeof (struct GNUNET_PeerIdentity));
510         c2 = 0;
511         for (c1 = 0; c1 < ic; c1++)
512         {
513                         if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
514                         {
515                                 n->issuer_id[c2] = rmi[c1].issuer_id;
516                                 c2 ++;
517                         }
518         }
519         n->issuer_count = ic_accepted;
520
521         if (GNUNET_YES == make_active)
522                 node_make_active (n);
523
524         /* Send response */
525         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
526         e_ctx->n = n;
527         e_ctx->e = NULL;
528         e_ctx->size = sizeof (struct Experimentation_Response) + GSE_my_issuer_count * sizeof (struct Experimentation_Issuer);
529         e_ctx->notify = &send_response_cb;
530         e_ctx->notify_cls = n;
531
532         GNUNET_CONTAINER_DLL_insert_tail(n->e_req_head, n->e_req_tail, e_ctx);
533         schedule_transmisson (e_ctx);
534 }
535
536
537 /**
538  * Handle a response
539  *
540  * @param peer the source
541  * @param message the message
542  */
543 static void handle_response (const struct GNUNET_PeerIdentity *peer,
544                                                                                                                  const struct GNUNET_MessageHeader *message)
545 {
546         struct Node *n;
547         struct Experimentation_Response *rm = (struct Experimentation_Response *) message;
548         struct Experimentation_Issuer *rmi = (struct Experimentation_Issuer *) &rm[1];
549         uint32_t ic;
550         uint32_t ic_accepted;
551         int make_active;
552         unsigned int c1;
553         unsigned int c2;
554
555         if (ntohs (message->size) < sizeof (struct Experimentation_Response))
556         {
557                 GNUNET_break (0);
558                 return;
559         }
560         ic = ntohl (rm->issuer_count);
561         if (ntohs (message->size) != sizeof (struct Experimentation_Response) + ic * sizeof (struct Experimentation_Issuer))
562         {
563                 GNUNET_break (0);
564                 return;
565         }
566
567         make_active = GNUNET_NO;
568         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_active, &peer->hashPubKey)))
569         {
570                         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s from %s peer `%s'\n",
571                                         "RESPONSE", "active", GNUNET_i2s (peer));
572         }
573         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &peer->hashPubKey)))
574         {
575                         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s from %s peer `%s'\n",
576                                         "RESPONSE", "requested", GNUNET_i2s (peer));
577                         GNUNET_CONTAINER_multihashmap_remove (nodes_requested, &peer->hashPubKey, n);
578                         if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
579                         {
580                                 GNUNET_SCHEDULER_cancel (n->timeout_task);
581                                 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
582                         }
583                         update_stats (nodes_requested);
584                         make_active = GNUNET_YES;
585         }
586         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &peer->hashPubKey)))
587         {
588                         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s from peer `%s'\n",
589                                         "RESPONSE", "inactive", GNUNET_i2s (peer));
590                         GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_remove (nodes_inactive, &peer->hashPubKey, n));
591                         update_stats (nodes_inactive);
592                         make_active = GNUNET_YES;
593         }
594         else
595         {
596                         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s from %s peer `%s'\n",
597                                         "RESPONSE", "unknown", GNUNET_i2s (peer));
598                         return;
599         }
600
601         /* Update */
602         n->capabilities = ntohl (rm->capabilities);
603
604         /* Filter accepted issuer */
605         ic_accepted = 0;
606         for (c1 = 0; c1 < ic; c1++)
607         {
608                 if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
609                         ic_accepted ++;
610         }
611         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Response from peer `%s' with %u issuers, we accepted %u issuer \n",
612                         GNUNET_i2s (peer), ic, ic_accepted);
613         GNUNET_free_non_null (n->issuer_id);
614         n->issuer_id = GNUNET_malloc (ic_accepted * sizeof (struct GNUNET_PeerIdentity));
615         c2 = 0;
616         for (c1 = 0; c1 < ic; c1++)
617         {
618                         if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
619                         {
620                                 n->issuer_id[c2] = rmi[c1].issuer_id;
621                                 c2 ++;
622                         }
623         }
624         n->issuer_count = ic_accepted;
625
626         if (GNUNET_YES == make_active)
627                 node_make_active (n);
628 }
629
630 /**
631  * Handle a response
632  *
633  * @param peer the source
634  * @param message the message
635  */
636 static void handle_start (const struct GNUNET_PeerIdentity *peer,
637                                                                                                                  const struct GNUNET_MessageHeader *message)
638 {
639         uint16_t size;
640         uint32_t name_len;
641         const struct GED_start_message *msg;
642         const char *name;
643         struct Node *n;
644         struct Experiment *e;
645
646         if (NULL == peer)
647         {
648                 GNUNET_break (0);
649                 return;
650         }
651         if (NULL == message)
652         {
653                 GNUNET_break (0);
654                 return;
655         }
656
657         size = ntohs (message->size);
658         if (size < sizeof (struct GED_start_message))
659         {
660                 GNUNET_break (0);
661                 return;
662         }
663         msg = (const struct GED_start_message *) message;
664         name_len = ntohl (msg->len_name);
665         if (size != sizeof (struct GED_start_message) + name_len)
666         {
667                 GNUNET_break (0);
668                 return;
669         }
670
671         n = get_node (peer);
672         if (NULL == n)
673         {
674                 GNUNET_break (0);
675                 return;
676         }
677         name = (const char *) &msg[1];
678         if (name[name_len-1] != '\0')
679         {
680                 GNUNET_break (0);
681                 return;
682         }
683
684         if (name_len != strlen (name) + 1)
685         {
686                 GNUNET_break (0);
687                 return;
688         }
689
690         e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
691         if (NULL == e)
692         {
693                 GNUNET_break (0);
694                 return;
695         }
696
697         GED_scheduler_handle_start (n, e);
698 }
699
700 /**
701  * Handle a response
702  *
703  * @param peer the source
704  * @param message the message
705  */
706 static void handle_start_ack (const struct GNUNET_PeerIdentity *peer,
707                                                                                                                  const struct GNUNET_MessageHeader *message)
708 {
709         uint16_t size;
710         uint32_t name_len;
711         const struct GED_start_ack_message *msg;
712         const char *name;
713         struct Node *n;
714         struct Experiment *e;
715
716         if (NULL == peer)
717         {
718                 GNUNET_break (0);
719                 return;
720         }
721         if (NULL == message)
722         {
723                 GNUNET_break (0);
724                 return;
725         }
726
727         size = ntohs (message->size);
728         if (size < sizeof (struct GED_start_ack_message))
729         {
730                 GNUNET_break (0);
731                 return;
732         }
733         msg = (const struct GED_start_ack_message *) message;
734         name_len = ntohl (msg->len_name);
735         if (size != sizeof (struct GED_start_message) + name_len)
736         {
737                 GNUNET_break (0);
738                 return;
739         }
740
741         n = get_node (peer);
742         if (NULL == n)
743         {
744                 GNUNET_break (0);
745                 return;
746         }
747         name = (const char *) &msg[1];
748         if (name[name_len-1] != '\0')
749         {
750                 GNUNET_break (0);
751                 return;
752         }
753
754         if (name_len != strlen (name) + 1)
755         {
756                 GNUNET_break (0);
757                 return;
758         }
759
760         e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
761         if (NULL == e)
762         {
763                 GNUNET_break (0);
764                 return;
765         }
766         GED_scheduler_handle_start_ack (n, e);
767 }
768
769 /**
770  * Handle a response
771  *
772  * @param peer the source
773  * @param message the message
774  */
775 static void handle_stop (const struct GNUNET_PeerIdentity *peer,
776                                                                                                  const struct GNUNET_MessageHeader *message)
777 {
778         uint16_t size;
779         uint32_t name_len;
780         const struct GED_stop_message *msg;
781         const char *name;
782         struct Node *n;
783         struct Experiment *e;
784
785         if (NULL == peer)
786         {
787                 GNUNET_break (0);
788                 return;
789         }
790         if (NULL == message)
791         {
792                 GNUNET_break (0);
793                 return;
794         }
795
796         size = ntohs (message->size);
797         if (size < sizeof (struct GED_stop_message))
798         {
799                 GNUNET_break (0);
800                 return;
801         }
802         msg = (const struct GED_stop_message *) message;
803         name_len = ntohl (msg->len_name);
804         if (size != sizeof (struct GED_start_message) + name_len)
805         {
806                 GNUNET_break (0);
807                 return;
808         }
809
810         n = get_node (peer);
811         if (NULL == n)
812         {
813                 GNUNET_break (0);
814                 return;
815         }
816         name = (const char *) &msg[1];
817         if (name[name_len-1] != '\0')
818         {
819                 GNUNET_break (0);
820                 return;
821         }
822
823         if (name_len != strlen (name) + 1)
824         {
825                 GNUNET_break (0);
826                 return;
827         }
828
829         e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
830         if (NULL == e)
831         {
832                 GNUNET_break (0);
833                 return;
834         }
835         GED_scheduler_handle_stop (n, e);
836 }
837
838 /**
839  * Method called whenever a given peer connects.
840  *
841  * @param cls closure
842  * @param peer peer identity this notification is about
843  */
844 void core_connect_handler (void *cls,
845                            const struct GNUNET_PeerIdentity *peer)
846 {
847         if (GNUNET_YES == is_me(peer))
848                 return;
849
850         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Connected to peer %s\n"),
851                         GNUNET_i2s (peer));
852
853         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_requested, &peer->hashPubKey))
854                 return; /* We already sent a request */
855
856         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_active, &peer->hashPubKey))
857                 return; /* This peer is known as active  */
858
859         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_inactive, &peer->hashPubKey))
860                 return; /* This peer is known as inactive  */
861
862         send_experimentation_request (peer);
863 }
864
865
866 /**
867  * Method called whenever a given peer disconnects.
868  *
869  * @param cls closure
870  * @param peer peer identity this notification is about
871  */
872 void core_disconnect_handler (void *cls,
873                            const struct GNUNET_PeerIdentity * peer)
874 {
875         struct Node *n;
876         if (GNUNET_YES == is_me(peer))
877                 return;
878
879         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Disconnected from peer %s\n"),
880                         GNUNET_i2s (peer));
881
882         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &peer->hashPubKey)))
883                 cleanup_node (nodes_requested, &peer->hashPubKey, n);
884
885         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_active, &peer->hashPubKey)))
886                 cleanup_node (nodes_active, &peer->hashPubKey, n);
887
888         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &peer->hashPubKey)))
889                 cleanup_node (nodes_inactive, &peer->hashPubKey, n);
890 }
891
892
893 /**
894  * Handle a request and send a response
895  *
896  * @param cls unused
897  * @param other the sender
898  * @param message the message
899  * @return GNUNET_OK to keep connection, GNUNET_SYSERR on error
900  */
901 static int
902 core_receive_handler (void *cls,
903                                                                                         const struct GNUNET_PeerIdentity *other,
904                                                                                         const struct GNUNET_MessageHeader *message)
905 {
906         if (ntohs (message->size) < sizeof (struct GNUNET_MessageHeader))
907         {
908                         GNUNET_break (0);
909                         return GNUNET_SYSERR;
910         }
911
912         switch (ntohs (message->type)) {
913                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_REQUEST:
914                         handle_request (other, message);
915                         break;
916                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_RESPONSE:
917                         handle_response (other, message);
918                         break;
919                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START:
920                         handle_start (other, message);
921                         break;
922                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START_ACK:
923                         handle_start_ack (other, message);
924                         break;
925                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_STOP:
926                         handle_stop (other, message);
927                         break;
928                 default:
929                         break;
930         }
931
932         return GNUNET_OK;
933 }
934
935
936 size_t node_experiment_start_cb (void *cls, size_t bufsize, void *buf)
937 {
938         struct NodeComCtx *e_ctx = cls;
939         struct GED_start_message *msg;
940         size_t name_len;
941         size_t size;
942
943         if (NULL == buf)
944                 return 0;
945
946         name_len = strlen(e_ctx->e->name) + 1;
947         size = sizeof (struct GED_start_message) + name_len;
948
949         msg = GNUNET_malloc (size);
950         msg->header.size = htons (size);
951         msg->header.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START);
952         msg->issuer = e_ctx->e->issuer;
953         msg->version_nbo = GNUNET_TIME_absolute_hton(e_ctx->e->version);
954         msg->len_name = htonl (name_len);
955         memcpy (&msg[1], e_ctx->e->name, name_len);
956
957         memcpy (buf, msg, size);
958         GNUNET_free (msg);
959         return size;
960 }
961
962 size_t node_experiment_start_ack_cb (void *cls, size_t bufsize, void *buf)
963 {
964         struct NodeComCtx *e_ctx = cls;
965         struct GED_start_ack_message *msg;
966         size_t name_len;
967         size_t size;
968         if (NULL == buf)
969                 return 0;
970
971         name_len = strlen(e_ctx->e->name) + 1;
972         size = sizeof (struct GED_start_ack_message) + name_len;
973
974         msg = GNUNET_malloc (size);
975         msg->header.size = htons (size);
976         msg->header.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START_ACK);
977         msg->issuer = e_ctx->e->issuer;
978         msg->version_nbo = GNUNET_TIME_absolute_hton(e_ctx->e->version);
979         msg->len_name = htonl (name_len);
980         memcpy (&msg[1], e_ctx->e->name, name_len);
981
982         memcpy (buf, msg, size);
983         GNUNET_free (msg);
984         return size;
985 }
986
987
988
989
990 /**
991  * Confirm a experiment START with a node
992  *
993  * @return GNUNET_NO if core was busy with sending, GNUNET_OK otherwise
994  */
995 int
996 GED_nodes_send_start_ack (struct Node *n, struct Experiment *e)
997 {
998         struct NodeComCtx *e_ctx;
999
1000         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1001                         "Sending %s for experiment request to peer `%s' for experiment `%s'\n",
1002                         "START_ACK" ,GNUNET_i2s(&n->id), e->name);
1003
1004         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
1005         e_ctx->n = n;
1006         e_ctx->e = e;
1007         e_ctx->size = sizeof (struct GED_start_ack_message) + strlen (e->name) + 1;
1008         e_ctx->notify = &node_experiment_start_ack_cb;
1009         e_ctx->notify_cls = e_ctx;
1010
1011         GNUNET_CONTAINER_DLL_insert_tail (n->e_req_head, n->e_req_tail, e_ctx);
1012         schedule_transmisson (e_ctx);
1013         return GNUNET_OK;
1014 }
1015
1016
1017 /**
1018  * Request a experiment to start with a node
1019  *
1020  * @return GNUNET_NO if core was busy with sending, GNUNET_OK otherwise
1021  */
1022 int
1023 GED_nodes_send_start (struct Node *n, struct Experiment *e)
1024 {
1025         struct NodeComCtx *e_ctx;
1026
1027         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1028                         "Sending %s for experiment request to peer `%s' for experiment `%s'\n",
1029                         "START", GNUNET_i2s(&n->id), e->name);
1030
1031         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
1032         e_ctx->n = n;
1033         e_ctx->e = e;
1034         e_ctx->size = sizeof (struct GED_start_message) + strlen (e->name) + 1;
1035         e_ctx->notify = &node_experiment_start_cb;
1036         e_ctx->notify_cls = e_ctx;
1037
1038         GNUNET_CONTAINER_DLL_insert_tail (n->e_req_head, n->e_req_tail, e_ctx);
1039         schedule_transmisson (e_ctx);
1040         return GNUNET_OK;
1041 }
1042
1043
1044 /**
1045  * Start the nodes management
1046  */
1047 void
1048 GED_nodes_start ()
1049 {
1050         /* Connecting to core service to find partners */
1051         ch = GNUNET_CORE_connect (GED_cfg, NULL,
1052                                                                                                                 &core_startup_handler,
1053                                                                                                                 &core_connect_handler,
1054                                                                                                                 &core_disconnect_handler,
1055                                                                                                                 &core_receive_handler,
1056                                                                                                                 GNUNET_NO, NULL, GNUNET_NO, NULL);
1057         if (NULL == ch)
1058         {
1059                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Failed to connect to CORE service!\n"));
1060                         return;
1061         }
1062
1063         nodes_requested = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
1064         nodes_active = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
1065         nodes_inactive = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
1066 }
1067
1068
1069 /**
1070  * Stop the nodes management
1071  */
1072 void
1073 GED_nodes_stop ()
1074 {
1075   if (NULL != ch)
1076   {
1077                 GNUNET_CORE_disconnect (ch);
1078                 ch = NULL;
1079   }
1080
1081   if (NULL != nodes_requested)
1082   {
1083                 GNUNET_CONTAINER_multihashmap_iterate (nodes_requested,
1084                                                                                                                                                                          &cleanup_node,
1085                                                                                                                                                                          nodes_requested);
1086                 update_stats (nodes_requested);
1087                 GNUNET_CONTAINER_multihashmap_destroy (nodes_requested);
1088                 nodes_requested = NULL;
1089   }
1090
1091   if (NULL != nodes_active)
1092   {
1093                 GNUNET_CONTAINER_multihashmap_iterate (nodes_active,
1094                                                                                                                                                                          &cleanup_node,
1095                                                                                                                                                                          nodes_active);
1096                 update_stats (nodes_active);
1097                 GNUNET_CONTAINER_multihashmap_destroy (nodes_active);
1098                 nodes_active = NULL;
1099   }
1100
1101   if (NULL != nodes_inactive)
1102   {
1103                 GNUNET_CONTAINER_multihashmap_iterate (nodes_inactive,
1104                                                                                                                                                                          &cleanup_node,
1105                                                                                                                                                                          nodes_inactive);
1106                 update_stats (nodes_inactive);
1107                 GNUNET_CONTAINER_multihashmap_destroy (nodes_inactive);
1108                 nodes_inactive = NULL;
1109   }
1110 }
1111
1112 /* end of gnunet-daemon-experimentation_nodes.c */