ng
[oweals/gnunet.git] / src / fragmentation / fragmentation.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2006, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file fragmentation/fragmentation.c
22  * @brief fragmentation and defragmentation, this code allows
23  *        sending and receiving messages that are larger than
24  *        the MTU of the transport.  Messages are still limited
25  *        to a maximum size of 65535 bytes, which is a good
26  *        idea because otherwise we may need ungainly fragmentation
27  *        buffers.  Each connected peer can have at most one
28  *        fragmented packet at any given point in time (prevents
29  *        DoS attacks).  Fragmented messages that have not been
30  *        completed after a certain amount of time are discarded.
31  * @author Christian Grothoff
32  */
33
34 #include "platform.h"
35 #include "gnunet_fragmentation_lib.h"
36
37 /**
38  * Message fragment.  This header is followed
39  * by the actual data of the fragment.
40  */
41 struct Fragment
42 {
43
44   struct GNUNET_MessageHeader header;
45
46   /**
47    * Fragment offset.
48    */
49   uint32_t off GNUNET_PACKED;
50
51   /**
52    * "unique" id for the fragment
53    */
54   uint64_t id GNUNET_PACKED;
55
56 };
57
58
59 /**
60  * Defragmentation context.
61  */
62 struct GNUNET_FRAGMENT_Context
63 {
64 };
65
66
67 /**
68  * Fragment an over-sized message.
69  *
70  * @param msg the message to fragment
71  * @param mtu the maximum message size
72  * @param proc function to call for each fragment
73  * @param proc_cls closure for proc
74  */
75 void
76 GNUNET_FRAGMENT_fragment (const struct GNUNET_MessageHeader *msg,
77                           uint16_t mtu,
78                           GNUNET_FRAGMENT_MessageProcessor proc,
79                           void *proc_cls)
80 {
81   GNUNET_assert (0);
82 }
83
84
85 /**
86  * Create a defragmentation context.
87  *
88  * @param stats statistics context
89  * @param proc function to call with defragmented messages
90  * @param proc_cls closure for proc
91  * @return the defragmentation context
92  */
93 struct GNUNET_FRAGMENT_Context *
94 GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
95                                 GNUNET_FRAGMENT_MessageProcessor proc,
96                                 void *proc_cls)
97 {
98   return NULL;
99 }
100
101
102 /**
103  * Destroy the given defragmentation context.
104  */
105 void
106 GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *ctx)
107 {
108   GNUNET_assert (0);
109 }
110
111
112 /**
113  * We have received a fragment.  Process it.
114  *
115  * @param ctx the context
116  * @param sender who transmitted the fragment
117  * @param msg the message that was received
118  */
119 void
120 GNUNET_FRAGMENT_process (struct GNUNET_FRAGMENT_Context *ctx,
121                          const struct GNUNET_PeerIdentity *sender,
122                          const struct GNUNET_MessageHeader *msg)
123 {
124   GNUNET_assert (0);
125 }
126
127
128
129 #if 0
130
131 /**
132  * How many buckets does the fragment hash table
133  * have?
134  */
135 #define DEFRAG_BUCKET_COUNT 16
136
137 /**
138  * After how long do fragments time out?
139  */
140 #ifndef DEFRAGMENTATION_TIMEOUT
141 #define DEFRAGMENTATION_TIMEOUT (3 * GNUNET_CRON_MINUTES)
142 #endif
143
144 /**
145  * Entry in the linked list of fragments.
146  */
147 typedef struct FL
148 {
149   struct FL *link;
150   P2P_fragmentation_MESSAGE *frag;
151 } FL;
152
153 /**
154  * Entry in the GNUNET_hash table of fragments.
155  */
156 typedef struct FC
157 {
158   struct FC *next;
159   FL *head;
160   GNUNET_PeerIdentity sender;
161   int id;
162   GNUNET_CronTime ttl;
163 } FC;
164
165 #define FRAGSIZE(fl) ((ntohs(fl->frag->header.size)-sizeof(P2P_fragmentation_MESSAGE)))
166
167 static GNUNET_CoreAPIForPlugins *coreAPI;
168
169 static GNUNET_Stats_ServiceAPI *stats;
170
171 static int stat_defragmented;
172
173 static int stat_fragmented;
174
175 static int stat_discarded;
176
177 /**
178  * Hashtable *with* collision management!
179  */
180 static FC *defragmentationCache[DEFRAG_BUCKET_COUNT];
181
182 /**
183  * Lock for the defragmentation cache.
184  */
185 static struct GNUNET_Mutex *defragCacheLock;
186
187 static void
188 freeFL (FL * fl, int c)
189 {
190   while (fl != NULL)
191     {
192       FL *link = fl->link;
193       if (stats != NULL)
194         stats->change (stat_discarded, c);
195       GNUNET_free (fl->frag);
196       GNUNET_free (fl);
197       fl = link;
198     }
199 }
200
201 /**
202  * This cron job ensures that we purge buffers of fragments
203  * that have timed out.  It can run in much longer intervals
204  * than the defragmentationCron, e.g. every 60s.
205  * <p>
206  * This method goes through the hashtable, finds entries that
207  * have timed out and removes them (and all the fragments that
208  * belong to the entry).  It's a bit more complicated as the
209  * collision list is also collapsed.
210  */
211 static void
212 defragmentationPurgeCron (void *unused)
213 {
214   int i;
215   FC *smf;
216   FC *next;
217   FC *last;
218
219   GNUNET_mutex_lock (defragCacheLock);
220   for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
221     {
222       last = NULL;
223       smf = defragmentationCache[i];
224       while (smf != NULL)
225         {
226           if (smf->ttl < GNUNET_get_time ())
227             {
228               /* free linked list of fragments */
229               freeFL (smf->head, 1);
230               next = smf->next;
231               GNUNET_free (smf);
232               if (last == NULL)
233                 defragmentationCache[i] = next;
234               else
235                 last->next = next;
236               smf = next;
237             }
238           else
239             {
240               last = smf;
241               smf = smf->next;
242             }
243         }                       /* while smf != NULL */
244     }                           /* for all buckets */
245   GNUNET_mutex_unlock (defragCacheLock);
246 }
247
248 /**
249  * Check if this fragment-list is complete.  If yes, put it together,
250  * process and free all buffers.  Does not free the pep
251  * itself (but sets the TTL to 0 to have the cron free it
252  * in the next iteration).
253  *
254  * @param pep the entry in the GNUNET_hash table
255  */
256 static void
257 checkComplete (FC * pep)
258 {
259   FL *pos;
260   unsigned short off;
261   unsigned short len;
262   char *msg;
263
264   GNUNET_GE_ASSERT (NULL, pep != NULL);
265   pos = pep->head;
266   if (pos == NULL)
267     return;
268   len = ntohs (pos->frag->len);
269   if (len == 0)
270     goto CLEANUP;               /* really bad error! */
271   off = 0;
272   while ((pos != NULL) && (ntohs (pos->frag->off) <= off))
273     {
274       if (off >= off + FRAGSIZE (pos))
275         goto CLEANUP;           /* error! */
276       if (ntohs (pos->frag->off) + FRAGSIZE (pos) > off)
277         off = ntohs (pos->frag->off) + FRAGSIZE (pos);
278       else
279         goto CLEANUP;           /* error! */
280       pos = pos->link;
281     }
282   if (off < len)
283     return;                     /* some fragment is still missing */
284
285   msg = GNUNET_malloc (len);
286   pos = pep->head;
287   while (pos != NULL)
288     {
289       memcpy (&msg[ntohs (pos->frag->off)], &pos->frag[1], FRAGSIZE (pos));
290       pos = pos->link;
291     }
292   if (stats != NULL)
293     stats->change (stat_defragmented, 1);
294 #if 0
295   printf ("Finished defragmentation!\n");
296 #endif
297   /* handle message! */
298   coreAPI->loopback_send (&pep->sender, msg, len, GNUNET_YES, NULL);
299   GNUNET_free (msg);
300 CLEANUP:
301   /* free fragment buffers */
302   freeFL (pep->head, 0);
303   pep->head = NULL;
304   pep->ttl = 0;
305 }
306
307 /**
308  * See if the new fragment is a part of this entry and join them if
309  * yes.  Return GNUNET_SYSERR if the fragments do not match.  Return GNUNET_OK if
310  * the fragments do match and the fragment has been processed.  The
311  * defragCacheLock is already acquired by the caller whenever this
312  * method is called.<p>
313  *
314  * @param entry the entry in the cache
315  * @param pep the new entry
316  * @param packet the ip part in the new entry
317  */
318 static int
319 tryJoin (FC * entry,
320          const GNUNET_PeerIdentity * sender,
321          const P2P_fragmentation_MESSAGE * packet)
322 {
323   /* frame before ours; may end in the middle of
324      our frame or before it starts; NULL if we are
325      the earliest position we have received so far */
326   FL *before;
327   /* frame after ours; may start in the middle of
328      our frame or after it; NULL if we are the last
329      fragment we have received so far */
330   FL *after;
331   /* current position in the frame-list */
332   FL *pos;
333   /* the new entry that we're inserting */
334   FL *pep;
335   FL *tmp;
336   unsigned short end;
337
338   GNUNET_GE_ASSERT (NULL, entry != NULL);
339   if (0 != memcmp (sender, &entry->sender, sizeof (GNUNET_PeerIdentity)))
340     return GNUNET_SYSERR;       /* wrong fragment list, try another! */
341   if (ntohl (packet->id) != entry->id)
342     return GNUNET_SYSERR;       /* wrong fragment list, try another! */
343 #if 0
344   printf ("Received fragment %u from %u to %u\n",
345           ntohl (packet->id),
346           ntohs (packet->off),
347           ntohs (packet->off) + ntohs (packet->header.size) -
348           sizeof (P2P_fragmentation_MESSAGE));
349 #endif
350   pos = entry->head;
351   if ((pos != NULL) && (packet->len != pos->frag->len))
352     return GNUNET_SYSERR;       /* wrong fragment size */
353
354   before = NULL;
355   /* find the before-frame */
356   while ((pos != NULL) && (ntohs (pos->frag->off) < ntohs (packet->off)))
357     {
358       before = pos;
359       pos = pos->link;
360     }
361
362   /* find the after-frame */
363   end =
364     ntohs (packet->off) + ntohs (packet->header.size) -
365     sizeof (P2P_fragmentation_MESSAGE);
366   if (end <= ntohs (packet->off))
367     {
368       GNUNET_GE_LOG (NULL,
369                      GNUNET_GE_DEVELOPER | GNUNET_GE_DEBUG | GNUNET_GE_BULK,
370                      "Received invalid fragment at %s:%d\n", __FILE__,
371                      __LINE__);
372       return GNUNET_SYSERR;     /* yuck! integer overflow! */
373     }
374
375   if (before != NULL)
376     after = before;
377   else
378     after = entry->head;
379   while ((after != NULL) && (ntohs (after->frag->off) < end))
380     after = after->link;
381
382   if ((before != NULL) && (before == after))
383     {
384       /* this implies after or before != NULL and thereby the new
385          fragment is redundant as it is fully enclosed in an earlier
386          fragment */
387       if (stats != NULL)
388         stats->change (stat_defragmented, 1);
389       return GNUNET_OK;         /* drop, there is a packet that spans our range! */
390     }
391
392   if ((before != NULL) &&
393       (after != NULL) &&
394       ((htons (before->frag->off) +
395         FRAGSIZE (before)) >= htons (after->frag->off)))
396     {
397       /* this implies that the fragment that starts before us and the
398          fragment that comes after this one leave no space in the middle
399          or even overlap; thus we can drop this redundant piece */
400       if (stats != NULL)
401         stats->change (stat_defragmented, 1);
402       return GNUNET_OK;
403     }
404
405   /* allocate pep */
406   pep = GNUNET_malloc (sizeof (FC));
407   pep->frag = GNUNET_malloc (ntohs (packet->header.size));
408   memcpy (pep->frag, packet, ntohs (packet->header.size));
409   pep->link = NULL;
410
411   if (before == NULL)
412     {
413       pep->link = after;
414       pos = entry->head;
415       while (pos != after)
416         {
417           tmp = pos->link;
418           GNUNET_free (pos->frag);
419           GNUNET_free (pos);
420           pos = tmp;
421         }
422       entry->head = pep;
423       goto FINISH;
424       /* end of insert first */
425     }
426
427   if (after == NULL)
428     {
429       /* insert last: find the end, free everything after it */
430       freeFL (before->link, 1);
431       before->link = pep;
432       goto FINISH;
433     }
434
435   /* ok, we are filling the middle between two fragments; insert.  If
436      there is anything else in the middle, it can be dropped as we're
437      bigger & cover that area as well */
438   /* free everything between before and after */
439   pos = before->link;
440   while (pos != after)
441     {
442       tmp = pos->link;
443       GNUNET_free (pos->frag);
444       GNUNET_free (pos);
445       pos = tmp;
446     }
447   before->link = pep;
448   pep->link = after;
449
450 FINISH:
451   entry->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT;
452   checkComplete (entry);
453   return GNUNET_OK;
454 }
455
456 /**
457  * Defragment the given fragment and pass to handler once
458  * defragmentation is complete.
459  *
460  * @param frag the packet to defragment
461  * @return GNUNET_SYSERR if the fragment is invalid
462  */
463 static int
464 processFragment (const GNUNET_PeerIdentity * sender,
465                  const GNUNET_MessageHeader * frag)
466 {
467   unsigned int hash;
468   FC *smf;
469
470   if (ntohs (frag->size) < sizeof (P2P_fragmentation_MESSAGE))
471     return GNUNET_SYSERR;
472
473   GNUNET_mutex_lock (defragCacheLock);
474   hash = sender->hashPubKey.bits[0] % DEFRAG_BUCKET_COUNT;
475   smf = defragmentationCache[hash];
476   while (smf != NULL)
477     {
478       if (GNUNET_OK ==
479           tryJoin (smf, sender, (P2P_fragmentation_MESSAGE *) frag))
480         {
481           GNUNET_mutex_unlock (defragCacheLock);
482           return GNUNET_OK;
483         }
484       if (0 == memcmp (sender, &smf->sender, sizeof (GNUNET_PeerIdentity)))
485         {
486           freeFL (smf->head, 1);
487           break;
488         }
489       smf = smf->next;
490     }
491   if (smf == NULL)
492     {
493       smf = GNUNET_malloc (sizeof (FC));
494       smf->next = defragmentationCache[hash];
495       defragmentationCache[hash] = smf;
496       smf->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT;
497       smf->sender = *sender;
498     }
499   smf->id = ntohl (((P2P_fragmentation_MESSAGE *) frag)->id);
500   smf->head = GNUNET_malloc (sizeof (FL));
501   smf->head->link = NULL;
502   smf->head->frag = GNUNET_malloc (ntohs (frag->size));
503   memcpy (smf->head->frag, frag, ntohs (frag->size));
504
505   GNUNET_mutex_unlock (defragCacheLock);
506   return GNUNET_OK;
507 }
508
509 typedef struct
510 {
511   GNUNET_PeerIdentity sender;
512   /* maximums size of each fragment */
513   unsigned short mtu;
514   /** how long is this message part expected to be? */
515   unsigned short len;
516   /** when did we intend to transmit? */
517   GNUNET_CronTime transmissionTime;
518 } FragmentBMC;
519
520 /**
521  * Send a message that had to be fragmented (right now!).  First grabs
522  * the first part of the message (obtained from ctx->se) and stores
523  * that in a P2P_fragmentation_MESSAGE envelope.  The remaining fragments are
524  * added to the send queue with GNUNET_EXTREME_PRIORITY (to ensure that they
525  * will be transmitted next).  The logic here is that if the priority
526  * for the first fragment was sufficiently high, the priority should
527  * also have been sufficiently high for all of the other fragments (at
528  * this time) since they have the same priority.  And we want to make
529  * sure that we send all of them since just sending the first fragment
530  * and then going to other messages of equal priority would not be
531  * such a great idea (i.e. would just waste bandwidth).
532  */
533 static int
534 fragmentBMC (void *buf, void *cls, unsigned short len)
535 {
536   FragmentBMC *ctx = cls;
537   static int idGen = 0;
538   P2P_fragmentation_MESSAGE *frag;
539   unsigned int pos;
540   int id;
541   unsigned short mlen;
542
543   if ((len < ctx->mtu) || (buf == NULL))
544     {
545       GNUNET_free (ctx);
546       return GNUNET_SYSERR;
547     }
548   if (stats != NULL)
549     stats->change (stat_fragmented, 1);
550   id = (idGen++) + GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 512);
551   /* write first fragment to buf */
552   frag = (P2P_fragmentation_MESSAGE *) buf;
553   frag->header.size = htons (len);
554   frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
555   frag->id = id;
556   frag->off = htons (0);
557   frag->len = htons (ctx->len);
558   memcpy (&frag[1], &ctx[1], len - sizeof (P2P_fragmentation_MESSAGE));
559
560   /* create remaining fragments, add to queue! */
561   pos = len - sizeof (P2P_fragmentation_MESSAGE);
562   frag = GNUNET_malloc (ctx->mtu);
563   while (pos < ctx->len)
564     {
565       mlen = sizeof (P2P_fragmentation_MESSAGE) + ctx->len - pos;
566       if (mlen > ctx->mtu)
567         mlen = ctx->mtu;
568       GNUNET_GE_ASSERT (NULL, mlen > sizeof (P2P_fragmentation_MESSAGE));
569       frag->header.size = htons (mlen);
570       frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
571       frag->id = id;
572       frag->off = htons (pos);
573       frag->len = htons (ctx->len);
574       memcpy (&frag[1],
575               &((char *) (&ctx[1]))[pos],
576               mlen - sizeof (P2P_fragmentation_MESSAGE));
577       coreAPI->ciphertext_send (&ctx->sender,
578                                 &frag->header,
579                                 GNUNET_EXTREME_PRIORITY,
580                                 ctx->transmissionTime - GNUNET_get_time ());
581       pos += mlen - sizeof (P2P_fragmentation_MESSAGE);
582     }
583   GNUNET_GE_ASSERT (NULL, pos == ctx->len);
584   GNUNET_free (frag);
585   GNUNET_free (ctx);
586   return GNUNET_OK;
587 }
588
589 /**
590  * The given message must be fragmented.  Produce a placeholder that
591  * corresponds to the first fragment.  Once that fragment is scheduled
592  * for transmission, the placeholder should automatically add all of
593  * the other fragments (with very high priority).
594  */
595 void
596 fragment (const GNUNET_PeerIdentity * peer,
597           unsigned int mtu,
598           unsigned int prio,
599           unsigned int targetTime,
600           unsigned int len, GNUNET_BuildMessageCallback bmc, void *bmcClosure)
601 {
602   FragmentBMC *fbmc;
603   int xlen;
604
605   GNUNET_GE_ASSERT (NULL, len > mtu);
606   GNUNET_GE_ASSERT (NULL, mtu > sizeof (P2P_fragmentation_MESSAGE));
607   fbmc = GNUNET_malloc (sizeof (FragmentBMC) + len);
608   fbmc->mtu = mtu;
609   fbmc->sender = *peer;
610   fbmc->transmissionTime = targetTime;
611   fbmc->len = len;
612   if (bmc == NULL)
613     {
614       memcpy (&fbmc[1], bmcClosure, len);
615       GNUNET_free (bmcClosure);
616     }
617   else
618     {
619       if (GNUNET_SYSERR == bmc (&fbmc[1], bmcClosure, len))
620         {
621           GNUNET_free (fbmc);
622           return;
623         }
624     }
625   xlen = mtu - sizeof (P2P_fragmentation_MESSAGE);
626   coreAPI->ciphertext_send_with_callback (peer, &fragmentBMC, fbmc, mtu, prio * xlen / len,     /* compute new priority */
627                                           targetTime);
628 }
629
630 /**
631  * Initialize Fragmentation module.
632  */
633 GNUNET_Fragmentation_ServiceAPI *
634 provide_module_fragmentation (GNUNET_CoreAPIForPlugins * capi)
635 {
636   static GNUNET_Fragmentation_ServiceAPI ret;
637   int i;
638
639   coreAPI = capi;
640   stats = coreAPI->service_request ("stats");
641   if (stats != NULL)
642     {
643       stat_defragmented =
644         stats->create (gettext_noop ("# messages defragmented"));
645       stat_fragmented =
646         stats->create (gettext_noop ("# messages fragmented"));
647       stat_discarded = stats->create (gettext_noop ("# fragments discarded"));
648     }
649   for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
650     defragmentationCache[i] = NULL;
651   defragCacheLock = GNUNET_mutex_create (GNUNET_NO);
652   GNUNET_cron_add_job (coreAPI->cron,
653                        &defragmentationPurgeCron,
654                        60 * GNUNET_CRON_SECONDS, 60 * GNUNET_CRON_SECONDS,
655                        NULL);
656   GNUNET_GE_LOG (capi->ectx,
657                  GNUNET_GE_INFO | GNUNET_GE_USER | GNUNET_GE_REQUEST,
658                  _("`%s' registering handler %d\n"), "fragmentation",
659                  GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
660   capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT,
661                                          &processFragment);
662
663   ret.fragment = &fragment;
664   return &ret;
665 }
666
667 /**
668  * Shutdown fragmentation.
669  */
670 void
671 release_module_fragmentation ()
672 {
673   int i;
674
675   coreAPI->p2p_ciphertext_handler_unregister
676     (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT, &processFragment);
677   GNUNET_cron_del_job (coreAPI->cron, &defragmentationPurgeCron,
678                        60 * GNUNET_CRON_SECONDS, NULL);
679   for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
680     {
681       FC *pos = defragmentationCache[i];
682       while (pos != NULL)
683         {
684           FC *next = pos->next;
685           freeFL (pos->head, 1);
686           GNUNET_free (pos);
687           pos = next;
688         }
689     }
690   if (stats != NULL)
691     {
692       coreAPI->service_release (stats);
693       stats = NULL;
694     }
695   GNUNET_mutex_destroy (defragCacheLock);
696   defragCacheLock = NULL;
697   coreAPI = NULL;
698 }
699
700 #endif
701
702 /* end of fragmentation.c */