7fe3cd151c97a46cc0d493b4b816fc04e6528dbe
[claws.git] / src / etpan / etpan-thread-manager.c
1 /*
2  * Claws Mail -- a GTK+ based, lightweight, and fast e-mail client
3  * Copyright (C) 2005-2007 DINH Viet Hoa and the Claws Mail team
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  * 
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #  include "config.h"
22 #endif
23
24 #ifdef HAVE_LIBETPAN
25
26 #include "etpan-thread-manager.h"
27
28 #include <glib.h>
29 #include <stdlib.h>
30 #include <pthread.h>
31 #include <libetpan/mailsem.h>
32 #include <semaphore.h>
33 #include <unistd.h>
34 #include <fcntl.h>
35
36 #include "etpan-errors.h"
37
38 #define POOL_UNBOUND_MAX 4
39
40 #define POOL_INIT_SIZE 8
41 #define OP_INIT_SIZE 8
42
43 static int etpan_thread_start(struct etpan_thread * thread);
44 static void etpan_thread_free(struct etpan_thread * thread);
45 static unsigned int etpan_thread_get_load(struct etpan_thread * thread);
46 static int etpan_thread_is_bound(struct etpan_thread * thread);
47 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager);
48 static void etpan_thread_join(struct etpan_thread * thread);
49 static struct etpan_thread * etpan_thread_new(void);
50 static int etpan_thread_op_cancelled(struct etpan_thread_op * op);
51 static void etpan_thread_op_lock(struct etpan_thread_op * op);
52 static void etpan_thread_op_unlock(struct etpan_thread_op * op);
53 static void etpan_thread_stop(struct etpan_thread * thread);
54
55 #if 0
56 static void etpan_thread_bind(struct etpan_thread * thread);
57 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
58              struct etpan_thread_op * op);
59 static void etpan_thread_manager_start(struct etpan_thread_manager * manager);
60 static void etpan_thread_op_cancel(struct etpan_thread_op * op);
61 #endif
62
63 enum {
64   TERMINATE_STATE_NONE,
65   TERMINATE_STATE_REQUESTED,
66   TERMINATE_STATE_DONE,
67 };
68
69 struct etpan_thread_manager * etpan_thread_manager_new(void)
70 {
71   struct etpan_thread_manager * manager;
72   int r;
73   
74   manager = malloc(sizeof(* manager));
75   if (manager == NULL)
76     goto err;
77   
78   manager->thread_pool = carray_new(POOL_INIT_SIZE);
79   if (manager->thread_pool == NULL)
80     goto free;
81
82   manager->thread_pending = carray_new(POOL_INIT_SIZE);
83   if (manager->thread_pending == NULL)
84     goto free_pool;
85   
86   manager->can_create_thread = 1;
87   manager->unbound_count = 0;
88   
89   r = pipe(manager->notify_fds);
90   if (r < 0)
91     goto free_pending;
92   
93   return manager;
94   
95  free_pending:
96   carray_free(manager->thread_pending);
97  free_pool:
98   carray_free(manager->thread_pool);
99  free:
100   free(manager);
101  err:
102   return NULL;
103 }
104
105 void etpan_thread_manager_free(struct etpan_thread_manager * manager)
106 {
107   close(manager->notify_fds[1]);
108   close(manager->notify_fds[0]);
109   carray_free(manager->thread_pending);
110   carray_free(manager->thread_pool);
111   free(manager);
112 }
113
114 static struct etpan_thread * etpan_thread_new(void)
115 {
116   struct etpan_thread * thread;
117   int r;
118   
119   thread = malloc(sizeof(* thread));
120   if (thread == NULL)
121     goto err;
122   
123   r = pthread_mutex_init(&thread->lock, NULL);
124   if (r != 0)
125     goto free;
126
127   thread->op_list = carray_new(OP_INIT_SIZE);
128   if (thread->op_list == NULL)
129     goto destroy_lock;
130   
131   thread->op_done_list = carray_new(OP_INIT_SIZE);
132   if (thread->op_done_list == NULL)
133     goto free_op_list;
134   
135   thread->start_sem = mailsem_new();
136   if (thread->start_sem == NULL)
137     goto free_op_done_list;
138   
139   thread->stop_sem = mailsem_new();
140   if (thread->stop_sem == NULL)
141     goto free_startsem;
142   
143   thread->op_sem = mailsem_new();
144   if (thread->op_sem == NULL)
145     goto free_stopsem;
146   
147   thread->manager = NULL;
148   thread->bound_count = 0;
149   thread->terminate_state = TERMINATE_STATE_NONE;
150   
151   return thread;
152   
153  free_stopsem:
154   mailsem_free(thread->stop_sem);
155  free_startsem:
156   mailsem_free(thread->start_sem);
157  free_op_done_list:
158   carray_free(thread->op_done_list);
159  free_op_list:
160   carray_free(thread->op_list);
161  destroy_lock:
162   pthread_mutex_destroy(&thread->lock);
163  free:
164   free(thread);
165  err:
166   return NULL;
167 }
168
169 static void etpan_thread_free(struct etpan_thread * thread)
170 {
171   mailsem_free(thread->op_sem);
172   mailsem_free(thread->stop_sem);
173   mailsem_free(thread->start_sem);
174   carray_free(thread->op_done_list);
175   carray_free(thread->op_list);
176   pthread_mutex_destroy(&thread->lock);
177   free(thread);
178 }
179
180 struct etpan_thread_op * etpan_thread_op_new(void)
181 {
182   struct etpan_thread_op * op;
183   int r;
184   
185   op = malloc(sizeof(* op));
186   if (op == NULL)
187     goto err;
188   
189   op->thread = NULL;
190   op->run = NULL;
191   op->callback = NULL;
192   op->callback_data = NULL;
193   op->callback_called = 0;
194   op->cancellable = 0;
195   op->cancelled = 0;
196   op->param = NULL;
197   op->result = NULL;
198   op->finished = 0;
199   op->imap = NULL;
200   op->nntp = NULL;
201
202   r = pthread_mutex_init(&op->lock, NULL);
203   if (r != 0)
204     goto free;
205   
206   return op;
207   
208  free:
209   free(op);
210  err:
211   return NULL;
212 }
213
214 void etpan_thread_op_free(struct etpan_thread_op * op)
215 {
216   pthread_mutex_destroy(&op->lock);
217   free(op);
218 }
219
220 static struct etpan_thread *
221 etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)
222 {
223   struct etpan_thread * thread;
224   int r;
225   
226   thread = etpan_thread_new();
227   if (thread == NULL)
228     goto err;
229   
230   thread->manager = manager;
231   
232   r = etpan_thread_start(thread);
233   if (r != NO_ERROR)
234     goto free_thread;
235   
236   r = carray_add(manager->thread_pool, thread, NULL);
237   if (r < 0) {
238     etpan_thread_stop(thread);
239     goto free_thread;
240   }
241   
242   return thread;
243   
244  free_thread:
245   etpan_thread_free(thread);
246  err:
247   return NULL;
248 }
249
250 static void
251 etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,
252     struct etpan_thread * thread)
253 {
254   unsigned int i;
255   int r;
256   
257   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
258     if (carray_get(manager->thread_pool, i) == thread) {
259       carray_delete(manager->thread_pool, i);
260       break;
261     }
262   }
263   
264   if (!etpan_thread_is_bound(thread))
265     manager->unbound_count --;
266   
267   r = carray_add(manager->thread_pending, thread, NULL);
268   if (r < 0) {
269     g_warning("complete failure of thread due to lack of memory (thread stop)");
270   }
271   
272   etpan_thread_stop(thread);
273 }
274
275 static void manager_notify(struct etpan_thread_manager * manager)
276 {
277   char ch;
278   ssize_t r;
279   
280   ch = 1;
281   r = write(manager->notify_fds[1], &ch, 1);
282 }
283
284 static void manager_ack(struct etpan_thread_manager * manager)
285 {
286 #ifndef G_OS_WIN32
287   char ch;
288   ssize_t r;
289   r = read(manager->notify_fds[0], &ch, 1);
290 #else
291   /* done in the GIOChannel handler in imap-thread.c and nntp-thread.c */
292 #endif
293 }
294
295 static void thread_lock(struct etpan_thread * thread)
296 {
297   pthread_mutex_lock(&thread->lock);
298 }
299
300 static void thread_unlock(struct etpan_thread * thread)
301 {
302   pthread_mutex_unlock(&thread->lock);
303 }
304
305 static void thread_notify(struct etpan_thread * thread)
306 {
307   manager_notify(thread->manager);
308 }
309
310 static void * thread_run(void * data)
311 {
312   struct etpan_thread * thread;
313   int r;
314   
315   thread = data;
316   
317   mailsem_up(thread->start_sem);
318   
319   while (1) {
320     int do_quit;
321     struct etpan_thread_op * op;
322     
323     mailsem_down(thread->op_sem);
324     
325     do_quit = 0;
326     op = NULL;
327     thread_lock(thread);
328     if (carray_count(thread->op_list) > 0) {
329       op = carray_get(thread->op_list, 0);
330       carray_delete_slow(thread->op_list, 0);
331     }
332     else {
333       do_quit = 1;
334     }
335     thread_unlock(thread);
336     
337     if (do_quit) {
338       break;
339     }
340     
341     if (!etpan_thread_op_cancelled(op)) {
342       if (op->run != NULL)
343         op->run(op);
344     }
345     
346     thread_lock(thread);
347     r = carray_add(thread->op_done_list, op, NULL);
348     if (r < 0) {
349       g_warning("complete failure of thread due to lack of memory (op done)");
350     }
351     thread_unlock(thread);
352     
353     thread_notify(thread);
354   }
355   
356   thread_lock(thread);
357   thread->terminate_state = TERMINATE_STATE_DONE;
358   thread_unlock(thread);
359   
360   thread_notify(thread);
361   
362   mailsem_up(thread->stop_sem);
363   
364   return NULL;
365 }
366
367 static int etpan_thread_start(struct etpan_thread * thread)
368 {
369   int r;
370   
371   r = pthread_create(&thread->th_id, NULL, thread_run, thread);
372   if (r != 0)
373     return ERROR_MEMORY;
374   
375   mailsem_down(thread->start_sem);
376   
377   return NO_ERROR;
378 }
379
380 static void etpan_thread_stop(struct etpan_thread * thread)
381 {
382   thread_lock(thread);
383   thread->terminate_state = TERMINATE_STATE_REQUESTED;
384   thread_unlock(thread);
385   
386   mailsem_up(thread->op_sem);
387   
388   /* this thread will be joined in the manager loop */
389 }
390
391 static int etpan_thread_is_stopped(struct etpan_thread * thread)
392 {
393   int stopped;
394   
395   thread_lock(thread);
396   stopped = (thread->terminate_state == TERMINATE_STATE_DONE);
397   thread_unlock(thread);
398   
399   return stopped;
400 }
401
402 static void etpan_thread_join(struct etpan_thread * thread)
403 {
404   mailsem_down(thread->stop_sem);
405   pthread_join(thread->th_id, NULL);
406 }
407
408 struct etpan_thread *
409 etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)
410 {
411   struct etpan_thread * chosen_thread;
412   unsigned int chosen_thread_load;
413   unsigned int i;
414   struct etpan_thread * thread;
415   
416   /* chose a thread */
417   
418   chosen_thread = NULL;
419   chosen_thread_load = 0;
420   
421   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
422     thread = carray_get(manager->thread_pool, i);
423     if (etpan_thread_is_bound(thread))
424       continue;
425     
426     if (chosen_thread == NULL) {
427       chosen_thread = thread;
428       chosen_thread_load = etpan_thread_get_load(thread);
429       
430       if (chosen_thread_load == 0)
431         break;
432     }
433     else {
434       unsigned int load;
435       
436       load = etpan_thread_get_load(thread);
437       
438       if (load < chosen_thread_load) {
439         chosen_thread = thread;
440         chosen_thread_load = load;
441       }
442     }
443   }
444   
445   if (chosen_thread != NULL) {
446     if (manager->can_create_thread && (chosen_thread_load != 0)) {
447       chosen_thread = NULL;
448     }
449   }
450   
451   /* choice done */
452   
453   if (chosen_thread != NULL)
454     return chosen_thread;
455   
456   thread = etpan_thread_manager_create_thread(manager);
457   if (thread == NULL)
458     goto err;
459   
460   manager->unbound_count ++;
461   if (manager->unbound_count >= POOL_UNBOUND_MAX)
462     manager->can_create_thread = 0;
463   
464   return thread;
465   
466  err:
467   return NULL;
468 }
469
470 static unsigned int etpan_thread_get_load(struct etpan_thread * thread)
471 {
472   unsigned int load;
473   
474   thread_lock(thread);
475   load = carray_count(thread->op_list);
476   thread_unlock(thread);
477   
478   return load;
479 }
480
481 #if 0
482 static void etpan_thread_bind(struct etpan_thread * thread)
483 {
484   thread->bound_count ++;
485 }
486 #endif
487
488 void etpan_thread_unbind(struct etpan_thread * thread)
489 {
490   thread->bound_count --;
491 }
492
493 static int etpan_thread_is_bound(struct etpan_thread * thread)
494 {
495   return (thread->bound_count != 0);
496 }
497
498 int etpan_thread_op_schedule(struct etpan_thread * thread,
499                              struct etpan_thread_op * op)
500 {
501   int r;
502   
503   if (thread->terminate_state != TERMINATE_STATE_NONE)
504     return ERROR_INVAL;
505   
506   thread_lock(thread);
507   r = carray_add(thread->op_list, op, NULL);
508   thread_unlock(thread);
509   
510   if (r < 0)
511     return ERROR_MEMORY;
512   
513   op->thread = thread;
514   mailsem_up(thread->op_sem);
515   
516   return NO_ERROR;
517 }
518
519 static void etpan_thread_op_lock(struct etpan_thread_op * op)
520 {
521   pthread_mutex_lock(&op->lock);
522 }
523
524 static void etpan_thread_op_unlock(struct etpan_thread_op * op)
525 {
526   pthread_mutex_unlock(&op->lock);
527 }
528
529 static int etpan_thread_op_cancelled(struct etpan_thread_op * op)
530 {
531   int cancelled;
532   
533   cancelled = 0;
534   etpan_thread_op_lock(op);
535   if (op->cancellable)
536     cancelled = op->cancelled;
537   etpan_thread_op_unlock(op);
538   
539   return cancelled;
540 }
541
542 #if 0
543 static void etpan_thread_op_cancel(struct etpan_thread_op * op)
544
545   etpan_thread_op_lock(op);
546   if (op->cancelled) {
547     g_warning("cancelled twice");
548   }
549   op->cancelled = 1;
550   if ((op->callback != NULL) && (!op->callback_called)) {
551     op->callback(op->cancelled, op->result, op->callback_data);
552     op->callback_called = 1;
553   }
554   etpan_thread_op_unlock(op);
555 }
556 #endif
557
558 #if 0
559 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
560     struct etpan_thread_op * op)
561 {
562   struct etpan_thread * thread;
563   
564   thread = etpan_thread_manager_get_thread(manager);
565   
566   if (thread == NULL)
567     goto err;
568   
569   return etpan_thread_op_schedule(thread, op);
570   
571  err:
572   return ERROR_MEMORY;
573 }
574 #endif
575
576 int etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)
577 {
578   return manager->notify_fds[0];
579 }
580
581 static void loop_thread_list(carray * op_to_notify,
582     carray * thread_list)
583 {
584   unsigned int i;
585   int r;
586   
587   for(i = 0 ; i < carray_count(thread_list) ; i ++) {
588     struct etpan_thread * thread;
589     unsigned int j;
590     
591     thread = carray_get(thread_list, i);
592     
593     thread_lock(thread);
594     
595     for(j = 0 ; j < carray_count(thread->op_done_list) ; j ++) {
596       struct etpan_thread_op * op;
597       
598       op = carray_get(thread->op_done_list, j);
599       r = carray_add(op_to_notify, op, NULL);
600       if (r < 0) {
601         g_warning("complete failure of thread due to lack of memory (callback)");
602         break;
603       }
604     }
605     carray_set_size(thread->op_done_list, 0);
606     
607     thread_unlock(thread);
608   }
609 }
610
611 void etpan_thread_manager_loop(struct etpan_thread_manager * manager)
612 {
613   carray * op_to_notify;
614   unsigned int i;
615   
616   manager_ack(manager);
617   
618   op_to_notify = carray_new(OP_INIT_SIZE);
619   
620   loop_thread_list(op_to_notify, manager->thread_pool);
621   loop_thread_list(op_to_notify, manager->thread_pending);
622   
623   for(i = 0 ; i < carray_count(op_to_notify) ; i ++) {
624     struct etpan_thread_op * op;
625     int cancelled;
626     
627     op = carray_get(op_to_notify, i);
628     
629     cancelled = etpan_thread_op_cancelled(op);
630     
631     etpan_thread_op_lock(op);
632     
633     if (!op->callback_called) {
634       if (op->callback != NULL)
635         op->callback(op->cancelled, op->result, op->callback_data);
636     }
637     
638     etpan_thread_op_unlock(op);
639     
640     if (op->cleanup != NULL)
641       op->cleanup(op);
642   }
643   
644   carray_free(op_to_notify);
645   
646   i = 0;
647   while (i < carray_count(manager->thread_pending)) {
648     struct etpan_thread * thread;
649     
650     thread = carray_get(manager->thread_pending, i);
651     
652     if (etpan_thread_is_stopped(thread)) {
653       etpan_thread_join(thread);
654       
655       etpan_thread_free(thread);
656       
657       carray_delete_slow(manager->thread_pending, i);
658     }
659     else {
660       i ++;
661     }
662   }
663 }
664
665 #if 0
666 static void etpan_thread_manager_start(struct etpan_thread_manager * manager)
667 {
668   /* do nothing */
669 }
670 #endif
671
672 void etpan_thread_manager_stop(struct etpan_thread_manager * manager)
673 {
674   while (carray_count(manager->thread_pool) > 0) {
675     struct etpan_thread * thread;
676     
677     thread = carray_get(manager->thread_pool, 0);
678     etpan_thread_manager_terminate_thread(manager, thread);
679   }
680 }
681
682 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)
683 {
684   return ((carray_count(manager->thread_pending) == 0) && 
685       (carray_count(manager->thread_pool) == 0));
686 }
687
688 void etpan_thread_manager_join(struct etpan_thread_manager * manager)
689 {
690   while (!etpan_thread_manager_is_stopped(manager)) {
691     etpan_thread_manager_loop(manager);
692   }
693 }
694 #endif