2006-12-16 [colin] 2.6.1cvs43
[claws.git] / src / common / session.c
1 /*
2  * Sylpheed -- a GTK+ based, lightweight, and fast e-mail client
3  * Copyright (C) 1999-2006 Hiroyuki Yamamoto and the Claws Mail team
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #  include "config.h"
22 #endif
23
24 #include "defs.h"
25
26 #include <glib.h>
27 #include <glib/gi18n.h>
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <unistd.h>
33 #include <time.h>
34 #include <errno.h>
35
36 #include "session.h"
37 #include "utils.h"
38
39 static gint session_connect_cb          (SockInfo       *sock,
40                                          gpointer        data);
41 static gint session_close               (Session        *session);
42
43 static gboolean session_timeout_cb      (gpointer        data);
44
45 static gboolean session_recv_msg_idle_cb        (gpointer        data);
46 static gboolean session_recv_data_idle_cb       (gpointer        data);
47
48 static gboolean session_read_msg_cb     (SockInfo       *source,
49                                          GIOCondition    condition,
50                                          gpointer        data);
51 static gboolean session_read_data_cb    (SockInfo       *source,
52                                          GIOCondition    condition,
53                                          gpointer        data);
54 static gboolean session_write_msg_cb    (SockInfo       *source,
55                                          GIOCondition    condition,
56                                          gpointer        data);
57 static gboolean session_write_data_cb   (SockInfo       *source,
58                                          GIOCondition    condition,
59                                          gpointer        data);
60
61
62 void session_init(Session *session)
63 {
64         session->type = SESSION_UNKNOWN;
65         session->sock = NULL;
66         session->server = NULL;
67         session->port = 0;
68 #if USE_OPENSSL
69         session->ssl_type = SSL_NONE;
70 #endif
71         session->nonblocking = TRUE;
72         session->state = SESSION_READY;
73         session->last_access_time = time(NULL);
74
75         g_get_current_time(&session->tv_prev);
76
77         session->conn_id = 0;
78
79         session->io_tag = 0;
80
81         session->read_buf_p = session->read_buf;
82         session->read_buf_len = 0;
83
84         session->read_msg_buf = g_string_sized_new(1024);
85         session->read_data_buf = g_byte_array_new();
86
87         session->write_buf = NULL;
88         session->write_buf_p = NULL;
89         session->write_buf_len = 0;
90
91         session->write_data = NULL;
92         session->write_data_p = NULL;
93         session->write_data_len = 0;
94
95         session->timeout_tag = 0;
96         session->timeout_interval = 0;
97
98         session->data = NULL;
99 }
100
101 /*!
102  *\brief        Set up parent and child process
103  *              Childloop: Read commands from parent,
104  *              send to server, get answer, pass to parent
105  *
106  *\param        session Contains session information
107  *              server to connect to
108  *              port to connect to
109  *
110  *\return        0 : success
111  *              -1 : pipe / fork errors (parent)
112  *               1 : connection error (child)
113  */
114 gint session_connect(Session *session, const gchar *server, gushort port)
115 {
116 #ifdef G_OS_UNIX
117         session->server = g_strdup(server);
118         session->port = port;
119
120         session->conn_id = sock_connect_async(server, port, session_connect_cb,
121                                               session);
122         if (session->conn_id < 0) {
123                 g_warning("can't connect to server.");
124                 session_close(session);
125                 return -1;
126         }
127
128         return 0;
129 #else
130         SockInfo *sock;
131
132         session->server = g_strdup(server);
133         session->port = port;
134
135         sock = sock_connect(server, port);
136         if (sock == NULL) {
137                 g_warning("can't connect to server.");
138                 session_close(session);
139                 return -1;
140         }
141
142         return session_connect_cb(sock, session);
143 #endif
144 }
145
146 static gint session_connect_cb(SockInfo *sock, gpointer data)
147 {
148         Session *session = SESSION(data);
149
150         session->conn_id = 0;
151
152         if (!sock) {
153                 g_warning("can't connect to server.");
154                 session->state = SESSION_ERROR;
155                 return -1;
156         }
157
158         session->sock = sock;
159
160 #if USE_OPENSSL
161         if (session->ssl_type == SSL_TUNNEL) {
162                 sock_set_nonblocking_mode(sock, FALSE);
163                 if (!ssl_init_socket(sock)) {
164                         g_warning("can't initialize SSL.");
165                         log_error(_("SSL handshake failed\n"));
166                         session->state = SESSION_ERROR;
167                         return -1;
168                 }
169         }
170 #endif
171
172         sock_set_nonblocking_mode(sock, session->nonblocking);
173
174         debug_print("session (%p): connected\n", session);
175
176         session->state = SESSION_RECV;
177         session->io_tag = sock_add_watch(session->sock, G_IO_IN,
178                                          session_read_msg_cb,
179                                          session);
180
181         return 0;
182 }
183
184 /*!
185  *\brief        child and parent: send DISCONNECT message to other process
186  *
187  *\param        session Contains session information
188  *
189  *\return        0 : success
190  */
191 gint session_disconnect(Session *session)
192 {
193         session_close(session);
194         return 0;
195 }
196
197 /*!
198  *\brief        parent ?
199  *
200  *\param        session Contains session information
201  */
202 void session_destroy(Session *session)
203 {
204         g_return_if_fail(session != NULL);
205         g_return_if_fail(session->destroy != NULL);
206
207         session_close(session);
208         session->destroy(session);
209         g_free(session->server);
210         g_string_free(session->read_msg_buf, TRUE);
211         g_byte_array_free(session->read_data_buf, TRUE);
212         g_free(session->read_data_terminator);
213         g_free(session->write_buf);
214
215         debug_print("session (%p): destroyed\n", session);
216
217         g_free(session);
218 }
219
220 gboolean session_is_connected(Session *session)
221 {
222         return (session->state == SESSION_READY ||
223                 session->state == SESSION_SEND ||
224                 session->state == SESSION_RECV);
225 }
226
227 void session_set_access_time(Session *session)
228 {
229         session->last_access_time = time(NULL);
230 }
231
232 void session_set_timeout(Session *session, guint interval)
233 {
234         if (session->timeout_tag > 0)
235                 g_source_remove(session->timeout_tag);
236
237         session->timeout_interval = interval;
238         if (interval > 0)
239                 session->timeout_tag =
240                         g_timeout_add(interval, session_timeout_cb, session);
241         else
242                 session->timeout_tag = 0;
243 }
244
245 static gboolean session_timeout_cb(gpointer data)
246 {
247         Session *session = SESSION(data);
248
249         g_warning("session timeout.\n");
250
251         if (session->io_tag > 0) {
252                 g_source_remove(session->io_tag);
253                 session->io_tag = 0;
254         }
255
256         session->timeout_tag = 0;
257         session->state = SESSION_TIMEOUT;
258
259         return FALSE;
260 }
261
262 void session_set_recv_message_notify(Session *session,
263                                      RecvMsgNotify notify_func, gpointer data)
264 {
265         session->recv_msg_notify = notify_func;
266         session->recv_msg_notify_data = data;
267 }
268
269 void session_set_recv_data_progressive_notify
270                                         (Session *session,
271                                          RecvDataProgressiveNotify notify_func,
272                                          gpointer data)
273 {
274         session->recv_data_progressive_notify = notify_func,
275         session->recv_data_progressive_notify_data = data;
276 }
277
278 void session_set_recv_data_notify(Session *session, RecvDataNotify notify_func,
279                                   gpointer data)
280 {
281         session->recv_data_notify = notify_func;
282         session->recv_data_notify_data = data;
283 }
284
285 void session_set_send_data_progressive_notify
286                                         (Session *session,
287                                          SendDataProgressiveNotify notify_func,
288                                          gpointer data)
289 {
290         session->send_data_progressive_notify = notify_func;
291         session->send_data_progressive_notify_data = data;
292 }
293
294 void session_set_send_data_notify(Session *session, SendDataNotify notify_func,
295                                   gpointer data)
296 {
297         session->send_data_notify = notify_func;
298         session->send_data_notify_data = data;
299 }
300
301 /*!
302  *\brief        child and parent cleanup (child closes first)
303  *
304  *\param        session Contains session information
305  *
306  *\return        0 : success
307  */
308 static gint session_close(Session *session)
309 {
310         g_return_val_if_fail(session != NULL, -1);
311
312 #ifdef G_OS_UNIX
313         if (session->conn_id > 0) {
314                 sock_connect_async_cancel(session->conn_id);
315                 session->conn_id = 0;
316                 debug_print("session (%p): connection cancelled\n", session);
317         }
318 #endif
319
320         session_set_timeout(session, 0);
321
322         if (session->io_tag > 0) {
323                 g_source_remove(session->io_tag);
324                 session->io_tag = 0;
325         }
326
327         if (session->sock) {
328                 sock_close(session->sock);
329                 session->sock = NULL;
330                 session->state = SESSION_DISCONNECTED;
331                 debug_print("session (%p): closed\n", session);
332         }
333
334         return 0;
335 }
336
337 #if USE_OPENSSL
338 gint session_start_tls(Session *session)
339 {
340         gboolean nb_mode;
341
342         nb_mode = sock_is_nonblocking_mode(session->sock);
343
344         if (nb_mode)
345                 sock_set_nonblocking_mode(session->sock, FALSE);
346
347         if (!ssl_init_socket_with_method(session->sock, SSL_METHOD_TLSv1)) {
348                 g_warning("couldn't start TLS session.\n");
349                 if (nb_mode)
350                         sock_set_nonblocking_mode(session->sock, TRUE);
351                 return -1;
352         }
353
354         if (nb_mode)
355                 sock_set_nonblocking_mode(session->sock, session->nonblocking);
356
357         return 0;
358 }
359 #endif
360
361 gint session_send_msg(Session *session, SessionMsgType type, const gchar *msg)
362 {
363         gboolean ret;
364
365         g_return_val_if_fail(session->write_buf == NULL, -1);
366         g_return_val_if_fail(msg != NULL, -1);
367         g_return_val_if_fail(msg[0] != '\0', -1);
368
369         session->state = SESSION_SEND;
370         session->write_buf = g_strconcat(msg, "\r\n", NULL);
371         session->write_buf_p = session->write_buf;
372         session->write_buf_len = strlen(msg) + 2;
373
374         ret = session_write_msg_cb(session->sock, G_IO_OUT, session);
375
376         if (ret == TRUE)
377                 session->io_tag = sock_add_watch(session->sock, G_IO_OUT,
378                                                  session_write_msg_cb, session);
379         else if (session->state == SESSION_ERROR)
380                 return -1;
381
382         return 0;
383 }
384
385 gint session_recv_msg(Session *session)
386 {
387         g_return_val_if_fail(session->read_msg_buf->len == 0, -1);
388
389         session->state = SESSION_RECV;
390
391         if (session->read_buf_len > 0)
392                 g_idle_add(session_recv_msg_idle_cb, session);
393         else
394                 session->io_tag = sock_add_watch(session->sock, G_IO_IN,
395                                                  session_read_msg_cb, session);
396
397         return 0;
398 }
399
400 static gboolean session_recv_msg_idle_cb(gpointer data)
401 {
402         Session *session = SESSION(data);
403         gboolean ret;
404
405         ret = session_read_msg_cb(session->sock, G_IO_IN, session);
406
407         if (ret == TRUE)
408                 session->io_tag = sock_add_watch(session->sock, G_IO_IN,
409                                                  session_read_msg_cb, session);
410
411         return FALSE;
412 }
413
414 /*!
415  *\brief        parent (child?): send data to other process
416  *
417  *\param        session Contains session information
418  *              data Data to send
419  *              size Bytes to send
420  *
421  *\return        0 : success
422  *              -1 : error
423  */
424 gint session_send_data(Session *session, const guchar *data, guint size)
425 {
426         gboolean ret;
427
428         g_return_val_if_fail(session->write_data == NULL, -1);
429         g_return_val_if_fail(data != NULL, -1);
430         g_return_val_if_fail(size != 0, -1);
431
432         session->state = SESSION_SEND;
433
434         session->write_data = data;
435         session->write_data_p = session->write_data;
436         session->write_data_len = size;
437         g_get_current_time(&session->tv_prev);
438
439         ret = session_write_data_cb(session->sock, G_IO_OUT, session);
440
441         if (ret == TRUE)
442                 session->io_tag = sock_add_watch(session->sock, G_IO_OUT,
443                                                  session_write_data_cb,
444                                                  session);
445         else if (session->state == SESSION_ERROR)
446                 return -1;
447
448         return 0;
449 }
450
451 gint session_recv_data(Session *session, guint size, const gchar *terminator)
452 {
453         g_return_val_if_fail(session->read_data_buf->len == 0, -1);
454
455         session->state = SESSION_RECV;
456
457         g_free(session->read_data_terminator);
458         session->read_data_terminator = g_strdup(terminator);
459         g_get_current_time(&session->tv_prev);
460
461         if (session->read_buf_len > 0)
462                 g_idle_add(session_recv_data_idle_cb, session);
463         else
464                 session->io_tag = sock_add_watch(session->sock, G_IO_IN,
465                                                  session_read_data_cb, session);
466
467         return 0;
468 }
469
470 static gboolean session_recv_data_idle_cb(gpointer data)
471 {
472         Session *session = SESSION(data);
473         gboolean ret;
474
475         ret = session_read_data_cb(session->sock, G_IO_IN, session);
476
477         if (ret == TRUE)
478                 session->io_tag = sock_add_watch(session->sock, G_IO_IN,
479                                                  session_read_data_cb, session);
480
481         return FALSE;
482 }
483
484 static gboolean session_read_msg_cb(SockInfo *source, GIOCondition condition,
485                                     gpointer data)
486 {
487         Session *session = SESSION(data);
488         gchar buf[SESSION_BUFFSIZE];
489         gint line_len;
490         gchar *newline;
491         gchar *msg;
492         gint ret;
493
494         g_return_val_if_fail(condition == G_IO_IN, FALSE);
495
496         session_set_timeout(session, session->timeout_interval);
497
498         if (session->read_buf_len == 0) {
499                 gint read_len;
500
501                 read_len = sock_read(session->sock, session->read_buf,
502                                      SESSION_BUFFSIZE - 1);
503
504                 if (read_len == -1 && session->state == SESSION_DISCONNECTED) {
505                         g_warning ("sock_read: session disconnected\n");
506                         if (session->io_tag > 0) {
507                                 g_source_remove(session->io_tag);
508                                 session->io_tag = 0;
509                         }
510                         return FALSE;
511                 }
512                 
513                 if (read_len == 0) {
514                         g_warning("sock_read: received EOF\n");
515                         session->state = SESSION_EOF;
516                         return FALSE;
517                 }
518
519                 if (read_len < 0) {
520                         switch (errno) {
521                         case EAGAIN:
522                                 return TRUE;
523                         default:
524                                 g_warning("sock_read: %s\n", g_strerror(errno));
525                                 session->state = SESSION_ERROR;
526                                 return FALSE;
527                         }
528                 }
529
530                 session->read_buf_len = read_len;
531         }
532
533         if ((newline = memchr(session->read_buf_p, '\n', session->read_buf_len))
534                 != NULL)
535                 line_len = newline - session->read_buf_p + 1;
536         else
537                 line_len = session->read_buf_len;
538
539         if (line_len == 0)
540                 return TRUE;
541
542         memcpy(buf, session->read_buf_p, line_len);
543         buf[line_len] = '\0';
544
545         g_string_append(session->read_msg_buf, buf);
546
547         session->read_buf_len -= line_len;
548         if (session->read_buf_len == 0)
549                 session->read_buf_p = session->read_buf;
550         else
551                 session->read_buf_p += line_len;
552
553         /* incomplete read */
554         if (buf[line_len - 1] != '\n')
555                 return TRUE;
556
557         /* complete */
558         if (session->io_tag > 0) {
559                 g_source_remove(session->io_tag);
560                 session->io_tag = 0;
561         }
562
563         /* callback */
564         msg = g_strdup(session->read_msg_buf->str);
565         strretchomp(msg);
566         g_string_truncate(session->read_msg_buf, 0);
567
568         ret = session->recv_msg(session, msg);
569         session->recv_msg_notify(session, msg, session->recv_msg_notify_data);
570
571         g_free(msg);
572
573         if (ret < 0)
574                 session->state = SESSION_ERROR;
575
576         return FALSE;
577 }
578
579 static gboolean session_read_data_cb(SockInfo *source, GIOCondition condition,
580                                      gpointer data)
581 {
582         Session *session = SESSION(data);
583         GByteArray *data_buf;
584         gint terminator_len;
585         gboolean complete = FALSE;
586         guint data_len;
587         gint ret;
588
589         g_return_val_if_fail(condition == G_IO_IN, FALSE);
590
591         session_set_timeout(session, session->timeout_interval);
592
593         if (session->read_buf_len == 0) {
594                 gint read_len;
595
596                 read_len = sock_read(session->sock, session->read_buf,
597                                      SESSION_BUFFSIZE);
598
599                 if (read_len == 0) {
600                         g_warning("sock_read: received EOF\n");
601                         session->state = SESSION_EOF;
602                         return FALSE;
603                 }
604
605                 if (read_len < 0) {
606                         switch (errno) {
607                         case EAGAIN:
608                                 return TRUE;
609                         default:
610                                 g_warning("sock_read: %s\n", g_strerror(errno));
611                                 session->state = SESSION_ERROR;
612                                 return FALSE;
613                         }
614                 }
615
616                 session->read_buf_len = read_len;
617         }
618
619         data_buf = session->read_data_buf;
620         terminator_len = strlen(session->read_data_terminator);
621
622         if (session->read_buf_len == 0)
623                 return TRUE;
624
625         g_byte_array_append(data_buf, session->read_buf_p,
626                             session->read_buf_len);
627
628         session->read_buf_len = 0;
629         session->read_buf_p = session->read_buf;
630
631         /* check if data is terminated */
632         if (data_buf->len >= terminator_len) {
633                 if (memcmp(data_buf->data, session->read_data_terminator,
634                            terminator_len) == 0)
635                         complete = TRUE;
636                 else if (data_buf->len >= terminator_len + 2 &&
637                          memcmp(data_buf->data + data_buf->len -
638                                 (terminator_len + 2), "\r\n", 2) == 0 &&
639                          memcmp(data_buf->data + data_buf->len -
640                                 terminator_len, session->read_data_terminator,
641                                 terminator_len) == 0)
642                         complete = TRUE;
643         }
644
645         /* incomplete read */
646         if (!complete) {
647                 GTimeVal tv_cur;
648
649                 g_get_current_time(&tv_cur);
650                 if (tv_cur.tv_sec - session->tv_prev.tv_sec > 0 ||
651                     tv_cur.tv_usec - session->tv_prev.tv_usec >
652                     UI_REFRESH_INTERVAL) {
653                         session->recv_data_progressive_notify
654                                 (session, data_buf->len, 0,
655                                  session->recv_data_progressive_notify_data);
656                         g_get_current_time(&session->tv_prev);
657                 }
658                 return TRUE;
659         }
660
661         /* complete */
662         if (session->io_tag > 0) {
663                 g_source_remove(session->io_tag);
664                 session->io_tag = 0;
665         }
666
667         data_len = data_buf->len - terminator_len;
668
669         /* callback */
670         ret = session->recv_data_finished(session, (gchar *)data_buf->data,
671                                           data_len);
672
673         g_byte_array_set_size(data_buf, 0);
674
675         session->recv_data_notify(session, data_len,
676                                   session->recv_data_notify_data);
677
678         if (ret < 0)
679                 session->state = SESSION_ERROR;
680
681         return FALSE;
682 }
683
684 static gint session_write_buf(Session *session)
685 {
686         gint write_len;
687         gint to_write_len;
688
689         g_return_val_if_fail(session->write_buf != NULL, -1);
690         g_return_val_if_fail(session->write_buf_p != NULL, -1);
691         g_return_val_if_fail(session->write_buf_len > 0, -1);
692
693         to_write_len = session->write_buf_len -
694                 (session->write_buf_p - session->write_buf);
695         to_write_len = MIN(to_write_len, SESSION_BUFFSIZE);
696
697         write_len = sock_write(session->sock, session->write_buf_p,
698                                to_write_len);
699
700         if (write_len < 0) {
701                 switch (errno) {
702                 case EAGAIN:
703                         write_len = 0;
704                         break;
705                 default:
706                         g_warning("sock_write: %s\n", g_strerror(errno));
707                         session->state = SESSION_ERROR;
708                         return -1;
709                 }
710         }
711
712         /* incomplete write */
713         if (session->write_buf_p - session->write_buf + write_len <
714             session->write_buf_len) {
715                 session->write_buf_p += write_len;
716                 return 1;
717         }
718
719         g_free(session->write_buf);
720         session->write_buf = NULL;
721         session->write_buf_p = NULL;
722         session->write_buf_len = 0;
723
724         return 0;
725 }
726
727 static gint session_write_data(Session *session)
728 {
729         gint write_len;
730         gint to_write_len;
731
732         g_return_val_if_fail(session->write_data != NULL, -1);
733         g_return_val_if_fail(session->write_data_p != NULL, -1);
734         g_return_val_if_fail(session->write_data_len > 0, -1);
735
736         to_write_len = session->write_data_len -
737                 (session->write_data_p - session->write_data);
738         to_write_len = MIN(to_write_len, SESSION_BUFFSIZE);
739
740         write_len = sock_write(session->sock, session->write_data_p,
741                                to_write_len);
742
743         if (write_len < 0) {
744                 switch (errno) {
745                 case EAGAIN:
746                         write_len = 0;
747                         break;
748                 default:
749                         g_warning("sock_write: %s\n", g_strerror(errno));
750                         session->state = SESSION_ERROR;
751                         return -1;
752                 }
753         }
754
755         /* incomplete write */
756         if (session->write_data_p - session->write_data + write_len <
757             session->write_data_len) {
758                 session->write_data_p += write_len;
759                 return 1;
760         }
761
762         session->write_data = NULL;
763         session->write_data_p = NULL;
764         session->write_data_len = 0;
765
766         return 0;
767 }
768
769 static gboolean session_write_msg_cb(SockInfo *source, GIOCondition condition,
770                                      gpointer data)
771 {
772         Session *session = SESSION(data);
773         gint ret;
774
775         g_return_val_if_fail(condition == G_IO_OUT, FALSE);
776         g_return_val_if_fail(session->write_buf != NULL, FALSE);
777         g_return_val_if_fail(session->write_buf_p != NULL, FALSE);
778         g_return_val_if_fail(session->write_buf_len > 0, FALSE);
779
780         ret = session_write_buf(session);
781
782         if (ret < 0) {
783                 session->state = SESSION_ERROR;
784                 return FALSE;
785         } else if (ret > 0)
786                 return TRUE;
787
788         if (session->io_tag > 0) {
789                 g_source_remove(session->io_tag);
790                 session->io_tag = 0;
791         }
792
793         session_recv_msg(session);
794
795         return FALSE;
796 }
797
798 static gboolean session_write_data_cb(SockInfo *source,
799                                       GIOCondition condition, gpointer data)
800 {
801         Session *session = SESSION(data);
802         guint write_data_len;
803         gint ret;
804
805         g_return_val_if_fail(condition == G_IO_OUT, FALSE);
806         g_return_val_if_fail(session->write_data != NULL, FALSE);
807         g_return_val_if_fail(session->write_data_p != NULL, FALSE);
808         g_return_val_if_fail(session->write_data_len > 0, FALSE);
809
810         write_data_len = session->write_data_len;
811
812         ret = session_write_data(session);
813
814         if (ret < 0) {
815                 session->state = SESSION_ERROR;
816                 return FALSE;
817         } else if (ret > 0) {
818                 GTimeVal tv_cur;
819
820                 g_get_current_time(&tv_cur);
821                 if (tv_cur.tv_sec - session->tv_prev.tv_sec > 0 ||
822                     tv_cur.tv_usec - session->tv_prev.tv_usec >
823                     UI_REFRESH_INTERVAL) {
824                         session_set_timeout(session, session->timeout_interval);
825                         session->send_data_progressive_notify
826                                 (session,
827                                  session->write_data_p - session->write_data,
828                                  write_data_len,
829                                  session->send_data_progressive_notify_data);
830                         g_get_current_time(&session->tv_prev);
831                 }
832                 return TRUE;
833         }
834
835         if (session->io_tag > 0) {
836                 g_source_remove(session->io_tag);
837                 session->io_tag = 0;
838         }
839
840         /* callback */
841         ret = session->send_data_finished(session, write_data_len);
842         session->send_data_notify(session, write_data_len,
843                                   session->send_data_notify_data);
844
845         return FALSE;
846 }