d446516bd31ccfd2c519a8f383fcaf4a5a58bc4e
[oweals/gnunet.git] / src / transport / transport_api2_communication.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file transport/transport_api2_communication.c
21  * @brief implementation of the gnunet_transport_communication_service.h API
22  * @author Christian Grothoff
23  */
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26 #include "gnunet_protocols.h"
27 #include "gnunet_transport_communication_service.h"
28 #include "transport.h"
29
30
31 /**
32  * How many messages do we keep at most in the queue to the 
33  * transport service before we start to drop (default, 
34  * can be changed via the configuration file).
35  */
36 #define DEFAULT_MAX_QUEUE_LENGTH 16
37
38
39 /**
40  * Information we track per packet to enable flow control.
41  */
42 struct FlowControl
43 {
44   /**
45    * Kept in a DLL.
46    */
47   struct FlowControl *next;
48
49   /**
50    * Kept in a DLL.
51    */
52   struct FlowControl *prev;
53
54   /**
55    * Function to call once the message was processed.
56    */
57   GNUNET_TRANSPORT_MessageCompletedCallback cb;
58
59   /**
60    * Closure for @e cb
61    */
62   void *cb_cls;
63   
64   /**
65    * Which peer is this about?
66    */
67   struct GNUNET_PeerIdentity sender;
68
69   /**
70    * More-or-less unique ID for the message.
71    */
72   uint64_t id;
73 };
74
75
76 /**
77  * Information we track per message to tell the transport about
78  * success or failures.
79  */
80 struct AckPending
81 {
82   /**
83    * Kept in a DLL.
84    */
85   struct AckPending *next;
86
87   /**
88    * Kept in a DLL.
89    */
90   struct AckPending *prev;
91
92   /**
93    * Which peer is this about?
94    */
95   struct GNUNET_PeerIdentity receiver;
96
97   /**
98    * More-or-less unique ID for the message.
99    */
100   uint64_t mid;
101 };
102
103
104 /**
105  * Opaque handle to the transport service for communicators.
106  */
107 struct GNUNET_TRANSPORT_CommunicatorHandle
108 {
109   /**
110    * Head of DLL of addresses this communicator offers to the transport service.
111    */
112   struct GNUNET_TRANSPORT_AddressIdentifier *ai_head;
113
114   /**
115    * Tail of DLL of addresses this communicator offers to the transport service.
116    */
117   struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
118
119   /**
120    * DLL of messages awaiting flow control confirmation (ack).
121    */
122   struct FlowControl *fc_head;
123
124   /**
125    * DLL of messages awaiting flow control confirmation (ack).
126    */
127   struct FlowControl *fc_tail;
128
129   /**
130    * DLL of messages awaiting transmission confirmation (ack).
131    */
132   struct AckPending *ap_head;
133
134   /**
135    * DLL of messages awaiting transmission confirmation (ack).
136    */
137   struct AckPending *ac_tail;
138
139   /**
140    * DLL of queues we offer.
141    */
142   struct QueueHandle *queue_head;
143   
144   /**
145    * DLL of queues we offer.
146    */
147   struct QueueHandle *queue_tail;
148     
149   /**
150    * Our configuration.
151    */
152   const struct GNUNET_CONFIGURATION_Handle *cfg;
153
154   /**
155    * Name of the communicator.
156    */
157   const char *name;
158
159   /**
160    * Function to call when the transport service wants us to initiate
161    * a communication channel with another peer.
162    */
163   GNUNET_TRANSPORT_CommunicatorMqInit mq_init;
164
165   /**
166    * Closure for @e mq_init.
167    */
168   void *mq_init_cls;
169
170   /**
171    * Maximum permissable queue length.
172    */
173   unsigned long long max_queue_length;
174   
175   /**
176    * Flow-control identifier generator.
177    */
178   uint64_t fc_gen;
179
180   /**
181    * MTU of the communicator
182    */
183   size_t mtu;
184   
185   /**
186    * Internal UUID for the address used in communication with the
187    * transport service.
188    */
189   uint32_t aid_gen;
190
191   /**
192    * Queue identifier generator.
193    */
194   uint32_t queue_gen;
195   
196 };
197
198
199 /**
200  * Handle returned to identify the internal data structure the transport
201  * API has created to manage a message queue to a particular peer.
202  */
203 struct GNUNET_TRANSPORT_QueueHandle
204 {
205   /**
206    * Handle this queue belongs to.
207    */
208   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
209
210   /**
211    * Which peer we can communciate with.
212    */
213   struct GNUNET_PeerIdentity peer;
214
215   /**
216    * Address used by the communication queue.
217    */ 
218   char *address;
219
220   /**
221    * Network type of the communciation queue.
222    */
223   enum GNUNET_ATS_Network_Type nt;
224
225   /**
226    * The queue itself.
227    */ 
228   struct GNUNET_MQ_Handle *mq;
229
230   /**
231    * ID for this queue when talking to the transport service.
232    */
233   uint32_t queue_id;
234   
235 };
236
237
238 /**
239  * Internal representation of an address a communicator is
240  * currently providing for the transport service.
241  */
242 struct GNUNET_TRANSPORT_AddressIdentifier
243 {
244
245   /**
246    * Kept in a DLL.
247    */
248   struct GNUNET_TRANSPORT_AddressIdentifier *next;
249
250   /**
251    * Kept in a DLL.
252    */
253   struct GNUNET_TRANSPORT_AddressIdentifier *prev;
254   
255   /**
256    * Transport handle where the address was added.
257    */
258   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
259
260   /**
261    * The actual address.
262    */
263   char *address;
264
265   /**
266    * When does the address expire? (Expected lifetime of the
267    * address.)
268    */
269   struct GNUNET_TIME_Relative expiration;
270   
271   /**
272    * Internal UUID for the address used in communication with the
273    * transport service.
274    */
275   uint32_t aid;
276
277   /**
278    * Network type for the address.
279    */
280   enum GNUNET_ATS_Network_Type nt;
281   
282 };
283
284
285 /**
286  * (re)connect our communicator to the transport service
287  *
288  * @param ch handle to reconnect
289  */
290 static void
291 reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
292
293
294 /**
295  * Send message to the transport service about address @a ai
296  * being now available.
297  *
298  * @param ai address to add
299  */
300 static void
301 send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
302 {
303   struct GNUNET_MQ_Envelope *env;
304   struct GNUNET_TRANSPORT_AddAddressMessage *aam;
305   
306   if (NULL == ai->ch->mq)
307     return;
308   env = GNUNET_MQ_msg_extra (aam,
309                              strlen (ai->address) + 1,
310                              GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
311   aam->expiration = GNUNET_TIME_relative_to_nbo (ai->expiration);
312   aam->nt = htonl ((uint32_t) ai->nt);
313   memcpy (&aam[1],
314           ai->address,
315           strlen (ai->address) + 1);
316   GNUNET_MQ_send (ai->ch->mq,
317                   env);
318 }
319
320
321 /**
322  * Send message to the transport service about address @a ai
323  * being no longer available.
324  *
325  * @param ai address to delete
326  */
327 static void
328 send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
329 {
330   struct GNUNET_MQ_Envelope *env;
331   struct GNUNET_TRANSPORT_DelAddressMessage *dam;
332   
333   if (NULL == ai->ch->mq)
334     return;
335   env = GNUNET_MQ_msg (dam,                          
336                        GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
337   dam.aid = htonl (ai->aid);
338   GNUNET_MQ_send (ai->ch->mq,
339                   env);
340 }
341
342
343 /**
344  * Send message to the transport service about queue @a qh
345  * being now available.
346  *
347  * @param qh queue to add
348  */
349 static void
350 send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
351 {
352   struct GNUNET_MQ_Envelope *env;
353   struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
354   
355   if (NULL == ai->ch->mq)
356     return;
357   env = GNUNET_MQ_msg_extra (aqm,
358                              strlen (ai->address) + 1,
359                              GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE);
360   aqm.receiver = qh->peer;
361   aqm.nt = htonl ((uint32_t) qh->nt);
362   aqm.qid = htonl (qh->qid);
363   memcpy (&aqm[1],
364           ai->address,
365           strlen (ai->address) + 1);
366   GNUNET_MQ_send (ai->ch->mq,
367                   env);
368 }
369
370
371 /**
372  * Send message to the transport service about queue @a qh
373  * being no longer available.
374  *
375  * @param qh queue to delete
376  */
377 static void
378 send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
379 {
380   struct GNUNET_MQ_Envelope *env;
381   struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
382   
383   if (NULL == ai->ch->mq)
384     return;
385   env = GNUNET_MQ_msg (dqm,                          
386                        GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE);
387   dqm.qid = htonl (qh->qid);
388   dqm.receiver = qh->peer;
389   GNUNET_MQ_send (ai->ch->mq,
390                   env);
391 }
392
393
394 /**
395  * Disconnect from the transport service.  Purges
396  * all flow control entries as we will no longer receive
397  * the ACKs.  Purges the ack pending entries as the
398  * transport will no longer expect the confirmations.
399  *
400  * @param ch service to disconnect from
401  */
402 static void
403 disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
404 {
405   struct FlowControl *fcn;
406   struct AckPending *apn;
407   
408   for (struct FlowControl *fc = ch->fc_head;
409        NULL != fc;
410        fc = fcn)
411   {
412     fcn = fc->next;
413     GNUNET_CONTAINER_DLL_remove (ch->fc_head,
414                                  ch->fc_tail,
415                                  fc);
416     fc->cb (fc->cb_cls,
417             GNUNET_SYSERR);
418     GNUNET_free (fc);
419   }
420   for (struct AckPending *ap = ch->ap_head;
421        NULL != ap;
422        ap = apn)
423   {
424     apn = ap->next;
425     GNUNET_CONTAINER_DLL_remove (ch->ap_head,
426                                  ch->ap_tail,
427                                  ap);
428     GNUNET_free (ap);
429   }
430   if (NULL == ch->mq)
431     return;
432   GNUNET_MQ_destroy (ch->mq);
433   ch->mq = NULL;
434 }
435
436
437 /**
438  * Function called on MQ errors.
439  */
440 static void
441 error_handler (void *cls,
442                enum GNUNET_MQ_Error error)
443 {
444   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
445
446   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
447               "MQ failure, reconnecting to transport service.\n");
448   disconnect (ch);
449   /* TODO: maybe do this with exponential backoff/delay */
450   reconnect (ch);
451 }
452
453
454 /**
455  * Transport service acknowledged a message we gave it
456  * (with flow control enabled). Tell the communicator.
457  *
458  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
459  * @param incoming_ack the ack
460  */
461 static void
462 handle_incoming_ack (void *cls,
463                      struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
464 {
465   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
466   
467   for (struct FlowControl *fc = ch->fc_head;
468        NULL != fc;
469        fc = fc->next)
470   {
471     if ( (fc->id == incoming_ack->fc_id) &&
472          (0 == memcmp (&fc->sender,
473                        incoming_ack->sender,
474                        sizeof (struct GNUNET_PeerIdentity))) )
475     {
476       GNUNET_CONTAINER_DLL_remove (ch->fc_head,
477                                    ch->fc_tail,
478                                    fc);
479       fc->cb (fc->cb_cls,
480               GNUNET_OK);
481       GNUNET_free (fc);
482       return;
483     }
484   }
485   GNUNET_break (0);
486   disconnect (ch);
487   /* TODO: maybe do this with exponential backoff/delay */
488   reconnect (ch);
489 }
490
491
492 /**
493  * Transport service wants us to create a queue. Check if @a cq
494  * is well-formed.
495  *
496  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
497  * @param cq the queue creation request
498  * @return #GNUNET_OK if @a smt is well-formed
499  */
500 static int
501 check_create_queue (void *cls,
502                     struct GNUNET_TRANSPORT_CreateQueue *cq)
503 {
504   uint16_t len = ntohs (cq->header.size) - sizeof (*cq);
505   const char *addr = (const char *) &cq[1];
506
507   if ( (0 == len) ||
508        ('\0' != addr[len-1]) )
509   {
510     GNUNET_break (0);
511     return GNUNET_SYSERR;
512   }
513   return GNUNET_OK;
514 }
515
516
517 /**
518  * Transport service wants us to create a queue. Tell the communicator.
519  *
520  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
521  * @param cq the queue creation request
522  */
523 static void
524 handle_create_queue (void *cls,
525                      struct GNUNET_TRANSPORT_CreateQueue *cq)
526 {
527   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
528   const char *addr = (const char *) &cq[1];
529
530   if (GNUNET_OK !=
531       ch->mq_init (ch->mq_init_cls,
532                    &cq->receiver,
533                    addr))
534   {
535     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
536                 "Address `%s' invalid for this communicator\n",
537                 addr);
538     // TODO: do we notify the transport!?
539   }
540 }
541
542
543 /**
544  * Transport service wants us to send a message. Check if @a smt
545  * is well-formed.
546  *
547  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
548  * @param smt the transmission request
549  * @return #GNUNET_OK if @a smt is well-formed
550  */
551 static int
552 check_send_msg (void *cls,
553                 struct GNUNET_TRANSPORT_SendMessageTo *smt)
554 {
555   uint16_t len = ntohs (smt->header.size) - sizeof (*smt);
556   const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1];
557
558   if (ntohs (mh->size) != len)
559   {
560     GNUNET_break (0);
561     return GNUNET_SYSERR;
562   }
563   return GNUNET_OK;
564 }
565
566
567 /**
568  * Notify transport service about @a status of a message with
569  * @a mid sent to @a receiver.
570  *
571  * @param ch handle
572  * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
573  * @param receiver which peer was the receiver
574  * @param mid message that the ack is about
575  */
576 static void
577 send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
578           int status,
579           const struct GNUNET_PeerIdentity *receiver,
580           uint64_t mid)
581 {
582   struct GNUNET_MQ_Envelope *env;
583   struct GNUNET_TRANSPORT_SendMessageToAck *ack;
584
585   env = GNUNET_MQ_msg (ack,
586                        GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
587   ack->status = htonl (GNUNET_OK);
588   ack->mid = ap->mid;
589   ack->receiver = ap->receiver;
590   GNUNET_MQ_send (ch->mq,
591                   env);
592 }
593
594
595 /**
596  * Message queue transmission by communicator was successful,
597  * notify transport service.
598  *
599  * @param cls an `struct AckPending *`
600  */
601 static void
602 send_ack_cb (void *cls)
603 {
604   struct AckPending *ap = cls;
605   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
606
607   GNUNET_CONTAINER_DLL_remove (ch->ap_head,
608                                ch->ap_tail,
609                                ap);
610   send_ack (ch,
611             GNUNET_OK,
612             &ap->receiver,
613             ap->mid);
614   GNUNET_free (ap);
615 }
616
617
618 /**
619  * Transport service wants us to send a message. Tell the communicator.
620  *
621  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
622  * @param smt the transmission request
623  */
624 static void
625 handle_send_msg (void *cls,
626                  struct GNUNET_TRANSPORT_SendMessageTo *smt)
627 {
628   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
629   const struct GNUNET_MessageHeader *mh;
630   struct GNUNET_MQ_Envelope *env;
631   struct AckPending *ap;
632   struct QueueHandle *qh;
633
634   for (qh = ch->queue_head;NULL != qh; qh = qh->next)  
635     if ( (qh->queue_id == smt->qid) &&
636          (0 == memcmp (&qh->peer,
637                        &smt->target,
638                        sizeof (struct GNUNET_PeerIdentity))) )
639       break;  
640   if (NULL == qh)
641   {
642     /* queue is already gone, tell transport this one failed */
643     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
644                 "Transmission failed, queue no longer exists.\n");
645     send_ack (ch,
646               GNUNET_NO,
647               &smt->receiver,
648               smt->mid);
649     return;
650   }
651   ap = GNUNET_new (struct AckPending);
652   ap->ch = ch;
653   ap->receiver = smt->receiver;
654   ap->mid = smt->mid;
655   GNUNET_CONTAINER_DLL_insert (ch->ap_head,
656                                cp->ap_tail,
657                                ap);
658   mh = (const struct GNUNET_MessageHeader *) &smt[1];
659   env = GNUNET_MQ_msg_copy (mh);
660   GNUNET_MQ_notify_sent (env,
661                          &send_ack_cb,
662                          ap);
663   GNUNET_MQ_send (qh->mq,
664                   env);
665 }
666
667
668 /**
669  * (re)connect our communicator to the transport service
670  *
671  * @param ch handle to reconnect
672  */
673 static void
674 reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
675 {
676   struct GNUNET_MQ_MessageHandler handlers[] = {
677     GNUNET_MQ_hd_fixed_size (incoming_ack,
678                              GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
679                              struct GNUNET_TRANSPORT_IncomingMessageAck,
680                              ch),
681     GNUNET_MQ_hd_var_size (create_queue,
682                            GNUNET_MESSAGE_TYPE_TRANSPORT_CREATE_QUEUE,
683                            struct GNUNET_TRANSPORT_CreateQueue,
684                            ch),
685     GNUNET_MQ_hd_var_size (send_msg,
686                            GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
687                            struct GNUNET_TRANSPORT_SendMessageTo,
688                            ch),
689     GNUNET_MQ_handler_end()
690   };
691   
692   ch->mq = GNUNET_CLIENT_connect (cfg,
693                                   "transport",
694                                   handlers,
695                                   &error_handler,
696                                   ch);
697   for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
698        NULL != ai;
699        ai = ai->next)
700     send_add_address (ai);
701   for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head;
702        NULL != qh;
703        qh = qh->next)
704     send_add_queue (qh);
705 }
706
707
708 /**
709  * Connect to the transport service.
710  *
711  * @param cfg configuration to use
712  * @param name name of the communicator that is connecting
713  * @param mtu maximum message size supported by communicator, 0 if
714  *            sending is not supported, SIZE_MAX for no MTU
715  * @param mq_init function to call to initialize a message queue given
716  *                the address of another peer, can be NULL if the
717  *                communicator only supports receiving messages
718  * @param mq_init_cls closure for @a mq_init
719  * @return NULL on error
720  */
721 struct GNUNET_TRANSPORT_CommunicatorHandle *
722 GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
723                                        const char *name,
724                                        size_t mtu,
725                                        GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
726                                        void *mq_init_cls)
727 {
728   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
729   
730   ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
731   ch->cfg = cfg;
732   ch->name = name;
733   ch->mtu = mtu;
734   ch->mq_init = mq_init;
735   ch->mq_init_cls = mq_init_cls;
736   reconnect (ch);
737   if (GNUNET_OK !=
738       GNUNET_CONFIGURATION_get_value_number (cfg,
739                                              name,
740                                              "MAX_QUEUE_LENGTH",
741                                              &ch->max_queue_length))
742     ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
743   if (NULL == ch->mq)
744   {
745     GNUNET_free (ch);
746     return NULL;
747   }
748   return ch;
749 }
750
751
752 /**
753  * Disconnect from the transport service.
754  *
755  * @param ch handle returned from connect
756  */
757 void
758 GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
759 {
760   disconnect (ch);
761   while (NULL != ch->ai_head)
762   {
763     GNUNET_break (0); /* communicator forgot to remove address, warn! */
764     GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
765   }
766   GNUNET_free (ch);
767 }
768
769
770 /* ************************* Receiving *************************** */
771
772
773 /**
774  * Notify transport service that the communicator has received
775  * a message.
776  *
777  * @param ch connection to transport service
778  * @param sender presumed sender of the message (details to be checked
779  *        by higher layers)
780  * @param msg the message
781  * @param cb function to call once handling the message is done, NULL if
782  *         flow control is not supported by this communicator
783  * @param cb_cls closure for @a cb
784  * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
785  *         immediately dropped due to memory limitations (communicator
786  *         should try to apply back pressure),
787  *         #GNUNET_SYSERR if the message could not be delivered because
788  *         the tranport service is not yet up
789  */
790 int
791 GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
792                                        const struct GNUNET_PeerIdentity *sender,
793                                        const struct GNUNET_MessageHeader *msg,
794                                        GNUNET_TRANSPORT_MessageCompletedCallback cb,
795                                        void *cb_cls)
796 {
797   struct GNUNET_MQ_Envelope *env;
798   struct GNUNET_TRANSPORT_IncomingMessage *im;
799   uint16_t msize;
800   
801   if (NULL == ai->ch->mq)
802     return GNUNET_SYSERR;
803   if (NULL != cb)
804   {
805     struct FlowControl *fc;
806
807     im->fc_on = htonl (GNUNET_YES);
808     im->fc_id = ai->ch->fc_gen++;
809     fc = GNUNET_new (struct FlowControl);
810     fc->sender = *sender;
811     fc->id = im->fc_id;
812     fc->cb = cb;
813     fc->cb_cls = cb_cls;
814     GNUNET_CONTAINER_DLL_insert (ch->fc_head,
815                                  ch->fc_tail,
816                                  fc);
817   }
818   else
819   {
820     if (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length)
821     {
822       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
823                   "Dropping message: transprot is too slow, queue length %u exceeded\n",
824                   ch->max_queue_length);
825       return GNUNET_NO;
826     }
827   }
828   
829   msize = ntohs (msg->size);
830   env = GNUNET_MQ_msg_extra (im,
831                              msize,
832                              GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
833   if (NULL == env)
834   {
835     GNUNET_break (0);
836     return GNUNET_SYSERR;
837   }
838   im->sender = *sender;
839   memcpy (&im[1],
840           msg,
841           msize);
842   GNUNET_MQ_send (ai->ch->mq,
843                   env);
844   return GNUNET_OK;
845 }
846
847
848 /* ************************* Discovery *************************** */
849
850
851 /**
852  * Notify transport service that an MQ became available due to an
853  * "inbound" connection or because the communicator discovered the
854  * presence of another peer.
855  *
856  * @param ch connection to transport service
857  * @param peer peer with which we can now communicate
858  * @param address address in human-readable format, 0-terminated, UTF-8
859  * @param nt which network type does the @a address belong to?
860  * @param mq message queue of the @a peer
861  * @return API handle identifying the new MQ
862  */
863 struct GNUNET_TRANSPORT_QueueHandle *
864 GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
865                                       const struct GNUNET_PeerIdentity *peer,
866                                       const char *address,
867                                       enum GNUNET_ATS_Network_Type nt,
868                                       struct GNUNET_MQ_Handle *mq)
869 {
870   struct GNUNET_TRANSPORT_QueueHandle *qh;
871
872   qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
873   qh->ch = ch;
874   qh->peer = *peer;
875   qh->address = GNUNET_strdup (address);
876   qh->nt = nt;
877   qh->mq = mq;
878   qh->queue_id = ch->queue_gen++;
879   GNUNET_CONTAINER_DLL_insert (ch->queue_head,
880                                ch->queue_tail,
881                                qh);
882   send_add_queue (qh);
883   return qh;
884 }
885
886
887 /**
888  * Notify transport service that an MQ became unavailable due to a
889  * disconnect or timeout.
890  *
891  * @param qh handle for the queue that must be invalidated
892  */
893 void
894 GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
895 {
896   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
897   
898   send_del_queue (qh);
899   GNUNET_CONTAINER_DLL_remove (ch->queue_head,
900                                ch->queue_tail,
901                                qh);
902   GNUNET_MQ_destroy (qh->mq);
903   GNUNET_free (qh->address);
904   GNUNET_free (qh);
905 }
906
907
908 /**
909  * Notify transport service about an address that this communicator
910  * provides for this peer.
911  *
912  * @param ch connection to transport service
913  * @param address our address in human-readable format, 0-terminated, UTF-8
914  * @param nt which network type does the address belong to?
915  * @param expiration when does the communicator forsee this address expiring?
916  */
917 struct GNUNET_TRANSPORT_AddressIdentifier *
918 GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
919                                            const char *address,
920                                            enum GNUNET_ATS_Network_Type nt,
921                                            struct GNUNET_TIME_Relative expiration)
922 {
923   struct GNUNET_TRANSPORT_AddressIdentifier *ai;
924
925   ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier);
926   ai->ch = ch;
927   ai->address = GNUNET_strdup (address);
928   ai->nt = nt;
929   ai->expiration = expiration;
930   ai->aid = handle->aid_gen++;
931   GNUNET_CONTAINER_DLL_insert (handle->ai_head,
932                                handle->ai_tail,
933                                ai);
934   send_add_address (ai);
935   return ai;
936 }
937
938
939 /**
940  * Notify transport service about an address that this communicator no
941  * longer provides for this peer.
942  *
943  * @param ai address that is no longer provided
944  */
945 void
946 GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
947 {
948   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
949
950   send_del_address (ai);
951   GNUNET_CONTAINER_DLL_remove (ch->ai_head,
952                                ch->ai_tail,
953                                ai);
954   GNUNET_free (ai->address);
955   GNUNET_free (ai);
956 }
957
958
959 /* end of transport_api2_communication.c */