2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
5 * GNUnet is free software: you can redistribute it and/or modify it
6 * under the terms of the GNU Affero General Public License as published
7 * by the Free Software Foundation, either version 3 of the License,
8 * or (at your option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 * @file psycstore/psycstore_api.c
21 * @brief API to interact with the PSYCstore service
22 * @author Gabor X Toth
23 * @author Christian Grothoff
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_psycstore_service.h"
33 #include "gnunet_multicast_service.h"
34 #include "psycstore.h"
36 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__)
39 * Handle for an operation with the PSYCstore service.
/* Per-request state for one pending PSYCstore operation: the owning service
 * handle, the optional data callback, and the envelope of the request. */
41 struct GNUNET_PSYCSTORE_OperationHandle
45 * Main PSYCstore handle.
47 struct GNUNET_PSYCSTORE_Handle *h;
/* Data callbacks; each request function in this file sets at most one of
 * them, and the matching handle_result_*() handler invokes it. */
53 GNUNET_PSYCSTORE_FragmentCallback fragment_cb;
54 GNUNET_PSYCSTORE_CountersCallback counters_cb;
55 GNUNET_PSYCSTORE_StateCallback state_cb;
59 * Closure for callbacks.
/* Envelope of the queued request; GNUNET_PSYCSTORE_operation_cancel() hands
 * it to GNUNET_MQ_send_cancel(). */
66 struct GNUNET_MQ_Envelope *env;
76 * Handle for the service.
/* Client-side connection state for the PSYCstore service. */
78 struct GNUNET_PSYCSTORE_Handle
81 * Configuration to use.
83 const struct GNUNET_CONFIGURATION_Handle *cfg;
/* Message queue returned by GNUNET_CLIENT_connect() in do_connect(). */
88 struct GNUNET_MQ_Handle *mq;
/* Registry of pending operations (GNUNET_OP_create()); result handlers look
 * operations up here by operation ID. */
93 struct GNUNET_OP_Handle *op;
96 * Task doing exponential back-off trying to reconnect.
98 struct GNUNET_SCHEDULER_Task *reconnect_task;
101 * Delay for next connect retry.
103 struct GNUNET_TIME_Relative reconnect_delay;
/* NOTE(review): the three members below are pointers to callback typedefs and
 * are not referenced by any code visible in this file -- confirm they are
 * still needed. */
106 GNUNET_PSYCSTORE_FragmentCallback *fragment_cb;
108 GNUNET_PSYCSTORE_CountersCallback *counters_cb;
110 GNUNET_PSYCSTORE_StateCallback *state_cb;
112 * Closure for callbacks.
/*
 * Validate a RESULT_CODE message before it is dispatched.
 *
 * @param cls    Closure (not used by the visible code).
 * @param opres  Received message; an optional string follows the fixed part.
 * @return #GNUNET_SYSERR if a trailing string is present but its last byte
 *         is not NUL.
 */
119 check_result_code (void *cls, const struct OperationResult *opres)
121 uint16_t size = ntohs (opres->header.size);
122 const char *str = (const char *) &opres[1];
/* Only inspect the payload when the message is longer than the fixed part. */
123 if ( (sizeof (*opres) < size) &&
124 ('\0' != str[size - sizeof (*opres) - 1]) )
127 return GNUNET_SYSERR;
/*
 * Handle a RESULT_CODE message: hand the result code and the optional
 * trailing string to the operation registered under the message's op_id.
 *
 * @param cls    The struct GNUNET_PSYCSTORE_Handle.
 * @param opres  Validated result message.
 */
135 handle_result_code (void *cls, const struct OperationResult *opres)
137 struct GNUNET_PSYCSTORE_Handle *h = cls;
138 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
139 uint16_t size = ntohs (opres->header.size);
/* Use the empty string when no payload follows the fixed part. */
142 str = (sizeof (*opres) < size) ? (const char *) &opres[1] : "";
/* NOTE(review): adding INT64_MIN presumably undoes an offset applied by the
 * sender so a signed result can travel as an unsigned 64-bit value --
 * confirm against the service side. */
144 if (GNUNET_YES == GNUNET_OP_result (h->op, GNUNET_ntohll (opres->op_id),
145 GNUNET_ntohll (opres->result_code) + INT64_MIN,
146 str, size - sizeof (*opres), (void **) &op))
148 LOG (GNUNET_ERROR_TYPE_DEBUG,
149 "handle_result_code: Received result message with OP ID: %" PRIu64 "\n",
150 GNUNET_ntohll (opres->op_id));
155 LOG (GNUNET_ERROR_TYPE_DEBUG,
156 "handle_result_code: No callback registered for OP ID %" PRIu64 ".\n",
157 GNUNET_ntohll (opres->op_id));
/* A message arrived, so restart the reconnect back-off from its minimum. */
159 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
/*
 * Handle a RESULT_COUNTERS message: pass the counters (converted to host
 * byte order) to the operation's counters callback, then drop the operation
 * from the registry.
 *
 * @param cls   The struct GNUNET_PSYCSTORE_Handle.
 * @param cres  Received counters message.
 */
