2 * This file is part of GNUnet
3 * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
22 * @file psycstore/psycstore_api.c
23 * @brief API to interact with the PSYCstore service
24 * @author Gabor X Toth
25 * @author Christian Grothoff
31 #include "gnunet_util_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_psycstore_service.h"
35 #include "gnunet_multicast_service.h"
36 #include "psycstore.h"
/**
 * Log a message of the given @a kind under the component name "psycstore-api".
 */
#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__)

/**
 * Untyped function pointer used to store the data callback of an operation.
 * It is cast back to the concrete callback type (counters, fragment or state
 * callback) right before it is invoked.
 */
typedef void (*DataCallback) ();
/**
 * Handle for an operation with the PSYCstore service.
 *
 * An operation is linked into exactly one of the handle's two lists at a time:
 * the transmit queue while the request has not been sent yet, or the list of
 * active operations while a response is awaited.
 */
struct GNUNET_PSYCSTORE_OperationHandle
{

  /**
   * Main PSYCstore handle.
   */
  struct GNUNET_PSYCSTORE_Handle *h;

  /**
   * We keep operations in a DLL.
   */
  struct GNUNET_PSYCSTORE_OperationHandle *next;

  /**
   * We keep operations in a DLL.
   */
  struct GNUNET_PSYCSTORE_OperationHandle *prev;

  /**
   * Continuation to invoke with the result of an operation.
   */
  GNUNET_PSYCSTORE_ResultCallback res_cb;

  /**
   * Continuation to invoke with the result of an operation returning data.
   * Stored untyped; cast to the matching callback type before the call.
   */
  DataCallback data_cb;

  /**
   * Closure for the callbacks.
   */
  void *cls;

  /**
   * ID of this operation; echoed by the service in its responses and used to
   * look the operation up again.
   */
  uint64_t op_id;

  /**
   * Message to send to the PSYCstore service.
   * Allocated at the end of this struct.
   */
  const struct GNUNET_MessageHeader *msg;
};
/**
 * Handle for the service.
 */
struct GNUNET_PSYCSTORE_Handle
{
  /**
   * Configuration to use.
   */
  const struct GNUNET_CONFIGURATION_Handle *cfg;

  /**
   * Socket (if available).  NULL while disconnected.
   */
  struct GNUNET_CLIENT_Connection *client;

  /**
   * Head of operations to transmit.
   */
  struct GNUNET_PSYCSTORE_OperationHandle *transmit_head;

  /**
   * Tail of operations to transmit.
   */
  struct GNUNET_PSYCSTORE_OperationHandle *transmit_tail;

  /**
   * Head of active operations waiting for response.
   */
  struct GNUNET_PSYCSTORE_OperationHandle *op_head;

  /**
   * Tail of active operations waiting for response.
   */
  struct GNUNET_PSYCSTORE_OperationHandle *op_tail;

  /**
   * Currently pending transmission request, or NULL for none.
   */
  struct GNUNET_CLIENT_TransmitHandle *th;

  /**
   * Task doing exponential back-off trying to reconnect.
   */
  struct GNUNET_SCHEDULER_Task * reconnect_task;

  /**
   * Time for next connect retry.
   */
  struct GNUNET_TIME_Relative reconnect_delay;

  /**
   * Last operation ID used.
   */
  uint64_t last_op_id;

