81d287b534ab77b6875c75285ca55f94806c5d37
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file peerstore/peerstore_api.c
23  * @brief API for peerstore
24  * @author Omar Tarabai
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
32
33 /******************************************************************************/
34 /************************      DATA STRUCTURES     ****************************/
35 /******************************************************************************/
36
37 /**
38  * Handle to the PEERSTORE service.
39  */
40 struct GNUNET_PEERSTORE_Handle
41 {
42
43   /**
44    * Our configuration.
45    */
46   const struct GNUNET_CONFIGURATION_Handle *cfg;
47
48   /**
49    * Connection to the service.
50    */
51   struct GNUNET_CLIENT_Connection *client;
52
53   /**
54    * Message queue
55    */
56   struct GNUNET_MQ_Handle *mq;
57
58   /**
59    * Head of active STORE requests.
60    */
61   struct GNUNET_PEERSTORE_StoreContext *store_head;
62
63   /**
64    * Tail of active STORE requests.
65    */
66   struct GNUNET_PEERSTORE_StoreContext *store_tail;
67
68   /**
69    * Head of active ITERATE requests.
70    */
71   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
72
73   /**
74    * Tail of active ITERATE requests.
75    */
76   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
77
78   /**
79    * Hashmap of watch requests
80    */
81   struct GNUNET_CONTAINER_MultiHashMap *watches;
82
83   /**
84    * Are we in the process of disconnecting but need to sync first?
85    */
86   int disconnecting;
87
88 };
89
90 /**
91  * Context for a store request
92  */
93 struct GNUNET_PEERSTORE_StoreContext
94 {
95   /**
96    * Kept in a DLL.
97    */
98   struct GNUNET_PEERSTORE_StoreContext *next;
99
100   /**
101    * Kept in a DLL.
102    */
103   struct GNUNET_PEERSTORE_StoreContext *prev;
104
105   /**
106    * Handle to the PEERSTORE service.
107    */
108   struct GNUNET_PEERSTORE_Handle *h;
109
110   /**
111    * MQ Envelope with store request message
112    */
113   struct GNUNET_MQ_Envelope *ev;
114
115   /**
116    * Continuation called with service response
117    */
118   GNUNET_PEERSTORE_Continuation cont;
119
120   /**
121    * Closure for 'cont'
122    */
123   void *cont_cls;
124
125 };
126
127 /**
128  * Context for a iterate request
129  */
130 struct GNUNET_PEERSTORE_IterateContext
131 {
132   /**
133    * Kept in a DLL.
134    */
135   struct GNUNET_PEERSTORE_IterateContext *next;
136
137   /**
138    * Kept in a DLL.
139    */
140   struct GNUNET_PEERSTORE_IterateContext *prev;
141
142   /**
143    * Handle to the PEERSTORE service.
144    */
145   struct GNUNET_PEERSTORE_Handle *h;
146
147   /**
148    * MQ Envelope with iterate request message
149    */
150   struct GNUNET_MQ_Envelope *ev;
151
152   /**
153    * Callback with each matching record
154    */
155   GNUNET_PEERSTORE_Processor callback;
156
157   /**
158    * Closure for 'callback'
159    */
160   void *callback_cls;
161
162   /**
163    * #GNUNET_YES / #GNUNET_NO
164    * if sent, cannot be canceled
165    */
166   int request_sent;
167
168   /**
169    * Task identifier for the function called
170    * on iterate request timeout
171    */
172   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
173
174 };
175
176 /**
177  * Context for a watch request
178  */
179 struct GNUNET_PEERSTORE_WatchContext
180 {
181   /**
182    * Kept in a DLL.
183    */
184   struct GNUNET_PEERSTORE_WatchContext *next;
185
186   /**
187    * Kept in a DLL.
188    */
189   struct GNUNET_PEERSTORE_WatchContext *prev;
190
191   /**
192    * Handle to the PEERSTORE service.
193    */
194   struct GNUNET_PEERSTORE_Handle *h;
195
196   /**
197    * MQ Envelope with watch request message
198    */
199   struct GNUNET_MQ_Envelope *ev;
200
201   /**
202    * Callback with each record received
203    */
204   GNUNET_PEERSTORE_Processor callback;
205
206   /**
207    * Closure for 'callback'
208    */
209   void *callback_cls;
210
211   /**
212    * Hash of the combined key
213    */
214   struct GNUNET_HashCode keyhash;
215
216   /**
217    * #GNUNET_YES / #GNUNET_NO
218    * if sent, cannot be canceled
219    */
220   int request_sent;
221
222 };
223
224 /******************************************************************************/
225 /*******************             DECLARATIONS             *********************/
226 /******************************************************************************/
227
228 /**
229  * When a response for iterate request is received
230  *
231  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
232  * @param msg message received, NULL on timeout or fatal error
233  */
234 static void
235 handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg);
236
237 /**
238  * When a watch record is received
239  *
240  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
241  * @param msg message received, NULL on timeout or fatal error
242  */
243 static void
244 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg);
245
246 /**
247  * Close the existing connection to PEERSTORE and reconnect.
248  *
249  * @param h handle to the service
250  */
251 static void
252 reconnect (struct GNUNET_PEERSTORE_Handle *h);
253
254 /**
255  * Callback after MQ envelope is sent
256  *
257  * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
258  */
259 static void watch_request_sent (void *cls);
260
261 /**
262  * Callback after MQ envelope is sent
263  *
264  * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
265  */
266 static void iterate_request_sent (void *cls);
267
268 /**
269  * Callback after MQ envelope is sent
270  *
271  * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
272  */
273 static void store_request_sent (void *cls);
274
275 /**
276  * MQ message handlers
277  */
278 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
279     {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
280     {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, sizeof(struct GNUNET_MessageHeader)},
281     {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
282     GNUNET_MQ_HANDLERS_END
283 };
284
285 /******************************************************************************/
286 /*******************         CONNECTION FUNCTIONS         *********************/
287 /******************************************************************************/
288
289 static void
290 handle_client_error (void *cls, enum GNUNET_MQ_Error error)
291 {
292   struct GNUNET_PEERSTORE_Handle *h = cls;
293
294   LOG(GNUNET_ERROR_TYPE_ERROR, _("Received an error notification from MQ of type: %d\n"), error);
295   reconnect(h);
296 }
297
298 /**
299  * Iterator over previous watches to resend them
300  */
301 static int rewatch_it(void *cls,
302     const struct GNUNET_HashCode *key,
303     void *value)
304 {
305   struct GNUNET_PEERSTORE_Handle *h = cls;
306   struct GNUNET_PEERSTORE_WatchContext *wc = value;
307   struct StoreKeyHashMessage *hm;
308
309   if(GNUNET_YES == wc->request_sent)
310   { /* Envelope gone, create new one. */
311     wc->ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
312     hm->keyhash = wc->keyhash;
313     wc->request_sent = GNUNET_NO;
314   }
315   GNUNET_MQ_notify_sent(wc->ev, &watch_request_sent, wc);
316   GNUNET_MQ_send(h->mq, wc->ev);
317   return GNUNET_YES;
318 }
319
320 /**
321  * Close the existing connection to PEERSTORE and reconnect.
322  *
323  * @param h handle to the service
324  */
325 static void
326 reconnect (struct GNUNET_PEERSTORE_Handle *h)
327 {
328   struct GNUNET_PEERSTORE_IterateContext *ic;
329   GNUNET_PEERSTORE_Processor icb;
330   void *icb_cls;
331   struct GNUNET_PEERSTORE_StoreContext *sc;
332
333   LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
334   if (NULL != h->mq)
335   {
336     GNUNET_MQ_destroy(h->mq);
337     h->mq = NULL;
338   }
339   if (NULL != h->client)
340   {
341     GNUNET_CLIENT_disconnect (h->client);
342     h->client = NULL;
343   }
344   h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
345   GNUNET_assert(NULL != h->client);
346   h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
347       mq_handlers,
348       &handle_client_error,
349       h);
350   LOG(GNUNET_ERROR_TYPE_DEBUG,
351       "Resending pending requests after reconnect.\n");
352   if (NULL != h->watches)
353   {
354     GNUNET_CONTAINER_multihashmap_iterate(h->watches,
355         &rewatch_it, h);
356   }
357   ic = h->iterate_head;
358   while (NULL != ic)
359   {
360     if (GNUNET_YES == ic->request_sent)
361     {
362       icb = ic->callback;
363       icb_cls = ic->callback_cls;
364       GNUNET_PEERSTORE_iterate_cancel(ic);
365       if(NULL != icb)
366         icb(icb_cls, NULL,_("Iteration canceled due to reconnection."));
367     }
368     else
369     {
370       GNUNET_MQ_notify_sent(ic->ev, &iterate_request_sent, ic);
371       GNUNET_MQ_send(h->mq, ic->ev);
372     }
373     ic = ic->next;
374   }
375   sc = h->store_head;
376   while (NULL != sc)
377   {
378     GNUNET_MQ_notify_sent(sc->ev, &store_request_sent, sc);
379     GNUNET_MQ_send(h->mq, sc->ev);
380     sc = sc->next;
381   }
382 }
383
384 /**
385  * Iterator over watch requests to cancel them.
386  *
387  * @param cls unsused
388  * @param key key to the watch request
389  * @param value watch context
390  * @return #GNUNET_YES to continue iteration
391  */
392 static int
393 destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
394 {
395   struct GNUNET_PEERSTORE_WatchContext *wc = value;
396
397   GNUNET_PEERSTORE_watch_cancel (wc);
398   return GNUNET_YES;
399 }
400
401 /**
402  * Kill the connection to the service. This can be delayed in case of pending
403  * STORE requests and the user explicitly asked to sync first. Otherwise it is
404  * performed instantly.
405  *
406  * @param h Handle to the service.
407  */
408 static void do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
409 {
410   if(NULL != h->mq)
411   {
412     GNUNET_MQ_destroy(h->mq);
413     h->mq = NULL;
414   }
415   if (NULL != h->client)
416   {
417     GNUNET_CLIENT_disconnect (h->client);
418     h->client = NULL;
419   }
420   GNUNET_free(h);
421 }
422
423 /**
424  * Connect to the PEERSTORE service.
425  *
426  * @return NULL on error
427  */
428 struct GNUNET_PEERSTORE_Handle *
429 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
430 {
431   struct GNUNET_PEERSTORE_Handle *h;
432
433   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
434   h->client = GNUNET_CLIENT_connect ("peerstore", cfg);
435   if(NULL == h->client)
436   {
437     GNUNET_free(h);
438     return NULL;
439   }
440   h->cfg = cfg;
441   h->disconnecting = GNUNET_NO;
442   h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
443       mq_handlers,
444       &handle_client_error,
445       h);
446   if(NULL == h->mq)
447   {
448     GNUNET_free(h);
449     return NULL;
450   }
451   LOG(GNUNET_ERROR_TYPE_DEBUG, "New connection created\n");
452   return h;
453 }
454
455 /**
456  * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
457  * will be canceled. Any pending STORE requests will depend on @snyc_first flag.
458  *
459  * @param h handle to disconnect
460  * @param sync_first send any pending STORE requests before disconnecting
461  */
462 void
463 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first)
464 {
465   struct GNUNET_PEERSTORE_IterateContext *ic;
466   struct GNUNET_PEERSTORE_IterateContext *ic_iter;
467   struct GNUNET_PEERSTORE_StoreContext *sc;
468   struct GNUNET_PEERSTORE_StoreContext *sc_iter;
469
470   LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
471   if(NULL != h->watches)
472   {
473     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
474     GNUNET_CONTAINER_multihashmap_destroy(h->watches);
475     h->watches = NULL;
476   }
477   ic_iter = h->iterate_head;
478   while (NULL != ic_iter)
479   {
480     ic = ic_iter;
481     ic_iter = ic_iter->next;
482     GNUNET_PEERSTORE_iterate_cancel (ic);
483   }
484   if (NULL != h->store_head)
485   {
486     if (GNUNET_YES == sync_first)
487     {
488       LOG (GNUNET_ERROR_TYPE_DEBUG,
489           "Delaying disconnection due to pending store requests.\n");
490       h->disconnecting = GNUNET_YES;
491       return;
492     }
493     sc_iter = h->store_head;
494     while (NULL != sc_iter)
495     {
496       sc = sc_iter;
497       sc_iter = sc_iter->next;
498       GNUNET_PEERSTORE_store_cancel (sc);
499     }
500   }
501   do_disconnect (h);
502 }
503
504
505 /******************************************************************************/
506 /*******************            STORE FUNCTIONS           *********************/
507 /******************************************************************************/
508
509 /**
510  * Callback after MQ envelope is sent
511  *
512  * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
513  */
514 static void store_request_sent (void *cls)
515 {
516   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
517   GNUNET_PEERSTORE_Continuation cont;
518   void *cont_cls;
519
520   sc->ev = NULL;
521   cont = sc->cont;
522   cont_cls = sc->cont_cls;
523   GNUNET_PEERSTORE_store_cancel(sc);
524   if(NULL != cont)
525     cont(cont_cls, GNUNET_OK);
526 }
527
528 /**
529  * Cancel a store request
530  *
531  * @param sc Store request context
532  */
533 void
534 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
535 {
536   struct GNUNET_PEERSTORE_Handle *h = sc->h;
537
538   if(NULL != sc->ev)
539   {
540     GNUNET_MQ_send_cancel(sc->ev);
541     sc->ev = NULL;
542   }
543   GNUNET_CONTAINER_DLL_remove(sc->h->store_head, sc->h->store_tail, sc);
544   GNUNET_free(sc);
545   if (GNUNET_YES == h->disconnecting && NULL == h->store_head)
546     do_disconnect (h);
547 }
548
549 /**
550  * Store a new entry in the PEERSTORE.
551  * Note that stored entries can be lost in some cases
552  * such as power failure.
553  *
554  * @param h Handle to the PEERSTORE service
555  * @param sub_system name of the sub system
556  * @param peer Peer Identity
557  * @param key entry key
558  * @param value entry value BLOB
559  * @param size size of 'value'
560  * @param expiry absolute time after which the entry is (possibly) deleted
561  * @param cont Continuation function after the store request is sent
562  * @param cont_cls Closure for 'cont'
563  */
564 struct GNUNET_PEERSTORE_StoreContext *
565 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
566     const char *sub_system,
567     const struct GNUNET_PeerIdentity *peer,
568     const char *key,
569     const void *value,
570     size_t size,
571     struct GNUNET_TIME_Absolute expiry,
572     enum GNUNET_PEERSTORE_StoreOption options,
573     GNUNET_PEERSTORE_Continuation cont,
574     void *cont_cls)
575 {
576   struct GNUNET_MQ_Envelope *ev;
577   struct GNUNET_PEERSTORE_StoreContext *sc;
578
579   LOG (GNUNET_ERROR_TYPE_DEBUG,
580       "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
581       size, sub_system, GNUNET_i2s (peer), key);
582   ev = PEERSTORE_create_record_mq_envelope(sub_system,
583       peer,
584       key,
585       value,
586       size,
587       &expiry,
588       options,
589       GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
590   sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext);
591   sc->ev = ev;
592   sc->cont = cont;
593   sc->cont_cls = cont_cls;
594   sc->h = h;
595   GNUNET_CONTAINER_DLL_insert_tail(h->store_head, h->store_tail, sc);
596   GNUNET_MQ_notify_sent(ev, &store_request_sent, sc);
597   GNUNET_MQ_send(h->mq, ev);
598   return sc;
599
600 }
601
602 /******************************************************************************/
603 /*******************           ITERATE FUNCTIONS          *********************/
604 /******************************************************************************/
605
606 /**
607  * When a response for iterate request is received
608  *
609  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
610  * @param msg message received, NULL on timeout or fatal error
611  */
612 static void
613 handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg)
614 {
615   struct GNUNET_PEERSTORE_Handle *h = cls;
616   struct GNUNET_PEERSTORE_IterateContext *ic;
617   GNUNET_PEERSTORE_Processor callback;
618   void *callback_cls;
619   uint16_t msg_type;
620   struct GNUNET_PEERSTORE_Record *record;
621   int continue_iter;
622
623   ic = h->iterate_head;
624   if(NULL == ic)
625   {
626     LOG(GNUNET_ERROR_TYPE_ERROR, _("Unexpected iteration response, this should not happen.\n"));
627     reconnect(h);
628     return;
629   }
630   callback = ic->callback;
631   callback_cls = ic->callback_cls;
632   if(NULL == msg) /* Connection error */
633   {
634
635     if(NULL != callback)
636       callback(callback_cls, NULL,
637           _("Error communicating with `PEERSTORE' service."));
638     reconnect(h);
639     return;
640   }
641   msg_type = ntohs(msg->type);
642   if(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
643   {
644     ic->request_sent = GNUNET_NO;
645     GNUNET_PEERSTORE_iterate_cancel(ic);
646     if(NULL != callback)
647       callback(callback_cls, NULL, NULL);
648     return;
649   }
650   if(NULL != callback)
651   {
652     record = PEERSTORE_parse_record_message(msg);
653     if(NULL == record)
654       continue_iter = callback(callback_cls, NULL, _("Received a malformed response from service."));
655     else
656     {
657       continue_iter = callback(callback_cls, record, NULL);
658       PEERSTORE_destroy_record(record);
659     }
660     if(GNUNET_NO == continue_iter)
661       ic->callback = NULL;
662   }
663
664 }
665
666 /**
667  * Callback after MQ envelope is sent
668  *
669  * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
670  */
671 static void iterate_request_sent (void *cls)
672 {
673   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
674
675   LOG(GNUNET_ERROR_TYPE_DEBUG, "Iterate request sent to service.\n");
676   ic->request_sent = GNUNET_YES;
677   ic->ev = NULL;
678 }
679
680 /**
681  * Called when the iterate request is timedout
682  *
683  * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
684  */
685 static void
686 iterate_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
687 {
688   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
689
690   ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
691   GNUNET_PEERSTORE_iterate_cancel(ic);
692 }
693
694 /**
695  * Cancel an iterate request
696  * Please do not call after the iterate request is done
697  *
698  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
699  */
700 void
701 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
702 {
703   if(GNUNET_SCHEDULER_NO_TASK != ic->timeout_task)
704   {
705     GNUNET_SCHEDULER_cancel(ic->timeout_task);
706     ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
707   }
708   if(GNUNET_NO == ic->request_sent)
709   {
710     if(NULL != ic->ev)
711     {
712       GNUNET_MQ_send_cancel(ic->ev);
713       ic->ev = NULL;
714     }
715     GNUNET_CONTAINER_DLL_remove(ic->h->iterate_head, ic->h->iterate_tail, ic);
716     GNUNET_free(ic);
717   }
718   else
719     ic->callback = NULL;
720 }
721
722 /**
723  * Iterate over records matching supplied key information
724  *
725  * @param h handle to the PEERSTORE service
726  * @param sub_system name of sub system
727  * @param peer Peer identity (can be NULL)
728  * @param key entry key string (can be NULL)
729  * @param timeout time after which the iterate request is canceled
730  * @param callback function called with each matching record, all NULL's on end
731  * @param callback_cls closure for @a callback
732  * @return Handle to iteration request
733  */
734 struct GNUNET_PEERSTORE_IterateContext *
735 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
736     const char *sub_system,
737     const struct GNUNET_PeerIdentity *peer,
738     const char *key,
739     struct GNUNET_TIME_Relative timeout,
740     GNUNET_PEERSTORE_Processor callback, void *callback_cls)
741 {
742   struct GNUNET_MQ_Envelope *ev;
743   struct GNUNET_PEERSTORE_IterateContext *ic;
744
745   ev = PEERSTORE_create_record_mq_envelope(sub_system,
746       peer,
747       key,
748       NULL,
749       0,
750       NULL,
751       0,
752       GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
753   ic = GNUNET_new(struct GNUNET_PEERSTORE_IterateContext);
754   ic->callback = callback;
755   ic->callback_cls = callback_cls;
756   ic->ev = ev;
757   ic->h = h;
758   ic->request_sent = GNUNET_NO;
759   GNUNET_CONTAINER_DLL_insert_tail(h->iterate_head, h->iterate_tail, ic);
760   LOG(GNUNET_ERROR_TYPE_DEBUG,
761         "Sending an iterate request for sub system `%s'\n", sub_system);
762   GNUNET_MQ_notify_sent(ev, &iterate_request_sent, ic);
763   GNUNET_MQ_send(h->mq, ev);
764   ic->timeout_task = GNUNET_SCHEDULER_add_delayed(timeout, &iterate_timeout, ic);
765   return ic;
766 }
767
768 /******************************************************************************/
769 /*******************            WATCH FUNCTIONS           *********************/
770 /******************************************************************************/
771
772 /**
773  * When a watch record is received
774  *
775  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
776  * @param msg message received, NULL on timeout or fatal error
777  */
778 static void
779 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
780 {
781   struct GNUNET_PEERSTORE_Handle *h = cls;
782   struct GNUNET_PEERSTORE_Record *record;
783   struct GNUNET_HashCode keyhash;
784   struct GNUNET_PEERSTORE_WatchContext *wc;
785
786   if(NULL == msg)
787   {
788     LOG(GNUNET_ERROR_TYPE_ERROR,
789         _("Problem receiving a watch response, no way to determine which request.\n"));
790     reconnect(h);
791     return;
792   }
793   LOG(GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
794   record = PEERSTORE_parse_record_message(msg);
795   PEERSTORE_hash_key(record->sub_system,
796       record->peer, record->key, &keyhash);
797   wc = GNUNET_CONTAINER_multihashmap_get(h->watches, &keyhash);
798   if(NULL == wc)
799   {
800     LOG(GNUNET_ERROR_TYPE_ERROR,
801         _("Received a watch result for a non existing watch.\n"));
802     reconnect(h);
803     return;
804   }
805   if(NULL != wc->callback)
806     wc->callback(wc->callback_cls, record, NULL);
807   PEERSTORE_destroy_record(record);
808 }
809
810 /**
811  * Callback after MQ envelope is sent
812  *
813  * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
814  */
815 static void watch_request_sent (void *cls)
816 {
817   struct GNUNET_PEERSTORE_WatchContext *wc = cls;
818
819   wc->request_sent = GNUNET_YES;
820   wc->ev = NULL;
821 }
822
823 /**
824  * Cancel a watch request
825  *
826  * @wc handle to the watch request
827  */
828 void
829 GNUNET_PEERSTORE_watch_cancel(struct GNUNET_PEERSTORE_WatchContext *wc)
830 {
831   struct GNUNET_PEERSTORE_Handle *h = wc->h;
832   struct GNUNET_MQ_Envelope *ev;
833   struct StoreKeyHashMessage *hm;
834
835   LOG(GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
836   if(GNUNET_YES == wc->request_sent) /* If request already sent to service, send a cancel request. */
837   {
838     ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
839     hm->keyhash = wc->keyhash;
840     GNUNET_MQ_send(h->mq, ev);
841     wc->callback = NULL;
842     wc->callback_cls = NULL;
843   }
844   if(NULL != wc->ev)
845   {
846     GNUNET_MQ_send_cancel(wc->ev);
847     wc->ev = NULL;
848   }
849   GNUNET_CONTAINER_multihashmap_remove(h->watches, &wc->keyhash, wc);
850   GNUNET_free(wc);
851
852 }
853
854 /**
855  * Request watching a given key
856  * User will be notified with any new values added to key
857  *
858  * @param h handle to the PEERSTORE service
859  * @param sub_system name of sub system
860  * @param peer Peer identity
861  * @param key entry key string
862  * @param callback function called with each new value
863  * @param callback_cls closure for @a callback
864  * @return Handle to watch request
865  */
866 struct GNUNET_PEERSTORE_WatchContext *
867 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
868     const char *sub_system,
869     const struct GNUNET_PeerIdentity *peer,
870     const char *key,
871     GNUNET_PEERSTORE_Processor callback, void *callback_cls)
872 {
873   struct GNUNET_MQ_Envelope *ev;
874   struct StoreKeyHashMessage *hm;
875   struct GNUNET_PEERSTORE_WatchContext *wc;
876
877   GNUNET_assert(NULL != sub_system);
878   GNUNET_assert(NULL != peer);
879   GNUNET_assert(NULL != key);
880   ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
881   PEERSTORE_hash_key(sub_system, peer, key, &hm->keyhash);
882   wc = GNUNET_new(struct GNUNET_PEERSTORE_WatchContext);
883   wc->callback = callback;
884   wc->callback_cls = callback_cls;
885   wc->ev = ev;
886   wc->h = h;
887   wc->request_sent = GNUNET_NO;
888   wc->keyhash = hm->keyhash;
889   if(NULL == h->watches)
890     h->watches = GNUNET_CONTAINER_multihashmap_create(5, GNUNET_NO);
891   GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash,
892       wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
893   LOG(GNUNET_ERROR_TYPE_DEBUG,
894       "Sending a watch request for ss `%s', peer `%s', key `%s'.\n",
895       sub_system, GNUNET_i2s(peer), key);
896   GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc);
897   GNUNET_MQ_send(h->mq, ev);
898   return wc;
899 }
900
901 /* end of peerstore_api.c */