2 * Claws Mail -- a GTK+ based, lightweight, and fast e-mail client
3 * Copyright (C) 2005-2011 DINH Viet Hoa and the Claws Mail team
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
26 #include "etpan-thread-manager.h"
31 #include <libetpan/mailsem.h>
32 #include <semaphore.h>
36 #include "etpan-errors.h"
/* Threshold on manager->unbound_count: once the count reaches this value,
 * etpan_thread_manager_get_thread() clears manager->can_create_thread. */
39 #define POOL_UNBOUND_MAX 4
/* Initial size passed to carray_new() for the thread_pool and
 * thread_pending arrays of a manager. */
41 #define POOL_INIT_SIZE 8
/* Initial size passed to carray_new() for a thread's op_list and
 * op_done_list, and for the temporary op_to_notify array. */
42 #define OP_INIT_SIZE 8
/* Forward declarations of file-local (static) helpers, so that they can be
 * referenced before their definitions further down in this file. */
/* -- thread lifecycle and state queries -- */
44 static int etpan_thread_start(struct etpan_thread * thread);
45 static void etpan_thread_free(struct etpan_thread * thread);
46 static unsigned int etpan_thread_get_load(struct etpan_thread * thread);
47 static int etpan_thread_is_bound(struct etpan_thread * thread);
48 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager);
49 static void etpan_thread_join(struct etpan_thread * thread);
50 static struct etpan_thread * etpan_thread_new(void);
/* -- per-operation locking and cancellation state -- */
51 static int etpan_thread_op_cancelled(struct etpan_thread_op * op);
52 static void etpan_thread_op_lock(struct etpan_thread_op * op);
53 static void etpan_thread_op_unlock(struct etpan_thread_op * op);
54 static void etpan_thread_stop(struct etpan_thread * thread);
/* -- binding, scheduling and manager start -- */
57 static void etpan_thread_bind(struct etpan_thread * thread);
58 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
59 struct etpan_thread_op * op);
60 static void etpan_thread_manager_start(struct etpan_thread_manager * manager);
61 static void etpan_thread_op_cancel(struct etpan_thread_op * op);
/* Value stored in thread->terminate_state by etpan_thread_stop(); the
 * companion values used in this file are TERMINATE_STATE_NONE (initial value
 * set in etpan_thread_new()) and TERMINATE_STATE_DONE (set at the end of
 * thread_run()). */
66 TERMINATE_STATE_REQUESTED,
/*
 * Allocates and initialises a thread manager: two carrays (thread_pool and
 * thread_pending), thread creation enabled, zero unbound threads, and a pipe
 * whose descriptors are stored in notify_fds.
 *
 * NOTE(review): the return statements and the jump targets of the failure
 * checks are not visible here; presumably NULL is returned on failure --
 * confirm against the complete source.
 */
70 struct etpan_thread_manager * etpan_thread_manager_new(void)
72 struct etpan_thread_manager * manager;
75 manager = malloc(sizeof(* manager));
/* Array of threads managed by this manager. */
79 manager->thread_pool = carray_new(POOL_INIT_SIZE);
80 if (manager->thread_pool == NULL)
/* Array of threads that were asked to stop (see
 * etpan_thread_manager_terminate_thread()). */
83 manager->thread_pending = carray_new(POOL_INIT_SIZE);
84 if (manager->thread_pending == NULL)
87 manager->can_create_thread = 1;
88 manager->unbound_count = 0;
/* Pipe used for notification: manager_notify() writes to notify_fds[1] and
 * manager_ack() reads from notify_fds[0]. */
90 r = pipe(manager->notify_fds);
/* Cleanup path: release the arrays in reverse order of creation. */
97 carray_free(manager->thread_pending);
99 carray_free(manager->thread_pool);
/*
 * Releases the resources held by a manager: both ends of the notification
 * pipe, then the thread_pending and thread_pool arrays.
 *
 * The threads referenced by the arrays are not freed by the lines shown here.
 * NOTE(review): release of the manager structure itself is not visible in
 * this listing -- confirm.
 */
