+/******************************************************************************/
+/******************* WATCH FUNCTIONS *********************/
+/******************************************************************************/
+
+/**
+ * When a watch record is received
+ *
+ * @param cls a `struct GNUNET_PEERSTORE_Handle *`
+ * @param msg message received, NULL on timeout or fatal error
+ */
+static void
+handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_PEERSTORE_Handle *h = cls;
+ struct GNUNET_PEERSTORE_Record *record;
+ struct GNUNET_HashCode keyhash;
+ struct GNUNET_PEERSTORE_WatchContext *wc;
+
+ if (NULL == msg)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ _
+ ("Problem receiving a watch response, no way to determine which request.\n"));
+ reconnect (h);
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
+ record = PEERSTORE_parse_record_message (msg);
+ PEERSTORE_hash_key (record->sub_system, record->peer, record->key, &keyhash);
+ // FIXME: what if there are multiple watches for the same key?
+ wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
+ if (NULL == wc)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ _("Received a watch result for a non existing watch.\n"));
+ PEERSTORE_destroy_record (record);
+ reconnect (h);
+ return;
+ }
+ if (NULL != wc->callback)
+ wc->callback (wc->callback_cls, record, NULL);
+ PEERSTORE_destroy_record (record);
+}
+
+
+/**
+ * Cancel a watch request
+ *
+ * @param wc handle to the watch request
+ */
+void
+GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
+{
+ struct GNUNET_PEERSTORE_Handle *h = wc->h;
+ struct GNUNET_MQ_Envelope *ev;
+ struct StoreKeyHashMessage *hm;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Canceling watch.\n");
+ ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
+ hm->keyhash = wc->keyhash;
+ GNUNET_MQ_send (h->mq, ev);
+ GNUNET_CONTAINER_multihashmap_remove (h->watches,
+ &wc->keyhash,
+ wc);
+ GNUNET_free (wc);