uncrustify as demanded.
[oweals/gnunet.git] / src / transport / transport_api_core.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2013, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file transport/transport_api_core.c
23  * @brief library to access the transport service for message exchange
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_arm_service.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_transport_service.h"
33 #include "transport.h"
34
35 #define LOG(kind, ...) GNUNET_log_from(kind, "transport-api-core", __VA_ARGS__)
36
37 /**
38  * If we could not send any payload to a peer for this amount of
39  * time, we print a warning.
40  */
41 #define UNREADY_WARN_TIME GNUNET_TIME_UNIT_MINUTES
42
43 /**
44  * How large to start with for the hashmap of neighbours.
45  */
46 #define STARTING_NEIGHBOURS_SIZE 16
47
48
49 /**
50  * Entry in hash table of all of our current (connected) neighbours.
51  */
52 struct Neighbour {
53   /**
54    * Overall transport handle.
55    */
56   struct GNUNET_TRANSPORT_CoreHandle *h;
57
58   /**
59    * Active message queue for the peer.
60    */
61   struct GNUNET_MQ_Handle *mq;
62
63   /**
64    * Envelope with the message we are currently transmitting (or NULL).
65    */
66   struct GNUNET_MQ_Envelope *env;
67
68   /**
69    * Closure for @e mq handlers.
70    */
71   void *handlers_cls;
72
73   /**
74    * Identity of this neighbour.
75    */
76   struct GNUNET_PeerIdentity id;
77
78   /**
79    * Outbound bandwidh tracker.
80    */
81   struct GNUNET_BANDWIDTH_Tracker out_tracker;
82
83   /**
84    * Entry in our readyness heap (which is sorted by @e next_ready
85    * value).  NULL if there is no pending transmission request for
86    * this neighbour or if we're waiting for @e is_ready to become
87    * true AFTER the @e out_tracker suggested that this peer's quota
88    * has been satisfied (so once @e is_ready goes to #GNUNET_YES,
89    * we should immediately go back into the heap).
90    */
91   struct GNUNET_CONTAINER_HeapNode *hn;
92
93   /**
94    * Task to trigger MQ when we have enough bandwidth for the
95    * next transmission.
96    */
97   struct GNUNET_SCHEDULER_Task *timeout_task;
98
99   /**
100    * Sending consumed more bytes on wire than payload was announced
101    * This overhead is added to the delay of next sending operation
102    */
103   unsigned long long traffic_overhead;
104
105   /**
106    * Is this peer currently ready to receive a message?
107    */
108   int is_ready;
109
110   /**
111    * Size of the message in @e env.
112    */
113   uint16_t env_size;
114 };
115
116
117 /**
118  * Handle for the transport service (includes all of the
119  * state for the transport service).
120  */
121 struct GNUNET_TRANSPORT_CoreHandle {
122   /**
123    * Closure for the callbacks.
124    */
125   void *cls;
126
127   /**
128    * Functions to call for received data (template for
129    * new message queues).
130    */
131   struct GNUNET_MQ_MessageHandler *handlers;
132
133   /**
134    * function to call on connect events
135    */
136   GNUNET_TRANSPORT_NotifyConnect nc_cb;
137
138   /**
139    * function to call on disconnect events
140    */
141   GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
142
143   /**
144    * function to call on excess bandwidth events
145    */
146   GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
147
148   /**
149    * My client connection to the transport service.
150    */
151   struct GNUNET_MQ_Handle *mq;
152
153   /**
154    * My configuration.
155    */
156   const struct GNUNET_CONFIGURATION_Handle *cfg;
157
158   /**
159    * Hash map of the current connected neighbours of this peer.
160    * Maps peer identities to `struct Neighbour` entries.
161    */
162   struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
163
164   /**
165    * Peer identity as assumed by this process, or all zeros.
166    */
167   struct GNUNET_PeerIdentity self;
168
169   /**
170    * ID of the task trying to reconnect to the service.
171    */
172   struct GNUNET_SCHEDULER_Task *reconnect_task;
173
174   /**
175    * Delay until we try to reconnect.
176    */
177   struct GNUNET_TIME_Relative reconnect_delay;
178
179   /**
180    * Internal counter to check how many more receive OK messages this
181    * CORE service is allowed to send in total. Just to detect easy
182    * cases of protocol violations by the CORE implementation.
183    * NOTE: we may want to make this stronger by counting per peer
184    * instead of globally.
185    */
186   unsigned int rom_pending;
187
188   /**
189    * Should we check that @e self matches what the service thinks?
190    * (if #GNUNET_NO, then @e self is all zeros!).
191    */
192   int check_self;
193 };
194
195
196 /**
197  * Function that will schedule the job that will try
198  * to connect us again to the client.
199  *
200  * @param h transport service to reconnect
201  */
202 static void
203 disconnect_and_schedule_reconnect(struct GNUNET_TRANSPORT_CoreHandle *h);
204
205
206 /**
207  * Get the neighbour list entry for the given peer
208  *
209  * @param h our context
210  * @param peer peer to look up
211  * @return NULL if no such peer entry exists
212  */
213 static struct Neighbour *
214 neighbour_find(struct GNUNET_TRANSPORT_CoreHandle *h,
215                const struct GNUNET_PeerIdentity *peer)
216 {
217   return GNUNET_CONTAINER_multipeermap_get(h->neighbours, peer);
218 }
219
220
221 /**
222  * Function called by the bandwidth tracker if we have excess
223  * bandwidth.
224  *
225  * @param cls the `struct Neighbour` that has excess bandwidth
226  */
227 static void
228 notify_excess_cb(void *cls)
229 {
230   struct Neighbour *n = cls;
231   struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
232
233   LOG(GNUNET_ERROR_TYPE_DEBUG,
234       "Notifying CORE that more bandwidth is available for %s\n",
235       GNUNET_i2s(&n->id));
236
237   if (NULL != h->neb_cb)
238     h->neb_cb(h->cls, &n->id, n->handlers_cls);
239 }
240
241
242 /**
243  * Iterator over hash map entries, for deleting state of a neighbour.
244  *
245  * @param cls the `struct GNUNET_TRANSPORT_CoreHandle *`
246  * @param key peer identity
247  * @param value value in the hash map, the neighbour entry to delete
248  * @return #GNUNET_YES if we should continue to
249  *         iterate,
250  *         #GNUNET_NO if not.
251  */
252 static int
253 neighbour_delete(void *cls, const struct GNUNET_PeerIdentity *key, void *value)
254 {
255   struct GNUNET_TRANSPORT_CoreHandle *handle = cls;
256   struct Neighbour *n = value;
257
258   LOG(GNUNET_ERROR_TYPE_DEBUG,
259       "Dropping entry for neighbour `%s'.\n",
260       GNUNET_i2s(key));
261   GNUNET_BANDWIDTH_tracker_notification_stop(&n->out_tracker);
262   if (NULL != handle->nd_cb)
263     handle->nd_cb(handle->cls, &n->id, n->handlers_cls);
264   if (NULL != n->timeout_task)
265     {
266       GNUNET_SCHEDULER_cancel(n->timeout_task);
267       n->timeout_task = NULL;
268     }
269   if (NULL != n->env)
270     {
271       GNUNET_MQ_send_cancel(n->env);
272       n->env = NULL;
273     }
274   GNUNET_MQ_destroy(n->mq);
275   GNUNET_assert(NULL == n->mq);
276   GNUNET_assert(
277     GNUNET_YES ==
278     GNUNET_CONTAINER_multipeermap_remove(handle->neighbours, key, n));
279   GNUNET_free(n);
280   return GNUNET_YES;
281 }
282
283
284 /**
285  * Generic error handler, called with the appropriate
286  * error code and the same closure specified at the creation of
287  * the message queue.
288  * Not every message queue implementation supports an error handler.
289  *
290  * @param cls closure with the `struct GNUNET_TRANSPORT_CoreHandle *`
291  * @param error error code
292  */
293 static void
294 mq_error_handler(void *cls, enum GNUNET_MQ_Error error)
295 {
296   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
297
298   LOG(GNUNET_ERROR_TYPE_ERROR,
299       "Error receiving from transport service (%d), disconnecting temporarily.\n",
300       error);
301   disconnect_and_schedule_reconnect(h);
302 }
303
304
305 /**
306  * Function we use for checking incoming HELLO messages.
307  *
308  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
309  * @param msg message received
310  * @return #GNUNET_OK if message is well-formed
311  */
312 static int
313 check_hello(void *cls, const struct GNUNET_MessageHeader *msg)
314 {
315   struct GNUNET_PeerIdentity me;
316
317   if (GNUNET_OK !=
318       GNUNET_HELLO_get_id((const struct GNUNET_HELLO_Message *)msg, &me))
319     {
320       GNUNET_break(0);
321       return GNUNET_SYSERR;
322     }
323   return GNUNET_OK;
324 }
325
326
327 /**
328  * Function we use for handling incoming HELLO messages.
329  *
330  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
331  * @param msg message received
332  */
333 static void
334 handle_hello(void *cls, const struct GNUNET_MessageHeader *msg)
335 {
336   /* we do not care => FIXME: signal in options to NEVER send HELLOs! */
337 }
338
339
340 /**
341  * A message from the handler's message queue to a neighbour was
342  * transmitted.  Now trigger (possibly delayed) notification of the
343  * neighbour's message queue that we are done and thus ready for
344  * the next message.
345  *
346  * @param cls the `struct Neighbour` where the message was sent
347  */
348 static void
349 notify_send_done_fin(void *cls)
350 {
351   struct Neighbour *n = cls;
352
353   n->timeout_task = NULL;
354   n->is_ready = GNUNET_YES;
355   GNUNET_MQ_impl_send_continue(n->mq);
356 }
357
358
359 /**
360  * A message from the handler's message queue to a neighbour was
361  * transmitted.  Now trigger (possibly delayed) notification of the
362  * neighbour's message queue that we are done and thus ready for
363  * the next message.
364  *
365  * @param cls the `struct Neighbour` where the message was sent
366  */
367 static void
368 notify_send_done(void *cls)
369 {
370   struct Neighbour *n = cls;
371   struct GNUNET_TIME_Relative delay;
372
373   n->timeout_task = NULL;
374   if (NULL != n->env)
375     {
376       GNUNET_BANDWIDTH_tracker_consume(&n->out_tracker,
377                                        n->env_size + n->traffic_overhead);
378       n->env = NULL;
379       n->traffic_overhead = 0;
380     }
381   delay = GNUNET_BANDWIDTH_tracker_get_delay(&n->out_tracker, 128);
382   if (0 == delay.rel_value_us)
383     {
384       n->is_ready = GNUNET_YES;
385       GNUNET_MQ_impl_send_continue(n->mq);
386       return;
387     }
388   GNUNET_MQ_impl_send_in_flight(n->mq);
389   /* cannot send even a small message without violating
390      quota, wait a before allowing MQ to send next message */
391   n->timeout_task =
392     GNUNET_SCHEDULER_add_delayed(delay, &notify_send_done_fin, n);
393 }
394
395
396 /**
397  * Implement sending functionality of a message queue.
398  * Called one message at a time. Should send the @a msg
399  * to the transport service and then notify the queue
400  * once we are ready for the next one.
401  *
402  * @param mq the message queue
403  * @param msg the message to send
404  * @param impl_state state of the implementation
405  */
406 static void
407 mq_send_impl(struct GNUNET_MQ_Handle *mq,
408              const struct GNUNET_MessageHeader *msg,
409              void *impl_state)
410 {
411   struct Neighbour *n = impl_state;
412   struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
413   struct OutboundMessage *obm;
414   uint16_t msize;
415
416   GNUNET_assert(GNUNET_YES == n->is_ready);
417   msize = ntohs(msg->size);
418   if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof(*obm))
419     {
420       GNUNET_break(0);
421       GNUNET_MQ_impl_send_continue(mq);
422       return;
423     }
424   GNUNET_assert(NULL == n->env);
425   n->env =
426     GNUNET_MQ_msg_nested_mh(obm, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, msg);
427   {
428     struct GNUNET_MQ_Envelope *env;
429
430     env = GNUNET_MQ_get_current_envelope(mq);
431     obm->priority = htonl((uint32_t)GNUNET_MQ_env_get_options(env));
432   }
433   obm->timeout = GNUNET_TIME_relative_hton(
434     GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */
435   obm->peer = n->id;
436   GNUNET_assert(NULL == n->timeout_task);
437   n->is_ready = GNUNET_NO;
438   n->env_size = ntohs(msg->size);
439   GNUNET_MQ_notify_sent(n->env, &notify_send_done, n);
440   GNUNET_MQ_send(h->mq, n->env);
441   LOG(GNUNET_ERROR_TYPE_DEBUG,
442       "Queued message of type %u for neighbour `%s'.\n",
443       ntohs(msg->type),
444       GNUNET_i2s(&n->id));
445 }
446
447
448 /**
449  * Handle destruction of a message queue.  Implementations must not
450  * free @a mq, but should take care of @a impl_state.
451  *
452  * @param mq the message queue to destroy
453  * @param impl_state state of the implementation
454  */
455 static void
456 mq_destroy_impl(struct GNUNET_MQ_Handle *mq, void *impl_state)
457 {
458   struct Neighbour *n = impl_state;
459
460   GNUNET_assert(mq == n->mq);
461   n->mq = NULL;
462 }
463
464
465 /**
466  * Implementation function that cancels the currently sent message.
467  * Should basically undo whatever #mq_send_impl() did.
468  *
469  * @param mq message queue
470  * @param impl_state state specific to the implementation
471  */
472 static void
473 mq_cancel_impl(struct GNUNET_MQ_Handle *mq, void *impl_state)
474 {
475   struct Neighbour *n = impl_state;
476
477   GNUNET_assert(GNUNET_NO == n->is_ready);
478   if (NULL != n->env)
479     {
480       GNUNET_MQ_send_cancel(n->env);
481       n->env = NULL;
482     }
483
484   n->is_ready = GNUNET_YES;
485 }
486
487
488 /**
489  * We had an error processing a message we forwarded from a peer to
490  * the CORE service.  We should just complain about it but otherwise
491  * continue processing.
492  *
493  * @param cls closure
494  * @param error error code
495  */
496 static void
497 peer_mq_error_handler(void *cls, enum GNUNET_MQ_Error error)
498 {
499   /* struct Neighbour *n = cls; */
500
501   GNUNET_break_op(0);
502 }
503
504
505 /**
506  * The outbound quota has changed in a way that may require
507  * us to reset the timeout.  Update the timeout.
508  *
509  * @param cls the `struct Neighbour` for which the timeout changed
510  */
511 static void
512 outbound_bw_tracker_update(void *cls)
513 {
514   struct Neighbour *n = cls;
515   struct GNUNET_TIME_Relative delay;
516
517   if (NULL == n->timeout_task)
518     return;
519   delay = GNUNET_BANDWIDTH_tracker_get_delay(&n->out_tracker, 128);
520   GNUNET_SCHEDULER_cancel(n->timeout_task);
521   n->timeout_task = GNUNET_SCHEDULER_add_delayed(delay, &notify_send_done, n);
522 }
523
524
525 /**
526  * Function we use for handling incoming connect messages.
527  *
528  * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
529  * @param cim message received
530  */
531 static void
532 handle_connect(void *cls, const struct ConnectInfoMessage *cim)
533 {
534   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
535   struct Neighbour *n;
536
537   LOG(GNUNET_ERROR_TYPE_DEBUG,
538       "Receiving CONNECT message for `%s' with quota %u\n",
539       GNUNET_i2s(&cim->id),
540       ntohl(cim->quota_out.value__));
541   n = neighbour_find(h, &cim->id);
542   if (NULL != n)
543     {
544       GNUNET_break(0); /* FIXME: this assertion seems to fail sometimes!? */
545       disconnect_and_schedule_reconnect(h);
546       return;
547     }
548   n = GNUNET_new(struct Neighbour);
549   n->id = cim->id;
550   n->h = h;
551   n->is_ready = GNUNET_YES;
552   n->traffic_overhead = 0;
553   GNUNET_BANDWIDTH_tracker_init2(&n->out_tracker,
554                                  &outbound_bw_tracker_update,
555                                  n,
556                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
557                                  MAX_BANDWIDTH_CARRY_S,
558                                  &notify_excess_cb,
559                                  n);
560   GNUNET_assert(GNUNET_OK ==
561                 GNUNET_CONTAINER_multipeermap_put(
562                   h->neighbours,
563                   &n->id,
564                   n,
565                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
566
567   GNUNET_BANDWIDTH_tracker_update_quota(&n->out_tracker, cim->quota_out);
568   n->mq = GNUNET_MQ_queue_for_callbacks(&mq_send_impl,
569                                         &mq_destroy_impl,
570                                         &mq_cancel_impl,
571                                         n,
572                                         h->handlers,
573                                         &peer_mq_error_handler,
574                                         n);
575   if (NULL != h->nc_cb)
576     {
577       n->handlers_cls = h->nc_cb(h->cls, &n->id, n->mq);
578       GNUNET_MQ_set_handlers_closure(n->mq, n->handlers_cls);
579     }
580 }
581
582
583 /**
584  * Function we use for handling incoming disconnect messages.
585  *
586  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
587  * @param dim message received
588  */
589 static void
590 handle_disconnect(void *cls, const struct DisconnectInfoMessage *dim)
591 {
592   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
593   struct Neighbour *n;
594
595   GNUNET_break(ntohl(dim->reserved) == 0);
596   LOG(GNUNET_ERROR_TYPE_DEBUG,
597       "Receiving DISCONNECT message for `%s'.\n",
598       GNUNET_i2s(&dim->peer));
599   n = neighbour_find(h, &dim->peer);
600   if (NULL == n)
601     {
602       GNUNET_break(0);
603       disconnect_and_schedule_reconnect(h);
604       return;
605     }
606   GNUNET_assert(GNUNET_YES == neighbour_delete(h, &dim->peer, n));
607 }
608
609
610 /**
611  * Function we use for handling incoming send-ok messages.
612  *
613  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
614  * @param okm message received
615  */
616 static void
617 handle_send_ok(void *cls, const struct SendOkMessage *okm)
618 {
619   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
620   struct Neighbour *n;
621   uint32_t bytes_msg;
622   uint32_t bytes_physical;
623
624   bytes_msg = ntohl(okm->bytes_msg);
625   bytes_physical = ntohl(okm->bytes_physical);
626   LOG(GNUNET_ERROR_TYPE_DEBUG,
627       "Receiving SEND_OK message, transmission to %s %s.\n",
628       GNUNET_i2s(&okm->peer),
629       ntohl(okm->success) == GNUNET_OK ? "succeeded" : "failed");
630   n = neighbour_find(h, &okm->peer);
631   if (NULL == n)
632     {
633       /* We should never get a 'SEND_OK' for a peer that we are not
634          connected to */
635       GNUNET_break(0);
636       disconnect_and_schedule_reconnect(h);
637       return;
638     }
639   if (bytes_physical > bytes_msg)
640     {
641       LOG(GNUNET_ERROR_TYPE_DEBUG,
642           "Overhead for %u byte message was %u\n",
643           bytes_msg,
644           bytes_physical - bytes_msg);
645       n->traffic_overhead += bytes_physical - bytes_msg;
646     }
647 }
648
649
650 /**
651  * Function we use for checking incoming "inbound" messages.
652  *
653  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
654  * @param im message received
655  */
656 static int
657 check_recv(void *cls, const struct InboundMessage *im)
658 {
659   const struct GNUNET_MessageHeader *imm;
660   uint16_t size;
661
662   size = ntohs(im->header.size) - sizeof(*im);
663   if (size < sizeof(struct GNUNET_MessageHeader))
664     {
665       GNUNET_break(0);
666       return GNUNET_SYSERR;
667     }
668   imm = (const struct GNUNET_MessageHeader *)&im[1];
669   if (ntohs(imm->size) != size)
670     {
671       GNUNET_break(0);
672       return GNUNET_SYSERR;
673     }
674   return GNUNET_OK;
675 }
676
677
678 /**
679  * Function we use for handling incoming messages.
680  *
681  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
682  * @param im message received
683  */
684 static void
685 handle_recv(void *cls, const struct InboundMessage *im)
686 {
687   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
688   const struct GNUNET_MessageHeader *imm =
689     (const struct GNUNET_MessageHeader *)&im[1];
690   struct Neighbour *n;
691
692   LOG(GNUNET_ERROR_TYPE_DEBUG,
693       "Received message of type %u with %u bytes from `%s'.\n",
694       (unsigned int)ntohs(imm->type),
695       (unsigned int)ntohs(imm->size),
696       GNUNET_i2s(&im->peer));
697   n = neighbour_find(h, &im->peer);
698   if (NULL == n)
699     {
700       GNUNET_break(0);
701       disconnect_and_schedule_reconnect(h);
702       return;
703     }
704   h->rom_pending++;
705   GNUNET_MQ_inject_message(n->mq, imm);
706 }
707
708
709 /**
710  * Function we use for handling incoming set quota messages.
711  *
712  * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
713  * @param msg message received
714  */
715 static void
716 handle_set_quota(void *cls, const struct QuotaSetMessage *qm)
717 {
718   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
719   struct Neighbour *n;
720
721   LOG(GNUNET_ERROR_TYPE_DEBUG,
722       "Receiving SET_QUOTA message for `%s' with quota %u\n",
723       GNUNET_i2s(&qm->peer),
724       ntohl(qm->quota.value__));
725   n = neighbour_find(h, &qm->peer);
726   if (NULL == n)
727     {
728       GNUNET_break(
729         0); /* FIXME: julius reports this assertion fails sometimes? */
730       disconnect_and_schedule_reconnect(h);
731       return;
732     }
733   GNUNET_BANDWIDTH_tracker_update_quota(&n->out_tracker, qm->quota);
734 }
735
736
737 /**
738  * Try again to connect to transport service.
739  *
740  * @param cls the handle to the transport service
741  */
742 static void
743 reconnect(void *cls)
744 {
745   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
746   struct GNUNET_MQ_MessageHandler handlers[] =
747   { GNUNET_MQ_hd_var_size(hello,
748                           GNUNET_MESSAGE_TYPE_HELLO,
749                           struct GNUNET_MessageHeader,
750                           h),
751     GNUNET_MQ_hd_fixed_size(connect,
752                             GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
753                             struct ConnectInfoMessage,
754                             h),
755     GNUNET_MQ_hd_fixed_size(disconnect,
756                             GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
757                             struct DisconnectInfoMessage,
758                             h),
759     GNUNET_MQ_hd_fixed_size(send_ok,
760                             GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
761                             struct SendOkMessage,
762                             h),
763     GNUNET_MQ_hd_var_size(recv,
764                           GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
765                           struct InboundMessage,
766                           h),
767     GNUNET_MQ_hd_fixed_size(set_quota,
768                             GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
769                             struct QuotaSetMessage,
770                             h),
771     GNUNET_MQ_handler_end() };
772   struct GNUNET_MQ_Envelope *env;
773   struct StartMessage *s;
774   uint32_t options;
775
776   h->reconnect_task = NULL;
777   LOG(GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
778   GNUNET_assert(NULL == h->mq);
779   h->mq =
780     GNUNET_CLIENT_connect(h->cfg, "transport", handlers, &mq_error_handler, h);
781   if (NULL == h->mq)
782     return;
783   env = GNUNET_MQ_msg(s, GNUNET_MESSAGE_TYPE_TRANSPORT_START);
784   options = 0;
785   if (h->check_self)
786     options |= 1;
787   if (NULL != h->handlers)
788     options |= 2;
789   s->options = htonl(options);
790   s->self = h->self;
791   GNUNET_MQ_send(h->mq, env);
792 }
793
794
795 /**
796  * Function that will schedule the job that will try
797  * to connect us again to the client.
798  *
799  * @param h transport service to reconnect
800  */
801 static void
802 disconnect_and_schedule_reconnect(struct GNUNET_TRANSPORT_CoreHandle *h)
803 {
804   GNUNET_assert(NULL == h->reconnect_task);
805   /* Forget about all neighbours that we used to be connected to */
806   GNUNET_CONTAINER_multipeermap_iterate(h->neighbours, &neighbour_delete, h);
807   if (NULL != h->mq)
808     {
809       GNUNET_MQ_destroy(h->mq);
810       h->mq = NULL;
811     }
812   LOG(GNUNET_ERROR_TYPE_DEBUG,
813       "Scheduling task to reconnect to transport service in %s.\n",
814       GNUNET_STRINGS_relative_time_to_string(h->reconnect_delay, GNUNET_YES));
815   h->reconnect_task =
816     GNUNET_SCHEDULER_add_delayed(h->reconnect_delay, &reconnect, h);
817   h->reconnect_delay = GNUNET_TIME_STD_BACKOFF(h->reconnect_delay);
818 }
819
820
821 /**
822  * Checks if a given peer is connected to us and get the message queue.
823  *
824  * @param handle connection to transport service
825  * @param peer the peer to check
826  * @return NULL if disconnected, otherwise message queue for @a peer
827  */
828 struct GNUNET_MQ_Handle *
829 GNUNET_TRANSPORT_core_get_mq(struct GNUNET_TRANSPORT_CoreHandle *handle,
830                              const struct GNUNET_PeerIdentity *peer)
831 {
832   struct Neighbour *n;
833
834   n = neighbour_find(handle, peer);
835   if (NULL == n)
836     return NULL;
837   return n->mq;
838 }
839
840
841 /**
842  * Connect to the transport service.  Note that the connection may
843  * complete (or fail) asynchronously.
844  *
845  * @param cfg configuration to use
846  * @param self our own identity (API should check that it matches
847  *             the identity found by transport), or NULL (no check)
848  * @param cls closure for the callbacks
849  * @param rec receive function to call
850  * @param nc function to call on connect events
851  * @param nd function to call on disconnect events
852  * @param neb function to call if we have excess bandwidth to a peer
853  * @return NULL on error
854  */
855 struct GNUNET_TRANSPORT_CoreHandle *
856 GNUNET_TRANSPORT_core_connect(const struct GNUNET_CONFIGURATION_Handle *cfg,
857                               const struct GNUNET_PeerIdentity *self,
858                               const struct GNUNET_MQ_MessageHandler *handlers,
859                               void *cls,
860                               GNUNET_TRANSPORT_NotifyConnect nc,
861                               GNUNET_TRANSPORT_NotifyDisconnect nd,
862                               GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
863 {
864   struct GNUNET_TRANSPORT_CoreHandle *h;
865   unsigned int i;
866
867   h = GNUNET_new(struct GNUNET_TRANSPORT_CoreHandle);
868   if (NULL != self)
869     {
870       h->self = *self;
871       h->check_self = GNUNET_YES;
872     }
873   h->cfg = cfg;
874   h->cls = cls;
875   h->nc_cb = nc;
876   h->nd_cb = nd;
877   h->neb_cb = neb;
878   h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
879   if (NULL != handlers)
880     {
881       for (i = 0; NULL != handlers[i].cb; i++)
882         ;
883       h->handlers = GNUNET_new_array(i + 1, struct GNUNET_MQ_MessageHandler);
884       GNUNET_memcpy(h->handlers,
885                     handlers,
886                     i * sizeof(struct GNUNET_MQ_MessageHandler));
887     }
888   LOG(GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service\n");
889   reconnect(h);
890   if (NULL == h->mq)
891     {
892       GNUNET_free_non_null(h->handlers);
893       GNUNET_free(h);
894       return NULL;
895     }
896   h->neighbours =
897     GNUNET_CONTAINER_multipeermap_create(STARTING_NEIGHBOURS_SIZE, GNUNET_YES);
898   return h;
899 }
900
901
902 /**
903  * Disconnect from the transport service.
904  *
905  * @param handle handle to the service as returned from
906  * #GNUNET_TRANSPORT_core_connect()
907  */
908 void
909 GNUNET_TRANSPORT_core_disconnect(struct GNUNET_TRANSPORT_CoreHandle *handle)
910 {
911   LOG(GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
912   /* this disconnects all neighbours... */
913   if (NULL == handle->reconnect_task)
914     disconnect_and_schedule_reconnect(handle);
915   /* and now we stop trying to connect again... */
916   if (NULL != handle->reconnect_task)
917     {
918       GNUNET_SCHEDULER_cancel(handle->reconnect_task);
919       handle->reconnect_task = NULL;
920     }
921   GNUNET_CONTAINER_multipeermap_destroy(handle->neighbours);
922   handle->neighbours = NULL;
923   GNUNET_free_non_null(handle->handlers);
924   handle->handlers = NULL;
925   GNUNET_free(handle);
926 }
927
928
929 /**
930  * Notification from the CORE service to the TRANSPORT service
931  * that the CORE service has finished processing a message from
932  * TRANSPORT (via the @code{handlers} of #GNUNET_TRANSPORT_core_connect())
933  * and that it is thus now OK for TRANSPORT to send more messages
934  * for @a pid.
935  *
936  * Used to provide flow control, this is our equivalent to
937  * #GNUNET_SERVICE_client_continue() of an ordinary service.
938  *
939  * Note that due to the use of a window, TRANSPORT may send multiple
940  * messages destined for the same peer even without an intermediate
941  * call to this function. However, CORE must still call this function
942  * once per message received, as otherwise eventually the window will
943  * be full and TRANSPORT will stop providing messages to CORE for @a
944  * pid.
945  *
946  * @param ch core handle
947  * @param pid which peer was the message from that was fully processed by CORE
948  */
949 void
950 GNUNET_TRANSPORT_core_receive_continue(struct GNUNET_TRANSPORT_CoreHandle *ch,
951                                        const struct GNUNET_PeerIdentity *pid)
952 {
953   struct RecvOkMessage *rom;
954   struct GNUNET_MQ_Envelope *env;
955
956   GNUNET_assert(ch->rom_pending > 0);
957   ch->rom_pending--;
958   env = GNUNET_MQ_msg(rom, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK);
959   rom->increase_window_delta = htonl(1);
960   rom->peer = *pid;
961   GNUNET_MQ_send(ch->mq, env);
962 }
963
964
965 /* end of transport_api_core.c */