* Double linked list tail for message queue
*/
struct MessageQueue *mq_tail;
+
+ /**
+ * Are we currently handling replies?
+ */
+ int in_replies;
};
};
+/**
+ * Handler for server replies
+ *
+ * @param cls the LOCKMANAGER_Handle
+ * @param msg received message, NULL on timeout or fatal error
+ */
+static void
+handle_replies (void *cls,
+ const struct GNUNET_MessageHeader *msg);
+
+
/**
* Transmit notify for sending message to server
*
uint16_t msg_size;
handle->transmit_handle = NULL;
+ queue_entity = handle->mq_head;
+ GNUNET_assert (NULL != queue_entity);
if ((0 == size) || (NULL == buf))
{
- /* FIXME: Timed out -- requeue? */
+ handle->transmit_handle =
+ GNUNET_CLIENT_notify_transmit_ready (handle->conn,
+ ntohs
+ (queue_entity->msg->header.size),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_YES,
+ &transmit_notify,
+ handle);
return 0;
- }
- queue_entity = handle->mq_head;
- GNUNET_assert (NULL != queue_entity);
+ }
msg_size = ntohs (queue_entity->msg->header.size);
GNUNET_assert (size >= msg_size);
memcpy (buf, queue_entity->msg, msg_size);
&transmit_notify,
handle);
}
+ if (GNUNET_NO == handle->in_replies)
+ {
+ GNUNET_CLIENT_receive (handle->conn,
+ &handle_replies,
+ handle,
+ GNUNET_TIME_UNIT_FOREVER_REL);
+ handle->in_replies = GNUNET_YES;
+ }
return msg_size;
}
/**
- * Iterator to call relase and free all LockingRequest entries
+ * Function to generate acquire message for a lock
+ *
+ * @param domain_name the domain name of the lock
+ * @param lock the lock number
+ * @return the generated GNUNET_LOCKMANAGER_Message
+ */
+static struct GNUNET_LOCKMANAGER_Message *
+generate_acquire_msg (const char *domain_name, uint32_t lock)
+{
+ struct GNUNET_LOCKMANAGER_Message *msg;
+ size_t domain_name_len;
+ uint16_t msg_size;
+
+ domain_name_len = strlen (domain_name) + 1;
+ msg_size = sizeof (struct GNUNET_LOCKMANAGER_Message) + domain_name_len;
+ msg = GNUNET_malloc (msg_size);
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_LOCKMANAGER_ACQUIRE);
+ msg->header.size = htons (msg_size);
+ msg->lock = htonl (lock);
+ memcpy (&msg[1], domain_name, domain_name_len);
+ return msg;
+}
+
+
+/**
+ * Iterator to call relase on locks
*
* @param cls the lockmanager handle
* @param key current key code
* GNUNET_NO if not.
*/
static int
-release_iterator(void *cls,
- const struct GNUNET_HashCode * key,
- void *value)
+release_n_retry_iterator (void *cls,
+ const struct GNUNET_HashCode * key,
+ void *value)
{
- struct GNUNET_LOCKMANAGER_Handle *h = cls;
struct GNUNET_LOCKMANAGER_LockingRequest *r = value;
+ struct GNUNET_LOCKMANAGER_Handle *h = cls;
+ struct GNUNET_LOCKMANAGER_Message *msg;
+ msg = generate_acquire_msg (r->domain, r->lock);
+ queue_message (h, msg);
+ if (GNUNET_LOCKMANAGER_RELEASE == r->status)
+ return GNUNET_YES;
if (NULL != r->status_cb)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Calling status change for RELEASE on lock num: %d, domain: %s\n",
r->lock, r->domain);
+ r->status = GNUNET_LOCKMANAGER_RELEASE;
r->status_cb (r->status_cb_cls,
r->domain,
r->lock,
GNUNET_LOCKMANAGER_RELEASE);
}
- GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (h->hashmap,
- key,
- value));
- GNUNET_free (r->domain);
- GNUNET_free (r);
return GNUNET_YES;
}
struct GNUNET_HashCode hash;
uint32_t lock;
uint16_t msize;
-
+
+ handle->in_replies = GNUNET_NO;
if (NULL == msg)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Lockmanager service not available or went down\n");
- /* Should release all locks and free its locking requests */
+ "Lockmanager service not available or went down\n");
+ /* Should release all locks and retry to acquire them */
GNUNET_CONTAINER_multihashmap_iterate (handle->hashmap,
- &release_iterator,
+ &release_n_retry_iterator,
handle);
return;
}
&handle_replies,
handle,
GNUNET_TIME_UNIT_FOREVER_REL);
+ handle->in_replies = GNUNET_YES;
if (GNUNET_MESSAGE_TYPE_LOCKMANAGER_SUCCESS != ntohs(msg->type))
{
GNUNET_break (0);
&handle_replies,
h,
GNUNET_TIME_UNIT_FOREVER_REL);
-
+ h->in_replies = GNUNET_YES;
LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
return h;
}
struct GNUNET_LOCKMANAGER_LockingRequest *r;
struct GNUNET_LOCKMANAGER_Message *msg;
struct GNUNET_HashCode hash;
- uint16_t msg_size;
size_t domain_name_length;
LOG (GNUNET_ERROR_TYPE_DEBUG, "%s()\n", __func__);
r->status_cb = status_cb;
r->status_cb_cls = status_cb_cls;
memcpy (r->domain, domain_name, domain_name_length);
- msg_size = sizeof (struct GNUNET_LOCKMANAGER_Message) + domain_name_length;
- msg = GNUNET_malloc (msg_size);
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_LOCKMANAGER_ACQUIRE);
- msg->header.size = htons (msg_size);
- msg->lock = htonl (lock);
- memcpy (&msg[1], r->domain, domain_name_length);
+ msg = generate_acquire_msg (r->domain, r->lock);
LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing ACQUIRE message\n");
queue_message (handle, msg);
get_key (r->domain, r->lock, &hash);
/**
* Function to cancel the locking request generated by
- * GNUNET_LOCKMANAGER_acquire_lock. If the lock is acquired us then the lock is
- * released. GNUNET_LOCKMANAGER_StatusCallback will not be called upon any
+ * GNUNET_LOCKMANAGER_acquire_lock. If the lock is acquired by us then the lock
+ * is released. GNUNET_LOCKMANAGER_StatusCallback will not be called upon any
* status changes resulting due to this call.
*
* @param request the LockingRequest to cancel