106 void etpan_thread_manager_free(struct etpan_thread_manager * manager)
108 close(manager->notify_fds[1]);
109 close(manager->notify_fds[0]);
110 carray_free(manager->thread_pending);
111 carray_free(manager->thread_pool);
/*
 * Allocates a thread descriptor and its resources: a mutex, the pending
 * (op_list) and finished (op_done_list) operation arrays, and three
 * semaphores (start_sem, stop_sem, op_sem). The descriptor starts with no
 * manager, a bound_count of zero and terminate_state TERMINATE_STATE_NONE.
 * No OS thread is created here; that is done by etpan_thread_start().
 *
 * NOTE(review): most failure jumps and the return statements are not visible
 * here; presumably NULL is returned on failure -- confirm.
 */
115 static struct etpan_thread * etpan_thread_new(void)
117 struct etpan_thread * thread;
120 thread = malloc(sizeof(* thread));
124 r = pthread_mutex_init(&thread->lock, NULL);
/* Operations waiting to be run by the thread. */
128 thread->op_list = carray_new(OP_INIT_SIZE);
129 if (thread->op_list == NULL)
/* Operations that thread_run() has finished handling. */
132 thread->op_done_list = carray_new(OP_INIT_SIZE);
133 if (thread->op_done_list == NULL)
/* Raised by thread_run() on entry; awaited by etpan_thread_start(). */
136 thread->start_sem = mailsem_new();
137 if (thread->start_sem == NULL)
138 goto free_op_done_list;
/* Raised by thread_run() on exit; awaited by etpan_thread_join(). */
140 thread->stop_sem = mailsem_new();
141 if (thread->stop_sem == NULL)
/* Raised when an operation is scheduled or a stop is requested. */
144 thread->op_sem = mailsem_new();
145 if (thread->op_sem == NULL)
148 thread->manager = NULL;
149 thread->bound_count = 0;
150 thread->terminate_state = TERMINATE_STATE_NONE;
/* Cleanup path: undo the allocations in reverse order of creation. */
155 mailsem_free(thread->stop_sem);
157 mailsem_free(thread->start_sem);
159 carray_free(thread->op_done_list);
161 carray_free(thread->op_list);
163 pthread_mutex_destroy(&thread->lock);
/*
 * Releases the resources created by etpan_thread_new(), in reverse order of
 * creation: the three semaphores, the two operation arrays, then the mutex.
 * Operations still referenced by the arrays are not freed by the lines shown.
 * NOTE(review): release of the thread structure itself is not visible in
 * this listing -- confirm.
 */
170 static void etpan_thread_free(struct etpan_thread * thread)
172 mailsem_free(thread->op_sem);
173 mailsem_free(thread->stop_sem);
174 mailsem_free(thread->start_sem);
175 carray_free(thread->op_done_list);
176 carray_free(thread->op_list);
177 pthread_mutex_destroy(&thread->lock);
/*
 * Allocates an operation descriptor, clears its callback data and its
 * "callback already called" flag, and initialises its mutex.
 *
 * NOTE(review): initialisation of the remaining fields, the failure handling
 * and the return statements are not visible in this listing -- confirm.
 */
181 struct etpan_thread_op * etpan_thread_op_new(void)
183 struct etpan_thread_op * op;
186 op = malloc(sizeof(* op));
193 op->callback_data = NULL;
/* Checked by etpan_thread_manager_loop() before invoking the callback. */
194 op->callback_called = 0;
/* Mutex taken by etpan_thread_op_lock() / released by etpan_thread_op_unlock(). */
203 r = pthread_mutex_init(&op->lock, NULL);
/*
 * Destroys the mutex of an operation descriptor.
 * NOTE(review): release of the descriptor memory is not visible in this
 * listing -- confirm.
 */
