make copy of transport_api_core.c
[oweals/gnunet.git] / src / transport / transport_api2_core.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2013, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file transport/transport_api_core.c
21  * @brief library to access the transport service for message exchange
22  * @author Christian Grothoff
23  */
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26 #include "gnunet_constants.h"
27 #include "gnunet_arm_service.h"
28 #include "gnunet_hello_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_transport_core_service.h"
31 #include "transport.h"
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "transport-api-core",__VA_ARGS__)
34
35 /**
36  * If we could not send any payload to a peer for this amount of
37  * time, we print a warning.
38  */
39 #define UNREADY_WARN_TIME GNUNET_TIME_UNIT_MINUTES
40
41 /**
42  * How large to start with for the hashmap of neighbours.
43  */
44 #define STARTING_NEIGHBOURS_SIZE 16
45
46
47 /**
48  * Entry in hash table of all of our current (connected) neighbours.
49  */
50 struct Neighbour
51 {
52   /**
53    * Overall transport handle.
54    */
55   struct GNUNET_TRANSPORT_CoreHandle *h;
56
57   /**
58    * Active message queue for the peer.
59    */
60   struct GNUNET_MQ_Handle *mq;
61
62   /**
63    * Envelope with the message we are currently transmitting (or NULL).
64    */
65   struct GNUNET_MQ_Envelope *env;
66
67   /**
68    * Closure for @e mq handlers.
69    */
70   void *handlers_cls;
71
72   /**
73    * Identity of this neighbour.
74    */
75   struct GNUNET_PeerIdentity id;
76
77   /**
78    * Outbound bandwidh tracker.
79    */
80   struct GNUNET_BANDWIDTH_Tracker out_tracker;
81
82   /**
83    * Entry in our readyness heap (which is sorted by @e next_ready
84    * value).  NULL if there is no pending transmission request for
85    * this neighbour or if we're waiting for @e is_ready to become
86    * true AFTER the @e out_tracker suggested that this peer's quota
87    * has been satisfied (so once @e is_ready goes to #GNUNET_YES,
88    * we should immediately go back into the heap).
89    */
90   struct GNUNET_CONTAINER_HeapNode *hn;
91
92   /**
93    * Task to trigger MQ when we have enough bandwidth for the
94    * next transmission.
95    */
96   struct GNUNET_SCHEDULER_Task *timeout_task;
97
98   /**
99    * Sending consumed more bytes on wire than payload was announced
100    * This overhead is added to the delay of next sending operation
101    */
102   unsigned long long traffic_overhead;
103
104   /**
105    * Is this peer currently ready to receive a message?
106    */
107   int is_ready;
108
109   /**
110    * Size of the message in @e env.
111    */
112   uint16_t env_size;
113
114 };
115
116
117
118 /**
119  * Handle for the transport service (includes all of the
120  * state for the transport service).
121  */
122 struct GNUNET_TRANSPORT_CoreHandle
123 {
124
125   /**
126    * Closure for the callbacks.
127    */
128   void *cls;
129
130   /**
131    * Functions to call for received data (template for
132    * new message queues).
133    */
134   struct GNUNET_MQ_MessageHandler *handlers;
135
136   /**
137    * function to call on connect events
138    */
139   GNUNET_TRANSPORT_NotifyConnecT nc_cb;
140
141   /**
142    * function to call on disconnect events
143    */
144   GNUNET_TRANSPORT_NotifyDisconnecT nd_cb;
145
146   /**
147    * function to call on excess bandwidth events
148    */
149   GNUNET_TRANSPORT_NotifyExcessBandwidtH neb_cb;
150
151   /**
152    * My client connection to the transport service.
153    */
154   struct GNUNET_MQ_Handle *mq;
155
156   /**
157    * My configuration.
158    */
159   const struct GNUNET_CONFIGURATION_Handle *cfg;
160
161   /**
162    * Hash map of the current connected neighbours of this peer.
163    * Maps peer identities to `struct Neighbour` entries.
164    */
165   struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
166
167   /**
168    * Peer identity as assumed by this process, or all zeros.
169    */
170   struct GNUNET_PeerIdentity self;
171
172   /**
173    * ID of the task trying to reconnect to the service.
174    */
175   struct GNUNET_SCHEDULER_Task *reconnect_task;
176
177   /**
178    * Delay until we try to reconnect.
179    */
180   struct GNUNET_TIME_Relative reconnect_delay;
181
182   /**
183    * Should we check that @e self matches what the service thinks?
184    * (if #GNUNET_NO, then @e self is all zeros!).
185    */
186   int check_self;
187
188 };
189
190
191 /**
192  * Function that will schedule the job that will try
193  * to connect us again to the client.
194  *
195  * @param h transport service to reconnect
196  */
197 static void
198 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h);
199
200
201 /**
202  * Get the neighbour list entry for the given peer
203  *
204  * @param h our context
205  * @param peer peer to look up
206  * @return NULL if no such peer entry exists
207  */
208 static struct Neighbour *
209 neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h,
210                 const struct GNUNET_PeerIdentity *peer)
211 {
212   return GNUNET_CONTAINER_multipeermap_get (h->neighbours,
213                                             peer);
214 }
215
216
217 /**
218  * Function called by the bandwidth tracker if we have excess
219  * bandwidth.
220  *
221  * @param cls the `struct Neighbour` that has excess bandwidth
222  */
223 static void
224 notify_excess_cb (void *cls)
225 {
226   struct Neighbour *n = cls;
227   struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
228
229   LOG (GNUNET_ERROR_TYPE_DEBUG,
230        "Notifying CORE that more bandwidth is available for %s\n",
231        GNUNET_i2s (&n->id));
232
233   if (NULL != h->neb_cb)
234     h->neb_cb (h->cls,
235                &n->id,
236                n->handlers_cls);
237 }
238
239
240 /**
241  * Iterator over hash map entries, for deleting state of a neighbour.
242  *
243  * @param cls the `struct GNUNET_TRANSPORT_CoreHandle *`
244  * @param key peer identity
245  * @param value value in the hash map, the neighbour entry to delete
246  * @return #GNUNET_YES if we should continue to
247  *         iterate,
248  *         #GNUNET_NO if not.
249  */
250 static int
251 neighbour_delete (void *cls,
252                   const struct GNUNET_PeerIdentity *key,
253                   void *value)
254 {
255   struct GNUNET_TRANSPORT_CoreHandle *handle = cls;
256   struct Neighbour *n = value;
257
258   LOG (GNUNET_ERROR_TYPE_DEBUG,
259        "Dropping entry for neighbour `%s'.\n",
260        GNUNET_i2s (key));
261   GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
262   if (NULL != handle->nd_cb)
263     handle->nd_cb (handle->cls,
264                    &n->id,
265                    n->handlers_cls);
266   if (NULL != n->timeout_task)
267   {
268     GNUNET_SCHEDULER_cancel (n->timeout_task);
269     n->timeout_task = NULL;
270   }
271   if (NULL != n->env)
272   {
273     GNUNET_MQ_send_cancel (n->env);
274     n->env = NULL;
275   }
276   GNUNET_MQ_destroy (n->mq);
277   GNUNET_assert (NULL == n->mq);
278   GNUNET_assert (GNUNET_YES ==
279                  GNUNET_CONTAINER_multipeermap_remove (handle->neighbours,
280                                                        key,
281                                                        n));
282   GNUNET_free (n);
283   return GNUNET_YES;
284 }
285
286
287 /**
288  * Generic error handler, called with the appropriate
289  * error code and the same closure specified at the creation of
290  * the message queue.
291  * Not every message queue implementation supports an error handler.
292  *
293  * @param cls closure with the `struct GNUNET_TRANSPORT_CoreHandle *`
294  * @param error error code
295  */
296 static void
297 mq_error_handler (void *cls,
298                   enum GNUNET_MQ_Error error)
299 {
300   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
301
302   LOG (GNUNET_ERROR_TYPE_DEBUG,
303        "Error receiving from transport service, disconnecting temporarily.\n");
304   disconnect_and_schedule_reconnect (h);
305 }
306
307
308 /**
309  * Function we use for checking incoming HELLO messages.
310  *
311  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
312  * @param msg message received
313  * @return #GNUNET_OK if message is well-formed
314  */
315 static int
316 check_hello (void *cls,
317              const struct GNUNET_MessageHeader *msg)
318 {
319   struct GNUNET_PeerIdentity me;
320
321   if (GNUNET_OK !=
322       GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
323                            &me))
324   {
325     GNUNET_break (0);
326     return GNUNET_SYSERR;
327   }
328   return GNUNET_OK;
329 }
330
331
332 /**
333  * Function we use for handling incoming HELLO messages.
334  *
335  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
336  * @param msg message received
337  */
338 static void
339 handle_hello (void *cls,
340               const struct GNUNET_MessageHeader *msg)
341 {
342   /* we do not care => FIXME: signal in options to NEVER send HELLOs! */
343 }
344
345
346 /**
347  * A message from the handler's message queue to a neighbour was
348  * transmitted.  Now trigger (possibly delayed) notification of the
349  * neighbour's message queue that we are done and thus ready for
350  * the next message.
351  *
352  * @param cls the `struct Neighbour` where the message was sent
353  */
354 static void
355 notify_send_done_fin (void *cls)
356 {
357   struct Neighbour *n = cls;
358
359   n->timeout_task = NULL;
360   n->is_ready = GNUNET_YES;
361   GNUNET_MQ_impl_send_continue (n->mq);
362 }
363
364
365 /**
366  * A message from the handler's message queue to a neighbour was
367  * transmitted.  Now trigger (possibly delayed) notification of the
368  * neighbour's message queue that we are done and thus ready for
369  * the next message.
370  *
371  * @param cls the `struct Neighbour` where the message was sent
372  */
373 static void
374 notify_send_done (void *cls)
375 {
376   struct Neighbour *n = cls;
377   struct GNUNET_TIME_Relative delay;
378
379   n->timeout_task = NULL;
380   if (NULL != n->env)
381   {
382     GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
383                                       n->env_size + n->traffic_overhead);
384     n->env = NULL;
385     n->traffic_overhead = 0;
386   }
387   delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
388                                               128);
389   if (0 == delay.rel_value_us)
390   {
391     n->is_ready = GNUNET_YES;
392     GNUNET_MQ_impl_send_continue (n->mq);
393     return;
394   }
395   GNUNET_MQ_impl_send_in_flight (n->mq);
396   /* cannot send even a small message without violating
397      quota, wait a before allowing MQ to send next message */
398   n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
399                                                   &notify_send_done_fin,
400                                                   n);
401 }
402
403
404 /**
405  * Implement sending functionality of a message queue.
406  * Called one message at a time. Should send the @a msg
407  * to the transport service and then notify the queue
408  * once we are ready for the next one.
409  *
410  * @param mq the message queue
411  * @param msg the message to send
412  * @param impl_state state of the implementation
413  */
414 static void
415 mq_send_impl (struct GNUNET_MQ_Handle *mq,
416               const struct GNUNET_MessageHeader *msg,
417               void *impl_state)
418 {
419   struct Neighbour *n = impl_state;
420   struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
421   struct OutboundMessage *obm;
422   uint16_t msize;
423
424   GNUNET_assert (GNUNET_YES == n->is_ready);
425   msize = ntohs (msg->size);
426   if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof (*obm))
427   {
428     GNUNET_break (0);
429     GNUNET_MQ_impl_send_continue (mq);
430     return;
431   }
432   GNUNET_assert (NULL == n->env);
433   n->env = GNUNET_MQ_msg_nested_mh (obm,
434                                     GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
435                                     msg);
436   obm->reserved = htonl (0);
437   obm->timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */
438   obm->peer = n->id;
439   GNUNET_assert (NULL == n->timeout_task);
440   n->is_ready = GNUNET_NO;
441   n->env_size = ntohs (msg->size);
442   GNUNET_MQ_notify_sent (n->env,
443                          &notify_send_done,
444                          n);
445   GNUNET_MQ_send (h->mq,
446                   n->env);
447   LOG (GNUNET_ERROR_TYPE_DEBUG,
448        "Queued message of type %u for neighbour `%s'.\n",
449        ntohs (msg->type),
450        GNUNET_i2s (&n->id));
451 }
452
453
454 /**
455  * Handle destruction of a message queue.  Implementations must not
456  * free @a mq, but should take care of @a impl_state.
457  *
458  * @param mq the message queue to destroy
459  * @param impl_state state of the implementation
460  */
461 static void
462 mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
463                  void *impl_state)
464 {
465   struct Neighbour *n = impl_state;
466
467   GNUNET_assert (mq == n->mq);
468   n->mq = NULL;
469 }
470
471
472 /**
473  * Implementation function that cancels the currently sent message.
474  * Should basically undo whatever #mq_send_impl() did.
475  *
476  * @param mq message queue
477  * @param impl_state state specific to the implementation
478  */
479 static void
480 mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
481                 void *impl_state)
482 {
483   struct Neighbour *n = impl_state;
484
485   GNUNET_assert (GNUNET_NO == n->is_ready);
486   if (NULL != n->env)
487   {
488     GNUNET_MQ_send_cancel (n->env);
489     n->env = NULL;
490   }
491
492   n->is_ready = GNUNET_YES;
493 }
494
495
496 /**
497  * We had an error processing a message we forwarded from a peer to
498  * the CORE service.  We should just complain about it but otherwise
499  * continue processing.
500  *
501  * @param cls closure
502  * @param error error code
503  */
504 static void
505 peer_mq_error_handler (void *cls,
506                        enum GNUNET_MQ_Error error)
507 {
508   /* struct Neighbour *n = cls; */
509
510   GNUNET_break_op (0);
511 }
512
513
514 /**
515  * The outbound quota has changed in a way that may require
516  * us to reset the timeout.  Update the timeout.
517  *
518  * @param cls the `struct Neighbour` for which the timeout changed
519  */
520 static void
521 outbound_bw_tracker_update (void *cls)
522 {
523   struct Neighbour *n = cls;
524   struct GNUNET_TIME_Relative delay;
525
526   if (NULL == n->timeout_task)
527     return;
528   delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
529                                               128);
530   GNUNET_SCHEDULER_cancel (n->timeout_task);
531   n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
532                                                   &notify_send_done,
533                                                   n);
534 }
535
536
537 /**
538  * Function we use for handling incoming connect messages.
539  *
540  * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
541  * @param cim message received
542  */
543 static void
544 handle_connect (void *cls,
545                 const struct ConnectInfoMessage *cim)
546 {
547   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
548   struct Neighbour *n;
549
550   LOG (GNUNET_ERROR_TYPE_DEBUG,
551        "Receiving CONNECT message for `%s' with quota %u\n",
552        GNUNET_i2s (&cim->id),
553        ntohl (cim->quota_out.value__));
554   n = neighbour_find (h, &cim->id);
555   if (NULL != n)
556   {
557     GNUNET_break (0);
558     disconnect_and_schedule_reconnect (h);
559     return;
560   }
561   n = GNUNET_new (struct Neighbour);
562   n->id = cim->id;
563   n->h = h;
564   n->is_ready = GNUNET_YES;
565   n->traffic_overhead = 0;
566   GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker,
567                                   &outbound_bw_tracker_update,
568                                   n,
569                                   GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
570                                   MAX_BANDWIDTH_CARRY_S,
571                                   &notify_excess_cb,
572                                   n);
573   GNUNET_assert (GNUNET_OK ==
574                  GNUNET_CONTAINER_multipeermap_put (h->neighbours,
575                                                     &n->id,
576                                                     n,
577                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
578
579   GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
580                                          cim->quota_out);
581   n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl,
582                                          &mq_destroy_impl,
583                                          &mq_cancel_impl,
584                                          n,
585                                          h->handlers,
586                                          &peer_mq_error_handler,
587                                          n);
588   if (NULL != h->nc_cb)
589   {
590     n->handlers_cls = h->nc_cb (h->cls,
591                                 &n->id,
592                                 n->mq);
593     GNUNET_MQ_set_handlers_closure (n->mq,
594                                     n->handlers_cls);
595   }
596 }
597
598
599 /**
600  * Function we use for handling incoming disconnect messages.
601  *
602  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
603  * @param dim message received
604  */
605 static void
606 handle_disconnect (void *cls,
607                    const struct DisconnectInfoMessage *dim)
608 {
609   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
610   struct Neighbour *n;
611
612   GNUNET_break (ntohl (dim->reserved) == 0);
613   LOG (GNUNET_ERROR_TYPE_DEBUG,
614        "Receiving DISCONNECT message for `%s'.\n",
615        GNUNET_i2s (&dim->peer));
616   n = neighbour_find (h, &dim->peer);
617   if (NULL == n)
618   {
619     GNUNET_break (0);
620     disconnect_and_schedule_reconnect (h);
621     return;
622   }
623   GNUNET_assert (GNUNET_YES ==
624                  neighbour_delete (h,
625                                    &dim->peer,
626                                    n));
627 }
628
629
630 /**
631  * Function we use for handling incoming send-ok messages.
632  *
633  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
634  * @param okm message received
635  */
636 static void
637 handle_send_ok (void *cls,
638                 const struct SendOkMessage *okm)
639 {
640   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
641   struct Neighbour *n;
642   uint32_t bytes_msg;
643   uint32_t bytes_physical;
644
645   bytes_msg = ntohl (okm->bytes_msg);
646   bytes_physical = ntohl (okm->bytes_physical);
647   LOG (GNUNET_ERROR_TYPE_DEBUG,
648        "Receiving SEND_OK message, transmission to %s %s.\n",
649        GNUNET_i2s (&okm->peer),
650        ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
651   n = neighbour_find (h,
652                       &okm->peer);
653   if (NULL == n)
654   {
655     /* We should never get a 'SEND_OK' for a peer that we are not
656        connected to */
657     GNUNET_break (0);
658     disconnect_and_schedule_reconnect (h);
659     return;
660   }
661   if (bytes_physical > bytes_msg)
662   {
663     LOG (GNUNET_ERROR_TYPE_DEBUG,
664          "Overhead for %u byte message was %u\n",
665          bytes_msg,
666          bytes_physical - bytes_msg);
667     n->traffic_overhead += bytes_physical - bytes_msg;
668   }
669 }
670
671
672 /**
673  * Function we use for checking incoming "inbound" messages.
674  *
675  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
676  * @param im message received
677  */
678 static int
679 check_recv (void *cls,
680              const struct InboundMessage *im)
681 {
682   const struct GNUNET_MessageHeader *imm;
683   uint16_t size;
684
685   size = ntohs (im->header.size) - sizeof (*im);
686   if (size < sizeof (struct GNUNET_MessageHeader))
687   {
688     GNUNET_break (0);
689     return GNUNET_SYSERR;
690   }
691   imm = (const struct GNUNET_MessageHeader *) &im[1];
692   if (ntohs (imm->size) != size)
693   {
694     GNUNET_break (0);
695     return GNUNET_SYSERR;
696   }
697   return GNUNET_OK;
698 }
699
700
701 /**
702  * Function we use for handling incoming messages.
703  *
704  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
705  * @param im message received
706  */
707 static void
708 handle_recv (void *cls,
709              const struct InboundMessage *im)
710 {
711   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
712   const struct GNUNET_MessageHeader *imm
713     = (const struct GNUNET_MessageHeader *) &im[1];
714   struct Neighbour *n;
715
716   LOG (GNUNET_ERROR_TYPE_DEBUG,
717        "Received message of type %u with %u bytes from `%s'.\n",
718        (unsigned int) ntohs (imm->type),
719        (unsigned int) ntohs (imm->size),
720        GNUNET_i2s (&im->peer));
721   n = neighbour_find (h, &im->peer);
722   if (NULL == n)
723   {
724     GNUNET_break (0);
725     disconnect_and_schedule_reconnect (h);
726     return;
727   }
728   GNUNET_MQ_inject_message (n->mq,
729                             imm);
730 }
731
732
733 /**
734  * Function we use for handling incoming set quota messages.
735  *
736  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
737  * @param msg message received
738  */
739 static void
740 handle_set_quota (void *cls,
741                   const struct QuotaSetMessage *qm)
742 {
743   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
744   struct Neighbour *n;
745
746   n = neighbour_find (h,
747                       &qm->peer);
748   if (NULL == n)
749   {
750     GNUNET_break (0);
751     disconnect_and_schedule_reconnect (h);
752     return;
753   }
754   LOG (GNUNET_ERROR_TYPE_DEBUG,
755        "Receiving SET_QUOTA message for `%s' with quota %u\n",
756        GNUNET_i2s (&qm->peer),
757        ntohl (qm->quota.value__));
758   GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
759                                          qm->quota);
760 }
761
762
763 /**
764  * Try again to connect to transport service.
765  *
766  * @param cls the handle to the transport service
767  */
768 static void
769 reconnect (void *cls)
770 {
771   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
772   struct GNUNET_MQ_MessageHandler handlers[] = {
773     GNUNET_MQ_hd_var_size (hello,
774                            GNUNET_MESSAGE_TYPE_HELLO,
775                            struct GNUNET_MessageHeader,
776                            h),
777     GNUNET_MQ_hd_fixed_size (connect,
778                              GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
779                              struct ConnectInfoMessage,
780                              h),
781     GNUNET_MQ_hd_fixed_size (disconnect,
782                              GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
783                              struct DisconnectInfoMessage,
784                              h),
785     GNUNET_MQ_hd_fixed_size (send_ok,
786                              GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
787                              struct SendOkMessage,
788                              h),
789     GNUNET_MQ_hd_var_size (recv,
790                            GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
791                            struct InboundMessage,
792                            h),
793     GNUNET_MQ_hd_fixed_size (set_quota,
794                              GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
795                              struct QuotaSetMessage,
796                              h),
797     GNUNET_MQ_handler_end ()
798   };
799   struct GNUNET_MQ_Envelope *env;
800   struct StartMessage *s;
801   uint32_t options;
802
803   h->reconnect_task = NULL;
804   LOG (GNUNET_ERROR_TYPE_DEBUG,
805        "Connecting to transport service.\n");
806   GNUNET_assert (NULL == h->mq);
807   h->mq = GNUNET_CLIENT_connect (h->cfg,
808                                  "transport",
809                                  handlers,
810                                  &mq_error_handler,
811                                  h);
812   if (NULL == h->mq)
813     return;
814   env = GNUNET_MQ_msg (s,
815                        GNUNET_MESSAGE_TYPE_TRANSPORT_START);
816   options = 0;
817   if (h->check_self)
818     options |= 1;
819   if (NULL != h->handlers)
820     options |= 2;
821   s->options = htonl (options);
822   s->self = h->self;
823   GNUNET_MQ_send (h->mq,
824                   env);
825 }
826
827
828 /**
829  * Function that will schedule the job that will try
830  * to connect us again to the client.
831  *
832  * @param h transport service to reconnect
833  */
834 static void
835 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
836 {
837   GNUNET_assert (NULL == h->reconnect_task);
838   /* Forget about all neighbours that we used to be connected to */
839   GNUNET_CONTAINER_multipeermap_iterate (h->neighbours,
840                                          &neighbour_delete,
841                                          h);
842   if (NULL != h->mq)
843   {
844     GNUNET_MQ_destroy (h->mq);
845     h->mq = NULL;
846   }
847   LOG (GNUNET_ERROR_TYPE_DEBUG,
848        "Scheduling task to reconnect to transport service in %s.\n",
849        GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
850                                                GNUNET_YES));
851   h->reconnect_task =
852       GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
853                                     &reconnect,
854                                     h);
855   h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
856 }
857
858
859 /**
860  * Checks if a given peer is connected to us and get the message queue.
861  *
862  * @param handle connection to transport service
863  * @param peer the peer to check
864  * @return NULL if disconnected, otherwise message queue for @a peer
865  */
866 struct GNUNET_MQ_Handle *
867 GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
868                               const struct GNUNET_PeerIdentity *peer)
869 {
870   struct Neighbour *n;
871
872   n = neighbour_find (handle,
873                       peer);
874   if (NULL == n)
875     return NULL;
876   return n->mq;
877 }
878
879
880 /**
881  * Connect to the transport service.  Note that the connection may
882  * complete (or fail) asynchronously.
883  *
884  * @param cfg configuration to use
885  * @param self our own identity (API should check that it matches
886  *             the identity found by transport), or NULL (no check)
887  * @param cls closure for the callbacks
888  * @param rec receive function to call
889  * @param nc function to call on connect events
890  * @param nd function to call on disconnect events
891  * @param neb function to call if we have excess bandwidth to a peer
892  * @return NULL on error
893  */
894 struct GNUNET_TRANSPORT_CoreHandle *
895 GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
896                                const struct GNUNET_PeerIdentity *self,
897                                const struct GNUNET_MQ_MessageHandler *handlers,
898                                void *cls,
899                                GNUNET_TRANSPORT_NotifyConnecT nc,
900                                GNUNET_TRANSPORT_NotifyDisconnecT nd,
901                                GNUNET_TRANSPORT_NotifyExcessBandwidtH neb)
902 {
903   struct GNUNET_TRANSPORT_CoreHandle *h;
904   unsigned int i;
905
906   h = GNUNET_new (struct GNUNET_TRANSPORT_CoreHandle);
907   if (NULL != self)
908   {
909     h->self = *self;
910     h->check_self = GNUNET_YES;
911   }
912   h->cfg = cfg;
913   h->cls = cls;
914   h->nc_cb = nc;
915   h->nd_cb = nd;
916   h->neb_cb = neb;
917   h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
918   if (NULL != handlers)
919   {
920     for (i=0;NULL != handlers[i].cb; i++) ;
921     h->handlers = GNUNET_new_array (i + 1,
922                                     struct GNUNET_MQ_MessageHandler);
923     GNUNET_memcpy (h->handlers,
924                    handlers,
925                    i * sizeof (struct GNUNET_MQ_MessageHandler));
926   }
927   LOG (GNUNET_ERROR_TYPE_DEBUG,
928        "Connecting to transport service\n");
929   reconnect (h);
930   if (NULL == h->mq)
931   {
932     GNUNET_free_non_null (h->handlers);
933     GNUNET_free (h);
934     return NULL;
935   }
936   h->neighbours =
937     GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE,
938                                           GNUNET_YES);
939   return h;
940 }
941
942
943 /**
944  * Disconnect from the transport service.
945  *
946  * @param handle handle to the service as returned from #GNUNET_TRANSPORT_core_connect()
947  */
948 void
949 GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle)
950 {
951   LOG (GNUNET_ERROR_TYPE_DEBUG,
952        "Transport disconnect called!\n");
953   /* this disconnects all neighbours... */
954   if (NULL == handle->reconnect_task)
955     disconnect_and_schedule_reconnect (handle);
956   /* and now we stop trying to connect again... */
957   if (NULL != handle->reconnect_task)
958   {
959     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
960     handle->reconnect_task = NULL;
961   }
962   GNUNET_CONTAINER_multipeermap_destroy (handle->neighbours);
963   handle->neighbours = NULL;
964   GNUNET_free_non_null (handle->handlers);
965   handle->handlers = NULL;
966   GNUNET_free (handle);
967 }
968
969
970 /* end of transport_api_core.c */