2 * Claws Mail -- a GTK+ based, lightweight, and fast e-mail client
3 * Copyright (C) 2005-2007 DINH Viet Hoa and the Claws Mail team
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
26 #include "etpan-thread-manager.h"
31 #include <libetpan/mailsem.h>
32 #include <semaphore.h>
35 #include "etpan-errors.h"
37 #define POOL_UNBOUND_MAX 4
39 #define POOL_INIT_SIZE 8
40 #define OP_INIT_SIZE 8
42 static int etpan_thread_start(struct etpan_thread * thread);
43 static void etpan_thread_free(struct etpan_thread * thread);
44 static unsigned int etpan_thread_get_load(struct etpan_thread * thread);
45 static int etpan_thread_is_bound(struct etpan_thread * thread);
46 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager);
47 static void etpan_thread_join(struct etpan_thread * thread);
48 static struct etpan_thread * etpan_thread_new(void);
49 static int etpan_thread_op_cancelled(struct etpan_thread_op * op);
50 static void etpan_thread_op_lock(struct etpan_thread_op * op);
51 static void etpan_thread_op_unlock(struct etpan_thread_op * op);
52 static void etpan_thread_stop(struct etpan_thread * thread);
55 static void etpan_thread_bind(struct etpan_thread * thread);
56 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
57 struct etpan_thread_op * op);
58 static void etpan_thread_manager_start(struct etpan_thread_manager * manager);
59 static void etpan_thread_op_cancel(struct etpan_thread_op * op);
64 TERMINATE_STATE_REQUESTED,
68 struct etpan_thread_manager * etpan_thread_manager_new(void)
70 struct etpan_thread_manager * manager;
73 manager = malloc(sizeof(* manager));
77 manager->thread_pool = carray_new(POOL_INIT_SIZE);
78 if (manager->thread_pool == NULL)
81 manager->thread_pending = carray_new(POOL_INIT_SIZE);
82 if (manager->thread_pending == NULL)
85 manager->can_create_thread = 1;
86 manager->unbound_count = 0;
88 r = pipe(manager->notify_fds);
95 carray_free(manager->thread_pending);
97 carray_free(manager->thread_pool);
104 void etpan_thread_manager_free(struct etpan_thread_manager * manager)
106 close(manager->notify_fds[1]);
107 close(manager->notify_fds[0]);
108 carray_free(manager->thread_pending);
109 carray_free(manager->thread_pool);
113 static struct etpan_thread * etpan_thread_new(void)
115 struct etpan_thread * thread;
118 thread = malloc(sizeof(* thread));
122 r = pthread_mutex_init(&thread->lock, NULL);
126 thread->op_list = carray_new(OP_INIT_SIZE);
127 if (thread->op_list == NULL)
130 thread->op_done_list = carray_new(OP_INIT_SIZE);
131 if (thread->op_done_list == NULL)
134 thread->start_sem = mailsem_new();
135 if (thread->start_sem == NULL)
136 goto free_op_done_list;
138 thread->stop_sem = mailsem_new();
139 if (thread->stop_sem == NULL)
142 thread->op_sem = mailsem_new();
143 if (thread->op_sem == NULL)
146 thread->manager = NULL;
147 thread->bound_count = 0;
148 thread->terminate_state = TERMINATE_STATE_NONE;
153 mailsem_free(thread->stop_sem);
155 mailsem_free(thread->start_sem);
157 carray_free(thread->op_done_list);
159 carray_free(thread->op_list);
161 pthread_mutex_destroy(&thread->lock);
168 static void etpan_thread_free(struct etpan_thread * thread)
170 mailsem_free(thread->op_sem);
171 mailsem_free(thread->stop_sem);
172 mailsem_free(thread->start_sem);
173 carray_free(thread->op_done_list);
174 carray_free(thread->op_list);
175 pthread_mutex_destroy(&thread->lock);
179 struct etpan_thread_op * etpan_thread_op_new(void)
181 struct etpan_thread_op * op;
184 op = malloc(sizeof(* op));
191 op->callback_data = NULL;
192 op->callback_called = 0;
201 r = pthread_mutex_init(&op->lock, NULL);
213 void etpan_thread_op_free(struct etpan_thread_op * op)
215 pthread_mutex_destroy(&op->lock);
219 static struct etpan_thread *
220 etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)
222 struct etpan_thread * thread;
225 thread = etpan_thread_new();
229 thread->manager = manager;
231 r = etpan_thread_start(thread);
235 r = carray_add(manager->thread_pool, thread, NULL);
237 etpan_thread_stop(thread);
244 etpan_thread_free(thread);
250 etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,
251 struct etpan_thread * thread)
256 for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
257 if (carray_get(manager->thread_pool, i) == thread) {
258 carray_delete(manager->thread_pool, i);
263 if (!etpan_thread_is_bound(thread))
264 manager->unbound_count --;
266 r = carray_add(manager->thread_pending, thread, NULL);
268 g_warning("complete failure of thread due to lack of memory (thread stop)");
271 etpan_thread_stop(thread);
274 static void manager_notify(struct etpan_thread_manager * manager)
280 r = write(manager->notify_fds[1], &ch, 1);
283 static void manager_ack(struct etpan_thread_manager * manager)
288 r = read(manager->notify_fds[0], &ch, 1);
291 static void thread_lock(struct etpan_thread * thread)
293 pthread_mutex_lock(&thread->lock);
296 static void thread_unlock(struct etpan_thread * thread)
298 pthread_mutex_unlock(&thread->lock);
301 static void thread_notify(struct etpan_thread * thread)
303 manager_notify(thread->manager);
306 static void * thread_run(void * data)
308 struct etpan_thread * thread;
313 mailsem_up(thread->start_sem);
317 struct etpan_thread_op * op;
319 mailsem_down(thread->op_sem);
324 if (carray_count(thread->op_list) > 0) {
325 op = carray_get(thread->op_list, 0);
326 carray_delete_slow(thread->op_list, 0);
331 thread_unlock(thread);
337 if (!etpan_thread_op_cancelled(op)) {
343 r = carray_add(thread->op_done_list, op, NULL);
345 g_warning("complete failure of thread due to lack of memory (op done)");
347 thread_unlock(thread);
349 thread_notify(thread);
353 thread->terminate_state = TERMINATE_STATE_DONE;
354 thread_unlock(thread);
356 thread_notify(thread);
358 mailsem_up(thread->stop_sem);
363 static int etpan_thread_start(struct etpan_thread * thread)
367 r = pthread_create(&thread->th_id, NULL, thread_run, thread);
371 mailsem_down(thread->start_sem);
376 static void etpan_thread_stop(struct etpan_thread * thread)
379 thread->terminate_state = TERMINATE_STATE_REQUESTED;
380 thread_unlock(thread);
382 mailsem_up(thread->op_sem);
384 /* this thread will be joined in the manager loop */
387 static int etpan_thread_is_stopped(struct etpan_thread * thread)
392 stopped = (thread->terminate_state == TERMINATE_STATE_DONE);
393 thread_unlock(thread);
398 static void etpan_thread_join(struct etpan_thread * thread)
400 mailsem_down(thread->stop_sem);
401 pthread_join(thread->th_id, NULL);
404 struct etpan_thread *
405 etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)
407 struct etpan_thread * chosen_thread;
408 unsigned int chosen_thread_load;
410 struct etpan_thread * thread;
414 chosen_thread = NULL;
415 chosen_thread_load = 0;
417 for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
418 thread = carray_get(manager->thread_pool, i);
419 if (etpan_thread_is_bound(thread))
422 if (chosen_thread == NULL) {
423 chosen_thread = thread;
424 chosen_thread_load = etpan_thread_get_load(thread);
426 if (chosen_thread_load == 0)
432 load = etpan_thread_get_load(thread);
434 if (load < chosen_thread_load) {
435 chosen_thread = thread;
436 chosen_thread_load = load;
441 if (chosen_thread != NULL) {
442 if (manager->can_create_thread && (chosen_thread_load != 0)) {
443 chosen_thread = NULL;
449 if (chosen_thread != NULL)
450 return chosen_thread;
452 thread = etpan_thread_manager_create_thread(manager);
456 manager->unbound_count ++;
457 if (manager->unbound_count >= POOL_UNBOUND_MAX)
458 manager->can_create_thread = 0;
466 static unsigned int etpan_thread_get_load(struct etpan_thread * thread)
471 load = carray_count(thread->op_list);
472 thread_unlock(thread);
478 static void etpan_thread_bind(struct etpan_thread * thread)
480 thread->bound_count ++;
484 void etpan_thread_unbind(struct etpan_thread * thread)
486 thread->bound_count --;
489 static int etpan_thread_is_bound(struct etpan_thread * thread)
491 return (thread->bound_count != 0);
494 int etpan_thread_op_schedule(struct etpan_thread * thread,
495 struct etpan_thread_op * op)
499 if (thread->terminate_state != TERMINATE_STATE_NONE)
503 r = carray_add(thread->op_list, op, NULL);
504 thread_unlock(thread);
510 mailsem_up(thread->op_sem);
515 static void etpan_thread_op_lock(struct etpan_thread_op * op)
517 pthread_mutex_lock(&op->lock);
520 static void etpan_thread_op_unlock(struct etpan_thread_op * op)
522 pthread_mutex_unlock(&op->lock);
525 static int etpan_thread_op_cancelled(struct etpan_thread_op * op)
530 etpan_thread_op_lock(op);
532 cancelled = op->cancelled;
533 etpan_thread_op_unlock(op);
539 static void etpan_thread_op_cancel(struct etpan_thread_op * op)
541 etpan_thread_op_lock(op);
543 g_warning("cancelled twice");
546 if ((op->callback != NULL) && (!op->callback_called)) {
547 op->callback(op->cancelled, op->result, op->callback_data);
548 op->callback_called = 1;
550 etpan_thread_op_unlock(op);
555 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
556 struct etpan_thread_op * op)
558 struct etpan_thread * thread;
560 thread = etpan_thread_manager_get_thread(manager);
565 return etpan_thread_op_schedule(thread, op);
572 int etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)
574 return manager->notify_fds[0];
577 static void loop_thread_list(carray * op_to_notify,
578 carray * thread_list)
583 for(i = 0 ; i < carray_count(thread_list) ; i ++) {
584 struct etpan_thread * thread;
587 thread = carray_get(thread_list, i);
591 for(j = 0 ; j < carray_count(thread->op_done_list) ; j ++) {
592 struct etpan_thread_op * op;
594 op = carray_get(thread->op_done_list, j);
595 r = carray_add(op_to_notify, op, NULL);
597 g_warning("complete failure of thread due to lack of memory (callback)");
601 carray_set_size(thread->op_done_list, 0);
603 thread_unlock(thread);
607 void etpan_thread_manager_loop(struct etpan_thread_manager * manager)
609 carray * op_to_notify;
612 manager_ack(manager);
614 op_to_notify = carray_new(OP_INIT_SIZE);
616 loop_thread_list(op_to_notify, manager->thread_pool);
617 loop_thread_list(op_to_notify, manager->thread_pending);
619 for(i = 0 ; i < carray_count(op_to_notify) ; i ++) {
620 struct etpan_thread_op * op;
623 op = carray_get(op_to_notify, i);
625 cancelled = etpan_thread_op_cancelled(op);
627 etpan_thread_op_lock(op);
629 if (!op->callback_called) {
630 if (op->callback != NULL)
631 op->callback(op->cancelled, op->result, op->callback_data);
634 etpan_thread_op_unlock(op);
636 if (op->cleanup != NULL)
640 carray_free(op_to_notify);
643 while (i < carray_count(manager->thread_pending)) {
644 struct etpan_thread * thread;
646 thread = carray_get(manager->thread_pending, i);
648 if (etpan_thread_is_stopped(thread)) {
649 etpan_thread_join(thread);
651 etpan_thread_free(thread);
653 carray_delete_slow(manager->thread_pending, i);
662 static void etpan_thread_manager_start(struct etpan_thread_manager * manager)
668 void etpan_thread_manager_stop(struct etpan_thread_manager * manager)
670 while (carray_count(manager->thread_pool) > 0) {
671 struct etpan_thread * thread;
673 thread = carray_get(manager->thread_pool, 0);
674 etpan_thread_manager_terminate_thread(manager, thread);
678 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)
680 return ((carray_count(manager->thread_pending) == 0) &&
681 (carray_count(manager->thread_pool) == 0));
684 void etpan_thread_manager_join(struct etpan_thread_manager * manager)
686 while (!etpan_thread_manager_is_stopped(manager)) {
687 etpan_thread_manager_loop(manager);