new unified communication mechanism
[oweals/gnunet.git] / src / experimentation / gnunet-daemon-experimentation_nodes.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file experimentation/gnunet-daemon-experimentation_nodes.c
23  * @brief experimentation daemon: node management
24  * @author Christian Grothoff
25  * @author Matthias Wachs
26  */
27 #include "platform.h"
28 #include "gnunet_getopt_lib.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet-daemon-experimentation.h"
33
34
35 #define FAST_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
36 /**
37  * Core handle
38  */
39 static struct GNUNET_CORE_Handle *ch;
40
41
42 /**
43  * Peer's own identity
44  */
45 static struct GNUNET_PeerIdentity me;
46
47
48 /**
49  * Nodes with a pending request
50  */
51 struct GNUNET_CONTAINER_MultiHashMap *nodes_requested;
52
53
54 /**
55  * Active experimentation nodes
56  */
57 struct GNUNET_CONTAINER_MultiHashMap *nodes_active;
58
59
60 /**
61  * Inactive experimentation nodes
62  * To be excluded from future requests
63  */
64 struct GNUNET_CONTAINER_MultiHashMap *nodes_inactive;
65
66 struct NodeComCtx
67 {
68         struct NodeComCtx *prev;
69         struct NodeComCtx *next;
70
71         struct Node *n;
72         struct Experiment *e;
73
74         size_t size;
75         GNUNET_CONNECTION_TransmitReadyNotify notify;
76         void *notify_cls;
77 };
78
79
80 /**
81  * Update statistics
82  *
83  * @param m hashmap to update values from
84  */
85 static void update_stats (struct GNUNET_CONTAINER_MultiHashMap *m)
86 {
87         GNUNET_assert (NULL != m);
88         GNUNET_assert (NULL != GED_stats);
89
90         if (m == nodes_active)
91         {
92                         GNUNET_STATISTICS_set (GED_stats, "# nodes active",
93                                         GNUNET_CONTAINER_multihashmap_size(m), GNUNET_NO);
94         }
95         else if (m == nodes_inactive)
96         {
97                         GNUNET_STATISTICS_set (GED_stats, "# nodes inactive",
98                                         GNUNET_CONTAINER_multihashmap_size(m), GNUNET_NO);
99         }
100         else if (m == nodes_requested)
101         {
102                         GNUNET_STATISTICS_set (GED_stats, "# nodes requested",
103                                         GNUNET_CONTAINER_multihashmap_size(m), GNUNET_NO);
104         }
105         else
106                 GNUNET_break (0);
107
108 }
109
110
111 /**
112  * Clean up node
113  *
114  * @param cls the hashmap to clean up
115  * @param key key of the current node
116  * @param value related node object
117  * @return always GNUNET_OK
118  */
119 static int
120 cleanup_node (void *cls,
121                                                          const struct GNUNET_HashCode * key,
122                                                          void *value)
123 {
124         struct Node *n;
125         struct NodeComCtx *e_cur;
126         struct NodeComCtx *e_next;
127         struct GNUNET_CONTAINER_MultiHashMap *cur = cls;
128
129         n = value;
130         if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
131         {
132                 GNUNET_SCHEDULER_cancel (n->timeout_task);
133                 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
134         }
135
136         if (NULL != n->cth)
137         {
138                 GNUNET_CORE_notify_transmit_ready_cancel (n->cth);
139                 n->cth = NULL;
140         }
141         e_next = n->e_req_head;
142         while (NULL != (e_cur = e_next))
143         {
144                 e_next = e_cur->next;
145                 GNUNET_CONTAINER_DLL_remove (n->e_req_head, n->e_req_tail, e_cur);
146                 GNUNET_free (e_cur);
147         }
148
149         GNUNET_free_non_null (n->issuer_id);
150
151         GNUNET_CONTAINER_multihashmap_remove (cur, key, value);
152         GNUNET_free (value);
153         return GNUNET_OK;
154 }
155
156
157 /**
158  * Check if id passed is my id
159  *
160  * @param id the id to check
161  * @return GNUNET_YES or GNUNET_NO
162  */
163 static int is_me (const struct GNUNET_PeerIdentity *id)
164 {
165         if (0 == memcmp (&me, id, sizeof (me)))
166                 return GNUNET_YES;
167         else
168                 return GNUNET_NO;
169 }
170
171 /**
172  * Core startup callback
173  *
174  * @param cls unused
175  * @param server core service's server handle
176  * @param my_identity my id
177  */
178 static void
179 core_startup_handler (void *cls,
180                                                                                         struct GNUNET_CORE_Handle *server,
181                       const struct GNUNET_PeerIdentity *my_identity)
182 {
183         me = *my_identity;
184 }
185
186 void
187 schedule_transmisson (struct NodeComCtx *e_ctx);
188
189 size_t
190 transmit_read_wrapper (void *cls, size_t bufsize, void *buf)
191 {
192         struct NodeComCtx *e_ctx = cls;
193         struct NodeComCtx *next = NULL;
194
195         size_t res = e_ctx->notify (e_ctx->notify_cls, bufsize, buf);
196         e_ctx->n->cth = NULL;
197
198         GNUNET_CONTAINER_DLL_remove (e_ctx->n->e_req_head, e_ctx->n->e_req_tail, e_ctx);
199         next = e_ctx->n->e_req_head;
200         GNUNET_free (e_ctx);
201
202         if (NULL != next)
203         {
204                 /* Schedule next message */
205                 schedule_transmisson (next);
206         }
207         return res;
208 }
209
210 void
211 schedule_transmisson (struct NodeComCtx *e_ctx)
212 {
213         if (NULL != e_ctx->n->cth)
214                 return;
215
216         e_ctx->n->cth = GNUNET_CORE_notify_transmit_ready (ch, GNUNET_NO, 0, FAST_TIMEOUT,
217                         &e_ctx->n->id, e_ctx->size, transmit_read_wrapper, e_ctx);
218         if (NULL == e_ctx->n->cth)
219         {
220                 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Cannot send message to peer `%s' for experiment `%s'\n"),
221                                 GNUNET_i2s(&e_ctx->n->id), e_ctx->e->name);
222                 GNUNET_free (e_ctx);
223         }
224
225 }
226
227
228 /**
229  * Remove experimentation request due to timeout
230  *
231  * @param cls the related node
232  * @param tc scheduler's task context
233  */
234 static void
235 remove_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
236 {
237         struct Node *n = cls;
238
239         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Removing request for peer %s due to timeout\n"),
240                         GNUNET_i2s (&n->id));
241
242         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_requested, &n->id.hashPubKey))
243         {
244                         GNUNET_CONTAINER_multihashmap_remove (nodes_requested, &n->id.hashPubKey, n);
245                         update_stats (nodes_requested);
246                         GNUNET_CONTAINER_multihashmap_put (nodes_inactive, &n->id.hashPubKey, n,
247                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
248                         update_stats (nodes_inactive);
249         }
250
251         n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
252         if (NULL != n->cth)
253         {
254                 GNUNET_CORE_notify_transmit_ready_cancel (n->cth);
255                 n->cth = NULL;
256         }
257 }
258
259
260 /**
261  * Core's transmit notify callback to send request
262  *
263  * @param cls the related node
264  * @param bufsize buffer size
265  * @param buf the buffer to copy to
266  * @return bytes passed
267  */
268 size_t send_experimentation_request_cb (void *cls, size_t bufsize, void *buf)
269 {
270         struct Node *n = cls;
271         struct Experimentation_Request msg;
272         size_t msg_size = sizeof (msg);
273         size_t ri_size = sizeof (struct Experimentation_Issuer) * GSE_my_issuer_count;
274         size_t total_size = msg_size + ri_size;
275
276         memset (buf, '0', bufsize);
277         n->cth = NULL;
278   if (buf == NULL)
279   {
280     /* client disconnected */
281     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected\n");
282     if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
283                 GNUNET_SCHEDULER_cancel (n->timeout_task);
284     GNUNET_SCHEDULER_add_now (&remove_request, n);
285     return 0;
286   }
287   GNUNET_assert (bufsize >= total_size);
288
289         msg.msg.size = htons (total_size);
290         msg.msg.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_REQUEST);
291         msg.capabilities = htonl (GSE_node_capabilities);
292         msg.issuer_count = htonl (GSE_my_issuer_count);
293         memcpy (buf, &msg, msg_size);
294         memcpy (&((char *) buf)[msg_size], GSE_my_issuer, ri_size);
295
296         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending request to peer %s\n"),
297                         GNUNET_i2s (&n->id));
298         return total_size;
299 }
300
301
302 /**
303  * Send request to peer to start add him to to the set of experimentation nodes
304  *
305  * @param peer the peer to send to
306  */
307 static void send_experimentation_request (const struct GNUNET_PeerIdentity *peer)
308 {
309         struct Node *n;
310         struct NodeComCtx *e_ctx;
311         size_t size;
312         size_t c_issuers;
313
314         c_issuers = GSE_my_issuer_count;
315
316         size = sizeof (struct Experimentation_Request) +
317                                  c_issuers * sizeof (struct Experimentation_Issuer);
318         n = GNUNET_malloc (sizeof (struct Node));
319         n->id = *peer;
320         n->timeout_task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &remove_request, n);
321         n->capabilities = NONE;
322
323         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
324         e_ctx->n = n;
325         e_ctx->e = NULL;
326         e_ctx->size = size;
327         e_ctx->notify = &send_experimentation_request_cb;
328         e_ctx->notify_cls = n;
329         GNUNET_CONTAINER_DLL_insert_tail(n->e_req_head, n->e_req_tail, e_ctx);
330         schedule_transmisson (e_ctx);
331
332         GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (nodes_requested,
333                         &peer->hashPubKey, n, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
334         update_stats (nodes_requested);
335 }
336
337
338 /**
339  * Core's transmit notify callback to send response
340  *
341  * @param cls the related node
342  * @param bufsize buffer size
343  * @param buf the buffer to copy to
344  * @return bytes passed
345  */
346 size_t send_response_cb (void *cls, size_t bufsize, void *buf)
347 {
348         struct Node *n = cls;
349         struct Experimentation_Response msg;
350         size_t ri_size = GSE_my_issuer_count * sizeof (struct Experimentation_Issuer);
351         size_t msg_size = sizeof (msg);
352         size_t total_size = msg_size + ri_size;
353
354         n->cth = NULL;
355   if (buf == NULL)
356   {
357     /* client disconnected */
358     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected\n");
359     return 0;
360   }
361   GNUNET_assert (bufsize >= total_size);
362
363         msg.msg.size = htons (total_size);
364         msg.msg.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_RESPONSE);
365         msg.capabilities = htonl (GSE_node_capabilities);
366         msg.issuer_count = htonl (GSE_my_issuer_count);
367         memcpy (buf, &msg, msg_size);
368         memcpy (&((char *) buf)[msg_size], GSE_my_issuer, ri_size);
369
370         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending response to peer %s\n"),
371                         GNUNET_i2s (&n->id));
372         return total_size;
373 }
374
375
376 static void
377 get_experiments_cb (struct Node *n, struct Experiment *e)
378 {
379         static int counter = 0;
380         if (NULL == e)
381         {
382                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Added %u experiments for peer %s\n"),
383                                         counter, GNUNET_i2s (&n->id));
384                         return;
385         }
386
387         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Starting experiment `%s' with peer %s\n"),
388                         e->name,
389                         GNUNET_i2s (&n->id));
390
391         /* Tell the scheduler to add a node with an experiment */
392         GED_scheduler_add (n, e, GNUNET_YES);
393         counter ++;
394 }
395
396 struct Node *
397 get_node (const struct GNUNET_PeerIdentity *id)
398 {
399         struct Node * res;
400         struct Node * tmp;
401
402         res = NULL;
403         tmp = NULL;
404         tmp = GNUNET_CONTAINER_multihashmap_get (nodes_active, &id->hashPubKey);
405         if (res == NULL)
406                 res = tmp;
407
408         tmp = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &id->hashPubKey);
409         if (res == NULL)
410                 res = tmp;
411         else
412                 GNUNET_break (0); /* Multiple instances */
413
414         tmp = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &id->hashPubKey);
415         if (res == NULL)
416                 res = tmp;
417         else
418                 GNUNET_break (0); /* Multiple instances */
419
420         return res;
421 }
422
423
424 /**
425  * Set a specific node as active
426  *
427  * @param n the node
428  */
429 static void node_make_active (struct Node *n)
430 {
431         int c1;
432   GNUNET_CONTAINER_multihashmap_put (nodes_active,
433                         &n->id.hashPubKey, n, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
434         update_stats (nodes_active);
435         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Added peer `%s' as active node\n"),
436                         GNUNET_i2s (&n->id));
437 return;
438         /* Request experiments for this node to start them */
439         for (c1 = 0; c1 < n->issuer_count; c1++)
440         {
441
442                 GED_experiments_get (n, &n->issuer_id[c1], &get_experiments_cb);
443         }
444 }
445
446
447 /**
448  * Handle a request and send a response
449  *
450  * @param peer the source
451  * @param message the message
452  */
453 static void handle_request (const struct GNUNET_PeerIdentity *peer,
454                                                                                                                 const struct GNUNET_MessageHeader *message)
455 {
456         struct Node *n;
457         struct NodeComCtx *e_ctx;
458         struct Experimentation_Request *rm = (struct Experimentation_Request *) message;
459         struct Experimentation_Issuer *rmi = (struct Experimentation_Issuer *) &rm[1];
460         int c1;
461         int c2;
462         uint32_t ic;
463         uint32_t ic_accepted;
464         int make_active;
465
466         if (ntohs (message->size) < sizeof (struct Experimentation_Request))
467         {
468                 GNUNET_break (0);
469                 return;
470         }
471         ic = ntohl (rm->issuer_count);
472         if (ntohs (message->size) != sizeof (struct Experimentation_Request) + ic * sizeof (struct Experimentation_Issuer))
473         {
474                 GNUNET_break (0);
475                 return;
476         }
477
478         make_active = GNUNET_NO;
479         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_active, &peer->hashPubKey)))
480         {
481                         /* Nothing to do */
482         }
483         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &peer->hashPubKey)))
484         {
485                         GNUNET_CONTAINER_multihashmap_remove (nodes_requested, &peer->hashPubKey, n);
486                         if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
487                         {
488                                 GNUNET_SCHEDULER_cancel (n->timeout_task);
489                                 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
490                         }
491                         if (NULL != n->cth)
492                         {
493                                 GNUNET_CORE_notify_transmit_ready_cancel (n->cth);
494                                 n->cth = NULL;
495                         }
496                         update_stats (nodes_requested);
497                         make_active = GNUNET_YES;
498         }
499         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &peer->hashPubKey)))
500         {
501                         GNUNET_CONTAINER_multihashmap_remove (nodes_inactive, &peer->hashPubKey, n);
502                         update_stats (nodes_inactive);
503                         make_active = GNUNET_YES;
504         }
505         else
506         {
507                         /* Create new node */
508                         n = GNUNET_malloc (sizeof (struct Node));
509                         n->id = *peer;
510                         n->capabilities = NONE;
511                         make_active = GNUNET_YES;
512         }
513
514         /* Update node */
515         n->capabilities = ntohl (rm->capabilities);
516
517         /* Filter accepted issuer */
518         ic_accepted = 0;
519         for (c1 = 0; c1 < ic; c1++)
520         {
521                 if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
522                         ic_accepted ++;
523         }
524         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Request from peer `%s' with %u issuers, we accepted %u issuer \n"),
525                         GNUNET_i2s (peer), ic, ic_accepted);
526         GNUNET_free_non_null (n->issuer_id);
527         n->issuer_id = GNUNET_malloc (ic_accepted * sizeof (struct GNUNET_PeerIdentity));
528         c2 = 0;
529         for (c1 = 0; c1 < ic; c1++)
530         {
531                         if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
532                         {
533                                 n->issuer_id[c2] = rmi[c1].issuer_id;
534                                 c2 ++;
535                         }
536         }
537         n->issuer_count = ic_accepted;
538
539         if (GNUNET_YES == make_active)
540                 node_make_active (n);
541
542         /* Send response */
543         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
544         e_ctx->n = n;
545         e_ctx->e = NULL;
546         e_ctx->size = sizeof (struct Experimentation_Response) + GSE_my_issuer_count * sizeof (struct Experimentation_Issuer);
547         e_ctx->notify = &send_response_cb;
548         e_ctx->notify_cls = n;
549
550         GNUNET_CONTAINER_DLL_insert_tail(n->e_req_head, n->e_req_tail, e_ctx);
551         schedule_transmisson (e_ctx);
552 }
553
554
555 /**
556  * Handle a response
557  *
558  * @param peer the source
559  * @param message the message
560  */
561 static void handle_response (const struct GNUNET_PeerIdentity *peer,
562                                                                                                                  const struct GNUNET_MessageHeader *message)
563 {
564         struct Node *n;
565         struct Experimentation_Response *rm = (struct Experimentation_Response *) message;
566         struct Experimentation_Issuer *rmi = (struct Experimentation_Issuer *) &rm[1];
567         uint32_t ic;
568         uint32_t ic_accepted;
569         int make_active;
570         unsigned int c1;
571         unsigned int c2;
572
573         if (ntohs (message->size) < sizeof (struct Experimentation_Response))
574         {
575                 GNUNET_break (0);
576                 return;
577         }
578         ic = ntohl (rm->issuer_count);
579         if (ntohs (message->size) != sizeof (struct Experimentation_Response) + ic * sizeof (struct Experimentation_Issuer))
580         {
581                 GNUNET_break (0);
582                 return;
583         }
584
585         make_active = GNUNET_NO;
586         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_active, &peer->hashPubKey)))
587         {
588                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Received %s from %s peer `%s'\n"),
589                                         "RESPONSE", "active", GNUNET_i2s (peer));
590         }
591         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &peer->hashPubKey)))
592         {
593                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Received %s from %s peer `%s'\n"),
594                                         "RESPONSE", "requested", GNUNET_i2s (peer));
595                         GNUNET_CONTAINER_multihashmap_remove (nodes_requested, &peer->hashPubKey, n);
596                         if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
597                         {
598                                 GNUNET_SCHEDULER_cancel (n->timeout_task);
599                                 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
600                         }
601                         if (NULL != n->cth)
602                         {
603                                 GNUNET_CORE_notify_transmit_ready_cancel (n->cth);
604                                 n->cth = NULL;
605                         }
606                         update_stats (nodes_requested);
607                         make_active = GNUNET_YES;
608         }
609         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &peer->hashPubKey)))
610         {
611                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Received %s from peer `%s'\n"),
612                                         "RESPONSE", "inactive", GNUNET_i2s (peer));
613                         GNUNET_CONTAINER_multihashmap_remove (nodes_inactive, &peer->hashPubKey, n);
614                         update_stats (nodes_inactive);
615                         make_active = GNUNET_YES;
616         }
617         else
618         {
619                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Received %s from %s peer `%s'\n"),
620                                         "RESPONSE", "unknown", GNUNET_i2s (peer));
621                         return;
622         }
623
624         /* Update */
625         n->capabilities = ntohl (rm->capabilities);
626
627         /* Filter accepted issuer */
628         ic_accepted = 0;
629         for (c1 = 0; c1 < ic; c1++)
630         {
631                 if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
632                         ic_accepted ++;
633         }
634         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Response from peer `%s' with %u issuers, we accepted %u issuer \n"),
635                         GNUNET_i2s (peer), ic, ic_accepted);
636         GNUNET_free_non_null (n->issuer_id);
637         n->issuer_id = GNUNET_malloc (ic_accepted * sizeof (struct GNUNET_PeerIdentity));
638         c2 = 0;
639         for (c1 = 0; c1 < ic; c1++)
640         {
641                         if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
642                         {
643                                 n->issuer_id[c2] = rmi[c1].issuer_id;
644                                 c2 ++;
645                         }
646         }
647         n->issuer_count = ic_accepted;
648
649         if (GNUNET_YES == make_active)
650                 node_make_active (n);
651 }
652
653 /**
654  * Handle a response
655  *
656  * @param peer the source
657  * @param message the message
658  */
659 static void handle_start (const struct GNUNET_PeerIdentity *peer,
660                                                                                                                  const struct GNUNET_MessageHeader *message)
661 {
662         uint16_t size;
663         uint32_t name_len;
664         const struct GED_start_message *msg;
665         const char *name;
666         struct Node *n;
667         struct Experiment *e;
668
669         if (NULL == peer)
670         {
671                 GNUNET_break (0);
672                 return;
673         }
674         if (NULL == message)
675         {
676                 GNUNET_break (0);
677                 return;
678         }
679
680         size = ntohs (message->size);
681         if (size < sizeof (struct GED_start_message))
682         {
683                 GNUNET_break (0);
684                 return;
685         }
686         msg = (const struct GED_start_message *) message;
687         name_len = ntohl (msg->len_name);
688         if (size != sizeof (struct GED_start_message) + name_len)
689         {
690                 GNUNET_break (0);
691                 return;
692         }
693
694         n = get_node (peer);
695         if (NULL == n)
696         {
697                 GNUNET_break (0);
698                 return;
699         }
700         name = (const char *) &msg[1];
701         if (name[name_len-1] != '\0')
702         {
703                 GNUNET_break (0);
704                 return;
705         }
706
707         if (name_len != strlen (name) + 1)
708         {
709                 GNUNET_break (0);
710                 return;
711         }
712
713         e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
714         if (NULL == e)
715         {
716                 GNUNET_break (0);
717                 return;
718         }
719
720         GED_scheduler_handle_start (n, e);
721 }
722
723 /**
724  * Handle a response
725  *
726  * @param peer the source
727  * @param message the message
728  */
729 static void handle_start_ack (const struct GNUNET_PeerIdentity *peer,
730                                                                                                                  const struct GNUNET_MessageHeader *message)
731 {
732         uint16_t size;
733         uint32_t name_len;
734         const struct GED_start_ack_message *msg;
735         const char *name;
736         struct Node *n;
737         struct Experiment *e;
738
739         if (NULL == peer)
740         {
741                 GNUNET_break (0);
742                 return;
743         }
744         if (NULL == message)
745         {
746                 GNUNET_break (0);
747                 return;
748         }
749
750         size = ntohs (message->size);
751         if (size < sizeof (struct GED_start_ack_message))
752         {
753                 GNUNET_break (0);
754                 return;
755         }
756         msg = (const struct GED_start_ack_message *) message;
757         name_len = ntohl (msg->len_name);
758         if (size != sizeof (struct GED_start_message) + name_len)
759         {
760                 GNUNET_break (0);
761                 return;
762         }
763
764         n = get_node (peer);
765         if (NULL == n)
766         {
767                 GNUNET_break (0);
768                 return;
769         }
770         name = (const char *) &msg[1];
771         if (name[name_len-1] != '\0')
772         {
773                 GNUNET_break (0);
774                 return;
775         }
776
777         if (name_len != strlen (name) + 1)
778         {
779                 GNUNET_break (0);
780                 return;
781         }
782
783         e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
784         if (NULL == e)
785         {
786                 GNUNET_break (0);
787                 return;
788         }
789         GED_scheduler_handle_start_ack (n, e);
790 }
791
792 /**
793  * Handle a response
794  *
795  * @param peer the source
796  * @param message the message
797  */
798 static void handle_stop (const struct GNUNET_PeerIdentity *peer,
799                                                                                                  const struct GNUNET_MessageHeader *message)
800 {
801         uint16_t size;
802         uint32_t name_len;
803         const struct GED_stop_message *msg;
804         const char *name;
805         struct Node *n;
806         struct Experiment *e;
807
808         if (NULL == peer)
809         {
810                 GNUNET_break (0);
811                 return;
812         }
813         if (NULL == message)
814         {
815                 GNUNET_break (0);
816                 return;
817         }
818
819         size = ntohs (message->size);
820         if (size < sizeof (struct GED_stop_message))
821         {
822                 GNUNET_break (0);
823                 return;
824         }
825         msg = (const struct GED_stop_message *) message;
826         name_len = ntohl (msg->len_name);
827         if (size != sizeof (struct GED_start_message) + name_len)
828         {
829                 GNUNET_break (0);
830                 return;
831         }
832
833         n = get_node (peer);
834         if (NULL == n)
835         {
836                 GNUNET_break (0);
837                 return;
838         }
839         name = (const char *) &msg[1];
840         if (name[name_len-1] != '\0')
841         {
842                 GNUNET_break (0);
843                 return;
844         }
845
846         if (name_len != strlen (name) + 1)
847         {
848                 GNUNET_break (0);
849                 return;
850         }
851
852         e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
853         if (NULL == e)
854         {
855                 GNUNET_break (0);
856                 return;
857         }
858         GED_scheduler_handle_stop (n, e);
859 }
860
861 /**
862  * Method called whenever a given peer connects.
863  *
864  * @param cls closure
865  * @param peer peer identity this notification is about
866  */
867 void core_connect_handler (void *cls,
868                            const struct GNUNET_PeerIdentity *peer)
869 {
870         if (GNUNET_YES == is_me(peer))
871                 return;
872
873         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Connected to peer %s\n"),
874                         GNUNET_i2s (peer));
875
876         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_requested, &peer->hashPubKey))
877                 return; /* We already sent a request */
878
879         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_active, &peer->hashPubKey))
880                 return; /* This peer is known as active  */
881
882         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_inactive, &peer->hashPubKey))
883                 return; /* This peer is known as inactive  */
884
885         send_experimentation_request (peer);
886 }
887
888
889 /**
890  * Method called whenever a given peer disconnects.
891  *
892  * @param cls closure
893  * @param peer peer identity this notification is about
894  */
895 void core_disconnect_handler (void *cls,
896                            const struct GNUNET_PeerIdentity * peer)
897 {
898         struct Node *n;
899         if (GNUNET_YES == is_me(peer))
900                 return;
901
902         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Disconnected from peer %s\n"),
903                         GNUNET_i2s (peer));
904
905         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &peer->hashPubKey)))
906                 cleanup_node (nodes_requested, &peer->hashPubKey, n);
907
908         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_active, &peer->hashPubKey)))
909                 cleanup_node (nodes_active, &peer->hashPubKey, n);
910
911         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &peer->hashPubKey)))
912                 cleanup_node (nodes_inactive, &peer->hashPubKey, n);
913 }
914
915
916 /**
917  * Handle a request and send a response
918  *
919  * @param cls unused
920  * @param other the sender
921  * @param message the message
922  * @return GNUNET_OK to keep connection, GNUNET_SYSERR on error
923  */
924 static int
925 core_receive_handler (void *cls,
926                                                                                         const struct GNUNET_PeerIdentity *other,
927                                                                                         const struct GNUNET_MessageHeader *message)
928 {
929         if (ntohs (message->size) < sizeof (struct GNUNET_MessageHeader))
930         {
931                         GNUNET_break (0);
932                         return GNUNET_SYSERR;
933         }
934
935         switch (ntohs (message->type)) {
936                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_REQUEST:
937                         handle_request (other, message);
938                         break;
939                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_RESPONSE:
940                         handle_response (other, message);
941                         break;
942                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START:
943                         handle_start (other, message);
944                         break;
945                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START_ACK:
946                         handle_start_ack (other, message);
947                         break;
948                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_STOP:
949                         handle_stop (other, message);
950                         break;
951                 default:
952                         break;
953         }
954
955         return GNUNET_OK;
956 }
957
958
959 size_t node_experiment_start_cb (void *cls, size_t bufsize, void *buf)
960 {
961         struct NodeComCtx *e_ctx = cls;
962         struct GED_start_message *msg;
963         size_t name_len;
964         size_t size;
965
966         if (NULL == buf)
967                 return 0;
968
969         name_len = strlen(e_ctx->e->name) + 1;
970         size = sizeof (struct GED_start_message) + name_len;
971
972         msg = GNUNET_malloc (size);
973         msg->header.size = htons (size);
974         msg->header.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START);
975         msg->issuer = e_ctx->e->issuer;
976         msg->version_nbo = GNUNET_TIME_absolute_hton(e_ctx->e->version);
977         msg->len_name = htonl (name_len);
978         memcpy (&msg[1], e_ctx->e->name, name_len);
979
980         memcpy (buf, msg, size);
981         GNUNET_free (msg);
982         return size;
983 }
984
985 size_t node_experiment_start_ack_cb (void *cls, size_t bufsize, void *buf)
986 {
987         struct NodeComCtx *e_ctx = cls;
988         struct GED_start_ack_message *msg;
989         size_t name_len;
990         size_t size;
991         if (NULL == buf)
992                 return 0;
993
994         name_len = strlen(e_ctx->e->name) + 1;
995         size = sizeof (struct GED_start_ack_message) + name_len;
996
997         msg = GNUNET_malloc (size);
998         msg->header.size = htons (size);
999         msg->header.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START_ACK);
1000         msg->issuer = e_ctx->e->issuer;
1001         msg->version_nbo = GNUNET_TIME_absolute_hton(e_ctx->e->version);
1002         msg->len_name = htonl (name_len);
1003         memcpy (&msg[1], e_ctx->e->name, name_len);
1004
1005         memcpy (buf, msg, size);
1006         GNUNET_free (msg);
1007         return size;
1008 }
1009
1010
1011
1012
1013 /**
1014  * Confirm a experiment START with a node
1015  *
1016  * @return GNUNET_NO if core was busy with sending, GNUNET_OK otherwise
1017  */
1018 int
1019 GED_nodes_send_start_ack (struct Node *n, struct Experiment *e)
1020 {
1021         struct NodeComCtx *e_ctx;
1022
1023         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending %s for experiment request to peer `%s' for experiment `%s'\n"),
1024                         "START_ACK" ,GNUNET_i2s(&n->id), e->name);
1025
1026         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
1027         e_ctx->n = n;
1028         e_ctx->e = e;
1029         e_ctx->size = sizeof (struct GED_start_ack_message) + strlen (e->name) + 1;
1030         e_ctx->notify = &node_experiment_start_ack_cb;
1031         e_ctx->notify_cls = e_ctx;
1032
1033         GNUNET_CONTAINER_DLL_insert_tail (n->e_req_head, n->e_req_tail, e_ctx);
1034         schedule_transmisson (e_ctx);
1035         return GNUNET_OK;
1036 }
1037
1038
1039 /**
1040  * Request a experiment to start with a node
1041  *
1042  * @return GNUNET_NO if core was busy with sending, GNUNET_OK otherwise
1043  */
1044 int
1045 GED_nodes_request_start (struct Node *n, struct Experiment *e)
1046 {
1047         struct NodeComCtx *e_ctx;
1048
1049         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending %s for experiment request to peer `%s' for experiment `%s'\n"),
1050                         "START", GNUNET_i2s(&n->id), e->name);
1051
1052         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
1053         e_ctx->n = n;
1054         e_ctx->e = e;
1055         e_ctx->size = sizeof (struct GED_start_message) + strlen (e->name) + 1;
1056         e_ctx->notify = &node_experiment_start_cb;
1057         e_ctx->notify_cls = e_ctx;
1058
1059         GNUNET_CONTAINER_DLL_insert_tail (n->e_req_head, n->e_req_tail, e_ctx);
1060         schedule_transmisson (e_ctx);
1061         return GNUNET_OK;
1062 }
1063
1064
1065 /**
1066  * Start the nodes management
1067  */
1068 void
1069 GED_nodes_start ()
1070 {
1071         /* Connecting to core service to find partners */
1072         ch = GNUNET_CORE_connect (GED_cfg, NULL,
1073                                                                                                                 &core_startup_handler,
1074                                                                                                                 &core_connect_handler,
1075                                                                                                                 &core_disconnect_handler,
1076                                                                                                                 &core_receive_handler,
1077                                                                                                                 GNUNET_NO, NULL, GNUNET_NO, NULL);
1078         if (NULL == ch)
1079         {
1080                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Failed to connect to CORE service!\n"));
1081                         return;
1082         }
1083
1084         nodes_requested = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
1085         nodes_active = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
1086         nodes_inactive = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
1087 }
1088
1089
1090 /**
1091  * Stop the nodes management
1092  */
1093 void
1094 GED_nodes_stop ()
1095 {
1096   if (NULL != nodes_requested)
1097   {
1098                 GNUNET_CONTAINER_multihashmap_iterate (nodes_requested,
1099                                                                                                                                                                          &cleanup_node,
1100                                                                                                                                                                          nodes_requested);
1101                 update_stats (nodes_requested);
1102                 GNUNET_CONTAINER_multihashmap_destroy (nodes_requested);
1103                 nodes_requested = NULL;
1104   }
1105
1106   if (NULL != nodes_active)
1107   {
1108                 GNUNET_CONTAINER_multihashmap_iterate (nodes_active,
1109                                                                                                                                                                          &cleanup_node,
1110                                                                                                                                                                          nodes_active);
1111                 update_stats (nodes_active);
1112                 GNUNET_CONTAINER_multihashmap_destroy (nodes_active);
1113                 nodes_active = NULL;
1114   }
1115
1116   if (NULL != nodes_inactive)
1117   {
1118                 GNUNET_CONTAINER_multihashmap_iterate (nodes_inactive,
1119                                                                                                                                                                          &cleanup_node,
1120                                                                                                                                                                          nodes_inactive);
1121                 update_stats (nodes_inactive);
1122                 GNUNET_CONTAINER_multihashmap_destroy (nodes_inactive);
1123                 nodes_inactive = NULL;
1124   }
1125   if (NULL != ch)
1126   {
1127                 GNUNET_CORE_disconnect (ch);
1128                 ch = NULL;
1129   }
1130 }
1131
1132 /* end of gnunet-daemon-experimentation_nodes.c */