164 handle_result_counters (void *cls, const struct CountersResult *cres)
166 struct GNUNET_PSYCSTORE_Handle *h = cls;
167 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
169 if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (cres->op_id),
170 NULL, NULL, (void **) &op))
172 GNUNET_assert (NULL != op);
173 if (NULL != op->counters_cb)
175 op->counters_cb (op->cls,
176 ntohl (cres->result_code),
177 GNUNET_ntohll (cres->max_fragment_id),
178 GNUNET_ntohll (cres->max_message_id),
179 GNUNET_ntohll (cres->max_group_generation),
180 GNUNET_ntohll (cres->max_state_message_id));
/* Unlike fragment and state results, the operation is removed here. */
182 GNUNET_OP_remove (h->op, GNUNET_ntohll (cres->op_id));
187 LOG (GNUNET_ERROR_TYPE_DEBUG,
188 "handle_result_counters: No callback registered for OP ID %" PRIu64 ".\n",
189 GNUNET_ntohll (cres->op_id));
/* A message arrived, so restart the reconnect back-off from its minimum. */
191 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
/*
 * Validate a RESULT_FRAGMENT message before it is dispatched.
 *
 * The message must be large enough to hold a complete multicast message
 * header after the fixed part, and the size announced by that embedded
 * header must account for exactly the rest of the message.
 *
 * @param cls   Closure (not used by the visible code).
 * @param fres  Received message; a multicast message follows the fixed part.
 * @return #GNUNET_SYSERR if the sizes are inconsistent.
 */
196 check_result_fragment (void *cls, const struct FragmentResult *fres)
198 uint16_t size = ntohs (fres->header.size);
199 struct GNUNET_MULTICAST_MessageHeader *mmsg =
200 (struct GNUNET_MULTICAST_MessageHeader *) &fres[1];
/* Reject short messages first: `||` short-circuits, so mmsg->header.size is
 * only read once the embedded header is known to be fully present. */
201 if (size < sizeof (*fres) + sizeof (*mmsg)
202 || sizeof (*fres) + ntohs (mmsg->header.size) != size)
/* One conversion for one argument; size is a uint16_t. */
204 LOG (GNUNET_ERROR_TYPE_ERROR,
205 "check_result_fragment: Received message with invalid length %u bytes.\n",
206 (unsigned int) size);
208 return GNUNET_SYSERR;
/*
 * Handle a RESULT_FRAGMENT message: pass the embedded multicast message and
 * the PSYCstore flags to the operation's fragment callback.
 *
 * @param cls   The struct GNUNET_PSYCSTORE_Handle.
 * @param fres  Validated fragment result message.
 */
215 handle_result_fragment (void *cls, const struct FragmentResult *fres)
217 struct GNUNET_PSYCSTORE_Handle *h = cls;
218 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
220 if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (fres->op_id),
221 NULL, NULL, (void **) &op))
223 GNUNET_assert (NULL != op);
224 if (NULL != op->fragment_cb)
225 op->fragment_cb (op->cls,
226 (struct GNUNET_MULTICAST_MessageHeader *) &fres[1],
227 ntohl (fres->psycstore_flags));
/* The operation stays registered; presumably further fragments and a final
 * result code follow for the same op_id -- TODO confirm. */
228 //GNUNET_OP_remove (h->op, GNUNET_ntohll (fres->op_id));
233 LOG (GNUNET_ERROR_TYPE_DEBUG,
234 "handle_result_fragment: No callback registered for OP ID %" PRIu64 ".\n",
235 GNUNET_ntohll (fres->op_id));
/* A message arrived, so restart the reconnect back-off from its minimum. */
237 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
/*
 * Validate a RESULT_STATE message before it is dispatched.
 *
 * @param cls   Closure (not used by the visible code).
 * @param sres  Received message; the variable name follows the fixed part.
 * @return #GNUNET_SYSERR if the name is invalid.
 */
242 check_result_state (void *cls, const struct StateResult *sres)
244 const char *name = (const char *) &sres[1];
245 uint16_t size = ntohs (sres->header.size);
246 uint16_t name_size = ntohs (sres->name_size);
/* The name must fit inside the payload and end with a NUL byte. */
249 || size - sizeof (*sres) < name_size
250 || '\0' != name[name_size - 1])
252 LOG (GNUNET_ERROR_TYPE_ERROR,
253 "check_result_state: Received state result message with invalid name.\n");
255 return GNUNET_SYSERR;
/*
 * Handle a RESULT_STATE message: pass the variable name, a pointer to the
 * value and the value length to the operation's state callback.
 *
 * @param cls   The struct GNUNET_PSYCSTORE_Handle.
 * @param sres  Validated state result message.
 */
262 handle_result_state (void *cls, const struct StateResult *sres)
264 struct GNUNET_PSYCSTORE_Handle *h = cls;
265 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
267 const char *name = (const char *) &sres[1];
268 uint16_t name_size = ntohs (sres->name_size);
270 if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (sres->op_id),
271 NULL, NULL, (void **) &op))
273 GNUNET_assert (NULL != op);
/* The value starts right after the name; its length is whatever remains of
 * the message after the fixed part and the name. */
274 if (NULL != op->state_cb)
275 op->state_cb (op->cls, name, (char *) &sres[1] + name_size,
276 ntohs (sres->header.size) - sizeof (*sres) - name_size);
/* The operation stays registered; presumably more variables and a final
 * result code follow for the same op_id -- TODO confirm. */
277 //GNUNET_OP_remove (h->op, GNUNET_ntohll (sres->op_id));
282 LOG (GNUNET_ERROR_TYPE_DEBUG,
283 "handle_result_state: No callback registered for OP ID %" PRIu64 ".\n",
284 GNUNET_ntohll (sres->op_id));
/* A message arrived, so restart the reconnect back-off from its minimum. */
286 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
291 reconnect (void *cls);
295 * Client disconnected from service.
297 * Reconnect after backoff period.
/*
 * Message-queue error callback (installed by do_connect()).
 *
 * Tears down the message queue and the operation registry, schedules
 * reconnect() after the current delay, then increases the delay.
 *
 * @param cls    The struct GNUNET_PSYCSTORE_Handle.
 * @param error  Error reported by the message queue.
 */
