d81a6f730941428fee75675bdfd8650e5f0a65e3
[oweals/gnunet.git] / src / transport / transport_api2_core.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2013, 2016, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file transport/transport_api_core.c
21  * @brief library to access the transport service for message exchange
22  * @author Christian Grothoff
23  */
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26 #include "gnunet_constants.h"
27 #include "gnunet_arm_service.h"
28 #include "gnunet_hello_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_transport_core_service.h"
31 #include "transport.h"
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "transport-api-core",__VA_ARGS__)
34
35 /**
36  * How large to start with for the hashmap of neighbours.
37  */
38 #define STARTING_NEIGHBOURS_SIZE 16
39
40
41 /**
42  * Entry in hash table of all of our current (connected) neighbours.
43  */
44 struct Neighbour
45 {
46
47   /**
48    * Identity of this neighbour.
49    */
50   struct GNUNET_PeerIdentity id;
51
52   /**
53    * Overall transport handle.
54    */
55   struct GNUNET_TRANSPORT_CoreHandle *h;
56
57   /**
58    * Active message queue for the peer.
59    */
60   struct GNUNET_MQ_Handle *mq;
61
62   /**
63    * Envelope with the message we are currently transmitting (or NULL).
64    */
65   struct GNUNET_MQ_Envelope *env;
66
67   /**
68    * Closure for @e mq handlers.
69    */
70   void *handlers_cls;
71
72   /**
73    * Entry in our readyness heap (which is sorted by @e next_ready
74    * value).  NULL if there is no pending transmission request for
75    * this neighbour or if we're waiting for @e is_ready to become
76    * true AFTER the @e out_tracker suggested that this peer's quota
77    * has been satisfied (so once @e is_ready goes to #GNUNET_YES,
78    * we should immediately go back into the heap).
79    */
80   struct GNUNET_CONTAINER_HeapNode *hn;
81
82   /**
83    * Task to trigger MQ when we have enough bandwidth for the
84    * next transmission.
85    */
86   struct GNUNET_SCHEDULER_Task *timeout_task;
87
88   /**
89    * Outbound bandwidh tracker.
90    */
91   struct GNUNET_BANDWIDTH_Tracker out_tracker;
92
93   /**
94    * Sending consumed more bytes on wire than payload was announced
95    * This overhead is added to the delay of next sending operation
96    */
97   unsigned long long traffic_overhead;
98
99   /**
100    * Is this peer currently ready to receive a message?
101    */
102   int is_ready;
103
104   /**
105    * Size of the message in @e env.
106    */
107   uint16_t env_size;
108
109 };
110
111
112
113 /**
114  * Handle for the transport service (includes all of the
115  * state for the transport service).
116  */
117 struct GNUNET_TRANSPORT_CoreHandle
118 {
119
120   /**
121    * Closure for the callbacks.
122    */
123   void *cls;
124
125   /**
126    * Functions to call for received data (template for
127    * new message queues).
128    */
129   struct GNUNET_MQ_MessageHandler *handlers;
130
131   /**
132    * function to call on connect events
133    */
134   GNUNET_TRANSPORT_NotifyConnect nc_cb;
135
136   /**
137    * function to call on disconnect events
138    */
139   GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
140
141   /**
142    * function to call on excess bandwidth events
143    */
144   GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
145
146   /**
147    * My client connection to the transport service.
148    */
149   struct GNUNET_MQ_Handle *mq;
150
151   /**
152    * My configuration.
153    */
154   const struct GNUNET_CONFIGURATION_Handle *cfg;
155
156   /**
157    * Hash map of the current connected neighbours of this peer.
158    * Maps peer identities to `struct Neighbour` entries.
159    */
160   struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
161
162   /**
163    * Peer identity as assumed by this process, or all zeros.
164    */
165   struct GNUNET_PeerIdentity self;
166
167   /**
168    * ID of the task trying to reconnect to the service.
169    */
170   struct GNUNET_SCHEDULER_Task *reconnect_task;
171
172   /**
173    * Delay until we try to reconnect.
174    */
175   struct GNUNET_TIME_Relative reconnect_delay;
176
177   /**
178    * Should we check that @e self matches what the service thinks?
179    * (if #GNUNET_NO, then @e self is all zeros!).
180    */
181   int check_self;
182
183 };
184
185
186 /**
187  * Function that will schedule the job that will try
188  * to connect us again to the client.
189  *
190  * @param h transport service to reconnect
191  */
192 static void
193 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h);
194
195
196 /**
197  * Get the neighbour list entry for the given peer
198  *
199  * @param h our context
200  * @param peer peer to look up
201  * @return NULL if no such peer entry exists
202  */
203 static struct Neighbour *
204 neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h,
205                 const struct GNUNET_PeerIdentity *peer)
206 {
207   return GNUNET_CONTAINER_multipeermap_get (h->neighbours,
208                                             peer);
209 }
210
211
212 /**
213  * Function called by the bandwidth tracker if we have excess
214  * bandwidth.
215  *
216  * @param cls the `struct Neighbour` that has excess bandwidth
217  */
218 static void
219 notify_excess_cb (void *cls)
220 {
221   struct Neighbour *n = cls;
222   struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
223
224   LOG (GNUNET_ERROR_TYPE_DEBUG,
225        "Notifying CORE that more bandwidth is available for %s\n",
226        GNUNET_i2s (&n->id));
227
228   if (NULL != h->neb_cb)
229     h->neb_cb (h->cls,
230                &n->id,
231                n->handlers_cls);
232 }
233
234
235 /**
236  * Iterator over hash map entries, for deleting state of a neighbour.
237  *
238  * @param cls the `struct GNUNET_TRANSPORT_CoreHandle *`
239  * @param key peer identity
240  * @param value value in the hash map, the neighbour entry to delete
241  * @return #GNUNET_YES if we should continue to
242  *         iterate,
243  *         #GNUNET_NO if not.
244  */
245 static int
246 neighbour_delete (void *cls,
247                   const struct GNUNET_PeerIdentity *key,
248                   void *value)
249 {
250   struct GNUNET_TRANSPORT_CoreHandle *handle = cls;
251   struct Neighbour *n = value;
252
253   LOG (GNUNET_ERROR_TYPE_DEBUG,
254        "Dropping entry for neighbour `%s'.\n",
255        GNUNET_i2s (key));
256   GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
257   if (NULL != handle->nd_cb)
258     handle->nd_cb (handle->cls,
259                    &n->id,
260                    n->handlers_cls);
261   if (NULL != n->timeout_task)
262   {
263     GNUNET_SCHEDULER_cancel (n->timeout_task);
264     n->timeout_task = NULL;
265   }
266   if (NULL != n->env)
267   {
268     GNUNET_MQ_send_cancel (n->env);
269     n->env = NULL;
270   }
271   GNUNET_MQ_destroy (n->mq);
272   GNUNET_assert (NULL == n->mq);
273   GNUNET_assert (GNUNET_YES ==
274                  GNUNET_CONTAINER_multipeermap_remove (handle->neighbours,
275                                                        key,
276                                                        n));
277   GNUNET_free (n);
278   return GNUNET_YES;
279 }
280
281
282 /**
283  * Generic error handler, called with the appropriate
284  * error code and the same closure specified at the creation of
285  * the message queue.
286  * Not every message queue implementation supports an error handler.
287  *
288  * @param cls closure with the `struct GNUNET_TRANSPORT_CoreHandle *`
289  * @param error error code
290  */
291 static void
292 mq_error_handler (void *cls,
293                   enum GNUNET_MQ_Error error)
294 {
295   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
296
297   LOG (GNUNET_ERROR_TYPE_DEBUG,
298        "Error receiving from transport service, disconnecting temporarily.\n");
299   disconnect_and_schedule_reconnect (h);
300 }
301
302
303 /**
304  * A message from the handler's message queue to a neighbour was
305  * transmitted.  Now trigger (possibly delayed) notification of the
306  * neighbour's message queue that we are done and thus ready for
307  * the next message.
308  *
309  * @param cls the `struct Neighbour` where the message was sent
310  */
311 static void
312 notify_send_done_fin (void *cls)
313 {
314   struct Neighbour *n = cls;
315
316   n->timeout_task = NULL;
317   n->is_ready = GNUNET_YES;
318   GNUNET_MQ_impl_send_continue (n->mq);
319 }
320
321
322 /**
323  * A message from the handler's message queue to a neighbour was
324  * transmitted.  Now trigger (possibly delayed) notification of the
325  * neighbour's message queue that we are done and thus ready for
326  * the next message.
327  *
328  * @param cls the `struct Neighbour` where the message was sent
329  */
330 static void
331 notify_send_done (void *cls)
332 {
333   struct Neighbour *n = cls;
334   struct GNUNET_TIME_Relative delay;
335
336   n->timeout_task = NULL;
337   if (NULL != n->env)
338   {
339     GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
340                                       n->env_size + n->traffic_overhead);
341     n->env = NULL;
342     n->traffic_overhead = 0;
343   }
344   delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
345                                               128);
346   if (0 == delay.rel_value_us)
347   {
348     n->is_ready = GNUNET_YES;
349     GNUNET_MQ_impl_send_continue (n->mq);
350     return;
351   }
352   GNUNET_MQ_impl_send_in_flight (n->mq);
353   /* cannot send even a small message without violating
354      quota, wait a before allowing MQ to send next message */
355   n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
356                                                   &notify_send_done_fin,
357                                                   n);
358 }
359
360
361 /**
362  * Implement sending functionality of a message queue.
363  * Called one message at a time. Should send the @a msg
364  * to the transport service and then notify the queue
365  * once we are ready for the next one.
366  *
367  * @param mq the message queue
368  * @param msg the message to send
369  * @param impl_state state of the implementation
370  */
371 static void
372 mq_send_impl (struct GNUNET_MQ_Handle *mq,
373               const struct GNUNET_MessageHeader *msg,
374               void *impl_state)
375 {
376   struct Neighbour *n = impl_state;
377   struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
378   struct OutboundMessage *obm;
379   uint16_t msize;
380
381   GNUNET_assert (GNUNET_YES == n->is_ready);
382   msize = ntohs (msg->size);
383   if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof (*obm))
384   {
385     GNUNET_break (0);
386     GNUNET_MQ_impl_send_continue (mq);
387     return;
388   }
389   GNUNET_assert (NULL == n->env);
390   n->env = GNUNET_MQ_msg_nested_mh (obm,
391                                     GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
392                                     msg);
393   obm->reserved = htonl (0);
394   obm->timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */
395   obm->peer = n->id;
396   GNUNET_assert (NULL == n->timeout_task);
397   n->is_ready = GNUNET_NO;
398   n->env_size = ntohs (msg->size);
399   GNUNET_MQ_notify_sent (n->env,
400                          &notify_send_done,
401                          n);
402   GNUNET_MQ_send (h->mq,
403                   n->env);
404   LOG (GNUNET_ERROR_TYPE_DEBUG,
405        "Queued message of type %u for neighbour `%s'.\n",
406        ntohs (msg->type),
407        GNUNET_i2s (&n->id));
408 }
409
410
411 /**
412  * Handle destruction of a message queue.  Implementations must not
413  * free @a mq, but should take care of @a impl_state.
414  *
415  * @param mq the message queue to destroy
416  * @param impl_state state of the implementation
417  */
418 static void
419 mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
420                  void *impl_state)
421 {
422   struct Neighbour *n = impl_state;
423
424   GNUNET_assert (mq == n->mq);
425   n->mq = NULL;
426 }
427
428
429 /**
430  * Implementation function that cancels the currently sent message.
431  * Should basically undo whatever #mq_send_impl() did.
432  *
433  * @param mq message queue
434  * @param impl_state state specific to the implementation
435  */
436 static void
437 mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
438                 void *impl_state)
439 {
440   struct Neighbour *n = impl_state;
441
442   GNUNET_assert (GNUNET_NO == n->is_ready);
443   if (NULL != n->env)
444   {
445     GNUNET_MQ_send_cancel (n->env);
446     n->env = NULL;
447   }
448
449   n->is_ready = GNUNET_YES;
450 }
451
452
453 /**
454  * We had an error processing a message we forwarded from a peer to
455  * the CORE service.  We should just complain about it but otherwise
456  * continue processing.
457  *
458  * @param cls closure
459  * @param error error code
460  */
461 static void
462 peer_mq_error_handler (void *cls,
463                        enum GNUNET_MQ_Error error)
464 {
465   /* struct Neighbour *n = cls; */
466
467   GNUNET_break_op (0);
468 }
469
470
471 /**
472  * The outbound quota has changed in a way that may require
473  * us to reset the timeout.  Update the timeout.
474  *
475  * @param cls the `struct Neighbour` for which the timeout changed
476  */
477 static void
478 outbound_bw_tracker_update (void *cls)
479 {
480   struct Neighbour *n = cls;
481   struct GNUNET_TIME_Relative delay;
482
483   if (NULL == n->timeout_task)
484     return;
485   delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
486                                               128);
487   GNUNET_SCHEDULER_cancel (n->timeout_task);
488   n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
489                                                   &notify_send_done,
490                                                   n);
491 }
492
493
494 /**
495  * Function we use for handling incoming connect messages.
496  *
497  * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
498  * @param cim message received
499  */
500 static void
501 handle_connect (void *cls,
502                 const struct ConnectInfoMessage *cim)
503 {
504   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
505   struct Neighbour *n;
506
507   LOG (GNUNET_ERROR_TYPE_DEBUG,
508        "Receiving CONNECT message for `%s' with quota %u\n",
509        GNUNET_i2s (&cim->id),
510        ntohl (cim->quota_out.value__));
511   n = neighbour_find (h,
512                       &cim->id);
513   if (NULL != n)
514   {
515     GNUNET_break (0);
516     disconnect_and_schedule_reconnect (h);
517     return;
518   }
519   n = GNUNET_new (struct Neighbour);
520   n->id = cim->id;
521   n->h = h;
522   n->is_ready = GNUNET_YES;
523   n->traffic_overhead = 0;
524   GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker,
525                                   &outbound_bw_tracker_update,
526                                   n,
527                                   GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
528                                   MAX_BANDWIDTH_CARRY_S,
529                                   &notify_excess_cb,
530                                   n);
531   GNUNET_assert (GNUNET_OK ==
532                  GNUNET_CONTAINER_multipeermap_put (h->neighbours,
533                                                     &n->id,
534                                                     n,
535                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
536
537   GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
538                                          cim->quota_out);
539   n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl,
540                                          &mq_destroy_impl,
541                                          &mq_cancel_impl,
542                                          n,
543                                          h->handlers,
544                                          &peer_mq_error_handler,
545                                          n);
546   if (NULL != h->nc_cb)
547   {
548     n->handlers_cls = h->nc_cb (h->cls,
549                                 &n->id,
550                                 n->mq);
551     GNUNET_MQ_set_handlers_closure (n->mq,
552                                     n->handlers_cls);
553   }
554 }
555
556
557 /**
558  * Function we use for handling incoming disconnect messages.
559  *
560  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
561  * @param dim message received
562  */
563 static void
564 handle_disconnect (void *cls,
565                    const struct DisconnectInfoMessage *dim)
566 {
567   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
568   struct Neighbour *n;
569
570   GNUNET_break (ntohl (dim->reserved) == 0);
571   LOG (GNUNET_ERROR_TYPE_DEBUG,
572        "Receiving DISCONNECT message for `%s'.\n",
573        GNUNET_i2s (&dim->peer));
574   n = neighbour_find (h,
575                       &dim->peer);
576   if (NULL == n)
577   {
578     GNUNET_break (0);
579     disconnect_and_schedule_reconnect (h);
580     return;
581   }
582   GNUNET_assert (GNUNET_YES ==
583                  neighbour_delete (h,
584                                    &dim->peer,
585                                    n));
586 }
587
588
589 /**
590  * Function we use for handling incoming send-ok messages.
591  *
592  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
593  * @param okm message received
594  */
595 static void
596 handle_send_ok (void *cls,
597                 const struct SendOkMessage *okm)
598 {
599   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
600   struct Neighbour *n;
601   uint32_t bytes_msg;
602   uint32_t bytes_physical;
603
604   bytes_msg = ntohl (okm->bytes_msg);
605   bytes_physical = ntohl (okm->bytes_physical);
606   LOG (GNUNET_ERROR_TYPE_DEBUG,
607        "Receiving SEND_OK message, transmission to %s %s.\n",
608        GNUNET_i2s (&okm->peer),
609        (GNUNET_OK == ntohl (okm->success))
610        ? "succeeded"
611        : "failed");
612   n = neighbour_find (h,
613                       &okm->peer);
614   if (NULL == n)
615   {
616     /* We should never get a 'SEND_OK' for a peer that we are not
617        connected to */
618     GNUNET_break (0);
619     disconnect_and_schedule_reconnect (h);
620     return;
621   }
622   if (bytes_physical > bytes_msg)
623   {
624     LOG (GNUNET_ERROR_TYPE_DEBUG,
625          "Overhead for %u byte message was %u\n",
626          (unsigned int) bytes_msg,
627          (unsigned int) (bytes_physical - bytes_msg));
628     n->traffic_overhead += bytes_physical - bytes_msg;
629   }
630 }
631
632
633 /**
634  * Function we use for checking incoming "inbound" messages.
635  *
636  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
637  * @param im message received
638  */
639 static int
640 check_recv (void *cls,
641             const struct InboundMessage *im)
642 {
643   const struct GNUNET_MessageHeader *imm;
644   uint16_t size;
645
646   size = ntohs (im->header.size) - sizeof (*im);
647   if (size < sizeof (struct GNUNET_MessageHeader))
648   {
649     GNUNET_break (0);
650     return GNUNET_SYSERR;
651   }
652   imm = (const struct GNUNET_MessageHeader *) &im[1];
653   if (ntohs (imm->size) != size)
654   {
655     GNUNET_break (0);
656     return GNUNET_SYSERR;
657   }
658   return GNUNET_OK;
659 }
660
661
662 /**
663  * Function we use for handling incoming messages.
664  *
665  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
666  * @param im message received
667  */
668 static void
669 handle_recv (void *cls,
670              const struct InboundMessage *im)
671 {
672   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
673   const struct GNUNET_MessageHeader *imm
674     = (const struct GNUNET_MessageHeader *) &im[1];
675   struct Neighbour *n;
676
677   LOG (GNUNET_ERROR_TYPE_DEBUG,
678        "Received message of type %u with %u bytes from `%s'.\n",
679        (unsigned int) ntohs (imm->type),
680        (unsigned int) ntohs (imm->size),
681        GNUNET_i2s (&im->peer));
682   n = neighbour_find (h,
683                       &im->peer);
684   if (NULL == n)
685   {
686     GNUNET_break (0);
687     disconnect_and_schedule_reconnect (h);
688     return;
689   }
690   GNUNET_MQ_inject_message (n->mq,
691                             imm);
692 }
693
694
695 /**
696  * Function we use for handling incoming set quota messages.
697  *
698  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
699  * @param msg message received
700  */
701 static void
702 handle_set_quota (void *cls,
703                   const struct QuotaSetMessage *qm)
704 {
705   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
706   struct Neighbour *n;
707
708   n = neighbour_find (h,
709                       &qm->peer);
710   if (NULL == n)
711   {
712     GNUNET_break (0);
713     disconnect_and_schedule_reconnect (h);
714     return;
715   }
716   LOG (GNUNET_ERROR_TYPE_DEBUG,
717        "Receiving SET_QUOTA message for `%s' with quota %u\n",
718        GNUNET_i2s (&qm->peer),
719        (unsigned int) ntohl (qm->quota.value__));
720   GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
721                                          qm->quota);
722 }
723
724
725 /**
726  * Try again to connect to transport service.
727  *
728  * @param cls the handle to the transport service
729  */
730 static void
731 reconnect (void *cls)
732 {
733   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
734   struct GNUNET_MQ_MessageHandler handlers[] = {
735     GNUNET_MQ_hd_fixed_size (connect,
736                              GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
737                              struct ConnectInfoMessage,
738                              h),
739     GNUNET_MQ_hd_fixed_size (disconnect,
740                              GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
741                              struct DisconnectInfoMessage,
742                              h),
743     GNUNET_MQ_hd_fixed_size (send_ok,
744                              GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
745                              struct SendOkMessage,
746                              h),
747     GNUNET_MQ_hd_var_size (recv,
748                            GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
749                            struct InboundMessage,
750                            h),
751     GNUNET_MQ_hd_fixed_size (set_quota,
752                              GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
753                              struct QuotaSetMessage,
754                              h),
755     GNUNET_MQ_handler_end ()
756   };
757   struct GNUNET_MQ_Envelope *env;
758   struct StartMessage *s;
759   uint32_t options;
760
761   h->reconnect_task = NULL;
762   LOG (GNUNET_ERROR_TYPE_DEBUG,
763        "Connecting to transport service.\n");
764   GNUNET_assert (NULL == h->mq);
765   h->mq = GNUNET_CLIENT_connect (h->cfg,
766                                  "transport",
767                                  handlers,
768                                  &mq_error_handler,
769                                  h);
770   if (NULL == h->mq)
771     return;
772   env = GNUNET_MQ_msg (s,
773                        GNUNET_MESSAGE_TYPE_TRANSPORT_START);
774   options = 0;
775   if (h->check_self)
776     options |= 1;
777   if (NULL != h->handlers)
778     options |= 2;
779   s->options = htonl (options);
780   s->self = h->self;
781   GNUNET_MQ_send (h->mq,
782                   env);
783 }
784
785
786 /**
787  * Disconnect from the transport service.
788  *
789  * @param h transport service to reconnect
790  */
791 static void
792 disconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
793 {
794   GNUNET_CONTAINER_multipeermap_iterate (h->neighbours,
795                                          &neighbour_delete,
796                                          h);
797   if (NULL != h->mq)
798   {
799     GNUNET_MQ_destroy (h->mq);
800     h->mq = NULL;
801   }
802 }
803
804
805 /**
806  * Function that will schedule the job that will try
807  * to connect us again to the client.
808  *
809  * @param h transport service to reconnect
810  */
811 static void
812 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
813 {
814   GNUNET_assert (NULL == h->reconnect_task);
815   disconnect (h);
816   LOG (GNUNET_ERROR_TYPE_DEBUG,
817        "Scheduling task to reconnect to transport service in %s.\n",
818        GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
819                                                GNUNET_YES));
820   h->reconnect_task =
821       GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
822                                     &reconnect,
823                                     h);
824   h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
825 }
826
827
828 /**
829  * Checks if a given peer is connected to us and get the message queue.
830  *
831  * @param handle connection to transport service
832  * @param peer the peer to check
833  * @return NULL if disconnected, otherwise message queue for @a peer
834  */
835 struct GNUNET_MQ_Handle *
836 GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
837                               const struct GNUNET_PeerIdentity *peer)
838 {
839   struct Neighbour *n;
840
841   n = neighbour_find (handle,
842                       peer);
843   if (NULL == n)
844     return NULL;
845   return n->mq;
846 }
847
848
849 /**
850  * Connect to the transport service.  Note that the connection may
851  * complete (or fail) asynchronously.
852  *
853  * @param cfg configuration to use
854  * @param self our own identity (API should check that it matches
855  *             the identity found by transport), or NULL (no check)
856  * @param cls closure for the callbacks
857  * @param rec receive function to call
858  * @param nc function to call on connect events
859  * @param nd function to call on disconnect events
860  * @param neb function to call if we have excess bandwidth to a peer
861  * @return NULL on error
862  */
863 struct GNUNET_TRANSPORT_CoreHandle *
864 GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
865                                const struct GNUNET_PeerIdentity *self,
866                                const struct GNUNET_MQ_MessageHandler *handlers,
867                                void *cls,
868                                GNUNET_TRANSPORT_NotifyConnect nc,
869                                GNUNET_TRANSPORT_NotifyDisconnect nd,
870                                GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
871 {
872   struct GNUNET_TRANSPORT_CoreHandle *h;
873   unsigned int i;
874
875   h = GNUNET_new (struct GNUNET_TRANSPORT_CoreHandle);
876   if (NULL != self)
877   {
878     h->self = *self;
879     h->check_self = GNUNET_YES;
880   }
881   h->cfg = cfg;
882   h->cls = cls;
883   h->nc_cb = nc;
884   h->nd_cb = nd;
885   h->neb_cb = neb;
886   h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
887   if (NULL != handlers)
888   {
889     for (i=0;NULL != handlers[i].cb; i++) ;
890     h->handlers = GNUNET_new_array (i + 1,
891                                     struct GNUNET_MQ_MessageHandler);
892     GNUNET_memcpy (h->handlers,
893                    handlers,
894                    i * sizeof (struct GNUNET_MQ_MessageHandler));
895   }
896   LOG (GNUNET_ERROR_TYPE_DEBUG,
897        "Connecting to transport service\n");
898   reconnect (h);
899   if (NULL == h->mq)
900   {
901     GNUNET_free_non_null (h->handlers);
902     GNUNET_free (h);
903     return NULL;
904   }
905   h->neighbours =
906     GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE,
907                                           GNUNET_YES);
908   return h;
909 }
910
911
912 /**
913  * Disconnect from the transport service.
914  *
915  * @param handle handle to the service as returned from #GNUNET_TRANSPORT_core_connect()
916  */
917 void
918 GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle)
919 {
920   LOG (GNUNET_ERROR_TYPE_DEBUG,
921        "Transport disconnect called!\n");
922   /* this disconnects all neighbours... */
923   disconnect (handle);
924   /* and now we stop trying to connect again... */
925   if (NULL != handle->reconnect_task)
926   {
927     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
928     handle->reconnect_task = NULL;
929   }
930   GNUNET_CONTAINER_multipeermap_destroy (handle->neighbours);
931   handle->neighbours = NULL;
932   GNUNET_free_non_null (handle->handlers);
933   handle->handlers = NULL;
934   GNUNET_free (handle);
935 }
936
937
938 /* end of transport_api_core.c */