-improve indentation, reduce duplication of PIDs in core's neighbour map
[oweals/gnunet.git] / src / social / gnunet-service-social.c
index 90d12200da5a78328f82b7b00a209bf865bea095..4211772f1e7ff295fdae9a93d865928806b3089e 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * This file is part of GNUnet
- * (C) 2013 Christian Grothoff (and other contributing authors)
+ * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
  *
  * GNUnet is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published
@@ -14,8 +14,8 @@
  *
  * You should have received a copy of the GNU General Public License
  * along with GNUnet; see the file COPYING.  If not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
  */
 
 /**
@@ -137,6 +137,8 @@ struct Place
   struct MessageTransmitQueue *tmit_msgs_head;
   struct MessageTransmitQueue *tmit_msgs_tail;
 
+  struct GNUNET_PSYC_Channel *channel;
+
   /**
    * Public key of the channel.
    */
@@ -288,6 +290,15 @@ struct Client
 };
 
 
+struct OperationClosure
+{
+  struct GNUNET_SERVER_Client *client;
+  struct Place *plc;
+  uint64_t op_id;
+  uint32_t flags;
+};
+
+
 static int
 psyc_transmit_message (struct Place *plc);
 
@@ -450,7 +461,7 @@ static void
 client_send_msg (const struct Place *plc,
                  const struct GNUNET_MessageHeader *msg)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p Sending message to clients.\n", plc);
 
   struct ClientListItem *cli = plc->clients_head;
@@ -463,21 +474,61 @@ client_send_msg (const struct Place *plc,
 }
 
 
+/**
+ * Send a result code back to the client.
+ *
+ * @param client
+ *        Client that should receive the result code.
+ * @param result_code
+ *        Code to transmit.
+ * @param op_id
+ *        Operation ID in network byte order.
+ * @param data
+ *        Data payload or NULL.
+ * @param data_size
+ *        Size of @a data.
+ */
+static void
+client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
+                    int64_t result_code, const void *data, uint16_t data_size)
+{
+  struct GNUNET_OperationResultMessage *res;
+
+  res = GNUNET_malloc (sizeof (*res) + data_size);
+  res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
+  res->header.size = htons (sizeof (*res) + data_size);
+  res->result_code = GNUNET_htonll (result_code);
+  res->op_id = op_id;
+  if (0 < data_size)
+    memcpy (&res[1], data, data_size);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "%p Sending result to client for operation #%" PRIu64 ": "
+              "%" PRId64 " (size: %u)\n",
+             client, GNUNET_ntohll (op_id), result_code, data_size);
+
+  GNUNET_SERVER_notification_context_add (nc, client);
+  GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
+                                              GNUNET_NO);
+  GNUNET_free (res);
+}
+
+
 /**
  * Called after a PSYC master is started.
  */
 static void
-psyc_master_started (void *cls, uint64_t max_message_id)
+psyc_master_started (void *cls, int result, uint64_t max_message_id)
 {
   struct Host *hst = cls;
   struct Place *plc = &hst->plc;
   plc->max_message_id = max_message_id;
   plc->is_ready = GNUNET_YES;
 
-  struct CountersResult res;
+  struct GNUNET_PSYC_CountersResultMessage res;
   res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK);
   res.header.size = htons (sizeof (res));
-  res.result_code = htonl (GNUNET_OK);
+  res.result_code = htonl (result - INT32_MIN);
   res.max_message_id = GNUNET_htonll (plc->max_message_id);
 
   client_send_msg (plc, &res.header);
@@ -507,17 +558,17 @@ psyc_recv_join_request (void *cls,
  * Called after a PSYC slave is connected.
  */
 static void
-psyc_slave_connected (void *cls, uint64_t max_message_id)
+psyc_slave_connected (void *cls, int result, uint64_t max_message_id)
 {
   struct Guest *gst = cls;
   struct Place *plc = &gst->plc;
   plc->max_message_id = max_message_id;
   plc->is_ready = GNUNET_YES;
 
-  struct CountersResult res;
+  struct GNUNET_PSYC_CountersResultMessage res;
   res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK);
   res.header.size = htons (sizeof (res));
-  res.result_code = htonl (GNUNET_OK);
+  res.result_code = htonl (result - INT32_MIN);
   res.max_message_id = GNUNET_htonll (plc->max_message_id);
 
   client_send_msg (plc, &res.header);
@@ -603,12 +654,13 @@ client_recv_host_enter (void *cls, struct GNUNET_SERVER_Client *client,
                                             &psyc_master_started,
                                             &psyc_recv_join_request,
                                             &psyc_recv_message, NULL, hst);
+    hst->plc.channel = GNUNET_PSYC_master_get_channel (hst->master);
   }
   else
   {
     plc = &hst->plc;
 
-    struct CountersResult res;
+    struct GNUNET_PSYC_CountersResultMessage res;
     res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK);
     res.header.size = htons (sizeof (res));
     res.result_code = htonl (GNUNET_OK);
@@ -690,6 +742,7 @@ client_recv_guest_enter (void *cls, struct GNUNET_SERVER_Client *client,
                   sizeof (*req), relay_size, join_msg_size, req_size);
       GNUNET_break (0);
       GNUNET_SERVER_client_disconnect (client);
