if (0 != ch->peer)
GNUNET_PEER_change_rc (ch->peer, -1);
GNUNET_free (ch);
-
}
add_to_queue (struct GNUNET_CADET_Handle *h,
struct GNUNET_CADET_TransmitHandle *th)
{
- GNUNET_CONTAINER_DLL_insert_tail (h->th_head, h->th_tail, th);
+ GNUNET_CONTAINER_DLL_insert_tail (h->th_head,
+ h->th_tail,
+ th);
}
}
-/**
- * Send an ack on the channel to confirm the processing of a message.
- *
- * @param ch Channel on which to send the ACK.
- */
-static void
-send_ack (struct GNUNET_CADET_Channel *ch)
-{
- struct GNUNET_CADET_LocalAck *msg;
- struct GNUNET_MQ_Envelope *env;
-
- env = GNUNET_MQ_msg (msg,
- GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Sending ACK on channel %X\n",
- ch->ccn.channel_of_client);
- msg->ccn = ch->ccn;
- GNUNET_MQ_send (ch->cadet->mq,
- env);
-}
-
-
/******************************************************************************/
/*********************** RECEIVE HANDLERS ****************************/
size_t osize;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Requesting Data: %u bytes\n",
- th->size);
+ "Requesting Data: %u bytes (allow send is %u)\n",
+ th->size,
+ th->channel->allow_send);
GNUNET_assert (0 < th->channel->allow_send);
th->channel->allow_send--;
th->size,
&msg[1]);
GNUNET_assert (osize == th->size);
+
GNUNET_MQ_send (th->channel->cadet->mq,
env);
GNUNET_free (th);
ccn = msg->ccn;
port_number = &msg->port;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Creating incoming channel %X [%s]\n",
- ntohl (ccn.channel_of_client),
- GNUNET_h2s (port_number));
if (ntohl (ccn.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
{
GNUNET_break (0);
return;
}
port = find_port (h, port_number);
- if (NULL != port)
- {
- void *ctx;
-
- ch = create_channel (h, ccn);
- ch->peer = GNUNET_PEER_intern (&msg->peer);
- ch->cadet = h;
- ch->ccn = ccn;
- ch->port = port;
- ch->options = ntohl (msg->opt);
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, " created channel %p\n", ch);
- ctx = port->handler (port->cls, ch, &msg->peer, port->hash, ch->options);
- if (NULL != ctx)
- ch->ctx = ctx;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "User notified\n");
- }
- else
+ if (NULL == port)
{
struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg;
struct GNUNET_MQ_Envelope *env;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "No handler for incoming channels\n");
+ GNUNET_break (0);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "No handler for incoming channel %X [%s]\n",
+ ntohl (ccn.channel_of_client),
+ GNUNET_h2s (port_number));
+ /* FIXME: should disconnect instead, this is a serious error! */
env = GNUNET_MQ_msg (d_msg,
GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
d_msg->ccn = msg->ccn;
- GNUNET_MQ_send (h->mq, env);
+ GNUNET_MQ_send (h->mq,
+ env);
+ return;
}
- return;
+
+ ch = create_channel (h,
+ ccn);
+ ch->peer = GNUNET_PEER_intern (&msg->peer);
+ ch->cadet = h;
+ ch->ccn = ccn;
+ ch->port = port;
+ ch->options = ntohl (msg->opt);
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating incoming channel %X [%s] %p\n",
+ ntohl (ccn.channel_of_client),
+ GNUNET_h2s (port_number),
+ ch);
+ ch->ctx = port->handler (port->cls,
+ ch,
+ &msg->peer,
+ port->hash,
+ ch->options);
}
const struct GNUNET_MessageHeader *payload;
const struct GNUNET_CADET_MessageHandler *handler;
struct GNUNET_CADET_Channel *ch;
- unsigned int i;
uint16_t type;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Got a data message!\n");
ch = retrieve_channel (h,
message->ccn);
GNUNET_assert (NULL != ch);
payload = (struct GNUNET_MessageHeader *) &message[1];
- LOG (GNUNET_ERROR_TYPE_DEBUG, " %s data on channel %s [%X]\n",
+ type = ntohs (payload->type);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got a %s data on channel %s [%X] of type %s (%u)\n",
GC_f2s (ntohl (ch->ccn.channel_of_client) >=
GNUNET_CADET_LOCAL_CHANNEL_ID_CLI),
GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)),
- ntohl (message->ccn.channel_of_client));
-
- type = ntohs (payload->type);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " payload type %s\n", GC_m2s (type));
- for (i = 0; i < h->n_handlers; i++)
+ ntohl (message->ccn.channel_of_client),
+ GC_m2s (type),
+ type);
+ for (unsigned i=0;i<h->n_handlers;i++)
{
handler = &h->message_handlers[i];
- LOG (GNUNET_ERROR_TYPE_DEBUG, " checking handler for type %u\n",
- handler->type);
if (handler->type == type)
{
if (GNUNET_OK !=
- handler->callback (h->cls, ch, &ch->ctx, payload))
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "callback caused disconnection\n");
- GNUNET_CADET_channel_destroy (ch);
- break;
- }
- else
+ handler->callback (h->cls,
+ ch,
+ &ch->ctx,
+ payload))
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "callback completed successfully\n");
- break;
+ "callback caused disconnection\n");
+ GNUNET_CADET_channel_destroy (ch);
+ return;
}
+ return;
}
}
+ /* Other peer sent message we do not comprehend. */
+ GNUNET_break_op (0);
+ GNUNET_CADET_receive_done (ch);
}
ntohl (ccn.channel_of_client));
return;
}
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Got an ACK on channel %X!\n",
- ntohl (ch->ccn.channel_of_client));
ch->allow_send++;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Checking for pending data\n");
+ "Got an ACK on channel %X, allow send now %u!\n",
+ ntohl (ch->ccn.channel_of_client),
+ ch->allow_send);
for (th = h->th_head; NULL != th; th = th->next)
{
if ( (th->channel == ch) &&
* original state.
*
* @param h handle to the cadet
- *
- * @return GNUNET_YES in case of sucess, GNUNET_NO otherwise (service down...)
+ * @return #GNUNET_YES in case of success, #GNUNET_NO otherwise (service down...)
*/
static int
do_reconnect (struct GNUNET_CADET_Handle *h)
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Requested RECONNECT, destroying all channels\n");
- for (ch = h->channels_head; NULL != ch; ch = h->channels_head)
+ while (NULL != (ch = h->channels_head))
destroy_channel (ch, GNUNET_YES);
if (NULL == h->reconnect_task)
h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
{
struct GNUNET_CADET_Handle *h;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_CADET_connect()\n");
h = GNUNET_new (struct GNUNET_CADET_Handle);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " addr %p\n", h);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "GNUNET_CADET_connect() %p\n",
+ h);
h->cfg = cfg;
h->cleaner = cleaner;
h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
for (h->n_handlers = 0;
handlers && handlers[h->n_handlers].type;
h->n_handlers++) ;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_CADET_connect() END\n");
return h;
}
* @param port Hash representing the port number.
* @param new_channel Function called when an channel is received.
* @param new_channel_cls Closure for @a new_channel.
- *
* @return Port handle.
*/
struct GNUNET_CADET_Port *
struct GNUNET_CADET_Channel *ch;
struct GNUNET_CADET_ClientChannelNumber ccn;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Creating new channel to %s:%u\n",
- GNUNET_i2s (peer), port);
ccn.channel_of_client = htonl (0);
ch = create_channel (h, ccn);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " at %p\n", ch);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " number %X\n",
- ntohl (ch->ccn.channel_of_client));
ch->ctx = channel_ctx;
ch->peer = GNUNET_PEER_intern (peer);
- env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating new channel to %s:%u at %p number %X\n",
+ GNUNET_i2s (peer),
+ port,
+ ch,
+ ntohl (ch->ccn.channel_of_client));
+ env = GNUNET_MQ_msg (msg,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
msg->ccn = ch->ccn;
msg->port = *port;
msg->peer = *peer;
struct GNUNET_CADET_TransmitHandle *th;
struct GNUNET_CADET_TransmitHandle *next;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Destroying channel\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Destroying channel\n");
h = channel->cadet;
for (th = h->th_head; th != NULL; th = next)
{
}
else
{
- LOG (GNUNET_ERROR_TYPE_WARNING, "no meta-traffic should be queued\n");
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "no meta-traffic should be queued\n");
}
GNUNET_CONTAINER_DLL_remove (h->th_head,
h->th_tail,
struct GNUNET_CADET_TransmitHandle *th;
GNUNET_assert (NULL != channel);
- GNUNET_assert (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE >= notify_size);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY\n");
- LOG (GNUNET_ERROR_TYPE_DEBUG, " on channel %X\n", channel->ccn);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " allow_send %d\n", channel->allow_send);
- if (ntohl (channel->ccn.channel_of_client) >=
- GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
- LOG (GNUNET_ERROR_TYPE_DEBUG, " to origin\n");
- else
- LOG (GNUNET_ERROR_TYPE_DEBUG, " to destination\n");
- LOG (GNUNET_ERROR_TYPE_DEBUG, " payload size %u\n", notify_size);
GNUNET_assert (NULL != notify);
+ GNUNET_assert (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE >= notify_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "CADET NOTIFY TRANSMIT READY on channel %X allow_send is %u to %s with %u bytes\n",
+ ntohl (channel->ccn.channel_of_client),
+ channel->allow_send,
+ (ntohl (channel->ccn.channel_of_client) >=
+ GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
+ ? "origin"
+ : "destination",
+ (unsigned int) notify_size);
if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us != maxdelay.rel_value_us)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
th = GNUNET_new (struct GNUNET_CADET_TransmitHandle);
th->channel = channel;
th->size = notify_size;
- LOG (GNUNET_ERROR_TYPE_DEBUG, " total size %u\n", th->size);
th->notify = notify;
th->notify_cls = notify_cls;
if (0 != channel->allow_send)
- th->request_data_task = GNUNET_SCHEDULER_add_now (&request_data, th);
+ th->request_data_task
+ = GNUNET_SCHEDULER_add_now (&request_data,
+ th);
else
- add_to_queue (channel->cadet, th);
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY END\n");
+ add_to_queue (channel->cadet,
+ th);
return th;
}
if (NULL != th->request_data_task)
{
GNUNET_SCHEDULER_cancel (th->request_data_task);
+ th->request_data_task = NULL;
}
- th->request_data_task = NULL;
-
remove_from_queue (th);
GNUNET_free (th);
}
+/**
+ * Send an ack on the channel to confirm the processing of a message.
+ *
+ * @param ch Channel on which to send the ACK.
+ */
void
GNUNET_CADET_receive_done (struct GNUNET_CADET_Channel *channel)
{
- send_ack (channel);
+ struct GNUNET_CADET_LocalAck *msg;
+ struct GNUNET_MQ_Envelope *env;
+
+ env = GNUNET_MQ_msg (msg,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending ACK on channel %X\n",
+ ntohl (channel->ccn.channel_of_client));
+ msg->ccn = channel->ccn;
+ GNUNET_MQ_send (channel->cadet->mq,
+ env);
}