2 * Claws Mail -- a GTK+ based, lightweight, and fast e-mail client
3 * Copyright (C) 2005-2007 DINH Viet Hoa and the Claws Mail team
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
26 #include "etpan-thread-manager.h"
31 #include <libetpan/mailsem.h>
32 #include <semaphore.h>
35 #include "etpan-errors.h"
/* Threshold compared against manager->unbound_count in
 * etpan_thread_manager_get_thread(): once the count reaches this value,
 * can_create_thread is cleared. */
37 #define POOL_UNBOUND_MAX 4
/* Initial size passed to carray_new() for the manager's thread_pool and
 * thread_pending arrays. */
39 #define POOL_INIT_SIZE 8
/* Initial size passed to carray_new() for the per-thread op_list and
 * op_done_list arrays and for the op_to_notify array of the manager loop. */
40 #define OP_INIT_SIZE 8
/* Forward declarations of file-local (static) helpers defined further down. */
42 static void etpan_thread_free(struct etpan_thread * thread);
43 static unsigned int etpan_thread_get_load(struct etpan_thread * thread);
44 static int etpan_thread_is_bound(struct etpan_thread * thread);
45 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager);
46 static void etpan_thread_join(struct etpan_thread * thread);
47 static struct etpan_thread * etpan_thread_new(void);
48 static int etpan_thread_op_cancelled(struct etpan_thread_op * op);
49 static void etpan_thread_op_lock(struct etpan_thread_op * op);
50 static void etpan_thread_op_unlock(struct etpan_thread_op * op);
51 static void etpan_thread_stop(struct etpan_thread * thread);
/* NOTE(review): the original lines between the two groups of declarations
 * are not shown in this listing; they may hold a preprocessor guard. */
54 static void etpan_thread_bind(struct etpan_thread * thread);
55 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
56 struct etpan_thread_op * op);
57 static void etpan_thread_manager_start(struct etpan_thread_manager * manager);
58 static void etpan_thread_op_cancel(struct etpan_thread_op * op);
/* Value stored in thread->terminate_state by etpan_thread_stop().
 * NOTE(review): the enclosing enum and its other members
 * (TERMINATE_STATE_NONE and TERMINATE_STATE_DONE are used below) are not
 * shown in this listing. */
63 TERMINATE_STATE_REQUESTED,
/*
 * Allocates a thread manager with malloc() and initialises it: the
 * thread_pool and thread_pending arrays (each created with POOL_INIT_SIZE),
 * can_create_thread = 1, unbound_count = 0, and a pipe stored in notify_fds.
 *
 * NOTE(review): the error checks after malloc()/pipe(), the goto targets and
 * the return statements are not shown in this listing; confirm the failure
 * paths against the full file.
 */
67 struct etpan_thread_manager * etpan_thread_manager_new(void)
69 struct etpan_thread_manager * manager;
72 manager = malloc(sizeof(* manager));
76 manager->thread_pool = carray_new(POOL_INIT_SIZE);
77 if (manager->thread_pool == NULL)
80 manager->thread_pending = carray_new(POOL_INIT_SIZE);
81 if (manager->thread_pending == NULL)
84 manager->can_create_thread = 1;
85 manager->unbound_count = 0;
/* notify_fds[1] is written by manager_notify(); notify_fds[0] is read by
 * manager_ack() and handed out by etpan_thread_manager_get_fd(). */
87 r = pipe(manager->notify_fds);
/* The two frees below release the arrays in reverse order of creation;
 * presumably this is the failure-path cleanup (labels not shown). */
94 carray_free(manager->thread_pending);
96 carray_free(manager->thread_pool);
/*
 * Releases the resources created by etpan_thread_manager_new(): closes both
 * ends of the notification pipe and frees the thread_pending and thread_pool
 * arrays.
 *
 * NOTE(review): the release of the manager structure itself is not shown in
 * this listing.
 */
