/*
This file is part of GNUnet.
- Copyright (C) 2012, 2013, 2014 GNUnet e.V.
+ Copyright (C) 2012, 2013, 2014, 2018 GNUnet e.V.
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
- Boston, MA 02110-1301, USA.
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* @brief namestore for the GNUnet naming system
* @author Matthias Wachs
* @author Christian Grothoff
+ *
+ * TODO:
+ * - run testcases, make sure everything works!
*/
#include "platform.h"
#include "gnunet_util_lib.h"
#define LOG_STRERROR_FILE(kind,syscall,filename) GNUNET_log_from_strerror_file (kind, "util", syscall, filename)
+/**
+ * If a monitor takes more than 1 minute to process an event, print a warning.
+ */
+#define MONITOR_STALL_WARN_DELAY GNUNET_TIME_UNIT_MINUTES
+
/**
* A namestore client
*/
struct GNUNET_SCHEDULER_Task *task;
+ /**
+ * Task to warn about slow monitors.
+ */
+ struct GNUNET_SCHEDULER_Task *sa_wait_warning;
+
+ /**
+ * Since when are we blocked on this monitor?
+ */
+ struct GNUNET_TIME_Absolute sa_waiting_start;
+
/**
* Last sequence number in the zone iteration used to address next
* result of the zone iteration in the store
*/
uint64_t limit;
+ /**
+ * How many more requests may we receive from the iterator
+ * before it is at the limit we gave it? Will be below or
+ * equal to @e limit. The effective limit for monitor
+ * events is thus @e iteration_cnt - @e limit!
+ */
+ uint64_t iteration_cnt;
+
+ /**
+ * Are we (still) in the initial iteration pass?
+ */
+ int in_first_iteration;
+
+ /**
+ * Is there a store activity waiting for this monitor? We only raise the
+ * flag when it happens and search the DLL for the store activity when we
+ * had a limit increase. If we cannot find any waiting store activity at
+ * that time, we clear the flag again.
+ */
+ int sa_waiting;
+
};
};
+/**
+ * Information for an ongoing #handle_record_store() operation.
+ * Needed as we may wait for monitors to be ready for the notification.
+ */
+struct StoreActivity
+{
+ /**
+ * Kept in a DLL.
+ */
+ struct StoreActivity *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct StoreActivity *prev;
+
+ /**
+ * Which client triggered the store activity?
+ */
+ struct NamestoreClient *nc;
+
+ /**
+ * Copy of the original store message (as data fields in @e rd will
+ * point into it!).
+ */
+ const struct RecordStoreMessage *rsm;
+
+ /**
+ * Next zone monitor that still needs to be notified about this PUT.
+ */
+ struct ZoneMonitor *zm_pos;
+
+ /**
+ * Label nicely canonicalized (lower case).
+ */
+ char *conv_name;
+
+};
+
+
/**
* Public key of all zeros.
*/
*/
static struct ZoneMonitor *monitor_tail;
+/**
+ * Head of DLL of monitor-blocked store activities.
+ */
+static struct StoreActivity *sa_head;
+
+/**
+ * Tail of DLL of monitor-blocked store activities.
+ */
+static struct StoreActivity *sa_tail;
+
/**
* Notification context shared by all monitors.
*/
/**
- * Called whenever a client is disconnected.
- * Frees our resources associated with that client.
+ * Release memory used by @a sa.
*
- * @param cls closure
- * @param client identification of the client
- * @param app_ctx the `struct NamestoreClient` of @a client
+ * @param sa activity to free
*/
static void
-client_disconnect_cb (void *cls,
- struct GNUNET_SERVICE_Client *client,
- void *app_ctx)
-{
- struct NamestoreClient *nc = app_ctx;
- struct ZoneIteration *no;
- struct ZoneMonitor *zm;
- struct CacheOperation *cop;
-
- (void) cls;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client %p disconnected\n",
- client);
- for (zm = monitor_head; NULL != zm; zm = zm->next)
- {
- if (nc == zm->nc)
- {
- GNUNET_CONTAINER_DLL_remove (monitor_head,
- monitor_tail,
- zm);
- if (NULL != zm->task)
- {
- GNUNET_SCHEDULER_cancel (zm->task);
- zm->task = NULL;
- }
- GNUNET_free (zm);
- break;
- }
- }
- while (NULL != (no = nc->op_head))
- {
- GNUNET_CONTAINER_DLL_remove (nc->op_head,
- nc->op_tail,
- no);
- GNUNET_free (no);
- }
- for (cop = cop_head; NULL != cop; cop = cop->next)
- if (nc == cop->nc)
- cop->nc = NULL;
- GNUNET_free (nc);
-}
-
-
-/**
- * Add a client to our list of active clients.
- *
- * @param cls NULL
- * @param client client to add
- * @param mq message queue for @a client
- * @return internal namestore client structure for this client
- */
-static void *
-client_connect_cb (void *cls,
- struct GNUNET_SERVICE_Client *client,
- struct GNUNET_MQ_Handle *mq)
+free_store_activity (struct StoreActivity *sa)
{
- struct NamestoreClient *nc;
-
- (void) cls;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client %p connected\n",
- client);
- nc = GNUNET_new (struct NamestoreClient);
- nc->client = client;
- nc->mq = mq;
- return nc;
+ GNUNET_CONTAINER_DLL_remove (sa_head,
+ sa_tail,
+ sa);
+ GNUNET_free (sa->conv_name);
+ GNUNET_free (sa);
}
uint64_t latest_expiration;
size_t req;
char *data;
- int record_offset;
size_t data_offset;
+ struct GNUNET_GNSRECORD_Data *target;
(*rdc_res) = 1 + rd2_length;
if (0 == 1 + rd2_length)
{
+ GNUNET_break (0);
(*rd_res) = NULL;
return;
}
- req = 0;
- for (unsigned int c=0; c< 1; c++)
- req += sizeof (struct GNUNET_GNSRECORD_Data) + nick_rd[c].data_size;
- for (unsigned int c=0; c< rd2_length; c++)
- req += sizeof (struct GNUNET_GNSRECORD_Data) + rd2[c].data_size;
- (*rd_res) = GNUNET_malloc (req);
- data = (char *) &(*rd_res)[1 + rd2_length];
+ req = sizeof (struct GNUNET_GNSRECORD_Data) + nick_rd->data_size;
+ for (unsigned int i=0; i<rd2_length; i++)
+ {
+ const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
+
+ if (req + sizeof (struct GNUNET_GNSRECORD_Data) + orig->data_size < req)
+ {
+ GNUNET_break (0);
+ (*rd_res) = NULL;
+ return;
+ }
+ req += sizeof (struct GNUNET_GNSRECORD_Data) + orig->data_size;
+ }
+ target = GNUNET_malloc (req);
+ (*rd_res) = target;
+ data = (char *) &target[1 + rd2_length];
data_offset = 0;
latest_expiration = 0;
- for (unsigned int c=0; c< rd2_length; c++)
+ for (unsigned int i=0;i<rd2_length;i++)
{
- if (0 != (rd2[c].flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
+ const struct GNUNET_GNSRECORD_Data *orig = &rd2[i];
+
+ if (0 != (orig->flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
{
- if ((GNUNET_TIME_absolute_get().abs_value_us + rd2[c].expiration_time) >
- latest_expiration)
- latest_expiration = rd2[c].expiration_time;
+ if ((GNUNET_TIME_absolute_get().abs_value_us + orig->expiration_time) >
+ latest_expiration)
+ latest_expiration = orig->expiration_time;
}
- else if (rd2[c].expiration_time > latest_expiration)
- latest_expiration = rd2[c].expiration_time;
- (*rd_res)[c] = rd2[c];
- (*rd_res)[c].data = (void *) &data[data_offset];
- GNUNET_memcpy ((void *) (*rd_res)[c].data,
- rd2[c].data,
- rd2[c].data_size);
- data_offset += (*rd_res)[c].data_size;
+ else if (orig->expiration_time > latest_expiration)
+ latest_expiration = orig->expiration_time;
+ target[i] = *orig;
+ target[i].data = (void *) &data[data_offset];
+ GNUNET_memcpy (&data[data_offset],
+ orig->data,
+ orig->data_size);
+ data_offset += orig->data_size;
}
/* append nick */
- record_offset = rd2_length;
- (*rd_res)[record_offset] = *nick_rd;
- (*rd_res)[record_offset].expiration_time = latest_expiration;
- (*rd_res)[record_offset].data = (void *) &data[data_offset];
- GNUNET_memcpy ((void *) (*rd_res)[record_offset].data,
+ target[rd2_length] = *nick_rd;
+ target[rd2_length].expiration_time = latest_expiration;
+ target[rd2_length].data = (void *) &data[data_offset];
+ GNUNET_memcpy (&data[data_offset],
nick_rd->data,
nick_rd->data_size);
- data_offset += (*rd_res)[record_offset].data_size;
- GNUNET_assert (req == (sizeof (struct GNUNET_GNSRECORD_Data)) * (*rdc_res) + data_offset);
+ data_offset += nick_rd->data_size;
+ GNUNET_assert (req ==
+ (sizeof (struct GNUNET_GNSRECORD_Data)) * (*rdc_res) + data_offset);
}
struct GNUNET_GNSRECORD_Data *res;
unsigned int res_count;
size_t name_len;
- size_t rd_ser_len;
+ ssize_t rd_ser_len;
char *name_tmp;
char *rd_ser;
nick = get_nick_record (zone_key);
+
+ GNUNET_assert (-1 !=
+ GNUNET_GNSRECORD_records_get_size (rd_count,
+ rd));
+
if ( (NULL != nick) &&
(0 != strcmp (name,
GNUNET_GNS_EMPTY_LABEL_AT)))
res = (struct GNUNET_GNSRECORD_Data *) rd;
}
+ GNUNET_assert (-1 !=
+ GNUNET_GNSRECORD_records_get_size (res_count,
+ res));
+
+
name_len = strlen (name) + 1;
- rd_ser_len = GNUNET_GNSRECORD_records_get_size (res_count, res);
+ rd_ser_len = GNUNET_GNSRECORD_records_get_size (res_count,
+ res);
+ if (rd_ser_len < 0)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (nc->client);
+ return;
+ }
+ if (rd_ser_len >= UINT16_MAX - name_len - sizeof (*zir_msg))
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (nc->client);
+ return;
+ }
env = GNUNET_MQ_msg_extra (zir_msg,
name_len + rd_ser_len,
GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT);
zir_msg->gns_header.r_id = htonl (request_id);
zir_msg->name_len = htons (name_len);
zir_msg->rd_count = htons (res_count);
- zir_msg->rd_len = htons (rd_ser_len);
+ zir_msg->rd_len = htons ((uint16_t) rd_ser_len);
zir_msg->private_key = *zone_key;
name_tmp = (char *) &zir_msg[1];
GNUNET_memcpy (name_tmp,
name,
name_len);
rd_ser = &name_tmp[name_len];
- GNUNET_GNSRECORD_records_serialize (res_count,
- res,
- rd_ser_len,
- rd_ser);
+ GNUNET_assert (rd_ser_len ==
+ GNUNET_GNSRECORD_records_serialize (res_count,
+ res,
+ rd_ser_len,
+ rd_ser));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending RECORD_RESULT message with %u records\n",
res_count);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending RECORD_STORE_RESPONSE message\n");
+ GNUNET_STATISTICS_update (statistics,
+ "Store requests completed",
+ 1,
+ GNUNET_NO);
env = GNUNET_MQ_msg (rcr_msg,
GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE_RESPONSE);
rcr_msg->gns_header.r_id = htonl (rid);
}
+/**
+ * Print a warning that one of our monitors is no longer reacting.
+ *
+ * @param cls a `struct ZoneMonitor` to warn about
+ */
+static void
+warn_monitor_slow (void *cls)
+{
+ struct ZoneMonitor *zm = cls;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "No response from monitor since %s\n",
+ GNUNET_STRINGS_absolute_time_to_string (zm->sa_waiting_start));
+ zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
+ &warn_monitor_slow,
+ zm);
+}
+
+
+/**
+ * Continue processing the @a sa.
+ *
+ * @param sa store activity to process
+ */
+static void
+continue_store_activity (struct StoreActivity *sa)
+{
+ const struct RecordStoreMessage *rp_msg = sa->rsm;
+ unsigned int rd_count;
+ size_t name_len;
+ size_t rd_ser_len;
+ uint32_t rid;
+ const char *name_tmp;
+ const char *rd_ser;
+
+ rid = ntohl (rp_msg->gns_header.r_id);
+ name_len = ntohs (rp_msg->name_len);
+ rd_count = ntohs (rp_msg->rd_count);
+ rd_ser_len = ntohs (rp_msg->rd_len);
+ name_tmp = (const char *) &rp_msg[1];
+ rd_ser = &name_tmp[name_len];
+ {
+ struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL(rd_count)];
+
+ /* We did this before, must succeed again */
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_GNSRECORD_records_deserialize (rd_ser_len,
+ rd_ser,
+ rd_count,
+ rd));
+
+ for (struct ZoneMonitor *zm = sa->zm_pos;
+ NULL != zm;
+ zm = sa->zm_pos)
+ {
+ if ( (0 != memcmp (&rp_msg->private_key,
+ &zm->zone,
+ sizeof (struct GNUNET_CRYPTO_EcdsaPrivateKey))) &&
+ (0 != memcmp (&zm->zone,
+ &zero,
+ sizeof (struct GNUNET_CRYPTO_EcdsaPrivateKey))) )
+ {
+ sa->zm_pos = zm->next; /* not interesting to this monitor */
+ continue; // -- fails tests, but why not here?
+ }
+ if (zm->limit == zm->iteration_cnt)
+ {
+ zm->sa_waiting = GNUNET_YES;
+ zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
+ if (NULL != zm->sa_wait_warning)
+ GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
+ zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
+ &warn_monitor_slow,
+ zm);
+ return; /* blocked on zone monitor */
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Notifying monitor about changes under label `%s'\n",
+ sa->conv_name);
+ zm->limit--;
+ send_lookup_response (zm->nc,
+ 0,
+ &rp_msg->private_key,
+ sa->conv_name,
+ rd_count,
+ rd);
+ sa->zm_pos = zm->next;
+ }
+ /* great, done with the monitors, unpack (again) for refresh_block operation */
+ refresh_block (sa->nc,
+ rid,
+ &rp_msg->private_key,
+ sa->conv_name,
+ rd_count,
+ rd);
+ }
+ GNUNET_SERVICE_client_continue (sa->nc->client);
+ free_store_activity (sa);
+}
+
+
+/**
+ * Called whenever a client is disconnected.
+ * Frees our resources associated with that client.
+ *
+ * @param cls closure
+ * @param client identification of the client
+ * @param app_ctx the `struct NamestoreClient` of @a client
+ */
+static void
+client_disconnect_cb (void *cls,
+ struct GNUNET_SERVICE_Client *client,
+ void *app_ctx)
+{
+ struct NamestoreClient *nc = app_ctx;
+ struct ZoneIteration *no;
+ struct CacheOperation *cop;
+
+ (void) cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client %p disconnected\n",
+ client);
+ for (struct ZoneMonitor *zm = monitor_head; NULL != zm; zm = zm->next)
+ {
+ struct StoreActivity *san;
+
+ if (nc != zm->nc)
+ continue;
+ GNUNET_CONTAINER_DLL_remove (monitor_head,
+ monitor_tail,
+ zm);
+ if (NULL != zm->task)
+ {
+ GNUNET_SCHEDULER_cancel (zm->task);
+ zm->task = NULL;
+ }
+ if (NULL != zm->sa_wait_warning)
+ {
+ GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
+ zm->sa_wait_warning = NULL;
+ }
+ for (struct StoreActivity *sa = sa_head; NULL != sa; sa = san)
+ {
+ san = sa->next;
+ if (zm == sa->zm_pos)
+ {
+ sa->zm_pos = zm->next;
+ /* this may free sa */
+ continue_store_activity (sa);
+ }
+ }
+ GNUNET_free (zm);
+ break;
+ }
+ for (struct StoreActivity *sa = sa_head; NULL != sa; sa = sa->next)
+ {
+ if (sa->nc == nc)
+ {
+ /* this may free sa */
+ free_store_activity (sa);
+ break; /* there can only be one per nc */
+ }
+ }
+ while (NULL != (no = nc->op_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (nc->op_head,
+ nc->op_tail,
+ no);
+ GNUNET_free (no);
+ }
+ for (cop = cop_head; NULL != cop; cop = cop->next)
+ if (nc == cop->nc)
+ cop->nc = NULL;
+ GNUNET_free (nc);
+}
+
+
+/**
+ * Add a client to our list of active clients.
+ *
+ * @param cls NULL
+ * @param client client to add
+ * @param mq message queue for @a client
+ * @return internal namestore client structure for this client
+ */
+static void *
+client_connect_cb (void *cls,
+ struct GNUNET_SERVICE_Client *client,
+ struct GNUNET_MQ_Handle *mq)
+{
+ struct NamestoreClient *nc;
+
+ (void) cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client %p connected\n",
+ client);
+ nc = GNUNET_new (struct NamestoreClient);
+ nc->client = client;
+ nc->mq = mq;
+ return nc;
+}
+
+
/**
* Closure for #lookup_it().
*/
/**
* FIXME.
*/
- size_t rd_ser_len;
+ ssize_t rd_ser_len;
};
/**
* FIXME.
+ *
* @param seq sequence number of the record
*/
static void
const struct GNUNET_GNSRECORD_Data *rd)
{
struct RecordLookupContext *rlc = cls;
- struct GNUNET_GNSRECORD_Data *rd_res;
- unsigned int rdc_res;
(void) private_key;
(void) seq;
- if (0 == strcmp (label,
+ if (0 != strcmp (label,
rlc->label))
+ return;
+ rlc->found = GNUNET_YES;
+ if (0 == rd_count)
+ {
+ rlc->rd_ser_len = 0;
+ rlc->res_rd_count = 0;
+ rlc->res_rd = NULL;
+ return;
+ }
+ if ( (NULL != rlc->nick) &&
+ (0 != strcmp (label,
+ GNUNET_GNS_EMPTY_LABEL_AT)) )
{
- rlc->found = GNUNET_YES;
- if (0 != rd_count)
+ /* Merge */
+ struct GNUNET_GNSRECORD_Data *rd_res;
+ unsigned int rdc_res;
+
+ rd_res = NULL;
+ rdc_res = 0;
+ rlc->nick->flags = (rlc->nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
+ merge_with_nick_records (rlc->nick,
+ rd_count,
+ rd,
+ &rdc_res,
+ &rd_res);
+ rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rdc_res,
+ rd_res);
+ if (rlc->rd_ser_len < 0)
{
- if ( (NULL != rlc->nick) &&
- (0 != strcmp (label,
- GNUNET_GNS_EMPTY_LABEL_AT)) )
- {
- /* Merge */
- rd_res = NULL;
- rdc_res = 0;
- rlc->nick->flags = (rlc->nick->flags | GNUNET_GNSRECORD_RF_PRIVATE) ^ GNUNET_GNSRECORD_RF_PRIVATE;
- merge_with_nick_records (rlc->nick,
- rd_count,
- rd,
- &rdc_res,
- &rd_res);
- rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rdc_res,
- rd_res);
- rlc->res_rd_count = rdc_res;
- rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
+ GNUNET_break (0);
+ GNUNET_free (rd_res);
+ rlc->found = GNUNET_NO;
+ rlc->rd_ser_len = 0;
+ return;
+ }
+ rlc->res_rd_count = rdc_res;
+ rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
+ if (rlc->rd_ser_len !=
GNUNET_GNSRECORD_records_serialize (rdc_res,
rd_res,
rlc->rd_ser_len,
- rlc->res_rd);
-
- GNUNET_free (rd_res);
- GNUNET_free (rlc->nick);
- rlc->nick = NULL;
- }
- else
- {
- rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
- rd);
- rlc->res_rd_count = rd_count;
- rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
+ rlc->res_rd))
+ {
+ GNUNET_break (0);
+ GNUNET_free (rlc->res_rd);
+ rlc->res_rd = NULL;
+ rlc->res_rd_count = 0;
+ rlc->rd_ser_len = 0;
+ GNUNET_free (rd_res);
+ rlc->found = GNUNET_NO;
+ return;
+ }
+ GNUNET_free (rd_res);
+ GNUNET_free (rlc->nick);
+ rlc->nick = NULL;
+ }
+ else
+ {
+ rlc->rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
+ rd);
+ if (rlc->rd_ser_len < 0)
+ {
+ GNUNET_break (0);
+ rlc->found = GNUNET_NO;
+ rlc->rd_ser_len = 0;
+ return;
+ }
+ rlc->res_rd_count = rd_count;
+ rlc->res_rd = GNUNET_malloc (rlc->rd_ser_len);
+ if (rlc->rd_ser_len !=
GNUNET_GNSRECORD_records_serialize (rd_count,
rd,
rlc->rd_ser_len,
- rlc->res_rd);
- }
- }
- else
+ rlc->res_rd))
{
- rlc->rd_ser_len = 0;
- rlc->res_rd_count = 0;
+ GNUNET_break (0);
+ GNUNET_free (rlc->res_rd);
rlc->res_rd = NULL;
+ rlc->res_rd_count = 0;
+ rlc->rd_ser_len = 0;
+ rlc->found = GNUNET_NO;
+ return;
}
}
}
const char *rd_ser;
unsigned int rd_count;
int res;
- struct ZoneMonitor *zm;
+ struct StoreActivity *sa;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received NAMESTORE_RECORD_STORE message\n");
name_tmp = (const char *) &rp_msg[1];
rd_ser = &name_tmp[name_len];
{
- struct GNUNET_GNSRECORD_Data rd[rd_count];
+ struct GNUNET_GNSRECORD_Data rd[GNUNET_NZL(rd_count)];
if (GNUNET_OK !=
GNUNET_GNSRECORD_records_deserialize (rd_ser_len,
GNUNET_SERVICE_client_drop (nc->client);
return;
}
+ GNUNET_STATISTICS_update (statistics,
+ "Well-formed store requests received",
+ 1,
+ GNUNET_NO);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Creating %u records for name `%s'\n",
(unsigned int) rd_count,
}
else
{
- struct GNUNET_GNSRECORD_Data rd_clean[rd_count];
- unsigned int rd_clean_off;
-
/* remove "NICK" records, unless this is for the
#GNUNET_GNS_EMPTY_LABEL_AT label */
+ struct GNUNET_GNSRECORD_Data rd_clean[GNUNET_NZL(rd_count)];
+ unsigned int rd_clean_off;
+
rd_clean_off = 0;
for (unsigned int i=0;i<rd_count;i++)
{
conv_name,
rd_clean_off,
rd_clean);
- if (GNUNET_OK == res)
- {
- for (zm = monitor_head; NULL != zm; zm = zm->next)
- {
- if ( (0 == memcmp (&rp_msg->private_key,
- &zm->zone,
- sizeof (struct GNUNET_CRYPTO_EcdsaPrivateKey))) ||
- (0 == memcmp (&zm->zone,
- &zero,
- sizeof (struct GNUNET_CRYPTO_EcdsaPrivateKey))) )
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Notifying monitor about changes under label `%s'\n",
- conv_name);
- send_lookup_response (zm->nc,
- 0,
- &rp_msg->private_key,
- conv_name,
- rd_count, rd);
- }
- else
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Monitor is for another zone\n");
- }
- if (NULL == monitor_head)
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "No monitors active\n");
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Error storing record: %d\n",
- res);
- }
}
- if (GNUNET_OK == res)
+
+ if (GNUNET_OK != res)
{
- refresh_block (nc,
- rid,
- &rp_msg->private_key,
- conv_name,
- rd_count,
- rd);
+ /* store not successful, not need to tell monitors */
+ send_store_response (nc,
+ res,
+ rid);
GNUNET_SERVICE_client_continue (nc->client);
GNUNET_free (conv_name);
return;
}
- GNUNET_free (conv_name);
+
+ sa = GNUNET_malloc (sizeof (struct StoreActivity) +
+ ntohs (rp_msg->gns_header.header.size));
+ GNUNET_CONTAINER_DLL_insert (sa_head,
+ sa_tail,
+ sa);
+ sa->nc = nc;
+ sa->rsm = (const struct RecordStoreMessage *) &sa[1];
+ GNUNET_memcpy (&sa[1],
+ rp_msg,
+ ntohs (rp_msg->gns_header.header.size));
+ sa->zm_pos = monitor_head;
+ sa->conv_name = conv_name;
+ continue_store_activity (sa);
}
- send_store_response (nc,
- res,
- rid);
- GNUNET_SERVICE_client_continue (nc->client);
}
struct ZoneToNameResponseMessage *ztnr_msg;
int16_t res;
size_t name_len;
- size_t rd_ser_len;
+ ssize_t rd_ser_len;
size_t msg_size;
char *name_tmp;
char *rd_tmp;
name);
res = GNUNET_YES;
name_len = (NULL == name) ? 0 : strlen (name) + 1;
- rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count, rd);
+ rd_ser_len = GNUNET_GNSRECORD_records_get_size (rd_count,
+ rd);
+ if (rd_ser_len < 0)
+ {
+ GNUNET_break (0);
+ ztn_ctx->success = GNUNET_SYSERR;
+ return;
+ }
msg_size = sizeof (struct ZoneToNameResponseMessage) + name_len + rd_ser_len;
if (msg_size >= GNUNET_MAX_MESSAGE_SIZE)
{
name,
name_len);
rd_tmp = &name_tmp[name_len];
- GNUNET_GNSRECORD_records_serialize (rd_count,
- rd,
- rd_ser_len,
- rd_tmp);
+ GNUNET_assert (rd_ser_len ==
+ GNUNET_GNSRECORD_records_serialize (rd_count,
+ rd,
+ rd_ser_len,
+ rd_tmp));
ztn_ctx->success = GNUNET_OK;
GNUNET_MQ_send (ztn_ctx->nc->mq,
env);
struct ZoneToNameResponseMessage *ztnr_msg;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received `%s' message\n",
- "ZONE_TO_NAME");
+ "Received ZONE_TO_NAME message\n");
ztn_ctx.rid = ntohl (ztn_msg->gns_header.r_id);
ztn_ctx.nc = nc;
ztn_ctx.success = GNUNET_NO;
}
+/**
+ * Function called when the monitor is ready for more data, and we
+ * should thus unblock PUT operations that were blocked on the
+ * monitor not being ready.
+ */
+static void
+monitor_unblock (struct ZoneMonitor *zm)
+{
+ struct StoreActivity *sa = sa_head;
+
+ while ( (NULL != sa) &&
+ (zm->limit > zm->iteration_cnt) )
+ {
+ struct StoreActivity *sn = sa->next;
+
+ if (sa->zm_pos == zm)
+ continue_store_activity (sa);
+ sa = sn;
+ }
+ if (zm->limit > zm->iteration_cnt)
+ {
+ zm->sa_waiting = GNUNET_NO;
+ if (NULL != zm->sa_wait_warning)
+ {
+ GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
+ zm->sa_wait_warning = NULL;
+ }
+ }
+ else if (GNUNET_YES == zm->sa_waiting)
+ {
+ zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
+ if (NULL != zm->sa_wait_warning)
+ GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
+ zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
+ &warn_monitor_slow,
+ zm);
+ }
+}
+
+
/**
* Send 'sync' message to zone monitor, we're now in sync.
*
GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC);
GNUNET_MQ_send (zm->nc->mq,
env);
+ /* mark iteration done */
+ zm->in_first_iteration = GNUNET_NO;
+ zm->iteration_cnt = 0;
+ if ( (zm->limit > 0) &&
+ (zm->sa_waiting) )
+ monitor_unblock (zm);
}
/**
- * Obtain the next datum during the zone monitor's zone intiial iteration.
+ * Obtain the next datum during the zone monitor's zone initial iteration.
*
* @param cls zone monitor that does its initial iteration
*/
static void
-monitor_next (void *cls);
+monitor_iteration_next (void *cls);
/**
struct ZoneMonitor *zm = cls;
zm->seq = seq;
- if (NULL == name)
- {
- /* finished with iteration */
- monitor_sync (zm);
- return;
- }
+ GNUNET_assert (NULL != name);
GNUNET_STATISTICS_update (statistics,
"Monitor notifications sent",
1,
GNUNET_NO);
+ zm->limit--;
+ zm->iteration_cnt--;
send_lookup_response (zm->nc,
0,
zone_key,
name,
rd_count,
rd);
- zm->task = GNUNET_SCHEDULER_add_now (&monitor_next,
- zm);
+ if ( (0 == zm->iteration_cnt) &&
+ (0 != zm->limit) )
+ {
+ /* We are done with the current iteration batch, AND the
+ client would right now accept more, so go again! */
+ GNUNET_assert (NULL == zm->task);
+ zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
+ zm);
+ }
}
zm = GNUNET_new (struct ZoneMonitor);
zm->nc = nc;
zm->zone = zis_msg->zone;
+ zm->limit = 1;
+ zm->in_first_iteration = (GNUNET_YES == ntohl (zis_msg->iterate_first));
GNUNET_CONTAINER_DLL_insert (monitor_head,
monitor_tail,
zm);
GNUNET_SERVICE_client_continue (nc->client);
GNUNET_notification_context_add (monitor_nc,
nc->mq);
- if (GNUNET_YES == ntohl (zis_msg->iterate_first))
- zm->task = GNUNET_SCHEDULER_add_now (&monitor_next,
+ if (zm->in_first_iteration)
+ zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
zm);
else
monitor_sync (zm);
* @param cls zone monitor that does its initial iteration
*/
static void
-monitor_next (void *cls)
+monitor_iteration_next (void *cls)
{
struct ZoneMonitor *zm = cls;
int ret;
zm->task = NULL;
+ GNUNET_assert (0 == zm->iteration_cnt);
+ if (zm->limit > 16)
+ zm->iteration_cnt = zm->limit / 2; /* leave half for monitor events */
+ else
+ zm->iteration_cnt = zm->limit; /* use it all */
ret = GSN_database->iterate_records (GSN_database->cls,
(0 == memcmp (&zm->zone,
&zero,
? NULL
: &zm->zone,
zm->seq,
- 1,
+ zm->iteration_cnt,
&monitor_iterate_cb,
zm);
if (GNUNET_SYSERR == ret)
return;
}
zm->limit += inc;
-#if 0
- if (GNUNET_YES == ntohl (zis_msg->iterate_first))
- zm->task = GNUNET_SCHEDULER_add_now (&monitor_next,
- zm);
- else
- monitor_sync (zm);
-#endif
+ if ( (zm->in_first_iteration) &&
+ (zm->limit == inc) )
+ {
+ /* We are still iterating, and the previous iteration must
+ have stopped due to the client's limit, so continue it! */
+ GNUNET_assert (NULL == zm->task);
+ zm->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next,
+ zm);
+ }
+ GNUNET_assert (zm->iteration_cnt <= zm->limit);
+ if ( (zm->limit > zm->iteration_cnt) &&
+ (zm->sa_waiting) )
+ {
+ monitor_unblock (zm);
+ }
+ else if (GNUNET_YES == zm->sa_waiting)
+ {
+ if (NULL != zm->sa_wait_warning)
+ GNUNET_SCHEDULER_cancel (zm->sa_wait_warning);
+ zm->sa_waiting_start = GNUNET_TIME_absolute_get ();
+ zm->sa_wait_warning = GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
+ &warn_monitor_slow,
+ zm);
+ }
}