-fixing #2440 - cancel messages if RELEASE called when ACQUIRE still in queue
authorSree Harsha Totakura <totakura@in.tum.de>
Thu, 21 Jun 2012 11:35:54 +0000 (11:35 +0000)
committerSree Harsha Totakura <totakura@in.tum.de>
Thu, 21 Jun 2012 11:35:54 +0000 (11:35 +0000)
src/lockmanager/lockmanager_api.c

index 7a47da65b3170bb34087808f16f2c9b99ca45131..d648d1c5c9dfb8e750950bedf3d0be6306c5bc2a 100644 (file)
@@ -149,6 +149,11 @@ struct GNUNET_LOCKMANAGER_LockingRequest
    * The status of the lock
    */
   enum GNUNET_LOCKMANAGER_Status status;
+  
+  /**
+   * set to GNUNET_YES if acquire message for this lock is till in messga queue
+   */
+  int acquire_sent;
 };
 
 
@@ -220,10 +225,17 @@ transmit_notify (void *cls, size_t size, void *buf)
   memcpy (buf, queue_entity->msg, msg_size);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Message of size %u sent\n", msg_size);
-  GNUNET_free (queue_entity->msg);
+  if (GNUNET_MESSAGE_TYPE_LOCKMANAGER_ACQUIRE
+      == ntohs (queue_entity->msg->header.type))
+  {
+    GNUNET_break (GNUNET_NO == queue_entity->lr->acquire_sent);
+    queue_entity->lr->acquire_sent = GNUNET_YES;
+    queue_entity->lr->mqe = NULL;
+  }
+  GNUNET_free (queue_entity->msg);  
   GNUNET_CONTAINER_DLL_remove (handle->mq_head,
                                handle->mq_tail,
-                               queue_entity);
+                               queue_entity);  
   GNUNET_free (queue_entity);
   queue_entity = handle->mq_head;
   if (NULL != queue_entity)
@@ -254,16 +266,20 @@ transmit_notify (void *cls, size_t size, void *buf)
  *
  * @param handle the lockmanager handle whose queue will be used
  * @param msg the message to be queued
+ * @param request the locking reqeust responsible for queueing this message
+ * @return the MessageQueue entity that has been queued
  */
-static void
+static struct MessageQueue *
 queue_message (struct GNUNET_LOCKMANAGER_Handle *handle,
-               struct GNUNET_LOCKMANAGER_Message *msg)
+               struct GNUNET_LOCKMANAGER_Message *msg,
+               struct GNUNET_LOCKMANAGER_LockingRequest *request)
 {
   struct MessageQueue *queue_entity;
 
   GNUNET_assert (NULL != msg);
   queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
   queue_entity->msg = msg;
+  queue_entity->lr = request;
   GNUNET_CONTAINER_DLL_insert_tail (handle->mq_head,
                                     handle->mq_tail,
                                     queue_entity);
@@ -277,6 +293,7 @@ queue_message (struct GNUNET_LOCKMANAGER_Handle *handle,
                                            &transmit_notify,
                                            handle);
   }
+  return queue_entity;
 }
 
 
@@ -407,7 +424,9 @@ generate_acquire_msg (const char *domain_name, uint32_t lock)
 
 
 /**
- * Iterator to call relase on locks
+ * Iterator to call relase on locks; acquire messages are sent for all
+ * locks. In addition, if a lock is acquired before, it is not released and its
+ * status callback is called to signal its release
  *
  * @param cls the lockmanager handle
  * @param key current key code
@@ -425,8 +444,11 @@ release_n_retry_iterator (void *cls,
   struct GNUNET_LOCKMANAGER_Handle *h = cls;
   struct GNUNET_LOCKMANAGER_Message *msg;
 
+  if (GNUNET_NO == r->acquire_sent) /* an acquire is still in queue */
+    return GNUNET_YES;
+  r->acquire_sent = GNUNET_NO;
   msg = generate_acquire_msg (r->domain, r->lock);
-  queue_message (h, msg);
+  r->mqe = queue_message (h, msg, r);
   if (GNUNET_LOCKMANAGER_RELEASE == r->status)
     return GNUNET_YES;
   if (NULL != r->status_cb)
@@ -680,10 +702,11 @@ GNUNET_LOCKMANAGER_acquire_lock (struct GNUNET_LOCKMANAGER_Handle *handle,
   r->status = GNUNET_LOCKMANAGER_RELEASE;
   r->status_cb = status_cb;
   r->status_cb_cls = status_cb_cls;
+  r->acquire_sent = GNUNET_NO;
   memcpy (r->domain, domain_name, domain_name_length);
   msg = generate_acquire_msg (r->domain, r->lock);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing ACQUIRE message\n");
-  queue_message (handle, msg);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing ACQUIRE message\n");  
+  r->mqe = queue_message (handle, msg, r);
   get_key (r->domain, r->lock, &hash);
   GNUNET_assert (GNUNET_OK == 
                 GNUNET_CONTAINER_multihashmap_put (r->handle->hashmap,
@@ -713,7 +736,23 @@ GNUNET_LOCKMANAGER_cancel_request (struct GNUNET_LOCKMANAGER_LockingRequest
   size_t domain_name_length;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s()\n", __func__);
-  /* FIXME: Stop ACQUIRE retransmissions */
+  if (GNUNET_NO == request->acquire_sent)
+  {
+    GNUNET_assert (NULL != request->mqe);
+    if ((NULL != request->handle->transmit_handle) 
+        && (request->handle->mq_head == request->mqe))
+    {
+      GNUNET_CLIENT_notify_transmit_ready_cancel
+        (request->handle->transmit_handle);
+      request->handle->transmit_handle = NULL;
+    }      
+    GNUNET_CONTAINER_DLL_remove (request->handle->mq_head,
+                                 request->handle->mq_tail,
+                                 request->mqe);
+    GNUNET_free (request->mqe->msg);
+    GNUNET_free (request->mqe);
+    request->status = GNUNET_LOCKMANAGER_RELEASE;
+  }
   if (GNUNET_LOCKMANAGER_SUCCESS == request->status)
   {
     domain_name_length = strlen (request->domain) + 1;
@@ -724,7 +763,8 @@ GNUNET_LOCKMANAGER_cancel_request (struct GNUNET_LOCKMANAGER_LockingRequest
     msg->header.size = htons (msg_size);
     msg->lock = htonl (request->lock);
     memcpy (&msg[1], request->domain, domain_name_length);
-    queue_message (request->handle, msg);
+    GNUNET_assert (NULL == request->mqe);
+    (void) queue_message (request->handle, msg, request);
   }
   get_key (request->domain, request->lock, &hash);
   GNUNET_assert (GNUNET_YES ==