-misc stream hxing
authorChristian Grothoff <christian@grothoff.org>
Fri, 3 Feb 2012 10:01:23 +0000 (10:01 +0000)
committerChristian Grothoff <christian@grothoff.org>
Fri, 3 Feb 2012 10:01:23 +0000 (10:01 +0000)
src/stream/stream_api.c
src/stream/stream_protocol.h

index 25b8763339e3538049c4c9a8625c2de848b1c28f..3dcb2d85aa10b87b5410f7c566a41834a70a8b64 100644 (file)
  * @brief Implementation of the stream library
  * @author Sree Harsha Totakura
  */
-
+#include "platform.h"
 #include "gnunet_common.h"
 #include "gnunet_stream_lib.h"
 
-
 /**
  * states in the Protocol
  */
@@ -103,12 +102,12 @@ struct GNUNET_STREAM_Socket
   /**
    * The session id associated with this stream connection
    */
-  unint32_t session_id;
+  uint32_t session_id;
 
   /**
    * The peer identity of the peer at the other end of the stream
    */
-  GNUNET_PeerIdentity *other_peer;
+  GNUNET_PeerIdentity other_peer;
 
   /**
    * Stream open closure
@@ -181,35 +180,6 @@ struct GNUNET_STREAM_ListenSocket
 static unsigned int default_timeout = 300;
 
 
-/**
- * Converts message fields from host byte order to network byte order
- *
- * @param msg the message to convert
- */
-static void
-GNUNET_STREAM_convert_message_h2n (struct GNUNET_STREAM_MessageHeader *msg)
-{
-  /* Add type specific message conversion here  */
-
-  msg->size = htons (msg->size);
-  msg->type = htons (msg->type);
-}
-
-
-/**
- * Converts message fields from network byte order to host byte order
- *
- * @param msg the messeage to convert
- */
-static void
-GNUNET_STREAM_convert_message_n2h (struct GNUNET_STREAM_MessageHeader *msg)
-{
-  msg->size = ntohs (msg->size);
-  msg->type = ntohs (msg->type);
-
-  /* Add type specific message conversion here  */
-}
-
 /**
  * Callback function from send_message
  *
@@ -221,24 +191,28 @@ GNUNET_STREAM_convert_message_n2h (struct GNUNET_STREAM_MessageHeader *msg)
 static size_t
 send_message_notify (void *cls, size_t size, void *buf)
 {
-  struct GNUNET_STREAM_Socket *socket;
+  struct GNUNET_STREAM_Socket *socket = cls;
+  size_t ret;
 
-  socket = (struct GNUNET_STREAM_Socket *) cls;
   socket->transmit_handle = NULL; /* Remove the transmit handle */
   if (0 == size)                /* Socket closed? */
     {
+      // statistics ("message timeout")
+      
+      
       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                  "Message not sent as tunnel was closed \n");
+                 "Message not sent as tunnel was closed \n");
+      ret = 0;
     }
   else                          /* Size is more or equal to what was requested */
     {
-      size = socket->message->size;
-      GNUNET_STREAM_convert_message_h2n (socket->message) /* Convert h2n */
-        memcpy (buf, socket->message, size);
+      ret = ntohs (socket->message->size);
+      GNUNET_assert (size >= ret);
+      memcpy (buf, socket->message, ret);
     }
   GNUNET_free (socket->message); /* Free the message memory */
   socket->message = NULL;
-  return size;
+  return ret;
 }
 
 
@@ -258,8 +232,8 @@ send_message (struct GNUNET_STREAM_Socket *socket,
                                        0, /* Corking */
                                        timeout, /* FIXME: Maxdelay */
                                        socket->other_peer,
-                                       message->size,
-                                       send_message_notify,
+                                       ntohs (message->size),
+                                       &send_message_notify,
                                        socket);
 }
 
