refactor DHT for new service API
[oweals/gnunet.git] / src / core / core_api_2.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file core/core_api_2.c
22  * @brief core service; this is the main API for encrypted P2P
23  *        communications
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_core_service.h"
30 #include "core.h"
31
32 #define LOG(kind,...) GNUNET_log_from (kind, "core-api",__VA_ARGS__)
33
34
35 /**
36  * Information we track for each peer.
37  */
38 struct PeerRecord
39 {
40
41   /**
42    * Corresponding CORE handle.
43    */
44   struct GNUNET_CORE_Handle *h;
45
46   /**
47    * Message queue for the peer.
48    */
49   struct GNUNET_MQ_Handle *mq;
50
51   /**
52    * Message we are currently trying to pass to the CORE service
53    * for this peer (from @e mq).
54    */
55   struct GNUNET_MQ_Envelope *env;
56
57   /**
58    * Value the client returned when we connected, used
59    * as the closure in various places.
60    */
61   void *client_cls;
62
63   /**
64    * Peer the record is about.
65    */
66   struct GNUNET_PeerIdentity peer;
67
68   /**
69    * SendMessageRequest ID generator for this peer.
70    */
71   uint16_t smr_id_gen;
72
73 };
74
75
76 /**
77  * Context for the core service connection.
78  */
79 struct GNUNET_CORE_Handle
80 {
81
82   /**
83    * Configuration we're using.
84    */
85   const struct GNUNET_CONFIGURATION_Handle *cfg;
86
87   /**
88    * Closure for the various callbacks.
89    */
90   void *cls;
91
92   /**
93    * Function to call once we've handshaked with the core service.
94    */
95   GNUNET_CORE_StartupCallback init;
96
97   /**
98    * Function to call whenever we're notified about a peer connecting.
99    */
100   GNUNET_CORE_ConnecTEventHandler connects;
101
102   /**
103    * Function to call whenever we're notified about a peer disconnecting.
104    */
105   GNUNET_CORE_DisconnecTEventHandler disconnects;
106
107   /**
108    * Function handlers for messages of particular type.
109    */
110   struct GNUNET_MQ_MessageHandler *handlers;
111
112   /**
113    * Our message queue for transmissions to the service.
114    */
115   struct GNUNET_MQ_Handle *mq;
116
117   /**
118    * Hash map listing all of the peers that we are currently
119    * connected to.
120    */
121   struct GNUNET_CONTAINER_MultiPeerMap *peers;
122
123   /**
124    * Identity of this peer.
125    */
126   struct GNUNET_PeerIdentity me;
127
128   /**
129    * ID of reconnect task (if any).
130    */
131   struct GNUNET_SCHEDULER_Task *reconnect_task;
132
133   /**
134    * Current delay we use for re-trying to connect to core.
135    */
136   struct GNUNET_TIME_Relative retry_backoff;
137
138   /**
139    * Number of entries in the handlers array.
140    */
141   unsigned int hcnt;
142
143   /**
144    * Did we ever get INIT?
145    */
146   int have_init;
147
148 };
149
150
151 /**
152  * Our current client connection went down.  Clean it up
153  * and try to reconnect!
154  *
155  * @param h our handle to the core service
156  */
157 static void
158 reconnect (struct GNUNET_CORE_Handle *h);
159
160
161 /**
162  * Task schedule to try to re-connect to core.
163  *
164  * @param cls the `struct GNUNET_CORE_Handle`
165  * @param tc task context
166  */
167 static void
168 reconnect_task (void *cls)
169 {
170   struct GNUNET_CORE_Handle *h = cls;
171
172   h->reconnect_task = NULL;
173   LOG (GNUNET_ERROR_TYPE_DEBUG,
174        "Connecting to CORE service after delay\n");
175   reconnect (h);
176 }
177
178
179 /**
180  * Notify clients about disconnect and free the entry for connected
181  * peer.
182  *
183  * @param cls the `struct GNUNET_CORE_Handle *`
184  * @param key the peer identity (not used)
185  * @param value the `struct PeerRecord` to free.
186  * @return #GNUNET_YES (continue)
187  */
188 static int
189 disconnect_and_free_peer_entry (void *cls,
190                                 const struct GNUNET_PeerIdentity *key,
191                                 void *value)
192 {
193   struct GNUNET_CORE_Handle *h = cls;
194   struct PeerRecord *pr = value;
195
196   GNUNET_assert (pr->h == h);
197   if (NULL != h->disconnects)
198     h->disconnects (h->cls,
199                     &pr->peer,
200                     pr->client_cls);
201   GNUNET_assert (GNUNET_YES ==
202                  GNUNET_CONTAINER_multipeermap_remove (h->peers,
203                                                        key,
204                                                        pr));
205   GNUNET_MQ_destroy (pr->mq);
206   GNUNET_assert (NULL == pr->mq);
207   GNUNET_free (pr);
208   return GNUNET_YES;
209 }
210
211
212 /**
213  * Close down any existing connection to the CORE service and
214  * try re-establishing it later.
215  *
216  * @param h our handle
217  */
218 static void
219 reconnect_later (struct GNUNET_CORE_Handle *h)
220 {
221   GNUNET_assert (NULL == h->reconnect_task);
222   if (NULL != h->mq)
223   {
224     GNUNET_MQ_destroy (h->mq);
225     h->mq = NULL;
226   }
227   GNUNET_assert (NULL == h->reconnect_task);
228   h->reconnect_task =
229       GNUNET_SCHEDULER_add_delayed (h->retry_backoff,
230                                     &reconnect_task,
231                                     h);
232   GNUNET_CONTAINER_multipeermap_iterate (h->peers,
233                                          &disconnect_and_free_peer_entry,
234                                          h);
235   h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
236 }
237
238
239 /**
240  * Error handler for the message queue to the CORE service.
241  * On errors, we reconnect.
242  *
243  * @param cls closure, a `struct GNUNET_CORE_Handle *`
244  * @param error error code
245  */
246 static void
247 handle_mq_error (void *cls,
248                  enum GNUNET_MQ_Error error)
249 {
250   struct GNUNET_CORE_Handle *h = cls;
251
252   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
253               "MQ ERROR: %d\n",
254               error);
255   reconnect_later (h);
256 }
257
258
259 /**
260  * Inquire with CORE what options should be set for a message
261  * so that it is transmitted with the given @a priority and
262  * the given @a cork value.
263  *
264  * @param cork desired corking
265  * @param priority desired message priority
266  * @param[out] flags set to `flags` value for #GNUNET_MQ_set_options()
267  * @return `extra` argument to give to #GNUNET_MQ_set_options()
268  */
269 const void *
270 GNUNET_CORE_get_mq_options (int cork,
271                             enum GNUNET_CORE_Priority priority,
272                             uint64_t *flags)
273 {
274   *flags = ((uint64_t) priority) + (((uint64_t) cork) << 32);
275   return NULL;
276 }
277
278
279 /**
280  * Implement sending functionality of a message queue for
281  * us sending messages to a peer.
282  *
283  * @param mq the message queue
284  * @param msg the message to send
285  * @param impl_state state of the implementation
286  */
287 static void
288 core_mq_send_impl (struct GNUNET_MQ_Handle *mq,
289                    const struct GNUNET_MessageHeader *msg,
290                    void *impl_state)
291 {
292   struct PeerRecord *pr = impl_state;
293   struct GNUNET_CORE_Handle *h = pr->h;
294   struct SendMessageRequest *smr;
295   struct SendMessage *sm;
296   struct GNUNET_MQ_Envelope *env;
297   uint16_t msize;
298   uint64_t flags;
299   int cork;
300   enum GNUNET_CORE_Priority priority;
301
302   GNUNET_assert (NULL == pr->env);
303   /* extract options from envelope */
304   env = GNUNET_MQ_get_current_envelope (mq);
305   GNUNET_break (NULL ==
306                 GNUNET_MQ_env_get_options (env,
307                                            &flags));
308   cork = (int) (flags >> 32);
309   priority = (uint32_t) flags;
310
311   /* check message size for sanity */
312   msize = ntohs (msg->size);
313   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct SendMessage))
314   {
315     GNUNET_break (0);
316     GNUNET_MQ_impl_send_continue (mq);
317     return;
318   }
319
320   /* ask core for transmission */
321   LOG (GNUNET_ERROR_TYPE_DEBUG,
322        "Asking core for transmission of %u bytes to `%s'\n",
323        (unsigned int) msize,
324        GNUNET_i2s (&pr->peer));
325   env = GNUNET_MQ_msg (smr,
326                        GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
327   smr->priority = htonl ((uint32_t) priority);
328   // smr->deadline = GNUNET_TIME_absolute_hton (deadline);
329   smr->peer = pr->peer;
330   smr->reserved = htonl (0);
331   smr->size = htons (msize);
332   smr->smr_id = htons (++pr->smr_id_gen);
333   GNUNET_MQ_send (h->mq,
334                   env);
335
336   /* prepare message with actual transmission data */
337   pr->env = GNUNET_MQ_msg_nested_mh (sm,
338                                      GNUNET_MESSAGE_TYPE_CORE_SEND,
339                                      msg);
340   sm->priority = htonl ((uint32_t) priority);
341   // sm->deadline = GNUNET_TIME_absolute_hton (deadline);
342   sm->peer = pr->peer;
343   sm->cork = htonl ((uint32_t) cork);
344   sm->reserved = htonl (0);
345   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
346               "Calling get_message with buffer of %u bytes (%s)\n",
347               (unsigned int) msize,
348               cork ? "corked" : "uncorked");
349 }
350
351
352 /**
353  * Handle destruction of a message queue.  Implementations must not
354  * free @a mq, but should take care of @a impl_state.
355  *
356  * @param mq the message queue to destroy
357  * @param impl_state state of the implementation
358  */
359 static void
360 core_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
361                       void *impl_state)
362 {
363   struct PeerRecord *pr = impl_state;
364
365   GNUNET_assert (mq == pr->mq);
366   pr->mq = NULL;
367 }
368
369
370 /**
371  * Implementation function that cancels the currently sent message.
372  * Should basically undo whatever #mq_send_impl() did.
373  *
374  * @param mq message queue
375  * @param impl_state state specific to the implementation
376  */
377 static void
378 core_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
379                      void *impl_state)
380 {
381   struct PeerRecord *pr = impl_state;
382
383   GNUNET_assert (NULL != pr->env);
384   GNUNET_MQ_discard (pr->env);
385   pr->env = NULL;
386 }
387
388
389 /**
390  * We had an error processing a message we forwarded from a peer to
391  * the CORE service.  We should just complain about it but otherwise
392  * continue processing.
393  *
394  * @param cls closure
395  * @param error error code
396  */
397 static void
398 core_mq_error_handler (void *cls,
399                        enum GNUNET_MQ_Error error)
400 {
401   /* struct PeerRecord *pr = cls; */
402
403   GNUNET_break_op (0);
404 }
405
406
407 /**
408  * Add the given peer to the list of our connected peers
409  * and create the respective data structures and notify
410  * the application.
411  *
412  * @param h the core handle
413  * @param peer the peer that is connecting to us
414  */
415 static void
416 connect_peer (struct GNUNET_CORE_Handle *h,
417               const struct GNUNET_PeerIdentity *peer)
418 {
419   struct PeerRecord *pr;
420   uint64_t flags;
421   const void *extra;
422
423   pr = GNUNET_new (struct PeerRecord);
424   pr->peer = *peer;
425   pr->h = h;
426   GNUNET_assert (GNUNET_YES ==
427                  GNUNET_CONTAINER_multipeermap_put (h->peers,
428                                                     &pr->peer,
429                                                     pr,
430                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
431   pr->mq = GNUNET_MQ_queue_for_callbacks (&core_mq_send_impl,
432                                           &core_mq_destroy_impl,
433                                           &core_mq_cancel_impl,
434                                           pr,
435                                           h->handlers,
436                                           &core_mq_error_handler,
437                                           pr);
438   /* get our default options */
439   extra = GNUNET_CORE_get_mq_options (GNUNET_NO,
440                                       GNUNET_CORE_PRIO_BEST_EFFORT,
441                                       &flags);
442   GNUNET_MQ_set_options (pr->mq,
443                          flags,
444                          extra);
445   if (NULL != h->connects)
446   {
447     pr->client_cls = h->connects (h->cls,
448                                   &pr->peer,
449                                   pr->mq);
450     GNUNET_MQ_set_handlers_closure (pr->mq,
451                                     pr->client_cls);
452   }
453 }
454
455
456 /**
457  * Handle  init  reply message  received  from  CORE service.   Notify
458  * application  that we  are now  connected  to the  CORE.  Also  fake
459  * loopback connection.
460  *
461  * @param cls the `struct GNUNET_CORE_Handle`
462  * @param m the init reply
463  */
464 static void
465 handle_init_reply (void *cls,
466                    const struct InitReplyMessage *m)
467 {
468   struct GNUNET_CORE_Handle *h = cls;
469   GNUNET_CORE_StartupCallback init;
470
471   GNUNET_break (0 == ntohl (m->reserved));
472   h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
473   if (NULL != (init = h->init))
474   {
475     /* mark so we don't call init on reconnect */
476     h->init = NULL;
477     h->me = m->my_identity;
478     LOG (GNUNET_ERROR_TYPE_DEBUG,
479          "Connected to core service of peer `%s'.\n",
480          GNUNET_i2s (&h->me));
481     h->have_init = GNUNET_YES;
482     init (h->cls,
483           &h->me);
484   }
485   else
486   {
487     LOG (GNUNET_ERROR_TYPE_DEBUG,
488          "Successfully reconnected to core service.\n");
489     if (GNUNET_NO == h->have_init)
490     {
491       h->me = m->my_identity;
492       h->have_init = GNUNET_YES;
493     }
494     else
495     {
496       GNUNET_break (0 == memcmp (&h->me,
497                                  &m->my_identity,
498                                  sizeof (struct GNUNET_PeerIdentity)));
499     }
500   }
501   /* fake 'connect to self' */
502   connect_peer (h,
503                 &h->me);
504 }
505
506
507 /**
508  * Handle connect message received from CORE service.
509  * Notify the application about the new connection.
510  *
511  * @param cls the `struct GNUNET_CORE_Handle`
512  * @param cnm the connect message
513  */
514 static void
515 handle_connect_notify (void *cls,
516                        const struct ConnectNotifyMessage *cnm)
517 {
518   struct GNUNET_CORE_Handle *h = cls;
519   struct PeerRecord *pr;
520
521   LOG (GNUNET_ERROR_TYPE_DEBUG,
522        "Received notification about connection from `%s'.\n",
523        GNUNET_i2s (&cnm->peer));
524   if (0 == memcmp (&h->me,
525                    &cnm->peer,
526                    sizeof (struct GNUNET_PeerIdentity)))
527   {
528     /* connect to self!? */
529     GNUNET_break (0);
530     return;
531   }
532   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
533                                           &cnm->peer);
534   if (NULL != pr)
535   {
536     GNUNET_break (0);
537     reconnect_later (h);
538     return;
539   }
540   connect_peer (h,
541                 &cnm->peer);
542 }
543
544
545 /**
546  * Handle disconnect message received from CORE service.
547  * Notify the application about the lost connection.
548  *
549  * @param cls the `struct GNUNET_CORE_Handle`
550  * @param dnm message about the disconnect event
551  */
552 static void
553 handle_disconnect_notify (void *cls,
554                           const struct DisconnectNotifyMessage *dnm)
555 {
556   struct GNUNET_CORE_Handle *h = cls;
557   struct PeerRecord *pr;
558
559   if (0 == memcmp (&h->me,
560                    &dnm->peer,
561                    sizeof (struct GNUNET_PeerIdentity)))
562   {
563     /* disconnect from self!? */
564     GNUNET_break (0);
565     return;
566   }
567   GNUNET_break (0 == ntohl (dnm->reserved));
568   LOG (GNUNET_ERROR_TYPE_DEBUG,
569        "Received notification about disconnect from `%s'.\n",
570        GNUNET_i2s (&dnm->peer));
571   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
572                                           &dnm->peer);
573   if (NULL == pr)
574   {
575     GNUNET_break (0);
576     reconnect_later (h);
577     return;
578   }
579   disconnect_and_free_peer_entry (h,
580                                   &pr->peer,
581                                   pr);
582 }
583
584
585 /**
586  * Check that message received from CORE service is well-formed.
587  *
588  * @param cls the `struct GNUNET_CORE_Handle`
589  * @param ntm the message we got
590  * @return #GNUNET_OK if the message is well-formed
591  */
592 static int
593 check_notify_inbound (void *cls,
594                       const struct NotifyTrafficMessage *ntm)
595 {
596   uint16_t msize;
597   const struct GNUNET_MessageHeader *em;
598
599   msize = ntohs (ntm->header.size) - sizeof (struct NotifyTrafficMessage);
600   if (msize < sizeof (struct GNUNET_MessageHeader))
601   {
602     GNUNET_break (0);
603     return GNUNET_SYSERR;
604   }
605   em = (const struct GNUNET_MessageHeader *) &ntm[1];
606   if (msize != ntohs (em->size))
607   {
608     GNUNET_break (0);
609     return GNUNET_SYSERR;
610   }
611   return GNUNET_OK;
612 }
613
614
615 /**
616  * Handle inbound message received from CORE service.  If applicable,
617  * notify the application.
618  *
619  * @param cls the `struct GNUNET_CORE_Handle`
620  * @param ntm the message we got from CORE.
621  */
622 static void
623 handle_notify_inbound (void *cls,
624                        const struct NotifyTrafficMessage *ntm)
625 {
626   struct GNUNET_CORE_Handle *h = cls;
627   const struct GNUNET_MessageHeader *em;
628   struct PeerRecord *pr;
629
630   LOG (GNUNET_ERROR_TYPE_DEBUG,
631        "Received inbound message from `%s'.\n",
632        GNUNET_i2s (&ntm->peer));
633   em = (const struct GNUNET_MessageHeader *) &ntm[1];
634   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
635                                           &ntm->peer);
636   if (NULL == pr)
637   {
638     GNUNET_break (0);
639     reconnect_later (h);
640     return;
641   }
642   GNUNET_MQ_inject_message (pr->mq,
643                             em);
644 }
645
646
647 /**
648  * Handle message received from CORE service notifying us that we are
649  * now allowed to send a message to a peer.  If that message is still
650  * pending, put it into the queue to be transmitted.
651  *
652  * @param cls the `struct GNUNET_CORE_Handle`
653  * @param smr the message we got
654  */
655 static void
656 handle_send_ready (void *cls,
657                    const struct SendMessageReady *smr)
658 {
659   struct GNUNET_CORE_Handle *h = cls;
660   struct PeerRecord *pr;
661
662   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
663                                           &smr->peer);
664   if (NULL == pr)
665   {
666     GNUNET_break (0);
667     reconnect_later (h);
668     return;
669   }
670   LOG (GNUNET_ERROR_TYPE_DEBUG,
671        "Received notification about transmission readiness to `%s'.\n",
672        GNUNET_i2s (&smr->peer));
673   if (NULL == pr->env)
674   {
675     /* request must have been cancelled between the original request
676      * and the response from CORE, ignore CORE's readiness */
677     return;
678   }
679   if (ntohs (smr->smr_id) != pr->smr_id_gen)
680   {
681     /* READY message is for expired or cancelled message,
682      * ignore! (we should have already sent another request) */
683     return;
684   }
685
686   /* ok, all good, send message out! */
687   GNUNET_MQ_send (h->mq,
688                   pr->env);
689   pr->env = NULL;
690   GNUNET_MQ_impl_send_continue (pr->mq);
691 }
692
693
694 /**
695  * Our current client connection went down.  Clean it up and try to
696  * reconnect!
697  *
698  * @param h our handle to the core service
699  */
700 static void
701 reconnect (struct GNUNET_CORE_Handle *h)
702 {
703   struct GNUNET_MQ_MessageHandler handlers[] = {
704     GNUNET_MQ_hd_fixed_size (init_reply,
705                              GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY,
706                              struct InitReplyMessage,
707                              h),
708     GNUNET_MQ_hd_fixed_size (connect_notify,
709                              GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT,
710                              struct ConnectNotifyMessage,
711                              h),
712     GNUNET_MQ_hd_fixed_size (disconnect_notify,
713                              GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT,
714                              struct DisconnectNotifyMessage,
715                              h),
716     GNUNET_MQ_hd_var_size (notify_inbound,
717                            GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND,
718                            struct NotifyTrafficMessage,
719                            h),
720     GNUNET_MQ_hd_fixed_size (send_ready,
721                              GNUNET_MESSAGE_TYPE_CORE_SEND_READY,
722                              struct SendMessageReady,
723                              h),
724     GNUNET_MQ_handler_end ()
725   };
726   struct InitMessage *init;
727   struct GNUNET_MQ_Envelope *env;
728   uint16_t *ts;
729
730   GNUNET_assert (NULL == h->mq);
731   h->mq = GNUNET_CLIENT_connecT (h->cfg,
732                                  "core",
733                                  handlers,
734                                  &handle_mq_error,
735                                  h);
736   if (NULL == h->mq)
737   {
738     reconnect_later (h);
739     return;
740   }
741   env = GNUNET_MQ_msg_extra (init,
742                              sizeof (uint16_t) * h->hcnt,
743                              GNUNET_MESSAGE_TYPE_CORE_INIT);
744   LOG (GNUNET_ERROR_TYPE_INFO,
745        "(Re)connecting to CORE service\n");
746   init->options = htonl (0);
747   ts = (uint16_t *) &init[1];
748   for (unsigned int hpos = 0; hpos < h->hcnt; hpos++)
749     ts[hpos] = htons (h->handlers[hpos].type);
750   GNUNET_MQ_send (h->mq,
751                   env);
752 }
753
754
755 /**
756  * Connect to the core service.  Note that the connection may complete
757  * (or fail) asynchronously.
758  *
759  * @param cfg configuration to use
760  * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
761  * @param init callback to call once we have successfully
762  *        connected to the core service
763  * @param connects function to call on peer connect, can be NULL
764  * @param disconnects function to call on peer disconnect / timeout, can be NULL
765  * @param handlers callbacks for messages we care about, NULL-terminated
766  * @return handle to the core service (only useful for disconnect until @a init is called);
767  *                NULL on error (in this case, init is never called)
768  */
769 struct GNUNET_CORE_Handle *
770 GNUNET_CORE_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg,
771                      void *cls,
772                      GNUNET_CORE_StartupCallback init,
773                      GNUNET_CORE_ConnecTEventHandler connects,
774                      GNUNET_CORE_DisconnecTEventHandler disconnects,
775                      const struct GNUNET_MQ_MessageHandler *handlers)
776 {
777   struct GNUNET_CORE_Handle *h;
778   unsigned int hcnt;
779
780   h = GNUNET_new (struct GNUNET_CORE_Handle);
781   h->cfg = cfg;
782   h->cls = cls;
783   h->init = init;
784   h->connects = connects;
785   h->disconnects = disconnects;
786   h->peers = GNUNET_CONTAINER_multipeermap_create (128,
787                                                    GNUNET_NO);
788   hcnt = 0;
789   if (NULL != handlers)
790     while (NULL != handlers[hcnt].cb)
791       hcnt++;
792   h->handlers = GNUNET_new_array (hcnt + 1,
793                                   struct GNUNET_MQ_MessageHandler);
794   if (NULL != handlers)
795     GNUNET_memcpy (h->handlers,
796                    handlers,
797                    hcnt * sizeof (struct GNUNET_MQ_MessageHandler));
798   h->hcnt = hcnt;
799   GNUNET_assert (hcnt <
800                  (GNUNET_SERVER_MAX_MESSAGE_SIZE -
801                   sizeof (struct InitMessage)) / sizeof (uint16_t));
802   LOG (GNUNET_ERROR_TYPE_DEBUG,
803        "Connecting to CORE service\n");
804   reconnect (h);
805   if (NULL == h->mq)
806   {
807     GNUNET_CORE_disconnect (h);
808     return NULL;
809   }
810   return h;
811 }
812
813
814 /**
815  * Disconnect from the core service.
816  *
817  * @param handle connection to core to disconnect
818  */
819 void
820 GNUNET_CORE_disconnecT (struct GNUNET_CORE_Handle *handle)
821 {
822   LOG (GNUNET_ERROR_TYPE_DEBUG,
823        "Disconnecting from CORE service\n");
824   GNUNET_CONTAINER_multipeermap_iterate (handle->peers,
825                                          &disconnect_and_free_peer_entry,
826                                          handle);
827   GNUNET_CONTAINER_multipeermap_destroy (handle->peers);
828   handle->peers = NULL;
829   if (NULL != handle->reconnect_task)
830   {
831     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
832     handle->reconnect_task = NULL;
833   }
834   if (NULL != handle->mq)
835   {
836     GNUNET_MQ_destroy (handle->mq);
837     handle->mq = NULL;
838   }
839   GNUNET_free (handle->handlers);
840   GNUNET_free (handle);
841 }
842
843
844 /**
845  * Obtain the message queue for a connected peer.
846  *
847  * @param h the core handle
848  * @param pid the identity of the peer to check if it has been connected to us
849  * @return NULL if peer is not connected
850  */
851 struct GNUNET_MQ_Handle *
852 GNUNET_CORE_get_mq (const struct GNUNET_CORE_Handle *h,
853                     const struct GNUNET_PeerIdentity *pid)
854 {
855   struct PeerRecord *pr;
856
857   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
858                                           pid);
859   if (NULL == pr)
860     return NULL;
861   return pr->mq;
862 }
863
864
865 /* end of core_api.c */