src: for every AGPL3.0 file, add SPDX identifier.
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20 /**
21  * @file peerstore/peerstore_api.c
22  * @brief API for peerstore
23  * @author Omar Tarabai
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
32
33 /******************************************************************************/
34 /************************      DATA STRUCTURES     ****************************/
35 /******************************************************************************/
36
37 /**
38  * Handle to the PEERSTORE service.
39  */
40 struct GNUNET_PEERSTORE_Handle
41 {
42
43   /**
44    * Our configuration.
45    */
46   const struct GNUNET_CONFIGURATION_Handle *cfg;
47
48   /**
49    * Message queue
50    */
51   struct GNUNET_MQ_Handle *mq;
52
53   /**
54    * Head of active STORE requests.
55    */
56   struct GNUNET_PEERSTORE_StoreContext *store_head;
57
58   /**
59    * Tail of active STORE requests.
60    */
61   struct GNUNET_PEERSTORE_StoreContext *store_tail;
62
63   /**
64    * Head of active ITERATE requests.
65    */
66   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
67
68   /**
69    * Tail of active ITERATE requests.
70    */
71   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
72
73   /**
74    * Hashmap of watch requests
75    */
76   struct GNUNET_CONTAINER_MultiHashMap *watches;
77
78   /**
79    * Are we in the process of disconnecting but need to sync first?
80    */
81   int disconnecting;
82
83 };
84
85 /**
86  * Context for a store request
87  */
88 struct GNUNET_PEERSTORE_StoreContext
89 {
90   /**
91    * Kept in a DLL.
92    */
93   struct GNUNET_PEERSTORE_StoreContext *next;
94
95   /**
96    * Kept in a DLL.
97    */
98   struct GNUNET_PEERSTORE_StoreContext *prev;
99
100   /**
101    * Handle to the PEERSTORE service.
102    */
103   struct GNUNET_PEERSTORE_Handle *h;
104
105   /**
106    * Continuation called with service response
107    */
108   GNUNET_PEERSTORE_Continuation cont;
109
110   /**
111    * Closure for @e cont
112    */
113   void *cont_cls;
114
115   /**
116    * Which subsystem does the store?
117    */
118   char *sub_system;
119
120   /**
121    * Key for the store operation.
122    */
123   char *key;
124
125   /**
126    * Contains @e size bytes.
127    */
128   void *value;
129
130   /**
131    * Peer the store is for.
132    */
133   struct GNUNET_PeerIdentity peer;
134
135   /**
136    * Number of bytes in @e value.
137    */
138   size_t size;
139
140   /**
141    * When does the value expire?
142    */
143   struct GNUNET_TIME_Absolute expiry;
144
145   /**
146    * Options for the store operation.
147    */
148   enum GNUNET_PEERSTORE_StoreOption options;
149
150 };
151
152 /**
153  * Context for a iterate request
154  */
155 struct GNUNET_PEERSTORE_IterateContext
156 {
157   /**
158    * Kept in a DLL.
159    */
160   struct GNUNET_PEERSTORE_IterateContext *next;
161
162   /**
163    * Kept in a DLL.
164    */
165   struct GNUNET_PEERSTORE_IterateContext *prev;
166
167   /**
168    * Handle to the PEERSTORE service.
169    */
170   struct GNUNET_PEERSTORE_Handle *h;
171
172   /**
173    * Which subsystem does the store?
174    */
175   char *sub_system;
176
177   /**
178    * Peer the store is for.
179    */
180   struct GNUNET_PeerIdentity peer;
181
182   /**
183    * Key for the store operation.
184    */
185   char *key;
186
187   /**
188    * Callback with each matching record
189    */
190   GNUNET_PEERSTORE_Processor callback;
191
192   /**
193    * Closure for @e callback
194    */
195   void *callback_cls;
196
197   /**
198    * #GNUNET_YES if we are currently processing records.
199    */
200   int iterating;
201
202 };
203
204 /**
205  * Context for a watch request
206  */
207 struct GNUNET_PEERSTORE_WatchContext
208 {
209   /**
210    * Kept in a DLL.
211    */
212   struct GNUNET_PEERSTORE_WatchContext *next;
213
214   /**
215    * Kept in a DLL.
216    */
217   struct GNUNET_PEERSTORE_WatchContext *prev;
218
219   /**
220    * Handle to the PEERSTORE service.
221    */
222   struct GNUNET_PEERSTORE_Handle *h;
223
224   /**
225    * Callback with each record received
226    */
227   GNUNET_PEERSTORE_Processor callback;
228
229   /**
230    * Closure for @e callback
231    */
232   void *callback_cls;
233
234   /**
235    * Hash of the combined key
236    */
237   struct GNUNET_HashCode keyhash;
238
239 };
240
241 /******************************************************************************/
242 /*******************             DECLARATIONS             *********************/
243 /******************************************************************************/
244
245 /**
246  * Close the existing connection to PEERSTORE and reconnect.
247  *
248  * @param h handle to the service
249  */
250 static void
251 reconnect (struct GNUNET_PEERSTORE_Handle *h);
252
253
254 /**
255  * Callback after MQ envelope is sent
256  *
257  * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
258  */
259 static void
260 store_request_sent (void *cls)
261 {
262   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
263   GNUNET_PEERSTORE_Continuation cont;
264   void *cont_cls;
265
266   cont = sc->cont;
267   cont_cls = sc->cont_cls;
268   GNUNET_PEERSTORE_store_cancel (sc);
269   if (NULL != cont)
270     cont (cont_cls, GNUNET_OK);
271 }
272
273
274 /******************************************************************************/
275 /*******************         CONNECTION FUNCTIONS         *********************/
276 /******************************************************************************/
277
278
279 /**
280  * Function called when we had trouble talking to the service.
281  */
282 static void
283 handle_client_error (void *cls,
284                      enum GNUNET_MQ_Error error)
285 {
286   struct GNUNET_PEERSTORE_Handle *h = cls;
287
288   LOG (GNUNET_ERROR_TYPE_ERROR,
289        "Received an error notification from MQ of type: %d\n",
290        error);
291   reconnect (h);
292 }
293
294
295 /**
296  * Iterator over previous watches to resend them
297  *
298  * @param cls the `struct GNUNET_PEERSTORE_Handle`
299  * @param key key for the watch
300  * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
301  * @return #GNUNET_YES (continue to iterate)
302  */
303 static int
304 rewatch_it (void *cls,
305             const struct GNUNET_HashCode *key,
306             void *value)
307 {
308   struct GNUNET_PEERSTORE_Handle *h = cls;
309   struct GNUNET_PEERSTORE_WatchContext *wc = value;
310   struct StoreKeyHashMessage *hm;
311   struct GNUNET_MQ_Envelope *ev;
312
313   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
314   hm->keyhash = wc->keyhash;
315   GNUNET_MQ_send (h->mq, ev);
316   return GNUNET_YES;
317 }
318
319
320 /**
321  * Iterator over watch requests to cancel them.
322  *
323  * @param cls unsused
324  * @param key key to the watch request
325  * @param value watch context
326  * @return #GNUNET_YES to continue iteration
327  */
328 static int
329 destroy_watch (void *cls,
330                const struct GNUNET_HashCode *key,
331                void *value)
332 {
333   struct GNUNET_PEERSTORE_WatchContext *wc = value;
334
335   GNUNET_PEERSTORE_watch_cancel (wc);
336   return GNUNET_YES;
337 }
338
339
340 /**
341  * Kill the connection to the service. This can be delayed in case of pending
342  * STORE requests and the user explicitly asked to sync first. Otherwise it is
343  * performed instantly.
344  *
345  * @param h Handle to the service.
346  */
347 static void
348 do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
349 {
350   if (NULL != h->mq)
351   {
352     GNUNET_MQ_destroy (h->mq);
353     h->mq = NULL;
354   }
355   GNUNET_free (h);
356 }
357
358
359 /**
360  * Connect to the PEERSTORE service.
361  *
362  * @param cfg configuration to use
363  * @return NULL on error
364  */
365 struct GNUNET_PEERSTORE_Handle *
366 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
367 {
368   struct GNUNET_PEERSTORE_Handle *h;
369
370   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
371   h->cfg = cfg;
372   h->disconnecting = GNUNET_NO;
373   reconnect (h);
374   if (NULL == h->mq)
375   {
376     GNUNET_free (h);
377     return NULL;
378   }
379   return h;
380 }
381
382
383 /**
384  * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
385  * will be canceled.
386  * Any pending STORE requests will depend on @e snyc_first flag.
387  *
388  * @param h handle to disconnect
389  * @param sync_first send any pending STORE requests before disconnecting
390  */
391 void
392 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h,
393                              int sync_first)
394 {
395   struct GNUNET_PEERSTORE_IterateContext *ic;
396   struct GNUNET_PEERSTORE_StoreContext *sc;
397
398   LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
399   if (NULL != h->watches)
400   {
401     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
402     GNUNET_CONTAINER_multihashmap_destroy (h->watches);
403     h->watches = NULL;
404   }
405   while (NULL != (ic = h->iterate_head))
406   {
407     GNUNET_break (0);
408     GNUNET_PEERSTORE_iterate_cancel (ic);
409   }
410   if (NULL != h->store_head)
411   {
412     if (GNUNET_YES == sync_first)
413     {
414       LOG (GNUNET_ERROR_TYPE_DEBUG,
415            "Delaying disconnection due to pending store requests.\n");
416       h->disconnecting = GNUNET_YES;
417       return;
418     }
419     while (NULL != (sc = h->store_head))
420       GNUNET_PEERSTORE_store_cancel (sc);
421   }
422   do_disconnect (h);
423 }
424
425
426 /******************************************************************************/
427 /*******************            STORE FUNCTIONS           *********************/
428 /******************************************************************************/
429
430
431 /**
432  * Cancel a store request
433  *
434  * @param sc Store request context
435  */
436 void
437 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
438 {
439   struct GNUNET_PEERSTORE_Handle *h = sc->h;
440
441   GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
442   GNUNET_free (sc->sub_system);
443   GNUNET_free (sc->value);
444   GNUNET_free (sc->key);
445   GNUNET_free (sc);
446   if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
447     do_disconnect (h);
448 }
449
450
451 /**
452  * Store a new entry in the PEERSTORE.
453  * Note that stored entries can be lost in some cases
454  * such as power failure.
455  *
456  * @param h Handle to the PEERSTORE service
457  * @param sub_system name of the sub system
458  * @param peer Peer Identity
459  * @param key entry key
460  * @param value entry value BLOB
461  * @param size size of @e value
462  * @param expiry absolute time after which the entry is (possibly) deleted
463  * @param options options specific to the storage operation
464  * @param cont Continuation function after the store request is sent
465  * @param cont_cls Closure for @a cont
466  */
467 struct GNUNET_PEERSTORE_StoreContext *
468 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
469                         const char *sub_system,
470                         const struct GNUNET_PeerIdentity *peer,
471                         const char *key,
472                         const void *value, size_t size,
473                         struct GNUNET_TIME_Absolute expiry,
474                         enum GNUNET_PEERSTORE_StoreOption options,
475                         GNUNET_PEERSTORE_Continuation cont,
476                         void *cont_cls)
477 {
478   struct GNUNET_MQ_Envelope *ev;
479   struct GNUNET_PEERSTORE_StoreContext *sc;
480
481   LOG (GNUNET_ERROR_TYPE_DEBUG,
482        "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
483        size, sub_system, GNUNET_i2s (peer), key);
484   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size,
485                                             expiry, options,
486                                             GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
487   sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
488
489   sc->sub_system = GNUNET_strdup (sub_system);
490   sc->peer = *peer;
491   sc->key = GNUNET_strdup (key);
492   sc->value = GNUNET_memdup (value, size);
493   sc->size = size;
494   sc->expiry = expiry;
495   sc->options = options;
496   sc->cont = cont;
497   sc->cont_cls = cont_cls;
498   sc->h = h;
499
500   GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
501   GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
502   GNUNET_MQ_send (h->mq, ev);
503   return sc;
504
505 }
506
507
508 /******************************************************************************/
509 /*******************           ITERATE FUNCTIONS          *********************/
510 /******************************************************************************/
511
512
513 /**
514  * When a response for iterate request is received
515  *
516  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
517  * @param msg message received
518  */
519 static void
520 handle_iterate_end (void *cls,
521                     const struct GNUNET_MessageHeader *msg)
522 {
523   struct GNUNET_PEERSTORE_Handle *h = cls;
524   struct GNUNET_PEERSTORE_IterateContext *ic;
525   GNUNET_PEERSTORE_Processor callback;
526   void *callback_cls;
527
528   ic = h->iterate_head;
529   if (NULL == ic)
530   {
531     LOG (GNUNET_ERROR_TYPE_ERROR,
532          _("Unexpected iteration response, this should not happen.\n"));
533     reconnect (h);
534     return;
535   }
536   callback = ic->callback;
537   callback_cls = ic->callback_cls;
538   ic->iterating = GNUNET_NO;
539   GNUNET_PEERSTORE_iterate_cancel (ic);
540   if (NULL != callback)
541     callback (callback_cls, NULL, NULL);
542 }
543
544
545 /**
546  * When a response for iterate request is received, check the
547  * message is well-formed.
548  *
549  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
550  * @param msg message received
551  */
552 static int
553 check_iterate_result (void *cls,
554                       const struct StoreRecordMessage *msg)
555 {
556   /* we defer validation to #handle_iterate_result */
557   return GNUNET_OK;
558 }
559
560
561 /**
562  * When a response for iterate request is received
563  *
564  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
565  * @param msg message received
566  */
567 static void
568 handle_iterate_result (void *cls,
569                        const struct StoreRecordMessage *msg)
570 {
571   struct GNUNET_PEERSTORE_Handle *h = cls;
572   struct GNUNET_PEERSTORE_IterateContext *ic;
573   GNUNET_PEERSTORE_Processor callback;
574   void *callback_cls;
575   struct GNUNET_PEERSTORE_Record *record;
576
577   ic = h->iterate_head;
578   if (NULL == ic)
579   {
580     LOG (GNUNET_ERROR_TYPE_ERROR,
581          _("Unexpected iteration response, this should not happen.\n"));
582     reconnect (h);
583     return;
584   }
585   ic->iterating = GNUNET_YES;
586   callback = ic->callback;
587   callback_cls = ic->callback_cls;
588   if (NULL == callback)
589     return;
590   record = PEERSTORE_parse_record_message (msg);
591   if (NULL == record)
592   {
593     callback (callback_cls,
594               NULL,
595               _("Received a malformed response from service."));
596   }
597   else
598   {
599     callback (callback_cls,
600               record,
601               NULL);
602     PEERSTORE_destroy_record (record);
603   }
604 }
605
606
607 /**
608  * Cancel an iterate request
609  * Please do not call after the iterate request is done
610  *
611  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
612  */
613 void
614 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
615 {
616   if (GNUNET_NO == ic->iterating)
617   {
618     GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head,
619                                  ic->h->iterate_tail,
620                                  ic);
621     GNUNET_free (ic->sub_system);
622     GNUNET_free_non_null (ic->key);
623     GNUNET_free (ic);
624   }
625   else
626     ic->callback = NULL;
627 }
628
629
630 /**
631  * Iterate over records matching supplied key information
632  *
633  * @param h handle to the PEERSTORE service
634  * @param sub_system name of sub system
635  * @param peer Peer identity (can be NULL)
636  * @param key entry key string (can be NULL)
637  * @param callback function called with each matching record, all NULL's on end
638  * @param callback_cls closure for @a callback
639  * @return Handle to iteration request
640  */
641 struct GNUNET_PEERSTORE_IterateContext *
642 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
643                           const char *sub_system,
644                           const struct GNUNET_PeerIdentity *peer,
645                           const char *key,
646                           GNUNET_PEERSTORE_Processor callback,
647                           void *callback_cls)
648 {
649   struct GNUNET_MQ_Envelope *ev;
650   struct GNUNET_PEERSTORE_IterateContext *ic;
651
652   ev = PEERSTORE_create_record_mq_envelope (sub_system,
653                                             peer,
654                                             key,
655                                             NULL, 0,
656                                             GNUNET_TIME_UNIT_FOREVER_ABS,
657                                             0,
658                                             GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
659   ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
660   ic->callback = callback;
661   ic->callback_cls = callback_cls;
662   ic->h = h;
663   ic->sub_system = GNUNET_strdup (sub_system);
664   if (NULL != peer)
665     ic->peer = *peer;
666   if (NULL != key)
667     ic->key = GNUNET_strdup (key);
668   GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head,
669                                     h->iterate_tail,
670                                     ic);
671   LOG (GNUNET_ERROR_TYPE_DEBUG,
672        "Sending an iterate request for sub system `%s'\n",
673        sub_system);
674   GNUNET_MQ_send (h->mq, ev);
675   return ic;
676 }
677
678
679 /******************************************************************************/
680 /*******************            WATCH FUNCTIONS           *********************/
681 /******************************************************************************/
682
683 /**
684  * When a watch record is received, validate it is well-formed.
685  *
686  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
687  * @param msg message received
688  */
689 static int
690 check_watch_record (void *cls,
691                     const struct StoreRecordMessage *msg)
692 {
693   /* we defer validation to #handle_watch_result */
694   return GNUNET_OK;
695 }
696
697
698 /**
699  * When a watch record is received, process it.
700  *
701  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
702  * @param msg message received
703  */
704 static void
705 handle_watch_record (void *cls,
706                      const struct StoreRecordMessage *msg)
707 {
708   struct GNUNET_PEERSTORE_Handle *h = cls;
709   struct GNUNET_PEERSTORE_Record *record;
710   struct GNUNET_HashCode keyhash;
711   struct GNUNET_PEERSTORE_WatchContext *wc;
712
713   LOG (GNUNET_ERROR_TYPE_DEBUG,
714        "Received a watch record from service.\n");
715   record = PEERSTORE_parse_record_message (msg);
716   if (NULL == record)
717   {
718     reconnect (h);
719     return;
720   }
721   PEERSTORE_hash_key (record->sub_system,
722                       &record->peer,
723                       record->key,
724                       &keyhash);
725   // FIXME: what if there are multiple watches for the same key?
726   wc = GNUNET_CONTAINER_multihashmap_get (h->watches,
727                                           &keyhash);
728   if (NULL == wc)
729   {
730     LOG (GNUNET_ERROR_TYPE_ERROR,
731          _("Received a watch result for a non existing watch.\n"));
732     PEERSTORE_destroy_record (record);
733     reconnect (h);
734     return;
735   }
736   if (NULL != wc->callback)
737     wc->callback (wc->callback_cls,
738                   record,
739                   NULL);
740   PEERSTORE_destroy_record (record);
741 }
742
743
744 /**
745  * Close the existing connection to PEERSTORE and reconnect.
746  *
747  * @param h handle to the service
748  */
749 static void
750 reconnect (struct GNUNET_PEERSTORE_Handle *h)
751 {
752   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
753     GNUNET_MQ_hd_fixed_size (iterate_end,
754                              GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
755                              struct GNUNET_MessageHeader,
756                              h),
757     GNUNET_MQ_hd_var_size (iterate_result,
758                            GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
759                            struct StoreRecordMessage,
760                            h),
761     GNUNET_MQ_hd_var_size (watch_record,
762                            GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
763                            struct StoreRecordMessage,
764                            h),
765     GNUNET_MQ_handler_end ()
766   };
767   struct GNUNET_PEERSTORE_IterateContext *ic;
768   struct GNUNET_PEERSTORE_IterateContext *next;
769   GNUNET_PEERSTORE_Processor icb;
770   void *icb_cls;
771   struct GNUNET_PEERSTORE_StoreContext *sc;
772   struct GNUNET_MQ_Envelope *ev;
773
774   LOG (GNUNET_ERROR_TYPE_DEBUG,
775        "Reconnecting...\n");
776   for (ic = h->iterate_head; NULL != ic; ic = next)
777   {
778     next = ic->next;
779     if (GNUNET_YES == ic->iterating)
780     {
781       icb = ic->callback;
782       icb_cls = ic->callback_cls;
783       GNUNET_PEERSTORE_iterate_cancel (ic);
784       if (NULL != icb)
785         icb (icb_cls,
786              NULL,
787              "Iteration canceled due to reconnection");
788     }
789   }
790   if (NULL != h->mq)
791   {
792     GNUNET_MQ_destroy (h->mq);
793     h->mq = NULL;
794   }
795   h->mq = GNUNET_CLIENT_connect (h->cfg,
796                                  "peerstore",
797                                  mq_handlers,
798                                  &handle_client_error,
799                                  h);
800   if (NULL == h->mq)
801     return;
802   LOG (GNUNET_ERROR_TYPE_DEBUG,
803        "Resending pending requests after reconnect.\n");
804   if (NULL != h->watches)
805     GNUNET_CONTAINER_multihashmap_iterate (h->watches,
806                                            &rewatch_it,
807                                            h);
808   for (ic = h->iterate_head; NULL != ic; ic = ic->next)
809   {
810     ev = PEERSTORE_create_record_mq_envelope (ic->sub_system,
811                                               &ic->peer,
812                                               ic->key,
813                                               NULL, 0,
814                                               GNUNET_TIME_UNIT_FOREVER_ABS,
815                                               0,
816                                               GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
817     GNUNET_MQ_send (h->mq, ev);
818   }
819   for (sc = h->store_head; NULL != sc; sc = sc->next)
820   {
821     ev = PEERSTORE_create_record_mq_envelope (sc->sub_system,
822                                               &sc->peer,
823                                               sc->key,
824                                               sc->value,
825                                               sc->size,
826                                               sc->expiry,
827                                               sc->options,
828                                               GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
829     GNUNET_MQ_notify_sent (ev,
830                            &store_request_sent,
831                            sc);
832     GNUNET_MQ_send (h->mq,
833                     ev);
834   }
835 }
836
837
838 /**
839  * Cancel a watch request
840  *
841  * @param wc handle to the watch request
842  */
843 void
844 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
845 {
846   struct GNUNET_PEERSTORE_Handle *h = wc->h;
847   struct GNUNET_MQ_Envelope *ev;
848   struct StoreKeyHashMessage *hm;
849
850   LOG (GNUNET_ERROR_TYPE_DEBUG,
851        "Canceling watch.\n");
852   ev = GNUNET_MQ_msg (hm,
853                       GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
854   hm->keyhash = wc->keyhash;
855   GNUNET_MQ_send (h->mq, ev);
856   GNUNET_CONTAINER_multihashmap_remove (h->watches,
857                                         &wc->keyhash,
858                                         wc);
859   GNUNET_free (wc);
860 }
861
862
863 /**
864  * Request watching a given key
865  * User will be notified with any new values added to key
866  *
867  * @param h handle to the PEERSTORE service
868  * @param sub_system name of sub system
869  * @param peer Peer identity
870  * @param key entry key string
871  * @param callback function called with each new value
872  * @param callback_cls closure for @a callback
873  * @return Handle to watch request
874  */
875 struct GNUNET_PEERSTORE_WatchContext *
876 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
877                         const char *sub_system,
878                         const struct GNUNET_PeerIdentity *peer,
879                         const char *key,
880                         GNUNET_PEERSTORE_Processor callback,
881                         void *callback_cls)
882 {
883   struct GNUNET_MQ_Envelope *ev;
884   struct StoreKeyHashMessage *hm;
885   struct GNUNET_PEERSTORE_WatchContext *wc;
886
887   ev = GNUNET_MQ_msg (hm,
888                       GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
889   PEERSTORE_hash_key (sub_system,
890                       peer,
891                       key,
892                       &hm->keyhash);
893   wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
894   wc->callback = callback;
895   wc->callback_cls = callback_cls;
896   wc->h = h;
897   wc->keyhash = hm->keyhash;
898   if (NULL == h->watches)
899     h->watches = GNUNET_CONTAINER_multihashmap_create (5,
900                                                        GNUNET_NO);
901   GNUNET_assert (GNUNET_OK ==
902                  GNUNET_CONTAINER_multihashmap_put (h->watches,
903                                                     &wc->keyhash,
904                                                     wc,
905                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
906   LOG (GNUNET_ERROR_TYPE_DEBUG,
907        "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
908        sub_system,
909        GNUNET_i2s (peer),
910        key);
911   GNUNET_MQ_send (h->mq,
912                   ev);
913   return wc;
914 }
915
916 /* end of peerstore_api.c */