paragraph for gnunet devs that don't know how to use the web
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18 /**
19  * @file peerstore/peerstore_api.c
20  * @brief API for peerstore
21  * @author Omar Tarabai
22  * @author Christian Grothoff
23  */
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26 #include "peerstore.h"
27 #include "peerstore_common.h"
28
29 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
30
31 /******************************************************************************/
32 /************************      DATA STRUCTURES     ****************************/
33 /******************************************************************************/
34
35 /**
36  * Handle to the PEERSTORE service.
37  */
38 struct GNUNET_PEERSTORE_Handle
39 {
40
41   /**
42    * Our configuration.
43    */
44   const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46   /**
47    * Message queue
48    */
49   struct GNUNET_MQ_Handle *mq;
50
51   /**
52    * Head of active STORE requests.
53    */
54   struct GNUNET_PEERSTORE_StoreContext *store_head;
55
56   /**
57    * Tail of active STORE requests.
58    */
59   struct GNUNET_PEERSTORE_StoreContext *store_tail;
60
61   /**
62    * Head of active ITERATE requests.
63    */
64   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
65
66   /**
67    * Tail of active ITERATE requests.
68    */
69   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
70
71   /**
72    * Hashmap of watch requests
73    */
74   struct GNUNET_CONTAINER_MultiHashMap *watches;
75
76   /**
77    * Are we in the process of disconnecting but need to sync first?
78    */
79   int disconnecting;
80
81 };
82
83 /**
84  * Context for a store request
85  */
86 struct GNUNET_PEERSTORE_StoreContext
87 {
88   /**
89    * Kept in a DLL.
90    */
91   struct GNUNET_PEERSTORE_StoreContext *next;
92
93   /**
94    * Kept in a DLL.
95    */
96   struct GNUNET_PEERSTORE_StoreContext *prev;
97
98   /**
99    * Handle to the PEERSTORE service.
100    */
101   struct GNUNET_PEERSTORE_Handle *h;
102
103   /**
104    * Continuation called with service response
105    */
106   GNUNET_PEERSTORE_Continuation cont;
107
108   /**
109    * Closure for @e cont
110    */
111   void *cont_cls;
112
113   /**
114    * Which subsystem does the store?
115    */
116   char *sub_system;
117
118   /**
119    * Key for the store operation.
120    */
121   char *key;
122
123   /**
124    * Contains @e size bytes.
125    */
126   void *value;
127
128   /**
129    * Peer the store is for.
130    */
131   struct GNUNET_PeerIdentity peer;
132
133   /**
134    * Number of bytes in @e value.
135    */
136   size_t size;
137
138   /**
139    * When does the value expire?
140    */
141   struct GNUNET_TIME_Absolute expiry;
142
143   /**
144    * Options for the store operation.
145    */
146   enum GNUNET_PEERSTORE_StoreOption options;
147
148 };
149
150 /**
151  * Context for a iterate request
152  */
153 struct GNUNET_PEERSTORE_IterateContext
154 {
155   /**
156    * Kept in a DLL.
157    */
158   struct GNUNET_PEERSTORE_IterateContext *next;
159
160   /**
161    * Kept in a DLL.
162    */
163   struct GNUNET_PEERSTORE_IterateContext *prev;
164
165   /**
166    * Handle to the PEERSTORE service.
167    */
168   struct GNUNET_PEERSTORE_Handle *h;
169
170   /**
171    * Which subsystem does the store?
172    */
173   char *sub_system;
174
175   /**
176    * Peer the store is for.
177    */
178   struct GNUNET_PeerIdentity peer;
179
180   /**
181    * Key for the store operation.
182    */
183   char *key;
184
185   /**
186    * Operation timeout
187    */
188   struct GNUNET_TIME_Relative timeout;
189
190   /**
191    * Callback with each matching record
192    */
193   GNUNET_PEERSTORE_Processor callback;
194
195   /**
196    * Closure for @e callback
197    */
198   void *callback_cls;
199
200   /**
201    * #GNUNET_YES if we are currently processing records.
202    */
203   int iterating;
204
205   /**
206    * Task identifier for the function called
207    * on iterate request timeout
208    */
209   struct GNUNET_SCHEDULER_Task *timeout_task;
210
211 };
212
213 /**
214  * Context for a watch request
215  */
216 struct GNUNET_PEERSTORE_WatchContext
217 {
218   /**
219    * Kept in a DLL.
220    */
221   struct GNUNET_PEERSTORE_WatchContext *next;
222
223   /**
224    * Kept in a DLL.
225    */
226   struct GNUNET_PEERSTORE_WatchContext *prev;
227
228   /**
229    * Handle to the PEERSTORE service.
230    */
231   struct GNUNET_PEERSTORE_Handle *h;
232
233   /**
234    * Callback with each record received
235    */
236   GNUNET_PEERSTORE_Processor callback;
237
238   /**
239    * Closure for @e callback
240    */
241   void *callback_cls;
242
243   /**
244    * Hash of the combined key
245    */
246   struct GNUNET_HashCode keyhash;
247
248 };
249
250 /******************************************************************************/
251 /*******************             DECLARATIONS             *********************/
252 /******************************************************************************/
253
254 /**
255  * Close the existing connection to PEERSTORE and reconnect.
256  *
257  * @param h handle to the service
258  */
259 static void
260 reconnect (struct GNUNET_PEERSTORE_Handle *h);
261
262
263 /**
264  * Callback after MQ envelope is sent
265  *
266  * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
267  */
268 static void
269 store_request_sent (void *cls)
270 {
271   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
272   GNUNET_PEERSTORE_Continuation cont;
273   void *cont_cls;
274
275   cont = sc->cont;
276   cont_cls = sc->cont_cls;
277   GNUNET_PEERSTORE_store_cancel (sc);
278   if (NULL != cont)
279     cont (cont_cls, GNUNET_OK);
280 }
281
282
283 /******************************************************************************/
284 /*******************         CONNECTION FUNCTIONS         *********************/
285 /******************************************************************************/
286
287
288 /**
289  * Function called when we had trouble talking to the service.
290  */
291 static void
292 handle_client_error (void *cls,
293                      enum GNUNET_MQ_Error error)
294 {
295   struct GNUNET_PEERSTORE_Handle *h = cls;
296
297   LOG (GNUNET_ERROR_TYPE_ERROR,
298        "Received an error notification from MQ of type: %d\n",
299        error);
300   reconnect (h);
301 }
302
303
304 /**
305  * Iterator over previous watches to resend them
306  *
307  * @param cls the `struct GNUNET_PEERSTORE_Handle`
308  * @param key key for the watch
309  * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
310  * @return #GNUNET_YES (continue to iterate)
311  */
312 static int
313 rewatch_it (void *cls,
314             const struct GNUNET_HashCode *key,
315             void *value)
316 {
317   struct GNUNET_PEERSTORE_Handle *h = cls;
318   struct GNUNET_PEERSTORE_WatchContext *wc = value;
319   struct StoreKeyHashMessage *hm;
320   struct GNUNET_MQ_Envelope *ev;
321
322   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
323   hm->keyhash = wc->keyhash;
324   GNUNET_MQ_send (h->mq, ev);
325   return GNUNET_YES;
326 }
327
328
329 /**
330  * Called when the iterate request is timedout
331  *
332  * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
333  */
334 static void
335 iterate_timeout (void *cls)
336 {
337   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
338   GNUNET_PEERSTORE_Processor callback;
339   void *callback_cls;
340
341   ic->timeout_task = NULL;
342   callback = ic->callback;
343   callback_cls = ic->callback_cls;
344   GNUNET_PEERSTORE_iterate_cancel (ic);
345   if (NULL != callback)
346     callback (callback_cls,
347               NULL,
348               _("timeout"));
349 }
350
351
352 /**
353  * Iterator over watch requests to cancel them.
354  *
355  * @param cls unsused
356  * @param key key to the watch request
357  * @param value watch context
358  * @return #GNUNET_YES to continue iteration
359  */
360 static int
361 destroy_watch (void *cls,
362                const struct GNUNET_HashCode *key,
363                void *value)
364 {
365   struct GNUNET_PEERSTORE_WatchContext *wc = value;
366
367   GNUNET_PEERSTORE_watch_cancel (wc);
368   return GNUNET_YES;
369 }
370
371
372 /**
373  * Kill the connection to the service. This can be delayed in case of pending
374  * STORE requests and the user explicitly asked to sync first. Otherwise it is
375  * performed instantly.
376  *
377  * @param h Handle to the service.
378  */
379 static void
380 do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
381 {
382   if (NULL != h->mq)
383   {
384     GNUNET_MQ_destroy (h->mq);
385     h->mq = NULL;
386   }
387   GNUNET_free (h);
388 }
389
390
391 /**
392  * Connect to the PEERSTORE service.
393  *
394  * @param cfg configuration to use
395  * @return NULL on error
396  */
397 struct GNUNET_PEERSTORE_Handle *
398 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
399 {
400   struct GNUNET_PEERSTORE_Handle *h;
401
402   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
403   h->cfg = cfg;
404   h->disconnecting = GNUNET_NO;
405   reconnect (h);
406   if (NULL == h->mq)
407   {
408     GNUNET_free (h);
409     return NULL;
410   }
411   return h;
412 }
413
414
415 /**
416  * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
417  * will be canceled.
418  * Any pending STORE requests will depend on @e snyc_first flag.
419  *
420  * @param h handle to disconnect
421  * @param sync_first send any pending STORE requests before disconnecting
422  */
423 void
424 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h,
425                              int sync_first)
426 {
427   struct GNUNET_PEERSTORE_IterateContext *ic;
428   struct GNUNET_PEERSTORE_StoreContext *sc;
429
430   LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
431   if (NULL != h->watches)
432   {
433     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
434     GNUNET_CONTAINER_multihashmap_destroy (h->watches);
435     h->watches = NULL;
436   }
437   while (NULL != (ic = h->iterate_head))
438   {
439     GNUNET_break (0);
440     GNUNET_PEERSTORE_iterate_cancel (ic);
441   }
442   if (NULL != h->store_head)
443   {
444     if (GNUNET_YES == sync_first)
445     {
446       LOG (GNUNET_ERROR_TYPE_DEBUG,
447            "Delaying disconnection due to pending store requests.\n");
448       h->disconnecting = GNUNET_YES;
449       return;
450     }
451     while (NULL != (sc = h->store_head))
452       GNUNET_PEERSTORE_store_cancel (sc);
453   }
454   do_disconnect (h);
455 }
456
457
458 /******************************************************************************/
459 /*******************            STORE FUNCTIONS           *********************/
460 /******************************************************************************/
461
462
463 /**
464  * Cancel a store request
465  *
466  * @param sc Store request context
467  */
468 void
469 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
470 {
471   struct GNUNET_PEERSTORE_Handle *h = sc->h;
472
473   GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
474   GNUNET_free (sc->sub_system);
475   GNUNET_free (sc->value);
476   GNUNET_free (sc->key);
477   GNUNET_free (sc);
478   if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
479     do_disconnect (h);
480 }
481
482
483 /**
484  * Store a new entry in the PEERSTORE.
485  * Note that stored entries can be lost in some cases
486  * such as power failure.
487  *
488  * @param h Handle to the PEERSTORE service
489  * @param sub_system name of the sub system
490  * @param peer Peer Identity
491  * @param key entry key
492  * @param value entry value BLOB
493  * @param size size of @e value
494  * @param expiry absolute time after which the entry is (possibly) deleted
495  * @param options options specific to the storage operation
496  * @param cont Continuation function after the store request is sent
497  * @param cont_cls Closure for @a cont
498  */
499 struct GNUNET_PEERSTORE_StoreContext *
500 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
501                         const char *sub_system,
502                         const struct GNUNET_PeerIdentity *peer,
503                         const char *key,
504                         const void *value, size_t size,
505                         struct GNUNET_TIME_Absolute expiry,
506                         enum GNUNET_PEERSTORE_StoreOption options,
507                         GNUNET_PEERSTORE_Continuation cont,
508                         void *cont_cls)
509 {
510   struct GNUNET_MQ_Envelope *ev;
511   struct GNUNET_PEERSTORE_StoreContext *sc;
512
513   LOG (GNUNET_ERROR_TYPE_DEBUG,
514        "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
515        size, sub_system, GNUNET_i2s (peer), key);
516   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size,
517                                             expiry, options,
518                                             GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
519   sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
520
521   sc->sub_system = GNUNET_strdup (sub_system);
522   sc->peer = *peer;
523   sc->key = GNUNET_strdup (key);
524   sc->value = GNUNET_memdup (value, size);
525   sc->size = size;
526   sc->expiry = expiry;
527   sc->options = options;
528   sc->cont = cont;
529   sc->cont_cls = cont_cls;
530   sc->h = h;
531
532   GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
533   GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
534   GNUNET_MQ_send (h->mq, ev);
535   return sc;
536
537 }
538
539
540 /******************************************************************************/
541 /*******************           ITERATE FUNCTIONS          *********************/
542 /******************************************************************************/
543
544
545 /**
546  * When a response for iterate request is received
547  *
548  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
549  * @param msg message received
550  */
551 static void
552 handle_iterate_end (void *cls,
553                     const struct GNUNET_MessageHeader *msg)
554 {
555   struct GNUNET_PEERSTORE_Handle *h = cls;
556   struct GNUNET_PEERSTORE_IterateContext *ic;
557   GNUNET_PEERSTORE_Processor callback;
558   void *callback_cls;
559
560   ic = h->iterate_head;
561   if (NULL == ic)
562   {
563     LOG (GNUNET_ERROR_TYPE_ERROR,
564          _("Unexpected iteration response, this should not happen.\n"));
565     reconnect (h);
566     return;
567   }
568   callback = ic->callback;
569   callback_cls = ic->callback_cls;
570   ic->iterating = GNUNET_NO;
571   GNUNET_PEERSTORE_iterate_cancel (ic);
572   if (NULL != callback)
573     callback (callback_cls, NULL, NULL);
574 }
575
576
577 /**
578  * When a response for iterate request is received, check the
579  * message is well-formed.
580  *
581  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
582  * @param msg message received
583  */
584 static int
585 check_iterate_result (void *cls,
586                       const struct StoreRecordMessage *msg)
587 {
588   /* we defer validation to #handle_iterate_result */
589   return GNUNET_OK;
590 }
591
592
593 /**
594  * When a response for iterate request is received
595  *
596  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
597  * @param msg message received
598  */
599 static void
600 handle_iterate_result (void *cls,
601                        const struct StoreRecordMessage *msg)
602 {
603   struct GNUNET_PEERSTORE_Handle *h = cls;
604   struct GNUNET_PEERSTORE_IterateContext *ic;
605   GNUNET_PEERSTORE_Processor callback;
606   void *callback_cls;
607   struct GNUNET_PEERSTORE_Record *record;
608
609   ic = h->iterate_head;
610   if (NULL == ic)
611   {
612     LOG (GNUNET_ERROR_TYPE_ERROR,
613          _("Unexpected iteration response, this should not happen.\n"));
614     reconnect (h);
615     return;
616   }
617   ic->iterating = GNUNET_YES;
618   callback = ic->callback;
619   callback_cls = ic->callback_cls;
620   if (NULL == callback)
621     return;
622   record = PEERSTORE_parse_record_message (msg);
623   if (NULL == record)
624   {
625     callback (callback_cls,
626               NULL,
627               _("Received a malformed response from service."));
628   }
629   else
630   {
631     callback (callback_cls,
632               record,
633               NULL);
634     PEERSTORE_destroy_record (record);
635   }
636 }
637
638
639 /**
640  * Cancel an iterate request
641  * Please do not call after the iterate request is done
642  *
643  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
644  */
645 void
646 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
647 {
648   if (NULL != ic->timeout_task)
649   {
650     GNUNET_SCHEDULER_cancel (ic->timeout_task);
651     ic->timeout_task = NULL;
652   }
653   if (GNUNET_NO == ic->iterating)
654   {
655     GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head,
656                                  ic->h->iterate_tail,
657                                  ic);
658     GNUNET_free (ic->sub_system);
659     GNUNET_free_non_null (ic->key);
660     GNUNET_free (ic);
661   }
662   else
663     ic->callback = NULL;
664 }
665
666
667 /**
668  * Iterate over records matching supplied key information
669  *
670  * @param h handle to the PEERSTORE service
671  * @param sub_system name of sub system
672  * @param peer Peer identity (can be NULL)
673  * @param key entry key string (can be NULL)
674  * @param timeout time after which the iterate request is canceled
675  * @param callback function called with each matching record, all NULL's on end
676  * @param callback_cls closure for @a callback
677  * @return Handle to iteration request
678  */
679 struct GNUNET_PEERSTORE_IterateContext *
680 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
681                           const char *sub_system,
682                           const struct GNUNET_PeerIdentity *peer,
683                           const char *key,
684                           struct GNUNET_TIME_Relative timeout,
685                           GNUNET_PEERSTORE_Processor callback,
686                           void *callback_cls)
687 {
688   struct GNUNET_MQ_Envelope *ev;
689   struct GNUNET_PEERSTORE_IterateContext *ic;
690
691   ev = PEERSTORE_create_record_mq_envelope (sub_system,
692                                             peer,
693                                             key,
694                                             NULL, 0,
695                                             GNUNET_TIME_UNIT_FOREVER_ABS,
696                                             0,
697                                             GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
698   ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
699
700   ic->callback = callback;
701   ic->callback_cls = callback_cls;
702   ic->h = h;
703   ic->sub_system = GNUNET_strdup (sub_system);
704   if (NULL != peer)
705     ic->peer = *peer;
706   if (NULL != key)
707     ic->key = GNUNET_strdup (key);
708   ic->timeout = timeout;
709   GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head,
710                                     h->iterate_tail,
711                                     ic);
712   LOG (GNUNET_ERROR_TYPE_DEBUG,
713        "Sending an iterate request for sub system `%s'\n",
714        sub_system);
715   GNUNET_MQ_send (h->mq, ev);
716   ic->timeout_task =
717       GNUNET_SCHEDULER_add_delayed (timeout,
718                                     &iterate_timeout,
719                                     ic);
720   return ic;
721 }
722
723
724 /******************************************************************************/
725 /*******************            WATCH FUNCTIONS           *********************/
726 /******************************************************************************/
727
728 /**
729  * When a watch record is received, validate it is well-formed.
730  *
731  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
732  * @param msg message received
733  */
734 static int
735 check_watch_record (void *cls,
736                     const struct StoreRecordMessage *msg)
737 {
738   /* we defer validation to #handle_watch_result */
739   return GNUNET_OK;
740 }
741
742
743 /**
744  * When a watch record is received, process it.
745  *
746  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
747  * @param msg message received
748  */
749 static void
750 handle_watch_record (void *cls,
751                      const struct StoreRecordMessage *msg)
752 {
753   struct GNUNET_PEERSTORE_Handle *h = cls;
754   struct GNUNET_PEERSTORE_Record *record;
755   struct GNUNET_HashCode keyhash;
756   struct GNUNET_PEERSTORE_WatchContext *wc;
757
758   LOG (GNUNET_ERROR_TYPE_DEBUG,
759        "Received a watch record from service.\n");
760   record = PEERSTORE_parse_record_message (msg);
761   if (NULL == record)
762   {
763     reconnect (h);
764     return;
765   }
766   PEERSTORE_hash_key (record->sub_system,
767                       &record->peer,
768                       record->key,
769                       &keyhash);
770   // FIXME: what if there are multiple watches for the same key?
771   wc = GNUNET_CONTAINER_multihashmap_get (h->watches,
772                                           &keyhash);
773   if (NULL == wc)
774   {
775     LOG (GNUNET_ERROR_TYPE_ERROR,
776          _("Received a watch result for a non existing watch.\n"));
777     PEERSTORE_destroy_record (record);
778     reconnect (h);
779     return;
780   }
781   if (NULL != wc->callback)
782     wc->callback (wc->callback_cls,
783                   record,
784                   NULL);
785   PEERSTORE_destroy_record (record);
786 }
787
788
789 /**
790  * Close the existing connection to PEERSTORE and reconnect.
791  *
792  * @param h handle to the service
793  */
794 static void
795 reconnect (struct GNUNET_PEERSTORE_Handle *h)
796 {
797   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
798     GNUNET_MQ_hd_fixed_size (iterate_end,
799                              GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
800                              struct GNUNET_MessageHeader,
801                              h),
802     GNUNET_MQ_hd_var_size (iterate_result,
803                            GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
804                            struct StoreRecordMessage,
805                            h),
806     GNUNET_MQ_hd_var_size (watch_record,
807                            GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
808                            struct StoreRecordMessage,
809                            h),
810     GNUNET_MQ_handler_end ()
811   };
812   struct GNUNET_PEERSTORE_IterateContext *ic;
813   struct GNUNET_PEERSTORE_IterateContext *next;
814   GNUNET_PEERSTORE_Processor icb;
815   void *icb_cls;
816   struct GNUNET_PEERSTORE_StoreContext *sc;
817   struct GNUNET_MQ_Envelope *ev;
818
819   LOG (GNUNET_ERROR_TYPE_DEBUG,
820        "Reconnecting...\n");
821   for (ic = h->iterate_head; NULL != ic; ic = next)
822   {
823     next = ic->next;
824     if (GNUNET_YES == ic->iterating)
825     {
826       icb = ic->callback;
827       icb_cls = ic->callback_cls;
828       GNUNET_PEERSTORE_iterate_cancel (ic);
829       if (NULL != icb)
830         icb (icb_cls,
831              NULL,
832              "Iteration canceled due to reconnection");
833     }
834   }
835   if (NULL != h->mq)
836   {
837     GNUNET_MQ_destroy (h->mq);
838     h->mq = NULL;
839   }
840   h->mq = GNUNET_CLIENT_connect (h->cfg,
841                                  "peerstore",
842                                  mq_handlers,
843                                  &handle_client_error,
844                                  h);
845   if (NULL == h->mq)
846     return;
847   LOG (GNUNET_ERROR_TYPE_DEBUG,
848        "Resending pending requests after reconnect.\n");
849   if (NULL != h->watches)
850     GNUNET_CONTAINER_multihashmap_iterate (h->watches,
851                                            &rewatch_it,
852                                            h);
853   for (ic = h->iterate_head; NULL != ic; ic = ic->next)
854   {
855     ev = PEERSTORE_create_record_mq_envelope (ic->sub_system,
856                                               &ic->peer,
857                                               ic->key,
858                                               NULL, 0,
859                                               GNUNET_TIME_UNIT_FOREVER_ABS,
860                                               0,
861                                               GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
862     GNUNET_MQ_send (h->mq, ev);
863     if (NULL != ic->timeout_task)
864       GNUNET_SCHEDULER_cancel (ic->timeout_task);
865     ic->timeout_task
866       = GNUNET_SCHEDULER_add_delayed (ic->timeout,
867                                       &iterate_timeout,
868                                       ic);
869   }
870   for (sc = h->store_head; NULL != sc; sc = sc->next)
871   {
872     ev = PEERSTORE_create_record_mq_envelope (sc->sub_system,
873                                               &sc->peer,
874                                               sc->key,
875                                               sc->value,
876                                               sc->size,
877                                               sc->expiry,
878                                               sc->options,
879                                               GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
880     GNUNET_MQ_notify_sent (ev,
881                            &store_request_sent,
882                            sc);
883     GNUNET_MQ_send (h->mq,
884                     ev);
885   }
886 }
887
888
889 /**
890  * Cancel a watch request
891  *
892  * @param wc handle to the watch request
893  */
894 void
895 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
896 {
897   struct GNUNET_PEERSTORE_Handle *h = wc->h;
898   struct GNUNET_MQ_Envelope *ev;
899   struct StoreKeyHashMessage *hm;
900
901   LOG (GNUNET_ERROR_TYPE_DEBUG,
902        "Canceling watch.\n");
903   ev = GNUNET_MQ_msg (hm,
904                       GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
905   hm->keyhash = wc->keyhash;
906   GNUNET_MQ_send (h->mq, ev);
907   GNUNET_CONTAINER_multihashmap_remove (h->watches,
908                                         &wc->keyhash,
909                                         wc);
910   GNUNET_free (wc);
911 }
912
913
914 /**
915  * Request watching a given key
916  * User will be notified with any new values added to key
917  *
918  * @param h handle to the PEERSTORE service
919  * @param sub_system name of sub system
920  * @param peer Peer identity
921  * @param key entry key string
922  * @param callback function called with each new value
923  * @param callback_cls closure for @a callback
924  * @return Handle to watch request
925  */
926 struct GNUNET_PEERSTORE_WatchContext *
927 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
928                         const char *sub_system,
929                         const struct GNUNET_PeerIdentity *peer,
930                         const char *key,
931                         GNUNET_PEERSTORE_Processor callback,
932                         void *callback_cls)
933 {
934   struct GNUNET_MQ_Envelope *ev;
935   struct StoreKeyHashMessage *hm;
936   struct GNUNET_PEERSTORE_WatchContext *wc;
937
938   ev = GNUNET_MQ_msg (hm,
939                       GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
940   PEERSTORE_hash_key (sub_system,
941                       peer,
942                       key,
943                       &hm->keyhash);
944   wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
945   wc->callback = callback;
946   wc->callback_cls = callback_cls;
947   wc->h = h;
948   wc->keyhash = hm->keyhash;
949   if (NULL == h->watches)
950     h->watches = GNUNET_CONTAINER_multihashmap_create (5,
951                                                        GNUNET_NO);
952   GNUNET_assert (GNUNET_OK ==
953                  GNUNET_CONTAINER_multihashmap_put (h->watches,
954                                                     &wc->keyhash,
955                                                     wc,
956                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
957   LOG (GNUNET_ERROR_TYPE_DEBUG,
958        "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
959        sub_system,
960        GNUNET_i2s (peer),
961        key);
962   GNUNET_MQ_send (h->mq,
963                   ev);
964   return wc;
965 }
966
967 /* end of peerstore_api.c */