2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file core/core_api.c
23 * @brief core service; this is the main API for encrypted P2P communications
25 * @author Christian Grothoff
28 #include "gnunet_constants.h"
29 #include "gnunet_core_service.h"
32 #define LOG(kind,...) GNUNET_log_from (kind, "core-api",__VA_ARGS__)
35 * Information we track for each peer.
41 * We generally do NOT keep peer records in a DLL; this
42 * DLL is only used IF this peer's 'pending_head' message
43 * is ready for transmission.
45 struct PeerRecord *prev;
48 * We generally do NOT keep peer records in a DLL; this
49 * DLL is only used IF this peer's 'pending_head' message
50 * is ready for transmission.
52 struct PeerRecord *next;
55 * Peer the record is about.
57 struct GNUNET_PeerIdentity peer;
60 * Corresponding core handle.
62 struct GNUNET_CORE_Handle *ch;
65 * Head of doubly-linked list of pending requests.
66 * Requests are sorted by deadline *except* for HEAD,
67 * which is only modified upon transmission to core.
69 struct GNUNET_CORE_TransmitHandle *pending_head;
72 * Tail of doubly-linked list of pending requests.
74 struct GNUNET_CORE_TransmitHandle *pending_tail;
77 * ID of timeout task for the 'pending_head' handle
78 * which is the one with the smallest timeout.
80 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
83 * ID of task to run 'next_request_transmission'.
85 GNUNET_SCHEDULER_TaskIdentifier ntr_task;
88 * Current size of the queue of pending requests.
90 unsigned int queue_size;
93 * SendMessageRequest ID generator for this peer.
101 * Type of function called upon completion.
104 * @param success GNUNET_OK on success (which for request_connect
105 * ONLY means that we transmitted the connect request to CORE,
106 * it does not mean that we are actually now connected!);
107 * GNUNET_NO on timeout,
108 * GNUNET_SYSERR if core was shut down
110 typedef void (*GNUNET_CORE_ControlContinuation) (void *cls, int success);
114 * Entry in a doubly-linked list of control messages to be transmitted
115 * to the core service. Control messages include traffic allocation,
116 * connection requests and of course our initial 'init' request.
118 * The actual message is allocated at the end of this struct.
120 struct ControlMessage
123 * This is a doubly-linked list.
125 struct ControlMessage *next;
128 * This is a doubly-linked list.
130 struct ControlMessage *prev;
133 * Function to run after transmission failed/succeeded.
135 GNUNET_CORE_ControlContinuation cont;
138 * Closure for 'cont'.
143 * Transmit handle (if one is associated with this ControlMessage), or NULL.
145 struct GNUNET_CORE_TransmitHandle *th;
151 * Context for the core service connection.
153 struct GNUNET_CORE_Handle
157 * Configuration we're using.
159 const struct GNUNET_CONFIGURATION_Handle *cfg;
162 * Closure for the various callbacks.
167 * Function to call once we've completed the handshake with the core service.
169 GNUNET_CORE_StartupCallback init;
172 * Function to call whenever we're notified about a peer connecting.
174 GNUNET_CORE_ConnectEventHandler connects;
177 * Function to call whenever we're notified about a peer disconnecting.
179 GNUNET_CORE_DisconnectEventHandler disconnects;
182 * Function to call whenever we receive an inbound message.
184 GNUNET_CORE_MessageCallback inbound_notify;
187 * Function to call whenever we are notified about an outbound message.
189 GNUNET_CORE_MessageCallback outbound_notify;
192 * Function handlers for messages of particular type.
194 const struct GNUNET_CORE_MessageHandler *handlers;
197 * Our connection to the service.
199 struct GNUNET_CLIENT_Connection *client;
202 * Handle for our current transmission request.
204 struct GNUNET_CLIENT_TransmitHandle *cth;
207 * Head of doubly-linked list of pending requests.
209 struct ControlMessage *control_pending_head;
212 * Tail of doubly-linked list of pending requests.
214 struct ControlMessage *control_pending_tail;
217 * Head of doubly-linked list of peers that are core-approved
218 * to send their next message.
220 struct PeerRecord *ready_peer_head;
223 * Tail of doubly-linked list of peers that are core-approved
224 * to send their next message.
226 struct PeerRecord *ready_peer_tail;
229 * Hash map listing all of the peers that we are currently connected to.
232 struct GNUNET_CONTAINER_MultiHashMap *peers;
235 * Identity of this peer.
237 struct GNUNET_PeerIdentity me;
240 * ID of reconnect task (if any).
242 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
245 * Current delay we use for re-trying to connect to core.
247 struct GNUNET_TIME_Relative retry_backoff;
250 * Number of messages we are allowed to queue per target.
252 unsigned int queue_size;
255 * Number of entries in the handlers array.
260 * For inbound notifications without a specific handler, do
261 * we expect to only receive headers?
263 int inbound_hdr_only;
266 * For outbound notifications without a specific handler, do
267 * we expect to only receive headers?
269 int outbound_hdr_only;
272 * Are we currently disconnected and hence unable to forward requests?
281 * Handle for a transmission request.
283 struct GNUNET_CORE_TransmitHandle
287 * We keep active transmit handles in a doubly-linked list.
289 struct GNUNET_CORE_TransmitHandle *next;
292 * We keep active transmit handles in a doubly-linked list.
294 struct GNUNET_CORE_TransmitHandle *prev;
297 * Corresponding peer record.
299 struct PeerRecord *peer;
302 * Corresponding SEND_REQUEST message. Only non-NULL
303 * while SEND_REQUEST message is pending.
305 struct ControlMessage *cm;
308 * Function that will be called to get the actual request
309 * (once we are ready to transmit this request to the core).
310 * The function will be called with a NULL buffer to signal timeout.
313 GNUNET_CONNECTION_TransmitReadyNotify get_message;
316 * Closure for get_message.
318 void *get_message_cls;
321 * Timeout for this handle.
323 struct GNUNET_TIME_Absolute timeout;
326 * How important is this message?
331 * Size of this request.
336 * Send message request ID for this request.
341 * Is corking allowed?
349 * Our current client connection went down. Clean it up
350 * and try to reconnect!
352 * @param h our handle to the core service
355 reconnect (struct GNUNET_CORE_Handle *h);
359 * Task scheduled to try to re-connect to core.
361 * @param cls the 'struct GNUNET_CORE_Handle'
362 * @param tc task context
365 reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
367 struct GNUNET_CORE_Handle *h = cls;
369 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
370 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service after delay\n");
376 * Notify clients about disconnect and free
377 * the entry for connected peer.
379 * @param cls the 'struct GNUNET_CORE_Handle*'
380 * @param key the peer identity (not used)
381 * @param value the 'struct PeerRecord' to free.
382 * @return GNUNET_YES (continue)
385 disconnect_and_free_peer_entry (void *cls, const GNUNET_HashCode * key,
388 struct GNUNET_CORE_Handle *h = cls;
389 struct GNUNET_CORE_TransmitHandle *th;
390 struct PeerRecord *pr = value;
392 if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
394 GNUNET_SCHEDULER_cancel (pr->timeout_task);
395 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
397 if (pr->ntr_task != GNUNET_SCHEDULER_NO_TASK)
399 GNUNET_SCHEDULER_cancel (pr->ntr_task);
400 pr->ntr_task = GNUNET_SCHEDULER_NO_TASK;
402 if ((pr->prev != NULL) || (pr->next != NULL) || (h->ready_peer_head == pr))
403 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
404 if (h->disconnects != NULL)
405 h->disconnects (h->cls, &pr->peer);
406 /* all requests should have been cancelled, clean up anyway, just in case */
407 GNUNET_break (pr->queue_size == 0);
408 while (NULL != (th = pr->pending_head))
411 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
417 /* done with 'voluntary' cleanups, now on to normal freeing */
418 GNUNET_assert (GNUNET_YES ==
419 GNUNET_CONTAINER_multihashmap_remove (h->peers, key, pr));
420 GNUNET_assert (pr->pending_head == NULL);
421 GNUNET_assert (pr->pending_tail == NULL);
422 GNUNET_assert (pr->ch == h);
423 GNUNET_assert (pr->queue_size == 0);
424 GNUNET_assert (pr->timeout_task == GNUNET_SCHEDULER_NO_TASK);
425 GNUNET_assert (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK);
432 * Close down any existing connection to the CORE service and
433 * try re-establishing it later.
435 * @param h our handle
438 reconnect_later (struct GNUNET_CORE_Handle *h)
440 struct ControlMessage *cm;
441 struct PeerRecord *pr;
443 GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
446 GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
449 if (h->client != NULL)
451 GNUNET_CLIENT_disconnect (h->client);
454 h->currently_down = GNUNET_YES;
455 GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
457 GNUNET_SCHEDULER_add_delayed (h->retry_backoff, &reconnect_task, h);
458 while (NULL != (cm = h->control_pending_head))
460 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
461 h->control_pending_tail, cm);
464 if (cm->cont != NULL)
465 cm->cont (cm->cont_cls, GNUNET_NO);
468 GNUNET_CONTAINER_multihashmap_iterate (h->peers,
469 &disconnect_and_free_peer_entry, h);
470 while (NULL != (pr = h->ready_peer_head))
471 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
472 GNUNET_assert (h->control_pending_head == NULL);
474 GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS, h->retry_backoff);
475 h->retry_backoff = GNUNET_TIME_relative_multiply (h->retry_backoff, 2);
480 * Check the list of pending requests, send the next one to the core.
483 * @param h core handle
484 * @param ignore_currently_down transmit message even if not initialized?
487 trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down);
491 * The given request hit its timeout. Remove from the
492 * doubly-linked list and call the respective continuation.
494 * @param cls the 'struct PeerRecord' whose head request timed out
495 * @param tc context, can be NULL (!)
498 transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
502 * Send a control message to the core service asking for transmission
503 * of the message at the head of the given peer record's queue.
505 * @param pr peer to request transmission to
508 request_next_transmission (struct PeerRecord *pr)
510 struct GNUNET_CORE_Handle *h = pr->ch;
511 struct ControlMessage *cm;
512 struct SendMessageRequest *smr;
513 struct GNUNET_CORE_TransmitHandle *th;
515 if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
517 GNUNET_SCHEDULER_cancel (pr->timeout_task);
518 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
520 if (NULL == (th = pr->pending_head))
522 trigger_next_request (h, GNUNET_NO);
526 return; /* already done */
527 GNUNET_assert (pr->prev == NULL);
528 GNUNET_assert (pr->next == NULL);
530 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
531 (th->timeout), &transmission_timeout, pr);
532 cm = GNUNET_malloc (sizeof (struct ControlMessage) +
533 sizeof (struct SendMessageRequest));
536 smr = (struct SendMessageRequest *) &cm[1];
537 smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
538 smr->header.size = htons (sizeof (struct SendMessageRequest));
539 smr->priority = htonl (th->priority);
540 smr->deadline = GNUNET_TIME_absolute_hton (th->timeout);
541 smr->peer = pr->peer;
542 smr->queue_size = htonl (pr->queue_size);
543 smr->size = htons (th->msize);
544 smr->smr_id = htons (th->smr_id = pr->smr_id_gen++);
545 GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head,
546 h->control_pending_tail, cm);
547 LOG (GNUNET_ERROR_TYPE_DEBUG,
548 "Adding SEND REQUEST for peer `%s' to message queue\n",
549 GNUNET_i2s (&pr->peer));
550 trigger_next_request (h, GNUNET_NO);
555 * The given request hit its timeout. Remove from the
556 * doubly-linked list and call the respective continuation.
558 * @param cls the 'struct PeerRecord' whose head request timed out
559 * @param tc context, can be NULL (!)
562 transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
564 struct PeerRecord *pr = cls;
565 struct GNUNET_CORE_Handle *h = pr->ch;
566 struct GNUNET_CORE_TransmitHandle *th;
568 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
569 th = pr->pending_head;
570 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
572 if ((pr->prev != NULL) || (pr->next != NULL) || (pr == h->ready_peer_head))
574 /* the request that was 'approved' by core was
575 * canceled before it could be transmitted; remove
576 * us from the 'ready' list */
577 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
581 /* we're currently in the control queue, remove */
582 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
583 h->control_pending_tail, th->cm);
584 GNUNET_free (th->cm);
586 LOG (GNUNET_ERROR_TYPE_DEBUG,
587 "Signalling timeout of request for transmission to CORE service\n");
588 request_next_transmission (pr);
589 GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL));
595 * Transmit the next message to the core service.
597 * @param cls closure with the 'struct GNUNET_CORE_Handle'
598 * @param size number of bytes available in buf
599 * @param buf where the callee should write the message
600 * @return number of bytes written to buf
603 transmit_message (void *cls, size_t size, void *buf)
605 struct GNUNET_CORE_Handle *h = cls;
606 struct ControlMessage *cm;
607 struct GNUNET_CORE_TransmitHandle *th;
608 struct PeerRecord *pr;
609 struct SendMessage *sm;
610 const struct GNUNET_MessageHeader *hdr;
614 GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
618 LOG (GNUNET_ERROR_TYPE_DEBUG,
619 "Transmission failed, initiating reconnect\n");
623 /* first check for control messages */
624 if (NULL != (cm = h->control_pending_head))
626 hdr = (const struct GNUNET_MessageHeader *) &cm[1];
627 msize = ntohs (hdr->size);
630 trigger_next_request (h, GNUNET_NO);
633 LOG (GNUNET_ERROR_TYPE_DEBUG,
634 "Transmitting control message with %u bytes of type %u to core.\n",
635 (unsigned int) msize, (unsigned int) ntohs (hdr->type));
636 memcpy (buf, hdr, msize);
637 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
638 h->control_pending_tail, cm);
641 if (NULL != cm->cont)
642 cm->cont (cm->cont_cls, GNUNET_OK);
644 trigger_next_request (h, GNUNET_NO);
647 /* now check for 'ready' P2P messages */
648 if (NULL != (pr = h->ready_peer_head))
650 GNUNET_assert (pr->pending_head != NULL);
651 th = pr->pending_head;
652 if (size < th->msize + sizeof (struct SendMessage))
654 trigger_next_request (h, GNUNET_NO);
657 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
658 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
660 if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
662 GNUNET_SCHEDULER_cancel (pr->timeout_task);
663 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
665 LOG (GNUNET_ERROR_TYPE_DEBUG,
666 "Transmitting SEND request to `%s' with %u bytes.\n",
667 GNUNET_i2s (&pr->peer), (unsigned int) th->msize);
668 sm = (struct SendMessage *) buf;
669 sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
670 sm->priority = htonl (th->priority);
671 sm->deadline = GNUNET_TIME_absolute_hton (th->timeout);
673 sm->cork = htonl ((uint32_t) th->cork);
674 sm->reserved = htonl (0);
676 th->get_message (th->get_message_cls,
677 size - sizeof (struct SendMessage), &sm[1]);
679 LOG (GNUNET_ERROR_TYPE_DEBUG,
680 "Transmitting SEND request to `%s' yielded %u bytes.\n",
681 GNUNET_i2s (&pr->peer), ret);
685 LOG (GNUNET_ERROR_TYPE_DEBUG,
686 "Size of clients message to peer %s is 0!\n",
687 GNUNET_i2s (&pr->peer));
688 /* client decided to send nothing! */
689 request_next_transmission (pr);
692 LOG (GNUNET_ERROR_TYPE_DEBUG,
693 "Produced SEND message to core with %u bytes payload\n",
695 GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
696 if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
699 request_next_transmission (pr);
702 ret += sizeof (struct SendMessage);
703 sm->header.size = htons (ret);
704 GNUNET_assert (ret <= size);
705 request_next_transmission (pr);
713 * Check the list of pending requests, send the next one to the core.
716 * @param h core handle
717 * @param ignore_currently_down transmit message even if not initialized?
720 trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down)
724 if ((GNUNET_YES == h->currently_down) && (ignore_currently_down == GNUNET_NO))
726 LOG (GNUNET_ERROR_TYPE_DEBUG,
727 "Core connection down, not processing queue\n");
732 LOG (GNUNET_ERROR_TYPE_DEBUG, "Request pending, not processing queue\n");
735 if (h->control_pending_head != NULL)
737 ntohs (((struct GNUNET_MessageHeader *) &h->
738 control_pending_head[1])->size);
739 else if (h->ready_peer_head != NULL)
741 h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage);
744 LOG (GNUNET_ERROR_TYPE_DEBUG,
745 "Request queue empty, not processing queue\n");
746 return; /* no pending message */
749 GNUNET_CLIENT_notify_transmit_ready (h->client, msize,
750 GNUNET_TIME_UNIT_FOREVER_REL,
751 GNUNET_NO, &transmit_message, h);
756 * Handler for notification messages received from the core.
758 * @param cls our "struct GNUNET_CORE_Handle"
759 * @param msg the message received from the core service
762 main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
764 struct GNUNET_CORE_Handle *h = cls;
765 const struct InitReplyMessage *m;
766 const struct ConnectNotifyMessage *cnm;
767 const struct DisconnectNotifyMessage *dnm;
768 const struct NotifyTrafficMessage *ntm;
769 const struct GNUNET_MessageHeader *em;
770 const struct SendMessageReady *smr;
771 const struct GNUNET_CORE_MessageHandler *mh;
772 const struct GNUNET_ATS_Information *ats;
773 GNUNET_CORE_StartupCallback init;
774 struct PeerRecord *pr;
775 struct GNUNET_CORE_TransmitHandle *th;
784 LOG (GNUNET_ERROR_TYPE_INFO,
786 ("Client was disconnected from core service, trying to reconnect.\n"));
790 msize = ntohs (msg->size);
791 LOG (GNUNET_ERROR_TYPE_DEBUG,
792 "Processing message of type %u and size %u from core service\n",
793 ntohs (msg->type), msize);
794 switch (ntohs (msg->type))
796 case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY:
797 if (ntohs (msg->size) != sizeof (struct InitReplyMessage))
803 m = (const struct InitReplyMessage *) msg;
804 GNUNET_break (0 == ntohl (m->reserved));
805 /* start our message processing loop */
806 if (GNUNET_YES == h->currently_down)
808 h->currently_down = GNUNET_NO;
809 trigger_next_request (h, GNUNET_NO);
811 h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
812 h->me = m->my_identity;
813 if (NULL != (init = h->init))
815 /* mark so we don't call init on reconnect */
817 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core service of peer `%s'.\n",
818 GNUNET_i2s (&h->me));
819 init (h->cls, h, &h->me);
823 LOG (GNUNET_ERROR_TYPE_DEBUG,
824 "Successfully reconnected to core service.\n");
826 /* fake 'connect to self' */
827 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &h->me.hashPubKey);
828 GNUNET_assert (NULL == pr);
829 pr = GNUNET_malloc (sizeof (struct PeerRecord));
832 GNUNET_assert (GNUNET_YES ==
833 GNUNET_CONTAINER_multihashmap_put (h->peers,
834 &h->me.hashPubKey, pr,
835 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
836 if (NULL != h->connects)
837 h->connects (h->cls, &h->me, NULL, 0);
839 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT:
840 if (msize < sizeof (struct ConnectNotifyMessage))
846 cnm = (const struct ConnectNotifyMessage *) msg;
847 ats_count = ntohl (cnm->ats_count);
849 sizeof (struct ConnectNotifyMessage) +
850 ats_count * sizeof (struct GNUNET_ATS_Information))
856 LOG (GNUNET_ERROR_TYPE_DEBUG,
857 "Received notification about connection from `%s'.\n",
858 GNUNET_i2s (&cnm->peer));
859 if (0 == memcmp (&h->me, &cnm->peer, sizeof (struct GNUNET_PeerIdentity)))
861 /* connect to self!? */
865 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &cnm->peer.hashPubKey);
872 pr = GNUNET_malloc (sizeof (struct PeerRecord));
873 pr->peer = cnm->peer;
875 GNUNET_assert (GNUNET_YES ==
876 GNUNET_CONTAINER_multihashmap_put (h->peers,
877 &cnm->peer.hashPubKey, pr,
878 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
879 ats = (const struct GNUNET_ATS_Information *) &cnm[1];
880 if (NULL != h->connects)
881 h->connects (h->cls, &cnm->peer, ats, ats_count);
883 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT:
884 if (msize != sizeof (struct DisconnectNotifyMessage))
890 dnm = (const struct DisconnectNotifyMessage *) msg;
891 if (0 == memcmp (&h->me, &dnm->peer, sizeof (struct GNUNET_PeerIdentity)))
893 /* connection to self!? */
897 GNUNET_break (0 == ntohl (dnm->reserved));
898 LOG (GNUNET_ERROR_TYPE_DEBUG,
899 "Received notification about disconnect from `%s'.\n",
900 GNUNET_i2s (&dnm->peer));
901 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &dnm->peer.hashPubKey);
908 trigger = ((pr->prev != NULL) || (pr->next != NULL) ||
909 (h->ready_peer_head == pr));
910 disconnect_and_free_peer_entry (h, &dnm->peer.hashPubKey, pr);
912 trigger_next_request (h, GNUNET_NO);
914 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND:
915 if (msize < sizeof (struct NotifyTrafficMessage))
921 ntm = (const struct NotifyTrafficMessage *) msg;
922 ats_count = ntohl (ntm->ats_count);
924 sizeof (struct NotifyTrafficMessage) +
925 ats_count * sizeof (struct GNUNET_ATS_Information) +
926 sizeof (struct GNUNET_MessageHeader)) )
932 ats = (const struct GNUNET_ATS_Information*) &ntm[1];
933 em = (const struct GNUNET_MessageHeader *) &ats[ats_count];
934 LOG (GNUNET_ERROR_TYPE_DEBUG,
935 "Received message of type %u and size %u from peer `%4s'\n",
936 ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer));
937 if ((GNUNET_NO == h->inbound_hdr_only) &&
939 ntohs (em->size) + sizeof (struct NotifyTrafficMessage) +
940 +ats_count * sizeof (struct GNUNET_ATS_Information)))
946 et = ntohs (em->type);
947 for (hpos = 0; hpos < h->hcnt; hpos++)
949 mh = &h->handlers[hpos];
952 if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0))
954 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
955 "Unexpected message size %u for message of type %u from peer `%4s'\n",
956 htons (em->size), mh->type, GNUNET_i2s (&ntm->peer));
960 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey);
968 h->handlers[hpos].callback (h->cls, &ntm->peer, em, ats,
971 /* error in processing, do not process other messages! */
975 if (NULL != h->inbound_notify)
976 h->inbound_notify (h->cls, &ntm->peer, em, ats, ats_count);
978 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND:
979 if (msize < sizeof (struct NotifyTrafficMessage))
985 ntm = (const struct NotifyTrafficMessage *) msg;
986 ats_count = ntohl (ntm->ats_count);
988 sizeof (struct NotifyTrafficMessage) +
989 ats_count * sizeof (struct GNUNET_ATS_Information) +
990 sizeof (struct GNUNET_MessageHeader)) )
996 ats = (const struct GNUNET_ATS_Information*) &ntm[1];
997 em = (const struct GNUNET_MessageHeader *) &ats[ats_count];
998 LOG (GNUNET_ERROR_TYPE_DEBUG,
999 "Received notification about transmission to `%s'.\n",
1000 GNUNET_i2s (&ntm->peer));
1001 if ((GNUNET_NO == h->outbound_hdr_only) &&
1003 ntohs (em->size) + sizeof (struct NotifyTrafficMessage) +
1004 ats_count * sizeof (struct GNUNET_ATS_Information)))
1007 reconnect_later (h);
1010 if (NULL == h->outbound_notify)
1015 h->outbound_notify (h->cls, &ntm->peer, em, ats, ats_count);
1017 case GNUNET_MESSAGE_TYPE_CORE_SEND_READY:
1018 if (msize != sizeof (struct SendMessageReady))
1021 reconnect_later (h);
1024 smr = (const struct SendMessageReady *) msg;
1025 pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &smr->peer.hashPubKey);
1029 reconnect_later (h);
1032 LOG (GNUNET_ERROR_TYPE_DEBUG,
1033 "Received notification about transmission readiness to `%s'.\n",
1034 GNUNET_i2s (&smr->peer));
1035 if (NULL == pr->pending_head)
1037 /* request must have been cancelled between the original request
1038 * and the response from core, ignore core's readiness */
1042 th = pr->pending_head;
1043 if (ntohs (smr->smr_id) != th->smr_id)
1045 /* READY message is for expired or cancelled message,
1046 * ignore! (we should have already sent another request) */
1049 if ((NULL != pr->prev) || (NULL != pr->next) || (h->ready_peer_head == pr))
1051 /* we should not already be on the ready list... */
1053 reconnect_later (h);
1056 GNUNET_CONTAINER_DLL_insert (h->ready_peer_head, h->ready_peer_tail, pr);
1057 trigger_next_request (h, GNUNET_NO);
1060 reconnect_later (h);
1063 GNUNET_CLIENT_receive (h->client, &main_notify_handler, h,
1064 GNUNET_TIME_UNIT_FOREVER_REL);
1069 * Task executed once we are done transmitting the INIT message.
1070 * Starts our 'receive' loop.
1072 * @param cls the 'struct GNUNET_CORE_Handle'
1073 * @param success were we successful
1076 init_done_task (void *cls, int success)
1078 struct GNUNET_CORE_Handle *h = cls;
1080 if (GNUNET_SYSERR == success)
1081 return; /* shutdown */
1082 if (GNUNET_NO == success)
1084 LOG (GNUNET_ERROR_TYPE_DEBUG,
1085 "Failed to exchange INIT with core, retrying\n");
1086 if (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK)
1087 reconnect_later (h);
1090 GNUNET_CLIENT_receive (h->client, &main_notify_handler, h,
1091 GNUNET_TIME_UNIT_FOREVER_REL);
1096 * Our current client connection went down. Clean it up
1097 * and try to reconnect!
1099 * @param h our handle to the core service
1102 reconnect (struct GNUNET_CORE_Handle *h)
1104 struct ControlMessage *cm;
1105 struct InitMessage *init;
1111 GNUNET_assert (NULL == h->client);
1112 GNUNET_assert (h->currently_down == GNUNET_YES);
1113 h->client = GNUNET_CLIENT_connect ("core", h->cfg);
1114 if (NULL == h->client)
1116 reconnect_later (h);
1119 msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage);
1120 cm = GNUNET_malloc (sizeof (struct ControlMessage) + msize);
1121 cm->cont = &init_done_task;
1123 init = (struct InitMessage *) &cm[1];
1124 init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT);
1125 init->header.size = htons (msize);
1127 if (h->inbound_notify != NULL)
1129 if (h->inbound_hdr_only)
1130 opt |= GNUNET_CORE_OPTION_SEND_HDR_INBOUND;
1132 opt |= GNUNET_CORE_OPTION_SEND_FULL_INBOUND;
1134 if (h->outbound_notify != NULL)
1136 if (h->outbound_hdr_only)
1137 opt |= GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND;
1139 opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND;
1141 LOG (GNUNET_ERROR_TYPE_INFO,
1142 "(Re)connecting to CORE service, monitoring messages of type %u\n",
1145 init->options = htonl (opt);
1146 ts = (uint16_t *) & init[1];
1147 for (hpos = 0; hpos < h->hcnt; hpos++)
1148 ts[hpos] = htons (h->handlers[hpos].type);
1149 GNUNET_CONTAINER_DLL_insert (h->control_pending_head, h->control_pending_tail,
1151 trigger_next_request (h, GNUNET_YES);
1157 * Connect to the core service. Note that the connection may
1158 * complete (or fail) asynchronously.
1160 * @param cfg configuration to use
1161 * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
1162 * @param init callback to call once we have successfully
1163 * connected to the core service
1164 * @param connects function to call on peer connect, can be NULL
1165 * @param disconnects function to call on peer disconnect / timeout, can be NULL
1166 * @param inbound_notify function to call for all inbound messages, can be NULL
1167 * @param inbound_hdr_only set to GNUNET_YES if inbound_notify will only read the
1168 * GNUNET_MessageHeader and hence we do not need to give it the full message;
1169 * can be used to improve efficiency, ignored if inbound_notify is NULL
1170 * @param outbound_notify function to call for all outbound messages, can be NULL
1171 * @param outbound_hdr_only set to GNUNET_YES if outbound_notify will only read the
1172 * GNUNET_MessageHeader and hence we do not need to give it the full message
1173 * can be used to improve efficiency, ignored if outbound_notify is NULL
1174 * @param handlers callbacks for messages we care about, NULL-terminated
1175 * @return handle to the core service (only useful for disconnect until 'init' is called);
1176 * NULL on error (in this case, init is never called)
1178 struct GNUNET_CORE_Handle *
1179 GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1181 GNUNET_CORE_StartupCallback init,
1182 GNUNET_CORE_ConnectEventHandler connects,
1183 GNUNET_CORE_DisconnectEventHandler disconnects,
1184 GNUNET_CORE_MessageCallback inbound_notify,
1185 int inbound_hdr_only,
1186 GNUNET_CORE_MessageCallback outbound_notify,
1187 int outbound_hdr_only,
1188 const struct GNUNET_CORE_MessageHandler *handlers)
1190 struct GNUNET_CORE_Handle *h;
1192 h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle));
1194 h->queue_size = 1; // FIXME: remove entirely...
1197 h->connects = connects;
1198 h->disconnects = disconnects;
1199 h->inbound_notify = inbound_notify;
1200 h->outbound_notify = outbound_notify;
1201 h->inbound_hdr_only = inbound_hdr_only;
1202 h->outbound_hdr_only = outbound_hdr_only;
1203 h->handlers = handlers;
1205 h->currently_down = GNUNET_YES;
1206 h->peers = GNUNET_CONTAINER_multihashmap_create (128);
1207 h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1208 if (NULL != handlers)
1209 while (handlers[h->hcnt].callback != NULL)
1211 GNUNET_assert (h->hcnt <
1212 (GNUNET_SERVER_MAX_MESSAGE_SIZE -
1213 sizeof (struct InitMessage)) / sizeof (uint16_t));
1214 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service\n");
1221 * Disconnect from the core service. This function can only
1222 * be called *after* all pending 'GNUNET_CORE_notify_transmit_ready'
1223 * requests have been explicitly canceled.
1225 * @param handle connection to core to disconnect
1228 GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
1230 struct ControlMessage *cm;
1232 LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from CORE service\n");
1233 if (NULL != handle->cth)
1235 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth);
1238 while (NULL != (cm = handle->control_pending_head))
1240 GNUNET_CONTAINER_DLL_remove (handle->control_pending_head,
1241 handle->control_pending_tail, cm);
1244 if (NULL != cm->cont)
1245 cm->cont (cm->cont_cls, GNUNET_SYSERR);
1248 if (NULL != handle->client)
1250 GNUNET_CLIENT_disconnect (handle->client);
1251 handle->client = NULL;
1253 GNUNET_CONTAINER_multihashmap_iterate (handle->peers,
1254 &disconnect_and_free_peer_entry,
1256 if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1258 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1259 handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1261 GNUNET_CONTAINER_multihashmap_destroy (handle->peers);
1262 handle->peers = NULL;
1263 GNUNET_break (handle->ready_peer_head == NULL);
1264 GNUNET_free (handle);
1269 * Task that calls 'request_next_transmission'.
1271 * @param cls the 'struct PeerRecord*'
1272 * @param tc scheduler context
1275 run_request_next_transmission (void *cls,
1276 const struct GNUNET_SCHEDULER_TaskContext *tc)
1278 struct PeerRecord *pr = cls;
1280 pr->ntr_task = GNUNET_SCHEDULER_NO_TASK;
1281 request_next_transmission (pr);
1286 * Ask the core to call "notify" once it is ready to transmit the
1287 * given number of bytes to the specified "target". Must only be
1288 * called after a connection to the respective peer has been
1289 * established (and the client has been informed about this). You may
1290 * have one request of this type pending for each connected peer at
1291 * any time. If a peer disconnects, the application MUST call
1292 * "GNUNET_CORE_notify_transmit_ready_cancel" on the respective
1293 * transmission request, if one such request is pending.
1295 * @param handle connection to core service
1296 * @param cork is corking allowed for this transmission?
1297 * @param priority how important is the message?
1298 * @param maxdelay how long can the message wait?
1299 * @param target who should receive the message, never NULL (can be this peer's identity for loopback)
1300 * @param notify_size how many bytes of buffer space does notify want?
1301 * @param notify function to call when buffer space is available;
1302 * will be called with NULL on timeout; clients MUST cancel
1303 * all pending transmission requests DURING the disconnect
1305 * @param notify_cls closure for notify
1306 * @return non-NULL if the notify callback was queued,
1307 * NULL if we can not even queue the request (request already pending);
1308 * if NULL is returned, "notify" will NOT be called.
1310 struct GNUNET_CORE_TransmitHandle *
1311 GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork,
1313 struct GNUNET_TIME_Relative maxdelay,
1314 const struct GNUNET_PeerIdentity *target,
1316 GNUNET_CONNECTION_TransmitReadyNotify notify,
1319 struct PeerRecord *pr;
1320 struct GNUNET_CORE_TransmitHandle *th;
1321 struct GNUNET_CORE_TransmitHandle *pos;
1322 struct GNUNET_CORE_TransmitHandle *prev;
1323 struct GNUNET_CORE_TransmitHandle *minp;
1325 pr = GNUNET_CONTAINER_multihashmap_get (handle->peers, &target->hashPubKey);
1328 /* attempt to send to peer that is not connected */
1329 LOG (GNUNET_ERROR_TYPE_WARNING,
1330 "Attempting to send to peer `%s' from peer `%s', but not connected!\n",
1331 GNUNET_i2s (target), GNUNET_h2s (&handle->me.hashPubKey));
1335 GNUNET_assert (notify_size + sizeof (struct SendMessage) <
1336 GNUNET_SERVER_MAX_MESSAGE_SIZE);
1337 th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle));
1339 GNUNET_assert (NULL != notify);
1340 th->get_message = notify;
1341 th->get_message_cls = notify_cls;
1342 th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
1343 th->priority = priority;
1344 th->msize = notify_size;
1346 /* bound queue size */
1347 if (pr->queue_size == handle->queue_size)
1349 /* find lowest-priority entry, but skip the head of the list */
1350 minp = pr->pending_head->next;
1352 while (prev != NULL)
1354 if (prev->priority < minp->priority)
1360 GNUNET_break (handle->queue_size != 0);
1361 GNUNET_break (pr->queue_size == 1);
1363 LOG (GNUNET_ERROR_TYPE_DEBUG,
1364 "Dropping transmission request: cannot drop queue head and limit is one\n");
1367 if (priority <= minp->priority)
1369 LOG (GNUNET_ERROR_TYPE_DEBUG,
1370 "Dropping transmission request: priority too low\n");
1372 return NULL; /* priority too low */
1374 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, minp);
1376 GNUNET_assert (0 == minp->get_message (minp->get_message_cls, 0, NULL));
1380 /* Order entries by deadline, but SKIP 'HEAD' (as we may have transmitted
1381 * that request already or might even already be approved to transmit that
1382 * message to core) */
1383 pos = pr->pending_head;
1385 pos = pos->next; /* skip head */
1387 /* insertion sort */
1389 while ((NULL != pos) && (pos->timeout.abs_value < th->timeout.abs_value))
1394 GNUNET_CONTAINER_DLL_insert_after (pr->pending_head, pr->pending_tail, prev,
1397 /* was the request queue previously empty? */
1398 LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission request added to queue\n");
1399 if ((pr->pending_head == th) && (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK) &&
1400 (pr->next == NULL) && (pr->prev == NULL) &&
1401 (handle->ready_peer_head != pr))
1403 GNUNET_SCHEDULER_add_now (&run_request_next_transmission, pr);
1409 * Cancel the specified transmission-ready notification.
1411 * @param th handle that was returned by "notify_transmit_ready".
1414 GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th)
1416 struct PeerRecord *pr = th->peer;
1417 struct GNUNET_CORE_Handle *h = pr->ch;
1420 was_head = (pr->pending_head == th);
1421 GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
1425 /* we're currently in the control queue, remove */
1426 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
1427 h->control_pending_tail, th->cm);
1428 GNUNET_free (th->cm);
1433 if ((NULL != pr->prev) || (NULL != pr->next) || (pr == h->ready_peer_head))
1435 /* the request that was 'approved' by core was
1436 * canceled before it could be transmitted; remove
1437 * us from the 'ready' list */
1438 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
1440 if (NULL != h->client)
1441 request_next_transmission (pr);
1446 /* end of core_api.c */