dead code elimination
[oweals/gnunet.git] / src / core / core_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/core_api.c
23  * @brief core service; this is the main API for encrypted P2P
24  *        communications
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_core_service.h"
30 #include "core.h"
31
32
33 /**
34  * Information we track for each peer.
35  */
36 struct PeerRecord
37 {
38
39   /**
40    * We generally do NOT keep peer records in a DLL; this
41    * DLL is only used IF this peer's 'pending_head' message
42    * is ready for transmission.
43    */
44   struct PeerRecord *prev;
45
46   /**
47    * We generally do NOT keep peer records in a DLL; this
48    * DLL is only used IF this peer's 'pending_head' message
49    * is ready for transmission.
50    */
51   struct PeerRecord *next;
52
53   /**
54    * Peer the record is about.
55    */
56   struct GNUNET_PeerIdentity peer;
57
58   /**
59    * Corresponding core handle.
60    */
61   struct GNUNET_CORE_Handle *ch;
62
63   /**
64    * Head of doubly-linked list of pending requests.
65    * Requests are sorted by deadline *except* for HEAD,
66    * which is only modified upon transmission to core.
67    */
68   struct GNUNET_CORE_TransmitHandle *pending_head;
69
70   /**
71    * Tail of doubly-linked list of pending requests.
72    */
73   struct GNUNET_CORE_TransmitHandle *pending_tail;
74
75   /**
76    * ID of timeout task for the 'pending_head' handle
77    * which is the one with the smallest timeout.
78    */
79   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
80
81   /**
82    * ID of task to run 'next_request_transmission'.
83    */
84   GNUNET_SCHEDULER_TaskIdentifier ntr_task;
85
86   /**
87    * Current size of the queue of pending requests.
88    */
89   unsigned int queue_size;
90
91   /**
92    * SendMessageRequest ID generator for this peer.
93    */
94   uint16_t smr_id_gen;
95
96 };
97
98
99 /**
100  * Type of function called upon completion.
101  *
102  * @param cls closure
103  * @param success GNUNET_OK on success (which for request_connect
104  *        ONLY means that we transmitted the connect request to CORE,
105  *        it does not mean that we are actually now connected!);
106  *        GNUNET_NO on timeout,
107  *        GNUNET_SYSERR if core was shut down
108  */
109 typedef void (*GNUNET_CORE_ControlContinuation) (void *cls, int success);
110
111
112 /**
113  * Entry in a doubly-linked list of control messages to be transmitted
114  * to the core service.  Control messages include traffic allocation,
115  * connection requests and of course our initial 'init' request.
116  *
117  * The actual message is allocated at the end of this struct.
118  */
119 struct ControlMessage
120 {
121   /**
122    * This is a doubly-linked list.
123    */
124   struct ControlMessage *next;
125
126   /**
127    * This is a doubly-linked list.
128    */
129   struct ControlMessage *prev;
130
131   /**
132    * Function to run after transmission failed/succeeded.
133    */
134   GNUNET_CORE_ControlContinuation cont;
135
136   /**
137    * Closure for 'cont'.
138    */
139   void *cont_cls;
140
141   /**
142    * Transmit handle (if one is associated with this ControlMessage), or NULL.
143    */
144   struct GNUNET_CORE_TransmitHandle *th;
145 };
146
147
148
149 /**
150  * Context for the core service connection.
151  */
152 struct GNUNET_CORE_Handle
153 {
154
155   /**
156    * Configuration we're using.
157    */
158   const struct GNUNET_CONFIGURATION_Handle *cfg;
159
160   /**
161    * Closure for the various callbacks.
162    */
163   void *cls;
164
165   /**
166    * Function to call once we've handshaked with the core service.
167    */
168   GNUNET_CORE_StartupCallback init;
169
170   /**
171    * Function to call whenever we're notified about a peer connecting.
172    */
173   GNUNET_CORE_ConnectEventHandler connects;
174
175   /**
176    * Function to call whenever we're notified about a peer disconnecting.
177    */
178   GNUNET_CORE_DisconnectEventHandler disconnects;
179
180   /**
181    * Function to call whenever we're notified about a peer changing status.
182    */
183   GNUNET_CORE_PeerStatusEventHandler status_events;
184
185   /**
186    * Function to call whenever we receive an inbound message.
187    */
188   GNUNET_CORE_MessageCallback inbound_notify;
189
190   /**
191    * Function to call whenever we receive an outbound message.
192    */
193   GNUNET_CORE_MessageCallback outbound_notify;
194
195   /**
196    * Function handlers for messages of particular type.
197    */
198   const struct GNUNET_CORE_MessageHandler *handlers;
199
200   /**
201    * Our connection to the service.
202    */
203   struct GNUNET_CLIENT_Connection *client;
204
205   /**
206    * Handle for our current transmission request.
207    */
208   struct GNUNET_CLIENT_TransmitHandle *cth;
209
210   /**
211    * Head of doubly-linked list of pending requests.
212    */
213   struct ControlMessage *control_pending_head;
214
215   /**
216    * Tail of doubly-linked list of pending requests.
217    */
218   struct ControlMessage *control_pending_tail;
219
220   /**
221    * Head of doubly-linked list of peers that are core-approved
222    * to send their next message.
223    */
224   struct PeerRecord *ready_peer_head;
225
226   /**
227    * Tail of doubly-linked list of peers that are core-approved
228    * to send their next message.
229    */
230   struct PeerRecord *ready_peer_tail;
231
232   /**
233    * Hash map listing all of the peers that we are currently
234    * connected to.
235    */
236   struct GNUNET_CONTAINER_MultiHashMap *peers;
237
238   /**
239    * Identity of this peer.
240    */
241   struct GNUNET_PeerIdentity me;
242
243   /**
244    * ID of reconnect task (if any).
245    */
246   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
247
248   /**
249    * Current delay we use for re-trying to connect to core.
250    */
251   struct GNUNET_TIME_Relative retry_backoff;
252
253   /**
254    * Number of messages we are allowed to queue per target.
255    */
256   unsigned int queue_size;
257
258   /**
259    * Number of entries in the handlers array.
260    */
261   unsigned int hcnt;
262
263   /**
264    * For inbound notifications without a specific handler, do
265    * we expect to only receive headers?
266    */
267   int inbound_hdr_only;
268
269   /**
270    * For outbound notifications without a specific handler, do
271    * we expect to only receive headers?
272    */
273   int outbound_hdr_only;
274
275   /**
276    * Are we currently disconnected and hence unable to forward
277    * requests?
278    */
279   int currently_down;
280
281 };
282
283
284 /**
285  * Handle for a transmission request.
286  */
287 struct GNUNET_CORE_TransmitHandle
288 {
289
290   /**
291    * We keep active transmit handles in a doubly-linked list.
292    */
293   struct GNUNET_CORE_TransmitHandle *next;
294
295   /**
296    * We keep active transmit handles in a doubly-linked list.
297    */
298   struct GNUNET_CORE_TransmitHandle *prev;
299
300   /**
301    * Corresponding peer record.
302    */
303   struct PeerRecord *peer;
304
305   /**
306    * Corresponding SEND_REQUEST message.  Only non-NULL
307    * while SEND_REQUEST message is pending.
308    */
309   struct ControlMessage *cm;
310
311   /**
312    * Function that will be called to get the actual request
313    * (once we are ready to transmit this request to the core).
314    * The function will be called with a NULL buffer to signal
315    * timeout.
316    */
317   GNUNET_CONNECTION_TransmitReadyNotify get_message;
318
319   /**
320    * Closure for get_message.
321    */
322   void *get_message_cls;
323
324   /**
325    * Timeout for this handle.
326    */
327   struct GNUNET_TIME_Absolute timeout;
328
329   /**
330    * How important is this message?
331    */
332   uint32_t priority;
333
334   /**
335    * Size of this request.
336    */
337   uint16_t msize;
338
339   /**
340    * Send message request ID for this request.
341    */
342   uint16_t smr_id;
343
344   /**
345    * Is corking allowed?
346    */
347   int cork;
348
349 };
350
351
352 /**
353  * Our current client connection went down.  Clean it up
354  * and try to reconnect!
355  *
356  * @param h our handle to the core service
357  */
358 static void
359 reconnect (struct GNUNET_CORE_Handle *h);
360
361
362 /**
363  * Task schedule to try to re-connect to core.
364  *
365  * @param cls the 'struct GNUNET_CORE_Handle'
366  * @param tc task context
367  */
368 static void
369 reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
370 {
371   struct GNUNET_CORE_Handle *h = cls;
372
373   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
374 #if DEBUG_CORE
375   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
376               "Connecting to CORE service after delay\n");
377 #endif
378   reconnect (h);
379 }
380
381
382 /**
383  * Notify clients about disconnect and free
384  * the entry for connected peer.
385  *
386  * @param cls the 'struct GNUNET_CORE_Handle*'
387  * @param key the peer identity (not used)
388  * @param value the 'struct PeerRecord' to free.
389  * @return GNUNET_YES (continue)
390  */
391 static int
392 disconnect_and_free_peer_entry (void *cls, const GNUNET_HashCode * key,
393                                 void *value)
394 {
395   struct GNUNET_CORE_Handle *h = cls;
396   struct GNUNET_CORE_TransmitHandle *th;
397   struct PeerRecord *pr = value;
398
399   if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
400   {
401     GNUNET_SCHEDULER_cancel (pr->timeout_task);
402     pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
403   }
404   if (pr->ntr_task != GNUNET_SCHEDULER_NO_TASK)
405   {
406     GNUNET_SCHEDULER_cancel (pr->ntr_task);
407     pr->ntr_task = GNUNET_SCHEDULER_NO_TASK;
408   }
409   if ((pr->prev != NULL) || (pr->next != NULL) || (h->ready_peer_head == pr))
410     GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
411   if (h->disconnects != NULL)
412     h->disconnects (h->cls, &pr->peer);
413   /* all requests should have been cancelled, clean up anyway, just in case */
414   GNUNET_break (pr->queue_size == 0);
415   while (NULL != (th = pr->pending_head))
416   {
417     GNUNET_break (0);
418     GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
419     pr->queue_size--;
420     if (th->cm != NULL)
421       th->cm->th = NULL;
422     GNUNET_free (th);
423   }
424   /* done with 'voluntary' cleanups, now on to normal freeing */
425   GNUNET_assert (GNUNET_YES ==
426                  GNUNET_CONTAINER_multihashmap_remove (h->peers, key, pr));
427   GNUNET_assert (pr->pending_head == NULL);
428   GNUNET_assert (pr->pending_tail == NULL);
429   GNUNET_assert (pr->ch = h);
430   GNUNET_assert (pr->queue_size == 0);
431   GNUNET_assert (pr->timeout_task == GNUNET_SCHEDULER_NO_TASK);
432   GNUNET_assert (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK);
433   GNUNET_free (pr);
434   return GNUNET_YES;
435 }
436
437
438 /**
439  * Close down any existing connection to the CORE service and
440  * try re-establishing it later.
441  *
442  * @param h our handle
443  */
444 static void
445 reconnect_later (struct GNUNET_CORE_Handle *h)
446 {
447   struct ControlMessage *cm;
448   struct PeerRecord *pr;
449
450   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
451   if (NULL != h->cth)
452   {
453     GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
454     h->cth = NULL;
455   }
456   if (h->client != NULL)
457   {
458     GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
459     h->client = NULL;
460   }
461   h->currently_down = GNUNET_YES;
462   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
463   h->reconnect_task =
464       GNUNET_SCHEDULER_add_delayed (h->retry_backoff, &reconnect_task, h);
465   while (NULL != (cm = h->control_pending_head))
466   {
467     GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
468                                  h->control_pending_tail, cm);
469     if (cm->th != NULL)
470       cm->th->cm = NULL;
471     if (cm->cont != NULL)
472       cm->cont (cm->cont_cls, GNUNET_NO);
473     GNUNET_free (cm);
474   }
475   GNUNET_CONTAINER_multihashmap_iterate (h->peers,
476                                          &disconnect_and_free_peer_entry, h);
477   while (NULL != (pr = h->ready_peer_head))
478     GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
479   GNUNET_assert (h->control_pending_head == NULL);
480   h->retry_backoff =
481       GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS, h->retry_backoff);
482   h->retry_backoff = GNUNET_TIME_relative_multiply (h->retry_backoff, 2);
483 }
484
485
486 /**
487  * Check the list of pending requests, send the next
488  * one to the core.
489  *
490  * @param h core handle
491  * @param ignore_currently_down transmit message even if not initialized?
492  */
493 static void
494 trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down);
495
496
497 /**
498  * The given request hit its timeout.  Remove from the
499  * doubly-linked list and call the respective continuation.
500  *
501  * @param cls the transmit handle of the request that timed out
502  * @param tc context, can be NULL (!)
503  */
504 static void
505 transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
506
507
508 /**
509  * Send a control message to the peer asking for transmission
510  * of the message in the given peer record.
511  *
512  * @param pr peer to request transmission to
513  */
514 static void
515 request_next_transmission (struct PeerRecord *pr)
516 {
517   struct GNUNET_CORE_Handle *h = pr->ch;
518   struct ControlMessage *cm;
519   struct SendMessageRequest *smr;
520   struct GNUNET_CORE_TransmitHandle *th;
521
522   if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
523   {
524     GNUNET_SCHEDULER_cancel (pr->timeout_task);
525     pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
526   }
527   if (NULL == (th = pr->pending_head))
528   {
529     trigger_next_request (h, GNUNET_NO);
530     return;
531   }
532   if (th->cm != NULL)
533     return;                     /* already done */
534   GNUNET_assert (pr->prev == NULL);
535   GNUNET_assert (pr->next == NULL);
536   pr->timeout_task =
537       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
538                                     (th->timeout), &transmission_timeout, pr);
539   cm = GNUNET_malloc (sizeof (struct ControlMessage) +
540                       sizeof (struct SendMessageRequest));
541   th->cm = cm;
542   cm->th = th;
543   smr = (struct SendMessageRequest *) &cm[1];
544   smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
545   smr->header.size = htons (sizeof (struct SendMessageRequest));
546   smr->priority = htonl (th->priority);
547   smr->deadline = GNUNET_TIME_absolute_hton (th->timeout);
548   smr->peer = pr->peer;
549   smr->queue_size = htonl (pr->queue_size);
550   smr->size = htons (th->msize);
551   smr->smr_id = htons (th->smr_id = pr->smr_id_gen++);
552   GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head,
553                                     h->control_pending_tail, cm);
554 #if DEBUG_CORE
555   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
556               "Adding SEND REQUEST for peer `%s' to message queue\n",
557               GNUNET_i2s (&pr->peer));
558 #endif
559   trigger_next_request (h, GNUNET_NO);
560 }
561
562
563 /**
564  * The given request hit its timeout.  Remove from the
565  * doubly-linked list and call the respective continuation.
566  *
567  * @param cls the transmit handle of the request that timed out
568  * @param tc context, can be NULL (!)
569  */
570 static void
571 transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
572 {
573   struct PeerRecord *pr = cls;
574   struct GNUNET_CORE_Handle *h = pr->ch;
575   struct GNUNET_CORE_TransmitHandle *th;
576
577   pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
578   th = pr->pending_head;
579   GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
580   pr->queue_size--;
581   if ((pr->prev != NULL) || (pr->next != NULL) || (pr == h->ready_peer_head))
582   {
583     /* the request that was 'approved' by core was
584      * canceled before it could be transmitted; remove
585      * us from the 'ready' list */
586     GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
587   }
588 #if DEBUG_CORE
589   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
590               "Signalling timeout of request for transmission to CORE service\n");
591 #endif
592   request_next_transmission (pr);
593   GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL));
594   GNUNET_free (th);
595 }
596
597
598 /**
599  * Transmit the next message to the core service.
600  */
601 static size_t
602 transmit_message (void *cls, size_t size, void *buf)
603 {
604   struct GNUNET_CORE_Handle *h = cls;
605   struct ControlMessage *cm;
606   struct GNUNET_CORE_TransmitHandle *th;
607   struct PeerRecord *pr;
608   struct SendMessage *sm;
609   const struct GNUNET_MessageHeader *hdr;
610   uint16_t msize;
611   size_t ret;
612
613   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
614   h->cth = NULL;
615   if (buf == NULL)
616   {
617 #if DEBUG_CORE
618     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
619                 "Transmission failed, initiating reconnect\n");
620 #endif
621     reconnect_later (h);
622     return 0;
623   }
624   /* first check for control messages */
625   if (NULL != (cm = h->control_pending_head))
626   {
627     hdr = (const struct GNUNET_MessageHeader *) &cm[1];
628     msize = ntohs (hdr->size);
629     if (size < msize)
630     {
631       trigger_next_request (h, GNUNET_NO);
632       return 0;
633     }
634 #if DEBUG_CORE
635     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
636                 "Transmitting control message with %u bytes of type %u to core.\n",
637                 (unsigned int) msize, (unsigned int) ntohs (hdr->type));
638 #endif
639     memcpy (buf, hdr, msize);
640     GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
641                                  h->control_pending_tail, cm);
642     if (cm->th != NULL)
643       cm->th->cm = NULL;
644     if (NULL != cm->cont)
645       cm->cont (cm->cont_cls, GNUNET_OK);
646     GNUNET_free (cm);
647     trigger_next_request (h, GNUNET_NO);
648     return msize;
649   }
650   /* now check for 'ready' P2P messages */
651   if (NULL != (pr = h->ready_peer_head))
652   {
653     GNUNET_assert (pr->pending_head != NULL);
654     th = pr->pending_head;
655     if (size < th->msize + sizeof (struct SendMessage))
656     {
657       trigger_next_request (h, GNUNET_NO);
658       return 0;
659     }
660     GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
661     GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
662     pr->queue_size--;
663     if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
664     {
665       GNUNET_SCHEDULER_cancel (pr->timeout_task);
666       pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
667     }
668 #if DEBUG_CORE
669     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
670                 "Transmitting SEND request to `%s' with %u bytes.\n",
671                 GNUNET_i2s (&pr->peer), (unsigned int) th->msize);
672 #endif
673     sm = (struct SendMessage *) buf;
674     sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
675     sm->priority = htonl (th->priority);
676     sm->deadline = GNUNET_TIME_absolute_hton (th->timeout);
677     sm->peer = pr->peer;
678     sm->cork = htonl ((uint32_t) th->cork);
679     sm->reserved = htonl (0);
680     ret =
681         th->get_message (th->get_message_cls,
682                          size - sizeof (struct SendMessage), &sm[1]);
683
684 #if DEBUG_CORE
685     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
686                 "Transmitting SEND request to `%s' yielded %u bytes.\n",
687                 GNUNET_i2s (&pr->peer), ret);
688 #endif
689     GNUNET_free (th);
690     if (0 == ret)
691     {
692 #if DEBUG_CORE
693       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
694                   "Size of clients message to peer %s is 0!\n",
695                   GNUNET_i2s (&pr->peer));
696 #endif
697       /* client decided to send nothing! */
698       request_next_transmission (pr);
699       return 0;
700     }
701 #if DEBUG_CORE
702     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
703                 "Produced SEND message to core with %u bytes payload\n",
704                 (unsigned int) ret);
705 #endif
706     GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
707     if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
708     {
709       GNUNET_break (0);
710       request_next_transmission (pr);
711       return 0;
712     }
713     ret += sizeof (struct SendMessage);
714     sm->header.size = htons (ret);
715     GNUNET_assert (ret <= size);
716     request_next_transmission (pr);
717     return ret;
718   }
719   return 0;
720 }
721
722
723 /**
724  * Check the list of pending requests, send the next
725  * one to the core.
726  *
727  * @param h core handle
728  * @param ignore_currently_down transmit message even if not initialized?
729  */
730 static void
731 trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down)
732 {
733   uint16_t msize;
734
735   if ((GNUNET_YES == h->currently_down) && (ignore_currently_down == GNUNET_NO))
736   {
737 #if DEBUG_CORE
738     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
739                 "Core connection down, not processing queue\n");
740 #endif
741     return;
742   }
743   if (NULL != h->cth)
744   {
745 #if DEBUG_CORE
746     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
747                 "Request pending, not processing queue\n");
748 #endif
749     return;
750   }
751   if (h->control_pending_head != NULL)
752     msize =
753         ntohs (((struct GNUNET_MessageHeader *) &h->
754                 control_pending_head[1])->size);
755   else if (h->ready_peer_head != NULL)
756     msize =
757         h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage);
758   else
759   {
760 #if DEBUG_CORE
761     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762                 "Request queue empty, not processing queue\n");
763 #endif
764     return;                     /* no pending message */
765   }
766   h->cth =
767       GNUNET_CLIENT_notify_transmit_ready (h->client, msize,
768                                            GNUNET_TIME_UNIT_FOREVER_REL,
769                                            GNUNET_NO, &transmit_message, h);
770 }
771
772
773 /**
774  * Handler for notification messages received from the core.
775  *
776  * @param cls our "struct GNUNET_CORE_Handle"
777  * @param msg the message received from the core service
778  */
779 static void
780 main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
781 {
782   struct GNUNET_CORE_Handle *h = cls;
783   const struct InitReplyMessage *m;
784   const struct ConnectNotifyMessage *cnm;
785   const struct DisconnectNotifyMessage *dnm;
786   const struct NotifyTrafficMessage *ntm;
787   const struct GNUNET_MessageHeader *em;
788   const struct PeerStatusNotifyMessage *psnm;
789   const struct SendMessageReady *smr;
790   const struct GNUNET_CORE_MessageHandler *mh;
791   GNUNET_CORE_StartupCallback init;
792   struct PeerRecord *pr;
793   struct GNUNET_CORE_TransmitHandle *th;
794   unsigned int hpos;
795   int trigger;
796   uint16_t msize;
797   uint16_t et;
798   uint32_t ats_count;
799
800   if (msg == NULL)
801   {
802     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
803                 _
804                 ("Client was disconnected from core service, trying to reconnect.\n"));
805     reconnect_later (h);
806     return;
807   }
808   msize = ntohs (msg->size);
809 #if DEBUG_CORE > 2
810   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
811               "Processing message of type %u and size %u from core service\n",
812               ntohs (msg->type), msize);
813 #endif
814   switch (ntohs (msg->type))
815   {
816   case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY:
817     if (ntohs (msg->size) != sizeof (struct InitReplyMessage))
818     {
819       GNUNET_break (0);
820       reconnect_later (h);
821       return;
822     }
823     m = (const struct InitReplyMessage *) msg;
824     GNUNET_break (0 == ntohl (m->reserved));
825     /* start our message processing loop */
826     if (GNUNET_YES == h->currently_down)
827     {
828       h->currently_down = GNUNET_NO;
829       trigger_next_request (h, GNUNET_NO);
830     }
831     h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
832     h->me = m->my_identity;
833     if (NULL != (init = h->init))
834     {
835       /* mark so we don't call init on reconnect */
836       h->init = NULL;
837 #if DEBUG_CORE
838       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
839                   "Connected to core service of peer `%s'.\n",
840                   GNUNET_i2s (&h->me));
841 #endif
842       init (h->cls, h, &h->me);
843     }
844     else
845     {
846 #if DEBUG_CORE
847       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
848                   "Successfully reconnected to core service.\n");
849 #endif
850     }
851     /* fake 'connect to self' */
852     pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &h->me.hashPubKey);
853     GNUNET_assert (pr == NULL);
854     pr = GNUNET_malloc (sizeof (struct PeerRecord));
855     pr->peer = h->me;
856     pr->ch = h;
857     GNUNET_assert (GNUNET_YES ==
858                    GNUNET_CONTAINER_multihashmap_put (h->peers,
859                                                       &h->me.hashPubKey, pr,
860                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
861     if (NULL != h->connects)
862       h->connects (h->cls, &h->me, NULL);
863     break;
864   case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT:
865     if (msize < sizeof (struct ConnectNotifyMessage))
866     {
867       GNUNET_break (0);
868       reconnect_later (h);
869       return;
870     }
871     cnm = (const struct ConnectNotifyMessage *) msg;
872     ats_count = ntohl (cnm->ats_count);
873     if ((msize !=
874          sizeof (struct ConnectNotifyMessage) +
875          ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)) ||
876         (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR !=
877          ntohl ((&cnm->ats)[ats_count].type)))
878     {
879       GNUNET_break (0);
880       reconnect_later (h);
881       return;
882     }
883 #if DEBUG_CORE
884     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
885                 "Received notification about connection from `%s'.\n",
886                 GNUNET_i2s (&cnm->peer));
887 #endif
888     if (0 == memcmp (&h->me, &cnm->peer, sizeof (struct GNUNET_PeerIdentity)))
889     {
890       /* connect to self!? */
891       GNUNET_break (0);
892       return;
893     }
894     pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &cnm->peer.hashPubKey);
895     if (pr != NULL)
896     {
897       GNUNET_break (0);
898       reconnect_later (h);
899       return;
900     }
901     pr = GNUNET_malloc (sizeof (struct PeerRecord));
902     pr->peer = cnm->peer;
903     pr->ch = h;
904     GNUNET_assert (GNUNET_YES ==
905                    GNUNET_CONTAINER_multihashmap_put (h->peers,
906                                                       &cnm->peer.hashPubKey, pr,
907                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
908     if (NULL != h->connects)
909       h->connects (h->cls, &cnm->peer, &cnm->ats);
910     break;
911   case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT:
912     if (msize != sizeof (struct DisconnectNotifyMessage))
913     {
914       GNUNET_break (0);
915       reconnect_later (h);
916       return;
917     }
918     dnm = (const struct DisconnectNotifyMessage *) msg;
919     if (0 == memcmp (&h->me, &dnm->peer, sizeof (struct GNUNET_PeerIdentity)))
920     {
921       /* connection to self!? */
922       GNUNET_break (0);
923       return;
924     }
925     GNUNET_break (0 == ntohl (dnm->reserved));
926 #if DEBUG_CORE
927     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
928                 "Received notification about disconnect from `%s'.\n",
929                 GNUNET_i2s (&dnm->peer));
930 #endif
931     pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &dnm->peer.hashPubKey);
932     if (pr == NULL)
933     {
934       GNUNET_break (0);
935       reconnect_later (h);
936       return;
937     }
938     trigger = ((pr->prev != NULL) || (pr->next != NULL) ||
939                (h->ready_peer_head == pr));
940     disconnect_and_free_peer_entry (h, &dnm->peer.hashPubKey, pr);
941     if (trigger)
942       trigger_next_request (h, GNUNET_NO);
943     break;
944   case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE:
945     if (NULL == h->status_events)
946     {
947       GNUNET_break (0);
948       return;
949     }
950     if (msize < sizeof (struct PeerStatusNotifyMessage))
951     {
952       GNUNET_break (0);
953       reconnect_later (h);
954       return;
955     }
956     psnm = (const struct PeerStatusNotifyMessage *) msg;
957     if (0 == memcmp (&h->me, &psnm->peer, sizeof (struct GNUNET_PeerIdentity)))
958     {
959       /* self-change!? */
960       GNUNET_break (0);
961       return;
962     }
963     ats_count = ntohl (psnm->ats_count);
964     if ((msize !=
965          sizeof (struct PeerStatusNotifyMessage) +
966          ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)) ||
967         (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR !=
968          ntohl ((&psnm->ats)[ats_count].type)))
969     {
970       GNUNET_break (0);
971       reconnect_later (h);
972       return;
973     }
974 #if DEBUG_CORE > 1
975     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
976                 "Received notification about status change by `%s'.\n",
977                 GNUNET_i2s (&psnm->peer));
978 #endif
979     pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &psnm->peer.hashPubKey);
980     if (pr == NULL)
981     {
982       GNUNET_break (0);
983       reconnect_later (h);
984       return;
985     }
986     h->status_events (h->cls, &psnm->peer, psnm->bandwidth_in,
987                       psnm->bandwidth_out,
988                       GNUNET_TIME_absolute_ntoh (psnm->timeout), &psnm->ats);
989     break;
990   case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND:
991     if (msize < sizeof (struct NotifyTrafficMessage))
992     {
993       GNUNET_break (0);
994       reconnect_later (h);
995       return;
996     }
997     ntm = (const struct NotifyTrafficMessage *) msg;
998
999     ats_count = ntohl (ntm->ats_count);
1000     if ((msize <
1001          sizeof (struct NotifyTrafficMessage) +
1002          ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information) +
1003          sizeof (struct GNUNET_MessageHeader)) ||
1004         (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR !=
1005          ntohl ((&ntm->ats)[ats_count].type)))
1006     {
1007       GNUNET_break (0);
1008       reconnect_later (h);
1009       return;
1010     }
1011     em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count + 1];
1012 #if DEBUG_CORE
1013     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1014                 "Received message of type %u and size %u from peer `%4s'\n",
1015                 ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer));
1016 #endif
1017     pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey);
1018     if (pr == NULL)
1019     {
1020       GNUNET_break (0);
1021       reconnect_later (h);
1022       return;
1023     }
1024     if ((GNUNET_NO == h->inbound_hdr_only) &&
1025         (msize !=
1026          ntohs (em->size) + sizeof (struct NotifyTrafficMessage) +
1027          +ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)))
1028     {
1029       GNUNET_break (0);
1030       reconnect_later (h);
1031       return;
1032     }
1033     et = ntohs (em->type);
1034     for (hpos = 0; hpos < h->hcnt; hpos++)
1035     {
1036       mh = &h->handlers[hpos];
1037       if (mh->type != et)
1038         continue;
1039       if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0))
1040       {
1041         GNUNET_break (0);
1042         continue;
1043       }
1044       if (GNUNET_OK !=
1045           h->handlers[hpos].callback (h->cls, &ntm->peer, em, &ntm->ats))
1046       {
1047         /* error in processing, do not process other messages! */
1048         break;
1049       }
1050     }
1051     if (NULL != h->inbound_notify)
1052       h->inbound_notify (h->cls, &ntm->peer, em, &ntm->ats);
1053     break;
1054   case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND:
1055     if (msize < sizeof (struct NotifyTrafficMessage))
1056     {
1057       GNUNET_break (0);
1058       reconnect_later (h);
1059       return;
1060     }
1061     ntm = (const struct NotifyTrafficMessage *) msg;
1062     if (0 == memcmp (&h->me, &ntm->peer, sizeof (struct GNUNET_PeerIdentity)))
1063     {
1064       /* self-change!? */
1065       GNUNET_break (0);
1066       return;
1067     }
1068     ats_count = ntohl (ntm->ats_count);
1069     if ((msize <
1070          sizeof (struct NotifyTrafficMessage) +
1071          ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information) +
1072          sizeof (struct GNUNET_MessageHeader)) ||
1073         (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR !=
1074          ntohl ((&ntm->ats)[ats_count].type)))
1075     {
1076       GNUNET_break (0);
1077       reconnect_later (h);
1078       return;
1079     }
1080     em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count + 1];
1081     pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey);
1082     if (pr == NULL)
1083     {
1084       GNUNET_break (0);
1085       reconnect_later (h);
1086       return;
1087     }
1088 #if DEBUG_CORE
1089     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1090                 "Received notification about transmission to `%s'.\n",
1091                 GNUNET_i2s (&ntm->peer));
1092 #endif
1093     if ((GNUNET_NO == h->outbound_hdr_only) &&
1094         (msize !=
1095          ntohs (em->size) + sizeof (struct NotifyTrafficMessage) +
1096          ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)))
1097     {
1098       GNUNET_break (0);
1099       reconnect_later (h);
1100       return;
1101     }
1102     if (NULL == h->outbound_notify)
1103     {
1104       GNUNET_break (0);
1105       break;
1106     }
1107     h->outbound_notify (h->cls, &ntm->peer, em, &ntm->ats);
1108     break;
1109   case GNUNET_MESSAGE_TYPE_CORE_SEND_READY:
1110     if (msize != sizeof (struct SendMessageReady))
1111     {
1112       GNUNET_break (0);
1113       reconnect_later (h);
1114       return;
1115     }
1116     smr = (const struct SendMessageReady *) msg;
1117     pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &smr->peer.hashPubKey);
1118     if (pr == NULL)
1119     {
1120       GNUNET_break (0);
1121       reconnect_later (h);
1122       return;
1123     }
1124 #if DEBUG_CORE
1125     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1126                 "Received notification about transmission readiness to `%s'.\n",
1127                 GNUNET_i2s (&smr->peer));
1128 #endif
1129     if (pr->pending_head == NULL)
1130     {
1131       /* request must have been cancelled between the original request
1132        * and the response from core, ignore core's readiness */
1133       break;
1134     }
1135
1136     th = pr->pending_head;
1137     if (ntohs (smr->smr_id) != th->smr_id)
1138     {
1139       /* READY message is for expired or cancelled message,
1140        * ignore! (we should have already sent another request) */
1141       break;
1142     }
1143     if ((pr->prev != NULL) || (pr->next != NULL) || (h->ready_peer_head == pr))
1144     {
1145       /* we should not already be on the ready list... */
1146       GNUNET_break (0);
1147       reconnect_later (h);
1148       return;
1149     }
1150     GNUNET_CONTAINER_DLL_insert (h->ready_peer_head, h->ready_peer_tail, pr);
1151     trigger_next_request (h, GNUNET_NO);
1152     break;
1153   default:
1154     reconnect_later (h);
1155     return;
1156   }
1157   GNUNET_CLIENT_receive (h->client, &main_notify_handler, h,
1158                          GNUNET_TIME_UNIT_FOREVER_REL);
1159 }
1160
1161
1162 /**
1163  * Task executed once we are done transmitting the INIT message.
1164  * Starts our 'receive' loop.
1165  *
1166  * @param cls the 'struct GNUNET_CORE_Handle'
1167  * @param success were we successful
1168  */
1169 static void
1170 init_done_task (void *cls, int success)
1171 {
1172   struct GNUNET_CORE_Handle *h = cls;
1173
1174   if (success == GNUNET_SYSERR)
1175     return;                     /* shutdown */
1176   if (success == GNUNET_NO)
1177   {
1178 #if DEBUG_CORE
1179     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1180                 "Failed to exchange INIT with core, retrying\n");
1181 #endif
1182     if (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK)
1183       reconnect_later (h);
1184     return;
1185   }
1186   GNUNET_CLIENT_receive (h->client, &main_notify_handler, h,
1187                          GNUNET_TIME_UNIT_FOREVER_REL);
1188 }
1189
1190
1191 /**
1192  * Our current client connection went down.  Clean it up
1193  * and try to reconnect!
1194  *
1195  * @param h our handle to the core service
1196  */
1197 static void
1198 reconnect (struct GNUNET_CORE_Handle *h)
1199 {
1200   struct ControlMessage *cm;
1201   struct InitMessage *init;
1202   uint32_t opt;
1203   uint16_t msize;
1204   uint16_t *ts;
1205   unsigned int hpos;
1206
1207 #if DEBUG_CORE
1208   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting to CORE service\n");
1209 #endif
1210   GNUNET_assert (h->client == NULL);
1211   GNUNET_assert (h->currently_down == GNUNET_YES);
1212   h->client = GNUNET_CLIENT_connect ("core", h->cfg);
1213   if (h->client == NULL)
1214   {
1215     reconnect_later (h);
1216     return;
1217   }
1218   msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage);
1219   cm = GNUNET_malloc (sizeof (struct ControlMessage) + msize);
1220   cm->cont = &init_done_task;
1221   cm->cont_cls = h;
1222   init = (struct InitMessage *) &cm[1];
1223   init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT);
1224   init->header.size = htons (msize);
1225   opt = GNUNET_CORE_OPTION_SEND_CONNECT | GNUNET_CORE_OPTION_SEND_DISCONNECT;
1226   if (h->status_events != NULL)
1227     opt |= GNUNET_CORE_OPTION_SEND_STATUS_CHANGE;
1228   if (h->inbound_notify != NULL)
1229   {
1230     if (h->inbound_hdr_only)
1231       opt |= GNUNET_CORE_OPTION_SEND_HDR_INBOUND;
1232     else
1233       opt |= GNUNET_CORE_OPTION_SEND_FULL_INBOUND;
1234   }
1235   if (h->outbound_notify != NULL)
1236   {
1237     if (h->outbound_hdr_only)
1238       opt |= GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND;
1239     else
1240       opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND;
1241   }
1242   init->options = htonl (opt);
1243   ts = (uint16_t *) & init[1];
1244   for (hpos = 0; hpos < h->hcnt; hpos++)
1245     ts[hpos] = htons (h->handlers[hpos].type);
1246   GNUNET_CONTAINER_DLL_insert (h->control_pending_head, h->control_pending_tail,
1247                                cm);
1248   trigger_next_request (h, GNUNET_YES);
1249 }
1250
1251
1252
1253 /**
1254  * Connect to the core service.  Note that the connection may
1255  * complete (or fail) asynchronously.
1256  *
1257  * @param cfg configuration to use
1258  * @param queue_size size of the per-peer message queue
1259  * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
1260  * @param init callback to call on timeout or once we have successfully
1261  *        connected to the core service; note that timeout is only meaningful if init is not NULL
1262  * @param connects function to call on peer connect, can be NULL
1263  * @param disconnects function to call on peer disconnect / timeout, can be NULL
1264  * @param status_events function to call on changes to peer connection status, can be NULL
1265  * @param inbound_notify function to call for all inbound messages, can be NULL
1266  * @param inbound_hdr_only set to GNUNET_YES if inbound_notify will only read the
1267  *                GNUNET_MessageHeader and hence we do not need to give it the full message;
1268  *                can be used to improve efficiency, ignored if inbound_notify is NULLL
1269  * @param outbound_notify function to call for all outbound messages, can be NULL
1270  * @param outbound_hdr_only set to GNUNET_YES if outbound_notify will only read the
1271  *                GNUNET_MessageHeader and hence we do not need to give it the full message
1272  *                can be used to improve efficiency, ignored if outbound_notify is NULLL
1273  * @param handlers callbacks for messages we care about, NULL-terminated
1274  * @return handle to the core service (only useful for disconnect until 'init' is called);
1275  *                NULL on error (in this case, init is never called)
1276  */
1277 struct GNUNET_CORE_Handle *
1278 GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1279                      unsigned int queue_size, void *cls,
1280                      GNUNET_CORE_StartupCallback init,
1281                      GNUNET_CORE_ConnectEventHandler connects,
1282                      GNUNET_CORE_DisconnectEventHandler disconnects,
1283                      GNUNET_CORE_PeerStatusEventHandler status_events,
1284                      GNUNET_CORE_MessageCallback inbound_notify,
1285                      int inbound_hdr_only,
1286                      GNUNET_CORE_MessageCallback outbound_notify,
1287                      int outbound_hdr_only,
1288                      const struct GNUNET_CORE_MessageHandler *handlers)
1289 {
1290   struct GNUNET_CORE_Handle *h;
1291
1292   h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle));
1293   h->cfg = cfg;
1294   h->queue_size = queue_size;
1295   h->cls = cls;
1296   h->init = init;
1297   h->connects = connects;
1298   h->disconnects = disconnects;
1299   h->status_events = status_events;
1300   h->inbound_notify = inbound_notify;
1301   h->outbound_notify = outbound_notify;
1302   h->inbound_hdr_only = inbound_hdr_only;
1303   h->outbound_hdr_only = outbound_hdr_only;
1304   h->handlers = handlers;
1305   h->hcnt = 0;
1306   h->currently_down = GNUNET_YES;
1307   h->peers = GNUNET_CONTAINER_multihashmap_create (128);
1308   h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1309   if (NULL != handlers)
1310     while (handlers[h->hcnt].callback != NULL)
1311       h->hcnt++;
1312   GNUNET_assert (h->hcnt <
1313                  (GNUNET_SERVER_MAX_MESSAGE_SIZE -
1314                   sizeof (struct InitMessage)) / sizeof (uint16_t));
1315 #if DEBUG_CORE
1316   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service\n");
1317 #endif
1318   reconnect (h);
1319   return h;
1320 }
1321
1322
1323 /**
1324  * Disconnect from the core service.  This function can only
1325  * be called *after* all pending 'GNUNET_CORE_notify_transmit_ready'
1326  * requests have been explicitly canceled.
1327  *
1328  * @param handle connection to core to disconnect
1329  */
1330 void
1331 GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
1332 {
1333   struct ControlMessage *cm;
1334
1335 #if DEBUG_CORE
1336   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from CORE service\n");
1337 #endif
1338   if (handle->cth != NULL)
1339   {
1340     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth);
1341     handle->cth = NULL;
1342   }
1343   while (NULL != (cm = handle->control_pending_head))
1344   {
1345     GNUNET_CONTAINER_DLL_remove (handle->control_pending_head,
1346                                  handle->control_pending_tail, cm);
1347     if (cm->th != NULL)
1348       cm->th->cm = NULL;
1349     if (cm->cont != NULL)
1350       cm->cont (cm->cont_cls, GNUNET_SYSERR);
1351     GNUNET_free (cm);
1352   }
1353   if (handle->client != NULL)
1354   {
1355     GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
1356     handle->client = NULL;
1357   }
1358   GNUNET_CONTAINER_multihashmap_iterate (handle->peers,
1359                                          &disconnect_and_free_peer_entry,
1360                                          handle);
1361   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1362   {
1363     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1364     handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1365   }
1366   GNUNET_CONTAINER_multihashmap_destroy (handle->peers);
1367   handle->peers = NULL;
1368   GNUNET_break (handle->ready_peer_head == NULL);
1369   GNUNET_free (handle);
1370 }
1371
1372
1373 /**
1374  * Task that calls 'request_next_transmission'.
1375  *
1376  * @param cls the 'struct PeerRecord*'
1377  * @param tc scheduler context
1378  */
1379 static void
1380 run_request_next_transmission (void *cls,
1381                                const struct GNUNET_SCHEDULER_TaskContext *tc)
1382 {
1383   struct PeerRecord *pr = cls;
1384
1385   pr->ntr_task = GNUNET_SCHEDULER_NO_TASK;
1386   request_next_transmission (pr);
1387 }
1388
1389
1390 /**
1391  * Ask the core to call "notify" once it is ready to transmit the
1392  * given number of bytes to the specified "target".    Must only be
1393  * called after a connection to the respective peer has been
1394  * established (and the client has been informed about this).
1395  *
1396  * @param handle connection to core service
1397  * @param cork is corking allowed for this transmission?
1398  * @param priority how important is the message?
1399  * @param maxdelay how long can the message wait?
1400  * @param target who should receive the message,
1401  *        use NULL for this peer (loopback)
1402  * @param notify_size how many bytes of buffer space does notify want?
1403  * @param notify function to call when buffer space is available
1404  * @param notify_cls closure for notify
1405  * @return non-NULL if the notify callback was queued,
1406  *         NULL if we can not even queue the request (insufficient
1407  *         memory); if NULL is returned, "notify" will NOT be called.
1408  */
1409 struct GNUNET_CORE_TransmitHandle *
1410 GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork,
1411                                    uint32_t priority,
1412                                    struct GNUNET_TIME_Relative maxdelay,
1413                                    const struct GNUNET_PeerIdentity *target,
1414                                    size_t notify_size,
1415                                    GNUNET_CONNECTION_TransmitReadyNotify notify,
1416                                    void *notify_cls)
1417 {
1418   struct PeerRecord *pr;
1419   struct GNUNET_CORE_TransmitHandle *th;
1420   struct GNUNET_CORE_TransmitHandle *pos;
1421   struct GNUNET_CORE_TransmitHandle *prev;
1422   struct GNUNET_CORE_TransmitHandle *minp;
1423
1424   pr = GNUNET_CONTAINER_multihashmap_get (handle->peers, &target->hashPubKey);
1425   if (NULL == pr)
1426   {
1427     /* attempt to send to peer that is not connected */
1428     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1429                 "Attempting to send to peer `%s' from peer `%s', but not connected!\n",
1430                 GNUNET_i2s (target), GNUNET_h2s (&handle->me.hashPubKey));
1431     GNUNET_break (0);
1432     return NULL;
1433   }
1434   GNUNET_assert (notify_size + sizeof (struct SendMessage) <
1435                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1436   th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle));
1437   th->peer = pr;
1438   GNUNET_assert (NULL != notify);
1439   th->get_message = notify;
1440   th->get_message_cls = notify_cls;
1441   th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
1442   th->priority = priority;
1443   th->msize = notify_size;
1444   th->cork = cork;
1445   /* bound queue size */
1446   if (pr->queue_size == handle->queue_size)
1447   {
1448     /* find lowest-priority entry, but skip the head of the list */
1449     minp = pr->pending_head->next;
1450     prev = minp;
1451     while (prev != NULL)
1452     {
1453       if (prev->priority < minp->priority)
1454         minp = prev;
1455       prev = prev->next;
1456     }
1457     if (minp == NULL)
1458     {
1459       GNUNET_break (handle->queue_size != 0);
1460       GNUNET_break (pr->queue_size == 1);
1461       GNUNET_free (th);
1462 #if DEBUG_CORE
1463       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1464                   "Dropping transmission request: cannot drop queue head and limit is one\n");
1465 #endif
1466       return NULL;
1467     }
1468     if (priority <= minp->priority)
1469     {
1470 #if DEBUG_CORE
1471       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1472                   "Dropping transmission request: priority too low\n");
1473 #endif
1474       GNUNET_free (th);
1475       return NULL;              /* priority too low */
1476     }
1477     GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, minp);
1478     pr->queue_size--;
1479     GNUNET_assert (0 == minp->get_message (minp->get_message_cls, 0, NULL));
1480     GNUNET_free (minp);
1481   }
1482
1483   /* Order entries by deadline, but SKIP 'HEAD' if
1484    * we're in the 'ready_peer_*' DLL */
1485   pos = pr->pending_head;
1486   if ((pr->prev != NULL) || (pr->next != NULL) ||
1487       (pr == handle->ready_peer_head))
1488   {
1489     GNUNET_assert (pos != NULL);
1490     pos = pos->next;            /* skip head */
1491   }
1492
1493   /* insertion sort */
1494   prev = pos;
1495   while ((pos != NULL) && (pos->timeout.abs_value < th->timeout.abs_value))
1496   {
1497     prev = pos;
1498     pos = pos->next;
1499   }
1500   GNUNET_CONTAINER_DLL_insert_after (pr->pending_head, pr->pending_tail, prev,
1501                                      th);
1502   pr->queue_size++;
1503   /* was the request queue previously empty? */
1504 #if DEBUG_CORE
1505   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission request added to queue\n");
1506 #endif
1507   if ((pr->pending_head == th) && (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK) &&
1508       (pr->next == NULL) && (pr->prev == NULL) &&
1509       (handle->ready_peer_head != pr))
1510     pr->ntr_task =
1511         GNUNET_SCHEDULER_add_now (&run_request_next_transmission, pr);
1512   return th;
1513 }
1514
1515
1516 /**
1517  * Cancel the specified transmission-ready notification.
1518  *
1519  * @param th handle that was returned by "notify_transmit_ready".
1520  */
1521 void
1522 GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th)
1523 {
1524   struct PeerRecord *pr = th->peer;
1525   struct GNUNET_CORE_Handle *h = pr->ch;
1526   int was_head;
1527
1528   was_head = (pr->pending_head == th);
1529   GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
1530   pr->queue_size--;
1531   if (th->cm != NULL)
1532   {
1533     /* we're currently in the control queue, remove */
1534     GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
1535                                  h->control_pending_tail, th->cm);
1536     GNUNET_free (th->cm);
1537   }
1538   GNUNET_free (th);
1539   if (was_head)
1540   {
1541     if ((pr->prev != NULL) || (pr->next != NULL) || (pr == h->ready_peer_head))
1542     {
1543       /* the request that was 'approved' by core was
1544        * canceled before it could be transmitted; remove
1545        * us from the 'ready' list */
1546       GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
1547     }
1548     request_next_transmission (pr);
1549   }
1550 }
1551
1552
1553 /* end of core_api.c */