2 * Claws Mail -- a GTK+ based, lightweight, and fast e-mail client
3 * Copyright (C) 2005-2012 DINH Viet Hoa and the Claws Mail team
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
22 #include "claws-features.h"
27 #include "etpan-thread-manager.h"
32 #include <libetpan/mailsem.h>
33 #include <semaphore.h>
37 #include "etpan-errors.h"
40 #define POOL_UNBOUND_MAX 4
42 #define POOL_INIT_SIZE 8
43 #define OP_INIT_SIZE 8
45 static int etpan_thread_start(struct etpan_thread * thread);
46 static void etpan_thread_free(struct etpan_thread * thread);
47 static unsigned int etpan_thread_get_load(struct etpan_thread * thread);
48 static int etpan_thread_is_bound(struct etpan_thread * thread);
49 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager);
50 static void etpan_thread_join(struct etpan_thread * thread);
51 static struct etpan_thread * etpan_thread_new(void);
52 static int etpan_thread_op_cancelled(struct etpan_thread_op * op);
53 static void etpan_thread_op_lock(struct etpan_thread_op * op);
54 static void etpan_thread_op_unlock(struct etpan_thread_op * op);
55 static void etpan_thread_stop(struct etpan_thread * thread);
58 static void etpan_thread_bind(struct etpan_thread * thread);
59 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
60 struct etpan_thread_op * op);
61 static void etpan_thread_manager_start(struct etpan_thread_manager * manager);
62 static void etpan_thread_op_cancel(struct etpan_thread_op * op);
67 TERMINATE_STATE_REQUESTED,
71 struct etpan_thread_manager * etpan_thread_manager_new(void)
73 struct etpan_thread_manager * manager;
76 manager = malloc(sizeof(* manager));
80 manager->thread_pool = carray_new(POOL_INIT_SIZE);
81 if (manager->thread_pool == NULL)
84 manager->thread_pending = carray_new(POOL_INIT_SIZE);
85 if (manager->thread_pending == NULL)
88 manager->can_create_thread = 1;
89 manager->unbound_count = 0;
91 r = pipe(manager->notify_fds);
98 carray_free(manager->thread_pending);
100 carray_free(manager->thread_pool);
107 void etpan_thread_manager_free(struct etpan_thread_manager * manager)
109 close(manager->notify_fds[1]);
110 close(manager->notify_fds[0]);
111 carray_free(manager->thread_pending);
112 carray_free(manager->thread_pool);
116 static struct etpan_thread * etpan_thread_new(void)
118 struct etpan_thread * thread;
121 thread = malloc(sizeof(* thread));
125 r = pthread_mutex_init(&thread->lock, NULL);
129 thread->op_list = carray_new(OP_INIT_SIZE);
130 if (thread->op_list == NULL)
133 thread->op_done_list = carray_new(OP_INIT_SIZE);
134 if (thread->op_done_list == NULL)
137 thread->start_sem = mailsem_new();
138 if (thread->start_sem == NULL)
139 goto free_op_done_list;
141 thread->stop_sem = mailsem_new();
142 if (thread->stop_sem == NULL)
145 thread->op_sem = mailsem_new();
146 if (thread->op_sem == NULL)
149 thread->manager = NULL;
150 thread->bound_count = 0;
151 thread->terminate_state = TERMINATE_STATE_NONE;
156 mailsem_free(thread->stop_sem);
158 mailsem_free(thread->start_sem);
160 carray_free(thread->op_done_list);
162 carray_free(thread->op_list);
164 pthread_mutex_destroy(&thread->lock);
171 static void etpan_thread_free(struct etpan_thread * thread)
173 mailsem_free(thread->op_sem);
174 mailsem_free(thread->stop_sem);
175 mailsem_free(thread->start_sem);
176 carray_free(thread->op_done_list);
177 carray_free(thread->op_list);
178 pthread_mutex_destroy(&thread->lock);
182 struct etpan_thread_op * etpan_thread_op_new(void)
184 struct etpan_thread_op * op;
187 op = malloc(sizeof(* op));
194 op->callback_data = NULL;
195 op->callback_called = 0;
204 r = pthread_mutex_init(&op->lock, NULL);
216 void etpan_thread_op_free(struct etpan_thread_op * op)
218 pthread_mutex_destroy(&op->lock);
222 static struct etpan_thread *
223 etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)
225 struct etpan_thread * thread;
228 thread = etpan_thread_new();
232 thread->manager = manager;
234 r = etpan_thread_start(thread);
238 r = carray_add(manager->thread_pool, thread, NULL);
240 etpan_thread_stop(thread);
247 etpan_thread_free(thread);
253 etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,
254 struct etpan_thread * thread)
259 for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
260 if (carray_get(manager->thread_pool, i) == thread) {
261 carray_delete(manager->thread_pool, i);
266 if (!etpan_thread_is_bound(thread))
267 manager->unbound_count --;
269 r = carray_add(manager->thread_pending, thread, NULL);
271 g_warning("complete failure of thread due to lack of memory (thread stop)");
274 etpan_thread_stop(thread);
277 static void manager_notify(struct etpan_thread_manager * manager)
283 r = write(manager->notify_fds[1], &ch, 1);
285 g_warning("error writing notification to etpan thread manager");
289 static void manager_ack(struct etpan_thread_manager * manager)
294 r = read(manager->notify_fds[0], &ch, 1);
296 g_warning("error reading notification from etpan thread manager");
299 /* done in the GIOChannel handler in imap-thread.c and nntp-thread.c */
303 static void thread_lock(struct etpan_thread * thread)
305 pthread_mutex_lock(&thread->lock);
308 static void thread_unlock(struct etpan_thread * thread)
310 pthread_mutex_unlock(&thread->lock);
313 static void thread_notify(struct etpan_thread * thread)
315 manager_notify(thread->manager);
318 static void * thread_run(void * data)
320 struct etpan_thread * thread;
325 mailsem_up(thread->start_sem);
329 struct etpan_thread_op * op;
331 mailsem_down(thread->op_sem);
336 if (carray_count(thread->op_list) > 0) {
337 op = carray_get(thread->op_list, 0);
338 carray_delete_slow(thread->op_list, 0);
343 thread_unlock(thread);
349 if (!etpan_thread_op_cancelled(op)) {
355 r = carray_add(thread->op_done_list, op, NULL);
357 g_warning("complete failure of thread due to lack of memory (op done)");
359 thread_unlock(thread);
361 thread_notify(thread);
365 thread->terminate_state = TERMINATE_STATE_DONE;
366 thread_unlock(thread);
368 thread_notify(thread);
370 mailsem_up(thread->stop_sem);
375 static int etpan_thread_start(struct etpan_thread * thread)
379 r = pthread_create(&thread->th_id, NULL, thread_run, thread);
383 mailsem_down(thread->start_sem);
388 static void etpan_thread_stop(struct etpan_thread * thread)
391 thread->terminate_state = TERMINATE_STATE_REQUESTED;
392 thread_unlock(thread);
394 mailsem_up(thread->op_sem);
396 /* this thread will be joined in the manager loop */
399 static int etpan_thread_is_stopped(struct etpan_thread * thread)
404 stopped = (thread->terminate_state == TERMINATE_STATE_DONE);
405 thread_unlock(thread);
410 static void etpan_thread_join(struct etpan_thread * thread)
412 mailsem_down(thread->stop_sem);
413 pthread_join(thread->th_id, NULL);
416 struct etpan_thread *
417 etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)
419 struct etpan_thread * chosen_thread;
420 unsigned int chosen_thread_load;
422 struct etpan_thread * thread;
426 chosen_thread = NULL;
427 chosen_thread_load = 0;
429 for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
430 thread = carray_get(manager->thread_pool, i);
431 if (etpan_thread_is_bound(thread))
434 if (chosen_thread == NULL) {
435 chosen_thread = thread;
436 chosen_thread_load = etpan_thread_get_load(thread);
438 if (chosen_thread_load == 0)
444 load = etpan_thread_get_load(thread);
446 if (load < chosen_thread_load) {
447 chosen_thread = thread;
448 chosen_thread_load = load;
453 if (chosen_thread != NULL) {
454 if (manager->can_create_thread && (chosen_thread_load != 0)) {
455 chosen_thread = NULL;
461 if (chosen_thread != NULL)
462 return chosen_thread;
464 thread = etpan_thread_manager_create_thread(manager);
468 manager->unbound_count ++;
469 if (manager->unbound_count >= POOL_UNBOUND_MAX)
470 manager->can_create_thread = 0;
478 static unsigned int etpan_thread_get_load(struct etpan_thread * thread)
483 load = carray_count(thread->op_list);
484 thread_unlock(thread);
490 static void etpan_thread_bind(struct etpan_thread * thread)
492 thread->bound_count ++;
496 void etpan_thread_unbind(struct etpan_thread * thread)
498 thread->bound_count --;
501 static int etpan_thread_is_bound(struct etpan_thread * thread)
503 return (thread->bound_count != 0);
506 int etpan_thread_op_schedule(struct etpan_thread * thread,
507 struct etpan_thread_op * op)
511 if (thread->terminate_state != TERMINATE_STATE_NONE)
515 r = carray_add(thread->op_list, op, NULL);
516 thread_unlock(thread);
522 mailsem_up(thread->op_sem);
527 static void etpan_thread_op_lock(struct etpan_thread_op * op)
529 pthread_mutex_lock(&op->lock);
532 static void etpan_thread_op_unlock(struct etpan_thread_op * op)
534 pthread_mutex_unlock(&op->lock);
537 static int etpan_thread_op_cancelled(struct etpan_thread_op * op)
542 etpan_thread_op_lock(op);
544 cancelled = op->cancelled;
545 etpan_thread_op_unlock(op);
550 int etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)
552 return manager->notify_fds[0];
555 static void loop_thread_list(carray * op_to_notify,
556 carray * thread_list)
561 for(i = 0 ; i < carray_count(thread_list) ; i ++) {
562 struct etpan_thread * thread;
565 thread = carray_get(thread_list, i);
569 for(j = 0 ; j < carray_count(thread->op_done_list) ; j ++) {
570 struct etpan_thread_op * op;
572 op = carray_get(thread->op_done_list, j);
573 r = carray_add(op_to_notify, op, NULL);
575 g_warning("complete failure of thread due to lack of memory (callback)");
579 carray_set_size(thread->op_done_list, 0);
581 thread_unlock(thread);
585 void etpan_thread_manager_loop(struct etpan_thread_manager * manager)
587 carray * op_to_notify;
590 manager_ack(manager);
592 op_to_notify = carray_new(OP_INIT_SIZE);
594 loop_thread_list(op_to_notify, manager->thread_pool);
595 loop_thread_list(op_to_notify, manager->thread_pending);
597 for(i = 0 ; i < carray_count(op_to_notify) ; i ++) {
598 struct etpan_thread_op * op;
600 op = carray_get(op_to_notify, i);
602 etpan_thread_op_lock(op);
604 if (!op->callback_called) {
605 if (op->callback != NULL)
606 op->callback(op->cancelled, op->result, op->callback_data);
609 etpan_thread_op_unlock(op);
611 if (op->cleanup != NULL)
615 carray_free(op_to_notify);
618 while (i < carray_count(manager->thread_pending)) {
619 struct etpan_thread * thread;
621 thread = carray_get(manager->thread_pending, i);
623 if (etpan_thread_is_stopped(thread)) {
624 etpan_thread_join(thread);
626 etpan_thread_free(thread);
628 carray_delete_slow(manager->thread_pending, i);
637 static void etpan_thread_manager_start(struct etpan_thread_manager * manager)
643 void etpan_thread_manager_stop(struct etpan_thread_manager * manager)
645 while (carray_count(manager->thread_pool) > 0) {
646 struct etpan_thread * thread;
648 thread = carray_get(manager->thread_pool, 0);
649 etpan_thread_manager_terminate_thread(manager, thread);
653 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)
655 return ((carray_count(manager->thread_pending) == 0) &&
656 (carray_count(manager->thread_pool) == 0));
659 void etpan_thread_manager_join(struct etpan_thread_manager * manager)
661 while (!etpan_thread_manager_is_stopped(manager)) {
662 etpan_thread_manager_loop(manager);