*/
void *ctx;
- /**
- * Size of packet queued in this channel
- */
- unsigned int packet_size;
-
/**
* Channel options: reliability, etc.
*/
/**
* Are we allowed to send to the service?
*/
- int allow_send;
+ unsigned int allow_send;
};
{
ch->ccn = ccn;
}
- ch->allow_send = GNUNET_NO;
return ch;
}
*/
// FIXME: simplify: call_cleaner is always #GNUNET_YES!!!
static void
-destroy_channel (struct GNUNET_CADET_Channel *ch, int call_cleaner)
+destroy_channel (struct GNUNET_CADET_Channel *ch,
+ int call_cleaner)
{
struct GNUNET_CADET_Handle *h;
struct GNUNET_CADET_TransmitHandle *th;
struct GNUNET_CADET_TransmitHandle *next;
- LOG (GNUNET_ERROR_TYPE_DEBUG, " destroy_channel %X\n", ch->ccn);
-
if (NULL == ch)
{
GNUNET_break (0);
return;
}
h = ch->cadet;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ " destroy_channel %X of %p\n",
+ ch->ccn,
+ h);
GNUNET_CONTAINER_DLL_remove (h->channels_head,
h->channels_tail,
ch);
/* signal channel destruction */
- if ( (NULL != h->cleaner) && (0 != ch->peer) && (GNUNET_YES == call_cleaner) )
+ if ( (NULL != h->cleaner) &&
+ (0 != ch->peer) &&
+ (GNUNET_YES == call_cleaner) )
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " calling cleaner\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ " calling cleaner\n");
h->cleaner (h->cls, ch, ch->ctx);
}
if (0 != ch->peer)
GNUNET_PEER_change_rc (ch->peer, -1);
GNUNET_free (ch);
-
}
add_to_queue (struct GNUNET_CADET_Handle *h,
struct GNUNET_CADET_TransmitHandle *th)
{
- GNUNET_CONTAINER_DLL_insert_tail (h->th_head, h->th_tail, th);
+ GNUNET_CONTAINER_DLL_insert_tail (h->th_head,
+ h->th_tail,
+ th);
}
}
-/**
- * Send an ack on the channel to confirm the processing of a message.
- *
- * @param ch Channel on which to send the ACK.
- */
-static void
-send_ack (struct GNUNET_CADET_Channel *ch)
-{
- struct GNUNET_CADET_LocalAck *msg;
- struct GNUNET_MQ_Envelope *env;
-
- env = GNUNET_MQ_msg (msg,
- GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Sending ACK on channel %X\n",
- ch->ccn.channel_of_client);
- msg->ccn = ch->ccn;
- GNUNET_MQ_send (ch->cadet->mq,
- env);
-}
-
-
/******************************************************************************/
/*********************** RECEIVE HANDLERS ****************************/
struct GNUNET_MQ_Envelope *env;
size_t osize;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting Data: %u bytes\n", th->size);
-
- GNUNET_assert (GNUNET_YES == th->channel->allow_send);
- th->channel->allow_send = GNUNET_NO;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Requesting Data: %u bytes (allow send is %u)\n",
+ th->size,
+ th->channel->allow_send);
+
+ GNUNET_assert (0 < th->channel->allow_send);
+ th->channel->allow_send--;
+ /* NOTE: we may be allowed to send another packet immediately,
+ albeit the current logic waits for the ACK. */
th->request_data_task = NULL;
- th->channel->packet_size = 0;
remove_from_queue (th);
env = GNUNET_MQ_msg_extra (msg,
th->size,
&msg[1]);
GNUNET_assert (osize == th->size);
+
GNUNET_MQ_send (th->channel->cadet->mq,
env);
GNUNET_free (th);
ccn = msg->ccn;
port_number = &msg->port;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Creating incoming channel %X [%s]\n",
- ntohl (ccn.channel_of_client),
- GNUNET_h2s (port_number));
if (ntohl (ccn.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
{
GNUNET_break (0);
return;
}
port = find_port (h, port_number);
- if (NULL != port)
- {
- void *ctx;
-
- ch = create_channel (h, ccn);
- ch->allow_send = GNUNET_NO;
- ch->peer = GNUNET_PEER_intern (&msg->peer);
- ch->cadet = h;
- ch->ccn = ccn;
- ch->port = port;
- ch->options = ntohl (msg->opt);
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, " created channel %p\n", ch);
- ctx = port->handler (port->cls, ch, &msg->peer, port->hash, ch->options);
- if (NULL != ctx)
- ch->ctx = ctx;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "User notified\n");
- }
- else
+ if (NULL == port)
{
struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg;
struct GNUNET_MQ_Envelope *env;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "No handler for incoming channels\n");
+ GNUNET_break (0);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "No handler for incoming channel %X [%s]\n",
+ ntohl (ccn.channel_of_client),
+ GNUNET_h2s (port_number));
+ /* FIXME: should disconnect instead, this is a serious error! */
env = GNUNET_MQ_msg (d_msg,
GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
d_msg->ccn = msg->ccn;
- GNUNET_MQ_send (h->mq, env);
+ GNUNET_MQ_send (h->mq,
+ env);
+ return;
}
- return;
+
+ ch = create_channel (h,
+ ccn);
+ ch->peer = GNUNET_PEER_intern (&msg->peer);
+ ch->cadet = h;
+ ch->ccn = ccn;
+ ch->port = port;
+ ch->options = ntohl (msg->opt);
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating incoming channel %X [%s] %p\n",
+ ntohl (ccn.channel_of_client),
+ GNUNET_h2s (port_number),
+ ch);
+ ch->ctx = port->handler (port->cls,
+ ch,
+ &msg->peer,
+ port->hash,
+ ch->options);
}
const struct GNUNET_MessageHeader *payload;
const struct GNUNET_CADET_MessageHandler *handler;
struct GNUNET_CADET_Channel *ch;
- unsigned int i;
uint16_t type;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Got a data message!\n");
ch = retrieve_channel (h,
message->ccn);
GNUNET_assert (NULL != ch);
payload = (struct GNUNET_MessageHeader *) &message[1];
- LOG (GNUNET_ERROR_TYPE_DEBUG, " %s data on channel %s [%X]\n",
+ type = ntohs (payload->type);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got a %s data on channel %s [%X] of type %s (%u)\n",
GC_f2s (ntohl (ch->ccn.channel_of_client) >=
GNUNET_CADET_LOCAL_CHANNEL_ID_CLI),
GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)),
- ntohl (message->ccn.channel_of_client));
-
- type = ntohs (payload->type);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " payload type %s\n", GC_m2s (type));
- for (i = 0; i < h->n_handlers; i++)
+ ntohl (message->ccn.channel_of_client),
+ GC_m2s (type),
+ type);
+ for (unsigned i=0;i<h->n_handlers;i++)
{
handler = &h->message_handlers[i];
- LOG (GNUNET_ERROR_TYPE_DEBUG, " checking handler for type %u\n",
- handler->type);
if (handler->type == type)
{
if (GNUNET_OK !=
- handler->callback (h->cls, ch, &ch->ctx, payload))
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "callback caused disconnection\n");
- GNUNET_CADET_channel_destroy (ch);
- break;
- }
- else
+ handler->callback (h->cls,
+ ch,
+ &ch->ctx,
+ payload))
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "callback completed successfully\n");
- break;
+ "callback caused disconnection\n");
+ GNUNET_CADET_channel_destroy (ch);
+ return;
}
+ return;
}
}
+ /* Other peer sent message we do not comprehend. */
+ GNUNET_break_op (0);
+ GNUNET_CADET_receive_done (ch);
}
struct GNUNET_CADET_Handle *h = cls;
struct GNUNET_CADET_Channel *ch;
struct GNUNET_CADET_ClientChannelNumber ccn;
+ struct GNUNET_CADET_TransmitHandle *th;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK!\n");
ccn = message->ccn;
ch = retrieve_channel (h, ccn);
if (NULL == ch)
ntohl (ccn.channel_of_client));
return;
}
+ ch->allow_send++;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- " on channel %X!\n",
- ntohl (ch->ccn.channel_of_client));
- ch->allow_send = GNUNET_YES;
- if (0 < ch->packet_size)
+ "Got an ACK on channel %X, allow send now %u!\n",
+ ntohl (ch->ccn.channel_of_client),
+ ch->allow_send);
+ for (th = h->th_head; NULL != th; th = th->next)
{
- struct GNUNET_CADET_TransmitHandle *th;
- struct GNUNET_CADET_TransmitHandle *next;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- " pending data, sending %u bytes!\n",
- ch->packet_size);
- for (th = h->th_head; NULL != th; th = next)
+ if ( (th->channel == ch) &&
+ (NULL == th->request_data_task) )
{
- next = th->next;
- if (th->channel == ch)
- {
- GNUNET_assert (NULL == th->request_data_task);
- th->request_data_task = GNUNET_SCHEDULER_add_now (&request_data, th);
- break;
- }
+ th->request_data_task
+ = GNUNET_SCHEDULER_add_now (&request_data,
+ th);
+ break;
}
- /* Complain if we got thru all th without sending anything, ch was wrong */
- GNUNET_break (NULL != th);
}
}
* original state.
*
* @param h handle to the cadet
- *
- * @return GNUNET_YES in case of sucess, GNUNET_NO otherwise (service down...)
+ * @return #GNUNET_YES in case of success, #GNUNET_NO otherwise (service down...)
*/
static int
do_reconnect (struct GNUNET_CADET_Handle *h)
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Requested RECONNECT, destroying all channels\n");
- for (ch = h->channels_head; NULL != ch; ch = h->channels_head)
+ while (NULL != (ch = h->channels_head))
destroy_channel (ch, GNUNET_YES);
if (NULL == h->reconnect_task)
h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
{
struct GNUNET_CADET_Handle *h;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_CADET_connect()\n");
h = GNUNET_new (struct GNUNET_CADET_Handle);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " addr %p\n", h);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "GNUNET_CADET_connect() %p\n",
+ h);
h->cfg = cfg;
h->cleaner = cleaner;
h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
for (h->n_handlers = 0;
handlers && handlers[h->n_handlers].type;
h->n_handlers++) ;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_CADET_connect() END\n");
return h;
}
+/**
+ * Disconnect from the cadet service. All channels will be destroyed. All channel
+ * disconnect callbacks will be called on any still connected peers, notifying
+ * about their disconnection. The registered inbound channel cleaner will be
+ * called should any inbound channels still exist.
+ *
+ * @param handle connection to cadet to disconnect
+ */
void
GNUNET_CADET_disconnect (struct GNUNET_CADET_Handle *handle)
{
* @param port Hash representing the port number.
* @param new_channel Function called when an channel is received.
* @param new_channel_cls Closure for @a new_channel.
- *
* @return Port handle.
*/
struct GNUNET_CADET_Port *
struct GNUNET_CADET_Channel *ch;
struct GNUNET_CADET_ClientChannelNumber ccn;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Creating new channel to %s:%u\n",
- GNUNET_i2s (peer), port);
ccn.channel_of_client = htonl (0);
ch = create_channel (h, ccn);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " at %p\n", ch);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " number %X\n",
- ntohl (ch->ccn.channel_of_client));
ch->ctx = channel_ctx;
ch->peer = GNUNET_PEER_intern (peer);
- env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating new channel to %s:%u at %p number %X\n",
+ GNUNET_i2s (peer),
+ port,
+ ch,
+ ntohl (ch->ccn.channel_of_client));
+ env = GNUNET_MQ_msg (msg,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
msg->ccn = ch->ccn;
msg->port = *port;
msg->peer = *peer;
msg->opt = htonl (options);
- ch->allow_send = GNUNET_NO;
GNUNET_MQ_send (h->mq,
env);
return ch;
struct GNUNET_CADET_TransmitHandle *th;
struct GNUNET_CADET_TransmitHandle *next;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Destroying channel\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Destroying channel\n");
h = channel->cadet;
for (th = h->th_head; th != NULL; th = next)
{
}
else
{
- LOG (GNUNET_ERROR_TYPE_WARNING, "no meta-traffic should be queued\n");
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "no meta-traffic should be queued\n");
}
- GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
+ GNUNET_CONTAINER_DLL_remove (h->th_head,
+ h->th_tail,
+ th);
GNUNET_CADET_notify_transmit_ready_cancel (th);
}
}
GNUNET_MQ_send (h->mq,
env);
- destroy_channel (channel, GNUNET_YES);
+ destroy_channel (channel,
+ GNUNET_YES);
}
struct GNUNET_CADET_TransmitHandle *th;
GNUNET_assert (NULL != channel);
- GNUNET_assert (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE >= notify_size);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY\n");
- LOG (GNUNET_ERROR_TYPE_DEBUG, " on channel %X\n", channel->ccn);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " allow_send %d\n", channel->allow_send);
- if (ntohl (channel->ccn.channel_of_client) >=
- GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
- LOG (GNUNET_ERROR_TYPE_DEBUG, " to origin\n");
- else
- LOG (GNUNET_ERROR_TYPE_DEBUG, " to destination\n");
- LOG (GNUNET_ERROR_TYPE_DEBUG, " payload size %u\n", notify_size);
GNUNET_assert (NULL != notify);
- GNUNET_assert (0 == channel->packet_size); // Only one data packet allowed
-
+ GNUNET_assert (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE >= notify_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "CADET NOTIFY TRANSMIT READY on channel %X allow_send is %u to %s with %u bytes\n",
+ ntohl (channel->ccn.channel_of_client),
+ channel->allow_send,
+ (ntohl (channel->ccn.channel_of_client) >=
+ GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
+ ? "origin"
+ : "destination",
+ (unsigned int) notify_size);
if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us != maxdelay.rel_value_us)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
th = GNUNET_new (struct GNUNET_CADET_TransmitHandle);
th->channel = channel;
th->size = notify_size;
- channel->packet_size = th->size;
- LOG (GNUNET_ERROR_TYPE_DEBUG, " total size %u\n", th->size);
th->notify = notify;
th->notify_cls = notify_cls;
- if (GNUNET_YES == channel->allow_send)
- th->request_data_task = GNUNET_SCHEDULER_add_now (&request_data, th);
+ if (0 != channel->allow_send)
+ th->request_data_task
+ = GNUNET_SCHEDULER_add_now (&request_data,
+ th);
else
- add_to_queue (channel->cadet, th);
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY END\n");
+ add_to_queue (channel->cadet,
+ th);
return th;
}
if (NULL != th->request_data_task)
{
GNUNET_SCHEDULER_cancel (th->request_data_task);
+ th->request_data_task = NULL;
}
- th->request_data_task = NULL;
-
remove_from_queue (th);
GNUNET_free (th);
}
+/**
+ * Send an ack on the channel to confirm the processing of a message.
+ *
+ * @param ch Channel on which to send the ACK.
+ */
void
GNUNET_CADET_receive_done (struct GNUNET_CADET_Channel *channel)
{
- send_ack (channel);
+ struct GNUNET_CADET_LocalAck *msg;
+ struct GNUNET_MQ_Envelope *env;
+
+ env = GNUNET_MQ_msg (msg,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending ACK on channel %X\n",
+ ntohl (channel->ccn.channel_of_client));
+ msg->ccn = channel->ccn;
+ GNUNET_MQ_send (channel->cadet->mq,
+ env);
}