2 * Claws Mail -- a GTK+ based, lightweight, and fast e-mail client
3 * Copyright (C) 2005-2007 DINH Viet Hoa and the Claws Mail team
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
26 #include "etpan-thread-manager.h"
31 #include <libetpan/mailsem.h>
32 #include <semaphore.h>
36 #include "etpan-errors.h"
38 #define POOL_UNBOUND_MAX 4
40 #define POOL_INIT_SIZE 8
41 #define OP_INIT_SIZE 8
43 static int etpan_thread_start(struct etpan_thread * thread);
44 static void etpan_thread_free(struct etpan_thread * thread);
45 static unsigned int etpan_thread_get_load(struct etpan_thread * thread);
46 static int etpan_thread_is_bound(struct etpan_thread * thread);
47 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager);
48 static void etpan_thread_join(struct etpan_thread * thread);
49 static struct etpan_thread * etpan_thread_new(void);
50 static int etpan_thread_op_cancelled(struct etpan_thread_op * op);
51 static void etpan_thread_op_lock(struct etpan_thread_op * op);
52 static void etpan_thread_op_unlock(struct etpan_thread_op * op);
53 static void etpan_thread_stop(struct etpan_thread * thread);
56 static void etpan_thread_bind(struct etpan_thread * thread);
57 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
58 struct etpan_thread_op * op);
59 static void etpan_thread_manager_start(struct etpan_thread_manager * manager);
60 static void etpan_thread_op_cancel(struct etpan_thread_op * op);
65 TERMINATE_STATE_REQUESTED,
69 struct etpan_thread_manager * etpan_thread_manager_new(void)
71 struct etpan_thread_manager * manager;
74 manager = malloc(sizeof(* manager));
78 manager->thread_pool = carray_new(POOL_INIT_SIZE);
79 if (manager->thread_pool == NULL)
82 manager->thread_pending = carray_new(POOL_INIT_SIZE);
83 if (manager->thread_pending == NULL)
86 manager->can_create_thread = 1;
87 manager->unbound_count = 0;
89 r = pipe(manager->notify_fds);
96 carray_free(manager->thread_pending);
98 carray_free(manager->thread_pool);
105 void etpan_thread_manager_free(struct etpan_thread_manager * manager)
107 close(manager->notify_fds[1]);
108 close(manager->notify_fds[0]);
109 carray_free(manager->thread_pending);
110 carray_free(manager->thread_pool);
114 static struct etpan_thread * etpan_thread_new(void)
116 struct etpan_thread * thread;
119 thread = malloc(sizeof(* thread));
123 r = pthread_mutex_init(&thread->lock, NULL);
127 thread->op_list = carray_new(OP_INIT_SIZE);
128 if (thread->op_list == NULL)
131 thread->op_done_list = carray_new(OP_INIT_SIZE);
132 if (thread->op_done_list == NULL)
135 thread->start_sem = mailsem_new();
136 if (thread->start_sem == NULL)
137 goto free_op_done_list;
139 thread->stop_sem = mailsem_new();
140 if (thread->stop_sem == NULL)
143 thread->op_sem = mailsem_new();
144 if (thread->op_sem == NULL)
147 thread->manager = NULL;
148 thread->bound_count = 0;
149 thread->terminate_state = TERMINATE_STATE_NONE;
154 mailsem_free(thread->stop_sem);
156 mailsem_free(thread->start_sem);
158 carray_free(thread->op_done_list);
160 carray_free(thread->op_list);
162 pthread_mutex_destroy(&thread->lock);
169 static void etpan_thread_free(struct etpan_thread * thread)
171 mailsem_free(thread->op_sem);
172 mailsem_free(thread->stop_sem);
173 mailsem_free(thread->start_sem);
174 carray_free(thread->op_done_list);
175 carray_free(thread->op_list);
176 pthread_mutex_destroy(&thread->lock);
180 struct etpan_thread_op * etpan_thread_op_new(void)
182 struct etpan_thread_op * op;
185 op = malloc(sizeof(* op));
192 op->callback_data = NULL;
193 op->callback_called = 0;
202 r = pthread_mutex_init(&op->lock, NULL);
214 void etpan_thread_op_free(struct etpan_thread_op * op)
216 pthread_mutex_destroy(&op->lock);
220 static struct etpan_thread *
221 etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)
223 struct etpan_thread * thread;
226 thread = etpan_thread_new();
230 thread->manager = manager;
232 r = etpan_thread_start(thread);
236 r = carray_add(manager->thread_pool, thread, NULL);
238 etpan_thread_stop(thread);
245 etpan_thread_free(thread);
251 etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,
252 struct etpan_thread * thread)
257 for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
258 if (carray_get(manager->thread_pool, i) == thread) {
259 carray_delete(manager->thread_pool, i);
264 if (!etpan_thread_is_bound(thread))
265 manager->unbound_count --;
267 r = carray_add(manager->thread_pending, thread, NULL);
269 g_warning("complete failure of thread due to lack of memory (thread stop)");
272 etpan_thread_stop(thread);
275 static void manager_notify(struct etpan_thread_manager * manager)
281 r = write(manager->notify_fds[1], &ch, 1);
284 static void manager_ack(struct etpan_thread_manager * manager)
289 r = read(manager->notify_fds[0], &ch, 1);
291 /* done in the GIOChannel handler in imap-thread.c and nntp-thread.c */
295 static void thread_lock(struct etpan_thread * thread)
297 pthread_mutex_lock(&thread->lock);
300 static void thread_unlock(struct etpan_thread * thread)
302 pthread_mutex_unlock(&thread->lock);
305 static void thread_notify(struct etpan_thread * thread)
307 manager_notify(thread->manager);
310 static void * thread_run(void * data)
312 struct etpan_thread * thread;
317 mailsem_up(thread->start_sem);
321 struct etpan_thread_op * op;
323 mailsem_down(thread->op_sem);
328 if (carray_count(thread->op_list) > 0) {
329 op = carray_get(thread->op_list, 0);
330 carray_delete_slow(thread->op_list, 0);
335 thread_unlock(thread);
341 if (!etpan_thread_op_cancelled(op)) {
347 r = carray_add(thread->op_done_list, op, NULL);
349 g_warning("complete failure of thread due to lack of memory (op done)");
351 thread_unlock(thread);
353 thread_notify(thread);
357 thread->terminate_state = TERMINATE_STATE_DONE;
358 thread_unlock(thread);
360 thread_notify(thread);
362 mailsem_up(thread->stop_sem);
367 static int etpan_thread_start(struct etpan_thread * thread)
371 r = pthread_create(&thread->th_id, NULL, thread_run, thread);
375 mailsem_down(thread->start_sem);
380 static void etpan_thread_stop(struct etpan_thread * thread)
383 thread->terminate_state = TERMINATE_STATE_REQUESTED;
384 thread_unlock(thread);
386 mailsem_up(thread->op_sem);
388 /* this thread will be joined in the manager loop */
391 static int etpan_thread_is_stopped(struct etpan_thread * thread)
396 stopped = (thread->terminate_state == TERMINATE_STATE_DONE);
397 thread_unlock(thread);
402 static void etpan_thread_join(struct etpan_thread * thread)
404 mailsem_down(thread->stop_sem);
405 pthread_join(thread->th_id, NULL);
408 struct etpan_thread *
409 etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)
411 struct etpan_thread * chosen_thread;
412 unsigned int chosen_thread_load;
414 struct etpan_thread * thread;
418 chosen_thread = NULL;
419 chosen_thread_load = 0;
421 for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
422 thread = carray_get(manager->thread_pool, i);
423 if (etpan_thread_is_bound(thread))
426 if (chosen_thread == NULL) {
427 chosen_thread = thread;
428 chosen_thread_load = etpan_thread_get_load(thread);
430 if (chosen_thread_load == 0)
436 load = etpan_thread_get_load(thread);
438 if (load < chosen_thread_load) {
439 chosen_thread = thread;
440 chosen_thread_load = load;
445 if (chosen_thread != NULL) {
446 if (manager->can_create_thread && (chosen_thread_load != 0)) {
447 chosen_thread = NULL;
453 if (chosen_thread != NULL)
454 return chosen_thread;
456 thread = etpan_thread_manager_create_thread(manager);
460 manager->unbound_count ++;
461 if (manager->unbound_count >= POOL_UNBOUND_MAX)
462 manager->can_create_thread = 0;
470 static unsigned int etpan_thread_get_load(struct etpan_thread * thread)
475 load = carray_count(thread->op_list);
476 thread_unlock(thread);
482 static void etpan_thread_bind(struct etpan_thread * thread)
484 thread->bound_count ++;
488 void etpan_thread_unbind(struct etpan_thread * thread)
490 thread->bound_count --;
493 static int etpan_thread_is_bound(struct etpan_thread * thread)
495 return (thread->bound_count != 0);
498 int etpan_thread_op_schedule(struct etpan_thread * thread,
499 struct etpan_thread_op * op)
503 if (thread->terminate_state != TERMINATE_STATE_NONE)
507 r = carray_add(thread->op_list, op, NULL);
508 thread_unlock(thread);
514 mailsem_up(thread->op_sem);
519 static void etpan_thread_op_lock(struct etpan_thread_op * op)
521 pthread_mutex_lock(&op->lock);
524 static void etpan_thread_op_unlock(struct etpan_thread_op * op)
526 pthread_mutex_unlock(&op->lock);
529 static int etpan_thread_op_cancelled(struct etpan_thread_op * op)
534 etpan_thread_op_lock(op);
536 cancelled = op->cancelled;
537 etpan_thread_op_unlock(op);
543 static void etpan_thread_op_cancel(struct etpan_thread_op * op)
545 etpan_thread_op_lock(op);
547 g_warning("cancelled twice");
550 if ((op->callback != NULL) && (!op->callback_called)) {
551 op->callback(op->cancelled, op->result, op->callback_data);
552 op->callback_called = 1;
554 etpan_thread_op_unlock(op);
559 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
560 struct etpan_thread_op * op)
562 struct etpan_thread * thread;
564 thread = etpan_thread_manager_get_thread(manager);
569 return etpan_thread_op_schedule(thread, op);
576 int etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)
578 return manager->notify_fds[0];
581 static void loop_thread_list(carray * op_to_notify,
582 carray * thread_list)
587 for(i = 0 ; i < carray_count(thread_list) ; i ++) {
588 struct etpan_thread * thread;
591 thread = carray_get(thread_list, i);
595 for(j = 0 ; j < carray_count(thread->op_done_list) ; j ++) {
596 struct etpan_thread_op * op;
598 op = carray_get(thread->op_done_list, j);
599 r = carray_add(op_to_notify, op, NULL);
601 g_warning("complete failure of thread due to lack of memory (callback)");
605 carray_set_size(thread->op_done_list, 0);
607 thread_unlock(thread);
611 void etpan_thread_manager_loop(struct etpan_thread_manager * manager)
613 carray * op_to_notify;
616 manager_ack(manager);
618 op_to_notify = carray_new(OP_INIT_SIZE);
620 loop_thread_list(op_to_notify, manager->thread_pool);
621 loop_thread_list(op_to_notify, manager->thread_pending);
623 for(i = 0 ; i < carray_count(op_to_notify) ; i ++) {
624 struct etpan_thread_op * op;
627 op = carray_get(op_to_notify, i);
629 cancelled = etpan_thread_op_cancelled(op);
631 etpan_thread_op_lock(op);
633 if (!op->callback_called) {
634 if (op->callback != NULL)
635 op->callback(op->cancelled, op->result, op->callback_data);
638 etpan_thread_op_unlock(op);
640 if (op->cleanup != NULL)
644 carray_free(op_to_notify);
647 while (i < carray_count(manager->thread_pending)) {
648 struct etpan_thread * thread;
650 thread = carray_get(manager->thread_pending, i);
652 if (etpan_thread_is_stopped(thread)) {
653 etpan_thread_join(thread);
655 etpan_thread_free(thread);
657 carray_delete_slow(manager->thread_pending, i);
666 static void etpan_thread_manager_start(struct etpan_thread_manager * manager)
672 void etpan_thread_manager_stop(struct etpan_thread_manager * manager)
674 while (carray_count(manager->thread_pool) > 0) {
675 struct etpan_thread * thread;
677 thread = carray_get(manager->thread_pool, 0);
678 etpan_thread_manager_terminate_thread(manager, thread);
682 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)
684 return ((carray_count(manager->thread_pending) == 0) &&
685 (carray_count(manager->thread_pool) == 0));
688 void etpan_thread_manager_join(struct etpan_thread_manager * manager)
690 while (!etpan_thread_manager_is_stopped(manager)) {
691 etpan_thread_manager_loop(manager);