fixing various issues Nate had: ready queue simply must be purged on reconnect
[oweals/gnunet.git] / src / core / core_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/core_api.c
23  * @brief core service; this is the main API for encrypted P2P
24  *        communications
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - implement atsi parsing and passing
29  */
30 #include "platform.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
33 #include "core.h"
34
35
36 /**
37  * Information we track for each peer.
38  */
39 struct PeerRecord
40 {
41
42   /**
43    * We generally do NOT keep peer records in a DLL; this
44    * DLL is only used IF this peer's 'pending_head' message
45    * is ready for transmission.  
46    */
47   struct PeerRecord *prev;
48
49   /**
50    * We generally do NOT keep peer records in a DLL; this
51    * DLL is only used IF this peer's 'pending_head' message
52    * is ready for transmission. 
53    */
54   struct PeerRecord *next;
55
56   /**
57    * Peer the record is about.
58    */
59   struct GNUNET_PeerIdentity peer;
60
61   /**
62    * Corresponding core handle.
63    */
64   struct GNUNET_CORE_Handle *ch;
65
66   /**
67    * Head of doubly-linked list of pending requests.
68    * Requests are sorted by deadline *except* for HEAD,
69    * which is only modified upon transmission to core.
70    */
71   struct GNUNET_CORE_TransmitHandle *pending_head;
72
73   /**
74    * Tail of doubly-linked list of pending requests.
75    */
76   struct GNUNET_CORE_TransmitHandle *pending_tail;
77
78   /**
79    * Pending callback waiting for peer information, or NULL for none.
80    */
81   GNUNET_CORE_PeerConfigurationInfoCallback pcic;
82
83   /**
84    * Closure for pcic.
85    */
86   void *pcic_cls;
87
88   /**
89    * Request information ID for the given pcic (needed in case a
90    * request is cancelled after being submitted to core and a new
91    * one is generated; in this case, we need to avoid matching the
92    * reply to the first (cancelled) request to the second request).
93    */
94   uint32_t rim_id;
95
96   /**
97    * ID of timeout task for the 'pending_head' handle
98    * which is the one with the smallest timeout. 
99    */
100   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
101
102   /**
103    * Current size of the queue of pending requests.
104    */
105   unsigned int queue_size;
106
107   /**
108    * SendMessageRequest ID generator for this peer.
109    */
110   uint16_t smr_id_gen;
111   
112 };
113
114
115 /**
116  * Entry in a doubly-linked list of control messages to be transmitted
117  * to the core service.  Control messages include traffic allocation,
118  * connection requests and of course our initial 'init' request.
119  * 
120  * The actual message is allocated at the end of this struct.
121  */
122 struct ControlMessage
123 {
124   /**
125    * This is a doubly-linked list.
126    */
127   struct ControlMessage *next;
128
129   /**
130    * This is a doubly-linked list.
131    */
132   struct ControlMessage *prev;
133
134   /**
135    * Function to run after successful transmission (or call with
136    * reason 'TIMEOUT' on error); called with scheduler context 'NULL'
137    * on disconnect.
138    */
139   GNUNET_SCHEDULER_Task cont;
140
141   /**
142    * Closure for 'cont'.
143    */
144   void *cont_cls;
145
146   /**
147    * Transmit handle (if one is associated with this ControlMessage), or NULL.
148    */
149   struct GNUNET_CORE_TransmitHandle *th;
150 };
151
152
153
154 /**
155  * Context for the core service connection.
156  */
157 struct GNUNET_CORE_Handle
158 {
159
160   /**
161    * Configuration we're using.
162    */
163   const struct GNUNET_CONFIGURATION_Handle *cfg;
164
165   /**
166    * Closure for the various callbacks.
167    */
168   void *cls;
169
170   /**
171    * Function to call once we've handshaked with the core service.
172    */
173   GNUNET_CORE_StartupCallback init;
174
175   /**
176    * Function to call whenever we're notified about a peer connecting.
177    */
178   GNUNET_CORE_ConnectEventHandler connects;
179
180   /**
181    * Function to call whenever we're notified about a peer disconnecting.
182    */
183   GNUNET_CORE_DisconnectEventHandler disconnects;
184
185   /**
186    * Function to call whenever we're notified about a peer changing status.
187    */  
188   GNUNET_CORE_PeerStatusEventHandler status_events;
189   
190   /**
191    * Function to call whenever we receive an inbound message.
192    */
193   GNUNET_CORE_MessageCallback inbound_notify;
194
195   /**
196    * Function to call whenever we receive an outbound message.
197    */
198   GNUNET_CORE_MessageCallback outbound_notify;
199
200   /**
201    * Function handlers for messages of particular type.
202    */
203   const struct GNUNET_CORE_MessageHandler *handlers;
204
205   /**
206    * Our connection to the service.
207    */
208   struct GNUNET_CLIENT_Connection *client;
209
210   /**
211    * Handle for our current transmission request.
212    */
213   struct GNUNET_CLIENT_TransmitHandle *cth;
214
215   /**
216    * Head of doubly-linked list of pending requests.
217    */
218   struct ControlMessage *pending_head;
219
220   /**
221    * Tail of doubly-linked list of pending requests.
222    */
223   struct ControlMessage *pending_tail;
224
225   /**
226    * Head of doubly-linked list of peers that are core-approved
227    * to send their next message.
228    */
229   struct PeerRecord *ready_peer_head;
230
231   /**
232    * Tail of doubly-linked list of peers that are core-approved
233    * to send their next message.
234    */
235   struct PeerRecord *ready_peer_tail;
236
237   /**
238    * Hash map listing all of the peers that we are currently
239    * connected to.
240    */
241   struct GNUNET_CONTAINER_MultiHashMap *peers;
242
243   /**
244    * Identity of this peer.
245    */
246   struct GNUNET_PeerIdentity me;
247
248   /**
249    * ID of reconnect task (if any).
250    */
251   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
252
253   /**
254    * Current delay we use for re-trying to connect to core.
255    */
256   struct GNUNET_TIME_Relative retry_backoff;
257
258   /**
259    * Request information ID generator.
260    */
261   uint32_t rim_id_gen;
262
263   /**
264    * Number of messages we are allowed to queue per target.
265    */
266   unsigned int queue_size;
267
268   /**
269    * Number of entries in the handlers array.
270    */
271   unsigned int hcnt;
272
273   /**
274    * For inbound notifications without a specific handler, do
275    * we expect to only receive headers?
276    */
277   int inbound_hdr_only;
278
279   /**
280    * For outbound notifications without a specific handler, do
281    * we expect to only receive headers?
282    */
283   int outbound_hdr_only;
284
285   /**
286    * Are we currently disconnected and hence unable to forward
287    * requests?
288    */
289   int currently_down;
290
291 };
292
293
294 /**
295  * Handle for a transmission request.
296  */
297 struct GNUNET_CORE_TransmitHandle
298 {
299
300   /**
301    * We keep active transmit handles in a doubly-linked list.
302    */
303   struct GNUNET_CORE_TransmitHandle *next;
304
305   /**
306    * We keep active transmit handles in a doubly-linked list.
307    */
308   struct GNUNET_CORE_TransmitHandle *prev;
309
310   /**
311    * Corresponding peer record.
312    */
313   struct PeerRecord *peer;
314
315   /**
316    * Corresponding SEND_REQUEST message.  Only non-NULL 
317    * while SEND_REQUEST message is pending.
318    */
319   struct ControlMessage *cm;
320
321   /**
322    * Function that will be called to get the actual request
323    * (once we are ready to transmit this request to the core).
324    * The function will be called with a NULL buffer to signal
325    * timeout.
326    */
327   GNUNET_CONNECTION_TransmitReadyNotify get_message;
328
329   /**
330    * Closure for get_message.
331    */
332   void *get_message_cls;
333
334   /**
335    * Timeout for this handle.
336    */
337   struct GNUNET_TIME_Absolute timeout;
338
339   /**
340    * How important is this message?
341    */
342   uint32_t priority;
343
344   /**
345    * Size of this request.
346    */
347   uint16_t msize;
348
349   /**
350    * Send message request ID for this request.
351    */
352   uint16_t smr_id;
353
354 };
355
356
357 /**
358  * Our current client connection went down.  Clean it up
359  * and try to reconnect!
360  *
361  * @param h our handle to the core service
362  */
363 static void
364 reconnect (struct GNUNET_CORE_Handle *h);
365
366
367 /**
368  * Task schedule to try to re-connect to core.
369  *
370  * @param cls the 'struct GNUNET_CORE_Handle'
371  * @param tc task context
372  */
373 static void
374 reconnect_task (void *cls, 
375                 const struct GNUNET_SCHEDULER_TaskContext *tc)
376 {
377   struct GNUNET_CORE_Handle *h = cls;
378
379   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
380 #if DEBUG_CORE
381   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
382               "Connecting to CORE service after delay\n");
383 #endif
384   reconnect (h);
385 }
386
387
388 /**
389  * Notify clients about disconnect and free 
390  * the entry for connected peer.
391  *
392  * @param cls the 'struct GNUNET_CORE_Handle*'
393  * @param key the peer identity (not used)
394  * @param value the 'struct PeerRecord' to free.
395  * @return GNUNET_YES (continue)
396  */
397 static int
398 disconnect_and_free_peer_entry (void *cls,
399                                 const GNUNET_HashCode *key,
400                                 void *value)
401 {
402   static struct GNUNET_BANDWIDTH_Value32NBO zero;
403   struct GNUNET_CORE_Handle *h = cls;
404   struct GNUNET_CORE_TransmitHandle *th;
405   struct PeerRecord *pr = value;
406   GNUNET_CORE_PeerConfigurationInfoCallback pcic;
407
408   while (NULL != (th = pr->pending_head))
409     {
410       GNUNET_CONTAINER_DLL_remove (pr->pending_head,
411                                    pr->pending_tail,
412                                    th);
413       pr->queue_size--;
414       GNUNET_assert (0 == 
415                      th->get_message (th->get_message_cls,
416                                       0, NULL));
417       GNUNET_free (th);
418     }
419   if (NULL != (pcic = pr->pcic))
420     {
421       pr->pcic = NULL;
422       pcic (pr->pcic_cls,
423             &pr->peer,
424             zero,
425             0, 0);
426     }
427   if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
428     {
429       GNUNET_SCHEDULER_cancel (pr->timeout_task);
430       pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
431     }
432   GNUNET_assert (pr->queue_size == 0);
433   if ( (pr->prev != NULL) ||
434        (pr->next != NULL) ||
435        (h->ready_peer_head == pr) )
436     GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
437                                  h->ready_peer_tail,
438                                  pr);
439   if (h->disconnects != NULL)
440     h->disconnects (h->cls,
441                     &pr->peer);    
442   GNUNET_assert (GNUNET_YES ==
443                  GNUNET_CONTAINER_multihashmap_remove (h->peers,
444                                                        key,
445                                                        pr));
446   GNUNET_assert (pr->pending_head == NULL);
447   GNUNET_assert (pr->pending_tail == NULL);
448   GNUNET_assert (pr->ch = h);
449   GNUNET_assert (pr->queue_size == 0);
450   GNUNET_assert (pr->timeout_task == GNUNET_SCHEDULER_NO_TASK);
451   GNUNET_free (pr);  
452   return GNUNET_YES;
453 }
454
455
456 /**
457  * Close down any existing connection to the CORE service and
458  * try re-establishing it later.
459  *
460  * @param h our handle
461  */
462 static void
463 reconnect_later (struct GNUNET_CORE_Handle *h)
464 {
465   struct ControlMessage *cm;
466   struct PeerRecord *pr;
467
468   while (NULL != (cm = h->pending_head))
469     {
470       GNUNET_CONTAINER_DLL_remove (h->pending_head,
471                                    h->pending_tail,
472                                    cm);
473       if (cm->th != NULL)
474         cm->th->cm = NULL; 
475       if (cm->cont != NULL)
476         cm->cont (cm->cont_cls, NULL);
477       GNUNET_free (cm);
478     }
479   if (h->client != NULL)
480     {
481       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
482       h->client = NULL;
483       h->cth = NULL;
484       GNUNET_CONTAINER_multihashmap_iterate (h->peers,
485                                              &disconnect_and_free_peer_entry,
486                                              h);
487     }
488   while (NULL != (pr = h->ready_peer_head))    
489     GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
490                                  h->ready_peer_tail,
491                                  pr);
492   
493   GNUNET_assert (h->pending_head == NULL);
494   h->currently_down = GNUNET_YES;
495   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
496   h->retry_backoff = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
497                                                h->retry_backoff);
498   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_backoff,
499                                                     &reconnect_task,
500                                                     h);
501   h->retry_backoff = GNUNET_TIME_relative_multiply (h->retry_backoff, 2);
502 }
503
504
505 /**
506  * Check the list of pending requests, send the next
507  * one to the core.
508  *
509  * @param h core handle
510  * @param ignore_currently_down transmit message even if not initialized?
511  */
512 static void
513 trigger_next_request (struct GNUNET_CORE_Handle *h,
514                       int ignore_currently_down);
515
516
517 /**
518  * The given request hit its timeout.  Remove from the
519  * doubly-linked list and call the respective continuation.
520  *
521  * @param cls the transmit handle of the request that timed out
522  * @param tc context, can be NULL (!)
523  */
524 static void
525 transmission_timeout (void *cls, 
526                       const struct GNUNET_SCHEDULER_TaskContext *tc);
527
528
529 /**
530  * Send a control message to the peer asking for transmission
531  * of the message in the given peer record.
532  *
533  * @param pr peer to request transmission to
534  */
535 static void
536 request_next_transmission (struct PeerRecord *pr)
537 {
538   struct GNUNET_CORE_Handle *h = pr->ch;
539   struct ControlMessage *cm;
540   struct SendMessageRequest *smr;
541   struct GNUNET_CORE_TransmitHandle *th;
542
543   if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
544     {
545       GNUNET_SCHEDULER_cancel (pr->timeout_task);
546       pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
547     }
548   if (NULL == (th = pr->pending_head))
549     {
550       trigger_next_request (h, GNUNET_NO);
551       return;
552     }
553   GNUNET_assert (pr->prev == NULL);
554   GNUNET_assert (pr->next == NULL);
555   pr->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (th->timeout),
556                                                    &transmission_timeout,
557                                                    pr);
558   cm = GNUNET_malloc (sizeof (struct ControlMessage) + 
559                       sizeof (struct SendMessageRequest));
560   th->cm = cm;
561   cm->th = th;
562   smr = (struct SendMessageRequest*) &cm[1];
563   smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
564   smr->header.size = htons (sizeof (struct SendMessageRequest));
565   smr->priority = htonl (th->priority);
566   smr->deadline = GNUNET_TIME_absolute_hton (th->timeout);
567   smr->peer = pr->peer;
568   smr->queue_size = htonl (pr->queue_size);
569   smr->size = htons (th->msize);
570   smr->smr_id = htons (th->smr_id = pr->smr_id_gen++);
571   GNUNET_CONTAINER_DLL_insert_after (h->pending_head,
572                                      h->pending_tail,
573                                      h->pending_tail,
574                                      cm);
575 #if DEBUG_CORE
576   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
577               "Adding SEND REQUEST for peer `%s' to message queue\n",
578               GNUNET_i2s (&pr->peer));
579 #endif
580   trigger_next_request (h, GNUNET_NO);
581 }
582
583
584 /**
585  * The given request hit its timeout.  Remove from the
586  * doubly-linked list and call the respective continuation.
587  *
588  * @param cls the transmit handle of the request that timed out
589  * @param tc context, can be NULL (!)
590  */
591 static void
592 transmission_timeout (void *cls, 
593                       const struct GNUNET_SCHEDULER_TaskContext *tc)
594 {
595   struct PeerRecord *pr = cls;
596   struct GNUNET_CORE_TransmitHandle *th;
597
598   pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
599   th = pr->pending_head;
600   GNUNET_CONTAINER_DLL_remove (pr->pending_head,
601                                pr->pending_tail,
602                                th);
603   pr->queue_size--;
604 #if DEBUG_CORE
605   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
606               "Signalling timeout of request for transmission to CORE service\n");
607 #endif
608   GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL));
609   request_next_transmission (pr);
610 }
611
612
613 /**
614  * Transmit the next message to the core service.
615  */
616 static size_t
617 transmit_message (void *cls,
618                   size_t size, 
619                   void *buf)
620 {
621   struct GNUNET_CORE_Handle *h = cls;
622   struct ControlMessage *cm;
623   struct GNUNET_CORE_TransmitHandle *th;
624   struct PeerRecord *pr;
625   struct SendMessage *sm;
626   const struct GNUNET_MessageHeader *hdr;
627   uint16_t msize;
628   size_t ret;
629
630   h->cth = NULL;
631   if (buf == NULL)
632     {
633 #if DEBUG_CORE
634       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
635                   "Transmission failed, initiating reconnect\n");
636 #endif
637       reconnect_later (h);
638       return 0;
639     }
640   /* first check for control messages */
641   if (NULL != (cm = h->pending_head))
642     {
643       hdr = (const struct GNUNET_MessageHeader*) &cm[1];
644       msize = ntohs (hdr->size);
645       if (size < msize)
646         {
647           trigger_next_request (h, GNUNET_NO);
648           return 0;
649         }
650 #if DEBUG_CORE
651       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
652                   "Transmitting control message with %u bytes of type %u to core.\n",
653                   (unsigned int) msize,
654                   (unsigned int) ntohs (hdr->type));
655 #endif
656       memcpy (buf, hdr, msize);
657       GNUNET_CONTAINER_DLL_remove (h->pending_head,
658                                    h->pending_tail,
659                                    cm);     
660       if (cm->th != NULL)
661         cm->th->cm = NULL;
662       if (NULL != cm->cont)
663         GNUNET_SCHEDULER_add_continuation (cm->cont, 
664                                            cm->cont_cls,
665                                            GNUNET_SCHEDULER_REASON_PREREQ_DONE);
666       GNUNET_free (cm);
667       trigger_next_request (h, GNUNET_NO);
668       return msize;
669     }
670   /* now check for 'ready' P2P messages */
671   if (NULL != (pr = h->ready_peer_head))
672     {
673       GNUNET_assert (pr->pending_head != NULL);
674       th = pr->pending_head;
675       if (size < th->msize + sizeof (struct SendMessage))
676         {
677           trigger_next_request (h, GNUNET_NO);
678           return 0;
679         }
680       GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
681                                    h->ready_peer_tail,
682                                    pr);
683       GNUNET_CONTAINER_DLL_remove (pr->pending_head,
684                                    pr->pending_tail,
685                                    th);
686       pr->queue_size--;
687       if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
688         {
689           GNUNET_SCHEDULER_cancel (pr->timeout_task);
690           pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
691         }
692 #if DEBUG_CORE
693       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
694                   "Transmitting SEND request to `%s' with %u bytes.\n",
695                   GNUNET_i2s (&pr->peer),
696                   (unsigned int) th->msize);
697 #endif
698       sm = (struct SendMessage *) buf;
699       sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
700       sm->priority = htonl (th->priority);
701       sm->deadline = GNUNET_TIME_absolute_hton (th->timeout);
702       sm->peer = pr->peer;
703       ret = th->get_message (th->get_message_cls,
704                              size - sizeof (struct SendMessage),
705                              &sm[1]);
706
707       if (0 == ret)
708         {
709 #if DEBUG_CORE
710           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
711                       "Size of clients message to peer %s is 0!\n",
712                       GNUNET_i2s(&pr->peer));
713 #endif
714           /* client decided to send nothing! */
715           request_next_transmission (pr);
716           return 0;       
717         }
718 #if DEBUG_CORE
719       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
720                   "Produced SEND message to core with %u bytes payload\n",
721                   (unsigned int) ret);
722 #endif
723       GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
724       if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
725         {
726           GNUNET_break (0);
727           request_next_transmission (pr);
728           return 0;
729         }
730       ret += sizeof (struct SendMessage);
731       sm->header.size = htons (ret);
732       GNUNET_assert (ret <= size);
733       GNUNET_free (th);
734       request_next_transmission (pr);
735       return ret;
736     }
737   return 0;
738 }
739
740
741 /**
742  * Check the list of pending requests, send the next
743  * one to the core.
744  *
745  * @param h core handle
746  * @param ignore_currently_down transmit message even if not initialized?
747  */
748 static void
749 trigger_next_request (struct GNUNET_CORE_Handle *h,
750                       int ignore_currently_down)
751 {
752   uint16_t msize;
753
754   if ( (GNUNET_YES == h->currently_down) &&
755        (ignore_currently_down == GNUNET_NO) )
756     {
757 #if DEBUG_CORE
758       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
759                   "Core connection down, not processing queue\n");
760 #endif
761       return;
762     }
763   if (NULL != h->cth)
764     {
765 #if DEBUG_CORE
766       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
767                   "Request pending, not processing queue\n");
768 #endif
769       return;
770     }
771   if (h->pending_head != NULL)
772     msize = ntohs (((struct GNUNET_MessageHeader*) &h->pending_head[1])->size);    
773   else if (h->ready_peer_head != NULL) 
774     msize = h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage);    
775   else
776     {
777 #if DEBUG_CORE
778       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
779                   "Request queue empty, not processing queue\n");
780 #endif
781       return; /* no pending message */
782     }
783   h->cth = GNUNET_CLIENT_notify_transmit_ready (h->client,
784                                                 msize,
785                                                 GNUNET_TIME_UNIT_FOREVER_REL,
786                                                 GNUNET_NO,
787                                                 &transmit_message, h);
788 }
789
790
791 /**
792  * Handler for notification messages received from the core.
793  *
794  * @param cls our "struct GNUNET_CORE_Handle"
795  * @param msg the message received from the core service
796  */
797 static void
798 main_notify_handler (void *cls, 
799                      const struct GNUNET_MessageHeader *msg)
800 {
801   struct GNUNET_CORE_Handle *h = cls;
802   const struct InitReplyMessage *m;
803   const struct ConnectNotifyMessage *cnm;
804   const struct DisconnectNotifyMessage *dnm;
805   const struct NotifyTrafficMessage *ntm;
806   const struct GNUNET_MessageHeader *em;
807   const struct ConfigurationInfoMessage *cim;
808   const struct PeerStatusNotifyMessage *psnm;
809   const struct SendMessageReady *smr;
810   const struct GNUNET_CORE_MessageHandler *mh;
811   GNUNET_CORE_StartupCallback init;
812   GNUNET_CORE_PeerConfigurationInfoCallback pcic;
813   struct PeerRecord *pr;
814   struct GNUNET_CORE_TransmitHandle *th;
815   unsigned int hpos;
816   int trigger;
817   uint16_t msize;
818   uint16_t et;
819
820   if (msg == NULL)
821     {
822       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
823                   _
824                   ("Client was disconnected from core service, trying to reconnect.\n"));
825       reconnect_later (h);
826       return;
827     }
828   msize = ntohs (msg->size);
829 #if DEBUG_CORE
830   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
831               "Processing message of type %u and size %u from core service\n",
832               ntohs (msg->type), msize);
833 #endif
834   switch (ntohs (msg->type))
835     {
836     case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY:
837       if (ntohs (msg->size) != sizeof (struct InitReplyMessage))
838         {
839           GNUNET_break (0);
840           reconnect_later (h);
841           return;
842         }
843       m = (const struct InitReplyMessage *) msg;
844       GNUNET_break (0 == ntohl (m->reserved));
845       /* start our message processing loop */
846       if (GNUNET_YES == h->currently_down)
847         {
848           h->currently_down = GNUNET_NO;
849           trigger_next_request (h, GNUNET_NO);
850         }
851       h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
852       if (NULL != (init = h->init))
853         {
854           /* mark so we don't call init on reconnect */
855           h->init = NULL;
856           GNUNET_CRYPTO_hash (&m->publicKey,
857                               sizeof (struct
858                                       GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
859                               &h->me.hashPubKey);
860 #if DEBUG_CORE
861           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
862                       "Connected to core service of peer `%s'.\n",
863                       GNUNET_i2s (&h->me));
864 #endif
865           init (h->cls, h, &h->me, &m->publicKey);
866         }
867       else
868         {
869 #if DEBUG_CORE
870           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
871                       "Successfully reconnected to core service.\n");
872 #endif
873         }
874       break;
875     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT:
876       if (msize != sizeof (struct ConnectNotifyMessage))
877         {
878           GNUNET_break (0);
879           reconnect_later (h);
880           return;
881         }
882       cnm = (const struct ConnectNotifyMessage *) msg;
883 #if DEBUG_CORE
884       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
885                   "Received notification about connection from `%s'.\n",
886                   GNUNET_i2s (&cnm->peer));
887 #endif
888       if (0 == memcmp (&h->me,
889                        &cnm->peer,
890                        sizeof (struct GNUNET_PeerIdentity)))
891         {
892           /* disconnect from self!? */
893           GNUNET_break (0);
894           return;
895         }
896       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
897                                               &cnm->peer.hashPubKey);
898       if (pr != NULL)
899         {
900           GNUNET_break (0);
901           reconnect_later (h);
902           return;
903         }
904       pr = GNUNET_malloc (sizeof (struct PeerRecord));
905       pr->peer = cnm->peer;
906       pr->ch = h;
907       GNUNET_assert (GNUNET_YES ==
908                      GNUNET_CONTAINER_multihashmap_put (h->peers,
909                                                         &cnm->peer.hashPubKey,
910                                                         pr,
911                                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
912       if (NULL != h->connects)
913         h->connects (h->cls,
914                      &cnm->peer,
915                      NULL /* FIXME: atsi! */);
916       break;
917     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT:
918       if (msize != sizeof (struct DisconnectNotifyMessage))
919         {
920           GNUNET_break (0);
921           reconnect_later (h);
922           return;
923         }
924       dnm = (const struct DisconnectNotifyMessage *) msg;
925       if (0 == memcmp (&h->me,
926                        &dnm->peer,
927                        sizeof (struct GNUNET_PeerIdentity)))
928         {
929           /* connection to self!? */
930           GNUNET_break (0);
931           return;
932         }
933 #if DEBUG_CORE
934       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
935                   "Received notification about disconnect from `%s'.\n",
936                   GNUNET_i2s (&dnm->peer));
937 #endif
938       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
939                                               &dnm->peer.hashPubKey);
940       if (pr == NULL)
941         {
942           GNUNET_break (0);
943           reconnect_later (h);
944           return;
945         }
946       trigger = ( (pr->prev != NULL) ||
947                   (pr->next != NULL) ||
948                   (h->ready_peer_head == pr) );
949       disconnect_and_free_peer_entry (h, &dnm->peer.hashPubKey, pr);
950       if (trigger)
951         trigger_next_request (h, GNUNET_NO);
952       break;
953     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE:
954       if (NULL == h->status_events)
955         {
956           GNUNET_break (0);
957           break;
958         }
959       if (msize != sizeof (struct PeerStatusNotifyMessage))
960         {
961           GNUNET_break (0);
962           reconnect_later (h);
963           return;
964         }
965       psnm = (const struct PeerStatusNotifyMessage *) msg;
966       if (0 == memcmp (&h->me,
967                        &psnm->peer,
968                        sizeof (struct GNUNET_PeerIdentity)))
969         {
970           /* self-change!? */
971           GNUNET_break (0);
972           return;
973         }
974 #if DEBUG_CORE
975       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
976                   "Received notification about status change by `%s'.\n",
977                   GNUNET_i2s (&psnm->peer));
978 #endif
979       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
980                                               &psnm->peer.hashPubKey);
981       if (pr == NULL)
982         {
983           GNUNET_break (0);
984           reconnect_later (h);
985           return;
986         }
987       h->status_events (h->cls,
988                         &psnm->peer,
989                         psnm->bandwidth_in,
990                         psnm->bandwidth_out,
991                         GNUNET_TIME_absolute_ntoh (psnm->timeout),
992                         NULL /* FIXME: atsi */);
993       break;
994     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND:
995       if (msize <
996           sizeof (struct NotifyTrafficMessage) +
997           sizeof (struct GNUNET_MessageHeader))
998         {
999           GNUNET_break (0);
1000           reconnect_later (h);
1001           return;
1002         }
1003       ntm = (const struct NotifyTrafficMessage *) msg;
1004       if (0 == memcmp (&h->me,
1005                        &ntm->peer,
1006                        sizeof (struct GNUNET_PeerIdentity)))
1007         {
1008           /* self-change!? */
1009           GNUNET_break (0);
1010           return;
1011         }
1012       em = (const struct GNUNET_MessageHeader *) &ntm[1];
1013 #if DEBUG_CORE
1014       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1015                   "Received message of type %u and size %u from peer `%4s'\n",
1016                   ntohs (em->type), 
1017                   ntohs (em->size),
1018                   GNUNET_i2s (&ntm->peer));
1019 #endif
1020       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1021                                               &ntm->peer.hashPubKey);
1022       if (pr == NULL)
1023         {
1024           GNUNET_break (0);
1025           reconnect_later (h);
1026           return;
1027         }
1028       if ((GNUNET_NO == h->inbound_hdr_only) &&
1029           (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage)))
1030         {
1031           GNUNET_break (0);
1032           reconnect_later (h);
1033           return;
1034         }
1035       et = ntohs (em->type);
1036       for (hpos = 0; hpos < h->hcnt; hpos++)
1037         {
1038           mh = &h->handlers[hpos];
1039           if (mh->type != et)
1040             continue;
1041           if ((mh->expected_size != ntohs (em->size)) &&
1042               (mh->expected_size != 0))
1043             {
1044               GNUNET_break (0);
1045               continue;
1046             }
1047           if (GNUNET_OK !=
1048               h->handlers[hpos].callback (h->cls, &ntm->peer, em,
1049                                           NULL /* FIXME: atsi */))
1050             {
1051               /* error in processing, do not process other messages! */
1052               break;
1053             }
1054         }
1055       if (NULL != h->inbound_notify)
1056         h->inbound_notify (h->cls, &ntm->peer, em,
1057                            NULL /* FIXME: atsi */);
1058       break;
1059     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND:
1060       if (msize <
1061           sizeof (struct NotifyTrafficMessage) +
1062           sizeof (struct GNUNET_MessageHeader))
1063         {
1064           GNUNET_break (0);
1065           reconnect_later (h);
1066           return;
1067         }
1068       ntm = (const struct NotifyTrafficMessage *) msg;
1069       if (0 == memcmp (&h->me,
1070                        &ntm->peer,
1071                        sizeof (struct GNUNET_PeerIdentity)))
1072         {
1073           /* self-change!? */
1074           GNUNET_break (0);
1075           return;
1076         }
1077       em = (const struct GNUNET_MessageHeader *) &ntm[1];
1078       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1079                                               &ntm->peer.hashPubKey);
1080       if (pr == NULL)
1081         {
1082           GNUNET_break (0);
1083           reconnect_later (h);
1084           return;
1085         }
1086 #if DEBUG_CORE
1087       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1088                   "Received notification about transmission to `%s'.\n",
1089                   GNUNET_i2s (&ntm->peer));
1090 #endif
1091       if ((GNUNET_NO == h->outbound_hdr_only) &&
1092           (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage)))
1093         {
1094           GNUNET_break (0);
1095           reconnect_later (h);
1096           return;
1097         }
1098       if (NULL == h->outbound_notify)
1099         {
1100           GNUNET_break (0);
1101           break;
1102         }
1103       h->outbound_notify (h->cls, &ntm->peer, em,
1104                           NULL /* FIXME: atsi? */);
1105       break;
1106     case GNUNET_MESSAGE_TYPE_CORE_SEND_READY:
1107       if (msize != sizeof (struct SendMessageReady))
1108         {
1109           GNUNET_break (0);
1110           reconnect_later (h);
1111           return;
1112         }
1113       smr = (const struct SendMessageReady *) msg;
1114       if (0 == memcmp (&h->me,
1115                        &smr->peer,
1116                        sizeof (struct GNUNET_PeerIdentity)))
1117         {
1118           /* self-change!? */
1119           GNUNET_break (0);
1120           return;
1121         }
1122       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1123                                               &smr->peer.hashPubKey);
1124       if (pr == NULL)
1125         {
1126           GNUNET_break (0);
1127           reconnect_later (h);
1128           return;
1129         }
1130 #if DEBUG_CORE
1131       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1132                   "Received notification about transmission readiness to `%s'.\n",
1133                   GNUNET_i2s (&smr->peer));
1134 #endif
1135       if (pr->pending_head == NULL)
1136         {
1137           /* request must have been cancelled between the original request
1138              and the response from core, ignore core's readiness */
1139           return;
1140         }
1141
1142       th = pr->pending_head;
1143       if (ntohs (smr->smr_id) != th->smr_id)
1144         {
1145           /* READY message is for expired or cancelled message,
1146              ignore! (we should have already sent another request) */
1147           break;
1148         }
1149       if ( (pr->prev != NULL) ||
1150            (pr->next != NULL) ||
1151            (h->ready_peer_head == pr) )
1152         {
1153           /* we should not already be on the ready list... */
1154           GNUNET_break (0);
1155           reconnect_later (h);
1156           return;
1157         }
1158       GNUNET_CONTAINER_DLL_insert (h->ready_peer_head,
1159                                    h->ready_peer_tail,
1160                                    pr);
1161       trigger_next_request (h, GNUNET_NO);
1162       break;
1163     case GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO:
1164       if (ntohs (msg->size) != sizeof (struct ConfigurationInfoMessage))
1165         {
1166           GNUNET_break (0);
1167           reconnect_later (h);
1168           return;
1169         }
1170       cim = (const struct ConfigurationInfoMessage*) msg;
1171       if (0 == memcmp (&h->me,
1172                        &cim->peer,
1173                        sizeof (struct GNUNET_PeerIdentity)))
1174         {
1175           /* self-change!? */
1176           GNUNET_break (0);
1177           return;
1178         }
1179 #if DEBUG_CORE
1180       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1181                   "Received notification about configuration update for `%s'.\n",
1182                   GNUNET_i2s (&cim->peer));
1183 #endif
1184       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1185                                               &cim->peer.hashPubKey);
1186       if (pr == NULL)
1187         {
1188           GNUNET_break (0);
1189           reconnect_later (h);
1190           return;
1191         }
1192       if (pr->rim_id != ntohl (cim->rim_id))
1193         break;
1194       pcic = pr->pcic;
1195       pr->pcic = NULL;
1196       if (pcic != NULL)
1197         pcic (pr->pcic_cls,
1198               &pr->peer,
1199               cim->bw_out,
1200               ntohl (cim->reserved_amount),
1201               GNUNET_ntohll (cim->preference));
1202       break;
1203     default:
1204       reconnect_later (h);
1205       return;
1206     }
1207   GNUNET_CLIENT_receive (h->client,
1208                          &main_notify_handler, h, 
1209                          GNUNET_TIME_UNIT_FOREVER_REL);
1210 }
1211
1212
1213 /**
1214  * Task executed once we are done transmitting the INIT message.
1215  * Starts our 'receive' loop.
1216  *
1217  * @param cls the 'struct GNUNET_CORE_Handle'
1218  * @param tc task context
1219  */
1220 static void
1221 init_done_task (void *cls, 
1222                 const struct GNUNET_SCHEDULER_TaskContext *tc)
1223 {
1224   struct GNUNET_CORE_Handle *h = cls;
1225
1226   if (tc == NULL)
1227     return; /* error */
1228   if (0 == (tc->reason & GNUNET_SCHEDULER_REASON_PREREQ_DONE))
1229     {
1230 #if DEBUG_CORE
1231       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1232                   "Failed to exchange INIT with core, retrying\n");
1233 #endif
1234       reconnect_later (h);
1235       return;
1236     }
1237   GNUNET_CLIENT_receive (h->client,
1238                          &main_notify_handler, 
1239                          h, 
1240                          GNUNET_TIME_UNIT_FOREVER_REL);
1241 }
1242
1243
1244 /**
1245  * Our current client connection went down.  Clean it up
1246  * and try to reconnect!
1247  *
1248  * @param h our handle to the core service
1249  */
1250 static void
1251 reconnect (struct GNUNET_CORE_Handle *h)
1252 {
1253   struct ControlMessage *cm;
1254   struct InitMessage *init;
1255   uint32_t opt;
1256   uint16_t msize;
1257   uint16_t *ts;
1258   unsigned int hpos;
1259
1260 #if DEBUG_CORE
1261   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1262               "Reconnecting to CORE service\n");
1263 #endif
1264   GNUNET_assert (h->client == NULL);
1265   GNUNET_assert (h->currently_down == GNUNET_YES);
1266   h->client = GNUNET_CLIENT_connect ("core", h->cfg);
1267   if (h->client == NULL)
1268     {
1269       reconnect_later (h);
1270       return;
1271     }
1272   msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage);
1273   cm = GNUNET_malloc (sizeof (struct ControlMessage) +
1274                       msize);
1275   cm->cont = &init_done_task;
1276   cm->cont_cls = h;
1277   init = (struct InitMessage*) &cm[1];
1278   init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT);
1279   init->header.size = htons (msize);
1280   opt = GNUNET_CORE_OPTION_SEND_CONNECT | GNUNET_CORE_OPTION_SEND_DISCONNECT;
1281   if (h->status_events != NULL)
1282     opt |= GNUNET_CORE_OPTION_SEND_STATUS_CHANGE;
1283   if (h->inbound_notify != NULL)
1284     {
1285       if (h->inbound_hdr_only)
1286         opt |= GNUNET_CORE_OPTION_SEND_HDR_INBOUND;
1287       else
1288         opt |= GNUNET_CORE_OPTION_SEND_FULL_INBOUND;
1289     }
1290   if (h->outbound_notify != NULL)
1291     {
1292       if (h->outbound_hdr_only)
1293         opt |= GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND;
1294       else
1295         opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND;
1296     }
1297   init->options = htonl (opt);
1298   ts = (uint16_t *) &init[1];
1299   for (hpos = 0; hpos < h->hcnt; hpos++)
1300     ts[hpos] = htons (h->handlers[hpos].type);
1301   GNUNET_CONTAINER_DLL_insert (h->pending_head,
1302                                h->pending_tail,
1303                                cm);
1304   trigger_next_request (h, GNUNET_YES);
1305 }
1306
1307
1308
1309 /**
1310  * Connect to the core service.  Note that the connection may
1311  * complete (or fail) asynchronously.
1312  *
1313  * @param cfg configuration to use
1314  * @param queue_size size of the per-peer message queue
1315  * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
1316  * @param init callback to call on timeout or once we have successfully
1317  *        connected to the core service; note that timeout is only meaningful if init is not NULL
1318  * @param connects function to call on peer connect, can be NULL
1319  * @param disconnects function to call on peer disconnect / timeout, can be NULL
1320  * @param status_events function to call on changes to peer connection status, can be NULL
1321  * @param inbound_notify function to call for all inbound messages, can be NULL
1322  * @param inbound_hdr_only set to GNUNET_YES if inbound_notify will only read the
1323  *                GNUNET_MessageHeader and hence we do not need to give it the full message;
1324  *                can be used to improve efficiency, ignored if inbound_notify is NULLL
1325  * @param outbound_notify function to call for all outbound messages, can be NULL
1326  * @param outbound_hdr_only set to GNUNET_YES if outbound_notify will only read the
1327  *                GNUNET_MessageHeader and hence we do not need to give it the full message
1328  *                can be used to improve efficiency, ignored if outbound_notify is NULLL
1329  * @param handlers callbacks for messages we care about, NULL-terminated
1330  * @return handle to the core service (only useful for disconnect until 'init' is called);
1331  *                NULL on error (in this case, init is never called)
1332  */
1333 struct GNUNET_CORE_Handle *
1334 GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1335                      unsigned int queue_size,
1336                      void *cls,
1337                      GNUNET_CORE_StartupCallback init,
1338                      GNUNET_CORE_ConnectEventHandler connects,
1339                      GNUNET_CORE_DisconnectEventHandler disconnects,
1340                      GNUNET_CORE_PeerStatusEventHandler status_events,
1341                      GNUNET_CORE_MessageCallback inbound_notify,
1342                      int inbound_hdr_only,
1343                      GNUNET_CORE_MessageCallback outbound_notify,
1344                      int outbound_hdr_only,
1345                      const struct GNUNET_CORE_MessageHandler *handlers)
1346 {
1347   struct GNUNET_CORE_Handle *h;
1348
1349   h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle));
1350   h->cfg = cfg;
1351   h->queue_size = queue_size;
1352   h->cls = cls;
1353   h->init = init;
1354   h->connects = connects;
1355   h->disconnects = disconnects;
1356   h->status_events = status_events;
1357   h->inbound_notify = inbound_notify;
1358   h->outbound_notify = outbound_notify;
1359   h->inbound_hdr_only = inbound_hdr_only;
1360   h->outbound_hdr_only = outbound_hdr_only;
1361   h->handlers = handlers;
1362   h->hcnt = 0;
1363   h->currently_down = GNUNET_YES;
1364   h->peers = GNUNET_CONTAINER_multihashmap_create (128);
1365   h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1366   while (handlers[h->hcnt].callback != NULL)
1367     h->hcnt++;
1368   GNUNET_assert (h->hcnt <
1369                  (GNUNET_SERVER_MAX_MESSAGE_SIZE -
1370                   sizeof (struct InitMessage)) / sizeof (uint16_t));
1371 #if DEBUG_CORE
1372   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1373               "Connecting to CORE service\n");
1374 #endif
1375   reconnect (h);
1376   return h;
1377 }
1378
1379
1380 /**
1381  * Disconnect from the core service.  This function can only 
1382  * be called *after* all pending 'GNUNET_CORE_notify_transmit_ready'
1383  * requests have been explicitly canceled.
1384  *
1385  * @param handle connection to core to disconnect
1386  */
1387 void
1388 GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
1389 {
1390   struct ControlMessage *cm;
1391   
1392 #if DEBUG_CORE
1393   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1394               "Disconnecting from CORE service\n");
1395 #endif
1396   if (handle->cth != NULL)
1397     {
1398       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth);
1399       handle->cth = NULL;
1400     }
1401   if (handle->client != NULL)
1402     {
1403       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
1404       handle->client = NULL;
1405     }
1406   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1407     {
1408       GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1409       handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1410     }
1411   while (NULL != (cm = handle->pending_head))
1412     {
1413       GNUNET_CONTAINER_DLL_remove (handle->pending_head,
1414                                    handle->pending_tail,
1415                                    cm);
1416       if (cm->th != NULL)
1417         cm->th->cm = NULL;
1418       if (cm->cont != NULL)
1419         cm->cont (cm->cont_cls, NULL);
1420       GNUNET_free (cm);
1421     }
1422   GNUNET_CONTAINER_multihashmap_iterate (handle->peers,
1423                                          &disconnect_and_free_peer_entry,
1424                                          handle);
1425   GNUNET_CONTAINER_multihashmap_destroy (handle->peers);
1426   GNUNET_break (handle->ready_peer_head == NULL);
1427   GNUNET_free (handle);
1428 }
1429
1430
1431 /**
1432  * Ask the core to call "notify" once it is ready to transmit the
1433  * given number of bytes to the specified "target".  If we are not yet
1434  * connected to the specified peer, a call to this function will cause
1435  * us to try to establish a connection.
1436  *
1437  * @param handle connection to core service
1438  * @param priority how important is the message?
1439  * @param maxdelay how long can the message wait?
1440  * @param target who should receive the message,
1441  *        use NULL for this peer (loopback)
1442  * @param notify_size how many bytes of buffer space does notify want?
1443  * @param notify function to call when buffer space is available
1444  * @param notify_cls closure for notify
1445  * @return non-NULL if the notify callback was queued,
1446  *         NULL if we can not even queue the request (insufficient
1447  *         memory); if NULL is returned, "notify" will NOT be called.
1448  */
1449 struct GNUNET_CORE_TransmitHandle *
1450 GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
1451                                    uint32_t priority,
1452                                    struct GNUNET_TIME_Relative maxdelay,
1453                                    const struct GNUNET_PeerIdentity *target,
1454                                    size_t notify_size,
1455                                    GNUNET_CONNECTION_TransmitReadyNotify notify,
1456                                    void *notify_cls)
1457 {
1458   struct PeerRecord *pr;
1459   struct GNUNET_CORE_TransmitHandle *th;
1460   struct GNUNET_CORE_TransmitHandle *pos;
1461   struct GNUNET_CORE_TransmitHandle *prev;
1462   struct GNUNET_CORE_TransmitHandle *minp;
1463
1464   pr = GNUNET_CONTAINER_multihashmap_get (handle->peers,
1465                                           &target->hashPubKey);
1466   if (NULL == pr)
1467     {
1468       /* attempt to send to peer that is not connected */
1469       GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
1470                  "Attempting to send to peer `%s' from peer `%s', but not connected!\n",
1471                  GNUNET_i2s(target), GNUNET_h2s(&handle->me.hashPubKey));
1472       GNUNET_break (0);
1473       return NULL;
1474     }
1475   GNUNET_assert (notify_size + sizeof (struct SendMessage) <
1476                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1477   th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle));
1478   th->peer = pr;
1479   th->get_message = notify;
1480   th->get_message_cls = notify_cls;
1481   th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
1482   th->priority = priority;
1483   th->msize = notify_size;
1484   /* bound queue size */
1485   if (pr->queue_size == handle->queue_size)
1486     {
1487       /* find lowest-priority entry */
1488       minp = pr->pending_head;
1489       prev = minp->next;
1490       while (prev != NULL)
1491         {
1492           if (prev->priority < minp->priority)
1493             minp = prev;
1494           prev = prev->next;
1495         }
1496       if (minp == NULL) 
1497         {
1498           GNUNET_break (handle->queue_size != 0);
1499           GNUNET_break (pr->queue_size == 0);
1500           return NULL;
1501         }
1502       if (priority <= minp->priority)
1503         {
1504 #if DEBUG_CORE
1505           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1506                       "Dropping transmission request: priority too low\n");
1507 #endif
1508           return NULL; /* priority too low */
1509         }
1510       GNUNET_CONTAINER_DLL_remove (pr->pending_head,
1511                                    pr->pending_tail,
1512                                    minp);
1513       pr->queue_size--;
1514       GNUNET_assert (0 ==
1515                      minp->get_message (minp->get_message_cls,
1516                                         0, NULL));
1517       GNUNET_free (minp);
1518     }
1519
1520   /* Order entries by deadline, but SKIP 'HEAD' if
1521      we're in the 'ready_peer_*' DLL */
1522   pos = pr->pending_head;
1523   if ( (pr->prev != NULL) ||
1524        (pr->next != NULL) ||
1525        (pr == handle->ready_peer_head) )
1526     {
1527       GNUNET_assert (pos != NULL);
1528       pos = pos->next; /* skip head */
1529     }
1530
1531   /* insertion sort */
1532   prev = pos;
1533   while ( (pos != NULL) &&
1534           (pos->timeout.abs_value < th->timeout.abs_value) )      
1535     {
1536       prev = pos;
1537       pos = pos->next;
1538     }
1539   GNUNET_CONTAINER_DLL_insert_after (pr->pending_head,
1540                                      pr->pending_tail,
1541                                      prev,
1542                                      th);
1543   pr->queue_size++;
1544   /* was the request queue previously empty? */
1545 #if DEBUG_CORE
1546   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1547               "Transmission request added to queue\n");
1548 #endif
1549   if (pr->pending_head == th) 
1550     request_next_transmission (pr);
1551   return th;
1552 }
1553
1554
1555 /**
1556  * Cancel the specified transmission-ready notification.
1557  *
1558  * @param th handle that was returned by "notify_transmit_ready".
1559  */
1560 void
1561 GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle
1562                                           *th)
1563 {
1564   struct PeerRecord *pr = th->peer;
1565   struct GNUNET_CORE_Handle *h = pr->ch;
1566   int was_head;
1567   
1568   was_head = (pr->pending_head == th);
1569   GNUNET_CONTAINER_DLL_remove (pr->pending_head,
1570                                pr->pending_tail,
1571                                th);    
1572   pr->queue_size--;
1573   if (th->cm != NULL)
1574     {
1575       /* we're currently in the control queue, remove */
1576       GNUNET_CONTAINER_DLL_remove (h->pending_head,
1577                                    h->pending_tail,
1578                                    th->cm);
1579       GNUNET_free (th->cm);      
1580     }
1581   GNUNET_free (th);
1582   if (was_head)
1583     {
1584       if ( (pr->prev != NULL) ||
1585            (pr->next != NULL) ||
1586            (pr == h->ready_peer_head) )
1587         {
1588           /* the request that was 'approved' by core was
1589              canceled before it could be transmitted; remove
1590              us from the 'ready' list */
1591           GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
1592                                        h->ready_peer_tail,
1593                                        pr);
1594         }
1595       request_next_transmission (pr);
1596     }
1597 }
1598
1599
1600 /* ****************** GNUNET_CORE_peer_request_connect ******************** */
1601
1602 /**
1603  * Handle for a request to the core to connect to
1604  * a particular peer.  Can be used to cancel the request
1605  * (before the 'cont'inuation is called).
1606  */
1607 struct GNUNET_CORE_PeerRequestHandle
1608 {
1609
1610   /**
1611    * Link to control message.
1612    */
1613   struct ControlMessage *cm;
1614
1615   /**
1616    * Core handle used.
1617    */
1618   struct GNUNET_CORE_Handle *h;
1619
1620   /**
1621    * Continuation to run when done.
1622    */
1623   GNUNET_SCHEDULER_Task cont;
1624
1625   /**
1626    * Closure for 'cont'.
1627    */
1628   void *cont_cls;
1629
1630 };
1631
1632
1633 /**
1634  * Continuation called when the control message was transmitted.
1635  * Calls the original continuation and frees the remaining
1636  * resources.
1637  *
1638  * @param cls the 'struct GNUNET_CORE_PeerRequestHandle'
1639  * @param tc scheduler context
1640  */
1641 static void
1642 peer_request_connect_cont (void *cls,
1643                            const struct GNUNET_SCHEDULER_TaskContext *tc)
1644 {
1645   struct GNUNET_CORE_PeerRequestHandle *ret = cls;
1646   
1647   if (ret->cont != NULL)
1648     {
1649       if (tc == NULL)
1650         GNUNET_SCHEDULER_add_now (ret->cont,
1651                                   ret->cont_cls);
1652       else
1653         ret->cont (ret->cont_cls, tc);
1654     }
1655   GNUNET_free (ret);
1656 }
1657
1658
1659 /**
1660  * Request that the core should try to connect to a particular peer.
1661  * Once the request has been transmitted to the core, the continuation
1662  * function will be called.  Note that this does NOT mean that a
1663  * connection was successfully established -- it only means that the
1664  * core will now try.  Successful establishment of the connection
1665  * will be signalled to the 'connects' callback argument of
1666  * 'GNUNET_CORE_connect' only.  If the core service does not respond
1667  * to our connection attempt within the given time frame, 'cont' will
1668  * be called with the TIMEOUT reason code.
1669  *
1670  * @param h core handle
1671  * @param timeout how long to try to talk to core
1672  * @param peer who should we connect to
1673  * @param cont function to call once the request has been completed (or timed out)
1674  * @param cont_cls closure for cont
1675  * @return NULL on error (cont will not be called), otherwise handle for cancellation
1676  */
1677 struct GNUNET_CORE_PeerRequestHandle *
1678 GNUNET_CORE_peer_request_connect (struct GNUNET_CORE_Handle *h,
1679                                   struct GNUNET_TIME_Relative timeout,
1680                                   const struct GNUNET_PeerIdentity * peer,
1681                                   GNUNET_SCHEDULER_Task cont,
1682                                   void *cont_cls)
1683 {
1684   struct GNUNET_CORE_PeerRequestHandle *ret;
1685   struct ControlMessage *cm;
1686   struct ConnectMessage *msg;
1687   
1688   cm = GNUNET_malloc (sizeof (struct ControlMessage) + 
1689                       sizeof (struct ConnectMessage));
1690   msg = (struct ConnectMessage*) &cm[1];
1691   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT);
1692   msg->header.size = htons (sizeof (struct ConnectMessage));
1693   msg->reserved = htonl (0);
1694   msg->timeout = GNUNET_TIME_relative_hton (timeout);
1695   msg->peer = *peer;
1696   GNUNET_CONTAINER_DLL_insert (h->pending_head,
1697                                h->pending_tail,
1698                                cm);
1699   ret = GNUNET_malloc (sizeof (struct GNUNET_CORE_PeerRequestHandle));
1700   ret->h = h;
1701   ret->cm = cm;
1702   ret->cont = cont;
1703   ret->cont_cls = cont_cls;
1704   cm->cont = &peer_request_connect_cont;
1705   cm->cont_cls = ret;
1706 #if DEBUG_CORE
1707   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1708               "Queueing REQUEST_CONNECT request\n");
1709 #endif
1710   if (h->pending_head == cm)
1711     trigger_next_request (h, GNUNET_NO);
1712   return ret;
1713 }
1714
1715
1716 /**
1717  * Cancel a pending request to connect to a particular peer.  Must not
1718  * be called after the 'cont' function was invoked.
1719  *
1720  * @param req request handle that was returned for the original request
1721  */
1722 void
1723 GNUNET_CORE_peer_request_connect_cancel (struct GNUNET_CORE_PeerRequestHandle *req)
1724 {
1725   struct GNUNET_CORE_Handle *h = req->h;
1726   struct ControlMessage *cm = req->cm;
1727
1728   GNUNET_CONTAINER_DLL_remove (h->pending_head,
1729                                h->pending_tail,
1730                                cm);
1731   GNUNET_free (cm);
1732   GNUNET_free (req);
1733 }
1734
1735
1736 /* ****************** GNUNET_CORE_peer_change_preference ******************** */
1737
1738
1739 struct GNUNET_CORE_InformationRequestContext 
1740 {
1741   
1742   /**
1743    * Our connection to the service.
1744    */
1745   struct GNUNET_CORE_Handle *h;
1746
1747   /**
1748    * Function to call with the information.
1749    */
1750   GNUNET_CORE_PeerConfigurationInfoCallback info;
1751
1752   /**
1753    * Closure for info.
1754    */
1755   void *info_cls;
1756
1757   /**
1758    * Link to control message, NULL if CM was sent.
1759    */ 
1760   struct ControlMessage *cm;
1761
1762   /**
1763    * Link to peer record.
1764    */
1765   struct PeerRecord *pr;
1766 };
1767
1768
1769 /**
1770  * CM was sent, remove link so we don't double-free.
1771  *
1772  * @param cls the 'struct GNUNET_CORE_InformationRequestContext'
1773  * @param tc scheduler context
1774  */
1775 static void
1776 change_preference_send_continuation (void *cls,
1777                                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1778 {
1779   struct GNUNET_CORE_InformationRequestContext *irc = cls;
1780
1781   irc->cm = NULL;
1782 }
1783
1784
1785 /**
1786  * Obtain statistics and/or change preferences for the given peer.
1787  *
1788  * @param h core handle
1789  * @param peer identifies the peer
1790  * @param timeout after how long should we give up (and call "info" with NULL
1791  *                for "peer" to signal an error)?
1792  * @param bw_out set to the current bandwidth limit (sending) for this peer,
1793  *                caller should set "bw_out" to "-1" to avoid changing
1794  *                the current value; otherwise "bw_out" will be lowered to
1795  *                the specified value; passing a pointer to "0" can be used to force
1796  *                us to disconnect from the peer; "bw_out" might not increase
1797  *                as specified since the upper bound is generally
1798  *                determined by the other peer!
1799  * @param amount reserve N bytes for receiving, negative
1800  *                amounts can be used to undo a (recent) reservation;
1801  * @param preference increase incoming traffic share preference by this amount;
1802  *                in the absence of "amount" reservations, we use this
1803  *                preference value to assign proportional bandwidth shares
1804  *                to all connected peers
1805  * @param info function to call with the resulting configuration information
1806  * @param info_cls closure for info
1807  * @return NULL on error
1808  */
1809 struct GNUNET_CORE_InformationRequestContext *
1810 GNUNET_CORE_peer_change_preference (struct GNUNET_CORE_Handle *h,
1811                                     const struct GNUNET_PeerIdentity *peer,
1812                                     struct GNUNET_TIME_Relative timeout,
1813                                     struct GNUNET_BANDWIDTH_Value32NBO bw_out,
1814                                     int32_t amount,
1815                                     uint64_t preference,
1816                                     GNUNET_CORE_PeerConfigurationInfoCallback info,
1817                                     void *info_cls)
1818 {
1819   struct GNUNET_CORE_InformationRequestContext *irc;
1820   struct PeerRecord *pr;
1821   struct RequestInfoMessage *rim;
1822   struct ControlMessage *cm;
1823
1824   pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1825                                           &peer->hashPubKey);
1826   if (NULL == pr)
1827     {
1828       /* attempt to change preference on peer that is not connected */
1829       GNUNET_break (0);
1830       return NULL;
1831     }
1832   if (pr->pcic != NULL)
1833     {
1834       /* second change before first one is done */
1835       GNUNET_break (0);
1836       return NULL;
1837     }
1838   irc = GNUNET_malloc (sizeof (struct GNUNET_CORE_InformationRequestContext));
1839   irc->h = h;
1840   irc->pr = pr;
1841   irc->info = info;
1842   irc->info_cls = info_cls;
1843   cm = GNUNET_malloc (sizeof (struct ControlMessage) +
1844                       sizeof (struct RequestInfoMessage));
1845   cm->cont = &change_preference_send_continuation;
1846   cm->cont_cls = irc;
1847   irc->cm = cm;
1848   rim = (struct RequestInfoMessage*) &cm[1];
1849   rim->header.size = htons (sizeof (struct RequestInfoMessage));
1850   rim->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO);
1851   rim->rim_id = htonl (pr->rim_id = h->rim_id_gen++);
1852   rim->limit_outbound = bw_out;
1853   rim->reserve_inbound = htonl (amount);
1854   rim->preference_change = GNUNET_htonll(preference);
1855   rim->peer = *peer;
1856 #if DEBUG_CORE
1857   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1858               "Queueing CHANGE PREFERENCE request\n");
1859 #endif
1860   GNUNET_CONTAINER_DLL_insert (h->pending_head,
1861                                h->pending_tail,
1862                                cm); 
1863   pr->pcic = info;
1864   pr->pcic_cls = info_cls;
1865   if (h->pending_head == cm)
1866     trigger_next_request (h, GNUNET_NO);
1867   return irc;
1868 }
1869
1870
1871 /**
1872  * Cancel request for getting information about a peer.
1873  * Note that an eventual change in preference, trust or bandwidth
1874  * assignment MAY have already been committed at the time, 
1875  * so cancelling a request is NOT sure to undo the original
1876  * request.  The original request may or may not still commit.
1877  * The only thing cancellation ensures is that the callback
1878  * from the original request will no longer be called.
1879  *
1880  * @param irc context returned by the original GNUNET_CORE_peer_get_info call
1881  */
1882 void
1883 GNUNET_CORE_peer_change_preference_cancel (struct GNUNET_CORE_InformationRequestContext *irc)
1884 {
1885   struct GNUNET_CORE_Handle *h = irc->h;
1886   struct PeerRecord *pr = irc->pr;
1887
1888   if (irc->cm != NULL)
1889     {
1890       GNUNET_CONTAINER_DLL_remove (h->pending_head,
1891                                    h->pending_tail,
1892                                    irc->cm);
1893       GNUNET_free (irc->cm);
1894     }
1895   pr->pcic = NULL;
1896   pr->pcic_cls = NULL;
1897   GNUNET_free (irc);
1898 }
1899
1900
1901 /* end of core_api.c */