fixing 1622
[oweals/gnunet.git] / src / core / core_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/core_api.c
23  * @brief core service; this is the main API for encrypted P2P
24  *        communications
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - implement atsi parsing and passing
29  */
30 #include "platform.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
33 #include "core.h"
34
35
36 /**
37  * Information we track for each peer.
38  */
39 struct PeerRecord
40 {
41
42   /**
43    * We generally do NOT keep peer records in a DLL; this
44    * DLL is only used IF this peer's 'pending_head' message
45    * is ready for transmission.  
46    */
47   struct PeerRecord *prev;
48
49   /**
50    * We generally do NOT keep peer records in a DLL; this
51    * DLL is only used IF this peer's 'pending_head' message
52    * is ready for transmission. 
53    */
54   struct PeerRecord *next;
55
56   /**
57    * Peer the record is about.
58    */
59   struct GNUNET_PeerIdentity peer;
60
61   /**
62    * Corresponding core handle.
63    */
64   struct GNUNET_CORE_Handle *ch;
65
66   /**
67    * Head of doubly-linked list of pending requests.
68    * Requests are sorted by deadline *except* for HEAD,
69    * which is only modified upon transmission to core.
70    */
71   struct GNUNET_CORE_TransmitHandle *pending_head;
72
73   /**
74    * Tail of doubly-linked list of pending requests.
75    */
76   struct GNUNET_CORE_TransmitHandle *pending_tail;
77
78   /**
79    * Pending callback waiting for peer information, or NULL for none.
80    */
81   GNUNET_CORE_PeerConfigurationInfoCallback pcic;
82
83   /**
84    * Closure for pcic.
85    */
86   void *pcic_cls;
87
88   /**
89    * Request information ID for the given pcic (needed in case a
90    * request is cancelled after being submitted to core and a new
91    * one is generated; in this case, we need to avoid matching the
92    * reply to the first (cancelled) request to the second request).
93    */
94   uint32_t rim_id;
95
96   /**
97    * ID of timeout task for the 'pending_head' handle
98    * which is the one with the smallest timeout. 
99    */
100   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
101
102   /**
103    * Current size of the queue of pending requests.
104    */
105   unsigned int queue_size;
106
107   /**
108    * SendMessageRequest ID generator for this peer.
109    */
110   uint16_t smr_id_gen;
111   
112 };
113
114
115 /**
116  * Entry in a doubly-linked list of control messages to be transmitted
117  * to the core service.  Control messages include traffic allocation,
118  * connection requests and of course our initial 'init' request.
119  * 
120  * The actual message is allocated at the end of this struct.
121  */
122 struct ControlMessage
123 {
124   /**
125    * This is a doubly-linked list.
126    */
127   struct ControlMessage *next;
128
129   /**
130    * This is a doubly-linked list.
131    */
132   struct ControlMessage *prev;
133
134   /**
135    * Function to run after successful transmission (or call with
136    * reason 'TIMEOUT' on error); called with scheduler context 'NULL'
137    * on disconnect.
138    */
139   GNUNET_SCHEDULER_Task cont;
140
141   /**
142    * Closure for 'cont'.
143    */
144   void *cont_cls;
145
146   /**
147    * Transmit handle (if one is associated with this ControlMessage), or NULL.
148    */
149   struct GNUNET_CORE_TransmitHandle *th;
150 };
151
152
153
154 /**
155  * Context for the core service connection.
156  */
157 struct GNUNET_CORE_Handle
158 {
159
160   /**
161    * Configuration we're using.
162    */
163   const struct GNUNET_CONFIGURATION_Handle *cfg;
164
165   /**
166    * Closure for the various callbacks.
167    */
168   void *cls;
169
170   /**
171    * Function to call once we've handshaked with the core service.
172    */
173   GNUNET_CORE_StartupCallback init;
174
175   /**
176    * Function to call whenever we're notified about a peer connecting.
177    */
178   GNUNET_CORE_ConnectEventHandler connects;
179
180   /**
181    * Function to call whenever we're notified about a peer disconnecting.
182    */
183   GNUNET_CORE_DisconnectEventHandler disconnects;
184
185   /**
186    * Function to call whenever we're notified about a peer changing status.
187    */  
188   GNUNET_CORE_PeerStatusEventHandler status_events;
189   
190   /**
191    * Function to call whenever we receive an inbound message.
192    */
193   GNUNET_CORE_MessageCallback inbound_notify;
194
195   /**
196    * Function to call whenever we receive an outbound message.
197    */
198   GNUNET_CORE_MessageCallback outbound_notify;
199
200   /**
201    * Function handlers for messages of particular type.
202    */
203   const struct GNUNET_CORE_MessageHandler *handlers;
204
205   /**
206    * Our connection to the service.
207    */
208   struct GNUNET_CLIENT_Connection *client;
209
210   /**
211    * Handle for our current transmission request.
212    */
213   struct GNUNET_CLIENT_TransmitHandle *cth;
214
215   /**
216    * Head of doubly-linked list of pending requests.
217    */
218   struct ControlMessage *pending_head;
219
220   /**
221    * Tail of doubly-linked list of pending requests.
222    */
223   struct ControlMessage *pending_tail;
224
225   /**
226    * Head of doubly-linked list of peers that are core-approved
227    * to send their next message.
228    */
229   struct PeerRecord *ready_peer_head;
230
231   /**
232    * Tail of doubly-linked list of peers that are core-approved
233    * to send their next message.
234    */
235   struct PeerRecord *ready_peer_tail;
236
237   /**
238    * Hash map listing all of the peers that we are currently
239    * connected to.
240    */
241   struct GNUNET_CONTAINER_MultiHashMap *peers;
242
243   /**
244    * Identity of this peer.
245    */
246   struct GNUNET_PeerIdentity me;
247
248   /**
249    * ID of reconnect task (if any).
250    */
251   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
252
253   /**
254    * Current delay we use for re-trying to connect to core.
255    */
256   struct GNUNET_TIME_Relative retry_backoff;
257
258   /**
259    * Request information ID generator.
260    */
261   uint32_t rim_id_gen;
262
263   /**
264    * Number of messages we are allowed to queue per target.
265    */
266   unsigned int queue_size;
267
268   /**
269    * Number of entries in the handlers array.
270    */
271   unsigned int hcnt;
272
273   /**
274    * For inbound notifications without a specific handler, do
275    * we expect to only receive headers?
276    */
277   int inbound_hdr_only;
278
279   /**
280    * For outbound notifications without a specific handler, do
281    * we expect to only receive headers?
282    */
283   int outbound_hdr_only;
284
285   /**
286    * Are we currently disconnected and hence unable to forward
287    * requests?
288    */
289   int currently_down;
290
291 };
292
293
294 /**
295  * Handle for a transmission request.
296  */
297 struct GNUNET_CORE_TransmitHandle
298 {
299
300   /**
301    * We keep active transmit handles in a doubly-linked list.
302    */
303   struct GNUNET_CORE_TransmitHandle *next;
304
305   /**
306    * We keep active transmit handles in a doubly-linked list.
307    */
308   struct GNUNET_CORE_TransmitHandle *prev;
309
310   /**
311    * Corresponding peer record.
312    */
313   struct PeerRecord *peer;
314
315   /**
316    * Corresponding SEND_REQUEST message.  Only non-NULL 
317    * while SEND_REQUEST message is pending.
318    */
319   struct ControlMessage *cm;
320
321   /**
322    * Function that will be called to get the actual request
323    * (once we are ready to transmit this request to the core).
324    * The function will be called with a NULL buffer to signal
325    * timeout.
326    */
327   GNUNET_CONNECTION_TransmitReadyNotify get_message;
328
329   /**
330    * Closure for get_message.
331    */
332   void *get_message_cls;
333
334   /**
335    * Timeout for this handle.
336    */
337   struct GNUNET_TIME_Absolute timeout;
338
339   /**
340    * How important is this message?
341    */
342   uint32_t priority;
343
344   /**
345    * Size of this request.
346    */
347   uint16_t msize;
348
349   /**
350    * Send message request ID for this request.
351    */
352   uint16_t smr_id;
353
354 };
355
356
357 /**
358  * Our current client connection went down.  Clean it up
359  * and try to reconnect!
360  *
361  * @param h our handle to the core service
362  */
363 static void
364 reconnect (struct GNUNET_CORE_Handle *h);
365
366
367 /**
368  * Task schedule to try to re-connect to core.
369  *
370  * @param cls the 'struct GNUNET_CORE_Handle'
371  * @param tc task context
372  */
373 static void
374 reconnect_task (void *cls, 
375                 const struct GNUNET_SCHEDULER_TaskContext *tc)
376 {
377   struct GNUNET_CORE_Handle *h = cls;
378
379   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
380 #if DEBUG_CORE
381   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
382               "Connecting to CORE service after delay\n");
383 #endif
384   reconnect (h);
385 }
386
387
388 /**
389  * Notify clients about disconnect and free 
390  * the entry for connected peer.
391  *
392  * @param cls the 'struct GNUNET_CORE_Handle*'
393  * @param key the peer identity (not used)
394  * @param value the 'struct PeerRecord' to free.
395  * @return GNUNET_YES (continue)
396  */
397 static int
398 disconnect_and_free_peer_entry (void *cls,
399                                 const GNUNET_HashCode *key,
400                                 void *value)
401 {
402   static struct GNUNET_BANDWIDTH_Value32NBO zero;
403   struct GNUNET_CORE_Handle *h = cls;
404   struct GNUNET_CORE_TransmitHandle *th;
405   struct PeerRecord *pr = value;
406   GNUNET_CORE_PeerConfigurationInfoCallback pcic;
407
408   while (NULL != (th = pr->pending_head))
409     {
410       GNUNET_CONTAINER_DLL_remove (pr->pending_head,
411                                    pr->pending_tail,
412                                    th);
413       pr->queue_size--;
414       GNUNET_assert (0 == 
415                      th->get_message (th->get_message_cls,
416                                       0, NULL));
417       GNUNET_free (th);
418     }
419   if (NULL != (pcic = pr->pcic))
420     {
421       pr->pcic = NULL;
422       pcic (pr->pcic_cls,
423             &pr->peer,
424             zero,
425             0, 0);
426     }
427   if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
428     {
429       GNUNET_SCHEDULER_cancel (pr->timeout_task);
430       pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
431     }
432   GNUNET_assert (pr->queue_size == 0);
433   if ( (pr->prev != NULL) ||
434        (pr->next != NULL) ||
435        (h->ready_peer_head == pr) )
436     GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
437                                  h->ready_peer_tail,
438                                  pr);
439   if (h->disconnects != NULL)
440     h->disconnects (h->cls,
441                     &pr->peer);    
442   GNUNET_assert (GNUNET_YES ==
443                  GNUNET_CONTAINER_multihashmap_remove (h->peers,
444                                                        key,
445                                                        pr));
446   GNUNET_assert (pr->pending_head == NULL);
447   GNUNET_assert (pr->pending_tail == NULL);
448   GNUNET_assert (pr->ch = h);
449   GNUNET_assert (pr->queue_size == 0);
450   GNUNET_assert (pr->timeout_task == GNUNET_SCHEDULER_NO_TASK);
451   GNUNET_free (pr);  
452   return GNUNET_YES;
453 }
454
455
456 /**
457  * Close down any existing connection to the CORE service and
458  * try re-establishing it later.
459  *
460  * @param h our handle
461  */
462 static void
463 reconnect_later (struct GNUNET_CORE_Handle *h)
464 {
465   struct ControlMessage *cm;
466
467   while (NULL != (cm = h->pending_head))
468     {
469       GNUNET_CONTAINER_DLL_remove (h->pending_head,
470                                    h->pending_tail,
471                                    cm);      
472       cm->cont (cm->cont_cls, NULL);
473       GNUNET_free (cm);
474     }
475   if (h->client != NULL)
476     {
477       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
478       h->client = NULL;
479       GNUNET_CONTAINER_multihashmap_iterate (h->peers,
480                                              &disconnect_and_free_peer_entry,
481                                              h);
482     }
483   GNUNET_assert (h->pending_head == NULL);
484   h->currently_down = GNUNET_YES;
485   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
486   h->retry_backoff = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
487                                                h->retry_backoff);
488   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_backoff,
489                                                     &reconnect_task,
490                                                     h);
491   h->retry_backoff = GNUNET_TIME_relative_multiply (h->retry_backoff, 2);
492 }
493
494
495 /**
496  * Check the list of pending requests, send the next
497  * one to the core.
498  *
499  * @param h core handle
500  * @param ignore_currently_down transmit message even if not initialized?
501  */
502 static void
503 trigger_next_request (struct GNUNET_CORE_Handle *h,
504                       int ignore_currently_down);
505
506
507 /**
508  * The given request hit its timeout.  Remove from the
509  * doubly-linked list and call the respective continuation.
510  *
511  * @param cls the transmit handle of the request that timed out
512  * @param tc context, can be NULL (!)
513  */
514 static void
515 transmission_timeout (void *cls, 
516                       const struct GNUNET_SCHEDULER_TaskContext *tc);
517
518
519 /**
520  * Control message was sent, mark it as such.
521  *
522  * @param cls the 'struct GNUNET_CORE_TransmitHandle*'
523  * @param tc scheduler context
524  */
525 static void
526 mark_control_message_sent (void *cls,
527                            const struct GNUNET_SCHEDULER_TaskContext *tc)
528 {
529   struct GNUNET_CORE_TransmitHandle *th = cls;
530
531   th->cm = NULL;
532 }
533
534
535 /**
536  * Send a control message to the peer asking for transmission
537  * of the message in the given peer record.
538  *
539  * @param pr peer to request transmission to
540  */
541 static void
542 request_next_transmission (struct PeerRecord *pr)
543 {
544   struct GNUNET_CORE_Handle *h = pr->ch;
545   struct ControlMessage *cm;
546   struct SendMessageRequest *smr;
547   struct GNUNET_CORE_TransmitHandle *th;
548
549   if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
550     {
551       GNUNET_SCHEDULER_cancel (pr->timeout_task);
552       pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
553     }
554   if (NULL == (th = pr->pending_head))
555     {
556       trigger_next_request (h, GNUNET_NO);
557       return;
558     }
559   GNUNET_assert (pr->prev == NULL);
560   GNUNET_assert (pr->next == NULL);
561   pr->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (th->timeout),
562                                                    &transmission_timeout,
563                                                    pr);
564   cm = GNUNET_malloc (sizeof (struct ControlMessage) + 
565                       sizeof (struct SendMessageRequest));
566   cm->cont = &mark_control_message_sent;
567   cm->cont_cls = th;
568   th->cm = cm;
569   cm->th = th;
570   smr = (struct SendMessageRequest*) &cm[1];
571   smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
572   smr->header.size = htons (sizeof (struct SendMessageRequest));
573   smr->priority = htonl (th->priority);
574   smr->deadline = GNUNET_TIME_absolute_hton (th->timeout);
575   smr->peer = pr->peer;
576   smr->queue_size = htonl (pr->queue_size);
577   smr->size = htons (th->msize);
578   smr->smr_id = htons (th->smr_id = pr->smr_id_gen++);
579   GNUNET_CONTAINER_DLL_insert_after (h->pending_head,
580                                      h->pending_tail,
581                                      h->pending_tail,
582                                      cm);
583 #if DEBUG_CORE
584   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
585               "Adding SEND REQUEST for peer `%s' to message queue\n",
586               GNUNET_i2s (&pr->peer));
587 #endif
588   trigger_next_request (h, GNUNET_NO);
589 }
590
591
592 /**
593  * The given request hit its timeout.  Remove from the
594  * doubly-linked list and call the respective continuation.
595  *
596  * @param cls the transmit handle of the request that timed out
597  * @param tc context, can be NULL (!)
598  */
599 static void
600 transmission_timeout (void *cls, 
601                       const struct GNUNET_SCHEDULER_TaskContext *tc)
602 {
603   struct PeerRecord *pr = cls;
604   struct GNUNET_CORE_TransmitHandle *th;
605
606   pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
607   th = pr->pending_head;
608   GNUNET_CONTAINER_DLL_remove (pr->pending_head,
609                                pr->pending_tail,
610                                th);
611   pr->queue_size--;
612 #if DEBUG_CORE
613   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
614               "Signalling timeout of request for transmission to CORE service\n");
615 #endif
616   GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL));
617   request_next_transmission (pr);
618 }
619
620
621 /**
622  * Transmit the next message to the core service.
623  */
624 static size_t
625 transmit_message (void *cls,
626                   size_t size, 
627                   void *buf)
628 {
629   struct GNUNET_CORE_Handle *h = cls;
630   struct ControlMessage *cm;
631   struct GNUNET_CORE_TransmitHandle *th;
632   struct PeerRecord *pr;
633   struct SendMessage *sm;
634   const struct GNUNET_MessageHeader *hdr;
635   uint16_t msize;
636   size_t ret;
637
638   h->cth = NULL;
639   if (buf == NULL)
640     {
641 #if DEBUG_CORE
642       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
643                   "Transmission failed, initiating reconnect\n");
644 #endif
645       reconnect_later (h);
646       return 0;
647     }
648   /* first check for control messages */
649   if (NULL != (cm = h->pending_head))
650     {
651       hdr = (const struct GNUNET_MessageHeader*) &cm[1];
652       msize = ntohs (hdr->size);
653       if (size < msize)
654         {
655           trigger_next_request (h, GNUNET_NO);
656           return 0;
657         }
658 #if DEBUG_CORE
659       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
660                   "Transmitting control message with %u bytes of type %u to core.\n",
661                   (unsigned int) msize,
662                   (unsigned int) ntohs (hdr->type));
663 #endif
664       memcpy (buf, hdr, msize);
665       GNUNET_CONTAINER_DLL_remove (h->pending_head,
666                                    h->pending_tail,
667                                    cm);     
668       if (cm->th != NULL)
669         cm->th->cm = NULL;
670       if (NULL != cm->cont)
671         GNUNET_SCHEDULER_add_continuation (cm->cont, 
672                                            cm->cont_cls,
673                                            GNUNET_SCHEDULER_REASON_PREREQ_DONE);
674       GNUNET_free (cm);
675       trigger_next_request (h, GNUNET_NO);
676       return msize;
677     }
678   /* now check for 'ready' P2P messages */
679   if (NULL != (pr = h->ready_peer_head))
680     {
681       th = pr->pending_head;
682       if (size < th->msize + sizeof (struct SendMessage))
683         {
684           trigger_next_request (h, GNUNET_NO);
685           return 0;
686         }
687       GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
688                                    h->ready_peer_tail,
689                                    pr);
690       GNUNET_CONTAINER_DLL_remove (pr->pending_head,
691                                    pr->pending_tail,
692                                    th);
693       pr->queue_size--;
694       if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
695         {
696           GNUNET_SCHEDULER_cancel (pr->timeout_task);
697           pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
698         }
699 #if DEBUG_CORE
700       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
701                   "Transmitting SEND request to `%s' with %u bytes.\n",
702                   GNUNET_i2s (&pr->peer),
703                   (unsigned int) th->msize);
704 #endif
705       sm = (struct SendMessage *) buf;
706       sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
707       sm->priority = htonl (th->priority);
708       sm->deadline = GNUNET_TIME_absolute_hton (th->timeout);
709       sm->peer = pr->peer;
710       ret = th->get_message (th->get_message_cls,
711                              size - sizeof (struct SendMessage),
712                              &sm[1]);
713
714       if (0 == ret)
715         {
716 #if DEBUG_CORE
717           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
718                       "Size of clients message to peer %s is 0!\n",
719                       GNUNET_i2s(&pr->peer));
720 #endif
721           /* client decided to send nothing! */
722           request_next_transmission (pr);
723           return 0;       
724         }
725 #if DEBUG_CORE
726       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
727                   "Produced SEND message to core with %u bytes payload\n",
728                   (unsigned int) ret);
729 #endif
730       GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
731       if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
732         {
733           GNUNET_break (0);
734           request_next_transmission (pr);
735           return 0;
736         }
737       ret += sizeof (struct SendMessage);
738       sm->header.size = htons (ret);
739       GNUNET_assert (ret <= size);
740       GNUNET_free (th);
741       request_next_transmission (pr);
742       return ret;
743     }
744   return 0;
745 }
746
747
748 /**
749  * Check the list of pending requests, send the next
750  * one to the core.
751  *
752  * @param h core handle
753  * @param ignore_currently_down transmit message even if not initialized?
754  */
755 static void
756 trigger_next_request (struct GNUNET_CORE_Handle *h,
757                       int ignore_currently_down)
758 {
759   uint16_t msize;
760
761   if ( (GNUNET_YES == h->currently_down) &&
762        (ignore_currently_down == GNUNET_NO) )
763     {
764 #if DEBUG_CORE
765       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
766                   "Core connection down, not processing queue\n");
767 #endif
768       return;
769     }
770   if (NULL != h->cth)
771     {
772 #if DEBUG_CORE
773       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
774                   "Request pending, not processing queue\n");
775 #endif
776       return;
777     }
778   if (h->pending_head != NULL)
779     msize = ntohs (((struct GNUNET_MessageHeader*) &h->pending_head[1])->size);    
780   else if (h->ready_peer_head != NULL)
781     msize = h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage);    
782   else
783     {
784 #if DEBUG_CORE
785       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
786                   "Request queue empty, not processing queue\n");
787 #endif
788       return; /* no pending message */
789     }
790   h->cth = GNUNET_CLIENT_notify_transmit_ready (h->client,
791                                                 msize,
792                                                 GNUNET_TIME_UNIT_FOREVER_REL,
793                                                 GNUNET_NO,
794                                                 &transmit_message, h);
795 }
796
797
798 /**
799  * Handler for notification messages received from the core.
800  *
801  * @param cls our "struct GNUNET_CORE_Handle"
802  * @param msg the message received from the core service
803  */
804 static void
805 main_notify_handler (void *cls, 
806                      const struct GNUNET_MessageHeader *msg)
807 {
808   struct GNUNET_CORE_Handle *h = cls;
809   const struct InitReplyMessage *m;
810   const struct ConnectNotifyMessage *cnm;
811   const struct DisconnectNotifyMessage *dnm;
812   const struct NotifyTrafficMessage *ntm;
813   const struct GNUNET_MessageHeader *em;
814   const struct ConfigurationInfoMessage *cim;
815   const struct PeerStatusNotifyMessage *psnm;
816   const struct SendMessageReady *smr;
817   const struct GNUNET_CORE_MessageHandler *mh;
818   GNUNET_CORE_StartupCallback init;
819   GNUNET_CORE_PeerConfigurationInfoCallback pcic;
820   struct PeerRecord *pr;
821   struct GNUNET_CORE_TransmitHandle *th;
822   unsigned int hpos;
823   int trigger;
824   uint16_t msize;
825   uint16_t et;
826
827   if (msg == NULL)
828     {
829       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
830                   _
831                   ("Client was disconnected from core service, trying to reconnect.\n"));
832       reconnect_later (h);
833       return;
834     }
835   msize = ntohs (msg->size);
836 #if DEBUG_CORE
837   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
838               "Processing message of type %u and size %u from core service\n",
839               ntohs (msg->type), msize);
840 #endif
841   switch (ntohs (msg->type))
842     {
843     case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY:
844       if (ntohs (msg->size) != sizeof (struct InitReplyMessage))
845         {
846           GNUNET_break (0);
847           reconnect_later (h);
848           return;
849         }
850       m = (const struct InitReplyMessage *) msg;
851       GNUNET_break (0 == ntohl (m->reserved));
852       /* start our message processing loop */
853       if (GNUNET_YES == h->currently_down)
854         {
855           h->currently_down = GNUNET_NO;
856           trigger_next_request (h, GNUNET_NO);
857         }
858       h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
859       if (NULL != (init = h->init))
860         {
861           /* mark so we don't call init on reconnect */
862           h->init = NULL;
863           GNUNET_CRYPTO_hash (&m->publicKey,
864                               sizeof (struct
865                                       GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
866                               &h->me.hashPubKey);
867 #if DEBUG_CORE
868           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
869                       "Connected to core service of peer `%s'.\n",
870                       GNUNET_i2s (&h->me));
871 #endif
872           init (h->cls, h, &h->me, &m->publicKey);
873         }
874       else
875         {
876 #if DEBUG_CORE
877           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
878                       "Successfully reconnected to core service.\n");
879 #endif
880         }
881       break;
882     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT:
883       if (msize != sizeof (struct ConnectNotifyMessage))
884         {
885           GNUNET_break (0);
886           reconnect_later (h);
887           return;
888         }
889       cnm = (const struct ConnectNotifyMessage *) msg;
890 #if DEBUG_CORE
891       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
892                   "Received notification about connection from `%s'.\n",
893                   GNUNET_i2s (&cnm->peer));
894 #endif
895       if (0 == memcmp (&h->me,
896                        &cnm->peer,
897                        sizeof (struct GNUNET_PeerIdentity)))
898         {
899           /* disconnect from self!? */
900           GNUNET_break (0);
901           return;
902         }
903       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
904                                               &cnm->peer.hashPubKey);
905       if (pr != NULL)
906         {
907           GNUNET_break (0);
908           reconnect_later (h);
909           return;
910         }
911       pr = GNUNET_malloc (sizeof (struct PeerRecord));
912       pr->peer = cnm->peer;
913       pr->ch = h;
914       GNUNET_assert (GNUNET_YES ==
915                      GNUNET_CONTAINER_multihashmap_put (h->peers,
916                                                         &cnm->peer.hashPubKey,
917                                                         pr,
918                                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
919       if (NULL != h->connects)
920         h->connects (h->cls,
921                      &cnm->peer,
922                      NULL /* FIXME: atsi! */);
923       break;
924     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT:
925       if (msize != sizeof (struct DisconnectNotifyMessage))
926         {
927           GNUNET_break (0);
928           reconnect_later (h);
929           return;
930         }
931       dnm = (const struct DisconnectNotifyMessage *) msg;
932       if (0 == memcmp (&h->me,
933                        &dnm->peer,
934                        sizeof (struct GNUNET_PeerIdentity)))
935         {
936           /* connection to self!? */
937           GNUNET_break (0);
938           return;
939         }
940 #if DEBUG_CORE
941       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
942                   "Received notification about disconnect from `%s'.\n",
943                   GNUNET_i2s (&dnm->peer));
944 #endif
945       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
946                                               &dnm->peer.hashPubKey);
947       if (pr == NULL)
948         {
949           GNUNET_break (0);
950           reconnect_later (h);
951           return;
952         }
953       trigger = ( (pr->prev != NULL) ||
954                   (pr->next != NULL) ||
955                   (h->ready_peer_head == pr) );
956       disconnect_and_free_peer_entry (h, &dnm->peer.hashPubKey, pr);
957       if (trigger)
958         trigger_next_request (h, GNUNET_NO);
959       break;
960     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE:
961       if (NULL == h->status_events)
962         {
963           GNUNET_break (0);
964           break;
965         }
966       if (msize != sizeof (struct PeerStatusNotifyMessage))
967         {
968           GNUNET_break (0);
969           reconnect_later (h);
970           return;
971         }
972       psnm = (const struct PeerStatusNotifyMessage *) msg;
973       if (0 == memcmp (&h->me,
974                        &psnm->peer,
975                        sizeof (struct GNUNET_PeerIdentity)))
976         {
977           /* self-change!? */
978           GNUNET_break (0);
979           return;
980         }
981 #if DEBUG_CORE
982       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
983                   "Received notification about status change by `%s'.\n",
984                   GNUNET_i2s (&psnm->peer));
985 #endif
986       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
987                                               &psnm->peer.hashPubKey);
988       if (pr == NULL)
989         {
990           GNUNET_break (0);
991           reconnect_later (h);
992           return;
993         }
994       h->status_events (h->cls,
995                         &psnm->peer,
996                         psnm->bandwidth_in,
997                         psnm->bandwidth_out,
998                         GNUNET_TIME_absolute_ntoh (psnm->timeout),
999                         NULL /* FIXME: atsi */);
1000       break;
1001     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND:
1002       if (msize <
1003           sizeof (struct NotifyTrafficMessage) +
1004           sizeof (struct GNUNET_MessageHeader))
1005         {
1006           GNUNET_break (0);
1007           reconnect_later (h);
1008           return;
1009         }
1010       ntm = (const struct NotifyTrafficMessage *) msg;
1011       if (0 == memcmp (&h->me,
1012                        &ntm->peer,
1013                        sizeof (struct GNUNET_PeerIdentity)))
1014         {
1015           /* self-change!? */
1016           GNUNET_break (0);
1017           return;
1018         }
1019       em = (const struct GNUNET_MessageHeader *) &ntm[1];
1020 #if DEBUG_CORE
1021       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1022                   "Received message of type %u and size %u from peer `%4s'\n",
1023                   ntohs (em->type), 
1024                   ntohs (em->size),
1025                   GNUNET_i2s (&ntm->peer));
1026 #endif
1027       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1028                                               &ntm->peer.hashPubKey);
1029       if (pr == NULL)
1030         {
1031           GNUNET_break (0);
1032           reconnect_later (h);
1033           return;
1034         }
1035       if ((GNUNET_NO == h->inbound_hdr_only) &&
1036           (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage)))
1037         {
1038           GNUNET_break (0);
1039           reconnect_later (h);
1040           return;
1041         }
1042       et = ntohs (em->type);
1043       for (hpos = 0; hpos < h->hcnt; hpos++)
1044         {
1045           mh = &h->handlers[hpos];
1046           if (mh->type != et)
1047             continue;
1048           if ((mh->expected_size != ntohs (em->size)) &&
1049               (mh->expected_size != 0))
1050             {
1051               GNUNET_break (0);
1052               continue;
1053             }
1054           if (GNUNET_OK !=
1055               h->handlers[hpos].callback (h->cls, &ntm->peer, em,
1056                                           NULL /* FIXME: atsi */))
1057             {
1058               /* error in processing, do not process other messages! */
1059               break;
1060             }
1061         }
1062       if (NULL != h->inbound_notify)
1063         h->inbound_notify (h->cls, &ntm->peer, em,
1064                            NULL /* FIXME: atsi */);
1065       break;
1066     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND:
1067       if (msize <
1068           sizeof (struct NotifyTrafficMessage) +
1069           sizeof (struct GNUNET_MessageHeader))
1070         {
1071           GNUNET_break (0);
1072           reconnect_later (h);
1073           return;
1074         }
1075       ntm = (const struct NotifyTrafficMessage *) msg;
1076       if (0 == memcmp (&h->me,
1077                        &ntm->peer,
1078                        sizeof (struct GNUNET_PeerIdentity)))
1079         {
1080           /* self-change!? */
1081           GNUNET_break (0);
1082           return;
1083         }
1084       em = (const struct GNUNET_MessageHeader *) &ntm[1];
1085       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1086                                               &ntm->peer.hashPubKey);
1087       if (pr == NULL)
1088         {
1089           GNUNET_break (0);
1090           reconnect_later (h);
1091           return;
1092         }
1093 #if DEBUG_CORE
1094       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095                   "Received notification about transmission to `%s'.\n",
1096                   GNUNET_i2s (&ntm->peer));
1097 #endif
1098       if ((GNUNET_NO == h->outbound_hdr_only) &&
1099           (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage)))
1100         {
1101           GNUNET_break (0);
1102           reconnect_later (h);
1103           return;
1104         }
1105       if (NULL == h->outbound_notify)
1106         {
1107           GNUNET_break (0);
1108           break;
1109         }
1110       h->outbound_notify (h->cls, &ntm->peer, em,
1111                           NULL /* FIXME: atsi? */);
1112       break;
1113     case GNUNET_MESSAGE_TYPE_CORE_SEND_READY:
1114       if (msize != sizeof (struct SendMessageReady))
1115         {
1116           GNUNET_break (0);
1117           reconnect_later (h);
1118           return;
1119         }
1120       smr = (const struct SendMessageReady *) msg;
1121       if (0 == memcmp (&h->me,
1122                        &smr->peer,
1123                        sizeof (struct GNUNET_PeerIdentity)))
1124         {
1125           /* self-change!? */
1126           GNUNET_break (0);
1127           return;
1128         }
1129       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1130                                               &smr->peer.hashPubKey);
1131       if (pr == NULL)
1132         {
1133           GNUNET_break (0);
1134           reconnect_later (h);
1135           return;
1136         }
1137 #if DEBUG_CORE
1138       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1139                   "Received notification about transmission readiness to `%s'.\n",
1140                   GNUNET_i2s (&smr->peer));
1141 #endif
1142       th = pr->pending_head;
1143       if (ntohs (smr->smr_id) != th->smr_id)
1144         {
1145           /* READY message is for expired or cancelled message,
1146              ignore! (we should have already sent another request) */
1147           break;
1148         }
1149       if ( (pr->prev != NULL) ||
1150            (pr->next != NULL) ||
1151            (h->ready_peer_head == pr) )
1152         {
1153           /* we should not already be on the ready list... */
1154           GNUNET_break (0);
1155           reconnect_later (h);
1156           return;
1157         }
1158       GNUNET_CONTAINER_DLL_insert (h->ready_peer_head,
1159                                    h->ready_peer_tail,
1160                                    pr);
1161       trigger_next_request (h, GNUNET_NO);
1162       break;
1163     case GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO:
1164       if (ntohs (msg->size) != sizeof (struct ConfigurationInfoMessage))
1165         {
1166           GNUNET_break (0);
1167           reconnect_later (h);
1168           return;
1169         }
1170       cim = (const struct ConfigurationInfoMessage*) msg;
1171       if (0 == memcmp (&h->me,
1172                        &cim->peer,
1173                        sizeof (struct GNUNET_PeerIdentity)))
1174         {
1175           /* self-change!? */
1176           GNUNET_break (0);
1177           return;
1178         }
1179 #if DEBUG_CORE
1180       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1181                   "Received notification about configuration update for `%s'.\n",
1182                   GNUNET_i2s (&cim->peer));
1183 #endif
1184       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1185                                               &cim->peer.hashPubKey);
1186       if (pr == NULL)
1187         {
1188           GNUNET_break (0);
1189           reconnect_later (h);
1190           return;
1191         }
1192       if (pr->rim_id != ntohl (cim->rim_id))
1193         break;
1194       pcic = pr->pcic;
1195       pr->pcic = NULL;
1196       if (pcic != NULL)
1197         pcic (pr->pcic_cls,
1198               &pr->peer,
1199               cim->bw_out,
1200               ntohl (cim->reserved_amount),
1201               GNUNET_ntohll (cim->preference));
1202       break;
1203     default:
1204       reconnect_later (h);
1205       return;
1206     }
1207   GNUNET_CLIENT_receive (h->client,
1208                          &main_notify_handler, h, 
1209                          GNUNET_TIME_UNIT_FOREVER_REL);
1210 }
1211
1212
1213 /**
1214  * Task executed once we are done transmitting the INIT message.
1215  * Starts our 'receive' loop.
1216  *
1217  * @param cls the 'struct GNUNET_CORE_Handle'
1218  * @param tc task context
1219  */
1220 static void
1221 init_done_task (void *cls, 
1222                 const struct GNUNET_SCHEDULER_TaskContext *tc)
1223 {
1224   struct GNUNET_CORE_Handle *h = cls;
1225
1226   if (tc == NULL)
1227     return; /* error */
1228   if (0 == (tc->reason & GNUNET_SCHEDULER_REASON_PREREQ_DONE))
1229     {
1230 #if DEBUG_CORE
1231       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1232                   "Failed to exchange INIT with core, retrying\n");
1233 #endif
1234       reconnect_later (h);
1235       return;
1236     }
1237   GNUNET_CLIENT_receive (h->client,
1238                          &main_notify_handler, 
1239                          h, 
1240                          GNUNET_TIME_UNIT_FOREVER_REL);
1241 }
1242
1243
1244 /**
1245  * Our current client connection went down.  Clean it up
1246  * and try to reconnect!
1247  *
1248  * @param h our handle to the core service
1249  */
1250 static void
1251 reconnect (struct GNUNET_CORE_Handle *h)
1252 {
1253   struct ControlMessage *cm;
1254   struct InitMessage *init;
1255   uint32_t opt;
1256   uint16_t msize;
1257   uint16_t *ts;
1258   unsigned int hpos;
1259
1260 #if DEBUG_CORE
1261   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1262               "Reconnecting to CORE service\n");
1263 #endif
1264   GNUNET_assert (h->client == NULL);
1265   GNUNET_assert (h->currently_down == GNUNET_YES);
1266   h->client = GNUNET_CLIENT_connect ("core", h->cfg);
1267   if (h->client == NULL)
1268     {
1269       reconnect_later (h);
1270       return;
1271     }
1272   msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage);
1273   cm = GNUNET_malloc (sizeof (struct ControlMessage) +
1274                       msize);
1275   cm->cont = &init_done_task;
1276   cm->cont_cls = h;
1277   init = (struct InitMessage*) &cm[1];
1278   init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT);
1279   init->header.size = htons (msize);
1280   opt = GNUNET_CORE_OPTION_SEND_CONNECT | GNUNET_CORE_OPTION_SEND_DISCONNECT;
1281   if (h->status_events != NULL)
1282     opt |= GNUNET_CORE_OPTION_SEND_STATUS_CHANGE;
1283   if (h->inbound_notify != NULL)
1284     {
1285       if (h->inbound_hdr_only)
1286         opt |= GNUNET_CORE_OPTION_SEND_HDR_INBOUND;
1287       else
1288         opt |= GNUNET_CORE_OPTION_SEND_FULL_INBOUND;
1289     }
1290   if (h->outbound_notify != NULL)
1291     {
1292       if (h->outbound_hdr_only)
1293         opt |= GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND;
1294       else
1295         opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND;
1296     }
1297   init->options = htonl (opt);
1298   ts = (uint16_t *) &init[1];
1299   for (hpos = 0; hpos < h->hcnt; hpos++)
1300     ts[hpos] = htons (h->handlers[hpos].type);
1301   GNUNET_CONTAINER_DLL_insert (h->pending_head,
1302                                h->pending_tail,
1303                                cm);
1304   trigger_next_request (h, GNUNET_YES);
1305 }
1306
1307
1308
1309 /**
1310  * Connect to the core service.  Note that the connection may
1311  * complete (or fail) asynchronously.
1312  *
1313  * @param cfg configuration to use
1314  * @param queue_size size of the per-peer message queue
1315  * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
1316  * @param init callback to call on timeout or once we have successfully
1317  *        connected to the core service; note that timeout is only meaningful if init is not NULL
1318  * @param connects function to call on peer connect, can be NULL
1319  * @param disconnects function to call on peer disconnect / timeout, can be NULL
1320  * @param status_events function to call on changes to peer connection status, can be NULL
1321  * @param inbound_notify function to call for all inbound messages, can be NULL
1322  * @param inbound_hdr_only set to GNUNET_YES if inbound_notify will only read the
1323  *                GNUNET_MessageHeader and hence we do not need to give it the full message;
1324  *                can be used to improve efficiency, ignored if inbound_notify is NULLL
1325  * @param outbound_notify function to call for all outbound messages, can be NULL
1326  * @param outbound_hdr_only set to GNUNET_YES if outbound_notify will only read the
1327  *                GNUNET_MessageHeader and hence we do not need to give it the full message
1328  *                can be used to improve efficiency, ignored if outbound_notify is NULLL
1329  * @param handlers callbacks for messages we care about, NULL-terminated
1330  * @return handle to the core service (only useful for disconnect until 'init' is called);
1331  *                NULL on error (in this case, init is never called)
1332  */
1333 struct GNUNET_CORE_Handle *
1334 GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1335                      unsigned int queue_size,
1336                      void *cls,
1337                      GNUNET_CORE_StartupCallback init,
1338                      GNUNET_CORE_ConnectEventHandler connects,
1339                      GNUNET_CORE_DisconnectEventHandler disconnects,
1340                      GNUNET_CORE_PeerStatusEventHandler status_events,
1341                      GNUNET_CORE_MessageCallback inbound_notify,
1342                      int inbound_hdr_only,
1343                      GNUNET_CORE_MessageCallback outbound_notify,
1344                      int outbound_hdr_only,
1345                      const struct GNUNET_CORE_MessageHandler *handlers)
1346 {
1347   struct GNUNET_CORE_Handle *h;
1348
1349   h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle));
1350   h->cfg = cfg;
1351   h->queue_size = queue_size;
1352   h->cls = cls;
1353   h->init = init;
1354   h->connects = connects;
1355   h->disconnects = disconnects;
1356   h->status_events = status_events;
1357   h->inbound_notify = inbound_notify;
1358   h->outbound_notify = outbound_notify;
1359   h->inbound_hdr_only = inbound_hdr_only;
1360   h->outbound_hdr_only = outbound_hdr_only;
1361   h->handlers = handlers;
1362   h->hcnt = 0;
1363   h->currently_down = GNUNET_YES;
1364   h->peers = GNUNET_CONTAINER_multihashmap_create (128);
1365   h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1366   while (handlers[h->hcnt].callback != NULL)
1367     h->hcnt++;
1368   GNUNET_assert (h->hcnt <
1369                  (GNUNET_SERVER_MAX_MESSAGE_SIZE -
1370                   sizeof (struct InitMessage)) / sizeof (uint16_t));
1371 #if DEBUG_CORE
1372   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1373               "Connecting to CORE service\n");
1374 #endif
1375   reconnect (h);
1376   return h;
1377 }
1378
1379
1380 /**
1381  * Disconnect from the core service.  This function can only 
1382  * be called *after* all pending 'GNUNET_CORE_notify_transmit_ready'
1383  * requests have been explicitly cancelled.
1384  *
1385  * @param handle connection to core to disconnect
1386  */
1387 void
1388 GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
1389 {
1390   struct ControlMessage *cm;
1391   
1392 #if DEBUG_CORE
1393   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1394               "Disconnecting from CORE service\n");
1395 #endif
1396   if (handle->cth != NULL)
1397     {
1398       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth);
1399       handle->cth = NULL;
1400     }
1401   if (handle->client != NULL)
1402     {
1403       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
1404       handle->client = NULL;
1405     }
1406   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1407     {
1408       GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1409       handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1410     }
1411   while (NULL != (cm = handle->pending_head))
1412     {
1413       GNUNET_CONTAINER_DLL_remove (handle->pending_head,
1414                                    handle->pending_tail,
1415                                    cm);
1416       if (cm->th != NULL)
1417         cm->th->cm = NULL;
1418       cm->cont (cm->cont_cls, NULL);
1419       GNUNET_free (cm);
1420     }
1421   GNUNET_CONTAINER_multihashmap_iterate (handle->peers,
1422                                          &disconnect_and_free_peer_entry,
1423                                          handle);
1424   GNUNET_CONTAINER_multihashmap_destroy (handle->peers);
1425   GNUNET_break (handle->ready_peer_head == NULL);
1426   GNUNET_free (handle);
1427 }
1428
1429
1430 /**
1431  * Ask the core to call "notify" once it is ready to transmit the
1432  * given number of bytes to the specified "target".  If we are not yet
1433  * connected to the specified peer, a call to this function will cause
1434  * us to try to establish a connection.
1435  *
1436  * @param handle connection to core service
1437  * @param priority how important is the message?
1438  * @param maxdelay how long can the message wait?
1439  * @param target who should receive the message,
1440  *        use NULL for this peer (loopback)
1441  * @param notify_size how many bytes of buffer space does notify want?
1442  * @param notify function to call when buffer space is available
1443  * @param notify_cls closure for notify
1444  * @return non-NULL if the notify callback was queued,
1445  *         NULL if we can not even queue the request (insufficient
1446  *         memory); if NULL is returned, "notify" will NOT be called.
1447  */
1448 struct GNUNET_CORE_TransmitHandle *
1449 GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
1450                                    uint32_t priority,
1451                                    struct GNUNET_TIME_Relative maxdelay,
1452                                    const struct GNUNET_PeerIdentity *target,
1453                                    size_t notify_size,
1454                                    GNUNET_CONNECTION_TransmitReadyNotify notify,
1455                                    void *notify_cls)
1456 {
1457   struct PeerRecord *pr;
1458   struct GNUNET_CORE_TransmitHandle *th;
1459   struct GNUNET_CORE_TransmitHandle *pos;
1460   struct GNUNET_CORE_TransmitHandle *prev;
1461   struct GNUNET_CORE_TransmitHandle *minp;
1462
1463   pr = GNUNET_CONTAINER_multihashmap_get (handle->peers,
1464                                           &target->hashPubKey);
1465   if (NULL == pr)
1466     {
1467       /* attempt to send to peer that is not connected */
1468       GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
1469                  "Attempting to send to peer `%s' from peer `%s', but not connected!\n",
1470                  GNUNET_i2s(target), GNUNET_h2s(&handle->me.hashPubKey));
1471       GNUNET_break (0);
1472       return NULL;
1473     }
1474   GNUNET_assert (notify_size + sizeof (struct SendMessage) <
1475                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1476   th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle));
1477   th->peer = pr;
1478   th->get_message = notify;
1479   th->get_message_cls = notify_cls;
1480   th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
1481   th->priority = priority;
1482   th->msize = notify_size;
1483   /* bound queue size */
1484   if (pr->queue_size == handle->queue_size)
1485     {
1486       /* find lowest-priority entry */
1487       minp = pr->pending_head;
1488       prev = minp->next;
1489       while (prev != NULL)
1490         {
1491           if (prev->priority < minp->priority)
1492             minp = prev;
1493           prev = prev->next;
1494         }
1495       if (minp == NULL) 
1496         {
1497           GNUNET_break (handle->queue_size != 0);
1498           GNUNET_break (pr->queue_size == 0);
1499           return NULL;
1500         }
1501       if (priority <= minp->priority)
1502         {
1503 #if DEBUG_CORE
1504           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1505                       "Dropping transmission request: priority too low\n");
1506 #endif
1507           return NULL; /* priority too low */
1508         }
1509       GNUNET_CONTAINER_DLL_remove (pr->pending_head,
1510                                    pr->pending_tail,
1511                                    minp);
1512       pr->queue_size--;
1513       GNUNET_assert (0 ==
1514                      minp->get_message (minp->get_message_cls,
1515                                         0, NULL));
1516       GNUNET_free (minp);
1517     }
1518
1519   /* Order entries by deadline, but SKIP 'HEAD' if
1520      we're in the 'ready_peer_*' DLL */
1521   pos = pr->pending_head;
1522   if ( (pr->prev != NULL) ||
1523        (pr->next != NULL) ||
1524        (pr == handle->ready_peer_head) )
1525     {
1526       GNUNET_assert (pos != NULL);
1527       pos = pos->next; /* skip head */
1528     }
1529
1530   /* insertion sort */
1531   prev = pos;
1532   while ( (pos != NULL) &&
1533           (pos->timeout.abs_value < th->timeout.abs_value) )      
1534     {
1535       prev = pos;
1536       pos = pos->next;
1537     }
1538   GNUNET_CONTAINER_DLL_insert_after (pr->pending_head,
1539                                      pr->pending_tail,
1540                                      prev,
1541                                      th);
1542   pr->queue_size++;
1543   /* was the request queue previously empty? */
1544 #if DEBUG_CORE
1545   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1546               "Transmission request added to queue\n");
1547 #endif
1548   if (pr->pending_head == th) 
1549     request_next_transmission (pr);
1550   return th;
1551 }
1552
1553
1554 /**
1555  * Cancel the specified transmission-ready notification.
1556  *
1557  * @param th handle that was returned by "notify_transmit_ready".
1558  */
1559 void
1560 GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle
1561                                           *th)
1562 {
1563   struct PeerRecord *pr = th->peer;
1564   struct GNUNET_CORE_Handle *h = pr->ch;
1565   int was_head;
1566   
1567   was_head = (pr->pending_head == th);
1568   GNUNET_CONTAINER_DLL_remove (pr->pending_head,
1569                                pr->pending_tail,
1570                                th);    
1571   pr->queue_size--;
1572   if (th->cm != NULL)
1573     {
1574       /* we're currently in the control queue, remove */
1575       GNUNET_CONTAINER_DLL_remove (h->pending_head,
1576                                    h->pending_tail,
1577                                    th->cm);
1578       GNUNET_free (th->cm);      
1579     }
1580   GNUNET_free (th);
1581   if (was_head)
1582     {
1583       if ( (pr->prev != NULL) ||
1584            (pr->next != NULL) ||
1585            (pr == h->ready_peer_head) )
1586         {
1587           /* the request that was 'approved' by core was
1588              cancelled before it could be transmitted; remove
1589              us from the 'ready' list */
1590           GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
1591                                        h->ready_peer_tail,
1592                                        pr);
1593         }
1594       request_next_transmission (pr);
1595     }
1596 }
1597
1598
1599 /* ****************** GNUNET_CORE_peer_request_connect ******************** */
1600
1601 /**
1602  * Handle for a request to the core to connect to
1603  * a particular peer.  Can be used to cancel the request
1604  * (before the 'cont'inuation is called).
1605  */
1606 struct GNUNET_CORE_PeerRequestHandle
1607 {
1608
1609   /**
1610    * Link to control message.
1611    */
1612   struct ControlMessage *cm;
1613
1614   /**
1615    * Core handle used.
1616    */
1617   struct GNUNET_CORE_Handle *h;
1618
1619   /**
1620    * Continuation to run when done.
1621    */
1622   GNUNET_SCHEDULER_Task cont;
1623
1624   /**
1625    * Closure for 'cont'.
1626    */
1627   void *cont_cls;
1628
1629 };
1630
1631
1632 /**
1633  * Continuation called when the control message was transmitted.
1634  * Calls the original continuation and frees the remaining
1635  * resources.
1636  *
1637  * @param cls the 'struct GNUNET_CORE_PeerRequestHandle'
1638  * @param tc scheduler context
1639  */
1640 static void
1641 peer_request_connect_cont (void *cls,
1642                            const struct GNUNET_SCHEDULER_TaskContext *tc)
1643 {
1644   struct GNUNET_CORE_PeerRequestHandle *ret = cls;
1645   
1646   if (ret->cont != NULL)
1647     {
1648       if (tc == NULL)
1649         GNUNET_SCHEDULER_add_now (ret->cont,
1650                                   ret->cont_cls);
1651       else
1652         ret->cont (ret->cont_cls, tc);
1653     }
1654   GNUNET_free (ret);
1655 }
1656
1657
1658 /**
1659  * Request that the core should try to connect to a particular peer.
1660  * Once the request has been transmitted to the core, the continuation
1661  * function will be called.  Note that this does NOT mean that a
1662  * connection was successfully established -- it only means that the
1663  * core will now try.  Successful establishment of the connection
1664  * will be signalled to the 'connects' callback argument of
1665  * 'GNUNET_CORE_connect' only.  If the core service does not respond
1666  * to our connection attempt within the given time frame, 'cont' will
1667  * be called with the TIMEOUT reason code.
1668  *
1669  * @param h core handle
1670  * @param timeout how long to try to talk to core
1671  * @param peer who should we connect to
1672  * @param cont function to call once the request has been completed (or timed out)
1673  * @param cont_cls closure for cont
1674  * @return NULL on error (cont will not be called), otherwise handle for cancellation
1675  */
1676 struct GNUNET_CORE_PeerRequestHandle *
1677 GNUNET_CORE_peer_request_connect (struct GNUNET_CORE_Handle *h,
1678                                   struct GNUNET_TIME_Relative timeout,
1679                                   const struct GNUNET_PeerIdentity * peer,
1680                                   GNUNET_SCHEDULER_Task cont,
1681                                   void *cont_cls)
1682 {
1683   struct GNUNET_CORE_PeerRequestHandle *ret;
1684   struct ControlMessage *cm;
1685   struct ConnectMessage *msg;
1686   
1687   cm = GNUNET_malloc (sizeof (struct ControlMessage) + 
1688                       sizeof (struct ConnectMessage));
1689   msg = (struct ConnectMessage*) &cm[1];
1690   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT);
1691   msg->header.size = htons (sizeof (struct ConnectMessage));
1692   msg->reserved = htonl (0);
1693   msg->timeout = GNUNET_TIME_relative_hton (timeout);
1694   msg->peer = *peer;
1695   GNUNET_CONTAINER_DLL_insert (h->pending_head,
1696                                h->pending_tail,
1697                                cm);
1698   ret = GNUNET_malloc (sizeof (struct GNUNET_CORE_PeerRequestHandle));
1699   ret->h = h;
1700   ret->cm = cm;
1701   ret->cont = cont;
1702   ret->cont_cls = cont_cls;
1703   cm->cont = &peer_request_connect_cont;
1704   cm->cont_cls = ret;
1705 #if DEBUG_CORE
1706   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1707               "Queueing REQUEST_CONNECT request\n");
1708 #endif
1709   if (h->pending_head == cm)
1710     trigger_next_request (h, GNUNET_NO);
1711   return ret;
1712 }
1713
1714
1715 /**
1716  * Cancel a pending request to connect to a particular peer.  Must not
1717  * be called after the 'cont' function was invoked.
1718  *
1719  * @param req request handle that was returned for the original request
1720  */
1721 void
1722 GNUNET_CORE_peer_request_connect_cancel (struct GNUNET_CORE_PeerRequestHandle *req)
1723 {
1724   struct GNUNET_CORE_Handle *h = req->h;
1725   struct ControlMessage *cm = req->cm;
1726
1727   GNUNET_CONTAINER_DLL_remove (h->pending_head,
1728                                h->pending_tail,
1729                                cm);
1730   GNUNET_free (cm);
1731   GNUNET_free (req);
1732 }
1733
1734
1735 /* ****************** GNUNET_CORE_peer_change_preference ******************** */
1736
1737
1738 struct GNUNET_CORE_InformationRequestContext 
1739 {
1740   
1741   /**
1742    * Our connection to the service.
1743    */
1744   struct GNUNET_CORE_Handle *h;
1745
1746   /**
1747    * Function to call with the information.
1748    */
1749   GNUNET_CORE_PeerConfigurationInfoCallback info;
1750
1751   /**
1752    * Closure for info.
1753    */
1754   void *info_cls;
1755
1756   /**
1757    * Link to control message, NULL if CM was sent.
1758    */ 
1759   struct ControlMessage *cm;
1760
1761   /**
1762    * Link to peer record.
1763    */
1764   struct PeerRecord *pr;
1765 };
1766
1767
1768 /**
1769  * CM was sent, remove link so we don't double-free.
1770  *
1771  * @param cls the 'struct GNUNET_CORE_InformationRequestContext'
1772  * @param tc scheduler context
1773  */
1774 static void
1775 change_preference_send_continuation (void *cls,
1776                                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1777 {
1778   struct GNUNET_CORE_InformationRequestContext *irc = cls;
1779
1780   irc->cm = NULL;
1781 }
1782
1783
1784 /**
1785  * Obtain statistics and/or change preferences for the given peer.
1786  *
1787  * @param h core handle
1788  * @param peer identifies the peer
1789  * @param timeout after how long should we give up (and call "info" with NULL
1790  *                for "peer" to signal an error)?
1791  * @param bw_out set to the current bandwidth limit (sending) for this peer,
1792  *                caller should set "bw_out" to "-1" to avoid changing
1793  *                the current value; otherwise "bw_out" will be lowered to
1794  *                the specified value; passing a pointer to "0" can be used to force
1795  *                us to disconnect from the peer; "bw_out" might not increase
1796  *                as specified since the upper bound is generally
1797  *                determined by the other peer!
1798  * @param amount reserve N bytes for receiving, negative
1799  *                amounts can be used to undo a (recent) reservation;
1800  * @param preference increase incoming traffic share preference by this amount;
1801  *                in the absence of "amount" reservations, we use this
1802  *                preference value to assign proportional bandwidth shares
1803  *                to all connected peers
1804  * @param info function to call with the resulting configuration information
1805  * @param info_cls closure for info
1806  * @return NULL on error
1807  */
1808 struct GNUNET_CORE_InformationRequestContext *
1809 GNUNET_CORE_peer_change_preference (struct GNUNET_CORE_Handle *h,
1810                                     const struct GNUNET_PeerIdentity *peer,
1811                                     struct GNUNET_TIME_Relative timeout,
1812                                     struct GNUNET_BANDWIDTH_Value32NBO bw_out,
1813                                     int32_t amount,
1814                                     uint64_t preference,
1815                                     GNUNET_CORE_PeerConfigurationInfoCallback info,
1816                                     void *info_cls)
1817 {
1818   struct GNUNET_CORE_InformationRequestContext *irc;
1819   struct PeerRecord *pr;
1820   struct RequestInfoMessage *rim;
1821   struct ControlMessage *cm;
1822
1823   pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1824                                           &peer->hashPubKey);
1825   if (NULL == pr)
1826     {
1827       /* attempt to change preference on peer that is not connected */
1828       GNUNET_break (0);
1829       return NULL;
1830     }
1831   if (pr->pcic != NULL)
1832     {
1833       /* second change before first one is done */
1834       GNUNET_break (0);
1835       return NULL;
1836     }
1837   irc = GNUNET_malloc (sizeof (struct GNUNET_CORE_InformationRequestContext));
1838   irc->h = h;
1839   irc->pr = pr;
1840   irc->info = info;
1841   irc->info_cls = info_cls;
1842   cm = GNUNET_malloc (sizeof (struct ControlMessage) +
1843                       sizeof (struct RequestInfoMessage));
1844   cm->cont = &change_preference_send_continuation;
1845   cm->cont_cls = irc;
1846   irc->cm = cm;
1847   rim = (struct RequestInfoMessage*) &cm[1];
1848   rim->header.size = htons (sizeof (struct RequestInfoMessage));
1849   rim->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO);
1850   rim->rim_id = htonl (pr->rim_id = h->rim_id_gen++);
1851   rim->limit_outbound = bw_out;
1852   rim->reserve_inbound = htonl (amount);
1853   rim->preference_change = GNUNET_htonll(preference);
1854   rim->peer = *peer;
1855 #if DEBUG_CORE
1856   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1857               "Queueing CHANGE PREFERENCE request\n");
1858 #endif
1859   GNUNET_CONTAINER_DLL_insert (h->pending_head,
1860                                h->pending_tail,
1861                                cm); 
1862   pr->pcic = info;
1863   pr->pcic_cls = info_cls;
1864   if (h->pending_head == cm)
1865     trigger_next_request (h, GNUNET_NO);
1866   return irc;
1867 }
1868
1869
1870 /**
1871  * Cancel request for getting information about a peer.
1872  * Note that an eventual change in preference, trust or bandwidth
1873  * assignment MAY have already been committed at the time, 
1874  * so cancelling a request is NOT sure to undo the original
1875  * request.  The original request may or may not still commit.
1876  * The only thing cancellation ensures is that the callback
1877  * from the original request will no longer be called.
1878  *
1879  * @param irc context returned by the original GNUNET_CORE_peer_get_info call
1880  */
1881 void
1882 GNUNET_CORE_peer_change_preference_cancel (struct GNUNET_CORE_InformationRequestContext *irc)
1883 {
1884   struct GNUNET_CORE_Handle *h = irc->h;
1885   struct PeerRecord *pr = irc->pr;
1886
1887   if (irc->cm != NULL)
1888     {
1889       GNUNET_CONTAINER_DLL_remove (h->pending_head,
1890                                    h->pending_tail,
1891                                    irc->cm);
1892       GNUNET_free (irc->cm);
1893     }
1894   pr->pcic = NULL;
1895   pr->pcic_cls = NULL;
1896   GNUNET_free (irc);
1897 }
1898
1899
1900 /* end of core_api.c */