* inbound session: mhd_connection *
*/
void * recv_endpoint;
+
+ /**
+ * Current queue size
+ */
+ size_t queue_length_cur;
+
+ /**
+ * Max queue size
+ */
+ size_t queue_length_max;
+
};
/**
/**
* Iterator to remove peer context
* @param cls the plugin
- * @key the peers public key hashcode
- * @value the peer context
+ * @param key the peers public key hashcode
+ * @param value the peer context
* @return GNUNET_YES on success
*/
int remove_peer_context_Iterator (void *cls, const GNUNET_HashCode *key, void *value)
{
if (NULL!=msg->transmit_cont)
msg->transmit_cont (msg->transmit_cont_cls,&pc->identity,GNUNET_OK);
+ ps->queue_length_cur -= msg->size;
remove_http_message(ps,msg);
}
}
ps->recv_active=GNUNET_NO;
ps->peercontext=pc;
ps->session_id =id_num;
+ ps->queue_length_cur = 0;
+ ps->queue_length_max = GNUNET_SERVER_MAX_MESSAGE_SIZE;
ps->url = create_url (plugin, ps->addr, ps->addrlen, ps->session_id);
GNUNET_CONTAINER_DLL_insert(pc->head,pc->tail,ps);
GNUNET_STATISTICS_update (plugin->env->stats,
/* Calling transmit continuation */
if (NULL != ps->pending_msgs_tail->transmit_cont)
msg->transmit_cont (ps->pending_msgs_tail->transmit_cont_cls,&(ps->peercontext)->identity,GNUNET_OK);
+ ps->queue_length_cur -= msg->size;
remove_http_message(ps, msg);
}
return bytes_sent;
curl_multi_remove_handle(plugin->multi_handle,ps->send_endpoint);
//curl_easy_cleanup(ps->send_endpoint);
//ps->send_endpoint=NULL;
- cur_msg = ps->pending_msgs_tail;
- if (( NULL != cur_msg) && ( NULL != cur_msg->transmit_cont))
- cur_msg->transmit_cont (cur_msg->transmit_cont_cls,&pc->identity,GNUNET_SYSERR);
+ while (ps->pending_msgs_tail != NULL)
+ {
+ cur_msg = ps->pending_msgs_tail;
+ if ( NULL != cur_msg->transmit_cont)
+ cur_msg->transmit_cont (cur_msg->transmit_cont_cls,&pc->identity,GNUNET_SYSERR);
+ ps->queue_length_cur -= cur_msg->size;
+ remove_http_message(ps,cur_msg);
+ }
}
/* GET connection failed */
if (msg->easy_handle == ps->recv_endpoint)
http_result);
#endif
/* Calling transmit continuation */
- cur_msg = ps->pending_msgs_tail;
- if (( NULL != cur_msg) && (NULL != cur_msg->transmit_cont))
+ while (ps->pending_msgs_tail != NULL)
{
- /* HTTP 1xx : Last message before here was informational */
- if ((http_result >=100) && (http_result < 200))
- cur_msg->transmit_cont (cur_msg->transmit_cont_cls,&pc->identity,GNUNET_OK);
- /* HTTP 2xx: successful operations */
- if ((http_result >=200) && (http_result < 300))
- cur_msg->transmit_cont (cur_msg->transmit_cont_cls,&pc->identity,GNUNET_OK);
- /* HTTP 3xx..5xx: error */
- if ((http_result >=300) && (http_result < 600))
- cur_msg->transmit_cont (cur_msg->transmit_cont_cls,&pc->identity,GNUNET_SYSERR);
+ cur_msg = ps->pending_msgs_tail;
+ if ( NULL != cur_msg->transmit_cont)
+ {
+ /* HTTP 1xx : Last message before here was informational */
+ if ((http_result >=100) && (http_result < 200))
+ cur_msg->transmit_cont (cur_msg->transmit_cont_cls,&pc->identity,GNUNET_OK);
+ /* HTTP 2xx: successful operations */
+ if ((http_result >=200) && (http_result < 300))
+ cur_msg->transmit_cont (cur_msg->transmit_cont_cls,&pc->identity,GNUNET_OK);
+ /* HTTP 3xx..5xx: error */
+ if ((http_result >=300) && (http_result < 600))
+ cur_msg->transmit_cont (cur_msg->transmit_cont_cls,&pc->identity,GNUNET_SYSERR);
+ }
+ ps->queue_length_cur -= cur_msg->size;
+ remove_http_message(ps,cur_msg);
}
+
ps->send_connected = GNUNET_NO;
ps->send_active = GNUNET_NO;
curl_multi_remove_handle(plugin->multi_handle,ps->send_endpoint);
/**
* Function setting up file descriptors and scheduling task to run
*
- * @param cls plugin as closure
+ * @param plugin plugin as closure
* @return GNUNET_SYSERR for hard failure, GNUNET_OK for ok
*/
static int curl_schedule(struct Plugin *plugin)
/**
* select best session to transmit data to peer
*
- * @param cls closure
* @param pc peer context of target peer
* @param addr address of target peer
* @param addrlen address length
ps->pending_msgs_tail = NULL;
ps->peercontext=pc;
ps->session_id = pc->session_id_counter;
+ ps->queue_length_cur = 0;
+ ps->queue_length_max = GNUNET_SERVER_MAX_MESSAGE_SIZE;
pc->session_id_counter++;
ps->url = create_url (plugin, ps->addr, ps->addrlen, ps->session_id);
if (ps->msgtok == NULL)
}
}
- /* create msg */
- msg = GNUNET_malloc (sizeof (struct HTTP_Message) + msgbuf_size);
- msg->next = NULL;
- msg->size = msgbuf_size;
- msg->pos = 0;
- msg->buf = (char *) &msg[1];
- msg->transmit_cont = cont;
- msg->transmit_cont_cls = cont_cls;
- memcpy (msg->buf,msgbuf, msgbuf_size);
- GNUNET_CONTAINER_DLL_insert(ps->pending_msgs_head,ps->pending_msgs_tail,msg);
-
- if (send_check_connections (plugin, ps) == GNUNET_SYSERR)
+ if (msgbuf_size >= (ps->queue_length_max - ps->queue_length_cur))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,"Queue %X full: (%u) bytes in queue, would discard message (%u)\n", ps, (ps->queue_length_max - ps->queue_length_cur), msgbuf_size);
+ //return GNUNET_SYSERR;
+ }
+
+ /* create msg */
+ msg = GNUNET_malloc (sizeof (struct HTTP_Message) + msgbuf_size);
+ msg->next = NULL;
+ msg->size = msgbuf_size;
+ msg->pos = 0;
+ msg->buf = (char *) &msg[1];
+ msg->transmit_cont = cont;
+ msg->transmit_cont_cls = cont_cls;
+ memcpy (msg->buf,msgbuf, msgbuf_size);
+
+ GNUNET_CONTAINER_DLL_insert(ps->pending_msgs_head,ps->pending_msgs_tail,msg);
+ ps->queue_length_cur += msgbuf_size;
+
+ if (send_check_connections (plugin, ps) == GNUNET_SYSERR)
return GNUNET_SYSERR;
if (force_address != GNUNET_YES)
pc->last_session = ps;