*/
#define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
+/**
+ * After how long do we discard a reply?
+ */
+#define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
+
/**
* Handle to cancel a transmission request.
}
+/**
+ * Copy reply and free put message.
+ *
+ * @param cls the 'struct PutMessage'
+ * @param buf_size number of bytes available in buf
+ * @param buf where to copy the message, NULL on error (peer disconnect)
+ * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
+ */
+static size_t
+copy_reply (void *cls,
+ size_t buf_size,
+ void *buf)
+{
+ struct PutMessage *pm = cls;
+
+ if (buf != NULL)
+ {
+ GNUNET_assert (size >= ntohs (pm->header.size));
+ size = ntohs (pm->header.size);
+ memcpy (buf, pm, size);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# replies transmitted to other peers"),
+ 1,
+ GNUNET_NO);
+ }
+ else
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# replies dropped"),
+ 1,
+ GNUNET_NO);
+ }
+ GNUNET_free (pm);
+ return size;
+}
+
+
/**
* Handle a reply to a pending request. Also called if a request
* expires (then with data == NULL). The handler may be called
* @param cls 'struct GSF_ConnectedPeer' of the peer that would
* have liked an answer to the request
* @param pr handle to the original pending request
+ * @param expiration when does 'data' expire?
* @param data response data, NULL on request expiration
* @param data_len number of bytes in data
+ * @param more GNUNET_YES if the request remains active (may call
+ * this function again), GNUNET_NO if the request is
+ * finished (client must not call GSF_pending_request_cancel_)
*/
static void
handle_p2p_reply (void *cls,
struct GSF_PendingRequest *pr,
+ struct GNUNET_TIME_Absolute expiration,
const void *data,
- size_t data_len)
+ size_t data_len,
+ int more)
{
struct GSF_ConnectedPeer *cp = cls;
+ struct GSF_PendingRequest *prd;
+ struct PutMessage *pm;
+ size_t msize;
-#if SUPPORT_DELAYS
- struct GNUNET_TIME_Relative art_delay;
-#endif
-
- /* FIXME: adapt code fragments below to new API! */
+ prd = GSF_pending_request_get_data_ (pr);
if (NULL == data)
{
- /* FIXME: request expired! clean up! */
+ GNUNET_assert (GNUNET_NO == more);
GNUNET_STATISTICS_update (stats,
gettext_noop ("# P2P searches active"),
-1,
GNUNET_NO);
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
+ &prd->query,
+ pr));
return;
}
-
- /* reply will go over the network, check for cover traffic */
- if ( (prq->anonymity_level > 1) &&
- (cover_content_count < prq->anonymity_level - 1) )
- {
- /* insufficient cover traffic, skip */
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("# replies suppressed due to lack of cover traffic"),
- 1,
- GNUNET_NO);
- return GNUNET_YES;
- }
- if (prq->anonymity_level > 1)
- cover_content_count -= prq->anonymity_level - 1;
-
-
- cp = pr->cp;
#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting result for query `%s' to other peer (PID=%u)\n",
- GNUNET_h2s (key),
- (unsigned int) cp->pid);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmitting result for query `%s'\n",
+ GNUNET_h2s (key));
#endif
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("# replies received for other peers"),
- 1,
- GNUNET_NO);
- msize = sizeof (struct PutMessage) + prq->size;
- reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
- reply->cont = &transmit_reply_continuation;
- reply->cont_cls = pr;
-#if SUPPORT_DELAYS
- art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
- GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
- TTL_DECREMENT));
- reply->delay_until
- = GNUNET_TIME_relative_to_absolute (art_delay);
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("cummulative artificial delay introduced (ms)"),
- art_delay.abs_value,
- GNUNET_NO);
-#endif
- reply->msize = msize;
- reply->priority = UINT32_MAX; /* send replies first! */
- pm = (struct PutMessage*) &reply[1];
- pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
- pm->header.size = htons (msize);
- pm->type = htonl (prq->type);
- pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
- memcpy (&pm[1], prq->data, prq->size);
- add_to_pending_messages_for_peer (cp, reply, pr);
-
-
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# replies received for other peers"),
+ 1,
+ GNUNET_NO);
+ msize = sizeof (struct PutMessage) + data_len;
+ pm = GNUNET_malloc (sizeof (msize));
+ pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+ pm->header.size = htons (msize);
+ pm->type = htonl (prd->type);
+ pm->expiration = GNUNET_TIME_absolute_hton (expiration);
+ memcpy (&pm[1], data, data_len);
+ (void) GSF_peer_transmit_ (cp, GNUNET_NO,
+ UINT32_MAX,
+ REPLY_TIMEOUT,
+ msize,
+ ©_reply,
+ pm);
}
}
-
/**
* We have received a reply; handle it!
*
break;
case GNUNET_BLOCK_EVALUATION_OK_LAST:
update_request_performance_data (prq, pr);
- /* FIXME: adapt code to new API! */
- while (NULL != pr->pending_head)
- destroy_pending_message_list_entry (pr->pending_head);
- if (pr->qe != NULL)
- {
- if (pr->client_request_list != NULL)
- GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
- GNUNET_YES);
- GNUNET_DATASTORE_cancel (pr->qe);
- pr->qe = NULL;
- }
- pr->do_remove = GNUNET_YES;
- if (pr->task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (pr->task);
- pr->task = GNUNET_SCHEDULER_NO_TASK;
- }
- GNUNET_break (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (query_request_map,
- key,
- pr));
GNUNET_LOAD_update (rt_entry_lifetime,
GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
- break;
+ /* pass on to other peers / local clients */
+ pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_NO);
+ /* destroy request, we're done */
+ GSF_pending_request_cancel_ (pr);
+ return GNUNET_YES;
case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
GNUNET_STATISTICS_update (stats,
gettext_noop ("# duplicate replies discarded (bloomfilter)"),
pr->results_found++;
prq->request_found = GNUNET_YES;
/* finally, pass on to other peers / local clients */
- pr->rh (pr->rh_cls, pr, prq->data, prq->size);
+ pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_YES);
return GNUNET_YES;
}
GSF_pending_request_init_ ()
{
pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
+ requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
}
NULL);
GNUNET_CONTAINER_multihashmap_destroy (pr_map);
pr_map = NULL;
+ GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
+ requests_by_expiration_heap = NULL;
}