2 * This file is part of GNUnet
3 * (C) 2013 Christian Grothoff (and other contributing authors)
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
22 * @file psycstore/psycstore_api.c
23 * @brief API to interact with the PSYCstore service
24 * @author Gabor X Toth
25 * @author Christian Grothoff
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_psycstore_service.h"
33 #include "gnunet_multicast_service.h"
34 #include "psycstore.h"
36 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__)
38 typedef void (*DataCallback) ();
41 * Handle for an operation with the PSYCstore service.
43 struct GNUNET_PSYCSTORE_OperationHandle
47 * Main PSYCstore handle.
49 struct GNUNET_PSYCSTORE_Handle *h;
52 * We keep operations in a DLL.
54 struct GNUNET_PSYCSTORE_OperationHandle *next;
57 * We keep operations in a DLL.
59 struct GNUNET_PSYCSTORE_OperationHandle *prev;
62 * Continuation to invoke with the result of an operation.
64 GNUNET_PSYCSTORE_ResultCallback res_cb;
67 * Continuation to invoke with the result of an operation returning data.
72 * Closure for the callbacks.
82 * Message to send to the PSYCstore service.
83 * Allocated at the end of this struct.
85 const struct GNUNET_MessageHeader *msg;
90 * Handle for the service.
92 struct GNUNET_PSYCSTORE_Handle
95 * Configuration to use.
97 const struct GNUNET_CONFIGURATION_Handle *cfg;
100 * Socket (if available).
102 struct GNUNET_CLIENT_Connection *client;
105 * Head of operations to transmit.
107 struct GNUNET_PSYCSTORE_OperationHandle *transmit_head;
110 * Tail of operations to transmit.
112 struct GNUNET_PSYCSTORE_OperationHandle *transmit_tail;
115 * Head of active operations waiting for response.
117 struct GNUNET_PSYCSTORE_OperationHandle *op_head;
120 * Tail of active operations waiting for response.
122 struct GNUNET_PSYCSTORE_OperationHandle *op_tail;
125 * Currently pending transmission request, or NULL for none.
127 struct GNUNET_CLIENT_TransmitHandle *th;
130 * Task doing exponential back-off trying to reconnect.
132 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
135 * Time for next connect retry.
137 struct GNUNET_TIME_Relative reconnect_delay;
140 * Are we polling for incoming messages right now?
145 * The last operation id used for a PSYCstore operation.
147 uint32_t last_op_id_used;
153 * Get a fresh operation ID to distinguish between PSYCstore requests.
155 * @param h Handle to the PSYCstore service.
156 * @return next operation id to use
159 get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h)
161 return h->last_op_id_used++;
166 * Find operation by ID.
168 * @return OperationHandle if found, or NULL otherwise.
170 static struct GNUNET_PSYCSTORE_OperationHandle *
171 find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint32_t op_id)
173 struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head;
176 if (op->op_id == op_id)
185 * Try again to connect to the PSYCstore service.
187 * @param cls handle to the PSYCstore service.
188 * @param tc scheduler context
191 reconnect (void *cls,
192 const struct GNUNET_SCHEDULER_TaskContext *tc);
196 * Reschedule a connect attempt to the service.
198 * @param h transport service to reconnect
201 reschedule_connect (struct GNUNET_PSYCSTORE_Handle *h)
203 GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
207 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
210 if (NULL != h->client)
212 GNUNET_CLIENT_disconnect (h->client);
215 h->in_receive = GNUNET_NO;
216 LOG (GNUNET_ERROR_TYPE_DEBUG,
217 "Scheduling task to reconnect to PSYCstore service in %s.\n",
218 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
220 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
221 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
226 * Schedule transmission of the next message from our queue.
228 * @param h PSYCstore handle
231 transmit_next (struct GNUNET_PSYCSTORE_Handle *h);
235 * Type of a function to call when we receive a message
239 * @param msg message received, NULL on timeout or fatal error
242 message_handler (void *cls,
243 const struct GNUNET_MessageHeader *msg)
245 struct GNUNET_PSYCSTORE_Handle *h = cls;
246 struct GNUNET_PSYCSTORE_OperationHandle *op;
247 const struct OperationResult *opres;
248 const struct MasterCountersResult *mcres;
249 const struct SlaveCountersResult *scres;
250 const struct FragmentResult *fres;
251 const struct StateResult *sres;
257 reschedule_connect (h);
260 LOG (GNUNET_ERROR_TYPE_DEBUG,
261 "Received message of type %d from PSYCstore service\n",
263 size = ntohs (msg->size);
264 switch (ntohs (msg->type))
266 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE:
267 if (size < sizeof (struct OperationResult))
269 LOG (GNUNET_ERROR_TYPE_ERROR,
270 "Received message of type %d with length %lu bytes. "
272 ntohs (msg->type), size, sizeof (struct OperationResult));
274 reschedule_connect (h);
278 opres = (const struct OperationResult *) msg;
279 str = (const char *) &opres[1];
280 if ( (size > sizeof (struct OperationResult)) &&
281 ('\0' != str[size - sizeof (struct OperationResult) - 1]) )
284 reschedule_connect (h);
287 if (size == sizeof (struct OperationResult))
290 op = find_op_by_id (h, ntohl (opres->op_id));
293 LOG (GNUNET_ERROR_TYPE_ERROR,
294 "Received result of an unkown operation ID: %ld\n",
295 ntohl (opres->op_id));
299 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
300 if (NULL != op->res_cb)
302 const struct StateModifyRequest *smreq;
303 const struct StateSyncRequest *ssreq;
304 switch (ntohs (op->msg->type))
306 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY:
307 smreq = (const struct StateModifyRequest *) op->msg;
308 if (!(smreq->flags & STATE_OP_LAST
309 || GNUNET_OK != ntohl (opres->result_code)))
312 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC:
313 ssreq = (const struct StateSyncRequest *) op->msg;
314 if (!(ssreq->flags & STATE_OP_LAST
315 || GNUNET_OK != ntohl (opres->result_code)))
320 if (NULL != op->res_cb)
321 op->res_cb (op->cls, ntohl (opres->result_code), str);
326 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_MASTER:
327 if (size != sizeof (struct MasterCountersResult))
329 LOG (GNUNET_ERROR_TYPE_ERROR,
330 "Received message of type %d with length %lu bytes. "
332 ntohs (msg->type), size, sizeof (struct MasterCountersResult));
334 reschedule_connect (h);
338 mcres = (const struct MasterCountersResult *) msg;
340 op = find_op_by_id (h, ntohl (mcres->op_id));
343 LOG (GNUNET_ERROR_TYPE_ERROR,
344 "Received result of an unkown operation ID: %ld\n",
345 ntohl (mcres->op_id));
349 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
350 if (NULL != op->data_cb)
351 ((GNUNET_PSYCSTORE_MasterCountersCallback)
352 op->data_cb) (op->cls,
353 GNUNET_ntohll (mcres->fragment_id),
354 GNUNET_ntohll (mcres->message_id),
355 GNUNET_ntohll (mcres->group_generation));
360 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_SLAVE:
361 if (size != sizeof (struct SlaveCountersResult))
363 LOG (GNUNET_ERROR_TYPE_ERROR,
364 "Received message of type %d with length %lu bytes. "
366 ntohs (msg->type), size, sizeof (struct SlaveCountersResult));
368 reschedule_connect (h);
372 scres = (const struct SlaveCountersResult *) msg;
374 op = find_op_by_id (h, ntohl (scres->op_id));
377 LOG (GNUNET_ERROR_TYPE_ERROR,
378 "Received result of an unkown operation ID: %ld\n",
379 ntohl (scres->op_id));
383 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
384 if (NULL != op->data_cb)
385 ((GNUNET_PSYCSTORE_SlaveCountersCallback)
386 op->data_cb) (op->cls, GNUNET_ntohll (scres->max_known_msg_id));
391 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT:
392 if (size < sizeof (struct FragmentResult))
394 LOG (GNUNET_ERROR_TYPE_ERROR,
395 "Received message of type %d with length %lu bytes. "
397 ntohs (msg->type), size, sizeof (struct FragmentResult));
399 reschedule_connect (h);
403 fres = (const struct FragmentResult *) msg;
404 struct GNUNET_MULTICAST_MessageHeader *mmsg =
405 (struct GNUNET_MULTICAST_MessageHeader *) &fres[1];
406 if (size != sizeof (struct FragmentResult) + ntohs (mmsg->header.size))
408 LOG (GNUNET_ERROR_TYPE_ERROR,
409 "Received message of type %d with length %lu bytes. "
411 ntohs (msg->type), size,
412 sizeof (struct FragmentResult) + ntohs (mmsg->header.size));
414 reschedule_connect (h);
418 op = find_op_by_id (h, ntohl (fres->op_id));
421 LOG (GNUNET_ERROR_TYPE_ERROR,
422 "Received result of an unkown operation ID: %ld\n",
423 ntohl (fres->op_id));
427 if (NULL != op->data_cb)
428 ((GNUNET_PSYCSTORE_FragmentCallback)
429 op->data_cb) (op->cls, mmsg, ntohl (fres->psycstore_flags));
433 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE:
434 if (size < sizeof (struct StateResult))
436 LOG (GNUNET_ERROR_TYPE_ERROR,
437 "Received message of type %d with length %lu bytes. "
439 ntohs (msg->type), size, sizeof (struct StateResult));
441 reschedule_connect (h);
445 sres = (const struct StateResult *) msg;
446 const char *name = (const char *) &sres[1];
447 uint16_t name_size = ntohs (sres->name_size);
449 if (name_size <= 2 || '\0' != name[name_size - 1])
451 LOG (GNUNET_ERROR_TYPE_ERROR,
452 "Received state result message (type %d) with invalid name.\n",
453 ntohs (msg->type), name_size, name);
455 reschedule_connect (h);
459 op = find_op_by_id (h, ntohl (sres->op_id));
462 LOG (GNUNET_ERROR_TYPE_ERROR,
463 "Received result of an unkown operation ID: %ld\n",
464 ntohl (sres->op_id));
468 if (NULL != op->data_cb)
469 ((GNUNET_PSYCSTORE_StateCallback)
470 op->data_cb) (op->cls, name, (void *) &sres[1] + name_size,
471 ntohs (sres->header.size) - sizeof (*sres) - name_size);
477 reschedule_connect (h);
481 GNUNET_CLIENT_receive (h->client, &message_handler, h,
482 GNUNET_TIME_UNIT_FOREVER_REL);
487 * Transmit next message to service.
489 * @param cls The 'struct GNUNET_PSYCSTORE_Handle'.
490 * @param size Number of bytes available in buf.
491 * @param buf Where to copy the message.
492 * @return Number of bytes copied to buf.
495 send_next_message (void *cls, size_t size, void *buf)
497 struct GNUNET_PSYCSTORE_Handle *h = cls;
498 struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
504 ret = ntohs (op->msg->size);
507 reschedule_connect (h);
510 LOG (GNUNET_ERROR_TYPE_DEBUG,
511 "Sending message of type %d to PSYCstore service\n",
512 ntohs (op->msg->type));
513 memcpy (buf, op->msg, ret);
515 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
517 if (NULL == op->res_cb && NULL == op->data_cb)
523 GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, op);
526 if (NULL != h->transmit_head)
529 if (GNUNET_NO == h->in_receive)
531 h->in_receive = GNUNET_YES;
532 GNUNET_CLIENT_receive (h->client, &message_handler, h,
533 GNUNET_TIME_UNIT_FOREVER_REL);
540 * Schedule transmission of the next message from our queue.
542 * @param h PSYCstore handle.
545 transmit_next (struct GNUNET_PSYCSTORE_Handle *h)
547 if (NULL != h->th || NULL == h->client)
550 struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
554 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
555 ntohs (op->msg->size),
556 GNUNET_TIME_UNIT_FOREVER_REL,
564 * Try again to connect to the PSYCstore service.
566 * @param cls Handle to the PSYCstore service.
567 * @param tc Scheduler context.
570 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
572 struct GNUNET_PSYCSTORE_Handle *h = cls;
574 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
575 LOG (GNUNET_ERROR_TYPE_DEBUG,
576 "Connecting to PSYCstore service.\n");
577 GNUNET_assert (NULL == h->client);
578 h->client = GNUNET_CLIENT_connect ("psycstore", h->cfg);
579 GNUNET_assert (NULL != h->client);
585 * Connect to the PSYCstore service.
587 * @param cfg The configuration to use
588 * @return Handle to use
590 struct GNUNET_PSYCSTORE_Handle *
591 GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
593 struct GNUNET_PSYCSTORE_Handle *h;
595 h = GNUNET_new (struct GNUNET_PSYCSTORE_Handle);
597 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
598 h->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, h);
604 * Disconnect from PSYCstore service
606 * @param h Handle to destroy
609 GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h)
611 GNUNET_assert (NULL != h);
612 GNUNET_assert (h->op_head == h->op_tail);
613 if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
615 GNUNET_SCHEDULER_cancel (h->reconnect_task);
616 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
620 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
623 if (NULL != h->client)
625 GNUNET_CLIENT_disconnect (h->client);
633 * Cancel a PSYCstore operation. Note that the operation MAY still
634 * be executed; this merely cancels the continuation; if the request
635 * was already transmitted, the service may still choose to complete
638 * @param op Operation to cancel.
641 GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op)
643 struct GNUNET_PSYCSTORE_Handle *h = op->h;
645 if (h->transmit_head != NULL && (h->transmit_head != op || NULL == h->client))
647 /* request not active, can simply remove */
648 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
654 /* request active but not yet with service, can still abort */
655 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
657 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
662 /* request active with service, simply ensure continuations are not called */
669 * Store join/leave events for a PSYC channel in order to be able to answer
670 * membership test queries later.
672 * @param h Handle for the PSYCstore.
673 * @param channel_key The channel where the event happened.
674 * @param slave_key Public key of joining/leaving slave.
675 * @param did_join #GNUNET_YES on join, #GNUNET_NO on part.
676 * @param announced_at ID of the message that announced the membership change.
677 * @param effective_since Message ID this membership change is in effect since.
678 * For joins it is <= announced_at, for parts it is always 0.
679 * @param group_generation In case of a part, the last group generation the
680 * slave has access to. It has relevance when a larger message have
681 * fragments with different group generations.
682 * @param rcb Callback to call with the result of the storage operation.
683 * @param rcb_cls Closure for the callback.
685 * @return Operation handle that can be used to cancel the operation.
687 struct GNUNET_PSYCSTORE_OperationHandle *
688 GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
689 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
690 const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key,
692 uint64_t announced_at,
693 uint64_t effective_since,
694 uint64_t group_generation,
695 GNUNET_PSYCSTORE_ResultCallback rcb,
698 GNUNET_assert (NULL != h);
699 GNUNET_assert (NULL != channel_key);
700 GNUNET_assert (NULL != slave_key);
701 GNUNET_assert (did_join
702 ? effective_since <= announced_at
703 : effective_since == 0);
705 struct MembershipStoreRequest *req;
706 struct GNUNET_PSYCSTORE_OperationHandle *op
707 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
712 req = (struct MembershipStoreRequest *) &op[1];
713 op->msg = (struct GNUNET_MessageHeader *) req;
714 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE);
715 req->header.size = htons (sizeof (*req));
716 req->channel_key = *channel_key;
717 req->slave_key = *slave_key;
718 req->did_join = htonl (did_join);
719 req->announced_at = GNUNET_htonll (announced_at);
720 req->effective_since = GNUNET_htonll (effective_since);
721 req->group_generation = GNUNET_htonll (group_generation);
723 op->op_id = get_next_op_id (h);
724 req->op_id = htonl (op->op_id);
726 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
734 * Test if a member was admitted to the channel at the given message ID.
736 * This is useful when relaying and replaying messages to check if a particular
737 * slave has access to the message fragment with a given group generation. It
738 * is also used when handling join requests to determine whether the slave is
739 * currently admitted to the channel.
741 * @param h Handle for the PSYCstore.
742 * @param channel_key The channel we are interested in.
743 * @param slave_key Public key of slave whose membership to check.
744 * @param message_id Message ID for which to do the membership test.
745 * @param group_generation Group generation of the fragment of the message to
746 * test. It has relevance if the message consists of multiple fragments
747 * with different group generations.
748 * @param rcb Callback to call with the test result.
749 * @param rcb_cls Closure for the callback.
751 * @return Operation handle that can be used to cancel the operation.
753 struct GNUNET_PSYCSTORE_OperationHandle *
754 GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
755 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
756 const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key,
758 uint64_t group_generation,
759 GNUNET_PSYCSTORE_ResultCallback rcb,
762 struct MembershipTestRequest *req;
763 struct GNUNET_PSYCSTORE_OperationHandle *op
764 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
769 req = (struct MembershipTestRequest *) &op[1];
770 op->msg = (struct GNUNET_MessageHeader *) req;
771 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST);
772 req->header.size = htons (sizeof (*req));
773 req->channel_key = *channel_key;
774 req->slave_key = *slave_key;
775 req->message_id = GNUNET_htonll (message_id);
776 req->group_generation = GNUNET_htonll (group_generation);
778 op->op_id = get_next_op_id (h);
779 req->op_id = htonl (op->op_id);
781 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
789 * Store a message fragment sent to a channel.
791 * @param h Handle for the PSYCstore.
792 * @param channel_key The channel the message belongs to.
793 * @param message Message to store.
794 * @param psycstore_flags Flags indicating whether the PSYC message contains
796 * @param rcb Callback to call with the result of the operation.
797 * @param rcb_cls Closure for the callback.
799 * @return Handle that can be used to cancel the operation.
801 struct GNUNET_PSYCSTORE_OperationHandle *
802 GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
803 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
804 const struct GNUNET_MULTICAST_MessageHeader *message,
805 uint32_t psycstore_flags,
806 GNUNET_PSYCSTORE_ResultCallback rcb,
809 uint16_t size = ntohs (message->header.size);
810 struct FragmentStoreRequest *req;
811 struct GNUNET_PSYCSTORE_OperationHandle *op
812 = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size);
817 req = (struct FragmentStoreRequest *) &op[1];
818 op->msg = (struct GNUNET_MessageHeader *) req;
819 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE);
820 req->header.size = htons (sizeof (*req) + size);
821 req->channel_key = *channel_key;
822 req->psycstore_flags = htonl (psycstore_flags);
823 memcpy (&req[1], message, size);
825 op->op_id = get_next_op_id (h);
826 req->op_id = htonl (op->op_id);
828 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
836 * Retrieve a message fragment by fragment ID.
838 * @param h Handle for the PSYCstore.
839 * @param channel_key The channel we are interested in.
840 * @param fragment_id Fragment ID to check. Use 0 to get the latest message fragment.
841 * @param fcb Callback to call with the retrieved fragments.
842 * @param rcb Callback to call with the result of the operation.
843 * @param cls Closure for the callbacks.
845 * @return Handle that can be used to cancel the operation.
847 struct GNUNET_PSYCSTORE_OperationHandle *
848 GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
849 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
850 uint64_t fragment_id,
851 GNUNET_PSYCSTORE_FragmentCallback fcb,
852 GNUNET_PSYCSTORE_ResultCallback rcb,
855 struct FragmentGetRequest *req;
856 struct GNUNET_PSYCSTORE_OperationHandle *op
857 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
859 op->data_cb = (DataCallback) fcb;
863 req = (struct FragmentGetRequest *) &op[1];
864 op->msg = (struct GNUNET_MessageHeader *) req;
865 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
866 req->header.size = htons (sizeof (*req));
867 req->channel_key = *channel_key;
868 req->fragment_id = GNUNET_htonll (fragment_id);
870 op->op_id = get_next_op_id (h);
871 req->op_id = htonl (op->op_id);
873 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
881 * Retrieve all fragments of a message.
883 * @param h Handle for the PSYCstore.
884 * @param channel_key The channel we are interested in.
885 * @param message_id Message ID to check. Use 0 to get the latest message.
886 * @param fcb Callback to call with the retrieved fragments.
887 * @param rcb Callback to call with the result of the operation.
888 * @param cls Closure for the callbacks.
890 * @return Handle that can be used to cancel the operation.
892 struct GNUNET_PSYCSTORE_OperationHandle *
893 GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
894 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
896 GNUNET_PSYCSTORE_FragmentCallback fcb,
897 GNUNET_PSYCSTORE_ResultCallback rcb,
900 struct MessageGetRequest *req;
901 struct GNUNET_PSYCSTORE_OperationHandle *op
902 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
904 op->data_cb = (DataCallback) fcb;
908 req = (struct MessageGetRequest *) &op[1];
909 op->msg = (struct GNUNET_MessageHeader *) req;
910 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
911 req->header.size = htons (sizeof (*req));
912 req->channel_key = *channel_key;
913 req->message_id = GNUNET_htonll (message_id);
915 op->op_id = get_next_op_id (h);
916 req->op_id = htonl (op->op_id);
918 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
926 * Retrieve a fragment of message specified by its message ID and fragment
929 * @param h Handle for the PSYCstore.
930 * @param channel_key The channel we are interested in.
931 * @param message_id Message ID to check. Use 0 to get the latest message.
932 * @param fragment_offset Offset of the fragment to retrieve.
933 * @param fcb Callback to call with the retrieved fragments.
934 * @param rcb Callback to call with the result of the operation.
935 * @param cls Closure for the callbacks.
937 * @return Handle that can be used to cancel the operation.
939 struct GNUNET_PSYCSTORE_OperationHandle *
940 GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
941 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
943 uint64_t fragment_offset,
944 GNUNET_PSYCSTORE_FragmentCallback fcb,
945 GNUNET_PSYCSTORE_ResultCallback rcb,
948 struct MessageGetFragmentRequest *req;
949 struct GNUNET_PSYCSTORE_OperationHandle *op
950 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
952 op->data_cb = (DataCallback) fcb;
956 req = (struct MessageGetFragmentRequest *) &op[1];
957 op->msg = (struct GNUNET_MessageHeader *) req;
958 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT);
959 req->header.size = htons (sizeof (*req));
960 req->channel_key = *channel_key;
961 req->message_id = GNUNET_htonll (message_id);
962 req->fragment_offset = GNUNET_htonll (fragment_offset);
964 op->op_id = get_next_op_id (h);
965 req->op_id = htonl (op->op_id);
967 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
975 * Retrieve latest values of counters for a channel master.
977 * The current value of counters are needed when a channel master is restarted,
978 * so that it can continue incrementing the counters from their last value.
980 * @param h Handle for the PSYCstore.
981 * @param channel_key Public key that identifies the channel.
982 * @param mccb Callback to call with the result.
983 * @param mccb_cls Closure for the callback.
985 * @return Handle that can be used to cancel the operation.
987 struct GNUNET_PSYCSTORE_OperationHandle *
988 GNUNET_PSYCSTORE_counters_get_master (struct GNUNET_PSYCSTORE_Handle *h,
989 struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
990 GNUNET_PSYCSTORE_MasterCountersCallback mccb,
993 struct OperationRequest *req;
994 struct GNUNET_PSYCSTORE_OperationHandle *op
995 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1000 req = (struct OperationRequest *) &op[1];
1001 op->msg = (struct GNUNET_MessageHeader *) req;
1002 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_MASTER);
1003 req->header.size = htons (sizeof (*req));
1004 req->channel_key = *channel_key;
1006 op->op_id = get_next_op_id (h);
1007 req->op_id = htonl (op->op_id);
1009 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1018 * Retrieve latest values of counters for a channel slave.
1020 * The current value of counters are needed when a channel slave rejoins
1021 * and starts the state synchronization process.
1023 * @param h Handle for the PSYCstore.
1024 * @param channel_key Public key that identifies the channel.
1025 * @param sccb Callback to call with the result.
1026 * @param sccb_cls Closure for the callback.
1028 * @return Handle that can be used to cancel the operation.
1030 struct GNUNET_PSYCSTORE_OperationHandle *
1031 GNUNET_PSYCSTORE_counters_get_slave (struct GNUNET_PSYCSTORE_Handle *h,
1032 struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1033 GNUNET_PSYCSTORE_SlaveCountersCallback sccb,
1036 struct OperationRequest *req;
1037 struct GNUNET_PSYCSTORE_OperationHandle *op
1038 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1043 req = (struct OperationRequest *) &op[1];
1044 op->msg = (struct GNUNET_MessageHeader *) req;
1045 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_SLAVE);
1046 req->header.size = htons (sizeof (*req));
1047 req->channel_key = *channel_key;
1049 op->op_id = get_next_op_id (h);
1050 req->op_id = htonl (op->op_id);
1052 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1060 * Apply modifiers of a message to the current channel state.
1062 * An error is returned if there are missing messages containing state
1063 * operations before the current one.
1065 * @param h Handle for the PSYCstore.
1066 * @param channel_key The channel we are interested in.
1067 * @param message_id ID of the message that contains the @a modifiers.
1068 * @param state_delta Value of the _state_delta PSYC header variable of the message.
1069 * @param modifier_count Number of elements in the @a modifiers array.
1070 * @param modifiers List of modifiers to apply.
1071 * @param rcb Callback to call with the result of the operation.
1072 * @param rcb_cls Closure for the callback.
1074 * @return Handle that can be used to cancel the operation.
1076 struct GNUNET_PSYCSTORE_OperationHandle *
1077 GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1078 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1079 uint64_t message_id,
1080 uint64_t state_delta,
1081 size_t modifier_count,
1082 const struct GNUNET_ENV_Modifier *modifiers,
1083 GNUNET_PSYCSTORE_ResultCallback rcb,
1086 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1089 for (i = 0; i < modifier_count; i++) {
1090 struct StateModifyRequest *req;
1091 uint16_t name_size = strlen (modifiers[i].name) + 1;
1093 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
1094 modifiers[i].value_size);
1099 req = (struct StateModifyRequest *) &op[1];
1100 op->msg = (struct GNUNET_MessageHeader *) req;
1101 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
1102 req->header.size = htons (sizeof (*req) + name_size
1103 + modifiers[i].value_size);
1104 req->channel_key = *channel_key;
1105 req->message_id = GNUNET_htonll (message_id);
1106 req->state_delta = GNUNET_htonll (state_delta);
1107 req->oper = modifiers[i].oper;
1108 req->name_size = htons (name_size);
1112 : modifier_count - 1 == i
1116 memcpy (&req[1], modifiers[i].name, name_size);
1117 memcpy ((void *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1119 op->op_id = get_next_op_id (h);
1120 req->op_id = htonl (op->op_id);
1122 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1126 /* FIXME: only the last operation is returned,
1127 * operation_cancel() should be able to cancel all of them.
1133 * Store synchronized state.
1135 * @param h Handle for the PSYCstore.
1136 * @param channel_key The channel we are interested in.
1137 * @param message_id ID of the message that contains the state_hash PSYC header variable.
1138 * @param modifier_count Number of elements in the @a modifiers array.
1139 * @param modifiers Full state to store.
1140 * @param rcb Callback to call with the result of the operation.
1141 * @param rcb_cls Closure for the callback.
1143 * @return Handle that can be used to cancel the operation.
1145 struct GNUNET_PSYCSTORE_OperationHandle *
1146 GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1147 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1148 uint64_t message_id,
1149 size_t modifier_count,
1150 const struct GNUNET_ENV_Modifier *modifiers,
1151 GNUNET_PSYCSTORE_ResultCallback rcb,
1154 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1157 for (i = 0; i < modifier_count; i++) {
1158 struct StateSyncRequest *req;
1159 uint16_t name_size = strlen (modifiers[i].name) + 1;
1161 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
1162 modifiers[i].value_size);
1167 req = (struct StateSyncRequest *) &op[1];
1168 op->msg = (struct GNUNET_MessageHeader *) req;
1169 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
1170 req->header.size = htons (sizeof (*req) + name_size
1171 + modifiers[i].value_size);
1172 req->channel_key = *channel_key;
1173 req->message_id = GNUNET_htonll (message_id);
1174 req->name_size = htons (name_size);
1178 : modifier_count - 1 == i
1182 memcpy (&req[1], modifiers[i].name, name_size);
1183 memcpy ((void *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1185 op->op_id = get_next_op_id (h);
1186 req->op_id = htonl (op->op_id);
1188 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1196 * Reset the state of a channel.
1198 * Delete all state variables stored for the given channel.
1200 * @param h Handle for the PSYCstore.
1201 * @param channel_key The channel we are interested in.
1202 * @param rcb Callback to call with the result of the operation.
1203 * @param rcb_cls Closure for the callback.
1205 * @return Handle that can be used to cancel the operation.
1207 struct GNUNET_PSYCSTORE_OperationHandle *
1208 GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
1209 const struct GNUNET_CRYPTO_EccPublicSignKey
1211 GNUNET_PSYCSTORE_ResultCallback rcb,
1214 struct OperationRequest *req;
1215 struct GNUNET_PSYCSTORE_OperationHandle *op
1216 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1221 req = (struct OperationRequest *) &op[1];
1222 op->msg = (struct GNUNET_MessageHeader *) req;
1223 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1224 req->header.size = htons (sizeof (*req));
1225 req->channel_key = *channel_key;
1227 op->op_id = get_next_op_id (h);
1228 req->op_id = htonl (op->op_id);
1230 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1239 * Update signed values of state variables in the state store.
1241 * @param h Handle for the PSYCstore.
1242 * @param channel_key The channel we are interested in.
1243 * @param message_id Message ID that contained the state @a hash.
1244 * @param hash Hash of the serialized full state.
1245 * @param rcb Callback to call with the result of the operation.
1246 * @param rcb_cls Closure for the callback.
1249 struct GNUNET_PSYCSTORE_OperationHandle *
1250 GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
1251 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1252 uint64_t message_id,
1253 const struct GNUNET_HashCode *hash,
1254 GNUNET_PSYCSTORE_ResultCallback rcb,
1257 struct StateHashUpdateRequest *req;
1258 struct GNUNET_PSYCSTORE_OperationHandle *op
1259 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1264 req = (struct StateHashUpdateRequest *) &op[1];
1265 op->msg = (struct GNUNET_MessageHeader *) req;
1266 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1267 req->header.size = htons (sizeof (*req));
1268 req->channel_key = *channel_key;
1271 op->op_id = get_next_op_id (h);
1272 req->op_id = htonl (op->op_id);
1274 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1282 * Retrieve the best matching state variable.
1284 * @param h Handle for the PSYCstore.
1285 * @param channel_key The channel we are interested in.
1286 * @param name Name of variable to match, the returned variable might be less specific.
1287 * @param scb Callback to return the matching state variable.
1288 * @param rcb Callback to call with the result of the operation.
1289 * @param cls Closure for the callbacks.
1291 * @return Handle that can be used to cancel the operation.
1293 struct GNUNET_PSYCSTORE_OperationHandle *
1294 GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
1295 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1297 GNUNET_PSYCSTORE_StateCallback scb,
1298 GNUNET_PSYCSTORE_ResultCallback rcb,
1301 size_t name_size = strlen (name) + 1;
1302 struct OperationRequest *req;
1303 struct GNUNET_PSYCSTORE_OperationHandle *op
1304 = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1306 op->data_cb = (DataCallback) scb;
1310 req = (struct OperationRequest *) &op[1];
1311 op->msg = (struct GNUNET_MessageHeader *) req;
1312 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET);
1313 req->header.size = htons (sizeof (*req) + name_size);
1314 req->channel_key = *channel_key;
1315 memcpy (&req[1], name, name_size);
1317 op->op_id = get_next_op_id (h);
1318 req->op_id = htonl (op->op_id);
1320 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1329 * Retrieve all state variables for a channel with the given prefix.
1331 * @param h Handle for the PSYCstore.
1332 * @param channel_key The channel we are interested in.
1333 * @param name_prefix Prefix of state variable names to match.
1334 * @param scb Callback to return matching state variables.
1335 * @param rcb Callback to call with the result of the operation.
1336 * @param cls Closure for the callbacks.
1338 * @return Handle that can be used to cancel the operation.
1340 struct GNUNET_PSYCSTORE_OperationHandle *
1341 GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
1342 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1343 const char *name_prefix,
1344 GNUNET_PSYCSTORE_StateCallback scb,
1345 GNUNET_PSYCSTORE_ResultCallback rcb,
1348 size_t name_size = strlen (name_prefix) + 1;
1349 struct OperationRequest *req;
1350 struct GNUNET_PSYCSTORE_OperationHandle *op
1351 = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1353 op->data_cb = (DataCallback) scb;
1357 req = (struct OperationRequest *) &op[1];
1358 op->msg = (struct GNUNET_MessageHeader *) req;
1359 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX);
1360 req->header.size = htons (sizeof (*req) + name_size);
1361 req->channel_key = *channel_key;
1362 memcpy (&req[1], name_prefix, name_size);
1364 op->op_id = get_next_op_id (h);
1365 req->op_id = htonl (op->op_id);
1367 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1373 /* end of psycstore_api.c */