avoid direct call of notification -- except on error
[oweals/gnunet.git] / src / core / core_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/core_api.c
23  * @brief core service; this is the main API for encrypted P2P
24  *        communications
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_core_service.h"
30 #include "core.h"
31
32
33 /**
34  * Information we track for each peer.
35  */
36 struct PeerRecord
37 {
38
39   /**
40    * We generally do NOT keep peer records in a DLL; this
41    * DLL is only used IF this peer's 'pending_head' message
42    * is ready for transmission.  
43    */
44   struct PeerRecord *prev;
45
46   /**
47    * We generally do NOT keep peer records in a DLL; this
48    * DLL is only used IF this peer's 'pending_head' message
49    * is ready for transmission. 
50    */
51   struct PeerRecord *next;
52
53   /**
54    * Peer the record is about.
55    */
56   struct GNUNET_PeerIdentity peer;
57
58   /**
59    * Corresponding core handle.
60    */
61   struct GNUNET_CORE_Handle *ch;
62
63   /**
64    * Head of doubly-linked list of pending requests.
65    * Requests are sorted by deadline *except* for HEAD,
66    * which is only modified upon transmission to core.
67    */
68   struct GNUNET_CORE_TransmitHandle *pending_head;
69
70   /**
71    * Tail of doubly-linked list of pending requests.
72    */
73   struct GNUNET_CORE_TransmitHandle *pending_tail;
74
75   /**
76    * Pending callback waiting for peer information, or NULL for none.
77    */
78   GNUNET_CORE_PeerConfigurationInfoCallback pcic;
79
80   /**
81    * Closure for pcic.
82    */
83   void *pcic_cls;
84
85   /**
86    * Pointer to free when we call pcic.
87    */
88   void *pcic_ptr;
89
90   /**
91    * Request information ID for the given pcic (needed in case a
92    * request is cancelled after being submitted to core and a new
93    * one is generated; in this case, we need to avoid matching the
94    * reply to the first (cancelled) request to the second request).
95    */
96   uint32_t rim_id;
97
98   /**
99    * ID of timeout task for the 'pending_head' handle
100    * which is the one with the smallest timeout. 
101    */
102   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
103
104   /**
105    * ID of task to run 'next_request_transmission'.
106    */
107   GNUNET_SCHEDULER_TaskIdentifier ntr_task;
108
109   /**
110    * Current size of the queue of pending requests.
111    */
112   unsigned int queue_size;
113
114   /**
115    * SendMessageRequest ID generator for this peer.
116    */
117   uint16_t smr_id_gen;
118   
119 };
120
121
122 /**
123  * Entry in a doubly-linked list of control messages to be transmitted
124  * to the core service.  Control messages include traffic allocation,
125  * connection requests and of course our initial 'init' request.
126  * 
127  * The actual message is allocated at the end of this struct.
128  */
129 struct ControlMessage
130 {
131   /**
132    * This is a doubly-linked list.
133    */
134   struct ControlMessage *next;
135
136   /**
137    * This is a doubly-linked list.
138    */
139   struct ControlMessage *prev;
140
141   /**
142    * Function to run after transmission failed/succeeded.
143    */
144   GNUNET_CORE_ControlContinuation cont;
145   
146   /**
147    * Closure for 'cont'.
148    */
149   void *cont_cls;
150
151   /**
152    * Transmit handle (if one is associated with this ControlMessage), or NULL.
153    */
154   struct GNUNET_CORE_TransmitHandle *th;
155 };
156
157
158
159 /**
160  * Context for the core service connection.
161  */
162 struct GNUNET_CORE_Handle
163 {
164
165   /**
166    * Configuration we're using.
167    */
168   const struct GNUNET_CONFIGURATION_Handle *cfg;
169
170   /**
171    * Closure for the various callbacks.
172    */
173   void *cls;
174
175   /**
176    * Function to call once we've handshaked with the core service.
177    */
178   GNUNET_CORE_StartupCallback init;
179
180   /**
181    * Function to call whenever we're notified about a peer connecting.
182    */
183   GNUNET_CORE_ConnectEventHandler connects;
184
185   /**
186    * Function to call whenever we're notified about a peer disconnecting.
187    */
188   GNUNET_CORE_DisconnectEventHandler disconnects;
189
190   /**
191    * Function to call whenever we're notified about a peer changing status.
192    */  
193   GNUNET_CORE_PeerStatusEventHandler status_events;
194   
195   /**
196    * Function to call whenever we receive an inbound message.
197    */
198   GNUNET_CORE_MessageCallback inbound_notify;
199
200   /**
201    * Function to call whenever we receive an outbound message.
202    */
203   GNUNET_CORE_MessageCallback outbound_notify;
204
205   /**
206    * Function handlers for messages of particular type.
207    */
208   const struct GNUNET_CORE_MessageHandler *handlers;
209
210   /**
211    * Our connection to the service.
212    */
213   struct GNUNET_CLIENT_Connection *client;
214
215   /**
216    * Handle for our current transmission request.
217    */
218   struct GNUNET_CLIENT_TransmitHandle *cth;
219
220   /**
221    * Head of doubly-linked list of pending requests.
222    */
223   struct ControlMessage *control_pending_head;
224
225   /**
226    * Tail of doubly-linked list of pending requests.
227    */
228   struct ControlMessage *control_pending_tail;
229
230   /**
231    * Head of doubly-linked list of peers that are core-approved
232    * to send their next message.
233    */
234   struct PeerRecord *ready_peer_head;
235
236   /**
237    * Tail of doubly-linked list of peers that are core-approved
238    * to send their next message.
239    */
240   struct PeerRecord *ready_peer_tail;
241
242   /**
243    * Hash map listing all of the peers that we are currently
244    * connected to.
245    */
246   struct GNUNET_CONTAINER_MultiHashMap *peers;
247
248   /**
249    * Identity of this peer.
250    */
251   struct GNUNET_PeerIdentity me;
252
253   /**
254    * ID of reconnect task (if any).
255    */
256   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
257
258   /**
259    * Current delay we use for re-trying to connect to core.
260    */
261   struct GNUNET_TIME_Relative retry_backoff;
262
263   /**
264    * Request information ID generator.
265    */
266   uint32_t rim_id_gen;
267
268   /**
269    * Number of messages we are allowed to queue per target.
270    */
271   unsigned int queue_size;
272
273   /**
274    * Number of entries in the handlers array.
275    */
276   unsigned int hcnt;
277
278   /**
279    * For inbound notifications without a specific handler, do
280    * we expect to only receive headers?
281    */
282   int inbound_hdr_only;
283
284   /**
285    * For outbound notifications without a specific handler, do
286    * we expect to only receive headers?
287    */
288   int outbound_hdr_only;
289
290   /**
291    * Are we currently disconnected and hence unable to forward
292    * requests?
293    */
294   int currently_down;
295
296 };
297
298
299 /**
300  * Handle for a transmission request.
301  */
302 struct GNUNET_CORE_TransmitHandle
303 {
304
305   /**
306    * We keep active transmit handles in a doubly-linked list.
307    */
308   struct GNUNET_CORE_TransmitHandle *next;
309
310   /**
311    * We keep active transmit handles in a doubly-linked list.
312    */
313   struct GNUNET_CORE_TransmitHandle *prev;
314
315   /**
316    * Corresponding peer record.
317    */
318   struct PeerRecord *peer;
319
320   /**
321    * Corresponding SEND_REQUEST message.  Only non-NULL 
322    * while SEND_REQUEST message is pending.
323    */
324   struct ControlMessage *cm;
325
326   /**
327    * Function that will be called to get the actual request
328    * (once we are ready to transmit this request to the core).
329    * The function will be called with a NULL buffer to signal
330    * timeout.
331    */
332   GNUNET_CONNECTION_TransmitReadyNotify get_message;
333
334   /**
335    * Closure for get_message.
336    */
337   void *get_message_cls;
338
339   /**
340    * Timeout for this handle.
341    */
342   struct GNUNET_TIME_Absolute timeout;
343
344   /**
345    * How important is this message?
346    */
347   uint32_t priority;
348
349   /**
350    * Size of this request.
351    */
352   uint16_t msize;
353
354   /**
355    * Send message request ID for this request.
356    */
357   uint16_t smr_id;
358
359   /**
360    * Is corking allowed?
361    */
362   int cork;
363
364 };
365
366
367 /**
368  * Our current client connection went down.  Clean it up
369  * and try to reconnect!
370  *
371  * @param h our handle to the core service
372  */
373 static void
374 reconnect (struct GNUNET_CORE_Handle *h);
375
376
377 /**
378  * Task schedule to try to re-connect to core.
379  *
380  * @param cls the 'struct GNUNET_CORE_Handle'
381  * @param tc task context
382  */
383 static void
384 reconnect_task (void *cls, 
385                 const struct GNUNET_SCHEDULER_TaskContext *tc)
386 {
387   struct GNUNET_CORE_Handle *h = cls;
388
389   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
390 #if DEBUG_CORE
391   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392               "Connecting to CORE service after delay\n");
393 #endif
394   reconnect (h);
395 }
396
397
398 /**
399  * Notify clients about disconnect and free 
400  * the entry for connected peer.
401  *
402  * @param cls the 'struct GNUNET_CORE_Handle*'
403  * @param key the peer identity (not used)
404  * @param value the 'struct PeerRecord' to free.
405  * @return GNUNET_YES (continue)
406  */
407 static int
408 disconnect_and_free_peer_entry (void *cls,
409                                 const GNUNET_HashCode *key,
410                                 void *value)
411 {
412   static struct GNUNET_BANDWIDTH_Value32NBO zero;
413   struct GNUNET_CORE_Handle *h = cls;
414   struct GNUNET_CORE_TransmitHandle *th;
415   struct PeerRecord *pr = value;
416   GNUNET_CORE_PeerConfigurationInfoCallback pcic;
417
418   while (NULL != (th = pr->pending_head))
419     {
420       GNUNET_CONTAINER_DLL_remove (pr->pending_head,
421                                    pr->pending_tail,
422                                    th);
423       pr->queue_size--;
424       GNUNET_assert (0 == 
425                      th->get_message (th->get_message_cls,
426                                       0, NULL));
427       if (th->cm != NULL)
428         th->cm->th = NULL;
429       GNUNET_free (th);
430     }
431   if (NULL != (pcic = pr->pcic))
432     {
433       pr->pcic = NULL;
434       GNUNET_free_non_null (pr->pcic_ptr);
435       pr->pcic_ptr = NULL;
436       pcic (pr->pcic_cls,
437             &pr->peer,
438             zero,
439             0, 
440             GNUNET_TIME_UNIT_FOREVER_REL,
441             0);
442     }
443   if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
444     {
445       GNUNET_SCHEDULER_cancel (pr->timeout_task);
446       pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
447     }
448   if (pr->ntr_task != GNUNET_SCHEDULER_NO_TASK)
449     {
450       GNUNET_SCHEDULER_cancel (pr->ntr_task);
451       pr->ntr_task = GNUNET_SCHEDULER_NO_TASK;
452     }
453   GNUNET_assert (pr->queue_size == 0);
454   if ( (pr->prev != NULL) ||
455        (pr->next != NULL) ||
456        (h->ready_peer_head == pr) )
457     GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
458                                  h->ready_peer_tail,
459                                  pr);
460   if (h->disconnects != NULL)
461     h->disconnects (h->cls,
462                     &pr->peer);    
463   GNUNET_assert (GNUNET_YES ==
464                  GNUNET_CONTAINER_multihashmap_remove (h->peers,
465                                                        key,
466                                                        pr));
467   GNUNET_assert (pr->pending_head == NULL);
468   GNUNET_assert (pr->pending_tail == NULL);
469   GNUNET_assert (pr->ch = h);
470   GNUNET_assert (pr->queue_size == 0);
471   GNUNET_assert (pr->timeout_task == GNUNET_SCHEDULER_NO_TASK);
472   GNUNET_assert (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK);
473   GNUNET_free (pr);  
474   return GNUNET_YES;
475 }
476
477
478 /**
479  * Close down any existing connection to the CORE service and
480  * try re-establishing it later.
481  *
482  * @param h our handle
483  */
484 static void
485 reconnect_later (struct GNUNET_CORE_Handle *h)
486 {
487   struct ControlMessage *cm;
488   struct PeerRecord *pr;
489
490   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
491   if (h->client != NULL)
492     {
493       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
494       h->client = NULL;
495       h->cth = NULL;
496       GNUNET_CONTAINER_multihashmap_iterate (h->peers,
497                                              &disconnect_and_free_peer_entry,
498                                              h);
499     }
500   while (NULL != (pr = h->ready_peer_head))    
501     GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
502                                  h->ready_peer_tail,
503                                  pr);
504   h->currently_down = GNUNET_YES;
505   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_backoff,
506                                                     &reconnect_task,
507                                                     h);
508   while (NULL != (cm = h->control_pending_head))
509     {
510       GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
511                                    h->control_pending_tail,
512                                    cm);
513       if (cm->th != NULL)
514         cm->th->cm = NULL; 
515       if (cm->cont != NULL)
516         cm->cont (cm->cont_cls, GNUNET_NO);
517       GNUNET_free (cm);
518     }
519   GNUNET_assert (h->control_pending_head == NULL);
520   h->retry_backoff = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
521                                                h->retry_backoff);
522   h->retry_backoff = GNUNET_TIME_relative_multiply (h->retry_backoff, 2);
523 }
524
525
526 /**
527  * Check the list of pending requests, send the next
528  * one to the core.
529  *
530  * @param h core handle
531  * @param ignore_currently_down transmit message even if not initialized?
532  */
533 static void
534 trigger_next_request (struct GNUNET_CORE_Handle *h,
535                       int ignore_currently_down);
536
537
538 /**
539  * The given request hit its timeout.  Remove from the
540  * doubly-linked list and call the respective continuation.
541  *
542  * @param cls the transmit handle of the request that timed out
543  * @param tc context, can be NULL (!)
544  */
545 static void
546 transmission_timeout (void *cls, 
547                       const struct GNUNET_SCHEDULER_TaskContext *tc);
548
549
550 /**
551  * Send a control message to the peer asking for transmission
552  * of the message in the given peer record.
553  *
554  * @param pr peer to request transmission to
555  */
556 static void
557 request_next_transmission (struct PeerRecord *pr)
558 {
559   struct GNUNET_CORE_Handle *h = pr->ch;
560   struct ControlMessage *cm;
561   struct SendMessageRequest *smr;
562   struct GNUNET_CORE_TransmitHandle *th;
563
564   if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
565     {
566       GNUNET_SCHEDULER_cancel (pr->timeout_task);
567       pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
568     }
569   if (NULL == (th = pr->pending_head))
570     {
571       trigger_next_request (h, GNUNET_NO);
572       return;
573     }
574   if (th->cm != NULL)
575     return; /* already done */
576   GNUNET_assert (pr->prev == NULL);
577   GNUNET_assert (pr->next == NULL);
578   pr->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (th->timeout),
579                                                    &transmission_timeout,
580                                                    pr);
581   cm = GNUNET_malloc (sizeof (struct ControlMessage) + 
582                       sizeof (struct SendMessageRequest));
583   th->cm = cm;
584   cm->th = th;
585   smr = (struct SendMessageRequest*) &cm[1];
586   smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
587   smr->header.size = htons (sizeof (struct SendMessageRequest));
588   smr->priority = htonl (th->priority);
589   smr->deadline = GNUNET_TIME_absolute_hton (th->timeout);
590   smr->peer = pr->peer;
591   smr->queue_size = htonl (pr->queue_size);
592   smr->size = htons (th->msize);
593   smr->smr_id = htons (th->smr_id = pr->smr_id_gen++);
594   GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head,
595                                     h->control_pending_tail,
596                                     cm);
597 #if DEBUG_CORE
598   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
599               "Adding SEND REQUEST for peer `%s' to message queue\n",
600               GNUNET_i2s (&pr->peer));
601 #endif
602   trigger_next_request (h, GNUNET_NO);
603 }
604
605
606 /**
607  * The given request hit its timeout.  Remove from the
608  * doubly-linked list and call the respective continuation.
609  *
610  * @param cls the transmit handle of the request that timed out
611  * @param tc context, can be NULL (!)
612  */
613 static void
614 transmission_timeout (void *cls, 
615                       const struct GNUNET_SCHEDULER_TaskContext *tc)
616 {
617   struct PeerRecord *pr = cls;
618   struct GNUNET_CORE_Handle *h = pr->ch;
619   struct GNUNET_CORE_TransmitHandle *th;
620   
621   pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
622   th = pr->pending_head;
623   GNUNET_CONTAINER_DLL_remove (pr->pending_head,
624                                pr->pending_tail,
625                                th);
626   pr->queue_size--;
627   if ( (pr->prev != NULL) ||
628        (pr->next != NULL) ||
629        (pr == h->ready_peer_head) )
630     {
631       /* the request that was 'approved' by core was
632          canceled before it could be transmitted; remove
633          us from the 'ready' list */
634       GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
635                                    h->ready_peer_tail,
636                                    pr);
637     }
638 #if DEBUG_CORE
639   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
640               "Signalling timeout of request for transmission to CORE service\n");
641 #endif
642   GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL));
643   request_next_transmission (pr);
644 }
645
646
647 /**
648  * Transmit the next message to the core service.
649  */
650 static size_t
651 transmit_message (void *cls,
652                   size_t size, 
653                   void *buf)
654 {
655   struct GNUNET_CORE_Handle *h = cls;
656   struct ControlMessage *cm;
657   struct GNUNET_CORE_TransmitHandle *th;
658   struct PeerRecord *pr;
659   struct SendMessage *sm;
660   const struct GNUNET_MessageHeader *hdr;
661   uint16_t msize;
662   size_t ret;
663
664   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
665   h->cth = NULL;
666   if (buf == NULL)
667     {
668 #if DEBUG_CORE
669       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
670                   "Transmission failed, initiating reconnect\n");
671 #endif
672       reconnect_later (h);
673       return 0;
674     }
675   /* first check for control messages */
676   if (NULL != (cm = h->control_pending_head))
677     {
678       hdr = (const struct GNUNET_MessageHeader*) &cm[1];
679       msize = ntohs (hdr->size);
680       if (size < msize)
681         {
682           trigger_next_request (h, GNUNET_NO);
683           return 0;
684         }
685 #if DEBUG_CORE
686       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
687                   "Transmitting control message with %u bytes of type %u to core.\n",
688                   (unsigned int) msize,
689                   (unsigned int) ntohs (hdr->type));
690 #endif
691       memcpy (buf, hdr, msize);
692       GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
693                                    h->control_pending_tail,
694                                    cm);     
695       if (cm->th != NULL)
696         cm->th->cm = NULL;
697       if (NULL != cm->cont)
698         cm->cont (cm->cont_cls, GNUNET_OK);
699       GNUNET_free (cm);
700       trigger_next_request (h, GNUNET_NO);
701       return msize;
702     }
703   /* now check for 'ready' P2P messages */
704   if (NULL != (pr = h->ready_peer_head))
705     {
706       GNUNET_assert (pr->pending_head != NULL);
707       th = pr->pending_head;
708       if (size < th->msize + sizeof (struct SendMessage))
709         {
710           trigger_next_request (h, GNUNET_NO);
711           return 0;
712         }
713       GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
714                                    h->ready_peer_tail,
715                                    pr);
716       GNUNET_CONTAINER_DLL_remove (pr->pending_head,
717                                    pr->pending_tail,
718                                    th);
719       pr->queue_size--;
720       if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
721         {
722           GNUNET_SCHEDULER_cancel (pr->timeout_task);
723           pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
724         }
725 #if DEBUG_CORE
726       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
727                   "Transmitting SEND request to `%s' with %u bytes.\n",
728                   GNUNET_i2s (&pr->peer),
729                   (unsigned int) th->msize);
730 #endif
731       sm = (struct SendMessage *) buf;
732       sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
733       sm->priority = htonl (th->priority);
734       sm->deadline = GNUNET_TIME_absolute_hton (th->timeout);
735       sm->peer = pr->peer;
736       sm->cork = htonl ((uint32_t) th->cork);
737       sm->reserved = htonl (0);
738       ret = th->get_message (th->get_message_cls,
739                              size - sizeof (struct SendMessage),
740                              &sm[1]);
741  
742 #if DEBUG_CORE
743       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744                   "Transmitting SEND request to `%s' yielded %u bytes.\n",
745                   GNUNET_i2s (&pr->peer),
746                   ret);
747 #endif
748       GNUNET_free (th);
749      if (0 == ret)
750         {
751 #if DEBUG_CORE
752           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
753                       "Size of clients message to peer %s is 0!\n",
754                       GNUNET_i2s(&pr->peer));
755 #endif
756           /* client decided to send nothing! */
757           request_next_transmission (pr);
758           return 0;       
759         }
760 #if DEBUG_CORE
761       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762                   "Produced SEND message to core with %u bytes payload\n",
763                   (unsigned int) ret);
764 #endif
765       GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
766       if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
767         {
768           GNUNET_break (0);
769           request_next_transmission (pr);
770           return 0;
771         }
772       ret += sizeof (struct SendMessage);
773       sm->header.size = htons (ret);
774       GNUNET_assert (ret <= size);
775       request_next_transmission (pr);
776       return ret;
777     }
778   return 0;
779 }
780
781
782 /**
783  * Check the list of pending requests, send the next
784  * one to the core.
785  *
786  * @param h core handle
787  * @param ignore_currently_down transmit message even if not initialized?
788  */
789 static void
790 trigger_next_request (struct GNUNET_CORE_Handle *h,
791                       int ignore_currently_down)
792 {
793   uint16_t msize;
794
795   if ( (GNUNET_YES == h->currently_down) &&
796        (ignore_currently_down == GNUNET_NO) )
797     {
798 #if DEBUG_CORE
799       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800                   "Core connection down, not processing queue\n");
801 #endif
802       return;
803     }
804   if (NULL != h->cth)
805     {
806 #if DEBUG_CORE
807       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
808                   "Request pending, not processing queue\n");
809 #endif
810       return;
811     }
812   if (h->control_pending_head != NULL)
813     msize = ntohs (((struct GNUNET_MessageHeader*) &h->control_pending_head[1])->size);    
814   else if (h->ready_peer_head != NULL) 
815     msize = h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage);    
816   else
817     {
818 #if DEBUG_CORE
819       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
820                   "Request queue empty, not processing queue\n");
821 #endif
822       return; /* no pending message */
823     }
824   h->cth = GNUNET_CLIENT_notify_transmit_ready (h->client,
825                                                 msize,
826                                                 GNUNET_TIME_UNIT_FOREVER_REL,
827                                                 GNUNET_NO,
828                                                 &transmit_message, h);
829 }
830
831
832 /**
833  * Handler for notification messages received from the core.
834  *
835  * @param cls our "struct GNUNET_CORE_Handle"
836  * @param msg the message received from the core service
837  */
838 static void
839 main_notify_handler (void *cls, 
840                      const struct GNUNET_MessageHeader *msg)
841 {
842   struct GNUNET_CORE_Handle *h = cls;
843   const struct InitReplyMessage *m;
844   const struct ConnectNotifyMessage *cnm;
845   const struct DisconnectNotifyMessage *dnm;
846   const struct NotifyTrafficMessage *ntm;
847   const struct GNUNET_MessageHeader *em;
848   const struct ConfigurationInfoMessage *cim;
849   const struct PeerStatusNotifyMessage *psnm;
850   const struct SendMessageReady *smr;
851   const struct GNUNET_CORE_MessageHandler *mh;
852   GNUNET_CORE_StartupCallback init;
853   GNUNET_CORE_PeerConfigurationInfoCallback pcic;
854   struct PeerRecord *pr;
855   struct GNUNET_CORE_TransmitHandle *th;
856   unsigned int hpos;
857   int trigger;
858   uint16_t msize;
859   uint16_t et;
860   uint32_t ats_count;
861
862   if (msg == NULL)
863     {
864       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
865                   _
866                   ("Client was disconnected from core service, trying to reconnect.\n"));
867       reconnect_later (h);
868       return;
869     }
870   msize = ntohs (msg->size);
871 #if DEBUG_CORE > 2
872   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
873               "Processing message of type %u and size %u from core service\n",
874               ntohs (msg->type), msize);
875 #endif
876   switch (ntohs (msg->type))
877     {
878     case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY:
879       if (ntohs (msg->size) != sizeof (struct InitReplyMessage))
880         {
881           GNUNET_break (0);
882           reconnect_later (h);
883           return;
884         }
885       m = (const struct InitReplyMessage *) msg;
886       GNUNET_break (0 == ntohl (m->reserved));
887       /* start our message processing loop */
888       if (GNUNET_YES == h->currently_down)
889         {
890           h->currently_down = GNUNET_NO;
891           trigger_next_request (h, GNUNET_NO);
892         }
893       h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
894       GNUNET_CRYPTO_hash (&m->publicKey,
895                           sizeof (struct
896                                   GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
897                           &h->me.hashPubKey);
898       if (NULL != (init = h->init))
899         {
900           /* mark so we don't call init on reconnect */
901           h->init = NULL;
902 #if DEBUG_CORE
903           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
904                       "Connected to core service of peer `%s'.\n",
905                       GNUNET_i2s (&h->me));
906 #endif
907           init (h->cls, h, &h->me, &m->publicKey);
908         }
909       else
910         {
911 #if DEBUG_CORE
912           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
913                       "Successfully reconnected to core service.\n");
914 #endif
915         }
916       /* fake 'connect to self' */
917       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
918                                               &h->me.hashPubKey);
919       GNUNET_assert (pr == NULL);
920       pr = GNUNET_malloc (sizeof (struct PeerRecord));
921       pr->peer = h->me;
922       pr->ch = h;
923       GNUNET_assert (GNUNET_YES ==
924                      GNUNET_CONTAINER_multihashmap_put (h->peers,
925                                                         &h->me.hashPubKey,
926                                                         pr,
927                                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
928       if (NULL != h->connects)
929         h->connects (h->cls,
930                      &h->me,
931                      NULL);
932       break;
933     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT:
934       if (msize < sizeof (struct ConnectNotifyMessage))
935         {
936           GNUNET_break (0);
937           reconnect_later (h);
938           return;
939         }
940       cnm = (const struct ConnectNotifyMessage *) msg;
941       ats_count = ntohl (cnm->ats_count);
942       if ( (msize != sizeof (struct ConnectNotifyMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)) ||
943            (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR != ntohl ((&cnm->ats)[ats_count].type)) )
944         {
945           GNUNET_break (0);
946           reconnect_later (h);
947           return;
948         }
949 #if DEBUG_CORE
950       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
951                   "Received notification about connection from `%s'.\n",
952                   GNUNET_i2s (&cnm->peer));
953 #endif
954       if (0 == memcmp (&h->me,
955                        &cnm->peer,
956                        sizeof (struct GNUNET_PeerIdentity)))
957         {
958           /* connect to self!? */
959           GNUNET_break (0);
960           return;
961         }
962       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
963                                               &cnm->peer.hashPubKey);
964       if (pr != NULL)
965         {
966           GNUNET_break (0);
967           reconnect_later (h);
968           return;
969         }
970       pr = GNUNET_malloc (sizeof (struct PeerRecord));
971       pr->peer = cnm->peer;
972       pr->ch = h;
973       GNUNET_assert (GNUNET_YES ==
974                      GNUNET_CONTAINER_multihashmap_put (h->peers,
975                                                         &cnm->peer.hashPubKey,
976                                                         pr,
977                                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
978       if (NULL != h->connects)
979         h->connects (h->cls,
980                      &cnm->peer,
981                      &cnm->ats);
982       break;
983     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT:
984       if (msize != sizeof (struct DisconnectNotifyMessage))
985         {
986           GNUNET_break (0);
987           reconnect_later (h);
988           return;
989         }
990       dnm = (const struct DisconnectNotifyMessage *) msg;
991       if (0 == memcmp (&h->me,
992                        &dnm->peer,
993                        sizeof (struct GNUNET_PeerIdentity)))
994         {
995           /* connection to self!? */
996           GNUNET_break (0);
997           return;
998         }
999       GNUNET_break (0 == ntohl (dnm->reserved));
1000 #if DEBUG_CORE
1001       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1002                   "Received notification about disconnect from `%s'.\n",
1003                   GNUNET_i2s (&dnm->peer));
1004 #endif
1005       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1006                                               &dnm->peer.hashPubKey);
1007       if (pr == NULL)
1008         {
1009           GNUNET_break (0);
1010           reconnect_later (h);
1011           return;
1012         }
1013       trigger = ( (pr->prev != NULL) ||
1014                   (pr->next != NULL) ||
1015                   (h->ready_peer_head == pr) );
1016       disconnect_and_free_peer_entry (h, &dnm->peer.hashPubKey, pr);
1017       if (trigger)
1018         trigger_next_request (h, GNUNET_NO);
1019       break;
1020     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE:
1021       if (NULL == h->status_events)
1022         {
1023           GNUNET_break (0);
1024           return;
1025         }
1026       if (msize < sizeof (struct PeerStatusNotifyMessage))
1027         {
1028           GNUNET_break (0);
1029           reconnect_later (h);
1030           return;
1031         }
1032       psnm = (const struct PeerStatusNotifyMessage *) msg;
1033       if (0 == memcmp (&h->me,
1034                        &psnm->peer,
1035                        sizeof (struct GNUNET_PeerIdentity)))
1036         {
1037           /* self-change!? */
1038           GNUNET_break (0);
1039           return;
1040         }
1041       ats_count = ntohl (psnm->ats_count);
1042       if ( (msize != sizeof (struct PeerStatusNotifyMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)) ||
1043            (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR != ntohl ((&psnm->ats)[ats_count].type)) )
1044         {
1045           GNUNET_break (0);
1046           reconnect_later (h);
1047           return;
1048         }
1049 #if DEBUG_CORE > 1
1050       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1051                   "Received notification about status change by `%s'.\n",
1052                   GNUNET_i2s (&psnm->peer));
1053 #endif
1054       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1055                                               &psnm->peer.hashPubKey);
1056       if (pr == NULL)
1057         {
1058           GNUNET_break (0);
1059           reconnect_later (h);
1060           return;
1061         }
1062       h->status_events (h->cls,
1063                         &psnm->peer,
1064                         psnm->bandwidth_in,
1065                         psnm->bandwidth_out,
1066                         GNUNET_TIME_absolute_ntoh (psnm->timeout),
1067                         &psnm->ats);
1068       break;
1069     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND:
1070       if (msize < sizeof (struct NotifyTrafficMessage))
1071         {
1072           GNUNET_break (0);
1073           reconnect_later (h);
1074           return;
1075         }
1076       ntm = (const struct NotifyTrafficMessage *) msg;
1077
1078       ats_count = ntohl (ntm->ats_count);
1079       if ( (msize < sizeof (struct NotifyTrafficMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)
1080             + sizeof (struct GNUNET_MessageHeader)) ||
1081            (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR != ntohl ((&ntm->ats)[ats_count].type)) )
1082         {
1083           GNUNET_break (0);
1084           reconnect_later (h);
1085           return;
1086         }
1087       em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count+1];
1088 #if DEBUG_CORE
1089       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1090                   "Received message of type %u and size %u from peer `%4s'\n",
1091                   ntohs (em->type), 
1092                   ntohs (em->size),
1093                   GNUNET_i2s (&ntm->peer));
1094 #endif
1095       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1096                                               &ntm->peer.hashPubKey);
1097       if (pr == NULL)
1098         {
1099           GNUNET_break (0);
1100           reconnect_later (h);
1101           return;
1102         }
1103       if ((GNUNET_NO == h->inbound_hdr_only) &&
1104           (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage) + 
1105            + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)) )
1106         {
1107           GNUNET_break (0);
1108           reconnect_later (h);
1109           return;
1110         }
1111       et = ntohs (em->type);
1112       for (hpos = 0; hpos < h->hcnt; hpos++)
1113         {
1114           mh = &h->handlers[hpos];
1115           if (mh->type != et)
1116             continue;
1117           if ((mh->expected_size != ntohs (em->size)) &&
1118               (mh->expected_size != 0))
1119             {
1120               GNUNET_break (0);
1121               continue;
1122             }
1123           if (GNUNET_OK !=
1124               h->handlers[hpos].callback (h->cls, &ntm->peer, em,
1125                                           &ntm->ats))
1126             {
1127               /* error in processing, do not process other messages! */
1128               break;
1129             }
1130         }
1131       if (NULL != h->inbound_notify)
1132         h->inbound_notify (h->cls, &ntm->peer, em,
1133                            &ntm->ats);
1134       break;
1135     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND:
1136       if (msize < sizeof (struct NotifyTrafficMessage))
1137         {
1138           GNUNET_break (0);
1139           reconnect_later (h);
1140           return;
1141         }
1142       ntm = (const struct NotifyTrafficMessage *) msg;
1143       if (0 == memcmp (&h->me,
1144                        &ntm->peer,
1145                        sizeof (struct GNUNET_PeerIdentity)))
1146         {
1147           /* self-change!? */
1148           GNUNET_break (0);
1149           return;
1150         }
1151       ats_count = ntohl (ntm->ats_count);
1152       if ( (msize < sizeof (struct NotifyTrafficMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)
1153             + sizeof (struct GNUNET_MessageHeader)) ||
1154            (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR != ntohl ((&ntm->ats)[ats_count].type)) )
1155         {
1156           GNUNET_break (0);
1157           reconnect_later (h);
1158           return;
1159         }
1160       em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count+1];
1161       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1162                                               &ntm->peer.hashPubKey);
1163       if (pr == NULL)
1164         {
1165           GNUNET_break (0);
1166           reconnect_later (h);
1167           return;
1168         }
1169 #if DEBUG_CORE
1170       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1171                   "Received notification about transmission to `%s'.\n",
1172                   GNUNET_i2s (&ntm->peer));
1173 #endif
1174       if ((GNUNET_NO == h->outbound_hdr_only) &&
1175           (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage) 
1176            + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)) )
1177         {
1178           GNUNET_break (0);
1179           reconnect_later (h);
1180           return;
1181         }
1182       if (NULL == h->outbound_notify)
1183         {
1184           GNUNET_break (0);
1185           break;
1186         }
1187       h->outbound_notify (h->cls, &ntm->peer, em,
1188                           &ntm->ats);
1189       break;
1190     case GNUNET_MESSAGE_TYPE_CORE_SEND_READY:
1191       if (msize != sizeof (struct SendMessageReady))
1192         {
1193           GNUNET_break (0);
1194           reconnect_later (h);
1195           return;
1196         }
1197       smr = (const struct SendMessageReady *) msg;
1198       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1199                                               &smr->peer.hashPubKey);
1200       if (pr == NULL)
1201         {
1202           GNUNET_break (0);
1203           reconnect_later (h);
1204           return;
1205         }
1206 #if DEBUG_CORE
1207       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1208                   "Received notification about transmission readiness to `%s'.\n",
1209                   GNUNET_i2s (&smr->peer));
1210 #endif
1211       if (pr->pending_head == NULL)
1212         {
1213           /* request must have been cancelled between the original request
1214              and the response from core, ignore core's readiness */
1215           break;
1216         }
1217
1218       th = pr->pending_head;
1219       if (ntohs (smr->smr_id) != th->smr_id)
1220         {
1221           /* READY message is for expired or cancelled message,
1222              ignore! (we should have already sent another request) */
1223           break;
1224         }
1225       if ( (pr->prev != NULL) ||
1226            (pr->next != NULL) ||
1227            (h->ready_peer_head == pr) )
1228         {
1229           /* we should not already be on the ready list... */
1230           GNUNET_break (0);
1231           reconnect_later (h);
1232           return;
1233         }
1234       GNUNET_CONTAINER_DLL_insert (h->ready_peer_head,
1235                                    h->ready_peer_tail,
1236                                    pr);
1237       trigger_next_request (h, GNUNET_NO);
1238       break;
1239     case GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO:
1240       if (ntohs (msg->size) != sizeof (struct ConfigurationInfoMessage))
1241         {
1242           GNUNET_break (0);
1243           reconnect_later (h);
1244           return;
1245         }
1246       cim = (const struct ConfigurationInfoMessage*) msg;
1247       if (0 == memcmp (&h->me,
1248                        &cim->peer,
1249                        sizeof (struct GNUNET_PeerIdentity)))
1250         {
1251           /* self-change!? */
1252           GNUNET_break (0);
1253           return;
1254         }
1255 #if DEBUG_CORE
1256       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1257                   "Received notification about configuration update for `%s' with RIM %u.\n",
1258                   GNUNET_i2s (&cim->peer),
1259                   (unsigned int) ntohl (cim->rim_id));
1260 #endif
1261       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1262                                               &cim->peer.hashPubKey);
1263       if (pr == NULL)
1264         {
1265           GNUNET_break (0);
1266           reconnect_later (h);
1267           return;
1268         }
1269       if (pr->rim_id != ntohl (cim->rim_id))
1270         {
1271 #if DEBUG_CORE
1272           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1273                       "Reservation ID mismatch in notification...\n");
1274 #endif
1275           break;
1276         }
1277       pcic = pr->pcic;
1278       pr->pcic = NULL;
1279       GNUNET_free_non_null (pr->pcic_ptr);
1280       pr->pcic_ptr = NULL;
1281       if (pcic != NULL)
1282         pcic (pr->pcic_cls,
1283               &pr->peer,
1284               cim->bw_out,
1285               ntohl (cim->reserved_amount),
1286               GNUNET_TIME_relative_ntoh (cim->reserve_delay),
1287               GNUNET_ntohll (cim->preference));
1288       break;
1289     default:
1290       reconnect_later (h);
1291       return;
1292     }
1293   GNUNET_CLIENT_receive (h->client,
1294                          &main_notify_handler, h, 
1295                          GNUNET_TIME_UNIT_FOREVER_REL);
1296 }
1297
1298
1299 /**
1300  * Task executed once we are done transmitting the INIT message.
1301  * Starts our 'receive' loop.
1302  *
1303  * @param cls the 'struct GNUNET_CORE_Handle'
1304  * @param success were we successful
1305  */
1306 static void
1307 init_done_task (void *cls, 
1308                 int success)
1309 {
1310   struct GNUNET_CORE_Handle *h = cls;
1311
1312   if (success == GNUNET_SYSERR)
1313     return; /* shutdown */
1314   if (success == GNUNET_NO)
1315     {
1316 #if DEBUG_CORE
1317       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1318                   "Failed to exchange INIT with core, retrying\n");
1319 #endif
1320       if (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK)
1321         reconnect_later (h);
1322       return;
1323     }
1324   GNUNET_CLIENT_receive (h->client,
1325                          &main_notify_handler, 
1326                          h, 
1327                          GNUNET_TIME_UNIT_FOREVER_REL);
1328 }
1329
1330
1331 /**
1332  * Our current client connection went down.  Clean it up
1333  * and try to reconnect!
1334  *
1335  * @param h our handle to the core service
1336  */
1337 static void
1338 reconnect (struct GNUNET_CORE_Handle *h)
1339 {
1340   struct ControlMessage *cm;
1341   struct InitMessage *init;
1342   uint32_t opt;
1343   uint16_t msize;
1344   uint16_t *ts;
1345   unsigned int hpos;
1346
1347 #if DEBUG_CORE
1348   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1349               "Reconnecting to CORE service\n");
1350 #endif
1351   GNUNET_assert (h->client == NULL);
1352   GNUNET_assert (h->currently_down == GNUNET_YES);
1353   h->client = GNUNET_CLIENT_connect ("core", h->cfg);
1354   if (h->client == NULL)
1355     {
1356       reconnect_later (h);
1357       return;
1358     }
1359   msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage);
1360   cm = GNUNET_malloc (sizeof (struct ControlMessage) +
1361                       msize);
1362   cm->cont = &init_done_task;
1363   cm->cont_cls = h;
1364   init = (struct InitMessage*) &cm[1];
1365   init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT);
1366   init->header.size = htons (msize);
1367   opt = GNUNET_CORE_OPTION_SEND_CONNECT | GNUNET_CORE_OPTION_SEND_DISCONNECT;
1368   if (h->status_events != NULL)
1369     opt |= GNUNET_CORE_OPTION_SEND_STATUS_CHANGE;
1370   if (h->inbound_notify != NULL)
1371     {
1372       if (h->inbound_hdr_only)
1373         opt |= GNUNET_CORE_OPTION_SEND_HDR_INBOUND;
1374       else
1375         opt |= GNUNET_CORE_OPTION_SEND_FULL_INBOUND;
1376     }
1377   if (h->outbound_notify != NULL)
1378     {
1379       if (h->outbound_hdr_only)
1380         opt |= GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND;
1381       else
1382         opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND;
1383     }
1384   init->options = htonl (opt);
1385   ts = (uint16_t *) &init[1];
1386   for (hpos = 0; hpos < h->hcnt; hpos++)
1387     ts[hpos] = htons (h->handlers[hpos].type);
1388   GNUNET_CONTAINER_DLL_insert (h->control_pending_head,
1389                                h->control_pending_tail,
1390                                cm);
1391   trigger_next_request (h, GNUNET_YES);
1392 }
1393
1394
1395
1396 /**
1397  * Connect to the core service.  Note that the connection may
1398  * complete (or fail) asynchronously.
1399  *
1400  * @param cfg configuration to use
1401  * @param queue_size size of the per-peer message queue
1402  * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
1403  * @param init callback to call on timeout or once we have successfully
1404  *        connected to the core service; note that timeout is only meaningful if init is not NULL
1405  * @param connects function to call on peer connect, can be NULL
1406  * @param disconnects function to call on peer disconnect / timeout, can be NULL
1407  * @param status_events function to call on changes to peer connection status, can be NULL
1408  * @param inbound_notify function to call for all inbound messages, can be NULL
1409  * @param inbound_hdr_only set to GNUNET_YES if inbound_notify will only read the
1410  *                GNUNET_MessageHeader and hence we do not need to give it the full message;
1411  *                can be used to improve efficiency, ignored if inbound_notify is NULLL
1412  * @param outbound_notify function to call for all outbound messages, can be NULL
1413  * @param outbound_hdr_only set to GNUNET_YES if outbound_notify will only read the
1414  *                GNUNET_MessageHeader and hence we do not need to give it the full message
1415  *                can be used to improve efficiency, ignored if outbound_notify is NULLL
1416  * @param handlers callbacks for messages we care about, NULL-terminated
1417  * @return handle to the core service (only useful for disconnect until 'init' is called);
1418  *                NULL on error (in this case, init is never called)
1419  */
1420 struct GNUNET_CORE_Handle *
1421 GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1422                      unsigned int queue_size,
1423                      void *cls,
1424                      GNUNET_CORE_StartupCallback init,
1425                      GNUNET_CORE_ConnectEventHandler connects,
1426                      GNUNET_CORE_DisconnectEventHandler disconnects,
1427                      GNUNET_CORE_PeerStatusEventHandler status_events,
1428                      GNUNET_CORE_MessageCallback inbound_notify,
1429                      int inbound_hdr_only,
1430                      GNUNET_CORE_MessageCallback outbound_notify,
1431                      int outbound_hdr_only,
1432                      const struct GNUNET_CORE_MessageHandler *handlers)
1433 {
1434   struct GNUNET_CORE_Handle *h;
1435
1436   h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle));
1437   h->cfg = cfg;
1438   h->queue_size = queue_size;
1439   h->cls = cls;
1440   h->init = init;
1441   h->connects = connects;
1442   h->disconnects = disconnects;
1443   h->status_events = status_events;
1444   h->inbound_notify = inbound_notify;
1445   h->outbound_notify = outbound_notify;
1446   h->inbound_hdr_only = inbound_hdr_only;
1447   h->outbound_hdr_only = outbound_hdr_only;
1448   h->handlers = handlers;
1449   h->hcnt = 0;
1450   h->currently_down = GNUNET_YES;
1451   h->peers = GNUNET_CONTAINER_multihashmap_create (128);
1452   h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1453   while (handlers[h->hcnt].callback != NULL)
1454     h->hcnt++;
1455   GNUNET_assert (h->hcnt <
1456                  (GNUNET_SERVER_MAX_MESSAGE_SIZE -
1457                   sizeof (struct InitMessage)) / sizeof (uint16_t));
1458 #if DEBUG_CORE
1459   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1460               "Connecting to CORE service\n");
1461 #endif
1462   reconnect (h);
1463   return h;
1464 }
1465
1466
1467 /**
1468  * Disconnect from the core service.  This function can only 
1469  * be called *after* all pending 'GNUNET_CORE_notify_transmit_ready'
1470  * requests have been explicitly canceled.
1471  *
1472  * @param handle connection to core to disconnect
1473  */
1474 void
1475 GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
1476 {
1477   struct ControlMessage *cm;
1478   
1479 #if DEBUG_CORE
1480   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1481               "Disconnecting from CORE service\n");
1482 #endif
1483   if (handle->cth != NULL)
1484     {
1485       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth);
1486       handle->cth = NULL;
1487     }
1488   if (handle->client != NULL)
1489     {
1490       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
1491       handle->client = NULL;
1492     }
1493   while (NULL != (cm = handle->control_pending_head))
1494     {
1495       GNUNET_CONTAINER_DLL_remove (handle->control_pending_head,
1496                                    handle->control_pending_tail,
1497                                    cm);
1498       if (cm->th != NULL)
1499         cm->th->cm = NULL;
1500       if (cm->cont != NULL)
1501         cm->cont (cm->cont_cls, GNUNET_SYSERR);
1502       GNUNET_free (cm);
1503     }
1504   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1505     {
1506       GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1507       handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1508     }
1509   GNUNET_CONTAINER_multihashmap_iterate (handle->peers,
1510                                          &disconnect_and_free_peer_entry,
1511                                          handle);
1512   GNUNET_CONTAINER_multihashmap_destroy (handle->peers);
1513   GNUNET_break (handle->ready_peer_head == NULL);
1514   GNUNET_free (handle);
1515 }
1516
1517
1518 /**
1519  * Task that calls 'request_next_transmission'.
1520  *
1521  * @param cls the 'struct PeerRecord*'
1522  * @param tc scheduler context
1523  */
1524 static void
1525 run_request_next_transmission (void *cls,
1526                                const struct GNUNET_SCHEDULER_TaskContext *tc)
1527 {
1528   struct PeerRecord *pr = cls;
1529
1530   pr->ntr_task = GNUNET_SCHEDULER_NO_TASK;
1531   request_next_transmission (pr);
1532 }
1533
1534
1535 /**
1536  * Ask the core to call "notify" once it is ready to transmit the
1537  * given number of bytes to the specified "target".    Must only be
1538  * called after a connection to the respective peer has been
1539  * established (and the client has been informed about this).
1540  *
1541  * @param handle connection to core service
1542  * @param cork is corking allowed for this transmission?
1543  * @param priority how important is the message?
1544  * @param maxdelay how long can the message wait?
1545  * @param target who should receive the message,
1546  *        use NULL for this peer (loopback)
1547  * @param notify_size how many bytes of buffer space does notify want?
1548  * @param notify function to call when buffer space is available
1549  * @param notify_cls closure for notify
1550  * @return non-NULL if the notify callback was queued,
1551  *         NULL if we can not even queue the request (insufficient
1552  *         memory); if NULL is returned, "notify" will NOT be called.
1553  */
1554 struct GNUNET_CORE_TransmitHandle *
1555 GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
1556                                    int cork,
1557                                    uint32_t priority,
1558                                    struct GNUNET_TIME_Relative maxdelay,
1559                                    const struct GNUNET_PeerIdentity *target,
1560                                    size_t notify_size,
1561                                    GNUNET_CONNECTION_TransmitReadyNotify notify,
1562                                    void *notify_cls)
1563 {
1564   struct PeerRecord *pr;
1565   struct GNUNET_CORE_TransmitHandle *th;
1566   struct GNUNET_CORE_TransmitHandle *pos;
1567   struct GNUNET_CORE_TransmitHandle *prev;
1568   struct GNUNET_CORE_TransmitHandle *minp;
1569
1570   pr = GNUNET_CONTAINER_multihashmap_get (handle->peers,
1571                                           &target->hashPubKey);
1572   if (NULL == pr)
1573     {
1574       /* attempt to send to peer that is not connected */
1575       GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
1576                  "Attempting to send to peer `%s' from peer `%s', but not connected!\n",
1577                  GNUNET_i2s(target), GNUNET_h2s(&handle->me.hashPubKey));
1578       GNUNET_break (0);
1579       return NULL;
1580     }
1581   GNUNET_assert (notify_size + sizeof (struct SendMessage) <
1582                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1583   th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle));
1584   th->peer = pr;
1585   GNUNET_assert(NULL != notify);
1586   th->get_message = notify;
1587   th->get_message_cls = notify_cls;
1588   th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
1589   th->priority = priority;
1590   th->msize = notify_size;
1591   th->cork = cork;
1592   /* bound queue size */
1593   if (pr->queue_size == handle->queue_size)
1594     {
1595       /* find lowest-priority entry, but skip the head of the list */
1596       minp = pr->pending_head->next;
1597       prev = minp;
1598       while (prev != NULL)
1599         {
1600           if (prev->priority < minp->priority)
1601             minp = prev;
1602           prev = prev->next;
1603         }
1604       if (minp == NULL) 
1605         {
1606           GNUNET_break (handle->queue_size != 0);
1607           GNUNET_break (pr->queue_size == 1);
1608           GNUNET_free(th);
1609 #if DEBUG_CORE
1610           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1611                       "Dropping transmission request: cannot drop queue head and limit is one\n");
1612 #endif
1613           return NULL;
1614         }
1615       if (priority <= minp->priority)
1616         {
1617 #if DEBUG_CORE
1618           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1619                       "Dropping transmission request: priority too low\n");
1620 #endif
1621           GNUNET_free(th);
1622           return NULL; /* priority too low */
1623         }
1624       GNUNET_CONTAINER_DLL_remove (pr->pending_head,
1625                                    pr->pending_tail,
1626                                    minp);
1627       pr->queue_size--;
1628       GNUNET_assert (0 ==
1629                      minp->get_message (minp->get_message_cls,
1630                                         0, NULL));
1631       GNUNET_free (minp);
1632     }
1633
1634   /* Order entries by deadline, but SKIP 'HEAD' if
1635      we're in the 'ready_peer_*' DLL */
1636   pos = pr->pending_head;
1637   if ( (pr->prev != NULL) ||
1638        (pr->next != NULL) ||
1639        (pr == handle->ready_peer_head) )
1640     {
1641       GNUNET_assert (pos != NULL);
1642       pos = pos->next; /* skip head */
1643     }
1644
1645   /* insertion sort */
1646   prev = pos;
1647   while ( (pos != NULL) &&
1648           (pos->timeout.abs_value < th->timeout.abs_value) )      
1649     {
1650       prev = pos;
1651       pos = pos->next;
1652     }
1653   GNUNET_CONTAINER_DLL_insert_after (pr->pending_head,
1654                                      pr->pending_tail,
1655                                      prev,
1656                                      th);
1657   pr->queue_size++;
1658   /* was the request queue previously empty? */
1659 #if DEBUG_CORE
1660   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1661               "Transmission request added to queue\n");
1662 #endif
1663   if ( (pr->pending_head == th)  &&
1664        (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK) )
1665     pr->ntr_task = GNUNET_SCHEDULER_add_now (&run_request_next_transmission, pr);
1666   return th;
1667 }
1668
1669
1670 /**
1671  * Cancel the specified transmission-ready notification.
1672  *
1673  * @param th handle that was returned by "notify_transmit_ready".
1674  */
1675 void
1676 GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle
1677                                           *th)
1678 {
1679   struct PeerRecord *pr = th->peer;
1680   struct GNUNET_CORE_Handle *h = pr->ch;
1681   int was_head;
1682
1683   was_head = (pr->pending_head == th);
1684   GNUNET_CONTAINER_DLL_remove (pr->pending_head,
1685                                pr->pending_tail,
1686                                th);    
1687   pr->queue_size--;
1688   if (th->cm != NULL)
1689     {
1690       /* we're currently in the control queue, remove */
1691       GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
1692                                    h->control_pending_tail,
1693                                    th->cm);
1694       GNUNET_free (th->cm);      
1695     }
1696   GNUNET_free (th);
1697   if (was_head)
1698     {
1699       if ( (pr->prev != NULL) ||
1700            (pr->next != NULL) ||
1701            (pr == h->ready_peer_head) )
1702         {
1703           /* the request that was 'approved' by core was
1704              canceled before it could be transmitted; remove
1705              us from the 'ready' list */
1706           GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
1707                                        h->ready_peer_tail,
1708                                        pr);
1709         }
1710       request_next_transmission (pr);
1711     }
1712 }
1713
1714
1715 /* ****************** GNUNET_CORE_peer_request_connect ******************** */
1716
1717 /**
1718  * Handle for a request to the core to connect to
1719  * a particular peer.  Can be used to cancel the request
1720  * (before the 'cont'inuation is called).
1721  */
1722 struct GNUNET_CORE_PeerRequestHandle
1723 {
1724
1725   /**
1726    * Link to control message.
1727    */
1728   struct ControlMessage *cm;
1729
1730   /**
1731    * Core handle used.
1732    */
1733   struct GNUNET_CORE_Handle *h;
1734
1735   /**
1736    * Continuation to run when done.
1737    */
1738   GNUNET_CORE_ControlContinuation cont;
1739
1740   /**
1741    * Closure for 'cont'.
1742    */
1743   void *cont_cls;
1744
1745 };
1746
1747
1748 /**
1749  * Continuation called when the control message was transmitted.
1750  * Calls the original continuation and frees the remaining
1751  * resources.
1752  *
1753  * @param cls the 'struct GNUNET_CORE_PeerRequestHandle'
1754  * @param success was the request transmitted?
1755  */
1756 static void
1757 peer_request_connect_cont (void *cls,
1758                            int success)
1759 {
1760   struct GNUNET_CORE_PeerRequestHandle *ret = cls;
1761   
1762   if (ret->cont != NULL)
1763     ret->cont (ret->cont_cls, success);    
1764   GNUNET_free (ret);
1765 }
1766
1767
1768 /**
1769  * Request that the core should try to connect to a particular peer.
1770  * Once the request has been transmitted to the core, the continuation
1771  * function will be called.  Note that this does NOT mean that a
1772  * connection was successfully established -- it only means that the
1773  * core will now try.  Successful establishment of the connection
1774  * will be signalled to the 'connects' callback argument of
1775  * 'GNUNET_CORE_connect' only.  If the core service does not respond
1776  * to our connection attempt within the given time frame, 'cont' will
1777  * be called with the TIMEOUT reason code.
1778  *
1779  * @param h core handle
1780  * @param timeout how long to try to talk to core
1781  * @param peer who should we connect to
1782  * @param cont function to call once the request has been completed (or timed out)
1783  * @param cont_cls closure for cont
1784  *
1785  * @return NULL on error or already connected,
1786  *         otherwise handle for cancellation
1787  */
1788 struct GNUNET_CORE_PeerRequestHandle *
1789 GNUNET_CORE_peer_request_connect (struct GNUNET_CORE_Handle *h,
1790                                   struct GNUNET_TIME_Relative timeout,
1791                                   const struct GNUNET_PeerIdentity * peer,
1792                                   GNUNET_CORE_ControlContinuation cont,
1793                                   void *cont_cls)
1794 {
1795   struct GNUNET_CORE_PeerRequestHandle *ret;
1796   struct ControlMessage *cm;
1797   struct ConnectMessage *msg;
1798
1799   if (NULL != GNUNET_CONTAINER_multihashmap_get (h->peers,
1800                                           &peer->hashPubKey))
1801     {
1802 #if DEBUG_CORE
1803       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 
1804                  "Peers are already connected!\n");
1805 #endif
1806       return NULL;
1807     }
1808   
1809   cm = GNUNET_malloc (sizeof (struct ControlMessage) + 
1810                       sizeof (struct ConnectMessage));
1811   msg = (struct ConnectMessage*) &cm[1];
1812   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT);
1813   msg->header.size = htons (sizeof (struct ConnectMessage));
1814   msg->reserved = htonl (0);
1815   msg->timeout = GNUNET_TIME_relative_hton (timeout);
1816   msg->peer = *peer;
1817   GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head,
1818                                     h->control_pending_tail,
1819                                     cm);
1820   ret = GNUNET_malloc (sizeof (struct GNUNET_CORE_PeerRequestHandle));
1821   ret->h = h;
1822   ret->cm = cm;
1823   ret->cont = cont;
1824   ret->cont_cls = cont_cls;
1825   cm->cont = &peer_request_connect_cont;
1826   cm->cont_cls = ret;
1827 #if DEBUG_CORE
1828   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1829               "Queueing REQUEST_CONNECT request\n");
1830 #endif
1831   trigger_next_request (h, GNUNET_NO);
1832   return ret;
1833 }
1834
1835
1836 /**
1837  * Cancel a pending request to connect to a particular peer.  Must not
1838  * be called after the 'cont' function was invoked.
1839  *
1840  * @param req request handle that was returned for the original request
1841  */
1842 void
1843 GNUNET_CORE_peer_request_connect_cancel (struct GNUNET_CORE_PeerRequestHandle *req)
1844 {
1845   struct GNUNET_CORE_Handle *h = req->h;
1846   struct ControlMessage *cm = req->cm;
1847
1848 #if DEBUG_CORE
1849   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1850               "A CHANGE PREFERENCE request was cancelled!\n");
1851 #endif
1852   GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
1853                                h->control_pending_tail,
1854                                cm);
1855   GNUNET_free (cm);
1856   GNUNET_free (req);
1857 }
1858
1859
1860 /* ****************** GNUNET_CORE_peer_change_preference ******************** */
1861
1862
1863 struct GNUNET_CORE_InformationRequestContext 
1864 {
1865   
1866   /**
1867    * Our connection to the service.
1868    */
1869   struct GNUNET_CORE_Handle *h;
1870
1871   /**
1872    * Link to control message, NULL if CM was sent.
1873    */ 
1874   struct ControlMessage *cm;
1875
1876   /**
1877    * Link to peer record.
1878    */
1879   struct PeerRecord *pr;
1880 };
1881
1882
1883 /**
1884  * CM was sent, remove link so we don't double-free.
1885  *
1886  * @param cls the 'struct GNUNET_CORE_InformationRequestContext'
1887  * @param success were we successful?
1888  */
1889 static void
1890 change_preference_send_continuation (void *cls,
1891                                      int success)
1892 {
1893   struct GNUNET_CORE_InformationRequestContext *irc = cls;
1894
1895   irc->cm = NULL;
1896 }
1897
1898
1899 /**
1900  * Obtain statistics and/or change preferences for the given peer.
1901  *
1902  * @param h core handle
1903  * @param peer identifies the peer
1904  * @param timeout after how long should we give up (and call "info" with NULL
1905  *                for "peer" to signal an error)?
1906  * @param bw_out set to the current bandwidth limit (sending) for this peer,
1907  *                caller should set "bw_out" to "-1" to avoid changing
1908  *                the current value; otherwise "bw_out" will be lowered to
1909  *                the specified value; passing a pointer to "0" can be used to force
1910  *                us to disconnect from the peer; "bw_out" might not increase
1911  *                as specified since the upper bound is generally
1912  *                determined by the other peer!
1913  * @param amount reserve N bytes for receiving, negative
1914  *                amounts can be used to undo a (recent) reservation;
1915  * @param preference increase incoming traffic share preference by this amount;
1916  *                in the absence of "amount" reservations, we use this
1917  *                preference value to assign proportional bandwidth shares
1918  *                to all connected peers
1919  * @param info function to call with the resulting configuration information
1920  * @param info_cls closure for info
1921  * @return NULL on error
1922  */
1923 struct GNUNET_CORE_InformationRequestContext *
1924 GNUNET_CORE_peer_change_preference (struct GNUNET_CORE_Handle *h,
1925                                     const struct GNUNET_PeerIdentity *peer,
1926                                     struct GNUNET_TIME_Relative timeout,
1927                                     struct GNUNET_BANDWIDTH_Value32NBO bw_out,
1928                                     int32_t amount,
1929                                     uint64_t preference,
1930                                     GNUNET_CORE_PeerConfigurationInfoCallback info,
1931                                     void *info_cls)
1932 {
1933   struct GNUNET_CORE_InformationRequestContext *irc;
1934   struct PeerRecord *pr;
1935   struct RequestInfoMessage *rim;
1936   struct ControlMessage *cm;
1937
1938   pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1939                                           &peer->hashPubKey);
1940   if (NULL == pr)
1941     {
1942       /* attempt to change preference on peer that is not connected */
1943       GNUNET_break (0);
1944       return NULL;
1945     }
1946   if (pr->pcic != NULL)
1947     {
1948       /* second change before first one is done */
1949       GNUNET_break (0);
1950       return NULL;
1951     }
1952   irc = GNUNET_malloc (sizeof (struct GNUNET_CORE_InformationRequestContext));
1953   irc->h = h;
1954   irc->pr = pr;
1955   cm = GNUNET_malloc (sizeof (struct ControlMessage) +
1956                       sizeof (struct RequestInfoMessage));
1957   cm->cont = &change_preference_send_continuation;
1958   cm->cont_cls = irc;
1959   irc->cm = cm;
1960   rim = (struct RequestInfoMessage*) &cm[1];
1961   rim->header.size = htons (sizeof (struct RequestInfoMessage));
1962   rim->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO);
1963   rim->rim_id = htonl (pr->rim_id = h->rim_id_gen++);
1964   rim->limit_outbound = bw_out;
1965   rim->reserve_inbound = htonl (amount);
1966   rim->preference_change = GNUNET_htonll(preference);
1967   rim->peer = *peer;
1968 #if DEBUG_CORE
1969   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1970               "Queueing CHANGE PREFERENCE request for peer `%s' with RIM %u\n",
1971               GNUNET_i2s (peer),
1972               (unsigned int) pr->rim_id);
1973 #endif
1974   GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head,
1975                                     h->control_pending_tail,
1976                                     cm); 
1977   pr->pcic = info;
1978   pr->pcic_cls = info_cls;
1979   pr->pcic_ptr = irc; /* for free'ing irc */
1980   trigger_next_request (h, GNUNET_NO);
1981   return irc;
1982 }
1983
1984
1985 /**
1986  * Cancel request for getting information about a peer.
1987  * Note that an eventual change in preference, trust or bandwidth
1988  * assignment MAY have already been committed at the time, 
1989  * so cancelling a request is NOT sure to undo the original
1990  * request.  The original request may or may not still commit.
1991  * The only thing cancellation ensures is that the callback
1992  * from the original request will no longer be called.
1993  *
1994  * @param irc context returned by the original GNUNET_CORE_peer_get_info call
1995  */
1996 void
1997 GNUNET_CORE_peer_change_preference_cancel (struct GNUNET_CORE_InformationRequestContext *irc)
1998 {
1999   struct GNUNET_CORE_Handle *h = irc->h;
2000   struct PeerRecord *pr = irc->pr;
2001
2002   GNUNET_assert (pr->pcic_ptr == irc);
2003   if (irc->cm != NULL)
2004     {
2005       GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
2006                                    h->control_pending_tail,
2007                                    irc->cm);
2008       GNUNET_free (irc->cm);
2009     }
2010   pr->pcic = NULL;
2011   pr->pcic_cls = NULL;
2012   pr->pcic_ptr = NULL;
2013   GNUNET_free (irc);
2014 }
2015
2016
2017 /* end of core_api.c */