0b8c366a3a0819503afd9d9d47c49f97409f1d53
[claws.git] / src / etpan / etpan-thread-manager.c
1 /*
2  * Claws Mail -- a GTK+ based, lightweight, and fast e-mail client
3  * Copyright (C) 2005-2012 DINH Viet Hoa and the Claws Mail team
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  * 
18  */
19
20 #ifdef HAVE_CONFIG_H
21 #  include "config.h"
22 #include "claws-features.h"
23 #endif
24
25 #ifdef HAVE_LIBETPAN
26
27 #include "etpan-thread-manager.h"
28
29 #include <glib.h>
30 #include <stdlib.h>
31 #include <pthread.h>
32 #include <libetpan/mailsem.h>
33 #include <semaphore.h>
34 #include <unistd.h>
35 #include <fcntl.h>
36
37 #include "etpan-errors.h"
38 #include "utils.h"
39
40 #define POOL_UNBOUND_MAX 4
41
42 #define POOL_INIT_SIZE 8
43 #define OP_INIT_SIZE 8
44
45 static int etpan_thread_start(struct etpan_thread * thread);
46 static void etpan_thread_free(struct etpan_thread * thread);
47 static unsigned int etpan_thread_get_load(struct etpan_thread * thread);
48 static int etpan_thread_is_bound(struct etpan_thread * thread);
49 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager);
50 static void etpan_thread_join(struct etpan_thread * thread);
51 static struct etpan_thread * etpan_thread_new(void);
52 static int etpan_thread_op_cancelled(struct etpan_thread_op * op);
53 static void etpan_thread_op_lock(struct etpan_thread_op * op);
54 static void etpan_thread_op_unlock(struct etpan_thread_op * op);
55 static void etpan_thread_stop(struct etpan_thread * thread);
56
57 #if 0
58 static void etpan_thread_bind(struct etpan_thread * thread);
59 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
60              struct etpan_thread_op * op);
61 static void etpan_thread_manager_start(struct etpan_thread_manager * manager);
62 static void etpan_thread_op_cancel(struct etpan_thread_op * op);
63 #endif
64
65 enum {
66   TERMINATE_STATE_NONE,
67   TERMINATE_STATE_REQUESTED,
68   TERMINATE_STATE_DONE,
69 };
70
71 struct etpan_thread_manager * etpan_thread_manager_new(void)
72 {
73   struct etpan_thread_manager * manager;
74   int r;
75   
76   manager = malloc(sizeof(* manager));
77   if (manager == NULL)
78     goto err;
79   
80   manager->thread_pool = carray_new(POOL_INIT_SIZE);
81   if (manager->thread_pool == NULL)
82     goto free;
83
84   manager->thread_pending = carray_new(POOL_INIT_SIZE);
85   if (manager->thread_pending == NULL)
86     goto free_pool;
87   
88   manager->can_create_thread = 1;
89   manager->unbound_count = 0;
90   
91   r = pipe(manager->notify_fds);
92   if (r < 0)
93     goto free_pending;
94   
95   return manager;
96   
97  free_pending:
98   carray_free(manager->thread_pending);
99  free_pool:
100   carray_free(manager->thread_pool);
101  free:
102   free(manager);
103  err:
104   return NULL;
105 }
106
107 void etpan_thread_manager_free(struct etpan_thread_manager * manager)
108 {
109   close(manager->notify_fds[1]);
110   close(manager->notify_fds[0]);
111   carray_free(manager->thread_pending);
112   carray_free(manager->thread_pool);
113   free(manager);
114 }
115
116 static struct etpan_thread * etpan_thread_new(void)
117 {
118   struct etpan_thread * thread;
119   int r;
120   
121   thread = malloc(sizeof(* thread));
122   if (thread == NULL)
123     goto err;
124   
125   r = pthread_mutex_init(&thread->lock, NULL);
126   if (r != 0)
127     goto free;
128
129   thread->op_list = carray_new(OP_INIT_SIZE);
130   if (thread->op_list == NULL)
131     goto destroy_lock;
132   
133   thread->op_done_list = carray_new(OP_INIT_SIZE);
134   if (thread->op_done_list == NULL)
135     goto free_op_list;
136   
137   thread->start_sem = mailsem_new();
138   if (thread->start_sem == NULL)
139     goto free_op_done_list;
140   
141   thread->stop_sem = mailsem_new();
142   if (thread->stop_sem == NULL)
143     goto free_startsem;
144   
145   thread->op_sem = mailsem_new();
146   if (thread->op_sem == NULL)
147     goto free_stopsem;
148   
149   thread->manager = NULL;
150   thread->bound_count = 0;
151   thread->terminate_state = TERMINATE_STATE_NONE;
152   
153   return thread;
154   
155  free_stopsem:
156   mailsem_free(thread->stop_sem);
157  free_startsem:
158   mailsem_free(thread->start_sem);
159  free_op_done_list:
160   carray_free(thread->op_done_list);
161  free_op_list:
162   carray_free(thread->op_list);
163  destroy_lock:
164   pthread_mutex_destroy(&thread->lock);
165  free:
166   free(thread);
167  err:
168   return NULL;
169 }
170
171 static void etpan_thread_free(struct etpan_thread * thread)
172 {
173   mailsem_free(thread->op_sem);
174   mailsem_free(thread->stop_sem);
175   mailsem_free(thread->start_sem);
176   carray_free(thread->op_done_list);
177   carray_free(thread->op_list);
178   pthread_mutex_destroy(&thread->lock);
179   free(thread);
180 }
181
182 struct etpan_thread_op * etpan_thread_op_new(void)
183 {
184   struct etpan_thread_op * op;
185   int r;
186   
187   op = malloc(sizeof(* op));
188   if (op == NULL)
189     goto err;
190   
191   op->thread = NULL;
192   op->run = NULL;
193   op->callback = NULL;
194   op->callback_data = NULL;
195   op->callback_called = 0;
196   op->cancellable = 0;
197   op->cancelled = 0;
198   op->param = NULL;
199   op->result = NULL;
200   op->finished = 0;
201   op->imap = NULL;
202   op->nntp = NULL;
203
204   r = pthread_mutex_init(&op->lock, NULL);
205   if (r != 0)
206     goto free;
207   
208   return op;
209   
210  free:
211   free(op);
212  err:
213   return NULL;
214 }
215
216 void etpan_thread_op_free(struct etpan_thread_op * op)
217 {
218   pthread_mutex_destroy(&op->lock);
219   free(op);
220 }
221
222 static struct etpan_thread *
223 etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)
224 {
225   struct etpan_thread * thread;
226   int r;
227   
228   thread = etpan_thread_new();
229   if (thread == NULL)
230     goto err;
231   
232   thread->manager = manager;
233   
234   r = etpan_thread_start(thread);
235   if (r != NO_ERROR)
236     goto free_thread;
237   
238   r = carray_add(manager->thread_pool, thread, NULL);
239   if (r < 0) {
240     etpan_thread_stop(thread);
241     goto free_thread;
242   }
243   
244   return thread;
245   
246  free_thread:
247   etpan_thread_free(thread);
248  err:
249   return NULL;
250 }
251
252 static void
253 etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,
254     struct etpan_thread * thread)
255 {
256   unsigned int i;
257   int r;
258   
259   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
260     if (carray_get(manager->thread_pool, i) == thread) {
261       carray_delete(manager->thread_pool, i);
262       break;
263     }
264   }
265   
266   if (!etpan_thread_is_bound(thread))
267     manager->unbound_count --;
268   
269   r = carray_add(manager->thread_pending, thread, NULL);
270   if (r < 0) {
271     g_warning("complete failure of thread due to lack of memory (thread stop)");
272   }
273   
274   etpan_thread_stop(thread);
275 }
276
277 static void manager_notify(struct etpan_thread_manager * manager)
278 {
279   char ch;
280   ssize_t r;
281   
282   ch = 1;
283   r = write(manager->notify_fds[1], &ch, 1);
284 }
285
286 static void manager_ack(struct etpan_thread_manager * manager)
287 {
288 #ifndef G_OS_WIN32
289   char ch;
290   ssize_t r;
291   r = read(manager->notify_fds[0], &ch, 1);
292 #else
293   /* done in the GIOChannel handler in imap-thread.c and nntp-thread.c */
294 #endif
295 }
296
297 static void thread_lock(struct etpan_thread * thread)
298 {
299   pthread_mutex_lock(&thread->lock);
300 }
301
302 static void thread_unlock(struct etpan_thread * thread)
303 {
304   pthread_mutex_unlock(&thread->lock);
305 }
306
307 static void thread_notify(struct etpan_thread * thread)
308 {
309   manager_notify(thread->manager);
310 }
311
312 static void * thread_run(void * data)
313 {
314   struct etpan_thread * thread;
315   int r;
316   
317   thread = data;
318   
319   mailsem_up(thread->start_sem);
320   
321   while (1) {
322     int do_quit;
323     struct etpan_thread_op * op;
324     
325     mailsem_down(thread->op_sem);
326     
327     do_quit = 0;
328     op = NULL;
329     thread_lock(thread);
330     if (carray_count(thread->op_list) > 0) {
331       op = carray_get(thread->op_list, 0);
332       carray_delete_slow(thread->op_list, 0);
333     }
334     else {
335       do_quit = 1;
336     }
337     thread_unlock(thread);
338     
339     if (do_quit) {
340       break;
341     }
342     
343     if (!etpan_thread_op_cancelled(op)) {
344       if (op->run != NULL)
345         op->run(op);
346     }
347     
348     thread_lock(thread);
349     r = carray_add(thread->op_done_list, op, NULL);
350     if (r < 0) {
351       g_warning("complete failure of thread due to lack of memory (op done)");
352     }
353     thread_unlock(thread);
354     
355     thread_notify(thread);
356   }
357   
358   thread_lock(thread);
359   thread->terminate_state = TERMINATE_STATE_DONE;
360   thread_unlock(thread);
361   
362   thread_notify(thread);
363   
364   mailsem_up(thread->stop_sem);
365   
366   return NULL;
367 }
368
369 static int etpan_thread_start(struct etpan_thread * thread)
370 {
371   int r;
372   
373   r = pthread_create(&thread->th_id, NULL, thread_run, thread);
374   if (r != 0)
375     return ERROR_MEMORY;
376   
377   mailsem_down(thread->start_sem);
378   
379   return NO_ERROR;
380 }
381
382 static void etpan_thread_stop(struct etpan_thread * thread)
383 {
384   thread_lock(thread);
385   thread->terminate_state = TERMINATE_STATE_REQUESTED;
386   thread_unlock(thread);
387   
388   mailsem_up(thread->op_sem);
389   
390   /* this thread will be joined in the manager loop */
391 }
392
393 static int etpan_thread_is_stopped(struct etpan_thread * thread)
394 {
395   int stopped;
396   
397   thread_lock(thread);
398   stopped = (thread->terminate_state == TERMINATE_STATE_DONE);
399   thread_unlock(thread);
400   
401   return stopped;
402 }
403
404 static void etpan_thread_join(struct etpan_thread * thread)
405 {
406   mailsem_down(thread->stop_sem);
407   pthread_join(thread->th_id, NULL);
408 }
409
410 struct etpan_thread *
411 etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)
412 {
413   struct etpan_thread * chosen_thread;
414   unsigned int chosen_thread_load;
415   unsigned int i;
416   struct etpan_thread * thread;
417   
418   /* chose a thread */
419   
420   chosen_thread = NULL;
421   chosen_thread_load = 0;
422   
423   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
424     thread = carray_get(manager->thread_pool, i);
425     if (etpan_thread_is_bound(thread))
426       continue;
427     
428     if (chosen_thread == NULL) {
429       chosen_thread = thread;
430       chosen_thread_load = etpan_thread_get_load(thread);
431       
432       if (chosen_thread_load == 0)
433         break;
434     }
435     else {
436       unsigned int load;
437       
438       load = etpan_thread_get_load(thread);
439       
440       if (load < chosen_thread_load) {
441         chosen_thread = thread;
442         chosen_thread_load = load;
443       }
444     }
445   }
446   
447   if (chosen_thread != NULL) {
448     if (manager->can_create_thread && (chosen_thread_load != 0)) {
449       chosen_thread = NULL;
450     }
451   }
452   
453   /* choice done */
454   
455   if (chosen_thread != NULL)
456     return chosen_thread;
457   
458   thread = etpan_thread_manager_create_thread(manager);
459   if (thread == NULL)
460     goto err;
461   
462   manager->unbound_count ++;
463   if (manager->unbound_count >= POOL_UNBOUND_MAX)
464     manager->can_create_thread = 0;
465   
466   return thread;
467   
468  err:
469   return NULL;
470 }
471
472 static unsigned int etpan_thread_get_load(struct etpan_thread * thread)
473 {
474   unsigned int load;
475   
476   thread_lock(thread);
477   load = carray_count(thread->op_list);
478   thread_unlock(thread);
479   
480   return load;
481 }
482
483 #if 0
484 static void etpan_thread_bind(struct etpan_thread * thread)
485 {
486   thread->bound_count ++;
487 }
488 #endif
489
490 void etpan_thread_unbind(struct etpan_thread * thread)
491 {
492   thread->bound_count --;
493 }
494
495 static int etpan_thread_is_bound(struct etpan_thread * thread)
496 {
497   return (thread->bound_count != 0);
498 }
499
500 int etpan_thread_op_schedule(struct etpan_thread * thread,
501                              struct etpan_thread_op * op)
502 {
503   int r;
504   
505   if (thread->terminate_state != TERMINATE_STATE_NONE)
506     return ERROR_INVAL;
507   
508   thread_lock(thread);
509   r = carray_add(thread->op_list, op, NULL);
510   thread_unlock(thread);
511   
512   if (r < 0)
513     return ERROR_MEMORY;
514   
515   op->thread = thread;
516   mailsem_up(thread->op_sem);
517   
518   return NO_ERROR;
519 }
520
521 static void etpan_thread_op_lock(struct etpan_thread_op * op)
522 {
523   pthread_mutex_lock(&op->lock);
524 }
525
526 static void etpan_thread_op_unlock(struct etpan_thread_op * op)
527 {
528   pthread_mutex_unlock(&op->lock);
529 }
530
531 static int etpan_thread_op_cancelled(struct etpan_thread_op * op)
532 {
533   int cancelled;
534   
535   cancelled = 0;
536   etpan_thread_op_lock(op);
537   if (op->cancellable)
538     cancelled = op->cancelled;
539   etpan_thread_op_unlock(op);
540   
541   return cancelled;
542 }
543
544 int etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)
545 {
546   return manager->notify_fds[0];
547 }
548
549 static void loop_thread_list(carray * op_to_notify,
550     carray * thread_list)
551 {
552   unsigned int i;
553   int r;
554   
555   for(i = 0 ; i < carray_count(thread_list) ; i ++) {
556     struct etpan_thread * thread;
557     unsigned int j;
558     
559     thread = carray_get(thread_list, i);
560     
561     thread_lock(thread);
562     
563     for(j = 0 ; j < carray_count(thread->op_done_list) ; j ++) {
564       struct etpan_thread_op * op;
565       
566       op = carray_get(thread->op_done_list, j);
567       r = carray_add(op_to_notify, op, NULL);
568       if (r < 0) {
569         g_warning("complete failure of thread due to lack of memory (callback)");
570         break;
571       }
572     }
573     carray_set_size(thread->op_done_list, 0);
574     
575     thread_unlock(thread);
576   }
577 }
578
579 void etpan_thread_manager_loop(struct etpan_thread_manager * manager)
580 {
581   carray * op_to_notify;
582   unsigned int i;
583   
584   manager_ack(manager);
585   
586   op_to_notify = carray_new(OP_INIT_SIZE);
587   
588   loop_thread_list(op_to_notify, manager->thread_pool);
589   loop_thread_list(op_to_notify, manager->thread_pending);
590   
591   for(i = 0 ; i < carray_count(op_to_notify) ; i ++) {
592     struct etpan_thread_op * op;
593     int cancelled;
594     
595     op = carray_get(op_to_notify, i);
596     
597     cancelled = etpan_thread_op_cancelled(op);
598     
599     etpan_thread_op_lock(op);
600     
601     if (!op->callback_called) {
602       if (op->callback != NULL)
603         op->callback(op->cancelled, op->result, op->callback_data);
604     }
605     
606     etpan_thread_op_unlock(op);
607     
608     if (op->cleanup != NULL)
609       op->cleanup(op);
610   }
611   
612   carray_free(op_to_notify);
613   
614   i = 0;
615   while (i < carray_count(manager->thread_pending)) {
616     struct etpan_thread * thread;
617     
618     thread = carray_get(manager->thread_pending, i);
619     
620     if (etpan_thread_is_stopped(thread)) {
621       etpan_thread_join(thread);
622       
623       etpan_thread_free(thread);
624       
625       carray_delete_slow(manager->thread_pending, i);
626     }
627     else {
628       i ++;
629     }
630   }
631 }
632
633 #if 0
634 static void etpan_thread_manager_start(struct etpan_thread_manager * manager)
635 {
636   /* do nothing */
637 }
638 #endif
639
640 void etpan_thread_manager_stop(struct etpan_thread_manager * manager)
641 {
642   while (carray_count(manager->thread_pool) > 0) {
643     struct etpan_thread * thread;
644     
645     thread = carray_get(manager->thread_pool, 0);
646     etpan_thread_manager_terminate_thread(manager, thread);
647   }
648 }
649
650 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)
651 {
652   return ((carray_count(manager->thread_pending) == 0) && 
653       (carray_count(manager->thread_pool) == 0));
654 }
655
656 void etpan_thread_manager_join(struct etpan_thread_manager * manager)
657 {
658   while (!etpan_thread_manager_is_stopped(manager)) {
659     etpan_thread_manager_loop(manager);
660   }
661 }
662 #endif