use NULL value in load_path_suffix to NOT load any files
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2016, 2019 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file peerstore/peerstore_api.c
22  * @brief API for peerstore
23  * @author Omar Tarabai
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
30
31 #define LOG(kind, ...) GNUNET_log_from (kind, "peerstore-api", __VA_ARGS__)
32
33 /******************************************************************************/
34 /************************      DATA STRUCTURES     ****************************/
35 /******************************************************************************/
36
37 /**
38  * Handle to the PEERSTORE service.
39  */
40 struct GNUNET_PEERSTORE_Handle
41 {
42   /**
43    * Our configuration.
44    */
45   const struct GNUNET_CONFIGURATION_Handle *cfg;
46
47   /**
48    * Message queue
49    */
50   struct GNUNET_MQ_Handle *mq;
51
52   /**
53    * Head of active STORE requests.
54    */
55   struct GNUNET_PEERSTORE_StoreContext *store_head;
56
57   /**
58    * Tail of active STORE requests.
59    */
60   struct GNUNET_PEERSTORE_StoreContext *store_tail;
61
62   /**
63    * Head of active ITERATE requests.
64    */
65   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
66
67   /**
68    * Tail of active ITERATE requests.
69    */
70   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
71
72   /**
73    * Hashmap of watch requests
74    */
75   struct GNUNET_CONTAINER_MultiHashMap *watches;
76
77   /**
78    * ID of the task trying to reconnect to the service.
79    */
80   struct GNUNET_SCHEDULER_Task *reconnect_task;
81
82   /**
83    * Delay until we try to reconnect.
84    */
85   struct GNUNET_TIME_Relative reconnect_delay;
86
87   /**
88    * Are we in the process of disconnecting but need to sync first?
89    */
90   int disconnecting;
91 };
92
93 /**
94  * Context for a store request
95  */
96 struct GNUNET_PEERSTORE_StoreContext
97 {
98   /**
99    * Kept in a DLL.
100    */
101   struct GNUNET_PEERSTORE_StoreContext *next;
102
103   /**
104    * Kept in a DLL.
105    */
106   struct GNUNET_PEERSTORE_StoreContext *prev;
107
108   /**
109    * Handle to the PEERSTORE service.
110    */
111   struct GNUNET_PEERSTORE_Handle *h;
112
113   /**
114    * Continuation called with service response
115    */
116   GNUNET_PEERSTORE_Continuation cont;
117
118   /**
119    * Closure for @e cont
120    */
121   void *cont_cls;
122
123   /**
124    * Which subsystem does the store?
125    */
126   char *sub_system;
127
128   /**
129    * Key for the store operation.
130    */
131   char *key;
132
133   /**
134    * Contains @e size bytes.
135    */
136   void *value;
137
138   /**
139    * Peer the store is for.
140    */
141   struct GNUNET_PeerIdentity peer;
142
143   /**
144    * Number of bytes in @e value.
145    */
146   size_t size;
147
148   /**
149    * When does the value expire?
150    */
151   struct GNUNET_TIME_Absolute expiry;
152
153   /**
154    * Options for the store operation.
155    */
156   enum GNUNET_PEERSTORE_StoreOption options;
157 };
158
159 /**
160  * Context for a iterate request
161  */
162 struct GNUNET_PEERSTORE_IterateContext
163 {
164   /**
165    * Kept in a DLL.
166    */
167   struct GNUNET_PEERSTORE_IterateContext *next;
168
169   /**
170    * Kept in a DLL.
171    */
172   struct GNUNET_PEERSTORE_IterateContext *prev;
173
174   /**
175    * Handle to the PEERSTORE service.
176    */
177   struct GNUNET_PEERSTORE_Handle *h;
178
179   /**
180    * Which subsystem does the store?
181    */
182   char *sub_system;
183
184   /**
185    * Peer the store is for.
186    */
187   struct GNUNET_PeerIdentity peer;
188
189   /**
190    * Key for the store operation.
191    */
192   char *key;
193
194   /**
195    * Callback with each matching record
196    */
197   GNUNET_PEERSTORE_Processor callback;
198
199   /**
200    * Closure for @e callback
201    */
202   void *callback_cls;
203
204   /**
205    * #GNUNET_YES if we are currently processing records.
206    */
207   int iterating;
208 };
209
210 /**
211  * Context for a watch request
212  */
213 struct GNUNET_PEERSTORE_WatchContext
214 {
215   /**
216    * Kept in a DLL.
217    */
218   struct GNUNET_PEERSTORE_WatchContext *next;
219
220   /**
221    * Kept in a DLL.
222    */
223   struct GNUNET_PEERSTORE_WatchContext *prev;
224
225   /**
226    * Handle to the PEERSTORE service.
227    */
228   struct GNUNET_PEERSTORE_Handle *h;
229
230   /**
231    * Callback with each record received
232    */
233   GNUNET_PEERSTORE_Processor callback;
234
235   /**
236    * Closure for @e callback
237    */
238   void *callback_cls;
239
240   /**
241    * Hash of the combined key
242    */
243   struct GNUNET_HashCode keyhash;
244 };
245
246 /******************************************************************************/
247 /*******************             DECLARATIONS             *********************/
248 /******************************************************************************/
249
250 /**
251  * Close the existing connection to PEERSTORE and reconnect.
252  *
253  * @param cls a `struct GNUNET_PEERSTORE_Handle *h`
254  */
255 static void
256 reconnect (void *cls);
257
258
259 /**
260  * Disconnect from the peerstore service.
261  *
262  * @param h peerstore handle to disconnect
263  */
264 static void
265 disconnect (struct GNUNET_PEERSTORE_Handle *h)
266 {
267   struct GNUNET_PEERSTORE_IterateContext *next;
268
269   for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
270        ic = next)
271   {
272     next = ic->next;
273     if (GNUNET_YES == ic->iterating)
274     {
275       GNUNET_PEERSTORE_Processor icb;
276       void *icb_cls;
277
278       icb = ic->callback;
279       icb_cls = ic->callback_cls;
280       GNUNET_PEERSTORE_iterate_cancel (ic);
281       if (NULL != icb)
282         icb (icb_cls, NULL, "Iteration canceled due to reconnection");
283     }
284   }
285
286   if (NULL != h->mq)
287   {
288     GNUNET_MQ_destroy (h->mq);
289     h->mq = NULL;
290   }
291 }
292
293
294 /**
295  * Function that will schedule the job that will try
296  * to connect us again to the client.
297  *
298  * @param h peerstore to reconnect
299  */
300 static void
301 disconnect_and_schedule_reconnect (struct GNUNET_PEERSTORE_Handle *h)
302 {
303   GNUNET_assert (NULL == h->reconnect_task);
304   disconnect (h);
305   LOG (GNUNET_ERROR_TYPE_DEBUG,
306        "Scheduling task to reconnect to PEERSTORE service in %s.\n",
307        GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
308   h->reconnect_task =
309     GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
310   h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
311 }
312
313
314 /**
315  * Callback after MQ envelope is sent
316  *
317  * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
318  */
319 static void
320 store_request_sent (void *cls)
321 {
322   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
323   GNUNET_PEERSTORE_Continuation cont;
324   void *cont_cls;
325
326   cont = sc->cont;
327   cont_cls = sc->cont_cls;
328   GNUNET_PEERSTORE_store_cancel (sc);
329   if (NULL != cont)
330     cont (cont_cls, GNUNET_OK);
331 }
332
333
334 /******************************************************************************/
335 /*******************         CONNECTION FUNCTIONS         *********************/
336 /******************************************************************************/
337
338
339 /**
340  * Function called when we had trouble talking to the service.
341  */
342 static void
343 handle_client_error (void *cls, enum GNUNET_MQ_Error error)
344 {
345   struct GNUNET_PEERSTORE_Handle *h = cls;
346
347   LOG (GNUNET_ERROR_TYPE_ERROR,
348        "Received an error notification from MQ of type: %d\n",
349        error);
350   disconnect_and_schedule_reconnect (h);
351 }
352
353
354 /**
355  * Iterator over previous watches to resend them
356  *
357  * @param cls the `struct GNUNET_PEERSTORE_Handle`
358  * @param key key for the watch
359  * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
360  * @return #GNUNET_YES (continue to iterate)
361  */
362 static int
363 rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
364 {
365   struct GNUNET_PEERSTORE_Handle *h = cls;
366   struct GNUNET_PEERSTORE_WatchContext *wc = value;
367   struct StoreKeyHashMessage *hm;
368   struct GNUNET_MQ_Envelope *ev;
369
370   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
371   hm->keyhash = wc->keyhash;
372   GNUNET_MQ_send (h->mq, ev);
373   return GNUNET_YES;
374 }
375
376
377 /**
378  * Iterator over watch requests to cancel them.
379  *
380  * @param cls unsused
381  * @param key key to the watch request
382  * @param value watch context
383  * @return #GNUNET_YES to continue iteration
384  */
385 static int
386 destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
387 {
388   struct GNUNET_PEERSTORE_WatchContext *wc = value;
389
390   GNUNET_PEERSTORE_watch_cancel (wc);
391   return GNUNET_YES;
392 }
393
394
395 /**
396  * Kill the connection to the service. This can be delayed in case of pending
397  * STORE requests and the user explicitly asked to sync first. Otherwise it is
398  * performed instantly.
399  *
400  * @param h Handle to the service.
401  */
402 static void
403 final_disconnect (struct GNUNET_PEERSTORE_Handle *h)
404 {
405   if (NULL != h->mq)
406   {
407     GNUNET_MQ_destroy (h->mq);
408     h->mq = NULL;
409   }
410   GNUNET_free (h);
411 }
412
413
414 /**
415  * Connect to the PEERSTORE service.
416  *
417  * @param cfg configuration to use
418  * @return NULL on error
419  */
420 struct GNUNET_PEERSTORE_Handle *
421 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
422 {
423   struct GNUNET_PEERSTORE_Handle *h;
424
425   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
426   h->cfg = cfg;
427   h->disconnecting = GNUNET_NO;
428   reconnect (h);
429   if (NULL == h->mq)
430   {
431     GNUNET_free (h);
432     return NULL;
433   }
434   return h;
435 }
436
437
438 /**
439  * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
440  * will be canceled.
441  * Any pending STORE requests will depend on @e snyc_first flag.
442  *
443  * @param h handle to disconnect
444  * @param sync_first send any pending STORE requests before disconnecting
445  */
446 void
447 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first)
448 {
449   struct GNUNET_PEERSTORE_IterateContext *ic;
450   struct GNUNET_PEERSTORE_StoreContext *sc;
451
452   LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
453   if (NULL != h->watches)
454   {
455     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
456     GNUNET_CONTAINER_multihashmap_destroy (h->watches);
457     h->watches = NULL;
458   }
459   while (NULL != (ic = h->iterate_head))
460   {
461     GNUNET_break (0);
462     GNUNET_PEERSTORE_iterate_cancel (ic);
463   }
464   if (NULL != h->store_head)
465   {
466     if (GNUNET_YES == sync_first)
467     {
468       LOG (GNUNET_ERROR_TYPE_DEBUG,
469            "Delaying disconnection due to pending store requests.\n");
470       h->disconnecting = GNUNET_YES;
471       return;
472     }
473     while (NULL != (sc = h->store_head))
474       GNUNET_PEERSTORE_store_cancel (sc);
475   }
476   final_disconnect (h);
477 }
478
479
480 /******************************************************************************/
481 /*******************            STORE FUNCTIONS           *********************/
482 /******************************************************************************/
483
484
485 /**
486  * Cancel a store request
487  *
488  * @param sc Store request context
489  */
490 void
491 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
492 {
493   struct GNUNET_PEERSTORE_Handle *h = sc->h;
494
495   GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
496   GNUNET_free (sc->sub_system);
497   GNUNET_free (sc->value);
498   GNUNET_free (sc->key);
499   GNUNET_free (sc);
500   if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
501     final_disconnect (h);
502 }
503
504
505 /**
506  * Store a new entry in the PEERSTORE.
507  * Note that stored entries can be lost in some cases
508  * such as power failure.
509  *
510  * @param h Handle to the PEERSTORE service
511  * @param sub_system name of the sub system
512  * @param peer Peer Identity
513  * @param key entry key
514  * @param value entry value BLOB
515  * @param size size of @e value
516  * @param expiry absolute time after which the entry is (possibly) deleted
517  * @param options options specific to the storage operation
518  * @param cont Continuation function after the store request is sent
519  * @param cont_cls Closure for @a cont
520  */
521 struct GNUNET_PEERSTORE_StoreContext *
522 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
523                         const char *sub_system,
524                         const struct GNUNET_PeerIdentity *peer,
525                         const char *key,
526                         const void *value,
527                         size_t size,
528                         struct GNUNET_TIME_Absolute expiry,
529                         enum GNUNET_PEERSTORE_StoreOption options,
530                         GNUNET_PEERSTORE_Continuation cont,
531                         void *cont_cls)
532 {
533   struct GNUNET_MQ_Envelope *ev;
534   struct GNUNET_PEERSTORE_StoreContext *sc;
535
536   LOG (GNUNET_ERROR_TYPE_DEBUG,
537        "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
538        size,
539        sub_system,
540        GNUNET_i2s (peer),
541        key);
542   ev =
543     PEERSTORE_create_record_mq_envelope (sub_system,
544                                          peer,
545                                          key,
546                                          value,
547                                          size,
548                                          expiry,
549                                          options,
550                                          GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
551   sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
552
553   sc->sub_system = GNUNET_strdup (sub_system);
554   sc->peer = *peer;
555   sc->key = GNUNET_strdup (key);
556   sc->value = GNUNET_memdup (value, size);
557   sc->size = size;
558   sc->expiry = expiry;
559   sc->options = options;
560   sc->cont = cont;
561   sc->cont_cls = cont_cls;
562   sc->h = h;
563
564   GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
565   GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
566   GNUNET_MQ_send (h->mq, ev);
567   return sc;
568 }
569
570
571 /******************************************************************************/
572 /*******************           ITERATE FUNCTIONS          *********************/
573 /******************************************************************************/
574
575
576 /**
577  * When a response for iterate request is received
578  *
579  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
580  * @param msg message received
581  */
582 static void
583 handle_iterate_end (void *cls, const struct GNUNET_MessageHeader *msg)
584 {
585   struct GNUNET_PEERSTORE_Handle *h = cls;
586   struct GNUNET_PEERSTORE_IterateContext *ic;
587   GNUNET_PEERSTORE_Processor callback;
588   void *callback_cls;
589
590   ic = h->iterate_head;
591   if (NULL == ic)
592   {
593     LOG (GNUNET_ERROR_TYPE_ERROR,
594          _ ("Unexpected iteration response, this should not happen.\n"));
595     disconnect_and_schedule_reconnect (h);
596     return;
597   }
598   callback = ic->callback;
599   callback_cls = ic->callback_cls;
600   ic->iterating = GNUNET_NO;
601   GNUNET_PEERSTORE_iterate_cancel (ic);
602   if (NULL != callback)
603     callback (callback_cls, NULL, NULL);
604   h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
605 }
606
607
608 /**
609  * When a response for iterate request is received, check the
610  * message is well-formed.
611  *
612  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
613  * @param msg message received
614  */
615 static int
616 check_iterate_result (void *cls, const struct StoreRecordMessage *msg)
617 {
618   /* we defer validation to #handle_iterate_result */
619   return GNUNET_OK;
620 }
621
622
623 /**
624  * When a response for iterate request is received
625  *
626  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
627  * @param msg message received
628  */
629 static void
630 handle_iterate_result (void *cls, const struct StoreRecordMessage *msg)
631 {
632   struct GNUNET_PEERSTORE_Handle *h = cls;
633   struct GNUNET_PEERSTORE_IterateContext *ic;
634   GNUNET_PEERSTORE_Processor callback;
635   void *callback_cls;
636   struct GNUNET_PEERSTORE_Record *record;
637
638   ic = h->iterate_head;
639   if (NULL == ic)
640   {
641     LOG (GNUNET_ERROR_TYPE_ERROR,
642          _ ("Unexpected iteration response, this should not happen.\n"));
643     disconnect_and_schedule_reconnect (h);
644     return;
645   }
646   ic->iterating = GNUNET_YES;
647   callback = ic->callback;
648   callback_cls = ic->callback_cls;
649   if (NULL == callback)
650     return;
651   record = PEERSTORE_parse_record_message (msg);
652   if (NULL == record)
653   {
654     callback (callback_cls,
655               NULL,
656               _ ("Received a malformed response from service."));
657   }
658   else
659   {
660     callback (callback_cls, record, NULL);
661     PEERSTORE_destroy_record (record);
662   }
663 }
664
665
666 /**
667  * Cancel an iterate request
668  * Please do not call after the iterate request is done
669  *
670  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
671  */
672 void
673 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
674 {
675   if (GNUNET_NO == ic->iterating)
676   {
677     GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
678     GNUNET_free (ic->sub_system);
679     GNUNET_free_non_null (ic->key);
680     GNUNET_free (ic);
681   }
682   else
683     ic->callback = NULL;
684 }
685
686
687 /**
688  * Iterate over records matching supplied key information
689  *
690  * @param h handle to the PEERSTORE service
691  * @param sub_system name of sub system
692  * @param peer Peer identity (can be NULL)
693  * @param key entry key string (can be NULL)
694  * @param callback function called with each matching record, all NULL's on end
695  * @param callback_cls closure for @a callback
696  * @return Handle to iteration request
697  */
698 struct GNUNET_PEERSTORE_IterateContext *
699 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
700                           const char *sub_system,
701                           const struct GNUNET_PeerIdentity *peer,
702                           const char *key,
703                           GNUNET_PEERSTORE_Processor callback,
704                           void *callback_cls)
705 {
706   struct GNUNET_MQ_Envelope *ev;
707   struct GNUNET_PEERSTORE_IterateContext *ic;
708
709   ev =
710     PEERSTORE_create_record_mq_envelope (sub_system,
711                                          peer,
712                                          key,
713                                          NULL,
714                                          0,
715                                          GNUNET_TIME_UNIT_FOREVER_ABS,
716                                          0,
717                                          GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
718   ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
719   ic->callback = callback;
720   ic->callback_cls = callback_cls;
721   ic->h = h;
722   ic->sub_system = GNUNET_strdup (sub_system);
723   if (NULL != peer)
724     ic->peer = *peer;
725   if (NULL != key)
726     ic->key = GNUNET_strdup (key);
727   GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, h->iterate_tail, ic);
728   LOG (GNUNET_ERROR_TYPE_DEBUG,
729        "Sending an iterate request for sub system `%s'\n",
730        sub_system);
731   GNUNET_MQ_send (h->mq, ev);
732   return ic;
733 }
734
735
736 /******************************************************************************/
737 /*******************            WATCH FUNCTIONS           *********************/
738 /******************************************************************************/
739
740 /**
741  * When a watch record is received, validate it is well-formed.
742  *
743  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
744  * @param msg message received
745  */
746 static int
747 check_watch_record (void *cls, const struct StoreRecordMessage *msg)
748 {
749   /* we defer validation to #handle_watch_result */
750   return GNUNET_OK;
751 }
752
753
754 /**
755  * When a watch record is received, process it.
756  *
757  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
758  * @param msg message received
759  */
760 static void
761 handle_watch_record (void *cls, const struct StoreRecordMessage *msg)
762 {
763   struct GNUNET_PEERSTORE_Handle *h = cls;
764   struct GNUNET_PEERSTORE_Record *record;
765   struct GNUNET_HashCode keyhash;
766   struct GNUNET_PEERSTORE_WatchContext *wc;
767
768   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
769   record = PEERSTORE_parse_record_message (msg);
770   if (NULL == record)
771   {
772     disconnect_and_schedule_reconnect (h);
773     return;
774   }
775   PEERSTORE_hash_key (record->sub_system, &record->peer, record->key, &keyhash);
776   // FIXME: what if there are multiple watches for the same key?
777   wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
778   if (NULL == wc)
779   {
780     LOG (GNUNET_ERROR_TYPE_ERROR,
781          _ ("Received a watch result for a non existing watch.\n"));
782     PEERSTORE_destroy_record (record);
783     disconnect_and_schedule_reconnect (h);
784     return;
785   }
786   if (NULL != wc->callback)
787     wc->callback (wc->callback_cls, record, NULL);
788   h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
789   PEERSTORE_destroy_record (record);
790 }
791
792
793 /**
794  * Close the existing connection to PEERSTORE and reconnect.
795  *
796  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
797  */
798 static void
799 reconnect (void *cls)
800 {
801   struct GNUNET_PEERSTORE_Handle *h = cls;
802   struct GNUNET_MQ_MessageHandler mq_handlers[] =
803   { GNUNET_MQ_hd_fixed_size (iterate_end,
804                              GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
805                              struct GNUNET_MessageHeader,
806                              h),
807     GNUNET_MQ_hd_var_size (iterate_result,
808                            GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
809                            struct StoreRecordMessage,
810                            h),
811     GNUNET_MQ_hd_var_size (watch_record,
812                            GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
813                            struct StoreRecordMessage,
814                            h),
815     GNUNET_MQ_handler_end () };
816   struct GNUNET_MQ_Envelope *ev;
817
818   h->reconnect_task = NULL;
819   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
820   h->mq = GNUNET_CLIENT_connect (h->cfg,
821                                  "peerstore",
822                                  mq_handlers,
823                                  &handle_client_error,
824                                  h);
825   if (NULL == h->mq)
826     return;
827   LOG (GNUNET_ERROR_TYPE_DEBUG,
828        "Resending pending requests after reconnect.\n");
829   if (NULL != h->watches)
830     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
831   for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
832        ic = ic->next)
833   {
834     ev =
835       PEERSTORE_create_record_mq_envelope (ic->sub_system,
836                                            &ic->peer,
837                                            ic->key,
838                                            NULL,
839                                            0,
840                                            GNUNET_TIME_UNIT_FOREVER_ABS,
841                                            0,
842                                            GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
843     GNUNET_MQ_send (h->mq, ev);
844   }
845   for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc;
846        sc = sc->next)
847   {
848     ev =
849       PEERSTORE_create_record_mq_envelope (sc->sub_system,
850                                            &sc->peer,
851                                            sc->key,
852                                            sc->value,
853                                            sc->size,
854                                            sc->expiry,
855                                            sc->options,
856                                            GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
857     GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
858     GNUNET_MQ_send (h->mq, ev);
859   }
860 }
861
862
863 /**
864  * Cancel a watch request
865  *
866  * @param wc handle to the watch request
867  */
868 void
869 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
870 {
871   struct GNUNET_PEERSTORE_Handle *h = wc->h;
872   struct GNUNET_MQ_Envelope *ev;
873   struct StoreKeyHashMessage *hm;
874
875   LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
876   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
877   hm->keyhash = wc->keyhash;
878   GNUNET_MQ_send (h->mq, ev);
879   GNUNET_assert (
880     GNUNET_YES ==
881     GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc));
882   GNUNET_free (wc);
883 }
884
885
886 /**
887  * Request watching a given key
888  * User will be notified with any new values added to key
889  *
890  * @param h handle to the PEERSTORE service
891  * @param sub_system name of sub system
892  * @param peer Peer identity
893  * @param key entry key string
894  * @param callback function called with each new value
895  * @param callback_cls closure for @a callback
896  * @return Handle to watch request
897  */
898 struct GNUNET_PEERSTORE_WatchContext *
899 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
900                         const char *sub_system,
901                         const struct GNUNET_PeerIdentity *peer,
902                         const char *key,
903                         GNUNET_PEERSTORE_Processor callback,
904                         void *callback_cls)
905 {
906   struct GNUNET_MQ_Envelope *ev;
907   struct StoreKeyHashMessage *hm;
908   struct GNUNET_PEERSTORE_WatchContext *wc;
909
910   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
911   PEERSTORE_hash_key (sub_system, peer, key, &hm->keyhash);
912   wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
913   wc->callback = callback;
914   wc->callback_cls = callback_cls;
915   wc->h = h;
916   wc->keyhash = hm->keyhash;
917   if (NULL == h->watches)
918     h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
919   GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
920                    h->watches,
921                    &wc->keyhash,
922                    wc,
923                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
924   LOG (GNUNET_ERROR_TYPE_DEBUG,
925        "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
926        sub_system,
927        GNUNET_i2s (peer),
928        key);
929   GNUNET_MQ_send (h->mq, ev);
930   return wc;
931 }
932
933
934 /* end of peerstore_api.c */