first batch of license fixes (boring)
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14 */
15 /**
16  * @file peerstore/peerstore_api.c
17  * @brief API for peerstore
18  * @author Omar Tarabai
19  * @author Christian Grothoff
20  */
21 #include "platform.h"
22 #include "gnunet_util_lib.h"
23 #include "peerstore.h"
24 #include "peerstore_common.h"
25
26 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
27
28 /******************************************************************************/
29 /************************      DATA STRUCTURES     ****************************/
30 /******************************************************************************/
31
32 /**
33  * Handle to the PEERSTORE service.
34  */
35 struct GNUNET_PEERSTORE_Handle
36 {
37
38   /**
39    * Our configuration.
40    */
41   const struct GNUNET_CONFIGURATION_Handle *cfg;
42
43   /**
44    * Message queue
45    */
46   struct GNUNET_MQ_Handle *mq;
47
48   /**
49    * Head of active STORE requests.
50    */
51   struct GNUNET_PEERSTORE_StoreContext *store_head;
52
53   /**
54    * Tail of active STORE requests.
55    */
56   struct GNUNET_PEERSTORE_StoreContext *store_tail;
57
58   /**
59    * Head of active ITERATE requests.
60    */
61   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
62
63   /**
64    * Tail of active ITERATE requests.
65    */
66   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
67
68   /**
69    * Hashmap of watch requests
70    */
71   struct GNUNET_CONTAINER_MultiHashMap *watches;
72
73   /**
74    * Are we in the process of disconnecting but need to sync first?
75    */
76   int disconnecting;
77
78 };
79
80 /**
81  * Context for a store request
82  */
83 struct GNUNET_PEERSTORE_StoreContext
84 {
85   /**
86    * Kept in a DLL.
87    */
88   struct GNUNET_PEERSTORE_StoreContext *next;
89
90   /**
91    * Kept in a DLL.
92    */
93   struct GNUNET_PEERSTORE_StoreContext *prev;
94
95   /**
96    * Handle to the PEERSTORE service.
97    */
98   struct GNUNET_PEERSTORE_Handle *h;
99
100   /**
101    * Continuation called with service response
102    */
103   GNUNET_PEERSTORE_Continuation cont;
104
105   /**
106    * Closure for @e cont
107    */
108   void *cont_cls;
109
110   /**
111    * Which subsystem does the store?
112    */
113   char *sub_system;
114
115   /**
116    * Key for the store operation.
117    */
118   char *key;
119
120   /**
121    * Contains @e size bytes.
122    */
123   void *value;
124
125   /**
126    * Peer the store is for.
127    */
128   struct GNUNET_PeerIdentity peer;
129
130   /**
131    * Number of bytes in @e value.
132    */
133   size_t size;
134
135   /**
136    * When does the value expire?
137    */
138   struct GNUNET_TIME_Absolute expiry;
139
140   /**
141    * Options for the store operation.
142    */
143   enum GNUNET_PEERSTORE_StoreOption options;
144
145 };
146
147 /**
148  * Context for a iterate request
149  */
150 struct GNUNET_PEERSTORE_IterateContext
151 {
152   /**
153    * Kept in a DLL.
154    */
155   struct GNUNET_PEERSTORE_IterateContext *next;
156
157   /**
158    * Kept in a DLL.
159    */
160   struct GNUNET_PEERSTORE_IterateContext *prev;
161
162   /**
163    * Handle to the PEERSTORE service.
164    */
165   struct GNUNET_PEERSTORE_Handle *h;
166
167   /**
168    * Which subsystem does the store?
169    */
170   char *sub_system;
171
172   /**
173    * Peer the store is for.
174    */
175   struct GNUNET_PeerIdentity peer;
176
177   /**
178    * Key for the store operation.
179    */
180   char *key;
181
182   /**
183    * Operation timeout
184    */
185   struct GNUNET_TIME_Relative timeout;
186
187   /**
188    * Callback with each matching record
189    */
190   GNUNET_PEERSTORE_Processor callback;
191
192   /**
193    * Closure for @e callback
194    */
195   void *callback_cls;
196
197   /**
198    * #GNUNET_YES if we are currently processing records.
199    */
200   int iterating;
201
202   /**
203    * Task identifier for the function called
204    * on iterate request timeout
205    */
206   struct GNUNET_SCHEDULER_Task *timeout_task;
207
208 };
209
210 /**
211  * Context for a watch request
212  */
213 struct GNUNET_PEERSTORE_WatchContext
214 {
215   /**
216    * Kept in a DLL.
217    */
218   struct GNUNET_PEERSTORE_WatchContext *next;
219
220   /**
221    * Kept in a DLL.
222    */
223   struct GNUNET_PEERSTORE_WatchContext *prev;
224
225   /**
226    * Handle to the PEERSTORE service.
227    */
228   struct GNUNET_PEERSTORE_Handle *h;
229
230   /**
231    * Callback with each record received
232    */
233   GNUNET_PEERSTORE_Processor callback;
234
235   /**
236    * Closure for @e callback
237    */
238   void *callback_cls;
239
240   /**
241    * Hash of the combined key
242    */
243   struct GNUNET_HashCode keyhash;
244
245 };
246
247 /******************************************************************************/
248 /*******************             DECLARATIONS             *********************/
249 /******************************************************************************/
250
251 /**
252  * Close the existing connection to PEERSTORE and reconnect.
253  *
254  * @param h handle to the service
255  */
256 static void
257 reconnect (struct GNUNET_PEERSTORE_Handle *h);
258
259
260 /**
261  * Callback after MQ envelope is sent
262  *
263  * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
264  */
265 static void
266 store_request_sent (void *cls)
267 {
268   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
269   GNUNET_PEERSTORE_Continuation cont;
270   void *cont_cls;
271
272   cont = sc->cont;
273   cont_cls = sc->cont_cls;
274   GNUNET_PEERSTORE_store_cancel (sc);
275   if (NULL != cont)
276     cont (cont_cls, GNUNET_OK);
277 }
278
279
280 /******************************************************************************/
281 /*******************         CONNECTION FUNCTIONS         *********************/
282 /******************************************************************************/
283
284
285 /**
286  * Function called when we had trouble talking to the service.
287  */
288 static void
289 handle_client_error (void *cls,
290                      enum GNUNET_MQ_Error error)
291 {
292   struct GNUNET_PEERSTORE_Handle *h = cls;
293
294   LOG (GNUNET_ERROR_TYPE_ERROR,
295        "Received an error notification from MQ of type: %d\n",
296        error);
297   reconnect (h);
298 }
299
300
301 /**
302  * Iterator over previous watches to resend them
303  *
304  * @param cls the `struct GNUNET_PEERSTORE_Handle`
305  * @param key key for the watch
306  * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
307  * @return #GNUNET_YES (continue to iterate)
308  */
309 static int
310 rewatch_it (void *cls,
311             const struct GNUNET_HashCode *key,
312             void *value)
313 {
314   struct GNUNET_PEERSTORE_Handle *h = cls;
315   struct GNUNET_PEERSTORE_WatchContext *wc = value;
316   struct StoreKeyHashMessage *hm;
317   struct GNUNET_MQ_Envelope *ev;
318
319   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
320   hm->keyhash = wc->keyhash;
321   GNUNET_MQ_send (h->mq, ev);
322   return GNUNET_YES;
323 }
324
325
326 /**
327  * Called when the iterate request is timedout
328  *
329  * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
330  */
331 static void
332 iterate_timeout (void *cls)
333 {
334   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
335   GNUNET_PEERSTORE_Processor callback;
336   void *callback_cls;
337
338   ic->timeout_task = NULL;
339   callback = ic->callback;
340   callback_cls = ic->callback_cls;
341   GNUNET_PEERSTORE_iterate_cancel (ic);
342   if (NULL != callback)
343     callback (callback_cls,
344               NULL,
345               _("timeout"));
346 }
347
348
349 /**
350  * Iterator over watch requests to cancel them.
351  *
352  * @param cls unsused
353  * @param key key to the watch request
354  * @param value watch context
355  * @return #GNUNET_YES to continue iteration
356  */
357 static int
358 destroy_watch (void *cls,
359                const struct GNUNET_HashCode *key,
360                void *value)
361 {
362   struct GNUNET_PEERSTORE_WatchContext *wc = value;
363
364   GNUNET_PEERSTORE_watch_cancel (wc);
365   return GNUNET_YES;
366 }
367
368
369 /**
370  * Kill the connection to the service. This can be delayed in case of pending
371  * STORE requests and the user explicitly asked to sync first. Otherwise it is
372  * performed instantly.
373  *
374  * @param h Handle to the service.
375  */
376 static void
377 do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
378 {
379   if (NULL != h->mq)
380   {
381     GNUNET_MQ_destroy (h->mq);
382     h->mq = NULL;
383   }
384   GNUNET_free (h);
385 }
386
387
388 /**
389  * Connect to the PEERSTORE service.
390  *
391  * @param cfg configuration to use
392  * @return NULL on error
393  */
394 struct GNUNET_PEERSTORE_Handle *
395 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
396 {
397   struct GNUNET_PEERSTORE_Handle *h;
398
399   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
400   h->cfg = cfg;
401   h->disconnecting = GNUNET_NO;
402   reconnect (h);
403   if (NULL == h->mq)
404   {
405     GNUNET_free (h);
406     return NULL;
407   }
408   return h;
409 }
410
411
412 /**
413  * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
414  * will be canceled.
415  * Any pending STORE requests will depend on @e snyc_first flag.
416  *
417  * @param h handle to disconnect
418  * @param sync_first send any pending STORE requests before disconnecting
419  */
420 void
421 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h,
422                              int sync_first)
423 {
424   struct GNUNET_PEERSTORE_IterateContext *ic;
425   struct GNUNET_PEERSTORE_StoreContext *sc;
426
427   LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
428   if (NULL != h->watches)
429   {
430     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
431     GNUNET_CONTAINER_multihashmap_destroy (h->watches);
432     h->watches = NULL;
433   }
434   while (NULL != (ic = h->iterate_head))
435   {
436     GNUNET_break (0);
437     GNUNET_PEERSTORE_iterate_cancel (ic);
438   }
439   if (NULL != h->store_head)
440   {
441     if (GNUNET_YES == sync_first)
442     {
443       LOG (GNUNET_ERROR_TYPE_DEBUG,
444            "Delaying disconnection due to pending store requests.\n");
445       h->disconnecting = GNUNET_YES;
446       return;
447     }
448     while (NULL != (sc = h->store_head))
449       GNUNET_PEERSTORE_store_cancel (sc);
450   }
451   do_disconnect (h);
452 }
453
454
455 /******************************************************************************/
456 /*******************            STORE FUNCTIONS           *********************/
457 /******************************************************************************/
458
459
460 /**
461  * Cancel a store request
462  *
463  * @param sc Store request context
464  */
465 void
466 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
467 {
468   struct GNUNET_PEERSTORE_Handle *h = sc->h;
469
470   GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
471   GNUNET_free (sc->sub_system);
472   GNUNET_free (sc->value);
473   GNUNET_free (sc->key);
474   GNUNET_free (sc);
475   if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
476     do_disconnect (h);
477 }
478
479
480 /**
481  * Store a new entry in the PEERSTORE.
482  * Note that stored entries can be lost in some cases
483  * such as power failure.
484  *
485  * @param h Handle to the PEERSTORE service
486  * @param sub_system name of the sub system
487  * @param peer Peer Identity
488  * @param key entry key
489  * @param value entry value BLOB
490  * @param size size of @e value
491  * @param expiry absolute time after which the entry is (possibly) deleted
492  * @param options options specific to the storage operation
493  * @param cont Continuation function after the store request is sent
494  * @param cont_cls Closure for @a cont
495  */
496 struct GNUNET_PEERSTORE_StoreContext *
497 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
498                         const char *sub_system,
499                         const struct GNUNET_PeerIdentity *peer,
500                         const char *key,
501                         const void *value, size_t size,
502                         struct GNUNET_TIME_Absolute expiry,
503                         enum GNUNET_PEERSTORE_StoreOption options,
504                         GNUNET_PEERSTORE_Continuation cont,
505                         void *cont_cls)
506 {
507   struct GNUNET_MQ_Envelope *ev;
508   struct GNUNET_PEERSTORE_StoreContext *sc;
509
510   LOG (GNUNET_ERROR_TYPE_DEBUG,
511        "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
512        size, sub_system, GNUNET_i2s (peer), key);
513   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size,
514                                             expiry, options,
515                                             GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
516   sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
517
518   sc->sub_system = GNUNET_strdup (sub_system);
519   sc->peer = *peer;
520   sc->key = GNUNET_strdup (key);
521   sc->value = GNUNET_memdup (value, size);
522   sc->size = size;
523   sc->expiry = expiry;
524   sc->options = options;
525   sc->cont = cont;
526   sc->cont_cls = cont_cls;
527   sc->h = h;
528
529   GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
530   GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
531   GNUNET_MQ_send (h->mq, ev);
532   return sc;
533
534 }
535
536
537 /******************************************************************************/
538 /*******************           ITERATE FUNCTIONS          *********************/
539 /******************************************************************************/
540
541
542 /**
543  * When a response for iterate request is received
544  *
545  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
546  * @param msg message received
547  */
548 static void
549 handle_iterate_end (void *cls,
550                     const struct GNUNET_MessageHeader *msg)
551 {
552   struct GNUNET_PEERSTORE_Handle *h = cls;
553   struct GNUNET_PEERSTORE_IterateContext *ic;
554   GNUNET_PEERSTORE_Processor callback;
555   void *callback_cls;
556
557   ic = h->iterate_head;
558   if (NULL == ic)
559   {
560     LOG (GNUNET_ERROR_TYPE_ERROR,
561          _("Unexpected iteration response, this should not happen.\n"));
562     reconnect (h);
563     return;
564   }
565   callback = ic->callback;
566   callback_cls = ic->callback_cls;
567   ic->iterating = GNUNET_NO;
568   GNUNET_PEERSTORE_iterate_cancel (ic);
569   if (NULL != callback)
570     callback (callback_cls, NULL, NULL);
571 }
572
573
574 /**
575  * When a response for iterate request is received, check the
576  * message is well-formed.
577  *
578  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
579  * @param msg message received
580  */
581 static int
582 check_iterate_result (void *cls,
583                       const struct StoreRecordMessage *msg)
584 {
585   /* we defer validation to #handle_iterate_result */
586   return GNUNET_OK;
587 }
588
589
590 /**
591  * When a response for iterate request is received
592  *
593  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
594  * @param msg message received
595  */
596 static void
597 handle_iterate_result (void *cls,
598                        const struct StoreRecordMessage *msg)
599 {
600   struct GNUNET_PEERSTORE_Handle *h = cls;
601   struct GNUNET_PEERSTORE_IterateContext *ic;
602   GNUNET_PEERSTORE_Processor callback;
603   void *callback_cls;
604   struct GNUNET_PEERSTORE_Record *record;
605
606   ic = h->iterate_head;
607   if (NULL == ic)
608   {
609     LOG (GNUNET_ERROR_TYPE_ERROR,
610          _("Unexpected iteration response, this should not happen.\n"));
611     reconnect (h);
612     return;
613   }
614   ic->iterating = GNUNET_YES;
615   callback = ic->callback;
616   callback_cls = ic->callback_cls;
617   if (NULL == callback)
618     return;
619   record = PEERSTORE_parse_record_message (msg);
620   if (NULL == record)
621   {
622     callback (callback_cls,
623               NULL,
624               _("Received a malformed response from service."));
625   }
626   else
627   {
628     callback (callback_cls,
629               record,
630               NULL);
631     PEERSTORE_destroy_record (record);
632   }
633 }
634
635
636 /**
637  * Cancel an iterate request
638  * Please do not call after the iterate request is done
639  *
640  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
641  */
642 void
643 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
644 {
645   if (NULL != ic->timeout_task)
646   {
647     GNUNET_SCHEDULER_cancel (ic->timeout_task);
648     ic->timeout_task = NULL;
649   }
650   if (GNUNET_NO == ic->iterating)
651   {
652     GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head,
653                                  ic->h->iterate_tail,
654                                  ic);
655     GNUNET_free (ic->sub_system);
656     GNUNET_free_non_null (ic->key);
657     GNUNET_free (ic);
658   }
659   else
660     ic->callback = NULL;
661 }
662
663
664 /**
665  * Iterate over records matching supplied key information
666  *
667  * @param h handle to the PEERSTORE service
668  * @param sub_system name of sub system
669  * @param peer Peer identity (can be NULL)
670  * @param key entry key string (can be NULL)
671  * @param timeout time after which the iterate request is canceled
672  * @param callback function called with each matching record, all NULL's on end
673  * @param callback_cls closure for @a callback
674  * @return Handle to iteration request
675  */
676 struct GNUNET_PEERSTORE_IterateContext *
677 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
678                           const char *sub_system,
679                           const struct GNUNET_PeerIdentity *peer,
680                           const char *key,
681                           struct GNUNET_TIME_Relative timeout,
682                           GNUNET_PEERSTORE_Processor callback,
683                           void *callback_cls)
684 {
685   struct GNUNET_MQ_Envelope *ev;
686   struct GNUNET_PEERSTORE_IterateContext *ic;
687
688   ev = PEERSTORE_create_record_mq_envelope (sub_system,
689                                             peer,
690                                             key,
691                                             NULL, 0,
692                                             GNUNET_TIME_UNIT_FOREVER_ABS,
693                                             0,
694                                             GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
695   ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
696
697   ic->callback = callback;
698   ic->callback_cls = callback_cls;
699   ic->h = h;
700   ic->sub_system = GNUNET_strdup (sub_system);
701   if (NULL != peer)
702     ic->peer = *peer;
703   if (NULL != key)
704     ic->key = GNUNET_strdup (key);
705   ic->timeout = timeout;
706   GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head,
707                                     h->iterate_tail,
708                                     ic);
709   LOG (GNUNET_ERROR_TYPE_DEBUG,
710        "Sending an iterate request for sub system `%s'\n",
711        sub_system);
712   GNUNET_MQ_send (h->mq, ev);
713   ic->timeout_task =
714       GNUNET_SCHEDULER_add_delayed (timeout,
715                                     &iterate_timeout,
716                                     ic);
717   return ic;
718 }
719
720
721 /******************************************************************************/
722 /*******************            WATCH FUNCTIONS           *********************/
723 /******************************************************************************/
724
725 /**
726  * When a watch record is received, validate it is well-formed.
727  *
728  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
729  * @param msg message received
730  */
731 static int
732 check_watch_record (void *cls,
733                     const struct StoreRecordMessage *msg)
734 {
735   /* we defer validation to #handle_watch_result */
736   return GNUNET_OK;
737 }
738
739
740 /**
741  * When a watch record is received, process it.
742  *
743  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
744  * @param msg message received
745  */
746 static void
747 handle_watch_record (void *cls,
748                      const struct StoreRecordMessage *msg)
749 {
750   struct GNUNET_PEERSTORE_Handle *h = cls;
751   struct GNUNET_PEERSTORE_Record *record;
752   struct GNUNET_HashCode keyhash;
753   struct GNUNET_PEERSTORE_WatchContext *wc;
754
755   LOG (GNUNET_ERROR_TYPE_DEBUG,
756        "Received a watch record from service.\n");
757   record = PEERSTORE_parse_record_message (msg);
758   if (NULL == record)
759   {
760     reconnect (h);
761     return;
762   }
763   PEERSTORE_hash_key (record->sub_system,
764                       &record->peer,
765                       record->key,
766                       &keyhash);
767   // FIXME: what if there are multiple watches for the same key?
768   wc = GNUNET_CONTAINER_multihashmap_get (h->watches,
769                                           &keyhash);
770   if (NULL == wc)
771   {
772     LOG (GNUNET_ERROR_TYPE_ERROR,
773          _("Received a watch result for a non existing watch.\n"));
774     PEERSTORE_destroy_record (record);
775     reconnect (h);
776     return;
777   }
778   if (NULL != wc->callback)
779     wc->callback (wc->callback_cls,
780                   record,
781                   NULL);
782   PEERSTORE_destroy_record (record);
783 }
784
785
786 /**
787  * Close the existing connection to PEERSTORE and reconnect.
788  *
789  * @param h handle to the service
790  */
791 static void
792 reconnect (struct GNUNET_PEERSTORE_Handle *h)
793 {
794   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
795     GNUNET_MQ_hd_fixed_size (iterate_end,
796                              GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
797                              struct GNUNET_MessageHeader,
798                              h),
799     GNUNET_MQ_hd_var_size (iterate_result,
800                            GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
801                            struct StoreRecordMessage,
802                            h),
803     GNUNET_MQ_hd_var_size (watch_record,
804                            GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
805                            struct StoreRecordMessage,
806                            h),
807     GNUNET_MQ_handler_end ()
808   };
809   struct GNUNET_PEERSTORE_IterateContext *ic;
810   struct GNUNET_PEERSTORE_IterateContext *next;
811   GNUNET_PEERSTORE_Processor icb;
812   void *icb_cls;
813   struct GNUNET_PEERSTORE_StoreContext *sc;
814   struct GNUNET_MQ_Envelope *ev;
815
816   LOG (GNUNET_ERROR_TYPE_DEBUG,
817        "Reconnecting...\n");
818   for (ic = h->iterate_head; NULL != ic; ic = next)
819   {
820     next = ic->next;
821     if (GNUNET_YES == ic->iterating)
822     {
823       icb = ic->callback;
824       icb_cls = ic->callback_cls;
825       GNUNET_PEERSTORE_iterate_cancel (ic);
826       if (NULL != icb)
827         icb (icb_cls,
828              NULL,
829              "Iteration canceled due to reconnection");
830     }
831   }
832   if (NULL != h->mq)
833   {
834     GNUNET_MQ_destroy (h->mq);
835     h->mq = NULL;
836   }
837   h->mq = GNUNET_CLIENT_connect (h->cfg,
838                                  "peerstore",
839                                  mq_handlers,
840                                  &handle_client_error,
841                                  h);
842   if (NULL == h->mq)
843     return;
844   LOG (GNUNET_ERROR_TYPE_DEBUG,
845        "Resending pending requests after reconnect.\n");
846   if (NULL != h->watches)
847     GNUNET_CONTAINER_multihashmap_iterate (h->watches,
848                                            &rewatch_it,
849                                            h);
850   for (ic = h->iterate_head; NULL != ic; ic = ic->next)
851   {
852     ev = PEERSTORE_create_record_mq_envelope (ic->sub_system,
853                                               &ic->peer,
854                                               ic->key,
855                                               NULL, 0,
856                                               GNUNET_TIME_UNIT_FOREVER_ABS,
857                                               0,
858                                               GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
859     GNUNET_MQ_send (h->mq, ev);
860     if (NULL != ic->timeout_task)
861       GNUNET_SCHEDULER_cancel (ic->timeout_task);
862     ic->timeout_task
863       = GNUNET_SCHEDULER_add_delayed (ic->timeout,
864                                       &iterate_timeout,
865                                       ic);
866   }
867   for (sc = h->store_head; NULL != sc; sc = sc->next)
868   {
869     ev = PEERSTORE_create_record_mq_envelope (sc->sub_system,
870                                               &sc->peer,
871                                               sc->key,
872                                               sc->value,
873                                               sc->size,
874                                               sc->expiry,
875                                               sc->options,
876                                               GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
877     GNUNET_MQ_notify_sent (ev,
878                            &store_request_sent,
879                            sc);
880     GNUNET_MQ_send (h->mq,
881                     ev);
882   }
883 }
884
885
886 /**
887  * Cancel a watch request
888  *
889  * @param wc handle to the watch request
890  */
891 void
892 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
893 {
894   struct GNUNET_PEERSTORE_Handle *h = wc->h;
895   struct GNUNET_MQ_Envelope *ev;
896   struct StoreKeyHashMessage *hm;
897
898   LOG (GNUNET_ERROR_TYPE_DEBUG,
899        "Canceling watch.\n");
900   ev = GNUNET_MQ_msg (hm,
901                       GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
902   hm->keyhash = wc->keyhash;
903   GNUNET_MQ_send (h->mq, ev);
904   GNUNET_CONTAINER_multihashmap_remove (h->watches,
905                                         &wc->keyhash,
906                                         wc);
907   GNUNET_free (wc);
908 }
909
910
911 /**
912  * Request watching a given key
913  * User will be notified with any new values added to key
914  *
915  * @param h handle to the PEERSTORE service
916  * @param sub_system name of sub system
917  * @param peer Peer identity
918  * @param key entry key string
919  * @param callback function called with each new value
920  * @param callback_cls closure for @a callback
921  * @return Handle to watch request
922  */
923 struct GNUNET_PEERSTORE_WatchContext *
924 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
925                         const char *sub_system,
926                         const struct GNUNET_PeerIdentity *peer,
927                         const char *key,
928                         GNUNET_PEERSTORE_Processor callback,
929                         void *callback_cls)
930 {
931   struct GNUNET_MQ_Envelope *ev;
932   struct StoreKeyHashMessage *hm;
933   struct GNUNET_PEERSTORE_WatchContext *wc;
934
935   ev = GNUNET_MQ_msg (hm,
936                       GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
937   PEERSTORE_hash_key (sub_system,
938                       peer,
939                       key,
940                       &hm->keyhash);
941   wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
942   wc->callback = callback;
943   wc->callback_cls = callback_cls;
944   wc->h = h;
945   wc->keyhash = hm->keyhash;
946   if (NULL == h->watches)
947     h->watches = GNUNET_CONTAINER_multihashmap_create (5,
948                                                        GNUNET_NO);
949   GNUNET_assert (GNUNET_OK ==
950                  GNUNET_CONTAINER_multihashmap_put (h->watches,
951                                                     &wc->keyhash,
952                                                     wc,
953                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
954   LOG (GNUNET_ERROR_TYPE_DEBUG,
955        "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
956        sub_system,
957        GNUNET_i2s (peer),
958        key);
959   GNUNET_MQ_send (h->mq,
960                   ev);
961   return wc;
962 }
963
964 /* end of peerstore_api.c */