- moved timeout handling responsibility from for nat tests from caller to the library
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file peerstore/peerstore_api.c
23  * @brief API for peerstore
24  * @author Omar Tarabai
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
32
33 /******************************************************************************/
34 /************************      DATA STRUCTURES     ****************************/
35 /******************************************************************************/
36
37 /**
38  * Handle to the PEERSTORE service.
39  */
40 struct GNUNET_PEERSTORE_Handle
41 {
42
43   /**
44    * Our configuration.
45    */
46   const struct GNUNET_CONFIGURATION_Handle *cfg;
47
48   /**
49    * Connection to the service.
50    */
51   struct GNUNET_CLIENT_Connection *client;
52
53   /**
54    * Message queue
55    */
56   struct GNUNET_MQ_Handle *mq;
57
58   /**
59    * Head of active STORE requests.
60    */
61   struct GNUNET_PEERSTORE_StoreContext *store_head;
62
63   /**
64    * Tail of active STORE requests.
65    */
66   struct GNUNET_PEERSTORE_StoreContext *store_tail;
67
68   /**
69    * Head of active ITERATE requests.
70    */
71   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
72
73   /**
74    * Tail of active ITERATE requests.
75    */
76   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
77
78   /**
79    * Hashmap of watch requests
80    */
81   struct GNUNET_CONTAINER_MultiHashMap *watches;
82
83 };
84
85 /**
86  * Context for a store request
87  */
88 struct GNUNET_PEERSTORE_StoreContext
89 {
90   /**
91    * Kept in a DLL.
92    */
93   struct GNUNET_PEERSTORE_StoreContext *next;
94
95   /**
96    * Kept in a DLL.
97    */
98   struct GNUNET_PEERSTORE_StoreContext *prev;
99
100   /**
101    * Handle to the PEERSTORE service.
102    */
103   struct GNUNET_PEERSTORE_Handle *h;
104
105   /**
106    * MQ Envelope with store request message
107    */
108   struct GNUNET_MQ_Envelope *ev;
109
110   /**
111    * Continuation called with service response
112    */
113   GNUNET_PEERSTORE_Continuation cont;
114
115   /**
116    * Closure for 'cont'
117    */
118   void *cont_cls;
119
120 };
121
122 /**
123  * Context for a iterate request
124  */
125 struct GNUNET_PEERSTORE_IterateContext
126 {
127   /**
128    * Kept in a DLL.
129    */
130   struct GNUNET_PEERSTORE_IterateContext *next;
131
132   /**
133    * Kept in a DLL.
134    */
135   struct GNUNET_PEERSTORE_IterateContext *prev;
136
137   /**
138    * Handle to the PEERSTORE service.
139    */
140   struct GNUNET_PEERSTORE_Handle *h;
141
142   /**
143    * MQ Envelope with iterate request message
144    */
145   struct GNUNET_MQ_Envelope *ev;
146
147   /**
148    * Callback with each matching record
149    */
150   GNUNET_PEERSTORE_Processor callback;
151
152   /**
153    * Closure for 'callback'
154    */
155   void *callback_cls;
156
157   /**
158    * #GNUNET_YES / #GNUNET_NO
159    * if sent, cannot be canceled
160    */
161   int request_sent;
162
163   /**
164    * Task identifier for the function called
165    * on iterate request timeout
166    */
167   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
168
169 };
170
171 /**
172  * Context for a watch request
173  */
174 struct GNUNET_PEERSTORE_WatchContext
175 {
176   /**
177    * Kept in a DLL.
178    */
179   struct GNUNET_PEERSTORE_WatchContext *next;
180
181   /**
182    * Kept in a DLL.
183    */
184   struct GNUNET_PEERSTORE_WatchContext *prev;
185
186   /**
187    * Handle to the PEERSTORE service.
188    */
189   struct GNUNET_PEERSTORE_Handle *h;
190
191   /**
192    * MQ Envelope with watch request message
193    */
194   struct GNUNET_MQ_Envelope *ev;
195
196   /**
197    * Callback with each record received
198    */
199   GNUNET_PEERSTORE_Processor callback;
200
201   /**
202    * Closure for 'callback'
203    */
204   void *callback_cls;
205
206   /**
207    * Hash of the combined key
208    */
209   struct GNUNET_HashCode keyhash;
210
211   /**
212    * #GNUNET_YES / #GNUNET_NO
213    * if sent, cannot be canceled
214    */
215   int request_sent;
216
217 };
218
219 /******************************************************************************/
220 /*******************             DECLARATIONS             *********************/
221 /******************************************************************************/
222
223 /**
224  * When a response for iterate request is received
225  *
226  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
227  * @param msg message received, NULL on timeout or fatal error
228  */
229 void handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg);
230
231 /**
232  * When a watch record is received
233  *
234  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
235  * @param msg message received, NULL on timeout or fatal error
236  */
237 void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg);
238
239 /**
240  * Close the existing connection to PEERSTORE and reconnect.
241  *
242  * @param h handle to the service
243  */
244 static void
245 reconnect (struct GNUNET_PEERSTORE_Handle *h);
246
247 /**
248  * Callback after MQ envelope is sent
249  *
250  * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
251  */
252 void watch_request_sent (void *cls);
253
254 /**
255  * Callback after MQ envelope is sent
256  *
257  * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
258  */
259 void iterate_request_sent (void *cls);
260
261 /**
262  * Callback after MQ envelope is sent
263  *
264  * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
265  */
266 void store_request_sent (void *cls);
267
268 /**
269  * MQ message handlers
270  */
271 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
272     {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
273     {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, sizeof(struct GNUNET_MessageHeader)},
274     {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
275     GNUNET_MQ_HANDLERS_END
276 };
277
278 /******************************************************************************/
279 /*******************         CONNECTION FUNCTIONS         *********************/
280 /******************************************************************************/
281
282 static void
283 handle_client_error (void *cls, enum GNUNET_MQ_Error error)
284 {
285   struct GNUNET_PEERSTORE_Handle *h = cls;
286
287   LOG(GNUNET_ERROR_TYPE_ERROR, "Received an error notification from MQ of type: %d\n", error);
288   reconnect(h);
289 }
290
291 /**
292  * Iterator over previous watches to resend them
293  */
294 int rewatch_it(void *cls,
295     const struct GNUNET_HashCode *key,
296     void *value)
297 {
298   struct GNUNET_PEERSTORE_Handle *h = cls;
299   struct GNUNET_PEERSTORE_WatchContext *wc = value;
300   struct StoreKeyHashMessage *hm;
301
302   if(GNUNET_YES == wc->request_sent)
303   { /* Envelope gone, create new one. */
304     wc->ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
305     hm->keyhash = wc->keyhash;
306     wc->request_sent = GNUNET_NO;
307   }
308   GNUNET_MQ_notify_sent(wc->ev, &watch_request_sent, wc);
309   GNUNET_MQ_send(h->mq, wc->ev);
310   return GNUNET_YES;
311 }
312
313 /**
314  * Close the existing connection to PEERSTORE and reconnect.
315  *
316  * @param h handle to the service
317  */
318 static void
319 reconnect (struct GNUNET_PEERSTORE_Handle *h)
320 {
321   struct GNUNET_PEERSTORE_IterateContext *ic;
322   GNUNET_PEERSTORE_Processor icb;
323   void *icb_cls;
324   struct GNUNET_PEERSTORE_StoreContext *sc;
325
326   LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
327   if (NULL != h->mq)
328   {
329     GNUNET_MQ_destroy(h->mq);
330     h->mq = NULL;
331   }
332   if (NULL != h->client)
333   {
334     GNUNET_CLIENT_disconnect (h->client);
335     h->client = NULL;
336   }
337   h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
338   GNUNET_assert(NULL != h->client);
339   h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
340       mq_handlers,
341       &handle_client_error,
342       h);
343   LOG(GNUNET_ERROR_TYPE_DEBUG,
344       "Resending pending requests after reconnect.\n");
345   if (NULL != h->watches)
346   {
347     GNUNET_CONTAINER_multihashmap_iterate(h->watches,
348         &rewatch_it, h);
349   }
350   ic = h->iterate_head;
351   while (NULL != ic)
352   {
353     if (GNUNET_YES == ic->request_sent)
354     {
355       icb = ic->callback;
356       icb_cls = ic->callback_cls;
357       GNUNET_PEERSTORE_iterate_cancel(ic);
358       if(NULL != icb)
359         icb(icb_cls, NULL,_("Iteration canceled due to reconnection."));
360     }
361     else
362     {
363       GNUNET_MQ_notify_sent(ic->ev, &iterate_request_sent, ic);
364       GNUNET_MQ_send(h->mq, ic->ev);
365     }
366     ic = ic->next;
367   }
368   sc = h->store_head;
369   while (NULL != sc)
370   {
371     GNUNET_MQ_notify_sent(sc->ev, &store_request_sent, sc);
372     GNUNET_MQ_send(h->mq, sc->ev);
373     sc = sc->next;
374   }
375 }
376
377 /**
378  * Connect to the PEERSTORE service.
379  *
380  * @return NULL on error
381  */
382 struct GNUNET_PEERSTORE_Handle *
383 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
384 {
385   struct GNUNET_PEERSTORE_Handle *h;
386
387   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
388   h->client = GNUNET_CLIENT_connect ("peerstore", cfg);
389   if(NULL == h->client)
390   {
391     GNUNET_free(h);
392     return NULL;
393   }
394   h->cfg = cfg;
395   h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
396       mq_handlers,
397       &handle_client_error,
398       h);
399   if(NULL == h->mq)
400   {
401     GNUNET_free(h);
402     return NULL;
403   }
404   LOG(GNUNET_ERROR_TYPE_DEBUG, "New connection created\n");
405   return h;
406 }
407
408 /**
409  * Disconnect from the PEERSTORE service
410  * Do not call in case of pending requests
411  *
412  * @param h handle to disconnect
413  */
414 void
415 GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
416 {
417   LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
418   if(NULL != h->watches)
419   {
420     GNUNET_CONTAINER_multihashmap_destroy(h->watches);
421     h->watches = NULL;
422   }
423   if(NULL != h->mq)
424   {
425     GNUNET_MQ_destroy(h->mq);
426     h->mq = NULL;
427   }
428   if (NULL != h->client)
429   {
430     GNUNET_CLIENT_disconnect (h->client);
431     h->client = NULL;
432   }
433   GNUNET_free(h);
434   LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnected, BYE!\n");
435 }
436
437
438 /******************************************************************************/
439 /*******************            STORE FUNCTIONS           *********************/
440 /******************************************************************************/
441
442 /**
443  * Callback after MQ envelope is sent
444  *
445  * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
446  */
447 void store_request_sent (void *cls)
448 {
449   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
450   GNUNET_PEERSTORE_Continuation cont;
451   void *cont_cls;
452
453   sc->ev = NULL;
454   cont = sc->cont;
455   cont_cls = sc->cont_cls;
456   GNUNET_PEERSTORE_store_cancel(sc);
457   if(NULL != cont)
458     cont(cont_cls, GNUNET_OK);
459 }
460
461 /**
462  * Cancel a store request
463  *
464  * @param sc Store request context
465  */
466 void
467 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
468 {
469   if(NULL != sc->ev)
470   {
471     GNUNET_MQ_send_cancel(sc->ev);
472     sc->ev = NULL;
473   }
474   GNUNET_CONTAINER_DLL_remove(sc->h->store_head, sc->h->store_tail, sc);
475   GNUNET_free(sc);
476 }
477
478 /**
479  * Store a new entry in the PEERSTORE.
480  * Note that stored entries can be lost in some cases
481  * such as power failure.
482  *
483  * @param h Handle to the PEERSTORE service
484  * @param sub_system name of the sub system
485  * @param peer Peer Identity
486  * @param key entry key
487  * @param value entry value BLOB
488  * @param size size of 'value'
489  * @param expiry absolute time after which the entry is (possibly) deleted
490  * @param cont Continuation function after the store request is processed
491  * @param cont_cls Closure for 'cont'
492  */
493 struct GNUNET_PEERSTORE_StoreContext *
494 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
495     const char *sub_system,
496     const struct GNUNET_PeerIdentity *peer,
497     const char *key,
498     const void *value,
499     size_t size,
500     struct GNUNET_TIME_Absolute expiry,
501     enum GNUNET_PEERSTORE_StoreOption options,
502     GNUNET_PEERSTORE_Continuation cont,
503     void *cont_cls)
504 {
505   struct GNUNET_MQ_Envelope *ev;
506   struct GNUNET_PEERSTORE_StoreContext *sc;
507
508   LOG (GNUNET_ERROR_TYPE_DEBUG,
509       "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
510       size, sub_system, GNUNET_i2s (peer), key);
511   ev = PEERSTORE_create_record_mq_envelope(sub_system,
512       peer,
513       key,
514       value,
515       size,
516       &expiry,
517       options,
518       GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
519   sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext);
520   sc->ev = ev;
521   sc->cont = cont;
522   sc->cont_cls = cont_cls;
523   sc->h = h;
524   GNUNET_CONTAINER_DLL_insert_tail(h->store_head, h->store_tail, sc);
525   GNUNET_MQ_notify_sent(ev, &store_request_sent, sc);
526   GNUNET_MQ_send(h->mq, ev);
527   return sc;
528
529 }
530
531 /******************************************************************************/
532 /*******************           ITERATE FUNCTIONS          *********************/
533 /******************************************************************************/
534
535 /**
536  * When a response for iterate request is received
537  *
538  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
539  * @param msg message received, NULL on timeout or fatal error
540  */
541 void handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg)
542 {
543   struct GNUNET_PEERSTORE_Handle *h = cls;
544   struct GNUNET_PEERSTORE_IterateContext *ic;
545   GNUNET_PEERSTORE_Processor callback;
546   void *callback_cls;
547   uint16_t msg_type;
548   struct GNUNET_PEERSTORE_Record *record;
549   int continue_iter;
550
551   ic = h->iterate_head;
552   if(NULL == ic)
553   {
554     LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected iteration response, this should not happen.\n");
555     reconnect(h);
556     return;
557   }
558   callback = ic->callback;
559   callback_cls = ic->callback_cls;
560   if(NULL == msg) /* Connection error */
561   {
562
563     if(NULL != callback)
564       callback(callback_cls, NULL,
565           _("Error communicating with `PEERSTORE' service."));
566     reconnect(h);
567     return;
568   }
569   msg_type = ntohs(msg->type);
570   if(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
571   {
572     ic->request_sent = GNUNET_NO;
573     GNUNET_PEERSTORE_iterate_cancel(ic);
574     if(NULL != callback)
575       callback(callback_cls, NULL, NULL);
576     return;
577   }
578   if(NULL != callback)
579   {
580     record = PEERSTORE_parse_record_message(msg);
581     if(NULL == record)
582       continue_iter = callback(callback_cls, NULL, _("Received a malformed response from service."));
583     else
584     {
585       continue_iter = callback(callback_cls, record, NULL);
586       PEERSTORE_destroy_record(record);
587     }
588     if(GNUNET_NO == continue_iter)
589       ic->callback = NULL;
590   }
591
592 }
593
594 /**
595  * Callback after MQ envelope is sent
596  *
597  * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
598  */
599 void iterate_request_sent (void *cls)
600 {
601   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
602
603   LOG(GNUNET_ERROR_TYPE_DEBUG, "Iterate request sent to service.\n");
604   ic->request_sent = GNUNET_YES;
605   ic->ev = NULL;
606 }
607
608 /**
609  * Called when the iterate request is timedout
610  *
611  * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
612  */
613 void iterate_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
614 {
615   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
616
617   ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
618   GNUNET_PEERSTORE_iterate_cancel(ic);
619 }
620
621 /**
622  * Cancel an iterate request
623  * Please do not call after the iterate request is done
624  *
625  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
626  */
627 void
628 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
629 {
630   if(GNUNET_SCHEDULER_NO_TASK != ic->timeout_task)
631   {
632     GNUNET_SCHEDULER_cancel(ic->timeout_task);
633     ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
634   }
635   if(GNUNET_NO == ic->request_sent)
636   {
637     if(NULL != ic->ev)
638     {
639       GNUNET_MQ_send_cancel(ic->ev);
640       ic->ev = NULL;
641     }
642     GNUNET_CONTAINER_DLL_remove(ic->h->iterate_head, ic->h->iterate_tail, ic);
643     GNUNET_free(ic);
644   }
645   else
646     ic->callback = NULL;
647 }
648
649 /**
650  * Iterate over records matching supplied key information
651  *
652  * @param h handle to the PEERSTORE service
653  * @param sub_system name of sub system
654  * @param peer Peer identity (can be NULL)
655  * @param key entry key string (can be NULL)
656  * @param timeout time after which the iterate request is canceled
657  * @param callback function called with each matching record, all NULL's on end
658  * @param callback_cls closure for @a callback
659  * @return Handle to iteration request
660  */
661 struct GNUNET_PEERSTORE_IterateContext *
662 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
663     const char *sub_system,
664     const struct GNUNET_PeerIdentity *peer,
665     const char *key,
666     struct GNUNET_TIME_Relative timeout,
667     GNUNET_PEERSTORE_Processor callback, void *callback_cls)
668 {
669   struct GNUNET_MQ_Envelope *ev;
670   struct GNUNET_PEERSTORE_IterateContext *ic;
671
672   ev = PEERSTORE_create_record_mq_envelope(sub_system,
673       peer,
674       key,
675       NULL,
676       0,
677       NULL,
678       0,
679       GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
680   ic = GNUNET_new(struct GNUNET_PEERSTORE_IterateContext);
681   ic->callback = callback;
682   ic->callback_cls = callback_cls;
683   ic->ev = ev;
684   ic->h = h;
685   ic->request_sent = GNUNET_NO;
686   GNUNET_CONTAINER_DLL_insert_tail(h->iterate_head, h->iterate_tail, ic);
687   LOG(GNUNET_ERROR_TYPE_DEBUG,
688         "Sending an iterate request for sub system `%s'\n", sub_system);
689   GNUNET_MQ_notify_sent(ev, &iterate_request_sent, ic);
690   GNUNET_MQ_send(h->mq, ev);
691   ic->timeout_task = GNUNET_SCHEDULER_add_delayed(timeout, &iterate_timeout, ic);
692   return ic;
693 }
694
695 /******************************************************************************/
696 /*******************            WATCH FUNCTIONS           *********************/
697 /******************************************************************************/
698
699 /**
700  * When a watch record is received
701  *
702  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
703  * @param msg message received, NULL on timeout or fatal error
704  */
705 void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
706 {
707   struct GNUNET_PEERSTORE_Handle *h = cls;
708   struct GNUNET_PEERSTORE_Record *record;
709   struct GNUNET_HashCode keyhash;
710   struct GNUNET_PEERSTORE_WatchContext *wc;
711
712   if(NULL == msg)
713   {
714     LOG(GNUNET_ERROR_TYPE_ERROR,
715         "Problem receiving a watch response, no way to determine which request.\n");
716     reconnect(h);
717     return;
718   }
719   LOG(GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
720   record = PEERSTORE_parse_record_message(msg);
721   PEERSTORE_hash_key(record->sub_system,
722       record->peer, record->key, &keyhash);
723   wc = GNUNET_CONTAINER_multihashmap_get(h->watches, &keyhash);
724   if(NULL != wc->callback)
725     wc->callback(wc->callback_cls, record, NULL);
726   PEERSTORE_destroy_record(record);
727 }
728
729 /**
730  * Callback after MQ envelope is sent
731  *
732  * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
733  */
734 void watch_request_sent (void *cls)
735 {
736   struct GNUNET_PEERSTORE_WatchContext *wc = cls;
737
738   wc->request_sent = GNUNET_YES;
739   wc->ev = NULL;
740 }
741
742 /**
743  * Cancel a watch request
744  *
745  * @wc handle to the watch request
746  */
747 void
748 GNUNET_PEERSTORE_watch_cancel(struct GNUNET_PEERSTORE_WatchContext *wc)
749 {
750   struct GNUNET_PEERSTORE_Handle *h = wc->h;
751   struct GNUNET_MQ_Envelope *ev;
752   struct StoreKeyHashMessage *hm;
753
754   LOG(GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
755   if(GNUNET_YES == wc->request_sent) /* If request already sent to service, send a cancel request. */
756   {
757     ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
758     GNUNET_MQ_send(h->mq, ev);
759     wc->callback = NULL;
760     wc->callback_cls = NULL;
761   }
762   if(NULL != wc->ev)
763   {
764     GNUNET_MQ_send_cancel(wc->ev);
765     wc->ev = NULL;
766   }
767   GNUNET_CONTAINER_multihashmap_remove(h->watches, &wc->keyhash, wc);
768   GNUNET_free(wc);
769
770 }
771
772 /**
773  * Request watching a given key
774  * User will be notified with any new values added to key
775  *
776  * @param h handle to the PEERSTORE service
777  * @param sub_system name of sub system
778  * @param peer Peer identity
779  * @param key entry key string
780  * @param callback function called with each new value
781  * @param callback_cls closure for @a callback
782  * @return Handle to watch request
783  */
784 struct GNUNET_PEERSTORE_WatchContext *
785 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
786     const char *sub_system,
787     const struct GNUNET_PeerIdentity *peer,
788     const char *key,
789     GNUNET_PEERSTORE_Processor callback, void *callback_cls)
790 {
791   struct GNUNET_MQ_Envelope *ev;
792   struct StoreKeyHashMessage *hm;
793   struct GNUNET_PEERSTORE_WatchContext *wc;
794
795   ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
796   PEERSTORE_hash_key(sub_system, peer, key, &hm->keyhash);
797   wc = GNUNET_new(struct GNUNET_PEERSTORE_WatchContext);
798   wc->callback = callback;
799   wc->callback_cls = callback_cls;
800   wc->ev = ev;
801   wc->h = h;
802   wc->request_sent = GNUNET_NO;
803   wc->keyhash = hm->keyhash;
804   if(NULL == h->watches)
805     h->watches = GNUNET_CONTAINER_multihashmap_create(5, GNUNET_NO);
806   GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash,
807       wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
808   LOG(GNUNET_ERROR_TYPE_DEBUG,
809       "Sending a watch request for ss `%s', peer `%s', key `%s'.\n",
810       sub_system, GNUNET_i2s(peer), key);
811   GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc);
812   GNUNET_MQ_send(h->mq, ev);
813   return wc;
814 }
815
816 /* end of peerstore_api.c */