first batch of license fixes (boring)
[oweals/gnunet.git] / src / psycstore / psycstore_api.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software: you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published
7  * by the Free Software Foundation, either version 3 of the License,
8  * or (at your option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Affero General Public License for more details.
14  */
15
16 /**
17  * @file psycstore/psycstore_api.c
18  * @brief API to interact with the PSYCstore service
19  * @author Gabor X Toth
20  * @author Christian Grothoff
21  */
22
23 #include <inttypes.h>
24
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_constants.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_psycstore_service.h"
30 #include "gnunet_multicast_service.h"
31 #include "psycstore.h"
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__)
34
35 /**
36  * Handle for an operation with the PSYCstore service.
37  */
38 struct GNUNET_PSYCSTORE_OperationHandle
39 {
40
41   /**
42    * Main PSYCstore handle.
43    */
44   struct GNUNET_PSYCSTORE_Handle *h;
45
46   /**
47    * Data callbacks.
48    */
49   union {
50     GNUNET_PSYCSTORE_FragmentCallback fragment_cb;
51     GNUNET_PSYCSTORE_CountersCallback counters_cb;
52     GNUNET_PSYCSTORE_StateCallback state_cb;
53   };
54
55   /**
56    * Closure for callbacks.
57    */
58   void *cls;
59
60   /**
61    * Message envelope.
62    */
63   struct GNUNET_MQ_Envelope *env;
64
65   /**
66    * Operation ID.
67    */
68   uint64_t op_id;
69 };
70
71
72 /**
73  * Handle for the service.
74  */
75 struct GNUNET_PSYCSTORE_Handle
76 {
77   /**
78    * Configuration to use.
79    */
80   const struct GNUNET_CONFIGURATION_Handle *cfg;
81
82   /**
83    * Client connection.
84    */
85   struct GNUNET_MQ_Handle *mq;
86
87   /**
88    * Async operations.
89    */
90   struct GNUNET_OP_Handle *op;
91
92   /**
93    * Task doing exponential back-off trying to reconnect.
94    */
95   struct GNUNET_SCHEDULER_Task *reconnect_task;
96
97   /**
98    * Delay for next connect retry.
99    */
100   struct GNUNET_TIME_Relative reconnect_delay;
101
102
103   GNUNET_PSYCSTORE_FragmentCallback *fragment_cb;
104
105   GNUNET_PSYCSTORE_CountersCallback *counters_cb;
106
107   GNUNET_PSYCSTORE_StateCallback *state_cb;
108   /**
109    * Closure for callbacks.
110    */
111   void *cb_cls;
112 };
113
114
115 static int
116 check_result_code (void *cls, const struct OperationResult *opres)
117 {
118   uint16_t size = ntohs (opres->header.size);
119   const char *str = (const char *) &opres[1];
120   if ( (sizeof (*opres) < size) &&
121        ('\0' != str[size - sizeof (*opres) - 1]) )
122   {
123     GNUNET_break (0);
124     return GNUNET_SYSERR;
125   }
126
127   return GNUNET_OK;
128 }
129
130
131 static void
132 handle_result_code (void *cls, const struct OperationResult *opres)
133 {
134   struct GNUNET_PSYCSTORE_Handle *h = cls;
135   struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
136   uint16_t size = ntohs (opres->header.size);
137
138   const char *
139     str = (sizeof (*opres) < size) ? (const char *) &opres[1] : "";
140
141   if (GNUNET_YES == GNUNET_OP_result (h->op, GNUNET_ntohll (opres->op_id),
142                                       GNUNET_ntohll (opres->result_code) + INT64_MIN,
143                                       str, size - sizeof (*opres), (void **) &op))
144   {
145     LOG (GNUNET_ERROR_TYPE_DEBUG,
146          "handle_result_code: Received result message with OP ID: %" PRIu64 "\n",
147          GNUNET_ntohll (opres->op_id));
148     GNUNET_free (op);
149   }
150   else
151   {
152     LOG (GNUNET_ERROR_TYPE_DEBUG,
153          "handle_result_code: No callback registered for OP ID %" PRIu64 ".\n",
154          GNUNET_ntohll (opres->op_id));
155   }
156   h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
157 }
158
159
160 static void
161 handle_result_counters (void *cls, const struct CountersResult *cres)
162 {
163   struct GNUNET_PSYCSTORE_Handle *h = cls;
164   struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
165
166   if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (cres->op_id),
167                                    NULL, NULL, (void **) &op))
168   {
169     GNUNET_assert (NULL != op);
170     if (NULL != op->counters_cb)
171     {
172       op->counters_cb (op->cls,
173                        ntohl (cres->result_code),
174                        GNUNET_ntohll (cres->max_fragment_id),
175                        GNUNET_ntohll (cres->max_message_id),
176                        GNUNET_ntohll (cres->max_group_generation),
177                        GNUNET_ntohll (cres->max_state_message_id));
178     }
179     GNUNET_OP_remove (h->op, GNUNET_ntohll (cres->op_id));
180     GNUNET_free (op);
181   }
182   else
183   {
184     LOG (GNUNET_ERROR_TYPE_DEBUG,
185          "handle_result_counters: No callback registered for OP ID %" PRIu64 ".\n",
186          GNUNET_ntohll (cres->op_id));
187   }
188   h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
189 }
190
191
192 static int
193 check_result_fragment (void *cls, const struct FragmentResult *fres)
194 {
195   uint16_t size = ntohs (fres->header.size);
196   struct GNUNET_MULTICAST_MessageHeader *mmsg =
197     (struct GNUNET_MULTICAST_MessageHeader *) &fres[1];
198   if (sizeof (*fres) + sizeof (*mmsg) < size
199       && sizeof (*fres) + ntohs (mmsg->header.size) != size)
200   {
201     LOG (GNUNET_ERROR_TYPE_ERROR,
202          "check_result_fragment: Received message with invalid length %lu bytes.\n",
203          size, sizeof (*fres));
204     GNUNET_break (0);
205     return GNUNET_SYSERR;
206   }
207   return GNUNET_OK;
208 }
209
210
211 static void
212 handle_result_fragment (void *cls, const struct FragmentResult *fres)
213 {
214   struct GNUNET_PSYCSTORE_Handle *h = cls;
215   struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
216
217   if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (fres->op_id),
218                                    NULL, NULL, (void **) &op))
219   {
220     GNUNET_assert (NULL != op);
221     if (NULL != op->fragment_cb)
222       op->fragment_cb (op->cls,
223                        (struct GNUNET_MULTICAST_MessageHeader *) &fres[1],
224                        ntohl (fres->psycstore_flags));
225     //GNUNET_OP_remove (h->op, GNUNET_ntohll (fres->op_id));
226     //GNUNET_free (op);
227   }
228   else
229   {
230     LOG (GNUNET_ERROR_TYPE_DEBUG,
231          "handle_result_fragment: No callback registered for OP ID %" PRIu64 ".\n",
232          GNUNET_ntohll (fres->op_id));
233   }
234   h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
235 }
236
237
238 static int
239 check_result_state (void *cls, const struct StateResult *sres)
240 {
241   const char *name = (const char *) &sres[1];
242   uint16_t size = ntohs (sres->header.size);
243   uint16_t name_size = ntohs (sres->name_size);
244
245   if (name_size <= 2
246       || size - sizeof (*sres) < name_size
247       || '\0' != name[name_size - 1])
248   {
249     LOG (GNUNET_ERROR_TYPE_ERROR,
250          "check_result_state: Received state result message with invalid name.\n");
251     GNUNET_break (0);
252     return GNUNET_SYSERR;
253   }
254   return GNUNET_OK;
255 }
256
257
258 static void
259 handle_result_state (void *cls, const struct StateResult *sres)
260 {
261   struct GNUNET_PSYCSTORE_Handle *h = cls;
262   struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
263
264   const char *name = (const char *) &sres[1];
265   uint16_t name_size = ntohs (sres->name_size);
266
267   if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (sres->op_id),
268                                    NULL, NULL, (void **) &op))
269   {
270     GNUNET_assert (NULL != op);
271     if (NULL != op->state_cb)
272        op->state_cb (op->cls, name, (char *) &sres[1] + name_size,
273                      ntohs (sres->header.size) - sizeof (*sres) - name_size);
274     //GNUNET_OP_remove (h->op, GNUNET_ntohll (sres->op_id));
275     //GNUNET_free (op);
276   }
277   else
278   {
279     LOG (GNUNET_ERROR_TYPE_DEBUG,
280          "handle_result_state: No callback registered for OP ID %" PRIu64 ".\n",
281          GNUNET_ntohll (sres->op_id));
282   }
283   h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
284 }
285
286
287 static void
288 reconnect (void *cls);
289
290
291 /**
292  * Client disconnected from service.
293  *
294  * Reconnect after backoff period.=
295  */
296 static void
297 disconnected (void *cls, enum GNUNET_MQ_Error error)
298 {
299   struct GNUNET_PSYCSTORE_Handle *h = cls;
300
301   LOG (GNUNET_ERROR_TYPE_DEBUG,
302        "Origin client disconnected (%d), re-connecting\n",
303        (int) error);
304   if (NULL != h->mq)
305   {
306     GNUNET_MQ_destroy (h->mq);
307     GNUNET_OP_destroy (h->op);
308     h->mq = NULL;
309     h->op = NULL;
310   }
311
312   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
313                                                     &reconnect, h);
314   h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
315 }
316
317
318 static void
319 do_connect (struct GNUNET_PSYCSTORE_Handle *h)
320 {
321   LOG (GNUNET_ERROR_TYPE_DEBUG,
322        "Connecting to PSYCstore service.\n");
323
324   struct GNUNET_MQ_MessageHandler handlers[] = {
325     GNUNET_MQ_hd_var_size (result_code,
326                            GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE,
327                            struct OperationResult,
328                            h),
329     GNUNET_MQ_hd_fixed_size (result_counters,
330                              GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS,
331                              struct CountersResult,
332                              h),
333     GNUNET_MQ_hd_var_size (result_fragment,
334                            GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT,
335                            struct FragmentResult,
336                            h),
337     GNUNET_MQ_hd_var_size (result_state,
338                            GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE,
339                            struct StateResult,
340                            h),
341     GNUNET_MQ_handler_end ()
342   };
343
344   h->op = GNUNET_OP_create ();
345   GNUNET_assert (NULL == h->mq);
346   h->mq = GNUNET_CLIENT_connect (h->cfg, "psycstore",
347                                  handlers, disconnected, h);
348   GNUNET_assert (NULL != h->mq);
349 }
350
351
352 /**
353  * Try again to connect to the PSYCstore service.
354  *
355  * @param cls Handle to the PSYCstore service.
356  */
357 static void
358 reconnect (void *cls)
359 {
360   struct GNUNET_PSYCSTORE_Handle *h = cls;
361
362   h->reconnect_task = NULL;
363   do_connect (cls);
364 }
365
366
367 /**
368  * Connect to the PSYCstore service.
369  *
370  * @param cfg The configuration to use
371  * @return Handle to use
372  */
373 struct GNUNET_PSYCSTORE_Handle *
374 GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
375 {
376   struct GNUNET_PSYCSTORE_Handle *h
377     = GNUNET_new (struct GNUNET_PSYCSTORE_Handle);
378   h->cfg = cfg;
379   h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
380   do_connect (h);
381   return h;
382 }
383
384
385 /**
386  * Disconnect from PSYCstore service
387  *
388  * @param h Handle to destroy
389  */
390 void
391 GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h)
392 {
393   GNUNET_assert (NULL != h);
394   if (h->reconnect_task != NULL)
395   {
396     GNUNET_SCHEDULER_cancel (h->reconnect_task);
397     h->reconnect_task = NULL;
398   }
399   if (NULL != h->mq)
400   {
401     // FIXME: free data structures for pending operations
402     GNUNET_MQ_destroy (h->mq);
403     h->mq = NULL;
404   }
405   GNUNET_free (h);
406 }
407
408
409 /**
410  * Message sent notification.
411  *
412  * Remove invalidated envelope pointer.
413  */
414 static void
415 message_sent (void *cls)
416 {
417   struct GNUNET_PSYCSTORE_OperationHandle *op = cls;
418   op->env = NULL;
419 }
420
421
422 /**
423  * Create a new operation.
424  */
425 static struct GNUNET_PSYCSTORE_OperationHandle *
426 op_create (struct GNUNET_PSYCSTORE_Handle *h,
427            struct GNUNET_OP_Handle *hop,
428            GNUNET_PSYCSTORE_ResultCallback result_cb,
429            void *cls)
430 {
431   struct GNUNET_PSYCSTORE_OperationHandle *
432     op = GNUNET_malloc (sizeof (*op));
433   op->h = h;
434   op->op_id = GNUNET_OP_add (hop,
435                              (GNUNET_ResultCallback) result_cb,
436                              cls, op);
437   return op;
438 }
439
440
441 /**
442  * Send a message associated with an operation.
443  *
444  * @param h
445  *        PSYCstore handle.
446  * @param op
447  *        Operation handle.
448  * @param env
449  *        Message envelope to send.
450  * @param[out] op_id
451  *        Operation ID to write in network byte order. NULL if not needed.
452  *
453  * @return Operation handle.
454  *
455  */
456 static struct GNUNET_PSYCSTORE_OperationHandle *
457 op_send (struct GNUNET_PSYCSTORE_Handle *h,
458          struct GNUNET_PSYCSTORE_OperationHandle *op,
459          struct GNUNET_MQ_Envelope *env,
460          uint64_t *op_id)
461 {
462   op->env = env;
463   if (NULL != op_id)
464     *op_id = GNUNET_htonll (op->op_id);
465
466   GNUNET_MQ_notify_sent (env, message_sent, op);
467   GNUNET_MQ_send (h->mq, env);
468   return op;
469 }
470
471
472 /**
473  * Cancel a PSYCstore operation. Note that the operation MAY still
474  * be executed; this merely cancels the continuation; if the request
475  * was already transmitted, the service may still choose to complete
476  * the operation.
477  *
478  * @param op Operation to cancel.
479  *
480  * @return #GNUNET_YES if message was not sent yet and got discarded,
481  *         #GNUNET_NO  if it was already sent, and only the callbacks got cancelled.
482  */
483 int
484 GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op)
485 {
486   struct GNUNET_PSYCSTORE_Handle *h = op->h;
487   int ret = GNUNET_NO;
488
489   if (NULL != op->env)
490   {
491     GNUNET_MQ_send_cancel (op->env);
492     ret = GNUNET_YES;
493   }
494
495   GNUNET_OP_remove (h->op, op->op_id);
496   GNUNET_free (op);
497
498   return ret;
499 }
500
501
502 /**
503  * Store join/leave events for a PSYC channel in order to be able to answer
504  * membership test queries later.
505  *
506  * @param h
507  *        Handle for the PSYCstore.
508  * @param channel_key
509  *        The channel where the event happened.
510  * @param slave_key
511  *        Public key of joining/leaving slave.
512  * @param did_join
513  *        #GNUNET_YES on join, #GNUNET_NO on part.
514  * @param announced_at
515  *        ID of the message that announced the membership change.
516  * @param effective_since
517  *        Message ID this membership change is in effect since.
518  *        For joins it is <= announced_at, for parts it is always 0.
519  * @param group_generation
520  *        In case of a part, the last group generation the slave has access to.
521  *        It has relevance when a larger message have fragments with different
522  *        group generations.
523  * @param result_cb
524  *        Callback to call with the result of the storage operation.
525  * @param cls
526  *        Closure for the callback.
527  *
528  * @return Operation handle that can be used to cancel the operation.
529  */
530 struct GNUNET_PSYCSTORE_OperationHandle *
531 GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
532                                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
533                                    const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
534                                    int did_join,
535                                    uint64_t announced_at,
536                                    uint64_t effective_since,
537                                    uint64_t group_generation,
538                                    GNUNET_PSYCSTORE_ResultCallback result_cb,
539                                    void *cls)
540 {
541   GNUNET_assert (NULL != h);
542   GNUNET_assert (NULL != channel_key);
543   GNUNET_assert (NULL != slave_key);
544   GNUNET_assert (GNUNET_YES == did_join || GNUNET_NO == did_join);
545   GNUNET_assert (did_join
546                  ? effective_since <= announced_at
547                  : effective_since == 0);
548
549   struct MembershipStoreRequest *req;
550   struct GNUNET_MQ_Envelope *
551     env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE);
552   req->channel_key = *channel_key;
553   req->slave_key = *slave_key;
554   req->did_join = did_join;
555   req->announced_at = GNUNET_htonll (announced_at);
556   req->effective_since = GNUNET_htonll (effective_since);
557   req->group_generation = GNUNET_htonll (group_generation);
558
559   return
560     op_send (h, op_create (h, h->op, result_cb, cls),
561              env, &req->op_id);
562 }
563
564
565 /**
566  * Test if a member was admitted to the channel at the given message ID.
567  *
568  * This is useful when relaying and replaying messages to check if a particular
569  * slave has access to the message fragment with a given group generation.  It
570  * is also used when handling join requests to determine whether the slave is
571  * currently admitted to the channel.
572  *
573  * @param h
574  *        Handle for the PSYCstore.
575  * @param channel_key
576  *        The channel we are interested in.
577  * @param slave_key
578  *        Public key of slave whose membership to check.
579  * @param message_id
580  *        Message ID for which to do the membership test.
581  * @param group_generation
582  *        Group generation of the fragment of the message to test.
583  *        It has relevance if the message consists of multiple fragments with
584  *        different group generations.
585  * @param result_cb
586  *        Callback to call with the test result.
587  * @param cls
588  *        Closure for the callback.
589  *
590  * @return Operation handle that can be used to cancel the operation.
591  */
592 struct GNUNET_PSYCSTORE_OperationHandle *
593 GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
594                                   const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
595                                   const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
596                                   uint64_t message_id,
597                                   uint64_t group_generation,
598                                   GNUNET_PSYCSTORE_ResultCallback result_cb,
599                                   void *cls)
600 {
601   struct MembershipTestRequest *req;
602   struct GNUNET_MQ_Envelope *
603     env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST);
604   req->channel_key = *channel_key;
605   req->slave_key = *slave_key;
606   req->message_id = GNUNET_htonll (message_id);
607   req->group_generation = GNUNET_htonll (group_generation);
608
609   return
610     op_send (h, op_create (h, h->op, result_cb, cls),
611              env, &req->op_id);
612 }
613
614
615 /**
616  * Store a message fragment sent to a channel.
617  *
618  * @param h Handle for the PSYCstore.
619  * @param channel_key The channel the message belongs to.
620  * @param message Message to store.
621  * @param psycstore_flags Flags indicating whether the PSYC message contains
622  *        state modifiers.
623  * @param result_cb Callback to call with the result of the operation.
624  * @param cls Closure for the callback.
625  *
626  * @return Handle that can be used to cancel the operation.
627  */
628 struct GNUNET_PSYCSTORE_OperationHandle *
629 GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
630                                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
631                                  const struct GNUNET_MULTICAST_MessageHeader *msg,
632                                  enum GNUNET_PSYCSTORE_MessageFlags psycstore_flags,
633                                  GNUNET_PSYCSTORE_ResultCallback result_cb,
634                                  void *cls)
635 {
636   uint16_t size = ntohs (msg->header.size);
637   struct FragmentStoreRequest *req;
638   struct GNUNET_MQ_Envelope *
639     env = GNUNET_MQ_msg_extra (req, size,
640                                GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE);
641   req->channel_key = *channel_key;
642   req->psycstore_flags = htonl (psycstore_flags);
643   GNUNET_memcpy (&req[1], msg, size);
644
645   return
646     op_send (h, op_create (h, h->op, result_cb, cls),
647              env, &req->op_id);
648 }
649
650
651 /**
652  * Retrieve message fragments by fragment ID range.
653  *
654  * @param h
655  *        Handle for the PSYCstore.
656  * @param channel_key
657  *        The channel we are interested in.
658  * @param slave_key
659  *        The slave requesting the fragment.  If not NULL, a membership test is
660  *        performed first and the fragment is only returned if the slave has
661  *        access to it.
662  * @param first_fragment_id
663  *        First fragment ID to retrieve.
664  *        Use 0 to get the latest message fragment.
665  * @param last_fragment_id
666  *        Last consecutive fragment ID to retrieve.
667  *        Use 0 to get the latest message fragment.
668  * @param fragment_limit
669  *        Maximum number of fragments to retrieve.
670  * @param fragment_cb
671  *        Callback to call with the retrieved fragments.
672  * @param result_cb
673  *        Callback to call with the result of the operation.
674  * @param cls
675  *        Closure for the callbacks.
676  *
677  * @return Handle that can be used to cancel the operation.
678  */
679 struct GNUNET_PSYCSTORE_OperationHandle *
680 GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
681                                const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
682                                const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
683                                uint64_t first_fragment_id,
684                                uint64_t last_fragment_id,
685                                GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
686                                GNUNET_PSYCSTORE_ResultCallback result_cb,
687                                void *cls)
688 {
689   struct FragmentGetRequest *req;
690   struct GNUNET_MQ_Envelope *
691     env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
692   req->channel_key = *channel_key;
693   req->first_fragment_id = GNUNET_htonll (first_fragment_id);
694   req->last_fragment_id = GNUNET_htonll (last_fragment_id);
695   if (NULL != slave_key)
696   {
697     req->slave_key = *slave_key;
698     req->do_membership_test = GNUNET_YES;
699   }
700
701   struct GNUNET_PSYCSTORE_OperationHandle *
702     op = op_create (h, h->op, result_cb, cls);
703   op->fragment_cb = fragment_cb;
704   op->cls = cls;
705   return op_send (h, op, env, &req->op_id);
706 }
707
708
709 /**
710  * Retrieve latest message fragments.
711  *
712  * @param h
713  *        Handle for the PSYCstore.
714  * @param channel_key
715  *        The channel we are interested in.
716  * @param slave_key
717  *        The slave requesting the fragment.  If not NULL, a membership test is
718  *        performed first and the fragment is only returned if the slave has
719  *        access to it.
720  * @param first_fragment_id
721  *        First fragment ID to retrieve.
722  *        Use 0 to get the latest message fragment.
723  * @param last_fragment_id
724  *        Last consecutive fragment ID to retrieve.
725  *        Use 0 to get the latest message fragment.
726  * @param fragment_limit
727  *        Maximum number of fragments to retrieve.
728  * @param fragment_cb
729  *        Callback to call with the retrieved fragments.
730  * @param result_cb
731  *        Callback to call with the result of the operation.
732  * @param cls
733  *        Closure for the callbacks.
734  *
735  * @return Handle that can be used to cancel the operation.
736  */
737 struct GNUNET_PSYCSTORE_OperationHandle *
738 GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
739                                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
740                                       const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
741                                       uint64_t fragment_limit,
742                                       GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
743                                       GNUNET_PSYCSTORE_ResultCallback result_cb,
744                                       void *cls)
745 {
746   struct FragmentGetRequest *req;
747   struct GNUNET_MQ_Envelope *
748     env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
749   req->channel_key = *channel_key;
750   req->fragment_limit = GNUNET_ntohll (fragment_limit);
751   if (NULL != slave_key)
752   {
753     req->slave_key = *slave_key;
754     req->do_membership_test = GNUNET_YES;
755   }
756
757   struct GNUNET_PSYCSTORE_OperationHandle *
758     op = op_create (h, h->op, result_cb, cls);
759   op->fragment_cb = fragment_cb;
760   op->cls = cls;
761   return op_send (h, op, env, &req->op_id);
762 }
763
764
765 /**
766  * Retrieve all fragments of messages in a message ID range.
767  *
768  * @param h
769  *        Handle for the PSYCstore.
770  * @param channel_key
771  *        The channel we are interested in.
772  * @param slave_key
773  *        The slave requesting the message.
774  *        If not NULL, a membership test is performed first
775  *        and the message is only returned if the slave has access to it.
776  * @param first_message_id
777  *        First message ID to retrieve.
778  * @param last_message_id
779  *        Last consecutive message ID to retrieve.
780  * @param fragment_limit
781  *        Maximum number of fragments to retrieve.
782  * @param method_prefix
783  *        Retrieve only messages with a matching method prefix.
784  * @todo Implement method_prefix query.
785  * @param fragment_cb
786  *        Callback to call with the retrieved fragments.
787  * @param result_cb
788  *        Callback to call with the result of the operation.
789  * @param cls
790  *        Closure for the callbacks.
791  *
792  * @return Handle that can be used to cancel the operation.
793  */
794 struct GNUNET_PSYCSTORE_OperationHandle *
795 GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
796                               const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
797                               const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
798                               uint64_t first_message_id,
799                               uint64_t last_message_id,
800                               uint64_t fragment_limit,
801                               const char *method_prefix,
802                               GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
803                               GNUNET_PSYCSTORE_ResultCallback result_cb,
804                               void *cls)
805 {
806   struct MessageGetRequest *req;
807   if (NULL == method_prefix)
808     method_prefix = "";
809   uint16_t method_size = strnlen (method_prefix,
810                                   GNUNET_MAX_MESSAGE_SIZE
811                                   - sizeof (*req)) + 1;
812
813   struct GNUNET_MQ_Envelope *
814     env = GNUNET_MQ_msg_extra (req, method_size,
815                                GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
816   req->channel_key = *channel_key;
817   req->first_message_id = GNUNET_htonll (first_message_id);
818   req->last_message_id = GNUNET_htonll (last_message_id);
819   req->fragment_limit = GNUNET_htonll (fragment_limit);
820   if (NULL != slave_key)
821   {
822     req->slave_key = *slave_key;
823     req->do_membership_test = GNUNET_YES;
824   }
825   GNUNET_memcpy (&req[1], method_prefix, method_size);
826   ((char *) &req[1])[method_size - 1] = '\0';
827
828   struct GNUNET_PSYCSTORE_OperationHandle *
829     op = op_create (h, h->op, result_cb, cls);
830   op->fragment_cb = fragment_cb;
831   op->cls = cls;
832   return op_send (h, op, env, &req->op_id);
833 }
834
835
836 /**
837  * Retrieve all fragments of the latest messages.
838  *
839  * @param h
840  *        Handle for the PSYCstore.
841  * @param channel_key
842  *        The channel we are interested in.
843  * @param slave_key
844  *        The slave requesting the message.
845  *        If not NULL, a membership test is performed first
846  *        and the message is only returned if the slave has access to it.
847  * @param message_limit
848  *        Maximum number of messages to retrieve.
849  * @param method_prefix
850  *        Retrieve only messages with a matching method prefix.
851  * @todo Implement method_prefix query.
852  * @param fragment_cb
853  *        Callback to call with the retrieved fragments.
854  * @param result_cb
855  *        Callback to call with the result of the operation.
856  * @param cls
857  *        Closure for the callbacks.
858  *
859  * @return Handle that can be used to cancel the operation.
860  */
861 struct GNUNET_PSYCSTORE_OperationHandle *
862 GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
863                                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
864                                      const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
865                                      uint64_t message_limit,
866                                      const char *method_prefix,
867                                      GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
868                                      GNUNET_PSYCSTORE_ResultCallback result_cb,
869                                      void *cls)
870 {
871   struct MessageGetRequest *req;
872
873   if (NULL == method_prefix)
874     method_prefix = "";
875   uint16_t method_size = strnlen (method_prefix,
876                                   GNUNET_MAX_MESSAGE_SIZE
877                                   - sizeof (*req)) + 1;
878   GNUNET_assert ('\0' == method_prefix[method_size - 1]);
879
880   struct GNUNET_MQ_Envelope *
881     env = GNUNET_MQ_msg_extra (req, method_size,
882                                GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
883   req->channel_key = *channel_key;
884   req->message_limit = GNUNET_ntohll (message_limit);
885   if (NULL != slave_key)
886   {
887     req->slave_key = *slave_key;
888     req->do_membership_test = GNUNET_YES;
889   }
890   GNUNET_memcpy (&req[1], method_prefix, method_size);
891
892   struct GNUNET_PSYCSTORE_OperationHandle *
893     op = op_create (h, h->op, result_cb, cls);
894   op->fragment_cb = fragment_cb;
895   op->cls = cls;
896   return op_send (h, op, env, &req->op_id);
897 }
898
899
900 /**
901  * Retrieve a fragment of message specified by its message ID and fragment
902  * offset.
903  *
904  * @param h
905  *        Handle for the PSYCstore.
906  * @param channel_key
907  *        The channel we are interested in.
908  * @param slave_key
909  *        The slave requesting the message fragment.  If not NULL, a membership
910  *        test is performed first and the message fragment is only returned
911  *        if the slave has access to it.
912  * @param message_id
913  *        Message ID to retrieve.  Use 0 to get the latest message.
914  * @param fragment_offset
915  *        Offset of the fragment to retrieve.
916  * @param fragment_cb
917  *        Callback to call with the retrieved fragments.
918  * @param result_cb
919  *        Callback to call with the result of the operation.
920  * @param cls
921  *        Closure for the callbacks.
922  *
923  * @return Handle that can be used to cancel the operation.
924  */
925 struct GNUNET_PSYCSTORE_OperationHandle *
926 GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
927                                        const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
928                                        const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
929                                        uint64_t message_id,
930                                        uint64_t fragment_offset,
931                                        GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
932                                        GNUNET_PSYCSTORE_ResultCallback result_cb,
933                                        void *cls)
934 {
935   struct MessageGetFragmentRequest *req;
936   struct GNUNET_MQ_Envelope *
937     env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT);
938
939   req->channel_key = *channel_key;
940   req->message_id = GNUNET_htonll (message_id);
941   req->fragment_offset = GNUNET_htonll (fragment_offset);
942   if (NULL != slave_key)
943   {
944     req->slave_key = *slave_key;
945     req->do_membership_test = GNUNET_YES;
946   }
947
948   struct GNUNET_PSYCSTORE_OperationHandle *
949     op = op_create (h, h->op, result_cb, cls);
950   op->fragment_cb = fragment_cb;
951   op->cls = cls;
952   return op_send (h, op, env, &req->op_id);
953 }
954
955
956 /**
957  * Retrieve latest values of counters for a channel master.
958  *
959  * The current value of counters are needed when a channel master is restarted,
960  * so that it can continue incrementing the counters from their last value.
961  *
962  * @param h
963  *        Handle for the PSYCstore.
964  * @param channel_key
965  *        Public key that identifies the channel.
966  * @param ccb
967  *        Callback to call with the result.
968  * @param ccb_cls
969  *        Closure for the @a ccb callback.
970  *
971  * @return Handle that can be used to cancel the operation.
972  */
973 struct GNUNET_PSYCSTORE_OperationHandle *
974 GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
975                                struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
976                                GNUNET_PSYCSTORE_CountersCallback counters_cb,
977                                void *cls)
978 {
979   struct OperationRequest *req;
980   struct GNUNET_MQ_Envelope *
981     env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET);
982   req->channel_key = *channel_key;
983
984   struct GNUNET_PSYCSTORE_OperationHandle *
985     op = op_create (h, h->op, NULL, NULL);
986   op->counters_cb = counters_cb;
987   op->cls = cls;
988   return op_send (h, op, env, &req->op_id);
989 }
990
991
992 /**
993  * Apply modifiers of a message to the current channel state.
994  *
995  * An error is returned if there are missing messages containing state
996  * operations before the current one.
997  *
998  * @param h
999  *        Handle for the PSYCstore.
1000  * @param channel_key
1001  *        The channel we are interested in.
1002  * @param message_id
1003  *        ID of the message that contains the @a modifiers.
1004  * @param state_delta
1005  *        Value of the _state_delta PSYC header variable of the message.
1006  * @param result_cb
1007  *        Callback to call with the result of the operation.
1008  * @param cls
1009  *        Closure for @a result_cb.
1010  *
1011  * @return Handle that can be used to cancel the operation.
1012  */
1013 struct GNUNET_PSYCSTORE_OperationHandle *
1014 GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1015                                const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1016                                uint64_t message_id,
1017                                uint64_t state_delta,
1018                                GNUNET_PSYCSTORE_ResultCallback result_cb,
1019                                void *cls)
1020 {
1021   struct StateModifyRequest *req;
1022   struct GNUNET_MQ_Envelope *
1023     env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
1024   req->channel_key = *channel_key;
1025   req->message_id = GNUNET_htonll (message_id);
1026   req->state_delta = GNUNET_htonll (state_delta);
1027
1028   return op_send (h, op_create (h, h->op, result_cb, cls),
1029                   env, &req->op_id);
1030 }
1031
1032
1033 struct StateSyncClosure
1034 {
1035   GNUNET_PSYCSTORE_ResultCallback result_cb;
1036   void *cls;
1037   uint8_t last;
1038 };
1039
1040
1041 static void
1042 state_sync_result (void *cls, int64_t result,
1043                    const char *err_msg, uint16_t err_msg_size)
1044 {
1045   struct StateSyncClosure *ssc = cls;
1046   if (GNUNET_OK != result || ssc->last)
1047     ssc->result_cb (ssc->cls, result, err_msg, err_msg_size);
1048   GNUNET_free (ssc);
1049 }
1050
1051
1052 /**
1053  * Store synchronized state.
1054  *
1055  * @param h
1056  *        Handle for the PSYCstore.
1057  * @param channel_key
1058  *        The channel we are interested in.
1059  * @param max_state_message_id
1060  *        ID of the last stateful message before @a state_hash_message_id.
1061  * @param state_hash_message_id
1062  *        ID of the message that contains the state_hash PSYC header variable.
1063  * @param modifier_count
1064  *        Number of elements in the @a modifiers array.
1065  * @param modifiers
1066  *        Full state to store.
1067  * @param result_cb
1068  *        Callback to call with the result of the operation.
1069  * @param cls
1070  *        Closure for the callback.
1071  *
1072  * @return Handle that can be used to cancel the operation.
1073  */
1074 struct GNUNET_PSYCSTORE_OperationHandle *
1075 GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1076                              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1077                              uint64_t max_state_message_id,
1078                              uint64_t state_hash_message_id,
1079                              size_t modifier_count,
1080                              const struct GNUNET_PSYC_Modifier *modifiers,
1081                              GNUNET_PSYCSTORE_ResultCallback result_cb,
1082                              void *cls)
1083 {
1084   struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1085   size_t i;
1086
1087   for (i = 0; i < modifier_count; i++) {
1088     struct StateSyncRequest *req;
1089     uint16_t name_size = strlen (modifiers[i].name) + 1;
1090
1091     struct GNUNET_MQ_Envelope *
1092       env = GNUNET_MQ_msg_extra (req,
1093                                  sizeof (*req) + name_size + modifiers[i].value_size,
1094                                  GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
1095
1096     req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
1097     req->header.size = htons (sizeof (*req) + name_size
1098                               + modifiers[i].value_size);
1099     req->channel_key = *channel_key;
1100     req->max_state_message_id = GNUNET_htonll (max_state_message_id);
1101     req->state_hash_message_id = GNUNET_htonll (state_hash_message_id);
1102     req->name_size = htons (name_size);
1103     req->flags
1104       = (0 == i)
1105       ? STATE_OP_FIRST
1106       : (modifier_count - 1 == i)
1107       ? STATE_OP_LAST
1108       : 0;
1109
1110     GNUNET_memcpy (&req[1], modifiers[i].name, name_size);
1111     GNUNET_memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1112
1113     struct StateSyncClosure *ssc = GNUNET_malloc (sizeof (*ssc));
1114     ssc->last = (req->flags & STATE_OP_LAST);
1115     ssc->result_cb = result_cb;
1116     ssc->cls = cls;
1117
1118     op_send (h, op_create (h, h->op, state_sync_result, ssc),
1119              env, &req->op_id);
1120   }
1121   // FIXME: only one operation is returned,
1122   //        add pointers to other operations and make all cancellable.
1123   return op;
1124 }
1125
1126
1127 /**
1128  * Reset the state of a channel.
1129  *
1130  * Delete all state variables stored for the given channel.
1131  *
1132  * @param h
1133  *        Handle for the PSYCstore.
1134  * @param channel_key
1135  *        The channel we are interested in.
1136  * @param result_cb
1137  *        Callback to call with the result of the operation.
1138  * @param cls
1139  *        Closure for the callback.
1140  *
1141  * @return Handle that can be used to cancel the operation.
1142  */
1143 struct GNUNET_PSYCSTORE_OperationHandle *
1144 GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
1145                               const struct GNUNET_CRYPTO_EddsaPublicKey
1146                               *channel_key,
1147                               GNUNET_PSYCSTORE_ResultCallback result_cb,
1148                               void *cls)
1149 {
1150   struct OperationRequest *req;
1151   struct GNUNET_MQ_Envelope *
1152     env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1153   req->channel_key = *channel_key;
1154
1155   return
1156     op_send (h, op_create (h, h->op, result_cb, cls),
1157              env, &req->op_id);
1158 }
1159
1160
1161 /**
1162  * Update signed values of state variables in the state store.
1163  *
1164  * @param h
1165  *        Handle for the PSYCstore.
1166  * @param channel_key
1167  *        The channel we are interested in.
1168  * @param message_id
1169  *        Message ID that contained the state @a hash.
1170  * @param hash
1171  *        Hash of the serialized full state.
1172  * @param result_cb
1173  *        Callback to call with the result of the operation.
1174  * @param cls
1175  *        Closure for the callback.
1176  */
1177 struct GNUNET_PSYCSTORE_OperationHandle *
1178 GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
1179                                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1180                                     uint64_t message_id,
1181                                     const struct GNUNET_HashCode *hash,
1182                                     GNUNET_PSYCSTORE_ResultCallback result_cb,
1183                                     void *cls)
1184 {
1185   struct StateHashUpdateRequest *req;
1186   struct GNUNET_MQ_Envelope *
1187     env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_HASH_UPDATE);
1188   req->channel_key = *channel_key;
1189   req->hash = *hash;
1190
1191   return
1192     op_send (h, op_create (h, h->op, result_cb, cls),
1193              env, &req->op_id);
1194 }
1195
1196
1197 /**
1198  * Retrieve the best matching state variable.
1199  *
1200  * @param h
1201  *        Handle for the PSYCstore.
1202  * @param channel_key
1203  *        The channel we are interested in.
1204  * @param name
1205  *        Name of variable to match, the returned variable might be less specific.
1206  * @param state_cb
1207  *        Callback to return the matching state variable.
1208  * @param result_cb
1209  *        Callback to call with the result of the operation.
1210  * @param cls
1211  *        Closure for the callbacks.
1212  *
1213  * @return Handle that can be used to cancel the operation.
1214  */
1215 struct GNUNET_PSYCSTORE_OperationHandle *
1216 GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
1217                             const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1218                             const char *name,
1219                             GNUNET_PSYCSTORE_StateCallback state_cb,
1220                             GNUNET_PSYCSTORE_ResultCallback result_cb,
1221                             void *cls)
1222 {
1223   size_t name_size = strlen (name) + 1;
1224   struct OperationRequest *req;
1225   struct GNUNET_MQ_Envelope *
1226     env = GNUNET_MQ_msg_extra (req, name_size,
1227                                GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET);
1228   req->channel_key = *channel_key;
1229   GNUNET_memcpy (&req[1], name, name_size);
1230
1231   struct GNUNET_PSYCSTORE_OperationHandle *
1232     op = op_create (h, h->op, result_cb, cls);
1233   op->state_cb = state_cb;
1234   op->cls = cls;
1235   return op_send (h, op, env, &req->op_id);
1236 }
1237
1238
1239 /**
1240  * Retrieve all state variables for a channel with the given prefix.
1241  *
1242  * @param h
1243  *        Handle for the PSYCstore.
1244  * @param channel_key
1245  *        The channel we are interested in.
1246  * @param name_prefix
1247  *        Prefix of state variable names to match.
1248  * @param state_cb
1249  *        Callback to return matching state variables.
1250  * @param result_cb
1251  *        Callback to call with the result of the operation.
1252  * @param cls
1253  *        Closure for the callbacks.
1254  *
1255  * @return Handle that can be used to cancel the operation.
1256  */
1257 struct GNUNET_PSYCSTORE_OperationHandle *
1258 GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
1259                                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1260                                    const char *name_prefix,
1261                                    GNUNET_PSYCSTORE_StateCallback state_cb,
1262                                    GNUNET_PSYCSTORE_ResultCallback result_cb,
1263                                    void *cls)
1264 {
1265   size_t name_size = strlen (name_prefix) + 1;
1266   struct OperationRequest *req;
1267   struct GNUNET_MQ_Envelope *
1268     env = GNUNET_MQ_msg_extra (req, name_size,
1269                                GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX);
1270   req->channel_key = *channel_key;
1271   GNUNET_memcpy (&req[1], name_prefix, name_size);
1272
1273   struct GNUNET_PSYCSTORE_OperationHandle *
1274     op = op_create (h, h->op, result_cb, cls);
1275   op->state_cb = state_cb;
1276   op->cls = cls;
1277   return op_send (h, op, env, &req->op_id);
1278 }
1279
1280 /* end of psycstore_api.c */