porting xdht to new service API, major code de-duplication effort
[oweals/gnunet.git] / src / core / core_api_mq.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2014 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file core/core_api_mq.c
22  * @brief MQ support for core service
23  * @author Christian Grothoff
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_core_service.h"
30 #include "core.h"
31
32 #define LOG(kind,...) GNUNET_log_from (kind, "core-api",__VA_ARGS__)
33
34
35 /**
36  * Internal state of a GNUNET-MQ queue for CORE.
37  */
38 struct CoreMQState
39 {
40   /**
41    * Which peer does this queue target?
42    */
43   struct GNUNET_PeerIdentity target;
44
45   /**
46    * Handle to the CORE service used by this MQ.
47    */
48   struct GNUNET_CORE_Handle *core;
49
50   /**
51    * Transmission handle (if in use).
52    */
53   struct GNUNET_CORE_TransmitHandle *th;
54 };
55
56
57 /**
58  * Function called to notify a client about the connection
59  * begin ready to queue more data.  @a buf will be
60  * NULL and @a size zero if the connection was closed for
61  * writing in the meantime.
62  *
63  * @param cls closure
64  * @param size number of bytes available in @a buf
65  * @param buf where the callee should write the message
66  * @return number of bytes written to @a buf
67  */
68 static size_t
69 core_mq_ntr (void *cls, size_t size,
70              void *buf)
71 {
72   struct GNUNET_MQ_Handle *mq = cls;
73   struct CoreMQState *mqs = GNUNET_MQ_impl_state (mq);
74   const struct GNUNET_MessageHeader *mh = GNUNET_MQ_impl_current (mq);
75   size_t msg_size = ntohs (mh->size);
76
77   LOG (GNUNET_ERROR_TYPE_DEBUG,
78        "ntr called (size %u, type %u)\n",
79        msg_size,
80        ntohs (mh->type));
81   mqs->th = NULL;
82   if (NULL == buf)
83   {
84     LOG (GNUNET_ERROR_TYPE_DEBUG,
85          "send error\n");
86     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
87     return 0;
88   }
89   GNUNET_memcpy (buf, mh, msg_size);
90   GNUNET_MQ_impl_send_continue (mq);
91   return msg_size;
92 }
93
94
95 /**
96  * Signature of functions implementing the
97  * sending functionality of a message queue.
98  *
99  * @param mq the message queue
100  * @param msg the message to send
101  * @param impl_state state of the implementation
102  */
103 static void
104 core_mq_send (struct GNUNET_MQ_Handle *mq,
105               const struct GNUNET_MessageHeader *msg,
106               void *impl_state)
107 {
108   struct CoreMQState *mqs = impl_state;
109
110   GNUNET_assert (NULL == mqs->th);
111   LOG (GNUNET_ERROR_TYPE_DEBUG,
112        "Sending queued message (size %u)\n",
113        ntohs (msg->size));
114   mqs->th = GNUNET_CORE_notify_transmit_ready (mqs->core, GNUNET_YES, 0,
115                                                GNUNET_TIME_UNIT_FOREVER_REL,
116                                                &mqs->target,
117                                                ntohs (msg->size),
118                                                &core_mq_ntr, mq);
119 }
120
121
122 /**
123  * Signature of functions implementing the
124  * destruction of a message queue.
125  * Implementations must not free @a mq, but should
126  * take care of @a impl_state.
127  *
128  * @param mq the message queue to destroy
129  * @param impl_state state of the implementation
130  */
131 static void
132 core_mq_destroy (struct GNUNET_MQ_Handle *mq,
133                  void *impl_state)
134 {
135   struct CoreMQState *mqs = impl_state;
136
137   if (NULL != mqs->th)
138   {
139     GNUNET_CORE_notify_transmit_ready_cancel (mqs->th);
140     mqs->th = NULL;
141   }
142   GNUNET_free (mqs);
143 }
144
145
146 /**
147  * Implementation function that cancels the currently sent message.
148  *
149  * @param mq message queue
150  * @param impl_state state specific to the implementation
151  */
152 static void
153 core_mq_cancel (struct GNUNET_MQ_Handle *mq,
154                 void *impl_state)
155 {
156   struct CoreMQState *mqs = impl_state;
157
158   GNUNET_assert (NULL != mqs->th);
159   GNUNET_CORE_notify_transmit_ready_cancel (mqs->th);
160 }
161
162
163 /**
164  * Create a message queue for sending messages to a peer with CORE.
165  * Messages may only be queued with #GNUNET_MQ_send once the init callback has
166  * been called for the given handle.
167  * There must only be one queue per peer for each core handle.
168  * The message queue can only be used to transmit messages,
169  * not to receive them.
170  *
171  * @param h the core handle
172  * @param target the target peer for this queue, may not be NULL
173  * @return a message queue for sending messages over the core handle
174  *         to the target peer
175  */
176 struct GNUNET_MQ_Handle *
177 GNUNET_CORE_mq_create (struct GNUNET_CORE_Handle *h,
178                        const struct GNUNET_PeerIdentity *target)
179 {
180   struct CoreMQState *mqs = GNUNET_new (struct CoreMQState);
181
182   mqs->core = h;
183   mqs->target = *target;
184   return GNUNET_MQ_queue_for_callbacks (&core_mq_send,
185                                         &core_mq_destroy,
186                                         &core_mq_cancel,
187                                         mqs,
188                                         NULL, NULL, NULL);
189 }
190
191 /* end of core_api_mq.c */