fix bad free
[oweals/gnunet.git] / src / core / core_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18 /**
19  * @file core/core_api.c
20  * @brief core service; this is the main API for encrypted P2P
21  *        communications
22  * @author Christian Grothoff
23  */
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26 #include "gnunet_constants.h"
27 #include "gnunet_core_service.h"
28 #include "core.h"
29
30 #define LOG(kind,...) GNUNET_log_from (kind, "core-api",__VA_ARGS__)
31
32
33 /**
34  * Information we track for each peer.
35  */
36 struct PeerRecord
37 {
38
39   /**
40    * Corresponding CORE handle.
41    */
42   struct GNUNET_CORE_Handle *h;
43
44   /**
45    * Message queue for the peer.
46    */
47   struct GNUNET_MQ_Handle *mq;
48
49   /**
50    * Message we are currently trying to pass to the CORE service
51    * for this peer (from @e mq).
52    */
53   struct GNUNET_MQ_Envelope *env;
54
55   /**
56    * Value the client returned when we connected, used
57    * as the closure in various places.
58    */
59   void *client_cls;
60
61   /**
62    * Peer the record is about.
63    */
64   struct GNUNET_PeerIdentity peer;
65
66   /**
67    * SendMessageRequest ID generator for this peer.
68    */
69   uint16_t smr_id_gen;
70
71 };
72
73
74 /**
75  * Context for the core service connection.
76  */
77 struct GNUNET_CORE_Handle
78 {
79
80   /**
81    * Configuration we're using.
82    */
83   const struct GNUNET_CONFIGURATION_Handle *cfg;
84
85   /**
86    * Closure for the various callbacks.
87    */
88   void *cls;
89
90   /**
91    * Function to call once we've handshaked with the core service.
92    */
93   GNUNET_CORE_StartupCallback init;
94
95   /**
96    * Function to call whenever we're notified about a peer connecting.
97    */
98   GNUNET_CORE_ConnectEventHandler connects;
99
100   /**
101    * Function to call whenever we're notified about a peer disconnecting.
102    */
103   GNUNET_CORE_DisconnectEventHandler disconnects;
104
105   /**
106    * Function handlers for messages of particular type.
107    */
108   struct GNUNET_MQ_MessageHandler *handlers;
109
110   /**
111    * Our message queue for transmissions to the service.
112    */
113   struct GNUNET_MQ_Handle *mq;
114
115   /**
116    * Hash map listing all of the peers that we are currently
117    * connected to.
118    */
119   struct GNUNET_CONTAINER_MultiPeerMap *peers;
120
121   /**
122    * Identity of this peer.
123    */
124   struct GNUNET_PeerIdentity me;
125
126   /**
127    * ID of reconnect task (if any).
128    */
129   struct GNUNET_SCHEDULER_Task *reconnect_task;
130
131   /**
132    * Current delay we use for re-trying to connect to core.
133    */
134   struct GNUNET_TIME_Relative retry_backoff;
135
136   /**
137    * Number of entries in the handlers array.
138    */
139   unsigned int hcnt;
140
141   /**
142    * Did we ever get INIT?
143    */
144   int have_init;
145
146 };
147
148
149 /**
150  * Our current client connection went down.  Clean it up
151  * and try to reconnect!
152  *
153  * @param h our handle to the core service
154  */
155 static void
156 reconnect (struct GNUNET_CORE_Handle *h);
157
158
159 /**
160  * Task schedule to try to re-connect to core.
161  *
162  * @param cls the `struct GNUNET_CORE_Handle`
163  * @param tc task context
164  */
165 static void
166 reconnect_task (void *cls)
167 {
168   struct GNUNET_CORE_Handle *h = cls;
169
170   h->reconnect_task = NULL;
171   LOG (GNUNET_ERROR_TYPE_DEBUG,
172        "Connecting to CORE service after delay\n");
173   reconnect (h);
174 }
175
176
177 /**
178  * Notify clients about disconnect and free the entry for connected
179  * peer.
180  *
181  * @param cls the `struct GNUNET_CORE_Handle *`
182  * @param key the peer identity (not used)
183  * @param value the `struct PeerRecord` to free.
184  * @return #GNUNET_YES (continue)
185  */
186 static int
187 disconnect_and_free_peer_entry (void *cls,
188                                 const struct GNUNET_PeerIdentity *key,
189                                 void *value)
190 {
191   struct GNUNET_CORE_Handle *h = cls;
192   struct PeerRecord *pr = value;
193
194   GNUNET_assert (pr->h == h);
195   if (NULL != h->disconnects)
196     h->disconnects (h->cls,
197                     &pr->peer,
198                     pr->client_cls);
199   GNUNET_assert (GNUNET_YES ==
200                  GNUNET_CONTAINER_multipeermap_remove (h->peers,
201                                                        key,
202                                                        pr));
203   GNUNET_MQ_destroy (pr->mq);
204   GNUNET_assert (NULL == pr->mq);
205   if (NULL != pr->env)
206   {
207     GNUNET_MQ_discard (pr->env);
208     pr->env = NULL;
209   }
210   GNUNET_free (pr);
211   return GNUNET_YES;
212 }
213
214
215 /**
216  * Close down any existing connection to the CORE service and
217  * try re-establishing it later.
218  *
219  * @param h our handle
220  */
221 static void
222 reconnect_later (struct GNUNET_CORE_Handle *h)
223 {
224   GNUNET_assert (NULL == h->reconnect_task);
225   if (NULL != h->mq)
226   {
227     GNUNET_MQ_destroy (h->mq);
228     h->mq = NULL;
229   }
230   GNUNET_assert (NULL == h->reconnect_task);
231   h->reconnect_task =
232       GNUNET_SCHEDULER_add_delayed (h->retry_backoff,
233                                     &reconnect_task,
234                                     h);
235   GNUNET_CONTAINER_multipeermap_iterate (h->peers,
236                                          &disconnect_and_free_peer_entry,
237                                          h);
238   h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
239 }
240
241
242 /**
243  * Error handler for the message queue to the CORE service.
244  * On errors, we reconnect.
245  *
246  * @param cls closure, a `struct GNUNET_CORE_Handle *`
247  * @param error error code
248  */
249 static void
250 handle_mq_error (void *cls,
251                  enum GNUNET_MQ_Error error)
252 {
253   struct GNUNET_CORE_Handle *h = cls;
254
255   LOG (GNUNET_ERROR_TYPE_DEBUG,
256        "MQ ERROR: %d\n",
257        error);
258   reconnect_later (h);
259 }
260
261
262 /**
263  * Inquire with CORE what options should be set for a message
264  * so that it is transmitted with the given @a priority and
265  * the given @a cork value.
266  *
267  * @param cork desired corking
268  * @param priority desired message priority
269  * @param[out] flags set to `flags` value for #GNUNET_MQ_set_options()
270  * @return `extra` argument to give to #GNUNET_MQ_set_options()
271  */
272 const void *
273 GNUNET_CORE_get_mq_options (int cork,
274                             enum GNUNET_CORE_Priority priority,
275                             uint64_t *flags)
276 {
277   *flags = ((uint64_t) priority) + (((uint64_t) cork) << 32);
278   return NULL;
279 }
280
281
282 /**
283  * Implement sending functionality of a message queue for
284  * us sending messages to a peer.
285  *
286  * @param mq the message queue
287  * @param msg the message to send
288  * @param impl_state state of the implementation
289  */
290 static void
291 core_mq_send_impl (struct GNUNET_MQ_Handle *mq,
292                    const struct GNUNET_MessageHeader *msg,
293                    void *impl_state)
294 {
295   struct PeerRecord *pr = impl_state;
296   struct GNUNET_CORE_Handle *h = pr->h;
297   struct SendMessageRequest *smr;
298   struct SendMessage *sm;
299   struct GNUNET_MQ_Envelope *env;
300   uint16_t msize;
301   uint64_t flags;
302   int cork;
303   enum GNUNET_CORE_Priority priority;
304
305   if (NULL == h->mq)
306   {
307     /* We're currently reconnecting, pretend this worked */
308     GNUNET_MQ_impl_send_continue (mq);
309     return;
310   }
311   GNUNET_assert (NULL == pr->env);
312   /* extract options from envelope */
313   env = GNUNET_MQ_get_current_envelope (mq);
314   GNUNET_break (NULL ==
315                 GNUNET_MQ_env_get_options (env,
316                                            &flags));
317   cork = (int) (flags >> 32);
318   priority = (uint32_t) flags;
319
320   /* check message size for sanity */
321   msize = ntohs (msg->size);
322   if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof (struct SendMessage))
323   {
324     GNUNET_break (0);
325     GNUNET_MQ_impl_send_continue (mq);
326     return;
327   }
328
329   /* ask core for transmission */
330   LOG (GNUNET_ERROR_TYPE_DEBUG,
331        "Asking core for transmission of %u bytes to `%s'\n",
332        (unsigned int) msize,
333        GNUNET_i2s (&pr->peer));
334   env = GNUNET_MQ_msg (smr,
335                        GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
336   smr->priority = htonl ((uint32_t) priority);
337   smr->peer = pr->peer;
338   smr->reserved = htonl (0);
339   smr->size = htons (msize);
340   smr->smr_id = htons (++pr->smr_id_gen);
341   GNUNET_MQ_send (h->mq,
342                   env);
343
344   /* prepare message with actual transmission data */
345   pr->env = GNUNET_MQ_msg_nested_mh (sm,
346                                      GNUNET_MESSAGE_TYPE_CORE_SEND,
347                                      msg);
348   sm->priority = htonl ((uint32_t) priority);
349   sm->peer = pr->peer;
350   sm->cork = htonl ((uint32_t) cork);
351   sm->reserved = htonl (0);
352   LOG (GNUNET_ERROR_TYPE_DEBUG,
353        "Calling get_message with buffer of %u bytes (%s)\n",
354        (unsigned int) msize,
355        cork ? "corked" : "uncorked");
356 }
357
358
359 /**
360  * Handle destruction of a message queue.  Implementations must not
361  * free @a mq, but should take care of @a impl_state.
362  *
363  * @param mq the message queue to destroy
364  * @param impl_state state of the implementation
365  */
366 static void
367 core_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
368                       void *impl_state)
369 {
370   struct PeerRecord *pr = impl_state;
371
372   GNUNET_assert (mq == pr->mq);
373   pr->mq = NULL;
374 }
375
376
377 /**
378  * Implementation function that cancels the currently sent message.
379  * Should basically undo whatever #mq_send_impl() did.
380  *
381  * @param mq message queue
382  * @param impl_state state specific to the implementation
383  */
384 static void
385 core_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
386                      void *impl_state)
387 {
388   struct PeerRecord *pr = impl_state;
389
390   GNUNET_assert (NULL != pr->env);
391   GNUNET_MQ_discard (pr->env);
392   pr->env = NULL;
393 }
394
395
396 /**
397  * We had an error processing a message we forwarded from a peer to
398  * the CORE service.  We should just complain about it but otherwise
399  * continue processing.
400  *
401  * @param cls closure
402  * @param error error code
403  */
404 static void
405 core_mq_error_handler (void *cls,
406                        enum GNUNET_MQ_Error error)
407 {
408   /* struct PeerRecord *pr = cls; */
409
410   GNUNET_break_op (0);
411 }
412
413
414 /**
415  * Add the given peer to the list of our connected peers
416  * and create the respective data structures and notify
417  * the application.
418  *
419  * @param h the core handle
420  * @param peer the peer that is connecting to us
421  */
422 static void
423 connect_peer (struct GNUNET_CORE_Handle *h,
424               const struct GNUNET_PeerIdentity *peer)
425 {
426   struct PeerRecord *pr;
427   uint64_t flags;
428   const void *extra;
429
430   pr = GNUNET_new (struct PeerRecord);
431   pr->peer = *peer;
432   pr->h = h;
433   GNUNET_assert (GNUNET_YES ==
434                  GNUNET_CONTAINER_multipeermap_put (h->peers,
435                                                     &pr->peer,
436                                                     pr,
437                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
438   pr->mq = GNUNET_MQ_queue_for_callbacks (&core_mq_send_impl,
439                                           &core_mq_destroy_impl,
440                                           &core_mq_cancel_impl,
441                                           pr,
442                                           h->handlers,
443                                           &core_mq_error_handler,
444                                           pr);
445   /* get our default options */
446   extra = GNUNET_CORE_get_mq_options (GNUNET_NO,
447                                       GNUNET_CORE_PRIO_BEST_EFFORT,
448                                       &flags);
449   GNUNET_MQ_set_options (pr->mq,
450                          flags,
451                          extra);
452   if (NULL != h->connects)
453   {
454     pr->client_cls = h->connects (h->cls,
455                                   &pr->peer,
456                                   pr->mq);
457     GNUNET_MQ_set_handlers_closure (pr->mq,
458                                     pr->client_cls);
459   }
460 }
461
462
463 /**
464  * Handle  init  reply message  received  from  CORE service.   Notify
465  * application  that we  are now  connected  to the  CORE.  Also  fake
466  * loopback connection.
467  *
468  * @param cls the `struct GNUNET_CORE_Handle`
469  * @param m the init reply
470  */
471 static void
472 handle_init_reply (void *cls,
473                    const struct InitReplyMessage *m)
474 {
475   struct GNUNET_CORE_Handle *h = cls;
476   GNUNET_CORE_StartupCallback init;
477
478   GNUNET_break (0 == ntohl (m->reserved));
479   h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
480   if (NULL != (init = h->init))
481   {
482     /* mark so we don't call init on reconnect */
483     h->init = NULL;
484     h->me = m->my_identity;
485     LOG (GNUNET_ERROR_TYPE_DEBUG,
486          "Connected to core service of peer `%s'.\n",
487          GNUNET_i2s (&h->me));
488     h->have_init = GNUNET_YES;
489     init (h->cls,
490           &h->me);
491   }
492   else
493   {
494     LOG (GNUNET_ERROR_TYPE_DEBUG,
495          "Successfully reconnected to core service.\n");
496     if (GNUNET_NO == h->have_init)
497     {
498       h->me = m->my_identity;
499       h->have_init = GNUNET_YES;
500     }
501     else
502     {
503       GNUNET_break (0 == memcmp (&h->me,
504                                  &m->my_identity,
505                                  sizeof (struct GNUNET_PeerIdentity)));
506     }
507   }
508   /* fake 'connect to self' */
509   connect_peer (h,
510                 &h->me);
511 }
512
513
514 /**
515  * Handle connect message received from CORE service.
516  * Notify the application about the new connection.
517  *
518  * @param cls the `struct GNUNET_CORE_Handle`
519  * @param cnm the connect message
520  */
521 static void
522 handle_connect_notify (void *cls,
523                        const struct ConnectNotifyMessage *cnm)
524 {
525   struct GNUNET_CORE_Handle *h = cls;
526   struct PeerRecord *pr;
527
528   LOG (GNUNET_ERROR_TYPE_DEBUG,
529        "Received notification about connection from `%s'.\n",
530        GNUNET_i2s (&cnm->peer));
531   if (0 == memcmp (&h->me,
532                    &cnm->peer,
533                    sizeof (struct GNUNET_PeerIdentity)))
534   {
535     /* connect to self!? */
536     GNUNET_break (0);
537     return;
538   }
539   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
540                                           &cnm->peer);
541   if (NULL != pr)
542   {
543     GNUNET_break (0);
544     reconnect_later (h);
545     return;
546   }
547   connect_peer (h,
548                 &cnm->peer);
549 }
550
551
552 /**
553  * Handle disconnect message received from CORE service.
554  * Notify the application about the lost connection.
555  *
556  * @param cls the `struct GNUNET_CORE_Handle`
557  * @param dnm message about the disconnect event
558  */
559 static void
560 handle_disconnect_notify (void *cls,
561                           const struct DisconnectNotifyMessage *dnm)
562 {
563   struct GNUNET_CORE_Handle *h = cls;
564   struct PeerRecord *pr;
565
566   if (0 == memcmp (&h->me,
567                    &dnm->peer,
568                    sizeof (struct GNUNET_PeerIdentity)))
569   {
570     /* disconnect from self!? */
571     GNUNET_break (0);
572     return;
573   }
574   GNUNET_break (0 == ntohl (dnm->reserved));
575   LOG (GNUNET_ERROR_TYPE_DEBUG,
576        "Received notification about disconnect from `%s'.\n",
577        GNUNET_i2s (&dnm->peer));
578   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
579                                           &dnm->peer);
580   if (NULL == pr)
581   {
582     GNUNET_break (0);
583     reconnect_later (h);
584     return;
585   }
586   disconnect_and_free_peer_entry (h,
587                                   &pr->peer,
588                                   pr);
589 }
590
591
592 /**
593  * Check that message received from CORE service is well-formed.
594  *
595  * @param cls the `struct GNUNET_CORE_Handle`
596  * @param ntm the message we got
597  * @return #GNUNET_OK if the message is well-formed
598  */
599 static int
600 check_notify_inbound (void *cls,
601                       const struct NotifyTrafficMessage *ntm)
602 {
603   uint16_t msize;
604   const struct GNUNET_MessageHeader *em;
605
606   msize = ntohs (ntm->header.size) - sizeof (struct NotifyTrafficMessage);
607   if (msize < sizeof (struct GNUNET_MessageHeader))
608   {
609     GNUNET_break (0);
610     return GNUNET_SYSERR;
611   }
612   em = (const struct GNUNET_MessageHeader *) &ntm[1];
613   if (msize != ntohs (em->size))
614   {
615     GNUNET_break (0);
616     return GNUNET_SYSERR;
617   }
618   return GNUNET_OK;
619 }
620
621
622 /**
623  * Handle inbound message received from CORE service.  If applicable,
624  * notify the application.
625  *
626  * @param cls the `struct GNUNET_CORE_Handle`
627  * @param ntm the message we got from CORE.
628  */
629 static void
630 handle_notify_inbound (void *cls,
631                        const struct NotifyTrafficMessage *ntm)
632 {
633   struct GNUNET_CORE_Handle *h = cls;
634   const struct GNUNET_MessageHeader *em;
635   struct PeerRecord *pr;
636
637   LOG (GNUNET_ERROR_TYPE_DEBUG,
638        "Received inbound message from `%s'.\n",
639        GNUNET_i2s (&ntm->peer));
640   em = (const struct GNUNET_MessageHeader *) &ntm[1];
641   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
642                                           &ntm->peer);
643   if (NULL == pr)
644   {
645     GNUNET_break (0);
646     reconnect_later (h);
647     return;
648   }
649   GNUNET_MQ_inject_message (pr->mq,
650                             em);
651 }
652
653
654 /**
655  * Handle message received from CORE service notifying us that we are
656  * now allowed to send a message to a peer.  If that message is still
657  * pending, put it into the queue to be transmitted.
658  *
659  * @param cls the `struct GNUNET_CORE_Handle`
660  * @param smr the message we got
661  */
662 static void
663 handle_send_ready (void *cls,
664                    const struct SendMessageReady *smr)
665 {
666   struct GNUNET_CORE_Handle *h = cls;
667   struct PeerRecord *pr;
668
669   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
670                                           &smr->peer);
671   if (NULL == pr)
672   {
673     GNUNET_break (0);
674     reconnect_later (h);
675     return;
676   }
677   LOG (GNUNET_ERROR_TYPE_DEBUG,
678        "Received notification about transmission readiness to `%s'.\n",
679        GNUNET_i2s (&smr->peer));
680   if (NULL == pr->env)
681   {
682     /* request must have been cancelled between the original request
683      * and the response from CORE, ignore CORE's readiness */
684     return;
685   }
686   if (ntohs (smr->smr_id) != pr->smr_id_gen)
687   {
688     /* READY message is for expired or cancelled message,
689      * ignore! (we should have already sent another request) */
690     return;
691   }
692
693   /* ok, all good, send message out! */
694   GNUNET_MQ_send (h->mq,
695                   pr->env);
696   pr->env = NULL;
697   GNUNET_MQ_impl_send_continue (pr->mq);
698 }
699
700
701 /**
702  * Our current client connection went down.  Clean it up and try to
703  * reconnect!
704  *
705  * @param h our handle to the core service
706  */
707 static void
708 reconnect (struct GNUNET_CORE_Handle *h)
709 {
710   struct GNUNET_MQ_MessageHandler handlers[] = {
711     GNUNET_MQ_hd_fixed_size (init_reply,
712                              GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY,
713                              struct InitReplyMessage,
714                              h),
715     GNUNET_MQ_hd_fixed_size (connect_notify,
716                              GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT,
717                              struct ConnectNotifyMessage,
718                              h),
719     GNUNET_MQ_hd_fixed_size (disconnect_notify,
720                              GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT,
721                              struct DisconnectNotifyMessage,
722                              h),
723     GNUNET_MQ_hd_var_size (notify_inbound,
724                            GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND,
725                            struct NotifyTrafficMessage,
726                            h),
727     GNUNET_MQ_hd_fixed_size (send_ready,
728                              GNUNET_MESSAGE_TYPE_CORE_SEND_READY,
729                              struct SendMessageReady,
730                              h),
731     GNUNET_MQ_handler_end ()
732   };
733   struct InitMessage *init;
734   struct GNUNET_MQ_Envelope *env;
735   uint16_t *ts;
736
737   GNUNET_assert (NULL == h->mq);
738   h->mq = GNUNET_CLIENT_connect (h->cfg,
739                                  "core",
740                                  handlers,
741                                  &handle_mq_error,
742                                  h);
743   if (NULL == h->mq)
744   {
745     reconnect_later (h);
746     return;
747   }
748   env = GNUNET_MQ_msg_extra (init,
749                              sizeof (uint16_t) * h->hcnt,
750                              GNUNET_MESSAGE_TYPE_CORE_INIT);
751   LOG (GNUNET_ERROR_TYPE_INFO,
752        "(Re)connecting to CORE service\n");
753   init->options = htonl (0);
754   ts = (uint16_t *) &init[1];
755   for (unsigned int hpos = 0; hpos < h->hcnt; hpos++)
756     ts[hpos] = htons (h->handlers[hpos].type);
757   GNUNET_MQ_send (h->mq,
758                   env);
759 }
760
761
762 /**
763  * Connect to the core service.  Note that the connection may complete
764  * (or fail) asynchronously.
765  *
766  * @param cfg configuration to use
767  * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
768  * @param init callback to call once we have successfully
769  *        connected to the core service
770  * @param connects function to call on peer connect, can be NULL
771  * @param disconnects function to call on peer disconnect / timeout, can be NULL
772  * @param handlers callbacks for messages we care about, NULL-terminated
773  * @return handle to the core service (only useful for disconnect until @a init is called);
774  *                NULL on error (in this case, init is never called)
775  */
776 struct GNUNET_CORE_Handle *
777 GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
778                      void *cls,
779                      GNUNET_CORE_StartupCallback init,
780                      GNUNET_CORE_ConnectEventHandler connects,
781                      GNUNET_CORE_DisconnectEventHandler disconnects,
782                      const struct GNUNET_MQ_MessageHandler *handlers)
783 {
784   struct GNUNET_CORE_Handle *h;
785
786   h = GNUNET_new (struct GNUNET_CORE_Handle);
787   h->cfg = cfg;
788   h->cls = cls;
789   h->init = init;
790   h->connects = connects;
791   h->disconnects = disconnects;
792   h->peers = GNUNET_CONTAINER_multipeermap_create (128,
793                                                    GNUNET_NO);
794   h->handlers = GNUNET_MQ_copy_handlers (handlers);
795   h->hcnt = GNUNET_MQ_count_handlers (handlers);
796   GNUNET_assert (h->hcnt <
797                  (GNUNET_MAX_MESSAGE_SIZE -
798                   sizeof (struct InitMessage)) / sizeof (uint16_t));
799   LOG (GNUNET_ERROR_TYPE_DEBUG,
800        "Connecting to CORE service\n");
801   reconnect (h);
802   if (NULL == h->mq)
803   {
804     GNUNET_CORE_disconnect (h);
805     return NULL;
806   }
807   return h;
808 }
809
810
811 /**
812  * Disconnect from the core service.
813  *
814  * @param handle connection to core to disconnect
815  */
816 void
817 GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
818 {
819   LOG (GNUNET_ERROR_TYPE_DEBUG,
820        "Disconnecting from CORE service\n");
821   GNUNET_CONTAINER_multipeermap_iterate (handle->peers,
822                                          &disconnect_and_free_peer_entry,
823                                          handle);
824   GNUNET_CONTAINER_multipeermap_destroy (handle->peers);
825   handle->peers = NULL;
826   if (NULL != handle->reconnect_task)
827   {
828     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
829     handle->reconnect_task = NULL;
830   }
831   if (NULL != handle->mq)
832   {
833     GNUNET_MQ_destroy (handle->mq);
834     handle->mq = NULL;
835   }
836   GNUNET_free_non_null (handle->handlers);
837   GNUNET_free (handle);
838 }
839
840
841 /**
842  * Obtain the message queue for a connected peer.
843  *
844  * @param h the core handle
845  * @param pid the identity of the peer to check if it has been connected to us
846  * @return NULL if peer is not connected
847  */
848 struct GNUNET_MQ_Handle *
849 GNUNET_CORE_get_mq (const struct GNUNET_CORE_Handle *h,
850                     const struct GNUNET_PeerIdentity *pid)
851 {
852   struct PeerRecord *pr;
853
854   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
855                                           pid);
856   if (NULL == pr)
857     return NULL;
858   return pr->mq;
859 }
860
861
862 /* end of core_api.c */