source: protocols/oscar/txqueue.c @ b7d3cc34

0.99
Last change on this file since b7d3cc34 was b7d3cc34, checked in by Wilmer van der Gaast <wilmer@…>, at 2005-11-06T18:23:18Z

Initial repository (0.99 release tree)

  • Property mode set to 100644
File size: 9.7 KB
Line 
1/*
2 *  aim_txqueue.c
3 *
4 * Herein lies all the mangement routines for the transmit (Tx) queue.
5 *
6 */
7
8#include <aim.h>
9#include "im.h"
10
11#ifndef _WIN32
12#include <sys/socket.h>
13#endif
14
15/*
16 * Allocate a new tx frame.
17 *
18 * This is more for looks than anything else.
19 *
20 * Right now, that is.  If/when we implement a pool of transmit
21 * frames, this will become the request-an-unused-frame part.
22 *
23 * framing = AIM_FRAMETYPE_OFT/FLAP
24 * chan = channel for FLAP, hdrtype for OFT
25 *
26 */
27aim_frame_t *aim_tx_new(aim_session_t *sess, aim_conn_t *conn, guint8 framing, guint8 chan, int datalen)
28{
29        aim_frame_t *fr;
30
31        if (!conn) {
32                do_error_dialog(sess->aux_data, "no connection specified", "Gaim");
33                return NULL;
34        }
35
36        /* For sanity... */
37        if ((conn->type == AIM_CONN_TYPE_RENDEZVOUS) || 
38                        (conn->type == AIM_CONN_TYPE_RENDEZVOUS_OUT)) {
39                if (framing != AIM_FRAMETYPE_OFT) {
40                        do_error_dialog(sess->aux_data, "attempted to allocate inappropriate frame type for rendezvous connection", "Gaim");
41                        return NULL;
42                }
43        } else {
44                if (framing != AIM_FRAMETYPE_FLAP) {
45                        do_error_dialog(sess->aux_data, "attempted to allocate inappropriate frame type for FLAP connection", "Gaim");
46                        return NULL;
47                }
48        }
49
50        if (!(fr = (aim_frame_t *)g_new0(aim_frame_t,1)))
51                return NULL;
52
53        fr->conn = conn; 
54
55        fr->hdrtype = framing;
56
57        if (fr->hdrtype == AIM_FRAMETYPE_FLAP) {
58
59                fr->hdr.flap.type = chan;
60
61        } else if (fr->hdrtype == AIM_FRAMETYPE_OFT) {
62
63                fr->hdr.oft.type = chan;
64                fr->hdr.oft.hdr2len = 0; /* this will get setup by caller */
65
66        } else 
67                do_error_dialog(sess->aux_data, "unknown framing", "Gaim");
68
69        if (datalen > 0) {
70                guint8 *data;
71
72                if (!(data = (unsigned char *)g_malloc(datalen))) {
73                        aim_frame_destroy(fr);
74                        return NULL;
75                }
76
77                aim_bstream_init(&fr->data, data, datalen);
78        }
79
80        return fr;
81}
82
83/*
84 * aim_tx_enqeue__queuebased()
85 *
86 * The overall purpose here is to enqueue the passed in command struct
87 * into the outgoing (tx) queue.  Basically...
88 *   1) Make a scope-irrelevent copy of the struct
89 *   3) Mark as not-sent-yet
90 *   4) Enqueue the struct into the list
91 *   6) Return
92 *
93 * Note that this is only used when doing queue-based transmitting;
94 * that is, when sess->tx_enqueue is set to &aim_tx_enqueue__queuebased.
95 *
96 */
97static int aim_tx_enqueue__queuebased(aim_session_t *sess, aim_frame_t *fr)
98{
99
100        if (!fr->conn) {
101                do_error_dialog(sess->aux_data, "WARNING: enqueueing packet with no connection", "Gaim");
102                fr->conn = aim_getconn_type(sess, AIM_CONN_TYPE_BOS);
103        }
104
105        if (fr->hdrtype == AIM_FRAMETYPE_FLAP) {
106                /* assign seqnum -- XXX should really not assign until hardxmit */
107                fr->hdr.flap.seqnum = aim_get_next_txseqnum(fr->conn);
108        }
109
110        fr->handled = 0; /* not sent yet */
111
112        /* see overhead note in aim_rxqueue counterpart */
113        if (!sess->queue_outgoing)
114                sess->queue_outgoing = fr;
115        else {
116                aim_frame_t *cur;
117
118                for (cur = sess->queue_outgoing; cur->next; cur = cur->next)
119                        ;
120                cur->next = fr;
121        }
122
123        return 0;
124}
125
126/*
127 * aim_tx_enqueue__immediate()
128 *
129 * Parallel to aim_tx_enqueue__queuebased, however, this bypasses
130 * the whole queue mess when you want immediate writes to happen.
131 *
132 * Basically the same as its __queuebased couterpart, however
133 * instead of doing a list append, it just calls aim_tx_sendframe()
134 * right here.
135 *
136 */
137static int aim_tx_enqueue__immediate(aim_session_t *sess, aim_frame_t *fr)
138{
139
140        if (!fr->conn) {
141                do_error_dialog(sess->aux_data, "packet has no connection", "Gaim");
142                aim_frame_destroy(fr);
143                return 0;
144        }
145
146        if (fr->hdrtype == AIM_FRAMETYPE_FLAP)
147                fr->hdr.flap.seqnum = aim_get_next_txseqnum(fr->conn);
148
149        fr->handled = 0; /* not sent yet */
150
151        aim_tx_sendframe(sess, fr);
152
153        aim_frame_destroy(fr);
154
155        return 0;
156}
157
158int aim_tx_setenqueue(aim_session_t *sess, int what, int (*func)(aim_session_t *, aim_frame_t *))
159{
160       
161        if (what == AIM_TX_QUEUED)
162                sess->tx_enqueue = &aim_tx_enqueue__queuebased;
163        else if (what == AIM_TX_IMMEDIATE) 
164                sess->tx_enqueue = &aim_tx_enqueue__immediate;
165        else if (what == AIM_TX_USER) {
166                if (!func)
167                        return -EINVAL;
168                sess->tx_enqueue = func;
169        } else
170                return -EINVAL; /* unknown action */
171
172        return 0;
173}
174
175int aim_tx_enqueue(aim_session_t *sess, aim_frame_t *fr)
176{
177       
178        /*
179         * If we want to send a connection thats inprogress, we have to force
180         * them to use the queue based version. Otherwise, use whatever they
181         * want.
182         */
183        if (fr && fr->conn && 
184                        (fr->conn->status & AIM_CONN_STATUS_INPROGRESS)) {
185                return aim_tx_enqueue__queuebased(sess, fr);
186        }
187
188        return (*sess->tx_enqueue)(sess, fr);
189}
190
191/*
192 *  aim_get_next_txseqnum()
193 *
194 *   This increments the tx command count, and returns the seqnum
195 *   that should be stamped on the next FLAP packet sent.  This is
196 *   normally called during the final step of packet preparation
197 *   before enqueuement (in aim_tx_enqueue()).
198 *
199 */
200flap_seqnum_t aim_get_next_txseqnum(aim_conn_t *conn)
201{
202        flap_seqnum_t ret;
203       
204        ret = ++conn->seqnum;
205
206        return ret;
207}
208
209static int aim_send(int fd, const void *buf, size_t count)
210{
211        int left, cur;
212
213        for (cur = 0, left = count; left; ) {
214                int ret;
215
216                ret = send(fd, ((unsigned char *)buf)+cur, left, 0);
217                if (ret == -1)
218                        return -1;
219                else if (ret == 0)
220                        return cur;
221
222                cur += ret;
223                left -= ret;
224        }
225
226        return cur;
227}
228
229static int aim_bstream_send(aim_bstream_t *bs, aim_conn_t *conn, size_t count)
230{
231        int wrote = 0;
232        if (!bs || !conn || (count < 0))
233                return -EINVAL;
234
235        if (count > aim_bstream_empty(bs))
236                count = aim_bstream_empty(bs); /* truncate to remaining space */
237
238        if (count) {
239                if ((conn->type == AIM_CONN_TYPE_RENDEZVOUS) && 
240                    (conn->subtype == AIM_CONN_SUBTYPE_OFT_DIRECTIM)) {
241                        /* I strongly suspect that this is a horrible thing to do
242                         * and I feel really guilty doing it. */
243                        const char *sn = aim_directim_getsn(conn);
244                        aim_rxcallback_t userfunc;
245                        while (count - wrote > 1024) {
246                                wrote = wrote + aim_send(conn->fd, bs->data + bs->offset + wrote, 1024);
247                                if ((userfunc=aim_callhandler(conn->sessv, conn, 
248                                                              AIM_CB_FAM_SPECIAL, 
249                                                              AIM_CB_SPECIAL_IMAGETRANSFER)))
250                                  userfunc(conn->sessv, NULL, sn, 
251                                           count-wrote>1024 ? ((double)wrote / count) : 1);
252                        }
253                }
254                if (count - wrote) {
255                        wrote = wrote + aim_send(conn->fd, bs->data + bs->offset + wrote, count - wrote);
256                }
257               
258        }
259       
260        bs->offset += wrote;
261
262        return wrote;   
263}
264
265static int sendframe_flap(aim_session_t *sess, aim_frame_t *fr)
266{
267        aim_bstream_t obs;
268        guint8 *obs_raw;
269        int payloadlen, err = 0, obslen;
270
271        payloadlen = aim_bstream_curpos(&fr->data);
272
273        if (!(obs_raw = g_malloc(6 + payloadlen)))
274                return -ENOMEM;
275
276        aim_bstream_init(&obs, obs_raw, 6 + payloadlen);
277
278        /* FLAP header */
279        aimbs_put8(&obs, 0x2a);
280        aimbs_put8(&obs, fr->hdr.flap.type);
281        aimbs_put16(&obs, fr->hdr.flap.seqnum);
282        aimbs_put16(&obs, payloadlen);
283
284        /* payload */
285        aim_bstream_rewind(&fr->data);
286        aimbs_putbs(&obs, &fr->data, payloadlen);
287
288        obslen = aim_bstream_curpos(&obs);
289        aim_bstream_rewind(&obs);
290        if (aim_bstream_send(&obs, fr->conn, obslen) != obslen)
291                err = -errno;
292       
293        g_free(obs_raw); /* XXX aim_bstream_free */
294
295        fr->handled = 1;
296        fr->conn->lastactivity = time(NULL);
297
298        return err;
299}
300
301static int sendframe_oft(aim_session_t *sess, aim_frame_t *fr)
302{
303        aim_bstream_t hbs;
304        guint8 *hbs_raw;
305        int hbslen;
306        int err = 0;
307       
308        hbslen = 8 + fr->hdr.oft.hdr2len;
309        if (!(hbs_raw = g_malloc(hbslen)))
310                return -1;
311
312        aim_bstream_init(&hbs, hbs_raw, hbslen);
313
314        aimbs_putraw(&hbs, fr->hdr.oft.magic, 4);
315        aimbs_put16(&hbs, fr->hdr.oft.hdr2len + 8);
316        aimbs_put16(&hbs, fr->hdr.oft.type);
317        aimbs_putraw(&hbs, fr->hdr.oft.hdr2, fr->hdr.oft.hdr2len);
318
319        aim_bstream_rewind(&hbs);
320       
321       
322        if (aim_bstream_send(&hbs, fr->conn, hbslen) != hbslen) {
323
324                err = -errno;
325               
326        } else if (aim_bstream_curpos(&fr->data)) {
327                int len;
328
329                len = aim_bstream_curpos(&fr->data);
330                aim_bstream_rewind(&fr->data);
331               
332                if (aim_bstream_send(&fr->data, fr->conn, len) != len)
333                        err = -errno;
334        }
335
336        g_free(hbs_raw); /* XXX aim_bstream_free */
337
338        fr->handled = 1;
339        fr->conn->lastactivity = time(NULL);
340
341
342        return err;
343
344
345}
346
347int aim_tx_sendframe(aim_session_t *sess, aim_frame_t *fr)
348{
349        if (fr->hdrtype == AIM_FRAMETYPE_FLAP)
350                return sendframe_flap(sess, fr);
351        else if (fr->hdrtype == AIM_FRAMETYPE_OFT)
352                return sendframe_oft(sess, fr);
353        return -1;
354}
355
356int aim_tx_flushqueue(aim_session_t *sess)
357{
358        aim_frame_t *cur;
359
360        for (cur = sess->queue_outgoing; cur; cur = cur->next) {
361
362                if (cur->handled)
363                        continue; /* already been sent */
364
365                if (cur->conn && (cur->conn->status & AIM_CONN_STATUS_INPROGRESS))
366                        continue;
367
368                /*
369                 * And now for the meager attempt to force transmit
370                 * latency and avoid missed messages.
371                 */
372                if ((cur->conn->lastactivity + cur->conn->forcedlatency) >= time(NULL)) {
373                        /*
374                         * XXX should be a break! we dont want to block the
375                         * upper layers
376                         *
377                         * XXX or better, just do this right.
378                         *
379                         */
380                        sleep((cur->conn->lastactivity + cur->conn->forcedlatency) - time(NULL));
381                }
382
383                /* XXX this should call the custom "queuing" function!! */
384                aim_tx_sendframe(sess, cur);
385        }
386
387        /* purge sent commands from queue */
388        aim_tx_purgequeue(sess);
389
390        return 0;
391}
392
393/*
394 *  aim_tx_purgequeue()
395 * 
396 *  This is responsable for removing sent commands from the transmit
397 *  queue. This is not a required operation, but it of course helps
398 *  reduce memory footprint at run time! 
399 *
400 */
401void aim_tx_purgequeue(aim_session_t *sess)
402{
403        aim_frame_t *cur, **prev;
404
405        for (prev = &sess->queue_outgoing; (cur = *prev); ) {
406
407                if (cur->handled) {
408                        *prev = cur->next;
409
410                        aim_frame_destroy(cur);
411
412                } else
413                        prev = &cur->next;
414        }
415
416        return;
417}
418
419/**
420 * aim_tx_cleanqueue - get rid of packets waiting for tx on a dying conn
421 * @sess: session
422 * @conn: connection that's dying
423 *
424 * for now this simply marks all packets as sent and lets them
425 * disappear without warning.
426 *
427 */
428void aim_tx_cleanqueue(aim_session_t *sess, aim_conn_t *conn)
429{
430        aim_frame_t *cur;
431
432        for (cur = sess->queue_outgoing; cur; cur = cur->next) {
433                if (cur->conn == conn)
434                        cur->handled = 1;
435        }
436
437        return;
438}
439
440
Note: See TracBrowser for help on using the repository browser.