/*
 * This file is part of GNUnet
 * (C) 2013 Christian Grothoff (and other contributing authors)
 *
 * GNUnet is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published
 * by the Free Software Foundation; either version 3, or (at your
 * option) any later version.
 *
 * GNUnet is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with GNUnet; see the file COPYING.  If not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

/**
 * @file psycstore/psycstore_api.c
 * @brief API to interact with the PSYCstore service
 * @author Gabor X Toth
 * @author Christian Grothoff
 */
29 #include "gnunet_util_lib.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_psycstore_service.h"
33 #include "gnunet_multicast_service.h"
34 #include "psycstore.h"
/**
 * Log a message under the "psycstore-api" component.
 */
#define LOG(kind, ...) GNUNET_log_from (kind, "psycstore-api", __VA_ARGS__)

/**
 * Generic storage type for the data callbacks of an operation.
 *
 * The parameter list is deliberately left unspecified: the pointer is cast
 * back to the concrete callback type before it is invoked.
 */
typedef void (*DataCallback) ();
41 * Handle for an operation with the PSYCstore service.
43 struct GNUNET_PSYCSTORE_OperationHandle
47 * Main PSYCstore handle.
49 struct GNUNET_PSYCSTORE_Handle *h;
52 * We keep operations in a DLL.
54 struct GNUNET_PSYCSTORE_OperationHandle *next;
57 * We keep operations in a DLL.
59 struct GNUNET_PSYCSTORE_OperationHandle *prev;
62 * Continuation to invoke with the result of an operation.
64 GNUNET_PSYCSTORE_ResultCallback res_cb;
67 * Continuation to invoke with the result of an operation returning data.
72 * Closure for the callbacks.
82 * Message to send to the PSYCstore service.
83 * Allocated at the end of this struct.
85 const struct GNUNET_MessageHeader *msg;
90 * Handle for the service.
92 struct GNUNET_PSYCSTORE_Handle
95 * Configuration to use.
97 const struct GNUNET_CONFIGURATION_Handle *cfg;
100 * Socket (if available).
102 struct GNUNET_CLIENT_Connection *client;
105 * Head of operations to transmit.
107 struct GNUNET_PSYCSTORE_OperationHandle *transmit_head;
110 * Tail of operations to transmit.
112 struct GNUNET_PSYCSTORE_OperationHandle *transmit_tail;
115 * Head of active operations waiting for response.
117 struct GNUNET_PSYCSTORE_OperationHandle *op_head;
120 * Tail of active operations waiting for response.
122 struct GNUNET_PSYCSTORE_OperationHandle *op_tail;
125 * Currently pending transmission request, or NULL for none.
127 struct GNUNET_CLIENT_TransmitHandle *th;
130 * Task doing exponential back-off trying to reconnect.
132 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
135 * Time for next connect retry.
137 struct GNUNET_TIME_Relative reconnect_delay;
140 * Are we polling for incoming messages right now?
145 * The last operation id used for a PSYCstore operation.
147 uint32_t last_op_id_used;
153 * Get a fresh operation ID to distinguish between PSYCstore requests.
155 * @param h Handle to the PSYCstore service.
156 * @return next operation id to use
159 get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h)
161 return h->last_op_id_used++;
166 * Find operation by ID.
168 * @return OperationHandle if found, or NULL otherwise.
170 static struct GNUNET_PSYCSTORE_OperationHandle *
171 find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint32_t op_id)
173 struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head;
176 if (op->op_id == op_id)
/**
 * Try again to connect to the PSYCstore service.
 * Forward declaration; scheduled as a task by reschedule_connect().
 *
 * @param cls Handle to the PSYCstore service.
 * @param tc Scheduler context.
 */
