-towards addressing #3581
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file peerstore/peerstore_api.c
23  * @brief API for peerstore
24  * @author Omar Tarabai
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
32
33 /******************************************************************************/
34 /************************      DATA STRUCTURES     ****************************/
35 /******************************************************************************/
36
37 /**
38  * Handle to the PEERSTORE service.
39  */
40 struct GNUNET_PEERSTORE_Handle
41 {
42
43   /**
44    * Our configuration.
45    */
46   const struct GNUNET_CONFIGURATION_Handle *cfg;
47
48   /**
49    * Connection to the service.
50    */
51   struct GNUNET_CLIENT_Connection *client;
52
53   /**
54    * Message queue
55    */
56   struct GNUNET_MQ_Handle *mq;
57
58   /**
59    * Head of active STORE requests.
60    */
61   struct GNUNET_PEERSTORE_StoreContext *store_head;
62
63   /**
64    * Tail of active STORE requests.
65    */
66   struct GNUNET_PEERSTORE_StoreContext *store_tail;
67
68   /**
69    * Head of active ITERATE requests.
70    */
71   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
72
73   /**
74    * Tail of active ITERATE requests.
75    */
76   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
77
78   /**
79    * Hashmap of watch requests
80    */
81   struct GNUNET_CONTAINER_MultiHashMap *watches;
82
83   /**
84    * Are we in the process of disconnecting but need to sync first?
85    */
86   int disconnecting;
87
88 };
89
90 /**
91  * Context for a store request
92  */
93 struct GNUNET_PEERSTORE_StoreContext
94 {
95   /**
96    * Kept in a DLL.
97    */
98   struct GNUNET_PEERSTORE_StoreContext *next;
99
100   /**
101    * Kept in a DLL.
102    */
103   struct GNUNET_PEERSTORE_StoreContext *prev;
104
105   /**
106    * Handle to the PEERSTORE service.
107    */
108   struct GNUNET_PEERSTORE_Handle *h;
109
110   /**
111    * Continuation called with service response
112    */
113   GNUNET_PEERSTORE_Continuation cont;
114
115   /**
116    * Closure for @e cont
117    */
118   void *cont_cls;
119
120   /**
121    * Which subsystem does the store?
122    */
123   char *sub_system;
124
125   /**
126    * Key for the store operation.
127    */
128   char *key;
129
130   /**
131    * Contains @e size bytes.
132    */
133   void *value;
134
135   /**
136    * MQ Envelope with store request message
137    */
138   struct GNUNET_MQ_Envelope *ev;
139
140   /**
141    * Peer the store is for.
142    */
143   struct GNUNET_PeerIdentity peer;
144
145   /**
146    * Number of bytes in @e value.
147    */
148   size_t size;
149
150   /**
151    * When does the value expire?
152    */
153   struct GNUNET_TIME_Absolute expiry;
154
155   /**
156    * Options for the store operation.
157    */
158   enum GNUNET_PEERSTORE_StoreOption options;
159
160 };
161
162 /**
163  * Context for a iterate request
164  */
165 struct GNUNET_PEERSTORE_IterateContext
166 {
167   /**
168    * Kept in a DLL.
169    */
170   struct GNUNET_PEERSTORE_IterateContext *next;
171
172   /**
173    * Kept in a DLL.
174    */
175   struct GNUNET_PEERSTORE_IterateContext *prev;
176
177   /**
178    * Handle to the PEERSTORE service.
179    */
180   struct GNUNET_PEERSTORE_Handle *h;
181
182   /**
183    * MQ Envelope with iterate request message
184    */
185   struct GNUNET_MQ_Envelope *ev;
186
187   /**
188    * Callback with each matching record
189    */
190   GNUNET_PEERSTORE_Processor callback;
191
192   /**
193    * Closure for @e callback
194    */
195   void *callback_cls;
196
197   /**
198    * #GNUNET_YES / #GNUNET_NO
199    * Iterate request has been sent and we are still expecting records
200    */
201   int iterating;
202
203   /**
204    * Task identifier for the function called
205    * on iterate request timeout
206    */
207   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
208
209 };
210
211 /**
212  * Context for a watch request
213  */
214 struct GNUNET_PEERSTORE_WatchContext
215 {
216   /**
217    * Kept in a DLL.
218    */
219   struct GNUNET_PEERSTORE_WatchContext *next;
220
221   /**
222    * Kept in a DLL.
223    */
224   struct GNUNET_PEERSTORE_WatchContext *prev;
225
226   /**
227    * Handle to the PEERSTORE service.
228    */
229   struct GNUNET_PEERSTORE_Handle *h;
230
231   /**
232    * MQ Envelope with watch request message
233    */
234   struct GNUNET_MQ_Envelope *ev;
235
236   /**
237    * Callback with each record received
238    */
239   GNUNET_PEERSTORE_Processor callback;
240
241   /**
242    * Closure for @e callback
243    */
244   void *callback_cls;
245
246   /**
247    * Hash of the combined key
248    */
249   struct GNUNET_HashCode keyhash;
250
251   /**
252    * #GNUNET_YES / #GNUNET_NO
253    * if sent, cannot be canceled
254    */
255   int request_sent;
256
257 };
258
259 /******************************************************************************/
260 /*******************             DECLARATIONS             *********************/
261 /******************************************************************************/
262
263 /**
264  * When a response for iterate request is received
265  *
266  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
267  * @param msg message received, NULL on timeout or fatal error
268  */
269 static void
270 handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg);
271
272 /**
273  * When a watch record is received
274  *
275  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
276  * @param msg message received, NULL on timeout or fatal error
277  */
278 static void
279 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg);
280
281 /**
282  * Close the existing connection to PEERSTORE and reconnect.
283  *
284  * @param h handle to the service
285  */
286 static void
287 reconnect (struct GNUNET_PEERSTORE_Handle *h);
288
289 /**
290  * Callback after MQ envelope is sent
291  *
292  * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
293  */
294 static void
295 watch_request_sent (void *cls);
296
297 /**
298  * Callback after MQ envelope is sent
299  *
300  * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
301  */
302 static void
303 iterate_request_sent (void *cls);
304
305
306 /**
307  * Callback after MQ envelope is sent
308  *
309  * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
310  */
311 static void
312 store_request_sent (void *cls)
313 {
314   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
315   GNUNET_PEERSTORE_Continuation cont;
316   void *cont_cls;
317
318   sc->ev = NULL;
319   cont = sc->cont;
320   cont_cls = sc->cont_cls;
321   GNUNET_PEERSTORE_store_cancel (sc);
322   if (NULL != cont)
323     cont (cont_cls, GNUNET_OK);
324 }
325
326
327 /**
328  * MQ message handlers
329  */
330 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
331   {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
332   {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
333    sizeof (struct GNUNET_MessageHeader)},
334   {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
335   GNUNET_MQ_HANDLERS_END
336 };
337
338 /******************************************************************************/
339 /*******************         CONNECTION FUNCTIONS         *********************/
340 /******************************************************************************/
341
342 static void
343 handle_client_error (void *cls, enum GNUNET_MQ_Error error)
344 {
345   struct GNUNET_PEERSTORE_Handle *h = cls;
346
347   LOG (GNUNET_ERROR_TYPE_ERROR,
348        _("Received an error notification from MQ of type: %d\n"), error);
349   reconnect (h);
350 }
351
352
353 /**
354  * Iterator over previous watches to resend them
355  */
356 static int
357 rewatch_it (void *cls,
358             const struct GNUNET_HashCode *key,
359             void *value)
360 {
361   struct GNUNET_PEERSTORE_Handle *h = cls;
362   struct GNUNET_PEERSTORE_WatchContext *wc = value;
363   struct StoreKeyHashMessage *hm;
364
365   if (GNUNET_YES == wc->request_sent)
366   {                             /* Envelope gone, create new one. */
367     wc->ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
368     hm->keyhash = wc->keyhash;
369     wc->request_sent = GNUNET_NO;
370   }
371   GNUNET_MQ_notify_sent (wc->ev, &watch_request_sent, wc);
372   GNUNET_MQ_send (h->mq, wc->ev);
373   return GNUNET_YES;
374 }
375
376
377 /**
378  * Close the existing connection to PEERSTORE and reconnect.
379  *
380  * @param h handle to the service
381  */
382 static void
383 reconnect (struct GNUNET_PEERSTORE_Handle *h)
384 {
385   struct GNUNET_PEERSTORE_IterateContext *ic;
386   GNUNET_PEERSTORE_Processor icb;
387   void *icb_cls;
388   struct GNUNET_PEERSTORE_StoreContext *sc;
389
390   LOG (GNUNET_ERROR_TYPE_DEBUG,
391        "Reconnecting...\n");
392   for (sc = h->store_head; NULL != sc; sc = sc->next)
393   {
394     if (NULL != sc->ev)
395     {
396       GNUNET_MQ_send_cancel (sc->ev);
397       sc->ev = NULL;
398     }
399   }
400   if (NULL != h->mq)
401   {
402     GNUNET_MQ_destroy (h->mq);
403     h->mq = NULL;
404   }
405   if (NULL != h->client)
406   {
407     GNUNET_CLIENT_disconnect (h->client);
408     h->client = NULL;
409   }
410   h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
411   GNUNET_assert (NULL != h->client);
412   h->mq =
413       GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
414                                              &handle_client_error, h);
415   LOG (GNUNET_ERROR_TYPE_DEBUG,
416        "Resending pending requests after reconnect.\n");
417   if (NULL != h->watches)
418     GNUNET_CONTAINER_multihashmap_iterate (h->watches,
419                                            &rewatch_it,
420                                            h);
421   for (ic = h->iterate_head; NULL != ic; ic = ic->next)
422   {
423     if (GNUNET_YES == ic->iterating)
424     {
425       icb = ic->callback;
426       icb_cls = ic->callback_cls;
427       GNUNET_PEERSTORE_iterate_cancel (ic);
428       if (NULL != icb)
429         icb (icb_cls,
430              NULL,
431              _("Iteration canceled due to reconnection."));
432     }
433     else
434     {
435       GNUNET_MQ_notify_sent (ic->ev,
436                              &iterate_request_sent,
437                              ic);
438       GNUNET_MQ_send (h->mq, ic->ev);
439     }
440   }
441   for (sc = h->store_head; NULL != sc; sc = sc->next)
442   {
443     sc->ev = PEERSTORE_create_record_mq_envelope (sc->sub_system,
444                                                   &sc->peer,
445                                                   sc->key,
446                                                   sc->value,
447                                                   sc->size,
448                                                   &sc->expiry,
449                                                   sc->options,
450                                                   GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
451     GNUNET_MQ_notify_sent (sc->ev,
452                            &store_request_sent,
453                            sc);
454     GNUNET_MQ_send (h->mq, sc->ev);
455   }
456 }
457
458
459 /**
460  * Iterator over watch requests to cancel them.
461  *
462  * @param cls unsused
463  * @param key key to the watch request
464  * @param value watch context
465  * @return #GNUNET_YES to continue iteration
466  */
467 static int
468 destroy_watch (void *cls,
469                const struct GNUNET_HashCode *key,
470                void *value)
471 {
472   struct GNUNET_PEERSTORE_WatchContext *wc = value;
473
474   GNUNET_PEERSTORE_watch_cancel (wc);
475   return GNUNET_YES;
476 }
477
478
479 /**
480  * Kill the connection to the service. This can be delayed in case of pending
481  * STORE requests and the user explicitly asked to sync first. Otherwise it is
482  * performed instantly.
483  *
484  * @param h Handle to the service.
485  */
486 static void
487 do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
488 {
489   if (NULL != h->mq)
490   {
491     GNUNET_MQ_destroy (h->mq);
492     h->mq = NULL;
493   }
494   if (NULL != h->client)
495   {
496     GNUNET_CLIENT_disconnect (h->client);
497     h->client = NULL;
498   }
499   GNUNET_free (h);
500 }
501
502
503 /**
504  * Connect to the PEERSTORE service.
505  *
506  * @param cfg configuration to use
507  * @return NULL on error
508  */
509 struct GNUNET_PEERSTORE_Handle *
510 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
511 {
512   struct GNUNET_PEERSTORE_Handle *h;
513
514   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
515
516   h->client = GNUNET_CLIENT_connect ("peerstore", cfg);
517   if (NULL == h->client)
518   {
519     GNUNET_free (h);
520     return NULL;
521   }
522   h->cfg = cfg;
523   h->disconnecting = GNUNET_NO;
524   h->mq =
525       GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
526                                              &handle_client_error, h);
527   if (NULL == h->mq)
528   {
529     GNUNET_CLIENT_disconnect (h->client);
530     GNUNET_free (h);
531     return NULL;
532   }
533   LOG (GNUNET_ERROR_TYPE_DEBUG,
534        "New connection created\n");
535   return h;
536 }
537
538
539 /**
540  * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
541  * will be canceled.
542  * Any pending STORE requests will depend on @e snyc_first flag.
543  *
544  * @param h handle to disconnect
545  * @param sync_first send any pending STORE requests before disconnecting
546  */
547 void
548 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first)
549 {
550   struct GNUNET_PEERSTORE_IterateContext *ic;
551   struct GNUNET_PEERSTORE_IterateContext *ic_iter;
552   struct GNUNET_PEERSTORE_StoreContext *sc;
553   struct GNUNET_PEERSTORE_StoreContext *sc_iter;
554
555   LOG (GNUNET_ERROR_TYPE_DEBUG,
556        "Disconnecting.\n");
557   if (NULL != h->watches)
558   {
559     GNUNET_CONTAINER_multihashmap_iterate (h->watches,
560                                            &destroy_watch,
561                                            NULL);
562     GNUNET_CONTAINER_multihashmap_destroy (h->watches);
563     h->watches = NULL;
564   }
565   ic_iter = h->iterate_head;
566   while (NULL != ic_iter)
567   {
568     GNUNET_break (0);
569     ic = ic_iter;
570     ic_iter = ic_iter->next;
571     GNUNET_PEERSTORE_iterate_cancel (ic);
572   }
573   if (NULL != h->store_head)
574   {
575     if (GNUNET_YES == sync_first)
576     {
577       LOG (GNUNET_ERROR_TYPE_DEBUG,
578            "Delaying disconnection due to pending store requests.\n");
579       h->disconnecting = GNUNET_YES;
580       return;
581     }
582     sc_iter = h->store_head;
583     while (NULL != sc_iter)
584     {
585       sc = sc_iter;
586       sc_iter = sc_iter->next;
587       GNUNET_PEERSTORE_store_cancel (sc);
588     }
589   }
590   do_disconnect (h);
591 }
592
593
594 /******************************************************************************/
595 /*******************            STORE FUNCTIONS           *********************/
596 /******************************************************************************/
597
598
599 /**
600  * Cancel a store request
601  *
602  * @param sc Store request context
603  */
604 void
605 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
606 {
607   struct GNUNET_PEERSTORE_Handle *h = sc->h;
608
609   if (NULL != sc->ev)
610   {
611     GNUNET_MQ_send_cancel (sc->ev);
612     sc->ev = NULL;
613   }
614   GNUNET_CONTAINER_DLL_remove (sc->h->store_head,
615                                sc->h->store_tail,
616                                sc);
617   GNUNET_free (sc->sub_system);
618   GNUNET_free (sc->value);
619   GNUNET_free (sc->key);
620   GNUNET_free (sc);
621   if ( (GNUNET_YES == h->disconnecting) &&
622        (NULL == h->store_head) )
623     do_disconnect (h);
624 }
625
626
627 /**
628  * Store a new entry in the PEERSTORE.
629  * Note that stored entries can be lost in some cases
630  * such as power failure.
631  *
632  * @param h Handle to the PEERSTORE service
633  * @param sub_system name of the sub system
634  * @param peer Peer Identity
635  * @param key entry key
636  * @param value entry value BLOB
637  * @param size size of @e value
638  * @param expiry absolute time after which the entry is (possibly) deleted
639  * @param options options specific to the storage operation
640  * @param cont Continuation function after the store request is sent
641  * @param cont_cls Closure for @a cont
642  */
643 struct GNUNET_PEERSTORE_StoreContext *
644 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
645                         const char *sub_system,
646                         const struct GNUNET_PeerIdentity *peer,
647                         const char *key,
648                         const void *value,
649                         size_t size,
650                         struct GNUNET_TIME_Absolute expiry,
651                         enum GNUNET_PEERSTORE_StoreOption options,
652                         GNUNET_PEERSTORE_Continuation cont,
653                         void *cont_cls)
654 {
655   struct GNUNET_MQ_Envelope *ev;
656   struct GNUNET_PEERSTORE_StoreContext *sc;
657
658   LOG (GNUNET_ERROR_TYPE_DEBUG,
659        "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
660        size, sub_system, GNUNET_i2s (peer), key);
661   ev = PEERSTORE_create_record_mq_envelope (sub_system,
662                                             peer,
663                                             key,
664                                             value,
665                                             size,
666                                             &expiry,
667                                             options,
668                                             GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
669   sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
670   sc->sub_system = GNUNET_strdup (sub_system);
671   sc->peer = *peer;
672   sc->key = GNUNET_strdup (key);
673   sc->value = GNUNET_memdup (value, size);
674   sc->size = size;
675   sc->expiry = expiry;
676   sc->options = options;
677   sc->cont = cont;
678   sc->cont_cls = cont_cls;
679   sc->h = h;
680
681   GNUNET_CONTAINER_DLL_insert_tail (h->store_head,
682                                     h->store_tail,
683                                     sc);
684   GNUNET_MQ_notify_sent (ev,
685                          &store_request_sent,
686                          sc);
687   GNUNET_MQ_send (h->mq, ev);
688   return sc;
689
690 }
691
692
693 /******************************************************************************/
694 /*******************           ITERATE FUNCTIONS          *********************/
695 /******************************************************************************/
696
697 /**
698  * When a response for iterate request is received
699  *
700  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
701  * @param msg message received, NULL on timeout or fatal error
702  */
703 static void
704 handle_iterate_result (void *cls,
705                        const struct GNUNET_MessageHeader *msg)
706 {
707   struct GNUNET_PEERSTORE_Handle *h = cls;
708   struct GNUNET_PEERSTORE_IterateContext *ic;
709   GNUNET_PEERSTORE_Processor callback;
710   void *callback_cls;
711   uint16_t msg_type;
712   struct GNUNET_PEERSTORE_Record *record;
713   int continue_iter;
714
715   ic = h->iterate_head;
716   if (NULL == ic)
717   {
718     LOG (GNUNET_ERROR_TYPE_ERROR,
719          _("Unexpected iteration response, this should not happen.\n"));
720     reconnect (h);
721     return;
722   }
723   callback = ic->callback;
724   callback_cls = ic->callback_cls;
725   if (NULL == msg)              /* Connection error */
726   {
727
728     if (NULL != callback)
729       callback (callback_cls, NULL,
730                 _("Error communicating with `PEERSTORE' service."));
731     reconnect (h);
732     return;
733   }
734   msg_type = ntohs (msg->type);
735   if (GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
736   {
737     ic->iterating = GNUNET_NO;
738     GNUNET_PEERSTORE_iterate_cancel (ic);
739     if (NULL != callback)
740       callback (callback_cls, NULL, NULL);
741     return;
742   }
743   if (NULL != callback)
744   {
745     record = PEERSTORE_parse_record_message (msg);
746     if (NULL == record)
747       continue_iter =
748           callback (callback_cls, NULL,
749                     _("Received a malformed response from service."));
750     else
751     {
752       continue_iter = callback (callback_cls, record, NULL);
753       PEERSTORE_destroy_record (record);
754     }
755     if (GNUNET_NO == continue_iter)
756       ic->callback = NULL;
757   }
758 }
759
760
761 /**
762  * Callback after MQ envelope is sent
763  *
764  * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
765  */
766 static void
767 iterate_request_sent (void *cls)
768 {
769   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
770
771   LOG (GNUNET_ERROR_TYPE_DEBUG,
772        "Iterate request sent to service.\n");
773   ic->iterating = GNUNET_YES;
774   ic->ev = NULL;
775 }
776
777
778 /**
779  * Called when the iterate request is timedout
780  *
781  * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
782  * @param tc Scheduler task context (unused)
783  */
784 static void
785 iterate_timeout (void *cls,
786                  const struct GNUNET_SCHEDULER_TaskContext *tc)
787 {
788   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
789   GNUNET_PEERSTORE_Processor callback;
790   void *callback_cls;
791
792   ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
793   callback = ic->callback;
794   callback_cls = ic->callback_cls;
795   GNUNET_PEERSTORE_iterate_cancel (ic);
796   if (NULL != callback)
797     callback (callback_cls,
798               NULL,
799               _("timeout"));
800 }
801
802
803 /**
804  * Cancel an iterate request
805  * Please do not call after the iterate request is done
806  *
807  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
808  */
809 void
810 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
811 {
812   if (GNUNET_SCHEDULER_NO_TASK != ic->timeout_task)
813   {
814     GNUNET_SCHEDULER_cancel (ic->timeout_task);
815     ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
816   }
817   if (GNUNET_NO == ic->iterating)
818   {
819     if (NULL != ic->ev)
820     {
821       GNUNET_MQ_send_cancel (ic->ev);
822       ic->ev = NULL;
823     }
824     GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head,
825                                  ic->h->iterate_tail,
826                                  ic);
827     GNUNET_free (ic);
828   }
829   else
830     ic->callback = NULL;
831 }
832
833
834 /**
835  * Iterate over records matching supplied key information
836  *
837  * @param h handle to the PEERSTORE service
838  * @param sub_system name of sub system
839  * @param peer Peer identity (can be NULL)
840  * @param key entry key string (can be NULL)
841  * @param timeout time after which the iterate request is canceled
842  * @param callback function called with each matching record, all NULL's on end
843  * @param callback_cls closure for @a callback
844  * @return Handle to iteration request
845  */
846 struct GNUNET_PEERSTORE_IterateContext *
847 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
848                           const char *sub_system,
849                           const struct GNUNET_PeerIdentity *peer,
850                           const char *key,
851                           struct GNUNET_TIME_Relative timeout,
852                           GNUNET_PEERSTORE_Processor callback,
853                           void *callback_cls)
854 {
855   struct GNUNET_MQ_Envelope *ev;
856   struct GNUNET_PEERSTORE_IterateContext *ic;
857
858   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, NULL, 0,
859                                             NULL, 0,
860                                             GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
861   ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
862
863   ic->callback = callback;
864   ic->callback_cls = callback_cls;
865   ic->ev = ev;
866   ic->h = h;
867   ic->iterating = GNUNET_NO;
868   GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head,
869                                     h->iterate_tail,
870                                     ic);
871   LOG (GNUNET_ERROR_TYPE_DEBUG,
872        "Sending an iterate request for sub system `%s'\n", sub_system);
873   GNUNET_MQ_notify_sent (ev, &iterate_request_sent, ic);
874   GNUNET_MQ_send (h->mq, ev);
875   ic->timeout_task =
876       GNUNET_SCHEDULER_add_delayed (timeout, &iterate_timeout, ic);
877   return ic;
878 }
879
880
881 /******************************************************************************/
882 /*******************            WATCH FUNCTIONS           *********************/
883 /******************************************************************************/
884
885 /**
886  * When a watch record is received
887  *
888  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
889  * @param msg message received, NULL on timeout or fatal error
890  */
891 static void
892 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
893 {
894   struct GNUNET_PEERSTORE_Handle *h = cls;
895   struct GNUNET_PEERSTORE_Record *record;
896   struct GNUNET_HashCode keyhash;
897   struct GNUNET_PEERSTORE_WatchContext *wc;
898
899   if (NULL == msg)
900   {
901     LOG (GNUNET_ERROR_TYPE_ERROR,
902          _("Problem receiving a watch response, no way to determine which request.\n"));
903     reconnect (h);
904     return;
905   }
906   LOG (GNUNET_ERROR_TYPE_DEBUG,
907        "Received a watch record from service.\n");
908   record = PEERSTORE_parse_record_message (msg);
909   PEERSTORE_hash_key (record->sub_system, record->peer, record->key, &keyhash);
910   wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
911   if (NULL == wc)
912   {
913     LOG (GNUNET_ERROR_TYPE_ERROR,
914          _("Received a watch result for a non existing watch.\n"));
915     PEERSTORE_destroy_record (record);
916     reconnect (h);
917     return;
918   }
919   if (NULL != wc->callback)
920     wc->callback (wc->callback_cls, record, NULL);
921   PEERSTORE_destroy_record (record);
922 }
923
924
925 /**
926  * Callback after MQ envelope is sent
927  *
928  * @param cls a `struct GNUNET_PEERSTORE_WatchContext *`
929  */
930 static void
931 watch_request_sent (void *cls)
932 {
933   struct GNUNET_PEERSTORE_WatchContext *wc = cls;
934
935   wc->request_sent = GNUNET_YES;
936   wc->ev = NULL;
937 }
938
939
940 /**
941  * Cancel a watch request
942  *
943  * @param wc handle to the watch request
944  */
945 void
946 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
947 {
948   struct GNUNET_PEERSTORE_Handle *h = wc->h;
949   struct GNUNET_MQ_Envelope *ev;
950   struct StoreKeyHashMessage *hm;
951
952   LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
953   if (GNUNET_YES == wc->request_sent)   /* If request already sent to service, send a cancel request. */
954   {
955     ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
956     hm->keyhash = wc->keyhash;
957     GNUNET_MQ_send (h->mq, ev);
958     wc->callback = NULL;
959     wc->callback_cls = NULL;
960   }
961   if (NULL != wc->ev)
962   {
963     GNUNET_MQ_send_cancel (wc->ev);
964     wc->ev = NULL;
965   }
966   GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc);
967   GNUNET_free (wc);
968
969 }
970
971
972 /**
973  * Request watching a given key
974  * User will be notified with any new values added to key
975  *
976  * @param h handle to the PEERSTORE service
977  * @param sub_system name of sub system
978  * @param peer Peer identity
979  * @param key entry key string
980  * @param callback function called with each new value
981  * @param callback_cls closure for @a callback
982  * @return Handle to watch request
983  */
984 struct GNUNET_PEERSTORE_WatchContext *
985 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
986                         const char *sub_system,
987                         const struct GNUNET_PeerIdentity *peer, const char *key,
988                         GNUNET_PEERSTORE_Processor callback, void *callback_cls)
989 {
990   struct GNUNET_MQ_Envelope *ev;
991   struct StoreKeyHashMessage *hm;
992   struct GNUNET_PEERSTORE_WatchContext *wc;
993
994   GNUNET_assert (NULL != sub_system);
995   GNUNET_assert (NULL != peer);
996   GNUNET_assert (NULL != key);
997   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
998   PEERSTORE_hash_key (sub_system, peer, key, &hm->keyhash);
999   wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
1000
1001   wc->callback = callback;
1002   wc->callback_cls = callback_cls;
1003   wc->ev = ev;
1004   wc->h = h;
1005   wc->request_sent = GNUNET_NO;
1006   wc->keyhash = hm->keyhash;
1007   if (NULL == h->watches)
1008     h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
1009   GNUNET_CONTAINER_multihashmap_put (h->watches, &wc->keyhash, wc,
1010                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
1011   LOG (GNUNET_ERROR_TYPE_DEBUG,
1012        "Sending a watch request for ss `%s', peer `%s', key `%s'.\n",
1013        sub_system, GNUNET_i2s (peer), key);
1014   GNUNET_MQ_notify_sent (ev, &watch_request_sent, wc);
1015   GNUNET_MQ_send (h->mq, ev);
1016   return wc;
1017 }
1018
1019 /* end of peerstore_api.c */