46f170a23f61c48675d56ed55a5a725ec92c9a19
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013-2014 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file peerstore/peerstore_api.c
22  * @brief API for peerstore
23  * @author Omar Tarabai
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
32
33 /******************************************************************************/
34 /************************      DATA STRUCTURES     ****************************/
35 /******************************************************************************/
36
37 /**
38  * Handle to the PEERSTORE service.
39  */
40 struct GNUNET_PEERSTORE_Handle
41 {
42
43   /**
44    * Our configuration.
45    */
46   const struct GNUNET_CONFIGURATION_Handle *cfg;
47
48   /**
49    * Connection to the service.
50    */
51   struct GNUNET_CLIENT_Connection *client;
52
53   /**
54    * Message queue
55    */
56   struct GNUNET_MQ_Handle *mq;
57
58   /**
59    * Head of active STORE requests.
60    */
61   struct GNUNET_PEERSTORE_StoreContext *store_head;
62
63   /**
64    * Tail of active STORE requests.
65    */
66   struct GNUNET_PEERSTORE_StoreContext *store_tail;
67
68   /**
69    * Head of active ITERATE requests.
70    */
71   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
72
73   /**
74    * Tail of active ITERATE requests.
75    */
76   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
77
78   /**
79    * Hashmap of watch requests
80    */
81   struct GNUNET_CONTAINER_MultiHashMap *watches;
82
83   /**
84    * Are we in the process of disconnecting but need to sync first?
85    */
86   int disconnecting;
87
88 };
89
90 /**
91  * Context for a store request
92  */
93 struct GNUNET_PEERSTORE_StoreContext
94 {
95   /**
96    * Kept in a DLL.
97    */
98   struct GNUNET_PEERSTORE_StoreContext *next;
99
100   /**
101    * Kept in a DLL.
102    */
103   struct GNUNET_PEERSTORE_StoreContext *prev;
104
105   /**
106    * Handle to the PEERSTORE service.
107    */
108   struct GNUNET_PEERSTORE_Handle *h;
109
110   /**
111    * Continuation called with service response
112    */
113   GNUNET_PEERSTORE_Continuation cont;
114
115   /**
116    * Closure for @e cont
117    */
118   void *cont_cls;
119
120   /**
121    * Which subsystem does the store?
122    */
123   char *sub_system;
124
125   /**
126    * Key for the store operation.
127    */
128   char *key;
129
130   /**
131    * Contains @e size bytes.
132    */
133   void *value;
134
135   /**
136    * MQ Envelope with store request message
137    */
138   struct GNUNET_MQ_Envelope *ev;
139
140   /**
141    * Peer the store is for.
142    */
143   struct GNUNET_PeerIdentity peer;
144
145   /**
146    * Number of bytes in @e value.
147    */
148   size_t size;
149
150   /**
151    * When does the value expire?
152    */
153   struct GNUNET_TIME_Absolute expiry;
154
155   /**
156    * Options for the store operation.
157    */
158   enum GNUNET_PEERSTORE_StoreOption options;
159
160 };
161
162 /**
163  * Context for a iterate request
164  */
165 struct GNUNET_PEERSTORE_IterateContext
166 {
167   /**
168    * Kept in a DLL.
169    */
170   struct GNUNET_PEERSTORE_IterateContext *next;
171
172   /**
173    * Kept in a DLL.
174    */
175   struct GNUNET_PEERSTORE_IterateContext *prev;
176
177   /**
178    * Handle to the PEERSTORE service.
179    */
180   struct GNUNET_PEERSTORE_Handle *h;
181
182   /**
183    * MQ Envelope with iterate request message
184    */
185   struct GNUNET_MQ_Envelope *ev;
186
187   /**
188    * Callback with each matching record
189    */
190   GNUNET_PEERSTORE_Processor callback;
191
192   /**
193    * Closure for @e callback
194    */
195   void *callback_cls;
196
197   /**
198    * #GNUNET_YES / #GNUNET_NO
199    * Iterate request has been sent and we are still expecting records
200    */
201   int iterating;
202
203   /**
204    * Task identifier for the function called
205    * on iterate request timeout
206    */
207   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
208
209 };
210
211 /**
212  * Context for a watch request
213  */
214 struct GNUNET_PEERSTORE_WatchContext
215 {
216   /**
217    * Kept in a DLL.
218    */
219   struct GNUNET_PEERSTORE_WatchContext *next;
220
221   /**
222    * Kept in a DLL.
223    */
224   struct GNUNET_PEERSTORE_WatchContext *prev;
225
226   /**
227    * Handle to the PEERSTORE service.
228    */
229   struct GNUNET_PEERSTORE_Handle *h;
230
231   /**
232    * MQ Envelope with watch request message
233    */
234   struct GNUNET_MQ_Envelope *ev;
235
236   /**
237    * Callback with each record received
238    */
239   GNUNET_PEERSTORE_Processor callback;
240
241   /**
242    * Closure for @e callback
243    */
244   void *callback_cls;
245
246   /**
247    * Hash of the combined key
248    */
249   struct GNUNET_HashCode keyhash;
250
251   /**
252    * #GNUNET_YES / #GNUNET_NO
253    * if sent, cannot be canceled
254    */
255   int request_sent;
256
257 };
258
259 /******************************************************************************/
260 /*******************             DECLARATIONS             *********************/
261 /******************************************************************************/
262
263 /**
264  * When a response for iterate request is received
265  *
266  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
267  * @param msg message received, NULL on timeout or fatal error
268  */
269 static void
270 handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg);
271
272 /**
273  * When a watch record is received
274  *
275  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
276  * @param msg message received, NULL on timeout or fatal error
277  */
278 static void
279 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg);
280
281 /**
282  * Close the existing connection to PEERSTORE and reconnect.
283  *
284  * @param h handle to the service
285  */
286 static void
287 reconnect (struct GNUNET_PEERSTORE_Handle *h);
288
289 /**
290  * Callback after MQ envelope is sent
291  *
292  * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
293  */
294 static void
295 watch_request_sent (void *cls)
296 {
297   struct GNUNET_PEERSTORE_WatchContext *wc = cls;
298
299   wc->request_sent = GNUNET_YES;
300   wc->ev = NULL;
301 }
302
303
304 /**
305  * Callback after MQ envelope is sent
306  *
307  * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
308  */
309 static void
310 iterate_request_sent (void *cls)
311 {
312   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
313
314   LOG (GNUNET_ERROR_TYPE_DEBUG,
315        "Iterate request sent to service.\n");
316   ic->iterating = GNUNET_YES;
317   ic->ev = NULL;
318 }
319
320
321 /**
322  * Callback after MQ envelope is sent
323  *
324  * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
325  */
326 static void
327 store_request_sent (void *cls)
328 {
329   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
330   GNUNET_PEERSTORE_Continuation cont;
331   void *cont_cls;
332
333   sc->ev = NULL;
334   cont = sc->cont;
335   cont_cls = sc->cont_cls;
336   GNUNET_PEERSTORE_store_cancel (sc);
337   if (NULL != cont)
338     cont (cont_cls, GNUNET_OK);
339 }
340
341
342 /**
343  * MQ message handlers
344  */
345 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
346   {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
347   {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
348    sizeof (struct GNUNET_MessageHeader)},
349   {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
350   GNUNET_MQ_HANDLERS_END
351 };
352
353 /******************************************************************************/
354 /*******************         CONNECTION FUNCTIONS         *********************/
355 /******************************************************************************/
356
357 static void
358 handle_client_error (void *cls, enum GNUNET_MQ_Error error)
359 {
360   struct GNUNET_PEERSTORE_Handle *h = cls;
361
362   LOG (GNUNET_ERROR_TYPE_ERROR,
363        _("Received an error notification from MQ of type: %d\n"), error);
364   reconnect (h);
365 }
366
367
368 /**
369  * Iterator over previous watches to resend them
370  */
371 static int
372 rewatch_it (void *cls,
373             const struct GNUNET_HashCode *key,
374             void *value)
375 {
376   struct GNUNET_PEERSTORE_Handle *h = cls;
377   struct GNUNET_PEERSTORE_WatchContext *wc = value;
378   struct StoreKeyHashMessage *hm;
379
380   if (GNUNET_YES == wc->request_sent)
381   {                             /* Envelope gone, create new one. */
382     wc->ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
383     hm->keyhash = wc->keyhash;
384     wc->request_sent = GNUNET_NO;
385   }
386   GNUNET_MQ_notify_sent (wc->ev, &watch_request_sent, wc);
387   GNUNET_MQ_send (h->mq, wc->ev);
388   return GNUNET_YES;
389 }
390
391
392 /**
393  * Close the existing connection to PEERSTORE and reconnect.
394  *
395  * @param h handle to the service
396  */
397 static void
398 reconnect (struct GNUNET_PEERSTORE_Handle *h)
399 {
400   struct GNUNET_PEERSTORE_IterateContext *ic;
401   GNUNET_PEERSTORE_Processor icb;
402   void *icb_cls;
403   struct GNUNET_PEERSTORE_StoreContext *sc;
404
405   LOG (GNUNET_ERROR_TYPE_DEBUG,
406        "Reconnecting...\n");
407   for (sc = h->store_head; NULL != sc; sc = sc->next)
408   {
409     if (NULL != sc->ev)
410     {
411       GNUNET_MQ_send_cancel (sc->ev);
412       sc->ev = NULL;
413     }
414   }
415   if (NULL != h->mq)
416   {
417     GNUNET_MQ_destroy (h->mq);
418     h->mq = NULL;
419   }
420   if (NULL != h->client)
421   {
422     GNUNET_CLIENT_disconnect (h->client);
423     h->client = NULL;
424   }
425   h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
426   GNUNET_assert (NULL != h->client);
427   h->mq =
428       GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
429                                              &handle_client_error, h);
430   LOG (GNUNET_ERROR_TYPE_DEBUG,
431        "Resending pending requests after reconnect.\n");
432   if (NULL != h->watches)
433     GNUNET_CONTAINER_multihashmap_iterate (h->watches,
434                                            &rewatch_it,
435                                            h);
436   for (ic = h->iterate_head; NULL != ic; ic = ic->next)
437   {
438     if (GNUNET_YES == ic->iterating)
439     {
440       icb = ic->callback;
441       icb_cls = ic->callback_cls;
442       GNUNET_PEERSTORE_iterate_cancel (ic);
443       if (NULL != icb)
444         icb (icb_cls,
445              NULL,
446              _("Iteration canceled due to reconnection."));
447     }
448     else
449     {
450       GNUNET_MQ_notify_sent (ic->ev,
451                              &iterate_request_sent,
452                              ic);
453       GNUNET_MQ_send (h->mq, ic->ev);
454     }
455   }
456   for (sc = h->store_head; NULL != sc; sc = sc->next)
457   {
458     sc->ev = PEERSTORE_create_record_mq_envelope (sc->sub_system,
459                                                   &sc->peer,
460                                                   sc->key,
461                                                   sc->value,
462                                                   sc->size,
463                                                   &sc->expiry,
464                                                   sc->options,
465                                                   GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
466     GNUNET_MQ_notify_sent (sc->ev,
467                            &store_request_sent,
468                            sc);
469     GNUNET_MQ_send (h->mq, sc->ev);
470   }
471 }
472
473
474 /**
475  * Iterator over watch requests to cancel them.
476  *
477  * @param cls unsused
478  * @param key key to the watch request
479  * @param value watch context
480  * @return #GNUNET_YES to continue iteration
481  */
482 static int
483 destroy_watch (void *cls,
484                const struct GNUNET_HashCode *key,
485                void *value)
486 {
487   struct GNUNET_PEERSTORE_WatchContext *wc = value;
488
489   GNUNET_PEERSTORE_watch_cancel (wc);
490   return GNUNET_YES;
491 }
492
493
494 /**
495  * Kill the connection to the service. This can be delayed in case of pending
496  * STORE requests and the user explicitly asked to sync first. Otherwise it is
497  * performed instantly.
498  *
499  * @param h Handle to the service.
500  */
501 static void
502 do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
503 {
504   if (NULL != h->mq)
505   {
506     GNUNET_MQ_destroy (h->mq);
507     h->mq = NULL;
508   }
509   if (NULL != h->client)
510   {
511     GNUNET_CLIENT_disconnect (h->client);
512     h->client = NULL;
513   }
514   GNUNET_free (h);
515 }
516
517
518 /**
519  * Connect to the PEERSTORE service.
520  *
521  * @param cfg configuration to use
522  * @return NULL on error
523  */
524 struct GNUNET_PEERSTORE_Handle *
525 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
526 {
527   struct GNUNET_PEERSTORE_Handle *h;
528
529   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
530
531   h->client = GNUNET_CLIENT_connect ("peerstore", cfg);
532   if (NULL == h->client)
533   {
534     GNUNET_free (h);
535     return NULL;
536   }
537   h->cfg = cfg;
538   h->disconnecting = GNUNET_NO;
539   h->mq =
540       GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
541                                              &handle_client_error, h);
542   if (NULL == h->mq)
543   {
544     GNUNET_CLIENT_disconnect (h->client);
545     GNUNET_free (h);
546     return NULL;
547   }
548   LOG (GNUNET_ERROR_TYPE_DEBUG,
549        "New connection created\n");
550   return h;
551 }
552
553
554 /**
555  * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
556  * will be canceled.
557  * Any pending STORE requests will depend on @e snyc_first flag.
558  *
559  * @param h handle to disconnect
560  * @param sync_first send any pending STORE requests before disconnecting
561  */
562 void
563 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first)
564 {
565   struct GNUNET_PEERSTORE_IterateContext *ic;
566   struct GNUNET_PEERSTORE_IterateContext *ic_iter;
567   struct GNUNET_PEERSTORE_StoreContext *sc;
568   struct GNUNET_PEERSTORE_StoreContext *sc_iter;
569
570   LOG (GNUNET_ERROR_TYPE_DEBUG,
571        "Disconnecting.\n");
572   if (NULL != h->watches)
573   {
574     GNUNET_CONTAINER_multihashmap_iterate (h->watches,
575                                            &destroy_watch,
576                                            NULL);
577     GNUNET_CONTAINER_multihashmap_destroy (h->watches);
578     h->watches = NULL;
579   }
580   ic_iter = h->iterate_head;
581   while (NULL != ic_iter)
582   {
583     GNUNET_break (0);
584     ic = ic_iter;
585     ic_iter = ic_iter->next;
586     GNUNET_PEERSTORE_iterate_cancel (ic);
587   }
588   if (NULL != h->store_head)
589   {
590     if (GNUNET_YES == sync_first)
591     {
592       LOG (GNUNET_ERROR_TYPE_DEBUG,
593            "Delaying disconnection due to pending store requests.\n");
594       h->disconnecting = GNUNET_YES;
595       return;
596     }
597     sc_iter = h->store_head;
598     while (NULL != sc_iter)
599     {
600       sc = sc_iter;
601       sc_iter = sc_iter->next;
602       GNUNET_PEERSTORE_store_cancel (sc);
603     }
604   }
605   do_disconnect (h);
606 }
607
608
609 /******************************************************************************/
610 /*******************            STORE FUNCTIONS           *********************/
611 /******************************************************************************/
612
613
614 /**
615  * Cancel a store request
616  *
617  * @param sc Store request context
618  */
619 void
620 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
621 {
622   struct GNUNET_PEERSTORE_Handle *h = sc->h;
623
624   if (NULL != sc->ev)
625   {
626     GNUNET_MQ_send_cancel (sc->ev);
627     sc->ev = NULL;
628   }
629   GNUNET_CONTAINER_DLL_remove (sc->h->store_head,
630                                sc->h->store_tail,
631                                sc);
632   GNUNET_free (sc->sub_system);
633   GNUNET_free (sc->value);
634   GNUNET_free (sc->key);
635   GNUNET_free (sc);
636   if ( (GNUNET_YES == h->disconnecting) &&
637        (NULL == h->store_head) )
638     do_disconnect (h);
639 }
640
641
642 /**
643  * Store a new entry in the PEERSTORE.
644  * Note that stored entries can be lost in some cases
645  * such as power failure.
646  *
647  * @param h Handle to the PEERSTORE service
648  * @param sub_system name of the sub system
649  * @param peer Peer Identity
650  * @param key entry key
651  * @param value entry value BLOB
652  * @param size size of @e value
653  * @param expiry absolute time after which the entry is (possibly) deleted
654  * @param options options specific to the storage operation
655  * @param cont Continuation function after the store request is sent
656  * @param cont_cls Closure for @a cont
657  */
658 struct GNUNET_PEERSTORE_StoreContext *
659 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
660                         const char *sub_system,
661                         const struct GNUNET_PeerIdentity *peer,
662                         const char *key,
663                         const void *value,
664                         size_t size,
665                         struct GNUNET_TIME_Absolute expiry,
666                         enum GNUNET_PEERSTORE_StoreOption options,
667                         GNUNET_PEERSTORE_Continuation cont,
668                         void *cont_cls)
669 {
670   struct GNUNET_MQ_Envelope *ev;
671   struct GNUNET_PEERSTORE_StoreContext *sc;
672
673   LOG (GNUNET_ERROR_TYPE_DEBUG,
674        "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
675        size, sub_system, GNUNET_i2s (peer), key);
676   ev = PEERSTORE_create_record_mq_envelope (sub_system,
677                                             peer,
678                                             key,
679                                             value,
680                                             size,
681                                             &expiry,
682                                             options,
683                                             GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
684   sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
685   sc->sub_system = GNUNET_strdup (sub_system);
686   sc->peer = *peer;
687   sc->key = GNUNET_strdup (key);
688   sc->value = GNUNET_memdup (value, size);
689   sc->size = size;
690   sc->expiry = expiry;
691   sc->options = options;
692   sc->cont = cont;
693   sc->cont_cls = cont_cls;
694   sc->h = h;
695
696   GNUNET_CONTAINER_DLL_insert_tail (h->store_head,
697                                     h->store_tail,
698                                     sc);
699   GNUNET_MQ_notify_sent (ev,
700                          &store_request_sent,
701                          sc);
702   GNUNET_MQ_send (h->mq, ev);
703   return sc;
704
705 }
706
707
708 /******************************************************************************/
709 /*******************           ITERATE FUNCTIONS          *********************/
710 /******************************************************************************/
711
712 /**
713  * When a response for iterate request is received
714  *
715  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
716  * @param msg message received, NULL on timeout or fatal error
717  */
718 static void
719 handle_iterate_result (void *cls,
720                        const struct GNUNET_MessageHeader *msg)
721 {
722   struct GNUNET_PEERSTORE_Handle *h = cls;
723   struct GNUNET_PEERSTORE_IterateContext *ic;
724   GNUNET_PEERSTORE_Processor callback;
725   void *callback_cls;
726   uint16_t msg_type;
727   struct GNUNET_PEERSTORE_Record *record;
728   int continue_iter;
729
730   ic = h->iterate_head;
731   if (NULL == ic)
732   {
733     LOG (GNUNET_ERROR_TYPE_ERROR,
734          _("Unexpected iteration response, this should not happen.\n"));
735     reconnect (h);
736     return;
737   }
738   callback = ic->callback;
739   callback_cls = ic->callback_cls;
740   if (NULL == msg)              /* Connection error */
741   {
742     if (NULL != callback)
743       callback (callback_cls, NULL,
744                 _("Error communicating with `PEERSTORE' service."));
745     reconnect (h);
746     return;
747   }
748   msg_type = ntohs (msg->type);
749   if (GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
750   {
751     ic->iterating = GNUNET_NO;
752     GNUNET_PEERSTORE_iterate_cancel (ic);
753     if (NULL != callback)
754       callback (callback_cls, NULL, NULL);
755     return;
756   }
757   if (NULL != callback)
758   {
759     record = PEERSTORE_parse_record_message (msg);
760     if (NULL == record)
761       continue_iter =
762           callback (callback_cls, NULL,
763                     _("Received a malformed response from service."));
764     else
765     {
766       continue_iter = callback (callback_cls, record, NULL);
767       PEERSTORE_destroy_record (record);
768     }
769     if (GNUNET_NO == continue_iter)
770       ic->callback = NULL;
771   }
772 }
773
774
775 /**
776  * Called when the iterate request is timedout
777  *
778  * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
779  * @param tc Scheduler task context (unused)
780  */
781 static void
782 iterate_timeout (void *cls,
783                  const struct GNUNET_SCHEDULER_TaskContext *tc)
784 {
785   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
786   GNUNET_PEERSTORE_Processor callback;
787   void *callback_cls;
788
789   ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
790   callback = ic->callback;
791   callback_cls = ic->callback_cls;
792   GNUNET_PEERSTORE_iterate_cancel (ic);
793   if (NULL != callback)
794     callback (callback_cls,
795               NULL,
796               _("timeout"));
797 }
798
799
800 /**
801  * Cancel an iterate request
802  * Please do not call after the iterate request is done
803  *
804  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
805  */
806 void
807 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
808 {
809   if (GNUNET_SCHEDULER_NO_TASK != ic->timeout_task)
810   {
811     GNUNET_SCHEDULER_cancel (ic->timeout_task);
812     ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
813   }
814   if (GNUNET_NO == ic->iterating)
815   {
816     if (NULL != ic->ev)
817     {
818       GNUNET_MQ_send_cancel (ic->ev);
819       ic->ev = NULL;
820     }
821     GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head,
822                                  ic->h->iterate_tail,
823                                  ic);
824     GNUNET_free (ic);
825   }
826   else
827     ic->callback = NULL;
828 }
829
830
831 /**
832  * Iterate over records matching supplied key information
833  *
834  * @param h handle to the PEERSTORE service
835  * @param sub_system name of sub system
836  * @param peer Peer identity (can be NULL)
837  * @param key entry key string (can be NULL)
838  * @param timeout time after which the iterate request is canceled
839  * @param callback function called with each matching record, all NULL's on end
840  * @param callback_cls closure for @a callback
841  * @return Handle to iteration request
842  */
843 struct GNUNET_PEERSTORE_IterateContext *
844 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
845                           const char *sub_system,
846                           const struct GNUNET_PeerIdentity *peer,
847                           const char *key,
848                           struct GNUNET_TIME_Relative timeout,
849                           GNUNET_PEERSTORE_Processor callback,
850                           void *callback_cls)
851 {
852   struct GNUNET_MQ_Envelope *ev;
853   struct GNUNET_PEERSTORE_IterateContext *ic;
854
855   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, NULL, 0,
856                                             NULL, 0,
857                                             GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
858   ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
859
860   ic->callback = callback;
861   ic->callback_cls = callback_cls;
862   ic->ev = ev;
863   ic->h = h;
864   ic->iterating = GNUNET_NO;
865   GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head,
866                                     h->iterate_tail,
867                                     ic);
868   LOG (GNUNET_ERROR_TYPE_DEBUG,
869        "Sending an iterate request for sub system `%s'\n", sub_system);
870   GNUNET_MQ_notify_sent (ev, &iterate_request_sent, ic);
871   GNUNET_MQ_send (h->mq, ev);
872   ic->timeout_task =
873       GNUNET_SCHEDULER_add_delayed (timeout, &iterate_timeout, ic);
874   return ic;
875 }
876
877
878 /******************************************************************************/
879 /*******************            WATCH FUNCTIONS           *********************/
880 /******************************************************************************/
881
882 /**
883  * When a watch record is received
884  *
885  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
886  * @param msg message received, NULL on timeout or fatal error
887  */
888 static void
889 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
890 {
891   struct GNUNET_PEERSTORE_Handle *h = cls;
892   struct GNUNET_PEERSTORE_Record *record;
893   struct GNUNET_HashCode keyhash;
894   struct GNUNET_PEERSTORE_WatchContext *wc;
895
896   if (NULL == msg)
897   {
898     LOG (GNUNET_ERROR_TYPE_ERROR,
899          _("Problem receiving a watch response, no way to determine which request.\n"));
900     reconnect (h);
901     return;
902   }
903   LOG (GNUNET_ERROR_TYPE_DEBUG,
904        "Received a watch record from service.\n");
905   record = PEERSTORE_parse_record_message (msg);
906   PEERSTORE_hash_key (record->sub_system, record->peer, record->key, &keyhash);
907   wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
908   if (NULL == wc)
909   {
910     LOG (GNUNET_ERROR_TYPE_ERROR,
911          _("Received a watch result for a non existing watch.\n"));
912     PEERSTORE_destroy_record (record);
913     reconnect (h);
914     return;
915   }
916   if (NULL != wc->callback)
917     wc->callback (wc->callback_cls, record, NULL);
918   PEERSTORE_destroy_record (record);
919 }
920
921
922 /**
923  * Cancel a watch request
924  *
925  * @param wc handle to the watch request
926  */
927 void
928 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
929 {
930   struct GNUNET_PEERSTORE_Handle *h = wc->h;
931   struct GNUNET_MQ_Envelope *ev;
932   struct StoreKeyHashMessage *hm;
933
934   LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
935   if (GNUNET_YES == wc->request_sent)   /* If request already sent to service, send a cancel request. */
936   {
937     ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
938     hm->keyhash = wc->keyhash;
939     GNUNET_MQ_send (h->mq, ev);
940     wc->callback = NULL;
941     wc->callback_cls = NULL;
942   }
943   if (NULL != wc->ev)
944   {
945     GNUNET_MQ_send_cancel (wc->ev);
946     wc->ev = NULL;
947   }
948   GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc);
949   GNUNET_free (wc);
950
951 }
952
953
954 /**
955  * Request watching a given key
956  * User will be notified with any new values added to key
957  *
958  * @param h handle to the PEERSTORE service
959  * @param sub_system name of sub system
960  * @param peer Peer identity
961  * @param key entry key string
962  * @param callback function called with each new value
963  * @param callback_cls closure for @a callback
964  * @return Handle to watch request
965  */
966 struct GNUNET_PEERSTORE_WatchContext *
967 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
968                         const char *sub_system,
969                         const struct GNUNET_PeerIdentity *peer, const char *key,
970                         GNUNET_PEERSTORE_Processor callback, void *callback_cls)
971 {
972   struct GNUNET_MQ_Envelope *ev;
973   struct StoreKeyHashMessage *hm;
974   struct GNUNET_PEERSTORE_WatchContext *wc;
975
976   GNUNET_assert (NULL != sub_system);
977   GNUNET_assert (NULL != peer);
978   GNUNET_assert (NULL != key);
979   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
980   PEERSTORE_hash_key (sub_system, peer, key, &hm->keyhash);
981   wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
982
983   wc->callback = callback;
984   wc->callback_cls = callback_cls;
985   wc->ev = ev;
986   wc->h = h;
987   wc->request_sent = GNUNET_NO;
988   wc->keyhash = hm->keyhash;
989   if (NULL == h->watches)
990     h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
991   GNUNET_CONTAINER_multihashmap_put (h->watches, &wc->keyhash, wc,
992                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
993   LOG (GNUNET_ERROR_TYPE_DEBUG,
994        "Sending a watch request for ss `%s', peer `%s', key `%s'.\n",
995        sub_system, GNUNET_i2s (peer), key);
996   GNUNET_MQ_notify_sent (ev, &watch_request_sent, wc);
997   GNUNET_MQ_send (h->mq, ev);
998   return wc;
999 }
1000
1001 /* end of peerstore_api.c */