2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * @file psycstore/psycstore_api.c
23 * @brief API to interact with the PSYCstore service
24 * @author Gabor X Toth
25 * @author Christian Grothoff
31 #include "gnunet_util_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_psycstore_service.h"
35 #include "gnunet_multicast_service.h"
36 #include "psycstore.h"
38 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__)
41 * Handle for an operation with the PSYCstore service.
43 struct GNUNET_PSYCSTORE_OperationHandle
47 * Main PSYCstore handle.
49 struct GNUNET_PSYCSTORE_Handle *h;
55 GNUNET_PSYCSTORE_FragmentCallback fragment_cb;
56 GNUNET_PSYCSTORE_CountersCallback counters_cb;
57 GNUNET_PSYCSTORE_StateCallback state_cb;
61 * Closure for callbacks.
68 struct GNUNET_MQ_Envelope *env;
78 * Handle for the service.
80 struct GNUNET_PSYCSTORE_Handle
83 * Configuration to use.
85 const struct GNUNET_CONFIGURATION_Handle *cfg;
90 struct GNUNET_MQ_Handle *mq;
95 struct GNUNET_OP_Handle *op;
98 * Task doing exponential back-off trying to reconnect.
100 struct GNUNET_SCHEDULER_Task *reconnect_task;
103 * Delay for next connect retry.
105 struct GNUNET_TIME_Relative reconnect_delay;
108 GNUNET_PSYCSTORE_FragmentCallback *fragment_cb;
110 GNUNET_PSYCSTORE_CountersCallback *counters_cb;
112 GNUNET_PSYCSTORE_StateCallback *state_cb;
114 * Closure for callbacks.
121 check_result_code (void *cls, const struct OperationResult *opres)
123 uint16_t size = ntohs (opres->header.size);
124 const char *str = (const char *) &opres[1];
125 if ( (sizeof (*opres) < size) &&
126 ('\0' != str[size - sizeof (*opres) - 1]) )
129 return GNUNET_SYSERR;
137 handle_result_code (void *cls, const struct OperationResult *opres)
139 struct GNUNET_PSYCSTORE_Handle *h = cls;
140 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
141 uint16_t size = ntohs (opres->header.size);
144 str = (sizeof (*opres) < size) ? (const char *) &opres[1] : "";
146 if (GNUNET_YES == GNUNET_OP_result (h->op, GNUNET_ntohll (opres->op_id),
147 GNUNET_ntohll (opres->result_code) + INT64_MIN,
148 str, size - sizeof (*opres), (void **) &op))
150 LOG (GNUNET_ERROR_TYPE_DEBUG,
151 "handle_result_code: Received result message with operation ID: %" PRIu64 "\n",
152 GNUNET_ntohll (opres->op_id));
157 LOG (GNUNET_ERROR_TYPE_DEBUG,
158 "handle_result_code: No callback registered for operation with ID %" PRIu64 ".\n",
159 GNUNET_ntohll (opres->op_id));
161 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
166 handle_result_counters (void *cls, const struct CountersResult *cres)
168 struct GNUNET_PSYCSTORE_Handle *h = cls;
169 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
171 if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (cres->op_id),
172 NULL, NULL, (void **) &op))
174 GNUNET_assert (NULL != op);
175 if (NULL != op->counters_cb)
177 op->counters_cb (op->cls,
178 ntohl (cres->result_code),
179 GNUNET_ntohll (cres->max_fragment_id),
180 GNUNET_ntohll (cres->max_message_id),
181 GNUNET_ntohll (cres->max_group_generation),
182 GNUNET_ntohll (cres->max_state_message_id));
184 GNUNET_OP_remove (h->op, GNUNET_ntohll (cres->op_id));
189 LOG (GNUNET_ERROR_TYPE_DEBUG,
190 "handle_result_counters: No callback registered for operation with ID %" PRIu64 ".\n",
191 GNUNET_ntohll (cres->op_id));
193 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
198 check_result_fragment (void *cls, const struct FragmentResult *fres)
200 uint16_t size = ntohs (fres->header.size);
201 struct GNUNET_MULTICAST_MessageHeader *mmsg =
202 (struct GNUNET_MULTICAST_MessageHeader *) &fres[1];
203 if (sizeof (*fres) + sizeof (*mmsg) < size
204 && sizeof (*fres) + ntohs (mmsg->header.size) != size)
206 LOG (GNUNET_ERROR_TYPE_ERROR,
207 "check_result_fragment: Received message with invalid length %lu bytes.\n",
208 size, sizeof (*fres));
210 return GNUNET_SYSERR;
217 handle_result_fragment (void *cls, const struct FragmentResult *fres)
219 struct GNUNET_PSYCSTORE_Handle *h = cls;
220 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
222 if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (fres->op_id),
223 NULL, NULL, (void **) &op))
225 GNUNET_assert (NULL != op);
226 if (NULL != op->fragment_cb)
227 op->fragment_cb (op->cls,
228 (struct GNUNET_MULTICAST_MessageHeader *) &fres[1],
229 ntohl (fres->psycstore_flags));
230 //GNUNET_OP_remove (h->op, GNUNET_ntohll (fres->op_id));
235 LOG (GNUNET_ERROR_TYPE_DEBUG,
236 "handle_result_fragment: No callback registered for operation with ID %" PRIu64 ".\n",
237 GNUNET_ntohll (fres->op_id));
239 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
244 check_result_state (void *cls, const struct StateResult *sres)
246 const char *name = (const char *) &sres[1];
247 uint16_t size = ntohs (sres->header.size);
248 uint16_t name_size = ntohs (sres->name_size);
251 || size - sizeof (*sres) < name_size
252 || '\0' != name[name_size - 1])
254 LOG (GNUNET_ERROR_TYPE_ERROR,
255 "check_result_state: Received state result message with invalid name.\n");
257 return GNUNET_SYSERR;
264 handle_result_state (void *cls, const struct StateResult *sres)
266 struct GNUNET_PSYCSTORE_Handle *h = cls;
267 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
269 const char *name = (const char *) &sres[1];
270 uint16_t name_size = ntohs (sres->name_size);
272 if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (sres->op_id),
273 NULL, NULL, (void **) &op))
275 GNUNET_assert (NULL != op);
276 if (NULL != op->state_cb)
277 op->state_cb (op->cls, name, (char *) &sres[1] + name_size,
278 ntohs (sres->header.size) - sizeof (*sres) - name_size);
279 //GNUNET_OP_remove (h->op, GNUNET_ntohll (sres->op_id));
284 LOG (GNUNET_ERROR_TYPE_DEBUG,
285 "handle_result_state: No callback registered for operation with ID %" PRIu64 ".\n",
286 GNUNET_ntohll (sres->op_id));
288 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
293 reconnect (void *cls);
297 * Client disconnected from service.
299 * Reconnect after backoff period.=
302 disconnected (void *cls, enum GNUNET_MQ_Error error)
304 struct GNUNET_PSYCSTORE_Handle *h = cls;
306 LOG (GNUNET_ERROR_TYPE_DEBUG,
307 "Origin client disconnected (%d), re-connecting\n",
311 GNUNET_MQ_destroy (h->mq);
312 GNUNET_OP_destroy (h->op);
317 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
319 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
324 do_connect (struct GNUNET_PSYCSTORE_Handle *h)
326 LOG (GNUNET_ERROR_TYPE_DEBUG,
327 "Connecting to PSYCstore service.\n");
329 struct GNUNET_MQ_MessageHandler handlers[] = {
330 GNUNET_MQ_hd_var_size (result_code,
331 GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE,
332 struct OperationResult,
334 GNUNET_MQ_hd_fixed_size (result_counters,
335 GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS,
336 struct CountersResult,
338 GNUNET_MQ_hd_var_size (result_fragment,
339 GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT,
340 struct FragmentResult,
342 GNUNET_MQ_hd_var_size (result_state,
343 GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE,
346 GNUNET_MQ_handler_end ()
349 h->op = GNUNET_OP_create ();
350 GNUNET_assert (NULL == h->mq);
351 h->mq = GNUNET_CLIENT_connecT (h->cfg, "psycstore",
352 handlers, disconnected, h);
353 GNUNET_assert (NULL != h->mq);
358 * Try again to connect to the PSYCstore service.
360 * @param cls Handle to the PSYCstore service.
363 reconnect (void *cls)
370 * Connect to the PSYCstore service.
372 * @param cfg The configuration to use
373 * @return Handle to use
375 struct GNUNET_PSYCSTORE_Handle *
376 GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
378 struct GNUNET_PSYCSTORE_Handle *h
379 = GNUNET_new (struct GNUNET_PSYCSTORE_Handle);
381 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
388 * Disconnect from PSYCstore service
390 * @param h Handle to destroy
393 GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h)
395 GNUNET_assert (NULL != h);
396 if (h->reconnect_task != NULL)
398 GNUNET_SCHEDULER_cancel (h->reconnect_task);
399 h->reconnect_task = NULL;
403 // FIXME: free data structures for pending operations
404 GNUNET_MQ_destroy (h->mq);
412 * Message sent notification.
414 * Remove invalidated envelope pointer.
417 message_sent (void *cls)
419 struct GNUNET_PSYCSTORE_OperationHandle *op = cls;
425 * Create a new operation.
427 static struct GNUNET_PSYCSTORE_OperationHandle *
428 op_create (struct GNUNET_PSYCSTORE_Handle *h,
429 struct GNUNET_OP_Handle *hop,
430 GNUNET_PSYCSTORE_ResultCallback result_cb,
433 struct GNUNET_PSYCSTORE_OperationHandle *
434 op = GNUNET_malloc (sizeof (*op));
436 op->op_id = GNUNET_OP_add (hop,
437 (GNUNET_ResultCallback) result_cb,
444 * Send a message associated with an operation.
451 * Message envelope to send.
453 * Operation ID to write in network byte order. NULL if not needed.
455 * @return Operation handle.
458 static struct GNUNET_PSYCSTORE_OperationHandle *
459 op_send (struct GNUNET_PSYCSTORE_Handle *h,
460 struct GNUNET_PSYCSTORE_OperationHandle *op,
461 struct GNUNET_MQ_Envelope *env,
466 *op_id = GNUNET_htonll (op->op_id);
468 GNUNET_MQ_notify_sent (env, message_sent, op);
469 GNUNET_MQ_send (h->mq, env);
475 * Cancel a PSYCstore operation. Note that the operation MAY still
476 * be executed; this merely cancels the continuation; if the request
477 * was already transmitted, the service may still choose to complete
480 * @param op Operation to cancel.
482 * @return #GNUNET_YES if message was not sent yet and got discarded,
483 * #GNUNET_NO if it was already sent, and only the callbacks got cancelled.
486 GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op)
488 struct GNUNET_PSYCSTORE_Handle *h = op->h;
493 GNUNET_MQ_send_cancel (op->env);
497 GNUNET_OP_remove (h->op, op->op_id);
505 * Store join/leave events for a PSYC channel in order to be able to answer
506 * membership test queries later.
509 * Handle for the PSYCstore.
511 * The channel where the event happened.
513 * Public key of joining/leaving slave.
515 * #GNUNET_YES on join, #GNUNET_NO on part.
516 * @param announced_at
517 * ID of the message that announced the membership change.
518 * @param effective_since
519 * Message ID this membership change is in effect since.
520 * For joins it is <= announced_at, for parts it is always 0.
521 * @param group_generation
522 * In case of a part, the last group generation the slave has access to.
523 * It has relevance when a larger message have fragments with different
526 * Callback to call with the result of the storage operation.
528 * Closure for the callback.
530 * @return Operation handle that can be used to cancel the operation.
532 struct GNUNET_PSYCSTORE_OperationHandle *
533 GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
534 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
535 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
537 uint64_t announced_at,
538 uint64_t effective_since,
539 uint64_t group_generation,
540 GNUNET_PSYCSTORE_ResultCallback result_cb,
543 GNUNET_assert (NULL != h);
544 GNUNET_assert (NULL != channel_key);
545 GNUNET_assert (NULL != slave_key);
546 GNUNET_assert (GNUNET_YES == did_join || GNUNET_NO == did_join);
547 GNUNET_assert (did_join
548 ? effective_since <= announced_at
549 : effective_since == 0);
551 struct MembershipStoreRequest *req;
552 struct GNUNET_MQ_Envelope *
553 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE);
554 req->channel_key = *channel_key;
555 req->slave_key = *slave_key;
556 req->did_join = did_join;
557 req->announced_at = GNUNET_htonll (announced_at);
558 req->effective_since = GNUNET_htonll (effective_since);
559 req->group_generation = GNUNET_htonll (group_generation);
562 op_send (h, op_create (h, h->op, result_cb, cls),
568 * Test if a member was admitted to the channel at the given message ID.
570 * This is useful when relaying and replaying messages to check if a particular
571 * slave has access to the message fragment with a given group generation. It
572 * is also used when handling join requests to determine whether the slave is
573 * currently admitted to the channel.
576 * Handle for the PSYCstore.
578 * The channel we are interested in.
580 * Public key of slave whose membership to check.
582 * Message ID for which to do the membership test.
583 * @param group_generation
584 * Group generation of the fragment of the message to test.
585 * It has relevance if the message consists of multiple fragments with
586 * different group generations.
588 * Callback to call with the test result.
590 * Closure for the callback.
592 * @return Operation handle that can be used to cancel the operation.
594 struct GNUNET_PSYCSTORE_OperationHandle *
595 GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
596 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
597 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
599 uint64_t group_generation,
600 GNUNET_PSYCSTORE_ResultCallback result_cb,
603 struct MembershipTestRequest *req;
604 struct GNUNET_MQ_Envelope *
605 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST);
606 req->channel_key = *channel_key;
607 req->slave_key = *slave_key;
608 req->message_id = GNUNET_htonll (message_id);
609 req->group_generation = GNUNET_htonll (group_generation);
612 op_send (h, op_create (h, h->op, result_cb, cls),
618 * Store a message fragment sent to a channel.
620 * @param h Handle for the PSYCstore.
621 * @param channel_key The channel the message belongs to.
622 * @param message Message to store.
623 * @param psycstore_flags Flags indicating whether the PSYC message contains
625 * @param result_cb Callback to call with the result of the operation.
626 * @param cls Closure for the callback.
628 * @return Handle that can be used to cancel the operation.
630 struct GNUNET_PSYCSTORE_OperationHandle *
631 GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
632 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
633 const struct GNUNET_MULTICAST_MessageHeader *msg,
634 enum GNUNET_PSYCSTORE_MessageFlags psycstore_flags,
635 GNUNET_PSYCSTORE_ResultCallback result_cb,
638 uint16_t size = ntohs (msg->header.size);
639 struct FragmentStoreRequest *req;
640 struct GNUNET_MQ_Envelope *
641 env = GNUNET_MQ_msg_extra (req, size,
642 GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE);
643 req->channel_key = *channel_key;
644 req->psycstore_flags = htonl (psycstore_flags);
645 GNUNET_memcpy (&req[1], msg, size);
648 op_send (h, op_create (h, h->op, result_cb, cls),
654 * Retrieve message fragments by fragment ID range.
657 * Handle for the PSYCstore.
659 * The channel we are interested in.
661 * The slave requesting the fragment. If not NULL, a membership test is
662 * performed first and the fragment is only returned if the slave has
664 * @param first_fragment_id
665 * First fragment ID to retrieve.
666 * Use 0 to get the latest message fragment.
667 * @param last_fragment_id
668 * Last consecutive fragment ID to retrieve.
669 * Use 0 to get the latest message fragment.
670 * @param fragment_limit
671 * Maximum number of fragments to retrieve.
673 * Callback to call with the retrieved fragments.
675 * Callback to call with the result of the operation.
677 * Closure for the callbacks.
679 * @return Handle that can be used to cancel the operation.
681 struct GNUNET_PSYCSTORE_OperationHandle *
682 GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
683 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
684 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
685 uint64_t first_fragment_id,
686 uint64_t last_fragment_id,
687 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
688 GNUNET_PSYCSTORE_ResultCallback result_cb,
691 struct FragmentGetRequest *req;
692 struct GNUNET_MQ_Envelope *
693 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
694 req->channel_key = *channel_key;
695 req->first_fragment_id = GNUNET_htonll (first_fragment_id);
696 req->last_fragment_id = GNUNET_htonll (last_fragment_id);
697 if (NULL != slave_key)
699 req->slave_key = *slave_key;
700 req->do_membership_test = GNUNET_YES;
703 struct GNUNET_PSYCSTORE_OperationHandle *
704 op = op_create (h, h->op, result_cb, cls);
705 op->fragment_cb = fragment_cb;
707 return op_send (h, op, env, &req->op_id);
712 * Retrieve latest message fragments.
715 * Handle for the PSYCstore.
717 * The channel we are interested in.
719 * The slave requesting the fragment. If not NULL, a membership test is
720 * performed first and the fragment is only returned if the slave has
722 * @param first_fragment_id
723 * First fragment ID to retrieve.
724 * Use 0 to get the latest message fragment.
725 * @param last_fragment_id
726 * Last consecutive fragment ID to retrieve.
727 * Use 0 to get the latest message fragment.
728 * @param fragment_limit
729 * Maximum number of fragments to retrieve.
731 * Callback to call with the retrieved fragments.
733 * Callback to call with the result of the operation.
735 * Closure for the callbacks.
737 * @return Handle that can be used to cancel the operation.
739 struct GNUNET_PSYCSTORE_OperationHandle *
740 GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
741 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
742 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
743 uint64_t fragment_limit,
744 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
745 GNUNET_PSYCSTORE_ResultCallback result_cb,
748 struct FragmentGetRequest *req;
749 struct GNUNET_MQ_Envelope *
750 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
751 req->channel_key = *channel_key;
752 req->fragment_limit = GNUNET_ntohll (fragment_limit);
753 if (NULL != slave_key)
755 req->slave_key = *slave_key;
756 req->do_membership_test = GNUNET_YES;
759 struct GNUNET_PSYCSTORE_OperationHandle *
760 op = op_create (h, h->op, result_cb, cls);
761 op->fragment_cb = fragment_cb;
763 return op_send (h, op, env, &req->op_id);
768 * Retrieve all fragments of messages in a message ID range.
771 * Handle for the PSYCstore.
773 * The channel we are interested in.
775 * The slave requesting the message.
776 * If not NULL, a membership test is performed first
777 * and the message is only returned if the slave has access to it.
778 * @param first_message_id
779 * First message ID to retrieve.
780 * @param last_message_id
781 * Last consecutive message ID to retrieve.
782 * @param fragment_limit
783 * Maximum number of fragments to retrieve.
784 * @param method_prefix
785 * Retrieve only messages with a matching method prefix.
786 * @todo Implement method_prefix query.
788 * Callback to call with the retrieved fragments.
790 * Callback to call with the result of the operation.
792 * Closure for the callbacks.
794 * @return Handle that can be used to cancel the operation.
796 struct GNUNET_PSYCSTORE_OperationHandle *
797 GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
798 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
799 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
800 uint64_t first_message_id,
801 uint64_t last_message_id,
802 uint64_t fragment_limit,
803 const char *method_prefix,
804 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
805 GNUNET_PSYCSTORE_ResultCallback result_cb,
808 struct MessageGetRequest *req;
809 if (NULL == method_prefix)
811 uint16_t method_size = strnlen (method_prefix,
812 GNUNET_SERVER_MAX_MESSAGE_SIZE
813 - sizeof (*req)) + 1;
815 struct GNUNET_MQ_Envelope *
816 env = GNUNET_MQ_msg_extra (req, method_size,
817 GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
818 req->channel_key = *channel_key;
819 req->first_message_id = GNUNET_htonll (first_message_id);
820 req->last_message_id = GNUNET_htonll (last_message_id);
821 req->fragment_limit = GNUNET_htonll (fragment_limit);
822 if (NULL != slave_key)
824 req->slave_key = *slave_key;
825 req->do_membership_test = GNUNET_YES;
827 GNUNET_memcpy (&req[1], method_prefix, method_size);
828 ((char *) &req[1])[method_size - 1] = '\0';
830 struct GNUNET_PSYCSTORE_OperationHandle *
831 op = op_create (h, h->op, result_cb, cls);
832 op->fragment_cb = fragment_cb;
834 return op_send (h, op, env, &req->op_id);
839 * Retrieve all fragments of the latest messages.
842 * Handle for the PSYCstore.
844 * The channel we are interested in.
846 * The slave requesting the message.
847 * If not NULL, a membership test is performed first
848 * and the message is only returned if the slave has access to it.
849 * @param message_limit
850 * Maximum number of messages to retrieve.
851 * @param method_prefix
852 * Retrieve only messages with a matching method prefix.
853 * @todo Implement method_prefix query.
855 * Callback to call with the retrieved fragments.
857 * Callback to call with the result of the operation.
859 * Closure for the callbacks.
861 * @return Handle that can be used to cancel the operation.
863 struct GNUNET_PSYCSTORE_OperationHandle *
864 GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
865 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
866 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
867 uint64_t message_limit,
868 const char *method_prefix,
869 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
870 GNUNET_PSYCSTORE_ResultCallback result_cb,
873 struct MessageGetRequest *req;
875 if (NULL == method_prefix)
877 uint16_t method_size = strnlen (method_prefix,
878 GNUNET_SERVER_MAX_MESSAGE_SIZE
879 - sizeof (*req)) + 1;
880 GNUNET_assert ('\0' == method_prefix[method_size - 1]);
882 struct GNUNET_MQ_Envelope *
883 env = GNUNET_MQ_msg_extra (req, method_size,
884 GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
885 req->channel_key = *channel_key;
886 req->message_limit = GNUNET_ntohll (message_limit);
887 if (NULL != slave_key)
889 req->slave_key = *slave_key;
890 req->do_membership_test = GNUNET_YES;
892 GNUNET_memcpy (&req[1], method_prefix, method_size);
894 struct GNUNET_PSYCSTORE_OperationHandle *
895 op = op_create (h, h->op, result_cb, cls);
896 op->fragment_cb = fragment_cb;
898 return op_send (h, op, env, &req->op_id);
903 * Retrieve a fragment of message specified by its message ID and fragment
907 * Handle for the PSYCstore.
909 * The channel we are interested in.
911 * The slave requesting the message fragment. If not NULL, a membership
912 * test is performed first and the message fragment is only returned
913 * if the slave has access to it.
915 * Message ID to retrieve. Use 0 to get the latest message.
916 * @param fragment_offset
917 * Offset of the fragment to retrieve.
919 * Callback to call with the retrieved fragments.
921 * Callback to call with the result of the operation.
923 * Closure for the callbacks.
925 * @return Handle that can be used to cancel the operation.
927 struct GNUNET_PSYCSTORE_OperationHandle *
928 GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
929 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
930 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
932 uint64_t fragment_offset,
933 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
934 GNUNET_PSYCSTORE_ResultCallback result_cb,
937 struct MessageGetFragmentRequest *req;
938 struct GNUNET_MQ_Envelope *
939 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT);
941 req->channel_key = *channel_key;
942 req->message_id = GNUNET_htonll (message_id);
943 req->fragment_offset = GNUNET_htonll (fragment_offset);
944 if (NULL != slave_key)
946 req->slave_key = *slave_key;
947 req->do_membership_test = GNUNET_YES;
950 struct GNUNET_PSYCSTORE_OperationHandle *
951 op = op_create (h, h->op, result_cb, cls);
952 op->fragment_cb = fragment_cb;
954 return op_send (h, op, env, &req->op_id);
959 * Retrieve latest values of counters for a channel master.
961 * The current value of counters are needed when a channel master is restarted,
962 * so that it can continue incrementing the counters from their last value.
965 * Handle for the PSYCstore.
967 * Public key that identifies the channel.
969 * Callback to call with the result.
971 * Closure for the @a ccb callback.
973 * @return Handle that can be used to cancel the operation.
975 struct GNUNET_PSYCSTORE_OperationHandle *
976 GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
977 struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
978 GNUNET_PSYCSTORE_CountersCallback counters_cb,
981 struct OperationRequest *req;
982 struct GNUNET_MQ_Envelope *
983 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET);
984 req->channel_key = *channel_key;
986 struct GNUNET_PSYCSTORE_OperationHandle *
987 op = op_create (h, h->op, NULL, NULL);
988 op->counters_cb = counters_cb;
990 return op_send (h, op, env, &req->op_id);
995 * Apply modifiers of a message to the current channel state.
997 * An error is returned if there are missing messages containing state
998 * operations before the current one.
1001 * Handle for the PSYCstore.
1002 * @param channel_key
1003 * The channel we are interested in.
1005 * ID of the message that contains the @a modifiers.
1006 * @param state_delta
1007 * Value of the _state_delta PSYC header variable of the message.
1009 * Callback to call with the result of the operation.
1011 * Closure for @a result_cb.
1013 * @return Handle that can be used to cancel the operation.
1015 struct GNUNET_PSYCSTORE_OperationHandle *
1016 GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1017 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1018 uint64_t message_id,
1019 uint64_t state_delta,
1020 GNUNET_PSYCSTORE_ResultCallback result_cb,
1023 struct StateModifyRequest *req;
1024 struct GNUNET_MQ_Envelope *
1025 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
1026 req->channel_key = *channel_key;
1027 req->message_id = GNUNET_htonll (message_id);
1028 req->state_delta = GNUNET_htonll (state_delta);
1030 return op_send (h, op_create (h, h->op, result_cb, cls),
1035 struct StateSyncClosure
1037 GNUNET_PSYCSTORE_ResultCallback result_cb;
1044 state_sync_result (void *cls, int64_t result,
1045 const char *err_msg, uint16_t err_msg_size)
1047 struct StateSyncClosure *ssc = cls;
1048 if (GNUNET_OK != result || ssc->last)
1049 ssc->result_cb (ssc->cls, result, err_msg, err_msg_size);
1055 * Store synchronized state.
1058 * Handle for the PSYCstore.
1059 * @param channel_key
1060 * The channel we are interested in.
1061 * @param max_state_message_id
1062 * ID of the last stateful message before @a state_hash_message_id.
1063 * @param state_hash_message_id
1064 * ID of the message that contains the state_hash PSYC header variable.
1065 * @param modifier_count
1066 * Number of elements in the @a modifiers array.
1068 * Full state to store.
1070 * Callback to call with the result of the operation.
1072 * Closure for the callback.
1074 * @return Handle that can be used to cancel the operation.
1076 struct GNUNET_PSYCSTORE_OperationHandle *
1077 GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1078 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1079 uint64_t max_state_message_id,
1080 uint64_t state_hash_message_id,
1081 size_t modifier_count,
1082 const struct GNUNET_PSYC_Modifier *modifiers,
1083 GNUNET_PSYCSTORE_ResultCallback result_cb,
1086 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1089 for (i = 0; i < modifier_count; i++) {
1090 struct StateSyncRequest *req;
1091 uint16_t name_size = strlen (modifiers[i].name) + 1;
1093 struct GNUNET_MQ_Envelope *
1094 env = GNUNET_MQ_msg_extra (req,
1095 sizeof (*req) + name_size + modifiers[i].value_size,
1096 GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
1098 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
1099 req->header.size = htons (sizeof (*req) + name_size
1100 + modifiers[i].value_size);
1101 req->channel_key = *channel_key;
1102 req->max_state_message_id = GNUNET_htonll (max_state_message_id);
1103 req->state_hash_message_id = GNUNET_htonll (state_hash_message_id);
1104 req->name_size = htons (name_size);
1108 : (modifier_count - 1 == i)
1112 GNUNET_memcpy (&req[1], modifiers[i].name, name_size);
1113 GNUNET_memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1115 struct StateSyncClosure *ssc = GNUNET_malloc (sizeof (*ssc));
1116 ssc->last = (req->flags & STATE_OP_LAST);
1117 ssc->result_cb = result_cb;
1120 op_send (h, op_create (h, h->op, state_sync_result, ssc),
1123 // FIXME: only one operation is returned,
1124 // add pointers to other operations and make all cancellable.
1130 * Reset the state of a channel.
1132 * Delete all state variables stored for the given channel.
1135 * Handle for the PSYCstore.
1136 * @param channel_key
1137 * The channel we are interested in.
1139 * Callback to call with the result of the operation.
1141 * Closure for the callback.
1143 * @return Handle that can be used to cancel the operation.
1145 struct GNUNET_PSYCSTORE_OperationHandle *
1146 GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
1147 const struct GNUNET_CRYPTO_EddsaPublicKey
1149 GNUNET_PSYCSTORE_ResultCallback result_cb,
1152 struct OperationRequest *req;
1153 struct GNUNET_MQ_Envelope *
1154 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1155 req->channel_key = *channel_key;
1158 op_send (h, op_create (h, h->op, result_cb, cls),
1164 * Update signed values of state variables in the state store.
1167 * Handle for the PSYCstore.
1168 * @param channel_key
1169 * The channel we are interested in.
1171 * Message ID that contained the state @a hash.
1173 * Hash of the serialized full state.
1175 * Callback to call with the result of the operation.
1177 * Closure for the callback.
1179 struct GNUNET_PSYCSTORE_OperationHandle *
1180 GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
1181 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1182 uint64_t message_id,
1183 const struct GNUNET_HashCode *hash,
1184 GNUNET_PSYCSTORE_ResultCallback result_cb,
1187 struct StateHashUpdateRequest *req;
1188 struct GNUNET_MQ_Envelope *
1189 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_HASH_UPDATE);
1190 req->channel_key = *channel_key;
1194 op_send (h, op_create (h, h->op, result_cb, cls),
1200 * Retrieve the best matching state variable.
1203 * Handle for the PSYCstore.
1204 * @param channel_key
1205 * The channel we are interested in.
1207 * Name of variable to match, the returned variable might be less specific.
1209 * Callback to return the matching state variable.
1211 * Callback to call with the result of the operation.
1213 * Closure for the callbacks.
1215 * @return Handle that can be used to cancel the operation.
1217 struct GNUNET_PSYCSTORE_OperationHandle *
1218 GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
1219 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1221 GNUNET_PSYCSTORE_StateCallback state_cb,
1222 GNUNET_PSYCSTORE_ResultCallback result_cb,
1225 size_t name_size = strlen (name) + 1;
1226 struct OperationRequest *req;
1227 struct GNUNET_MQ_Envelope *
1228 env = GNUNET_MQ_msg_extra (req, name_size,
1229 GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET);
1230 req->channel_key = *channel_key;
1231 GNUNET_memcpy (&req[1], name, name_size);
1233 struct GNUNET_PSYCSTORE_OperationHandle *
1234 op = op_create (h, h->op, result_cb, cls);
1235 op->state_cb = state_cb;
1237 return op_send (h, op, env, &req->op_id);
1242 * Retrieve all state variables for a channel with the given prefix.
1245 * Handle for the PSYCstore.
1246 * @param channel_key
1247 * The channel we are interested in.
1248 * @param name_prefix
1249 * Prefix of state variable names to match.
1251 * Callback to return matching state variables.
1253 * Callback to call with the result of the operation.
1255 * Closure for the callbacks.
1257 * @return Handle that can be used to cancel the operation.
1259 struct GNUNET_PSYCSTORE_OperationHandle *
1260 GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
1261 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1262 const char *name_prefix,
1263 GNUNET_PSYCSTORE_StateCallback state_cb,
1264 GNUNET_PSYCSTORE_ResultCallback result_cb,
1267 size_t name_size = strlen (name_prefix) + 1;
1268 struct OperationRequest *req;
1269 struct GNUNET_MQ_Envelope *
1270 env = GNUNET_MQ_msg_extra (req, name_size,
1271 GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX);
1272 req->channel_key = *channel_key;
1273 GNUNET_memcpy (&req[1], name_prefix, name_size);
1275 struct GNUNET_PSYCSTORE_OperationHandle *
1276 op = op_create (h, h->op, result_cb, cls);
1277 op->state_cb = state_cb;
1279 return op_send (h, op, env, &req->op_id);
1282 /* end of psycstore_api.c */