/**
* Head of transmission queue.
*/
- struct GNUNET_PEERSTORE_AddContext *ac_head;
+ struct GNUNET_PEERSTORE_RequestContext *rc_head;
/**
* Tail of transmission queue.
*/
- struct GNUNET_PEERSTORE_AddContext *ac_tail;
+ struct GNUNET_PEERSTORE_RequestContext *rc_tail;
/**
* Handle for the current transmission request, or NULL if none is pending.
*/
struct GNUNET_CLIENT_TransmitHandle *th;
+ /**
+ * Head of store requests DLL.
+ */
+ struct GNUNET_PEERSTORE_StoreContext *sc_head;
+
+ /**
+ * Tail of store requests DLL.
+ */
+ struct GNUNET_PEERSTORE_StoreContext *sc_tail;
+
/**
* ID for a reconnect task.
*/
* Entry in the transmission queue to PEERSTORE service.
*
*/
-struct GNUNET_PEERSTORE_AddContext
+struct GNUNET_PEERSTORE_RequestContext
{
/**
* This is a linked list.
*/
- struct GNUNET_PEERSTORE_AddContext *next;
+ struct GNUNET_PEERSTORE_RequestContext *next;
/**
* This is a linked list.
*/
- struct GNUNET_PEERSTORE_AddContext *prev;
+ struct GNUNET_PEERSTORE_RequestContext *prev;
/**
* Handle to the PEERSTORE service.
};
+/**
+ * Context for a store request
+ *
+ */
+struct GNUNET_PEERSTORE_StoreContext
+{
+ /**
+ * Kept in a DLL.
+ */
+ struct GNUNET_PEERSTORE_StoreContext *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct GNUNET_PEERSTORE_StoreContext *prev;
+
+ /**
+ * Handle to the PEERSTORE service.
+ */
+ struct GNUNET_PEERSTORE_Handle *h;
+
+ /**
+ * Our entry in the transmission queue.
+ */
+ struct GNUNET_PEERSTORE_RequestContext *rc;
+
+ /**
+ * Function to call with store operation result
+ */
+ GNUNET_PEERSTORE_Continuation cont;
+
+ /**
+ * Closure for 'cont'.
+ */
+ void *cont_cls;
+
+ /**
+ * Set to GNUNET_YES if we are currently receiving replies from the
+ * service.
+ */
+ int request_transmitted;
+
+};
+
/******************************************************************************/
/*********************** DECLARATIONS *************************/
/******************************************************************************/
GNUNET_CLIENT_disconnect (h->client);
h->client = NULL;
}
- h->in_receive = GNUNET_NO;
h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
if (NULL == h->client)
{
do_transmit (void *cls, size_t size, void *buf)
{
struct GNUNET_PEERSTORE_Handle *h = cls;
- struct GNUNET_PEERSTORE_AddContext *ac = h->ac_head;
+ struct GNUNET_PEERSTORE_RequestContext *rc = h->rc_head;
size_t ret;
h->th = NULL;
- if (NULL == ac)
- return 0; /* request was cancelled in the meantime */
+ if (NULL == rc)
+ return 0; /* request was canceled in the meantime */
if (NULL == buf)
{
/* peerstore service died */
LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
"Failed to transmit message to `%s' service.\n", "PEERSTORE");
- GNUNET_CONTAINER_DLL_remove (h->ac_head, h->ac_tail, ac);
+ GNUNET_CONTAINER_DLL_remove (h->rc_head, h->rc_tail, rc);
reconnect (h);
- if (NULL != ac->cont)
- ac->cont (ac->cont_cls, _("failed to transmit request (service down?)"));
- GNUNET_free (ac);
+ if (NULL != rc->cont)
+ rc->cont (rc->cont_cls, _("failed to transmit request (service down?)"));
+ GNUNET_free (rc);
return 0;
}
- ret = ac->size;
+ ret = rc->size;
if (size < ret)
{
/* change in head of queue (i.e. cancel + add), try again */
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Transmitting request of size %u to `%s' service.\n", ret, "PEERSTORE");
- memcpy (buf, &ac[1], ret);
- GNUNET_CONTAINER_DLL_remove (h->ac_head, h->ac_tail, ac);
+ memcpy (buf, &rc[1], ret);
+ GNUNET_CONTAINER_DLL_remove (h->rc_head, h->rc_tail, rc);
trigger_transmit (h);
- if (NULL != ac->cont)
- ac->cont (ac->cont_cls, NULL);
- GNUNET_free (ac);
+ if (NULL != rc->cont)
+ rc->cont (rc->cont_cls, NULL);
+ GNUNET_free (rc);
return ret;
}
static void
trigger_transmit (struct GNUNET_PEERSTORE_Handle *h)
{
- struct GNUNET_PEERSTORE_AddContext *ac;
+ struct GNUNET_PEERSTORE_RequestContext *rc;
- if (NULL == (ac = h->ac_head))
+ if (NULL == (rc = h->rc_head))
return; /* no requests queued */
if (NULL != h->th)
return; /* request already pending */
return;
}
h->th =
- GNUNET_CLIENT_notify_transmit_ready (h->client, ac->size,
+ GNUNET_CLIENT_notify_transmit_ready (h->client, rc->size,
GNUNET_TIME_UNIT_FOREVER_REL,
GNUNET_YES,
&do_transmit, h);
}
+/******************************************************************************/
+/******************* GENERAL FUNCTIONS *********************/
+/******************************************************************************/
+
+/**
+ * Function called with server response message
+ * after a store operation is request
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_StoreContext'
+ * @param msg message received, NULL on timeout or fatal error
+ */
+static void
+peerstore_handler (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_PEERSTORE_Handle *h = cls;
+ struct GNUNET_PEERSTORE_StoreContext *sc;
+ struct StoreResponseMessage *srm;
+ uint16_t response_type;
+ uint16_t response_size;
+ char *emsg;
+
+ h->in_receive = GNUNET_NO;
+ if(NULL == msg)
+ {
+ reconnect(h);
+ return;
+ }
+ response_type = ntohs(msg->type);
+ response_size = ntohs(msg->size);
+ switch(response_type)
+ {
+ case GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT:
+ GNUNET_assert(response_size >= sizeof(struct GNUNET_MessageHeader) + sizeof(struct StoreResponseMessage));
+ sc = h->sc_head;
+ if(NULL == sc)
+ {
+ LOG(GNUNET_ERROR_TYPE_ERROR, "Received a response to a non-existent store request\n");
+ return;
+ }
+ GNUNET_PEERSTORE_store_cancel(sc);
+ trigger_transmit (h);
+ if (NULL != h->sc_head)
+ {
+ h->in_receive = GNUNET_YES;
+ GNUNET_CLIENT_receive (h->client,
+ &peerstore_handler,
+ h,
+ GNUNET_TIME_UNIT_FOREVER_REL);
+ }
+ if(NULL != sc->cont)
+ {
+ srm = (struct StoreResponseMessage *)&msg[1];
+ emsg = NULL;
+ if(GNUNET_NO == ntohs(srm->success))
+ {
+ emsg = GNUNET_malloc(ntohs(srm->emsg_size));
+ memcpy(emsg, &srm[1], ntohs(srm->emsg_size));
+ }
+ sc->cont(sc->cont_cls, emsg);
+ }
+ break;
+ }
+
+}
+
/******************************************************************************/
/******************* ADD FUNCTIONS *********************/
/******************************************************************************/
-struct GNUNET_PEERSTORE_AddContext *
-GNUNET_PEERSTORE_add (struct GNUNET_PEERSTORE_Handle *h,
+/**
+ * Cancel a store request
+ *
+ * @param sc Store request context
+ */
+void
+GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
+{
+ struct GNUNET_PEERSTORE_Handle *h;
+
+ h = sc->h;
+ sc->cont = NULL;
+ if (GNUNET_YES == sc->request_transmitted)
+ return; /* need to finish processing */
+ GNUNET_CONTAINER_DLL_remove (h->sc_head,
+ h->sc_tail,
+ sc);
+ if (NULL != sc->rc)
+ {
+ GNUNET_CONTAINER_DLL_remove (h->rc_head, h->rc_tail, sc->rc);
+ GNUNET_free (sc->rc);
+ }
+ GNUNET_free (sc);
+}
+
+/**
+ * Called after store request is sent
+ * Waits for response from service
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_StoreContext'
+ * @parma emsg error message (or NULL)
+ */
+void store_receive_result(void *cls, const char *emsg)
+{
+ struct GNUNET_PEERSTORE_StoreContext *sc = cls;
+ struct GNUNET_PEERSTORE_Handle *h = sc->h;
+
+ sc->rc = NULL;
+ if(NULL != emsg)
+ {
+ GNUNET_PEERSTORE_store_cancel (sc);
+ reconnect (h);
+ if (NULL != sc->cont)
+ sc->cont (sc->cont_cls, emsg);
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Waiting for response from `%s' service.\n",
+ "PEERSTORE");
+ sc->request_transmitted = GNUNET_YES;
+ if (GNUNET_NO == h->in_receive)
+ {
+ h->in_receive = GNUNET_YES;
+ GNUNET_CLIENT_receive (h->client,
+ &peerstore_handler,
+ h,
+ GNUNET_TIME_UNIT_FOREVER_REL);
+ }
+}
+
+/**
+ * Store a new entry in the PEERSTORE
+ *
+ * @param h Handle to the PEERSTORE service
+ * @param peer Peer Identity
+ * @param sub_system name of the sub system
+ * @param value entry value BLOB
+ * @param size size of 'value'
+ * @param lifetime relative time after which the entry is (possibly) deleted
+ * @param cont Continuation function after the store request is processed
+ * @param cont_cls Closure for 'cont'
+ */
+struct GNUNET_PEERSTORE_StoreContext *
+GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
const struct GNUNET_PeerIdentity *peer,
const char *sub_system,
const void *value,
GNUNET_PEERSTORE_Continuation cont,
void *cont_cls)
{
- struct GNUNET_PEERSTORE_AddContext *ac;
- struct AddEntryMessage *entry;
+ struct GNUNET_PEERSTORE_RequestContext *rc;
+ struct StoreRequestMessage *entry;
+ struct GNUNET_PEERSTORE_StoreContext *sc;
char *ss;
void *val;
size_t sub_system_size;
"Storing value (size: %lu) for subsytem `%s' and peer `%s'",
size, sub_system, GNUNET_i2s (peer));
sub_system_size = strlen(sub_system);
- request_size = sizeof(struct AddEntryMessage) + sub_system_size + size;
- ac = GNUNET_malloc(sizeof(struct GNUNET_PEERSTORE_AddContext) + request_size);
- ac->h = h;
- ac->size = request_size;
- entry = (struct AddEntryMessage *)&ac[1];
+ request_size = sizeof(struct StoreRequestMessage) + sub_system_size + size;
+ rc = GNUNET_malloc(sizeof(struct GNUNET_PEERSTORE_RequestContext) + request_size);
+ rc->h = h;
+ rc->size = request_size;
+ entry = (struct StoreRequestMessage *)&rc[1];
entry->header.size = htons(request_size);
- entry->header.type = htons(GNUNET_MESSAGE_TYPE_PEERSTORE_ADD);
+ entry->header.type = htons(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
entry->peer = *peer;
- entry->sub_system_size = sub_system_size;
- entry->value_size = size;
+ entry->sub_system_size = htons(sub_system_size);
+ entry->value_size = htons(size);
entry->lifetime = lifetime;
ss = (char *)&entry[1];
memcpy(ss, sub_system, sub_system_size);
val = ss + sub_system_size;
memcpy(val, value, size);
- GNUNET_CONTAINER_DLL_insert_tail(h->ac_head, h->ac_tail, ac);
+ sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext);
+ sc->cont = cont;
+ sc->cont_cls = cont_cls;
+ sc->h = h;
+ sc->rc = rc;
+ rc->cont = &store_receive_result;
+ rc->cont_cls = sc;
+ GNUNET_CONTAINER_DLL_insert_tail(h->rc_head, h->rc_tail, rc);
+ GNUNET_CONTAINER_DLL_insert_tail(h->sc_head, h->sc_tail, sc);
trigger_transmit (h);
- return ac;
+ return sc;
}