- struct GNUNET_CADET_Handle *h = cls;
- struct GNUNET_CADET_TransmitHandle *th;
- struct GNUNET_CADET_TransmitHandle *next;
- struct GNUNET_CADET_Channel *ch;
- char *cbuf = buf;
- size_t tsize;
- size_t psize;
- size_t nsize;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
- LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send packet() Buffer %u\n", size);
- if ((0 == size) || (NULL == buf))
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "# Received NULL send callback on %p\n", h);
- reconnect (h);
- h->th = NULL;
- return 0;
- }
- tsize = 0;
- next = h->th_head;
- nsize = message_ready_size (h);
- while ((NULL != (th = next)) && (0 < nsize) && (size >= nsize))
- {
- ch = th->channel;
- if (GNUNET_YES == th_is_payload (th))
- {
- struct GNUNET_CADET_LocalData *dmsg;
- struct GNUNET_MessageHeader *mh;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "# payload\n");
- if (GNUNET_NO == ch->allow_send)
- {
- /* This channel is not ready to transmit yet, try next message */
- next = th->next;
- continue;
- }
- ch->packet_size = 0;
- GNUNET_assert (size >= th->size);
- dmsg = (struct GNUNET_CADET_LocalData *) cbuf;
- mh = (struct GNUNET_MessageHeader *) &dmsg[1];
- psize = th->notify (th->notify_cls,
- size - sizeof (struct GNUNET_CADET_LocalData),
- mh);
-
- if (psize > 0)
- {
- psize += sizeof (struct GNUNET_CADET_LocalData);
- GNUNET_assert (size >= psize);
- dmsg->header.size = htons (psize);
- dmsg->id = htonl (ch->chid);
- dmsg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "# payload type %s\n",
- GC_m2s (ntohs (mh->type)));
- ch->allow_send = GNUNET_NO;
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "# callback returned size 0, "
- "application canceled transmission\n");
- }
- }
- else
- {
- struct GNUNET_MessageHeader *mh = (struct GNUNET_MessageHeader *) &th[1];
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "# cadet internal traffic, type %s\n",
- GC_m2s (ntohs (mh->type)));
- memcpy (cbuf, &th[1], th->size);
- psize = th->size;
- }
- GNUNET_assert (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE >= psize);
-
- if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (th->timeout_task);
- GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
- GNUNET_free (th);
- next = h->th_head;
- nsize = message_ready_size (h);
- cbuf += psize;
- size -= psize;
- tsize += psize;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG, "# total size: %u\n", tsize);
- h->th = NULL;
- size = message_ready_size (h);
- if (0 != size)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "# next size: %u\n", size);
- h->th =
- GNUNET_CLIENT_notify_transmit_ready (h->client, size,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES, &send_callback, h);
- }
- else
- {
- if (NULL != h->th_head)
- LOG (GNUNET_ERROR_TYPE_DEBUG, "# can't transmit any more\n");
- else
- LOG (GNUNET_ERROR_TYPE_DEBUG, "# nothing left to transmit\n");
- }
- if (GNUNET_NO == h->in_receive)
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ GNUNET_MQ_hd_fixed_size (channel_created,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE,
+ struct GNUNET_CADET_LocalChannelCreateMessage,
+ h),
+ GNUNET_MQ_hd_fixed_size (channel_destroy,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY,
+ struct GNUNET_CADET_LocalChannelDestroyMessage,
+ h),
+ GNUNET_MQ_hd_var_size (local_data,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
+ struct GNUNET_CADET_LocalData,
+ h),
+ GNUNET_MQ_hd_fixed_size (local_ack,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK,
+ struct GNUNET_CADET_LocalAck,
+ h),
+ GNUNET_MQ_hd_fixed_size (get_peers,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS,
+ struct GNUNET_CADET_LocalInfoPeer,
+ h),
+ GNUNET_MQ_hd_var_size (get_peer,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER,
+ struct GNUNET_CADET_LocalInfoPeer,
+ h),
+ GNUNET_MQ_hd_fixed_size (get_tunnels,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS,
+ struct GNUNET_CADET_LocalInfoTunnel,
+ h),
+ GNUNET_MQ_hd_var_size (get_tunnel,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL,
+ struct GNUNET_CADET_LocalInfoTunnel,
+ h),
+ GNUNET_MQ_handler_end ()
+ };
+
+ h->mq = GNUNET_CLIENT_connect (h->cfg,
+ "cadet",
+ handlers,
+ &handle_mq_error,
+ h);
+ if (NULL == h->mq)