2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file core/core_api.c
23 * @brief core service; this is the main API for encrypted P2P
25 * @author Christian Grothoff
28 #include "gnunet_constants.h"
29 #include "gnunet_core_service.h"
32 #define LOG(kind,...) GNUNET_log_from (kind, "core-api",__VA_ARGS__)
35 * Information we track for each peer.
41 * We generally do NOT keep peer records in a DLL; this
42 * DLL is only used IF this peer's 'pending_head' message
43 * is ready for transmission.
45 struct PeerRecord *prev;
48 * We generally do NOT keep peer records in a DLL; this
49 * DLL is only used IF this peer's 'pending_head' message
50 * is ready for transmission.
52 struct PeerRecord *next;
55 * Peer the record is about.
57 struct GNUNET_PeerIdentity peer;
60 * Corresponding core handle.
62 struct GNUNET_CORE_Handle *ch;
65 * Head of doubly-linked list of pending requests.
66 * Requests are sorted by deadline *except* for HEAD,
67 * which is only modified upon transmission to core.
69 struct GNUNET_CORE_TransmitHandle *pending_head;
72 * Tail of doubly-linked list of pending requests.
74 struct GNUNET_CORE_TransmitHandle *pending_tail;
77 * ID of timeout task for the 'pending_head' handle
78 * which is the one with the smallest timeout.
80 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
83 * ID of task to run 'next_request_transmission'.
85 GNUNET_SCHEDULER_TaskIdentifier ntr_task;
88 * Current size of the queue of pending requests.
90 unsigned int queue_size;
93 * SendMessageRequest ID generator for this peer.
/**
 * Type of function called upon completion.
 *
 * @param cls closure
 * @param success GNUNET_OK on success (which for request_connect
 *        ONLY means that we transmitted the connect request to CORE,
 *        it does not mean that we are actually now connected!);
 *        GNUNET_NO on timeout (also used when queued control messages
 *        are dropped because the connection to core is being reset),
 *        GNUNET_SYSERR if core was shut down
 */
typedef void (*GNUNET_CORE_ControlContinuation) (void *cls, int success);


/**
 * Entry in a doubly-linked list of control messages to be transmitted
 * to the core service.  Control messages include traffic allocation,
 * connection requests and of course our initial 'init' request.
 *
 * The actual message is allocated at the end of this struct.
 */
struct ControlMessage
{
  /**
   * This is a doubly-linked list.
   */
  struct ControlMessage *next;

  /**
   * This is a doubly-linked list.
   */
  struct ControlMessage *prev;

  /**
   * Function to run after transmission failed/succeeded.
   */
  GNUNET_CORE_ControlContinuation cont;

  /**
   * Closure for 'cont'.
   */
  void *cont_cls;

