fixing leak
[oweals/gnunet.git] / src / core / core_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/core_api.c
23  * @brief core service; this is the main API for encrypted P2P
24  *        communications
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_core_service.h"
30 #include "core.h"
31
32
33 /**
34  * Information we track for each peer.
35  */
36 struct PeerRecord
37 {
38
39   /**
40    * We generally do NOT keep peer records in a DLL; this
41    * DLL is only used IF this peer's 'pending_head' message
42    * is ready for transmission.  
43    */
44   struct PeerRecord *prev;
45
46   /**
47    * We generally do NOT keep peer records in a DLL; this
48    * DLL is only used IF this peer's 'pending_head' message
49    * is ready for transmission. 
50    */
51   struct PeerRecord *next;
52
53   /**
54    * Peer the record is about.
55    */
56   struct GNUNET_PeerIdentity peer;
57
58   /**
59    * Corresponding core handle.
60    */
61   struct GNUNET_CORE_Handle *ch;
62
63   /**
64    * Head of doubly-linked list of pending requests.
65    * Requests are sorted by deadline *except* for HEAD,
66    * which is only modified upon transmission to core.
67    */
68   struct GNUNET_CORE_TransmitHandle *pending_head;
69
70   /**
71    * Tail of doubly-linked list of pending requests.
72    */
73   struct GNUNET_CORE_TransmitHandle *pending_tail;
74
75   /**
76    * Pending callback waiting for peer information, or NULL for none.
77    */
78   GNUNET_CORE_PeerConfigurationInfoCallback pcic;
79
80   /**
81    * Closure for pcic.
82    */
83   void *pcic_cls;
84
85   /**
86    * Pointer to free when we call pcic.
87    */
88   void *pcic_ptr;
89
90   /**
91    * Request information ID for the given pcic (needed in case a
92    * request is cancelled after being submitted to core and a new
93    * one is generated; in this case, we need to avoid matching the
94    * reply to the first (cancelled) request to the second request).
95    */
96   uint32_t rim_id;
97
98   /**
99    * ID of timeout task for the 'pending_head' handle
100    * which is the one with the smallest timeout. 
101    */
102   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
103
104   /**
105    * Current size of the queue of pending requests.
106    */
107   unsigned int queue_size;
108
109   /**
110    * SendMessageRequest ID generator for this peer.
111    */
112   uint16_t smr_id_gen;
113   
114 };
115
116
117 /**
118  * Entry in a doubly-linked list of control messages to be transmitted
119  * to the core service.  Control messages include traffic allocation,
120  * connection requests and of course our initial 'init' request.
121  * 
122  * The actual message is allocated at the end of this struct.
123  */
124 struct ControlMessage
125 {
126   /**
127    * This is a doubly-linked list.
128    */
129   struct ControlMessage *next;
130
131   /**
132    * This is a doubly-linked list.
133    */
134   struct ControlMessage *prev;
135
136   /**
137    * Function to run after transmission failed/succeeded.
138    */
139   GNUNET_CORE_ControlContinuation cont;
140   
141   /**
142    * Closure for 'cont'.
143    */
144   void *cont_cls;
145
146   /**
147    * Transmit handle (if one is associated with this ControlMessage), or NULL.
148    */
149   struct GNUNET_CORE_TransmitHandle *th;
150 };
151
152
153
154 /**
155  * Context for the core service connection.
156  */
157 struct GNUNET_CORE_Handle
158 {
159
160   /**
161    * Configuration we're using.
162    */
163   const struct GNUNET_CONFIGURATION_Handle *cfg;
164
165   /**
166    * Closure for the various callbacks.
167    */
168   void *cls;
169
170   /**
171    * Function to call once we've handshaked with the core service.
172    */
173   GNUNET_CORE_StartupCallback init;
174
175   /**
176    * Function to call whenever we're notified about a peer connecting.
177    */
178   GNUNET_CORE_ConnectEventHandler connects;
179
180   /**
181    * Function to call whenever we're notified about a peer disconnecting.
182    */
183   GNUNET_CORE_DisconnectEventHandler disconnects;
184
185   /**
186    * Function to call whenever we're notified about a peer changing status.
187    */  
188   GNUNET_CORE_PeerStatusEventHandler status_events;
189   
190   /**
191    * Function to call whenever we receive an inbound message.
192    */
193   GNUNET_CORE_MessageCallback inbound_notify;
194
195   /**
196    * Function to call whenever we receive an outbound message.
197    */
198   GNUNET_CORE_MessageCallback outbound_notify;
199
200   /**
201    * Function handlers for messages of particular type.
202    */
203   const struct GNUNET_CORE_MessageHandler *handlers;
204
205   /**
206    * Our connection to the service.
207    */
208   struct GNUNET_CLIENT_Connection *client;
209
210   /**
211    * Handle for our current transmission request.
212    */
213   struct GNUNET_CLIENT_TransmitHandle *cth;
214
215   /**
216    * Head of doubly-linked list of pending requests.
217    */
218   struct ControlMessage *control_pending_head;
219
220   /**
221    * Tail of doubly-linked list of pending requests.
222    */
223   struct ControlMessage *control_pending_tail;
224
225   /**
226    * Head of doubly-linked list of peers that are core-approved
227    * to send their next message.
228    */
229   struct PeerRecord *ready_peer_head;
230
231   /**
232    * Tail of doubly-linked list of peers that are core-approved
233    * to send their next message.
234    */
235   struct PeerRecord *ready_peer_tail;
236
237   /**
238    * Hash map listing all of the peers that we are currently
239    * connected to.
240    */
241   struct GNUNET_CONTAINER_MultiHashMap *peers;
242
243   /**
244    * Identity of this peer.
245    */
246   struct GNUNET_PeerIdentity me;
247
248   /**
249    * ID of reconnect task (if any).
250    */
251   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
252
253   /**
254    * Current delay we use for re-trying to connect to core.
255    */
256   struct GNUNET_TIME_Relative retry_backoff;
257
258   /**
259    * Request information ID generator.
260    */
261   uint32_t rim_id_gen;
262
263   /**
264    * Number of messages we are allowed to queue per target.
265    */
266   unsigned int queue_size;
267
268   /**
269    * Number of entries in the handlers array.
270    */
271   unsigned int hcnt;
272
273   /**
274    * For inbound notifications without a specific handler, do
275    * we expect to only receive headers?
276    */
277   int inbound_hdr_only;
278
279   /**
280    * For outbound notifications without a specific handler, do
281    * we expect to only receive headers?
282    */
283   int outbound_hdr_only;
284
285   /**
286    * Are we currently disconnected and hence unable to forward
287    * requests?
288    */
289   int currently_down;
290
291 };
292
293
294 /**
295  * Handle for a transmission request.
296  */
297 struct GNUNET_CORE_TransmitHandle
298 {
299
300   /**
301    * We keep active transmit handles in a doubly-linked list.
302    */
303   struct GNUNET_CORE_TransmitHandle *next;
304
305   /**
306    * We keep active transmit handles in a doubly-linked list.
307    */
308   struct GNUNET_CORE_TransmitHandle *prev;
309
310   /**
311    * Corresponding peer record.
312    */
313   struct PeerRecord *peer;
314
315   /**
316    * Corresponding SEND_REQUEST message.  Only non-NULL 
317    * while SEND_REQUEST message is pending.
318    */
319   struct ControlMessage *cm;
320
321   /**
322    * Function that will be called to get the actual request
323    * (once we are ready to transmit this request to the core).
324    * The function will be called with a NULL buffer to signal
325    * timeout.
326    */
327   GNUNET_CONNECTION_TransmitReadyNotify get_message;
328
329   /**
330    * Closure for get_message.
331    */
332   void *get_message_cls;
333
334   /**
335    * Timeout for this handle.
336    */
337   struct GNUNET_TIME_Absolute timeout;
338
339   /**
340    * How important is this message?
341    */
342   uint32_t priority;
343
344   /**
345    * Size of this request.
346    */
347   uint16_t msize;
348
349   /**
350    * Send message request ID for this request.
351    */
352   uint16_t smr_id;
353
354   /**
355    * Is corking allowed?
356    */
357   int cork;
358
359 };
360
361
362 /**
363  * Our current client connection went down.  Clean it up
364  * and try to reconnect!
365  *
366  * @param h our handle to the core service
367  */
368 static void
369 reconnect (struct GNUNET_CORE_Handle *h);
370
371
372 /**
373  * Task schedule to try to re-connect to core.
374  *
375  * @param cls the 'struct GNUNET_CORE_Handle'
376  * @param tc task context
377  */
378 static void
379 reconnect_task (void *cls, 
380                 const struct GNUNET_SCHEDULER_TaskContext *tc)
381 {
382   struct GNUNET_CORE_Handle *h = cls;
383
384   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
385 #if DEBUG_CORE
386   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
387               "Connecting to CORE service after delay\n");
388 #endif
389   reconnect (h);
390 }
391
392
393 /**
394  * Notify clients about disconnect and free 
395  * the entry for connected peer.
396  *
397  * @param cls the 'struct GNUNET_CORE_Handle*'
398  * @param key the peer identity (not used)
399  * @param value the 'struct PeerRecord' to free.
400  * @return GNUNET_YES (continue)
401  */
402 static int
403 disconnect_and_free_peer_entry (void *cls,
404                                 const GNUNET_HashCode *key,
405                                 void *value)
406 {
407   static struct GNUNET_BANDWIDTH_Value32NBO zero;
408   struct GNUNET_CORE_Handle *h = cls;
409   struct GNUNET_CORE_TransmitHandle *th;
410   struct PeerRecord *pr = value;
411   GNUNET_CORE_PeerConfigurationInfoCallback pcic;
412
413   while (NULL != (th = pr->pending_head))
414     {
415       GNUNET_CONTAINER_DLL_remove (pr->pending_head,
416                                    pr->pending_tail,
417                                    th);
418       pr->queue_size--;
419       GNUNET_assert (0 == 
420                      th->get_message (th->get_message_cls,
421                                       0, NULL));
422       GNUNET_free (th);
423     }
424   if (NULL != (pcic = pr->pcic))
425     {
426       pr->pcic = NULL;
427       GNUNET_free_non_null (pr->pcic_ptr);
428       pr->pcic_ptr = NULL;
429       pcic (pr->pcic_cls,
430             &pr->peer,
431             zero,
432             0, 
433             GNUNET_TIME_UNIT_FOREVER_REL,
434             0);
435     }
436   if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
437     {
438       GNUNET_SCHEDULER_cancel (pr->timeout_task);
439       pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
440     }
441   GNUNET_assert (pr->queue_size == 0);
442   if ( (pr->prev != NULL) ||
443        (pr->next != NULL) ||
444        (h->ready_peer_head == pr) )
445     GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
446                                  h->ready_peer_tail,
447                                  pr);
448   if (h->disconnects != NULL)
449     h->disconnects (h->cls,
450                     &pr->peer);    
451   GNUNET_assert (GNUNET_YES ==
452                  GNUNET_CONTAINER_multihashmap_remove (h->peers,
453                                                        key,
454                                                        pr));
455   GNUNET_assert (pr->pending_head == NULL);
456   GNUNET_assert (pr->pending_tail == NULL);
457   GNUNET_assert (pr->ch = h);
458   GNUNET_assert (pr->queue_size == 0);
459   GNUNET_assert (pr->timeout_task == GNUNET_SCHEDULER_NO_TASK);
460   GNUNET_free (pr);  
461   return GNUNET_YES;
462 }
463
464
465 /**
466  * Close down any existing connection to the CORE service and
467  * try re-establishing it later.
468  *
469  * @param h our handle
470  */
471 static void
472 reconnect_later (struct GNUNET_CORE_Handle *h)
473 {
474   struct ControlMessage *cm;
475   struct PeerRecord *pr;
476
477   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
478   if (h->client != NULL)
479     {
480       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
481       h->client = NULL;
482       h->cth = NULL;
483       GNUNET_CONTAINER_multihashmap_iterate (h->peers,
484                                              &disconnect_and_free_peer_entry,
485                                              h);
486     }
487   while (NULL != (pr = h->ready_peer_head))    
488     GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
489                                  h->ready_peer_tail,
490                                  pr);
491   h->currently_down = GNUNET_YES;
492   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_backoff,
493                                                     &reconnect_task,
494                                                     h);
495   while (NULL != (cm = h->control_pending_head))
496     {
497       GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
498                                    h->control_pending_tail,
499                                    cm);
500       if (cm->th != NULL)
501         cm->th->cm = NULL; 
502       if (cm->cont != NULL)
503         cm->cont (cm->cont_cls, GNUNET_NO);
504       GNUNET_free (cm);
505     }
506   GNUNET_assert (h->control_pending_head == NULL);
507   h->retry_backoff = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
508                                                h->retry_backoff);
509   h->retry_backoff = GNUNET_TIME_relative_multiply (h->retry_backoff, 2);
510 }
511
512
513 /**
514  * Check the list of pending requests, send the next
515  * one to the core.
516  *
517  * @param h core handle
518  * @param ignore_currently_down transmit message even if not initialized?
519  */
520 static void
521 trigger_next_request (struct GNUNET_CORE_Handle *h,
522                       int ignore_currently_down);
523
524
525 /**
526  * The given request hit its timeout.  Remove from the
527  * doubly-linked list and call the respective continuation.
528  *
529  * @param cls the transmit handle of the request that timed out
530  * @param tc context, can be NULL (!)
531  */
532 static void
533 transmission_timeout (void *cls, 
534                       const struct GNUNET_SCHEDULER_TaskContext *tc);
535
536
537 /**
538  * Send a control message to the peer asking for transmission
539  * of the message in the given peer record.
540  *
541  * @param pr peer to request transmission to
542  */
543 static void
544 request_next_transmission (struct PeerRecord *pr)
545 {
546   struct GNUNET_CORE_Handle *h = pr->ch;
547   struct ControlMessage *cm;
548   struct SendMessageRequest *smr;
549   struct GNUNET_CORE_TransmitHandle *th;
550
551   if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
552     {
553       GNUNET_SCHEDULER_cancel (pr->timeout_task);
554       pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
555     }
556   if (NULL == (th = pr->pending_head))
557     {
558       trigger_next_request (h, GNUNET_NO);
559       return;
560     }
561   if (th->cm != NULL)
562     return; /* already done */
563   GNUNET_assert (pr->prev == NULL);
564   GNUNET_assert (pr->next == NULL);
565   pr->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (th->timeout),
566                                                    &transmission_timeout,
567                                                    pr);
568   cm = GNUNET_malloc (sizeof (struct ControlMessage) + 
569                       sizeof (struct SendMessageRequest));
570   th->cm = cm;
571   cm->th = th;
572   smr = (struct SendMessageRequest*) &cm[1];
573   smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
574   smr->header.size = htons (sizeof (struct SendMessageRequest));
575   smr->priority = htonl (th->priority);
576   smr->deadline = GNUNET_TIME_absolute_hton (th->timeout);
577   smr->peer = pr->peer;
578   smr->queue_size = htonl (pr->queue_size);
579   smr->size = htons (th->msize);
580   smr->smr_id = htons (th->smr_id = pr->smr_id_gen++);
581   GNUNET_CONTAINER_DLL_insert_after (h->control_pending_head,
582                                      h->control_pending_tail,
583                                      h->control_pending_tail,
584                                      cm);
585 #if DEBUG_CORE
586   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
587               "Adding SEND REQUEST for peer `%s' to message queue\n",
588               GNUNET_i2s (&pr->peer));
589 #endif
590   trigger_next_request (h, GNUNET_NO);
591 }
592
593
594 /**
595  * The given request hit its timeout.  Remove from the
596  * doubly-linked list and call the respective continuation.
597  *
598  * @param cls the transmit handle of the request that timed out
599  * @param tc context, can be NULL (!)
600  */
601 static void
602 transmission_timeout (void *cls, 
603                       const struct GNUNET_SCHEDULER_TaskContext *tc)
604 {
605   struct PeerRecord *pr = cls;
606   struct GNUNET_CORE_Handle *h = pr->ch;
607   struct GNUNET_CORE_TransmitHandle *th;
608   
609   pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
610   th = pr->pending_head;
611   GNUNET_CONTAINER_DLL_remove (pr->pending_head,
612                                pr->pending_tail,
613                                th);
614   pr->queue_size--;
615   if ( (pr->prev != NULL) ||
616        (pr->next != NULL) ||
617        (pr == h->ready_peer_head) )
618     {
619       /* the request that was 'approved' by core was
620          canceled before it could be transmitted; remove
621          us from the 'ready' list */
622       GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
623                                    h->ready_peer_tail,
624                                    pr);
625     }
626 #if DEBUG_CORE
627   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
628               "Signalling timeout of request for transmission to CORE service\n");
629 #endif
630   GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL));
631   request_next_transmission (pr);
632 }
633
634
635 /**
636  * Transmit the next message to the core service.
637  */
638 static size_t
639 transmit_message (void *cls,
640                   size_t size, 
641                   void *buf)
642 {
643   struct GNUNET_CORE_Handle *h = cls;
644   struct ControlMessage *cm;
645   struct GNUNET_CORE_TransmitHandle *th;
646   struct PeerRecord *pr;
647   struct SendMessage *sm;
648   const struct GNUNET_MessageHeader *hdr;
649   uint16_t msize;
650   size_t ret;
651
652   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
653   h->cth = NULL;
654   if (buf == NULL)
655     {
656 #if DEBUG_CORE
657       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
658                   "Transmission failed, initiating reconnect\n");
659 #endif
660       reconnect_later (h);
661       return 0;
662     }
663   /* first check for control messages */
664   if (NULL != (cm = h->control_pending_head))
665     {
666       hdr = (const struct GNUNET_MessageHeader*) &cm[1];
667       msize = ntohs (hdr->size);
668       if (size < msize)
669         {
670           trigger_next_request (h, GNUNET_NO);
671           return 0;
672         }
673 #if DEBUG_CORE
674       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
675                   "Transmitting control message with %u bytes of type %u to core.\n",
676                   (unsigned int) msize,
677                   (unsigned int) ntohs (hdr->type));
678 #endif
679       memcpy (buf, hdr, msize);
680       GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
681                                    h->control_pending_tail,
682                                    cm);     
683       if (cm->th != NULL)
684         cm->th->cm = NULL;
685       if (NULL != cm->cont)
686         cm->cont (cm->cont_cls, GNUNET_OK);
687       GNUNET_free (cm);
688       trigger_next_request (h, GNUNET_NO);
689       return msize;
690     }
691   /* now check for 'ready' P2P messages */
692   if (NULL != (pr = h->ready_peer_head))
693     {
694       GNUNET_assert (pr->pending_head != NULL);
695       th = pr->pending_head;
696       if (size < th->msize + sizeof (struct SendMessage))
697         {
698           trigger_next_request (h, GNUNET_NO);
699           return 0;
700         }
701       GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
702                                    h->ready_peer_tail,
703                                    pr);
704       GNUNET_CONTAINER_DLL_remove (pr->pending_head,
705                                    pr->pending_tail,
706                                    th);
707       pr->queue_size--;
708       if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
709         {
710           GNUNET_SCHEDULER_cancel (pr->timeout_task);
711           pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
712         }
713 #if DEBUG_CORE
714       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
715                   "Transmitting SEND request to `%s' with %u bytes.\n",
716                   GNUNET_i2s (&pr->peer),
717                   (unsigned int) th->msize);
718 #endif
719       sm = (struct SendMessage *) buf;
720       sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
721       sm->priority = htonl (th->priority);
722       sm->deadline = GNUNET_TIME_absolute_hton (th->timeout);
723       sm->peer = pr->peer;
724       sm->cork = htonl ((uint32_t) th->cork);
725       sm->reserved = htonl (0);
726       ret = th->get_message (th->get_message_cls,
727                              size - sizeof (struct SendMessage),
728                              &sm[1]);
729
730       if (0 == ret)
731         {
732 #if DEBUG_CORE
733           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734                       "Size of clients message to peer %s is 0!\n",
735                       GNUNET_i2s(&pr->peer));
736 #endif
737           /* client decided to send nothing! */
738           request_next_transmission (pr);
739           return 0;       
740         }
741 #if DEBUG_CORE
742       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
743                   "Produced SEND message to core with %u bytes payload\n",
744                   (unsigned int) ret);
745 #endif
746       GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
747       if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
748         {
749           GNUNET_break (0);
750           request_next_transmission (pr);
751           return 0;
752         }
753       ret += sizeof (struct SendMessage);
754       sm->header.size = htons (ret);
755       GNUNET_assert (ret <= size);
756       GNUNET_free (th);
757       request_next_transmission (pr);
758       return ret;
759     }
760   return 0;
761 }
762
763
764 /**
765  * Check the list of pending requests, send the next
766  * one to the core.
767  *
768  * @param h core handle
769  * @param ignore_currently_down transmit message even if not initialized?
770  */
771 static void
772 trigger_next_request (struct GNUNET_CORE_Handle *h,
773                       int ignore_currently_down)
774 {
775   uint16_t msize;
776
777   if ( (GNUNET_YES == h->currently_down) &&
778        (ignore_currently_down == GNUNET_NO) )
779     {
780 #if DEBUG_CORE
781       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
782                   "Core connection down, not processing queue\n");
783 #endif
784       return;
785     }
786   if (NULL != h->cth)
787     {
788 #if DEBUG_CORE
789       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
790                   "Request pending, not processing queue\n");
791 #endif
792       return;
793     }
794   if (h->control_pending_head != NULL)
795     msize = ntohs (((struct GNUNET_MessageHeader*) &h->control_pending_head[1])->size);    
796   else if (h->ready_peer_head != NULL) 
797     msize = h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage);    
798   else
799     {
800 #if DEBUG_CORE
801       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
802                   "Request queue empty, not processing queue\n");
803 #endif
804       return; /* no pending message */
805     }
806   h->cth = GNUNET_CLIENT_notify_transmit_ready (h->client,
807                                                 msize,
808                                                 GNUNET_TIME_UNIT_FOREVER_REL,
809                                                 GNUNET_NO,
810                                                 &transmit_message, h);
811 }
812
813
814 /**
815  * Handler for notification messages received from the core.
816  *
817  * @param cls our "struct GNUNET_CORE_Handle"
818  * @param msg the message received from the core service
819  */
820 static void
821 main_notify_handler (void *cls, 
822                      const struct GNUNET_MessageHeader *msg)
823 {
824   struct GNUNET_CORE_Handle *h = cls;
825   const struct InitReplyMessage *m;
826   const struct ConnectNotifyMessage *cnm;
827   const struct DisconnectNotifyMessage *dnm;
828   const struct NotifyTrafficMessage *ntm;
829   const struct GNUNET_MessageHeader *em;
830   const struct ConfigurationInfoMessage *cim;
831   const struct PeerStatusNotifyMessage *psnm;
832   const struct SendMessageReady *smr;
833   const struct GNUNET_CORE_MessageHandler *mh;
834   GNUNET_CORE_StartupCallback init;
835   GNUNET_CORE_PeerConfigurationInfoCallback pcic;
836   struct PeerRecord *pr;
837   struct GNUNET_CORE_TransmitHandle *th;
838   unsigned int hpos;
839   int trigger;
840   uint16_t msize;
841   uint16_t et;
842   uint32_t ats_count;
843
844   if (msg == NULL)
845     {
846       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
847                   _
848                   ("Client was disconnected from core service, trying to reconnect.\n"));
849       reconnect_later (h);
850       return;
851     }
852   msize = ntohs (msg->size);
853 #if DEBUG_CORE
854   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
855               "Processing message of type %u and size %u from core service\n",
856               ntohs (msg->type), msize);
857 #endif
858   switch (ntohs (msg->type))
859     {
860     case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY:
861       if (ntohs (msg->size) != sizeof (struct InitReplyMessage))
862         {
863           GNUNET_break (0);
864           reconnect_later (h);
865           return;
866         }
867       m = (const struct InitReplyMessage *) msg;
868       GNUNET_break (0 == ntohl (m->reserved));
869       /* start our message processing loop */
870       if (GNUNET_YES == h->currently_down)
871         {
872           h->currently_down = GNUNET_NO;
873           trigger_next_request (h, GNUNET_NO);
874         }
875       h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
876       GNUNET_CRYPTO_hash (&m->publicKey,
877                           sizeof (struct
878                                   GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
879                           &h->me.hashPubKey);
880       if (NULL != (init = h->init))
881         {
882           /* mark so we don't call init on reconnect */
883           h->init = NULL;
884 #if DEBUG_CORE
885           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886                       "Connected to core service of peer `%s'.\n",
887                       GNUNET_i2s (&h->me));
888 #endif
889           init (h->cls, h, &h->me, &m->publicKey);
890         }
891       else
892         {
893 #if DEBUG_CORE
894           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
895                       "Successfully reconnected to core service.\n");
896 #endif
897         }
898       /* fake 'connect to self' */
899       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
900                                               &h->me.hashPubKey);
901       GNUNET_assert (pr == NULL);
902       pr = GNUNET_malloc (sizeof (struct PeerRecord));
903       pr->peer = h->me;
904       pr->ch = h;
905       GNUNET_assert (GNUNET_YES ==
906                      GNUNET_CONTAINER_multihashmap_put (h->peers,
907                                                         &h->me.hashPubKey,
908                                                         pr,
909                                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
910       if (NULL != h->connects)
911         h->connects (h->cls,
912                      &h->me,
913                      NULL);
914       break;
915     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT:
916       if (msize < sizeof (struct ConnectNotifyMessage))
917         {
918           GNUNET_break (0);
919           reconnect_later (h);
920           return;
921         }
922       cnm = (const struct ConnectNotifyMessage *) msg;
923       ats_count = ntohl (cnm->ats_count);
924       if ( (msize != sizeof (struct ConnectNotifyMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)) ||
925            (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR != ntohl ((&cnm->ats)[ats_count].type)) )
926         {
927           GNUNET_break (0);
928           reconnect_later (h);
929           return;
930         }
931 #if DEBUG_CORE
932       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
933                   "Received notification about connection from `%s'.\n",
934                   GNUNET_i2s (&cnm->peer));
935 #endif
936       if (0 == memcmp (&h->me,
937                        &cnm->peer,
938                        sizeof (struct GNUNET_PeerIdentity)))
939         {
940           /* connect to self!? */
941           GNUNET_break (0);
942           return;
943         }
944       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
945                                               &cnm->peer.hashPubKey);
946       if (pr != NULL)
947         {
948           GNUNET_break (0);
949           reconnect_later (h);
950           return;
951         }
952       pr = GNUNET_malloc (sizeof (struct PeerRecord));
953       pr->peer = cnm->peer;
954       pr->ch = h;
955       GNUNET_assert (GNUNET_YES ==
956                      GNUNET_CONTAINER_multihashmap_put (h->peers,
957                                                         &cnm->peer.hashPubKey,
958                                                         pr,
959                                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
960       if (NULL != h->connects)
961         h->connects (h->cls,
962                      &cnm->peer,
963                      &cnm->ats);
964       break;
965     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT:
966       if (msize != sizeof (struct DisconnectNotifyMessage))
967         {
968           GNUNET_break (0);
969           reconnect_later (h);
970           return;
971         }
972       dnm = (const struct DisconnectNotifyMessage *) msg;
973       if (0 == memcmp (&h->me,
974                        &dnm->peer,
975                        sizeof (struct GNUNET_PeerIdentity)))
976         {
977           /* connection to self!? */
978           GNUNET_break (0);
979           return;
980         }
981       GNUNET_break (0 == ntohl (dnm->reserved));
982 #if DEBUG_CORE
983       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
984                   "Received notification about disconnect from `%s'.\n",
985                   GNUNET_i2s (&dnm->peer));
986 #endif
987       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
988                                               &dnm->peer.hashPubKey);
989       if (pr == NULL)
990         {
991           GNUNET_break (0);
992           reconnect_later (h);
993           return;
994         }
995       trigger = ( (pr->prev != NULL) ||
996                   (pr->next != NULL) ||
997                   (h->ready_peer_head == pr) );
998       disconnect_and_free_peer_entry (h, &dnm->peer.hashPubKey, pr);
999       if (trigger)
1000         trigger_next_request (h, GNUNET_NO);
1001       break;
1002     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE:
1003       if (NULL == h->status_events)
1004         {
1005           GNUNET_break (0);
1006           return;
1007         }
1008       if (msize < sizeof (struct PeerStatusNotifyMessage))
1009         {
1010           GNUNET_break (0);
1011           reconnect_later (h);
1012           return;
1013         }
1014       psnm = (const struct PeerStatusNotifyMessage *) msg;
1015       if (0 == memcmp (&h->me,
1016                        &psnm->peer,
1017                        sizeof (struct GNUNET_PeerIdentity)))
1018         {
1019           /* self-change!? */
1020           GNUNET_break (0);
1021           return;
1022         }
1023       ats_count = ntohl (psnm->ats_count);
1024       if ( (msize != sizeof (struct PeerStatusNotifyMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)) ||
1025            (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR != ntohl ((&psnm->ats)[ats_count].type)) )
1026         {
1027           GNUNET_break (0);
1028           reconnect_later (h);
1029           return;
1030         }
1031 #if DEBUG_CORE
1032       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1033                   "Received notification about status change by `%s'.\n",
1034                   GNUNET_i2s (&psnm->peer));
1035 #endif
1036       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1037                                               &psnm->peer.hashPubKey);
1038       if (pr == NULL)
1039         {
1040           GNUNET_break (0);
1041           reconnect_later (h);
1042           return;
1043         }
1044       h->status_events (h->cls,
1045                         &psnm->peer,
1046                         psnm->bandwidth_in,
1047                         psnm->bandwidth_out,
1048                         GNUNET_TIME_absolute_ntoh (psnm->timeout),
1049                         &psnm->ats);
1050       break;
1051     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND:
1052       if (msize < sizeof (struct NotifyTrafficMessage))
1053         {
1054           GNUNET_break (0);
1055           reconnect_later (h);
1056           return;
1057         }
1058       ntm = (const struct NotifyTrafficMessage *) msg;
1059
1060       ats_count = ntohl (ntm->ats_count);
1061       if ( (msize < sizeof (struct NotifyTrafficMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)
1062             + sizeof (struct GNUNET_MessageHeader)) ||
1063            (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR != ntohl ((&ntm->ats)[ats_count].type)) )
1064         {
1065           GNUNET_break (0);
1066           reconnect_later (h);
1067           return;
1068         }
1069       em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count+1];
1070 #if DEBUG_CORE
1071       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1072                   "Received message of type %u and size %u from peer `%4s'\n",
1073                   ntohs (em->type), 
1074                   ntohs (em->size),
1075                   GNUNET_i2s (&ntm->peer));
1076 #endif
1077       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1078                                               &ntm->peer.hashPubKey);
1079       if (pr == NULL)
1080         {
1081           GNUNET_break (0);
1082           reconnect_later (h);
1083           return;
1084         }
1085       if ((GNUNET_NO == h->inbound_hdr_only) &&
1086           (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage) + 
1087            + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)) )
1088         {
1089           GNUNET_break (0);
1090           reconnect_later (h);
1091           return;
1092         }
1093       et = ntohs (em->type);
1094       for (hpos = 0; hpos < h->hcnt; hpos++)
1095         {
1096           mh = &h->handlers[hpos];
1097           if (mh->type != et)
1098             continue;
1099           if ((mh->expected_size != ntohs (em->size)) &&
1100               (mh->expected_size != 0))
1101             {
1102               GNUNET_break (0);
1103               continue;
1104             }
1105           if (GNUNET_OK !=
1106               h->handlers[hpos].callback (h->cls, &ntm->peer, em,
1107                                           &ntm->ats))
1108             {
1109               /* error in processing, do not process other messages! */
1110               break;
1111             }
1112         }
1113       if (NULL != h->inbound_notify)
1114         h->inbound_notify (h->cls, &ntm->peer, em,
1115                            &ntm->ats);
1116       break;
1117     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND:
1118       if (msize < sizeof (struct NotifyTrafficMessage))
1119         {
1120           GNUNET_break (0);
1121           reconnect_later (h);
1122           return;
1123         }
1124       ntm = (const struct NotifyTrafficMessage *) msg;
1125       if (0 == memcmp (&h->me,
1126                        &ntm->peer,
1127                        sizeof (struct GNUNET_PeerIdentity)))
1128         {
1129           /* self-change!? */
1130           GNUNET_break (0);
1131           return;
1132         }
1133       ats_count = ntohl (ntm->ats_count);
1134       if ( (msize < sizeof (struct NotifyTrafficMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)
1135             + sizeof (struct GNUNET_MessageHeader)) ||
1136            (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR != ntohl ((&ntm->ats)[ats_count].type)) )
1137         {
1138           GNUNET_break (0);
1139           reconnect_later (h);
1140           return;
1141         }
1142       em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count+1];
1143       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1144                                               &ntm->peer.hashPubKey);
1145       if (pr == NULL)
1146         {
1147           GNUNET_break (0);
1148           reconnect_later (h);
1149           return;
1150         }
1151 #if DEBUG_CORE
1152       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1153                   "Received notification about transmission to `%s'.\n",
1154                   GNUNET_i2s (&ntm->peer));
1155 #endif
1156       if ((GNUNET_NO == h->outbound_hdr_only) &&
1157           (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage) 
1158            + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)) )
1159         {
1160           GNUNET_break (0);
1161           reconnect_later (h);
1162           return;
1163         }
1164       if (NULL == h->outbound_notify)
1165         {
1166           GNUNET_break (0);
1167           break;
1168         }
1169       h->outbound_notify (h->cls, &ntm->peer, em,
1170                           &ntm->ats);
1171       break;
1172     case GNUNET_MESSAGE_TYPE_CORE_SEND_READY:
1173       if (msize != sizeof (struct SendMessageReady))
1174         {
1175           GNUNET_break (0);
1176           reconnect_later (h);
1177           return;
1178         }
1179       smr = (const struct SendMessageReady *) msg;
1180       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1181                                               &smr->peer.hashPubKey);
1182       if (pr == NULL)
1183         {
1184           GNUNET_break (0);
1185           reconnect_later (h);
1186           return;
1187         }
1188 #if DEBUG_CORE
1189       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1190                   "Received notification about transmission readiness to `%s'.\n",
1191                   GNUNET_i2s (&smr->peer));
1192 #endif
1193       if (pr->pending_head == NULL)
1194         {
1195           /* request must have been cancelled between the original request
1196              and the response from core, ignore core's readiness */
1197           return;
1198         }
1199
1200       th = pr->pending_head;
1201       if (ntohs (smr->smr_id) != th->smr_id)
1202         {
1203           /* READY message is for expired or cancelled message,
1204              ignore! (we should have already sent another request) */
1205           break;
1206         }
1207       if ( (pr->prev != NULL) ||
1208            (pr->next != NULL) ||
1209            (h->ready_peer_head == pr) )
1210         {
1211           /* we should not already be on the ready list... */
1212           GNUNET_break (0);
1213           reconnect_later (h);
1214           return;
1215         }
1216       GNUNET_CONTAINER_DLL_insert (h->ready_peer_head,
1217                                    h->ready_peer_tail,
1218                                    pr);
1219       trigger_next_request (h, GNUNET_NO);
1220       break;
1221     case GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO:
1222       if (ntohs (msg->size) != sizeof (struct ConfigurationInfoMessage))
1223         {
1224           GNUNET_break (0);
1225           reconnect_later (h);
1226           return;
1227         }
1228       cim = (const struct ConfigurationInfoMessage*) msg;
1229       if (0 == memcmp (&h->me,
1230                        &cim->peer,
1231                        sizeof (struct GNUNET_PeerIdentity)))
1232         {
1233           /* self-change!? */
1234           GNUNET_break (0);
1235           return;
1236         }
1237 #if DEBUG_CORE
1238       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1239                   "Received notification about configuration update for `%s'.\n",
1240                   GNUNET_i2s (&cim->peer));
1241 #endif
1242       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1243                                               &cim->peer.hashPubKey);
1244       if (pr == NULL)
1245         {
1246           GNUNET_break (0);
1247           reconnect_later (h);
1248           return;
1249         }
1250       if (pr->rim_id != ntohl (cim->rim_id))
1251         {
1252 #if DEBUG_CORE
1253           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1254                       "Reservation ID mismatch in notification...\n");
1255 #endif
1256           break;
1257         }
1258       pcic = pr->pcic;
1259       pr->pcic = NULL;
1260       GNUNET_free_non_null (pr->pcic_ptr);
1261       pr->pcic_ptr = NULL;
1262       if (pcic != NULL)
1263         pcic (pr->pcic_cls,
1264               &pr->peer,
1265               cim->bw_out,
1266               ntohl (cim->reserved_amount),
1267               GNUNET_TIME_relative_ntoh (cim->reserve_delay),
1268               GNUNET_ntohll (cim->preference));
1269       break;
1270     default:
1271       reconnect_later (h);
1272       return;
1273     }
1274   GNUNET_CLIENT_receive (h->client,
1275                          &main_notify_handler, h, 
1276                          GNUNET_TIME_UNIT_FOREVER_REL);
1277 }
1278
1279
1280 /**
1281  * Task executed once we are done transmitting the INIT message.
1282  * Starts our 'receive' loop.
1283  *
1284  * @param cls the 'struct GNUNET_CORE_Handle'
1285  * @param success were we successful
1286  */
1287 static void
1288 init_done_task (void *cls, 
1289                 int success)
1290 {
1291   struct GNUNET_CORE_Handle *h = cls;
1292
1293   if (success == GNUNET_SYSERR)
1294     return; /* shutdown */
1295   if (success == GNUNET_NO)
1296     {
1297 #if DEBUG_CORE
1298       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1299                   "Failed to exchange INIT with core, retrying\n");
1300 #endif
1301       if (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK)
1302         reconnect_later (h);
1303       return;
1304     }
1305   GNUNET_CLIENT_receive (h->client,
1306                          &main_notify_handler, 
1307                          h, 
1308                          GNUNET_TIME_UNIT_FOREVER_REL);
1309 }
1310
1311
1312 /**
1313  * Our current client connection went down.  Clean it up
1314  * and try to reconnect!
1315  *
1316  * @param h our handle to the core service
1317  */
1318 static void
1319 reconnect (struct GNUNET_CORE_Handle *h)
1320 {
1321   struct ControlMessage *cm;
1322   struct InitMessage *init;
1323   uint32_t opt;
1324   uint16_t msize;
1325   uint16_t *ts;
1326   unsigned int hpos;
1327
1328 #if DEBUG_CORE
1329   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1330               "Reconnecting to CORE service\n");
1331 #endif
1332   GNUNET_assert (h->client == NULL);
1333   GNUNET_assert (h->currently_down == GNUNET_YES);
1334   h->client = GNUNET_CLIENT_connect ("core", h->cfg);
1335   if (h->client == NULL)
1336     {
1337       reconnect_later (h);
1338       return;
1339     }
1340   msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage);
1341   cm = GNUNET_malloc (sizeof (struct ControlMessage) +
1342                       msize);
1343   cm->cont = &init_done_task;
1344   cm->cont_cls = h;
1345   init = (struct InitMessage*) &cm[1];
1346   init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT);
1347   init->header.size = htons (msize);
1348   opt = GNUNET_CORE_OPTION_SEND_CONNECT | GNUNET_CORE_OPTION_SEND_DISCONNECT;
1349   if (h->status_events != NULL)
1350     opt |= GNUNET_CORE_OPTION_SEND_STATUS_CHANGE;
1351   if (h->inbound_notify != NULL)
1352     {
1353       if (h->inbound_hdr_only)
1354         opt |= GNUNET_CORE_OPTION_SEND_HDR_INBOUND;
1355       else
1356         opt |= GNUNET_CORE_OPTION_SEND_FULL_INBOUND;
1357     }
1358   if (h->outbound_notify != NULL)
1359     {
1360       if (h->outbound_hdr_only)
1361         opt |= GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND;
1362       else
1363         opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND;
1364     }
1365   init->options = htonl (opt);
1366   ts = (uint16_t *) &init[1];
1367   for (hpos = 0; hpos < h->hcnt; hpos++)
1368     ts[hpos] = htons (h->handlers[hpos].type);
1369   GNUNET_CONTAINER_DLL_insert (h->control_pending_head,
1370                                h->control_pending_tail,
1371                                cm);
1372   trigger_next_request (h, GNUNET_YES);
1373 }
1374
1375
1376
1377 /**
1378  * Connect to the core service.  Note that the connection may
1379  * complete (or fail) asynchronously.
1380  *
1381  * @param cfg configuration to use
1382  * @param queue_size size of the per-peer message queue
1383  * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
1384  * @param init callback to call on timeout or once we have successfully
1385  *        connected to the core service; note that timeout is only meaningful if init is not NULL
1386  * @param connects function to call on peer connect, can be NULL
1387  * @param disconnects function to call on peer disconnect / timeout, can be NULL
1388  * @param status_events function to call on changes to peer connection status, can be NULL
1389  * @param inbound_notify function to call for all inbound messages, can be NULL
1390  * @param inbound_hdr_only set to GNUNET_YES if inbound_notify will only read the
1391  *                GNUNET_MessageHeader and hence we do not need to give it the full message;
1392  *                can be used to improve efficiency, ignored if inbound_notify is NULLL
1393  * @param outbound_notify function to call for all outbound messages, can be NULL
1394  * @param outbound_hdr_only set to GNUNET_YES if outbound_notify will only read the
1395  *                GNUNET_MessageHeader and hence we do not need to give it the full message
1396  *                can be used to improve efficiency, ignored if outbound_notify is NULLL
1397  * @param handlers callbacks for messages we care about, NULL-terminated
1398  * @return handle to the core service (only useful for disconnect until 'init' is called);
1399  *                NULL on error (in this case, init is never called)
1400  */
1401 struct GNUNET_CORE_Handle *
1402 GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1403                      unsigned int queue_size,
1404                      void *cls,
1405                      GNUNET_CORE_StartupCallback init,
1406                      GNUNET_CORE_ConnectEventHandler connects,
1407                      GNUNET_CORE_DisconnectEventHandler disconnects,
1408                      GNUNET_CORE_PeerStatusEventHandler status_events,
1409                      GNUNET_CORE_MessageCallback inbound_notify,
1410                      int inbound_hdr_only,
1411                      GNUNET_CORE_MessageCallback outbound_notify,
1412                      int outbound_hdr_only,
1413                      const struct GNUNET_CORE_MessageHandler *handlers)
1414 {
1415   struct GNUNET_CORE_Handle *h;
1416
1417   h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle));
1418   h->cfg = cfg;
1419   h->queue_size = queue_size;
1420   h->cls = cls;
1421   h->init = init;
1422   h->connects = connects;
1423   h->disconnects = disconnects;
1424   h->status_events = status_events;
1425   h->inbound_notify = inbound_notify;
1426   h->outbound_notify = outbound_notify;
1427   h->inbound_hdr_only = inbound_hdr_only;
1428   h->outbound_hdr_only = outbound_hdr_only;
1429   h->handlers = handlers;
1430   h->hcnt = 0;
1431   h->currently_down = GNUNET_YES;
1432   h->peers = GNUNET_CONTAINER_multihashmap_create (128);
1433   h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1434   while (handlers[h->hcnt].callback != NULL)
1435     h->hcnt++;
1436   GNUNET_assert (h->hcnt <
1437                  (GNUNET_SERVER_MAX_MESSAGE_SIZE -
1438                   sizeof (struct InitMessage)) / sizeof (uint16_t));
1439 #if DEBUG_CORE
1440   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1441               "Connecting to CORE service\n");
1442 #endif
1443   reconnect (h);
1444   return h;
1445 }
1446
1447
1448 /**
1449  * Disconnect from the core service.  This function can only 
1450  * be called *after* all pending 'GNUNET_CORE_notify_transmit_ready'
1451  * requests have been explicitly canceled.
1452  *
1453  * @param handle connection to core to disconnect
1454  */
1455 void
1456 GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
1457 {
1458   struct ControlMessage *cm;
1459   
1460 #if DEBUG_CORE
1461   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1462               "Disconnecting from CORE service\n");
1463 #endif
1464   if (handle->cth != NULL)
1465     {
1466       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth);
1467       handle->cth = NULL;
1468     }
1469   if (handle->client != NULL)
1470     {
1471       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
1472       handle->client = NULL;
1473     }
1474   while (NULL != (cm = handle->control_pending_head))
1475     {
1476       GNUNET_CONTAINER_DLL_remove (handle->control_pending_head,
1477                                    handle->control_pending_tail,
1478                                    cm);
1479       if (cm->th != NULL)
1480         cm->th->cm = NULL;
1481       if (cm->cont != NULL)
1482         cm->cont (cm->cont_cls, GNUNET_SYSERR);
1483       GNUNET_free (cm);
1484     }
1485   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1486     {
1487       GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1488       handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1489     }
1490   GNUNET_CONTAINER_multihashmap_iterate (handle->peers,
1491                                          &disconnect_and_free_peer_entry,
1492                                          handle);
1493   GNUNET_CONTAINER_multihashmap_destroy (handle->peers);
1494   GNUNET_break (handle->ready_peer_head == NULL);
1495   GNUNET_free (handle);
1496 }
1497
1498
1499 /**
1500  * Ask the core to call "notify" once it is ready to transmit the
1501  * given number of bytes to the specified "target".    Must only be
1502  * called after a connection to the respective peer has been
1503  * established (and the client has been informed about this).
1504  *
1505  * @param handle connection to core service
1506  * @param cork is corking allowed for this transmission?
1507  * @param priority how important is the message?
1508  * @param maxdelay how long can the message wait?
1509  * @param target who should receive the message,
1510  *        use NULL for this peer (loopback)
1511  * @param notify_size how many bytes of buffer space does notify want?
1512  * @param notify function to call when buffer space is available
1513  * @param notify_cls closure for notify
1514  * @return non-NULL if the notify callback was queued,
1515  *         NULL if we can not even queue the request (insufficient
1516  *         memory); if NULL is returned, "notify" will NOT be called.
1517  */
1518 struct GNUNET_CORE_TransmitHandle *
1519 GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
1520                                    int cork,
1521                                    uint32_t priority,
1522                                    struct GNUNET_TIME_Relative maxdelay,
1523                                    const struct GNUNET_PeerIdentity *target,
1524                                    size_t notify_size,
1525                                    GNUNET_CONNECTION_TransmitReadyNotify notify,
1526                                    void *notify_cls)
1527 {
1528   struct PeerRecord *pr;
1529   struct GNUNET_CORE_TransmitHandle *th;
1530   struct GNUNET_CORE_TransmitHandle *pos;
1531   struct GNUNET_CORE_TransmitHandle *prev;
1532   struct GNUNET_CORE_TransmitHandle *minp;
1533
1534   pr = GNUNET_CONTAINER_multihashmap_get (handle->peers,
1535                                           &target->hashPubKey);
1536   if (NULL == pr)
1537     {
1538       /* attempt to send to peer that is not connected */
1539       GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
1540                  "Attempting to send to peer `%s' from peer `%s', but not connected!\n",
1541                  GNUNET_i2s(target), GNUNET_h2s(&handle->me.hashPubKey));
1542       GNUNET_break (0);
1543       return NULL;
1544     }
1545   GNUNET_assert (notify_size + sizeof (struct SendMessage) <
1546                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1547   th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle));
1548   th->peer = pr;
1549   GNUNET_assert(NULL != notify);
1550   th->get_message = notify;
1551   th->get_message_cls = notify_cls;
1552   th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
1553   th->priority = priority;
1554   th->msize = notify_size;
1555   th->cork = cork;
1556   /* bound queue size */
1557   if (pr->queue_size == handle->queue_size)
1558     {
1559       /* find lowest-priority entry, but skip the head of the list */
1560       minp = pr->pending_head->next;
1561       prev = minp;
1562       while (prev != NULL)
1563         {
1564           if (prev->priority < minp->priority)
1565             minp = prev;
1566           prev = prev->next;
1567         }
1568       if (minp == NULL) 
1569         {
1570           GNUNET_break (handle->queue_size != 0);
1571           GNUNET_break (pr->queue_size == 1);
1572           GNUNET_free(th);
1573 #if DEBUG_CORE
1574           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1575                       "Dropping transmission request: cannot drop queue head and limit is one\n");
1576 #endif
1577           return NULL;
1578         }
1579       if (priority <= minp->priority)
1580         {
1581 #if DEBUG_CORE
1582           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1583                       "Dropping transmission request: priority too low\n");
1584 #endif
1585           return NULL; /* priority too low */
1586         }
1587       GNUNET_CONTAINER_DLL_remove (pr->pending_head,
1588                                    pr->pending_tail,
1589                                    minp);
1590       pr->queue_size--;
1591       GNUNET_assert (0 ==
1592                      minp->get_message (minp->get_message_cls,
1593                                         0, NULL));
1594       GNUNET_free (minp);
1595     }
1596
1597   /* Order entries by deadline, but SKIP 'HEAD' if
1598      we're in the 'ready_peer_*' DLL */
1599   pos = pr->pending_head;
1600   if ( (pr->prev != NULL) ||
1601        (pr->next != NULL) ||
1602        (pr == handle->ready_peer_head) )
1603     {
1604       GNUNET_assert (pos != NULL);
1605       pos = pos->next; /* skip head */
1606     }
1607
1608   /* insertion sort */
1609   prev = pos;
1610   while ( (pos != NULL) &&
1611           (pos->timeout.abs_value < th->timeout.abs_value) )      
1612     {
1613       prev = pos;
1614       pos = pos->next;
1615     }
1616   GNUNET_CONTAINER_DLL_insert_after (pr->pending_head,
1617                                      pr->pending_tail,
1618                                      prev,
1619                                      th);
1620   pr->queue_size++;
1621   /* was the request queue previously empty? */
1622 #if DEBUG_CORE
1623   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1624               "Transmission request added to queue\n");
1625 #endif
1626   if (pr->pending_head == th) 
1627     request_next_transmission (pr);
1628   return th;
1629 }
1630
1631
1632 /**
1633  * Cancel the specified transmission-ready notification.
1634  *
1635  * @param th handle that was returned by "notify_transmit_ready".
1636  */
1637 void
1638 GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle
1639                                           *th)
1640 {
1641   struct PeerRecord *pr = th->peer;
1642   struct GNUNET_CORE_Handle *h = pr->ch;
1643   int was_head;
1644   
1645   was_head = (pr->pending_head == th);
1646   GNUNET_CONTAINER_DLL_remove (pr->pending_head,
1647                                pr->pending_tail,
1648                                th);    
1649   pr->queue_size--;
1650   if (th->cm != NULL)
1651     {
1652       /* we're currently in the control queue, remove */
1653       GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
1654                                    h->control_pending_tail,
1655                                    th->cm);
1656       GNUNET_free (th->cm);      
1657     }
1658   GNUNET_free (th);
1659   if (was_head)
1660     {
1661       if ( (pr->prev != NULL) ||
1662            (pr->next != NULL) ||
1663            (pr == h->ready_peer_head) )
1664         {
1665           /* the request that was 'approved' by core was
1666              canceled before it could be transmitted; remove
1667              us from the 'ready' list */
1668           GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
1669                                        h->ready_peer_tail,
1670                                        pr);
1671         }
1672       request_next_transmission (pr);
1673     }
1674 }
1675
1676
1677 /* ****************** GNUNET_CORE_peer_request_connect ******************** */
1678
1679 /**
1680  * Handle for a request to the core to connect to
1681  * a particular peer.  Can be used to cancel the request
1682  * (before the 'cont'inuation is called).
1683  */
1684 struct GNUNET_CORE_PeerRequestHandle
1685 {
1686
1687   /**
1688    * Link to control message.
1689    */
1690   struct ControlMessage *cm;
1691
1692   /**
1693    * Core handle used.
1694    */
1695   struct GNUNET_CORE_Handle *h;
1696
1697   /**
1698    * Continuation to run when done.
1699    */
1700   GNUNET_CORE_ControlContinuation cont;
1701
1702   /**
1703    * Closure for 'cont'.
1704    */
1705   void *cont_cls;
1706
1707 };
1708
1709
1710 /**
1711  * Continuation called when the control message was transmitted.
1712  * Calls the original continuation and frees the remaining
1713  * resources.
1714  *
1715  * @param cls the 'struct GNUNET_CORE_PeerRequestHandle'
1716  * @param success was the request transmitted?
1717  */
1718 static void
1719 peer_request_connect_cont (void *cls,
1720                            int success)
1721 {
1722   struct GNUNET_CORE_PeerRequestHandle *ret = cls;
1723   
1724   if (ret->cont != NULL)
1725     ret->cont (ret->cont_cls, success);    
1726   GNUNET_free (ret);
1727 }
1728
1729
1730 /**
1731  * Request that the core should try to connect to a particular peer.
1732  * Once the request has been transmitted to the core, the continuation
1733  * function will be called.  Note that this does NOT mean that a
1734  * connection was successfully established -- it only means that the
1735  * core will now try.  Successful establishment of the connection
1736  * will be signalled to the 'connects' callback argument of
1737  * 'GNUNET_CORE_connect' only.  If the core service does not respond
1738  * to our connection attempt within the given time frame, 'cont' will
1739  * be called with the TIMEOUT reason code.
1740  *
1741  * @param h core handle
1742  * @param timeout how long to try to talk to core
1743  * @param peer who should we connect to
1744  * @param cont function to call once the request has been completed (or timed out)
1745  * @param cont_cls closure for cont
1746  *
1747  * @return NULL on error or already connected,
1748  *         otherwise handle for cancellation
1749  */
1750 struct GNUNET_CORE_PeerRequestHandle *
1751 GNUNET_CORE_peer_request_connect (struct GNUNET_CORE_Handle *h,
1752                                   struct GNUNET_TIME_Relative timeout,
1753                                   const struct GNUNET_PeerIdentity * peer,
1754                                   GNUNET_CORE_ControlContinuation cont,
1755                                   void *cont_cls)
1756 {
1757   struct GNUNET_CORE_PeerRequestHandle *ret;
1758   struct ControlMessage *cm;
1759   struct ConnectMessage *msg;
1760
1761   if (NULL != GNUNET_CONTAINER_multihashmap_get (h->peers,
1762                                           &peer->hashPubKey))
1763     {
1764 #if DEBUG_CORE
1765       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Peers are already connected!\n");
1766 #endif
1767       return NULL;
1768     }
1769   
1770   cm = GNUNET_malloc (sizeof (struct ControlMessage) + 
1771                       sizeof (struct ConnectMessage));
1772   msg = (struct ConnectMessage*) &cm[1];
1773   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT);
1774   msg->header.size = htons (sizeof (struct ConnectMessage));
1775   msg->reserved = htonl (0);
1776   msg->timeout = GNUNET_TIME_relative_hton (timeout);
1777   msg->peer = *peer;
1778   GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head,
1779                                     h->control_pending_tail,
1780                                     cm);
1781   ret = GNUNET_malloc (sizeof (struct GNUNET_CORE_PeerRequestHandle));
1782   ret->h = h;
1783   ret->cm = cm;
1784   ret->cont = cont;
1785   ret->cont_cls = cont_cls;
1786   cm->cont = &peer_request_connect_cont;
1787   cm->cont_cls = ret;
1788 #if DEBUG_CORE
1789   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1790               "Queueing REQUEST_CONNECT request\n");
1791 #endif
1792   if (h->control_pending_head == cm)
1793     trigger_next_request (h, GNUNET_NO);
1794   return ret;
1795 }
1796
1797
1798 /**
1799  * Cancel a pending request to connect to a particular peer.  Must not
1800  * be called after the 'cont' function was invoked.
1801  *
1802  * @param req request handle that was returned for the original request
1803  */
1804 void
1805 GNUNET_CORE_peer_request_connect_cancel (struct GNUNET_CORE_PeerRequestHandle *req)
1806 {
1807   struct GNUNET_CORE_Handle *h = req->h;
1808   struct ControlMessage *cm = req->cm;
1809
1810   GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
1811                                h->control_pending_tail,
1812                                cm);
1813   GNUNET_free (cm);
1814   GNUNET_free (req);
1815 }
1816
1817
1818 /* ****************** GNUNET_CORE_peer_change_preference ******************** */
1819
1820
1821 struct GNUNET_CORE_InformationRequestContext 
1822 {
1823   
1824   /**
1825    * Our connection to the service.
1826    */
1827   struct GNUNET_CORE_Handle *h;
1828
1829   /**
1830    * Link to control message, NULL if CM was sent.
1831    */ 
1832   struct ControlMessage *cm;
1833
1834   /**
1835    * Link to peer record.
1836    */
1837   struct PeerRecord *pr;
1838 };
1839
1840
1841 /**
1842  * CM was sent, remove link so we don't double-free.
1843  *
1844  * @param cls the 'struct GNUNET_CORE_InformationRequestContext'
1845  * @param success were we successful?
1846  */
1847 static void
1848 change_preference_send_continuation (void *cls,
1849                                      int success)
1850 {
1851   struct GNUNET_CORE_InformationRequestContext *irc = cls;
1852
1853   irc->cm = NULL;
1854   // FIXME: who frees 'irc'?
1855 }
1856
1857
1858 /**
1859  * Obtain statistics and/or change preferences for the given peer.
1860  *
1861  * @param h core handle
1862  * @param peer identifies the peer
1863  * @param timeout after how long should we give up (and call "info" with NULL
1864  *                for "peer" to signal an error)?
1865  * @param bw_out set to the current bandwidth limit (sending) for this peer,
1866  *                caller should set "bw_out" to "-1" to avoid changing
1867  *                the current value; otherwise "bw_out" will be lowered to
1868  *                the specified value; passing a pointer to "0" can be used to force
1869  *                us to disconnect from the peer; "bw_out" might not increase
1870  *                as specified since the upper bound is generally
1871  *                determined by the other peer!
1872  * @param amount reserve N bytes for receiving, negative
1873  *                amounts can be used to undo a (recent) reservation;
1874  * @param preference increase incoming traffic share preference by this amount;
1875  *                in the absence of "amount" reservations, we use this
1876  *                preference value to assign proportional bandwidth shares
1877  *                to all connected peers
1878  * @param info function to call with the resulting configuration information
1879  * @param info_cls closure for info
1880  * @return NULL on error
1881  */
1882 struct GNUNET_CORE_InformationRequestContext *
1883 GNUNET_CORE_peer_change_preference (struct GNUNET_CORE_Handle *h,
1884                                     const struct GNUNET_PeerIdentity *peer,
1885                                     struct GNUNET_TIME_Relative timeout,
1886                                     struct GNUNET_BANDWIDTH_Value32NBO bw_out,
1887                                     int32_t amount,
1888                                     uint64_t preference,
1889                                     GNUNET_CORE_PeerConfigurationInfoCallback info,
1890                                     void *info_cls)
1891 {
1892   struct GNUNET_CORE_InformationRequestContext *irc;
1893   struct PeerRecord *pr;
1894   struct RequestInfoMessage *rim;
1895   struct ControlMessage *cm;
1896
1897   pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1898                                           &peer->hashPubKey);
1899   if (NULL == pr)
1900     {
1901       /* attempt to change preference on peer that is not connected */
1902       GNUNET_break (0);
1903       return NULL;
1904     }
1905   if (pr->pcic != NULL)
1906     {
1907       /* second change before first one is done */
1908       GNUNET_break (0);
1909       return NULL;
1910     }
1911   irc = GNUNET_malloc (sizeof (struct GNUNET_CORE_InformationRequestContext));
1912   irc->h = h;
1913   irc->pr = pr;
1914   cm = GNUNET_malloc (sizeof (struct ControlMessage) +
1915                       sizeof (struct RequestInfoMessage));
1916   cm->cont = &change_preference_send_continuation;
1917   cm->cont_cls = irc;
1918   irc->cm = cm;
1919   rim = (struct RequestInfoMessage*) &cm[1];
1920   rim->header.size = htons (sizeof (struct RequestInfoMessage));
1921   rim->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO);
1922   rim->rim_id = htonl (pr->rim_id = h->rim_id_gen++);
1923   rim->limit_outbound = bw_out;
1924   rim->reserve_inbound = htonl (amount);
1925   rim->preference_change = GNUNET_htonll(preference);
1926   rim->peer = *peer;
1927 #if DEBUG_CORE
1928   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1929               "Queueing CHANGE PREFERENCE request\n");
1930 #endif
1931   GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head,
1932                                     h->control_pending_tail,
1933                                     cm); 
1934   pr->pcic = info;
1935   pr->pcic_cls = info_cls;
1936   pr->pcic_ptr = irc; /* for free'ing irc */
1937   if (h->control_pending_head == cm)
1938     trigger_next_request (h, GNUNET_NO);
1939   return irc;
1940 }
1941
1942
1943 /**
1944  * Cancel request for getting information about a peer.
1945  * Note that an eventual change in preference, trust or bandwidth
1946  * assignment MAY have already been committed at the time, 
1947  * so cancelling a request is NOT sure to undo the original
1948  * request.  The original request may or may not still commit.
1949  * The only thing cancellation ensures is that the callback
1950  * from the original request will no longer be called.
1951  *
1952  * @param irc context returned by the original GNUNET_CORE_peer_get_info call
1953  */
1954 void
1955 GNUNET_CORE_peer_change_preference_cancel (struct GNUNET_CORE_InformationRequestContext *irc)
1956 {
1957   struct GNUNET_CORE_Handle *h = irc->h;
1958   struct PeerRecord *pr = irc->pr;
1959
1960   GNUNET_assert (pr->pcic_ptr == irc);
1961   if (irc->cm != NULL)
1962     {
1963       GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
1964                                    h->control_pending_tail,
1965                                    irc->cm);
1966       GNUNET_free (irc->cm);
1967     }
1968   pr->pcic = NULL;
1969   pr->pcic_cls = NULL;
1970   pr->pcic_ptr = NULL;
1971   GNUNET_free (irc);
1972 }
1973
1974
1975 /* end of core_api.c */