stuff
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cp.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cp.c
23  * @brief API to handle 'connected peers'
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_cp.h"
29
30 /**
31  * How often do we flush trust values to disk?
32  */
33 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
34
35
36 struct GSF_PeerTransmitHandle
37 {
38
39   /**
40    * Handle for an active request for transmission to this
41    * peer, or NULL (if core queue was full).
42    */
43   struct GNUNET_CORE_TransmitHandle *cth;
44
45   /**
46    * Time when this transmission request was issued.
47    */
48   struct GNUNET_TIME_Absolute transmission_request_start_time;
49
50   /**
51    * Timeout for this request.
52    */
53   struct GNUNET_TIME_Absolute timeout;
54
55   /**
56    * Task called on timeout, or 0 for none.
57    */
58   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
59
60   /**
61    * Function to call to get the actual message.
62    */
63   GSF_GetMessageCallback gmc;
64
65   /**
66    * Peer this request targets.
67    */
68   struct GSF_ConnectedPeer *cp;
69
70   /**
71    * Closure for 'gmc'.
72    */
73   void *gmc_cls;
74
75   /**
76    * Size of the message to be transmitted.
77    */
78   size_t size;
79
80   /**
81    * GNUNET_YES if this is a query, GNUNET_NO for content.
82    */
83   int is_query;
84
85   /**
86    * Priority of this request.
87    */
88   uint32_t priority;
89
90 };
91
92
93 /**
94  * A connected peer.
95  */
96 struct GSF_ConnectedPeer 
97 {
98
99   /**
100    * Performance data for this peer.
101    */
102   struct GSF_PeerPerformanceData ppd;
103
104   /**
105    * Time until when we blocked this peer from migrating
106    * data to us.
107    */
108   struct GNUNET_TIME_Absolute last_migration_block;
109
110   /**
111    * Messages (replies, queries, content migration) we would like to
112    * send to this peer in the near future.  Sorted by priority, head.
113    */
114   struct GSF_PeerTransmitHandle *pth_head;
115
116   /**
117    * Messages (replies, queries, content migration) we would like to
118    * send to this peer in the near future.  Sorted by priority, tail.
119    */
120   struct GSF_PeerTransmitHandle *pth_tail;
121
122   /**
123    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
124    */
125   struct GNUNET_CORE_InformationRequestContext *irc;
126
127   /**
128    * ID of delay task for scheduling transmission.
129    */
130   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
131
132   /**
133    * Increase in traffic preference still to be submitted
134    * to the core service for this peer.
135    */
136   uint64_t inc_preference;
137
138   /**
139    * Trust rating for this peer
140    */
141   uint32_t trust;
142
143   /**
144    * Trust rating for this peer on disk.
145    */
146   uint32_t disk_trust;
147
148   /**
149    * The peer's identity.
150    */
151   GNUNET_PEER_Id pid;
152
153   /**
154    * Which offset in "last_p2p_replies" will be updated next?
155    * (we go round-robin).
156    */
157   unsigned int last_p2p_replies_woff;
158
159   /**
160    * Which offset in "last_client_replies" will be updated next?
161    * (we go round-robin).
162    */
163   unsigned int last_client_replies_woff;
164
165   /**
166    * Current offset into 'last_request_times' ring buffer.
167    */
168   unsigned int last_request_times_off;
169
170 };
171
172
173 /**
174  * Map from peer identities to 'struct GSF_ConnectPeer' entries.
175  */
176 static struct GNUNET_CONTAINER_MultiHashMap *cp_map;
177
178
179 /**
180  * Where do we store trust information?
181  */
182 static char *trustDirectory;
183
184
185 /**
186  * Get the filename under which we would store the GNUNET_HELLO_Message
187  * for the given host and protocol.
188  * @return filename of the form DIRECTORY/HOSTID
189  */
190 static char *
191 get_trust_filename (const struct GNUNET_PeerIdentity *id)
192 {
193   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
194   char *fn;
195
196   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
197   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
198   return fn;
199 }
200
201
202 /**
203  * Find latency information in 'atsi'.
204  *
205  * @param atsi performance data
206  * @return connection latency
207  */
208 static struct GNUNET_TIME_Relative
209 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
210 {
211   if (atsi == NULL)
212     return GNUNET_TIME_UNIT_SECONDS;
213   while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
214           (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
215     atsi++;
216   if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) 
217     {
218       GNUNET_break (0);
219       /* how can we not have latency data? */
220       return GNUNET_TIME_UNIT_SECONDS;
221     }
222   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
223                                         ntohl (atsi->value));
224 }
225
226
227 /**
228  * Update the performance information kept for the given peer.
229  *
230  * @param cp peer record to update
231  * @param atsi transport performance data
232  */
233 static void
234 update_atsi (struct GSF_ConnectedPeer *cp,
235              const struct GNUNET_TRANSPORT_ATS_Information *atsi)
236 {
237   // FIXME: merge atsi into cp's performance data!
238 }
239
240
241 /**
242  * A peer connected to us.  Setup the connected peer
243  * records.
244  *
245  * @param peer identity of peer that connected
246  * @param atsi performance data for the connection
247  * @return handle to connected peer entry
248  */
249 struct GSF_ConnectedPeer *
250 GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
251                            const struct GNUNET_TRANSPORT_ATS_Information *atsi)
252 {
253   struct GSF_ConnectedPeer *cp;
254   char *fn;
255   uint32_t trust;
256   struct GNUNET_TIME_Relative latency;
257
258   cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
259   cp->transmission_delay = GNUNET_LOAD_value_init (latency);
260   cp->pid = GNUNET_PEER_intern (peer);
261   fn = get_trust_filename (peer);
262   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
263       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
264     cp->disk_trust = cp->trust = ntohl (trust);
265   GNUNET_free (fn);
266   GNUNET_break (GNUNET_OK ==
267                 GNUNET_CONTAINER_multihashmap_put (cp_map,
268                                                    &peer->hashPubKey,
269                                                    cp,
270                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
271   update_atsi (cp, atsi);
272
273
274   // FIXME: notify plan & migration about new peer!
275   
276   return cp;
277 }
278
279
280 /**
281  * Core is ready to transmit to a peer, get the message.
282  *
283  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
284  * @param size number of bytes core is willing to take
285  * @param buf where to copy the message
286  * @return number of bytes copied to buf
287  */
288 static size_t
289 peer_transmit_ready_cb (void *cls,
290                         size_t size,
291                         void *buf)
292 {
293   struct GSF_PeerTransmitHandle *pth = cls;
294   struct GSF_ConnectedPeer *cp;
295   size_t ret;
296
297   cp = pth->cp;
298   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
299                                cp->pth_tail,
300                                pth);
301   // FIXME: update 'cp' counters!
302   ret = pth->gmc (pth->gmc_cls, 
303                   0, NULL);
304   GNUNET_free (pth);  
305   return ret;
306 }
307
308
309 /**
310  * Function called if there has been a timeout trying to satisfy
311  * a transmission request.
312  *
313  * @param cls the 'struct GSF_PeerTransmitHandle' of the request 
314  * @param tc scheduler context
315  */
316 static void
317 peer_transmit_timeout (void *cls,
318                        const struct GNUNET_SCHEDULER_TaskContext *tc)
319 {
320   struct GSF_PeerTransmitHandle *pth = cls;
321   struct GSF_ConnectedPeer *cp;
322   
323   pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
324   cp = pth->cp;
325   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
326                                cp->pth_tail,
327                                pth);
328   // FIXME: update 'cp' counters!
329   pth->gmc (pth->gmc_cls, 
330             0, NULL);
331   GNUNET_free (pth);
332 }
333
334
335 /**
336  * Transmit a message to the given peer as soon as possible.
337  * If the peer disconnects before the transmission can happen,
338  * the callback is invoked with a 'NULL' buffer.
339  *
340  * @param peer target peer
341  * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO)
342  * @param priority how important is this request?
343  * @param timeout when does this request timeout (call gmc with error)
344  * @param size number of bytes we would like to send to the peer
345  * @param gmc function to call to get the message
346  * @param gmc_cls closure for gmc
347  * @return handle to cancel request
348  */
349 struct GSF_PeerTransmitHandle *
350 GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
351                     int is_query,
352                     uint32_t priority,
353                     struct GNUNET_TIME_Relative timeout,
354                     size_t size,
355                     GSF_GetMessageCallback gmc,
356                     void *gmc_cls)
357 {
358   struct GSF_ConnectedPeer *cp;
359   struct GSF_PeerTransmitHandle *pth;
360   struct GSF_PeerTransmitHandle *pos;
361   struct GSF_PeerTransmitHandle *prev;
362   struct GNUNET_PeerIdentity target;
363
364   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
365                                           &peer->hashPubKey);
366   GNUNET_assert (NULL != cp);
367   pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
368   pth->transmission_request_start_time = GNUNET_TIME_absolute_now ();
369   pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
370   pth->gmc = gmc;
371   pth->gmc_cls = gmc_cls;
372   pth->size = size;
373   pth->is_query = is_query;
374   pth->priority = priority;
375   pth->cp = cp;
376   /* insertion sort (by priority, descending) */
377   prev = NULL;
378   pos = cp->pth_head;
379   while ( (pos != NULL) &&
380           (pos->priority > priority) )
381     {
382       prev = pos;
383       pos = pos->next;
384     }
385   if (prev == NULL)
386     GNUNET_CONTAINER_DLL_insert_head (cp->pth_head,
387                                       cp->pth_tail,
388                                       pth);
389   else
390     GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
391                                        cp->pth_tail,
392                                        prev,
393                                        pth);
394   GNUNET_PEER_resolve (cp->pid,
395                        &target);
396   pth->cth = GNUNET_CORE_notify_transmit_ready (core,
397                                                 priority,
398                                                 timeout,
399                                                 &target,
400                                                 size,
401                                                 &peer_transmit_ready_cb,
402                                                 pth);
403   /* pth->cth could be NULL here, that's OK, we'll try again
404      later... */
405   if (pth->cth == NULL)
406     pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
407                                                       &peer_transmit_timeout,
408                                                       pth);
409   return pth;
410 }
411
412
413 /**
414  * Cancel an earlier request for transmission.
415  *
416  * @param pth request to cancel
417  */
418 void
419 GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
420 {
421   struct GSF_PeerTransmitHandle *pth = cls;
422   struct GSF_ConnectedPeer *cp;
423
424   if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
425     {
426       GNUNET_SCHEDULER_cancel (pth->timeout_task);
427       pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
428     }
429   cp = pth->cp;
430   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
431                                cp->pth_tail,
432                                pth);
433   // FIXME: update 'cp' counters!
434   GNUNET_free (pth);
435 }
436
437
438 /**
439  * Report on receiving a reply; update the performance record of the given peer.
440  *
441  * @param peer responding peer (will be updated)
442  * @param request_time time at which the original query was transmitted
443  * @param request_priority priority of the original request
444  * @param initiator_client local client on responsible for query (or NULL)
445  * @param initiator_peer other peer responsible for query (or NULL)
446  */
447 void
448 GSF_peer_update_performance_ (struct GSF_ConnectedPeer *peer,
449                               GNUNET_TIME_Absolute request_time,
450                               uint32_t request_priority,
451                               const struct GSF_LocalClient *initiator_client,
452                               const struct GSF_ConnectedPeer *initiator_peer)
453 {
454   // FIXME...
455 }
456
457
458 /**
459  * Method called whenever a given peer has a status change.
460  *
461  * @param cls closure
462  * @param peer peer identity this notification is about
463  * @param bandwidth_in available amount of inbound bandwidth
464  * @param bandwidth_out available amount of outbound bandwidth
465  * @param timeout absolute time when this peer will time out
466  *        unless we see some further activity from it
467  * @param atsi status information
468  */
469 void
470 GSF_peer_status_handler_ (void *cls,
471                           const struct GNUNET_PeerIdentity *peer,
472                           struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
473                           struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
474                           struct GNUNET_TIME_Absolute timeout,
475                           const struct GNUNET_TRANSPORT_ATS_Information *atsi)
476 {
477   struct GSF_ConnectedPeer *cp;
478
479   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
480                                           &peer->hashPubKey);
481   GNUNET_assert (NULL != cp);
482   update_atsi (cp, atsi);
483 }
484
485
486 /**
487  * A peer disconnected from us.  Tear down the connected peer
488  * record.
489  *
490  * @param cls unused
491  * @param peer identity of peer that connected
492  */
493 void
494 GSF_peer_disconnect_handler_ (void *cls,
495                               const struct GNUNET_PeerIdentity *peer)
496 {
497   struct GSF_ConnectedPeer *cp;
498
499   cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
500                                           &peer->hashPubKey);
501   GNUNET_assert (NULL != cp);
502   GNUNET_CONTAINER_multihashmap_remove (cp_map,
503                                         &peer->hashPubKey,
504                                         cp);
505   // FIXME: more cleanup
506   GNUNET_free (cp);
507 }
508
509
510 /**
511  * Closure for 'call_iterator'.
512  */
513 struct IterationContext
514 {
515   /**
516    * Function to call on each entry.
517    */
518   GSF_ConnectedPeerIterator it;
519
520   /**
521    * Closure for 'it'.
522    */
523   void *it_cls;
524 };
525
526
527 /**
528  * Function that calls the callback for each peer.
529  *
530  * @param cls the 'struct IterationContext*'
531  * @param key identity of the peer
532  * @param value the 'struct GSF_ConnectedPeer*'
533  * @return GNUNET_YES to continue iteration
534  */
535 static int
536 call_iterator (void *cls,
537                const GNUNET_HashCode *key,
538                void *value)
539 {
540   struct IterationContext *ic = cls;
541   struct GSF_ConnectedPeer *cp = value;
542   
543   ic->it (ic->it_cls,
544           (const struct GNUNET_PeerIdentity*) key,
545           cp,
546           &cp->ppd);
547   return GNUNET_YES;
548 }
549
550
551 /**
552  * Iterate over all connected peers.
553  *
554  * @param it function to call for each peer
555  * @param it_cls closure for it
556  */
557 void
558 GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
559                               void *it_cls)
560 {
561   struct IterationContext ic;
562
563   ic.it = it;
564   ic.it_cls = it_cls;
565   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
566                                          &call_iterator,
567                                          &ic);
568 }
569
570
571 /**
572  * Try to reserve bandwidth (to receive data FROM the given peer).
573  * This function must only be called ONCE per connected peer at a
574  * time; it can be called again after the 'rc' callback was invoked.
575  * If the peer disconnects, the request is (silently!) ignored (and
576  * the requester is responsible to register for notification about the
577  * peer disconnect if any special action needs to be taken in this
578  * case).
579  *
580  * @param cp peer to reserve bandwidth from
581  * @param size number of bytes to reserve
582  * @param rc function to call upon reservation success or failure
583  * @param rc_cls closure for rc
584  */
585 void
586 GSF_connected_peer_reserve_ (struct GSF_ConnectedPeer *cp,
587                              size_t size,
588                              GSF_PeerReserveCallback rc,
589                              void *rc_cls)
590 {
591   // FIXME: should we allow queueing multiple reservation requests?
592   // FIXME: what about cancellation?
593   // FIXME: change docu on peer disconnect handling?
594   if (NULL != cp->irc)
595     {
596       rc (rc_cls, cp, GNUNET_NO);
597       return;
598     }
599   // FIXME...
600 }
601
602
603 /**
604  * Write host-trust information to a file - flush the buffer entry!
605  *
606  * @param cls closure, not used
607  * @param key host identity
608  * @param value the 'struct GSF_ConnectedPeer' to flush
609  * @return GNUNET_OK to continue iteration
610  */
611 static int
612 flush_trust (void *cls,
613              const GNUNET_HashCode *key,
614              void *value)
615 {
616   struct GSF_ConnectedPeer *cp = value;
617   char *fn;
618   uint32_t trust;
619   struct GNUNET_PeerIdentity pid;
620
621   if (cp->trust == cp->disk_trust)
622     return GNUNET_OK;                     /* unchanged */
623   GNUNET_PEER_resolve (cp->pid,
624                        &pid);
625   fn = get_trust_filename (&pid);
626   if (cp->trust == 0)
627     {
628       if ((0 != UNLINK (fn)) && (errno != ENOENT))
629         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
630                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
631     }
632   else
633     {
634       trust = htonl (cp->trust);
635       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
636                                                     sizeof(uint32_t),
637                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
638                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
639         cp->disk_trust = cp->trust;
640     }
641   GNUNET_free (fn);
642   return GNUNET_OK;
643 }
644
645
646 /**
647  * Call this method periodically to flush trust information to disk.
648  *
649  * @param cls closure, not used
650  * @param tc task context, not used
651  */
652 static void
653 cron_flush_trust (void *cls,
654                   const struct GNUNET_SCHEDULER_TaskContext *tc)
655 {
656
657   if (NULL == cp_map)
658     return;
659   GNUNET_CONTAINER_multihashmap_iterate (cp_map,
660                                          &flush_trust,
661                                          NULL);
662   if (NULL == tc)
663     return;
664   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
665     return;
666   GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, 
667                                 &cron_flush_trust, 
668                                 NULL);
669 }
670
671
672 /**
673  * Initialize peer management subsystem.
674  *
675  * @param cfg configuration to use
676  */
677 void
678 GSF_connected_peer_init_ (struct GNUNET_CONFIGURATION_Handle *cfg)
679 {
680   cp_map = GNUNET_CONTAINER_multihashmap_create (128);
681   GNUNET_assert (GNUNET_OK ==
682                  GNUNET_CONFIGURATION_get_value_filename (cfg,
683                                                           "fs",
684                                                           "TRUST",
685                                                           &trustDirectory));
686   GNUNET_DISK_directory_create (trustDirectory);
687   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
688                                       &cron_flush_trust, NULL);
689 }
690
691
692 /**
693  * Iterator to free peer entries.
694  *
695  * @param cls closure, unused
696  * @param key current key code
697  * @param value value in the hash map (peer entry)
698  * @return GNUNET_YES (we should continue to iterate)
699  */
700 static int 
701 clean_peer (void *cls,
702             const GNUNET_HashCode * key,
703             void *value)
704 {
705   GSF_peer_disconnect_handler_ (NULL, 
706                                 (const struct GNUNET_PeerIdentity*) key);
707   return GNUNET_YES;
708 }
709
710
711 /**
712  * Shutdown peer management subsystem.
713  */
714 void
715 GSF_connected_peer_done_ ()
716 {
717   cron_flush_trust (NULL, NULL);
718   GNUNET_CONTAINER_multihashmap_iterate (cp_peers,
719                                          &clean_peer,
720                                          NULL);
721   GNUNET_CONTAINER_multihashmap_destroy (cp_map);
722   cp_map = NULL;
723   GNUNET_free (trustDirectory);
724   trustDirectory = NULL;
725 }
726
727
728
729 #endif
730 /* end of gnunet-service-fs_cp.h */