From: Christian Grothoff Date: Thu, 1 Nov 2018 14:29:45 +0000 (+0100) Subject: work on TNG X-Git-Tag: v0.11.0~240^2~12 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=43de1e4a084b7d9e773b05f173d516dc573de5c1;p=oweals%2Fgnunet.git work on TNG --- diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 03b13fd48..4831c9215 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -3005,9 +3005,58 @@ extern "C" #define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL 1135 +/******************************************************* + NEW (TNG) Transport service + ******************************************************* */ /** - * Next available: 1200 + * @brief inform transport to add an address of this peer + */ +#define GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS 1200 + +/** + * @brief inform transport to delete an address of this peer + */ +#define GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS 1201 + +/** + * @brief inform transport about an incoming message + */ +#define GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG 1202 + +/** + * @brief transport acknowledges processing an incoming message + */ +#define GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK 1203 + +/** + * @brief inform transport that a queue was setup to talk to some peer + */ +#define GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP 1204 + +/** + * @brief inform transport that a queue was torn down + */ +#define GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN 1205 + +/** + * @brief transport tells communicator it wants a queue + */ +#define GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE 1206 + +/** + * @brief transport tells communicator it wants to transmit + */ +#define GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG 1207 + +/** + * @brief communicator tells transports that message was sent + */ +#define GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK 1208 + + +/** + * Next available: 1300 */ diff --git a/src/include/gnunet_transport_communication_service.h b/src/include/gnunet_transport_communication_service.h index 94d15af22..d93d5134e 100644 --- a/src/include/gnunet_transport_communication_service.h +++ b/src/include/gnunet_transport_communication_service.h @@ -137,8 +137,8 @@ typedef void * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was * immediately dropped due to memory limitations (communicator * should try to apply back pressure), - * #GNUNET_SYSERR if the message is ill formed and communicator - * should try to reset stream + * #GNUNET_SYSERR if the message could not be delivered because + * the tranport service is not yet up */ int GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *handle, @@ -162,7 +162,7 @@ struct GNUNET_TRANSPORT_QueueHandle; * "inbound" connection or because the communicator discovered the * presence of another peer. * - * @param handle connection to transport service + * @param ch connection to transport service * @param peer peer with which we can now communicate * @param address address in human-readable format, 0-terminated, UTF-8 * @param nt which network type does the @a address belong to? @@ -170,7 +170,7 @@ struct GNUNET_TRANSPORT_QueueHandle; * @return API handle identifying the new MQ */ struct GNUNET_TRANSPORT_QueueHandle * -GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *handle, +GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, const struct GNUNET_PeerIdentity *peer, const char *address, enum GNUNET_ATS_Network_Type nt, @@ -198,16 +198,16 @@ struct GNUNET_TRANSPORT_AddressIdentifier; * Notify transport service about an address that this communicator * provides for this peer. * - * @param handle connection to transport service + * @param ch connection to transport service * @param address our address in human-readable format, 0-terminated, UTF-8 * @param nt which network type does the address belong to? * @param expiration when does the communicator forsee this address expiring? */ struct GNUNET_TRANSPORT_AddressIdentifier * -GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *handle, +GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, const char *address, enum GNUNET_ATS_Network_Type nt, - struct GNUNET_TIME_Absolute expiration); + struct GNUNET_TIME_Relative expiration); /** diff --git a/src/transport/transport.h b/src/transport/transport.h index 75726e462..ec373286d 100644 --- a/src/transport/transport.h +++ b/src/transport/transport.h @@ -644,6 +644,263 @@ struct TransportPluginMonitorMessage }; + + + + + + + +/* *********************** TNG messages ***************** */ + +/** + * Add address to the list. + */ +struct GNUNET_TRANSPORT_AddAddressMessage +{ + + /** + * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS. + */ + struct GNUNET_MessageHeader header; + + /** + * Address identifier (used during deletion). + */ + uint32_t aid GNUNET_PACKED; + + /** + * When does the address expire? + */ + struct GNUNET_TIME_RelativeNBO expiration; + + /** + * An `enum GNUNET_ATS_Network_Type` in NBO. + */ + uint32_t nt; + + /* followed by UTF-8 encoded, 0-terminated human-readable address */ +}; + + +/** + * Remove address from the list. + */ +struct GNUNET_TRANSPORT_DelAddressMessage +{ + + /** + * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS. + */ + struct GNUNET_MessageHeader header; + + /** + * Address identifier. + */ + uint32_t aid GNUNET_PACKED; + +}; + + +/** + * Inform transport about an incoming message. + */ +struct GNUNET_TRANSPORT_IncomingMessage +{ + + /** + * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG. + */ + struct GNUNET_MessageHeader header; + + /** + * Do we use flow control or not? + */ + uint32_t fc_on GNUNET_PACKED; + + /** + * 64-bit number to identify the matching ACK. + */ + uint64_t fc_id GNUNET_PACKED; + + /** + * Sender identifier. + */ + struct GNUNET_PeerIdentity sender GNUNET_PACKED; + + /* followed by the message */ +}; + + +/** + * Transport informs us about being done with an incoming message. + * (only sent if fc_on was set). + */ +struct GNUNET_TRANSPORT_IncomingMessageAck +{ + + /** + * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK. + */ + struct GNUNET_MessageHeader header; + + /** + * Reserved (0) + */ + uint32_t reserved GNUNET_PACKED; + + /** + * Which message is being ACKed? + */ + uint64_t fc_id GNUNET_PACKED; + + /** + * Sender identifier of the original message. + */ + struct GNUNET_PeerIdentity sender GNUNET_PACKED; + +}; + + +/** + * Add queue to the transport + */ +struct GNUNET_TRANSPORT_AddQueueMessage +{ + + /** + * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE. + */ + struct GNUNET_MessageHeader header; + + /** + * Queue identifier (used to identify the queue). + */ + uint32_t qid GNUNET_PACKED; + + /** + * Receiver that can be addressed via the queue. + */ + struct GNUNET_PeerIdentity receiver GNUNET_PACKED; + + /** + * An `enum GNUNET_ATS_Network_Type` in NBO. + */ + uint32_t nt; + + /* followed by UTF-8 encoded, 0-terminated human-readable address */ +}; + + +/** + * Remove queue, it is no longer available. + */ +struct GNUNET_TRANSPORT_DelQueueMessage +{ + + /** + * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE. + */ + struct GNUNET_MessageHeader header; + + /** + * Address identifier. + */ + uint32_t qid GNUNET_PACKED; + + /** + * Receiver that can be addressed via the queue. + */ + struct GNUNET_PeerIdentity receiver GNUNET_PACKED; + +}; + + +/** + * Transport tells communicator that it wants a new queue. + */ +struct GNUNET_TRANSPORT_CreateQueue +{ + + /** + * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE. + */ + struct GNUNET_MessageHeader header; + + /** + * Always zero. + */ + uint32_t reserved GNUNET_PACKED; + + /** + * Receiver that can be addressed via the queue. + */ + struct GNUNET_PeerIdentity receiver GNUNET_PACKED; + + /* followed by UTF-8 encoded, 0-terminated human-readable address */ +}; + + +/** + * Inform communicator about transport's desire to send a message. + */ +struct GNUNET_TRANSPORT_SendMessageTo +{ + + /** + * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG. + */ + struct GNUNET_MessageHeader header; + + /** + * Which queue should we use? + */ + uint32_t qid GNUNET_PACKED; + + /** + * Message ID, used for flow control. + */ + uint64_t mid GNUNET_PACKED; + + /** + * Receiver identifier. + */ + struct GNUNET_PeerIdentity receiver GNUNET_PACKED; + + /* followed by the message */ +}; + + +/** + * Inform transport that message was sent. + */ +struct GNUNET_TRANSPORT_SendMessageToAck +{ + + /** + * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK. + */ + struct GNUNET_MessageHeader header; + + /** + * Success (#GNUNET_OK), failure (#GNUNET_SYSERR). + */ + uint32_t status GNUNET_PACKED; + + /** + * Message ID of the original message. + */ + uint64_t mid GNUNET_PACKED; + + /** + * Receiver identifier. + */ + struct GNUNET_PeerIdentity receiver GNUNET_PACKED; + +}; + + + GNUNET_NETWORK_STRUCT_END /* end of transport.h */ diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c index e33c5f444..d446516bd 100644 --- a/src/transport/transport_api2_communication.c +++ b/src/transport/transport_api2_communication.c @@ -28,6 +28,79 @@ #include "transport.h" +/** + * How many messages do we keep at most in the queue to the + * transport service before we start to drop (default, + * can be changed via the configuration file). + */ +#define DEFAULT_MAX_QUEUE_LENGTH 16 + + +/** + * Information we track per packet to enable flow control. + */ +struct FlowControl +{ + /** + * Kept in a DLL. + */ + struct FlowControl *next; + + /** + * Kept in a DLL. + */ + struct FlowControl *prev; + + /** + * Function to call once the message was processed. + */ + GNUNET_TRANSPORT_MessageCompletedCallback cb; + + /** + * Closure for @e cb + */ + void *cb_cls; + + /** + * Which peer is this about? + */ + struct GNUNET_PeerIdentity sender; + + /** + * More-or-less unique ID for the message. + */ + uint64_t id; +}; + + +/** + * Information we track per message to tell the transport about + * success or failures. + */ +struct AckPending +{ + /** + * Kept in a DLL. + */ + struct AckPending *next; + + /** + * Kept in a DLL. + */ + struct AckPending *prev; + + /** + * Which peer is this about? + */ + struct GNUNET_PeerIdentity receiver; + + /** + * More-or-less unique ID for the message. + */ + uint64_t mid; +}; + + /** * Opaque handle to the transport service for communicators. */ @@ -43,6 +116,36 @@ struct GNUNET_TRANSPORT_CommunicatorHandle */ struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail; + /** + * DLL of messages awaiting flow control confirmation (ack). + */ + struct FlowControl *fc_head; + + /** + * DLL of messages awaiting flow control confirmation (ack). + */ + struct FlowControl *fc_tail; + + /** + * DLL of messages awaiting transmission confirmation (ack). + */ + struct AckPending *ap_head; + + /** + * DLL of messages awaiting transmission confirmation (ack). + */ + struct AckPending *ac_tail; + + /** + * DLL of queues we offer. + */ + struct QueueHandle *queue_head; + + /** + * DLL of queues we offer. + */ + struct QueueHandle *queue_tail; + /** * Our configuration. */ @@ -64,6 +167,16 @@ struct GNUNET_TRANSPORT_CommunicatorHandle */ void *mq_init_cls; + /** + * Maximum permissable queue length. + */ + unsigned long long max_queue_length; + + /** + * Flow-control identifier generator. + */ + uint64_t fc_gen; + /** * MTU of the communicator */ @@ -74,10 +187,53 @@ struct GNUNET_TRANSPORT_CommunicatorHandle * transport service. */ uint32_t aid_gen; + + /** + * Queue identifier generator. + */ + uint32_t queue_gen; }; +/** + * Handle returned to identify the internal data structure the transport + * API has created to manage a message queue to a particular peer. + */ +struct GNUNET_TRANSPORT_QueueHandle +{ + /** + * Handle this queue belongs to. + */ + struct GNUNET_TRANSPORT_CommunicatorHandle *ch; + + /** + * Which peer we can communciate with. + */ + struct GNUNET_PeerIdentity peer; + + /** + * Address used by the communication queue. + */ + char *address; + + /** + * Network type of the communciation queue. + */ + enum GNUNET_ATS_Network_Type nt; + + /** + * The queue itself. + */ + struct GNUNET_MQ_Handle *mq; + + /** + * ID for this queue when talking to the transport service. + */ + uint32_t queue_id; + +}; + /** * Internal representation of an address a communicator is @@ -184,6 +340,100 @@ send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai) } +/** + * Send message to the transport service about queue @a qh + * being now available. + * + * @param qh queue to add + */ +static void +send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) +{ + struct GNUNET_MQ_Envelope *env; + struct GNUNET_TRANSPORT_AddQueueMessage *aqm; + + if (NULL == ai->ch->mq) + return; + env = GNUNET_MQ_msg_extra (aqm, + strlen (ai->address) + 1, + GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE); + aqm.receiver = qh->peer; + aqm.nt = htonl ((uint32_t) qh->nt); + aqm.qid = htonl (qh->qid); + memcpy (&aqm[1], + ai->address, + strlen (ai->address) + 1); + GNUNET_MQ_send (ai->ch->mq, + env); +} + + +/** + * Send message to the transport service about queue @a qh + * being no longer available. + * + * @param qh queue to delete + */ +static void +send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) +{ + struct GNUNET_MQ_Envelope *env; + struct GNUNET_TRANSPORT_DelQueueMessage *dqm; + + if (NULL == ai->ch->mq) + return; + env = GNUNET_MQ_msg (dqm, + GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE); + dqm.qid = htonl (qh->qid); + dqm.receiver = qh->peer; + GNUNET_MQ_send (ai->ch->mq, + env); +} + + +/** + * Disconnect from the transport service. Purges + * all flow control entries as we will no longer receive + * the ACKs. Purges the ack pending entries as the + * transport will no longer expect the confirmations. + * + * @param ch service to disconnect from + */ +static void +disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) +{ + struct FlowControl *fcn; + struct AckPending *apn; + + for (struct FlowControl *fc = ch->fc_head; + NULL != fc; + fc = fcn) + { + fcn = fc->next; + GNUNET_CONTAINER_DLL_remove (ch->fc_head, + ch->fc_tail, + fc); + fc->cb (fc->cb_cls, + GNUNET_SYSERR); + GNUNET_free (fc); + } + for (struct AckPending *ap = ch->ap_head; + NULL != ap; + ap = apn) + { + apn = ap->next; + GNUNET_CONTAINER_DLL_remove (ch->ap_head, + ch->ap_tail, + ap); + GNUNET_free (ap); + } + if (NULL == ch->mq) + return; + GNUNET_MQ_destroy (ch->mq); + ch->mq = NULL; +} + + /** * Function called on MQ errors. */ @@ -192,14 +442,229 @@ error_handler (void *cls, enum GNUNET_MQ_Error error) { struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "MQ failure, reconnecting to transport service.\n"); + disconnect (ch); + /* TODO: maybe do this with exponential backoff/delay */ + reconnect (ch); +} + + +/** + * Transport service acknowledged a message we gave it + * (with flow control enabled). Tell the communicator. + * + * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` + * @param incoming_ack the ack + */ +static void +handle_incoming_ack (void *cls, + struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack) +{ + struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; - GNUNET_MQ_destroy (ch->mq); - ch->mq = NULL; + for (struct FlowControl *fc = ch->fc_head; + NULL != fc; + fc = fc->next) + { + if ( (fc->id == incoming_ack->fc_id) && + (0 == memcmp (&fc->sender, + incoming_ack->sender, + sizeof (struct GNUNET_PeerIdentity))) ) + { + GNUNET_CONTAINER_DLL_remove (ch->fc_head, + ch->fc_tail, + fc); + fc->cb (fc->cb_cls, + GNUNET_OK); + GNUNET_free (fc); + return; + } + } + GNUNET_break (0); + disconnect (ch); /* TODO: maybe do this with exponential backoff/delay */ reconnect (ch); } +/** + * Transport service wants us to create a queue. Check if @a cq + * is well-formed. + * + * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` + * @param cq the queue creation request + * @return #GNUNET_OK if @a smt is well-formed + */ +static int +check_create_queue (void *cls, + struct GNUNET_TRANSPORT_CreateQueue *cq) +{ + uint16_t len = ntohs (cq->header.size) - sizeof (*cq); + const char *addr = (const char *) &cq[1]; + + if ( (0 == len) || + ('\0' != addr[len-1]) ) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Transport service wants us to create a queue. Tell the communicator. + * + * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` + * @param cq the queue creation request + */ +static void +handle_create_queue (void *cls, + struct GNUNET_TRANSPORT_CreateQueue *cq) +{ + struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; + const char *addr = (const char *) &cq[1]; + + if (GNUNET_OK != + ch->mq_init (ch->mq_init_cls, + &cq->receiver, + addr)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Address `%s' invalid for this communicator\n", + addr); + // TODO: do we notify the transport!? + } +} + + +/** + * Transport service wants us to send a message. Check if @a smt + * is well-formed. + * + * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` + * @param smt the transmission request + * @return #GNUNET_OK if @a smt is well-formed + */ +static int +check_send_msg (void *cls, + struct GNUNET_TRANSPORT_SendMessageTo *smt) +{ + uint16_t len = ntohs (smt->header.size) - sizeof (*smt); + const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1]; + + if (ntohs (mh->size) != len) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Notify transport service about @a status of a message with + * @a mid sent to @a receiver. + * + * @param ch handle + * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure + * @param receiver which peer was the receiver + * @param mid message that the ack is about + */ +static void +send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, + int status, + const struct GNUNET_PeerIdentity *receiver, + uint64_t mid) +{ + struct GNUNET_MQ_Envelope *env; + struct GNUNET_TRANSPORT_SendMessageToAck *ack; + + env = GNUNET_MQ_msg (ack, + GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK); + ack->status = htonl (GNUNET_OK); + ack->mid = ap->mid; + ack->receiver = ap->receiver; + GNUNET_MQ_send (ch->mq, + env); +} + + +/** + * Message queue transmission by communicator was successful, + * notify transport service. + * + * @param cls an `struct AckPending *` + */ +static void +send_ack_cb (void *cls) +{ + struct AckPending *ap = cls; + struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch; + + GNUNET_CONTAINER_DLL_remove (ch->ap_head, + ch->ap_tail, + ap); + send_ack (ch, + GNUNET_OK, + &ap->receiver, + ap->mid); + GNUNET_free (ap); +} + + +/** + * Transport service wants us to send a message. Tell the communicator. + * + * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *` + * @param smt the transmission request + */ +static void +handle_send_msg (void *cls, + struct GNUNET_TRANSPORT_SendMessageTo *smt) +{ + struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; + const struct GNUNET_MessageHeader *mh; + struct GNUNET_MQ_Envelope *env; + struct AckPending *ap; + struct QueueHandle *qh; + + for (qh = ch->queue_head;NULL != qh; qh = qh->next) + if ( (qh->queue_id == smt->qid) && + (0 == memcmp (&qh->peer, + &smt->target, + sizeof (struct GNUNET_PeerIdentity))) ) + break; + if (NULL == qh) + { + /* queue is already gone, tell transport this one failed */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Transmission failed, queue no longer exists.\n"); + send_ack (ch, + GNUNET_NO, + &smt->receiver, + smt->mid); + return; + } + ap = GNUNET_new (struct AckPending); + ap->ch = ch; + ap->receiver = smt->receiver; + ap->mid = smt->mid; + GNUNET_CONTAINER_DLL_insert (ch->ap_head, + cp->ap_tail, + ap); + mh = (const struct GNUNET_MessageHeader *) &smt[1]; + env = GNUNET_MQ_msg_copy (mh); + GNUNET_MQ_notify_sent (env, + &send_ack_cb, + ap); + GNUNET_MQ_send (qh->mq, + env); +} + + /** * (re)connect our communicator to the transport service * @@ -209,6 +674,18 @@ static void reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) { struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_fixed_size (incoming_ack, + GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK, + struct GNUNET_TRANSPORT_IncomingMessageAck, + ch), + GNUNET_MQ_hd_var_size (create_queue, + GNUNET_MESSAGE_TYPE_TRANSPORT_CREATE_QUEUE, + struct GNUNET_TRANSPORT_CreateQueue, + ch), + GNUNET_MQ_hd_var_size (send_msg, + GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG, + struct GNUNET_TRANSPORT_SendMessageTo, + ch), GNUNET_MQ_handler_end() }; @@ -217,10 +694,14 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) handlers, &error_handler, ch); - for (struct GNUNET_TRANSPORT_AddressIdentifier ai = ch->ai_head; + for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; NULL != ai; ai = ai->next) send_add_address (ai); + for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head; + NULL != qh; + qh = qh->next) + send_add_queue (qh); } @@ -253,6 +734,12 @@ GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle ch->mq_init = mq_init; ch->mq_init_cls = mq_init_cls; reconnect (ch); + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_number (cfg, + name, + "MAX_QUEUE_LENGTH", + &ch->max_queue_length)) + ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH; if (NULL == ch->mq) { GNUNET_free (ch); @@ -270,12 +757,12 @@ GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle void GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) { + disconnect (ch); while (NULL != ch->ai_head) { GNUNET_break (0); /* communicator forgot to remove address, warn! */ GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head); } - GNUNET_MQ_destroy (ch->mq); GNUNET_free (ch); } @@ -297,8 +784,8 @@ GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHa * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was * immediately dropped due to memory limitations (communicator * should try to apply back pressure), - * #GNUNET_SYSERR if the message is ill formed and communicator - * should try to reset stream + * #GNUNET_SYSERR if the message could not be delivered because + * the tranport service is not yet up */ int GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, @@ -312,7 +799,33 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl uint16_t msize; if (NULL == ai->ch->mq) - return; + return GNUNET_SYSERR; + if (NULL != cb) + { + struct FlowControl *fc; + + im->fc_on = htonl (GNUNET_YES); + im->fc_id = ai->ch->fc_gen++; + fc = GNUNET_new (struct FlowControl); + fc->sender = *sender; + fc->id = im->fc_id; + fc->cb = cb; + fc->cb_cls = cb_cls; + GNUNET_CONTAINER_DLL_insert (ch->fc_head, + ch->fc_tail, + fc); + } + else + { + if (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Dropping message: transprot is too slow, queue length %u exceeded\n", + ch->max_queue_length); + return GNUNET_NO; + } + } + msize = ntohs (msg->size); env = GNUNET_MQ_msg_extra (im, msize, @@ -320,7 +833,7 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl if (NULL == env) { GNUNET_break (0); - return; + return GNUNET_SYSERR; } im->sender = *sender; memcpy (&im[1], @@ -328,19 +841,12 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl msize); GNUNET_MQ_send (ai->ch->mq, env); + return GNUNET_OK; } /* ************************* Discovery *************************** */ -/** - * Handle returned to identify the internal data structure the transport - * API has created to manage a message queue to a particular peer. - */ -struct GNUNET_TRANSPORT_QueueHandle -{ -}; - /** * Notify transport service that an MQ became available due to an @@ -361,6 +867,20 @@ GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle enum GNUNET_ATS_Network_Type nt, struct GNUNET_MQ_Handle *mq) { + struct GNUNET_TRANSPORT_QueueHandle *qh; + + qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle); + qh->ch = ch; + qh->peer = *peer; + qh->address = GNUNET_strdup (address); + qh->nt = nt; + qh->mq = mq; + qh->queue_id = ch->queue_gen++; + GNUNET_CONTAINER_DLL_insert (ch->queue_head, + ch->queue_tail, + qh); + send_add_queue (qh); + return qh; } @@ -373,11 +893,18 @@ GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle void GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh) { + struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch; + + send_del_queue (qh); + GNUNET_CONTAINER_DLL_remove (ch->queue_head, + ch->queue_tail, + qh); + GNUNET_MQ_destroy (qh->mq); + GNUNET_free (qh->address); + GNUNET_free (qh); } - - /** * Notify transport service about an address that this communicator * provides for this peer. @@ -421,10 +948,10 @@ GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIde struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch; send_del_address (ai); - GNUNET_free (ai->address); GNUNET_CONTAINER_DLL_remove (ch->ai_head, ch->ai_tail, ai); + GNUNET_free (ai->address); GNUNET_free (ai); }