c530e0a1dc6df906c030581e2cff69df50010c19
[claws.git] / src / etpan / etpan-thread-manager.c
1 #include "etpan-thread-manager.h"
2
3 #include <stdlib.h>
4 #include <pthread.h>
5 #include <libetpan/mailsem.h>
6 #include <semaphore.h>
7 #include <unistd.h>
8
9 #include "etpan-log.h"
10 #include "etpan-errors.h"
11
12 #define POOL_UNBOUND_MAX 4
13
14 #define POOL_INIT_SIZE 8
15 #define OP_INIT_SIZE 8
16
17 enum {
18   TERMINATE_STATE_NONE,
19   TERMINATE_STATE_REQUESTED,
20   TERMINATE_STATE_DONE,
21 };
22
23 struct etpan_thread_manager * etpan_thread_manager_new(void)
24 {
25   struct etpan_thread_manager * manager;
26   int r;
27   
28   manager = malloc(sizeof(* manager));
29   if (manager == NULL)
30     goto err;
31   
32   manager->thread_pool = carray_new(POOL_INIT_SIZE);
33   if (manager->thread_pool == NULL)
34     goto free;
35
36   manager->thread_pending = carray_new(POOL_INIT_SIZE);
37   if (manager->thread_pending == NULL)
38     goto free_pool;
39   
40   manager->can_create_thread = 1;
41   manager->unbound_count = 0;
42   
43   r = pipe(manager->notify_fds);
44   if (r < 0)
45     goto free_pending;
46   
47   return manager;
48   
49  free_pending:
50   carray_free(manager->thread_pending);
51  free_pool:
52   carray_free(manager->thread_pool);
53  free:
54   free(manager);
55  err:
56   return NULL;
57 }
58
59 void etpan_thread_manager_free(struct etpan_thread_manager * manager)
60 {
61   close(manager->notify_fds[1]);
62   close(manager->notify_fds[0]);
63   carray_free(manager->thread_pending);
64   carray_free(manager->thread_pool);
65   free(manager);
66 }
67
68 struct etpan_thread * etpan_thread_new(void)
69 {
70   struct etpan_thread * thread;
71   int r;
72   
73   thread = malloc(sizeof(* thread));
74   if (thread == NULL)
75     goto err;
76   
77   r = pthread_mutex_init(&thread->lock, NULL);
78   if (r != 0)
79     goto free;
80
81   thread->op_list = carray_new(OP_INIT_SIZE);
82   if (thread->op_list == NULL)
83     goto destroy_lock;
84   
85   thread->op_done_list = carray_new(OP_INIT_SIZE);
86   if (thread->op_done_list == NULL)
87     goto free_op_list;
88   
89   thread->start_sem = mailsem_new();
90   if (thread->start_sem == NULL)
91     goto free_op_done_list;
92   
93   thread->stop_sem = mailsem_new();
94   if (thread->stop_sem == NULL)
95     goto free_startsem;
96   
97   thread->op_sem = mailsem_new();
98   if (thread->op_sem == NULL)
99     goto free_stopsem;
100   
101   thread->manager = NULL;
102   thread->bound_count = 0;
103   thread->terminate_state = TERMINATE_STATE_NONE;
104   
105   return thread;
106   
107  free_stopsem:
108   mailsem_free(thread->stop_sem);
109  free_startsem:
110   mailsem_free(thread->start_sem);
111  free_op_done_list:
112   carray_free(thread->op_done_list);
113  free_op_list:
114   carray_free(thread->op_list);
115  destroy_lock:
116   pthread_mutex_destroy(&thread->lock);
117  free:
118   free(thread);
119  err:
120   return NULL;
121 }
122
123 void etpan_thread_free(struct etpan_thread * thread)
124 {
125   mailsem_free(thread->op_sem);
126   mailsem_free(thread->stop_sem);
127   mailsem_free(thread->start_sem);
128   carray_free(thread->op_done_list);
129   carray_free(thread->op_list);
130   pthread_mutex_destroy(&thread->lock);
131   free(thread);
132 }
133
134 struct etpan_thread_op * etpan_thread_op_new(void)
135 {
136   struct etpan_thread_op * op;
137   int r;
138   
139   op = malloc(sizeof(* op));
140   if (op == NULL)
141     goto err;
142   
143   op->thread = NULL;
144   op->run = NULL;
145   op->callback = NULL;
146   op->callback_data = NULL;
147   op->callback_called = 0;
148   op->cancellable = 0;
149   op->cancelled = 0;
150   op->param = NULL;
151   op->result = NULL;
152   
153   r = pthread_mutex_init(&op->lock, NULL);
154   if (r != 0)
155     goto free;
156   
157   return op;
158   
159  free:
160   free(op);
161  err:
162   return NULL;
163 }
164
165 void etpan_thread_op_free(struct etpan_thread_op * op)
166 {
167   pthread_mutex_destroy(&op->lock);
168   free(op);
169 }
170
171 static struct etpan_thread *
172 etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)
173 {
174   struct etpan_thread * thread;
175   int r;
176   
177   thread = etpan_thread_new();
178   if (thread == NULL)
179     goto err;
180   
181   thread->manager = manager;
182   
183   r = etpan_thread_start(thread);
184   if (r != NO_ERROR)
185     goto free_thread;
186   
187   r = carray_add(manager->thread_pool, thread, NULL);
188   if (r < 0) {
189     etpan_thread_stop(thread);
190     goto free_thread;
191   }
192   
193   return thread;
194   
195  free_thread:
196   etpan_thread_free(thread);
197  err:
198   return NULL;
199 }
200
201 static void
202 etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,
203     struct etpan_thread * thread)
204 {
205   unsigned int i;
206   int r;
207   
208   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
209     if (carray_get(manager->thread_pool, i) == thread) {
210       carray_delete(manager->thread_pool, i);
211       break;
212     }
213   }
214   
215   if (!etpan_thread_is_bound(thread))
216     manager->unbound_count --;
217   
218   r = carray_add(manager->thread_pending, thread, NULL);
219   if (r < 0) {
220     ETPAN_LOG("complete failure of thread due to lack of memory (thread stop)");
221   }
222   
223   etpan_thread_stop(thread);
224 }
225
226 static void manager_notify(struct etpan_thread_manager * manager)
227 {
228   char ch;
229   ssize_t r;
230   
231   ch = 1;
232   r = write(manager->notify_fds[1], &ch, 1);
233 }
234
235 static void manager_ack(struct etpan_thread_manager * manager)
236 {
237   char ch;
238   ssize_t r;
239   
240   r = read(manager->notify_fds[0], &ch, 1);
241 }
242
243 static void thread_lock(struct etpan_thread * thread)
244 {
245   pthread_mutex_lock(&thread->lock);
246 }
247
248 static void thread_unlock(struct etpan_thread * thread)
249 {
250   pthread_mutex_unlock(&thread->lock);
251 }
252
253 static void thread_notify(struct etpan_thread * thread)
254 {
255   manager_notify(thread->manager);
256 }
257
258 static void * thread_run(void * data)
259 {
260   struct etpan_thread * thread;
261   int r;
262   
263   thread = data;
264   
265   mailsem_up(thread->start_sem);
266   
267   while (1) {
268     int do_quit;
269     struct etpan_thread_op * op;
270     
271     mailsem_down(thread->op_sem);
272     
273     do_quit = 0;
274     op = NULL;
275     thread_lock(thread);
276     if (carray_count(thread->op_list) > 0) {
277       op = carray_get(thread->op_list, 0);
278       carray_delete_slow(thread->op_list, 0);
279     }
280     else {
281       do_quit = 1;
282     }
283     thread_unlock(thread);
284     
285     if (do_quit) {
286       break;
287     }
288     
289     if (!etpan_thread_op_cancelled(op)) {
290       if (op->run != NULL)
291         op->run(op);
292     }
293     
294     thread_lock(thread);
295     r = carray_add(thread->op_done_list, op, NULL);
296     if (r < 0) {
297       ETPAN_LOG("complete failure of thread due to lack of memory (op done)");
298     }
299     thread_unlock(thread);
300     
301     thread_notify(thread);
302   }
303   
304   thread_lock(thread);
305   thread->terminate_state = TERMINATE_STATE_DONE;
306   thread_unlock(thread);
307   
308   thread_notify(thread);
309   
310   mailsem_up(thread->stop_sem);
311   
312   return NULL;
313 }
314
315 int etpan_thread_start(struct etpan_thread * thread)
316 {
317   int r;
318   
319   r = pthread_create(&thread->th_id, NULL, thread_run, thread);
320   if (r != 0)
321     return ERROR_MEMORY;
322   
323   mailsem_down(thread->start_sem);
324   
325   return NO_ERROR;
326 }
327
328 void etpan_thread_stop(struct etpan_thread * thread)
329 {
330   thread_lock(thread);
331   thread->terminate_state = TERMINATE_STATE_REQUESTED;
332   thread_unlock(thread);
333   
334   mailsem_up(thread->op_sem);
335   
336   /* this thread will be joined in the manager loop */
337 }
338
339 int etpan_thread_is_stopped(struct etpan_thread * thread)
340 {
341   int stopped;
342   
343   thread_lock(thread);
344   stopped = (thread->terminate_state == TERMINATE_STATE_DONE);
345   thread_unlock(thread);
346   
347   return stopped;
348 }
349
350 void etpan_thread_join(struct etpan_thread * thread)
351 {
352   mailsem_down(thread->stop_sem);
353   pthread_join(thread->th_id, NULL);
354 }
355
356 struct etpan_thread *
357 etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)
358 {
359   struct etpan_thread * chosen_thread;
360   unsigned int chosen_thread_load;
361   unsigned int i;
362   struct etpan_thread * thread;
363   
364   /* chose a thread */
365   
366   chosen_thread = NULL;
367   chosen_thread_load = 0;
368   
369   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
370     thread = carray_get(manager->thread_pool, i);
371     if (etpan_thread_is_bound(thread))
372       continue;
373     
374     if (chosen_thread == NULL) {
375       chosen_thread = thread;
376       chosen_thread_load = etpan_thread_get_load(thread);
377       
378       if (chosen_thread_load == 0)
379         break;
380     }
381     else {
382       unsigned int load;
383       
384       load = etpan_thread_get_load(thread);
385       
386       if (load < chosen_thread_load) {
387         chosen_thread = thread;
388         chosen_thread_load = load;
389       }
390     }
391   }
392   
393   if (chosen_thread != NULL) {
394     if (manager->can_create_thread && (chosen_thread_load != 0)) {
395       chosen_thread = NULL;
396     }
397   }
398   
399   /* choice done */
400   
401   if (chosen_thread != NULL)
402     return chosen_thread;
403   
404   thread = etpan_thread_manager_create_thread(manager);
405   if (thread == NULL)
406     goto err;
407   
408   manager->unbound_count ++;
409   if (manager->unbound_count >= POOL_UNBOUND_MAX)
410     manager->can_create_thread = 0;
411   
412   return thread;
413   
414  err:
415   return NULL;
416 }
417
418 unsigned int etpan_thread_get_load(struct etpan_thread * thread)
419 {
420   unsigned int load;
421   
422   thread_lock(thread);
423   load = carray_count(thread->op_list);
424   thread_unlock(thread);
425   
426   return load;
427 }
428
429 void etpan_thread_bind(struct etpan_thread * thread)
430 {
431   thread->bound_count ++;
432 }
433
434 void etpan_thread_unbind(struct etpan_thread * thread)
435 {
436   thread->bound_count --;
437 }
438
439 int etpan_thread_is_bound(struct etpan_thread * thread)
440 {
441   return (thread->bound_count != 0);
442 }
443
444 int etpan_thread_op_schedule(struct etpan_thread * thread,
445                              struct etpan_thread_op * op)
446 {
447   int r;
448   
449   if (thread->terminate_state != TERMINATE_STATE_NONE)
450     return ERROR_INVAL;
451   
452   thread_lock(thread);
453   r = carray_add(thread->op_list, op, NULL);
454   thread_unlock(thread);
455   
456   if (r < 0)
457     return ERROR_MEMORY;
458   
459   op->thread = thread;
460   mailsem_up(thread->op_sem);
461   
462   return NO_ERROR;
463 }
464
465 void etpan_thread_op_lock(struct etpan_thread_op * op)
466 {
467   pthread_mutex_lock(&op->lock);
468 }
469
470 void etpan_thread_op_unlock(struct etpan_thread_op * op)
471 {
472   pthread_mutex_unlock(&op->lock);
473 }
474
475 int etpan_thread_op_cancelled(struct etpan_thread_op * op)
476 {
477   int cancelled;
478   
479   cancelled = 0;
480   etpan_thread_op_lock(op);
481   if (op->cancellable)
482     cancelled = op->cancelled;
483   etpan_thread_op_unlock(op);
484   
485   return cancelled;
486 }
487
488 void etpan_thread_op_cancel(struct etpan_thread_op * op)
489
490   etpan_thread_op_lock(op);
491   if (op->cancelled) {
492     ETPAN_LOG("cancelled twice");
493   }
494   op->cancelled = 1;
495   if ((op->callback != NULL) && (!op->callback_called)) {
496     op->callback(op->cancelled, op->result, op->callback_data);
497     op->callback_called = 1;
498   }
499   etpan_thread_op_unlock(op);
500 }
501
502 int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
503     struct etpan_thread_op * op)
504 {
505   struct etpan_thread * thread;
506   
507   thread = etpan_thread_manager_get_thread(manager);
508   
509   if (thread == NULL)
510     goto err;
511   
512   return etpan_thread_op_schedule(thread, op);
513   
514  err:
515   return ERROR_MEMORY;
516 }
517
518 int etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)
519 {
520   return manager->notify_fds[0];
521 }
522
523 static void loop_thread_list(carray * op_to_notify,
524     carray * thread_list)
525 {
526   unsigned int i;
527   int r;
528   
529   for(i = 0 ; i < carray_count(thread_list) ; i ++) {
530     struct etpan_thread * thread;
531     unsigned int j;
532     
533     thread = carray_get(thread_list, i);
534     
535     thread_lock(thread);
536     
537     for(j = 0 ; j < carray_count(thread->op_done_list) ; j ++) {
538       struct etpan_thread_op * op;
539       
540       op = carray_get(thread->op_done_list, j);
541       r = carray_add(op_to_notify, op, NULL);
542       if (r < 0) {
543         ETPAN_LOG("complete failure of thread due to lack of memory (callback)");
544         break;
545       }
546     }
547     carray_set_size(thread->op_done_list, 0);
548     
549     thread_unlock(thread);
550   }
551 }
552
553 void etpan_thread_manager_loop(struct etpan_thread_manager * manager)
554 {
555   carray * op_to_notify;
556   unsigned int i;
557   
558   manager_ack(manager);
559   
560   op_to_notify = carray_new(OP_INIT_SIZE);
561   
562   loop_thread_list(op_to_notify, manager->thread_pool);
563   loop_thread_list(op_to_notify, manager->thread_pending);
564   
565   for(i = 0 ; i < carray_count(op_to_notify) ; i ++) {
566     struct etpan_thread_op * op;
567     int cancelled;
568     
569     op = carray_get(op_to_notify, i);
570     
571     cancelled = etpan_thread_op_cancelled(op);
572     
573     etpan_thread_op_lock(op);
574     
575     if (!op->callback_called) {
576       if (op->callback != NULL)
577         op->callback(op->cancelled, op->result, op->callback_data);
578     }
579     
580     etpan_thread_op_unlock(op);
581     
582     if (op->cleanup != NULL)
583       op->cleanup(op);
584   }
585   
586   carray_free(op_to_notify);
587   
588   i = 0;
589   while (i < carray_count(manager->thread_pending)) {
590     struct etpan_thread * thread;
591     
592     thread = carray_get(manager->thread_pending, i);
593     
594     if (etpan_thread_is_stopped(thread)) {
595       etpan_thread_join(thread);
596       
597       etpan_thread_free(thread);
598       
599       carray_delete_slow(manager->thread_pending, i);
600     }
601     else {
602       i ++;
603     }
604   }
605 }
606
607 void etpan_thread_manager_start(struct etpan_thread_manager * manager)
608 {
609   /* do nothing */
610 }
611
612 void etpan_thread_manager_stop(struct etpan_thread_manager * manager)
613 {
614   while (carray_count(manager->thread_pool) > 0) {
615     struct etpan_thread * thread;
616     
617     thread = carray_get(manager->thread_pool, 0);
618     etpan_thread_manager_terminate_thread(manager, thread);
619   }
620 }
621
622 int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)
623 {
624   return ((carray_count(manager->thread_pending) == 0) && 
625       (carray_count(manager->thread_pool) == 0));
626 }
627
628 void etpan_thread_manager_join(struct etpan_thread_manager * manager)
629 {
630   while (!etpan_thread_manager_is_stopped(manager)) {
631     etpan_thread_manager_loop(manager);
632   }
633 }