2011-07-30 [mir] 3.7.9cvs38
[claws.git] / src / etpan / etpan-thread-manager.c
1 /*
2  * Claws Mail -- a GTK+ based, lightweight, and fast e-mail client
3  * Copyright (C) 2005-2011 DINH Viet Hoa and the Claws Mail team
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  * 
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #  include "config.h"
22 #endif
23
24 #ifdef HAVE_LIBETPAN
25
26 #include "etpan-thread-manager.h"
27
28 #include <glib.h>
29 #include <stdlib.h>
30 #include <pthread.h>
31 #include <libetpan/mailsem.h>
32 #include <semaphore.h>
33 #include <unistd.h>
34 #include <fcntl.h>
35
36 #include "etpan-errors.h"
37 #include "utils.h"
38
39 #define POOL_UNBOUND_MAX 4
40
41 #define POOL_INIT_SIZE 8
42 #define OP_INIT_SIZE 8
43
44 static int etpan_thread_start(struct etpan_thread * thread);
45 static void etpan_thread_free(struct etpan_thread * thread);
46 static unsigned int etpan_thread_get_load(struct etpan_thread * thread);
47 static int etpan_thread_is_bound(struct etpan_thread * thread);
48 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager);
49 static void etpan_thread_join(struct etpan_thread * thread);
50 static struct etpan_thread * etpan_thread_new(void);
51 static int etpan_thread_op_cancelled(struct etpan_thread_op * op);
52 static void etpan_thread_op_lock(struct etpan_thread_op * op);
53 static void etpan_thread_op_unlock(struct etpan_thread_op * op);
54 static void etpan_thread_stop(struct etpan_thread * thread);
55
56 #if 0
57 static void etpan_thread_bind(struct etpan_thread * thread);
58 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
59              struct etpan_thread_op * op);
60 static void etpan_thread_manager_start(struct etpan_thread_manager * manager);
61 static void etpan_thread_op_cancel(struct etpan_thread_op * op);
62 #endif
63
64 enum {
65   TERMINATE_STATE_NONE,
66   TERMINATE_STATE_REQUESTED,
67   TERMINATE_STATE_DONE,
68 };
69
70 struct etpan_thread_manager * etpan_thread_manager_new(void)
71 {
72   struct etpan_thread_manager * manager;
73   int r;
74   
75   manager = malloc(sizeof(* manager));
76   if (manager == NULL)
77     goto err;
78   
79   manager->thread_pool = carray_new(POOL_INIT_SIZE);
80   if (manager->thread_pool == NULL)
81     goto free;
82
83   manager->thread_pending = carray_new(POOL_INIT_SIZE);
84   if (manager->thread_pending == NULL)
85     goto free_pool;
86   
87   manager->can_create_thread = 1;
88   manager->unbound_count = 0;
89   
90   r = pipe(manager->notify_fds);
91   if (r < 0)
92     goto free_pending;
93   
94   return manager;
95   
96  free_pending:
97   carray_free(manager->thread_pending);
98  free_pool:
99   carray_free(manager->thread_pool);
100  free:
101   free(manager);
102  err:
103   return NULL;
104 }
105
106 void etpan_thread_manager_free(struct etpan_thread_manager * manager)
107 {
108   close(manager->notify_fds[1]);
109   close(manager->notify_fds[0]);
110   carray_free(manager->thread_pending);
111   carray_free(manager->thread_pool);
112   free(manager);
113 }
114
115 static struct etpan_thread * etpan_thread_new(void)
116 {
117   struct etpan_thread * thread;
118   int r;
119   
120   thread = malloc(sizeof(* thread));
121   if (thread == NULL)
122     goto err;
123   
124   r = pthread_mutex_init(&thread->lock, NULL);
125   if (r != 0)
126     goto free;
127
128   thread->op_list = carray_new(OP_INIT_SIZE);
129   if (thread->op_list == NULL)
130     goto destroy_lock;
131   
132   thread->op_done_list = carray_new(OP_INIT_SIZE);
133   if (thread->op_done_list == NULL)
134     goto free_op_list;
135   
136   thread->start_sem = mailsem_new();
137   if (thread->start_sem == NULL)
138     goto free_op_done_list;
139   
140   thread->stop_sem = mailsem_new();
141   if (thread->stop_sem == NULL)
142     goto free_startsem;
143   
144   thread->op_sem = mailsem_new();
145   if (thread->op_sem == NULL)
146     goto free_stopsem;
147   
148   thread->manager = NULL;
149   thread->bound_count = 0;
150   thread->terminate_state = TERMINATE_STATE_NONE;
151   
152   return thread;
153   
154  free_stopsem:
155   mailsem_free(thread->stop_sem);
156  free_startsem:
157   mailsem_free(thread->start_sem);
158  free_op_done_list:
159   carray_free(thread->op_done_list);
160  free_op_list:
161   carray_free(thread->op_list);
162  destroy_lock:
163   pthread_mutex_destroy(&thread->lock);
164  free:
165   free(thread);
166  err:
167   return NULL;
168 }
169
170 static void etpan_thread_free(struct etpan_thread * thread)
171 {
172   mailsem_free(thread->op_sem);
173   mailsem_free(thread->stop_sem);
174   mailsem_free(thread->start_sem);
175   carray_free(thread->op_done_list);
176   carray_free(thread->op_list);
177   pthread_mutex_destroy(&thread->lock);
178   free(thread);
179 }
180
181 struct etpan_thread_op * etpan_thread_op_new(void)
182 {
183   struct etpan_thread_op * op;
184   int r;
185   
186   op = malloc(sizeof(* op));
187   if (op == NULL)
188     goto err;
189   
190   op->thread = NULL;
191   op->run = NULL;
192   op->callback = NULL;
193   op->callback_data = NULL;
194   op->callback_called = 0;
195   op->cancellable = 0;
196   op->cancelled = 0;
197   op->param = NULL;
198   op->result = NULL;
199   op->finished = 0;
200   op->imap = NULL;
201   op->nntp = NULL;
202
203   r = pthread_mutex_init(&op->lock, NULL);
204   if (r != 0)
205     goto free;
206   
207   return op;
208   
209  free:
210   free(op);
211  err:
212   return NULL;
213 }
214
215 void etpan_thread_op_free(struct etpan_thread_op * op)
216 {
217   pthread_mutex_destroy(&op->lock);
218   free(op);
219 }
220
221 static struct etpan_thread *
222 etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)
223 {
224   struct etpan_thread * thread;
225   int r;
226   
227   thread = etpan_thread_new();
228   if (thread == NULL)
229     goto err;
230   
231   thread->manager = manager;
232   
233   r = etpan_thread_start(thread);
234   if (r != NO_ERROR)
235     goto free_thread;
236   
237   r = carray_add(manager->thread_pool, thread, NULL);
238   if (r < 0) {
239     etpan_thread_stop(thread);
240     goto free_thread;
241   }
242   
243   return thread;
244   
245  free_thread:
246   etpan_thread_free(thread);
247  err:
248   return NULL;
249 }
250
251 static void
252 etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,
253     struct etpan_thread * thread)
254 {
255   unsigned int i;
256   int r;
257   
258   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
259     if (carray_get(manager->thread_pool, i) == thread) {
260       carray_delete(manager->thread_pool, i);
261       break;
262     }
263   }
264   
265   if (!etpan_thread_is_bound(thread))
266     manager->unbound_count --;
267   
268   r = carray_add(manager->thread_pending, thread, NULL);
269   if (r < 0) {
270     g_warning("complete failure of thread due to lack of memory (thread stop)");
271   }
272   
273   etpan_thread_stop(thread);
274 }
275
276 static void manager_notify(struct etpan_thread_manager * manager)
277 {
278   char ch;
279   ssize_t r;
280   
281   ch = 1;
282   r = write(manager->notify_fds[1], &ch, 1);
283 }
284
285 static void manager_ack(struct etpan_thread_manager * manager)
286 {
287 #ifndef G_OS_WIN32
288   char ch;
289   ssize_t r;
290   r = read(manager->notify_fds[0], &ch, 1);
291 #else
292   /* done in the GIOChannel handler in imap-thread.c and nntp-thread.c */
293 #endif
294 }
295
296 static void thread_lock(struct etpan_thread * thread)
297 {
298   pthread_mutex_lock(&thread->lock);
299 }
300
301 static void thread_unlock(struct etpan_thread * thread)
302 {
303   pthread_mutex_unlock(&thread->lock);
304 }
305
306 static void thread_notify(struct etpan_thread * thread)
307 {
308   manager_notify(thread->manager);
309 }
310
311 static void * thread_run(void * data)
312 {
313   struct etpan_thread * thread;
314   int r;
315   
316   thread = data;
317   
318   mailsem_up(thread->start_sem);
319   
320   while (1) {
321     int do_quit;
322     struct etpan_thread_op * op;
323     
324     mailsem_down(thread->op_sem);
325     
326     do_quit = 0;
327     op = NULL;
328     thread_lock(thread);
329     if (carray_count(thread->op_list) > 0) {
330       op = carray_get(thread->op_list, 0);
331       carray_delete_slow(thread->op_list, 0);
332     }
333     else {
334       do_quit = 1;
335     }
336     thread_unlock(thread);
337     
338     if (do_quit) {
339       break;
340     }
341     
342     if (!etpan_thread_op_cancelled(op)) {
343       if (op->run != NULL)
344         op->run(op);
345     }
346     
347     thread_lock(thread);
348     r = carray_add(thread->op_done_list, op, NULL);
349     if (r < 0) {
350       g_warning("complete failure of thread due to lack of memory (op done)");
351     }
352     thread_unlock(thread);
353     
354     thread_notify(thread);
355   }
356   
357   thread_lock(thread);
358   thread->terminate_state = TERMINATE_STATE_DONE;
359   thread_unlock(thread);
360   
361   thread_notify(thread);
362   
363   mailsem_up(thread->stop_sem);
364   
365   return NULL;
366 }
367
368 static int etpan_thread_start(struct etpan_thread * thread)
369 {
370   int r;
371   
372   r = pthread_create(&thread->th_id, NULL, thread_run, thread);
373   if (r != 0)
374     return ERROR_MEMORY;
375   
376   mailsem_down(thread->start_sem);
377   
378   return NO_ERROR;
379 }
380
381 static void etpan_thread_stop(struct etpan_thread * thread)
382 {
383   thread_lock(thread);
384   thread->terminate_state = TERMINATE_STATE_REQUESTED;
385   thread_unlock(thread);
386   
387   mailsem_up(thread->op_sem);
388   
389   /* this thread will be joined in the manager loop */
390 }
391
392 static int etpan_thread_is_stopped(struct etpan_thread * thread)
393 {
394   int stopped;
395   
396   thread_lock(thread);
397   stopped = (thread->terminate_state == TERMINATE_STATE_DONE);
398   thread_unlock(thread);
399   
400   return stopped;
401 }
402
403 static void etpan_thread_join(struct etpan_thread * thread)
404 {
405   mailsem_down(thread->stop_sem);
406   pthread_join(thread->th_id, NULL);
407 }
408
409 struct etpan_thread *
410 etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)
411 {
412   struct etpan_thread * chosen_thread;
413   unsigned int chosen_thread_load;
414   unsigned int i;
415   struct etpan_thread * thread;
416   
417   /* chose a thread */
418   
419   chosen_thread = NULL;
420   chosen_thread_load = 0;
421   
422   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
423     thread = carray_get(manager->thread_pool, i);
424     if (etpan_thread_is_bound(thread))
425       continue;
426     
427     if (chosen_thread == NULL) {
428       chosen_thread = thread;
429       chosen_thread_load = etpan_thread_get_load(thread);
430       
431       if (chosen_thread_load == 0)
432         break;
433     }
434     else {
435       unsigned int load;
436       
437       load = etpan_thread_get_load(thread);
438       
439       if (load < chosen_thread_load) {
440         chosen_thread = thread;
441         chosen_thread_load = load;
442       }
443     }
444   }
445   
446   if (chosen_thread != NULL) {
447     if (manager->can_create_thread && (chosen_thread_load != 0)) {
448       chosen_thread = NULL;
449     }
450   }
451   
452   /* choice done */
453   
454   if (chosen_thread != NULL)
455     return chosen_thread;
456   
457   thread = etpan_thread_manager_create_thread(manager);
458   if (thread == NULL)
459     goto err;
460   
461   manager->unbound_count ++;
462   if (manager->unbound_count >= POOL_UNBOUND_MAX)
463     manager->can_create_thread = 0;
464   
465   return thread;
466   
467  err:
468   return NULL;
469 }
470
471 static unsigned int etpan_thread_get_load(struct etpan_thread * thread)
472 {
473   unsigned int load;
474   
475   thread_lock(thread);
476   load = carray_count(thread->op_list);
477   thread_unlock(thread);
478   
479   return load;
480 }
481
482 #if 0
483 static void etpan_thread_bind(struct etpan_thread * thread)
484 {
485   thread->bound_count ++;
486 }
487 #endif
488
489 void etpan_thread_unbind(struct etpan_thread * thread)
490 {
491   thread->bound_count --;
492 }
493
494 static int etpan_thread_is_bound(struct etpan_thread * thread)
495 {
496   return (thread->bound_count != 0);
497 }
498
499 int etpan_thread_op_schedule(struct etpan_thread * thread,
500                              struct etpan_thread_op * op)
501 {
502   int r;
503   
504   if (thread->terminate_state != TERMINATE_STATE_NONE)
505     return ERROR_INVAL;
506   
507   thread_lock(thread);
508   r = carray_add(thread->op_list, op, NULL);
509   thread_unlock(thread);
510   
511   if (r < 0)
512     return ERROR_MEMORY;
513   
514   op->thread = thread;
515   mailsem_up(thread->op_sem);
516   
517   return NO_ERROR;
518 }
519
520 static void etpan_thread_op_lock(struct etpan_thread_op * op)
521 {
522   pthread_mutex_lock(&op->lock);
523 }
524
525 static void etpan_thread_op_unlock(struct etpan_thread_op * op)
526 {
527   pthread_mutex_unlock(&op->lock);
528 }
529
530 static int etpan_thread_op_cancelled(struct etpan_thread_op * op)
531 {
532   int cancelled;
533   
534   cancelled = 0;
535   etpan_thread_op_lock(op);
536   if (op->cancellable)
537     cancelled = op->cancelled;
538   etpan_thread_op_unlock(op);
539   
540   return cancelled;
541 }
542
543 int etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)
544 {
545   return manager->notify_fds[0];
546 }
547
548 static void loop_thread_list(carray * op_to_notify,
549     carray * thread_list)
550 {
551   unsigned int i;
552   int r;
553   
554   for(i = 0 ; i < carray_count(thread_list) ; i ++) {
555     struct etpan_thread * thread;
556     unsigned int j;
557     
558     thread = carray_get(thread_list, i);
559     
560     thread_lock(thread);
561     
562     for(j = 0 ; j < carray_count(thread->op_done_list) ; j ++) {
563       struct etpan_thread_op * op;
564       
565       op = carray_get(thread->op_done_list, j);
566       r = carray_add(op_to_notify, op, NULL);
567       if (r < 0) {
568         g_warning("complete failure of thread due to lack of memory (callback)");
569         break;
570       }
571     }
572     carray_set_size(thread->op_done_list, 0);
573     
574     thread_unlock(thread);
575   }
576 }
577
578 void etpan_thread_manager_loop(struct etpan_thread_manager * manager)
579 {
580   carray * op_to_notify;
581   unsigned int i;
582   
583   manager_ack(manager);
584   
585   op_to_notify = carray_new(OP_INIT_SIZE);
586   
587   loop_thread_list(op_to_notify, manager->thread_pool);
588   loop_thread_list(op_to_notify, manager->thread_pending);
589   
590   for(i = 0 ; i < carray_count(op_to_notify) ; i ++) {
591     struct etpan_thread_op * op;
592     int cancelled;
593     
594     op = carray_get(op_to_notify, i);
595     
596     cancelled = etpan_thread_op_cancelled(op);
597     
598     etpan_thread_op_lock(op);
599     
600     if (!op->callback_called) {
601       if (op->callback != NULL)
602         op->callback(op->cancelled, op->result, op->callback_data);
603     }
604     
605     etpan_thread_op_unlock(op);
606     
607     if (op->cleanup != NULL)
608       op->cleanup(op);
609   }
610   
611   carray_free(op_to_notify);
612   
613   i = 0;
614   while (i < carray_count(manager->thread_pending)) {
615     struct etpan_thread * thread;
616     
617     thread = carray_get(manager->thread_pending, i);
618     
619     if (etpan_thread_is_stopped(thread)) {
620       etpan_thread_join(thread);
621       
622       etpan_thread_free(thread);
623       
624       carray_delete_slow(manager->thread_pending, i);
625     }
626     else {
627       i ++;
628     }
629   }
630 }
631
632 #if 0
633 static void etpan_thread_manager_start(struct etpan_thread_manager * manager)
634 {
635   /* do nothing */
636 }
637 #endif
638
639 void etpan_thread_manager_stop(struct etpan_thread_manager * manager)
640 {
641   while (carray_count(manager->thread_pool) > 0) {
642     struct etpan_thread * thread;
643     
644     thread = carray_get(manager->thread_pool, 0);
645     etpan_thread_manager_terminate_thread(manager, thread);
646   }
647 }
648
649 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)
650 {
651   return ((carray_count(manager->thread_pending) == 0) && 
652       (carray_count(manager->thread_pool) == 0));
653 }
654
655 void etpan_thread_manager_join(struct etpan_thread_manager * manager)
656 {
657   while (!etpan_thread_manager_is_stopped(manager)) {
658     etpan_thread_manager_loop(manager);
659   }
660 }
661 #endif