* The status of the lock
*/
enum GNUNET_LOCKMANAGER_Status status;
+
+ /**
+ * set to GNUNET_YES if acquire message for this lock is till in messga queue
+ */
+ int acquire_sent;
};
memcpy (buf, queue_entity->msg, msg_size);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Message of size %u sent\n", msg_size);
- GNUNET_free (queue_entity->msg);
+ if (GNUNET_MESSAGE_TYPE_LOCKMANAGER_ACQUIRE
+ == ntohs (queue_entity->msg->header.type))
+ {
+ GNUNET_break (GNUNET_NO == queue_entity->lr->acquire_sent);
+ queue_entity->lr->acquire_sent = GNUNET_YES;
+ queue_entity->lr->mqe = NULL;
+ }
+ GNUNET_free (queue_entity->msg);
GNUNET_CONTAINER_DLL_remove (handle->mq_head,
handle->mq_tail,
- queue_entity);
+ queue_entity);
GNUNET_free (queue_entity);
queue_entity = handle->mq_head;
if (NULL != queue_entity)
*
* @param handle the lockmanager handle whose queue will be used
* @param msg the message to be queued
+ * @param request the locking reqeust responsible for queueing this message
+ * @return the MessageQueue entity that has been queued
*/
-static void
+static struct MessageQueue *
queue_message (struct GNUNET_LOCKMANAGER_Handle *handle,
- struct GNUNET_LOCKMANAGER_Message *msg)
+ struct GNUNET_LOCKMANAGER_Message *msg,
+ struct GNUNET_LOCKMANAGER_LockingRequest *request)
{
struct MessageQueue *queue_entity;
GNUNET_assert (NULL != msg);
queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
queue_entity->msg = msg;
+ queue_entity->lr = request;
GNUNET_CONTAINER_DLL_insert_tail (handle->mq_head,
handle->mq_tail,
queue_entity);
&transmit_notify,
handle);
}
+ return queue_entity;
}
/**
- * Iterator to call relase on locks
+ * Iterator to call relase on locks; acquire messages are sent for all
+ * locks. In addition, if a lock is acquired before, it is not released and its
+ * status callback is called to signal its release
*
* @param cls the lockmanager handle
* @param key current key code
struct GNUNET_LOCKMANAGER_Handle *h = cls;
struct GNUNET_LOCKMANAGER_Message *msg;
+ if (GNUNET_NO == r->acquire_sent) /* an acquire is still in queue */
+ return GNUNET_YES;
+ r->acquire_sent = GNUNET_NO;
msg = generate_acquire_msg (r->domain, r->lock);
- queue_message (h, msg);
+ r->mqe = queue_message (h, msg, r);
if (GNUNET_LOCKMANAGER_RELEASE == r->status)
return GNUNET_YES;
if (NULL != r->status_cb)
r->status = GNUNET_LOCKMANAGER_RELEASE;
r->status_cb = status_cb;
r->status_cb_cls = status_cb_cls;
+ r->acquire_sent = GNUNET_NO;
memcpy (r->domain, domain_name, domain_name_length);
msg = generate_acquire_msg (r->domain, r->lock);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing ACQUIRE message\n");
- queue_message (handle, msg);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing ACQUIRE message\n");
+ r->mqe = queue_message (handle, msg, r);
get_key (r->domain, r->lock, &hash);
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (r->handle->hashmap,
size_t domain_name_length;
LOG (GNUNET_ERROR_TYPE_DEBUG, "%s()\n", __func__);
- /* FIXME: Stop ACQUIRE retransmissions */
+ if (GNUNET_NO == request->acquire_sent)
+ {
+ GNUNET_assert (NULL != request->mqe);
+ if ((NULL != request->handle->transmit_handle)
+ && (request->handle->mq_head == request->mqe))
+ {
+ GNUNET_CLIENT_notify_transmit_ready_cancel
+ (request->handle->transmit_handle);
+ request->handle->transmit_handle = NULL;
+ }
+ GNUNET_CONTAINER_DLL_remove (request->handle->mq_head,
+ request->handle->mq_tail,
+ request->mqe);
+ GNUNET_free (request->mqe->msg);
+ GNUNET_free (request->mqe);
+ request->status = GNUNET_LOCKMANAGER_RELEASE;
+ }
if (GNUNET_LOCKMANAGER_SUCCESS == request->status)
{
domain_name_length = strlen (request->domain) + 1;
msg->header.size = htons (msg_size);
msg->lock = htonl (request->lock);
memcpy (&msg[1], request->domain, domain_name_length);
- queue_message (request->handle, msg);
+ GNUNET_assert (NULL == request->mqe);
+ (void) queue_message (request->handle, msg, request);
}
get_key (request->domain, request->lock, &hash);
GNUNET_assert (GNUNET_YES ==