make weakness more explicit
[oweals/gnunet.git] / src / fragmentation / fragmentation.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2006, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19  */
20 /**
21  * @file fragmentation/fragmentation.c
22  * @brief fragmentation and defragmentation, this code allows
23  *        sending and receiving messages that are larger than
24  *        the MTU of the transport.  Messages are still limited
25  *        to a maximum size of 65535 bytes, which is a good
26  *        idea because otherwise we may need ungainly fragmentation
27  *        buffers.  Each connected peer can have at most one
28  *        fragmented packet at any given point in time (prevents
29  *        DoS attacks).  Fragmented messages that have not been
30  *        completed after a certain amount of time are discarded.
31  * @author Ji Lu
32  */
33
34 #include "platform.h"
35 #include "gnunet_fragmentation_lib.h"
36 #include "gnunet_protocols.h"
37 #include "gnunet_util_lib.h"
38 /**
39  * Message fragment.  This header is followed
40  * by the actual data of the fragment.
41  */
42 struct Fragment
43 {
44
45         struct GNUNET_MessageHeader header;
46
47         /**
48          * Fragment offset.
49          */
50         uint16_t off GNUNET_PACKED;
51
52         /**
53         * "unique" id for the fragment
54          */
55         uint32_t id GNUNET_PACKED;
56         uint16_t mtu;
57         uint16_t totalNum;
58         uint16_t totalSize;
59
60 };
61
62 struct GNUNET_FRAGEMENT_Ctxbuffer{
63         struct GNUNET_FRAGEMENT_Ctxbuffer *next;
64         uint32_t id;
65         uint16_t size;
66         char * buff;
67         int counter;
68         struct GNUNET_TIME_Absolute receivedTime;
69         struct GNUNET_PeerIdentity peerID;
70         int * num;
71 };
72
73
74 /**
75  * Defragmentation context.
76  */
77 struct GNUNET_FRAGMENT_Context
78 {
79         uint32_t maxNum;
80         struct GNUNET_FRAGEMENT_Ctxbuffer *buffer;
81         GNUNET_FRAGMENT_MessageProcessor proc;
82         void *proc_cls;
83 };
84
85
86 /**
87  * Fragment an over-sized message.
88  *
89  * @param msg the message to fragment
90  * @param mtu the maximum message size
91  * @param proc function to call for each fragment
92  * @param proc_cls closure for proc
93  */
94 void
95 GNUNET_FRAGMENT_fragment (const struct GNUNET_MessageHeader *msg,
96                 uint16_t mtu,
97                 GNUNET_FRAGMENT_MessageProcessor proc,
98                 void *proc_cls)
99 {
100         uint32_t id = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 256);
101         size_t size = sizeof(struct Fragment);
102
103         if(ntohs(msg->size) > mtu-size){
104                 uint16_t lastSize;
105                 uint16_t num;
106                 uint16_t i;
107                 uint16_t actualNum;
108                 lastSize = ntohs(msg->size) % (mtu-size);
109                 num     = ntohs(msg->size) / (mtu - size);
110                 actualNum = num;
111                 if(lastSize!=0){
112                         actualNum = num+1;
113                 }
114                 for(i = 0; i<actualNum; i++)
115                 {
116                         struct Fragment *frag;
117                         if(actualNum != num){
118                                 if(i!=actualNum-1){
119                                         frag = (struct Fragment *)GNUNET_malloc(mtu);
120                                 }
121                                 else{
122                                         frag = (struct Fragment *)GNUNET_malloc(lastSize+size);
123                                         }
124                                 }
125                         else{
126                                         frag = (struct Fragment *)GNUNET_malloc(mtu);
127                                 }
128                         frag->header.type = htons(GNUNET_MESSAGE_TYPE_FRAGMENT);
129                         frag->id = htonl(id);
130                         frag->off = htons((mtu-size)*i);
131                         frag->mtu = htons(mtu);
132                         frag->totalNum = htons(actualNum);
133                         frag->totalSize = msg->size;
134                         char *tmpMsg =  (char *)msg;
135                         if(actualNum != num){
136                                 if(i!=actualNum-1){
137                                         frag->header.size = htons(mtu);
138                                         memcpy(&frag[1], tmpMsg + (mtu-size)*i, mtu - size);
139                                 }
140                                 else{
141                                         frag->header.size = htons(lastSize+size);
142                                         memcpy(&frag[1], tmpMsg + (mtu-size)*i, lastSize);
143                                 }
144                         }
145                         else{
146                                 frag->header.size = htons(mtu);
147                                 memcpy(&frag[1], tmpMsg + (mtu-size)*i, mtu - size);
148                         }
149                         proc(proc_cls, &frag->header);
150                         GNUNET_free(frag);
151                 }
152         }
153 }
154
155 /**
156  * Create a defragmentation context.
157  *
158  * @param stats statistics context
159  * @param proc function to call with defragmented messages
160  * @param proc_cls closure for proc
161  * @return the defragmentation context
162  */
163 struct GNUNET_FRAGMENT_Context *
164 GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
165                 GNUNET_FRAGMENT_MessageProcessor proc,
166                 void *proc_cls)
167                 {
168         struct GNUNET_FRAGMENT_Context *ctx = (struct GNUNET_FRAGMENT_Context*)GNUNET_malloc(sizeof(struct GNUNET_FRAGMENT_Context));
169         ctx->maxNum = 100;
170         ctx->proc = proc;
171         ctx->proc_cls = proc_cls;
172         ctx->buffer = NULL;
173         return ctx;
174                 }
175
176
177 /**
178  * Destroy the given defragmentation context.
179  */
180 void
181 GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *ctx)
182 {
183         struct GNUNET_FRAGEMENT_Ctxbuffer *buffer;
184         for(buffer = ctx->buffer; buffer!=NULL; buffer = buffer->next){
185                 GNUNET_free(buffer->num);
186                 GNUNET_free(buffer);
187         }
188         GNUNET_free(ctx);
189         GNUNET_assert (0);
190 }
191
192
193 /**
194  * We have received a fragment.  Process it.
195  *
196  * @param ctx the context
197  * @param sender who transmitted the fragment
198  * @param msg the message that was received
199  */
200 void
201 GNUNET_FRAGMENT_process (struct GNUNET_FRAGMENT_Context *ctx,
202                 const struct GNUNET_PeerIdentity *sender,
203                 const struct GNUNET_MessageHeader *msg)
204 {
205         uint16_t type = ntohs(msg->type);
206         int exist = 0, received = 0;
207         if(type!=GNUNET_MESSAGE_TYPE_FRAGMENT){
208                 return;
209         }
210         struct Fragment *frag = (struct Fragment *)msg;
211         struct GNUNET_FRAGEMENT_Ctxbuffer* buffer;
212         struct GNUNET_FRAGEMENT_Ctxbuffer* prev;
213         prev = NULL;
214         buffer = ctx->buffer;
215         while (buffer != NULL)
216         {
217                 if ((buffer->id == ntohl(frag->id))&&(0 == memcmp (&buffer->peerID,
218                              sender, sizeof (struct GNUNET_PeerIdentity)))){
219                         exist = 1;
220                         break;
221                 }
222                 prev = buffer;
223                 buffer = buffer->next;
224         }
225
226         if (exist)
227         {
228                 int i;
229                 for(i = 0; i<ntohs(frag->totalNum); i++){
230                         if(buffer->num[i]==ntohs(frag->off)/(ntohs(frag->mtu)-sizeof(struct Fragment))){
231                                 received = 1;
232                                 break;
233                                 }
234                 }
235         }
236
237         if(!exist){
238                 buffer = GNUNET_malloc(sizeof(struct GNUNET_FRAGEMENT_Ctxbuffer));
239                 buffer->num = GNUNET_malloc(ntohs(frag->totalNum)*sizeof(int));
240                 int j;
241                 for(j = 0; j<ntohs(frag->totalNum); j++){
242                         buffer->num[j] = -10;
243                 }
244                 buffer->peerID = *sender;
245                 buffer->id = ntohl(frag->id);
246                 buffer->receivedTime = GNUNET_TIME_absolute_get ();
247                 uint16_t si = ntohs(frag->totalSize);
248                 buffer->size = si;
249                 buffer->buff = GNUNET_malloc(si);
250                 buffer->next = ctx->buffer;
251                 ctx->buffer = buffer;
252         }
253
254         if(!received){
255                 buffer->num[buffer->counter++]=ntohs(frag->off)/(ntohs(frag->mtu)-sizeof(struct Fragment));
256                 uint16_t sizeoffrag = ntohs(frag->header.size) - sizeof(struct Fragment);
257                 memcpy(&buffer->buff[ntohs(frag->off)], &frag[1], sizeoffrag);
258                 buffer->receivedTime = GNUNET_TIME_absolute_get ();
259         }
260
261         if(buffer->counter == ntohs(frag->totalNum))
262         {
263                 ctx->proc(ctx->proc_cls, (struct GNUNET_MessageHeader *)buffer->buff);
264                 if(prev==NULL){
265                         ctx->buffer = buffer->next;
266                 }
267                 else{
268                         prev->next = buffer->next;
269                 }
270                 GNUNET_free(buffer);
271                 return;
272         }
273 }
274
275
276
277 #if 0
278
279 /**
280  * How many buckets does the fragment hash table
281  * have?
282  */
283 #define DEFRAG_BUCKET_COUNT 16
284
285 /**
286  * After how long do fragments time out?
287  */
288 #ifndef DEFRAGMENTATION_TIMEOUT
289 #define DEFRAGMENTATION_TIMEOUT (3 * GNUNET_CRON_MINUTES)
290 #endif
291
292 /**
293  * Entry in the linked list of fragments.
294  */
295 typedef struct FL
296 {
297         struct FL *link;
298         P2P_fragmentation_MESSAGE *frag;
299 } FL;
300
301 /**
302  * Entry in the GNUNET_hash table of fragments.
303  */
304 typedef struct FC
305 {
306         struct FC *next;
307         FL *head;
308         GNUNET_PeerIdentity sender;
309         int id;
310         GNUNET_CronTime ttl;
311 } FC;
312
313 #define FRAGSIZE(fl) ((ntohs(fl->frag->header.size)-sizeof(P2P_fragmentation_MESSAGE)))
314
315 static GNUNET_CoreAPIForPlugins *coreAPI;
316
317 static GNUNET_Stats_ServiceAPI *stats;
318
319 static int stat_defragmented;
320
321 static int stat_fragmented;
322
323 static int stat_discarded;
324
325 /**
326  * Hashtable *with* collision management!
327  */
328 static FC *defragmentationCache[DEFRAG_BUCKET_COUNT];
329
330 /**
331  * Lock for the defragmentation cache.
332  */
333 static struct GNUNET_Mutex *defragCacheLock;
334
335 static void
336 freeFL (FL * fl, int c)
337 {
338         while (fl != NULL)
339         {
340                 FL *link = fl->link;
341                 if (stats != NULL)
342                         stats->change (stat_discarded, c);
343                 GNUNET_free (fl->frag);
344                 GNUNET_free (fl);
345                 fl = link;
346         }
347 }
348
349 /**
350  * This cron job ensures that we purge buffers of fragments
351  * that have timed out.  It can run in much longer intervals
352  * than the defragmentationCron, e.g. every 60s.
353  * <p>
354  * This method goes through the hashtable, finds entries that
355  * have timed out and removes them (and all the fragments that
356  * belong to the entry).  It's a bit more complicated as the
357  * collision list is also collapsed.
358  */
359 static void
360 defragmentationPurgeCron (void *unused)
361 {
362         int i;
363         FC *smf;
364         FC *next;
365         FC *last;
366
367         GNUNET_mutex_lock (defragCacheLock);
368         for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
369         {
370                 last = NULL;
371                 smf = defragmentationCache[i];
372                 while (smf != NULL)
373                 {
374                         if (smf->ttl < GNUNET_get_time ())
375                         {
376                                 /* free linked list of fragments */
377                                 freeFL (smf->head, 1);
378                                 next = smf->next;
379                                 GNUNET_free (smf);
380                                 if (last == NULL)
381                                         defragmentationCache[i] = next;
382                                 else
383                                         last->next = next;
384                                 smf = next;
385                         }
386                         else
387                         {
388                                 last = smf;
389                                 smf = smf->next;
390                         }
391                 }                       /* while smf != NULL */
392         }                           /* for all buckets */
393         GNUNET_mutex_unlock (defragCacheLock);
394 }
395
396 /**
397  * Check if this fragment-list is complete.  If yes, put it together,
398  * process and free all buffers.  Does not free the pep
399  * itself (but sets the TTL to 0 to have the cron free it
400  * in the next iteration).
401  *
402  * @param pep the entry in the GNUNET_hash table
403  */
404 static void
405 checkComplete (FC * pep)
406 {
407         FL *pos;
408         unsigned short off;
409         unsigned short len;
410         char *msg;
411
412         GNUNET_GE_ASSERT (NULL, pep != NULL);
413         pos = pep->head;
414         if (pos == NULL)
415                 return;
416         len = ntohs (pos->frag->len);
417         if (len == 0)
418                 goto CLEANUP;               /* really bad error! */
419         off = 0;
420         while ((pos != NULL) && (ntohs (pos->frag->off) <= off))
421         {
422                 if (off >= off + FRAGSIZE (pos))
423                         goto CLEANUP;           /* error! */
424                 if (ntohs (pos->frag->off) + FRAGSIZE (pos) > off)
425                         off = ntohs (pos->frag->off) + FRAGSIZE (pos);
426                 else
427                         goto CLEANUP;           /* error! */
428                 pos = pos->link;
429         }
430         if (off < len)
431                 return;                     /* some fragment is still missing */
432
433         msg = GNUNET_malloc (len);
434         pos = pep->head;
435         while (pos != NULL)
436         {
437                 memcpy (&msg[ntohs (pos->frag->off)], &pos->frag[1], FRAGSIZE (pos));
438                 pos = pos->link;
439         }
440         if (stats != NULL)
441                 stats->change (stat_defragmented, 1);
442 #if 0
443         printf ("Finished defragmentation!\n");
444 #endif
445         /* handle message! */
446         coreAPI->loopback_send (&pep->sender, msg, len, GNUNET_YES, NULL);
447         GNUNET_free (msg);
448         CLEANUP:
449         /* free fragment buffers */
450         freeFL (pep->head, 0);
451         pep->head = NULL;
452         pep->ttl = 0;
453 }
454
455 /**
456  * See if the new fragment is a part of this entry and join them if
457  * yes.  Return GNUNET_SYSERR if the fragments do not match.  Return GNUNET_OK if
458  * the fragments do match and the fragment has been processed.  The
459  * defragCacheLock is already acquired by the caller whenever this
460  * method is called.<p>
461  *
462  * @param entry the entry in the cache
463  * @param pep the new entry
464  * @param packet the ip part in the new entry
465  */
466 static int
467 tryJoin (FC * entry,
468                 const GNUNET_PeerIdentity * sender,
469                 const P2P_fragmentation_MESSAGE * packet)
470 {
471         /* frame before ours; may end in the middle of
472      our frame or before it starts; NULL if we are
473      the earliest position we have received so far */
474         FL *before;
475         /* frame after ours; may start in the middle of
476      our frame or after it; NULL if we are the last
477      fragment we have received so far */
478         FL *after;
479         /* current position in the frame-list */
480         FL *pos;
481         /* the new entry that we're inserting */
482         FL *pep;
483         FL *tmp;
484         unsigned short end;
485
486         GNUNET_GE_ASSERT (NULL, entry != NULL);
487         if (0 != memcmp (sender, &entry->sender, sizeof (GNUNET_PeerIdentity)))
488                 return GNUNET_SYSERR;       /* wrong fragment list, try another! */
489         if (ntohl (packet->id) != entry->id)
490                 return GNUNET_SYSERR;       /* wrong fragment list, try another! */
491 #if 0
492         printf ("Received fragment %u from %u to %u\n",
493                         ntohl (packet->id),
494                         ntohs (packet->off),
495                         ntohs (packet->off) + ntohs (packet->header.size) -
496                         sizeof (P2P_fragmentation_MESSAGE));
497 #endif
498         pos = entry->head;
499         if ((pos != NULL) && (packet->len != pos->frag->len))
500                 return GNUNET_SYSERR;       /* wrong fragment size */
501
502         before = NULL;
503         /* find the before-frame */
504         while ((pos != NULL) && (ntohs (pos->frag->off) < ntohs (packet->off)))
505         {
506                 before = pos;
507                 pos = pos->link;
508         }
509
510         /* find the after-frame */
511         end =
512                         ntohs (packet->off) + ntohs (packet->header.size) -
513                         sizeof (P2P_fragmentation_MESSAGE);
514         if (end <= ntohs (packet->off))
515         {
516                 GNUNET_GE_LOG (NULL,
517                                 GNUNET_GE_DEVELOPER | GNUNET_GE_DEBUG | GNUNET_GE_BULK,
518                                 "Received invalid fragment at %s:%d\n", __FILE__,
519                                 __LINE__);
520                 return GNUNET_SYSERR;     /* yuck! integer overflow! */
521         }
522
523         if (before != NULL)
524                 after = before;
525         else
526                 after = entry->head;
527         while ((after != NULL) && (ntohs (after->frag->off) < end))
528                 after = after->link;
529
530         if ((before != NULL) && (before == after))
531         {
532                 /* this implies after or before != NULL and thereby the new
533          fragment is redundant as it is fully enclosed in an earlier
534          fragment */
535                 if (stats != NULL)
536                         stats->change (stat_defragmented, 1);
537                 return GNUNET_OK;         /* drop, there is a packet that spans our range! */
538         }
539
540         if ((before != NULL) &&
541                         (after != NULL) &&
542                         ((htons (before->frag->off) +
543                                         FRAGSIZE (before)) >= htons (after->frag->off)))
544         {
545                 /* this implies that the fragment that starts before us and the
546          fragment that comes after this one leave no space in the middle
547          or even overlap; thus we can drop this redundant piece */
548                 if (stats != NULL)
549                         stats->change (stat_defragmented, 1);
550                 return GNUNET_OK;
551         }
552
553         /* allocate pep */
554         pep = GNUNET_malloc (sizeof (FC));
555         pep->frag = GNUNET_malloc (ntohs (packet->header.size));
556         memcpy (pep->frag, packet, ntohs (packet->header.size));
557         pep->link = NULL;
558
559         if (before == NULL)
560         {
561                 pep->link = after;
562                 pos = entry->head;
563                 while (pos != after)
564                 {
565                         tmp = pos->link;
566                         GNUNET_free (pos->frag);
567                         GNUNET_free (pos);
568                         pos = tmp;
569                 }
570                 entry->head = pep;
571                 goto FINISH;
572                 /* end of insert first */
573         }
574
575         if (after == NULL)
576         {
577                 /* insert last: find the end, free everything after it */
578                 freeFL (before->link, 1);
579                 before->link = pep;
580                 goto FINISH;
581         }
582
583         /* ok, we are filling the middle between two fragments; insert.  If
584      there is anything else in the middle, it can be dropped as we're
585      bigger & cover that area as well */
586         /* free everything between before and after */
587         pos = before->link;
588         while (pos != after)
589         {
590                 tmp = pos->link;
591                 GNUNET_free (pos->frag);
592                 GNUNET_free (pos);
593                 pos = tmp;
594         }
595         before->link = pep;
596         pep->link = after;
597
598         FINISH:
599         entry->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT;
600         checkComplete (entry);
601         return GNUNET_OK;
602 }
603
604 /**
605  * Defragment the given fragment and pass to handler once
606  * defragmentation is complete.
607  *
608  * @param frag the packet to defragment
609  * @return GNUNET_SYSERR if the fragment is invalid
610  */
611 static int
612 processFragment (const GNUNET_PeerIdentity * sender,
613                 const GNUNET_MessageHeader * frag)
614 {
615         unsigned int hash;
616         FC *smf;
617
618         if (ntohs (frag->size) < sizeof (P2P_fragmentation_MESSAGE))
619                 return GNUNET_SYSERR;
620
621         GNUNET_mutex_lock (defragCacheLock);
622         hash = sender->hashPubKey.bits[0] % DEFRAG_BUCKET_COUNT;
623         smf = defragmentationCache[hash];
624         while (smf != NULL)
625         {
626                 if (GNUNET_OK ==
627                                 tryJoin (smf, sender, (P2P_fragmentation_MESSAGE *) frag))
628                 {
629                         GNUNET_mutex_unlock (defragCacheLock);
630                         return GNUNET_OK;
631                 }
632                 if (0 == memcmp (sender, &smf->sender, sizeof (GNUNET_PeerIdentity)))
633                 {
634                         freeFL (smf->head, 1);
635                         break;
636                 }
637                 smf = smf->next;
638         }
639         if (smf == NULL)
640         {
641                 smf = GNUNET_malloc (sizeof (FC));
642                 smf->next = defragmentationCache[hash];
643                 defragmentationCache[hash] = smf;
644                 smf->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT;
645                 smf->sender = *sender;
646         }
647         smf->id = ntohl (((P2P_fragmentation_MESSAGE *) frag)->id);
648         smf->head = GNUNET_malloc (sizeof (FL));
649         smf->head->link = NULL;
650         smf->head->frag = GNUNET_malloc (ntohs (frag->size));
651         memcpy (smf->head->frag, frag, ntohs (frag->size));
652
653         GNUNET_mutex_unlock (defragCacheLock);
654         return GNUNET_OK;
655 }
656
657 typedef struct
658 {
659         GNUNET_PeerIdentity sender;
660         /* maximums size of each fragment */
661         unsigned short mtu;
662         /** how long is this message part expected to be? */
663         unsigned short len;
664         /** when did we intend to transmit? */
665         GNUNET_CronTime transmissionTime;
666 } FragmentBMC;
667
668 /**
669  * Send a message that had to be fragmented (right now!).  First grabs
670  * the first part of the message (obtained from ctx->se) and stores
671  * that in a P2P_fragmentation_MESSAGE envelope.  The remaining fragments are
672  * added to the send queue with GNUNET_EXTREME_PRIORITY (to ensure that they
673  * will be transmitted next).  The logic here is that if the priority
674  * for the first fragment was sufficiently high, the priority should
675  * also have been sufficiently high for all of the other fragments (at
676  * this time) since they have the same priority.  And we want to make
677  * sure that we send all of them since just sending the first fragment
678  * and then going to other messages of equal priority would not be
679  * such a great idea (i.e. would just waste bandwidth).
680  */
681 static int
682 fragmentBMC (void *buf, void *cls, unsigned short len)
683 {
684         FragmentBMC *ctx = cls;
685         static int idGen = 0;
686         P2P_fragmentation_MESSAGE *frag;
687         unsigned int pos;
688         int id;
689         unsigned short mlen;
690
691         if ((len < ctx->mtu) || (buf == NULL))
692         {
693                 GNUNET_free (ctx);
694                 return GNUNET_SYSERR;
695         }
696         if (stats != NULL)
697                 stats->change (stat_fragmented, 1);
698         id = (idGen++) + GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 512);
699         /* write first fragment to buf */
700         frag = (P2P_fragmentation_MESSAGE *) buf;
701         frag->header.size = htons (len);
702         frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
703         frag->id = id;
704         frag->off = htons (0);
705         frag->len = htons (ctx->len);
706         memcpy (&frag[1], &ctx[1], len - sizeof (P2P_fragmentation_MESSAGE));
707
708         /* create remaining fragments, add to queue! */
709         pos = len - sizeof (P2P_fragmentation_MESSAGE);
710         frag = GNUNET_malloc (ctx->mtu);
711         while (pos < ctx->len)
712         {
713                 mlen = sizeof (P2P_fragmentation_MESSAGE) + ctx->len - pos;
714                 if (mlen > ctx->mtu)
715                         mlen = ctx->mtu;
716                 GNUNET_GE_ASSERT (NULL, mlen > sizeof (P2P_fragmentation_MESSAGE));
717                 frag->header.size = htons (mlen);
718                 frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
719                 frag->id = id;
720                 frag->off = htons (pos);
721                 frag->len = htons (ctx->len);
722                 memcpy (&frag[1],
723                                 &((char *) (&ctx[1]))[pos],
724                                 mlen - sizeof (P2P_fragmentation_MESSAGE));
725                 coreAPI->ciphertext_send (&ctx->sender,
726                                 &frag->header,
727                                 GNUNET_EXTREME_PRIORITY,
728                                 ctx->transmissionTime - GNUNET_get_time ());
729                 pos += mlen - sizeof (P2P_fragmentation_MESSAGE);
730         }
731         GNUNET_GE_ASSERT (NULL, pos == ctx->len);
732         GNUNET_free (frag);
733         GNUNET_free (ctx);
734         return GNUNET_OK;
735 }
736
737 /**
738  * The given message must be fragmented.  Produce a placeholder that
739  * corresponds to the first fragment.  Once that fragment is scheduled
740  * for transmission, the placeholder should automatically add all of
741  * the other fragments (with very high priority).
742  */
743 void
744 fragment (const GNUNET_PeerIdentity * peer,
745                 unsigned int mtu,
746                 unsigned int prio,
747                 unsigned int targetTime,
748                 unsigned int len, GNUNET_BuildMessageCallback bmc, void *bmcClosure)
749 {
750         FragmentBMC *fbmc;
751         int xlen;
752
753         GNUNET_GE_ASSERT (NULL, len > mtu);
754         GNUNET_GE_ASSERT (NULL, mtu > sizeof (P2P_fragmentation_MESSAGE));
755         fbmc = GNUNET_malloc (sizeof (FragmentBMC) + len);
756         fbmc->mtu = mtu;
757         fbmc->sender = *peer;
758         fbmc->transmissionTime = targetTime;
759         fbmc->len = len;
760         if (bmc == NULL)
761         {
762                 memcpy (&fbmc[1], bmcClosure, len);
763                 GNUNET_free (bmcClosure);
764         }
765         else
766         {
767                 if (GNUNET_SYSERR == bmc (&fbmc[1], bmcClosure, len))
768                 {
769                         GNUNET_free (fbmc);
770                         return;
771                 }
772         }
773         xlen = mtu - sizeof (P2P_fragmentation_MESSAGE);
774         coreAPI->ciphertext_send_with_callback (peer, &fragmentBMC, fbmc, mtu, prio * xlen / len,     /* compute new priority */
775                         targetTime);
776 }
777
778 /**
779  * Initialize Fragmentation module.
780  */
781 GNUNET_Fragmentation_ServiceAPI *
782 provide_module_fragmentation (GNUNET_CoreAPIForPlugins * capi)
783 {
784         static GNUNET_Fragmentation_ServiceAPI ret;
785         int i;
786
787         coreAPI = capi;
788         stats = coreAPI->service_request ("stats");
789         if (stats != NULL)
790         {
791                 stat_defragmented =
792                                 stats->create (gettext_noop ("# messages defragmented"));
793                 stat_fragmented =
794                                 stats->create (gettext_noop ("# messages fragmented"));
795                 stat_discarded = stats->create (gettext_noop ("# fragments discarded"));
796         }
797         for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
798                 defragmentationCache[i] = NULL;
799         defragCacheLock = GNUNET_mutex_create (GNUNET_NO);
800         GNUNET_cron_add_job (coreAPI->cron,
801                         &defragmentationPurgeCron,
802                         60 * GNUNET_CRON_SECONDS, 60 * GNUNET_CRON_SECONDS,
803                         NULL);
804         GNUNET_GE_LOG (capi->ectx,
805                         GNUNET_GE_INFO | GNUNET_GE_USER | GNUNET_GE_REQUEST,
806                         _("`%s' registering handler %d\n"), "fragmentation",
807                         GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
808         capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT,
809                         &processFragment);
810
811         ret.fragment = &fragment;
812         return &ret;
813 }
814
815 /**
816  * Shutdown fragmentation.
817  */
818 void
819 release_module_fragmentation ()
820 {
821         int i;
822
823         coreAPI->p2p_ciphertext_handler_unregister
824         (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT, &processFragment);
825         GNUNET_cron_del_job (coreAPI->cron, &defragmentationPurgeCron,
826                         60 * GNUNET_CRON_SECONDS, NULL);
827         for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
828         {
829                 FC *pos = defragmentationCache[i];
830                 while (pos != NULL)
831                 {
832                         FC *next = pos->next;
833                         freeFL (pos->head, 1);
834                         GNUNET_free (pos);
835                         pos = next;
836                 }
837         }
838         if (stats != NULL)
839         {
840                 coreAPI->service_release (stats);
841                 stats = NULL;
842         }
843         GNUNET_mutex_destroy (defragCacheLock);
844         defragCacheLock = NULL;
845         coreAPI = NULL;
846 }
847
848 #endif
849
850 /* end of fragmentation.c */