300 disconnected (void *cls, enum GNUNET_MQ_Error error)
302 struct GNUNET_PSYCSTORE_Handle *h = cls;
304 LOG (GNUNET_ERROR_TYPE_DEBUG,
305 "Origin client disconnected (%d), re-connecting\n",
/* NOTE(review): destroying the registry discards bookkeeping for all pending
 * operations; confirm their handles are released elsewhere. */
309 GNUNET_MQ_destroy (h->mq);
310 GNUNET_OP_destroy (h->op);
315 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
/* Grow the delay only after scheduling, so this attempt uses the old value. */
317 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
/*
 * Open the connection to the "psycstore" service.
 *
 * Creates a fresh operation registry and a message queue whose handlers
 * cover the four result message types; disconnected() is the error callback.
 *
 * @param h  Handle to connect; h->mq must be NULL on entry.
 */
322 do_connect (struct GNUNET_PSYCSTORE_Handle *h)
324 LOG (GNUNET_ERROR_TYPE_DEBUG,
325 "Connecting to PSYCstore service.\n");
/* The hd_var_size entries pair check_result_X() with handle_result_X(); the
 * fixed-size counters entry has no check function. */
327 struct GNUNET_MQ_MessageHandler handlers[] = {
328 GNUNET_MQ_hd_var_size (result_code,
329 GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE,
330 struct OperationResult,
332 GNUNET_MQ_hd_fixed_size (result_counters,
333 GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS,
334 struct CountersResult,
336 GNUNET_MQ_hd_var_size (result_fragment,
337 GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT,
338 struct FragmentResult,
340 GNUNET_MQ_hd_var_size (result_state,
341 GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE,
344 GNUNET_MQ_handler_end ()
347 h->op = GNUNET_OP_create ();
348 GNUNET_assert (NULL == h->mq);
349 h->mq = GNUNET_CLIENT_connect (h->cfg, "psycstore",
350 handlers, disconnected, h);
/* A failed connect is treated as fatal here rather than retried. */
351 GNUNET_assert (NULL != h->mq);
356 * Try again to connect to the PSYCstore service.
358 * @param cls Handle to the PSYCstore service.
/* Scheduler task set up by disconnected(); cls is the service handle. */
361 reconnect (void *cls)
363 struct GNUNET_PSYCSTORE_Handle *h = cls;
/* The task is running now, so its handle must no longer be cancelled. */
365 h->reconnect_task = NULL;
371 * Connect to the PSYCstore service.
373 * @param cfg The configuration to use
374 * @return Handle to use
/* Allocates a zero-initialised service handle (GNUNET_new) and starts the
 * reconnect delay at one millisecond. */
376 struct GNUNET_PSYCSTORE_Handle *
377 GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
379 struct GNUNET_PSYCSTORE_Handle *h
380 = GNUNET_new (struct GNUNET_PSYCSTORE_Handle);
382 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
389 * Disconnect from PSYCstore service
391 * @param h Handle to destroy
/* Cancels a pending reconnect task, if any, and destroys the message queue.
 * h must not be NULL. */
394 GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h)
396 GNUNET_assert (NULL != h);
397 if (h->reconnect_task != NULL)
399 GNUNET_SCHEDULER_cancel (h->reconnect_task);
400 h->reconnect_task = NULL;
/* NOTE(review): disconnected() also destroys h->mq; confirm this call cannot
 * run on an already-destroyed queue while a reconnect is pending. */
404 // FIXME: free data structures for pending operations
405 GNUNET_MQ_destroy (h->mq);
413 * Message sent notification.
415 * Remove invalidated envelope pointer.
/* Sent-notification callback registered by op_send() through
 * GNUNET_MQ_notify_sent(); cls is the operation the envelope belonged to. */
418 message_sent (void *cls)
420 struct GNUNET_PSYCSTORE_OperationHandle *op = cls;
426 * Create a new operation.
/*
 * Allocate an operation handle and register it in the operation registry.
 *
 * @param h          Service handle.
 * @param hop        Registry to add the operation to.
 * @param result_cb  Callback registered for the operation's result.
 * @return Newly allocated operation handle.
 */
428 static struct GNUNET_PSYCSTORE_OperationHandle *
429 op_create (struct GNUNET_PSYCSTORE_Handle *h,
430 struct GNUNET_OP_Handle *hop,
431 GNUNET_PSYCSTORE_ResultCallback result_cb,
434 struct GNUNET_PSYCSTORE_OperationHandle *
435 op = GNUNET_malloc (sizeof (*op));
/* The ID returned by the registry is what later travels in the request. */
437 op->op_id = GNUNET_OP_add (hop,
438 (GNUNET_ResultCallback) result_cb,
445 * Send a message associated with an operation.
452 * Message envelope to send.
454 * Operation ID to write in network byte order. NULL if not needed.
456 * @return Operation handle.
/*
 * Queue the request envelope of an operation on the service connection.
 *
 * Writes the operation ID in network byte order through op_id, registers
 * message_sent() as sent-notification, then queues the envelope.
 */
459 static struct GNUNET_PSYCSTORE_OperationHandle *
460 op_send (struct GNUNET_PSYCSTORE_Handle *h,
461 struct GNUNET_PSYCSTORE_OperationHandle *op,
462 struct GNUNET_MQ_Envelope *env,
467 *op_id = GNUNET_htonll (op->op_id);
/* Register the notification before sending, while env is still ours. */
469 GNUNET_MQ_notify_sent (env, message_sent, op);
470 GNUNET_MQ_send (h->mq, env);
476 * Cancel a PSYCstore operation. Note that the operation MAY still
477 * be executed; this merely cancels the continuation; if the request
478 * was already transmitted, the service may still choose to complete
481 * @param op Operation to cancel.
483 * @return #GNUNET_YES if message was not sent yet and got discarded,
484 * #GNUNET_NO if it was already sent, and only the callbacks got cancelled.
/* Cancels an operation: an envelope that is still queued is withdrawn with
 * GNUNET_MQ_send_cancel(); the operation is dropped from the registry of its
 * service handle. */
487 GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op)
489 struct GNUNET_PSYCSTORE_Handle *h = op->h;
494 GNUNET_MQ_send_cancel (op->env);
498 GNUNET_OP_remove (h->op, op->op_id);
506 * Store join/leave events for a PSYC channel in order to be able to answer
507 * membership test queries later.
510 * Handle for the PSYCstore.
512 * The channel where the event happened.
514 * Public key of joining/leaving slave.
516 * #GNUNET_YES on join, #GNUNET_NO on part.
517 * @param announced_at
518 * ID of the message that announced the membership change.
519 * @param effective_since
520 * Message ID this membership change is in effect since.
521 * For joins it is <= announced_at, for parts it is always 0.
522 * @param group_generation
523 * In case of a part, the last group generation the slave has access to.
524 * It has relevance when a larger message has fragments with different
527 * Callback to call with the result of the storage operation.
529 * Closure for the callback.
531 * @return Operation handle that can be used to cancel the operation.
/* Builds a MEMBERSHIP_STORE request from the arguments (64-bit fields in
 * network byte order) and sends it under a newly created operation. */
533 struct GNUNET_PSYCSTORE_OperationHandle *
534 GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
535 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
536 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
538 uint64_t announced_at,
539 uint64_t effective_since,
540 uint64_t group_generation,
541 GNUNET_PSYCSTORE_ResultCallback result_cb,
/* Argument contract, enforced by assertion: non-NULL handle and keys,
 * did_join strictly GNUNET_YES or GNUNET_NO, and effective_since bounded by
 * announced_at on join or zero on part. */
544 GNUNET_assert (NULL != h);
545 GNUNET_assert (NULL != channel_key);
546 GNUNET_assert (NULL != slave_key);
547 GNUNET_assert (GNUNET_YES == did_join || GNUNET_NO == did_join);
548 GNUNET_assert (did_join
549 ? effective_since <= announced_at
550 : effective_since == 0);
552 struct MembershipStoreRequest *req;
553 struct GNUNET_MQ_Envelope *
554 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE);
555 req->channel_key = *channel_key;
556 req->slave_key = *slave_key;
557 req->did_join = did_join;
558 req->announced_at = GNUNET_htonll (announced_at);
559 req->effective_since = GNUNET_htonll (effective_since);
560 req->group_generation = GNUNET_htonll (group_generation);
563 op_send (h, op_create (h, h->op, result_cb, cls),
569 * Test if a member was admitted to the channel at the given message ID.
571 * This is useful when relaying and replaying messages to check if a particular
572 * slave has access to the message fragment with a given group generation. It
573 * is also used when handling join requests to determine whether the slave is
574 * currently admitted to the channel.
577 * Handle for the PSYCstore.
579 * The channel we are interested in.
581 * Public key of slave whose membership to check.
583 * Message ID for which to do the membership test.
584 * @param group_generation
585 * Group generation of the fragment of the message to test.
586 * It has relevance if the message consists of multiple fragments with
587 * different group generations.
589 * Callback to call with the test result.
591 * Closure for the callback.
593 * @return Operation handle that can be used to cancel the operation.
/* Builds a MEMBERSHIP_TEST request for the given channel, slave, message ID
 * and group generation, and sends it under a newly created operation whose
 * result goes to result_cb. */
595 struct GNUNET_PSYCSTORE_OperationHandle *
596 GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
597 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
598 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
600 uint64_t group_generation,
601 GNUNET_PSYCSTORE_ResultCallback result_cb,
604 struct MembershipTestRequest *req;
605 struct GNUNET_MQ_Envelope *
606 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST);
/* Both keys are dereferenced unconditionally, so neither may be NULL. */
607 req->channel_key = *channel_key;
608 req->slave_key = *slave_key;
609 req->message_id = GNUNET_htonll (message_id);
610 req->group_generation = GNUNET_htonll (group_generation);
613 op_send (h, op_create (h, h->op, result_cb, cls),
619 * Store a message fragment sent to a channel.
621 * @param h Handle for the PSYCstore.
622 * @param channel_key The channel the message belongs to.
623 * @param message Message to store.
624 * @param psycstore_flags Flags indicating whether the PSYC message contains
626 * @param result_cb Callback to call with the result of the operation.
627 * @param cls Closure for the callback.
629 * @return Handle that can be used to cancel the operation.
/* Builds a FRAGMENT_STORE request that carries a copy of the multicast
 * message after the fixed part, and sends it under a new operation. */
631 struct GNUNET_PSYCSTORE_OperationHandle *
632 GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
633 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
634 const struct GNUNET_MULTICAST_MessageHeader *msg,
635 enum GNUNET_PSYCSTORE_MessageFlags psycstore_flags,
636 GNUNET_PSYCSTORE_ResultCallback result_cb,
/* The number of bytes copied is taken from the message's own header.
 * NOTE(review): no visible check that size plus the request header still
 * fits into one message -- confirm callers guarantee it. */
639 uint16_t size = ntohs (msg->header.size);
640 struct FragmentStoreRequest *req;
641 struct GNUNET_MQ_Envelope *
642 env = GNUNET_MQ_msg_extra (req, size,
643 GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE);
644 req->channel_key = *channel_key;
645 req->psycstore_flags = htonl (psycstore_flags);
646 GNUNET_memcpy (&req[1], msg, size);
649 op_send (h, op_create (h, h->op, result_cb, cls),
655 * Retrieve message fragments by fragment ID range.
658 * Handle for the PSYCstore.
660 * The channel we are interested in.
662 * The slave requesting the fragment. If not NULL, a membership test is
663 * performed first and the fragment is only returned if the slave has
665 * @param first_fragment_id
666 * First fragment ID to retrieve.
667 * Use 0 to get the latest message fragment.
668 * @param last_fragment_id
669 * Last consecutive fragment ID to retrieve.
670 * Use 0 to get the latest message fragment.
671 * @param fragment_limit
672 * Maximum number of fragments to retrieve.
674 * Callback to call with the retrieved fragments.
676 * Callback to call with the result of the operation.
678 * Closure for the callbacks.
680 * @return Handle that can be used to cancel the operation.
/* Builds a FRAGMENT_GET request for a fragment ID range and sends it under a
 * new operation; fragments are delivered to fragment_cb, the final result
 * to result_cb. */
682 struct GNUNET_PSYCSTORE_OperationHandle *
683 GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
684 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
685 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
686 uint64_t first_fragment_id,
687 uint64_t last_fragment_id,
688 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
689 GNUNET_PSYCSTORE_ResultCallback result_cb,
692 struct FragmentGetRequest *req;
693 struct GNUNET_MQ_Envelope *
694 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
695 req->channel_key = *channel_key;
696 req->first_fragment_id = GNUNET_htonll (first_fragment_id);
697 req->last_fragment_id = GNUNET_htonll (last_fragment_id);
/* A membership test is requested only when a slave key is supplied. */
698 if (NULL != slave_key)
700 req->slave_key = *slave_key;
701 req->do_membership_test = GNUNET_YES;
704 struct GNUNET_PSYCSTORE_OperationHandle *
705 op = op_create (h, h->op, result_cb, cls);
706 op->fragment_cb = fragment_cb;
708 return op_send (h, op, env, &req->op_id);
713 * Retrieve latest message fragments.
716 * Handle for the PSYCstore.
718 * The channel we are interested in.
720 * The slave requesting the fragment. If not NULL, a membership test is
721 * performed first and the fragment is only returned if the slave has
723 * @param first_fragment_id
724 * First fragment ID to retrieve.
725 * Use 0 to get the latest message fragment.
726 * @param last_fragment_id
727 * Last consecutive fragment ID to retrieve.
728 * Use 0 to get the latest message fragment.
729 * @param fragment_limit
730 * Maximum number of fragments to retrieve.
732 * Callback to call with the retrieved fragments.
734 * Callback to call with the result of the operation.
736 * Closure for the callbacks.
738 * @return Handle that can be used to cancel the operation.
/*
 * Request the latest message fragments of a channel.
 *
 * Builds a FRAGMENT_GET request that carries only a fragment limit (no ID
 * range) and sends it under a new operation; fragments are delivered to
 * fragment_cb, the final result to result_cb.
 *
 * @param h               Handle for the PSYCstore.
 * @param channel_key     The channel we are interested in.
 * @param slave_key       If not NULL, a membership test is requested.
 * @param fragment_limit  Maximum number of fragments to retrieve.
 * @param fragment_cb     Callback for the retrieved fragments.
 * @param result_cb       Callback for the result of the operation.
 * @return Operation handle returned by op_send().
 */
740 struct GNUNET_PSYCSTORE_OperationHandle *
741 GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
742 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
743 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
744 uint64_t fragment_limit,
745 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
746 GNUNET_PSYCSTORE_ResultCallback result_cb,
749 struct FragmentGetRequest *req;
750 struct GNUNET_MQ_Envelope *
751 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
752 req->channel_key = *channel_key;
/* Outgoing field: host-to-network conversion, like every other request. */
753 req->fragment_limit = GNUNET_htonll (fragment_limit);
/* A membership test is requested only when a slave key is supplied. */
754 if (NULL != slave_key)
756 req->slave_key = *slave_key;
757 req->do_membership_test = GNUNET_YES;
760 struct GNUNET_PSYCSTORE_OperationHandle *
761 op = op_create (h, h->op, result_cb, cls);
762 op->fragment_cb = fragment_cb;
764 return op_send (h, op, env, &req->op_id);
769 * Retrieve all fragments of messages in a message ID range.
772 * Handle for the PSYCstore.
774 * The channel we are interested in.
776 * The slave requesting the message.
777 * If not NULL, a membership test is performed first
778 * and the message is only returned if the slave has access to it.
779 * @param first_message_id
780 * First message ID to retrieve.
781 * @param last_message_id
782 * Last consecutive message ID to retrieve.
783 * @param fragment_limit
784 * Maximum number of fragments to retrieve.
785 * @param method_prefix
786 * Retrieve only messages with a matching method prefix.
787 * @todo Implement method_prefix query.
789 * Callback to call with the retrieved fragments.
791 * Callback to call with the result of the operation.
793 * Closure for the callbacks.
795 * @return Handle that can be used to cancel the operation.
/* Builds a MESSAGE_GET request for a message ID range, with the method
 * prefix appended as a NUL-terminated string, and sends it under a new
 * operation; fragments go to fragment_cb, the final result to result_cb. */
797 struct GNUNET_PSYCSTORE_OperationHandle *
798 GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
799 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
800 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
801 uint64_t first_message_id,
802 uint64_t last_message_id,
803 uint64_t fragment_limit,
804 const char *method_prefix,
805 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
806 GNUNET_PSYCSTORE_ResultCallback result_cb,
809 struct MessageGetRequest *req;
810 if (NULL == method_prefix)
/* The prefix length is capped so that prefix plus fixed part stays within
 * the maximum message size; +1 accounts for the terminator. */
812 uint16_t method_size = strnlen (method_prefix,
813 GNUNET_MAX_MESSAGE_SIZE
814 - sizeof (*req)) + 1;
816 struct GNUNET_MQ_Envelope *
817 env = GNUNET_MQ_msg_extra (req, method_size,
818 GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
819 req->channel_key = *channel_key;
820 req->first_message_id = GNUNET_htonll (first_message_id);
821 req->last_message_id = GNUNET_htonll (last_message_id);
822 req->fragment_limit = GNUNET_htonll (fragment_limit);
/* A membership test is requested only when a slave key is supplied. */
823 if (NULL != slave_key)
825 req->slave_key = *slave_key;
826 req->do_membership_test = GNUNET_YES;
/* Force termination in case the prefix was cut off at the cap. */
828 GNUNET_memcpy (&req[1], method_prefix, method_size);
829 ((char *) &req[1])[method_size - 1] = '\0';
831 struct GNUNET_PSYCSTORE_OperationHandle *
832 op = op_create (h, h->op, result_cb, cls);
833 op->fragment_cb = fragment_cb;
835 return op_send (h, op, env, &req->op_id);
840 * Retrieve all fragments of the latest messages.
843 * Handle for the PSYCstore.
845 * The channel we are interested in.
847 * The slave requesting the message.
848 * If not NULL, a membership test is performed first
849 * and the message is only returned if the slave has access to it.
850 * @param message_limit
851 * Maximum number of messages to retrieve.
852 * @param method_prefix
853 * Retrieve only messages with a matching method prefix.
854 * @todo Implement method_prefix query.
856 * Callback to call with the retrieved fragments.
858 * Callback to call with the result of the operation.
860 * Closure for the callbacks.
862 * @return Handle that can be used to cancel the operation.
/*
 * Request all fragments of the latest messages of a channel.
 *
 * Builds a MESSAGE_GET request that carries a message limit and the method
 * prefix (as a NUL-terminated string after the fixed part) and sends it
 * under a new operation; fragments are delivered to fragment_cb, the final
 * result to result_cb.
 *
 * @param h              Handle for the PSYCstore.
 * @param channel_key    The channel we are interested in.
 * @param slave_key      If not NULL, a membership test is requested.
 * @param message_limit  Maximum number of messages to retrieve.
 * @param method_prefix  Method prefix to match.
 * @param fragment_cb    Callback for the retrieved fragments.
 * @param result_cb      Callback for the result of the operation.
 * @return Operation handle returned by op_send().
 */
864 struct GNUNET_PSYCSTORE_OperationHandle *
865 GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
866 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
867 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
868 uint64_t message_limit,
869 const char *method_prefix,
870 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
871 GNUNET_PSYCSTORE_ResultCallback result_cb,
874 struct MessageGetRequest *req;
876 if (NULL == method_prefix)
/* The prefix length is capped so that prefix plus fixed part stays within
 * the maximum message size; the assertion rejects a prefix that did not
 * terminate within that cap. */
878 uint16_t method_size = strnlen (method_prefix,
879 GNUNET_MAX_MESSAGE_SIZE
880 - sizeof (*req)) + 1;
881 GNUNET_assert ('\0' == method_prefix[method_size - 1]);
883 struct GNUNET_MQ_Envelope *
884 env = GNUNET_MQ_msg_extra (req, method_size,
885 GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
886 req->channel_key = *channel_key;
/* Outgoing field: host-to-network conversion, like every other request. */
887 req->message_limit = GNUNET_htonll (message_limit);
/* A membership test is requested only when a slave key is supplied. */
888 if (NULL != slave_key)
890 req->slave_key = *slave_key;
891 req->do_membership_test = GNUNET_YES;
893 GNUNET_memcpy (&req[1], method_prefix, method_size);
895 struct GNUNET_PSYCSTORE_OperationHandle *
896 op = op_create (h, h->op, result_cb, cls);
897 op->fragment_cb = fragment_cb;
899 return op_send (h, op, env, &req->op_id);
904 * Retrieve a fragment of message specified by its message ID and fragment
908 * Handle for the PSYCstore.
910 * The channel we are interested in.
912 * The slave requesting the message fragment. If not NULL, a membership
913 * test is performed first and the message fragment is only returned
914 * if the slave has access to it.
916 * Message ID to retrieve. Use 0 to get the latest message.
917 * @param fragment_offset
918 * Offset of the fragment to retrieve.
920 * Callback to call with the retrieved fragments.
922 * Callback to call with the result of the operation.
924 * Closure for the callbacks.
926 * @return Handle that can be used to cancel the operation.
/* Builds a MESSAGE_GET_FRAGMENT request for one fragment, addressed by
 * message ID and fragment offset, and sends it under a new operation;
 * the fragment goes to fragment_cb, the final result to result_cb. */
928 struct GNUNET_PSYCSTORE_OperationHandle *
929 GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
930 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
931 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
933 uint64_t fragment_offset,
934 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
935 GNUNET_PSYCSTORE_ResultCallback result_cb,
938 struct MessageGetFragmentRequest *req;
939 struct GNUNET_MQ_Envelope *
940 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT);
942 req->channel_key = *channel_key;
943 req->message_id = GNUNET_htonll (message_id);
944 req->fragment_offset = GNUNET_htonll (fragment_offset);
/* A membership test is requested only when a slave key is supplied. */
945 if (NULL != slave_key)
947 req->slave_key = *slave_key;
948 req->do_membership_test = GNUNET_YES;
951 struct GNUNET_PSYCSTORE_OperationHandle *
952 op = op_create (h, h->op, result_cb, cls);
953 op->fragment_cb = fragment_cb;
955 return op_send (h, op, env, &req->op_id);
960 * Retrieve latest values of counters for a channel master.
962 * The current value of counters are needed when a channel master is restarted,
963 * so that it can continue incrementing the counters from their last value.
966 * Handle for the PSYCstore.
968 * Public key that identifies the channel.
970 * Callback to call with the result.
972 * Closure for the @a counters_cb callback.
974 * @return Handle that can be used to cancel the operation.
/* Builds a COUNTERS_GET request for the channel and sends it under a new
 * operation; the answer is delivered to counters_cb. */
976 struct GNUNET_PSYCSTORE_OperationHandle *
977 GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
978 struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
979 GNUNET_PSYCSTORE_CountersCallback counters_cb,
982 struct OperationRequest *req;
983 struct GNUNET_MQ_Envelope *
984 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET);
985 req->channel_key = *channel_key;
/* No result callback is registered: handle_result_counters() calls
 * counters_cb and then removes the operation itself. */
987 struct GNUNET_PSYCSTORE_OperationHandle *
988 op = op_create (h, h->op, NULL, NULL);
989 op->counters_cb = counters_cb;
991 return op_send (h, op, env, &req->op_id);
996 * Apply modifiers of a message to the current channel state.
998 * An error is returned if there are missing messages containing state
999 * operations before the current one.
1002 * Handle for the PSYCstore.
1003 * @param channel_key
1004 * The channel we are interested in.
1006 * ID of the message that contains the @a modifiers.
1007 * @param state_delta
1008 * Value of the _state_delta PSYC header variable of the message.
1010 * Callback to call with the result of the operation.
1012 * Closure for @a result_cb.
1014 * @return Handle that can be used to cancel the operation.
/* Builds a STATE_MODIFY request carrying the message ID and state delta in
 * network byte order, and sends it under a new operation whose result goes
 * to result_cb. */
1016 struct GNUNET_PSYCSTORE_OperationHandle *
1017 GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1018 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1019 uint64_t message_id,
1020 uint64_t state_delta,
1021 GNUNET_PSYCSTORE_ResultCallback result_cb,
1024 struct StateModifyRequest *req;
1025 struct GNUNET_MQ_Envelope *
1026 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
1027 req->channel_key = *channel_key;
1028 req->message_id = GNUNET_htonll (message_id);
1029 req->state_delta = GNUNET_htonll (state_delta);
1031 return op_send (h, op_create (h, h->op, result_cb, cls),
/* Closure passed to state_sync_result() for each request sent by
 * GNUNET_PSYCSTORE_state_sync(); wraps the caller's result callback. */
1036 struct StateSyncClosure
1038 GNUNET_PSYCSTORE_ResultCallback result_cb;
/*
 * Result callback for one state sync request.
 *
 * Forwards the result to the caller's callback only when the request failed
 * or when it was the one flagged as last, so intermediate successes stay
 * silent.
 *
 * @param cls  The struct StateSyncClosure of the request.
 */
1045 state_sync_result (void *cls, int64_t result,
1046 const char *err_msg, uint16_t err_msg_size)
1048 struct StateSyncClosure *ssc = cls;
1049 if (GNUNET_OK != result || ssc->last)
1050 ssc->result_cb (ssc->cls, result, err_msg, err_msg_size);
1056 * Store synchronized state.
1059 * Handle for the PSYCstore.
1060 * @param channel_key
1061 * The channel we are interested in.
1062 * @param max_state_message_id
1063 * ID of the last stateful message before @a state_hash_message_id.
1064 * @param state_hash_message_id
1065 * ID of the message that contains the state_hash PSYC header variable.
1066 * @param modifier_count
1067 * Number of elements in the @a modifiers array.
1069 * Full state to store.
1071 * Callback to call with the result of the operation.
1073 * Closure for the callback.
1075 * @return Handle that can be used to cancel the operation.
/*
 * Store a synchronized full state, one request per modifier.
 *
 * For every modifier a STATE_SYNC request is built whose payload is the
 * NUL-terminated name followed by the value, and it is sent under its own
 * operation with state_sync_result() as result callback.
 *
 * @param h                      Handle for the PSYCstore.
 * @param channel_key            The channel we are interested in.
 * @param max_state_message_id   ID of the last stateful message before
 *                               @a state_hash_message_id.
 * @param state_hash_message_id  ID of the message containing the state hash.
 * @param modifier_count         Number of elements in @a modifiers.
 * @param modifiers              Full state to store.
 * @param result_cb              Callback for the result of the operation.
 */
1077 struct GNUNET_PSYCSTORE_OperationHandle *
1078 GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1079 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1080 uint64_t max_state_message_id,
1081 uint64_t state_hash_message_id,
1082 size_t modifier_count,
1083 const struct GNUNET_PSYC_Modifier *modifiers,
1084 GNUNET_PSYCSTORE_ResultCallback result_cb,
1087 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1090 for (i = 0; i < modifier_count; i++) {
1091 struct StateSyncRequest *req;
1092 uint16_t name_size = strlen (modifiers[i].name) + 1;
/* GNUNET_MQ_msg_extra() already adds sizeof (*req) to the extra size and
 * fills in header.type and header.size, so only the payload length is
 * passed and the header is not patched by hand. */
1094 struct GNUNET_MQ_Envelope *
1095 env = GNUNET_MQ_msg_extra (req,
1096 name_size + modifiers[i].value_size,
1097 GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
1102 req->channel_key = *channel_key;
1103 req->max_state_message_id = GNUNET_htonll (max_state_message_id);
1104 req->state_hash_message_id = GNUNET_htonll (state_hash_message_id);
1105 req->name_size = htons (name_size);
/* NOTE(review): confirm that a sync with a single modifier ends up with the
 * STATE_OP_LAST flag set; otherwise result_cb is never called on success. */
1109 : (modifier_count - 1 == i)
/* Payload layout: name (with terminator) immediately followed by value. */
1113 GNUNET_memcpy (&req[1], modifiers[i].name, name_size);
1114 GNUNET_memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1116 struct StateSyncClosure *ssc = GNUNET_malloc (sizeof (*ssc));
1117 ssc->last = (req->flags & STATE_OP_LAST);
1118 ssc->result_cb = result_cb;
1121 op_send (h, op_create (h, h->op, state_sync_result, ssc),
1124 // FIXME: only one operation is returned,
1125 // add pointers to other operations and make all cancellable.
1131 * Reset the state of a channel.
1133 * Delete all state variables stored for the given channel.
1136 * Handle for the PSYCstore.
1137 * @param channel_key
1138 * The channel we are interested in.
1140 * Callback to call with the result of the operation.
1142 * Closure for the callback.
1144 * @return Handle that can be used to cancel the operation.
/* Builds a STATE_RESET request for the channel and sends it under a new
 * operation whose result goes to result_cb. */
1146 struct GNUNET_PSYCSTORE_OperationHandle *
1147 GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
1148 const struct GNUNET_CRYPTO_EddsaPublicKey
1150 GNUNET_PSYCSTORE_ResultCallback result_cb,
1153 struct OperationRequest *req;
1154 struct GNUNET_MQ_Envelope *
1155 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1156 req->channel_key = *channel_key;
1159 op_send (h, op_create (h, h->op, result_cb, cls),
1165 * Update signed values of state variables in the state store.
1168 * Handle for the PSYCstore.
1169 * @param channel_key
1170 * The channel we are interested in.
1172 * Message ID that contained the state @a hash.
1174 * Hash of the serialized full state.
1176 * Callback to call with the result of the operation.
1178 * Closure for the callback.
/* Builds a STATE_HASH_UPDATE request for the channel and sends it under a
 * new operation whose result goes to result_cb. */
1180 struct GNUNET_PSYCSTORE_OperationHandle *
1181 GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
1182 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1183 uint64_t message_id,
1184 const struct GNUNET_HashCode *hash,
1185 GNUNET_PSYCSTORE_ResultCallback result_cb,
1188 struct StateHashUpdateRequest *req;
1189 struct GNUNET_MQ_Envelope *
1190 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_HASH_UPDATE);
1191 req->channel_key = *channel_key;
1195 op_send (h, op_create (h, h->op, result_cb, cls),
1201 * Retrieve the best matching state variable.
1204 * Handle for the PSYCstore.
1205 * @param channel_key
1206 * The channel we are interested in.
1208 * Name of variable to match, the returned variable might be less specific.
1210 * Callback to return the matching state variable.
1212 * Callback to call with the result of the operation.
1214 * Closure for the callbacks.
1216 * @return Handle that can be used to cancel the operation.
/* Builds a STATE_GET request with the variable name appended (including its
 * terminator) and sends it under a new operation; the variable is delivered
 * to state_cb, the final result to result_cb. */
1218 struct GNUNET_PSYCSTORE_OperationHandle *
1219 GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
1220 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1222 GNUNET_PSYCSTORE_StateCallback state_cb,
1223 GNUNET_PSYCSTORE_ResultCallback result_cb,
/* strlen() is called unconditionally, so name must not be NULL. */
1226 size_t name_size = strlen (name) + 1;
1227 struct OperationRequest *req;
1228 struct GNUNET_MQ_Envelope *
1229 env = GNUNET_MQ_msg_extra (req, name_size,
1230 GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET);
1231 req->channel_key = *channel_key;
1232 GNUNET_memcpy (&req[1], name, name_size);
1234 struct GNUNET_PSYCSTORE_OperationHandle *
1235 op = op_create (h, h->op, result_cb, cls);
1236 op->state_cb = state_cb;
1238 return op_send (h, op, env, &req->op_id);
1243 * Retrieve all state variables for a channel with the given prefix.
1246 * Handle for the PSYCstore.
1247 * @param channel_key
1248 * The channel we are interested in.
1249 * @param name_prefix
1250 * Prefix of state variable names to match.
1252 * Callback to return matching state variables.
1254 * Callback to call with the result of the operation.
1256 * Closure for the callbacks.
1258 * @return Handle that can be used to cancel the operation.
/* Builds a STATE_GET_PREFIX request with the name prefix appended (including
 * its terminator) and sends it under a new operation; matching variables are
 * delivered to state_cb, the final result to result_cb. */
1260 struct GNUNET_PSYCSTORE_OperationHandle *
1261 GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
1262 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1263 const char *name_prefix,
1264 GNUNET_PSYCSTORE_StateCallback state_cb,
1265 GNUNET_PSYCSTORE_ResultCallback result_cb,
/* strlen() is called unconditionally, so name_prefix must not be NULL. */
1268 size_t name_size = strlen (name_prefix) + 1;
1269 struct OperationRequest *req;
1270 struct GNUNET_MQ_Envelope *
1271 env = GNUNET_MQ_msg_extra (req, name_size,
1272 GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX);
1273 req->channel_key = *channel_key;
1274 GNUNET_memcpy (&req[1], name_prefix, name_size);
1276 struct GNUNET_PSYCSTORE_OperationHandle *
1277 op = op_create (h, h->op, result_cb, cls);
1278 op->state_cb = state_cb;
1280 return op_send (h, op, env, &req->op_id);
1283 /* end of psycstore_api.c */