103 void etpan_thread_manager_free(struct etpan_thread_manager * manager)
105 close(manager->notify_fds[1]);
106 close(manager->notify_fds[0]);
107 carray_free(manager->thread_pending);
108 carray_free(manager->thread_pool);
/*
 * Allocates a thread descriptor with malloc() and initialises its mutex, its
 * op_list and op_done_list arrays (OP_INIT_SIZE each) and its three
 * semaphores (start_sem, stop_sem, op_sem). The descriptor starts with no
 * manager, bound_count = 0 and terminate_state = TERMINATE_STATE_NONE.
 * No OS thread is created here; that is done by etpan_thread_start().
 *
 * NOTE(review): most error checks, goto targets, labels and the return
 * statements are not shown in this listing.
 */
112 static struct etpan_thread * etpan_thread_new(void)
114 struct etpan_thread * thread;
117 thread = malloc(sizeof(* thread));
121 r = pthread_mutex_init(&thread->lock, NULL);
125 thread->op_list = carray_new(OP_INIT_SIZE);
126 if (thread->op_list == NULL)
129 thread->op_done_list = carray_new(OP_INIT_SIZE);
130 if (thread->op_done_list == NULL)
133 thread->start_sem = mailsem_new();
134 if (thread->start_sem == NULL)
135 goto free_op_done_list;
137 thread->stop_sem = mailsem_new();
138 if (thread->stop_sem == NULL)
141 thread->op_sem = mailsem_new();
142 if (thread->op_sem == NULL)
145 thread->manager = NULL;
146 thread->bound_count = 0;
147 thread->terminate_state = TERMINATE_STATE_NONE;
/* The calls below undo the initialisation in reverse order of acquisition;
 * presumably they sit under the cleanup labels of the failure path. */
152 mailsem_free(thread->stop_sem);
154 mailsem_free(thread->start_sem);
156 carray_free(thread->op_done_list);
158 carray_free(thread->op_list);
160 pthread_mutex_destroy(&thread->lock);
/*
 * Releases what etpan_thread_new() set up, in reverse order: the three
 * semaphores, the two op arrays and the mutex.
 *
 * NOTE(review): the release of the thread structure itself is not shown in
 * this listing.
 */
167 static void etpan_thread_free(struct etpan_thread * thread)
169 mailsem_free(thread->op_sem);
170 mailsem_free(thread->stop_sem);
171 mailsem_free(thread->start_sem);
172 carray_free(thread->op_done_list);
173 carray_free(thread->op_list);
174 pthread_mutex_destroy(&thread->lock);
/*
 * Allocates an operation descriptor with malloc(), clears callback_data and
 * callback_called, and initialises the per-operation mutex.
 *
 * NOTE(review): the initialisation of the remaining fields, the error checks
 * and the return statements are not shown in this listing.
 */
178 struct etpan_thread_op * etpan_thread_op_new(void)
180 struct etpan_thread_op * op;
183 op = malloc(sizeof(* op));
190 op->callback_data = NULL;
191 op->callback_called = 0;
197 r = pthread_mutex_init(&op->lock, NULL);
/*
 * Destroys the mutex of an operation descriptor.
 * NOTE(review): the release of the descriptor memory is not shown in this
 * listing.
 */
209 void etpan_thread_op_free(struct etpan_thread_op * op)
211 pthread_mutex_destroy(&op->lock);
/*
 * Creates a thread descriptor, attaches it to the manager, starts it with
 * etpan_thread_start() and appends it to manager->thread_pool.
 *
 * NOTE(review): the checks on r, the labels and the return statements are not
 * shown in this listing.
 */
215 static struct etpan_thread *
216 etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)
218 struct etpan_thread * thread;
221 thread = etpan_thread_new();
225 thread->manager = manager;
227 r = etpan_thread_start(thread);
231 r = carray_add(manager->thread_pool, thread, NULL);
/* Presumably failure-path cleanup: stop the thread that was already started,
 * then free the descriptor. */
