f10e9a68d7aec5ef1ed76c5954032dd417e5c841
[claws.git] / src / etpan / etpan-thread-manager.c
1 /*
2  * Claws Mail -- a GTK+ based, lightweight, and fast e-mail client
3  * Copyright (C) 2005-2007 DINH Viet Hoa and the Claws Mail team
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #  include "config.h"
22 #endif
23
24 #ifdef HAVE_LIBETPAN
25
26 #include "etpan-thread-manager.h"
27
28 #include <stdlib.h>
29 #include <pthread.h>
30 #include <libetpan/mailsem.h>
31 #include <semaphore.h>
32 #include <unistd.h>
33
34 #include "etpan-log.h"
35 #include "etpan-errors.h"
36
37 #define POOL_UNBOUND_MAX 4
38
39 #define POOL_INIT_SIZE 8
40 #define OP_INIT_SIZE 8
41
42 enum {
43   TERMINATE_STATE_NONE,
44   TERMINATE_STATE_REQUESTED,
45   TERMINATE_STATE_DONE,
46 };
47
48 struct etpan_thread_manager * etpan_thread_manager_new(void)
49 {
50   struct etpan_thread_manager * manager;
51   int r;
52   
53   manager = malloc(sizeof(* manager));
54   if (manager == NULL)
55     goto err;
56   
57   manager->thread_pool = carray_new(POOL_INIT_SIZE);
58   if (manager->thread_pool == NULL)
59     goto free;
60
61   manager->thread_pending = carray_new(POOL_INIT_SIZE);
62   if (manager->thread_pending == NULL)
63     goto free_pool;
64   
65   manager->can_create_thread = 1;
66   manager->unbound_count = 0;
67   
68   r = pipe(manager->notify_fds);
69   if (r < 0)
70     goto free_pending;
71   
72   return manager;
73   
74  free_pending:
75   carray_free(manager->thread_pending);
76  free_pool:
77   carray_free(manager->thread_pool);
78  free:
79   free(manager);
80  err:
81   return NULL;
82 }
83
84 void etpan_thread_manager_free(struct etpan_thread_manager * manager)
85 {
86   close(manager->notify_fds[1]);
87   close(manager->notify_fds[0]);
88   carray_free(manager->thread_pending);
89   carray_free(manager->thread_pool);
90   free(manager);
91 }
92
93 struct etpan_thread * etpan_thread_new(void)
94 {
95   struct etpan_thread * thread;
96   int r;
97   
98   thread = malloc(sizeof(* thread));
99   if (thread == NULL)
100     goto err;
101   
102   r = pthread_mutex_init(&thread->lock, NULL);
103   if (r != 0)
104     goto free;
105
106   thread->op_list = carray_new(OP_INIT_SIZE);
107   if (thread->op_list == NULL)
108     goto destroy_lock;
109   
110   thread->op_done_list = carray_new(OP_INIT_SIZE);
111   if (thread->op_done_list == NULL)
112     goto free_op_list;
113   
114   thread->start_sem = mailsem_new();
115   if (thread->start_sem == NULL)
116     goto free_op_done_list;
117   
118   thread->stop_sem = mailsem_new();
119   if (thread->stop_sem == NULL)
120     goto free_startsem;
121   
122   thread->op_sem = mailsem_new();
123   if (thread->op_sem == NULL)
124     goto free_stopsem;
125   
126   thread->manager = NULL;
127   thread->bound_count = 0;
128   thread->terminate_state = TERMINATE_STATE_NONE;
129   
130   return thread;
131   
132  free_stopsem:
133   mailsem_free(thread->stop_sem);
134  free_startsem:
135   mailsem_free(thread->start_sem);
136  free_op_done_list:
137   carray_free(thread->op_done_list);
138  free_op_list:
139   carray_free(thread->op_list);
140  destroy_lock:
141   pthread_mutex_destroy(&thread->lock);
142  free:
143   free(thread);
144  err:
145   return NULL;
146 }
147
148 void etpan_thread_free(struct etpan_thread * thread)
149 {
150   mailsem_free(thread->op_sem);
151   mailsem_free(thread->stop_sem);
152   mailsem_free(thread->start_sem);
153   carray_free(thread->op_done_list);
154   carray_free(thread->op_list);
155   pthread_mutex_destroy(&thread->lock);
156   free(thread);
157 }
158
159 struct etpan_thread_op * etpan_thread_op_new(void)
160 {
161   struct etpan_thread_op * op;
162   int r;
163   
164   op = malloc(sizeof(* op));
165   if (op == NULL)
166     goto err;
167   
168   op->thread = NULL;
169   op->run = NULL;
170   op->callback = NULL;
171   op->callback_data = NULL;
172   op->callback_called = 0;
173   op->cancellable = 0;
174   op->cancelled = 0;
175   op->param = NULL;
176   op->result = NULL;
177   
178   r = pthread_mutex_init(&op->lock, NULL);
179   if (r != 0)
180     goto free;
181   
182   return op;
183   
184  free:
185   free(op);
186  err:
187   return NULL;
188 }
189
190 void etpan_thread_op_free(struct etpan_thread_op * op)
191 {
192   pthread_mutex_destroy(&op->lock);
193   free(op);
194 }
195
196 static struct etpan_thread *
197 etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)
198 {
199   struct etpan_thread * thread;
200   int r;
201   
202   thread = etpan_thread_new();
203   if (thread == NULL)
204     goto err;
205   
206   thread->manager = manager;
207   
208   r = etpan_thread_start(thread);
209   if (r != NO_ERROR)
210     goto free_thread;
211   
212   r = carray_add(manager->thread_pool, thread, NULL);
213   if (r < 0) {
214     etpan_thread_stop(thread);
215     goto free_thread;
216   }
217   
218   return thread;
219   
220  free_thread:
221   etpan_thread_free(thread);
222  err:
223   return NULL;
224 }
225
226 static void
227 etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,
228     struct etpan_thread * thread)
229 {
230   unsigned int i;
231   int r;
232   
233   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
234     if (carray_get(manager->thread_pool, i) == thread) {
235       carray_delete(manager->thread_pool, i);
236       break;
237     }
238   }
239   
240   if (!etpan_thread_is_bound(thread))
241     manager->unbound_count --;
242   
243   r = carray_add(manager->thread_pending, thread, NULL);
244   if (r < 0) {
245     ETPAN_LOG("complete failure of thread due to lack of memory (thread stop)");
246   }
247   
248   etpan_thread_stop(thread);
249 }
250
251 static void manager_notify(struct etpan_thread_manager * manager)
252 {
253   char ch;
254   ssize_t r;
255   
256   ch = 1;
257   r = write(manager->notify_fds[1], &ch, 1);
258 }
259
260 static void manager_ack(struct etpan_thread_manager * manager)
261 {
262   char ch;
263   ssize_t r;
264   
265   r = read(manager->notify_fds[0], &ch, 1);
266 }
267
268 static void thread_lock(struct etpan_thread * thread)
269 {
270   pthread_mutex_lock(&thread->lock);
271 }
272
273 static void thread_unlock(struct etpan_thread * thread)
274 {
275   pthread_mutex_unlock(&thread->lock);
276 }
277
278 static void thread_notify(struct etpan_thread * thread)
279 {
280   manager_notify(thread->manager);
281 }
282
283 static void * thread_run(void * data)
284 {
285   struct etpan_thread * thread;
286   int r;
287   
288   thread = data;
289   
290   mailsem_up(thread->start_sem);
291   
292   while (1) {
293     int do_quit;
294     struct etpan_thread_op * op;
295     
296     mailsem_down(thread->op_sem);
297     
298     do_quit = 0;
299     op = NULL;
300     thread_lock(thread);
301     if (carray_count(thread->op_list) > 0) {
302       op = carray_get(thread->op_list, 0);
303       carray_delete_slow(thread->op_list, 0);
304     }
305     else {
306       do_quit = 1;
307     }
308     thread_unlock(thread);
309     
310     if (do_quit) {
311       break;
312     }
313     
314     if (!etpan_thread_op_cancelled(op)) {
315       if (op->run != NULL)
316         op->run(op);
317     }
318     
319     thread_lock(thread);
320     r = carray_add(thread->op_done_list, op, NULL);
321     if (r < 0) {
322       ETPAN_LOG("complete failure of thread due to lack of memory (op done)");
323     }
324     thread_unlock(thread);
325     
326     thread_notify(thread);
327   }
328   
329   thread_lock(thread);
330   thread->terminate_state = TERMINATE_STATE_DONE;
331   thread_unlock(thread);
332   
333   thread_notify(thread);
334   
335   mailsem_up(thread->stop_sem);
336   
337   return NULL;
338 }
339
340 int etpan_thread_start(struct etpan_thread * thread)
341 {
342   int r;
343   
344   r = pthread_create(&thread->th_id, NULL, thread_run, thread);
345   if (r != 0)
346     return ERROR_MEMORY;
347   
348   mailsem_down(thread->start_sem);
349   
350   return NO_ERROR;
351 }
352
353 void etpan_thread_stop(struct etpan_thread * thread)
354 {
355   thread_lock(thread);
356   thread->terminate_state = TERMINATE_STATE_REQUESTED;
357   thread_unlock(thread);
358   
359   mailsem_up(thread->op_sem);
360   
361   /* this thread will be joined in the manager loop */
362 }
363
364 int etpan_thread_is_stopped(struct etpan_thread * thread)
365 {
366   int stopped;
367   
368   thread_lock(thread);
369   stopped = (thread->terminate_state == TERMINATE_STATE_DONE);
370   thread_unlock(thread);
371   
372   return stopped;
373 }
374
375 void etpan_thread_join(struct etpan_thread * thread)
376 {
377   mailsem_down(thread->stop_sem);
378   pthread_join(thread->th_id, NULL);
379 }
380
381 struct etpan_thread *
382 etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)
383 {
384   struct etpan_thread * chosen_thread;
385   unsigned int chosen_thread_load;
386   unsigned int i;
387   struct etpan_thread * thread;
388   
389   /* chose a thread */
390   
391   chosen_thread = NULL;
392   chosen_thread_load = 0;
393   
394   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
395     thread = carray_get(manager->thread_pool, i);
396     if (etpan_thread_is_bound(thread))
397       continue;
398     
399     if (chosen_thread == NULL) {
400       chosen_thread = thread;
401       chosen_thread_load = etpan_thread_get_load(thread);
402       
403       if (chosen_thread_load == 0)
404         break;
405     }
406     else {
407       unsigned int load;
408       
409       load = etpan_thread_get_load(thread);
410       
411       if (load < chosen_thread_load) {
412         chosen_thread = thread;
413         chosen_thread_load = load;
414       }
415     }
416   }
417   
418   if (chosen_thread != NULL) {
419     if (manager->can_create_thread && (chosen_thread_load != 0)) {
420       chosen_thread = NULL;
421     }
422   }
423   
424   /* choice done */
425   
426   if (chosen_thread != NULL)
427     return chosen_thread;
428   
429   thread = etpan_thread_manager_create_thread(manager);
430   if (thread == NULL)
431     goto err;
432   
433   manager->unbound_count ++;
434   if (manager->unbound_count >= POOL_UNBOUND_MAX)
435     manager->can_create_thread = 0;
436   
437   return thread;
438   
439  err:
440   return NULL;
441 }
442
443 unsigned int etpan_thread_get_load(struct etpan_thread * thread)
444 {
445   unsigned int load;
446   
447   thread_lock(thread);
448   load = carray_count(thread->op_list);
449   thread_unlock(thread);
450   
451   return load;
452 }
453
454 void etpan_thread_bind(struct etpan_thread * thread)
455 {
456   thread->bound_count ++;
457 }
458
459 void etpan_thread_unbind(struct etpan_thread * thread)
460 {
461   thread->bound_count --;
462 }
463
464 int etpan_thread_is_bound(struct etpan_thread * thread)
465 {
466   return (thread->bound_count != 0);
467 }
468
469 int etpan_thread_op_schedule(struct etpan_thread * thread,
470                              struct etpan_thread_op * op)
471 {
472   int r;
473   
474   if (thread->terminate_state != TERMINATE_STATE_NONE)
475     return ERROR_INVAL;
476   
477   thread_lock(thread);
478   r = carray_add(thread->op_list, op, NULL);
479   thread_unlock(thread);
480   
481   if (r < 0)
482     return ERROR_MEMORY;
483   
484   op->thread = thread;
485   mailsem_up(thread->op_sem);
486   
487   return NO_ERROR;
488 }
489
490 void etpan_thread_op_lock(struct etpan_thread_op * op)
491 {
492   pthread_mutex_lock(&op->lock);
493 }
494
495 void etpan_thread_op_unlock(struct etpan_thread_op * op)
496 {
497   pthread_mutex_unlock(&op->lock);
498 }
499
500 int etpan_thread_op_cancelled(struct etpan_thread_op * op)
501 {
502   int cancelled;
503   
504   cancelled = 0;
505   etpan_thread_op_lock(op);
506   if (op->cancellable)
507     cancelled = op->cancelled;
508   etpan_thread_op_unlock(op);
509   
510   return cancelled;
511 }
512
513 void etpan_thread_op_cancel(struct etpan_thread_op * op)
514
515   etpan_thread_op_lock(op);
516   if (op->cancelled) {
517     ETPAN_LOG("cancelled twice");
518   }
519   op->cancelled = 1;
520   if ((op->callback != NULL) && (!op->callback_called)) {
521     op->callback(op->cancelled, op->result, op->callback_data);
522     op->callback_called = 1;
523   }
524   etpan_thread_op_unlock(op);
525 }
526
527 int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
528     struct etpan_thread_op * op)
529 {
530   struct etpan_thread * thread;
531   
532   thread = etpan_thread_manager_get_thread(manager);
533   
534   if (thread == NULL)
535     goto err;
536   
537   return etpan_thread_op_schedule(thread, op);
538   
539  err:
540   return ERROR_MEMORY;
541 }
542
543 int etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)
544 {
545   return manager->notify_fds[0];
546 }
547
548 static void loop_thread_list(carray * op_to_notify,
549     carray * thread_list)
550 {
551   unsigned int i;
552   int r;
553   
554   for(i = 0 ; i < carray_count(thread_list) ; i ++) {
555     struct etpan_thread * thread;
556     unsigned int j;
557     
558     thread = carray_get(thread_list, i);
559     
560     thread_lock(thread);
561     
562     for(j = 0 ; j < carray_count(thread->op_done_list) ; j ++) {
563       struct etpan_thread_op * op;
564       
565       op = carray_get(thread->op_done_list, j);
566       r = carray_add(op_to_notify, op, NULL);
567       if (r < 0) {
568         ETPAN_LOG("complete failure of thread due to lack of memory (callback)");
569         break;
570       }
571     }
572     carray_set_size(thread->op_done_list, 0);
573     
574     thread_unlock(thread);
575   }
576 }
577
578 void etpan_thread_manager_loop(struct etpan_thread_manager * manager)
579 {
580   carray * op_to_notify;
581   unsigned int i;
582   
583   manager_ack(manager);
584   
585   op_to_notify = carray_new(OP_INIT_SIZE);
586   
587   loop_thread_list(op_to_notify, manager->thread_pool);
588   loop_thread_list(op_to_notify, manager->thread_pending);
589   
590   for(i = 0 ; i < carray_count(op_to_notify) ; i ++) {
591     struct etpan_thread_op * op;
592     int cancelled;
593     
594     op = carray_get(op_to_notify, i);
595     
596     cancelled = etpan_thread_op_cancelled(op);
597     
598     etpan_thread_op_lock(op);
599     
600     if (!op->callback_called) {
601       if (op->callback != NULL)
602         op->callback(op->cancelled, op->result, op->callback_data);
603     }
604     
605     etpan_thread_op_unlock(op);
606     
607     if (op->cleanup != NULL)
608       op->cleanup(op);
609   }
610   
611   carray_free(op_to_notify);
612   
613   i = 0;
614   while (i < carray_count(manager->thread_pending)) {
615     struct etpan_thread * thread;
616     
617     thread = carray_get(manager->thread_pending, i);
618     
619     if (etpan_thread_is_stopped(thread)) {
620       etpan_thread_join(thread);
621       
622       etpan_thread_free(thread);
623       
624       carray_delete_slow(manager->thread_pending, i);
625     }
626     else {
627       i ++;
628     }
629   }
630 }
631
632 void etpan_thread_manager_start(struct etpan_thread_manager * manager)
633 {
634   /* do nothing */
635 }
636
637 void etpan_thread_manager_stop(struct etpan_thread_manager * manager)
638 {
639   while (carray_count(manager->thread_pool) > 0) {
640     struct etpan_thread * thread;
641     
642     thread = carray_get(manager->thread_pool, 0);
643     etpan_thread_manager_terminate_thread(manager, thread);
644   }
645 }
646
647 int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)
648 {
649   return ((carray_count(manager->thread_pending) == 0) && 
650       (carray_count(manager->thread_pool) == 0));
651 }
652
653 void etpan_thread_manager_join(struct etpan_thread_manager * manager)
654 {
655   while (!etpan_thread_manager_is_stopped(manager)) {
656     etpan_thread_manager_loop(manager);
657   }
658 }
659 #endif