39f37b02291f8883a581a4ca8216eaf853b7b7a6
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013-2014 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file peerstore/peerstore_api.c
22  * @brief API for peerstore
23  * @author Omar Tarabai
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
32
33 /******************************************************************************/
34 /************************      DATA STRUCTURES     ****************************/
35 /******************************************************************************/
36
37 /**
38  * Handle to the PEERSTORE service.
39  */
40 struct GNUNET_PEERSTORE_Handle
41 {
42
43   /**
44    * Our configuration.
45    */
46   const struct GNUNET_CONFIGURATION_Handle *cfg;
47
48   /**
49    * Connection to the service.
50    */
51   struct GNUNET_CLIENT_Connection *client;
52
53   /**
54    * Message queue
55    */
56   struct GNUNET_MQ_Handle *mq;
57
58   /**
59    * Head of active STORE requests.
60    */
61   struct GNUNET_PEERSTORE_StoreContext *store_head;
62
63   /**
64    * Tail of active STORE requests.
65    */
66   struct GNUNET_PEERSTORE_StoreContext *store_tail;
67
68   /**
69    * Head of active ITERATE requests.
70    */
71   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
72
73   /**
74    * Tail of active ITERATE requests.
75    */
76   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
77
78   /**
79    * Hashmap of watch requests
80    */
81   struct GNUNET_CONTAINER_MultiHashMap *watches;
82
83   /**
84    * Are we in the process of disconnecting but need to sync first?
85    */
86   int disconnecting;
87
88 };
89
90 /**
91  * Context for a store request
92  */
93 struct GNUNET_PEERSTORE_StoreContext
94 {
95   /**
96    * Kept in a DLL.
97    */
98   struct GNUNET_PEERSTORE_StoreContext *next;
99
100   /**
101    * Kept in a DLL.
102    */
103   struct GNUNET_PEERSTORE_StoreContext *prev;
104
105   /**
106    * Handle to the PEERSTORE service.
107    */
108   struct GNUNET_PEERSTORE_Handle *h;
109
110   /**
111    * Continuation called with service response
112    */
113   GNUNET_PEERSTORE_Continuation cont;
114
115   /**
116    * Closure for @e cont
117    */
118   void *cont_cls;
119
120   /**
121    * Which subsystem does the store?
122    */
123   char *sub_system;
124
125   /**
126    * Key for the store operation.
127    */
128   char *key;
129
130   /**
131    * Contains @e size bytes.
132    */
133   void *value;
134
135   /**
136    * Peer the store is for.
137    */
138   struct GNUNET_PeerIdentity peer;
139
140   /**
141    * Number of bytes in @e value.
142    */
143   size_t size;
144
145   /**
146    * When does the value expire?
147    */
148   struct GNUNET_TIME_Absolute expiry;
149
150   /**
151    * Options for the store operation.
152    */
153   enum GNUNET_PEERSTORE_StoreOption options;
154
155 };
156
157 /**
158  * Context for a iterate request
159  */
160 struct GNUNET_PEERSTORE_IterateContext
161 {
162   /**
163    * Kept in a DLL.
164    */
165   struct GNUNET_PEERSTORE_IterateContext *next;
166
167   /**
168    * Kept in a DLL.
169    */
170   struct GNUNET_PEERSTORE_IterateContext *prev;
171
172   /**
173    * Handle to the PEERSTORE service.
174    */
175   struct GNUNET_PEERSTORE_Handle *h;
176
177   /**
178    * Which subsystem does the store?
179    */
180   char *sub_system;
181
182   /**
183    * Peer the store is for.
184    */
185   struct GNUNET_PeerIdentity peer;
186
187   /**
188    * Key for the store operation.
189    */
190   char *key;
191
192   /**
193    * Operation timeout
194    */
195   struct GNUNET_TIME_Relative timeout;
196
197   /**
198    * Callback with each matching record
199    */
200   GNUNET_PEERSTORE_Processor callback;
201
202   /**
203    * Closure for @e callback
204    */
205   void *callback_cls;
206
207   /**
208    * #GNUNET_YES if we are currently processing records.
209    */
210   int iterating;
211
212   /**
213    * Task identifier for the function called
214    * on iterate request timeout
215    */
216   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
217
218 };
219
220 /**
221  * Context for a watch request
222  */
223 struct GNUNET_PEERSTORE_WatchContext
224 {
225   /**
226    * Kept in a DLL.
227    */
228   struct GNUNET_PEERSTORE_WatchContext *next;
229
230   /**
231    * Kept in a DLL.
232    */
233   struct GNUNET_PEERSTORE_WatchContext *prev;
234
235   /**
236    * Handle to the PEERSTORE service.
237    */
238   struct GNUNET_PEERSTORE_Handle *h;
239
240   /**
241    * Callback with each record received
242    */
243   GNUNET_PEERSTORE_Processor callback;
244
245   /**
246    * Closure for @e callback
247    */
248   void *callback_cls;
249
250   /**
251    * Hash of the combined key
252    */
253   struct GNUNET_HashCode keyhash;
254
255 };
256
257 /******************************************************************************/
258 /*******************             DECLARATIONS             *********************/
259 /******************************************************************************/
260
261 /**
262  * When a response for iterate request is received
263  *
264  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
265  * @param msg message received, NULL on timeout or fatal error
266  */
267 static void
268 handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg);
269
270 /**
271  * When a watch record is received
272  *
273  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
274  * @param msg message received, NULL on timeout or fatal error
275  */
276 static void
277 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg);
278
279 /**
280  * Close the existing connection to PEERSTORE and reconnect.
281  *
282  * @param h handle to the service
283  */
284 static void
285 reconnect (struct GNUNET_PEERSTORE_Handle *h);
286
287
288 /**
289  * Callback after MQ envelope is sent
290  *
291  * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
292  */
293 static void
294 store_request_sent (void *cls)
295 {
296   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
297   GNUNET_PEERSTORE_Continuation cont;
298   void *cont_cls;
299
300   cont = sc->cont;
301   cont_cls = sc->cont_cls;
302   GNUNET_PEERSTORE_store_cancel (sc);
303   if (NULL != cont)
304     cont (cont_cls, GNUNET_OK);
305 }
306
307
308 /**
309  * MQ message handlers
310  */
311 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
312   {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
313   {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
314    sizeof (struct GNUNET_MessageHeader)},
315   {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
316   GNUNET_MQ_HANDLERS_END
317 };
318
319 /******************************************************************************/
320 /*******************         CONNECTION FUNCTIONS         *********************/
321 /******************************************************************************/
322
323 static void
324 handle_client_error (void *cls, enum GNUNET_MQ_Error error)
325 {
326   struct GNUNET_PEERSTORE_Handle *h = cls;
327
328   LOG (GNUNET_ERROR_TYPE_ERROR,
329        _("Received an error notification from MQ of type: %d\n"), error);
330   reconnect (h);
331 }
332
333
334 /**
335  * Iterator over previous watches to resend them
336  *
337  * @param cls the `struct GNUNET_PEERSTORE_Handle`
338  * @param key key for the watch
339  * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
340  * @return #GNUNET_YES (continue to iterate)
341  */
342 static int
343 rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
344 {
345   struct GNUNET_PEERSTORE_Handle *h = cls;
346   struct GNUNET_PEERSTORE_WatchContext *wc = value;
347   struct StoreKeyHashMessage *hm;
348   struct GNUNET_MQ_Envelope *ev;
349
350   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
351   hm->keyhash = wc->keyhash;
352   GNUNET_MQ_send (h->mq, ev);
353   return GNUNET_YES;
354 }
355
356
357 /**
358  * Called when the iterate request is timedout
359  *
360  * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
361  * @param tc Scheduler task context (unused)
362  */
363 static void
364 iterate_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
365 {
366   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
367   GNUNET_PEERSTORE_Processor callback;
368   void *callback_cls;
369
370   ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
371   callback = ic->callback;
372   callback_cls = ic->callback_cls;
373   GNUNET_PEERSTORE_iterate_cancel (ic);
374   if (NULL != callback)
375     callback (callback_cls, NULL, _("timeout"));
376 }
377
378
379 /**
380  * Close the existing connection to PEERSTORE and reconnect.
381  *
382  * @param h handle to the service
383  */
384 static void
385 reconnect (struct GNUNET_PEERSTORE_Handle *h)
386 {
387   struct GNUNET_PEERSTORE_IterateContext *ic;
388   struct GNUNET_PEERSTORE_IterateContext *next;
389   GNUNET_PEERSTORE_Processor icb;
390   void *icb_cls;
391   struct GNUNET_PEERSTORE_StoreContext *sc;
392   struct GNUNET_MQ_Envelope *ev;
393
394   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
395   for (ic = h->iterate_head; NULL != ic; ic = next)
396   {
397     next = ic->next;
398     icb = ic->callback;
399     icb_cls = ic->callback_cls;
400     GNUNET_PEERSTORE_iterate_cancel (ic);
401     if (NULL != icb)
402       icb (icb_cls, NULL, _("Iteration canceled due to reconnection."));
403   }
404   if (NULL != h->mq)
405   {
406     GNUNET_MQ_destroy (h->mq);
407     h->mq = NULL;
408   }
409   if (NULL != h->client)
410   {
411     GNUNET_CLIENT_disconnect (h->client);
412     h->client = NULL;
413   }
414
415   h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
416   GNUNET_assert (NULL != h->client);
417   h->mq =
418       GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
419                                              &handle_client_error, h);
420   LOG (GNUNET_ERROR_TYPE_DEBUG,
421        "Resending pending requests after reconnect.\n");
422   if (NULL != h->watches)
423     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
424   for (ic = h->iterate_head; NULL != ic; ic = ic->next)
425   {
426     ev =
427       PEERSTORE_create_record_mq_envelope (ic->sub_system, &ic->peer, ic->key,
428                                            NULL, 0, NULL, 0,
429                                            GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
430     GNUNET_MQ_send (h->mq, ev);
431     ic->timeout_task =
432         GNUNET_SCHEDULER_add_delayed (ic->timeout, &iterate_timeout, ic);
433   }
434   for (sc = h->store_head; NULL != sc; sc = sc->next)
435   {
436     ev =
437       PEERSTORE_create_record_mq_envelope (sc->sub_system, &sc->peer, sc->key,
438                                            sc->value, sc->size, &sc->expiry,
439                                            sc->options,
440                                            GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
441     GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
442     GNUNET_MQ_send (h->mq, ev);
443   }
444 }
445
446
447 /**
448  * Iterator over watch requests to cancel them.
449  *
450  * @param cls unsused
451  * @param key key to the watch request
452  * @param value watch context
453  * @return #GNUNET_YES to continue iteration
454  */
455 static int
456 destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
457 {
458   struct GNUNET_PEERSTORE_WatchContext *wc = value;
459
460   GNUNET_PEERSTORE_watch_cancel (wc);
461   return GNUNET_YES;
462 }
463
464
465 /**
466  * Kill the connection to the service. This can be delayed in case of pending
467  * STORE requests and the user explicitly asked to sync first. Otherwise it is
468  * performed instantly.
469  *
470  * @param h Handle to the service.
471  */
472 static void
473 do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
474 {
475   if (NULL != h->mq)
476   {
477     GNUNET_MQ_destroy (h->mq);
478     h->mq = NULL;
479   }
480   if (NULL != h->client)
481   {
482     GNUNET_CLIENT_disconnect (h->client);
483     h->client = NULL;
484   }
485   GNUNET_free (h);
486 }
487
488
489 /**
490  * Connect to the PEERSTORE service.
491  *
492  * @param cfg configuration to use
493  * @return NULL on error
494  */
495 struct GNUNET_PEERSTORE_Handle *
496 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
497 {
498   struct GNUNET_PEERSTORE_Handle *h;
499
500   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
501
502   h->client = GNUNET_CLIENT_connect ("peerstore", cfg);
503   if (NULL == h->client)
504   {
505     GNUNET_free (h);
506     return NULL;
507   }
508   h->cfg = cfg;
509   h->disconnecting = GNUNET_NO;
510   h->mq =
511       GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
512                                              &handle_client_error, h);
513   if (NULL == h->mq)
514   {
515     GNUNET_CLIENT_disconnect (h->client);
516     GNUNET_free (h);
517     return NULL;
518   }
519   LOG (GNUNET_ERROR_TYPE_DEBUG, "New connection created\n");
520   return h;
521 }
522
523
524 /**
525  * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
526  * will be canceled.
527  * Any pending STORE requests will depend on @e snyc_first flag.
528  *
529  * @param h handle to disconnect
530  * @param sync_first send any pending STORE requests before disconnecting
531  */
532 void
533 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first)
534 {
535   struct GNUNET_PEERSTORE_IterateContext *ic;
536   struct GNUNET_PEERSTORE_IterateContext *ic_iter;
537   struct GNUNET_PEERSTORE_StoreContext *sc;
538   struct GNUNET_PEERSTORE_StoreContext *sc_iter;
539
540   LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
541   if (NULL != h->watches)
542   {
543     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
544     GNUNET_CONTAINER_multihashmap_destroy (h->watches);
545     h->watches = NULL;
546   }
547   ic_iter = h->iterate_head;
548   while (NULL != ic_iter)
549   {
550     GNUNET_break (0);
551     ic = ic_iter;
552     ic_iter = ic_iter->next;
553     GNUNET_PEERSTORE_iterate_cancel (ic);
554   }
555   if (NULL != h->store_head)
556   {
557     if (GNUNET_YES == sync_first)
558     {
559       LOG (GNUNET_ERROR_TYPE_DEBUG,
560            "Delaying disconnection due to pending store requests.\n");
561       h->disconnecting = GNUNET_YES;
562       return;
563     }
564     sc_iter = h->store_head;
565     while (NULL != sc_iter)
566     {
567       sc = sc_iter;
568       sc_iter = sc_iter->next;
569       GNUNET_PEERSTORE_store_cancel (sc);
570     }
571   }
572   do_disconnect (h);
573 }
574
575
576 /******************************************************************************/
577 /*******************            STORE FUNCTIONS           *********************/
578 /******************************************************************************/
579
580
581 /**
582  * Cancel a store request
583  *
584  * @param sc Store request context
585  */
586 void
587 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
588 {
589   struct GNUNET_PEERSTORE_Handle *h = sc->h;
590
591   GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
592   GNUNET_free (sc->sub_system);
593   GNUNET_free (sc->value);
594   GNUNET_free (sc->key);
595   GNUNET_free (sc);
596   if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
597     do_disconnect (h);
598 }
599
600
601 /**
602  * Store a new entry in the PEERSTORE.
603  * Note that stored entries can be lost in some cases
604  * such as power failure.
605  *
606  * @param h Handle to the PEERSTORE service
607  * @param sub_system name of the sub system
608  * @param peer Peer Identity
609  * @param key entry key
610  * @param value entry value BLOB
611  * @param size size of @e value
612  * @param expiry absolute time after which the entry is (possibly) deleted
613  * @param options options specific to the storage operation
614  * @param cont Continuation function after the store request is sent
615  * @param cont_cls Closure for @a cont
616  */
617 struct GNUNET_PEERSTORE_StoreContext *
618 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
619                         const char *sub_system,
620                         const struct GNUNET_PeerIdentity *peer, const char *key,
621                         const void *value, size_t size,
622                         struct GNUNET_TIME_Absolute expiry,
623                         enum GNUNET_PEERSTORE_StoreOption options,
624                         GNUNET_PEERSTORE_Continuation cont, void *cont_cls)
625 {
626   struct GNUNET_MQ_Envelope *ev;
627   struct GNUNET_PEERSTORE_StoreContext *sc;
628
629   LOG (GNUNET_ERROR_TYPE_DEBUG,
630        "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
631        size, sub_system, GNUNET_i2s (peer), key);
632   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size,
633                                             &expiry, options,
634                                             GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
635   sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
636
637   sc->sub_system = GNUNET_strdup (sub_system);
638   sc->peer = *peer;
639   sc->key = GNUNET_strdup (key);
640   sc->value = GNUNET_memdup (value, size);
641   sc->size = size;
642   sc->expiry = expiry;
643   sc->options = options;
644   sc->cont = cont;
645   sc->cont_cls = cont_cls;
646   sc->h = h;
647
648   GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
649   GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
650   GNUNET_MQ_send (h->mq, ev);
651   return sc;
652
653 }
654
655
656 /******************************************************************************/
657 /*******************           ITERATE FUNCTIONS          *********************/
658 /******************************************************************************/
659
660 /**
661  * When a response for iterate request is received
662  *
663  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
664  * @param msg message received, NULL on timeout or fatal error
665  */
666 static void
667 handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg)
668 {
669   struct GNUNET_PEERSTORE_Handle *h = cls;
670   struct GNUNET_PEERSTORE_IterateContext *ic;
671   GNUNET_PEERSTORE_Processor callback;
672   void *callback_cls;
673   uint16_t msg_type;
674   struct GNUNET_PEERSTORE_Record *record;
675   int continue_iter;
676
677   ic = h->iterate_head;
678   if (NULL == ic)
679   {
680     LOG (GNUNET_ERROR_TYPE_ERROR,
681          _("Unexpected iteration response, this should not happen.\n"));
682     reconnect (h);
683     return;
684   }
685   ic->iterating = GNUNET_YES;
686   callback = ic->callback;
687   callback_cls = ic->callback_cls;
688   if (NULL == msg)              /* Connection error */
689   {
690     if (NULL != callback)
691       callback (callback_cls, NULL,
692                 _("Error communicating with `PEERSTORE' service."));
693     reconnect (h);
694     return;
695   }
696   msg_type = ntohs (msg->type);
697   if (GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
698   {
699     ic->iterating = GNUNET_NO;
700     GNUNET_PEERSTORE_iterate_cancel (ic);
701     if (NULL != callback)
702       callback (callback_cls, NULL, NULL);
703     return;
704   }
705   if (NULL != callback)
706   {
707     record = PEERSTORE_parse_record_message (msg);
708     if (NULL == record)
709       continue_iter =
710           callback (callback_cls, NULL,
711                     _("Received a malformed response from service."));
712     else
713     {
714       continue_iter = callback (callback_cls, record, NULL);
715       PEERSTORE_destroy_record (record);
716     }
717     if (GNUNET_NO == continue_iter)
718       ic->callback = NULL;
719   }
720 }
721
722
723 /**
724  * Cancel an iterate request
725  * Please do not call after the iterate request is done
726  *
727  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
728  */
729 void
730 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
731 {
732   if (GNUNET_SCHEDULER_NO_TASK != ic->timeout_task)
733   {
734     GNUNET_SCHEDULER_cancel (ic->timeout_task);
735     ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
736   }
737   if (GNUNET_NO == ic->iterating)
738   {
739     GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
740     GNUNET_free (ic->sub_system);
741     if (NULL != ic->key)
742       GNUNET_free (ic->key);
743     GNUNET_free (ic);
744   }
745   else
746     ic->callback = NULL;
747 }
748
749
750 /**
751  * Iterate over records matching supplied key information
752  *
753  * @param h handle to the PEERSTORE service
754  * @param sub_system name of sub system
755  * @param peer Peer identity (can be NULL)
756  * @param key entry key string (can be NULL)
757  * @param timeout time after which the iterate request is canceled
758  * @param callback function called with each matching record, all NULL's on end
759  * @param callback_cls closure for @a callback
760  * @return Handle to iteration request
761  */
762 struct GNUNET_PEERSTORE_IterateContext *
763 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
764                           const char *sub_system,
765                           const struct GNUNET_PeerIdentity *peer,
766                           const char *key, struct GNUNET_TIME_Relative timeout,
767                           GNUNET_PEERSTORE_Processor callback,
768                           void *callback_cls)
769 {
770   struct GNUNET_MQ_Envelope *ev;
771   struct GNUNET_PEERSTORE_IterateContext *ic;
772
773   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, NULL, 0,
774                                             NULL, 0,
775                                             GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
776   ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
777
778   ic->callback = callback;
779   ic->callback_cls = callback_cls;
780   ic->h = h;
781   ic->sub_system = GNUNET_strdup (sub_system);
782   if (NULL != peer)
783     ic->peer = *peer;
784   if (NULL != key)
785     ic->key = GNUNET_strdup (key);
786   ic->timeout = timeout;
787   GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head,
788                                     h->iterate_tail,
789                                     ic);
790   LOG (GNUNET_ERROR_TYPE_DEBUG,
791        "Sending an iterate request for sub system `%s'\n", sub_system);
792   GNUNET_MQ_send (h->mq, ev);
793   ic->timeout_task =
794       GNUNET_SCHEDULER_add_delayed (timeout, &iterate_timeout, ic);
795   return ic;
796 }
797
798
799 /******************************************************************************/
800 /*******************            WATCH FUNCTIONS           *********************/
801 /******************************************************************************/
802
803 /**
804  * When a watch record is received
805  *
806  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
807  * @param msg message received, NULL on timeout or fatal error
808  */
809 static void
810 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
811 {
812   struct GNUNET_PEERSTORE_Handle *h = cls;
813   struct GNUNET_PEERSTORE_Record *record;
814   struct GNUNET_HashCode keyhash;
815   struct GNUNET_PEERSTORE_WatchContext *wc;
816
817   if (NULL == msg)
818   {
819     LOG (GNUNET_ERROR_TYPE_ERROR,
820          _
821          ("Problem receiving a watch response, no way to determine which request.\n"));
822     reconnect (h);
823     return;
824   }
825   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
826   record = PEERSTORE_parse_record_message (msg);
827   PEERSTORE_hash_key (record->sub_system, record->peer, record->key, &keyhash);
828   // FIXME: what if there are multiple watches for the same key?
829   wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
830   if (NULL == wc)
831   {
832     LOG (GNUNET_ERROR_TYPE_ERROR,
833          _("Received a watch result for a non existing watch.\n"));
834     PEERSTORE_destroy_record (record);
835     reconnect (h);
836     return;
837   }
838   if (NULL != wc->callback)
839     wc->callback (wc->callback_cls, record, NULL);
840   PEERSTORE_destroy_record (record);
841 }
842
843
844 /**
845  * Cancel a watch request
846  *
847  * @param wc handle to the watch request
848  */
849 void
850 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
851 {
852   struct GNUNET_PEERSTORE_Handle *h = wc->h;
853   struct GNUNET_MQ_Envelope *ev;
854   struct StoreKeyHashMessage *hm;
855
856   LOG (GNUNET_ERROR_TYPE_DEBUG,
857        "Canceling watch.\n");
858   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
859   hm->keyhash = wc->keyhash;
860   GNUNET_MQ_send (h->mq, ev);
861   GNUNET_CONTAINER_multihashmap_remove (h->watches,
862                                         &wc->keyhash,
863                                         wc);
864   GNUNET_free (wc);
865 }
866
867
868 /**
869  * Request watching a given key
870  * User will be notified with any new values added to key
871  *
872  * @param h handle to the PEERSTORE service
873  * @param sub_system name of sub system
874  * @param peer Peer identity
875  * @param key entry key string
876  * @param callback function called with each new value
877  * @param callback_cls closure for @a callback
878  * @return Handle to watch request
879  */
880 struct GNUNET_PEERSTORE_WatchContext *
881 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
882                         const char *sub_system,
883                         const struct GNUNET_PeerIdentity *peer, const char *key,
884                         GNUNET_PEERSTORE_Processor callback, void *callback_cls)
885 {
886   struct GNUNET_MQ_Envelope *ev;
887   struct StoreKeyHashMessage *hm;
888   struct GNUNET_PEERSTORE_WatchContext *wc;
889
890   GNUNET_assert (NULL != sub_system);
891   GNUNET_assert (NULL != peer);
892   GNUNET_assert (NULL != key);
893   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
894   PEERSTORE_hash_key (sub_system,
895                       peer,
896                       key,
897                       &hm->keyhash);
898   wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
899   wc->callback = callback;
900   wc->callback_cls = callback_cls;
901   wc->h = h;
902   wc->keyhash = hm->keyhash;
903   if (NULL == h->watches)
904     h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
905   GNUNET_assert (GNUNET_OK ==
906                  GNUNET_CONTAINER_multihashmap_put (h->watches,
907                                                     &wc->keyhash,
908                                                     wc,
909                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
910   LOG (GNUNET_ERROR_TYPE_DEBUG,
911        "Sending a watch request for ss `%s', peer `%s', key `%s'.\n",
912        sub_system,
913        GNUNET_i2s (peer),
914        key);
915   GNUNET_MQ_send (h->mq, ev);
916   return wc;
917 }
918
919 /* end of peerstore_api.c */