int retry_counter;
- struct GNUNET_PeerIdentity target;
-
struct GNUNET_TIME_Relative timeout;
unsigned int priority;
- void *addr;
- size_t addrlen;
struct Session *session;
GNUNET_TRANSPORT_TransmitContinuation cont;
void *cont_cls;
struct GNUNET_ATS_Information ats_network;
};
+
+static int
+get_session_delete_it (void *cls, const GNUNET_HashCode * key, void *value)
+{
+ struct Session *s = value;
+ struct Plugin *plugin = cls;
+ GNUNET_assert (plugin != NULL);
+
+#if DEBUG_UNIX
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting session for peer `%s' `%s' \n", GNUNET_i2s (&s->target), s->addr);
+#endif
+
+ plugin->env->session_end (plugin->env->cls, &s->target, s);
+
+ GNUNET_CONTAINER_multihashmap_remove(plugin->session_map, &s->target.hashPubKey, s);
+
+ GNUNET_free (s);
+
+ return GNUNET_YES;
+}
+
/**
* Disconnect from a remote node. Clean up session if we have one for this peer
*
void
unix_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
{
- /** TODO: Implement! */
+ struct Plugin *plugin = cls;
+ GNUNET_assert (plugin != NULL);
+
+ GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map, &target->hashPubKey, &get_session_delete_it, plugin);
return;
}
{
GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw);
if (msgw->cont != NULL)
- msgw->cont (msgw->cont_cls, &msgw->target, GNUNET_SYSERR);
+ msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR);
GNUNET_free (msgw->msg);
GNUNET_free (msgw);
}
}
#if DEBUG_UNIX
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"UNIX transmit %u-byte message to %s (%d: %s)\n",
(unsigned int) msgbuf_size, GNUNET_a2s (sb, sbs), (int) sent,
(sent < 0) ? STRERROR (errno) : "ok");
struct gsi_ctx
{
- const struct GNUNET_HELLO_Address *address;
+ char *address;
+ size_t addrlen;
struct Session *res;
};
struct gsi_ctx *gsi = cls;
struct Session *s = value;
- if ((gsi->address->address_length == s->addrlen) &&
- (0 == memcmp (gsi->address->address, s->addr, s->addrlen)))
+#if DEBUG_UNIX
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Comparing session %s %s\n", gsi->address, s->addr);
+#endif
+ if ((gsi->addrlen == s->addrlen) &&
+ (0 == memcmp (gsi->address, s->addr, s->addrlen)))
{
gsi->res = s;
return GNUNET_NO;
GNUNET_assert (address != NULL);
/* Check if already existing */
- gsi.address = address;
+ gsi.address = (char *) address->address;
+ gsi.addrlen = address->address_length;
gsi.res = NULL;
GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map, &address->peer.hashPubKey, &get_session_it, &gsi);
+ if (gsi.res != NULL)
+ {
+#if DEBUG_UNIX
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session\n");
+#endif
+ return gsi.res;
+ }
/* Create a new session */
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "To be implemented\n");
- GNUNET_break (0);
+ s = GNUNET_malloc (sizeof (struct Session) + address->address_length);
+ s->addr = &s[1];
+ s->addrlen = address->address_length;
+ memcpy(s->addr, address->address, s->addrlen);
+ memcpy(&s->target, &address->peer, sizeof (struct GNUNET_PeerIdentity));
+
+ GNUNET_CONTAINER_multihashmap_put (plugin->session_map,
+ &address->peer.hashPubKey, s,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+#if DEBUG_UNIX
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating new session\n");
+#endif
+
return s;
}
struct GNUNET_TIME_Relative to,
GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
{
- ssize_t sent = -1;
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "To be implemented\n");
- GNUNET_break (0);
- return sent;
+ struct Plugin *plugin = cls;
+ struct UNIXMessageWrapper *wrapper;
+ struct UNIXMessage *message;
+ int ssize;
+
+ GNUNET_assert (plugin != NULL);
+ GNUNET_assert (session != NULL);
+
+ if (GNUNET_OK != GNUNET_CONTAINER_multihashmap_contains_value(plugin->session_map,
+ &session->target.hashPubKey, session))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+
+ ssize = sizeof (struct UNIXMessage) + msgbuf_size;
+ message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size);
+ message->header.size = htons (ssize);
+ message->header.type = htons (0);
+ memcpy (&message->sender, plugin->env->my_identity,
+ sizeof (struct GNUNET_PeerIdentity));
+ memcpy (&message[1], msgbuf, msgbuf_size);
+
+ wrapper = GNUNET_malloc (sizeof (struct UNIXMessageWrapper));
+ wrapper->msg = message;
+ wrapper->msgsize = ssize;
+ wrapper->priority = priority;
+ wrapper->timeout = to;
+ wrapper->cont = cont;
+ wrapper->cont_cls = cont_cls;
+ wrapper->retry_counter = 0;
+ wrapper->session = session;
+
+ GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, wrapper);
+
+#if DEBUG_UNIX
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent %d bytes to `%s'\n", sent,
+ (char *) session->addr);
+#endif
+
+ return ssize;
}
struct UNIXMessage *message;
struct UNIXMessageWrapper *wrapper;
int ssize;
-
- GNUNET_assert (NULL == session);
+ struct gsi_ctx gsi;
/* Build the message to be sent */
wrapper = GNUNET_malloc (sizeof (struct UNIXMessageWrapper) + addrlen);
sizeof (struct GNUNET_PeerIdentity));
memcpy (&message[1], msgbuf, msgbuf_size);
+ if (session == NULL)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking for existing session\n");
+ gsi.address = (char *) addr;
+ gsi.addrlen = addrlen;
+ gsi.res = NULL;
+ GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map, &target->hashPubKey, &get_session_it, &gsi);
+ wrapper->session = gsi.res;
+ if (gsi.res == NULL)
+ {
+ wrapper->session = GNUNET_malloc (sizeof (struct Session) + addrlen);
+ wrapper->session->addr = &wrapper->session[1];
+ wrapper->session->addrlen = addrlen;
+ memcpy(wrapper->session->addr, addr, wrapper->session->addrlen);
+ memcpy(&wrapper->session->target, target, sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_CONTAINER_multihashmap_put (plugin->session_map,
+ &target->hashPubKey, wrapper->session,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Created new session for `%s'\n", addr);
+ }
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session\n");
+
+ }
+ else
+ wrapper->session = session;
+
wrapper->msg = message;
wrapper->msgsize = ssize;
wrapper->priority = priority;
wrapper->timeout = timeout;
wrapper->cont = cont;
wrapper->cont_cls = cont_cls;
- wrapper->addr = &wrapper[1];
- wrapper->addrlen = addrlen;
wrapper->retry_counter = 0;
- memcpy (&wrapper->target, target, sizeof (struct GNUNET_PeerIdentity));
- memcpy (&wrapper[1], addr, addrlen);
-
GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, wrapper);
#if DEBUG_UNIX
sent = unix_real_send (plugin,
plugin->unix_sock.desc,
- &msgw->target,
+ &msgw->session->target,
(const char *) msgw->msg,
msgw->msgsize,
msgw->priority,
msgw->timeout,
- msgw->addr,
- msgw->addrlen,
+ msgw->session->addr,
+ msgw->session->addrlen,
msgw->cont, msgw->cont_cls);
/* successfully sent bytes */
if (sent > 0)
{
GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw);
+ GNUNET_free (msgw->msg);
GNUNET_free (msgw);
return;
}
/* max retries */
if (msgw->retry_counter > MAX_RETRIES)
{
- msgw->cont (msgw->cont_cls, &msgw->target, GNUNET_SYSERR);
+ msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR);
GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw);
GNUNET_break (0);
+ GNUNET_free (msgw->msg);
GNUNET_free (msgw);
return;
}
if (sent == -1)
{
GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw);
+ GNUNET_free (msgw->msg);
GNUNET_free (msgw);
return;
}
unix_transport_server_stop (plugin);
+ GNUNET_CONTAINER_multihashmap_iterate (plugin->session_map, &get_session_delete_it, plugin);
GNUNET_CONTAINER_multihashmap_destroy (plugin->session_map);
GNUNET_NETWORK_fdset_destroy (plugin->rs);