From: Bart Polot Date: Tue, 20 Sep 2016 01:21:59 +0000 (+0000) Subject: Port CADET to CORE MQ API X-Git-Tag: initial-import-from-subversion-38251~251 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=b4d5f474eef10017a470dccb01dae86c32bd5ddb;p=oweals%2Fgnunet.git Port CADET to CORE MQ API --- diff --git a/src/cadet/gnunet-service-cadet_connection.c b/src/cadet/gnunet-service-cadet_connection.c index 0c11c24df..29695243f 100644 --- a/src/cadet/gnunet-service-cadet_connection.c +++ b/src/cadet/gnunet-service-cadet_connection.c @@ -268,7 +268,7 @@ struct CadetConnectionQueue /** * Peer queue handle, to cancel if necessary. */ - struct CadetPeerQueue *q; + struct CadetPeerQueue *peer_q; /** * Continuation to call once sent. @@ -312,7 +312,8 @@ static struct GNUNET_CONTAINER_MultiHashMap *connections; /** * How many connections are we willing to maintain. - * Local connections are always allowed, even if there are more connections than max. + * Local connections are always allowed, + * even if there are more connections than max. */ static unsigned long long max_connections; @@ -620,41 +621,95 @@ send_ack (struct CadetConnection *c, unsigned int buffer, int fwd, int force) } +/** + * Update performance information if we are a connection's endpoint. + * + * @param c Connection to update. + * @param wait How much time did we wait to send the last message. + * @param size Size of the last message. + */ +static void +update_perf (struct CadetConnection *c, + struct GNUNET_TIME_Relative wait, + uint16_t size) +{ + struct CadetConnectionPerformance *p; + double usecsperbyte; + + if (NULL == c->perf) + return; /* Only endpoints are interested in timing. */ + + p = c->perf; + usecsperbyte = ((double) wait.rel_value_us) / size; + if (p->size == AVG_MSGS) + { + /* Array is full. Substract oldest value, add new one and store. */ + p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS); + p->usecsperbyte[p->idx] = usecsperbyte; + p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS); + } + else + { + /* Array not yet full. Add current value to avg and store. */ + p->usecsperbyte[p->idx] = usecsperbyte; + p->avg *= p->size; + p->avg += p->usecsperbyte[p->idx]; + p->size++; + p->avg /= p->size; + } + p->idx = (p->idx + 1) % AVG_MSGS; +} + + /** * Callback called when a connection queued message is sent. * * Calculates the average time and connection packet tracking. * - * @param cls Closure (ConnectionQueue Handle). + * @param cls Closure (ConnectionQueue Handle), can be NULL. * @param c Connection this message was on. + * @param fwd Was this a FWD going message? * @param sent Was it really sent? (Could have been canceled) * @param type Type of message sent. - * @param pid Packet ID, or 0 if not applicable (create, destroy, etc). - * @param fwd Was this a FWD going message? + * @param payload_type Type of payload, if applicable. + * @param pid Message ID, or 0 if not applicable (create, destroy, etc). * @param size Size of the message. * @param wait Time spent waiting for core (only the time for THIS message) - * @return #GNUNET_YES if connection was destroyed, #GNUNET_NO otherwise. */ -static int +static void conn_message_sent (void *cls, - struct CadetConnection *c, int sent, - uint16_t type, uint32_t pid, int fwd, size_t size, + struct CadetConnection *c, int fwd, int sent, + uint16_t type, uint16_t payload_type, uint32_t pid, + size_t size, struct GNUNET_TIME_Relative wait) { - struct CadetConnectionPerformance *p; - struct CadetFlowControl *fc; struct CadetConnectionQueue *q = cls; - double usecsperbyte; + struct CadetFlowControl *fc; int forced; GCC_check_connections (); - LOG (GNUNET_ERROR_TYPE_DEBUG, "connection message_sent\n"); + /* If c is NULL, nothing to update. */ + if (NULL == c) + { + if (type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN + && type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY) + { + LOG (GNUNET_ERROR_TYPE_ERROR, "Message %s sent on NULL connection!\n", + GC_m2s (type)); + } + GCC_check_connections (); + return; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, "connection message_sent\n"); GCC_debug (c, GNUNET_ERROR_TYPE_DEBUG); + /* Update flow control info. */ fc = fwd ? &c->fwd_fc : &c->bck_fc; LOG (GNUNET_ERROR_TYPE_DEBUG, " %ssent %s %s pid %u\n", - sent ? "" : "not ", GC_f2s (fwd), GC_m2s (type), pid); + sent ? "" : "not ", GC_f2s (fwd), + GC_m2s (type), GC_m2s (payload_type), pid); if (NULL != q) { forced = q->forced; @@ -674,17 +729,7 @@ conn_message_sent (void *cls, { forced = GNUNET_NO; } - if (NULL == c) - { - if (type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN - && type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY) - { - LOG (GNUNET_ERROR_TYPE_ERROR, "Message %s sent on NULL connection!\n", - GC_m2s (type)); - } - GCC_check_connections (); - return GNUNET_NO; - } + LOG (GNUNET_ERROR_TYPE_DEBUG, " C_P- %p %u\n", c, c->pending_messages); c->pending_messages--; if ( (GNUNET_YES == c->destroy) && @@ -694,8 +739,9 @@ conn_message_sent (void *cls, "! destroying connection!\n"); GCC_destroy (c); GCC_check_connections (); - return GNUNET_YES; + return; } + /* Send ACK if needed, after accounting for sent ID in fc->queue_n */ switch (type) { @@ -758,30 +804,8 @@ conn_message_sent (void *cls, } LOG (GNUNET_ERROR_TYPE_DEBUG, "! message sent!\n"); - if (NULL == c->perf) - return GNUNET_NO; /* Only endpoints are interested in timing. */ - - p = c->perf; - usecsperbyte = ((double) wait.rel_value_us) / size; - if (p->size == AVG_MSGS) - { - /* Array is full. Substract oldest value, add new one and store. */ - p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS); - p->usecsperbyte[p->idx] = usecsperbyte; - p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS); - } - else - { - /* Array not yet full. Add current value to avg and store. */ - p->usecsperbyte[p->idx] = usecsperbyte; - p->avg *= p->size; - p->avg += p->usecsperbyte[p->idx]; - p->size++; - p->avg /= p->size; - } - p->idx = (p->idx + 1) % AVG_MSGS; + update_perf (c, wait, size); GCC_check_connections (); - return GNUNET_NO; } @@ -950,27 +974,26 @@ is_ooo_ok (uint32_t last_pid_recv, uint32_t ooo_pid, uint32_t ooo_bitmap) * Is traffic coming from this sender 'FWD' traffic? * * @param c Connection to check. - * @param sender Peer identity of neighbor. + * @param sender Short peer identity of neighbor. * * @return #GNUNET_YES in case the sender is the 'prev' hop and therefore * the traffic is 'FWD'. * #GNUNET_NO for BCK. - * #GNUNET_SYSERR for errors. + * #GNUNET_SYSERR for errors (sender isn't a hop in the connection). */ static int is_fwd (const struct CadetConnection *c, - const struct GNUNET_PeerIdentity *sender) + const struct CadetPeer *sender) { GNUNET_PEER_Id id; - id = GNUNET_PEER_search (sender); + id = GCP_get_short_id (sender); if (GCP_get_short_id (get_prev_hop (c)) == id) return GNUNET_YES; if (GCP_get_short_id (get_next_hop (c)) == id) return GNUNET_NO; - GNUNET_break (0); return GNUNET_SYSERR; } @@ -979,29 +1002,40 @@ is_fwd (const struct CadetConnection *c, * Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE * or a first CONNECTION_ACK directed to us. * - * @param connection Connection to confirm. + * @param c Connection to confirm. * @param fwd Should we send it FWD? (root->dest) * (First (~SYNACK) goes BCK, second (~ACK) goes FWD) */ static void -send_connection_ack (struct CadetConnection *connection, int fwd) +send_connection_ack (struct CadetConnection *c, int fwd) { + struct GNUNET_CADET_ConnectionACK msg; struct CadetTunnel *t; size_t size = sizeof (struct GNUNET_CADET_ConnectionACK); GCC_check_connections (); - t = connection->t; + t = c->t; LOG (GNUNET_ERROR_TYPE_INFO, "==> { C %s ACK} %19s on conn %s (%p) %s [%5u]\n", - GC_f2s (!fwd), "", GCC_2s (connection), connection, GC_f2s (fwd), size); - GCP_queue_add (get_hop (connection, fwd), NULL, - GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK, UINT16_MAX, 0, - size, connection, fwd, &conn_message_sent, NULL); - connection->pending_messages++; + GC_f2s (!fwd), "", GCC_2s (c), c, GC_f2s (fwd), size); + + msg.header.size = htons (sizeof (struct GNUNET_CADET_ConnectionACK)); + msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK); + msg.cid = c->id; + + GNUNET_assert (NULL == c->maintenance_q); + c->maintenance_q = GCP_send (get_hop (c, fwd), &msg.header, + GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK, 0, + c, fwd, + &conn_message_sent, NULL); + LOG (GNUNET_ERROR_TYPE_DEBUG, " C_P+ %p %u (conn`ACK)\n", + c, c->pending_messages); + c->pending_messages++; + if (CADET_TUNNEL_NEW == GCT_get_cstate (t)) GCT_change_cstate (t, CADET_TUNNEL_WAITING); - if (CADET_CONNECTION_READY != connection->state) - connection_change_state (connection, CADET_CONNECTION_SENT); + if (CADET_CONNECTION_READY != c->state) + connection_change_state (c, CADET_CONNECTION_SENT); GCC_check_connections (); } @@ -1042,17 +1076,15 @@ send_broken (struct CadetConnection *c, * @param connection_id Connection ID. * @param id1 Peer that has disconnected, probably local peer. * @param id2 Peer that has disconnected can be NULL if unknown. - * @param peer Peer to notify (neighbor who sent the connection). + * @param neighbor Peer to notify (neighbor who sent the connection). */ static void send_broken_unknown (const struct GNUNET_CADET_Hash *connection_id, const struct GNUNET_PeerIdentity *id1, const struct GNUNET_PeerIdentity *id2, - const struct GNUNET_PeerIdentity *peer_id) + struct CadetPeer *neighbor) { struct GNUNET_CADET_ConnectionBroken *msg; - struct CadetPeerQueue *q; - struct CadetPeer *neighbor; GCC_check_connections (); LOG (GNUNET_ERROR_TYPE_INFO, "--> BROKEN on unknown connection %s\n", @@ -1067,14 +1099,10 @@ send_broken_unknown (const struct GNUNET_CADET_Hash *connection_id, msg->peer2 = *id2; else memset (&msg->peer2, 0, sizeof (msg->peer2)); - neighbor = GCP_get (peer_id, GNUNET_NO); /* We MUST know neighbor. */ - GNUNET_assert (NULL != neighbor); - q = GCP_queue_add (neighbor, msg, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN, - UINT16_MAX, 2, - sizeof (struct GNUNET_CADET_ConnectionBroken), - NULL, GNUNET_SYSERR, /* connection, fwd */ - NULL, NULL); /* continuation */ - GNUNET_assert (NULL != q); + GNUNET_assert (NULL != GCP_send (neighbor, &msg->header, + UINT16_MAX, 2, + NULL, GNUNET_SYSERR, /* connection, fwd */ + NULL, NULL)); /* continuation */ GCC_check_connections (); } @@ -1310,38 +1338,6 @@ schedule_next_keepalive (struct CadetConnection *c, int fwd) } -/** - * @brief Re-initiate traffic on this connection if necessary. - * - * Check if there is traffic queued towards this peer - * and the core transmit handle is NULL (traffic was stalled). - * If so, call core tmt rdy. - * - * @param c Connection on which initiate traffic. - * @param fwd Is this about fwd traffic? - */ -static void -connection_unlock_queue (struct CadetConnection *c, int fwd) -{ - struct CadetPeer *peer; - - GCC_check_connections (); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "connection_unlock_queue %s on %s\n", - GC_f2s (fwd), GCC_2s (c)); - - if (GCC_is_terminal (c, fwd)) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " is terminal, can unlock!\n"); - return; - } - - peer = get_hop (c, fwd); - GCP_queue_unlock (peer, c); - GCC_check_connections (); -} - - /** * Cancel all transmissions that belong to a certain connection. * @@ -1356,7 +1352,6 @@ connection_cancel_queues (struct CadetConnection *c, int fwd) { struct CadetFlowControl *fc; - struct CadetPeer *peer; GCC_check_connections (); LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1380,8 +1375,6 @@ connection_cancel_queues (struct CadetConnection *c, GCC_cancel (fc->poll_msg); LOG (GNUNET_ERROR_TYPE_DEBUG, " cancelled POLL msg for fc %p\n", fc); } - peer = get_hop (c, fwd); - GCP_queue_cancel (peer, c); GCC_check_connections (); } @@ -1470,53 +1463,6 @@ connection_poll (void *cls) } -/** - * Resend all queued messages for a connection on other connections of the - * same tunnel, if possible. The connection WILL BE DESTROYED by this function. - * - * @param c Connection whose messages to resend. - * @param fwd Resend fwd messages? - */ -static void -resend_messages_and_destroy (struct CadetConnection *c, int fwd) -{ - struct GNUNET_MessageHeader *out_msg; - struct CadetTunnel *t = c->t; - struct CadetPeer *neighbor; - unsigned int pending; - int destroyed; - - GCC_check_connections (); - mark_destroyed (c); - - destroyed = GNUNET_NO; - neighbor = get_hop (c, fwd); - pending = c->pending_messages; - - while (NULL != (out_msg = GCP_connection_pop (neighbor, c, &destroyed))) - { - if (NULL != t) - GCT_resend_message (out_msg, t); - GNUNET_free (out_msg); - } - - /* All pending messages should have been popped, - * and the connection destroyed by the continuation. - */ - if (GNUNET_YES != destroyed) - { - if (0 != pending) - { - GNUNET_break (0); - GCC_debug (c, GNUNET_ERROR_TYPE_ERROR); - if (NULL != t) GCT_debug (t, GNUNET_ERROR_TYPE_ERROR); - } - GCC_destroy (c); - } - GCC_check_connections (); -} - - /** * Generic connection timeout implementation. * @@ -1529,10 +1475,7 @@ resend_messages_and_destroy (struct CadetConnection *c, int fwd) static void connection_timeout (struct CadetConnection *c, int fwd) { - struct CadetFlowControl *reverse_fc; - GCC_check_connections (); - reverse_fc = fwd ? &c->bck_fc : &c->fwd_fc; LOG (GNUNET_ERROR_TYPE_INFO, "Connection %s %s timed out. Destroying.\n", @@ -1546,17 +1489,13 @@ connection_timeout (struct CadetConnection *c, int fwd) return; } - /* If dest, salvage queued traffic. */ + /* If dest, send "broken" notification. */ if (GCC_is_terminal (c, fwd)) { - const struct GNUNET_PeerIdentity *next_hop; + struct CadetPeer *next_hop; - next_hop = GCP_get_id (fwd ? get_prev_hop (c) : get_next_hop (c)); + next_hop = fwd ? get_prev_hop (c) : get_next_hop (c); send_broken_unknown (&c->id, &my_full_id, NULL, next_hop); - if (0 < reverse_fc->queue_n) - resend_messages_and_destroy (c, !fwd); - GCC_check_connections (); - return; } GCC_destroy (c); @@ -1907,13 +1846,13 @@ add_to_peer (struct CadetConnection *c, * Log receipt of message on stderr (INFO level). * * @param message Message received. - * @param peer Peer who sent the message. - * @param hash Connection ID. + * @param peer Peer who sent the message. + * @param conn_id Connection ID of the message. */ static void log_message (const struct GNUNET_MessageHeader *message, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_CADET_Hash *hash) + const struct CadetPeer *peer, + const struct GNUNET_CADET_Hash *conn_id) { uint16_t size; uint16_t type; @@ -1933,8 +1872,8 @@ log_message (const struct GNUNET_MessageHeader *message, arrow = "--"; } LOG (GNUNET_ERROR_TYPE_INFO, "<%s %s on conn %s from %s, %6u bytes\n", - arrow, GC_m2s (type), GNUNET_h2s (GC_h2hc (hash)), - GNUNET_i2s (peer), (unsigned int) size); + arrow, GC_m2s (type), GNUNET_h2s (GC_h2hc (conn_id)), + GCP_2s(peer), (unsigned int) size); } /******************************************************************************/ @@ -1942,22 +1881,17 @@ log_message (const struct GNUNET_MessageHeader *message, /******************************************************************************/ /** - * Core handler for connection creation. + * Handler for connection creation. * - * @param cls Closure (unused). - * @param peer Sender (neighbor). - * @param message Message. - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close it (signal serious error) + * @param peer Message sender (neighbor). + * @param msg Message itself. */ -int -GCC_handle_create (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message) +void +GCC_handle_create (struct CadetPeer *peer, + const struct GNUNET_CADET_ConnectionCreate *msg) { - struct GNUNET_CADET_ConnectionCreate *msg; + const struct GNUNET_CADET_Hash *cid; struct GNUNET_PeerIdentity *id; - struct GNUNET_CADET_Hash *cid; struct CadetPeerPath *path; struct CadetPeer *dest_peer; struct CadetPeer *orig_peer; @@ -1966,38 +1900,26 @@ GCC_handle_create (void *cls, uint16_t size; GCC_check_connections (); - /* Check size */ - size = ntohs (message->size); - if (size < sizeof (struct GNUNET_CADET_ConnectionCreate)) - { - GNUNET_break_op (0); - return GNUNET_OK; - } + size = ntohs (msg->header.size); /* Calculate hops */ size -= sizeof (struct GNUNET_CADET_ConnectionCreate); - if (size % sizeof (struct GNUNET_PeerIdentity)) - { - GNUNET_break_op (0); - return GNUNET_OK; - } if (0 != size % sizeof (struct GNUNET_PeerIdentity)) { GNUNET_break_op (0); - return GNUNET_OK; + return; } size /= sizeof (struct GNUNET_PeerIdentity); if (1 > size) { GNUNET_break_op (0); - return GNUNET_OK; + return; } LOG (GNUNET_ERROR_TYPE_DEBUG, " path has %u hops.\n", size); /* Get parameters */ - msg = (struct GNUNET_CADET_ConnectionCreate *) message; cid = &msg->cid; - log_message (message, peer, cid); + log_message (&msg->header, peer, cid); id = (struct GNUNET_PeerIdentity *) &msg[1]; LOG (GNUNET_ERROR_TYPE_DEBUG, " origin: %s\n", GNUNET_i2s (id)); @@ -2012,16 +1934,15 @@ GCC_handle_create (void *cls, /* Path was malformed, probably our own ID was not in it. */ GNUNET_STATISTICS_update (stats, "# malformed paths", 1, GNUNET_NO); GNUNET_break_op (0); - return GNUNET_OK; + return; } - if (0 == own_pos) { /* We received this request from a neighbor, we cannot be origin */ GNUNET_STATISTICS_update (stats, "# fake paths", 1, GNUNET_NO); GNUNET_break_op (0); path_destroy (path); - return GNUNET_OK; + return; } LOG (GNUNET_ERROR_TYPE_DEBUG, " Own position: %u\n", own_pos); @@ -2035,14 +1956,14 @@ GCC_handle_create (void *cls, GNUNET_break (0); path_destroy (path); GCC_check_connections (); - return GNUNET_OK; + return; } send_broken_unknown (cid, &my_full_id, GNUNET_PEER_resolve2 (path->peers[own_pos + 1]), peer); path_destroy (path); GCC_check_connections (); - return GNUNET_OK; + return; } GCP_add_path_to_all (path, GNUNET_NO); connection_reset_timeout (c, GNUNET_YES); @@ -2092,40 +2013,32 @@ GCC_handle_create (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, " Retransmitting.\n"); GCP_add_path (dest_peer, path_duplicate (path), GNUNET_NO); GCP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_NO); - GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, - GNUNET_YES, GNUNET_YES, - NULL, NULL)); + GNUNET_assert (NULL == + GCC_send_prebuilt_message (&msg->header, 0, 0, c, + GNUNET_YES, GNUNET_YES, + NULL, NULL)); } path_destroy (path); GCC_check_connections (); - return GNUNET_OK; } /** - * Core handler for path confirmations. + * Handler for connection confirmations. * - * @param cls closure - * @param message message - * @param peer peer identity this notification is about - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close it (signal serious error) + * @param peer Message sender (neighbor). + * @param msg Message itself. */ -int -GCC_handle_confirm (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message) +void +GCC_handle_confirm (struct CadetPeer *peer, + const struct GNUNET_CADET_ConnectionACK *msg) { - struct GNUNET_CADET_ConnectionACK *msg; struct CadetConnection *c; - struct CadetPeerPath *p; - struct CadetPeer *pi; enum CadetConnectionState oldstate; int fwd; GCC_check_connections (); - msg = (struct GNUNET_CADET_ConnectionACK *) message; - log_message (message, peer, &msg->cid); + log_message (&msg->header, peer, &msg->cid); c = connection_get (&msg->cid); if (NULL == c) { @@ -2135,30 +2048,30 @@ GCC_handle_confirm (void *cls, " don't know the connection!\n"); send_broken_unknown (&msg->cid, &my_full_id, NULL, peer); GCC_check_connections (); - return GNUNET_OK; + return; } - if (GNUNET_NO != c->destroy) { GNUNET_assert (CADET_CONNECTION_DESTROYED == c->state); + GNUNET_STATISTICS_update (stats, "# control on dying connection", + 1, GNUNET_NO); LOG (GNUNET_ERROR_TYPE_DEBUG, "connection %s being destroyed, ignoring confirm\n", GCC_2s (c)); GCC_check_connections (); - return GNUNET_OK; + return; } oldstate = c->state; - LOG (GNUNET_ERROR_TYPE_DEBUG, " via peer %s\n", GNUNET_i2s (peer)); - pi = GCP_get (peer, GNUNET_YES); - if (get_next_hop (c) == pi) + LOG (GNUNET_ERROR_TYPE_DEBUG, " via peer %s\n", GCP_2s (peer)); + if (get_next_hop (c) == peer) { LOG (GNUNET_ERROR_TYPE_DEBUG, " SYNACK\n"); fwd = GNUNET_NO; if (CADET_CONNECTION_SENT == oldstate) connection_change_state (c, CADET_CONNECTION_ACK); } - else if (get_prev_hop (c) == pi) + else if (get_prev_hop (c) == peer) { LOG (GNUNET_ERROR_TYPE_DEBUG, " FINAL ACK\n"); fwd = GNUNET_YES; @@ -2166,17 +2079,18 @@ GCC_handle_confirm (void *cls, } else { + GNUNET_STATISTICS_update (stats, "# control on connection from wrong peer", + 1, GNUNET_NO); GNUNET_break_op (0); - return GNUNET_OK; + return; } connection_reset_timeout (c, fwd); /* Add path to peers? */ - p = c->path; - if (NULL != p) + if (NULL != c->path) { - GCP_add_path_to_all (p, GNUNET_YES); + GCP_add_path_to_all (c->path, GNUNET_YES); } else { @@ -2184,12 +2098,12 @@ GCC_handle_confirm (void *cls, } /* Message for us as creator? */ - if (GCC_is_origin (c, GNUNET_YES)) + if (GNUNET_YES == GCC_is_origin (c, GNUNET_YES)) { if (GNUNET_NO != fwd) { - GNUNET_break_op (0); - return GNUNET_OK; + GNUNET_break (0); + return; } LOG (GNUNET_ERROR_TYPE_DEBUG, " Connection (SYN)ACK for us!\n"); @@ -2197,7 +2111,7 @@ GCC_handle_confirm (void *cls, if (CADET_CONNECTION_SENT == oldstate) connection_reset_timeout (c, GNUNET_YES); - /* Change connection state */ + /* Change connection state, send ACK */ connection_change_state (c, CADET_CONNECTION_READY); send_connection_ack (c, GNUNET_YES); @@ -2205,7 +2119,7 @@ GCC_handle_confirm (void *cls, if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t)) GCT_change_cstate (c->t, CADET_TUNNEL_READY); GCC_check_connections (); - return GNUNET_OK; + return; } /* Message for us as destination? */ @@ -2213,8 +2127,8 @@ GCC_handle_confirm (void *cls, { if (GNUNET_YES != fwd) { - GNUNET_break_op (0); - return GNUNET_OK; + GNUNET_break (0); + return; } LOG (GNUNET_ERROR_TYPE_DEBUG, " Connection ACK for us!\n"); @@ -2226,41 +2140,34 @@ GCC_handle_confirm (void *cls, if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t)) GCT_change_cstate (c->t, CADET_TUNNEL_READY); GCC_check_connections (); - return GNUNET_OK; + return; } LOG (GNUNET_ERROR_TYPE_DEBUG, " not for us, retransmitting...\n"); GNUNET_assert (NULL == - GCC_send_prebuilt_message (message, 0, 0, c, fwd, + GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd, GNUNET_YES, NULL, NULL)); GCC_check_connections (); - return GNUNET_OK; + return; } /** - * Core handler for notifications of broken connections. + * Handler for notifications of broken connections. * - * @param cls Closure (unused). - * @param id Peer identity of sending neighbor. - * @param message Message. - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close it (signal serious error) + * @param peer Message sender (neighbor). + * @param msg Message itself. */ -int -GCC_handle_broken (void* cls, - const struct GNUNET_PeerIdentity* id, - const struct GNUNET_MessageHeader* message) +void +GCC_handle_broken (struct CadetPeer *peer, + const struct GNUNET_CADET_ConnectionBroken *msg) { - struct GNUNET_CADET_ConnectionBroken *msg; struct CadetConnection *c; struct CadetTunnel *t; - int pending; int fwd; GCC_check_connections (); - msg = (struct GNUNET_CADET_ConnectionBroken *) message; - log_message (message, id, &msg->cid); + log_message (&msg->header, peer, &msg->cid); LOG (GNUNET_ERROR_TYPE_DEBUG, " regarding %s\n", GNUNET_i2s (&msg->peer1)); LOG (GNUNET_ERROR_TYPE_DEBUG, " regarding %s\n", @@ -2269,13 +2176,21 @@ GCC_handle_broken (void* cls, if (NULL == c) { LOG (GNUNET_ERROR_TYPE_DEBUG, " duplicate CONNECTION_BROKEN\n"); + GNUNET_STATISTICS_update (stats, "# duplicate CONNECTION_BROKEN", + 1, GNUNET_NO); GCC_check_connections (); - return GNUNET_OK; + return; } t = c->t; - fwd = is_fwd (c, id); + fwd = is_fwd (c, peer); + if (GNUNET_SYSERR == fwd) + { + GNUNET_break_op (0); + GCC_check_connections (); + return; + } mark_destroyed (c); if (GCC_is_terminal (c, fwd)) { @@ -2286,7 +2201,7 @@ GCC_handle_broken (void* cls, /* A terminal connection should not have 't' set to NULL. */ GNUNET_break (0); GCC_debug (c, GNUNET_ERROR_TYPE_ERROR); - return GNUNET_OK; + return; } endpoint = GCP_get_short (c->path->peers[c->path->length - 1], GNUNET_YES); if (2 < c->path->length) @@ -2297,44 +2212,35 @@ GCC_handle_broken (void* cls, GCT_remove_connection (t, c); c->t = NULL; - pending = c->pending_messages; - if (0 < pending) - resend_messages_and_destroy (c, !fwd); - else - GCC_destroy (c); + GCC_destroy (c); } else { - GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd, - GNUNET_YES, NULL, NULL)); + GNUNET_assert (NULL == + GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd, + GNUNET_YES, NULL, NULL)); connection_cancel_queues (c, !fwd); } GCC_check_connections (); - return GNUNET_OK; + return; } /** - * Core handler for tunnel destruction + * Handler for notifications of destroyed connections. * - * @param cls Closure (unused). - * @param peer Peer identity of sending neighbor. - * @param message Message. - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close it (signal serious error) + * @param peer Message sender (neighbor). + * @param msg Message itself. */ -int -GCC_handle_destroy (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message) +void +GCC_handle_destroy (struct CadetPeer *peer, + const struct GNUNET_CADET_ConnectionDestroy *msg) { - const struct GNUNET_CADET_ConnectionDestroy *msg; struct CadetConnection *c; int fwd; GCC_check_connections (); - msg = (const struct GNUNET_CADET_ConnectionDestroy *) message; - log_message (message, peer, &msg->cid); + log_message (&msg->header, peer, &msg->cid); c = connection_get (&msg->cid); if (NULL == c) { @@ -2346,20 +2252,23 @@ GCC_handle_destroy (void *cls, "# control on unknown connection", 1, GNUNET_NO); LOG (GNUNET_ERROR_TYPE_DEBUG, - " connection unknown: already destroyed?\n"); + " connection unknown destroyed: previously destroyed?\n"); GCC_check_connections (); - return GNUNET_OK; + return; } + fwd = is_fwd (c, peer); if (GNUNET_SYSERR == fwd) { - GNUNET_break_op (0); /* FIXME */ - return GNUNET_OK; + GNUNET_break_op (0); + GCC_check_connections (); + return; } + if (GNUNET_NO == GCC_is_terminal (c, fwd)) { GNUNET_assert (NULL == - GCC_send_prebuilt_message (message, 0, 0, c, fwd, + GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd, GNUNET_YES, NULL, NULL)); } else if (0 == c->pending_messages) @@ -2367,7 +2276,7 @@ GCC_handle_destroy (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, " directly destroying connection!\n"); GCC_destroy (c); GCC_check_connections (); - return GNUNET_OK; + return; } mark_destroyed (c); if (NULL != c->t) @@ -2376,65 +2285,188 @@ GCC_handle_destroy (void *cls, c->t = NULL; } GCC_check_connections (); - return GNUNET_OK; + return; } /** - * Check the message against internal state and test if it goes FWD or BCK. - * - * Updates the PID, state and timeout values for the connection. + * Handler for cadet network traffic hop-by-hop acks. * - * @param message Message to check. It must belong to an existing connection. - * @param minimum_size The message cannot be smaller than this value. - * @param cid Connection ID (even if @a c is NULL, the ID is still needed). - * @param c Connection this message should belong. If NULL, check fails. - * @param neighbor Neighbor that sent the message. + * @param peer Message sender (neighbor). + * @param msg Message itself. */ -static int -check_message (const struct GNUNET_MessageHeader *message, - size_t minimum_size, - const struct GNUNET_CADET_Hash* cid, - struct CadetConnection *c, - const struct GNUNET_PeerIdentity *neighbor, - uint32_t pid) +void +GCC_handle_ack (struct CadetPeer *peer, + const struct GNUNET_CADET_ACK *msg) { - GNUNET_PEER_Id neighbor_id; + struct CadetConnection *c; struct CadetFlowControl *fc; - struct CadetPeer *hop; + uint32_t ack; int fwd; - uint16_t type; - - /* Check size */ - if (ntohs (message->size) < minimum_size) - { - GNUNET_break_op (0); - LOG (GNUNET_ERROR_TYPE_WARNING, "Size %u < %u\n", - ntohs (message->size), minimum_size); - return GNUNET_SYSERR; - } - /* Check connection */ + GCC_check_connections (); + log_message (&msg->header, peer, &msg->cid); + c = connection_get (&msg->cid); if (NULL == c) { GNUNET_STATISTICS_update (stats, - "# unknown connection", - 1, GNUNET_NO); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "%s on unknown connection %s\n", - GC_m2s (ntohs (message->type)), - GNUNET_h2s (GC_h2hc (cid))); - send_broken_unknown (cid, + "# ack on unknown connection", + 1, + GNUNET_NO); + send_broken_unknown (&msg->cid, &my_full_id, NULL, - neighbor); - return GNUNET_SYSERR; + peer); + GCC_check_connections (); + return; + } + + /* Is this a forward or backward ACK? */ + if (get_next_hop (c) == peer) + { + fc = &c->fwd_fc; + fwd = GNUNET_YES; + } + else if (get_prev_hop (c) == peer) + { + fc = &c->bck_fc; + fwd = GNUNET_NO; + } + else + { + GNUNET_break_op (0); + return; + } + + ack = ntohl (msg->ack); + LOG (GNUNET_ERROR_TYPE_DEBUG, " %s ACK %u (was %u)\n", + GC_f2s (fwd), ack, fc->last_ack_recv); + if (GC_is_pid_bigger (ack, fc->last_ack_recv)) + fc->last_ack_recv = ack; + + /* Cancel polling if the ACK is big enough. */ + if (NULL != fc->poll_task && + GC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " Cancel poll\n"); + GNUNET_SCHEDULER_cancel (fc->poll_task); + fc->poll_task = NULL; + fc->poll_time = GNUNET_TIME_UNIT_SECONDS; + } + + GCC_check_connections (); +} + + +/** + * Handler for cadet network traffic hop-by-hop data counter polls. + * + * @param peer Message sender (neighbor). + * @param msg Message itself. + */ +void +GCC_handle_poll (struct CadetPeer *peer, + const struct GNUNET_CADET_Poll *msg) +{ + struct CadetConnection *c; + struct CadetFlowControl *fc; + uint32_t pid; + int fwd; + + GCC_check_connections (); + log_message (&msg->header, peer, &msg->cid); + c = connection_get (&msg->cid); + if (NULL == c) + { + GNUNET_STATISTICS_update (stats, "# poll on unknown connection", 1, + GNUNET_NO); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "POLL message on unknown connection %s!\n", + GNUNET_h2s (GC_h2hc (&msg->cid))); + send_broken_unknown (&msg->cid, + &my_full_id, + NULL, + peer); + GCC_check_connections (); + return; + } + + /* Is this a forward or backward ACK? + * Note: a poll should never be needed in a loopback case, + * since there is no possiblility of packet loss there, so + * this way of discerining FWD/BCK should not be a problem. + */ + if (get_next_hop (c) == peer) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " FWD FC\n"); + fc = &c->fwd_fc; + } + else if (get_prev_hop (c) == peer) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " BCK FC\n"); + fc = &c->bck_fc; + } + else + { + GNUNET_break_op (0); + return; + } + + pid = ntohl (msg->pid); + LOG (GNUNET_ERROR_TYPE_DEBUG, " PID %u, OLD %u\n", pid, fc->last_pid_recv); + fc->last_pid_recv = pid; + fwd = fc == &c->bck_fc; + GCC_send_ack (c, fwd, GNUNET_YES); + GCC_check_connections (); +} + + +/** + * Check the message against internal state and test if it goes FWD or BCK. + * + * Updates the PID, state and timeout values for the connection. + * + * @param message Message to check. It must belong to an existing connection. + * @param cid Connection ID (even if @a c is NULL, the ID is still needed). + * @param c Connection this message should belong. If NULL, check fails. + * @param sender Neighbor that sent the message. + * + * @return #GNUNET_YES if the message goes FWD. + * #GNUNET_NO if it goes BCK. + * #GNUNET_SYSERR if there is an error (unauthorized sender, ...). + */ +static int +check_message (const struct GNUNET_MessageHeader *message, + const struct GNUNET_CADET_Hash* cid, + struct CadetConnection *c, + struct CadetPeer *sender, + uint32_t pid) +{ + struct CadetFlowControl *fc; + struct CadetPeer *hop; + int fwd; + uint16_t type; + + /* Check connection */ + if (NULL == c) + { + GNUNET_STATISTICS_update (stats, + "# unknown connection", + 1, GNUNET_NO); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s on unknown connection %s\n", + GC_m2s (ntohs (message->type)), + GNUNET_h2s (GC_h2hc (cid))); + send_broken_unknown (cid, + &my_full_id, + NULL, + sender); + return GNUNET_SYSERR; } /* Check if origin is as expected */ - neighbor_id = GNUNET_PEER_search (neighbor); hop = get_prev_hop (c); - if (neighbor_id == GCP_get_short_id (hop)) + if (sender == hop) { fwd = GNUNET_YES; } @@ -2442,7 +2474,7 @@ check_message (const struct GNUNET_MessageHeader *message, { hop = get_next_hop (c); GNUNET_break (hop == c->next_peer); - if (neighbor_id == GCP_get_short_id (hop)) + if (sender == hop) { fwd = GNUNET_NO; } @@ -2508,123 +2540,111 @@ check_message (const struct GNUNET_MessageHeader *message, /** - * Generic handler for cadet network encrypted traffic. + * Handler for key exchange traffic (Axolotl KX). * - * @param peer Peer identity this notification is about. - * @param msg Encrypted message. - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close it (signal serious error) + * @param peer Message sender (neighbor). + * @param msg Message itself. */ -static int -handle_cadet_encrypted (const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message) +void +GCC_handle_kx (struct CadetPeer *peer, + const struct GNUNET_CADET_KX *msg) { - const struct GNUNET_CADET_AX *ax_msg; const struct GNUNET_CADET_Hash* cid; struct CadetConnection *c; - size_t minimum_size; - size_t overhead; - uint32_t pid; int fwd; GCC_check_connections (); - GNUNET_assert (GNUNET_MESSAGE_TYPE_CADET_AX == ntohs (message->type)); - overhead = sizeof (struct GNUNET_CADET_AX); - ax_msg = (const struct GNUNET_CADET_AX *) message; - cid = &ax_msg->cid; - pid = ntohl (ax_msg->pid); - log_message (message, peer, cid); - - minimum_size = sizeof (struct GNUNET_MessageHeader) + overhead; + cid = &msg->cid; + log_message (&msg->header, peer, cid); + c = connection_get (cid); - fwd = check_message (message, - minimum_size, + fwd = check_message (&msg->header, cid, c, peer, - pid); + 0); /* If something went wrong, discard message. */ if (GNUNET_SYSERR == fwd) { + GNUNET_break_op (0); GCC_check_connections (); - return GNUNET_OK; + return; } /* Is this message for us? */ if (GCC_is_terminal (c, fwd)) { - GNUNET_STATISTICS_update (stats, "# received encrypted", 1, GNUNET_NO); - + LOG (GNUNET_ERROR_TYPE_DEBUG, " message for us!\n"); + GNUNET_STATISTICS_update (stats, "# received KX", 1, GNUNET_NO); if (NULL == c->t) { - GNUNET_break (GNUNET_NO != c->destroy); - return GNUNET_OK; + GNUNET_break (0); + return; } - GCT_handle_encrypted (c->t, message); - GCC_send_ack (c, fwd, GNUNET_NO); + GCT_handle_kx (c->t, &msg[1].header); GCC_check_connections (); - return GNUNET_OK; + return; } /* Message not for us: forward to next hop */ LOG (GNUNET_ERROR_TYPE_DEBUG, " not for us, retransmitting...\n"); GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO); - GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd, + GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd, GNUNET_NO, NULL, NULL)); GCC_check_connections (); - return GNUNET_OK; } /** - * Generic handler for cadet network encrypted traffic. + * Handler for encrypted cadet network traffic (channel mgmt, data). * - * @param peer Peer identity this notification is about. - * @param msg Encrypted message. - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close it (signal serious error) + * @param peer Message sender (neighbor). + * @param msg Message itself. */ -static int -handle_cadet_kx (const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_CADET_KX *msg) +void +GCC_handle_encrypted (struct CadetPeer *peer, + const struct GNUNET_CADET_AX *msg) { const struct GNUNET_CADET_Hash* cid; struct CadetConnection *c; - size_t expected_size; + uint32_t pid; int fwd; GCC_check_connections (); cid = &msg->cid; + pid = ntohl (msg->pid); log_message (&msg->header, peer, cid); - expected_size = sizeof (struct GNUNET_CADET_KX) - + sizeof (struct GNUNET_MessageHeader); c = connection_get (cid); fwd = check_message (&msg->header, - expected_size, cid, c, peer, - 0); + pid); /* If something went wrong, discard message. */ if (GNUNET_SYSERR == fwd) - return GNUNET_OK; + { + GNUNET_break_op (0); + GCC_check_connections (); + return; + } /* Is this message for us? */ if (GCC_is_terminal (c, fwd)) { - LOG (GNUNET_ERROR_TYPE_DEBUG, " message for us!\n"); - GNUNET_STATISTICS_update (stats, "# received KX", 1, GNUNET_NO); + GNUNET_STATISTICS_update (stats, "# received encrypted", 1, GNUNET_NO); + if (NULL == c->t) { - GNUNET_break (0); - return GNUNET_OK; + GNUNET_break (GNUNET_NO != c->destroy); + return; } - GCT_handle_kx (c->t, &msg[1].header); + GCT_handle_encrypted (c->t, &msg->header); + GCC_send_ack (c, fwd, GNUNET_NO); GCC_check_connections (); - return GNUNET_OK; + return; } /* Message not for us: forward to next hop */ @@ -2633,259 +2653,6 @@ handle_cadet_kx (const struct GNUNET_PeerIdentity *peer, GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd, GNUNET_NO, NULL, NULL)); GCC_check_connections (); - return GNUNET_OK; -} - - -/** - * Core handler for key exchange traffic (ephemeral key, ping, pong). - * - * @param cls Closure (unused). - * @param message Message received. - * @param peer Peer who sent the message. - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close it (signal serious error) - */ -int -GCC_handle_kx (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message) -{ - GCC_check_connections (); - return handle_cadet_kx (peer, (struct GNUNET_CADET_KX *) message); -} - - -/** - * Core handler for encrypted cadet network traffic (channel mgmt, data). - * - * @param cls Closure (unused). - * @param message Message received. - * @param peer Peer who sent the message. - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close it (signal serious error) - */ -int -GCC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message) -{ - GCC_check_connections (); - return handle_cadet_encrypted (peer, message); -} - - -/** - * Core handler for cadet network traffic point-to-point acks. - * - * @param cls closure - * @param message message - * @param peer peer identity this notification is about - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close it (signal serious error) - */ -int -GCC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message) -{ - struct GNUNET_CADET_ACK *msg; - struct CadetConnection *c; - struct CadetFlowControl *fc; - GNUNET_PEER_Id id; - uint32_t ack; - int fwd; - - GCC_check_connections (); - msg = (struct GNUNET_CADET_ACK *) message; - log_message (message, peer, &msg->cid); - c = connection_get (&msg->cid); - if (NULL == c) - { - GNUNET_STATISTICS_update (stats, - "# ack on unknown connection", - 1, - GNUNET_NO); - send_broken_unknown (&msg->cid, - &my_full_id, - NULL, - peer); - GCC_check_connections (); - return GNUNET_OK; - } - - /* Is this a forward or backward ACK? */ - id = GNUNET_PEER_search (peer); - if (GCP_get_short_id (get_next_hop (c)) == id) - { - fc = &c->fwd_fc; - fwd = GNUNET_YES; - } - else if (GCP_get_short_id (get_prev_hop (c)) == id) - { - fc = &c->bck_fc; - fwd = GNUNET_NO; - } - else - { - GNUNET_break_op (0); - return GNUNET_OK; - } - - ack = ntohl (msg->ack); - LOG (GNUNET_ERROR_TYPE_DEBUG, " %s ACK %u (was %u)\n", - GC_f2s (fwd), ack, fc->last_ack_recv); - if (GC_is_pid_bigger (ack, fc->last_ack_recv)) - fc->last_ack_recv = ack; - - /* Cancel polling if the ACK is big enough. */ - if (NULL != fc->poll_task && - GC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent)) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " Cancel poll\n"); - GNUNET_SCHEDULER_cancel (fc->poll_task); - fc->poll_task = NULL; - fc->poll_time = GNUNET_TIME_UNIT_SECONDS; - } - - connection_unlock_queue (c, fwd); - GCC_check_connections (); - return GNUNET_OK; -} - - -/** - * Core handler for cadet network traffic point-to-point ack polls. - * - * @param cls closure - * @param message message - * @param peer peer identity this notification is about - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close it (signal serious error) - */ -int -GCC_handle_poll (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message) -{ - struct GNUNET_CADET_Poll *msg; - struct CadetConnection *c; - struct CadetFlowControl *fc; - GNUNET_PEER_Id id; - uint32_t pid; - int fwd; - - GCC_check_connections (); - msg = (struct GNUNET_CADET_Poll *) message; - log_message (message, peer, &msg->cid); - c = connection_get (&msg->cid); - if (NULL == c) - { - GNUNET_STATISTICS_update (stats, "# poll on unknown connection", 1, - GNUNET_NO); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "POLL message on unknown connection %s!\n", - GNUNET_h2s (GC_h2hc (&msg->cid))); - send_broken_unknown (&msg->cid, - &my_full_id, - NULL, - peer); - GCC_check_connections (); - return GNUNET_OK; - } - - /* Is this a forward or backward ACK? - * Note: a poll should never be needed in a loopback case, - * since there is no possiblility of packet loss there, so - * this way of discerining FWD/BCK should not be a problem. - */ - id = GNUNET_PEER_search (peer); - if (GCP_get_short_id (get_next_hop (c)) == id) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " FWD FC\n"); - fc = &c->fwd_fc; - } - else if (GCP_get_short_id (get_prev_hop (c)) == id) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " BCK FC\n"); - fc = &c->bck_fc; - } - else - { - GNUNET_break_op (0); - return GNUNET_OK; - } - - pid = ntohl (msg->pid); - LOG (GNUNET_ERROR_TYPE_DEBUG, " PID %u, OLD %u\n", pid, fc->last_pid_recv); - fc->last_pid_recv = pid; - fwd = fc == &c->bck_fc; - GCC_send_ack (c, fwd, GNUNET_YES); - GCC_check_connections (); - - return GNUNET_OK; -} - - -/** - * Send an ACK on the appropriate connection/channel, depending on - * the direction and the position of the peer. - * - * @param c Which connection to send the hop-by-hop ACK. - * @param fwd Is this a fwd ACK? (will go dest->root). - * @param force Send the ACK even if suboptimal (e.g. requested by POLL). - */ -void -GCC_send_ack (struct CadetConnection *c, int fwd, int force) -{ - unsigned int buffer; - - GCC_check_connections (); - LOG (GNUNET_ERROR_TYPE_DEBUG, "GCC send %s ACK on %s\n", - GC_f2s (fwd), GCC_2s (c)); - - if (NULL == c) - { - GNUNET_break (0); - return; - } - - if (GNUNET_NO != c->destroy) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " being destroyed, why bother...\n"); - GCC_check_connections (); - return; - } - - /* Get available buffer space */ - if (GCC_is_terminal (c, fwd)) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " getting from all channels\n"); - buffer = GCT_get_channels_buffer (c->t); - } - else - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " getting from one connection\n"); - buffer = GCC_get_buffer (c, fwd); - } - LOG (GNUNET_ERROR_TYPE_DEBUG, " buffer available: %u\n", buffer); - if (0 == buffer && GNUNET_NO == force) - { - GCC_check_connections (); - return; - } - - /* Send available buffer space */ - if (GCC_is_origin (c, fwd)) - { - GNUNET_assert (NULL != c->t); - LOG (GNUNET_ERROR_TYPE_DEBUG, " sending on channels...\n"); - GCT_unchoke_channels (c->t); - } - else - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " sending on connection\n"); - send_ack (c, buffer, fwd, force); - } - GCC_check_connections (); } @@ -2974,12 +2741,13 @@ GCC_shutdown (void) * Create a connection. * * @param cid Connection ID (either created locally or imposed remotely). - * @param t Tunnel this connection belongs to (or NULL); + * @param t Tunnel this connection belongs to (or NULL for transit connections); * @param path Path this connection has to use (copy is made). * @param own_pos Own position in the @c path path. * - * @return Newly created connection, NULL in case of error (own id not in path). - */ + * @return Newly created connection. + * NULL in case of error: own id not in path, wrong neighbors, ... +*/ struct CadetConnection * GCC_new (const struct GNUNET_CADET_Hash *cid, struct CadetTunnel *t, @@ -3036,6 +2804,14 @@ GCC_new (const struct GNUNET_CADET_Hash *cid, } +/** + * Connection is no longer needed: destroy it. + * + * Cancels all pending traffic (including possible DESTROY messages), all + * maintenance tasks and removes the connection from neighbor peers and tunnel. + * + * @param c Connection to destroy. + */ void GCC_destroy (struct CadetConnection *c) { @@ -3428,6 +3204,7 @@ GCC_is_direct (struct CadetConnection *c) * @param message Message to send. Function makes a copy of it. * If message is not hop-by-hop, decrements TTL of copy. * @param payload_type Type of payload, in case the message is encrypted. + * @param payload_id ID of the payload (PID, ACK, ...). * @param c Connection on which this message is transmitted. * @param fwd Is this a fwd message? * @param force Force the connection to accept the message (buffer overfill). @@ -3446,7 +3223,7 @@ GCC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, { struct CadetFlowControl *fc; struct CadetConnectionQueue *q; - void *data; + struct GNUNET_MessageHeader *copy; size_t size; uint16_t type; int droppable; @@ -3460,8 +3237,8 @@ GCC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, } size = ntohs (message->size); - data = GNUNET_malloc (size); - GNUNET_memcpy (data, message, size); + copy = GNUNET_malloc (size); + GNUNET_memcpy (copy, message, size); type = ntohs (message->type); LOG (GNUNET_ERROR_TYPE_INFO, "--> %s (%s %4u) on conn %s (%p) %s [%5u]\n", @@ -3478,7 +3255,7 @@ GCC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, struct GNUNET_CADET_ConnectionBroken *bmsg; case GNUNET_MESSAGE_TYPE_CADET_AX: - axmsg = (struct GNUNET_CADET_AX *) data; + axmsg = (struct GNUNET_CADET_AX *) copy; axmsg->cid = c->id; LOG (GNUNET_ERROR_TYPE_DEBUG, " Q_N+ %p %u\n", fc, fc->queue_n); LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid sent %u\n", fc->last_pid_sent); @@ -3494,41 +3271,42 @@ GCC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, break; case GNUNET_MESSAGE_TYPE_CADET_KX: - kmsg = (struct GNUNET_CADET_KX *) data; + kmsg = (struct GNUNET_CADET_KX *) copy; kmsg->cid = c->id; break; case GNUNET_MESSAGE_TYPE_CADET_ACK: - amsg = (struct GNUNET_CADET_ACK *) data; + amsg = (struct GNUNET_CADET_ACK *) copy; amsg->cid = c->id; LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ntohl (amsg->ack)); droppable = GNUNET_NO; break; case GNUNET_MESSAGE_TYPE_CADET_POLL: - pmsg = (struct GNUNET_CADET_Poll *) data; + pmsg = (struct GNUNET_CADET_Poll *) copy; pmsg->cid = c->id; LOG (GNUNET_ERROR_TYPE_DEBUG, " POLL %u\n", ntohl (pmsg->pid)); droppable = GNUNET_NO; break; case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY: - dmsg = (struct GNUNET_CADET_ConnectionDestroy *) data; + dmsg = (struct GNUNET_CADET_ConnectionDestroy *) copy; dmsg->cid = c->id; break; case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN: - bmsg = (struct GNUNET_CADET_ConnectionBroken *) data; + bmsg = (struct GNUNET_CADET_ConnectionBroken *) copy; bmsg->cid = c->id; break; case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE: case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK: + GNUNET_break (0); /* Should've used specific functions. */ break; default: GNUNET_break (0); - GNUNET_free (data); + GNUNET_free (copy); return NULL; } @@ -3543,7 +3321,7 @@ GCC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, { fc->queue_n--; } - GNUNET_free (data); + GNUNET_free (copy); return NULL; /* Drop this message */ } @@ -3553,12 +3331,14 @@ GCC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, q = GNUNET_new (struct CadetConnectionQueue); q->forced = !droppable; - q->q = GCP_queue_add (get_hop (c, fwd), data, type, payload_type, payload_id, - size, c, fwd, &conn_message_sent, q); - if (NULL == q->q) + q->peer_q = GCP_send (get_hop (c, fwd), copy, + payload_type, payload_id, + c, fwd, + &conn_message_sent, q); + if (NULL == q->peer_q) { LOG (GNUNET_ERROR_TYPE_DEBUG, "dropping msg on %s, NULL q\n", GCC_2s (c)); - GNUNET_free (data); + GNUNET_free (copy); GNUNET_free (q); GCC_check_connections (); return NULL; @@ -3584,8 +3364,8 @@ GCC_cancel (struct CadetConnectionQueue *q) { LOG (GNUNET_ERROR_TYPE_DEBUG, "! GCC cancel message\n"); - /* queue destroy calls message_sent, which calls q->cont and frees q */ - GCP_queue_destroy (q->q, GNUNET_YES, GNUNET_NO, 0); + /* send_cancel calls message_sent, which calls q->cont and frees q */ + GCP_send_cancel (q->peer_q); GCC_check_connections (); } @@ -3594,35 +3374,116 @@ GCC_cancel (struct CadetConnectionQueue *q) * Sends a CREATE CONNECTION message for a path to a peer. * Changes the connection and tunnel states if necessary. * - * @param connection Connection to create. + * @param c Connection to create. */ void -GCC_send_create (struct CadetConnection *connection) +GCC_send_create (struct CadetConnection *c) { enum CadetTunnelCState state; size_t size; GCC_check_connections (); size = sizeof (struct GNUNET_CADET_ConnectionCreate); - size += connection->path->length * sizeof (struct GNUNET_PeerIdentity); + size += c->path->length * sizeof (struct GNUNET_PeerIdentity); + { + /* Allocate message on the stack */ + unsigned char cbuf[size]; + struct GNUNET_CADET_ConnectionCreate *msg; + struct GNUNET_PeerIdentity *peers; + + msg = (struct GNUNET_CADET_ConnectionCreate *) cbuf; + msg->header.size = htons (size); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE); + msg->cid = *GCC_get_id (c); + peers = (struct GNUNET_PeerIdentity *) &msg[1]; + for (int i = 0; i < c->path->length; i++) + { + GNUNET_PEER_resolve (c->path->peers[i], peers++); + } + GNUNET_assert (NULL == c->maintenance_q); + c->maintenance_q = GCP_send (get_next_hop (c), + &msg->header, + GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, 0, + c, GNUNET_YES, + &conn_message_sent, NULL); + } LOG (GNUNET_ERROR_TYPE_INFO, "==> %s %19s on conn %s (%p) FWD [%5u]\n", GC_m2s (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE), "", - GCC_2s (connection), connection, size); + GCC_2s (c), c, size); LOG (GNUNET_ERROR_TYPE_DEBUG, " C_P+ %p %u (create)\n", - connection, connection->pending_messages); - connection->pending_messages++; - - connection->maintenance_q = - GCP_queue_add (get_next_hop (connection), NULL, - GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, UINT16_MAX, 0, - size, connection, GNUNET_YES, &conn_message_sent, NULL); + c, c->pending_messages); + c->pending_messages++; - state = GCT_get_cstate (connection->t); + state = GCT_get_cstate (c->t); if (CADET_TUNNEL_SEARCHING == state || CADET_TUNNEL_NEW == state) - GCT_change_cstate (connection->t, CADET_TUNNEL_WAITING); - if (CADET_CONNECTION_NEW == connection->state) - connection_change_state (connection, CADET_CONNECTION_SENT); + GCT_change_cstate (c->t, CADET_TUNNEL_WAITING); + if (CADET_CONNECTION_NEW == c->state) + connection_change_state (c, CADET_CONNECTION_SENT); + GCC_check_connections (); +} + + +/** + * Send an ACK on the appropriate connection/channel, depending on + * the direction and the position of the peer. + * + * @param c Which connection to send the hop-by-hop ACK. + * @param fwd Is this a fwd ACK? (will go dest->root). + * @param force Send the ACK even if suboptimal (e.g. requested by POLL). + */ +void +GCC_send_ack (struct CadetConnection *c, int fwd, int force) +{ + unsigned int buffer; + + GCC_check_connections (); + LOG (GNUNET_ERROR_TYPE_DEBUG, "GCC send %s ACK on %s\n", + GC_f2s (fwd), GCC_2s (c)); + + if (NULL == c) + { + GNUNET_break (0); + return; + } + + if (GNUNET_NO != c->destroy) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " being destroyed, why bother...\n"); + GCC_check_connections (); + return; + } + + /* Get available buffer space */ + if (GCC_is_terminal (c, fwd)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " getting from all channels\n"); + buffer = GCT_get_channels_buffer (c->t); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " getting from one connection\n"); + buffer = GCC_get_buffer (c, fwd); + } + LOG (GNUNET_ERROR_TYPE_DEBUG, " buffer available: %u\n", buffer); + if (0 == buffer && GNUNET_NO == force) + { + GCC_check_connections (); + return; + } + + /* Send available buffer space */ + if (GNUNET_YES == GCC_is_origin (c, fwd)) + { + GNUNET_assert (NULL != c->t); + LOG (GNUNET_ERROR_TYPE_DEBUG, " sending on channels...\n"); + GCT_unchoke_channels (c->t); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " sending on connection\n"); + send_ack (c, buffer, fwd, force); + } GCC_check_connections (); } diff --git a/src/cadet/gnunet-service-cadet_connection.h b/src/cadet/gnunet-service-cadet_connection.h index e96e2f24c..6302cd898 100644 --- a/src/cadet/gnunet-service-cadet_connection.h +++ b/src/cadet/gnunet-service-cadet_connection.h @@ -118,90 +118,86 @@ typedef void /** - * Core handler for connection creation. + * Handler for connection creation. * - * @param cls Closure (unused). - * @param peer Sender (neighbor). - * @param message Message. - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close it (signal serious error) + * @param peer Message sender (neighbor). + * @param msg Message itself. */ -int -GCC_handle_create (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message); +void +GCC_handle_create (struct CadetPeer *peer, + const struct GNUNET_CADET_ConnectionCreate *msg); /** - * Core handler for path confirmations. + * Handler for connection confirmations. * - * @param cls closure - * @param message message - * @param peer peer identity this notification is about - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close it (signal serious error) + * @param peer Message sender (neighbor). + * @param msg Message itself. */ -int -GCC_handle_confirm (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message); +void +GCC_handle_confirm (struct CadetPeer *peer, + const struct GNUNET_CADET_ConnectionACK *msg); /** - * Core handler for notifications of broken paths + * Handler for notifications of broken connections. * - * @param cls Closure (unused). - * @param id Peer identity of sending neighbor. - * @param message Message. - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close it (signal serious error) + * @param peer Message sender (neighbor). + * @param msg Message itself. */ -int -GCC_handle_broken (void* cls, - const struct GNUNET_PeerIdentity* id, - const struct GNUNET_MessageHeader* message); +void +GCC_handle_broken (struct CadetPeer *peer, + const struct GNUNET_CADET_ConnectionBroken *msg); /** - * Core handler for tunnel destruction + * Handler for notifications of destroyed connections. * - * @param cls Closure (unused). - * @param peer Peer identity of sending neighbor. - * @param message Message. - * - * @return GNUNET_OK to keep the connection open, - * GNUNET_SYSERR to close it (signal serious error) + * @param peer Message sender (neighbor). + * @param msg Message itself. */ -int -GCC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message); +void +GCC_handle_destroy (struct CadetPeer *peer, + const struct GNUNET_CADET_ConnectionDestroy *msg); /** - * Core handler for key exchange traffic (ephemeral key, ping, pong). + * Handler for cadet network traffic hop-by-hop acks. * - * @param cls Closure (unused). - * @param message Message received. - * @param peer Peer who sent the message. + * @param peer Message sender (neighbor). + * @param msg Message itself. + */ +void +GCC_handle_ack (struct CadetPeer *peer, + const struct GNUNET_CADET_ACK *msg); + +/** + * Handler for cadet network traffic hop-by-hop data counter polls. * - * @return GNUNET_OK to keep the connection open, - * GNUNET_SYSERR to close it (signal serious error) + * @param peer Message sender (neighbor). + * @param msg Message itself. */ -int -GCC_handle_kx (void *cls, const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message); +void +GCC_handle_poll (struct CadetPeer *peer, + const struct GNUNET_CADET_Poll *msg); /** - * Core handler for encrypted cadet network traffic (channel mgmt, data). + * Handler for key exchange traffic (Axolotl KX). * - * @param cls Closure (unused). - * @param message Message received. - * @param peer Peer who sent the message. + * @param peer Message sender (neighbor). + * @param msg Message itself. + */ +void +GCC_handle_kx (struct CadetPeer *peer, + const struct GNUNET_CADET_KX *msg); + +/** + * Handler for encrypted cadet network traffic (channel mgmt, data). * - * @return GNUNET_OK to keep the connection open, - * GNUNET_SYSERR to close it (signal serious error) + * @param peer Message sender (neighbor). + * @param msg Message itself. */ -int -GCC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message); +void +GCC_handle_encrypted (struct CadetPeer *peer, + const struct GNUNET_CADET_AX *msg); /** * Core handler for axolotl key exchange traffic. @@ -229,34 +225,6 @@ int GCC_handle_ax (void *cls, const struct GNUNET_PeerIdentity *peer, struct GNUNET_MessageHeader *message); -/** - * Core handler for cadet network traffic point-to-point acks. - * - * @param cls closure - * @param message message - * @param peer peer identity this notification is about - * - * @return GNUNET_OK to keep the connection open, - * GNUNET_SYSERR to close it (signal serious error) - */ -int -GCC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message); - -/** - * Core handler for cadet network traffic point-to-point ack polls. - * - * @param cls closure - * @param message message - * @param peer peer identity this notification is about - * - * @return GNUNET_OK to keep the connection open, - * GNUNET_SYSERR to close it (signal serious error) - */ -int -GCC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message); - /** * Core handler for cadet keepalives. * @@ -301,11 +269,12 @@ GCC_shutdown (void); * Create a connection. * * @param cid Connection ID (either created locally or imposed remotely). - * @param t Tunnel this connection belongs to (or NULL); + * @param t Tunnel this connection belongs to (or NULL for transit connections); * @param path Path this connection has to use (copy is made). * @param own_pos Own position in the @c path path. * - * @return Newly created connection, NULL in case of error (own id not in path). + * @return Newly created connection. + * NULL in case of error: own id not in path, wrong neighbors, ... */ struct CadetConnection * GCC_new (const struct GNUNET_CADET_Hash *cid, @@ -525,6 +494,7 @@ GCC_cancel (struct CadetConnectionQueue *q); * @param message Message to send. Function makes a copy of it. * If message is not hop-by-hop, decrements TTL of copy. * @param payload_type Type of payload, in case the message is encrypted. + * @param payload_id ID of the payload (PID, ACK, ...). * @param c Connection on which this message is transmitted. * @param fwd Is this a fwd message? * @param force Force the connection to accept the message (buffer overfill). diff --git a/src/cadet/gnunet-service-cadet_local.c b/src/cadet/gnunet-service-cadet_local.c index 303eaee86..9be1224c1 100644 --- a/src/cadet/gnunet-service-cadet_local.c +++ b/src/cadet/gnunet-service-cadet_local.c @@ -720,8 +720,6 @@ show_peer_iterator (void *cls, struct CadetPeer *p = value; struct CadetTunnel *t; - GCP_debug (p, GNUNET_ERROR_TYPE_ERROR); - t = GCP_get_tunnel (p); if (NULL != t) GCT_debug (t, GNUNET_ERROR_TYPE_ERROR); diff --git a/src/cadet/gnunet-service-cadet_peer.c b/src/cadet/gnunet-service-cadet_peer.c index 64d9168fd..5ccd8f014 100644 --- a/src/cadet/gnunet-service-cadet_peer.c +++ b/src/cadet/gnunet-service-cadet_peer.c @@ -44,164 +44,145 @@ /******************************** STRUCTS **********************************/ /******************************************************************************/ + /** - * Struct containing info about a queued transmission to this peer + * Struct containing all information regarding a given peer */ -struct CadetPeerQueue +struct CadetPeer { /** - * DLL next + * ID of the peer */ - struct CadetPeerQueue *next; + GNUNET_PEER_Id id; /** - * DLL previous + * Last time we heard from this peer */ - struct CadetPeerQueue *prev; + struct GNUNET_TIME_Absolute last_contact; /** - * Peer this transmission is directed to. + * Paths to reach the peer, ordered by ascending hop count */ - struct CadetPeer *peer; + struct CadetPeerPath *path_head; /** - * Connection this message belongs to. + * Paths to reach the peer, ordered by ascending hop count */ - struct CadetConnection *c; + struct CadetPeerPath *path_tail; /** - * Is FWD in c? + * Handle to stop the DHT search for paths to this peer */ - int fwd; + struct GCD_search_handle *search_h; /** - * Pointer to info stucture used as cls. + * Handle to stop the DHT search for paths to this peer */ - void *cls; + struct GNUNET_SCHEDULER_Task *search_delayed; /** - * Type of message + * Tunnel to this peer, if any. */ - uint16_t type; + struct CadetTunnel *tunnel; /** - * Type of message + * Connections that go through this peer; indexed by tid. */ - uint16_t payload_type; + struct GNUNET_CONTAINER_MultiHashMap *connections; /** - * Type of message + * Handle for core transmissions. */ - uint32_t payload_id; + struct GNUNET_MQ_Handle *core_mq; /** - * Size of the message + * How many messages are in the queue to this peer. */ - size_t size; + unsigned int queue_n; /** - * Set when this message starts waiting for CORE. + * Hello message. */ - struct GNUNET_TIME_Absolute start_waiting; + struct GNUNET_HELLO_Message* hello; /** - * Function to call on sending. + * Handle to us offering the HELLO to the transport. */ - GCP_sent cont; + struct GNUNET_TRANSPORT_OfferHelloHandle *hello_offer; /** - * Closure for callback. + * Handle to our ATS request asking ATS to suggest an address + * to TRANSPORT for this peer (to establish a direct link). */ - void *cont_cls; + struct GNUNET_ATS_ConnectivitySuggestHandle *connectivity_suggestion; + }; /** - * Struct containing all information regarding a given peer + * Information about a queued message on the peer level. */ -struct CadetPeer -{ - /** - * ID of the peer - */ - GNUNET_PEER_Id id; - - /** - * Last time we heard from this peer - */ - struct GNUNET_TIME_Absolute last_contact; +struct CadetPeerQueue { /** - * Paths to reach the peer, ordered by ascending hop count - */ - struct CadetPeerPath *path_head; - - /** - * Paths to reach the peer, ordered by ascending hop count - */ - struct CadetPeerPath *path_tail; - - /** - * Handle to stop the DHT search for paths to this peer + * Envelope to cancel message before MQ sends it. */ - struct GCD_search_handle *search_h; + struct GNUNET_MQ_Envelope *env; /** - * Handle to stop the DHT search for paths to this peer + * Peer (neighbor) this message is being sent to. */ - struct GNUNET_SCHEDULER_Task *search_delayed; + struct CadetPeer *peer; /** - * Tunnel to this peer, if any. + * Continuation to call to notify higher layers about message sent. */ - struct CadetTunnel *tunnel; + GCP_sent cont; /** - * Connections that go through this peer; indexed by tid. + * Closure for @a cont. */ - struct GNUNET_CONTAINER_MultiHashMap *connections; + void *cont_cls; /** - * Handle for queued transmissions + * Time when message was queued for sending. */ - struct GNUNET_CORE_TransmitHandle *core_transmit; + struct GNUNET_TIME_Absolute queue_timestamp; /** - * Timestamp + * #GNUNET_YES if message was management traffic (POLL, ACK, ...). */ - struct GNUNET_TIME_Absolute tmt_time; + int management_traffic; /** - * Transmission queue to core DLL head + * Message type. */ - struct CadetPeerQueue *queue_head; + uint16_t type; /** - * Transmission queue to core DLL tail + * Message size. */ - struct CadetPeerQueue *queue_tail; + uint16_t size; /** - * How many messages are in the queue to this peer. + * Type of the message's payload, if it was encrypted data. */ - unsigned int queue_n; + uint16_t payload_type; /** - * Hello message. + *ID of the payload (PID, ACK #, ...). */ - struct GNUNET_HELLO_Message* hello; + uint16_t payload_id; /** - * Handle to us offering the HELLO to the transport. + * Connection this message was sent on. */ - struct GNUNET_TRANSPORT_OfferHelloHandle *hello_offer; + struct CadetConnection *c; /** - * Handle to our ATS request asking ATS to suggest an address - * to TRANSPORT for this peer (to establish a direct link). + * Direction in @a c this message was send on (#GNUNET_YES = FWD). */ - struct GNUNET_ATS_ConnectivitySuggestHandle *connectivity_suggestion; - + int c_fwd; }; @@ -260,98 +241,6 @@ static struct GNUNET_ATS_ConnectivityHandle *ats_ch; static int in_shutdown; -/******************************************************************************/ -/***************************** DEBUG *********************************/ -/******************************************************************************/ - -/** - * Log all kinds of info about the queueing status of a peer. - * - * @param p Peer whose queue to show. - * @param level Error level to use for logging. - */ -static void -queue_debug (const struct CadetPeer *p, enum GNUNET_ErrorType level) -{ - struct GNUNET_TIME_Relative core_wait_time; - struct CadetPeerQueue *q; - int do_log; - - do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK), - "cadet-p2p", - __FILE__, __FUNCTION__, __LINE__); - if (0 == do_log) - return; - - LOG2 (level, "QQQ Message queue towards %s\n", GCP_2s (p)); - LOG2 (level, "QQQ queue length: %u\n", p->queue_n); - LOG2 (level, "QQQ core tmt rdy: %p\n", p->core_transmit); - if (NULL != p->core_transmit) - { - core_wait_time = GNUNET_TIME_absolute_get_duration (p->tmt_time); - LOG2 (level, "QQQ core called %s ago\n", - GNUNET_STRINGS_relative_time_to_string (core_wait_time, GNUNET_NO)); - } - for (q = p->queue_head; NULL != q; q = q->next) - { - LOG2 (level, "QQQ - %s %s on %s\n", - GC_m2s (q->type), GC_f2s (q->fwd), GCC_2s (q->c)); - LOG2 (level, "QQQ payload %s, %u\n", - GC_m2s (q->payload_type), q->payload_id); - LOG2 (level, "QQQ size: %u bytes\n", q->size); - } - - LOG2 (level, "QQQ End queue towards %s\n", GCP_2s (p)); -} - - -/** - * Log all kinds of info about a peer. - * - * @param peer Peer. - */ -void -GCP_debug (const struct CadetPeer *p, enum GNUNET_ErrorType level) -{ - struct CadetPeerPath *path; - unsigned int conns; - int do_log; - - do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK), - "cadet-p2p", - __FILE__, __FUNCTION__, __LINE__); - if (0 == do_log) - return; - - if (NULL == p) - { - LOG2 (level, "PPP DEBUG PEER NULL\n"); - return; - } - - LOG2 (level, "PPP DEBUG PEER %s\n", GCP_2s (p)); - LOG2 (level, "PPP last contact %s\n", - GNUNET_STRINGS_absolute_time_to_string (p->last_contact)); - for (path = p->path_head; NULL != path; path = path->next) - { - char *s; - - s = path_2s (path); - LOG2 (level, "PPP path: %s\n", s); - GNUNET_free (s); - } - - LOG2 (level, "PPP core transmit handle %p\n", p->core_transmit); - LOG2 (level, "PPP DHT GET handle %p\n", p->search_h); - conns = 0; - if (NULL != p->connections) - conns += GNUNET_CONTAINER_multihashmap_size (p->connections); - LOG2 (level, "PPP # connections over link to peer: %u\n", conns); - queue_debug (p, level); - LOG2 (level, "PPP DEBUG END\n"); -} - - /******************************************************************************/ /***************************** CORE HELPERS *********************************/ /******************************************************************************/ @@ -415,12 +304,16 @@ pop_direct_path (struct CadetPeer *peer) /** * Method called whenever a given peer connects. * - * @param cls closure - * @param peer peer identity this notification is about + * @param cls Core closure (unused). + * @param peer Peer identity this notification is about + * @param mq Message Queue to this peer. + * + * @return Internal closure for handlers (CadetPeer struct). */ -static void -core_connect (void *cls, - const struct GNUNET_PeerIdentity *peer) +static void * +core_connect_handler (void *cls, + const struct GNUNET_PeerIdentity *peer, + struct GNUNET_MQ_Handle *mq) { struct CadetPeer *neighbor; struct CadetPeerPath *path; @@ -431,6 +324,8 @@ core_connect (void *cls, sizeof (own_id), "%s", GNUNET_i2s (&my_full_id)); + + /* Save a path to the neighbor */ neighbor = GCP_get (peer, GNUNET_YES); if (myid == neighbor->id) { @@ -448,11 +343,14 @@ core_connect (void *cls, path = path_new (2); path->peers[1] = neighbor->id; GNUNET_PEER_change_rc (neighbor->id, 1); + GNUNET_assert (NULL == neighbor->core_mq); + neighbor->core_mq = mq; } path->peers[0] = myid; GNUNET_PEER_change_rc (myid, 1); GCP_add_path (neighbor, path, GNUNET_YES); + /* Create the connections hashmap */ GNUNET_assert (NULL == neighbor->connections); neighbor->connections = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_NO); GNUNET_STATISTICS_update (stats, @@ -462,42 +360,47 @@ core_connect (void *cls, if ( (NULL != GCP_get_tunnel (neighbor)) && (0 > GNUNET_CRYPTO_cmp_peer_identity (&my_full_id, peer)) ) + { GCP_connect (neighbor); + } GCC_check_connections (); + + return neighbor; } /** * Method called whenever a peer disconnects. * - * @param cls closure - * @param peer peer identity this notification is about + * @param cls Core closure (unused). + * @param peer Peer identity this notification is about. + * @param internal_cls Internal closure (CadetPeer struct). */ static void -core_disconnect (void *cls, - const struct GNUNET_PeerIdentity *peer) +core_disconnect_handler (void *cls, + const struct GNUNET_PeerIdentity *peer, + void *internal_cls) { - struct CadetPeer *p; + struct CadetPeer *p = internal_cls; struct CadetPeerPath *direct_path; char own_id[16]; GCC_check_connections (); strncpy (own_id, GNUNET_i2s (&my_full_id), 16); own_id[15] = '\0'; - p = GNUNET_CONTAINER_multipeermap_get (peers, peer); - if (NULL == p) - { - GNUNET_break (GNUNET_YES == in_shutdown); - return; - } if (myid == p->id) + { LOG (GNUNET_ERROR_TYPE_INFO, "DISCONNECTED %s (self)\n", own_id); + } else + { LOG (GNUNET_ERROR_TYPE_INFO, "DISCONNECTED %s <= %s\n", own_id, GNUNET_i2s (peer)); + p->core_mq = NULL; + } direct_path = pop_direct_path (p); if (NULL != p->connections) { @@ -507,12 +410,6 @@ core_disconnect (void *cls, GNUNET_CONTAINER_multihashmap_destroy (p->connections); p->connections = NULL; } - if (NULL != p->core_transmit) - { - GNUNET_CORE_notify_transmit_ready_cancel (p->core_transmit); - p->core_transmit = NULL; - p->tmt_time.abs_value_us = 0; - } GNUNET_STATISTICS_update (stats, "# peers", -1, @@ -522,230 +419,349 @@ core_disconnect (void *cls, } -/** - * Functions to handle messages from core - */ -static struct GNUNET_CORE_MessageHandler core_handlers[] = { - {&GCC_handle_create, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, 0}, - {&GCC_handle_confirm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK, - sizeof (struct GNUNET_CADET_ConnectionACK)}, - {&GCC_handle_broken, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN, - sizeof (struct GNUNET_CADET_ConnectionBroken)}, - {&GCC_handle_destroy, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY, - sizeof (struct GNUNET_CADET_ConnectionDestroy)}, - {&GCC_handle_ack, GNUNET_MESSAGE_TYPE_CADET_ACK, - sizeof (struct GNUNET_CADET_ACK)}, - {&GCC_handle_poll, GNUNET_MESSAGE_TYPE_CADET_POLL, - sizeof (struct GNUNET_CADET_Poll)}, - {&GCC_handle_kx, GNUNET_MESSAGE_TYPE_CADET_KX, 0}, - {&GCC_handle_encrypted, GNUNET_MESSAGE_TYPE_CADET_AX, 0}, - {NULL, 0, 0} -}; - +/******************************************************************************/ +/******************************************************************************/ +/******************************************************************************/ +/******************************************************************************/ +/******************************************************************************/ /** - * To be called on core init/fail. + * Check if the create_connection message has the appropriate size. * - * @param cls Closure (config) - * @param identity the public identity of this peer + * @param cls Closure (unused). + * @param msg Message to check. + * + * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise. */ -static void -core_init (void *cls, - const struct GNUNET_PeerIdentity *identity) +static int +check_create (void *cls, const struct GNUNET_CADET_ConnectionCreate *msg) { - const struct GNUNET_CONFIGURATION_Handle *c = cls; - static int i = 0; + uint16_t size; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Core init\n"); - if (0 != memcmp (identity, &my_full_id, sizeof (my_full_id))) + size = ntohs (msg->header.size); + if (size < sizeof (*msg)) { - LOG (GNUNET_ERROR_TYPE_ERROR, _("Wrong CORE service\n")); - LOG (GNUNET_ERROR_TYPE_ERROR, " core id %s\n", GNUNET_i2s (identity)); - LOG (GNUNET_ERROR_TYPE_ERROR, " my id %s\n", GNUNET_i2s (&my_full_id)); - GNUNET_CORE_disconnect (core_handle); - core_handle = GNUNET_CORE_connect (c, /* Main configuration */ - NULL, /* Closure passed to CADET functions */ - &core_init, /* Call core_init once connected */ - &core_connect, /* Handle connects */ - &core_disconnect, /* remove peers on disconnects */ - NULL, /* Don't notify about all incoming messages */ - GNUNET_NO, /* For header only in notification */ - NULL, /* Don't notify about all outbound messages */ - GNUNET_NO, /* For header-only out notification */ - core_handlers); /* Register these handlers */ - if (10 < i++) - GNUNET_assert (0); + GNUNET_break_op (0); + return GNUNET_NO; } - GML_start (); + return GNUNET_YES; } - /** - * Core callback to write a pre-constructed data packet to core buffer - * - * @param cls Closure (CadetTransmissionDescriptor with data in "data" member). - * @param size Number of bytes available in buf. - * @param buf Where the to write the message. - * - * @return number of bytes written to buf - */ -static size_t -send_core_data_raw (void *cls, size_t size, void *buf) + * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE + * + * @param cls Closure (CadetPeer for neighbor that sent the message). + * @param msg Message itself. + */ +static void +handle_create (void *cls, const struct GNUNET_CADET_ConnectionCreate *msg) { - struct GNUNET_MessageHeader *msg = cls; - size_t total_size; - - GNUNET_assert (NULL != msg); - total_size = ntohs (msg->size); - - if (total_size > size) - { - GNUNET_break (0); - return 0; - } - GNUNET_memcpy (buf, msg, total_size); - GNUNET_free (cls); - return total_size; + struct CadetPeer *peer = cls; + GCC_handle_create (peer, msg); } /** - * Function to send a create connection message to a peer. + * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK * - * @param c Connection to create. - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param cls Closure (CadetPeer for neighbor that sent the message). + * @param msg Message itself. */ -static size_t -send_core_connection_create (struct CadetConnection *c, size_t size, void *buf) +static void +handle_confirm (void *cls, const struct GNUNET_CADET_ConnectionACK *msg) { - struct GNUNET_CADET_ConnectionCreate *msg; - struct GNUNET_PeerIdentity *peer_ptr; - const struct CadetPeerPath *p = GCC_get_path (c); - size_t size_needed; - int i; - - if (NULL == p) - return 0; - - LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending CONNECTION CREATE...\n"); - size_needed = - sizeof (struct GNUNET_CADET_ConnectionCreate) + - p->length * sizeof (struct GNUNET_PeerIdentity); - - if (size < size_needed || NULL == buf) - { - GNUNET_break (0); - return 0; - } - msg = (struct GNUNET_CADET_ConnectionCreate *) buf; - msg->header.size = htons (size_needed); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE); - msg->cid = *GCC_get_id (c); + struct CadetPeer *peer = cls; + GCC_handle_confirm (peer, msg); +} - peer_ptr = (struct GNUNET_PeerIdentity *) &msg[1]; - for (i = 0; i < p->length; i++) - { - GNUNET_PEER_resolve (p->peers[i], peer_ptr++); - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "CONNECTION CREATE (%u bytes long) sent!\n", - size_needed); - return size_needed; +/** + * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN + * + * @param cls Closure (CadetPeer for neighbor that sent the message). + * @param msg Message itself. + */ +static void +handle_broken (void *cls, const struct GNUNET_CADET_ConnectionBroken *msg) +{ + struct CadetPeer *peer = cls; + GCC_handle_broken (peer, msg); } /** - * Creates a path ack message in buf and frees all unused resources. - * - * @param c Connection to send an ACK on. - * @param size number of bytes available in buf - * @param buf where the callee should write the message + * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY * - * @return number of bytes written to buf + * @param cls Closure (CadetPeer for neighbor that sent the message). + * @param msg Message itself. */ -static size_t -send_core_connection_ack (struct CadetConnection *c, size_t size, void *buf) +static void +handle_destroy (void *cls, const struct GNUNET_CADET_ConnectionDestroy *msg) { - struct GNUNET_CADET_ConnectionACK *msg = buf; + struct CadetPeer *peer = cls; + GCC_handle_destroy (peer, msg); +} - LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending CONNECTION ACK...\n"); - if (sizeof (struct GNUNET_CADET_ConnectionACK) > size) - { - GNUNET_break (0); - return 0; - } - msg->header.size = htons (sizeof (struct GNUNET_CADET_ConnectionACK)); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK); - msg->cid = *GCC_get_id (c); - LOG (GNUNET_ERROR_TYPE_DEBUG, "CONNECTION ACK sent!\n"); - return sizeof (struct GNUNET_CADET_ConnectionACK); +/** + * Handle for #GNUNET_MESSAGE_TYPE_CADET_ACK + * + * @param cls Closure (CadetPeer for neighbor that sent the message). + * @param msg Message itself. + */ +static void +handle_ack (void *cls, const struct GNUNET_CADET_ACK *msg) +{ + struct CadetPeer *peer = cls; + GCC_handle_ack (peer, msg); } -/******************************************************************************/ -/******************************** STATIC ***********************************/ -/******************************************************************************/ +/** + * Handle for #GNUNET_MESSAGE_TYPE_CADET_POLL + * + * @param cls Closure (CadetPeer for neighbor that sent the message). + * @param msg Message itself. + */ +static void +handle_poll (void *cls, const struct GNUNET_CADET_Poll *msg) +{ + struct CadetPeer *peer = cls; + GCC_handle_poll (peer, msg); +} /** - * Get priority for a queued message. + * Check if the Key eXchange message has the appropriate size. * - * @param q Queued message + * @param cls Closure (unused). + * @param msg Message to check. * - * @return CORE priority to use. + * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise. */ -static enum GNUNET_CORE_Priority -get_priority (struct CadetPeerQueue *q) +static int +check_kx (void *cls, const struct GNUNET_CADET_KX *msg) { - enum GNUNET_CORE_Priority low; - enum GNUNET_CORE_Priority high; + uint16_t size; + uint16_t expected_size; - if (NULL == q) - { - GNUNET_break (0); - return GNUNET_CORE_PRIO_BACKGROUND; - } + size = ntohs (msg->header.size); + expected_size = sizeof (struct GNUNET_CADET_KX) + + sizeof (struct GNUNET_MessageHeader); - /* Relayed traffic has lower priority, our own traffic has higher */ - if (NULL == q->c || GNUNET_NO == GCC_is_origin (q->c, q->fwd)) - { - low = GNUNET_CORE_PRIO_BEST_EFFORT; - high = GNUNET_CORE_PRIO_URGENT; - } - else + if (size < expected_size) { - low = GNUNET_CORE_PRIO_URGENT; - high = GNUNET_CORE_PRIO_CRITICAL_CONTROL; + GNUNET_break_op (0); + return GNUNET_NO; } + return GNUNET_YES; +} - /* Bulky payload has lower priority, control traffic has higher. */ - if (GNUNET_MESSAGE_TYPE_CADET_AX == q->type) - return low; - return high; +/** + * Handle for #GNUNET_MESSAGE_TYPE_CADET_KX + * + * @param cls Closure (CadetPeer for neighbor that sent the message). + * @param msg Message itself. + */ +static void +handle_kx (void *cls, const struct GNUNET_CADET_KX *msg) +{ + struct CadetPeer *peer = cls; + GCC_handle_kx (peer, msg); } /** - * Destroy the peer_info and free any allocated resources linked to it + * Check if the encrypted message has the appropriate size. * - * @param peer The peer_info to destroy. - * @return #GNUNET_OK on success + * @param cls Closure (unused). + * @param msg Message to check. + * + * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise. */ static int -peer_destroy (struct CadetPeer *peer) +check_encrypted (void *cls, const struct GNUNET_CADET_AX *msg) { - struct GNUNET_PeerIdentity id; - struct CadetPeerPath *p; - struct CadetPeerPath *nextp; + uint16_t size; + uint16_t minimum_size; - GNUNET_PEER_resolve (peer->id, &id); - GNUNET_PEER_change_rc (peer->id, -1); + size = ntohs (msg->header.size); + minimum_size = sizeof (struct GNUNET_CADET_AX) + + sizeof (struct GNUNET_MessageHeader); - LOG (GNUNET_ERROR_TYPE_INFO, + if (size < minimum_size) + { + GNUNET_break_op (0); + return GNUNET_NO; + } + return GNUNET_YES; +} + +/** + * Handle for #GNUNET_MESSAGE_TYPE_CADET_AX (AXolotl encrypted traffic). + * + * @param cls Closure (CadetPeer for neighbor that sent the message). + * @param msg Message itself. + */ +static void +handle_encrypted (void *cls, const struct GNUNET_CADET_AX *msg) +{ + struct CadetPeer *peer = cls; + GCC_handle_encrypted (peer, msg); +} + + +/** + * To be called on core init/fail. + * + * @param cls Closure (config) + * @param identity The public identity of this peer. + */ +static void +core_init_notify (void *cls, + const struct GNUNET_PeerIdentity *identity); + + +static void +connect_to_core (const struct GNUNET_CONFIGURATION_Handle *c) +{ + struct GNUNET_MQ_MessageHandler core_handlers[] = { + GNUNET_MQ_hd_var_size (create, + GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, + struct GNUNET_CADET_ConnectionCreate, + NULL), + GNUNET_MQ_hd_fixed_size (confirm, + GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK, + struct GNUNET_CADET_ConnectionACK, + NULL), + GNUNET_MQ_hd_fixed_size (broken, + GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN, + struct GNUNET_CADET_ConnectionBroken, + NULL), + GNUNET_MQ_hd_fixed_size (destroy, + GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY, + struct GNUNET_CADET_ConnectionDestroy, + NULL), + GNUNET_MQ_hd_fixed_size (ack, + GNUNET_MESSAGE_TYPE_CADET_ACK, + struct GNUNET_CADET_ACK, + NULL), + GNUNET_MQ_hd_fixed_size (poll, + GNUNET_MESSAGE_TYPE_CADET_POLL, + struct GNUNET_CADET_Poll, + NULL), + GNUNET_MQ_hd_var_size (kx, + GNUNET_MESSAGE_TYPE_CADET_KX, + struct GNUNET_CADET_KX, + NULL), + GNUNET_MQ_hd_var_size (encrypted, + GNUNET_MESSAGE_TYPE_CADET_AX, + struct GNUNET_CADET_AX, + NULL), + GNUNET_MQ_handler_end () + }; + core_handle = GNUNET_CORE_connecT (c, NULL, + &core_init_notify, + &core_connect_handler, + &core_disconnect_handler, + core_handlers); +} + +/******************************************************************************/ +/******************************************************************************/ +/******************************************************************************/ +/******************************************************************************/ +/******************************************************************************/ + +/** + * To be called on core init/fail. + * + * @param cls Closure (config) + * @param identity The public identity of this peer. + */ +static void +core_init_notify (void *cls, + const struct GNUNET_PeerIdentity *core_identity) +{ + const struct GNUNET_CONFIGURATION_Handle *c = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, "Core init\n"); + if (0 != memcmp (core_identity, &my_full_id, sizeof (my_full_id))) + { + LOG (GNUNET_ERROR_TYPE_ERROR, _("Wrong CORE service\n")); + LOG (GNUNET_ERROR_TYPE_ERROR, " core id %s\n", GNUNET_i2s (core_identity)); + LOG (GNUNET_ERROR_TYPE_ERROR, " my id %s\n", GNUNET_i2s (&my_full_id)); + GNUNET_CORE_disconnecT (core_handle); + connect_to_core (c); + return; + } + GML_start (); +} + + +/******************************************************************************/ +/******************************** STATIC ***********************************/ +/******************************************************************************/ + + +/** + * Get priority for a queued message. + * + * @param q Queued message + * + * @return CORE priority to use. + * + * FIXME make static + * FIXME use when sending + */ +enum GNUNET_CORE_Priority +get_priority (struct CadetPeerQueue *q) +{ + enum GNUNET_CORE_Priority low; + enum GNUNET_CORE_Priority high; + + if (NULL == q) + { + GNUNET_break (0); + return GNUNET_CORE_PRIO_BACKGROUND; + } + + /* Relayed traffic has lower priority, our own traffic has higher */ + if (NULL == q->c || GNUNET_NO == GCC_is_origin (q->c, q->c_fwd)) + { + low = GNUNET_CORE_PRIO_BEST_EFFORT; + high = GNUNET_CORE_PRIO_URGENT; + } + else + { + low = GNUNET_CORE_PRIO_URGENT; + high = GNUNET_CORE_PRIO_CRITICAL_CONTROL; + } + + /* Bulky payload has lower priority, control traffic has higher. */ + if (GNUNET_MESSAGE_TYPE_CADET_AX == q->type) + return low; + return high; +} + + +/** + * Destroy the peer_info and free any allocated resources linked to it + * + * @param peer The peer_info to destroy. + * @return #GNUNET_OK on success + */ +static int +peer_destroy (struct CadetPeer *peer) +{ + struct GNUNET_PeerIdentity id; + struct CadetPeerPath *p; + struct CadetPeerPath *nextp; + + GNUNET_PEER_resolve (peer->id, &id); + GNUNET_PEER_change_rc (peer->id, -1); + + LOG (GNUNET_ERROR_TYPE_INFO, "destroying peer %s\n", GNUNET_i2s (&id)); @@ -784,20 +800,6 @@ peer_destroy (struct CadetPeer *peer) GNUNET_ATS_connectivity_suggest_cancel (peer->connectivity_suggestion); peer->connectivity_suggestion = NULL; } - while (NULL != peer->queue_head) - { - /* This function destroys the current peer->queue_head but - * replaces it with the next in the queue, so it is correct - * to while() here. - */ - GCP_queue_destroy (peer->queue_head, GNUNET_YES, GNUNET_NO, 0); - } - if (NULL != peer->core_transmit) - { - GNUNET_break (0); /* GCP_queue_destroy should've cancelled it! */ - GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit); - peer->core_transmit = NULL; - } GNUNET_free_non_null (peer->hello); GNUNET_free (peer); @@ -831,7 +833,6 @@ shutdown_peer (void *cls, } - /** * Check if peer is searching for a path (either active or delayed search). * @@ -995,64 +996,6 @@ peer_get_best_path (const struct CadetPeer *peer) } -/** - * Is this queue element sendable? - * - * - All management traffic is always sendable. - * - For payload traffic, check the connection flow control. - * - * @param q Queue element to inspect. - * @return #GNUNET_YES if it is sendable, #GNUNET_NO otherwise. - */ -static int -queue_is_sendable (struct CadetPeerQueue *q) -{ - /* Is PID-independent? */ - switch (q->type) - { - case GNUNET_MESSAGE_TYPE_CADET_ACK: - case GNUNET_MESSAGE_TYPE_CADET_POLL: - case GNUNET_MESSAGE_TYPE_CADET_KX: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN: - return GNUNET_YES; - - case GNUNET_MESSAGE_TYPE_CADET_AX: - break; - - default: - GNUNET_break (0); - } - - return GCC_is_sendable (q->c, q->fwd); -} - - -/** - * Get first sendable message. - * - * @param peer The destination peer. - * - * @return First transmittable message, if any. Otherwise, NULL. - */ -static struct CadetPeerQueue * -peer_get_first_message (const struct CadetPeer *peer) -{ - struct CadetPeerQueue *q; - - for (q = peer->queue_head; NULL != q; q = q->next) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Checking q:%p on c:%s\n", q, GCC_2s (q->c)); - if (queue_is_sendable (q)) - return q; - } - - return NULL; -} - - /** * Function to process paths received for a new peer addition. The recorded * paths form the initial tunnel, which can be optimized later. @@ -1089,19 +1032,6 @@ search_handler (void *cls, const struct CadetPeerPath *path) } -/** - * Adjust core requested size to accomodate an ACK. - * - * @param message_size Requested size. - * - * @return Size enough to fit @c message_size and an ACK. - */ -static size_t -get_core_size (size_t message_size) -{ - return message_size + sizeof (struct GNUNET_CADET_ACK); -} - /** * Test if a message type is connection management traffic * or regular payload traffic. @@ -1118,86 +1048,14 @@ is_connection_management (uint16_t type) } -/** - * Fill a core buffer with the appropriate data for the queued message. - * - * @param queue Queue element for the message. - * @param buf Core buffer to fill. - * @param size Size remaining in @c buf. - * @param[out] pid In case its an encrypted payload, set payload. - * - * @return Bytes written to @c buf. - */ -static size_t -fill_buf (struct CadetPeerQueue *queue, void *buf, size_t size, uint32_t *pid) -{ - struct CadetConnection *c = queue->c; - size_t msg_size; - - switch (queue->type) - { - case GNUNET_MESSAGE_TYPE_CADET_AX: - *pid = GCC_get_pid (queue->c, queue->fwd); - LOG (GNUNET_ERROR_TYPE_DEBUG, " ax payload ID %u\n", *pid); - msg_size = send_core_data_raw (queue->cls, size, buf); - ((struct GNUNET_CADET_AX *) buf)->pid = htonl (*pid); - break; - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN: - case GNUNET_MESSAGE_TYPE_CADET_KX: - case GNUNET_MESSAGE_TYPE_CADET_ACK: - case GNUNET_MESSAGE_TYPE_CADET_POLL: - LOG (GNUNET_ERROR_TYPE_DEBUG, " raw %s\n", GC_m2s (queue->type)); - msg_size = send_core_data_raw (queue->cls, size, buf); - break; - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE: - LOG (GNUNET_ERROR_TYPE_DEBUG, " path create\n"); - if (GCC_is_origin (c, GNUNET_YES)) - msg_size = send_core_connection_create (c, size, buf); - else - msg_size = send_core_data_raw (queue->cls, size, buf); - break; - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK: - LOG (GNUNET_ERROR_TYPE_DEBUG, " path ack\n"); - if (GCC_is_origin (c, GNUNET_NO) || - GCC_is_origin (c, GNUNET_YES)) - { - msg_size = send_core_connection_ack (c, size, buf); - } - else - { - msg_size = send_core_data_raw (queue->cls, size, buf); - } - break; - case GNUNET_MESSAGE_TYPE_CADET_DATA: - case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE: - case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY: - /* This should be encapsulted */ - msg_size = 0; - GNUNET_assert (0); - break; - default: - GNUNET_break (0); - LOG (GNUNET_ERROR_TYPE_WARNING, " type unknown: %u\n", queue->type); - msg_size = 0; - } - - GNUNET_assert (size >= msg_size); - - return msg_size; -} - - /** * Debug function should NEVER return true in production code, useful to * simulate losses for testcases. * - * @param q Queue handle with info about the message. - * * @return #GNUNET_YES or #GNUNET_NO with the decision to drop. */ static int -should_I_drop (struct CadetPeerQueue *q) +should_I_drop (void) { if (0 == drop_percent) return GNUNET_NO; @@ -1209,297 +1067,87 @@ should_I_drop (struct CadetPeerQueue *q) } -/** - * Core callback to write a queued packet to core buffer - * - * @param cls Closure (peer info). - * @param size Number of bytes available in buf. - * @param buf Where the to write the message. - * - * @return number of bytes written to buf - */ -static size_t -queue_send (void *cls, size_t size, void *buf) -{ - struct CadetPeer *peer = cls; - struct CadetConnection *c; - struct CadetPeerQueue *queue; - struct GNUNET_TIME_Relative core_wait_time; - const char *wait_s; - const struct GNUNET_PeerIdentity *dst_id; - size_t msg_size; - size_t total_size; - size_t rest; - char *dst; - uint32_t pid; - - GCC_check_connections (); - LOG (GNUNET_ERROR_TYPE_DEBUG, "\n"); - LOG (GNUNET_ERROR_TYPE_DEBUG, "\n"); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue send towards %s (max %u)\n", - GCP_2s (peer), size); - - /* Sanity checking */ - if (NULL == buf || 0 == size) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " not allowed/\n"); - if (GNUNET_NO == in_shutdown) - { - queue = peer_get_first_message (peer); - if (NULL == queue) - { - peer->core_transmit = NULL; - peer->tmt_time.abs_value_us = 0; - GCC_check_connections (); - return 0; - } - dst_id = GNUNET_PEER_resolve2 (peer->id); - peer->core_transmit = - GNUNET_CORE_notify_transmit_ready (core_handle, - GNUNET_NO, get_priority (queue), - GNUNET_TIME_UNIT_FOREVER_REL, - dst_id, - get_core_size (queue->size), - &queue_send, - peer); - peer->tmt_time = GNUNET_TIME_absolute_get (); - } - else - { - peer->core_transmit = NULL; - peer->tmt_time.abs_value_us = 0; - } - GCC_check_connections (); - return 0; - } - - /* Init */ - rest = size; - total_size = 0; - dst = (char *) buf; - pid = 0; - peer->core_transmit = NULL; - queue = peer_get_first_message (peer); - if (NULL == queue) - { - GNUNET_break (0); /* Core tmt_rdy should've been canceled */ - peer->tmt_time.abs_value_us = 0; - return 0; - } - core_wait_time = GNUNET_TIME_absolute_get_duration (peer->tmt_time); - wait_s = GNUNET_STRINGS_relative_time_to_string (core_wait_time, GNUNET_YES); - if (core_wait_time.rel_value_us >= 1000000) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - " %s: core wait time %s (> 1 second) for %u bytes\n", - GCP_2s (peer), wait_s, queue->size); - } - peer->tmt_time.abs_value_us = 0; - - /* Copy all possible messages to the core buffer */ - while (NULL != queue && rest >= queue->size) - { - c = queue->c; - - LOG (GNUNET_ERROR_TYPE_DEBUG, " on conn %s %s\n", - GCC_2s (c), GC_f2s(queue->fwd)); - LOG (GNUNET_ERROR_TYPE_DEBUG, " size %u ok (%u/%u)\n", - queue->size, total_size, size); - - msg_size = fill_buf (queue, (void *) dst, size, &pid); - - if (should_I_drop (queue)) - { - LOG (GNUNET_ERROR_TYPE_WARNING, "DD %s (%s %u) on conn %s %s\n", - GC_m2s (queue->type), GC_m2s (queue->payload_type), - queue->payload_id, GCC_2s (c), GC_f2s (queue->fwd)); - msg_size = 0; - } - else - { - LOG (GNUNET_ERROR_TYPE_INFO, - ">>> %s (%s %4u) on conn %s (%p) %s [%5u], after %s\n", - GC_m2s (queue->type), GC_m2s (queue->payload_type), - queue->payload_id, GCC_2s (c), c, - GC_f2s (queue->fwd), msg_size, wait_s); - } - total_size += msg_size; - rest -= msg_size; - dst = &dst[msg_size]; - msg_size = 0; - - /* Free queue, but cls was freed by send_core_* in fill_buf. */ - (void) GCP_queue_destroy (queue, GNUNET_NO, GNUNET_YES, pid); - - /* Next! */ - queue = peer_get_first_message (peer); - } - - /* If more data in queue, send next */ - if (NULL != queue) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " more data! (%u)\n", queue->size); - if (NULL == peer->core_transmit) - { - dst_id = GNUNET_PEER_resolve2 (peer->id); - peer->core_transmit = - GNUNET_CORE_notify_transmit_ready (core_handle, - GNUNET_NO, get_priority (queue), - GNUNET_TIME_UNIT_FOREVER_REL, - dst_id, - get_core_size (queue->size), - &queue_send, - peer); - peer->tmt_time = GNUNET_TIME_absolute_get (); - queue->start_waiting = GNUNET_TIME_absolute_get (); - } - else - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "* tmt rdy called somewhere else\n"); - } -// GCC_start_poll (); FIXME needed? - } - else - { -// GCC_stop_poll(); FIXME needed? - } - - LOG (GNUNET_ERROR_TYPE_DEBUG, " return %d\n", total_size); - queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG); - GCC_check_connections (); - return total_size; -} - - /******************************************************************************/ /******************************** API ***********************************/ /******************************************************************************/ - /** - * Free a transmission that was already queued with all resources - * associated to the request. + * Call the continuation after a message has been sent or dropped. * - * If connection was marked to be destroyed, and this was the last queued - * message on it, the connection will be free'd as a result. - * - * @param queue Queue handler to cancel. - * @param clear_cls Is it necessary to free associated cls? - * @param sent Was it really sent? (Could have been canceled) - * @param pid PID, if relevant (was sent and was a payload message). - * - * @return #GNUNET_YES if connection was destroyed as a result, - * #GNUNET_NO otherwise. + * @param q Queue handle. + * @param sent #GNUNET_YES if was sent to CORE, #GNUNET_NO if dropped. */ -int -GCP_queue_destroy (struct CadetPeerQueue *queue, - int clear_cls, - int sent, - uint32_t pid) +static void +call_peer_cont (const struct CadetPeerQueue *q, int sent) { - struct CadetPeer *peer; - int connection_destroyed; - - GCC_check_connections (); - peer = queue->peer; - LOG (GNUNET_ERROR_TYPE_DEBUG, "queue destroy %s\n", GC_m2s (queue->type)); - if (GNUNET_YES == clear_cls) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " free cls\n"); - switch (queue->type) - { - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY: - LOG (GNUNET_ERROR_TYPE_INFO, "destroying a DESTROY message\n"); - /* fall through */ - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN: - case GNUNET_MESSAGE_TYPE_CADET_KX: - case GNUNET_MESSAGE_TYPE_CADET_AX: - case GNUNET_MESSAGE_TYPE_CADET_ACK: - case GNUNET_MESSAGE_TYPE_CADET_POLL: - GNUNET_free_non_null (queue->cls); - break; - - default: - GNUNET_break (0); - LOG (GNUNET_ERROR_TYPE_ERROR, " type %s unknown!\n", - GC_m2s (queue->type)); - } - } - GNUNET_CONTAINER_DLL_remove (peer->queue_head, peer->queue_tail, queue); - - if (!is_connection_management (queue->type)) - { - peer->queue_n--; - } - - if (NULL != queue->cont) + LOG (GNUNET_ERROR_TYPE_DEBUG, " core mq just sent %s\n", GC_m2s (q->type)); + if (NULL != q->cont) { struct GNUNET_TIME_Relative wait_time; - wait_time = GNUNET_TIME_absolute_get_duration (queue->start_waiting); + wait_time = GNUNET_TIME_absolute_get_duration (q->queue_timestamp); LOG (GNUNET_ERROR_TYPE_DEBUG, " calling callback, time elapsed %s\n", GNUNET_STRINGS_relative_time_to_string (wait_time, GNUNET_NO)); - connection_destroyed = queue->cont (queue->cont_cls, - queue->c, sent, queue->type, pid, - queue->fwd, queue->size, wait_time); - } - else - { - connection_destroyed = GNUNET_NO; + q->cont (q->cont_cls, + q->c, q->c_fwd, sent, + q->type, q->payload_type, q->payload_id, + q->size, wait_time); } +} + + +/** + * Function called by MQ when a message is sent to CORE. + * + * @param cls Closure (queue handle). + */ +static void +mq_sent (void *cls) +{ + struct CadetPeerQueue *q = cls; - if (NULL == peer_get_first_message (peer) && NULL != peer->core_transmit) + if (GNUNET_NO == q->management_traffic) { - GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit); - peer->core_transmit = NULL; - peer->tmt_time.abs_value_us = 0; + q->peer->queue_n--; } - - GNUNET_free (queue); - GCC_check_connections (); - return connection_destroyed; + call_peer_cont (q, GNUNET_YES); + GNUNET_free (q); } /** - * @brief Queue and pass message to core when possible. + * @brief Send a message to another peer (using CORE). * * @param peer Peer towards which to queue the message. - * @param cls Closure (@c type dependant). It will be used by queue_send to - * build the message to be sent if not already prebuilt. - * @param type Type of the message. - * @param payload_type Type of the message's payload + * @param message Message to send. + * @param payload_type Type of the message's payload, for debug messages. * 0 if the message is a retransmission (unknown payload). * UINT16_MAX if the message does not have payload. * @param payload_id ID of the payload (MID, ACK #, etc) - * @param size Size of the message. * @param c Connection this message belongs to (can be NULL). * @param fwd Is this a message going root->dest? (FWD ACK are NOT FWD!) - * @param cont Continuation to be called once CORE has taken the message. + * @param cont Continuation to be called once CORE has sent the message. * @param cont_cls Closure for @c cont. * - * @return Handle to cancel the message before it is sent. Once cont is called - * message has been sent and therefore the handle is no longer valid. + * @return A handle to the message in the queue or NULL (if dropped). */ struct CadetPeerQueue * -GCP_queue_add (struct CadetPeer *peer, - void *cls, - uint16_t type, - uint16_t payload_type, - uint32_t payload_id, - size_t size, - struct CadetConnection *c, - int fwd, - GCP_sent cont, - void *cont_cls) +GCP_send (struct CadetPeer *peer, + const struct GNUNET_MessageHeader *message, + uint16_t payload_type, + uint32_t payload_id, + struct CadetConnection *c, + int fwd, + GCP_sent cont, + void *cont_cls) { struct CadetPeerQueue *q; - int priority; - int call_core; + uint16_t type; + uint16_t size; GCC_check_connections (); + type = ntohs (message->type); + size = ntohs (message->size); LOG (GNUNET_ERROR_TYPE_DEBUG, "que %s (%s %4u) on conn %s (%p) %s towards %s (size %u)\n", GC_m2s (type), GC_m2s (payload_type), payload_id, @@ -1508,282 +1156,68 @@ GCP_queue_add (struct CadetPeer *peer, if (NULL == peer->connections) { /* We are not connected to this peer, ignore request. */ + GNUNET_break (0); LOG (GNUNET_ERROR_TYPE_INFO, "%s not a neighbor\n", GCP_2s (peer)); GNUNET_STATISTICS_update (stats, "# messages dropped due to wrong hop", 1, GNUNET_NO); return NULL; } - priority = 0; - if (is_connection_management (type)) - { - priority = 100; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, "priority %d\n", priority); - - call_core = (NULL == c || GNUNET_MESSAGE_TYPE_CADET_KX == type) ? - GNUNET_YES : GCC_is_sendable (c, fwd); q = GNUNET_new (struct CadetPeerQueue); - q->cls = cls; + q->env = GNUNET_MQ_msg_copy (message); + q->peer = peer; + q->cont = cont; + q->cont_cls = cont_cls; + q->queue_timestamp = GNUNET_TIME_absolute_get (); + q->management_traffic = is_connection_management (type); q->type = type; + q->size = size; q->payload_type = payload_type; q->payload_id = payload_id; - q->size = size; - q->peer = peer; q->c = c; - q->fwd = fwd; - q->cont = cont; - q->cont_cls = cont_cls; - if (100 > priority) - { - GNUNET_CONTAINER_DLL_insert_tail (peer->queue_head, peer->queue_tail, q); - peer->queue_n++; - } - else - { - GNUNET_CONTAINER_DLL_insert (peer->queue_head, peer->queue_tail, q); - call_core = GNUNET_YES; - } + q->c_fwd = fwd; + GNUNET_MQ_notify_sent (q->env, mq_sent, q); - q->start_waiting = GNUNET_TIME_absolute_get (); - if (NULL == peer->core_transmit && GNUNET_YES == call_core) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "calling core tmt rdy towards %s for %u bytes\n", - GCP_2s (peer), size); - peer->core_transmit = - GNUNET_CORE_notify_transmit_ready (core_handle, - GNUNET_NO, get_priority (q), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_PEER_resolve2 (peer->id), - get_core_size (size), - &queue_send, peer); - peer->tmt_time = GNUNET_TIME_absolute_get (); - } - else if (GNUNET_NO == call_core) + if (GNUNET_YES == q->management_traffic) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "core tmt rdy towards %s not needed\n", - GCP_2s (peer)); - + GNUNET_MQ_send (peer->core_mq, q->env); // FIXME implement "_urgent", use } else { - struct GNUNET_TIME_Relative elapsed; - elapsed = GNUNET_TIME_absolute_get_duration (peer->tmt_time); - LOG (GNUNET_ERROR_TYPE_DEBUG, "core tmt rdy towards %s already called %s\n", - GCP_2s (peer), - GNUNET_STRINGS_relative_time_to_string (elapsed, GNUNET_NO)); - - } - queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG); - GCC_check_connections (); - return q; -} - - -/** - * Cancel all queued messages to a peer that belong to a certain connection. - * - * @param peer Peer towards whom to cancel. - * @param c Connection whose queued messages to cancel. Might be destroyed by - * the sent continuation call. - */ -void -GCP_queue_cancel (struct CadetPeer *peer, - struct CadetConnection *c) -{ - struct CadetPeerQueue *q; - struct CadetPeerQueue *next; - struct CadetPeerQueue *prev; - int connection_destroyed; - - GCC_check_connections (); - connection_destroyed = GNUNET_NO; - for (q = peer->queue_head; NULL != q; q = next) - { - prev = q->prev; - if (q->c == c) + if (GNUNET_YES == should_I_drop()) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "GMP queue cancel %s\n", - GC_m2s (q->type)); - GNUNET_assert (GNUNET_NO == connection_destroyed); - if (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY == q->type) - { - q->c = NULL; - } - else - { - connection_destroyed = GCP_queue_destroy (q, GNUNET_YES, GNUNET_NO, 0); - } - - /* Get next from prev, q->next might be already freed: - * queue destroy -> callback -> GCC_destroy -> cancel_queues -> here - */ - if (NULL == prev) - next = peer->queue_head; - else - next = prev->next; - } - else - { - next = q->next; - } - } - - if ( (NULL == peer->queue_head) && - (NULL != peer->core_transmit) ) - { - GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit); - peer->core_transmit = NULL; - peer->tmt_time.abs_value_us = 0; - } - GCC_check_connections (); -} - - -/** - * Get the first transmittable message for a connection. - * - * @param peer Neighboring peer. - * @param c Connection. - * - * @return First transmittable message. - */ -static struct CadetPeerQueue * -connection_get_first_message (struct CadetPeer *peer, struct CadetConnection *c) -{ - struct CadetPeerQueue *q; - - for (q = peer->queue_head; NULL != q; q = q->next) - { - if (q->c != c) - continue; - if (queue_is_sendable (q)) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " sendable!!\n"); - return q; + LOG (GNUNET_ERROR_TYPE_WARNING, "DD %s (%s %u) on conn %s %s\n", + GC_m2s (q->type), GC_m2s (q->payload_type), + q->payload_id, GCC_2s (c), GC_f2s (q->c_fwd)); + GNUNET_MQ_discard (q->env); + call_peer_cont (q, GNUNET_NO); + GNUNET_free (q); + return NULL; } - LOG (GNUNET_ERROR_TYPE_DEBUG, " not sendable\n"); + GNUNET_MQ_send (peer->core_mq, q->env); + peer->queue_n++; } - return NULL; -} - - -/** - * Get the first message for a connection and unqueue it. - * - * Only tunnel (or higher) level messages are unqueued. Connection specific - * messages are silently destroyed upon encounter. - * - * @param peer Neighboring peer. - * @param c Connection. - * @param destroyed[in/out] Was the connection destroyed (prev/as a result)?. - * Can NOT be NULL. - * - * @return First message for this connection. - */ -struct GNUNET_MessageHeader * -GCP_connection_pop (struct CadetPeer *peer, - struct CadetConnection *c, - int *destroyed) -{ - struct CadetPeerQueue *q; - struct CadetPeerQueue *next; - struct GNUNET_MessageHeader *msg; - int dest; - GCC_check_connections (); - GNUNET_assert (NULL != destroyed); - LOG (GNUNET_ERROR_TYPE_DEBUG, "connection_pop on conn %p\n", c); - for (q = peer->queue_head; NULL != q; q = next) - { - next = q->next; - if (q->c != c) - continue; - LOG (GNUNET_ERROR_TYPE_DEBUG, " - queued: %s (%s %u), cont: %p\n", - GC_m2s (q->type), GC_m2s (q->payload_type), q->payload_id, - q->cont); - switch (q->type) - { - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN: - case GNUNET_MESSAGE_TYPE_CADET_ACK: - case GNUNET_MESSAGE_TYPE_CADET_POLL: - dest = GCP_queue_destroy (q, GNUNET_YES, GNUNET_NO, 0); - if (GNUNET_YES == dest) - { - GNUNET_break (GNUNET_NO == *destroyed); - *destroyed = GNUNET_YES; - } - continue; - - case GNUNET_MESSAGE_TYPE_CADET_KX: - case GNUNET_MESSAGE_TYPE_CADET_AX: - case GNUNET_MESSAGE_TYPE_CADET_AX_KX: - msg = (struct GNUNET_MessageHeader *) q->cls; - dest = GCP_queue_destroy (q, GNUNET_NO, GNUNET_NO, 0); - if (GNUNET_YES == dest) - { - GNUNET_break (GNUNET_NO == *destroyed); - *destroyed = GNUNET_YES; - } - return msg; - - default: - GNUNET_break (0); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Unknown message %s\n", GC_m2s (q->type)); - } - } - GCC_check_connections (); - return NULL; + return q; } /** - * Unlock a possibly locked queue for a connection. + * Cancel sending a message. Message must have been sent with + * #GCP_send before. May not be called after the notify sent + * callback has been called. * - * If there is a message that can be sent on this connection, call core for it. - * Otherwise (if core transmit is already called or there is no sendable - * message) do nothing. + * It DOES call the continuation given to #GCP_send. * - * @param peer Peer who keeps the queue. - * @param c Connection whose messages to unlock. + * @param q Queue handle to cancel */ void -GCP_queue_unlock (struct CadetPeer *peer, struct CadetConnection *c) +GCP_send_cancel (struct CadetPeerQueue *q) { - struct CadetPeerQueue *q; - size_t size; - - GCC_check_connections (); - if (NULL != peer->core_transmit) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " already unlocked!\n"); - return; /* Already unlocked */ - } - - q = connection_get_first_message (peer, c); - if (NULL == q) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " queue empty!\n"); - return; /* Nothing to transmit */ - } - - size = q->size; - peer->core_transmit = - GNUNET_CORE_notify_transmit_ready (core_handle, - GNUNET_NO, get_priority (q), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_PEER_resolve2 (peer->id), - get_core_size (size), - &queue_send, - peer); - peer->tmt_time = GNUNET_TIME_absolute_get (); - GCC_check_connections (); + call_peer_cont (q, GNUNET_NO); + GNUNET_MQ_send_cancel (q->env); + GNUNET_free (q); } @@ -1824,23 +1258,12 @@ GCP_init (const struct GNUNET_CONFIGURATION_Handle *c) LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n"); } ats_ch = GNUNET_ATS_connectivity_init (c); - core_handle = GNUNET_CORE_connect (c, /* Main configuration */ - NULL, /* Closure passed to CADET functions */ - &core_init, /* Call core_init once connected */ - &core_connect, /* Handle connects */ - &core_disconnect, /* remove peers on disconnects */ - NULL, /* Don't notify about all incoming messages */ - GNUNET_NO, /* For header only in notification */ - NULL, /* Don't notify about all outbound messages */ - GNUNET_NO, /* For header-only out notification */ - core_handlers); /* Register these handlers */ + connect_to_core (c); if (NULL == core_handle) { GNUNET_break (0); GNUNET_SCHEDULER_shutdown (); - return; } - } @@ -1853,13 +1276,10 @@ GCP_shutdown (void) LOG (GNUNET_ERROR_TYPE_DEBUG, "Shutting down peer subsystem\n"); in_shutdown = GNUNET_YES; - GNUNET_CONTAINER_multipeermap_iterate (peers, - &shutdown_peer, - NULL); if (NULL != core_handle) { - GNUNET_CORE_disconnect (core_handle); - core_handle = NULL; + GNUNET_CORE_disconnecT (core_handle); + core_handle = NULL; } if (NULL != ats_ch) { @@ -1867,6 +1287,12 @@ GCP_shutdown (void) ats_ch = NULL; } GNUNET_PEER_change_rc (myid, -1); + /* With MQ API, CORE calls the disconnect handler for every peer + * after calling GNUNET_CORE_disconnecT, shutdown must occur *after* that. + */ + GNUNET_CONTAINER_multipeermap_iterate (peers, + &shutdown_peer, + NULL); GNUNET_CONTAINER_multipeermap_destroy (peers); peers = NULL; } @@ -2054,7 +1480,6 @@ GCP_is_neighbor (const struct CadetPeer *peer) } /* Is not a neighbor but connections is not NULL, probably disconnecting */ - GNUNET_break (0); return GNUNET_NO; } @@ -2254,7 +1679,8 @@ GCP_add_path_to_all (const struct CadetPeerPath *p, int confirmed) { unsigned int i; - /* TODO: invert and add */ + /* TODO: invert and add to origin */ + /* TODO: replace all "GCP_add_path" with this, make the other one static */ GCC_check_connections (); for (i = 0; i < p->length && p->peers[i] != myid; i++) /* skip'em */ ; for (i++; i < p->length; i++) diff --git a/src/cadet/gnunet-service-cadet_peer.h b/src/cadet/gnunet-service-cadet_peer.h index 950c68fb6..093cfa21a 100644 --- a/src/cadet/gnunet-service-cadet_peer.h +++ b/src/cadet/gnunet-service-cadet_peer.h @@ -47,7 +47,7 @@ extern "C" struct CadetPeer; /** - * Struct containing info about a queued transmission to this peer + * Handle to queued messages on a peer level. */ struct CadetPeerQueue; @@ -59,18 +59,19 @@ struct CadetPeerQueue; * * @param cls Closure. * @param c Connection this message was on. + * @param fwd Was this a FWD going message? * @param sent Was it really sent? (Could have been canceled) * @param type Type of message sent. - * @param pid Packet ID, or 0 if not applicable (create, destroy, etc). - * @param fwd Was this a FWD going message? + * @param payload_type Type of payload, if applicable. + * @param pid Message ID, or 0 if not applicable (create, destroy, etc). * @param size Size of the message. * @param wait Time spent waiting for core (only the time for THIS message) - * @return #GNUNET_YES if connection was destroyed, #GNUNET_NO otherwise. */ -typedef int +typedef void (*GCP_sent) (void *cls, - struct CadetConnection *c, int sent, - uint16_t type, uint32_t pid, int fwd, size_t size, + struct CadetConnection *c, int fwd, int sent, + uint16_t type, uint16_t payload_type, uint32_t pid, + size_t size, struct GNUNET_TIME_Relative wait); /** @@ -146,97 +147,40 @@ void GCP_connect (struct CadetPeer *peer); /** - * Free a transmission that was already queued with all resources - * associated to the request. - * - * If connection was marked to be destroyed, and this was the last queued - * message on it, the connection will be free'd as a result. - * - * @param queue Queue handler to cancel. - * @param clear_cls Is it necessary to free associated cls? - * @param sent Was it really sent? (Could have been canceled) - * @param pid PID, if relevant (was sent and was a payload message). - * - * @return #GNUNET_YES if connection was destroyed as a result, - * #GNUNET_NO otherwise. - */ -int -GCP_queue_destroy (struct CadetPeerQueue *queue, int clear_cls, - int sent, uint32_t pid); - -/** - * @brief Queue and pass message to core when possible. + * @brief Send a message to another peer (using CORE). * * @param peer Peer towards which to queue the message. - * @param cls Closure (@c type dependant). It will be used by queue_send to - * build the message to be sent if not already prebuilt. - * @param type Type of the message. - * @param payload_type Type of the message's payload + * @param message Message to send. + * @param payload_type Type of the message's payload, for debug messages. * 0 if the message is a retransmission (unknown payload). * UINT16_MAX if the message does not have payload. * @param payload_id ID of the payload (MID, ACK #, etc) - * @param size Size of the message. * @param c Connection this message belongs to (can be NULL). * @param fwd Is this a message going root->dest? (FWD ACK are NOT FWD!) - * @param cont Continuation to be called once CORE has taken the message. + * @param cont Continuation to be called once CORE has sent the message. * @param cont_cls Closure for @c cont. - * - * @return Handle to cancel the message before it is sent. Once cont is called - * message has been sent and therefore the handle is no longer valid. */ struct CadetPeerQueue * -GCP_queue_add (struct CadetPeer *peer, - void *cls, - uint16_t type, - uint16_t payload_type, - uint32_t payload_id, - size_t size, - struct CadetConnection *c, - int fwd, - GCP_sent cont, - void *cont_cls); - -/** - * Cancel all queued messages to a peer that belong to a certain connection. - * - * @param peer Peer towards whom to cancel. - * @param c Connection whose queued messages to cancel. Might be destroyed by - * the sent continuation call. - */ -void -GCP_queue_cancel (struct CadetPeer *peer, struct CadetConnection *c); - -/** - * Get the first message for a connection and unqueue it. - * - * Only tunnel (or higher) level messages are unqueued. Connection specific - * messages are silently destroyed upon encounter. - * - * @param peer Neighboring peer. - * @param c Connection. - * @param destroyed[in/out] Was the connection destroyed as a result?. - * Can NOT be NULL. - * - * - * @return First message for this connection. - */ -struct GNUNET_MessageHeader * -GCP_connection_pop (struct CadetPeer *peer, - struct CadetConnection *c, - int *destroyed); +GCP_send (struct CadetPeer *peer, + const struct GNUNET_MessageHeader *message, + uint16_t payload_type, + uint32_t payload_id, + struct CadetConnection *c, + int fwd, + GCP_sent cont, + void *cont_cls); /** - * Unlock a possibly locked queue for a connection. + * Cancel sending a message. Message must have been sent with + * #GCP_send before. May not be called after the notify sent + * callback has been called. * - * If there is a message that can be sent on this connection, call core for it. - * Otherwise (if core transmit is already called or there is no sendable - * message) do nothing. + * It does NOT call the continuation given to #GCP_send. * - * @param peer Peer who keeps the queue. - * @param c Connection whose messages to unlock. + * @param q Queue handle to cancel */ void -GCP_queue_unlock (struct CadetPeer *peer, struct CadetConnection *c); +GCP_send_cancel (struct CadetPeerQueue *q); /** * Set tunnel. diff --git a/src/cadet/gnunet-service-cadet_tunnel.c b/src/cadet/gnunet-service-cadet_tunnel.c index 0ede4a886..e60c3c023 100644 --- a/src/cadet/gnunet-service-cadet_tunnel.c +++ b/src/cadet/gnunet-service-cadet_tunnel.c @@ -1600,7 +1600,6 @@ send_kx (struct CadetTunnel *t, { GNUNET_break (0); GCT_debug (t, GNUNET_ERROR_TYPE_ERROR); - GCP_debug (t->peer, GNUNET_ERROR_TYPE_ERROR); } return NULL; } @@ -2245,6 +2244,10 @@ GCT_handle_encrypted (struct CadetTunnel *t, * * @param t Tunnel on which the message came. * @param message Payload of KX message. + * + * FIXME: not needed anymore + * - substitute with call to kx_ax + * - eliminate encapsulation */ void GCT_handle_kx (struct CadetTunnel *t, @@ -3366,34 +3369,6 @@ GCT_send_ax_kx (struct CadetTunnel *t, int force_reply) } -/** - * Sends an already built and encrypted message on a tunnel, choosing the best - * connection. Useful for re-queueing messages queued on a destroyed connection. - * - * @param message Message to send. Function modifies it. - * @param t Tunnel on which this message is transmitted. - */ -void -GCT_resend_message (const struct GNUNET_MessageHeader *message, - struct CadetTunnel *t) -{ - struct CadetConnection *c; - int fwd; - - c = tunnel_get_connection (t); - if (NULL == c) - { - /* TODO queue in tunnel, marked as encrypted */ - LOG (GNUNET_ERROR_TYPE_DEBUG, "No connection available, dropping.\n"); - return; - } - fwd = GCC_is_origin (c, GNUNET_YES); - GNUNET_break (NULL == GCC_send_prebuilt_message (message, UINT16_MAX, 0, - c, fwd, - GNUNET_YES, NULL, NULL)); -} - - /** * Is the tunnel directed towards the local peer? * diff --git a/src/cadet/gnunet-service-cadet_tunnel.h b/src/cadet/gnunet-service-cadet_tunnel.h index ca553a7d3..8d65cbebd 100644 --- a/src/cadet/gnunet-service-cadet_tunnel.h +++ b/src/cadet/gnunet-service-cadet_tunnel.h @@ -503,18 +503,6 @@ void GCT_send_ax_kx (struct CadetTunnel *t, int force_reply); -/** - * Sends an already built and encrypted message on a tunnel, choosing the best - * connection. Useful for re-queueing messages queued on a destroyed connection. - * - * @param message Message to send. Function modifies it. - * @param t Tunnel on which this message is transmitted. - */ -void -GCT_resend_message (const struct GNUNET_MessageHeader *message, - struct CadetTunnel *t); - - /** * Is the tunnel directed towards the local peer? *