more checks
[oweals/gnunet.git] / src / core / core_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/core_api.c
23  * @brief core service; this is the main API for encrypted P2P
24  *        communications
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - implement atsi parsing and passing
29  */
30 #include "platform.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
33 #include "core.h"
34
35
36 /**
37  * Information we track for each peer.
38  */
39 struct PeerRecord
40 {
41
42   /**
43    * We generally do NOT keep peer records in a DLL; this
44    * DLL is only used IF this peer's 'pending_head' message
45    * is ready for transmission.  
46    */
47   struct PeerRecord *prev;
48
49   /**
50    * We generally do NOT keep peer records in a DLL; this
51    * DLL is only used IF this peer's 'pending_head' message
52    * is ready for transmission. 
53    */
54   struct PeerRecord *next;
55
56   /**
57    * Peer the record is about.
58    */
59   struct GNUNET_PeerIdentity peer;
60
61   /**
62    * Corresponding core handle.
63    */
64   struct GNUNET_CORE_Handle *ch;
65
66   /**
67    * Head of doubly-linked list of pending requests.
68    * Requests are sorted by deadline *except* for HEAD,
69    * which is only modified upon transmission to core.
70    */
71   struct GNUNET_CORE_TransmitHandle *pending_head;
72
73   /**
74    * Tail of doubly-linked list of pending requests.
75    */
76   struct GNUNET_CORE_TransmitHandle *pending_tail;
77
78   /**
79    * Pending callback waiting for peer information, or NULL for none.
80    */
81   GNUNET_CORE_PeerConfigurationInfoCallback pcic;
82
83   /**
84    * Closure for pcic.
85    */
86   void *pcic_cls;
87
88   /**
89    * Request information ID for the given pcic (needed in case a
90    * request is cancelled after being submitted to core and a new
91    * one is generated; in this case, we need to avoid matching the
92    * reply to the first (cancelled) request to the second request).
93    */
94   uint32_t rim_id;
95
96   /**
97    * ID of timeout task for the 'pending_head' handle
98    * which is the one with the smallest timeout. 
99    */
100   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
101
102   /**
103    * Current size of the queue of pending requests.
104    */
105   unsigned int queue_size;
106
107   /**
108    * SendMessageRequest ID generator for this peer.
109    */
110   uint16_t smr_id_gen;
111   
112 };
113
114
115 /**
116  * Entry in a doubly-linked list of control messages to be transmitted
117  * to the core service.  Control messages include traffic allocation,
118  * connection requests and of course our initial 'init' request.
119  * 
120  * The actual message is allocated at the end of this struct.
121  */
122 struct ControlMessage
123 {
124   /**
125    * This is a doubly-linked list.
126    */
127   struct ControlMessage *next;
128
129   /**
130    * This is a doubly-linked list.
131    */
132   struct ControlMessage *prev;
133
134   /**
135    * Function to run after successful transmission (or call with
136    * reason 'TIMEOUT' on error); called with scheduler context 'NULL'
137    * on disconnect.
138    */
139   GNUNET_SCHEDULER_Task cont;
140
141   /**
142    * Closure for 'cont'.
143    */
144   void *cont_cls;
145
146 };
147
148
149
150 /**
151  * Context for the core service connection.
152  */
153 struct GNUNET_CORE_Handle
154 {
155
156   /**
157    * Configuration we're using.
158    */
159   const struct GNUNET_CONFIGURATION_Handle *cfg;
160
161   /**
162    * Closure for the various callbacks.
163    */
164   void *cls;
165
166   /**
167    * Function to call once we've handshaked with the core service.
168    */
169   GNUNET_CORE_StartupCallback init;
170
171   /**
172    * Function to call whenever we're notified about a peer connecting.
173    */
174   GNUNET_CORE_ConnectEventHandler connects;
175
176   /**
177    * Function to call whenever we're notified about a peer disconnecting.
178    */
179   GNUNET_CORE_DisconnectEventHandler disconnects;
180
181   /**
182    * Function to call whenever we're notified about a peer changing status.
183    */  
184   GNUNET_CORE_PeerStatusEventHandler status_events;
185   
186   /**
187    * Function to call whenever we receive an inbound message.
188    */
189   GNUNET_CORE_MessageCallback inbound_notify;
190
191   /**
192    * Function to call whenever we receive an outbound message.
193    */
194   GNUNET_CORE_MessageCallback outbound_notify;
195
196   /**
197    * Function handlers for messages of particular type.
198    */
199   const struct GNUNET_CORE_MessageHandler *handlers;
200
201   /**
202    * Our connection to the service.
203    */
204   struct GNUNET_CLIENT_Connection *client;
205
206   /**
207    * Handle for our current transmission request.
208    */
209   struct GNUNET_CLIENT_TransmitHandle *cth;
210
211   /**
212    * Head of doubly-linked list of pending requests.
213    */
214   struct ControlMessage *pending_head;
215
216   /**
217    * Tail of doubly-linked list of pending requests.
218    */
219   struct ControlMessage *pending_tail;
220
221   /**
222    * Head of doubly-linked list of peers that are core-approved
223    * to send their next message.
224    */
225   struct PeerRecord *ready_peer_head;
226
227   /**
228    * Tail of doubly-linked list of peers that are core-approved
229    * to send their next message.
230    */
231   struct PeerRecord *ready_peer_tail;
232
233   /**
234    * Hash map listing all of the peers that we are currently
235    * connected to.
236    */
237   struct GNUNET_CONTAINER_MultiHashMap *peers;
238
239   /**
240    * Identity of this peer.
241    */
242   struct GNUNET_PeerIdentity me;
243
244   /**
245    * ID of reconnect task (if any).
246    */
247   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
248
249   /**
250    * Current delay we use for re-trying to connect to core.
251    */
252   struct GNUNET_TIME_Relative retry_backoff;
253
254   /**
255    * Request information ID generator.
256    */
257   uint32_t rim_id_gen;
258
259   /**
260    * Number of messages we are allowed to queue per target.
261    */
262   unsigned int queue_size;
263
264   /**
265    * Number of entries in the handlers array.
266    */
267   unsigned int hcnt;
268
269   /**
270    * For inbound notifications without a specific handler, do
271    * we expect to only receive headers?
272    */
273   int inbound_hdr_only;
274
275   /**
276    * For outbound notifications without a specific handler, do
277    * we expect to only receive headers?
278    */
279   int outbound_hdr_only;
280
281   /**
282    * Are we currently disconnected and hence unable to forward
283    * requests?
284    */
285   int currently_down;
286
287 };
288
289
290 /**
291  * Handle for a transmission request.
292  */
293 struct GNUNET_CORE_TransmitHandle
294 {
295
296   /**
297    * We keep active transmit handles in a doubly-linked list.
298    */
299   struct GNUNET_CORE_TransmitHandle *next;
300
301   /**
302    * We keep active transmit handles in a doubly-linked list.
303    */
304   struct GNUNET_CORE_TransmitHandle *prev;
305
306   /**
307    * Corresponding peer record.
308    */
309   struct PeerRecord *peer;
310
311   /**
312    * Corresponding SEND_REQUEST message.  Only non-NULL 
313    * while SEND_REQUEST message is pending.
314    */
315   struct ControlMessage *cm;
316
317   /**
318    * Function that will be called to get the actual request
319    * (once we are ready to transmit this request to the core).
320    * The function will be called with a NULL buffer to signal
321    * timeout.
322    */
323   GNUNET_CONNECTION_TransmitReadyNotify get_message;
324
325   /**
326    * Closure for get_message.
327    */
328   void *get_message_cls;
329
330   /**
331    * Timeout for this handle.
332    */
333   struct GNUNET_TIME_Absolute timeout;
334
335   /**
336    * How important is this message?
337    */
338   uint32_t priority;
339
340   /**
341    * Size of this request.
342    */
343   uint16_t msize;
344
345   /**
346    * Send message request ID for this request.
347    */
348   uint16_t smr_id;
349
350 };
351
352
353 /**
354  * Our current client connection went down.  Clean it up
355  * and try to reconnect!
356  *
357  * @param h our handle to the core service
358  */
359 static void
360 reconnect (struct GNUNET_CORE_Handle *h);
361
362
363 /**
364  * Task schedule to try to re-connect to core.
365  *
366  * @param cls the 'struct GNUNET_CORE_Handle'
367  * @param tc task context
368  */
369 static void
370 reconnect_task (void *cls, 
371                 const struct GNUNET_SCHEDULER_TaskContext *tc)
372 {
373   struct GNUNET_CORE_Handle *h = cls;
374
375   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
376 #if DEBUG_CORE
377   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
378               "Connecting to CORE service after delay\n");
379 #endif
380   reconnect (h);
381 }
382
383
384 /**
385  * Notify clients about disconnect and free 
386  * the entry for connected peer.
387  *
388  * @param cls the 'struct GNUNET_CORE_Handle*'
389  * @param key the peer identity (not used)
390  * @param value the 'struct PeerRecord' to free.
391  * @return GNUNET_YES (continue)
392  */
393 static int
394 disconnect_and_free_peer_entry (void *cls,
395                                 const GNUNET_HashCode *key,
396                                 void *value)
397 {
398   static struct GNUNET_BANDWIDTH_Value32NBO zero;
399   struct GNUNET_CORE_Handle *h = cls;
400   struct GNUNET_CORE_TransmitHandle *th;
401   struct PeerRecord *pr = value;
402   GNUNET_CORE_PeerConfigurationInfoCallback pcic;
403
404   while (NULL != (th = pr->pending_head))
405     {
406       GNUNET_CONTAINER_DLL_remove (pr->pending_head,
407                                    pr->pending_tail,
408                                    th);
409       pr->queue_size--;
410       GNUNET_assert (0 == 
411                      th->get_message (th->get_message_cls,
412                                       0, NULL));
413       GNUNET_free (th);
414     }
415   if (NULL != (pcic = pr->pcic))
416     {
417       pr->pcic = NULL;
418       pcic (pr->pcic_cls,
419             &pr->peer,
420             zero,
421             0, 0);
422     }
423   if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
424     {
425       GNUNET_SCHEDULER_cancel (pr->timeout_task);
426       pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
427     }
428   GNUNET_assert (pr->queue_size == 0);
429   if ( (pr->prev != NULL) ||
430        (pr->next != NULL) ||
431        (h->ready_peer_head == pr) )
432     GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
433                                  h->ready_peer_tail,
434                                  pr);
435   if (h->disconnects != NULL)
436     h->disconnects (h->cls,
437                     &pr->peer);    
438   GNUNET_assert (GNUNET_YES ==
439                  GNUNET_CONTAINER_multihashmap_remove (h->peers,
440                                                        key,
441                                                        pr));
442   GNUNET_assert (pr->pending_head == NULL);
443   GNUNET_assert (pr->pending_tail == NULL);
444   GNUNET_assert (pr->ch = h);
445   GNUNET_assert (pr->queue_size == 0);
446   GNUNET_assert (pr->timeout_task == GNUNET_SCHEDULER_NO_TASK);
447   GNUNET_free (pr);  
448   return GNUNET_YES;
449 }
450
451
452 /**
453  * Close down any existing connection to the CORE service and
454  * try re-establishing it later.
455  *
456  * @param h our handle
457  */
458 static void
459 reconnect_later (struct GNUNET_CORE_Handle *h)
460 {
461   struct ControlMessage *cm;
462
463   if (h->client != NULL)
464     {
465       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
466       h->client = NULL;
467       GNUNET_CONTAINER_multihashmap_iterate (h->peers,
468                                              &disconnect_and_free_peer_entry,
469                                              h);
470     }
471   h->currently_down = GNUNET_YES;
472   while (NULL != (cm = h->pending_head))
473     {
474       GNUNET_CONTAINER_DLL_remove (h->pending_head,
475                                    h->pending_tail,
476                                    cm);
477       cm->cont (cm->cont_cls, NULL);
478       GNUNET_free (cm);
479     }
480   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
481   h->retry_backoff = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
482                                                h->retry_backoff);
483   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_backoff,
484                                                     &reconnect_task,
485                                                     h);
486   h->retry_backoff = GNUNET_TIME_relative_multiply (h->retry_backoff, 2);
487 }
488
489
490 /**
491  * Check the list of pending requests, send the next
492  * one to the core.
493  *
494  * @param h core handle
495  * @param ignore_currently_down transmit message even if not initialized?
496  */
497 static void
498 trigger_next_request (struct GNUNET_CORE_Handle *h,
499                       int ignore_currently_down);
500
501
502 /**
503  * The given request hit its timeout.  Remove from the
504  * doubly-linked list and call the respective continuation.
505  *
506  * @param cls the transmit handle of the request that timed out
507  * @param tc context, can be NULL (!)
508  */
509 static void
510 transmission_timeout (void *cls, 
511                       const struct GNUNET_SCHEDULER_TaskContext *tc);
512
513
514 /**
515  * Control message was sent, mark it as such.
516  *
517  * @param cls the 'struct GNUNET_CORE_TransmitHandle*'
518  * @param tc scheduler context
519  */
520 static void
521 mark_control_message_sent (void *cls,
522                            const struct GNUNET_SCHEDULER_TaskContext *tc)
523 {
524   struct GNUNET_CORE_TransmitHandle *th = cls;
525
526   th->cm = NULL;
527 }
528
529
530 /**
531  * Send a control message to the peer asking for transmission
532  * of the message in the given peer record.
533  *
534  * @param pr peer to request transmission to
535  */
536 static void
537 request_next_transmission (struct PeerRecord *pr)
538 {
539   struct GNUNET_CORE_Handle *h = pr->ch;
540   struct ControlMessage *cm;
541   struct SendMessageRequest *smr;
542   struct GNUNET_CORE_TransmitHandle *th;
543
544   if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
545     {
546       GNUNET_SCHEDULER_cancel (pr->timeout_task);
547       pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
548     }
549   if (NULL == (th = pr->pending_head))
550     {
551       trigger_next_request (h, GNUNET_NO);
552       return;
553     }
554   GNUNET_assert (pr->prev == NULL);
555   GNUNET_assert (pr->next == NULL);
556   pr->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (th->timeout),
557                                                    &transmission_timeout,
558                                                    pr);
559   cm = GNUNET_malloc (sizeof (struct ControlMessage) + 
560                       sizeof (struct SendMessageRequest));
561   cm->cont = &mark_control_message_sent;
562   cm->cont_cls = th;
563   th->cm = cm;
564   smr = (struct SendMessageRequest*) &cm[1];
565   smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
566   smr->header.size = htons (sizeof (struct SendMessageRequest));
567   smr->priority = htonl (th->priority);
568   smr->deadline = GNUNET_TIME_absolute_hton (th->timeout);
569   smr->peer = pr->peer;
570   smr->queue_size = htonl (pr->queue_size);
571   smr->size = htons (th->msize);
572   smr->smr_id = htons (th->smr_id = pr->smr_id_gen++);
573   GNUNET_CONTAINER_DLL_insert_after (h->pending_head,
574                                      h->pending_tail,
575                                      h->pending_tail,
576                                      cm);
577   trigger_next_request (h, GNUNET_NO);
578 }
579
580
581 /**
582  * The given request hit its timeout.  Remove from the
583  * doubly-linked list and call the respective continuation.
584  *
585  * @param cls the transmit handle of the request that timed out
586  * @param tc context, can be NULL (!)
587  */
588 static void
589 transmission_timeout (void *cls, 
590                       const struct GNUNET_SCHEDULER_TaskContext *tc)
591 {
592   struct PeerRecord *pr = cls;
593   struct GNUNET_CORE_TransmitHandle *th;
594
595   pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
596   th = pr->pending_head;
597   GNUNET_CONTAINER_DLL_remove (pr->pending_head,
598                                pr->pending_tail,
599                                th);
600   pr->queue_size--;
601 #if DEBUG_CORE
602   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
603               "Signalling timeout of request for transmission to CORE service\n");
604 #endif
605   GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL));
606   request_next_transmission (pr);
607 }
608
609
610 /**
611  * Transmit the next message to the core service.
612  */
613 static size_t
614 transmit_message (void *cls,
615                   size_t size, 
616                   void *buf)
617 {
618   struct GNUNET_CORE_Handle *h = cls;
619   struct ControlMessage *cm;
620   struct GNUNET_CORE_TransmitHandle *th;
621   struct PeerRecord *pr;
622   struct SendMessage *sm;
623   const struct GNUNET_MessageHeader *hdr;
624   uint16_t msize;
625   size_t ret;
626
627   h->cth = NULL;
628   if (buf == NULL)
629     {
630 #if DEBUG_CORE
631       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
632                   "Transmission failed, initiating reconnect\n");
633 #endif
634       reconnect_later (h);
635       return 0;
636     }
637   /* first check for control messages */
638   if (NULL != (cm = h->pending_head))
639     {
640       hdr = (const struct GNUNET_MessageHeader*) &cm[1];
641       msize = ntohs (hdr->size);
642       if (size < msize)
643         {
644           trigger_next_request (h, GNUNET_NO);
645           return 0;
646         }
647 #if DEBUG_CORE
648       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
649                   "Transmitting control message with %u bytes of type %u to core.\n",
650                   (unsigned int) msize,
651                   (unsigned int) ntohs (hdr->type));
652 #endif
653       memcpy (buf, hdr, msize);
654       GNUNET_CONTAINER_DLL_remove (h->pending_head,
655                                    h->pending_tail,
656                                    cm);
657       if (NULL != cm->cont)
658         GNUNET_SCHEDULER_add_continuation (cm->cont, 
659                                            cm->cont_cls,
660                                            GNUNET_SCHEDULER_REASON_PREREQ_DONE);
661       GNUNET_free (cm);
662       trigger_next_request (h, GNUNET_NO);
663       return msize;
664     }
665   /* now check for 'ready' P2P messages */
666   if (NULL != (pr = h->ready_peer_head))
667     {
668       th = pr->pending_head;
669       if (size < th->msize + sizeof (struct SendMessage))
670         {
671           trigger_next_request (h, GNUNET_NO);
672           return 0;
673         }
674       GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
675                                    h->ready_peer_tail,
676                                    pr);
677       GNUNET_CONTAINER_DLL_remove (pr->pending_head,
678                                    pr->pending_tail,
679                                    th);
680       pr->queue_size--;
681       if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
682         {
683           GNUNET_SCHEDULER_cancel (pr->timeout_task);
684           pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
685         }
686 #if DEBUG_CORE
687       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688                   "Transmitting SEND request to `%s' with %u bytes.\n",
689                   GNUNET_i2s (&pr->peer),
690                   (unsigned int) th->msize);
691 #endif
692       sm = (struct SendMessage *) buf;
693       sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
694       sm->priority = htonl (th->priority);
695       sm->deadline = GNUNET_TIME_absolute_hton (th->timeout);
696       sm->peer = pr->peer;
697       ret = th->get_message (th->get_message_cls,
698                              size - sizeof (struct SendMessage),
699                              &sm[1]);
700
701       if (0 == ret)
702         {
703 #if DEBUG_CORE
704           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
705                       "Size of clients message to peer %s is 0!\n",
706                       GNUNET_i2s(&pr->peer));
707 #endif
708           /* client decided to send nothing! */
709           request_next_transmission (pr);
710           return 0;       
711         }
712 #if DEBUG_CORE
713       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
714                   "Produced SEND message to core with %u bytes payload\n",
715                   (unsigned int) ret);
716 #endif
717       GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
718       if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
719         {
720           GNUNET_break (0);
721           request_next_transmission (pr);
722           return 0;
723         }
724       ret += sizeof (struct SendMessage);
725       sm->header.size = htons (ret);
726       GNUNET_assert (ret <= size);
727       GNUNET_free (th);
728       request_next_transmission (pr);
729       return ret;
730     }
731   return 0;
732 }
733
734
735 /**
736  * Check the list of pending requests, send the next
737  * one to the core.
738  *
739  * @param h core handle
740  * @param ignore_currently_down transmit message even if not initialized?
741  */
742 static void
743 trigger_next_request (struct GNUNET_CORE_Handle *h,
744                       int ignore_currently_down)
745 {
746   uint16_t msize;
747
748   if ( (GNUNET_YES == h->currently_down) &&
749        (ignore_currently_down == GNUNET_NO) )
750     return;
751   if (NULL != h->cth)
752     return;
753   if (h->pending_head != NULL)
754     msize = ntohs (((struct GNUNET_MessageHeader*) &h->pending_head[1])->size);    
755   else if (h->ready_peer_head != NULL)
756     msize = h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage);    
757   else
758     return; /* no pending message */
759   h->cth = GNUNET_CLIENT_notify_transmit_ready (h->client,
760                                                 msize,
761                                                 GNUNET_TIME_UNIT_FOREVER_REL,
762                                                 GNUNET_NO,
763                                                 &transmit_message, h);
764 }
765
766
767 /**
768  * Handler for notification messages received from the core.
769  *
770  * @param cls our "struct GNUNET_CORE_Handle"
771  * @param msg the message received from the core service
772  */
773 static void
774 main_notify_handler (void *cls, 
775                      const struct GNUNET_MessageHeader *msg)
776 {
777   struct GNUNET_CORE_Handle *h = cls;
778   const struct InitReplyMessage *m;
779   const struct ConnectNotifyMessage *cnm;
780   const struct DisconnectNotifyMessage *dnm;
781   const struct NotifyTrafficMessage *ntm;
782   const struct GNUNET_MessageHeader *em;
783   const struct ConfigurationInfoMessage *cim;
784   const struct PeerStatusNotifyMessage *psnm;
785   const struct SendMessageReady *smr;
786   const struct GNUNET_CORE_MessageHandler *mh;
787   GNUNET_CORE_StartupCallback init;
788   GNUNET_CORE_PeerConfigurationInfoCallback pcic;
789   struct PeerRecord *pr;
790   struct GNUNET_CORE_TransmitHandle *th;
791   unsigned int hpos;
792   int trigger;
793   uint16_t msize;
794   uint16_t et;
795
796   if (msg == NULL)
797     {
798       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
799                   _
800                   ("Client was disconnected from core service, trying to reconnect.\n"));
801       reconnect_later (h);
802       return;
803     }
804   msize = ntohs (msg->size);
805 #if DEBUG_CORE
806   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
807               "Processing message of type %u and size %u from core service\n",
808               ntohs (msg->type), msize);
809 #endif
810   switch (ntohs (msg->type))
811     {
812     case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY:
813       if (ntohs (msg->size) != sizeof (struct InitReplyMessage))
814         {
815           GNUNET_break (0);
816           reconnect_later (h);
817           return;
818         }
819       m = (const struct InitReplyMessage *) msg;
820       GNUNET_break (0 == ntohl (m->reserved));
821       /* start our message processing loop */
822       if (GNUNET_YES == h->currently_down)
823         {
824           h->currently_down = GNUNET_NO;
825           trigger_next_request (h, GNUNET_NO);
826         }
827       h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
828       if (NULL != (init = h->init))
829         {
830           /* mark so we don't call init on reconnect */
831           h->init = NULL;
832           GNUNET_CRYPTO_hash (&m->publicKey,
833                               sizeof (struct
834                                       GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
835                               &h->me.hashPubKey);
836 #if DEBUG_CORE
837           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
838                       "Connected to core service of peer `%s'.\n",
839                       GNUNET_i2s (&h->me));
840 #endif
841           init (h->cls, h, &h->me, &m->publicKey);
842         }
843       else
844         {
845 #if DEBUG_CORE
846           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
847                       "Successfully reconnected to core service.\n");
848 #endif
849         }
850       break;
851     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT:
852       if (msize != sizeof (struct ConnectNotifyMessage))
853         {
854           GNUNET_break (0);
855           reconnect_later (h);
856           return;
857         }
858       cnm = (const struct ConnectNotifyMessage *) msg;
859 #if DEBUG_CORE
860       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
861                   "Received notification about connection from `%s'.\n",
862                   GNUNET_i2s (&cnm->peer));
863 #endif
864       if (0 == memcmp (&h->me,
865                        &cnm->peer,
866                        sizeof (struct GNUNET_PeerIdentity)))
867         {
868           /* disconnect from self!? */
869           GNUNET_break (0);
870           return;
871         }
872       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
873                                               &cnm->peer.hashPubKey);
874       if (pr != NULL)
875         {
876           GNUNET_break (0);
877           reconnect_later (h);
878           return;
879         }
880       pr = GNUNET_malloc (sizeof (struct PeerRecord));
881       pr->peer = cnm->peer;
882       pr->ch = h;
883       GNUNET_assert (GNUNET_YES ==
884                      GNUNET_CONTAINER_multihashmap_put (h->peers,
885                                                         &cnm->peer.hashPubKey,
886                                                         pr,
887                                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
888       if (NULL != h->connects)
889         h->connects (h->cls,
890                      &cnm->peer,
891                      NULL /* FIXME: atsi! */);
892       break;
893     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT:
894       if (msize != sizeof (struct DisconnectNotifyMessage))
895         {
896           GNUNET_break (0);
897           reconnect_later (h);
898           return;
899         }
900       dnm = (const struct DisconnectNotifyMessage *) msg;
901       if (0 == memcmp (&h->me,
902                        &dnm->peer,
903                        sizeof (struct GNUNET_PeerIdentity)))
904         {
905           /* connection to self!? */
906           GNUNET_break (0);
907           return;
908         }
909 #if DEBUG_CORE
910       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
911                   "Received notification about disconnect from `%s'.\n",
912                   GNUNET_i2s (&dnm->peer));
913 #endif
914       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
915                                               &dnm->peer.hashPubKey);
916       if (pr == NULL)
917         {
918           GNUNET_break (0);
919           reconnect_later (h);
920           return;
921         }
922       trigger = ( (pr->prev != NULL) ||
923                   (pr->next != NULL) ||
924                   (h->ready_peer_head == pr) );
925       disconnect_and_free_peer_entry (h, &dnm->peer.hashPubKey, pr);
926       if (trigger)
927         trigger_next_request (h, GNUNET_NO);
928       break;
929     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE:
930       if (NULL == h->status_events)
931         {
932           GNUNET_break (0);
933           break;
934         }
935       if (msize != sizeof (struct PeerStatusNotifyMessage))
936         {
937           GNUNET_break (0);
938           reconnect_later (h);
939           return;
940         }
941       psnm = (const struct PeerStatusNotifyMessage *) msg;
942       if (0 == memcmp (&h->me,
943                        &psnm->peer,
944                        sizeof (struct GNUNET_PeerIdentity)))
945         {
946           /* self-change!? */
947           GNUNET_break (0);
948           return;
949         }
950 #if DEBUG_CORE
951       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
952                   "Received notification about status change by `%s'.\n",
953                   GNUNET_i2s (&psnm->peer));
954 #endif
955       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
956                                               &psnm->peer.hashPubKey);
957       if (pr == NULL)
958         {
959           GNUNET_break (0);
960           reconnect_later (h);
961           return;
962         }
963       h->status_events (h->cls,
964                         &psnm->peer,
965                         psnm->bandwidth_in,
966                         psnm->bandwidth_out,
967                         GNUNET_TIME_absolute_ntoh (psnm->timeout),
968                         NULL /* FIXME: atsi */);
969       break;
970     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND:
971       if (msize <
972           sizeof (struct NotifyTrafficMessage) +
973           sizeof (struct GNUNET_MessageHeader))
974         {
975           GNUNET_break (0);
976           reconnect_later (h);
977           return;
978         }
979       ntm = (const struct NotifyTrafficMessage *) msg;
980       if (0 == memcmp (&h->me,
981                        &ntm->peer,
982                        sizeof (struct GNUNET_PeerIdentity)))
983         {
984           /* self-change!? */
985           GNUNET_break (0);
986           return;
987         }
988       em = (const struct GNUNET_MessageHeader *) &ntm[1];
989 #if DEBUG_CORE
990       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
991                   "Received message of type %u and size %u from peer `%4s'\n",
992                   ntohs (em->type), 
993                   ntohs (em->size),
994                   GNUNET_i2s (&ntm->peer));
995 #endif
996       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
997                                               &ntm->peer.hashPubKey);
998       if (pr == NULL)
999         {
1000           GNUNET_break (0);
1001           reconnect_later (h);
1002           return;
1003         }
1004       if ((GNUNET_NO == h->inbound_hdr_only) &&
1005           (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage)))
1006         {
1007           GNUNET_break (0);
1008           reconnect_later (h);
1009           return;
1010         }
1011       et = ntohs (em->type);
1012       for (hpos = 0; hpos < h->hcnt; hpos++)
1013         {
1014           mh = &h->handlers[hpos];
1015           if (mh->type != et)
1016             continue;
1017           if ((mh->expected_size != ntohs (em->size)) &&
1018               (mh->expected_size != 0))
1019             {
1020               GNUNET_break (0);
1021               continue;
1022             }
1023           if (GNUNET_OK !=
1024               h->handlers[hpos].callback (h->cls, &ntm->peer, em,
1025                                           NULL /* FIXME: atsi */))
1026             {
1027               /* error in processing, do not process other messages! */
1028               break;
1029             }
1030         }
1031       if (NULL != h->inbound_notify)
1032         h->inbound_notify (h->cls, &ntm->peer, em,
1033                            NULL /* FIXME: atsi */);
1034       break;
1035     case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND:
1036       if (msize <
1037           sizeof (struct NotifyTrafficMessage) +
1038           sizeof (struct GNUNET_MessageHeader))
1039         {
1040           GNUNET_break (0);
1041           reconnect_later (h);
1042           return;
1043         }
1044       ntm = (const struct NotifyTrafficMessage *) msg;
1045       if (0 == memcmp (&h->me,
1046                        &ntm->peer,
1047                        sizeof (struct GNUNET_PeerIdentity)))
1048         {
1049           /* self-change!? */
1050           GNUNET_break (0);
1051           return;
1052         }
1053       em = (const struct GNUNET_MessageHeader *) &ntm[1];
1054       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1055                                               &ntm->peer.hashPubKey);
1056       if (pr == NULL)
1057         {
1058           GNUNET_break (0);
1059           reconnect_later (h);
1060           return;
1061         }
1062 #if DEBUG_CORE
1063       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064                   "Received notification about transmission to `%s'.\n",
1065                   GNUNET_i2s (&ntm->peer));
1066 #endif
1067       if ((GNUNET_NO == h->outbound_hdr_only) &&
1068           (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage)))
1069         {
1070           GNUNET_break (0);
1071           reconnect_later (h);
1072           return;
1073         }
1074       if (NULL == h->outbound_notify)
1075         {
1076           GNUNET_break (0);
1077           break;
1078         }
1079       h->outbound_notify (h->cls, &ntm->peer, em,
1080                           NULL /* FIXME: atsi? */);
1081       break;
1082     case GNUNET_MESSAGE_TYPE_CORE_SEND_READY:
1083       if (msize != sizeof (struct SendMessageReady))
1084         {
1085           GNUNET_break (0);
1086           reconnect_later (h);
1087           return;
1088         }
1089       smr = (const struct SendMessageReady *) msg;
1090       if (0 == memcmp (&h->me,
1091                        &smr->peer,
1092                        sizeof (struct GNUNET_PeerIdentity)))
1093         {
1094           /* self-change!? */
1095           GNUNET_break (0);
1096           return;
1097         }
1098       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1099                                               &smr->peer.hashPubKey);
1100       if (pr == NULL)
1101         {
1102           GNUNET_break (0);
1103           reconnect_later (h);
1104           return;
1105         }
1106 #if DEBUG_CORE
1107       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1108                   "Received notification about transmission readiness to `%s'.\n",
1109                   GNUNET_i2s (&smr->peer));
1110 #endif
1111       th = pr->pending_head;
1112       if (ntohs (smr->smr_id) != th->smr_id)
1113         {
1114           /* READY message is for expired or cancelled message,
1115              ignore! (we should have already sent another request) */
1116           break;
1117         }
1118       if ( (pr->prev != NULL) ||
1119            (pr->next != NULL) ||
1120            (h->ready_peer_head == pr) )
1121         {
1122           /* we should not already be on the ready list... */
1123           GNUNET_break (0);
1124           reconnect_later (h);
1125           return;
1126         }
1127       GNUNET_CONTAINER_DLL_insert (h->ready_peer_head,
1128                                    h->ready_peer_tail,
1129                                    pr);
1130       trigger_next_request (h, GNUNET_NO);
1131       break;
1132     case GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO:
1133       if (ntohs (msg->size) != sizeof (struct ConfigurationInfoMessage))
1134         {
1135           GNUNET_break (0);
1136           reconnect_later (h);
1137           return;
1138         }
1139       cim = (const struct ConfigurationInfoMessage*) msg;
1140       if (0 == memcmp (&h->me,
1141                        &cim->peer,
1142                        sizeof (struct GNUNET_PeerIdentity)))
1143         {
1144           /* self-change!? */
1145           GNUNET_break (0);
1146           return;
1147         }
1148 #if DEBUG_CORE
1149       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1150                   "Received notification about configuration update for `%s'.\n",
1151                   GNUNET_i2s (&cim->peer));
1152 #endif
1153       pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1154                                               &cim->peer.hashPubKey);
1155       if (pr == NULL)
1156         {
1157           GNUNET_break (0);
1158           reconnect_later (h);
1159           return;
1160         }
1161       if (pr->rim_id != ntohl (cim->rim_id))
1162         break;
1163       pcic = pr->pcic;
1164       pr->pcic = NULL;
1165       if (pcic != NULL)
1166         pcic (pr->pcic_cls,
1167               &pr->peer,
1168               cim->bw_out,
1169               ntohl (cim->reserved_amount),
1170               GNUNET_ntohll (cim->preference));
1171       break;
1172     default:
1173       reconnect_later (h);
1174       return;
1175     }
1176   GNUNET_CLIENT_receive (h->client,
1177                          &main_notify_handler, h, 
1178                          GNUNET_TIME_UNIT_FOREVER_REL);
1179 }
1180
1181
1182 /**
1183  * Task executed once we are done transmitting the INIT message.
1184  * Starts our 'receive' loop.
1185  *
1186  * @param cls the 'struct GNUNET_CORE_Handle'
1187  * @param tc task context
1188  */
1189 static void
1190 init_done_task (void *cls, 
1191                 const struct GNUNET_SCHEDULER_TaskContext *tc)
1192 {
1193   struct GNUNET_CORE_Handle *h = cls;
1194
1195   if (tc == NULL)
1196     return; /* error */
1197   if (0 == (tc->reason & GNUNET_SCHEDULER_REASON_PREREQ_DONE))
1198     {
1199 #if DEBUG_CORE
1200       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1201                   "Failed to exchange INIT with core, retrying\n");
1202 #endif
1203       reconnect_later (h);
1204       return;
1205     }
1206   GNUNET_CLIENT_receive (h->client,
1207                          &main_notify_handler, 
1208                          h, 
1209                          GNUNET_TIME_UNIT_FOREVER_REL);
1210 }
1211
1212
1213 /**
1214  * Our current client connection went down.  Clean it up
1215  * and try to reconnect!
1216  *
1217  * @param h our handle to the core service
1218  */
1219 static void
1220 reconnect (struct GNUNET_CORE_Handle *h)
1221 {
1222   struct ControlMessage *cm;
1223   struct InitMessage *init;
1224   uint32_t opt;
1225   uint16_t msize;
1226   uint16_t *ts;
1227   unsigned int hpos;
1228
1229 #if DEBUG_CORE
1230   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1231               "Reconnecting to CORE service\n");
1232 #endif
1233   GNUNET_assert (h->client == NULL);
1234   GNUNET_assert (h->currently_down == GNUNET_YES);
1235   h->client = GNUNET_CLIENT_connect ("core", h->cfg);
1236   if (h->client == NULL)
1237     {
1238       reconnect_later (h);
1239       return;
1240     }
1241   msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage);
1242   cm = GNUNET_malloc (sizeof (struct ControlMessage) +
1243                       msize);
1244   cm->cont = &init_done_task;
1245   cm->cont_cls = h;
1246   init = (struct InitMessage*) &cm[1];
1247   init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT);
1248   init->header.size = htons (msize);
1249   opt = GNUNET_CORE_OPTION_SEND_CONNECT | GNUNET_CORE_OPTION_SEND_DISCONNECT;
1250   if (h->status_events != NULL)
1251     opt |= GNUNET_CORE_OPTION_SEND_STATUS_CHANGE;
1252   if (h->inbound_notify != NULL)
1253     {
1254       if (h->inbound_hdr_only)
1255         opt |= GNUNET_CORE_OPTION_SEND_HDR_INBOUND;
1256       else
1257         opt |= GNUNET_CORE_OPTION_SEND_FULL_INBOUND;
1258     }
1259   if (h->outbound_notify != NULL)
1260     {
1261       if (h->outbound_hdr_only)
1262         opt |= GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND;
1263       else
1264         opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND;
1265     }
1266   init->options = htonl (opt);
1267   ts = (uint16_t *) &init[1];
1268   for (hpos = 0; hpos < h->hcnt; hpos++)
1269     ts[hpos] = htons (h->handlers[hpos].type);
1270   GNUNET_CONTAINER_DLL_insert (h->pending_head,
1271                                h->pending_tail,
1272                                cm);
1273   trigger_next_request (h, GNUNET_YES);
1274 }
1275
1276
1277
1278 /**
1279  * Connect to the core service.  Note that the connection may
1280  * complete (or fail) asynchronously.
1281  *
1282  * @param cfg configuration to use
1283  * @param queue_size size of the per-peer message queue
1284  * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
1285  * @param init callback to call on timeout or once we have successfully
1286  *        connected to the core service; note that timeout is only meaningful if init is not NULL
1287  * @param connects function to call on peer connect, can be NULL
1288  * @param disconnects function to call on peer disconnect / timeout, can be NULL
1289  * @param status_events function to call on changes to peer connection status, can be NULL
1290  * @param inbound_notify function to call for all inbound messages, can be NULL
1291  * @param inbound_hdr_only set to GNUNET_YES if inbound_notify will only read the
1292  *                GNUNET_MessageHeader and hence we do not need to give it the full message;
1293  *                can be used to improve efficiency, ignored if inbound_notify is NULLL
1294  * @param outbound_notify function to call for all outbound messages, can be NULL
1295  * @param outbound_hdr_only set to GNUNET_YES if outbound_notify will only read the
1296  *                GNUNET_MessageHeader and hence we do not need to give it the full message
1297  *                can be used to improve efficiency, ignored if outbound_notify is NULLL
1298  * @param handlers callbacks for messages we care about, NULL-terminated
1299  * @return handle to the core service (only useful for disconnect until 'init' is called);
1300  *                NULL on error (in this case, init is never called)
1301  */
1302 struct GNUNET_CORE_Handle *
1303 GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1304                      unsigned int queue_size,
1305                      void *cls,
1306                      GNUNET_CORE_StartupCallback init,
1307                      GNUNET_CORE_ConnectEventHandler connects,
1308                      GNUNET_CORE_DisconnectEventHandler disconnects,
1309                      GNUNET_CORE_PeerStatusEventHandler status_events,
1310                      GNUNET_CORE_MessageCallback inbound_notify,
1311                      int inbound_hdr_only,
1312                      GNUNET_CORE_MessageCallback outbound_notify,
1313                      int outbound_hdr_only,
1314                      const struct GNUNET_CORE_MessageHandler *handlers)
1315 {
1316   struct GNUNET_CORE_Handle *h;
1317
1318   h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle));
1319   h->cfg = cfg;
1320   h->queue_size = queue_size;
1321   h->cls = cls;
1322   h->init = init;
1323   h->connects = connects;
1324   h->disconnects = disconnects;
1325   h->status_events = status_events;
1326   h->inbound_notify = inbound_notify;
1327   h->outbound_notify = outbound_notify;
1328   h->inbound_hdr_only = inbound_hdr_only;
1329   h->outbound_hdr_only = outbound_hdr_only;
1330   h->handlers = handlers;
1331   h->hcnt = 0;
1332   h->currently_down = GNUNET_YES;
1333   h->peers = GNUNET_CONTAINER_multihashmap_create (128);
1334   h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1335   while (handlers[h->hcnt].callback != NULL)
1336     h->hcnt++;
1337   GNUNET_assert (h->hcnt <
1338                  (GNUNET_SERVER_MAX_MESSAGE_SIZE -
1339                   sizeof (struct InitMessage)) / sizeof (uint16_t));
1340 #if DEBUG_CORE
1341   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1342               "Connecting to CORE service\n");
1343 #endif
1344   reconnect (h);
1345   return h;
1346 }
1347
1348
1349 /**
1350  * Disconnect from the core service.  This function can only 
1351  * be called *after* all pending 'GNUNET_CORE_notify_transmit_ready'
1352  * requests have been explicitly cancelled.
1353  *
1354  * @param handle connection to core to disconnect
1355  */
1356 void
1357 GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
1358 {
1359   struct ControlMessage *cm;
1360   
1361 #if DEBUG_CORE
1362   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1363               "Disconnecting from CORE service\n");
1364 #endif
1365   if (handle->cth != NULL)
1366     {
1367       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth);
1368       handle->cth = NULL;
1369     }
1370   if (handle->client != NULL)
1371     {
1372       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
1373       handle->client = NULL;
1374     }
1375   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1376     {
1377       GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1378       handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1379     }
1380   while (NULL != (cm = handle->pending_head))
1381     {
1382       GNUNET_CONTAINER_DLL_remove (handle->pending_head,
1383                                    handle->pending_tail,
1384                                    cm);
1385       cm->cont (cm->cont_cls, NULL);
1386       GNUNET_free (cm);
1387     }
1388   GNUNET_CONTAINER_multihashmap_iterate (handle->peers,
1389                                          &disconnect_and_free_peer_entry,
1390                                          handle);
1391   GNUNET_CONTAINER_multihashmap_destroy (handle->peers);
1392   GNUNET_break (handle->ready_peer_head == NULL);
1393   GNUNET_free (handle);
1394 }
1395
1396
1397 /**
1398  * Ask the core to call "notify" once it is ready to transmit the
1399  * given number of bytes to the specified "target".  If we are not yet
1400  * connected to the specified peer, a call to this function will cause
1401  * us to try to establish a connection.
1402  *
1403  * @param handle connection to core service
1404  * @param priority how important is the message?
1405  * @param maxdelay how long can the message wait?
1406  * @param target who should receive the message,
1407  *        use NULL for this peer (loopback)
1408  * @param notify_size how many bytes of buffer space does notify want?
1409  * @param notify function to call when buffer space is available
1410  * @param notify_cls closure for notify
1411  * @return non-NULL if the notify callback was queued,
1412  *         NULL if we can not even queue the request (insufficient
1413  *         memory); if NULL is returned, "notify" will NOT be called.
1414  */
1415 struct GNUNET_CORE_TransmitHandle *
1416 GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
1417                                    uint32_t priority,
1418                                    struct GNUNET_TIME_Relative maxdelay,
1419                                    const struct GNUNET_PeerIdentity *target,
1420                                    size_t notify_size,
1421                                    GNUNET_CONNECTION_TransmitReadyNotify notify,
1422                                    void *notify_cls)
1423 {
1424   struct PeerRecord *pr;
1425   struct GNUNET_CORE_TransmitHandle *th;
1426   struct GNUNET_CORE_TransmitHandle *pos;
1427   struct GNUNET_CORE_TransmitHandle *prev;
1428   struct GNUNET_CORE_TransmitHandle *minp;
1429
1430   pr = GNUNET_CONTAINER_multihashmap_get (handle->peers,
1431                                           &target->hashPubKey);
1432   if (NULL == pr)
1433     {
1434       /* attempt to send to peer that is not connected */
1435       GNUNET_break (0);
1436       return NULL;
1437     }
1438   GNUNET_assert (notify_size + sizeof (struct SendMessage) <
1439                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1440   th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle));
1441   th->peer = pr;
1442   th->get_message = notify;
1443   th->get_message_cls = notify_cls;
1444   th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
1445   th->priority = priority;
1446   th->msize = notify_size;
1447   /* bound queue size */
1448   if (pr->queue_size == handle->queue_size)
1449     {
1450       /* find lowest-priority entry */
1451       minp = pr->pending_head;
1452       prev = minp->next;
1453       while (prev != NULL)
1454         {
1455           if (prev->priority < minp->priority)
1456             minp = prev;
1457           prev = prev->next;
1458         }
1459       if (minp == NULL) 
1460         {
1461           GNUNET_break (handle->queue_size != 0);
1462           GNUNET_break (pr->queue_size == 0);
1463           return NULL;
1464         }
1465       if (priority <= minp->priority)
1466         return NULL; /* priority too low */
1467       GNUNET_CONTAINER_DLL_remove (pr->pending_head,
1468                                    pr->pending_tail,
1469                                    minp);
1470       pr->queue_size--;
1471       GNUNET_assert (0 ==
1472                      minp->get_message (minp->get_message_cls,
1473                                         0, NULL));
1474       GNUNET_free (minp);
1475     }
1476
1477   /* Order entries by deadline, but SKIP 'HEAD' if
1478      we're in the 'ready_peer_*' DLL */
1479   pos = pr->pending_head;
1480   if ( (pr->prev != NULL) ||
1481        (pr->next != NULL) ||
1482        (pr == handle->ready_peer_head) )
1483     {
1484       GNUNET_assert (pos != NULL);
1485       pos = pos->next; /* skip head */
1486     }
1487
1488   /* insertion sort */
1489   prev = pos;
1490   while ( (pos != NULL) &&
1491           (pos->timeout.abs_value < th->timeout.abs_value) )      
1492     {
1493       prev = pos;
1494       pos = pos->next;
1495     }
1496   GNUNET_CONTAINER_DLL_insert_after (pr->pending_head,
1497                                      pr->pending_tail,
1498                                      prev,
1499                                      th);
1500   pr->queue_size++;
1501   /* was the request queue previously empty? */
1502   if (pr->pending_head == th) 
1503     request_next_transmission (pr);
1504   return th;
1505 }
1506
1507
1508 /**
1509  * Cancel the specified transmission-ready notification.
1510  *
1511  * @param th handle that was returned by "notify_transmit_ready".
1512  */
1513 void
1514 GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle
1515                                           *th)
1516 {
1517   struct PeerRecord *pr = th->peer;
1518   struct GNUNET_CORE_Handle *h = pr->ch;
1519   int was_head;
1520   
1521   was_head = (pr->pending_head == th);
1522   GNUNET_CONTAINER_DLL_remove (pr->pending_head,
1523                                pr->pending_tail,
1524                                th);    
1525   if (th->cm != NULL)
1526     {
1527       /* we're currently in the control queue, remove */
1528       GNUNET_CONTAINER_DLL_remove (h->pending_head,
1529                                    h->pending_tail,
1530                                    th->cm);
1531       GNUNET_free (th->cm);      
1532     }
1533   GNUNET_free (th);
1534   if (was_head)
1535     {
1536       if ( (pr->prev != NULL) ||
1537            (pr->next != NULL) ||
1538            (pr == h->ready_peer_head) )
1539         {
1540           /* the request that was 'approved' by core was
1541              cancelled before it could be transmitted; remove
1542              us from the 'ready' list */
1543           GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
1544                                        h->ready_peer_tail,
1545                                        pr);
1546         }
1547       request_next_transmission (pr);
1548     }
1549 }
1550
1551
1552 /* ****************** GNUNET_CORE_peer_request_connect ******************** */
1553
1554 /**
1555  * Handle for a request to the core to connect to
1556  * a particular peer.  Can be used to cancel the request
1557  * (before the 'cont'inuation is called).
1558  */
1559 struct GNUNET_CORE_PeerRequestHandle
1560 {
1561
1562   /**
1563    * Link to control message.
1564    */
1565   struct ControlMessage *cm;
1566
1567   /**
1568    * Core handle used.
1569    */
1570   struct GNUNET_CORE_Handle *h;
1571
1572   /**
1573    * Continuation to run when done.
1574    */
1575   GNUNET_SCHEDULER_Task cont;
1576
1577   /**
1578    * Closure for 'cont'.
1579    */
1580   void *cont_cls;
1581
1582 };
1583
1584
1585 /**
1586  * Continuation called when the control message was transmitted.
1587  * Calls the original continuation and frees the remaining
1588  * resources.
1589  *
1590  * @param cls the 'struct GNUNET_CORE_PeerRequestHandle'
1591  * @param tc scheduler context
1592  */
1593 static void
1594 peer_request_connect_cont (void *cls,
1595                            const struct GNUNET_SCHEDULER_TaskContext *tc)
1596 {
1597   struct GNUNET_CORE_PeerRequestHandle *ret = cls;
1598   
1599   if (ret->cont != NULL)
1600     {
1601       if (tc == NULL)
1602         GNUNET_SCHEDULER_add_now (ret->cont,
1603                                   ret->cont_cls);
1604       else
1605         ret->cont (ret->cont_cls, tc);
1606     }
1607   GNUNET_free (ret);
1608 }
1609
1610
1611 /**
1612  * Request that the core should try to connect to a particular peer.
1613  * Once the request has been transmitted to the core, the continuation
1614  * function will be called.  Note that this does NOT mean that a
1615  * connection was successfully established -- it only means that the
1616  * core will now try.  Successful establishment of the connection
1617  * will be signalled to the 'connects' callback argument of
1618  * 'GNUNET_CORE_connect' only.  If the core service does not respond
1619  * to our connection attempt within the given time frame, 'cont' will
1620  * be called with the TIMEOUT reason code.
1621  *
1622  * @param h core handle
1623  * @param timeout how long to try to talk to core
1624  * @param peer who should we connect to
1625  * @param cont function to call once the request has been completed (or timed out)
1626  * @param cont_cls closure for cont
1627  * @return NULL on error (cont will not be called), otherwise handle for cancellation
1628  */
1629 struct GNUNET_CORE_PeerRequestHandle *
1630 GNUNET_CORE_peer_request_connect (struct GNUNET_CORE_Handle *h,
1631                                   struct GNUNET_TIME_Relative timeout,
1632                                   const struct GNUNET_PeerIdentity * peer,
1633                                   GNUNET_SCHEDULER_Task cont,
1634                                   void *cont_cls)
1635 {
1636   struct GNUNET_CORE_PeerRequestHandle *ret;
1637   struct ControlMessage *cm;
1638   struct ConnectMessage *msg;
1639   
1640   cm = GNUNET_malloc (sizeof (struct ControlMessage) + 
1641                       sizeof (struct ConnectMessage));
1642   msg = (struct ConnectMessage*) &cm[1];
1643   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT);
1644   msg->header.size = htons (sizeof (struct ConnectMessage));
1645   msg->reserved = htonl (0);
1646   msg->timeout = GNUNET_TIME_relative_hton (timeout);
1647   msg->peer = *peer;
1648   GNUNET_CONTAINER_DLL_insert (h->pending_head,
1649                                h->pending_tail,
1650                                cm);
1651   ret = GNUNET_malloc (sizeof (struct GNUNET_CORE_PeerRequestHandle));
1652   ret->h = h;
1653   ret->cm = cm;
1654   ret->cont = cont;
1655   ret->cont_cls = cont_cls;
1656   cm->cont = &peer_request_connect_cont;
1657   cm->cont_cls = ret;
1658   if (h->pending_head == cm)
1659     trigger_next_request (h, GNUNET_NO);
1660   return ret;
1661 }
1662
1663
1664 /**
1665  * Cancel a pending request to connect to a particular peer.  Must not
1666  * be called after the 'cont' function was invoked.
1667  *
1668  * @param req request handle that was returned for the original request
1669  */
1670 void
1671 GNUNET_CORE_peer_request_connect_cancel (struct GNUNET_CORE_PeerRequestHandle *req)
1672 {
1673   struct GNUNET_CORE_Handle *h = req->h;
1674   struct ControlMessage *cm = req->cm;
1675
1676   GNUNET_CONTAINER_DLL_remove (h->pending_head,
1677                                h->pending_tail,
1678                                cm);
1679   GNUNET_free (cm);
1680   GNUNET_free (req);
1681 }
1682
1683
1684 /* ****************** GNUNET_CORE_peer_change_preference ******************** */
1685
1686
1687 struct GNUNET_CORE_InformationRequestContext 
1688 {
1689   
1690   /**
1691    * Our connection to the service.
1692    */
1693   struct GNUNET_CORE_Handle *h;
1694
1695   /**
1696    * Function to call with the information.
1697    */
1698   GNUNET_CORE_PeerConfigurationInfoCallback info;
1699
1700   /**
1701    * Closure for info.
1702    */
1703   void *info_cls;
1704
1705   /**
1706    * Link to control message, NULL if CM was sent.
1707    */ 
1708   struct ControlMessage *cm;
1709
1710   /**
1711    * Link to peer record.
1712    */
1713   struct PeerRecord *pr;
1714 };
1715
1716
1717 /**
1718  * CM was sent, remove link so we don't double-free.
1719  *
1720  * @param cls the 'struct GNUNET_CORE_InformationRequestContext'
1721  * @param tc scheduler context
1722  */
1723 static void
1724 change_preference_send_continuation (void *cls,
1725                                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1726 {
1727   struct GNUNET_CORE_InformationRequestContext *irc = cls;
1728
1729   irc->cm = NULL;
1730 }
1731
1732
1733 /**
1734  * Obtain statistics and/or change preferences for the given peer.
1735  *
1736  * @param h core handle
1737  * @param peer identifies the peer
1738  * @param timeout after how long should we give up (and call "info" with NULL
1739  *                for "peer" to signal an error)?
1740  * @param bw_out set to the current bandwidth limit (sending) for this peer,
1741  *                caller should set "bw_out" to "-1" to avoid changing
1742  *                the current value; otherwise "bw_out" will be lowered to
1743  *                the specified value; passing a pointer to "0" can be used to force
1744  *                us to disconnect from the peer; "bw_out" might not increase
1745  *                as specified since the upper bound is generally
1746  *                determined by the other peer!
1747  * @param amount reserve N bytes for receiving, negative
1748  *                amounts can be used to undo a (recent) reservation;
1749  * @param preference increase incoming traffic share preference by this amount;
1750  *                in the absence of "amount" reservations, we use this
1751  *                preference value to assign proportional bandwidth shares
1752  *                to all connected peers
1753  * @param info function to call with the resulting configuration information
1754  * @param info_cls closure for info
1755  * @return NULL on error
1756  */
1757 struct GNUNET_CORE_InformationRequestContext *
1758 GNUNET_CORE_peer_change_preference (struct GNUNET_CORE_Handle *h,
1759                                     const struct GNUNET_PeerIdentity *peer,
1760                                     struct GNUNET_TIME_Relative timeout,
1761                                     struct GNUNET_BANDWIDTH_Value32NBO bw_out,
1762                                     int32_t amount,
1763                                     uint64_t preference,
1764                                     GNUNET_CORE_PeerConfigurationInfoCallback info,
1765                                     void *info_cls)
1766 {
1767   struct GNUNET_CORE_InformationRequestContext *irc;
1768   struct PeerRecord *pr;
1769   struct RequestInfoMessage *rim;
1770   struct ControlMessage *cm;
1771
1772   pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1773                                           &peer->hashPubKey);
1774   if (NULL == pr)
1775     {
1776       /* attempt to change preference on peer that is not connected */
1777       GNUNET_break (0);
1778       return NULL;
1779     }
1780   if (pr->pcic != NULL)
1781     {
1782       /* second change before first one is done */
1783       GNUNET_break (0);
1784       return NULL;
1785     }
1786   irc = GNUNET_malloc (sizeof (struct GNUNET_CORE_InformationRequestContext));
1787   irc->h = h;
1788   irc->info = info;
1789   irc->info_cls = info_cls;
1790   cm = GNUNET_malloc (sizeof (struct ControlMessage) +
1791                       sizeof (struct RequestInfoMessage));
1792   cm->cont = &change_preference_send_continuation;
1793   cm->cont_cls = irc;
1794   irc->cm = cm;
1795   rim = (struct RequestInfoMessage*) &cm[1];
1796   rim->header.size = htons (sizeof (struct RequestInfoMessage));
1797   rim->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO);
1798   rim->rim_id = htonl (pr->rim_id = h->rim_id_gen++);
1799   rim->limit_outbound = bw_out;
1800   rim->reserve_inbound = htonl (amount);
1801   rim->preference_change = GNUNET_htonll(preference);
1802   rim->peer = *peer;
1803   GNUNET_CONTAINER_DLL_insert (h->pending_head,
1804                                h->pending_tail,
1805                                cm); 
1806   pr->pcic = info;
1807   pr->pcic_cls = info_cls;
1808   return irc;
1809 }
1810
1811
1812 /**
1813  * Cancel request for getting information about a peer.
1814  * Note that an eventual change in preference, trust or bandwidth
1815  * assignment MAY have already been committed at the time, 
1816  * so cancelling a request is NOT sure to undo the original
1817  * request.  The original request may or may not still commit.
1818  * The only thing cancellation ensures is that the callback
1819  * from the original request will no longer be called.
1820  *
1821  * @param irc context returned by the original GNUNET_CORE_peer_get_info call
1822  */
1823 void
1824 GNUNET_CORE_peer_change_preference_cancel (struct GNUNET_CORE_InformationRequestContext *irc)
1825 {
1826   struct GNUNET_CORE_Handle *h = irc->h;
1827   struct PeerRecord *pr = irc->pr;
1828
1829   if (irc->cm != NULL)
1830     {
1831       GNUNET_CONTAINER_DLL_remove (h->pending_head,
1832                                    h->pending_tail,
1833                                    irc->cm);
1834       GNUNET_free (irc->cm);
1835     }
1836   pr->pcic = NULL;
1837   pr->pcic_cls = NULL;
1838   GNUNET_free (irc);
1839 }
1840
1841
1842 /* ********************* GNUNET_CORE_iterate_peers *********************** */
1843
1844 /**
1845  * Context for 'iterate_peers' helper function.
1846  */
1847 struct IterationContext
1848 {
1849   /**
1850    * Callback to call.
1851    */
1852   GNUNET_CORE_ConnectEventHandler peer_cb;
1853
1854   /**
1855    * Closure for 'peer_cb'.
1856    */
1857   void *cb_cls;
1858 };
1859
1860
1861 /**
1862  * Call callback for each peer.
1863  *
1864  * @param cls the 'struct IterationContext'
1865  * @param hc peer identity, not used
1866  * @param value the 'struct PeerRecord'
1867  * @return GNUNET_YES (continue iteration)
1868  */
1869 static int
1870 iterate_peers (void *cls,
1871                const GNUNET_HashCode *hc,
1872                void *value)
1873 {
1874   struct IterationContext *ic = cls;
1875   struct PeerRecord *pr = value;
1876
1877   ic->peer_cb (ic->cb_cls,
1878                &pr->peer,
1879                NULL /* FIXME: pass atsi? */);
1880   return GNUNET_YES;
1881 }
1882
1883
1884 /**
1885  * Obtain statistics and/or change preferences for the given peer.
1886  *
1887  * @param h handle to core
1888  * @param peer_cb function to call with the peer information
1889  * @param cb_cls closure for peer_cb
1890  * @return GNUNET_OK if iterating, GNUNET_SYSERR on error
1891  */
1892 int
1893 GNUNET_CORE_iterate_peers (struct GNUNET_CORE_Handle *h,
1894                            GNUNET_CORE_ConnectEventHandler peer_cb,
1895                            void *cb_cls)
1896 {
1897   struct IterationContext ic;
1898
1899   ic.peer_cb = peer_cb;
1900   ic.cb_cls = cb_cls;
1901   GNUNET_CONTAINER_multihashmap_iterate (h->peers,
1902                                          &iterate_peers,
1903                                          &ic);
1904   return GNUNET_OK;
1905 }
1906
1907
1908 /* end of core_api.c */