multicast: added replay_end(), returning replay handle from join_decision(); removed...
[oweals/gnunet.git] / src / fs / gnunet-service-fs_put.c
1 /*
2      This file is part of GNUnet.
3      (C) 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_put.c
23  * @brief API to PUT zero-anonymity index data from our datastore into the DHT
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs.h"
28 #include "gnunet-service-fs_put.h"
29
30
31 /**
32  * How often do we at most PUT content into the DHT?
33  */
34 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
35
36 /**
37  * How many replicas do we try to create per PUT?
38  */
39 #define DEFAULT_PUT_REPLICATION 5
40
41
42 /**
43  * Context for each zero-anonymity iterator.
44  */
45 struct PutOperator
46 {
47
48   /**
49    * Request to datastore for DHT PUTs (or NULL).
50    */
51   struct GNUNET_DATASTORE_QueueEntry *dht_qe;
52
53   /**
54    * Type we request from the datastore.
55    */
56   enum GNUNET_BLOCK_Type dht_put_type;
57
58   /**
59    * Handle to PUT operation.
60    */
61   struct GNUNET_DHT_PutHandle *dht_put;
62
63   /**
64    * ID of task that collects blocks for DHT PUTs.
65    */
66   GNUNET_SCHEDULER_TaskIdentifier dht_task;
67
68   /**
69    * How many entires with zero anonymity of our type do we currently
70    * estimate to have in the database?
71    */
72   uint64_t zero_anonymity_count_estimate;
73
74   /**
75    * Current offset when iterating the database.
76    */
77   uint64_t current_offset;
78 };
79
80
81 /**
82  * ANY-terminated list of our operators (one per type
83  * of block that we're putting into the DHT).
84  */
85 static struct PutOperator operators[] = {
86   {NULL, GNUNET_BLOCK_TYPE_FS_UBLOCK, 0, 0, 0},
87   {NULL, GNUNET_BLOCK_TYPE_ANY, 0, 0, 0}
88 };
89
90
91 /**
92  * Task that is run periodically to obtain blocks for DHT PUTs.
93  *
94  * @param cls type of blocks to gather
95  * @param tc scheduler context (unused)
96  */
97 static void
98 gather_dht_put_blocks (void *cls,
99                        const struct GNUNET_SCHEDULER_TaskContext *tc);
100
101
102 /**
103  * Calculate when to run the next PUT operation and schedule it.
104  *
105  * @param po put operator to schedule
106  */
107 static void
108 schedule_next_put (struct PutOperator *po)
109 {
110   struct GNUNET_TIME_Relative delay;
111
112   if (po->zero_anonymity_count_estimate > 0)
113   {
114     delay =
115         GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
116                                      po->zero_anonymity_count_estimate);
117     delay = GNUNET_TIME_relative_min (delay, MAX_DHT_PUT_FREQ);
118   }
119   else
120   {
121     /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
122      * (hopefully) appear */
123     delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
124   }
125   po->dht_task =
126       GNUNET_SCHEDULER_add_delayed (delay, &gather_dht_put_blocks, po);
127 }
128
129
130 /**
131  * Continuation called after DHT PUT operation has finished.
132  *
133  * @param cls type of blocks to gather
134  * @param success GNUNET_OK if the PUT was transmitted,
135  *                GNUNET_NO on timeout,
136  *                GNUNET_SYSERR on disconnect from service
137  *                after the PUT message was transmitted
138  *                (so we don't know if it was received or not)
139  */
140 static void
141 delay_dht_put_blocks (void *cls, int success)
142 {
143   struct PutOperator *po = cls;
144
145   po->dht_put = NULL;
146   schedule_next_put (po);
147 }
148
149
150 /**
151  * Task that is run periodically to obtain blocks for DHT PUTs.
152  *
153  * @param cls type of blocks to gather
154  * @param tc scheduler context
155  */
156 static void
157 delay_dht_put_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
158 {
159   struct PutOperator *po = cls;
160
161   po->dht_task = GNUNET_SCHEDULER_NO_TASK;
162   schedule_next_put (po);
163 }
164
165
166 /**
167  * Store content in DHT.
168  *
169  * @param cls closure
170  * @param key key for the content
171  * @param size number of bytes in data
172  * @param data content stored
173  * @param type type of the content
174  * @param priority priority of the content
175  * @param anonymity anonymity-level for the content
176  * @param expiration expiration time for the content
177  * @param uid unique identifier for the datum;
178  *        maybe 0 if no unique identifier is available
179  */
180 static void
181 process_dht_put_content (void *cls, 
182                          const struct GNUNET_HashCode * key, 
183                          size_t size,
184                          const void *data, 
185                          enum GNUNET_BLOCK_Type type,
186                          uint32_t priority, uint32_t anonymity,
187                          struct GNUNET_TIME_Absolute expiration, uint64_t uid)
188 {
189   struct PutOperator *po = cls;
190
191   po->dht_qe = NULL;
192   if (key == NULL)
193   {
194     po->zero_anonymity_count_estimate = po->current_offset - 1;
195     po->current_offset = 0;
196     po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
197     return;
198   }
199   po->zero_anonymity_count_estimate =
200       GNUNET_MAX (po->current_offset, po->zero_anonymity_count_estimate);
201   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
202               "Retrieved block `%s' of type %u for DHT PUT\n", GNUNET_h2s (key),
203               type);
204   po->dht_put = GNUNET_DHT_put (GSF_dht, key, DEFAULT_PUT_REPLICATION,
205                                 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, type, size, data,
206                                 expiration, GNUNET_TIME_UNIT_FOREVER_REL,
207                                 &delay_dht_put_blocks, po);
208 }
209
210
211 /**
212  * Task that is run periodically to obtain blocks for DHT PUTs.
213  *
214  * @param cls type of blocks to gather
215  * @param tc scheduler context (unused)
216  */
217 static void
218 gather_dht_put_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
219 {
220   struct PutOperator *po = cls;
221
222   po->dht_task = GNUNET_SCHEDULER_NO_TASK;
223   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
224     return;
225   po->dht_qe =
226       GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, po->current_offset++, 0,
227                                            UINT_MAX,
228                                            GNUNET_TIME_UNIT_FOREVER_REL,
229                                            po->dht_put_type,
230                                            &process_dht_put_content, po);
231   if (NULL == po->dht_qe)
232     po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
233 }
234
235
236 /**
237  * Setup the module.
238  */
239 void
240 GSF_put_init_ ()
241 {
242   unsigned int i;
243
244   i = 0;
245   while (operators[i].dht_put_type != GNUNET_BLOCK_TYPE_ANY)
246   {
247     operators[i].dht_task =
248         GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, &operators[i]);
249     i++;
250   }
251 }
252
253
254 /**
255  * Shutdown the module.
256  */
257 void
258 GSF_put_done_ ()
259 {
260   struct PutOperator *po;
261   unsigned int i;
262
263   i = 0;
264   while ((po = &operators[i])->dht_put_type != GNUNET_BLOCK_TYPE_ANY)
265   {
266     if (GNUNET_SCHEDULER_NO_TASK != po->dht_task)
267     {
268       GNUNET_SCHEDULER_cancel (po->dht_task);
269       po->dht_task = GNUNET_SCHEDULER_NO_TASK;
270     }
271     if (NULL != po->dht_put)
272     {
273       GNUNET_DHT_put_cancel (po->dht_put);
274       po->dht_put = NULL;
275     }
276     if (NULL != po->dht_qe)
277     {
278       GNUNET_DATASTORE_cancel (po->dht_qe);
279       po->dht_qe = NULL;
280     }
281     i++;
282   }
283 }
284
285 /* end of gnunet-service-fs_put.c */