/*
This file is part of GNUnet.
- Copyright (C) 2012, 2013 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2012, 2013 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
*/
uint8_t in_transmit;
+ /**
+ * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
+ */
+ uint8_t acks_pending;
+
/**
* Is this the origin or a member?
*/
/**
* Public key of the member requesting join.
*/
- struct GNUNET_CRYPTO_EcdsaPublicKey member_key;
+ struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
/**
* Peer identity of the member requesting join.
*/
struct GNUNET_MULTICAST_MemberReplayHandle
{
-
- GNUNET_MULTICAST_ResultCallback result_cb;
- void *result_cls;
};
+static void
+origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
+
+static void
+member_to_origin (struct GNUNET_MULTICAST_Member *mem);
+
+
/**
* Send first message to the service after connecting.
*/
group_send_connect_msg (struct GNUNET_MULTICAST_Group *grp)
{
uint16_t cmsg_size = ntohs (grp->connect_msg->size);
- struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size);
+ struct GNUNET_MessageHeader *cmsg = GNUNET_malloc (cmsg_size);
memcpy (cmsg, grp->connect_msg, cmsg_size);
GNUNET_CLIENT_MANAGER_transmit_now (grp->client, cmsg);
+ GNUNET_free (cmsg);
}
jmsg = NULL;
jh = GNUNET_malloc (sizeof (*jh));
jh->group = grp;
- jh->member_key = jreq->member_key;
+ jh->member_pub_key = jreq->member_pub_key;
jh->peer = jreq->peer;
- grp->join_req_cb (grp->cb_cls, &jreq->member_key, jmsg, jh);
+ grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
}
}
+/**
+ * Receive message/request fragment acknowledgement from service.
+ */
+static void
+group_recv_fragment_ack (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_MULTICAST_Group *
+ grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
+ grp, grp->in_transmit, grp->acks_pending);
+
+ if (0 == grp->acks_pending)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Ignoring extraneous fragment ACK.\n", grp);
+ return;
+ }
+ grp->acks_pending--;
+
+ if (GNUNET_YES != grp->in_transmit)
+ return;
+
+ if (GNUNET_YES == grp->is_origin)
+ origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
+ else
+ member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
+}
+
/**
* Origin receives uniquest request from a member.
*/
struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
rh->grp = grp;
rh->req = *rep;
- grp->replay_frag_cb (grp->cb_cls, &rep->member_key,
+ grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key,
GNUNET_ntohll (rep->fragment_id),
GNUNET_ntohll (rep->flags), rh);
}
struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
rh->grp = grp;
rh->req = *rep;
- grp->replay_msg_cb (grp->cb_cls, &rep->member_key,
+ grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key,
GNUNET_ntohll (rep->message_id),
GNUNET_ntohll (rep->fragment_offset),
GNUNET_ntohll (rep->flags), rh);
GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
+ { group_recv_fragment_ack, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+ sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
+
{ group_recv_join_request, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
+ { group_recv_fragment_ack, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+ sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
+
{ group_recv_join_request, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn)
+ relay_size + join_resp_size);
hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
- hdcsn->member_key = join->member_key;
+ hdcsn->member_pub_key = join->member_pub_key;
hdcsn->peer = join->peer;
dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header);
+ GNUNET_free (hdcsn);
GNUNET_free (join);
return NULL;
}
static void
origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "origin_to_all()\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
struct GNUNET_MULTICAST_Group *grp = &orig->grp;
struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
+ GNUNET_assert (GNUNET_YES == grp->in_transmit);
size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size);
|| GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
{
LOG (GNUNET_ERROR_TYPE_ERROR,
- "OriginTransmitNotify() returned error or invalid message size.\n");
+ "%p OriginTransmitNotify() returned error or invalid message size.\n",
+ orig);
/* FIXME: handle error */
GNUNET_free (msg);
return;
if (GNUNET_NO == ret && 0 == buf_size)
{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%p OriginTransmitNotify() - transmission paused.\n", orig);
GNUNET_free (msg);
return; /* Transmission paused. */
}
msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
tmit->fragment_offset += sizeof (*msg) + buf_size;
+ grp->acks_pending++;
GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header);
+ GNUNET_free (msg);
+
+ if (GNUNET_YES == ret)
+ grp->in_transmit = GNUNET_NO;
}
GNUNET_MULTICAST_OriginTransmitNotify notify,
void *notify_cls)
{
-/* FIXME
- if (GNUNET_YES == orig->grp.in_transmit)
+ struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+ if (GNUNET_YES == grp->in_transmit)
return NULL;
- orig->grp.in_transmit = GNUNET_YES;
-*/
+ grp->in_transmit = GNUNET_YES;
struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
tmit->origin = orig;
tmit->message_id = message_id;
+ tmit->fragment_offset = 0;
tmit->group_generation = group_generation;
tmit->notify = notify;
tmit->notify_cls = notify_cls;
void
GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
{
+ struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
+ if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
+ return;
origin_to_all (th->origin);
}
void
GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
{
+ th->origin->grp.in_transmit = GNUNET_NO;
}
*/
struct GNUNET_MULTICAST_Member *
GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
- const struct GNUNET_CRYPTO_EddsaPublicKey *group_key,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key,
const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key,
const struct GNUNET_PeerIdentity *origin,
uint16_t relay_count,
join = GNUNET_malloc (sizeof (*join) + relay_size + join_msg_size);
join->header.size = htons (sizeof (*join) + relay_size + join_msg_size);
join->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
- join->group_key = *group_key;
+ join->group_pub_key = *group_pub_key;
join->member_key = *member_key;
join->origin = *origin;
join->relay_count = ntohl (relay_count);
* @param flags
* Additional flags for the replay request.
* It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback
- * @param result_cb
- * Function to call when the replayed message fragment arrives.
- * @param result_cls
- * Closure for @a result_cb.
*
* @return Replay request handle.
*/
struct GNUNET_MULTICAST_MemberReplayHandle *
GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
uint64_t fragment_id,
- uint64_t flags,
- GNUNET_MULTICAST_ResultCallback result_cb,
- void *result_cls)
+ uint64_t flags)
{
member_replay_request (mem, fragment_id, 0, 0, flags);
+ // FIXME: return
}
* @param flags
* Additional flags for the replay request.
* It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback
- * @param result_cb
- * Function to call for each replayed message fragment.
- * @param result_cls
- * Closure for @a result_cb.
*
* @return Replay request handle, NULL on error.
*/
GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
uint64_t message_id,
uint64_t fragment_offset,
- uint64_t flags,
- GNUNET_MULTICAST_ResultCallback result_cb,
- void *result_cls)
+ uint64_t flags)
{
member_replay_request (mem, 0, message_id, fragment_offset, flags);
-}
-
-
-/**
- * Cancel a replay request.
- *
- * @param rh
- * Request to cancel.
- */
-void
-GNUNET_MULTICAST_member_replay_cancel (struct GNUNET_MULTICAST_MemberReplayHandle *rh)
-{
+ // FIXME: return
}
LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
struct GNUNET_MULTICAST_Group *grp = &mem->grp;
struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
+ GNUNET_assert (GNUNET_YES == grp->in_transmit);
size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size);
tmit->fragment_offset += sizeof (*req) + buf_size;
GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header);
+ GNUNET_free (req);
+
+ if (GNUNET_YES == ret)
+ grp->in_transmit = GNUNET_NO;
}
GNUNET_MULTICAST_MemberTransmitNotify notify,
void *notify_cls)
{
-/* FIXME
if (GNUNET_YES == mem->grp.in_transmit)
return NULL;
mem->grp.in_transmit = GNUNET_YES;
-*/
struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
tmit->member = mem;
tmit->request_id = request_id;
+ tmit->fragment_offset = 0;
tmit->notify = notify;
tmit->notify_cls = notify_cls;
void
GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
{
+ struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
+ if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
+ return;
member_to_origin (th->member);
}
void
GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
{
+ th->member->grp.in_transmit = GNUNET_NO;
}