-messing up experimentation some more, towards using egos instead of peer identities
[oweals/gnunet.git] / src / experimentation / gnunet-daemon-experimentation_nodes.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file experimentation/gnunet-daemon-experimentation_nodes.c
23  * @brief experimentation daemon: node management
24  * @author Christian Grothoff
25  * @author Matthias Wachs
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_core_service.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet-daemon-experimentation.h"
32
33
34 #define FAST_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
35
36 /**
37  * Core handle
38  */
39 static struct GNUNET_CORE_Handle *ch;
40
41 /**
42  * Peer's own identity
43  */
44 static struct GNUNET_PeerIdentity me;
45
46 /**
47  * Nodes with a pending request
48  */
49 static struct GNUNET_CONTAINER_MultiPeerMap *nodes_requested;
50
51 /**
52  * Active experimentation nodes
53  */
54 static struct GNUNET_CONTAINER_MultiPeerMap *nodes_active;
55
56 /**
57  * Inactive experimentation nodes
58  * To be excluded from future requests
59  */
60 static struct GNUNET_CONTAINER_MultiPeerMap *nodes_inactive;
61
62
63 struct NodeComCtx
64 {
65   struct NodeComCtx *prev;
66   struct NodeComCtx *next;
67   
68   struct Node *n;
69   struct Experiment *e;
70   
71   size_t size;
72   GNUNET_CONNECTION_TransmitReadyNotify notify;
73   void *notify_cls;
74 };
75
76
77 /**
78  * Update statistics
79  *
80  * @param m peermap to update values from
81  */
82 static void 
83 update_stats (struct GNUNET_CONTAINER_MultiPeerMap *m)
84 {
85   GNUNET_assert (NULL != m);
86   GNUNET_assert (NULL != GED_stats);
87
88   if (m == nodes_active)
89   {
90     GNUNET_STATISTICS_set (GED_stats, "# nodes active",
91                            GNUNET_CONTAINER_multipeermap_size(m), GNUNET_NO);
92   }
93   else if (m == nodes_inactive)
94   {
95     GNUNET_STATISTICS_set (GED_stats, "# nodes inactive",
96                            GNUNET_CONTAINER_multipeermap_size(m), GNUNET_NO);
97   }
98   else if (m == nodes_requested)
99   {
100     GNUNET_STATISTICS_set (GED_stats, "# nodes requested",
101                            GNUNET_CONTAINER_multipeermap_size(m), GNUNET_NO);
102   }
103   else
104     GNUNET_break (0);
105 }
106
107
108 /**
109  * Clean up node
110  *
111  * @param cls the peermap to clean up
112  * @param key key of the current node
113  * @param value related node object
114  * @return always #GNUNET_OK
115  */
116 static int
117 cleanup_node (void *cls,
118               const struct GNUNET_PeerIdentity * key,
119               void *value)
120 {
121   struct Node *n;
122   struct NodeComCtx *e_cur;
123   struct NodeComCtx *e_next;
124   struct GNUNET_CONTAINER_MultiPeerMap *cur = cls;
125   
126   n = value;
127   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
128   {
129     GNUNET_SCHEDULER_cancel (n->timeout_task);
130     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
131   }
132   
133   if (NULL != n->cth)
134   {
135     GNUNET_CORE_notify_transmit_ready_cancel (n->cth);
136     n->cth = NULL;
137   }
138   e_next = n->e_req_head;
139   while (NULL != (e_cur = e_next))
140   {
141     e_next = e_cur->next;
142     GNUNET_CONTAINER_DLL_remove (n->e_req_head, n->e_req_tail, e_cur);
143     GNUNET_free (e_cur);
144   }
145   GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_remove (cur, key, value));
146   GNUNET_free (value);
147   return GNUNET_OK;
148 }
149
150
151 /**
152  * Check if id passed is my id
153  *
154  * @param id the id to check
155  * @return GNUNET_YES or GNUNET_NO
156  */
157 static int 
158 is_me (const struct GNUNET_PeerIdentity *id)
159 {
160   if (0 == memcmp (&me, id, sizeof (me)))
161     return GNUNET_YES;
162   else
163     return GNUNET_NO;
164 }
165
166
167 /**
168  * Core startup callback
169  *
170  * @param cls unused
171  * @param my_identity my id
172  */
173 static void
174 core_startup_handler (void *cls,
175                       const struct GNUNET_PeerIdentity *my_identity)
176 {
177   me = *my_identity;
178 }
179
180
181 static void
182 schedule_transmisson (struct NodeComCtx *e_ctx);
183
184
185 static size_t
186 transmit_read_wrapper (void *cls, size_t bufsize, void *buf)
187 {
188   struct NodeComCtx *e_ctx = cls;
189   struct NodeComCtx *next;
190   
191   size_t res = e_ctx->notify (e_ctx->notify_cls, bufsize, buf);
192   e_ctx->n->cth = NULL;
193   
194   GNUNET_CONTAINER_DLL_remove (e_ctx->n->e_req_head, e_ctx->n->e_req_tail, e_ctx);
195   next = e_ctx->n->e_req_head;
196   GNUNET_free (e_ctx);
197   
198   if (NULL != next)
199   {
200     /* Schedule next message */
201     schedule_transmisson (next);
202   }
203   return res;
204 }
205
206
207 static void
208 schedule_transmisson (struct NodeComCtx *e_ctx)
209 {
210   if (NULL != e_ctx->n->cth)
211     return;
212   
213   e_ctx->n->cth = GNUNET_CORE_notify_transmit_ready (ch, GNUNET_NO, 0, FAST_TIMEOUT,
214                                                      &e_ctx->n->id, e_ctx->size, 
215                                                      transmit_read_wrapper, e_ctx);
216   if (NULL == e_ctx->n->cth)
217   {
218     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
219                 _("Cannot send message to peer `%s' for experiment `%s'\n"),
220                 GNUNET_i2s(&e_ctx->n->id), e_ctx->e->name);
221     GNUNET_free (e_ctx);
222   }  
223 }
224
225
226 /**
227  * Remove experimentation request due to timeout
228  *
229  * @param cls the related node
230  * @param tc scheduler's task context
231  */
232 static void
233 remove_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
234 {
235   struct Node *n = cls;
236   
237   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
238               "Removing request for peer %s due to timeout\n",
239               GNUNET_i2s (&n->id));  
240   if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (nodes_requested, &n->id))
241   {
242     GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_remove (nodes_requested, &n->id, n));
243     update_stats (nodes_requested);
244     GNUNET_CONTAINER_multipeermap_put (nodes_inactive, &n->id, n,
245                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
246     update_stats (nodes_inactive);
247   }
248   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
249 }
250
251
252 static int
253 append_public_key (void *cls,
254                    const struct GNUNET_HashCode *key,
255                    void *value)
256 {
257   struct GNUNET_CRYPTO_EccPublicSignKey **issuers = cls;
258   struct Issuer *issuer = value;
259
260   *issuers[0] = issuer->pubkey;
261   *issuers = &((*issuers)[1]);
262   return GNUNET_OK;
263 }
264
265
266 /**
267  * Core's transmit notify callback to send request
268  *
269  * @param cls the related node
270  * @param bufsize buffer size
271  * @param buf the buffer to copy to
272  * @return bytes passed
273  */
274 static size_t 
275 send_experimentation_request_cb (void *cls, size_t bufsize, void *buf)
276 {
277   struct Node *n = cls;
278   struct Experimentation_Request msg;
279   unsigned int my_issuer_count = GNUNET_CONTAINER_multihashmap_size (valid_issuers);
280   size_t msg_size = sizeof (msg);
281   size_t ri_size = sizeof (struct GNUNET_CRYPTO_EccPublicSignKey) * my_issuer_count;
282   size_t total_size = msg_size + ri_size;
283   struct GNUNET_CRYPTO_EccPublicSignKey *issuers;
284         
285   n->cth = NULL;
286   if (NULL == buf)
287   {
288     /* client disconnected */
289     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
290                 "Client disconnected\n");
291     if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
292                 GNUNET_SCHEDULER_cancel (n->timeout_task);
293     GNUNET_SCHEDULER_add_now (&remove_request, n);
294     return 0;
295   }
296   GNUNET_assert (bufsize >= total_size);
297   msg.msg.size = htons (total_size);
298   msg.msg.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_REQUEST);
299   msg.capabilities = htonl (GSE_node_capabilities);
300   msg.issuer_count = htonl (my_issuer_count);
301   memcpy (buf, &msg, msg_size);
302   issuers = (struct GNUNET_CRYPTO_EccPublicSignKey *) buf + msg_size;
303   GNUNET_CONTAINER_multihashmap_iterate (valid_issuers,
304                                          &append_public_key,
305                                          &issuers);
306   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
307               _("Sending experimentation request to peer %s\n"),
308               GNUNET_i2s (&n->id));
309   return total_size;
310 }
311
312
313 /**
314  * Send request to peer to start add him to to the set of experimentation nodes
315  *
316  * @param peer the peer to send to
317  */
318 static void 
319 send_experimentation_request (const struct GNUNET_PeerIdentity *peer)
320 {
321   struct Node *n;
322   struct NodeComCtx *e_ctx;
323   size_t size;
324   size_t c_issuers;
325   
326   c_issuers = GNUNET_CONTAINER_multihashmap_size (valid_issuers);  
327   size = sizeof (struct Experimentation_Request) +
328     c_issuers * sizeof (struct GNUNET_CRYPTO_EccPublicSignKey);
329   n = GNUNET_new (struct Node);
330   n->id = *peer;
331   n->timeout_task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &remove_request, n);
332   n->capabilities = NONE;
333   
334   e_ctx = GNUNET_new (struct NodeComCtx);
335   e_ctx->n = n;
336   e_ctx->e = NULL;
337   e_ctx->size = size;
338   e_ctx->notify = &send_experimentation_request_cb;
339   e_ctx->notify_cls = n;
340   GNUNET_CONTAINER_DLL_insert_tail(n->e_req_head, n->e_req_tail, e_ctx);
341   schedule_transmisson (e_ctx);
342   
343   GNUNET_assert (GNUNET_OK == 
344                  GNUNET_CONTAINER_multipeermap_put (nodes_requested,
345                                                     peer, n, 
346                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
347   update_stats (nodes_requested);
348 }
349
350
351 /**
352  * Core's transmit notify callback to send response
353  *
354  * @param cls the related node
355  * @param bufsize buffer size
356  * @param buf the buffer to copy to
357  * @return bytes passed
358  */
359 static size_t
360 send_response_cb (void *cls, size_t bufsize, void *buf)
361 {
362   struct Node *n = cls;
363   struct Experimentation_Response msg;
364   size_t c_issuers = GNUNET_CONTAINER_multihashmap_size (valid_issuers);  
365   size_t ri_size = c_issuers * sizeof (struct GNUNET_CRYPTO_EccPublicSignKey);
366   size_t msg_size = sizeof (msg);
367   size_t total_size = msg_size + ri_size;
368   struct GNUNET_CRYPTO_EccPublicSignKey *issuers;
369   
370   n->cth = NULL;
371   if (buf == NULL)
372   {
373     /* client disconnected */
374     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
375                 "Client disconnected\n");
376     return 0;
377   }
378   GNUNET_assert (bufsize >= total_size);
379
380   msg.msg.size = htons (total_size);
381   msg.msg.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_RESPONSE);
382   msg.capabilities = htonl (GSE_node_capabilities);
383   msg.issuer_count = htonl (c_issuers);
384   memcpy (buf, &msg, msg_size);
385   issuers = (struct GNUNET_CRYPTO_EccPublicSignKey *) buf + msg_size;
386   GNUNET_CONTAINER_multihashmap_iterate (valid_issuers,
387                                          &append_public_key,
388                                          &issuers);
389   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
390               "Sending response to peer %s\n",
391               GNUNET_i2s (&n->id));
392   return total_size;
393 }
394
395
396 static void
397 get_experiments_cb (struct Node *n, struct Experiment *e)
398 {
399   static int counter = 0;
400   if (NULL == e)
401     return; /* Done */
402   
403   /* Tell the scheduler to add a node with an experiment */
404   GED_scheduler_add (n, e, GNUNET_YES);
405   counter ++;
406 }
407
408
409 struct Node *
410 get_node (const struct GNUNET_PeerIdentity *id)
411 {
412   struct Node * res;
413   struct Node * tmp;
414
415   res = NULL;
416   tmp = NULL;
417   tmp = GNUNET_CONTAINER_multipeermap_get (nodes_active, id);
418   if (res == NULL)
419     res = tmp;
420   
421   tmp = GNUNET_CONTAINER_multipeermap_get (nodes_inactive, id);
422   if (res == NULL)
423     res = tmp;
424   else
425     GNUNET_break (0); /* Multiple instances */
426   
427   tmp = GNUNET_CONTAINER_multipeermap_get (nodes_requested, id);
428   if (res == NULL)
429     res = tmp;
430   else
431     GNUNET_break (0); /* Multiple instances */
432   
433   return res;
434 }
435
436
437 /**
438  * Set a specific node as active
439  *
440  * @param n the node
441  */
442 static void 
443 node_make_active (struct Node *n)
444 {
445   int c1;
446
447   GNUNET_CONTAINER_multipeermap_put (nodes_active,
448                                      &n->id, n, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
449   update_stats (nodes_active);
450   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
451               _("Added peer `%s' as active node\n"),
452               GNUNET_i2s (&n->id));
453   /* Request experiments for this node to start them */
454   for (c1 = 0; c1 < n->issuer_count; c1++)
455   {    
456     GED_experiments_get (n, &n->issuer_id[c1], &get_experiments_cb);
457   }
458 }
459
460
461 /**
462  * Handle a request and send a response
463  *
464  * @param peer the source
465  * @param message the message
466  */
467 static void
468 handle_request (const struct GNUNET_PeerIdentity *peer,
469                 const struct GNUNET_MessageHeader *message)
470 {
471   struct Node *n;
472   struct NodeComCtx *e_ctx;
473   const struct Experimentation_Request *rm = (const struct Experimentation_Request *) message;
474   const struct Experimentation_Issuer *rmi = (const struct Experimentation_Issuer *) &rm[1];
475   int c1;
476   int c2;
477   uint32_t ic;
478   uint32_t ic_accepted;
479   int make_active;
480   
481   if (ntohs (message->size) < sizeof (struct Experimentation_Request))
482   {
483     GNUNET_break (0);
484     return;
485   }
486   ic = ntohl (rm->issuer_count);
487   if (ntohs (message->size) != sizeof (struct Experimentation_Request) + ic * sizeof (struct Experimentation_Issuer))
488   {
489     GNUNET_break (0);
490     return;
491   }
492   
493   make_active = GNUNET_NO;
494   if (NULL != (n = GNUNET_CONTAINER_multipeermap_get (nodes_active, peer)))
495   {
496     /* Nothing to do */
497   }
498   else if (NULL != (n = GNUNET_CONTAINER_multipeermap_get (nodes_requested, peer)))
499   {
500     GNUNET_CONTAINER_multipeermap_remove (nodes_requested, peer, n);
501     if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
502       {
503         GNUNET_SCHEDULER_cancel (n->timeout_task);
504         n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
505       }
506     update_stats (nodes_requested);
507     make_active = GNUNET_YES;
508   }
509   else if (NULL != (n = GNUNET_CONTAINER_multipeermap_get (nodes_inactive, peer)))
510   {
511     GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_remove (nodes_inactive, peer, n));
512     update_stats (nodes_inactive);
513     make_active = GNUNET_YES;
514   }
515   else
516   {
517     /* Create new node */
518     n = GNUNET_new (struct Node);
519     n->id = *peer;
520     n->capabilities = NONE;
521     make_active = GNUNET_YES;
522   }
523   
524   /* Update node */
525   n->capabilities = ntohl (rm->capabilities);
526   
527   /* Filter accepted issuer */
528   ic_accepted = 0;
529   for (c1 = 0; c1 < ic; c1++)
530   {
531     if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
532       ic_accepted ++;
533   }
534   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
535               "Request from peer `%s' with %u issuers, we accepted %u issuer \n",
536               GNUNET_i2s (peer), ic, ic_accepted);
537   GNUNET_free_non_null (n->issuer_id);
538   n->issuer_id = GNUNET_malloc (ic_accepted * sizeof (struct GNUNET_PeerIdentity));
539   c2 = 0;
540   for (c1 = 0; c1 < ic; c1++)
541   {
542     if (GNUNET_YES == GED_experiments_issuer_accepted (&rmi[c1].issuer_id))
543     {
544       n->issuer_id[c2] = rmi[c1].issuer_id;
545       c2 ++;
546     }
547   }
548   n->issuer_count = ic_accepted;
549   
550   if (GNUNET_YES == make_active)
551     node_make_active (n);
552   
553   /* Send response */
554   e_ctx = GNUNET_new (struct NodeComCtx);
555   e_ctx->n = n;
556   e_ctx->e = NULL;
557   e_ctx->size = sizeof (struct Experimentation_Response) + GSE_my_issuer_count * sizeof (struct Experimentation_Issuer);
558   e_ctx->notify = &send_response_cb;
559   e_ctx->notify_cls = n;
560   
561   GNUNET_CONTAINER_DLL_insert_tail(n->e_req_head, n->e_req_tail, e_ctx);
562   schedule_transmisson (e_ctx);
563 }
564
565
566 /**
567  * Handle a response
568  *
569  * @param peer the source
570  * @param message the message
571  */
572 static void handle_response (const struct GNUNET_PeerIdentity *peer,
573                              const struct GNUNET_MessageHeader *message)
574 {
575   struct Node *n;
576   const struct Experimentation_Response *rm = (const struct Experimentation_Response *) message;
577   const struct Experimentation_Issuer *rmi = (const struct Experimentation_Issuer *) &rm[1];
578   uint32_t ic;
579   uint32_t ic_accepted;
580   int make_active;
581   unsigned int c1;
582   unsigned int c2;
583   
584   if (ntohs (message->size) < sizeof (struct Experimentation_Response))
585     {
586       GNUNET_break (0);
587       return;
588     }
589   ic = ntohl (rm->issuer_count);
590   if (ntohs (message->size) != sizeof (struct Experimentation_Response) + ic * sizeof (struct Experimentation_Issuer))
591   {
592     GNUNET_break (0);
593     return;
594   }
595   
596   make_active = GNUNET_NO;
597   if (NULL != (n = GNUNET_CONTAINER_multipeermap_get (nodes_active, peer)))
598   {
599     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
600                 "Received %s from %s peer `%s'\n",
601                 "RESPONSE", "active", GNUNET_i2s (peer));
602   }
603   else if (NULL != (n = GNUNET_CONTAINER_multipeermap_get (nodes_requested, peer)))
604   {
605     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s from %s peer `%s'\n",
606                 "RESPONSE", "requested", GNUNET_i2s (peer));
607     GNUNET_CONTAINER_multipeermap_remove (nodes_requested, peer, n);
608     if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
609     {
610       GNUNET_SCHEDULER_cancel (n->timeout_task);
611       n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
612     }
613     update_stats (nodes_requested);
614     make_active = GNUNET_YES;
615   }
616   else if (NULL != (n = GNUNET_CONTAINER_multipeermap_get (nodes_inactive, peer)))
617   {
618     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
619                 "Received %s from peer `%s'\n",
620                 "RESPONSE", "inactive", GNUNET_i2s (peer));
621     GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_remove (nodes_inactive, peer, n));
622     update_stats (nodes_inactive);
623     make_active = GNUNET_YES;
624   }
625   else
626   {
627     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s from %s peer `%s'\n",
628                 "RESPONSE", "unknown", GNUNET_i2s (peer));
629     return;
630   }
631   
632   /* Update */
633   n->capabilities = ntohl (rm->capabilities);
634   
635   /* Filter accepted issuer */
636   ic_accepted = 0;
637   for (c1 = 0; c1 < ic; c1++)
638   {
639     if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
640       ic_accepted ++;
641   }
642   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
643               "Response from peer `%s' with %u issuers, we accepted %u issuer \n",
644               GNUNET_i2s (peer), ic, ic_accepted);
645   GNUNET_free_non_null (n->issuer_id);
646   n->issuer_id = GNUNET_malloc (ic_accepted * sizeof (struct GNUNET_PeerIdentity));
647   c2 = 0;
648   for (c1 = 0; c1 < ic; c1++)
649   {
650     if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
651     {
652       n->issuer_id[c2] = rmi[c1].issuer_id;
653       c2 ++;
654     }
655   }
656   n->issuer_count = ic_accepted;
657   
658   if (GNUNET_YES == make_active)
659     node_make_active (n);
660 }
661
662
663 /**
664  * Handle a response
665  *
666  * @param peer the source
667  * @param message the message
668  */
669 static void 
670 handle_start (const struct GNUNET_PeerIdentity *peer,
671               const struct GNUNET_MessageHeader *message)
672 {
673   uint16_t size;
674   uint32_t name_len;
675   const struct GED_start_message *msg;
676   const char *name;
677   struct Node *n;
678   struct Experiment *e;
679   
680   if (NULL == peer)
681   {
682     GNUNET_break (0);
683     return;
684   }
685   if (NULL == message)
686   {
687     GNUNET_break (0);
688     return;
689   }
690   
691   size = ntohs (message->size);
692   if (size < sizeof (struct GED_start_message))
693   {
694     GNUNET_break (0);
695     return;
696   }
697   msg = (const struct GED_start_message *) message;
698   name_len = ntohl (msg->len_name);
699   if (size != sizeof (struct GED_start_message) + name_len)
700   {
701     GNUNET_break (0);
702     return;
703   }
704   
705   n = get_node (peer);
706   if (NULL == n)
707   {
708     GNUNET_break (0);
709     return;
710   }
711   name = (const char *) &msg[1];
712   if (name[name_len-1] != '\0')
713   {
714     GNUNET_break (0);
715     return;
716   }  
717   if (name_len != strlen (name) + 1)
718   {
719     GNUNET_break (0);
720     return;
721   }  
722   e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
723   if (NULL == e)
724   {
725     GNUNET_break (0);
726     return;
727   }  
728   GED_scheduler_handle_start (n, e);
729 }
730
731
732 /**
733  * Handle a response
734  *
735  * @param peer the source
736  * @param message the message
737  */
738 static void
739 handle_start_ack (const struct GNUNET_PeerIdentity *peer,
740                   const struct GNUNET_MessageHeader *message)
741 {
742   uint16_t size;
743   uint32_t name_len;
744   const struct GED_start_ack_message *msg;
745   const char *name;
746   struct Node *n;
747   struct Experiment *e;
748   
749   if (NULL == peer)
750   {
751     GNUNET_break (0);
752     return;
753   }
754   if (NULL == message)
755   {
756     GNUNET_break (0);
757     return;
758   }
759   
760   size = ntohs (message->size);
761   if (size < sizeof (struct GED_start_ack_message))
762   {
763     GNUNET_break (0);
764     return;
765   }
766   msg = (const struct GED_start_ack_message *) message;
767   name_len = ntohl (msg->len_name);
768   if (size != sizeof (struct GED_start_message) + name_len)
769   {
770     GNUNET_break (0);
771     return;
772   }
773   
774   n = get_node (peer);
775   if (NULL == n)
776   {
777     GNUNET_break (0);
778     return;
779   }
780   name = (const char *) &msg[1];
781   if (name[name_len-1] != '\0')
782   {
783     GNUNET_break (0);
784     return;
785   }
786   if (name_len != strlen (name) + 1)
787   {
788     GNUNET_break (0);
789     return;
790   }
791   
792   e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
793   if (NULL == e)
794   {
795     GNUNET_break (0);
796     return;
797   }
798   GED_scheduler_handle_start_ack (n, e);
799 }
800
801
802 /**
803  * Handle a response
804  *
805  * @param peer the source
806  * @param message the message
807  */
808 static void
809 handle_stop (const struct GNUNET_PeerIdentity *peer,
810              const struct GNUNET_MessageHeader *message)
811 {
812         uint16_t size;
813         uint32_t name_len;
814         const struct GED_stop_message *msg;
815         const char *name;
816         struct Node *n;
817         struct Experiment *e;
818
819         if (NULL == peer)
820         {
821                 GNUNET_break (0);
822                 return;
823         }
824         if (NULL == message)
825         {
826                 GNUNET_break (0);
827                 return;
828         }
829
830         size = ntohs (message->size);
831         if (size < sizeof (struct GED_stop_message))
832         {
833                 GNUNET_break (0);
834                 return;
835         }
836         msg = (const struct GED_stop_message *) message;
837         name_len = ntohl (msg->len_name);
838         if (size != sizeof (struct GED_start_message) + name_len)
839         {
840                 GNUNET_break (0);
841                 return;
842         }
843
844         n = get_node (peer);
845         if (NULL == n)
846         {
847                 GNUNET_break (0);
848                 return;
849         }
850         name = (const char *) &msg[1];
851         if (name[name_len-1] != '\0')
852         {
853                 GNUNET_break (0);
854                 return;
855         }
856
857         if (name_len != strlen (name) + 1)
858         {
859                 GNUNET_break (0);
860                 return;
861         }
862
863         e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
864         if (NULL == e)
865         {
866                 GNUNET_break (0);
867                 return;
868         }
869         GED_scheduler_handle_stop (n, e);
870 }
871
872
873 /**
874  * Method called whenever a given peer connects.
875  *
876  * @param cls closure
877  * @param peer peer identity this notification is about
878  */
879 static void
880 core_connect_handler (void *cls,
881                       const struct GNUNET_PeerIdentity *peer)
882 {
883   if (GNUNET_YES == is_me(peer))
884     return;
885   
886   GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Connected to peer %s\n"),
887               GNUNET_i2s (peer));
888   
889   if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (nodes_requested, peer))
890     return; /* We already sent a request */
891   
892   if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (nodes_active, peer))
893     return; /* This peer is known as active  */
894   
895   if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (nodes_inactive, peer))
896     return; /* This peer is known as inactive  */
897   
898   send_experimentation_request (peer);
899 }
900
901
902 /**
903  * Method called whenever a given peer disconnects.
904  *
905  * @param cls closure
906  * @param peer peer identity this notification is about
907  */
908 static void
909 core_disconnect_handler (void *cls,
910                          const struct GNUNET_PeerIdentity * peer)
911 {
912         struct Node *n;
913         if (GNUNET_YES == is_me(peer))
914                 return;
915
916         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Disconnected from peer %s\n"),
917                         GNUNET_i2s (peer));
918
919         if (NULL != (n = GNUNET_CONTAINER_multipeermap_get (nodes_requested, peer)))
920                 cleanup_node (nodes_requested, peer, n);
921
922         if (NULL != (n = GNUNET_CONTAINER_multipeermap_get (nodes_active, peer)))
923                 cleanup_node (nodes_active, peer, n);
924
925         if (NULL != (n = GNUNET_CONTAINER_multipeermap_get (nodes_inactive, peer)))
926                 cleanup_node (nodes_inactive, peer, n);
927 }
928
929
930 /**
931  * Handle a request and send a response
932  *
933  * @param cls unused
934  * @param other the sender
935  * @param message the message
936  * @return GNUNET_OK to keep connection, GNUNET_SYSERR on error
937  */
938 static int
939 core_receive_handler (void *cls,
940                       const struct GNUNET_PeerIdentity *other,
941                       const struct GNUNET_MessageHeader *message)
942 {
943         if (ntohs (message->size) < sizeof (struct GNUNET_MessageHeader))
944         {
945                         GNUNET_break (0);
946                         return GNUNET_SYSERR;
947         }
948
949         switch (ntohs (message->type)) {
950                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_REQUEST:
951                         handle_request (other, message);
952                         break;
953                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_RESPONSE:
954                         handle_response (other, message);
955                         break;
956                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START:
957                         handle_start (other, message);
958                         break;
959                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START_ACK:
960                         handle_start_ack (other, message);
961                         break;
962                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_STOP:
963                         handle_stop (other, message);
964                         break;
965                 default:
966                         break;
967         }
968
969         return GNUNET_OK;
970 }
971
972
973 static size_t
974 node_experiment_start_cb (void *cls, size_t bufsize, void *buf)
975 {
976         struct NodeComCtx *e_ctx = cls;
977         struct GED_start_message *msg;
978         size_t name_len;
979         size_t size;
980
981         if (NULL == buf)
982                 return 0;
983
984         name_len = strlen(e_ctx->e->name) + 1;
985         size = sizeof (struct GED_start_message) + name_len;
986
987         msg = GNUNET_malloc (size);
988         msg->header.size = htons (size);
989         msg->header.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START);
990         msg->issuer = e_ctx->e->issuer;
991         msg->version_nbo = GNUNET_TIME_absolute_hton(e_ctx->e->version);
992         msg->len_name = htonl (name_len);
993         memcpy (&msg[1], e_ctx->e->name, name_len);
994
995         memcpy (buf, msg, size);
996         GNUNET_free (msg);
997         return size;
998 }
999
1000
1001 static size_t
1002 node_experiment_start_ack_cb (void *cls, size_t bufsize, void *buf)
1003 {
1004         struct NodeComCtx *e_ctx = cls;
1005         struct GED_start_ack_message *msg;
1006         size_t name_len;
1007         size_t size;
1008         if (NULL == buf)
1009                 return 0;
1010
1011         name_len = strlen(e_ctx->e->name) + 1;
1012         size = sizeof (struct GED_start_ack_message) + name_len;
1013
1014         msg = GNUNET_malloc (size);
1015         msg->header.size = htons (size);
1016         msg->header.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START_ACK);
1017         msg->issuer = e_ctx->e->issuer;
1018         msg->version_nbo = GNUNET_TIME_absolute_hton(e_ctx->e->version);
1019         msg->len_name = htonl (name_len);
1020         memcpy (&msg[1], e_ctx->e->name, name_len);
1021
1022         memcpy (buf, msg, size);
1023         GNUNET_free (msg);
1024         return size;
1025 }
1026
1027
1028
1029
1030 /**
1031  * Confirm a experiment START with a node
1032  *
1033  * @return GNUNET_NO if core was busy with sending, GNUNET_OK otherwise
1034  */
1035 int
1036 GED_nodes_send_start_ack (struct Node *n, struct Experiment *e)
1037 {
1038         struct NodeComCtx *e_ctx;
1039
1040         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1041                         "Sending %s for experiment request to peer `%s' for experiment `%s'\n",
1042                         "START_ACK" ,GNUNET_i2s(&n->id), e->name);
1043
1044         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
1045         e_ctx->n = n;
1046         e_ctx->e = e;
1047         e_ctx->size = sizeof (struct GED_start_ack_message) + strlen (e->name) + 1;
1048         e_ctx->notify = &node_experiment_start_ack_cb;
1049         e_ctx->notify_cls = e_ctx;
1050
1051         GNUNET_CONTAINER_DLL_insert_tail (n->e_req_head, n->e_req_tail, e_ctx);
1052         schedule_transmisson (e_ctx);
1053         return GNUNET_OK;
1054 }
1055
1056
1057 /**
1058  * Request a experiment to start with a node
1059  *
1060  * @return GNUNET_NO if core was busy with sending, GNUNET_OK otherwise
1061  */
1062 int
1063 GED_nodes_send_start (struct Node *n, struct Experiment *e)
1064 {
1065         struct NodeComCtx *e_ctx;
1066
1067         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1068                         "Sending %s for experiment request to peer `%s' for experiment `%s'\n",
1069                         "START", GNUNET_i2s(&n->id), e->name);
1070
1071         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
1072         e_ctx->n = n;
1073         e_ctx->e = e;
1074         e_ctx->size = sizeof (struct GED_start_message) + strlen (e->name) + 1;
1075         e_ctx->notify = &node_experiment_start_cb;
1076         e_ctx->notify_cls = e_ctx;
1077
1078         GNUNET_CONTAINER_DLL_insert_tail (n->e_req_head, n->e_req_tail, e_ctx);
1079         schedule_transmisson (e_ctx);
1080         return GNUNET_OK;
1081 }
1082
1083
1084 /**
1085  * Start the nodes management
1086  */
1087 void
1088 GED_nodes_start ()
1089 {
1090         /* Connecting to core service to find partners */
1091         ch = GNUNET_CORE_connect (GED_cfg, NULL,
1092                                                                                                                 &core_startup_handler,
1093                                                                                                                 &core_connect_handler,
1094                                                                                                                 &core_disconnect_handler,
1095                                                                                                                 &core_receive_handler,
1096                                                                                                                 GNUNET_NO, NULL, GNUNET_NO, NULL);
1097         if (NULL == ch)
1098         {
1099                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Failed to connect to CORE service!\n"));
1100                         return;
1101         }
1102
1103         nodes_requested = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1104         nodes_active = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1105         nodes_inactive = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1106 }
1107
1108
1109 /**
1110  * Stop the nodes management
1111  */
1112 void
1113 GED_nodes_stop ()
1114 {
1115   if (NULL != ch)
1116   {
1117                 GNUNET_CORE_disconnect (ch);
1118                 ch = NULL;
1119   }
1120
1121   if (NULL != nodes_requested)
1122   {
1123                 GNUNET_CONTAINER_multipeermap_iterate (nodes_requested,
1124                                                                                                                                                                          &cleanup_node,
1125                                                                                                                                                                          nodes_requested);
1126                 update_stats (nodes_requested);
1127                 GNUNET_CONTAINER_multipeermap_destroy (nodes_requested);
1128                 nodes_requested = NULL;
1129   }
1130
1131   if (NULL != nodes_active)
1132   {
1133                 GNUNET_CONTAINER_multipeermap_iterate (nodes_active,
1134                                                                                                                                                                          &cleanup_node,
1135                                                                                                                                                                          nodes_active);
1136                 update_stats (nodes_active);
1137                 GNUNET_CONTAINER_multipeermap_destroy (nodes_active);
1138                 nodes_active = NULL;
1139   }
1140
1141   if (NULL != nodes_inactive)
1142   {
1143                 GNUNET_CONTAINER_multipeermap_iterate (nodes_inactive,
1144                                                                                                                                                                          &cleanup_node,
1145                                                                                                                                                                          nodes_inactive);
1146                 update_stats (nodes_inactive);
1147                 GNUNET_CONTAINER_multipeermap_destroy (nodes_inactive);
1148                 nodes_inactive = NULL;
1149   }
1150 }
1151
1152 /* end of gnunet-daemon-experimentation_nodes.c */