From: David Brodski Date: Tue, 9 Nov 2010 22:24:58 +0000 (+0000) Subject: wlan: seperate fragment and session queue X-Git-Tag: initial-import-from-subversion-38251~19784 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=3db08333ec2b301cc8f204ad74f5aa8a25e3426f;p=oweals%2Fgnunet.git wlan: seperate fragment and session queue wlan: schedule for timeouts used dll: insert at tail dll: insert before element --- diff --git a/src/include/gnunet_container_lib.h b/src/include/gnunet_container_lib.h index 2abbfadc8..480b98be8 100644 --- a/src/include/gnunet_container_lib.h +++ b/src/include/gnunet_container_lib.h @@ -693,6 +693,23 @@ int GNUNET_CONTAINER_multihashmap_get_multiple (const struct (head)->prev = element; \ (head) = (element); } while (0) +/** + * Insert an element at the tail of a DLL. Assumes that head, tail and + * element are structs with prev and next fields. + * + * @param head pointer to the head of the DLL + * @param tail pointer to the tail of the DLL + * @param element element to insert + */ +#define GNUNET_CONTAINER_DLL_insert_tail(head,tail,element) do { \ + (element)->prev = (tail); \ + (element)->next = NULL; \ + if ((head) == NULL) \ + (head) = element; \ + else \ + (tail)->next = element; \ + (tail) = (element); } while (0) + /** * Insert an element into a DLL after the given other element. Insert * at the head if the other element is NULL. @@ -719,7 +736,31 @@ int GNUNET_CONTAINER_multihashmap_get_multiple (const struct else \ (element)->next->prev = (element); } while (0) - +/** + * Insert an element into a DLL before the given other element. Insert + * at the tail if the other element is NULL. + * + * @param head pointer to the head of the DLL + * @param tail pointer to the tail of the DLL + * @param other prior element, NULL for insertion at head of DLL + * @param element element to insert + */ +#define GNUNET_CONTAINER_DLL_insert_before(head,tail,other,element) do { \ + (element)->next = (other); \ + if (NULL == other) \ + { \ + (element)->prev = (tail); \ + (tail) = (element); \ + } \ + else \ + { \ + (element)->prev = (other)->prev; \ + (other)->prev = (element); \ + } \ + if (NULL == (element)->prev) \ + (head) = (element); \ + else \ + (element)->prev->next = (element); } while (0) /** diff --git a/src/transport/plugin_transport_wlan.c b/src/transport/plugin_transport_wlan.c index 5ebe0a76d..4b83db3e9 100644 --- a/src/transport/plugin_transport_wlan.c +++ b/src/transport/plugin_transport_wlan.c @@ -48,6 +48,7 @@ #define FRAGMENT_TIMEOUT 1000 +#define FRAGMENT_QUEUE_SIZE 10 #define DEBUG_wlan GNUNET_NO @@ -164,6 +165,24 @@ struct Plugin */ uint pendingsessions; + /** + * Messages in the fragmentation queue, head + */ + + struct FragmentMessage * pending_Fragment_Messages_head; + + /** + * Messages in the fragmentation queue, tail + */ + + struct FragmentMessage * pending_Fragment_Messages_tail; + + /** + * number of pending fragment message + */ + + uint pending_fragment_messages; + }; //TODO doxigen @@ -177,13 +196,49 @@ struct Sessionqueue //TODO doxigen -struct FragmentQueue +struct AckQueue { - struct FragmentQueue * next; - struct FragmentQueue * prev; + struct AckQueue * next; + struct AckQueue * prev; int fragment_num; }; +/** + * Information kept for each message that is yet to + * be transmitted. + */ +struct PendingMessage +{ + + /** + * The pending message + */ + char *msg; + + /** + * Continuation function to call once the message + * has been sent. Can be NULL if there is no + * continuation to call. + */ + GNUNET_TRANSPORT_TransmitContinuation transmit_cont; + + /** + * Cls for transmit_cont + */ + void * transmit_cont_cls; + + /** + * Timeout value for the pending message. + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * Size of the message + */ + size_t message_size; + +}; + /** * Session handle for connections. */ @@ -201,16 +256,10 @@ struct Session struct Plugin *plugin; /** - * Messages currently pending for transmission + * Message currently pending for transmission * to this peer, if any. */ - struct PendingMessage *pending_messages_head; - - /** - * Messages currently pending for transmission - * to this peer, if any. - */ - struct PendingMessage *pending_messages_tail; + struct PendingMessage *pending_message; /** * To whom are we talking to (set to our identity @@ -241,7 +290,7 @@ struct Session struct GNUNET_TIME_Absolute last_activity; /** - * current number for message incoming , to distinguish between the messages + * current number for message incoming, to distinguish between the messages */ uint32_t message_id_in; @@ -250,79 +299,76 @@ struct Session */ uint32_t message_id_out; + /** + * does this session have a message in the fragment queue + */ + + int has_fragment; }; -/** - * Information kept for each message that is yet to - * be transmitted. - */ -struct PendingMessage + + + +struct FragmentMessage { + /** + * Session this message belongs to + */ - /** - * This is a doubly-linked list. - */ - struct PendingMessage *next; + struct Session *session; - /** - * This is a doubly-linked list. - */ - struct PendingMessage *prev; + /** + * This is a doubly-linked list. + */ + struct FragmentMessage *next; - /** - * The pending message - */ - const char *msg; + /** + * This is a doubly-linked list. + */ + struct FragmentMessage *prev; - /** - * Continuation function to call once the message - * has been sent. Can be NULL if there is no - * continuation to call. - */ - GNUNET_TRANSPORT_TransmitContinuation transmit_cont; + /** + * The pending message + */ + char *msg; - /** - * Cls for transmit_cont - */ - void * transmit_cont_cls; + /** + * Timeout value for the pending message. + */ + struct GNUNET_TIME_Absolute timeout; - /** - * Timeout value for the pending message. - */ - struct GNUNET_TIME_Absolute timeout; + /** + * Timeout value for the pending fragments. + * Stores the time when the next msg fragment ack has to be received + */ + struct GNUNET_TIME_Absolute next_ack; - /** - * Timeout value for the pending fragments. - * Stores the time when the last msg fragment ack was received - */ - struct GNUNET_TIME_Absolute last_ack; + /** + * Sorted queue with the acks received for fragments; head + */ - /** - * Sorted queue with the acks received for fragments; head - */ + struct AckQueue * head; - struct FragmentQueue * head; + /** + * Sorted queue with the acks received for fragments; tail + */ - /** - * Sorted queue with the acks received for fragments; tail - */ + struct AckQueue * tail; - struct FragmentQueue * tail; + /** + * Size of the message + */ + size_t message_size; - /** - * Size of the message - */ - size_t message_size; + /** + * pos / next fragment number in the message, for fragmentation/segmentation, + * some acks can be missing but there is still time + */ + uint32_t message_pos; - /** - * pos / next fragment number in the message, for fragmentation/segmentation, - * some acks can be missing but there is still time - */ - uint32_t message_pos; }; - /** * Header for messages which need fragmentation */ @@ -401,6 +447,8 @@ static int wlan_plugin_address_suggested (void *cls, const void *addr, size_t addrlen); uint16_t getcrc16 (const char *msgbuf, size_t msgbuf_size); +static void do_transmit (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +static void check_fragment_queue (struct Plugin * plugin); /** * get the next message number, at the moment just a random one @@ -461,23 +509,25 @@ get_Session (struct Plugin *plugin, queue->content->plugin = plugin; memcpy(queue->content->addr, addr, 6); queue->content->message_id_out = get_next_message_id(); + queue->content->has_fragment = 0; //queue welcome message for new sessions, not realy needed //struct WelcomeMessage welcome; struct PendingMessage *pm; - pm = GNUNET_malloc (sizeof (struct PendingMessage) + GNUNET_HELLO_size(* (plugin->env->our_hello))); - pm->msg = (const char*) &pm[1]; + pm = GNUNET_malloc (sizeof (struct PendingMessage)); + pm->msg = GNUNET_malloc(GNUNET_HELLO_size(* (plugin->env->our_hello))); pm->message_size = GNUNET_HELLO_size(* (plugin->env->our_hello)); //welcome.header.size = htons (GNUNET_HELLO_size(* (plugin->env->our_hello))); //welcome.header.type = htons (GNUNET_MESSAGE_TYPE_WLAN_ADVERTISEMENT); //welcome.clientIdentity = *plugin->env->my_identity; - memcpy (&pm[1], * plugin->env->our_hello, GNUNET_HELLO_size(* (plugin->env->our_hello))); + memcpy ( (pm->msg), * plugin->env->our_hello, GNUNET_HELLO_size(* (plugin->env->our_hello))); pm->timeout = GNUNET_TIME_UNIT_FOREVER_ABS; - GNUNET_CONTAINER_DLL_insert ((queue->content)->pending_messages_head, - (queue->content)->pending_messages_tail, - pm); + queue->content->pending_message = pm; plugin->pendingsessions ++; - GNUNET_CONTAINER_DLL_insert_after(plugin->pending_Sessions, plugin->pending_Sessions_tail, plugin->pending_Sessions_tail, queue); + GNUNET_CONTAINER_DLL_insert_tail(plugin->pending_Sessions, plugin->pending_Sessions_tail, queue); + + check_fragment_queue(plugin); + return queue->content; } @@ -520,8 +570,8 @@ queue_Session (struct Plugin *plugin, //TODO doxigen static void -free_acks (struct PendingMessage * pm){ - struct FragmentQueue * fq; +free_acks (struct FragmentMessage * pm){ + struct AckQueue * fq; while (pm->head != NULL){ fq = pm->head; GNUNET_CONTAINER_DLL_remove(pm->head, pm->tail, fq); @@ -529,6 +579,181 @@ free_acks (struct PendingMessage * pm){ } } +//TODO doxigen +static void +delay_fragment_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc){ + struct Plugin * plugin = cls; + plugin->server_write_task = GNUNET_SCHEDULER_NO_TASK; + + if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) + return; + + // GNUNET_TIME_UNIT_FOREVER_REL is needed to clean up old msg + plugin->server_write_task + = GNUNET_SCHEDULER_add_write_file(GNUNET_TIME_UNIT_FOREVER_REL, + plugin->server_stdin_handle, + &do_transmit, + plugin); +} + + +//TODO doxigen +struct GNUNET_TIME_Relative +get_next_frag_timeout (struct FragmentMessage * fm) +{ + return GNUNET_TIME_relative_min(GNUNET_TIME_absolute_get_remaining(fm->next_ack), GNUNET_TIME_absolute_get_remaining(fm->timeout)); +} + +//TODO doxigen +/** + * Function to get the timeout value for acks for this session + */ + +struct GNUNET_TIME_Relative +get_ack_timeout (struct FragmentMessage * fm){ + struct GNUNET_TIME_Relative timeout; + timeout.rel_value = FRAGMENT_TIMEOUT; + return timeout; +} + +//TODO doxigen +/** + * Function to set the timer for the next timeout of the fragment queue + */ +static void +check_next_fragment_timeout (struct Plugin * plugin){ + struct FragmentMessage * fm; + if (plugin->server_write_task != GNUNET_SCHEDULER_NO_TASK){ + GNUNET_SCHEDULER_cancel(plugin->server_write_task); + } + fm = plugin->pending_Fragment_Messages_head; + if (fm != NULL){ + plugin->server_write_task = GNUNET_SCHEDULER_add_delayed(get_next_frag_timeout(fm), &delay_fragment_task, plugin); + } +} + +//TODO doxigen +/** + * Function to get the next queued Session, removes the session from the queue + */ + +static struct Session * +get_next_queue_Session (struct Plugin * plugin){ + struct Session * session; + struct Sessionqueue * sessionqueue; + struct Sessionqueue * sessionqueue_alt; + struct PendingMessage * pm; + sessionqueue = plugin->pending_Sessions; + while (sessionqueue != NULL){ + session = sessionqueue->content; + pm = session->pending_message; + + //check for message timeout + if (GNUNET_TIME_absolute_get_remaining(pm->timeout).rel_value > 0){ + //check if session has no message in the fragment queue + if (! session->has_fragment){ + plugin->pendingsessions --; + GNUNET_CONTAINER_DLL_remove (plugin->pending_Sessions, + plugin->pending_Sessions_tail, sessionqueue); + GNUNET_free(sessionqueue); + + return session; + } else { + sessionqueue = sessionqueue->next; + } + } else { + + session->pending_message = NULL; + //call the cont func that it did not work + if (pm->transmit_cont != NULL) + pm->transmit_cont (pm->transmit_cont_cls, + &(session->target), GNUNET_SYSERR); + GNUNET_free(pm->msg); + GNUNET_free(pm); + + sessionqueue_alt = sessionqueue; + sessionqueue = sessionqueue->next; + plugin->pendingsessions --; + GNUNET_CONTAINER_DLL_remove (plugin->pending_Sessions, + plugin->pending_Sessions_tail, sessionqueue_alt); + + GNUNET_free(sessionqueue_alt); + + } + + + } + return NULL; +} + +//TODO doxigen +/** + * Function to sort the message into the message fragment queue + */ +static void +sort_fragment_into_queue (struct Plugin * plugin, struct FragmentMessage * fm){ + struct FragmentMessage * fm2; + //sort into the list at the right position + + fm2 = plugin->pending_Fragment_Messages_head; + + while (fm2 != NULL){ + if (GNUNET_TIME_absolute_get_difference(fm2->next_ack, fm->next_ack).rel_value == 0){ + break; + } else { + fm2 = fm2->next; + } + } + + GNUNET_CONTAINER_DLL_insert_after(plugin->pending_Fragment_Messages_head, + plugin->pending_Fragment_Messages_tail,fm2,fm); +} + +//TODO doxigen +/** + * Function to check if there is some space in the fragment queue + */ + +static void +check_fragment_queue (struct Plugin * plugin){ + struct Session * session; + struct FragmentMessage * fm; + + struct PendingMessage * pm; + + if (plugin->pending_fragment_messages < FRAGMENT_QUEUE_SIZE){ + session = get_next_queue_Session(plugin); + if (session != NULL){ + pm = session->pending_message; + session->pending_message = NULL; + session->has_fragment = 1; + GNUNET_assert(pm != NULL); + + fm = GNUNET_malloc(sizeof(struct FragmentMessage)); + fm->message_size = pm->message_size; + fm->msg = pm->msg; + fm->session = session; + fm->timeout.abs_value = pm->timeout.abs_value; + fm->message_pos = 0; + fm->next_ack = GNUNET_TIME_absolute_get(); + + if (pm->transmit_cont != NULL) + pm->transmit_cont (pm->transmit_cont_cls, + &(session->target), GNUNET_OK); + GNUNET_free(pm); + + sort_fragment_into_queue(plugin,fm); + plugin->pending_fragment_messages ++; + + //generate new message id + session->message_id_out = get_next_message_id(); + + //check if timeout changed + check_next_fragment_timeout(plugin); + } + } +} + /** * Function called to when wlan helper is ready to get some data * @@ -539,15 +764,17 @@ free_acks (struct PendingMessage * pm){ static void do_transmit (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { + struct Plugin * plugin = cls; + plugin->server_write_task = GNUNET_SCHEDULER_NO_TASK; + ssize_t bytes; if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) return; struct Session * session; - struct Sessionqueue * queue; - struct PendingMessage * pm; + struct FragmentMessage * fm; struct IeeeHeader * wlanheader; struct RadiotapHeader * radioHeader; struct GNUNET_MessageHeader * msgheader; @@ -556,289 +783,143 @@ do_transmit (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) const char * copystart = NULL; uint16_t copysize = 0; uint copyoffset = 0; - struct FragmentQueue * akt = NULL; - int exit = 0; - - int i = 0; - - struct GNUNET_TIME_Absolute nextsend; - struct GNUNET_TIME_Relative timeout; - struct Sessionqueue * nextsession = NULL; - - timeout.rel_value = FRAGMENT_TIMEOUT; - nextsend = GNUNET_TIME_absolute_get_forever(); + struct AckQueue * akt = NULL; + //int exit = 0; - queue = plugin->pending_Sessions; - - // check if the are some pending sessions/messages ... - GNUNET_assert(queue != NULL); - - session = queue->content; + fm = plugin->pending_Fragment_Messages_head; + GNUNET_assert(fm != NULL); + session = fm->session; GNUNET_assert(session != NULL); - pm = session->pending_messages_head; - GNUNET_assert(pm != NULL); - - // get next valid session - // check if this session is only waiting to receive the acks for an already send fragments to finish it - // timeout is not reached - for (i = 0; i < plugin->pendingsessions; i++){ - - // check if the are some pending sessions/messages ... - GNUNET_assert(queue != NULL); + // test if message timed out + if (GNUNET_TIME_absolute_get_remaining(fm->timeout).rel_value == 0){ + free_acks(fm); + GNUNET_assert(plugin->pending_fragment_messages > 0); + plugin->pending_fragment_messages --; + GNUNET_CONTAINER_DLL_remove(plugin->pending_Fragment_Messages_head, + plugin->pending_Fragment_Messages_tail, fm); - session = queue->content; - GNUNET_assert(session != NULL); + GNUNET_free(fm->msg); - pm = session->pending_messages_head; - GNUNET_assert(pm != NULL); - - //save next session - nextsession = queue->next; - // test if message timed out - while (GNUNET_TIME_absolute_get_remaining(pm->timeout).rel_value == 0){ - //remove message - //free the acks - free_acks (pm); - //call the cont func that it did not work - if (pm->transmit_cont != NULL) - pm->transmit_cont (pm->transmit_cont_cls, - &(session->target), GNUNET_SYSERR); - //remove the message - GNUNET_CONTAINER_DLL_remove (session->pending_messages_head, - session->pending_messages_tail, - pm); - GNUNET_free(pm); - - //test if there are no more messages pending for this session - if (session->pending_messages_head == NULL){ - - //test if tail is null too - GNUNET_assert(session->pending_messages_tail == NULL); - - plugin->pendingsessions --; - GNUNET_CONTAINER_DLL_remove (plugin->pending_Sessions, plugin->pending_Sessions_tail, queue); - GNUNET_free(queue); - queue = NULL; - break; - - } else { - pm = session->pending_messages_head; - } - - } - // restore next session if necessary - if (queue == NULL){ - queue = nextsession; - nextsession = NULL; - //there are no more messages in this session - continue; - } - nextsession = NULL; - - // test if retransmit is needed - if (GNUNET_TIME_absolute_get_duration(pm->last_ack).rel_value < FRAGMENT_TIMEOUT) { - // get last offset for this message - copyoffset = pm->message_size /(WLAN_MTU - sizeof(struct FragmentationHeader)); - // one more is the end - copyoffset ++; - // test if it is not the end - if (copyoffset > pm->message_pos){ - nextsession = queue; - break; - } - - nextsend = GNUNET_TIME_absolute_min(GNUNET_TIME_absolute_add(pm->last_ack, timeout), nextsend); - - GNUNET_CONTAINER_DLL_remove (plugin->pending_Sessions, plugin->pending_Sessions_tail, queue); - //insert at the tail - GNUNET_CONTAINER_DLL_insert_after (plugin->pending_Sessions, - plugin->pending_Sessions_tail, - plugin->pending_Sessions_tail, queue); - - //get next pending session - queue = queue->next; - - } else { - // retransmit - nextsession = queue; - break; - } - } + GNUNET_free(fm); + check_fragment_queue(plugin); + } else { + if (fm->message_size > WLAN_MTU) { + size += sizeof(struct FragmentationHeader); + // check/set for retransmission + if (GNUNET_TIME_absolute_get_duration(fm->next_ack).rel_value == 0) { + + // be positive and try again later :-D + fm->next_ack = GNUNET_TIME_relative_to_absolute(get_ack_timeout(fm)); + // find first missing fragment + akt = fm->head; + fm->message_pos = 0; + + //test if ack 0 was already received + while (akt != NULL){ + //if fragment is present, take next + if (akt->fragment_num == fm->message_pos) { + fm->message_pos ++; + } + //next ack is bigger then the fragment number + //in case there is something like this: (acks) 1, 2, 5, 6, ... + //and we send 3 again, the next number should be 4 + else if (akt->fragment_num > fm->message_pos) { + break; + } + + akt = akt->next; - //test if there is one session to send something - if (nextsession != NULL){ - queue = nextsession; + } - // check if the are some pending sessions/messages ... - GNUNET_assert(queue != NULL); - session = queue->content; - GNUNET_assert(session != NULL); + } - pm = session->pending_messages_head; - GNUNET_assert(pm != NULL); - } else { - //nothing to send at the moment - plugin->server_read_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining(nextsend), - &do_transmit, plugin); + copyoffset = (WLAN_MTU - sizeof(struct FragmentationHeader)) * fm->message_pos; + fragheader.fragment_off_or_num = htons(fm->message_pos); + fragheader.message_id = htonl(session->message_id_out); - } + // start should be smaller then the packet size + GNUNET_assert(copyoffset < fm->message_size); + copystart = fm->msg + copyoffset; + //size of the fragment is either the MTU - overhead + //or the missing part of the message in case this is the last fragment + copysize = GNUNET_MIN(fm->message_size - copyoffset, + WLAN_MTU - sizeof(struct FragmentationHeader)); + fragheader.header.size = htons(copysize); + fragheader.header.type = GNUNET_MESSAGE_TYPE_WLAN_FRAGMENT; - if (pm->message_size > WLAN_MTU) { - size += sizeof(struct FragmentationHeader); - // check for retransmission - if (GNUNET_TIME_absolute_get_duration(pm->last_ack).rel_value > FRAGMENT_TIMEOUT) { - // TODO retransmit - // be positive and try again later :-D - pm->last_ack = GNUNET_TIME_absolute_get(); - // find first missing fragment - exit = 0; - akt = pm->head; - pm->message_pos = 0; + //get the next missing fragment + akt = fm->head; + fm->message_pos ++; - //test if ack 0 was already received + //test if ack was already received while (akt != NULL){ //if fragment is present, take next - if (akt->fragment_num == pm->message_pos) { - pm->message_pos ++; + if (akt->fragment_num == fm->message_pos) { + fm->message_pos ++; } //next ack is bigger then the fragment number //in case there is something like this: (acks) 1, 2, 5, 6, ... //and we send 3 again, the next number should be 4 - if (akt->fragment_num > pm->message_pos) { - break; - } + else if (akt->fragment_num > fm->message_pos) { + break; + } akt = akt->next; - } + } else { + // there is no need to split + copystart = fm->msg; + copysize = fm->message_size; + } + size += copysize; + size += sizeof(struct RadiotapHeader) + sizeof(struct IeeeHeader) + + sizeof(struct GNUNET_MessageHeader); + msgheader = GNUNET_malloc(size); + msgheader->size = htons(size - sizeof(struct GNUNET_MessageHeader)); + msgheader->type = GNUNET_MESSAGE_TYPE_WLAN_HELPER_DATA; - } + radioHeader = (struct RadiotapHeader*) &msgheader[1]; + getRadiotapHeader(radioHeader); - copyoffset = (WLAN_MTU - sizeof(struct FragmentationHeader)) * pm->message_pos; - fragheader.fragment_off_or_num = pm->message_pos; - fragheader.message_id = session->message_id_out; - - // start should be smaller then the packet size - //TODO send some other data if everything was send but not all acks are present - GNUNET_assert(copyoffset < pm->message_size); - copystart = pm->msg + copyoffset; - - //size of the fragment is either the MTU - overhead - //or the missing part of the message in case this is the last fragment - copysize = GNUNET_MIN(pm->message_size - copyoffset, - WLAN_MTU - sizeof(struct FragmentationHeader)); - fragheader.header.size = copysize; - fragheader.header.type = GNUNET_MESSAGE_TYPE_WLAN_FRAGMENT; - - //get the next missing fragment - exit = 0; - akt = pm->head; - pm->message_pos ++; - - //test if ack was already received - while (akt != NULL){ - //if fragment is present, take next - if (akt->fragment_num == pm->message_pos) { - pm->message_pos ++; - } - //next ack is bigger then the fragment number - //in case there is something like this: (acks) 1, 2, 5, 6, ... - //and we send 3 again, the next number should be 4 - if (akt->fragment_num > pm->message_pos) { - break; - } - - akt = akt->next; - } + wlanheader = (struct IeeeHeader *) &radioHeader[1]; + getWlanHeader(wlanheader); + //could be faster if content is just send and not copyed before + //fragmentheader is needed + if (fm->message_size > WLAN_MTU){ + fragheader.message_crc = htons(getcrc16(copystart, copysize)); + memcpy(&wlanheader[1],&fragheader, sizeof(struct FragmentationHeader)); + memcpy(&wlanheader[1] + sizeof(struct FragmentationHeader),copystart,copysize); } else { - // there is no need to split - copystart = pm->msg; - copysize = pm->message_size; + memcpy(&wlanheader[1],copystart,copysize); } - size += copysize; - size += sizeof(struct RadiotapHeader) + sizeof(struct IeeeHeader) - + sizeof(struct GNUNET_MessageHeader); - msgheader = GNUNET_malloc(size); - msgheader->size = htons(size - sizeof(struct GNUNET_MessageHeader)); - msgheader->type = GNUNET_MESSAGE_TYPE_WLAN_HELPER_DATA; - - radioHeader = (struct RadiotapHeader*) &msgheader[1]; - getRadiotapHeader(radioHeader); - - wlanheader = (struct IeeeHeader *) &radioHeader[1]; - getWlanHeader(wlanheader); - - - //could be faster if content is just send and not copyed before - //fragmentheader is needed - if (pm->message_size > WLAN_MTU){ - fragheader.message_crc = getcrc16(copystart, copysize); - memcpy(&wlanheader[1],&fragheader, sizeof(struct FragmentationHeader)); - memcpy(&wlanheader[1] + sizeof(struct FragmentationHeader),copystart,copysize); - } else { - memcpy(&wlanheader[1],copystart,copysize); - } - - bytes = GNUNET_DISK_file_write(plugin->server_stdin_handle, msgheader, size); - - - - + bytes = GNUNET_DISK_file_write(plugin->server_stdin_handle, msgheader, size); + GNUNET_assert(bytes == size); + //check if this was the last fragment of this message, if true then queue at the end of the list + if (copysize + copyoffset >= fm->message_size){ + GNUNET_assert(copysize + copyoffset == fm->message_size); + GNUNET_CONTAINER_DLL_remove (plugin->pending_Fragment_Messages_head, + plugin->pending_Fragment_Messages_tail, fm); - if (bytes < 1) - { - return; - } - - //plugin->server_read_task = - //GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, - // plugin->server_stdout_handle, &wlan_plugin_helper_read, plugin); - -} - - - -/** - * If we have pending messages, ask the server to - * transmit them (schedule the respective tasks, etc.) - * - * @param Plugin env to get everything needed - */ -static void -process_pending_messages (struct Plugin * plugin) -{ - struct Sessionqueue * queue; - struct Session * session; + GNUNET_CONTAINER_DLL_insert_tail(plugin->pending_Fragment_Messages_head, + plugin->pending_Fragment_Messages_tail, fm); + // if fragments have opimized timeouts + //sort_fragment_into_queue(plugin,fm); - if (plugin->pending_Sessions == NULL) - return; - - queue = plugin->pending_Sessions; - //contet should not be empty - GNUNET_assert(queue->content != NULL); - - session = queue->content; - //pending sessions should have some msg - GNUNET_assert(session->pending_messages_head != NULL); + } + check_next_fragment_timeout(plugin); - // GNUNET_TIME_UNIT_FOREVER_REL is needed to clean up old msg - plugin->server_write_task - = GNUNET_SCHEDULER_add_write_file(GNUNET_TIME_UNIT_FOREVER_REL, - plugin->server_stdin_handle, - &do_transmit, - plugin); + } } @@ -940,48 +1021,24 @@ wlan_plugin_send (void *cls, queue_Session(plugin, session); //queue message in session - newmsg = GNUNET_malloc(sizeof(struct PendingMessage) + msgbuf_size + sizeof(struct WlanHeader)); - newmsg->msg = (const char*) &newmsg[1]; - wlanheader = (struct WlanHeader *) &newmsg[1]; - //copy msg to buffer, not fragmented / segmented yet, but with message header - wlanheader->header.size = msgbuf_size; - wlanheader->header.type = GNUNET_MESSAGE_TYPE_WLAN_DATA; - wlanheader->target = *target; - wlanheader->crc = getcrc32(msgbuf, msgbuf_size); - memcpy(&wlanheader[1], msgbuf, msgbuf_size); - newmsg->transmit_cont = cont; - newmsg->transmit_cont_cls = cont_cls; - newmsg->timeout = GNUNET_TIME_relative_to_absolute(timeout); - newmsg->message_pos = 0; - newmsg->message_size = msgbuf_size + sizeof(struct WlanHeader); - newmsg->next = NULL; - - //check if queue is empty - struct PendingMessage * tailmsg; - tailmsg = session->pending_messages_tail; - - //new tail is the new msg - session->pending_messages_tail = newmsg; - newmsg->prev = tailmsg; - - //test if tail was not NULL (queue is empty) - if (tailmsg == NULL){ - // head should be NULL too - GNUNET_assert(session->pending_messages_head == NULL); - - session->pending_messages_head = newmsg; - + if (session->pending_message == NULL){ + newmsg = GNUNET_malloc(sizeof(struct PendingMessage)); + (newmsg->msg) = GNUNET_malloc(msgbuf_size + sizeof(struct WlanHeader)); + wlanheader = (struct WlanHeader *) newmsg->msg; + //copy msg to buffer, not fragmented / segmented yet, but with message header + wlanheader->header.size = htons(msgbuf_size); + wlanheader->header.type = GNUNET_MESSAGE_TYPE_WLAN_DATA; + wlanheader->target = *target; + wlanheader->crc = getcrc32(msgbuf, msgbuf_size); + memcpy(&wlanheader[1], msgbuf, msgbuf_size); + newmsg->transmit_cont = cont; + newmsg->transmit_cont_cls = cont_cls; + newmsg->timeout = GNUNET_TIME_relative_to_absolute(timeout); + newmsg->message_size = msgbuf_size + sizeof(struct WlanHeader); } else { - //next at the tail should be NULL - GNUNET_assert(tailmsg->next == NULL); - - //queue the msg - tailmsg->next = newmsg; + //TODO if message is send while hello is still pending, other cases should not occur } - - process_pending_messages(plugin); - - + check_fragment_queue(plugin); //FIXME not the correct size return msgbuf_size; @@ -1012,15 +1069,11 @@ wlan_plugin_disconnect (void *cls, GNUNET_assert (queue->content == NULL); if (memcmp(target, &(queue->content->target), sizeof(struct GNUNET_PeerIdentity)) == 0) { - // sesion found + // session found // remove PendingMessage - while (queue->content->pending_messages_head != NULL){ - pm = queue->content->pending_messages_head; - free_acks(pm); - GNUNET_CONTAINER_DLL_remove(queue->content->pending_messages_head,queue->content->pending_messages_tail, pm); - GNUNET_free(pm); - - } + pm = queue->content->pending_message; + GNUNET_free(pm->msg); + GNUNET_free(pm); GNUNET_free(queue->content); GNUNET_CONTAINER_DLL_remove(plugin->sessions, plugin->sessions_tail, queue); @@ -1295,6 +1348,8 @@ gnunet_plugin_transport_wlan_init (void *cls) plugin = GNUNET_malloc (sizeof (struct Plugin)); plugin->env = env; plugin->pendingsessions = 0; + plugin->server_write_task = GNUNET_SCHEDULER_NO_TASK; + plugin->server_read_task = GNUNET_SCHEDULER_NO_TASK; wlan_transport_start_wlan_helper(plugin); plugin->consoltoken = GNUNET_SERVER_mst_create(&wlan_process_helper,plugin); @@ -1310,6 +1365,7 @@ gnunet_plugin_transport_wlan_init (void *cls) api->check_address = &wlan_plugin_address_suggested; api->address_to_string = &wlan_plugin_address_to_string; + start_next_message_id(); return api;