2005-07-06 [colin] 1.9.12cvs24
[claws.git] / src / etpan / etpan-thread-manager.c
1 #ifdef HAVE_CONFIG_H
2 #  include "config.h"
3 #endif
4
5 #ifdef HAVE_LIBETPAN
6
7 #include "etpan-thread-manager.h"
8
9 #include <stdlib.h>
10 #include <pthread.h>
11 #include <libetpan/mailsem.h>
12 #include <semaphore.h>
13 #include <unistd.h>
14
15 #include "etpan-log.h"
16 #include "etpan-errors.h"
17
18 #define POOL_UNBOUND_MAX 4
19
20 #define POOL_INIT_SIZE 8
21 #define OP_INIT_SIZE 8
22
23 enum {
24   TERMINATE_STATE_NONE,
25   TERMINATE_STATE_REQUESTED,
26   TERMINATE_STATE_DONE,
27 };
28
29 struct etpan_thread_manager * etpan_thread_manager_new(void)
30 {
31   struct etpan_thread_manager * manager;
32   int r;
33   
34   manager = malloc(sizeof(* manager));
35   if (manager == NULL)
36     goto err;
37   
38   manager->thread_pool = carray_new(POOL_INIT_SIZE);
39   if (manager->thread_pool == NULL)
40     goto free;
41
42   manager->thread_pending = carray_new(POOL_INIT_SIZE);
43   if (manager->thread_pending == NULL)
44     goto free_pool;
45   
46   manager->can_create_thread = 1;
47   manager->unbound_count = 0;
48   
49   r = pipe(manager->notify_fds);
50   if (r < 0)
51     goto free_pending;
52   
53   return manager;
54   
55  free_pending:
56   carray_free(manager->thread_pending);
57  free_pool:
58   carray_free(manager->thread_pool);
59  free:
60   free(manager);
61  err:
62   return NULL;
63 }
64
65 void etpan_thread_manager_free(struct etpan_thread_manager * manager)
66 {
67   close(manager->notify_fds[1]);
68   close(manager->notify_fds[0]);
69   carray_free(manager->thread_pending);
70   carray_free(manager->thread_pool);
71   free(manager);
72 }
73
74 struct etpan_thread * etpan_thread_new(void)
75 {
76   struct etpan_thread * thread;
77   int r;
78   
79   thread = malloc(sizeof(* thread));
80   if (thread == NULL)
81     goto err;
82   
83   r = pthread_mutex_init(&thread->lock, NULL);
84   if (r != 0)
85     goto free;
86
87   thread->op_list = carray_new(OP_INIT_SIZE);
88   if (thread->op_list == NULL)
89     goto destroy_lock;
90   
91   thread->op_done_list = carray_new(OP_INIT_SIZE);
92   if (thread->op_done_list == NULL)
93     goto free_op_list;
94   
95   thread->start_sem = mailsem_new();
96   if (thread->start_sem == NULL)
97     goto free_op_done_list;
98   
99   thread->stop_sem = mailsem_new();
100   if (thread->stop_sem == NULL)
101     goto free_startsem;
102   
103   thread->op_sem = mailsem_new();
104   if (thread->op_sem == NULL)
105     goto free_stopsem;
106   
107   thread->manager = NULL;
108   thread->bound_count = 0;
109   thread->terminate_state = TERMINATE_STATE_NONE;
110   
111   return thread;
112   
113  free_stopsem:
114   mailsem_free(thread->stop_sem);
115  free_startsem:
116   mailsem_free(thread->start_sem);
117  free_op_done_list:
118   carray_free(thread->op_done_list);
119  free_op_list:
120   carray_free(thread->op_list);
121  destroy_lock:
122   pthread_mutex_destroy(&thread->lock);
123  free:
124   free(thread);
125  err:
126   return NULL;
127 }
128
129 void etpan_thread_free(struct etpan_thread * thread)
130 {
131   mailsem_free(thread->op_sem);
132   mailsem_free(thread->stop_sem);
133   mailsem_free(thread->start_sem);
134   carray_free(thread->op_done_list);
135   carray_free(thread->op_list);
136   pthread_mutex_destroy(&thread->lock);
137   free(thread);
138 }
139
140 struct etpan_thread_op * etpan_thread_op_new(void)
141 {
142   struct etpan_thread_op * op;
143   int r;
144   
145   op = malloc(sizeof(* op));
146   if (op == NULL)
147     goto err;
148   
149   op->thread = NULL;
150   op->run = NULL;
151   op->callback = NULL;
152   op->callback_data = NULL;
153   op->callback_called = 0;
154   op->cancellable = 0;
155   op->cancelled = 0;
156   op->param = NULL;
157   op->result = NULL;
158   
159   r = pthread_mutex_init(&op->lock, NULL);
160   if (r != 0)
161     goto free;
162   
163   return op;
164   
165  free:
166   free(op);
167  err:
168   return NULL;
169 }
170
171 void etpan_thread_op_free(struct etpan_thread_op * op)
172 {
173   pthread_mutex_destroy(&op->lock);
174   free(op);
175 }
176
177 static struct etpan_thread *
178 etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)
179 {
180   struct etpan_thread * thread;
181   int r;
182   
183   thread = etpan_thread_new();
184   if (thread == NULL)
185     goto err;
186   
187   thread->manager = manager;
188   
189   r = etpan_thread_start(thread);
190   if (r != NO_ERROR)
191     goto free_thread;
192   
193   r = carray_add(manager->thread_pool, thread, NULL);
194   if (r < 0) {
195     etpan_thread_stop(thread);
196     goto free_thread;
197   }
198   
199   return thread;
200   
201  free_thread:
202   etpan_thread_free(thread);
203  err:
204   return NULL;
205 }
206
207 static void
208 etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,
209     struct etpan_thread * thread)
210 {
211   unsigned int i;
212   int r;
213   
214   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
215     if (carray_get(manager->thread_pool, i) == thread) {
216       carray_delete(manager->thread_pool, i);
217       break;
218     }
219   }
220   
221   if (!etpan_thread_is_bound(thread))
222     manager->unbound_count --;
223   
224   r = carray_add(manager->thread_pending, thread, NULL);
225   if (r < 0) {
226     ETPAN_LOG("complete failure of thread due to lack of memory (thread stop)");
227   }
228   
229   etpan_thread_stop(thread);
230 }
231
232 static void manager_notify(struct etpan_thread_manager * manager)
233 {
234   char ch;
235   ssize_t r;
236   
237   ch = 1;
238   r = write(manager->notify_fds[1], &ch, 1);
239 }
240
241 static void manager_ack(struct etpan_thread_manager * manager)
242 {
243   char ch;
244   ssize_t r;
245   
246   r = read(manager->notify_fds[0], &ch, 1);
247 }
248
249 static void thread_lock(struct etpan_thread * thread)
250 {
251   pthread_mutex_lock(&thread->lock);
252 }
253
254 static void thread_unlock(struct etpan_thread * thread)
255 {
256   pthread_mutex_unlock(&thread->lock);
257 }
258
259 static void thread_notify(struct etpan_thread * thread)
260 {
261   manager_notify(thread->manager);
262 }
263
264 static void * thread_run(void * data)
265 {
266   struct etpan_thread * thread;
267   int r;
268   
269   thread = data;
270   
271   mailsem_up(thread->start_sem);
272   
273   while (1) {
274     int do_quit;
275     struct etpan_thread_op * op;
276     
277     mailsem_down(thread->op_sem);
278     
279     do_quit = 0;
280     op = NULL;
281     thread_lock(thread);
282     if (carray_count(thread->op_list) > 0) {
283       op = carray_get(thread->op_list, 0);
284       carray_delete_slow(thread->op_list, 0);
285     }
286     else {
287       do_quit = 1;
288     }
289     thread_unlock(thread);
290     
291     if (do_quit) {
292       break;
293     }
294     
295     if (!etpan_thread_op_cancelled(op)) {
296       if (op->run != NULL)
297         op->run(op);
298     }
299     
300     thread_lock(thread);
301     r = carray_add(thread->op_done_list, op, NULL);
302     if (r < 0) {
303       ETPAN_LOG("complete failure of thread due to lack of memory (op done)");
304     }
305     thread_unlock(thread);
306     
307     thread_notify(thread);
308   }
309   
310   thread_lock(thread);
311   thread->terminate_state = TERMINATE_STATE_DONE;
312   thread_unlock(thread);
313   
314   thread_notify(thread);
315   
316   mailsem_up(thread->stop_sem);
317   
318   return NULL;
319 }
320
321 int etpan_thread_start(struct etpan_thread * thread)
322 {
323   int r;
324   
325   r = pthread_create(&thread->th_id, NULL, thread_run, thread);
326   if (r != 0)
327     return ERROR_MEMORY;
328   
329   mailsem_down(thread->start_sem);
330   
331   return NO_ERROR;
332 }
333
334 void etpan_thread_stop(struct etpan_thread * thread)
335 {
336   thread_lock(thread);
337   thread->terminate_state = TERMINATE_STATE_REQUESTED;
338   thread_unlock(thread);
339   
340   mailsem_up(thread->op_sem);
341   
342   /* this thread will be joined in the manager loop */
343 }
344
345 int etpan_thread_is_stopped(struct etpan_thread * thread)
346 {
347   int stopped;
348   
349   thread_lock(thread);
350   stopped = (thread->terminate_state == TERMINATE_STATE_DONE);
351   thread_unlock(thread);
352   
353   return stopped;
354 }
355
356 void etpan_thread_join(struct etpan_thread * thread)
357 {
358   mailsem_down(thread->stop_sem);
359   pthread_join(thread->th_id, NULL);
360 }
361
362 struct etpan_thread *
363 etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)
364 {
365   struct etpan_thread * chosen_thread;
366   unsigned int chosen_thread_load;
367   unsigned int i;
368   struct etpan_thread * thread;
369   
370   /* chose a thread */
371   
372   chosen_thread = NULL;
373   chosen_thread_load = 0;
374   
375   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
376     thread = carray_get(manager->thread_pool, i);
377     if (etpan_thread_is_bound(thread))
378       continue;
379     
380     if (chosen_thread == NULL) {
381       chosen_thread = thread;
382       chosen_thread_load = etpan_thread_get_load(thread);
383       
384       if (chosen_thread_load == 0)
385         break;
386     }
387     else {
388       unsigned int load;
389       
390       load = etpan_thread_get_load(thread);
391       
392       if (load < chosen_thread_load) {
393         chosen_thread = thread;
394         chosen_thread_load = load;
395       }
396     }
397   }
398   
399   if (chosen_thread != NULL) {
400     if (manager->can_create_thread && (chosen_thread_load != 0)) {
401       chosen_thread = NULL;
402     }
403   }
404   
405   /* choice done */
406   
407   if (chosen_thread != NULL)
408     return chosen_thread;
409   
410   thread = etpan_thread_manager_create_thread(manager);
411   if (thread == NULL)
412     goto err;
413   
414   manager->unbound_count ++;
415   if (manager->unbound_count >= POOL_UNBOUND_MAX)
416     manager->can_create_thread = 0;
417   
418   return thread;
419   
420  err:
421   return NULL;
422 }
423
424 unsigned int etpan_thread_get_load(struct etpan_thread * thread)
425 {
426   unsigned int load;
427   
428   thread_lock(thread);
429   load = carray_count(thread->op_list);
430   thread_unlock(thread);
431   
432   return load;
433 }
434
435 void etpan_thread_bind(struct etpan_thread * thread)
436 {
437   thread->bound_count ++;
438 }
439
440 void etpan_thread_unbind(struct etpan_thread * thread)
441 {
442   thread->bound_count --;
443 }
444
445 int etpan_thread_is_bound(struct etpan_thread * thread)
446 {
447   return (thread->bound_count != 0);
448 }
449
450 int etpan_thread_op_schedule(struct etpan_thread * thread,
451                              struct etpan_thread_op * op)
452 {
453   int r;
454   
455   if (thread->terminate_state != TERMINATE_STATE_NONE)
456     return ERROR_INVAL;
457   
458   thread_lock(thread);
459   r = carray_add(thread->op_list, op, NULL);
460   thread_unlock(thread);
461   
462   if (r < 0)
463     return ERROR_MEMORY;
464   
465   op->thread = thread;
466   mailsem_up(thread->op_sem);
467   
468   return NO_ERROR;
469 }
470
471 void etpan_thread_op_lock(struct etpan_thread_op * op)
472 {
473   pthread_mutex_lock(&op->lock);
474 }
475
476 void etpan_thread_op_unlock(struct etpan_thread_op * op)
477 {
478   pthread_mutex_unlock(&op->lock);
479 }
480
481 int etpan_thread_op_cancelled(struct etpan_thread_op * op)
482 {
483   int cancelled;
484   
485   cancelled = 0;
486   etpan_thread_op_lock(op);
487   if (op->cancellable)
488     cancelled = op->cancelled;
489   etpan_thread_op_unlock(op);
490   
491   return cancelled;
492 }
493
494 void etpan_thread_op_cancel(struct etpan_thread_op * op)
495
496   etpan_thread_op_lock(op);
497   if (op->cancelled) {
498     ETPAN_LOG("cancelled twice");
499   }
500   op->cancelled = 1;
501   if ((op->callback != NULL) && (!op->callback_called)) {
502     op->callback(op->cancelled, op->result, op->callback_data);
503     op->callback_called = 1;
504   }
505   etpan_thread_op_unlock(op);
506 }
507
508 int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
509     struct etpan_thread_op * op)
510 {
511   struct etpan_thread * thread;
512   
513   thread = etpan_thread_manager_get_thread(manager);
514   
515   if (thread == NULL)
516     goto err;
517   
518   return etpan_thread_op_schedule(thread, op);
519   
520  err:
521   return ERROR_MEMORY;
522 }
523
524 int etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)
525 {
526   return manager->notify_fds[0];
527 }
528
529 static void loop_thread_list(carray * op_to_notify,
530     carray * thread_list)
531 {
532   unsigned int i;
533   int r;
534   
535   for(i = 0 ; i < carray_count(thread_list) ; i ++) {
536     struct etpan_thread * thread;
537     unsigned int j;
538     
539     thread = carray_get(thread_list, i);
540     
541     thread_lock(thread);
542     
543     for(j = 0 ; j < carray_count(thread->op_done_list) ; j ++) {
544       struct etpan_thread_op * op;
545       
546       op = carray_get(thread->op_done_list, j);
547       r = carray_add(op_to_notify, op, NULL);
548       if (r < 0) {
549         ETPAN_LOG("complete failure of thread due to lack of memory (callback)");
550         break;
551       }
552     }
553     carray_set_size(thread->op_done_list, 0);
554     
555     thread_unlock(thread);
556   }
557 }
558
559 void etpan_thread_manager_loop(struct etpan_thread_manager * manager)
560 {
561   carray * op_to_notify;
562   unsigned int i;
563   
564   manager_ack(manager);
565   
566   op_to_notify = carray_new(OP_INIT_SIZE);
567   
568   loop_thread_list(op_to_notify, manager->thread_pool);
569   loop_thread_list(op_to_notify, manager->thread_pending);
570   
571   for(i = 0 ; i < carray_count(op_to_notify) ; i ++) {
572     struct etpan_thread_op * op;
573     int cancelled;
574     
575     op = carray_get(op_to_notify, i);
576     
577     cancelled = etpan_thread_op_cancelled(op);
578     
579     etpan_thread_op_lock(op);
580     
581     if (!op->callback_called) {
582       if (op->callback != NULL)
583         op->callback(op->cancelled, op->result, op->callback_data);
584     }
585     
586     etpan_thread_op_unlock(op);
587     
588     if (op->cleanup != NULL)
589       op->cleanup(op);
590   }
591   
592   carray_free(op_to_notify);
593   
594   i = 0;
595   while (i < carray_count(manager->thread_pending)) {
596     struct etpan_thread * thread;
597     
598     thread = carray_get(manager->thread_pending, i);
599     
600     if (etpan_thread_is_stopped(thread)) {
601       etpan_thread_join(thread);
602       
603       etpan_thread_free(thread);
604       
605       carray_delete_slow(manager->thread_pending, i);
606     }
607     else {
608       i ++;
609     }
610   }
611 }
612
613 void etpan_thread_manager_start(struct etpan_thread_manager * manager)
614 {
615   /* do nothing */
616 }
617
618 void etpan_thread_manager_stop(struct etpan_thread_manager * manager)
619 {
620   while (carray_count(manager->thread_pool) > 0) {
621     struct etpan_thread * thread;
622     
623     thread = carray_get(manager->thread_pool, 0);
624     etpan_thread_manager_terminate_thread(manager, thread);
625   }
626 }
627
628 int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)
629 {
630   return ((carray_count(manager->thread_pending) == 0) && 
631       (carray_count(manager->thread_pool) == 0));
632 }
633
634 void etpan_thread_manager_join(struct etpan_thread_manager * manager)
635 {
636   while (!etpan_thread_manager_is_stopped(manager)) {
637     etpan_thread_manager_loop(manager);
638   }
639 }
640 #endif