X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fcore%2Fcore_api.c;h=e9a38271bf317cce14072f629e842e521ca4fa6d;hb=9706e822ff61a85bf2353d2c233eb766ffc13323;hp=2211028fb65e3cd12a13e8f80b9f767251ed4890;hpb=214424c6576d922dd5076ac5377294d81ccca647;p=oweals%2Fgnunet.git diff --git a/src/core/core_api.c b/src/core/core_api.c index 2211028fb..e9a38271b 100644 --- a/src/core/core_api.c +++ b/src/core/core_api.c @@ -23,9 +23,6 @@ * @brief core service; this is the main API for encrypted P2P * communications * @author Christian Grothoff - * - * TODO: - * - implement atsi parsing and passing */ #include "platform.h" #include "gnunet_constants.h" @@ -132,17 +129,19 @@ struct ControlMessage struct ControlMessage *prev; /** - * Function to run after successful transmission (or call with - * reason 'TIMEOUT' on error); called with scheduler context 'NULL' - * on disconnect. + * Function to run after transmission failed/succeeded. */ - GNUNET_SCHEDULER_Task cont; - + GNUNET_CORE_ControlContinuation cont; + /** * Closure for 'cont'. */ void *cont_cls; + /** + * Transmit handle (if one is associated with this ControlMessage), or NULL. + */ + struct GNUNET_CORE_TransmitHandle *th; }; @@ -211,12 +210,12 @@ struct GNUNET_CORE_Handle /** * Head of doubly-linked list of pending requests. */ - struct ControlMessage *pending_head; + struct ControlMessage *control_pending_head; /** * Tail of doubly-linked list of pending requests. */ - struct ControlMessage *pending_tail; + struct ControlMessage *control_pending_tail; /** * Head of doubly-linked list of peers that are core-approved @@ -459,30 +458,40 @@ static void reconnect_later (struct GNUNET_CORE_Handle *h) { struct ControlMessage *cm; + struct PeerRecord *pr; + GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK); if (h->client != NULL) { GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); h->client = NULL; + h->cth = NULL; GNUNET_CONTAINER_multihashmap_iterate (h->peers, &disconnect_and_free_peer_entry, h); } + while (NULL != (pr = h->ready_peer_head)) + GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, + h->ready_peer_tail, + pr); h->currently_down = GNUNET_YES; - while (NULL != (cm = h->pending_head)) + h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_backoff, + &reconnect_task, + h); + while (NULL != (cm = h->control_pending_head)) { - GNUNET_CONTAINER_DLL_remove (h->pending_head, - h->pending_tail, + GNUNET_CONTAINER_DLL_remove (h->control_pending_head, + h->control_pending_tail, cm); - cm->cont (cm->cont_cls, NULL); + if (cm->th != NULL) + cm->th->cm = NULL; + if (cm->cont != NULL) + cm->cont (cm->cont_cls, GNUNET_NO); GNUNET_free (cm); } - GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK); + GNUNET_assert (h->control_pending_head == NULL); h->retry_backoff = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS, h->retry_backoff); - h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_backoff, - &reconnect_task, - h); h->retry_backoff = GNUNET_TIME_relative_multiply (h->retry_backoff, 2); } @@ -511,22 +520,6 @@ transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); -/** - * Control message was sent, mark it as such. - * - * @param cls the 'struct GNUNET_CORE_TransmitHandle*' - * @param tc scheduler context - */ -static void -mark_control_message_sent (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct GNUNET_CORE_TransmitHandle *th = cls; - - th->cm = NULL; -} - - /** * Send a control message to the peer asking for transmission * of the message in the given peer record. @@ -551,6 +544,8 @@ request_next_transmission (struct PeerRecord *pr) trigger_next_request (h, GNUNET_NO); return; } + if (th->cm != NULL) + return; /* already done */ GNUNET_assert (pr->prev == NULL); GNUNET_assert (pr->next == NULL); pr->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (th->timeout), @@ -558,9 +553,8 @@ request_next_transmission (struct PeerRecord *pr) pr); cm = GNUNET_malloc (sizeof (struct ControlMessage) + sizeof (struct SendMessageRequest)); - cm->cont = &mark_control_message_sent; - cm->cont_cls = th; th->cm = cm; + cm->th = th; smr = (struct SendMessageRequest*) &cm[1]; smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST); smr->header.size = htons (sizeof (struct SendMessageRequest)); @@ -570,10 +564,15 @@ request_next_transmission (struct PeerRecord *pr) smr->queue_size = htonl (pr->queue_size); smr->size = htons (th->msize); smr->smr_id = htons (th->smr_id = pr->smr_id_gen++); - GNUNET_CONTAINER_DLL_insert_after (h->pending_head, - h->pending_tail, - h->pending_tail, + GNUNET_CONTAINER_DLL_insert_after (h->control_pending_head, + h->control_pending_tail, + h->control_pending_tail, cm); +#if DEBUG_CORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding SEND REQUEST for peer `%s' to message queue\n", + GNUNET_i2s (&pr->peer)); +#endif trigger_next_request (h, GNUNET_NO); } @@ -590,14 +589,26 @@ transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct PeerRecord *pr = cls; + struct GNUNET_CORE_Handle *h = pr->ch; struct GNUNET_CORE_TransmitHandle *th; - + pr->timeout_task = GNUNET_SCHEDULER_NO_TASK; th = pr->pending_head; GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th); pr->queue_size--; + if ( (pr->prev != NULL) || + (pr->next != NULL) || + (pr == h->ready_peer_head) ) + { + /* the request that was 'approved' by core was + canceled before it could be transmitted; remove + us from the 'ready' list */ + GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, + h->ready_peer_tail, + pr); + } #if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Signalling timeout of request for transmission to CORE service\n"); @@ -624,6 +635,7 @@ transmit_message (void *cls, uint16_t msize; size_t ret; + GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK); h->cth = NULL; if (buf == NULL) { @@ -635,7 +647,7 @@ transmit_message (void *cls, return 0; } /* first check for control messages */ - if (NULL != (cm = h->pending_head)) + if (NULL != (cm = h->control_pending_head)) { hdr = (const struct GNUNET_MessageHeader*) &cm[1]; msize = ntohs (hdr->size); @@ -651,13 +663,13 @@ transmit_message (void *cls, (unsigned int) ntohs (hdr->type)); #endif memcpy (buf, hdr, msize); - GNUNET_CONTAINER_DLL_remove (h->pending_head, - h->pending_tail, - cm); + GNUNET_CONTAINER_DLL_remove (h->control_pending_head, + h->control_pending_tail, + cm); + if (cm->th != NULL) + cm->th->cm = NULL; if (NULL != cm->cont) - GNUNET_SCHEDULER_add_continuation (cm->cont, - cm->cont_cls, - GNUNET_SCHEDULER_REASON_PREREQ_DONE); + cm->cont (cm->cont_cls, GNUNET_OK); GNUNET_free (cm); trigger_next_request (h, GNUNET_NO); return msize; @@ -665,6 +677,7 @@ transmit_message (void *cls, /* now check for 'ready' P2P messages */ if (NULL != (pr = h->ready_peer_head)) { + GNUNET_assert (pr->pending_head != NULL); th = pr->pending_head; if (size < th->msize + sizeof (struct SendMessage)) { @@ -747,15 +760,33 @@ trigger_next_request (struct GNUNET_CORE_Handle *h, if ( (GNUNET_YES == h->currently_down) && (ignore_currently_down == GNUNET_NO) ) - return; + { +#if DEBUG_CORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Core connection down, not processing queue\n"); +#endif + return; + } if (NULL != h->cth) - return; - if (h->pending_head != NULL) - msize = ntohs (((struct GNUNET_MessageHeader*) &h->pending_head[1])->size); - else if (h->ready_peer_head != NULL) + { +#if DEBUG_CORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Request pending, not processing queue\n"); +#endif + return; + } + if (h->control_pending_head != NULL) + msize = ntohs (((struct GNUNET_MessageHeader*) &h->control_pending_head[1])->size); + else if (h->ready_peer_head != NULL) msize = h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage); else - return; /* no pending message */ + { +#if DEBUG_CORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Request queue empty, not processing queue\n"); +#endif + return; /* no pending message */ + } h->cth = GNUNET_CLIENT_notify_transmit_ready (h->client, msize, GNUNET_TIME_UNIT_FOREVER_REL, @@ -792,6 +823,7 @@ main_notify_handler (void *cls, int trigger; uint16_t msize; uint16_t et; + uint32_t ats_count; if (msg == NULL) { @@ -819,35 +851,67 @@ main_notify_handler (void *cls, m = (const struct InitReplyMessage *) msg; GNUNET_break (0 == ntohl (m->reserved)); /* start our message processing loop */ -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Successfully connected to core service, starting processing loop.\n"); -#endif if (GNUNET_YES == h->currently_down) { h->currently_down = GNUNET_NO; trigger_next_request (h, GNUNET_NO); } h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS; + GNUNET_CRYPTO_hash (&m->publicKey, + sizeof (struct + GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), + &h->me.hashPubKey); if (NULL != (init = h->init)) { /* mark so we don't call init on reconnect */ h->init = NULL; - GNUNET_CRYPTO_hash (&m->publicKey, - sizeof (struct - GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), - &h->me.hashPubKey); +#if DEBUG_CORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connected to core service of peer `%s'.\n", + GNUNET_i2s (&h->me)); +#endif init (h->cls, h, &h->me, &m->publicKey); } + else + { +#if DEBUG_CORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Successfully reconnected to core service.\n"); +#endif + } + /* fake 'connect to self' */ + pr = GNUNET_CONTAINER_multihashmap_get (h->peers, + &h->me.hashPubKey); + GNUNET_assert (pr == NULL); + pr = GNUNET_malloc (sizeof (struct PeerRecord)); + pr->peer = h->me; + pr->ch = h; + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_put (h->peers, + &h->me.hashPubKey, + pr, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); + if (NULL != h->connects) + h->connects (h->cls, + &h->me, + NULL); break; case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT: - if (msize != sizeof (struct ConnectNotifyMessage)) + if (msize < sizeof (struct ConnectNotifyMessage)) { GNUNET_break (0); reconnect_later (h); return; } cnm = (const struct ConnectNotifyMessage *) msg; + ats_count = ntohl (cnm->ats_count); + if ( (msize != sizeof (struct ConnectNotifyMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)) || + (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR != ntohl ((&cnm->ats)[ats_count].type)) ) + { + GNUNET_break (0); + reconnect_later (h); + return; + } #if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received notification about connection from `%s'.\n", @@ -857,7 +921,7 @@ main_notify_handler (void *cls, &cnm->peer, sizeof (struct GNUNET_PeerIdentity))) { - /* disconnect from self!? */ + /* connect to self!? */ GNUNET_break (0); return; } @@ -880,7 +944,7 @@ main_notify_handler (void *cls, if (NULL != h->connects) h->connects (h->cls, &cnm->peer, - NULL /* FIXME: atsi! */); + &cnm->ats); break; case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT: if (msize != sizeof (struct DisconnectNotifyMessage)) @@ -898,6 +962,7 @@ main_notify_handler (void *cls, GNUNET_break (0); return; } + GNUNET_break (0 == ntohl (dnm->reserved)); #if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received notification about disconnect from `%s'.\n", @@ -922,9 +987,8 @@ main_notify_handler (void *cls, if (NULL == h->status_events) { GNUNET_break (0); - break; } - if (msize != sizeof (struct PeerStatusNotifyMessage)) + if (msize < sizeof (struct PeerStatusNotifyMessage)) { GNUNET_break (0); reconnect_later (h); @@ -939,6 +1003,14 @@ main_notify_handler (void *cls, GNUNET_break (0); return; } + ats_count = ntohl (psnm->ats_count); + if ( (msize != sizeof (struct PeerStatusNotifyMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)) || + (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR != ntohl ((&psnm->ats)[ats_count].type)) ) + { + GNUNET_break (0); + reconnect_later (h); + return; + } #if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received notification about status change by `%s'.\n", @@ -957,27 +1029,27 @@ main_notify_handler (void *cls, psnm->bandwidth_in, psnm->bandwidth_out, GNUNET_TIME_absolute_ntoh (psnm->timeout), - NULL /* FIXME: atsi */); + &psnm->ats); break; case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND: - if (msize < - sizeof (struct NotifyTrafficMessage) + - sizeof (struct GNUNET_MessageHeader)) + if (msize < sizeof (struct NotifyTrafficMessage)) { GNUNET_break (0); reconnect_later (h); return; } ntm = (const struct NotifyTrafficMessage *) msg; - if (0 == memcmp (&h->me, - &ntm->peer, - sizeof (struct GNUNET_PeerIdentity))) - { - /* self-change!? */ - GNUNET_break (0); + + ats_count = ntohl (ntm->ats_count); + if ( (msize < sizeof (struct NotifyTrafficMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information) + + sizeof (struct GNUNET_MessageHeader)) || + (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR != ntohl ((&ntm->ats)[ats_count].type)) ) + { + GNUNET_break (0); + reconnect_later (h); return; - } - em = (const struct GNUNET_MessageHeader *) &ntm[1]; + } + em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count+1]; #if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received message of type %u and size %u from peer `%4s'\n", @@ -994,7 +1066,8 @@ main_notify_handler (void *cls, return; } if ((GNUNET_NO == h->inbound_hdr_only) && - (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage))) + (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage) + + + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)) ) { GNUNET_break (0); reconnect_later (h); @@ -1014,7 +1087,7 @@ main_notify_handler (void *cls, } if (GNUNET_OK != h->handlers[hpos].callback (h->cls, &ntm->peer, em, - NULL /* FIXME: atsi */)) + &ntm->ats)) { /* error in processing, do not process other messages! */ break; @@ -1022,12 +1095,10 @@ main_notify_handler (void *cls, } if (NULL != h->inbound_notify) h->inbound_notify (h->cls, &ntm->peer, em, - NULL /* FIXME: atsi */); + &ntm->ats); break; case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND: - if (msize < - sizeof (struct NotifyTrafficMessage) + - sizeof (struct GNUNET_MessageHeader)) + if (msize < sizeof (struct NotifyTrafficMessage)) { GNUNET_break (0); reconnect_later (h); @@ -1042,7 +1113,16 @@ main_notify_handler (void *cls, GNUNET_break (0); return; } - em = (const struct GNUNET_MessageHeader *) &ntm[1]; + ats_count = ntohl (ntm->ats_count); + if ( (msize < sizeof (struct NotifyTrafficMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information) + + sizeof (struct GNUNET_MessageHeader)) || + (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR != ntohl ((&ntm->ats)[ats_count].type)) ) + { + GNUNET_break (0); + reconnect_later (h); + return; + } + em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count+1]; pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey); if (pr == NULL) @@ -1057,7 +1137,8 @@ main_notify_handler (void *cls, GNUNET_i2s (&ntm->peer)); #endif if ((GNUNET_NO == h->outbound_hdr_only) && - (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage))) + (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage) + + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)) ) { GNUNET_break (0); reconnect_later (h); @@ -1069,7 +1150,7 @@ main_notify_handler (void *cls, break; } h->outbound_notify (h->cls, &ntm->peer, em, - NULL /* FIXME: atsi? */); + &ntm->ats); break; case GNUNET_MESSAGE_TYPE_CORE_SEND_READY: if (msize != sizeof (struct SendMessageReady)) @@ -1079,14 +1160,6 @@ main_notify_handler (void *cls, return; } smr = (const struct SendMessageReady *) msg; - if (0 == memcmp (&h->me, - &smr->peer, - sizeof (struct GNUNET_PeerIdentity))) - { - /* self-change!? */ - GNUNET_break (0); - return; - } pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &smr->peer.hashPubKey); if (pr == NULL) @@ -1100,6 +1173,13 @@ main_notify_handler (void *cls, "Received notification about transmission readiness to `%s'.\n", GNUNET_i2s (&smr->peer)); #endif + if (pr->pending_head == NULL) + { + /* request must have been cancelled between the original request + and the response from core, ignore core's readiness */ + return; + } + th = pr->pending_head; if (ntohs (smr->smr_id) != th->smr_id) { @@ -1151,7 +1231,13 @@ main_notify_handler (void *cls, return; } if (pr->rim_id != ntohl (cim->rim_id)) - break; + { +#if DEBUG_CORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Reservation ID mismatch in notification...\n"); +#endif + break; + } pcic = pr->pcic; pr->pcic = NULL; if (pcic != NULL) @@ -1176,23 +1262,24 @@ main_notify_handler (void *cls, * Starts our 'receive' loop. * * @param cls the 'struct GNUNET_CORE_Handle' - * @param tc task context + * @param success were we successful */ static void init_done_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) + int success) { struct GNUNET_CORE_Handle *h = cls; - if (tc == NULL) - return; /* error */ - if (0 == (tc->reason & GNUNET_SCHEDULER_REASON_PREREQ_DONE)) + if (success == GNUNET_SYSERR) + return; /* shutdown */ + if (success == GNUNET_NO) { #if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to exchange INIT with core, retrying\n"); #endif - reconnect_later (h); + if (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK) + reconnect_later (h); return; } GNUNET_CLIENT_receive (h->client, @@ -1259,8 +1346,8 @@ reconnect (struct GNUNET_CORE_Handle *h) ts = (uint16_t *) &init[1]; for (hpos = 0; hpos < h->hcnt; hpos++) ts[hpos] = htons (h->handlers[hpos].type); - GNUNET_CONTAINER_DLL_insert (h->pending_head, - h->pending_tail, + GNUNET_CONTAINER_DLL_insert (h->control_pending_head, + h->control_pending_tail, cm); trigger_next_request (h, GNUNET_YES); } @@ -1341,7 +1428,7 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, /** * Disconnect from the core service. This function can only * be called *after* all pending 'GNUNET_CORE_notify_transmit_ready' - * requests have been explicitly cancelled. + * requests have been explicitly canceled. * * @param handle connection to core to disconnect */ @@ -1364,19 +1451,22 @@ GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle) GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO); handle->client = NULL; } + while (NULL != (cm = handle->control_pending_head)) + { + GNUNET_CONTAINER_DLL_remove (handle->control_pending_head, + handle->control_pending_tail, + cm); + if (cm->th != NULL) + cm->th->cm = NULL; + if (cm->cont != NULL) + cm->cont (cm->cont_cls, GNUNET_SYSERR); + GNUNET_free (cm); + } if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (handle->reconnect_task); handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK; } - while (NULL != (cm = handle->pending_head)) - { - GNUNET_CONTAINER_DLL_remove (handle->pending_head, - handle->pending_tail, - cm); - cm->cont (cm->cont_cls, NULL); - GNUNET_free (cm); - } GNUNET_CONTAINER_multihashmap_iterate (handle->peers, &disconnect_and_free_peer_entry, handle); @@ -1388,9 +1478,9 @@ GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle) /** * Ask the core to call "notify" once it is ready to transmit the - * given number of bytes to the specified "target". If we are not yet - * connected to the specified peer, a call to this function will cause - * us to try to establish a connection. + * given number of bytes to the specified "target". Must only be + * called after a connection to the respective peer has been + * established (and the client has been informed about this). * * @param handle connection to core service * @param priority how important is the message? @@ -1424,6 +1514,9 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, if (NULL == pr) { /* attempt to send to peer that is not connected */ + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, + "Attempting to send to peer `%s' from peer `%s', but not connected!\n", + GNUNET_i2s(target), GNUNET_h2s(&handle->me.hashPubKey)); GNUNET_break (0); return NULL; } @@ -1431,6 +1524,7 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, GNUNET_SERVER_MAX_MESSAGE_SIZE); th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle)); th->peer = pr; + GNUNET_assert(NULL != notify); th->get_message = notify; th->get_message_cls = notify_cls; th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay); @@ -1439,9 +1533,9 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, /* bound queue size */ if (pr->queue_size == handle->queue_size) { - /* find lowest-priority entry */ - minp = pr->pending_head; - prev = minp->next; + /* find lowest-priority entry, but skip the head of the list */ + minp = pr->pending_head->next; + prev = minp; while (prev != NULL) { if (prev->priority < minp->priority) @@ -1452,10 +1546,17 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, { GNUNET_break (handle->queue_size != 0); GNUNET_break (pr->queue_size == 0); + GNUNET_free(th); return NULL; } if (priority <= minp->priority) - return NULL; /* priority too low */ + { +#if DEBUG_CORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Dropping transmission request: priority too low\n"); +#endif + return NULL; /* priority too low */ + } GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, minp); @@ -1491,6 +1592,10 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, th); pr->queue_size++; /* was the request queue previously empty? */ +#if DEBUG_CORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmission request added to queue\n"); +#endif if (pr->pending_head == th) request_next_transmission (pr); return th; @@ -1514,11 +1619,12 @@ GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th); + pr->queue_size--; if (th->cm != NULL) { /* we're currently in the control queue, remove */ - GNUNET_CONTAINER_DLL_remove (h->pending_head, - h->pending_tail, + GNUNET_CONTAINER_DLL_remove (h->control_pending_head, + h->control_pending_tail, th->cm); GNUNET_free (th->cm); } @@ -1530,7 +1636,7 @@ GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle (pr == h->ready_peer_head) ) { /* the request that was 'approved' by core was - cancelled before it could be transmitted; remove + canceled before it could be transmitted; remove us from the 'ready' list */ GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, @@ -1564,7 +1670,7 @@ struct GNUNET_CORE_PeerRequestHandle /** * Continuation to run when done. */ - GNUNET_SCHEDULER_Task cont; + GNUNET_CORE_ControlContinuation cont; /** * Closure for 'cont'. @@ -1580,22 +1686,16 @@ struct GNUNET_CORE_PeerRequestHandle * resources. * * @param cls the 'struct GNUNET_CORE_PeerRequestHandle' - * @param tc scheduler context + * @param success was the request transmitted? */ static void peer_request_connect_cont (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) + int success) { struct GNUNET_CORE_PeerRequestHandle *ret = cls; if (ret->cont != NULL) - { - if (tc == NULL) - GNUNET_SCHEDULER_add_now (ret->cont, - ret->cont_cls); - else - ret->cont (ret->cont_cls, tc); - } + ret->cont (ret->cont_cls, success); GNUNET_free (ret); } @@ -1616,18 +1716,25 @@ peer_request_connect_cont (void *cls, * @param peer who should we connect to * @param cont function to call once the request has been completed (or timed out) * @param cont_cls closure for cont - * @return NULL on error (cont will not be called), otherwise handle for cancellation + * + * @return NULL on error or already connected, + * otherwise handle for cancellation */ struct GNUNET_CORE_PeerRequestHandle * GNUNET_CORE_peer_request_connect (struct GNUNET_CORE_Handle *h, struct GNUNET_TIME_Relative timeout, const struct GNUNET_PeerIdentity * peer, - GNUNET_SCHEDULER_Task cont, + GNUNET_CORE_ControlContinuation cont, void *cont_cls) { struct GNUNET_CORE_PeerRequestHandle *ret; struct ControlMessage *cm; struct ConnectMessage *msg; + + if (NULL != GNUNET_CONTAINER_multihashmap_get (h->peers, + &peer->hashPubKey)) + return NULL; /* Already connected, means callback should have happened already! */ + cm = GNUNET_malloc (sizeof (struct ControlMessage) + sizeof (struct ConnectMessage)); @@ -1637,8 +1744,8 @@ GNUNET_CORE_peer_request_connect (struct GNUNET_CORE_Handle *h, msg->reserved = htonl (0); msg->timeout = GNUNET_TIME_relative_hton (timeout); msg->peer = *peer; - GNUNET_CONTAINER_DLL_insert (h->pending_head, - h->pending_tail, + GNUNET_CONTAINER_DLL_insert (h->control_pending_head, + h->control_pending_tail, cm); ret = GNUNET_malloc (sizeof (struct GNUNET_CORE_PeerRequestHandle)); ret->h = h; @@ -1647,7 +1754,11 @@ GNUNET_CORE_peer_request_connect (struct GNUNET_CORE_Handle *h, ret->cont_cls = cont_cls; cm->cont = &peer_request_connect_cont; cm->cont_cls = ret; - if (h->pending_head == cm) +#if DEBUG_CORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Queueing REQUEST_CONNECT request\n"); +#endif + if (h->control_pending_head == cm) trigger_next_request (h, GNUNET_NO); return ret; } @@ -1665,8 +1776,8 @@ GNUNET_CORE_peer_request_connect_cancel (struct GNUNET_CORE_PeerRequestHandle *r struct GNUNET_CORE_Handle *h = req->h; struct ControlMessage *cm = req->cm; - GNUNET_CONTAINER_DLL_remove (h->pending_head, - h->pending_tail, + GNUNET_CONTAINER_DLL_remove (h->control_pending_head, + h->control_pending_tail, cm); GNUNET_free (cm); GNUNET_free (req); @@ -1710,11 +1821,11 @@ struct GNUNET_CORE_InformationRequestContext * CM was sent, remove link so we don't double-free. * * @param cls the 'struct GNUNET_CORE_InformationRequestContext' - * @param tc scheduler context + * @param success were we successful? */ static void change_preference_send_continuation (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) + int success) { struct GNUNET_CORE_InformationRequestContext *irc = cls; @@ -1777,6 +1888,7 @@ GNUNET_CORE_peer_change_preference (struct GNUNET_CORE_Handle *h, } irc = GNUNET_malloc (sizeof (struct GNUNET_CORE_InformationRequestContext)); irc->h = h; + irc->pr = pr; irc->info = info; irc->info_cls = info_cls; cm = GNUNET_malloc (sizeof (struct ControlMessage) + @@ -1792,11 +1904,17 @@ GNUNET_CORE_peer_change_preference (struct GNUNET_CORE_Handle *h, rim->reserve_inbound = htonl (amount); rim->preference_change = GNUNET_htonll(preference); rim->peer = *peer; - GNUNET_CONTAINER_DLL_insert (h->pending_head, - h->pending_tail, +#if DEBUG_CORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Queueing CHANGE PREFERENCE request\n"); +#endif + GNUNET_CONTAINER_DLL_insert (h->control_pending_head, + h->control_pending_tail, cm); pr->pcic = info; pr->pcic_cls = info_cls; + if (h->control_pending_head == cm) + trigger_next_request (h, GNUNET_NO); return irc; } @@ -1820,8 +1938,8 @@ GNUNET_CORE_peer_change_preference_cancel (struct GNUNET_CORE_InformationRequest if (irc->cm != NULL) { - GNUNET_CONTAINER_DLL_remove (h->pending_head, - h->pending_tail, + GNUNET_CONTAINER_DLL_remove (h->control_pending_head, + h->control_pending_tail, irc->cm); GNUNET_free (irc->cm); } @@ -1831,70 +1949,4 @@ GNUNET_CORE_peer_change_preference_cancel (struct GNUNET_CORE_InformationRequest } -/* ********************* GNUNET_CORE_iterate_peers *********************** */ - -/** - * Context for 'iterate_peers' helper function. - */ -struct IterationContext -{ - /** - * Callback to call. - */ - GNUNET_CORE_ConnectEventHandler peer_cb; - - /** - * Closure for 'peer_cb'. - */ - void *cb_cls; -}; - - -/** - * Call callback for each peer. - * - * @param cls the 'struct IterationContext' - * @param hc peer identity, not used - * @param value the 'struct PeerRecord' - * @return GNUNET_YES (continue iteration) - */ -static int -iterate_peers (void *cls, - const GNUNET_HashCode *hc, - void *value) -{ - struct IterationContext *ic = cls; - struct PeerRecord *pr = value; - - ic->peer_cb (ic->cb_cls, - &pr->peer, - NULL /* FIXME: pass atsi? */); - return GNUNET_YES; -} - - -/** - * Obtain statistics and/or change preferences for the given peer. - * - * @param h handle to core - * @param peer_cb function to call with the peer information - * @param cb_cls closure for peer_cb - * @return GNUNET_OK if iterating, GNUNET_SYSERR on error - */ -int -GNUNET_CORE_iterate_peers (struct GNUNET_CORE_Handle *h, - GNUNET_CORE_ConnectEventHandler peer_cb, - void *cb_cls) -{ - struct IterationContext ic; - - ic.peer_cb = peer_cb; - ic.cb_cls = cb_cls; - GNUNET_CONTAINER_multihashmap_iterate (h->peers, - &iterate_peers, - &ic); - return GNUNET_OK; -} - - /* end of core_api.c */