make copy of transport_api_core.c
[oweals/gnunet.git] / src / transport / transport_api2_communication.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file transport/transport_api2_communication.c
21  * @brief implementation of the gnunet_transport_communication_service.h API
22  * @author Christian Grothoff
23  */
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26 #include "gnunet_protocols.h"
27 #include "gnunet_transport_communication_service.h"
28 #include "transport.h"
29
30
31 /**
32  * How many messages do we keep at most in the queue to the 
33  * transport service before we start to drop (default, 
34  * can be changed via the configuration file).
35  */
36 #define DEFAULT_MAX_QUEUE_LENGTH 16
37
38
39 /**
40  * Information we track per packet to enable flow control.
41  */
42 struct FlowControl
43 {
44   /**
45    * Kept in a DLL.
46    */
47   struct FlowControl *next;
48
49   /**
50    * Kept in a DLL.
51    */
52   struct FlowControl *prev;
53
54   /**
55    * Function to call once the message was processed.
56    */
57   GNUNET_TRANSPORT_MessageCompletedCallback cb;
58
59   /**
60    * Closure for @e cb
61    */
62   void *cb_cls;
63   
64   /**
65    * Which peer is this about?
66    */
67   struct GNUNET_PeerIdentity sender;
68
69   /**
70    * More-or-less unique ID for the message.
71    */
72   uint64_t id;
73 };
74
75
76 /**
77  * Information we track per message to tell the transport about
78  * success or failures.
79  */
80 struct AckPending
81 {
82   /**
83    * Kept in a DLL.
84    */
85   struct AckPending *next;
86
87   /**
88    * Kept in a DLL.
89    */
90   struct AckPending *prev;
91
92   /**
93    * Communicator this entry belongs to.
94    */
95   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
96   
97   /**
98    * Which peer is this about?
99    */
100   struct GNUNET_PeerIdentity receiver;
101
102   /**
103    * More-or-less unique ID for the message.
104    */
105   uint64_t mid;
106 };
107
108
109 /**
110  * Opaque handle to the transport service for communicators.
111  */
112 struct GNUNET_TRANSPORT_CommunicatorHandle
113 {
114   /**
115    * Head of DLL of addresses this communicator offers to the transport service.
116    */
117   struct GNUNET_TRANSPORT_AddressIdentifier *ai_head;
118
119   /**
120    * Tail of DLL of addresses this communicator offers to the transport service.
121    */
122   struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
123
124   /**
125    * DLL of messages awaiting flow control confirmation (ack).
126    */
127   struct FlowControl *fc_head;
128
129   /**
130    * DLL of messages awaiting flow control confirmation (ack).
131    */
132   struct FlowControl *fc_tail;
133
134   /**
135    * DLL of messages awaiting transmission confirmation (ack).
136    */
137   struct AckPending *ap_head;
138
139   /**
140    * DLL of messages awaiting transmission confirmation (ack).
141    */
142   struct AckPending *ap_tail;
143
144   /**
145    * DLL of queues we offer.
146    */
147   struct GNUNET_TRANSPORT_QueueHandle *queue_head;
148   
149   /**
150    * DLL of queues we offer.
151    */
152   struct GNUNET_TRANSPORT_QueueHandle *queue_tail;
153     
154   /**
155    * Our configuration.
156    */
157   const struct GNUNET_CONFIGURATION_Handle *cfg;
158
159   /**
160    * Config section to use.
161    */
162   const char *config_section;
163
164   /**
165    * Address prefix to use.
166    */
167   const char *addr_prefix;
168
169   /**
170    * Function to call when the transport service wants us to initiate
171    * a communication channel with another peer.
172    */
173   GNUNET_TRANSPORT_CommunicatorMqInit mq_init;
174
175   /**
176    * Closure for @e mq_init.
177    */
178   void *mq_init_cls;
179
180   /**
181    * Queue to talk to the transport service.
182    */
183   struct GNUNET_MQ_Handle *mq;
184   
185   /**
186    * Maximum permissable queue length.
187    */
188   unsigned long long max_queue_length;
189   
190   /**
191    * Flow-control identifier generator.
192    */
193   uint64_t fc_gen;
194
195   /**
196    * MTU of the communicator
197    */
198   size_t mtu;
199   
200   /**
201    * Internal UUID for the address used in communication with the
202    * transport service.
203    */
204   uint32_t aid_gen;
205
206   /**
207    * Queue identifier generator.
208    */
209   uint32_t queue_gen;
210   
211 };
212
213
214 /**
215  * Handle returned to identify the internal data structure the transport
216  * API has created to manage a message queue to a particular peer.
217  */
218 struct GNUNET_TRANSPORT_QueueHandle
219 {
220
221   /**
222    * Kept in a DLL.
223    */
224   struct GNUNET_TRANSPORT_QueueHandle *next;
225
226   /**
227    * Kept in a DLL.
228    */
229   struct GNUNET_TRANSPORT_QueueHandle *prev;
230   
231   /**
232    * Handle this queue belongs to.
233    */
234   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
235
236   /**
237    * Which peer we can communciate with.
238    */
239   struct GNUNET_PeerIdentity peer;
240
241   /**
242    * Address used by the communication queue.
243    */ 
244   char *address;
245
246   /**
247    * Network type of the communciation queue.
248    */
249   enum GNUNET_ATS_Network_Type nt;
250
251   /**
252    * The queue itself.
253    */ 
254   struct GNUNET_MQ_Handle *mq;
255
256   /**
257    * ID for this queue when talking to the transport service.
258    */
259   uint32_t queue_id;
260   
261 };
262
263
264 /**
265  * Internal representation of an address a communicator is
266  * currently providing for the transport service.
267  */
268 struct GNUNET_TRANSPORT_AddressIdentifier
269 {
270
271   /**
272    * Kept in a DLL.
273    */
274   struct GNUNET_TRANSPORT_AddressIdentifier *next;
275
276   /**
277    * Kept in a DLL.
278    */
279   struct GNUNET_TRANSPORT_AddressIdentifier *prev;
280   
281   /**
282    * Transport handle where the address was added.
283    */
284   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
285
286   /**
287    * The actual address.
288    */
289   char *address;
290
291   /**
292    * When does the address expire? (Expected lifetime of the
293    * address.)
294    */
295   struct GNUNET_TIME_Relative expiration;
296   
297   /**
298    * Internal UUID for the address used in communication with the
299    * transport service.
300    */
301   uint32_t aid;
302
303   /**
304    * Network type for the address.
305    */
306   enum GNUNET_ATS_Network_Type nt;
307   
308 };
309
310
311 /**
312  * (re)connect our communicator to the transport service
313  *
314  * @param ch handle to reconnect
315  */
316 static void
317 reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
318
319
320 /**
321  * Send message to the transport service about address @a ai
322  * being now available.
323  *
324  * @param ai address to add
325  */
326 static void
327 send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
328 {
329   struct GNUNET_MQ_Envelope *env;
330   struct GNUNET_TRANSPORT_AddAddressMessage *aam;
331   
332   if (NULL == ai->ch->mq)
333     return;
334   env = GNUNET_MQ_msg_extra (aam,
335                              strlen (ai->address) + 1,
336                              GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
337   aam->expiration = GNUNET_TIME_relative_hton (ai->expiration);
338   aam->nt = htonl ((uint32_t) ai->nt);
339   memcpy (&aam[1],
340           ai->address,
341           strlen (ai->address) + 1);
342   GNUNET_MQ_send (ai->ch->mq,
343                   env);
344 }
345
346
347 /**
348  * Send message to the transport service about address @a ai
349  * being no longer available.
350  *
351  * @param ai address to delete
352  */
353 static void
354 send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
355 {
356   struct GNUNET_MQ_Envelope *env;
357   struct GNUNET_TRANSPORT_DelAddressMessage *dam;
358   
359   if (NULL == ai->ch->mq)
360     return;
361   env = GNUNET_MQ_msg (dam,                          
362                        GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
363   dam->aid = htonl (ai->aid);
364   GNUNET_MQ_send (ai->ch->mq,
365                   env);
366 }
367
368
369 /**
370  * Send message to the transport service about queue @a qh
371  * being now available.
372  *
373  * @param qh queue to add
374  */
375 static void
376 send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
377 {
378   struct GNUNET_MQ_Envelope *env;
379   struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
380   
381   if (NULL == qh->ch->mq)
382     return;
383   env = GNUNET_MQ_msg_extra (aqm,
384                              strlen (qh->address) + 1,
385                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
386   aqm->receiver = qh->peer;
387   aqm->nt = htonl ((uint32_t) qh->nt);
388   aqm->qid = htonl (qh->queue_id);
389   memcpy (&aqm[1],
390           qh->address,
391           strlen (qh->address) + 1);
392   GNUNET_MQ_send (qh->ch->mq,
393                   env);
394 }
395
396
397 /**
398  * Send message to the transport service about queue @a qh
399  * being no longer available.
400  *
401  * @param qh queue to delete
402  */
403 static void
404 send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
405 {
406   struct GNUNET_MQ_Envelope *env;
407   struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
408   
409   if (NULL == qh->ch->mq)
410     return;
411   env = GNUNET_MQ_msg (dqm,                          
412                        GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
413   dqm->qid = htonl (qh->queue_id);
414   dqm->receiver = qh->peer;
415   GNUNET_MQ_send (qh->ch->mq,
416                   env);
417 }
418
419
420 /**
421  * Disconnect from the transport service.  Purges
422  * all flow control entries as we will no longer receive
423  * the ACKs.  Purges the ack pending entries as the
424  * transport will no longer expect the confirmations.
425  *
426  * @param ch service to disconnect from
427  */
428 static void
429 disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
430 {
431   struct FlowControl *fcn;
432   struct AckPending *apn;
433   
434   for (struct FlowControl *fc = ch->fc_head;
435        NULL != fc;
436        fc = fcn)
437   {
438     fcn = fc->next;
439     GNUNET_CONTAINER_DLL_remove (ch->fc_head,
440                                  ch->fc_tail,
441                                  fc);
442     fc->cb (fc->cb_cls,
443             GNUNET_SYSERR);
444     GNUNET_free (fc);
445   }
446   for (struct AckPending *ap = ch->ap_head;
447        NULL != ap;
448        ap = apn)
449   {
450     apn = ap->next;
451     GNUNET_CONTAINER_DLL_remove (ch->ap_head,
452                                  ch->ap_tail,
453                                  ap);
454     GNUNET_free (ap);
455   }
456   if (NULL == ch->mq)
457     return;
458   GNUNET_MQ_destroy (ch->mq);
459   ch->mq = NULL;
460 }
461
462
463 /**
464  * Function called on MQ errors.
465  */
466 static void
467 error_handler (void *cls,
468                enum GNUNET_MQ_Error error)
469 {
470   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
471
472   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
473               "MQ failure %d, reconnecting to transport service.\n",
474               error);
475   disconnect (ch);
476   /* TODO: maybe do this with exponential backoff/delay */
477   reconnect (ch);
478 }
479
480
481 /**
482  * Transport service acknowledged a message we gave it
483  * (with flow control enabled). Tell the communicator.
484  *
485  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
486  * @param incoming_ack the ack
487  */
488 static void
489 handle_incoming_ack (void *cls,
490                      const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
491 {
492   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
493   
494   for (struct FlowControl *fc = ch->fc_head;
495        NULL != fc;
496        fc = fc->next)
497   {
498     if ( (fc->id == incoming_ack->fc_id) &&
499          (0 == memcmp (&fc->sender,
500                        &incoming_ack->sender,
501                        sizeof (struct GNUNET_PeerIdentity))) )
502     {
503       GNUNET_CONTAINER_DLL_remove (ch->fc_head,
504                                    ch->fc_tail,
505                                    fc);
506       fc->cb (fc->cb_cls,
507               GNUNET_OK);
508       GNUNET_free (fc);
509       return;
510     }
511   }
512   GNUNET_break (0);
513   disconnect (ch);
514   /* TODO: maybe do this with exponential backoff/delay */
515   reconnect (ch);
516 }
517
518
519 /**
520  * Transport service wants us to create a queue. Check if @a cq
521  * is well-formed.
522  *
523  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
524  * @param cq the queue creation request
525  * @return #GNUNET_OK if @a smt is well-formed
526  */
527 static int
528 check_create_queue (void *cls,
529                     const struct GNUNET_TRANSPORT_CreateQueue *cq)
530 {
531   uint16_t len = ntohs (cq->header.size) - sizeof (*cq);
532   const char *addr = (const char *) &cq[1];
533
534   (void) cls;
535   if ( (0 == len) ||
536        ('\0' != addr[len-1]) )
537   {
538     GNUNET_break (0);
539     return GNUNET_SYSERR;
540   }
541   return GNUNET_OK;
542 }
543
544
545 /**
546  * Transport service wants us to create a queue. Tell the communicator.
547  *
548  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
549  * @param cq the queue creation request
550  */
551 static void
552 handle_create_queue (void *cls,
553                      const struct GNUNET_TRANSPORT_CreateQueue *cq)
554 {
555   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
556   const char *addr = (const char *) &cq[1];
557   struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
558   struct GNUNET_MQ_Envelope *env;
559   
560   if (GNUNET_OK !=
561       ch->mq_init (ch->mq_init_cls,
562                    &cq->receiver,
563                    addr))
564   {
565     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
566                 "Address `%s' invalid for this communicator\n",
567                 addr);
568     env = GNUNET_MQ_msg (cqr,
569                          GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);      
570   }
571   else
572   {
573     env = GNUNET_MQ_msg (cqr,
574                          GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);      
575   }
576   cqr->request_id = cq->request_id;
577   GNUNET_MQ_send (ch->mq,
578                   env);
579 }
580
581
582 /**
583  * Transport service wants us to send a message. Check if @a smt
584  * is well-formed.
585  *
586  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
587  * @param smt the transmission request
588  * @return #GNUNET_OK if @a smt is well-formed
589  */
590 static int
591 check_send_msg (void *cls,
592                 const struct GNUNET_TRANSPORT_SendMessageTo *smt)
593 {
594   uint16_t len = ntohs (smt->header.size) - sizeof (*smt);
595   const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1];
596
597   (void) cls;
598   if (ntohs (mh->size) != len)
599   {
600     GNUNET_break (0);
601     return GNUNET_SYSERR;
602   }
603   return GNUNET_OK;
604 }
605
606
607 /**
608  * Notify transport service about @a status of a message with
609  * @a mid sent to @a receiver.
610  *
611  * @param ch handle
612  * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
613  * @param receiver which peer was the receiver
614  * @param mid message that the ack is about
615  */
616 static void
617 send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
618           int status,
619           const struct GNUNET_PeerIdentity *receiver,
620           uint64_t mid)
621 {
622   struct GNUNET_MQ_Envelope *env;
623   struct GNUNET_TRANSPORT_SendMessageToAck *ack;
624
625   env = GNUNET_MQ_msg (ack,
626                        GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
627   ack->status = htonl (status);
628   ack->mid = mid;
629   ack->receiver = *receiver;
630   GNUNET_MQ_send (ch->mq,
631                   env);
632 }
633
634
635 /**
636  * Message queue transmission by communicator was successful,
637  * notify transport service.
638  *
639  * @param cls an `struct AckPending *`
640  */
641 static void
642 send_ack_cb (void *cls)
643 {
644   struct AckPending *ap = cls;
645   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
646
647   GNUNET_CONTAINER_DLL_remove (ch->ap_head,
648                                ch->ap_tail,
649                                ap);
650   send_ack (ch,
651             GNUNET_OK,
652             &ap->receiver,
653             ap->mid);
654   GNUNET_free (ap);
655 }
656
657
658 /**
659  * Transport service wants us to send a message. Tell the communicator.
660  *
661  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
662  * @param smt the transmission request
663  */
664 static void
665 handle_send_msg (void *cls,
666                  const struct GNUNET_TRANSPORT_SendMessageTo *smt)
667 {
668   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
669   const struct GNUNET_MessageHeader *mh;
670   struct GNUNET_MQ_Envelope *env;
671   struct AckPending *ap;
672   struct GNUNET_TRANSPORT_QueueHandle *qh;
673
674   for (qh = ch->queue_head;NULL != qh; qh = qh->next)  
675     if ( (qh->queue_id == smt->qid) &&
676          (0 == memcmp (&qh->peer,
677                        &smt->receiver,
678                        sizeof (struct GNUNET_PeerIdentity))) )
679       break;  
680   if (NULL == qh)
681   {
682     /* queue is already gone, tell transport this one failed */
683     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
684                 "Transmission failed, queue no longer exists.\n");
685     send_ack (ch,
686               GNUNET_NO,
687               &smt->receiver,
688               smt->mid);
689     return;
690   }
691   ap = GNUNET_new (struct AckPending);
692   ap->ch = ch;
693   ap->receiver = smt->receiver;
694   ap->mid = smt->mid;
695   GNUNET_CONTAINER_DLL_insert (ch->ap_head,
696                                ch->ap_tail,
697                                ap);
698   mh = (const struct GNUNET_MessageHeader *) &smt[1];
699   env = GNUNET_MQ_msg_copy (mh);
700   GNUNET_MQ_notify_sent (env,
701                          &send_ack_cb,
702                          ap);
703   GNUNET_MQ_send (qh->mq,
704                   env);
705 }
706
707
708 /**
709  * (re)connect our communicator to the transport service
710  *
711  * @param ch handle to reconnect
712  */
713 static void
714 reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
715 {
716   struct GNUNET_MQ_MessageHandler handlers[] = {
717     GNUNET_MQ_hd_fixed_size (incoming_ack,
718                              GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
719                              struct GNUNET_TRANSPORT_IncomingMessageAck,
720                              ch),
721     GNUNET_MQ_hd_var_size (create_queue,
722                            GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
723                            struct GNUNET_TRANSPORT_CreateQueue,
724                            ch),
725     GNUNET_MQ_hd_var_size (send_msg,
726                            GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
727                            struct GNUNET_TRANSPORT_SendMessageTo,
728                            ch),
729     GNUNET_MQ_handler_end()
730   };
731   struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam;
732   struct GNUNET_MQ_Envelope *env;
733   
734   ch->mq = GNUNET_CLIENT_connect (ch->cfg,
735                                   "transport",
736                                   handlers,
737                                   &error_handler,
738                                   ch);
739   if (NULL == ch->mq)
740     return;
741   env = GNUNET_MQ_msg_extra (cam,                            
742                              strlen (ch->addr_prefix) + 1,
743                              GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR);
744   memcpy (&cam[1],
745           ch->addr_prefix,
746           strlen (ch->addr_prefix) + 1);
747   GNUNET_MQ_send (ch->mq,
748                   env);
749   for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
750        NULL != ai;
751        ai = ai->next)
752     send_add_address (ai);
753   for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head;
754        NULL != qh;
755        qh = qh->next)
756     send_add_queue (qh);
757 }
758
759
760 /**
761  * Connect to the transport service.
762  *
763  * @param cfg configuration to use
764  * @param config_section section of the configuration to use for options
765  * @param addr_prefix address prefix for addresses supported by this
766  *        communicator, could be NULL for incoming-only communicators
767  * @param mtu maximum message size supported by communicator, 0 if
768  *            sending is not supported, SIZE_MAX for no MTU
769  * @param mq_init function to call to initialize a message queue given
770  *                the address of another peer, can be NULL if the
771  *                communicator only supports receiving messages
772  * @param mq_init_cls closure for @a mq_init
773  * @return NULL on error
774  */
775 struct GNUNET_TRANSPORT_CommunicatorHandle *
776 GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
777                                        const char *config_section,
778                                        const char *addr_prefix,
779                                        size_t mtu,
780                                        GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
781                                        void *mq_init_cls)
782 {
783   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
784   
785   ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
786   ch->cfg = cfg;
787   ch->config_section = config_section;
788   ch->addr_prefix = addr_prefix;
789   ch->mtu = mtu;
790   ch->mq_init = mq_init;
791   ch->mq_init_cls = mq_init_cls;
792   reconnect (ch);
793   if (GNUNET_OK !=
794       GNUNET_CONFIGURATION_get_value_number (cfg,
795                                              config_section,
796                                              "MAX_QUEUE_LENGTH",
797                                              &ch->max_queue_length))
798     ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
799   if (NULL == ch->mq)
800   {
801     GNUNET_free (ch);
802     return NULL;
803   }
804   return ch;
805 }
806
807
808 /**
809  * Disconnect from the transport service.
810  *
811  * @param ch handle returned from connect
812  */
813 void
814 GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
815 {
816   disconnect (ch);
817   while (NULL != ch->ai_head)
818   {
819     GNUNET_break (0); /* communicator forgot to remove address, warn! */
820     GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
821   }
822   GNUNET_free (ch);
823 }
824
825
826 /* ************************* Receiving *************************** */
827
828
829 /**
830  * Notify transport service that the communicator has received
831  * a message.
832  *
833  * @param ch connection to transport service
834  * @param sender presumed sender of the message (details to be checked
835  *        by higher layers)
836  * @param msg the message
837  * @param cb function to call once handling the message is done, NULL if
838  *         flow control is not supported by this communicator
839  * @param cb_cls closure for @a cb
840  * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
841  *         immediately dropped due to memory limitations (communicator
842  *         should try to apply back pressure),
843  *         #GNUNET_SYSERR if the message could not be delivered because
844  *         the tranport service is not yet up
845  */
846 int
847 GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
848                                        const struct GNUNET_PeerIdentity *sender,
849                                        const struct GNUNET_MessageHeader *msg,
850                                        GNUNET_TRANSPORT_MessageCompletedCallback cb,
851                                        void *cb_cls)
852 {
853   struct GNUNET_MQ_Envelope *env;
854   struct GNUNET_TRANSPORT_IncomingMessage *im;
855   uint16_t msize;
856   
857   if (NULL == ch->mq)
858     return GNUNET_SYSERR;
859   if ( (NULL == cb) &&
860        (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length) )
861   {
862     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
863                 "Dropping message: transprot is too slow, queue length %llu exceeded\n",
864                 ch->max_queue_length);
865     return GNUNET_NO;
866   }
867   
868   msize = ntohs (msg->size);
869   env = GNUNET_MQ_msg_extra (im,
870                              msize,
871                              GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
872   if (NULL == env)
873   {
874     GNUNET_break (0);
875     return GNUNET_SYSERR;
876   }
877   im->sender = *sender;
878   memcpy (&im[1],
879           msg,
880           msize);
881   if (NULL != cb)
882   {
883     struct FlowControl *fc;
884
885     im->fc_on = htonl (GNUNET_YES);
886     im->fc_id = ch->fc_gen++;
887     fc = GNUNET_new (struct FlowControl);
888     fc->sender = *sender;
889     fc->id = im->fc_id;
890     fc->cb = cb;
891     fc->cb_cls = cb_cls;
892     GNUNET_CONTAINER_DLL_insert (ch->fc_head,
893                                  ch->fc_tail,
894                                  fc);
895   }
896   GNUNET_MQ_send (ch->mq,
897                   env);
898   return GNUNET_OK;
899 }
900
901
902 /* ************************* Discovery *************************** */
903
904
905 /**
906  * Notify transport service that an MQ became available due to an
907  * "inbound" connection or because the communicator discovered the
908  * presence of another peer.
909  *
910  * @param ch connection to transport service
911  * @param peer peer with which we can now communicate
912  * @param address address in human-readable format, 0-terminated, UTF-8
913  * @param nt which network type does the @a address belong to?
914  * @param mq message queue of the @a peer
915  * @return API handle identifying the new MQ
916  */
917 struct GNUNET_TRANSPORT_QueueHandle *
918 GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
919                                       const struct GNUNET_PeerIdentity *peer,
920                                       const char *address,
921                                       enum GNUNET_ATS_Network_Type nt,
922                                       struct GNUNET_MQ_Handle *mq)
923 {
924   struct GNUNET_TRANSPORT_QueueHandle *qh;
925
926   qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
927   qh->ch = ch;
928   qh->peer = *peer;
929   qh->address = GNUNET_strdup (address);
930   qh->nt = nt;
931   qh->mq = mq;
932   qh->queue_id = ch->queue_gen++;
933   GNUNET_CONTAINER_DLL_insert (ch->queue_head,
934                                ch->queue_tail,
935                                qh);
936   send_add_queue (qh);
937   return qh;
938 }
939
940
941 /**
942  * Notify transport service that an MQ became unavailable due to a
943  * disconnect or timeout.
944  *
945  * @param qh handle for the queue that must be invalidated
946  */
947 void
948 GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
949 {
950   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
951   
952   send_del_queue (qh);
953   GNUNET_CONTAINER_DLL_remove (ch->queue_head,
954                                ch->queue_tail,
955                                qh);
956   GNUNET_MQ_destroy (qh->mq);
957   GNUNET_free (qh->address);
958   GNUNET_free (qh);
959 }
960
961
962 /**
963  * Notify transport service about an address that this communicator
964  * provides for this peer.
965  *
966  * @param ch connection to transport service
967  * @param address our address in human-readable format, 0-terminated, UTF-8
968  * @param nt which network type does the address belong to?
969  * @param expiration when does the communicator forsee this address expiring?
970  */
971 struct GNUNET_TRANSPORT_AddressIdentifier *
972 GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
973                                            const char *address,
974                                            enum GNUNET_ATS_Network_Type nt,
975                                            struct GNUNET_TIME_Relative expiration)
976 {
977   struct GNUNET_TRANSPORT_AddressIdentifier *ai;
978
979   ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier);
980   ai->ch = ch;
981   ai->address = GNUNET_strdup (address);
982   ai->nt = nt;
983   ai->expiration = expiration;
984   ai->aid = ch->aid_gen++;
985   GNUNET_CONTAINER_DLL_insert (ch->ai_head,
986                                ch->ai_tail,
987                                ai);
988   send_add_address (ai);
989   return ai;
990 }
991
992
993 /**
994  * Notify transport service about an address that this communicator no
995  * longer provides for this peer.
996  *
997  * @param ai address that is no longer provided
998  */
999 void
1000 GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
1001 {
1002   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
1003
1004   send_del_address (ai);
1005   GNUNET_CONTAINER_DLL_remove (ch->ai_head,
1006                                ch->ai_tail,
1007                                ai);
1008   GNUNET_free (ai->address);
1009   GNUNET_free (ai);
1010 }
1011
1012
1013 /* end of transport_api2_communication.c */