(no commit message)
[oweals/gnunet.git] / src / fragmentation / fragmentation.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2006, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19  */
20 /**
21  * @file fragmentation/fragmentation.c
22  * @brief fragmentation and defragmentation, this code allows
23  *        sending and receiving messages that are larger than
24  *        the MTU of the transport.  Messages are still limited
25  *        to a maximum size of 65535 bytes, which is a good
26  *        idea because otherwise we may need ungainly fragmentation
27  *        buffers.  Each connected peer can have at most one
28  *        fragmented packet at any given point in time (prevents
29  *        DoS attacks).  Fragmented messages that have not been
30  *        completed after a certain amount of time are discarded.
31  * @author Ji Lu
32  */
33
34 #include "platform.h"
35 #include "gnunet_fragmentation_lib.h"
36 #include "gnunet_protocols.h"
37 #include "gnunet_util_lib.h"
38
39 /**
40  * Message fragment.  This header is followed
41  * by the actual data of the fragment.
42  */
43
44 struct Fragment
45 {
46
47         struct GNUNET_MessageHeader header;
48
49         /**
50          * Fragment offset.
51          */
52         uint16_t off GNUNET_PACKED;
53
54         /**
55         * "unique" id for the fragment
56          */
57         uint32_t id GNUNET_PACKED;
58         uint16_t mtu;
59         uint16_t totalNum;
60         uint16_t totalSize;
61
62 };
63
64 struct GNUNET_FRAGEMENT_Ctxbuffer{
65         struct GNUNET_FRAGEMENT_Ctxbuffer *next;
66         uint32_t id;
67         uint16_t size;
68         char * buff;
69         int counter;
70         struct GNUNET_TIME_Absolute receivedTime;
71         struct GNUNET_PeerIdentity peerID;
72         int * num;
73 };
74
75
76 /**
77  * Defragmentation context.
78  */
79 struct GNUNET_FRAGMENT_Context
80 {
81         uint32_t maxNum;
82         struct GNUNET_FRAGEMENT_Ctxbuffer *buffer;
83         GNUNET_FRAGMENT_MessageProcessor proc;
84         void *proc_cls;
85 };
86
87
88 /**
89  * Fragment an over-sized message.
90  *
91  * @param msg the message to fragment
92  * @param mtu the maximum message size
93  * @param proc function to call for each fragment
94  * @param proc_cls closure for proc
95  */
96 void
97 GNUNET_FRAGMENT_fragment (const struct GNUNET_MessageHeader *msg,
98                 uint16_t mtu,
99                 GNUNET_FRAGMENT_MessageProcessor proc,
100                 void *proc_cls)
101 {
102         uint32_t id = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 256);
103         size_t size = sizeof(struct Fragment);
104
105         if(ntohs(msg->size) > mtu-size){
106                 uint16_t lastSize;
107                 uint16_t num;
108                 uint16_t i;
109                 uint16_t actualNum;
110                 lastSize = ntohs(msg->size) % (mtu-size);
111                 num     = ntohs(msg->size) / (mtu - size);
112                 actualNum = num;
113                 if(lastSize!=0){
114                         actualNum = num+1;
115                 }
116                 for(i = 0; i<actualNum; i++)
117                 {
118                         struct Fragment *frag;
119                         if(actualNum != num){
120                                 if(i!=actualNum-1){
121                                         frag = (struct Fragment *)GNUNET_malloc(mtu);
122                                 }
123                                 else{
124                                         frag = (struct Fragment *)GNUNET_malloc(lastSize+size);
125                                         }
126                                 }
127                         else{
128                                         frag = (struct Fragment *)GNUNET_malloc(mtu);
129                                 }
130                         frag->header.type = htons(GNUNET_MESSAGE_TYPE_FRAGMENT);
131                         frag->id = htonl(id);
132                         frag->off = htons((mtu-size)*i);
133                         frag->mtu = htons(mtu);
134                         frag->totalNum = htons(actualNum);
135                         frag->totalSize = msg->size;
136                         char *tmpMsg =  (char *)msg;
137                         if(actualNum != num){
138                                 if(i!=actualNum-1){
139                                         frag->header.size = htons(mtu);
140                                         memcpy(&frag[1], tmpMsg + (mtu-size)*i, mtu - size);
141                                 }
142                                 else{
143                                         frag->header.size = htons(lastSize+size);
144                                         memcpy(&frag[1], tmpMsg + (mtu-size)*i, lastSize);
145                                 }
146                         }
147                         else{
148                                 frag->header.size = htons(mtu);
149                                 memcpy(&frag[1], tmpMsg + (mtu-size)*i, mtu - size);
150                         }
151                         proc(proc_cls, &frag->header);
152                         GNUNET_free(frag);
153                 }
154         }
155 }
156
157 /**
158  * Create a defragmentation context.
159  *
160  * @param stats statistics context
161  * @param proc function to call with defragmented messages
162  * @param proc_cls closure for proc
163  * @return the defragmentation context
164  */
165 struct GNUNET_FRAGMENT_Context *
166 GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
167                 GNUNET_FRAGMENT_MessageProcessor proc,
168                 void *proc_cls)
169                 {
170         struct GNUNET_FRAGMENT_Context *ctx = (struct GNUNET_FRAGMENT_Context*)GNUNET_malloc(sizeof(struct GNUNET_FRAGMENT_Context));
171         ctx->maxNum = 100;
172         ctx->proc = proc;
173         ctx->proc_cls = proc_cls;
174         ctx->buffer = NULL;
175         return ctx;
176                 }
177
178
179 /**
180  * Destroy the given defragmentation context.
181  */
182 void
183 GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *ctx)
184 {
185         struct GNUNET_FRAGEMENT_Ctxbuffer *buffer;
186         struct GNUNET_FRAGEMENT_Ctxbuffer *temp;
187         buffer = ctx->buffer;
188
189         while (buffer != NULL)
190           {
191             temp = buffer->next;
192             GNUNET_free(buffer->num);
193             GNUNET_free(buffer);
194             buffer = temp;
195           }
196         GNUNET_free(ctx);
197         GNUNET_assert (0);
198 }
199
200
201 /**
202  * We have received a fragment.  Process it.
203  *
204  * @param ctx the context
205  * @param sender who transmitted the fragment
206  * @param msg the message that was received
207  */
208 void
209 GNUNET_FRAGMENT_process (struct GNUNET_FRAGMENT_Context *ctx,
210                 const struct GNUNET_PeerIdentity *sender,
211                 const struct GNUNET_MessageHeader *msg)
212 {
213         uint16_t type = ntohs(msg->type);
214         int exist = 0, received = 0;
215         if(type!=GNUNET_MESSAGE_TYPE_FRAGMENT){
216                 return;
217         }
218         struct Fragment *frag = (struct Fragment *)msg;
219         struct GNUNET_FRAGEMENT_Ctxbuffer* buffer;
220         struct GNUNET_FRAGEMENT_Ctxbuffer* prev;
221         prev = NULL;
222         buffer = ctx->buffer;
223         while (buffer != NULL)
224         {
225                 if ((buffer->id == ntohl(frag->id))&&(0 == memcmp (&buffer->peerID,
226                              sender, sizeof (struct GNUNET_PeerIdentity)))){
227                         exist = 1;
228                         break;
229                 }
230                 prev = buffer;
231                 buffer = buffer->next;
232         }
233
234         if (exist)
235         {
236                 int i;
237                 for(i = 0; i<ntohs(frag->totalNum); i++){
238                         if(buffer->num[i]==ntohs(frag->off)/(ntohs(frag->mtu)-sizeof(struct Fragment))){
239                                 received = 1;
240                                 break;
241                                 }
242                 }
243         }
244
245         if(!exist){
246                 buffer = GNUNET_malloc(sizeof(struct GNUNET_FRAGEMENT_Ctxbuffer));
247                 buffer->num = GNUNET_malloc(ntohs(frag->totalNum)*sizeof(int));
248                 int j;
249                 for(j = 0; j<ntohs(frag->totalNum); j++){
250                         buffer->num[j] = -10;
251                 }
252                 buffer->peerID = *sender;
253                 buffer->id = ntohl(frag->id);
254                 buffer->receivedTime = GNUNET_TIME_absolute_get ();
255                 uint16_t si = ntohs(frag->totalSize);
256                 buffer->size = si;
257                 buffer->buff = GNUNET_malloc(si);
258                 buffer->next = ctx->buffer;
259                 ctx->buffer = buffer;
260         }
261
262         if(!received){
263                 buffer->num[buffer->counter++]=ntohs(frag->off)/(ntohs(frag->mtu)-sizeof(struct Fragment));
264                 uint16_t sizeoffrag = ntohs(frag->header.size) - sizeof(struct Fragment);
265                 memcpy(&buffer->buff[ntohs(frag->off)], &frag[1], sizeoffrag);
266                 buffer->receivedTime = GNUNET_TIME_absolute_get ();
267         }
268
269         if(buffer->counter == ntohs(frag->totalNum))
270         {
271                 ctx->proc(ctx->proc_cls, (struct GNUNET_MessageHeader *)buffer->buff);
272                 if(prev==NULL){
273                         ctx->buffer = buffer->next;
274                 }
275                 else{
276                         prev->next = buffer->next;
277                 }
278                 GNUNET_free(buffer);
279                 return;
280         }
281 }
282
283
284
285 #if 0
286
287 /**
288  * How many buckets does the fragment hash table
289  * have?
290  */
291 #define DEFRAG_BUCKET_COUNT 16
292
293 /**
294  * After how long do fragments time out?
295  */
296 #ifndef DEFRAGMENTATION_TIMEOUT
297 #define DEFRAGMENTATION_TIMEOUT (3 * GNUNET_CRON_MINUTES)
298 #endif
299
300 /**
301  * Entry in the linked list of fragments.
302  */
303 typedef struct FL
304 {
305         struct FL *link;
306         P2P_fragmentation_MESSAGE *frag;
307 } FL;
308
309 /**
310  * Entry in the GNUNET_hash table of fragments.
311  */
312 typedef struct FC
313 {
314         struct FC *next;
315         FL *head;
316         GNUNET_PeerIdentity sender;
317         int id;
318         GNUNET_CronTime ttl;
319 } FC;
320
321 #define FRAGSIZE(fl) ((ntohs(fl->frag->header.size)-sizeof(P2P_fragmentation_MESSAGE)))
322
323 static GNUNET_CoreAPIForPlugins *coreAPI;
324
325 static GNUNET_Stats_ServiceAPI *stats;
326
327 static int stat_defragmented;
328
329 static int stat_fragmented;
330
331 static int stat_discarded;
332
333 /**
334  * Hashtable *with* collision management!
335  */
336 static FC *defragmentationCache[DEFRAG_BUCKET_COUNT];
337
338 /**
339  * Lock for the defragmentation cache.
340  */
341 static struct GNUNET_Mutex *defragCacheLock;
342
343 static void
344 freeFL (FL * fl, int c)
345 {
346         while (fl != NULL)
347         {
348                 FL *link = fl->link;
349                 if (stats != NULL)
350                         stats->change (stat_discarded, c);
351                 GNUNET_free (fl->frag);
352                 GNUNET_free (fl);
353                 fl = link;
354         }
355 }
356
357 /**
358  * This cron job ensures that we purge buffers of fragments
359  * that have timed out.  It can run in much longer intervals
360  * than the defragmentationCron, e.g. every 60s.
361  * <p>
362  * This method goes through the hashtable, finds entries that
363  * have timed out and removes them (and all the fragments that
364  * belong to the entry).  It's a bit more complicated as the
365  * collision list is also collapsed.
366  */
367 static void
368 defragmentationPurgeCron (void *unused)
369 {
370         int i;
371         FC *smf;
372         FC *next;
373         FC *last;
374
375         GNUNET_mutex_lock (defragCacheLock);
376         for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
377         {
378                 last = NULL;
379                 smf = defragmentationCache[i];
380                 while (smf != NULL)
381                 {
382                         if (smf->ttl < GNUNET_get_time ())
383                         {
384                                 /* free linked list of fragments */
385                                 freeFL (smf->head, 1);
386                                 next = smf->next;
387                                 GNUNET_free (smf);
388                                 if (last == NULL)
389                                         defragmentationCache[i] = next;
390                                 else
391                                         last->next = next;
392                                 smf = next;
393                         }
394                         else
395                         {
396                                 last = smf;
397                                 smf = smf->next;
398                         }
399                 }                       /* while smf != NULL */
400         }                           /* for all buckets */
401         GNUNET_mutex_unlock (defragCacheLock);
402 }
403
404 /**
405  * Check if this fragment-list is complete.  If yes, put it together,
406  * process and free all buffers.  Does not free the pep
407  * itself (but sets the TTL to 0 to have the cron free it
408  * in the next iteration).
409  *
410  * @param pep the entry in the GNUNET_hash table
411  */
412 static void
413 checkComplete (FC * pep)
414 {
415         FL *pos;
416         unsigned short off;
417         unsigned short len;
418         char *msg;
419
420         GNUNET_GE_ASSERT (NULL, pep != NULL);
421         pos = pep->head;
422         if (pos == NULL)
423                 return;
424         len = ntohs (pos->frag->len);
425         if (len == 0)
426                 goto CLEANUP;               /* really bad error! */
427         off = 0;
428         while ((pos != NULL) && (ntohs (pos->frag->off) <= off))
429         {
430                 if (off >= off + FRAGSIZE (pos))
431                         goto CLEANUP;           /* error! */
432                 if (ntohs (pos->frag->off) + FRAGSIZE (pos) > off)
433                         off = ntohs (pos->frag->off) + FRAGSIZE (pos);
434                 else
435                         goto CLEANUP;           /* error! */
436                 pos = pos->link;
437         }
438         if (off < len)
439                 return;                     /* some fragment is still missing */
440
441         msg = GNUNET_malloc (len);
442         pos = pep->head;
443         while (pos != NULL)
444         {
445                 memcpy (&msg[ntohs (pos->frag->off)], &pos->frag[1], FRAGSIZE (pos));
446                 pos = pos->link;
447         }
448         if (stats != NULL)
449                 stats->change (stat_defragmented, 1);
450 #if 0
451         printf ("Finished defragmentation!\n");
452 #endif
453         /* handle message! */
454         coreAPI->loopback_send (&pep->sender, msg, len, GNUNET_YES, NULL);
455         GNUNET_free (msg);
456         CLEANUP:
457         /* free fragment buffers */
458         freeFL (pep->head, 0);
459         pep->head = NULL;
460         pep->ttl = 0;
461 }
462
463 /**
464  * See if the new fragment is a part of this entry and join them if
465  * yes.  Return GNUNET_SYSERR if the fragments do not match.  Return GNUNET_OK if
466  * the fragments do match and the fragment has been processed.  The
467  * defragCacheLock is already acquired by the caller whenever this
468  * method is called.<p>
469  *
470  * @param entry the entry in the cache
471  * @param pep the new entry
472  * @param packet the ip part in the new entry
473  */
474 static int
475 tryJoin (FC * entry,
476                 const GNUNET_PeerIdentity * sender,
477                 const P2P_fragmentation_MESSAGE * packet)
478 {
479         /* frame before ours; may end in the middle of
480      our frame or before it starts; NULL if we are
481      the earliest position we have received so far */
482         FL *before;
483         /* frame after ours; may start in the middle of
484      our frame or after it; NULL if we are the last
485      fragment we have received so far */
486         FL *after;
487         /* current position in the frame-list */
488         FL *pos;
489         /* the new entry that we're inserting */
490         FL *pep;
491         FL *tmp;
492         unsigned short end;
493
494         GNUNET_GE_ASSERT (NULL, entry != NULL);
495         if (0 != memcmp (sender, &entry->sender, sizeof (GNUNET_PeerIdentity)))
496                 return GNUNET_SYSERR;       /* wrong fragment list, try another! */
497         if (ntohl (packet->id) != entry->id)
498                 return GNUNET_SYSERR;       /* wrong fragment list, try another! */
499 #if 0
500         printf ("Received fragment %u from %u to %u\n",
501                         ntohl (packet->id),
502                         ntohs (packet->off),
503                         ntohs (packet->off) + ntohs (packet->header.size) -
504                         sizeof (P2P_fragmentation_MESSAGE));
505 #endif
506         pos = entry->head;
507         if ((pos != NULL) && (packet->len != pos->frag->len))
508                 return GNUNET_SYSERR;       /* wrong fragment size */
509
510         before = NULL;
511         /* find the before-frame */
512         while ((pos != NULL) && (ntohs (pos->frag->off) < ntohs (packet->off)))
513         {
514                 before = pos;
515                 pos = pos->link;
516         }
517
518         /* find the after-frame */
519         end =
520                         ntohs (packet->off) + ntohs (packet->header.size) -
521                         sizeof (P2P_fragmentation_MESSAGE);
522         if (end <= ntohs (packet->off))
523         {
524                 GNUNET_GE_LOG (NULL,
525                                 GNUNET_GE_DEVELOPER | GNUNET_GE_DEBUG | GNUNET_GE_BULK,
526                                 "Received invalid fragment at %s:%d\n", __FILE__,
527                                 __LINE__);
528                 return GNUNET_SYSERR;     /* yuck! integer overflow! */
529         }
530
531         if (before != NULL)
532                 after = before;
533         else
534                 after = entry->head;
535         while ((after != NULL) && (ntohs (after->frag->off) < end))
536                 after = after->link;
537
538         if ((before != NULL) && (before == after))
539         {
540                 /* this implies after or before != NULL and thereby the new
541          fragment is redundant as it is fully enclosed in an earlier
542          fragment */
543                 if (stats != NULL)
544                         stats->change (stat_defragmented, 1);
545                 return GNUNET_OK;         /* drop, there is a packet that spans our range! */
546         }
547
548         if ((before != NULL) &&
549                         (after != NULL) &&
550                         ((htons (before->frag->off) +
551                                         FRAGSIZE (before)) >= htons (after->frag->off)))
552         {
553                 /* this implies that the fragment that starts before us and the
554          fragment that comes after this one leave no space in the middle
555          or even overlap; thus we can drop this redundant piece */
556                 if (stats != NULL)
557                         stats->change (stat_defragmented, 1);
558                 return GNUNET_OK;
559         }
560
561         /* allocate pep */
562         pep = GNUNET_malloc (sizeof (FC));
563         pep->frag = GNUNET_malloc (ntohs (packet->header.size));
564         memcpy (pep->frag, packet, ntohs (packet->header.size));
565         pep->link = NULL;
566
567         if (before == NULL)
568         {
569                 pep->link = after;
570                 pos = entry->head;
571                 while (pos != after)
572                 {
573                         tmp = pos->link;
574                         GNUNET_free (pos->frag);
575                         GNUNET_free (pos);
576                         pos = tmp;
577                 }
578                 entry->head = pep;
579                 goto FINISH;
580                 /* end of insert first */
581         }
582
583         if (after == NULL)
584         {
585                 /* insert last: find the end, free everything after it */
586                 freeFL (before->link, 1);
587                 before->link = pep;
588                 goto FINISH;
589         }
590
591         /* ok, we are filling the middle between two fragments; insert.  If
592      there is anything else in the middle, it can be dropped as we're
593      bigger & cover that area as well */
594         /* free everything between before and after */
595         pos = before->link;
596         while (pos != after)
597         {
598                 tmp = pos->link;
599                 GNUNET_free (pos->frag);
600                 GNUNET_free (pos);
601                 pos = tmp;
602         }
603         before->link = pep;
604         pep->link = after;
605
606         FINISH:
607         entry->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT;
608         checkComplete (entry);
609         return GNUNET_OK;
610 }
611
612 /**
613  * Defragment the given fragment and pass to handler once
614  * defragmentation is complete.
615  *
616  * @param frag the packet to defragment
617  * @return GNUNET_SYSERR if the fragment is invalid
618  */
619 static int
620 processFragment (const GNUNET_PeerIdentity * sender,
621                 const GNUNET_MessageHeader * frag)
622 {
623         unsigned int hash;
624         FC *smf;
625
626         if (ntohs (frag->size) < sizeof (P2P_fragmentation_MESSAGE))
627                 return GNUNET_SYSERR;
628
629         GNUNET_mutex_lock (defragCacheLock);
630         hash = sender->hashPubKey.bits[0] % DEFRAG_BUCKET_COUNT;
631         smf = defragmentationCache[hash];
632         while (smf != NULL)
633         {
634                 if (GNUNET_OK ==
635                                 tryJoin (smf, sender, (P2P_fragmentation_MESSAGE *) frag))
636                 {
637                         GNUNET_mutex_unlock (defragCacheLock);
638                         return GNUNET_OK;
639                 }
640                 if (0 == memcmp (sender, &smf->sender, sizeof (GNUNET_PeerIdentity)))
641                 {
642                         freeFL (smf->head, 1);
643                         break;
644                 }
645                 smf = smf->next;
646         }
647         if (smf == NULL)
648         {
649                 smf = GNUNET_malloc (sizeof (FC));
650                 smf->next = defragmentationCache[hash];
651                 defragmentationCache[hash] = smf;
652                 smf->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT;
653                 smf->sender = *sender;
654         }
655         smf->id = ntohl (((P2P_fragmentation_MESSAGE *) frag)->id);
656         smf->head = GNUNET_malloc (sizeof (FL));
657         smf->head->link = NULL;
658         smf->head->frag = GNUNET_malloc (ntohs (frag->size));
659         memcpy (smf->head->frag, frag, ntohs (frag->size));
660
661         GNUNET_mutex_unlock (defragCacheLock);
662         return GNUNET_OK;
663 }
664
665 typedef struct
666 {
667         GNUNET_PeerIdentity sender;
668         /* maximums size of each fragment */
669         unsigned short mtu;
670         /** how long is this message part expected to be? */
671         unsigned short len;
672         /** when did we intend to transmit? */
673         GNUNET_CronTime transmissionTime;
674 } FragmentBMC;
675
676 /**
677  * Send a message that had to be fragmented (right now!).  First grabs
678  * the first part of the message (obtained from ctx->se) and stores
679  * that in a P2P_fragmentation_MESSAGE envelope.  The remaining fragments are
680  * added to the send queue with GNUNET_EXTREME_PRIORITY (to ensure that they
681  * will be transmitted next).  The logic here is that if the priority
682  * for the first fragment was sufficiently high, the priority should
683  * also have been sufficiently high for all of the other fragments (at
684  * this time) since they have the same priority.  And we want to make
685  * sure that we send all of them since just sending the first fragment
686  * and then going to other messages of equal priority would not be
687  * such a great idea (i.e. would just waste bandwidth).
688  */
689 static int
690 fragmentBMC (void *buf, void *cls, unsigned short len)
691 {
692         FragmentBMC *ctx = cls;
693         static int idGen = 0;
694         P2P_fragmentation_MESSAGE *frag;
695         unsigned int pos;
696         int id;
697         unsigned short mlen;
698
699         if ((len < ctx->mtu) || (buf == NULL))
700         {
701                 GNUNET_free (ctx);
702                 return GNUNET_SYSERR;
703         }
704         if (stats != NULL)
705                 stats->change (stat_fragmented, 1);
706         id = (idGen++) + GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 512);
707         /* write first fragment to buf */
708         frag = (P2P_fragmentation_MESSAGE *) buf;
709         frag->header.size = htons (len);
710         frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
711         frag->id = id;
712         frag->off = htons (0);
713         frag->len = htons (ctx->len);
714         memcpy (&frag[1], &ctx[1], len - sizeof (P2P_fragmentation_MESSAGE));
715
716         /* create remaining fragments, add to queue! */
717         pos = len - sizeof (P2P_fragmentation_MESSAGE);
718         frag = GNUNET_malloc (ctx->mtu);
719         while (pos < ctx->len)
720         {
721                 mlen = sizeof (P2P_fragmentation_MESSAGE) + ctx->len - pos;
722                 if (mlen > ctx->mtu)
723                         mlen = ctx->mtu;
724                 GNUNET_GE_ASSERT (NULL, mlen > sizeof (P2P_fragmentation_MESSAGE));
725                 frag->header.size = htons (mlen);
726                 frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
727                 frag->id = id;
728                 frag->off = htons (pos);
729                 frag->len = htons (ctx->len);
730                 memcpy (&frag[1],
731                                 &((char *) (&ctx[1]))[pos],
732                                 mlen - sizeof (P2P_fragmentation_MESSAGE));
733                 coreAPI->ciphertext_send (&ctx->sender,
734                                 &frag->header,
735                                 GNUNET_EXTREME_PRIORITY,
736                                 ctx->transmissionTime - GNUNET_get_time ());
737                 pos += mlen - sizeof (P2P_fragmentation_MESSAGE);
738         }
739         GNUNET_GE_ASSERT (NULL, pos == ctx->len);
740         GNUNET_free (frag);
741         GNUNET_free (ctx);
742         return GNUNET_OK;
743 }
744
745 /**
746  * The given message must be fragmented.  Produce a placeholder that
747  * corresponds to the first fragment.  Once that fragment is scheduled
748  * for transmission, the placeholder should automatically add all of
749  * the other fragments (with very high priority).
750  */
751 void
752 fragment (const GNUNET_PeerIdentity * peer,
753                 unsigned int mtu,
754                 unsigned int prio,
755                 unsigned int targetTime,
756                 unsigned int len, GNUNET_BuildMessageCallback bmc, void *bmcClosure)
757 {
758         FragmentBMC *fbmc;
759         int xlen;
760
761         GNUNET_GE_ASSERT (NULL, len > mtu);
762         GNUNET_GE_ASSERT (NULL, mtu > sizeof (P2P_fragmentation_MESSAGE));
763         fbmc = GNUNET_malloc (sizeof (FragmentBMC) + len);
764         fbmc->mtu = mtu;
765         fbmc->sender = *peer;
766         fbmc->transmissionTime = targetTime;
767         fbmc->len = len;
768         if (bmc == NULL)
769         {
770                 memcpy (&fbmc[1], bmcClosure, len);
771                 GNUNET_free (bmcClosure);
772         }
773         else
774         {
775                 if (GNUNET_SYSERR == bmc (&fbmc[1], bmcClosure, len))
776                 {
777                         GNUNET_free (fbmc);
778                         return;
779                 }
780         }
781         xlen = mtu - sizeof (P2P_fragmentation_MESSAGE);
782         coreAPI->ciphertext_send_with_callback (peer, &fragmentBMC, fbmc, mtu, prio * xlen / len,     /* compute new priority */
783                         targetTime);
784 }
785
786 /**
787  * Initialize Fragmentation module.
788  */
789 GNUNET_Fragmentation_ServiceAPI *
790 provide_module_fragmentation (GNUNET_CoreAPIForPlugins * capi)
791 {
792         static GNUNET_Fragmentation_ServiceAPI ret;
793         int i;
794
795         coreAPI = capi;
796         stats = coreAPI->service_request ("stats");
797         if (stats != NULL)
798         {
799                 stat_defragmented =
800                                 stats->create (gettext_noop ("# messages defragmented"));
801                 stat_fragmented =
802                                 stats->create (gettext_noop ("# messages fragmented"));
803                 stat_discarded = stats->create (gettext_noop ("# fragments discarded"));
804         }
805         for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
806                 defragmentationCache[i] = NULL;
807         defragCacheLock = GNUNET_mutex_create (GNUNET_NO);
808         GNUNET_cron_add_job (coreAPI->cron,
809                         &defragmentationPurgeCron,
810                         60 * GNUNET_CRON_SECONDS, 60 * GNUNET_CRON_SECONDS,
811                         NULL);
812         GNUNET_GE_LOG (capi->ectx,
813                         GNUNET_GE_INFO | GNUNET_GE_USER | GNUNET_GE_REQUEST,
814                         _("`%s' registering handler %d\n"), "fragmentation",
815                         GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
816         capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT,
817                         &processFragment);
818
819         ret.fragment = &fragment;
820         return &ret;
821 }
822
823 /**
824  * Shutdown fragmentation.
825  */
826 void
827 release_module_fragmentation ()
828 {
829         int i;
830
831         coreAPI->p2p_ciphertext_handler_unregister
832         (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT, &processFragment);
833         GNUNET_cron_del_job (coreAPI->cron, &defragmentationPurgeCron,
834                         60 * GNUNET_CRON_SECONDS, NULL);
835         for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
836         {
837                 FC *pos = defragmentationCache[i];
838                 while (pos != NULL)
839                 {
840                         FC *next = pos->next;
841                         freeFL (pos->head, 1);
842                         GNUNET_free (pos);
843                         pos = next;
844                 }
845         }
846         if (stats != NULL)
847         {
848                 coreAPI->service_release (stats);
849                 stats = NULL;
850         }
851         GNUNET_mutex_destroy (defragCacheLock);
852         defragCacheLock = NULL;
853         coreAPI = NULL;
854 }
855
856 #endif
857
858 /* end of fragmentation.c */