7b8dce7bc98c7c71c9374053e2b4c6609878afd9
[claws.git] / src / etpan / etpan-thread-manager.c
1 /*
2  * Claws Mail -- a GTK+ based, lightweight, and fast e-mail client
3  * Copyright (C) 2005-2007 DINH Viet Hoa and the Claws Mail team
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  * 
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #  include "config.h"
22 #endif
23
24 #ifdef HAVE_LIBETPAN
25
26 #include "etpan-thread-manager.h"
27
28 #include <glib.h>
29 #include <stdlib.h>
30 #include <pthread.h>
31 #include <libetpan/mailsem.h>
32 #include <semaphore.h>
33 #include <unistd.h>
34
35 #include "etpan-errors.h"
36
37 #define POOL_UNBOUND_MAX 4
38
39 #define POOL_INIT_SIZE 8
40 #define OP_INIT_SIZE 8
41
42 static int etpan_thread_start(struct etpan_thread * thread);
43 static void etpan_thread_free(struct etpan_thread * thread);
44 static unsigned int etpan_thread_get_load(struct etpan_thread * thread);
45 static int etpan_thread_is_bound(struct etpan_thread * thread);
46 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager);
47 static void etpan_thread_join(struct etpan_thread * thread);
48 static struct etpan_thread * etpan_thread_new(void);
49 static int etpan_thread_op_cancelled(struct etpan_thread_op * op);
50 static void etpan_thread_op_lock(struct etpan_thread_op * op);
51 static void etpan_thread_op_unlock(struct etpan_thread_op * op);
52 static void etpan_thread_stop(struct etpan_thread * thread);
53
54 #if 0
55 static void etpan_thread_bind(struct etpan_thread * thread);
56 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
57              struct etpan_thread_op * op);
58 static void etpan_thread_manager_start(struct etpan_thread_manager * manager);
59 static void etpan_thread_op_cancel(struct etpan_thread_op * op);
60 #endif
61
62 enum {
63   TERMINATE_STATE_NONE,
64   TERMINATE_STATE_REQUESTED,
65   TERMINATE_STATE_DONE,
66 };
67
68 struct etpan_thread_manager * etpan_thread_manager_new(void)
69 {
70   struct etpan_thread_manager * manager;
71   int r;
72   
73   manager = malloc(sizeof(* manager));
74   if (manager == NULL)
75     goto err;
76   
77   manager->thread_pool = carray_new(POOL_INIT_SIZE);
78   if (manager->thread_pool == NULL)
79     goto free;
80
81   manager->thread_pending = carray_new(POOL_INIT_SIZE);
82   if (manager->thread_pending == NULL)
83     goto free_pool;
84   
85   manager->can_create_thread = 1;
86   manager->unbound_count = 0;
87   
88   r = pipe(manager->notify_fds);
89   if (r < 0)
90     goto free_pending;
91   
92   return manager;
93   
94  free_pending:
95   carray_free(manager->thread_pending);
96  free_pool:
97   carray_free(manager->thread_pool);
98  free:
99   free(manager);
100  err:
101   return NULL;
102 }
103
104 void etpan_thread_manager_free(struct etpan_thread_manager * manager)
105 {
106   close(manager->notify_fds[1]);
107   close(manager->notify_fds[0]);
108   carray_free(manager->thread_pending);
109   carray_free(manager->thread_pool);
110   free(manager);
111 }
112
113 static struct etpan_thread * etpan_thread_new(void)
114 {
115   struct etpan_thread * thread;
116   int r;
117   
118   thread = malloc(sizeof(* thread));
119   if (thread == NULL)
120     goto err;
121   
122   r = pthread_mutex_init(&thread->lock, NULL);
123   if (r != 0)
124     goto free;
125
126   thread->op_list = carray_new(OP_INIT_SIZE);
127   if (thread->op_list == NULL)
128     goto destroy_lock;
129   
130   thread->op_done_list = carray_new(OP_INIT_SIZE);
131   if (thread->op_done_list == NULL)
132     goto free_op_list;
133   
134   thread->start_sem = mailsem_new();
135   if (thread->start_sem == NULL)
136     goto free_op_done_list;
137   
138   thread->stop_sem = mailsem_new();
139   if (thread->stop_sem == NULL)
140     goto free_startsem;
141   
142   thread->op_sem = mailsem_new();
143   if (thread->op_sem == NULL)
144     goto free_stopsem;
145   
146   thread->manager = NULL;
147   thread->bound_count = 0;
148   thread->terminate_state = TERMINATE_STATE_NONE;
149   
150   return thread;
151   
152  free_stopsem:
153   mailsem_free(thread->stop_sem);
154  free_startsem:
155   mailsem_free(thread->start_sem);
156  free_op_done_list:
157   carray_free(thread->op_done_list);
158  free_op_list:
159   carray_free(thread->op_list);
160  destroy_lock:
161   pthread_mutex_destroy(&thread->lock);
162  free:
163   free(thread);
164  err:
165   return NULL;
166 }
167
168 static void etpan_thread_free(struct etpan_thread * thread)
169 {
170   mailsem_free(thread->op_sem);
171   mailsem_free(thread->stop_sem);
172   mailsem_free(thread->start_sem);
173   carray_free(thread->op_done_list);
174   carray_free(thread->op_list);
175   pthread_mutex_destroy(&thread->lock);
176   free(thread);
177 }
178
179 struct etpan_thread_op * etpan_thread_op_new(void)
180 {
181   struct etpan_thread_op * op;
182   int r;
183   
184   op = malloc(sizeof(* op));
185   if (op == NULL)
186     goto err;
187   
188   op->thread = NULL;
189   op->run = NULL;
190   op->callback = NULL;
191   op->callback_data = NULL;
192   op->callback_called = 0;
193   op->cancellable = 0;
194   op->cancelled = 0;
195   op->param = NULL;
196   op->result = NULL;
197   op->finished = 0;
198   op->imap = NULL;
199   op->nntp = NULL;
200
201   r = pthread_mutex_init(&op->lock, NULL);
202   if (r != 0)
203     goto free;
204   
205   return op;
206   
207  free:
208   free(op);
209  err:
210   return NULL;
211 }
212
213 void etpan_thread_op_free(struct etpan_thread_op * op)
214 {
215   pthread_mutex_destroy(&op->lock);
216   free(op);
217 }
218
219 static struct etpan_thread *
220 etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)
221 {
222   struct etpan_thread * thread;
223   int r;
224   
225   thread = etpan_thread_new();
226   if (thread == NULL)
227     goto err;
228   
229   thread->manager = manager;
230   
231   r = etpan_thread_start(thread);
232   if (r != NO_ERROR)
233     goto free_thread;
234   
235   r = carray_add(manager->thread_pool, thread, NULL);
236   if (r < 0) {
237     etpan_thread_stop(thread);
238     goto free_thread;
239   }
240   
241   return thread;
242   
243  free_thread:
244   etpan_thread_free(thread);
245  err:
246   return NULL;
247 }
248
249 static void
250 etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,
251     struct etpan_thread * thread)
252 {
253   unsigned int i;
254   int r;
255   
256   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
257     if (carray_get(manager->thread_pool, i) == thread) {
258       carray_delete(manager->thread_pool, i);
259       break;
260     }
261   }
262   
263   if (!etpan_thread_is_bound(thread))
264     manager->unbound_count --;
265   
266   r = carray_add(manager->thread_pending, thread, NULL);
267   if (r < 0) {
268     g_warning("complete failure of thread due to lack of memory (thread stop)");
269   }
270   
271   etpan_thread_stop(thread);
272 }
273
274 static void manager_notify(struct etpan_thread_manager * manager)
275 {
276   char ch;
277   ssize_t r;
278   
279   ch = 1;
280   r = write(manager->notify_fds[1], &ch, 1);
281 }
282
283 static void manager_ack(struct etpan_thread_manager * manager)
284 {
285   char ch;
286   ssize_t r;
287   
288   r = read(manager->notify_fds[0], &ch, 1);
289 }
290
291 static void thread_lock(struct etpan_thread * thread)
292 {
293   pthread_mutex_lock(&thread->lock);
294 }
295
296 static void thread_unlock(struct etpan_thread * thread)
297 {
298   pthread_mutex_unlock(&thread->lock);
299 }
300
301 static void thread_notify(struct etpan_thread * thread)
302 {
303   manager_notify(thread->manager);
304 }
305
306 static void * thread_run(void * data)
307 {
308   struct etpan_thread * thread;
309   int r;
310   
311   thread = data;
312   
313   mailsem_up(thread->start_sem);
314   
315   while (1) {
316     int do_quit;
317     struct etpan_thread_op * op;
318     
319     mailsem_down(thread->op_sem);
320     
321     do_quit = 0;
322     op = NULL;
323     thread_lock(thread);
324     if (carray_count(thread->op_list) > 0) {
325       op = carray_get(thread->op_list, 0);
326       carray_delete_slow(thread->op_list, 0);
327     }
328     else {
329       do_quit = 1;
330     }
331     thread_unlock(thread);
332     
333     if (do_quit) {
334       break;
335     }
336     
337     if (!etpan_thread_op_cancelled(op)) {
338       if (op->run != NULL)
339         op->run(op);
340     }
341     
342     thread_lock(thread);
343     r = carray_add(thread->op_done_list, op, NULL);
344     if (r < 0) {
345       g_warning("complete failure of thread due to lack of memory (op done)");
346     }
347     thread_unlock(thread);
348     
349     thread_notify(thread);
350   }
351   
352   thread_lock(thread);
353   thread->terminate_state = TERMINATE_STATE_DONE;
354   thread_unlock(thread);
355   
356   thread_notify(thread);
357   
358   mailsem_up(thread->stop_sem);
359   
360   return NULL;
361 }
362
363 static int etpan_thread_start(struct etpan_thread * thread)
364 {
365   int r;
366   
367   r = pthread_create(&thread->th_id, NULL, thread_run, thread);
368   if (r != 0)
369     return ERROR_MEMORY;
370   
371   mailsem_down(thread->start_sem);
372   
373   return NO_ERROR;
374 }
375
376 static void etpan_thread_stop(struct etpan_thread * thread)
377 {
378   thread_lock(thread);
379   thread->terminate_state = TERMINATE_STATE_REQUESTED;
380   thread_unlock(thread);
381   
382   mailsem_up(thread->op_sem);
383   
384   /* this thread will be joined in the manager loop */
385 }
386
387 static int etpan_thread_is_stopped(struct etpan_thread * thread)
388 {
389   int stopped;
390   
391   thread_lock(thread);
392   stopped = (thread->terminate_state == TERMINATE_STATE_DONE);
393   thread_unlock(thread);
394   
395   return stopped;
396 }
397
398 static void etpan_thread_join(struct etpan_thread * thread)
399 {
400   mailsem_down(thread->stop_sem);
401   pthread_join(thread->th_id, NULL);
402 }
403
404 struct etpan_thread *
405 etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)
406 {
407   struct etpan_thread * chosen_thread;
408   unsigned int chosen_thread_load;
409   unsigned int i;
410   struct etpan_thread * thread;
411   
412   /* chose a thread */
413   
414   chosen_thread = NULL;
415   chosen_thread_load = 0;
416   
417   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
418     thread = carray_get(manager->thread_pool, i);
419     if (etpan_thread_is_bound(thread))
420       continue;
421     
422     if (chosen_thread == NULL) {
423       chosen_thread = thread;
424       chosen_thread_load = etpan_thread_get_load(thread);
425       
426       if (chosen_thread_load == 0)
427         break;
428     }
429     else {
430       unsigned int load;
431       
432       load = etpan_thread_get_load(thread);
433       
434       if (load < chosen_thread_load) {
435         chosen_thread = thread;
436         chosen_thread_load = load;
437       }
438     }
439   }
440   
441   if (chosen_thread != NULL) {
442     if (manager->can_create_thread && (chosen_thread_load != 0)) {
443       chosen_thread = NULL;
444     }
445   }
446   
447   /* choice done */
448   
449   if (chosen_thread != NULL)
450     return chosen_thread;
451   
452   thread = etpan_thread_manager_create_thread(manager);
453   if (thread == NULL)
454     goto err;
455   
456   manager->unbound_count ++;
457   if (manager->unbound_count >= POOL_UNBOUND_MAX)
458     manager->can_create_thread = 0;
459   
460   return thread;
461   
462  err:
463   return NULL;
464 }
465
466 static unsigned int etpan_thread_get_load(struct etpan_thread * thread)
467 {
468   unsigned int load;
469   
470   thread_lock(thread);
471   load = carray_count(thread->op_list);
472   thread_unlock(thread);
473   
474   return load;
475 }
476
477 #if 0
478 static void etpan_thread_bind(struct etpan_thread * thread)
479 {
480   thread->bound_count ++;
481 }
482 #endif
483
484 void etpan_thread_unbind(struct etpan_thread * thread)
485 {
486   thread->bound_count --;
487 }
488
489 static int etpan_thread_is_bound(struct etpan_thread * thread)
490 {
491   return (thread->bound_count != 0);
492 }
493
494 int etpan_thread_op_schedule(struct etpan_thread * thread,
495                              struct etpan_thread_op * op)
496 {
497   int r;
498   
499   if (thread->terminate_state != TERMINATE_STATE_NONE)
500     return ERROR_INVAL;
501   
502   thread_lock(thread);
503   r = carray_add(thread->op_list, op, NULL);
504   thread_unlock(thread);
505   
506   if (r < 0)
507     return ERROR_MEMORY;
508   
509   op->thread = thread;
510   mailsem_up(thread->op_sem);
511   
512   return NO_ERROR;
513 }
514
515 static void etpan_thread_op_lock(struct etpan_thread_op * op)
516 {
517   pthread_mutex_lock(&op->lock);
518 }
519
520 static void etpan_thread_op_unlock(struct etpan_thread_op * op)
521 {
522   pthread_mutex_unlock(&op->lock);
523 }
524
525 static int etpan_thread_op_cancelled(struct etpan_thread_op * op)
526 {
527   int cancelled;
528   
529   cancelled = 0;
530   etpan_thread_op_lock(op);
531   if (op->cancellable)
532     cancelled = op->cancelled;
533   etpan_thread_op_unlock(op);
534   
535   return cancelled;
536 }
537
538 #if 0
539 static void etpan_thread_op_cancel(struct etpan_thread_op * op)
540
541   etpan_thread_op_lock(op);
542   if (op->cancelled) {
543     g_warning("cancelled twice");
544   }
545   op->cancelled = 1;
546   if ((op->callback != NULL) && (!op->callback_called)) {
547     op->callback(op->cancelled, op->result, op->callback_data);
548     op->callback_called = 1;
549   }
550   etpan_thread_op_unlock(op);
551 }
552 #endif
553
554 #if 0
555 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
556     struct etpan_thread_op * op)
557 {
558   struct etpan_thread * thread;
559   
560   thread = etpan_thread_manager_get_thread(manager);
561   
562   if (thread == NULL)
563     goto err;
564   
565   return etpan_thread_op_schedule(thread, op);
566   
567  err:
568   return ERROR_MEMORY;
569 }
570 #endif
571
572 int etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)
573 {
574   return manager->notify_fds[0];
575 }
576
577 static void loop_thread_list(carray * op_to_notify,
578     carray * thread_list)
579 {
580   unsigned int i;
581   int r;
582   
583   for(i = 0 ; i < carray_count(thread_list) ; i ++) {
584     struct etpan_thread * thread;
585     unsigned int j;
586     
587     thread = carray_get(thread_list, i);
588     
589     thread_lock(thread);
590     
591     for(j = 0 ; j < carray_count(thread->op_done_list) ; j ++) {
592       struct etpan_thread_op * op;
593       
594       op = carray_get(thread->op_done_list, j);
595       r = carray_add(op_to_notify, op, NULL);
596       if (r < 0) {
597         g_warning("complete failure of thread due to lack of memory (callback)");
598         break;
599       }
600     }
601     carray_set_size(thread->op_done_list, 0);
602     
603     thread_unlock(thread);
604   }
605 }
606
607 void etpan_thread_manager_loop(struct etpan_thread_manager * manager)
608 {
609   carray * op_to_notify;
610   unsigned int i;
611   
612   manager_ack(manager);
613   
614   op_to_notify = carray_new(OP_INIT_SIZE);
615   
616   loop_thread_list(op_to_notify, manager->thread_pool);
617   loop_thread_list(op_to_notify, manager->thread_pending);
618   
619   for(i = 0 ; i < carray_count(op_to_notify) ; i ++) {
620     struct etpan_thread_op * op;
621     int cancelled;
622     
623     op = carray_get(op_to_notify, i);
624     
625     cancelled = etpan_thread_op_cancelled(op);
626     
627     etpan_thread_op_lock(op);
628     
629     if (!op->callback_called) {
630       if (op->callback != NULL)
631         op->callback(op->cancelled, op->result, op->callback_data);
632     }
633     
634     etpan_thread_op_unlock(op);
635     
636     if (op->cleanup != NULL)
637       op->cleanup(op);
638   }
639   
640   carray_free(op_to_notify);
641   
642   i = 0;
643   while (i < carray_count(manager->thread_pending)) {
644     struct etpan_thread * thread;
645     
646     thread = carray_get(manager->thread_pending, i);
647     
648     if (etpan_thread_is_stopped(thread)) {
649       etpan_thread_join(thread);
650       
651       etpan_thread_free(thread);
652       
653       carray_delete_slow(manager->thread_pending, i);
654     }
655     else {
656       i ++;
657     }
658   }
659 }
660
661 #if 0
662 static void etpan_thread_manager_start(struct etpan_thread_manager * manager)
663 {
664   /* do nothing */
665 }
666 #endif
667
668 void etpan_thread_manager_stop(struct etpan_thread_manager * manager)
669 {
670   while (carray_count(manager->thread_pool) > 0) {
671     struct etpan_thread * thread;
672     
673     thread = carray_get(manager->thread_pool, 0);
674     etpan_thread_manager_terminate_thread(manager, thread);
675   }
676 }
677
678 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)
679 {
680   return ((carray_count(manager->thread_pending) == 0) && 
681       (carray_count(manager->thread_pool) == 0));
682 }
683
684 void etpan_thread_manager_join(struct etpan_thread_manager * manager)
685 {
686   while (!etpan_thread_manager_is_stopped(manager)) {
687     etpan_thread_manager_loop(manager);
688   }
689 }
690 #endif