- struct GNUNET_DHT_Handle *handle = cls;
- const struct GNUNET_DHT_ClientResultMessage *dht_msg;
- uint16_t msize;
- int ret;
-
- if (NULL == msg)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Error receiving data from DHT service, reconnecting\n");
- do_disconnect (handle);
- return;
- }
- ret = GNUNET_SYSERR;
- msize = ntohs (msg->size);
- switch (ntohs (msg->type))
- {
- case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET:
- if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage))
- {
- GNUNET_break (0);
- break;
- }
- ret = process_monitor_get_message(handle,
- (const struct GNUNET_DHT_MonitorGetMessage *) msg);
- break;
- case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP:
- if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage))
- {
- GNUNET_break (0);
- break;
- }
- ret = process_monitor_get_resp_message(handle,
- (const struct GNUNET_DHT_MonitorGetRespMessage *) msg);
- break;
- case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT:
- if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage))
- {
- GNUNET_break (0);
- break;
- }
- ret = process_monitor_put_message(handle,
- (const struct GNUNET_DHT_MonitorPutMessage *) msg);
- break;
- case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP:
- /* Not implemented yet */
- GNUNET_break(0);
- break;
- case GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT:
- if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage))
- {
- GNUNET_break (0);
- break;
- }
- ret = GNUNET_OK;
- dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n",
- GNUNET_h2s (&dht_msg->key), handle);
- GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
- &dht_msg->key, &process_reply,
- (void *) dht_msg);
- break;
- case GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK:
- if (ntohs (msg->size) != sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage))
- {
- GNUNET_break (0);
- break;
- }
- ret = process_put_confirmation_message (handle,
- (const struct GNUNET_DHT_ClientPutConfirmationMessage*) msg);
- break;
- default:
- GNUNET_break(0);
- break;
- }
- if (GNUNET_OK != ret)
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ GNUNET_MQ_hd_var_size (monitor_get,
+ GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET,
+ struct GNUNET_DHT_MonitorGetMessage,
+ h),
+ GNUNET_MQ_hd_var_size (monitor_get_resp,
+ GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP,
+ struct GNUNET_DHT_MonitorGetRespMessage,
+ h),
+ GNUNET_MQ_hd_var_size (monitor_put,
+ GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT,
+ struct GNUNET_DHT_MonitorPutMessage,
+ h),
+ GNUNET_MQ_hd_var_size (client_result,
+ GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT,
+ struct GNUNET_DHT_ClientResultMessage,
+ h),
+ GNUNET_MQ_hd_fixed_size (put_confirmation,
+ GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK,
+ struct GNUNET_DHT_ClientPutConfirmationMessage,
+ h),
+ GNUNET_MQ_handler_end ()
+ };
+ if (NULL != h->mq)
+ return GNUNET_OK;
+ h->mq = GNUNET_CLIENT_connect (h->cfg,
+ "dht",
+ handlers,
+ &mq_error_handler,
+ h);
+ if (NULL == h->mq)