-preparations for replacement of try_connect call
[oweals/gnunet.git] / src / social / gnunet-service-social.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file social/gnunet-service-social.c
23  * @brief Social service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_psyc_service.h"
35 #include "gnunet_psyc_util_lib.h"
36 #include "gnunet_social_service.h"
37 #include "social.h"
38
39
40 /**
41  * Handle to our current configuration.
42  */
43 static const struct GNUNET_CONFIGURATION_Handle *cfg;
44
45 /**
46  * Handle to the statistics service.
47  */
48 static struct GNUNET_STATISTICS_Handle *stats;
49
50 /**
51  * Notification context, simplifies client broadcasts.
52  */
53 static struct GNUNET_SERVER_NotificationContext *nc;
54
55 /**
56  * All connected hosts.
57  * Place's pub_key_hash -> struct Host
58  */
59 static struct GNUNET_CONTAINER_MultiHashMap *hosts;
60
61 /**
62  * All connected guests.
63  * Place's pub_key_hash -> struct Guest
64  */
65 static struct GNUNET_CONTAINER_MultiHashMap *guests;
66
67 /**
68  * Connected guests per place.
69  * Place's pub_key_hash -> Guest's pub_key -> struct Guest
70  */
71 static struct GNUNET_CONTAINER_MultiHashMap *place_guests;
72
73
74 /**
75  * Message fragment transmission queue.
76  */
77 struct FragmentTransmitQueue
78 {
79   struct FragmentTransmitQueue *prev;
80   struct FragmentTransmitQueue *next;
81
82   struct GNUNET_SERVER_Client *client;
83
84   /**
85    * Pointer to the next message part inside the data after this struct.
86    */
87   struct GNUNET_MessageHeader *next_part;
88
89   /**
90    * Size of message.
91    */
92   uint16_t size;
93
94   /**
95    * @see enum GNUNET_PSYC_MessageState
96    */
97   uint8_t state;
98
99   /* Followed by one or more message parts. */
100 };
101
102
103 /**
104  * Message transmission queue.
105  */
106 struct MessageTransmitQueue
107 {
108   struct MessageTransmitQueue *prev;
109   struct MessageTransmitQueue *next;
110
111   struct FragmentTransmitQueue *frags_head;
112   struct FragmentTransmitQueue *frags_tail;
113
114   struct GNUNET_SERVER_Client *client;
115 };
116
117 /**
118  * List of connected clients.
119  */
120 struct ClientListItem
121 {
122   struct ClientListItem *prev;
123   struct ClientListItem *next;
124
125   struct GNUNET_SERVER_Client *client;
126 };
127
128
129 /**
130  * Common part of the client context for both a host and guest.
131  */
132 struct Place
133 {
134   struct ClientListItem *clients_head;
135   struct ClientListItem *clients_tail;
136
137   struct MessageTransmitQueue *tmit_msgs_head;
138   struct MessageTransmitQueue *tmit_msgs_tail;
139
140   struct GNUNET_PSYC_Channel *channel;
141
142   /**
143    * Public key of the channel.
144    */
145   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
146
147   /**
148    * Hash of @a pub_key.
149    */
150   struct GNUNET_HashCode pub_key_hash;
151
152   /**
153    * Last message ID received for the place.
154    * 0 if there is no such message.
155    */
156   uint64_t max_message_id;
157
158   /**
159    * Is this a host (#GNUNET_YES), or guest (#GNUNET_NO)?
160    */
161   uint8_t is_host;
162
163   /**
164    * Is this place ready to receive messages from client?
165    * #GNUNET_YES or #GNUNET_NO
166    */
167   uint8_t is_ready;
168
169   /**
170    * Is the client disconnected?
171    * #GNUNET_YES or #GNUNET_NO
172    */
173   uint8_t is_disconnected;
174 };
175
176
177 /**
178  * Client context for a host.
179  */
180 struct Host
181 {
182   /**
183    * Place struct common for Host and Guest
184    */
185   struct Place plc;
186
187   /**
188    * Private key of the channel.
189    */
190   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
191
192   /**
193    * Handle for the multicast origin.
194    */
195   struct GNUNET_PSYC_Master *master;
196
197   /**
198    * Transmit handle for multicast.
199    */
200   struct GNUNET_PSYC_MasterTransmitHandle *tmit_handle;
201
202   /**
203    * Incoming join requests.
204    * guest_key -> struct GNUNET_PSYC_JoinHandle *
205    */
206   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
207
208   /**
209    * @see enum GNUNET_PSYC_Policy
210    */
211   enum GNUNET_PSYC_Policy policy;
212 };
213
214
215 /**
216  * Client context for a guest.
217  */
218 struct Guest
219 {
220   /**
221    * Place struct common for Host and Guest.
222    */
223   struct Place plc;
224
225   /**
226    * Private key of the slave.
227    */
228   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
229
230   /**
231    * Public key of the slave.
232    */
233   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
234
235   /**
236    * Hash of @a pub_key.
237    */
238   struct GNUNET_HashCode pub_key_hash;
239
240   /**
241    * Handle for the PSYC slave.
242    */
243   struct GNUNET_PSYC_Slave *slave;
244
245   /**
246    * Transmit handle for multicast.
247    */
248   struct GNUNET_PSYC_SlaveTransmitHandle *tmit_handle;
249
250   /**
251    * Peer identity of the origin.
252    */
253   struct GNUNET_PeerIdentity origin;
254
255   /**
256    * Number of items in @a relays.
257    */
258   uint32_t relay_count;
259
260   /**
261    * Relays that multicast can use to connect.
262    */
263   struct GNUNET_PeerIdentity *relays;
264
265   /**
266    * Join request to be transmitted to the master on join.
267    */
268   struct GNUNET_MessageHeader *join_req;
269
270   /**
271    * Join decision received from PSYC.
272    */
273   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
274
275 };
276
277
278 struct Client
279 {
280   /**
281    * Place where the client entered.
282    */
283   struct Place *plc;
284
285   /**
286    * Message queue for the message currently being transmitted
287    * by this client.
288    */
289   struct MessageTransmitQueue *tmit_msg;
290 };
291
292
293 struct OperationClosure
294 {
295   struct GNUNET_SERVER_Client *client;
296   struct Place *plc;
297   uint64_t op_id;
298   uint32_t flags;
299 };
300
301
302 static int
303 psyc_transmit_message (struct Place *plc);
304
305
306 /**
307  * Task run during shutdown.
308  *
309  * @param cls unused
310  * @param tc unused
311  */
312 static void
313 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
314 {
315   if (NULL != nc)
316   {
317     GNUNET_SERVER_notification_context_destroy (nc);
318     nc = NULL;
319   }
320   if (NULL != stats)
321   {
322     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
323     stats = NULL;
324   }
325 }
326
327
328 /**
329  * Clean up host data structures after a client disconnected.
330  */
331 static void
332 cleanup_host (struct Host *hst)
333 {
334   struct Place *plc = &hst->plc;
335
336   if (NULL != hst->master)
337     GNUNET_PSYC_master_stop (hst->master, GNUNET_NO, NULL, NULL); // FIXME
338   GNUNET_CONTAINER_multihashmap_destroy (hst->join_reqs);
339   GNUNET_CONTAINER_multihashmap_remove (hosts, &plc->pub_key_hash, plc);
340 }
341
342
343 /**
344  * Clean up guest data structures after a client disconnected.
345  */
346 static void
347 cleanup_guest (struct Guest *gst)
348 {
349   struct Place *plc = &gst->plc;
350   struct GNUNET_CONTAINER_MultiHashMap *
351     plc_gst = GNUNET_CONTAINER_multihashmap_get (place_guests,
352                                                 &plc->pub_key_hash);
353   GNUNET_assert (NULL != plc_gst); // FIXME
354   GNUNET_CONTAINER_multihashmap_remove (plc_gst, &gst->pub_key_hash, gst);
355
356   if (0 == GNUNET_CONTAINER_multihashmap_size (plc_gst))
357   {
358     GNUNET_CONTAINER_multihashmap_remove (place_guests, &plc->pub_key_hash,
359                                           plc_gst);
360     GNUNET_CONTAINER_multihashmap_destroy (plc_gst);
361   }
362   GNUNET_CONTAINER_multihashmap_remove (guests, &plc->pub_key_hash, gst);
363
364   if (NULL != gst->join_req)
365     GNUNET_free (gst->join_req);
366   if (NULL != gst->relays)
367     GNUNET_free (gst->relays);
368   if (NULL != gst->slave)
369     GNUNET_PSYC_slave_part (gst->slave, GNUNET_NO, NULL, NULL); // FIXME
370   GNUNET_CONTAINER_multihashmap_remove (guests, &plc->pub_key_hash, plc);
371 }
372
373
374 /**
375  * Clean up place data structures after a client disconnected.
376  */
377 static void
378 cleanup_place (struct Place *plc)
379 {
380   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
381               "%p Cleaning up place %s\n",
382               plc, GNUNET_h2s (&plc->pub_key_hash));
383
384   (GNUNET_YES == plc->is_host)
385     ? cleanup_host ((struct Host *) plc)
386     : cleanup_guest ((struct Guest *) plc);
387   GNUNET_free (plc);
388 }
389
390
391 static void
392 schedule_cleanup_place (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
393 {
394   cleanup_place (cls);
395 }
396
397
398 /**
399  * Called whenever a client is disconnected.
400  * Frees our resources associated with that client.
401  *
402  * @param cls Closure.
403  * @param client Identification of the client.
404  */
405 static void
406 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
407 {
408   if (NULL == client)
409     return;
410
411   struct Client *
412     ctx = GNUNET_SERVER_client_get_user_context (client, struct Client);
413   if (NULL == ctx)
414   {
415     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
416                 "%p User context is NULL in client_disconnect()\n", ctx);
417     GNUNET_break (0);
418     return;
419   }
420
421   struct Place *plc = ctx->plc;
422   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
423               "%p Client (%s) disconnected from place %s\n",
424               plc, (GNUNET_YES == plc->is_host) ? "host" : "guest",
425               GNUNET_h2s (&plc->pub_key_hash));
426
427   struct ClientListItem *cli = plc->clients_head;
428   while (NULL != cli)
429   {
430     if (cli->client == client)
431     {
432       GNUNET_CONTAINER_DLL_remove (plc->clients_head, plc->clients_tail, cli);
433       GNUNET_free (cli);
434       break;
435     }
436     cli = cli->next;
437   }
438
439   if (NULL == plc->clients_head)
440   { /* Last client disconnected. */
441     if (GNUNET_YES != plc->is_disconnected)
442     {
443       plc->is_disconnected = GNUNET_YES;
444       if (NULL != plc->tmit_msgs_head)
445       { /* Send pending messages to PSYC before cleanup. */
446         psyc_transmit_message (plc);
447       }
448       else
449       {
450         cleanup_place (plc);
451       }
452     }
453   }
454 }
455
456
457 /**
458  * Send message to all clients connected to the channel.
459  */
460 static void
461 client_send_msg (const struct Place *plc,
462                  const struct GNUNET_MessageHeader *msg)
463 {
464   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
465               "%p Sending message to clients.\n", plc);
466
467   struct ClientListItem *cli = plc->clients_head;
468   while (NULL != cli)
469   {
470     GNUNET_SERVER_notification_context_add (nc, cli->client);
471     GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
472     cli = cli->next;
473   }
474 }
475
476
477 /**
478  * Send a result code back to the client.
479  *
480  * @param client
481  *        Client that should receive the result code.
482  * @param result_code
483  *        Code to transmit.
484  * @param op_id
485  *        Operation ID in network byte order.
486  * @param data
487  *        Data payload or NULL.
488  * @param data_size
489  *        Size of @a data.
490  */
491 static void
492 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
493                     int64_t result_code, const void *data, uint16_t data_size)
494 {
495   struct GNUNET_OperationResultMessage *res;
496
497   res = GNUNET_malloc (sizeof (*res) + data_size);
498   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
499   res->header.size = htons (sizeof (*res) + data_size);
500   res->result_code = GNUNET_htonll (result_code);
501   res->op_id = op_id;
502   if (0 < data_size)
503     memcpy (&res[1], data, data_size);
504
505   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
506               "%p Sending result to client for operation #%" PRIu64 ": "
507               "%" PRId64 " (size: %u)\n",
508               client, GNUNET_ntohll (op_id), result_code, data_size);
509
510   GNUNET_SERVER_notification_context_add (nc, client);
511   GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
512                                               GNUNET_NO);
513   GNUNET_free (res);
514 }
515
516
517 /**
518  * Called after a PSYC master is started.
519  */
520 static void
521 psyc_master_started (void *cls, int result, uint64_t max_message_id)
522 {
523   struct Host *hst = cls;
524   struct Place *plc = &hst->plc;
525   plc->max_message_id = max_message_id;
526   plc->is_ready = GNUNET_YES;
527
528   struct GNUNET_PSYC_CountersResultMessage res;
529   res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK);
530   res.header.size = htons (sizeof (res));
531   res.result_code = htonl (result);
532   res.max_message_id = GNUNET_htonll (plc->max_message_id);
533
534   client_send_msg (plc, &res.header);
535 }
536
537
538 /**
539  * Called when a PSYC master receives a join request.
540  */
541 static void
542 psyc_recv_join_request (void *cls,
543                         const struct GNUNET_PSYC_JoinRequestMessage *req,
544                         const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
545                         const struct GNUNET_PSYC_Message *join_msg,
546                         struct GNUNET_PSYC_JoinHandle *jh)
547 {
548   struct Host *hst = cls;
549   struct GNUNET_HashCode slave_key_hash;
550   GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash);
551   GNUNET_CONTAINER_multihashmap_put (hst->join_reqs, &slave_key_hash, jh,
552                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
553   client_send_msg (&hst->plc, &req->header);
554 }
555
556
557 /**
558  * Called after a PSYC slave is connected.
559  */
560 static void
561 psyc_slave_connected (void *cls, int result, uint64_t max_message_id)
562 {
563   struct Guest *gst = cls;
564   struct Place *plc = &gst->plc;
565   plc->max_message_id = max_message_id;
566   plc->is_ready = GNUNET_YES;
567
568   struct GNUNET_PSYC_CountersResultMessage res;
569   res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK);
570   res.header.size = htons (sizeof (res));
571   res.result_code = htonl (result);
572   res.max_message_id = GNUNET_htonll (plc->max_message_id);
573
574   client_send_msg (plc, &res.header);
575 }
576
577
578 /**
579  * Called when a PSYC slave receives a join decision.
580  */
581 static void
582 psyc_recv_join_dcsn (void *cls,
583                      const struct GNUNET_PSYC_JoinDecisionMessage *dcsn,
584                      int is_admitted,
585                      const struct GNUNET_PSYC_Message *join_msg)
586 {
587   struct Guest *gst = cls;
588   client_send_msg (&gst->plc, &dcsn->header);
589 }
590
591
592 /**
593  * Called when a PSYC master or slave receives a message.
594  */
595 static void
596 psyc_recv_message (void *cls,
597                    uint64_t message_id,
598                    uint32_t flags,
599                    const struct GNUNET_PSYC_MessageHeader *msg)
600 {
601   struct Place *plc = cls;
602
603   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&msg->slave_key);
604   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
605               "%p Received PSYC message of size %u from %s.\n",
606               plc, ntohs (msg->header.size), str);
607   GNUNET_free (str);
608
609   client_send_msg (plc, &msg->header);
610
611   /* FIXME: further processing */
612 }
613
614
615 /**
616  * Initialize place data structure.
617  */
618 static void
619 place_init (struct Place *plc)
620 {
621
622 }
623
624
625 /**
626  * Handle a connecting client entering a place as host.
627  */
628 static void
629 client_recv_host_enter (void *cls, struct GNUNET_SERVER_Client *client,
630                         const struct GNUNET_MessageHeader *msg)
631 {
632   const struct HostEnterRequest *req
633     = (const struct HostEnterRequest *) msg;
634
635   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
636   struct GNUNET_HashCode pub_key_hash;
637
638   GNUNET_CRYPTO_eddsa_key_get_public (&req->place_key, &pub_key);
639   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
640
641   struct Host *
642     hst = GNUNET_CONTAINER_multihashmap_get (hosts, &pub_key_hash);
643   struct Place *plc;
644
645   if (NULL == hst)
646   {
647     hst = GNUNET_new (struct Host);
648     hst->policy = ntohl (req->policy);
649     hst->priv_key = req->place_key;
650     hst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
651
652     plc = &hst->plc;
653     plc->is_host = GNUNET_YES;
654     plc->pub_key = pub_key;
655     plc->pub_key_hash = pub_key_hash;
656     place_init (plc);
657
658     GNUNET_CONTAINER_multihashmap_put (hosts, &plc->pub_key_hash, plc,
659                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
660     hst->master = GNUNET_PSYC_master_start (cfg, &hst->priv_key, hst->policy,
661                                             &psyc_master_started,
662                                             &psyc_recv_join_request,
663                                             &psyc_recv_message, NULL, hst);
664     hst->plc.channel = GNUNET_PSYC_master_get_channel (hst->master);
665   }
666   else
667   {
668     plc = &hst->plc;
669
670     struct GNUNET_PSYC_CountersResultMessage res;
671     res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK);
672     res.header.size = htons (sizeof (res));
673     res.result_code = htonl (GNUNET_OK);
674     res.max_message_id = GNUNET_htonll (plc->max_message_id);
675
676     GNUNET_SERVER_notification_context_add (nc, client);
677     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
678                                                 GNUNET_NO);
679   }
680
681   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
682               "%p Client connected as host to place %s.\n",
683               hst, GNUNET_h2s (&plc->pub_key_hash));
684
685   struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
686   cli->client = client;
687   GNUNET_CONTAINER_DLL_insert (plc->clients_head, plc->clients_tail, cli);
688
689   struct Client *ctx = GNUNET_new (struct Client);
690   ctx->plc = plc;
691   GNUNET_SERVER_client_set_user_context (client, ctx);
692   GNUNET_SERVER_receive_done (client, GNUNET_OK);
693 }
694
695
696 /**
697  * Handle a connecting client entering a place as guest.
698  */
699 static void
700 client_recv_guest_enter (void *cls, struct GNUNET_SERVER_Client *client,
701                          const struct GNUNET_MessageHeader *msg)
702 {
703   const struct GuestEnterRequest *req
704     = (const struct GuestEnterRequest *) msg;
705   uint16_t req_size = ntohs (req->header.size);
706
707   struct GNUNET_CRYPTO_EcdsaPublicKey gst_pub_key;
708   struct GNUNET_HashCode pub_key_hash, gst_pub_key_hash;
709
710   GNUNET_CRYPTO_ecdsa_key_get_public (&req->guest_key, &gst_pub_key);
711   GNUNET_CRYPTO_hash (&gst_pub_key, sizeof (gst_pub_key), &gst_pub_key_hash);
712   GNUNET_CRYPTO_hash (&req->place_key, sizeof (req->place_key), &pub_key_hash);
713
714   struct GNUNET_CONTAINER_MultiHashMap *
715     plc_gst = GNUNET_CONTAINER_multihashmap_get (place_guests, &pub_key_hash);
716   struct Guest *gst = NULL;
717   struct Place *plc;
718
719   if (NULL != plc_gst)
720   {
721     gst = GNUNET_CONTAINER_multihashmap_get (plc_gst, &gst_pub_key_hash);
722   }
723   if (NULL == gst || NULL == gst->slave)
724   {
725     gst = GNUNET_new (struct Guest);
726     gst->priv_key = req->guest_key;
727     gst->pub_key = gst_pub_key;
728     gst->pub_key_hash = gst_pub_key_hash;
729     gst->origin = req->origin;
730     gst->relay_count = ntohl (req->relay_count);
731
732     const struct GNUNET_PeerIdentity *
733       relays = (const struct GNUNET_PeerIdentity *) &req[1];
734     uint16_t relay_size = gst->relay_count * sizeof (*relays);
735     struct GNUNET_PSYC_Message *join_msg = NULL;
736     uint16_t join_msg_size = 0;
737
738     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
739         <= req_size)
740     {
741       join_msg = (struct GNUNET_PSYC_Message *)
742         (((char *) &req[1]) + relay_size);
743       join_msg_size = ntohs (join_msg->header.size);
744     }
745     if (sizeof (*req) + relay_size + join_msg_size != req_size)
746     {
747       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
748                   "%u + %u + %u != %u\n",
749                   sizeof (*req), relay_size, join_msg_size, req_size);
750       GNUNET_break (0);
751       GNUNET_SERVER_client_disconnect (client);
752       GNUNET_free (gst);
753       return;
754     }
755     if (0 < gst->relay_count)
756     {
757       gst->relays = GNUNET_malloc (relay_size);
758       memcpy (gst->relays, &req[1], relay_size);
759     }
760
761     plc = &gst->plc;
762     plc->is_host = GNUNET_NO;
763     plc->pub_key = req->place_key;
764     plc->pub_key_hash = pub_key_hash;
765     place_init (plc);
766
767     if (NULL == plc_gst)
768     {
769       plc_gst = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
770       (void) GNUNET_CONTAINER_multihashmap_put (place_guests, &plc->pub_key_hash, plc_gst,
771                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
772     }
773     (void) GNUNET_CONTAINER_multihashmap_put (plc_gst, &gst->pub_key_hash, gst,
774                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
775     (void) GNUNET_CONTAINER_multihashmap_put (guests, &plc->pub_key_hash, gst,
776                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
777     gst->slave
778       = GNUNET_PSYC_slave_join (cfg, &plc->pub_key, &gst->priv_key,
779                                 &gst->origin, gst->relay_count, gst->relays,
780                                 &psyc_recv_message, NULL, &psyc_slave_connected,
781                                 &psyc_recv_join_dcsn, gst, join_msg);
782     gst->plc.channel = GNUNET_PSYC_slave_get_channel (gst->slave);
783   }
784   else
785   {
786     plc = &gst->plc;
787
788     struct GNUNET_PSYC_CountersResultMessage res;
789     res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK);
790     res.header.size = htons (sizeof (res));
791     res.result_code = htonl (GNUNET_OK);
792     res.max_message_id = GNUNET_htonll (plc->max_message_id);
793
794     GNUNET_SERVER_notification_context_add (nc, client);
795     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
796                                                 GNUNET_NO);
797     if (NULL != gst->join_dcsn)
798     {
799       GNUNET_SERVER_notification_context_add (nc, client);
800       GNUNET_SERVER_notification_context_unicast (nc, client,
801                                                   &gst->join_dcsn->header,
802                                                   GNUNET_NO);
803     }
804   }
805
806   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
807               "%p Client connected as guest to place %s.\n",
808               gst, GNUNET_h2s (&plc->pub_key_hash));
809
810   struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
811   cli->client = client;
812   GNUNET_CONTAINER_DLL_insert (plc->clients_head, plc->clients_tail, cli);
813
814   struct Client *ctx = GNUNET_new (struct Client);
815   ctx->plc = plc;
816   GNUNET_SERVER_client_set_user_context (client, ctx);
817   GNUNET_SERVER_receive_done (client, GNUNET_OK);
818 }
819
820
821 struct JoinDecisionClosure
822 {
823   int32_t is_admitted;
824   struct GNUNET_PSYC_Message *msg;
825 };
826
827
828 /**
829  * Iterator callback for responding to join requests.
830  */
831 static int
832 psyc_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
833                          void *value)
834 {
835   struct JoinDecisionClosure *jcls = cls;
836   struct GNUNET_PSYC_JoinHandle *jh = value;
837   // FIXME: add relays
838   GNUNET_PSYC_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
839   return GNUNET_YES;
840 }
841
842
843 /**
844  * Handle an entry decision from a host client.
845  */
846 static void
847 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
848                            const struct GNUNET_MessageHeader *msg)
849 {
850   struct Client *
851     ctx = GNUNET_SERVER_client_get_user_context (client, struct Client);
852   GNUNET_assert (NULL != ctx);
853   struct Place *plc = ctx->plc;
854   GNUNET_assert (GNUNET_YES == plc->is_host);
855   struct Host *hst = (struct Host *) plc;
856
857   struct GNUNET_PSYC_JoinDecisionMessage *
858     dcsn = (struct GNUNET_PSYC_JoinDecisionMessage *) msg;
859   struct JoinDecisionClosure jcls;
860   jcls.is_admitted = ntohl (dcsn->is_admitted);
861   jcls.msg
862     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
863     ? (struct GNUNET_PSYC_Message *) &dcsn[1]
864     : NULL;
865
866   struct GNUNET_HashCode slave_key_hash;
867   GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
868                       &slave_key_hash);
869
870   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
871               "%p Got join decision (%d) from client for place %s..\n",
872               hst, jcls.is_admitted, GNUNET_h2s (&plc->pub_key_hash));
873   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
874               "%p ..and slave %s.\n",
875               hst, GNUNET_h2s (&slave_key_hash));
876
877   GNUNET_CONTAINER_multihashmap_get_multiple (hst->join_reqs, &slave_key_hash,
878                                               &psyc_send_join_decision, &jcls);
879   GNUNET_CONTAINER_multihashmap_remove_all (hst->join_reqs, &slave_key_hash);
880   GNUNET_SERVER_receive_done (client, GNUNET_OK);
881 }
882
883
884 /**
885  * Send acknowledgement to a client.
886  *
887  * Sent after a message fragment has been passed on to multicast.
888  *
889  * @param plc The place struct for the client.
890  */
891 static void
892 send_message_ack (struct Place *plc, struct GNUNET_SERVER_Client *client)
893 {
894   struct GNUNET_MessageHeader res;
895   res.size = htons (sizeof (res));
896   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
897
898   GNUNET_SERVER_notification_context_add (nc, client);
899   GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
900 }
901
902
903 /**
904  * Proceed to the next message part in the transmission queue.
905  *
906  * @param plc
907  *        Place where the transmission is going on.
908  * @param tmit_msg
909  *        Currently transmitted message.
910  * @param tmit_frag
911  *        Currently transmitted message fragment.
912  *
913  * @return @a tmit_frag, or NULL if reached the end of fragment.
914  */
915 static struct FragmentTransmitQueue *
916 psyc_transmit_queue_next_part (struct Place *plc,
917                                struct MessageTransmitQueue *tmit_msg,
918                                struct FragmentTransmitQueue *tmit_frag)
919 {
920   uint16_t psize = ntohs (tmit_frag->next_part->size);
921   if ((char *) tmit_frag->next_part + psize - ((char *) &tmit_frag[1])
922       < tmit_frag->size)
923   {
924     tmit_frag->next_part
925       = (struct GNUNET_MessageHeader *) ((char *) tmit_frag->next_part + psize);
926   }
927   else /* Reached end of current fragment. */
928   {
929     if (NULL != tmit_frag->client)
930       send_message_ack (plc, tmit_frag->client);
931     GNUNET_CONTAINER_DLL_remove (tmit_msg->frags_head, tmit_msg->frags_tail, tmit_frag);
932     GNUNET_free (tmit_frag);
933     tmit_frag = NULL;
934   }
935   return tmit_frag;
936 }
937
938
939 /**
940  * Proceed to next message in transmission queue.
941  *
942  * @param plc
943  *        Place where the transmission is going on.
944  * @param tmit_msg
945  *        Currently transmitted message.
946  *
947  * @return The next message in queue, or NULL if queue is empty.
948  */
949 static struct MessageTransmitQueue *
950 psyc_transmit_queue_next_msg (struct Place *plc,
951                               struct MessageTransmitQueue *tmit_msg)
952 {
953   GNUNET_CONTAINER_DLL_remove (plc->tmit_msgs_head, plc->tmit_msgs_tail, tmit_msg);
954   GNUNET_free (tmit_msg);
955   return plc->tmit_msgs_head;
956 }
957
958
959 /**
960  * Callback for data transmission to PSYC.
961  */
962 static int
963 psyc_transmit_notify_data (void *cls, uint16_t *data_size, void *data)
964 {
965   struct Place *plc = cls;
966   struct MessageTransmitQueue *tmit_msg = plc->tmit_msgs_head;
967   GNUNET_assert (NULL != tmit_msg);
968   struct FragmentTransmitQueue *tmit_frag = tmit_msg->frags_head;
969   if (NULL == tmit_frag)
970   { /* Rest of the message have not arrived yet, pause transmission */
971     *data_size = 0;
972     return GNUNET_NO;
973   }
974   struct GNUNET_MessageHeader *pmsg = tmit_frag->next_part;
975   if (NULL == pmsg)
976   {
977     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
978                 "%p psyc_transmit_notify_data: nothing to send.\n", plc);
979     *data_size = 0;
980     return GNUNET_NO;
981   }
982
983   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
984               "%p psyc_transmit_notify_data()\n", plc);
985   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
986
987   uint16_t ptype = ntohs (pmsg->type);
988   uint16_t pdata_size = ntohs (pmsg->size) - sizeof (*pmsg);
989   int ret;
990
991   switch (ptype)
992   {
993   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
994     if (*data_size < pdata_size)
995     {
996       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
997                 "%p psyc_transmit_notify_data: buffer size too small for data.\n", plc);
998       *data_size = 0;
999       return GNUNET_NO;
1000     }
1001     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1002                 "%p psyc_transmit_notify_data: sending %u bytes.\n",
1003                 plc, pdata_size);
1004
1005     *data_size = pdata_size;
1006     memcpy (data, &pmsg[1], *data_size);
1007     ret = GNUNET_NO;
1008     break;
1009
1010   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1011     *data_size = 0;
1012     ret = GNUNET_YES;
1013     break;
1014
1015   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1016     *data_size = 0;
1017     ret = GNUNET_SYSERR;
1018     break;
1019
1020   default:
1021     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1022                 "%p psyc_transmit_notify_data: unexpected message part of type %u.\n",
1023                 plc, ptype);
1024     ret = GNUNET_SYSERR;
1025   }
1026
1027   if (GNUNET_SYSERR == ret && GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL != ptype)
1028   {
1029     *data_size = 0;
1030     tmit_msg = psyc_transmit_queue_next_msg (plc, tmit_msg);
1031     plc->is_disconnected = GNUNET_YES;
1032     GNUNET_SERVER_client_disconnect (tmit_frag->client);
1033     GNUNET_SCHEDULER_add_now (&schedule_cleanup_place, plc);
1034     return ret;
1035   }
1036   else
1037   {
1038     tmit_frag = psyc_transmit_queue_next_part (plc, tmit_msg, tmit_frag);
1039     if (NULL != tmit_frag)
1040     {
1041       struct GNUNET_MessageHeader *pmsg = tmit_frag->next_part;
1042       ptype = ntohs (pmsg->type);
1043       switch (ptype)
1044       {
1045       case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1046         ret = GNUNET_YES;
1047         break;
1048       case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1049         ret = GNUNET_SYSERR;
1050         break;
1051       }
1052       switch (ptype)
1053       {
1054       case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1055       case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1056         tmit_frag = psyc_transmit_queue_next_part (plc, tmit_msg, tmit_frag);
1057       }
1058     }
1059
1060     if (NULL == tmit_msg->frags_head
1061         && GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END <= ptype)
1062     { /* Reached end of current message. */
1063       tmit_msg = psyc_transmit_queue_next_msg (plc, tmit_msg);
1064     }
1065   }
1066
1067   if (ret != GNUNET_NO)
1068   {
1069     if (NULL != tmit_msg)
1070     {
1071       psyc_transmit_message (plc);
1072     }
1073     else if (GNUNET_YES == plc->is_disconnected)
1074     {
1075       /* FIXME: handle partial message (when still in_transmit) */
1076       cleanup_place (plc);
1077     }
1078   }
1079   return ret;
1080 }
1081
1082
1083 /**
1084  * Callback for modifier transmission to PSYC.
1085  */
1086 static int
1087 psyc_transmit_notify_mod (void *cls, uint16_t *data_size, void *data,
1088                           uint8_t *oper, uint32_t *full_value_size)
1089 {
1090   struct Place *plc = cls;
1091   struct MessageTransmitQueue *tmit_msg = plc->tmit_msgs_head;
1092   GNUNET_assert (NULL != tmit_msg);
1093   struct FragmentTransmitQueue *tmit_frag = tmit_msg->frags_head;
1094   if (NULL == tmit_frag)
1095   { /* Rest of the message have not arrived yet, pause transmission */
1096     *data_size = 0;
1097     return GNUNET_NO;
1098   }
1099   struct GNUNET_MessageHeader *pmsg = tmit_frag->next_part;
1100   if (NULL == pmsg)
1101   {
1102     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103                 "%p psyc_transmit_notify_mod: nothing to send.\n", plc);
1104     *data_size = 0;
1105     return GNUNET_NO;
1106   }
1107
1108   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1109               "%p psyc_transmit_notify_mod()\n", plc);
1110   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1111
1112   uint16_t ptype = ntohs (pmsg->type);
1113   int ret;
1114
1115   switch (ptype)
1116   {
1117   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1118   {
1119     if (NULL == oper)
1120     {
1121       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1122                   "%p psyc_transmit_notify_mod: oper is NULL.\n", plc);
1123       ret = GNUNET_SYSERR;
1124       break;
1125     }
1126     struct GNUNET_PSYC_MessageModifier *
1127       pmod = (struct GNUNET_PSYC_MessageModifier *) tmit_frag->next_part;
1128     uint16_t mod_size = ntohs (pmod->header.size) - sizeof (*pmod);
1129
1130     if (*data_size < mod_size)
1131     {
1132       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1133                 "%p psyc_transmit_notify_mod: buffer size too small for data.\n", plc);
1134       *data_size = 0;
1135       return GNUNET_NO;
1136     }
1137
1138     *full_value_size = ntohl (pmod->value_size);
1139     *oper = pmod->oper;
1140     *data_size = mod_size;
1141     memcpy (data, &pmod[1], mod_size);
1142     ret = GNUNET_NO;
1143     break;
1144   }
1145
1146   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1147   {
1148     if (NULL != oper)
1149     {
1150       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1151                   "%p psyc_transmit_notify_mod: oper is not NULL.\n", plc);
1152       ret = GNUNET_SYSERR;
1153       break;
1154     }
1155     uint16_t mod_size = ntohs (pmsg->size) - sizeof (*pmsg);
1156     if (*data_size < mod_size)
1157     {
1158       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1159                 "%p psyc_transmit_notify_mod: buffer size too small for data.\n", plc);
1160       *data_size = 0;
1161       return GNUNET_NO;
1162     }
1163     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1164                 "%p psyc_transmit_notify_mod: sending %u bytes.\n", plc, mod_size);
1165
1166     *data_size = mod_size;
1167     memcpy (data, &pmsg[1], *data_size);
1168     ret = GNUNET_NO;
1169     break;
1170   }
1171
1172   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1173   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1174   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1175     *data_size = 0;
1176     ret = GNUNET_YES;
1177     break;
1178
1179   default:
1180     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1181                 "%p psyc_transmit_notify_mod: unexpected message part of type %u.\n",
1182                 plc, ptype);
1183     ret = GNUNET_SYSERR;
1184   }
1185
1186   if (GNUNET_SYSERR == ret)
1187   {
1188     *data_size = 0;
1189     ret = GNUNET_SYSERR;
1190     tmit_msg = psyc_transmit_queue_next_msg (plc, tmit_msg);
1191     plc->is_disconnected = GNUNET_YES;
1192     GNUNET_SERVER_client_disconnect (tmit_frag->client);
1193     GNUNET_SCHEDULER_add_now (&schedule_cleanup_place, plc);
1194   }
1195   else
1196   {
1197     if (GNUNET_YES != ret)
1198       psyc_transmit_queue_next_part (plc, tmit_msg, tmit_frag);
1199
1200     if (NULL == tmit_msg->frags_head
1201         && GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END <= ptype)
1202     { /* Reached end of current message. */
1203       tmit_msg = psyc_transmit_queue_next_msg (plc, tmit_msg);
1204     }
1205   }
1206   return ret;
1207 }
1208
1209 /**
1210  * Callback for data transmission from a host to PSYC.
1211  */
1212 static int
1213 host_transmit_notify_data (void *cls, uint16_t *data_size, void *data)
1214 {
1215   int ret = psyc_transmit_notify_data (cls, data_size, data);
1216
1217   if (GNUNET_NO != ret)
1218   {
1219     struct Host *hst = cls;
1220     hst->tmit_handle = NULL;
1221   }
1222   return ret;
1223 }
1224
1225
1226 /**
1227  * Callback for the transmit functions of multicast.
1228  */
1229 static int
1230 guest_transmit_notify_data (void *cls, uint16_t *data_size, void *data)
1231 {
1232   int ret = psyc_transmit_notify_data (cls, data_size, data);
1233
1234   if (GNUNET_NO != ret)
1235   {
1236     struct Guest *gst = cls;
1237     gst->tmit_handle = NULL;
1238   }
1239   return ret;
1240 }
1241
1242
1243 /**
1244  * Callback for modifier transmission from a host to PSYC.
1245  */
1246 static int
1247 host_transmit_notify_mod (void *cls, uint16_t *data_size, void *data,
1248                           uint8_t *oper, uint32_t *full_value_size)
1249 {
1250   int ret = psyc_transmit_notify_mod (cls, data_size, data,
1251                                       oper, full_value_size);
1252   if (GNUNET_SYSERR == ret)
1253   {
1254     struct Host *hst = cls;
1255     hst->tmit_handle = NULL;
1256   }
1257   return ret;
1258 }
1259
1260
1261 /**
1262  * Callback for modifier transmission from a guest to PSYC.
1263  */
1264 static int
1265 guest_transmit_notify_mod (void *cls, uint16_t *data_size, void *data,
1266                            uint8_t *oper, uint32_t *full_value_size)
1267 {
1268   int ret = psyc_transmit_notify_mod (cls, data_size, data,
1269                                       oper, full_value_size);
1270   if (GNUNET_SYSERR == ret)
1271   {
1272     struct Guest *gst = cls;
1273     gst->tmit_handle = NULL;
1274   }
1275   return ret;
1276 }
1277
1278
1279 /**
1280  * Get method part of next message from transmission queue.
1281  *
1282  * @param tmit_msg
1283  *        Next item in message transmission queue.
1284  * @param[out] pmeth
1285  *        The message method is returned here.
1286  *
1287  * @return #GNUNET_OK on success
1288  *         #GNUNET_NO if there are no more messages in queue.
1289  *         #GNUNET_SYSERR if the next message is malformed.
1290  */
1291 static int
1292 psyc_transmit_queue_next_method (struct Place *plc,
1293                                  struct GNUNET_PSYC_MessageMethod **pmeth)
1294 {
1295   struct MessageTransmitQueue *tmit_msg = plc->tmit_msgs_head;
1296   if (NULL == tmit_msg)
1297     return GNUNET_NO;
1298
1299   struct FragmentTransmitQueue *tmit_frag = tmit_msg->frags_head;
1300   if (NULL == tmit_frag)
1301   {
1302     GNUNET_break (0);
1303     return GNUNET_NO;
1304   }
1305
1306   struct GNUNET_MessageHeader *pmsg = tmit_frag->next_part;
1307   if (NULL == pmsg
1308       || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD != ntohs (pmsg->type))
1309   {
1310     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1311                 "%p psyc_transmit_queue_next_method: unexpected message part of type %u.\n",
1312                 plc, NULL != pmsg ? ntohs (pmsg->type) : 0);
1313     GNUNET_break (0);
1314     return GNUNET_SYSERR;
1315   }
1316
1317   uint16_t psize = ntohs (pmsg->size);
1318   *pmeth = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1319   if (psize < sizeof (**pmeth) + 1 || '\0' != *((char *) *pmeth + psize - 1))
1320   {
1321     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1322                 "%p psyc_transmit_queue_next_method: invalid method name.\n",
1323                 plc, ntohs (pmsg->type));
1324     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1325                 "%u <= %u || NUL != %u\n",
1326                 sizeof (**pmeth), psize, *((char *) *pmeth + psize - 1));
1327     GNUNET_break (0);
1328     return GNUNET_SYSERR;
1329   }
1330
1331   psyc_transmit_queue_next_part (plc, tmit_msg, tmit_frag);
1332   return GNUNET_OK;
1333 }
1334
1335
1336 /**
1337  * Transmit the next message in queue from the host to the PSYC channel.
1338  */
1339 static int
1340 psyc_master_transmit_message (struct Host *hst)
1341 {
1342
1343   if (NULL == hst->tmit_handle)
1344   {
1345     struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
1346     int ret = psyc_transmit_queue_next_method (&hst->plc, &pmeth);
1347     if (GNUNET_OK != ret)
1348       return ret;
1349
1350     hst->tmit_handle
1351       = GNUNET_PSYC_master_transmit (hst->master, (const char *) &pmeth[1],
1352                                      &host_transmit_notify_mod,
1353                                      &host_transmit_notify_data, hst,
1354                                      pmeth->flags);
1355   }
1356   else
1357   {
1358     GNUNET_PSYC_master_transmit_resume (hst->tmit_handle);
1359   }
1360   return GNUNET_OK;
1361 }
1362
1363
1364 /**
1365  * Transmit the next message in queue from a guest to the PSYC channel.
1366  */
1367 static int
1368 psyc_slave_transmit_message (struct Guest *gst)
1369 {
1370   if (NULL == gst->tmit_handle)
1371   {
1372     struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
1373     int ret = psyc_transmit_queue_next_method (&gst->plc, &pmeth);
1374     if (GNUNET_OK != ret)
1375       return ret;
1376
1377     gst->tmit_handle
1378       = GNUNET_PSYC_slave_transmit (gst->slave, (const char *) &pmeth[1],
1379                                     &guest_transmit_notify_mod,
1380                                     &guest_transmit_notify_data, gst,
1381                                     pmeth->flags);
1382   }
1383   else
1384   {
1385     GNUNET_PSYC_slave_transmit_resume (gst->tmit_handle);
1386   }
1387   return GNUNET_OK;
1388 }
1389
1390
1391 /**
1392  * Transmit a message to PSYC.
1393  */
1394 static int
1395 psyc_transmit_message (struct Place *plc)
1396 {
1397   return
1398     (plc->is_host)
1399     ? psyc_master_transmit_message ((struct Host *) plc)
1400     : psyc_slave_transmit_message ((struct Guest *) plc);
1401 }
1402
1403
1404 /**
1405  * Queue message parts for sending to PSYC.
1406  *
1407  * @param plc          Place to send to.
1408  * @param client       Client the message originates from.
1409  * @param data_size    Size of @a data.
1410  * @param data         Concatenated message parts.
1411  * @param first_ptype  First message part type in @a data.
1412  * @param last_ptype   Last message part type in @a data.
1413  */
1414 static struct MessageTransmitQueue *
1415 psyc_transmit_queue_message (struct Place *plc,
1416                              struct GNUNET_SERVER_Client *client,
1417                              size_t data_size,
1418                              const void *data,
1419                              uint16_t first_ptype, uint16_t last_ptype,
1420                              struct MessageTransmitQueue *tmit_msg)
1421 {
1422   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1423   {
1424     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg));
1425     GNUNET_CONTAINER_DLL_insert_tail (plc->tmit_msgs_head, plc->tmit_msgs_tail, tmit_msg);
1426   }
1427   else if (NULL == tmit_msg)
1428   {
1429     return NULL;
1430   }
1431
1432   struct FragmentTransmitQueue *
1433     tmit_frag = GNUNET_malloc (sizeof (*tmit_frag) + data_size);
1434   memcpy (&tmit_frag[1], data, data_size);
1435   tmit_frag->next_part = (struct GNUNET_MessageHeader *) &tmit_frag[1];
1436   tmit_frag->client = client;
1437   tmit_frag->size = data_size;
1438
1439   GNUNET_CONTAINER_DLL_insert_tail (tmit_msg->frags_head, tmit_msg->frags_tail, tmit_frag);
1440   tmit_msg->client = client;
1441   return tmit_msg;
1442 }
1443
1444
1445 /**
1446  * Cancel transmission of current message to PSYC.
1447  *
1448  * @param plc     Place to send to.
1449  * @param client  Client the message originates from.
1450  */
1451 static void
1452 psyc_transmit_cancel (struct Place *plc, struct GNUNET_SERVER_Client *client)
1453 {
1454   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
1455
1456   struct GNUNET_MessageHeader msg;
1457   msg.size = htons (sizeof (msg));
1458   msg.type = htons (type);
1459
1460   psyc_transmit_queue_message (plc, client, sizeof (msg), &msg, type, type, NULL);
1461   psyc_transmit_message (plc);
1462
1463   /* FIXME: cleanup */
1464 }
1465
1466
1467 /**
1468  * Handle an incoming message from a client, to be transmitted to the place.
1469  */
1470 static void
1471 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1472                           const struct GNUNET_MessageHeader *msg)
1473 {
1474   struct Client *
1475     ctx = GNUNET_SERVER_client_get_user_context (client, struct Client);
1476   GNUNET_assert (NULL != ctx);
1477   struct Place *plc = ctx->plc;
1478   int ret = GNUNET_SYSERR;
1479
1480   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1481               "%p Received message from client.\n", plc);
1482   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
1483
1484   if (GNUNET_YES != plc->is_ready)
1485   {
1486     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1487                 "%p Place is not ready yet, disconnecting client.\n", plc);
1488     GNUNET_break (0);
1489     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1490     return;
1491   }
1492
1493   uint16_t size = ntohs (msg->size);
1494   uint16_t psize = size - sizeof (*msg);
1495   if (psize < sizeof (struct GNUNET_MessageHeader)
1496       || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < psize)
1497   {
1498     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1499                 "%p Received message with invalid payload size (%u) from client.\n",
1500                 plc, psize);
1501     GNUNET_break (0);
1502     psyc_transmit_cancel (plc, client);
1503     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1504     return;
1505   }
1506
1507   uint16_t first_ptype = 0, last_ptype = 0;
1508   if (GNUNET_SYSERR
1509       == GNUNET_PSYC_receive_check_parts (psize, (const char *) &msg[1],
1510                                           &first_ptype, &last_ptype))
1511   {
1512     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1513                 "%p Received invalid message part from client.\n", plc);
1514     GNUNET_break (0);
1515     psyc_transmit_cancel (plc, client);
1516     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1517     return;
1518   }
1519   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1520               "%p Received message with first part type %u and last part type %u.\n",
1521               plc, first_ptype, last_ptype);
1522
1523   ctx->tmit_msg
1524     = psyc_transmit_queue_message (plc, client, psize, &msg[1],
1525                                    first_ptype, last_ptype, ctx->tmit_msg);
1526   if (NULL != ctx->tmit_msg)
1527   {
1528     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END <= last_ptype)
1529       ctx->tmit_msg = NULL;
1530     ret = psyc_transmit_message (plc);
1531   }
1532
1533   if (GNUNET_OK != ret)
1534   {
1535     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1536                 "%p Received invalid message part from client.\n", plc);
1537     GNUNET_break (0);
1538     psyc_transmit_cancel (plc, client);
1539     ret = GNUNET_SYSERR;
1540   }
1541   GNUNET_SERVER_receive_done (client, ret);
1542 }
1543
1544
1545 /**
1546  * A historic message arrived from PSYC.
1547  */
1548 static void
1549 psyc_recv_history_message (void *cls, uint64_t message_id, uint32_t flags,
1550                            const struct GNUNET_PSYC_MessageHeader *msg)
1551 {
1552   struct OperationClosure *opcls = cls;
1553   struct Place *plc = opcls->plc;
1554
1555   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1556               "%p Received historic message #%" PRId64 " (flags: %x)\n",
1557               plc, message_id, flags);
1558
1559   uint16_t size = ntohs (msg->header.size);
1560
1561   struct GNUNET_OperationResultMessage *
1562     res = GNUNET_malloc (sizeof (*res) + size);
1563   res->header.size = htons (sizeof (*res) + size);
1564   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
1565   res->op_id = opcls->op_id;
1566   res->result_code = GNUNET_htonll (GNUNET_OK);
1567
1568   memcpy (&res[1], msg, size);
1569
1570   /** @todo FIXME: send only to requesting client */
1571   client_send_msg (plc, &res->header);
1572 }
1573
1574
1575 /**
1576  * Result of message history replay from PSYC.
1577  */
1578 static void
1579 psyc_recv_history_result (void *cls, int64_t result,
1580                           const void *err_msg, uint16_t err_msg_size)
1581 {
1582   struct OperationClosure *opcls = cls;
1583   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1584               "%p History replay #%" PRIu64 ": "
1585               "PSYCstore returned %" PRId64 " (%.*s)\n",
1586               opcls->plc, GNUNET_ntohll (opcls->op_id), result, err_msg_size, err_msg);
1587
1588   // FIXME: place might have been destroyed
1589   client_send_result (opcls->client, opcls->op_id, result, err_msg, err_msg_size);
1590 }
1591
1592
1593 /**
1594  * Client requests channel history.
1595  */
1596 static void
1597 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
1598                             const struct GNUNET_MessageHeader *msg)
1599 {
1600   struct Client *
1601     ctx = GNUNET_SERVER_client_get_user_context (client, struct Client);
1602   GNUNET_assert (NULL != ctx);
1603   struct Place *plc = ctx->plc;
1604
1605   const struct GNUNET_PSYC_HistoryRequestMessage *
1606     req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
1607   uint16_t size = ntohs (msg->size);
1608   const char *method_prefix = (const char *) &req[1];
1609
1610   if (size < sizeof (*req) + 1
1611       || '\0' != method_prefix[size - sizeof (*req) - 1])
1612   {
1613     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1614                 "%p History replay #%" PRIu64 ": "
1615                 "invalid method prefix. size: %u < %u?\n",
1616                 plc, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
1617     GNUNET_break (0);
1618     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1619     return;
1620   }
1621
1622   struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
1623   opcls->client = client;
1624   opcls->plc = plc;
1625   opcls->op_id = req->op_id;
1626   opcls->flags = ntohl (req->flags);
1627
1628   if (0 == req->message_limit)
1629     GNUNET_PSYC_channel_history_replay (plc->channel,
1630                                         GNUNET_ntohll (req->start_message_id),
1631                                         GNUNET_ntohll (req->end_message_id),
1632                                         method_prefix, opcls->flags,
1633                                         &psyc_recv_history_message, NULL,
1634                                         &psyc_recv_history_result, opcls);
1635   else
1636     GNUNET_PSYC_channel_history_replay_latest (plc->channel,
1637                                                GNUNET_ntohll (req->message_limit),
1638                                                method_prefix, opcls->flags,
1639                                                &psyc_recv_history_message, NULL,
1640                                                &psyc_recv_history_result, opcls);
1641
1642   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1643 }
1644
1645
1646 /**
1647  * A state variable part arrived from PSYC.
1648  */
1649 void
1650 psyc_recv_state_var (void *cls,
1651                      const struct GNUNET_MessageHeader *mod,
1652                      const char *name,
1653                      const void *value,
1654                      uint32_t value_size,
1655                      uint32_t full_value_size)
1656 {
1657   struct OperationClosure *opcls = cls;
1658   struct Place *plc = opcls->plc;
1659
1660   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1661               "%p Received state variable %s from PSYC\n",
1662               plc, name);
1663
1664   uint16_t size = ntohs (mod->size);
1665
1666   struct GNUNET_OperationResultMessage *
1667     res = GNUNET_malloc (sizeof (*res) + size);
1668   res->header.size = htons (sizeof (*res) + size);
1669   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
1670   res->op_id = opcls->op_id;
1671   res->result_code = GNUNET_htonll (GNUNET_OK);
1672
1673   memcpy (&res[1], mod, size);
1674
1675   /** @todo FIXME: send only to requesting client */
1676   client_send_msg (plc, &res->header);
1677 }
1678
1679
1680 /**
1681  * Result of retrieving state variable from PSYC.
1682  */
1683 static void
1684 psyc_recv_state_result (void *cls, int64_t result,
1685                         const void *err_msg, uint16_t err_msg_size)
1686 {
1687   struct OperationClosure *opcls = cls;
1688   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1689               "%p State get #%" PRIu64 ": "
1690               "PSYCstore returned %" PRId64 " (%.*s)\n",
1691               opcls->plc, GNUNET_ntohll (opcls->op_id), result, err_msg_size, err_msg);
1692
1693   // FIXME: place might have been destroyed
1694   client_send_result (opcls->client, opcls->op_id, result, err_msg, err_msg_size);
1695 }
1696
1697
1698 /**
1699  * Client requests channel history.
1700  */
1701 static void
1702 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
1703                        const struct GNUNET_MessageHeader *msg)
1704 {
1705   struct Client *
1706     ctx = GNUNET_SERVER_client_get_user_context (client, struct Client);
1707   GNUNET_assert (NULL != ctx);
1708   struct Place *plc = ctx->plc;
1709
1710   const struct GNUNET_PSYC_StateRequestMessage *
1711     req = (const struct GNUNET_PSYC_StateRequestMessage *) msg;
1712   uint16_t size = ntohs (msg->size);
1713   const char *name = (const char *) &req[1];
1714
1715   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1716               "%p State get #%" PRIu64 ": %s\n",
1717               plc, GNUNET_ntohll (req->op_id), name);
1718
1719   if (size < sizeof (*req) + 1
1720       || '\0' != name[size - sizeof (*req) - 1])
1721   {
1722     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1723                 "%p State get #%" PRIu64 ": "
1724                 "invalid name. size: %u < %u?\n",
1725                 plc, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
1726     GNUNET_break (0);
1727     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1728     return;
1729   }
1730
1731   struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
1732   opcls->client = client;
1733   opcls->plc = plc;
1734   opcls->op_id = req->op_id;
1735
1736   switch (ntohs (msg->type))
1737   {
1738   case GNUNET_MESSAGE_TYPE_PSYC_STATE_GET:
1739       GNUNET_PSYC_channel_state_get (plc->channel, name,
1740                                      psyc_recv_state_var,
1741                                      psyc_recv_state_result, opcls);
1742       break;
1743
1744   case GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX:
1745       GNUNET_PSYC_channel_state_get_prefix (plc->channel, name,
1746                                             psyc_recv_state_var,
1747                                             psyc_recv_state_result, opcls);
1748       break;
1749
1750   default:
1751       GNUNET_assert (0);
1752   }
1753
1754   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1755 }
1756
1757
1758 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1759   { &client_recv_host_enter, NULL,
1760     GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER, 0 },
1761
1762   { &client_recv_guest_enter, NULL,
1763     GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER, 0 },
1764
1765   { &client_recv_join_decision, NULL,
1766     GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
1767
1768   { &client_recv_psyc_message, NULL,
1769     GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
1770
1771   { &client_recv_history_replay, NULL,
1772     GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
1773
1774   { &client_recv_state_get, NULL,
1775     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
1776
1777   { &client_recv_state_get, NULL,
1778     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
1779
1780   { NULL, NULL, 0, 0 }
1781 };
1782
1783
1784 /**
1785  * Initialize the PSYC service.
1786  *
1787  * @param cls Closure.
1788  * @param server The initialized server.
1789  * @param c Configuration to use.
1790  */
1791 static void
1792 run (void *cls, struct GNUNET_SERVER_Handle *server,
1793      const struct GNUNET_CONFIGURATION_Handle *c)
1794 {
1795   cfg = c;
1796   stats = GNUNET_STATISTICS_create ("social", cfg);
1797   hosts = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1798   guests = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1799   place_guests = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1800   nc = GNUNET_SERVER_notification_context_create (server, 1);
1801   GNUNET_SERVER_add_handlers (server, handlers);
1802   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
1803   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1804                                 &shutdown_task, NULL);
1805 }
1806
1807
1808 /**
1809  * The main function for the service.
1810  *
1811  * @param argc number of arguments from the command line
1812  * @param argv command line arguments
1813  * @return 0 ok, 1 on error
1814  */
1815 int
1816 main (int argc, char *const *argv)
1817 {
1818   return (GNUNET_OK ==
1819           GNUNET_SERVICE_run (argc, argv, "social",
1820                               GNUNET_SERVICE_OPTION_NONE,
1821                               &run, NULL)) ? 0 : 1;
1822 }
1823
1824 /* end of gnunet-service-social.c */