  /**
   * Transmit handle (if one is associated with this ControlMessage), or NULL.
   */
  struct GNUNET_CORE_TransmitHandle *th;
};
151 * Context for the core service connection.
153 struct GNUNET_CORE_Handle
157 * Configuration we're using.
159 const struct GNUNET_CONFIGURATION_Handle *cfg;
162 * Closure for the various callbacks.
167 * Function to call once we've handshaked with the core service.
169 GNUNET_CORE_StartupCallback init;
172 * Function to call whenever we're notified about a peer connecting.
174 GNUNET_CORE_ConnectEventHandler connects;
177 * Function to call whenever we're notified about a peer disconnecting.
179 GNUNET_CORE_DisconnectEventHandler disconnects;
182 * Function to call whenever we receive an inbound message.
184 GNUNET_CORE_MessageCallback inbound_notify;
187 * Function to call whenever we receive an outbound message.
189 GNUNET_CORE_MessageCallback outbound_notify;
192 * Function handlers for messages of particular type.
194 const struct GNUNET_CORE_MessageHandler *handlers;
197 * Our connection to the service.
199 struct GNUNET_CLIENT_Connection *client;
202 * Handle for our current transmission request.
204 struct GNUNET_CLIENT_TransmitHandle *cth;
207 * Head of doubly-linked list of pending requests.
209 struct ControlMessage *control_pending_head;
212 * Tail of doubly-linked list of pending requests.
214 struct ControlMessage *control_pending_tail;
217 * Head of doubly-linked list of peers that are core-approved
218 * to send their next message.
220 struct PeerRecord *ready_peer_head;
223 * Tail of doubly-linked list of peers that are core-approved
224 * to send their next message.
226 struct PeerRecord *ready_peer_tail;
229 * Hash map listing all of the peers that we are currently
232 struct GNUNET_CONTAINER_MultiHashMap *peers;
235 * Identity of this peer.
237 struct GNUNET_PeerIdentity me;
240 * ID of reconnect task (if any).
242 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
245 * Current delay we use for re-trying to connect to core.
247 struct GNUNET_TIME_Relative retry_backoff;
250 * Number of messages we are allowed to queue per target.
252 unsigned int queue_size;
255 * Number of entries in the handlers array.
260 * For inbound notifications without a specific handler, do
261 * we expect to only receive headers?
263 int inbound_hdr_only;
266 * For outbound notifications without a specific handler, do
267 * we expect to only receive headers?
269 int outbound_hdr_only;
272 * Are we currently disconnected and hence unable to forward
281 * Handle for a transmission request.
283 struct GNUNET_CORE_TransmitHandle
287 * We keep active transmit handles in a doubly-linked list.
289 struct GNUNET_CORE_TransmitHandle *next;
292 * We keep active transmit handles in a doubly-linked list.
294 struct GNUNET_CORE_TransmitHandle *prev;
297 * Corresponding peer record.
299 struct PeerRecord *peer;
302 * Corresponding SEND_REQUEST message. Only non-NULL
303 * while SEND_REQUEST message is pending.
305 struct ControlMessage *cm;
308 * Function that will be called to get the actual request
309 * (once we are ready to transmit this request to the core).
310 * The function will be called with a NULL buffer to signal
313 GNUNET_CONNECTION_TransmitReadyNotify get_message;
316 * Closure for get_message.
318 void *get_message_cls;
321 * Timeout for this handle.
323 struct GNUNET_TIME_Absolute timeout;
326 * How important is this message?
331 * Size of this request.
336 * Send message request ID for this request.
341 * Is corking allowed?
349 * Our current client connection went down. Clean it up
350 * and try to reconnect!
352 * @param h our handle to the core service
355 reconnect (struct GNUNET_CORE_Handle *h);
359 * Task scheduled to try to re-connect to core.
361 * @param cls the 'struct GNUNET_CORE_Handle'
362 * @param tc task context
365 reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
367 struct GNUNET_CORE_Handle *h = cls;
369 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
371 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service after delay\n");
378 * Notify clients about disconnect and free
379 * the entry for connected peer.
381 * @param cls the 'struct GNUNET_CORE_Handle*'
382 * @param key the peer identity (not used)
383 * @param value the 'struct PeerRecord' to free.
384 * @return GNUNET_YES (continue)
387 disconnect_and_free_peer_entry (void *cls, const GNUNET_HashCode * key,
390 struct GNUNET_CORE_Handle *h = cls;
391 struct GNUNET_CORE_TransmitHandle *th;
392 struct PeerRecord *pr = value;
394 if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
396 GNUNET_SCHEDULER_cancel (pr->timeout_task);
397 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
399 if (pr->ntr_task != GNUNET_SCHEDULER_NO_TASK)
401 GNUNET_SCHEDULER_cancel (pr->ntr_task);
402 pr->ntr_task = GNUNET_SCHEDULER_NO_TASK;
404 if ((pr->prev != NULL) || (pr->next != NULL) || (h->ready_peer_head == pr))
405 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
406 if (h->disconnects != NULL)
407 h->disconnects (h->cls, &pr->peer);
408 /* all requests should have been cancelled, clean up anyway, just in case */
409 GNUNET_break (pr->queue_size == 0);
410 while (NULL != (th = pr->pending_head))
413 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
419 /* done with 'voluntary' cleanups, now on to normal freeing */
420 GNUNET_assert (GNUNET_YES ==
421 GNUNET_CONTAINER_multihashmap_remove (h->peers, key, pr));
422 GNUNET_assert (pr->pending_head == NULL);
423 GNUNET_assert (pr->pending_tail == NULL);
424 GNUNET_assert (pr->ch = h);
425 GNUNET_assert (pr->queue_size == 0);
426 GNUNET_assert (pr->timeout_task == GNUNET_SCHEDULER_NO_TASK);
427 GNUNET_assert (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK);
434 * Close down any existing connection to the CORE service and
435 * try re-establishing it later.
437 * @param h our handle
440 reconnect_later (struct GNUNET_CORE_Handle *h)
442 struct ControlMessage *cm;
443 struct PeerRecord *pr;
445 GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
448 GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
451 if (h->client != NULL)
453 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
456 h->currently_down = GNUNET_YES;
457 GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
459 GNUNET_SCHEDULER_add_delayed (h->retry_backoff, &reconnect_task, h);
460 while (NULL != (cm = h->control_pending_head))
462 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
463 h->control_pending_tail, cm);
466 if (cm->cont != NULL)
467 cm->cont (cm->cont_cls, GNUNET_NO);
470 GNUNET_CONTAINER_multihashmap_iterate (h->peers,
471 &disconnect_and_free_peer_entry, h);
472 while (NULL != (pr = h->ready_peer_head))
473 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
474 GNUNET_assert (h->control_pending_head == NULL);
476 GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS, h->retry_backoff);
477 h->retry_backoff = GNUNET_TIME_relative_multiply (h->retry_backoff, 2);
482 * Check the list of pending requests, send the next
485 * @param h core handle
486 * @param ignore_currently_down transmit message even if not initialized?
489 trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down);
493 * The given request hit its timeout. Remove from the
494 * doubly-linked list and call the respective continuation.
496 * @param cls the 'struct PeerRecord' whose head request timed out
497 * @param tc context, can be NULL (!)
500 transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
504 * Send a control message to the core service asking for transmission
505 * of the message in the given peer record.
507 * @param pr peer to request transmission to
510 request_next_transmission (struct PeerRecord *pr)
512 struct GNUNET_CORE_Handle *h = pr->ch;
513 struct ControlMessage *cm;
514 struct SendMessageRequest *smr;
515 struct GNUNET_CORE_TransmitHandle *th;
517 if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
519 GNUNET_SCHEDULER_cancel (pr->timeout_task);
520 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
522 if (NULL == (th = pr->pending_head))
524 trigger_next_request (h, GNUNET_NO);
528 return; /* already done */
529 GNUNET_assert (pr->prev == NULL);
530 GNUNET_assert (pr->next == NULL);
532 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
533 (th->timeout), &transmission_timeout, pr);
534 cm = GNUNET_malloc (sizeof (struct ControlMessage) +
535 sizeof (struct SendMessageRequest));
538 smr = (struct SendMessageRequest *) &cm[1];
539 smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
540 smr->header.size = htons (sizeof (struct SendMessageRequest));
541 smr->priority = htonl (th->priority);
542 smr->deadline = GNUNET_TIME_absolute_hton (th->timeout);
543 smr->peer = pr->peer;
544 smr->queue_size = htonl (pr->queue_size);
545 smr->size = htons (th->msize);
546 smr->smr_id = htons (th->smr_id = pr->smr_id_gen++);
547 GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head,
548 h->control_pending_tail, cm);
550 LOG (GNUNET_ERROR_TYPE_DEBUG,
551 "Adding SEND REQUEST for peer `%s' to message queue\n",
552 GNUNET_i2s (&pr->peer));
554 trigger_next_request (h, GNUNET_NO);
559 * The given request hit its timeout. Remove from the
560 * doubly-linked list and call the respective continuation.
562 * @param cls the 'struct PeerRecord' whose head request timed out
563 * @param tc context, can be NULL (!)
566 transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
568 struct PeerRecord *pr = cls;
569 struct GNUNET_CORE_Handle *h = pr->ch;
570 struct GNUNET_CORE_TransmitHandle *th;
572 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
573 th = pr->pending_head;
574 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
576 if ((pr->prev != NULL) || (pr->next != NULL) || (pr == h->ready_peer_head))
578 /* the request that was 'approved' by core was
579 * canceled before it could be transmitted; remove
580 * us from the 'ready' list */
581 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
584 LOG (GNUNET_ERROR_TYPE_DEBUG,
585 "Signalling timeout of request for transmission to CORE service\n");
587 request_next_transmission (pr);
588 GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL));
594 * Transmit the next message to the core service.
597 transmit_message (void *cls, size_t size, void *buf)
599 struct GNUNET_CORE_Handle *h = cls;
600 struct ControlMessage *cm;
601 struct GNUNET_CORE_TransmitHandle *th;
602 struct PeerRecord *pr;
603 struct SendMessage *sm;
604 const struct GNUNET_MessageHeader *hdr;
608 GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
613 LOG (GNUNET_ERROR_TYPE_DEBUG,
614 "Transmission failed, initiating reconnect\n");
619 /* first check for control messages */
620 if (NULL != (cm = h->control_pending_head))
622 hdr = (const struct GNUNET_MessageHeader *) &cm[1];
623 msize = ntohs (hdr->size);
626 trigger_next_request (h, GNUNET_NO);
630 LOG (GNUNET_ERROR_TYPE_DEBUG,
631 "Transmitting control message with %u bytes of type %u to core.\n",
632 (unsigned int) msize, (unsigned int) ntohs (hdr->type));
634 memcpy (buf, hdr, msize);
635 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
636 h->control_pending_tail, cm);
639 if (NULL != cm->cont)
640 cm->cont (cm->cont_cls, GNUNET_OK);
642 trigger_next_request (h, GNUNET_NO);
645 /* now check for 'ready' P2P messages */
646 if (NULL != (pr = h->ready_peer_head))
648 GNUNET_assert (pr->pending_head != NULL);
649 th = pr->pending_head;
650 if (size < th->msize + sizeof (struct SendMessage))
652 trigger_next_request (h, GNUNET_NO);
655 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
656 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
658 if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
660 GNUNET_SCHEDULER_cancel (pr->timeout_task);
661 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
664 LOG (GNUNET_ERROR_TYPE_DEBUG,
665 "Transmitting SEND request to `%s' with %u bytes.\n",
666 GNUNET_i2s (&pr->peer), (unsigned int) th->msize);
668 sm = (struct SendMessage *) buf;
669 sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
670 sm->priority = htonl (th->priority);
671 sm->deadline = GNUNET_TIME_absolute_hton (th->timeout);
673 sm->cork = htonl ((uint32_t) th->cork);
674 sm->reserved = htonl (0);
676 th->get_message (th->get_message_cls,
677 size - sizeof (struct SendMessage), &sm[1]);
680 LOG (GNUNET_ERROR_TYPE_DEBUG,
681 "Transmitting SEND request to `%s' yielded %u bytes.\n",
682 GNUNET_i2s (&pr->peer), ret);
688 LOG (GNUNET_ERROR_TYPE_DEBUG,
689 "Size of clients message to peer %s is 0!\n",
690 GNUNET_i2s (&pr->peer));
692 /* client decided to send nothing! */
693 request_next_transmission (pr);
697 LOG (GNUNET_ERROR_TYPE_DEBUG,
698 "Produced SEND message to core with %u bytes payload\n",
701 GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
702 if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
705 request_next_transmission (pr);
708 ret += sizeof (struct SendMessage);
709 sm->header.size = htons (ret);
710 GNUNET_assert (ret <= size);
711 request_next_transmission (pr);
719 * Check the list of pending requests, send the next
722 * @param h core handle
723 * @param ignore_currently_down transmit message even if not initialized?
726 trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down)
730 if ((GNUNET_YES == h->currently_down) && (ignore_currently_down == GNUNET_NO))
733 LOG (GNUNET_ERROR_TYPE_DEBUG,
734 "Core connection down, not processing queue\n");
741 LOG (GNUNET_ERROR_TYPE_DEBUG, "Request pending, not processing queue\n");
745 if (h->control_pending_head != NULL)
747 ntohs (((struct GNUNET_MessageHeader *) &h->
748 control_pending_head[1])->size);
749 else if (h->ready_peer_head != NULL)
751 h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage);
755 LOG (GNUNET_ERROR_TYPE_DEBUG,
756 "Request queue empty, not processing queue\n");
758 return; /* no pending message */
761 GNUNET_CLIENT_notify_transmit_ready (h->client, msize,
762 GNUNET_TIME_UNIT_FOREVER_REL,
763 GNUNET_NO, &transmit_message, h);
768 * Handler for notification messages received from the core.
770 * @param cls our "struct GNUNET_CORE_Handle"
771 * @param msg the message received from the core service
774 main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
776 struct GNUNET_CORE_Handle *h = cls;
777 const struct InitReplyMessage *m;
778 const struct ConnectNotifyMessage *cnm;
779 const struct DisconnectNotifyMessage *dnm;
780 const struct NotifyTrafficMessage *ntm;
781 const struct GNUNET_MessageHeader *em;
782 const struct SendMessageReady *smr;
783 const struct GNUNET_CORE_MessageHandler *mh;
784 const struct GNUNET_ATS_Information* ats;
785 GNUNET_CORE_StartupCallback init;
786 struct PeerRecord *pr;
787 struct GNUNET_CORE_TransmitHandle *th;
796 LOG (GNUNET_ERROR_TYPE_INFO,
798 ("Client was disconnected from core service, trying to reconnect.\n"));
802 msize = ntohs (msg->size);
804 LOG (GNUNET_ERROR_TYPE_DEBUG,
805 "Processing message of type %u and size %u from core service\n",
806 ntohs (msg->type), msize);
808 switch (ntohs (msg->type))
810 case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY:
811 if (ntohs (msg->size) != sizeof (struct InitReplyMessage))
817 m = (const struct InitReplyMessage *) msg;
818 GNUNET_break (0 == ntohl (m->reserved));
819 /* start our message processing loop */
820 if (GNUNET_YES == h->currently_down)
822 h->currently_down = GNUNET_NO;
823 trigger_next_request (h, GNUNET_NO);
825 h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
826 h->me = m->my_identity;
827 if (NULL != (init = h->init))
829 /* mark so we don't call init on reconnect */
832 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core service of peer `%s'.\n",
833 GNUNET_i2s (&h->me));
835 init (h->cls, h, &h->me);
840 LOG (GNUNET_ERROR_TYPE_DEBUG,
841 "Successfully reconnected to core service.\n");
844 /* fake 'connect to self' */
845 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &h->me.hashPubKey);
846 GNUNET_assert (pr == NULL);
847 pr = GNUNET_malloc (sizeof (struct PeerRecord));
850 GNUNET_assert (GNUNET_YES ==
851 GNUNET_CONTAINER_multihashmap_put (h->peers,
852 &h->me.hashPubKey, pr,
853 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
854 if (NULL != h->connects)
855 h->connects (h->cls, &h->me, NULL, 0);
857 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT:
858 if (msize < sizeof (struct ConnectNotifyMessage))
864 cnm = (const struct ConnectNotifyMessage *) msg;
865 ats_count = ntohl (cnm->ats_count);
867 sizeof (struct ConnectNotifyMessage) +
868 ats_count * sizeof (struct GNUNET_ATS_Information))
875 LOG (GNUNET_ERROR_TYPE_DEBUG,
876 "Received notification about connection from `%s'.\n",
877 GNUNET_i2s (&cnm->peer));
879 if (0 == memcmp (&h->me, &cnm->peer, sizeof (struct GNUNET_PeerIdentity)))
881 /* connect to self!? */
885 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &cnm->peer.hashPubKey);
892 pr = GNUNET_malloc (sizeof (struct PeerRecord));
893 pr->peer = cnm->peer;
895 GNUNET_assert (GNUNET_YES ==
896 GNUNET_CONTAINER_multihashmap_put (h->peers,
897 &cnm->peer.hashPubKey, pr,
898 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
899 ats = (const struct GNUNET_ATS_Information*) &cnm[1];
900 if (NULL != h->connects)
901 h->connects (h->cls, &cnm->peer,
905 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT:
906 if (msize != sizeof (struct DisconnectNotifyMessage))
912 dnm = (const struct DisconnectNotifyMessage *) msg;
913 if (0 == memcmp (&h->me, &dnm->peer, sizeof (struct GNUNET_PeerIdentity)))
915 /* connection to self!? */
919 GNUNET_break (0 == ntohl (dnm->reserved));
921 LOG (GNUNET_ERROR_TYPE_DEBUG,
922 "Received notification about disconnect from `%s'.\n",
923 GNUNET_i2s (&dnm->peer));
925 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &dnm->peer.hashPubKey);
932 trigger = ((pr->prev != NULL) || (pr->next != NULL) ||
933 (h->ready_peer_head == pr));
934 disconnect_and_free_peer_entry (h, &dnm->peer.hashPubKey, pr);
936 trigger_next_request (h, GNUNET_NO);
938 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND:
939 if (msize < sizeof (struct NotifyTrafficMessage))
945 ntm = (const struct NotifyTrafficMessage *) msg;
947 ats_count = ntohl (ntm->ats_count);
949 sizeof (struct NotifyTrafficMessage) +
950 ats_count * sizeof (struct GNUNET_ATS_Information) +
951 sizeof (struct GNUNET_MessageHeader)) ||
952 (GNUNET_ATS_ARRAY_TERMINATOR !=
953 ntohl ((&ntm->ats)[ats_count].type)))
959 em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count + 1];
961 LOG (GNUNET_ERROR_TYPE_DEBUG,
962 "Received message of type %u and size %u from peer `%4s'\n",
963 ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer));
965 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey);
972 if ((GNUNET_NO == h->inbound_hdr_only) &&
974 ntohs (em->size) + sizeof (struct NotifyTrafficMessage) +
975 +ats_count * sizeof (struct GNUNET_ATS_Information)))
981 et = ntohs (em->type);
982 for (hpos = 0; hpos < h->hcnt; hpos++)
984 mh = &h->handlers[hpos];
987 if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0))
989 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
990 "Unexpected message size for message of type %u\n",
996 h->handlers[hpos].callback (h->cls, &ntm->peer, em, &ntm->ats,
999 /* error in processing, do not process other messages! */
1003 if (NULL != h->inbound_notify)
1004 h->inbound_notify (h->cls, &ntm->peer, em, &ntm->ats,
1007 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND:
1008 if (msize < sizeof (struct NotifyTrafficMessage))
1011 reconnect_later (h);
1014 ntm = (const struct NotifyTrafficMessage *) msg;
1015 if (0 == memcmp (&h->me, &ntm->peer, sizeof (struct GNUNET_PeerIdentity)))
1021 ats_count = ntohl (ntm->ats_count);
1023 sizeof (struct NotifyTrafficMessage) +
1024 ats_count * sizeof (struct GNUNET_ATS_Information) +
1025 sizeof (struct GNUNET_MessageHeader)) ||
1026 (GNUNET_ATS_ARRAY_TERMINATOR !=
1027 ntohl ((&ntm->ats)[ats_count].type)))
1030 reconnect_later (h);
1033 em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count + 1];
1034 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey);
1038 reconnect_later (h);
1042 LOG (GNUNET_ERROR_TYPE_DEBUG,
1043 "Received notification about transmission to `%s'.\n",
1044 GNUNET_i2s (&ntm->peer));
1046 if ((GNUNET_NO == h->outbound_hdr_only) &&
1048 ntohs (em->size) + sizeof (struct NotifyTrafficMessage) +
1049 ats_count * sizeof (struct GNUNET_ATS_Information)))
1052 reconnect_later (h);
1055 if (NULL == h->outbound_notify)
1060 h->outbound_notify (h->cls, &ntm->peer, em, &ntm->ats, ats_count);
1062 case GNUNET_MESSAGE_TYPE_CORE_SEND_READY:
1063 if (msize != sizeof (struct SendMessageReady))
1066 reconnect_later (h);
1069 smr = (const struct SendMessageReady *) msg;
1070 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &smr->peer.hashPubKey);
1074 reconnect_later (h);
1078 LOG (GNUNET_ERROR_TYPE_DEBUG,
1079 "Received notification about transmission readiness to `%s'.\n",
1080 GNUNET_i2s (&smr->peer));
1082 if (pr->pending_head == NULL)
1084 /* request must have been cancelled between the original request
1085 * and the response from core, ignore core's readiness */
1089 th = pr->pending_head;
1090 if (ntohs (smr->smr_id) != th->smr_id)
1092 /* READY message is for expired or cancelled message,
1093 * ignore! (we should have already sent another request) */
1096 if ((pr->prev != NULL) || (pr->next != NULL) || (h->ready_peer_head == pr))
1098 /* we should not already be on the ready list... */
1100 reconnect_later (h);
1103 GNUNET_CONTAINER_DLL_insert (h->ready_peer_head, h->ready_peer_tail, pr);
1104 trigger_next_request (h, GNUNET_NO);
1107 reconnect_later (h);
1110 GNUNET_CLIENT_receive (h->client, &main_notify_handler, h,
1111 GNUNET_TIME_UNIT_FOREVER_REL);
1116 * Task executed once we are done transmitting the INIT message.
1117 * Starts our 'receive' loop.
1119 * @param cls the 'struct GNUNET_CORE_Handle'
1120 * @param success were we successful
1123 init_done_task (void *cls, int success)
1125 struct GNUNET_CORE_Handle *h = cls;
1127 if (success == GNUNET_SYSERR)
1128 return; /* shutdown */
1129 if (success == GNUNET_NO)
1132 LOG (GNUNET_ERROR_TYPE_DEBUG,
1133 "Failed to exchange INIT with core, retrying\n");
1135 if (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK)
1136 reconnect_later (h);
1139 GNUNET_CLIENT_receive (h->client, &main_notify_handler, h,
1140 GNUNET_TIME_UNIT_FOREVER_REL);
1145 * Our current client connection went down. Clean it up
1146 * and try to reconnect!
1148 * @param h our handle to the core service
1151 reconnect (struct GNUNET_CORE_Handle *h)
1153 struct ControlMessage *cm;
1154 struct InitMessage *init;
1161 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting to CORE service\n");
1163 GNUNET_assert (h->client == NULL);
1164 GNUNET_assert (h->currently_down == GNUNET_YES);
1165 h->client = GNUNET_CLIENT_connect ("core", h->cfg);
1166 if (h->client == NULL)
1168 reconnect_later (h);
1171 msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage);
1172 cm = GNUNET_malloc (sizeof (struct ControlMessage) + msize);
1173 cm->cont = &init_done_task;
1175 init = (struct InitMessage *) &cm[1];
1176 init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT);
1177 init->header.size = htons (msize);
1179 if (h->inbound_notify != NULL)
1181 if (h->inbound_hdr_only)
1182 opt |= GNUNET_CORE_OPTION_SEND_HDR_INBOUND;
1184 opt |= GNUNET_CORE_OPTION_SEND_FULL_INBOUND;
1186 if (h->outbound_notify != NULL)
1188 if (h->outbound_hdr_only)
1189 opt |= GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND;
1191 opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND;
1193 init->options = htonl (opt);
1194 ts = (uint16_t *) & init[1];
1195 for (hpos = 0; hpos < h->hcnt; hpos++)
1196 ts[hpos] = htons (h->handlers[hpos].type);
1197 GNUNET_CONTAINER_DLL_insert (h->control_pending_head, h->control_pending_tail,
1199 trigger_next_request (h, GNUNET_YES);
1205 * Connect to the core service. Note that the connection may
1206 * complete (or fail) asynchronously.
1208 * @param cfg configuration to use
1209 * @param queue_size size of the per-peer message queue
1210 * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
1211 * @param init callback to call on timeout or once we have successfully
1212 * connected to the core service; note that timeout is only meaningful if init is not NULL
1213 * @param connects function to call on peer connect, can be NULL
1214 * @param disconnects function to call on peer disconnect / timeout, can be NULL
1215 * @param inbound_notify function to call for all inbound messages, can be NULL
1216 * @param inbound_hdr_only set to GNUNET_YES if inbound_notify will only read the
1217 * GNUNET_MessageHeader and hence we do not need to give it the full message;
1218 * can be used to improve efficiency, ignored if inbound_notify is NULL
1219 * @param outbound_notify function to call for all outbound messages, can be NULL
1220 * @param outbound_hdr_only set to GNUNET_YES if outbound_notify will only read the
1221 * GNUNET_MessageHeader and hence we do not need to give it the full message
1222 * can be used to improve efficiency, ignored if outbound_notify is NULL
1223 * @param handlers callbacks for messages we care about, NULL-terminated
1224 * @return handle to the core service (only useful for disconnect until 'init' is called);
1225 * NULL on error (in this case, init is never called)
1227 struct GNUNET_CORE_Handle *
1228 GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1229 unsigned int queue_size, void *cls,
1230 GNUNET_CORE_StartupCallback init,
1231 GNUNET_CORE_ConnectEventHandler connects,
1232 GNUNET_CORE_DisconnectEventHandler disconnects,
1233 GNUNET_CORE_MessageCallback inbound_notify,
1234 int inbound_hdr_only,
1235 GNUNET_CORE_MessageCallback outbound_notify,
1236 int outbound_hdr_only,
1237 const struct GNUNET_CORE_MessageHandler *handlers)
1239 struct GNUNET_CORE_Handle *h;
1241 h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle));
1243 h->queue_size = queue_size;
1246 h->connects = connects;
1247 h->disconnects = disconnects;
1248 h->inbound_notify = inbound_notify;
1249 h->outbound_notify = outbound_notify;
1250 h->inbound_hdr_only = inbound_hdr_only;
1251 h->outbound_hdr_only = outbound_hdr_only;
1252 h->handlers = handlers;
1254 h->currently_down = GNUNET_YES;
1255 h->peers = GNUNET_CONTAINER_multihashmap_create (128);
1256 h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1257 if (NULL != handlers)
1258 while (handlers[h->hcnt].callback != NULL)
1260 GNUNET_assert (h->hcnt <
1261 (GNUNET_SERVER_MAX_MESSAGE_SIZE -
1262 sizeof (struct InitMessage)) / sizeof (uint16_t));
1264 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service\n");
1272 * Disconnect from the core service. This function can only
1273 * be called *after* all pending 'GNUNET_CORE_notify_transmit_ready'
1274 * requests have been explicitly canceled.
1276 * @param handle connection to core to disconnect
1279 GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
1281 struct ControlMessage *cm;
1284 LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from CORE service\n");
1286 if (handle->cth != NULL)
1288 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth);
1291 while (NULL != (cm = handle->control_pending_head))
1293 GNUNET_CONTAINER_DLL_remove (handle->control_pending_head,
1294 handle->control_pending_tail, cm);
1297 if (cm->cont != NULL)
1298 cm->cont (cm->cont_cls, GNUNET_SYSERR);
1301 if (handle->client != NULL)
1303 GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
1304 handle->client = NULL;
1306 GNUNET_CONTAINER_multihashmap_iterate (handle->peers,
1307 &disconnect_and_free_peer_entry,
1309 if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1311 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1312 handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1314 GNUNET_CONTAINER_multihashmap_destroy (handle->peers);
1315 handle->peers = NULL;
1316 GNUNET_break (handle->ready_peer_head == NULL);
1317 GNUNET_free (handle);
1322 * Task that calls 'request_next_transmission'.
1324 * @param cls the 'struct PeerRecord*'
1325 * @param tc scheduler context
1328 run_request_next_transmission (void *cls,
1329 const struct GNUNET_SCHEDULER_TaskContext *tc)
1331 struct PeerRecord *pr = cls;
1333 pr->ntr_task = GNUNET_SCHEDULER_NO_TASK;
1334 request_next_transmission (pr);
1339 * Ask the core to call "notify" once it is ready to transmit the
1340 * given number of bytes to the specified "target". Must only be
1341 * called after a connection to the respective peer has been
1342 * established (and the client has been informed about this).
1344 * @param handle connection to core service
1345 * @param cork is corking allowed for this transmission?
1346 * @param priority how important is the message?
1347 * @param maxdelay how long can the message wait?
1348 * @param target who should receive the message,
1349 * use NULL for this peer (loopback)
1350 * @param notify_size how many bytes of buffer space does notify want?
1351 * @param notify function to call when buffer space is available
1352 * @param notify_cls closure for notify
1353 * @return non-NULL if the notify callback was queued,
1354 * NULL if we can not even queue the request (insufficient
1355 * memory); if NULL is returned, "notify" will NOT be called.
1357 struct GNUNET_CORE_TransmitHandle *
1358 GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork,
1360 struct GNUNET_TIME_Relative maxdelay,
1361 const struct GNUNET_PeerIdentity *target,
1363 GNUNET_CONNECTION_TransmitReadyNotify notify,
1366 struct PeerRecord *pr;
1367 struct GNUNET_CORE_TransmitHandle *th;
1368 struct GNUNET_CORE_TransmitHandle *pos;
1369 struct GNUNET_CORE_TransmitHandle *prev;
1370 struct GNUNET_CORE_TransmitHandle *minp;
1372 pr = GNUNET_CONTAINER_multihashmap_get (handle->peers, &target->hashPubKey);
1375 /* attempt to send to peer that is not connected */
1376 LOG (GNUNET_ERROR_TYPE_WARNING,
1377 "Attempting to send to peer `%s' from peer `%s', but not connected!\n",
1378 GNUNET_i2s (target), GNUNET_h2s (&handle->me.hashPubKey));
1382 GNUNET_assert (notify_size + sizeof (struct SendMessage) <
1383 GNUNET_SERVER_MAX_MESSAGE_SIZE);
1384 th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle));
1386 GNUNET_assert (NULL != notify);
1387 th->get_message = notify;
1388 th->get_message_cls = notify_cls;
1389 th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
1390 th->priority = priority;
1391 th->msize = notify_size;
1393 /* bound queue size */
1394 if (pr->queue_size == handle->queue_size)
1396 /* find lowest-priority entry, but skip the head of the list */
1397 minp = pr->pending_head->next;
1399 while (prev != NULL)
1401 if (prev->priority < minp->priority)
1407 GNUNET_break (handle->queue_size != 0);
1408 GNUNET_break (pr->queue_size == 1);
1411 LOG (GNUNET_ERROR_TYPE_DEBUG,
1412 "Dropping transmission request: cannot drop queue head and limit is one\n");
1416 if (priority <= minp->priority)
1419 LOG (GNUNET_ERROR_TYPE_DEBUG,
1420 "Dropping transmission request: priority too low\n");
1423 return NULL; /* priority too low */
1425 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, minp);
1427 GNUNET_assert (0 == minp->get_message (minp->get_message_cls, 0, NULL));
1431 /* Order entries by deadline, but SKIP 'HEAD' if
1432 * we're in the 'ready_peer_*' DLL */
1433 pos = pr->pending_head;
1434 if ((pr->prev != NULL) || (pr->next != NULL) ||
1435 (pr == handle->ready_peer_head))
1437 GNUNET_assert (pos != NULL);
1438 pos = pos->next; /* skip head */
1441 /* insertion sort */
1443 while ((pos != NULL) && (pos->timeout.abs_value < th->timeout.abs_value))
1448 GNUNET_CONTAINER_DLL_insert_after (pr->pending_head, pr->pending_tail, prev,
1451 /* was the request queue previously empty? */
1453 LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission request added to queue\n");
1455 if ((pr->pending_head == th) && (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK) &&
1456 (pr->next == NULL) && (pr->prev == NULL) &&
1457 (handle->ready_peer_head != pr))
1459 GNUNET_SCHEDULER_add_now (&run_request_next_transmission, pr);
1465 * Cancel the specified transmission-ready notification.
1467 * @param th handle that was returned by "notify_transmit_ready".
1470 GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th)
1472 struct PeerRecord *pr = th->peer;
1473 struct GNUNET_CORE_Handle *h = pr->ch;
1476 was_head = (pr->pending_head == th);
1477 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
1481 /* we're currently in the control queue, remove */
1482 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
1483 h->control_pending_tail, th->cm);
1484 GNUNET_free (th->cm);
1489 if ((pr->prev != NULL) || (pr->next != NULL) || (pr == h->ready_peer_head))
1491 /* the request that was 'approved' by core was
1492 * canceled before it could be transmitted; remove
1493 * us from the 'ready' list */
1494 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
1496 request_next_transmission (pr);
1501 /* end of core_api.c */