* - GSF_plan_get_ (!)
* - GSF_plan_size_ (?)
* - GSF_plan_notify_request_done (!)
- * -
+ * - consider re-issue GSF_dht_lookup_ after non-DHT reply received
*
*
*/
}
+/**
+ * We have a new request, consider forwarding it to the given
+ * peer.
+ *
+ * @param cls the 'struct GSF_PendingRequest'
+ * @param peer identity of the peer
+ * @param cp handle to the connected peer record
+ * @param perf peer performance data
+ */
+static void
+consider_request_for_forwarding (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ struct GSF_ConnectedPeer *cp,
+ const struct GSF_PeerPerformanceData *ppd)
+{
+ struct GSF_PendingRequest *pr = cls;
+
+ plan (cp, pr);
+}
+
+
+/**
+ * Function to be called after we're done processing
+ * replies from the local lookup. If the result status
+ * code indicates that there may be more replies, plan
+ * forwarding the request.
+ *
+ * @param cls closure (NULL)
+ * @param pr the pending request we were processing
+ * @param result final datastore lookup result
+ */
+static void
+consider_forwarding (void *cls,
+ struct GSF_PendingRequest *pr,
+ enum GNUNET_BLOCK_EvaluationResult result)
+{
+ if (GNUNET_BLOCK_EVALUATION_OK_LAST == result)
+ return; /* we're done... */
+ GSF_iterate_connected_peers_ (&consider_request_for_forwarding,
+ pr);
+}
+
+
/**
* Handle P2P "GET" request.
*
pr = GSF_handle_p2p_query_ (other, message);
if (NULL == pr)
- return GNUNET_SYSERR;
- /* FIXME: local lookup! */
- /* FIXME: after local lookup, trigger forwarding/routing! */
+ return GNUNET_SYSERR;
+ GSF_local_lookup_ (pr,
+ &consider_forwarding,
+ NULL);
return GNUNET_OK;
}
/**
- * We have a new request, consider forwarding it to the given
- * peer.
+ * We're done with the local lookup, now consider
+ * P2P processing (depending on request options and
+ * result status). Also signal that we can now
+ * receive more request information from the client.
*
- * @param cls the 'struct GSF_PendingRequest'
- * @param peer identity of the peer
- * @param cp handle to the connected peer record
- * @param perf peer performance data
+ * @param cls the client doing the request ('struct GNUNET_SERVER_Client')
+ * @param pr the pending request we were processing
+ * @param result final datastore lookup result
*/
static void
-consider_request_for_forwarding (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- struct GSF_ConnectedPeer *cp,
- const struct GSF_PeerPerformanceData *ppd)
+start_p2p_processing (void *cls,
+ struct GSF_PendingRequest *pr,
+ enum GNUNET_BLOCK_EvaluationResult result)
{
- struct GSF_PendingRequest *pr = cls;
+ struct GNUNET_SERVER_Client *client = cls;
- plan (cp, pr);
+ GNUNET_SERVER_receive_done (client,
+ GNUNET_OK);
+ if (GNUNET_BLOCK_EVALUATION_OK_LAST == result)
+ return; /* we're done... */
+ GSF_dht_lookup_ (pr);
+ consider_forwarding (NULL, pr, result);
}
/* 'GNUNET_SERVER_receive_done was already called! */
return;
}
- /* FIXME: local lookup, then (after DB done!) receive_done: */
- GNUNET_SERVER_receive_done (client,
- GNUNET_OK);
-#if 0
- /* FIXME: also do DHT lookup */
- struct GNUNET_DHT_GetHandle *gh;
- /* store 'gh' with 'pr', cancel it on pr destruction, etc. */
- gh = GNUNET_DHT_get_start (GSF_dht,
- timeout,
- type,
- key,
- des_repl_level,
- options,
- bf,
- bf_mutator,
- xquery,
- xquery_size,
- &GSF_handle_dht_reply_,
- pr);
-#endif
- GSF_iterate_connected_peers_ (&consider_request_for_forwarding,
- pr);
+ GSF_local_lookup_ (pr,
+ &start_p2p_processing,
+ client);
}
*/
struct GNUNET_CONTAINER_HeapNode *hnode;
+ /**
+ * Datastore queue entry for this request (or NULL for none).
+ */
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+
+ /**
+ * DHT request handle for this request (or NULL for none).
+ */
+ struct GNUNET_DHT_GetHandle *gh;
+
/**
* Identity of the peer that we should use for the 'sender'
* (recipient of the response) when forwarding (0 for none).
if (NULL != pr->hnode)
GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
pr->hnode);
+ if (NULL != pr->qe)
+ GNUNET_DATASTORE_cancel (pr->qe);
+ if (NULL != pr->gh)
+ GNUNET_DHT_get_stop (pr->gh);
GNUNET_free (pr);
return GNUNET_YES;
}
1,
GNUNET_NO);
}
+ else
+ {
+ GSF_dht_lookup_ (pr);
+ }
prq->priority += pr->public_data.original_priority;
pr->public_data.priority = 0;
pr->public_data.original_priority = 0;
* @param size number of bytes in data
* @param data pointer to the result data
*/
-void
-GSF_handle_dht_reply_ (void *cls,
- struct GNUNET_TIME_Absolute exp,
- const GNUNET_HashCode *key,
- const struct GNUNET_PeerIdentity * const *get_path,
- const struct GNUNET_PeerIdentity * const *put_path,
- enum GNUNET_BLOCK_Type type,
- size_t size,
- const void *data)
+static void
+handle_dht_reply (void *cls,
+ struct GNUNET_TIME_Absolute exp,
+ const GNUNET_HashCode *key,
+ const struct GNUNET_PeerIdentity * const *get_path,
+ const struct GNUNET_PeerIdentity * const *put_path,
+ enum GNUNET_BLOCK_Type type,
+ size_t size,
+ const void *data)
{
struct GSF_PendingRequest *pr = cls;
struct ProcessReplyClosure prq;
}
+/**
+ * Consider looking up the data in the DHT (anonymity-level permitting).
+ *
+ * @param pr the pending request to process
+ */
+void
+GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
+{
+ const void *xquery;
+ size_t xquery_size;
+ struct GNUNET_PeerIdentity pi;
+ char buf[sizeof (GNUNET_HashCode) * 2];
+
+ if (0 != pr->public_data.anonymity_level)
+ return;
+ if (NULL != pr->gh)
+ {
+ GNUNET_DHT_get_stop (pr->gh);
+ pr->gh = NULL;
+ }
+ xquery = NULL;
+ xquery_size = 0;
+ if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
+ {
+ xquery = buf;
+ memcpy (buf, &pr->public_data.namespace, sizeof (GNUNET_HashCode));
+ xquery_size = sizeof (GNUNET_HashCode);
+ }
+ if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY))
+ {
+ GNUNET_PEER_resolve (pr->sender_pid,
+ &pi);
+ memcpy (&buf[xquery_size], &pi, sizeof (struct GNUNET_PeerIdentity));
+ xquery_size += sizeof (struct GNUNET_PeerIdentity);
+ }
+ pr->gh = GNUNET_DHT_get_start (GSF_dht,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ pr->public_data.type,
+ &pr->public_data.query,
+ DEFAULT_GET_REPLICATION,
+ GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
+ pr->bf,
+ pr->mingle,
+ xquery,
+ xquery_size,
+ &handle_dht_reply,
+ pr);
+}
+
+
+/**
+ * We're processing (local) results for a search request
+ * from another peer. Pass applicable results to the
+ * peer and if we are done either clean up (operation
+ * complete) or forward to other peers (more results possible).
+ *
+ * @param cls our closure (struct LocalGetContext)
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ * maybe 0 if no unique identifier is available
+ */
+static void
+process_local_reply (void *cls,
+ const GNUNET_HashCode * key,
+ size_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute expiration,
+ uint64_t uid)
+{
+#if FIXME
+ struct PendingRequest *pr = cls;
+ struct ProcessReplyClosure prq;
+ struct CheckDuplicateRequestClosure cdrc;
+ GNUNET_HashCode query;
+ unsigned int old_rf;
+
+ if (NULL == key)
+ {
+#if DEBUG_FS > 1
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Done processing local replies, forwarding request to other peers.\n");
+#endif
+ pr->qe = NULL;
+ if (pr->client_request_list != NULL)
+ {
+ GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
+ GNUNET_YES);
+ /* Figure out if this is a duplicate request and possibly
+ merge 'struct PendingRequest' entries */
+ cdrc.have = NULL;
+ cdrc.pr = pr;
+ GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
+ &pr->query,
+ &check_duplicate_request_client,
+ &cdrc);
+ if (cdrc.have != NULL)
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received request for block `%s' twice from client, will only request once.\n",
+ GNUNET_h2s (&pr->query));
+#endif
+
+ destroy_pending_request (pr);
+ return;
+ }
+ }
+ if (pr->local_only == GNUNET_YES)
+ {
+ destroy_pending_request (pr);
+ return;
+ }
+ /* no more results */
+ if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+ pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
+ pr);
+ return;
+ }
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "New local response to `%s' of type %u.\n",
+ GNUNET_h2s (key),
+ type);
+#endif
+ if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Found ONDEMAND block, performing on-demand encoding\n");
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# on-demand blocks matched requests"),
+ 1,
+ GNUNET_NO);
+ if (GNUNET_OK !=
+ GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
+ anonymity, expiration, uid,
+ &process_local_reply,
+ pr))
+ if (pr->qe != NULL)
+ {
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+ }
+ return;
+ }
+ old_rf = pr->results_found;
+ memset (&prq, 0, sizeof (prq));
+ prq.data = data;
+ prq.expiration = expiration;
+ prq.size = size;
+ if (GNUNET_OK !=
+ GNUNET_BLOCK_get_key (block_ctx,
+ type,
+ data,
+ size,
+ &query))
+ {
+ GNUNET_break (0);
+ GNUNET_DATASTORE_remove (dsh,
+ key,
+ size, data,
+ -1, -1,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ NULL, NULL);
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+ return;
+ }
+ prq.type = type;
+ prq.priority = priority;
+ prq.finished = GNUNET_NO;
+ prq.request_found = GNUNET_NO;
+ prq.anonymity_level = anonymity;
+ if ( (old_rf == 0) &&
+ (pr->results_found == 0) )
+ update_datastore_delays (pr->start_time);
+ process_reply (&prq, key, pr);
+ if (prq.finished == GNUNET_YES)
+ return;
+ if (pr->qe == NULL)
+ return; /* done here */
+ if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
+ {
+ pr->local_only = GNUNET_YES; /* do not forward */
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+ return;
+ }
+ if ( (pr->client_request_list == NULL) &&
+ ( (GNUNET_YES == test_get_load_too_high (0)) ||
+ (pr->results_found > 5 + 2 * pr->priority) ) )
+ {
+#if DEBUG_FS > 2
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Load too high, done with request\n");
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# processing result set cut short due to load"),
+ 1,
+ GNUNET_NO);
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+ return;
+ }
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+#endif
+}
+
+
+/**
+ * Look up the request in the local datastore.
+ *
+ * @param pr the pending request to process
+ * @param cont function to call at the end
+ * @param cont_cls closure for cont
+ */
+void
+GSF_local_lookup_ (struct GSF_PendingRequest *pr,
+ GSF_LocalLookupContinuation cont,
+ void *cont_cls)
+{
+ // FIXME: fix process_local_reply / cont!
+ GNUNET_assert (NULL == pr->gh);
+ pr->qe = GNUNET_DATASTORE_get (GSF_dsh,
+ &pr->public_data.query,
+ pr->public_data.type,
+ 1 /* queue priority */,
+ 1 /* max queue size */,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &process_local_reply,
+ pr);
+}
+
+
+
/**
* Handle P2P "CONTENT" message. Checks that the message is
* well-formed and then checks if there are any pending requests for
/**
- * Iterator called on each result obtained for a DHT
- * operation that expects a reply
+ * Consider looking up the data in the DHT (anonymity-level permitting).
*
- * @param cls closure, the 'struct GSF_PendingRequest *'.
- * @param exp when will this value expire
- * @param key key of the result
- * @param get_path NULL-terminated array of pointers
- * to the peers on reverse GET path (or NULL if not recorded)
- * @param put_path NULL-terminated array of pointers
- * to the peers on the PUT path (or NULL if not recorded)
- * @param type type of the result
- * @param size number of bytes in data
- * @param data pointer to the result data
+ * @param pr the pending request to process
*/
void
-GSF_handle_dht_reply_ (void *cls,
- struct GNUNET_TIME_Absolute exp,
- const GNUNET_HashCode * key,
- const struct GNUNET_PeerIdentity * const *get_path,
- const struct GNUNET_PeerIdentity * const *put_path,
- enum GNUNET_BLOCK_Type type,
- size_t size,
- const void *data);
+GSF_dht_lookup_ (struct GSF_PendingRequest *pr);
+
+
+/**
+ * Function to be called after we're done processing
+ * replies from the local lookup.
+ *
+ * @param cls closure
+ * @param pr the pending request we were processing
+ * @param result final datastore lookup result
+ */
+typedef void (GSF_LocalLookupContinuation)(void *cls,
+ struct GSF_PendingRequest *pr,
+ enum GNUNET_BLOCK_EvaluationResult result);
+
+
+/**
+ * Look up the request in the local datastore.
+ *
+ * @param pr the pending request to process
+ * @param cont function to call at the end
+ * @param cont_cls closure for cont
+ */
+void
+GSF_local_lookup_ (struct GSF_PendingRequest *pr,
+ GSF_LocalLookupContinuation cont,
+ void *cont_cls);
/**