bebf9678ee3fc4a11a6eebdf4ce8f53a6a2ea6fa
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file peerstore/peerstore_api.c
23  * @brief API for peerstore
24  * @author Omar Tarabai
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
32
33 /******************************************************************************/
34 /************************      DATA STRUCTURES     ****************************/
35 /******************************************************************************/
36
37 /**
38  * Handle to the PEERSTORE service.
39  */
40 struct GNUNET_PEERSTORE_Handle
41 {
42
43   /**
44    * Our configuration.
45    */
46   const struct GNUNET_CONFIGURATION_Handle *cfg;
47
48   /**
49    * Connection to the service.
50    */
51   struct GNUNET_CLIENT_Connection *client;
52
53   /**
54    * Message queue
55    */
56   struct GNUNET_MQ_Handle *mq;
57
58   /**
59    * Head of active STORE requests.
60    */
61   struct GNUNET_PEERSTORE_StoreContext *store_head;
62
63   /**
64    * Tail of active STORE requests.
65    */
66   struct GNUNET_PEERSTORE_StoreContext *store_tail;
67
68   /**
69    * Head of active ITERATE requests.
70    */
71   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
72
73   /**
74    * Tail of active ITERATE requests.
75    */
76   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
77
78   /**
79    * Hashmap of watch requests
80    */
81   struct GNUNET_CONTAINER_MultiHashMap *watches;
82
83   /**
84    * Are we in the process of disconnecting but need to sync first?
85    */
86   int disconnecting;
87
88 };
89
90 /**
91  * Context for a store request
92  */
93 struct GNUNET_PEERSTORE_StoreContext
94 {
95   /**
96    * Kept in a DLL.
97    */
98   struct GNUNET_PEERSTORE_StoreContext *next;
99
100   /**
101    * Kept in a DLL.
102    */
103   struct GNUNET_PEERSTORE_StoreContext *prev;
104
105   /**
106    * Handle to the PEERSTORE service.
107    */
108   struct GNUNET_PEERSTORE_Handle *h;
109
110   /**
111    * MQ Envelope with store request message
112    */
113   struct GNUNET_MQ_Envelope *ev;
114
115   /**
116    * Continuation called with service response
117    */
118   GNUNET_PEERSTORE_Continuation cont;
119
120   /**
121    * Closure for 'cont'
122    */
123   void *cont_cls;
124
125 };
126
127 /**
128  * Context for a iterate request
129  */
130 struct GNUNET_PEERSTORE_IterateContext
131 {
132   /**
133    * Kept in a DLL.
134    */
135   struct GNUNET_PEERSTORE_IterateContext *next;
136
137   /**
138    * Kept in a DLL.
139    */
140   struct GNUNET_PEERSTORE_IterateContext *prev;
141
142   /**
143    * Handle to the PEERSTORE service.
144    */
145   struct GNUNET_PEERSTORE_Handle *h;
146
147   /**
148    * MQ Envelope with iterate request message
149    */
150   struct GNUNET_MQ_Envelope *ev;
151
152   /**
153    * Callback with each matching record
154    */
155   GNUNET_PEERSTORE_Processor callback;
156
157   /**
158    * Closure for 'callback'
159    */
160   void *callback_cls;
161
162   /**
163    * #GNUNET_YES / #GNUNET_NO
164    * if sent, cannot be canceled
165    */
166   int request_sent;
167
168   /**
169    * Task identifier for the function called
170    * on iterate request timeout
171    */
172   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
173
174 };
175
176 /**
177  * Context for a watch request
178  */
179 struct GNUNET_PEERSTORE_WatchContext
180 {
181   /**
182    * Kept in a DLL.
183    */
184   struct GNUNET_PEERSTORE_WatchContext *next;
185
186   /**
187    * Kept in a DLL.
188    */
189   struct GNUNET_PEERSTORE_WatchContext *prev;
190
191   /**
192    * Handle to the PEERSTORE service.
193    */
194   struct GNUNET_PEERSTORE_Handle *h;
195
196   /**
197    * MQ Envelope with watch request message
198    */
199   struct GNUNET_MQ_Envelope *ev;
200
201   /**
202    * Callback with each record received
203    */
204   GNUNET_PEERSTORE_Processor callback;
205
206   /**
207    * Closure for 'callback'
208    */
209   void *callback_cls;
210
211   /**
212    * Hash of the combined key
213    */
214   struct GNUNET_HashCode keyhash;
215
216   /**
217    * #GNUNET_YES / #GNUNET_NO
218    * if sent, cannot be canceled
219    */
220   int request_sent;
221
222 };
223
224 /******************************************************************************/
225 /*******************             DECLARATIONS             *********************/
226 /******************************************************************************/
227
228 /**
229  * When a response for iterate request is received
230  *
231  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
232  * @param msg message received, NULL on timeout or fatal error
233  */
234 static void
235 handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg);
236
237 /**
238  * When a watch record is received
239  *
240  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
241  * @param msg message received, NULL on timeout or fatal error
242  */
243 static void
244 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg);
245
246 /**
247  * Close the existing connection to PEERSTORE and reconnect.
248  *
249  * @param h handle to the service
250  */
251 static void
252 reconnect (struct GNUNET_PEERSTORE_Handle *h);
253
254 /**
255  * Callback after MQ envelope is sent
256  *
257  * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
258  */
259 static void
260 watch_request_sent (void *cls);
261
262 /**
263  * Callback after MQ envelope is sent
264  *
265  * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
266  */
267 static void
268 iterate_request_sent (void *cls);
269
270 /**
271  * Callback after MQ envelope is sent
272  *
273  * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
274  */
275 static void
276 store_request_sent (void *cls);
277
278 /**
279  * MQ message handlers
280  */
281 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
282   {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
283   {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
284    sizeof (struct GNUNET_MessageHeader)},
285   {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
286   GNUNET_MQ_HANDLERS_END
287 };
288
289 /******************************************************************************/
290 /*******************         CONNECTION FUNCTIONS         *********************/
291 /******************************************************************************/
292
293 static void
294 handle_client_error (void *cls, enum GNUNET_MQ_Error error)
295 {
296   struct GNUNET_PEERSTORE_Handle *h = cls;
297
298   LOG (GNUNET_ERROR_TYPE_ERROR,
299        _("Received an error notification from MQ of type: %d\n"), error);
300   reconnect (h);
301 }
302
303
304 /**
305  * Iterator over previous watches to resend them
306  */
307 static int
308 rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
309 {
310   struct GNUNET_PEERSTORE_Handle *h = cls;
311   struct GNUNET_PEERSTORE_WatchContext *wc = value;
312   struct StoreKeyHashMessage *hm;
313
314   if (GNUNET_YES == wc->request_sent)
315   {                             /* Envelope gone, create new one. */
316     wc->ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
317     hm->keyhash = wc->keyhash;
318     wc->request_sent = GNUNET_NO;
319   }
320   GNUNET_MQ_notify_sent (wc->ev, &watch_request_sent, wc);
321   GNUNET_MQ_send (h->mq, wc->ev);
322   return GNUNET_YES;
323 }
324
325
326 /**
327  * Close the existing connection to PEERSTORE and reconnect.
328  *
329  * @param h handle to the service
330  */
331 static void
332 reconnect (struct GNUNET_PEERSTORE_Handle *h)
333 {
334   struct GNUNET_PEERSTORE_IterateContext *ic;
335   GNUNET_PEERSTORE_Processor icb;
336   void *icb_cls;
337   struct GNUNET_PEERSTORE_StoreContext *sc;
338
339   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
340   if (NULL != h->mq)
341   {
342     GNUNET_MQ_destroy (h->mq);
343     h->mq = NULL;
344   }
345   if (NULL != h->client)
346   {
347     GNUNET_CLIENT_disconnect (h->client);
348     h->client = NULL;
349   }
350   h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
351   GNUNET_assert (NULL != h->client);
352   h->mq =
353       GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
354                                              &handle_client_error, h);
355   LOG (GNUNET_ERROR_TYPE_DEBUG,
356        "Resending pending requests after reconnect.\n");
357   if (NULL != h->watches)
358   {
359     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
360   }
361   ic = h->iterate_head;
362   while (NULL != ic)
363   {
364     if (GNUNET_YES == ic->request_sent)
365     {
366       icb = ic->callback;
367       icb_cls = ic->callback_cls;
368       GNUNET_PEERSTORE_iterate_cancel (ic);
369       if (NULL != icb)
370         icb (icb_cls, NULL, _("Iteration canceled due to reconnection."));
371     }
372     else
373     {
374       GNUNET_MQ_notify_sent (ic->ev, &iterate_request_sent, ic);
375       GNUNET_MQ_send (h->mq, ic->ev);
376     }
377     ic = ic->next;
378   }
379   sc = h->store_head;
380   while (NULL != sc)
381   {
382     GNUNET_MQ_notify_sent (sc->ev, &store_request_sent, sc);
383     GNUNET_MQ_send (h->mq, sc->ev);
384     sc = sc->next;
385   }
386 }
387
388
389 /**
390  * Iterator over watch requests to cancel them.
391  *
392  * @param cls unsused
393  * @param key key to the watch request
394  * @param value watch context
395  * @return #GNUNET_YES to continue iteration
396  */
397 static int
398 destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
399 {
400   struct GNUNET_PEERSTORE_WatchContext *wc = value;
401
402   GNUNET_PEERSTORE_watch_cancel (wc);
403   return GNUNET_YES;
404 }
405
406
407 /**
408  * Kill the connection to the service. This can be delayed in case of pending
409  * STORE requests and the user explicitly asked to sync first. Otherwise it is
410  * performed instantly.
411  *
412  * @param h Handle to the service.
413  */
414 static void
415 do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
416 {
417   if (NULL != h->mq)
418   {
419     GNUNET_MQ_destroy (h->mq);
420     h->mq = NULL;
421   }
422   if (NULL != h->client)
423   {
424     GNUNET_CLIENT_disconnect (h->client);
425     h->client = NULL;
426   }
427   GNUNET_free (h);
428 }
429
430
431 /**
432  * Connect to the PEERSTORE service.
433  *
434  * @return NULL on error
435  */
436 struct GNUNET_PEERSTORE_Handle *
437 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
438 {
439   struct GNUNET_PEERSTORE_Handle *h;
440
441   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
442
443   h->client = GNUNET_CLIENT_connect ("peerstore", cfg);
444   if (NULL == h->client)
445   {
446     GNUNET_free (h);
447     return NULL;
448   }
449   h->cfg = cfg;
450   h->disconnecting = GNUNET_NO;
451   h->mq =
452       GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
453                                              &handle_client_error, h);
454   if (NULL == h->mq)
455   {
456     GNUNET_free (h);
457     return NULL;
458   }
459   LOG (GNUNET_ERROR_TYPE_DEBUG, "New connection created\n");
460   return h;
461 }
462
463
464 /**
465  * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
466  * will be canceled.
467  * Any pending STORE requests will depend on @e snyc_first flag.
468  *
469  * @param h handle to disconnect
470  * @param sync_first send any pending STORE requests before disconnecting
471  */
472 void
473 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first)
474 {
475   struct GNUNET_PEERSTORE_IterateContext *ic;
476   struct GNUNET_PEERSTORE_IterateContext *ic_iter;
477   struct GNUNET_PEERSTORE_StoreContext *sc;
478   struct GNUNET_PEERSTORE_StoreContext *sc_iter;
479
480   LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
481   if (NULL != h->watches)
482   {
483     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
484     GNUNET_CONTAINER_multihashmap_destroy (h->watches);
485     h->watches = NULL;
486   }
487   ic_iter = h->iterate_head;
488   while (NULL != ic_iter)
489   {
490     ic = ic_iter;
491     ic_iter = ic_iter->next;
492     GNUNET_PEERSTORE_iterate_cancel (ic);
493   }
494   if (NULL != h->store_head)
495   {
496     if (GNUNET_YES == sync_first)
497     {
498       LOG (GNUNET_ERROR_TYPE_DEBUG,
499            "Delaying disconnection due to pending store requests.\n");
500       h->disconnecting = GNUNET_YES;
501       return;
502     }
503     sc_iter = h->store_head;
504     while (NULL != sc_iter)
505     {
506       sc = sc_iter;
507       sc_iter = sc_iter->next;
508       GNUNET_PEERSTORE_store_cancel (sc);
509     }
510   }
511   do_disconnect (h);
512 }
513
514
515 /******************************************************************************/
516 /*******************            STORE FUNCTIONS           *********************/
517 /******************************************************************************/
518
519 /**
520  * Callback after MQ envelope is sent
521  *
522  * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
523  */
524 static void
525 store_request_sent (void *cls)
526 {
527   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
528   GNUNET_PEERSTORE_Continuation cont;
529   void *cont_cls;
530
531   sc->ev = NULL;
532   cont = sc->cont;
533   cont_cls = sc->cont_cls;
534   GNUNET_PEERSTORE_store_cancel (sc);
535   if (NULL != cont)
536     cont (cont_cls, GNUNET_OK);
537 }
538
539
540 /**
541  * Cancel a store request
542  *
543  * @param sc Store request context
544  */
545 void
546 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
547 {
548   struct GNUNET_PEERSTORE_Handle *h = sc->h;
549
550   if (NULL != sc->ev)
551   {
552     GNUNET_MQ_send_cancel (sc->ev);
553     sc->ev = NULL;
554   }
555   GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
556   GNUNET_free (sc);
557   if (GNUNET_YES == h->disconnecting && NULL == h->store_head)
558     do_disconnect (h);
559 }
560
561
562 /**
563  * Store a new entry in the PEERSTORE.
564  * Note that stored entries can be lost in some cases
565  * such as power failure.
566  *
567  * @param h Handle to the PEERSTORE service
568  * @param sub_system name of the sub system
569  * @param peer Peer Identity
570  * @param key entry key
571  * @param value entry value BLOB
572  * @param size size of @e value
573  * @param expiry absolute time after which the entry is (possibly) deleted
574  * @param options options specific to the storage operation
575  * @param cont Continuation function after the store request is sent
576  * @param cont_cls Closure for 'cont'
577  */
578 struct GNUNET_PEERSTORE_StoreContext *
579 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
580                         const char *sub_system,
581                         const struct GNUNET_PeerIdentity *peer, const char *key,
582                         const void *value, size_t size,
583                         struct GNUNET_TIME_Absolute expiry,
584                         enum GNUNET_PEERSTORE_StoreOption options,
585                         GNUNET_PEERSTORE_Continuation cont, void *cont_cls)
586 {
587   struct GNUNET_MQ_Envelope *ev;
588   struct GNUNET_PEERSTORE_StoreContext *sc;
589
590   LOG (GNUNET_ERROR_TYPE_DEBUG,
591        "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
592        size, sub_system, GNUNET_i2s (peer), key);
593   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size,
594                                             &expiry, options,
595                                             GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
596   sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
597
598   sc->ev = ev;
599   sc->cont = cont;
600   sc->cont_cls = cont_cls;
601   sc->h = h;
602   GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
603   GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
604   GNUNET_MQ_send (h->mq, ev);
605   return sc;
606
607 }
608
609
610 /******************************************************************************/
611 /*******************           ITERATE FUNCTIONS          *********************/
612 /******************************************************************************/
613
614 /**
615  * When a response for iterate request is received
616  *
617  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
618  * @param msg message received, NULL on timeout or fatal error
619  */
620 static void
621 handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg)
622 {
623   struct GNUNET_PEERSTORE_Handle *h = cls;
624   struct GNUNET_PEERSTORE_IterateContext *ic;
625   GNUNET_PEERSTORE_Processor callback;
626   void *callback_cls;
627   uint16_t msg_type;
628   struct GNUNET_PEERSTORE_Record *record;
629   int continue_iter;
630
631   ic = h->iterate_head;
632   if (NULL == ic)
633   {
634     LOG (GNUNET_ERROR_TYPE_ERROR,
635          _("Unexpected iteration response, this should not happen.\n"));
636     reconnect (h);
637     return;
638   }
639   callback = ic->callback;
640   callback_cls = ic->callback_cls;
641   if (NULL == msg)              /* Connection error */
642   {
643
644     if (NULL != callback)
645       callback (callback_cls, NULL,
646                 _("Error communicating with `PEERSTORE' service."));
647     reconnect (h);
648     return;
649   }
650   msg_type = ntohs (msg->type);
651   if (GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
652   {
653     ic->request_sent = GNUNET_NO;
654     GNUNET_PEERSTORE_iterate_cancel (ic);
655     if (NULL != callback)
656       callback (callback_cls, NULL, NULL);
657     return;
658   }
659   if (NULL != callback)
660   {
661     record = PEERSTORE_parse_record_message (msg);
662     if (NULL == record)
663       continue_iter =
664           callback (callback_cls, NULL,
665                     _("Received a malformed response from service."));
666     else
667     {
668       continue_iter = callback (callback_cls, record, NULL);
669       PEERSTORE_destroy_record (record);
670     }
671     if (GNUNET_NO == continue_iter)
672       ic->callback = NULL;
673   }
674 }
675
676
677 /**
678  * Callback after MQ envelope is sent
679  *
680  * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
681  */
682 static void
683 iterate_request_sent (void *cls)
684 {
685   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
686
687   LOG (GNUNET_ERROR_TYPE_DEBUG, "Iterate request sent to service.\n");
688   ic->request_sent = GNUNET_YES;
689   ic->ev = NULL;
690 }
691
692
693 /**
694  * Called when the iterate request is timedout
695  *
696  * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
697  * @param tc Scheduler task context (unused)
698  */
699 static void
700 iterate_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
701 {
702   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
703
704   ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
705   GNUNET_PEERSTORE_iterate_cancel (ic);
706 }
707
708
709 /**
710  * Cancel an iterate request
711  * Please do not call after the iterate request is done
712  *
713  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
714  */
715 void
716 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
717 {
718   if (GNUNET_SCHEDULER_NO_TASK != ic->timeout_task)
719   {
720     GNUNET_SCHEDULER_cancel (ic->timeout_task);
721     ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
722   }
723   if (GNUNET_NO == ic->request_sent)
724   {
725     if (NULL != ic->ev)
726     {
727       GNUNET_MQ_send_cancel (ic->ev);
728       ic->ev = NULL;
729     }
730     GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
731     GNUNET_free (ic);
732   }
733   else
734     ic->callback = NULL;
735 }
736
737
738 /**
739  * Iterate over records matching supplied key information
740  *
741  * @param h handle to the PEERSTORE service
742  * @param sub_system name of sub system
743  * @param peer Peer identity (can be NULL)
744  * @param key entry key string (can be NULL)
745  * @param timeout time after which the iterate request is canceled
746  * @param callback function called with each matching record, all NULL's on end
747  * @param callback_cls closure for @a callback
748  * @return Handle to iteration request
749  */
750 struct GNUNET_PEERSTORE_IterateContext *
751 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
752                           const char *sub_system,
753                           const struct GNUNET_PeerIdentity *peer,
754                           const char *key, struct GNUNET_TIME_Relative timeout,
755                           GNUNET_PEERSTORE_Processor callback,
756                           void *callback_cls)
757 {
758   struct GNUNET_MQ_Envelope *ev;
759   struct GNUNET_PEERSTORE_IterateContext *ic;
760
761   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, NULL, 0,
762                                             NULL, 0,
763                                             GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
764   ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
765
766   ic->callback = callback;
767   ic->callback_cls = callback_cls;
768   ic->ev = ev;
769   ic->h = h;
770   ic->request_sent = GNUNET_NO;
771   GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, h->iterate_tail, ic);
772   LOG (GNUNET_ERROR_TYPE_DEBUG,
773        "Sending an iterate request for sub system `%s'\n", sub_system);
774   GNUNET_MQ_notify_sent (ev, &iterate_request_sent, ic);
775   GNUNET_MQ_send (h->mq, ev);
776   ic->timeout_task =
777       GNUNET_SCHEDULER_add_delayed (timeout, &iterate_timeout, ic);
778   return ic;
779 }
780
781
782 /******************************************************************************/
783 /*******************            WATCH FUNCTIONS           *********************/
784 /******************************************************************************/
785
786 /**
787  * When a watch record is received
788  *
789  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
790  * @param msg message received, NULL on timeout or fatal error
791  */
792 static void
793 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
794 {
795   struct GNUNET_PEERSTORE_Handle *h = cls;
796   struct GNUNET_PEERSTORE_Record *record;
797   struct GNUNET_HashCode keyhash;
798   struct GNUNET_PEERSTORE_WatchContext *wc;
799
800   if (NULL == msg)
801   {
802     LOG (GNUNET_ERROR_TYPE_ERROR,
803          _
804          ("Problem receiving a watch response, no way to determine which request.\n"));
805     reconnect (h);
806     return;
807   }
808   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
809   record = PEERSTORE_parse_record_message (msg);
810   PEERSTORE_hash_key (record->sub_system, record->peer, record->key, &keyhash);
811   wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
812   if (NULL == wc)
813   {
814     LOG (GNUNET_ERROR_TYPE_ERROR,
815          _("Received a watch result for a non existing watch.\n"));
816     reconnect (h);
817     return;
818   }
819   if (NULL != wc->callback)
820     wc->callback (wc->callback_cls, record, NULL);
821   PEERSTORE_destroy_record (record);
822 }
823
824
825 /**
826  * Callback after MQ envelope is sent
827  *
828  * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
829  */
830 static void
831 watch_request_sent (void *cls)
832 {
833   struct GNUNET_PEERSTORE_WatchContext *wc = cls;
834
835   wc->request_sent = GNUNET_YES;
836   wc->ev = NULL;
837 }
838
839
840 /**
841  * Cancel a watch request
842  *
843  * @param wc handle to the watch request
844  */
845 void
846 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
847 {
848   struct GNUNET_PEERSTORE_Handle *h = wc->h;
849   struct GNUNET_MQ_Envelope *ev;
850   struct StoreKeyHashMessage *hm;
851
852   LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
853   if (GNUNET_YES == wc->request_sent)   /* If request already sent to service, send a cancel request. */
854   {
855     ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
856     hm->keyhash = wc->keyhash;
857     GNUNET_MQ_send (h->mq, ev);
858     wc->callback = NULL;
859     wc->callback_cls = NULL;
860   }
861   if (NULL != wc->ev)
862   {
863     GNUNET_MQ_send_cancel (wc->ev);
864     wc->ev = NULL;
865   }
866   GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc);
867   GNUNET_free (wc);
868
869 }
870
871
872 /**
873  * Request watching a given key
874  * User will be notified with any new values added to key
875  *
876  * @param h handle to the PEERSTORE service
877  * @param sub_system name of sub system
878  * @param peer Peer identity
879  * @param key entry key string
880  * @param callback function called with each new value
881  * @param callback_cls closure for @a callback
882  * @return Handle to watch request
883  */
884 struct GNUNET_PEERSTORE_WatchContext *
885 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
886                         const char *sub_system,
887                         const struct GNUNET_PeerIdentity *peer, const char *key,
888                         GNUNET_PEERSTORE_Processor callback, void *callback_cls)
889 {
890   struct GNUNET_MQ_Envelope *ev;
891   struct StoreKeyHashMessage *hm;
892   struct GNUNET_PEERSTORE_WatchContext *wc;
893
894   GNUNET_assert (NULL != sub_system);
895   GNUNET_assert (NULL != peer);
896   GNUNET_assert (NULL != key);
897   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
898   PEERSTORE_hash_key (sub_system, peer, key, &hm->keyhash);
899   wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
900
901   wc->callback = callback;
902   wc->callback_cls = callback_cls;
903   wc->ev = ev;
904   wc->h = h;
905   wc->request_sent = GNUNET_NO;
906   wc->keyhash = hm->keyhash;
907   if (NULL == h->watches)
908     h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
909   GNUNET_CONTAINER_multihashmap_put (h->watches, &wc->keyhash, wc,
910                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
911   LOG (GNUNET_ERROR_TYPE_DEBUG,
912        "Sending a watch request for ss `%s', peer `%s', key `%s'.\n",
913        sub_system, GNUNET_i2s (peer), key);
914   GNUNET_MQ_notify_sent (ev, &watch_request_sent, wc);
915   GNUNET_MQ_send (h->mq, ev);
916   return wc;
917 }
918
919 /* end of peerstore_api.c */