@@ -295,24 +269,58 @@ handle_data (void *cls,
              const struct GNUNET_MessageHeader *message,
              const struct GNUNET_ATS_Information*atsi)
 {
+  struct GNUNET_STREAM_Socket *socket = cls;
   uint16_t size;
-  struct GNUNET_STREAM_MessageHeader *message_copy;
-  
+  const struct GNUNET_STREAM_DataMessage *data_msg;
+  const void *payload;
+
   size = ntohs (message->size);
-  message_copy = GNUNET_malloc (size);
-  memcpy (message_copy, message, size);
-  GNUNET_STREAM_convert_message_n2h (message_copy);
+  if (size < sizeof (struct GNUNET_STREAM_DataMessage))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  data_msg = (const struct GNUNET_STREAM_DataMessage *) message;
+  size -= sizeof (Struct GNUNET_STREAM_DataMessage);
+  payload = &data_msg[1];
+  /* ... */
   
-  route_message (message_copy);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Message Handler for mesh
+ *
+ * @param cls closure (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx place to store local state associated with the tunnel
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_ack (void *cls,
+           struct GNUNET_MESH_Tunnel *tunnel,
+           void **tunnel_ctx,
+           const struct GNUNET_PeerIdentity *sender,
+           const struct GNUNET_MessageHeader *message,
+           const struct GNUNET_ATS_Information*atsi)
+{
+  struct GNUNET_STREAM_Socket *socket = cls;
+  const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
+
 }
 
 
 static struct GNUNET_MESH_MessageHandler message_handlers[] = {
   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
-  {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_ACK, 0},
-  {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 0},
-  {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK, 0},
-  {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RESET, 0},
+  {&handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, sizeof (struct GNUNET_STREAM_AckMessage) },
+  {&handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 0},
+  {&handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK, 0},
+  {&handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET, 0},
   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE, 0},
   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK, 0},
   {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE, 0},
@@ -335,9 +343,8 @@ mesh_peer_connect_callback (void *cls,
                             const struct GNUNET_PeerIdentity *peer,
                             const struct GNUNET_ATS_Information * atsi)
 {
-  const struct GNUNET_STREAM_Socket *socket;
-  
-  socket = (const struct GNUNET_STREAM_Socket *) cls;
+  const struct GNUNET_STREAM_Socket *socket = cls;
+
   if (0 != memcmp (socket->other_peer, 
                    peer, 
                    sizeof (struct GNUNET_PeerIdentity)))
@@ -426,19 +433,9 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
   va_list vargs;                /* Variable arguments */
 
   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
-  if (NULL == socket)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "Unable to allocate memory\n");
-      return NULL;
-    }
-  socket->other_peer = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
-  if (NULL == socket->other_peer)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "Unable to allocate memory \n");
-      return NULL;
-    }
+  socket->other_peer = *target;
+  socket->open_cb = open_cb;
+  socket->open_cls = open_cb_cls;
 
   /* Set defaults */
   socket->retransmit_timeout = 
@@ -462,16 +459,13 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
   va_end (vargs);               /* End of variable args parsing */
 
   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
-                                      10,  /* QUEUE size as parameter? */
-                                      NULL, /* cls */
+                                      1,  /* QUEUE size as parameter? */
+                                      socket, /* cls */
                                       NULL, /* No inbound tunnel handler */
                                       NULL, /* No inbound tunnel cleaner */
                                       message_handlers,
                                       NULL); /* We don't get inbound tunnels */
-
-  memcpy (socket->other_peer, target, sizeof (struct GNUNET_PeerIdentity));
-  socket->open_cb = open_cb;
-  socket->open_cls = open_cb_cls;
+  // FIXME: if (NULL == socket->mesh) ...
 
   /* Now create the mesh tunnel to target */
   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
@@ -479,6 +473,7 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
                                               &mesh_peer_connect_callback,
                                               &mesh_peer_disconnect_callback,
                                               (void *) socket);
+  // FIXME: if (NULL == socket->tunnel) ...
 
   return socket;
 }
@@ -502,8 +497,6 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
     {
       GNUNET_free (socket->message);
     }
