* Send message to all clients connected to the group.
*/
static void
-client_send_group (const struct Group *grp,
- const struct GNUNET_MessageHeader *msg)
+client_send_group_keep_envelope (const struct Group *grp,
+ struct GNUNET_MQ_Envelope *env)
{
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "%p Sending message to all clients of the group.\n", grp);
+ struct ClientList *cli = grp->clients_head;
- struct ClientList *cl = grp->clients_head;
- while (NULL != cl)
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "%p Sending message to all clients of the group.\n",
+ grp);
+ while (NULL != cli)
{
- struct GNUNET_MQ_Envelope *
- env = GNUNET_MQ_msg_copy (msg);
-
- GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cl->client),
- env);
- cl = cl->next;
+ GNUNET_MQ_send_copy (GNUNET_SERVICE_client_get_mq (cli->client),
+ env);
+ cli = cli->next;
}
}
+/**
+ * Send message to all clients connected to the group and
+ * takes care of freeing @env.
+ */
+static void
+client_send_group (const struct Group *grp,
+ struct GNUNET_MQ_Envelope *env)
+{
+ client_send_group_keep_envelope (grp, env);
+ GNUNET_MQ_discard (env);
+}
+
+
/**
* Iterator callback for sending a message to origin clients.
*/
client_send_origin_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
void *origin)
{
- const struct GNUNET_MessageHeader *msg = cls;
+ struct GNUNET_MQ_Envelope *env = cls;
struct Member *orig = origin;
- client_send_group (&orig->group, msg);
+ client_send_group_keep_envelope (&orig->group, env);
return GNUNET_YES;
}
client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
void *member)
{
- const struct GNUNET_MessageHeader *msg = cls;
+ struct GNUNET_MQ_Envelope *env = cls;
struct Member *mem = member;
if (NULL != mem->join_dcsn)
{ /* Only send message to admitted members */
- client_send_group (&mem->group, msg);
+ client_send_group_keep_envelope (&mem->group, env);
}
return GNUNET_YES;
}
*/
static int
client_send_all (struct GNUNET_HashCode *pub_key_hash,
- const struct GNUNET_MessageHeader *msg)
+ struct GNUNET_MQ_Envelope *env)
{
int n = 0;
n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
client_send_origin_cb,
- (void *) msg);
+ (void *) env);
n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash,
client_send_member_cb,
- (void *) msg);
+ (void *) env);
+ GNUNET_MQ_discard (env);
return n;
}
*/
static int
client_send_random (struct GNUNET_HashCode *pub_key_hash,
- const struct GNUNET_MessageHeader *msg)
+ struct GNUNET_MQ_Envelope *env)
{
int n = 0;
n = GNUNET_CONTAINER_multihashmap_get_random (origins, client_send_origin_cb,
- (void *) msg);
+ (void *) env);
if (n <= 0)
n = GNUNET_CONTAINER_multihashmap_get_random (members, client_send_member_cb,
- (void *) msg);
+ (void *) env);
return n;
}
*/
static int
client_send_origin (struct GNUNET_HashCode *pub_key_hash,
- const struct GNUNET_MessageHeader *msg)
+ struct GNUNET_MQ_Envelope *env)
{
int n = 0;
n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
client_send_origin_cb,
- (void *) msg);
+ (void *) env);
return n;
}
static void
client_send_ack (struct GNUNET_HashCode *pub_key_hash)
{
+ struct GNUNET_MQ_Envelope *env;
+
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Sending message ACK to client.\n");
-
- static struct GNUNET_MessageHeader *msg = NULL;
- if (NULL == msg)
- {
- msg = GNUNET_malloc (sizeof (*msg));
- msg->type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK);
- msg->size = htons (sizeof (*msg));
- }
- client_send_all (pub_key_hash, msg);
+ env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK);
+ client_send_all (pub_key_hash, env);
}
chn->peer = req->peer;
chn->join_status = JOIN_WAITING;
- client_send_all (&group_pub_hash, &req->header);
+ client_send_all (&group_pub_hash,
+ GNUNET_MQ_msg_copy (&req->header));
}
{
struct Channel *chn = cls;
GNUNET_CADET_receive_done (chn->channel);
- client_send_all (&chn->group_pub_hash, &msg->header);
+ client_send_all (&chn->group_pub_hash,
+ GNUNET_MQ_msg_copy (&msg->header));
}
{
struct Channel *chn = cls;
GNUNET_CADET_receive_done (chn->channel);
- client_send_origin (&chn->group_pub_hash, &req->header);
+ client_send_origin (&chn->group_pub_hash,
+ GNUNET_MQ_msg_copy (&req->header));
}
-static int
-check_cadet_replay_request (void *cls,
- const struct MulticastReplayRequestMessage *req)
-{
- uint16_t size = ntohs (req->header.size);
- if (size < sizeof (*req))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
-
- struct Channel *chn = cls;
- if (NULL == chn)
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
-
- return GNUNET_OK;
-}
+// FIXME: do checks in handle_cadet_replay_request
+//static int
+//check_cadet_replay_request (void *cls,
+// const struct MulticastReplayRequestMessage *req)
+//{
+// uint16_t size = ntohs (req->header.size);
+// if (size < sizeof (*req))
+// {
+// GNUNET_break_op (0);
+// return GNUNET_SYSERR;
+// }
+//
+// struct Channel *chn = cls;
+// if (NULL == chn)
+// {
+// GNUNET_break_op (0);
+// return GNUNET_SYSERR;
+// }
+//
+// return GNUNET_OK;
+//}
/**
const struct MulticastReplayRequestMessage *req)
{
struct Channel *chn = cls;
+
GNUNET_CADET_receive_done (chn->channel);
struct MulticastReplayRequestMessage rep = *req;
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
}
struct GNUNET_HashCode key_hash;
- replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset,
- rep.flags, &key_hash);
+ replay_key_hash (rep.fragment_id,
+ rep.message_id,
+ rep.fragment_offset,
+ rep.flags,
+ &key_hash);
GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- client_send_random (&chn->group_pub_hash, &rep.header);
+ client_send_random (&chn->group_pub_hash,
+ GNUNET_MQ_msg_copy (&rep.header));
}
struct MulticastJoinDecisionMessageHeader,
chn),
- GNUNET_MQ_hd_var_size (cadet_replay_request,
- GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
- struct MulticastReplayRequestMessage,
- chn),
+ GNUNET_MQ_hd_fixed_size (cadet_replay_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+ struct MulticastReplayRequestMessage,
+ chn),
GNUNET_MQ_hd_var_size (cadet_replay_response,
GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
struct MulticastJoinRequestMessage,
grp),
- GNUNET_MQ_hd_var_size (cadet_replay_request,
- GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
- struct MulticastReplayRequestMessage,
- grp),
+ GNUNET_MQ_hd_fixed_size (cadet_replay_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+ struct MulticastReplayRequestMessage,
+ grp),
GNUNET_MQ_hd_var_size (cadet_replay_response,
GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
}
GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem->pub_key_hash, mem,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
-
+
+ // FIXME: should the members hash map have option UNIQUE_FAST?
GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
}
char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&mem->pub_key);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client connected to group %s as member %s (%s).\n",
+ "Client connected to group %s as member %s (%s). size = %d\n",
GNUNET_h2s (&grp->pub_key_hash),
GNUNET_h2s2 (&mem->pub_key_hash),
- str);
+ str,
+ GNUNET_CONTAINER_multihashmap_size (members));
GNUNET_free (str);
if (NULL != mem->join_dcsn)
GNUNET_free (mem->join_req);
mem->join_req = req;
- if (0 == client_send_origin (&grp->pub_key_hash, &mem->join_req->header))
+ if (0 ==
+ client_send_origin (&grp->pub_key_hash,
+ GNUNET_MQ_msg_copy (&mem->join_req->header)))
{ /* No local origins, send to remote origin */
cadet_send_join_request (mem);
}
client_send_join_decision (struct Member *mem,
const struct MulticastJoinDecisionMessageHeader *hdcsn)
{
- client_send_group (&mem->group, &hdcsn->header);
+ client_send_group (&mem->group, GNUNET_MQ_msg_copy (&hdcsn->header));
const struct MulticastJoinDecisionMessage *
dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Got join decision from client for group %s..\n",
+ "%p got join decision from client for group %s..\n",
grp, GNUNET_h2s (&grp->pub_key_hash));
struct GNUNET_CONTAINER_MultiHashMap *
}
+static void
+handle_client_part_request (void *cls,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct Client *c = cls;
+ struct GNUNET_SERVICE_Client *client = c->client;
+ struct Group *grp = c->group;
+ struct GNUNET_MQ_Envelope *env;
+
+ if (NULL == grp)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (client);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p got part request from client for group %s.\n",
+ grp, GNUNET_h2s (&grp->pub_key_hash));
+ env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK);
+ client_send_group (grp, env);
+ GNUNET_SERVICE_client_continue (client);
+}
+
+
static int
check_client_multicast_message (void *cls,
const struct GNUNET_MULTICAST_MessageHeader *msg)
GNUNET_assert (GNUNET_YES == grp->is_origin);
struct Origin *orig = grp->origin;
+ // FIXME: use GNUNET_MQ_msg_copy
/* FIXME: yucky, should use separate message structs for P2P and CS! */
struct GNUNET_MULTICAST_MessageHeader *
out = (struct GNUNET_MULTICAST_MessageHeader *) GNUNET_copy_message (&msg->header);
GNUNET_assert (0);
}
- client_send_all (&grp->pub_key_hash, &out->header);
+ client_send_all (&grp->pub_key_hash, GNUNET_MQ_msg_copy (&out->header));
cadet_send_children (&grp->pub_key_hash, &out->header);
client_send_ack (&grp->pub_key_hash);
GNUNET_free (out);
}
uint8_t send_ack = GNUNET_YES;
- if (0 == client_send_origin (&grp->pub_key_hash, &out->header))
+ if (0 ==
+ client_send_origin (&grp->pub_key_hash,
+ GNUNET_MQ_msg_copy (&out->header)))
{ /* No local origins, send to remote origin */
if (NULL != mem->origin_channel)
{
GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, client,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- if (0 == client_send_origin (&grp->pub_key_hash, &rep->header))
+ if (0 ==
+ client_send_origin (&grp->pub_key_hash,
+ GNUNET_MQ_msg_copy (&rep->header)))
{ /* No local origin, replay from remote members / origin. */
if (NULL != mem->origin_channel)
{
else
{
/* FIXME: not yet connected to origin */
+ GNUNET_assert (0);
GNUNET_SERVICE_client_drop (client);
return;
}
grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member",
GNUNET_h2s (&grp->pub_key_hash));
+ // FIXME (due to protocol change): here we must not remove all clients,
+ // only the one we were notified about!
struct ClientList *cl = grp->clients_head;
while (NULL != cl)
{
GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
struct MulticastJoinDecisionMessageHeader,
NULL),
+ GNUNET_MQ_hd_fixed_size (client_part_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST,
+ struct GNUNET_MessageHeader,
+ NULL),
GNUNET_MQ_hd_var_size (client_multicast_message,
GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
struct GNUNET_MULTICAST_MessageHeader,
static void
-group_disconnect (struct GNUNET_MULTICAST_Group *grp,
- GNUNET_ContinuationCallback cb,
- void *cls)
+handle_group_part_ack (void *cls,
+ const struct GNUNET_MessageHeader *msg)
{
- grp->is_disconnecting = GNUNET_YES;
- grp->disconnect_cb = cb;
- grp->disconnect_cls = cls;
+ struct GNUNET_MULTICAST_Group *grp = cls;
- if (NULL != grp->mq)
- {
- struct GNUNET_MQ_Envelope *last = GNUNET_MQ_get_last_envelope (grp->mq);
- if (NULL != last)
- {
- GNUNET_MQ_notify_sent (last,
- (GNUNET_SCHEDULER_TaskCallback) group_cleanup, grp);
- }
- else
- {
- group_cleanup (grp);
- }
- }
- else
- {
- group_cleanup (grp);
- }
+ group_cleanup (grp);
}
GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
struct MulticastJoinRequestMessage,
grp),
+ GNUNET_MQ_hd_fixed_size (group_part_ack,
+ GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK,
+ struct GNUNET_MessageHeader,
+ grp),
GNUNET_MQ_hd_fixed_size (group_replay_request,
GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
struct MulticastReplayRequestMessage,
void *stop_cls)
{
struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+ struct GNUNET_MQ_Envelope *env;
- group_disconnect (grp, stop_cb, stop_cls);
+ grp->is_disconnecting = GNUNET_YES;
+ grp->disconnect_cb = stop_cb;
+ grp->disconnect_cls = stop_cls;
+ env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST);
+ GNUNET_MQ_send (grp->mq, env);
}
GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
struct MulticastJoinDecisionMessageHeader,
mem),
+ GNUNET_MQ_hd_fixed_size (group_part_ack,
+ GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK,
+ struct GNUNET_MessageHeader,
+ grp),
GNUNET_MQ_hd_fixed_size (group_replay_request,
GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
struct MulticastReplayRequestMessage,
GNUNET_ContinuationCallback part_cb,
void *part_cls)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem);
struct GNUNET_MULTICAST_Group *grp = &mem->grp;
+ struct GNUNET_MQ_Envelope *env;
mem->join_dcsn_cb = NULL;
grp->join_req_cb = NULL;
grp->message_cb = NULL;
grp->replay_msg_cb = NULL;
grp->replay_frag_cb = NULL;
-
- group_disconnect (grp, part_cb, part_cls);
+ grp->is_disconnecting = GNUNET_YES;
+ grp->disconnect_cb = part_cb;
+ grp->disconnect_cls = part_cls;
+ env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST);
+ GNUNET_MQ_send (grp->mq, env);
}