215 void etpan_thread_op_free(struct etpan_thread_op * op)
217 pthread_mutex_destroy(&op->lock);
/*
 * Creates a new thread for the given manager: allocates the descriptor,
 * attaches it to the manager, starts the OS thread and appends the
 * descriptor to manager->thread_pool.
 *
 * NOTE(review): the conditions guarding the failure handling and the return
 * statements are not visible here; presumably NULL is returned on failure --
 * confirm.
 */
221 static struct etpan_thread *
222 etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)
224 struct etpan_thread * thread;
227 thread = etpan_thread_new();
/* Needed before the thread runs: thread_notify() dereferences thread->manager. */
231 thread->manager = manager;
233 r = etpan_thread_start(thread);
237 r = carray_add(manager->thread_pool, thread, NULL);
/* Failure handling: ask the already-started thread to stop. */
239 etpan_thread_stop(thread);
/* Failure handling: release the descriptor. */
246 etpan_thread_free(thread);
/*
 * Retires a thread: removes it from manager->thread_pool, adjusts the
 * unbound thread count if the thread is not bound, appends it to
 * manager->thread_pending and requests it to stop.
 * Threads in thread_pending are joined and freed later by
 * etpan_thread_manager_loop().
 */
252 etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,
253 struct etpan_thread * thread)
/* Linear search for the thread's slot in the pool. */
258 for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
259 if (carray_get(manager->thread_pool, i) == thread) {
260 carray_delete(manager->thread_pool, i);
265 if (!etpan_thread_is_bound(thread))
266 manager->unbound_count --;
268 r = carray_add(manager->thread_pending, thread, NULL);
/* NOTE(review): the condition guarding this warning is not visible here;
 * presumably it fires when the carray_add() above fails -- confirm. */
270 g_warning("complete failure of thread due to lack of memory (thread stop)");
273 etpan_thread_stop(thread);
/*
 * Signals the manager by writing one byte to the write end of its
 * notification pipe (notify_fds[1]).
 * NOTE(review): no use of the write() result `r` is visible in this listing
 * -- confirm whether a failed or short write is handled.
 */
276 static void manager_notify(struct etpan_thread_manager * manager)
282 r = write(manager->notify_fds[1], &ch, 1);
/*
 * Consumes one notification byte from the read end of the manager's pipe
 * (notify_fds[0]).
 * NOTE(review): the comment below suggests that in some configuration the
 * byte is consumed elsewhere; any conditional compilation around the read()
 * is not visible in this listing -- confirm.
 */
285 static void manager_ack(struct etpan_thread_manager * manager)
290 r = read(manager->notify_fds[0], &ch, 1);
292 /* done in the GIOChannel handler in imap-thread.c and nntp-thread.c */
/* Acquires the mutex of the given thread descriptor (thread->lock). */
296 static void thread_lock(struct etpan_thread * thread)
298 pthread_mutex_lock(&thread->lock);
/* Releases the mutex of the given thread descriptor (thread->lock). */
301 static void thread_unlock(struct etpan_thread * thread)
303 pthread_mutex_unlock(&thread->lock);
/* Notifies the manager that owns this thread (see manager_notify());
 * thread->manager must have been set beforehand. */
306 static void thread_notify(struct etpan_thread * thread)
308 manager_notify(thread->manager);
/*
 * Entry point passed to pthread_create() by etpan_thread_start().
 *
 * Signals start_sem, then repeatedly waits on op_sem, takes the first
 * operation from op_list, handles it unless it was cancelled, moves it to
 * op_done_list and notifies the manager. On exit it marks the thread as
 * TERMINATE_STATE_DONE, notifies the manager and signals stop_sem.
 *
 * NOTE(review): the loop construct, its exit condition, the lock
 * acquisitions and the code that runs the operation are not visible in this
 * listing -- confirm against the complete source.
 */
311 static void * thread_run(void * data)
313 struct etpan_thread * thread;
/* Tell etpan_thread_start() that the thread is running. */
318 mailsem_up(thread->start_sem);
322 struct etpan_thread_op * op;
/* Block until an operation is scheduled or a stop is requested. */
324 mailsem_down(thread->op_sem);
/* Dequeue the operation at the front of the pending list. */
329 if (carray_count(thread->op_list) > 0) {
330 op = carray_get(thread->op_list, 0);
331 carray_delete_slow(thread->op_list, 0);
336 thread_unlock(thread);
/* Cancelled operations skip the guarded section. */
342 if (!etpan_thread_op_cancelled(op)) {
/* Hand the operation over to the manager side (see loop_thread_list()). */
348 r = carray_add(thread->op_done_list, op, NULL);
350 g_warning("complete failure of thread due to lack of memory (op done)");
352 thread_unlock(thread);
354 thread_notify(thread);
/* Termination: this is the state tested by etpan_thread_is_stopped(). */
358 thread->terminate_state = TERMINATE_STATE_DONE;
359 thread_unlock(thread);
361 thread_notify(thread);
/* Lets etpan_thread_join() proceed to pthread_join(). */
363 mailsem_up(thread->stop_sem);
/*
 * Creates the OS thread running thread_run() for this descriptor, then waits
 * on start_sem until thread_run() has signalled that it is running.
 * NOTE(review): the handling of a pthread_create() failure and the return
 * values are not visible in this listing -- confirm.
 */
368 static int etpan_thread_start(struct etpan_thread * thread)
372 r = pthread_create(&thread->th_id, NULL, thread_run, thread);
376 mailsem_down(thread->start_sem);
/*
 * Requests termination of a thread: records TERMINATE_STATE_REQUESTED and
 * raises op_sem so that a thread blocked in thread_run() wakes up.
 * Does not wait for the thread to finish.
 */
381 static void etpan_thread_stop(struct etpan_thread * thread)
384 thread->terminate_state = TERMINATE_STATE_REQUESTED;
385 thread_unlock(thread);
/* Wake the worker, which waits on op_sem between operations. */
387 mailsem_up(thread->op_sem);
389 /* this thread will be joined in the manager loop */
/*
 * Evaluates whether the thread has reached TERMINATE_STATE_DONE, the state
 * set by thread_run() just before it exits.
 * NOTE(review): the return statement is not visible here; presumably the
 * computed flag is returned -- confirm.
 */
392 static int etpan_thread_is_stopped(struct etpan_thread * thread)
397 stopped = (thread->terminate_state == TERMINATE_STATE_DONE);
398 thread_unlock(thread);
/*
 * Waits for a thread to finish: first waits on stop_sem (raised at the end
 * of thread_run()), then joins the OS thread. Blocks the caller.
 */
403 static void etpan_thread_join(struct etpan_thread * thread)
405 mailsem_down(thread->stop_sem);
406 pthread_join(thread->th_id, NULL);
/*
 * Selects a thread from the manager's pool for running operations, or
 * creates a new one.
 *
 * The pool is scanned for the candidate with the smallest load (number of
 * pending operations, see etpan_thread_get_load()). If the best candidate is
 * busy and the manager may still create threads, a new thread is created
 * instead.
 *
 * NOTE(review): the statements taken for bound threads and for an idle first
 * candidate, the failure handling of thread creation, and the final return
 * are not visible in this listing -- confirm.
 */
409 struct etpan_thread *
410 etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)
412 struct etpan_thread * chosen_thread;
413 unsigned int chosen_thread_load;
415 struct etpan_thread * thread;
419 chosen_thread = NULL;
420 chosen_thread_load = 0;
422 for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
423 thread = carray_get(manager->thread_pool, i);
/* Bound threads get separate treatment (statement not visible here). */
424 if (etpan_thread_is_bound(thread))
/* The first candidate becomes the initial choice. */
427 if (chosen_thread == NULL) {
428 chosen_thread = thread;
429 chosen_thread_load = etpan_thread_get_load(thread);
431 if (chosen_thread_load == 0)
437 load = etpan_thread_get_load(thread);
/* Keep the least loaded candidate seen so far. */
439 if (load < chosen_thread_load) {
440 chosen_thread = thread;
441 chosen_thread_load = load;
/* Prefer a fresh thread over a busy one while creation is still allowed. */
446 if (chosen_thread != NULL) {
447 if (manager->can_create_thread && (chosen_thread_load != 0)) {
448 chosen_thread = NULL;
454 if (chosen_thread != NULL)
455 return chosen_thread;
457 thread = etpan_thread_manager_create_thread(manager);
/* Account for the new thread; disable creation once the cap is reached. */
461 manager->unbound_count ++;
462 if (manager->unbound_count >= POOL_UNBOUND_MAX)
463 manager->can_create_thread = 0;
/*
 * Computes the load of a thread, defined as the number of operations
 * currently in its pending list (op_list).
 * NOTE(review): the return statement is not visible here; presumably `load`
 * is returned -- confirm.
 */
471 static unsigned int etpan_thread_get_load(struct etpan_thread * thread)
476 load = carray_count(thread->op_list);
477 thread_unlock(thread);
/* Increments the thread's bound_count; a thread with a non-zero count is
 * reported as bound by etpan_thread_is_bound(). */
483 static void etpan_thread_bind(struct etpan_thread * thread)
485 thread->bound_count ++;
/* Decrements the thread's bound_count, undoing one etpan_thread_bind().
 * No underflow check is performed by the line shown. */
489 void etpan_thread_unbind(struct etpan_thread * thread)
491 thread->bound_count --;
/* Returns non-zero when the thread's bound_count is not zero, 0 otherwise. */
494 static int etpan_thread_is_bound(struct etpan_thread * thread)
496 return (thread->bound_count != 0);
/*
 * Queues an operation on a thread: appends `op` to the thread's pending list
 * (op_list) and raises op_sem so that thread_run() picks it up.
 *
 * A thread whose terminate_state is no longer TERMINATE_STATE_NONE is
 * treated specially.
 * NOTE(review): the statement taken in that case, the handling of a
 * carray_add() failure and the return values are not visible in this
 * listing -- confirm.
 */
499 int etpan_thread_op_schedule(struct etpan_thread * thread,
500 struct etpan_thread_op * op)
504 if (thread->terminate_state != TERMINATE_STATE_NONE)
508 r = carray_add(thread->op_list, op, NULL);
509 thread_unlock(thread);
/* Wake the worker thread waiting in thread_run(). */
515 mailsem_up(thread->op_sem);
/* Acquires the mutex of the given operation (op->lock). */
520 static void etpan_thread_op_lock(struct etpan_thread_op * op)
522 pthread_mutex_lock(&op->lock);
/* Releases the mutex of the given operation (op->lock). */
525 static void etpan_thread_op_unlock(struct etpan_thread_op * op)
527 pthread_mutex_unlock(&op->lock);
/*
 * Reads the `cancelled` flag of an operation while holding the operation's
 * mutex.
 * NOTE(review): the return statement is not visible here; presumably the
 * value read is returned -- confirm.
 */
530 static int etpan_thread_op_cancelled(struct etpan_thread_op * op)
535 etpan_thread_op_lock(op);
537 cancelled = op->cancelled;
538 etpan_thread_op_unlock(op);
/*
 * Returns the read end of the manager's notification pipe (notify_fds[0]),
 * the descriptor on which bytes written by manager_notify() arrive.
 */
543 int etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)
545 return manager->notify_fds[0];
/*
 * Collects finished operations: for every thread in `thread_list`, appends
 * each entry of the thread's op_done_list to `op_to_notify`, then empties
 * op_done_list.
 *
 * NOTE(review): the lock acquisition matching the thread_unlock() below and
 * the condition guarding the warning are not visible in this listing --
 * confirm.
 */
548 static void loop_thread_list(carray * op_to_notify,
549 carray * thread_list)
554 for(i = 0 ; i < carray_count(thread_list) ; i ++) {
555 struct etpan_thread * thread;
558 thread = carray_get(thread_list, i);
/* Copy every finished operation of this thread to the caller's array. */
562 for(j = 0 ; j < carray_count(thread->op_done_list) ; j ++) {
563 struct etpan_thread_op * op;
565 op = carray_get(thread->op_done_list, j);
566 r = carray_add(op_to_notify, op, NULL);
568 g_warning("complete failure of thread due to lack of memory (callback)");
/* All entries were handed over; reset the list to empty. */
572 carray_set_size(thread->op_done_list, 0);
574 thread_unlock(thread);
/*
 * One iteration of the manager's processing:
 *  1. acknowledges a pending notification (manager_ack());
 *  2. gathers the finished operations of all pool and pending threads;
 *  3. for each gathered operation, invokes its callback (unless already
 *     called) while holding the operation's lock, then handles its cleanup
 *     hook;
 *  4. joins, frees and removes every pending thread that has stopped.
 *
 * NOTE(review): no NULL check on the carray_new() result is visible before
 * it is used; the local `cancelled` has no visible use; the statement run
 * when op->cleanup is set is not visible either -- confirm against the
 * complete source.
 */
578 void etpan_thread_manager_loop(struct etpan_thread_manager * manager)
580 carray * op_to_notify;
583 manager_ack(manager);
/* Temporary array receiving the finished operations of every thread. */
585 op_to_notify = carray_new(OP_INIT_SIZE);
587 loop_thread_list(op_to_notify, manager->thread_pool);
588 loop_thread_list(op_to_notify, manager->thread_pending);
590 for(i = 0 ; i < carray_count(op_to_notify) ; i ++) {
591 struct etpan_thread_op * op;
594 op = carray_get(op_to_notify, i);
596 cancelled = etpan_thread_op_cancelled(op);
598 etpan_thread_op_lock(op);
/* The callback receives the cancelled flag, the result and the user data. */
600 if (!op->callback_called) {
601 if (op->callback != NULL)
602 op->callback(op->cancelled, op->result, op->callback_data);
605 etpan_thread_op_unlock(op);
607 if (op->cleanup != NULL)
611 carray_free(op_to_notify);
/* Reap the threads that were asked to stop and have finished running. */
614 while (i < carray_count(manager->thread_pending)) {
615 struct etpan_thread * thread;
617 thread = carray_get(manager->thread_pending, i);
619 if (etpan_thread_is_stopped(thread)) {
620 etpan_thread_join(thread);
622 etpan_thread_free(thread);
624 carray_delete_slow(manager->thread_pending, i);
/* File-local function, forward-declared near the top of this file.
 * NOTE(review): its body is not visible in this listing -- confirm what, if
 * anything, it does. */
633 static void etpan_thread_manager_start(struct etpan_thread_manager * manager)
/*
 * Requests termination of every thread in the manager's pool.
 * Each call to etpan_thread_manager_terminate_thread() removes the thread
 * from thread_pool, so the loop ends once the pool is empty. The threads are
 * only asked to stop here; they are joined later by
 * etpan_thread_manager_loop().
 */
639 void etpan_thread_manager_stop(struct etpan_thread_manager * manager)
641 while (carray_count(manager->thread_pool) > 0) {
642 struct etpan_thread * thread;
/* Always take the first entry: termination removes it from the pool. */
644 thread = carray_get(manager->thread_pool, 0);
645 etpan_thread_manager_terminate_thread(manager, thread);
/* Returns non-zero when the manager has no thread left: both thread_pending
 * and thread_pool are empty. */
649 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)
651 return ((carray_count(manager->thread_pending) == 0) &&
652 (carray_count(manager->thread_pool) == 0));
/*
 * Runs etpan_thread_manager_loop() repeatedly until the manager has no
 * thread left in either thread_pool or thread_pending.
 * This function itself does not request any thread to stop; see
 * etpan_thread_manager_stop() for that.
 */
655 void etpan_thread_manager_join(struct etpan_thread_manager * manager)
657 while (!etpan_thread_manager_is_stopped(manager)) {
658 etpan_thread_manager_loop(manager);