/*
* This file is part of GNUnet
- * (C) 2013 Christian Grothoff (and other contributing authors)
+ * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
*
* GNUnet is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published
*
* You should have received a copy of the GNU General Public License
* along with GNUnet; see the file COPYING. If not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
*/
/**
struct MessageTransmitQueue *tmit_msgs_head;
struct MessageTransmitQueue *tmit_msgs_tail;
+ struct GNUNET_PSYC_Channel *channel;
+
/**
* Public key of the channel.
*/
};
+struct OperationClosure
+{
+ struct GNUNET_SERVER_Client *client;
+ struct Place *plc;
+ uint64_t op_id;
+ uint32_t flags;
+};
+
+
static int
psyc_transmit_message (struct Place *plc);
client_send_msg (const struct Place *plc,
const struct GNUNET_MessageHeader *msg)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Sending message to clients.\n", plc);
struct ClientListItem *cli = plc->clients_head;
}
+/**
+ * Send a result code back to the client.
+ *
+ * @param client
+ * Client that should receive the result code.
+ * @param result_code
+ * Code to transmit.
+ * @param op_id
+ * Operation ID in network byte order.
+ * @param data
+ * Data payload or NULL.
+ * @param data_size
+ * Size of @a data.
+ */
+static void
+client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
+ int64_t result_code, const void *data, uint16_t data_size)
+{
+ struct GNUNET_OperationResultMessage *res;
+
+ res = GNUNET_malloc (sizeof (*res) + data_size);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
+ res->header.size = htons (sizeof (*res) + data_size);
+ res->result_code = GNUNET_htonll (result_code);
+ res->op_id = op_id;
+ if (0 < data_size)
+ memcpy (&res[1], data, data_size);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Sending result to client for operation #%" PRIu64 ": "
+ "%" PRId64 " (size: %u)\n",
+ client, GNUNET_ntohll (op_id), result_code, data_size);
+
+ GNUNET_SERVER_notification_context_add (nc, client);
+ GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
+ GNUNET_NO);
+ GNUNET_free (res);
+}
+
+
/**
* Called after a PSYC master is started.
*/
&psyc_master_started,
&psyc_recv_join_request,
&psyc_recv_message, NULL, hst);
+ hst->plc.channel = GNUNET_PSYC_master_get_channel (hst->master);
}
else
{
&gst->origin, gst->relay_count, gst->relays,
&psyc_recv_message, NULL, &psyc_slave_connected,
&psyc_recv_join_dcsn, gst, join_msg);
+ gst->plc.channel = GNUNET_PSYC_slave_get_channel (gst->slave);
}
else
{
}
+/**
+ * A historic message result arrived from PSYC.
+ */
+static void
+psyc_recv_history_message (void *cls,
+ uint64_t message_id,
+ uint32_t flags,
+ const struct GNUNET_PSYC_MessageHeader *msg)
+{
+ struct OperationClosure *opcls = cls;
+ struct Place *plc = opcls->plc;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received historic message #%" PRId64 " (flags: %x)\n",
+ plc, message_id, flags);
+
+ uint16_t size = ntohs (msg->header.size);
+
+ struct GNUNET_OperationResultMessage *
+ res = GNUNET_malloc (sizeof (*res) + size);
+ res->header.size = htons (sizeof (*res) + size);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
+ res->op_id = opcls->op_id;
+ res->result_code = GNUNET_htonll (GNUNET_OK);
+
+ memcpy (&res[1], msg, size);
+
+ /** @todo FIXME: send only to requesting client */
+ client_send_msg (plc, &res->header);
+}
+
+
+static void
+psyc_recv_history_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ struct OperationClosure *opcls = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p History replay #%" PRIu64 ": "
+ "PSYCSTORE returned %" PRId64 " (%.*s)\n",
+ opcls->plc, GNUNET_ntohll (opcls->op_id), result, err_msg_size, err_msg);
+
+ // FIXME: place might have been destroyed
+ client_send_result (opcls->client, opcls->op_id, result, err_msg, err_msg_size);
+}
+
+
+/**
+ * Client requests channel history.
+ */
+static void
+client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct Client *
+ ctx = GNUNET_SERVER_client_get_user_context (client, struct Client);
+ GNUNET_assert (NULL != ctx);
+ struct Place *plc = ctx->plc;
+
+ const struct GNUNET_PSYC_HistoryRequestMessage *
+ req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
+ uint16_t size = ntohs (msg->size);
+ const char *method_prefix = (const char *) &req[1];
+
+ if (size < sizeof (*req) + 1
+ || '\0' != method_prefix[size - sizeof (*req) - 1])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "%p History replay #%" PRIu64 ": "
+ "invalid method prefix. size: %u < %u?\n",
+ plc, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+
+ struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
+ opcls->client = client;
+ opcls->plc = plc;
+ opcls->op_id = req->op_id;
+ opcls->flags = ntohl (req->flags);
+
+ if (0 == req->message_limit)
+ GNUNET_PSYC_channel_history_replay (plc->channel,
+ GNUNET_ntohll (req->start_message_id),
+ GNUNET_ntohll (req->end_message_id),
+ method_prefix, opcls->flags,
+ &psyc_recv_history_message, NULL,
+ &psyc_recv_history_result, opcls);
+ else
+ GNUNET_PSYC_channel_history_replay_latest (plc->channel,
+ GNUNET_ntohll (req->message_limit),
+ method_prefix, opcls->flags,
+ &psyc_recv_history_message, NULL,
+ &psyc_recv_history_result, opcls);
+
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+ { &client_recv_host_enter, NULL,
+ GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER, 0 },
+
+ { &client_recv_guest_enter, NULL,
+ GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER, 0 },
+
+ { &client_recv_join_decision, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
+
+ { &client_recv_psyc_message, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
+
+ { &client_recv_history_replay, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
+#if FIXME
+ { &client_recv_state_get, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
+
+ { &client_recv_state_get_prefix, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
+#endif
+ { NULL, NULL, 0, 0 }
+};
+
+
/**
* Initialize the PSYC service.
*
run (void *cls, struct GNUNET_SERVER_Handle *server,
const struct GNUNET_CONFIGURATION_Handle *c)
{
- static const struct GNUNET_SERVER_MessageHandler handlers[] = {
- { &client_recv_host_enter, NULL,
- GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER, 0 },
-
- { &client_recv_guest_enter, NULL,
- GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER, 0 },
-
- { &client_recv_join_decision, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
-
- { &client_recv_psyc_message, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }
- };
-
cfg = c;
stats = GNUNET_STATISTICS_create ("social", cfg);
hosts = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);