static void
reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
195 * Reschedule a connect attempt to the service.
197 * @param h transport service to reconnect
200 reschedule_connect (struct GNUNET_PSYCSTORE_Handle *h)
202 GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
206 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
209 if (NULL != h->client)
211 GNUNET_CLIENT_disconnect (h->client);
214 h->in_receive = GNUNET_NO;
215 LOG (GNUNET_ERROR_TYPE_DEBUG,
216 "Scheduling task to reconnect to PSYCstore service in %s.\n",
217 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
219 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
220 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
/**
 * Schedule transmission of the next message from our queue.
 * Forward declaration; the definition follows send_next_message().
 *
 * @param h PSYCstore handle.
 */
static void
transmit_next (struct GNUNET_PSYCSTORE_Handle *h);
234 * Type of a function to call when we receive a message
238 * @param msg message received, NULL on timeout or fatal error
241 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
243 struct GNUNET_PSYCSTORE_Handle *h = cls;
244 struct GNUNET_PSYCSTORE_OperationHandle *op;
245 const struct OperationResult *opres;
246 const struct CountersResult *cres;
247 const struct FragmentResult *fres;
248 const struct StateResult *sres;
253 reschedule_connect (h);
256 LOG (GNUNET_ERROR_TYPE_DEBUG,
257 "Received message of type %d from PSYCstore service.\n",
259 uint16_t size = ntohs (msg->size);
260 uint16_t type = ntohs (msg->type);
263 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE:
264 if (size < sizeof (struct OperationResult))
266 LOG (GNUNET_ERROR_TYPE_ERROR,
267 "Received message of type %d with length %lu bytes. "
269 type, size, sizeof (struct OperationResult));
271 reschedule_connect (h);
275 opres = (const struct OperationResult *) msg;
276 str = (const char *) &opres[1];
277 if ( (size > sizeof (struct OperationResult)) &&
278 ('\0' != str[size - sizeof (struct OperationResult) - 1]) )
281 reschedule_connect (h);
284 if (size == sizeof (struct OperationResult))
287 op = find_op_by_id (h, ntohl (opres->op_id));
290 LOG (GNUNET_ERROR_TYPE_DEBUG,
291 "No callback registered for operation with ID %ld.\n",
292 type, ntohl (opres->op_id));
296 LOG (GNUNET_ERROR_TYPE_DEBUG,
297 "Received result message (type %d) with operation ID: %ld\n",
300 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
301 if (NULL != op->res_cb)
303 const struct StateModifyRequest *smreq;
304 const struct StateSyncRequest *ssreq;
305 switch (ntohs (op->msg->type))
307 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY:
308 smreq = (const struct StateModifyRequest *) op->msg;
309 if (!(smreq->flags & STATE_OP_LAST
310 || GNUNET_OK != ntohl (opres->result_code)))
313 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC:
314 ssreq = (const struct StateSyncRequest *) op->msg;
315 if (!(ssreq->flags & STATE_OP_LAST
316 || GNUNET_OK != ntohl (opres->result_code)))
321 if (NULL != op->res_cb)
322 op->res_cb (op->cls, ntohl (opres->result_code), str);
327 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS:
328 if (size != sizeof (struct CountersResult))
330 LOG (GNUNET_ERROR_TYPE_ERROR,
331 "Received message of type %d with length %lu bytes. "
333 type, size, sizeof (struct CountersResult));
335 reschedule_connect (h);
339 cres = (const struct CountersResult *) msg;
341 op = find_op_by_id (h, ntohl (cres->op_id));
344 LOG (GNUNET_ERROR_TYPE_DEBUG,
345 "No callback registered for operation with ID %ld.\n",
346 type, ntohl (cres->op_id));
350 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
351 if (NULL != op->data_cb)
352 ((GNUNET_PSYCSTORE_CountersCallback)
353 op->data_cb) (op->cls, ntohl (cres->result_code),
354 GNUNET_ntohll (cres->max_fragment_id),
355 GNUNET_ntohll (cres->max_message_id),
356 GNUNET_ntohll (cres->max_group_generation),
357 GNUNET_ntohll (cres->max_state_message_id));
362 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT:
363 if (size < sizeof (struct FragmentResult))
365 LOG (GNUNET_ERROR_TYPE_ERROR,
366 "Received message of type %d with length %lu bytes. "
368 type, size, sizeof (struct FragmentResult));
370 reschedule_connect (h);
374 fres = (const struct FragmentResult *) msg;
375 struct GNUNET_MULTICAST_MessageHeader *mmsg =
376 (struct GNUNET_MULTICAST_MessageHeader *) &fres[1];
377 if (size != sizeof (struct FragmentResult) + ntohs (mmsg->header.size))
379 LOG (GNUNET_ERROR_TYPE_ERROR,
380 "Received message of type %d with length %lu bytes. "
383 sizeof (struct FragmentResult) + ntohs (mmsg->header.size));
385 reschedule_connect (h);
389 op = find_op_by_id (h, ntohl (fres->op_id));
392 LOG (GNUNET_ERROR_TYPE_DEBUG,
393 "No callback registered for operation with ID %ld.\n",
394 type, ntohl (fres->op_id));
398 if (NULL != op->data_cb)
399 ((GNUNET_PSYCSTORE_FragmentCallback)
400 op->data_cb) (op->cls, mmsg, ntohl (fres->psycstore_flags));
404 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE:
405 if (size < sizeof (struct StateResult))
407 LOG (GNUNET_ERROR_TYPE_ERROR,
408 "Received message of type %d with length %lu bytes. "
410 type, size, sizeof (struct StateResult));
412 reschedule_connect (h);
416 sres = (const struct StateResult *) msg;
417 const char *name = (const char *) &sres[1];
418 uint16_t name_size = ntohs (sres->name_size);
420 if (name_size <= 2 || '\0' != name[name_size - 1])
422 LOG (GNUNET_ERROR_TYPE_ERROR,
423 "Received state result message (type %d) with invalid name.\n",
426 reschedule_connect (h);
430 op = find_op_by_id (h, ntohl (sres->op_id));
433 LOG (GNUNET_ERROR_TYPE_DEBUG,
434 "No callback registered for operation with ID %ld.\n",
435 type, ntohl (sres->op_id));
439 if (NULL != op->data_cb)
440 ((GNUNET_PSYCSTORE_StateCallback)
441 op->data_cb) (op->cls, name, (char *) &sres[1] + name_size,
442 ntohs (sres->header.size) - sizeof (*sres) - name_size);
448 reschedule_connect (h);
452 GNUNET_CLIENT_receive (h->client, &message_handler, h,
453 GNUNET_TIME_UNIT_FOREVER_REL);
458 * Transmit next message to service.
460 * @param cls The 'struct GNUNET_PSYCSTORE_Handle'.
461 * @param size Number of bytes available in buf.
462 * @param buf Where to copy the message.
463 * @return Number of bytes copied to buf.
466 send_next_message (void *cls, size_t size, void *buf)
468 struct GNUNET_PSYCSTORE_Handle *h = cls;
469 struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
475 ret = ntohs (op->msg->size);
478 reschedule_connect (h);
481 LOG (GNUNET_ERROR_TYPE_DEBUG,
482 "Sending message of type %d to PSYCstore service. ID: %u\n",
483 ntohs (op->msg->type), op->op_id);
484 memcpy (buf, op->msg, ret);
486 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
488 if (NULL == op->res_cb && NULL == op->data_cb)
494 GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, op);
497 if (NULL != h->transmit_head)
500 if (GNUNET_NO == h->in_receive)
502 h->in_receive = GNUNET_YES;
503 GNUNET_CLIENT_receive (h->client, &message_handler, h,
504 GNUNET_TIME_UNIT_FOREVER_REL);
511 * Schedule transmission of the next message from our queue.
513 * @param h PSYCstore handle.
516 transmit_next (struct GNUNET_PSYCSTORE_Handle *h)
518 if (NULL != h->th || NULL == h->client)
521 struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
525 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
526 ntohs (op->msg->size),
527 GNUNET_TIME_UNIT_FOREVER_REL,
535 * Try again to connect to the PSYCstore service.
537 * @param cls Handle to the PSYCstore service.
538 * @param tc Scheduler context.
541 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
543 struct GNUNET_PSYCSTORE_Handle *h = cls;
545 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
546 LOG (GNUNET_ERROR_TYPE_DEBUG,
547 "Connecting to PSYCstore service.\n");
548 GNUNET_assert (NULL == h->client);
549 h->client = GNUNET_CLIENT_connect ("psycstore", h->cfg);
550 GNUNET_assert (NULL != h->client);
556 * Connect to the PSYCstore service.
558 * @param cfg The configuration to use
559 * @return Handle to use
561 struct GNUNET_PSYCSTORE_Handle *
562 GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
564 struct GNUNET_PSYCSTORE_Handle *h
565 = GNUNET_new (struct GNUNET_PSYCSTORE_Handle);
567 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
568 h->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, h);
574 * Disconnect from PSYCstore service
576 * @param h Handle to destroy
579 GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h)
581 GNUNET_assert (NULL != h);
582 if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
584 GNUNET_SCHEDULER_cancel (h->reconnect_task);
585 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
589 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
592 if (NULL != h->client)
594 GNUNET_CLIENT_disconnect (h->client);
602 * Cancel a PSYCstore operation. Note that the operation MAY still
603 * be executed; this merely cancels the continuation; if the request
604 * was already transmitted, the service may still choose to complete
607 * @param op Operation to cancel.
610 GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op)
612 struct GNUNET_PSYCSTORE_Handle *h = op->h;
614 if (h->transmit_head != NULL && (h->transmit_head != op || NULL == h->client))
616 /* request not active, can simply remove */
617 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
623 /* request active but not yet with service, can still abort */
624 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
626 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
631 /* request active with service, simply ensure continuations are not called */
638 * Store join/leave events for a PSYC channel in order to be able to answer
639 * membership test queries later.
641 * @param h Handle for the PSYCstore.
642 * @param channel_key The channel where the event happened.
643 * @param slave_key Public key of joining/leaving slave.
644 * @param did_join #GNUNET_YES on join, #GNUNET_NO on part.
645 * @param announced_at ID of the message that announced the membership change.
646 * @param effective_since Message ID this membership change is in effect since.
647 * For joins it is <= announced_at, for parts it is always 0.
648 * @param group_generation In case of a part, the last group generation the
649 * slave has access to. It has relevance when a larger message have
650 * fragments with different group generations.
651 * @param rcb Callback to call with the result of the storage operation.
652 * @param rcb_cls Closure for the callback.
654 * @return Operation handle that can be used to cancel the operation.
656 struct GNUNET_PSYCSTORE_OperationHandle *
657 GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
658 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
659 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
661 uint64_t announced_at,
662 uint64_t effective_since,
663 uint64_t group_generation,
664 GNUNET_PSYCSTORE_ResultCallback rcb,
667 GNUNET_assert (NULL != h);
668 GNUNET_assert (NULL != channel_key);
669 GNUNET_assert (NULL != slave_key);
670 GNUNET_assert (did_join
671 ? effective_since <= announced_at
672 : effective_since == 0);
674 struct MembershipStoreRequest *req;
675 struct GNUNET_PSYCSTORE_OperationHandle *op
676 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
681 req = (struct MembershipStoreRequest *) &op[1];
682 op->msg = (struct GNUNET_MessageHeader *) req;
683 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE);
684 req->header.size = htons (sizeof (*req));
685 req->channel_key = *channel_key;
686 req->slave_key = *slave_key;
687 req->did_join = htonl (did_join);
688 req->announced_at = GNUNET_htonll (announced_at);
689 req->effective_since = GNUNET_htonll (effective_since);
690 req->group_generation = GNUNET_htonll (group_generation);
692 op->op_id = get_next_op_id (h);
693 req->op_id = htonl (op->op_id);
695 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
703 * Test if a member was admitted to the channel at the given message ID.
705 * This is useful when relaying and replaying messages to check if a particular
706 * slave has access to the message fragment with a given group generation. It
707 * is also used when handling join requests to determine whether the slave is
708 * currently admitted to the channel.
710 * @param h Handle for the PSYCstore.
711 * @param channel_key The channel we are interested in.
712 * @param slave_key Public key of slave whose membership to check.
713 * @param message_id Message ID for which to do the membership test.
714 * @param group_generation Group generation of the fragment of the message to
715 * test. It has relevance if the message consists of multiple fragments
716 * with different group generations.
717 * @param rcb Callback to call with the test result.
718 * @param rcb_cls Closure for the callback.
720 * @return Operation handle that can be used to cancel the operation.
722 struct GNUNET_PSYCSTORE_OperationHandle *
723 GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
724 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
725 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
727 uint64_t group_generation,
728 GNUNET_PSYCSTORE_ResultCallback rcb,
731 struct MembershipTestRequest *req;
732 struct GNUNET_PSYCSTORE_OperationHandle *op
733 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
738 req = (struct MembershipTestRequest *) &op[1];
739 op->msg = (struct GNUNET_MessageHeader *) req;
740 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST);
741 req->header.size = htons (sizeof (*req));
742 req->channel_key = *channel_key;
743 req->slave_key = *slave_key;
744 req->message_id = GNUNET_htonll (message_id);
745 req->group_generation = GNUNET_htonll (group_generation);
747 op->op_id = get_next_op_id (h);
748 req->op_id = htonl (op->op_id);
750 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
758 * Store a message fragment sent to a channel.
760 * @param h Handle for the PSYCstore.
761 * @param channel_key The channel the message belongs to.
762 * @param message Message to store.
763 * @param psycstore_flags Flags indicating whether the PSYC message contains
765 * @param rcb Callback to call with the result of the operation.
766 * @param rcb_cls Closure for the callback.
768 * @return Handle that can be used to cancel the operation.
770 struct GNUNET_PSYCSTORE_OperationHandle *
771 GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
772 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
773 const struct GNUNET_MULTICAST_MessageHeader *message,
774 uint32_t psycstore_flags,
775 GNUNET_PSYCSTORE_ResultCallback rcb,
778 uint16_t size = ntohs (message->header.size);
779 struct FragmentStoreRequest *req;
780 struct GNUNET_PSYCSTORE_OperationHandle *op
781 = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size);
786 req = (struct FragmentStoreRequest *) &op[1];
787 op->msg = (struct GNUNET_MessageHeader *) req;
788 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE);
789 req->header.size = htons (sizeof (*req) + size);
790 req->channel_key = *channel_key;
791 req->psycstore_flags = htonl (psycstore_flags);
792 memcpy (&req[1], message, size);
794 op->op_id = get_next_op_id (h);
795 req->op_id = htonl (op->op_id);
797 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
805 * Retrieve a message fragment by fragment ID.
807 * @param h Handle for the PSYCstore.
808 * @param channel_key The channel we are interested in.
809 * @param fragment_id Fragment ID to check. Use 0 to get the latest message fragment.
810 * @param fcb Callback to call with the retrieved fragments.
811 * @param rcb Callback to call with the result of the operation.
812 * @param cls Closure for the callbacks.
814 * @return Handle that can be used to cancel the operation.
816 struct GNUNET_PSYCSTORE_OperationHandle *
817 GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
818 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
819 uint64_t fragment_id,
820 GNUNET_PSYCSTORE_FragmentCallback fcb,
821 GNUNET_PSYCSTORE_ResultCallback rcb,
824 struct FragmentGetRequest *req;
825 struct GNUNET_PSYCSTORE_OperationHandle *op
826 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
828 op->data_cb = (DataCallback) fcb;
832 req = (struct FragmentGetRequest *) &op[1];
833 op->msg = (struct GNUNET_MessageHeader *) req;
834 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
835 req->header.size = htons (sizeof (*req));
836 req->channel_key = *channel_key;
837 req->fragment_id = GNUNET_htonll (fragment_id);
839 op->op_id = get_next_op_id (h);
840 req->op_id = htonl (op->op_id);
842 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
850 * Retrieve all fragments of a message.
852 * @param h Handle for the PSYCstore.
853 * @param channel_key The channel we are interested in.
854 * @param message_id Message ID to check. Use 0 to get the latest message.
855 * @param fcb Callback to call with the retrieved fragments.
856 * @param rcb Callback to call with the result of the operation.
857 * @param cls Closure for the callbacks.
859 * @return Handle that can be used to cancel the operation.
861 struct GNUNET_PSYCSTORE_OperationHandle *
862 GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
863 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
865 GNUNET_PSYCSTORE_FragmentCallback fcb,
866 GNUNET_PSYCSTORE_ResultCallback rcb,
869 struct MessageGetRequest *req;
870 struct GNUNET_PSYCSTORE_OperationHandle *op
871 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
873 op->data_cb = (DataCallback) fcb;
877 req = (struct MessageGetRequest *) &op[1];
878 op->msg = (struct GNUNET_MessageHeader *) req;
879 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
880 req->header.size = htons (sizeof (*req));
881 req->channel_key = *channel_key;
882 req->message_id = GNUNET_htonll (message_id);
884 op->op_id = get_next_op_id (h);
885 req->op_id = htonl (op->op_id);
887 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
895 * Retrieve a fragment of message specified by its message ID and fragment
898 * @param h Handle for the PSYCstore.
899 * @param channel_key The channel we are interested in.
900 * @param message_id Message ID to check. Use 0 to get the latest message.
901 * @param fragment_offset Offset of the fragment to retrieve.
902 * @param fcb Callback to call with the retrieved fragments.
903 * @param rcb Callback to call with the result of the operation.
904 * @param cls Closure for the callbacks.
906 * @return Handle that can be used to cancel the operation.
908 struct GNUNET_PSYCSTORE_OperationHandle *
909 GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
910 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
912 uint64_t fragment_offset,
913 GNUNET_PSYCSTORE_FragmentCallback fcb,
914 GNUNET_PSYCSTORE_ResultCallback rcb,
917 struct MessageGetFragmentRequest *req;
918 struct GNUNET_PSYCSTORE_OperationHandle *op
919 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
921 op->data_cb = (DataCallback) fcb;
925 req = (struct MessageGetFragmentRequest *) &op[1];
926 op->msg = (struct GNUNET_MessageHeader *) req;
927 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT);
928 req->header.size = htons (sizeof (*req));
929 req->channel_key = *channel_key;
930 req->message_id = GNUNET_htonll (message_id);
931 req->fragment_offset = GNUNET_htonll (fragment_offset);
933 op->op_id = get_next_op_id (h);
934 req->op_id = htonl (op->op_id);
936 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
944 * Retrieve latest values of counters for a channel master.
946 * The current value of counters are needed when a channel master is restarted,
947 * so that it can continue incrementing the counters from their last value.
949 * @param h Handle for the PSYCstore.
950 * @param channel_key Public key that identifies the channel.
951 * @param ccb Callback to call with the result.
952 * @param ccb_cls Closure for the @a ccb callback.
954 * @return Handle that can be used to cancel the operation.
956 struct GNUNET_PSYCSTORE_OperationHandle *
957 GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
958 struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
959 GNUNET_PSYCSTORE_CountersCallback ccb,
962 struct OperationRequest *req;
963 struct GNUNET_PSYCSTORE_OperationHandle *op
964 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
969 req = (struct OperationRequest *) &op[1];
970 op->msg = (struct GNUNET_MessageHeader *) req;
971 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET);
972 req->header.size = htons (sizeof (*req));
973 req->channel_key = *channel_key;
975 op->op_id = get_next_op_id (h);
976 req->op_id = htonl (op->op_id);
978 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
986 * Apply modifiers of a message to the current channel state.
988 * An error is returned if there are missing messages containing state
989 * operations before the current one.
991 * @param h Handle for the PSYCstore.
992 * @param channel_key The channel we are interested in.
993 * @param message_id ID of the message that contains the @a modifiers.
994 * @param state_delta Value of the _state_delta PSYC header variable of the message.
995 * @param modifier_count Number of elements in the @a modifiers array.
996 * @param modifiers List of modifiers to apply.
997 * @param rcb Callback to call with the result of the operation.
998 * @param rcb_cls Closure for the @a rcb callback.
1000 * @return Handle that can be used to cancel the operation.
1002 struct GNUNET_PSYCSTORE_OperationHandle *
1003 GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1004 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1005 uint64_t message_id,
1006 uint64_t state_delta,
1007 size_t modifier_count,
1008 const struct GNUNET_ENV_Modifier *modifiers,
1009 GNUNET_PSYCSTORE_ResultCallback rcb,
1012 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1015 for (i = 0; i < modifier_count; i++) {
1016 struct StateModifyRequest *req;
1017 uint16_t name_size = strlen (modifiers[i].name) + 1;
1019 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
1020 modifiers[i].value_size);
1025 req = (struct StateModifyRequest *) &op[1];
1026 op->msg = (struct GNUNET_MessageHeader *) req;
1027 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
1028 req->header.size = htons (sizeof (*req) + name_size
1029 + modifiers[i].value_size);
1030 req->channel_key = *channel_key;
1031 req->message_id = GNUNET_htonll (message_id);
1032 req->state_delta = GNUNET_htonll (state_delta);
1033 req->oper = modifiers[i].oper;
1034 req->name_size = htons (name_size);
1038 : modifier_count - 1 == i
1042 memcpy (&req[1], modifiers[i].name, name_size);
1043 memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1045 op->op_id = get_next_op_id (h);
1046 req->op_id = htonl (op->op_id);
1048 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1052 /* FIXME: only the last operation is returned,
1053 * operation_cancel() should be able to cancel all of them.
1059 * Store synchronized state.
1061 * @param h Handle for the PSYCstore.
1062 * @param channel_key The channel we are interested in.
1063 * @param message_id ID of the message that contains the state_hash PSYC header variable.
1064 * @param modifier_count Number of elements in the @a modifiers array.
1065 * @param modifiers Full state to store.
1066 * @param rcb Callback to call with the result of the operation.
1067 * @param rcb_cls Closure for the callback.
1069 * @return Handle that can be used to cancel the operation.
1071 struct GNUNET_PSYCSTORE_OperationHandle *
1072 GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1073 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1074 uint64_t message_id,
1075 size_t modifier_count,
1076 const struct GNUNET_ENV_Modifier *modifiers,
1077 GNUNET_PSYCSTORE_ResultCallback rcb,
1080 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1083 for (i = 0; i < modifier_count; i++) {
1084 struct StateSyncRequest *req;
1085 uint16_t name_size = strlen (modifiers[i].name) + 1;
1087 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
1088 modifiers[i].value_size);
1093 req = (struct StateSyncRequest *) &op[1];
1094 op->msg = (struct GNUNET_MessageHeader *) req;
1095 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
1096 req->header.size = htons (sizeof (*req) + name_size
1097 + modifiers[i].value_size);
1098 req->channel_key = *channel_key;
1099 req->message_id = GNUNET_htonll (message_id);
1100 req->name_size = htons (name_size);
1104 : modifier_count - 1 == i
1108 memcpy (&req[1], modifiers[i].name, name_size);
1109 memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1111 op->op_id = get_next_op_id (h);
1112 req->op_id = htonl (op->op_id);
1114 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1122 * Reset the state of a channel.
1124 * Delete all state variables stored for the given channel.
1126 * @param h Handle for the PSYCstore.
1127 * @param channel_key The channel we are interested in.
1128 * @param rcb Callback to call with the result of the operation.
1129 * @param rcb_cls Closure for the callback.
1131 * @return Handle that can be used to cancel the operation.
1133 struct GNUNET_PSYCSTORE_OperationHandle *
1134 GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
1135 const struct GNUNET_CRYPTO_EddsaPublicKey
1137 GNUNET_PSYCSTORE_ResultCallback rcb,
1140 struct OperationRequest *req;
1141 struct GNUNET_PSYCSTORE_OperationHandle *op
1142 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1147 req = (struct OperationRequest *) &op[1];
1148 op->msg = (struct GNUNET_MessageHeader *) req;
1149 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1150 req->header.size = htons (sizeof (*req));
1151 req->channel_key = *channel_key;
1153 op->op_id = get_next_op_id (h);
1154 req->op_id = htonl (op->op_id);
1156 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1165 * Update signed values of state variables in the state store.
1167 * @param h Handle for the PSYCstore.
1168 * @param channel_key The channel we are interested in.
1169 * @param message_id Message ID that contained the state @a hash.
1170 * @param hash Hash of the serialized full state.
1171 * @param rcb Callback to call with the result of the operation.
1172 * @param rcb_cls Closure for the callback.
1175 struct GNUNET_PSYCSTORE_OperationHandle *
1176 GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
1177 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1178 uint64_t message_id,
1179 const struct GNUNET_HashCode *hash,
1180 GNUNET_PSYCSTORE_ResultCallback rcb,
1183 struct StateHashUpdateRequest *req;
1184 struct GNUNET_PSYCSTORE_OperationHandle *op
1185 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1190 req = (struct StateHashUpdateRequest *) &op[1];
1191 op->msg = (struct GNUNET_MessageHeader *) req;
1192 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1193 req->header.size = htons (sizeof (*req));
1194 req->channel_key = *channel_key;
1197 op->op_id = get_next_op_id (h);
1198 req->op_id = htonl (op->op_id);
1200 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1208 * Retrieve the best matching state variable.
1210 * @param h Handle for the PSYCstore.
1211 * @param channel_key The channel we are interested in.
1212 * @param name Name of variable to match, the returned variable might be less specific.
1213 * @param scb Callback to return the matching state variable.
1214 * @param rcb Callback to call with the result of the operation.
1215 * @param cls Closure for the callbacks.
1217 * @return Handle that can be used to cancel the operation.
1219 struct GNUNET_PSYCSTORE_OperationHandle *
1220 GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
1221 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1223 GNUNET_PSYCSTORE_StateCallback scb,
1224 GNUNET_PSYCSTORE_ResultCallback rcb,
1227 size_t name_size = strlen (name) + 1;
1228 struct OperationRequest *req;
1229 struct GNUNET_PSYCSTORE_OperationHandle *op
1230 = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1232 op->data_cb = (DataCallback) scb;
1236 req = (struct OperationRequest *) &op[1];
1237 op->msg = (struct GNUNET_MessageHeader *) req;
1238 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET);
1239 req->header.size = htons (sizeof (*req) + name_size);
1240 req->channel_key = *channel_key;
1241 memcpy (&req[1], name, name_size);
1243 op->op_id = get_next_op_id (h);
1244 req->op_id = htonl (op->op_id);
1246 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1255 * Retrieve all state variables for a channel with the given prefix.
1257 * @param h Handle for the PSYCstore.
1258 * @param channel_key The channel we are interested in.
1259 * @param name_prefix Prefix of state variable names to match.
1260 * @param scb Callback to return matching state variables.
1261 * @param rcb Callback to call with the result of the operation.
1262 * @param cls Closure for the callbacks.
1264 * @return Handle that can be used to cancel the operation.
1266 struct GNUNET_PSYCSTORE_OperationHandle *
1267 GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
1268 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1269 const char *name_prefix,
1270 GNUNET_PSYCSTORE_StateCallback scb,
1271 GNUNET_PSYCSTORE_ResultCallback rcb,
1274 size_t name_size = strlen (name_prefix) + 1;
1275 struct OperationRequest *req;
1276 struct GNUNET_PSYCSTORE_OperationHandle *op
1277 = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1279 op->data_cb = (DataCallback) scb;
1283 req = (struct OperationRequest *) &op[1];
1284 op->msg = (struct GNUNET_MessageHeader *) req;
1285 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX);
1286 req->header.size = htons (sizeof (*req) + name_size);
1287 req->channel_key = *channel_key;
1288 memcpy (&req[1], name_prefix, name_size);
1290 op->op_id = get_next_op_id (h);
1291 req->op_id = htonl (op->op_id);
1293 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
/* end of psycstore_api.c */