fixes for list queues
[oweals/gnunet.git] / src / experimentation / gnunet-daemon-experimentation_nodes.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file experimentation/gnunet-daemon-experimentation_nodes.c
23  * @brief experimentation daemon: node management
24  * @author Christian Grothoff
25  * @author Matthias Wachs
26  */
27 #include "platform.h"
28 #include "gnunet_getopt_lib.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet-daemon-experimentation.h"
33
34
35 #define FAST_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
36 /**
37  * Core handle
38  */
39 static struct GNUNET_CORE_Handle *ch;
40
41
42 /**
43  * Peer's own identity
44  */
45 static struct GNUNET_PeerIdentity me;
46
47
48 /**
49  * Nodes with a pending request
50  */
51 struct GNUNET_CONTAINER_MultiHashMap *nodes_requested;
52
53
54 /**
55  * Active experimentation nodes
56  */
57 struct GNUNET_CONTAINER_MultiHashMap *nodes_active;
58
59
60 /**
61  * Inactive experimentation nodes
62  * To be excluded from future requests
63  */
64 struct GNUNET_CONTAINER_MultiHashMap *nodes_inactive;
65
66 struct NodeComCtx
67 {
68         struct NodeComCtx *prev;
69         struct NodeComCtx *next;
70
71         struct Node *n;
72         struct Experiment *e;
73
74         size_t size;
75         GNUNET_CONNECTION_TransmitReadyNotify notify;
76         void *notify_cls;
77 };
78
79
80 /**
81  * Update statistics
82  *
83  * @param m hashmap to update values from
84  */
85 static void update_stats (struct GNUNET_CONTAINER_MultiHashMap *m)
86 {
87         GNUNET_assert (NULL != m);
88         GNUNET_assert (NULL != GED_stats);
89
90         if (m == nodes_active)
91         {
92                         GNUNET_STATISTICS_set (GED_stats, "# nodes active",
93                                         GNUNET_CONTAINER_multihashmap_size(m), GNUNET_NO);
94         }
95         else if (m == nodes_inactive)
96         {
97                         GNUNET_STATISTICS_set (GED_stats, "# nodes inactive",
98                                         GNUNET_CONTAINER_multihashmap_size(m), GNUNET_NO);
99         }
100         else if (m == nodes_requested)
101         {
102                         GNUNET_STATISTICS_set (GED_stats, "# nodes requested",
103                                         GNUNET_CONTAINER_multihashmap_size(m), GNUNET_NO);
104         }
105         else
106                 GNUNET_break (0);
107
108 }
109
110
111 /**
112  * Clean up node
113  *
114  * @param cls the hashmap to clean up
115  * @param key key of the current node
116  * @param value related node object
117  * @return always GNUNET_OK
118  */
119 static int
120 cleanup_node (void *cls,
121                                                          const struct GNUNET_HashCode * key,
122                                                          void *value)
123 {
124         struct Node *n;
125         struct NodeComCtx *e_cur;
126         struct NodeComCtx *e_next;
127         struct GNUNET_CONTAINER_MultiHashMap *cur = cls;
128
129         n = value;
130         if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
131         {
132                 GNUNET_SCHEDULER_cancel (n->timeout_task);
133                 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
134         }
135
136         if (NULL != n->cth)
137         {
138                 GNUNET_CORE_notify_transmit_ready_cancel (n->cth);
139                 n->cth = NULL;
140         }
141         e_next = n->e_req_head;
142         while (NULL != (e_cur = e_next))
143         {
144                 e_next = e_cur->next;
145                 GNUNET_CONTAINER_DLL_remove (n->e_req_head, n->e_req_tail, e_cur);
146                 GNUNET_free (e_cur);
147         }
148
149         GNUNET_free_non_null (n->issuer_id);
150
151         GNUNET_CONTAINER_multihashmap_remove (cur, key, value);
152         GNUNET_free (value);
153         return GNUNET_OK;
154 }
155
156
157 /**
158  * Check if id passed is my id
159  *
160  * @param id the id to check
161  * @return GNUNET_YES or GNUNET_NO
162  */
163 static int is_me (const struct GNUNET_PeerIdentity *id)
164 {
165         if (0 == memcmp (&me, id, sizeof (me)))
166                 return GNUNET_YES;
167         else
168                 return GNUNET_NO;
169 }
170
171 /**
172  * Core startup callback
173  *
174  * @param cls unused
175  * @param server core service's server handle
176  * @param my_identity my id
177  */
178 static void
179 core_startup_handler (void *cls,
180                                                                                         struct GNUNET_CORE_Handle *server,
181                       const struct GNUNET_PeerIdentity *my_identity)
182 {
183         me = *my_identity;
184 }
185
186 void
187 schedule_transmisson (struct NodeComCtx *e_ctx);
188
189 size_t
190 transmit_read_wrapper (void *cls, size_t bufsize, void *buf)
191 {
192         struct NodeComCtx *e_ctx = cls;
193         struct NodeComCtx *next = NULL;
194
195         size_t res = e_ctx->notify (e_ctx->notify_cls, bufsize, buf);
196         e_ctx->n->cth = NULL;
197
198         GNUNET_CONTAINER_DLL_remove (e_ctx->n->e_req_head, e_ctx->n->e_req_tail, e_ctx);
199         next = e_ctx->n->e_req_head;
200         GNUNET_free (e_ctx);
201
202         if (NULL != next)
203         {
204                 /* Schedule next message */
205                 schedule_transmisson (next);
206         }
207         return res;
208 }
209
210 void
211 schedule_transmisson (struct NodeComCtx *e_ctx)
212 {
213         if (NULL != e_ctx->n->cth)
214                 return;
215
216         e_ctx->n->cth = GNUNET_CORE_notify_transmit_ready (ch, GNUNET_NO, 0, FAST_TIMEOUT,
217                         &e_ctx->n->id, e_ctx->size, transmit_read_wrapper, e_ctx);
218         if (NULL == e_ctx->n->cth)
219         {
220                 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Cannot send message to peer `%s' for experiment `%s'\n"),
221                                 GNUNET_i2s(&e_ctx->n->id), e_ctx->e->name);
222                 GNUNET_free (e_ctx);
223         }
224
225 }
226
227
228 /**
229  * Remove experimentation request due to timeout
230  *
231  * @param cls the related node
232  * @param tc scheduler's task context
233  */
234 static void
235 remove_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
236 {
237         struct Node *n = cls;
238
239         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Removing request for peer %s due to timeout\n"),
240                         GNUNET_i2s (&n->id));
241
242         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_requested, &n->id.hashPubKey))
243         {
244                         GNUNET_CONTAINER_multihashmap_remove (nodes_requested, &n->id.hashPubKey, n);
245                         update_stats (nodes_requested);
246                         GNUNET_CONTAINER_multihashmap_put (nodes_inactive, &n->id.hashPubKey, n,
247                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
248                         update_stats (nodes_inactive);
249         }
250         n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
251 }
252
253
254 /**
255  * Core's transmit notify callback to send request
256  *
257  * @param cls the related node
258  * @param bufsize buffer size
259  * @param buf the buffer to copy to
260  * @return bytes passed
261  */
262 size_t send_experimentation_request_cb (void *cls, size_t bufsize, void *buf)
263 {
264         struct Node *n = cls;
265         struct Experimentation_Request msg;
266         size_t msg_size = sizeof (msg);
267         size_t ri_size = sizeof (struct Experimentation_Issuer) * GSE_my_issuer_count;
268         size_t total_size = msg_size + ri_size;
269
270         memset (buf, '0', bufsize);
271         n->cth = NULL;
272   if (buf == NULL)
273   {
274     /* client disconnected */
275     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected\n");
276     if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
277                 GNUNET_SCHEDULER_cancel (n->timeout_task);
278     GNUNET_SCHEDULER_add_now (&remove_request, n);
279     return 0;
280   }
281   GNUNET_assert (bufsize >= total_size);
282
283         msg.msg.size = htons (total_size);
284         msg.msg.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_REQUEST);
285         msg.capabilities = htonl (GSE_node_capabilities);
286         msg.issuer_count = htonl (GSE_my_issuer_count);
287         memcpy (buf, &msg, msg_size);
288         memcpy (&((char *) buf)[msg_size], GSE_my_issuer, ri_size);
289
290         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending request to peer %s\n"),
291                         GNUNET_i2s (&n->id));
292         return total_size;
293 }
294
295
296 /**
297  * Send request to peer to start add him to to the set of experimentation nodes
298  *
299  * @param peer the peer to send to
300  */
301 static void send_experimentation_request (const struct GNUNET_PeerIdentity *peer)
302 {
303         struct Node *n;
304         struct NodeComCtx *e_ctx;
305         size_t size;
306         size_t c_issuers;
307
308         c_issuers = GSE_my_issuer_count;
309
310         size = sizeof (struct Experimentation_Request) +
311                                  c_issuers * sizeof (struct Experimentation_Issuer);
312         n = GNUNET_malloc (sizeof (struct Node));
313         n->id = *peer;
314         n->timeout_task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &remove_request, n);
315         n->capabilities = NONE;
316
317         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
318         e_ctx->n = n;
319         e_ctx->e = NULL;
320         e_ctx->size = size;
321         e_ctx->notify = &send_experimentation_request_cb;
322         e_ctx->notify_cls = n;
323         GNUNET_CONTAINER_DLL_insert_tail(n->e_req_head, n->e_req_tail, e_ctx);
324         schedule_transmisson (e_ctx);
325
326         GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (nodes_requested,
327                         &peer->hashPubKey, n, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
328         update_stats (nodes_requested);
329 }
330
331
332 /**
333  * Core's transmit notify callback to send response
334  *
335  * @param cls the related node
336  * @param bufsize buffer size
337  * @param buf the buffer to copy to
338  * @return bytes passed
339  */
340 size_t send_response_cb (void *cls, size_t bufsize, void *buf)
341 {
342         struct Node *n = cls;
343         struct Experimentation_Response msg;
344         size_t ri_size = GSE_my_issuer_count * sizeof (struct Experimentation_Issuer);
345         size_t msg_size = sizeof (msg);
346         size_t total_size = msg_size + ri_size;
347
348         n->cth = NULL;
349   if (buf == NULL)
350   {
351     /* client disconnected */
352     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected\n");
353     return 0;
354   }
355   GNUNET_assert (bufsize >= total_size);
356
357         msg.msg.size = htons (total_size);
358         msg.msg.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_RESPONSE);
359         msg.capabilities = htonl (GSE_node_capabilities);
360         msg.issuer_count = htonl (GSE_my_issuer_count);
361         memcpy (buf, &msg, msg_size);
362         memcpy (&((char *) buf)[msg_size], GSE_my_issuer, ri_size);
363
364         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending response to peer %s\n"),
365                         GNUNET_i2s (&n->id));
366         return total_size;
367 }
368
369
370 static void
371 get_experiments_cb (struct Node *n, struct Experiment *e)
372 {
373         static int counter = 0;
374         if (NULL == e)
375         {
376                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Added %u experiments for peer %s\n"),
377                                         counter, GNUNET_i2s (&n->id));
378                         return;
379         }
380
381         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Starting experiment `%s' with peer %s\n"),
382                         e->name,
383                         GNUNET_i2s (&n->id));
384
385         /* Tell the scheduler to add a node with an experiment */
386         GED_scheduler_add (n, e, GNUNET_YES);
387         counter ++;
388 }
389
390 struct Node *
391 get_node (const struct GNUNET_PeerIdentity *id)
392 {
393         struct Node * res;
394         struct Node * tmp;
395
396         res = NULL;
397         tmp = NULL;
398         tmp = GNUNET_CONTAINER_multihashmap_get (nodes_active, &id->hashPubKey);
399         if (res == NULL)
400                 res = tmp;
401
402         tmp = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &id->hashPubKey);
403         if (res == NULL)
404                 res = tmp;
405         else
406                 GNUNET_break (0); /* Multiple instances */
407
408         tmp = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &id->hashPubKey);
409         if (res == NULL)
410                 res = tmp;
411         else
412                 GNUNET_break (0); /* Multiple instances */
413
414         return res;
415 }
416
417
418 /**
419  * Set a specific node as active
420  *
421  * @param n the node
422  */
423 static void node_make_active (struct Node *n)
424 {
425         int c1;
426   GNUNET_CONTAINER_multihashmap_put (nodes_active,
427                         &n->id.hashPubKey, n, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
428         update_stats (nodes_active);
429         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Added peer `%s' as active node\n"),
430                         GNUNET_i2s (&n->id));
431         /* Request experiments for this node to start them */
432         for (c1 = 0; c1 < n->issuer_count; c1++)
433         {
434
435                 GED_experiments_get (n, &n->issuer_id[c1], &get_experiments_cb);
436         }
437 }
438
439
440 /**
441  * Handle a request and send a response
442  *
443  * @param peer the source
444  * @param message the message
445  */
446 static void handle_request (const struct GNUNET_PeerIdentity *peer,
447                                                                                                                 const struct GNUNET_MessageHeader *message)
448 {
449         struct Node *n;
450         struct NodeComCtx *e_ctx;
451         struct Experimentation_Request *rm = (struct Experimentation_Request *) message;
452         struct Experimentation_Issuer *rmi = (struct Experimentation_Issuer *) &rm[1];
453         int c1;
454         int c2;
455         uint32_t ic;
456         uint32_t ic_accepted;
457         int make_active;
458
459         if (ntohs (message->size) < sizeof (struct Experimentation_Request))
460         {
461                 GNUNET_break (0);
462                 return;
463         }
464         ic = ntohl (rm->issuer_count);
465         if (ntohs (message->size) != sizeof (struct Experimentation_Request) + ic * sizeof (struct Experimentation_Issuer))
466         {
467                 GNUNET_break (0);
468                 return;
469         }
470
471         make_active = GNUNET_NO;
472         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_active, &peer->hashPubKey)))
473         {
474                         /* Nothing to do */
475         }
476         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &peer->hashPubKey)))
477         {
478                         GNUNET_CONTAINER_multihashmap_remove (nodes_requested, &peer->hashPubKey, n);
479                         if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
480                         {
481                                 GNUNET_SCHEDULER_cancel (n->timeout_task);
482                                 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
483                         }
484                         update_stats (nodes_requested);
485                         make_active = GNUNET_YES;
486         }
487         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &peer->hashPubKey)))
488         {
489                         GNUNET_CONTAINER_multihashmap_remove (nodes_inactive, &peer->hashPubKey, n);
490                         update_stats (nodes_inactive);
491                         make_active = GNUNET_YES;
492         }
493         else
494         {
495                         /* Create new node */
496                         n = GNUNET_malloc (sizeof (struct Node));
497                         n->id = *peer;
498                         n->capabilities = NONE;
499                         make_active = GNUNET_YES;
500         }
501
502         /* Update node */
503         n->capabilities = ntohl (rm->capabilities);
504
505         /* Filter accepted issuer */
506         ic_accepted = 0;
507         for (c1 = 0; c1 < ic; c1++)
508         {
509                 if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
510                         ic_accepted ++;
511         }
512         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Request from peer `%s' with %u issuers, we accepted %u issuer \n"),
513                         GNUNET_i2s (peer), ic, ic_accepted);
514         GNUNET_free_non_null (n->issuer_id);
515         n->issuer_id = GNUNET_malloc (ic_accepted * sizeof (struct GNUNET_PeerIdentity));
516         c2 = 0;
517         for (c1 = 0; c1 < ic; c1++)
518         {
519                         if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
520                         {
521                                 n->issuer_id[c2] = rmi[c1].issuer_id;
522                                 c2 ++;
523                         }
524         }
525         n->issuer_count = ic_accepted;
526
527         if (GNUNET_YES == make_active)
528                 node_make_active (n);
529
530         /* Send response */
531         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
532         e_ctx->n = n;
533         e_ctx->e = NULL;
534         e_ctx->size = sizeof (struct Experimentation_Response) + GSE_my_issuer_count * sizeof (struct Experimentation_Issuer);
535         e_ctx->notify = &send_response_cb;
536         e_ctx->notify_cls = n;
537
538         GNUNET_CONTAINER_DLL_insert_tail(n->e_req_head, n->e_req_tail, e_ctx);
539         schedule_transmisson (e_ctx);
540 }
541
542
543 /**
544  * Handle a response
545  *
546  * @param peer the source
547  * @param message the message
548  */
549 static void handle_response (const struct GNUNET_PeerIdentity *peer,
550                                                                                                                  const struct GNUNET_MessageHeader *message)
551 {
552         struct Node *n;
553         struct Experimentation_Response *rm = (struct Experimentation_Response *) message;
554         struct Experimentation_Issuer *rmi = (struct Experimentation_Issuer *) &rm[1];
555         uint32_t ic;
556         uint32_t ic_accepted;
557         int make_active;
558         unsigned int c1;
559         unsigned int c2;
560
561         if (ntohs (message->size) < sizeof (struct Experimentation_Response))
562         {
563                 GNUNET_break (0);
564                 return;
565         }
566         ic = ntohl (rm->issuer_count);
567         if (ntohs (message->size) != sizeof (struct Experimentation_Response) + ic * sizeof (struct Experimentation_Issuer))
568         {
569                 GNUNET_break (0);
570                 return;
571         }
572
573         make_active = GNUNET_NO;
574         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_active, &peer->hashPubKey)))
575         {
576                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Received %s from %s peer `%s'\n"),
577                                         "RESPONSE", "active", GNUNET_i2s (peer));
578         }
579         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &peer->hashPubKey)))
580         {
581                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Received %s from %s peer `%s'\n"),
582                                         "RESPONSE", "requested", GNUNET_i2s (peer));
583                         GNUNET_CONTAINER_multihashmap_remove (nodes_requested, &peer->hashPubKey, n);
584                         if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
585                         {
586                                 GNUNET_SCHEDULER_cancel (n->timeout_task);
587                                 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
588                         }
589                         update_stats (nodes_requested);
590                         make_active = GNUNET_YES;
591         }
592         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &peer->hashPubKey)))
593         {
594                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Received %s from peer `%s'\n"),
595                                         "RESPONSE", "inactive", GNUNET_i2s (peer));
596                         GNUNET_CONTAINER_multihashmap_remove (nodes_inactive, &peer->hashPubKey, n);
597                         update_stats (nodes_inactive);
598                         make_active = GNUNET_YES;
599         }
600         else
601         {
602                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Received %s from %s peer `%s'\n"),
603                                         "RESPONSE", "unknown", GNUNET_i2s (peer));
604                         return;
605         }
606
607         /* Update */
608         n->capabilities = ntohl (rm->capabilities);
609
610         /* Filter accepted issuer */
611         ic_accepted = 0;
612         for (c1 = 0; c1 < ic; c1++)
613         {
614                 if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
615                         ic_accepted ++;
616         }
617         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Response from peer `%s' with %u issuers, we accepted %u issuer \n"),
618                         GNUNET_i2s (peer), ic, ic_accepted);
619         GNUNET_free_non_null (n->issuer_id);
620         n->issuer_id = GNUNET_malloc (ic_accepted * sizeof (struct GNUNET_PeerIdentity));
621         c2 = 0;
622         for (c1 = 0; c1 < ic; c1++)
623         {
624                         if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
625                         {
626                                 n->issuer_id[c2] = rmi[c1].issuer_id;
627                                 c2 ++;
628                         }
629         }
630         n->issuer_count = ic_accepted;
631
632         if (GNUNET_YES == make_active)
633                 node_make_active (n);
634 }
635
636 /**
637  * Handle a response
638  *
639  * @param peer the source
640  * @param message the message
641  */
642 static void handle_start (const struct GNUNET_PeerIdentity *peer,
643                                                                                                                  const struct GNUNET_MessageHeader *message)
644 {
645         uint16_t size;
646         uint32_t name_len;
647         const struct GED_start_message *msg;
648         const char *name;
649         struct Node *n;
650         struct Experiment *e;
651
652         if (NULL == peer)
653         {
654                 GNUNET_break (0);
655                 return;
656         }
657         if (NULL == message)
658         {
659                 GNUNET_break (0);
660                 return;
661         }
662
663         size = ntohs (message->size);
664         if (size < sizeof (struct GED_start_message))
665         {
666                 GNUNET_break (0);
667                 return;
668         }
669         msg = (const struct GED_start_message *) message;
670         name_len = ntohl (msg->len_name);
671         if (size != sizeof (struct GED_start_message) + name_len)
672         {
673                 GNUNET_break (0);
674                 return;
675         }
676
677         n = get_node (peer);
678         if (NULL == n)
679         {
680                 GNUNET_break (0);
681                 return;
682         }
683         name = (const char *) &msg[1];
684         if (name[name_len-1] != '\0')
685         {
686                 GNUNET_break (0);
687                 return;
688         }
689
690         if (name_len != strlen (name) + 1)
691         {
692                 GNUNET_break (0);
693                 return;
694         }
695
696         e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
697         if (NULL == e)
698         {
699                 GNUNET_break (0);
700                 return;
701         }
702
703         GED_scheduler_handle_start (n, e);
704 }
705
706 /**
707  * Handle a response
708  *
709  * @param peer the source
710  * @param message the message
711  */
712 static void handle_start_ack (const struct GNUNET_PeerIdentity *peer,
713                                                                                                                  const struct GNUNET_MessageHeader *message)
714 {
715         uint16_t size;
716         uint32_t name_len;
717         const struct GED_start_ack_message *msg;
718         const char *name;
719         struct Node *n;
720         struct Experiment *e;
721
722         if (NULL == peer)
723         {
724                 GNUNET_break (0);
725                 return;
726         }
727         if (NULL == message)
728         {
729                 GNUNET_break (0);
730                 return;
731         }
732
733         size = ntohs (message->size);
734         if (size < sizeof (struct GED_start_ack_message))
735         {
736                 GNUNET_break (0);
737                 return;
738         }
739         msg = (const struct GED_start_ack_message *) message;
740         name_len = ntohl (msg->len_name);
741         if (size != sizeof (struct GED_start_message) + name_len)
742         {
743                 GNUNET_break (0);
744                 return;
745         }
746
747         n = get_node (peer);
748         if (NULL == n)
749         {
750                 GNUNET_break (0);
751                 return;
752         }
753         name = (const char *) &msg[1];
754         if (name[name_len-1] != '\0')
755         {
756                 GNUNET_break (0);
757                 return;
758         }
759
760         if (name_len != strlen (name) + 1)
761         {
762                 GNUNET_break (0);
763                 return;
764         }
765
766         e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
767         if (NULL == e)
768         {
769                 GNUNET_break (0);
770                 return;
771         }
772         GED_scheduler_handle_start_ack (n, e);
773 }
774
775 /**
776  * Handle a response
777  *
778  * @param peer the source
779  * @param message the message
780  */
781 static void handle_stop (const struct GNUNET_PeerIdentity *peer,
782                                                                                                  const struct GNUNET_MessageHeader *message)
783 {
784         uint16_t size;
785         uint32_t name_len;
786         const struct GED_stop_message *msg;
787         const char *name;
788         struct Node *n;
789         struct Experiment *e;
790
791         if (NULL == peer)
792         {
793                 GNUNET_break (0);
794                 return;
795         }
796         if (NULL == message)
797         {
798                 GNUNET_break (0);
799                 return;
800         }
801
802         size = ntohs (message->size);
803         if (size < sizeof (struct GED_stop_message))
804         {
805                 GNUNET_break (0);
806                 return;
807         }
808         msg = (const struct GED_stop_message *) message;
809         name_len = ntohl (msg->len_name);
810         if (size != sizeof (struct GED_start_message) + name_len)
811         {
812                 GNUNET_break (0);
813                 return;
814         }
815
816         n = get_node (peer);
817         if (NULL == n)
818         {
819                 GNUNET_break (0);
820                 return;
821         }
822         name = (const char *) &msg[1];
823         if (name[name_len-1] != '\0')
824         {
825                 GNUNET_break (0);
826                 return;
827         }
828
829         if (name_len != strlen (name) + 1)
830         {
831                 GNUNET_break (0);
832                 return;
833         }
834
835         e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
836         if (NULL == e)
837         {
838                 GNUNET_break (0);
839                 return;
840         }
841         GED_scheduler_handle_stop (n, e);
842 }
843
844 /**
845  * Method called whenever a given peer connects.
846  *
847  * @param cls closure
848  * @param peer peer identity this notification is about
849  */
850 void core_connect_handler (void *cls,
851                            const struct GNUNET_PeerIdentity *peer)
852 {
853         if (GNUNET_YES == is_me(peer))
854                 return;
855
856         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Connected to peer %s\n"),
857                         GNUNET_i2s (peer));
858
859         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_requested, &peer->hashPubKey))
860                 return; /* We already sent a request */
861
862         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_active, &peer->hashPubKey))
863                 return; /* This peer is known as active  */
864
865         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_inactive, &peer->hashPubKey))
866                 return; /* This peer is known as inactive  */
867
868         send_experimentation_request (peer);
869 }
870
871
872 /**
873  * Method called whenever a given peer disconnects.
874  *
875  * @param cls closure
876  * @param peer peer identity this notification is about
877  */
878 void core_disconnect_handler (void *cls,
879                            const struct GNUNET_PeerIdentity * peer)
880 {
881         struct Node *n;
882         if (GNUNET_YES == is_me(peer))
883                 return;
884
885         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Disconnected from peer %s\n"),
886                         GNUNET_i2s (peer));
887
888         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &peer->hashPubKey)))
889                 cleanup_node (nodes_requested, &peer->hashPubKey, n);
890
891         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_active, &peer->hashPubKey)))
892                 cleanup_node (nodes_active, &peer->hashPubKey, n);
893
894         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &peer->hashPubKey)))
895                 cleanup_node (nodes_inactive, &peer->hashPubKey, n);
896 }
897
898
899 /**
900  * Handle a request and send a response
901  *
902  * @param cls unused
903  * @param other the sender
904  * @param message the message
905  * @return GNUNET_OK to keep connection, GNUNET_SYSERR on error
906  */
907 static int
908 core_receive_handler (void *cls,
909                                                                                         const struct GNUNET_PeerIdentity *other,
910                                                                                         const struct GNUNET_MessageHeader *message)
911 {
912         if (ntohs (message->size) < sizeof (struct GNUNET_MessageHeader))
913         {
914                         GNUNET_break (0);
915                         return GNUNET_SYSERR;
916         }
917
918         switch (ntohs (message->type)) {
919                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_REQUEST:
920                         handle_request (other, message);
921                         break;
922                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_RESPONSE:
923                         handle_response (other, message);
924                         break;
925                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START:
926                         handle_start (other, message);
927                         break;
928                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START_ACK:
929                         handle_start_ack (other, message);
930                         break;
931                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_STOP:
932                         handle_stop (other, message);
933                         break;
934                 default:
935                         break;
936         }
937
938         return GNUNET_OK;
939 }
940
941
942 size_t node_experiment_start_cb (void *cls, size_t bufsize, void *buf)
943 {
944         struct NodeComCtx *e_ctx = cls;
945         struct GED_start_message *msg;
946         size_t name_len;
947         size_t size;
948
949         if (NULL == buf)
950                 return 0;
951
952         name_len = strlen(e_ctx->e->name) + 1;
953         size = sizeof (struct GED_start_message) + name_len;
954
955         msg = GNUNET_malloc (size);
956         msg->header.size = htons (size);
957         msg->header.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START);
958         msg->issuer = e_ctx->e->issuer;
959         msg->version_nbo = GNUNET_TIME_absolute_hton(e_ctx->e->version);
960         msg->len_name = htonl (name_len);
961         memcpy (&msg[1], e_ctx->e->name, name_len);
962
963         memcpy (buf, msg, size);
964         GNUNET_free (msg);
965         return size;
966 }
967
968 size_t node_experiment_start_ack_cb (void *cls, size_t bufsize, void *buf)
969 {
970         struct NodeComCtx *e_ctx = cls;
971         struct GED_start_ack_message *msg;
972         size_t name_len;
973         size_t size;
974         if (NULL == buf)
975                 return 0;
976
977         name_len = strlen(e_ctx->e->name) + 1;
978         size = sizeof (struct GED_start_ack_message) + name_len;
979
980         msg = GNUNET_malloc (size);
981         msg->header.size = htons (size);
982         msg->header.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START_ACK);
983         msg->issuer = e_ctx->e->issuer;
984         msg->version_nbo = GNUNET_TIME_absolute_hton(e_ctx->e->version);
985         msg->len_name = htonl (name_len);
986         memcpy (&msg[1], e_ctx->e->name, name_len);
987
988         memcpy (buf, msg, size);
989         GNUNET_free (msg);
990         return size;
991 }
992
993
994
995
996 /**
997  * Confirm a experiment START with a node
998  *
999  * @return GNUNET_NO if core was busy with sending, GNUNET_OK otherwise
1000  */
1001 int
1002 GED_nodes_send_start_ack (struct Node *n, struct Experiment *e)
1003 {
1004         struct NodeComCtx *e_ctx;
1005
1006         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending %s for experiment request to peer `%s' for experiment `%s'\n"),
1007                         "START_ACK" ,GNUNET_i2s(&n->id), e->name);
1008
1009         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
1010         e_ctx->n = n;
1011         e_ctx->e = e;
1012         e_ctx->size = sizeof (struct GED_start_ack_message) + strlen (e->name) + 1;
1013         e_ctx->notify = &node_experiment_start_ack_cb;
1014         e_ctx->notify_cls = e_ctx;
1015
1016         GNUNET_CONTAINER_DLL_insert_tail (n->e_req_head, n->e_req_tail, e_ctx);
1017         schedule_transmisson (e_ctx);
1018         return GNUNET_OK;
1019 }
1020
1021
1022 /**
1023  * Request a experiment to start with a node
1024  *
1025  * @return GNUNET_NO if core was busy with sending, GNUNET_OK otherwise
1026  */
1027 int
1028 GED_nodes_request_start (struct Node *n, struct Experiment *e)
1029 {
1030         struct NodeComCtx *e_ctx;
1031
1032         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending %s for experiment request to peer `%s' for experiment `%s'\n"),
1033                         "START", GNUNET_i2s(&n->id), e->name);
1034
1035         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
1036         e_ctx->n = n;
1037         e_ctx->e = e;
1038         e_ctx->size = sizeof (struct GED_start_message) + strlen (e->name) + 1;
1039         e_ctx->notify = &node_experiment_start_cb;
1040         e_ctx->notify_cls = e_ctx;
1041
1042         GNUNET_CONTAINER_DLL_insert_tail (n->e_req_head, n->e_req_tail, e_ctx);
1043         schedule_transmisson (e_ctx);
1044         return GNUNET_OK;
1045 }
1046
1047
1048 /**
1049  * Start the nodes management
1050  */
1051 void
1052 GED_nodes_start ()
1053 {
1054         /* Connecting to core service to find partners */
1055         ch = GNUNET_CORE_connect (GED_cfg, NULL,
1056                                                                                                                 &core_startup_handler,
1057                                                                                                                 &core_connect_handler,
1058                                                                                                                 &core_disconnect_handler,
1059                                                                                                                 &core_receive_handler,
1060                                                                                                                 GNUNET_NO, NULL, GNUNET_NO, NULL);
1061         if (NULL == ch)
1062         {
1063                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Failed to connect to CORE service!\n"));
1064                         return;
1065         }
1066
1067         nodes_requested = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
1068         nodes_active = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
1069         nodes_inactive = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
1070 }
1071
1072
1073 /**
1074  * Stop the nodes management
1075  */
1076 void
1077 GED_nodes_stop ()
1078 {
1079   if (NULL != ch)
1080   {
1081                 GNUNET_CORE_disconnect (ch);
1082                 ch = NULL;
1083   }
1084
1085   if (NULL != nodes_requested)
1086   {
1087                 GNUNET_CONTAINER_multihashmap_iterate (nodes_requested,
1088                                                                                                                                                                          &cleanup_node,
1089                                                                                                                                                                          nodes_requested);
1090                 update_stats (nodes_requested);
1091                 GNUNET_CONTAINER_multihashmap_destroy (nodes_requested);
1092                 nodes_requested = NULL;
1093   }
1094
1095   if (NULL != nodes_active)
1096   {
1097                 GNUNET_CONTAINER_multihashmap_iterate (nodes_active,
1098                                                                                                                                                                          &cleanup_node,
1099                                                                                                                                                                          nodes_active);
1100                 update_stats (nodes_active);
1101                 GNUNET_CONTAINER_multihashmap_destroy (nodes_active);
1102                 nodes_active = NULL;
1103   }
1104
1105   if (NULL != nodes_inactive)
1106   {
1107                 GNUNET_CONTAINER_multihashmap_iterate (nodes_inactive,
1108                                                                                                                                                                          &cleanup_node,
1109                                                                                                                                                                          nodes_inactive);
1110                 update_stats (nodes_inactive);
1111                 GNUNET_CONTAINER_multihashmap_destroy (nodes_inactive);
1112                 nodes_inactive = NULL;
1113   }
1114 }
1115
1116 /* end of gnunet-daemon-experimentation_nodes.c */