fix build issues
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file peerstore/peerstore_api.c
22  * @brief API for peerstore
23  * @author Omar Tarabai
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
32
33 /******************************************************************************/
34 /************************      DATA STRUCTURES     ****************************/
35 /******************************************************************************/
36
37 /**
38  * Handle to the PEERSTORE service.
39  */
40 struct GNUNET_PEERSTORE_Handle
41 {
42
43   /**
44    * Our configuration.
45    */
46   const struct GNUNET_CONFIGURATION_Handle *cfg;
47
48   /**
49    * Message queue
50    */
51   struct GNUNET_MQ_Handle *mq;
52
53   /**
54    * Head of active STORE requests.
55    */
56   struct GNUNET_PEERSTORE_StoreContext *store_head;
57
58   /**
59    * Tail of active STORE requests.
60    */
61   struct GNUNET_PEERSTORE_StoreContext *store_tail;
62
63   /**
64    * Head of active ITERATE requests.
65    */
66   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
67
68   /**
69    * Tail of active ITERATE requests.
70    */
71   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
72
73   /**
74    * Hashmap of watch requests
75    */
76   struct GNUNET_CONTAINER_MultiHashMap *watches;
77
78   /**
79    * Are we in the process of disconnecting but need to sync first?
80    */
81   int disconnecting;
82
83 };
84
85 /**
86  * Context for a store request
87  */
88 struct GNUNET_PEERSTORE_StoreContext
89 {
90   /**
91    * Kept in a DLL.
92    */
93   struct GNUNET_PEERSTORE_StoreContext *next;
94
95   /**
96    * Kept in a DLL.
97    */
98   struct GNUNET_PEERSTORE_StoreContext *prev;
99
100   /**
101    * Handle to the PEERSTORE service.
102    */
103   struct GNUNET_PEERSTORE_Handle *h;
104
105   /**
106    * Continuation called with service response
107    */
108   GNUNET_PEERSTORE_Continuation cont;
109
110   /**
111    * Closure for @e cont
112    */
113   void *cont_cls;
114
115   /**
116    * Which subsystem does the store?
117    */
118   char *sub_system;
119
120   /**
121    * Key for the store operation.
122    */
123   char *key;
124
125   /**
126    * Contains @e size bytes.
127    */
128   void *value;
129
130   /**
131    * Peer the store is for.
132    */
133   struct GNUNET_PeerIdentity peer;
134
135   /**
136    * Number of bytes in @e value.
137    */
138   size_t size;
139
140   /**
141    * When does the value expire?
142    */
143   struct GNUNET_TIME_Absolute expiry;
144
145   /**
146    * Options for the store operation.
147    */
148   enum GNUNET_PEERSTORE_StoreOption options;
149
150 };
151
152 /**
153  * Context for a iterate request
154  */
155 struct GNUNET_PEERSTORE_IterateContext
156 {
157   /**
158    * Kept in a DLL.
159    */
160   struct GNUNET_PEERSTORE_IterateContext *next;
161
162   /**
163    * Kept in a DLL.
164    */
165   struct GNUNET_PEERSTORE_IterateContext *prev;
166
167   /**
168    * Handle to the PEERSTORE service.
169    */
170   struct GNUNET_PEERSTORE_Handle *h;
171
172   /**
173    * Which subsystem does the store?
174    */
175   char *sub_system;
176
177   /**
178    * Peer the store is for.
179    */
180   struct GNUNET_PeerIdentity peer;
181
182   /**
183    * Key for the store operation.
184    */
185   char *key;
186
187   /**
188    * Operation timeout
189    */
190   struct GNUNET_TIME_Relative timeout;
191
192   /**
193    * Callback with each matching record
194    */
195   GNUNET_PEERSTORE_Processor callback;
196
197   /**
198    * Closure for @e callback
199    */
200   void *callback_cls;
201
202   /**
203    * #GNUNET_YES if we are currently processing records.
204    */
205   int iterating;
206
207   /**
208    * Task identifier for the function called
209    * on iterate request timeout
210    */
211   struct GNUNET_SCHEDULER_Task *timeout_task;
212
213 };
214
215 /**
216  * Context for a watch request
217  */
218 struct GNUNET_PEERSTORE_WatchContext
219 {
220   /**
221    * Kept in a DLL.
222    */
223   struct GNUNET_PEERSTORE_WatchContext *next;
224
225   /**
226    * Kept in a DLL.
227    */
228   struct GNUNET_PEERSTORE_WatchContext *prev;
229
230   /**
231    * Handle to the PEERSTORE service.
232    */
233   struct GNUNET_PEERSTORE_Handle *h;
234
235   /**
236    * Callback with each record received
237    */
238   GNUNET_PEERSTORE_Processor callback;
239
240   /**
241    * Closure for @e callback
242    */
243   void *callback_cls;
244
245   /**
246    * Hash of the combined key
247    */
248   struct GNUNET_HashCode keyhash;
249
250 };
251
252 /******************************************************************************/
253 /*******************             DECLARATIONS             *********************/
254 /******************************************************************************/
255
256 /**
257  * Close the existing connection to PEERSTORE and reconnect.
258  *
259  * @param h handle to the service
260  */
261 static void
262 reconnect (struct GNUNET_PEERSTORE_Handle *h);
263
264
265 /**
266  * Callback after MQ envelope is sent
267  *
268  * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
269  */
270 static void
271 store_request_sent (void *cls)
272 {
273   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
274   GNUNET_PEERSTORE_Continuation cont;
275   void *cont_cls;
276
277   cont = sc->cont;
278   cont_cls = sc->cont_cls;
279   GNUNET_PEERSTORE_store_cancel (sc);
280   if (NULL != cont)
281     cont (cont_cls, GNUNET_OK);
282 }
283
284
285 /******************************************************************************/
286 /*******************         CONNECTION FUNCTIONS         *********************/
287 /******************************************************************************/
288
289
290 /**
291  * Function called when we had trouble talking to the service.
292  */
293 static void
294 handle_client_error (void *cls,
295                      enum GNUNET_MQ_Error error)
296 {
297   struct GNUNET_PEERSTORE_Handle *h = cls;
298
299   LOG (GNUNET_ERROR_TYPE_ERROR,
300        "Received an error notification from MQ of type: %d\n",
301        error);
302   reconnect (h);
303 }
304
305
306 /**
307  * Iterator over previous watches to resend them
308  *
309  * @param cls the `struct GNUNET_PEERSTORE_Handle`
310  * @param key key for the watch
311  * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
312  * @return #GNUNET_YES (continue to iterate)
313  */
314 static int
315 rewatch_it (void *cls,
316             const struct GNUNET_HashCode *key,
317             void *value)
318 {
319   struct GNUNET_PEERSTORE_Handle *h = cls;
320   struct GNUNET_PEERSTORE_WatchContext *wc = value;
321   struct StoreKeyHashMessage *hm;
322   struct GNUNET_MQ_Envelope *ev;
323
324   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
325   hm->keyhash = wc->keyhash;
326   GNUNET_MQ_send (h->mq, ev);
327   return GNUNET_YES;
328 }
329
330
331 /**
332  * Called when the iterate request is timedout
333  *
334  * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
335  */
336 static void
337 iterate_timeout (void *cls)
338 {
339   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
340   GNUNET_PEERSTORE_Processor callback;
341   void *callback_cls;
342
343   ic->timeout_task = NULL;
344   callback = ic->callback;
345   callback_cls = ic->callback_cls;
346   GNUNET_PEERSTORE_iterate_cancel (ic);
347   if (NULL != callback)
348     callback (callback_cls,
349               NULL,
350               _("timeout"));
351 }
352
353
354 /**
355  * Iterator over watch requests to cancel them.
356  *
357  * @param cls unsused
358  * @param key key to the watch request
359  * @param value watch context
360  * @return #GNUNET_YES to continue iteration
361  */
362 static int
363 destroy_watch (void *cls,
364                const struct GNUNET_HashCode *key,
365                void *value)
366 {
367   struct GNUNET_PEERSTORE_WatchContext *wc = value;
368
369   GNUNET_PEERSTORE_watch_cancel (wc);
370   return GNUNET_YES;
371 }
372
373
374 /**
375  * Kill the connection to the service. This can be delayed in case of pending
376  * STORE requests and the user explicitly asked to sync first. Otherwise it is
377  * performed instantly.
378  *
379  * @param h Handle to the service.
380  */
381 static void
382 do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
383 {
384   if (NULL != h->mq)
385   {
386     GNUNET_MQ_destroy (h->mq);
387     h->mq = NULL;
388   }
389   GNUNET_free (h);
390 }
391
392
393 /**
394  * Connect to the PEERSTORE service.
395  *
396  * @param cfg configuration to use
397  * @return NULL on error
398  */
399 struct GNUNET_PEERSTORE_Handle *
400 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
401 {
402   struct GNUNET_PEERSTORE_Handle *h;
403
404   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
405   h->cfg = cfg;
406   h->disconnecting = GNUNET_NO;
407   reconnect (h);
408   if (NULL == h->mq)
409   {
410     GNUNET_free (h);
411     return NULL;
412   }
413   return h;
414 }
415
416
417 /**
418  * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
419  * will be canceled.
420  * Any pending STORE requests will depend on @e snyc_first flag.
421  *
422  * @param h handle to disconnect
423  * @param sync_first send any pending STORE requests before disconnecting
424  */
425 void
426 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h,
427                              int sync_first)
428 {
429   struct GNUNET_PEERSTORE_IterateContext *ic;
430   struct GNUNET_PEERSTORE_StoreContext *sc;
431
432   LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
433   if (NULL != h->watches)
434   {
435     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
436     GNUNET_CONTAINER_multihashmap_destroy (h->watches);
437     h->watches = NULL;
438   }
439   while (NULL != (ic = h->iterate_head))
440   {
441     GNUNET_break (0);
442     GNUNET_PEERSTORE_iterate_cancel (ic);
443   }
444   if (NULL != h->store_head)
445   {
446     if (GNUNET_YES == sync_first)
447     {
448       LOG (GNUNET_ERROR_TYPE_DEBUG,
449            "Delaying disconnection due to pending store requests.\n");
450       h->disconnecting = GNUNET_YES;
451       return;
452     }
453     while (NULL != (sc = h->store_head))
454       GNUNET_PEERSTORE_store_cancel (sc);
455   }
456   do_disconnect (h);
457 }
458
459
460 /******************************************************************************/
461 /*******************            STORE FUNCTIONS           *********************/
462 /******************************************************************************/
463
464
465 /**
466  * Cancel a store request
467  *
468  * @param sc Store request context
469  */
470 void
471 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
472 {
473   struct GNUNET_PEERSTORE_Handle *h = sc->h;
474
475   GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
476   GNUNET_free (sc->sub_system);
477   GNUNET_free (sc->value);
478   GNUNET_free (sc->key);
479   GNUNET_free (sc);
480   if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
481     do_disconnect (h);
482 }
483
484
485 /**
486  * Store a new entry in the PEERSTORE.
487  * Note that stored entries can be lost in some cases
488  * such as power failure.
489  *
490  * @param h Handle to the PEERSTORE service
491  * @param sub_system name of the sub system
492  * @param peer Peer Identity
493  * @param key entry key
494  * @param value entry value BLOB
495  * @param size size of @e value
496  * @param expiry absolute time after which the entry is (possibly) deleted
497  * @param options options specific to the storage operation
498  * @param cont Continuation function after the store request is sent
499  * @param cont_cls Closure for @a cont
500  */
501 struct GNUNET_PEERSTORE_StoreContext *
502 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
503                         const char *sub_system,
504                         const struct GNUNET_PeerIdentity *peer,
505                         const char *key,
506                         const void *value, size_t size,
507                         struct GNUNET_TIME_Absolute expiry,
508                         enum GNUNET_PEERSTORE_StoreOption options,
509                         GNUNET_PEERSTORE_Continuation cont,
510                         void *cont_cls)
511 {
512   struct GNUNET_MQ_Envelope *ev;
513   struct GNUNET_PEERSTORE_StoreContext *sc;
514
515   LOG (GNUNET_ERROR_TYPE_DEBUG,
516        "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
517        size, sub_system, GNUNET_i2s (peer), key);
518   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size,
519                                             expiry, options,
520                                             GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
521   sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
522
523   sc->sub_system = GNUNET_strdup (sub_system);
524   sc->peer = *peer;
525   sc->key = GNUNET_strdup (key);
526   sc->value = GNUNET_memdup (value, size);
527   sc->size = size;
528   sc->expiry = expiry;
529   sc->options = options;
530   sc->cont = cont;
531   sc->cont_cls = cont_cls;
532   sc->h = h;
533
534   GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
535   GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
536   GNUNET_MQ_send (h->mq, ev);
537   return sc;
538
539 }
540
541
542 /******************************************************************************/
543 /*******************           ITERATE FUNCTIONS          *********************/
544 /******************************************************************************/
545
546
547 /**
548  * When a response for iterate request is received
549  *
550  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
551  * @param msg message received
552  */
553 static void
554 handle_iterate_end (void *cls,
555                     const struct GNUNET_MessageHeader *msg)
556 {
557   struct GNUNET_PEERSTORE_Handle *h = cls;
558   struct GNUNET_PEERSTORE_IterateContext *ic;
559   GNUNET_PEERSTORE_Processor callback;
560   void *callback_cls;
561
562   ic = h->iterate_head;
563   if (NULL == ic)
564   {
565     LOG (GNUNET_ERROR_TYPE_ERROR,
566          _("Unexpected iteration response, this should not happen.\n"));
567     reconnect (h);
568     return;
569   }
570   callback = ic->callback;
571   callback_cls = ic->callback_cls;
572   ic->iterating = GNUNET_NO;
573   GNUNET_PEERSTORE_iterate_cancel (ic);
574   if (NULL != callback)
575     callback (callback_cls, NULL, NULL);
576 }
577
578
579 /**
580  * When a response for iterate request is received, check the
581  * message is well-formed.
582  *
583  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
584  * @param msg message received
585  */
586 static int
587 check_iterate_result (void *cls,
588                       const struct StoreRecordMessage *msg)
589 {
590   /* we defer validation to #handle_iterate_result */
591   return GNUNET_OK;
592 }
593
594
595 /**
596  * When a response for iterate request is received
597  *
598  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
599  * @param msg message received
600  */
601 static void
602 handle_iterate_result (void *cls,
603                        const struct StoreRecordMessage *msg)
604 {
605   struct GNUNET_PEERSTORE_Handle *h = cls;
606   struct GNUNET_PEERSTORE_IterateContext *ic;
607   GNUNET_PEERSTORE_Processor callback;
608   void *callback_cls;
609   struct GNUNET_PEERSTORE_Record *record;
610
611   ic = h->iterate_head;
612   if (NULL == ic)
613   {
614     LOG (GNUNET_ERROR_TYPE_ERROR,
615          _("Unexpected iteration response, this should not happen.\n"));
616     reconnect (h);
617     return;
618   }
619   ic->iterating = GNUNET_YES;
620   callback = ic->callback;
621   callback_cls = ic->callback_cls;
622   if (NULL == callback)
623     return;
624   record = PEERSTORE_parse_record_message (msg);
625   if (NULL == record)
626   {
627     callback (callback_cls,
628               NULL,
629               _("Received a malformed response from service."));
630   }
631   else
632   {
633     callback (callback_cls,
634               record,
635               NULL);
636     PEERSTORE_destroy_record (record);
637   }
638 }
639
640
641 /**
642  * Cancel an iterate request
643  * Please do not call after the iterate request is done
644  *
645  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
646  */
647 void
648 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
649 {
650   if (NULL != ic->timeout_task)
651   {
652     GNUNET_SCHEDULER_cancel (ic->timeout_task);
653     ic->timeout_task = NULL;
654   }
655   if (GNUNET_NO == ic->iterating)
656   {
657     GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head,
658                                  ic->h->iterate_tail,
659                                  ic);
660     GNUNET_free (ic->sub_system);
661     GNUNET_free_non_null (ic->key);
662     GNUNET_free (ic);
663   }
664   else
665     ic->callback = NULL;
666 }
667
668
669 /**
670  * Iterate over records matching supplied key information
671  *
672  * @param h handle to the PEERSTORE service
673  * @param sub_system name of sub system
674  * @param peer Peer identity (can be NULL)
675  * @param key entry key string (can be NULL)
676  * @param timeout time after which the iterate request is canceled
677  * @param callback function called with each matching record, all NULL's on end
678  * @param callback_cls closure for @a callback
679  * @return Handle to iteration request
680  */
681 struct GNUNET_PEERSTORE_IterateContext *
682 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
683                           const char *sub_system,
684                           const struct GNUNET_PeerIdentity *peer,
685                           const char *key,
686                           struct GNUNET_TIME_Relative timeout,
687                           GNUNET_PEERSTORE_Processor callback,
688                           void *callback_cls)
689 {
690   struct GNUNET_MQ_Envelope *ev;
691   struct GNUNET_PEERSTORE_IterateContext *ic;
692
693   ev = PEERSTORE_create_record_mq_envelope (sub_system,
694                                             peer,
695                                             key,
696                                             NULL, 0,
697                                             GNUNET_TIME_UNIT_FOREVER_ABS,
698                                             0,
699                                             GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
700   ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
701
702   ic->callback = callback;
703   ic->callback_cls = callback_cls;
704   ic->h = h;
705   ic->sub_system = GNUNET_strdup (sub_system);
706   if (NULL != peer)
707     ic->peer = *peer;
708   if (NULL != key)
709     ic->key = GNUNET_strdup (key);
710   ic->timeout = timeout;
711   GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head,
712                                     h->iterate_tail,
713                                     ic);
714   LOG (GNUNET_ERROR_TYPE_DEBUG,
715        "Sending an iterate request for sub system `%s'\n",
716        sub_system);
717   GNUNET_MQ_send (h->mq, ev);
718   ic->timeout_task =
719       GNUNET_SCHEDULER_add_delayed (timeout,
720                                     &iterate_timeout,
721                                     ic);
722   return ic;
723 }
724
725
726 /******************************************************************************/
727 /*******************            WATCH FUNCTIONS           *********************/
728 /******************************************************************************/
729
730 /**
731  * When a watch record is received, validate it is well-formed.
732  *
733  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
734  * @param msg message received
735  */
736 static int
737 check_watch_record (void *cls,
738                     const struct StoreRecordMessage *msg)
739 {
740   /* we defer validation to #handle_watch_result */
741   return GNUNET_OK;
742 }
743
744
745 /**
746  * When a watch record is received, process it.
747  *
748  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
749  * @param msg message received
750  */
751 static void
752 handle_watch_record (void *cls,
753                      const struct StoreRecordMessage *msg)
754 {
755   struct GNUNET_PEERSTORE_Handle *h = cls;
756   struct GNUNET_PEERSTORE_Record *record;
757   struct GNUNET_HashCode keyhash;
758   struct GNUNET_PEERSTORE_WatchContext *wc;
759
760   LOG (GNUNET_ERROR_TYPE_DEBUG,
761        "Received a watch record from service.\n");
762   record = PEERSTORE_parse_record_message (msg);
763   if (NULL == record)
764   {
765     reconnect (h);
766     return;
767   }
768   PEERSTORE_hash_key (record->sub_system,
769                       &record->peer,
770                       record->key,
771                       &keyhash);
772   // FIXME: what if there are multiple watches for the same key?
773   wc = GNUNET_CONTAINER_multihashmap_get (h->watches,
774                                           &keyhash);
775   if (NULL == wc)
776   {
777     LOG (GNUNET_ERROR_TYPE_ERROR,
778          _("Received a watch result for a non existing watch.\n"));
779     PEERSTORE_destroy_record (record);
780     reconnect (h);
781     return;
782   }
783   if (NULL != wc->callback)
784     wc->callback (wc->callback_cls,
785                   record,
786                   NULL);
787   PEERSTORE_destroy_record (record);
788 }
789
790
791 /**
792  * Close the existing connection to PEERSTORE and reconnect.
793  *
794  * @param h handle to the service
795  */
796 static void
797 reconnect (struct GNUNET_PEERSTORE_Handle *h)
798 {
799   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
800     GNUNET_MQ_hd_fixed_size (iterate_end,
801                              GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
802                              struct GNUNET_MessageHeader,
803                              h),
804     GNUNET_MQ_hd_var_size (iterate_result,
805                            GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
806                            struct StoreRecordMessage,
807                            h),
808     GNUNET_MQ_hd_var_size (watch_record,
809                            GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
810                            struct StoreRecordMessage,
811                            h),
812     GNUNET_MQ_handler_end ()
813   };
814   struct GNUNET_PEERSTORE_IterateContext *ic;
815   struct GNUNET_PEERSTORE_IterateContext *next;
816   GNUNET_PEERSTORE_Processor icb;
817   void *icb_cls;
818   struct GNUNET_PEERSTORE_StoreContext *sc;
819   struct GNUNET_MQ_Envelope *ev;
820
821   LOG (GNUNET_ERROR_TYPE_DEBUG,
822        "Reconnecting...\n");
823   for (ic = h->iterate_head; NULL != ic; ic = next)
824   {
825     next = ic->next;
826     if (GNUNET_YES == ic->iterating)
827     {
828       icb = ic->callback;
829       icb_cls = ic->callback_cls;
830       GNUNET_PEERSTORE_iterate_cancel (ic);
831       if (NULL != icb)
832         icb (icb_cls,
833              NULL,
834              "Iteration canceled due to reconnection");
835     }
836   }
837   if (NULL != h->mq)
838   {
839     GNUNET_MQ_destroy (h->mq);
840     h->mq = NULL;
841   }
842   h->mq = GNUNET_CLIENT_connect (h->cfg,
843                                  "peerstore",
844                                  mq_handlers,
845                                  &handle_client_error,
846                                  h);
847   if (NULL == h->mq)
848     return;
849   LOG (GNUNET_ERROR_TYPE_DEBUG,
850        "Resending pending requests after reconnect.\n");
851   if (NULL != h->watches)
852     GNUNET_CONTAINER_multihashmap_iterate (h->watches,
853                                            &rewatch_it,
854                                            h);
855   for (ic = h->iterate_head; NULL != ic; ic = ic->next)
856   {
857     ev = PEERSTORE_create_record_mq_envelope (ic->sub_system,
858                                               &ic->peer,
859                                               ic->key,
860                                               NULL, 0,
861                                               GNUNET_TIME_UNIT_FOREVER_ABS,
862                                               0,
863                                               GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
864     GNUNET_MQ_send (h->mq, ev);
865     if (NULL != ic->timeout_task)
866       GNUNET_SCHEDULER_cancel (ic->timeout_task);
867     ic->timeout_task
868       = GNUNET_SCHEDULER_add_delayed (ic->timeout,
869                                       &iterate_timeout,
870                                       ic);
871   }
872   for (sc = h->store_head; NULL != sc; sc = sc->next)
873   {
874     ev = PEERSTORE_create_record_mq_envelope (sc->sub_system,
875                                               &sc->peer,
876                                               sc->key,
877                                               sc->value,
878                                               sc->size,
879                                               sc->expiry,
880                                               sc->options,
881                                               GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
882     GNUNET_MQ_notify_sent (ev,
883                            &store_request_sent,
884                            sc);
885     GNUNET_MQ_send (h->mq,
886                     ev);
887   }
888 }
889
890
891 /**
892  * Cancel a watch request
893  *
894  * @param wc handle to the watch request
895  */
896 void
897 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
898 {
899   struct GNUNET_PEERSTORE_Handle *h = wc->h;
900   struct GNUNET_MQ_Envelope *ev;
901   struct StoreKeyHashMessage *hm;
902
903   LOG (GNUNET_ERROR_TYPE_DEBUG,
904        "Canceling watch.\n");
905   ev = GNUNET_MQ_msg (hm,
906                       GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
907   hm->keyhash = wc->keyhash;
908   GNUNET_MQ_send (h->mq, ev);
909   GNUNET_CONTAINER_multihashmap_remove (h->watches,
910                                         &wc->keyhash,
911                                         wc);
912   GNUNET_free (wc);
913 }
914
915
916 /**
917  * Request watching a given key
918  * User will be notified with any new values added to key
919  *
920  * @param h handle to the PEERSTORE service
921  * @param sub_system name of sub system
922  * @param peer Peer identity
923  * @param key entry key string
924  * @param callback function called with each new value
925  * @param callback_cls closure for @a callback
926  * @return Handle to watch request
927  */
928 struct GNUNET_PEERSTORE_WatchContext *
929 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
930                         const char *sub_system,
931                         const struct GNUNET_PeerIdentity *peer,
932                         const char *key,
933                         GNUNET_PEERSTORE_Processor callback,
934                         void *callback_cls)
935 {
936   struct GNUNET_MQ_Envelope *ev;
937   struct StoreKeyHashMessage *hm;
938   struct GNUNET_PEERSTORE_WatchContext *wc;
939
940   ev = GNUNET_MQ_msg (hm,
941                       GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
942   PEERSTORE_hash_key (sub_system,
943                       peer,
944                       key,
945                       &hm->keyhash);
946   wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
947   wc->callback = callback;
948   wc->callback_cls = callback_cls;
949   wc->h = h;
950   wc->keyhash = hm->keyhash;
951   if (NULL == h->watches)
952     h->watches = GNUNET_CONTAINER_multihashmap_create (5,
953                                                        GNUNET_NO);
954   GNUNET_assert (GNUNET_OK ==
955                  GNUNET_CONTAINER_multihashmap_put (h->watches,
956                                                     &wc->keyhash,
957                                                     wc,
958                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
959   LOG (GNUNET_ERROR_TYPE_DEBUG,
960        "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
961        sub_system,
962        GNUNET_i2s (peer),
963        key);
964   GNUNET_MQ_send (h->mq,
965                   ev);
966   return wc;
967 }
968
969 /* end of peerstore_api.c */