X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fmulticast%2Fgnunet-service-multicast.c;h=b068f13083e72b557ca586532d857b2aeb125a1a;hb=be35c296dd1f2aa82dcf718be5b257fa340d5328;hp=de65e0ab743a773531cffefefca95a81ce17dfbd;hpb=9407ee77841b771e774d51eaa2916bceed047c86;p=oweals%2Fgnunet.git diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c index de65e0ab7..b068f1308 100644 --- a/src/multicast/gnunet-service-multicast.c +++ b/src/multicast/gnunet-service-multicast.c @@ -162,6 +162,16 @@ struct Channel */ struct GNUNET_PeerIdentity peer; + /** + * Current window size, set by cadet_notify_window_change() + */ + int32_t window_size; + + /** + * Is the connection established? + */ + int8_t is_connected; + /** * Is the remote peer admitted to the group? * @see enum JoinStatus @@ -336,6 +346,17 @@ struct ReplayRequestKey }; +static struct Channel * +cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer); + +static void +cadet_channel_destroy (struct Channel *chn); + +static void +client_send_join_decision (struct Member *mem, + const struct MulticastJoinDecisionMessageHeader *hdcsn); + + /** * Task run during shutdown. * @@ -497,7 +518,7 @@ replay_req_remove_client (struct Group *grp, struct GNUNET_SERVICE_Client *clien { if (c == client) { - GNUNET_CONTAINER_multihashmap_remove (replay_req_client, &key, client); + GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, client); GNUNET_CONTAINER_multihashmap_iterator_destroy (it); return GNUNET_YES; } @@ -653,6 +674,9 @@ client_send_origin (struct GNUNET_HashCode *pub_key_hash, static void client_send_ack (struct GNUNET_HashCode *pub_key_hash) { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Sending message ACK to client.\n"); + static struct GNUNET_MessageHeader *msg = NULL; if (NULL == msg) { @@ -671,36 +695,6 @@ struct CadetTransmitClosure }; -/** - * CADET is ready to transmit a message. - */ -size_t -cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf) -{ - if (0 == buf_size) - { - /* FIXME: connection closed */ - return 0; - } - struct CadetTransmitClosure *tcls = cls; - struct Channel *chn = tcls->chn; - uint16_t msg_size = ntohs (tcls->msg->size); - GNUNET_assert (msg_size <= buf_size); - GNUNET_memcpy (buf, tcls->msg, msg_size); - GNUNET_free (tcls); - - if (0 == chn->msgs_pending) - { - GNUNET_break (0); - } - else if (0 == --chn->msgs_pending) - { - client_send_ack (&chn->group_pub_hash); - } - return msg_size; -} - - /** * Send a message to a CADET channel. * @@ -710,53 +704,22 @@ cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf) static void cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg) { - uint16_t msg_size = ntohs (msg->size); - struct GNUNET_MessageHeader *msg_copy = GNUNET_malloc (msg_size); - GNUNET_memcpy (msg_copy, msg, msg_size); - - struct CadetTransmitClosure *tcls = GNUNET_malloc (sizeof (*tcls)); - tcls->chn = chn; - tcls->msg = msg_copy; - - chn->msgs_pending++; - chn->tmit_handle - = GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO, - GNUNET_TIME_UNIT_FOREVER_REL, - msg_size, - &cadet_notify_transmit_ready, - tcls); - GNUNET_assert (NULL != chn->tmit_handle); -} + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_copy (msg); + GNUNET_MQ_send (GNUNET_CADET_get_mq (chn->channel), env); -/** - * Create new outgoing CADET channel. - * - * @param peer - * Peer to connect to. - * @param group_pub_key - * Public key of group the channel belongs to. - * @param group_pub_hash - * Hash of @a group_pub_key. - * - * @return Channel. - */ -static struct Channel * -cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer) -{ - struct Channel *chn = GNUNET_malloc (sizeof (*chn)); - chn->group = grp; - chn->group_pub_key = grp->pub_key; - chn->group_pub_hash = grp->pub_key_hash; - chn->peer = *peer; - chn->direction = DIR_OUTGOING; - chn->join_status = JOIN_WAITING; - chn->channel = GNUNET_CADET_channel_create (cadet, chn, &chn->peer, - &grp->cadet_port_hash, - GNUNET_CADET_OPTION_RELIABLE); - GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_pub_hash, chn, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - return chn; + if (0 < chn->window_size) + { + client_send_ack (&chn->group_pub_hash); + } + else + { + chn->msgs_pending++; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%p Queuing message. Pending messages: %u\n", + chn, chn->msgs_pending); + } } @@ -787,7 +750,7 @@ cadet_send_join_decision_cb (void *cls, const struct MulticastJoinDecisionMessageHeader *hdcsn = cls; struct Channel *chn = channel; - const struct MulticastJoinDecisionMessage *dcsn = + const struct MulticastJoinDecisionMessage *dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1]; if (0 == memcmp (&hdcsn->member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key)) @@ -870,31 +833,74 @@ cadet_send_parents (struct GNUNET_HashCode *pub_key_hash, /** - * New incoming CADET channel. + * CADET channel connect handler. + * + * @see GNUNET_CADET_ConnectEventHandler() */ static void * -cadet_notify_channel_new (void *cls, - struct GNUNET_CADET_Channel *channel, - const struct GNUNET_PeerIdentity *initiator, - const struct GNUNET_HashCode *port, - enum GNUNET_CADET_ChannelOption options) +cadet_notify_connect (void *cls, + struct GNUNET_CADET_Channel *channel, + const struct GNUNET_PeerIdentity *source) +{ + struct Channel *chn = GNUNET_malloc (sizeof *chn); + chn->group = cls; + chn->channel = channel; + chn->direction = DIR_INCOMING; + chn->join_status = JOIN_NOT_ASKED; + + GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group_pub_hash, chn, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + return chn; +} + + +/** + * CADET window size change handler. + * + * @see GNUNET_CADET_WindowSizeEventHandler() + */ +static void +cadet_notify_window_change (void *cls, + const struct GNUNET_CADET_Channel *channel, + int window_size) { - return NULL; + struct Channel *chn = cls; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%p Window size changed to %d. Pending messages: %u\n", + chn, window_size, chn->msgs_pending); + + chn->is_connected = GNUNET_YES; + chn->window_size = (int32_t) window_size; + + for (int i = 0; i < window_size; i++) + { + if (0 < chn->msgs_pending) + { + client_send_ack (&chn->group_pub_hash); + chn->msgs_pending--; + } + else + { + break; + } + } } /** - * CADET channel is being destroyed. + * CADET channel disconnect handler. + * + * @see GNUNET_CADET_DisconnectEventHandler() */ static void -cadet_notify_channel_end (void *cls, - const struct GNUNET_CADET_Channel *channel, - void *ctx) +cadet_notify_disconnect (void *cls, + const struct GNUNET_CADET_Channel *channel) { - if (NULL == ctx) + if (NULL == cls) return; - struct Channel *chn = ctx; + struct Channel *chn = cls; if (NULL != chn->group) { if (GNUNET_NO == chn->group->is_origin) @@ -905,26 +911,417 @@ cadet_notify_channel_end (void *cls, } } - while (GNUNET_YES == replay_req_remove_cadet (chn)); + int ret; + do + { + ret = replay_req_remove_cadet (chn); + } + while (GNUNET_YES == ret); + + GNUNET_free (chn); +} + + +static int +check_cadet_join_request (void *cls, + const struct MulticastJoinRequestMessage *req) +{ + struct Channel *chn = cls; + + if (NULL == chn + || JOIN_NOT_ASKED != chn->join_status) + { + return GNUNET_SYSERR; + } + + uint16_t size = ntohs (req->header.size); + if (size < sizeof (*req)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (ntohl (req->purpose.size) != (size + - sizeof (req->header) + - sizeof (req->reserved) + - sizeof (req->signature))) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST, + &req->purpose, &req->signature, + &req->member_pub_key)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +/** + * Incoming join request message from CADET. + */ +static void +handle_cadet_join_request (void *cls, + const struct MulticastJoinRequestMessage *req) +{ + struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); + + struct GNUNET_HashCode group_pub_hash; + GNUNET_CRYPTO_hash (&req->group_pub_key, sizeof (req->group_pub_key), &group_pub_hash); + chn->group_pub_key = req->group_pub_key; + chn->group_pub_hash = group_pub_hash; + chn->member_pub_key = req->member_pub_key; + chn->peer = req->peer; + chn->join_status = JOIN_WAITING; + + client_send_all (&group_pub_hash, &req->header); +} + + +static int +check_cadet_join_decision (void *cls, + const struct MulticastJoinDecisionMessageHeader *hdcsn) +{ + uint16_t size = ntohs (hdcsn->header.size); + if (size < sizeof (struct MulticastJoinDecisionMessageHeader) + + sizeof (struct MulticastJoinDecisionMessage)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + struct Channel *chn = cls; + if (NULL == chn) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + if (NULL == chn->group || GNUNET_NO != chn->group->is_origin) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + switch (chn->join_status) + { + case JOIN_REFUSED: + return GNUNET_SYSERR; + + case JOIN_ADMITTED: + return GNUNET_OK; + + case JOIN_NOT_ASKED: + case JOIN_WAITING: + break; + } + + return GNUNET_OK; +} + + +/** + * Incoming join decision message from CADET. + */ +static void +handle_cadet_join_decision (void *cls, + const struct MulticastJoinDecisionMessageHeader *hdcsn) +{ + const struct MulticastJoinDecisionMessage * + dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; + + struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); + + // FIXME: do we need to copy chn->peer or compare it with hdcsn->peer? + struct Member *mem = (struct Member *) chn->group; + client_send_join_decision (mem, hdcsn); + if (GNUNET_YES == ntohl (dcsn->is_admitted)) + { + chn->join_status = JOIN_ADMITTED; + } + else + { + chn->join_status = JOIN_REFUSED; + cadet_channel_destroy (chn); + } +} + + +static int +check_cadet_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *msg) +{ + uint16_t size = ntohs (msg->header.size); + if (size < sizeof (*msg)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + struct Channel *chn = cls; + if (NULL == chn) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + if (ntohl (msg->purpose.size) != (size + - sizeof (msg->header) + - sizeof (msg->hop_counter) + - sizeof (msg->signature))) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE, + &msg->purpose, &msg->signature, + &chn->group_pub_key)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +/** + * Incoming multicast message from CADET. + */ +static void +handle_cadet_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *msg) +{ + struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); + client_send_all (&chn->group_pub_hash, &msg->header); +} + + +static int +check_cadet_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + uint16_t size = ntohs (req->header.size); + if (size < sizeof (*req)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + struct Channel *chn = cls; + if (NULL == chn) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + if (ntohl (req->purpose.size) != (size + - sizeof (req->header) + - sizeof (req->member_pub_key) + - sizeof (req->signature))) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST, + &req->purpose, &req->signature, + &req->member_pub_key)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +/** + * Incoming multicast request message from CADET. + */ +static void +handle_cadet_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); + client_send_origin (&chn->group_pub_hash, &req->header); +} + + +static int +check_cadet_replay_request (void *cls, + const struct MulticastReplayRequestMessage *req) +{ + uint16_t size = ntohs (req->header.size); + if (size < sizeof (*req)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + struct Channel *chn = cls; + if (NULL == chn) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +/** + * Incoming multicast replay request from CADET. + */ +static void +handle_cadet_replay_request (void *cls, + const struct MulticastReplayRequestMessage *req) +{ + struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); + + struct MulticastReplayRequestMessage rep = *req; + GNUNET_memcpy (&rep.member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key)); + + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet, + &chn->group->pub_key_hash); + if (NULL == grp_replay_req) + { + grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + GNUNET_CONTAINER_multihashmap_put (replay_req_cadet, + &chn->group->pub_key_hash, grp_replay_req, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + } + struct GNUNET_HashCode key_hash; + replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset, + rep.flags, &key_hash); + GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + + client_send_random (&chn->group_pub_hash, &rep.header); +} + + +static int +check_cadet_replay_response (void *cls, + const struct MulticastReplayResponseMessage *res) +{ + struct Channel *chn = cls; + if (NULL == chn) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Incoming multicast replay response from CADET. + */ +static void +handle_cadet_replay_response (void *cls, + const struct MulticastReplayResponseMessage *res) +{ + struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); + + /* @todo FIXME: got replay error response, send request to other members */ +} + + +static void +group_set_cadet_port_hash (struct Group *grp) +{ + struct CadetPort { + struct GNUNET_CRYPTO_EddsaPublicKey pub_key; + uint32_t app_type; + } port = { + grp->pub_key, + GNUNET_APPLICATION_TYPE_MULTICAST, + }; + GNUNET_CRYPTO_hash (&port, sizeof (port), &grp->cadet_port_hash); +} + + + +/** + * Create new outgoing CADET channel. + * + * @param peer + * Peer to connect to. + * @param group_pub_key + * Public key of group the channel belongs to. + * @param group_pub_hash + * Hash of @a group_pub_key. + * + * @return Channel. + */ +static struct Channel * +cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer) +{ + struct Channel *chn = GNUNET_malloc (sizeof (*chn)); + chn->group = grp; + chn->group_pub_key = grp->pub_key; + chn->group_pub_hash = grp->pub_key_hash; + chn->peer = *peer; + chn->direction = DIR_OUTGOING; + chn->is_connected = GNUNET_NO; + chn->join_status = JOIN_WAITING; + + struct GNUNET_MQ_MessageHandler cadet_handlers[] = { + GNUNET_MQ_hd_var_size (cadet_message, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + struct GNUNET_MULTICAST_MessageHeader, + chn), - GNUNET_free (chn); + GNUNET_MQ_hd_var_size (cadet_join_decision, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, + struct MulticastJoinDecisionMessageHeader, + chn), + + GNUNET_MQ_hd_var_size (cadet_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + chn), + + GNUNET_MQ_hd_var_size (cadet_replay_response, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, + struct MulticastReplayResponseMessage, + chn), + + GNUNET_MQ_handler_end () + }; + + chn->channel = GNUNET_CADET_channel_creatE (cadet, chn, &chn->peer, + &grp->cadet_port_hash, + GNUNET_CADET_OPTION_RELIABLE, + cadet_notify_window_change, + cadet_notify_disconnect, + cadet_handlers); + GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_pub_hash, chn, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + return chn; } +/** + * Destroy outgoing CADET channel. + */ static void -group_set_cadet_port_hash (struct Group *grp) +cadet_channel_destroy (struct Channel *chn) { - struct CadetPort { - struct GNUNET_CRYPTO_EddsaPublicKey pub_key; - uint32_t app_type; - } port = { - grp->pub_key, - GNUNET_APPLICATION_TYPE_MULTICAST, - }; - GNUNET_CRYPTO_hash (&port, sizeof (port), &grp->cadet_port_hash); + GNUNET_CADET_channel_destroy (chn->channel); + GNUNET_CONTAINER_multihashmap_remove_all (channels_out, &chn->group_pub_hash); + GNUNET_free (chn); } - /** * Handle a connecting client starting an origin. */ @@ -961,8 +1358,44 @@ handle_client_origin_start (void *cls, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); group_set_cadet_port_hash (grp); - orig->cadet_port = GNUNET_CADET_open_port (cadet, &grp->cadet_port_hash, - cadet_notify_channel_new, NULL); + + struct GNUNET_MQ_MessageHandler cadet_handlers[] = { + GNUNET_MQ_hd_var_size (cadet_message, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + struct GNUNET_MULTICAST_MessageHeader, + grp), + + GNUNET_MQ_hd_var_size (cadet_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, + struct GNUNET_MULTICAST_RequestHeader, + grp), + + GNUNET_MQ_hd_var_size (cadet_join_request, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, + struct MulticastJoinRequestMessage, + grp), + + GNUNET_MQ_hd_var_size (cadet_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + grp), + + GNUNET_MQ_hd_var_size (cadet_replay_response, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, + struct MulticastReplayResponseMessage, + grp), + + GNUNET_MQ_handler_end () + }; + + + orig->cadet_port = GNUNET_CADET_open_porT (cadet, + &grp->cadet_port_hash, + cadet_notify_connect, + NULL, + cadet_notify_window_change, + cadet_notify_disconnect, + cadet_handlers); } else { @@ -1258,10 +1691,8 @@ handle_client_multicast_message (void *cls, } client_send_all (&grp->pub_key_hash, &out->header); - if (0 == cadet_send_children (&grp->pub_key_hash, &out->header)) - { - client_send_ack (&grp->pub_key_hash); - } + cadet_send_children (&grp->pub_key_hash, &out->header); + client_send_ack (&grp->pub_key_hash); GNUNET_free (out); GNUNET_SERVICE_client_continue (client); @@ -1543,278 +1974,6 @@ handle_client_replay_response (void *cls, } -/** - * Incoming join request message from CADET. - */ -int -cadet_recv_join_request (void *cls, - struct GNUNET_CADET_Channel *channel, - void **ctx, - const struct GNUNET_MessageHeader *m) -{ - GNUNET_CADET_receive_done(channel); - const struct MulticastJoinRequestMessage * - req = (const struct MulticastJoinRequestMessage *) m; - uint16_t size = ntohs (m->size); - if (size < sizeof (*req)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - if (NULL != *ctx) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - if (ntohl (req->purpose.size) != (size - - sizeof (req->header) - - sizeof (req->reserved) - - sizeof (req->signature))) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - if (GNUNET_OK != - GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST, - &req->purpose, &req->signature, - &req->member_pub_key)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - - struct GNUNET_HashCode group_pub_hash; - GNUNET_CRYPTO_hash (&req->group_pub_key, sizeof (req->group_pub_key), &group_pub_hash); - - struct Channel *chn = GNUNET_malloc (sizeof *chn); - chn->channel = channel; - chn->group_pub_key = req->group_pub_key; - chn->group_pub_hash = group_pub_hash; - chn->member_pub_key = req->member_pub_key; - chn->peer = req->peer; - chn->join_status = JOIN_WAITING; - GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group_pub_hash, chn, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - *ctx = chn; - - client_send_all (&group_pub_hash, m); - return GNUNET_OK; -} - - -/** - * Incoming join decision message from CADET. - */ -int -cadet_recv_join_decision (void *cls, - struct GNUNET_CADET_Channel *channel, - void **ctx, - const struct GNUNET_MessageHeader *m) -{ - GNUNET_CADET_receive_done (channel); - const struct MulticastJoinDecisionMessageHeader * - hdcsn = (const struct MulticastJoinDecisionMessageHeader *) m; - const struct MulticastJoinDecisionMessage * - dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; - uint16_t size = ntohs (m->size); - if (size < sizeof (struct MulticastJoinDecisionMessageHeader) + - sizeof (struct MulticastJoinDecisionMessage)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - struct Channel *chn = *ctx; - if (NULL == chn) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - if (NULL == chn->group || GNUNET_NO != chn->group->is_origin) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - switch (chn->join_status) - { - case JOIN_REFUSED: - return GNUNET_SYSERR; - - case JOIN_ADMITTED: - return GNUNET_OK; - - case JOIN_NOT_ASKED: - case JOIN_WAITING: - break; - } - - // FIXME: do we need to copy chn->peer or compare it with hdcsn->peer? - struct Member *mem = (struct Member *) chn->group; - client_send_join_decision (mem, hdcsn); - if (GNUNET_YES == ntohl (dcsn->is_admitted)) - { - chn->join_status = JOIN_ADMITTED; - return GNUNET_OK; - } - else - { - chn->join_status = JOIN_REFUSED; - return GNUNET_SYSERR; - } -} - -/** - * Incoming multicast message from CADET. - */ -int -cadet_recv_message (void *cls, - struct GNUNET_CADET_Channel *channel, - void **ctx, - const struct GNUNET_MessageHeader *m) -{ - GNUNET_CADET_receive_done(channel); - const struct GNUNET_MULTICAST_MessageHeader * - msg = (const struct GNUNET_MULTICAST_MessageHeader *) m; - uint16_t size = ntohs (m->size); - if (size < sizeof (*msg)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - struct Channel *chn = *ctx; - if (NULL == chn) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - if (ntohl (msg->purpose.size) != (size - - sizeof (msg->header) - - sizeof (msg->hop_counter) - - sizeof (msg->signature))) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - if (GNUNET_OK != - GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE, - &msg->purpose, &msg->signature, - &chn->group_pub_key)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - - client_send_all (&chn->group_pub_hash, m); - return GNUNET_OK; -} - - -/** - * Incoming multicast request message from CADET. - */ -int -cadet_recv_request (void *cls, - struct GNUNET_CADET_Channel *channel, - void **ctx, - const struct GNUNET_MessageHeader *m) -{ - GNUNET_CADET_receive_done(channel); - const struct GNUNET_MULTICAST_RequestHeader * - req = (const struct GNUNET_MULTICAST_RequestHeader *) m; - uint16_t size = ntohs (m->size); - if (size < sizeof (*req)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - struct Channel *chn = *ctx; - if (NULL == chn) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - if (ntohl (req->purpose.size) != (size - - sizeof (req->header) - - sizeof (req->member_pub_key) - - sizeof (req->signature))) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - if (GNUNET_OK != - GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST, - &req->purpose, &req->signature, - &req->member_pub_key)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - - client_send_origin (&chn->group_pub_hash, m); - return GNUNET_OK; -} - - -/** - * Incoming multicast replay request from CADET. - */ -int -cadet_recv_replay_request (void *cls, - struct GNUNET_CADET_Channel *channel, - void **ctx, - const struct GNUNET_MessageHeader *m) -{ - GNUNET_CADET_receive_done(channel); - struct MulticastReplayRequestMessage rep; - uint16_t size = ntohs (m->size); - if (size < sizeof (rep)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - struct Channel *chn = *ctx; - - GNUNET_memcpy (&rep, m, sizeof (rep)); - GNUNET_memcpy (&rep.member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key)); - - struct GNUNET_CONTAINER_MultiHashMap * - grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet, - &chn->group->pub_key_hash); - if (NULL == grp_replay_req) - { - grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); - GNUNET_CONTAINER_multihashmap_put (replay_req_cadet, - &chn->group->pub_key_hash, grp_replay_req, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); - } - struct GNUNET_HashCode key_hash; - replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset, - rep.flags, &key_hash); - GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - - client_send_random (&chn->group_pub_hash, &rep.header); - return GNUNET_OK; -} - - -/** - * Incoming multicast replay response from CADET. - */ -int -cadet_recv_replay_response (void *cls, - struct GNUNET_CADET_Channel *channel, - void **ctx, - const struct GNUNET_MessageHeader *m) -{ - GNUNET_CADET_receive_done(channel); - //struct Channel *chn = *ctx; - - /* @todo FIXME: got replay error response, send request to other members */ - - return GNUNET_OK; -} - - /** * A new client connected. * @@ -1898,32 +2057,6 @@ client_notify_disconnect (void *cls, } -/** - * Message handlers for CADET. - */ -static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = { - { cadet_recv_join_request, - GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 0 }, - - { cadet_recv_join_decision, - GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, 0 }, - - { cadet_recv_message, - GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 }, - - { cadet_recv_request, - GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 }, - - { cadet_recv_replay_request, - GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, 0 }, - - { cadet_recv_replay_response, - GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, 0 }, - - { NULL, 0, 0 } -}; - - /** * Service started. * @@ -1949,9 +2082,8 @@ run (void *cls, replay_req_cadet = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); replay_req_client = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); - cadet = GNUNET_CADET_connect (cfg, NULL, - cadet_notify_channel_end, - cadet_handlers); + cadet = GNUNET_CADET_connecT (cfg); + GNUNET_assert (NULL != cadet); GNUNET_SCHEDULER_add_shutdown (&shutdown_task,