-ignore
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file peerstore/peerstore_api.c
23  * @brief API for peerstore
24  * @author Omar Tarabai
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
32
33 /******************************************************************************/
34 /************************      DATA STRUCTURES     ****************************/
35 /******************************************************************************/
36
37 /**
38  * Handle to the PEERSTORE service.
39  */
40 struct GNUNET_PEERSTORE_Handle
41 {
42
43   /**
44    * Our configuration.
45    */
46   const struct GNUNET_CONFIGURATION_Handle *cfg;
47
48   /**
49    * Connection to the service.
50    */
51   struct GNUNET_CLIENT_Connection *client;
52
53   /**
54    * Message queue
55    */
56   struct GNUNET_MQ_Handle *mq;
57
58   /**
59    * Head of active STORE requests.
60    */
61   struct GNUNET_PEERSTORE_StoreContext *store_head;
62
63   /**
64    * Tail of active STORE requests.
65    */
66   struct GNUNET_PEERSTORE_StoreContext *store_tail;
67
68   /**
69    * Head of active ITERATE requests.
70    */
71   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
72
73   /**
74    * Tail of active ITERATE requests.
75    */
76   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
77
78   /**
79    * Hashmap of watch requests
80    */
81   struct GNUNET_CONTAINER_MultiHashMap *watches;
82
83 };
84
85 /**
86  * Context for a store request
87  */
88 struct GNUNET_PEERSTORE_StoreContext
89 {
90   /**
91    * Kept in a DLL.
92    */
93   struct GNUNET_PEERSTORE_StoreContext *next;
94
95   /**
96    * Kept in a DLL.
97    */
98   struct GNUNET_PEERSTORE_StoreContext *prev;
99
100   /**
101    * Handle to the PEERSTORE service.
102    */
103   struct GNUNET_PEERSTORE_Handle *h;
104
105   /**
106    * MQ Envelope with store request message
107    */
108   struct GNUNET_MQ_Envelope *ev;
109
110   /**
111    * Continuation called with service response
112    */
113   GNUNET_PEERSTORE_Continuation cont;
114
115   /**
116    * Closure for 'cont'
117    */
118   void *cont_cls;
119
120   /**
121    * #GNUNET_YES / #GNUNET_NO
122    * if sent, cannot be canceled
123    */
124   int request_sent;
125
126 };
127
128 /**
129  * Context for a iterate request
130  */
131 struct GNUNET_PEERSTORE_IterateContext
132 {
133   /**
134    * Kept in a DLL.
135    */
136   struct GNUNET_PEERSTORE_IterateContext *next;
137
138   /**
139    * Kept in a DLL.
140    */
141   struct GNUNET_PEERSTORE_IterateContext *prev;
142
143   /**
144    * Handle to the PEERSTORE service.
145    */
146   struct GNUNET_PEERSTORE_Handle *h;
147
148   /**
149    * MQ Envelope with iterate request message
150    */
151   struct GNUNET_MQ_Envelope *ev;
152
153   /**
154    * Callback with each matching record
155    */
156   GNUNET_PEERSTORE_Processor callback;
157
158   /**
159    * Closure for 'callback'
160    */
161   void *callback_cls;
162
163   /**
164    * #GNUNET_YES / #GNUNET_NO
165    * if sent, cannot be canceled
166    */
167   int request_sent;
168
169   /**
170    * Task identifier for the function called
171    * on iterate request timeout
172    */
173   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
174
175 };
176
177 /**
178  * Context for a watch request
179  */
180 struct GNUNET_PEERSTORE_WatchContext
181 {
182   /**
183    * Kept in a DLL.
184    */
185   struct GNUNET_PEERSTORE_WatchContext *next;
186
187   /**
188    * Kept in a DLL.
189    */
190   struct GNUNET_PEERSTORE_WatchContext *prev;
191
192   /**
193    * Handle to the PEERSTORE service.
194    */
195   struct GNUNET_PEERSTORE_Handle *h;
196
197   /**
198    * MQ Envelope with watch request message
199    */
200   struct GNUNET_MQ_Envelope *ev;
201
202   /**
203    * Callback with each record received
204    */
205   GNUNET_PEERSTORE_Processor callback;
206
207   /**
208    * Closure for 'callback'
209    */
210   void *callback_cls;
211
212   /**
213    * Hash of the combined key
214    */
215   struct GNUNET_HashCode keyhash;
216
217   /**
218    * #GNUNET_YES / #GNUNET_NO
219    * if sent, cannot be canceled
220    */
221   int request_sent;
222
223 };
224
225 /******************************************************************************/
226 /*******************             DECLARATIONS             *********************/
227 /******************************************************************************/
228
229 /**
230  * When a response for store request is received
231  *
232  * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
233  * @param msg message received, NULL on timeout or fatal error
234  */
235 void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg);
236
237 /**
238  * When a response for iterate request is received
239  *
240  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
241  * @param msg message received, NULL on timeout or fatal error
242  */
243 void handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg);
244
245 /**
246  * When a watch record is received
247  *
248  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
249  * @param msg message received, NULL on timeout or fatal error
250  */
251 void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg);
252
253 /**
254  * Close the existing connection to PEERSTORE and reconnect.
255  *
256  * @param h handle to the service
257  */
258 static void
259 reconnect (struct GNUNET_PEERSTORE_Handle *h);
260
261 /**
262  * MQ message handlers
263  */
264 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
265     {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK, sizeof(struct GNUNET_MessageHeader)},
266     {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
267     {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, sizeof(struct GNUNET_MessageHeader)},
268     {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
269     GNUNET_MQ_HANDLERS_END
270 };
271
272 /******************************************************************************/
273 /*******************         CONNECTION FUNCTIONS         *********************/
274 /******************************************************************************/
275
276 static void
277 handle_client_error (void *cls, enum GNUNET_MQ_Error error)
278 {
279   struct GNUNET_PEERSTORE_Handle *h = cls;
280
281   LOG(GNUNET_ERROR_TYPE_ERROR, "Received an error notification from MQ of type: %d\n", error);
282   reconnect(h);
283 }
284
285 /**
286  * Close the existing connection to PEERSTORE and reconnect.
287  *
288  * @param h handle to the service
289  */
290 static void
291 reconnect (struct GNUNET_PEERSTORE_Handle *h)
292 {
293   LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
294   if (NULL != h->mq)
295   {
296     GNUNET_MQ_destroy(h->mq);
297     h->mq = NULL;
298   }
299   if (NULL != h->client)
300   {
301     GNUNET_CLIENT_disconnect (h->client);
302     h->client = NULL;
303   }
304   h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
305   //FIXME: retry connecting if fails again (client == NULL)
306   h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
307       mq_handlers,
308       &handle_client_error,
309       h);
310   //FIXME: resend pending requests after reconnecting
311
312 }
313
314 /**
315  * Connect to the PEERSTORE service.
316  *
317  * @return NULL on error
318  */
319 struct GNUNET_PEERSTORE_Handle *
320 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
321 {
322   struct GNUNET_PEERSTORE_Handle *h;
323
324   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
325   h->client = GNUNET_CLIENT_connect ("peerstore", cfg);
326   if(NULL == h->client)
327   {
328     GNUNET_free(h);
329     return NULL;
330   }
331   h->cfg = cfg;
332   h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
333       mq_handlers,
334       &handle_client_error,
335       h);
336   if(NULL == h->mq)
337   {
338     GNUNET_free(h);
339     return NULL;
340   }
341   LOG(GNUNET_ERROR_TYPE_DEBUG, "New connection created\n");
342   return h;
343 }
344
345 /**
346  * Disconnect from the PEERSTORE service
347  * Do not call in case of pending requests
348  *
349  * @param h handle to disconnect
350  */
351 void
352 GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
353 {
354   if(NULL != h->watches)
355   {
356     GNUNET_CONTAINER_multihashmap_destroy(h->watches);
357     h->watches = NULL;
358   }
359   if(NULL != h->mq)
360   {
361     GNUNET_MQ_destroy(h->mq);
362     h->mq = NULL;
363   }
364   if (NULL != h->client)
365   {
366     GNUNET_CLIENT_disconnect (h->client);
367     h->client = NULL;
368   }
369   GNUNET_free(h);
370   LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnected, BYE!\n");
371 }
372
373
374 /******************************************************************************/
375 /*******************            STORE FUNCTIONS           *********************/
376 /******************************************************************************/
377
378 /**
379  * When a response for store request is received
380  *
381  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
382  * @param msg message received, NULL on timeout or fatal error
383  */
384 void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg)
385 {
386   struct GNUNET_PEERSTORE_Handle *h = cls;
387   struct GNUNET_PEERSTORE_StoreContext *sc;
388   GNUNET_PEERSTORE_Continuation cont;
389   void *cont_cls;
390
391   sc = h->store_head;
392   if(NULL == sc)
393   {
394     LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected store response, this should not happen.\n");
395     reconnect(h);
396     return;
397   }
398   cont = sc->cont;
399   cont_cls = sc->cont_cls;
400   GNUNET_CONTAINER_DLL_remove(h->store_head, h->store_tail, sc);
401   GNUNET_free(sc);
402   if(NULL == msg) /* Connection error */
403   {
404     if(NULL != cont)
405       cont(cont_cls, GNUNET_SYSERR);
406     reconnect(h);
407     return;
408   }
409   if(NULL != cont) /* Run continuation */
410     cont(cont_cls, GNUNET_OK);
411
412 }
413
414 /**
415  * Callback after MQ envelope is sent
416  *
417  * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
418  */
419 void store_request_sent (void *cls)
420 {
421   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
422
423   sc->request_sent = GNUNET_YES;
424   sc->ev = NULL;
425 }
426
427 /**
428  * Cancel a store request
429  *
430  * @param sc Store request context
431  */
432 void
433 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
434 {
435   LOG(GNUNET_ERROR_TYPE_DEBUG,
436           "Canceling store request.\n");
437   if(GNUNET_NO == sc->request_sent)
438   {
439     if(NULL != sc->ev)
440     {
441       GNUNET_MQ_send_cancel(sc->ev);
442       sc->ev = NULL;
443     }
444     GNUNET_CONTAINER_DLL_remove(sc->h->store_head, sc->h->store_tail, sc);
445     GNUNET_free(sc);
446   }
447   else
448   { /* request already sent, will have to wait for response */
449     sc->cont = NULL;
450   }
451
452 }
453
454 /**
455  * Store a new entry in the PEERSTORE
456  *
457  * @param h Handle to the PEERSTORE service
458  * @param sub_system name of the sub system
459  * @param peer Peer Identity
460  * @param key entry key
461  * @param value entry value BLOB
462  * @param size size of 'value'
463  * @param expiry absolute time after which the entry is (possibly) deleted
464  * @param cont Continuation function after the store request is processed
465  * @param cont_cls Closure for 'cont'
466  */
467 struct GNUNET_PEERSTORE_StoreContext *
468 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
469     const char *sub_system,
470     const struct GNUNET_PeerIdentity *peer,
471     const char *key,
472     const void *value,
473     size_t size,
474     struct GNUNET_TIME_Absolute expiry,
475     GNUNET_PEERSTORE_Continuation cont,
476     void *cont_cls)
477 {
478   struct GNUNET_MQ_Envelope *ev; //FIXME: add 'replace' flag in store function (similar to multihashmap)
479   struct GNUNET_PEERSTORE_StoreContext *sc;
480
481   LOG (GNUNET_ERROR_TYPE_DEBUG,
482       "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
483       size, sub_system, GNUNET_i2s (peer), key);
484   ev = PEERSTORE_create_record_mq_envelope(sub_system,
485       peer,
486       key,
487       value,
488       size,
489       &expiry,
490       GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
491   sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext);
492   sc->ev = ev;
493   sc->cont = cont;
494   sc->cont_cls = cont_cls;
495   sc->h = h;
496   sc->request_sent = GNUNET_NO;
497   GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc);
498   GNUNET_MQ_notify_sent(ev, &store_request_sent, sc);
499   GNUNET_MQ_send(h->mq, ev);
500   return sc;
501
502 }
503
504 /******************************************************************************/
505 /*******************           ITERATE FUNCTIONS          *********************/
506 /******************************************************************************/
507
508 /**
509  * When a response for iterate request is received
510  *
511  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
512  * @param msg message received, NULL on timeout or fatal error
513  */
514 void handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg)
515 {
516   struct GNUNET_PEERSTORE_Handle *h = cls;
517   struct GNUNET_PEERSTORE_IterateContext *ic;
518   GNUNET_PEERSTORE_Processor callback;
519   void *callback_cls;
520   uint16_t msg_type;
521   struct GNUNET_PEERSTORE_Record *record;
522   int continue_iter;
523
524   ic = h->iterate_head;
525   if(NULL == ic)
526   {
527     LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected iteration response, this should not happen.\n");
528     reconnect(h);
529     return;
530   }
531   callback = ic->callback;
532   callback_cls = ic->callback_cls;
533   if(NULL == msg) /* Connection error */
534   {
535
536     if(NULL != callback)
537       callback(callback_cls, NULL,
538           _("Error communicating with `PEERSTORE' service."));
539     reconnect(h);
540     return;
541   }
542   msg_type = ntohs(msg->type);
543   if(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
544   {
545     GNUNET_PEERSTORE_iterate_cancel(ic);
546     if(NULL != callback)
547       callback(callback_cls, NULL, NULL);
548     return;
549   }
550   if(NULL != callback)
551   {
552     record = PEERSTORE_parse_record_message(msg);
553     if(NULL == record)
554       continue_iter = callback(callback_cls, record, _("Received a malformed response from service."));
555     else
556       continue_iter = callback(callback_cls, record, NULL);
557     if(GNUNET_NO == continue_iter)
558       ic->callback = NULL;
559   }
560
561 }
562
563 /**
564  * Callback after MQ envelope is sent
565  *
566  * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
567  */
568 void iterate_request_sent (void *cls)
569 {
570   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
571
572   LOG(GNUNET_ERROR_TYPE_DEBUG, "Iterate request sent to service.\n");
573   ic->request_sent = GNUNET_YES;
574   ic->ev = NULL;
575 }
576
577 /**
578  * Called when the iterate request is timedout
579  *
580  * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
581  */
582 void iterate_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
583 {
584   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
585
586   ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
587   GNUNET_PEERSTORE_iterate_cancel(ic);
588 }
589
590 /**
591  * Cancel an iterate request
592  * Please do not call after the iterate request is done
593  *
594  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
595  */
596 void
597 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
598 {
599   LOG(GNUNET_ERROR_TYPE_DEBUG, "Canceling iterate request.\n");
600   if(GNUNET_SCHEDULER_NO_TASK != ic->timeout_task)
601   {
602     GNUNET_SCHEDULER_cancel(ic->timeout_task);
603     ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
604   }
605   if(GNUNET_NO == ic->request_sent)
606   {
607     if(NULL != ic->ev)
608     {
609       GNUNET_MQ_send_cancel(ic->ev);
610       ic->ev = NULL;
611     }
612     GNUNET_CONTAINER_DLL_remove(ic->h->iterate_head, ic->h->iterate_tail, ic);
613     GNUNET_free(ic);
614   }
615   else
616     ic->callback = NULL;
617 }
618
619 /**
620  * Iterate over records matching supplied key information
621  *
622  * @param h handle to the PEERSTORE service
623  * @param sub_system name of sub system
624  * @param peer Peer identity (can be NULL)
625  * @param key entry key string (can be NULL)
626  * @param timeout time after which the iterate request is canceled
627  * @param callback function called with each matching record, all NULL's on end
628  * @param callback_cls closure for @a callback
629  * @return Handle to iteration request
630  */
631 struct GNUNET_PEERSTORE_IterateContext *
632 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
633     const char *sub_system,
634     const struct GNUNET_PeerIdentity *peer,
635     const char *key,
636     struct GNUNET_TIME_Relative timeout,
637     GNUNET_PEERSTORE_Processor callback, void *callback_cls)
638 {
639   struct GNUNET_MQ_Envelope *ev;
640   struct GNUNET_PEERSTORE_IterateContext *ic;
641
642   ev = PEERSTORE_create_record_mq_envelope(sub_system,
643       peer,
644       key,
645       NULL,
646       0,
647       NULL,
648       GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
649   ic = GNUNET_new(struct GNUNET_PEERSTORE_IterateContext);
650   ic->callback = callback;
651   ic->callback_cls = callback_cls;
652   ic->ev = ev;
653   ic->h = h;
654   ic->request_sent = GNUNET_NO;
655   GNUNET_CONTAINER_DLL_insert(h->iterate_head, h->iterate_tail, ic);
656   LOG(GNUNET_ERROR_TYPE_DEBUG,
657         "Sending an iterate request for sub system `%s'\n", sub_system);
658   GNUNET_MQ_notify_sent(ev, &iterate_request_sent, ic);
659   GNUNET_MQ_send(h->mq, ev);
660   ic->timeout_task = GNUNET_SCHEDULER_add_delayed(timeout, &iterate_timeout, ic);
661   return ic;
662 }
663
664 /******************************************************************************/
665 /*******************            WATCH FUNCTIONS           *********************/
666 /******************************************************************************/
667
668 /**
669  * When a watch record is received
670  *
671  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
672  * @param msg message received, NULL on timeout or fatal error
673  */
674 void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
675 {
676   struct GNUNET_PEERSTORE_Handle *h = cls;
677   struct GNUNET_PEERSTORE_Record *record;
678   struct GNUNET_HashCode keyhash;
679   struct GNUNET_PEERSTORE_WatchContext *wc;
680
681   if(NULL == msg)
682   {
683     LOG(GNUNET_ERROR_TYPE_ERROR,
684         "Problem receiving a watch response, no way to determine which request.\n");
685     reconnect(h);
686     return;
687   }
688   LOG(GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
689   record = PEERSTORE_parse_record_message(msg);
690   PEERSTORE_hash_key(record->sub_system,
691       record->peer, record->key, &keyhash);
692   wc = GNUNET_CONTAINER_multihashmap_get(h->watches, &keyhash);
693   if(NULL != wc->callback)
694     wc->callback(wc->callback_cls, record, NULL);
695   /* TODO: destroy record */
696 }
697
698 /**
699  * Callback after MQ envelope is sent
700  *
701  * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
702  */
703 void watch_request_sent (void *cls)
704 {
705   struct GNUNET_PEERSTORE_WatchContext *wc = cls;
706
707   wc->request_sent = GNUNET_YES;
708   wc->ev = NULL;
709 }
710
711 /**
712  * Cancel a watch request
713  *
714  * @wc handle to the watch request
715  */
716 void
717 GNUNET_PEERSTORE_watch_cancel(struct GNUNET_PEERSTORE_WatchContext *wc)
718 {
719   struct GNUNET_PEERSTORE_Handle *h = wc->h;
720   struct GNUNET_MQ_Envelope *ev;
721   struct StoreKeyHashMessage *hm;
722
723   LOG(GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
724   if(GNUNET_YES == wc->request_sent) /* If request already sent to service, send a cancel request. */
725   {
726     ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
727     GNUNET_MQ_send(h->mq, ev);
728     wc->callback = NULL;
729     wc->callback_cls = NULL;
730   }
731   if(NULL != wc->ev)
732   {
733     GNUNET_MQ_send_cancel(wc->ev);
734     wc->ev = NULL;
735   }
736   GNUNET_CONTAINER_multihashmap_remove(h->watches, &wc->keyhash, wc);
737   GNUNET_free(wc);
738
739 }
740
741 /**
742  * Request watching a given key
743  * User will be notified with any new values added to key
744  *
745  * @param h handle to the PEERSTORE service
746  * @param sub_system name of sub system
747  * @param peer Peer identity
748  * @param key entry key string
749  * @param callback function called with each new value
750  * @param callback_cls closure for @a callback
751  * @return Handle to watch request
752  */
753 struct GNUNET_PEERSTORE_WatchContext *
754 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
755     const char *sub_system,
756     const struct GNUNET_PeerIdentity *peer,
757     const char *key,
758     GNUNET_PEERSTORE_Processor callback, void *callback_cls)
759 {
760   struct GNUNET_MQ_Envelope *ev;
761   struct StoreKeyHashMessage *hm;
762   struct GNUNET_PEERSTORE_WatchContext *wc;
763
764   ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
765   PEERSTORE_hash_key(sub_system, peer, key, &hm->keyhash);
766   wc = GNUNET_new(struct GNUNET_PEERSTORE_WatchContext);
767   wc->callback = callback;
768   wc->callback_cls = callback_cls;
769   wc->ev = ev;
770   wc->h = h;
771   wc->request_sent = GNUNET_NO;
772   wc->keyhash = hm->keyhash;
773   if(NULL == h->watches)
774     h->watches = GNUNET_CONTAINER_multihashmap_create(5, GNUNET_NO);
775   GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash,
776       wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
777   LOG(GNUNET_ERROR_TYPE_DEBUG,
778       "Sending a watch request for sub system `%s'.\n", sub_system);
779   GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc);
780   GNUNET_MQ_send(h->mq, ev);
781   return wc;
782 }
783
784 /* end of peerstore_api.c */