big scheduler refactoring, expect some issues
[oweals/gnunet.git] / src / dv / dv_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file dv/dv_api.c
23  * @brief library to access the DV service
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  */
27 #include "platform.h"
28 #include "gnunet_bandwidth_lib.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_container_lib.h"
32 #include "gnunet_arm_service.h"
33 #include "gnunet_hello_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_server_lib.h"
36 #include "gnunet_time_lib.h"
37 #include "gnunet_dv_service.h"
38 #include "dv.h"
39 #include "../transport/plugin_transport.h"
40
41 /**
42  * Store ready to send messages
43  */
44 struct PendingMessages
45 {
46   /**
47    * Linked list of pending messages
48    */
49   struct PendingMessages *next;
50
51   /**
52    * Message that is pending
53    */
54   struct GNUNET_DV_SendMessage *msg;
55
56   /**
57    * Timeout for this message
58    */
59   struct GNUNET_TIME_Absolute timeout;
60
61 };
62
63 /**
64  * Handle for the service.
65  */
66 struct GNUNET_DV_Handle
67 {
68
69   /**
70    * Configuration to use.
71    */
72   const struct GNUNET_CONFIGURATION_Handle *cfg;
73
74   /**
75    * Socket (if available).
76    */
77   struct GNUNET_CLIENT_Connection *client;
78
79   /**
80    * Currently pending transmission request.
81    */
82   struct GNUNET_CLIENT_TransmitHandle *th;
83
84   /**
85    * List of the currently pending messages for the DV service.
86    */
87   struct PendingMessages *pending_list;
88
89   /**
90    * Message we are currently sending.
91    */
92   struct PendingMessages *current;
93
94   /**
95    * Handler for messages we receive from the DV service
96    */
97   GNUNET_DV_MessageReceivedHandler receive_handler;
98
99   /**
100    * Closure for the receive handler
101    */
102   void *receive_cls;
103
104   /**
105    * Current unique ID
106    */
107   uint32_t uid_gen;
108
109   /**
110    * Hashmap containing outstanding send requests awaiting confirmation.
111    */
112   struct GNUNET_CONTAINER_MultiHashMap *send_callbacks;
113
114 };
115
116
117 struct StartContext
118 {
119   /**
120    * Start message
121    */
122   struct GNUNET_MessageHeader *message;
123
124   /**
125    * Handle to service, in case of timeout
126    */
127   struct GNUNET_DV_Handle *handle;
128 };
129
130 struct SendCallbackContext
131 {
132   /**
133    * The continuation to call once a message is confirmed sent (or failed)
134    */
135   GNUNET_TRANSPORT_TransmitContinuation cont;
136
137   /**
138    * Closure to call with send continuation.
139    */
140   void *cont_cls;
141
142   /**
143    * Target of the message.
144    */
145   struct GNUNET_PeerIdentity target;
146 };
147
148 /**
149  * Convert unique ID to hash code.
150  *
151  * @param uid unique ID to convert
152  * @param hash set to uid (extended with zeros)
153  */
154 static void
155 hash_from_uid (uint32_t uid,
156                GNUNET_HashCode *hash)
157 {
158   memset (hash, 0, sizeof(GNUNET_HashCode));
159   *((uint32_t*)hash) = uid;
160 }
161
162 /**
163  * Try to (re)connect to the dv service.
164  *
165  * @param ret handle to the (disconnected) dv service
166  *
167  * @return GNUNET_YES on success, GNUNET_NO on failure.
168  */
169 static int
170 try_connect (struct GNUNET_DV_Handle *ret)
171 {
172   if (ret->client != NULL)
173     return GNUNET_OK;
174   ret->client = GNUNET_CLIENT_connect ("dv", ret->cfg);
175   if (ret->client != NULL)
176     return GNUNET_YES;
177 #if DEBUG_DV_MESSAGES
178   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
179               _("Failed to connect to the dv service!\n"));
180 #endif
181   return GNUNET_NO;
182 }
183
184 static void process_pending_message(struct GNUNET_DV_Handle *handle);
185
186 /**
187  * Send complete, schedule next
188  *
189  * @param handle handle to the dv service
190  * @param code return code for send (unused)
191  */
192 static void
193 finish (struct GNUNET_DV_Handle *handle, int code)
194 {
195   struct PendingMessages *pos = handle->current;
196   handle->current = NULL;
197   process_pending_message (handle);
198
199   GNUNET_free(pos->msg);
200   GNUNET_free (pos);
201 }
202
203 /**
204  * Notification that we can send data
205  *
206  * @param cls handle to the dv service (struct GNUNET_DV_Handle)
207  * @param size how many bytes can we send
208  * @param buf where to copy the message to send
209  *
210  * @return how many bytes we copied to buf
211  */
212 static size_t
213 transmit_pending (void *cls, size_t size, void *buf)
214 {
215   struct GNUNET_DV_Handle *handle = cls;
216   size_t ret;
217   size_t tsize;
218
219 #if DEBUG_DV
220   if (handle->current != NULL)
221     GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: Transmit pending called with message type %d\n", ntohs(handle->current->msg->header.type));
222 #endif
223
224   if (buf == NULL)
225     {
226 #if DEBUG_DV
227       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: Transmit pending FAILED!\n\n\n");
228 #endif
229       finish(handle, GNUNET_SYSERR);
230       return 0;
231     }
232   handle->th = NULL;
233
234   ret = 0;
235
236   if (handle->current != NULL)
237   {
238     tsize = ntohs(handle->current->msg->header.size);
239     if (size >= tsize)
240     {
241       memcpy(buf, handle->current->msg, tsize);
242 #if DEBUG_DV
243       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: Copied %d bytes into buffer!\n\n\n", tsize);
244 #endif
245       finish(handle, GNUNET_OK);
246       return tsize;
247     }
248
249   }
250
251   return ret;
252 }
253
254 /**
255  * Try to send messages from list of messages to send
256  *
257  * @param handle handle to the distance vector service
258  */
259 static void process_pending_message(struct GNUNET_DV_Handle *handle)
260 {
261
262   if (handle->current != NULL)
263     return;                     /* action already pending */
264   if (GNUNET_YES != try_connect (handle))
265     {
266       finish (handle, GNUNET_SYSERR);
267       return;
268     }
269
270   /* schedule next action */
271   handle->current = handle->pending_list;
272   if (NULL == handle->current)
273     {
274       return;
275     }
276   handle->pending_list = handle->pending_list->next;
277   handle->current->next = NULL;
278
279   if (NULL ==
280       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
281                                                          ntohs(handle->current->msg->header.size),
282                                                          handle->current->msg->timeout,
283                                                          GNUNET_YES,
284                                                          &transmit_pending, handle)))
285     {
286 #if DEBUG_DV
287       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
288                   "Failed to transmit request to dv service.\n");
289 #endif
290       finish (handle, GNUNET_SYSERR);
291     }
292 }
293
294 /**
295  * Add a pending message to the linked list
296  *
297  * @param handle handle to the specified DV api
298  * @param msg the message to add to the list
299  */
300 static void add_pending(struct GNUNET_DV_Handle *handle, struct GNUNET_DV_SendMessage *msg)
301 {
302   struct PendingMessages *new_message;
303   struct PendingMessages *pos;
304   struct PendingMessages *last;
305
306   new_message = GNUNET_malloc(sizeof(struct PendingMessages));
307   new_message->msg = msg;
308
309   if (handle->pending_list != NULL)
310     {
311       pos = handle->pending_list;
312       while(pos != NULL)
313         {
314           last = pos;
315           pos = pos->next;
316         }
317       last->next = new_message;
318     }
319   else
320     {
321       handle->pending_list = new_message;
322     }
323
324   process_pending_message(handle);
325 }
326
327 /**
328  * Handles a message sent from the DV service to us.
329  * Parse it out and give it to the plugin.
330  *
331  * @param cls the handle to the DV API
332  * @param msg the message that was received
333  */
334 void handle_message_receipt (void *cls,
335                              const struct GNUNET_MessageHeader * msg)
336 {
337   struct GNUNET_DV_Handle *handle = cls;
338   struct GNUNET_DV_MessageReceived *received_msg;
339   struct GNUNET_DV_SendResultMessage *send_result_msg;
340   size_t packed_msg_len;
341   size_t sender_address_len;
342   char *sender_address;
343   char *packed_msg;
344   char *packed_msg_start;
345   GNUNET_HashCode uidhash;
346   struct SendCallbackContext *send_ctx;
347
348   if (msg == NULL)
349   {
350 #if DEBUG_DV_MESSAGES
351     GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: connection closed\n");
352 #endif
353     return; /* Connection closed? */
354   }
355
356   GNUNET_assert((ntohs(msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE) || (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND_RESULT));
357
358   switch (ntohs(msg->type))
359   {
360   case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE:
361     if (ntohs(msg->size) < sizeof(struct GNUNET_DV_MessageReceived))
362       return;
363
364     received_msg = (struct GNUNET_DV_MessageReceived *)msg;
365     packed_msg_len = ntohl(received_msg->msg_len);
366     sender_address_len = ntohs(msg->size) - packed_msg_len - sizeof(struct GNUNET_DV_MessageReceived);
367     GNUNET_assert(sender_address_len > 0);
368     sender_address = GNUNET_malloc(sender_address_len);
369     memcpy(sender_address, &received_msg[1], sender_address_len);
370     packed_msg_start = (char *)&received_msg[1];
371     packed_msg = GNUNET_malloc(packed_msg_len);
372     memcpy(packed_msg, &packed_msg_start[sender_address_len], packed_msg_len);
373
374 #if DEBUG_DV_MESSAGES
375     GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: packed message type: %d or %d\n", ntohs(((struct GNUNET_MessageHeader *)packed_msg)->type), ((struct GNUNET_MessageHeader *)packed_msg)->type);
376     GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: message sender reported as %s\n", GNUNET_i2s(&received_msg->sender));
377     GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: distance is %u\n", ntohl(received_msg->distance));
378 #endif
379
380     handle->receive_handler(handle->receive_cls,
381                             &received_msg->sender,
382                             packed_msg,
383                             packed_msg_len,
384                             ntohl(received_msg->distance),
385                             sender_address,
386                             sender_address_len);
387
388     GNUNET_free(sender_address);
389     break;
390   case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND_RESULT:
391     if (ntohs(msg->size) < sizeof(struct GNUNET_DV_SendResultMessage))
392       return;
393
394     send_result_msg = (struct GNUNET_DV_SendResultMessage *)msg;
395     hash_from_uid(ntohl(send_result_msg->uid), &uidhash);
396     send_ctx = GNUNET_CONTAINER_multihashmap_get(handle->send_callbacks, &uidhash);
397
398     if ((send_ctx != NULL) && (send_ctx->cont != NULL))
399       {
400         if (ntohl(send_result_msg->result) == 0)
401           {
402             send_ctx->cont(send_ctx->cont_cls, &send_ctx->target, GNUNET_OK);
403           }
404         else
405           {
406             send_ctx->cont(send_ctx->cont_cls, &send_ctx->target, GNUNET_SYSERR);
407           }
408       }
409     GNUNET_free_non_null(send_ctx);
410     break;
411   default:
412     break;
413   }
414   GNUNET_CLIENT_receive (handle->client,
415                          &handle_message_receipt,
416                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
417 }
418
419 /**
420  * Send a message from the plugin to the DV service indicating that
421  * a message should be sent via DV to some peer.
422  *
423  * @param dv_handle the handle to the DV api
424  * @param target the final target of the message
425  * @param msgbuf the msg(s) to send
426  * @param msgbuf_size the size of msgbuf
427  * @param priority priority to pass on to core when sending the message
428  * @param timeout how long can this message be delayed (pass through to core)
429  * @param addr the address of this peer (internally known to DV)
430  * @param addrlen the length of the peer address
431  * @param cont continuation to call once the message has been sent (or failed)
432  * @param cont_cls closure for continuation
433  *
434  */
435 int GNUNET_DV_send (struct GNUNET_DV_Handle *dv_handle,
436                     const struct GNUNET_PeerIdentity *target,
437                     const char *msgbuf,
438                     size_t msgbuf_size,
439                     unsigned int priority,
440                     struct GNUNET_TIME_Relative timeout,
441                     const void *addr,
442                     size_t addrlen,
443                     GNUNET_TRANSPORT_TransmitContinuation
444                     cont, void *cont_cls)
445 {
446   struct GNUNET_DV_SendMessage *msg;
447   struct SendCallbackContext *send_ctx;
448   char *end_of_message;
449   GNUNET_HashCode uidhash;
450   int msize;
451 #if DEBUG_DV_MESSAGES
452   dv_handle->uid_gen = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX);
453 #else
454   dv_handle->uid_gen++;
455 #endif
456
457   msize = sizeof(struct GNUNET_DV_SendMessage) + addrlen + msgbuf_size;
458   msg = GNUNET_malloc(msize);
459   msg->header.size = htons(msize);
460   msg->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND);
461   memcpy(&msg->target, target, sizeof(struct GNUNET_PeerIdentity));
462   msg->priority = htonl(priority);
463   msg->timeout = timeout;
464   msg->addrlen = htonl(addrlen);
465   msg->uid = htonl(dv_handle->uid_gen);
466   memcpy(&msg[1], addr, addrlen);
467   end_of_message = (char *)&msg[1];
468   end_of_message = &end_of_message[addrlen];
469   memcpy(end_of_message, msgbuf, msgbuf_size);
470   add_pending(dv_handle, msg);
471   send_ctx = GNUNET_malloc(sizeof(struct SendCallbackContext));
472   send_ctx->cont = cont;
473   send_ctx->cont_cls = cont_cls;
474   memcpy(&send_ctx->target, target, sizeof(struct GNUNET_PeerIdentity));
475   hash_from_uid(dv_handle->uid_gen, &uidhash);
476   GNUNET_CONTAINER_multihashmap_put(dv_handle->send_callbacks, &uidhash, send_ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
477
478   return GNUNET_OK;
479 }
480
481 /**
482  * Callback to transmit a start message to
483  * the DV service, once we can send
484  *
485  * @param cls struct StartContext
486  * @param size how much can we send
487  * @param buf where to copy the message
488  *
489  * @return number of bytes copied to buf
490  */
491 static size_t
492 transmit_start (void *cls, size_t size, void *buf)
493 {
494   struct StartContext *start_context = cls;
495   struct GNUNET_DV_Handle *handle = start_context->handle;
496   size_t tsize;
497 #if DEBUG_DV
498   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: sending start request to service\n");
499 #endif
500   if (buf == NULL)
501     {
502       GNUNET_free(start_context->message);
503       GNUNET_free(start_context);
504       GNUNET_DV_disconnect(handle);
505       return 0;
506     }
507
508   tsize = ntohs(start_context->message->size);
509   if (size >= tsize)
510   {
511     memcpy(buf, start_context->message, tsize);
512     GNUNET_free(start_context->message);
513     GNUNET_free(start_context);
514     GNUNET_CLIENT_receive (handle->client,
515                            &handle_message_receipt,
516                            handle, GNUNET_TIME_UNIT_FOREVER_REL);
517
518
519     return tsize;
520   }
521
522   return 0;
523 }
524
525 /**
526  * Connect to the DV service
527  *
528  * @param cfg the configuration to use
529  * @param receive_handler method call when on receipt from the service
530  * @param receive_handler_cls closure for receive_handler
531  *
532  * @return handle to the DV service
533  */
534 struct GNUNET_DV_Handle *
535 GNUNET_DV_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
536                   GNUNET_DV_MessageReceivedHandler receive_handler,
537                   void *receive_handler_cls)
538 {
539   struct GNUNET_DV_Handle *handle;
540   struct GNUNET_MessageHeader *start_message;
541   struct StartContext *start_context;
542   handle = GNUNET_malloc(sizeof(struct GNUNET_DV_Handle));
543
544   handle->cfg = cfg;
545   handle->pending_list = NULL;
546   handle->current = NULL;
547   handle->th = NULL;
548   handle->client = GNUNET_CLIENT_connect("dv", cfg);
549   handle->receive_handler = receive_handler;
550   handle->receive_cls = receive_handler_cls;
551
552   if (handle->client == NULL)
553     {
554       GNUNET_free(handle);
555       return NULL;
556     }
557
558   start_message = GNUNET_malloc(sizeof(struct GNUNET_MessageHeader));
559   start_message->size = htons(sizeof(struct GNUNET_MessageHeader));
560   start_message->type = htons(GNUNET_MESSAGE_TYPE_DV_START);
561
562   start_context = GNUNET_malloc(sizeof(struct StartContext));
563   start_context->handle = handle;
564   start_context->message = start_message;
565   GNUNET_CLIENT_notify_transmit_ready (handle->client,
566                                        sizeof(struct GNUNET_MessageHeader),
567                                        GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 60),
568                                        GNUNET_YES,
569                                        &transmit_start, start_context);
570
571   handle->send_callbacks = GNUNET_CONTAINER_multihashmap_create(100);
572
573   return handle;
574 }
575
576 /**
577  * Disconnect from the DV service
578  *
579  * @param handle the current handle to the service to disconnect
580  */
581 void GNUNET_DV_disconnect(struct GNUNET_DV_Handle *handle)
582 {
583   struct PendingMessages *pos;
584
585   GNUNET_assert(handle != NULL);
586
587   if (handle->th != NULL) /* We have a live transmit request in the Aether */
588     {
589       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
590       handle->th = NULL;
591     }
592   if (handle->current != NULL) /* We are trying to send something now, clean it up */
593     GNUNET_free(handle->current);
594   while (NULL != (pos = handle->pending_list)) /* Remove all pending sends from the list */
595     {
596       handle->pending_list = pos->next;
597       GNUNET_free(pos);
598     }
599   if (handle->client != NULL) /* Finally, disconnect from the service */
600     {
601       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
602       handle->client = NULL;
603     }
604
605   GNUNET_free (handle);
606 }
607
608 /* end of dv_api.c */