-  /* Clear memory allocated for other peer's PeerIdentity */
-  GNUNET_Free (socket->other_peer);
   /* Close associated tunnel */
   if (NULL != socket->tunnel)
     {
@@ -528,16 +521,15 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
  * @return initial tunnel context for the tunnel
  *         (can be NULL -- that's not an error)
  */
-void 
+static void 
 new_tunnel_notify (void *cls,
                    struct GNUNET_MESH_Tunnel *tunnel,
                    const struct GNUNET_PeerIdentity *initiator,
                    const struct GNUNET_ATS_Information *atsi)
 {
-  struct GNUNET_STREAM_ListenSocket *lsocket;
+  struct GNUNET_STREAM_ListenSocket *lsocket = cls;
   struct GNUNET_STREAM_Socket *socket;
 
-  lsocket = (struct GNUNET_STREAM_ListenSocket *) cls;
   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
   socket->tunnel = tunnel;
   socket->session_id = 0;       /* FIXME */
@@ -574,7 +566,7 @@ new_tunnel_notify (void *cls,
  * @param tunnel_ctx place where local state associated
  *                   with the tunnel is stored
  */
-void 
+static void 
 tunnel_cleaner (void *cls,
                 const struct GNUNET_MESH_Tunnel *tunnel,
                 void *tunnel_ctx)
@@ -623,12 +615,6 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
   struct GNUNET_STREAM_ListenSocket *lsocket;
 
   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
-  if (NULL == lsocket)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "Unable to allocate memory\n");
-      return NULL;
-    }
   lsocket->port = app_port;
   lsocket->listen_cb = listen_cb;
   lsocket->listen_cb_cls = listen_cb_cls;
index b7fff7e5f862fa19114e2f15a5f50768ca2f6a43..9c7964e336c1a6b82964ada7a7d3e1cd967ceeba 100644 (file)
@@ -37,6 +37,8 @@ extern "C"
 
 #include "gnunet_util_lib.h"
 
+GNUNET_NETWORK_STRUCT_BEGIN
+
 
 /**
  * The stream message header
@@ -53,7 +55,7 @@ struct GNUNET_STREAM_MessageHeader
   /**
    * A number which identifies a session between the two peers.
    */
-  uint32_t session_id;
+  uint32_t session_id GNUNET_PACKED;
 
 };
 
@@ -70,6 +72,13 @@ struct GNUNET_STREAM_DataMessage
    */
   struct GNUNET_STREAM_MessageHeader header;
 
+  /**
+   * Sequence number; starts with a random value.  (Just in case
+   * someone breaks mesh and is able to try to do a Sequence
+   * Prediction Attack on us.)
+   */
+  uint32_t sequence_number GNUNET_PACKED;
+
   /**
    * number of milliseconds to the soft deadline for sending acknowledgement
    * measured from the time this message is received. It is optimal for the
@@ -77,19 +86,12 @@ struct GNUNET_STREAM_DataMessage
    */
   struct GNUNET_TIME_RelativeNBO ack_deadline;
 
-  /**
-   * Sequence number; starts with a random value.  (Just in case
-   * someone breaks mesh and is able to try to do a Sequence
-   * Prediction Attack on us.)
-   */
-  uint32_t sequence_number;
-
   /**
    * Offset of the packet in the overall stream, modulo 2^32; allows
    * the receiver to calculate where in the destination buffer the
-   * message should be placed.
+   * message should be placed.  In network byte order.
    */
-  uint32_t offset;
+  uint32_t offset GNUNET_PACKED;
 
   /**
    * The data should be appended here
@@ -117,21 +119,23 @@ struct GNUNET_STREAM_AckMessage
    * The Selective Acknowledgement Bitmap. Computed relative to the base_seq
    * (bit n corresponds to the Data message with sequence number base_seq+n)
    */
-  GNUNET_STREAM_AckBitmap bitmap;
+  GNUNET_STREAM_AckBitmap bitmap GNUNET_PACKED;
 
   /**
    * The sequence number of the Data Message upto which the receiver has filled
    * its buffer without any missing packets
    */
-  uint32_t base_sequence_number;
+  uint32_t base_sequence_number GNUNET_PACKED;
 
   /**
    * Available buffer space past the last acknowledged buffer (for flow control),
    * in bytes.
    */
-  uint32_t receive_window_remaining;
+  uint32_t receive_window_remaining GNUNET_PACKED;
 };
 
+GNUNET_NETWORK_STRUCT_END
+
 
 #if 0                           /** keep Emacsens' auto-indent happy */
 {