plugin datastore mysql
[oweals/gnunet.git] / src / psycstore / psycstore_api.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycstore/psycstore_api.c
23  * @brief API to interact with the PSYCstore service
24  * @author Gabor X Toth
25  * @author Christian Grothoff
26  */
27
28 #include <inttypes.h>
29
30 #include "platform.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_psycstore_service.h"
35 #include "gnunet_multicast_service.h"
36 #include "psycstore.h"
37
38 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__)
39
40 typedef void (*DataCallback) ();
41
42 /**
43  * Handle for an operation with the PSYCstore service.
44  */
45 struct GNUNET_PSYCSTORE_OperationHandle
46 {
47
48   /**
49    * Main PSYCstore handle.
50    */
51   struct GNUNET_PSYCSTORE_Handle *h;
52
53   /**
54    * We keep operations in a DLL.
55    */
56   struct GNUNET_PSYCSTORE_OperationHandle *next;
57
58   /**
59    * We keep operations in a DLL.
60    */
61   struct GNUNET_PSYCSTORE_OperationHandle *prev;
62
63   /**
64    * Continuation to invoke with the result of an operation.
65    */
66   GNUNET_PSYCSTORE_ResultCallback res_cb;
67
68   /**
69    * Continuation to invoke with the result of an operation returning data.
70    */
71   DataCallback data_cb;
72
73   /**
74    * Closure for the callbacks.
75    */
76   void *cls;
77
78   /**
79    * Operation ID.
80    */
81   uint64_t op_id;
82
83   /**
84    * Message to send to the PSYCstore service.
85    * Allocated at the end of this struct.
86    */
87   const struct GNUNET_MessageHeader *msg;
88 };
89
90
91 /**
92  * Handle for the service.
93  */
94 struct GNUNET_PSYCSTORE_Handle
95 {
96   /**
97    * Configuration to use.
98    */
99   const struct GNUNET_CONFIGURATION_Handle *cfg;
100
101   /**
102    * Socket (if available).
103    */
104   struct GNUNET_CLIENT_Connection *client;
105
106   /**
107    * Head of operations to transmit.
108    */
109   struct GNUNET_PSYCSTORE_OperationHandle *transmit_head;
110
111   /**
112    * Tail of operations to transmit.
113    */
114   struct GNUNET_PSYCSTORE_OperationHandle *transmit_tail;
115
116   /**
117    * Head of active operations waiting for response.
118    */
119   struct GNUNET_PSYCSTORE_OperationHandle *op_head;
120
121   /**
122    * Tail of active operations waiting for response.
123    */
124   struct GNUNET_PSYCSTORE_OperationHandle *op_tail;
125
126   /**
127    * Currently pending transmission request, or NULL for none.
128    */
129   struct GNUNET_CLIENT_TransmitHandle *th;
130
131   /**
132    * Task doing exponential back-off trying to reconnect.
133    */
134   struct GNUNET_SCHEDULER_Task *reconnect_task;
135
136   /**
137    * Time for next connect retry.
138    */
139   struct GNUNET_TIME_Relative reconnect_delay;
140
141   /**
142    * Last operation ID used.
143    */
144   uint64_t last_op_id;
145
146   /**
147    * Are we polling for incoming messages right now?
148    */
149   uint8_t in_receive;
150 };
151
152
153 /**
154  * Get a fresh operation ID to distinguish between PSYCstore requests.
155  *
156  * @param h Handle to the PSYCstore service.
157  * @return next operation id to use
158  */
159 static uint64_t
160 get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h)
161 {
162   return h->last_op_id++;
163 }
164
165
166 /**
167  * Find operation by ID.
168  *
169  * @return OperationHandle if found, or NULL otherwise.
170  */
171 static struct GNUNET_PSYCSTORE_OperationHandle *
172 find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint64_t op_id)
173 {
174   struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head;
175   while (NULL != op)
176   {
177     if (op->op_id == op_id)
178       return op;
179     op = op->next;
180   }
181   return NULL;
182 }
183
184
185 /**
186  * Try again to connect to the PSYCstore service.
187  *
188  * @param cls handle to the PSYCstore service.
189  */
190 static void
191 reconnect (void *cls);
192
193
194 /**
195  * Reschedule a connect attempt to the service.
196  *
197  * @param h transport service to reconnect
198  */
199 static void
200 reschedule_connect (struct GNUNET_PSYCSTORE_Handle *h)
201 {
202   GNUNET_assert (h->reconnect_task == NULL);
203
204   if (NULL != h->th)
205   {
206     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
207     h->th = NULL;
208   }
209   if (NULL != h->client)
210   {
211     GNUNET_CLIENT_disconnect (h->client);
212     h->client = NULL;
213   }
214   h->in_receive = GNUNET_NO;
215   LOG (GNUNET_ERROR_TYPE_DEBUG,
216        "Scheduling task to reconnect to PSYCstore service in %s.\n",
217        GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
218   h->reconnect_task =
219       GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
220   h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
221 }
222
223
224 /**
225  * Schedule transmission of the next message from our queue.
226  *
227  * @param h PSYCstore handle
228  */
229 static void
230 transmit_next (struct GNUNET_PSYCSTORE_Handle *h);
231
232
233 /**
234  * Type of a function to call when we receive a message
235  * from the service.
236  *
237  * @param cls closure
238  * @param msg message received, NULL on timeout or fatal error
239  */
240 static void
241 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
242 {
243   struct GNUNET_PSYCSTORE_Handle *h = cls;
244   struct GNUNET_PSYCSTORE_OperationHandle *op;
245   const struct OperationResult *opres;
246   const struct CountersResult *cres;
247   const struct FragmentResult *fres;
248   const struct StateResult *sres;
249   const char *str;
250
251   if (NULL == msg)
252   {
253     reschedule_connect (h);
254     return;
255   }
256   LOG (GNUNET_ERROR_TYPE_DEBUG,
257        "Received message of type %d from PSYCstore service.\n",
258        ntohs (msg->type));
259   uint16_t size = ntohs (msg->size);
260   uint16_t type = ntohs (msg->type);
261   switch (type)
262   {
263   case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE:
264     if (size < sizeof (struct OperationResult))
265     {
266       LOG (GNUNET_ERROR_TYPE_ERROR,
267            "Received message of type %d with length %lu bytes. "
268            "Expected >= %lu\n",
269            type, size, sizeof (struct OperationResult));
270       GNUNET_break (0);
271       reschedule_connect (h);
272       return;
273     }
274
275     opres = (const struct OperationResult *) msg;
276     str = (const char *) &opres[1];
277     if ( (size > sizeof (struct OperationResult)) &&
278          ('\0' != str[size - sizeof (struct OperationResult) - 1]) )
279     {
280       GNUNET_break (0);
281       reschedule_connect (h);
282       return;
283     }
284     if (size == sizeof (struct OperationResult))
285       str = "";
286
287     op = find_op_by_id (h, GNUNET_ntohll (opres->op_id));
288     if (NULL == op)
289     {
290       LOG (GNUNET_ERROR_TYPE_DEBUG,
291            "No callback registered for operation with ID %" PRIu64 ".\n",
292            type, GNUNET_ntohll (opres->op_id));
293     }
294     else
295     {
296       LOG (GNUNET_ERROR_TYPE_DEBUG,
297            "Received result message (type %d) with operation ID: %" PRIu64 "\n",
298            type, op->op_id);
299
300       int64_t result_code = GNUNET_ntohll (opres->result_code) + INT64_MIN;
301       GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
302       if (NULL != op->res_cb)
303       {
304         const struct StateSyncRequest *ssreq;
305         switch (ntohs (op->msg->type))
306         {
307         case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC:
308           ssreq = (const struct StateSyncRequest *) op->msg;
309           if (!(ssreq->flags & STATE_OP_LAST
310                 || GNUNET_OK != result_code))
311             op->res_cb = NULL;
312           break;
313         }
314       }
315       if (NULL != op->res_cb)
316         op->res_cb (op->cls, result_code, str, size - sizeof (*opres));
317       GNUNET_free (op);
318     }
319     break;
320
321   case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS:
322     if (size != sizeof (struct CountersResult))
323     {
324       LOG (GNUNET_ERROR_TYPE_ERROR,
325            "Received message of type %d with length %lu bytes. "
326            "Expected %lu\n",
327            type, size, sizeof (struct CountersResult));
328       GNUNET_break (0);
329       reschedule_connect (h);
330       return;
331     }
332
333     cres = (const struct CountersResult *) msg;
334
335     op = find_op_by_id (h, GNUNET_ntohll (cres->op_id));
336     if (NULL == op)
337     {
338       LOG (GNUNET_ERROR_TYPE_DEBUG,
339            "No callback registered for operation with ID %" PRIu64 ".\n",
340            type, GNUNET_ntohll (cres->op_id));
341     }
342     else
343     {
344       GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
345       if (NULL != op->data_cb)
346         ((GNUNET_PSYCSTORE_CountersCallback)
347          op->data_cb) (op->cls,
348                        ntohl (cres->result_code),
349                        GNUNET_ntohll (cres->max_fragment_id),
350                        GNUNET_ntohll (cres->max_message_id),
351                        GNUNET_ntohll (cres->max_group_generation),
352                        GNUNET_ntohll (cres->max_state_message_id));
353       GNUNET_free (op);
354     }
355     break;
356
357   case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT:
358     if (size < sizeof (struct FragmentResult))
359     {
360       LOG (GNUNET_ERROR_TYPE_ERROR,
361            "Received message of type %d with length %lu bytes. "
362            "Expected >= %lu\n",
363            type, size, sizeof (struct FragmentResult));
364       GNUNET_break (0);
365       reschedule_connect (h);
366       return;
367     }
368
369     fres = (const struct FragmentResult *) msg;
370     struct GNUNET_MULTICAST_MessageHeader *mmsg =
371       (struct GNUNET_MULTICAST_MessageHeader *) &fres[1];
372     if (size != sizeof (struct FragmentResult) + ntohs (mmsg->header.size))
373     {
374       LOG (GNUNET_ERROR_TYPE_ERROR,
375            "Received message of type %d with length %lu bytes. "
376            "Expected = %lu\n",
377            type, size,
378            sizeof (struct FragmentResult) + ntohs (mmsg->header.size));
379       GNUNET_break (0);
380       reschedule_connect (h);
381       return;
382     }
383
384     op = find_op_by_id (h, GNUNET_ntohll (fres->op_id));
385     if (NULL == op)
386     {
387       LOG (GNUNET_ERROR_TYPE_DEBUG,
388            "No callback registered for operation with ID %" PRIu64 ".\n",
389            type, GNUNET_ntohll (fres->op_id));
390     }
391     else
392     {
393       if (NULL != op->data_cb)
394         ((GNUNET_PSYCSTORE_FragmentCallback)
395          op->data_cb) (op->cls, mmsg, ntohl (fres->psycstore_flags));
396     }
397     break;
398
399   case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE:
400     if (size < sizeof (struct StateResult))
401     {
402       LOG (GNUNET_ERROR_TYPE_ERROR,
403            "Received message of type %d with length %lu bytes. "
404            "Expected >= %lu\n",
405            type, size, sizeof (struct StateResult));
406       GNUNET_break (0);
407       reschedule_connect (h);
408       return;
409     }
410
411     sres = (const struct StateResult *) msg;
412     const char *name = (const char *) &sres[1];
413     uint16_t name_size = ntohs (sres->name_size);
414
415     if (name_size <= 2 || '\0' != name[name_size - 1])
416     {
417       LOG (GNUNET_ERROR_TYPE_ERROR,
418            "Received state result message (type %d) with invalid name.\n",
419            type);
420       GNUNET_break (0);
421       reschedule_connect (h);
422       return;
423     }
424
425     op = find_op_by_id (h, GNUNET_ntohll (sres->op_id));
426     if (NULL == op)
427     {
428       LOG (GNUNET_ERROR_TYPE_DEBUG,
429            "No callback registered for operation with ID %" PRIu64 ".\n",
430            type, GNUNET_ntohll (sres->op_id));
431     }
432     else
433     {
434       if (NULL != op->data_cb)
435         ((GNUNET_PSYCSTORE_StateCallback)
436          op->data_cb) (op->cls, name, (char *) &sres[1] + name_size,
437                        ntohs (sres->header.size) - sizeof (*sres) - name_size);
438     }
439     break;
440
441   default:
442     GNUNET_break (0);
443     reschedule_connect (h);
444     return;
445   }
446
447   GNUNET_CLIENT_receive (h->client, &message_handler, h,
448                          GNUNET_TIME_UNIT_FOREVER_REL);
449 }
450
451
452 /**
453  * Transmit next message to service.
454  *
455  * @param cls The 'struct GNUNET_PSYCSTORE_Handle'.
456  * @param size Number of bytes available in buf.
457  * @param buf Where to copy the message.
458  * @return Number of bytes copied to buf.
459  */
460 static size_t
461 send_next_message (void *cls, size_t size, void *buf)
462 {
463   struct GNUNET_PSYCSTORE_Handle *h = cls;
464   struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
465   size_t ret;
466
467   h->th = NULL;
468   if (NULL == op)
469     return 0;
470   ret = ntohs (op->msg->size);
471   if (ret > size)
472   {
473     reschedule_connect (h);
474     return 0;
475   }
476   LOG (GNUNET_ERROR_TYPE_DEBUG,
477        "Sending message of type %d to PSYCstore service. ID: %" PRIu64 "\n",
478        ntohs (op->msg->type), op->op_id);
479   memcpy (buf, op->msg, ret);
480
481   GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
482
483   if (NULL == op->res_cb && NULL == op->data_cb)
484   {
485     GNUNET_free (op);
486   }
487   else
488   {
489     GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, op);
490   }
491
492   if (NULL != h->transmit_head)
493     transmit_next (h);
494
495   if (GNUNET_NO == h->in_receive)
496   {
497     h->in_receive = GNUNET_YES;
498     GNUNET_CLIENT_receive (h->client, &message_handler, h,
499                            GNUNET_TIME_UNIT_FOREVER_REL);
500   }
501   return ret;
502 }
503
504
505 /**
506  * Schedule transmission of the next message from our queue.
507  *
508  * @param h PSYCstore handle.
509  */
510 static void
511 transmit_next (struct GNUNET_PSYCSTORE_Handle *h)
512 {
513   if (NULL != h->th || NULL == h->client)
514     return;
515
516   struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
517   if (NULL == op)
518     return;
519
520   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
521                                                ntohs (op->msg->size),
522                                                GNUNET_TIME_UNIT_FOREVER_REL,
523                                                GNUNET_NO,
524                                                &send_next_message,
525                                                h);
526 }
527
528
529 /**
530  * Try again to connect to the PSYCstore service.
531  *
532  * @param cls Handle to the PSYCstore service.
533  */
534 static void
535 reconnect (void *cls)
536 {
537   struct GNUNET_PSYCSTORE_Handle *h = cls;
538
539   h->reconnect_task = NULL;
540   LOG (GNUNET_ERROR_TYPE_DEBUG,
541        "Connecting to PSYCstore service.\n");
542   GNUNET_assert (NULL == h->client);
543   h->client = GNUNET_CLIENT_connect ("psycstore", h->cfg);
544   GNUNET_assert (NULL != h->client);
545   transmit_next (h);
546 }
547
548
549 /**
550  * Connect to the PSYCstore service.
551  *
552  * @param cfg The configuration to use
553  * @return Handle to use
554  */
555 struct GNUNET_PSYCSTORE_Handle *
556 GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
557 {
558   struct GNUNET_PSYCSTORE_Handle *h
559     = GNUNET_new (struct GNUNET_PSYCSTORE_Handle);
560   h->cfg = cfg;
561   h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
562   h->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, h);
563   return h;
564 }
565
566
567 /**
568  * Disconnect from PSYCstore service
569  *
570  * @param h Handle to destroy
571  */
572 void
573 GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h)
574 {
575   GNUNET_assert (NULL != h);
576   if (h->reconnect_task != NULL)
577   {
578     GNUNET_SCHEDULER_cancel (h->reconnect_task);
579     h->reconnect_task = NULL;
580   }
581   if (NULL != h->th)
582   {
583     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
584     h->th = NULL;
585   }
586   if (NULL != h->client)
587   {
588     GNUNET_CLIENT_disconnect (h->client);
589     h->client = NULL;
590   }
591   GNUNET_free (h);
592 }
593
594
595 /**
596  * Cancel a PSYCstore operation. Note that the operation MAY still
597  * be executed; this merely cancels the continuation; if the request
598  * was already transmitted, the service may still choose to complete
599  * the operation.
600  *
601  * @param op Operation to cancel.
602  */
603 void
604 GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op)
605 {
606   struct GNUNET_PSYCSTORE_Handle *h = op->h;
607
608   if (h->transmit_head != NULL && (h->transmit_head != op || NULL == h->client))
609   {
610     /* request not active, can simply remove */
611     GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
612     GNUNET_free (op);
613     return;
614   }
615   if (NULL != h->th)
616   {
617     /* request active but not yet with service, can still abort */
618     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
619     h->th = NULL;
620     GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
621     GNUNET_free (op);
622     transmit_next (h);
623     return;
624   }
625   /* request active with service, simply ensure continuations are not called */
626   op->res_cb = NULL;
627   op->data_cb = NULL;
628 }
629
630
631 /**
632  * Store join/leave events for a PSYC channel in order to be able to answer
633  * membership test queries later.
634  *
635  * @param h
636  *        Handle for the PSYCstore.
637  * @param channel_key
638  *        The channel where the event happened.
639  * @param slave_key
640  *        Public key of joining/leaving slave.
641  * @param did_join
642  *        #GNUNET_YES on join, #GNUNET_NO on part.
643  * @param announced_at
644  *        ID of the message that announced the membership change.
645  * @param effective_since
646  *        Message ID this membership change is in effect since.
647  *        For joins it is <= announced_at, for parts it is always 0.
648  * @param group_generation
649  *        In case of a part, the last group generation the slave has access to.
650  *        It has relevance when a larger message have fragments with different
651  *        group generations.
652  * @param rcb
653  *        Callback to call with the result of the storage operation.
654  * @param rcb_cls
655  *        Closure for the callback.
656  *
657  * @return Operation handle that can be used to cancel the operation.
658  */
659 struct GNUNET_PSYCSTORE_OperationHandle *
660 GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
661                                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
662                                    const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
663                                    int did_join,
664                                    uint64_t announced_at,
665                                    uint64_t effective_since,
666                                    uint64_t group_generation,
667                                    GNUNET_PSYCSTORE_ResultCallback rcb,
668                                    void *rcb_cls)
669 {
670   GNUNET_assert (NULL != h);
671   GNUNET_assert (NULL != channel_key);
672   GNUNET_assert (NULL != slave_key);
673   GNUNET_assert (GNUNET_YES == did_join || GNUNET_NO == did_join);
674   GNUNET_assert (did_join
675                  ? effective_since <= announced_at
676                  : effective_since == 0);
677
678   struct MembershipStoreRequest *req;
679   struct GNUNET_PSYCSTORE_OperationHandle *
680     op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
681   op->h = h;
682   op->res_cb = rcb;
683   op->cls = rcb_cls;
684
685   req = (struct MembershipStoreRequest *) &op[1];
686   op->msg = (struct GNUNET_MessageHeader *) req;
687   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE);
688   req->header.size = htons (sizeof (*req));
689   req->channel_key = *channel_key;
690   req->slave_key = *slave_key;
691   req->did_join = did_join;
692   req->announced_at = GNUNET_htonll (announced_at);
693   req->effective_since = GNUNET_htonll (effective_since);
694   req->group_generation = GNUNET_htonll (group_generation);
695
696   op->op_id = get_next_op_id (h);
697   req->op_id = GNUNET_htonll (op->op_id);
698
699   GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
700   transmit_next (h);
701
702   return op;
703 }
704
705
706 /**
707  * Test if a member was admitted to the channel at the given message ID.
708  *
709  * This is useful when relaying and replaying messages to check if a particular
710  * slave has access to the message fragment with a given group generation.  It
711  * is also used when handling join requests to determine whether the slave is
712  * currently admitted to the channel.
713  *
714  * @param h
715  *        Handle for the PSYCstore.
716  * @param channel_key
717  *        The channel we are interested in.
718  * @param slave_key
719  *        Public key of slave whose membership to check.
720  * @param message_id
721  *        Message ID for which to do the membership test.
722  * @param group_generation
723  *        Group generation of the fragment of the message to test.
724  *        It has relevance if the message consists of multiple fragments with
725  *        different group generations.
726  * @param rcb
727  *        Callback to call with the test result.
728  * @param rcb_cls
729  *        Closure for the callback.
730  *
731  * @return Operation handle that can be used to cancel the operation.
732  */
733 struct GNUNET_PSYCSTORE_OperationHandle *
734 GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
735                                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
736                                   const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
737                                   uint64_t message_id,
738                                   uint64_t group_generation,
739                                   GNUNET_PSYCSTORE_ResultCallback rcb,
740                                   void *rcb_cls)
741 {
742   struct MembershipTestRequest *req;
743   struct GNUNET_PSYCSTORE_OperationHandle *
744     op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
745   op->h = h;
746   op->res_cb = rcb;
747   op->cls = rcb_cls;
748
749   req = (struct MembershipTestRequest *) &op[1];
750   op->msg = (struct GNUNET_MessageHeader *) req;
751   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST);
752   req->header.size = htons (sizeof (*req));
753   req->channel_key = *channel_key;
754   req->slave_key = *slave_key;
755   req->message_id = GNUNET_htonll (message_id);
756   req->group_generation = GNUNET_htonll (group_generation);
757
758   op->op_id = get_next_op_id (h);
759   req->op_id = GNUNET_htonll (op->op_id);
760
761   GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
762   transmit_next (h);
763
764   return op;
765 }
766
767
768 /**
769  * Store a message fragment sent to a channel.
770  *
771  * @param h Handle for the PSYCstore.
772  * @param channel_key The channel the message belongs to.
773  * @param message Message to store.
774  * @param psycstore_flags Flags indicating whether the PSYC message contains
775  *        state modifiers.
776  * @param rcb Callback to call with the result of the operation.
777  * @param rcb_cls Closure for the callback.
778  *
779  * @return Handle that can be used to cancel the operation.
780  */
781 struct GNUNET_PSYCSTORE_OperationHandle *
782 GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
783                                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
784                                  const struct GNUNET_MULTICAST_MessageHeader *msg,
785                                  enum GNUNET_PSYCSTORE_MessageFlags psycstore_flags,
786                                  GNUNET_PSYCSTORE_ResultCallback rcb,
787                                  void *rcb_cls)
788 {
789   uint16_t size = ntohs (msg->header.size);
790   struct FragmentStoreRequest *req;
791   struct GNUNET_PSYCSTORE_OperationHandle *
792     op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size);
793   op->h = h;
794   op->res_cb = rcb;
795   op->cls = rcb_cls;
796
797   req = (struct FragmentStoreRequest *) &op[1];
798   op->msg = (struct GNUNET_MessageHeader *) req;
799   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE);
800   req->header.size = htons (sizeof (*req) + size);
801   req->channel_key = *channel_key;
802   req->psycstore_flags = htonl (psycstore_flags);
803   memcpy (&req[1], msg, size);
804
805   op->op_id = get_next_op_id (h);
806   req->op_id = GNUNET_htonll (op->op_id);
807
808   GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
809   transmit_next (h);
810
811   return op;
812 }
813
814
815 /**
816  * Retrieve message fragments by fragment ID range.
817  *
818  * @param h
819  *        Handle for the PSYCstore.
820  * @param channel_key
821  *        The channel we are interested in.
822  * @param slave_key
823  *        The slave requesting the fragment.  If not NULL, a membership test is
824  *        performed first and the fragment is only returned if the slave has
825  *        access to it.
826  * @param first_fragment_id
827  *        First fragment ID to retrieve.
828  *        Use 0 to get the latest message fragment.
829  * @param last_fragment_id
830  *        Last consecutive fragment ID to retrieve.
831  *        Use 0 to get the latest message fragment.
832  * @param fragment_limit
833  *        Maximum number of fragments to retrieve.
834  * @param fragment_cb
835  *        Callback to call with the retrieved fragments.
836  * @param rcb
837  *        Callback to call with the result of the operation.
838  * @param cls
839  *        Closure for the callbacks.
840  *
841  * @return Handle that can be used to cancel the operation.
842  */
843 struct GNUNET_PSYCSTORE_OperationHandle *
844 GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
845                                const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
846                                const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
847                                uint64_t first_fragment_id,
848                                uint64_t last_fragment_id,
849                                GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
850                                GNUNET_PSYCSTORE_ResultCallback rcb,
851                                void *cls)
852 {
853   struct FragmentGetRequest *req;
854   struct GNUNET_PSYCSTORE_OperationHandle *
855     op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
856   op->h = h;
857   op->data_cb = (DataCallback) fragment_cb;
858   op->res_cb = rcb;
859   op->cls = cls;
860
861   req = (struct FragmentGetRequest *) &op[1];
862   op->msg = (struct GNUNET_MessageHeader *) req;
863   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
864   req->header.size = htons (sizeof (*req));
865   req->channel_key = *channel_key;
866   req->first_fragment_id = GNUNET_htonll (first_fragment_id);
867   req->last_fragment_id = GNUNET_htonll (last_fragment_id);
868   if (NULL != slave_key)
869   {
870     req->slave_key = *slave_key;
871     req->do_membership_test = GNUNET_YES;
872   }
873
874   op->op_id = get_next_op_id (h);
875   req->op_id = GNUNET_htonll (op->op_id);
876
877   GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
878   transmit_next (h);
879
880   return op;
881 }
882
883
884 /**
885  * Retrieve latest message fragments.
886  *
887  * @param h
888  *        Handle for the PSYCstore.
889  * @param channel_key
890  *        The channel we are interested in.
891  * @param slave_key
892  *        The slave requesting the fragment.  If not NULL, a membership test is
893  *        performed first and the fragment is only returned if the slave has
894  *        access to it.
895  * @param first_fragment_id
896  *        First fragment ID to retrieve.
897  *        Use 0 to get the latest message fragment.
898  * @param last_fragment_id
899  *        Last consecutive fragment ID to retrieve.
900  *        Use 0 to get the latest message fragment.
901  * @param fragment_limit
902  *        Maximum number of fragments to retrieve.
903  * @param fragment_cb
904  *        Callback to call with the retrieved fragments.
905  * @param rcb
906  *        Callback to call with the result of the operation.
907  * @param cls
908  *        Closure for the callbacks.
909  *
910  * @return Handle that can be used to cancel the operation.
911  */
912 struct GNUNET_PSYCSTORE_OperationHandle *
913 GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
914                                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
915                                       const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
916                                       uint64_t fragment_limit,
917                                       GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
918                                       GNUNET_PSYCSTORE_ResultCallback rcb,
919                                       void *cls)
920 {
921   struct FragmentGetRequest *req;
922   struct GNUNET_PSYCSTORE_OperationHandle *
923     op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
924   op->h = h;
925   op->data_cb = (DataCallback) fragment_cb;
926   op->res_cb = rcb;
927   op->cls = cls;
928
929   req = (struct FragmentGetRequest *) &op[1];
930   op->msg = (struct GNUNET_MessageHeader *) req;
931   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
932   req->header.size = htons (sizeof (*req));
933   req->channel_key = *channel_key;
934   req->fragment_limit = GNUNET_ntohll (fragment_limit);
935   if (NULL != slave_key)
936   {
937     req->slave_key = *slave_key;
938     req->do_membership_test = GNUNET_YES;
939   }
940
941   op->op_id = get_next_op_id (h);
942   req->op_id = GNUNET_htonll (op->op_id);
943
944   GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
945   transmit_next (h);
946
947   return op;
948 }
949
950
951 /**
952  * Retrieve all fragments of messages in a message ID range.
953  *
954  * @param h
955  *        Handle for the PSYCstore.
956  * @param channel_key
957  *        The channel we are interested in.
958  * @param slave_key
959  *        The slave requesting the message.
960  *        If not NULL, a membership test is performed first
961  *        and the message is only returned if the slave has access to it.
962  * @param first_message_id
963  *        First message ID to retrieve.
964  * @param last_message_id
965  *        Last consecutive message ID to retrieve.
966  * @param fragment_limit
967  *        Maximum number of fragments to retrieve.
968  * @param method_prefix
969  *        Retrieve only messages with a matching method prefix.
970  * @todo Implement method_prefix query.
971  * @param fragment_cb
972  *        Callback to call with the retrieved fragments.
973  * @param result_cb
974  *        Callback to call with the result of the operation.
975  * @param cls
976  *        Closure for the callbacks.
977  *
978  * @return Handle that can be used to cancel the operation.
979  */
980 struct GNUNET_PSYCSTORE_OperationHandle *
981 GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
982                               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
983                               const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
984                               uint64_t first_message_id,
985                               uint64_t last_message_id,
986                               uint64_t fragment_limit,
987                               const char *method_prefix,
988                               GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
989                               GNUNET_PSYCSTORE_ResultCallback rcb,
990                               void *cls)
991 {
992   struct MessageGetRequest *req;
993   if (NULL == method_prefix)
994     method_prefix = "";
995   uint16_t method_size = strnlen (method_prefix,
996                                   GNUNET_SERVER_MAX_MESSAGE_SIZE
997                                   - sizeof (*req)) + 1;
998
999   struct GNUNET_PSYCSTORE_OperationHandle *
1000     op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1001   op->h = h;
1002   op->data_cb = (DataCallback) fragment_cb;
1003   op->res_cb = rcb;
1004   op->cls = cls;
1005
1006   req = (struct MessageGetRequest *) &op[1];
1007   op->msg = (struct GNUNET_MessageHeader *) req;
1008   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
1009   req->header.size = htons (sizeof (*req) + method_size);
1010   req->channel_key = *channel_key;
1011   req->first_message_id = GNUNET_htonll (first_message_id);
1012   req->last_message_id = GNUNET_htonll (last_message_id);
1013   req->fragment_limit = GNUNET_htonll (fragment_limit);
1014   if (NULL != slave_key)
1015   {
1016     req->slave_key = *slave_key;
1017     req->do_membership_test = GNUNET_YES;
1018   }
1019   memcpy (&req[1], method_prefix, method_size);
1020   ((char *) &req[1])[method_size - 1] = '\0';
1021
1022   op->op_id = get_next_op_id (h);
1023   req->op_id = GNUNET_htonll (op->op_id);
1024
1025   GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1026   transmit_next (h);
1027
1028   return op;
1029 }
1030
1031
1032 /**
1033  * Retrieve all fragments of the latest messages.
1034  *
1035  * @param h
1036  *        Handle for the PSYCstore.
1037  * @param channel_key
1038  *        The channel we are interested in.
1039  * @param slave_key
1040  *        The slave requesting the message.
1041  *        If not NULL, a membership test is performed first
1042  *        and the message is only returned if the slave has access to it.
1043  * @param message_limit
1044  *        Maximum number of messages to retrieve.
1045  * @param method_prefix
1046  *        Retrieve only messages with a matching method prefix.
1047  * @todo Implement method_prefix query.
1048  * @param fragment_cb
1049  *        Callback to call with the retrieved fragments.
1050  * @param result_cb
1051  *        Callback to call with the result of the operation.
1052  * @param cls
1053  *        Closure for the callbacks.
1054  *
1055  * @return Handle that can be used to cancel the operation.
1056  */
1057 struct GNUNET_PSYCSTORE_OperationHandle *
1058 GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
1059                                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1060                                      const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1061                                      uint64_t message_limit,
1062                                      const char *method_prefix,
1063                                      GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
1064                                      GNUNET_PSYCSTORE_ResultCallback rcb,
1065                                      void *cls)
1066 {
1067   struct MessageGetRequest *req;
1068
1069   if (NULL == method_prefix)
1070     method_prefix = "";
1071   uint16_t method_size = strnlen (method_prefix,
1072                                   GNUNET_SERVER_MAX_MESSAGE_SIZE
1073                                   - sizeof (*req)) + 1;
1074   GNUNET_assert ('\0' == method_prefix[method_size - 1]);
1075
1076   struct GNUNET_PSYCSTORE_OperationHandle *
1077     op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + method_size);
1078   op->h = h;
1079   op->data_cb = (DataCallback) fragment_cb;
1080   op->res_cb = rcb;
1081   op->cls = cls;
1082
1083   req = (struct MessageGetRequest *) &op[1];
1084   op->msg = (struct GNUNET_MessageHeader *) req;
1085   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
1086   req->header.size = htons (sizeof (*req) + method_size);
1087   req->channel_key = *channel_key;
1088   req->message_limit = GNUNET_ntohll (message_limit);
1089   if (NULL != slave_key)
1090   {
1091     req->slave_key = *slave_key;
1092     req->do_membership_test = GNUNET_YES;
1093   }
1094
1095   op->op_id = get_next_op_id (h);
1096   req->op_id = GNUNET_htonll (op->op_id);
1097   memcpy (&req[1], method_prefix, method_size);
1098
1099   GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1100   transmit_next (h);
1101
1102   return op;
1103 }
1104
1105
1106 /**
1107  * Retrieve a fragment of message specified by its message ID and fragment
1108  * offset.
1109  *
1110  * @param h
1111  *        Handle for the PSYCstore.
1112  * @param channel_key
1113  *        The channel we are interested in.
1114  * @param slave_key
1115  *        The slave requesting the message fragment.  If not NULL, a membership
1116  *        test is performed first and the message fragment is only returned
1117  *        if the slave has access to it.
1118  * @param message_id
1119  *        Message ID to retrieve.  Use 0 to get the latest message.
1120  * @param fragment_offset
1121  *        Offset of the fragment to retrieve.
1122  * @param fragment_cb
1123  *        Callback to call with the retrieved fragments.
1124  * @param result_cb
1125  *        Callback to call with the result of the operation.
1126  * @param cls
1127  *        Closure for the callbacks.
1128  *
1129  * @return Handle that can be used to cancel the operation.
1130  */
1131 struct GNUNET_PSYCSTORE_OperationHandle *
1132 GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
1133                                        const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1134                                        const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1135                                        uint64_t message_id,
1136                                        uint64_t fragment_offset,
1137                                        GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
1138                                        GNUNET_PSYCSTORE_ResultCallback rcb,
1139                                        void *cls)
1140 {
1141   struct MessageGetFragmentRequest *req;
1142   struct GNUNET_PSYCSTORE_OperationHandle *
1143     op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1144   op->h = h;
1145   op->data_cb = (DataCallback) fragment_cb;
1146   op->res_cb = rcb;
1147   op->cls = cls;
1148
1149   req = (struct MessageGetFragmentRequest *) &op[1];
1150   op->msg = (struct GNUNET_MessageHeader *) req;
1151   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT);
1152   req->header.size = htons (sizeof (*req));
1153   req->channel_key = *channel_key;
1154   req->message_id = GNUNET_htonll (message_id);
1155   req->fragment_offset = GNUNET_htonll (fragment_offset);
1156   if (NULL != slave_key)
1157   {
1158     req->slave_key = *slave_key;
1159     req->do_membership_test = GNUNET_YES;
1160   }
1161
1162   op->op_id = get_next_op_id (h);
1163   req->op_id = GNUNET_htonll (op->op_id);
1164
1165   GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1166   transmit_next (h);
1167
1168   return op;
1169 }
1170
1171
1172 /**
1173  * Retrieve latest values of counters for a channel master.
1174  *
1175  * The current value of counters are needed when a channel master is restarted,
1176  * so that it can continue incrementing the counters from their last value.
1177  *
1178  * @param h
1179  *        Handle for the PSYCstore.
1180  * @param channel_key
1181  *        Public key that identifies the channel.
1182  * @param ccb
1183  *        Callback to call with the result.
1184  * @param ccb_cls
1185  *        Closure for the @a ccb callback.
1186  *
1187  * @return Handle that can be used to cancel the operation.
1188  */
1189 struct GNUNET_PSYCSTORE_OperationHandle *
1190 GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
1191                                struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1192                                GNUNET_PSYCSTORE_CountersCallback ccb,
1193                                void *ccb_cls)
1194 {
1195   struct OperationRequest *req;
1196   struct GNUNET_PSYCSTORE_OperationHandle *
1197     op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1198   op->h = h;
1199   op->data_cb = ccb;
1200   op->cls = ccb_cls;
1201
1202   req = (struct OperationRequest *) &op[1];
1203   op->msg = (struct GNUNET_MessageHeader *) req;
1204   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET);
1205   req->header.size = htons (sizeof (*req));
1206   req->channel_key = *channel_key;
1207
1208   op->op_id = get_next_op_id (h);
1209   req->op_id = GNUNET_htonll (op->op_id);
1210
1211   GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1212   transmit_next (h);
1213
1214   return op;
1215 }
1216
1217
1218 /**
1219  * Apply modifiers of a message to the current channel state.
1220  *
1221  * An error is returned if there are missing messages containing state
1222  * operations before the current one.
1223  *
1224  * @param h
1225  *        Handle for the PSYCstore.
1226  * @param channel_key
1227  *        The channel we are interested in.
1228  * @param message_id
1229  *        ID of the message that contains the @a modifiers.
1230  * @param state_delta
1231  *        Value of the _state_delta PSYC header variable of the message.
1232  * @param rcb
1233  *        Callback to call with the result of the operation.
1234  * @param rcb_cls
1235  *        Closure for the @a rcb callback.
1236  *
1237  * @return Handle that can be used to cancel the operation.
1238  */
1239 struct GNUNET_PSYCSTORE_OperationHandle *
1240 GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1241                                const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1242                                uint64_t message_id,
1243                                uint64_t state_delta,
1244                                GNUNET_PSYCSTORE_ResultCallback rcb,
1245                                void *rcb_cls)
1246 {
1247   struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1248   struct StateModifyRequest *req;
1249
1250   op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1251   op->h = h;
1252   op->res_cb = rcb;
1253   op->cls = rcb_cls;
1254
1255   req = (struct StateModifyRequest *) &op[1];
1256   op->msg = (struct GNUNET_MessageHeader *) req;
1257   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
1258   req->header.size = htons (sizeof (*req));
1259   req->channel_key = *channel_key;
1260   req->message_id = GNUNET_htonll (message_id);
1261   req->state_delta = GNUNET_htonll (state_delta);
1262
1263   op->op_id = get_next_op_id (h);
1264   req->op_id = GNUNET_htonll (op->op_id);
1265
1266   GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1267   transmit_next (h);
1268
1269   return op;
1270   /* FIXME: only the last operation is returned,
1271    *        operation_cancel() should be able to cancel all of them.
1272    */
1273 }
1274
1275
1276 /**
1277  * Store synchronized state.
1278  *
1279  * @param h
1280  *        Handle for the PSYCstore.
1281  * @param channel_key
1282  *        The channel we are interested in.
1283  * @param max_state_message_id
1284  *        ID of the last stateful message before @a state_hash_message_id.
1285  * @param state_hash_message_id
1286  *        ID of the message that contains the state_hash PSYC header variable.
1287  * @param modifier_count
1288  *        Number of elements in the @a modifiers array.
1289  * @param modifiers
1290  *        Full state to store.
1291  * @param rcb
1292  *        Callback to call with the result of the operation.
1293  * @param rcb_cls
1294  *        Closure for the callback.
1295  *
1296  * @return Handle that can be used to cancel the operation.
1297  */
1298 struct GNUNET_PSYCSTORE_OperationHandle *
1299 GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1300                              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1301                              uint64_t max_state_message_id,
1302                              uint64_t state_hash_message_id,
1303                              size_t modifier_count,
1304                              const struct GNUNET_PSYC_Modifier *modifiers,
1305                              GNUNET_PSYCSTORE_ResultCallback rcb,
1306                              void *rcb_cls)
1307 {
1308   struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1309   size_t i;
1310
1311   for (i = 0; i < modifier_count; i++) {
1312     struct StateSyncRequest *req;
1313     uint16_t name_size = strlen (modifiers[i].name) + 1;
1314
1315     op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
1316                         modifiers[i].value_size);
1317     op->h = h;
1318     op->res_cb = rcb;
1319     op->cls = rcb_cls;
1320
1321     req = (struct StateSyncRequest *) &op[1];
1322     op->msg = (struct GNUNET_MessageHeader *) req;
1323     req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
1324     req->header.size = htons (sizeof (*req) + name_size
1325                               + modifiers[i].value_size);
1326     req->channel_key = *channel_key;
1327     req->max_state_message_id = GNUNET_htonll (max_state_message_id);
1328     req->state_hash_message_id = GNUNET_htonll (state_hash_message_id);
1329     req->name_size = htons (name_size);
1330     req->flags
1331       = (0 == i)
1332       ? STATE_OP_FIRST
1333       : (modifier_count - 1 == i)
1334       ? STATE_OP_LAST
1335       : 0;
1336
1337     memcpy (&req[1], modifiers[i].name, name_size);
1338     memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1339
1340     op->op_id = get_next_op_id (h);
1341     req->op_id = GNUNET_htonll (op->op_id);
1342
1343     GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1344     transmit_next (h);
1345   }
1346   return op;
1347 }
1348
1349
1350 /**
1351  * Reset the state of a channel.
1352  *
1353  * Delete all state variables stored for the given channel.
1354  *
1355  * @param h
1356  *        Handle for the PSYCstore.
1357  * @param channel_key
1358  *        The channel we are interested in.
1359  * @param rcb
1360  *        Callback to call with the result of the operation.
1361  * @param rcb_cls
1362  *        Closure for the callback.
1363  *
1364  * @return Handle that can be used to cancel the operation.
1365  */
1366 struct GNUNET_PSYCSTORE_OperationHandle *
1367 GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
1368                               const struct GNUNET_CRYPTO_EddsaPublicKey
1369                               *channel_key,
1370                               GNUNET_PSYCSTORE_ResultCallback rcb,
1371                               void *rcb_cls)
1372 {
1373   struct OperationRequest *req;
1374   struct GNUNET_PSYCSTORE_OperationHandle *
1375     op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1376   op->h = h;
1377   op->res_cb = rcb;
1378   op->cls = rcb_cls;
1379
1380   req = (struct OperationRequest *) &op[1];
1381   op->msg = (struct GNUNET_MessageHeader *) req;
1382   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1383   req->header.size = htons (sizeof (*req));
1384   req->channel_key = *channel_key;
1385
1386   op->op_id = get_next_op_id (h);
1387   req->op_id = GNUNET_htonll (op->op_id);
1388
1389   GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1390   transmit_next (h);
1391
1392   return op;
1393 }
1394
1395
1396
1397 /**
1398  * Update signed values of state variables in the state store.
1399  *
1400  * @param h
1401  *        Handle for the PSYCstore.
1402  * @param channel_key
1403  *        The channel we are interested in.
1404  * @param message_id
1405  *        Message ID that contained the state @a hash.
1406  * @param hash
1407  *        Hash of the serialized full state.
1408  * @param rcb
1409  *        Callback to call with the result of the operation.
1410  * @param rcb_cls
1411  *        Closure for the callback.
1412  */
1413 struct GNUNET_PSYCSTORE_OperationHandle *
1414 GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
1415                                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1416                                     uint64_t message_id,
1417                                     const struct GNUNET_HashCode *hash,
1418                                     GNUNET_PSYCSTORE_ResultCallback rcb,
1419                                     void *rcb_cls)
1420 {
1421   struct StateHashUpdateRequest *req;
1422   struct GNUNET_PSYCSTORE_OperationHandle *
1423     op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1424   op->h = h;
1425   op->res_cb = rcb;
1426   op->cls = rcb_cls;
1427
1428   req = (struct StateHashUpdateRequest *) &op[1];
1429   op->msg = (struct GNUNET_MessageHeader *) req;
1430   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1431   req->header.size = htons (sizeof (*req));
1432   req->channel_key = *channel_key;
1433   req->hash = *hash;
1434
1435   op->op_id = get_next_op_id (h);
1436   req->op_id = GNUNET_htonll (op->op_id);
1437
1438   GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1439   transmit_next (h);
1440
1441   return op;
1442 }
1443
1444
1445 /**
1446  * Retrieve the best matching state variable.
1447  *
1448  * @param h
1449  *        Handle for the PSYCstore.
1450  * @param channel_key
1451  *        The channel we are interested in.
1452  * @param name
1453  *        Name of variable to match, the returned variable might be less specific.
1454  * @param scb
1455  *        Callback to return the matching state variable.
1456  * @param rcb
1457  *        Callback to call with the result of the operation.
1458  * @param cls
1459  *        Closure for the callbacks.
1460  *
1461  * @return Handle that can be used to cancel the operation.
1462  */
1463 struct GNUNET_PSYCSTORE_OperationHandle *
1464 GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
1465                             const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1466                             const char *name,
1467                             GNUNET_PSYCSTORE_StateCallback scb,
1468                             GNUNET_PSYCSTORE_ResultCallback rcb,
1469                             void *cls)
1470 {
1471   size_t name_size = strlen (name) + 1;
1472   struct OperationRequest *req;
1473   struct GNUNET_PSYCSTORE_OperationHandle *
1474     op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1475   op->h = h;
1476   op->data_cb = (DataCallback) scb;
1477   op->res_cb = rcb;
1478   op->cls = cls;
1479
1480   req = (struct OperationRequest *) &op[1];
1481   op->msg = (struct GNUNET_MessageHeader *) req;
1482   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET);
1483   req->header.size = htons (sizeof (*req) + name_size);
1484   req->channel_key = *channel_key;
1485   memcpy (&req[1], name, name_size);
1486
1487   op->op_id = get_next_op_id (h);
1488   req->op_id = GNUNET_htonll (op->op_id);
1489
1490   GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1491   transmit_next (h);
1492
1493   return op;
1494 }
1495
1496
1497
1498 /**
1499  * Retrieve all state variables for a channel with the given prefix.
1500  *
1501  * @param h
1502  *        Handle for the PSYCstore.
1503  * @param channel_key
1504  *        The channel we are interested in.
1505  * @param name_prefix
1506  *        Prefix of state variable names to match.
1507  * @param scb
1508  *        Callback to return matching state variables.
1509  * @param rcb
1510  *        Callback to call with the result of the operation.
1511  * @param cls
1512  *        Closure for the callbacks.
1513  *
1514  * @return Handle that can be used to cancel the operation.
1515  */
1516 struct GNUNET_PSYCSTORE_OperationHandle *
1517 GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
1518                                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1519                                    const char *name_prefix,
1520                                    GNUNET_PSYCSTORE_StateCallback scb,
1521                                    GNUNET_PSYCSTORE_ResultCallback rcb,
1522                                    void *cls)
1523 {
1524   size_t name_size = strlen (name_prefix) + 1;
1525   struct OperationRequest *req;
1526   struct GNUNET_PSYCSTORE_OperationHandle *
1527     op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1528   op->h = h;
1529   op->data_cb = (DataCallback) scb;
1530   op->res_cb = rcb;
1531   op->cls = cls;
1532
1533   req = (struct OperationRequest *) &op[1];
1534   op->msg = (struct GNUNET_MessageHeader *) req;
1535   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX);
1536   req->header.size = htons (sizeof (*req) + name_size);
1537   req->channel_key = *channel_key;
1538   memcpy (&req[1], name_prefix, name_size);
1539
1540   op->op_id = get_next_op_id (h);
1541   req->op_id = GNUNET_htonll (op->op_id);
1542
1543   GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1544   transmit_next (h);
1545
1546   return op;
1547 }
1548
1549 /* end of psycstore_api.c */