{
/* need to align or need more space */
mst->pos -= mst->off;
- memmove (ibuf, &ibuf[mst->off], mst->pos);
+ memmove (ibuf,
+ &ibuf[mst->off],
+ mst->pos);
mst->off = 0;
}
if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
{
- delta =
- GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) -
- (mst->pos - mst->off), size);
- GNUNET_memcpy (&ibuf[mst->pos], buf, delta);
+ delta
+ = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader)
+ - (mst->pos - mst->off),
+ size);
+ GNUNET_memcpy (&ibuf[mst->pos],
+ buf,
+ delta);
mst->pos += delta;
buf += delta;
size -= delta;
{
/* can get more space by moving */
mst->pos -= mst->off;
- memmove (ibuf, &ibuf[mst->off], mst->pos);
+ memmove (ibuf,
+ &ibuf[mst->off],
+ mst->pos);
mst->off = 0;
}
if (mst->curr_buf < want)
{
/* need to get more space by growing buffer */
GNUNET_assert (0 == mst->off);
- mst->hdr = GNUNET_realloc (mst->hdr, want);
+ mst->hdr = GNUNET_realloc (mst->hdr,
+ want);
ibuf = (char *) mst->hdr;
mst->curr_buf = want;
}
hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
if (mst->pos - mst->off < want)
{
- delta = GNUNET_MIN (want - (mst->pos - mst->off), size);
+ delta = GNUNET_MIN (want - (mst->pos - mst->off),
+ size);
GNUNET_assert (mst->pos + delta <= mst->curr_buf);
- GNUNET_memcpy (&ibuf[mst->pos], buf, delta);
+ GNUNET_memcpy (&ibuf[mst->pos],
+ buf,
+ delta);
mst->pos += delta;
buf += delta;
size -= delta;
{
if (size + mst->pos > mst->curr_buf)
{
- mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos);
+ mst->hdr = GNUNET_realloc (mst->hdr,
+ size + mst->pos);
ibuf = (char *) mst->hdr;
mst->curr_buf = size + mst->pos;
}
GNUNET_assert (size + mst->pos <= mst->curr_buf);
- GNUNET_memcpy (&ibuf[mst->pos], buf, size);
+ GNUNET_memcpy (&ibuf[mst->pos],
+ buf,
+ size);
mst->pos += size;
}
if (purge)
int purge,
int one_shot)
{
- GNUNET_assert (0); // not implemented
- return GNUNET_SYSERR;
+ ssize_t ret;
+ size_t left;
+ char *buf;
+
+ left = mst->curr_buf - mst->pos;
+ buf = (char *) mst->hdr;
+ ret = GNUNET_NETWORK_socket_recv (sock,
+ &buf[mst->pos],
+ left);
+ if (-1 == ret)
+ {
+ if ( (EAGAIN == errno) ||
+ (EINTR == errno) )
+ return GNUNET_OK;
+ GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO,
+ "recv");
+ return GNUNET_SYSERR;
+ }
+ if (0 == ret)
+ {
+ /* other side closed connection, treat as error */
+ return GNUNET_SYSERR;
+ }
+ mst->pos += ret;
+ return GNUNET_MST_from_buffer (mst,
+ NULL,
+ 0,
+ purge,
+ one_shot);
}
int ret;
/**
- * If GNUNET_YES, consider unknown message types an error where the
+ * If #GNUNET_YES, consider unknown message types an error where the
* client is disconnected.
- * FIXME: remove?
*/
int require_found;
};
/**
* Tokenizer we use for processing incoming data.
*/
- struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+ struct GNUNET_MessageStreamTokenizer *mst;
/**
* Task that warns about missing calls to
*/
void *user_context;
+ /**
+ * Time when we last gave a message from this client
+ * to the application.
+ */
+ struct GNUNET_TIME_Absolute warn_start;
+
/**
* Persist the file handle for this client no matter what happens,
* force the OS to close once the process actually dies. Should only
*/
int is_monitor;
+ /**
+ * Are we waiting for the application to call #GNUNET_SERVICE_client_continue()?
+ */
+ int needs_continue;
+
/**
* Type of last message processed (for warn_no_receive_done).
*/
{
char *opt;
- if (!GNUNET_CONFIGURATION_have_value (sh->cfg, sh->service_name, option))
+ if (! GNUNET_CONFIGURATION_have_value (sh->cfg,
+ sh->service_name,
+ option))
{
*ret = NULL;
return GNUNET_OK;
GNUNET_break (GNUNET_OK ==
GNUNET_CONFIGURATION_get_value_string (sh->cfg,
sh->service_name,
- option, &opt));
+ option,
+ &opt));
if (NULL == (*ret = GNUNET_STRINGS_parse_ipv4_policy (opt)))
{
LOG (GNUNET_ERROR_TYPE_WARNING,
{
char *opt;
- if (!GNUNET_CONFIGURATION_have_value (sh->cfg, sh->service_name, option))
+ if (! GNUNET_CONFIGURATION_have_value (sh->cfg,
+ sh->service_name,
+ option))
{
*ret = NULL;
return GNUNET_OK;
GNUNET_break (GNUNET_OK ==
GNUNET_CONFIGURATION_get_value_string (sh->cfg,
sh->service_name,
- option, &opt));
+ option,
+ &opt));
if (NULL == (*ret = GNUNET_STRINGS_parse_ipv6_policy (opt)))
{
LOG (GNUNET_ERROR_TYPE_WARNING,
_("Could not parse IPv6 network specification `%s' for `%s:%s'\n"),
- opt, sh->service_name, option);
+ opt,
+ sh->service_name,
+ option);
GNUNET_free (opt);
return GNUNET_SYSERR;
}
un = GNUNET_new (struct sockaddr_un);
un->sun_family = AF_UNIX;
- strncpy (un->sun_path, unixpath, sizeof (un->sun_path) - 1);
+ strncpy (un->sun_path,
+ unixpath,
+ sizeof (un->sun_path) - 1);
#ifdef LINUX
if (GNUNET_YES == abstract)
un->sun_path[0] = '\0';
0);
if (NULL == desc)
{
- if ((ENOBUFS == errno) ||
- (ENOMEM == errno) ||
- (ENFILE == errno) ||
- (EACCES == errno))
+ if ( (ENOBUFS == errno) ||
+ (ENOMEM == errno) ||
+ (ENFILE == errno) ||
+ (EACCES == errno) )
{
LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR,
"socket");
}
else
{
- GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (desc));
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_NETWORK_socket_close (desc));
desc = NULL;
}
}
if (GNUNET_SYSERR == abstract)
abstract = GNUNET_NO;
#endif
- if ((GNUNET_YES != abstract)
- && (GNUNET_OK !=
- GNUNET_DISK_directory_create_for_file (unixpath)))
+ if ( (GNUNET_YES != abstract) &&
+ (GNUNET_OK !=
+ GNUNET_DISK_directory_create_for_file (unixpath)) )
GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR,
"mkdir",
unixpath);
}
else
{
- GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (desc));
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_NETWORK_socket_close (desc));
desc = NULL;
}
}
LOG (GNUNET_ERROR_TYPE_ERROR,
_("Could not access a pre-bound socket, will try to bind myself\n"));
for (i = 0; (i < count) && (NULL != lsocks[i]); i++)
- GNUNET_break (0 == GNUNET_NETWORK_socket_close (lsocks[i]));
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_NETWORK_socket_close (lsocks[i]));
GNUNET_free (lsocks);
return NULL;
}
GNUNET_a2s (server_addr, socklen));
}
}
- GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_NETWORK_socket_close (sock));
errno = eno;
return NULL;
}
{
LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR,
"listen");
- GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_NETWORK_socket_close (sock));
errno = 0;
return NULL;
}
(unsigned int) 3 + cnt);
cnt++;
while (NULL != lsocks[cnt])
- GNUNET_break (0 == GNUNET_NETWORK_socket_close (lsocks[cnt++]));
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_NETWORK_socket_close (lsocks[cnt++]));
GNUNET_free (lsocks);
lsocks = NULL;
break;
struct ServiceListenContext *slc = sh.slc_head;
sh.slc_head = slc->next;
- // FIXME: destroy slc
+ if (NULL != slc->listen_task)
+ GNUNET_SCHEDULER_cancel (slc->listen_task);
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_NETWORK_socket_close (slc->listen_socket));
GNUNET_free (slc);
}
* the message queue.
* Not every message queue implementation supports an error handler.
*
- * @param cls closure
+ * @param cls closure with our `struct GNUNET_SERVICE_Client`
* @param error error code
*/
static void
enum GNUNET_MQ_Error error)
{
struct GNUNET_SERVICE_Client *client = cls;
+ struct GNUNET_SERVICE_Handle *sh = client->sh;
- // FIXME!
+ if ( (GNUNET_MQ_ERROR_NO_MATCH == error) &&
+ (GNUNET_NO == sh->require_found) )
+ return; /* ignore error */
+ GNUNET_SERVICE_client_drop (client);
+}
+
+
+/**
+ * Task run to warn about missing calls to #GNUNET_SERVICE_client_continue().
+ *
+ * @param cls our `struct GNUNET_SERVICE_Client *` to process more requests from
+ */
+static void
+warn_no_client_continue (void *cls)
+{
+ struct GNUNET_SERVICE_Client *client = cls;
+
+ GNUNET_break (0 != client->warn_type); /* type should never be 0 here, as we don't use 0 */
+ client->warn_task
+ = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
+ &warn_no_client_continue,
+ client);
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ _("Processing code for message of type %u did not call `GNUNET_SERVICE_client_continue' after %s\n"),
+ (unsigned int) client->warn_type,
+ GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (client->warn_start),
+ GNUNET_YES));
}
* Functions with this signature are called whenever a
* complete message is received by the tokenizer for a client.
*
- * Do not call #GNUNET_SERVER_mst_destroy() from within
+ * Do not call #GNUNET_MST_destroy() from within
* the scope of this callback.
*
* @param cls closure with the `struct GNUNET_SERVICE_Client *`
- * @param client_cls closure with the `struct GNUNET_SERVICE_Client *`
* @param message the actual message
* @return #GNUNET_OK on success (always)
*/
static int
service_client_mst_cb (void *cls,
- void *client_cls,
const struct GNUNET_MessageHeader *message)
{
struct GNUNET_SERVICE_Client *client = cls;
+ GNUNET_assert (GNUNET_NO == client->needs_continue);
+ client->needs_continue = GNUNET_YES;
+ client->warn_type = ntohs (message->type);
+ client->warn_start = GNUNET_TIME_absolute_get ();
+ client->warn_task
+ = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
+ &warn_no_client_continue,
+ client);
GNUNET_MQ_inject_message (client->mq,
message);
return GNUNET_OK;
service_client_recv (void *cls)
{
struct GNUNET_SERVICE_Client *client = cls;
+ int ret;
- // FIXME: read into buffer, pass to MST, then client->mq inject!
- // FIXME: revise MST API to avoid the memcpy!
- // i.e.: GNUNET_MST_read (client->sock);
+ client->recv_task = NULL;
+ ret = GNUNET_MST_read (client->mst,
+ client->sock,
+ GNUNET_NO,
+ GNUNET_YES);
+ if (GNUNET_SYSERR == ret)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (client);
+ return;
+ }
+ if (GNUNET_NO == ret)
+ return; /* more messages in buffer, wait for application
+ to be done processing */
+ GNUNET_assert (GNUNET_OK == ret);
+ if (GNUNET_YES == client->needs_continue)
+ return;
+ /* MST needs more data, re-schedule read job */
+ client->recv_task
+ = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+ client->sock,
+ &service_client_recv,
+ client);
}
sh->handlers,
&service_mq_error_handler,
client);
- client->mst = GNUNET_SERVER_mst_create (&service_client_mst_cb,
- client);
+ client->mst = GNUNET_MST_create (&service_client_mst_cb,
+ client);
client->user_context = sh->connect_cb (sh->cb_cls,
client,
client->mq);
slc->listen_task = NULL;
while (1)
+ {
+ struct GNUNET_NETWORK_Handle *sock;
+ const struct sockaddr_in *v4;
+ const struct sockaddr_in6 *v6;
+ struct sockaddr_storage sa;
+ socklen_t addrlen;
+ int ok;
+
+ addrlen = sizeof (sa);
+ sock = GNUNET_NETWORK_socket_accept (slc->listen_socket,
+ (struct sockaddr *) &sa,
+ &addrlen);
+ if (NULL == sock)
+ break;
+ switch (sa.ss_family)
{
- struct GNUNET_NETWORK_Handle *sock;
- const struct sockaddr_in *v4;
- const struct sockaddr_in6 *v6;
- struct sockaddr_storage sa;
- socklen_t addrlen;
- int ok;
-
- addrlen = sizeof (sa);
- sock = GNUNET_NETWORK_socket_accept (slc->listen_socket,
- (struct sockaddr *) &sa,
- &addrlen);
- if (NULL == sock)
- break;
- switch (sa.ss_family)
- {
- case AF_INET:
- GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
- v4 = (const struct sockaddr_in *) &sa;
- ok = ( ( (NULL == sh->v4_allowed) ||
- (check_ipv4_listed (sh->v4_allowed,
- &v4->sin_addr))) &&
- ( (NULL == sh->v4_denied) ||
- (! check_ipv4_listed (sh->v4_denied,
- &v4->sin_addr)) ) );
- break;
- case AF_INET6:
- GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
- v6 = (const struct sockaddr_in6 *) &sa;
- ok = ( ( (NULL == sh->v6_allowed) ||
- (check_ipv6_listed (sh->v6_allowed,
- &v6->sin6_addr))) &&
- ( (NULL == sh->v6_denied) ||
- (! check_ipv6_listed (sh->v6_denied,
- &v6->sin6_addr)) ) );
- break;
+ case AF_INET:
+ GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
+ v4 = (const struct sockaddr_in *) &sa;
+ ok = ( ( (NULL == sh->v4_allowed) ||
+ (check_ipv4_listed (sh->v4_allowed,
+ &v4->sin_addr))) &&
+ ( (NULL == sh->v4_denied) ||
+ (! check_ipv4_listed (sh->v4_denied,
+ &v4->sin_addr)) ) );
+ break;
+ case AF_INET6:
+ GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
+ v6 = (const struct sockaddr_in6 *) &sa;
+ ok = ( ( (NULL == sh->v6_allowed) ||
+ (check_ipv6_listed (sh->v6_allowed,
+ &v6->sin6_addr))) &&
+ ( (NULL == sh->v6_denied) ||
+ (! check_ipv6_listed (sh->v6_denied,
+ &v6->sin6_addr)) ) );
+ break;
#ifndef WINDOWS
- case AF_UNIX:
- ok = GNUNET_OK; /* controlled using file-system ACL now */
- break;
+ case AF_UNIX:
+ ok = GNUNET_OK; /* controlled using file-system ACL now */
+ break;
#endif
- default:
- LOG (GNUNET_ERROR_TYPE_WARNING,
- _("Unknown address family %d\n"),
- sa.ss_family);
- return;
- }
- if (! ok)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Service rejected incoming connection from %s due to policy.\n",
- GNUNET_a2s ((const struct sockaddr *) &sa,
- addrlen));
- GNUNET_NETWORK_socket_close (sock);
- continue;
- }
+ default:
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ _("Unknown address family %d\n"),
+ sa.ss_family);
+ return;
+ }
+ if (! ok)
+ {
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Service accepted incoming connection from %s.\n",
- GNUNET_a2s ((const struct sockaddr *) &sa,
- addrlen));
- start_client (slc->sh,
- sock);
+ "Service rejected incoming connection from %s due to policy.\n",
+ GNUNET_a2s ((const struct sockaddr *) &sa,
+ addrlen));
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_NETWORK_socket_close (sock));
+ continue;
}
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Service accepted incoming connection from %s.\n",
+ GNUNET_a2s ((const struct sockaddr *) &sa,
+ addrlen));
+ start_client (slc->sh,
+ sock);
+ }
slc->listen_task
= GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
slc->listen_socket,
}
+/**
+ * Task run to resume receiving data from the client after
+ * the client called #GNUNET_SERVICE_client_continue().
+ *
+ * @param cls our `struct GNUNET_SERVICE_Client`
+ */
+static void
+resume_client_receive (void *cls)
+{
+ struct GNUNET_SERVICE_Client *c = cls;
+ int ret;
+
+ c->recv_task = NULL;
+ /* first, check if there is still something in the buffer */
+ ret = GNUNET_MST_next (c->mst,
+ GNUNET_YES);
+ if (GNUNET_SYSERR == ret)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (c);
+ return;
+ }
+ if (GNUNET_NO == ret)
+ return; /* done processing, wait for more later */
+ GNUNET_assert (GNUNET_OK == ret);
+ if (GNUNET_YES == c->needs_continue)
+ return; /* #GNUNET_MST_next() did give a message to the client */
+ /* need to receive more data from the network first */
+ c->recv_task
+ = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+ c->sock,
+ &service_client_recv,
+ c);
+}
+
+
/**
* Continue receiving further messages from the given client.
* Must be called after each message received.
void
GNUNET_SERVICE_client_continue (struct GNUNET_SERVICE_Client *c)
{
- GNUNET_break (0); // not implemented
+ GNUNET_assert (GNUNET_YES == c->needs_continue);
+ GNUNET_assert (NULL == c->recv_task);
+ c->needs_continue = GNUNET_NO;
+ if (NULL != c->warn_task)
+ {
+ GNUNET_SCHEDULER_cancel (c->warn_task);
+ c->warn_task = NULL;
+ }
+ c->recv_task = GNUNET_SCHEDULER_add_now (&resume_client_receive,
+ c);
}
GNUNET_SCHEDULER_cancel (c->send_task);
c->send_task = NULL;
}
- GNUNET_SERVER_mst_destroy (c->mst);
+ GNUNET_MST_destroy (c->mst);
GNUNET_MQ_destroy (c->mq);
if (GNUNET_NO == c->persist)
{
- GNUNET_NETWORK_socket_close (c->sock);
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_NETWORK_socket_close (c->sock));
}
else
{