fixed stat counters
[oweals/gnunet.git] / src / experimentation / gnunet-daemon-experimentation_nodes.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file experimentation/gnunet-daemon-experimentation_nodes.c
23  * @brief experimentation daemon: node management
24  * @author Christian Grothoff
25  * @author Matthias Wachs
26  */
27 #include "platform.h"
28 #include "gnunet_getopt_lib.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet-daemon-experimentation.h"
33
34
35 #define FAST_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
36 /**
37  * Core handle
38  */
39 static struct GNUNET_CORE_Handle *ch;
40
41
42 /**
43  * Peer's own identity
44  */
45 static struct GNUNET_PeerIdentity me;
46
47
48 /**
49  * Nodes with a pending request
50  */
51 struct GNUNET_CONTAINER_MultiHashMap *nodes_requested;
52
53
54 /**
55  * Active experimentation nodes
56  */
57 struct GNUNET_CONTAINER_MultiHashMap *nodes_active;
58
59
60 /**
61  * Inactive experimentation nodes
62  * To be excluded from future requests
63  */
64 struct GNUNET_CONTAINER_MultiHashMap *nodes_inactive;
65
66 struct NodeComCtx
67 {
68         struct NodeComCtx *prev;
69         struct NodeComCtx *next;
70
71         struct Node *n;
72         struct Experiment *e;
73
74         size_t size;
75         GNUNET_CONNECTION_TransmitReadyNotify notify;
76         void *notify_cls;
77 };
78
79
80 /**
81  * Update statistics
82  *
83  * @param m hashmap to update values from
84  */
85 static void update_stats (struct GNUNET_CONTAINER_MultiHashMap *m)
86 {
87         GNUNET_assert (NULL != m);
88         GNUNET_assert (NULL != GED_stats);
89
90         if (m == nodes_active)
91         {
92                         GNUNET_STATISTICS_set (GED_stats, "# nodes active",
93                                         GNUNET_CONTAINER_multihashmap_size(m), GNUNET_NO);
94         }
95         else if (m == nodes_inactive)
96         {
97                         GNUNET_STATISTICS_set (GED_stats, "# nodes inactive",
98                                         GNUNET_CONTAINER_multihashmap_size(m), GNUNET_NO);
99         }
100         else if (m == nodes_requested)
101         {
102                         GNUNET_STATISTICS_set (GED_stats, "# nodes requested",
103                                         GNUNET_CONTAINER_multihashmap_size(m), GNUNET_NO);
104         }
105         else
106                 GNUNET_break (0);
107
108 }
109
110
111 /**
112  * Clean up node
113  *
114  * @param cls the hashmap to clean up
115  * @param key key of the current node
116  * @param value related node object
117  * @return always GNUNET_OK
118  */
119 static int
120 cleanup_node (void *cls,
121                                                          const struct GNUNET_HashCode * key,
122                                                          void *value)
123 {
124         struct Node *n;
125         struct NodeComCtx *e_cur;
126         struct NodeComCtx *e_next;
127         struct GNUNET_CONTAINER_MultiHashMap *cur = cls;
128
129         n = value;
130         if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
131         {
132                 GNUNET_SCHEDULER_cancel (n->timeout_task);
133                 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
134         }
135
136         if (NULL != n->cth)
137         {
138                 GNUNET_CORE_notify_transmit_ready_cancel (n->cth);
139                 n->cth = NULL;
140         }
141         e_next = n->e_req_head;
142         while (NULL != (e_cur = e_next))
143         {
144                 e_next = e_cur->next;
145                 GNUNET_CONTAINER_DLL_remove (n->e_req_head, n->e_req_tail, e_cur);
146                 GNUNET_free (e_cur);
147         }
148
149         GNUNET_free_non_null (n->issuer_id);
150
151         GNUNET_CONTAINER_multihashmap_remove (cur, key, value);
152         GNUNET_free (value);
153         return GNUNET_OK;
154 }
155
156
157 /**
158  * Check if id passed is my id
159  *
160  * @param id the id to check
161  * @return GNUNET_YES or GNUNET_NO
162  */
163 static int is_me (const struct GNUNET_PeerIdentity *id)
164 {
165         if (0 == memcmp (&me, id, sizeof (me)))
166                 return GNUNET_YES;
167         else
168                 return GNUNET_NO;
169 }
170
171 /**
172  * Core startup callback
173  *
174  * @param cls unused
175  * @param server core service's server handle
176  * @param my_identity my id
177  */
178 static void
179 core_startup_handler (void *cls,
180                                                                                         struct GNUNET_CORE_Handle *server,
181                       const struct GNUNET_PeerIdentity *my_identity)
182 {
183         me = *my_identity;
184 }
185
186 void
187 schedule_transmisson (struct NodeComCtx *e_ctx);
188
189 size_t
190 transmit_read_wrapper (void *cls, size_t bufsize, void *buf)
191 {
192         struct NodeComCtx *e_ctx = cls;
193         struct NodeComCtx *next = NULL;
194
195         size_t res = e_ctx->notify (e_ctx->notify_cls, bufsize, buf);
196         e_ctx->n->cth = NULL;
197
198         GNUNET_CONTAINER_DLL_remove (e_ctx->n->e_req_head, e_ctx->n->e_req_tail, e_ctx);
199         next = e_ctx->n->e_req_head;
200         GNUNET_free (e_ctx);
201
202         if (NULL != next)
203         {
204                 /* Schedule next message */
205                 schedule_transmisson (next);
206         }
207         return res;
208 }
209
210 void
211 schedule_transmisson (struct NodeComCtx *e_ctx)
212 {
213         if (NULL != e_ctx->n->cth)
214                 return;
215
216         e_ctx->n->cth = GNUNET_CORE_notify_transmit_ready (ch, GNUNET_NO, 0, FAST_TIMEOUT,
217                         &e_ctx->n->id, e_ctx->size, transmit_read_wrapper, e_ctx);
218         if (NULL == e_ctx->n->cth)
219         {
220                 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Cannot send message to peer `%s' for experiment `%s'\n"),
221                                 GNUNET_i2s(&e_ctx->n->id), e_ctx->e->name);
222                 GNUNET_free (e_ctx);
223         }
224
225 }
226
227
228 /**
229  * Remove experimentation request due to timeout
230  *
231  * @param cls the related node
232  * @param tc scheduler's task context
233  */
234 static void
235 remove_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
236 {
237         struct Node *n = cls;
238
239         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Removing request for peer %s due to timeout\n",
240                         GNUNET_i2s (&n->id));
241
242         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_requested, &n->id.hashPubKey))
243         {
244                         GNUNET_CONTAINER_multihashmap_remove (nodes_requested, &n->id.hashPubKey, n);
245                         update_stats (nodes_requested);
246                         GNUNET_CONTAINER_multihashmap_put (nodes_inactive, &n->id.hashPubKey, n,
247                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
248                         update_stats (nodes_inactive);
249         }
250         n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
251 }
252
253
254 /**
255  * Core's transmit notify callback to send request
256  *
257  * @param cls the related node
258  * @param bufsize buffer size
259  * @param buf the buffer to copy to
260  * @return bytes passed
261  */
262 size_t send_experimentation_request_cb (void *cls, size_t bufsize, void *buf)
263 {
264         struct Node *n = cls;
265         struct Experimentation_Request msg;
266         size_t msg_size = sizeof (msg);
267         size_t ri_size = sizeof (struct Experimentation_Issuer) * GSE_my_issuer_count;
268         size_t total_size = msg_size + ri_size;
269
270         memset (buf, '0', bufsize);
271         n->cth = NULL;
272   if (buf == NULL)
273   {
274     /* client disconnected */
275     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected\n");
276     if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
277                 GNUNET_SCHEDULER_cancel (n->timeout_task);
278     GNUNET_SCHEDULER_add_now (&remove_request, n);
279     return 0;
280   }
281   GNUNET_assert (bufsize >= total_size);
282
283         msg.msg.size = htons (total_size);
284         msg.msg.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_REQUEST);
285         msg.capabilities = htonl (GSE_node_capabilities);
286         msg.issuer_count = htonl (GSE_my_issuer_count);
287         memcpy (buf, &msg, msg_size);
288         memcpy (&((char *) buf)[msg_size], GSE_my_issuer, ri_size);
289
290         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending experimentation request to peer %s\n"),
291                         GNUNET_i2s (&n->id));
292         return total_size;
293 }
294
295
296 /**
297  * Send request to peer to start add him to to the set of experimentation nodes
298  *
299  * @param peer the peer to send to
300  */
301 static void send_experimentation_request (const struct GNUNET_PeerIdentity *peer)
302 {
303         struct Node *n;
304         struct NodeComCtx *e_ctx;
305         size_t size;
306         size_t c_issuers;
307
308         c_issuers = GSE_my_issuer_count;
309
310         size = sizeof (struct Experimentation_Request) +
311                                  c_issuers * sizeof (struct Experimentation_Issuer);
312         n = GNUNET_malloc (sizeof (struct Node));
313         n->id = *peer;
314         n->timeout_task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &remove_request, n);
315         n->capabilities = NONE;
316
317         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
318         e_ctx->n = n;
319         e_ctx->e = NULL;
320         e_ctx->size = size;
321         e_ctx->notify = &send_experimentation_request_cb;
322         e_ctx->notify_cls = n;
323         GNUNET_CONTAINER_DLL_insert_tail(n->e_req_head, n->e_req_tail, e_ctx);
324         schedule_transmisson (e_ctx);
325
326         GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (nodes_requested,
327                         &peer->hashPubKey, n, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
328         update_stats (nodes_requested);
329 }
330
331
332 /**
333  * Core's transmit notify callback to send response
334  *
335  * @param cls the related node
336  * @param bufsize buffer size
337  * @param buf the buffer to copy to
338  * @return bytes passed
339  */
340 size_t send_response_cb (void *cls, size_t bufsize, void *buf)
341 {
342         struct Node *n = cls;
343         struct Experimentation_Response msg;
344         size_t ri_size = GSE_my_issuer_count * sizeof (struct Experimentation_Issuer);
345         size_t msg_size = sizeof (msg);
346         size_t total_size = msg_size + ri_size;
347
348         n->cth = NULL;
349   if (buf == NULL)
350   {
351     /* client disconnected */
352     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected\n");
353     return 0;
354   }
355   GNUNET_assert (bufsize >= total_size);
356
357         msg.msg.size = htons (total_size);
358         msg.msg.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_RESPONSE);
359         msg.capabilities = htonl (GSE_node_capabilities);
360         msg.issuer_count = htonl (GSE_my_issuer_count);
361         memcpy (buf, &msg, msg_size);
362         memcpy (&((char *) buf)[msg_size], GSE_my_issuer, ri_size);
363
364         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending response to peer %s\n",
365                         GNUNET_i2s (&n->id));
366         return total_size;
367 }
368
369
370 static void
371 get_experiments_cb (struct Node *n, struct Experiment *e)
372 {
373         static int counter = 0;
374         if (NULL == e)
375                         return; /* Done */
376
377         /* Tell the scheduler to add a node with an experiment */
378         GED_scheduler_add (n, e, GNUNET_YES);
379         counter ++;
380 }
381
382 struct Node *
383 get_node (const struct GNUNET_PeerIdentity *id)
384 {
385         struct Node * res;
386         struct Node * tmp;
387
388         res = NULL;
389         tmp = NULL;
390         tmp = GNUNET_CONTAINER_multihashmap_get (nodes_active, &id->hashPubKey);
391         if (res == NULL)
392                 res = tmp;
393
394         tmp = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &id->hashPubKey);
395         if (res == NULL)
396                 res = tmp;
397         else
398                 GNUNET_break (0); /* Multiple instances */
399
400         tmp = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &id->hashPubKey);
401         if (res == NULL)
402                 res = tmp;
403         else
404                 GNUNET_break (0); /* Multiple instances */
405
406         return res;
407 }
408
409
410 /**
411  * Set a specific node as active
412  *
413  * @param n the node
414  */
415 static void node_make_active (struct Node *n)
416 {
417         int c1;
418   GNUNET_CONTAINER_multihashmap_put (nodes_active,
419                         &n->id.hashPubKey, n, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
420         update_stats (nodes_active);
421         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Added peer `%s' as active node\n"),
422                         GNUNET_i2s (&n->id));
423         /* Request experiments for this node to start them */
424         for (c1 = 0; c1 < n->issuer_count; c1++)
425         {
426
427                 GED_experiments_get (n, &n->issuer_id[c1], &get_experiments_cb);
428         }
429 }
430
431
432 /**
433  * Handle a request and send a response
434  *
435  * @param peer the source
436  * @param message the message
437  */
438 static void handle_request (const struct GNUNET_PeerIdentity *peer,
439                                                                                                                 const struct GNUNET_MessageHeader *message)
440 {
441         struct Node *n;
442         struct NodeComCtx *e_ctx;
443         struct Experimentation_Request *rm = (struct Experimentation_Request *) message;
444         struct Experimentation_Issuer *rmi = (struct Experimentation_Issuer *) &rm[1];
445         int c1;
446         int c2;
447         uint32_t ic;
448         uint32_t ic_accepted;
449         int make_active;
450
451         if (ntohs (message->size) < sizeof (struct Experimentation_Request))
452         {
453                 GNUNET_break (0);
454                 return;
455         }
456         ic = ntohl (rm->issuer_count);
457         if (ntohs (message->size) != sizeof (struct Experimentation_Request) + ic * sizeof (struct Experimentation_Issuer))
458         {
459                 GNUNET_break (0);
460                 return;
461         }
462
463         make_active = GNUNET_NO;
464         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_active, &peer->hashPubKey)))
465         {
466                         /* Nothing to do */
467         }
468         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &peer->hashPubKey)))
469         {
470                         GNUNET_CONTAINER_multihashmap_remove (nodes_requested, &peer->hashPubKey, n);
471                         if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
472                         {
473                                 GNUNET_SCHEDULER_cancel (n->timeout_task);
474                                 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
475                         }
476                         update_stats (nodes_requested);
477                         make_active = GNUNET_YES;
478         }
479         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &peer->hashPubKey)))
480         {
481                         GNUNET_CONTAINER_multihashmap_remove (nodes_inactive, &peer->hashPubKey, n);
482                         update_stats (nodes_inactive);
483                         make_active = GNUNET_YES;
484         }
485         else
486         {
487                         /* Create new node */
488                         n = GNUNET_malloc (sizeof (struct Node));
489                         n->id = *peer;
490                         n->capabilities = NONE;
491                         make_active = GNUNET_YES;
492         }
493
494         /* Update node */
495         n->capabilities = ntohl (rm->capabilities);
496
497         /* Filter accepted issuer */
498         ic_accepted = 0;
499         for (c1 = 0; c1 < ic; c1++)
500         {
501                 if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
502                         ic_accepted ++;
503         }
504         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Request from peer `%s' with %u issuers, we accepted %u issuer \n",
505                         GNUNET_i2s (peer), ic, ic_accepted);
506         GNUNET_free_non_null (n->issuer_id);
507         n->issuer_id = GNUNET_malloc (ic_accepted * sizeof (struct GNUNET_PeerIdentity));
508         c2 = 0;
509         for (c1 = 0; c1 < ic; c1++)
510         {
511                         if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
512                         {
513                                 n->issuer_id[c2] = rmi[c1].issuer_id;
514                                 c2 ++;
515                         }
516         }
517         n->issuer_count = ic_accepted;
518
519         if (GNUNET_YES == make_active)
520                 node_make_active (n);
521
522         /* Send response */
523         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
524         e_ctx->n = n;
525         e_ctx->e = NULL;
526         e_ctx->size = sizeof (struct Experimentation_Response) + GSE_my_issuer_count * sizeof (struct Experimentation_Issuer);
527         e_ctx->notify = &send_response_cb;
528         e_ctx->notify_cls = n;
529
530         GNUNET_CONTAINER_DLL_insert_tail(n->e_req_head, n->e_req_tail, e_ctx);
531         schedule_transmisson (e_ctx);
532 }
533
534
535 /**
536  * Handle a response
537  *
538  * @param peer the source
539  * @param message the message
540  */
541 static void handle_response (const struct GNUNET_PeerIdentity *peer,
542                                                                                                                  const struct GNUNET_MessageHeader *message)
543 {
544         struct Node *n;
545         struct Experimentation_Response *rm = (struct Experimentation_Response *) message;
546         struct Experimentation_Issuer *rmi = (struct Experimentation_Issuer *) &rm[1];
547         uint32_t ic;
548         uint32_t ic_accepted;
549         int make_active;
550         unsigned int c1;
551         unsigned int c2;
552
553         if (ntohs (message->size) < sizeof (struct Experimentation_Response))
554         {
555                 GNUNET_break (0);
556                 return;
557         }
558         ic = ntohl (rm->issuer_count);
559         if (ntohs (message->size) != sizeof (struct Experimentation_Response) + ic * sizeof (struct Experimentation_Issuer))
560         {
561                 GNUNET_break (0);
562                 return;
563         }
564
565         make_active = GNUNET_NO;
566         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_active, &peer->hashPubKey)))
567         {
568                         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s from %s peer `%s'\n",
569                                         "RESPONSE", "active", GNUNET_i2s (peer));
570         }
571         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &peer->hashPubKey)))
572         {
573                         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s from %s peer `%s'\n",
574                                         "RESPONSE", "requested", GNUNET_i2s (peer));
575                         GNUNET_CONTAINER_multihashmap_remove (nodes_requested, &peer->hashPubKey, n);
576                         if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
577                         {
578                                 GNUNET_SCHEDULER_cancel (n->timeout_task);
579                                 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
580                         }
581                         update_stats (nodes_requested);
582                         make_active = GNUNET_YES;
583         }
584         else if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &peer->hashPubKey)))
585         {
586                         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s from peer `%s'\n",
587                                         "RESPONSE", "inactive", GNUNET_i2s (peer));
588                         GNUNET_CONTAINER_multihashmap_remove (nodes_inactive, &peer->hashPubKey, n);
589                         update_stats (nodes_inactive);
590                         make_active = GNUNET_YES;
591         }
592         else
593         {
594                         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s from %s peer `%s'\n",
595                                         "RESPONSE", "unknown", GNUNET_i2s (peer));
596                         return;
597         }
598
599         /* Update */
600         n->capabilities = ntohl (rm->capabilities);
601
602         /* Filter accepted issuer */
603         ic_accepted = 0;
604         for (c1 = 0; c1 < ic; c1++)
605         {
606                 if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
607                         ic_accepted ++;
608         }
609         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Response from peer `%s' with %u issuers, we accepted %u issuer \n",
610                         GNUNET_i2s (peer), ic, ic_accepted);
611         GNUNET_free_non_null (n->issuer_id);
612         n->issuer_id = GNUNET_malloc (ic_accepted * sizeof (struct GNUNET_PeerIdentity));
613         c2 = 0;
614         for (c1 = 0; c1 < ic; c1++)
615         {
616                         if (GNUNET_YES == GED_experiments_issuer_accepted(&rmi[c1].issuer_id))
617                         {
618                                 n->issuer_id[c2] = rmi[c1].issuer_id;
619                                 c2 ++;
620                         }
621         }
622         n->issuer_count = ic_accepted;
623
624         if (GNUNET_YES == make_active)
625                 node_make_active (n);
626 }
627
628 /**
629  * Handle a response
630  *
631  * @param peer the source
632  * @param message the message
633  */
634 static void handle_start (const struct GNUNET_PeerIdentity *peer,
635                                                                                                                  const struct GNUNET_MessageHeader *message)
636 {
637         uint16_t size;
638         uint32_t name_len;
639         const struct GED_start_message *msg;
640         const char *name;
641         struct Node *n;
642         struct Experiment *e;
643
644         if (NULL == peer)
645         {
646                 GNUNET_break (0);
647                 return;
648         }
649         if (NULL == message)
650         {
651                 GNUNET_break (0);
652                 return;
653         }
654
655         size = ntohs (message->size);
656         if (size < sizeof (struct GED_start_message))
657         {
658                 GNUNET_break (0);
659                 return;
660         }
661         msg = (const struct GED_start_message *) message;
662         name_len = ntohl (msg->len_name);
663         if (size != sizeof (struct GED_start_message) + name_len)
664         {
665                 GNUNET_break (0);
666                 return;
667         }
668
669         n = get_node (peer);
670         if (NULL == n)
671         {
672                 GNUNET_break (0);
673                 return;
674         }
675         name = (const char *) &msg[1];
676         if (name[name_len-1] != '\0')
677         {
678                 GNUNET_break (0);
679                 return;
680         }
681
682         if (name_len != strlen (name) + 1)
683         {
684                 GNUNET_break (0);
685                 return;
686         }
687
688         e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
689         if (NULL == e)
690         {
691                 GNUNET_break (0);
692                 return;
693         }
694
695         GED_scheduler_handle_start (n, e);
696 }
697
698 /**
699  * Handle a response
700  *
701  * @param peer the source
702  * @param message the message
703  */
704 static void handle_start_ack (const struct GNUNET_PeerIdentity *peer,
705                                                                                                                  const struct GNUNET_MessageHeader *message)
706 {
707         uint16_t size;
708         uint32_t name_len;
709         const struct GED_start_ack_message *msg;
710         const char *name;
711         struct Node *n;
712         struct Experiment *e;
713
714         if (NULL == peer)
715         {
716                 GNUNET_break (0);
717                 return;
718         }
719         if (NULL == message)
720         {
721                 GNUNET_break (0);
722                 return;
723         }
724
725         size = ntohs (message->size);
726         if (size < sizeof (struct GED_start_ack_message))
727         {
728                 GNUNET_break (0);
729                 return;
730         }
731         msg = (const struct GED_start_ack_message *) message;
732         name_len = ntohl (msg->len_name);
733         if (size != sizeof (struct GED_start_message) + name_len)
734         {
735                 GNUNET_break (0);
736                 return;
737         }
738
739         n = get_node (peer);
740         if (NULL == n)
741         {
742                 GNUNET_break (0);
743                 return;
744         }
745         name = (const char *) &msg[1];
746         if (name[name_len-1] != '\0')
747         {
748                 GNUNET_break (0);
749                 return;
750         }
751
752         if (name_len != strlen (name) + 1)
753         {
754                 GNUNET_break (0);
755                 return;
756         }
757
758         e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
759         if (NULL == e)
760         {
761                 GNUNET_break (0);
762                 return;
763         }
764         GED_scheduler_handle_start_ack (n, e);
765 }
766
767 /**
768  * Handle a response
769  *
770  * @param peer the source
771  * @param message the message
772  */
773 static void handle_stop (const struct GNUNET_PeerIdentity *peer,
774                                                                                                  const struct GNUNET_MessageHeader *message)
775 {
776         uint16_t size;
777         uint32_t name_len;
778         const struct GED_stop_message *msg;
779         const char *name;
780         struct Node *n;
781         struct Experiment *e;
782
783         if (NULL == peer)
784         {
785                 GNUNET_break (0);
786                 return;
787         }
788         if (NULL == message)
789         {
790                 GNUNET_break (0);
791                 return;
792         }
793
794         size = ntohs (message->size);
795         if (size < sizeof (struct GED_stop_message))
796         {
797                 GNUNET_break (0);
798                 return;
799         }
800         msg = (const struct GED_stop_message *) message;
801         name_len = ntohl (msg->len_name);
802         if (size != sizeof (struct GED_start_message) + name_len)
803         {
804                 GNUNET_break (0);
805                 return;
806         }
807
808         n = get_node (peer);
809         if (NULL == n)
810         {
811                 GNUNET_break (0);
812                 return;
813         }
814         name = (const char *) &msg[1];
815         if (name[name_len-1] != '\0')
816         {
817                 GNUNET_break (0);
818                 return;
819         }
820
821         if (name_len != strlen (name) + 1)
822         {
823                 GNUNET_break (0);
824                 return;
825         }
826
827         e = GED_experiments_find (&msg->issuer, name, GNUNET_TIME_absolute_ntoh(msg->version_nbo));
828         if (NULL == e)
829         {
830                 GNUNET_break (0);
831                 return;
832         }
833         GED_scheduler_handle_stop (n, e);
834 }
835
836 /**
837  * Method called whenever a given peer connects.
838  *
839  * @param cls closure
840  * @param peer peer identity this notification is about
841  */
842 void core_connect_handler (void *cls,
843                            const struct GNUNET_PeerIdentity *peer)
844 {
845         if (GNUNET_YES == is_me(peer))
846                 return;
847
848         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Connected to peer %s\n"),
849                         GNUNET_i2s (peer));
850
851         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_requested, &peer->hashPubKey))
852                 return; /* We already sent a request */
853
854         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_active, &peer->hashPubKey))
855                 return; /* This peer is known as active  */
856
857         if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (nodes_inactive, &peer->hashPubKey))
858                 return; /* This peer is known as inactive  */
859
860         send_experimentation_request (peer);
861 }
862
863
864 /**
865  * Method called whenever a given peer disconnects.
866  *
867  * @param cls closure
868  * @param peer peer identity this notification is about
869  */
870 void core_disconnect_handler (void *cls,
871                            const struct GNUNET_PeerIdentity * peer)
872 {
873         struct Node *n;
874         if (GNUNET_YES == is_me(peer))
875                 return;
876
877         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Disconnected from peer %s\n"),
878                         GNUNET_i2s (peer));
879
880         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &peer->hashPubKey)))
881                 cleanup_node (nodes_requested, &peer->hashPubKey, n);
882
883         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_active, &peer->hashPubKey)))
884                 cleanup_node (nodes_active, &peer->hashPubKey, n);
885
886         if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &peer->hashPubKey)))
887                 cleanup_node (nodes_inactive, &peer->hashPubKey, n);
888 }
889
890
891 /**
892  * Handle a request and send a response
893  *
894  * @param cls unused
895  * @param other the sender
896  * @param message the message
897  * @return GNUNET_OK to keep connection, GNUNET_SYSERR on error
898  */
899 static int
900 core_receive_handler (void *cls,
901                                                                                         const struct GNUNET_PeerIdentity *other,
902                                                                                         const struct GNUNET_MessageHeader *message)
903 {
904         if (ntohs (message->size) < sizeof (struct GNUNET_MessageHeader))
905         {
906                         GNUNET_break (0);
907                         return GNUNET_SYSERR;
908         }
909
910         switch (ntohs (message->type)) {
911                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_REQUEST:
912                         handle_request (other, message);
913                         break;
914                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_RESPONSE:
915                         handle_response (other, message);
916                         break;
917                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START:
918                         handle_start (other, message);
919                         break;
920                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START_ACK:
921                         handle_start_ack (other, message);
922                         break;
923                 case GNUNET_MESSAGE_TYPE_EXPERIMENTATION_STOP:
924                         handle_stop (other, message);
925                         break;
926                 default:
927                         break;
928         }
929
930         return GNUNET_OK;
931 }
932
933
934 size_t node_experiment_start_cb (void *cls, size_t bufsize, void *buf)
935 {
936         struct NodeComCtx *e_ctx = cls;
937         struct GED_start_message *msg;
938         size_t name_len;
939         size_t size;
940
941         if (NULL == buf)
942                 return 0;
943
944         name_len = strlen(e_ctx->e->name) + 1;
945         size = sizeof (struct GED_start_message) + name_len;
946
947         msg = GNUNET_malloc (size);
948         msg->header.size = htons (size);
949         msg->header.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START);
950         msg->issuer = e_ctx->e->issuer;
951         msg->version_nbo = GNUNET_TIME_absolute_hton(e_ctx->e->version);
952         msg->len_name = htonl (name_len);
953         memcpy (&msg[1], e_ctx->e->name, name_len);
954
955         memcpy (buf, msg, size);
956         GNUNET_free (msg);
957         return size;
958 }
959
960 size_t node_experiment_start_ack_cb (void *cls, size_t bufsize, void *buf)
961 {
962         struct NodeComCtx *e_ctx = cls;
963         struct GED_start_ack_message *msg;
964         size_t name_len;
965         size_t size;
966         if (NULL == buf)
967                 return 0;
968
969         name_len = strlen(e_ctx->e->name) + 1;
970         size = sizeof (struct GED_start_ack_message) + name_len;
971
972         msg = GNUNET_malloc (size);
973         msg->header.size = htons (size);
974         msg->header.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START_ACK);
975         msg->issuer = e_ctx->e->issuer;
976         msg->version_nbo = GNUNET_TIME_absolute_hton(e_ctx->e->version);
977         msg->len_name = htonl (name_len);
978         memcpy (&msg[1], e_ctx->e->name, name_len);
979
980         memcpy (buf, msg, size);
981         GNUNET_free (msg);
982         return size;
983 }
984
985
986
987
988 /**
989  * Confirm a experiment START with a node
990  *
991  * @return GNUNET_NO if core was busy with sending, GNUNET_OK otherwise
992  */
993 int
994 GED_nodes_send_start_ack (struct Node *n, struct Experiment *e)
995 {
996         struct NodeComCtx *e_ctx;
997
998         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
999                         "Sending %s for experiment request to peer `%s' for experiment `%s'\n",
1000                         "START_ACK" ,GNUNET_i2s(&n->id), e->name);
1001
1002         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
1003         e_ctx->n = n;
1004         e_ctx->e = e;
1005         e_ctx->size = sizeof (struct GED_start_ack_message) + strlen (e->name) + 1;
1006         e_ctx->notify = &node_experiment_start_ack_cb;
1007         e_ctx->notify_cls = e_ctx;
1008
1009         GNUNET_CONTAINER_DLL_insert_tail (n->e_req_head, n->e_req_tail, e_ctx);
1010         schedule_transmisson (e_ctx);
1011         return GNUNET_OK;
1012 }
1013
1014
1015 /**
1016  * Request a experiment to start with a node
1017  *
1018  * @return GNUNET_NO if core was busy with sending, GNUNET_OK otherwise
1019  */
1020 int
1021 GED_nodes_send_start (struct Node *n, struct Experiment *e)
1022 {
1023         struct NodeComCtx *e_ctx;
1024
1025         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1026                         "Sending %s for experiment request to peer `%s' for experiment `%s'\n",
1027                         "START", GNUNET_i2s(&n->id), e->name);
1028
1029         e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
1030         e_ctx->n = n;
1031         e_ctx->e = e;
1032         e_ctx->size = sizeof (struct GED_start_message) + strlen (e->name) + 1;
1033         e_ctx->notify = &node_experiment_start_cb;
1034         e_ctx->notify_cls = e_ctx;
1035
1036         GNUNET_CONTAINER_DLL_insert_tail (n->e_req_head, n->e_req_tail, e_ctx);
1037         schedule_transmisson (e_ctx);
1038         return GNUNET_OK;
1039 }
1040
1041
1042 /**
1043  * Start the nodes management
1044  */
1045 void
1046 GED_nodes_start ()
1047 {
1048         /* Connecting to core service to find partners */
1049         ch = GNUNET_CORE_connect (GED_cfg, NULL,
1050                                                                                                                 &core_startup_handler,
1051                                                                                                                 &core_connect_handler,
1052                                                                                                                 &core_disconnect_handler,
1053                                                                                                                 &core_receive_handler,
1054                                                                                                                 GNUNET_NO, NULL, GNUNET_NO, NULL);
1055         if (NULL == ch)
1056         {
1057                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Failed to connect to CORE service!\n"));
1058                         return;
1059         }
1060
1061         nodes_requested = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
1062         nodes_active = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
1063         nodes_inactive = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
1064 }
1065
1066
1067 /**
1068  * Stop the nodes management
1069  */
1070 void
1071 GED_nodes_stop ()
1072 {
1073   if (NULL != ch)
1074   {
1075                 GNUNET_CORE_disconnect (ch);
1076                 ch = NULL;
1077   }
1078
1079   if (NULL != nodes_requested)
1080   {
1081                 GNUNET_CONTAINER_multihashmap_iterate (nodes_requested,
1082                                                                                                                                                                          &cleanup_node,
1083                                                                                                                                                                          nodes_requested);
1084                 update_stats (nodes_requested);
1085                 GNUNET_CONTAINER_multihashmap_destroy (nodes_requested);
1086                 nodes_requested = NULL;
1087   }
1088
1089   if (NULL != nodes_active)
1090   {
1091                 GNUNET_CONTAINER_multihashmap_iterate (nodes_active,
1092                                                                                                                                                                          &cleanup_node,
1093                                                                                                                                                                          nodes_active);
1094                 update_stats (nodes_active);
1095                 GNUNET_CONTAINER_multihashmap_destroy (nodes_active);
1096                 nodes_active = NULL;
1097   }
1098
1099   if (NULL != nodes_inactive)
1100   {
1101                 GNUNET_CONTAINER_multihashmap_iterate (nodes_inactive,
1102                                                                                                                                                                          &cleanup_node,
1103                                                                                                                                                                          nodes_inactive);
1104                 update_stats (nodes_inactive);
1105                 GNUNET_CONTAINER_multihashmap_destroy (nodes_inactive);
1106                 nodes_inactive = NULL;
1107   }
1108 }
1109
1110 /* end of gnunet-daemon-experimentation_nodes.c */