+      GNUNET_free (gst);
       return;
     }
     if (0 < gst->relay_count)
@@ -707,24 +760,25 @@ client_recv_guest_enter (void *cls, struct GNUNET_SERVER_Client *client,
     if (NULL == plc_gst)
     {
       plc_gst = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
-      GNUNET_CONTAINER_multihashmap_put (place_guests, &plc->pub_key_hash, plc_gst,
-                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+      (void) GNUNET_CONTAINER_multihashmap_put (place_guests, &plc->pub_key_hash, plc_gst,
+                                                GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
     }
-    GNUNET_CONTAINER_multihashmap_put (plc_gst, &gst->pub_key_hash, plc,
+    (void) GNUNET_CONTAINER_multihashmap_put (plc_gst, &gst->pub_key_hash, gst,
                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
-    GNUNET_CONTAINER_multihashmap_put (guests, &plc->pub_key_hash, plc,
+    (void) GNUNET_CONTAINER_multihashmap_put (guests, &plc->pub_key_hash, gst,
                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
     gst->slave
       = GNUNET_PSYC_slave_join (cfg, &plc->pub_key, &gst->priv_key,
                                 &gst->origin, gst->relay_count, gst->relays,
                                 &psyc_recv_message, NULL, &psyc_slave_connected,
                                 &psyc_recv_join_dcsn, gst, join_msg);
+    gst->plc.channel = GNUNET_PSYC_slave_get_channel (gst->slave);
   }
   else
   {
     plc = &gst->plc;
 
-    struct CountersResult res;
+    struct GNUNET_PSYC_CountersResultMessage res;
     res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK);
     res.header.size = htons (sizeof (res));
     res.result_code = htonl (GNUNET_OK);
@@ -857,11 +911,6 @@ psyc_transmit_queue_next_part (struct Place *plc,
                                struct FragmentTransmitQueue *tmit_frag)
 {
   uint16_t psize = ntohs (tmit_frag->next_part->size);
-  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-              "%p psyc_transmit_queue_next_part: %x + %u - %x = %u < %u\n",
-              plc, tmit_frag->next_part, psize, &tmit_frag[1],
-              (char *) tmit_frag->next_part + psize - ((char *) &tmit_frag[1]),
-              tmit_frag->size);
   if ((char *) tmit_frag->next_part + psize - ((char *) &tmit_frag[1])
       < tmit_frag->size)
   {
@@ -952,11 +1001,15 @@ psyc_transmit_notify_data (void *cls, uint16_t *data_size, void *data)
     break;
 
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
-  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
     *data_size = 0;
     ret = GNUNET_YES;
     break;
 
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
+    *data_size = 0;
+    ret = GNUNET_SYSERR;
+    break;
+
   default:
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 "%p psyc_transmit_notify_data: unexpected message part of type %u.\n",
@@ -964,7 +1017,7 @@ psyc_transmit_notify_data (void *cls, uint16_t *data_size, void *data)
     ret = GNUNET_SYSERR;
   }
 
-  if (GNUNET_SYSERR == ret)
+  if (GNUNET_SYSERR == ret && GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL != ptype)
   {
     *data_size = 0;
     tmit_msg = psyc_transmit_queue_next_msg (plc, tmit_msg);
@@ -975,7 +1028,27 @@ psyc_transmit_notify_data (void *cls, uint16_t *data_size, void *data)
   }
   else
   {
-    psyc_transmit_queue_next_part (plc, tmit_msg, tmit_frag);
+    tmit_frag = psyc_transmit_queue_next_part (plc, tmit_msg, tmit_frag);
+    if (NULL != tmit_frag)
+    {
+      struct GNUNET_MessageHeader *pmsg = tmit_frag->next_part;
+      ptype = ntohs (pmsg->type);
+      switch (ptype)
+      {
+      case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
+        ret = GNUNET_YES;
+        break;
+      case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
+        ret = GNUNET_SYSERR;
+        break;
+      }
+      switch (ptype)
+      {
+      case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
+      case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
+        tmit_frag = psyc_transmit_queue_next_part (plc, tmit_msg, tmit_frag);
+      }
+    }
 
     if (NULL == tmit_msg->frags_head
         && GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END <= ptype)
@@ -1060,12 +1133,6 @@ psyc_transmit_notify_mod (void *cls, uint16_t *data_size, void *data,
     *data_size = mod_size;
     memcpy (data, &pmod[1], mod_size);
     ret = GNUNET_NO;
-#if REMOVE // FIXME
-    ret = (mod_size - strnlen ((char *) &pmod[1], mod_size) - 1
-           == *full_value_size)
-      ? GNUNET_YES
-      : GNUNET_NO;
-#endif
     break;
   }
 
@@ -1120,7 +1187,8 @@ psyc_transmit_notify_mod (void *cls, uint16_t *data_size, void *data,
   }
   else
   {
-    psyc_transmit_queue_next_part (plc, tmit_msg, tmit_frag);
+    if (GNUNET_YES != ret)
+      psyc_transmit_queue_next_part (plc, tmit_msg, tmit_frag);
 
     if (NULL == tmit_msg->frags_head
         && GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END <= ptype)
@@ -1241,7 +1309,6 @@ psyc_transmit_queue_next_method (struct Place *plc,
 
   uint16_t psize = ntohs (pmsg->size);
   *pmeth = (struct GNUNET_PSYC_MessageMethod *) pmsg;
-
   if (psize < sizeof (**pmeth) + 1 || '\0' != *((char *) *pmeth + psize - 1))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -1253,6 +1320,7 @@ psyc_transmit_queue_next_method (struct Place *plc,
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
+
   psyc_transmit_queue_next_part (plc, tmit_msg, tmit_frag);
   return GNUNET_OK;
 }
@@ -1467,6 +1535,132 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
 }
 
 
+/**
+ * A historic message result arrived from PSYC.
+ */
+static void
+psyc_recv_history_message (void *cls,
+                           uint64_t message_id,
+                           uint32_t flags,
+                           const struct GNUNET_PSYC_MessageHeader *msg)
+{
+  struct OperationClosure *opcls = cls;
+  struct Place *plc = opcls->plc;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Received historic message #%" PRId64 " (flags: %x)\n",
+              plc, message_id, flags);
+
+  uint16_t size = ntohs (msg->header.size);
+
+  struct GNUNET_OperationResultMessage *
+    res = GNUNET_malloc (sizeof (*res) + size);
+  res->header.size = htons (sizeof (*res) + size);
+  res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
+  res->op_id = opcls->op_id;
+  res->result_code = GNUNET_htonll (GNUNET_OK);
+
+  memcpy (&res[1], msg, size);
+
+  /** @todo FIXME: send only to requesting client */
+  client_send_msg (plc, &res->header);
+}
+
+
+static void
+psyc_recv_history_result (void *cls, int64_t result,
+                          const void *err_msg, uint16_t err_msg_size)
+{
+  struct OperationClosure *opcls = cls;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p History replay #%" PRIu64 ": "
+              "PSYCSTORE returned %" PRId64 " (%.*s)\n",
+              opcls->plc, GNUNET_ntohll (opcls->op_id), result, err_msg_size, err_msg);
+
+  // FIXME: place might have been destroyed
+  client_send_result (opcls->client, opcls->op_id, result, err_msg, err_msg_size);
+}
+
+
+/**
+ * Client requests channel history.
+ */
+static void
+client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
+                            const struct GNUNET_MessageHeader *msg)
+{
+  struct Client *
+    ctx = GNUNET_SERVER_client_get_user_context (client, struct Client);
+  GNUNET_assert (NULL != ctx);
+  struct Place *plc = ctx->plc;
+
+  const struct GNUNET_PSYC_HistoryRequestMessage *
+    req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
+  uint16_t size = ntohs (msg->size);
+  const char *method_prefix = (const char *) &req[1];
+
+  if (size < sizeof (*req) + 1
+      || '\0' != method_prefix[size - sizeof (*req) - 1])
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "%p History replay #%" PRIu64 ": "
+                "invalid method prefix. size: %u < %u?\n",
+                plc, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+
+  struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
+  opcls->client = client;
+  opcls->plc = plc;
+  opcls->op_id = req->op_id;
+  opcls->flags = ntohl (req->flags);
+
+  if (0 == req->message_limit)
+    GNUNET_PSYC_channel_history_replay (plc->channel,
+                                        GNUNET_ntohll (req->start_message_id),
+                                        GNUNET_ntohll (req->end_message_id),
+                                        method_prefix, opcls->flags,
+                                        &psyc_recv_history_message, NULL,
+                                        &psyc_recv_history_result, opcls);
+  else
+    GNUNET_PSYC_channel_history_replay_latest (plc->channel,
+                                               GNUNET_ntohll (req->message_limit),
+                                               method_prefix, opcls->flags,
+                                               &psyc_recv_history_message, NULL,
+                                               &psyc_recv_history_result, opcls);
+
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+  { &client_recv_host_enter, NULL,
+    GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER, 0 },
+
+  { &client_recv_guest_enter, NULL,
+    GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER, 0 },
+
+  { &client_recv_join_decision, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
+
+  { &client_recv_psyc_message, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
+
+  { &client_recv_history_replay, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
+#if FIXME
+  { &client_recv_state_get, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
+
+  { &client_recv_state_get_prefix, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
+#endif
+  { NULL, NULL, 0, 0 }
+};
+
+
 /**
  * Initialize the PSYC service.
  *
@@ -1478,20 +1672,6 @@ static void
 run (void *cls, struct GNUNET_SERVER_Handle *server,
      const struct GNUNET_CONFIGURATION_Handle *c)
 {
-  static const struct GNUNET_SERVER_MessageHandler handlers[] = {
-    { &client_recv_host_enter, NULL,
-      GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER, 0 },
-
-    { &client_recv_guest_enter, NULL,
-      GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER, 0 },
-
-    { &client_recv_join_decision, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
-
-    { &client_recv_psyc_message, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }
-  };
-
   cfg = c;
   stats = GNUNET_STATISTICS_create ("social", cfg);
   hosts = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);