social: place load/save
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2014 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file peerstore/peerstore_api.c
22  * @brief API for peerstore
23  * @author Omar Tarabai
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
32
33 /******************************************************************************/
34 /************************      DATA STRUCTURES     ****************************/
35 /******************************************************************************/
36
37 /**
38  * Handle to the PEERSTORE service.
39  */
40 struct GNUNET_PEERSTORE_Handle
41 {
42
43   /**
44    * Our configuration.
45    */
46   const struct GNUNET_CONFIGURATION_Handle *cfg;
47
48   /**
49    * Connection to the service.
50    */
51   struct GNUNET_CLIENT_Connection *client;
52
53   /**
54    * Message queue
55    */
56   struct GNUNET_MQ_Handle *mq;
57
58   /**
59    * Head of active STORE requests.
60    */
61   struct GNUNET_PEERSTORE_StoreContext *store_head;
62
63   /**
64    * Tail of active STORE requests.
65    */
66   struct GNUNET_PEERSTORE_StoreContext *store_tail;
67
68   /**
69    * Head of active ITERATE requests.
70    */
71   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
72
73   /**
74    * Tail of active ITERATE requests.
75    */
76   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
77
78   /**
79    * Hashmap of watch requests
80    */
81   struct GNUNET_CONTAINER_MultiHashMap *watches;
82
83   /**
84    * Are we in the process of disconnecting but need to sync first?
85    */
86   int disconnecting;
87
88 };
89
90 /**
91  * Context for a store request
92  */
93 struct GNUNET_PEERSTORE_StoreContext
94 {
95   /**
96    * Kept in a DLL.
97    */
98   struct GNUNET_PEERSTORE_StoreContext *next;
99
100   /**
101    * Kept in a DLL.
102    */
103   struct GNUNET_PEERSTORE_StoreContext *prev;
104
105   /**
106    * Handle to the PEERSTORE service.
107    */
108   struct GNUNET_PEERSTORE_Handle *h;
109
110   /**
111    * Continuation called with service response
112    */
113   GNUNET_PEERSTORE_Continuation cont;
114
115   /**
116    * Closure for @e cont
117    */
118   void *cont_cls;
119
120   /**
121    * Which subsystem does the store?
122    */
123   char *sub_system;
124
125   /**
126    * Key for the store operation.
127    */
128   char *key;
129
130   /**
131    * Contains @e size bytes.
132    */
133   void *value;
134
135   /**
136    * Peer the store is for.
137    */
138   struct GNUNET_PeerIdentity peer;
139
140   /**
141    * Number of bytes in @e value.
142    */
143   size_t size;
144
145   /**
146    * When does the value expire?
147    */
148   struct GNUNET_TIME_Absolute expiry;
149
150   /**
151    * Options for the store operation.
152    */
153   enum GNUNET_PEERSTORE_StoreOption options;
154
155 };
156
157 /**
158  * Context for a iterate request
159  */
160 struct GNUNET_PEERSTORE_IterateContext
161 {
162   /**
163    * Kept in a DLL.
164    */
165   struct GNUNET_PEERSTORE_IterateContext *next;
166
167   /**
168    * Kept in a DLL.
169    */
170   struct GNUNET_PEERSTORE_IterateContext *prev;
171
172   /**
173    * Handle to the PEERSTORE service.
174    */
175   struct GNUNET_PEERSTORE_Handle *h;
176
177   /**
178    * Which subsystem does the store?
179    */
180   char *sub_system;
181
182   /**
183    * Peer the store is for.
184    */
185   struct GNUNET_PeerIdentity peer;
186
187   /**
188    * Key for the store operation.
189    */
190   char *key;
191
192   /**
193    * Operation timeout
194    */
195   struct GNUNET_TIME_Relative timeout;
196
197   /**
198    * Callback with each matching record
199    */
200   GNUNET_PEERSTORE_Processor callback;
201
202   /**
203    * Closure for @e callback
204    */
205   void *callback_cls;
206
207   /**
208    * #GNUNET_YES if we are currently processing records.
209    */
210   int iterating;
211
212   /**
213    * Task identifier for the function called
214    * on iterate request timeout
215    */
216   struct GNUNET_SCHEDULER_Task *timeout_task;
217
218 };
219
220 /**
221  * Context for a watch request
222  */
223 struct GNUNET_PEERSTORE_WatchContext
224 {
225   /**
226    * Kept in a DLL.
227    */
228   struct GNUNET_PEERSTORE_WatchContext *next;
229
230   /**
231    * Kept in a DLL.
232    */
233   struct GNUNET_PEERSTORE_WatchContext *prev;
234
235   /**
236    * Handle to the PEERSTORE service.
237    */
238   struct GNUNET_PEERSTORE_Handle *h;
239
240   /**
241    * Callback with each record received
242    */
243   GNUNET_PEERSTORE_Processor callback;
244
245   /**
246    * Closure for @e callback
247    */
248   void *callback_cls;
249
250   /**
251    * Hash of the combined key
252    */
253   struct GNUNET_HashCode keyhash;
254
255 };
256
257 /******************************************************************************/
258 /*******************             DECLARATIONS             *********************/
259 /******************************************************************************/
260
261 /**
262  * When a response for iterate request is received
263  *
264  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
265  * @param msg message received, NULL on timeout or fatal error
266  */
267 static void
268 handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg);
269
270 /**
271  * When a watch record is received
272  *
273  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
274  * @param msg message received, NULL on timeout or fatal error
275  */
276 static void
277 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg);
278
279 /**
280  * Close the existing connection to PEERSTORE and reconnect.
281  *
282  * @param h handle to the service
283  */
284 static void
285 reconnect (struct GNUNET_PEERSTORE_Handle *h);
286
287
288 /**
289  * Callback after MQ envelope is sent
290  *
291  * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
292  */
293 static void
294 store_request_sent (void *cls)
295 {
296   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
297   GNUNET_PEERSTORE_Continuation cont;
298   void *cont_cls;
299
300   cont = sc->cont;
301   cont_cls = sc->cont_cls;
302   GNUNET_PEERSTORE_store_cancel (sc);
303   if (NULL != cont)
304     cont (cont_cls, GNUNET_OK);
305 }
306
307
308 /**
309  * MQ message handlers
310  */
311 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
312   {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
313   {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
314    sizeof (struct GNUNET_MessageHeader)},
315   {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
316   GNUNET_MQ_HANDLERS_END
317 };
318
319 /******************************************************************************/
320 /*******************         CONNECTION FUNCTIONS         *********************/
321 /******************************************************************************/
322
323 static void
324 handle_client_error (void *cls, enum GNUNET_MQ_Error error)
325 {
326   struct GNUNET_PEERSTORE_Handle *h = cls;
327
328   LOG (GNUNET_ERROR_TYPE_ERROR,
329        _("Received an error notification from MQ of type: %d\n"), error);
330   reconnect (h);
331 }
332
333
334 /**
335  * Iterator over previous watches to resend them
336  *
337  * @param cls the `struct GNUNET_PEERSTORE_Handle`
338  * @param key key for the watch
339  * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
340  * @return #GNUNET_YES (continue to iterate)
341  */
342 static int
343 rewatch_it (void *cls,
344             const struct GNUNET_HashCode *key,
345             void *value)
346 {
347   struct GNUNET_PEERSTORE_Handle *h = cls;
348   struct GNUNET_PEERSTORE_WatchContext *wc = value;
349   struct StoreKeyHashMessage *hm;
350   struct GNUNET_MQ_Envelope *ev;
351
352   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
353   hm->keyhash = wc->keyhash;
354   GNUNET_MQ_send (h->mq, ev);
355   return GNUNET_YES;
356 }
357
358
359 /**
360  * Called when the iterate request is timedout
361  *
362  * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
363  */
364 static void
365 iterate_timeout (void *cls)
366 {
367   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
368   GNUNET_PEERSTORE_Processor callback;
369   void *callback_cls;
370
371   ic->timeout_task = NULL;
372   callback = ic->callback;
373   callback_cls = ic->callback_cls;
374   GNUNET_PEERSTORE_iterate_cancel (ic);
375   if (NULL != callback)
376     callback (callback_cls, NULL, _("timeout"));
377 }
378
379
380 /**
381  * Close the existing connection to PEERSTORE and reconnect.
382  *
383  * @param h handle to the service
384  */
385 static void
386 reconnect (struct GNUNET_PEERSTORE_Handle *h)
387 {
388   struct GNUNET_PEERSTORE_IterateContext *ic;
389   struct GNUNET_PEERSTORE_IterateContext *next;
390   GNUNET_PEERSTORE_Processor icb;
391   void *icb_cls;
392   struct GNUNET_PEERSTORE_StoreContext *sc;
393   struct GNUNET_MQ_Envelope *ev;
394
395   LOG (GNUNET_ERROR_TYPE_DEBUG,
396        "Reconnecting...\n");
397   for (ic = h->iterate_head; NULL != ic; ic = next)
398   {
399     next = ic->next;
400     if (GNUNET_YES == ic->iterating)
401     {
402       icb = ic->callback;
403       icb_cls = ic->callback_cls;
404       GNUNET_PEERSTORE_iterate_cancel (ic);
405       if (NULL != icb)
406         icb (icb_cls,
407              NULL,
408              "Iteration canceled due to reconnection");
409     }
410   }
411   if (NULL != h->mq)
412   {
413     GNUNET_MQ_destroy (h->mq);
414     h->mq = NULL;
415   }
416   if (NULL != h->client)
417   {
418     GNUNET_CLIENT_disconnect (h->client);
419     h->client = NULL;
420   }
421
422   h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
423   GNUNET_assert (NULL != h->client);
424   h->mq =
425       GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
426                                              &handle_client_error, h);
427   LOG (GNUNET_ERROR_TYPE_DEBUG,
428        "Resending pending requests after reconnect.\n");
429   if (NULL != h->watches)
430     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
431   for (ic = h->iterate_head; NULL != ic; ic = ic->next)
432   {
433     ev =
434       PEERSTORE_create_record_mq_envelope (ic->sub_system, &ic->peer, ic->key,
435                                            NULL, 0, NULL, 0,
436                                            GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
437     GNUNET_MQ_send (h->mq, ev);
438     ic->timeout_task =
439         GNUNET_SCHEDULER_add_delayed (ic->timeout, &iterate_timeout, ic);
440   }
441   for (sc = h->store_head; NULL != sc; sc = sc->next)
442   {
443     ev =
444       PEERSTORE_create_record_mq_envelope (sc->sub_system, &sc->peer, sc->key,
445                                            sc->value, sc->size, &sc->expiry,
446                                            sc->options,
447                                            GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
448     GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
449     GNUNET_MQ_send (h->mq, ev);
450   }
451 }
452
453
454 /**
455  * Iterator over watch requests to cancel them.
456  *
457  * @param cls unsused
458  * @param key key to the watch request
459  * @param value watch context
460  * @return #GNUNET_YES to continue iteration
461  */
462 static int
463 destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
464 {
465   struct GNUNET_PEERSTORE_WatchContext *wc = value;
466
467   GNUNET_PEERSTORE_watch_cancel (wc);
468   return GNUNET_YES;
469 }
470
471
472 /**
473  * Kill the connection to the service. This can be delayed in case of pending
474  * STORE requests and the user explicitly asked to sync first. Otherwise it is
475  * performed instantly.
476  *
477  * @param h Handle to the service.
478  */
479 static void
480 do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
481 {
482   if (NULL != h->mq)
483   {
484     GNUNET_MQ_destroy (h->mq);
485     h->mq = NULL;
486   }
487   if (NULL != h->client)
488   {
489     GNUNET_CLIENT_disconnect (h->client);
490     h->client = NULL;
491   }
492   GNUNET_free (h);
493 }
494
495
496 /**
497  * Connect to the PEERSTORE service.
498  *
499  * @param cfg configuration to use
500  * @return NULL on error
501  */
502 struct GNUNET_PEERSTORE_Handle *
503 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
504 {
505   struct GNUNET_PEERSTORE_Handle *h;
506
507   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
508
509   h->client = GNUNET_CLIENT_connect ("peerstore", cfg);
510   if (NULL == h->client)
511   {
512     GNUNET_free (h);
513     return NULL;
514   }
515   h->cfg = cfg;
516   h->disconnecting = GNUNET_NO;
517   h->mq =
518       GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
519                                              &handle_client_error, h);
520   if (NULL == h->mq)
521   {
522     GNUNET_CLIENT_disconnect (h->client);
523     GNUNET_free (h);
524     return NULL;
525   }
526   LOG (GNUNET_ERROR_TYPE_DEBUG, "New connection created\n");
527   return h;
528 }
529
530
531 /**
532  * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
533  * will be canceled.
534  * Any pending STORE requests will depend on @e snyc_first flag.
535  *
536  * @param h handle to disconnect
537  * @param sync_first send any pending STORE requests before disconnecting
538  */
539 void
540 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h,
541                              int sync_first)
542 {
543   struct GNUNET_PEERSTORE_IterateContext *ic;
544   struct GNUNET_PEERSTORE_StoreContext *sc;
545
546   LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
547   if (NULL != h->watches)
548   {
549     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
550     GNUNET_CONTAINER_multihashmap_destroy (h->watches);
551     h->watches = NULL;
552   }
553   while (NULL != (ic = h->iterate_head))
554   {
555     GNUNET_break (0);
556     GNUNET_PEERSTORE_iterate_cancel (ic);
557   }
558   if (NULL != h->store_head)
559   {
560     if (GNUNET_YES == sync_first)
561     {
562       LOG (GNUNET_ERROR_TYPE_DEBUG,
563            "Delaying disconnection due to pending store requests.\n");
564       h->disconnecting = GNUNET_YES;
565       return;
566     }
567     while (NULL != (sc = h->store_head))
568       GNUNET_PEERSTORE_store_cancel (sc);
569   }
570   do_disconnect (h);
571 }
572
573
574 /******************************************************************************/
575 /*******************            STORE FUNCTIONS           *********************/
576 /******************************************************************************/
577
578
579 /**
580  * Cancel a store request
581  *
582  * @param sc Store request context
583  */
584 void
585 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
586 {
587   struct GNUNET_PEERSTORE_Handle *h = sc->h;
588
589   GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
590   GNUNET_free (sc->sub_system);
591   GNUNET_free (sc->value);
592   GNUNET_free (sc->key);
593   GNUNET_free (sc);
594   if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
595     do_disconnect (h);
596 }
597
598
599 /**
600  * Store a new entry in the PEERSTORE.
601  * Note that stored entries can be lost in some cases
602  * such as power failure.
603  *
604  * @param h Handle to the PEERSTORE service
605  * @param sub_system name of the sub system
606  * @param peer Peer Identity
607  * @param key entry key
608  * @param value entry value BLOB
609  * @param size size of @e value
610  * @param expiry absolute time after which the entry is (possibly) deleted
611  * @param options options specific to the storage operation
612  * @param cont Continuation function after the store request is sent
613  * @param cont_cls Closure for @a cont
614  */
615 struct GNUNET_PEERSTORE_StoreContext *
616 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
617                         const char *sub_system,
618                         const struct GNUNET_PeerIdentity *peer, const char *key,
619                         const void *value, size_t size,
620                         struct GNUNET_TIME_Absolute expiry,
621                         enum GNUNET_PEERSTORE_StoreOption options,
622                         GNUNET_PEERSTORE_Continuation cont, void *cont_cls)
623 {
624   struct GNUNET_MQ_Envelope *ev;
625   struct GNUNET_PEERSTORE_StoreContext *sc;
626
627   LOG (GNUNET_ERROR_TYPE_DEBUG,
628        "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
629        size, sub_system, GNUNET_i2s (peer), key);
630   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size,
631                                             &expiry, options,
632                                             GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
633   sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
634
635   sc->sub_system = GNUNET_strdup (sub_system);
636   sc->peer = *peer;
637   sc->key = GNUNET_strdup (key);
638   sc->value = GNUNET_memdup (value, size);
639   sc->size = size;
640   sc->expiry = expiry;
641   sc->options = options;
642   sc->cont = cont;
643   sc->cont_cls = cont_cls;
644   sc->h = h;
645
646   GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
647   GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
648   GNUNET_MQ_send (h->mq, ev);
649   return sc;
650
651 }
652
653
654 /******************************************************************************/
655 /*******************           ITERATE FUNCTIONS          *********************/
656 /******************************************************************************/
657
658 /**
659  * When a response for iterate request is received
660  *
661  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
662  * @param msg message received, NULL on timeout or fatal error
663  */
664 static void
665 handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg)
666 {
667   struct GNUNET_PEERSTORE_Handle *h = cls;
668   struct GNUNET_PEERSTORE_IterateContext *ic;
669   GNUNET_PEERSTORE_Processor callback;
670   void *callback_cls;
671   uint16_t msg_type;
672   struct GNUNET_PEERSTORE_Record *record;
673   int continue_iter;
674
675   ic = h->iterate_head;
676   if (NULL == ic)
677   {
678     LOG (GNUNET_ERROR_TYPE_ERROR,
679          _("Unexpected iteration response, this should not happen.\n"));
680     reconnect (h);
681     return;
682   }
683   ic->iterating = GNUNET_YES;
684   callback = ic->callback;
685   callback_cls = ic->callback_cls;
686   if (NULL == msg)              /* Connection error */
687   {
688     if (NULL != callback)
689       callback (callback_cls, NULL,
690                 _("Error communicating with `PEERSTORE' service."));
691     reconnect (h);
692     return;
693   }
694   msg_type = ntohs (msg->type);
695   if (GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
696   {
697     ic->iterating = GNUNET_NO;
698     GNUNET_PEERSTORE_iterate_cancel (ic);
699     if (NULL != callback)
700       callback (callback_cls, NULL, NULL);
701     return;
702   }
703   if (NULL != callback)
704   {
705     record = PEERSTORE_parse_record_message (msg);
706     if (NULL == record)
707       continue_iter =
708           callback (callback_cls, NULL,
709                     _("Received a malformed response from service."));
710     else
711     {
712       continue_iter = callback (callback_cls, record, NULL);
713       PEERSTORE_destroy_record (record);
714     }
715     if (GNUNET_NO == continue_iter)
716       ic->callback = NULL;
717   }
718 }
719
720
721 /**
722  * Cancel an iterate request
723  * Please do not call after the iterate request is done
724  *
725  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
726  */
727 void
728 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
729 {
730   if (NULL != ic->timeout_task)
731   {
732     GNUNET_SCHEDULER_cancel (ic->timeout_task);
733     ic->timeout_task = NULL;
734   }
735   if (GNUNET_NO == ic->iterating)
736   {
737     GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head,
738                                  ic->h->iterate_tail,
739                                  ic);
740     GNUNET_free (ic->sub_system);
741     if (NULL != ic->key)
742       GNUNET_free (ic->key);
743     GNUNET_free (ic);
744   }
745   else
746     ic->callback = NULL;
747 }
748
749
750 /**
751  * Iterate over records matching supplied key information
752  *
753  * @param h handle to the PEERSTORE service
754  * @param sub_system name of sub system
755  * @param peer Peer identity (can be NULL)
756  * @param key entry key string (can be NULL)
757  * @param timeout time after which the iterate request is canceled
758  * @param callback function called with each matching record, all NULL's on end
759  * @param callback_cls closure for @a callback
760  * @return Handle to iteration request
761  */
762 struct GNUNET_PEERSTORE_IterateContext *
763 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
764                           const char *sub_system,
765                           const struct GNUNET_PeerIdentity *peer,
766                           const char *key, struct GNUNET_TIME_Relative timeout,
767                           GNUNET_PEERSTORE_Processor callback,
768                           void *callback_cls)
769 {
770   struct GNUNET_MQ_Envelope *ev;
771   struct GNUNET_PEERSTORE_IterateContext *ic;
772
773   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, NULL, 0,
774                                             NULL, 0,
775                                             GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
776   ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
777
778   ic->callback = callback;
779   ic->callback_cls = callback_cls;
780   ic->h = h;
781   ic->sub_system = GNUNET_strdup (sub_system);
782   if (NULL != peer)
783     ic->peer = *peer;
784   if (NULL != key)
785     ic->key = GNUNET_strdup (key);
786   ic->timeout = timeout;
787   GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head,
788                                     h->iterate_tail,
789                                     ic);
790   LOG (GNUNET_ERROR_TYPE_DEBUG,
791        "Sending an iterate request for sub system `%s'\n", sub_system);
792   GNUNET_MQ_send (h->mq, ev);
793   ic->timeout_task =
794       GNUNET_SCHEDULER_add_delayed (timeout, &iterate_timeout, ic);
795   return ic;
796 }
797
798
799 /******************************************************************************/
800 /*******************            WATCH FUNCTIONS           *********************/
801 /******************************************************************************/
802
803 /**
804  * When a watch record is received
805  *
806  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
807  * @param msg message received, NULL on timeout or fatal error
808  */
809 static void
810 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
811 {
812   struct GNUNET_PEERSTORE_Handle *h = cls;
813   struct GNUNET_PEERSTORE_Record *record;
814   struct GNUNET_HashCode keyhash;
815   struct GNUNET_PEERSTORE_WatchContext *wc;
816
817   if (NULL == msg)
818   {
819     LOG (GNUNET_ERROR_TYPE_ERROR,
820          _
821          ("Problem receiving a watch response, no way to determine which request.\n"));
822     reconnect (h);
823     return;
824   }
825   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
826   record = PEERSTORE_parse_record_message (msg);
827   PEERSTORE_hash_key (record->sub_system, record->peer, record->key, &keyhash);
828   // FIXME: what if there are multiple watches for the same key?
829   wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
830   if (NULL == wc)
831   {
832     LOG (GNUNET_ERROR_TYPE_ERROR,
833          _("Received a watch result for a non existing watch.\n"));
834     PEERSTORE_destroy_record (record);
835     reconnect (h);
836     return;
837   }
838   if (NULL != wc->callback)
839     wc->callback (wc->callback_cls, record, NULL);
840   PEERSTORE_destroy_record (record);
841 }
842
843
844 /**
845  * Cancel a watch request
846  *
847  * @param wc handle to the watch request
848  */
849 void
850 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
851 {
852   struct GNUNET_PEERSTORE_Handle *h = wc->h;
853   struct GNUNET_MQ_Envelope *ev;
854   struct StoreKeyHashMessage *hm;
855
856   LOG (GNUNET_ERROR_TYPE_DEBUG,
857        "Canceling watch.\n");
858   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
859   hm->keyhash = wc->keyhash;
860   GNUNET_MQ_send (h->mq, ev);
861   GNUNET_CONTAINER_multihashmap_remove (h->watches,
862                                         &wc->keyhash,
863                                         wc);
864   GNUNET_free (wc);
865 }
866
867
868 /**
869  * Request watching a given key
870  * User will be notified with any new values added to key
871  *
872  * @param h handle to the PEERSTORE service
873  * @param sub_system name of sub system
874  * @param peer Peer identity
875  * @param key entry key string
876  * @param callback function called with each new value
877  * @param callback_cls closure for @a callback
878  * @return Handle to watch request
879  */
880 struct GNUNET_PEERSTORE_WatchContext *
881 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
882                         const char *sub_system,
883                         const struct GNUNET_PeerIdentity *peer, const char *key,
884                         GNUNET_PEERSTORE_Processor callback, void *callback_cls)
885 {
886   struct GNUNET_MQ_Envelope *ev;
887   struct StoreKeyHashMessage *hm;
888   struct GNUNET_PEERSTORE_WatchContext *wc;
889
890   GNUNET_assert (NULL != sub_system);
891   GNUNET_assert (NULL != peer);
892   GNUNET_assert (NULL != key);
893   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
894   PEERSTORE_hash_key (sub_system,
895                       peer,
896                       key,
897                       &hm->keyhash);
898   wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
899   wc->callback = callback;
900   wc->callback_cls = callback_cls;
901   wc->h = h;
902   wc->keyhash = hm->keyhash;
903   if (NULL == h->watches)
904     h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
905   GNUNET_assert (GNUNET_OK ==
906                  GNUNET_CONTAINER_multihashmap_put (h->watches,
907                                                     &wc->keyhash,
908                                                     wc,
909                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
910   LOG (GNUNET_ERROR_TYPE_DEBUG,
911        "Sending a watch request for ss `%s', peer `%s', key `%s'.\n",
912        sub_system,
913        GNUNET_i2s (peer),
914        key);
915   GNUNET_MQ_send (h->mq, ev);
916   return wc;
917 }
918
919 /* end of peerstore_api.c */