c67059f344080a84324b6e729bd743983ef0d177
[oweals/gnunet.git] / src / fragmentation / fragmentation.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2006, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19  */
20 /**
21  * @file fragmentation/fragmentation.c
22  * @brief fragmentation and defragmentation, this code allows
23  *        sending and receiving messages that are larger than
24  *        the MTU of the transport.  Messages are still limited
25  *        to a maximum size of 65535 bytes, which is a good
26  *        idea because otherwise we may need ungainly fragmentation
27  *        buffers.  Each connected peer can have at most one
28  *        fragmented packet at any given point in time (prevents
29  *        DoS attacks).  Fragmented messages that have not been
30  *        completed after a certain amount of time are discarded.
31  * @author Christian Grothoff
32  */
33
34 #include "platform.h"
35 #include "gnunet_fragmentation_lib.h"
36 #include "gnunet_protocols.h"
37 #include "gnunet_util_lib.h"
38 /**
39  * Message fragment.  This header is followed
40  * by the actual data of the fragment.
41  */
42 struct Fragment
43 {
44
45         struct GNUNET_MessageHeader header;
46
47         /**
48          * Fragment offset.
49          */
50         uint32_t off GNUNET_PACKED;
51
52         /**
53          * "unique" id for the fragment
54          */
55         uint64_t id GNUNET_PACKED;
56
57         size_t mtu;
58         uint32_t totalNum;
59         uint64_t totalSize;
60 };
61
62 struct GNUNET_FRAGEMENT_Ctxbuffer{
63         struct GNUNET_FRAGEMENT_Ctxbuffer *next;
64         uint64_t id;
65         uint16_t size;
66         char * buff;
67         int counter;
68         struct GNUNET_TIME_Absolute receivedTime;
69         struct GNUNET_PeerIdentity *peerID;
70         int * num;
71 };
72
73
74 /**
75  * Defragmentation context.
76  */
77 struct GNUNET_FRAGMENT_Context
78 {
79         uint32_t maxNum;
80         struct GNUNET_FRAGEMENT_Ctxbuffer *buffer;
81         GNUNET_FRAGMENT_MessageProcessor proc;
82         void *proc_cls;
83 };
84
85
86 /**
87  * Fragment an over-sized message.
88  *
89  * @param msg the message to fragment
90  * @param mtu the maximum message size
91  * @param proc function to call for each fragment
92  * @param proc_cls closure for proc
93  */
94 void
95 GNUNET_FRAGMENT_fragment (const struct GNUNET_MessageHeader *msg,
96                 uint16_t mtu,
97                 GNUNET_FRAGMENT_MessageProcessor proc,
98                 void *proc_cls)
99 {
100         uint32_t id = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 256);
101         size_t size = sizeof(struct Fragment);
102         if(ntohs(msg->size) > mtu-size){
103                 uint16_t lastSize;
104                 uint16_t num;
105                 uint16_t i;
106                 uint16_t actualNum;
107                 lastSize = ntohs(msg->size) % (mtu-size);
108                 num     = ntohs(msg->size) / (mtu - size);
109                 actualNum = num;
110                 if(lastSize!=0){
111                         actualNum = num+1;
112                 }
113                 for(i = 0; i<actualNum; i++)
114                 {
115                         struct Fragment *frag;
116                         if(actualNum != num){
117                                                         if(i!=actualNum-1){
118                                                                 frag = GNUNET_malloc(mtu);
119                                                         }
120                                                         else{
121                                                             frag = GNUNET_malloc(lastSize+size);
122                                                                 }
123                                                         }
124                         else{
125                                         frag = GNUNET_malloc(mtu);
126                                 }
127                         frag->header.type = htons(GNUNET_MESSAGE_TYPE_FRAGMENT);
128                         frag->id = htonl(id);
129                         frag->off = htons((mtu-size)*i);
130                         frag->mtu = htons(mtu);
131                         frag->totalNum = htons(actualNum);
132                         frag->totalSize = msg->size;
133                         if(actualNum != num){
134                                 if(i!=actualNum-1){
135                                         frag->header.size = htons(mtu);
136                                         memcpy(&frag[1], msg + ntohs(frag->off), mtu - size);
137                                 }
138                                 else{
139                                         frag->header.size = htons(lastSize+size);
140                                         memcpy(&frag[1], msg + ntohs(frag->off), lastSize);
141                                 }
142                         }
143                         else{
144                                 frag->header.size = htons(mtu);
145                                 memcpy(&frag[1], msg + ntohs(frag->off), mtu - size);
146                         }
147                         proc(proc_cls, &frag->header);
148                         GNUNET_free(frag);
149                 }
150         }
151 }
152
153
154 /**
155  * Create a defragmentation context.
156  *
157  * @param stats statistics context
158  * @param proc function to call with defragmented messages
159  * @param proc_cls closure for proc
160  * @return the defragmentation context
161  */
162 struct GNUNET_FRAGMENT_Context *
163 GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
164                 GNUNET_FRAGMENT_MessageProcessor proc,
165                 void *proc_cls)
166                 {
167         struct GNUNET_FRAGMENT_Context *ctx = (struct GNUNET_FRAGMENT_Context*)GNUNET_malloc(sizeof(struct GNUNET_FRAGMENT_Context));
168         ctx->maxNum = 100;
169         ctx->proc = proc;
170         ctx->proc_cls = proc_cls;
171         ctx->buffer = NULL;
172         return ctx;
173                 }
174
175
176 /**
177  * Destroy the given defragmentation context.
178  */
179 void
180 GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *ctx)
181 {
182         struct GNUNET_FRAGEMENT_Ctxbuffer *buffer;
183         for(buffer = ctx->buffer; buffer!=NULL; buffer = buffer->next){
184                 GNUNET_free(buffer->num);
185                 GNUNET_free(buffer);
186         }
187         GNUNET_free(ctx);
188         GNUNET_assert (0);
189 }
190
191
192 /**
193  * We have received a fragment.  Process it.
194  *
195  * @param ctx the context
196  * @param sender who transmitted the fragment
197  * @param msg the message that was received
198  */
199 void
200 GNUNET_FRAGMENT_process (struct GNUNET_FRAGMENT_Context *ctx,
201                 const struct GNUNET_PeerIdentity *sender,
202                 const struct GNUNET_MessageHeader *msg)
203 {
204         uint16_t type = ntohs(msg->type);
205         int exist = 0, received = 0;
206         if(type!=GNUNET_MESSAGE_TYPE_FRAGMENT){
207                 return;
208         }
209         struct Fragment *frag = (struct Fragment *)msg;
210         struct GNUNET_FRAGEMENT_Ctxbuffer* buffer;
211         struct GNUNET_FRAGEMENT_Ctxbuffer* prev;
212         prev = NULL;
213         buffer = ctx->buffer;
214         while (buffer != NULL)
215         {
216 //for(buffer = ctx->buffer; buffer != NULL; buffer = buffer->next){
217                 if(buffer->id == ntohl(frag->id)&&(buffer->peerID==sender)){
218                         exist = 1;
219                         break;
220                 }
221                 prev = buffer;
222                 buffer = buffer->next;
223         }
224
225         if (exist)
226         {
227                 int i;
228                 for(i = 0; i<ntohs(frag->totalNum); i++){
229                         if(buffer->num[i]==ntohs(frag->off)/(ntohs(frag->mtu)-sizeof(struct Fragment))){
230                                 received = 1;
231                                 break;
232                                 }
233                 }
234         }
235
236         if(!exist){
237                 buffer = GNUNET_malloc(sizeof(struct GNUNET_FRAGEMENT_Ctxbuffer));
238                 buffer->num = (int*)GNUNET_malloc(ntohs(frag->totalNum)*sizeof(int));
239                 int j;
240                 for(j = 0; j<ntohs(frag->totalNum); j++){
241                         buffer->num[j] = -10;
242                 }
243                 buffer->peerID = sender;
244                 buffer->id = ntohl(frag->id);
245                 buffer->receivedTime = GNUNET_TIME_absolute_get ();
246                 uint64_t si = ntohs(frag->totalSize);
247                 buffer->size = si;
248                 buffer->buff = (char *)GNUNET_malloc(si);
249                 buffer->next = ctx->buffer;
250                 ctx->buffer = buffer;
251         }
252
253         if(!received){
254                 buffer->num[buffer->counter++]=ntohs(frag->off)/(ntohs(frag->mtu)-sizeof(struct Fragment));
255                 uint16_t sizeoffrag = ntohs(frag->header.size) - sizeof(struct Fragment);
256                 memcpy(&buffer->buff[ntohs(frag->off)], &frag[1], sizeoffrag);
257                 buffer->receivedTime = GNUNET_TIME_absolute_get ();
258         }
259
260         if(buffer->counter == ntohs(frag->totalNum))
261         {
262                 ctx->proc(ctx->proc_cls, (struct GNUNET_MessageHeader *)buffer->buff);
263                 if(prev==NULL){
264                         ctx->buffer = buffer->next;
265                 }
266                 else{
267                         prev->next = buffer->next;
268                 }
269                 GNUNET_free(buffer);
270                 return;
271         }
272 }
273
274
275
276 #if 0
277
278 /**
279  * How many buckets does the fragment hash table
280  * have?
281  */
282 #define DEFRAG_BUCKET_COUNT 16
283
284 /**
285  * After how long do fragments time out?
286  */
287 #ifndef DEFRAGMENTATION_TIMEOUT
288 #define DEFRAGMENTATION_TIMEOUT (3 * GNUNET_CRON_MINUTES)
289 #endif
290
291 /**
292  * Entry in the linked list of fragments.
293  */
294 typedef struct FL
295 {
296         struct FL *link;
297         P2P_fragmentation_MESSAGE *frag;
298 } FL;
299
300 /**
301  * Entry in the GNUNET_hash table of fragments.
302  */
303 typedef struct FC
304 {
305         struct FC *next;
306         FL *head;
307         GNUNET_PeerIdentity sender;
308         int id;
309         GNUNET_CronTime ttl;
310 } FC;
311
312 #define FRAGSIZE(fl) ((ntohs(fl->frag->header.size)-sizeof(P2P_fragmentation_MESSAGE)))
313
314 static GNUNET_CoreAPIForPlugins *coreAPI;
315
316 static GNUNET_Stats_ServiceAPI *stats;
317
318 static int stat_defragmented;
319
320 static int stat_fragmented;
321
322 static int stat_discarded;
323
324 /**
325  * Hashtable *with* collision management!
326  */
327 static FC *defragmentationCache[DEFRAG_BUCKET_COUNT];
328
329 /**
330  * Lock for the defragmentation cache.
331  */
332 static struct GNUNET_Mutex *defragCacheLock;
333
334 static void
335 freeFL (FL * fl, int c)
336 {
337         while (fl != NULL)
338         {
339                 FL *link = fl->link;
340                 if (stats != NULL)
341                         stats->change (stat_discarded, c);
342                 GNUNET_free (fl->frag);
343                 GNUNET_free (fl);
344                 fl = link;
345         }
346 }
347
348 /**
349  * This cron job ensures that we purge buffers of fragments
350  * that have timed out.  It can run in much longer intervals
351  * than the defragmentationCron, e.g. every 60s.
352  * <p>
353  * This method goes through the hashtable, finds entries that
354  * have timed out and removes them (and all the fragments that
355  * belong to the entry).  It's a bit more complicated as the
356  * collision list is also collapsed.
357  */
358 static void
359 defragmentationPurgeCron (void *unused)
360 {
361         int i;
362         FC *smf;
363         FC *next;
364         FC *last;
365
366         GNUNET_mutex_lock (defragCacheLock);
367         for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
368         {
369                 last = NULL;
370                 smf = defragmentationCache[i];
371                 while (smf != NULL)
372                 {
373                         if (smf->ttl < GNUNET_get_time ())
374                         {
375                                 /* free linked list of fragments */
376                                 freeFL (smf->head, 1);
377                                 next = smf->next;
378                                 GNUNET_free (smf);
379                                 if (last == NULL)
380                                         defragmentationCache[i] = next;
381                                 else
382                                         last->next = next;
383                                 smf = next;
384                         }
385                         else
386                         {
387                                 last = smf;
388                                 smf = smf->next;
389                         }
390                 }                       /* while smf != NULL */
391         }                           /* for all buckets */
392         GNUNET_mutex_unlock (defragCacheLock);
393 }
394
395 /**
396  * Check if this fragment-list is complete.  If yes, put it together,
397  * process and free all buffers.  Does not free the pep
398  * itself (but sets the TTL to 0 to have the cron free it
399  * in the next iteration).
400  *
401  * @param pep the entry in the GNUNET_hash table
402  */
403 static void
404 checkComplete (FC * pep)
405 {
406         FL *pos;
407         unsigned short off;
408         unsigned short len;
409         char *msg;
410
411         GNUNET_GE_ASSERT (NULL, pep != NULL);
412         pos = pep->head;
413         if (pos == NULL)
414                 return;
415         len = ntohs (pos->frag->len);
416         if (len == 0)
417                 goto CLEANUP;               /* really bad error! */
418         off = 0;
419         while ((pos != NULL) && (ntohs (pos->frag->off) <= off))
420         {
421                 if (off >= off + FRAGSIZE (pos))
422                         goto CLEANUP;           /* error! */
423                 if (ntohs (pos->frag->off) + FRAGSIZE (pos) > off)
424                         off = ntohs (pos->frag->off) + FRAGSIZE (pos);
425                 else
426                         goto CLEANUP;           /* error! */
427                 pos = pos->link;
428         }
429         if (off < len)
430                 return;                     /* some fragment is still missing */
431
432         msg = GNUNET_malloc (len);
433         pos = pep->head;
434         while (pos != NULL)
435         {
436                 memcpy (&msg[ntohs (pos->frag->off)], &pos->frag[1], FRAGSIZE (pos));
437                 pos = pos->link;
438         }
439         if (stats != NULL)
440                 stats->change (stat_defragmented, 1);
441 #if 0
442         printf ("Finished defragmentation!\n");
443 #endif
444         /* handle message! */
445         coreAPI->loopback_send (&pep->sender, msg, len, GNUNET_YES, NULL);
446         GNUNET_free (msg);
447         CLEANUP:
448         /* free fragment buffers */
449         freeFL (pep->head, 0);
450         pep->head = NULL;
451         pep->ttl = 0;
452 }
453
454 /**
455  * See if the new fragment is a part of this entry and join them if
456  * yes.  Return GNUNET_SYSERR if the fragments do not match.  Return GNUNET_OK if
457  * the fragments do match and the fragment has been processed.  The
458  * defragCacheLock is already acquired by the caller whenever this
459  * method is called.<p>
460  *
461  * @param entry the entry in the cache
462  * @param pep the new entry
463  * @param packet the ip part in the new entry
464  */
465 static int
466 tryJoin (FC * entry,
467                 const GNUNET_PeerIdentity * sender,
468                 const P2P_fragmentation_MESSAGE * packet)
469 {
470         /* frame before ours; may end in the middle of
471      our frame or before it starts; NULL if we are
472      the earliest position we have received so far */
473         FL *before;
474         /* frame after ours; may start in the middle of
475      our frame or after it; NULL if we are the last
476      fragment we have received so far */
477         FL *after;
478         /* current position in the frame-list */
479         FL *pos;
480         /* the new entry that we're inserting */
481         FL *pep;
482         FL *tmp;
483         unsigned short end;
484
485         GNUNET_GE_ASSERT (NULL, entry != NULL);
486         if (0 != memcmp (sender, &entry->sender, sizeof (GNUNET_PeerIdentity)))
487                 return GNUNET_SYSERR;       /* wrong fragment list, try another! */
488         if (ntohl (packet->id) != entry->id)
489                 return GNUNET_SYSERR;       /* wrong fragment list, try another! */
490 #if 0
491         printf ("Received fragment %u from %u to %u\n",
492                         ntohl (packet->id),
493                         ntohs (packet->off),
494                         ntohs (packet->off) + ntohs (packet->header.size) -
495                         sizeof (P2P_fragmentation_MESSAGE));
496 #endif
497         pos = entry->head;
498         if ((pos != NULL) && (packet->len != pos->frag->len))
499                 return GNUNET_SYSERR;       /* wrong fragment size */
500
501         before = NULL;
502         /* find the before-frame */
503         while ((pos != NULL) && (ntohs (pos->frag->off) < ntohs (packet->off)))
504         {
505                 before = pos;
506                 pos = pos->link;
507         }
508
509         /* find the after-frame */
510         end =
511                         ntohs (packet->off) + ntohs (packet->header.size) -
512                         sizeof (P2P_fragmentation_MESSAGE);
513         if (end <= ntohs (packet->off))
514         {
515                 GNUNET_GE_LOG (NULL,
516                                 GNUNET_GE_DEVELOPER | GNUNET_GE_DEBUG | GNUNET_GE_BULK,
517                                 "Received invalid fragment at %s:%d\n", __FILE__,
518                                 __LINE__);
519                 return GNUNET_SYSERR;     /* yuck! integer overflow! */
520         }
521
522         if (before != NULL)
523                 after = before;
524         else
525                 after = entry->head;
526         while ((after != NULL) && (ntohs (after->frag->off) < end))
527                 after = after->link;
528
529         if ((before != NULL) && (before == after))
530         {
531                 /* this implies after or before != NULL and thereby the new
532          fragment is redundant as it is fully enclosed in an earlier
533          fragment */
534                 if (stats != NULL)
535                         stats->change (stat_defragmented, 1);
536                 return GNUNET_OK;         /* drop, there is a packet that spans our range! */
537         }
538
539         if ((before != NULL) &&
540                         (after != NULL) &&
541                         ((htons (before->frag->off) +
542                                         FRAGSIZE (before)) >= htons (after->frag->off)))
543         {
544                 /* this implies that the fragment that starts before us and the
545          fragment that comes after this one leave no space in the middle
546          or even overlap; thus we can drop this redundant piece */
547                 if (stats != NULL)
548                         stats->change (stat_defragmented, 1);
549                 return GNUNET_OK;
550         }
551
552         /* allocate pep */
553         pep = GNUNET_malloc (sizeof (FC));
554         pep->frag = GNUNET_malloc (ntohs (packet->header.size));
555         memcpy (pep->frag, packet, ntohs (packet->header.size));
556         pep->link = NULL;
557
558         if (before == NULL)
559         {
560                 pep->link = after;
561                 pos = entry->head;
562                 while (pos != after)
563                 {
564                         tmp = pos->link;
565                         GNUNET_free (pos->frag);
566                         GNUNET_free (pos);
567                         pos = tmp;
568                 }
569                 entry->head = pep;
570                 goto FINISH;
571                 /* end of insert first */
572         }
573
574         if (after == NULL)
575         {
576                 /* insert last: find the end, free everything after it */
577                 freeFL (before->link, 1);
578                 before->link = pep;
579                 goto FINISH;
580         }
581
582         /* ok, we are filling the middle between two fragments; insert.  If
583      there is anything else in the middle, it can be dropped as we're
584      bigger & cover that area as well */
585         /* free everything between before and after */
586         pos = before->link;
587         while (pos != after)
588         {
589                 tmp = pos->link;
590                 GNUNET_free (pos->frag);
591                 GNUNET_free (pos);
592                 pos = tmp;
593         }
594         before->link = pep;
595         pep->link = after;
596
597         FINISH:
598         entry->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT;
599         checkComplete (entry);
600         return GNUNET_OK;
601 }
602
603 /**
604  * Defragment the given fragment and pass to handler once
605  * defragmentation is complete.
606  *
607  * @param frag the packet to defragment
608  * @return GNUNET_SYSERR if the fragment is invalid
609  */
610 static int
611 processFragment (const GNUNET_PeerIdentity * sender,
612                 const GNUNET_MessageHeader * frag)
613 {
614         unsigned int hash;
615         FC *smf;
616
617         if (ntohs (frag->size) < sizeof (P2P_fragmentation_MESSAGE))
618                 return GNUNET_SYSERR;
619
620         GNUNET_mutex_lock (defragCacheLock);
621         hash = sender->hashPubKey.bits[0] % DEFRAG_BUCKET_COUNT;
622         smf = defragmentationCache[hash];
623         while (smf != NULL)
624         {
625                 if (GNUNET_OK ==
626                                 tryJoin (smf, sender, (P2P_fragmentation_MESSAGE *) frag))
627                 {
628                         GNUNET_mutex_unlock (defragCacheLock);
629                         return GNUNET_OK;
630                 }
631                 if (0 == memcmp (sender, &smf->sender, sizeof (GNUNET_PeerIdentity)))
632                 {
633                         freeFL (smf->head, 1);
634                         break;
635                 }
636                 smf = smf->next;
637         }
638         if (smf == NULL)
639         {
640                 smf = GNUNET_malloc (sizeof (FC));
641                 smf->next = defragmentationCache[hash];
642                 defragmentationCache[hash] = smf;
643                 smf->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT;
644                 smf->sender = *sender;
645         }
646         smf->id = ntohl (((P2P_fragmentation_MESSAGE *) frag)->id);
647         smf->head = GNUNET_malloc (sizeof (FL));
648         smf->head->link = NULL;
649         smf->head->frag = GNUNET_malloc (ntohs (frag->size));
650         memcpy (smf->head->frag, frag, ntohs (frag->size));
651
652         GNUNET_mutex_unlock (defragCacheLock);
653         return GNUNET_OK;
654 }
655
656 typedef struct
657 {
658         GNUNET_PeerIdentity sender;
659         /* maximums size of each fragment */
660         unsigned short mtu;
661         /** how long is this message part expected to be? */
662         unsigned short len;
663         /** when did we intend to transmit? */
664         GNUNET_CronTime transmissionTime;
665 } FragmentBMC;
666
667 /**
668  * Send a message that had to be fragmented (right now!).  First grabs
669  * the first part of the message (obtained from ctx->se) and stores
670  * that in a P2P_fragmentation_MESSAGE envelope.  The remaining fragments are
671  * added to the send queue with GNUNET_EXTREME_PRIORITY (to ensure that they
672  * will be transmitted next).  The logic here is that if the priority
673  * for the first fragment was sufficiently high, the priority should
674  * also have been sufficiently high for all of the other fragments (at
675  * this time) since they have the same priority.  And we want to make
676  * sure that we send all of them since just sending the first fragment
677  * and then going to other messages of equal priority would not be
678  * such a great idea (i.e. would just waste bandwidth).
679  */
680 static int
681 fragmentBMC (void *buf, void *cls, unsigned short len)
682 {
683         FragmentBMC *ctx = cls;
684         static int idGen = 0;
685         P2P_fragmentation_MESSAGE *frag;
686         unsigned int pos;
687         int id;
688         unsigned short mlen;
689
690         if ((len < ctx->mtu) || (buf == NULL))
691         {
692                 GNUNET_free (ctx);
693                 return GNUNET_SYSERR;
694         }
695         if (stats != NULL)
696                 stats->change (stat_fragmented, 1);
697         id = (idGen++) + GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 512);
698         /* write first fragment to buf */
699         frag = (P2P_fragmentation_MESSAGE *) buf;
700         frag->header.size = htons (len);
701         frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
702         frag->id = id;
703         frag->off = htons (0);
704         frag->len = htons (ctx->len);
705         memcpy (&frag[1], &ctx[1], len - sizeof (P2P_fragmentation_MESSAGE));
706
707         /* create remaining fragments, add to queue! */
708         pos = len - sizeof (P2P_fragmentation_MESSAGE);
709         frag = GNUNET_malloc (ctx->mtu);
710         while (pos < ctx->len)
711         {
712                 mlen = sizeof (P2P_fragmentation_MESSAGE) + ctx->len - pos;
713                 if (mlen > ctx->mtu)
714                         mlen = ctx->mtu;
715                 GNUNET_GE_ASSERT (NULL, mlen > sizeof (P2P_fragmentation_MESSAGE));
716                 frag->header.size = htons (mlen);
717                 frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
718                 frag->id = id;
719                 frag->off = htons (pos);
720                 frag->len = htons (ctx->len);
721                 memcpy (&frag[1],
722                                 &((char *) (&ctx[1]))[pos],
723                                 mlen - sizeof (P2P_fragmentation_MESSAGE));
724                 coreAPI->ciphertext_send (&ctx->sender,
725                                 &frag->header,
726                                 GNUNET_EXTREME_PRIORITY,
727                                 ctx->transmissionTime - GNUNET_get_time ());
728                 pos += mlen - sizeof (P2P_fragmentation_MESSAGE);
729         }
730         GNUNET_GE_ASSERT (NULL, pos == ctx->len);
731         GNUNET_free (frag);
732         GNUNET_free (ctx);
733         return GNUNET_OK;
734 }
735
736 /**
737  * The given message must be fragmented.  Produce a placeholder that
738  * corresponds to the first fragment.  Once that fragment is scheduled
739  * for transmission, the placeholder should automatically add all of
740  * the other fragments (with very high priority).
741  */
742 void
743 fragment (const GNUNET_PeerIdentity * peer,
744                 unsigned int mtu,
745                 unsigned int prio,
746                 unsigned int targetTime,
747                 unsigned int len, GNUNET_BuildMessageCallback bmc, void *bmcClosure)
748 {
749         FragmentBMC *fbmc;
750         int xlen;
751
752         GNUNET_GE_ASSERT (NULL, len > mtu);
753         GNUNET_GE_ASSERT (NULL, mtu > sizeof (P2P_fragmentation_MESSAGE));
754         fbmc = GNUNET_malloc (sizeof (FragmentBMC) + len);
755         fbmc->mtu = mtu;
756         fbmc->sender = *peer;
757         fbmc->transmissionTime = targetTime;
758         fbmc->len = len;
759         if (bmc == NULL)
760         {
761                 memcpy (&fbmc[1], bmcClosure, len);
762                 GNUNET_free (bmcClosure);
763         }
764         else
765         {
766                 if (GNUNET_SYSERR == bmc (&fbmc[1], bmcClosure, len))
767                 {
768                         GNUNET_free (fbmc);
769                         return;
770                 }
771         }
772         xlen = mtu - sizeof (P2P_fragmentation_MESSAGE);
773         coreAPI->ciphertext_send_with_callback (peer, &fragmentBMC, fbmc, mtu, prio * xlen / len,     /* compute new priority */
774                         targetTime);
775 }
776
777 /**
778  * Initialize Fragmentation module.
779  */
780 GNUNET_Fragmentation_ServiceAPI *
781 provide_module_fragmentation (GNUNET_CoreAPIForPlugins * capi)
782 {
783         static GNUNET_Fragmentation_ServiceAPI ret;
784         int i;
785
786         coreAPI = capi;
787         stats = coreAPI->service_request ("stats");
788         if (stats != NULL)
789         {
790                 stat_defragmented =
791                                 stats->create (gettext_noop ("# messages defragmented"));
792                 stat_fragmented =
793                                 stats->create (gettext_noop ("# messages fragmented"));
794                 stat_discarded = stats->create (gettext_noop ("# fragments discarded"));
795         }
796         for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
797                 defragmentationCache[i] = NULL;
798         defragCacheLock = GNUNET_mutex_create (GNUNET_NO);
799         GNUNET_cron_add_job (coreAPI->cron,
800                         &defragmentationPurgeCron,
801                         60 * GNUNET_CRON_SECONDS, 60 * GNUNET_CRON_SECONDS,
802                         NULL);
803         GNUNET_GE_LOG (capi->ectx,
804                         GNUNET_GE_INFO | GNUNET_GE_USER | GNUNET_GE_REQUEST,
805                         _("`%s' registering handler %d\n"), "fragmentation",
806                         GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
807         capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT,
808                         &processFragment);
809
810         ret.fragment = &fragment;
811         return &ret;
812 }
813
814 /**
815  * Shutdown fragmentation.
816  */
817 void
818 release_module_fragmentation ()
819 {
820         int i;
821
822         coreAPI->p2p_ciphertext_handler_unregister
823         (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT, &processFragment);
824         GNUNET_cron_del_job (coreAPI->cron, &defragmentationPurgeCron,
825                         60 * GNUNET_CRON_SECONDS, NULL);
826         for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
827         {
828                 FC *pos = defragmentationCache[i];
829                 while (pos != NULL)
830                 {
831                         FC *next = pos->next;
832                         freeFL (pos->head, 1);
833                         GNUNET_free (pos);
834                         pos = next;
835                 }
836         }
837         if (stats != NULL)
838         {
839                 coreAPI->service_release (stats);
840                 stats = NULL;
841         }
842         GNUNET_mutex_destroy (defragCacheLock);
843         defragCacheLock = NULL;
844         coreAPI = NULL;
845 }
846
847 #endif
848
849 /* end of fragmentation.c */