Changeset dce3903 for protocols


Ignore:
Timestamp:
2007-12-04T00:48:57Z (17 years ago)
Author:
ulim <a.sporto+bee@…>
Branches:
master
Children:
8076ec0, fa30fa5
Parents:
2ff2076
Message:

Send and receive seems to work now! Also adopted the new buffering strategy,
only one buffer of 2k per transfer now.

Location:
protocols
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • protocols/ft.h

    r2ff2076 rdce3903  
    2626#ifndef _FT_H
    2727#define _FT_H
     28
     29/*
     30 * One buffer is needed for each transfer. The receiver stores a message
     31 * in it and gives it to the sender. The sender will stall the receiver
     32 * till the buffer has been sent out.
     33 */
     34#define FT_BUFFER_SIZE 2048
    2835
    2936typedef enum {
     
    131138       
    132139        /*
    133          * If set, called when the transfer queue is running empty and
    134          * more data can be added.
     140         * called by the sending side to indicate that it is writable.
     141         * The callee should check if data is available and call the
     142         * function(as seen below) if that is the case.
    135143         */
    136         void (*out_of_data) ( struct file_transfer *file );
     144        gboolean (*write_request) ( struct file_transfer *file );
    137145
    138146        /*
    139147         * When sending files, protocols register this function to receive data.
     148         * This should only be called once after write_request is called. The caller
     149         * should not read more data until write_request is called again. This technique
     150         * avoids buffering.
    140151         */
    141         gboolean (*write) (struct file_transfer *file, char *buffer, int len );
     152        gboolean (*write) (struct file_transfer *file, char *buffer, unsigned int len );
     153
     154        /* The send buffer associated with this transfer.
     155         * Since receivers always wait for a write_request call one is enough.
     156         */
     157        char buffer[FT_BUFFER_SIZE];
    142158
    143159} file_transfer_t;
     
    154170void imcb_file_canceled( file_transfer_t *file, char *reason );
    155171
    156 /*
    157  * The given buffer is queued for transfer and MUST NOT be freed by the caller.
    158  * When the method returns false the caller should not invoke this method again
    159  * until out_of_data has been called.
    160  */
    161 gboolean imcb_file_write( file_transfer_t *file, gpointer data, size_t data_size );
    162 
    163172gboolean imcb_file_recv_start( file_transfer_t *ft );
    164173#endif
  • protocols/jabber/jabber.h

    r2ff2076 rdce3903  
    146146
    147147        size_t bytesread, byteswritten;
    148         int receiver_overflow;
    149148        int fd;
    150149        struct sockaddr_storage saddr;
     
    209208int jabber_bs_recv_request( struct im_connection *ic, struct xt_node *node, struct xt_node *qnode);
    210209gboolean jabber_bs_send_start( struct jabber_transfer *tf );
    211 gboolean jabber_bs_send_write( file_transfer_t *ft, char *buffer, int len );
     210gboolean jabber_bs_send_write( file_transfer_t *ft, char *buffer, unsigned int len );
    212211
    213212/* message.c */
  • protocols/jabber/s5bytestream.c

    r2ff2076 rdce3903  
    7272                return jabber_bs_abort( bt , msg ": %s", strerror( errno ) );
    7373
    74 #define JABBER_BS_BUFSIZE 65536
    75 
    7674gboolean jabber_bs_abort( struct bs_transfer *bt, char *format, ... );
    7775void jabber_bs_canceled( file_transfer_t *ft , char *reason );
     
    8381void jabber_bs_recv_answer_request( struct bs_transfer *bt );
    8482gboolean jabber_bs_recv_read( gpointer data, gint fd, b_input_condition cond );
    85 void jabber_bs_recv_out_of_data( file_transfer_t *ft );
     83gboolean jabber_bs_recv_write_request( file_transfer_t *ft );
    8684gboolean jabber_bs_recv_handshake( gpointer data, gint fd, b_input_condition cond );
    8785gboolean jabber_bs_recv_handshake_abort( struct bs_transfer *bt, char *error );
     
    109107        xt_free_node( bt->qnode );
    110108        g_free( bt );
    111 //iq_id
     109
    112110        jabber_si_free_transfer( ft );
    113111}
     
    326324                                sock_make_nonblocking( fd );
    327325
    328                                 imcb_log( bt->tf->ic, "Transferring file %s: Connecting to streamhost %s:%s", bt->tf->ft->file_name, host, port );
     326                                imcb_log( bt->tf->ic, "File %s: Connecting to streamhost %s:%s", bt->tf->ft->file_name, host, port );
    329327
    330328                                if( ( connect( fd, rp->ai_addr, rp->ai_addrlen ) == -1 ) &&
     
    426424                        jabber_bs_recv_answer_request( bt );
    427425
    428                         // reset in answer_request bt->tf->watch_in = 0;
    429426                        return FALSE;
    430427                }
     
    441438 * If the handshake failed we can try the next streamhost, if there is one.
    442439 * An intelligent sender would probably specify himself as the first streamhost and
    443  * a proxy as the second (Kopete is an example here). That way, a (potentially)
    444  * slow proxy is only used if neccessary.
     440 * a proxy as the second (Kopete and PSI are examples here). That way, a (potentially)
     441 * slow proxy is only used if neccessary. This of course also means, that the timeout
     442 * per streamhost should be kept short. If one or two firewalled adresses are specified,
     443 * they have to timeout first before a proxy is tried.
    445444 */
    446445gboolean jabber_bs_recv_handshake_abort( struct bs_transfer *bt, char *error )
     
    494493        struct xt_node *reply;
    495494
    496         imcb_log( tf->ic, "Transferring file %s: established SOCKS5 connection to %s:%s",
     495        imcb_log( tf->ic, "File %s: established SOCKS5 connection to %s:%s",
    497496                  tf->ft->file_name,
    498497                  xt_find_attr( bt->shnode, "host" ),
     
    501500        tf->ft->data = tf;
    502501        tf->ft->started = time( NULL );
    503         tf->watch_in = b_input_add( tf->fd, GAIM_INPUT_READ, jabber_bs_recv_read, tf );
    504         tf->ft->out_of_data = jabber_bs_recv_out_of_data;
     502        tf->watch_in = b_input_add( tf->fd, GAIM_INPUT_READ, jabber_bs_recv_read, bt );
     503        tf->ft->write_request = jabber_bs_recv_write_request;
    505504
    506505        reply = xt_new_node( "streamhost-used", NULL, NULL );
     
    519518}
    520519
    521 /* Reads till it is unscheduled or the receiver signifies an overflow. */
     520/*
     521 * This function is called from write_request directly. If no data is available, it will install itself
     522 * as a watcher for input on fd and once that happens, deliver the data and unschedule itself again.
     523 */
    522524gboolean jabber_bs_recv_read( gpointer data, gint fd, b_input_condition cond )
    523525{
    524526        int ret;
    525         struct jabber_transfer *tf = data;
    526         struct bs_transfer *bt = tf->streamhandle;
    527         char *buffer = g_malloc( JABBER_BS_BUFSIZE );
    528 
    529         if (tf->receiver_overflow)
    530         {
    531                 if( tf->watch_in )
     527        struct bs_transfer *bt = data;
     528        struct jabber_transfer *tf = bt->tf;
     529
     530        if( fd != 0 ) /* called via event thread */
     531        {
     532                tf->watch_in = 0;
     533                ASSERTSOCKOP( ret = recv( fd, tf->ft->buffer, sizeof( tf->ft->buffer ), 0 ) , "Receiving" );
     534        }
     535        else
     536        {
     537                /* called directly. There might not be any data available. */
     538                if( ( ( ret = recv( tf->fd, tf->ft->buffer, sizeof( tf->ft->buffer ), 0 ) ) == -1 ) &&
     539                    ( errno != EAGAIN ) )
     540                    return jabber_bs_abort( bt, "Receiving: %s", strerror( errno ) );
     541
     542                if( ( ret == -1 ) && ( errno == EAGAIN ) )
    532543                {
    533                         /* should never happen, BUG */
    534                         imcb_file_canceled( tf->ft, "Bug in jabber file transfer code: read while overflow is true. Please report" );
     544                        tf->watch_in = b_input_add( tf->fd, GAIM_INPUT_READ, jabber_bs_recv_read, bt );
    535545                        return FALSE;
    536546                }
    537547        }
    538548
    539         ASSERTSOCKOP( ret = recv( fd, buffer, JABBER_BS_BUFSIZE, 0 ) , "Receiving" );
    540 
    541         /* that should be all */
     549        /* shouldn't happen since we know the file size */
    542550        if( ret == 0 )
     551                return jabber_bs_abort( bt, "Remote end closed connection" );
     552       
     553        tf->bytesread += ret;
     554
     555        tf->ft->write( tf->ft, tf->ft->buffer, ret );   
     556
     557        return FALSE;
     558}
     559
     560/*
     561 * imc callback that is invoked when it is ready to receive some data.
     562 */
     563gboolean jabber_bs_recv_write_request( file_transfer_t *ft )
     564{
     565        struct jabber_transfer *tf = ft->data;
     566
     567        if( tf->watch_in )
     568        {
     569                imcb_file_canceled( ft, "BUG in jabber file transfer: write_request called when already watching for input" );
    543570                return FALSE;
    544        
    545         tf->bytesread += ret;
    546 
    547         buffer = g_realloc( buffer, ret );
    548 
    549         if ( ( tf->receiver_overflow = imcb_file_write( tf->ft, buffer, ret ) ) )
    550         {
    551                 /* wait for imcb to run out of data */
    552                 tf->watch_in = 0;
    553                 return FALSE;
    554         }
    555                
     571        }
     572       
     573        jabber_bs_recv_read( tf->streamhandle, 0 , 0 );
     574
    556575        return TRUE;
    557576}
    558577
    559 /* imcb callback that is invoked when it runs out of data.
    560  * We reschedule jabber_bs_read here if neccessary. */
    561 void jabber_bs_recv_out_of_data( file_transfer_t *ft )
    562 {
    563         struct jabber_transfer *tf = ft->data;
    564 
    565         tf->receiver_overflow = FALSE;
    566 
    567         if ( !tf->watch_in )
    568                 tf->watch_in = b_input_add( tf->fd, GAIM_INPUT_READ, jabber_bs_recv_read, tf );
    569 }
    570 
    571 /* signal ood and be done */
     578/*
     579 * Issues a write_request to imc.
     580 * */
    572581gboolean jabber_bs_send_can_write( gpointer data, gint fd, b_input_condition cond )
    573582{
    574583        struct bs_transfer *bt = data;
    575584
    576         bt->tf->ft->out_of_data( bt->tf->ft );
    577 
    578585        bt->tf->watch_out = 0;
     586
     587        bt->tf->ft->write_request( bt->tf->ft );
     588
    579589        return FALSE;
    580590}
    581591
    582 /* try to send the stuff. If you can't return false and wait for writable */
    583 gboolean jabber_bs_send_write( file_transfer_t *ft, char *buffer, int len )
     592/*
     593 * This should only be called if we can write, so just do it.
     594 * Add a write watch so we can write more during the next cycle (if possible).
     595 */
     596gboolean jabber_bs_send_write( file_transfer_t *ft, char *buffer, unsigned int len )
    584597{
    585598        struct jabber_transfer *tf = ft->data;
     
    587600        int ret;
    588601
    589         if ( ( ( ret = send( tf->fd, buffer, len, 0 ) ) == -1 ) &&
    590              ( errno != EAGAIN ) )
    591                 return jabber_bs_abort( bt, "send failed on socket with: %s", strerror( errno ) );
    592        
    593         if( ret == 0 )
    594                 return jabber_bs_abort( bt, "Remote end closed connection" );
    595        
    596         if( ret == -1 )
    597         {
    598                 bt->tf->watch_out = b_input_add( tf->fd, GAIM_INPUT_WRITE, jabber_bs_send_can_write, bt );
    599                 return FALSE;
    600         }
     602        if( tf->watch_out )
     603                return jabber_bs_abort( bt, "BUG: write() called while watching " );
     604       
     605        ASSERTSOCKOP( ret = send( tf->fd, buffer, len, 0 ), "Sending" );
     606
     607        tf->byteswritten += ret;
     608       
     609        /* TODO: this should really not be fatal */
     610        if( ret < len )
     611                return jabber_bs_abort( bt, "send() sent %d instead of %d (send buffer too big!)", ret, len );
     612
     613        bt->tf->watch_out = b_input_add( tf->fd, GAIM_INPUT_WRITE, jabber_bs_send_can_write, bt );
    601614               
    602615        return TRUE;
    603616}
    604617
     618/*
     619 * Handles the reply by the receiver containing the used streamhost.
     620 */
    605621static xt_status jabber_bs_send_handle_reply(struct im_connection *ic, struct xt_node *node, struct xt_node *orig ) {
    606622        struct jabber_transfer *tf = NULL;
     
    651667        if( bt->phase == BS_PHASE_REPLY )
    652668        {
     669                /* handshake went through, let's start transferring */
    653670                tf->ft->started = time( NULL );
    654                 tf->ft->out_of_data( tf->ft );
    655         }
    656 
    657         //bt->tf->watch_out = b_input_add( tf->fd, GAIM_INPUT_WRITE, jabber_bs_send_write, tf );
     671                tf->ft->write_request( tf->ft );
     672        }
    658673
    659674        return XT_HANDLED;
     
    681696        bt = g_new0( struct bs_transfer, 1 );
    682697        bt->tf = tf;
    683         //bt->qnode = xt_dup( qnode );
    684         //bt->shnode = bt->qnode->children;
    685698        bt->phase = BS_PHASE_CONNECT;
    686699        bt->pseudoadr = g_strdup( hash_hex );
     
    714727        iq = jabber_make_packet( "iq", "set", tf->tgt_jid, query );
    715728        xt_add_attr( iq, "from", tf->ini_jid );
    716 
    717         //xt_free_node( query );
    718729
    719730        jabber_cache_add( tf->ic, iq, jabber_bs_send_handle_reply );
     
    885896                        bt->phase = BS_PHASE_REPLY;
    886897
    887                         /* don't start sending till the streamhost-used message comes in */
     898                        imcb_log( tf->ic, "File %s: SOCKS5 handshake successful! Transfer about to start...", tf->ft->file_name );
     899
    888900                        if( tf->accepted )
    889901                        {
     902                                /* streamhost-used message came already in(possible?), let's start sending */
    890903                                tf->ft->started = time( NULL );
    891                                 tf->ft->out_of_data( tf->ft );
     904                                tf->ft->write_request( tf->ft );
    892905                        }
    893906
  • protocols/jabber/si.c

    r2ff2076 rdce3903  
    8484        struct jabber_data *jd = ic->proto_data;
    8585
    86         imcb_log( ic, "Incoming file from %s : %s %zd bytes", ic->irc->nick, ft->file_name, ft->file_size );
     86        imcb_log( ic, "Trying to send %s(%zd bytes) to %s", ft->file_name, ft->file_size, who );
    8787
    8888        tf = g_new0( struct jabber_transfer, 1 );
     
    224224
    225225/*
    226  * imcb called the accept callback which probably means that the user accepted this file transfer.
     226 * imc called the accept callback which probably means that the user accepted this file transfer.
    227227 * We send our response to the initiator.
    228228 * In the next step, the initiator will send us a request for the given stream type.
     
    275275{
    276276        struct xt_node *c, *d;
    277         char *ini_jid, *tgt_jid;
     277        char *ini_jid, *tgt_jid, *iq_id;
    278278        GSList *tflist;
    279279        struct jabber_transfer *tf=NULL;
    280280        struct jabber_data *jd = ic->proto_data;
    281         char *sid;
    282281
    283282        if( !( tgt_jid = xt_find_attr( node, "from" ) ) ||
     
    288287        }
    289288       
    290         imcb_log( ic, "GOT RESPONSE TO FILE" );
    291289        /* All this means we expect something like this: ( I think )
    292          * <iq from=... to=...>
     290         * <iq from=... to=... id=...>
    293291         *      <si xmlns=si>
    294          *              <file xmlns=ft/>
     292         *      [       <file xmlns=ft/>    ] <-- not neccessary
    295293         *              <feature xmlns=feature>
    296294         *                      <x xmlns=xdata type=submit>
     
    300298        if( !( tgt_jid = xt_find_attr( node, "from" ) ) ||
    301299            !( ini_jid = xt_find_attr( node, "to" ) ) ||
     300            !( iq_id   = xt_find_attr( node, "id" ) ) ||
    302301            !( c = xt_find_node( node->children, "si" ) ) ||
    303302            !( strcmp( xt_find_attr( c, "xmlns" ), XMLNS_SI ) == 0 ) ||
    304             !( sid = xt_find_attr( c, "id" ) )||
    305             !( d = xt_find_node( c->children, "file" ) ) ||
    306             !( strcmp( xt_find_attr( d, "xmlns" ), XMLNS_FILETRANSFER ) == 0 ) ||
     303/*          !( d = xt_find_node( c->children, "file" ) ) ||
     304            !( strcmp( xt_find_attr( d, "xmlns" ), XMLNS_FILETRANSFER ) == 0 ) || */
    307305            !( d = xt_find_node( c->children, "feature" ) ) ||
    308306            !( strcmp( xt_find_attr( d, "xmlns" ), XMLNS_FEATURE ) == 0 ) ||
     
    331329        {
    332330                struct jabber_transfer *tft = tflist->data;
    333                 if( ( strcmp( tft->sid, sid ) == 0 ) )
     331                if( ( strcmp( tft->iq_id, iq_id ) == 0 ) )
    334332                {
    335333                        tf = tft;
     
    346344        tf->ini_jid = g_strdup( ini_jid );
    347345        tf->tgt_jid = g_strdup( tgt_jid );
     346
     347        imcb_log( ic, "File %s: %s accepted the transfer!", tf->ft->file_name, tgt_jid );
    348348
    349349        jabber_bs_send_start( tf );
     
    423423        node = jabber_make_packet( "iq", "set", bud ? bud->full_jid : who, sinode );
    424424        jabber_cache_add( ic, node, jabber_si_handle_response );
     425        tf->iq_id = g_strdup( xt_find_attr( node, "id" ) );
    425426       
    426427        return jabber_write_packet( ic, node );
Note: See TracChangeset for help on using the changeset viewer.