8ef04604f209be4fc9abe8240f95a5cbe259fbac
[oweals/gnunet.git] / src / peerstore / peerstore_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2013-2014 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file peerstore/peerstore_api.c
22  * @brief API for peerstore
23  * @author Omar Tarabai
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "peerstore.h"
29 #include "peerstore_common.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
32
33 /******************************************************************************/
34 /************************      DATA STRUCTURES     ****************************/
35 /******************************************************************************/
36
37 /**
38  * Handle to the PEERSTORE service.
39  */
40 struct GNUNET_PEERSTORE_Handle
41 {
42
43   /**
44    * Our configuration.
45    */
46   const struct GNUNET_CONFIGURATION_Handle *cfg;
47
48   /**
49    * Connection to the service.
50    */
51   struct GNUNET_CLIENT_Connection *client;
52
53   /**
54    * Message queue
55    */
56   struct GNUNET_MQ_Handle *mq;
57
58   /**
59    * Head of active STORE requests.
60    */
61   struct GNUNET_PEERSTORE_StoreContext *store_head;
62
63   /**
64    * Tail of active STORE requests.
65    */
66   struct GNUNET_PEERSTORE_StoreContext *store_tail;
67
68   /**
69    * Head of active ITERATE requests.
70    */
71   struct GNUNET_PEERSTORE_IterateContext *iterate_head;
72
73   /**
74    * Tail of active ITERATE requests.
75    */
76   struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
77
78   /**
79    * Hashmap of watch requests
80    */
81   struct GNUNET_CONTAINER_MultiHashMap *watches;
82
83   /**
84    * Are we in the process of disconnecting but need to sync first?
85    */
86   int disconnecting;
87
88 };
89
90 /**
91  * Context for a store request
92  */
93 struct GNUNET_PEERSTORE_StoreContext
94 {
95   /**
96    * Kept in a DLL.
97    */
98   struct GNUNET_PEERSTORE_StoreContext *next;
99
100   /**
101    * Kept in a DLL.
102    */
103   struct GNUNET_PEERSTORE_StoreContext *prev;
104
105   /**
106    * Handle to the PEERSTORE service.
107    */
108   struct GNUNET_PEERSTORE_Handle *h;
109
110   /**
111    * Continuation called with service response
112    */
113   GNUNET_PEERSTORE_Continuation cont;
114
115   /**
116    * Closure for @e cont
117    */
118   void *cont_cls;
119
120   /**
121    * Which subsystem does the store?
122    */
123   char *sub_system;
124
125   /**
126    * Key for the store operation.
127    */
128   char *key;
129
130   /**
131    * Contains @e size bytes.
132    */
133   void *value;
134
135   /**
136    * MQ Envelope with store request message
137    */
138   struct GNUNET_MQ_Envelope *ev;
139
140   /**
141    * Peer the store is for.
142    */
143   struct GNUNET_PeerIdentity peer;
144
145   /**
146    * Number of bytes in @e value.
147    */
148   size_t size;
149
150   /**
151    * When does the value expire?
152    */
153   struct GNUNET_TIME_Absolute expiry;
154
155   /**
156    * Options for the store operation.
157    */
158   enum GNUNET_PEERSTORE_StoreOption options;
159
160 };
161
162 /**
163  * Context for a iterate request
164  */
165 struct GNUNET_PEERSTORE_IterateContext
166 {
167   /**
168    * Kept in a DLL.
169    */
170   struct GNUNET_PEERSTORE_IterateContext *next;
171
172   /**
173    * Kept in a DLL.
174    */
175   struct GNUNET_PEERSTORE_IterateContext *prev;
176
177   /**
178    * Handle to the PEERSTORE service.
179    */
180   struct GNUNET_PEERSTORE_Handle *h;
181
182   /**
183    * Which subsystem does the store?
184    */
185   char *sub_system;
186
187   /**
188    * Peer the store is for.
189    */
190   struct GNUNET_PeerIdentity peer;
191
192   /**
193    * Key for the store operation.
194    */
195   char *key;
196
197   /**
198    * Operation timeout
199    */
200   struct GNUNET_TIME_Relative timeout;
201
202   /**
203    * MQ Envelope with iterate request message
204    */
205   struct GNUNET_MQ_Envelope *ev;
206
207   /**
208    * Callback with each matching record
209    */
210   GNUNET_PEERSTORE_Processor callback;
211
212   /**
213    * Closure for @e callback
214    */
215   void *callback_cls;
216
217   /**
218    * #GNUNET_YES / #GNUNET_NO
219    * Iterate request has been sent and we are still expecting records
220    */
221   int iterating;
222
223   /**
224    * Task identifier for the function called
225    * on iterate request timeout
226    */
227   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
228
229 };
230
231 /**
232  * Context for a watch request
233  */
234 struct GNUNET_PEERSTORE_WatchContext
235 {
236   /**
237    * Kept in a DLL.
238    */
239   struct GNUNET_PEERSTORE_WatchContext *next;
240
241   /**
242    * Kept in a DLL.
243    */
244   struct GNUNET_PEERSTORE_WatchContext *prev;
245
246   /**
247    * Handle to the PEERSTORE service.
248    */
249   struct GNUNET_PEERSTORE_Handle *h;
250
251   /**
252    * MQ Envelope with watch request message
253    */
254   struct GNUNET_MQ_Envelope *ev;
255
256   /**
257    * Callback with each record received
258    */
259   GNUNET_PEERSTORE_Processor callback;
260
261   /**
262    * Closure for @e callback
263    */
264   void *callback_cls;
265
266   /**
267    * Hash of the combined key
268    */
269   struct GNUNET_HashCode keyhash;
270
271   /**
272    * #GNUNET_YES / #GNUNET_NO
273    * if sent, cannot be canceled
274    */
275   int request_sent;
276
277 };
278
279 /******************************************************************************/
280 /*******************             DECLARATIONS             *********************/
281 /******************************************************************************/
282
283 /**
284  * When a response for iterate request is received
285  *
286  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
287  * @param msg message received, NULL on timeout or fatal error
288  */
289 static void
290 handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg);
291
292 /**
293  * When a watch record is received
294  *
295  * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
296  * @param msg message received, NULL on timeout or fatal error
297  */
298 static void
299 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg);
300
301 /**
302  * Close the existing connection to PEERSTORE and reconnect.
303  *
304  * @param h handle to the service
305  */
306 static void
307 reconnect (struct GNUNET_PEERSTORE_Handle *h);
308
309 /**
310  * Callback after MQ envelope is sent
311  *
312  * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
313  */
314 static void
315 watch_request_sent (void *cls)
316 {
317   struct GNUNET_PEERSTORE_WatchContext *wc = cls;
318
319   wc->request_sent = GNUNET_YES;
320   wc->ev = NULL;
321 }
322
323
324 /**
325  * Callback after MQ envelope is sent
326  *
327  * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
328  */
329 static void
330 iterate_request_sent (void *cls)
331 {
332   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
333
334   LOG (GNUNET_ERROR_TYPE_DEBUG, "Iterate request sent to service.\n");
335   ic->iterating = GNUNET_YES;
336   ic->ev = NULL;
337 }
338
339
340 /**
341  * Callback after MQ envelope is sent
342  *
343  * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
344  */
345 static void
346 store_request_sent (void *cls)
347 {
348   struct GNUNET_PEERSTORE_StoreContext *sc = cls;
349   GNUNET_PEERSTORE_Continuation cont;
350   void *cont_cls;
351
352   sc->ev = NULL;
353   cont = sc->cont;
354   cont_cls = sc->cont_cls;
355   GNUNET_PEERSTORE_store_cancel (sc);
356   if (NULL != cont)
357     cont (cont_cls, GNUNET_OK);
358 }
359
360
361 /**
362  * MQ message handlers
363  */
364 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
365   {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
366   {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
367    sizeof (struct GNUNET_MessageHeader)},
368   {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
369   GNUNET_MQ_HANDLERS_END
370 };
371
372 /******************************************************************************/
373 /*******************         CONNECTION FUNCTIONS         *********************/
374 /******************************************************************************/
375
376 static void
377 handle_client_error (void *cls, enum GNUNET_MQ_Error error)
378 {
379   struct GNUNET_PEERSTORE_Handle *h = cls;
380
381   LOG (GNUNET_ERROR_TYPE_ERROR,
382        _("Received an error notification from MQ of type: %d\n"), error);
383   reconnect (h);
384 }
385
386
387 /**
388  * Iterator over previous watches to resend them
389  */
390 static int
391 rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
392 {
393   struct GNUNET_PEERSTORE_Handle *h = cls;
394   struct GNUNET_PEERSTORE_WatchContext *wc = value;
395   struct StoreKeyHashMessage *hm;
396
397   if (GNUNET_YES == wc->request_sent)
398   {                             /* Envelope gone, create new one. */
399     wc->ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
400     hm->keyhash = wc->keyhash;
401     wc->request_sent = GNUNET_NO;
402   }
403   GNUNET_MQ_notify_sent (wc->ev, &watch_request_sent, wc);
404   GNUNET_MQ_send (h->mq, wc->ev);
405   return GNUNET_YES;
406 }
407
408
409 /**
410  * Called when the iterate request is timedout
411  *
412  * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
413  * @param tc Scheduler task context (unused)
414  */
415 static void
416 iterate_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
417 {
418   struct GNUNET_PEERSTORE_IterateContext *ic = cls;
419   GNUNET_PEERSTORE_Processor callback;
420   void *callback_cls;
421
422   ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
423   callback = ic->callback;
424   callback_cls = ic->callback_cls;
425   GNUNET_PEERSTORE_iterate_cancel (ic);
426   if (NULL != callback)
427     callback (callback_cls, NULL, _("timeout"));
428 }
429
430
431 /**
432  * Close the existing connection to PEERSTORE and reconnect.
433  *
434  * @param h handle to the service
435  */
436 static void
437 reconnect (struct GNUNET_PEERSTORE_Handle *h)
438 {
439   struct GNUNET_PEERSTORE_IterateContext *ic;
440   struct GNUNET_PEERSTORE_IterateContext *ic_tmp;
441   GNUNET_PEERSTORE_Processor icb;
442   void *icb_cls;
443   struct GNUNET_PEERSTORE_StoreContext *sc;
444
445   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
446   for (sc = h->store_head; NULL != sc; sc = sc->next)
447   {
448     if (NULL != sc->ev)
449     {
450       GNUNET_MQ_send_cancel (sc->ev);
451       sc->ev = NULL;
452     }
453   }
454   ic = h->iterate_head;
455   while (NULL != ic)
456   {
457     if (GNUNET_YES == ic->iterating)
458     {
459       icb = ic->callback;
460       icb_cls = ic->callback_cls;
461       ic->iterating = GNUNET_NO;
462       ic_tmp = ic;
463       ic = ic->next;
464       GNUNET_PEERSTORE_iterate_cancel (ic_tmp);
465       if (NULL != icb)
466         icb (icb_cls, NULL, _("Iteration canceled due to reconnection."));
467     }
468     else
469     {
470       if (GNUNET_SCHEDULER_NO_TASK != ic->timeout_task)
471       {
472         GNUNET_SCHEDULER_cancel (ic->timeout_task);
473         ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
474       }
475       if (NULL != ic->ev)
476       {
477         GNUNET_MQ_send_cancel (ic->ev);
478         ic->ev = NULL;
479       }
480       ic = ic->next;
481     }
482   }
483   if (NULL != h->mq)
484   {
485     GNUNET_MQ_destroy (h->mq);
486     h->mq = NULL;
487   }
488   if (NULL != h->client)
489   {
490     GNUNET_CLIENT_disconnect (h->client);
491     h->client = NULL;
492   }
493   h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
494   GNUNET_assert (NULL != h->client);
495   h->mq =
496       GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
497                                              &handle_client_error, h);
498   LOG (GNUNET_ERROR_TYPE_DEBUG,
499        "Resending pending requests after reconnect.\n");
500   if (NULL != h->watches)
501     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
502   for (ic = h->iterate_head; NULL != ic; ic = ic->next)
503   {
504     ic->ev =
505         PEERSTORE_create_record_mq_envelope (ic->sub_system, &ic->peer, ic->key,
506                                              NULL, 0, NULL, 0,
507                                              GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
508     GNUNET_MQ_notify_sent (ic->ev, &iterate_request_sent, ic);
509     GNUNET_MQ_send (h->mq, ic->ev);
510     ic->timeout_task =
511         GNUNET_SCHEDULER_add_delayed (ic->timeout, &iterate_timeout, ic);
512   }
513   for (sc = h->store_head; NULL != sc; sc = sc->next)
514   {
515     sc->ev =
516         PEERSTORE_create_record_mq_envelope (sc->sub_system, &sc->peer, sc->key,
517                                              sc->value, sc->size, &sc->expiry,
518                                              sc->options,
519                                              GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
520     GNUNET_MQ_notify_sent (sc->ev, &store_request_sent, sc);
521     GNUNET_MQ_send (h->mq, sc->ev);
522   }
523 }
524
525
526 /**
527  * Iterator over watch requests to cancel them.
528  *
529  * @param cls unsused
530  * @param key key to the watch request
531  * @param value watch context
532  * @return #GNUNET_YES to continue iteration
533  */
534 static int
535 destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
536 {
537   struct GNUNET_PEERSTORE_WatchContext *wc = value;
538
539   GNUNET_PEERSTORE_watch_cancel (wc);
540   return GNUNET_YES;
541 }
542
543
544 /**
545  * Kill the connection to the service. This can be delayed in case of pending
546  * STORE requests and the user explicitly asked to sync first. Otherwise it is
547  * performed instantly.
548  *
549  * @param h Handle to the service.
550  */
551 static void
552 do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
553 {
554   if (NULL != h->mq)
555   {
556     GNUNET_MQ_destroy (h->mq);
557     h->mq = NULL;
558   }
559   if (NULL != h->client)
560   {
561     GNUNET_CLIENT_disconnect (h->client);
562     h->client = NULL;
563   }
564   GNUNET_free (h);
565 }
566
567
568 /**
569  * Connect to the PEERSTORE service.
570  *
571  * @param cfg configuration to use
572  * @return NULL on error
573  */
574 struct GNUNET_PEERSTORE_Handle *
575 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
576 {
577   struct GNUNET_PEERSTORE_Handle *h;
578
579   h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
580
581   h->client = GNUNET_CLIENT_connect ("peerstore", cfg);
582   if (NULL == h->client)
583   {
584     GNUNET_free (h);
585     return NULL;
586   }
587   h->cfg = cfg;
588   h->disconnecting = GNUNET_NO;
589   h->mq =
590       GNUNET_MQ_queue_for_connection_client (h->client, mq_handlers,
591                                              &handle_client_error, h);
592   if (NULL == h->mq)
593   {
594     GNUNET_CLIENT_disconnect (h->client);
595     GNUNET_free (h);
596     return NULL;
597   }
598   LOG (GNUNET_ERROR_TYPE_DEBUG, "New connection created\n");
599   return h;
600 }
601
602
603 /**
604  * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
605  * will be canceled.
606  * Any pending STORE requests will depend on @e snyc_first flag.
607  *
608  * @param h handle to disconnect
609  * @param sync_first send any pending STORE requests before disconnecting
610  */
611 void
612 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first)
613 {
614   struct GNUNET_PEERSTORE_IterateContext *ic;
615   struct GNUNET_PEERSTORE_IterateContext *ic_iter;
616   struct GNUNET_PEERSTORE_StoreContext *sc;
617   struct GNUNET_PEERSTORE_StoreContext *sc_iter;
618
619   LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
620   if (NULL != h->watches)
621   {
622     GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
623     GNUNET_CONTAINER_multihashmap_destroy (h->watches);
624     h->watches = NULL;
625   }
626   ic_iter = h->iterate_head;
627   while (NULL != ic_iter)
628   {
629     GNUNET_break (0);
630     ic = ic_iter;
631     ic_iter = ic_iter->next;
632     GNUNET_PEERSTORE_iterate_cancel (ic);
633   }
634   if (NULL != h->store_head)
635   {
636     if (GNUNET_YES == sync_first)
637     {
638       LOG (GNUNET_ERROR_TYPE_DEBUG,
639            "Delaying disconnection due to pending store requests.\n");
640       h->disconnecting = GNUNET_YES;
641       return;
642     }
643     sc_iter = h->store_head;
644     while (NULL != sc_iter)
645     {
646       sc = sc_iter;
647       sc_iter = sc_iter->next;
648       GNUNET_PEERSTORE_store_cancel (sc);
649     }
650   }
651   do_disconnect (h);
652 }
653
654
655 /******************************************************************************/
656 /*******************            STORE FUNCTIONS           *********************/
657 /******************************************************************************/
658
659
660 /**
661  * Cancel a store request
662  *
663  * @param sc Store request context
664  */
665 void
666 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
667 {
668   struct GNUNET_PEERSTORE_Handle *h = sc->h;
669
670   if (NULL != sc->ev)
671   {
672     GNUNET_MQ_send_cancel (sc->ev);
673     sc->ev = NULL;
674   }
675   GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
676   GNUNET_free (sc->sub_system);
677   GNUNET_free (sc->value);
678   GNUNET_free (sc->key);
679   GNUNET_free (sc);
680   if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
681     do_disconnect (h);
682 }
683
684
685 /**
686  * Store a new entry in the PEERSTORE.
687  * Note that stored entries can be lost in some cases
688  * such as power failure.
689  *
690  * @param h Handle to the PEERSTORE service
691  * @param sub_system name of the sub system
692  * @param peer Peer Identity
693  * @param key entry key
694  * @param value entry value BLOB
695  * @param size size of @e value
696  * @param expiry absolute time after which the entry is (possibly) deleted
697  * @param options options specific to the storage operation
698  * @param cont Continuation function after the store request is sent
699  * @param cont_cls Closure for @a cont
700  */
701 struct GNUNET_PEERSTORE_StoreContext *
702 GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
703                         const char *sub_system,
704                         const struct GNUNET_PeerIdentity *peer, const char *key,
705                         const void *value, size_t size,
706                         struct GNUNET_TIME_Absolute expiry,
707                         enum GNUNET_PEERSTORE_StoreOption options,
708                         GNUNET_PEERSTORE_Continuation cont, void *cont_cls)
709 {
710   struct GNUNET_MQ_Envelope *ev;
711   struct GNUNET_PEERSTORE_StoreContext *sc;
712
713   LOG (GNUNET_ERROR_TYPE_DEBUG,
714        "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
715        size, sub_system, GNUNET_i2s (peer), key);
716   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size,
717                                             &expiry, options,
718                                             GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
719   sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
720
721   sc->sub_system = GNUNET_strdup (sub_system);
722   sc->peer = *peer;
723   sc->key = GNUNET_strdup (key);
724   sc->value = GNUNET_memdup (value, size);
725   sc->size = size;
726   sc->expiry = expiry;
727   sc->options = options;
728   sc->cont = cont;
729   sc->cont_cls = cont_cls;
730   sc->h = h;
731
732   GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
733   GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
734   GNUNET_MQ_send (h->mq, ev);
735   return sc;
736
737 }
738
739
740 /******************************************************************************/
741 /*******************           ITERATE FUNCTIONS          *********************/
742 /******************************************************************************/
743
744 /**
745  * When a response for iterate request is received
746  *
747  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
748  * @param msg message received, NULL on timeout or fatal error
749  */
750 static void
751 handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg)
752 {
753   struct GNUNET_PEERSTORE_Handle *h = cls;
754   struct GNUNET_PEERSTORE_IterateContext *ic;
755   GNUNET_PEERSTORE_Processor callback;
756   void *callback_cls;
757   uint16_t msg_type;
758   struct GNUNET_PEERSTORE_Record *record;
759   int continue_iter;
760
761   ic = h->iterate_head;
762   if (NULL == ic)
763   {
764     LOG (GNUNET_ERROR_TYPE_ERROR,
765          _("Unexpected iteration response, this should not happen.\n"));
766     reconnect (h);
767     return;
768   }
769   callback = ic->callback;
770   callback_cls = ic->callback_cls;
771   if (NULL == msg)              /* Connection error */
772   {
773     if (NULL != callback)
774       callback (callback_cls, NULL,
775                 _("Error communicating with `PEERSTORE' service."));
776     reconnect (h);
777     return;
778   }
779   msg_type = ntohs (msg->type);
780   if (GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
781   {
782     ic->iterating = GNUNET_NO;
783     GNUNET_PEERSTORE_iterate_cancel (ic);
784     if (NULL != callback)
785       callback (callback_cls, NULL, NULL);
786     return;
787   }
788   if (NULL != callback)
789   {
790     record = PEERSTORE_parse_record_message (msg);
791     if (NULL == record)
792       continue_iter =
793           callback (callback_cls, NULL,
794                     _("Received a malformed response from service."));
795     else
796     {
797       continue_iter = callback (callback_cls, record, NULL);
798       PEERSTORE_destroy_record (record);
799     }
800     if (GNUNET_NO == continue_iter)
801       ic->callback = NULL;
802   }
803 }
804
805
806 /**
807  * Cancel an iterate request
808  * Please do not call after the iterate request is done
809  *
810  * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
811  */
812 void
813 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
814 {
815   if (GNUNET_SCHEDULER_NO_TASK != ic->timeout_task)
816   {
817     GNUNET_SCHEDULER_cancel (ic->timeout_task);
818     ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
819   }
820   if (GNUNET_NO == ic->iterating)
821   {
822     if (NULL != ic->ev)
823     {
824       GNUNET_MQ_send_cancel (ic->ev);
825       ic->ev = NULL;
826     }
827     GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
828     GNUNET_free (ic->sub_system);
829     if (NULL != ic->key)
830       GNUNET_free (ic->key);
831     GNUNET_free (ic);
832   }
833   else
834     ic->callback = NULL;
835 }
836
837
838 /**
839  * Iterate over records matching supplied key information
840  *
841  * @param h handle to the PEERSTORE service
842  * @param sub_system name of sub system
843  * @param peer Peer identity (can be NULL)
844  * @param key entry key string (can be NULL)
845  * @param timeout time after which the iterate request is canceled
846  * @param callback function called with each matching record, all NULL's on end
847  * @param callback_cls closure for @a callback
848  * @return Handle to iteration request
849  */
850 struct GNUNET_PEERSTORE_IterateContext *
851 GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
852                           const char *sub_system,
853                           const struct GNUNET_PeerIdentity *peer,
854                           const char *key, struct GNUNET_TIME_Relative timeout,
855                           GNUNET_PEERSTORE_Processor callback,
856                           void *callback_cls)
857 {
858   struct GNUNET_MQ_Envelope *ev;
859   struct GNUNET_PEERSTORE_IterateContext *ic;
860
861   ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, NULL, 0,
862                                             NULL, 0,
863                                             GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
864   ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
865
866   ic->callback = callback;
867   ic->callback_cls = callback_cls;
868   ic->ev = ev;
869   ic->h = h;
870   ic->sub_system = GNUNET_strdup (sub_system);
871   if (NULL != peer)
872     ic->peer = *peer;
873   if (NULL != key)
874     ic->key = GNUNET_strdup (key);
875   ic->timeout = timeout;
876   ic->iterating = GNUNET_NO;
877   GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, h->iterate_tail, ic);
878   LOG (GNUNET_ERROR_TYPE_DEBUG,
879        "Sending an iterate request for sub system `%s'\n", sub_system);
880   GNUNET_MQ_notify_sent (ev, &iterate_request_sent, ic);
881   GNUNET_MQ_send (h->mq, ev);
882   ic->timeout_task =
883       GNUNET_SCHEDULER_add_delayed (timeout, &iterate_timeout, ic);
884   return ic;
885 }
886
887
888 /******************************************************************************/
889 /*******************            WATCH FUNCTIONS           *********************/
890 /******************************************************************************/
891
892 /**
893  * When a watch record is received
894  *
895  * @param cls a `struct GNUNET_PEERSTORE_Handle *`
896  * @param msg message received, NULL on timeout or fatal error
897  */
898 static void
899 handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
900 {
901   struct GNUNET_PEERSTORE_Handle *h = cls;
902   struct GNUNET_PEERSTORE_Record *record;
903   struct GNUNET_HashCode keyhash;
904   struct GNUNET_PEERSTORE_WatchContext *wc;
905
906   if (NULL == msg)
907   {
908     LOG (GNUNET_ERROR_TYPE_ERROR,
909          _
910          ("Problem receiving a watch response, no way to determine which request.\n"));
911     reconnect (h);
912     return;
913   }
914   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
915   record = PEERSTORE_parse_record_message (msg);
916   PEERSTORE_hash_key (record->sub_system, record->peer, record->key, &keyhash);
917   wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
918   if (NULL == wc)
919   {
920     LOG (GNUNET_ERROR_TYPE_ERROR,
921          _("Received a watch result for a non existing watch.\n"));
922     PEERSTORE_destroy_record (record);
923     reconnect (h);
924     return;
925   }
926   if (NULL != wc->callback)
927     wc->callback (wc->callback_cls, record, NULL);
928   PEERSTORE_destroy_record (record);
929 }
930
931
932 /**
933  * Cancel a watch request
934  *
935  * @param wc handle to the watch request
936  */
937 void
938 GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
939 {
940   struct GNUNET_PEERSTORE_Handle *h = wc->h;
941   struct GNUNET_MQ_Envelope *ev;
942   struct StoreKeyHashMessage *hm;
943
944   LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
945   if (GNUNET_YES == wc->request_sent)   /* If request already sent to service, send a cancel request. */
946   {
947     ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
948     hm->keyhash = wc->keyhash;
949     GNUNET_MQ_send (h->mq, ev);
950     wc->callback = NULL;
951     wc->callback_cls = NULL;
952   }
953   if (NULL != wc->ev)
954   {
955     GNUNET_MQ_send_cancel (wc->ev);
956     wc->ev = NULL;
957   }
958   GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc);
959   GNUNET_free (wc);
960
961 }
962
963
964 /**
965  * Request watching a given key
966  * User will be notified with any new values added to key
967  *
968  * @param h handle to the PEERSTORE service
969  * @param sub_system name of sub system
970  * @param peer Peer identity
971  * @param key entry key string
972  * @param callback function called with each new value
973  * @param callback_cls closure for @a callback
974  * @return Handle to watch request
975  */
976 struct GNUNET_PEERSTORE_WatchContext *
977 GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
978                         const char *sub_system,
979                         const struct GNUNET_PeerIdentity *peer, const char *key,
980                         GNUNET_PEERSTORE_Processor callback, void *callback_cls)
981 {
982   struct GNUNET_MQ_Envelope *ev;
983   struct StoreKeyHashMessage *hm;
984   struct GNUNET_PEERSTORE_WatchContext *wc;
985
986   GNUNET_assert (NULL != sub_system);
987   GNUNET_assert (NULL != peer);
988   GNUNET_assert (NULL != key);
989   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
990   PEERSTORE_hash_key (sub_system, peer, key, &hm->keyhash);
991   wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
992
993   wc->callback = callback;
994   wc->callback_cls = callback_cls;
995   wc->ev = ev;
996   wc->h = h;
997   wc->request_sent = GNUNET_NO;
998   wc->keyhash = hm->keyhash;
999   if (NULL == h->watches)
1000     h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
1001   GNUNET_CONTAINER_multihashmap_put (h->watches, &wc->keyhash, wc,
1002                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
1003   LOG (GNUNET_ERROR_TYPE_DEBUG,
1004        "Sending a watch request for ss `%s', peer `%s', key `%s'.\n",
1005        sub_system, GNUNET_i2s (peer), key);
1006   GNUNET_MQ_notify_sent (ev, &watch_request_sent, wc);
1007   GNUNET_MQ_send (h->mq, ev);
1008   return wc;
1009 }
1010
1011 /* end of peerstore_api.c */