2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * @file psycstore/psycstore_api.c
23 * @brief API to interact with the PSYCstore service
24 * @author Gabor X Toth
25 * @author Christian Grothoff
31 #include "gnunet_util_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_psycstore_service.h"
35 #include "gnunet_multicast_service.h"
36 #include "psycstore.h"
38 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__)
40 typedef void (*DataCallback) ();
43 * Handle for an operation with the PSYCstore service.
45 struct GNUNET_PSYCSTORE_OperationHandle
49 * Main PSYCstore handle.
51 struct GNUNET_PSYCSTORE_Handle *h;
54 * We keep operations in a DLL.
56 struct GNUNET_PSYCSTORE_OperationHandle *next;
59 * We keep operations in a DLL.
61 struct GNUNET_PSYCSTORE_OperationHandle *prev;
64 * Continuation to invoke with the result of an operation.
66 GNUNET_PSYCSTORE_ResultCallback res_cb;
69 * Continuation to invoke with the result of an operation returning data.
74 * Closure for the callbacks.
84 * Message to send to the PSYCstore service.
85 * Allocated at the end of this struct.
87 const struct GNUNET_MessageHeader *msg;
92 * Handle for the service.
94 struct GNUNET_PSYCSTORE_Handle
97 * Configuration to use.
99 const struct GNUNET_CONFIGURATION_Handle *cfg;
102 * Socket (if available).
104 struct GNUNET_CLIENT_Connection *client;
107 * Head of operations to transmit.
109 struct GNUNET_PSYCSTORE_OperationHandle *transmit_head;
112 * Tail of operations to transmit.
114 struct GNUNET_PSYCSTORE_OperationHandle *transmit_tail;
117 * Head of active operations waiting for response.
119 struct GNUNET_PSYCSTORE_OperationHandle *op_head;
122 * Tail of active operations waiting for response.
124 struct GNUNET_PSYCSTORE_OperationHandle *op_tail;
127 * Currently pending transmission request, or NULL for none.
129 struct GNUNET_CLIENT_TransmitHandle *th;
132 * Task doing exponential back-off trying to reconnect.
134 struct GNUNET_SCHEDULER_Task *reconnect_task;
137 * Time for next connect retry.
139 struct GNUNET_TIME_Relative reconnect_delay;
142 * Last operation ID used.
147 * Are we polling for incoming messages right now?
154 * Get a fresh operation ID to distinguish between PSYCstore requests.
156 * @param h Handle to the PSYCstore service.
157 * @return next operation id to use
160 get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h)
162 return h->last_op_id++;
167 * Find operation by ID.
169 * @return OperationHandle if found, or NULL otherwise.
171 static struct GNUNET_PSYCSTORE_OperationHandle *
172 find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint64_t op_id)
174 struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head;
177 if (op->op_id == op_id)
186 * Try again to connect to the PSYCstore service.
188 * @param cls handle to the PSYCstore service.
191 reconnect (void *cls);
195 * Reschedule a connect attempt to the service.
197 * @param h transport service to reconnect
200 reschedule_connect (struct GNUNET_PSYCSTORE_Handle *h)
202 GNUNET_assert (h->reconnect_task == NULL);
206 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
209 if (NULL != h->client)
211 GNUNET_CLIENT_disconnect (h->client);
214 h->in_receive = GNUNET_NO;
215 LOG (GNUNET_ERROR_TYPE_DEBUG,
216 "Scheduling task to reconnect to PSYCstore service in %s.\n",
217 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
219 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
220 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
225 * Schedule transmission of the next message from our queue.
227 * @param h PSYCstore handle
230 transmit_next (struct GNUNET_PSYCSTORE_Handle *h);
234 * Type of a function to call when we receive a message
238 * @param msg message received, NULL on timeout or fatal error
241 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
243 struct GNUNET_PSYCSTORE_Handle *h = cls;
244 struct GNUNET_PSYCSTORE_OperationHandle *op;
245 const struct OperationResult *opres;
246 const struct CountersResult *cres;
247 const struct FragmentResult *fres;
248 const struct StateResult *sres;
253 reschedule_connect (h);
256 LOG (GNUNET_ERROR_TYPE_DEBUG,
257 "Received message of type %d from PSYCstore service.\n",
259 uint16_t size = ntohs (msg->size);
260 uint16_t type = ntohs (msg->type);
263 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE:
264 if (size < sizeof (struct OperationResult))
266 LOG (GNUNET_ERROR_TYPE_ERROR,
267 "Received message of type %d with length %lu bytes. "
269 type, size, sizeof (struct OperationResult));
271 reschedule_connect (h);
275 opres = (const struct OperationResult *) msg;
276 str = (const char *) &opres[1];
277 if ( (size > sizeof (struct OperationResult)) &&
278 ('\0' != str[size - sizeof (struct OperationResult) - 1]) )
281 reschedule_connect (h);
284 if (size == sizeof (struct OperationResult))
287 op = find_op_by_id (h, GNUNET_ntohll (opres->op_id));
290 LOG (GNUNET_ERROR_TYPE_DEBUG,
291 "No callback registered for operation with ID %" PRIu64 ".\n",
292 type, GNUNET_ntohll (opres->op_id));
296 LOG (GNUNET_ERROR_TYPE_DEBUG,
297 "Received result message (type %d) with operation ID: %" PRIu64 "\n",
300 int64_t result_code = GNUNET_ntohll (opres->result_code) + INT64_MIN;
301 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
302 if (NULL != op->res_cb)
304 const struct StateSyncRequest *ssreq;
305 switch (ntohs (op->msg->type))
307 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC:
308 ssreq = (const struct StateSyncRequest *) op->msg;
309 if (!(ssreq->flags & STATE_OP_LAST
310 || GNUNET_OK != result_code))
315 if (NULL != op->res_cb)
316 op->res_cb (op->cls, result_code, str, size - sizeof (*opres));
321 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS:
322 if (size != sizeof (struct CountersResult))
324 LOG (GNUNET_ERROR_TYPE_ERROR,
325 "Received message of type %d with length %lu bytes. "
327 type, size, sizeof (struct CountersResult));
329 reschedule_connect (h);
333 cres = (const struct CountersResult *) msg;
335 op = find_op_by_id (h, GNUNET_ntohll (cres->op_id));
338 LOG (GNUNET_ERROR_TYPE_DEBUG,
339 "No callback registered for operation with ID %" PRIu64 ".\n",
340 type, GNUNET_ntohll (cres->op_id));
344 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
345 if (NULL != op->data_cb)
346 ((GNUNET_PSYCSTORE_CountersCallback)
347 op->data_cb) (op->cls,
348 ntohl (cres->result_code),
349 GNUNET_ntohll (cres->max_fragment_id),
350 GNUNET_ntohll (cres->max_message_id),
351 GNUNET_ntohll (cres->max_group_generation),
352 GNUNET_ntohll (cres->max_state_message_id));
357 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT:
358 if (size < sizeof (struct FragmentResult))
360 LOG (GNUNET_ERROR_TYPE_ERROR,
361 "Received message of type %d with length %lu bytes. "
363 type, size, sizeof (struct FragmentResult));
365 reschedule_connect (h);
369 fres = (const struct FragmentResult *) msg;
370 struct GNUNET_MULTICAST_MessageHeader *mmsg =
371 (struct GNUNET_MULTICAST_MessageHeader *) &fres[1];
372 if (size != sizeof (struct FragmentResult) + ntohs (mmsg->header.size))
374 LOG (GNUNET_ERROR_TYPE_ERROR,
375 "Received message of type %d with length %lu bytes. "
378 sizeof (struct FragmentResult) + ntohs (mmsg->header.size));
380 reschedule_connect (h);
384 op = find_op_by_id (h, GNUNET_ntohll (fres->op_id));
387 LOG (GNUNET_ERROR_TYPE_DEBUG,
388 "No callback registered for operation with ID %" PRIu64 ".\n",
389 type, GNUNET_ntohll (fres->op_id));
393 if (NULL != op->data_cb)
394 ((GNUNET_PSYCSTORE_FragmentCallback)
395 op->data_cb) (op->cls, mmsg, ntohl (fres->psycstore_flags));
399 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE:
400 if (size < sizeof (struct StateResult))
402 LOG (GNUNET_ERROR_TYPE_ERROR,
403 "Received message of type %d with length %lu bytes. "
405 type, size, sizeof (struct StateResult));
407 reschedule_connect (h);
411 sres = (const struct StateResult *) msg;
412 const char *name = (const char *) &sres[1];
413 uint16_t name_size = ntohs (sres->name_size);
415 if (name_size <= 2 || '\0' != name[name_size - 1])
417 LOG (GNUNET_ERROR_TYPE_ERROR,
418 "Received state result message (type %d) with invalid name.\n",
421 reschedule_connect (h);
425 op = find_op_by_id (h, GNUNET_ntohll (sres->op_id));
428 LOG (GNUNET_ERROR_TYPE_DEBUG,
429 "No callback registered for operation with ID %" PRIu64 ".\n",
430 type, GNUNET_ntohll (sres->op_id));
434 if (NULL != op->data_cb)
435 ((GNUNET_PSYCSTORE_StateCallback)
436 op->data_cb) (op->cls, name, (char *) &sres[1] + name_size,
437 ntohs (sres->header.size) - sizeof (*sres) - name_size);
443 reschedule_connect (h);
447 GNUNET_CLIENT_receive (h->client, &message_handler, h,
448 GNUNET_TIME_UNIT_FOREVER_REL);
453 * Transmit next message to service.
455 * @param cls The 'struct GNUNET_PSYCSTORE_Handle'.
456 * @param size Number of bytes available in buf.
457 * @param buf Where to copy the message.
458 * @return Number of bytes copied to buf.
461 send_next_message (void *cls, size_t size, void *buf)
463 struct GNUNET_PSYCSTORE_Handle *h = cls;
464 struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
470 ret = ntohs (op->msg->size);
473 reschedule_connect (h);
476 LOG (GNUNET_ERROR_TYPE_DEBUG,
477 "Sending message of type %d to PSYCstore service. ID: %" PRIu64 "\n",
478 ntohs (op->msg->type), op->op_id);
479 memcpy (buf, op->msg, ret);
481 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
483 if (NULL == op->res_cb && NULL == op->data_cb)
489 GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, op);
492 if (NULL != h->transmit_head)
495 if (GNUNET_NO == h->in_receive)
497 h->in_receive = GNUNET_YES;
498 GNUNET_CLIENT_receive (h->client, &message_handler, h,
499 GNUNET_TIME_UNIT_FOREVER_REL);
506 * Schedule transmission of the next message from our queue.
508 * @param h PSYCstore handle.
511 transmit_next (struct GNUNET_PSYCSTORE_Handle *h)
513 if (NULL != h->th || NULL == h->client)
516 struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
520 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
521 ntohs (op->msg->size),
522 GNUNET_TIME_UNIT_FOREVER_REL,
530 * Try again to connect to the PSYCstore service.
532 * @param cls Handle to the PSYCstore service.
535 reconnect (void *cls)
537 struct GNUNET_PSYCSTORE_Handle *h = cls;
539 h->reconnect_task = NULL;
540 LOG (GNUNET_ERROR_TYPE_DEBUG,
541 "Connecting to PSYCstore service.\n");
542 GNUNET_assert (NULL == h->client);
543 h->client = GNUNET_CLIENT_connect ("psycstore", h->cfg);
544 GNUNET_assert (NULL != h->client);
550 * Connect to the PSYCstore service.
552 * @param cfg The configuration to use
553 * @return Handle to use
555 struct GNUNET_PSYCSTORE_Handle *
556 GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
558 struct GNUNET_PSYCSTORE_Handle *h
559 = GNUNET_new (struct GNUNET_PSYCSTORE_Handle);
561 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
562 h->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, h);
568 * Disconnect from PSYCstore service
570 * @param h Handle to destroy
573 GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h)
575 GNUNET_assert (NULL != h);
576 if (h->reconnect_task != NULL)
578 GNUNET_SCHEDULER_cancel (h->reconnect_task);
579 h->reconnect_task = NULL;
583 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
586 if (NULL != h->client)
588 GNUNET_CLIENT_disconnect (h->client);
596 * Cancel a PSYCstore operation. Note that the operation MAY still
597 * be executed; this merely cancels the continuation; if the request
598 * was already transmitted, the service may still choose to complete
601 * @param op Operation to cancel.
604 GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op)
606 struct GNUNET_PSYCSTORE_Handle *h = op->h;
608 if (h->transmit_head != NULL && (h->transmit_head != op || NULL == h->client))
610 /* request not active, can simply remove */
611 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
617 /* request active but not yet with service, can still abort */
618 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
620 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
625 /* request active with service, simply ensure continuations are not called */
632 * Store join/leave events for a PSYC channel in order to be able to answer
633 * membership test queries later.
636 * Handle for the PSYCstore.
638 * The channel where the event happened.
640 * Public key of joining/leaving slave.
642 * #GNUNET_YES on join, #GNUNET_NO on part.
643 * @param announced_at
644 * ID of the message that announced the membership change.
645 * @param effective_since
646 * Message ID this membership change is in effect since.
647 * For joins it is <= announced_at, for parts it is always 0.
648 * @param group_generation
649 * In case of a part, the last group generation the slave has access to.
650 * It has relevance when a larger message have fragments with different
653 * Callback to call with the result of the storage operation.
655 * Closure for the callback.
657 * @return Operation handle that can be used to cancel the operation.
659 struct GNUNET_PSYCSTORE_OperationHandle *
660 GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
661 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
662 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
664 uint64_t announced_at,
665 uint64_t effective_since,
666 uint64_t group_generation,
667 GNUNET_PSYCSTORE_ResultCallback rcb,
670 GNUNET_assert (NULL != h);
671 GNUNET_assert (NULL != channel_key);
672 GNUNET_assert (NULL != slave_key);
673 GNUNET_assert (GNUNET_YES == did_join || GNUNET_NO == did_join);
674 GNUNET_assert (did_join
675 ? effective_since <= announced_at
676 : effective_since == 0);
678 struct MembershipStoreRequest *req;
679 struct GNUNET_PSYCSTORE_OperationHandle *
680 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
685 req = (struct MembershipStoreRequest *) &op[1];
686 op->msg = (struct GNUNET_MessageHeader *) req;
687 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE);
688 req->header.size = htons (sizeof (*req));
689 req->channel_key = *channel_key;
690 req->slave_key = *slave_key;
691 req->did_join = did_join;
692 req->announced_at = GNUNET_htonll (announced_at);
693 req->effective_since = GNUNET_htonll (effective_since);
694 req->group_generation = GNUNET_htonll (group_generation);
696 op->op_id = get_next_op_id (h);
697 req->op_id = GNUNET_htonll (op->op_id);
699 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
707 * Test if a member was admitted to the channel at the given message ID.
709 * This is useful when relaying and replaying messages to check if a particular
710 * slave has access to the message fragment with a given group generation. It
711 * is also used when handling join requests to determine whether the slave is
712 * currently admitted to the channel.
715 * Handle for the PSYCstore.
717 * The channel we are interested in.
719 * Public key of slave whose membership to check.
721 * Message ID for which to do the membership test.
722 * @param group_generation
723 * Group generation of the fragment of the message to test.
724 * It has relevance if the message consists of multiple fragments with
725 * different group generations.
727 * Callback to call with the test result.
729 * Closure for the callback.
731 * @return Operation handle that can be used to cancel the operation.
733 struct GNUNET_PSYCSTORE_OperationHandle *
734 GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
735 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
736 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
738 uint64_t group_generation,
739 GNUNET_PSYCSTORE_ResultCallback rcb,
742 struct MembershipTestRequest *req;
743 struct GNUNET_PSYCSTORE_OperationHandle *
744 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
749 req = (struct MembershipTestRequest *) &op[1];
750 op->msg = (struct GNUNET_MessageHeader *) req;
751 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST);
752 req->header.size = htons (sizeof (*req));
753 req->channel_key = *channel_key;
754 req->slave_key = *slave_key;
755 req->message_id = GNUNET_htonll (message_id);
756 req->group_generation = GNUNET_htonll (group_generation);
758 op->op_id = get_next_op_id (h);
759 req->op_id = GNUNET_htonll (op->op_id);
761 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
769 * Store a message fragment sent to a channel.
771 * @param h Handle for the PSYCstore.
772 * @param channel_key The channel the message belongs to.
773 * @param message Message to store.
774 * @param psycstore_flags Flags indicating whether the PSYC message contains
776 * @param rcb Callback to call with the result of the operation.
777 * @param rcb_cls Closure for the callback.
779 * @return Handle that can be used to cancel the operation.
781 struct GNUNET_PSYCSTORE_OperationHandle *
782 GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
783 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
784 const struct GNUNET_MULTICAST_MessageHeader *msg,
785 enum GNUNET_PSYCSTORE_MessageFlags psycstore_flags,
786 GNUNET_PSYCSTORE_ResultCallback rcb,
789 uint16_t size = ntohs (msg->header.size);
790 struct FragmentStoreRequest *req;
791 struct GNUNET_PSYCSTORE_OperationHandle *
792 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size);
797 req = (struct FragmentStoreRequest *) &op[1];
798 op->msg = (struct GNUNET_MessageHeader *) req;
799 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE);
800 req->header.size = htons (sizeof (*req) + size);
801 req->channel_key = *channel_key;
802 req->psycstore_flags = htonl (psycstore_flags);
803 memcpy (&req[1], msg, size);
805 op->op_id = get_next_op_id (h);
806 req->op_id = GNUNET_htonll (op->op_id);
808 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
816 * Retrieve message fragments by fragment ID range.
819 * Handle for the PSYCstore.
821 * The channel we are interested in.
823 * The slave requesting the fragment. If not NULL, a membership test is
824 * performed first and the fragment is only returned if the slave has
826 * @param first_fragment_id
827 * First fragment ID to retrieve.
828 * Use 0 to get the latest message fragment.
829 * @param last_fragment_id
830 * Last consecutive fragment ID to retrieve.
831 * Use 0 to get the latest message fragment.
832 * @param fragment_limit
833 * Maximum number of fragments to retrieve.
835 * Callback to call with the retrieved fragments.
837 * Callback to call with the result of the operation.
839 * Closure for the callbacks.
841 * @return Handle that can be used to cancel the operation.
843 struct GNUNET_PSYCSTORE_OperationHandle *
844 GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
845 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
846 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
847 uint64_t first_fragment_id,
848 uint64_t last_fragment_id,
849 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
850 GNUNET_PSYCSTORE_ResultCallback rcb,
853 struct FragmentGetRequest *req;
854 struct GNUNET_PSYCSTORE_OperationHandle *
855 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
857 op->data_cb = (DataCallback) fragment_cb;
861 req = (struct FragmentGetRequest *) &op[1];
862 op->msg = (struct GNUNET_MessageHeader *) req;
863 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
864 req->header.size = htons (sizeof (*req));
865 req->channel_key = *channel_key;
866 req->first_fragment_id = GNUNET_htonll (first_fragment_id);
867 req->last_fragment_id = GNUNET_htonll (last_fragment_id);
868 if (NULL != slave_key)
870 req->slave_key = *slave_key;
871 req->do_membership_test = GNUNET_YES;
874 op->op_id = get_next_op_id (h);
875 req->op_id = GNUNET_htonll (op->op_id);
877 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
885 * Retrieve latest message fragments.
888 * Handle for the PSYCstore.
890 * The channel we are interested in.
892 * The slave requesting the fragment. If not NULL, a membership test is
893 * performed first and the fragment is only returned if the slave has
895 * @param first_fragment_id
896 * First fragment ID to retrieve.
897 * Use 0 to get the latest message fragment.
898 * @param last_fragment_id
899 * Last consecutive fragment ID to retrieve.
900 * Use 0 to get the latest message fragment.
901 * @param fragment_limit
902 * Maximum number of fragments to retrieve.
904 * Callback to call with the retrieved fragments.
906 * Callback to call with the result of the operation.
908 * Closure for the callbacks.
910 * @return Handle that can be used to cancel the operation.
912 struct GNUNET_PSYCSTORE_OperationHandle *
913 GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
914 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
915 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
916 uint64_t fragment_limit,
917 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
918 GNUNET_PSYCSTORE_ResultCallback rcb,
921 struct FragmentGetRequest *req;
922 struct GNUNET_PSYCSTORE_OperationHandle *
923 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
925 op->data_cb = (DataCallback) fragment_cb;
929 req = (struct FragmentGetRequest *) &op[1];
930 op->msg = (struct GNUNET_MessageHeader *) req;
931 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
932 req->header.size = htons (sizeof (*req));
933 req->channel_key = *channel_key;
934 req->fragment_limit = GNUNET_ntohll (fragment_limit);
935 if (NULL != slave_key)
937 req->slave_key = *slave_key;
938 req->do_membership_test = GNUNET_YES;
941 op->op_id = get_next_op_id (h);
942 req->op_id = GNUNET_htonll (op->op_id);
944 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
952 * Retrieve all fragments of messages in a message ID range.
955 * Handle for the PSYCstore.
957 * The channel we are interested in.
959 * The slave requesting the message.
960 * If not NULL, a membership test is performed first
961 * and the message is only returned if the slave has access to it.
962 * @param first_message_id
963 * First message ID to retrieve.
964 * @param last_message_id
965 * Last consecutive message ID to retrieve.
966 * @param fragment_limit
967 * Maximum number of fragments to retrieve.
968 * @param method_prefix
969 * Retrieve only messages with a matching method prefix.
970 * @todo Implement method_prefix query.
972 * Callback to call with the retrieved fragments.
974 * Callback to call with the result of the operation.
976 * Closure for the callbacks.
978 * @return Handle that can be used to cancel the operation.
980 struct GNUNET_PSYCSTORE_OperationHandle *
981 GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
982 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
983 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
984 uint64_t first_message_id,
985 uint64_t last_message_id,
986 uint64_t fragment_limit,
987 const char *method_prefix,
988 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
989 GNUNET_PSYCSTORE_ResultCallback rcb,
992 struct MessageGetRequest *req;
993 if (NULL == method_prefix)
995 uint16_t method_size = strnlen (method_prefix,
996 GNUNET_SERVER_MAX_MESSAGE_SIZE
997 - sizeof (*req)) + 1;
999 struct GNUNET_PSYCSTORE_OperationHandle *
1000 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1002 op->data_cb = (DataCallback) fragment_cb;
1006 req = (struct MessageGetRequest *) &op[1];
1007 op->msg = (struct GNUNET_MessageHeader *) req;
1008 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
1009 req->header.size = htons (sizeof (*req) + method_size);
1010 req->channel_key = *channel_key;
1011 req->first_message_id = GNUNET_htonll (first_message_id);
1012 req->last_message_id = GNUNET_htonll (last_message_id);
1013 req->fragment_limit = GNUNET_htonll (fragment_limit);
1014 if (NULL != slave_key)
1016 req->slave_key = *slave_key;
1017 req->do_membership_test = GNUNET_YES;
1019 memcpy (&req[1], method_prefix, method_size);
1020 ((char *) &req[1])[method_size - 1] = '\0';
1022 op->op_id = get_next_op_id (h);
1023 req->op_id = GNUNET_htonll (op->op_id);
1025 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1033 * Retrieve all fragments of the latest messages.
1036 * Handle for the PSYCstore.
1037 * @param channel_key
1038 * The channel we are interested in.
1040 * The slave requesting the message.
1041 * If not NULL, a membership test is performed first
1042 * and the message is only returned if the slave has access to it.
1043 * @param message_limit
1044 * Maximum number of messages to retrieve.
1045 * @param method_prefix
1046 * Retrieve only messages with a matching method prefix.
1047 * @todo Implement method_prefix query.
1048 * @param fragment_cb
1049 * Callback to call with the retrieved fragments.
1051 * Callback to call with the result of the operation.
1053 * Closure for the callbacks.
1055 * @return Handle that can be used to cancel the operation.
1057 struct GNUNET_PSYCSTORE_OperationHandle *
1058 GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
1059 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1060 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1061 uint64_t message_limit,
1062 const char *method_prefix,
1063 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
1064 GNUNET_PSYCSTORE_ResultCallback rcb,
1067 struct MessageGetRequest *req;
1069 if (NULL == method_prefix)
1071 uint16_t method_size = strnlen (method_prefix,
1072 GNUNET_SERVER_MAX_MESSAGE_SIZE
1073 - sizeof (*req)) + 1;
1074 GNUNET_assert ('\0' == method_prefix[method_size - 1]);
1076 struct GNUNET_PSYCSTORE_OperationHandle *
1077 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + method_size);
1079 op->data_cb = (DataCallback) fragment_cb;
1083 req = (struct MessageGetRequest *) &op[1];
1084 op->msg = (struct GNUNET_MessageHeader *) req;
1085 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
1086 req->header.size = htons (sizeof (*req) + method_size);
1087 req->channel_key = *channel_key;
1088 req->message_limit = GNUNET_ntohll (message_limit);
1089 if (NULL != slave_key)
1091 req->slave_key = *slave_key;
1092 req->do_membership_test = GNUNET_YES;
1095 op->op_id = get_next_op_id (h);
1096 req->op_id = GNUNET_htonll (op->op_id);
1097 memcpy (&req[1], method_prefix, method_size);
1099 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1107 * Retrieve a fragment of message specified by its message ID and fragment
1111 * Handle for the PSYCstore.
1112 * @param channel_key
1113 * The channel we are interested in.
1115 * The slave requesting the message fragment. If not NULL, a membership
1116 * test is performed first and the message fragment is only returned
1117 * if the slave has access to it.
1119 * Message ID to retrieve. Use 0 to get the latest message.
1120 * @param fragment_offset
1121 * Offset of the fragment to retrieve.
1122 * @param fragment_cb
1123 * Callback to call with the retrieved fragments.
1125 * Callback to call with the result of the operation.
1127 * Closure for the callbacks.
1129 * @return Handle that can be used to cancel the operation.
1131 struct GNUNET_PSYCSTORE_OperationHandle *
1132 GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
1133 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1134 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1135 uint64_t message_id,
1136 uint64_t fragment_offset,
1137 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
1138 GNUNET_PSYCSTORE_ResultCallback rcb,
1141 struct MessageGetFragmentRequest *req;
1142 struct GNUNET_PSYCSTORE_OperationHandle *
1143 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1145 op->data_cb = (DataCallback) fragment_cb;
1149 req = (struct MessageGetFragmentRequest *) &op[1];
1150 op->msg = (struct GNUNET_MessageHeader *) req;
1151 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT);
1152 req->header.size = htons (sizeof (*req));
1153 req->channel_key = *channel_key;
1154 req->message_id = GNUNET_htonll (message_id);
1155 req->fragment_offset = GNUNET_htonll (fragment_offset);
1156 if (NULL != slave_key)
1158 req->slave_key = *slave_key;
1159 req->do_membership_test = GNUNET_YES;
1162 op->op_id = get_next_op_id (h);
1163 req->op_id = GNUNET_htonll (op->op_id);
1165 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1173 * Retrieve latest values of counters for a channel master.
1175 * The current value of counters are needed when a channel master is restarted,
1176 * so that it can continue incrementing the counters from their last value.
1179 * Handle for the PSYCstore.
1180 * @param channel_key
1181 * Public key that identifies the channel.
1183 * Callback to call with the result.
1185 * Closure for the @a ccb callback.
1187 * @return Handle that can be used to cancel the operation.
1189 struct GNUNET_PSYCSTORE_OperationHandle *
1190 GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
1191 struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1192 GNUNET_PSYCSTORE_CountersCallback ccb,
1195 struct OperationRequest *req;
1196 struct GNUNET_PSYCSTORE_OperationHandle *
1197 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1202 req = (struct OperationRequest *) &op[1];
1203 op->msg = (struct GNUNET_MessageHeader *) req;
1204 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET);
1205 req->header.size = htons (sizeof (*req));
1206 req->channel_key = *channel_key;
1208 op->op_id = get_next_op_id (h);
1209 req->op_id = GNUNET_htonll (op->op_id);
1211 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1219 * Apply modifiers of a message to the current channel state.
1221 * An error is returned if there are missing messages containing state
1222 * operations before the current one.
1225 * Handle for the PSYCstore.
1226 * @param channel_key
1227 * The channel we are interested in.
1229 * ID of the message that contains the @a modifiers.
1230 * @param state_delta
1231 * Value of the _state_delta PSYC header variable of the message.
1233 * Callback to call with the result of the operation.
1235 * Closure for the @a rcb callback.
1237 * @return Handle that can be used to cancel the operation.
1239 struct GNUNET_PSYCSTORE_OperationHandle *
1240 GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1241 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1242 uint64_t message_id,
1243 uint64_t state_delta,
1244 GNUNET_PSYCSTORE_ResultCallback rcb,
1247 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1248 struct StateModifyRequest *req;
1250 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1255 req = (struct StateModifyRequest *) &op[1];
1256 op->msg = (struct GNUNET_MessageHeader *) req;
1257 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
1258 req->header.size = htons (sizeof (*req));
1259 req->channel_key = *channel_key;
1260 req->message_id = GNUNET_htonll (message_id);
1261 req->state_delta = GNUNET_htonll (state_delta);
1263 op->op_id = get_next_op_id (h);
1264 req->op_id = GNUNET_htonll (op->op_id);
1266 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1270 /* FIXME: only the last operation is returned,
1271 * operation_cancel() should be able to cancel all of them.
1277 * Store synchronized state.
1280 * Handle for the PSYCstore.
1281 * @param channel_key
1282 * The channel we are interested in.
1283 * @param max_state_message_id
1284 * ID of the last stateful message before @a state_hash_message_id.
1285 * @param state_hash_message_id
1286 * ID of the message that contains the state_hash PSYC header variable.
1287 * @param modifier_count
1288 * Number of elements in the @a modifiers array.
1290 * Full state to store.
1292 * Callback to call with the result of the operation.
1294 * Closure for the callback.
1296 * @return Handle that can be used to cancel the operation.
1298 struct GNUNET_PSYCSTORE_OperationHandle *
1299 GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1300 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1301 uint64_t max_state_message_id,
1302 uint64_t state_hash_message_id,
1303 size_t modifier_count,
1304 const struct GNUNET_PSYC_Modifier *modifiers,
1305 GNUNET_PSYCSTORE_ResultCallback rcb,
1308 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1311 for (i = 0; i < modifier_count; i++) {
1312 struct StateSyncRequest *req;
1313 uint16_t name_size = strlen (modifiers[i].name) + 1;
1315 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
1316 modifiers[i].value_size);
1321 req = (struct StateSyncRequest *) &op[1];
1322 op->msg = (struct GNUNET_MessageHeader *) req;
1323 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
1324 req->header.size = htons (sizeof (*req) + name_size
1325 + modifiers[i].value_size);
1326 req->channel_key = *channel_key;
1327 req->max_state_message_id = GNUNET_htonll (max_state_message_id);
1328 req->state_hash_message_id = GNUNET_htonll (state_hash_message_id);
1329 req->name_size = htons (name_size);
1333 : (modifier_count - 1 == i)
1337 memcpy (&req[1], modifiers[i].name, name_size);
1338 memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1340 op->op_id = get_next_op_id (h);
1341 req->op_id = GNUNET_htonll (op->op_id);
1343 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1351 * Reset the state of a channel.
1353 * Delete all state variables stored for the given channel.
1356 * Handle for the PSYCstore.
1357 * @param channel_key
1358 * The channel we are interested in.
1360 * Callback to call with the result of the operation.
1362 * Closure for the callback.
1364 * @return Handle that can be used to cancel the operation.
1366 struct GNUNET_PSYCSTORE_OperationHandle *
1367 GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
1368 const struct GNUNET_CRYPTO_EddsaPublicKey
1370 GNUNET_PSYCSTORE_ResultCallback rcb,
1373 struct OperationRequest *req;
1374 struct GNUNET_PSYCSTORE_OperationHandle *
1375 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1380 req = (struct OperationRequest *) &op[1];
1381 op->msg = (struct GNUNET_MessageHeader *) req;
1382 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1383 req->header.size = htons (sizeof (*req));
1384 req->channel_key = *channel_key;
1386 op->op_id = get_next_op_id (h);
1387 req->op_id = GNUNET_htonll (op->op_id);
1389 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1398 * Update signed values of state variables in the state store.
1401 * Handle for the PSYCstore.
1402 * @param channel_key
1403 * The channel we are interested in.
1405 * Message ID that contained the state @a hash.
1407 * Hash of the serialized full state.
1409 * Callback to call with the result of the operation.
1411 * Closure for the callback.
1413 struct GNUNET_PSYCSTORE_OperationHandle *
1414 GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
1415 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1416 uint64_t message_id,
1417 const struct GNUNET_HashCode *hash,
1418 GNUNET_PSYCSTORE_ResultCallback rcb,
1421 struct StateHashUpdateRequest *req;
1422 struct GNUNET_PSYCSTORE_OperationHandle *
1423 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1428 req = (struct StateHashUpdateRequest *) &op[1];
1429 op->msg = (struct GNUNET_MessageHeader *) req;
1430 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1431 req->header.size = htons (sizeof (*req));
1432 req->channel_key = *channel_key;
1435 op->op_id = get_next_op_id (h);
1436 req->op_id = GNUNET_htonll (op->op_id);
1438 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1446 * Retrieve the best matching state variable.
1449 * Handle for the PSYCstore.
1450 * @param channel_key
1451 * The channel we are interested in.
1453 * Name of variable to match, the returned variable might be less specific.
1455 * Callback to return the matching state variable.
1457 * Callback to call with the result of the operation.
1459 * Closure for the callbacks.
1461 * @return Handle that can be used to cancel the operation.
1463 struct GNUNET_PSYCSTORE_OperationHandle *
1464 GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
1465 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1467 GNUNET_PSYCSTORE_StateCallback scb,
1468 GNUNET_PSYCSTORE_ResultCallback rcb,
1471 size_t name_size = strlen (name) + 1;
1472 struct OperationRequest *req;
1473 struct GNUNET_PSYCSTORE_OperationHandle *
1474 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1476 op->data_cb = (DataCallback) scb;
1480 req = (struct OperationRequest *) &op[1];
1481 op->msg = (struct GNUNET_MessageHeader *) req;
1482 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET);
1483 req->header.size = htons (sizeof (*req) + name_size);
1484 req->channel_key = *channel_key;
1485 memcpy (&req[1], name, name_size);
1487 op->op_id = get_next_op_id (h);
1488 req->op_id = GNUNET_htonll (op->op_id);
1490 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1499 * Retrieve all state variables for a channel with the given prefix.
1502 * Handle for the PSYCstore.
1503 * @param channel_key
1504 * The channel we are interested in.
1505 * @param name_prefix
1506 * Prefix of state variable names to match.
1508 * Callback to return matching state variables.
1510 * Callback to call with the result of the operation.
1512 * Closure for the callbacks.
1514 * @return Handle that can be used to cancel the operation.
1516 struct GNUNET_PSYCSTORE_OperationHandle *
1517 GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
1518 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1519 const char *name_prefix,
1520 GNUNET_PSYCSTORE_StateCallback scb,
1521 GNUNET_PSYCSTORE_ResultCallback rcb,
1524 size_t name_size = strlen (name_prefix) + 1;
1525 struct OperationRequest *req;
1526 struct GNUNET_PSYCSTORE_OperationHandle *
1527 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1529 op->data_cb = (DataCallback) scb;
1533 req = (struct OperationRequest *) &op[1];
1534 op->msg = (struct GNUNET_MessageHeader *) req;
1535 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX);
1536 req->header.size = htons (sizeof (*req) + name_size);
1537 req->channel_key = *channel_key;
1538 memcpy (&req[1], name_prefix, name_size);
1540 op->op_id = get_next_op_id (h);
1541 req->op_id = GNUNET_htonll (op->op_id);
1543 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1549 /* end of psycstore_api.c */