2 * This file is part of GNUnet
3 * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * @file psycstore/psycstore_api.c
23 * @brief API to interact with the PSYCstore service
24 * @author Gabor X Toth
25 * @author Christian Grothoff
31 #include "gnunet_util_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_psycstore_service.h"
35 #include "gnunet_multicast_service.h"
36 #include "psycstore.h"
38 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__)
40 typedef void (*DataCallback) ();
43 * Handle for an operation with the PSYCstore service.
45 struct GNUNET_PSYCSTORE_OperationHandle
49 * Main PSYCstore handle.
51 struct GNUNET_PSYCSTORE_Handle *h;
54 * We keep operations in a DLL.
56 struct GNUNET_PSYCSTORE_OperationHandle *next;
59 * We keep operations in a DLL.
61 struct GNUNET_PSYCSTORE_OperationHandle *prev;
64 * Continuation to invoke with the result of an operation.
66 GNUNET_PSYCSTORE_ResultCallback res_cb;
69 * Continuation to invoke with the result of an operation returning data.
74 * Closure for the callbacks.
84 * Message to send to the PSYCstore service.
85 * Allocated at the end of this struct.
87 const struct GNUNET_MessageHeader *msg;
92 * Handle for the service.
94 struct GNUNET_PSYCSTORE_Handle
97 * Configuration to use.
99 const struct GNUNET_CONFIGURATION_Handle *cfg;
102 * Socket (if available).
104 struct GNUNET_CLIENT_Connection *client;
107 * Head of operations to transmit.
109 struct GNUNET_PSYCSTORE_OperationHandle *transmit_head;
112 * Tail of operations to transmit.
114 struct GNUNET_PSYCSTORE_OperationHandle *transmit_tail;
117 * Head of active operations waiting for response.
119 struct GNUNET_PSYCSTORE_OperationHandle *op_head;
122 * Tail of active operations waiting for response.
124 struct GNUNET_PSYCSTORE_OperationHandle *op_tail;
127 * Currently pending transmission request, or NULL for none.
129 struct GNUNET_CLIENT_TransmitHandle *th;
132 * Task doing exponential back-off trying to reconnect.
134 struct GNUNET_SCHEDULER_Task * reconnect_task;
137 * Time for next connect retry.
139 struct GNUNET_TIME_Relative reconnect_delay;
142 * Last operation ID used.
147 * Are we polling for incoming messages right now?
154 * Get a fresh operation ID to distinguish between PSYCstore requests.
156 * @param h Handle to the PSYCstore service.
157 * @return next operation id to use
160 get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h)
162 return h->last_op_id++;
167 * Find operation by ID.
169 * @return OperationHandle if found, or NULL otherwise.
171 static struct GNUNET_PSYCSTORE_OperationHandle *
172 find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint64_t op_id)
174 struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head;
177 if (op->op_id == op_id)
186 * Try again to connect to the PSYCstore service.
188 * @param cls handle to the PSYCstore service.
189 * @param tc scheduler context
192 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
196 * Reschedule a connect attempt to the service.
198 * @param h transport service to reconnect
201 reschedule_connect (struct GNUNET_PSYCSTORE_Handle *h)
203 GNUNET_assert (h->reconnect_task == NULL);
207 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
210 if (NULL != h->client)
212 GNUNET_CLIENT_disconnect (h->client);
215 h->in_receive = GNUNET_NO;
216 LOG (GNUNET_ERROR_TYPE_DEBUG,
217 "Scheduling task to reconnect to PSYCstore service in %s.\n",
218 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
220 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
221 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
226 * Schedule transmission of the next message from our queue.
228 * @param h PSYCstore handle
231 transmit_next (struct GNUNET_PSYCSTORE_Handle *h);
235 * Type of a function to call when we receive a message
239 * @param msg message received, NULL on timeout or fatal error
242 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
244 struct GNUNET_PSYCSTORE_Handle *h = cls;
245 struct GNUNET_PSYCSTORE_OperationHandle *op;
246 const struct OperationResult *opres;
247 const struct CountersResult *cres;
248 const struct FragmentResult *fres;
249 const struct StateResult *sres;
254 reschedule_connect (h);
257 LOG (GNUNET_ERROR_TYPE_DEBUG,
258 "Received message of type %d from PSYCstore service.\n",
260 uint16_t size = ntohs (msg->size);
261 uint16_t type = ntohs (msg->type);
264 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE:
265 if (size < sizeof (struct OperationResult))
267 LOG (GNUNET_ERROR_TYPE_ERROR,
268 "Received message of type %d with length %lu bytes. "
270 type, size, sizeof (struct OperationResult));
272 reschedule_connect (h);
276 opres = (const struct OperationResult *) msg;
277 str = (const char *) &opres[1];
278 if ( (size > sizeof (struct OperationResult)) &&
279 ('\0' != str[size - sizeof (struct OperationResult) - 1]) )
282 reschedule_connect (h);
285 if (size == sizeof (struct OperationResult))
288 op = find_op_by_id (h, GNUNET_ntohll (opres->op_id));
291 LOG (GNUNET_ERROR_TYPE_DEBUG,
292 "No callback registered for operation with ID %" PRIu64 ".\n",
293 type, GNUNET_ntohll (opres->op_id));
297 LOG (GNUNET_ERROR_TYPE_DEBUG,
298 "Received result message (type %d) with operation ID: %" PRIu64 "\n",
301 int64_t result_code = GNUNET_ntohll (opres->result_code) + INT64_MIN;
302 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
303 if (NULL != op->res_cb)
305 const struct StateSyncRequest *ssreq;
306 switch (ntohs (op->msg->type))
308 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC:
309 ssreq = (const struct StateSyncRequest *) op->msg;
310 if (!(ssreq->flags & STATE_OP_LAST
311 || GNUNET_OK != result_code))
316 if (NULL != op->res_cb)
317 op->res_cb (op->cls, result_code, str, size - sizeof (*opres));
322 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS:
323 if (size != sizeof (struct CountersResult))
325 LOG (GNUNET_ERROR_TYPE_ERROR,
326 "Received message of type %d with length %lu bytes. "
328 type, size, sizeof (struct CountersResult));
330 reschedule_connect (h);
334 cres = (const struct CountersResult *) msg;
336 op = find_op_by_id (h, GNUNET_ntohll (cres->op_id));
339 LOG (GNUNET_ERROR_TYPE_DEBUG,
340 "No callback registered for operation with ID %" PRIu64 ".\n",
341 type, GNUNET_ntohll (cres->op_id));
345 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
346 if (NULL != op->data_cb)
347 ((GNUNET_PSYCSTORE_CountersCallback)
348 op->data_cb) (op->cls,
349 ntohl (cres->result_code),
350 GNUNET_ntohll (cres->max_fragment_id),
351 GNUNET_ntohll (cres->max_message_id),
352 GNUNET_ntohll (cres->max_group_generation),
353 GNUNET_ntohll (cres->max_state_message_id));
358 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT:
359 if (size < sizeof (struct FragmentResult))
361 LOG (GNUNET_ERROR_TYPE_ERROR,
362 "Received message of type %d with length %lu bytes. "
364 type, size, sizeof (struct FragmentResult));
366 reschedule_connect (h);
370 fres = (const struct FragmentResult *) msg;
371 struct GNUNET_MULTICAST_MessageHeader *mmsg =
372 (struct GNUNET_MULTICAST_MessageHeader *) &fres[1];
373 if (size != sizeof (struct FragmentResult) + ntohs (mmsg->header.size))
375 LOG (GNUNET_ERROR_TYPE_ERROR,
376 "Received message of type %d with length %lu bytes. "
379 sizeof (struct FragmentResult) + ntohs (mmsg->header.size));
381 reschedule_connect (h);
385 op = find_op_by_id (h, GNUNET_ntohll (fres->op_id));
388 LOG (GNUNET_ERROR_TYPE_DEBUG,
389 "No callback registered for operation with ID %" PRIu64 ".\n",
390 type, GNUNET_ntohll (fres->op_id));
394 if (NULL != op->data_cb)
395 ((GNUNET_PSYCSTORE_FragmentCallback)
396 op->data_cb) (op->cls, mmsg, ntohl (fres->psycstore_flags));
400 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE:
401 if (size < sizeof (struct StateResult))
403 LOG (GNUNET_ERROR_TYPE_ERROR,
404 "Received message of type %d with length %lu bytes. "
406 type, size, sizeof (struct StateResult));
408 reschedule_connect (h);
412 sres = (const struct StateResult *) msg;
413 const char *name = (const char *) &sres[1];
414 uint16_t name_size = ntohs (sres->name_size);
416 if (name_size <= 2 || '\0' != name[name_size - 1])
418 LOG (GNUNET_ERROR_TYPE_ERROR,
419 "Received state result message (type %d) with invalid name.\n",
422 reschedule_connect (h);
426 op = find_op_by_id (h, GNUNET_ntohll (sres->op_id));
429 LOG (GNUNET_ERROR_TYPE_DEBUG,
430 "No callback registered for operation with ID %" PRIu64 ".\n",
431 type, GNUNET_ntohll (sres->op_id));
435 if (NULL != op->data_cb)
436 ((GNUNET_PSYCSTORE_StateCallback)
437 op->data_cb) (op->cls, name, (char *) &sres[1] + name_size,
438 ntohs (sres->header.size) - sizeof (*sres) - name_size);
444 reschedule_connect (h);
448 GNUNET_CLIENT_receive (h->client, &message_handler, h,
449 GNUNET_TIME_UNIT_FOREVER_REL);
454 * Transmit next message to service.
456 * @param cls The 'struct GNUNET_PSYCSTORE_Handle'.
457 * @param size Number of bytes available in buf.
458 * @param buf Where to copy the message.
459 * @return Number of bytes copied to buf.
462 send_next_message (void *cls, size_t size, void *buf)
464 struct GNUNET_PSYCSTORE_Handle *h = cls;
465 struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
471 ret = ntohs (op->msg->size);
474 reschedule_connect (h);
477 LOG (GNUNET_ERROR_TYPE_DEBUG,
478 "Sending message of type %d to PSYCstore service. ID: %" PRIu64 "\n",
479 ntohs (op->msg->type), op->op_id);
480 memcpy (buf, op->msg, ret);
482 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
484 if (NULL == op->res_cb && NULL == op->data_cb)
490 GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, op);
493 if (NULL != h->transmit_head)
496 if (GNUNET_NO == h->in_receive)
498 h->in_receive = GNUNET_YES;
499 GNUNET_CLIENT_receive (h->client, &message_handler, h,
500 GNUNET_TIME_UNIT_FOREVER_REL);
507 * Schedule transmission of the next message from our queue.
509 * @param h PSYCstore handle.
512 transmit_next (struct GNUNET_PSYCSTORE_Handle *h)
514 if (NULL != h->th || NULL == h->client)
517 struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
521 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
522 ntohs (op->msg->size),
523 GNUNET_TIME_UNIT_FOREVER_REL,
531 * Try again to connect to the PSYCstore service.
533 * @param cls Handle to the PSYCstore service.
534 * @param tc Scheduler context.
537 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
539 struct GNUNET_PSYCSTORE_Handle *h = cls;
541 h->reconnect_task = NULL;
542 LOG (GNUNET_ERROR_TYPE_DEBUG,
543 "Connecting to PSYCstore service.\n");
544 GNUNET_assert (NULL == h->client);
545 h->client = GNUNET_CLIENT_connect ("psycstore", h->cfg);
546 GNUNET_assert (NULL != h->client);
552 * Connect to the PSYCstore service.
554 * @param cfg The configuration to use
555 * @return Handle to use
557 struct GNUNET_PSYCSTORE_Handle *
558 GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
560 struct GNUNET_PSYCSTORE_Handle *h
561 = GNUNET_new (struct GNUNET_PSYCSTORE_Handle);
563 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
564 h->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, h);
570 * Disconnect from PSYCstore service
572 * @param h Handle to destroy
575 GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h)
577 GNUNET_assert (NULL != h);
578 if (h->reconnect_task != NULL)
580 GNUNET_SCHEDULER_cancel (h->reconnect_task);
581 h->reconnect_task = NULL;
585 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
588 if (NULL != h->client)
590 GNUNET_CLIENT_disconnect (h->client);
598 * Cancel a PSYCstore operation. Note that the operation MAY still
599 * be executed; this merely cancels the continuation; if the request
600 * was already transmitted, the service may still choose to complete
603 * @param op Operation to cancel.
606 GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op)
608 struct GNUNET_PSYCSTORE_Handle *h = op->h;
610 if (h->transmit_head != NULL && (h->transmit_head != op || NULL == h->client))
612 /* request not active, can simply remove */
613 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
619 /* request active but not yet with service, can still abort */
620 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
622 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
627 /* request active with service, simply ensure continuations are not called */
634 * Store join/leave events for a PSYC channel in order to be able to answer
635 * membership test queries later.
638 * Handle for the PSYCstore.
640 * The channel where the event happened.
642 * Public key of joining/leaving slave.
644 * #GNUNET_YES on join, #GNUNET_NO on part.
645 * @param announced_at
646 * ID of the message that announced the membership change.
647 * @param effective_since
648 * Message ID this membership change is in effect since.
649 * For joins it is <= announced_at, for parts it is always 0.
650 * @param group_generation
651 * In case of a part, the last group generation the slave has access to.
652 * It has relevance when a larger message have fragments with different
655 * Callback to call with the result of the storage operation.
657 * Closure for the callback.
659 * @return Operation handle that can be used to cancel the operation.
661 struct GNUNET_PSYCSTORE_OperationHandle *
662 GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
663 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
664 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
666 uint64_t announced_at,
667 uint64_t effective_since,
668 uint64_t group_generation,
669 GNUNET_PSYCSTORE_ResultCallback rcb,
672 GNUNET_assert (NULL != h);
673 GNUNET_assert (NULL != channel_key);
674 GNUNET_assert (NULL != slave_key);
675 GNUNET_assert (GNUNET_YES == did_join || GNUNET_NO == did_join);
676 GNUNET_assert (did_join
677 ? effective_since <= announced_at
678 : effective_since == 0);
680 struct MembershipStoreRequest *req;
681 struct GNUNET_PSYCSTORE_OperationHandle *
682 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
687 req = (struct MembershipStoreRequest *) &op[1];
688 op->msg = (struct GNUNET_MessageHeader *) req;
689 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE);
690 req->header.size = htons (sizeof (*req));
691 req->channel_key = *channel_key;
692 req->slave_key = *slave_key;
693 req->did_join = did_join;
694 req->announced_at = GNUNET_htonll (announced_at);
695 req->effective_since = GNUNET_htonll (effective_since);
696 req->group_generation = GNUNET_htonll (group_generation);
698 op->op_id = get_next_op_id (h);
699 req->op_id = GNUNET_htonll (op->op_id);
701 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
709 * Test if a member was admitted to the channel at the given message ID.
711 * This is useful when relaying and replaying messages to check if a particular
712 * slave has access to the message fragment with a given group generation. It
713 * is also used when handling join requests to determine whether the slave is
714 * currently admitted to the channel.
717 * Handle for the PSYCstore.
719 * The channel we are interested in.
721 * Public key of slave whose membership to check.
723 * Message ID for which to do the membership test.
724 * @param group_generation
725 * Group generation of the fragment of the message to test.
726 * It has relevance if the message consists of multiple fragments with
727 * different group generations.
729 * Callback to call with the test result.
731 * Closure for the callback.
733 * @return Operation handle that can be used to cancel the operation.
735 struct GNUNET_PSYCSTORE_OperationHandle *
736 GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
737 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
738 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
740 uint64_t group_generation,
741 GNUNET_PSYCSTORE_ResultCallback rcb,
744 struct MembershipTestRequest *req;
745 struct GNUNET_PSYCSTORE_OperationHandle *
746 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
751 req = (struct MembershipTestRequest *) &op[1];
752 op->msg = (struct GNUNET_MessageHeader *) req;
753 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST);
754 req->header.size = htons (sizeof (*req));
755 req->channel_key = *channel_key;
756 req->slave_key = *slave_key;
757 req->message_id = GNUNET_htonll (message_id);
758 req->group_generation = GNUNET_htonll (group_generation);
760 op->op_id = get_next_op_id (h);
761 req->op_id = GNUNET_htonll (op->op_id);
763 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
771 * Store a message fragment sent to a channel.
773 * @param h Handle for the PSYCstore.
774 * @param channel_key The channel the message belongs to.
775 * @param message Message to store.
776 * @param psycstore_flags Flags indicating whether the PSYC message contains
778 * @param rcb Callback to call with the result of the operation.
779 * @param rcb_cls Closure for the callback.
781 * @return Handle that can be used to cancel the operation.
783 struct GNUNET_PSYCSTORE_OperationHandle *
784 GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
785 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
786 const struct GNUNET_MULTICAST_MessageHeader *msg,
787 enum GNUNET_PSYCSTORE_MessageFlags psycstore_flags,
788 GNUNET_PSYCSTORE_ResultCallback rcb,
791 uint16_t size = ntohs (msg->header.size);
792 struct FragmentStoreRequest *req;
793 struct GNUNET_PSYCSTORE_OperationHandle *
794 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size);
799 req = (struct FragmentStoreRequest *) &op[1];
800 op->msg = (struct GNUNET_MessageHeader *) req;
801 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE);
802 req->header.size = htons (sizeof (*req) + size);
803 req->channel_key = *channel_key;
804 req->psycstore_flags = htonl (psycstore_flags);
805 memcpy (&req[1], msg, size);
807 op->op_id = get_next_op_id (h);
808 req->op_id = GNUNET_htonll (op->op_id);
810 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
818 * Retrieve message fragments by fragment ID range.
821 * Handle for the PSYCstore.
823 * The channel we are interested in.
825 * The slave requesting the fragment. If not NULL, a membership test is
826 * performed first and the fragment is only returned if the slave has
828 * @param first_fragment_id
829 * First fragment ID to retrieve.
830 * Use 0 to get the latest message fragment.
831 * @param last_fragment_id
832 * Last consecutive fragment ID to retrieve.
833 * Use 0 to get the latest message fragment.
834 * @param fragment_limit
835 * Maximum number of fragments to retrieve.
837 * Callback to call with the retrieved fragments.
839 * Callback to call with the result of the operation.
841 * Closure for the callbacks.
843 * @return Handle that can be used to cancel the operation.
845 struct GNUNET_PSYCSTORE_OperationHandle *
846 GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
847 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
848 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
849 uint64_t first_fragment_id,
850 uint64_t last_fragment_id,
851 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
852 GNUNET_PSYCSTORE_ResultCallback rcb,
855 struct FragmentGetRequest *req;
856 struct GNUNET_PSYCSTORE_OperationHandle *
857 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
859 op->data_cb = (DataCallback) fragment_cb;
863 req = (struct FragmentGetRequest *) &op[1];
864 op->msg = (struct GNUNET_MessageHeader *) req;
865 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
866 req->header.size = htons (sizeof (*req));
867 req->channel_key = *channel_key;
868 req->first_fragment_id = GNUNET_htonll (first_fragment_id);
869 req->last_fragment_id = GNUNET_htonll (last_fragment_id);
870 if (NULL != slave_key)
872 req->slave_key = *slave_key;
873 req->do_membership_test = GNUNET_YES;
876 op->op_id = get_next_op_id (h);
877 req->op_id = GNUNET_htonll (op->op_id);
879 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
887 * Retrieve latest message fragments.
890 * Handle for the PSYCstore.
892 * The channel we are interested in.
894 * The slave requesting the fragment. If not NULL, a membership test is
895 * performed first and the fragment is only returned if the slave has
897 * @param first_fragment_id
898 * First fragment ID to retrieve.
899 * Use 0 to get the latest message fragment.
900 * @param last_fragment_id
901 * Last consecutive fragment ID to retrieve.
902 * Use 0 to get the latest message fragment.
903 * @param fragment_limit
904 * Maximum number of fragments to retrieve.
906 * Callback to call with the retrieved fragments.
908 * Callback to call with the result of the operation.
910 * Closure for the callbacks.
912 * @return Handle that can be used to cancel the operation.
914 struct GNUNET_PSYCSTORE_OperationHandle *
915 GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
916 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
917 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
918 uint64_t fragment_limit,
919 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
920 GNUNET_PSYCSTORE_ResultCallback rcb,
923 struct FragmentGetRequest *req;
924 struct GNUNET_PSYCSTORE_OperationHandle *
925 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
927 op->data_cb = (DataCallback) fragment_cb;
931 req = (struct FragmentGetRequest *) &op[1];
932 op->msg = (struct GNUNET_MessageHeader *) req;
933 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
934 req->header.size = htons (sizeof (*req));
935 req->channel_key = *channel_key;
936 req->fragment_limit = GNUNET_ntohll (fragment_limit);
937 if (NULL != slave_key)
939 req->slave_key = *slave_key;
940 req->do_membership_test = GNUNET_YES;
943 op->op_id = get_next_op_id (h);
944 req->op_id = GNUNET_htonll (op->op_id);
946 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
954 * Retrieve all fragments of messages in a message ID range.
957 * Handle for the PSYCstore.
959 * The channel we are interested in.
961 * The slave requesting the message.
962 * If not NULL, a membership test is performed first
963 * and the message is only returned if the slave has access to it.
964 * @param first_message_id
965 * First message ID to retrieve.
966 * @param last_message_id
967 * Last consecutive message ID to retrieve.
968 * @param fragment_limit
969 * Maximum number of fragments to retrieve.
970 * @param method_prefix
971 * Retrieve only messages with a matching method prefix.
972 * @todo Implement method_prefix query.
974 * Callback to call with the retrieved fragments.
976 * Callback to call with the result of the operation.
978 * Closure for the callbacks.
980 * @return Handle that can be used to cancel the operation.
982 struct GNUNET_PSYCSTORE_OperationHandle *
983 GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
984 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
985 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
986 uint64_t first_message_id,
987 uint64_t last_message_id,
988 uint64_t fragment_limit,
989 const char *method_prefix,
990 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
991 GNUNET_PSYCSTORE_ResultCallback rcb,
994 struct MessageGetRequest *req;
995 if (NULL == method_prefix)
997 uint16_t method_size = strnlen (method_prefix,
998 GNUNET_SERVER_MAX_MESSAGE_SIZE
999 - sizeof (*req)) + 1;
1001 struct GNUNET_PSYCSTORE_OperationHandle *
1002 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1004 op->data_cb = (DataCallback) fragment_cb;
1008 req = (struct MessageGetRequest *) &op[1];
1009 op->msg = (struct GNUNET_MessageHeader *) req;
1010 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
1011 req->header.size = htons (sizeof (*req) + method_size);
1012 req->channel_key = *channel_key;
1013 req->first_message_id = GNUNET_htonll (first_message_id);
1014 req->last_message_id = GNUNET_htonll (last_message_id);
1015 req->fragment_limit = GNUNET_htonll (fragment_limit);
1016 if (NULL != slave_key)
1018 req->slave_key = *slave_key;
1019 req->do_membership_test = GNUNET_YES;
1021 memcpy (&req[1], method_prefix, method_size);
1022 ((char *) &req[1])[method_size - 1] = '\0';
1024 op->op_id = get_next_op_id (h);
1025 req->op_id = GNUNET_htonll (op->op_id);
1027 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1035 * Retrieve all fragments of the latest messages.
1038 * Handle for the PSYCstore.
1039 * @param channel_key
1040 * The channel we are interested in.
1042 * The slave requesting the message.
1043 * If not NULL, a membership test is performed first
1044 * and the message is only returned if the slave has access to it.
1045 * @param message_limit
1046 * Maximum number of messages to retrieve.
1047 * @param method_prefix
1048 * Retrieve only messages with a matching method prefix.
1049 * @todo Implement method_prefix query.
1050 * @param fragment_cb
1051 * Callback to call with the retrieved fragments.
1053 * Callback to call with the result of the operation.
1055 * Closure for the callbacks.
1057 * @return Handle that can be used to cancel the operation.
1059 struct GNUNET_PSYCSTORE_OperationHandle *
1060 GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
1061 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1062 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1063 uint64_t message_limit,
1064 const char *method_prefix,
1065 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
1066 GNUNET_PSYCSTORE_ResultCallback rcb,
1069 struct MessageGetRequest *req;
1071 if (NULL == method_prefix)
1073 uint16_t method_size = strnlen (method_prefix,
1074 GNUNET_SERVER_MAX_MESSAGE_SIZE
1075 - sizeof (*req)) + 1;
1076 GNUNET_assert ('\0' == method_prefix[method_size - 1]);
1078 struct GNUNET_PSYCSTORE_OperationHandle *
1079 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + method_size);
1081 op->data_cb = (DataCallback) fragment_cb;
1085 req = (struct MessageGetRequest *) &op[1];
1086 op->msg = (struct GNUNET_MessageHeader *) req;
1087 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
1088 req->header.size = htons (sizeof (*req) + method_size);
1089 req->channel_key = *channel_key;
1090 req->message_limit = GNUNET_ntohll (message_limit);
1091 if (NULL != slave_key)
1093 req->slave_key = *slave_key;
1094 req->do_membership_test = GNUNET_YES;
1097 op->op_id = get_next_op_id (h);
1098 req->op_id = GNUNET_htonll (op->op_id);
1099 memcpy (&req[1], method_prefix, method_size);
1101 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1109 * Retrieve a fragment of message specified by its message ID and fragment
1113 * Handle for the PSYCstore.
1114 * @param channel_key
1115 * The channel we are interested in.
1117 * The slave requesting the message fragment. If not NULL, a membership
1118 * test is performed first and the message fragment is only returned
1119 * if the slave has access to it.
1121 * Message ID to retrieve. Use 0 to get the latest message.
1122 * @param fragment_offset
1123 * Offset of the fragment to retrieve.
1124 * @param fragment_cb
1125 * Callback to call with the retrieved fragments.
1127 * Callback to call with the result of the operation.
1129 * Closure for the callbacks.
1131 * @return Handle that can be used to cancel the operation.
1133 struct GNUNET_PSYCSTORE_OperationHandle *
1134 GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
1135 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1136 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1137 uint64_t message_id,
1138 uint64_t fragment_offset,
1139 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
1140 GNUNET_PSYCSTORE_ResultCallback rcb,
1143 struct MessageGetFragmentRequest *req;
1144 struct GNUNET_PSYCSTORE_OperationHandle *
1145 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1147 op->data_cb = (DataCallback) fragment_cb;
1151 req = (struct MessageGetFragmentRequest *) &op[1];
1152 op->msg = (struct GNUNET_MessageHeader *) req;
1153 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT);
1154 req->header.size = htons (sizeof (*req));
1155 req->channel_key = *channel_key;
1156 req->message_id = GNUNET_htonll (message_id);
1157 req->fragment_offset = GNUNET_htonll (fragment_offset);
1158 if (NULL != slave_key)
1160 req->slave_key = *slave_key;
1161 req->do_membership_test = GNUNET_YES;
1164 op->op_id = get_next_op_id (h);
1165 req->op_id = GNUNET_htonll (op->op_id);
1167 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1175 * Retrieve latest values of counters for a channel master.
1177 * The current value of counters are needed when a channel master is restarted,
1178 * so that it can continue incrementing the counters from their last value.
1181 * Handle for the PSYCstore.
1182 * @param channel_key
1183 * Public key that identifies the channel.
1185 * Callback to call with the result.
1187 * Closure for the @a ccb callback.
1189 * @return Handle that can be used to cancel the operation.
1191 struct GNUNET_PSYCSTORE_OperationHandle *
1192 GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
1193 struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1194 GNUNET_PSYCSTORE_CountersCallback ccb,
1197 struct OperationRequest *req;
1198 struct GNUNET_PSYCSTORE_OperationHandle *
1199 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1204 req = (struct OperationRequest *) &op[1];
1205 op->msg = (struct GNUNET_MessageHeader *) req;
1206 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET);
1207 req->header.size = htons (sizeof (*req));
1208 req->channel_key = *channel_key;
1210 op->op_id = get_next_op_id (h);
1211 req->op_id = GNUNET_htonll (op->op_id);
1213 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1221 * Apply modifiers of a message to the current channel state.
1223 * An error is returned if there are missing messages containing state
1224 * operations before the current one.
1227 * Handle for the PSYCstore.
1228 * @param channel_key
1229 * The channel we are interested in.
1231 * ID of the message that contains the @a modifiers.
1232 * @param state_delta
1233 * Value of the _state_delta PSYC header variable of the message.
1235 * Callback to call with the result of the operation.
1237 * Closure for the @a rcb callback.
1239 * @return Handle that can be used to cancel the operation.
1241 struct GNUNET_PSYCSTORE_OperationHandle *
1242 GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1243 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1244 uint64_t message_id,
1245 uint64_t state_delta,
1246 GNUNET_PSYCSTORE_ResultCallback rcb,
1249 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1250 struct StateModifyRequest *req;
1252 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1257 req = (struct StateModifyRequest *) &op[1];
1258 op->msg = (struct GNUNET_MessageHeader *) req;
1259 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
1260 req->header.size = htons (sizeof (*req));
1261 req->channel_key = *channel_key;
1262 req->message_id = GNUNET_htonll (message_id);
1263 req->state_delta = GNUNET_htonll (state_delta);
1265 op->op_id = get_next_op_id (h);
1266 req->op_id = GNUNET_htonll (op->op_id);
1268 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1272 /* FIXME: only the last operation is returned,
1273 * operation_cancel() should be able to cancel all of them.
1279 * Store synchronized state.
1282 * Handle for the PSYCstore.
1283 * @param channel_key
1284 * The channel we are interested in.
1285 * @param max_state_message_id
1286 * ID of the last stateful message before @a state_hash_message_id.
1287 * @param state_hash_message_id
1288 * ID of the message that contains the state_hash PSYC header variable.
1289 * @param modifier_count
1290 * Number of elements in the @a modifiers array.
1292 * Full state to store.
1294 * Callback to call with the result of the operation.
1296 * Closure for the callback.
1298 * @return Handle that can be used to cancel the operation.
1300 struct GNUNET_PSYCSTORE_OperationHandle *
1301 GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1302 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1303 uint64_t max_state_message_id,
1304 uint64_t state_hash_message_id,
1305 size_t modifier_count,
1306 const struct GNUNET_ENV_Modifier *modifiers,
1307 GNUNET_PSYCSTORE_ResultCallback rcb,
1310 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1313 for (i = 0; i < modifier_count; i++) {
1314 struct StateSyncRequest *req;
1315 uint16_t name_size = strlen (modifiers[i].name) + 1;
1317 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
1318 modifiers[i].value_size);
1323 req = (struct StateSyncRequest *) &op[1];
1324 op->msg = (struct GNUNET_MessageHeader *) req;
1325 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
1326 req->header.size = htons (sizeof (*req) + name_size
1327 + modifiers[i].value_size);
1328 req->channel_key = *channel_key;
1329 req->max_state_message_id = GNUNET_htonll (max_state_message_id);
1330 req->state_hash_message_id = GNUNET_htonll (state_hash_message_id);
1331 req->name_size = htons (name_size);
1335 : (modifier_count - 1 == i)
1339 memcpy (&req[1], modifiers[i].name, name_size);
1340 memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1342 op->op_id = get_next_op_id (h);
1343 req->op_id = GNUNET_htonll (op->op_id);
1345 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1353 * Reset the state of a channel.
1355 * Delete all state variables stored for the given channel.
1358 * Handle for the PSYCstore.
1359 * @param channel_key
1360 * The channel we are interested in.
1362 * Callback to call with the result of the operation.
1364 * Closure for the callback.
1366 * @return Handle that can be used to cancel the operation.
1368 struct GNUNET_PSYCSTORE_OperationHandle *
1369 GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
1370 const struct GNUNET_CRYPTO_EddsaPublicKey
1372 GNUNET_PSYCSTORE_ResultCallback rcb,
1375 struct OperationRequest *req;
1376 struct GNUNET_PSYCSTORE_OperationHandle *
1377 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1382 req = (struct OperationRequest *) &op[1];
1383 op->msg = (struct GNUNET_MessageHeader *) req;
1384 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1385 req->header.size = htons (sizeof (*req));
1386 req->channel_key = *channel_key;
1388 op->op_id = get_next_op_id (h);
1389 req->op_id = GNUNET_htonll (op->op_id);
1391 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1400 * Update signed values of state variables in the state store.
1403 * Handle for the PSYCstore.
1404 * @param channel_key
1405 * The channel we are interested in.
1407 * Message ID that contained the state @a hash.
1409 * Hash of the serialized full state.
1411 * Callback to call with the result of the operation.
1413 * Closure for the callback.
1415 struct GNUNET_PSYCSTORE_OperationHandle *
1416 GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
1417 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1418 uint64_t message_id,
1419 const struct GNUNET_HashCode *hash,
1420 GNUNET_PSYCSTORE_ResultCallback rcb,
1423 struct StateHashUpdateRequest *req;
1424 struct GNUNET_PSYCSTORE_OperationHandle *
1425 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1430 req = (struct StateHashUpdateRequest *) &op[1];
1431 op->msg = (struct GNUNET_MessageHeader *) req;
1432 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1433 req->header.size = htons (sizeof (*req));
1434 req->channel_key = *channel_key;
1437 op->op_id = get_next_op_id (h);
1438 req->op_id = GNUNET_htonll (op->op_id);
1440 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1448 * Retrieve the best matching state variable.
1451 * Handle for the PSYCstore.
1452 * @param channel_key
1453 * The channel we are interested in.
1455 * Name of variable to match, the returned variable might be less specific.
1457 * Callback to return the matching state variable.
1459 * Callback to call with the result of the operation.
1461 * Closure for the callbacks.
1463 * @return Handle that can be used to cancel the operation.
1465 struct GNUNET_PSYCSTORE_OperationHandle *
1466 GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
1467 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1469 GNUNET_PSYCSTORE_StateCallback scb,
1470 GNUNET_PSYCSTORE_ResultCallback rcb,
1473 size_t name_size = strlen (name) + 1;
1474 struct OperationRequest *req;
1475 struct GNUNET_PSYCSTORE_OperationHandle *
1476 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1478 op->data_cb = (DataCallback) scb;
1482 req = (struct OperationRequest *) &op[1];
1483 op->msg = (struct GNUNET_MessageHeader *) req;
1484 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET);
1485 req->header.size = htons (sizeof (*req) + name_size);
1486 req->channel_key = *channel_key;
1487 memcpy (&req[1], name, name_size);
1489 op->op_id = get_next_op_id (h);
1490 req->op_id = GNUNET_htonll (op->op_id);
1492 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1501 * Retrieve all state variables for a channel with the given prefix.
1504 * Handle for the PSYCstore.
1505 * @param channel_key
1506 * The channel we are interested in.
1507 * @param name_prefix
1508 * Prefix of state variable names to match.
1510 * Callback to return matching state variables.
1512 * Callback to call with the result of the operation.
1514 * Closure for the callbacks.
1516 * @return Handle that can be used to cancel the operation.
1518 struct GNUNET_PSYCSTORE_OperationHandle *
1519 GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
1520 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1521 const char *name_prefix,
1522 GNUNET_PSYCSTORE_StateCallback scb,
1523 GNUNET_PSYCSTORE_ResultCallback rcb,
1526 size_t name_size = strlen (name_prefix) + 1;
1527 struct OperationRequest *req;
1528 struct GNUNET_PSYCSTORE_OperationHandle *
1529 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1531 op->data_cb = (DataCallback) scb;
1535 req = (struct OperationRequest *) &op[1];
1536 op->msg = (struct GNUNET_MessageHeader *) req;
1537 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX);
1538 req->header.size = htons (sizeof (*req) + name_size);
1539 req->channel_key = *channel_key;
1540 memcpy (&req[1], name_prefix, name_size);
1542 op->op_id = get_next_op_id (h);
1543 req->op_id = GNUNET_htonll (op->op_id);
1545 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1551 /* end of psycstore_api.c */