psyc, multicast: join decision
[oweals/gnunet.git] / src / psyc / psyc_api.c
1 /*
2  * This file is part of GNUnet
3  * (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psyc/psyc_api.c
23  * @brief PSYC service; high-level access to the PSYC protocol
24  *        note that clients of this API are NOT expected to
25  *        understand the PSYC message format, only the semantics!
26  *        Parsing (and serializing) the PSYC stream format is done
27  *        within the implementation of the libgnunetpsyc library,
28  *        and this API deliberately exposes as little as possible
29  *        of the actual data stream format to the application!
30  * @author Gabor X Toth
31  */
32
33 #include <inttypes.h>
34
35 #include "platform.h"
36 #include "gnunet_util_lib.h"
37 #include "gnunet_env_lib.h"
38 #include "gnunet_multicast_service.h"
39 #include "gnunet_psyc_service.h"
40 #include "psyc.h"
41
42 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
43
44 struct MessageQueue
45 {
46   struct MessageQueue *prev;
47   struct MessageQueue *next;
48   /* Followed by struct GNUNET_MessageHeader msg */
49 };
50
51
52 /**
53  * Handle for a pending PSYC transmission operation.
54  */
55 struct GNUNET_PSYC_ChannelTransmitHandle
56 {
57   struct GNUNET_PSYC_Channel *ch;
58   GNUNET_PSYC_TransmitNotifyModifier notify_mod;
59   GNUNET_PSYC_TransmitNotifyData notify_data;
60   void *notify_cls;
61   enum MessageState state;
62 };
63
64 /**
65  * Handle to access PSYC channel operations for both the master and slaves.
66  */
67 struct GNUNET_PSYC_Channel
68 {
69   /**
70    * Transmission handle;
71    */
72   struct GNUNET_PSYC_ChannelTransmitHandle tmit;
73
74   /**
75    * Configuration to use.
76    */
77   const struct GNUNET_CONFIGURATION_Handle *cfg;
78
79   /**
80    * Socket (if available).
81    */
82   struct GNUNET_CLIENT_Connection *client;
83
84   /**
85    * Currently pending transmission request, or NULL for none.
86    */
87   struct GNUNET_CLIENT_TransmitHandle *th;
88
89   /**
90    * Head of messages to transmit to the service.
91    */
92   struct MessageQueue *tmit_head;
93
94   /**
95    * Tail of operations to transmit to the service.
96    */
97   struct MessageQueue *tmit_tail;
98
99   /**
100    * Message currently being transmitted to the service.
101    */
102   struct MessageQueue *tmit_msg;
103
104   /**
105    * Message to send on reconnect.
106    */
107   struct GNUNET_MessageHeader *reconnect_msg;
108
109   /**
110    * Task doing exponential back-off trying to reconnect.
111    */
112   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
113
114   /**
115    * Time for next connect retry.
116    */
117   struct GNUNET_TIME_Relative reconnect_delay;
118
119   /**
120    * Message part callback.
121    */
122   GNUNET_PSYC_MessageCallback message_cb;
123
124   /**
125    * Message part callback for historic message.
126    */
127   GNUNET_PSYC_MessageCallback hist_message_cb;
128
129   /**
130    * Closure for @a message_cb.
131    */
132   void *cb_cls;
133
134   /**
135    * ID of the message being received from the PSYC service.
136    */
137   uint64_t recv_message_id;
138
139   /**
140    * Public key of the slave from which a message is being received.
141    */
142   struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key;
143
144   /**
145    * State of the currently being received message from the PSYC service.
146    */
147   enum MessageState recv_state;
148
149   /**
150    * Flags for the currently being received message from the PSYC service.
151    */
152   enum GNUNET_PSYC_MessageFlags recv_flags;
153
154   /**
155    * Expected value size for the modifier being received from the PSYC service.
156    */
157   uint32_t recv_mod_value_size_expected;
158
159   /**
160    * Actual value size for the modifier being received from the PSYC service.
161    */
162   uint32_t recv_mod_value_size;
163
164   /**
165    * Is transmission paused?
166    */
167   uint8_t tmit_paused;
168
169   /**
170    * Are we still waiting for a PSYC_TRANSMIT_ACK?
171    */
172   uint8_t tmit_ack_pending;
173
174   /**
175    * Are we polling for incoming messages right now?
176    */
177   uint8_t in_receive;
178
179   /**
180    * Are we currently transmitting a message?
181    */
182   uint8_t in_transmit;
183
184   /**
185    * Is this a master or slave channel?
186    */
187   uint8_t is_master;
188 };
189
190
191 /**
192  * Handle for the master of a PSYC channel.
193  */
194 struct GNUNET_PSYC_Master
195 {
196   struct GNUNET_PSYC_Channel ch;
197
198   GNUNET_PSYC_MasterStartCallback start_cb;
199
200   /**
201    * Join handler callback.
202    */
203   GNUNET_PSYC_JoinCallback join_cb;
204 };
205
206
207 /**
208  * Handle for a PSYC channel slave.
209  */
210 struct GNUNET_PSYC_Slave
211 {
212   struct GNUNET_PSYC_Channel ch;
213
214   GNUNET_PSYC_SlaveJoinCallback join_cb;
215 };
216
217
218 /**
219  * Handle that identifies a join request.
220  *
221  * Used to match calls to #GNUNET_PSYC_JoinCallback to the
222  * corresponding calls to GNUNET_PSYC_join_decision().
223  */
224 struct GNUNET_PSYC_JoinHandle
225 {
226   struct GNUNET_PSYC_Master *mst;
227   struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
228 };
229
230
231 /**
232  * Handle for a pending PSYC transmission operation.
233  */
234 struct GNUNET_PSYC_SlaveTransmitHandle
235 {
236
237 };
238
239
240 /**
241  * Handle to a story telling operation.
242  */
243 struct GNUNET_PSYC_Story
244 {
245
246 };
247
248
249 /**
250  * Handle for a state query operation.
251  */
252 struct GNUNET_PSYC_StateQuery
253 {
254
255 };
256
257
258 static void
259 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
260
261
262 static void
263 channel_transmit_data (struct GNUNET_PSYC_Channel *ch);
264
265
266 /**
267  * Reschedule a connect attempt to the service.
268  *
269  * @param ch  Channel to reconnect.
270  */
271 static void
272 reschedule_connect (struct GNUNET_PSYC_Channel *ch)
273 {
274   GNUNET_assert (ch->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
275
276   if (NULL != ch->th)
277   {
278     GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th);
279     ch->th = NULL;
280   }
281   if (NULL != ch->client)
282   {
283     GNUNET_CLIENT_disconnect (ch->client);
284     ch->client = NULL;
285   }
286   ch->in_receive = GNUNET_NO;
287   LOG (GNUNET_ERROR_TYPE_DEBUG,
288        "Scheduling task to reconnect to PSYC service in %s.\n",
289        GNUNET_STRINGS_relative_time_to_string (ch->reconnect_delay, GNUNET_YES));
290   ch->reconnect_task =
291       GNUNET_SCHEDULER_add_delayed (ch->reconnect_delay, &reconnect, ch);
292   ch->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ch->reconnect_delay);
293 }
294
295
296 /**
297  * Schedule transmission of the next message from our queue.
298  *
299  * @param ch PSYC channel handle
300  */
301 static void
302 transmit_next (struct GNUNET_PSYC_Channel *ch);
303
304
305 /**
306  * Reset stored data related to the last received message.
307  */
308 static void
309 recv_reset (struct GNUNET_PSYC_Channel *ch)
310 {
311   ch->recv_state = MSG_STATE_START;
312   ch->recv_flags = 0;
313   ch->recv_message_id = 0;
314   //FIXME: ch->recv_slave_key = { 0 };
315   ch->recv_mod_value_size = 0;
316   ch->recv_mod_value_size_expected = 0;
317 }
318
319
320 static void
321 recv_error (struct GNUNET_PSYC_Channel *ch)
322 {
323   GNUNET_PSYC_MessageCallback message_cb
324     = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
325     ? ch->hist_message_cb
326     : ch->message_cb;
327
328   if (NULL != message_cb)
329     message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL);
330
331   recv_reset (ch);
332 }
333
334
335 /**
336  * Queue a message part for transmission to the PSYC service.
337  *
338  * The message part is added to the current message buffer.
339  * When this buffer is full, it is added to the transmission queue.
340  *
341  * @param ch Channel struct for the client.
342  * @param msg Modifier message part, or NULL when there's no more modifiers.
343  * @param end End of message.
344  */
345 static void
346 queue_message (struct GNUNET_PSYC_Channel *ch,
347                const struct GNUNET_MessageHeader *msg,
348                uint8_t end)
349 {
350   uint16_t size = msg ? ntohs (msg->size) : 0;
351
352   LOG (GNUNET_ERROR_TYPE_DEBUG,
353        "Queueing message of type %u and size %u (end: %u)).\n",
354        ntohs (msg->type), size, end);
355
356   struct MessageQueue *mq = ch->tmit_msg;
357   struct GNUNET_MessageHeader *qmsg = NULL;
358   if (NULL != mq)
359   {
360     qmsg = (struct GNUNET_MessageHeader *) &mq[1];
361     if (NULL == msg
362         || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < qmsg->size + size)
363     {
364       /* End of message or buffer is full, add it to transmission queue
365        * and start with empty buffer */
366       qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
367       qmsg->size = htons (qmsg->size);
368       GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
369       ch->tmit_msg = mq = NULL;
370       ch->tmit_ack_pending++;
371     }
372     else
373     {
374       /* Message fits in current buffer, append */
375       ch->tmit_msg
376         = mq = GNUNET_realloc (mq, sizeof (*mq) + qmsg->size + size);
377       qmsg = (struct GNUNET_MessageHeader *) &mq[1];
378       memcpy ((char *) qmsg + qmsg->size, msg, size);
379       qmsg->size += size;
380     }
381   }
382
383   if (NULL == mq && NULL != msg)
384   {
385     /* Empty buffer, copy over message. */
386     ch->tmit_msg
387       = mq = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) + size);
388     qmsg = (struct GNUNET_MessageHeader *) &mq[1];
389     qmsg->size = sizeof (*qmsg) + size;
390     memcpy (&qmsg[1], msg, size);
391   }
392
393   if (NULL != mq
394       && (GNUNET_YES == end
395           || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
396               < qmsg->size + sizeof (struct GNUNET_MessageHeader))))
397   {
398     /* End of message or buffer is full, add it to transmission queue. */
399     qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
400     qmsg->size = htons (qmsg->size);
401     GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
402     ch->tmit_msg = mq = NULL;
403     ch->tmit_ack_pending++;
404   }
405
406   if (GNUNET_YES == end)
407     ch->in_transmit = GNUNET_NO;
408
409   transmit_next (ch);
410 }
411
412
413 /**
414  * Request a modifier from a client to transmit.
415  *
416  * @param mst Master handle.
417  */
418 static void
419 channel_transmit_mod (struct GNUNET_PSYC_Channel *ch)
420 {
421   uint16_t max_data_size, data_size;
422   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
423   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
424   int notify_ret;
425
426   switch (ch->tmit.state)
427   {
428   case MSG_STATE_MODIFIER:
429   {
430     struct GNUNET_PSYC_MessageModifier *mod
431       = (struct GNUNET_PSYC_MessageModifier *) msg;
432     max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
433     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
434     msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
435     notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1],
436                                       &mod->oper, &mod->value_size);
437     mod->name_size = strnlen ((char *) &mod[1], data_size);
438     if (mod->name_size < data_size)
439     {
440       mod->value_size = htonl (mod->value_size);
441       mod->name_size = htons (mod->name_size);
442     }
443     else if (0 < data_size)
444     {
445       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
446       notify_ret = GNUNET_SYSERR;
447     }
448     break;
449   }
450   case MSG_STATE_MOD_CONT:
451   {
452     max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
453     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
454     msg->size = sizeof (struct GNUNET_MessageHeader);
455     notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
456                                       &data_size, &msg[1], NULL, NULL);
457     break;
458   }
459   default:
460     GNUNET_assert (0);
461   }
462
463   switch (notify_ret)
464   {
465   case GNUNET_NO:
466     if (0 == data_size)
467     { /* Transmission paused, nothing to send. */
468       ch->tmit_paused = GNUNET_YES;
469       return;
470     }
471     ch->tmit.state = MSG_STATE_MOD_CONT;
472     break;
473
474   case GNUNET_YES:
475     if (0 == data_size)
476     {
477       /* End of modifiers. */
478       ch->tmit.state = MSG_STATE_DATA;
479       if (0 == ch->tmit_ack_pending)
480         channel_transmit_data (ch);
481
482       return;
483     }
484     ch->tmit.state = MSG_STATE_MODIFIER;
485     break;
486
487   default:
488     LOG (GNUNET_ERROR_TYPE_ERROR,
489          "MasterTransmitNotifyModifier returned error "
490          "when requesting a modifier.\n");
491
492     ch->tmit.state = MSG_STATE_CANCEL;
493     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
494     msg->size = htons (sizeof (*msg));
495
496     queue_message (ch, msg, GNUNET_YES);
497     return;
498   }
499
500   if (0 < data_size)
501   {
502     GNUNET_assert (data_size <= max_data_size);
503     msg->size = htons (msg->size + data_size);
504     queue_message (ch, msg, GNUNET_NO);
505   }
506
507   channel_transmit_mod (ch);
508 }
509
510
511 /**
512  * Request data from a client to transmit.
513  *
514  * @param mst Master handle.
515  */
516 static void
517 channel_transmit_data (struct GNUNET_PSYC_Channel *ch)
518 {
519   uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
520   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
521   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
522
523   msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
524
525   int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls,
526                                          &data_size, &msg[1]);
527   switch (notify_ret)
528   {
529   case GNUNET_NO:
530     if (0 == data_size)
531     {
532       /* Transmission paused, nothing to send. */
533       ch->tmit_paused = GNUNET_YES;
534       return;
535     }
536     break;
537
538   case GNUNET_YES:
539     ch->tmit.state = MSG_STATE_END;
540     break;
541
542   default:
543     LOG (GNUNET_ERROR_TYPE_ERROR,
544          "MasterTransmitNotify returned error when requesting data.\n");
545
546     ch->tmit.state = MSG_STATE_CANCEL;
547     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
548     msg->size = htons (sizeof (*msg));
549     queue_message (ch, msg, GNUNET_YES);
550     return;
551   }
552
553   if (0 < data_size)
554   {
555     GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
556     msg->size = htons (sizeof (*msg) + data_size);
557     queue_message (ch, msg, !notify_ret);
558   }
559
560   /* End of message. */
561   if (GNUNET_YES == notify_ret)
562   {
563     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
564     msg->size = htons (sizeof (*msg));
565     queue_message (ch, msg, GNUNET_YES);
566   }
567 }
568
569
570 /**
571  * Send a message to a channel.
572  *
573  * @param ch Handle to the PSYC channel.
574  * @param method_name Which method should be invoked.
575  * @param notify_mod Function to call to obtain modifiers.
576  * @param notify_data Function to call to obtain fragments of the data.
577  * @param notify_cls Closure for @a notify_mod and @a notify_data.
578  * @param flags Flags for the message being transmitted.
579  *
580  * @return Transmission handle, NULL on error (i.e. more than one request queued).
581  */
582 static struct GNUNET_PSYC_ChannelTransmitHandle *
583 channel_transmit (struct GNUNET_PSYC_Channel *ch,
584                   const char *method_name,
585                   GNUNET_PSYC_TransmitNotifyModifier notify_mod,
586                   GNUNET_PSYC_TransmitNotifyData notify_data,
587                   void *notify_cls,
588                   uint32_t flags)
589 {
590   if (GNUNET_NO != ch->in_transmit)
591     return NULL;
592   ch->in_transmit = GNUNET_YES;
593
594   size_t size = strlen (method_name) + 1;
595   struct GNUNET_PSYC_MessageMethod *pmeth;
596   struct GNUNET_MessageHeader *qmsg;
597   struct MessageQueue *
598     mq = ch->tmit_msg = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg)
599                                        + sizeof (*pmeth) + size);
600   qmsg = (struct GNUNET_MessageHeader *) &mq[1];
601   qmsg->size = sizeof (*qmsg) + sizeof (*pmeth) + size;
602
603   pmeth = (struct GNUNET_PSYC_MessageMethod *) &qmsg[1];
604   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
605   pmeth->header.size = htons (sizeof (*pmeth) + size);
606   pmeth->flags = htonl (flags);
607   memcpy (&pmeth[1], method_name, size);
608
609   ch->tmit.ch = ch;
610   ch->tmit.notify_mod = notify_mod;
611   ch->tmit.notify_data = notify_data;
612   ch->tmit.notify_cls = notify_cls;
613   ch->tmit.state = MSG_STATE_MODIFIER;
614
615   channel_transmit_mod (ch);
616   return &ch->tmit;
617 }
618
619
620 /**
621  * Resume transmission to the channel.
622  *
623  * @param th Handle of the request that is being resumed.
624  */
625 static void
626 channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th)
627 {
628   struct GNUNET_PSYC_Channel *ch = th->ch;
629   if (0 == ch->tmit_ack_pending)
630   {
631     ch->tmit_paused = GNUNET_NO;
632     channel_transmit_data (ch);
633   }
634 }
635
636
637 /**
638  * Abort transmission request to channel.
639  *
640  * @param th Handle of the request that is being aborted.
641  */
642 static void
643 channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th)
644 {
645   struct GNUNET_PSYC_Channel *ch = th->ch;
646   if (GNUNET_NO == ch->in_transmit)
647     return;
648 }
649
650
651 /**
652  * Handle incoming message from the PSYC service.
653  *
654  * @param ch The channel the message is sent to.
655  * @param pmsg The message.
656  */
657 static void
658 handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
659                      const struct GNUNET_PSYC_MessageHeader *msg)
660 {
661   uint16_t size = ntohs (msg->header.size);
662   uint32_t flags = ntohl (msg->flags);
663
664   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
665                            (struct GNUNET_MessageHeader *) msg);
666
667   if (MSG_STATE_START == ch->recv_state)
668   {
669     ch->recv_message_id = GNUNET_ntohll (msg->message_id);
670     ch->recv_flags = flags;
671     ch->recv_slave_key = msg->slave_key;
672     ch->recv_mod_value_size = 0;
673     ch->recv_mod_value_size_expected = 0;
674   }
675   else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
676   {
677     // FIXME
678     LOG (GNUNET_ERROR_TYPE_WARNING,
679          "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
680          GNUNET_ntohll (msg->message_id), ch->recv_message_id);
681     GNUNET_break_op (0);
682     recv_error (ch);
683     return;
684   }
685   else if (flags != ch->recv_flags)
686   {
687     LOG (GNUNET_ERROR_TYPE_WARNING,
688          "Unexpected message flags. Got: %lu, expected: %lu\n",
689          flags, ch->recv_flags);
690     GNUNET_break_op (0);
691     recv_error (ch);
692     return;
693   }
694
695   uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
696
697   for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
698   {
699     const struct GNUNET_MessageHeader *pmsg
700       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
701     psize = ntohs (pmsg->size);
702     ptype = ntohs (pmsg->type);
703     size_eq = size_min = 0;
704
705     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
706     {
707       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
708                   "Dropping message of type %u with invalid size %u.\n",
709                   ptype, psize);
710       recv_error (ch);
711       return;
712     }
713
714     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
715                 "Received message part from PSYC.\n");
716     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
717
718     switch (ptype)
719     {
720     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
721       size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
722       break;
723     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
724       size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
725       break;
726     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
727     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
728       size_min = sizeof (struct GNUNET_MessageHeader);
729       break;
730     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
731     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
732       size_eq = sizeof (struct GNUNET_MessageHeader);
733       break;
734     default:
735       GNUNET_break_op (0);
736       recv_error (ch);
737       return;
738     }
739
740     if (! ((0 < size_eq && psize == size_eq)
741            || (0 < size_min && size_min <= psize)))
742     {
743       GNUNET_break_op (0);
744       recv_error (ch);
745       return;
746     }
747
748     switch (ptype)
749     {
750     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
751     {
752       struct GNUNET_PSYC_MessageMethod *meth
753         = (struct GNUNET_PSYC_MessageMethod *) pmsg;
754
755       if (MSG_STATE_START != ch->recv_state)
756       {
757         LOG (GNUNET_ERROR_TYPE_WARNING,
758              "Dropping out of order message method (%u).\n",
759              ch->recv_state);
760         /* It is normal to receive an incomplete message right after connecting,
761          * but should not happen later.
762          * FIXME: add a check for this condition.
763          */
764         GNUNET_break_op (0);
765         recv_error (ch);
766         return;
767       }
768
769       if ('\0' != *((char *) meth + psize - 1))
770       {
771         LOG (GNUNET_ERROR_TYPE_WARNING,
772              "Dropping message with malformed method. "
773              "Message ID: %" PRIu64 "\n", ch->recv_message_id);
774         GNUNET_break_op (0);
775         recv_error (ch);
776         return;
777       }
778       ch->recv_state = MSG_STATE_METHOD;
779       break;
780     }
781     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
782     {
783       if (!(MSG_STATE_METHOD == ch->recv_state
784             || MSG_STATE_MODIFIER == ch->recv_state
785             || MSG_STATE_MOD_CONT == ch->recv_state))
786       {
787         LOG (GNUNET_ERROR_TYPE_WARNING,
788              "Dropping out of order message modifier (%u).\n",
789              ch->recv_state);
790         GNUNET_break_op (0);
791         recv_error (ch);
792         return;
793       }
794
795       struct GNUNET_PSYC_MessageModifier *mod
796         = (struct GNUNET_PSYC_MessageModifier *) pmsg;
797
798       uint16_t name_size = ntohs (mod->name_size);
799       ch->recv_mod_value_size_expected = ntohl (mod->value_size);
800       ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1;
801
802       if (psize < sizeof (*mod) + name_size + 1
803           || '\0' != *((char *) &mod[1] + name_size)
804           || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
805       {
806         LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
807         GNUNET_break_op (0);
808         recv_error (ch);
809         return;
810       }
811       ch->recv_state = MSG_STATE_MODIFIER;
812       break;
813     }
814     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
815     {
816       ch->recv_mod_value_size += psize - sizeof (*pmsg);
817
818       if (!(MSG_STATE_MODIFIER == ch->recv_state
819             || MSG_STATE_MOD_CONT == ch->recv_state)
820           || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
821       {
822         LOG (GNUNET_ERROR_TYPE_WARNING,
823              "Dropping out of order message modifier continuation "
824              "!(%u == %u || %u == %u) || %lu < %lu.\n",
825              MSG_STATE_MODIFIER, ch->recv_state,
826              MSG_STATE_MOD_CONT, ch->recv_state,
827              ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
828         GNUNET_break_op (0);
829         recv_error (ch);
830         return;
831       }
832       break;
833     }
834     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
835     {
836       if (ch->recv_state < MSG_STATE_METHOD
837           || ch->recv_mod_value_size_expected != ch->recv_mod_value_size)
838       {
839         LOG (GNUNET_ERROR_TYPE_WARNING,
840              "Dropping out of order message data fragment "
841              "(%u < %u || %lu != %lu).\n",
842              ch->recv_state, MSG_STATE_METHOD,
843              ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
844
845         GNUNET_break_op (0);
846         recv_error (ch);
847         return;
848       }
849       ch->recv_state = MSG_STATE_DATA;
850       break;
851     }
852     }
853
854     GNUNET_PSYC_MessageCallback message_cb
855       = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
856       ? ch->hist_message_cb
857       : ch->message_cb;
858
859     if (NULL != message_cb)
860       message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg);
861
862     switch (ptype)
863     {
864     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
865     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
866       recv_reset (ch);
867       break;
868     }
869   }
870 }
871
872
873 /**
874  * Handle incoming message acknowledgement from the PSYC service.
875  *
876  * @param ch The channel the acknowledgement is sent to.
877  */
878 static void
879 handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch)
880 {
881   if (0 == ch->tmit_ack_pending)
882   {
883     LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
884     GNUNET_break (0);
885     return;
886   }
887   ch->tmit_ack_pending--;
888
889   switch (ch->tmit.state)
890   {
891   case MSG_STATE_MODIFIER:
892   case MSG_STATE_MOD_CONT:
893     if (GNUNET_NO == ch->tmit_paused)
894       channel_transmit_mod (ch);
895     break;
896
897   case MSG_STATE_DATA:
898     if (GNUNET_NO == ch->tmit_paused)
899       channel_transmit_data (ch);
900     break;
901
902   case MSG_STATE_END:
903   case MSG_STATE_CANCEL:
904     break;
905
906   default:
907     LOG (GNUNET_ERROR_TYPE_DEBUG,
908          "Ignoring message ACK in state %u.\n", ch->tmit.state);
909   }
910 }
911
912
913 static void
914 handle_psyc_join_request (struct GNUNET_PSYC_Master *mst,
915                           const struct MasterJoinRequest *req)
916 {
917   struct GNUNET_PSYC_MessageHeader *msg = NULL;
918   if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*msg))
919     msg = (struct GNUNET_PSYC_MessageHeader *) &req[1];
920
921   struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
922   jh->mst = mst;
923   jh->slave_key = req->slave_key;
924
925   mst->join_cb (mst->ch.cb_cls, &req->slave_key, msg, jh);
926 }
927
928
929 /**
930  * Type of a function to call when we receive a message
931  * from the service.
932  *
933  * @param cls closure
934  * @param msg message received, NULL on timeout or fatal error
935  */
936 static void
937 message_handler (void *cls,
938                  const struct GNUNET_MessageHeader *msg)
939 {
940   struct GNUNET_PSYC_Channel *ch = cls;
941   struct GNUNET_PSYC_Master *mst = cls;
942   struct GNUNET_PSYC_Slave *slv = cls;
943
944   if (NULL == msg)
945   {
946     // timeout / disconnected from service, reconnect
947     reschedule_connect (ch);
948     return;
949   }
950   uint16_t size_eq = 0;
951   uint16_t size_min = 0;
952   uint16_t size = ntohs (msg->size);
953   uint16_t type = ntohs (msg->type);
954
955   LOG (GNUNET_ERROR_TYPE_DEBUG,
956        "Received message of type %d and size %u from PSYC service\n",
957        type, size);
958
959   switch (type)
960   {
961   case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
962   case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
963     size_eq = sizeof (struct CountersResult);
964     break;
965   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
966     size_min = sizeof (struct GNUNET_PSYC_MessageHeader);
967     break;
968   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
969     size_eq = sizeof (struct GNUNET_MessageHeader);
970     break;
971   case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
972     size_min = sizeof (struct MasterJoinRequest);
973     break;
974   default:
975     GNUNET_break_op (0);
976     return;
977   }
978
979   if (! ((0 < size_eq && size == size_eq)
980          || (0 < size_min && size_min <= size)))
981   {
982     GNUNET_break_op (0);
983     return;
984   }
985
986   switch (type)
987   {
988   case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
989   {
990     struct CountersResult *cres = (struct CountersResult *) msg;
991     if (NULL != mst->start_cb)
992       mst->start_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
993     break;
994   }
995   case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
996   {
997     struct CountersResult *cres = (struct CountersResult *) msg;
998     if (NULL != slv->join_cb)
999       slv->join_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
1000     break;
1001   }
1002   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
1003   {
1004     handle_psyc_message_ack (ch);
1005     break;
1006   }
1007
1008   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
1009     handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
1010     break;
1011
1012   case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
1013     handle_psyc_join_request ((struct GNUNET_PSYC_Master *) ch,
1014                               (const struct MasterJoinRequest *) msg);
1015     break;
1016   }
1017
1018   if (NULL != ch->client)
1019   {
1020     GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
1021                            GNUNET_TIME_UNIT_FOREVER_REL);
1022   }
1023 }
1024
1025
1026 /**
1027  * Transmit next message to service.
1028  *
1029  * @param cls   The struct GNUNET_PSYC_Channel.
1030  * @param size  Number of bytes available in @a buf.
1031  * @param buf   Where to copy the message.
1032  *
1033  * @return Number of bytes copied to @a buf.
1034  */
1035 static size_t
1036 send_next_message (void *cls, size_t size, void *buf)
1037 {
1038   LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
1039   struct GNUNET_PSYC_Channel *ch = cls;
1040   struct MessageQueue *mq = ch->tmit_head;
1041   if (NULL == mq)
1042     return 0;
1043   struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
1044   size_t ret = ntohs (qmsg->size);
1045   ch->th = NULL;
1046   if (ret > size)
1047   {
1048     reschedule_connect (ch);
1049     return 0;
1050   }
1051   memcpy (buf, qmsg, ret);
1052
1053   GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, mq);
1054   GNUNET_free (mq);
1055
1056   if (NULL != ch->tmit_head)
1057     transmit_next (ch);
1058
1059   if (GNUNET_NO == ch->in_receive)
1060   {
1061     ch->in_receive = GNUNET_YES;
1062     GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
1063                            GNUNET_TIME_UNIT_FOREVER_REL);
1064   }
1065   return ret;
1066 }
1067
1068
1069 /**
1070  * Schedule transmission of the next message from our queue.
1071  *
1072  * @param ch PSYC handle.
1073  */
1074 static void
1075 transmit_next (struct GNUNET_PSYC_Channel *ch)
1076 {
1077   LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n");
1078   if (NULL != ch->th || NULL == ch->client)
1079     return;
1080
1081   struct MessageQueue *mq = ch->tmit_head;
1082   if (NULL == mq)
1083     return;
1084   struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
1085
1086   ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client,
1087                                                 ntohs (qmsg->size),
1088                                                 GNUNET_TIME_UNIT_FOREVER_REL,
1089                                                 GNUNET_NO,
1090                                                 &send_next_message,
1091                                                 ch);
1092 }
1093
1094
1095 /**
1096  * Try again to connect to the PSYC service.
1097  *
1098  * @param cls Channel handle.
1099  * @param tc Scheduler context.
1100  */
1101 static void
1102 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1103 {
1104   struct GNUNET_PSYC_Channel *ch = cls;
1105
1106   recv_reset (ch);
1107   ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1108   LOG (GNUNET_ERROR_TYPE_DEBUG,
1109        "Connecting to PSYC service.\n");
1110   GNUNET_assert (NULL == ch->client);
1111   ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg);
1112   GNUNET_assert (NULL != ch->client);
1113   uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
1114
1115   if (NULL == ch->tmit_head ||
1116       0 != memcmp (&ch->tmit_head[1], ch->reconnect_msg, reconn_size))
1117   {
1118     struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size);
1119     memcpy (&mq[1], ch->reconnect_msg, reconn_size);
1120     GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, mq);
1121   }
1122   transmit_next (ch);
1123 }
1124
1125
1126 /**
1127  * Disconnect from the PSYC service.
1128  *
1129  * @param c  Channel handle to disconnect.
1130  */
1131 static void
1132 disconnect (void *c)
1133 {
1134   struct GNUNET_PSYC_Channel *ch = c;
1135
1136   GNUNET_assert (NULL != ch);
1137   if (ch->tmit_head != ch->tmit_tail)
1138   {
1139     LOG (GNUNET_ERROR_TYPE_ERROR,
1140          "Disconnecting while there are still outstanding messages!\n");
1141     GNUNET_break (0);
1142   }
1143   if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1144   {
1145     GNUNET_SCHEDULER_cancel (ch->reconnect_task);
1146     ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1147   }
1148   if (NULL != ch->th)
1149   {
1150     GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th);
1151     ch->th = NULL;
1152   }
1153   if (NULL != ch->client)
1154   {
1155     GNUNET_CLIENT_disconnect (ch->client);
1156     ch->client = NULL;
1157   }
1158   if (NULL != ch->reconnect_msg)
1159   {
1160     GNUNET_free (ch->reconnect_msg);
1161     ch->reconnect_msg = NULL;
1162   }
1163 }
1164
1165
1166 /**
1167  * Start a PSYC master channel.
1168  *
1169  * Will start a multicast group identified by the given ECC key.  Messages
1170  * received from group members will be given to the respective handler methods.
1171  * If a new member wants to join a group, the "join" method handler will be
1172  * invoked; the join handler must then generate a "join" message to approve the
1173  * joining of the new member.  The channel can also change group membership
1174  * without explicit requests.  Note that PSYC doesn't itself "understand" join
1175  * or part messages, the respective methods must call other PSYC functions to
1176  * inform PSYC about the meaning of the respective events.
1177  *
1178  * @param cfg Configuration to use (to connect to PSYC service).
1179  * @param channel_key ECC key that will be used to sign messages for this
1180  *        PSYC session. The public key is used to identify the PSYC channel.
1181  *        Note that end-users will usually not use the private key directly, but
1182  *        rather look it up in GNS for places managed by other users, or select
1183  *        a file with the private key(s) when setting up their own channels
1184  *        FIXME: we'll likely want to use NOT the p521 curve here, but a cheaper
1185  *        one in the future.
1186  * @param policy Channel policy specifying join and history restrictions.
1187  *        Used to automate join decisions.
1188  * @param message_cb Function to invoke on message parts received from slaves.
1189  * @param join_cb Function to invoke when a peer wants to join.
1190  * @param master_started_cb Function to invoke after the channel master started.
1191  * @param cls Closure for @a master_started_cb and @a join_cb.
1192  *
1193  * @return Handle for the channel master, NULL on error.
1194  */
1195 struct GNUNET_PSYC_Master *
1196 GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
1197                           const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key,
1198                           enum GNUNET_PSYC_Policy policy,
1199                           GNUNET_PSYC_MessageCallback message_cb,
1200                           GNUNET_PSYC_JoinCallback join_cb,
1201                           GNUNET_PSYC_MasterStartCallback master_started_cb,
1202                           void *cls)
1203 {
1204   struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst));
1205   struct GNUNET_PSYC_Channel *ch = &mst->ch;
1206   struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req));
1207
1208   req->header.size = htons (sizeof (*req));
1209   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START);
1210   req->channel_key = *channel_key;
1211   req->policy = policy;
1212
1213   mst->start_cb = master_started_cb;
1214   mst->join_cb = join_cb;
1215   ch->message_cb = message_cb;
1216   ch->cb_cls = cls;
1217   ch->cfg = cfg;
1218   ch->is_master = GNUNET_YES;
1219   ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
1220   ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1221   ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst);
1222
1223   return mst;
1224 }
1225
1226
1227 /**
1228  * Stop a PSYC master channel.
1229  *
1230  * @param master PSYC channel master to stop.
1231  */
1232 void
1233 GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
1234 {
1235   disconnect (master);
1236   GNUNET_free (master);
1237 }
1238
1239
1240 /**
1241  * Function to call with the decision made for a join request.
1242  *
1243  * Must be called once and only once in response to an invocation of the
1244  * #GNUNET_PSYC_JoinCallback.
1245  *
1246  * @param jh Join request handle.
1247  * @param is_admitted #GNUNET_YES if joining is approved,
1248  *        #GNUNET_NO if it is disapproved.
1249  * @param relay_count Number of relays given.
1250  * @param relays Array of suggested peers that might be useful relays to use
1251  *        when joining the multicast group (essentially a list of peers that
1252  *        are already part of the multicast group and might thus be willing
1253  *        to help with routing).  If empty, only this local peer (which must
1254  *        be the multicast origin) is a good candidate for building the
1255  *        multicast tree.  Note that it is unnecessary to specify our own
1256  *        peer identity in this array.
1257  * @param method_name Method name for the message transmitted with the response.
1258  * @param env Environment containing transient variables for the message, or NULL.
1259  * @param data Data of the message.
1260  * @param data_size Size of @a data.
1261  */
1262 void
1263 GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
1264                            int is_admitted,
1265                            uint32_t relay_count,
1266                            const struct GNUNET_PeerIdentity *relays,
1267                            const char *method_name,
1268                            const struct GNUNET_ENV_Environment *env,
1269                            const void *data,
1270                            size_t data_size)
1271 {
1272   struct GNUNET_PSYC_Channel *ch = &jh->mst->ch;
1273
1274   struct MasterJoinDecision *dcsn;
1275   struct GNUNET_PSYC_MessageHeader *pmsg;
1276   uint16_t pmsg_size = 0;
1277 /* FIXME:
1278   sizeof (*pmsg)
1279     + sizeof (struct GNUNET_PSYC_MessageMethod)
1280     + vars_size
1281     + sizeof (struct GNUNET_MessageHeader) + data_size
1282     + sizeof (struct GNUNET_MessageHeader);
1283 */
1284   uint16_t relay_size = relay_count * sizeof (*relays);
1285   struct MessageQueue *
1286     mq = GNUNET_malloc (sizeof (*mq) + sizeof (*dcsn) + relay_size + pmsg_size);
1287   dcsn = (struct MasterJoinDecision *) &mq[1];
1288   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
1289   dcsn->header.size = htons (sizeof (*mq) + sizeof (*dcsn)
1290                              + relay_size + pmsg_size);
1291   dcsn->is_admitted = (GNUNET_YES == is_admitted) ? GNUNET_YES : GNUNET_NO;
1292   dcsn->slave_key = jh->slave_key;
1293
1294   /* FIXME: add message parts to pmsg */
1295   memcpy (&dcsn[1], pmsg, pmsg_size);
1296
1297   GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
1298   transmit_next (ch);
1299 }
1300
1301
1302 /**
1303  * Send a message to call a method to all members in the PSYC channel.
1304  *
1305  * @param master Handle to the PSYC channel.
1306  * @param method_name Which method should be invoked.
1307  * @param notify_mod Function to call to obtain modifiers.
1308  * @param notify_data Function to call to obtain fragments of the data.
1309  * @param notify_cls Closure for @a notify_mod and @a notify_data.
1310  * @param flags Flags for the message being transmitted.
1311  *
1312  * @return Transmission handle, NULL on error (i.e. more than one request queued).
1313  */
1314 struct GNUNET_PSYC_MasterTransmitHandle *
1315 GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
1316                              const char *method_name,
1317                              GNUNET_PSYC_TransmitNotifyModifier notify_mod,
1318                              GNUNET_PSYC_TransmitNotifyData notify_data,
1319                              void *notify_cls,
1320                              enum GNUNET_PSYC_MasterTransmitFlags flags)
1321 {
1322   return (struct GNUNET_PSYC_MasterTransmitHandle *)
1323     channel_transmit (&master->ch, method_name, notify_mod, notify_data,
1324                       notify_cls, flags);
1325 }
1326
1327
1328 /**
1329  * Resume transmission to the channel.
1330  *
1331  * @param th Handle of the request that is being resumed.
1332  */
1333 void
1334 GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
1335 {
1336   channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1337 }
1338
1339
1340 /**
1341  * Abort transmission request to the channel.
1342  *
1343  * @param th Handle of the request that is being aborted.
1344  */
1345 void
1346 GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th)
1347 {
1348   channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1349 }
1350
1351
1352 /**
1353  * Join a PSYC channel.
1354  *
1355  * The entity joining is always the local peer.  The user must immediately use
1356  * the GNUNET_PSYC_slave_transmit() functions to transmit a @e join_msg to the
1357  * channel; if the join request succeeds, the channel state (and @e recent
1358  * method calls) will be replayed to the joining member.  There is no explicit
1359  * notification on failure (as the channel may simply take days to approve,
1360  * and disapproval is simply being ignored).
1361  *
1362  * @param cfg Configuration to use.
1363  * @param channel_key ECC public key that identifies the channel we wish to join.
1364  * @param slave_key ECC private-public key pair that identifies the slave, and
1365  *        used by multicast to sign the join request and subsequent unicast
1366  *        requests sent to the master.
1367  * @param origin Peer identity of the origin.
1368  * @param relay_count Number of peers in the @a relays array.
1369  * @param relays Peer identities of members of the multicast group, which serve
1370  *        as relays and used to join the group at.
1371  * @param message_cb Function to invoke on message parts received from the
1372  *        channel, typically at least contains method handlers for @e join and
1373  *        @e part.
1374  * @param slave_joined_cb Function invoked once we have joined the channel.
1375  * @param cls Closure for @a message_cb and @a slave_joined_cb.
1376  * @param method_name Method name for the join request.
1377  * @param env Environment containing transient variables for the request, or NULL.
1378  * @param data Payload for the join message.
1379  * @param data_size Number of bytes in @a data.
1380  *
1381  * @return Handle for the slave, NULL on error.
1382  */
1383 struct GNUNET_PSYC_Slave *
1384 GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1385                         const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1386                         const struct GNUNET_CRYPTO_EddsaPrivateKey *slave_key,
1387                         const struct GNUNET_PeerIdentity *origin,
1388                         uint32_t relay_count,
1389                         const struct GNUNET_PeerIdentity *relays,
1390                         GNUNET_PSYC_MessageCallback message_cb,
1391                         GNUNET_PSYC_SlaveJoinCallback slave_joined_cb,
1392                         void *cls,
1393                         const char *method_name,
1394                         const struct GNUNET_ENV_Environment *env,
1395                         const void *data,
1396                         uint16_t data_size)
1397 {
1398   struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
1399   struct GNUNET_PSYC_Channel *ch = &slv->ch;
1400   struct SlaveJoinRequest *req
1401     = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays));
1402   req->header.size = htons (sizeof (*req)
1403                             + relay_count * sizeof (*relays));
1404   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
1405   req->channel_key = *channel_key;
1406   req->slave_key = *slave_key;
1407   req->origin = *origin;
1408   req->relay_count = htonl (relay_count);
1409   memcpy (&req[1], relays, relay_count * sizeof (*relays));
1410
1411   slv->join_cb = slave_joined_cb;
1412   ch->message_cb = message_cb;
1413   ch->cb_cls = cls;
1414
1415   ch->cfg = cfg;
1416   ch->is_master = GNUNET_NO;
1417   ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
1418   ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1419   ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
1420
1421   return slv;
1422 }
1423
1424
1425 /**
1426  * Part a PSYC channel.
1427  *
1428  * Will terminate the connection to the PSYC service.  Polite clients should
1429  * first explicitly send a part request (via GNUNET_PSYC_slave_transmit()).
1430  *
1431  * @param slave Slave handle.
1432  */
1433 void
1434 GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
1435 {
1436   disconnect (slave);
1437   GNUNET_free (slave);
1438 }
1439
1440
1441 /**
1442  * Request a message to be sent to the channel master.
1443  *
1444  * @param slave Slave handle.
1445  * @param method_name Which (PSYC) method should be invoked (on host).
1446  * @param notify_mod Function to call to obtain modifiers.
1447  * @param notify_data Function to call to obtain fragments of the data.
1448  * @param notify_cls Closure for @a notify.
1449  * @param flags Flags for the message being transmitted.
1450  *
1451  * @return Transmission handle, NULL on error (i.e. more than one request
1452  *         queued).
1453  */
1454 struct GNUNET_PSYC_SlaveTransmitHandle *
1455 GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
1456                             const char *method_name,
1457                             GNUNET_PSYC_TransmitNotifyModifier notify_mod,
1458                             GNUNET_PSYC_TransmitNotifyData notify_data,
1459                             void *notify_cls,
1460                             enum GNUNET_PSYC_SlaveTransmitFlags flags)
1461 {
1462   return (struct GNUNET_PSYC_SlaveTransmitHandle *)
1463     channel_transmit (&slave->ch, method_name,
1464                       notify_mod, notify_data, notify_cls, flags);
1465 }
1466
1467
1468 /**
1469  * Resume transmission to the master.
1470  *
1471  * @param th Handle of the request that is being resumed.
1472  */
1473 void
1474 GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th)
1475 {
1476   channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1477 }
1478
1479
1480 /**
1481  * Abort transmission request to master.
1482  *
1483  * @param th Handle of the request that is being aborted.
1484  */
1485 void
1486 GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
1487 {
1488   channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1489 }
1490
1491
1492 /**
1493  * Convert a channel @a master to a @e channel handle to access the @e channel
1494  * APIs.
1495  *
1496  * @param master Channel master handle.
1497  *
1498  * @return Channel handle, valid for as long as @a master is valid.
1499  */
1500 struct GNUNET_PSYC_Channel *
1501 GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
1502 {
1503   return &master->ch;
1504 }
1505
1506
1507 /**
1508  * Convert @a slave to a @e channel handle to access the @e channel APIs.
1509  *
1510  * @param slave Slave handle.
1511  *
1512  * @return Channel handle, valid for as long as @a slave is valid.
1513  */
1514 struct GNUNET_PSYC_Channel *
1515 GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave)
1516 {
1517   return &slave->ch;
1518 }
1519
1520
1521 /**
1522  * Add a slave to the channel's membership list.
1523  *
1524  * Note that this will NOT generate any PSYC traffic, it will merely update the
1525  * local database to modify how we react to <em>membership test</em> queries.
1526  * The channel master still needs to explicitly transmit a @e join message to
1527  * notify other channel members and they then also must still call this function
1528  * in their respective methods handling the @e join message.  This way, how @e
1529  * join and @e part operations are exactly implemented is still up to the
1530  * application; for example, there might be a @e part_all method to kick out
1531  * everyone.
1532  *
1533  * Note that channel slaves are explicitly trusted to execute such methods
1534  * correctly; not doing so correctly will result in either denying other slaves
1535  * access or offering access to channel data to non-members.
1536  *
1537  * @param channel Channel handle.
1538  * @param slave_key Identity of channel slave to add.
1539  * @param announced_at ID of the message that announced the membership change.
1540  * @param effective_since Addition of slave is in effect since this message ID.
1541  */
1542 void
1543 GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
1544                                const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
1545                                uint64_t announced_at,
1546                                uint64_t effective_since)
1547 {
1548   struct ChannelSlaveAdd *slvadd;
1549   struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvadd));
1550
1551   slvadd = (struct ChannelSlaveAdd *) &mq[1];
1552   slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD);
1553   slvadd->header.size = htons (sizeof (*slvadd));
1554   slvadd->announced_at = GNUNET_htonll (announced_at);
1555   slvadd->effective_since = GNUNET_htonll (effective_since);
1556   GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
1557                                     channel->tmit_tail,
1558                                     mq);
1559   transmit_next (channel);
1560 }
1561
1562
1563 /**
1564  * Remove a slave from the channel's membership list.
1565  *
1566  * Note that this will NOT generate any PSYC traffic, it will merely update the
1567  * local database to modify how we react to <em>membership test</em> queries.
1568  * The channel master still needs to explicitly transmit a @e part message to
1569  * notify other channel members and they then also must still call this function
1570  * in their respective methods handling the @e part message.  This way, how
1571  * @e join and @e part operations are exactly implemented is still up to the
1572  * application; for example, there might be a @e part_all message to kick out
1573  * everyone.
1574  *
1575  * Note that channel members are explicitly trusted to perform these
1576  * operations correctly; not doing so correctly will result in either
1577  * denying members access or offering access to channel data to
1578  * non-members.
1579  *
1580  * @param channel Channel handle.
1581  * @param slave_key Identity of channel slave to remove.
1582  * @param announced_at ID of the message that announced the membership change.
1583  */
1584 void
1585 GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
1586                                   const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
1587                                   uint64_t announced_at)
1588 {
1589   struct ChannelSlaveRemove *slvrm;
1590   struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvrm));
1591
1592   slvrm = (struct ChannelSlaveRemove *) &mq[1];
1593   slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM);
1594   slvrm->header.size = htons (sizeof (*slvrm));
1595   slvrm->announced_at = GNUNET_htonll (announced_at);
1596   GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
1597                                     channel->tmit_tail,
1598                                     mq);
1599   transmit_next (channel);
1600 }
1601
1602
1603 /**
1604  * Request to be told the message history of the channel.
1605  *
1606  * Historic messages (but NOT the state at the time) will be replayed (given to
1607  * the normal method handlers) if available and if access is permitted.
1608  *
1609  * To get the latest message, use 0 for both the start and end message ID.
1610  *
1611  * @param channel Which channel should be replayed?
1612  * @param start_message_id Earliest interesting point in history.
1613  * @param end_message_id Last (exclusive) interesting point in history.
1614  * @param message_cb Function to invoke on message parts received from the story.
1615  * @param finish_cb Function to call when the requested story has been fully
1616  *        told (counting message IDs might not suffice, as some messages
1617  *        might be secret and thus the listener would not know the story is
1618  *        finished without being told explicitly) once this function
1619  *        has been called, the client must not call
1620  *        GNUNET_PSYC_channel_story_tell_cancel() anymore.
1621  * @param cls Closure for the callbacks.
1622  *
1623  * @return Handle to cancel story telling operation.
1624  */
1625 struct GNUNET_PSYC_Story *
1626 GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel,
1627                                 uint64_t start_message_id,
1628                                 uint64_t end_message_id,
1629                                 GNUNET_PSYC_MessageCallback message_cb,
1630                                 GNUNET_PSYC_FinishCallback finish_cb,
1631                                 void *cls)
1632 {
1633   return NULL;
1634 }
1635
1636
1637 /**
1638  * Abort story telling.
1639  *
1640  * This function must not be called from within method handlers (as given to
1641  * GNUNET_PSYC_slave_join()) of the slave.
1642  *
1643  * @param story Story telling operation to stop.
1644  */
1645 void
1646 GNUNET_PSYC_channel_story_tell_cancel (struct GNUNET_PSYC_Story *story)
1647 {
1648
1649 }
1650
1651
1652 /**
1653  * Retrieve the best matching channel state variable.
1654  *
1655  * If the requested variable name is not present in the state, the nearest
1656  * less-specific name is matched; for example, requesting "_a_b" will match "_a"
1657  * if "_a_b" does not exist.
1658  *
1659  * @param channel Channel handle.
1660  * @param full_name Full name of the requested variable, the actual variable
1661  *        returned might have a shorter name..
1662  * @param cb Function called once when a matching state variable is found.
1663  *        Not called if there's no matching state variable.
1664  * @param cb_cls Closure for the callbacks.
1665  *
1666  * @return Handle that can be used to cancel the query operation.
1667  */
1668 struct GNUNET_PSYC_StateQuery *
1669 GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel,
1670                                const char *full_name,
1671                                GNUNET_PSYC_StateCallback cb,
1672                                void *cb_cls)
1673 {
1674   return NULL;
1675 }
1676
1677
1678 /**
1679  * Return all channel state variables whose name matches a given prefix.
1680  *
1681  * A name matches if it starts with the given @a name_prefix, thus requesting
1682  * the empty prefix ("") will match all values; requesting "_a_b" will also
1683  * return values stored under "_a_b_c".
1684  *
1685  * The @a state_cb is invoked on all matching state variables asynchronously, as
1686  * the state is stored in and retrieved from the PSYCstore,
1687  *
1688  * @param channel Channel handle.
1689  * @param name_prefix Prefix of the state variable name to match.
1690  * @param cb Function to call with the matching state variables.
1691  * @param cb_cls Closure for the callbacks.
1692  *
1693  * @return Handle that can be used to cancel the query operation.
1694  */
1695 struct GNUNET_PSYC_StateQuery *
1696 GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *channel,
1697                                       const char *name_prefix,
1698                                       GNUNET_PSYC_StateCallback cb,
1699                                       void *cb_cls)
1700 {
1701   return NULL;
1702 }
1703
1704
1705 /**
1706  * Cancel a state query operation.
1707  *
1708  * @param query Handle for the operation to cancel.
1709  */
1710 void
1711 GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateQuery *query)
1712 {
1713
1714 }
1715
1716
1717 /* end of psyc_api.c */