psyc: in-order message delivery
[oweals/gnunet.git] / src / psyc / psyc_api.c
1 /*
2  * This file is part of GNUnet
3  * (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psyc/psyc_api.c
23  * @brief PSYC service; high-level access to the PSYC protocol
24  *        note that clients of this API are NOT expected to
25  *        understand the PSYC message format, only the semantics!
26  *        Parsing (and serializing) the PSYC stream format is done
27  *        within the implementation of the libgnunetpsyc library,
28  *        and this API deliberately exposes as little as possible
29  *        of the actual data stream format to the application!
30  * @author Gabor X Toth
31  */
32
33 #include <inttypes.h>
34
35 #include "platform.h"
36 #include "gnunet_util_lib.h"
37 #include "gnunet_env_lib.h"
38 #include "gnunet_multicast_service.h"
39 #include "gnunet_psyc_service.h"
40 #include "psyc.h"
41
42 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
43
44 struct OperationHandle
45 {
46   struct OperationHandle *prev;
47   struct OperationHandle *next;
48   struct GNUNET_MessageHeader *msg;
49 };
50
51
52 /**
53  * Handle for a pending PSYC transmission operation.
54  */
55 struct GNUNET_PSYC_ChannelTransmitHandle
56 {
57   struct GNUNET_PSYC_Channel *ch;
58   GNUNET_PSYC_TransmitNotifyModifier notify_mod;
59   GNUNET_PSYC_TransmitNotifyData notify_data;
60   void *notify_cls;
61   enum MessageState state;
62 };
63
64 /**
65  * Handle to access PSYC channel operations for both the master and slaves.
66  */
67 struct GNUNET_PSYC_Channel
68 {
69   /**
70    * Transmission handle;
71    */
72   struct GNUNET_PSYC_ChannelTransmitHandle tmit;
73
74   /**
75    * Configuration to use.
76    */
77   const struct GNUNET_CONFIGURATION_Handle *cfg;
78
79   /**
80    * Socket (if available).
81    */
82   struct GNUNET_CLIENT_Connection *client;
83
84   /**
85    * Currently pending transmission request, or NULL for none.
86    */
87   struct GNUNET_CLIENT_TransmitHandle *th;
88
89   /**
90    * Head of operations to transmit.
91    */
92   struct OperationHandle *tmit_head;
93
94   /**
95    * Tail of operations to transmit.
96    */
97   struct OperationHandle *tmit_tail;
98
99   /**
100    * Message being transmitted to the PSYC service.
101    */
102   struct OperationHandle *tmit_msg;
103
104   /**
105    * Message to send on reconnect.
106    */
107   struct GNUNET_MessageHeader *reconnect_msg;
108
109   /**
110    * Task doing exponential back-off trying to reconnect.
111    */
112   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
113
114   /**
115    * Time for next connect retry.
116    */
117   struct GNUNET_TIME_Relative reconnect_delay;
118
119   /**
120    * Message part callback.
121    */
122   GNUNET_PSYC_MessageCallback message_cb;
123
124   /**
125    * Message part callback for historic message.
126    */
127   GNUNET_PSYC_MessageCallback hist_message_cb;
128
129   /**
130    * Join handler callback.
131    */
132   GNUNET_PSYC_JoinCallback join_cb;
133
134   /**
135    * Closure for @a message_cb and @a join_cb.
136    */
137   void *cb_cls;
138
139   /**
140    * ID of the message being received from the PSYC service.
141    */
142   uint64_t recv_message_id;
143
144   /**
145    * Public key of the slave from which a message is being received.
146    */
147   struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key;
148
149   /**
150    * State of the currently being received message from the PSYC service.
151    */
152   enum MessageState recv_state;
153
154   /**
155    * Flags for the currently being received message from the PSYC service.
156    */
157   enum GNUNET_PSYC_MessageFlags recv_flags;
158
159   /**
160    * Expected value size for the modifier being received from the PSYC service.
161    */
162   uint32_t recv_mod_value_size_expected;
163
164   /**
165    * Actual value size for the modifier being received from the PSYC service.
166    */
167   uint32_t recv_mod_value_size;
168
169   /**
170    * Is transmission paused?
171    */
172   uint8_t tmit_paused;
173
174   /**
175    * Are we still waiting for a PSYC_TRANSMIT_ACK?
176    */
177   uint8_t tmit_ack_pending;
178
179   /**
180    * Are we polling for incoming messages right now?
181    */
182   uint8_t in_receive;
183
184   /**
185    * Are we currently transmitting a message?
186    */
187   uint8_t in_transmit;
188
189   /**
190    * Is this a master or slave channel?
191    */
192   uint8_t is_master;
193 };
194
195
196 /**
197  * Handle for the master of a PSYC channel.
198  */
199 struct GNUNET_PSYC_Master
200 {
201   struct GNUNET_PSYC_Channel ch;
202
203   GNUNET_PSYC_MasterStartCallback start_cb;
204
205   uint64_t max_message_id;
206 };
207
208
209 /**
210  * Handle for a PSYC channel slave.
211  */
212 struct GNUNET_PSYC_Slave
213 {
214   struct GNUNET_PSYC_Channel ch;
215
216   GNUNET_PSYC_SlaveJoinCallback join_cb;
217
218   uint64_t max_message_id;
219 };
220
221
222 /**
223  * Handle that identifies a join request.
224  *
225  * Used to match calls to #GNUNET_PSYC_JoinCallback to the
226  * corresponding calls to GNUNET_PSYC_join_decision().
227  */
228 struct GNUNET_PSYC_JoinHandle
229 {
230
231 };
232
233
234 /**
235  * Handle for a pending PSYC transmission operation.
236  */
237 struct GNUNET_PSYC_SlaveTransmitHandle
238 {
239
240 };
241
242
243 /**
244  * Handle to a story telling operation.
245  */
246 struct GNUNET_PSYC_Story
247 {
248
249 };
250
251
252 /**
253  * Handle for a state query operation.
254  */
255 struct GNUNET_PSYC_StateQuery
256 {
257
258 };
259
260
261 static void
262 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
263
264
265 static void
266 channel_transmit_data (struct GNUNET_PSYC_Channel *ch);
267
268
269 /**
270  * Reschedule a connect attempt to the service.
271  *
272  * @param c channel to reconnect
273  */
274 static void
275 reschedule_connect (struct GNUNET_PSYC_Channel *c)
276 {
277   GNUNET_assert (c->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
278
279   if (NULL != c->th)
280   {
281     GNUNET_CLIENT_notify_transmit_ready_cancel (c->th);
282     c->th = NULL;
283   }
284   if (NULL != c->client)
285   {
286     GNUNET_CLIENT_disconnect (c->client);
287     c->client = NULL;
288   }
289   c->in_receive = GNUNET_NO;
290   LOG (GNUNET_ERROR_TYPE_DEBUG,
291        "Scheduling task to reconnect to PSYC service in %s.\n",
292        GNUNET_STRINGS_relative_time_to_string (c->reconnect_delay, GNUNET_YES));
293   c->reconnect_task =
294       GNUNET_SCHEDULER_add_delayed (c->reconnect_delay, &reconnect, c);
295   c->reconnect_delay = GNUNET_TIME_STD_BACKOFF (c->reconnect_delay);
296 }
297
298
299 /**
300  * Schedule transmission of the next message from our queue.
301  *
302  * @param ch PSYC channel handle
303  */
304 static void
305 transmit_next (struct GNUNET_PSYC_Channel *ch);
306
307
308 /**
309  * Reset data stored related to the last received message.
310  */
311 static void
312 recv_reset (struct GNUNET_PSYC_Channel *ch)
313 {
314   ch->recv_state = MSG_STATE_START;
315   ch->recv_flags = 0;
316   ch->recv_message_id = 0;
317   //FIXME: ch->recv_slave_key = { 0 };
318   ch->recv_mod_value_size = 0;
319   ch->recv_mod_value_size_expected = 0;
320 }
321
322
323 static void
324 recv_error (struct GNUNET_PSYC_Channel *ch)
325 {
326   GNUNET_PSYC_MessageCallback message_cb
327     = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
328     ? ch->hist_message_cb
329     : ch->message_cb;
330
331   if (NULL != message_cb)
332     message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL);
333
334   recv_reset (ch);
335 }
336
337
338 /**
339  * Queue a message part for transmission to the PSYC service.
340  *
341  * The message part is added to the current message buffer.
342  * When this buffer is full, it is added to the transmission queue.
343  *
344  * @param ch Channel struct for the client.
345  * @param msg Modifier message part, or NULL when there's no more modifiers.
346  * @param end End of message.
347  */
348 static void
349 queue_message (struct GNUNET_PSYC_Channel *ch,
350                const struct GNUNET_MessageHeader *msg,
351                uint8_t end)
352 {
353   uint16_t size = msg ? ntohs (msg->size) : 0;
354
355   LOG (GNUNET_ERROR_TYPE_DEBUG,
356        "Queueing message of type %u and size %u (end: %u)).\n",
357        ntohs (msg->type), size, end);
358
359   struct OperationHandle *op = ch->tmit_msg;
360   if (NULL != op)
361   {
362     if (NULL == msg
363         || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < op->msg->size + size)
364     {
365       /* End of message or buffer is full, add it to transmission queue
366        * and start with empty buffer */
367       op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
368       op->msg->size = htons (op->msg->size);
369       GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
370       ch->tmit_msg = op = NULL;
371       ch->tmit_ack_pending++;
372     }
373     else
374     {
375       /* Message fits in current buffer, append */
376       ch->tmit_msg = op
377         = GNUNET_realloc (op, sizeof (*op) + op->msg->size + size);
378       op->msg = (struct GNUNET_MessageHeader *) &op[1];
379       memcpy ((char *) op->msg + op->msg->size, msg, size);
380       op->msg->size += size;
381     }
382   }
383
384   if (NULL == op && NULL != msg)
385   {
386     /* Empty buffer, copy over message. */
387     ch->tmit_msg = op
388       = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) + size);
389     op->msg = (struct GNUNET_MessageHeader *) &op[1];
390     op->msg->size = sizeof (*op->msg) + size;
391     memcpy (&op->msg[1], msg, size);
392   }
393
394   if (NULL != op
395       && (GNUNET_YES == end
396           || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
397               < op->msg->size + sizeof (struct GNUNET_MessageHeader))))
398   {
399     /* End of message or buffer is full, add it to transmission queue. */
400     op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
401     op->msg->size = htons (op->msg->size);
402     GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
403     ch->tmit_msg = op = NULL;
404     ch->tmit_ack_pending++;
405   }
406
407   if (GNUNET_YES == end)
408     ch->in_transmit = GNUNET_NO;
409
410   transmit_next (ch);
411 }
412
413
414 /**
415  * Request a modifier from a client to transmit.
416  *
417  * @param mst Master handle.
418  */
419 static void
420 channel_transmit_mod (struct GNUNET_PSYC_Channel *ch)
421 {
422   uint16_t max_data_size, data_size;
423   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
424   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
425   int notify_ret;
426
427   switch (ch->tmit.state)
428   {
429   case MSG_STATE_MODIFIER:
430   {
431     struct GNUNET_PSYC_MessageModifier *mod
432       = (struct GNUNET_PSYC_MessageModifier *) msg;
433     max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
434     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
435     msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
436     notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1],
437                                       &mod->oper, &mod->value_size);
438     mod->name_size = strnlen ((char *) &mod[1], data_size);
439     if (mod->name_size < data_size)
440     {
441       mod->value_size = htonl (mod->value_size);
442       mod->name_size = htons (mod->name_size);
443     }
444     else if (0 < data_size)
445     {
446       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
447       notify_ret = GNUNET_SYSERR;
448     }
449     break;
450   }
451   case MSG_STATE_MOD_CONT:
452   {
453     max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
454     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
455     msg->size = sizeof (struct GNUNET_MessageHeader);
456     notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
457                                       &data_size, &msg[1], NULL, NULL);
458     break;
459   }
460   default:
461     GNUNET_assert (0);
462   }
463
464   switch (notify_ret)
465   {
466   case GNUNET_NO:
467     if (0 == data_size)
468     { /* Transmission paused, nothing to send. */
469       ch->tmit_paused = GNUNET_YES;
470       return;
471     }
472     ch->tmit.state = MSG_STATE_MOD_CONT;
473     break;
474
475   case GNUNET_YES:
476     if (0 == data_size)
477     {
478       /* End of modifiers. */
479       ch->tmit.state = MSG_STATE_DATA;
480       if (0 == ch->tmit_ack_pending)
481         channel_transmit_data (ch);
482
483       return;
484     }
485     ch->tmit.state = MSG_STATE_MODIFIER;
486     break;
487
488   default:
489     LOG (GNUNET_ERROR_TYPE_ERROR,
490          "MasterTransmitNotifyModifier returned error "
491          "when requesting a modifier.\n");
492
493     ch->tmit.state = MSG_STATE_CANCEL;
494     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
495     msg->size = htons (sizeof (*msg));
496
497     queue_message (ch, msg, GNUNET_YES);
498     return;
499   }
500
501   if (0 < data_size)
502   {
503     GNUNET_assert (data_size <= max_data_size);
504     msg->size = htons (msg->size + data_size);
505     queue_message (ch, msg, GNUNET_NO);
506   }
507
508   channel_transmit_mod (ch);
509 }
510
511
512 /**
513  * Request data from a client to transmit.
514  *
515  * @param mst Master handle.
516  */
517 static void
518 channel_transmit_data (struct GNUNET_PSYC_Channel *ch)
519 {
520   uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
521   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
522   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
523
524   msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
525
526   int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls,
527                                          &data_size, &msg[1]);
528   switch (notify_ret)
529   {
530   case GNUNET_NO:
531     if (0 == data_size)
532     {
533       /* Transmission paused, nothing to send. */
534       ch->tmit_paused = GNUNET_YES;
535       return;
536     }
537     break;
538
539   case GNUNET_YES:
540     ch->tmit.state = MSG_STATE_END;
541     break;
542
543   default:
544     LOG (GNUNET_ERROR_TYPE_ERROR,
545          "MasterTransmitNotify returned error when requesting data.\n");
546
547     ch->tmit.state = MSG_STATE_CANCEL;
548     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
549     msg->size = htons (sizeof (*msg));
550     queue_message (ch, msg, GNUNET_YES);
551     return;
552   }
553
554   if (0 < data_size)
555   {
556     GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
557     msg->size = htons (sizeof (*msg) + data_size);
558     queue_message (ch, msg, !notify_ret);
559   }
560
561   /* End of message. */
562   if (GNUNET_YES == notify_ret)
563   {
564     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
565     msg->size = htons (sizeof (*msg));
566     queue_message (ch, msg, GNUNET_YES);
567   }
568 }
569
570
571 /**
572  * Send a message to a channel.
573  *
574  * @param ch Handle to the PSYC channel.
575  * @param method_name Which method should be invoked.
576  * @param notify_mod Function to call to obtain modifiers.
577  * @param notify_data Function to call to obtain fragments of the data.
578  * @param notify_cls Closure for @a notify_mod and @a notify_data.
579  * @param flags Flags for the message being transmitted.
580  * @return Transmission handle, NULL on error (i.e. more than one request queued).
581  */
582 static struct GNUNET_PSYC_ChannelTransmitHandle *
583 channel_transmit (struct GNUNET_PSYC_Channel *ch,
584                   const char *method_name,
585                   GNUNET_PSYC_TransmitNotifyModifier notify_mod,
586                   GNUNET_PSYC_TransmitNotifyData notify_data,
587                   void *notify_cls,
588                   uint32_t flags)
589 {
590   if (GNUNET_NO != ch->in_transmit)
591     return NULL;
592   ch->in_transmit = GNUNET_YES;
593
594   size_t size = strlen (method_name) + 1;
595   struct GNUNET_PSYC_MessageMethod *pmeth;
596   struct OperationHandle *op;
597
598   ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
599                                      + sizeof (*pmeth) + size);
600   op->msg = (struct GNUNET_MessageHeader *) &op[1];
601   op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
602
603   pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
604   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
605   pmeth->header.size = htons (sizeof (*pmeth) + size);
606   pmeth->flags = htonl (flags);
607   memcpy (&pmeth[1], method_name, size);
608
609   ch->tmit.ch = ch;
610   ch->tmit.notify_mod = notify_mod;
611   ch->tmit.notify_data = notify_data;
612   ch->tmit.notify_cls = notify_cls;
613   ch->tmit.state = MSG_STATE_MODIFIER;
614
615   channel_transmit_mod (ch);
616   return &ch->tmit;
617 }
618
619
620 /**
621  * Resume transmission to the channel.
622  *
623  * @param th Handle of the request that is being resumed.
624  */
625 static void
626 channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th)
627 {
628   struct GNUNET_PSYC_Channel *ch = th->ch;
629   if (0 == ch->tmit_ack_pending)
630   {
631     ch->tmit_paused = GNUNET_NO;
632     channel_transmit_data (ch);
633   }
634 }
635
636
637 /**
638  * Abort transmission request to channel.
639  *
640  * @param th Handle of the request that is being aborted.
641  */
642 static void
643 channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th)
644 {
645   struct GNUNET_PSYC_Channel *ch = th->ch;
646   if (GNUNET_NO == ch->in_transmit)
647     return;
648 }
649
650
651 /**
652  * Handle incoming message from the PSYC service.
653  *
654  * @param ch The channel the message is sent to.
655  * @param pmsg The message.
656  */
657 static void
658 handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
659                      const struct GNUNET_PSYC_MessageHeader *msg)
660 {
661   uint16_t size = ntohs (msg->header.size);
662   uint32_t flags = ntohl (msg->flags);
663
664   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
665                            (struct GNUNET_MessageHeader *) msg);
666
667   if (MSG_STATE_START == ch->recv_state)
668   {
669     ch->recv_message_id = GNUNET_ntohll (msg->message_id);
670     ch->recv_flags = flags;
671     ch->recv_slave_key = msg->slave_key;
672     ch->recv_mod_value_size = 0;
673     ch->recv_mod_value_size_expected = 0;
674   }
675   else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
676   {
677     // FIXME
678     LOG (GNUNET_ERROR_TYPE_WARNING,
679          "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
680          GNUNET_ntohll (msg->message_id), ch->recv_message_id);
681     GNUNET_break_op (0);
682     recv_error (ch);
683     return;
684   }
685   else if (flags != ch->recv_flags)
686   {
687     LOG (GNUNET_ERROR_TYPE_WARNING,
688          "Unexpected message flags. Got: %lu, expected: %lu\n",
689          flags, ch->recv_flags);
690     GNUNET_break_op (0);
691     recv_error (ch);
692     return;
693   }
694
695   uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
696
697   for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
698   {
699     const struct GNUNET_MessageHeader *pmsg
700       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
701     psize = ntohs (pmsg->size);
702     ptype = ntohs (pmsg->type);
703     size_eq = size_min = 0;
704
705     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
706     {
707       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
708                   "Dropping message of type %u with invalid size %u.\n",
709                   ptype, psize);
710       recv_error (ch);
711       return;
712     }
713
714     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
715                 "Received message part from PSYC.\n");
716     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
717
718     switch (ptype)
719     {
720     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
721       size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
722       break;
723     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
724       size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
725       break;
726     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
727     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
728       size_min = sizeof (struct GNUNET_MessageHeader);
729       break;
730     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
731     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
732       size_eq = sizeof (struct GNUNET_MessageHeader);
733       break;
734     default:
735       GNUNET_break_op (0);
736       recv_error (ch);
737       return;
738     }
739
740     if (! ((0 < size_eq && psize == size_eq)
741            || (0 < size_min && size_min <= psize)))
742     {
743       GNUNET_break_op (0);
744       recv_error (ch);
745       return;
746     }
747
748     switch (ptype)
749     {
750     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
751     {
752       struct GNUNET_PSYC_MessageMethod *meth
753         = (struct GNUNET_PSYC_MessageMethod *) pmsg;
754
755       if (MSG_STATE_START != ch->recv_state)
756       {
757         LOG (GNUNET_ERROR_TYPE_WARNING,
758              "Dropping out of order message method (%u).\n",
759              ch->recv_state);
760         /* It is normal to receive an incomplete message right after connecting,
761          * but should not happen later.
762          * FIXME: add a check for this condition.
763          */
764         GNUNET_break_op (0);
765         recv_error (ch);
766         return;
767       }
768
769       if ('\0' != *((char *) meth + psize - 1))
770       {
771         LOG (GNUNET_ERROR_TYPE_WARNING,
772              "Dropping message with malformed method. "
773              "Message ID: %" PRIu64 "\n", ch->recv_message_id);
774         GNUNET_break_op (0);
775         recv_error (ch);
776         return;
777       }
778       ch->recv_state = MSG_STATE_METHOD;
779       break;
780     }
781     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
782     {
783       if (!(MSG_STATE_METHOD == ch->recv_state
784             || MSG_STATE_MODIFIER == ch->recv_state
785             || MSG_STATE_MOD_CONT == ch->recv_state))
786       {
787         LOG (GNUNET_ERROR_TYPE_WARNING,
788              "Dropping out of order message modifier (%u).\n",
789              ch->recv_state);
790         GNUNET_break_op (0);
791         recv_error (ch);
792         return;
793       }
794
795       struct GNUNET_PSYC_MessageModifier *mod
796         = (struct GNUNET_PSYC_MessageModifier *) pmsg;
797
798       uint16_t name_size = ntohs (mod->name_size);
799       ch->recv_mod_value_size_expected = ntohl (mod->value_size);
800       ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1;
801
802       if (psize < sizeof (*mod) + name_size + 1
803           || '\0' != *((char *) &mod[1] + name_size)
804           || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
805       {
806         LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
807         GNUNET_break_op (0);
808         recv_error (ch);
809         return;
810       }
811       ch->recv_state = MSG_STATE_MODIFIER;
812       break;
813     }
814     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
815     {
816       ch->recv_mod_value_size += psize - sizeof (*pmsg);
817
818       if (!(MSG_STATE_MODIFIER == ch->recv_state
819             || MSG_STATE_MOD_CONT == ch->recv_state)
820           || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
821       {
822         LOG (GNUNET_ERROR_TYPE_WARNING,
823              "Dropping out of order message modifier continuation "
824              "!(%u == %u || %u == %u) || %lu < %lu.\n",
825              MSG_STATE_MODIFIER, ch->recv_state,
826              MSG_STATE_MOD_CONT, ch->recv_state,
827              ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
828         GNUNET_break_op (0);
829         recv_error (ch);
830         return;
831       }
832       break;
833     }
834     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
835     {
836       if (ch->recv_state < MSG_STATE_METHOD
837           || ch->recv_mod_value_size_expected != ch->recv_mod_value_size)
838       {
839         LOG (GNUNET_ERROR_TYPE_WARNING,
840              "Dropping out of order message data fragment "
841              "(%u < %u || %lu != %lu).\n",
842              ch->recv_state, MSG_STATE_METHOD,
843              ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
844
845         GNUNET_break_op (0);
846         recv_error (ch);
847         return;
848       }
849       ch->recv_state = MSG_STATE_DATA;
850       break;
851     }
852     }
853
854     GNUNET_PSYC_MessageCallback message_cb
855       = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
856       ? ch->hist_message_cb
857       : ch->message_cb;
858
859     if (NULL != message_cb)
860       message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg);
861
862     switch (ptype)
863     {
864     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
865     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
866       recv_reset (ch);
867       break;
868     }
869   }
870 }
871
872
873 /**
874  * Handle incoming message acknowledgement from the PSYC service.
875  *
876  * @param ch The channel the acknowledgement is sent to.
877  */
878 static void
879 handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch)
880 {
881   if (0 == ch->tmit_ack_pending)
882   {
883     LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
884     GNUNET_break (0);
885     return;
886   }
887   ch->tmit_ack_pending--;
888
889   switch (ch->tmit.state)
890   {
891   case MSG_STATE_MODIFIER:
892   case MSG_STATE_MOD_CONT:
893     if (GNUNET_NO == ch->tmit_paused)
894       channel_transmit_mod (ch);
895     break;
896
897   case MSG_STATE_DATA:
898     if (GNUNET_NO == ch->tmit_paused)
899       channel_transmit_data (ch);
900     break;
901
902   case MSG_STATE_END:
903   case MSG_STATE_CANCEL:
904     break;
905
906   default:
907     LOG (GNUNET_ERROR_TYPE_DEBUG,
908          "Ignoring message ACK in state %u.\n", ch->tmit.state);
909   }
910 }
911
912
913 /**
914  * Type of a function to call when we receive a message
915  * from the service.
916  *
917  * @param cls closure
918  * @param msg message received, NULL on timeout or fatal error
919  */
920 static void
921 message_handler (void *cls,
922                  const struct GNUNET_MessageHeader *msg)
923 {
924   // YUCK! => please have disjoint message handlers...
925   struct GNUNET_PSYC_Channel *ch = cls;
926   struct GNUNET_PSYC_Master *mst = cls;
927   struct GNUNET_PSYC_Slave *slv = cls;
928
929   if (NULL == msg)
930   {
931     // timeout / disconnected from server, reconnect
932     reschedule_connect (ch);
933     return;
934   }
935   uint16_t size_eq = 0;
936   uint16_t size_min = 0;
937   uint16_t size = ntohs (msg->size);
938   uint16_t type = ntohs (msg->type);
939
940   LOG (GNUNET_ERROR_TYPE_DEBUG,
941        "Received message of type %d and size %u from PSYC service\n",
942        type, size);
943
944   switch (type)
945   {
946   case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
947   case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
948     size_eq = sizeof (struct CountersResult);
949     break;
950   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
951     size_min = sizeof (struct GNUNET_PSYC_MessageHeader);
952     break;
953   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
954     size_eq = sizeof (struct GNUNET_MessageHeader);
955     break;
956   default:
957     GNUNET_break_op (0);
958     return;
959   }
960
961   if (! ((0 < size_eq && size == size_eq)
962          || (0 < size_min && size_min <= size)))
963   {
964     GNUNET_break_op (0);
965     return;
966   }
967
968   switch (type)
969   {
970   case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
971   {
972     struct CountersResult *cres = (struct CountersResult *) msg;
973     mst->max_message_id = GNUNET_ntohll (cres->max_message_id);
974     if (NULL != mst->start_cb)
975       mst->start_cb (ch->cb_cls, mst->max_message_id);
976     break;
977   }
978   case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
979   {
980     struct CountersResult *cres = (struct CountersResult *) msg;
981     slv->max_message_id = GNUNET_ntohll (cres->max_message_id);
982     if (NULL != slv->join_cb)
983       slv->join_cb (ch->cb_cls, slv->max_message_id);
984     break;
985   }
986   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
987   {
988     handle_psyc_message_ack (ch);
989     break;
990   }
991
992   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
993     handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
994     break;
995   }
996
997   if (NULL != ch->client)
998   {
999     GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
1000                            GNUNET_TIME_UNIT_FOREVER_REL);
1001   }
1002 }
1003
1004
1005 /**
1006  * Transmit next message to service.
1007  *
1008  * @param cls The 'struct GNUNET_PSYC_Channel'.
1009  * @param size Number of bytes available in buf.
1010  * @param buf Where to copy the message.
1011  * @return Number of bytes copied to buf.
1012  */
1013 static size_t
1014 send_next_message (void *cls, size_t size, void *buf)
1015 {
1016   struct GNUNET_PSYC_Channel *ch = cls;
1017   struct OperationHandle *op = ch->tmit_head;
1018   size_t ret;
1019   LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
1020   ch->th = NULL;
1021   if (NULL == op->msg)
1022     return 0;
1023   ret = ntohs (op->msg->size);
1024   if (ret > size)
1025   {
1026     reschedule_connect (ch);
1027     return 0;
1028   }
1029   memcpy (buf, op->msg, ret);
1030
1031   GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, op);
1032   GNUNET_free (op);
1033
1034   if (NULL != ch->tmit_head)
1035     transmit_next (ch);
1036
1037   if (GNUNET_NO == ch->in_receive)
1038   {
1039     ch->in_receive = GNUNET_YES;
1040     GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
1041                            GNUNET_TIME_UNIT_FOREVER_REL);
1042   }
1043   return ret;
1044 }
1045
1046
1047 /**
1048  * Schedule transmission of the next message from our queue.
1049  *
1050  * @param ch PSYC handle.
1051  */
1052 static void
1053 transmit_next (struct GNUNET_PSYC_Channel *ch)
1054 {
1055   LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n");
1056   if (NULL != ch->th || NULL == ch->client)
1057     return;
1058
1059   struct OperationHandle *op = ch->tmit_head;
1060   if (NULL == op)
1061     return;
1062
1063   ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client,
1064                                                 ntohs (op->msg->size),
1065                                                 GNUNET_TIME_UNIT_FOREVER_REL,
1066                                                 GNUNET_NO,
1067                                                 &send_next_message,
1068                                                 ch);
1069 }
1070
1071
1072 /**
1073  * Try again to connect to the PSYC service.
1074  *
1075  * @param cls Channel handle.
1076  * @param tc Scheduler context.
1077  */
1078 static void
1079 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1080 {
1081   struct GNUNET_PSYC_Channel *ch = cls;
1082
1083   recv_reset (ch);
1084   ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1085   LOG (GNUNET_ERROR_TYPE_DEBUG,
1086        "Connecting to PSYC service.\n");
1087   GNUNET_assert (NULL == ch->client);
1088   ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg);
1089   GNUNET_assert (NULL != ch->client);
1090
1091   if (NULL == ch->tmit_head ||
1092       ch->tmit_head->msg->type != ch->reconnect_msg->type)
1093   {
1094     uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
1095     struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size);
1096     memcpy (&op[1], ch->reconnect_msg, reconn_size);
1097     op->msg = (struct GNUNET_MessageHeader *) &op[1];
1098     GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, op);
1099   }
1100   transmit_next (ch);
1101 }
1102
1103
1104 /**
1105  * Disconnect from the PSYC service.
1106  *
1107  * @param c Channel handle to disconnect
1108  */
1109 static void
1110 disconnect (void *c)
1111 {
1112   struct GNUNET_PSYC_Channel *ch = c;
1113
1114   GNUNET_assert (NULL != ch);
1115   if (ch->tmit_head != ch->tmit_tail)
1116   {
1117     LOG (GNUNET_ERROR_TYPE_ERROR,
1118          "Disconnecting while there are still outstanding messages!\n");
1119     GNUNET_break (0);
1120   }
1121   if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1122   {
1123     GNUNET_SCHEDULER_cancel (ch->reconnect_task);
1124     ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1125   }
1126   if (NULL != ch->th)
1127   {
1128     GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th);
1129     ch->th = NULL;
1130   }
1131   if (NULL != ch->client)
1132   {
1133     GNUNET_CLIENT_disconnect (ch->client);
1134     ch->client = NULL;
1135   }
1136   if (NULL != ch->reconnect_msg)
1137   {
1138     GNUNET_free (ch->reconnect_msg);
1139     ch->reconnect_msg = NULL;
1140   }
1141 }
1142
1143
1144 /**
1145  * Start a PSYC master channel.
1146  *
1147  * Will start a multicast group identified by the given ECC key.  Messages
1148  * received from group members will be given to the respective handler methods.
1149  * If a new member wants to join a group, the "join" method handler will be
1150  * invoked; the join handler must then generate a "join" message to approve the
1151  * joining of the new member.  The channel can also change group membership
1152  * without explicit requests.  Note that PSYC doesn't itself "understand" join
1153  * or part messages, the respective methods must call other PSYC functions to
1154  * inform PSYC about the meaning of the respective events.
1155  *
1156  * @param cfg Configuration to use (to connect to PSYC service).
1157  * @param channel_key ECC key that will be used to sign messages for this
1158  *        PSYC session. The public key is used to identify the PSYC channel.
1159  *        Note that end-users will usually not use the private key directly, but
1160  *        rather look it up in GNS for places managed by other users, or select
1161  *        a file with the private key(s) when setting up their own channels
1162  *        FIXME: we'll likely want to use NOT the p521 curve here, but a cheaper
1163  *        one in the future.
1164  * @param policy Channel policy specifying join and history restrictions.
1165  *        Used to automate join decisions.
1166  * @param message_cb Function to invoke on message parts received from slaves.
1167  * @param join_cb Function to invoke when a peer wants to join.
1168  * @param master_started_cb Function to invoke after the channel master started.
1169  * @param cls Closure for @a master_started_cb and @a join_cb.
1170  * @return Handle for the channel master, NULL on error.
1171  */
1172 struct GNUNET_PSYC_Master *
1173 GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
1174                           const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key,
1175                           enum GNUNET_PSYC_Policy policy,
1176                           GNUNET_PSYC_MessageCallback message_cb,
1177                           GNUNET_PSYC_JoinCallback join_cb,
1178                           GNUNET_PSYC_MasterStartCallback master_started_cb,
1179                           void *cls)
1180 {
1181   struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst));
1182   struct GNUNET_PSYC_Channel *ch = &mst->ch;
1183   struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req));
1184
1185   req->header.size = htons (sizeof (*req));
1186   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START);
1187   req->channel_key = *channel_key;
1188   req->policy = policy;
1189
1190   ch->cfg = cfg;
1191   ch->is_master = GNUNET_YES;
1192   ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
1193   ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1194   ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst);
1195
1196   ch->message_cb = message_cb;
1197   ch->join_cb = join_cb;
1198   ch->cb_cls = cls;
1199   mst->start_cb = master_started_cb;
1200
1201   return mst;
1202 }
1203
1204
1205 /**
1206  * Stop a PSYC master channel.
1207  *
1208  * @param master PSYC channel master to stop.
1209  */
1210 void
1211 GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
1212 {
1213   disconnect (master);
1214   GNUNET_free (master);
1215 }
1216
1217
1218 /**
1219  * Function to call with the decision made for a join request.
1220  *
1221  * Must be called once and only once in response to an invocation of the
1222  * #GNUNET_PSYC_JoinCallback.
1223  *
1224  * @param jh Join request handle.
1225  * @param is_admitted #GNUNET_YES if joining is approved,
1226  *        #GNUNET_NO if it is disapproved.
1227  * @param relay_count Number of relays given.
1228  * @param relays Array of suggested peers that might be useful relays to use
1229  *        when joining the multicast group (essentially a list of peers that
1230  *        are already part of the multicast group and might thus be willing
1231  *        to help with routing).  If empty, only this local peer (which must
1232  *        be the multicast origin) is a good candidate for building the
1233  *        multicast tree.  Note that it is unnecessary to specify our own
1234  *        peer identity in this array.
1235  * @param method_name Method name for the message transmitted with the response.
1236  * @param env Environment containing transient variables for the message, or NULL.
1237  * @param data Data of the message.
1238  * @param data_size Size of @a data.
1239  */
1240 void
1241 GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
1242                            int is_admitted,
1243                            uint32_t relay_count,
1244                            const struct GNUNET_PeerIdentity *relays,
1245                            const char *method_name,
1246                            const struct GNUNET_ENV_Environment *env,
1247                            const void *data,
1248                            size_t data_size)
1249 {
1250
1251 }
1252
1253
1254 /**
1255  * Send a message to call a method to all members in the PSYC channel.
1256  *
1257  * @param master Handle to the PSYC channel.
1258  * @param method_name Which method should be invoked.
1259  * @param notify_mod Function to call to obtain modifiers.
1260  * @param notify_data Function to call to obtain fragments of the data.
1261  * @param notify_cls Closure for @a notify_mod and @a notify_data.
1262  * @param flags Flags for the message being transmitted.
1263  * @return Transmission handle, NULL on error (i.e. more than one request queued).
1264  */
1265 struct GNUNET_PSYC_MasterTransmitHandle *
1266 GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
1267                              const char *method_name,
1268                              GNUNET_PSYC_TransmitNotifyModifier notify_mod,
1269                              GNUNET_PSYC_TransmitNotifyData notify_data,
1270                              void *notify_cls,
1271                              enum GNUNET_PSYC_MasterTransmitFlags flags)
1272 {
1273   return (struct GNUNET_PSYC_MasterTransmitHandle *)
1274     channel_transmit (&master->ch, method_name, notify_mod, notify_data,
1275                       notify_cls, flags);
1276 }
1277
1278
1279 /**
1280  * Resume transmission to the channel.
1281  *
1282  * @param th Handle of the request that is being resumed.
1283  */
1284 void
1285 GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
1286 {
1287   channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1288 }
1289
1290
1291 /**
1292  * Abort transmission request to the channel.
1293  *
1294  * @param th Handle of the request that is being aborted.
1295  */
1296 void
1297 GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th)
1298 {
1299   channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1300 }
1301
1302
1303 /**
1304  * Join a PSYC channel.
1305  *
1306  * The entity joining is always the local peer.  The user must immediately use
1307  * the GNUNET_PSYC_slave_transmit() functions to transmit a @e join_msg to the
1308  * channel; if the join request succeeds, the channel state (and @e recent
1309  * method calls) will be replayed to the joining member.  There is no explicit
1310  * notification on failure (as the channel may simply take days to approve,
1311  * and disapproval is simply being ignored).
1312  *
1313  * @param cfg Configuration to use.
1314  * @param channel_key ECC public key that identifies the channel we wish to join.
1315  * @param slave_key ECC private-public key pair that identifies the slave, and
1316  *        used by multicast to sign the join request and subsequent unicast
1317  *        requests sent to the master.
1318  * @param origin Peer identity of the origin.
1319  * @param relay_count Number of peers in the @a relays array.
1320  * @param relays Peer identities of members of the multicast group, which serve
1321  *        as relays and used to join the group at.
1322  * @param message_cb Function to invoke on message parts received from the
1323  *        channel, typically at least contains method handlers for @e join and
1324  *        @e part.
1325  * @param join_cb function invoked once we have joined with the current
1326  *        message ID of the channel
1327  * @param slave_joined_cb Function to invoke when a peer wants to join.
1328  * @param cls Closure for @a message_cb and @a slave_joined_cb.
1329  * @param method_name Method name for the join request.
1330  * @param env Environment containing transient variables for the request, or NULL.
1331  * @param data Payload for the join message.
1332  * @param data_size Number of bytes in @a data.
1333  * @return Handle for the slave, NULL on error.
1334  */
1335 struct GNUNET_PSYC_Slave *
1336 GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1337                         const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1338                         const struct GNUNET_CRYPTO_EddsaPrivateKey *slave_key,
1339                         const struct GNUNET_PeerIdentity *origin,
1340                         uint32_t relay_count,
1341                         const struct GNUNET_PeerIdentity *relays,
1342                         GNUNET_PSYC_MessageCallback message_cb,
1343                         GNUNET_PSYC_JoinCallback join_cb,
1344                         GNUNET_PSYC_SlaveJoinCallback slave_joined_cb,
1345                         void *cls,
1346                         const char *method_name,
1347                         const struct GNUNET_ENV_Environment *env,
1348                         const void *data,
1349                         uint16_t data_size)
1350 {
1351   struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
1352   struct GNUNET_PSYC_Channel *ch = &slv->ch;
1353   struct SlaveJoinRequest *req
1354     = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays));
1355   req->header.size = htons (sizeof (*req)
1356                             + relay_count * sizeof (*relays));
1357   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
1358   req->channel_key = *channel_key;
1359   req->slave_key = *slave_key;
1360   req->origin = *origin;
1361   req->relay_count = htonl (relay_count);
1362   memcpy (&req[1], relays, relay_count * sizeof (*relays));
1363
1364   ch->message_cb = message_cb;
1365   ch->join_cb = join_cb;
1366   ch->cb_cls = cls;
1367
1368   ch->cfg = cfg;
1369   ch->is_master = GNUNET_NO;
1370   ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
1371   ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1372   ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
1373
1374   slv->join_cb = slave_joined_cb;
1375   return slv;
1376 }
1377
1378
1379 /**
1380  * Part a PSYC channel.
1381  *
1382  * Will terminate the connection to the PSYC service.  Polite clients should
1383  * first explicitly send a part request (via GNUNET_PSYC_slave_transmit()).
1384  *
1385  * @param slave Slave handle.
1386  */
1387 void
1388 GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
1389 {
1390   disconnect (slave);
1391   GNUNET_free (slave);
1392 }
1393
1394
1395 /**
1396  * Request a message to be sent to the channel master.
1397  *
1398  * @param slave Slave handle.
1399  * @param method_name Which (PSYC) method should be invoked (on host).
1400  * @param notify_mod Function to call to obtain modifiers.
1401  * @param notify_data Function to call to obtain fragments of the data.
1402  * @param notify_cls Closure for @a notify.
1403  * @param flags Flags for the message being transmitted.
1404  * @return Transmission handle, NULL on error (i.e. more than one request
1405  *         queued).
1406  */
1407 struct GNUNET_PSYC_SlaveTransmitHandle *
1408 GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
1409                             const char *method_name,
1410                             GNUNET_PSYC_TransmitNotifyModifier notify_mod,
1411                             GNUNET_PSYC_TransmitNotifyData notify_data,
1412                             void *notify_cls,
1413                             enum GNUNET_PSYC_SlaveTransmitFlags flags)
1414 {
1415   return (struct GNUNET_PSYC_SlaveTransmitHandle *)
1416     channel_transmit (&slave->ch, method_name,
1417                       notify_mod, notify_data, notify_cls, flags);
1418 }
1419
1420
1421 /**
1422  * Resume transmission to the master.
1423  *
1424  * @param th Handle of the request that is being resumed.
1425  */
1426 void
1427 GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th)
1428 {
1429   channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1430 }
1431
1432
1433 /**
1434  * Abort transmission request to master.
1435  *
1436  * @param th Handle of the request that is being aborted.
1437  */
1438 void
1439 GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
1440 {
1441   channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1442 }
1443
1444
1445 /**
1446  * Convert a channel @a master to a @e channel handle to access the @e channel
1447  * APIs.
1448  *
1449  * @param master Channel master handle.
1450  * @return Channel handle, valid for as long as @a master is valid.
1451  */
1452 struct GNUNET_PSYC_Channel *
1453 GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
1454 {
1455   return &master->ch;
1456 }
1457
1458
1459 /**
1460  * Convert @a slave to a @e channel handle to access the @e channel APIs.
1461  *
1462  * @param slave Slave handle.
1463  * @return Channel handle, valid for as long as @a slave is valid.
1464  */
1465 struct GNUNET_PSYC_Channel *
1466 GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave)
1467 {
1468   return &slave->ch;
1469 }
1470
1471
1472 /**
1473  * Add a slave to the channel's membership list.
1474  *
1475  * Note that this will NOT generate any PSYC traffic, it will merely update the
1476  * local database to modify how we react to <em>membership test</em> queries.
1477  * The channel master still needs to explicitly transmit a @e join message to
1478  * notify other channel members and they then also must still call this function
1479  * in their respective methods handling the @e join message.  This way, how @e
1480  * join and @e part operations are exactly implemented is still up to the
1481  * application; for example, there might be a @e part_all method to kick out
1482  * everyone.
1483  *
1484  * Note that channel slaves are explicitly trusted to execute such methods
1485  * correctly; not doing so correctly will result in either denying other slaves
1486  * access or offering access to channel data to non-members.
1487  *
1488  * @param channel Channel handle.
1489  * @param slave_key Identity of channel slave to add.
1490  * @param announced_at ID of the message that announced the membership change.
1491  * @param effective_since Addition of slave is in effect since this message ID.
1492  */
1493 void
1494 GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
1495                                const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
1496                                uint64_t announced_at,
1497                                uint64_t effective_since)
1498 {
1499   struct ChannelSlaveAdd *slvadd;
1500   struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*slvadd));
1501
1502   slvadd = (struct ChannelSlaveAdd *) &op[1];
1503   op->msg = (struct GNUNET_MessageHeader *) slvadd;
1504
1505   slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD);
1506   slvadd->header.size = htons (sizeof (*slvadd));
1507   slvadd->announced_at = GNUNET_htonll (announced_at);
1508   slvadd->effective_since = GNUNET_htonll (effective_since);
1509   GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
1510                                     channel->tmit_tail,
1511                                     op);
1512   transmit_next (channel);
1513 }
1514
1515
1516 /**
1517  * Remove a slave from the channel's membership list.
1518  *
1519  * Note that this will NOT generate any PSYC traffic, it will merely update the
1520  * local database to modify how we react to <em>membership test</em> queries.
1521  * The channel master still needs to explicitly transmit a @e part message to
1522  * notify other channel members and they then also must still call this function
1523  * in their respective methods handling the @e part message.  This way, how
1524  * @e join and @e part operations are exactly implemented is still up to the
1525  * application; for example, there might be a @e part_all message to kick out
1526  * everyone.
1527  *
1528  * Note that channel members are explicitly trusted to perform these
1529  * operations correctly; not doing so correctly will result in either
1530  * denying members access or offering access to channel data to
1531  * non-members.
1532  *
1533  * @param channel Channel handle.
1534  * @param slave_key Identity of channel slave to remove.
1535  * @param announced_at ID of the message that announced the membership change.
1536  */
1537 void
1538 GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
1539                                   const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
1540                                   uint64_t announced_at)
1541 {
1542   struct ChannelSlaveRemove *slvrm;
1543   struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*slvrm));
1544
1545   slvrm = (struct ChannelSlaveRemove *) &op[1];
1546   op->msg = (struct GNUNET_MessageHeader *) slvrm;
1547   slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM);
1548   slvrm->header.size = htons (sizeof (*slvrm));
1549   slvrm->announced_at = GNUNET_htonll (announced_at);
1550   GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
1551                                     channel->tmit_tail,
1552                                     op);
1553   transmit_next (channel);
1554 }
1555
1556
1557 /**
1558  * Request to be told the message history of the channel.
1559  *
1560  * Historic messages (but NOT the state at the time) will be replayed (given to
1561  * the normal method handlers) if available and if access is permitted.
1562  *
1563  * To get the latest message, use 0 for both the start and end message ID.
1564  *
1565  * @param channel Which channel should be replayed?
1566  * @param start_message_id Earliest interesting point in history.
1567  * @param end_message_id Last (exclusive) interesting point in history.
1568  * @param message_cb Function to invoke on message parts received from the story.
1569  * @param finish_cb Function to call when the requested story has been fully
1570  *        told (counting message IDs might not suffice, as some messages
1571  *        might be secret and thus the listener would not know the story is
1572  *        finished without being told explicitly) once this function
1573  *        has been called, the client must not call
1574  *        GNUNET_PSYC_channel_story_tell_cancel() anymore.
1575  * @param cls Closure for the callbacks.
1576  * @return Handle to cancel story telling operation.
1577  */
1578 struct GNUNET_PSYC_Story *
1579 GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel,
1580                                 uint64_t start_message_id,
1581                                 uint64_t end_message_id,
1582                                 GNUNET_PSYC_MessageCallback message_cb,
1583                                 GNUNET_PSYC_FinishCallback finish_cb,
1584                                 void *cls)
1585 {
1586   return NULL;
1587 }
1588
1589
1590 /**
1591  * Abort story telling.
1592  *
1593  * This function must not be called from within method handlers (as given to
1594  * GNUNET_PSYC_slave_join()) of the slave.
1595  *
1596  * @param story Story telling operation to stop.
1597  */
1598 void
1599 GNUNET_PSYC_channel_story_tell_cancel (struct GNUNET_PSYC_Story *story)
1600 {
1601
1602 }
1603
1604
1605 /**
1606  * Retrieve the best matching channel state variable.
1607  *
1608  * If the requested variable name is not present in the state, the nearest
1609  * less-specific name is matched; for example, requesting "_a_b" will match "_a"
1610  * if "_a_b" does not exist.
1611  *
1612  * @param channel Channel handle.
1613  * @param full_name Full name of the requested variable, the actual variable
1614  *        returned might have a shorter name..
1615  * @param cb Function called once when a matching state variable is found.
1616  *        Not called if there's no matching state variable.
1617  * @param cb_cls Closure for the callbacks.
1618  * @return Handle that can be used to cancel the query operation.
1619  */
1620 struct GNUNET_PSYC_StateQuery *
1621 GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel,
1622                                const char *full_name,
1623                                GNUNET_PSYC_StateCallback cb,
1624                                void *cb_cls)
1625 {
1626   return NULL;
1627 }
1628
1629
1630 /**
1631  * Return all channel state variables whose name matches a given prefix.
1632  *
1633  * A name matches if it starts with the given @a name_prefix, thus requesting
1634  * the empty prefix ("") will match all values; requesting "_a_b" will also
1635  * return values stored under "_a_b_c".
1636  *
1637  * The @a state_cb is invoked on all matching state variables asynchronously, as
1638  * the state is stored in and retrieved from the PSYCstore,
1639  *
1640  * @param channel Channel handle.
1641  * @param name_prefix Prefix of the state variable name to match.
1642  * @param cb Function to call with the matching state variables.
1643  * @param cb_cls Closure for the callbacks.
1644  * @return Handle that can be used to cancel the query operation.
1645  */
1646 struct GNUNET_PSYC_StateQuery *
1647 GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *channel,
1648                                       const char *name_prefix,
1649                                       GNUNET_PSYC_StateCallback cb,
1650                                       void *cb_cls)
1651 {
1652   return NULL;
1653 }
1654
1655
1656 /**
1657  * Cancel a state query operation.
1658  *
1659  * @param query Handle for the operation to cancel.
1660  */
1661 void
1662 GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateQuery *query)
1663 {
1664
1665 }
1666
1667
1668 /* end of psyc_api.c */