c9a68f4bf1d4e455459fec5aee2d3ad1829d137b
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file peerstore/peerstore_api.c
23  * @brief API for peerstore
24  * @author Omar Tarabai
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
32
33 /******************************************************************************/
34 /************************      DATA STRUCTURES     ****************************/
35 /******************************************************************************/
36
37 /**
38  * Handle to the PEERSTORE service.
39  */
40 struct GNUNET_PEERSTORE_Handle
41 {
42
43   /**
44    * Our configuration.
45    */
46   const struct GNUNET_CONFIGURATION_Handle *cfg;
47
48   /**
49    * Connection to the service.
50    */
51   struct GNUNET_CLIENT_Connection *client;
52
53   /**
54    * Message queue
55    */
56   struct GNUNET_MQ_Handle *mq;
57
58   /**
59    * Head of active STORE requests.
60    */
61   struct GNUNET_PEERSTORE_StoreContext *store_head;
62
63   /**
64    * Tail of active STORE requests.
65    */
66   struct GNUNET_PEERSTORE_StoreContext *store_tail;
67
68   /**
69    * Head of active ITERATE requests.
70    */
71   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
72
73   /**
74    * Tail of active ITERATE requests.
75    */
76   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
77
78   /**
79    * Hashmap of watch requests
80    */
81   struct GNUNET_CONTAINER_MultiHashMap *watches;
82
83 };
84
85 /**
86  * Context for a store request
87  */
88 struct GNUNET_PEERSTORE_StoreContext
89 {
90   /**
91    * Kept in a DLL.
92    */
93   struct GNUNET_PEERSTORE_StoreContext *next;
94
95   /**
96    * Kept in a DLL.
97    */
98   struct GNUNET_PEERSTORE_StoreContext *prev;
99
100   /**
101    * Handle to the PEERSTORE service.
102    */
103   struct GNUNET_PEERSTORE_Handle *h;
104
105   /**
106    * MQ Envelope with store request message
107    */
108   struct GNUNET_MQ_Envelope *ev;
109
110   /**
111    * Continuation called with service response
112    */
113   GNUNET_PEERSTORE_Continuation cont;
114
115   /**
116    * Closure for 'cont'
117    */
118   void *cont_cls;
119
120   /**
121    * #GNUNET_YES / #GNUNET_NO
122    * if sent, cannot be canceled
123    */
124   int request_sent;
125
126 };
127
128 /**
129  * Context for a iterate request
130  */
131 struct GNUNET_PEERSTORE_IterateContext
132 {
133   /**
134    * Kept in a DLL.
135    */
136   struct GNUNET_PEERSTORE_IterateContext *next;
137
138   /**
139    * Kept in a DLL.
140    */
141   struct GNUNET_PEERSTORE_IterateContext *prev;
142
143   /**
144    * Handle to the PEERSTORE service.
145    */
146   struct GNUNET_PEERSTORE_Handle *h;
147
148   /**
149    * MQ Envelope with iterate request message
150    */
151   struct GNUNET_MQ_Envelope *ev;
152
153   /**
154    * Callback with each matching record
155    */
156   GNUNET_PEERSTORE_Processor callback;
157
158   /**
159    * Closure for 'callback'
160    */
161   void *callback_cls;
162
163   /**
164    * #GNUNET_YES / #GNUNET_NO
165    * if sent, cannot be canceled
166    */
167   int request_sent;
168
169   /**
170    * Task identifier for the function called
171    * on iterate request timeout
172    */
173   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
174
175 };
176
177 /**
178  * Context for a watch request
179  */
180 struct GNUNET_PEERSTORE_WatchContext
181 {
182   /**
183    * Kept in a DLL.
184    */
185   struct GNUNET_PEERSTORE_WatchContext *next;
186
187   /**
188    * Kept in a DLL.
189    */
190   struct GNUNET_PEERSTORE_WatchContext *prev;
191
192   /**
193    * Handle to the PEERSTORE service.
194    */
195   struct GNUNET_PEERSTORE_Handle *h;
196
197   /**
198    * MQ Envelope with watch request message
199    */
200   struct GNUNET_MQ_Envelope *ev;
201
202   /**
203    * Callback with each record received
204    */
205   GNUNET_PEERSTORE_Processor callback;
206
207   /**
208    * Closure for 'callback'
209    */
210   void *callback_cls;
211
212   /**
213    * Hash of the combined key
214    */
215   struct GNUNET_HashCode keyhash;
216
217   /**
218    * #GNUNET_YES / #GNUNET_NO
219    * if sent, cannot be canceled
220    */
221   int request_sent;
222
223 };
224
225 /******************************************************************************/
226 /*******************             DECLARATIONS             *********************/
227 /******************************************************************************/
228
229 /**
230  * When a response for store request is received
231  *
232  * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
233  * @param msg message received, NULL on timeout or fatal error
234  */
235 void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg);
236
237 /**
238  * When a response for iterate request is received
239  *
240  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
241  * @param msg message received, NULL on timeout or fatal error
242  */
243 void handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg);
244
245 /**
246  * When a watch record is received
247  *
248  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
249  * @param msg message received, NULL on timeout or fatal error
250  */
251 void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg);
252
253 /**
254  * Close the existing connection to PEERSTORE and reconnect.
255  *
256  * @param h handle to the service
257  */
258 static void
259 reconnect (struct GNUNET_PEERSTORE_Handle *h);
260
261 /**
262  * MQ message handlers
263  */
264 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
265     {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK, sizeof(struct GNUNET_MessageHeader)},
266     {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL, sizeof(struct GNUNET_MessageHeader)},
267     {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
268     {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, sizeof(struct GNUNET_MessageHeader)},
269     {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
270     GNUNET_MQ_HANDLERS_END
271 };
272
273 /******************************************************************************/
274 /*******************         CONNECTION FUNCTIONS         *********************/
275 /******************************************************************************/
276
277 static void
278 handle_client_error (void *cls, enum GNUNET_MQ_Error error)
279 {
280   struct GNUNET_PEERSTORE_Handle *h = cls;
281
282   LOG(GNUNET_ERROR_TYPE_ERROR, "Received an error notification from MQ of type: %d\n", error);
283   reconnect(h);
284 }
285
286 /**
287  * Close the existing connection to PEERSTORE and reconnect.
288  *
289  * @param h handle to the service
290  */
291 static void
292 reconnect (struct GNUNET_PEERSTORE_Handle *h)
293 {
294   LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
295   if (NULL != h->mq)
296   {
297     GNUNET_MQ_destroy(h->mq);
298     h->mq = NULL;
299   }
300   if (NULL != h->client)
301   {
302     GNUNET_CLIENT_disconnect (h->client);
303     h->client = NULL;
304   }
305   h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
306   //FIXME: retry connecting if fails again (client == NULL)
307   h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
308       mq_handlers,
309       &handle_client_error,
310       h);
311   //FIXME: resend pending requests after reconnecting
312
313 }
314
315 /**
316  * Connect to the PEERSTORE service.
317  *
318  * @return NULL on error
319  */
320 struct GNUNET_PEERSTORE_Handle *
321 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
322 {
323   struct GNUNET_PEERSTORE_Handle *h;
324
325   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
326   h->client = GNUNET_CLIENT_connect ("peerstore", cfg);
327   if(NULL == h->client)
328   {
329     GNUNET_free(h);
330     return NULL;
331   }
332   h->cfg = cfg;
333   h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
334       mq_handlers,
335       &handle_client_error,
336       h);
337   if(NULL == h->mq)
338   {
339     GNUNET_free(h);
340     return NULL;
341   }
342   LOG(GNUNET_ERROR_TYPE_DEBUG, "New connection created\n");
343   return h;
344 }
345
346 /**
347  * Disconnect from the PEERSTORE service
348  * Do not call in case of pending requests
349  *
350  * @param h handle to disconnect
351  */
352 void
353 GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
354 {
355   if(NULL != h->watches)
356   {
357     GNUNET_CONTAINER_multihashmap_destroy(h->watches);
358     h->watches = NULL;
359   }
360   if(NULL != h->mq)
361   {
362     GNUNET_MQ_destroy(h->mq);
363     h->mq = NULL;
364   }
365   if (NULL != h->client)
366   {
367     GNUNET_CLIENT_disconnect (h->client);
368     h->client = NULL;
369   }
370   GNUNET_free(h);
371   LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnected, BYE!\n");
372 }
373
374
375 /******************************************************************************/
376 /*******************            STORE FUNCTIONS           *********************/
377 /******************************************************************************/
378
379 /**
380  * When a response for store request is received
381  *
382  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
383  * @param msg message received, NULL on timeout or fatal error
384  */
385 void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg)
386 {
387   struct GNUNET_PEERSTORE_Handle *h = cls;
388   struct GNUNET_PEERSTORE_StoreContext *sc;
389   uint16_t msg_type;
390   GNUNET_PEERSTORE_Continuation cont;
391   void *cont_cls;
392
393   sc = h->store_head;
394   if(NULL == sc)
395   {
396     LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected store response, this should not happen.\n");
397     reconnect(h);
398     return;
399   }
400   cont = sc->cont;
401   cont_cls = sc->cont_cls;
402   GNUNET_CONTAINER_DLL_remove(h->store_head, h->store_tail, sc);
403   GNUNET_free(sc);
404   if(NULL == msg) /* Connection error */
405   {
406     if(NULL != cont)
407       cont(cont_cls, GNUNET_SYSERR);
408     reconnect(h);
409     return;
410   }
411   if(NULL != cont) /* Run continuation */
412   {
413     msg_type = ntohs(msg->type);
414     if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK == msg_type)
415       cont(cont_cls, GNUNET_OK);
416     else if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL == msg_type)
417       cont(cont_cls, GNUNET_SYSERR);
418   }
419
420 }
421
422 /**
423  * Callback after MQ envelope is sent
424  *
425  * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
426  */
427 void store_request_sent (void *cls)
428 {
429   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
430
431   sc->request_sent = GNUNET_YES;
432   sc->ev = NULL;
433 }
434
435 /**
436  * Cancel a store request
437  *
438  * @param sc Store request context
439  */
440 void
441 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
442 {
443   LOG(GNUNET_ERROR_TYPE_DEBUG,
444           "Canceling store request.\n");
445   if(GNUNET_NO == sc->request_sent)
446   {
447     if(NULL != sc->ev)
448     {
449       GNUNET_MQ_send_cancel(sc->ev);
450       sc->ev = NULL;
451     }
452     GNUNET_CONTAINER_DLL_remove(sc->h->store_head, sc->h->store_tail, sc);
453     GNUNET_free(sc);
454   }
455   else
456   { /* request already sent, will have to wait for response */
457     sc->cont = NULL;
458   }
459
460 }
461
462 /**
463  * Store a new entry in the PEERSTORE
464  *
465  * @param h Handle to the PEERSTORE service
466  * @param sub_system name of the sub system
467  * @param peer Peer Identity
468  * @param key entry key
469  * @param value entry value BLOB
470  * @param size size of 'value'
471  * @param expiry absolute time after which the entry is (possibly) deleted
472  * @param cont Continuation function after the store request is processed
473  * @param cont_cls Closure for 'cont'
474  */
475 struct GNUNET_PEERSTORE_StoreContext *
476 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
477     const char *sub_system,
478     const struct GNUNET_PeerIdentity *peer,
479     const char *key,
480     const void *value,
481     size_t size,
482     struct GNUNET_TIME_Absolute expiry,
483     GNUNET_PEERSTORE_Continuation cont,
484     void *cont_cls)
485 {
486   struct GNUNET_MQ_Envelope *ev; //FIXME: add 'replace' flag in store function (similar to multihashmap)
487   struct GNUNET_PEERSTORE_StoreContext *sc;
488
489   LOG (GNUNET_ERROR_TYPE_DEBUG,
490       "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
491       size, sub_system, GNUNET_i2s (peer), key);
492   ev = PEERSTORE_create_record_mq_envelope(sub_system,
493       peer,
494       key,
495       value,
496       size,
497       &expiry,
498       GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
499   sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext);
500   sc->ev = ev;
501   sc->cont = cont;
502   sc->cont_cls = cont_cls;
503   sc->h = h;
504   sc->request_sent = GNUNET_NO;
505   GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc);
506   GNUNET_MQ_notify_sent(ev, &store_request_sent, sc);
507   GNUNET_MQ_send(h->mq, ev);
508   return sc;
509
510 }
511
512 /******************************************************************************/
513 /*******************           ITERATE FUNCTIONS          *********************/
514 /******************************************************************************/
515
516 /**
517  * When a response for iterate request is received
518  *
519  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
520  * @param msg message received, NULL on timeout or fatal error
521  */
522 void handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg)
523 {
524   struct GNUNET_PEERSTORE_Handle *h = cls;
525   struct GNUNET_PEERSTORE_IterateContext *ic;
526   GNUNET_PEERSTORE_Processor callback;
527   void *callback_cls;
528   uint16_t msg_type;
529   struct GNUNET_PEERSTORE_Record *record;
530   int continue_iter;
531
532   ic = h->iterate_head;
533   if(NULL == ic)
534   {
535     LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected iteration response, this should not happen.\n");
536     reconnect(h);
537     return;
538   }
539   callback = ic->callback;
540   callback_cls = ic->callback_cls;
541   if(NULL == msg) /* Connection error */
542   {
543
544     if(NULL != callback)
545       callback(callback_cls, NULL,
546           _("Error communicating with `PEERSTORE' service."));
547     reconnect(h);
548     return;
549   }
550   msg_type = ntohs(msg->type);
551   if(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
552   {
553     GNUNET_PEERSTORE_iterate_cancel(ic);
554     if(NULL != callback)
555       callback(callback_cls, NULL, NULL);
556     return;
557   }
558   if(NULL != callback)
559   {
560     record = PEERSTORE_parse_record_message(msg);
561     if(NULL == record)
562       continue_iter = callback(callback_cls, record, _("Received a malformed response from service."));
563     else
564       continue_iter = callback(callback_cls, record, NULL);
565     if(GNUNET_NO == continue_iter)
566       ic->callback = NULL;
567   }
568
569 }
570
571 /**
572  * Callback after MQ envelope is sent
573  *
574  * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
575  */
576 void iterate_request_sent (void *cls)
577 {
578   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
579
580   LOG(GNUNET_ERROR_TYPE_DEBUG, "Iterate request sent to service.\n");
581   ic->request_sent = GNUNET_YES;
582   ic->ev = NULL;
583 }
584
585 /**
586  * Called when the iterate request is timedout
587  *
588  * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
589  */
590 void iterate_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
591 {
592   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
593
594   ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
595   GNUNET_PEERSTORE_iterate_cancel(ic);
596 }
597
598 /**
599  * Cancel an iterate request
600  * Please do not call after the iterate request is done
601  *
602  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
603  */
604 void
605 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
606 {
607   LOG(GNUNET_ERROR_TYPE_DEBUG, "Canceling iterate request.\n");
608   if(GNUNET_SCHEDULER_NO_TASK != ic->timeout_task)
609   {
610     GNUNET_SCHEDULER_cancel(ic->timeout_task);
611     ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
612   }
613   if(GNUNET_NO == ic->request_sent)
614   {
615     if(NULL != ic->ev)
616     {
617       GNUNET_MQ_send_cancel(ic->ev);
618       ic->ev = NULL;
619     }
620     GNUNET_CONTAINER_DLL_remove(ic->h->iterate_head, ic->h->iterate_tail, ic);
621     GNUNET_free(ic);
622   }
623   else
624     ic->callback = NULL;
625 }
626
627 /**
628  * Iterate over records matching supplied key information
629  *
630  * @param h handle to the PEERSTORE service
631  * @param sub_system name of sub system
632  * @param peer Peer identity (can be NULL)
633  * @param key entry key string (can be NULL)
634  * @param timeout time after which the iterate request is canceled
635  * @param callback function called with each matching record, all NULL's on end
636  * @param callback_cls closure for @a callback
637  * @return Handle to iteration request
638  */
639 struct GNUNET_PEERSTORE_IterateContext *
640 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
641     const char *sub_system,
642     const struct GNUNET_PeerIdentity *peer,
643     const char *key,
644     struct GNUNET_TIME_Relative timeout,
645     GNUNET_PEERSTORE_Processor callback, void *callback_cls)
646 {
647   struct GNUNET_MQ_Envelope *ev;
648   struct GNUNET_PEERSTORE_IterateContext *ic;
649
650   ev = PEERSTORE_create_record_mq_envelope(sub_system,
651       peer,
652       key,
653       NULL,
654       0,
655       NULL,
656       GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
657   ic = GNUNET_new(struct GNUNET_PEERSTORE_IterateContext);
658   ic->callback = callback;
659   ic->callback_cls = callback_cls;
660   ic->ev = ev;
661   ic->h = h;
662   ic->request_sent = GNUNET_NO;
663   GNUNET_CONTAINER_DLL_insert(h->iterate_head, h->iterate_tail, ic);
664   LOG(GNUNET_ERROR_TYPE_DEBUG,
665         "Sending an iterate request for sub system `%s'\n", sub_system);
666   GNUNET_MQ_notify_sent(ev, &iterate_request_sent, ic);
667   GNUNET_MQ_send(h->mq, ev);
668   ic->timeout_task = GNUNET_SCHEDULER_add_delayed(timeout, &iterate_timeout, ic);
669   return ic;
670 }
671
672 /******************************************************************************/
673 /*******************            WATCH FUNCTIONS           *********************/
674 /******************************************************************************/
675
676 /**
677  * When a watch record is received
678  *
679  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
680  * @param msg message received, NULL on timeout or fatal error
681  */
682 void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
683 {
684   /*struct GNUNET_PEERSTORE_Handle *h = cls;
685   struct GNUNET_PEERSTORE_WatchContext *wc;
686   GNUNET_PEERSTORE_Processor callback;
687   void *callback_cls;
688
689
690
691   struct GNUNET_PEERSTORE_IterateContext *ic;
692   uint16_t msg_type;
693   struct GNUNET_PEERSTORE_Record *record;
694   int continue_iter;
695
696   ic = h->iterate_head;
697   if(NULL == ic)
698   {
699     LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected iteration response, this should not happen.\n");
700     reconnect(h);
701     return;
702   }
703   callback = ic->callback;
704   callback_cls = ic->callback_cls;
705   if(NULL == msg) * Connection error *
706   {
707
708     if(NULL != callback)
709       callback(callback_cls, NULL,
710           _("Error communicating with `PEERSTORE' service."));
711     reconnect(h);
712     return;
713   }
714   msg_type = ntohs(msg->type);
715   if(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
716   {
717     GNUNET_PEERSTORE_iterate_cancel(ic);
718     if(NULL != callback)
719       callback(callback_cls, NULL, NULL);
720     return;
721   }
722   if(NULL != callback)
723   {
724     record = PEERSTORE_parse_record_message(msg);
725     if(NULL == record)
726       continue_iter = callback(callback_cls, record, _("Received a malformed response from service."));
727     else
728       continue_iter = callback(callback_cls, record, NULL);
729     if(GNUNET_NO == continue_iter)
730       ic->callback = NULL;
731   }*/
732 }
733
734 /**
735  * Callback after MQ envelope is sent
736  *
737  * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
738  */
739 void watch_request_sent (void *cls)
740 {
741   struct GNUNET_PEERSTORE_WatchContext *wc = cls;
742
743   wc->request_sent = GNUNET_YES;
744   wc->ev = NULL;
745 }
746
747 /**
748  * Cancel a watch request
749  *
750  * @wc handle to the watch request
751  */
752 void
753 GNUNET_PEERSTORE_watch_cancel(struct GNUNET_PEERSTORE_WatchContext *wc)
754 {
755   struct GNUNET_PEERSTORE_Handle *h = wc->h;
756   struct GNUNET_MQ_Envelope *ev;
757   struct StoreKeyHashMessage *hm;
758
759   LOG(GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
760   if(GNUNET_YES == wc->request_sent) /* If request already sent to service, send a cancel request. */
761   {
762     ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
763     GNUNET_MQ_send(h->mq, ev);
764     wc->callback = NULL;
765     wc->callback_cls = NULL;
766   }
767   if(NULL != wc->ev)
768   {
769     GNUNET_MQ_send_cancel(wc->ev);
770     wc->ev = NULL;
771   }
772   GNUNET_CONTAINER_multihashmap_remove(h->watches, &wc->keyhash, wc);
773   GNUNET_free(wc);
774
775 }
776
777 /**
778  * Request watching a given key
779  * User will be notified with any new values added to key
780  *
781  * @param h handle to the PEERSTORE service
782  * @param sub_system name of sub system
783  * @param peer Peer identity
784  * @param key entry key string
785  * @param callback function called with each new value
786  * @param callback_cls closure for @a callback
787  * @return Handle to watch request
788  */
789 struct GNUNET_PEERSTORE_WatchContext *
790 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
791     const char *sub_system,
792     const struct GNUNET_PeerIdentity *peer,
793     const char *key,
794     GNUNET_PEERSTORE_Processor callback, void *callback_cls)
795 {
796   struct GNUNET_MQ_Envelope *ev;
797   struct StoreKeyHashMessage *hm;
798   struct GNUNET_PEERSTORE_WatchContext *wc;
799
800   ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
801   PEERSTORE_hash_key(sub_system, peer, key, &hm->keyhash);
802   wc = GNUNET_new(struct GNUNET_PEERSTORE_WatchContext);
803   wc->callback = callback;
804   wc->callback_cls = callback_cls;
805   wc->ev = ev;
806   wc->h = h;
807   wc->request_sent = GNUNET_NO;
808   wc->keyhash = hm->keyhash;
809   if(NULL == h->watches)
810     h->watches = GNUNET_CONTAINER_multihashmap_create(5, GNUNET_NO);
811   GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash,
812       wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
813   LOG(GNUNET_ERROR_TYPE_DEBUG,
814       "Sending a watch request for sub system `%s'.\n", sub_system);
815   GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc);
816   GNUNET_MQ_send(h->mq, ev);
817   return wc;
818 }
819
820 /* end of peerstore_api.c */