WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.
-
+
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
/**
* @file transport/transport_api2_communication.c
#include "gnunet_util_lib.h"
#include "gnunet_protocols.h"
#include "gnunet_transport_communication_service.h"
+#include "gnunet_ats_transport_service.h"
#include "transport.h"
/**
- * How many messages do we keep at most in the queue to the
- * transport service before we start to drop (default,
+ * How many messages do we keep at most in the queue to the
+ * transport service before we start to drop (default,
* can be changed via the configuration file).
*/
#define DEFAULT_MAX_QUEUE_LENGTH 16
* Closure for @e cb
*/
void *cb_cls;
-
+
/**
* Which peer is this about?
*/
*/
struct AckPending *prev;
+ /**
+ * Communicator this entry belongs to.
+ */
+ struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
+
/**
* Which peer is this about?
*/
/**
* DLL of messages awaiting transmission confirmation (ack).
*/
- struct AckPending *ac_tail;
+ struct AckPending *ap_tail;
/**
* DLL of queues we offer.
*/
- struct QueueHandle *queue_head;
-
+ struct GNUNET_TRANSPORT_QueueHandle *queue_head;
+
/**
* DLL of queues we offer.
*/
- struct QueueHandle *queue_tail;
-
+ struct GNUNET_TRANSPORT_QueueHandle *queue_tail;
+
/**
* Our configuration.
*/
const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
- * Name of the communicator.
+ * Config section to use.
*/
- const char *name;
+ const char *config_section;
+
+ /**
+ * Address prefix to use.
+ */
+ const char *addr_prefix;
/**
* Function to call when the transport service wants us to initiate
*/
void *mq_init_cls;
+ /**
+ * Function to call when the transport service receives messages
+ * for a communicator (i.e. for NAT traversal or for non-bidirectional
+ * communicators).
+ */
+ GNUNET_TRANSPORT_CommunicatorNotify notify_cb;
+
+ /**
+ * Closure for @e notify_Cb.
+ */
+ void *notify_cb_cls;
+
+ /**
+ * Queue to talk to the transport service.
+ */
+ struct GNUNET_MQ_Handle *mq;
+
/**
* Maximum permissable queue length.
*/
unsigned long long max_queue_length;
-
+
/**
* Flow-control identifier generator.
*/
uint64_t fc_gen;
- /**
- * MTU of the communicator
- */
- size_t mtu;
-
/**
* Internal UUID for the address used in communication with the
* transport service.
* Queue identifier generator.
*/
uint32_t queue_gen;
-
+
+ /**
+ * Characteristics of the communicator.
+ */
+ enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
};
struct GNUNET_TRANSPORT_QueueHandle
{
/**
- * Handle this queue belongs to.
+ * Kept in a DLL.
*/
- struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
+ struct GNUNET_TRANSPORT_QueueHandle *next;
/**
- * Which peer we can communciate with.
+ * Kept in a DLL.
*/
- struct GNUNET_PeerIdentity peer;
+ struct GNUNET_TRANSPORT_QueueHandle *prev;
+
+ /**
+ * Handle this queue belongs to.
+ */
+ struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
/**
* Address used by the communication queue.
- */
+ */
char *address;
+ /**
+ * The queue itself.
+ */
+ struct GNUNET_MQ_Handle *mq;
+
+ /**
+ * Which peer we can communciate with.
+ */
+ struct GNUNET_PeerIdentity peer;
+
/**
* Network type of the communciation queue.
*/
- enum GNUNET_ATS_Network_Type nt;
+ enum GNUNET_NetworkType nt;
/**
- * The queue itself.
- */
- struct GNUNET_MQ_Handle *mq;
+ * Communication status of the queue.
+ */
+ enum GNUNET_TRANSPORT_ConnectionStatus cs;
/**
* ID for this queue when talking to the transport service.
*/
uint32_t queue_id;
-
+
+ /**
+ * Maximum transmission unit for the queue.
+ */
+ uint32_t mtu;
};
*/
struct GNUNET_TRANSPORT_AddressIdentifier
{
-
/**
* Kept in a DLL.
*/
* Kept in a DLL.
*/
struct GNUNET_TRANSPORT_AddressIdentifier *prev;
-
+
/**
* Transport handle where the address was added.
*/
* address.)
*/
struct GNUNET_TIME_Relative expiration;
-
+
/**
* Internal UUID for the address used in communication with the
* transport service.
/**
* Network type for the address.
*/
- enum GNUNET_ATS_Network_Type nt;
-
+ enum GNUNET_NetworkType nt;
};
{
struct GNUNET_MQ_Envelope *env;
struct GNUNET_TRANSPORT_AddAddressMessage *aam;
-
+
if (NULL == ai->ch->mq)
return;
env = GNUNET_MQ_msg_extra (aam,
- strlen (ai->address) + 1,
- GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
- aam->expiration = GNUNET_TIME_relative_to_nbo (ai->expiration);
+ strlen (ai->address) + 1,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
+ aam->expiration = GNUNET_TIME_relative_hton (ai->expiration);
aam->nt = htonl ((uint32_t) ai->nt);
- memcpy (&aam[1],
- ai->address,
- strlen (ai->address) + 1);
- GNUNET_MQ_send (ai->ch->mq,
- env);
+ memcpy (&aam[1], ai->address, strlen (ai->address) + 1);
+ GNUNET_MQ_send (ai->ch->mq, env);
}
{
struct GNUNET_MQ_Envelope *env;
struct GNUNET_TRANSPORT_DelAddressMessage *dam;
-
+
if (NULL == ai->ch->mq)
return;
- env = GNUNET_MQ_msg (dam,
- GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
- dam.aid = htonl (ai->aid);
- GNUNET_MQ_send (ai->ch->mq,
- env);
+ env = GNUNET_MQ_msg (dam, GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
+ dam->aid = htonl (ai->aid);
+ GNUNET_MQ_send (ai->ch->mq, env);
}
{
struct GNUNET_MQ_Envelope *env;
struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
-
- if (NULL == ai->ch->mq)
+
+ if (NULL == qh->ch->mq)
return;
env = GNUNET_MQ_msg_extra (aqm,
- strlen (ai->address) + 1,
- GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE);
- aqm.receiver = qh->peer;
- aqm.nt = htonl ((uint32_t) qh->nt);
- aqm.qid = htonl (qh->qid);
- memcpy (&aqm[1],
- ai->address,
- strlen (ai->address) + 1);
- GNUNET_MQ_send (ai->ch->mq,
- env);
+ strlen (qh->address) + 1,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
+ aqm->qid = htonl (qh->queue_id);
+ aqm->receiver = qh->peer;
+ aqm->nt = htonl ((uint32_t) qh->nt);
+ aqm->mtu = htonl (qh->mtu);
+ aqm->cs = htonl ((uint32_t) qh->cs);
+ memcpy (&aqm[1], qh->address, strlen (qh->address) + 1);
+ GNUNET_MQ_send (qh->ch->mq, env);
}
{
struct GNUNET_MQ_Envelope *env;
struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
-
- if (NULL == ai->ch->mq)
+
+ if (NULL == qh->ch->mq)
return;
- env = GNUNET_MQ_msg (dqm,
- GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE);
- dqm.qid = htonl (qh->qid);
- dqm.receiver = qh->peer;
- GNUNET_MQ_send (ai->ch->mq,
- env);
+ env = GNUNET_MQ_msg (dqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
+ dqm->qid = htonl (qh->queue_id);
+ dqm->receiver = qh->peer;
+ GNUNET_MQ_send (qh->ch->mq, env);
}
{
struct FlowControl *fcn;
struct AckPending *apn;
-
- for (struct FlowControl *fc = ch->fc_head;
- NULL != fc;
- fc = fcn)
+
+ for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fcn)
{
fcn = fc->next;
- GNUNET_CONTAINER_DLL_remove (ch->fc_head,
- ch->fc_tail,
- fc);
- fc->cb (fc->cb_cls,
- GNUNET_SYSERR);
+ GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc);
+ fc->cb (fc->cb_cls, GNUNET_SYSERR);
GNUNET_free (fc);
}
- for (struct AckPending *ap = ch->ap_head;
- NULL != ap;
- ap = apn)
+ for (struct AckPending *ap = ch->ap_head; NULL != ap; ap = apn)
{
apn = ap->next;
- GNUNET_CONTAINER_DLL_remove (ch->ap_head,
- ch->ap_tail,
- ap);
+ GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
GNUNET_free (ap);
}
if (NULL == ch->mq)
* Function called on MQ errors.
*/
static void
-error_handler (void *cls,
- enum GNUNET_MQ_Error error)
+error_handler (void *cls, enum GNUNET_MQ_Error error)
{
struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "MQ failure, reconnecting to transport service.\n");
+ "MQ failure %d, reconnecting to transport service.\n",
+ error);
disconnect (ch);
/* TODO: maybe do this with exponential backoff/delay */
reconnect (ch);
* @param incoming_ack the ack
*/
static void
-handle_incoming_ack (void *cls,
- struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
+handle_incoming_ack (
+ void *cls,
+ const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
{
struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
-
- for (struct FlowControl *fc = ch->fc_head;
- NULL != fc;
- fc = fc->next)
+
+ for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fc->next)
{
- if ( (fc->id == incoming_ack->fc_id) &&
- (0 == memcmp (&fc->sender,
- incoming_ack->sender,
- sizeof (struct GNUNET_PeerIdentity))) )
+ if ((fc->id == incoming_ack->fc_id) &&
+ (0 == memcmp (&fc->sender,
+ &incoming_ack->sender,
+ sizeof(struct GNUNET_PeerIdentity))))
{
- GNUNET_CONTAINER_DLL_remove (ch->fc_head,
- ch->fc_tail,
- fc);
- fc->cb (fc->cb_cls,
- GNUNET_OK);
+ GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc);
+ fc->cb (fc->cb_cls, GNUNET_OK);
GNUNET_free (fc);
return;
}
* @return #GNUNET_OK if @a smt is well-formed
*/
static int
-check_create_queue (void *cls,
- struct GNUNET_TRANSPORT_CreateQueue *cq)
+check_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
{
- uint16_t len = ntohs (cq->header.size) - sizeof (*cq);
- const char *addr = (const char *) &cq[1];
-
- if ( (0 == len) ||
- ('\0' != addr[len-1]) )
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
+ (void) cls;
+ GNUNET_MQ_check_zero_termination (cq);
return GNUNET_OK;
}
* @param cq the queue creation request
*/
static void
-handle_create_queue (void *cls,
- struct GNUNET_TRANSPORT_CreateQueue *cq)
+handle_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
{
struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
const char *addr = (const char *) &cq[1];
+ struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
+ struct GNUNET_MQ_Envelope *env;
- if (GNUNET_OK !=
- ch->mq_init (ch->mq_init_cls,
- &cq->receiver,
- addr))
+ if (GNUNET_OK != ch->mq_init (ch->mq_init_cls, &cq->receiver, addr))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Address `%s' invalid for this communicator\n",
- addr);
- // TODO: do we notify the transport!?
+ "Address `%s' invalid for this communicator\n",
+ addr);
+ env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
}
+ else
+ {
+ env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
+ }
+ cqr->request_id = cq->request_id;
+ GNUNET_MQ_send (ch->mq, env);
}
* @return #GNUNET_OK if @a smt is well-formed
*/
static int
-check_send_msg (void *cls,
- struct GNUNET_TRANSPORT_SendMessageTo *smt)
+check_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
{
- uint16_t len = ntohs (smt->header.size) - sizeof (*smt);
- const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1];
-
- if (ntohs (mh->size) != len)
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
+ (void) cls;
+ GNUNET_MQ_check_boxed_message (smt);
return GNUNET_OK;
}
*/
static void
send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
- int status,
- const struct GNUNET_PeerIdentity *receiver,
- uint64_t mid)
+ int status,
+ const struct GNUNET_PeerIdentity *receiver,
+ uint64_t mid)
{
struct GNUNET_MQ_Envelope *env;
struct GNUNET_TRANSPORT_SendMessageToAck *ack;
- env = GNUNET_MQ_msg (ack,
- GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
- ack->status = htonl (GNUNET_OK);
- ack->mid = ap->mid;
- ack->receiver = ap->receiver;
- GNUNET_MQ_send (ch->mq,
- env);
+ env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
+ ack->status = htonl (status);
+ ack->mid = mid;
+ ack->receiver = *receiver;
+ GNUNET_MQ_send (ch->mq, env);
}
struct AckPending *ap = cls;
struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
- GNUNET_CONTAINER_DLL_remove (ch->ap_head,
- ch->ap_tail,
- ap);
- send_ack (ch,
- GNUNET_OK,
- &ap->receiver,
- ap->mid);
+ GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
+ send_ack (ch, GNUNET_OK, &ap->receiver, ap->mid);
GNUNET_free (ap);
}
* @param smt the transmission request
*/
static void
-handle_send_msg (void *cls,
- struct GNUNET_TRANSPORT_SendMessageTo *smt)
+handle_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
{
struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
const struct GNUNET_MessageHeader *mh;
struct GNUNET_MQ_Envelope *env;
struct AckPending *ap;
- struct QueueHandle *qh;
-
- for (qh = ch->queue_head;NULL != qh; qh = qh->next)
- if ( (qh->queue_id == smt->qid) &&
- (0 == memcmp (&qh->peer,
- &smt->target,
- sizeof (struct GNUNET_PeerIdentity))) )
- break;
+ struct GNUNET_TRANSPORT_QueueHandle *qh;
+
+ for (qh = ch->queue_head; NULL != qh; qh = qh->next)
+ if ((qh->queue_id == smt->qid) &&
+ (0 == memcmp (&qh->peer,
+ &smt->receiver,
+ sizeof(struct GNUNET_PeerIdentity))))
+ break;
if (NULL == qh)
{
/* queue is already gone, tell transport this one failed */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Transmission failed, queue no longer exists.\n");
- send_ack (ch,
- GNUNET_NO,
- &smt->receiver,
- smt->mid);
+ "Transmission failed, queue no longer exists.\n");
+ send_ack (ch, GNUNET_NO, &smt->receiver, smt->mid);
return;
}
ap = GNUNET_new (struct AckPending);
ap->ch = ch;
ap->receiver = smt->receiver;
ap->mid = smt->mid;
- GNUNET_CONTAINER_DLL_insert (ch->ap_head,
- cp->ap_tail,
- ap);
+ GNUNET_CONTAINER_DLL_insert (ch->ap_head, ch->ap_tail, ap);
mh = (const struct GNUNET_MessageHeader *) &smt[1];
env = GNUNET_MQ_msg_copy (mh);
- GNUNET_MQ_notify_sent (env,
- &send_ack_cb,
- ap);
- GNUNET_MQ_send (qh->mq,
- env);
+ GNUNET_MQ_notify_sent (env, &send_ack_cb, ap);
+ GNUNET_MQ_send (qh->mq, env);
+}
+
+
+/**
+ * Transport service gives us backchannel message. Check if @a bi
+ * is well-formed.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
+ * @param bi the backchannel message
+ * @return #GNUNET_OK if @a smt is well-formed
+ */
+static int
+check_backchannel_incoming (
+ void *cls,
+ const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
+{
+ (void) cls;
+ GNUNET_MQ_check_boxed_message (bi);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Transport service gives us backchannel message. Handle it.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
+ * @param bi the backchannel message
+ */
+static void
+handle_backchannel_incoming (
+ void *cls,
+ const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
+{
+ struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
+
+ if (NULL != ch->notify_cb)
+ ch->notify_cb (ch->notify_cb_cls,
+ &bi->pid,
+ (const struct GNUNET_MessageHeader *) &bi[1]);
+ else
+ GNUNET_log (
+ GNUNET_ERROR_TYPE_INFO,
+ _ ("Dropped backchanel message: handler not provided by communicator\n"));
}
static void
reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
{
- struct GNUNET_MQ_MessageHandler handlers[] = {
- GNUNET_MQ_hd_fixed_size (incoming_ack,
- GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
- struct GNUNET_TRANSPORT_IncomingMessageAck,
- ch),
+ struct GNUNET_MQ_MessageHandler handlers[] =
+ { GNUNET_MQ_hd_fixed_size (incoming_ack,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
+ struct GNUNET_TRANSPORT_IncomingMessageAck,
+ ch),
GNUNET_MQ_hd_var_size (create_queue,
- GNUNET_MESSAGE_TYPE_TRANSPORT_CREATE_QUEUE,
- struct GNUNET_TRANSPORT_CreateQueue,
- ch),
+ GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
+ struct GNUNET_TRANSPORT_CreateQueue,
+ ch),
GNUNET_MQ_hd_var_size (send_msg,
- GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
- struct GNUNET_TRANSPORT_SendMessageTo,
- ch),
- GNUNET_MQ_handler_end()
- };
-
- ch->mq = GNUNET_CLIENT_connect (cfg,
- "transport",
- handlers,
- &error_handler,
- ch);
- for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
- NULL != ai;
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
+ struct GNUNET_TRANSPORT_SendMessageTo,
+ ch),
+ GNUNET_MQ_hd_var_size (
+ backchannel_incoming,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING,
+ struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming,
+ ch),
+ GNUNET_MQ_handler_end () };
+ struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam;
+ struct GNUNET_MQ_Envelope *env;
+
+ ch->mq =
+ GNUNET_CLIENT_connect (ch->cfg, "transport", handlers, &error_handler, ch);
+ if (NULL == ch->mq)
+ return;
+ env = GNUNET_MQ_msg_extra (cam,
+ strlen (ch->addr_prefix) + 1,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR);
+ cam->cc = htonl ((uint32_t) ch->cc);
+ memcpy (&cam[1], ch->addr_prefix, strlen (ch->addr_prefix) + 1);
+ GNUNET_MQ_send (ch->mq, env);
+ for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; NULL != ai;
ai = ai->next)
send_add_address (ai);
- for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head;
- NULL != qh;
+ for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head; NULL != qh;
qh = qh->next)
send_add_queue (qh);
}
* Connect to the transport service.
*
* @param cfg configuration to use
- * @param name name of the communicator that is connecting
+ * @param config_section section of the configuration to use for options
+ * @param addr_prefix address prefix for addresses supported by this
+ * communicator, could be NULL for incoming-only communicators
+ * @param cc what characteristics does the communicator have?
* @param mtu maximum message size supported by communicator, 0 if
* sending is not supported, SIZE_MAX for no MTU
* @param mq_init function to call to initialize a message queue given
* the address of another peer, can be NULL if the
* communicator only supports receiving messages
* @param mq_init_cls closure for @a mq_init
+ * @param notify_cb function to pass backchannel messages to communicator
+ * @param notify_cb_cls closure for @a notify_cb
* @return NULL on error
*/
struct GNUNET_TRANSPORT_CommunicatorHandle *
-GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
- const char *name,
- size_t mtu,
- GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
- void *mq_init_cls)
+GNUNET_TRANSPORT_communicator_connect (
+ const struct GNUNET_CONFIGURATION_Handle *cfg,
+ const char *config_section,
+ const char *addr_prefix,
+ enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc,
+ GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
+ void *mq_init_cls,
+ GNUNET_TRANSPORT_CommunicatorNotify notify_cb,
+ void *notify_cb_cls)
{
struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
-
+
ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
ch->cfg = cfg;
- ch->name = name;
- ch->mtu = mtu;
+ ch->config_section = config_section;
+ ch->addr_prefix = addr_prefix;
ch->mq_init = mq_init;
ch->mq_init_cls = mq_init_cls;
+ ch->notify_cb = notify_cb;
+ ch->notify_cb_cls = notify_cb_cls;
+ ch->cc = cc;
reconnect (ch);
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (cfg,
- name,
- "MAX_QUEUE_LENGTH",
- &ch->max_queue_length))
+ config_section,
+ "MAX_QUEUE_LENGTH",
+ &ch->max_queue_length))
ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
if (NULL == ch->mq)
{
* @param ch handle returned from connect
*/
void
-GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
+GNUNET_TRANSPORT_communicator_disconnect (
+ struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
{
disconnect (ch);
while (NULL != ch->ai_head)
{
- GNUNET_break (0); /* communicator forgot to remove address, warn! */
+ GNUNET_break (0); /* communicator forgot to remove address, warn! */
GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
}
GNUNET_free (ch);
* @param sender presumed sender of the message (details to be checked
* by higher layers)
* @param msg the message
+ * @param expected_addr_validity how long does the communicator believe it
+ * will continue to be able to receive messages from the same address
+ * on which it received this message?
* @param cb function to call once handling the message is done, NULL if
* flow control is not supported by this communicator
* @param cb_cls closure for @a cb
* the tranport service is not yet up
*/
int
-GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
- const struct GNUNET_PeerIdentity *sender,
- const struct GNUNET_MessageHeader *msg,
- GNUNET_TRANSPORT_MessageCompletedCallback cb,
- void *cb_cls)
+GNUNET_TRANSPORT_communicator_receive (
+ struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *msg,
+ struct GNUNET_TIME_Relative expected_addr_validity,
+ GNUNET_TRANSPORT_MessageCompletedCallback cb,
+ void *cb_cls)
{
struct GNUNET_MQ_Envelope *env;
struct GNUNET_TRANSPORT_IncomingMessage *im;
uint16_t msize;
-
- if (NULL == ai->ch->mq)
+
+ if (NULL == ch->mq)
return GNUNET_SYSERR;
+ if ((NULL == cb) && (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length))
+ {
+ GNUNET_log (
+ GNUNET_ERROR_TYPE_WARNING,
+ "Dropping message: transprot is too slow, queue length %llu exceeded\n",
+ ch->max_queue_length);
+ return GNUNET_NO;
+ }
+
+ msize = ntohs (msg->size);
+ env =
+ GNUNET_MQ_msg_extra (im, msize, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
+ if (NULL == env)
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ im->expected_address_validity =
+ GNUNET_TIME_relative_hton (expected_addr_validity);
+ im->sender = *sender;
+ memcpy (&im[1], msg, msize);
if (NULL != cb)
{
struct FlowControl *fc;
im->fc_on = htonl (GNUNET_YES);
- im->fc_id = ai->ch->fc_gen++;
+ im->fc_id = ch->fc_gen++;
fc = GNUNET_new (struct FlowControl);
fc->sender = *sender;
fc->id = im->fc_id;
fc->cb = cb;
fc->cb_cls = cb_cls;
- GNUNET_CONTAINER_DLL_insert (ch->fc_head,
- ch->fc_tail,
- fc);
- }
- else
- {
- if (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Dropping message: transprot is too slow, queue length %u exceeded\n",
- ch->max_queue_length);
- return GNUNET_NO;
- }
- }
-
- msize = ntohs (msg->size);
- env = GNUNET_MQ_msg_extra (im,
- msize,
- GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
- if (NULL == env)
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
+ GNUNET_CONTAINER_DLL_insert (ch->fc_head, ch->fc_tail, fc);
}
- im->sender = *sender;
- memcpy (&im[1],
- msg,
- msize);
- GNUNET_MQ_send (ai->ch->mq,
- env);
+ GNUNET_MQ_send (ch->mq, env);
return GNUNET_OK;
}
* @param ch connection to transport service
* @param peer peer with which we can now communicate
* @param address address in human-readable format, 0-terminated, UTF-8
+ * @param mtu maximum message size supported by queue, 0 if
+ * sending is not supported, SIZE_MAX for no MTU
* @param nt which network type does the @a address belong to?
+ * @param cc what characteristics does the communicator have?
+ * @param cs what is the connection status of the queue?
* @param mq message queue of the @a peer
* @return API handle identifying the new MQ
*/
struct GNUNET_TRANSPORT_QueueHandle *
-GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
- const struct GNUNET_PeerIdentity *peer,
- const char *address,
- enum GNUNET_ATS_Network_Type nt,
- struct GNUNET_MQ_Handle *mq)
+GNUNET_TRANSPORT_communicator_mq_add (
+ struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *address,
+ uint32_t mtu,
+ enum GNUNET_NetworkType nt,
+ enum GNUNET_TRANSPORT_ConnectionStatus cs,
+ struct GNUNET_MQ_Handle *mq)
{
struct GNUNET_TRANSPORT_QueueHandle *qh;
qh->peer = *peer;
qh->address = GNUNET_strdup (address);
qh->nt = nt;
+ qh->mtu = mtu;
+ qh->cs = cs;
qh->mq = mq;
qh->queue_id = ch->queue_gen++;
- GNUNET_CONTAINER_DLL_insert (ch->queue_head,
- ch->queue_tail,
- qh);
+ GNUNET_CONTAINER_DLL_insert (ch->queue_head, ch->queue_tail, qh);
send_add_queue (qh);
return qh;
}
GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
{
struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
-
+
send_del_queue (qh);
- GNUNET_CONTAINER_DLL_remove (ch->queue_head,
- ch->queue_tail,
- qh);
+ GNUNET_CONTAINER_DLL_remove (ch->queue_head, ch->queue_tail, qh);
GNUNET_MQ_destroy (qh->mq);
GNUNET_free (qh->address);
GNUNET_free (qh);
* @param expiration when does the communicator forsee this address expiring?
*/
struct GNUNET_TRANSPORT_AddressIdentifier *
-GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
- const char *address,
- enum GNUNET_ATS_Network_Type nt,
- struct GNUNET_TIME_Relative expiration)
+GNUNET_TRANSPORT_communicator_address_add (
+ struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
+ const char *address,
+ enum GNUNET_NetworkType nt,
+ struct GNUNET_TIME_Relative expiration)
{
struct GNUNET_TRANSPORT_AddressIdentifier *ai;
ai->address = GNUNET_strdup (address);
ai->nt = nt;
ai->expiration = expiration;
- ai->aid = handle->aid_gen++;
- GNUNET_CONTAINER_DLL_insert (handle->ai_head,
- handle->ai_tail,
- ai);
+ ai->aid = ch->aid_gen++;
+ GNUNET_CONTAINER_DLL_insert (ch->ai_head, ch->ai_tail, ai);
send_add_address (ai);
return ai;
}
* @param ai address that is no longer provided
*/
void
-GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
+GNUNET_TRANSPORT_communicator_address_remove (
+ struct GNUNET_TRANSPORT_AddressIdentifier *ai)
{
struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
send_del_address (ai);
- GNUNET_CONTAINER_DLL_remove (ch->ai_head,
- ch->ai_tail,
- ai);
+ GNUNET_CONTAINER_DLL_remove (ch->ai_head, ch->ai_tail, ai);
GNUNET_free (ai->address);
GNUNET_free (ai);
}
+/* ************************* Backchannel *************************** */
+
+
+/**
+ * The communicator asks the transport service to route a message via
+ * a different path to another communicator service at another peer.
+ * This must only be done for special control traffic (as there is no
+ * flow control for this API), such as acknowledgements, and generally
+ * only be done if the communicator is uni-directional (i.e. cannot
+ * send the message back itself).
+ *
+ * @param ch handle of this communicator
+ * @param pid peer to send the message to
+ * @param comm name of the communicator to send the message to
+ * @param header header of the message to transmit and pass via the
+ * notify-API to @a pid's communicator @a comm
+ */
+void
+GNUNET_TRANSPORT_communicator_notify (
+ struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
+ const struct GNUNET_PeerIdentity *pid,
+ const char *comm,
+ const struct GNUNET_MessageHeader *header)
+{
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb;
+ size_t slen = strlen (comm) + 1;
+ uint16_t mlen = ntohs (header->size);
+
+ GNUNET_assert (mlen + slen + sizeof(*cb) < UINT16_MAX);
+ env =
+ GNUNET_MQ_msg_extra (cb,
+ slen + mlen,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL);
+ cb->pid = *pid;
+ memcpy (&cb[1], header, mlen);
+ memcpy (((char *) &cb[1]) + mlen, comm, slen);
+ GNUNET_MQ_send (ch->mq, env);
+}
+
+
/* end of transport_api2_communication.c */