adding extended proxy support for http(s) client
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "psyc.h"
38
39
40 /**
41  * Handle to our current configuration.
42  */
43 static const struct GNUNET_CONFIGURATION_Handle *cfg;
44
45 /**
46  * Handle to the statistics service.
47  */
48 static struct GNUNET_STATISTICS_Handle *stats;
49
50 /**
51  * Notification context, simplifies client broadcasts.
52  */
53 static struct GNUNET_SERVER_NotificationContext *nc;
54
55 /**
56  * Handle to the PSYCstore.
57  */
58 static struct GNUNET_PSYCSTORE_Handle *store;
59
60 /**
61  * All connected masters and slaves.
62  * Channel's pub_key_hash -> struct Channel
63  */
64 static struct GNUNET_CONTAINER_MultiHashMap *clients;
65
66
67 /**
68  * Message in the transmission queue.
69  */
70 struct TransmitMessage
71 {
72   struct TransmitMessage *prev;
73   struct TransmitMessage *next;
74
75   /**
76    * Buffer with message to be transmitted.
77    */
78   char *buf;
79
80   /**
81    * Size of @a buf
82    */
83   uint16_t size
84 ;
85   /**
86    * @see enum MessageState
87    */
88   uint8_t state;
89 };
90
91
92 /**
93  * Cache for received message fragments.
94  * Message fragments are only sent to clients after all modifiers arrived.
95  *
96  * chan_key -> MultiHashMap chan_msgs
97  */
98 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
99
100
101 /**
102  * Entry in the chan_msgs hashmap of @a recv_cache:
103  * fragment_id -> FragmentEntry
104  */
105 struct FragmentEntry
106 {
107   struct GNUNET_MULTICAST_MessageHeader *mmsg;
108   uint16_t ref_count;
109 };
110
111
112 /**
113  * Entry in the @a recv_msgs hash map of a @a Channel.
114  * message_id -> FragmentCache
115  */
116 struct FragmentCache
117 {
118   /**
119    * Total size of header fragments (METHOD & MODIFIERs)
120    */
121   uint64_t header_size;
122
123   /**
124    * Fragment IDs stored in @a recv_cache.
125    */
126   struct GNUNET_CONTAINER_Heap *fragments;
127 };
128
129
130 /**
131  * Common part of the client context for both a master and slave channel.
132  */
133 struct Channel
134 {
135   struct GNUNET_SERVER_Client *client;
136
137   struct TransmitMessage *tmit_head;
138   struct TransmitMessage *tmit_tail;
139
140   /**
141    * Received fragments not yet sent to the client.
142    * message_id -> FragmentCache
143    */
144   struct GNUNET_CONTAINER_MultiHashMap *recv_msgs;
145
146   /**
147    * FIXME
148    */
149   GNUNET_SCHEDULER_TaskIdentifier tmit_task;
150
151   /**
152    * Public key of the channel.
153    */
154   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
155
156   /**
157    * Hash of @a pub_key.
158    */
159   struct GNUNET_HashCode pub_key_hash;
160
161   /**
162    * Expected value size for the modifier being received from the PSYC service.
163    */
164   uint32_t tmit_mod_value_size_expected;
165
166   /**
167    * Actual value size for the modifier being received from the PSYC service.
168    */
169   uint32_t tmit_mod_value_size;
170
171   /**
172    * @see enum MessageState
173    */
174   uint8_t tmit_state;
175
176   /**
177    * FIXME
178    */
179   uint8_t in_transmit;
180
181   /**
182    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
183    */
184   uint8_t is_master;
185
186   /**
187    * Ready to receive messages from client? #GNUNET_YES or #GNUNET_NO
188    */
189   uint8_t ready;
190
191   /**
192    * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
193    */
194   uint8_t disconnected;
195 };
196
197
198 /**
199  * Client context for a channel master.
200  */
201 struct Master
202 {
203   /**
204    * Channel struct common for Master and Slave
205    */
206   struct Channel channel;
207
208   /**
209    * Private key of the channel.
210    */
211   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
212
213   /**
214    * Handle for the multicast origin.
215    */
216   struct GNUNET_MULTICAST_Origin *origin;
217
218   /**
219    * Transmit handle for multicast.
220    */
221   struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
222
223   /**
224    * Maximum message ID for this channel.
225    *
226    * Incremented before sending a message, thus the message_id in messages sent
227    * starts from 1.
228    */
229   uint64_t max_message_id;
230
231   /**
232    * ID of the last message that contains any state operations.
233    * 0 if there is no such message.
234    */
235   uint64_t max_state_message_id;
236
237   /**
238    * Maximum group generation for this channel.
239    */
240   uint64_t max_group_generation;
241
242   /**
243    * @see enum GNUNET_PSYC_Policy
244    */
245   uint32_t policy;
246 };
247
248
249 /**
250  * Client context for a channel slave.
251  */
252 struct Slave
253 {
254   /**
255    * Channel struct common for Master and Slave
256    */
257   struct Channel channel;
258
259   /**
260    * Private key of the slave.
261    */
262   struct GNUNET_CRYPTO_EddsaPrivateKey slave_key;
263
264   /**
265    * Handle for the multicast member.
266    */
267   struct GNUNET_MULTICAST_Member *member;
268
269   /**
270    * Transmit handle for multicast.
271    */
272   struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
273
274   /**
275    * Peer identity of the origin.
276    */
277   struct GNUNET_PeerIdentity origin;
278
279   /**
280    * Number of items in @a relays.
281    */
282   uint32_t relay_count;
283
284   /**
285    * Relays that multicast can use to connect.
286    */
287   struct GNUNET_PeerIdentity *relays;
288
289   /**
290    * Join request to be transmitted to the master on join.
291    */
292   struct GNUNET_MessageHeader *join_req;
293
294   /**
295    * Maximum message ID for this channel.
296    */
297   uint64_t max_message_id;
298
299   /**
300    * Maximum request ID for this channel.
301    */
302   uint64_t max_request_id;
303 };
304
305
306 static inline void
307 transmit_message (struct Channel *ch, uint8_t inc_msg_id);
308
309
310 /**
311  * Task run during shutdown.
312  *
313  * @param cls unused
314  * @param tc unused
315  */
316 static void
317 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
318 {
319   if (NULL != nc)
320   {
321     GNUNET_SERVER_notification_context_destroy (nc);
322     nc = NULL;
323   }
324   if (NULL != stats)
325   {
326     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
327     stats = NULL;
328   }
329 }
330
331
332 static void
333 client_cleanup (struct Channel *ch)
334 {
335   /* FIXME: fragment_cache_clear */
336
337   if (ch->is_master)
338   {
339     struct Master *mst = (struct Master *) ch;
340     if (NULL != mst->origin)
341       GNUNET_MULTICAST_origin_stop (mst->origin);
342     GNUNET_CONTAINER_multihashmap_remove (clients, &ch->pub_key_hash, mst);
343   }
344   else
345   {
346     struct Slave *slv = (struct Slave *) ch;
347     if (NULL != slv->join_req)
348       GNUNET_free (slv->join_req);
349     if (NULL != slv->relays)
350       GNUNET_free (slv->relays);
351     if (NULL != slv->member)
352       GNUNET_MULTICAST_member_part (slv->member);
353   }
354
355   GNUNET_free (ch);
356 }
357
358
359 /**
360  * Called whenever a client is disconnected.
361  * Frees our resources associated with that client.
362  *
363  * @param cls Closure.
364  * @param client Identification of the client.
365  */
366 static void
367 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
368 {
369   if (NULL == client)
370     return;
371
372   struct Channel *ch
373     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
374   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client disconnected\n", ch);
375
376   if (NULL == ch)
377   {
378     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
379                 "%p User context is NULL in client_disconnect()\n", ch);
380     GNUNET_break (0);
381     return;
382   }
383
384   ch->disconnected = GNUNET_YES;
385
386   /* Send pending messages to multicast before cleanup. */
387   if (NULL != ch->tmit_head)
388   {
389     transmit_message (ch, GNUNET_NO);
390   }
391   else
392   {
393     client_cleanup (ch);
394   }
395 }
396
397
398 /**
399  * Master receives a join request from a slave.
400  */
401 static void
402 join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
403          const struct GNUNET_MessageHeader *join_req,
404          struct GNUNET_MULTICAST_JoinHandle *jh)
405 {
406
407 }
408
409
410 static void
411 membership_test_cb (void *cls,
412                     const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
413                     uint64_t message_id, uint64_t group_generation,
414                     struct GNUNET_MULTICAST_MembershipTestHandle *mth)
415 {
416
417 }
418
419
420 static void
421 replay_fragment_cb (void *cls,
422                     const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
423                     uint64_t fragment_id, uint64_t flags,
424                     struct GNUNET_MULTICAST_ReplayHandle *rh)
425
426 {
427 }
428
429
430 static void
431 replay_message_cb (void *cls,
432                    const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
433                    uint64_t message_id,
434                    uint64_t fragment_offset,
435                    uint64_t flags,
436                    struct GNUNET_MULTICAST_ReplayHandle *rh)
437 {
438
439 }
440
441
442 static void
443 fragment_store_result (void *cls, int64_t result, const char *err_msg)
444 {
445   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
446               "fragment_store() returned %l (%s)\n", result, err_msg);
447 }
448
449
450 static void
451 message_to_client (struct Channel *ch,
452                    const struct GNUNET_MULTICAST_MessageHeader *mmsg)
453 {
454   uint16_t size = ntohs (mmsg->header.size);
455   struct GNUNET_PSYC_MessageHeader *pmsg;
456   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
457
458   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
459               "%p Sending message to client. "
460               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
461               ch, GNUNET_ntohll (mmsg->fragment_id),
462               GNUNET_ntohll (mmsg->message_id));
463
464   pmsg = GNUNET_malloc (psize);
465   pmsg->header.size = htons (psize);
466   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
467   pmsg->message_id = mmsg->message_id;
468
469   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
470
471   GNUNET_SERVER_notification_context_add (nc, ch->client);
472   GNUNET_SERVER_notification_context_unicast (nc, ch->client,
473                                               (const struct GNUNET_MessageHeader *) pmsg,
474                                               GNUNET_NO);
475   GNUNET_free (pmsg);
476 }
477
478
479 /**
480  * Convert an uint64_t in network byte order to a HashCode
481  * that can be used as key in a MultiHashMap
482  */
483 static inline void
484 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
485 {
486   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
487
488   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
489   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
490
491   *key = (struct GNUNET_HashCode) {{ 0 }};
492   *((uint64_t *) key)
493     = (n << 32) | (n >> 32);
494 }
495
496
497 /**
498  * Convert an uint64_t in host byte order to a HashCode
499  * that can be used as key in a MultiHashMap
500  */
501 static inline void
502 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
503 {
504 #if __BYTE_ORDER == __BIG_ENDIAN
505   hash_key_from_nll (key, n);
506 #elif __BYTE_ORDER == __LITTLE_ENDIAN
507   *key = (struct GNUNET_HashCode) {{ 0 }};
508   *((uint64_t *) key) = n;
509 #else
510   #error byteorder undefined
511 #endif
512 }
513
514
515 static void
516 fragment_cache_insert (struct Channel *ch,
517                        const struct GNUNET_HashCode *msg_id,
518                        struct FragmentCache *frag_cache,
519                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
520                        uint16_t last_part_type)
521 {
522   uint16_t size = ntohs (mmsg->header.size);
523   struct GNUNET_CONTAINER_MultiHashMap
524     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
525                                                     &ch->pub_key_hash);
526
527   if (NULL == frag_cache)
528   {
529     frag_cache = GNUNET_new (struct FragmentCache);
530     frag_cache->fragments
531       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
532
533     if (NULL == ch->recv_msgs)
534     {
535       ch->recv_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
536     }
537     GNUNET_CONTAINER_multihashmap_put (ch->recv_msgs, msg_id, frag_cache,
538                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
539
540     if (NULL == chan_msgs)
541     {
542       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
543       GNUNET_CONTAINER_multihashmap_put (recv_cache, &ch->pub_key_hash, chan_msgs,
544                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
545     }
546   }
547
548   struct GNUNET_HashCode *frag_id = GNUNET_new (struct GNUNET_HashCode);
549   hash_key_from_nll (frag_id, mmsg->fragment_id);
550   struct FragmentEntry
551     *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
552   if (NULL == frag_entry)
553   {
554     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
555                 "%p Adding message fragment to cache. "
556                 "fragment_id: %" PRIu64 ", "
557                 "header_size: %" PRIu64 " + %" PRIu64 ").\n",
558                 ch, GNUNET_ntohll (mmsg->fragment_id),
559                 frag_cache->header_size, size);
560     frag_entry = GNUNET_new (struct FragmentEntry);
561     frag_entry->ref_count = 1;
562     frag_entry->mmsg = GNUNET_malloc (size);
563     memcpy (frag_entry->mmsg, mmsg, size);
564     GNUNET_CONTAINER_multihashmap_put (chan_msgs, frag_id, frag_entry,
565                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
566   }
567   else
568   {
569     frag_entry->ref_count++;
570     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
571                 "%p Message fragment already in cache. "
572                 "fragment_id: %" PRIu64 ", ref_count: %u\n",
573                 ch, GNUNET_ntohll (mmsg->fragment_id), frag_entry->ref_count);
574   }
575
576   switch (last_part_type)
577   {
578   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
579   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
580   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
581     frag_cache->header_size += size;
582   }
583   GNUNET_CONTAINER_heap_insert (frag_cache->fragments, frag_id,
584                                 GNUNET_ntohll (mmsg->fragment_id));
585 }
586
587
588 static void
589 fragment_cache_clear (struct Channel *ch,
590                       const struct GNUNET_HashCode *msg_id,
591                       struct FragmentCache *frag_cache,
592                       uint8_t send_to_client)
593 {
594   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
595               "%p Clearing message fragment cache.\n", ch);
596
597   struct GNUNET_CONTAINER_MultiHashMap
598     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
599                                                     &ch->pub_key_hash);
600   GNUNET_assert (NULL != chan_msgs);
601   struct GNUNET_HashCode *frag_id;
602
603   while ((frag_id = GNUNET_CONTAINER_heap_remove_root (frag_cache->fragments)))
604   {
605     struct FragmentEntry
606       *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
607     if (frag_entry != NULL)
608     {
609       if (GNUNET_YES == send_to_client)
610       {
611         message_to_client (ch, frag_entry->mmsg);
612       }
613       if (1 == frag_entry->ref_count)
614       {
615         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, frag_id, frag_entry);
616         GNUNET_free (frag_entry->mmsg);
617         GNUNET_free (frag_entry);
618       }
619       else
620       {
621         frag_entry->ref_count--;
622       }
623     }
624     GNUNET_free (frag_id);
625   }
626
627   GNUNET_CONTAINER_multihashmap_remove (ch->recv_msgs, msg_id, frag_cache);
628   GNUNET_CONTAINER_heap_destroy (frag_cache->fragments);
629   GNUNET_free (frag_cache);
630 }
631
632
633 /**
634  * Incoming message fragment from multicast.
635  *
636  * Store it using PSYCstore and send it to the client of the channel.
637  */
638 static void
639 message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
640 {
641   struct Channel *ch = cls;
642   uint16_t type = ntohs (msg->type);
643   uint16_t size = ntohs (msg->size);
644
645   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
646               "%p Received message of type %u and size %u from multicast.\n",
647               ch, type, size);
648
649   switch (type)
650   {
651   case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
652   {
653     GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key,
654                                      (const struct
655                                       GNUNET_MULTICAST_MessageHeader *) msg,
656                                      0, NULL, NULL);
657
658 #if TODO
659     /* FIXME: apply modifiers to state in PSYCstore */
660     GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key,
661                                    GNUNET_ntohll (mmsg->message_id),
662                                    meth->mod_count, mods,
663                                    rcb, rcb_cls);
664 #endif
665
666     const struct GNUNET_MULTICAST_MessageHeader
667       *mmsg = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
668
669     uint16_t ptype = GNUNET_PSYC_message_last_part (size - sizeof (*mmsg),
670                                                     (const char *) &mmsg[1]);
671     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
672                 "Last message part type %u\n", ptype);
673
674     if (GNUNET_NO == ptype)
675     {
676       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
677                   "%p Received message with invalid parts from multicast. "
678                   "Dropping message.\n", ch);
679       GNUNET_break_op (0);
680       break;
681     }
682
683     struct GNUNET_HashCode msg_id;
684     hash_key_from_nll (&msg_id, mmsg->message_id);
685
686     struct FragmentCache *frag_cache = NULL;
687     if (NULL != ch->recv_msgs)
688       frag_cache = GNUNET_CONTAINER_multihashmap_get (ch->recv_msgs, &msg_id);
689
690     switch (ptype)
691     {
692     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
693     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
694       /* FIXME: check state flag / max_state_message_id */
695       if (NULL == frag_cache)
696       {
697         message_to_client (ch, mmsg);
698         break;
699       }
700       else
701       {
702         if (GNUNET_ntohll (mmsg->fragment_offset) == frag_cache->header_size)
703         { /* first data fragment after the header, send cached fragments */
704           fragment_cache_clear (ch, &msg_id, frag_cache, GNUNET_YES);
705           message_to_client (ch, mmsg);
706           break;
707         }
708         else
709         { /* still missing fragments from the header, cache data fragment */
710           /* fall thru */
711         }
712       }
713
714     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
715     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
716     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
717       /* not all modifiers arrived yet, cache fragment */
718       fragment_cache_insert (ch, &msg_id, frag_cache, mmsg, ptype);
719       break;
720
721     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
722       if (NULL != frag_cache)
723       { /* fragments not yet sent to client, remove from cache */
724         fragment_cache_clear (ch, &msg_id, frag_cache, GNUNET_NO);
725       }
726       else
727       {
728         message_to_client (ch, mmsg);
729       }
730       break;
731     }
732     break;
733   }
734   default:
735     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
736                 "%p Dropping unknown message of type %u and size %u.\n",
737                 ch, type, size);
738   }
739 }
740
741
742 /**
743  * Incoming request fragment from multicast for a master.
744  *
745  * @param cls           Master.
746  * @param slave_key     Sending slave's public key.
747  * @param msg           The message.
748  * @param flags         Request flags.
749  */
750 static void
751 request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
752             const struct GNUNET_MessageHeader *msg,
753             enum GNUNET_MULTICAST_MessageFlags flags)
754 {
755   struct Master *mst = cls;
756   struct Channel *ch = &mst->channel;
757
758   uint16_t type = ntohs (msg->type);
759   uint16_t size = ntohs (msg->size);
760
761   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762               "%p Received request of type %u and size %u from multicast.\n",
763               ch, type, size);
764
765   switch (type)
766   {
767   case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
768   {
769     const struct GNUNET_MULTICAST_RequestHeader *req
770       = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
771
772     /* FIXME: see message_cb() */
773     if (GNUNET_NO == GNUNET_PSYC_message_last_part (size - sizeof (*req),
774                                                     (const char *) &req[1]))
775     {
776       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
777                   "%p Dropping message with invalid parts "
778                   "received from multicast.\n", ch);
779       GNUNET_break_op (0);
780       break;
781     }
782
783     struct GNUNET_PSYC_MessageHeader *pmsg;
784     uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
785     pmsg = GNUNET_malloc (psize);
786     pmsg->header.size = htons (psize);
787     pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
788     pmsg->message_id = req->request_id;
789     pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
790
791     memcpy (&pmsg[1], &req[1], size - sizeof (*req));
792
793     GNUNET_SERVER_notification_context_add (nc, ch->client);
794     GNUNET_SERVER_notification_context_unicast (nc, ch->client,
795                                                 (const struct GNUNET_MessageHeader *) pmsg,
796                                                 GNUNET_NO);
797     GNUNET_free (pmsg);
798     break;
799   }
800   default:
801     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
802                 "%p Dropping unknown request of type %u and size %u.\n",
803                 ch, type, size);
804     GNUNET_break_op (0);
805   }
806 }
807
808
809 /**
810  * Response from PSYCstore with the current counter values for a channel master.
811  */
812 static void
813 master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
814                     uint64_t max_message_id, uint64_t max_group_generation,
815                     uint64_t max_state_message_id)
816 {
817   struct Master *mst = cls;
818   struct Channel *ch = &mst->channel;
819   struct CountersResult *res = GNUNET_malloc (sizeof (*res));
820   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
821   res->header.size = htons (sizeof (*res));
822   res->result_code = htonl (result);
823   res->max_message_id = GNUNET_htonll (max_message_id);
824
825   if (GNUNET_OK == result || GNUNET_NO == result)
826   {
827     mst->max_message_id = max_message_id;
828     mst->max_state_message_id = max_state_message_id;
829     mst->max_group_generation = max_group_generation;
830     mst->origin
831       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
832                                        max_fragment_id + 1,
833                                        join_cb, membership_test_cb,
834                                        replay_fragment_cb, replay_message_cb,
835                                        request_cb, message_cb, ch);
836     ch->ready = GNUNET_YES;
837   }
838   GNUNET_SERVER_notification_context_add (nc, ch->client);
839   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
840                                               GNUNET_NO);
841   GNUNET_free (res);
842 }
843
844
845 /**
846  * Response from PSYCstore with the current counter values for a channel slave.
847  */
848 void
849 slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
850                    uint64_t max_message_id, uint64_t max_group_generation,
851                    uint64_t max_state_message_id)
852 {
853   struct Slave *slv = cls;
854   struct Channel *ch = &slv->channel;
855   struct CountersResult *res = GNUNET_malloc (sizeof (*res));
856   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
857   res->header.size = htons (sizeof (*res));
858   res->result_code = htonl (result);
859   res->max_message_id = GNUNET_htonll (max_message_id);
860
861   if (GNUNET_OK == result || GNUNET_NO == result)
862   {
863     slv->max_message_id = max_message_id;
864     slv->member
865       = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->slave_key,
866                                       &slv->origin,
867                                       slv->relay_count, slv->relays,
868                                       slv->join_req, join_cb,
869                                       membership_test_cb,
870                                       replay_fragment_cb, replay_message_cb,
871                                       message_cb, ch);
872     ch->ready = GNUNET_YES;
873   }
874
875   GNUNET_SERVER_notification_context_add (nc, ch->client);
876   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
877                                               GNUNET_NO);
878   GNUNET_free (res);
879 }
880
881
882 /**
883  * Handle a connecting client starting a channel master.
884  */
885 static void
886 handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
887                      const struct GNUNET_MessageHeader *msg)
888 {
889   const struct MasterStartRequest *req
890     = (const struct MasterStartRequest *) msg;
891   struct Master *mst = GNUNET_new (struct Master);
892   struct Channel *ch = &mst->channel;
893   ch->client = client;
894   ch->is_master = GNUNET_YES;
895   mst->policy = ntohl (req->policy);
896   mst->priv_key = req->channel_key;
897   GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &ch->pub_key);
898   GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), &ch->pub_key_hash);
899   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
900               "%p Master connected to channel %s.\n",
901               mst, GNUNET_h2s (&ch->pub_key_hash));
902
903   GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb, mst);
904
905   GNUNET_SERVER_client_set_user_context (client, &mst->channel);
906   GNUNET_CONTAINER_multihashmap_put (clients, &ch->pub_key_hash, mst,
907                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
908   GNUNET_SERVER_receive_done (client, GNUNET_OK);
909 }
910
911
912 /**
913  * Handle a connecting client joining as a channel slave.
914  */
915 static void
916 handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
917                    const struct GNUNET_MessageHeader *msg)
918 {
919   const struct SlaveJoinRequest *req
920     = (const struct SlaveJoinRequest *) msg;
921   struct Slave *slv = GNUNET_new (struct Slave);
922   struct Channel *ch = &slv->channel;
923   slv->channel.client = client;
924   slv->channel.is_master = GNUNET_NO;
925   slv->slave_key = req->slave_key;
926   ch->pub_key = req->channel_key;
927   GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key),
928                       &ch->pub_key_hash);
929   slv->origin = req->origin;
930   slv->relay_count = ntohl (req->relay_count);
931   if (0 < slv->relay_count)
932   {
933     const struct GNUNET_PeerIdentity *relays
934       = (const struct GNUNET_PeerIdentity *) &req[1];
935     slv->relays
936       = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
937     uint32_t i;
938     for (i = 0; i < slv->relay_count; i++)
939       memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
940   }
941
942   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
943               "%p Slave connected to channel %s.\n",
944               slv, GNUNET_h2s (&ch->pub_key_hash));
945
946   GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb, slv);
947
948   GNUNET_SERVER_client_set_user_context (client, &slv->channel);
949   GNUNET_SERVER_receive_done (client, GNUNET_OK);
950 }
951
952
953 /**
954  * Send acknowledgement to a client.
955  *
956  * Sent after a message fragment has been passed on to multicast.
957  *
958  * @param ch The channel struct for the client.
959  */
960 static void
961 send_message_ack (struct Channel *ch)
962 {
963   struct GNUNET_MessageHeader res;
964   res.size = htons (sizeof (res));
965   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
966
967   GNUNET_SERVER_notification_context_add (nc, ch->client);
968   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res,
969                                               GNUNET_NO);
970 }
971
972
973 /**
974  * Callback for the transmit functions of multicast.
975  */
976 static int
977 transmit_notify (void *cls, size_t *data_size, void *data)
978 {
979   struct Channel *ch = cls;
980   struct TransmitMessage *tmit_msg = ch->tmit_head;
981
982   if (NULL == tmit_msg || *data_size < tmit_msg->size)
983   {
984     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985                 "%p transmit_notify: nothing to send.\n", ch);
986     *data_size = 0;
987     return GNUNET_NO;
988   }
989
990   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
991               "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
992
993   *data_size = tmit_msg->size;
994   memcpy (data, tmit_msg->buf, *data_size);
995
996   GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
997   GNUNET_free (tmit_msg);
998
999   int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
1000   send_message_ack (ch);
1001
1002   if (0 == ch->tmit_task)
1003   {
1004     if (NULL != ch->tmit_head)
1005     {
1006       transmit_message (ch, GNUNET_NO);
1007     }
1008     else if (ch->disconnected)
1009     {
1010       /* FIXME: handle partial message (when still in_transmit) */
1011       client_cleanup (ch);
1012     }
1013   }
1014
1015   return ret;
1016 }
1017
1018
1019 /**
1020  * Callback for the transmit functions of multicast.
1021  */
1022 static int
1023 master_transmit_notify (void *cls, size_t *data_size, void *data)
1024 {
1025   int ret = transmit_notify (cls, data_size, data);
1026
1027   if (GNUNET_YES == ret)
1028   {
1029     struct Master *mst = cls;
1030     mst->tmit_handle = NULL;
1031   }
1032   return ret;
1033 }
1034
1035
1036 /**
1037  * Callback for the transmit functions of multicast.
1038  */
1039 static int
1040 slave_transmit_notify (void *cls, size_t *data_size, void *data)
1041 {
1042   int ret = transmit_notify (cls, data_size, data);
1043
1044   if (GNUNET_YES == ret)
1045   {
1046     struct Slave *slv = cls;
1047     slv->tmit_handle = NULL;
1048   }
1049   return ret;
1050 }
1051
1052
1053 /**
1054  * Transmit a message from a channel master to the multicast group.
1055  */
1056 static void
1057 master_transmit_message (struct Master *mst, uint8_t inc_msg_id)
1058 {
1059   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
1060   mst->channel.tmit_task = 0;
1061   if (NULL == mst->tmit_handle)
1062   {
1063     if (GNUNET_YES == inc_msg_id)
1064       mst->max_message_id++;
1065     mst->tmit_handle
1066       = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
1067                                         mst->max_group_generation,
1068                                         master_transmit_notify, mst);
1069   }
1070   else
1071   {
1072     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
1073   }
1074 }
1075
1076
1077 /**
1078  * Transmit a message from a channel slave to the multicast group.
1079  */
1080 static void
1081 slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id)
1082 {
1083   slv->channel.tmit_task = 0;
1084   if (NULL == slv->tmit_handle)
1085   {
1086     if (GNUNET_YES == inc_msg_id)
1087       slv->max_message_id++;
1088     slv->tmit_handle
1089       = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
1090                                            slave_transmit_notify, slv);
1091   }
1092   else
1093   {
1094     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
1095   }
1096 }
1097
1098
1099 static inline void
1100 transmit_message (struct Channel *ch, uint8_t inc_msg_id)
1101 {
1102   ch->is_master
1103     ? master_transmit_message ((struct Master *) ch, inc_msg_id)
1104     : slave_transmit_message ((struct Slave *) ch, inc_msg_id);
1105 }
1106
1107
1108 static void
1109 transmit_error (struct Channel *ch)
1110 {
1111   struct GNUNET_MessageHeader *msg;
1112   struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg)
1113                                                     + sizeof (*msg));
1114   msg = (struct GNUNET_MessageHeader *) &tmit_msg[1];
1115   msg->size = ntohs (sizeof (*msg));
1116   msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
1117
1118   tmit_msg->buf = (char *) &tmit_msg[1];
1119   tmit_msg->size = sizeof (*msg);
1120   tmit_msg->state = ch->tmit_state;
1121   GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
1122   transmit_message (ch, GNUNET_NO);
1123
1124   /* FIXME: cleanup */
1125 }
1126
1127
1128 /**
1129  * Incoming message from a client.
1130  */
1131 static void
1132 handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1133                      const struct GNUNET_MessageHeader *msg)
1134 {
1135   struct Channel *ch
1136     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1137   GNUNET_assert (NULL != ch);
1138
1139   if (GNUNET_YES != ch->ready)
1140   {
1141     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1142                 "%p Dropping message from client, channel is not ready yet.\n",
1143                 ch);
1144     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1145     return;
1146   }
1147
1148   uint8_t inc_msg_id = GNUNET_NO;
1149   uint16_t size = ntohs (msg->size);
1150   uint16_t psize = 0, ptype = 0, pos = 0;
1151
1152   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
1153   {
1154     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
1155     GNUNET_break (0);
1156     transmit_error (ch);
1157     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1158     return;
1159   }
1160
1161   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1162               "%p Received message from client.\n", ch);
1163   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
1164
1165   for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
1166   {
1167     const struct GNUNET_MessageHeader *pmsg
1168       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
1169     psize = ntohs (pmsg->size);
1170     ptype = ntohs (pmsg->type);
1171     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
1172     {
1173       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1174                   "%p Received invalid message part of type %u and size %u "
1175                   "from client.\n", ch, ptype, psize);
1176       GNUNET_break (0);
1177       transmit_error (ch);
1178       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1179       return;
1180     }
1181     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1182                 "%p Received message part from client.\n", ch);
1183     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1184
1185     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == ptype)
1186       inc_msg_id = GNUNET_YES;
1187   }
1188
1189   size -= sizeof (*msg);
1190   struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
1191   tmit_msg->buf = (char *) &tmit_msg[1];
1192   memcpy (tmit_msg->buf, &msg[1], size);
1193   tmit_msg->size = size;
1194   tmit_msg->state = ch->tmit_state;
1195   GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
1196   transmit_message (ch, inc_msg_id);
1197
1198   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1199 };
1200
1201
1202 /**
1203  * Client requests to add a slave to the membership database.
1204  */
1205 static void
1206 handle_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
1207                   const struct GNUNET_MessageHeader *msg)
1208 {
1209
1210 }
1211
1212
1213 /**
1214  * Client requests to remove a slave from the membership database.
1215  */
1216 static void
1217 handle_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
1218                      const struct GNUNET_MessageHeader *msg)
1219 {
1220
1221 }
1222
1223
1224 /**
1225  * Client requests channel history from PSYCstore.
1226  */
1227 static void
1228 handle_story_request (void *cls, struct GNUNET_SERVER_Client *client,
1229                       const struct GNUNET_MessageHeader *msg)
1230 {
1231
1232 }
1233
1234
1235 /**
1236  * Client requests best matching state variable from PSYCstore.
1237  */
1238 static void
1239 handle_state_get (void *cls, struct GNUNET_SERVER_Client *client,
1240                   const struct GNUNET_MessageHeader *msg)
1241 {
1242
1243 }
1244
1245
1246 /**
1247  * Client requests state variables with a given prefix from PSYCstore.
1248  */
1249 static void
1250 handle_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
1251                          const struct GNUNET_MessageHeader *msg)
1252 {
1253
1254 }
1255
1256
1257 /**
1258  * Initialize the PSYC service.
1259  *
1260  * @param cls Closure.
1261  * @param server The initialized server.
1262  * @param c Configuration to use.
1263  */
1264 static void
1265 run (void *cls, struct GNUNET_SERVER_Handle *server,
1266      const struct GNUNET_CONFIGURATION_Handle *c)
1267 {
1268   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1269     { &handle_master_start, NULL,
1270       GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
1271
1272     { &handle_slave_join, NULL,
1273       GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
1274
1275     { &handle_psyc_message, NULL,
1276       GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
1277
1278     { &handle_slave_add, NULL,
1279       GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
1280
1281     { &handle_slave_remove, NULL,
1282       GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
1283
1284     { &handle_story_request, NULL,
1285       GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
1286
1287     { &handle_state_get, NULL,
1288       GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
1289
1290     { &handle_state_get_prefix, NULL,
1291       GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 }
1292   };
1293
1294   cfg = c;
1295   store = GNUNET_PSYCSTORE_connect (cfg);
1296   stats = GNUNET_STATISTICS_create ("psyc", cfg);
1297   clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1298   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1299   nc = GNUNET_SERVER_notification_context_create (server, 1);
1300   GNUNET_SERVER_add_handlers (server, handlers);
1301   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
1302   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1303                                 &shutdown_task, NULL);
1304 }
1305
1306
1307 /**
1308  * The main function for the service.
1309  *
1310  * @param argc number of arguments from the command line
1311  * @param argv command line arguments
1312  * @return 0 ok, 1 on error
1313  */
1314 int
1315 main (int argc, char *const *argv)
1316 {
1317   return (GNUNET_OK ==
1318           GNUNET_SERVICE_run (argc, argv, "psyc",
1319                               GNUNET_SERVICE_OPTION_NONE,
1320                               &run, NULL)) ? 0 : 1;
1321 }
1322
1323 /* end of gnunet-service-psycstore.c */