85f86ceaa203fe3c2b78d4d0a0098560227a55b8
[oweals/gnunet.git] / src / psyc / psyc_api.c
1 /*
2  * This file is part of GNUnet
3  * (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psyc/psyc_api.c
23  * @brief PSYC service; high-level access to the PSYC protocol
24  *        note that clients of this API are NOT expected to
25  *        understand the PSYC message format, only the semantics!
26  *        Parsing (and serializing) the PSYC stream format is done
27  *        within the implementation of the libgnunetpsyc library,
28  *        and this API deliberately exposes as little as possible
29  *        of the actual data stream format to the application!
30  * @author Gabor X Toth
31  */
32
33 #include <inttypes.h>
34
35 #include "platform.h"
36 #include "gnunet_util_lib.h"
37 #include "gnunet_env_lib.h"
38 #include "gnunet_multicast_service.h"
39 #include "gnunet_psyc_service.h"
40 #include "psyc.h"
41
42 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
43
44 struct MessageQueue
45 {
46   struct MessageQueue *prev;
47   struct MessageQueue *next;
48 };
49
50
51 /**
52  * Handle for a pending PSYC transmission operation.
53  */
54 struct GNUNET_PSYC_ChannelTransmitHandle
55 {
56   struct GNUNET_PSYC_Channel *ch;
57   GNUNET_PSYC_TransmitNotifyModifier notify_mod;
58   GNUNET_PSYC_TransmitNotifyData notify_data;
59   void *notify_cls;
60   enum MessageState state;
61 };
62
63 /**
64  * Handle to access PSYC channel operations for both the master and slaves.
65  */
66 struct GNUNET_PSYC_Channel
67 {
68   /**
69    * Transmission handle;
70    */
71   struct GNUNET_PSYC_ChannelTransmitHandle tmit;
72
73   /**
74    * Configuration to use.
75    */
76   const struct GNUNET_CONFIGURATION_Handle *cfg;
77
78   /**
79    * Socket (if available).
80    */
81   struct GNUNET_CLIENT_Connection *client;
82
83   /**
84    * Currently pending transmission request, or NULL for none.
85    */
86   struct GNUNET_CLIENT_TransmitHandle *th;
87
88   /**
89    * Head of messages to transmit to the service.
90    */
91   struct MessageQueue *tmit_head;
92
93   /**
94    * Tail of operations to transmit to the service.
95    */
96   struct MessageQueue *tmit_tail;
97
98   /**
99    * Message currently being transmitted to the service.
100    */
101   struct MessageQueue *tmit_msg;
102
103   /**
104    * Message to send on reconnect.
105    */
106   struct GNUNET_MessageHeader *reconnect_msg;
107
108   /**
109    * Task doing exponential back-off trying to reconnect.
110    */
111   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
112
113   /**
114    * Time for next connect retry.
115    */
116   struct GNUNET_TIME_Relative reconnect_delay;
117
118   /**
119    * Message part callback.
120    */
121   GNUNET_PSYC_MessageCallback message_cb;
122
123   /**
124    * Message part callback for historic message.
125    */
126   GNUNET_PSYC_MessageCallback hist_message_cb;
127
128   /**
129    * Join handler callback.
130    */
131   GNUNET_PSYC_JoinCallback join_cb;
132
133   /**
134    * Closure for @a message_cb and @a join_cb.
135    */
136   void *cb_cls;
137
138   /**
139    * ID of the message being received from the PSYC service.
140    */
141   uint64_t recv_message_id;
142
143   /**
144    * Public key of the slave from which a message is being received.
145    */
146   struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key;
147
148   /**
149    * State of the currently being received message from the PSYC service.
150    */
151   enum MessageState recv_state;
152
153   /**
154    * Flags for the currently being received message from the PSYC service.
155    */
156   enum GNUNET_PSYC_MessageFlags recv_flags;
157
158   /**
159    * Expected value size for the modifier being received from the PSYC service.
160    */
161   uint32_t recv_mod_value_size_expected;
162
163   /**
164    * Actual value size for the modifier being received from the PSYC service.
165    */
166   uint32_t recv_mod_value_size;
167
168   /**
169    * Is transmission paused?
170    */
171   uint8_t tmit_paused;
172
173   /**
174    * Are we still waiting for a PSYC_TRANSMIT_ACK?
175    */
176   uint8_t tmit_ack_pending;
177
178   /**
179    * Are we polling for incoming messages right now?
180    */
181   uint8_t in_receive;
182
183   /**
184    * Are we currently transmitting a message?
185    */
186   uint8_t in_transmit;
187
188   /**
189    * Is this a master or slave channel?
190    */
191   uint8_t is_master;
192 };
193
194
195 /**
196  * Handle for the master of a PSYC channel.
197  */
198 struct GNUNET_PSYC_Master
199 {
200   struct GNUNET_PSYC_Channel ch;
201
202   GNUNET_PSYC_MasterStartCallback start_cb;
203 };
204
205
206 /**
207  * Handle for a PSYC channel slave.
208  */
209 struct GNUNET_PSYC_Slave
210 {
211   struct GNUNET_PSYC_Channel ch;
212
213   GNUNET_PSYC_SlaveJoinCallback join_cb;
214 };
215
216
217 /**
218  * Handle that identifies a join request.
219  *
220  * Used to match calls to #GNUNET_PSYC_JoinCallback to the
221  * corresponding calls to GNUNET_PSYC_join_decision().
222  */
223 struct GNUNET_PSYC_JoinHandle
224 {
225
226 };
227
228
229 /**
230  * Handle for a pending PSYC transmission operation.
231  */
232 struct GNUNET_PSYC_SlaveTransmitHandle
233 {
234
235 };
236
237
238 /**
239  * Handle to a story telling operation.
240  */
241 struct GNUNET_PSYC_Story
242 {
243
244 };
245
246
247 /**
248  * Handle for a state query operation.
249  */
250 struct GNUNET_PSYC_StateQuery
251 {
252
253 };
254
255
256 static void
257 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
258
259
260 static void
261 channel_transmit_data (struct GNUNET_PSYC_Channel *ch);
262
263
264 /**
265  * Reschedule a connect attempt to the service.
266  *
267  * @param ch  Channel to reconnect.
268  */
269 static void
270 reschedule_connect (struct GNUNET_PSYC_Channel *ch)
271 {
272   GNUNET_assert (ch->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
273
274   if (NULL != ch->th)
275   {
276     GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th);
277     ch->th = NULL;
278   }
279   if (NULL != ch->client)
280   {
281     GNUNET_CLIENT_disconnect (ch->client);
282     ch->client = NULL;
283   }
284   ch->in_receive = GNUNET_NO;
285   LOG (GNUNET_ERROR_TYPE_DEBUG,
286        "Scheduling task to reconnect to PSYC service in %s.\n",
287        GNUNET_STRINGS_relative_time_to_string (ch->reconnect_delay, GNUNET_YES));
288   ch->reconnect_task =
289       GNUNET_SCHEDULER_add_delayed (ch->reconnect_delay, &reconnect, ch);
290   ch->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ch->reconnect_delay);
291 }
292
293
294 /**
295  * Schedule transmission of the next message from our queue.
296  *
297  * @param ch PSYC channel handle
298  */
299 static void
300 transmit_next (struct GNUNET_PSYC_Channel *ch);
301
302
303 /**
304  * Reset stored data related to the last received message.
305  */
306 static void
307 recv_reset (struct GNUNET_PSYC_Channel *ch)
308 {
309   ch->recv_state = MSG_STATE_START;
310   ch->recv_flags = 0;
311   ch->recv_message_id = 0;
312   //FIXME: ch->recv_slave_key = { 0 };
313   ch->recv_mod_value_size = 0;
314   ch->recv_mod_value_size_expected = 0;
315 }
316
317
318 static void
319 recv_error (struct GNUNET_PSYC_Channel *ch)
320 {
321   GNUNET_PSYC_MessageCallback message_cb
322     = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
323     ? ch->hist_message_cb
324     : ch->message_cb;
325
326   if (NULL != message_cb)
327     message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL);
328
329   recv_reset (ch);
330 }
331
332
333 /**
334  * Queue a message part for transmission to the PSYC service.
335  *
336  * The message part is added to the current message buffer.
337  * When this buffer is full, it is added to the transmission queue.
338  *
339  * @param ch Channel struct for the client.
340  * @param msg Modifier message part, or NULL when there's no more modifiers.
341  * @param end End of message.
342  */
343 static void
344 queue_message (struct GNUNET_PSYC_Channel *ch,
345                const struct GNUNET_MessageHeader *msg,
346                uint8_t end)
347 {
348   uint16_t size = msg ? ntohs (msg->size) : 0;
349
350   LOG (GNUNET_ERROR_TYPE_DEBUG,
351        "Queueing message of type %u and size %u (end: %u)).\n",
352        ntohs (msg->type), size, end);
353
354   struct MessageQueue *mq = ch->tmit_msg;
355   struct GNUNET_MessageHeader *qmsg = NULL;
356   if (NULL != mq)
357   {
358     qmsg = (struct GNUNET_MessageHeader *) &mq[1];
359     if (NULL == msg
360         || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < qmsg->size + size)
361     {
362       /* End of message or buffer is full, add it to transmission queue
363        * and start with empty buffer */
364       qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
365       qmsg->size = htons (qmsg->size);
366       GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
367       ch->tmit_msg = mq = NULL;
368       ch->tmit_ack_pending++;
369     }
370     else
371     {
372       /* Message fits in current buffer, append */
373       ch->tmit_msg
374         = mq = GNUNET_realloc (mq, sizeof (*mq) + qmsg->size + size);
375       qmsg = (struct GNUNET_MessageHeader *) &mq[1];
376       memcpy ((char *) qmsg + qmsg->size, msg, size);
377       qmsg->size += size;
378     }
379   }
380
381   if (NULL == mq && NULL != msg)
382   {
383     /* Empty buffer, copy over message. */
384     ch->tmit_msg
385       = mq = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) + size);
386     qmsg = (struct GNUNET_MessageHeader *) &mq[1];
387     qmsg->size = sizeof (*qmsg) + size;
388     memcpy (&qmsg[1], msg, size);
389   }
390
391   if (NULL != mq
392       && (GNUNET_YES == end
393           || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
394               < qmsg->size + sizeof (struct GNUNET_MessageHeader))))
395   {
396     /* End of message or buffer is full, add it to transmission queue. */
397     qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
398     qmsg->size = htons (qmsg->size);
399     GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
400     ch->tmit_msg = mq = NULL;
401     ch->tmit_ack_pending++;
402   }
403
404   if (GNUNET_YES == end)
405     ch->in_transmit = GNUNET_NO;
406
407   transmit_next (ch);
408 }
409
410
411 /**
412  * Request a modifier from a client to transmit.
413  *
414  * @param mst Master handle.
415  */
416 static void
417 channel_transmit_mod (struct GNUNET_PSYC_Channel *ch)
418 {
419   uint16_t max_data_size, data_size;
420   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
421   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
422   int notify_ret;
423
424   switch (ch->tmit.state)
425   {
426   case MSG_STATE_MODIFIER:
427   {
428     struct GNUNET_PSYC_MessageModifier *mod
429       = (struct GNUNET_PSYC_MessageModifier *) msg;
430     max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
431     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
432     msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
433     notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1],
434                                       &mod->oper, &mod->value_size);
435     mod->name_size = strnlen ((char *) &mod[1], data_size);
436     if (mod->name_size < data_size)
437     {
438       mod->value_size = htonl (mod->value_size);
439       mod->name_size = htons (mod->name_size);
440     }
441     else if (0 < data_size)
442     {
443       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
444       notify_ret = GNUNET_SYSERR;
445     }
446     break;
447   }
448   case MSG_STATE_MOD_CONT:
449   {
450     max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
451     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
452     msg->size = sizeof (struct GNUNET_MessageHeader);
453     notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
454                                       &data_size, &msg[1], NULL, NULL);
455     break;
456   }
457   default:
458     GNUNET_assert (0);
459   }
460
461   switch (notify_ret)
462   {
463   case GNUNET_NO:
464     if (0 == data_size)
465     { /* Transmission paused, nothing to send. */
466       ch->tmit_paused = GNUNET_YES;
467       return;
468     }
469     ch->tmit.state = MSG_STATE_MOD_CONT;
470     break;
471
472   case GNUNET_YES:
473     if (0 == data_size)
474     {
475       /* End of modifiers. */
476       ch->tmit.state = MSG_STATE_DATA;
477       if (0 == ch->tmit_ack_pending)
478         channel_transmit_data (ch);
479
480       return;
481     }
482     ch->tmit.state = MSG_STATE_MODIFIER;
483     break;
484
485   default:
486     LOG (GNUNET_ERROR_TYPE_ERROR,
487          "MasterTransmitNotifyModifier returned error "
488          "when requesting a modifier.\n");
489
490     ch->tmit.state = MSG_STATE_CANCEL;
491     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
492     msg->size = htons (sizeof (*msg));
493
494     queue_message (ch, msg, GNUNET_YES);
495     return;
496   }
497
498   if (0 < data_size)
499   {
500     GNUNET_assert (data_size <= max_data_size);
501     msg->size = htons (msg->size + data_size);
502     queue_message (ch, msg, GNUNET_NO);
503   }
504
505   channel_transmit_mod (ch);
506 }
507
508
509 /**
510  * Request data from a client to transmit.
511  *
512  * @param mst Master handle.
513  */
514 static void
515 channel_transmit_data (struct GNUNET_PSYC_Channel *ch)
516 {
517   uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
518   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
519   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
520
521   msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
522
523   int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls,
524                                          &data_size, &msg[1]);
525   switch (notify_ret)
526   {
527   case GNUNET_NO:
528     if (0 == data_size)
529     {
530       /* Transmission paused, nothing to send. */
531       ch->tmit_paused = GNUNET_YES;
532       return;
533     }
534     break;
535
536   case GNUNET_YES:
537     ch->tmit.state = MSG_STATE_END;
538     break;
539
540   default:
541     LOG (GNUNET_ERROR_TYPE_ERROR,
542          "MasterTransmitNotify returned error when requesting data.\n");
543
544     ch->tmit.state = MSG_STATE_CANCEL;
545     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
546     msg->size = htons (sizeof (*msg));
547     queue_message (ch, msg, GNUNET_YES);
548     return;
549   }
550
551   if (0 < data_size)
552   {
553     GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
554     msg->size = htons (sizeof (*msg) + data_size);
555     queue_message (ch, msg, !notify_ret);
556   }
557
558   /* End of message. */
559   if (GNUNET_YES == notify_ret)
560   {
561     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
562     msg->size = htons (sizeof (*msg));
563     queue_message (ch, msg, GNUNET_YES);
564   }
565 }
566
567
568 /**
569  * Send a message to a channel.
570  *
571  * @param ch Handle to the PSYC channel.
572  * @param method_name Which method should be invoked.
573  * @param notify_mod Function to call to obtain modifiers.
574  * @param notify_data Function to call to obtain fragments of the data.
575  * @param notify_cls Closure for @a notify_mod and @a notify_data.
576  * @param flags Flags for the message being transmitted.
577  *
578  * @return Transmission handle, NULL on error (i.e. more than one request queued).
579  */
580 static struct GNUNET_PSYC_ChannelTransmitHandle *
581 channel_transmit (struct GNUNET_PSYC_Channel *ch,
582                   const char *method_name,
583                   GNUNET_PSYC_TransmitNotifyModifier notify_mod,
584                   GNUNET_PSYC_TransmitNotifyData notify_data,
585                   void *notify_cls,
586                   uint32_t flags)
587 {
588   if (GNUNET_NO != ch->in_transmit)
589     return NULL;
590   ch->in_transmit = GNUNET_YES;
591
592   size_t size = strlen (method_name) + 1;
593   struct GNUNET_PSYC_MessageMethod *pmeth;
594   struct GNUNET_MessageHeader *qmsg;
595   struct MessageQueue *
596     mq = ch->tmit_msg = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg)
597                                        + sizeof (*pmeth) + size);
598   qmsg = (struct GNUNET_MessageHeader *) &mq[1];
599   qmsg->size = sizeof (*qmsg) + sizeof (*pmeth) + size;
600
601   pmeth = (struct GNUNET_PSYC_MessageMethod *) &qmsg[1];
602   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
603   pmeth->header.size = htons (sizeof (*pmeth) + size);
604   pmeth->flags = htonl (flags);
605   memcpy (&pmeth[1], method_name, size);
606
607   ch->tmit.ch = ch;
608   ch->tmit.notify_mod = notify_mod;
609   ch->tmit.notify_data = notify_data;
610   ch->tmit.notify_cls = notify_cls;
611   ch->tmit.state = MSG_STATE_MODIFIER;
612
613   channel_transmit_mod (ch);
614   return &ch->tmit;
615 }
616
617
618 /**
619  * Resume transmission to the channel.
620  *
621  * @param th Handle of the request that is being resumed.
622  */
623 static void
624 channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th)
625 {
626   struct GNUNET_PSYC_Channel *ch = th->ch;
627   if (0 == ch->tmit_ack_pending)
628   {
629     ch->tmit_paused = GNUNET_NO;
630     channel_transmit_data (ch);
631   }
632 }
633
634
635 /**
636  * Abort transmission request to channel.
637  *
638  * @param th Handle of the request that is being aborted.
639  */
640 static void
641 channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th)
642 {
643   struct GNUNET_PSYC_Channel *ch = th->ch;
644   if (GNUNET_NO == ch->in_transmit)
645     return;
646 }
647
648
649 /**
650  * Handle incoming message from the PSYC service.
651  *
652  * @param ch The channel the message is sent to.
653  * @param pmsg The message.
654  */
655 static void
656 handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
657                      const struct GNUNET_PSYC_MessageHeader *msg)
658 {
659   uint16_t size = ntohs (msg->header.size);
660   uint32_t flags = ntohl (msg->flags);
661
662   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
663                            (struct GNUNET_MessageHeader *) msg);
664
665   if (MSG_STATE_START == ch->recv_state)
666   {
667     ch->recv_message_id = GNUNET_ntohll (msg->message_id);
668     ch->recv_flags = flags;
669     ch->recv_slave_key = msg->slave_key;
670     ch->recv_mod_value_size = 0;
671     ch->recv_mod_value_size_expected = 0;
672   }
673   else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
674   {
675     // FIXME
676     LOG (GNUNET_ERROR_TYPE_WARNING,
677          "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
678          GNUNET_ntohll (msg->message_id), ch->recv_message_id);
679     GNUNET_break_op (0);
680     recv_error (ch);
681     return;
682   }
683   else if (flags != ch->recv_flags)
684   {
685     LOG (GNUNET_ERROR_TYPE_WARNING,
686          "Unexpected message flags. Got: %lu, expected: %lu\n",
687          flags, ch->recv_flags);
688     GNUNET_break_op (0);
689     recv_error (ch);
690     return;
691   }
692
693   uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
694
695   for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
696   {
697     const struct GNUNET_MessageHeader *pmsg
698       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
699     psize = ntohs (pmsg->size);
700     ptype = ntohs (pmsg->type);
701     size_eq = size_min = 0;
702
703     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
704     {
705       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
706                   "Dropping message of type %u with invalid size %u.\n",
707                   ptype, psize);
708       recv_error (ch);
709       return;
710     }
711
712     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
713                 "Received message part from PSYC.\n");
714     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
715
716     switch (ptype)
717     {
718     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
719       size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
720       break;
721     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
722       size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
723       break;
724     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
725     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
726       size_min = sizeof (struct GNUNET_MessageHeader);
727       break;
728     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
729     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
730       size_eq = sizeof (struct GNUNET_MessageHeader);
731       break;
732     default:
733       GNUNET_break_op (0);
734       recv_error (ch);
735       return;
736     }
737
738     if (! ((0 < size_eq && psize == size_eq)
739            || (0 < size_min && size_min <= psize)))
740     {
741       GNUNET_break_op (0);
742       recv_error (ch);
743       return;
744     }
745
746     switch (ptype)
747     {
748     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
749     {
750       struct GNUNET_PSYC_MessageMethod *meth
751         = (struct GNUNET_PSYC_MessageMethod *) pmsg;
752
753       if (MSG_STATE_START != ch->recv_state)
754       {
755         LOG (GNUNET_ERROR_TYPE_WARNING,
756              "Dropping out of order message method (%u).\n",
757              ch->recv_state);
758         /* It is normal to receive an incomplete message right after connecting,
759          * but should not happen later.
760          * FIXME: add a check for this condition.
761          */
762         GNUNET_break_op (0);
763         recv_error (ch);
764         return;
765       }
766
767       if ('\0' != *((char *) meth + psize - 1))
768       {
769         LOG (GNUNET_ERROR_TYPE_WARNING,
770              "Dropping message with malformed method. "
771              "Message ID: %" PRIu64 "\n", ch->recv_message_id);
772         GNUNET_break_op (0);
773         recv_error (ch);
774         return;
775       }
776       ch->recv_state = MSG_STATE_METHOD;
777       break;
778     }
779     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
780     {
781       if (!(MSG_STATE_METHOD == ch->recv_state
782             || MSG_STATE_MODIFIER == ch->recv_state
783             || MSG_STATE_MOD_CONT == ch->recv_state))
784       {
785         LOG (GNUNET_ERROR_TYPE_WARNING,
786              "Dropping out of order message modifier (%u).\n",
787              ch->recv_state);
788         GNUNET_break_op (0);
789         recv_error (ch);
790         return;
791       }
792
793       struct GNUNET_PSYC_MessageModifier *mod
794         = (struct GNUNET_PSYC_MessageModifier *) pmsg;
795
796       uint16_t name_size = ntohs (mod->name_size);
797       ch->recv_mod_value_size_expected = ntohl (mod->value_size);
798       ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1;
799
800       if (psize < sizeof (*mod) + name_size + 1
801           || '\0' != *((char *) &mod[1] + name_size)
802           || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
803       {
804         LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
805         GNUNET_break_op (0);
806         recv_error (ch);
807         return;
808       }
809       ch->recv_state = MSG_STATE_MODIFIER;
810       break;
811     }
812     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
813     {
814       ch->recv_mod_value_size += psize - sizeof (*pmsg);
815
816       if (!(MSG_STATE_MODIFIER == ch->recv_state
817             || MSG_STATE_MOD_CONT == ch->recv_state)
818           || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
819       {
820         LOG (GNUNET_ERROR_TYPE_WARNING,
821              "Dropping out of order message modifier continuation "
822              "!(%u == %u || %u == %u) || %lu < %lu.\n",
823              MSG_STATE_MODIFIER, ch->recv_state,
824              MSG_STATE_MOD_CONT, ch->recv_state,
825              ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
826         GNUNET_break_op (0);
827         recv_error (ch);
828         return;
829       }
830       break;
831     }
832     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
833     {
834       if (ch->recv_state < MSG_STATE_METHOD
835           || ch->recv_mod_value_size_expected != ch->recv_mod_value_size)
836       {
837         LOG (GNUNET_ERROR_TYPE_WARNING,
838              "Dropping out of order message data fragment "
839              "(%u < %u || %lu != %lu).\n",
840              ch->recv_state, MSG_STATE_METHOD,
841              ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
842
843         GNUNET_break_op (0);
844         recv_error (ch);
845         return;
846       }
847       ch->recv_state = MSG_STATE_DATA;
848       break;
849     }
850     }
851
852     GNUNET_PSYC_MessageCallback message_cb
853       = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
854       ? ch->hist_message_cb
855       : ch->message_cb;
856
857     if (NULL != message_cb)
858       message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg);
859
860     switch (ptype)
861     {
862     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
863     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
864       recv_reset (ch);
865       break;
866     }
867   }
868 }
869
870
871 /**
872  * Handle incoming message acknowledgement from the PSYC service.
873  *
874  * @param ch The channel the acknowledgement is sent to.
875  */
876 static void
877 handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch)
878 {
879   if (0 == ch->tmit_ack_pending)
880   {
881     LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
882     GNUNET_break (0);
883     return;
884   }
885   ch->tmit_ack_pending--;
886
887   switch (ch->tmit.state)
888   {
889   case MSG_STATE_MODIFIER:
890   case MSG_STATE_MOD_CONT:
891     if (GNUNET_NO == ch->tmit_paused)
892       channel_transmit_mod (ch);
893     break;
894
895   case MSG_STATE_DATA:
896     if (GNUNET_NO == ch->tmit_paused)
897       channel_transmit_data (ch);
898     break;
899
900   case MSG_STATE_END:
901   case MSG_STATE_CANCEL:
902     break;
903
904   default:
905     LOG (GNUNET_ERROR_TYPE_DEBUG,
906          "Ignoring message ACK in state %u.\n", ch->tmit.state);
907   }
908 }
909
910
911 /**
912  * Type of a function to call when we receive a message
913  * from the service.
914  *
915  * @param cls closure
916  * @param msg message received, NULL on timeout or fatal error
917  */
918 static void
919 message_handler (void *cls,
920                  const struct GNUNET_MessageHeader *msg)
921 {
922   // YUCK! => please have disjoint message handlers...
923   struct GNUNET_PSYC_Channel *ch = cls;
924   struct GNUNET_PSYC_Master *mst = cls;
925   struct GNUNET_PSYC_Slave *slv = cls;
926
927   if (NULL == msg)
928   {
929     // timeout / disconnected from service, reconnect
930     reschedule_connect (ch);
931     return;
932   }
933   uint16_t size_eq = 0;
934   uint16_t size_min = 0;
935   uint16_t size = ntohs (msg->size);
936   uint16_t type = ntohs (msg->type);
937
938   LOG (GNUNET_ERROR_TYPE_DEBUG,
939        "Received message of type %d and size %u from PSYC service\n",
940        type, size);
941
942   switch (type)
943   {
944   case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
945   case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
946     size_eq = sizeof (struct CountersResult);
947     break;
948   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
949     size_min = sizeof (struct GNUNET_PSYC_MessageHeader);
950     break;
951   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
952     size_eq = sizeof (struct GNUNET_MessageHeader);
953     break;
954   default:
955     GNUNET_break_op (0);
956     return;
957   }
958
959   if (! ((0 < size_eq && size == size_eq)
960          || (0 < size_min && size_min <= size)))
961   {
962     GNUNET_break_op (0);
963     return;
964   }
965
966   switch (type)
967   {
968   case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
969   {
970     struct CountersResult *cres = (struct CountersResult *) msg;
971     if (NULL != mst->start_cb)
972       mst->start_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
973     break;
974   }
975   case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
976   {
977     struct CountersResult *cres = (struct CountersResult *) msg;
978     if (NULL != slv->join_cb)
979       slv->join_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
980     break;
981   }
982   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
983   {
984     handle_psyc_message_ack (ch);
985     break;
986   }
987
988   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
989     handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
990     break;
991   }
992
993   if (NULL != ch->client)
994   {
995     GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
996                            GNUNET_TIME_UNIT_FOREVER_REL);
997   }
998 }
999
1000
1001 /**
1002  * Transmit next message to service.
1003  *
1004  * @param cls   The struct GNUNET_PSYC_Channel.
1005  * @param size  Number of bytes available in @a buf.
1006  * @param buf   Where to copy the message.
1007  *
1008  * @return Number of bytes copied to @a buf.
1009  */
1010 static size_t
1011 send_next_message (void *cls, size_t size, void *buf)
1012 {
1013   LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
1014   struct GNUNET_PSYC_Channel *ch = cls;
1015   struct MessageQueue *mq = ch->tmit_head;
1016   if (NULL == mq)
1017     return 0;
1018   struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
1019   size_t ret = ntohs (qmsg->size);
1020   ch->th = NULL;
1021   if (ret > size)
1022   {
1023     reschedule_connect (ch);
1024     return 0;
1025   }
1026   memcpy (buf, qmsg, ret);
1027
1028   GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, mq);
1029   GNUNET_free (mq);
1030
1031   if (NULL != ch->tmit_head)
1032     transmit_next (ch);
1033
1034   if (GNUNET_NO == ch->in_receive)
1035   {
1036     ch->in_receive = GNUNET_YES;
1037     GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
1038                            GNUNET_TIME_UNIT_FOREVER_REL);
1039   }
1040   return ret;
1041 }
1042
1043
1044 /**
1045  * Schedule transmission of the next message from our queue.
1046  *
1047  * @param ch PSYC handle.
1048  */
1049 static void
1050 transmit_next (struct GNUNET_PSYC_Channel *ch)
1051 {
1052   LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n");
1053   if (NULL != ch->th || NULL == ch->client)
1054     return;
1055
1056   struct MessageQueue *mq = ch->tmit_head;
1057   if (NULL == mq)
1058     return;
1059   struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
1060
1061   ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client,
1062                                                 ntohs (qmsg->size),
1063                                                 GNUNET_TIME_UNIT_FOREVER_REL,
1064                                                 GNUNET_NO,
1065                                                 &send_next_message,
1066                                                 ch);
1067 }
1068
1069
1070 /**
1071  * Try again to connect to the PSYC service.
1072  *
1073  * @param cls Channel handle.
1074  * @param tc Scheduler context.
1075  */
1076 static void
1077 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1078 {
1079   struct GNUNET_PSYC_Channel *ch = cls;
1080
1081   recv_reset (ch);
1082   ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1083   LOG (GNUNET_ERROR_TYPE_DEBUG,
1084        "Connecting to PSYC service.\n");
1085   GNUNET_assert (NULL == ch->client);
1086   ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg);
1087   GNUNET_assert (NULL != ch->client);
1088   uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
1089
1090   if (NULL == ch->tmit_head ||
1091       0 != memcmp (&ch->tmit_head[1], ch->reconnect_msg, reconn_size))
1092   {
1093     struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size);
1094     memcpy (&mq[1], ch->reconnect_msg, reconn_size);
1095     GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, mq);
1096   }
1097   transmit_next (ch);
1098 }
1099
1100
1101 /**
1102  * Disconnect from the PSYC service.
1103  *
1104  * @param c  Channel handle to disconnect.
1105  */
1106 static void
1107 disconnect (void *c)
1108 {
1109   struct GNUNET_PSYC_Channel *ch = c;
1110
1111   GNUNET_assert (NULL != ch);
1112   if (ch->tmit_head != ch->tmit_tail)
1113   {
1114     LOG (GNUNET_ERROR_TYPE_ERROR,
1115          "Disconnecting while there are still outstanding messages!\n");
1116     GNUNET_break (0);
1117   }
1118   if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1119   {
1120     GNUNET_SCHEDULER_cancel (ch->reconnect_task);
1121     ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1122   }
1123   if (NULL != ch->th)
1124   {
1125     GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th);
1126     ch->th = NULL;
1127   }
1128   if (NULL != ch->client)
1129   {
1130     GNUNET_CLIENT_disconnect (ch->client);
1131     ch->client = NULL;
1132   }
1133   if (NULL != ch->reconnect_msg)
1134   {
1135     GNUNET_free (ch->reconnect_msg);
1136     ch->reconnect_msg = NULL;
1137   }
1138 }
1139
1140
1141 /**
1142  * Start a PSYC master channel.
1143  *
1144  * Will start a multicast group identified by the given ECC key.  Messages
1145  * received from group members will be given to the respective handler methods.
1146  * If a new member wants to join a group, the "join" method handler will be
1147  * invoked; the join handler must then generate a "join" message to approve the
1148  * joining of the new member.  The channel can also change group membership
1149  * without explicit requests.  Note that PSYC doesn't itself "understand" join
1150  * or part messages, the respective methods must call other PSYC functions to
1151  * inform PSYC about the meaning of the respective events.
1152  *
1153  * @param cfg Configuration to use (to connect to PSYC service).
1154  * @param channel_key ECC key that will be used to sign messages for this
1155  *        PSYC session. The public key is used to identify the PSYC channel.
1156  *        Note that end-users will usually not use the private key directly, but
1157  *        rather look it up in GNS for places managed by other users, or select
1158  *        a file with the private key(s) when setting up their own channels
1159  *        FIXME: we'll likely want to use NOT the p521 curve here, but a cheaper
1160  *        one in the future.
1161  * @param policy Channel policy specifying join and history restrictions.
1162  *        Used to automate join decisions.
1163  * @param message_cb Function to invoke on message parts received from slaves.
1164  * @param join_cb Function to invoke when a peer wants to join.
1165  * @param master_started_cb Function to invoke after the channel master started.
1166  * @param cls Closure for @a master_started_cb and @a join_cb.
1167  *
1168  * @return Handle for the channel master, NULL on error.
1169  */
1170 struct GNUNET_PSYC_Master *
1171 GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
1172                           const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key,
1173                           enum GNUNET_PSYC_Policy policy,
1174                           GNUNET_PSYC_MessageCallback message_cb,
1175                           GNUNET_PSYC_JoinCallback join_cb,
1176                           GNUNET_PSYC_MasterStartCallback master_started_cb,
1177                           void *cls)
1178 {
1179   struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst));
1180   struct GNUNET_PSYC_Channel *ch = &mst->ch;
1181   struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req));
1182
1183   req->header.size = htons (sizeof (*req));
1184   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START);
1185   req->channel_key = *channel_key;
1186   req->policy = policy;
1187
1188   mst->start_cb = master_started_cb;
1189   ch->message_cb = message_cb;
1190   ch->join_cb = join_cb;
1191   ch->cb_cls = cls;
1192   ch->cfg = cfg;
1193   ch->is_master = GNUNET_YES;
1194   ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
1195   ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1196   ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst);
1197
1198   return mst;
1199 }
1200
1201
1202 /**
1203  * Stop a PSYC master channel.
1204  *
1205  * @param master PSYC channel master to stop.
1206  */
1207 void
1208 GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
1209 {
1210   disconnect (master);
1211   GNUNET_free (master);
1212 }
1213
1214
1215 /**
1216  * Function to call with the decision made for a join request.
1217  *
1218  * Must be called once and only once in response to an invocation of the
1219  * #GNUNET_PSYC_JoinCallback.
1220  *
1221  * @param jh Join request handle.
1222  * @param is_admitted #GNUNET_YES if joining is approved,
1223  *        #GNUNET_NO if it is disapproved.
1224  * @param relay_count Number of relays given.
1225  * @param relays Array of suggested peers that might be useful relays to use
1226  *        when joining the multicast group (essentially a list of peers that
1227  *        are already part of the multicast group and might thus be willing
1228  *        to help with routing).  If empty, only this local peer (which must
1229  *        be the multicast origin) is a good candidate for building the
1230  *        multicast tree.  Note that it is unnecessary to specify our own
1231  *        peer identity in this array.
1232  * @param method_name Method name for the message transmitted with the response.
1233  * @param env Environment containing transient variables for the message, or NULL.
1234  * @param data Data of the message.
1235  * @param data_size Size of @a data.
1236  */
1237 void
1238 GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
1239                            int is_admitted,
1240                            uint32_t relay_count,
1241                            const struct GNUNET_PeerIdentity *relays,
1242                            const char *method_name,
1243                            const struct GNUNET_ENV_Environment *env,
1244                            const void *data,
1245                            size_t data_size)
1246 {
1247
1248 }
1249
1250
1251 /**
1252  * Send a message to call a method to all members in the PSYC channel.
1253  *
1254  * @param master Handle to the PSYC channel.
1255  * @param method_name Which method should be invoked.
1256  * @param notify_mod Function to call to obtain modifiers.
1257  * @param notify_data Function to call to obtain fragments of the data.
1258  * @param notify_cls Closure for @a notify_mod and @a notify_data.
1259  * @param flags Flags for the message being transmitted.
1260  *
1261  * @return Transmission handle, NULL on error (i.e. more than one request queued).
1262  */
1263 struct GNUNET_PSYC_MasterTransmitHandle *
1264 GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
1265                              const char *method_name,
1266                              GNUNET_PSYC_TransmitNotifyModifier notify_mod,
1267                              GNUNET_PSYC_TransmitNotifyData notify_data,
1268                              void *notify_cls,
1269                              enum GNUNET_PSYC_MasterTransmitFlags flags)
1270 {
1271   return (struct GNUNET_PSYC_MasterTransmitHandle *)
1272     channel_transmit (&master->ch, method_name, notify_mod, notify_data,
1273                       notify_cls, flags);
1274 }
1275
1276
1277 /**
1278  * Resume transmission to the channel.
1279  *
1280  * @param th Handle of the request that is being resumed.
1281  */
1282 void
1283 GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
1284 {
1285   channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1286 }
1287
1288
1289 /**
1290  * Abort transmission request to the channel.
1291  *
1292  * @param th Handle of the request that is being aborted.
1293  */
1294 void
1295 GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th)
1296 {
1297   channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1298 }
1299
1300
1301 /**
1302  * Join a PSYC channel.
1303  *
1304  * The entity joining is always the local peer.  The user must immediately use
1305  * the GNUNET_PSYC_slave_transmit() functions to transmit a @e join_msg to the
1306  * channel; if the join request succeeds, the channel state (and @e recent
1307  * method calls) will be replayed to the joining member.  There is no explicit
1308  * notification on failure (as the channel may simply take days to approve,
1309  * and disapproval is simply being ignored).
1310  *
1311  * @param cfg Configuration to use.
1312  * @param channel_key ECC public key that identifies the channel we wish to join.
1313  * @param slave_key ECC private-public key pair that identifies the slave, and
1314  *        used by multicast to sign the join request and subsequent unicast
1315  *        requests sent to the master.
1316  * @param origin Peer identity of the origin.
1317  * @param relay_count Number of peers in the @a relays array.
1318  * @param relays Peer identities of members of the multicast group, which serve
1319  *        as relays and used to join the group at.
1320  * @param message_cb Function to invoke on message parts received from the
1321  *        channel, typically at least contains method handlers for @e join and
1322  *        @e part.
1323  * @param join_cb function invoked once we have joined with the current
1324  *        message ID of the channel
1325  * @param slave_joined_cb Function to invoke when a peer wants to join.
1326  * @param cls Closure for @a message_cb and @a slave_joined_cb.
1327  * @param method_name Method name for the join request.
1328  * @param env Environment containing transient variables for the request, or NULL.
1329  * @param data Payload for the join message.
1330  * @param data_size Number of bytes in @a data.
1331  *
1332  * @return Handle for the slave, NULL on error.
1333  */
1334 struct GNUNET_PSYC_Slave *
1335 GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1336                         const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1337                         const struct GNUNET_CRYPTO_EddsaPrivateKey *slave_key,
1338                         const struct GNUNET_PeerIdentity *origin,
1339                         uint32_t relay_count,
1340                         const struct GNUNET_PeerIdentity *relays,
1341                         GNUNET_PSYC_MessageCallback message_cb,
1342                         GNUNET_PSYC_JoinCallback join_cb,
1343                         GNUNET_PSYC_SlaveJoinCallback slave_joined_cb,
1344                         void *cls,
1345                         const char *method_name,
1346                         const struct GNUNET_ENV_Environment *env,
1347                         const void *data,
1348                         uint16_t data_size)
1349 {
1350   struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
1351   struct GNUNET_PSYC_Channel *ch = &slv->ch;
1352   struct SlaveJoinRequest *req
1353     = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays));
1354   req->header.size = htons (sizeof (*req)
1355                             + relay_count * sizeof (*relays));
1356   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
1357   req->channel_key = *channel_key;
1358   req->slave_key = *slave_key;
1359   req->origin = *origin;
1360   req->relay_count = htonl (relay_count);
1361   memcpy (&req[1], relays, relay_count * sizeof (*relays));
1362
1363   slv->join_cb = slave_joined_cb;
1364   ch->message_cb = message_cb;
1365   ch->join_cb = join_cb;
1366   ch->cb_cls = cls;
1367
1368   ch->cfg = cfg;
1369   ch->is_master = GNUNET_NO;
1370   ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
1371   ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1372   ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
1373
1374   return slv;
1375 }
1376
1377
1378 /**
1379  * Part a PSYC channel.
1380  *
1381  * Will terminate the connection to the PSYC service.  Polite clients should
1382  * first explicitly send a part request (via GNUNET_PSYC_slave_transmit()).
1383  *
1384  * @param slave Slave handle.
1385  */
1386 void
1387 GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
1388 {
1389   disconnect (slave);
1390   GNUNET_free (slave);
1391 }
1392
1393
1394 /**
1395  * Request a message to be sent to the channel master.
1396  *
1397  * @param slave Slave handle.
1398  * @param method_name Which (PSYC) method should be invoked (on host).
1399  * @param notify_mod Function to call to obtain modifiers.
1400  * @param notify_data Function to call to obtain fragments of the data.
1401  * @param notify_cls Closure for @a notify.
1402  * @param flags Flags for the message being transmitted.
1403  *
1404  * @return Transmission handle, NULL on error (i.e. more than one request
1405  *         queued).
1406  */
1407 struct GNUNET_PSYC_SlaveTransmitHandle *
1408 GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
1409                             const char *method_name,
1410                             GNUNET_PSYC_TransmitNotifyModifier notify_mod,
1411                             GNUNET_PSYC_TransmitNotifyData notify_data,
1412                             void *notify_cls,
1413                             enum GNUNET_PSYC_SlaveTransmitFlags flags)
1414 {
1415   return (struct GNUNET_PSYC_SlaveTransmitHandle *)
1416     channel_transmit (&slave->ch, method_name,
1417                       notify_mod, notify_data, notify_cls, flags);
1418 }
1419
1420
1421 /**
1422  * Resume transmission to the master.
1423  *
1424  * @param th Handle of the request that is being resumed.
1425  */
1426 void
1427 GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th)
1428 {
1429   channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1430 }
1431
1432
1433 /**
1434  * Abort transmission request to master.
1435  *
1436  * @param th Handle of the request that is being aborted.
1437  */
1438 void
1439 GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
1440 {
1441   channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1442 }
1443
1444
1445 /**
1446  * Convert a channel @a master to a @e channel handle to access the @e channel
1447  * APIs.
1448  *
1449  * @param master Channel master handle.
1450  *
1451  * @return Channel handle, valid for as long as @a master is valid.
1452  */
1453 struct GNUNET_PSYC_Channel *
1454 GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
1455 {
1456   return &master->ch;
1457 }
1458
1459
1460 /**
1461  * Convert @a slave to a @e channel handle to access the @e channel APIs.
1462  *
1463  * @param slave Slave handle.
1464  *
1465  * @return Channel handle, valid for as long as @a slave is valid.
1466  */
1467 struct GNUNET_PSYC_Channel *
1468 GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave)
1469 {
1470   return &slave->ch;
1471 }
1472
1473
1474 /**
1475  * Add a slave to the channel's membership list.
1476  *
1477  * Note that this will NOT generate any PSYC traffic, it will merely update the
1478  * local database to modify how we react to <em>membership test</em> queries.
1479  * The channel master still needs to explicitly transmit a @e join message to
1480  * notify other channel members and they then also must still call this function
1481  * in their respective methods handling the @e join message.  This way, how @e
1482  * join and @e part operations are exactly implemented is still up to the
1483  * application; for example, there might be a @e part_all method to kick out
1484  * everyone.
1485  *
1486  * Note that channel slaves are explicitly trusted to execute such methods
1487  * correctly; not doing so correctly will result in either denying other slaves
1488  * access or offering access to channel data to non-members.
1489  *
1490  * @param channel Channel handle.
1491  * @param slave_key Identity of channel slave to add.
1492  * @param announced_at ID of the message that announced the membership change.
1493  * @param effective_since Addition of slave is in effect since this message ID.
1494  */
1495 void
1496 GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
1497                                const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
1498                                uint64_t announced_at,
1499                                uint64_t effective_since)
1500 {
1501   struct ChannelSlaveAdd *slvadd;
1502   struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvadd));
1503
1504   slvadd = (struct ChannelSlaveAdd *) &mq[1];
1505   slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD);
1506   slvadd->header.size = htons (sizeof (*slvadd));
1507   slvadd->announced_at = GNUNET_htonll (announced_at);
1508   slvadd->effective_since = GNUNET_htonll (effective_since);
1509   GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
1510                                     channel->tmit_tail,
1511                                     mq);
1512   transmit_next (channel);
1513 }
1514
1515
1516 /**
1517  * Remove a slave from the channel's membership list.
1518  *
1519  * Note that this will NOT generate any PSYC traffic, it will merely update the
1520  * local database to modify how we react to <em>membership test</em> queries.
1521  * The channel master still needs to explicitly transmit a @e part message to
1522  * notify other channel members and they then also must still call this function
1523  * in their respective methods handling the @e part message.  This way, how
1524  * @e join and @e part operations are exactly implemented is still up to the
1525  * application; for example, there might be a @e part_all message to kick out
1526  * everyone.
1527  *
1528  * Note that channel members are explicitly trusted to perform these
1529  * operations correctly; not doing so correctly will result in either
1530  * denying members access or offering access to channel data to
1531  * non-members.
1532  *
1533  * @param channel Channel handle.
1534  * @param slave_key Identity of channel slave to remove.
1535  * @param announced_at ID of the message that announced the membership change.
1536  */
1537 void
1538 GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
1539                                   const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
1540                                   uint64_t announced_at)
1541 {
1542   struct ChannelSlaveRemove *slvrm;
1543   struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvrm));
1544
1545   slvrm = (struct ChannelSlaveRemove *) &mq[1];
1546   slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM);
1547   slvrm->header.size = htons (sizeof (*slvrm));
1548   slvrm->announced_at = GNUNET_htonll (announced_at);
1549   GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
1550                                     channel->tmit_tail,
1551                                     mq);
1552   transmit_next (channel);
1553 }
1554
1555
1556 /**
1557  * Request to be told the message history of the channel.
1558  *
1559  * Historic messages (but NOT the state at the time) will be replayed (given to
1560  * the normal method handlers) if available and if access is permitted.
1561  *
1562  * To get the latest message, use 0 for both the start and end message ID.
1563  *
1564  * @param channel Which channel should be replayed?
1565  * @param start_message_id Earliest interesting point in history.
1566  * @param end_message_id Last (exclusive) interesting point in history.
1567  * @param message_cb Function to invoke on message parts received from the story.
1568  * @param finish_cb Function to call when the requested story has been fully
1569  *        told (counting message IDs might not suffice, as some messages
1570  *        might be secret and thus the listener would not know the story is
1571  *        finished without being told explicitly) once this function
1572  *        has been called, the client must not call
1573  *        GNUNET_PSYC_channel_story_tell_cancel() anymore.
1574  * @param cls Closure for the callbacks.
1575  *
1576  * @return Handle to cancel story telling operation.
1577  */
1578 struct GNUNET_PSYC_Story *
1579 GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel,
1580                                 uint64_t start_message_id,
1581                                 uint64_t end_message_id,
1582                                 GNUNET_PSYC_MessageCallback message_cb,
1583                                 GNUNET_PSYC_FinishCallback finish_cb,
1584                                 void *cls)
1585 {
1586   return NULL;
1587 }
1588
1589
1590 /**
1591  * Abort story telling.
1592  *
1593  * This function must not be called from within method handlers (as given to
1594  * GNUNET_PSYC_slave_join()) of the slave.
1595  *
1596  * @param story Story telling operation to stop.
1597  */
1598 void
1599 GNUNET_PSYC_channel_story_tell_cancel (struct GNUNET_PSYC_Story *story)
1600 {
1601
1602 }
1603
1604
1605 /**
1606  * Retrieve the best matching channel state variable.
1607  *
1608  * If the requested variable name is not present in the state, the nearest
1609  * less-specific name is matched; for example, requesting "_a_b" will match "_a"
1610  * if "_a_b" does not exist.
1611  *
1612  * @param channel Channel handle.
1613  * @param full_name Full name of the requested variable, the actual variable
1614  *        returned might have a shorter name..
1615  * @param cb Function called once when a matching state variable is found.
1616  *        Not called if there's no matching state variable.
1617  * @param cb_cls Closure for the callbacks.
1618  *
1619  * @return Handle that can be used to cancel the query operation.
1620  */
1621 struct GNUNET_PSYC_StateQuery *
1622 GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel,
1623                                const char *full_name,
1624                                GNUNET_PSYC_StateCallback cb,
1625                                void *cb_cls)
1626 {
1627   return NULL;
1628 }
1629
1630
1631 /**
1632  * Return all channel state variables whose name matches a given prefix.
1633  *
1634  * A name matches if it starts with the given @a name_prefix, thus requesting
1635  * the empty prefix ("") will match all values; requesting "_a_b" will also
1636  * return values stored under "_a_b_c".
1637  *
1638  * The @a state_cb is invoked on all matching state variables asynchronously, as
1639  * the state is stored in and retrieved from the PSYCstore,
1640  *
1641  * @param channel Channel handle.
1642  * @param name_prefix Prefix of the state variable name to match.
1643  * @param cb Function to call with the matching state variables.
1644  * @param cb_cls Closure for the callbacks.
1645  *
1646  * @return Handle that can be used to cancel the query operation.
1647  */
1648 struct GNUNET_PSYC_StateQuery *
1649 GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *channel,
1650                                       const char *name_prefix,
1651                                       GNUNET_PSYC_StateCallback cb,
1652                                       void *cb_cls)
1653 {
1654   return NULL;
1655 }
1656
1657
1658 /**
1659  * Cancel a state query operation.
1660  *
1661  * @param query Handle for the operation to cancel.
1662  */
1663 void
1664 GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateQuery *query)
1665 {
1666
1667 }
1668
1669
1670 /* end of psyc_api.c */