removing remenants of abstract unix domain socket handling, this finishes addressing...
[oweals/gnunet.git] / src / experimentation / gnunet-daemon-experimentation_nodes.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file experimentation/gnunet-daemon-experimentation_nodes.c
23  * @brief experimentation daemon: node management
24  * @author Christian Grothoff
25  * @author Matthias Wachs
26  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_core_service.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet-daemon-experimentation.h"
32
33
34 #define FAST_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
35
36 /**
37  * Core handle
38  */
39 static struct GNUNET_CORE_Handle *ch;
40
41 /**
42  * Peer's own identity
43  */
44 static struct GNUNET_PeerIdentity me;
45
46 /**
47  * Nodes with a pending request
48  */
49 static struct GNUNET_CONTAINER_MultiPeerMap *nodes_requested;
50
51 /**
52  * Active experimentation nodes
53  */
54 static struct GNUNET_CONTAINER_MultiPeerMap *nodes_active;
55
56 /**
57  * Inactive experimentation nodes
58  * To be excluded from future requests
59  */
60 static struct GNUNET_CONTAINER_MultiPeerMap *nodes_inactive;
61
62
63 struct NodeComCtx
64 {
65   struct NodeComCtx *prev;
66   struct NodeComCtx *next;
67
68   struct Node *n;
69   struct Experiment *e;
70
71   size_t size;
72   GNUNET_CONNECTION_TransmitReadyNotify notify;
73   void *notify_cls;
74 };
75
76
77 /**
78  * Update statistics
79  *
80  * @param m peermap to update values from
81  */
82 static void
83 update_stats (struct GNUNET_CONTAINER_MultiPeerMap *m)
84 {
85   GNUNET_assert (NULL != m);
86   GNUNET_assert (NULL != GED_stats);
87
88   if (m == nodes_active)
89   {
90     GNUNET_STATISTICS_set (GED_stats, "# nodes active",
91                            GNUNET_CONTAINER_multipeermap_size(m), GNUNET_NO);
92   }
93   else if (m == nodes_inactive)
94   {
95     GNUNET_STATISTICS_set (GED_stats, "# nodes inactive",
96                            GNUNET_CONTAINER_multipeermap_size(m), GNUNET_NO);
97   }
98   else if (m == nodes_requested)
99   {
100     GNUNET_STATISTICS_set (GED_stats, "# nodes requested",
101                            GNUNET_CONTAINER_multipeermap_size(m), GNUNET_NO);
102   }
103   else
104     GNUNET_break (0);
105 }
106
107
108 /**
109  * Clean up node
110  *
111  * @param cls the peermap to clean up
112  * @param key key of the current node
113  * @param value related node object
114  * @return always #GNUNET_OK
115  */
116 static int
117 cleanup_node (void *cls,
118               const struct GNUNET_PeerIdentity * key,
119               void *value)
120 {
121   struct Node *n;
122   struct NodeComCtx *e_cur;
123   struct NodeComCtx *e_next;
124   struct GNUNET_CONTAINER_MultiPeerMap *cur = cls;
125
126   n = value;
127   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
128   {
129     GNUNET_SCHEDULER_cancel (n->timeout_task);
130     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
131   }
132
133   if (NULL != n->cth)
134   {
135     GNUNET_CORE_notify_transmit_ready_cancel (n->cth);
136     n->cth = NULL;
137   }
138   e_next = n->e_req_head;
139   while (NULL != (e_cur = e_next))
140   {
141     e_next = e_cur->next;
142     GNUNET_CONTAINER_DLL_remove (n->e_req_head, n->e_req_tail, e_cur);
143     GNUNET_free (e_cur);
144   }
145   GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_remove (cur, key, value));
146   GNUNET_free (value);
147   return GNUNET_OK;
148 }
149
150
151 /**
152  * Check if id passed is my id
153  *
154  * @param id the id to check
155  * @return GNUNET_YES or GNUNET_NO
156  */
157 static int
158 is_me (const struct GNUNET_PeerIdentity *id)
159 {
160   if (0 == memcmp (&me, id, sizeof (me)))
161     return GNUNET_YES;
162   else
163     return GNUNET_NO;
164 }
165
166
167 /**
168  * Core startup callback
169  *
170  * @param cls unused
171  * @param my_identity my id
172  */
173 static void
174 core_startup_handler (void *cls,
175                       const struct GNUNET_PeerIdentity *my_identity)
176 {
177   me = *my_identity;
178 }
179
180
181 static void
182 schedule_transmisson (struct NodeComCtx *e_ctx);
183
184
185 static size_t
186 transmit_read_wrapper (void *cls, size_t bufsize, void *buf)
187 {
188   struct NodeComCtx *e_ctx = cls;
189   struct NodeComCtx *next;
190
191   size_t res = e_ctx->notify (e_ctx->notify_cls, bufsize, buf);
192   e_ctx->n->cth = NULL;
193
194   GNUNET_CONTAINER_DLL_remove (e_ctx->n->e_req_head, e_ctx->n->e_req_tail, e_ctx);
195   next = e_ctx->n->e_req_head;
196   GNUNET_free (e_ctx);
197
198   if (NULL != next)
199   {
200     /* Schedule next message */
201     schedule_transmisson (next);
202   }
203   return res;
204 }
205
206
207 static void
208 schedule_transmisson (struct NodeComCtx *e_ctx)
209 {
210   if (NULL != e_ctx->n->cth)
211     return;
212
213   e_ctx->n->cth = GNUNET_CORE_notify_transmit_ready (ch, GNUNET_NO, 0, FAST_TIMEOUT,
214                                                      &e_ctx->n->id, e_ctx->size,
215                                                      transmit_read_wrapper, e_ctx);
216   if (NULL == e_ctx->n->cth)
217   {
218     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
219                 _("Cannot send message to peer `%s' for experiment `%s'\n"),
220                 GNUNET_i2s(&e_ctx->n->id), e_ctx->e->name);
221     GNUNET_free (e_ctx);
222   }
223 }
224
225
226 /**
227  * Remove experimentation request due to timeout
228  *
229  * @param cls the related node
230  * @param tc scheduler's task context
231  */
232 static void
233 remove_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
234 {
235   struct Node *n = cls;
236
237   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
238               "Removing request for peer %s due to timeout\n",
239               GNUNET_i2s (&n->id));
240   if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (nodes_requested, &n->id))
241   {
242     GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_remove (nodes_requested, &n->id, n));
243     update_stats (nodes_requested);
244     GNUNET_CONTAINER_multipeermap_put (nodes_inactive, &n->id, n,
245                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
246     update_stats (nodes_inactive);
247   }
248   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
249 }
250
251
252 static int
253 append_public_key (void *cls,
254                    const struct GNUNET_HashCode *key,
255                    void *value)
256 {
257   struct GNUNET_CRYPTO_EccPublicSignKey **issuers = cls;
258   struct Issuer *issuer = value;
259
260   *issuers[0] = issuer->pubkey;
261   *issuers = &((*issuers)[1]);
262   return GNUNET_OK;
263 }
264
265
266 /**
267  * Core's transmit notify callback to send request
268  *
269  * @param cls the related node
270  * @param bufsize buffer size
271  * @param buf the buffer to copy to
272  * @return bytes passed
273  */
274 static size_t
275 send_experimentation_request_cb (void *cls, size_t bufsize, void *buf)
276 {
277   struct Node *n = cls;
278   struct Experimentation_Request msg;
279   unsigned int my_issuer_count = GNUNET_CONTAINER_multihashmap_size (valid_issuers);
280   size_t msg_size = sizeof (msg);
281   size_t ri_size = sizeof (struct GNUNET_CRYPTO_EccPublicSignKey) * my_issuer_count;
282   size_t total_size = msg_size + ri_size;
283   struct GNUNET_CRYPTO_EccPublicSignKey *issuers;
284         
285   n->cth = NULL;
286   if (NULL == buf)
287   {
288     /* client disconnected */
289     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
290                 "Client disconnected\n");
291     if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
292                 GNUNET_SCHEDULER_cancel (n->timeout_task);
293     GNUNET_SCHEDULER_add_now (&remove_request, n);
294     return 0;
295   }
296   GNUNET_assert (bufsize >= total_size);
297   msg.msg.size = htons (total_size);
298   msg.msg.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_REQUEST);
299   msg.capabilities = htonl (GSE_node_capabilities);
300   msg.issuer_count = htonl (my_issuer_count);
301   memcpy (buf, &msg, msg_size);
302   issuers = (struct GNUNET_CRYPTO_EccPublicSignKey *) buf + msg_size;
303   GNUNET_CONTAINER_multihashmap_iterate (valid_issuers,
304                                          &append_public_key,
305                                          &issuers);
306   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
307               _("Sending experimentation request to peer %s\n"),
308               GNUNET_i2s (&n->id));
309   return total_size;
310 }
311
312
313 /**
314  * Send request to peer to start add him to to the set of experimentation nodes
315  *
316  * @param peer the peer to send to
317  */
318 static void
319 send_experimentation_request (const struct GNUNET_PeerIdentity *peer)
320 {
321   struct Node *n;
322   struct NodeComCtx *e_ctx;
323   size_t size;
324   size_t c_issuers;
325
326   c_issuers = GNUNET_CONTAINER_multihashmap_size (valid_issuers);
327   size = sizeof (struct Experimentation_Request) +
328     c_issuers * sizeof (struct GNUNET_CRYPTO_EccPublicSignKey);
329   n = GNUNET_new (struct Node);
330   n->id = *peer;
331   n->timeout_task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &remove_request, n);
332   n->capabilities = NONE;
333
334   e_ctx = GNUNET_new (struct NodeComCtx);
335   e_ctx->n = n;
336   e_ctx->e = NULL;
337   e_ctx->size = size;
338   e_ctx->notify = &send_experimentation_request_cb;
339   e_ctx->notify_cls = n;
340   GNUNET_CONTAINER_DLL_insert_tail(n->e_req_head, n->e_req_tail, e_ctx);
341   schedule_transmisson (e_ctx);
342
343   GNUNET_assert (GNUNET_OK ==
344                  GNUNET_CONTAINER_multipeermap_put (nodes_requested,
345                                                     peer, n,
346                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
347   update_stats (nodes_requested);
348 }
349
350
351 /**
352  * Core's transmit notify callback to send response
353  *
354  * @param cls the related node
355  * @param bufsize buffer size
356  * @param buf the buffer to copy to
357  * @return bytes passed
358  */
359 static size_t
360 send_response_cb (void *cls, size_t bufsize, void *buf)
361 {
362   struct Node *n = cls;
363   struct Experimentation_Response msg;
364   size_t c_issuers = GNUNET_CONTAINER_multihashmap_size (valid_issuers);
365   size_t ri_size = c_issuers * sizeof (struct GNUNET_CRYPTO_EccPublicSignKey);
366   size_t msg_size = sizeof (msg);
367   size_t total_size = msg_size + ri_size;
368   struct GNUNET_CRYPTO_EccPublicSignKey *issuers;
369
370   n->cth = NULL;
371   if (buf == NULL)
372   {
373     /* client disconnected */
374     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
375                 "Client disconnected\n");
376     return 0;
377   }
378   GNUNET_assert (bufsize >= total_size);
379
380   msg.msg.size = htons (total_size);
381   msg.msg.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_RESPONSE);
382   msg.capabilities = htonl (GSE_node_capabilities);
383   msg.issuer_count = htonl (c_issuers);
384   memcpy (buf, &msg, msg_size);
385   issuers = (struct GNUNET_CRYPTO_EccPublicSignKey *) buf + msg_size;
386   GNUNET_CONTAINER_multihashmap_iterate (valid_issuers,
387                                          &append_public_key,
388                                          &issuers);
389   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
390               "Sending response to peer %s\n",
391               GNUNET_i2s (&n->id));
392   return total_size;
393 }
394
395
396 static void
397 get_experiments_cb (struct Node *n, struct Experiment *e)
398 {
399   static int counter = 0;
400   if (NULL == e)
401     return; /* Done */
402
403   /* Tell the scheduler to add a node with an experiment */
404   GED_scheduler_add (n, e, GNUNET_YES);
405   counter ++;
406 }
407
408
409 struct Node *
410 get_node (const struct GNUNET_PeerIdentity *id)
411 {
412   struct Node * res;
413   struct Node * tmp;
414
415   res = NULL;
416   tmp = NULL;
417   tmp = GNUNET_CONTAINER_multipeermap_get (nodes_active, id);
418   if (res == NULL)
419     res = tmp;
420
421   tmp = GNUNET_CONTAINER_multipeermap_get (nodes_inactive, id);
422   if (res == NULL)
423     res = tmp;
424   else
425     GNUNET_break (0); /* Multiple instances */
426
427   tmp = GNUNET_CONTAINER_multipeermap_get (nodes_requested, id);
428   if (res == NULL)
429     res = tmp;
430   else
431     GNUNET_break (0); /* Multiple instances */
432
433   return res;
434 }
435
436
437 /**
438  * Set a specific node as active
439  *
440  * @param n the node
441  */
442 static void
443 node_make_active (struct Node *n)
444 {
445   int c1;
446
447   GNUNET_CONTAINER_multipeermap_put (nodes_active,
448                                      &n->id, n, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
449   update_stats (nodes_active);
450   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
451               _("Added peer `%s' as active node\n"),
452               GNUNET_i2s (&n->id));
453   /* Request experiments for this node to start them */
454   for (c1 = 0; c1 < n->issuer_count; c1++)
455   {
456     GED_experiments_get (n, &n->issuer_id[c1], &get_experiments_cb);
457   }
458 }
459
460
461 /**
462  * Handle a request and send a response
463  *
464  * @param peer the source
465  * @param message the message
466  */
467 static void
468 handle_request (const struct GNUNET_PeerIdentity *peer,
469                 const struct GNUNET_MessageHeader *message)
470 {
471   struct Node *n;
472   struct NodeComCtx *e_ctx;
473   const struct Experimentation_Request *rm = (const struct Experimentation_Request *) message;
474   const struct GNUNET_CRYPTO_EccPublicSignKey *rmi = (const struct GNUNET_CRYPTO_EccPublicSignKey *) &rm[1];
475   unsigned int my_issuer_count = GNUNET_CONTAINER_multihashmap_size (valid_issuers);
476   int c1;
477   int c2;
478   uint32_t ic;
479   uint32_t ic_accepted;
480   int make_active;
481
482   if (ntohs (message->size) < sizeof (struct Experimentation_Request))
483   {
484     GNUNET_break (0);
485     return;
486   }
487   ic = ntohl (rm->issuer_count);
488   if (ntohs (message->size) !=
489       sizeof (struct Experimentation_Request) + ic * sizeof (struct GNUNET_CRYPTO_EccPublicSignKey))
490   {
491     GNUNET_break (0);
492     return;
493   }
494
495   make_active = GNUNET_NO;
496   if (NULL != (n = GNUNET_CONTAINER_multipeermap_get (nodes_active, peer)))
497   {
498     /* Nothing to do */
499   }
500   else if (NULL != (n = GNUNET_CONTAINER_multipeermap_get (nodes_requested, peer)))
501   {
502     GNUNET_CONTAINER_multipeermap_remove (nodes_requested, peer, n);
503     if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
504       {
505         GNUNET_SCHEDULER_cancel (n->timeout_task);
506         n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
507       }
508     update_stats (nodes_requested);
509     make_active = GNUNET_YES;
510   }
511   else if (NULL != (n = GNUNET_CONTAINER_multipeermap_get (nodes_inactive, peer)))
512   {
513     GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_remove (nodes_inactive, peer, n));
514     update_stats (nodes_inactive);
515     make_active = GNUNET_YES;
516   }
517   else
518   {
519     /* Create new node */
520     n = GNUNET_new (struct Node);
521     n->id = *peer;
522     n->capabilities = NONE;
523     make_active = GNUNET_YES;
524   }
525
526   /* Update node */
527   n->capabilities = ntohl (rm->capabilities);
528
529   /* Filter accepted issuer */
530   ic_accepted = 0;
531   for (c1 = 0; c1 < ic; c1++)
532   {
533     if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1]))
534       ic_accepted ++;
535   }
536   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
537               "Request from peer `%s' with %u issuers, we accepted %u issuer \n",
538               GNUNET_i2s (peer), ic, ic_accepted);
539   GNUNET_free_non_null (n->issuer_id);
540   n->issuer_id = GNUNET_malloc (ic_accepted * sizeof (struct GNUNET_CRYPTO_EccPublicSignKey));
541   c2 = 0;
542   for (c1 = 0; c1 < ic; c1++)
543   {
544     if (GNUNET_YES == GED_experiments_issuer_accepted (&rmi[c1]))
545     {
546       n->issuer_id[c2] = rmi[c1];
547       c2 ++;
548     }
549   }
550   n->issuer_count = ic_accepted;
551
552   if (GNUNET_YES == make_active)
553     node_make_active (n);
554
555   /* Send response */
556   e_ctx = GNUNET_new (struct NodeComCtx);
557   e_ctx->n = n;
558   e_ctx->e = NULL;
559   e_ctx->size = sizeof (struct Experimentation_Response) +
560     my_issuer_count * sizeof (struct GNUNET_CRYPTO_EccPublicSignKey);
561   e_ctx->notify = &send_response_cb;
562   e_ctx->notify_cls = n;
563
564   GNUNET_CONTAINER_DLL_insert_tail(n->e_req_head, n->e_req_tail, e_ctx);
565   schedule_transmisson (e_ctx);
566 }
567
568
569 /**
570  * Handle a response
571  *
572  * @param peer the source
573  * @param message the message
574  */
575 static void handle_response (const struct GNUNET_PeerIdentity *peer,
576                              const struct GNUNET_MessageHeader *message)
577 {
578   struct Node *n;
579   const struct Experimentation_Response *rm = (const struct Experimentation_Response *) message;
580   const struct GNUNET_CRYPTO_EccPublicSignKey *rmi = (const struct GNUNET_CRYPTO_EccPublicSignKey *) &rm[1];
581   uint32_t ic;
582   uint32_t ic_accepted;
583   int make_active;
584   unsigned int c1;
585   unsigned int c2;
586
587   if (ntohs (message->size) < sizeof (struct Experimentation_Response))
588     {
589       GNUNET_break (0);
590       return;
591     }
592   ic = ntohl (rm->issuer_count);
593   if (ntohs (message->size) != sizeof (struct Experimentation_Response) + ic * sizeof (struct GNUNET_CRYPTO_EccPublicSignKey))
594   {
595     GNUNET_break (0);
596     return;
597   }
598
599   make_active = GNUNET_NO;
600   if (NULL != (n = GNUNET_CONTAINER_multipeermap_get (nodes_active, peer)))
601   {
602     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
603                 "Received %s from %s peer `%s'\n",
604                 "RESPONSE", "active", GNUNET_i2s (peer));
605   }
606   else if (NULL != (n = GNUNET_CONTAINER_multipeermap_get (nodes_requested, peer)))
607   {
608     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s from %s peer `%s'\n",
609                 "RESPONSE", "requested", GNUNET_i2s (peer));
610     GNUNET_CONTAINER_multipeermap_remove (nodes_requested, peer, n);
611     if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
612     {
613       GNUNET_SCHEDULER_cancel (n->timeout_task);
614       n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
615     }
616     update_stats (nodes_requested);
617     make_active = GNUNET_YES;
618   }
619   else if (NULL != (n = GNUNET_CONTAINER_multipeermap_get (nodes_inactive, peer)))
620   {
621     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
622                 "Received %s from peer `%s'\n",
623                 "RESPONSE", "inactive", GNUNET_i2s (peer));
624     GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_remove (nodes_inactive, peer, n));
625     update_stats (nodes_inactive);
626     make_active = GNUNET_YES;
627   }
628   else
629   {
630     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s from %s peer `%s'\n",
631                 "RESPONSE", "unknown", GNUNET_i2s (peer));
632     return;
633   }
634
635   /* Update */
636   n->capabilities = ntohl (rm->capabilities);
637
638   /* Filter accepted issuer */
639   ic_accepted = 0;
640   for (c1 = 0; c1 < ic; c1++)
641   {
642     if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1]))
643       ic_accepted ++;
644   }
645   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
646               "Response from peer `%s' with %u issuers, we accepted %u issuer \n",
647               GNUNET_i2s (peer), ic, ic_accepted);
648   GNUNET_free_non_null (n->issuer_id);
649   n->issuer_id = GNUNET_malloc (ic_accepted * sizeof (struct GNUNET_PeerIdentity));
650   c2 = 0;
651   for (c1 = 0; c1 < ic; c1++)
652   {
653     if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1]))
654     {
655       n->issuer_id[c2] = rmi[c1];
656       c2 ++;
657     }
658   }
659   n->issuer_count = ic_accepted;
660
661   if (GNUNET_YES == make_active)
662     node_make_active (n);
663 }
664
665
666 /**
667  * Handle a response
668  *
669  * @param peer the source
670  * @param message the message
671  */
672 static void
673 handle_start (const struct GNUNET_PeerIdentity *peer,
674               const struct GNUNET_MessageHeader *message)
675 {
676   uint16_t size;
677   uint32_t name_len;
678   const struct GED_start_message *msg;
679   const char *name;
680   struct Node *n;
681   struct Experiment *e;
682
683   if (NULL == peer)
684   {
685     GNUNET_break (0);
686     return;
687   }
688   if (NULL == message)
689   {
690     GNUNET_break (0);
691     return;
692   }
693
694   size = ntohs (message->size);
695   if (size < sizeof (struct GED_start_message))
696   {
697     GNUNET_break (0);
698     return;
699   }
700   msg = (const struct GED_start_message *) message;
701   name_len = ntohl (msg->len_name);
702   if (size != sizeof (struct GED_start_message) + name_len)
703   {
704     GNUNET_break (0);
705     return;
706   }
707
708   n = get_node (peer);
709   if (NULL == n)
710   {
711     GNUNET_break (0);
712     return;
713   }
714   name = (const char *) &msg[1];
715   if (name[name_len-1] != '\0')
716   {
717     GNUNET_break (0);
718     return;
719   }
720   if (name_len != strlen (name) + 1)
721   {
722     GNUNET_break (0);
723     return;
724   }
725   e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
726   if (NULL == e)
727   {
728     GNUNET_break (0);
729     return;
730   }
731   GED_scheduler_handle_start (n, e);
732 }
733
734
735 /**
736  * Handle a response
737  *
738  * @param peer the source
739  * @param message the message
740  */
741 static void
742 handle_start_ack (const struct GNUNET_PeerIdentity *peer,
743                   const struct GNUNET_MessageHeader *message)
744 {
745   uint16_t size;
746   uint32_t name_len;
747   const struct GED_start_ack_message *msg;
748   const char *name;
749   struct Node *n;
750   struct Experiment *e;
751
752   if (NULL == peer)
753   {
754     GNUNET_break (0);
755     return;
756   }
757   if (NULL == message)
758   {
759     GNUNET_break (0);
760     return;
761   }
762
763   size = ntohs (message->size);
764   if (size < sizeof (struct GED_start_ack_message))
765   {
766     GNUNET_break (0);
767     return;
768   }
769   msg = (const struct GED_start_ack_message *) message;
770   name_len = ntohl (msg->len_name);
771   if (size != sizeof (struct GED_start_message) + name_len)
772   {
773     GNUNET_break (0);
774     return;
775   }
776
777   n = get_node (peer);
778   if (NULL == n)
779   {
780     GNUNET_break (0);
781     return;
782   }
783   name = (const char *) &msg[1];
784   if (name[name_len-1] != '\0')
785   {
786     GNUNET_break (0);
787     return;
788   }
789   if (name_len != strlen (name) + 1)
790   {
791     GNUNET_break (0);
792     return;
793   }
794
795   e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
796   if (NULL == e)
797   {
798     GNUNET_break (0);
799     return;
800   }
801   GED_scheduler_handle_start_ack (n, e);
802 }
803
804
805 /**
806  * Handle a response
807  *
808  * @param peer the source
809  * @param message the message
810  */
811 static void
812 handle_stop (const struct GNUNET_PeerIdentity *peer,
813              const struct GNUNET_MessageHeader *message)
814 {
815         uint16_t size;
816         uint32_t name_len;
817         const struct GED_stop_message *msg;
818         const char *name;
819         struct Node *n;
820         struct Experiment *e;
821
822         if (NULL == peer)
823         {
824                 GNUNET_break (0);
825                 return;
826         }
827         if (NULL == message)
828         {
829                 GNUNET_break (0);
830                 return;
831         }
832
833         size = ntohs (message->size);
834         if (size < sizeof (struct GED_stop_message))
835         {
836                 GNUNET_break (0);
837                 return;
838         }
839         msg = (const struct GED_stop_message *) message;
840         name_len = ntohl (msg->len_name);
841         if (size != sizeof (struct GED_start_message) + name_len)
842         {
843                 GNUNET_break (0);
844                 return;
845         }
846
847         n = get_node (peer);
848         if (NULL == n)
849         {
850                 GNUNET_break (0);
851                 return;
852         }
853         name = (const char *) &msg[1];
854         if (name[name_len-1] != '\0')
855         {
856                 GNUNET_break (0);
857                 return;
858         }
859
860         if (name_len != strlen (name) + 1)
861         {
862                 GNUNET_break (0);
863                 return;
864         }
865
866         e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
867         if (NULL == e)
868         {
869                 GNUNET_break (0);
870                 return;
871         }
872         GED_scheduler_handle_stop (n, e);
873 }
874
875
876 /**
877  * Method called whenever a given peer connects.
878  *
879  * @param cls closure
880  * @param peer peer identity this notification is about
881  */
882 static void
883 core_connect_handler (void *cls,
884                       const struct GNUNET_PeerIdentity *peer)
885 {
886   if (GNUNET_YES == is_me(peer))
887     return;
888
889   GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Connected to peer %s\n"),
890               GNUNET_i2s (peer));
891
892   if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (nodes_requested, peer))
893     return; /* We already sent a request */
894
895   if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (nodes_active, peer))
896     return; /* This peer is known as active  */
897
898   if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (nodes_inactive, peer))
899     return; /* This peer is known as inactive  */
900
901   send_experimentation_request (peer);
902 }
903
904
905 /**
906  * Method called whenever a given peer disconnects.
907  *
908  * @param cls closure
909  * @param peer peer identity this notification is about
910  */
911 static void
912 core_disconnect_handler (void *cls,
913                          const struct GNUNET_PeerIdentity * peer)
914 {
915         struct Node *n;
916         if (GNUNET_YES == is_me(peer))
917                 return;
918
919         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Disconnected from peer %s\n"),
920                         GNUNET_i2s (peer));
921
922         if (NULL != (n = GNUNET_CONTAINER_multipeermap_get (nodes_requested, peer)))
923                 cleanup_node (nodes_requested, peer, n);
924
925         if (NULL != (n = GNUNET_CONTAINER_multipeermap_get (nodes_active, peer)))
926                 cleanup_node (nodes_active, peer, n);
927
928         if (NULL != (n = GNUNET_CONTAINER_multipeermap_get (nodes_inactive, peer)))
929                 cleanup_node (nodes_inactive, peer, n);
930 }
931
932
933 /**
934  * Handle a request and send a response
935  *
936  * @param cls unused
937  * @param other the sender
938  * @param message the message
939  * @return GNUNET_OK to keep connection, GNUNET_SYSERR on error
940  */
941 static int
942 core_receive_handler (void *cls,
943                       const struct GNUNET_PeerIdentity *other,
944                       const struct GNUNET_MessageHeader *message)
945 {
946         if (ntohs (message->size) < sizeof (struct GNUNET_MessageHeader))
947         {
948                         GNUNET_break (0);
949                         return GNUNET_SYSERR;
950         }
951
952         switch (ntohs (message->type)) {
953                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_REQUEST:
954                         handle_request (other, message);
955                         break;
956                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_RESPONSE:
957                         handle_response (other, message);
958                         break;
959                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START:
960                         handle_start (other, message);
961                         break;
962                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START_ACK:
963                         handle_start_ack (other, message);
964                         break;
965                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_STOP:
966                         handle_stop (other, message);
967                         break;
968                 default:
969                         break;
970         }
971
972         return GNUNET_OK;
973 }
974
975
976 static size_t
977 node_experiment_start_cb (void *cls, size_t bufsize, void *buf)
978 {
979         struct NodeComCtx *e_ctx = cls;
980         struct GED_start_message *msg;
981         size_t name_len;
982         size_t size;
983
984         if (NULL == buf)
985                 return 0;
986
987         name_len = strlen(e_ctx->e->name) + 1;
988         size = sizeof (struct GED_start_message) + name_len;
989
990         msg = GNUNET_malloc (size);
991         msg->header.size = htons (size);
992         msg->header.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START);
993         msg->issuer = e_ctx->e->issuer;
994         msg->version_nbo = GNUNET_TIME_absolute_hton(e_ctx->e->version);
995         msg->len_name = htonl (name_len);
996         memcpy (&msg[1], e_ctx->e->name, name_len);
997
998         memcpy (buf, msg, size);
999         GNUNET_free (msg);
1000         return size;
1001 }
1002
1003
1004 static size_t
1005 node_experiment_start_ack_cb (void *cls, size_t bufsize, void *buf)
1006 {
1007         struct NodeComCtx *e_ctx = cls;
1008         struct GED_start_ack_message *msg;
1009         size_t name_len;
1010         size_t size;
1011         if (NULL == buf)
1012                 return 0;
1013
1014         name_len = strlen(e_ctx->e->name) + 1;
1015         size = sizeof (struct GED_start_ack_message) + name_len;
1016
1017         msg = GNUNET_malloc (size);
1018         msg->header.size = htons (size);
1019         msg->header.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START_ACK);
1020         msg->issuer = e_ctx->e->issuer;
1021         msg->version_nbo = GNUNET_TIME_absolute_hton(e_ctx->e->version);
1022         msg->len_name = htonl (name_len);
1023         memcpy (&msg[1], e_ctx->e->name, name_len);
1024
1025         memcpy (buf, msg, size);
1026         GNUNET_free (msg);
1027         return size;
1028 }
1029
1030
1031
1032
1033 /**
1034  * Confirm a experiment START with a node
1035  *
1036  * @return GNUNET_NO if core was busy with sending, GNUNET_OK otherwise
1037  */
1038 int
1039 GED_nodes_send_start_ack (struct Node *n, struct Experiment *e)
1040 {
1041         struct NodeComCtx *e_ctx;
1042
1043         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1044                         "Sending %s for experiment request to peer `%s' for experiment `%s'\n",
1045                         "START_ACK" ,GNUNET_i2s(&n->id), e->name);
1046
1047         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
1048         e_ctx->n = n;
1049         e_ctx->e = e;
1050         e_ctx->size = sizeof (struct GED_start_ack_message) + strlen (e->name) + 1;
1051         e_ctx->notify = &node_experiment_start_ack_cb;
1052         e_ctx->notify_cls = e_ctx;
1053
1054         GNUNET_CONTAINER_DLL_insert_tail (n->e_req_head, n->e_req_tail, e_ctx);
1055         schedule_transmisson (e_ctx);
1056         return GNUNET_OK;
1057 }
1058
1059
1060 /**
1061  * Request a experiment to start with a node
1062  *
1063  * @return GNUNET_NO if core was busy with sending, GNUNET_OK otherwise
1064  */
1065 int
1066 GED_nodes_send_start (struct Node *n, struct Experiment *e)
1067 {
1068         struct NodeComCtx *e_ctx;
1069
1070         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1071                         "Sending %s for experiment request to peer `%s' for experiment `%s'\n",
1072                         "START", GNUNET_i2s(&n->id), e->name);
1073
1074         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
1075         e_ctx->n = n;
1076         e_ctx->e = e;
1077         e_ctx->size = sizeof (struct GED_start_message) + strlen (e->name) + 1;
1078         e_ctx->notify = &node_experiment_start_cb;
1079         e_ctx->notify_cls = e_ctx;
1080
1081         GNUNET_CONTAINER_DLL_insert_tail (n->e_req_head, n->e_req_tail, e_ctx);
1082         schedule_transmisson (e_ctx);
1083         return GNUNET_OK;
1084 }
1085
1086
1087 /**
1088  * Start the nodes management
1089  */
1090 void
1091 GED_nodes_start ()
1092 {
1093         /* Connecting to core service to find partners */
1094         ch = GNUNET_CORE_connect (GED_cfg, NULL,
1095                                                                                                                 &core_startup_handler,
1096                                                                                                                 &core_connect_handler,
1097                                                                                                                 &core_disconnect_handler,
1098                                                                                                                 &core_receive_handler,
1099                                                                                                                 GNUNET_NO, NULL, GNUNET_NO, NULL);
1100         if (NULL == ch)
1101         {
1102                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Failed to connect to CORE service!\n"));
1103                         return;
1104         }
1105
1106         nodes_requested = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1107         nodes_active = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1108         nodes_inactive = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1109 }
1110
1111
1112 /**
1113  * Stop the nodes management
1114  */
1115 void
1116 GED_nodes_stop ()
1117 {
1118   if (NULL != ch)
1119   {
1120                 GNUNET_CORE_disconnect (ch);
1121                 ch = NULL;
1122   }
1123
1124   if (NULL != nodes_requested)
1125   {
1126                 GNUNET_CONTAINER_multipeermap_iterate (nodes_requested,
1127                                                                                                                                                                          &cleanup_node,
1128                                                                                                                                                                          nodes_requested);
1129                 update_stats (nodes_requested);
1130                 GNUNET_CONTAINER_multipeermap_destroy (nodes_requested);
1131                 nodes_requested = NULL;
1132   }
1133
1134   if (NULL != nodes_active)
1135   {
1136                 GNUNET_CONTAINER_multipeermap_iterate (nodes_active,
1137                                                                                                                                                                          &cleanup_node,
1138                                                                                                                                                                          nodes_active);
1139                 update_stats (nodes_active);
1140                 GNUNET_CONTAINER_multipeermap_destroy (nodes_active);
1141                 nodes_active = NULL;
1142   }
1143
1144   if (NULL != nodes_inactive)
1145   {
1146                 GNUNET_CONTAINER_multipeermap_iterate (nodes_inactive,
1147                                                                                                                                                                          &cleanup_node,
1148                                                                                                                                                                          nodes_inactive);
1149                 update_stats (nodes_inactive);
1150                 GNUNET_CONTAINER_multipeermap_destroy (nodes_inactive);
1151                 nodes_inactive = NULL;
1152   }
1153 }
1154
1155 /* end of gnunet-daemon-experimentation_nodes.c */