2 * This file is part of GNUnet
3 * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * @file psycstore/psycstore_api.c
23 * @brief API to interact with the PSYCstore service
24 * @author Gabor X Toth
25 * @author Christian Grothoff
31 #include "gnunet_util_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_psycstore_service.h"
35 #include "gnunet_multicast_service.h"
36 #include "psycstore.h"
38 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__)
40 typedef void (*DataCallback) ();
43 * Handle for an operation with the PSYCstore service.
45 struct GNUNET_PSYCSTORE_OperationHandle
49 * Main PSYCstore handle.
51 struct GNUNET_PSYCSTORE_Handle *h;
54 * We keep operations in a DLL.
56 struct GNUNET_PSYCSTORE_OperationHandle *next;
59 * We keep operations in a DLL.
61 struct GNUNET_PSYCSTORE_OperationHandle *prev;
64 * Continuation to invoke with the result of an operation.
66 GNUNET_PSYCSTORE_ResultCallback res_cb;
69 * Continuation to invoke with the result of an operation returning data.
74 * Closure for the callbacks.
84 * Message to send to the PSYCstore service.
85 * Allocated at the end of this struct.
87 const struct GNUNET_MessageHeader *msg;
92 * Handle for the service.
94 struct GNUNET_PSYCSTORE_Handle
97 * Configuration to use.
99 const struct GNUNET_CONFIGURATION_Handle *cfg;
102 * Socket (if available).
104 struct GNUNET_CLIENT_Connection *client;
107 * Head of operations to transmit.
109 struct GNUNET_PSYCSTORE_OperationHandle *transmit_head;
112 * Tail of operations to transmit.
114 struct GNUNET_PSYCSTORE_OperationHandle *transmit_tail;
117 * Head of active operations waiting for response.
119 struct GNUNET_PSYCSTORE_OperationHandle *op_head;
122 * Tail of active operations waiting for response.
124 struct GNUNET_PSYCSTORE_OperationHandle *op_tail;
127 * Currently pending transmission request, or NULL for none.
129 struct GNUNET_CLIENT_TransmitHandle *th;
132 * Task doing exponential back-off trying to reconnect.
134 struct GNUNET_SCHEDULER_Task * reconnect_task;
137 * Time for next connect retry.
139 struct GNUNET_TIME_Relative reconnect_delay;
142 * Last operation ID used.
147 * Are we polling for incoming messages right now?
154 * Get a fresh operation ID to distinguish between PSYCstore requests.
156 * @param h Handle to the PSYCstore service.
157 * @return next operation id to use
160 get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h)
162 return h->last_op_id++;
167 * Find operation by ID.
169 * @return OperationHandle if found, or NULL otherwise.
171 static struct GNUNET_PSYCSTORE_OperationHandle *
172 find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint64_t op_id)
174 struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head;
177 if (op->op_id == op_id)
186 * Try again to connect to the PSYCstore service.
188 * @param cls handle to the PSYCstore service.
189 * @param tc scheduler context
192 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
196 * Reschedule a connect attempt to the service.
198 * @param h transport service to reconnect
201 reschedule_connect (struct GNUNET_PSYCSTORE_Handle *h)
203 GNUNET_assert (h->reconnect_task == NULL);
207 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
210 if (NULL != h->client)
212 GNUNET_CLIENT_disconnect (h->client);
215 h->in_receive = GNUNET_NO;
216 LOG (GNUNET_ERROR_TYPE_DEBUG,
217 "Scheduling task to reconnect to PSYCstore service in %s.\n",
218 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
220 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
221 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
226 * Schedule transmission of the next message from our queue.
228 * @param h PSYCstore handle
231 transmit_next (struct GNUNET_PSYCSTORE_Handle *h);
235 * Type of a function to call when we receive a message
239 * @param msg message received, NULL on timeout or fatal error
242 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
244 struct GNUNET_PSYCSTORE_Handle *h = cls;
245 struct GNUNET_PSYCSTORE_OperationHandle *op;
246 const struct OperationResult *opres;
247 const struct CountersResult *cres;
248 const struct FragmentResult *fres;
249 const struct StateResult *sres;
254 reschedule_connect (h);
257 LOG (GNUNET_ERROR_TYPE_DEBUG,
258 "Received message of type %d from PSYCstore service.\n",
260 uint16_t size = ntohs (msg->size);
261 uint16_t type = ntohs (msg->type);
264 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE:
265 if (size < sizeof (struct OperationResult))
267 LOG (GNUNET_ERROR_TYPE_ERROR,
268 "Received message of type %d with length %lu bytes. "
270 type, size, sizeof (struct OperationResult));
272 reschedule_connect (h);
276 opres = (const struct OperationResult *) msg;
277 str = (const char *) &opres[1];
278 if ( (size > sizeof (struct OperationResult)) &&
279 ('\0' != str[size - sizeof (struct OperationResult) - 1]) )
282 reschedule_connect (h);
285 if (size == sizeof (struct OperationResult))
288 op = find_op_by_id (h, GNUNET_ntohll (opres->op_id));
291 LOG (GNUNET_ERROR_TYPE_DEBUG,
292 "No callback registered for operation with ID %" PRIu64 ".\n",
293 type, GNUNET_ntohll (opres->op_id));
297 LOG (GNUNET_ERROR_TYPE_DEBUG,
298 "Received result message (type %d) with operation ID: %" PRIu64 "\n",
301 int64_t result_code = GNUNET_ntohll (opres->result_code) + INT64_MIN;
302 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
303 if (NULL != op->res_cb)
305 const struct StateSyncRequest *ssreq;
306 switch (ntohs (op->msg->type))
308 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC:
309 ssreq = (const struct StateSyncRequest *) op->msg;
310 if (!(ssreq->flags & STATE_OP_LAST
311 || GNUNET_OK != result_code))
316 if (NULL != op->res_cb)
317 op->res_cb (op->cls, result_code, str, size - sizeof (*opres));
322 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS:
323 if (size != sizeof (struct CountersResult))
325 LOG (GNUNET_ERROR_TYPE_ERROR,
326 "Received message of type %d with length %lu bytes. "
328 type, size, sizeof (struct CountersResult));
330 reschedule_connect (h);
334 cres = (const struct CountersResult *) msg;
336 op = find_op_by_id (h, GNUNET_ntohll (cres->op_id));
339 LOG (GNUNET_ERROR_TYPE_DEBUG,
340 "No callback registered for operation with ID %" PRIu64 ".\n",
341 type, GNUNET_ntohll (cres->op_id));
345 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
346 if (NULL != op->data_cb)
347 ((GNUNET_PSYCSTORE_CountersCallback)
348 op->data_cb) (op->cls,
349 ntohl (cres->result_code) + INT32_MIN,
350 GNUNET_ntohll (cres->max_fragment_id),
351 GNUNET_ntohll (cres->max_message_id),
352 GNUNET_ntohll (cres->max_group_generation),
353 GNUNET_ntohll (cres->max_state_message_id));
358 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT:
359 if (size < sizeof (struct FragmentResult))
361 LOG (GNUNET_ERROR_TYPE_ERROR,
362 "Received message of type %d with length %lu bytes. "
364 type, size, sizeof (struct FragmentResult));
366 reschedule_connect (h);
370 fres = (const struct FragmentResult *) msg;
371 struct GNUNET_MULTICAST_MessageHeader *mmsg =
372 (struct GNUNET_MULTICAST_MessageHeader *) &fres[1];
373 if (size != sizeof (struct FragmentResult) + ntohs (mmsg->header.size))
375 LOG (GNUNET_ERROR_TYPE_ERROR,
376 "Received message of type %d with length %lu bytes. "
379 sizeof (struct FragmentResult) + ntohs (mmsg->header.size));
381 reschedule_connect (h);
385 op = find_op_by_id (h, GNUNET_ntohll (fres->op_id));
388 LOG (GNUNET_ERROR_TYPE_DEBUG,
389 "No callback registered for operation with ID %" PRIu64 ".\n",
390 type, GNUNET_ntohll (fres->op_id));
394 if (NULL != op->data_cb)
395 ((GNUNET_PSYCSTORE_FragmentCallback)
396 op->data_cb) (op->cls, mmsg, ntohl (fres->psycstore_flags));
400 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE:
401 if (size < sizeof (struct StateResult))
403 LOG (GNUNET_ERROR_TYPE_ERROR,
404 "Received message of type %d with length %lu bytes. "
406 type, size, sizeof (struct StateResult));
408 reschedule_connect (h);
412 sres = (const struct StateResult *) msg;
413 const char *name = (const char *) &sres[1];
414 uint16_t name_size = ntohs (sres->name_size);
416 if (name_size <= 2 || '\0' != name[name_size - 1])
418 LOG (GNUNET_ERROR_TYPE_ERROR,
419 "Received state result message (type %d) with invalid name.\n",
422 reschedule_connect (h);
426 op = find_op_by_id (h, GNUNET_ntohll (sres->op_id));
429 LOG (GNUNET_ERROR_TYPE_DEBUG,
430 "No callback registered for operation with ID %" PRIu64 ".\n",
431 type, GNUNET_ntohll (sres->op_id));
435 if (NULL != op->data_cb)
436 ((GNUNET_PSYCSTORE_StateCallback)
437 op->data_cb) (op->cls, name, (char *) &sres[1] + name_size,
438 ntohs (sres->header.size) - sizeof (*sres) - name_size);
444 reschedule_connect (h);
448 GNUNET_CLIENT_receive (h->client, &message_handler, h,
449 GNUNET_TIME_UNIT_FOREVER_REL);
454 * Transmit next message to service.
456 * @param cls The 'struct GNUNET_PSYCSTORE_Handle'.
457 * @param size Number of bytes available in buf.
458 * @param buf Where to copy the message.
459 * @return Number of bytes copied to buf.
462 send_next_message (void *cls, size_t size, void *buf)
464 struct GNUNET_PSYCSTORE_Handle *h = cls;
465 struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
471 ret = ntohs (op->msg->size);
474 reschedule_connect (h);
477 LOG (GNUNET_ERROR_TYPE_DEBUG,
478 "Sending message of type %d to PSYCstore service. ID: %" PRIu64 "\n",
479 ntohs (op->msg->type), op->op_id);
480 memcpy (buf, op->msg, ret);
482 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
484 if (NULL == op->res_cb && NULL == op->data_cb)
490 GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, op);
493 if (NULL != h->transmit_head)
496 if (GNUNET_NO == h->in_receive)
498 h->in_receive = GNUNET_YES;
499 GNUNET_CLIENT_receive (h->client, &message_handler, h,
500 GNUNET_TIME_UNIT_FOREVER_REL);
507 * Schedule transmission of the next message from our queue.
509 * @param h PSYCstore handle.
512 transmit_next (struct GNUNET_PSYCSTORE_Handle *h)
514 if (NULL != h->th || NULL == h->client)
517 struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
521 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
522 ntohs (op->msg->size),
523 GNUNET_TIME_UNIT_FOREVER_REL,
531 * Try again to connect to the PSYCstore service.
533 * @param cls Handle to the PSYCstore service.
534 * @param tc Scheduler context.
537 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
539 struct GNUNET_PSYCSTORE_Handle *h = cls;
541 h->reconnect_task = NULL;
542 LOG (GNUNET_ERROR_TYPE_DEBUG,
543 "Connecting to PSYCstore service.\n");
544 GNUNET_assert (NULL == h->client);
545 h->client = GNUNET_CLIENT_connect ("psycstore", h->cfg);
546 GNUNET_assert (NULL != h->client);
552 * Connect to the PSYCstore service.
554 * @param cfg The configuration to use
555 * @return Handle to use
557 struct GNUNET_PSYCSTORE_Handle *
558 GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
560 struct GNUNET_PSYCSTORE_Handle *h
561 = GNUNET_new (struct GNUNET_PSYCSTORE_Handle);
563 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
564 h->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, h);
570 * Disconnect from PSYCstore service
572 * @param h Handle to destroy
575 GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h)
577 GNUNET_assert (NULL != h);
578 if (h->reconnect_task != NULL)
580 GNUNET_SCHEDULER_cancel (h->reconnect_task);
581 h->reconnect_task = NULL;
585 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
588 if (NULL != h->client)
590 GNUNET_CLIENT_disconnect (h->client);
598 * Cancel a PSYCstore operation. Note that the operation MAY still
599 * be executed; this merely cancels the continuation; if the request
600 * was already transmitted, the service may still choose to complete
603 * @param op Operation to cancel.
606 GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op)
608 struct GNUNET_PSYCSTORE_Handle *h = op->h;
610 if (h->transmit_head != NULL && (h->transmit_head != op || NULL == h->client))
612 /* request not active, can simply remove */
613 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
619 /* request active but not yet with service, can still abort */
620 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
622 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
627 /* request active with service, simply ensure continuations are not called */
634 * Store join/leave events for a PSYC channel in order to be able to answer
635 * membership test queries later.
638 * Handle for the PSYCstore.
640 * The channel where the event happened.
642 * Public key of joining/leaving slave.
644 * #GNUNET_YES on join, #GNUNET_NO on part.
645 * @param announced_at
646 * ID of the message that announced the membership change.
647 * @param effective_since
648 * Message ID this membership change is in effect since.
649 * For joins it is <= announced_at, for parts it is always 0.
650 * @param group_generation
651 * In case of a part, the last group generation the slave has access to.
652 * It has relevance when a larger message have fragments with different
655 * Callback to call with the result of the storage operation.
657 * Closure for the callback.
659 * @return Operation handle that can be used to cancel the operation.
661 struct GNUNET_PSYCSTORE_OperationHandle *
662 GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
663 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
664 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
666 uint64_t announced_at,
667 uint64_t effective_since,
668 uint64_t group_generation,
669 GNUNET_PSYCSTORE_ResultCallback rcb,
672 GNUNET_assert (NULL != h);
673 GNUNET_assert (NULL != channel_key);
674 GNUNET_assert (NULL != slave_key);
675 GNUNET_assert (GNUNET_YES == did_join || GNUNET_NO == did_join);
676 GNUNET_assert (did_join
677 ? effective_since <= announced_at
678 : effective_since == 0);
680 struct MembershipStoreRequest *req;
681 struct GNUNET_PSYCSTORE_OperationHandle *
682 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
687 req = (struct MembershipStoreRequest *) &op[1];
688 op->msg = (struct GNUNET_MessageHeader *) req;
689 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE);
690 req->header.size = htons (sizeof (*req));
691 req->channel_key = *channel_key;
692 req->slave_key = *slave_key;
693 req->did_join = did_join;
694 req->announced_at = GNUNET_htonll (announced_at);
695 req->effective_since = GNUNET_htonll (effective_since);
696 req->group_generation = GNUNET_htonll (group_generation);
698 op->op_id = get_next_op_id (h);
699 req->op_id = GNUNET_htonll (op->op_id);
701 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
709 * Test if a member was admitted to the channel at the given message ID.
711 * This is useful when relaying and replaying messages to check if a particular
712 * slave has access to the message fragment with a given group generation. It
713 * is also used when handling join requests to determine whether the slave is
714 * currently admitted to the channel.
717 * Handle for the PSYCstore.
719 * The channel we are interested in.
721 * Public key of slave whose membership to check.
723 * Message ID for which to do the membership test.
724 * @param group_generation
725 * Group generation of the fragment of the message to test.
726 * It has relevance if the message consists of multiple fragments with
727 * different group generations.
729 * Callback to call with the test result.
731 * Closure for the callback.
733 * @return Operation handle that can be used to cancel the operation.
735 struct GNUNET_PSYCSTORE_OperationHandle *
736 GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
737 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
738 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
740 uint64_t group_generation,
741 GNUNET_PSYCSTORE_ResultCallback rcb,
744 struct MembershipTestRequest *req;
745 struct GNUNET_PSYCSTORE_OperationHandle *
746 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
751 req = (struct MembershipTestRequest *) &op[1];
752 op->msg = (struct GNUNET_MessageHeader *) req;
753 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST);
754 req->header.size = htons (sizeof (*req));
755 req->channel_key = *channel_key;
756 req->slave_key = *slave_key;
757 req->message_id = GNUNET_htonll (message_id);
758 req->group_generation = GNUNET_htonll (group_generation);
760 op->op_id = get_next_op_id (h);
761 req->op_id = GNUNET_htonll (op->op_id);
763 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
771 * Store a message fragment sent to a channel.
773 * @param h Handle for the PSYCstore.
774 * @param channel_key The channel the message belongs to.
775 * @param message Message to store.
776 * @param psycstore_flags Flags indicating whether the PSYC message contains
778 * @param rcb Callback to call with the result of the operation.
779 * @param rcb_cls Closure for the callback.
781 * @return Handle that can be used to cancel the operation.
783 struct GNUNET_PSYCSTORE_OperationHandle *
784 GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
785 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
786 const struct GNUNET_MULTICAST_MessageHeader *msg,
787 enum GNUNET_PSYCSTORE_MessageFlags psycstore_flags,
788 GNUNET_PSYCSTORE_ResultCallback rcb,
791 uint16_t size = ntohs (msg->header.size);
792 struct FragmentStoreRequest *req;
793 struct GNUNET_PSYCSTORE_OperationHandle *
794 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size);
799 req = (struct FragmentStoreRequest *) &op[1];
800 op->msg = (struct GNUNET_MessageHeader *) req;
801 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE);
802 req->header.size = htons (sizeof (*req) + size);
803 req->channel_key = *channel_key;
804 req->psycstore_flags = htonl (psycstore_flags);
805 memcpy (&req[1], msg, size);
807 op->op_id = get_next_op_id (h);
808 req->op_id = GNUNET_htonll (op->op_id);
810 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
818 * Retrieve message fragments by fragment ID range.
821 * Handle for the PSYCstore.
823 * The channel we are interested in.
825 * The slave requesting the fragment. If not NULL, a membership test is
826 * performed first and the fragment is only returned if the slave has
828 * @param first_fragment_id
829 * First fragment ID to retrieve.
830 * Use 0 to get the latest message fragment.
831 * @param last_fragment_id
832 * Last consecutive fragment ID to retrieve.
833 * Use 0 to get the latest message fragment.
834 * @param fragment_limit
835 * Maximum number of fragments to retrieve.
837 * Callback to call with the retrieved fragments.
839 * Callback to call with the result of the operation.
841 * Closure for the callbacks.
843 * @return Handle that can be used to cancel the operation.
845 struct GNUNET_PSYCSTORE_OperationHandle *
846 GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
847 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
848 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
849 uint64_t first_fragment_id,
850 uint64_t last_fragment_id,
851 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
852 GNUNET_PSYCSTORE_ResultCallback rcb,
855 struct FragmentGetRequest *req;
856 struct GNUNET_PSYCSTORE_OperationHandle *
857 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
859 op->data_cb = (DataCallback) fragment_cb;
863 req = (struct FragmentGetRequest *) &op[1];
864 op->msg = (struct GNUNET_MessageHeader *) req;
865 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
866 req->header.size = htons (sizeof (*req));
867 req->channel_key = *channel_key;
868 req->first_fragment_id = GNUNET_htonll (first_fragment_id);
869 req->last_fragment_id = GNUNET_htonll (last_fragment_id);
870 if (NULL != slave_key)
872 req->slave_key = *slave_key;
873 req->do_membership_test = GNUNET_YES;
876 op->op_id = get_next_op_id (h);
877 req->op_id = GNUNET_htonll (op->op_id);
879 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
887 * Retrieve latest message fragments.
890 * Handle for the PSYCstore.
892 * The channel we are interested in.
894 * The slave requesting the fragment. If not NULL, a membership test is
895 * performed first and the fragment is only returned if the slave has
897 * @param first_fragment_id
898 * First fragment ID to retrieve.
899 * Use 0 to get the latest message fragment.
900 * @param last_fragment_id
901 * Last consecutive fragment ID to retrieve.
902 * Use 0 to get the latest message fragment.
903 * @param fragment_limit
904 * Maximum number of fragments to retrieve.
906 * Callback to call with the retrieved fragments.
908 * Callback to call with the result of the operation.
910 * Closure for the callbacks.
912 * @return Handle that can be used to cancel the operation.
914 struct GNUNET_PSYCSTORE_OperationHandle *
915 GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
916 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
917 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
918 uint64_t fragment_limit,
919 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
920 GNUNET_PSYCSTORE_ResultCallback rcb,
923 struct FragmentGetRequest *req;
924 struct GNUNET_PSYCSTORE_OperationHandle *
925 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
927 op->data_cb = (DataCallback) fragment_cb;
931 req = (struct FragmentGetRequest *) &op[1];
932 op->msg = (struct GNUNET_MessageHeader *) req;
933 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
934 req->header.size = htons (sizeof (*req));
935 req->channel_key = *channel_key;
936 req->fragment_limit = GNUNET_ntohll (fragment_limit);
937 if (NULL != slave_key)
939 req->slave_key = *slave_key;
940 req->do_membership_test = GNUNET_YES;
943 op->op_id = get_next_op_id (h);
944 req->op_id = GNUNET_htonll (op->op_id);
946 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
954 * Retrieve all fragments of messages in a message ID range.
957 * Handle for the PSYCstore.
959 * The channel we are interested in.
961 * The slave requesting the message.
962 * If not NULL, a membership test is performed first
963 * and the message is only returned if the slave has access to it.
964 * @param first_message_id
965 * First message ID to retrieve.
966 * @param last_message_id
967 * Last consecutive message ID to retrieve.
968 * @param method_prefix
969 * Retrieve only messages with a matching method prefix.
970 * @todo Implement method_prefix query.
972 * Callback to call with the retrieved fragments.
974 * Callback to call with the result of the operation.
976 * Closure for the callbacks.
978 * @return Handle that can be used to cancel the operation.
980 struct GNUNET_PSYCSTORE_OperationHandle *
981 GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
982 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
983 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
984 uint64_t first_message_id,
985 uint64_t last_message_id,
986 const char *method_prefix,
987 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
988 GNUNET_PSYCSTORE_ResultCallback rcb,
991 struct MessageGetRequest *req;
992 if (NULL == method_prefix)
994 uint16_t method_size = strnlen (method_prefix,
995 GNUNET_SERVER_MAX_MESSAGE_SIZE
996 - sizeof (*req)) + 1;
998 struct GNUNET_PSYCSTORE_OperationHandle *
999 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1001 op->data_cb = (DataCallback) fragment_cb;
1005 req = (struct MessageGetRequest *) &op[1];
1006 op->msg = (struct GNUNET_MessageHeader *) req;
1007 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
1008 req->header.size = htons (sizeof (*req) + method_size);
1009 req->channel_key = *channel_key;
1010 req->first_message_id = GNUNET_htonll (first_message_id);
1011 req->last_message_id = GNUNET_htonll (last_message_id);
1012 if (NULL != slave_key)
1014 req->slave_key = *slave_key;
1015 req->do_membership_test = GNUNET_YES;
1017 memcpy (&req[1], method_prefix, method_size);
1018 ((char *) &req[1])[method_size - 1] = '\0';
1020 op->op_id = get_next_op_id (h);
1021 req->op_id = GNUNET_htonll (op->op_id);
1023 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1031 * Retrieve all fragments of the latest messages.
1034 * Handle for the PSYCstore.
1035 * @param channel_key
1036 * The channel we are interested in.
1038 * The slave requesting the message.
1039 * If not NULL, a membership test is performed first
1040 * and the message is only returned if the slave has access to it.
1041 * @param message_limit
1042 * Maximum number of messages to retrieve.
1043 * @param method_prefix
1044 * Retrieve only messages with a matching method prefix.
1045 * @todo Implement method_prefix query.
1046 * @param fragment_cb
1047 * Callback to call with the retrieved fragments.
1049 * Callback to call with the result of the operation.
1051 * Closure for the callbacks.
1053 * @return Handle that can be used to cancel the operation.
1055 struct GNUNET_PSYCSTORE_OperationHandle *
1056 GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
1057 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1058 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1059 uint64_t message_limit,
1060 const char *method_prefix,
1061 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
1062 GNUNET_PSYCSTORE_ResultCallback rcb,
1065 struct MessageGetRequest *req;
1067 if (NULL == method_prefix)
1069 uint16_t method_size = strnlen (method_prefix,
1070 GNUNET_SERVER_MAX_MESSAGE_SIZE
1071 - sizeof (*req)) + 1;
1072 GNUNET_assert ('\0' == method_prefix[method_size - 1]);
1074 struct GNUNET_PSYCSTORE_OperationHandle *
1075 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + method_size);
1077 op->data_cb = (DataCallback) fragment_cb;
1081 req = (struct MessageGetRequest *) &op[1];
1082 op->msg = (struct GNUNET_MessageHeader *) req;
1083 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
1084 req->header.size = htons (sizeof (*req) + method_size);
1085 req->channel_key = *channel_key;
1086 req->message_limit = GNUNET_ntohll (message_limit);
1087 if (NULL != slave_key)
1089 req->slave_key = *slave_key;
1090 req->do_membership_test = GNUNET_YES;
1093 op->op_id = get_next_op_id (h);
1094 req->op_id = GNUNET_htonll (op->op_id);
1095 memcpy (&req[1], method_prefix, method_size);
1097 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1105 * Retrieve a fragment of message specified by its message ID and fragment
1109 * Handle for the PSYCstore.
1110 * @param channel_key
1111 * The channel we are interested in.
1113 * The slave requesting the message fragment. If not NULL, a membership
1114 * test is performed first and the message fragment is only returned
1115 * if the slave has access to it.
1117 * Message ID to retrieve. Use 0 to get the latest message.
1118 * @param fragment_offset
1119 * Offset of the fragment to retrieve.
1120 * @param fragment_cb
1121 * Callback to call with the retrieved fragments.
1123 * Callback to call with the result of the operation.
1125 * Closure for the callbacks.
1127 * @return Handle that can be used to cancel the operation.
1129 struct GNUNET_PSYCSTORE_OperationHandle *
1130 GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
1131 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1132 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1133 uint64_t message_id,
1134 uint64_t fragment_offset,
1135 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
1136 GNUNET_PSYCSTORE_ResultCallback rcb,
1139 struct MessageGetFragmentRequest *req;
1140 struct GNUNET_PSYCSTORE_OperationHandle *
1141 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1143 op->data_cb = (DataCallback) fragment_cb;
1147 req = (struct MessageGetFragmentRequest *) &op[1];
1148 op->msg = (struct GNUNET_MessageHeader *) req;
1149 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT);
1150 req->header.size = htons (sizeof (*req));
1151 req->channel_key = *channel_key;
1152 req->message_id = GNUNET_htonll (message_id);
1153 req->fragment_offset = GNUNET_htonll (fragment_offset);
1154 if (NULL != slave_key)
1156 req->slave_key = *slave_key;
1157 req->do_membership_test = GNUNET_YES;
1160 op->op_id = get_next_op_id (h);
1161 req->op_id = GNUNET_htonll (op->op_id);
1163 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1171 * Retrieve latest values of counters for a channel master.
1173 * The current value of counters are needed when a channel master is restarted,
1174 * so that it can continue incrementing the counters from their last value.
1177 * Handle for the PSYCstore.
1178 * @param channel_key
1179 * Public key that identifies the channel.
1181 * Callback to call with the result.
1183 * Closure for the @a ccb callback.
1185 * @return Handle that can be used to cancel the operation.
1187 struct GNUNET_PSYCSTORE_OperationHandle *
1188 GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
1189 struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1190 GNUNET_PSYCSTORE_CountersCallback ccb,
1193 struct OperationRequest *req;
1194 struct GNUNET_PSYCSTORE_OperationHandle *
1195 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1200 req = (struct OperationRequest *) &op[1];
1201 op->msg = (struct GNUNET_MessageHeader *) req;
1202 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET);
1203 req->header.size = htons (sizeof (*req));
1204 req->channel_key = *channel_key;
1206 op->op_id = get_next_op_id (h);
1207 req->op_id = GNUNET_htonll (op->op_id);
1209 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1217 * Apply modifiers of a message to the current channel state.
1219 * An error is returned if there are missing messages containing state
1220 * operations before the current one.
1223 * Handle for the PSYCstore.
1224 * @param channel_key
1225 * The channel we are interested in.
1227 * ID of the message that contains the @a modifiers.
1228 * @param state_delta
1229 * Value of the _state_delta PSYC header variable of the message.
1231 * Callback to call with the result of the operation.
1233 * Closure for the @a rcb callback.
1235 * @return Handle that can be used to cancel the operation.
1237 struct GNUNET_PSYCSTORE_OperationHandle *
1238 GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1239 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1240 uint64_t message_id,
1241 uint64_t state_delta,
1242 GNUNET_PSYCSTORE_ResultCallback rcb,
1245 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1246 struct StateModifyRequest *req;
1248 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1253 req = (struct StateModifyRequest *) &op[1];
1254 op->msg = (struct GNUNET_MessageHeader *) req;
1255 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
1256 req->header.size = htons (sizeof (*req));
1257 req->channel_key = *channel_key;
1258 req->message_id = GNUNET_htonll (message_id);
1259 req->state_delta = GNUNET_htonll (state_delta);
1261 op->op_id = get_next_op_id (h);
1262 req->op_id = GNUNET_htonll (op->op_id);
1264 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1268 /* FIXME: only the last operation is returned,
1269 * operation_cancel() should be able to cancel all of them.
1275 * Store synchronized state.
1278 * Handle for the PSYCstore.
1279 * @param channel_key
1280 * The channel we are interested in.
1281 * @param max_state_message_id
1282 * ID of the last stateful message before @a state_hash_message_id.
1283 * @param state_hash_message_id
1284 * ID of the message that contains the state_hash PSYC header variable.
1285 * @param modifier_count
1286 * Number of elements in the @a modifiers array.
1288 * Full state to store.
1290 * Callback to call with the result of the operation.
1292 * Closure for the callback.
1294 * @return Handle that can be used to cancel the operation.
1296 struct GNUNET_PSYCSTORE_OperationHandle *
1297 GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1298 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1299 uint64_t max_state_message_id,
1300 uint64_t state_hash_message_id,
1301 size_t modifier_count,
1302 const struct GNUNET_ENV_Modifier *modifiers,
1303 GNUNET_PSYCSTORE_ResultCallback rcb,
1306 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1309 for (i = 0; i < modifier_count; i++) {
1310 struct StateSyncRequest *req;
1311 uint16_t name_size = strlen (modifiers[i].name) + 1;
1313 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
1314 modifiers[i].value_size);
1319 req = (struct StateSyncRequest *) &op[1];
1320 op->msg = (struct GNUNET_MessageHeader *) req;
1321 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
1322 req->header.size = htons (sizeof (*req) + name_size
1323 + modifiers[i].value_size);
1324 req->channel_key = *channel_key;
1325 req->max_state_message_id = GNUNET_htonll (max_state_message_id);
1326 req->state_hash_message_id = GNUNET_htonll (state_hash_message_id);
1327 req->name_size = htons (name_size);
1331 : (modifier_count - 1 == i)
1335 memcpy (&req[1], modifiers[i].name, name_size);
1336 memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1338 op->op_id = get_next_op_id (h);
1339 req->op_id = GNUNET_htonll (op->op_id);
1341 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1349 * Reset the state of a channel.
1351 * Delete all state variables stored for the given channel.
1354 * Handle for the PSYCstore.
1355 * @param channel_key
1356 * The channel we are interested in.
1358 * Callback to call with the result of the operation.
1360 * Closure for the callback.
1362 * @return Handle that can be used to cancel the operation.
1364 struct GNUNET_PSYCSTORE_OperationHandle *
1365 GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
1366 const struct GNUNET_CRYPTO_EddsaPublicKey
1368 GNUNET_PSYCSTORE_ResultCallback rcb,
1371 struct OperationRequest *req;
1372 struct GNUNET_PSYCSTORE_OperationHandle *
1373 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1378 req = (struct OperationRequest *) &op[1];
1379 op->msg = (struct GNUNET_MessageHeader *) req;
1380 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1381 req->header.size = htons (sizeof (*req));
1382 req->channel_key = *channel_key;
1384 op->op_id = get_next_op_id (h);
1385 req->op_id = GNUNET_htonll (op->op_id);
1387 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1396 * Update signed values of state variables in the state store.
1399 * Handle for the PSYCstore.
1400 * @param channel_key
1401 * The channel we are interested in.
1403 * Message ID that contained the state @a hash.
1405 * Hash of the serialized full state.
1407 * Callback to call with the result of the operation.
1409 * Closure for the callback.
1411 struct GNUNET_PSYCSTORE_OperationHandle *
1412 GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
1413 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1414 uint64_t message_id,
1415 const struct GNUNET_HashCode *hash,
1416 GNUNET_PSYCSTORE_ResultCallback rcb,
1419 struct StateHashUpdateRequest *req;
1420 struct GNUNET_PSYCSTORE_OperationHandle *
1421 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1426 req = (struct StateHashUpdateRequest *) &op[1];
1427 op->msg = (struct GNUNET_MessageHeader *) req;
1428 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1429 req->header.size = htons (sizeof (*req));
1430 req->channel_key = *channel_key;
1433 op->op_id = get_next_op_id (h);
1434 req->op_id = GNUNET_htonll (op->op_id);
1436 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1444 * Retrieve the best matching state variable.
1447 * Handle for the PSYCstore.
1448 * @param channel_key
1449 * The channel we are interested in.
1451 * Name of variable to match, the returned variable might be less specific.
1453 * Callback to return the matching state variable.
1455 * Callback to call with the result of the operation.
1457 * Closure for the callbacks.
1459 * @return Handle that can be used to cancel the operation.
1461 struct GNUNET_PSYCSTORE_OperationHandle *
1462 GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
1463 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1465 GNUNET_PSYCSTORE_StateCallback scb,
1466 GNUNET_PSYCSTORE_ResultCallback rcb,
1469 size_t name_size = strlen (name) + 1;
1470 struct OperationRequest *req;
1471 struct GNUNET_PSYCSTORE_OperationHandle *
1472 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1474 op->data_cb = (DataCallback) scb;
1478 req = (struct OperationRequest *) &op[1];
1479 op->msg = (struct GNUNET_MessageHeader *) req;
1480 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET);
1481 req->header.size = htons (sizeof (*req) + name_size);
1482 req->channel_key = *channel_key;
1483 memcpy (&req[1], name, name_size);
1485 op->op_id = get_next_op_id (h);
1486 req->op_id = GNUNET_htonll (op->op_id);
1488 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1497 * Retrieve all state variables for a channel with the given prefix.
1500 * Handle for the PSYCstore.
1501 * @param channel_key
1502 * The channel we are interested in.
1503 * @param name_prefix
1504 * Prefix of state variable names to match.
1506 * Callback to return matching state variables.
1508 * Callback to call with the result of the operation.
1510 * Closure for the callbacks.
1512 * @return Handle that can be used to cancel the operation.
1514 struct GNUNET_PSYCSTORE_OperationHandle *
1515 GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
1516 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1517 const char *name_prefix,
1518 GNUNET_PSYCSTORE_StateCallback scb,
1519 GNUNET_PSYCSTORE_ResultCallback rcb,
1522 size_t name_size = strlen (name_prefix) + 1;
1523 struct OperationRequest *req;
1524 struct GNUNET_PSYCSTORE_OperationHandle *
1525 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1527 op->data_cb = (DataCallback) scb;
1531 req = (struct OperationRequest *) &op[1];
1532 op->msg = (struct GNUNET_MessageHeader *) req;
1533 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX);
1534 req->header.size = htons (sizeof (*req) + name_size);
1535 req->channel_key = *channel_key;
1536 memcpy (&req[1], name_prefix, name_size);
1538 op->op_id = get_next_op_id (h);
1539 req->op_id = GNUNET_htonll (op->op_id);
1541 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1547 /* end of psycstore_api.c */