2007-01-21 [colin] 2.7.1cvs44
[claws.git] / src / etpan / etpan-thread-manager.c
1 /*
2  * Claws Mail -- a GTK+ based, lightweight, and fast e-mail client
3  * Copyright (C) 2005-2007 DINH Viet Hoa and the Claws Mail team
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #  include "config.h"
22 #endif
23
24 #ifdef HAVE_LIBETPAN
25
26 #include "etpan-thread-manager.h"
27
28 #include <glib.h>
29 #include <stdlib.h>
30 #include <pthread.h>
31 #include <libetpan/mailsem.h>
32 #include <semaphore.h>
33 #include <unistd.h>
34
35 #include "etpan-errors.h"
36
37 #define POOL_UNBOUND_MAX 4
38
39 #define POOL_INIT_SIZE 8
40 #define OP_INIT_SIZE 8
41
42 static void etpan_thread_free(struct etpan_thread * thread);
43 static unsigned int etpan_thread_get_load(struct etpan_thread * thread);
44 static int etpan_thread_is_bound(struct etpan_thread * thread);
45 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager);
46 static void etpan_thread_join(struct etpan_thread * thread);
47 static struct etpan_thread * etpan_thread_new(void);
48 static int etpan_thread_op_cancelled(struct etpan_thread_op * op);
49 static void etpan_thread_op_lock(struct etpan_thread_op * op);
50 static void etpan_thread_op_unlock(struct etpan_thread_op * op);
51 static void etpan_thread_stop(struct etpan_thread * thread);
52
53 #if 0
54 static void etpan_thread_bind(struct etpan_thread * thread);
55 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
56              struct etpan_thread_op * op);
57 static void etpan_thread_manager_start(struct etpan_thread_manager * manager);
58 static void etpan_thread_op_cancel(struct etpan_thread_op * op);
59 #endif
60
61 enum {
62   TERMINATE_STATE_NONE,
63   TERMINATE_STATE_REQUESTED,
64   TERMINATE_STATE_DONE,
65 };
66
67 struct etpan_thread_manager * etpan_thread_manager_new(void)
68 {
69   struct etpan_thread_manager * manager;
70   int r;
71   
72   manager = malloc(sizeof(* manager));
73   if (manager == NULL)
74     goto err;
75   
76   manager->thread_pool = carray_new(POOL_INIT_SIZE);
77   if (manager->thread_pool == NULL)
78     goto free;
79
80   manager->thread_pending = carray_new(POOL_INIT_SIZE);
81   if (manager->thread_pending == NULL)
82     goto free_pool;
83   
84   manager->can_create_thread = 1;
85   manager->unbound_count = 0;
86   
87   r = pipe(manager->notify_fds);
88   if (r < 0)
89     goto free_pending;
90   
91   return manager;
92   
93  free_pending:
94   carray_free(manager->thread_pending);
95  free_pool:
96   carray_free(manager->thread_pool);
97  free:
98   free(manager);
99  err:
100   return NULL;
101 }
102
103 void etpan_thread_manager_free(struct etpan_thread_manager * manager)
104 {
105   close(manager->notify_fds[1]);
106   close(manager->notify_fds[0]);
107   carray_free(manager->thread_pending);
108   carray_free(manager->thread_pool);
109   free(manager);
110 }
111
112 static struct etpan_thread * etpan_thread_new(void)
113 {
114   struct etpan_thread * thread;
115   int r;
116   
117   thread = malloc(sizeof(* thread));
118   if (thread == NULL)
119     goto err;
120   
121   r = pthread_mutex_init(&thread->lock, NULL);
122   if (r != 0)
123     goto free;
124
125   thread->op_list = carray_new(OP_INIT_SIZE);
126   if (thread->op_list == NULL)
127     goto destroy_lock;
128   
129   thread->op_done_list = carray_new(OP_INIT_SIZE);
130   if (thread->op_done_list == NULL)
131     goto free_op_list;
132   
133   thread->start_sem = mailsem_new();
134   if (thread->start_sem == NULL)
135     goto free_op_done_list;
136   
137   thread->stop_sem = mailsem_new();
138   if (thread->stop_sem == NULL)
139     goto free_startsem;
140   
141   thread->op_sem = mailsem_new();
142   if (thread->op_sem == NULL)
143     goto free_stopsem;
144   
145   thread->manager = NULL;
146   thread->bound_count = 0;
147   thread->terminate_state = TERMINATE_STATE_NONE;
148   
149   return thread;
150   
151  free_stopsem:
152   mailsem_free(thread->stop_sem);
153  free_startsem:
154   mailsem_free(thread->start_sem);
155  free_op_done_list:
156   carray_free(thread->op_done_list);
157  free_op_list:
158   carray_free(thread->op_list);
159  destroy_lock:
160   pthread_mutex_destroy(&thread->lock);
161  free:
162   free(thread);
163  err:
164   return NULL;
165 }
166
167 static void etpan_thread_free(struct etpan_thread * thread)
168 {
169   mailsem_free(thread->op_sem);
170   mailsem_free(thread->stop_sem);
171   mailsem_free(thread->start_sem);
172   carray_free(thread->op_done_list);
173   carray_free(thread->op_list);
174   pthread_mutex_destroy(&thread->lock);
175   free(thread);
176 }
177
178 struct etpan_thread_op * etpan_thread_op_new(void)
179 {
180   struct etpan_thread_op * op;
181   int r;
182   
183   op = malloc(sizeof(* op));
184   if (op == NULL)
185     goto err;
186   
187   op->thread = NULL;
188   op->run = NULL;
189   op->callback = NULL;
190   op->callback_data = NULL;
191   op->callback_called = 0;
192   op->cancellable = 0;
193   op->cancelled = 0;
194   op->param = NULL;
195   op->result = NULL;
196   
197   r = pthread_mutex_init(&op->lock, NULL);
198   if (r != 0)
199     goto free;
200   
201   return op;
202   
203  free:
204   free(op);
205  err:
206   return NULL;
207 }
208
209 void etpan_thread_op_free(struct etpan_thread_op * op)
210 {
211   pthread_mutex_destroy(&op->lock);
212   free(op);
213 }
214
215 static struct etpan_thread *
216 etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)
217 {
218   struct etpan_thread * thread;
219   int r;
220   
221   thread = etpan_thread_new();
222   if (thread == NULL)
223     goto err;
224   
225   thread->manager = manager;
226   
227   r = etpan_thread_start(thread);
228   if (r != NO_ERROR)
229     goto free_thread;
230   
231   r = carray_add(manager->thread_pool, thread, NULL);
232   if (r < 0) {
233     etpan_thread_stop(thread);
234     goto free_thread;
235   }
236   
237   return thread;
238   
239  free_thread:
240   etpan_thread_free(thread);
241  err:
242   return NULL;
243 }
244
245 static void
246 etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,
247     struct etpan_thread * thread)
248 {
249   unsigned int i;
250   int r;
251   
252   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
253     if (carray_get(manager->thread_pool, i) == thread) {
254       carray_delete(manager->thread_pool, i);
255       break;
256     }
257   }
258   
259   if (!etpan_thread_is_bound(thread))
260     manager->unbound_count --;
261   
262   r = carray_add(manager->thread_pending, thread, NULL);
263   if (r < 0) {
264     g_warning("complete failure of thread due to lack of memory (thread stop)");
265   }
266   
267   etpan_thread_stop(thread);
268 }
269
270 static void manager_notify(struct etpan_thread_manager * manager)
271 {
272   char ch;
273   ssize_t r;
274   
275   ch = 1;
276   r = write(manager->notify_fds[1], &ch, 1);
277 }
278
279 static void manager_ack(struct etpan_thread_manager * manager)
280 {
281   char ch;
282   ssize_t r;
283   
284   r = read(manager->notify_fds[0], &ch, 1);
285 }
286
287 static void thread_lock(struct etpan_thread * thread)
288 {
289   pthread_mutex_lock(&thread->lock);
290 }
291
292 static void thread_unlock(struct etpan_thread * thread)
293 {
294   pthread_mutex_unlock(&thread->lock);
295 }
296
297 static void thread_notify(struct etpan_thread * thread)
298 {
299   manager_notify(thread->manager);
300 }
301
302 static void * thread_run(void * data)
303 {
304   struct etpan_thread * thread;
305   int r;
306   
307   thread = data;
308   
309   mailsem_up(thread->start_sem);
310   
311   while (1) {
312     int do_quit;
313     struct etpan_thread_op * op;
314     
315     mailsem_down(thread->op_sem);
316     
317     do_quit = 0;
318     op = NULL;
319     thread_lock(thread);
320     if (carray_count(thread->op_list) > 0) {
321       op = carray_get(thread->op_list, 0);
322       carray_delete_slow(thread->op_list, 0);
323     }
324     else {
325       do_quit = 1;
326     }
327     thread_unlock(thread);
328     
329     if (do_quit) {
330       break;
331     }
332     
333     if (!etpan_thread_op_cancelled(op)) {
334       if (op->run != NULL)
335         op->run(op);
336     }
337     
338     thread_lock(thread);
339     r = carray_add(thread->op_done_list, op, NULL);
340     if (r < 0) {
341       g_warning("complete failure of thread due to lack of memory (op done)");
342     }
343     thread_unlock(thread);
344     
345     thread_notify(thread);
346   }
347   
348   thread_lock(thread);
349   thread->terminate_state = TERMINATE_STATE_DONE;
350   thread_unlock(thread);
351   
352   thread_notify(thread);
353   
354   mailsem_up(thread->stop_sem);
355   
356   return NULL;
357 }
358
359 int etpan_thread_start(struct etpan_thread * thread)
360 {
361   int r;
362   
363   r = pthread_create(&thread->th_id, NULL, thread_run, thread);
364   if (r != 0)
365     return ERROR_MEMORY;
366   
367   mailsem_down(thread->start_sem);
368   
369   return NO_ERROR;
370 }
371
372 static void etpan_thread_stop(struct etpan_thread * thread)
373 {
374   thread_lock(thread);
375   thread->terminate_state = TERMINATE_STATE_REQUESTED;
376   thread_unlock(thread);
377   
378   mailsem_up(thread->op_sem);
379   
380   /* this thread will be joined in the manager loop */
381 }
382
383 int etpan_thread_is_stopped(struct etpan_thread * thread)
384 {
385   int stopped;
386   
387   thread_lock(thread);
388   stopped = (thread->terminate_state == TERMINATE_STATE_DONE);
389   thread_unlock(thread);
390   
391   return stopped;
392 }
393
394 static void etpan_thread_join(struct etpan_thread * thread)
395 {
396   mailsem_down(thread->stop_sem);
397   pthread_join(thread->th_id, NULL);
398 }
399
400 struct etpan_thread *
401 etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)
402 {
403   struct etpan_thread * chosen_thread;
404   unsigned int chosen_thread_load;
405   unsigned int i;
406   struct etpan_thread * thread;
407   
408   /* chose a thread */
409   
410   chosen_thread = NULL;
411   chosen_thread_load = 0;
412   
413   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
414     thread = carray_get(manager->thread_pool, i);
415     if (etpan_thread_is_bound(thread))
416       continue;
417     
418     if (chosen_thread == NULL) {
419       chosen_thread = thread;
420       chosen_thread_load = etpan_thread_get_load(thread);
421       
422       if (chosen_thread_load == 0)
423         break;
424     }
425     else {
426       unsigned int load;
427       
428       load = etpan_thread_get_load(thread);
429       
430       if (load < chosen_thread_load) {
431         chosen_thread = thread;
432         chosen_thread_load = load;
433       }
434     }
435   }
436   
437   if (chosen_thread != NULL) {
438     if (manager->can_create_thread && (chosen_thread_load != 0)) {
439       chosen_thread = NULL;
440     }
441   }
442   
443   /* choice done */
444   
445   if (chosen_thread != NULL)
446     return chosen_thread;
447   
448   thread = etpan_thread_manager_create_thread(manager);
449   if (thread == NULL)
450     goto err;
451   
452   manager->unbound_count ++;
453   if (manager->unbound_count >= POOL_UNBOUND_MAX)
454     manager->can_create_thread = 0;
455   
456   return thread;
457   
458  err:
459   return NULL;
460 }
461
462 static unsigned int etpan_thread_get_load(struct etpan_thread * thread)
463 {
464   unsigned int load;
465   
466   thread_lock(thread);
467   load = carray_count(thread->op_list);
468   thread_unlock(thread);
469   
470   return load;
471 }
472
473 #if 0
474 static void etpan_thread_bind(struct etpan_thread * thread)
475 {
476   thread->bound_count ++;
477 }
478 #endif
479
480 void etpan_thread_unbind(struct etpan_thread * thread)
481 {
482   thread->bound_count --;
483 }
484
485 static int etpan_thread_is_bound(struct etpan_thread * thread)
486 {
487   return (thread->bound_count != 0);
488 }
489
490 int etpan_thread_op_schedule(struct etpan_thread * thread,
491                              struct etpan_thread_op * op)
492 {
493   int r;
494   
495   if (thread->terminate_state != TERMINATE_STATE_NONE)
496     return ERROR_INVAL;
497   
498   thread_lock(thread);
499   r = carray_add(thread->op_list, op, NULL);
500   thread_unlock(thread);
501   
502   if (r < 0)
503     return ERROR_MEMORY;
504   
505   op->thread = thread;
506   mailsem_up(thread->op_sem);
507   
508   return NO_ERROR;
509 }
510
511 static void etpan_thread_op_lock(struct etpan_thread_op * op)
512 {
513   pthread_mutex_lock(&op->lock);
514 }
515
516 static void etpan_thread_op_unlock(struct etpan_thread_op * op)
517 {
518   pthread_mutex_unlock(&op->lock);
519 }
520
521 static int etpan_thread_op_cancelled(struct etpan_thread_op * op)
522 {
523   int cancelled;
524   
525   cancelled = 0;
526   etpan_thread_op_lock(op);
527   if (op->cancellable)
528     cancelled = op->cancelled;
529   etpan_thread_op_unlock(op);
530   
531   return cancelled;
532 }
533
534 #if 0
535 static void etpan_thread_op_cancel(struct etpan_thread_op * op)
536
537   etpan_thread_op_lock(op);
538   if (op->cancelled) {
539     g_warning("cancelled twice");
540   }
541   op->cancelled = 1;
542   if ((op->callback != NULL) && (!op->callback_called)) {
543     op->callback(op->cancelled, op->result, op->callback_data);
544     op->callback_called = 1;
545   }
546   etpan_thread_op_unlock(op);
547 }
548 #endif
549
550 #if 0
551 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
552     struct etpan_thread_op * op)
553 {
554   struct etpan_thread * thread;
555   
556   thread = etpan_thread_manager_get_thread(manager);
557   
558   if (thread == NULL)
559     goto err;
560   
561   return etpan_thread_op_schedule(thread, op);
562   
563  err:
564   return ERROR_MEMORY;
565 }
566 #endif
567
568 int etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)
569 {
570   return manager->notify_fds[0];
571 }
572
573 static void loop_thread_list(carray * op_to_notify,
574     carray * thread_list)
575 {
576   unsigned int i;
577   int r;
578   
579   for(i = 0 ; i < carray_count(thread_list) ; i ++) {
580     struct etpan_thread * thread;
581     unsigned int j;
582     
583     thread = carray_get(thread_list, i);
584     
585     thread_lock(thread);
586     
587     for(j = 0 ; j < carray_count(thread->op_done_list) ; j ++) {
588       struct etpan_thread_op * op;
589       
590       op = carray_get(thread->op_done_list, j);
591       r = carray_add(op_to_notify, op, NULL);
592       if (r < 0) {
593         g_warning("complete failure of thread due to lack of memory (callback)");
594         break;
595       }
596     }
597     carray_set_size(thread->op_done_list, 0);
598     
599     thread_unlock(thread);
600   }
601 }
602
603 void etpan_thread_manager_loop(struct etpan_thread_manager * manager)
604 {
605   carray * op_to_notify;
606   unsigned int i;
607   
608   manager_ack(manager);
609   
610   op_to_notify = carray_new(OP_INIT_SIZE);
611   
612   loop_thread_list(op_to_notify, manager->thread_pool);
613   loop_thread_list(op_to_notify, manager->thread_pending);
614   
615   for(i = 0 ; i < carray_count(op_to_notify) ; i ++) {
616     struct etpan_thread_op * op;
617     int cancelled;
618     
619     op = carray_get(op_to_notify, i);
620     
621     cancelled = etpan_thread_op_cancelled(op);
622     
623     etpan_thread_op_lock(op);
624     
625     if (!op->callback_called) {
626       if (op->callback != NULL)
627         op->callback(op->cancelled, op->result, op->callback_data);
628     }
629     
630     etpan_thread_op_unlock(op);
631     
632     if (op->cleanup != NULL)
633       op->cleanup(op);
634   }
635   
636   carray_free(op_to_notify);
637   
638   i = 0;
639   while (i < carray_count(manager->thread_pending)) {
640     struct etpan_thread * thread;
641     
642     thread = carray_get(manager->thread_pending, i);
643     
644     if (etpan_thread_is_stopped(thread)) {
645       etpan_thread_join(thread);
646       
647       etpan_thread_free(thread);
648       
649       carray_delete_slow(manager->thread_pending, i);
650     }
651     else {
652       i ++;
653     }
654   }
655 }
656
657 #if 0
658 static void etpan_thread_manager_start(struct etpan_thread_manager * manager)
659 {
660   /* do nothing */
661 }
662 #endif
663
664 void etpan_thread_manager_stop(struct etpan_thread_manager * manager)
665 {
666   while (carray_count(manager->thread_pool) > 0) {
667     struct etpan_thread * thread;
668     
669     thread = carray_get(manager->thread_pool, 0);
670     etpan_thread_manager_terminate_thread(manager, thread);
671   }
672 }
673
674 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)
675 {
676   return ((carray_count(manager->thread_pending) == 0) && 
677       (carray_count(manager->thread_pool) == 0));
678 }
679
680 void etpan_thread_manager_join(struct etpan_thread_manager * manager)
681 {
682   while (!etpan_thread_manager_is_stopped(manager)) {
683     etpan_thread_manager_loop(manager);
684   }
685 }
686 #endif