/**
* Entry in the queue of messages we need to transmit to the helper.
*/
-struct HelperMessageQueueEntry
+struct GNUNET_HELPER_SendHandle
{
/**
* This is an entry in a DLL.
*/
- struct HelperMessageQueueEntry *next;
+ struct GNUNET_HELPER_SendHandle *next;
/**
* This is an entry in a DLL.
*/
- struct HelperMessageQueueEntry *prev;
+ struct GNUNET_HELPER_SendHandle *prev;
/**
* Message to transmit (allocated at the end of this struct)
*/
const struct GNUNET_MessageHeader *msg;
-
+
+ /**
+ * The handle to a helper process.
+ */
+ struct GNUNET_HELPER_Handle *h;
+
/**
* Function to call upon completion.
*/
/**
* First message queued for transmission to helper.
*/
- struct HelperMessageQueueEntry *mq_head;
+ struct GNUNET_HELPER_SendHandle *sh_head;
/**
* Last message queued for transmission to helper.
*/
- struct HelperMessageQueueEntry *mq_tail;
+ struct GNUNET_HELPER_SendHandle *sh_tail;
/**
* Binary to run.
static void
stop_helper (struct GNUNET_HELPER_Handle *h)
{
- struct HelperMessageQueueEntry *qe;
+ struct GNUNET_HELPER_SendHandle *sh;
if (NULL != h->helper_proc)
{
GNUNET_break (0 == GNUNET_OS_process_kill (h->helper_proc, SIGTERM));
GNUNET_break (GNUNET_OK == GNUNET_OS_process_wait (h->helper_proc));
- GNUNET_OS_process_close (h->helper_proc);
+ GNUNET_OS_process_destroy (h->helper_proc);
h->helper_proc = NULL;
}
if (GNUNET_SCHEDULER_NO_TASK != h->restart_task)
h->helper_out = NULL;
h->fh_from_helper = NULL;
}
- while (NULL != (qe = h->mq_head))
+ while (NULL != (sh = h->sh_head))
{
- GNUNET_CONTAINER_DLL_remove (h->mq_head,
- h->mq_tail,
- qe);
- qe->cont (qe->cont_cls, GNUNET_NO);
- GNUNET_free (qe);
+ GNUNET_CONTAINER_DLL_remove (h->sh_head,
+ h->sh_tail,
+ sh);
+ if (NULL != sh->cont)
+ sh->cont (sh->cont_cls, GNUNET_NO);
+ GNUNET_free (sh);
}
/* purge MST buffer */
- GNUNET_SERVER_mst_receive (h->mst, NULL, NULL, 0, GNUNET_YES, GNUNET_NO);
+ (void) GNUNET_SERVER_mst_receive (h->mst, NULL, NULL, 0, GNUNET_YES, GNUNET_NO);
}
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_HELPER_Handle *h = cls;
- char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
+ char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE] GNUNET_ALIGN;
ssize_t t;
h->read_task = GNUNET_SCHEDULER_NO_TASK;
_("Got %u bytes from helper `%s'\n"),
(unsigned int) t,
h->binary_name);
+ h->read_task = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
+ h->fh_from_helper, &helper_read, h);
if (GNUNET_SYSERR ==
GNUNET_SERVER_mst_receive (h->mst, NULL, buf, t, GNUNET_NO, GNUNET_NO))
{
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
&restart_task, h);
return;
-
}
- h->read_task = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
- h->fh_from_helper, &helper_read, h);
}
&restart_task, h);
return;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Starting HELPER process `%s'\n"),
+ h->binary_name);
h->fh_from_helper =
GNUNET_DISK_pipe_handle (h->helper_out, GNUNET_DISK_PIPE_END_READ);
h->fh_to_helper =
GNUNET_DISK_pipe_handle (h->helper_in, GNUNET_DISK_PIPE_END_WRITE);
h->helper_proc =
- GNUNET_OS_start_process_vap (h->helper_in, h->helper_out,
+ GNUNET_OS_start_process_vap (GNUNET_NO,
+ h->helper_in, h->helper_out,
h->binary_name,
h->binary_argv);
if (NULL == h->helper_proc)
void
GNUNET_HELPER_stop (struct GNUNET_HELPER_Handle *h)
{
- struct HelperMessageQueueEntry *qe;
+ struct GNUNET_HELPER_SendHandle *sh;
/* signal pending writes that we were stopped */
- while (NULL != (qe = h->mq_head))
+ while (NULL != (sh = h->sh_head))
{
- GNUNET_CONTAINER_DLL_remove (h->mq_head,
- h->mq_tail,
- qe);
- qe->cont (qe->cont_cls, GNUNET_SYSERR);
- GNUNET_free (qe);
+ GNUNET_CONTAINER_DLL_remove (h->sh_head,
+ h->sh_tail,
+ sh);
+ if (NULL != sh->cont)
+ sh->cont (sh->cont_cls, GNUNET_SYSERR);
+ GNUNET_free (sh);
}
stop_helper (h);
GNUNET_SERVER_mst_destroy (h->mst);
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_HELPER_Handle *h = cls;
- struct HelperMessageQueueEntry *qe;
+ struct GNUNET_HELPER_SendHandle *sh;
const char *buf;
ssize_t t;
h->fh_to_helper, &helper_write, h);
return;
}
- if (NULL == (qe = h->mq_head))
+ if (NULL == (sh = h->sh_head))
return; /* how did this happen? */
- buf = (const char*) qe->msg;
- t = GNUNET_DISK_file_write (h->fh_to_helper, &buf[qe->wpos], ntohs (qe->msg->size) - qe->wpos);
+ buf = (const char*) sh->msg;
+ t = GNUNET_DISK_file_write (h->fh_to_helper, &buf[sh->wpos], ntohs (sh->msg->size) - sh->wpos);
if (t <= 0)
{
/* On write-error, restart the helper */
&restart_task, h);
return;
}
- qe->wpos += t;
- if (qe->wpos == ntohs (qe->msg->size))
+ sh->wpos += t;
+ if (sh->wpos == ntohs (sh->msg->size))
{
- GNUNET_CONTAINER_DLL_remove (h->mq_head,
- h->mq_tail,
- qe);
- if (NULL != qe->cont)
- qe->cont (qe->cont_cls, GNUNET_YES);
- GNUNET_free (qe);
+ GNUNET_CONTAINER_DLL_remove (h->sh_head,
+ h->sh_tail,
+ sh);
+ if (NULL != sh->cont)
+ sh->cont (sh->cont_cls, GNUNET_YES);
+ GNUNET_free (sh);
}
- if (NULL != h->mq_head)
+ if (NULL != h->sh_head)
h->write_task = GNUNET_SCHEDULER_add_write_file (GNUNET_TIME_UNIT_FOREVER_REL,
h->fh_to_helper,
&helper_write,
* @param cont continuation to run once the message is out (PREREQ_DONE on succees, CANCEL
* if the helper process died, NULL during GNUNET_HELPER_stop).
* @param cont_cls closure for 'cont'
- * @return GNUNET_YES if the message will be sent
- * GNUNET_NO if the message was dropped
+ * @return NULL if the message was dropped,
+ * otherwise handle to cancel *cont* (actual transmission may
+ * not be abortable)
*/
-int
+struct GNUNET_HELPER_SendHandle *
GNUNET_HELPER_send (struct GNUNET_HELPER_Handle *h,
const struct GNUNET_MessageHeader *msg,
int can_drop,
GNUNET_HELPER_Continuation cont,
void *cont_cls)
{
- struct HelperMessageQueueEntry *qe;
+ struct GNUNET_HELPER_SendHandle *sh;
uint16_t mlen;
if (NULL == h->fh_to_helper)
- return GNUNET_NO;
+ return NULL;
if ( (GNUNET_YES == can_drop) &&
- (h->mq_head != NULL) )
- return GNUNET_NO;
+ (NULL != h->sh_head) )
+ return NULL;
mlen = ntohs (msg->size);
- qe = GNUNET_malloc (sizeof (struct HelperMessageQueueEntry) + mlen);
- qe->msg = (const struct GNUNET_MessageHeader*) &qe[1];
- memcpy (&qe[1], msg, mlen);
- qe->cont = cont;
- qe->cont_cls = cont_cls;
- GNUNET_CONTAINER_DLL_insert_tail (h->mq_head,
- h->mq_tail,
- qe);
+ sh = GNUNET_malloc (sizeof (struct GNUNET_HELPER_SendHandle) + mlen);
+ sh->msg = (const struct GNUNET_MessageHeader*) &sh[1];
+ memcpy (&sh[1], msg, mlen);
+ sh->h = h;
+ sh->cont = cont;
+ sh->cont_cls = cont_cls;
+ GNUNET_CONTAINER_DLL_insert_tail (h->sh_head,
+ h->sh_tail,
+ sh);
if (GNUNET_SCHEDULER_NO_TASK == h->write_task)
h->write_task = GNUNET_SCHEDULER_add_write_file (GNUNET_TIME_UNIT_FOREVER_REL,
h->fh_to_helper,
&helper_write,
h);
- return GNUNET_YES;
+ return sh;
+}
+
+/**
+ * Cancel a 'send' operation. If possible, transmitting the
+ * message is also aborted, but at least 'cont' won't be
+ * called.
+ *
+ * @param sh operation to cancel
+ */
+void
+GNUNET_HELPER_send_cancel (struct GNUNET_HELPER_SendHandle *sh)
+{
+ struct GNUNET_HELPER_Handle *h = sh->h;
+
+ sh->cont = NULL;
+ sh->cont_cls = NULL;
+ if (0 == sh->wpos)
+ {
+ GNUNET_CONTAINER_DLL_remove (h->sh_head, h->sh_tail, sh);
+ if (NULL == h->sh_head)
+ {
+ GNUNET_SCHEDULER_cancel (h->write_task);
+ h->write_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_free (sh);
+ }
}