remove limit_outbound, DCE
[oweals/gnunet.git] / src / core / core_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/core_api.c
23  * @brief core service; this is the main API for encrypted P2P
24  *        communications
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_core_service.h"
30 #include "core.h"
31
32
33 /**
34  * Information we track for each peer.
35  */
36 struct PeerRecord
37 {
38
39   /**
40    * We generally do NOT keep peer records in a DLL; this
41    * DLL is only used IF this peer's 'pending_head' message
42    * is ready for transmission.
43    */
44   struct PeerRecord *prev;
45
46   /**
47    * We generally do NOT keep peer records in a DLL; this
48    * DLL is only used IF this peer's 'pending_head' message
49    * is ready for transmission.
50    */
51   struct PeerRecord *next;
52
53   /**
54    * Peer the record is about.
55    */
56   struct GNUNET_PeerIdentity peer;
57
58   /**
59    * Corresponding core handle.
60    */
61   struct GNUNET_CORE_Handle *ch;
62
63   /**
64    * Head of doubly-linked list of pending requests.
65    * Requests are sorted by deadline *except* for HEAD,
66    * which is only modified upon transmission to core.
67    */
68   struct GNUNET_CORE_TransmitHandle *pending_head;
69
70   /**
71    * Tail of doubly-linked list of pending requests.
72    */
73   struct GNUNET_CORE_TransmitHandle *pending_tail;
74
75   /**
76    * Pending callback waiting for peer information, or NULL for none.
77    */
78   GNUNET_CORE_PeerConfigurationInfoCallback pcic;
79
80   /**
81    * Closure for pcic.
82    */
83   void *pcic_cls;
84
85   /**
86    * Pointer to free when we call pcic and to use to cancel
87    * preference change on disconnect.
88    */
89   struct GNUNET_CORE_InformationRequestContext *pcic_ptr;
90
91   /**
92    * Request information ID for the given pcic (needed in case a
93    * request is cancelled after being submitted to core and a new
94    * one is generated; in this case, we need to avoid matching the
95    * reply to the first (cancelled) request to the second request).
96    */
97   uint32_t rim_id;
98
99   /**
100    * ID of timeout task for the 'pending_head' handle
101    * which is the one with the smallest timeout.
102    */
103   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
104
105   /**
106    * ID of task to run 'next_request_transmission'.
107    */
108   GNUNET_SCHEDULER_TaskIdentifier ntr_task;
109
110   /**
111    * Current size of the queue of pending requests.
112    */
113   unsigned int queue_size;
114
115   /**
116    * SendMessageRequest ID generator for this peer.
117    */
118   uint16_t smr_id_gen;
119
120 };
121
122
123 /**
124  * Type of function called upon completion.
125  *
126  * @param cls closure
127  * @param success GNUNET_OK on success (which for request_connect
128  *        ONLY means that we transmitted the connect request to CORE,
129  *        it does not mean that we are actually now connected!);
130  *        GNUNET_NO on timeout,
131  *        GNUNET_SYSERR if core was shut down
132  */
133 typedef void (*GNUNET_CORE_ControlContinuation) (void *cls, int success);
134
135
136 /**
137  * Entry in a doubly-linked list of control messages to be transmitted
138  * to the core service.  Control messages include traffic allocation,
139  * connection requests and of course our initial 'init' request.
140  *
141  * The actual message is allocated at the end of this struct.
142  */
143 struct ControlMessage
144 {
145   /**
146    * This is a doubly-linked list.
147    */
148   struct ControlMessage *next;
149
150   /**
151    * This is a doubly-linked list.
152    */
153   struct ControlMessage *prev;
154
155   /**
156    * Function to run after transmission failed/succeeded.
157    */
158   GNUNET_CORE_ControlContinuation cont;
159
160   /**
161    * Closure for 'cont'.
162    */
163   void *cont_cls;
164
165   /**
166    * Transmit handle (if one is associated with this ControlMessage), or NULL.
167    */
168   struct GNUNET_CORE_TransmitHandle *th;
169 };
170
171
172
173 /**
174  * Context for the core service connection.
175  */
176 struct GNUNET_CORE_Handle
177 {
178
179   /**
180    * Configuration we're using.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184   /**
185    * Closure for the various callbacks.
186    */
187   void *cls;
188
189   /**
190    * Function to call once we've handshaked with the core service.
191    */
192   GNUNET_CORE_StartupCallback init;
193
194   /**
195    * Function to call whenever we're notified about a peer connecting.
196    */
197   GNUNET_CORE_ConnectEventHandler connects;
198
199   /**
200    * Function to call whenever we're notified about a peer disconnecting.
201    */
202   GNUNET_CORE_DisconnectEventHandler disconnects;
203
204   /**
205    * Function to call whenever we're notified about a peer changing status.
206    */
207   GNUNET_CORE_PeerStatusEventHandler status_events;
208
209   /**
210    * Function to call whenever we receive an inbound message.
211    */
212   GNUNET_CORE_MessageCallback inbound_notify;
213
214   /**
215    * Function to call whenever we receive an outbound message.
216    */
217   GNUNET_CORE_MessageCallback outbound_notify;
218
219   /**
220    * Function handlers for messages of particular type.
221    */
222   const struct GNUNET_CORE_MessageHandler *handlers;
223
224   /**
225    * Our connection to the service.
226    */
227   struct GNUNET_CLIENT_Connection *client;
228
229   /**
230    * Handle for our current transmission request.
231    */
232   struct GNUNET_CLIENT_TransmitHandle *cth;
233
234   /**
235    * Head of doubly-linked list of pending requests.
236    */
237   struct ControlMessage *control_pending_head;
238
239   /**
240    * Tail of doubly-linked list of pending requests.
241    */
242   struct ControlMessage *control_pending_tail;
243
244   /**
245    * Head of doubly-linked list of peers that are core-approved
246    * to send their next message.
247    */
248   struct PeerRecord *ready_peer_head;
249
250   /**
251    * Tail of doubly-linked list of peers that are core-approved
252    * to send their next message.
253    */
254   struct PeerRecord *ready_peer_tail;
255
256   /**
257    * Hash map listing all of the peers that we are currently
258    * connected to.
259    */
260   struct GNUNET_CONTAINER_MultiHashMap *peers;
261
262   /**
263    * Identity of this peer.
264    */
265   struct GNUNET_PeerIdentity me;
266
267   /**
268    * ID of reconnect task (if any).
269    */
270   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
271
272   /**
273    * Current delay we use for re-trying to connect to core.
274    */
275   struct GNUNET_TIME_Relative retry_backoff;
276
277   /**
278    * Request information ID generator.
279    */
280   uint32_t rim_id_gen;
281
282   /**
283    * Number of messages we are allowed to queue per target.
284    */
285   unsigned int queue_size;
286
287   /**
288    * Number of entries in the handlers array.
289    */
290   unsigned int hcnt;
291
292   /**
293    * For inbound notifications without a specific handler, do
294    * we expect to only receive headers?
295    */
296   int inbound_hdr_only;
297
298   /**
299    * For outbound notifications without a specific handler, do
300    * we expect to only receive headers?
301    */
302   int outbound_hdr_only;
303
304   /**
305    * Are we currently disconnected and hence unable to forward
306    * requests?
307    */
308   int currently_down;
309
310 };
311
312
313 /**
314  * Handle for a transmission request.
315  */
316 struct GNUNET_CORE_TransmitHandle
317 {
318
319   /**
320    * We keep active transmit handles in a doubly-linked list.
321    */
322   struct GNUNET_CORE_TransmitHandle *next;
323
324   /**
325    * We keep active transmit handles in a doubly-linked list.
326    */
327   struct GNUNET_CORE_TransmitHandle *prev;
328
329   /**
330    * Corresponding peer record.
331    */
332   struct PeerRecord *peer;
333
334   /**
335    * Corresponding SEND_REQUEST message.  Only non-NULL
336    * while SEND_REQUEST message is pending.
337    */
338   struct ControlMessage *cm;
339
340   /**
341    * Function that will be called to get the actual request
342    * (once we are ready to transmit this request to the core).
343    * The function will be called with a NULL buffer to signal
344    * timeout.
345    */
346   GNUNET_CONNECTION_TransmitReadyNotify get_message;
347
348   /**
349    * Closure for get_message.
350    */
351   void *get_message_cls;
352
353   /**
354    * Timeout for this handle.
355    */
356   struct GNUNET_TIME_Absolute timeout;
357
358   /**
359    * How important is this message?
360    */
361   uint32_t priority;
362
363   /**
364    * Size of this request.
365    */
366   uint16_t msize;
367
368   /**
369    * Send message request ID for this request.
370    */
371   uint16_t smr_id;
372
373   /**
374    * Is corking allowed?
375    */
376   int cork;
377
378 };
379
380
381 /**
382  * Our current client connection went down.  Clean it up
383  * and try to reconnect!
384  *
385  * @param h our handle to the core service
386  */
387 static void
388 reconnect (struct GNUNET_CORE_Handle *h);
389
390
391 /**
392  * Task schedule to try to re-connect to core.
393  *
394  * @param cls the 'struct GNUNET_CORE_Handle'
395  * @param tc task context
396  */
397 static void
398 reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
399 {
400   struct GNUNET_CORE_Handle *h = cls;
401
402   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
403 #if DEBUG_CORE
404   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
405               "Connecting to CORE service after delay\n");
406 #endif
407   reconnect (h);
408 }
409
410
411 /**
412  * Notify clients about disconnect and free
413  * the entry for connected peer.
414  *
415  * @param cls the 'struct GNUNET_CORE_Handle*'
416  * @param key the peer identity (not used)
417  * @param value the 'struct PeerRecord' to free.
418  * @return GNUNET_YES (continue)
419  */
420 static int
421 disconnect_and_free_peer_entry (void *cls, const GNUNET_HashCode * key,
422                                 void *value)
423 {
424   struct GNUNET_CORE_Handle *h = cls;
425   struct GNUNET_CORE_TransmitHandle *th;
426   struct PeerRecord *pr = value;
427   GNUNET_CORE_PeerConfigurationInfoCallback pcic;
428   void *pcic_cls;
429
430   if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
431   {
432     GNUNET_SCHEDULER_cancel (pr->timeout_task);
433     pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
434   }
435   if (pr->ntr_task != GNUNET_SCHEDULER_NO_TASK)
436   {
437     GNUNET_SCHEDULER_cancel (pr->ntr_task);
438     pr->ntr_task = GNUNET_SCHEDULER_NO_TASK;
439   }
440   if ((pr->prev != NULL) || (pr->next != NULL) || (h->ready_peer_head == pr))
441     GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
442   if (h->disconnects != NULL)
443     h->disconnects (h->cls, &pr->peer);
444   /* all requests should have been cancelled, clean up anyway, just in case */
445   GNUNET_break (pr->queue_size == 0);
446   if (NULL != (pcic = pr->pcic))
447   {
448     GNUNET_break (0);
449     pcic_cls = pr->pcic_cls;
450     GNUNET_CORE_peer_change_preference_cancel (pr->pcic_ptr);
451     pcic (pcic_cls, &pr->peer, 0, GNUNET_TIME_UNIT_FOREVER_REL);
452   }
453   while (NULL != (th = pr->pending_head))
454   {
455     GNUNET_break (0);
456     GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
457     pr->queue_size--;
458     if (th->cm != NULL)
459       th->cm->th = NULL;
460     GNUNET_free (th);
461   }
462   /* done with 'voluntary' cleanups, now on to normal freeing */
463   GNUNET_assert (GNUNET_YES ==
464                  GNUNET_CONTAINER_multihashmap_remove (h->peers, key, pr));
465   GNUNET_assert (pr->pending_head == NULL);
466   GNUNET_assert (pr->pending_tail == NULL);
467   GNUNET_assert (pr->ch = h);
468   GNUNET_assert (pr->queue_size == 0);
469   GNUNET_assert (pr->timeout_task == GNUNET_SCHEDULER_NO_TASK);
470   GNUNET_assert (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK);
471   GNUNET_free (pr);
472   return GNUNET_YES;
473 }
474
475
476 /**
477  * Close down any existing connection to the CORE service and
478  * try re-establishing it later.
479  *
480  * @param h our handle
481  */
482 static void
483 reconnect_later (struct GNUNET_CORE_Handle *h)
484 {
485   struct ControlMessage *cm;
486   struct PeerRecord *pr;
487
488   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
489   if (NULL != h->cth)
490   {
491     GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
492     h->cth = NULL;
493   }
494   if (h->client != NULL)
495   {
496     GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
497     h->client = NULL;
498   }
499   h->currently_down = GNUNET_YES;
500   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
501   h->reconnect_task =
502       GNUNET_SCHEDULER_add_delayed (h->retry_backoff, &reconnect_task, h);
503   while (NULL != (cm = h->control_pending_head))
504   {
505     GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
506                                  h->control_pending_tail, cm);
507     if (cm->th != NULL)
508       cm->th->cm = NULL;
509     if (cm->cont != NULL)
510       cm->cont (cm->cont_cls, GNUNET_NO);
511     GNUNET_free (cm);
512   }
513   GNUNET_CONTAINER_multihashmap_iterate (h->peers,
514                                          &disconnect_and_free_peer_entry, h);
515   while (NULL != (pr = h->ready_peer_head))
516     GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
517   GNUNET_assert (h->control_pending_head == NULL);
518   h->retry_backoff =
519       GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS, h->retry_backoff);
520   h->retry_backoff = GNUNET_TIME_relative_multiply (h->retry_backoff, 2);
521 }
522
523
524 /**
525  * Check the list of pending requests, send the next
526  * one to the core.
527  *
528  * @param h core handle
529  * @param ignore_currently_down transmit message even if not initialized?
530  */
531 static void
532 trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down);
533
534
535 /**
536  * The given request hit its timeout.  Remove from the
537  * doubly-linked list and call the respective continuation.
538  *
539  * @param cls the transmit handle of the request that timed out
540  * @param tc context, can be NULL (!)
541  */
542 static void
543 transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
544
545
546 /**
547  * Send a control message to the peer asking for transmission
548  * of the message in the given peer record.
549  *
550  * @param pr peer to request transmission to
551  */
552 static void
553 request_next_transmission (struct PeerRecord *pr)
554 {
555   struct GNUNET_CORE_Handle *h = pr->ch;
556   struct ControlMessage *cm;
557   struct SendMessageRequest *smr;
558   struct GNUNET_CORE_TransmitHandle *th;
559
560   if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
561   {
562     GNUNET_SCHEDULER_cancel (pr->timeout_task);
563     pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
564   }
565   if (NULL == (th = pr->pending_head))
566   {
567     trigger_next_request (h, GNUNET_NO);
568     return;
569   }
570   if (th->cm != NULL)
571     return;                     /* already done */
572   GNUNET_assert (pr->prev == NULL);
573   GNUNET_assert (pr->next == NULL);
574   pr->timeout_task =
575       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
576                                     (th->timeout), &transmission_timeout, pr);
577   cm = GNUNET_malloc (sizeof (struct ControlMessage) +
578                       sizeof (struct SendMessageRequest));
579   th->cm = cm;
580   cm->th = th;
581   smr = (struct SendMessageRequest *) &cm[1];
582   smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
583   smr->header.size = htons (sizeof (struct SendMessageRequest));
584   smr->priority = htonl (th->priority);
585   smr->deadline = GNUNET_TIME_absolute_hton (th->timeout);
586   smr->peer = pr->peer;
587   smr->queue_size = htonl (pr->queue_size);
588   smr->size = htons (th->msize);
589   smr->smr_id = htons (th->smr_id = pr->smr_id_gen++);
590   GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head,
591                                     h->control_pending_tail, cm);
592 #if DEBUG_CORE
593   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
594               "Adding SEND REQUEST for peer `%s' to message queue\n",
595               GNUNET_i2s (&pr->peer));
596 #endif
597   trigger_next_request (h, GNUNET_NO);
598 }
599
600
601 /**
602  * The given request hit its timeout.  Remove from the
603  * doubly-linked list and call the respective continuation.
604  *
605  * @param cls the transmit handle of the request that timed out
606  * @param tc context, can be NULL (!)
607  */
608 static void
609 transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
610 {
611   struct PeerRecord *pr = cls;
612   struct GNUNET_CORE_Handle *h = pr->ch;
613   struct GNUNET_CORE_TransmitHandle *th;
614
615   pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
616   th = pr->pending_head;
617   GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
618   pr->queue_size--;
619   if ((pr->prev != NULL) || (pr->next != NULL) || (pr == h->ready_peer_head))
620   {
621     /* the request that was 'approved' by core was
622      * canceled before it could be transmitted; remove
623      * us from the 'ready' list */
624     GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
625   }
626 #if DEBUG_CORE
627   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
628               "Signalling timeout of request for transmission to CORE service\n");
629 #endif
630   request_next_transmission (pr);
631   GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL));
632   GNUNET_free (th);
633 }
634
635
636 /**
637  * Transmit the next message to the core service.
638  */
639 static size_t
640 transmit_message (void *cls, size_t size, void *buf)
641 {
642   struct GNUNET_CORE_Handle *h = cls;
643   struct ControlMessage *cm;
644   struct GNUNET_CORE_TransmitHandle *th;
645   struct PeerRecord *pr;
646   struct SendMessage *sm;
647   const struct GNUNET_MessageHeader *hdr;
648   uint16_t msize;
649   size_t ret;
650
651   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
652   h->cth = NULL;
653   if (buf == NULL)
654   {
655 #if DEBUG_CORE
656     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
657                 "Transmission failed, initiating reconnect\n");
658 #endif
659     reconnect_later (h);
660     return 0;
661   }
662   /* first check for control messages */
663   if (NULL != (cm = h->control_pending_head))
664   {
665     hdr = (const struct GNUNET_MessageHeader *) &cm[1];
666     msize = ntohs (hdr->size);
667     if (size < msize)
668     {
669       trigger_next_request (h, GNUNET_NO);
670       return 0;
671     }
672 #if DEBUG_CORE
673     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
674                 "Transmitting control message with %u bytes of type %u to core.\n",
675                 (unsigned int) msize, (unsigned int) ntohs (hdr->type));
676 #endif
677     memcpy (buf, hdr, msize);
678     GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
679                                  h->control_pending_tail, cm);
680     if (cm->th != NULL)
681       cm->th->cm = NULL;
682     if (NULL != cm->cont)
683       cm->cont (cm->cont_cls, GNUNET_OK);
684     GNUNET_free (cm);
685     trigger_next_request (h, GNUNET_NO);
686     return msize;
687   }
688   /* now check for 'ready' P2P messages */
689   if (NULL != (pr = h->ready_peer_head))
690   {
691     GNUNET_assert (pr->pending_head != NULL);
692     th = pr->pending_head;
693     if (size < th->msize + sizeof (struct SendMessage))
694     {
695       trigger_next_request (h, GNUNET_NO);
696       return 0;
697     }
698     GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
699     GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
700     pr->queue_size--;
701     if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
702     {
703       GNUNET_SCHEDULER_cancel (pr->timeout_task);
704       pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
705     }
706 #if DEBUG_CORE
707     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
708                 "Transmitting SEND request to `%s' with %u bytes.\n",
709                 GNUNET_i2s (&pr->peer), (unsigned int) th->msize);
710 #endif
711     sm = (struct SendMessage *) buf;
712     sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
713     sm->priority = htonl (th->priority);
714     sm->deadline = GNUNET_TIME_absolute_hton (th->timeout);
715     sm->peer = pr->peer;
716     sm->cork = htonl ((uint32_t) th->cork);
717     sm->reserved = htonl (0);
718     ret =
719         th->get_message (th->get_message_cls,
720                          size - sizeof (struct SendMessage), &sm[1]);
721
722 #if DEBUG_CORE
723     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
724                 "Transmitting SEND request to `%s' yielded %u bytes.\n",
725                 GNUNET_i2s (&pr->peer), ret);
726 #endif
727     GNUNET_free (th);
728     if (0 == ret)
729     {
730 #if DEBUG_CORE
731       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
732                   "Size of clients message to peer %s is 0!\n",
733                   GNUNET_i2s (&pr->peer));
734 #endif
735       /* client decided to send nothing! */
736       request_next_transmission (pr);
737       return 0;
738     }
739 #if DEBUG_CORE
740     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
741                 "Produced SEND message to core with %u bytes payload\n",
742                 (unsigned int) ret);
743 #endif
744     GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
745     if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
746     {
747       GNUNET_break (0);
748       request_next_transmission (pr);
749       return 0;
750     }
751     ret += sizeof (struct SendMessage);
752     sm->header.size = htons (ret);
753     GNUNET_assert (ret <= size);
754     request_next_transmission (pr);
755     return ret;
756   }
757   return 0;
758 }
759
760
761 /**
762  * Check the list of pending requests, send the next
763  * one to the core.
764  *
765  * @param h core handle
766  * @param ignore_currently_down transmit message even if not initialized?
767  */
768 static void
769 trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down)
770 {
771   uint16_t msize;
772
773   if ((GNUNET_YES == h->currently_down) && (ignore_currently_down == GNUNET_NO))
774   {
775 #if DEBUG_CORE
776     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
777                 "Core connection down, not processing queue\n");
778 #endif
779     return;
780   }
781   if (NULL != h->cth)
782   {
783 #if DEBUG_CORE
784     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
785                 "Request pending, not processing queue\n");
786 #endif
787     return;
788   }
789   if (h->control_pending_head != NULL)
790     msize =
791         ntohs (((struct GNUNET_MessageHeader *) &h->
792                 control_pending_head[1])->size);
793   else if (h->ready_peer_head != NULL)
794     msize =
795         h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage);
796   else
797   {
798 #if DEBUG_CORE
799     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800                 "Request queue empty, not processing queue\n");
801 #endif
802     return;                     /* no pending message */
803   }
804   h->cth =
805       GNUNET_CLIENT_notify_transmit_ready (h->client, msize,
806                                            GNUNET_TIME_UNIT_FOREVER_REL,
807                                            GNUNET_NO, &transmit_message, h);
808 }
809
810
811 /**
812  * Handler for notification messages received from the core.
813  *
814  * @param cls our "struct GNUNET_CORE_Handle"
815  * @param msg the message received from the core service
816  */
817 static void
818 main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
819 {
820   struct GNUNET_CORE_Handle *h = cls;
821   const struct InitReplyMessage *m;
822   const struct ConnectNotifyMessage *cnm;
823   const struct DisconnectNotifyMessage *dnm;
824   const struct NotifyTrafficMessage *ntm;
825   const struct GNUNET_MessageHeader *em;
826   const struct ConfigurationInfoMessage *cim;
827   const struct PeerStatusNotifyMessage *psnm;
828   const struct SendMessageReady *smr;
829   const struct GNUNET_CORE_MessageHandler *mh;
830   GNUNET_CORE_StartupCallback init;
831   GNUNET_CORE_PeerConfigurationInfoCallback pcic;
832   struct PeerRecord *pr;
833   struct GNUNET_CORE_TransmitHandle *th;
834   unsigned int hpos;
835   int trigger;
836   uint16_t msize;
837   uint16_t et;
838   uint32_t ats_count;
839
840   if (msg == NULL)
841   {
842     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
843                 _
844                 ("Client was disconnected from core service, trying to reconnect.\n"));
845     reconnect_later (h);
846     return;
847   }
848   msize = ntohs (msg->size);
849 #if DEBUG_CORE > 2
850   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
851               "Processing message of type %u and size %u from core service\n",
852               ntohs (msg->type), msize);
853 #endif
854   switch (ntohs (msg->type))
855   {
856   case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY:
857     if (ntohs (msg->size) != sizeof (struct InitReplyMessage))
858     {
859       GNUNET_break (0);
860       reconnect_later (h);
861       return;
862     }
863     m = (const struct InitReplyMessage *) msg;
864     GNUNET_break (0 == ntohl (m->reserved));
865     /* start our message processing loop */
866     if (GNUNET_YES == h->currently_down)
867     {
868       h->currently_down = GNUNET_NO;
869       trigger_next_request (h, GNUNET_NO);
870     }
871     h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
872     h->me = m->my_identity;
873     if (NULL != (init = h->init))
874     {
875       /* mark so we don't call init on reconnect */
876       h->init = NULL;
877 #if DEBUG_CORE
878       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
879                   "Connected to core service of peer `%s'.\n",
880                   GNUNET_i2s (&h->me));
881 #endif
882       init (h->cls, h, &h->me);
883     }
884     else
885     {
886 #if DEBUG_CORE
887       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
888                   "Successfully reconnected to core service.\n");
889 #endif
890     }
891     /* fake 'connect to self' */
892     pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &h->me.hashPubKey);
893     GNUNET_assert (pr == NULL);
894     pr = GNUNET_malloc (sizeof (struct PeerRecord));
895     pr->peer = h->me;
896     pr->ch = h;
897     GNUNET_assert (GNUNET_YES ==
898                    GNUNET_CONTAINER_multihashmap_put (h->peers,
899                                                       &h->me.hashPubKey, pr,
900                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
901     if (NULL != h->connects)
902       h->connects (h->cls, &h->me, NULL);
903     break;
904   case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT:
905     if (msize < sizeof (struct ConnectNotifyMessage))
906     {
907       GNUNET_break (0);
908       reconnect_later (h);
909       return;
910     }
911     cnm = (const struct ConnectNotifyMessage *) msg;
912     ats_count = ntohl (cnm->ats_count);
913     if ((msize !=
914          sizeof (struct ConnectNotifyMessage) +
915          ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)) ||
916         (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR !=
917          ntohl ((&cnm->ats)[ats_count].type)))
918     {
919       GNUNET_break (0);
920       reconnect_later (h);
921       return;
922     }
923 #if DEBUG_CORE
924     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
925                 "Received notification about connection from `%s'.\n",
926                 GNUNET_i2s (&cnm->peer));
927 #endif
928     if (0 == memcmp (&h->me, &cnm->peer, sizeof (struct GNUNET_PeerIdentity)))
929     {
930       /* connect to self!? */
931       GNUNET_break (0);
932       return;
933     }
934     pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &cnm->peer.hashPubKey);
935     if (pr != NULL)
936     {
937       GNUNET_break (0);
938       reconnect_later (h);
939       return;
940     }
941     pr = GNUNET_malloc (sizeof (struct PeerRecord));
942     pr->peer = cnm->peer;
943     pr->ch = h;
944     GNUNET_assert (GNUNET_YES ==
945                    GNUNET_CONTAINER_multihashmap_put (h->peers,
946                                                       &cnm->peer.hashPubKey, pr,
947                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
948     if (NULL != h->connects)
949       h->connects (h->cls, &cnm->peer, &cnm->ats);
950     break;
951   case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT:
952     if (msize != sizeof (struct DisconnectNotifyMessage))
953     {
954       GNUNET_break (0);
955       reconnect_later (h);
956       return;
957     }
958     dnm = (const struct DisconnectNotifyMessage *) msg;
959     if (0 == memcmp (&h->me, &dnm->peer, sizeof (struct GNUNET_PeerIdentity)))
960     {
961       /* connection to self!? */
962       GNUNET_break (0);
963       return;
964     }
965     GNUNET_break (0 == ntohl (dnm->reserved));
966 #if DEBUG_CORE
967     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
968                 "Received notification about disconnect from `%s'.\n",
969                 GNUNET_i2s (&dnm->peer));
970 #endif
971     pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &dnm->peer.hashPubKey);
972     if (pr == NULL)
973     {
974       GNUNET_break (0);
975       reconnect_later (h);
976       return;
977     }
978     trigger = ((pr->prev != NULL) || (pr->next != NULL) ||
979                (h->ready_peer_head == pr));
980     disconnect_and_free_peer_entry (h, &dnm->peer.hashPubKey, pr);
981     if (trigger)
982       trigger_next_request (h, GNUNET_NO);
983     break;
984   case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE:
985     if (NULL == h->status_events)
986     {
987       GNUNET_break (0);
988       return;
989     }
990     if (msize < sizeof (struct PeerStatusNotifyMessage))
991     {
992       GNUNET_break (0);
993       reconnect_later (h);
994       return;
995     }
996     psnm = (const struct PeerStatusNotifyMessage *) msg;
997     if (0 == memcmp (&h->me, &psnm->peer, sizeof (struct GNUNET_PeerIdentity)))
998     {
999       /* self-change!? */
1000       GNUNET_break (0);
1001       return;
1002     }
1003     ats_count = ntohl (psnm->ats_count);
1004     if ((msize !=
1005          sizeof (struct PeerStatusNotifyMessage) +
1006          ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)) ||
1007         (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR !=
1008          ntohl ((&psnm->ats)[ats_count].type)))
1009     {
1010       GNUNET_break (0);
1011       reconnect_later (h);
1012       return;
1013     }
1014 #if DEBUG_CORE > 1
1015     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1016                 "Received notification about status change by `%s'.\n",
1017                 GNUNET_i2s (&psnm->peer));
1018 #endif
1019     pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &psnm->peer.hashPubKey);
1020     if (pr == NULL)
1021     {
1022       GNUNET_break (0);
1023       reconnect_later (h);
1024       return;
1025     }
1026     h->status_events (h->cls, &psnm->peer, psnm->bandwidth_in,
1027                       psnm->bandwidth_out,
1028                       GNUNET_TIME_absolute_ntoh (psnm->timeout), &psnm->ats);
1029     break;
1030   case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND:
1031     if (msize < sizeof (struct NotifyTrafficMessage))
1032     {
1033       GNUNET_break (0);
1034       reconnect_later (h);
1035       return;
1036     }
1037     ntm = (const struct NotifyTrafficMessage *) msg;
1038
1039     ats_count = ntohl (ntm->ats_count);
1040     if ((msize <
1041          sizeof (struct NotifyTrafficMessage) +
1042          ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information) +
1043          sizeof (struct GNUNET_MessageHeader)) ||
1044         (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR !=
1045          ntohl ((&ntm->ats)[ats_count].type)))
1046     {
1047       GNUNET_break (0);
1048       reconnect_later (h);
1049       return;
1050     }
1051     em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count + 1];
1052 #if DEBUG_CORE
1053     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1054                 "Received message of type %u and size %u from peer `%4s'\n",
1055                 ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer));
1056 #endif
1057     pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey);
1058     if (pr == NULL)
1059     {
1060       GNUNET_break (0);
1061       reconnect_later (h);
1062       return;
1063     }
1064     if ((GNUNET_NO == h->inbound_hdr_only) &&
1065         (msize !=
1066          ntohs (em->size) + sizeof (struct NotifyTrafficMessage) +
1067          +ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)))
1068     {
1069       GNUNET_break (0);
1070       reconnect_later (h);
1071       return;
1072     }
1073     et = ntohs (em->type);
1074     for (hpos = 0; hpos < h->hcnt; hpos++)
1075     {
1076       mh = &h->handlers[hpos];
1077       if (mh->type != et)
1078         continue;
1079       if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0))
1080       {
1081         GNUNET_break (0);
1082         continue;
1083       }
1084       if (GNUNET_OK !=
1085           h->handlers[hpos].callback (h->cls, &ntm->peer, em, &ntm->ats))
1086       {
1087         /* error in processing, do not process other messages! */
1088         break;
1089       }
1090     }
1091     if (NULL != h->inbound_notify)
1092       h->inbound_notify (h->cls, &ntm->peer, em, &ntm->ats);
1093     break;
1094   case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND:
1095     if (msize < sizeof (struct NotifyTrafficMessage))
1096     {
1097       GNUNET_break (0);
1098       reconnect_later (h);
1099       return;
1100     }
1101     ntm = (const struct NotifyTrafficMessage *) msg;
1102     if (0 == memcmp (&h->me, &ntm->peer, sizeof (struct GNUNET_PeerIdentity)))
1103     {
1104       /* self-change!? */
1105       GNUNET_break (0);
1106       return;
1107     }
1108     ats_count = ntohl (ntm->ats_count);
1109     if ((msize <
1110          sizeof (struct NotifyTrafficMessage) +
1111          ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information) +
1112          sizeof (struct GNUNET_MessageHeader)) ||
1113         (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR !=
1114          ntohl ((&ntm->ats)[ats_count].type)))
1115     {
1116       GNUNET_break (0);
1117       reconnect_later (h);
1118       return;
1119     }
1120     em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count + 1];
1121     pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey);
1122     if (pr == NULL)
1123     {
1124       GNUNET_break (0);
1125       reconnect_later (h);
1126       return;
1127     }
1128 #if DEBUG_CORE
1129     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1130                 "Received notification about transmission to `%s'.\n",
1131                 GNUNET_i2s (&ntm->peer));
1132 #endif
1133     if ((GNUNET_NO == h->outbound_hdr_only) &&
1134         (msize !=
1135          ntohs (em->size) + sizeof (struct NotifyTrafficMessage) +
1136          ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)))
1137     {
1138       GNUNET_break (0);
1139       reconnect_later (h);
1140       return;
1141     }
1142     if (NULL == h->outbound_notify)
1143     {
1144       GNUNET_break (0);
1145       break;
1146     }
1147     h->outbound_notify (h->cls, &ntm->peer, em, &ntm->ats);
1148     break;
1149   case GNUNET_MESSAGE_TYPE_CORE_SEND_READY:
1150     if (msize != sizeof (struct SendMessageReady))
1151     {
1152       GNUNET_break (0);
1153       reconnect_later (h);
1154       return;
1155     }
1156     smr = (const struct SendMessageReady *) msg;
1157     pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &smr->peer.hashPubKey);
1158     if (pr == NULL)
1159     {
1160       GNUNET_break (0);
1161       reconnect_later (h);
1162       return;
1163     }
1164 #if DEBUG_CORE
1165     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1166                 "Received notification about transmission readiness to `%s'.\n",
1167                 GNUNET_i2s (&smr->peer));
1168 #endif
1169     if (pr->pending_head == NULL)
1170     {
1171       /* request must have been cancelled between the original request
1172        * and the response from core, ignore core's readiness */
1173       break;
1174     }
1175
1176     th = pr->pending_head;
1177     if (ntohs (smr->smr_id) != th->smr_id)
1178     {
1179       /* READY message is for expired or cancelled message,
1180        * ignore! (we should have already sent another request) */
1181       break;
1182     }
1183     if ((pr->prev != NULL) || (pr->next != NULL) || (h->ready_peer_head == pr))
1184     {
1185       /* we should not already be on the ready list... */
1186       GNUNET_break (0);
1187       reconnect_later (h);
1188       return;
1189     }
1190     GNUNET_CONTAINER_DLL_insert (h->ready_peer_head, h->ready_peer_tail, pr);
1191     trigger_next_request (h, GNUNET_NO);
1192     break;
1193   case GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO:
1194     if (ntohs (msg->size) != sizeof (struct ConfigurationInfoMessage))
1195     {
1196       GNUNET_break (0);
1197       reconnect_later (h);
1198       return;
1199     }
1200     cim = (const struct ConfigurationInfoMessage *) msg;
1201     if (0 == memcmp (&h->me, &cim->peer, sizeof (struct GNUNET_PeerIdentity)))
1202     {
1203       /* self-change!? */
1204       GNUNET_break (0);
1205       return;
1206     }
1207 #if DEBUG_CORE
1208     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1209                 "Received notification about configuration update for `%s' with RIM %u.\n",
1210                 GNUNET_i2s (&cim->peer), (unsigned int) ntohl (cim->rim_id));
1211 #endif
1212     pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &cim->peer.hashPubKey);
1213     if (pr == NULL)
1214     {
1215       GNUNET_break (0);
1216       reconnect_later (h);
1217       return;
1218     }
1219     if (pr->rim_id != ntohl (cim->rim_id))
1220     {
1221 #if DEBUG_CORE
1222       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1223                   "Reservation ID mismatch in notification...\n");
1224 #endif
1225       break;
1226     }
1227     pcic = pr->pcic;
1228     pr->pcic = NULL;
1229     GNUNET_free_non_null (pr->pcic_ptr);
1230     pr->pcic_ptr = NULL;
1231     if (pcic != NULL)
1232       pcic (pr->pcic_cls, &pr->peer, ntohl (cim->reserved_amount),
1233             GNUNET_TIME_relative_ntoh (cim->reserve_delay));
1234     break;
1235   default:
1236     reconnect_later (h);
1237     return;
1238   }
1239   GNUNET_CLIENT_receive (h->client, &main_notify_handler, h,
1240                          GNUNET_TIME_UNIT_FOREVER_REL);
1241 }
1242
1243
1244 /**
1245  * Task executed once we are done transmitting the INIT message.
1246  * Starts our 'receive' loop.
1247  *
1248  * @param cls the 'struct GNUNET_CORE_Handle'
1249  * @param success were we successful
1250  */
1251 static void
1252 init_done_task (void *cls, int success)
1253 {
1254   struct GNUNET_CORE_Handle *h = cls;
1255
1256   if (success == GNUNET_SYSERR)
1257     return;                     /* shutdown */
1258   if (success == GNUNET_NO)
1259   {
1260 #if DEBUG_CORE
1261     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1262                 "Failed to exchange INIT with core, retrying\n");
1263 #endif
1264     if (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK)
1265       reconnect_later (h);
1266     return;
1267   }
1268   GNUNET_CLIENT_receive (h->client, &main_notify_handler, h,
1269                          GNUNET_TIME_UNIT_FOREVER_REL);
1270 }
1271
1272
1273 /**
1274  * Our current client connection went down.  Clean it up
1275  * and try to reconnect!
1276  *
1277  * @param h our handle to the core service
1278  */
1279 static void
1280 reconnect (struct GNUNET_CORE_Handle *h)
1281 {
1282   struct ControlMessage *cm;
1283   struct InitMessage *init;
1284   uint32_t opt;
1285   uint16_t msize;
1286   uint16_t *ts;
1287   unsigned int hpos;
1288
1289 #if DEBUG_CORE
1290   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting to CORE service\n");
1291 #endif
1292   GNUNET_assert (h->client == NULL);
1293   GNUNET_assert (h->currently_down == GNUNET_YES);
1294   h->client = GNUNET_CLIENT_connect ("core", h->cfg);
1295   if (h->client == NULL)
1296   {
1297     reconnect_later (h);
1298     return;
1299   }
1300   msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage);
1301   cm = GNUNET_malloc (sizeof (struct ControlMessage) + msize);
1302   cm->cont = &init_done_task;
1303   cm->cont_cls = h;
1304   init = (struct InitMessage *) &cm[1];
1305   init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT);
1306   init->header.size = htons (msize);
1307   opt = GNUNET_CORE_OPTION_SEND_CONNECT | GNUNET_CORE_OPTION_SEND_DISCONNECT;
1308   if (h->status_events != NULL)
1309     opt |= GNUNET_CORE_OPTION_SEND_STATUS_CHANGE;
1310   if (h->inbound_notify != NULL)
1311   {
1312     if (h->inbound_hdr_only)
1313       opt |= GNUNET_CORE_OPTION_SEND_HDR_INBOUND;
1314     else
1315       opt |= GNUNET_CORE_OPTION_SEND_FULL_INBOUND;
1316   }
1317   if (h->outbound_notify != NULL)
1318   {
1319     if (h->outbound_hdr_only)
1320       opt |= GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND;
1321     else
1322       opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND;
1323   }
1324   init->options = htonl (opt);
1325   ts = (uint16_t *) & init[1];
1326   for (hpos = 0; hpos < h->hcnt; hpos++)
1327     ts[hpos] = htons (h->handlers[hpos].type);
1328   GNUNET_CONTAINER_DLL_insert (h->control_pending_head, h->control_pending_tail,
1329                                cm);
1330   trigger_next_request (h, GNUNET_YES);
1331 }
1332
1333
1334
1335 /**
1336  * Connect to the core service.  Note that the connection may
1337  * complete (or fail) asynchronously.
1338  *
1339  * @param cfg configuration to use
1340  * @param queue_size size of the per-peer message queue
1341  * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
1342  * @param init callback to call on timeout or once we have successfully
1343  *        connected to the core service; note that timeout is only meaningful if init is not NULL
1344  * @param connects function to call on peer connect, can be NULL
1345  * @param disconnects function to call on peer disconnect / timeout, can be NULL
1346  * @param status_events function to call on changes to peer connection status, can be NULL
1347  * @param inbound_notify function to call for all inbound messages, can be NULL
1348  * @param inbound_hdr_only set to GNUNET_YES if inbound_notify will only read the
1349  *                GNUNET_MessageHeader and hence we do not need to give it the full message;
1350  *                can be used to improve efficiency, ignored if inbound_notify is NULLL
1351  * @param outbound_notify function to call for all outbound messages, can be NULL
1352  * @param outbound_hdr_only set to GNUNET_YES if outbound_notify will only read the
1353  *                GNUNET_MessageHeader and hence we do not need to give it the full message
1354  *                can be used to improve efficiency, ignored if outbound_notify is NULLL
1355  * @param handlers callbacks for messages we care about, NULL-terminated
1356  * @return handle to the core service (only useful for disconnect until 'init' is called);
1357  *                NULL on error (in this case, init is never called)
1358  */
1359 struct GNUNET_CORE_Handle *
1360 GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1361                      unsigned int queue_size, void *cls,
1362                      GNUNET_CORE_StartupCallback init,
1363                      GNUNET_CORE_ConnectEventHandler connects,
1364                      GNUNET_CORE_DisconnectEventHandler disconnects,
1365                      GNUNET_CORE_PeerStatusEventHandler status_events,
1366                      GNUNET_CORE_MessageCallback inbound_notify,
1367                      int inbound_hdr_only,
1368                      GNUNET_CORE_MessageCallback outbound_notify,
1369                      int outbound_hdr_only,
1370                      const struct GNUNET_CORE_MessageHandler *handlers)
1371 {
1372   struct GNUNET_CORE_Handle *h;
1373
1374   h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle));
1375   h->cfg = cfg;
1376   h->queue_size = queue_size;
1377   h->cls = cls;
1378   h->init = init;
1379   h->connects = connects;
1380   h->disconnects = disconnects;
1381   h->status_events = status_events;
1382   h->inbound_notify = inbound_notify;
1383   h->outbound_notify = outbound_notify;
1384   h->inbound_hdr_only = inbound_hdr_only;
1385   h->outbound_hdr_only = outbound_hdr_only;
1386   h->handlers = handlers;
1387   h->hcnt = 0;
1388   h->currently_down = GNUNET_YES;
1389   h->peers = GNUNET_CONTAINER_multihashmap_create (128);
1390   h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1391   if (NULL != handlers)
1392     while (handlers[h->hcnt].callback != NULL)
1393       h->hcnt++;
1394   GNUNET_assert (h->hcnt <
1395                  (GNUNET_SERVER_MAX_MESSAGE_SIZE -
1396                   sizeof (struct InitMessage)) / sizeof (uint16_t));
1397 #if DEBUG_CORE
1398   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service\n");
1399 #endif
1400   reconnect (h);
1401   return h;
1402 }
1403
1404
1405 /**
1406  * Disconnect from the core service.  This function can only
1407  * be called *after* all pending 'GNUNET_CORE_notify_transmit_ready'
1408  * requests have been explicitly canceled.
1409  *
1410  * @param handle connection to core to disconnect
1411  */
1412 void
1413 GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
1414 {
1415   struct ControlMessage *cm;
1416
1417 #if DEBUG_CORE
1418   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from CORE service\n");
1419 #endif
1420   if (handle->cth != NULL)
1421   {
1422     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth);
1423     handle->cth = NULL;
1424   }
1425   while (NULL != (cm = handle->control_pending_head))
1426   {
1427     GNUNET_CONTAINER_DLL_remove (handle->control_pending_head,
1428                                  handle->control_pending_tail, cm);
1429     if (cm->th != NULL)
1430       cm->th->cm = NULL;
1431     if (cm->cont != NULL)
1432       cm->cont (cm->cont_cls, GNUNET_SYSERR);
1433     GNUNET_free (cm);
1434   }
1435   if (handle->client != NULL)
1436   {
1437     GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
1438     handle->client = NULL;
1439   }
1440   GNUNET_CONTAINER_multihashmap_iterate (handle->peers,
1441                                          &disconnect_and_free_peer_entry,
1442                                          handle);
1443   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1444   {
1445     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1446     handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1447   }
1448   GNUNET_CONTAINER_multihashmap_destroy (handle->peers);
1449   handle->peers = NULL;
1450   GNUNET_break (handle->ready_peer_head == NULL);
1451   GNUNET_free (handle);
1452 }
1453
1454
1455 /**
1456  * Task that calls 'request_next_transmission'.
1457  *
1458  * @param cls the 'struct PeerRecord*'
1459  * @param tc scheduler context
1460  */
1461 static void
1462 run_request_next_transmission (void *cls,
1463                                const struct GNUNET_SCHEDULER_TaskContext *tc)
1464 {
1465   struct PeerRecord *pr = cls;
1466
1467   pr->ntr_task = GNUNET_SCHEDULER_NO_TASK;
1468   request_next_transmission (pr);
1469 }
1470
1471
1472 /**
1473  * Ask the core to call "notify" once it is ready to transmit the
1474  * given number of bytes to the specified "target".    Must only be
1475  * called after a connection to the respective peer has been
1476  * established (and the client has been informed about this).
1477  *
1478  * @param handle connection to core service
1479  * @param cork is corking allowed for this transmission?
1480  * @param priority how important is the message?
1481  * @param maxdelay how long can the message wait?
1482  * @param target who should receive the message,
1483  *        use NULL for this peer (loopback)
1484  * @param notify_size how many bytes of buffer space does notify want?
1485  * @param notify function to call when buffer space is available
1486  * @param notify_cls closure for notify
1487  * @return non-NULL if the notify callback was queued,
1488  *         NULL if we can not even queue the request (insufficient
1489  *         memory); if NULL is returned, "notify" will NOT be called.
1490  */
1491 struct GNUNET_CORE_TransmitHandle *
1492 GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork,
1493                                    uint32_t priority,
1494                                    struct GNUNET_TIME_Relative maxdelay,
1495                                    const struct GNUNET_PeerIdentity *target,
1496                                    size_t notify_size,
1497                                    GNUNET_CONNECTION_TransmitReadyNotify notify,
1498                                    void *notify_cls)
1499 {
1500   struct PeerRecord *pr;
1501   struct GNUNET_CORE_TransmitHandle *th;
1502   struct GNUNET_CORE_TransmitHandle *pos;
1503   struct GNUNET_CORE_TransmitHandle *prev;
1504   struct GNUNET_CORE_TransmitHandle *minp;
1505
1506   pr = GNUNET_CONTAINER_multihashmap_get (handle->peers, &target->hashPubKey);
1507   if (NULL == pr)
1508   {
1509     /* attempt to send to peer that is not connected */
1510     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1511                 "Attempting to send to peer `%s' from peer `%s', but not connected!\n",
1512                 GNUNET_i2s (target), GNUNET_h2s (&handle->me.hashPubKey));
1513     GNUNET_break (0);
1514     return NULL;
1515   }
1516   GNUNET_assert (notify_size + sizeof (struct SendMessage) <
1517                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1518   th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle));
1519   th->peer = pr;
1520   GNUNET_assert (NULL != notify);
1521   th->get_message = notify;
1522   th->get_message_cls = notify_cls;
1523   th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
1524   th->priority = priority;
1525   th->msize = notify_size;
1526   th->cork = cork;
1527   /* bound queue size */
1528   if (pr->queue_size == handle->queue_size)
1529   {
1530     /* find lowest-priority entry, but skip the head of the list */
1531     minp = pr->pending_head->next;
1532     prev = minp;
1533     while (prev != NULL)
1534     {
1535       if (prev->priority < minp->priority)
1536         minp = prev;
1537       prev = prev->next;
1538     }
1539     if (minp == NULL)
1540     {
1541       GNUNET_break (handle->queue_size != 0);
1542       GNUNET_break (pr->queue_size == 1);
1543       GNUNET_free (th);
1544 #if DEBUG_CORE
1545       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1546                   "Dropping transmission request: cannot drop queue head and limit is one\n");
1547 #endif
1548       return NULL;
1549     }
1550     if (priority <= minp->priority)
1551     {
1552 #if DEBUG_CORE
1553       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1554                   "Dropping transmission request: priority too low\n");
1555 #endif
1556       GNUNET_free (th);
1557       return NULL;              /* priority too low */
1558     }
1559     GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, minp);
1560     pr->queue_size--;
1561     GNUNET_assert (0 == minp->get_message (minp->get_message_cls, 0, NULL));
1562     GNUNET_free (minp);
1563   }
1564
1565   /* Order entries by deadline, but SKIP 'HEAD' if
1566    * we're in the 'ready_peer_*' DLL */
1567   pos = pr->pending_head;
1568   if ((pr->prev != NULL) || (pr->next != NULL) ||
1569       (pr == handle->ready_peer_head))
1570   {
1571     GNUNET_assert (pos != NULL);
1572     pos = pos->next;            /* skip head */
1573   }
1574
1575   /* insertion sort */
1576   prev = pos;
1577   while ((pos != NULL) && (pos->timeout.abs_value < th->timeout.abs_value))
1578   {
1579     prev = pos;
1580     pos = pos->next;
1581   }
1582   GNUNET_CONTAINER_DLL_insert_after (pr->pending_head, pr->pending_tail, prev,
1583                                      th);
1584   pr->queue_size++;
1585   /* was the request queue previously empty? */
1586 #if DEBUG_CORE
1587   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission request added to queue\n");
1588 #endif
1589   if ((pr->pending_head == th) && (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK) &&
1590       (pr->next == NULL) && (pr->prev == NULL) &&
1591       (handle->ready_peer_head != pr))
1592     pr->ntr_task =
1593         GNUNET_SCHEDULER_add_now (&run_request_next_transmission, pr);
1594   return th;
1595 }
1596
1597
1598 /**
1599  * Cancel the specified transmission-ready notification.
1600  *
1601  * @param th handle that was returned by "notify_transmit_ready".
1602  */
1603 void
1604 GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th)
1605 {
1606   struct PeerRecord *pr = th->peer;
1607   struct GNUNET_CORE_Handle *h = pr->ch;
1608   int was_head;
1609
1610   was_head = (pr->pending_head == th);
1611   GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
1612   pr->queue_size--;
1613   if (th->cm != NULL)
1614   {
1615     /* we're currently in the control queue, remove */
1616     GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
1617                                  h->control_pending_tail, th->cm);
1618     GNUNET_free (th->cm);
1619   }
1620   GNUNET_free (th);
1621   if (was_head)
1622   {
1623     if ((pr->prev != NULL) || (pr->next != NULL) || (pr == h->ready_peer_head))
1624     {
1625       /* the request that was 'approved' by core was
1626        * canceled before it could be transmitted; remove
1627        * us from the 'ready' list */
1628       GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
1629     }
1630     request_next_transmission (pr);
1631   }
1632 }
1633
1634
1635 /* ****************** GNUNET_CORE_peer_change_preference ******************** */
1636
1637
1638 struct GNUNET_CORE_InformationRequestContext
1639 {
1640
1641   /**
1642    * Our connection to the service.
1643    */
1644   struct GNUNET_CORE_Handle *h;
1645
1646   /**
1647    * Link to control message, NULL if CM was sent.
1648    */
1649   struct ControlMessage *cm;
1650
1651   /**
1652    * Link to peer record.
1653    */
1654   struct PeerRecord *pr;
1655 };
1656
1657
1658 /**
1659  * CM was sent, remove link so we don't double-free.
1660  *
1661  * @param cls the 'struct GNUNET_CORE_InformationRequestContext'
1662  * @param success were we successful?
1663  */
1664 static void
1665 change_preference_send_continuation (void *cls, int success)
1666 {
1667   struct GNUNET_CORE_InformationRequestContext *irc = cls;
1668
1669   irc->cm = NULL;
1670 }
1671
1672
1673 /**
1674  * Obtain statistics and/or change preferences for the given peer.
1675  *
1676  * @param h core handle
1677  * @param peer identifies the peer
1678  * @param amount reserve N bytes for receiving, negative
1679  *                amounts can be used to undo a (recent) reservation;
1680  * @param preference increase incoming traffic share preference by this amount;
1681  *                in the absence of "amount" reservations, we use this
1682  *                preference value to assign proportional bandwidth shares
1683  *                to all connected peers
1684  * @param info function to call with the resulting configuration information
1685  * @param info_cls closure for info
1686  * @return NULL on error
1687  */
1688 struct GNUNET_CORE_InformationRequestContext *
1689 GNUNET_CORE_peer_change_preference (struct GNUNET_CORE_Handle *h,
1690                                     const struct GNUNET_PeerIdentity *peer,
1691                                     int32_t amount, uint64_t preference,
1692                                     GNUNET_CORE_PeerConfigurationInfoCallback
1693                                     info, void *info_cls)
1694 {
1695   struct GNUNET_CORE_InformationRequestContext *irc;
1696   struct PeerRecord *pr;
1697   struct RequestInfoMessage *rim;
1698   struct ControlMessage *cm;
1699
1700   pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &peer->hashPubKey);
1701   if (NULL == pr)
1702   {
1703     /* attempt to change preference on peer that is not connected */
1704     GNUNET_assert (0);
1705     return NULL;
1706   }
1707   if (pr->pcic != NULL)
1708   {
1709     /* second change before first one is done */
1710     GNUNET_break (0);
1711     return NULL;
1712   }
1713   irc = GNUNET_malloc (sizeof (struct GNUNET_CORE_InformationRequestContext));
1714   irc->h = h;
1715   irc->pr = pr;
1716   cm = GNUNET_malloc (sizeof (struct ControlMessage) +
1717                       sizeof (struct RequestInfoMessage));
1718   cm->cont = &change_preference_send_continuation;
1719   cm->cont_cls = irc;
1720   irc->cm = cm;
1721   rim = (struct RequestInfoMessage *) &cm[1];
1722   rim->header.size = htons (sizeof (struct RequestInfoMessage));
1723   rim->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO);
1724   rim->rim_id = htonl (pr->rim_id = h->rim_id_gen++);
1725   rim->reserved = htonl (0);
1726   rim->reserve_inbound = htonl (amount);
1727   rim->preference_change = GNUNET_htonll (preference);
1728   rim->peer = *peer;
1729 #if DEBUG_CORE
1730   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1731               "Queueing CHANGE PREFERENCE request for peer `%s' with RIM %u\n",
1732               GNUNET_i2s (peer), (unsigned int) pr->rim_id);
1733 #endif
1734   GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head,
1735                                     h->control_pending_tail, cm);
1736   pr->pcic = info;
1737   pr->pcic_cls = info_cls;
1738   pr->pcic_ptr = irc;           /* for free'ing irc */
1739   if (NULL != h->client)
1740     trigger_next_request (h, GNUNET_NO);
1741   return irc;
1742 }
1743
1744
1745 /**
1746  * Cancel request for getting information about a peer.
1747  * Note that an eventual change in preference, trust or bandwidth
1748  * assignment MAY have already been committed at the time,
1749  * so cancelling a request is NOT sure to undo the original
1750  * request.  The original request may or may not still commit.
1751  * The only thing cancellation ensures is that the callback
1752  * from the original request will no longer be called.
1753  *
1754  * @param irc context returned by the original GNUNET_CORE_peer_get_info call
1755  */
1756 void
1757 GNUNET_CORE_peer_change_preference_cancel (struct
1758                                            GNUNET_CORE_InformationRequestContext
1759                                            *irc)
1760 {
1761   struct GNUNET_CORE_Handle *h = irc->h;
1762   struct PeerRecord *pr = irc->pr;
1763
1764   GNUNET_assert (pr->pcic_ptr == irc);
1765   if (irc->cm != NULL)
1766   {
1767     GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
1768                                  h->control_pending_tail, irc->cm);
1769     GNUNET_free (irc->cm);
1770   }
1771   pr->pcic = NULL;
1772   pr->pcic_cls = NULL;
1773   pr->pcic_ptr = NULL;
1774   GNUNET_free (irc);
1775 }
1776
1777
1778 /* end of core_api.c */