  /**
   * Are we polling for incoming messages right now?
   * Holds #GNUNET_YES or #GNUNET_NO.
   */
  int in_receive;
};
154 * Get a fresh operation ID to distinguish between PSYCstore requests.
156 * @param h Handle to the PSYCstore service.
157 * @return next operation id to use
160 get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h)
162 return h->last_op_id++;
167 * Find operation by ID.
169 * @return OperationHandle if found, or NULL otherwise.
171 static struct GNUNET_PSYCSTORE_OperationHandle *
172 find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint64_t op_id)
174 struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head;
177 if (op->op_id == op_id)
186 * Try again to connect to the PSYCstore service.
188 * @param cls handle to the PSYCstore service.
189 * @param tc scheduler context
192 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
196 * Reschedule a connect attempt to the service.
198 * @param h transport service to reconnect
201 reschedule_connect (struct GNUNET_PSYCSTORE_Handle *h)
203 GNUNET_assert (h->reconnect_task == NULL);
207 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
210 if (NULL != h->client)
212 GNUNET_CLIENT_disconnect (h->client);
215 h->in_receive = GNUNET_NO;
216 LOG (GNUNET_ERROR_TYPE_DEBUG,
217 "Scheduling task to reconnect to PSYCstore service in %s.\n",
218 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
220 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
221 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
226 * Schedule transmission of the next message from our queue.
228 * @param h PSYCstore handle
231 transmit_next (struct GNUNET_PSYCSTORE_Handle *h);
235 * Type of a function to call when we receive a message
239 * @param msg message received, NULL on timeout or fatal error
242 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
244 struct GNUNET_PSYCSTORE_Handle *h = cls;
245 struct GNUNET_PSYCSTORE_OperationHandle *op;
246 const struct OperationResult *opres;
247 const struct CountersResult *cres;
248 const struct FragmentResult *fres;
249 const struct StateResult *sres;
254 reschedule_connect (h);
257 LOG (GNUNET_ERROR_TYPE_DEBUG,
258 "Received message of type %d from PSYCstore service.\n",
260 uint16_t size = ntohs (msg->size);
261 uint16_t type = ntohs (msg->type);
264 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE:
265 if (size < sizeof (struct OperationResult))
267 LOG (GNUNET_ERROR_TYPE_ERROR,
268 "Received message of type %d with length %lu bytes. "
270 type, size, sizeof (struct OperationResult));
272 reschedule_connect (h);
276 opres = (const struct OperationResult *) msg;
277 str = (const char *) &opres[1];
278 if ( (size > sizeof (struct OperationResult)) &&
279 ('\0' != str[size - sizeof (struct OperationResult) - 1]) )
282 reschedule_connect (h);
285 if (size == sizeof (struct OperationResult))
288 op = find_op_by_id (h, GNUNET_ntohll (opres->op_id));
291 LOG (GNUNET_ERROR_TYPE_DEBUG,
292 "No callback registered for operation with ID %" PRIu64 ".\n",
293 type, GNUNET_ntohll (opres->op_id));
297 LOG (GNUNET_ERROR_TYPE_DEBUG,
298 "Received result message (type %d) with operation ID: %" PRIu64 "\n",
301 int64_t result_code = GNUNET_ntohll (opres->result_code) + INT64_MIN;
302 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
303 if (NULL != op->res_cb)
305 const struct StateModifyRequest *smreq;
306 const struct StateSyncRequest *ssreq;
307 switch (ntohs (op->msg->type))
309 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY:
310 smreq = (const struct StateModifyRequest *) op->msg;
311 if (!(smreq->flags & STATE_OP_LAST
312 || GNUNET_OK != result_code))
315 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC:
316 ssreq = (const struct StateSyncRequest *) op->msg;
317 if (!(ssreq->flags & STATE_OP_LAST
318 || GNUNET_OK != result_code))
323 if (NULL != op->res_cb)
324 op->res_cb (op->cls, result_code, str);
329 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS:
330 if (size != sizeof (struct CountersResult))
332 LOG (GNUNET_ERROR_TYPE_ERROR,
333 "Received message of type %d with length %lu bytes. "
335 type, size, sizeof (struct CountersResult));
337 reschedule_connect (h);
341 cres = (const struct CountersResult *) msg;
343 op = find_op_by_id (h, GNUNET_ntohll (cres->op_id));
346 LOG (GNUNET_ERROR_TYPE_DEBUG,
347 "No callback registered for operation with ID %" PRIu64 ".\n",
348 type, GNUNET_ntohll (cres->op_id));
352 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
353 if (NULL != op->data_cb)
354 ((GNUNET_PSYCSTORE_CountersCallback)
355 op->data_cb) (op->cls,
356 ntohl (cres->result_code) + INT32_MIN,
357 GNUNET_ntohll (cres->max_fragment_id),
358 GNUNET_ntohll (cres->max_message_id),
359 GNUNET_ntohll (cres->max_group_generation),
360 GNUNET_ntohll (cres->max_state_message_id));
365 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT:
366 if (size < sizeof (struct FragmentResult))
368 LOG (GNUNET_ERROR_TYPE_ERROR,
369 "Received message of type %d with length %lu bytes. "
371 type, size, sizeof (struct FragmentResult));
373 reschedule_connect (h);
377 fres = (const struct FragmentResult *) msg;
378 struct GNUNET_MULTICAST_MessageHeader *mmsg =
379 (struct GNUNET_MULTICAST_MessageHeader *) &fres[1];
380 if (size != sizeof (struct FragmentResult) + ntohs (mmsg->header.size))
382 LOG (GNUNET_ERROR_TYPE_ERROR,
383 "Received message of type %d with length %lu bytes. "
386 sizeof (struct FragmentResult) + ntohs (mmsg->header.size));
388 reschedule_connect (h);
392 op = find_op_by_id (h, GNUNET_ntohll (fres->op_id));
395 LOG (GNUNET_ERROR_TYPE_DEBUG,
396 "No callback registered for operation with ID %" PRIu64 ".\n",
397 type, GNUNET_ntohll (fres->op_id));
401 if (NULL != op->data_cb)
402 ((GNUNET_PSYCSTORE_FragmentCallback)
403 op->data_cb) (op->cls, mmsg, ntohl (fres->psycstore_flags));
407 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE:
408 if (size < sizeof (struct StateResult))
410 LOG (GNUNET_ERROR_TYPE_ERROR,
411 "Received message of type %d with length %lu bytes. "
413 type, size, sizeof (struct StateResult));
415 reschedule_connect (h);
419 sres = (const struct StateResult *) msg;
420 const char *name = (const char *) &sres[1];
421 uint16_t name_size = ntohs (sres->name_size);
423 if (name_size <= 2 || '\0' != name[name_size - 1])
425 LOG (GNUNET_ERROR_TYPE_ERROR,
426 "Received state result message (type %d) with invalid name.\n",
429 reschedule_connect (h);
433 op = find_op_by_id (h, GNUNET_ntohll (sres->op_id));
436 LOG (GNUNET_ERROR_TYPE_DEBUG,
437 "No callback registered for operation with ID %" PRIu64 ".\n",
438 type, GNUNET_ntohll (sres->op_id));
442 if (NULL != op->data_cb)
443 ((GNUNET_PSYCSTORE_StateCallback)
444 op->data_cb) (op->cls, name, (char *) &sres[1] + name_size,
445 ntohs (sres->header.size) - sizeof (*sres) - name_size);
451 reschedule_connect (h);
455 GNUNET_CLIENT_receive (h->client, &message_handler, h,
456 GNUNET_TIME_UNIT_FOREVER_REL);
461 * Transmit next message to service.
463 * @param cls The 'struct GNUNET_PSYCSTORE_Handle'.
464 * @param size Number of bytes available in buf.
465 * @param buf Where to copy the message.
466 * @return Number of bytes copied to buf.
469 send_next_message (void *cls, size_t size, void *buf)
471 struct GNUNET_PSYCSTORE_Handle *h = cls;
472 struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
478 ret = ntohs (op->msg->size);
481 reschedule_connect (h);
484 LOG (GNUNET_ERROR_TYPE_DEBUG,
485 "Sending message of type %d to PSYCstore service. ID: %" PRIu64 "\n",
486 ntohs (op->msg->type), op->op_id);
487 memcpy (buf, op->msg, ret);
489 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
491 if (NULL == op->res_cb && NULL == op->data_cb)
497 GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, op);
500 if (NULL != h->transmit_head)
503 if (GNUNET_NO == h->in_receive)
505 h->in_receive = GNUNET_YES;
506 GNUNET_CLIENT_receive (h->client, &message_handler, h,
507 GNUNET_TIME_UNIT_FOREVER_REL);
514 * Schedule transmission of the next message from our queue.
516 * @param h PSYCstore handle.
519 transmit_next (struct GNUNET_PSYCSTORE_Handle *h)
521 if (NULL != h->th || NULL == h->client)
524 struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
528 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
529 ntohs (op->msg->size),
530 GNUNET_TIME_UNIT_FOREVER_REL,
538 * Try again to connect to the PSYCstore service.
540 * @param cls Handle to the PSYCstore service.
541 * @param tc Scheduler context.
544 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
546 struct GNUNET_PSYCSTORE_Handle *h = cls;
548 h->reconnect_task = NULL;
549 LOG (GNUNET_ERROR_TYPE_DEBUG,
550 "Connecting to PSYCstore service.\n");
551 GNUNET_assert (NULL == h->client);
552 h->client = GNUNET_CLIENT_connect ("psycstore", h->cfg);
553 GNUNET_assert (NULL != h->client);
559 * Connect to the PSYCstore service.
561 * @param cfg The configuration to use
562 * @return Handle to use
564 struct GNUNET_PSYCSTORE_Handle *
565 GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
567 struct GNUNET_PSYCSTORE_Handle *h
568 = GNUNET_new (struct GNUNET_PSYCSTORE_Handle);
570 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
571 h->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, h);
577 * Disconnect from PSYCstore service
579 * @param h Handle to destroy
582 GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h)
584 GNUNET_assert (NULL != h);
585 if (h->reconnect_task != NULL)
587 GNUNET_SCHEDULER_cancel (h->reconnect_task);
588 h->reconnect_task = NULL;
592 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
595 if (NULL != h->client)
597 GNUNET_CLIENT_disconnect (h->client);
605 * Cancel a PSYCstore operation. Note that the operation MAY still
606 * be executed; this merely cancels the continuation; if the request
607 * was already transmitted, the service may still choose to complete
610 * @param op Operation to cancel.
613 GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op)
615 struct GNUNET_PSYCSTORE_Handle *h = op->h;
617 if (h->transmit_head != NULL && (h->transmit_head != op || NULL == h->client))
619 /* request not active, can simply remove */
620 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
626 /* request active but not yet with service, can still abort */
627 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
629 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
634 /* request active with service, simply ensure continuations are not called */
641 * Store join/leave events for a PSYC channel in order to be able to answer
642 * membership test queries later.
645 * Handle for the PSYCstore.
647 * The channel where the event happened.
649 * Public key of joining/leaving slave.
651 * #GNUNET_YES on join, #GNUNET_NO on part.
652 * @param announced_at
653 * ID of the message that announced the membership change.
654 * @param effective_since
655 * Message ID this membership change is in effect since.
656 * For joins it is <= announced_at, for parts it is always 0.
657 * @param group_generation
658 * In case of a part, the last group generation the slave has access to.
659 * It has relevance when a larger message have fragments with different
662 * Callback to call with the result of the storage operation.
664 * Closure for the callback.
666 * @return Operation handle that can be used to cancel the operation.
668 struct GNUNET_PSYCSTORE_OperationHandle *
669 GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
670 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
671 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
673 uint64_t announced_at,
674 uint64_t effective_since,
675 uint64_t group_generation,
676 GNUNET_PSYCSTORE_ResultCallback rcb,
679 GNUNET_assert (NULL != h);
680 GNUNET_assert (NULL != channel_key);
681 GNUNET_assert (NULL != slave_key);
682 GNUNET_assert (GNUNET_YES == did_join || GNUNET_NO == did_join);
683 GNUNET_assert (did_join
684 ? effective_since <= announced_at
685 : effective_since == 0);
687 struct MembershipStoreRequest *req;
688 struct GNUNET_PSYCSTORE_OperationHandle *
689 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
694 req = (struct MembershipStoreRequest *) &op[1];
695 op->msg = (struct GNUNET_MessageHeader *) req;
696 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE);
697 req->header.size = htons (sizeof (*req));
698 req->channel_key = *channel_key;
699 req->slave_key = *slave_key;
700 req->did_join = did_join;
701 req->announced_at = GNUNET_htonll (announced_at);
702 req->effective_since = GNUNET_htonll (effective_since);
703 req->group_generation = GNUNET_htonll (group_generation);
705 op->op_id = get_next_op_id (h);
706 req->op_id = GNUNET_htonll (op->op_id);
708 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
716 * Test if a member was admitted to the channel at the given message ID.
718 * This is useful when relaying and replaying messages to check if a particular
719 * slave has access to the message fragment with a given group generation. It
720 * is also used when handling join requests to determine whether the slave is
721 * currently admitted to the channel.
724 * Handle for the PSYCstore.
726 * The channel we are interested in.
728 * Public key of slave whose membership to check.
730 * Message ID for which to do the membership test.
731 * @param group_generation
732 * Group generation of the fragment of the message to test.
733 * It has relevance if the message consists of multiple fragments with
734 * different group generations.
736 * Callback to call with the test result.
738 * Closure for the callback.
740 * @return Operation handle that can be used to cancel the operation.
742 struct GNUNET_PSYCSTORE_OperationHandle *
743 GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
744 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
745 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
747 uint64_t group_generation,
748 GNUNET_PSYCSTORE_ResultCallback rcb,
751 struct MembershipTestRequest *req;
752 struct GNUNET_PSYCSTORE_OperationHandle *
753 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
758 req = (struct MembershipTestRequest *) &op[1];
759 op->msg = (struct GNUNET_MessageHeader *) req;
760 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST);
761 req->header.size = htons (sizeof (*req));
762 req->channel_key = *channel_key;
763 req->slave_key = *slave_key;
764 req->message_id = GNUNET_htonll (message_id);
765 req->group_generation = GNUNET_htonll (group_generation);
767 op->op_id = get_next_op_id (h);
768 req->op_id = GNUNET_htonll (op->op_id);
770 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
778 * Store a message fragment sent to a channel.
780 * @param h Handle for the PSYCstore.
781 * @param channel_key The channel the message belongs to.
782 * @param message Message to store.
783 * @param psycstore_flags Flags indicating whether the PSYC message contains
785 * @param rcb Callback to call with the result of the operation.
786 * @param rcb_cls Closure for the callback.
788 * @return Handle that can be used to cancel the operation.
790 struct GNUNET_PSYCSTORE_OperationHandle *
791 GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
792 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
793 const struct GNUNET_MULTICAST_MessageHeader *msg,
794 enum GNUNET_PSYCSTORE_MessageFlags psycstore_flags,
795 GNUNET_PSYCSTORE_ResultCallback rcb,
798 uint16_t size = ntohs (msg->header.size);
799 struct FragmentStoreRequest *req;
800 struct GNUNET_PSYCSTORE_OperationHandle *
801 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size);
806 req = (struct FragmentStoreRequest *) &op[1];
807 op->msg = (struct GNUNET_MessageHeader *) req;
808 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE);
809 req->header.size = htons (sizeof (*req) + size);
810 req->channel_key = *channel_key;
811 req->psycstore_flags = htonl (psycstore_flags);
812 memcpy (&req[1], msg, size);
814 op->op_id = get_next_op_id (h);
815 req->op_id = GNUNET_htonll (op->op_id);
817 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
825 * Retrieve message fragments by fragment ID range.
828 * Handle for the PSYCstore.
830 * The channel we are interested in.
832 * The slave requesting the fragment. If not NULL, a membership test is
833 * performed first and the fragment is only returned if the slave has
835 * @param first_fragment_id
836 * First fragment ID to retrieve.
837 * Use 0 to get the latest message fragment.
838 * @param last_fragment_id
839 * Last consecutive fragment ID to retrieve.
840 * Use 0 to get the latest message fragment.
841 * @param fragment_limit
842 * Maximum number of fragments to retrieve.
844 * Callback to call with the retrieved fragments.
846 * Callback to call with the result of the operation.
848 * Closure for the callbacks.
850 * @return Handle that can be used to cancel the operation.
852 struct GNUNET_PSYCSTORE_OperationHandle *
853 GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
854 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
855 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
856 uint64_t first_fragment_id,
857 uint64_t last_fragment_id,
858 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
859 GNUNET_PSYCSTORE_ResultCallback rcb,
862 struct FragmentGetRequest *req;
863 struct GNUNET_PSYCSTORE_OperationHandle *
864 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
866 op->data_cb = (DataCallback) fragment_cb;
870 req = (struct FragmentGetRequest *) &op[1];
871 op->msg = (struct GNUNET_MessageHeader *) req;
872 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
873 req->header.size = htons (sizeof (*req));
874 req->channel_key = *channel_key;
875 req->first_fragment_id = GNUNET_htonll (first_fragment_id);
876 req->last_fragment_id = GNUNET_htonll (last_fragment_id);
877 if (NULL != slave_key)
879 req->slave_key = *slave_key;
880 req->do_membership_test = GNUNET_YES;
883 op->op_id = get_next_op_id (h);
884 req->op_id = GNUNET_htonll (op->op_id);
886 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
894 * Retrieve latest message fragments.
897 * Handle for the PSYCstore.
899 * The channel we are interested in.
901 * The slave requesting the fragment. If not NULL, a membership test is
902 * performed first and the fragment is only returned if the slave has
904 * @param first_fragment_id
905 * First fragment ID to retrieve.
906 * Use 0 to get the latest message fragment.
907 * @param last_fragment_id
908 * Last consecutive fragment ID to retrieve.
909 * Use 0 to get the latest message fragment.
910 * @param fragment_limit
911 * Maximum number of fragments to retrieve.
913 * Callback to call with the retrieved fragments.
915 * Callback to call with the result of the operation.
917 * Closure for the callbacks.
919 * @return Handle that can be used to cancel the operation.
921 struct GNUNET_PSYCSTORE_OperationHandle *
922 GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
923 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
924 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
925 uint64_t fragment_limit,
926 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
927 GNUNET_PSYCSTORE_ResultCallback rcb,
930 struct FragmentGetRequest *req;
931 struct GNUNET_PSYCSTORE_OperationHandle *
932 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
934 op->data_cb = (DataCallback) fragment_cb;
938 req = (struct FragmentGetRequest *) &op[1];
939 op->msg = (struct GNUNET_MessageHeader *) req;
940 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
941 req->header.size = htons (sizeof (*req));
942 req->channel_key = *channel_key;
943 req->fragment_limit = GNUNET_ntohll (fragment_limit);
944 if (NULL != slave_key)
946 req->slave_key = *slave_key;
947 req->do_membership_test = GNUNET_YES;
950 op->op_id = get_next_op_id (h);
951 req->op_id = GNUNET_htonll (op->op_id);
953 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
961 * Retrieve all fragments of messages in a message ID range.
964 * Handle for the PSYCstore.
966 * The channel we are interested in.
968 * The slave requesting the message. If not NULL, a membership test is
969 * performed first and the message is only returned if the slave has
971 * @param first_message_id
972 * First message ID to retrieve.
973 * Use 0 to get the latest message.
974 * @param last_message_id
975 * Last consecutive message ID to retrieve.
976 * Use 0 to get the latest message.
978 * Callback to call with the retrieved fragments.
980 * Callback to call with the result of the operation.
982 * Closure for the callbacks.
984 * @return Handle that can be used to cancel the operation.
986 struct GNUNET_PSYCSTORE_OperationHandle *
987 GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
988 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
989 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
990 uint64_t first_message_id,
991 uint64_t last_message_id,
992 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
993 GNUNET_PSYCSTORE_ResultCallback rcb,
996 struct MessageGetRequest *req;
997 struct GNUNET_PSYCSTORE_OperationHandle *
998 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1000 op->data_cb = (DataCallback) fragment_cb;
1004 req = (struct MessageGetRequest *) &op[1];
1005 op->msg = (struct GNUNET_MessageHeader *) req;
1006 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
1007 req->header.size = htons (sizeof (*req));
1008 req->channel_key = *channel_key;
1009 req->first_message_id = GNUNET_htonll (first_message_id);
1010 req->last_message_id = GNUNET_htonll (last_message_id);
1011 if (NULL != slave_key)
1013 req->slave_key = *slave_key;
1014 req->do_membership_test = GNUNET_YES;
1017 op->op_id = get_next_op_id (h);
1018 req->op_id = GNUNET_htonll (op->op_id);
1020 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1028 * Retrieve all fragments of the latest messages.
1031 * Handle for the PSYCstore.
1032 * @param channel_key
1033 * The channel we are interested in.
1035 * The slave requesting the message. If not NULL, a membership test is
1036 * performed first and the message is only returned if the slave has
1038 * @param message_limit
1039 * Maximum number of messages to retrieve.
1040 * @param fragment_cb
1041 * Callback to call with the retrieved fragments.
1043 * Callback to call with the result of the operation.
1045 * Closure for the callbacks.
1047 * @return Handle that can be used to cancel the operation.
1049 struct GNUNET_PSYCSTORE_OperationHandle *
1050 GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
1051 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1052 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1053 uint64_t message_limit,
1054 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
1055 GNUNET_PSYCSTORE_ResultCallback rcb,
1058 struct MessageGetRequest *req;
1059 struct GNUNET_PSYCSTORE_OperationHandle *
1060 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1062 op->data_cb = (DataCallback) fragment_cb;
1066 req = (struct MessageGetRequest *) &op[1];
1067 op->msg = (struct GNUNET_MessageHeader *) req;
1068 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
1069 req->header.size = htons (sizeof (*req));
1070 req->channel_key = *channel_key;
1071 req->message_limit = GNUNET_ntohll (message_limit);
1072 if (NULL != slave_key)
1074 req->slave_key = *slave_key;
1075 req->do_membership_test = GNUNET_YES;
1078 op->op_id = get_next_op_id (h);
1079 req->op_id = GNUNET_htonll (op->op_id);
1081 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1089 * Retrieve a fragment of message specified by its message ID and fragment
1093 * Handle for the PSYCstore.
1094 * @param channel_key
1095 * The channel we are interested in.
1097 * The slave requesting the message fragment. If not NULL, a membership
1098 * test is performed first and the message fragment is only returned
1099 * if the slave has access to it.
1101 * Message ID to retrieve. Use 0 to get the latest message.
1102 * @param fragment_offset
1103 * Offset of the fragment to retrieve.
1104 * @param fragment_cb
1105 * Callback to call with the retrieved fragments.
1107 * Callback to call with the result of the operation.
1109 * Closure for the callbacks.
1111 * @return Handle that can be used to cancel the operation.
1113 struct GNUNET_PSYCSTORE_OperationHandle *
1114 GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
1115 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1116 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1117 uint64_t message_id,
1118 uint64_t fragment_offset,
1119 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
1120 GNUNET_PSYCSTORE_ResultCallback rcb,
1123 struct MessageGetFragmentRequest *req;
1124 struct GNUNET_PSYCSTORE_OperationHandle *
1125 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1127 op->data_cb = (DataCallback) fragment_cb;
1131 req = (struct MessageGetFragmentRequest *) &op[1];
1132 op->msg = (struct GNUNET_MessageHeader *) req;
1133 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT);
1134 req->header.size = htons (sizeof (*req));
1135 req->channel_key = *channel_key;
1136 req->message_id = GNUNET_htonll (message_id);
1137 req->fragment_offset = GNUNET_htonll (fragment_offset);
1138 if (NULL != slave_key)
1140 req->slave_key = *slave_key;
1141 req->do_membership_test = GNUNET_YES;
1144 op->op_id = get_next_op_id (h);
1145 req->op_id = GNUNET_htonll (op->op_id);
1147 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1155 * Retrieve latest values of counters for a channel master.
1157 * The current value of counters are needed when a channel master is restarted,
1158 * so that it can continue incrementing the counters from their last value.
1160 * @param h Handle for the PSYCstore.
1161 * @param channel_key Public key that identifies the channel.
1162 * @param ccb Callback to call with the result.
1163 * @param ccb_cls Closure for the @a ccb callback.
1165 * @return Handle that can be used to cancel the operation.
1167 struct GNUNET_PSYCSTORE_OperationHandle *
1168 GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
1169 struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1170 GNUNET_PSYCSTORE_CountersCallback ccb,
1173 struct OperationRequest *req;
1174 struct GNUNET_PSYCSTORE_OperationHandle *
1175 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1180 req = (struct OperationRequest *) &op[1];
1181 op->msg = (struct GNUNET_MessageHeader *) req;
1182 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET);
1183 req->header.size = htons (sizeof (*req));
1184 req->channel_key = *channel_key;
1186 op->op_id = get_next_op_id (h);
1187 req->op_id = GNUNET_htonll (op->op_id);
1189 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1197 * Apply modifiers of a message to the current channel state.
1199 * An error is returned if there are missing messages containing state
1200 * operations before the current one.
1202 * @param h Handle for the PSYCstore.
1203 * @param channel_key The channel we are interested in.
1204 * @param message_id ID of the message that contains the @a modifiers.
1205 * @param state_delta Value of the _state_delta PSYC header variable of the message.
1206 * @param modifier_count Number of elements in the @a modifiers array.
1207 * @param modifiers List of modifiers to apply.
1208 * @param rcb Callback to call with the result of the operation.
1209 * @param rcb_cls Closure for the @a rcb callback.
1211 * @return Handle that can be used to cancel the operation.
1213 struct GNUNET_PSYCSTORE_OperationHandle *
1214 GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1215 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1216 uint64_t message_id,
1217 uint64_t state_delta,
1218 size_t modifier_count,
1219 const struct GNUNET_ENV_Modifier *modifiers,
1220 GNUNET_PSYCSTORE_ResultCallback rcb,
1223 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1226 for (i = 0; i < modifier_count; i++) {
1227 struct StateModifyRequest *req;
1228 uint16_t name_size = strlen (modifiers[i].name) + 1;
1230 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
1231 modifiers[i].value_size);
1236 req = (struct StateModifyRequest *) &op[1];
1237 op->msg = (struct GNUNET_MessageHeader *) req;
1238 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
1239 req->header.size = htons (sizeof (*req) + name_size
1240 + modifiers[i].value_size);
1241 req->channel_key = *channel_key;
1242 req->message_id = GNUNET_htonll (message_id);
1243 req->state_delta = GNUNET_htonll (state_delta);
1244 req->oper = modifiers[i].oper;
1245 req->name_size = htons (name_size);
1249 : modifier_count - 1 == i
1253 memcpy (&req[1], modifiers[i].name, name_size);
1254 memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1256 op->op_id = get_next_op_id (h);
1257 req->op_id = GNUNET_htonll (op->op_id);
1259 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1263 /* FIXME: only the last operation is returned,
1264 * operation_cancel() should be able to cancel all of them.
1270 * Store synchronized state.
1272 * @param h Handle for the PSYCstore.
1273 * @param channel_key The channel we are interested in.
1274 * @param message_id ID of the message that contains the state_hash PSYC header variable.
1275 * @param modifier_count Number of elements in the @a modifiers array.
1276 * @param modifiers Full state to store.
1277 * @param rcb Callback to call with the result of the operation.
1278 * @param rcb_cls Closure for the callback.
1280 * @return Handle that can be used to cancel the operation.
1282 struct GNUNET_PSYCSTORE_OperationHandle *
1283 GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1284 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1285 uint64_t message_id,
1286 size_t modifier_count,
1287 const struct GNUNET_ENV_Modifier *modifiers,
1288 GNUNET_PSYCSTORE_ResultCallback rcb,
1291 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1294 for (i = 0; i < modifier_count; i++) {
1295 struct StateSyncRequest *req;
1296 uint16_t name_size = strlen (modifiers[i].name) + 1;
1298 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
1299 modifiers[i].value_size);
1304 req = (struct StateSyncRequest *) &op[1];
1305 op->msg = (struct GNUNET_MessageHeader *) req;
1306 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
1307 req->header.size = htons (sizeof (*req) + name_size
1308 + modifiers[i].value_size);
1309 req->channel_key = *channel_key;
1310 req->message_id = GNUNET_htonll (message_id);
1311 req->name_size = htons (name_size);
1315 : (modifier_count - 1 == i)
1319 memcpy (&req[1], modifiers[i].name, name_size);
1320 memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1322 op->op_id = get_next_op_id (h);
1323 req->op_id = GNUNET_htonll (op->op_id);
1325 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1333 * Reset the state of a channel.
1335 * Delete all state variables stored for the given channel.
1337 * @param h Handle for the PSYCstore.
1338 * @param channel_key The channel we are interested in.
1339 * @param rcb Callback to call with the result of the operation.
1340 * @param rcb_cls Closure for the callback.
1342 * @return Handle that can be used to cancel the operation.
1344 struct GNUNET_PSYCSTORE_OperationHandle *
1345 GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
1346 const struct GNUNET_CRYPTO_EddsaPublicKey
1348 GNUNET_PSYCSTORE_ResultCallback rcb,
1351 struct OperationRequest *req;
1352 struct GNUNET_PSYCSTORE_OperationHandle *
1353 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1358 req = (struct OperationRequest *) &op[1];
1359 op->msg = (struct GNUNET_MessageHeader *) req;
1360 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1361 req->header.size = htons (sizeof (*req));
1362 req->channel_key = *channel_key;
1364 op->op_id = get_next_op_id (h);
1365 req->op_id = GNUNET_htonll (op->op_id);
1367 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1376 * Update signed values of state variables in the state store.
1378 * @param h Handle for the PSYCstore.
1379 * @param channel_key The channel we are interested in.
1380 * @param message_id Message ID that contained the state @a hash.
1381 * @param hash Hash of the serialized full state.
1382 * @param rcb Callback to call with the result of the operation.
1383 * @param rcb_cls Closure for the callback.
1386 struct GNUNET_PSYCSTORE_OperationHandle *
1387 GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
1388 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1389 uint64_t message_id,
1390 const struct GNUNET_HashCode *hash,
1391 GNUNET_PSYCSTORE_ResultCallback rcb,
1394 struct StateHashUpdateRequest *req;
1395 struct GNUNET_PSYCSTORE_OperationHandle *
1396 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1401 req = (struct StateHashUpdateRequest *) &op[1];
1402 op->msg = (struct GNUNET_MessageHeader *) req;
1403 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1404 req->header.size = htons (sizeof (*req));
1405 req->channel_key = *channel_key;
1408 op->op_id = get_next_op_id (h);
1409 req->op_id = GNUNET_htonll (op->op_id);
1411 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1419 * Retrieve the best matching state variable.
1421 * @param h Handle for the PSYCstore.
1422 * @param channel_key The channel we are interested in.
1423 * @param name Name of variable to match, the returned variable might be less specific.
1424 * @param scb Callback to return the matching state variable.
1425 * @param rcb Callback to call with the result of the operation.
1426 * @param cls Closure for the callbacks.
1428 * @return Handle that can be used to cancel the operation.
1430 struct GNUNET_PSYCSTORE_OperationHandle *
1431 GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
1432 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1434 GNUNET_PSYCSTORE_StateCallback scb,
1435 GNUNET_PSYCSTORE_ResultCallback rcb,
1438 size_t name_size = strlen (name) + 1;
1439 struct OperationRequest *req;
1440 struct GNUNET_PSYCSTORE_OperationHandle *
1441 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1443 op->data_cb = (DataCallback) scb;
1447 req = (struct OperationRequest *) &op[1];
1448 op->msg = (struct GNUNET_MessageHeader *) req;
1449 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET);
1450 req->header.size = htons (sizeof (*req) + name_size);
1451 req->channel_key = *channel_key;
1452 memcpy (&req[1], name, name_size);
1454 op->op_id = get_next_op_id (h);
1455 req->op_id = GNUNET_htonll (op->op_id);
1457 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1466 * Retrieve all state variables for a channel with the given prefix.
1468 * @param h Handle for the PSYCstore.
1469 * @param channel_key The channel we are interested in.
1470 * @param name_prefix Prefix of state variable names to match.
1471 * @param scb Callback to return matching state variables.
1472 * @param rcb Callback to call with the result of the operation.
1473 * @param cls Closure for the callbacks.
1475 * @return Handle that can be used to cancel the operation.
1477 struct GNUNET_PSYCSTORE_OperationHandle *
1478 GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
1479 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1480 const char *name_prefix,
1481 GNUNET_PSYCSTORE_StateCallback scb,
1482 GNUNET_PSYCSTORE_ResultCallback rcb,
1485 size_t name_size = strlen (name_prefix) + 1;
1486 struct OperationRequest *req;
1487 struct GNUNET_PSYCSTORE_OperationHandle *
1488 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1490 op->data_cb = (DataCallback) scb;
1494 req = (struct OperationRequest *) &op[1];
1495 op->msg = (struct GNUNET_MessageHeader *) req;
1496 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX);
1497 req->header.size = htons (sizeof (*req) + name_size);
1498 req->channel_key = *channel_key;
1499 memcpy (&req[1], name_prefix, name_size);
1501 op->op_id = get_next_op_id (h);
1502 req->op_id = GNUNET_htonll (op->op_id);
1504 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1510 /* end of psycstore_api.c */