f72e8c7597470044f6a2694be725b2b616510c71
[claws.git] / src / etpan / etpan-thread-manager.c
1 /*
2  * Claws Mail -- a GTK+ based, lightweight, and fast e-mail client
3  * Copyright (C) 2005-2012 DINH Viet Hoa and the Claws Mail team
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  * 
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #  include "config.h"
22 #include "claws-features.h"
23 #endif
24
25 #ifdef HAVE_LIBETPAN
26
27 #include "etpan-thread-manager.h"
28
29 #include <glib.h>
30 #include <stdlib.h>
31 #include <pthread.h>
32 #include <libetpan/mailsem.h>
33 #include <semaphore.h>
34 #include <unistd.h>
35 #include <fcntl.h>
36
37 #include "etpan-errors.h"
38 #include "utils.h"
39
40 #define POOL_UNBOUND_MAX 4
41
42 #define POOL_INIT_SIZE 8
43 #define OP_INIT_SIZE 8
44
45 static int etpan_thread_start(struct etpan_thread * thread);
46 static void etpan_thread_free(struct etpan_thread * thread);
47 static unsigned int etpan_thread_get_load(struct etpan_thread * thread);
48 static int etpan_thread_is_bound(struct etpan_thread * thread);
49 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager);
50 static void etpan_thread_join(struct etpan_thread * thread);
51 static struct etpan_thread * etpan_thread_new(void);
52 static int etpan_thread_op_cancelled(struct etpan_thread_op * op);
53 static void etpan_thread_op_lock(struct etpan_thread_op * op);
54 static void etpan_thread_op_unlock(struct etpan_thread_op * op);
55 static void etpan_thread_stop(struct etpan_thread * thread);
56
57 #if 0
58 static void etpan_thread_bind(struct etpan_thread * thread);
59 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
60              struct etpan_thread_op * op);
61 static void etpan_thread_manager_start(struct etpan_thread_manager * manager);
62 static void etpan_thread_op_cancel(struct etpan_thread_op * op);
63 #endif
64
65 enum {
66   TERMINATE_STATE_NONE,
67   TERMINATE_STATE_REQUESTED,
68   TERMINATE_STATE_DONE,
69 };
70
71 struct etpan_thread_manager * etpan_thread_manager_new(void)
72 {
73   struct etpan_thread_manager * manager;
74   int r;
75   
76   manager = malloc(sizeof(* manager));
77   if (manager == NULL)
78     goto err;
79   
80   manager->thread_pool = carray_new(POOL_INIT_SIZE);
81   if (manager->thread_pool == NULL)
82     goto free;
83
84   manager->thread_pending = carray_new(POOL_INIT_SIZE);
85   if (manager->thread_pending == NULL)
86     goto free_pool;
87   
88   manager->can_create_thread = 1;
89   manager->unbound_count = 0;
90   
91   r = pipe(manager->notify_fds);
92   if (r < 0)
93     goto free_pending;
94   
95   return manager;
96   
97  free_pending:
98   carray_free(manager->thread_pending);
99  free_pool:
100   carray_free(manager->thread_pool);
101  free:
102   free(manager);
103  err:
104   return NULL;
105 }
106
107 void etpan_thread_manager_free(struct etpan_thread_manager * manager)
108 {
109   close(manager->notify_fds[1]);
110   close(manager->notify_fds[0]);
111   carray_free(manager->thread_pending);
112   carray_free(manager->thread_pool);
113   free(manager);
114 }
115
116 static struct etpan_thread * etpan_thread_new(void)
117 {
118   struct etpan_thread * thread;
119   int r;
120   
121   thread = malloc(sizeof(* thread));
122   if (thread == NULL)
123     goto err;
124   
125   r = pthread_mutex_init(&thread->lock, NULL);
126   if (r != 0)
127     goto free;
128
129   thread->op_list = carray_new(OP_INIT_SIZE);
130   if (thread->op_list == NULL)
131     goto destroy_lock;
132   
133   thread->op_done_list = carray_new(OP_INIT_SIZE);
134   if (thread->op_done_list == NULL)
135     goto free_op_list;
136   
137   thread->start_sem = mailsem_new();
138   if (thread->start_sem == NULL)
139     goto free_op_done_list;
140   
141   thread->stop_sem = mailsem_new();
142   if (thread->stop_sem == NULL)
143     goto free_startsem;
144   
145   thread->op_sem = mailsem_new();
146   if (thread->op_sem == NULL)
147     goto free_stopsem;
148   
149   thread->manager = NULL;
150   thread->bound_count = 0;
151   thread->terminate_state = TERMINATE_STATE_NONE;
152   
153   return thread;
154   
155  free_stopsem:
156   mailsem_free(thread->stop_sem);
157  free_startsem:
158   mailsem_free(thread->start_sem);
159  free_op_done_list:
160   carray_free(thread->op_done_list);
161  free_op_list:
162   carray_free(thread->op_list);
163  destroy_lock:
164   pthread_mutex_destroy(&thread->lock);
165  free:
166   free(thread);
167  err:
168   return NULL;
169 }
170
171 static void etpan_thread_free(struct etpan_thread * thread)
172 {
173   mailsem_free(thread->op_sem);
174   mailsem_free(thread->stop_sem);
175   mailsem_free(thread->start_sem);
176   carray_free(thread->op_done_list);
177   carray_free(thread->op_list);
178   pthread_mutex_destroy(&thread->lock);
179   free(thread);
180 }
181
182 struct etpan_thread_op * etpan_thread_op_new(void)
183 {
184   struct etpan_thread_op * op;
185   int r;
186   
187   op = malloc(sizeof(* op));
188   if (op == NULL)
189     goto err;
190   
191   op->thread = NULL;
192   op->run = NULL;
193   op->callback = NULL;
194   op->callback_data = NULL;
195   op->callback_called = 0;
196   op->cancellable = 0;
197   op->cancelled = 0;
198   op->param = NULL;
199   op->result = NULL;
200   op->finished = 0;
201   op->imap = NULL;
202   op->nntp = NULL;
203
204   r = pthread_mutex_init(&op->lock, NULL);
205   if (r != 0)
206     goto free;
207   
208   return op;
209   
210  free:
211   free(op);
212  err:
213   return NULL;
214 }
215
216 void etpan_thread_op_free(struct etpan_thread_op * op)
217 {
218   pthread_mutex_destroy(&op->lock);
219   free(op);
220 }
221
222 static struct etpan_thread *
223 etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)
224 {
225   struct etpan_thread * thread;
226   int r;
227   
228   thread = etpan_thread_new();
229   if (thread == NULL)
230     goto err;
231   
232   thread->manager = manager;
233   
234   r = etpan_thread_start(thread);
235   if (r != NO_ERROR)
236     goto free_thread;
237   
238   r = carray_add(manager->thread_pool, thread, NULL);
239   if (r < 0) {
240     etpan_thread_stop(thread);
241     goto free_thread;
242   }
243   
244   return thread;
245   
246  free_thread:
247   etpan_thread_free(thread);
248  err:
249   return NULL;
250 }
251
252 static void
253 etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,
254     struct etpan_thread * thread)
255 {
256   unsigned int i;
257   int r;
258   
259   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
260     if (carray_get(manager->thread_pool, i) == thread) {
261       carray_delete(manager->thread_pool, i);
262       break;
263     }
264   }
265   
266   if (!etpan_thread_is_bound(thread))
267     manager->unbound_count --;
268   
269   r = carray_add(manager->thread_pending, thread, NULL);
270   if (r < 0) {
271     g_warning("complete failure of thread due to lack of memory (thread stop)");
272   }
273   
274   etpan_thread_stop(thread);
275 }
276
277 static void manager_notify(struct etpan_thread_manager * manager)
278 {
279   char ch;
280   ssize_t r;
281   
282   ch = 1;
283   r = write(manager->notify_fds[1], &ch, 1);
284   if (r < 0) {
285     g_warning("error writing notification to etpan thread manager");
286   }
287 }
288
289 static void manager_ack(struct etpan_thread_manager * manager)
290 {
291 #ifndef G_OS_WIN32
292   char ch;
293   ssize_t r;
294   r = read(manager->notify_fds[0], &ch, 1);
295   if (r != 1) {
296     g_warning("error reading notification from etpan thread manager");
297   }
298 #else
299   /* done in the GIOChannel handler in imap-thread.c and nntp-thread.c */
300 #endif
301 }
302
303 static void thread_lock(struct etpan_thread * thread)
304 {
305   pthread_mutex_lock(&thread->lock);
306 }
307
308 static void thread_unlock(struct etpan_thread * thread)
309 {
310   pthread_mutex_unlock(&thread->lock);
311 }
312
313 static void thread_notify(struct etpan_thread * thread)
314 {
315   manager_notify(thread->manager);
316 }
317
318 static void * thread_run(void * data)
319 {
320   struct etpan_thread * thread;
321   int r;
322   
323   thread = data;
324   
325   mailsem_up(thread->start_sem);
326   
327   while (1) {
328     int do_quit;
329     struct etpan_thread_op * op;
330     
331     mailsem_down(thread->op_sem);
332     
333     do_quit = 0;
334     op = NULL;
335     thread_lock(thread);
336     if (carray_count(thread->op_list) > 0) {
337       op = carray_get(thread->op_list, 0);
338       carray_delete_slow(thread->op_list, 0);
339     }
340     else {
341       do_quit = 1;
342     }
343     thread_unlock(thread);
344     
345     if (do_quit) {
346       break;
347     }
348     
349     if (!etpan_thread_op_cancelled(op)) {
350       if (op->run != NULL)
351         op->run(op);
352     }
353     
354     thread_lock(thread);
355     r = carray_add(thread->op_done_list, op, NULL);
356     if (r < 0) {
357       g_warning("complete failure of thread due to lack of memory (op done)");
358     }
359     thread_unlock(thread);
360     
361     thread_notify(thread);
362   }
363   
364   thread_lock(thread);
365   thread->terminate_state = TERMINATE_STATE_DONE;
366   thread_unlock(thread);
367   
368   thread_notify(thread);
369   
370   mailsem_up(thread->stop_sem);
371   
372   return NULL;
373 }
374
375 static int etpan_thread_start(struct etpan_thread * thread)
376 {
377   int r;
378   
379   r = pthread_create(&thread->th_id, NULL, thread_run, thread);
380   if (r != 0)
381     return ERROR_MEMORY;
382   
383   mailsem_down(thread->start_sem);
384   
385   return NO_ERROR;
386 }
387
388 static void etpan_thread_stop(struct etpan_thread * thread)
389 {
390   thread_lock(thread);
391   thread->terminate_state = TERMINATE_STATE_REQUESTED;
392   thread_unlock(thread);
393   
394   mailsem_up(thread->op_sem);
395   
396   /* this thread will be joined in the manager loop */
397 }
398
399 static int etpan_thread_is_stopped(struct etpan_thread * thread)
400 {
401   int stopped;
402   
403   thread_lock(thread);
404   stopped = (thread->terminate_state == TERMINATE_STATE_DONE);
405   thread_unlock(thread);
406   
407   return stopped;
408 }
409
410 static void etpan_thread_join(struct etpan_thread * thread)
411 {
412   mailsem_down(thread->stop_sem);
413   pthread_join(thread->th_id, NULL);
414 }
415
416 struct etpan_thread *
417 etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)
418 {
419   struct etpan_thread * chosen_thread;
420   unsigned int chosen_thread_load;
421   unsigned int i;
422   struct etpan_thread * thread;
423   
424   /* chose a thread */
425   
426   chosen_thread = NULL;
427   chosen_thread_load = 0;
428   
429   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
430     thread = carray_get(manager->thread_pool, i);
431     if (etpan_thread_is_bound(thread))
432       continue;
433     
434     if (chosen_thread == NULL) {
435       chosen_thread = thread;
436       chosen_thread_load = etpan_thread_get_load(thread);
437       
438       if (chosen_thread_load == 0)
439         break;
440     }
441     else {
442       unsigned int load;
443       
444       load = etpan_thread_get_load(thread);
445       
446       if (load < chosen_thread_load) {
447         chosen_thread = thread;
448         chosen_thread_load = load;
449       }
450     }
451   }
452   
453   if (chosen_thread != NULL) {
454     if (manager->can_create_thread && (chosen_thread_load != 0)) {
455       chosen_thread = NULL;
456     }
457   }
458   
459   /* choice done */
460   
461   if (chosen_thread != NULL)
462     return chosen_thread;
463   
464   thread = etpan_thread_manager_create_thread(manager);
465   if (thread == NULL)
466     goto err;
467   
468   manager->unbound_count ++;
469   if (manager->unbound_count >= POOL_UNBOUND_MAX)
470     manager->can_create_thread = 0;
471   
472   return thread;
473   
474  err:
475   return NULL;
476 }
477
478 static unsigned int etpan_thread_get_load(struct etpan_thread * thread)
479 {
480   unsigned int load;
481   
482   thread_lock(thread);
483   load = carray_count(thread->op_list);
484   thread_unlock(thread);
485   
486   return load;
487 }
488
489 #if 0
490 static void etpan_thread_bind(struct etpan_thread * thread)
491 {
492   thread->bound_count ++;
493 }
494 #endif
495
496 void etpan_thread_unbind(struct etpan_thread * thread)
497 {
498   thread->bound_count --;
499 }
500
501 static int etpan_thread_is_bound(struct etpan_thread * thread)
502 {
503   return (thread->bound_count != 0);
504 }
505
506 int etpan_thread_op_schedule(struct etpan_thread * thread,
507                              struct etpan_thread_op * op)
508 {
509   int r;
510   
511   if (thread->terminate_state != TERMINATE_STATE_NONE)
512     return ERROR_INVAL;
513   
514   thread_lock(thread);
515   r = carray_add(thread->op_list, op, NULL);
516   thread_unlock(thread);
517   
518   if (r < 0)
519     return ERROR_MEMORY;
520   
521   op->thread = thread;
522   mailsem_up(thread->op_sem);
523   
524   return NO_ERROR;
525 }
526
527 static void etpan_thread_op_lock(struct etpan_thread_op * op)
528 {
529   pthread_mutex_lock(&op->lock);
530 }
531
532 static void etpan_thread_op_unlock(struct etpan_thread_op * op)
533 {
534   pthread_mutex_unlock(&op->lock);
535 }
536
537 static int etpan_thread_op_cancelled(struct etpan_thread_op * op)
538 {
539   int cancelled;
540   
541   cancelled = 0;
542   etpan_thread_op_lock(op);
543   if (op->cancellable)
544     cancelled = op->cancelled;
545   etpan_thread_op_unlock(op);
546   
547   return cancelled;
548 }
549
550 int etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)
551 {
552   return manager->notify_fds[0];
553 }
554
555 static void loop_thread_list(carray * op_to_notify,
556     carray * thread_list)
557 {
558   unsigned int i;
559   int r;
560   
561   for(i = 0 ; i < carray_count(thread_list) ; i ++) {
562     struct etpan_thread * thread;
563     unsigned int j;
564     
565     thread = carray_get(thread_list, i);
566     
567     thread_lock(thread);
568     
569     for(j = 0 ; j < carray_count(thread->op_done_list) ; j ++) {
570       struct etpan_thread_op * op;
571       
572       op = carray_get(thread->op_done_list, j);
573       r = carray_add(op_to_notify, op, NULL);
574       if (r < 0) {
575         g_warning("complete failure of thread due to lack of memory (callback)");
576         break;
577       }
578     }
579     carray_set_size(thread->op_done_list, 0);
580     
581     thread_unlock(thread);
582   }
583 }
584
585 void etpan_thread_manager_loop(struct etpan_thread_manager * manager)
586 {
587   carray * op_to_notify;
588   unsigned int i;
589   
590   manager_ack(manager);
591   
592   op_to_notify = carray_new(OP_INIT_SIZE);
593   
594   loop_thread_list(op_to_notify, manager->thread_pool);
595   loop_thread_list(op_to_notify, manager->thread_pending);
596   
597   for(i = 0 ; i < carray_count(op_to_notify) ; i ++) {
598     struct etpan_thread_op * op;
599     
600     op = carray_get(op_to_notify, i);
601     
602     etpan_thread_op_lock(op);
603     
604     if (!op->callback_called) {
605       if (op->callback != NULL)
606         op->callback(op->cancelled, op->result, op->callback_data);
607     }
608     
609     etpan_thread_op_unlock(op);
610     
611     if (op->cleanup != NULL)
612       op->cleanup(op);
613   }
614   
615   carray_free(op_to_notify);
616   
617   i = 0;
618   while (i < carray_count(manager->thread_pending)) {
619     struct etpan_thread * thread;
620     
621     thread = carray_get(manager->thread_pending, i);
622     
623     if (etpan_thread_is_stopped(thread)) {
624       etpan_thread_join(thread);
625       
626       etpan_thread_free(thread);
627       
628       carray_delete_slow(manager->thread_pending, i);
629     }
630     else {
631       i ++;
632     }
633   }
634 }
635
636 #if 0
637 static void etpan_thread_manager_start(struct etpan_thread_manager * manager)
638 {
639   /* do nothing */
640 }
641 #endif
642
643 void etpan_thread_manager_stop(struct etpan_thread_manager * manager)
644 {
645   while (carray_count(manager->thread_pool) > 0) {
646     struct etpan_thread * thread;
647     
648     thread = carray_get(manager->thread_pool, 0);
649     etpan_thread_manager_terminate_thread(manager, thread);
650   }
651 }
652
653 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)
654 {
655   return ((carray_count(manager->thread_pending) == 0) && 
656       (carray_count(manager->thread_pool) == 0));
657 }
658
659 void etpan_thread_manager_join(struct etpan_thread_manager * manager)
660 {
661   while (!etpan_thread_manager_is_stopped(manager)) {
662     etpan_thread_manager_loop(manager);
663   }
664 }
665 #endif