bulk
[oweals/gnunet.git] / src / transport / gnunet-service-transport.c
index 6486e1028b61a91fbf50763e068d9b19d286e3dd..8cbd32dd383bb9d84980e7131367bdd4f7ab06e0 100644 (file)
@@ -73,7 +73,7 @@ struct GNUNET_CRYPTO_RsaPrivateKey *GST_my_private_key;
 /**
  * ATS handle.
  */
-struct GNUNET_ATS_Handle *GST_ats;
+struct GNUNET_ATS_SchedulingHandle *GST_ats;
 
 
 /**
@@ -83,16 +83,22 @@ struct GNUNET_ATS_Handle *GST_ats;
  * @param target a connected neighbour
  * @param ats performance information (unused)
  * @param ats_count number of records in ats (unused)
+ * @param transport plugin
+ * @param addr address
+ * @param addrlen address length
  */
 static void
 transmit_our_hello (void *cls, const struct GNUNET_PeerIdentity *target,
-                    const struct GNUNET_TRANSPORT_ATS_Information *ats,
-                    uint32_t ats_count)
+                    const struct GNUNET_ATS_Information *ats,
+                    uint32_t ats_count,
+                    const char * transport,
+                    const void * addr,
+                    size_t addrlen)
 {
   const struct GNUNET_MessageHeader *hello = cls;
 
   GST_neighbours_send (target, (const char *) hello, ntohs (hello->size),
-                       GST_HELLO_ADDRESS_EXPIRATION, NULL, NULL);
+                       GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION, NULL, NULL);
 }
 
 
@@ -129,6 +135,71 @@ try_connect_if_allowed (void *cls, const struct GNUNET_PeerIdentity *peer,
 }
 
 
