2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
5 * GNUnet is free software: you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published
7 * by the Free Software Foundation, either version 3 of the License,
8 * or (at your option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
17 * @file psycstore/psycstore_api.c
18 * @brief API to interact with the PSYCstore service
19 * @author Gabor X Toth
20 * @author Christian Grothoff
26 #include "gnunet_util_lib.h"
27 #include "gnunet_constants.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_psycstore_service.h"
30 #include "gnunet_multicast_service.h"
31 #include "psycstore.h"
33 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__)
36 * Handle for an operation with the PSYCstore service.
38 struct GNUNET_PSYCSTORE_OperationHandle
42 * Main PSYCstore handle.
44 struct GNUNET_PSYCSTORE_Handle *h;
50 GNUNET_PSYCSTORE_FragmentCallback fragment_cb;
51 GNUNET_PSYCSTORE_CountersCallback counters_cb;
52 GNUNET_PSYCSTORE_StateCallback state_cb;
56 * Closure for callbacks.
63 struct GNUNET_MQ_Envelope *env;
73 * Handle for the service.
75 struct GNUNET_PSYCSTORE_Handle
78 * Configuration to use.
80 const struct GNUNET_CONFIGURATION_Handle *cfg;
85 struct GNUNET_MQ_Handle *mq;
90 struct GNUNET_OP_Handle *op;
93 * Task doing exponential back-off trying to reconnect.
95 struct GNUNET_SCHEDULER_Task *reconnect_task;
98 * Delay for next connect retry.
100 struct GNUNET_TIME_Relative reconnect_delay;
103 GNUNET_PSYCSTORE_FragmentCallback *fragment_cb;
105 GNUNET_PSYCSTORE_CountersCallback *counters_cb;
107 GNUNET_PSYCSTORE_StateCallback *state_cb;
109 * Closure for callbacks.
116 check_result_code (void *cls, const struct OperationResult *opres)
118 uint16_t size = ntohs (opres->header.size);
119 const char *str = (const char *) &opres[1];
120 if ( (sizeof (*opres) < size) &&
121 ('\0' != str[size - sizeof (*opres) - 1]) )
124 return GNUNET_SYSERR;
132 handle_result_code (void *cls, const struct OperationResult *opres)
134 struct GNUNET_PSYCSTORE_Handle *h = cls;
135 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
136 uint16_t size = ntohs (opres->header.size);
139 str = (sizeof (*opres) < size) ? (const char *) &opres[1] : "";
141 if (GNUNET_YES == GNUNET_OP_result (h->op, GNUNET_ntohll (opres->op_id),
142 GNUNET_ntohll (opres->result_code) + INT64_MIN,
143 str, size - sizeof (*opres), (void **) &op))
145 LOG (GNUNET_ERROR_TYPE_DEBUG,
146 "handle_result_code: Received result message with OP ID: %" PRIu64 "\n",
147 GNUNET_ntohll (opres->op_id));
152 LOG (GNUNET_ERROR_TYPE_DEBUG,
153 "handle_result_code: No callback registered for OP ID %" PRIu64 ".\n",
154 GNUNET_ntohll (opres->op_id));
156 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
161 handle_result_counters (void *cls, const struct CountersResult *cres)
163 struct GNUNET_PSYCSTORE_Handle *h = cls;
164 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
166 if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (cres->op_id),
167 NULL, NULL, (void **) &op))
169 GNUNET_assert (NULL != op);
170 if (NULL != op->counters_cb)
172 op->counters_cb (op->cls,
173 ntohl (cres->result_code),
174 GNUNET_ntohll (cres->max_fragment_id),
175 GNUNET_ntohll (cres->max_message_id),
176 GNUNET_ntohll (cres->max_group_generation),
177 GNUNET_ntohll (cres->max_state_message_id));
179 GNUNET_OP_remove (h->op, GNUNET_ntohll (cres->op_id));
184 LOG (GNUNET_ERROR_TYPE_DEBUG,
185 "handle_result_counters: No callback registered for OP ID %" PRIu64 ".\n",
186 GNUNET_ntohll (cres->op_id));
188 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
193 check_result_fragment (void *cls, const struct FragmentResult *fres)
195 uint16_t size = ntohs (fres->header.size);
196 struct GNUNET_MULTICAST_MessageHeader *mmsg =
197 (struct GNUNET_MULTICAST_MessageHeader *) &fres[1];
198 if (sizeof (*fres) + sizeof (*mmsg) < size
199 && sizeof (*fres) + ntohs (mmsg->header.size) != size)
201 LOG (GNUNET_ERROR_TYPE_ERROR,
202 "check_result_fragment: Received message with invalid length %lu bytes.\n",
203 size, sizeof (*fres));
205 return GNUNET_SYSERR;
212 handle_result_fragment (void *cls, const struct FragmentResult *fres)
214 struct GNUNET_PSYCSTORE_Handle *h = cls;
215 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
217 if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (fres->op_id),
218 NULL, NULL, (void **) &op))
220 GNUNET_assert (NULL != op);
221 if (NULL != op->fragment_cb)
222 op->fragment_cb (op->cls,
223 (struct GNUNET_MULTICAST_MessageHeader *) &fres[1],
224 ntohl (fres->psycstore_flags));
225 //GNUNET_OP_remove (h->op, GNUNET_ntohll (fres->op_id));
230 LOG (GNUNET_ERROR_TYPE_DEBUG,
231 "handle_result_fragment: No callback registered for OP ID %" PRIu64 ".\n",
232 GNUNET_ntohll (fres->op_id));
234 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
239 check_result_state (void *cls, const struct StateResult *sres)
241 const char *name = (const char *) &sres[1];
242 uint16_t size = ntohs (sres->header.size);
243 uint16_t name_size = ntohs (sres->name_size);
246 || size - sizeof (*sres) < name_size
247 || '\0' != name[name_size - 1])
249 LOG (GNUNET_ERROR_TYPE_ERROR,
250 "check_result_state: Received state result message with invalid name.\n");
252 return GNUNET_SYSERR;
259 handle_result_state (void *cls, const struct StateResult *sres)
261 struct GNUNET_PSYCSTORE_Handle *h = cls;
262 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
264 const char *name = (const char *) &sres[1];
265 uint16_t name_size = ntohs (sres->name_size);
267 if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (sres->op_id),
268 NULL, NULL, (void **) &op))
270 GNUNET_assert (NULL != op);
271 if (NULL != op->state_cb)
272 op->state_cb (op->cls, name, (char *) &sres[1] + name_size,
273 ntohs (sres->header.size) - sizeof (*sres) - name_size);
274 //GNUNET_OP_remove (h->op, GNUNET_ntohll (sres->op_id));
279 LOG (GNUNET_ERROR_TYPE_DEBUG,
280 "handle_result_state: No callback registered for OP ID %" PRIu64 ".\n",
281 GNUNET_ntohll (sres->op_id));
283 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
288 reconnect (void *cls);
292 * Client disconnected from service.
294 * Reconnect after backoff period.=
297 disconnected (void *cls, enum GNUNET_MQ_Error error)
299 struct GNUNET_PSYCSTORE_Handle *h = cls;
301 LOG (GNUNET_ERROR_TYPE_DEBUG,
302 "Origin client disconnected (%d), re-connecting\n",
306 GNUNET_MQ_destroy (h->mq);
307 GNUNET_OP_destroy (h->op);
312 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
314 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
319 do_connect (struct GNUNET_PSYCSTORE_Handle *h)
321 LOG (GNUNET_ERROR_TYPE_DEBUG,
322 "Connecting to PSYCstore service.\n");
324 struct GNUNET_MQ_MessageHandler handlers[] = {
325 GNUNET_MQ_hd_var_size (result_code,
326 GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE,
327 struct OperationResult,
329 GNUNET_MQ_hd_fixed_size (result_counters,
330 GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS,
331 struct CountersResult,
333 GNUNET_MQ_hd_var_size (result_fragment,
334 GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT,
335 struct FragmentResult,
337 GNUNET_MQ_hd_var_size (result_state,
338 GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE,
341 GNUNET_MQ_handler_end ()
344 h->op = GNUNET_OP_create ();
345 GNUNET_assert (NULL == h->mq);
346 h->mq = GNUNET_CLIENT_connect (h->cfg, "psycstore",
347 handlers, disconnected, h);
348 GNUNET_assert (NULL != h->mq);
353 * Try again to connect to the PSYCstore service.
355 * @param cls Handle to the PSYCstore service.
358 reconnect (void *cls)
360 struct GNUNET_PSYCSTORE_Handle *h = cls;
362 h->reconnect_task = NULL;
368 * Connect to the PSYCstore service.
370 * @param cfg The configuration to use
371 * @return Handle to use
373 struct GNUNET_PSYCSTORE_Handle *
374 GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
376 struct GNUNET_PSYCSTORE_Handle *h
377 = GNUNET_new (struct GNUNET_PSYCSTORE_Handle);
379 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
386 * Disconnect from PSYCstore service
388 * @param h Handle to destroy
391 GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h)
393 GNUNET_assert (NULL != h);
394 if (h->reconnect_task != NULL)
396 GNUNET_SCHEDULER_cancel (h->reconnect_task);
397 h->reconnect_task = NULL;
401 // FIXME: free data structures for pending operations
402 GNUNET_MQ_destroy (h->mq);
410 * Message sent notification.
412 * Remove invalidated envelope pointer.
415 message_sent (void *cls)
417 struct GNUNET_PSYCSTORE_OperationHandle *op = cls;
423 * Create a new operation.
425 static struct GNUNET_PSYCSTORE_OperationHandle *
426 op_create (struct GNUNET_PSYCSTORE_Handle *h,
427 struct GNUNET_OP_Handle *hop,
428 GNUNET_PSYCSTORE_ResultCallback result_cb,
431 struct GNUNET_PSYCSTORE_OperationHandle *
432 op = GNUNET_malloc (sizeof (*op));
434 op->op_id = GNUNET_OP_add (hop,
435 (GNUNET_ResultCallback) result_cb,
442 * Send a message associated with an operation.
449 * Message envelope to send.
451 * Operation ID to write in network byte order. NULL if not needed.
453 * @return Operation handle.
456 static struct GNUNET_PSYCSTORE_OperationHandle *
457 op_send (struct GNUNET_PSYCSTORE_Handle *h,
458 struct GNUNET_PSYCSTORE_OperationHandle *op,
459 struct GNUNET_MQ_Envelope *env,
464 *op_id = GNUNET_htonll (op->op_id);
466 GNUNET_MQ_notify_sent (env, message_sent, op);
467 GNUNET_MQ_send (h->mq, env);
473 * Cancel a PSYCstore operation. Note that the operation MAY still
474 * be executed; this merely cancels the continuation; if the request
475 * was already transmitted, the service may still choose to complete
478 * @param op Operation to cancel.
480 * @return #GNUNET_YES if message was not sent yet and got discarded,
481 * #GNUNET_NO if it was already sent, and only the callbacks got cancelled.
484 GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op)
486 struct GNUNET_PSYCSTORE_Handle *h = op->h;
491 GNUNET_MQ_send_cancel (op->env);
495 GNUNET_OP_remove (h->op, op->op_id);
503 * Store join/leave events for a PSYC channel in order to be able to answer
504 * membership test queries later.
507 * Handle for the PSYCstore.
509 * The channel where the event happened.
511 * Public key of joining/leaving slave.
513 * #GNUNET_YES on join, #GNUNET_NO on part.
514 * @param announced_at
515 * ID of the message that announced the membership change.
516 * @param effective_since
517 * Message ID this membership change is in effect since.
518 * For joins it is <= announced_at, for parts it is always 0.
519 * @param group_generation
520 * In case of a part, the last group generation the slave has access to.
521 * It has relevance when a larger message have fragments with different
524 * Callback to call with the result of the storage operation.
526 * Closure for the callback.
528 * @return Operation handle that can be used to cancel the operation.
530 struct GNUNET_PSYCSTORE_OperationHandle *
531 GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
532 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
533 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
535 uint64_t announced_at,
536 uint64_t effective_since,
537 uint64_t group_generation,
538 GNUNET_PSYCSTORE_ResultCallback result_cb,
541 GNUNET_assert (NULL != h);
542 GNUNET_assert (NULL != channel_key);
543 GNUNET_assert (NULL != slave_key);
544 GNUNET_assert (GNUNET_YES == did_join || GNUNET_NO == did_join);
545 GNUNET_assert (did_join
546 ? effective_since <= announced_at
547 : effective_since == 0);
549 struct MembershipStoreRequest *req;
550 struct GNUNET_MQ_Envelope *
551 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE);
552 req->channel_key = *channel_key;
553 req->slave_key = *slave_key;
554 req->did_join = did_join;
555 req->announced_at = GNUNET_htonll (announced_at);
556 req->effective_since = GNUNET_htonll (effective_since);
557 req->group_generation = GNUNET_htonll (group_generation);
560 op_send (h, op_create (h, h->op, result_cb, cls),
566 * Test if a member was admitted to the channel at the given message ID.
568 * This is useful when relaying and replaying messages to check if a particular
569 * slave has access to the message fragment with a given group generation. It
570 * is also used when handling join requests to determine whether the slave is
571 * currently admitted to the channel.
574 * Handle for the PSYCstore.
576 * The channel we are interested in.
578 * Public key of slave whose membership to check.
580 * Message ID for which to do the membership test.
581 * @param group_generation
582 * Group generation of the fragment of the message to test.
583 * It has relevance if the message consists of multiple fragments with
584 * different group generations.
586 * Callback to call with the test result.
588 * Closure for the callback.
590 * @return Operation handle that can be used to cancel the operation.
592 struct GNUNET_PSYCSTORE_OperationHandle *
593 GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
594 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
595 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
597 uint64_t group_generation,
598 GNUNET_PSYCSTORE_ResultCallback result_cb,
601 struct MembershipTestRequest *req;
602 struct GNUNET_MQ_Envelope *
603 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST);
604 req->channel_key = *channel_key;
605 req->slave_key = *slave_key;
606 req->message_id = GNUNET_htonll (message_id);
607 req->group_generation = GNUNET_htonll (group_generation);
610 op_send (h, op_create (h, h->op, result_cb, cls),
616 * Store a message fragment sent to a channel.
618 * @param h Handle for the PSYCstore.
619 * @param channel_key The channel the message belongs to.
620 * @param message Message to store.
621 * @param psycstore_flags Flags indicating whether the PSYC message contains
623 * @param result_cb Callback to call with the result of the operation.
624 * @param cls Closure for the callback.
626 * @return Handle that can be used to cancel the operation.
628 struct GNUNET_PSYCSTORE_OperationHandle *
629 GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
630 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
631 const struct GNUNET_MULTICAST_MessageHeader *msg,
632 enum GNUNET_PSYCSTORE_MessageFlags psycstore_flags,
633 GNUNET_PSYCSTORE_ResultCallback result_cb,
636 uint16_t size = ntohs (msg->header.size);
637 struct FragmentStoreRequest *req;
638 struct GNUNET_MQ_Envelope *
639 env = GNUNET_MQ_msg_extra (req, size,
640 GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE);
641 req->channel_key = *channel_key;
642 req->psycstore_flags = htonl (psycstore_flags);
643 GNUNET_memcpy (&req[1], msg, size);
646 op_send (h, op_create (h, h->op, result_cb, cls),
652 * Retrieve message fragments by fragment ID range.
655 * Handle for the PSYCstore.
657 * The channel we are interested in.
659 * The slave requesting the fragment. If not NULL, a membership test is
660 * performed first and the fragment is only returned if the slave has
662 * @param first_fragment_id
663 * First fragment ID to retrieve.
664 * Use 0 to get the latest message fragment.
665 * @param last_fragment_id
666 * Last consecutive fragment ID to retrieve.
667 * Use 0 to get the latest message fragment.
668 * @param fragment_limit
669 * Maximum number of fragments to retrieve.
671 * Callback to call with the retrieved fragments.
673 * Callback to call with the result of the operation.
675 * Closure for the callbacks.
677 * @return Handle that can be used to cancel the operation.
679 struct GNUNET_PSYCSTORE_OperationHandle *
680 GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
681 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
682 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
683 uint64_t first_fragment_id,
684 uint64_t last_fragment_id,
685 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
686 GNUNET_PSYCSTORE_ResultCallback result_cb,
689 struct FragmentGetRequest *req;
690 struct GNUNET_MQ_Envelope *
691 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
692 req->channel_key = *channel_key;
693 req->first_fragment_id = GNUNET_htonll (first_fragment_id);
694 req->last_fragment_id = GNUNET_htonll (last_fragment_id);
695 if (NULL != slave_key)
697 req->slave_key = *slave_key;
698 req->do_membership_test = GNUNET_YES;
701 struct GNUNET_PSYCSTORE_OperationHandle *
702 op = op_create (h, h->op, result_cb, cls);
703 op->fragment_cb = fragment_cb;
705 return op_send (h, op, env, &req->op_id);
710 * Retrieve latest message fragments.
713 * Handle for the PSYCstore.
715 * The channel we are interested in.
717 * The slave requesting the fragment. If not NULL, a membership test is
718 * performed first and the fragment is only returned if the slave has
720 * @param first_fragment_id
721 * First fragment ID to retrieve.
722 * Use 0 to get the latest message fragment.
723 * @param last_fragment_id
724 * Last consecutive fragment ID to retrieve.
725 * Use 0 to get the latest message fragment.
726 * @param fragment_limit
727 * Maximum number of fragments to retrieve.
729 * Callback to call with the retrieved fragments.
731 * Callback to call with the result of the operation.
733 * Closure for the callbacks.
735 * @return Handle that can be used to cancel the operation.
737 struct GNUNET_PSYCSTORE_OperationHandle *
738 GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
739 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
740 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
741 uint64_t fragment_limit,
742 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
743 GNUNET_PSYCSTORE_ResultCallback result_cb,
746 struct FragmentGetRequest *req;
747 struct GNUNET_MQ_Envelope *
748 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
749 req->channel_key = *channel_key;
750 req->fragment_limit = GNUNET_ntohll (fragment_limit);
751 if (NULL != slave_key)
753 req->slave_key = *slave_key;
754 req->do_membership_test = GNUNET_YES;
757 struct GNUNET_PSYCSTORE_OperationHandle *
758 op = op_create (h, h->op, result_cb, cls);
759 op->fragment_cb = fragment_cb;
761 return op_send (h, op, env, &req->op_id);
766 * Retrieve all fragments of messages in a message ID range.
769 * Handle for the PSYCstore.
771 * The channel we are interested in.
773 * The slave requesting the message.
774 * If not NULL, a membership test is performed first
775 * and the message is only returned if the slave has access to it.
776 * @param first_message_id
777 * First message ID to retrieve.
778 * @param last_message_id
779 * Last consecutive message ID to retrieve.
780 * @param fragment_limit
781 * Maximum number of fragments to retrieve.
782 * @param method_prefix
783 * Retrieve only messages with a matching method prefix.
784 * @todo Implement method_prefix query.
786 * Callback to call with the retrieved fragments.
788 * Callback to call with the result of the operation.
790 * Closure for the callbacks.
792 * @return Handle that can be used to cancel the operation.
794 struct GNUNET_PSYCSTORE_OperationHandle *
795 GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
796 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
797 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
798 uint64_t first_message_id,
799 uint64_t last_message_id,
800 uint64_t fragment_limit,
801 const char *method_prefix,
802 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
803 GNUNET_PSYCSTORE_ResultCallback result_cb,
806 struct MessageGetRequest *req;
807 if (NULL == method_prefix)
809 uint16_t method_size = strnlen (method_prefix,
810 GNUNET_MAX_MESSAGE_SIZE
811 - sizeof (*req)) + 1;
813 struct GNUNET_MQ_Envelope *
814 env = GNUNET_MQ_msg_extra (req, method_size,
815 GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
816 req->channel_key = *channel_key;
817 req->first_message_id = GNUNET_htonll (first_message_id);
818 req->last_message_id = GNUNET_htonll (last_message_id);
819 req->fragment_limit = GNUNET_htonll (fragment_limit);
820 if (NULL != slave_key)
822 req->slave_key = *slave_key;
823 req->do_membership_test = GNUNET_YES;
825 GNUNET_memcpy (&req[1], method_prefix, method_size);
826 ((char *) &req[1])[method_size - 1] = '\0';
828 struct GNUNET_PSYCSTORE_OperationHandle *
829 op = op_create (h, h->op, result_cb, cls);
830 op->fragment_cb = fragment_cb;
832 return op_send (h, op, env, &req->op_id);
837 * Retrieve all fragments of the latest messages.
840 * Handle for the PSYCstore.
842 * The channel we are interested in.
844 * The slave requesting the message.
845 * If not NULL, a membership test is performed first
846 * and the message is only returned if the slave has access to it.
847 * @param message_limit
848 * Maximum number of messages to retrieve.
849 * @param method_prefix
850 * Retrieve only messages with a matching method prefix.
851 * @todo Implement method_prefix query.
853 * Callback to call with the retrieved fragments.
855 * Callback to call with the result of the operation.
857 * Closure for the callbacks.
859 * @return Handle that can be used to cancel the operation.
861 struct GNUNET_PSYCSTORE_OperationHandle *
862 GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
863 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
864 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
865 uint64_t message_limit,
866 const char *method_prefix,
867 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
868 GNUNET_PSYCSTORE_ResultCallback result_cb,
871 struct MessageGetRequest *req;
873 if (NULL == method_prefix)
875 uint16_t method_size = strnlen (method_prefix,
876 GNUNET_MAX_MESSAGE_SIZE
877 - sizeof (*req)) + 1;
878 GNUNET_assert ('\0' == method_prefix[method_size - 1]);
880 struct GNUNET_MQ_Envelope *
881 env = GNUNET_MQ_msg_extra (req, method_size,
882 GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
883 req->channel_key = *channel_key;
884 req->message_limit = GNUNET_ntohll (message_limit);
885 if (NULL != slave_key)
887 req->slave_key = *slave_key;
888 req->do_membership_test = GNUNET_YES;
890 GNUNET_memcpy (&req[1], method_prefix, method_size);
892 struct GNUNET_PSYCSTORE_OperationHandle *
893 op = op_create (h, h->op, result_cb, cls);
894 op->fragment_cb = fragment_cb;
896 return op_send (h, op, env, &req->op_id);
901 * Retrieve a fragment of message specified by its message ID and fragment
905 * Handle for the PSYCstore.
907 * The channel we are interested in.
909 * The slave requesting the message fragment. If not NULL, a membership
910 * test is performed first and the message fragment is only returned
911 * if the slave has access to it.
913 * Message ID to retrieve. Use 0 to get the latest message.
914 * @param fragment_offset
915 * Offset of the fragment to retrieve.
917 * Callback to call with the retrieved fragments.
919 * Callback to call with the result of the operation.
921 * Closure for the callbacks.
923 * @return Handle that can be used to cancel the operation.
925 struct GNUNET_PSYCSTORE_OperationHandle *
926 GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
927 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
928 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
930 uint64_t fragment_offset,
931 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
932 GNUNET_PSYCSTORE_ResultCallback result_cb,
935 struct MessageGetFragmentRequest *req;
936 struct GNUNET_MQ_Envelope *
937 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT);
939 req->channel_key = *channel_key;
940 req->message_id = GNUNET_htonll (message_id);
941 req->fragment_offset = GNUNET_htonll (fragment_offset);
942 if (NULL != slave_key)
944 req->slave_key = *slave_key;
945 req->do_membership_test = GNUNET_YES;
948 struct GNUNET_PSYCSTORE_OperationHandle *
949 op = op_create (h, h->op, result_cb, cls);
950 op->fragment_cb = fragment_cb;
952 return op_send (h, op, env, &req->op_id);
957 * Retrieve latest values of counters for a channel master.
959 * The current value of counters are needed when a channel master is restarted,
960 * so that it can continue incrementing the counters from their last value.
963 * Handle for the PSYCstore.
965 * Public key that identifies the channel.
967 * Callback to call with the result.
969 * Closure for the @a ccb callback.
971 * @return Handle that can be used to cancel the operation.
973 struct GNUNET_PSYCSTORE_OperationHandle *
974 GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
975 struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
976 GNUNET_PSYCSTORE_CountersCallback counters_cb,
979 struct OperationRequest *req;
980 struct GNUNET_MQ_Envelope *
981 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET);
982 req->channel_key = *channel_key;
984 struct GNUNET_PSYCSTORE_OperationHandle *
985 op = op_create (h, h->op, NULL, NULL);
986 op->counters_cb = counters_cb;
988 return op_send (h, op, env, &req->op_id);
993 * Apply modifiers of a message to the current channel state.
995 * An error is returned if there are missing messages containing state
996 * operations before the current one.
999 * Handle for the PSYCstore.
1000 * @param channel_key
1001 * The channel we are interested in.
1003 * ID of the message that contains the @a modifiers.
1004 * @param state_delta
1005 * Value of the _state_delta PSYC header variable of the message.
1007 * Callback to call with the result of the operation.
1009 * Closure for @a result_cb.
1011 * @return Handle that can be used to cancel the operation.
1013 struct GNUNET_PSYCSTORE_OperationHandle *
1014 GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1015 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1016 uint64_t message_id,
1017 uint64_t state_delta,
1018 GNUNET_PSYCSTORE_ResultCallback result_cb,
1021 struct StateModifyRequest *req;
1022 struct GNUNET_MQ_Envelope *
1023 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
1024 req->channel_key = *channel_key;
1025 req->message_id = GNUNET_htonll (message_id);
1026 req->state_delta = GNUNET_htonll (state_delta);
1028 return op_send (h, op_create (h, h->op, result_cb, cls),
1033 struct StateSyncClosure
1035 GNUNET_PSYCSTORE_ResultCallback result_cb;
1042 state_sync_result (void *cls, int64_t result,
1043 const char *err_msg, uint16_t err_msg_size)
1045 struct StateSyncClosure *ssc = cls;
1046 if (GNUNET_OK != result || ssc->last)
1047 ssc->result_cb (ssc->cls, result, err_msg, err_msg_size);
1053 * Store synchronized state.
1056 * Handle for the PSYCstore.
1057 * @param channel_key
1058 * The channel we are interested in.
1059 * @param max_state_message_id
1060 * ID of the last stateful message before @a state_hash_message_id.
1061 * @param state_hash_message_id
1062 * ID of the message that contains the state_hash PSYC header variable.
1063 * @param modifier_count
1064 * Number of elements in the @a modifiers array.
1066 * Full state to store.
1068 * Callback to call with the result of the operation.
1070 * Closure for the callback.
1072 * @return Handle that can be used to cancel the operation.
1074 struct GNUNET_PSYCSTORE_OperationHandle *
1075 GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1076 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1077 uint64_t max_state_message_id,
1078 uint64_t state_hash_message_id,
1079 size_t modifier_count,
1080 const struct GNUNET_PSYC_Modifier *modifiers,
1081 GNUNET_PSYCSTORE_ResultCallback result_cb,
1084 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1087 for (i = 0; i < modifier_count; i++) {
1088 struct StateSyncRequest *req;
1089 uint16_t name_size = strlen (modifiers[i].name) + 1;
1091 struct GNUNET_MQ_Envelope *
1092 env = GNUNET_MQ_msg_extra (req,
1093 sizeof (*req) + name_size + modifiers[i].value_size,
1094 GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
1096 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
1097 req->header.size = htons (sizeof (*req) + name_size
1098 + modifiers[i].value_size);
1099 req->channel_key = *channel_key;
1100 req->max_state_message_id = GNUNET_htonll (max_state_message_id);
1101 req->state_hash_message_id = GNUNET_htonll (state_hash_message_id);
1102 req->name_size = htons (name_size);
1106 : (modifier_count - 1 == i)
1110 GNUNET_memcpy (&req[1], modifiers[i].name, name_size);
1111 GNUNET_memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1113 struct StateSyncClosure *ssc = GNUNET_malloc (sizeof (*ssc));
1114 ssc->last = (req->flags & STATE_OP_LAST);
1115 ssc->result_cb = result_cb;
1118 op_send (h, op_create (h, h->op, state_sync_result, ssc),
1121 // FIXME: only one operation is returned,
1122 // add pointers to other operations and make all cancellable.
1128 * Reset the state of a channel.
1130 * Delete all state variables stored for the given channel.
1133 * Handle for the PSYCstore.
1134 * @param channel_key
1135 * The channel we are interested in.
1137 * Callback to call with the result of the operation.
1139 * Closure for the callback.
1141 * @return Handle that can be used to cancel the operation.
1143 struct GNUNET_PSYCSTORE_OperationHandle *
1144 GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
1145 const struct GNUNET_CRYPTO_EddsaPublicKey
1147 GNUNET_PSYCSTORE_ResultCallback result_cb,
1150 struct OperationRequest *req;
1151 struct GNUNET_MQ_Envelope *
1152 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1153 req->channel_key = *channel_key;
1156 op_send (h, op_create (h, h->op, result_cb, cls),
1162 * Update signed values of state variables in the state store.
1165 * Handle for the PSYCstore.
1166 * @param channel_key
1167 * The channel we are interested in.
1169 * Message ID that contained the state @a hash.
1171 * Hash of the serialized full state.
1173 * Callback to call with the result of the operation.
1175 * Closure for the callback.
1177 struct GNUNET_PSYCSTORE_OperationHandle *
1178 GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
1179 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1180 uint64_t message_id,
1181 const struct GNUNET_HashCode *hash,
1182 GNUNET_PSYCSTORE_ResultCallback result_cb,
1185 struct StateHashUpdateRequest *req;
1186 struct GNUNET_MQ_Envelope *
1187 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_HASH_UPDATE);
1188 req->channel_key = *channel_key;
1192 op_send (h, op_create (h, h->op, result_cb, cls),
1198 * Retrieve the best matching state variable.
1201 * Handle for the PSYCstore.
1202 * @param channel_key
1203 * The channel we are interested in.
1205 * Name of variable to match, the returned variable might be less specific.
1207 * Callback to return the matching state variable.
1209 * Callback to call with the result of the operation.
1211 * Closure for the callbacks.
1213 * @return Handle that can be used to cancel the operation.
1215 struct GNUNET_PSYCSTORE_OperationHandle *
1216 GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
1217 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1219 GNUNET_PSYCSTORE_StateCallback state_cb,
1220 GNUNET_PSYCSTORE_ResultCallback result_cb,
1223 size_t name_size = strlen (name) + 1;
1224 struct OperationRequest *req;
1225 struct GNUNET_MQ_Envelope *
1226 env = GNUNET_MQ_msg_extra (req, name_size,
1227 GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET);
1228 req->channel_key = *channel_key;
1229 GNUNET_memcpy (&req[1], name, name_size);
1231 struct GNUNET_PSYCSTORE_OperationHandle *
1232 op = op_create (h, h->op, result_cb, cls);
1233 op->state_cb = state_cb;
1235 return op_send (h, op, env, &req->op_id);
1240 * Retrieve all state variables for a channel with the given prefix.
1243 * Handle for the PSYCstore.
1244 * @param channel_key
1245 * The channel we are interested in.
1246 * @param name_prefix
1247 * Prefix of state variable names to match.
1249 * Callback to return matching state variables.
1251 * Callback to call with the result of the operation.
1253 * Closure for the callbacks.
1255 * @return Handle that can be used to cancel the operation.
1257 struct GNUNET_PSYCSTORE_OperationHandle *
1258 GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
1259 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1260 const char *name_prefix,
1261 GNUNET_PSYCSTORE_StateCallback state_cb,
1262 GNUNET_PSYCSTORE_ResultCallback result_cb,
1265 size_t name_size = strlen (name_prefix) + 1;
1266 struct OperationRequest *req;
1267 struct GNUNET_MQ_Envelope *
1268 env = GNUNET_MQ_msg_extra (req, name_size,
1269 GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX);
1270 req->channel_key = *channel_key;
1271 GNUNET_memcpy (&req[1], name_prefix, name_size);
1273 struct GNUNET_PSYCSTORE_OperationHandle *
1274 op = op_create (h, h->op, result_cb, cls);
1275 op->state_cb = state_cb;
1277 return op_send (h, op, env, &req->op_id);
1280 /* end of psycstore_api.c */