/*
* This file is part of GNUnet
- * (C) 2013 Christian Grothoff (and other contributing authors)
+ * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
*
* GNUnet is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published
*
* You should have received a copy of the GNU General Public License
* along with GNUnet; see the file COPYING. If not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
*/
/**
struct MessageTransmitQueue *tmit_msgs_head;
struct MessageTransmitQueue *tmit_msgs_tail;
+ struct GNUNET_PSYC_Channel *channel;
+
/**
* Public key of the channel.
*/
};
+struct OperationClosure
+{
+ struct GNUNET_SERVER_Client *client;
+ struct Place *plc;
+ uint64_t op_id;
+ uint32_t flags;
+};
+
+
static int
psyc_transmit_message (struct Place *plc);
client_send_msg (const struct Place *plc,
const struct GNUNET_MessageHeader *msg)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Sending message to clients.\n", plc);
struct ClientListItem *cli = plc->clients_head;
}
+/**
+ * Send a result code back to the client.
+ *
+ * @param client
+ * Client that should receive the result code.
+ * @param result_code
+ * Code to transmit.
+ * @param op_id
+ * Operation ID in network byte order.
+ * @param data
+ * Data payload or NULL.
+ * @param data_size
+ * Size of @a data.
+ */
+static void
+client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
+ int64_t result_code, const void *data, uint16_t data_size)
+{
+ struct GNUNET_OperationResultMessage *res;
+
+ res = GNUNET_malloc (sizeof (*res) + data_size);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
+ res->header.size = htons (sizeof (*res) + data_size);
+ res->result_code = GNUNET_htonll (result_code);
+ res->op_id = op_id;
+ if (0 < data_size)
+ memcpy (&res[1], data, data_size);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Sending result to client for operation #%" PRIu64 ": "
+ "%" PRId64 " (size: %u)\n",
+ client, GNUNET_ntohll (op_id), result_code, data_size);
+
+ GNUNET_SERVER_notification_context_add (nc, client);
+ GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
+ GNUNET_NO);
+ GNUNET_free (res);
+}
+
+
/**
* Called after a PSYC master is started.
*/
static void
-psyc_master_started (void *cls, uint64_t max_message_id)
+psyc_master_started (void *cls, int result, uint64_t max_message_id)
{
struct Host *hst = cls;
struct Place *plc = &hst->plc;
plc->max_message_id = max_message_id;
plc->is_ready = GNUNET_YES;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK);
res.header.size = htons (sizeof (res));
- res.result_code = htonl (GNUNET_OK);
+ res.result_code = htonl (result - INT32_MIN);
res.max_message_id = GNUNET_htonll (plc->max_message_id);
client_send_msg (plc, &res.header);
* Called after a PSYC slave is connected.
*/
static void
-psyc_slave_connected (void *cls, uint64_t max_message_id)
+psyc_slave_connected (void *cls, int result, uint64_t max_message_id)
{
struct Guest *gst = cls;
struct Place *plc = &gst->plc;
plc->max_message_id = max_message_id;
plc->is_ready = GNUNET_YES;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK);
res.header.size = htons (sizeof (res));
- res.result_code = htonl (GNUNET_OK);
+ res.result_code = htonl (result - INT32_MIN);
res.max_message_id = GNUNET_htonll (plc->max_message_id);
client_send_msg (plc, &res.header);
&psyc_master_started,
&psyc_recv_join_request,
&psyc_recv_message, NULL, hst);
+ hst->plc.channel = GNUNET_PSYC_master_get_channel (hst->master);
}
else
{
plc = &hst->plc;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK);
res.header.size = htons (sizeof (res));
res.result_code = htonl (GNUNET_OK);
sizeof (*req), relay_size, join_msg_size, req_size);
GNUNET_break (0);
GNUNET_SERVER_client_disconnect (client);
+ GNUNET_free (gst);
return;
}
if (0 < gst->relay_count)
if (NULL == plc_gst)
{
plc_gst = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
- GNUNET_CONTAINER_multihashmap_put (place_guests, &plc->pub_key_hash, plc_gst,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ (void) GNUNET_CONTAINER_multihashmap_put (place_guests, &plc->pub_key_hash, plc_gst,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
}
- GNUNET_CONTAINER_multihashmap_put (plc_gst, &gst->pub_key_hash, plc,
+ (void) GNUNET_CONTAINER_multihashmap_put (plc_gst, &gst->pub_key_hash, gst,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
- GNUNET_CONTAINER_multihashmap_put (guests, &plc->pub_key_hash, plc,
+ (void) GNUNET_CONTAINER_multihashmap_put (guests, &plc->pub_key_hash, gst,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
gst->slave
= GNUNET_PSYC_slave_join (cfg, &plc->pub_key, &gst->priv_key,
&gst->origin, gst->relay_count, gst->relays,
&psyc_recv_message, NULL, &psyc_slave_connected,
&psyc_recv_join_dcsn, gst, join_msg);
+ gst->plc.channel = GNUNET_PSYC_slave_get_channel (gst->slave);
}
else
{
plc = &gst->plc;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK);
res.header.size = htons (sizeof (res));
res.result_code = htonl (GNUNET_OK);
struct FragmentTransmitQueue *tmit_frag)
{
uint16_t psize = ntohs (tmit_frag->next_part->size);
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p psyc_transmit_queue_next_part: %x + %u - %x = %u < %u\n",
- plc, tmit_frag->next_part, psize, &tmit_frag[1],
- (char *) tmit_frag->next_part + psize - ((char *) &tmit_frag[1]),
- tmit_frag->size);
if ((char *) tmit_frag->next_part + psize - ((char *) &tmit_frag[1])
< tmit_frag->size)
{
break;
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
*data_size = 0;
ret = GNUNET_YES;
break;
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
+ *data_size = 0;
+ ret = GNUNET_SYSERR;
+ break;
+
default:
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p psyc_transmit_notify_data: unexpected message part of type %u.\n",
ret = GNUNET_SYSERR;
}
- if (GNUNET_SYSERR == ret)
+ if (GNUNET_SYSERR == ret && GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL != ptype)
{
*data_size = 0;
tmit_msg = psyc_transmit_queue_next_msg (plc, tmit_msg);
}
else
{
- psyc_transmit_queue_next_part (plc, tmit_msg, tmit_frag);
+ tmit_frag = psyc_transmit_queue_next_part (plc, tmit_msg, tmit_frag);
+ if (NULL != tmit_frag)
+ {
+ struct GNUNET_MessageHeader *pmsg = tmit_frag->next_part;
+ ptype = ntohs (pmsg->type);
+ switch (ptype)
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
+ ret = GNUNET_YES;
+ break;
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
+ ret = GNUNET_SYSERR;
+ break;
+ }
+ switch (ptype)
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
+ tmit_frag = psyc_transmit_queue_next_part (plc, tmit_msg, tmit_frag);
+ }
+ }
if (NULL == tmit_msg->frags_head
&& GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END <= ptype)
*data_size = mod_size;
memcpy (data, &pmod[1], mod_size);
ret = GNUNET_NO;
-#if REMOVE // FIXME
- ret = (mod_size - strnlen ((char *) &pmod[1], mod_size) - 1
- == *full_value_size)
- ? GNUNET_YES
- : GNUNET_NO;
-#endif
break;
}
}
else
{
- psyc_transmit_queue_next_part (plc, tmit_msg, tmit_frag);
+ if (GNUNET_YES != ret)
+ psyc_transmit_queue_next_part (plc, tmit_msg, tmit_frag);
if (NULL == tmit_msg->frags_head
&& GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END <= ptype)
uint16_t psize = ntohs (pmsg->size);
*pmeth = (struct GNUNET_PSYC_MessageMethod *) pmsg;
-
if (psize < sizeof (**pmeth) + 1 || '\0' != *((char *) *pmeth + psize - 1))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
GNUNET_break (0);
return GNUNET_SYSERR;
}
+
psyc_transmit_queue_next_part (plc, tmit_msg, tmit_frag);
return GNUNET_OK;
}
}
+/**
+ * A historic message result arrived from PSYC.
+ */
+static void
+psyc_recv_history_message (void *cls,
+ uint64_t message_id,
+ uint32_t flags,
+ const struct GNUNET_PSYC_MessageHeader *msg)
+{
+ struct OperationClosure *opcls = cls;
+ struct Place *plc = opcls->plc;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received historic message #%" PRId64 " (flags: %x)\n",
+ plc, message_id, flags);
+
+ uint16_t size = ntohs (msg->header.size);
+
+ struct GNUNET_OperationResultMessage *
+ res = GNUNET_malloc (sizeof (*res) + size);
+ res->header.size = htons (sizeof (*res) + size);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
+ res->op_id = opcls->op_id;
+ res->result_code = GNUNET_htonll (GNUNET_OK);
+
+ memcpy (&res[1], msg, size);
+
+ /** @todo FIXME: send only to requesting client */
+ client_send_msg (plc, &res->header);
+}
+
+
+static void
+psyc_recv_history_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ struct OperationClosure *opcls = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p History replay #%" PRIu64 ": "
+ "PSYCSTORE returned %" PRId64 " (%.*s)\n",
+ opcls->plc, GNUNET_ntohll (opcls->op_id), result, err_msg_size, err_msg);
+
+ // FIXME: place might have been destroyed
+ client_send_result (opcls->client, opcls->op_id, result, err_msg, err_msg_size);
+}
+
+
+/**
+ * Client requests channel history.
+ */
+static void
+client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct Client *
+ ctx = GNUNET_SERVER_client_get_user_context (client, struct Client);
+ GNUNET_assert (NULL != ctx);
+ struct Place *plc = ctx->plc;
+
+ const struct GNUNET_PSYC_HistoryRequestMessage *
+ req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
+ uint16_t size = ntohs (msg->size);
+ const char *method_prefix = (const char *) &req[1];
+
+ if (size < sizeof (*req) + 1
+ || '\0' != method_prefix[size - sizeof (*req) - 1])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "%p History replay #%" PRIu64 ": "
+ "invalid method prefix. size: %u < %u?\n",
+ plc, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+
+ struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
+ opcls->client = client;
+ opcls->plc = plc;
+ opcls->op_id = req->op_id;
+ opcls->flags = ntohl (req->flags);
+
+ if (0 == req->message_limit)
+ GNUNET_PSYC_channel_history_replay (plc->channel,
+ GNUNET_ntohll (req->start_message_id),
+ GNUNET_ntohll (req->end_message_id),
+ method_prefix, opcls->flags,
+ &psyc_recv_history_message, NULL,
+ &psyc_recv_history_result, opcls);
+ else
+ GNUNET_PSYC_channel_history_replay_latest (plc->channel,
+ GNUNET_ntohll (req->message_limit),
+ method_prefix, opcls->flags,
+ &psyc_recv_history_message, NULL,
+ &psyc_recv_history_result, opcls);
+
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+ { &client_recv_host_enter, NULL,
+ GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER, 0 },
+
+ { &client_recv_guest_enter, NULL,
+ GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER, 0 },
+
+ { &client_recv_join_decision, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
+
+ { &client_recv_psyc_message, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
+
+ { &client_recv_history_replay, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
+#if FIXME
+ { &client_recv_state_get, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
+
+ { &client_recv_state_get_prefix, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
+#endif
+ { NULL, NULL, 0, 0 }
+};
+
+
/**
* Initialize the PSYC service.
*
run (void *cls, struct GNUNET_SERVER_Handle *server,
const struct GNUNET_CONFIGURATION_Handle *c)
{
- static const struct GNUNET_SERVER_MessageHandler handlers[] = {
- { &client_recv_host_enter, NULL,
- GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER, 0 },
-
- { &client_recv_guest_enter, NULL,
- GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER, 0 },
-
- { &client_recv_join_decision, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
-
- { &client_recv_psyc_message, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }
- };
-
cfg = c;
stats = GNUNET_STATISTICS_create ("social", cfg);
hosts = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);