+/**
+ * We received some payload.  Prepare to pass it on to our clients. 
+ *
+ * @param peer (claimed) identity of the other peer
+ * @param message the message, NULL if we only care about
+ *                learning about the delay until we should receive again -- FIXME!
+ * @param ats performance information
+ * @param ats_count number of records in ats
+ * @return how long the plugin should wait until receiving more data
+ */
+static struct GNUNET_TIME_Relative
+process_payload (const struct GNUNET_PeerIdentity *peer,
+                const struct GNUNET_MessageHeader *message,
+                const struct GNUNET_ATS_Information *ats,
+                uint32_t ats_count)
+{
+  struct GNUNET_TIME_Relative ret;
+  int do_forward;
+  struct InboundMessage *im;
+  size_t size = sizeof (struct InboundMessage) + ntohs (message->size);
+  char buf[size];
+  
+  ret = GNUNET_TIME_UNIT_ZERO;
+  do_forward = GNUNET_SYSERR;
+  ret =
+    GST_neighbours_calculate_receive_delay (peer,
+                                           (message ==
+                                            NULL) ? 0 :
+                                           ntohs (message->size),
+                                           &do_forward);
+  im = (struct InboundMessage*) buf;    
+  im->header.size = htons (size);
+  im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+  im->ats_count = htonl (0);
+  memcpy (&(im->peer), peer, sizeof (struct GNUNET_PeerIdentity));
+  memcpy (&im[1], message, ntohs (message->size));
+
+  switch (do_forward)
+  {
+  case GNUNET_YES:
+    GST_clients_broadcast (&im->header, GNUNET_YES);     
+    break;
+  case GNUNET_NO:
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+               _("Discarded %u bytes of type %u from %s: quota violated!\n"),
+               ntohs (message->size),
+               ntohs (message->type),
+               GNUNET_i2s (peer));
+    break;
+  case GNUNET_SYSERR:
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+               _("Discarded %u bytes of type %u from %s: connection is down!\n"),
+               ntohs (message->size),
+               ntohs (message->type),
+               GNUNET_i2s (peer));
+    /* FIXME: store until connection is up? This is virtually always a SETKEY and a PING... */
+    break;
+  default:
+    GNUNET_break (0);
+    break;
+  }    
+  return ret;
+}
+
+
 /**
  * Function called by the transport for each received message.
  * This function should also be called with "NULL" for the
@@ -155,103 +226,89 @@ try_connect_if_allowed (void *cls, const struct GNUNET_PeerIdentity *peer,
 static struct GNUNET_TIME_Relative
 plugin_env_receive_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
                              const struct GNUNET_MessageHeader *message,
-                             const struct GNUNET_TRANSPORT_ATS_Information *ats,
+                             const struct GNUNET_ATS_Information *ats,
                              uint32_t ats_count, struct Session *session,
                              const char *sender_address,
                              uint16_t sender_address_len)
 {
   const char *plugin_name = cls;
-  int do_forward;
   struct GNUNET_TIME_Relative ret;
   uint16_t type;
-
+  
   ret = GNUNET_TIME_UNIT_ZERO;
-  if (NULL != message)
+  if (NULL == message)
+    goto end;
+  type = ntohs (message->type);
+  switch (type)
   {
-    type = ntohs (message->type);
-    switch (type)
-    {
-    case GNUNET_MESSAGE_TYPE_HELLO:
-      GST_validation_handle_hello (message);
-      return ret;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
+  case GNUNET_MESSAGE_TYPE_HELLO:
+    GST_validation_handle_hello (message);
+    return ret;
+  case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
 #if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-                  "Processing `%s' from `%s'\n", "PING",
-                  (sender_address != NULL) ? GST_plugins_a2s (plugin_name,
-                                                              sender_address,
-                                                              sender_address_len)
-                  : "<inbound>");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
+               "Processing `%s' from `%s'\n", "PING",
+               (sender_address != NULL) ? GST_plugins_a2s (plugin_name,
+                                                           sender_address,
+                                                           sender_address_len)
+               : "<inbound>");
 #endif
-      GST_validation_handle_ping (peer, message, plugin_name, session,
-                                  sender_address, sender_address_len);
-      break;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
+    GST_validation_handle_ping (peer, message, plugin_name, session,
+                               sender_address, sender_address_len);
+    break;
+  case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
 #if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-                  "Processing `%s' from `%s'\n", "PONG",
-                  (sender_address != NULL) ? GST_plugins_a2s (plugin_name,
-                                                              sender_address,
-                                                              sender_address_len)
-                  : "<inbound>");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
+               "Processing `%s' from `%s'\n", "PONG",
+               (sender_address != NULL) ? GST_plugins_a2s (plugin_name,
+                                                           sender_address,
+                                                           sender_address_len)
+               : "<inbound>");
 #endif
-      GST_validation_handle_pong (peer, message);
-      break;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT:
-      (void) GST_blacklist_test_allowed (peer, NULL, &try_connect_if_allowed,
-                                         NULL);
-      /* TODO: if 'session != NULL', and timestamp more recent than the
-       * previous one, maybe notify ATS that this is now the preferred
-       * * way to communicate with this peer (other peer switched transport) */
-      break;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT:
-      /* FIXME: do some validation to prevent an attacker from sending
-       * a fake disconnect message... */         
-      GST_neighbours_force_disconnect (peer);
-      break;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE:
-      GST_neighbours_keepalive (peer);
-      break;
-    default:
-      /* should be payload */
-      do_forward = GNUNET_SYSERR;
-      ret =
-          GST_neighbours_calculate_receive_delay (peer,
-                                                  (message ==
-                                                   NULL) ? 0 :
-                                                  ntohs (message->size),
-                                                  &do_forward);
-      if (do_forward == GNUNET_YES)
-      {
-        struct InboundMessage *im;
-        size_t size = sizeof (struct InboundMessage) + ntohs (message->size);
-
-        im = GNUNET_malloc (size);
-        im->header.size = htons (size);
-        im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
-        im->ats_count = htonl (0);
-        memcpy (&(im->peer), peer, sizeof (struct GNUNET_PeerIdentity));
-        memcpy (&im[1], message, ntohs (message->size));
-
-        GST_clients_broadcast ((const struct GNUNET_MessageHeader *) im,
-                               GNUNET_YES);
-
-        GNUNET_free (im);
-      }
-      break;
-    }
+    GST_validation_handle_pong (peer, message);
+    break;
+  case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT:
+    GST_neighbours_handle_connect (message,
+                                  peer,
+                                  plugin_name, sender_address, sender_address_len,
+                                  session, ats, ats_count);
+    (void) GST_blacklist_test_allowed (peer, NULL, &try_connect_if_allowed,
+                                      NULL);
+    break;
+  case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT:
+    /* FIXME: do some validation to prevent an attacker from sending
+     * a fake disconnect message... */           
+    GST_neighbours_force_disconnect (peer);
+    break;
+  case GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE:
+    GST_neighbours_keepalive (peer);
+    break;
+  default:
+    /* should be payload */
+    process_payload (peer,
+                    message,
+                    ats, ats_count);
+    break;
   }
