fix leak
[oweals/gnunet.git] / src / core / core_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file core/core_api.c
22  * @brief core service; this is the main API for encrypted P2P
23  *        communications
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_core_service.h"
30 #include "core.h"
31
32 #define LOG(kind,...) GNUNET_log_from (kind, "core-api",__VA_ARGS__)
33
34
35 /**
36  * Information we track for each peer.
37  */
38 struct PeerRecord
39 {
40
41   /**
42    * Corresponding CORE handle.
43    */
44   struct GNUNET_CORE_Handle *h;
45
46   /**
47    * Message queue for the peer.
48    */
49   struct GNUNET_MQ_Handle *mq;
50
51   /**
52    * Message we are currently trying to pass to the CORE service
53    * for this peer (from @e mq).
54    */
55   struct GNUNET_MQ_Envelope *env;
56
57   /**
58    * Value the client returned when we connected, used
59    * as the closure in various places.
60    */
61   void *client_cls;
62
63   /**
64    * Peer the record is about.
65    */
66   struct GNUNET_PeerIdentity peer;
67
68   /**
69    * SendMessageRequest ID generator for this peer.
70    */
71   uint16_t smr_id_gen;
72
73 };
74
75
76 /**
77  * Context for the core service connection.
78  */
79 struct GNUNET_CORE_Handle
80 {
81
82   /**
83    * Configuration we're using.
84    */
85   const struct GNUNET_CONFIGURATION_Handle *cfg;
86
87   /**
88    * Closure for the various callbacks.
89    */
90   void *cls;
91
92   /**
93    * Function to call once we've handshaked with the core service.
94    */
95   GNUNET_CORE_StartupCallback init;
96
97   /**
98    * Function to call whenever we're notified about a peer connecting.
99    */
100   GNUNET_CORE_ConnectEventHandler connects;
101
102   /**
103    * Function to call whenever we're notified about a peer disconnecting.
104    */
105   GNUNET_CORE_DisconnectEventHandler disconnects;
106
107   /**
108    * Function handlers for messages of particular type.
109    */
110   struct GNUNET_MQ_MessageHandler *handlers;
111
112   /**
113    * Our message queue for transmissions to the service.
114    */
115   struct GNUNET_MQ_Handle *mq;
116
117   /**
118    * Hash map listing all of the peers that we are currently
119    * connected to.
120    */
121   struct GNUNET_CONTAINER_MultiPeerMap *peers;
122
123   /**
124    * Identity of this peer.
125    */
126   struct GNUNET_PeerIdentity me;
127
128   /**
129    * ID of reconnect task (if any).
130    */
131   struct GNUNET_SCHEDULER_Task *reconnect_task;
132
133   /**
134    * Current delay we use for re-trying to connect to core.
135    */
136   struct GNUNET_TIME_Relative retry_backoff;
137
138   /**
139    * Number of entries in the handlers array.
140    */
141   unsigned int hcnt;
142
143   /**
144    * Did we ever get INIT?
145    */
146   int have_init;
147
148 };
149
150
151 /**
152  * Our current client connection went down.  Clean it up
153  * and try to reconnect!
154  *
155  * @param h our handle to the core service
156  */
157 static void
158 reconnect (struct GNUNET_CORE_Handle *h);
159
160
161 /**
162  * Task schedule to try to re-connect to core.
163  *
164  * @param cls the `struct GNUNET_CORE_Handle`
165  * @param tc task context
166  */
167 static void
168 reconnect_task (void *cls)
169 {
170   struct GNUNET_CORE_Handle *h = cls;
171
172   h->reconnect_task = NULL;
173   LOG (GNUNET_ERROR_TYPE_DEBUG,
174        "Connecting to CORE service after delay\n");
175   reconnect (h);
176 }
177
178
179 /**
180  * Notify clients about disconnect and free the entry for connected
181  * peer.
182  *
183  * @param cls the `struct GNUNET_CORE_Handle *`
184  * @param key the peer identity (not used)
185  * @param value the `struct PeerRecord` to free.
186  * @return #GNUNET_YES (continue)
187  */
188 static int
189 disconnect_and_free_peer_entry (void *cls,
190                                 const struct GNUNET_PeerIdentity *key,
191                                 void *value)
192 {
193   struct GNUNET_CORE_Handle *h = cls;
194   struct PeerRecord *pr = value;
195
196   GNUNET_assert (pr->h == h);
197   if (NULL != h->disconnects)
198     h->disconnects (h->cls,
199                     &pr->peer,
200                     pr->client_cls);
201   GNUNET_assert (GNUNET_YES ==
202                  GNUNET_CONTAINER_multipeermap_remove (h->peers,
203                                                        key,
204                                                        pr));
205   GNUNET_MQ_destroy (pr->mq);
206   GNUNET_assert (NULL == pr->mq);
207   if (NULL != pr->env)
208   {
209     GNUNET_MQ_discard (pr->env);
210     pr->env = NULL;
211   }
212   GNUNET_free (pr);
213   return GNUNET_YES;
214 }
215
216
217 /**
218  * Close down any existing connection to the CORE service and
219  * try re-establishing it later.
220  *
221  * @param h our handle
222  */
223 static void
224 reconnect_later (struct GNUNET_CORE_Handle *h)
225 {
226   GNUNET_assert (NULL == h->reconnect_task);
227   if (NULL != h->mq)
228   {
229     GNUNET_MQ_destroy (h->mq);
230     h->mq = NULL;
231   }
232   GNUNET_assert (NULL == h->reconnect_task);
233   h->reconnect_task =
234       GNUNET_SCHEDULER_add_delayed (h->retry_backoff,
235                                     &reconnect_task,
236                                     h);
237   GNUNET_CONTAINER_multipeermap_iterate (h->peers,
238                                          &disconnect_and_free_peer_entry,
239                                          h);
240   h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
241 }
242
243
244 /**
245  * Error handler for the message queue to the CORE service.
246  * On errors, we reconnect.
247  *
248  * @param cls closure, a `struct GNUNET_CORE_Handle *`
249  * @param error error code
250  */
251 static void
252 handle_mq_error (void *cls,
253                  enum GNUNET_MQ_Error error)
254 {
255   struct GNUNET_CORE_Handle *h = cls;
256
257   LOG (GNUNET_ERROR_TYPE_DEBUG,
258        "MQ ERROR: %d\n",
259        error);
260   reconnect_later (h);
261 }
262
263
264 /**
265  * Inquire with CORE what options should be set for a message
266  * so that it is transmitted with the given @a priority and
267  * the given @a cork value.
268  *
269  * @param cork desired corking
270  * @param priority desired message priority
271  * @param[out] flags set to `flags` value for #GNUNET_MQ_set_options()
272  * @return `extra` argument to give to #GNUNET_MQ_set_options()
273  */
274 const void *
275 GNUNET_CORE_get_mq_options (int cork,
276                             enum GNUNET_CORE_Priority priority,
277                             uint64_t *flags)
278 {
279   *flags = ((uint64_t) priority) + (((uint64_t) cork) << 32);
280   return NULL;
281 }
282
283
284 /**
285  * Implement sending functionality of a message queue for
286  * us sending messages to a peer.
287  *
288  * @param mq the message queue
289  * @param msg the message to send
290  * @param impl_state state of the implementation
291  */
292 static void
293 core_mq_send_impl (struct GNUNET_MQ_Handle *mq,
294                    const struct GNUNET_MessageHeader *msg,
295                    void *impl_state)
296 {
297   struct PeerRecord *pr = impl_state;
298   struct GNUNET_CORE_Handle *h = pr->h;
299   struct SendMessageRequest *smr;
300   struct SendMessage *sm;
301   struct GNUNET_MQ_Envelope *env;
302   uint16_t msize;
303   uint64_t flags;
304   int cork;
305   enum GNUNET_CORE_Priority priority;
306
307   if (NULL == h->mq)
308   {
309     /* We're currently reconnecting, pretend this worked */
310     GNUNET_MQ_impl_send_continue (mq);
311     return;
312   }
313   GNUNET_assert (NULL == pr->env);
314   /* extract options from envelope */
315   env = GNUNET_MQ_get_current_envelope (mq);
316   GNUNET_break (NULL ==
317                 GNUNET_MQ_env_get_options (env,
318                                            &flags));
319   cork = (int) (flags >> 32);
320   priority = (uint32_t) flags;
321
322   /* check message size for sanity */
323   msize = ntohs (msg->size);
324   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct SendMessage))
325   {
326     GNUNET_break (0);
327     GNUNET_MQ_impl_send_continue (mq);
328     return;
329   }
330
331   /* ask core for transmission */
332   LOG (GNUNET_ERROR_TYPE_DEBUG,
333        "Asking core for transmission of %u bytes to `%s'\n",
334        (unsigned int) msize,
335        GNUNET_i2s (&pr->peer));
336   env = GNUNET_MQ_msg (smr,
337                        GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
338   smr->priority = htonl ((uint32_t) priority);
339   // smr->deadline = GNUNET_TIME_absolute_hton (deadline);
340   smr->peer = pr->peer;
341   smr->reserved = htonl (0);
342   smr->size = htons (msize);
343   smr->smr_id = htons (++pr->smr_id_gen);
344   GNUNET_MQ_send (h->mq,
345                   env);
346
347   /* prepare message with actual transmission data */
348   pr->env = GNUNET_MQ_msg_nested_mh (sm,
349                                      GNUNET_MESSAGE_TYPE_CORE_SEND,
350                                      msg);
351   sm->priority = htonl ((uint32_t) priority);
352   // sm->deadline = GNUNET_TIME_absolute_hton (deadline);
353   sm->peer = pr->peer;
354   sm->cork = htonl ((uint32_t) cork);
355   sm->reserved = htonl (0);
356   LOG (GNUNET_ERROR_TYPE_DEBUG,
357        "Calling get_message with buffer of %u bytes (%s)\n",
358        (unsigned int) msize,
359        cork ? "corked" : "uncorked");
360 }
361
362
363 /**
364  * Handle destruction of a message queue.  Implementations must not
365  * free @a mq, but should take care of @a impl_state.
366  *
367  * @param mq the message queue to destroy
368  * @param impl_state state of the implementation
369  */
370 static void
371 core_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
372                       void *impl_state)
373 {
374   struct PeerRecord *pr = impl_state;
375
376   GNUNET_assert (mq == pr->mq);
377   pr->mq = NULL;
378 }
379
380
381 /**
382  * Implementation function that cancels the currently sent message.
383  * Should basically undo whatever #mq_send_impl() did.
384  *
385  * @param mq message queue
386  * @param impl_state state specific to the implementation
387  */
388 static void
389 core_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
390                      void *impl_state)
391 {
392   struct PeerRecord *pr = impl_state;
393
394   GNUNET_assert (NULL != pr->env);
395   GNUNET_MQ_discard (pr->env);
396   pr->env = NULL;
397 }
398
399
400 /**
401  * We had an error processing a message we forwarded from a peer to
402  * the CORE service.  We should just complain about it but otherwise
403  * continue processing.
404  *
405  * @param cls closure
406  * @param error error code
407  */
408 static void
409 core_mq_error_handler (void *cls,
410                        enum GNUNET_MQ_Error error)
411 {
412   /* struct PeerRecord *pr = cls; */
413
414   GNUNET_break_op (0);
415 }
416
417
418 /**
419  * Add the given peer to the list of our connected peers
420  * and create the respective data structures and notify
421  * the application.
422  *
423  * @param h the core handle
424  * @param peer the peer that is connecting to us
425  */
426 static void
427 connect_peer (struct GNUNET_CORE_Handle *h,
428               const struct GNUNET_PeerIdentity *peer)
429 {
430   struct PeerRecord *pr;
431   uint64_t flags;
432   const void *extra;
433
434   pr = GNUNET_new (struct PeerRecord);
435   pr->peer = *peer;
436   pr->h = h;
437   GNUNET_assert (GNUNET_YES ==
438                  GNUNET_CONTAINER_multipeermap_put (h->peers,
439                                                     &pr->peer,
440                                                     pr,
441                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
442   pr->mq = GNUNET_MQ_queue_for_callbacks (&core_mq_send_impl,
443                                           &core_mq_destroy_impl,
444                                           &core_mq_cancel_impl,
445                                           pr,
446                                           h->handlers,
447                                           &core_mq_error_handler,
448                                           pr);
449   /* get our default options */
450   extra = GNUNET_CORE_get_mq_options (GNUNET_NO,
451                                       GNUNET_CORE_PRIO_BEST_EFFORT,
452                                       &flags);
453   GNUNET_MQ_set_options (pr->mq,
454                          flags,
455                          extra);
456   if (NULL != h->connects)
457   {
458     pr->client_cls = h->connects (h->cls,
459                                   &pr->peer,
460                                   pr->mq);
461     GNUNET_MQ_set_handlers_closure (pr->mq,
462                                     pr->client_cls);
463   }
464 }
465
466
467 /**
468  * Handle  init  reply message  received  from  CORE service.   Notify
469  * application  that we  are now  connected  to the  CORE.  Also  fake
470  * loopback connection.
471  *
472  * @param cls the `struct GNUNET_CORE_Handle`
473  * @param m the init reply
474  */
475 static void
476 handle_init_reply (void *cls,
477                    const struct InitReplyMessage *m)
478 {
479   struct GNUNET_CORE_Handle *h = cls;
480   GNUNET_CORE_StartupCallback init;
481
482   GNUNET_break (0 == ntohl (m->reserved));
483   h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
484   if (NULL != (init = h->init))
485   {
486     /* mark so we don't call init on reconnect */
487     h->init = NULL;
488     h->me = m->my_identity;
489     LOG (GNUNET_ERROR_TYPE_DEBUG,
490          "Connected to core service of peer `%s'.\n",
491          GNUNET_i2s (&h->me));
492     h->have_init = GNUNET_YES;
493     init (h->cls,
494           &h->me);
495   }
496   else
497   {
498     LOG (GNUNET_ERROR_TYPE_DEBUG,
499          "Successfully reconnected to core service.\n");
500     if (GNUNET_NO == h->have_init)
501     {
502       h->me = m->my_identity;
503       h->have_init = GNUNET_YES;
504     }
505     else
506     {
507       GNUNET_break (0 == memcmp (&h->me,
508                                  &m->my_identity,
509                                  sizeof (struct GNUNET_PeerIdentity)));
510     }
511   }
512   /* fake 'connect to self' */
513   connect_peer (h,
514                 &h->me);
515 }
516
517
518 /**
519  * Handle connect message received from CORE service.
520  * Notify the application about the new connection.
521  *
522  * @param cls the `struct GNUNET_CORE_Handle`
523  * @param cnm the connect message
524  */
525 static void
526 handle_connect_notify (void *cls,
527                        const struct ConnectNotifyMessage *cnm)
528 {
529   struct GNUNET_CORE_Handle *h = cls;
530   struct PeerRecord *pr;
531
532   LOG (GNUNET_ERROR_TYPE_DEBUG,
533        "Received notification about connection from `%s'.\n",
534        GNUNET_i2s (&cnm->peer));
535   if (0 == memcmp (&h->me,
536                    &cnm->peer,
537                    sizeof (struct GNUNET_PeerIdentity)))
538   {
539     /* connect to self!? */
540     GNUNET_break (0);
541     return;
542   }
543   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
544                                           &cnm->peer);
545   if (NULL != pr)
546   {
547     GNUNET_break (0);
548     reconnect_later (h);
549     return;
550   }
551   connect_peer (h,
552                 &cnm->peer);
553 }
554
555
556 /**
557  * Handle disconnect message received from CORE service.
558  * Notify the application about the lost connection.
559  *
560  * @param cls the `struct GNUNET_CORE_Handle`
561  * @param dnm message about the disconnect event
562  */
563 static void
564 handle_disconnect_notify (void *cls,
565                           const struct DisconnectNotifyMessage *dnm)
566 {
567   struct GNUNET_CORE_Handle *h = cls;
568   struct PeerRecord *pr;
569
570   if (0 == memcmp (&h->me,
571                    &dnm->peer,
572                    sizeof (struct GNUNET_PeerIdentity)))
573   {
574     /* disconnect from self!? */
575     GNUNET_break (0);
576     return;
577   }
578   GNUNET_break (0 == ntohl (dnm->reserved));
579   LOG (GNUNET_ERROR_TYPE_DEBUG,
580        "Received notification about disconnect from `%s'.\n",
581        GNUNET_i2s (&dnm->peer));
582   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
583                                           &dnm->peer);
584   if (NULL == pr)
585   {
586     GNUNET_break (0);
587     reconnect_later (h);
588     return;
589   }
590   disconnect_and_free_peer_entry (h,
591                                   &pr->peer,
592                                   pr);
593 }
594
595
596 /**
597  * Check that message received from CORE service is well-formed.
598  *
599  * @param cls the `struct GNUNET_CORE_Handle`
600  * @param ntm the message we got
601  * @return #GNUNET_OK if the message is well-formed
602  */
603 static int
604 check_notify_inbound (void *cls,
605                       const struct NotifyTrafficMessage *ntm)
606 {
607   uint16_t msize;
608   const struct GNUNET_MessageHeader *em;
609
610   msize = ntohs (ntm->header.size) - sizeof (struct NotifyTrafficMessage);
611   if (msize < sizeof (struct GNUNET_MessageHeader))
612   {
613     GNUNET_break (0);
614     return GNUNET_SYSERR;
615   }
616   em = (const struct GNUNET_MessageHeader *) &ntm[1];
617   if (msize != ntohs (em->size))
618   {
619     GNUNET_break (0);
620     return GNUNET_SYSERR;
621   }
622   return GNUNET_OK;
623 }
624
625
626 /**
627  * Handle inbound message received from CORE service.  If applicable,
628  * notify the application.
629  *
630  * @param cls the `struct GNUNET_CORE_Handle`
631  * @param ntm the message we got from CORE.
632  */
633 static void
634 handle_notify_inbound (void *cls,
635                        const struct NotifyTrafficMessage *ntm)
636 {
637   struct GNUNET_CORE_Handle *h = cls;
638   const struct GNUNET_MessageHeader *em;
639   struct PeerRecord *pr;
640
641   LOG (GNUNET_ERROR_TYPE_DEBUG,
642        "Received inbound message from `%s'.\n",
643        GNUNET_i2s (&ntm->peer));
644   em = (const struct GNUNET_MessageHeader *) &ntm[1];
645   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
646                                           &ntm->peer);
647   if (NULL == pr)
648   {
649     GNUNET_break (0);
650     reconnect_later (h);
651     return;
652   }
653   GNUNET_MQ_inject_message (pr->mq,
654                             em);
655 }
656
657
658 /**
659  * Handle message received from CORE service notifying us that we are
660  * now allowed to send a message to a peer.  If that message is still
661  * pending, put it into the queue to be transmitted.
662  *
663  * @param cls the `struct GNUNET_CORE_Handle`
664  * @param smr the message we got
665  */
666 static void
667 handle_send_ready (void *cls,
668                    const struct SendMessageReady *smr)
669 {
670   struct GNUNET_CORE_Handle *h = cls;
671   struct PeerRecord *pr;
672
673   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
674                                           &smr->peer);
675   if (NULL == pr)
676   {
677     GNUNET_break (0);
678     reconnect_later (h);
679     return;
680   }
681   LOG (GNUNET_ERROR_TYPE_DEBUG,
682        "Received notification about transmission readiness to `%s'.\n",
683        GNUNET_i2s (&smr->peer));
684   if (NULL == pr->env)
685   {
686     /* request must have been cancelled between the original request
687      * and the response from CORE, ignore CORE's readiness */
688     return;
689   }
690   if (ntohs (smr->smr_id) != pr->smr_id_gen)
691   {
692     /* READY message is for expired or cancelled message,
693      * ignore! (we should have already sent another request) */
694     return;
695   }
696
697   /* ok, all good, send message out! */
698   GNUNET_MQ_send (h->mq,
699                   pr->env);
700   pr->env = NULL;
701   GNUNET_MQ_impl_send_continue (pr->mq);
702 }
703
704
705 /**
706  * Our current client connection went down.  Clean it up and try to
707  * reconnect!
708  *
709  * @param h our handle to the core service
710  */
711 static void
712 reconnect (struct GNUNET_CORE_Handle *h)
713 {
714   struct GNUNET_MQ_MessageHandler handlers[] = {
715     GNUNET_MQ_hd_fixed_size (init_reply,
716                              GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY,
717                              struct InitReplyMessage,
718                              h),
719     GNUNET_MQ_hd_fixed_size (connect_notify,
720                              GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT,
721                              struct ConnectNotifyMessage,
722                              h),
723     GNUNET_MQ_hd_fixed_size (disconnect_notify,
724                              GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT,
725                              struct DisconnectNotifyMessage,
726                              h),
727     GNUNET_MQ_hd_var_size (notify_inbound,
728                            GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND,
729                            struct NotifyTrafficMessage,
730                            h),
731     GNUNET_MQ_hd_fixed_size (send_ready,
732                              GNUNET_MESSAGE_TYPE_CORE_SEND_READY,
733                              struct SendMessageReady,
734                              h),
735     GNUNET_MQ_handler_end ()
736   };
737   struct InitMessage *init;
738   struct GNUNET_MQ_Envelope *env;
739   uint16_t *ts;
740
741   GNUNET_assert (NULL == h->mq);
742   h->mq = GNUNET_CLIENT_connect (h->cfg,
743                                  "core",
744                                  handlers,
745                                  &handle_mq_error,
746                                  h);
747   if (NULL == h->mq)
748   {
749     reconnect_later (h);
750     return;
751   }
752   env = GNUNET_MQ_msg_extra (init,
753                              sizeof (uint16_t) * h->hcnt,
754                              GNUNET_MESSAGE_TYPE_CORE_INIT);
755   LOG (GNUNET_ERROR_TYPE_INFO,
756        "(Re)connecting to CORE service\n");
757   init->options = htonl (0);
758   ts = (uint16_t *) &init[1];
759   for (unsigned int hpos = 0; hpos < h->hcnt; hpos++)
760     ts[hpos] = htons (h->handlers[hpos].type);
761   GNUNET_MQ_send (h->mq,
762                   env);
763 }
764
765
766 /**
767  * Connect to the core service.  Note that the connection may complete
768  * (or fail) asynchronously.
769  *
770  * @param cfg configuration to use
771  * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
772  * @param init callback to call once we have successfully
773  *        connected to the core service
774  * @param connects function to call on peer connect, can be NULL
775  * @param disconnects function to call on peer disconnect / timeout, can be NULL
776  * @param handlers callbacks for messages we care about, NULL-terminated
777  * @return handle to the core service (only useful for disconnect until @a init is called);
778  *                NULL on error (in this case, init is never called)
779  */
780 struct GNUNET_CORE_Handle *
781 GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
782                      void *cls,
783                      GNUNET_CORE_StartupCallback init,
784                      GNUNET_CORE_ConnectEventHandler connects,
785                      GNUNET_CORE_DisconnectEventHandler disconnects,
786                      const struct GNUNET_MQ_MessageHandler *handlers)
787 {
788   struct GNUNET_CORE_Handle *h;
789   unsigned int hcnt;
790
791   h = GNUNET_new (struct GNUNET_CORE_Handle);
792   h->cfg = cfg;
793   h->cls = cls;
794   h->init = init;
795   h->connects = connects;
796   h->disconnects = disconnects;
797   h->peers = GNUNET_CONTAINER_multipeermap_create (128,
798                                                    GNUNET_NO);
799   hcnt = 0;
800   if (NULL != handlers)
801     while (NULL != handlers[hcnt].cb)
802       hcnt++;
803   h->handlers = GNUNET_new_array (hcnt + 1,
804                                   struct GNUNET_MQ_MessageHandler);
805   if (NULL != handlers)
806     GNUNET_memcpy (h->handlers,
807                    handlers,
808                    hcnt * sizeof (struct GNUNET_MQ_MessageHandler));
809   h->hcnt = hcnt;
810   GNUNET_assert (hcnt <
811                  (GNUNET_SERVER_MAX_MESSAGE_SIZE -
812                   sizeof (struct InitMessage)) / sizeof (uint16_t));
813   LOG (GNUNET_ERROR_TYPE_DEBUG,
814        "Connecting to CORE service\n");
815   reconnect (h);
816   if (NULL == h->mq)
817   {
818     GNUNET_CORE_disconnect (h);
819     return NULL;
820   }
821   return h;
822 }
823
824
825 /**
826  * Disconnect from the core service.
827  *
828  * @param handle connection to core to disconnect
829  */
830 void
831 GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
832 {
833   LOG (GNUNET_ERROR_TYPE_DEBUG,
834        "Disconnecting from CORE service\n");
835   GNUNET_CONTAINER_multipeermap_iterate (handle->peers,
836                                          &disconnect_and_free_peer_entry,
837                                          handle);
838   GNUNET_CONTAINER_multipeermap_destroy (handle->peers);
839   handle->peers = NULL;
840   if (NULL != handle->reconnect_task)
841   {
842     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
843     handle->reconnect_task = NULL;
844   }
845   if (NULL != handle->mq)
846   {
847     GNUNET_MQ_destroy (handle->mq);
848     handle->mq = NULL;
849   }
850   GNUNET_free (handle->handlers);
851   GNUNET_free (handle);
852 }
853
854
855 /**
856  * Obtain the message queue for a connected peer.
857  *
858  * @param h the core handle
859  * @param pid the identity of the peer to check if it has been connected to us
860  * @return NULL if peer is not connected
861  */
862 struct GNUNET_MQ_Handle *
863 GNUNET_CORE_get_mq (const struct GNUNET_CORE_Handle *h,
864                     const struct GNUNET_PeerIdentity *pid)
865 {
866   struct PeerRecord *pr;
867
868   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
869                                           pid);
870   if (NULL == pr)
871     return NULL;
872   return pr->mq;
873 }
874
875
876 /* end of core_api.c */