rename connecT -> connect now that the old API is dead
[oweals/gnunet.git] / src / core / core_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file core/core_api.c
22  * @brief core service; this is the main API for encrypted P2P
23  *        communications
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_core_service.h"
30 #include "core.h"
31
32 #define LOG(kind,...) GNUNET_log_from (kind, "core-api",__VA_ARGS__)
33
34
35 /**
36  * Information we track for each peer.
37  */
38 struct PeerRecord
39 {
40
41   /**
42    * Corresponding CORE handle.
43    */
44   struct GNUNET_CORE_Handle *h;
45
46   /**
47    * Message queue for the peer.
48    */
49   struct GNUNET_MQ_Handle *mq;
50
51   /**
52    * Message we are currently trying to pass to the CORE service
53    * for this peer (from @e mq).
54    */
55   struct GNUNET_MQ_Envelope *env;
56
57   /**
58    * Value the client returned when we connected, used
59    * as the closure in various places.
60    */
61   void *client_cls;
62
63   /**
64    * Peer the record is about.
65    */
66   struct GNUNET_PeerIdentity peer;
67
68   /**
69    * SendMessageRequest ID generator for this peer.
70    */
71   uint16_t smr_id_gen;
72
73 };
74
75
76 /**
77  * Context for the core service connection.
78  */
79 struct GNUNET_CORE_Handle
80 {
81
82   /**
83    * Configuration we're using.
84    */
85   const struct GNUNET_CONFIGURATION_Handle *cfg;
86
87   /**
88    * Closure for the various callbacks.
89    */
90   void *cls;
91
92   /**
93    * Function to call once we've handshaked with the core service.
94    */
95   GNUNET_CORE_StartupCallback init;
96
97   /**
98    * Function to call whenever we're notified about a peer connecting.
99    */
100   GNUNET_CORE_ConnectEventHandler connects;
101
102   /**
103    * Function to call whenever we're notified about a peer disconnecting.
104    */
105   GNUNET_CORE_DisconnectEventHandler disconnects;
106
107   /**
108    * Function handlers for messages of particular type.
109    */
110   struct GNUNET_MQ_MessageHandler *handlers;
111
112   /**
113    * Our message queue for transmissions to the service.
114    */
115   struct GNUNET_MQ_Handle *mq;
116
117   /**
118    * Hash map listing all of the peers that we are currently
119    * connected to.
120    */
121   struct GNUNET_CONTAINER_MultiPeerMap *peers;
122
123   /**
124    * Identity of this peer.
125    */
126   struct GNUNET_PeerIdentity me;
127
128   /**
129    * ID of reconnect task (if any).
130    */
131   struct GNUNET_SCHEDULER_Task *reconnect_task;
132
133   /**
134    * Current delay we use for re-trying to connect to core.
135    */
136   struct GNUNET_TIME_Relative retry_backoff;
137
138   /**
139    * Number of entries in the handlers array.
140    */
141   unsigned int hcnt;
142
143   /**
144    * Did we ever get INIT?
145    */
146   int have_init;
147
148 };
149
150
151 /**
152  * Our current client connection went down.  Clean it up
153  * and try to reconnect!
154  *
155  * @param h our handle to the core service
156  */
157 static void
158 reconnect (struct GNUNET_CORE_Handle *h);
159
160
161 /**
162  * Task schedule to try to re-connect to core.
163  *
164  * @param cls the `struct GNUNET_CORE_Handle`
165  * @param tc task context
166  */
167 static void
168 reconnect_task (void *cls)
169 {
170   struct GNUNET_CORE_Handle *h = cls;
171
172   h->reconnect_task = NULL;
173   LOG (GNUNET_ERROR_TYPE_DEBUG,
174        "Connecting to CORE service after delay\n");
175   reconnect (h);
176 }
177
178
179 /**
180  * Notify clients about disconnect and free the entry for connected
181  * peer.
182  *
183  * @param cls the `struct GNUNET_CORE_Handle *`
184  * @param key the peer identity (not used)
185  * @param value the `struct PeerRecord` to free.
186  * @return #GNUNET_YES (continue)
187  */
188 static int
189 disconnect_and_free_peer_entry (void *cls,
190                                 const struct GNUNET_PeerIdentity *key,
191                                 void *value)
192 {
193   struct GNUNET_CORE_Handle *h = cls;
194   struct PeerRecord *pr = value;
195
196   GNUNET_assert (pr->h == h);
197   if (NULL != h->disconnects)
198     h->disconnects (h->cls,
199                     &pr->peer,
200                     pr->client_cls);
201   GNUNET_assert (GNUNET_YES ==
202                  GNUNET_CONTAINER_multipeermap_remove (h->peers,
203                                                        key,
204                                                        pr));
205   GNUNET_MQ_destroy (pr->mq);
206   GNUNET_assert (NULL == pr->mq);
207   GNUNET_free (pr);
208   return GNUNET_YES;
209 }
210
211
212 /**
213  * Close down any existing connection to the CORE service and
214  * try re-establishing it later.
215  *
216  * @param h our handle
217  */
218 static void
219 reconnect_later (struct GNUNET_CORE_Handle *h)
220 {
221   GNUNET_assert (NULL == h->reconnect_task);
222   if (NULL != h->mq)
223   {
224     GNUNET_MQ_destroy (h->mq);
225     h->mq = NULL;
226   }
227   GNUNET_assert (NULL == h->reconnect_task);
228   h->reconnect_task =
229       GNUNET_SCHEDULER_add_delayed (h->retry_backoff,
230                                     &reconnect_task,
231                                     h);
232   GNUNET_CONTAINER_multipeermap_iterate (h->peers,
233                                          &disconnect_and_free_peer_entry,
234                                          h);
235   h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
236 }
237
238
239 /**
240  * Error handler for the message queue to the CORE service.
241  * On errors, we reconnect.
242  *
243  * @param cls closure, a `struct GNUNET_CORE_Handle *`
244  * @param error error code
245  */
246 static void
247 handle_mq_error (void *cls,
248                  enum GNUNET_MQ_Error error)
249 {
250   struct GNUNET_CORE_Handle *h = cls;
251
252   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
253               "MQ ERROR: %d\n",
254               error);
255   reconnect_later (h);
256 }
257
258
259 /**
260  * Inquire with CORE what options should be set for a message
261  * so that it is transmitted with the given @a priority and
262  * the given @a cork value.
263  *
264  * @param cork desired corking
265  * @param priority desired message priority
266  * @param[out] flags set to `flags` value for #GNUNET_MQ_set_options()
267  * @return `extra` argument to give to #GNUNET_MQ_set_options()
268  */
269 const void *
270 GNUNET_CORE_get_mq_options (int cork,
271                             enum GNUNET_CORE_Priority priority,
272                             uint64_t *flags)
273 {
274   *flags = ((uint64_t) priority) + (((uint64_t) cork) << 32);
275   return NULL;
276 }
277
278
279 /**
280  * Implement sending functionality of a message queue for
281  * us sending messages to a peer.
282  *
283  * @param mq the message queue
284  * @param msg the message to send
285  * @param impl_state state of the implementation
286  */
287 static void
288 core_mq_send_impl (struct GNUNET_MQ_Handle *mq,
289                    const struct GNUNET_MessageHeader *msg,
290                    void *impl_state)
291 {
292   struct PeerRecord *pr = impl_state;
293   struct GNUNET_CORE_Handle *h = pr->h;
294   struct SendMessageRequest *smr;
295   struct SendMessage *sm;
296   struct GNUNET_MQ_Envelope *env;
297   uint16_t msize;
298   uint64_t flags;
299   int cork;
300   enum GNUNET_CORE_Priority priority;
301
302   if (NULL == h->mq)
303   {
304     /* We're currently reconnecting, pretend this worked */
305     GNUNET_MQ_impl_send_continue (mq);
306     return;
307   }
308   GNUNET_assert (NULL == pr->env);
309   /* extract options from envelope */
310   env = GNUNET_MQ_get_current_envelope (mq);
311   GNUNET_break (NULL ==
312                 GNUNET_MQ_env_get_options (env,
313                                            &flags));
314   cork = (int) (flags >> 32);
315   priority = (uint32_t) flags;
316
317   /* check message size for sanity */
318   msize = ntohs (msg->size);
319   if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct SendMessage))
320   {
321     GNUNET_break (0);
322     GNUNET_MQ_impl_send_continue (mq);
323     return;
324   }
325
326   /* ask core for transmission */
327   LOG (GNUNET_ERROR_TYPE_DEBUG,
328        "Asking core for transmission of %u bytes to `%s'\n",
329        (unsigned int) msize,
330        GNUNET_i2s (&pr->peer));
331   env = GNUNET_MQ_msg (smr,
332                        GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
333   smr->priority = htonl ((uint32_t) priority);
334   // smr->deadline = GNUNET_TIME_absolute_hton (deadline);
335   smr->peer = pr->peer;
336   smr->reserved = htonl (0);
337   smr->size = htons (msize);
338   smr->smr_id = htons (++pr->smr_id_gen);
339   GNUNET_MQ_send (h->mq,
340                   env);
341
342   /* prepare message with actual transmission data */
343   pr->env = GNUNET_MQ_msg_nested_mh (sm,
344                                      GNUNET_MESSAGE_TYPE_CORE_SEND,
345                                      msg);
346   sm->priority = htonl ((uint32_t) priority);
347   // sm->deadline = GNUNET_TIME_absolute_hton (deadline);
348   sm->peer = pr->peer;
349   sm->cork = htonl ((uint32_t) cork);
350   sm->reserved = htonl (0);
351   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
352               "Calling get_message with buffer of %u bytes (%s)\n",
353               (unsigned int) msize,
354               cork ? "corked" : "uncorked");
355 }
356
357
358 /**
359  * Handle destruction of a message queue.  Implementations must not
360  * free @a mq, but should take care of @a impl_state.
361  *
362  * @param mq the message queue to destroy
363  * @param impl_state state of the implementation
364  */
365 static void
366 core_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
367                       void *impl_state)
368 {
369   struct PeerRecord *pr = impl_state;
370
371   GNUNET_assert (mq == pr->mq);
372   pr->mq = NULL;
373 }
374
375
376 /**
377  * Implementation function that cancels the currently sent message.
378  * Should basically undo whatever #mq_send_impl() did.
379  *
380  * @param mq message queue
381  * @param impl_state state specific to the implementation
382  */
383 static void
384 core_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
385                      void *impl_state)
386 {
387   struct PeerRecord *pr = impl_state;
388
389   GNUNET_assert (NULL != pr->env);
390   GNUNET_MQ_discard (pr->env);
391   pr->env = NULL;
392 }
393
394
395 /**
396  * We had an error processing a message we forwarded from a peer to
397  * the CORE service.  We should just complain about it but otherwise
398  * continue processing.
399  *
400  * @param cls closure
401  * @param error error code
402  */
403 static void
404 core_mq_error_handler (void *cls,
405                        enum GNUNET_MQ_Error error)
406 {
407   /* struct PeerRecord *pr = cls; */
408
409   GNUNET_break_op (0);
410 }
411
412
413 /**
414  * Add the given peer to the list of our connected peers
415  * and create the respective data structures and notify
416  * the application.
417  *
418  * @param h the core handle
419  * @param peer the peer that is connecting to us
420  */
421 static void
422 connect_peer (struct GNUNET_CORE_Handle *h,
423               const struct GNUNET_PeerIdentity *peer)
424 {
425   struct PeerRecord *pr;
426   uint64_t flags;
427   const void *extra;
428
429   pr = GNUNET_new (struct PeerRecord);
430   pr->peer = *peer;
431   pr->h = h;
432   GNUNET_assert (GNUNET_YES ==
433                  GNUNET_CONTAINER_multipeermap_put (h->peers,
434                                                     &pr->peer,
435                                                     pr,
436                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
437   pr->mq = GNUNET_MQ_queue_for_callbacks (&core_mq_send_impl,
438                                           &core_mq_destroy_impl,
439                                           &core_mq_cancel_impl,
440                                           pr,
441                                           h->handlers,
442                                           &core_mq_error_handler,
443                                           pr);
444   /* get our default options */
445   extra = GNUNET_CORE_get_mq_options (GNUNET_NO,
446                                       GNUNET_CORE_PRIO_BEST_EFFORT,
447                                       &flags);
448   GNUNET_MQ_set_options (pr->mq,
449                          flags,
450                          extra);
451   if (NULL != h->connects)
452   {
453     pr->client_cls = h->connects (h->cls,
454                                   &pr->peer,
455                                   pr->mq);
456     GNUNET_MQ_set_handlers_closure (pr->mq,
457                                     pr->client_cls);
458   }
459 }
460
461
462 /**
463  * Handle  init  reply message  received  from  CORE service.   Notify
464  * application  that we  are now  connected  to the  CORE.  Also  fake
465  * loopback connection.
466  *
467  * @param cls the `struct GNUNET_CORE_Handle`
468  * @param m the init reply
469  */
470 static void
471 handle_init_reply (void *cls,
472                    const struct InitReplyMessage *m)
473 {
474   struct GNUNET_CORE_Handle *h = cls;
475   GNUNET_CORE_StartupCallback init;
476
477   GNUNET_break (0 == ntohl (m->reserved));
478   h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
479   if (NULL != (init = h->init))
480   {
481     /* mark so we don't call init on reconnect */
482     h->init = NULL;
483     h->me = m->my_identity;
484     LOG (GNUNET_ERROR_TYPE_DEBUG,
485          "Connected to core service of peer `%s'.\n",
486          GNUNET_i2s (&h->me));
487     h->have_init = GNUNET_YES;
488     init (h->cls,
489           &h->me);
490   }
491   else
492   {
493     LOG (GNUNET_ERROR_TYPE_DEBUG,
494          "Successfully reconnected to core service.\n");
495     if (GNUNET_NO == h->have_init)
496     {
497       h->me = m->my_identity;
498       h->have_init = GNUNET_YES;
499     }
500     else
501     {
502       GNUNET_break (0 == memcmp (&h->me,
503                                  &m->my_identity,
504                                  sizeof (struct GNUNET_PeerIdentity)));
505     }
506   }
507   /* fake 'connect to self' */
508   connect_peer (h,
509                 &h->me);
510 }
511
512
513 /**
514  * Handle connect message received from CORE service.
515  * Notify the application about the new connection.
516  *
517  * @param cls the `struct GNUNET_CORE_Handle`
518  * @param cnm the connect message
519  */
520 static void
521 handle_connect_notify (void *cls,
522                        const struct ConnectNotifyMessage *cnm)
523 {
524   struct GNUNET_CORE_Handle *h = cls;
525   struct PeerRecord *pr;
526
527   LOG (GNUNET_ERROR_TYPE_DEBUG,
528        "Received notification about connection from `%s'.\n",
529        GNUNET_i2s (&cnm->peer));
530   if (0 == memcmp (&h->me,
531                    &cnm->peer,
532                    sizeof (struct GNUNET_PeerIdentity)))
533   {
534     /* connect to self!? */
535     GNUNET_break (0);
536     return;
537   }
538   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
539                                           &cnm->peer);
540   if (NULL != pr)
541   {
542     GNUNET_break (0);
543     reconnect_later (h);
544     return;
545   }
546   connect_peer (h,
547                 &cnm->peer);
548 }
549
550
551 /**
552  * Handle disconnect message received from CORE service.
553  * Notify the application about the lost connection.
554  *
555  * @param cls the `struct GNUNET_CORE_Handle`
556  * @param dnm message about the disconnect event
557  */
558 static void
559 handle_disconnect_notify (void *cls,
560                           const struct DisconnectNotifyMessage *dnm)
561 {
562   struct GNUNET_CORE_Handle *h = cls;
563   struct PeerRecord *pr;
564
565   if (0 == memcmp (&h->me,
566                    &dnm->peer,
567                    sizeof (struct GNUNET_PeerIdentity)))
568   {
569     /* disconnect from self!? */
570     GNUNET_break (0);
571     return;
572   }
573   GNUNET_break (0 == ntohl (dnm->reserved));
574   LOG (GNUNET_ERROR_TYPE_DEBUG,
575        "Received notification about disconnect from `%s'.\n",
576        GNUNET_i2s (&dnm->peer));
577   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
578                                           &dnm->peer);
579   if (NULL == pr)
580   {
581     GNUNET_break (0);
582     reconnect_later (h);
583     return;
584   }
585   disconnect_and_free_peer_entry (h,
586                                   &pr->peer,
587                                   pr);
588 }
589
590
591 /**
592  * Check that message received from CORE service is well-formed.
593  *
594  * @param cls the `struct GNUNET_CORE_Handle`
595  * @param ntm the message we got
596  * @return #GNUNET_OK if the message is well-formed
597  */
598 static int
599 check_notify_inbound (void *cls,
600                       const struct NotifyTrafficMessage *ntm)
601 {
602   uint16_t msize;
603   const struct GNUNET_MessageHeader *em;
604
605   msize = ntohs (ntm->header.size) - sizeof (struct NotifyTrafficMessage);
606   if (msize < sizeof (struct GNUNET_MessageHeader))
607   {
608     GNUNET_break (0);
609     return GNUNET_SYSERR;
610   }
611   em = (const struct GNUNET_MessageHeader *) &ntm[1];
612   if (msize != ntohs (em->size))
613   {
614     GNUNET_break (0);
615     return GNUNET_SYSERR;
616   }
617   return GNUNET_OK;
618 }
619
620
621 /**
622  * Handle inbound message received from CORE service.  If applicable,
623  * notify the application.
624  *
625  * @param cls the `struct GNUNET_CORE_Handle`
626  * @param ntm the message we got from CORE.
627  */
628 static void
629 handle_notify_inbound (void *cls,
630                        const struct NotifyTrafficMessage *ntm)
631 {
632   struct GNUNET_CORE_Handle *h = cls;
633   const struct GNUNET_MessageHeader *em;
634   struct PeerRecord *pr;
635
636   LOG (GNUNET_ERROR_TYPE_DEBUG,
637        "Received inbound message from `%s'.\n",
638        GNUNET_i2s (&ntm->peer));
639   em = (const struct GNUNET_MessageHeader *) &ntm[1];
640   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
641                                           &ntm->peer);
642   if (NULL == pr)
643   {
644     GNUNET_break (0);
645     reconnect_later (h);
646     return;
647   }
648   GNUNET_MQ_inject_message (pr->mq,
649                             em);
650 }
651
652
653 /**
654  * Handle message received from CORE service notifying us that we are
655  * now allowed to send a message to a peer.  If that message is still
656  * pending, put it into the queue to be transmitted.
657  *
658  * @param cls the `struct GNUNET_CORE_Handle`
659  * @param smr the message we got
660  */
661 static void
662 handle_send_ready (void *cls,
663                    const struct SendMessageReady *smr)
664 {
665   struct GNUNET_CORE_Handle *h = cls;
666   struct PeerRecord *pr;
667
668   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
669                                           &smr->peer);
670   if (NULL == pr)
671   {
672     GNUNET_break (0);
673     reconnect_later (h);
674     return;
675   }
676   LOG (GNUNET_ERROR_TYPE_DEBUG,
677        "Received notification about transmission readiness to `%s'.\n",
678        GNUNET_i2s (&smr->peer));
679   if (NULL == pr->env)
680   {
681     /* request must have been cancelled between the original request
682      * and the response from CORE, ignore CORE's readiness */
683     return;
684   }
685   if (ntohs (smr->smr_id) != pr->smr_id_gen)
686   {
687     /* READY message is for expired or cancelled message,
688      * ignore! (we should have already sent another request) */
689     return;
690   }
691
692   /* ok, all good, send message out! */
693   GNUNET_MQ_send (h->mq,
694                   pr->env);
695   pr->env = NULL;
696   GNUNET_MQ_impl_send_continue (pr->mq);
697 }
698
699
700 /**
701  * Our current client connection went down.  Clean it up and try to
702  * reconnect!
703  *
704  * @param h our handle to the core service
705  */
706 static void
707 reconnect (struct GNUNET_CORE_Handle *h)
708 {
709   struct GNUNET_MQ_MessageHandler handlers[] = {
710     GNUNET_MQ_hd_fixed_size (init_reply,
711                              GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY,
712                              struct InitReplyMessage,
713                              h),
714     GNUNET_MQ_hd_fixed_size (connect_notify,
715                              GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT,
716                              struct ConnectNotifyMessage,
717                              h),
718     GNUNET_MQ_hd_fixed_size (disconnect_notify,
719                              GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT,
720                              struct DisconnectNotifyMessage,
721                              h),
722     GNUNET_MQ_hd_var_size (notify_inbound,
723                            GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND,
724                            struct NotifyTrafficMessage,
725                            h),
726     GNUNET_MQ_hd_fixed_size (send_ready,
727                              GNUNET_MESSAGE_TYPE_CORE_SEND_READY,
728                              struct SendMessageReady,
729                              h),
730     GNUNET_MQ_handler_end ()
731   };
732   struct InitMessage *init;
733   struct GNUNET_MQ_Envelope *env;
734   uint16_t *ts;
735
736   GNUNET_assert (NULL == h->mq);
737   h->mq = GNUNET_CLIENT_connect (h->cfg,
738                                  "core",
739                                  handlers,
740                                  &handle_mq_error,
741                                  h);
742   if (NULL == h->mq)
743   {
744     reconnect_later (h);
745     return;
746   }
747   env = GNUNET_MQ_msg_extra (init,
748                              sizeof (uint16_t) * h->hcnt,
749                              GNUNET_MESSAGE_TYPE_CORE_INIT);
750   LOG (GNUNET_ERROR_TYPE_INFO,
751        "(Re)connecting to CORE service\n");
752   init->options = htonl (0);
753   ts = (uint16_t *) &init[1];
754   for (unsigned int hpos = 0; hpos < h->hcnt; hpos++)
755     ts[hpos] = htons (h->handlers[hpos].type);
756   GNUNET_MQ_send (h->mq,
757                   env);
758 }
759
760
761 /**
762  * Connect to the core service.  Note that the connection may complete
763  * (or fail) asynchronously.
764  *
765  * @param cfg configuration to use
766  * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
767  * @param init callback to call once we have successfully
768  *        connected to the core service
769  * @param connects function to call on peer connect, can be NULL
770  * @param disconnects function to call on peer disconnect / timeout, can be NULL
771  * @param handlers callbacks for messages we care about, NULL-terminated
772  * @return handle to the core service (only useful for disconnect until @a init is called);
773  *                NULL on error (in this case, init is never called)
774  */
775 struct GNUNET_CORE_Handle *
776 GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
777                      void *cls,
778                      GNUNET_CORE_StartupCallback init,
779                      GNUNET_CORE_ConnectEventHandler connects,
780                      GNUNET_CORE_DisconnectEventHandler disconnects,
781                      const struct GNUNET_MQ_MessageHandler *handlers)
782 {
783   struct GNUNET_CORE_Handle *h;
784   unsigned int hcnt;
785
786   h = GNUNET_new (struct GNUNET_CORE_Handle);
787   h->cfg = cfg;
788   h->cls = cls;
789   h->init = init;
790   h->connects = connects;
791   h->disconnects = disconnects;
792   h->peers = GNUNET_CONTAINER_multipeermap_create (128,
793                                                    GNUNET_NO);
794   hcnt = 0;
795   if (NULL != handlers)
796     while (NULL != handlers[hcnt].cb)
797       hcnt++;
798   h->handlers = GNUNET_new_array (hcnt + 1,
799                                   struct GNUNET_MQ_MessageHandler);
800   if (NULL != handlers)
801     GNUNET_memcpy (h->handlers,
802                    handlers,
803                    hcnt * sizeof (struct GNUNET_MQ_MessageHandler));
804   h->hcnt = hcnt;
805   GNUNET_assert (hcnt <
806                  (GNUNET_SERVER_MAX_MESSAGE_SIZE -
807                   sizeof (struct InitMessage)) / sizeof (uint16_t));
808   LOG (GNUNET_ERROR_TYPE_DEBUG,
809        "Connecting to CORE service\n");
810   reconnect (h);
811   if (NULL == h->mq)
812   {
813     GNUNET_CORE_disconnect (h);
814     return NULL;
815   }
816   return h;
817 }
818
819
820 /**
821  * Disconnect from the core service.
822  *
823  * @param handle connection to core to disconnect
824  */
825 void
826 GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
827 {
828   LOG (GNUNET_ERROR_TYPE_DEBUG,
829        "Disconnecting from CORE service\n");
830   GNUNET_CONTAINER_multipeermap_iterate (handle->peers,
831                                          &disconnect_and_free_peer_entry,
832                                          handle);
833   GNUNET_CONTAINER_multipeermap_destroy (handle->peers);
834   handle->peers = NULL;
835   if (NULL != handle->reconnect_task)
836   {
837     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
838     handle->reconnect_task = NULL;
839   }
840   if (NULL != handle->mq)
841   {
842     GNUNET_MQ_destroy (handle->mq);
843     handle->mq = NULL;
844   }
845   GNUNET_free (handle->handlers);
846   GNUNET_free (handle);
847 }
848
849
850 /**
851  * Obtain the message queue for a connected peer.
852  *
853  * @param h the core handle
854  * @param pid the identity of the peer to check if it has been connected to us
855  * @return NULL if peer is not connected
856  */
857 struct GNUNET_MQ_Handle *
858 GNUNET_CORE_get_mq (const struct GNUNET_CORE_Handle *h,
859                     const struct GNUNET_PeerIdentity *pid)
860 {
861   struct PeerRecord *pr;
862
863   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
864                                           pid);
865   if (NULL == pr)
866     return NULL;
867   return pr->mq;
868 }
869
870
871 /* end of core_api.c */