-  GNUNET_assert ((ats_count > 0) && (ats != NULL));
-  /*
-     FIXME: this gives an address that might not have been validated to
-     ATS for 'selection', which is probably not what we want; this 
-     might be particularly wrong (as in, possibly hiding bugs with address
-     validation) as 'GNUNET_ATS_address_update' currently ignores
-     the expiration given.
-  */
-  GNUNET_ATS_address_update (GST_ats, peer, GNUNET_TIME_absolute_get (),        /* valid at least until right now... */
-                             plugin_name, session, sender_address,
-                             sender_address_len, ats, ats_count);
+ end:
+#if 1
+  /* FIXME: this should not be needed, and not sure it's good to have it, but without
+     this connections seem to go extra-slow */
+  if ((ats_count > 0) && (ats != NULL))
+  {
+    if (NULL != session)
+      GNUNET_log_from (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK,
+                      "transport-ats",
+                      "Giving ATS session %p of plugin %s for peer %s\n",
+                      session,
+                      plugin_name,
+                      GNUNET_i2s (peer));
+    GNUNET_ATS_address_update (GST_ats, peer,
+                               plugin_name, sender_address, sender_address_len,
+                               session,
+                               ats, ats_count);
+  }
+#endif
   return ret;
 }
 
@@ -294,6 +351,18 @@ static void
 plugin_env_session_end (void *cls, const struct GNUNET_PeerIdentity *peer,
                         struct Session *session)
 {
+#if DEBUG_TRANSPORT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Session %X to peer `%s' ended \n",
+              session, GNUNET_i2s (peer));
+#endif
+  if (NULL != session)
+    GNUNET_log_from (GNUNET_ERROR_TYPE_INFO  | GNUNET_ERROR_TYPE_BULK,
+                    "transport-ats",
+                    "Telling ATS to destroy session %p from peer %s\n",
+                    session,              
+                    GNUNET_i2s (peer));
+  GNUNET_ATS_address_destroyed(GST_ats, peer, NULL, NULL, 0, session);
   GST_neighbours_session_terminated (peer, session);
 }
 