233 etpan_thread_stop(thread);
240 etpan_thread_free(thread);
/*
 * Moves a thread from manager->thread_pool to manager->thread_pending and
 * asks it to stop. The unbound counter is decremented when the thread is not
 * bound.
 *
 * NOTE(review): the return type line, local declarations and the check on r
 * are not shown in this listing.
 */
246 etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,
247 struct etpan_thread * thread)
/* Look up the thread in the pool and delete its slot. */
252 for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
253 if (carray_get(manager->thread_pool, i) == thread) {
254 carray_delete(manager->thread_pool, i);
259 if (!etpan_thread_is_bound(thread))
260 manager->unbound_count --;
/* Pending threads are joined and freed later by etpan_thread_manager_loop(). */
262 r = carray_add(manager->thread_pending, thread, NULL);
264 g_warning("complete failure of thread due to lack of memory (thread stop)");
267 etpan_thread_stop(thread);
/*
 * Writes one byte to notify_fds[1], the write end of the manager's pipe.
 * NOTE(review): the declaration and value of ch, and any check of r, are not
 * shown in this listing.
 */
270 static void manager_notify(struct etpan_thread_manager * manager)
276 r = write(manager->notify_fds[1], &ch, 1);
/*
 * Reads one byte from notify_fds[0], the read end of the manager's pipe;
 * this is the counterpart of the one-byte write in manager_notify().
 * NOTE(review): any check of r is not shown in this listing.
 */
279 static void manager_ack(struct etpan_thread_manager * manager)
284 r = read(manager->notify_fds[0], &ch, 1);
/* Locks the per-thread mutex (thread->lock). */
287 static void thread_lock(struct etpan_thread * thread)
289 pthread_mutex_lock(&thread->lock);
/* Unlocks the per-thread mutex (thread->lock). */
292 static void thread_unlock(struct etpan_thread * thread)
294 pthread_mutex_unlock(&thread->lock);
/* Notifies the manager that owns this thread (see manager_notify()). */
297 static void thread_notify(struct etpan_thread * thread)
299 manager_notify(thread->manager);
/*
 * Thread entry point handed to pthread_create() by etpan_thread_start().
 * data is the struct etpan_thread passed as the pthread_create() argument.
 *
 * Visible flow: signal start_sem, then wait on op_sem, take the first entry
 * of op_list if there is one, test whether it was cancelled, append it to
 * op_done_list and notify the manager. On exit the state becomes
 * TERMINATE_STATE_DONE, the manager is notified and stop_sem is signalled.
 *
 * NOTE(review): the loop construct, the lock calls, the test of
 * terminate_state, the call that actually runs the operation and the return
 * are not shown in this listing.
 */
302 static void * thread_run(void * data)
304 struct etpan_thread * thread;
/* etpan_thread_start() blocks on start_sem until this point is reached. */
309 mailsem_up(thread->start_sem);
313 struct etpan_thread_op * op;
/* op_sem is raised by etpan_thread_op_schedule() and etpan_thread_stop(). */
315 mailsem_down(thread->op_sem);
320 if (carray_count(thread->op_list) > 0) {
321 op = carray_get(thread->op_list, 0);
322 carray_delete_slow(thread->op_list, 0);
327 thread_unlock(thread);
333 if (!etpan_thread_op_cancelled(op)) {
/* Finished operations are collected by loop_thread_list(). */
339 r = carray_add(thread->op_done_list, op, NULL);
341 g_warning("complete failure of thread due to lack of memory (op done)");
343 thread_unlock(thread);
345 thread_notify(thread);
/* etpan_thread_is_stopped() tests for this state. */
349 thread->terminate_state = TERMINATE_STATE_DONE;
350 thread_unlock(thread);
352 thread_notify(thread);
/* etpan_thread_join() blocks on stop_sem before calling pthread_join(). */
354 mailsem_up(thread->stop_sem);
/*
 * Creates the OS thread running thread_run() with default attributes, then
 * waits on start_sem, which thread_run() signals.
 *
 * NOTE(review): the check of r and the returned values are not shown in this
 * listing.
 */
359 int etpan_thread_start(struct etpan_thread * thread)
363 r = pthread_create(&thread->th_id, NULL, thread_run, thread);
367 mailsem_down(thread->start_sem);
/*
 * Requests termination of a thread: sets terminate_state to
 * TERMINATE_STATE_REQUESTED, then raises op_sem, the semaphore thread_run()
 * waits on. This function does not join the thread.
 *
 * NOTE(review): the matching thread_lock() call is presumably on a line not
 * shown in this listing.
 */
372 static void etpan_thread_stop(struct etpan_thread * thread)
375 thread->terminate_state = TERMINATE_STATE_REQUESTED;
376 thread_unlock(thread);
378 mailsem_up(thread->op_sem);
380 /* this thread will be joined in the manager loop */
/*
 * Evaluates whether the thread has reached TERMINATE_STATE_DONE, the state
 * thread_run() sets on exit.
 * NOTE(review): the lock call and the return statement are not shown in this
 * listing; presumably the value of stopped is returned.
 */
383 int etpan_thread_is_stopped(struct etpan_thread * thread)
388 stopped = (thread->terminate_state == TERMINATE_STATE_DONE);
389 thread_unlock(thread);
/*
 * Waits on stop_sem (signalled at the end of thread_run()), then joins the
 * OS thread; the thread's return value is discarded.
 */
394 static void etpan_thread_join(struct etpan_thread * thread)
396 mailsem_down(thread->stop_sem);
397 pthread_join(thread->th_id, NULL);
/*
 * Chooses a thread from manager->thread_pool for new work, or creates one.
 *
 * Visible flow: scan the pool, tracking the candidate with the smallest load
 * as reported by etpan_thread_get_load(). If a candidate has a non-zero load
 * and the manager may still create threads, the candidate is dropped. A
 * remaining candidate is returned; otherwise a new thread is created and
 * unbound_count is incremented. Thread creation is disabled once the count
 * reaches POOL_UNBOUND_MAX.
 *
 * NOTE(review): the statement following the bound test, the branch
 * structure inside the loop, the failure handling after thread creation and
 * the final return are not shown in this listing.
 */
400 struct etpan_thread *
401 etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)
403 struct etpan_thread * chosen_thread;
404 unsigned int chosen_thread_load;
406 struct etpan_thread * thread;
410 chosen_thread = NULL;
411 chosen_thread_load = 0;
413 for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
414 thread = carray_get(manager->thread_pool, i);
/* Presumably bound threads are skipped here; the statement is not shown. */
415 if (etpan_thread_is_bound(thread))
418 if (chosen_thread == NULL) {
419 chosen_thread = thread;
420 chosen_thread_load = etpan_thread_get_load(thread);
422 if (chosen_thread_load == 0)
428 load = etpan_thread_get_load(thread);
430 if (load < chosen_thread_load) {
431 chosen_thread = thread;
432 chosen_thread_load = load;
437 if (chosen_thread != NULL) {
/* A busy candidate is discarded while new threads may still be created. */
438 if (manager->can_create_thread && (chosen_thread_load != 0)) {
439 chosen_thread = NULL;
445 if (chosen_thread != NULL)
446 return chosen_thread;
448 thread = etpan_thread_manager_create_thread(manager);
452 manager->unbound_count ++;
453 if (manager->unbound_count >= POOL_UNBOUND_MAX)
454 manager->can_create_thread = 0;
/*
 * Computes the load of a thread as the number of entries in its op_list.
 * NOTE(review): the lock call and the return statement are not shown in this
 * listing; presumably load is returned.
 */
462 static unsigned int etpan_thread_get_load(struct etpan_thread * thread)
467 load = carray_count(thread->op_list);
468 thread_unlock(thread);
/* Increments the thread's bound_count; a non-zero count makes
 * etpan_thread_is_bound() report the thread as bound. */
474 static void etpan_thread_bind(struct etpan_thread * thread)
476 thread->bound_count ++;
/* Decrements the thread's bound_count, undoing one etpan_thread_bind().
 * No check against going below zero is visible. */
480 void etpan_thread_unbind(struct etpan_thread * thread)
482 thread->bound_count --;
/* Returns non-zero when the thread's bound_count is not zero. */
485 static int etpan_thread_is_bound(struct etpan_thread * thread)
487 return (thread->bound_count != 0);
/*
 * Queues an operation on a thread: appends op to the thread's op_list and
 * raises op_sem, the semaphore thread_run() waits on.
 *
 * NOTE(review): the action taken when terminate_state is not
 * TERMINATE_STATE_NONE, the lock call, the check of r and the return values
 * are not shown in this listing.
 */
490 int etpan_thread_op_schedule(struct etpan_thread * thread,
491 struct etpan_thread_op * op)
495 if (thread->terminate_state != TERMINATE_STATE_NONE)
499 r = carray_add(thread->op_list, op, NULL);
500 thread_unlock(thread);
506 mailsem_up(thread->op_sem);
/* Locks the per-operation mutex (op->lock). */
511 static void etpan_thread_op_lock(struct etpan_thread_op * op)
513 pthread_mutex_lock(&op->lock);
/* Unlocks the per-operation mutex (op->lock). */
516 static void etpan_thread_op_unlock(struct etpan_thread_op * op)
518 pthread_mutex_unlock(&op->lock);
/*
 * Reads op->cancelled while holding the operation lock.
 * NOTE(review): the return statement is not shown in this listing;
 * presumably the value read is returned.
 */
521 static int etpan_thread_op_cancelled(struct etpan_thread_op * op)
526 etpan_thread_op_lock(op);
528 cancelled = op->cancelled;
529 etpan_thread_op_unlock(op);
/*
 * Cancels an operation. Under the operation lock, the callback (if set and
 * not already called) is invoked with the cancelled flag, the result and the
 * callback data, and callback_called is set so that
 * etpan_thread_manager_loop() does not invoke it again.
 *
 * NOTE(review): the condition guarding the warning and the assignment to
 * op->cancelled are not shown in this listing. The callback runs with the
 * operation lock held; confirm that callbacks never take this lock.
 */
535 static void etpan_thread_op_cancel(struct etpan_thread_op * op)
537 etpan_thread_op_lock(op);
539 g_warning("cancelled twice");
542 if ((op->callback != NULL) && (!op->callback_called)) {
543 op->callback(op->cancelled, op->result, op->callback_data);
544 op->callback_called = 1;
546 etpan_thread_op_unlock(op);
/*
 * Schedules an operation on a thread picked by
 * etpan_thread_manager_get_thread() and returns the result of
 * etpan_thread_op_schedule().
 *
 * NOTE(review): the handling of a failed thread lookup is not shown in this
 * listing.
 */
551 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
552 struct etpan_thread_op * op)
554 struct etpan_thread * thread;
556 thread = etpan_thread_manager_get_thread(manager);
561 return etpan_thread_op_schedule(thread, op);
/*
 * Returns notify_fds[0], the read end of the notification pipe; this is the
 * descriptor manager_ack() reads from.
 */
568 int etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)
570 return manager->notify_fds[0];
/*
 * For every thread in thread_list, appends each entry of the thread's
 * op_done_list to op_to_notify and then resets op_done_list to size 0.
 *
 * NOTE(review): the thread_lock() call matching the unlock below and the
 * check of r are not shown in this listing.
 */
573 static void loop_thread_list(carray * op_to_notify,
574 carray * thread_list)
579 for(i = 0 ; i < carray_count(thread_list) ; i ++) {
580 struct etpan_thread * thread;
583 thread = carray_get(thread_list, i);
587 for(j = 0 ; j < carray_count(thread->op_done_list) ; j ++) {
588 struct etpan_thread_op * op;
590 op = carray_get(thread->op_done_list, j);
591 r = carray_add(op_to_notify, op, NULL);
593 g_warning("complete failure of thread due to lack of memory (callback)");
/* All finished operations were handed over; empty the per-thread list. */
597 carray_set_size(thread->op_done_list, 0);
599 thread_unlock(thread);
/*
 * One iteration of the manager's processing.
 *
 * Visible flow:
 *  1. consume one notification byte with manager_ack();
 *  2. gather finished operations from the threads in thread_pool and
 *     thread_pending;
 *  3. for each gathered operation, under the operation lock, invoke its
 *     callback when it is set and has not been called yet;
 *  4. walk thread_pending; threads reported as stopped are joined, freed
 *     and removed from the array.
 *
 * NOTE(review): several lines are not shown in this listing, among them the
 * use of the cancelled value, the cleanup call, the initialisation of i
 * before the second loop and its increment. No NULL check on op_to_notify is
 * visible after carray_new(); confirm against the full file.
 */
603 void etpan_thread_manager_loop(struct etpan_thread_manager * manager)
605 carray * op_to_notify;
608 manager_ack(manager);
610 op_to_notify = carray_new(OP_INIT_SIZE);
612 loop_thread_list(op_to_notify, manager->thread_pool);
613 loop_thread_list(op_to_notify, manager->thread_pending);
615 for(i = 0 ; i < carray_count(op_to_notify) ; i ++) {
616 struct etpan_thread_op * op;
619 op = carray_get(op_to_notify, i);
621 cancelled = etpan_thread_op_cancelled(op);
623 etpan_thread_op_lock(op);
/* callback_called is set by etpan_thread_op_cancel() once it has run the
 * callback itself. */
625 if (!op->callback_called) {
626 if (op->callback != NULL)
627 op->callback(op->cancelled, op->result, op->callback_data);
630 etpan_thread_op_unlock(op);
632 if (op->cleanup != NULL)
636 carray_free(op_to_notify);
639 while (i < carray_count(manager->thread_pending)) {
640 struct etpan_thread * thread;
642 thread = carray_get(manager->thread_pending, i);
644 if (etpan_thread_is_stopped(thread)) {
645 etpan_thread_join(thread);
647 etpan_thread_free(thread);
649 carray_delete_slow(manager->thread_pending, i);
/* NOTE(review): only the signature of this function is shown in this
 * listing; its body cannot be described from here. */
658 static void etpan_thread_manager_start(struct etpan_thread_manager * manager)
/*
 * Requests termination of every thread in the pool. The loop takes the first
 * pool entry each time; etpan_thread_manager_terminate_thread() deletes that
 * entry from thread_pool, so the pool shrinks until it is empty.
 */
664 void etpan_thread_manager_stop(struct etpan_thread_manager * manager)
666 while (carray_count(manager->thread_pool) > 0) {
667 struct etpan_thread * thread;
669 thread = carray_get(manager->thread_pool, 0);
670 etpan_thread_manager_terminate_thread(manager, thread);
/* Returns non-zero when both thread_pending and thread_pool are empty. */
674 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)
676 return ((carray_count(manager->thread_pending) == 0) &&
677 (carray_count(manager->thread_pool) == 0));
/*
 * Runs etpan_thread_manager_loop() repeatedly until
 * etpan_thread_manager_is_stopped() reports that no thread remains.
 * NOTE(review): the end of this function is not shown in this listing.
 */
680 void etpan_thread_manager_join(struct etpan_thread_manager * manager)
682 while (!etpan_thread_manager_is_stopped(manager)) {
683 etpan_thread_manager_loop(manager);