-must notify client on timeout
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file peerstore/peerstore_api.c
23  * @brief API for peerstore
24  * @author Omar Tarabai
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
32
33 /******************************************************************************/
34 /************************      DATA STRUCTURES     ****************************/
35 /******************************************************************************/
36
37 /**
38  * Handle to the PEERSTORE service.
39  */
40 struct GNUNET_PEERSTORE_Handle
41 {
42
43   /**
44    * Our configuration.
45    */
46   const struct GNUNET_CONFIGURATION_Handle *cfg;
47
48   /**
49    * Connection to the service.
50    */
51   struct GNUNET_CLIENT_Connection *client;
52
53   /**
54    * Message queue
55    */
56   struct GNUNET_MQ_Handle *mq;
57
58   /**
59    * Head of active STORE requests.
60    */
61   struct GNUNET_PEERSTORE_StoreContext *store_head;
62
63   /**
64    * Tail of active STORE requests.
65    */
66   struct GNUNET_PEERSTORE_StoreContext *store_tail;
67
68   /**
69    * Head of active ITERATE requests.
70    */
71   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
72
73   /**
74    * Tail of active ITERATE requests.
75    */
76   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
77
78   /**
79    * Hashmap of watch requests
80    */
81   struct GNUNET_CONTAINER_MultiHashMap *watches;
82
83   /**
84    * Are we in the process of disconnecting but need to sync first?
85    */
86   int disconnecting;
87
88 };
89
90 /**
91  * Context for a store request
92  */
93 struct GNUNET_PEERSTORE_StoreContext
94 {
95   /**
96    * Kept in a DLL.
97    */
98   struct GNUNET_PEERSTORE_StoreContext *next;
99
100   /**
101    * Kept in a DLL.
102    */
103   struct GNUNET_PEERSTORE_StoreContext *prev;
104
105   /**
106    * Handle to the PEERSTORE service.
107    */
108   struct GNUNET_PEERSTORE_Handle *h;
109
110   /**
111    * MQ Envelope with store request message
112    */
113   struct GNUNET_MQ_Envelope *ev;
114
115   /**
116    * Continuation called with service response
117    */
118   GNUNET_PEERSTORE_Continuation cont;
119
120   /**
121    * Closure for 'cont'
122    */
123   void *cont_cls;
124
125 };
126
127 /**
128  * Context for a iterate request
129  */
130 struct GNUNET_PEERSTORE_IterateContext
131 {
132   /**
133    * Kept in a DLL.
134    */
135   struct GNUNET_PEERSTORE_IterateContext *next;
136
137   /**
138    * Kept in a DLL.
139    */
140   struct GNUNET_PEERSTORE_IterateContext *prev;
141
142   /**
143    * Handle to the PEERSTORE service.
144    */
145   struct GNUNET_PEERSTORE_Handle *h;
146
147   /**
148    * MQ Envelope with iterate request message
149    */
150   struct GNUNET_MQ_Envelope *ev;
151
152   /**
153    * Callback with each matching record
154    */
155   GNUNET_PEERSTORE_Processor callback;
156
157   /**
158    * Closure for 'callback'
159    */
160   void *callback_cls;
161
162   /**
163    * #GNUNET_YES / #GNUNET_NO
164    * Iterate request has been sent and we are still expecting records
165    */
166   int iterating;
167
168   /**
169    * Task identifier for the function called
170    * on iterate request timeout
171    */
172   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
173
174 };
175
176 /**
177  * Context for a watch request
178  */
179 struct GNUNET_PEERSTORE_WatchContext
180 {
181   /**
182    * Kept in a DLL.
183    */
184   struct GNUNET_PEERSTORE_WatchContext *next;
185
186   /**
187    * Kept in a DLL.
188    */
189   struct GNUNET_PEERSTORE_WatchContext *prev;
190
191   /**
192    * Handle to the PEERSTORE service.
193    */
194   struct GNUNET_PEERSTORE_Handle *h;
195
196   /**
197    * MQ Envelope with watch request message
198    */
199   struct GNUNET_MQ_Envelope *ev;
200
201   /**
202    * Callback with each record received
203    */
204   GNUNET_PEERSTORE_Processor callback;
205
206   /**
207    * Closure for 'callback'
208    */
209   void *callback_cls;
210
211   /**
212    * Hash of the combined key
213    */
214   struct GNUNET_HashCode keyhash;
215
216   /**
217    * #GNUNET_YES / #GNUNET_NO
218    * if sent, cannot be canceled
219    */
220   int request_sent;
221
222 };
223
224 /******************************************************************************/
225 /*******************             DECLARATIONS             *********************/
226 /******************************************************************************/
227
228 /**
229  * When a response for iterate request is received
230  *
231  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
232  * @param msg message received, NULL on timeout or fatal error
233  */
234 static void
235 handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg);
236
237 /**
238  * When a watch record is received
239  *
240  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
241  * @param msg message received, NULL on timeout or fatal error
242  */
243 static void
244 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg);
245
246 /**
247  * Close the existing connection to PEERSTORE and reconnect.
248  *
249  * @param h handle to the service
250  */
251 static void
252 reconnect (struct GNUNET_PEERSTORE_Handle *h);
253
254 /**
255  * Callback after MQ envelope is sent
256  *
257  * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
258  */
259 static void
260 watch_request_sent (void *cls);
261
262 /**
263  * Callback after MQ envelope is sent
264  *
265  * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
266  */
267 static void
268 iterate_request_sent (void *cls);
269
270 /**
271  * Callback after MQ envelope is sent
272  *
273  * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
274  */
275 static void
276 store_request_sent (void *cls);
277
278 /**
279  * MQ message handlers
280  */
281 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
282   {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
283   {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
284    sizeof (struct GNUNET_MessageHeader)},
285   {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
286   GNUNET_MQ_HANDLERS_END
287 };
288
289 /******************************************************************************/
290 /*******************         CONNECTION FUNCTIONS         *********************/
291 /******************************************************************************/
292
293 static void
294 handle_client_error (void *cls, enum GNUNET_MQ_Error error)
295 {
296   struct GNUNET_PEERSTORE_Handle *h = cls;
297
298   LOG (GNUNET_ERROR_TYPE_ERROR,
299        _("Received an error notification from MQ of type: %d\n"), error);
300   reconnect (h);
301 }
302
303
304 /**
305  * Iterator over previous watches to resend them
306  */
307 static int
308 rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
309 {
310   struct GNUNET_PEERSTORE_Handle *h = cls;
311   struct GNUNET_PEERSTORE_WatchContext *wc = value;
312   struct StoreKeyHashMessage *hm;
313
314   if (GNUNET_YES == wc->request_sent)
315   {                             /* Envelope gone, create new one. */
316     wc->ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
317     hm->keyhash = wc->keyhash;
318     wc->request_sent = GNUNET_NO;
319   }
320   GNUNET_MQ_notify_sent (wc->ev, &watch_request_sent, wc);
321   GNUNET_MQ_send (h->mq, wc->ev);
322   return GNUNET_YES;
323 }
324
325
326 /**
327  * Close the existing connection to PEERSTORE and reconnect.
328  *
329  * @param h handle to the service
330  */
331 static void
332 reconnect (struct GNUNET_PEERSTORE_Handle *h)
333 {
334   struct GNUNET_PEERSTORE_IterateContext *ic;
335   GNUNET_PEERSTORE_Processor icb;
336   void *icb_cls;
337   struct GNUNET_PEERSTORE_StoreContext *sc;
338
339   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
340   if (NULL != h->mq)
341   {
342     GNUNET_MQ_destroy (h->mq);
343     h->mq = NULL;
344   }
345   if (NULL != h->client)
346   {
347     GNUNET_CLIENT_disconnect (h->client);
348     h->client = NULL;
349   }
350   h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
351   GNUNET_assert (NULL != h->client);
352   h->mq =
353       GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
354                                              &handle_client_error, h);
355   LOG (GNUNET_ERROR_TYPE_DEBUG,
356        "Resending pending requests after reconnect.\n");
357   if (NULL != h->watches)
358   {
359     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
360   }
361   ic = h->iterate_head;
362   while (NULL != ic)
363   {
364     if (GNUNET_YES == ic->iterating)
365     {
366       icb = ic->callback;
367       icb_cls = ic->callback_cls;
368       GNUNET_PEERSTORE_iterate_cancel (ic);
369       if (NULL != icb)
370         icb (icb_cls, NULL, _("Iteration canceled due to reconnection."));
371     }
372     else
373     {
374       GNUNET_MQ_notify_sent (ic->ev, &iterate_request_sent, ic);
375       GNUNET_MQ_send (h->mq, ic->ev);
376     }
377     ic = ic->next;
378   }
379   sc = h->store_head;
380   while (NULL != sc)
381   {
382     GNUNET_MQ_notify_sent (sc->ev, &store_request_sent, sc);
383     GNUNET_MQ_send (h->mq, sc->ev);
384     sc = sc->next;
385   }
386 }
387
388
389 /**
390  * Iterator over watch requests to cancel them.
391  *
392  * @param cls unsused
393  * @param key key to the watch request
394  * @param value watch context
395  * @return #GNUNET_YES to continue iteration
396  */
397 static int
398 destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
399 {
400   struct GNUNET_PEERSTORE_WatchContext *wc = value;
401
402   GNUNET_PEERSTORE_watch_cancel (wc);
403   return GNUNET_YES;
404 }
405
406
407 /**
408  * Kill the connection to the service. This can be delayed in case of pending
409  * STORE requests and the user explicitly asked to sync first. Otherwise it is
410  * performed instantly.
411  *
412  * @param h Handle to the service.
413  */
414 static void
415 do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
416 {
417   if (NULL != h->mq)
418   {
419     GNUNET_MQ_destroy (h->mq);
420     h->mq = NULL;
421   }
422   if (NULL != h->client)
423   {
424     GNUNET_CLIENT_disconnect (h->client);
425     h->client = NULL;
426   }
427   GNUNET_free (h);
428 }
429
430
431 /**
432  * Connect to the PEERSTORE service.
433  *
434  * @return NULL on error
435  */
436 struct GNUNET_PEERSTORE_Handle *
437 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
438 {
439   struct GNUNET_PEERSTORE_Handle *h;
440
441   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
442
443   h->client = GNUNET_CLIENT_connect ("peerstore", cfg);
444   if (NULL == h->client)
445   {
446     GNUNET_free (h);
447     return NULL;
448   }
449   h->cfg = cfg;
450   h->disconnecting = GNUNET_NO;
451   h->mq =
452       GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
453                                              &handle_client_error, h);
454   if (NULL == h->mq)
455   {
456     GNUNET_free (h);
457     return NULL;
458   }
459   LOG (GNUNET_ERROR_TYPE_DEBUG, "New connection created\n");
460   return h;
461 }
462
463
464 /**
465  * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
466  * will be canceled.
467  * Any pending STORE requests will depend on @e snyc_first flag.
468  *
469  * @param h handle to disconnect
470  * @param sync_first send any pending STORE requests before disconnecting
471  */
472 void
473 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first)
474 {
475   struct GNUNET_PEERSTORE_IterateContext *ic;
476   struct GNUNET_PEERSTORE_IterateContext *ic_iter;
477   struct GNUNET_PEERSTORE_StoreContext *sc;
478   struct GNUNET_PEERSTORE_StoreContext *sc_iter;
479
480   LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
481   if (NULL != h->watches)
482   {
483     GNUNET_CONTAINER_multihashmap_iterate (h->watches,
484                                            &destroy_watch,
485                                            NULL);
486     GNUNET_CONTAINER_multihashmap_destroy (h->watches);
487     h->watches = NULL;
488   }
489   ic_iter = h->iterate_head;
490   while (NULL != ic_iter)
491   {
492     GNUNET_break (0);
493     ic = ic_iter;
494     ic_iter = ic_iter->next;
495     GNUNET_PEERSTORE_iterate_cancel (ic);
496   }
497   if (NULL != h->store_head)
498   {
499     if (GNUNET_YES == sync_first)
500     {
501       LOG (GNUNET_ERROR_TYPE_DEBUG,
502            "Delaying disconnection due to pending store requests.\n");
503       h->disconnecting = GNUNET_YES;
504       return;
505     }
506     sc_iter = h->store_head;
507     while (NULL != sc_iter)
508     {
509       sc = sc_iter;
510       sc_iter = sc_iter->next;
511       GNUNET_PEERSTORE_store_cancel (sc);
512     }
513   }
514   do_disconnect (h);
515 }
516
517
518 /******************************************************************************/
519 /*******************            STORE FUNCTIONS           *********************/
520 /******************************************************************************/
521
522 /**
523  * Callback after MQ envelope is sent
524  *
525  * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
526  */
527 static void
528 store_request_sent (void *cls)
529 {
530   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
531   GNUNET_PEERSTORE_Continuation cont;
532   void *cont_cls;
533
534   sc->ev = NULL;
535   cont = sc->cont;
536   cont_cls = sc->cont_cls;
537   GNUNET_PEERSTORE_store_cancel (sc);
538   if (NULL != cont)
539     cont (cont_cls, GNUNET_OK);
540 }
541
542
543 /**
544  * Cancel a store request
545  *
546  * @param sc Store request context
547  */
548 void
549 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
550 {
551   struct GNUNET_PEERSTORE_Handle *h = sc->h;
552
553   if (NULL != sc->ev)
554   {
555     GNUNET_MQ_send_cancel (sc->ev);
556     sc->ev = NULL;
557   }
558   GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
559   GNUNET_free (sc);
560   if (GNUNET_YES == h->disconnecting && NULL == h->store_head)
561     do_disconnect (h);
562 }
563
564
565 /**
566  * Store a new entry in the PEERSTORE.
567  * Note that stored entries can be lost in some cases
568  * such as power failure.
569  *
570  * @param h Handle to the PEERSTORE service
571  * @param sub_system name of the sub system
572  * @param peer Peer Identity
573  * @param key entry key
574  * @param value entry value BLOB
575  * @param size size of @e value
576  * @param expiry absolute time after which the entry is (possibly) deleted
577  * @param options options specific to the storage operation
578  * @param cont Continuation function after the store request is sent
579  * @param cont_cls Closure for 'cont'
580  */
581 struct GNUNET_PEERSTORE_StoreContext *
582 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
583                         const char *sub_system,
584                         const struct GNUNET_PeerIdentity *peer, const char *key,
585                         const void *value, size_t size,
586                         struct GNUNET_TIME_Absolute expiry,
587                         enum GNUNET_PEERSTORE_StoreOption options,
588                         GNUNET_PEERSTORE_Continuation cont, void *cont_cls)
589 {
590   struct GNUNET_MQ_Envelope *ev;
591   struct GNUNET_PEERSTORE_StoreContext *sc;
592
593   LOG (GNUNET_ERROR_TYPE_DEBUG,
594        "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
595        size, sub_system, GNUNET_i2s (peer), key);
596   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size,
597                                             &expiry, options,
598                                             GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
599   sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
600
601   sc->ev = ev;
602   sc->cont = cont;
603   sc->cont_cls = cont_cls;
604   sc->h = h;
605   GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
606   GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
607   GNUNET_MQ_send (h->mq, ev);
608   return sc;
609
610 }
611
612
613 /******************************************************************************/
614 /*******************           ITERATE FUNCTIONS          *********************/
615 /******************************************************************************/
616
617 /**
618  * When a response for iterate request is received
619  *
620  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
621  * @param msg message received, NULL on timeout or fatal error
622  */
623 static void
624 handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg)
625 {
626   struct GNUNET_PEERSTORE_Handle *h = cls;
627   struct GNUNET_PEERSTORE_IterateContext *ic;
628   GNUNET_PEERSTORE_Processor callback;
629   void *callback_cls;
630   uint16_t msg_type;
631   struct GNUNET_PEERSTORE_Record *record;
632   int continue_iter;
633
634   ic = h->iterate_head;
635   if (NULL == ic)
636   {
637     LOG (GNUNET_ERROR_TYPE_ERROR,
638          _("Unexpected iteration response, this should not happen.\n"));
639     reconnect (h);
640     return;
641   }
642   callback = ic->callback;
643   callback_cls = ic->callback_cls;
644   if (NULL == msg)              /* Connection error */
645   {
646
647     if (NULL != callback)
648       callback (callback_cls, NULL,
649                 _("Error communicating with `PEERSTORE' service."));
650     reconnect (h);
651     return;
652   }
653   msg_type = ntohs (msg->type);
654   if (GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
655   {
656     ic->iterating = GNUNET_NO;
657     GNUNET_PEERSTORE_iterate_cancel (ic);
658     if (NULL != callback)
659       callback (callback_cls, NULL, NULL);
660     return;
661   }
662   if (NULL != callback)
663   {
664     record = PEERSTORE_parse_record_message (msg);
665     if (NULL == record)
666       continue_iter =
667           callback (callback_cls, NULL,
668                     _("Received a malformed response from service."));
669     else
670     {
671       continue_iter = callback (callback_cls, record, NULL);
672       PEERSTORE_destroy_record (record);
673     }
674     if (GNUNET_NO == continue_iter)
675       ic->callback = NULL;
676   }
677 }
678
679
680 /**
681  * Callback after MQ envelope is sent
682  *
683  * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
684  */
685 static void
686 iterate_request_sent (void *cls)
687 {
688   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
689
690   LOG (GNUNET_ERROR_TYPE_DEBUG,
691        "Iterate request sent to service.\n");
692   ic->iterating = GNUNET_YES;
693   ic->ev = NULL;
694 }
695
696
697 /**
698  * Called when the iterate request is timedout
699  *
700  * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
701  * @param tc Scheduler task context (unused)
702  */
703 static void
704 iterate_timeout (void *cls,
705                  const struct GNUNET_SCHEDULER_TaskContext *tc)
706 {
707   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
708   GNUNET_PEERSTORE_Processor callback;
709   void *callback_cls;
710
711   ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
712   callback = ic->callback;
713   callback_cls = ic->callback_cls;
714   GNUNET_PEERSTORE_iterate_cancel (ic);
715   if (NULL != callback)
716     callback (callback_cls,
717               NULL,
718               _("timeout"));
719 }
720
721
722 /**
723  * Cancel an iterate request
724  * Please do not call after the iterate request is done
725  *
726  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
727  */
728 void
729 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
730 {
731   if (GNUNET_SCHEDULER_NO_TASK != ic->timeout_task)
732   {
733     GNUNET_SCHEDULER_cancel (ic->timeout_task);
734     ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
735   }
736   if (GNUNET_NO == ic->iterating)
737   {
738     if (NULL != ic->ev)
739     {
740       GNUNET_MQ_send_cancel (ic->ev);
741       ic->ev = NULL;
742     }
743     GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head,
744                                  ic->h->iterate_tail,
745                                  ic);
746     GNUNET_free (ic);
747   }
748   else
749     ic->callback = NULL;
750 }
751
752
753 /**
754  * Iterate over records matching supplied key information
755  *
756  * @param h handle to the PEERSTORE service
757  * @param sub_system name of sub system
758  * @param peer Peer identity (can be NULL)
759  * @param key entry key string (can be NULL)
760  * @param timeout time after which the iterate request is canceled
761  * @param callback function called with each matching record, all NULL's on end
762  * @param callback_cls closure for @a callback
763  * @return Handle to iteration request
764  */
765 struct GNUNET_PEERSTORE_IterateContext *
766 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
767                           const char *sub_system,
768                           const struct GNUNET_PeerIdentity *peer,
769                           const char *key,
770                           struct GNUNET_TIME_Relative timeout,
771                           GNUNET_PEERSTORE_Processor callback,
772                           void *callback_cls)
773 {
774   struct GNUNET_MQ_Envelope *ev;
775   struct GNUNET_PEERSTORE_IterateContext *ic;
776
777   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, NULL, 0,
778                                             NULL, 0,
779                                             GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
780   ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
781
782   ic->callback = callback;
783   ic->callback_cls = callback_cls;
784   ic->ev = ev;
785   ic->h = h;
786   ic->iterating = GNUNET_NO;
787   GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head,
788                                     h->iterate_tail,
789                                     ic);
790   LOG (GNUNET_ERROR_TYPE_DEBUG,
791        "Sending an iterate request for sub system `%s'\n", sub_system);
792   GNUNET_MQ_notify_sent (ev, &iterate_request_sent, ic);
793   GNUNET_MQ_send (h->mq, ev);
794   ic->timeout_task =
795       GNUNET_SCHEDULER_add_delayed (timeout, &iterate_timeout, ic);
796   return ic;
797 }
798
799
800 /******************************************************************************/
801 /*******************            WATCH FUNCTIONS           *********************/
802 /******************************************************************************/
803
804 /**
805  * When a watch record is received
806  *
807  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
808  * @param msg message received, NULL on timeout or fatal error
809  */
810 static void
811 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
812 {
813   struct GNUNET_PEERSTORE_Handle *h = cls;
814   struct GNUNET_PEERSTORE_Record *record;
815   struct GNUNET_HashCode keyhash;
816   struct GNUNET_PEERSTORE_WatchContext *wc;
817
818   if (NULL == msg)
819   {
820     LOG (GNUNET_ERROR_TYPE_ERROR,
821          _("Problem receiving a watch response, no way to determine which request.\n"));
822     reconnect (h);
823     return;
824   }
825   LOG (GNUNET_ERROR_TYPE_DEBUG,
826        "Received a watch record from service.\n");
827   record = PEERSTORE_parse_record_message (msg);
828   PEERSTORE_hash_key (record->sub_system, record->peer, record->key, &keyhash);
829   wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
830   if (NULL == wc)
831   {
832     LOG (GNUNET_ERROR_TYPE_ERROR,
833          _("Received a watch result for a non existing watch.\n"));
834     PEERSTORE_destroy_record (record);
835     reconnect (h);
836     return;
837   }
838   if (NULL != wc->callback)
839     wc->callback (wc->callback_cls, record, NULL);
840   PEERSTORE_destroy_record (record);
841 }
842
843
844 /**
845  * Callback after MQ envelope is sent
846  *
847  * @param cls a `struct GNUNET_PEERSTORE_WatchContext *`
848  */
849 static void
850 watch_request_sent (void *cls)
851 {
852   struct GNUNET_PEERSTORE_WatchContext *wc = cls;
853
854   wc->request_sent = GNUNET_YES;
855   wc->ev = NULL;
856 }
857
858
859 /**
860  * Cancel a watch request
861  *
862  * @param wc handle to the watch request
863  */
864 void
865 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
866 {
867   struct GNUNET_PEERSTORE_Handle *h = wc->h;
868   struct GNUNET_MQ_Envelope *ev;
869   struct StoreKeyHashMessage *hm;
870
871   LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
872   if (GNUNET_YES == wc->request_sent)   /* If request already sent to service, send a cancel request. */
873   {
874     ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
875     hm->keyhash = wc->keyhash;
876     GNUNET_MQ_send (h->mq, ev);
877     wc->callback = NULL;
878     wc->callback_cls = NULL;
879   }
880   if (NULL != wc->ev)
881   {
882     GNUNET_MQ_send_cancel (wc->ev);
883     wc->ev = NULL;
884   }
885   GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc);
886   GNUNET_free (wc);
887
888 }
889
890
891 /**
892  * Request watching a given key
893  * User will be notified with any new values added to key
894  *
895  * @param h handle to the PEERSTORE service
896  * @param sub_system name of sub system
897  * @param peer Peer identity
898  * @param key entry key string
899  * @param callback function called with each new value
900  * @param callback_cls closure for @a callback
901  * @return Handle to watch request
902  */
903 struct GNUNET_PEERSTORE_WatchContext *
904 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
905                         const char *sub_system,
906                         const struct GNUNET_PeerIdentity *peer, const char *key,
907                         GNUNET_PEERSTORE_Processor callback, void *callback_cls)
908 {
909   struct GNUNET_MQ_Envelope *ev;
910   struct StoreKeyHashMessage *hm;
911   struct GNUNET_PEERSTORE_WatchContext *wc;
912
913   GNUNET_assert (NULL != sub_system);
914   GNUNET_assert (NULL != peer);
915   GNUNET_assert (NULL != key);
916   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
917   PEERSTORE_hash_key (sub_system, peer, key, &hm->keyhash);
918   wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
919
920   wc->callback = callback;
921   wc->callback_cls = callback_cls;
922   wc->ev = ev;
923   wc->h = h;
924   wc->request_sent = GNUNET_NO;
925   wc->keyhash = hm->keyhash;
926   if (NULL == h->watches)
927     h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
928   GNUNET_CONTAINER_multihashmap_put (h->watches, &wc->keyhash, wc,
929                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
930   LOG (GNUNET_ERROR_TYPE_DEBUG,
931        "Sending a watch request for ss `%s', peer `%s', key `%s'.\n",
932        sub_system, GNUNET_i2s (peer), key);
933   GNUNET_MQ_notify_sent (ev, &watch_request_sent, wc);
934   GNUNET_MQ_send (h->mq, ev);
935   return wc;
936 }
937
938 /* end of peerstore_api.c */