@@ -311,17 +380,50 @@ plugin_env_session_end (void *cls, const struct GNUNET_PeerIdentity *peer,
  * @param session session to use (if available)
  * @param plugin_addr address to use (if available)
  * @param plugin_addr_len number of bytes in addr
- * @param bandwidth assigned outbound bandwidth for the connection
+ * @param bandwidth_out assigned outbound bandwidth for the connection, 0 to disconnect from peer
+ * @param bandwidth_in assigned inbound bandwidth for the connection, 0 to disconnect from peer
  */
 static void
 ats_request_address_change (void *cls, const struct GNUNET_PeerIdentity *peer,
-                            const char *plugin_name, struct Session *session,
+                            const char *plugin_name,
                             const void *plugin_addr, size_t plugin_addr_len,
-                            struct GNUNET_BANDWIDTH_Value32NBO bandwidth)
+                            struct Session *session,
+                            struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
+                            struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
+                            const struct GNUNET_ATS_Information * ats,
+                            uint32_t ats_count)
 {
+  uint32_t bw_in = ntohl (bandwidth_in.value__);
+  uint32_t bw_out = ntohl (bandwidth_out.value__);
+
+  /* ATS tells me to disconnect from peer*/
+  if ((bw_in == 0) && (bw_out == 0))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "ATS tells me to disconnect from peer `%s'\n",
+        GNUNET_i2s (peer));
+    GST_neighbours_force_disconnect(peer);
+    return;
+  }
+
   GST_neighbours_switch_to_address (peer, plugin_name, plugin_addr,
-                                    plugin_addr_len, session, NULL, 0);
-  GST_neighbours_set_incoming_quota (peer, bandwidth);
+                                    plugin_addr_len, session, ats, ats_count);
+
+#if DEBUG_TRANSPORT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
+              ntohl (bandwidth_out.value__), GNUNET_i2s (peer));
+#endif
+  struct QuotaSetMessage msg;
+  msg.header.size = htons (sizeof (struct QuotaSetMessage));
+  msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
+  msg.quota = bandwidth_out;
+  msg.peer = (*peer);
+  GST_clients_broadcast ((struct GNUNET_MessageHeader *) &msg, GNUNET_NO);
+
+#if DEBUG_TRANSPORT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Setting inbound quota of %u for peer `%s' to \n",
+              ntohl (bandwidth_in.value__), GNUNET_i2s (peer));
+#endif
+  GST_neighbours_set_incoming_quota (peer, bandwidth_in);
 }
 
 
@@ -332,27 +434,24 @@ ats_request_address_change (void *cls, const struct GNUNET_PeerIdentity *peer,
  * @param cls closure
  * @param peer the peer that connected
  * @param ats performance data
- * @param ats_count number of entries in ats (excluding 0-termination)
+ * @param ats_count number of entries in ats
  */
 static void
 neighbours_connect_notification (void *cls,
                                  const struct GNUNET_PeerIdentity *peer,
-                                 const struct GNUNET_TRANSPORT_ATS_Information
+                                 const struct GNUNET_ATS_Information
                                  *ats, uint32_t ats_count)
 {
   char buf[sizeof (struct ConnectInfoMessage) +
-           ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)];
+           ats_count * sizeof (struct GNUNET_ATS_Information)];
   struct ConnectInfoMessage *connect_msg = (struct ConnectInfoMessage *) buf;
-  struct GNUNET_TRANSPORT_ATS_Information *atsm = &connect_msg->ats;
 
   connect_msg->header.size = htons (sizeof (buf));
   connect_msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
   connect_msg->ats_count = htonl (ats_count);
   connect_msg->id = *peer;
-  memcpy (&connect_msg->ats, ats,
-          ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
-  atsm[ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
-  atsm[ats_count].value = htonl (0);
+  memcpy (&connect_msg->ats, &connect_msg->ats,
+          ats_count * sizeof (struct GNUNET_ATS_Information));
   GST_clients_broadcast (&connect_msg->header, GNUNET_NO);
 }
 
@@ -391,7 +490,7 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   GST_validation_stop ();
   GST_plugins_unload ();
   GST_neighbours_stop ();
-  GNUNET_ATS_shutdown (GST_ats);
+  GNUNET_ATS_scheduling_done (GST_ats);
   GST_ats = NULL;
   GST_clients_stop ();
   GST_blacklist_stop ();
@@ -470,7 +569,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
   GST_plugins_load (&plugin_env_receive_callback,
                     &plugin_env_address_change_notification,
                     &plugin_env_session_end);
-  GST_ats = GNUNET_ATS_init (GST_cfg, &ats_request_address_change, NULL);
+  GST_ats = GNUNET_ATS_scheduling_init (GST_cfg, &ats_request_address_change, NULL);
   GST_neighbours_start (NULL, &neighbours_connect_notification,
                         &neighbours_disconnect_notification);
   GST_clients_start (server);