2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file core/core_api.c
23 * @brief core service; this is the main API for encrypted P2P
25 * @author Christian Grothoff
28 #include "gnunet_constants.h"
29 #include "gnunet_core_service.h"
32 #define LOG(kind,...) GNUNET_log_from (kind, "core-api",__VA_ARGS__)
35 * Information we track for each peer.
41 * We generally do NOT keep peer records in a DLL; this
42 * DLL is only used IF this peer's 'pending_head' message
43 * is ready for transmission.
45 struct PeerRecord *prev;
48 * We generally do NOT keep peer records in a DLL; this
49 * DLL is only used IF this peer's 'pending_head' message
50 * is ready for transmission.
52 struct PeerRecord *next;
55 * Peer the record is about.
57 struct GNUNET_PeerIdentity peer;
60 * Corresponding core handle.
62 struct GNUNET_CORE_Handle *ch;
65 * Head of doubly-linked list of pending requests.
66 * Requests are sorted by deadline *except* for HEAD,
67 * which is only modified upon transmission to core.
69 struct GNUNET_CORE_TransmitHandle *pending_head;
72 * Tail of doubly-linked list of pending requests.
74 struct GNUNET_CORE_TransmitHandle *pending_tail;
77 * ID of timeout task for the 'pending_head' handle
78 * which is the one with the smallest timeout.
80 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
83 * ID of task to run 'next_request_transmission'.
85 GNUNET_SCHEDULER_TaskIdentifier ntr_task;
88 * Current size of the queue of pending requests.
90 unsigned int queue_size;
93 * SendMessageRequest ID generator for this peer.
101 * Type of function called upon completion.
104 * @param success GNUNET_OK on success (which for request_connect
105 * ONLY means that we transmitted the connect request to CORE,
106 * it does not mean that we are actually now connected!);
107 * GNUNET_NO on timeout,
108 * GNUNET_SYSERR if core was shut down
110 typedef void (*GNUNET_CORE_ControlContinuation) (void *cls, int success);
114 * Entry in a doubly-linked list of control messages to be transmitted
115 * to the core service. Control messages include traffic allocation,
116 * connection requests and of course our initial 'init' request.
118 * The actual message is allocated at the end of this struct.
120 struct ControlMessage
123 * This is a doubly-linked list.
125 struct ControlMessage *next;
128 * This is a doubly-linked list.
130 struct ControlMessage *prev;
133 * Function to run after transmission failed/succeeded.
135 GNUNET_CORE_ControlContinuation cont;
138 * Closure for 'cont'.
143 * Transmit handle (if one is associated with this ControlMessage), or NULL.
145 struct GNUNET_CORE_TransmitHandle *th;
151 * Context for the core service connection.
153 struct GNUNET_CORE_Handle
157 * Configuration we're using.
159 const struct GNUNET_CONFIGURATION_Handle *cfg;
162 * Closure for the various callbacks.
167 * Function to call once we've handshaked with the core service.
169 GNUNET_CORE_StartupCallback init;
172 * Function to call whenever we're notified about a peer connecting.
174 GNUNET_CORE_ConnectEventHandler connects;
177 * Function to call whenever we're notified about a peer disconnecting.
179 GNUNET_CORE_DisconnectEventHandler disconnects;
182 * Function to call whenever we receive an inbound message.
184 GNUNET_CORE_MessageCallback inbound_notify;
187 * Function to call whenever we receive an outbound message.
189 GNUNET_CORE_MessageCallback outbound_notify;
192 * Function handlers for messages of particular type.
194 const struct GNUNET_CORE_MessageHandler *handlers;
197 * Our connection to the service.
199 struct GNUNET_CLIENT_Connection *client;
202 * Handle for our current transmission request.
204 struct GNUNET_CLIENT_TransmitHandle *cth;
207 * Head of doubly-linked list of pending requests.
209 struct ControlMessage *control_pending_head;
212 * Tail of doubly-linked list of pending requests.
214 struct ControlMessage *control_pending_tail;
217 * Head of doubly-linked list of peers that are core-approved
218 * to send their next message.
220 struct PeerRecord *ready_peer_head;
223 * Tail of doubly-linked list of peers that are core-approved
224 * to send their next message.
226 struct PeerRecord *ready_peer_tail;
229 * Hash map listing all of the peers that we are currently
232 struct GNUNET_CONTAINER_MultiHashMap *peers;
235 * Identity of this peer.
237 struct GNUNET_PeerIdentity me;
240 * ID of reconnect task (if any).
242 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
245 * Current delay we use for re-trying to connect to core.
247 struct GNUNET_TIME_Relative retry_backoff;
250 * Number of messages we are allowed to queue per target.
252 unsigned int queue_size;
255 * Number of entries in the handlers array.
260 * For inbound notifications without a specific handler, do
261 * we expect to only receive headers?
263 int inbound_hdr_only;
266 * For outbound notifications without a specific handler, do
267 * we expect to only receive headers?
269 int outbound_hdr_only;
272 * Are we currently disconnected and hence unable to forward
281 * Handle for a transmission request.
283 struct GNUNET_CORE_TransmitHandle
287 * We keep active transmit handles in a doubly-linked list.
289 struct GNUNET_CORE_TransmitHandle *next;
292 * We keep active transmit handles in a doubly-linked list.
294 struct GNUNET_CORE_TransmitHandle *prev;
297 * Corresponding peer record.
299 struct PeerRecord *peer;
302 * Corresponding SEND_REQUEST message. Only non-NULL
303 * while SEND_REQUEST message is pending.
305 struct ControlMessage *cm;
308 * Function that will be called to get the actual request
309 * (once we are ready to transmit this request to the core).
310 * The function will be called with a NULL buffer to signal
313 GNUNET_CONNECTION_TransmitReadyNotify get_message;
316 * Closure for get_message.
318 void *get_message_cls;
321 * Timeout for this handle.
323 struct GNUNET_TIME_Absolute timeout;
326 * How important is this message?
331 * Size of this request.
336 * Send message request ID for this request.
341 * Is corking allowed?
349 * Our current client connection went down. Clean it up
350 * and try to reconnect!
352 * @param h our handle to the core service
355 reconnect (struct GNUNET_CORE_Handle *h);
359 * Task schedule to try to re-connect to core.
361 * @param cls the 'struct GNUNET_CORE_Handle'
362 * @param tc task context
365 reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
367 struct GNUNET_CORE_Handle *h = cls;
369 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
370 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service after delay\n");
376 * Notify clients about disconnect and free
377 * the entry for connected peer.
379 * @param cls the 'struct GNUNET_CORE_Handle*'
380 * @param key the peer identity (not used)
381 * @param value the 'struct PeerRecord' to free.
382 * @return GNUNET_YES (continue)
385 disconnect_and_free_peer_entry (void *cls, const GNUNET_HashCode * key,
388 struct GNUNET_CORE_Handle *h = cls;
389 struct GNUNET_CORE_TransmitHandle *th;
390 struct PeerRecord *pr = value;
392 if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
394 GNUNET_SCHEDULER_cancel (pr->timeout_task);
395 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
397 if (pr->ntr_task != GNUNET_SCHEDULER_NO_TASK)
399 GNUNET_SCHEDULER_cancel (pr->ntr_task);
400 pr->ntr_task = GNUNET_SCHEDULER_NO_TASK;
402 if ((pr->prev != NULL) || (pr->next != NULL) || (h->ready_peer_head == pr))
403 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
404 if (h->disconnects != NULL)
405 h->disconnects (h->cls, &pr->peer);
406 /* all requests should have been cancelled, clean up anyway, just in case */
407 GNUNET_break (pr->queue_size == 0);
408 while (NULL != (th = pr->pending_head))
411 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
417 /* done with 'voluntary' cleanups, now on to normal freeing */
418 GNUNET_assert (GNUNET_YES ==
419 GNUNET_CONTAINER_multihashmap_remove (h->peers, key, pr));
420 GNUNET_assert (pr->pending_head == NULL);
421 GNUNET_assert (pr->pending_tail == NULL);
422 GNUNET_assert (pr->ch == h);
423 GNUNET_assert (pr->queue_size == 0);
424 GNUNET_assert (pr->timeout_task == GNUNET_SCHEDULER_NO_TASK);
425 GNUNET_assert (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK);
432 * Close down any existing connection to the CORE service and
433 * try re-establishing it later.
435 * @param h our handle
438 reconnect_later (struct GNUNET_CORE_Handle *h)
440 struct ControlMessage *cm;
441 struct PeerRecord *pr;
443 GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
446 GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
449 if (h->client != NULL)
451 GNUNET_CLIENT_disconnect (h->client);
454 h->currently_down = GNUNET_YES;
455 GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
457 GNUNET_SCHEDULER_add_delayed (h->retry_backoff, &reconnect_task, h);
458 while (NULL != (cm = h->control_pending_head))
460 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
461 h->control_pending_tail, cm);
464 if (cm->cont != NULL)
465 cm->cont (cm->cont_cls, GNUNET_NO);
468 GNUNET_CONTAINER_multihashmap_iterate (h->peers,
469 &disconnect_and_free_peer_entry, h);
470 while (NULL != (pr = h->ready_peer_head))
471 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
472 GNUNET_assert (h->control_pending_head == NULL);
474 GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS, h->retry_backoff);
475 h->retry_backoff = GNUNET_TIME_relative_multiply (h->retry_backoff, 2);
480 * Check the list of pending requests, send the next
483 * @param h core handle
484 * @param ignore_currently_down transmit message even if not initialized?
487 trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down);
491 * The given request hit its timeout. Remove from the
492 * doubly-linked list and call the respective continuation.
494 * @param cls the transmit handle of the request that timed out
495 * @param tc context, can be NULL (!)
498 transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
502 * Send a control message to the peer asking for transmission
503 * of the message in the given peer record.
505 * @param pr peer to request transmission to
508 request_next_transmission (struct PeerRecord *pr)
510 struct GNUNET_CORE_Handle *h = pr->ch;
511 struct ControlMessage *cm;
512 struct SendMessageRequest *smr;
513 struct GNUNET_CORE_TransmitHandle *th;
515 if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
517 GNUNET_SCHEDULER_cancel (pr->timeout_task);
518 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
520 if (NULL == (th = pr->pending_head))
522 trigger_next_request (h, GNUNET_NO);
526 return; /* already done */
527 GNUNET_assert (pr->prev == NULL);
528 GNUNET_assert (pr->next == NULL);
530 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
531 (th->timeout), &transmission_timeout, pr);
532 cm = GNUNET_malloc (sizeof (struct ControlMessage) +
533 sizeof (struct SendMessageRequest));
536 smr = (struct SendMessageRequest *) &cm[1];
537 smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
538 smr->header.size = htons (sizeof (struct SendMessageRequest));
539 smr->priority = htonl (th->priority);
540 smr->deadline = GNUNET_TIME_absolute_hton (th->timeout);
541 smr->peer = pr->peer;
542 smr->queue_size = htonl (pr->queue_size);
543 smr->size = htons (th->msize);
544 smr->smr_id = htons (th->smr_id = pr->smr_id_gen++);
545 GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head,
546 h->control_pending_tail, cm);
547 LOG (GNUNET_ERROR_TYPE_DEBUG,
548 "Adding SEND REQUEST for peer `%s' to message queue\n",
549 GNUNET_i2s (&pr->peer));
550 trigger_next_request (h, GNUNET_NO);
555 * The given request hit its timeout. Remove from the
556 * doubly-linked list and call the respective continuation.
558 * @param cls the transmit handle of the request that timed out
559 * @param tc context, can be NULL (!)
562 transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
564 struct PeerRecord *pr = cls;
565 struct GNUNET_CORE_Handle *h = pr->ch;
566 struct GNUNET_CORE_TransmitHandle *th;
568 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
569 th = pr->pending_head;
570 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
572 if ((pr->prev != NULL) || (pr->next != NULL) || (pr == h->ready_peer_head))
574 /* the request that was 'approved' by core was
575 * canceled before it could be transmitted; remove
576 * us from the 'ready' list */
577 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
579 LOG (GNUNET_ERROR_TYPE_DEBUG,
580 "Signalling timeout of request for transmission to CORE service\n");
581 request_next_transmission (pr);
582 GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL));
588 * Transmit the next message to the core service.
591 transmit_message (void *cls, size_t size, void *buf)
593 struct GNUNET_CORE_Handle *h = cls;
594 struct ControlMessage *cm;
595 struct GNUNET_CORE_TransmitHandle *th;
596 struct PeerRecord *pr;
597 struct SendMessage *sm;
598 const struct GNUNET_MessageHeader *hdr;
602 GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
606 LOG (GNUNET_ERROR_TYPE_DEBUG,
607 "Transmission failed, initiating reconnect\n");
611 /* first check for control messages */
612 if (NULL != (cm = h->control_pending_head))
614 hdr = (const struct GNUNET_MessageHeader *) &cm[1];
615 msize = ntohs (hdr->size);
618 trigger_next_request (h, GNUNET_NO);
621 LOG (GNUNET_ERROR_TYPE_DEBUG,
622 "Transmitting control message with %u bytes of type %u to core.\n",
623 (unsigned int) msize, (unsigned int) ntohs (hdr->type));
624 memcpy (buf, hdr, msize);
625 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
626 h->control_pending_tail, cm);
629 if (NULL != cm->cont)
630 cm->cont (cm->cont_cls, GNUNET_OK);
632 trigger_next_request (h, GNUNET_NO);
635 /* now check for 'ready' P2P messages */
636 if (NULL != (pr = h->ready_peer_head))
638 GNUNET_assert (pr->pending_head != NULL);
639 th = pr->pending_head;
640 if (size < th->msize + sizeof (struct SendMessage))
642 trigger_next_request (h, GNUNET_NO);
645 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
646 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
648 if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
650 GNUNET_SCHEDULER_cancel (pr->timeout_task);
651 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
653 LOG (GNUNET_ERROR_TYPE_DEBUG,
654 "Transmitting SEND request to `%s' with %u bytes.\n",
655 GNUNET_i2s (&pr->peer), (unsigned int) th->msize);
656 sm = (struct SendMessage *) buf;
657 sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
658 sm->priority = htonl (th->priority);
659 sm->deadline = GNUNET_TIME_absolute_hton (th->timeout);
661 sm->cork = htonl ((uint32_t) th->cork);
662 sm->reserved = htonl (0);
664 th->get_message (th->get_message_cls,
665 size - sizeof (struct SendMessage), &sm[1]);
667 LOG (GNUNET_ERROR_TYPE_DEBUG,
668 "Transmitting SEND request to `%s' yielded %u bytes.\n",
669 GNUNET_i2s (&pr->peer), ret);
673 LOG (GNUNET_ERROR_TYPE_DEBUG,
674 "Size of clients message to peer %s is 0!\n",
675 GNUNET_i2s (&pr->peer));
676 /* client decided to send nothing! */
677 request_next_transmission (pr);
680 LOG (GNUNET_ERROR_TYPE_DEBUG,
681 "Produced SEND message to core with %u bytes payload\n",
683 GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
684 if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
687 request_next_transmission (pr);
690 ret += sizeof (struct SendMessage);
691 sm->header.size = htons (ret);
692 GNUNET_assert (ret <= size);
693 request_next_transmission (pr);
701 * Check the list of pending requests, send the next
704 * @param h core handle
705 * @param ignore_currently_down transmit message even if not initialized?
708 trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down)
712 if ((GNUNET_YES == h->currently_down) && (ignore_currently_down == GNUNET_NO))
714 LOG (GNUNET_ERROR_TYPE_DEBUG,
715 "Core connection down, not processing queue\n");
720 LOG (GNUNET_ERROR_TYPE_DEBUG, "Request pending, not processing queue\n");
723 if (h->control_pending_head != NULL)
725 ntohs (((struct GNUNET_MessageHeader *) &h->
726 control_pending_head[1])->size);
727 else if (h->ready_peer_head != NULL)
729 h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage);
732 LOG (GNUNET_ERROR_TYPE_DEBUG,
733 "Request queue empty, not processing queue\n");
734 return; /* no pending message */
737 GNUNET_CLIENT_notify_transmit_ready (h->client, msize,
738 GNUNET_TIME_UNIT_FOREVER_REL,
739 GNUNET_NO, &transmit_message, h);
744 * Handler for notification messages received from the core.
746 * @param cls our "struct GNUNET_CORE_Handle"
747 * @param msg the message received from the core service
750 main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
752 struct GNUNET_CORE_Handle *h = cls;
753 const struct InitReplyMessage *m;
754 const struct ConnectNotifyMessage *cnm;
755 const struct DisconnectNotifyMessage *dnm;
756 const struct NotifyTrafficMessage *ntm;
757 const struct GNUNET_MessageHeader *em;
758 const struct SendMessageReady *smr;
759 const struct GNUNET_CORE_MessageHandler *mh;
760 const struct GNUNET_ATS_Information *ats;
761 GNUNET_CORE_StartupCallback init;
762 struct PeerRecord *pr;
763 struct GNUNET_CORE_TransmitHandle *th;
772 LOG (GNUNET_ERROR_TYPE_INFO,
774 ("Client was disconnected from core service, trying to reconnect.\n"));
778 msize = ntohs (msg->size);
779 LOG (GNUNET_ERROR_TYPE_DEBUG,
780 "Processing message of type %u and size %u from core service\n",
781 ntohs (msg->type), msize);
782 switch (ntohs (msg->type))
784 case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY:
785 if (ntohs (msg->size) != sizeof (struct InitReplyMessage))
791 m = (const struct InitReplyMessage *) msg;
792 GNUNET_break (0 == ntohl (m->reserved));
793 /* start our message processing loop */
794 if (GNUNET_YES == h->currently_down)
796 h->currently_down = GNUNET_NO;
797 trigger_next_request (h, GNUNET_NO);
799 h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
800 h->me = m->my_identity;
801 if (NULL != (init = h->init))
803 /* mark so we don't call init on reconnect */
805 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core service of peer `%s'.\n",
806 GNUNET_i2s (&h->me));
807 init (h->cls, h, &h->me);
811 LOG (GNUNET_ERROR_TYPE_DEBUG,
812 "Successfully reconnected to core service.\n");
814 /* fake 'connect to self' */
815 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &h->me.hashPubKey);
816 GNUNET_assert (NULL == pr);
817 pr = GNUNET_malloc (sizeof (struct PeerRecord));
820 GNUNET_assert (GNUNET_YES ==
821 GNUNET_CONTAINER_multihashmap_put (h->peers,
822 &h->me.hashPubKey, pr,
823 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
824 if (NULL != h->connects)
825 h->connects (h->cls, &h->me, NULL, 0);
827 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT:
828 if (msize < sizeof (struct ConnectNotifyMessage))
834 cnm = (const struct ConnectNotifyMessage *) msg;
835 ats_count = ntohl (cnm->ats_count);
837 sizeof (struct ConnectNotifyMessage) +
838 ats_count * sizeof (struct GNUNET_ATS_Information))
844 LOG (GNUNET_ERROR_TYPE_DEBUG,
845 "Received notification about connection from `%s'.\n",
846 GNUNET_i2s (&cnm->peer));
847 if (0 == memcmp (&h->me, &cnm->peer, sizeof (struct GNUNET_PeerIdentity)))
849 /* connect to self!? */
853 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &cnm->peer.hashPubKey);
860 pr = GNUNET_malloc (sizeof (struct PeerRecord));
861 pr->peer = cnm->peer;
863 GNUNET_assert (GNUNET_YES ==
864 GNUNET_CONTAINER_multihashmap_put (h->peers,
865 &cnm->peer.hashPubKey, pr,
866 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
867 ats = (const struct GNUNET_ATS_Information *) &cnm[1];
868 if (NULL != h->connects)
869 h->connects (h->cls, &cnm->peer, ats, ats_count);
871 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT:
872 if (msize != sizeof (struct DisconnectNotifyMessage))
878 dnm = (const struct DisconnectNotifyMessage *) msg;
879 if (0 == memcmp (&h->me, &dnm->peer, sizeof (struct GNUNET_PeerIdentity)))
881 /* connection to self!? */
885 GNUNET_break (0 == ntohl (dnm->reserved));
886 LOG (GNUNET_ERROR_TYPE_DEBUG,
887 "Received notification about disconnect from `%s'.\n",
888 GNUNET_i2s (&dnm->peer));
889 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &dnm->peer.hashPubKey);
896 trigger = ((pr->prev != NULL) || (pr->next != NULL) ||
897 (h->ready_peer_head == pr));
898 disconnect_and_free_peer_entry (h, &dnm->peer.hashPubKey, pr);
900 trigger_next_request (h, GNUNET_NO);
902 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND:
903 if (msize < sizeof (struct NotifyTrafficMessage))
909 ntm = (const struct NotifyTrafficMessage *) msg;
910 ats_count = ntohl (ntm->ats_count);
912 sizeof (struct NotifyTrafficMessage) +
913 ats_count * sizeof (struct GNUNET_ATS_Information) +
914 sizeof (struct GNUNET_MessageHeader)) )
920 ats = (const struct GNUNET_ATS_Information*) &ntm[1];
921 em = (const struct GNUNET_MessageHeader *) &ats[ats_count];
922 LOG (GNUNET_ERROR_TYPE_DEBUG,
923 "Received message of type %u and size %u from peer `%4s'\n",
924 ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer));
925 if ((GNUNET_NO == h->inbound_hdr_only) &&
927 ntohs (em->size) + sizeof (struct NotifyTrafficMessage) +
928 +ats_count * sizeof (struct GNUNET_ATS_Information)))
934 et = ntohs (em->type);
935 for (hpos = 0; hpos < h->hcnt; hpos++)
937 mh = &h->handlers[hpos];
940 if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0))
942 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
943 "Unexpected message size %u for message of type %u from peer `%4s'\n",
944 htons (em->size), mh->type, GNUNET_i2s (&ntm->peer));
948 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey);
956 h->handlers[hpos].callback (h->cls, &ntm->peer, em, ats,
959 /* error in processing, do not process other messages! */
963 if (NULL != h->inbound_notify)
964 h->inbound_notify (h->cls, &ntm->peer, em, ats, ats_count);
966 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND:
967 if (msize < sizeof (struct NotifyTrafficMessage))
973 ntm = (const struct NotifyTrafficMessage *) msg;
974 if (0 == memcmp (&h->me, &ntm->peer, sizeof (struct GNUNET_PeerIdentity)))
980 ats_count = ntohl (ntm->ats_count);
982 sizeof (struct NotifyTrafficMessage) +
983 ats_count * sizeof (struct GNUNET_ATS_Information) +
984 sizeof (struct GNUNET_MessageHeader)) )
990 ats = (const struct GNUNET_ATS_Information*) &ntm[1];
991 em = (const struct GNUNET_MessageHeader *) &ats[ats_count];
992 LOG (GNUNET_ERROR_TYPE_DEBUG,
993 "Received notification about transmission to `%s'.\n",
994 GNUNET_i2s (&ntm->peer));
995 if ((GNUNET_NO == h->outbound_hdr_only) &&
997 ntohs (em->size) + sizeof (struct NotifyTrafficMessage) +
998 ats_count * sizeof (struct GNUNET_ATS_Information)))
1001 reconnect_later (h);
1004 if (NULL == h->outbound_notify)
1009 h->outbound_notify (h->cls, &ntm->peer, em, ats, ats_count);
1011 case GNUNET_MESSAGE_TYPE_CORE_SEND_READY:
1012 if (msize != sizeof (struct SendMessageReady))
1015 reconnect_later (h);
1018 smr = (const struct SendMessageReady *) msg;
1019 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &smr->peer.hashPubKey);
1023 reconnect_later (h);
1026 LOG (GNUNET_ERROR_TYPE_DEBUG,
1027 "Received notification about transmission readiness to `%s'.\n",
1028 GNUNET_i2s (&smr->peer));
1029 if (NULL == pr->pending_head)
1031 /* request must have been cancelled between the original request
1032 * and the response from core, ignore core's readiness */
1036 th = pr->pending_head;
1037 if (ntohs (smr->smr_id) != th->smr_id)
1039 /* READY message is for expired or cancelled message,
1040 * ignore! (we should have already sent another request) */
1043 if ((NULL != pr->prev) || (NULL != pr->next) || (h->ready_peer_head == pr))
1045 /* we should not already be on the ready list... */
1047 reconnect_later (h);
1050 GNUNET_CONTAINER_DLL_insert (h->ready_peer_head, h->ready_peer_tail, pr);
1051 trigger_next_request (h, GNUNET_NO);
1054 reconnect_later (h);
1057 GNUNET_CLIENT_receive (h->client, &main_notify_handler, h,
1058 GNUNET_TIME_UNIT_FOREVER_REL);
1063 * Task executed once we are done transmitting the INIT message.
1064 * Starts our 'receive' loop.
1066 * @param cls the 'struct GNUNET_CORE_Handle'
1067 * @param success were we successful
1070 init_done_task (void *cls, int success)
1072 struct GNUNET_CORE_Handle *h = cls;
1074 if (GNUNET_SYSERR == success)
1075 return; /* shutdown */
1076 if (GNUNET_NO == success)
1078 LOG (GNUNET_ERROR_TYPE_DEBUG,
1079 "Failed to exchange INIT with core, retrying\n");
1080 if (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK)
1081 reconnect_later (h);
1084 GNUNET_CLIENT_receive (h->client, &main_notify_handler, h,
1085 GNUNET_TIME_UNIT_FOREVER_REL);
1090 * Our current client connection went down. Clean it up
1091 * and try to reconnect!
1093 * @param h our handle to the core service
1096 reconnect (struct GNUNET_CORE_Handle *h)
1098 struct ControlMessage *cm;
1099 struct InitMessage *init;
1105 GNUNET_assert (NULL == h->client);
1106 GNUNET_assert (h->currently_down == GNUNET_YES);
1107 h->client = GNUNET_CLIENT_connect ("core", h->cfg);
1108 if (NULL == h->client)
1110 reconnect_later (h);
1113 msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage);
1114 cm = GNUNET_malloc (sizeof (struct ControlMessage) + msize);
1115 cm->cont = &init_done_task;
1117 init = (struct InitMessage *) &cm[1];
1118 init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT);
1119 init->header.size = htons (msize);
1121 if (h->inbound_notify != NULL)
1123 if (h->inbound_hdr_only)
1124 opt |= GNUNET_CORE_OPTION_SEND_HDR_INBOUND;
1126 opt |= GNUNET_CORE_OPTION_SEND_FULL_INBOUND;
1128 if (h->outbound_notify != NULL)
1130 if (h->outbound_hdr_only)
1131 opt |= GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND;
1133 opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND;
1135 LOG (GNUNET_ERROR_TYPE_INFO,
1136 "(Re)connecting to CORE service, monitoring messages of type %u\n",
1139 init->options = htonl (opt);
1140 ts = (uint16_t *) & init[1];
1141 for (hpos = 0; hpos < h->hcnt; hpos++)
1142 ts[hpos] = htons (h->handlers[hpos].type);
1143 GNUNET_CONTAINER_DLL_insert (h->control_pending_head, h->control_pending_tail,
1145 trigger_next_request (h, GNUNET_YES);
1151 * Connect to the core service. Note that the connection may
1152 * complete (or fail) asynchronously.
1154 * @param cfg configuration to use
1155 * @param queue_size size of the per-peer message queue
1156 * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
1157 * @param init callback to call once we have successfully
1158 * connected to the core service
1159 * @param connects function to call on peer connect, can be NULL
1160 * @param disconnects function to call on peer disconnect / timeout, can be NULL
1161 * @param inbound_notify function to call for all inbound messages, can be NULL
1162 * @param inbound_hdr_only set to GNUNET_YES if inbound_notify will only read the
1163 * GNUNET_MessageHeader and hence we do not need to give it the full message;
1164 * can be used to improve efficiency, ignored if inbound_notify is NULLL
1165 * @param outbound_notify function to call for all outbound messages, can be NULL
1166 * @param outbound_hdr_only set to GNUNET_YES if outbound_notify will only read the
1167 * GNUNET_MessageHeader and hence we do not need to give it the full message
1168 * can be used to improve efficiency, ignored if outbound_notify is NULLL
1169 * @param handlers callbacks for messages we care about, NULL-terminated
1170 * @return handle to the core service (only useful for disconnect until 'init' is called);
1171 * NULL on error (in this case, init is never called)
1173 struct GNUNET_CORE_Handle *
1174 GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1175 unsigned int queue_size, void *cls,
1176 GNUNET_CORE_StartupCallback init,
1177 GNUNET_CORE_ConnectEventHandler connects,
1178 GNUNET_CORE_DisconnectEventHandler disconnects,
1179 GNUNET_CORE_MessageCallback inbound_notify,
1180 int inbound_hdr_only,
1181 GNUNET_CORE_MessageCallback outbound_notify,
1182 int outbound_hdr_only,
1183 const struct GNUNET_CORE_MessageHandler *handlers)
1185 struct GNUNET_CORE_Handle *h;
1187 h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle));
1189 h->queue_size = queue_size;
1192 h->connects = connects;
1193 h->disconnects = disconnects;
1194 h->inbound_notify = inbound_notify;
1195 h->outbound_notify = outbound_notify;
1196 h->inbound_hdr_only = inbound_hdr_only;
1197 h->outbound_hdr_only = outbound_hdr_only;
1198 h->handlers = handlers;
1200 h->currently_down = GNUNET_YES;
1201 h->peers = GNUNET_CONTAINER_multihashmap_create (128);
1202 h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1203 if (NULL != handlers)
1204 while (handlers[h->hcnt].callback != NULL)
1206 GNUNET_assert (h->hcnt <
1207 (GNUNET_SERVER_MAX_MESSAGE_SIZE -
1208 sizeof (struct InitMessage)) / sizeof (uint16_t));
1209 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service\n");
1216 * Disconnect from the core service. This function can only
1217 * be called *after* all pending 'GNUNET_CORE_notify_transmit_ready'
1218 * requests have been explicitly canceled.
1220 * @param handle connection to core to disconnect
1223 GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
1225 struct ControlMessage *cm;
1227 LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from CORE service\n");
1228 if (handle->cth != NULL)
1230 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth);
1233 while (NULL != (cm = handle->control_pending_head))
1235 GNUNET_CONTAINER_DLL_remove (handle->control_pending_head,
1236 handle->control_pending_tail, cm);
1239 if (cm->cont != NULL)
1240 cm->cont (cm->cont_cls, GNUNET_SYSERR);
1243 if (handle->client != NULL)
1245 GNUNET_CLIENT_disconnect (handle->client);
1246 handle->client = NULL;
1248 GNUNET_CONTAINER_multihashmap_iterate (handle->peers,
1249 &disconnect_and_free_peer_entry,
1251 if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1253 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1254 handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1256 GNUNET_CONTAINER_multihashmap_destroy (handle->peers);
1257 handle->peers = NULL;
1258 GNUNET_break (handle->ready_peer_head == NULL);
1259 GNUNET_free (handle);
1264 * Task that calls 'request_next_transmission'.
1266 * @param cls the 'struct PeerRecord*'
1267 * @param tc scheduler context
1270 run_request_next_transmission (void *cls,
1271 const struct GNUNET_SCHEDULER_TaskContext *tc)
1273 struct PeerRecord *pr = cls;
1275 pr->ntr_task = GNUNET_SCHEDULER_NO_TASK;
1276 request_next_transmission (pr);
1281 * Ask the core to call "notify" once it is ready to transmit the
1282 * given number of bytes to the specified "target". Must only be
1283 * called after a connection to the respective peer has been
1284 * established (and the client has been informed about this).
1286 * @param handle connection to core service
1287 * @param cork is corking allowed for this transmission?
1288 * @param priority how important is the message?
1289 * @param maxdelay how long can the message wait?
1290 * @param target who should receive the message, never NULL (can be this peer's identity for loopback)
1291 * @param notify_size how many bytes of buffer space does notify want?
1292 * @param notify function to call when buffer space is available
1293 * @param notify_cls closure for notify
1294 * @return non-NULL if the notify callback was queued,
1295 * NULL if we can not even queue the request (insufficient
1296 * memory); if NULL is returned, "notify" will NOT be called.
1298 struct GNUNET_CORE_TransmitHandle *
1299 GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork,
1301 struct GNUNET_TIME_Relative maxdelay,
1302 const struct GNUNET_PeerIdentity *target,
1304 GNUNET_CONNECTION_TransmitReadyNotify notify,
1307 struct PeerRecord *pr;
1308 struct GNUNET_CORE_TransmitHandle *th;
1309 struct GNUNET_CORE_TransmitHandle *pos;
1310 struct GNUNET_CORE_TransmitHandle *prev;
1311 struct GNUNET_CORE_TransmitHandle *minp;
1313 pr = GNUNET_CONTAINER_multihashmap_get (handle->peers, &target->hashPubKey);
1316 /* attempt to send to peer that is not connected */
1317 LOG (GNUNET_ERROR_TYPE_WARNING,
1318 "Attempting to send to peer `%s' from peer `%s', but not connected!\n",
1319 GNUNET_i2s (target), GNUNET_h2s (&handle->me.hashPubKey));
1323 GNUNET_assert (notify_size + sizeof (struct SendMessage) <
1324 GNUNET_SERVER_MAX_MESSAGE_SIZE);
1325 th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle));
1327 GNUNET_assert (NULL != notify);
1328 th->get_message = notify;
1329 th->get_message_cls = notify_cls;
1330 th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
1331 th->priority = priority;
1332 th->msize = notify_size;
1334 /* bound queue size */
1335 if (pr->queue_size == handle->queue_size)
1337 /* find lowest-priority entry, but skip the head of the list */
1338 minp = pr->pending_head->next;
1340 while (prev != NULL)
1342 if (prev->priority < minp->priority)
1348 GNUNET_break (handle->queue_size != 0);
1349 GNUNET_break (pr->queue_size == 1);
1351 LOG (GNUNET_ERROR_TYPE_DEBUG,
1352 "Dropping transmission request: cannot drop queue head and limit is one\n");
1355 if (priority <= minp->priority)
1357 LOG (GNUNET_ERROR_TYPE_DEBUG,
1358 "Dropping transmission request: priority too low\n");
1360 return NULL; /* priority too low */
1362 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, minp);
1364 GNUNET_assert (0 == minp->get_message (minp->get_message_cls, 0, NULL));
1368 /* Order entries by deadline, but SKIP 'HEAD' (as we may have transmitted
1369 * that request already or might even already be approved to transmit that
1370 * message to core) */
1371 pos = pr->pending_head;
1373 pos = pos->next; /* skip head */
1375 /* insertion sort */
1377 while ((pos != NULL) && (pos->timeout.abs_value < th->timeout.abs_value))
1382 GNUNET_CONTAINER_DLL_insert_after (pr->pending_head, pr->pending_tail, prev,
1385 /* was the request queue previously empty? */
1386 LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission request added to queue\n");
1387 if ((pr->pending_head == th) && (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK) &&
1388 (pr->next == NULL) && (pr->prev == NULL) &&
1389 (handle->ready_peer_head != pr))
1391 GNUNET_SCHEDULER_add_now (&run_request_next_transmission, pr);
1397 * Cancel the specified transmission-ready notification.
1399 * @param th handle that was returned by "notify_transmit_ready".
1402 GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th)
1404 struct PeerRecord *pr = th->peer;
1405 struct GNUNET_CORE_Handle *h = pr->ch;
1408 was_head = (pr->pending_head == th);
1409 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
1413 /* we're currently in the control queue, remove */
1414 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
1415 h->control_pending_tail, th->cm);
1416 GNUNET_free (th->cm);
1421 if ((pr->prev != NULL) || (pr->next != NULL) || (pr == h->ready_peer_head))
1423 /* the request that was 'approved' by core was
1424 * canceled before it could be transmitted; remove
1425 * us from the 'ready' list */
1426 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
1428 request_next_transmission (pr);
1433 /* end of core_api.c */