-fix non-deterministic peerstore sync failure
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2014 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file peerstore/peerstore_api.c
22  * @brief API for peerstore
23  * @author Omar Tarabai
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
32
33 /******************************************************************************/
34 /************************      DATA STRUCTURES     ****************************/
35 /******************************************************************************/
36
37 /**
38  * Handle to the PEERSTORE service.
39  */
40 struct GNUNET_PEERSTORE_Handle
41 {
42
43   /**
44    * Our configuration.
45    */
46   const struct GNUNET_CONFIGURATION_Handle *cfg;
47
48   /**
49    * Connection to the service.
50    */
51   struct GNUNET_CLIENT_Connection *client;
52
53   /**
54    * Message queue
55    */
56   struct GNUNET_MQ_Handle *mq;
57
58   /**
59    * Head of active STORE requests.
60    */
61   struct GNUNET_PEERSTORE_StoreContext *store_head;
62
63   /**
64    * Tail of active STORE requests.
65    */
66   struct GNUNET_PEERSTORE_StoreContext *store_tail;
67
68   /**
69    * Head of active ITERATE requests.
70    */
71   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
72
73   /**
74    * Tail of active ITERATE requests.
75    */
76   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
77
78   /**
79    * Hashmap of watch requests
80    */
81   struct GNUNET_CONTAINER_MultiHashMap *watches;
82
83   /**
84    * Are we in the process of disconnecting but need to sync first?
85    */
86   int disconnecting;
87
88 };
89
90 /**
91  * Context for a store request
92  */
93 struct GNUNET_PEERSTORE_StoreContext
94 {
95   /**
96    * Kept in a DLL.
97    */
98   struct GNUNET_PEERSTORE_StoreContext *next;
99
100   /**
101    * Kept in a DLL.
102    */
103   struct GNUNET_PEERSTORE_StoreContext *prev;
104
105   /**
106    * Handle to the PEERSTORE service.
107    */
108   struct GNUNET_PEERSTORE_Handle *h;
109
110   /**
111    * Continuation called with service response
112    */
113   GNUNET_PEERSTORE_Continuation cont;
114
115   /**
116    * Closure for @e cont
117    */
118   void *cont_cls;
119
120   /**
121    * Which subsystem does the store?
122    */
123   char *sub_system;
124
125   /**
126    * Key for the store operation.
127    */
128   char *key;
129
130   /**
131    * Contains @e size bytes.
132    */
133   void *value;
134
135   /**
136    * Peer the store is for.
137    */
138   struct GNUNET_PeerIdentity peer;
139
140   /**
141    * Number of bytes in @e value.
142    */
143   size_t size;
144
145   /**
146    * When does the value expire?
147    */
148   struct GNUNET_TIME_Absolute expiry;
149
150   /**
151    * Options for the store operation.
152    */
153   enum GNUNET_PEERSTORE_StoreOption options;
154
155 };
156
157 /**
158  * Context for a iterate request
159  */
160 struct GNUNET_PEERSTORE_IterateContext
161 {
162   /**
163    * Kept in a DLL.
164    */
165   struct GNUNET_PEERSTORE_IterateContext *next;
166
167   /**
168    * Kept in a DLL.
169    */
170   struct GNUNET_PEERSTORE_IterateContext *prev;
171
172   /**
173    * Handle to the PEERSTORE service.
174    */
175   struct GNUNET_PEERSTORE_Handle *h;
176
177   /**
178    * Which subsystem does the store?
179    */
180   char *sub_system;
181
182   /**
183    * Peer the store is for.
184    */
185   struct GNUNET_PeerIdentity peer;
186
187   /**
188    * Key for the store operation.
189    */
190   char *key;
191
192   /**
193    * Operation timeout
194    */
195   struct GNUNET_TIME_Relative timeout;
196
197   /**
198    * Callback with each matching record
199    */
200   GNUNET_PEERSTORE_Processor callback;
201
202   /**
203    * Closure for @e callback
204    */
205   void *callback_cls;
206
207   /**
208    * #GNUNET_YES if we are currently processing records.
209    */
210   int iterating;
211
212   /**
213    * Task identifier for the function called
214    * on iterate request timeout
215    */
216   struct GNUNET_SCHEDULER_Task * timeout_task;
217
218 };
219
220 /**
221  * Context for a watch request
222  */
223 struct GNUNET_PEERSTORE_WatchContext
224 {
225   /**
226    * Kept in a DLL.
227    */
228   struct GNUNET_PEERSTORE_WatchContext *next;
229
230   /**
231    * Kept in a DLL.
232    */
233   struct GNUNET_PEERSTORE_WatchContext *prev;
234
235   /**
236    * Handle to the PEERSTORE service.
237    */
238   struct GNUNET_PEERSTORE_Handle *h;
239
240   /**
241    * Callback with each record received
242    */
243   GNUNET_PEERSTORE_Processor callback;
244
245   /**
246    * Closure for @e callback
247    */
248   void *callback_cls;
249
250   /**
251    * Hash of the combined key
252    */
253   struct GNUNET_HashCode keyhash;
254
255 };
256
257 /******************************************************************************/
258 /*******************             DECLARATIONS             *********************/
259 /******************************************************************************/
260
261 /**
262  * When a response for iterate request is received
263  *
264  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
265  * @param msg message received, NULL on timeout or fatal error
266  */
267 static void
268 handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg);
269
270 /**
271  * When a watch record is received
272  *
273  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
274  * @param msg message received, NULL on timeout or fatal error
275  */
276 static void
277 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg);
278
279 /**
280  * Close the existing connection to PEERSTORE and reconnect.
281  *
282  * @param h handle to the service
283  */
284 static void
285 reconnect (struct GNUNET_PEERSTORE_Handle *h);
286
287
288 /**
289  * Callback after MQ envelope is sent
290  *
291  * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
292  */
293 static void
294 store_request_sent (void *cls)
295 {
296   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
297   GNUNET_PEERSTORE_Continuation cont;
298   void *cont_cls;
299
300   cont = sc->cont;
301   cont_cls = sc->cont_cls;
302   GNUNET_PEERSTORE_store_cancel (sc);
303   if (NULL != cont)
304     cont (cont_cls, GNUNET_OK);
305 }
306
307
308 /**
309  * MQ message handlers
310  */
311 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
312   {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
313   {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
314    sizeof (struct GNUNET_MessageHeader)},
315   {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
316   GNUNET_MQ_HANDLERS_END
317 };
318
319 /******************************************************************************/
320 /*******************         CONNECTION FUNCTIONS         *********************/
321 /******************************************************************************/
322
323 static void
324 handle_client_error (void *cls, enum GNUNET_MQ_Error error)
325 {
326   struct GNUNET_PEERSTORE_Handle *h = cls;
327
328   LOG (GNUNET_ERROR_TYPE_ERROR,
329        _("Received an error notification from MQ of type: %d\n"), error);
330   reconnect (h);
331 }
332
333
334 /**
335  * Iterator over previous watches to resend them
336  *
337  * @param cls the `struct GNUNET_PEERSTORE_Handle`
338  * @param key key for the watch
339  * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
340  * @return #GNUNET_YES (continue to iterate)
341  */
342 static int
343 rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
344 {
345   struct GNUNET_PEERSTORE_Handle *h = cls;
346   struct GNUNET_PEERSTORE_WatchContext *wc = value;
347   struct StoreKeyHashMessage *hm;
348   struct GNUNET_MQ_Envelope *ev;
349
350   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
351   hm->keyhash = wc->keyhash;
352   GNUNET_MQ_send (h->mq, ev);
353   return GNUNET_YES;
354 }
355
356
357 /**
358  * Called when the iterate request is timedout
359  *
360  * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
361  * @param tc Scheduler task context (unused)
362  */
363 static void
364 iterate_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
365 {
366   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
367   GNUNET_PEERSTORE_Processor callback;
368   void *callback_cls;
369
370   ic->timeout_task = NULL;
371   callback = ic->callback;
372   callback_cls = ic->callback_cls;
373   GNUNET_PEERSTORE_iterate_cancel (ic);
374   if (NULL != callback)
375     callback (callback_cls, NULL, _("timeout"));
376 }
377
378
379 /**
380  * Close the existing connection to PEERSTORE and reconnect.
381  *
382  * @param h handle to the service
383  */
384 static void
385 reconnect (struct GNUNET_PEERSTORE_Handle *h)
386 {
387   struct GNUNET_PEERSTORE_IterateContext *ic;
388   struct GNUNET_PEERSTORE_IterateContext *next;
389   GNUNET_PEERSTORE_Processor icb;
390   void *icb_cls;
391   struct GNUNET_PEERSTORE_StoreContext *sc;
392   struct GNUNET_MQ_Envelope *ev;
393
394   LOG (GNUNET_ERROR_TYPE_DEBUG,
395        "Reconnecting...\n");
396   for (ic = h->iterate_head; NULL != ic; ic = next)
397   {
398     next = ic->next;
399     if (GNUNET_YES == ic->iterating)
400     {
401       icb = ic->callback;
402       icb_cls = ic->callback_cls;
403       GNUNET_PEERSTORE_iterate_cancel (ic);
404       if (NULL != icb)
405         icb (icb_cls,
406              NULL,
407              "Iteration canceled due to reconnection");
408     }
409   }
410   if (NULL != h->mq)
411   {
412     GNUNET_MQ_destroy (h->mq);
413     h->mq = NULL;
414   }
415   if (NULL != h->client)
416   {
417     GNUNET_CLIENT_disconnect (h->client);
418     h->client = NULL;
419   }
420
421   h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
422   GNUNET_assert (NULL != h->client);
423   h->mq =
424       GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
425                                              &handle_client_error, h);
426   LOG (GNUNET_ERROR_TYPE_DEBUG,
427        "Resending pending requests after reconnect.\n");
428   if (NULL != h->watches)
429     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
430   for (ic = h->iterate_head; NULL != ic; ic = ic->next)
431   {
432     ev =
433       PEERSTORE_create_record_mq_envelope (ic->sub_system, &ic->peer, ic->key,
434                                            NULL, 0, NULL, 0,
435                                            GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
436     GNUNET_MQ_send (h->mq, ev);
437     ic->timeout_task =
438         GNUNET_SCHEDULER_add_delayed (ic->timeout, &iterate_timeout, ic);
439   }
440   for (sc = h->store_head; NULL != sc; sc = sc->next)
441   {
442     ev =
443       PEERSTORE_create_record_mq_envelope (sc->sub_system, &sc->peer, sc->key,
444                                            sc->value, sc->size, &sc->expiry,
445                                            sc->options,
446                                            GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
447     GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
448     GNUNET_MQ_send (h->mq, ev);
449   }
450 }
451
452
453 /**
454  * Iterator over watch requests to cancel them.
455  *
456  * @param cls unsused
457  * @param key key to the watch request
458  * @param value watch context
459  * @return #GNUNET_YES to continue iteration
460  */
461 static int
462 destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
463 {
464   struct GNUNET_PEERSTORE_WatchContext *wc = value;
465
466   GNUNET_PEERSTORE_watch_cancel (wc);
467   return GNUNET_YES;
468 }
469
470
471 /**
472  * Kill the connection to the service. This can be delayed in case of pending
473  * STORE requests and the user explicitly asked to sync first. Otherwise it is
474  * performed instantly.
475  *
476  * @param h Handle to the service.
477  */
478 static void
479 do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
480 {
481   if (NULL != h->mq)
482   {
483     GNUNET_MQ_destroy (h->mq);
484     h->mq = NULL;
485   }
486   if (NULL != h->client)
487   {
488     GNUNET_CLIENT_disconnect (h->client);
489     h->client = NULL;
490   }
491   GNUNET_free (h);
492 }
493
494
495 /**
496  * Connect to the PEERSTORE service.
497  *
498  * @param cfg configuration to use
499  * @return NULL on error
500  */
501 struct GNUNET_PEERSTORE_Handle *
502 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
503 {
504   struct GNUNET_PEERSTORE_Handle *h;
505
506   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
507
508   h->client = GNUNET_CLIENT_connect ("peerstore", cfg);
509   if (NULL == h->client)
510   {
511     GNUNET_free (h);
512     return NULL;
513   }
514   h->cfg = cfg;
515   h->disconnecting = GNUNET_NO;
516   h->mq =
517       GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
518                                              &handle_client_error, h);
519   if (NULL == h->mq)
520   {
521     GNUNET_CLIENT_disconnect (h->client);
522     GNUNET_free (h);
523     return NULL;
524   }
525   LOG (GNUNET_ERROR_TYPE_DEBUG, "New connection created\n");
526   return h;
527 }
528
529
530 /**
531  * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
532  * will be canceled.
533  * Any pending STORE requests will depend on @e snyc_first flag.
534  *
535  * @param h handle to disconnect
536  * @param sync_first send any pending STORE requests before disconnecting
537  */
538 void
539 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h,
540                              int sync_first)
541 {
542   struct GNUNET_PEERSTORE_IterateContext *ic;
543   struct GNUNET_PEERSTORE_StoreContext *sc;
544
545   LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
546   if (NULL != h->watches)
547   {
548     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
549     GNUNET_CONTAINER_multihashmap_destroy (h->watches);
550     h->watches = NULL;
551   }
552   while (NULL != (ic = h->iterate_head))
553   {
554     GNUNET_break (0);
555     GNUNET_PEERSTORE_iterate_cancel (ic);
556   }
557   if (NULL != h->store_head)
558   {
559     if (GNUNET_YES == sync_first)
560     {
561       LOG (GNUNET_ERROR_TYPE_DEBUG,
562            "Delaying disconnection due to pending store requests.\n");
563       h->disconnecting = GNUNET_YES;
564       return;
565     }
566     while (NULL != (sc = h->store_head))
567       GNUNET_PEERSTORE_store_cancel (sc);
568   }
569   do_disconnect (h);
570 }
571
572
573 /******************************************************************************/
574 /*******************            STORE FUNCTIONS           *********************/
575 /******************************************************************************/
576
577
578 /**
579  * Cancel a store request
580  *
581  * @param sc Store request context
582  */
583 void
584 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
585 {
586   struct GNUNET_PEERSTORE_Handle *h = sc->h;
587
588   GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
589   GNUNET_free (sc->sub_system);
590   GNUNET_free (sc->value);
591   GNUNET_free (sc->key);
592   GNUNET_free (sc);
593   if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
594     do_disconnect (h);
595 }
596
597
598 /**
599  * Store a new entry in the PEERSTORE.
600  * Note that stored entries can be lost in some cases
601  * such as power failure.
602  *
603  * @param h Handle to the PEERSTORE service
604  * @param sub_system name of the sub system
605  * @param peer Peer Identity
606  * @param key entry key
607  * @param value entry value BLOB
608  * @param size size of @e value
609  * @param expiry absolute time after which the entry is (possibly) deleted
610  * @param options options specific to the storage operation
611  * @param cont Continuation function after the store request is sent
612  * @param cont_cls Closure for @a cont
613  */
614 struct GNUNET_PEERSTORE_StoreContext *
615 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
616                         const char *sub_system,
617                         const struct GNUNET_PeerIdentity *peer, const char *key,
618                         const void *value, size_t size,
619                         struct GNUNET_TIME_Absolute expiry,
620                         enum GNUNET_PEERSTORE_StoreOption options,
621                         GNUNET_PEERSTORE_Continuation cont, void *cont_cls)
622 {
623   struct GNUNET_MQ_Envelope *ev;
624   struct GNUNET_PEERSTORE_StoreContext *sc;
625
626   LOG (GNUNET_ERROR_TYPE_DEBUG,
627        "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
628        size, sub_system, GNUNET_i2s (peer), key);
629   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size,
630                                             &expiry, options,
631                                             GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
632   sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
633
634   sc->sub_system = GNUNET_strdup (sub_system);
635   sc->peer = *peer;
636   sc->key = GNUNET_strdup (key);
637   sc->value = GNUNET_memdup (value, size);
638   sc->size = size;
639   sc->expiry = expiry;
640   sc->options = options;
641   sc->cont = cont;
642   sc->cont_cls = cont_cls;
643   sc->h = h;
644
645   GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
646   GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
647   GNUNET_MQ_send (h->mq, ev);
648   return sc;
649
650 }
651
652
653 /******************************************************************************/
654 /*******************           ITERATE FUNCTIONS          *********************/
655 /******************************************************************************/
656
657 /**
658  * When a response for iterate request is received
659  *
660  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
661  * @param msg message received, NULL on timeout or fatal error
662  */
663 static void
664 handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg)
665 {
666   struct GNUNET_PEERSTORE_Handle *h = cls;
667   struct GNUNET_PEERSTORE_IterateContext *ic;
668   GNUNET_PEERSTORE_Processor callback;
669   void *callback_cls;
670   uint16_t msg_type;
671   struct GNUNET_PEERSTORE_Record *record;
672   int continue_iter;
673
674   ic = h->iterate_head;
675   if (NULL == ic)
676   {
677     LOG (GNUNET_ERROR_TYPE_ERROR,
678          _("Unexpected iteration response, this should not happen.\n"));
679     reconnect (h);
680     return;
681   }
682   ic->iterating = GNUNET_YES;
683   callback = ic->callback;
684   callback_cls = ic->callback_cls;
685   if (NULL == msg)              /* Connection error */
686   {
687     if (NULL != callback)
688       callback (callback_cls, NULL,
689                 _("Error communicating with `PEERSTORE' service."));
690     reconnect (h);
691     return;
692   }
693   msg_type = ntohs (msg->type);
694   if (GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
695   {
696     ic->iterating = GNUNET_NO;
697     GNUNET_PEERSTORE_iterate_cancel (ic);
698     if (NULL != callback)
699       callback (callback_cls, NULL, NULL);
700     return;
701   }
702   if (NULL != callback)
703   {
704     record = PEERSTORE_parse_record_message (msg);
705     if (NULL == record)
706       continue_iter =
707           callback (callback_cls, NULL,
708                     _("Received a malformed response from service."));
709     else
710     {
711       continue_iter = callback (callback_cls, record, NULL);
712       PEERSTORE_destroy_record (record);
713     }
714     if (GNUNET_NO == continue_iter)
715       ic->callback = NULL;
716   }
717 }
718
719
720 /**
721  * Cancel an iterate request
722  * Please do not call after the iterate request is done
723  *
724  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
725  */
726 void
727 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
728 {
729   if (NULL != ic->timeout_task)
730   {
731     GNUNET_SCHEDULER_cancel (ic->timeout_task);
732     ic->timeout_task = NULL;
733   }
734   if (GNUNET_NO == ic->iterating)
735   {
736     GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head,
737                                  ic->h->iterate_tail,
738                                  ic);
739     GNUNET_free (ic->sub_system);
740     if (NULL != ic->key)
741       GNUNET_free (ic->key);
742     GNUNET_free (ic);
743   }
744   else
745     ic->callback = NULL;
746 }
747
748
749 /**
750  * Iterate over records matching supplied key information
751  *
752  * @param h handle to the PEERSTORE service
753  * @param sub_system name of sub system
754  * @param peer Peer identity (can be NULL)
755  * @param key entry key string (can be NULL)
756  * @param timeout time after which the iterate request is canceled
757  * @param callback function called with each matching record, all NULL's on end
758  * @param callback_cls closure for @a callback
759  * @return Handle to iteration request
760  */
761 struct GNUNET_PEERSTORE_IterateContext *
762 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
763                           const char *sub_system,
764                           const struct GNUNET_PeerIdentity *peer,
765                           const char *key, struct GNUNET_TIME_Relative timeout,
766                           GNUNET_PEERSTORE_Processor callback,
767                           void *callback_cls)
768 {
769   struct GNUNET_MQ_Envelope *ev;
770   struct GNUNET_PEERSTORE_IterateContext *ic;
771
772   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, NULL, 0,
773                                             NULL, 0,
774                                             GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
775   ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
776
777   ic->callback = callback;
778   ic->callback_cls = callback_cls;
779   ic->h = h;
780   ic->sub_system = GNUNET_strdup (sub_system);
781   if (NULL != peer)
782     ic->peer = *peer;
783   if (NULL != key)
784     ic->key = GNUNET_strdup (key);
785   ic->timeout = timeout;
786   GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head,
787                                     h->iterate_tail,
788                                     ic);
789   LOG (GNUNET_ERROR_TYPE_DEBUG,
790        "Sending an iterate request for sub system `%s'\n", sub_system);
791   GNUNET_MQ_send (h->mq, ev);
792   ic->timeout_task =
793       GNUNET_SCHEDULER_add_delayed (timeout, &iterate_timeout, ic);
794   return ic;
795 }
796
797
798 /******************************************************************************/
799 /*******************            WATCH FUNCTIONS           *********************/
800 /******************************************************************************/
801
802 /**
803  * When a watch record is received
804  *
805  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
806  * @param msg message received, NULL on timeout or fatal error
807  */
808 static void
809 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
810 {
811   struct GNUNET_PEERSTORE_Handle *h = cls;
812   struct GNUNET_PEERSTORE_Record *record;
813   struct GNUNET_HashCode keyhash;
814   struct GNUNET_PEERSTORE_WatchContext *wc;
815
816   if (NULL == msg)
817   {
818     LOG (GNUNET_ERROR_TYPE_ERROR,
819          _
820          ("Problem receiving a watch response, no way to determine which request.\n"));
821     reconnect (h);
822     return;
823   }
824   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
825   record = PEERSTORE_parse_record_message (msg);
826   PEERSTORE_hash_key (record->sub_system, record->peer, record->key, &keyhash);
827   // FIXME: what if there are multiple watches for the same key?
828   wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
829   if (NULL == wc)
830   {
831     LOG (GNUNET_ERROR_TYPE_ERROR,
832          _("Received a watch result for a non existing watch.\n"));
833     PEERSTORE_destroy_record (record);
834     reconnect (h);
835     return;
836   }
837   if (NULL != wc->callback)
838     wc->callback (wc->callback_cls, record, NULL);
839   PEERSTORE_destroy_record (record);
840 }
841
842
843 /**
844  * Cancel a watch request
845  *
846  * @param wc handle to the watch request
847  */
848 void
849 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
850 {
851   struct GNUNET_PEERSTORE_Handle *h = wc->h;
852   struct GNUNET_MQ_Envelope *ev;
853   struct StoreKeyHashMessage *hm;
854
855   LOG (GNUNET_ERROR_TYPE_DEBUG,
856        "Canceling watch.\n");
857   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
858   hm->keyhash = wc->keyhash;
859   GNUNET_MQ_send (h->mq, ev);
860   GNUNET_CONTAINER_multihashmap_remove (h->watches,
861                                         &wc->keyhash,
862                                         wc);
863   GNUNET_free (wc);
864 }
865
866
867 /**
868  * Request watching a given key
869  * User will be notified with any new values added to key
870  *
871  * @param h handle to the PEERSTORE service
872  * @param sub_system name of sub system
873  * @param peer Peer identity
874  * @param key entry key string
875  * @param callback function called with each new value
876  * @param callback_cls closure for @a callback
877  * @return Handle to watch request
878  */
879 struct GNUNET_PEERSTORE_WatchContext *
880 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
881                         const char *sub_system,
882                         const struct GNUNET_PeerIdentity *peer, const char *key,
883                         GNUNET_PEERSTORE_Processor callback, void *callback_cls)
884 {
885   struct GNUNET_MQ_Envelope *ev;
886   struct StoreKeyHashMessage *hm;
887   struct GNUNET_PEERSTORE_WatchContext *wc;
888
889   GNUNET_assert (NULL != sub_system);
890   GNUNET_assert (NULL != peer);
891   GNUNET_assert (NULL != key);
892   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
893   PEERSTORE_hash_key (sub_system,
894                       peer,
895                       key,
896                       &hm->keyhash);
897   wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
898   wc->callback = callback;
899   wc->callback_cls = callback_cls;
900   wc->h = h;
901   wc->keyhash = hm->keyhash;
902   if (NULL == h->watches)
903     h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
904   GNUNET_assert (GNUNET_OK ==
905                  GNUNET_CONTAINER_multihashmap_put (h->watches,
906                                                     &wc->keyhash,
907                                                     wc,
908                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
909   LOG (GNUNET_ERROR_TYPE_DEBUG,
910        "Sending a watch request for ss `%s', peer `%s', key `%s'.\n",
911        sub_system,
912        GNUNET_i2s (peer),
913        key);
914   GNUNET_MQ_send (h->mq, ev);
915   return wc;
916 }
917
918 /* end of peerstore_api.c */