-skeletons for transport-ng
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file peerstore/peerstore_api.c
22  * @brief API for peerstore
23  * @author Omar Tarabai
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
32
33 /******************************************************************************/
34 /************************      DATA STRUCTURES     ****************************/
35 /******************************************************************************/
36
37 /**
38  * Handle to the PEERSTORE service.
39  */
40 struct GNUNET_PEERSTORE_Handle
41 {
42
43   /**
44    * Our configuration.
45    */
46   const struct GNUNET_CONFIGURATION_Handle *cfg;
47
48   /**
49    * Message queue
50    */
51   struct GNUNET_MQ_Handle *mq;
52
53   /**
54    * Head of active STORE requests.
55    */
56   struct GNUNET_PEERSTORE_StoreContext *store_head;
57
58   /**
59    * Tail of active STORE requests.
60    */
61   struct GNUNET_PEERSTORE_StoreContext *store_tail;
62
63   /**
64    * Head of active ITERATE requests.
65    */
66   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
67
68   /**
69    * Tail of active ITERATE requests.
70    */
71   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
72
73   /**
74    * Hashmap of watch requests
75    */
76   struct GNUNET_CONTAINER_MultiHashMap *watches;
77
78   /**
79    * Are we in the process of disconnecting but need to sync first?
80    */
81   int disconnecting;
82
83 };
84
85 /**
86  * Context for a store request
87  */
88 struct GNUNET_PEERSTORE_StoreContext
89 {
90   /**
91    * Kept in a DLL.
92    */
93   struct GNUNET_PEERSTORE_StoreContext *next;
94
95   /**
96    * Kept in a DLL.
97    */
98   struct GNUNET_PEERSTORE_StoreContext *prev;
99
100   /**
101    * Handle to the PEERSTORE service.
102    */
103   struct GNUNET_PEERSTORE_Handle *h;
104
105   /**
106    * Continuation called with service response
107    */
108   GNUNET_PEERSTORE_Continuation cont;
109
110   /**
111    * Closure for @e cont
112    */
113   void *cont_cls;
114
115   /**
116    * Which subsystem does the store?
117    */
118   char *sub_system;
119
120   /**
121    * Key for the store operation.
122    */
123   char *key;
124
125   /**
126    * Contains @e size bytes.
127    */
128   void *value;
129
130   /**
131    * Peer the store is for.
132    */
133   struct GNUNET_PeerIdentity peer;
134
135   /**
136    * Number of bytes in @e value.
137    */
138   size_t size;
139
140   /**
141    * When does the value expire?
142    */
143   struct GNUNET_TIME_Absolute expiry;
144
145   /**
146    * Options for the store operation.
147    */
148   enum GNUNET_PEERSTORE_StoreOption options;
149
150 };
151
152 /**
153  * Context for a iterate request
154  */
155 struct GNUNET_PEERSTORE_IterateContext
156 {
157   /**
158    * Kept in a DLL.
159    */
160   struct GNUNET_PEERSTORE_IterateContext *next;
161
162   /**
163    * Kept in a DLL.
164    */
165   struct GNUNET_PEERSTORE_IterateContext *prev;
166
167   /**
168    * Handle to the PEERSTORE service.
169    */
170   struct GNUNET_PEERSTORE_Handle *h;
171
172   /**
173    * Which subsystem does the store?
174    */
175   char *sub_system;
176
177   /**
178    * Peer the store is for.
179    */
180   struct GNUNET_PeerIdentity peer;
181
182   /**
183    * Key for the store operation.
184    */
185   char *key;
186
187   /**
188    * Operation timeout
189    */
190   struct GNUNET_TIME_Relative timeout;
191
192   /**
193    * Callback with each matching record
194    */
195   GNUNET_PEERSTORE_Processor callback;
196
197   /**
198    * Closure for @e callback
199    */
200   void *callback_cls;
201
202   /**
203    * #GNUNET_YES if we are currently processing records.
204    */
205   int iterating;
206
207   /**
208    * Task identifier for the function called
209    * on iterate request timeout
210    */
211   struct GNUNET_SCHEDULER_Task *timeout_task;
212
213 };
214
215 /**
216  * Context for a watch request
217  */
218 struct GNUNET_PEERSTORE_WatchContext
219 {
220   /**
221    * Kept in a DLL.
222    */
223   struct GNUNET_PEERSTORE_WatchContext *next;
224
225   /**
226    * Kept in a DLL.
227    */
228   struct GNUNET_PEERSTORE_WatchContext *prev;
229
230   /**
231    * Handle to the PEERSTORE service.
232    */
233   struct GNUNET_PEERSTORE_Handle *h;
234
235   /**
236    * Callback with each record received
237    */
238   GNUNET_PEERSTORE_Processor callback;
239
240   /**
241    * Closure for @e callback
242    */
243   void *callback_cls;
244
245   /**
246    * Hash of the combined key
247    */
248   struct GNUNET_HashCode keyhash;
249
250 };
251
252 /******************************************************************************/
253 /*******************             DECLARATIONS             *********************/
254 /******************************************************************************/
255
256 /**
257  * Close the existing connection to PEERSTORE and reconnect.
258  *
259  * @param h handle to the service
260  */
261 static void
262 reconnect (struct GNUNET_PEERSTORE_Handle *h);
263
264
265 /**
266  * Callback after MQ envelope is sent
267  *
268  * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
269  */
270 static void
271 store_request_sent (void *cls)
272 {
273   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
274   GNUNET_PEERSTORE_Continuation cont;
275   void *cont_cls;
276
277   cont = sc->cont;
278   cont_cls = sc->cont_cls;
279   GNUNET_PEERSTORE_store_cancel (sc);
280   if (NULL != cont)
281     cont (cont_cls, GNUNET_OK);
282 }
283
284
285 /******************************************************************************/
286 /*******************         CONNECTION FUNCTIONS         *********************/
287 /******************************************************************************/
288
289 static void
290 handle_client_error (void *cls,
291                      enum GNUNET_MQ_Error error)
292 {
293   struct GNUNET_PEERSTORE_Handle *h = cls;
294
295   LOG (GNUNET_ERROR_TYPE_ERROR,
296        _("Received an error notification from MQ of type: %d\n"),
297        error);
298   reconnect (h);
299 }
300
301
302 /**
303  * Iterator over previous watches to resend them
304  *
305  * @param cls the `struct GNUNET_PEERSTORE_Handle`
306  * @param key key for the watch
307  * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
308  * @return #GNUNET_YES (continue to iterate)
309  */
310 static int
311 rewatch_it (void *cls,
312             const struct GNUNET_HashCode *key,
313             void *value)
314 {
315   struct GNUNET_PEERSTORE_Handle *h = cls;
316   struct GNUNET_PEERSTORE_WatchContext *wc = value;
317   struct StoreKeyHashMessage *hm;
318   struct GNUNET_MQ_Envelope *ev;
319
320   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
321   hm->keyhash = wc->keyhash;
322   GNUNET_MQ_send (h->mq, ev);
323   return GNUNET_YES;
324 }
325
326
327 /**
328  * Called when the iterate request is timedout
329  *
330  * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
331  */
332 static void
333 iterate_timeout (void *cls)
334 {
335   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
336   GNUNET_PEERSTORE_Processor callback;
337   void *callback_cls;
338
339   ic->timeout_task = NULL;
340   callback = ic->callback;
341   callback_cls = ic->callback_cls;
342   GNUNET_PEERSTORE_iterate_cancel (ic);
343   if (NULL != callback)
344     callback (callback_cls, NULL, _("timeout"));
345 }
346
347
348 /**
349  * Iterator over watch requests to cancel them.
350  *
351  * @param cls unsused
352  * @param key key to the watch request
353  * @param value watch context
354  * @return #GNUNET_YES to continue iteration
355  */
356 static int
357 destroy_watch (void *cls,
358                const struct GNUNET_HashCode *key,
359                void *value)
360 {
361   struct GNUNET_PEERSTORE_WatchContext *wc = value;
362
363   GNUNET_PEERSTORE_watch_cancel (wc);
364   return GNUNET_YES;
365 }
366
367
368 /**
369  * Kill the connection to the service. This can be delayed in case of pending
370  * STORE requests and the user explicitly asked to sync first. Otherwise it is
371  * performed instantly.
372  *
373  * @param h Handle to the service.
374  */
375 static void
376 do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
377 {
378   if (NULL != h->mq)
379   {
380     GNUNET_MQ_destroy (h->mq);
381     h->mq = NULL;
382   }
383   GNUNET_free (h);
384 }
385
386
387 /**
388  * Connect to the PEERSTORE service.
389  *
390  * @param cfg configuration to use
391  * @return NULL on error
392  */
393 struct GNUNET_PEERSTORE_Handle *
394 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
395 {
396   struct GNUNET_PEERSTORE_Handle *h;
397
398   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
399   h->cfg = cfg;
400   h->disconnecting = GNUNET_NO;
401   reconnect (h);
402   if (NULL == h->mq)
403   {
404     GNUNET_free (h);
405     return NULL;
406   }
407   return h;
408 }
409
410
411 /**
412  * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
413  * will be canceled.
414  * Any pending STORE requests will depend on @e snyc_first flag.
415  *
416  * @param h handle to disconnect
417  * @param sync_first send any pending STORE requests before disconnecting
418  */
419 void
420 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h,
421                              int sync_first)
422 {
423   struct GNUNET_PEERSTORE_IterateContext *ic;
424   struct GNUNET_PEERSTORE_StoreContext *sc;
425
426   LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
427   if (NULL != h->watches)
428   {
429     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
430     GNUNET_CONTAINER_multihashmap_destroy (h->watches);
431     h->watches = NULL;
432   }
433   while (NULL != (ic = h->iterate_head))
434   {
435     GNUNET_break (0);
436     GNUNET_PEERSTORE_iterate_cancel (ic);
437   }
438   if (NULL != h->store_head)
439   {
440     if (GNUNET_YES == sync_first)
441     {
442       LOG (GNUNET_ERROR_TYPE_DEBUG,
443            "Delaying disconnection due to pending store requests.\n");
444       h->disconnecting = GNUNET_YES;
445       return;
446     }
447     while (NULL != (sc = h->store_head))
448       GNUNET_PEERSTORE_store_cancel (sc);
449   }
450   do_disconnect (h);
451 }
452
453
454 /******************************************************************************/
455 /*******************            STORE FUNCTIONS           *********************/
456 /******************************************************************************/
457
458
459 /**
460  * Cancel a store request
461  *
462  * @param sc Store request context
463  */
464 void
465 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
466 {
467   struct GNUNET_PEERSTORE_Handle *h = sc->h;
468
469   GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
470   GNUNET_free (sc->sub_system);
471   GNUNET_free (sc->value);
472   GNUNET_free (sc->key);
473   GNUNET_free (sc);
474   if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
475     do_disconnect (h);
476 }
477
478
479 /**
480  * Store a new entry in the PEERSTORE.
481  * Note that stored entries can be lost in some cases
482  * such as power failure.
483  *
484  * @param h Handle to the PEERSTORE service
485  * @param sub_system name of the sub system
486  * @param peer Peer Identity
487  * @param key entry key
488  * @param value entry value BLOB
489  * @param size size of @e value
490  * @param expiry absolute time after which the entry is (possibly) deleted
491  * @param options options specific to the storage operation
492  * @param cont Continuation function after the store request is sent
493  * @param cont_cls Closure for @a cont
494  */
495 struct GNUNET_PEERSTORE_StoreContext *
496 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
497                         const char *sub_system,
498                         const struct GNUNET_PeerIdentity *peer,
499                         const char *key,
500                         const void *value, size_t size,
501                         struct GNUNET_TIME_Absolute expiry,
502                         enum GNUNET_PEERSTORE_StoreOption options,
503                         GNUNET_PEERSTORE_Continuation cont,
504                         void *cont_cls)
505 {
506   struct GNUNET_MQ_Envelope *ev;
507   struct GNUNET_PEERSTORE_StoreContext *sc;
508
509   LOG (GNUNET_ERROR_TYPE_DEBUG,
510        "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
511        size, sub_system, GNUNET_i2s (peer), key);
512   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size,
513                                             &expiry, options,
514                                             GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
515   sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
516
517   sc->sub_system = GNUNET_strdup (sub_system);
518   sc->peer = *peer;
519   sc->key = GNUNET_strdup (key);
520   sc->value = GNUNET_memdup (value, size);
521   sc->size = size;
522   sc->expiry = expiry;
523   sc->options = options;
524   sc->cont = cont;
525   sc->cont_cls = cont_cls;
526   sc->h = h;
527
528   GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
529   GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
530   GNUNET_MQ_send (h->mq, ev);
531   return sc;
532
533 }
534
535
536 /******************************************************************************/
537 /*******************           ITERATE FUNCTIONS          *********************/
538 /******************************************************************************/
539
540
541 /**
542  * When a response for iterate request is received
543  *
544  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
545  * @param msg message received
546  */
547 static void
548 handle_iterate_end (void *cls,
549                     const struct GNUNET_MessageHeader *msg)
550 {
551   struct GNUNET_PEERSTORE_Handle *h = cls;
552   struct GNUNET_PEERSTORE_IterateContext *ic;
553   GNUNET_PEERSTORE_Processor callback;
554   void *callback_cls;
555
556   ic = h->iterate_head;
557   if (NULL == ic)
558   {
559     LOG (GNUNET_ERROR_TYPE_ERROR,
560          _("Unexpected iteration response, this should not happen.\n"));
561     reconnect (h);
562     return;
563   }
564   callback = ic->callback;
565   callback_cls = ic->callback_cls;
566   ic->iterating = GNUNET_NO;
567   GNUNET_PEERSTORE_iterate_cancel (ic);
568   if (NULL != callback)
569     callback (callback_cls, NULL, NULL);
570 }
571
572
573 /**
574  * When a response for iterate request is received, check the
575  * message is well-formed.
576  *
577  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
578  * @param msg message received
579  */
580 static int
581 check_iterate_result (void *cls,
582                       const struct GNUNET_MessageHeader *msg)
583 {
584   /* we defer validation to #handle_iterate_result */
585   return GNUNET_OK;
586 }
587
588
589 /**
590  * When a response for iterate request is received
591  *
592  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
593  * @param msg message received
594  */
595 static void
596 handle_iterate_result (void *cls,
597                        const struct GNUNET_MessageHeader *msg)
598 {
599   struct GNUNET_PEERSTORE_Handle *h = cls;
600   struct GNUNET_PEERSTORE_IterateContext *ic;
601   GNUNET_PEERSTORE_Processor callback;
602   void *callback_cls;
603   struct GNUNET_PEERSTORE_Record *record;
604
605   ic = h->iterate_head;
606   if (NULL == ic)
607   {
608     LOG (GNUNET_ERROR_TYPE_ERROR,
609          _("Unexpected iteration response, this should not happen.\n"));
610     reconnect (h);
611     return;
612   }
613   ic->iterating = GNUNET_YES;
614   callback = ic->callback;
615   callback_cls = ic->callback_cls;
616   if (NULL == callback)
617     return;
618   record = PEERSTORE_parse_record_message (msg);
619   if (NULL == record)
620   {
621     callback (callback_cls,
622               NULL,
623               _("Received a malformed response from service."));
624   }
625   else
626   {
627     callback (callback_cls,
628               record,
629               NULL);
630     PEERSTORE_destroy_record (record);
631   }
632 }
633
634
635 /**
636  * Cancel an iterate request
637  * Please do not call after the iterate request is done
638  *
639  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
640  */
641 void
642 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
643 {
644   if (NULL != ic->timeout_task)
645   {
646     GNUNET_SCHEDULER_cancel (ic->timeout_task);
647     ic->timeout_task = NULL;
648   }
649   if (GNUNET_NO == ic->iterating)
650   {
651     GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head,
652                                  ic->h->iterate_tail,
653                                  ic);
654     GNUNET_free (ic->sub_system);
655     GNUNET_free_non_null (ic->key);
656     GNUNET_free (ic);
657   }
658   else
659     ic->callback = NULL;
660 }
661
662
663 /**
664  * Iterate over records matching supplied key information
665  *
666  * @param h handle to the PEERSTORE service
667  * @param sub_system name of sub system
668  * @param peer Peer identity (can be NULL)
669  * @param key entry key string (can be NULL)
670  * @param timeout time after which the iterate request is canceled
671  * @param callback function called with each matching record, all NULL's on end
672  * @param callback_cls closure for @a callback
673  * @return Handle to iteration request
674  */
675 struct GNUNET_PEERSTORE_IterateContext *
676 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
677                           const char *sub_system,
678                           const struct GNUNET_PeerIdentity *peer,
679                           const char *key,
680                           struct GNUNET_TIME_Relative timeout,
681                           GNUNET_PEERSTORE_Processor callback,
682                           void *callback_cls)
683 {
684   struct GNUNET_MQ_Envelope *ev;
685   struct GNUNET_PEERSTORE_IterateContext *ic;
686
687   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, NULL, 0,
688                                             NULL, 0,
689                                             GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
690   ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
691
692   ic->callback = callback;
693   ic->callback_cls = callback_cls;
694   ic->h = h;
695   ic->sub_system = GNUNET_strdup (sub_system);
696   if (NULL != peer)
697     ic->peer = *peer;
698   if (NULL != key)
699     ic->key = GNUNET_strdup (key);
700   ic->timeout = timeout;
701   GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head,
702                                     h->iterate_tail,
703                                     ic);
704   LOG (GNUNET_ERROR_TYPE_DEBUG,
705        "Sending an iterate request for sub system `%s'\n",
706        sub_system);
707   GNUNET_MQ_send (h->mq, ev);
708   ic->timeout_task =
709       GNUNET_SCHEDULER_add_delayed (timeout,
710                                     &iterate_timeout,
711                                     ic);
712   return ic;
713 }
714
715
716 /******************************************************************************/
717 /*******************            WATCH FUNCTIONS           *********************/
718 /******************************************************************************/
719
720 /**
721  * When a watch record is received, validate it is well-formed.
722  *
723  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
724  * @param msg message received
725  */
726 static int
727 check_watch_record (void *cls,
728                     const struct GNUNET_MessageHeader *msg)
729 {
730   /* we defer validation to #handle_watch_result */
731   return GNUNET_OK;
732 }
733
734
735 /**
736  * When a watch record is received, process it.
737  *
738  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
739  * @param msg message received
740  */
741 static void
742 handle_watch_record (void *cls,
743                      const struct GNUNET_MessageHeader *msg)
744 {
745   struct GNUNET_PEERSTORE_Handle *h = cls;
746   struct GNUNET_PEERSTORE_Record *record;
747   struct GNUNET_HashCode keyhash;
748   struct GNUNET_PEERSTORE_WatchContext *wc;
749
750   LOG (GNUNET_ERROR_TYPE_DEBUG,
751        "Received a watch record from service.\n");
752   record = PEERSTORE_parse_record_message (msg);
753   if (NULL == record)
754   {
755     reconnect (h);
756     return;
757   }
758   PEERSTORE_hash_key (record->sub_system,
759                       record->peer,
760                       record->key,
761                       &keyhash);
762   // FIXME: what if there are multiple watches for the same key?
763   wc = GNUNET_CONTAINER_multihashmap_get (h->watches,
764                                           &keyhash);
765   if (NULL == wc)
766   {
767     LOG (GNUNET_ERROR_TYPE_ERROR,
768          _("Received a watch result for a non existing watch.\n"));
769     PEERSTORE_destroy_record (record);
770     reconnect (h);
771     return;
772   }
773   if (NULL != wc->callback)
774     wc->callback (wc->callback_cls,
775                   record,
776                   NULL);
777   PEERSTORE_destroy_record (record);
778 }
779
780
781 /**
782  * Close the existing connection to PEERSTORE and reconnect.
783  *
784  * @param h handle to the service
785  */
786 static void
787 reconnect (struct GNUNET_PEERSTORE_Handle *h)
788 {
789   GNUNET_MQ_hd_fixed_size (iterate_end,
790                            GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
791                            struct GNUNET_MessageHeader);
792   GNUNET_MQ_hd_var_size (iterate_result,
793                          GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
794                          struct GNUNET_MessageHeader);
795   GNUNET_MQ_hd_var_size (watch_record,
796                          GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
797                          struct GNUNET_MessageHeader);
798   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
799     make_iterate_end_handler (h),
800     make_iterate_result_handler (h),
801     make_watch_record_handler (h),
802     GNUNET_MQ_handler_end ()
803   };
804   struct GNUNET_PEERSTORE_IterateContext *ic;
805   struct GNUNET_PEERSTORE_IterateContext *next;
806   GNUNET_PEERSTORE_Processor icb;
807   void *icb_cls;
808   struct GNUNET_PEERSTORE_StoreContext *sc;
809   struct GNUNET_MQ_Envelope *ev;
810
811   LOG (GNUNET_ERROR_TYPE_DEBUG,
812        "Reconnecting...\n");
813   for (ic = h->iterate_head; NULL != ic; ic = next)
814   {
815     next = ic->next;
816     if (GNUNET_YES == ic->iterating)
817     {
818       icb = ic->callback;
819       icb_cls = ic->callback_cls;
820       GNUNET_PEERSTORE_iterate_cancel (ic);
821       if (NULL != icb)
822         icb (icb_cls,
823              NULL,
824              "Iteration canceled due to reconnection");
825     }
826   }
827   if (NULL != h->mq)
828   {
829     GNUNET_MQ_destroy (h->mq);
830     h->mq = NULL;
831   }
832   h->mq = GNUNET_CLIENT_connecT (h->cfg,
833                                  "peerstore",
834                                  mq_handlers,
835                                  &handle_client_error,
836                                  h);
837   if (NULL == h->mq)
838     return;
839   LOG (GNUNET_ERROR_TYPE_DEBUG,
840        "Resending pending requests after reconnect.\n");
841   if (NULL != h->watches)
842     GNUNET_CONTAINER_multihashmap_iterate (h->watches,
843                                            &rewatch_it,
844                                            h);
845   for (ic = h->iterate_head; NULL != ic; ic = ic->next)
846   {
847     ev = PEERSTORE_create_record_mq_envelope (ic->sub_system,
848                                               &ic->peer,
849                                               ic->key,
850                                               NULL, 0,
851                                               NULL, 0,
852                                               GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
853     GNUNET_MQ_send (h->mq, ev);
854     ic->timeout_task
855       = GNUNET_SCHEDULER_add_delayed (ic->timeout,
856                                       &iterate_timeout,
857                                       ic);
858   }
859   for (sc = h->store_head; NULL != sc; sc = sc->next)
860   {
861     ev = PEERSTORE_create_record_mq_envelope (sc->sub_system,
862                                               &sc->peer,
863                                               sc->key,
864                                               sc->value,
865                                               sc->size,
866                                               &sc->expiry,
867                                               sc->options,
868                                               GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
869     GNUNET_MQ_notify_sent (ev,
870                            &store_request_sent,
871                            sc);
872     GNUNET_MQ_send (h->mq,
873                     ev);
874   }
875 }
876
877
878 /**
879  * Cancel a watch request
880  *
881  * @param wc handle to the watch request
882  */
883 void
884 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
885 {
886   struct GNUNET_PEERSTORE_Handle *h = wc->h;
887   struct GNUNET_MQ_Envelope *ev;
888   struct StoreKeyHashMessage *hm;
889
890   LOG (GNUNET_ERROR_TYPE_DEBUG,
891        "Canceling watch.\n");
892   ev = GNUNET_MQ_msg (hm,
893                       GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
894   hm->keyhash = wc->keyhash;
895   GNUNET_MQ_send (h->mq, ev);
896   GNUNET_CONTAINER_multihashmap_remove (h->watches,
897                                         &wc->keyhash,
898                                         wc);
899   GNUNET_free (wc);
900 }
901
902
903 /**
904  * Request watching a given key
905  * User will be notified with any new values added to key
906  *
907  * @param h handle to the PEERSTORE service
908  * @param sub_system name of sub system
909  * @param peer Peer identity
910  * @param key entry key string
911  * @param callback function called with each new value
912  * @param callback_cls closure for @a callback
913  * @return Handle to watch request
914  */
915 struct GNUNET_PEERSTORE_WatchContext *
916 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
917                         const char *sub_system,
918                         const struct GNUNET_PeerIdentity *peer,
919                         const char *key,
920                         GNUNET_PEERSTORE_Processor callback,
921                         void *callback_cls)
922 {
923   struct GNUNET_MQ_Envelope *ev;
924   struct StoreKeyHashMessage *hm;
925   struct GNUNET_PEERSTORE_WatchContext *wc;
926
927   ev = GNUNET_MQ_msg (hm,
928                       GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
929   PEERSTORE_hash_key (sub_system,
930                       peer,
931                       key,
932                       &hm->keyhash);
933   wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
934   wc->callback = callback;
935   wc->callback_cls = callback_cls;
936   wc->h = h;
937   wc->keyhash = hm->keyhash;
938   if (NULL == h->watches)
939     h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
940   GNUNET_assert (GNUNET_OK ==
941                  GNUNET_CONTAINER_multihashmap_put (h->watches,
942                                                     &wc->keyhash,
943                                                     wc,
944                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
945   LOG (GNUNET_ERROR_TYPE_DEBUG,
946        "Sending a watch request for ss `%s', peer `%s', key `%s'.\n",
947        sub_system,
948        GNUNET_i2s (peer),
949        key);
950   GNUNET_MQ_send (h->mq, ev);
951   return wc;
952 }
953
954 /* end of peerstore_api.c */