2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file core/core_api.c
23 * @brief core service; this is the main API for encrypted P2P
25 * @author Christian Grothoff
28 #include "gnunet_constants.h"
29 #include "gnunet_core_service.h"
32 #define LOG(kind,...) GNUNET_log_from (kind, "core-api",__VA_ARGS__)
35 * Information we track for each peer.
41 * We generally do NOT keep peer records in a DLL; this
42 * DLL is only used IF this peer's 'pending_head' message
43 * is ready for transmission.
45 struct PeerRecord *prev;
48 * We generally do NOT keep peer records in a DLL; this
49 * DLL is only used IF this peer's 'pending_head' message
50 * is ready for transmission.
52 struct PeerRecord *next;
55 * Peer the record is about.
57 struct GNUNET_PeerIdentity peer;
60 * Corresponding core handle.
62 struct GNUNET_CORE_Handle *ch;
65 * Head of doubly-linked list of pending requests.
66 * Requests are sorted by deadline *except* for HEAD,
67 * which is only modified upon transmission to core.
69 struct GNUNET_CORE_TransmitHandle *pending_head;
72 * Tail of doubly-linked list of pending requests.
74 struct GNUNET_CORE_TransmitHandle *pending_tail;
77 * ID of timeout task for the 'pending_head' handle
78 * which is the one with the smallest timeout.
80 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
83 * ID of task to run 'next_request_transmission'.
85 GNUNET_SCHEDULER_TaskIdentifier ntr_task;
88 * Current size of the queue of pending requests.
90 unsigned int queue_size;
93 * SendMessageRequest ID generator for this peer.
101 * Type of function called upon completion.
104 * @param success GNUNET_OK on success (which for request_connect
105 * ONLY means that we transmitted the connect request to CORE,
106 * it does not mean that we are actually now connected!);
107 * GNUNET_NO on timeout,
108 * GNUNET_SYSERR if core was shut down
110 typedef void (*GNUNET_CORE_ControlContinuation) (void *cls, int success);
114 * Entry in a doubly-linked list of control messages to be transmitted
115 * to the core service. Control messages include traffic allocation,
116 * connection requests and of course our initial 'init' request.
118 * The actual message is allocated at the end of this struct.
120 struct ControlMessage
123 * This is a doubly-linked list.
125 struct ControlMessage *next;
128 * This is a doubly-linked list.
130 struct ControlMessage *prev;
133 * Function to run after transmission failed/succeeded.
135 GNUNET_CORE_ControlContinuation cont;
138 * Closure for 'cont'.
143 * Transmit handle (if one is associated with this ControlMessage), or NULL.
145 struct GNUNET_CORE_TransmitHandle *th;
151 * Context for the core service connection.
153 struct GNUNET_CORE_Handle
157 * Configuration we're using.
159 const struct GNUNET_CONFIGURATION_Handle *cfg;
162 * Closure for the various callbacks.
167 * Function to call once we've handshaked with the core service.
169 GNUNET_CORE_StartupCallback init;
172 * Function to call whenever we're notified about a peer connecting.
174 GNUNET_CORE_ConnectEventHandler connects;
177 * Function to call whenever we're notified about a peer disconnecting.
179 GNUNET_CORE_DisconnectEventHandler disconnects;
182 * Function to call whenever we receive an inbound message.
184 GNUNET_CORE_MessageCallback inbound_notify;
187 * Function to call whenever we receive an outbound message.
189 GNUNET_CORE_MessageCallback outbound_notify;
192 * Function handlers for messages of particular type.
194 const struct GNUNET_CORE_MessageHandler *handlers;
197 * Our connection to the service.
199 struct GNUNET_CLIENT_Connection *client;
202 * Handle for our current transmission request.
204 struct GNUNET_CLIENT_TransmitHandle *cth;
207 * Head of doubly-linked list of pending requests.
209 struct ControlMessage *control_pending_head;
212 * Tail of doubly-linked list of pending requests.
214 struct ControlMessage *control_pending_tail;
217 * Head of doubly-linked list of peers that are core-approved
218 * to send their next message.
220 struct PeerRecord *ready_peer_head;
223 * Tail of doubly-linked list of peers that are core-approved
224 * to send their next message.
226 struct PeerRecord *ready_peer_tail;
229 * Hash map listing all of the peers that we are currently
232 struct GNUNET_CONTAINER_MultiHashMap *peers;
235 * Identity of this peer.
237 struct GNUNET_PeerIdentity me;
240 * ID of reconnect task (if any).
242 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
245 * Current delay we use for re-trying to connect to core.
247 struct GNUNET_TIME_Relative retry_backoff;
250 * Number of messages we are allowed to queue per target.
252 unsigned int queue_size;
255 * Number of entries in the handlers array.
260 * For inbound notifications without a specific handler, do
261 * we expect to only receive headers?
263 int inbound_hdr_only;
266 * For outbound notifications without a specific handler, do
267 * we expect to only receive headers?
269 int outbound_hdr_only;
272 * Are we currently disconnected and hence unable to forward
281 * Handle for a transmission request.
283 struct GNUNET_CORE_TransmitHandle
287 * We keep active transmit handles in a doubly-linked list.
289 struct GNUNET_CORE_TransmitHandle *next;
292 * We keep active transmit handles in a doubly-linked list.
294 struct GNUNET_CORE_TransmitHandle *prev;
297 * Corresponding peer record.
299 struct PeerRecord *peer;
302 * Corresponding SEND_REQUEST message. Only non-NULL
303 * while SEND_REQUEST message is pending.
305 struct ControlMessage *cm;
308 * Function that will be called to get the actual request
309 * (once we are ready to transmit this request to the core).
310 * The function will be called with a NULL buffer to signal
313 GNUNET_CONNECTION_TransmitReadyNotify get_message;
316 * Closure for get_message.
318 void *get_message_cls;
321 * Timeout for this handle.
323 struct GNUNET_TIME_Absolute timeout;
326 * How important is this message?
331 * Size of this request.
336 * Send message request ID for this request.
341 * Is corking allowed?
349 * Our current client connection went down. Clean it up
350 * and try to reconnect!
352 * @param h our handle to the core service
355 reconnect (struct GNUNET_CORE_Handle *h);
359 * Task schedule to try to re-connect to core.
361 * @param cls the 'struct GNUNET_CORE_Handle'
362 * @param tc task context
365 reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
367 struct GNUNET_CORE_Handle *h = cls;
369 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
371 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service after delay\n");
378 * Notify clients about disconnect and free
379 * the entry for connected peer.
381 * @param cls the 'struct GNUNET_CORE_Handle*'
382 * @param key the peer identity (not used)
383 * @param value the 'struct PeerRecord' to free.
384 * @return GNUNET_YES (continue)
387 disconnect_and_free_peer_entry (void *cls, const GNUNET_HashCode * key,
390 struct GNUNET_CORE_Handle *h = cls;
391 struct GNUNET_CORE_TransmitHandle *th;
392 struct PeerRecord *pr = value;
394 if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
396 GNUNET_SCHEDULER_cancel (pr->timeout_task);
397 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
399 if (pr->ntr_task != GNUNET_SCHEDULER_NO_TASK)
401 GNUNET_SCHEDULER_cancel (pr->ntr_task);
402 pr->ntr_task = GNUNET_SCHEDULER_NO_TASK;
404 if ((pr->prev != NULL) || (pr->next != NULL) || (h->ready_peer_head == pr))
405 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
406 if (h->disconnects != NULL)
407 h->disconnects (h->cls, &pr->peer);
408 /* all requests should have been cancelled, clean up anyway, just in case */
409 GNUNET_break (pr->queue_size == 0);
410 while (NULL != (th = pr->pending_head))
413 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
419 /* done with 'voluntary' cleanups, now on to normal freeing */
420 GNUNET_assert (GNUNET_YES ==
421 GNUNET_CONTAINER_multihashmap_remove (h->peers, key, pr));
422 GNUNET_assert (pr->pending_head == NULL);
423 GNUNET_assert (pr->pending_tail == NULL);
424 GNUNET_assert (pr->ch == h);
425 GNUNET_assert (pr->queue_size == 0);
426 GNUNET_assert (pr->timeout_task == GNUNET_SCHEDULER_NO_TASK);
427 GNUNET_assert (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK);
434 * Close down any existing connection to the CORE service and
435 * try re-establishing it later.
437 * @param h our handle
440 reconnect_later (struct GNUNET_CORE_Handle *h)
442 struct ControlMessage *cm;
443 struct PeerRecord *pr;
445 GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
448 GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
451 if (h->client != NULL)
453 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
456 h->currently_down = GNUNET_YES;
457 GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
459 GNUNET_SCHEDULER_add_delayed (h->retry_backoff, &reconnect_task, h);
460 while (NULL != (cm = h->control_pending_head))
462 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
463 h->control_pending_tail, cm);
466 if (cm->cont != NULL)
467 cm->cont (cm->cont_cls, GNUNET_NO);
470 GNUNET_CONTAINER_multihashmap_iterate (h->peers,
471 &disconnect_and_free_peer_entry, h);
472 while (NULL != (pr = h->ready_peer_head))
473 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
474 GNUNET_assert (h->control_pending_head == NULL);
476 GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS, h->retry_backoff);
477 h->retry_backoff = GNUNET_TIME_relative_multiply (h->retry_backoff, 2);
482 * Check the list of pending requests, send the next
485 * @param h core handle
486 * @param ignore_currently_down transmit message even if not initialized?
489 trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down);
493 * The given request hit its timeout. Remove from the
494 * doubly-linked list and call the respective continuation.
496 * @param cls the transmit handle of the request that timed out
497 * @param tc context, can be NULL (!)
500 transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
504 * Send a control message to the peer asking for transmission
505 * of the message in the given peer record.
507 * @param pr peer to request transmission to
510 request_next_transmission (struct PeerRecord *pr)
512 struct GNUNET_CORE_Handle *h = pr->ch;
513 struct ControlMessage *cm;
514 struct SendMessageRequest *smr;
515 struct GNUNET_CORE_TransmitHandle *th;
517 if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
519 GNUNET_SCHEDULER_cancel (pr->timeout_task);
520 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
522 if (NULL == (th = pr->pending_head))
524 trigger_next_request (h, GNUNET_NO);
528 return; /* already done */
529 GNUNET_assert (pr->prev == NULL);
530 GNUNET_assert (pr->next == NULL);
532 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
533 (th->timeout), &transmission_timeout, pr);
534 cm = GNUNET_malloc (sizeof (struct ControlMessage) +
535 sizeof (struct SendMessageRequest));
538 smr = (struct SendMessageRequest *) &cm[1];
539 smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
540 smr->header.size = htons (sizeof (struct SendMessageRequest));
541 smr->priority = htonl (th->priority);
542 smr->deadline = GNUNET_TIME_absolute_hton (th->timeout);
543 smr->peer = pr->peer;
544 smr->queue_size = htonl (pr->queue_size);
545 smr->size = htons (th->msize);
546 smr->smr_id = htons (th->smr_id = pr->smr_id_gen++);
547 GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head,
548 h->control_pending_tail, cm);
550 LOG (GNUNET_ERROR_TYPE_DEBUG,
551 "Adding SEND REQUEST for peer `%s' to message queue\n",
552 GNUNET_i2s (&pr->peer));
554 trigger_next_request (h, GNUNET_NO);
559 * The given request hit its timeout. Remove from the
560 * doubly-linked list and call the respective continuation.
562 * @param cls the transmit handle of the request that timed out
563 * @param tc context, can be NULL (!)
566 transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
568 struct PeerRecord *pr = cls;
569 struct GNUNET_CORE_Handle *h = pr->ch;
570 struct GNUNET_CORE_TransmitHandle *th;
572 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
573 th = pr->pending_head;
574 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
576 if ((pr->prev != NULL) || (pr->next != NULL) || (pr == h->ready_peer_head))
578 /* the request that was 'approved' by core was
579 * canceled before it could be transmitted; remove
580 * us from the 'ready' list */
581 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
584 LOG (GNUNET_ERROR_TYPE_DEBUG,
585 "Signalling timeout of request for transmission to CORE service\n");
587 request_next_transmission (pr);
588 GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL));
594 * Transmit the next message to the core service.
597 transmit_message (void *cls, size_t size, void *buf)
599 struct GNUNET_CORE_Handle *h = cls;
600 struct ControlMessage *cm;
601 struct GNUNET_CORE_TransmitHandle *th;
602 struct PeerRecord *pr;
603 struct SendMessage *sm;
604 const struct GNUNET_MessageHeader *hdr;
608 GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
613 LOG (GNUNET_ERROR_TYPE_DEBUG,
614 "Transmission failed, initiating reconnect\n");
619 /* first check for control messages */
620 if (NULL != (cm = h->control_pending_head))
622 hdr = (const struct GNUNET_MessageHeader *) &cm[1];
623 msize = ntohs (hdr->size);
626 trigger_next_request (h, GNUNET_NO);
630 LOG (GNUNET_ERROR_TYPE_DEBUG,
631 "Transmitting control message with %u bytes of type %u to core.\n",
632 (unsigned int) msize, (unsigned int) ntohs (hdr->type));
634 memcpy (buf, hdr, msize);
635 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
636 h->control_pending_tail, cm);
639 if (NULL != cm->cont)
640 cm->cont (cm->cont_cls, GNUNET_OK);
642 trigger_next_request (h, GNUNET_NO);
645 /* now check for 'ready' P2P messages */
646 if (NULL != (pr = h->ready_peer_head))
648 GNUNET_assert (pr->pending_head != NULL);
649 th = pr->pending_head;
650 if (size < th->msize + sizeof (struct SendMessage))
652 trigger_next_request (h, GNUNET_NO);
655 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
656 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
658 if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
660 GNUNET_SCHEDULER_cancel (pr->timeout_task);
661 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
664 LOG (GNUNET_ERROR_TYPE_DEBUG,
665 "Transmitting SEND request to `%s' with %u bytes.\n",
666 GNUNET_i2s (&pr->peer), (unsigned int) th->msize);
668 sm = (struct SendMessage *) buf;
669 sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
670 sm->priority = htonl (th->priority);
671 sm->deadline = GNUNET_TIME_absolute_hton (th->timeout);
673 sm->cork = htonl ((uint32_t) th->cork);
674 sm->reserved = htonl (0);
676 th->get_message (th->get_message_cls,
677 size - sizeof (struct SendMessage), &sm[1]);
680 LOG (GNUNET_ERROR_TYPE_DEBUG,
681 "Transmitting SEND request to `%s' yielded %u bytes.\n",
682 GNUNET_i2s (&pr->peer), ret);
688 LOG (GNUNET_ERROR_TYPE_DEBUG,
689 "Size of clients message to peer %s is 0!\n",
690 GNUNET_i2s (&pr->peer));
692 /* client decided to send nothing! */
693 request_next_transmission (pr);
697 LOG (GNUNET_ERROR_TYPE_DEBUG,
698 "Produced SEND message to core with %u bytes payload\n",
701 GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
702 if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
705 request_next_transmission (pr);
708 ret += sizeof (struct SendMessage);
709 sm->header.size = htons (ret);
710 GNUNET_assert (ret <= size);
711 request_next_transmission (pr);
719 * Check the list of pending requests, send the next
722 * @param h core handle
723 * @param ignore_currently_down transmit message even if not initialized?
726 trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down)
730 if ((GNUNET_YES == h->currently_down) && (ignore_currently_down == GNUNET_NO))
733 LOG (GNUNET_ERROR_TYPE_DEBUG,
734 "Core connection down, not processing queue\n");
741 LOG (GNUNET_ERROR_TYPE_DEBUG, "Request pending, not processing queue\n");
745 if (h->control_pending_head != NULL)
747 ntohs (((struct GNUNET_MessageHeader *) &h->
748 control_pending_head[1])->size);
749 else if (h->ready_peer_head != NULL)
751 h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage);
755 LOG (GNUNET_ERROR_TYPE_DEBUG,
756 "Request queue empty, not processing queue\n");
758 return; /* no pending message */
761 GNUNET_CLIENT_notify_transmit_ready (h->client, msize,
762 GNUNET_TIME_UNIT_FOREVER_REL,
763 GNUNET_NO, &transmit_message, h);
768 * Handler for notification messages received from the core.
770 * @param cls our "struct GNUNET_CORE_Handle"
771 * @param msg the message received from the core service
774 main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
776 struct GNUNET_CORE_Handle *h = cls;
777 const struct InitReplyMessage *m;
778 const struct ConnectNotifyMessage *cnm;
779 const struct DisconnectNotifyMessage *dnm;
780 const struct NotifyTrafficMessage *ntm;
781 const struct GNUNET_MessageHeader *em;
782 const struct SendMessageReady *smr;
783 const struct GNUNET_CORE_MessageHandler *mh;
784 const struct GNUNET_ATS_Information *ats;
785 GNUNET_CORE_StartupCallback init;
786 struct PeerRecord *pr;
787 struct GNUNET_CORE_TransmitHandle *th;
796 LOG (GNUNET_ERROR_TYPE_INFO,
798 ("Client was disconnected from core service, trying to reconnect.\n"));
802 msize = ntohs (msg->size);
804 LOG (GNUNET_ERROR_TYPE_DEBUG,
805 "Processing message of type %u and size %u from core service\n",
806 ntohs (msg->type), msize);
808 switch (ntohs (msg->type))
810 case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY:
811 if (ntohs (msg->size) != sizeof (struct InitReplyMessage))
817 m = (const struct InitReplyMessage *) msg;
818 GNUNET_break (0 == ntohl (m->reserved));
819 /* start our message processing loop */
820 if (GNUNET_YES == h->currently_down)
822 h->currently_down = GNUNET_NO;
823 trigger_next_request (h, GNUNET_NO);
825 h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
826 h->me = m->my_identity;
827 if (NULL != (init = h->init))
829 /* mark so we don't call init on reconnect */
832 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core service of peer `%s'.\n",
833 GNUNET_i2s (&h->me));
835 init (h->cls, h, &h->me);
840 LOG (GNUNET_ERROR_TYPE_DEBUG,
841 "Successfully reconnected to core service.\n");
844 /* fake 'connect to self' */
845 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &h->me.hashPubKey);
846 GNUNET_assert (pr == NULL);
847 pr = GNUNET_malloc (sizeof (struct PeerRecord));
850 GNUNET_assert (GNUNET_YES ==
851 GNUNET_CONTAINER_multihashmap_put (h->peers,
852 &h->me.hashPubKey, pr,
853 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
854 if (NULL != h->connects)
855 h->connects (h->cls, &h->me, NULL, 0);
857 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT:
858 if (msize < sizeof (struct ConnectNotifyMessage))
864 cnm = (const struct ConnectNotifyMessage *) msg;
865 ats_count = ntohl (cnm->ats_count);
867 sizeof (struct ConnectNotifyMessage) +
868 ats_count * sizeof (struct GNUNET_ATS_Information))
875 LOG (GNUNET_ERROR_TYPE_DEBUG,
876 "Received notification about connection from `%s'.\n",
877 GNUNET_i2s (&cnm->peer));
879 if (0 == memcmp (&h->me, &cnm->peer, sizeof (struct GNUNET_PeerIdentity)))
881 /* connect to self!? */
885 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &cnm->peer.hashPubKey);
892 pr = GNUNET_malloc (sizeof (struct PeerRecord));
893 pr->peer = cnm->peer;
895 GNUNET_assert (GNUNET_YES ==
896 GNUNET_CONTAINER_multihashmap_put (h->peers,
897 &cnm->peer.hashPubKey, pr,
898 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
899 ats = (const struct GNUNET_ATS_Information *) &cnm[1];
900 if (NULL != h->connects)
901 h->connects (h->cls, &cnm->peer, ats, ats_count);
903 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT:
904 if (msize != sizeof (struct DisconnectNotifyMessage))
910 dnm = (const struct DisconnectNotifyMessage *) msg;
911 if (0 == memcmp (&h->me, &dnm->peer, sizeof (struct GNUNET_PeerIdentity)))
913 /* connection to self!? */
917 GNUNET_break (0 == ntohl (dnm->reserved));
919 LOG (GNUNET_ERROR_TYPE_DEBUG,
920 "Received notification about disconnect from `%s'.\n",
921 GNUNET_i2s (&dnm->peer));
923 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &dnm->peer.hashPubKey);
930 trigger = ((pr->prev != NULL) || (pr->next != NULL) ||
931 (h->ready_peer_head == pr));
932 disconnect_and_free_peer_entry (h, &dnm->peer.hashPubKey, pr);
934 trigger_next_request (h, GNUNET_NO);
936 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND:
937 if (msize < sizeof (struct NotifyTrafficMessage))
943 ntm = (const struct NotifyTrafficMessage *) msg;
945 ats_count = ntohl (ntm->ats_count);
947 sizeof (struct NotifyTrafficMessage) +
948 ats_count * sizeof (struct GNUNET_ATS_Information) +
949 sizeof (struct GNUNET_MessageHeader)) ||
950 (GNUNET_ATS_ARRAY_TERMINATOR != ntohl ((&ntm->ats)[ats_count].type)))
956 em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count + 1];
958 LOG (GNUNET_ERROR_TYPE_DEBUG,
959 "Received message of type %u and size %u from peer `%4s'\n",
960 ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer));
962 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey);
969 if ((GNUNET_NO == h->inbound_hdr_only) &&
971 ntohs (em->size) + sizeof (struct NotifyTrafficMessage) +
972 +ats_count * sizeof (struct GNUNET_ATS_Information)))
978 et = ntohs (em->type);
979 for (hpos = 0; hpos < h->hcnt; hpos++)
981 mh = &h->handlers[hpos];
984 if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0))
986 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
987 "Unexpected message size for message of type %u\n",
993 h->handlers[hpos].callback (h->cls, &ntm->peer, em, &ntm->ats,
996 /* error in processing, do not process other messages! */
1000 if (NULL != h->inbound_notify)
1001 h->inbound_notify (h->cls, &ntm->peer, em, &ntm->ats, ats_count);
1003 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND:
1004 if (msize < sizeof (struct NotifyTrafficMessage))
1007 reconnect_later (h);
1010 ntm = (const struct NotifyTrafficMessage *) msg;
1011 if (0 == memcmp (&h->me, &ntm->peer, sizeof (struct GNUNET_PeerIdentity)))
1017 ats_count = ntohl (ntm->ats_count);
1019 sizeof (struct NotifyTrafficMessage) +
1020 ats_count * sizeof (struct GNUNET_ATS_Information) +
1021 sizeof (struct GNUNET_MessageHeader)) ||
1022 (GNUNET_ATS_ARRAY_TERMINATOR != ntohl ((&ntm->ats)[ats_count].type)))
1025 reconnect_later (h);
1028 em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count + 1];
1029 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey);
1033 reconnect_later (h);
1037 LOG (GNUNET_ERROR_TYPE_DEBUG,
1038 "Received notification about transmission to `%s'.\n",
1039 GNUNET_i2s (&ntm->peer));
1041 if ((GNUNET_NO == h->outbound_hdr_only) &&
1043 ntohs (em->size) + sizeof (struct NotifyTrafficMessage) +
1044 ats_count * sizeof (struct GNUNET_ATS_Information)))
1047 reconnect_later (h);
1050 if (NULL == h->outbound_notify)
1055 h->outbound_notify (h->cls, &ntm->peer, em, &ntm->ats, ats_count);
1057 case GNUNET_MESSAGE_TYPE_CORE_SEND_READY:
1058 if (msize != sizeof (struct SendMessageReady))
1061 reconnect_later (h);
1064 smr = (const struct SendMessageReady *) msg;
1065 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &smr->peer.hashPubKey);
1069 reconnect_later (h);
1073 LOG (GNUNET_ERROR_TYPE_DEBUG,
1074 "Received notification about transmission readiness to `%s'.\n",
1075 GNUNET_i2s (&smr->peer));
1077 if (pr->pending_head == NULL)
1079 /* request must have been cancelled between the original request
1080 * and the response from core, ignore core's readiness */
1084 th = pr->pending_head;
1085 if (ntohs (smr->smr_id) != th->smr_id)
1087 /* READY message is for expired or cancelled message,
1088 * ignore! (we should have already sent another request) */
1091 if ((pr->prev != NULL) || (pr->next != NULL) || (h->ready_peer_head == pr))
1093 /* we should not already be on the ready list... */
1095 reconnect_later (h);
1098 GNUNET_CONTAINER_DLL_insert (h->ready_peer_head, h->ready_peer_tail, pr);
1099 trigger_next_request (h, GNUNET_NO);
1102 reconnect_later (h);
1105 GNUNET_CLIENT_receive (h->client, &main_notify_handler, h,
1106 GNUNET_TIME_UNIT_FOREVER_REL);
1111 * Task executed once we are done transmitting the INIT message.
1112 * Starts our 'receive' loop.
1114 * @param cls the 'struct GNUNET_CORE_Handle'
1115 * @param success were we successful
1118 init_done_task (void *cls, int success)
1120 struct GNUNET_CORE_Handle *h = cls;
1122 if (success == GNUNET_SYSERR)
1123 return; /* shutdown */
1124 if (success == GNUNET_NO)
1127 LOG (GNUNET_ERROR_TYPE_DEBUG,
1128 "Failed to exchange INIT with core, retrying\n");
1130 if (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK)
1131 reconnect_later (h);
1134 GNUNET_CLIENT_receive (h->client, &main_notify_handler, h,
1135 GNUNET_TIME_UNIT_FOREVER_REL);
1140 * Our current client connection went down. Clean it up
1141 * and try to reconnect!
1143 * @param h our handle to the core service
1146 reconnect (struct GNUNET_CORE_Handle *h)
1148 struct ControlMessage *cm;
1149 struct InitMessage *init;
1156 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting to CORE service\n");
1158 GNUNET_assert (h->client == NULL);
1159 GNUNET_assert (h->currently_down == GNUNET_YES);
1160 h->client = GNUNET_CLIENT_connect ("core", h->cfg);
1161 if (h->client == NULL)
1163 reconnect_later (h);
1166 msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage);
1167 cm = GNUNET_malloc (sizeof (struct ControlMessage) + msize);
1168 cm->cont = &init_done_task;
1170 init = (struct InitMessage *) &cm[1];
1171 init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT);
1172 init->header.size = htons (msize);
1174 if (h->inbound_notify != NULL)
1176 if (h->inbound_hdr_only)
1177 opt |= GNUNET_CORE_OPTION_SEND_HDR_INBOUND;
1179 opt |= GNUNET_CORE_OPTION_SEND_FULL_INBOUND;
1181 if (h->outbound_notify != NULL)
1183 if (h->outbound_hdr_only)
1184 opt |= GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND;
1186 opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND;
1188 init->options = htonl (opt);
1189 ts = (uint16_t *) & init[1];
1190 for (hpos = 0; hpos < h->hcnt; hpos++)
1191 ts[hpos] = htons (h->handlers[hpos].type);
1192 GNUNET_CONTAINER_DLL_insert (h->control_pending_head, h->control_pending_tail,
1194 trigger_next_request (h, GNUNET_YES);
1200 * Connect to the core service. Note that the connection may
1201 * complete (or fail) asynchronously.
1203 * @param cfg configuration to use
1204 * @param queue_size size of the per-peer message queue
1205 * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
1206 * @param init callback to call on timeout or once we have successfully
1207 * connected to the core service; note that timeout is only meaningful if init is not NULL
1208 * @param connects function to call on peer connect, can be NULL
1209 * @param disconnects function to call on peer disconnect / timeout, can be NULL
1210 * @param inbound_notify function to call for all inbound messages, can be NULL
1211 * @param inbound_hdr_only set to GNUNET_YES if inbound_notify will only read the
1212 * GNUNET_MessageHeader and hence we do not need to give it the full message;
1213 * can be used to improve efficiency, ignored if inbound_notify is NULLL
1214 * @param outbound_notify function to call for all outbound messages, can be NULL
1215 * @param outbound_hdr_only set to GNUNET_YES if outbound_notify will only read the
1216 * GNUNET_MessageHeader and hence we do not need to give it the full message
1217 * can be used to improve efficiency, ignored if outbound_notify is NULLL
1218 * @param handlers callbacks for messages we care about, NULL-terminated
1219 * @return handle to the core service (only useful for disconnect until 'init' is called);
1220 * NULL on error (in this case, init is never called)
1222 struct GNUNET_CORE_Handle *
1223 GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1224 unsigned int queue_size, void *cls,
1225 GNUNET_CORE_StartupCallback init,
1226 GNUNET_CORE_ConnectEventHandler connects,
1227 GNUNET_CORE_DisconnectEventHandler disconnects,
1228 GNUNET_CORE_MessageCallback inbound_notify,
1229 int inbound_hdr_only,
1230 GNUNET_CORE_MessageCallback outbound_notify,
1231 int outbound_hdr_only,
1232 const struct GNUNET_CORE_MessageHandler *handlers)
1234 struct GNUNET_CORE_Handle *h;
1236 h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle));
1238 h->queue_size = queue_size;
1241 h->connects = connects;
1242 h->disconnects = disconnects;
1243 h->inbound_notify = inbound_notify;
1244 h->outbound_notify = outbound_notify;
1245 h->inbound_hdr_only = inbound_hdr_only;
1246 h->outbound_hdr_only = outbound_hdr_only;
1247 h->handlers = handlers;
1249 h->currently_down = GNUNET_YES;
1250 h->peers = GNUNET_CONTAINER_multihashmap_create (128);
1251 h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1252 if (NULL != handlers)
1253 while (handlers[h->hcnt].callback != NULL)
1255 GNUNET_assert (h->hcnt <
1256 (GNUNET_SERVER_MAX_MESSAGE_SIZE -
1257 sizeof (struct InitMessage)) / sizeof (uint16_t));
1259 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service\n");
1267 * Disconnect from the core service. This function can only
1268 * be called *after* all pending 'GNUNET_CORE_notify_transmit_ready'
1269 * requests have been explicitly canceled.
1271 * @param handle connection to core to disconnect
1274 GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
1276 struct ControlMessage *cm;
1279 LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from CORE service\n");
1281 if (handle->cth != NULL)
1283 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth);
1286 while (NULL != (cm = handle->control_pending_head))
1288 GNUNET_CONTAINER_DLL_remove (handle->control_pending_head,
1289 handle->control_pending_tail, cm);
1292 if (cm->cont != NULL)
1293 cm->cont (cm->cont_cls, GNUNET_SYSERR);
1296 if (handle->client != NULL)
1298 GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
1299 handle->client = NULL;
1301 GNUNET_CONTAINER_multihashmap_iterate (handle->peers,
1302 &disconnect_and_free_peer_entry,
1304 if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1306 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1307 handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1309 GNUNET_CONTAINER_multihashmap_destroy (handle->peers);
1310 handle->peers = NULL;
1311 GNUNET_break (handle->ready_peer_head == NULL);
1312 GNUNET_free (handle);
1317 * Task that calls 'request_next_transmission'.
1319 * @param cls the 'struct PeerRecord*'
1320 * @param tc scheduler context
1323 run_request_next_transmission (void *cls,
1324 const struct GNUNET_SCHEDULER_TaskContext *tc)
1326 struct PeerRecord *pr = cls;
1328 pr->ntr_task = GNUNET_SCHEDULER_NO_TASK;
1329 request_next_transmission (pr);
1334 * Ask the core to call "notify" once it is ready to transmit the
1335 * given number of bytes to the specified "target". Must only be
1336 * called after a connection to the respective peer has been
1337 * established (and the client has been informed about this).
1339 * @param handle connection to core service
1340 * @param cork is corking allowed for this transmission?
1341 * @param priority how important is the message?
1342 * @param maxdelay how long can the message wait?
1343 * @param target who should receive the message,
1344 * use NULL for this peer (loopback)
1345 * @param notify_size how many bytes of buffer space does notify want?
1346 * @param notify function to call when buffer space is available
1347 * @param notify_cls closure for notify
1348 * @return non-NULL if the notify callback was queued,
1349 * NULL if we can not even queue the request (insufficient
1350 * memory); if NULL is returned, "notify" will NOT be called.
1352 struct GNUNET_CORE_TransmitHandle *
1353 GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork,
1355 struct GNUNET_TIME_Relative maxdelay,
1356 const struct GNUNET_PeerIdentity *target,
1358 GNUNET_CONNECTION_TransmitReadyNotify notify,
1361 struct PeerRecord *pr;
1362 struct GNUNET_CORE_TransmitHandle *th;
1363 struct GNUNET_CORE_TransmitHandle *pos;
1364 struct GNUNET_CORE_TransmitHandle *prev;
1365 struct GNUNET_CORE_TransmitHandle *minp;
1367 pr = GNUNET_CONTAINER_multihashmap_get (handle->peers, &target->hashPubKey);
1370 /* attempt to send to peer that is not connected */
1371 LOG (GNUNET_ERROR_TYPE_WARNING,
1372 "Attempting to send to peer `%s' from peer `%s', but not connected!\n",
1373 GNUNET_i2s (target), GNUNET_h2s (&handle->me.hashPubKey));
1377 GNUNET_assert (notify_size + sizeof (struct SendMessage) <
1378 GNUNET_SERVER_MAX_MESSAGE_SIZE);
1379 th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle));
1381 GNUNET_assert (NULL != notify);
1382 th->get_message = notify;
1383 th->get_message_cls = notify_cls;
1384 th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
1385 th->priority = priority;
1386 th->msize = notify_size;
1388 /* bound queue size */
1389 if (pr->queue_size == handle->queue_size)
1391 /* find lowest-priority entry, but skip the head of the list */
1392 minp = pr->pending_head->next;
1394 while (prev != NULL)
1396 if (prev->priority < minp->priority)
1402 GNUNET_break (handle->queue_size != 0);
1403 GNUNET_break (pr->queue_size == 1);
1406 LOG (GNUNET_ERROR_TYPE_DEBUG,
1407 "Dropping transmission request: cannot drop queue head and limit is one\n");
1411 if (priority <= minp->priority)
1414 LOG (GNUNET_ERROR_TYPE_DEBUG,
1415 "Dropping transmission request: priority too low\n");
1418 return NULL; /* priority too low */
1420 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, minp);
1422 GNUNET_assert (0 == minp->get_message (minp->get_message_cls, 0, NULL));
1426 /* Order entries by deadline, but SKIP 'HEAD' if
1427 * we're in the 'ready_peer_*' DLL */
1428 pos = pr->pending_head;
1429 if ((pr->prev != NULL) || (pr->next != NULL) ||
1430 (pr == handle->ready_peer_head))
1432 GNUNET_assert (pos != NULL);
1433 pos = pos->next; /* skip head */
1436 /* insertion sort */
1438 while ((pos != NULL) && (pos->timeout.abs_value < th->timeout.abs_value))
1443 GNUNET_CONTAINER_DLL_insert_after (pr->pending_head, pr->pending_tail, prev,
1446 /* was the request queue previously empty? */
1448 LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission request added to queue\n");
1450 if ((pr->pending_head == th) && (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK) &&
1451 (pr->next == NULL) && (pr->prev == NULL) &&
1452 (handle->ready_peer_head != pr))
1454 GNUNET_SCHEDULER_add_now (&run_request_next_transmission, pr);
1460 * Cancel the specified transmission-ready notification.
1462 * @param th handle that was returned by "notify_transmit_ready".
1465 GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th)
1467 struct PeerRecord *pr = th->peer;
1468 struct GNUNET_CORE_Handle *h = pr->ch;
1471 was_head = (pr->pending_head == th);
1472 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
1476 /* we're currently in the control queue, remove */
1477 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
1478 h->control_pending_tail, th->cm);
1479 GNUNET_free (th->cm);
1484 if ((pr->prev != NULL) || (pr->next != NULL) || (pr == h->ready_peer_head))
1486 /* the request that was 'approved' by core was
1487 * canceled before it could be transmitted; remove
1488 * us from the 'ready' list */
1489 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
1491 request_next_transmission (pr);
1496 /* end of core_api.c */