2008-09-01 [colin] 3.5.0cvs88
[claws.git] / src / etpan / etpan-thread-manager.c
1 /*
2  * Claws Mail -- a GTK+ based, lightweight, and fast e-mail client
3  * Copyright (C) 2005-2007 DINH Viet Hoa and the Claws Mail team
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  * 
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #  include "config.h"
22 #endif
23
24 #ifdef HAVE_LIBETPAN
25
26 #include "etpan-thread-manager.h"
27
28 #include <glib.h>
29 #include <stdlib.h>
30 #include <pthread.h>
31 #include <libetpan/mailsem.h>
32 #include <semaphore.h>
33 #include <unistd.h>
34 #include <fcntl.h>
35
36 #include "etpan-errors.h"
37
38 #define POOL_UNBOUND_MAX 4
39
40 #define POOL_INIT_SIZE 8
41 #define OP_INIT_SIZE 8
42
43 static int etpan_thread_start(struct etpan_thread * thread);
44 static void etpan_thread_free(struct etpan_thread * thread);
45 static unsigned int etpan_thread_get_load(struct etpan_thread * thread);
46 static int etpan_thread_is_bound(struct etpan_thread * thread);
47 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager);
48 static void etpan_thread_join(struct etpan_thread * thread);
49 static struct etpan_thread * etpan_thread_new(void);
50 static int etpan_thread_op_cancelled(struct etpan_thread_op * op);
51 static void etpan_thread_op_lock(struct etpan_thread_op * op);
52 static void etpan_thread_op_unlock(struct etpan_thread_op * op);
53 static void etpan_thread_stop(struct etpan_thread * thread);
54
55 #if 0
56 static void etpan_thread_bind(struct etpan_thread * thread);
57 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
58              struct etpan_thread_op * op);
59 static void etpan_thread_manager_start(struct etpan_thread_manager * manager);
60 static void etpan_thread_op_cancel(struct etpan_thread_op * op);
61 #endif
62
63 enum {
64   TERMINATE_STATE_NONE,
65   TERMINATE_STATE_REQUESTED,
66   TERMINATE_STATE_DONE,
67 };
68
69 struct etpan_thread_manager * etpan_thread_manager_new(void)
70 {
71   struct etpan_thread_manager * manager;
72   int r;
73   
74   manager = malloc(sizeof(* manager));
75   if (manager == NULL)
76     goto err;
77   
78   manager->thread_pool = carray_new(POOL_INIT_SIZE);
79   if (manager->thread_pool == NULL)
80     goto free;
81
82   manager->thread_pending = carray_new(POOL_INIT_SIZE);
83   if (manager->thread_pending == NULL)
84     goto free_pool;
85   
86   manager->can_create_thread = 1;
87   manager->unbound_count = 0;
88   
89   r = pipe(manager->notify_fds);
90   if (r < 0)
91     goto free_pending;
92   
93   return manager;
94   
95  free_pending:
96   carray_free(manager->thread_pending);
97  free_pool:
98   carray_free(manager->thread_pool);
99  free:
100   free(manager);
101  err:
102   return NULL;
103 }
104
105 void etpan_thread_manager_free(struct etpan_thread_manager * manager)
106 {
107   close(manager->notify_fds[1]);
108   close(manager->notify_fds[0]);
109   carray_free(manager->thread_pending);
110   carray_free(manager->thread_pool);
111   free(manager);
112 }
113
114 static struct etpan_thread * etpan_thread_new(void)
115 {
116   struct etpan_thread * thread;
117   int r;
118   
119   thread = malloc(sizeof(* thread));
120   if (thread == NULL)
121     goto err;
122   
123   r = pthread_mutex_init(&thread->lock, NULL);
124   if (r != 0)
125     goto free;
126
127   thread->op_list = carray_new(OP_INIT_SIZE);
128   if (thread->op_list == NULL)
129     goto destroy_lock;
130   
131   thread->op_done_list = carray_new(OP_INIT_SIZE);
132   if (thread->op_done_list == NULL)
133     goto free_op_list;
134   
135   thread->start_sem = mailsem_new();
136   if (thread->start_sem == NULL)
137     goto free_op_done_list;
138   
139   thread->stop_sem = mailsem_new();
140   if (thread->stop_sem == NULL)
141     goto free_startsem;
142   
143   thread->op_sem = mailsem_new();
144   if (thread->op_sem == NULL)
145     goto free_stopsem;
146   
147   thread->manager = NULL;
148   thread->bound_count = 0;
149   thread->terminate_state = TERMINATE_STATE_NONE;
150   
151   return thread;
152   
153  free_stopsem:
154   mailsem_free(thread->stop_sem);
155  free_startsem:
156   mailsem_free(thread->start_sem);
157  free_op_done_list:
158   carray_free(thread->op_done_list);
159  free_op_list:
160   carray_free(thread->op_list);
161  destroy_lock:
162   pthread_mutex_destroy(&thread->lock);
163  free:
164   free(thread);
165  err:
166   return NULL;
167 }
168
169 static void etpan_thread_free(struct etpan_thread * thread)
170 {
171   mailsem_free(thread->op_sem);
172   mailsem_free(thread->stop_sem);
173   mailsem_free(thread->start_sem);
174   carray_free(thread->op_done_list);
175   carray_free(thread->op_list);
176   pthread_mutex_destroy(&thread->lock);
177   free(thread);
178 }
179
180 struct etpan_thread_op * etpan_thread_op_new(void)
181 {
182   struct etpan_thread_op * op;
183   int r;
184   
185   op = malloc(sizeof(* op));
186   if (op == NULL)
187     goto err;
188   
189   op->thread = NULL;
190   op->run = NULL;
191   op->callback = NULL;
192   op->callback_data = NULL;
193   op->callback_called = 0;
194   op->cancellable = 0;
195   op->cancelled = 0;
196   op->param = NULL;
197   op->result = NULL;
198   op->finished = 0;
199   op->imap = NULL;
200   op->nntp = NULL;
201
202   r = pthread_mutex_init(&op->lock, NULL);
203   if (r != 0)
204     goto free;
205   
206   return op;
207   
208  free:
209   free(op);
210  err:
211   return NULL;
212 }
213
214 void etpan_thread_op_free(struct etpan_thread_op * op)
215 {
216   pthread_mutex_destroy(&op->lock);
217   free(op);
218 }
219
220 static struct etpan_thread *
221 etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)
222 {
223   struct etpan_thread * thread;
224   int r;
225   
226   thread = etpan_thread_new();
227   if (thread == NULL)
228     goto err;
229   
230   thread->manager = manager;
231   
232   r = etpan_thread_start(thread);
233   if (r != NO_ERROR)
234     goto free_thread;
235   
236   r = carray_add(manager->thread_pool, thread, NULL);
237   if (r < 0) {
238     etpan_thread_stop(thread);
239     goto free_thread;
240   }
241   
242   return thread;
243   
244  free_thread:
245   etpan_thread_free(thread);
246  err:
247   return NULL;
248 }
249
250 static void
251 etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,
252     struct etpan_thread * thread)
253 {
254   unsigned int i;
255   int r;
256   
257   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
258     if (carray_get(manager->thread_pool, i) == thread) {
259       carray_delete(manager->thread_pool, i);
260       break;
261     }
262   }
263   
264   if (!etpan_thread_is_bound(thread))
265     manager->unbound_count --;
266   
267   r = carray_add(manager->thread_pending, thread, NULL);
268   if (r < 0) {
269     g_warning("complete failure of thread due to lack of memory (thread stop)");
270   }
271   
272   etpan_thread_stop(thread);
273 }
274
275 static void manager_notify(struct etpan_thread_manager * manager)
276 {
277   char ch;
278   ssize_t r;
279   
280   ch = 1;
281   r = write(manager->notify_fds[1], &ch, 1);
282 }
283
284 static void manager_ack(struct etpan_thread_manager * manager)
285 {
286   char ch;
287   ssize_t r;
288   
289   r = read(manager->notify_fds[0], &ch, 1);
290 }
291
292 static void thread_lock(struct etpan_thread * thread)
293 {
294   pthread_mutex_lock(&thread->lock);
295 }
296
297 static void thread_unlock(struct etpan_thread * thread)
298 {
299   pthread_mutex_unlock(&thread->lock);
300 }
301
302 static void thread_notify(struct etpan_thread * thread)
303 {
304   manager_notify(thread->manager);
305 }
306
307 static void * thread_run(void * data)
308 {
309   struct etpan_thread * thread;
310   int r;
311   
312   thread = data;
313   
314   mailsem_up(thread->start_sem);
315   
316   while (1) {
317     int do_quit;
318     struct etpan_thread_op * op;
319     
320     mailsem_down(thread->op_sem);
321     
322     do_quit = 0;
323     op = NULL;
324     thread_lock(thread);
325     if (carray_count(thread->op_list) > 0) {
326       op = carray_get(thread->op_list, 0);
327       carray_delete_slow(thread->op_list, 0);
328     }
329     else {
330       do_quit = 1;
331     }
332     thread_unlock(thread);
333     
334     if (do_quit) {
335       break;
336     }
337     
338     if (!etpan_thread_op_cancelled(op)) {
339       if (op->run != NULL)
340         op->run(op);
341     }
342     
343     thread_lock(thread);
344     r = carray_add(thread->op_done_list, op, NULL);
345     if (r < 0) {
346       g_warning("complete failure of thread due to lack of memory (op done)");
347     }
348     thread_unlock(thread);
349     
350     thread_notify(thread);
351   }
352   
353   thread_lock(thread);
354   thread->terminate_state = TERMINATE_STATE_DONE;
355   thread_unlock(thread);
356   
357   thread_notify(thread);
358   
359   mailsem_up(thread->stop_sem);
360   
361   return NULL;
362 }
363
364 static int etpan_thread_start(struct etpan_thread * thread)
365 {
366   int r;
367   
368   r = pthread_create(&thread->th_id, NULL, thread_run, thread);
369   if (r != 0)
370     return ERROR_MEMORY;
371   
372   mailsem_down(thread->start_sem);
373   
374   return NO_ERROR;
375 }
376
377 static void etpan_thread_stop(struct etpan_thread * thread)
378 {
379   thread_lock(thread);
380   thread->terminate_state = TERMINATE_STATE_REQUESTED;
381   thread_unlock(thread);
382   
383   mailsem_up(thread->op_sem);
384   
385   /* this thread will be joined in the manager loop */
386 }
387
388 static int etpan_thread_is_stopped(struct etpan_thread * thread)
389 {
390   int stopped;
391   
392   thread_lock(thread);
393   stopped = (thread->terminate_state == TERMINATE_STATE_DONE);
394   thread_unlock(thread);
395   
396   return stopped;
397 }
398
399 static void etpan_thread_join(struct etpan_thread * thread)
400 {
401   mailsem_down(thread->stop_sem);
402   pthread_join(thread->th_id, NULL);
403 }
404
405 struct etpan_thread *
406 etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)
407 {
408   struct etpan_thread * chosen_thread;
409   unsigned int chosen_thread_load;
410   unsigned int i;
411   struct etpan_thread * thread;
412   
413   /* chose a thread */
414   
415   chosen_thread = NULL;
416   chosen_thread_load = 0;
417   
418   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
419     thread = carray_get(manager->thread_pool, i);
420     if (etpan_thread_is_bound(thread))
421       continue;
422     
423     if (chosen_thread == NULL) {
424       chosen_thread = thread;
425       chosen_thread_load = etpan_thread_get_load(thread);
426       
427       if (chosen_thread_load == 0)
428         break;
429     }
430     else {
431       unsigned int load;
432       
433       load = etpan_thread_get_load(thread);
434       
435       if (load < chosen_thread_load) {
436         chosen_thread = thread;
437         chosen_thread_load = load;
438       }
439     }
440   }
441   
442   if (chosen_thread != NULL) {
443     if (manager->can_create_thread && (chosen_thread_load != 0)) {
444       chosen_thread = NULL;
445     }
446   }
447   
448   /* choice done */
449   
450   if (chosen_thread != NULL)
451     return chosen_thread;
452   
453   thread = etpan_thread_manager_create_thread(manager);
454   if (thread == NULL)
455     goto err;
456   
457   manager->unbound_count ++;
458   if (manager->unbound_count >= POOL_UNBOUND_MAX)
459     manager->can_create_thread = 0;
460   
461   return thread;
462   
463  err:
464   return NULL;
465 }
466
467 static unsigned int etpan_thread_get_load(struct etpan_thread * thread)
468 {
469   unsigned int load;
470   
471   thread_lock(thread);
472   load = carray_count(thread->op_list);
473   thread_unlock(thread);
474   
475   return load;
476 }
477
478 #if 0
479 static void etpan_thread_bind(struct etpan_thread * thread)
480 {
481   thread->bound_count ++;
482 }
483 #endif
484
485 void etpan_thread_unbind(struct etpan_thread * thread)
486 {
487   thread->bound_count --;
488 }
489
490 static int etpan_thread_is_bound(struct etpan_thread * thread)
491 {
492   return (thread->bound_count != 0);
493 }
494
495 int etpan_thread_op_schedule(struct etpan_thread * thread,
496                              struct etpan_thread_op * op)
497 {
498   int r;
499   
500   if (thread->terminate_state != TERMINATE_STATE_NONE)
501     return ERROR_INVAL;
502   
503   thread_lock(thread);
504   r = carray_add(thread->op_list, op, NULL);
505   thread_unlock(thread);
506   
507   if (r < 0)
508     return ERROR_MEMORY;
509   
510   op->thread = thread;
511   mailsem_up(thread->op_sem);
512   
513   return NO_ERROR;
514 }
515
516 static void etpan_thread_op_lock(struct etpan_thread_op * op)
517 {
518   pthread_mutex_lock(&op->lock);
519 }
520
521 static void etpan_thread_op_unlock(struct etpan_thread_op * op)
522 {
523   pthread_mutex_unlock(&op->lock);
524 }
525
526 static int etpan_thread_op_cancelled(struct etpan_thread_op * op)
527 {
528   int cancelled;
529   
530   cancelled = 0;
531   etpan_thread_op_lock(op);
532   if (op->cancellable)
533     cancelled = op->cancelled;
534   etpan_thread_op_unlock(op);
535   
536   return cancelled;
537 }
538
539 #if 0
540 static void etpan_thread_op_cancel(struct etpan_thread_op * op)
541
542   etpan_thread_op_lock(op);
543   if (op->cancelled) {
544     g_warning("cancelled twice");
545   }
546   op->cancelled = 1;
547   if ((op->callback != NULL) && (!op->callback_called)) {
548     op->callback(op->cancelled, op->result, op->callback_data);
549     op->callback_called = 1;
550   }
551   etpan_thread_op_unlock(op);
552 }
553 #endif
554
555 #if 0
556 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
557     struct etpan_thread_op * op)
558 {
559   struct etpan_thread * thread;
560   
561   thread = etpan_thread_manager_get_thread(manager);
562   
563   if (thread == NULL)
564     goto err;
565   
566   return etpan_thread_op_schedule(thread, op);
567   
568  err:
569   return ERROR_MEMORY;
570 }
571 #endif
572
573 int etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)
574 {
575   return manager->notify_fds[0];
576 }
577
578 static void loop_thread_list(carray * op_to_notify,
579     carray * thread_list)
580 {
581   unsigned int i;
582   int r;
583   
584   for(i = 0 ; i < carray_count(thread_list) ; i ++) {
585     struct etpan_thread * thread;
586     unsigned int j;
587     
588     thread = carray_get(thread_list, i);
589     
590     thread_lock(thread);
591     
592     for(j = 0 ; j < carray_count(thread->op_done_list) ; j ++) {
593       struct etpan_thread_op * op;
594       
595       op = carray_get(thread->op_done_list, j);
596       r = carray_add(op_to_notify, op, NULL);
597       if (r < 0) {
598         g_warning("complete failure of thread due to lack of memory (callback)");
599         break;
600       }
601     }
602     carray_set_size(thread->op_done_list, 0);
603     
604     thread_unlock(thread);
605   }
606 }
607
608 void etpan_thread_manager_loop(struct etpan_thread_manager * manager)
609 {
610   carray * op_to_notify;
611   unsigned int i;
612   
613   manager_ack(manager);
614   
615   op_to_notify = carray_new(OP_INIT_SIZE);
616   
617   loop_thread_list(op_to_notify, manager->thread_pool);
618   loop_thread_list(op_to_notify, manager->thread_pending);
619   
620   for(i = 0 ; i < carray_count(op_to_notify) ; i ++) {
621     struct etpan_thread_op * op;
622     int cancelled;
623     
624     op = carray_get(op_to_notify, i);
625     
626     cancelled = etpan_thread_op_cancelled(op);
627     
628     etpan_thread_op_lock(op);
629     
630     if (!op->callback_called) {
631       if (op->callback != NULL)
632         op->callback(op->cancelled, op->result, op->callback_data);
633     }
634     
635     etpan_thread_op_unlock(op);
636     
637     if (op->cleanup != NULL)
638       op->cleanup(op);
639   }
640   
641   carray_free(op_to_notify);
642   
643   i = 0;
644   while (i < carray_count(manager->thread_pending)) {
645     struct etpan_thread * thread;
646     
647     thread = carray_get(manager->thread_pending, i);
648     
649     if (etpan_thread_is_stopped(thread)) {
650       etpan_thread_join(thread);
651       
652       etpan_thread_free(thread);
653       
654       carray_delete_slow(manager->thread_pending, i);
655     }
656     else {
657       i ++;
658     }
659   }
660 }
661
662 #if 0
663 static void etpan_thread_manager_start(struct etpan_thread_manager * manager)
664 {
665   /* do nothing */
666 }
667 #endif
668
669 void etpan_thread_manager_stop(struct etpan_thread_manager * manager)
670 {
671   while (carray_count(manager->thread_pool) > 0) {
672     struct etpan_thread * thread;
673     
674     thread = carray_get(manager->thread_pool, 0);
675     etpan_thread_manager_terminate_thread(manager, thread);
676   }
677 }
678
679 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)
680 {
681   return ((carray_count(manager->thread_pending) == 0) && 
682       (carray_count(manager->thread_pool) == 0));
683 }
684
685 void etpan_thread_manager_join(struct etpan_thread_manager * manager)
686 {
687   while (!etpan_thread_manager_is_stopped(manager)) {
688     etpan_thread_manager_loop(manager);
689   }
690 }
691 #endif