52b1b22ca574bd3890c82ecae6b964d8af07f212
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013-2014 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file peerstore/peerstore_api.c
22  * @brief API for peerstore
23  * @author Omar Tarabai
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
32
33 /******************************************************************************/
34 /************************      DATA STRUCTURES     ****************************/
35 /******************************************************************************/
36
37 /**
38  * Handle to the PEERSTORE service.
39  */
40 struct GNUNET_PEERSTORE_Handle
41 {
42
43   /**
44    * Our configuration.
45    */
46   const struct GNUNET_CONFIGURATION_Handle *cfg;
47
48   /**
49    * Connection to the service.
50    */
51   struct GNUNET_CLIENT_Connection *client;
52
53   /**
54    * Message queue
55    */
56   struct GNUNET_MQ_Handle *mq;
57
58   /**
59    * Head of active STORE requests.
60    */
61   struct GNUNET_PEERSTORE_StoreContext *store_head;
62
63   /**
64    * Tail of active STORE requests.
65    */
66   struct GNUNET_PEERSTORE_StoreContext *store_tail;
67
68   /**
69    * Head of active ITERATE requests.
70    */
71   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
72
73   /**
74    * Tail of active ITERATE requests.
75    */
76   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
77
78   /**
79    * Hashmap of watch requests
80    */
81   struct GNUNET_CONTAINER_MultiHashMap *watches;
82
83   /**
84    * Are we in the process of disconnecting but need to sync first?
85    */
86   int disconnecting;
87
88 };
89
90 /**
91  * Context for a store request
92  */
93 struct GNUNET_PEERSTORE_StoreContext
94 {
95   /**
96    * Kept in a DLL.
97    */
98   struct GNUNET_PEERSTORE_StoreContext *next;
99
100   /**
101    * Kept in a DLL.
102    */
103   struct GNUNET_PEERSTORE_StoreContext *prev;
104
105   /**
106    * Handle to the PEERSTORE service.
107    */
108   struct GNUNET_PEERSTORE_Handle *h;
109
110   /**
111    * Continuation called with service response
112    */
113   GNUNET_PEERSTORE_Continuation cont;
114
115   /**
116    * Closure for @e cont
117    */
118   void *cont_cls;
119
120   /**
121    * Which subsystem does the store?
122    */
123   char *sub_system;
124
125   /**
126    * Key for the store operation.
127    */
128   char *key;
129
130   /**
131    * Contains @e size bytes.
132    */
133   void *value;
134
135   /**
136    * MQ Envelope with store request message
137    */
138   struct GNUNET_MQ_Envelope *ev;
139
140   /**
141    * Peer the store is for.
142    */
143   struct GNUNET_PeerIdentity peer;
144
145   /**
146    * Number of bytes in @e value.
147    */
148   size_t size;
149
150   /**
151    * When does the value expire?
152    */
153   struct GNUNET_TIME_Absolute expiry;
154
155   /**
156    * Options for the store operation.
157    */
158   enum GNUNET_PEERSTORE_StoreOption options;
159
160 };
161
162 /**
163  * Context for a iterate request
164  */
165 struct GNUNET_PEERSTORE_IterateContext
166 {
167   /**
168    * Kept in a DLL.
169    */
170   struct GNUNET_PEERSTORE_IterateContext *next;
171
172   /**
173    * Kept in a DLL.
174    */
175   struct GNUNET_PEERSTORE_IterateContext *prev;
176
177   /**
178    * Handle to the PEERSTORE service.
179    */
180   struct GNUNET_PEERSTORE_Handle *h;
181
182   /**
183    * Which subsystem does the store?
184    */
185   char *sub_system;
186
187   /**
188    * Peer the store is for.
189    */
190   struct GNUNET_PeerIdentity peer;
191
192   /**
193    * Key for the store operation.
194    */
195   char *key;
196
197   /**
198    * Operation timeout
199    */
200   struct GNUNET_TIME_Relative timeout;
201
202   /**
203    * MQ Envelope with iterate request message
204    */
205   struct GNUNET_MQ_Envelope *ev;
206
207   /**
208    * Callback with each matching record
209    */
210   GNUNET_PEERSTORE_Processor callback;
211
212   /**
213    * Closure for @e callback
214    */
215   void *callback_cls;
216
217   /**
218    * #GNUNET_YES / #GNUNET_NO
219    * Iterate request has been sent and we are still expecting records
220    */
221   int iterating;
222
223   /**
224    * Task identifier for the function called
225    * on iterate request timeout
226    */
227   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
228
229 };
230
231 /**
232  * Context for a watch request
233  */
234 struct GNUNET_PEERSTORE_WatchContext
235 {
236   /**
237    * Kept in a DLL.
238    */
239   struct GNUNET_PEERSTORE_WatchContext *next;
240
241   /**
242    * Kept in a DLL.
243    */
244   struct GNUNET_PEERSTORE_WatchContext *prev;
245
246   /**
247    * Handle to the PEERSTORE service.
248    */
249   struct GNUNET_PEERSTORE_Handle *h;
250
251   /**
252    * MQ Envelope with watch request message
253    */
254   struct GNUNET_MQ_Envelope *ev;
255
256   /**
257    * Callback with each record received
258    */
259   GNUNET_PEERSTORE_Processor callback;
260
261   /**
262    * Closure for @e callback
263    */
264   void *callback_cls;
265
266   /**
267    * Hash of the combined key
268    */
269   struct GNUNET_HashCode keyhash;
270
271   /**
272    * #GNUNET_YES / #GNUNET_NO
273    * if sent, cannot be canceled
274    */
275   int request_sent;
276
277 };
278
279 /******************************************************************************/
280 /*******************             DECLARATIONS             *********************/
281 /******************************************************************************/
282
283 /**
284  * When a response for iterate request is received
285  *
286  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
287  * @param msg message received, NULL on timeout or fatal error
288  */
289 static void
290 handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg);
291
292 /**
293  * When a watch record is received
294  *
295  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
296  * @param msg message received, NULL on timeout or fatal error
297  */
298 static void
299 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg);
300
301 /**
302  * Close the existing connection to PEERSTORE and reconnect.
303  *
304  * @param h handle to the service
305  */
306 static void
307 reconnect (struct GNUNET_PEERSTORE_Handle *h);
308
309 /**
310  * Callback after MQ envelope is sent
311  *
312  * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
313  */
314 static void
315 watch_request_sent (void *cls)
316 {
317   struct GNUNET_PEERSTORE_WatchContext *wc = cls;
318
319   wc->request_sent = GNUNET_YES;
320   wc->ev = NULL;
321 }
322
323
324 /**
325  * Callback after MQ envelope is sent
326  *
327  * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
328  */
329 static void
330 iterate_request_sent (void *cls)
331 {
332   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
333
334   LOG (GNUNET_ERROR_TYPE_DEBUG, "Iterate request sent to service.\n");
335   ic->iterating = GNUNET_YES;
336   ic->ev = NULL;
337 }
338
339
340 /**
341  * Callback after MQ envelope is sent
342  *
343  * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
344  */
345 static void
346 store_request_sent (void *cls)
347 {
348   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
349   GNUNET_PEERSTORE_Continuation cont;
350   void *cont_cls;
351
352   sc->ev = NULL;
353   cont = sc->cont;
354   cont_cls = sc->cont_cls;
355   GNUNET_PEERSTORE_store_cancel (sc);
356   if (NULL != cont)
357     cont (cont_cls, GNUNET_OK);
358 }
359
360
361 /**
362  * MQ message handlers
363  */
364 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
365   {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
366   {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
367    sizeof (struct GNUNET_MessageHeader)},
368   {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
369   GNUNET_MQ_HANDLERS_END
370 };
371
372 /******************************************************************************/
373 /*******************         CONNECTION FUNCTIONS         *********************/
374 /******************************************************************************/
375
376 static void
377 handle_client_error (void *cls, enum GNUNET_MQ_Error error)
378 {
379   struct GNUNET_PEERSTORE_Handle *h = cls;
380
381   LOG (GNUNET_ERROR_TYPE_ERROR,
382        _("Received an error notification from MQ of type: %d\n"), error);
383   reconnect (h);
384 }
385
386
387 /**
388  * Iterator over previous watches to resend them
389  */
390 static int
391 rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
392 {
393   struct GNUNET_PEERSTORE_Handle *h = cls;
394   struct GNUNET_PEERSTORE_WatchContext *wc = value;
395   struct StoreKeyHashMessage *hm;
396
397   if (GNUNET_YES == wc->request_sent)
398   {                             /* Envelope gone, create new one. */
399     wc->ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
400     hm->keyhash = wc->keyhash;
401     wc->request_sent = GNUNET_NO;
402   }
403   GNUNET_MQ_notify_sent (wc->ev, &watch_request_sent, wc);
404   GNUNET_MQ_send (h->mq, wc->ev);
405   return GNUNET_YES;
406 }
407
408
409 /**
410  * Called when the iterate request is timedout
411  *
412  * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
413  * @param tc Scheduler task context (unused)
414  */
415 static void
416 iterate_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
417 {
418   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
419   GNUNET_PEERSTORE_Processor callback;
420   void *callback_cls;
421
422   ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
423   callback = ic->callback;
424   callback_cls = ic->callback_cls;
425   GNUNET_PEERSTORE_iterate_cancel (ic);
426   if (NULL != callback)
427     callback (callback_cls, NULL, _("timeout"));
428 }
429
430
431 /**
432  * Close the existing connection to PEERSTORE and reconnect.
433  *
434  * @param h handle to the service
435  */
436 static void
437 reconnect (struct GNUNET_PEERSTORE_Handle *h)
438 {
439   struct GNUNET_PEERSTORE_IterateContext *ic;
440   GNUNET_PEERSTORE_Processor icb;
441   void *icb_cls;
442   struct GNUNET_PEERSTORE_StoreContext *sc;
443
444   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
445   for (sc = h->store_head; NULL != sc; sc = sc->next)
446   {
447     if (NULL != sc->ev)
448     {
449       GNUNET_MQ_send_cancel (sc->ev);
450       sc->ev = NULL;
451     }
452   }
453   if (NULL != h->mq)
454   {
455     GNUNET_MQ_destroy (h->mq);
456     h->mq = NULL;
457   }
458   if (NULL != h->client)
459   {
460     GNUNET_CLIENT_disconnect (h->client);
461     h->client = NULL;
462   }
463   h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
464   GNUNET_assert (NULL != h->client);
465   h->mq =
466       GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
467                                              &handle_client_error, h);
468   LOG (GNUNET_ERROR_TYPE_DEBUG,
469        "Resending pending requests after reconnect.\n");
470   if (NULL != h->watches)
471     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
472   for (ic = h->iterate_head; NULL != ic; ic = ic->next)
473   {
474     if (GNUNET_YES == ic->iterating)
475     {
476       icb = ic->callback;
477       icb_cls = ic->callback_cls;
478       GNUNET_PEERSTORE_iterate_cancel (ic);
479       if (NULL != icb)
480         icb (icb_cls, NULL, _("Iteration canceled due to reconnection."));
481     }
482     else
483     {
484       if (GNUNET_SCHEDULER_NO_TASK != ic->timeout_task)
485       {
486         GNUNET_SCHEDULER_cancel (ic->timeout_task);
487         ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
488       }
489       if (NULL != ic->ev)
490       {
491         GNUNET_MQ_send_cancel (ic->ev);
492         ic->ev = NULL;
493       }
494       ic->ev =
495           PEERSTORE_create_record_mq_envelope (ic->sub_system, &ic->peer,
496                                                ic->key, NULL, 0, NULL, 0,
497                                                GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
498       GNUNET_MQ_notify_sent (ic->ev, &iterate_request_sent, ic);
499       GNUNET_MQ_send (h->mq, ic->ev);
500       ic->timeout_task =
501           GNUNET_SCHEDULER_add_delayed (ic->timeout, &iterate_timeout, ic);
502     }
503   }
504   for (sc = h->store_head; NULL != sc; sc = sc->next)
505   {
506     sc->ev =
507         PEERSTORE_create_record_mq_envelope (sc->sub_system, &sc->peer, sc->key,
508                                              sc->value, sc->size, &sc->expiry,
509                                              sc->options,
510                                              GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
511     GNUNET_MQ_notify_sent (sc->ev, &store_request_sent, sc);
512     GNUNET_MQ_send (h->mq, sc->ev);
513   }
514 }
515
516
517 /**
518  * Iterator over watch requests to cancel them.
519  *
520  * @param cls unsused
521  * @param key key to the watch request
522  * @param value watch context
523  * @return #GNUNET_YES to continue iteration
524  */
525 static int
526 destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
527 {
528   struct GNUNET_PEERSTORE_WatchContext *wc = value;
529
530   GNUNET_PEERSTORE_watch_cancel (wc);
531   return GNUNET_YES;
532 }
533
534
535 /**
536  * Kill the connection to the service. This can be delayed in case of pending
537  * STORE requests and the user explicitly asked to sync first. Otherwise it is
538  * performed instantly.
539  *
540  * @param h Handle to the service.
541  */
542 static void
543 do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
544 {
545   if (NULL != h->mq)
546   {
547     GNUNET_MQ_destroy (h->mq);
548     h->mq = NULL;
549   }
550   if (NULL != h->client)
551   {
552     GNUNET_CLIENT_disconnect (h->client);
553     h->client = NULL;
554   }
555   GNUNET_free (h);
556 }
557
558
559 /**
560  * Connect to the PEERSTORE service.
561  *
562  * @param cfg configuration to use
563  * @return NULL on error
564  */
565 struct GNUNET_PEERSTORE_Handle *
566 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
567 {
568   struct GNUNET_PEERSTORE_Handle *h;
569
570   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
571
572   h->client = GNUNET_CLIENT_connect ("peerstore", cfg);
573   if (NULL == h->client)
574   {
575     GNUNET_free (h);
576     return NULL;
577   }
578   h->cfg = cfg;
579   h->disconnecting = GNUNET_NO;
580   h->mq =
581       GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
582                                              &handle_client_error, h);
583   if (NULL == h->mq)
584   {
585     GNUNET_CLIENT_disconnect (h->client);
586     GNUNET_free (h);
587     return NULL;
588   }
589   LOG (GNUNET_ERROR_TYPE_DEBUG, "New connection created\n");
590   return h;
591 }
592
593
594 /**
595  * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
596  * will be canceled.
597  * Any pending STORE requests will depend on @e snyc_first flag.
598  *
599  * @param h handle to disconnect
600  * @param sync_first send any pending STORE requests before disconnecting
601  */
602 void
603 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first)
604 {
605   struct GNUNET_PEERSTORE_IterateContext *ic;
606   struct GNUNET_PEERSTORE_IterateContext *ic_iter;
607   struct GNUNET_PEERSTORE_StoreContext *sc;
608   struct GNUNET_PEERSTORE_StoreContext *sc_iter;
609
610   LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
611   if (NULL != h->watches)
612   {
613     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
614     GNUNET_CONTAINER_multihashmap_destroy (h->watches);
615     h->watches = NULL;
616   }
617   ic_iter = h->iterate_head;
618   while (NULL != ic_iter)
619   {
620     GNUNET_break (0);
621     ic = ic_iter;
622     ic_iter = ic_iter->next;
623     GNUNET_PEERSTORE_iterate_cancel (ic);
624   }
625   if (NULL != h->store_head)
626   {
627     if (GNUNET_YES == sync_first)
628     {
629       LOG (GNUNET_ERROR_TYPE_DEBUG,
630            "Delaying disconnection due to pending store requests.\n");
631       h->disconnecting = GNUNET_YES;
632       return;
633     }
634     sc_iter = h->store_head;
635     while (NULL != sc_iter)
636     {
637       sc = sc_iter;
638       sc_iter = sc_iter->next;
639       GNUNET_PEERSTORE_store_cancel (sc);
640     }
641   }
642   do_disconnect (h);
643 }
644
645
646 /******************************************************************************/
647 /*******************            STORE FUNCTIONS           *********************/
648 /******************************************************************************/
649
650
651 /**
652  * Cancel a store request
653  *
654  * @param sc Store request context
655  */
656 void
657 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
658 {
659   struct GNUNET_PEERSTORE_Handle *h = sc->h;
660
661   if (NULL != sc->ev)
662   {
663     GNUNET_MQ_send_cancel (sc->ev);
664     sc->ev = NULL;
665   }
666   GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
667   GNUNET_free (sc->sub_system);
668   GNUNET_free (sc->value);
669   GNUNET_free (sc->key);
670   GNUNET_free (sc);
671   if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
672     do_disconnect (h);
673 }
674
675
676 /**
677  * Store a new entry in the PEERSTORE.
678  * Note that stored entries can be lost in some cases
679  * such as power failure.
680  *
681  * @param h Handle to the PEERSTORE service
682  * @param sub_system name of the sub system
683  * @param peer Peer Identity
684  * @param key entry key
685  * @param value entry value BLOB
686  * @param size size of @e value
687  * @param expiry absolute time after which the entry is (possibly) deleted
688  * @param options options specific to the storage operation
689  * @param cont Continuation function after the store request is sent
690  * @param cont_cls Closure for @a cont
691  */
692 struct GNUNET_PEERSTORE_StoreContext *
693 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
694                         const char *sub_system,
695                         const struct GNUNET_PeerIdentity *peer, const char *key,
696                         const void *value, size_t size,
697                         struct GNUNET_TIME_Absolute expiry,
698                         enum GNUNET_PEERSTORE_StoreOption options,
699                         GNUNET_PEERSTORE_Continuation cont, void *cont_cls)
700 {
701   struct GNUNET_MQ_Envelope *ev;
702   struct GNUNET_PEERSTORE_StoreContext *sc;
703
704   LOG (GNUNET_ERROR_TYPE_DEBUG,
705        "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
706        size, sub_system, GNUNET_i2s (peer), key);
707   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size,
708                                             &expiry, options,
709                                             GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
710   sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
711
712   sc->sub_system = GNUNET_strdup (sub_system);
713   sc->peer = *peer;
714   sc->key = GNUNET_strdup (key);
715   sc->value = GNUNET_memdup (value, size);
716   sc->size = size;
717   sc->expiry = expiry;
718   sc->options = options;
719   sc->cont = cont;
720   sc->cont_cls = cont_cls;
721   sc->h = h;
722
723   GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
724   GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
725   GNUNET_MQ_send (h->mq, ev);
726   return sc;
727
728 }
729
730
731 /******************************************************************************/
732 /*******************           ITERATE FUNCTIONS          *********************/
733 /******************************************************************************/
734
735 /**
736  * When a response for iterate request is received
737  *
738  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
739  * @param msg message received, NULL on timeout or fatal error
740  */
741 static void
742 handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg)
743 {
744   struct GNUNET_PEERSTORE_Handle *h = cls;
745   struct GNUNET_PEERSTORE_IterateContext *ic;
746   GNUNET_PEERSTORE_Processor callback;
747   void *callback_cls;
748   uint16_t msg_type;
749   struct GNUNET_PEERSTORE_Record *record;
750   int continue_iter;
751
752   ic = h->iterate_head;
753   if (NULL == ic)
754   {
755     LOG (GNUNET_ERROR_TYPE_ERROR,
756          _("Unexpected iteration response, this should not happen.\n"));
757     reconnect (h);
758     return;
759   }
760   callback = ic->callback;
761   callback_cls = ic->callback_cls;
762   if (NULL == msg)              /* Connection error */
763   {
764     if (NULL != callback)
765       callback (callback_cls, NULL,
766                 _("Error communicating with `PEERSTORE' service."));
767     reconnect (h);
768     return;
769   }
770   msg_type = ntohs (msg->type);
771   if (GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
772   {
773     ic->iterating = GNUNET_NO;
774     GNUNET_PEERSTORE_iterate_cancel (ic);
775     if (NULL != callback)
776       callback (callback_cls, NULL, NULL);
777     return;
778   }
779   if (NULL != callback)
780   {
781     record = PEERSTORE_parse_record_message (msg);
782     if (NULL == record)
783       continue_iter =
784           callback (callback_cls, NULL,
785                     _("Received a malformed response from service."));
786     else
787     {
788       continue_iter = callback (callback_cls, record, NULL);
789       PEERSTORE_destroy_record (record);
790     }
791     if (GNUNET_NO == continue_iter)
792       ic->callback = NULL;
793   }
794 }
795
796
797 /**
798  * Cancel an iterate request
799  * Please do not call after the iterate request is done
800  *
801  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
802  */
803 void
804 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
805 {
806   if (GNUNET_SCHEDULER_NO_TASK != ic->timeout_task)
807   {
808     GNUNET_SCHEDULER_cancel (ic->timeout_task);
809     ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
810   }
811   if (GNUNET_NO == ic->iterating)
812   {
813     if (NULL != ic->ev)
814     {
815       GNUNET_MQ_send_cancel (ic->ev);
816       ic->ev = NULL;
817     }
818     GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
819     GNUNET_free (ic->sub_system);
820     if (NULL != ic->key)
821       GNUNET_free (ic->key);
822     GNUNET_free (ic);
823   }
824   else
825     ic->callback = NULL;
826 }
827
828
829 /**
830  * Iterate over records matching supplied key information
831  *
832  * @param h handle to the PEERSTORE service
833  * @param sub_system name of sub system
834  * @param peer Peer identity (can be NULL)
835  * @param key entry key string (can be NULL)
836  * @param timeout time after which the iterate request is canceled
837  * @param callback function called with each matching record, all NULL's on end
838  * @param callback_cls closure for @a callback
839  * @return Handle to iteration request
840  */
841 struct GNUNET_PEERSTORE_IterateContext *
842 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
843                           const char *sub_system,
844                           const struct GNUNET_PeerIdentity *peer,
845                           const char *key, struct GNUNET_TIME_Relative timeout,
846                           GNUNET_PEERSTORE_Processor callback,
847                           void *callback_cls)
848 {
849   struct GNUNET_MQ_Envelope *ev;
850   struct GNUNET_PEERSTORE_IterateContext *ic;
851
852   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, NULL, 0,
853                                             NULL, 0,
854                                             GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
855   ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
856
857   ic->callback = callback;
858   ic->callback_cls = callback_cls;
859   ic->ev = ev;
860   ic->h = h;
861   ic->sub_system = GNUNET_strdup (sub_system);
862   if (NULL != peer)
863     ic->peer = *peer;
864   if (NULL != key)
865     ic->key = GNUNET_strdup (key);
866   ic->timeout = timeout;
867   ic->iterating = GNUNET_NO;
868   GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, h->iterate_tail, ic);
869   LOG (GNUNET_ERROR_TYPE_DEBUG,
870        "Sending an iterate request for sub system `%s'\n", sub_system);
871   GNUNET_MQ_notify_sent (ev, &iterate_request_sent, ic);
872   GNUNET_MQ_send (h->mq, ev);
873   ic->timeout_task =
874       GNUNET_SCHEDULER_add_delayed (timeout, &iterate_timeout, ic);
875   return ic;
876 }
877
878
879 /******************************************************************************/
880 /*******************            WATCH FUNCTIONS           *********************/
881 /******************************************************************************/
882
883 /**
884  * When a watch record is received
885  *
886  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
887  * @param msg message received, NULL on timeout or fatal error
888  */
889 static void
890 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
891 {
892   struct GNUNET_PEERSTORE_Handle *h = cls;
893   struct GNUNET_PEERSTORE_Record *record;
894   struct GNUNET_HashCode keyhash;
895   struct GNUNET_PEERSTORE_WatchContext *wc;
896
897   if (NULL == msg)
898   {
899     LOG (GNUNET_ERROR_TYPE_ERROR,
900          _
901          ("Problem receiving a watch response, no way to determine which request.\n"));
902     reconnect (h);
903     return;
904   }
905   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
906   record = PEERSTORE_parse_record_message (msg);
907   PEERSTORE_hash_key (record->sub_system, record->peer, record->key, &keyhash);
908   wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
909   if (NULL == wc)
910   {
911     LOG (GNUNET_ERROR_TYPE_ERROR,
912          _("Received a watch result for a non existing watch.\n"));
913     PEERSTORE_destroy_record (record);
914     reconnect (h);
915     return;
916   }
917   if (NULL != wc->callback)
918     wc->callback (wc->callback_cls, record, NULL);
919   PEERSTORE_destroy_record (record);
920 }
921
922
923 /**
924  * Cancel a watch request
925  *
926  * @param wc handle to the watch request
927  */
928 void
929 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
930 {
931   struct GNUNET_PEERSTORE_Handle *h = wc->h;
932   struct GNUNET_MQ_Envelope *ev;
933   struct StoreKeyHashMessage *hm;
934
935   LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
936   if (GNUNET_YES == wc->request_sent)   /* If request already sent to service, send a cancel request. */
937   {
938     ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
939     hm->keyhash = wc->keyhash;
940     GNUNET_MQ_send (h->mq, ev);
941     wc->callback = NULL;
942     wc->callback_cls = NULL;
943   }
944   if (NULL != wc->ev)
945   {
946     GNUNET_MQ_send_cancel (wc->ev);
947     wc->ev = NULL;
948   }
949   GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc);
950   GNUNET_free (wc);
951
952 }
953
954
955 /**
956  * Request watching a given key
957  * User will be notified with any new values added to key
958  *
959  * @param h handle to the PEERSTORE service
960  * @param sub_system name of sub system
961  * @param peer Peer identity
962  * @param key entry key string
963  * @param callback function called with each new value
964  * @param callback_cls closure for @a callback
965  * @return Handle to watch request
966  */
967 struct GNUNET_PEERSTORE_WatchContext *
968 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
969                         const char *sub_system,
970                         const struct GNUNET_PeerIdentity *peer, const char *key,
971                         GNUNET_PEERSTORE_Processor callback, void *callback_cls)
972 {
973   struct GNUNET_MQ_Envelope *ev;
974   struct StoreKeyHashMessage *hm;
975   struct GNUNET_PEERSTORE_WatchContext *wc;
976
977   GNUNET_assert (NULL != sub_system);
978   GNUNET_assert (NULL != peer);
979   GNUNET_assert (NULL != key);
980   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
981   PEERSTORE_hash_key (sub_system, peer, key, &hm->keyhash);
982   wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
983
984   wc->callback = callback;
985   wc->callback_cls = callback_cls;
986   wc->ev = ev;
987   wc->h = h;
988   wc->request_sent = GNUNET_NO;
989   wc->keyhash = hm->keyhash;
990   if (NULL == h->watches)
991     h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
992   GNUNET_CONTAINER_multihashmap_put (h->watches, &wc->keyhash, wc,
993                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
994   LOG (GNUNET_ERROR_TYPE_DEBUG,
995        "Sending a watch request for ss `%s', peer `%s', key `%s'.\n",
996        sub_system, GNUNET_i2s (peer), key);
997   GNUNET_MQ_notify_sent (ev, &watch_request_sent, wc);
998   GNUNET_MQ_send (h->mq, ev);
999   return wc;
1000 }
1001
1002 /* end of peerstore_api.c */