design_pattern_for_c  V 1.00
event_tpool_manager.c
Go to the documentation of this file.
1 
5 #define _GNU_SOURCE
6 #include <elf.h>
7 #include <stdio.h>
8 #include <sched.h>
9 #include "config.h"
10 #include "event_threadpool.h"
11 #include "event_thread.h"
12 #include "dp_util.h"
13 #include "config.h"
14 
15 /*************
16  * public define
17 *************/
21 /*Fix size of max fds 4096 value FD (to care sign */
22 //#define EV_TPOLL_MAXFDS (64)
23 #define EV_TPOLL_U64_BITSIZE (64)
24 #define EV_TPOLL_U8_BITSIZE (8)
25 #define EV_TPOLL_USABLE_BITSIZE (64)
26 /*TODO: care over 4096 fd?*/
27 typedef struct event_tpool_thread_info_t {
29  size_t fdcnt;
30  union data_fds_u{
31  uint64_t u64;
32  uint8_t u8[EV_TPOLL_U8_BITSIZE];/*u8*8 byte*/
33  } *fds;
35 
36 typedef union data_fds_u data_fds_u;
37 
38 static void event_tpool_thread_set_fds(EventTPoolThreadInfo this, int fd);
39 static void event_tpool_thread_unset_fds(EventTPoolThreadInfo this, int fd);
41 
43 
52 static int event_tpool_thread_has_fd(EventTPoolThreadInfo this, int fd);
58 static EventTPoolThreadInfo event_tpool_thread_info_new(size_t thread_size, const char * plugin_path);
60 static void event_tpool_thread_info_free(EventTPoolThreadInfo this, size_t thread_size);
69  size_t thread_size;
71  pthread_mutex_t *lock;
72 };
77 #define EVT_TPOOL_MNG_LOCK(this) DPUTIL_LOCK(this->lock);
78 #define EVT_TPOOL_MNG_UNLOCK DPUTIL_UNLOCK;
82 static int event_tpool_manager_search_insert_thread(EventTPoolManager this, int fd, int *has_fd);
85 /*************
86  * for thread information list API
87 *************/;
88 static int event_tpoll_get_far_right_bit_index(uint8_t data) {
89  int index=0;
90  for(index=0;index< EV_TPOLL_U8_BITSIZE;index++) {
91  if((data)&(0x1<<index))break;
92  }
93  return index;
94 }
95 
96 //STDIN, STDOUT, STDERR 0, 1, 2
97 #define EV_TPOLL_FD_START (3)
98 
99 #define EV_TPOLL_FDSU64PLACE(fd) (((fd)-EV_TPOLL_FD_START)/EV_TPOLL_U64_BITSIZE)
100 #define EV_TPOLL_FDINDEX(fd, place) ((fd) - ((place)*EV_TPOLL_USABLE_BITSIZE) - EV_TPOLL_FD_START)
101 #define EV_TPOLL_FDINDEX_U8(fd, place, place_u8) (EV_TPOLL_FDINDEX(fd,place) - ((place_u8) * EV_TPOLL_U8_BITSIZE))
102 #define EV_TPOLL_FD(place, place_u8, index) (index + ((place)*EV_TPOLL_USABLE_BITSIZE) + ((place_u8) * EV_TPOLL_U8_BITSIZE) + EV_TPOLL_FD_START)
103 
105  uint64_t place = EV_TPOLL_FDSU64PLACE(fd);
106  uint16_t place_u8 = EV_TPOLL_FDINDEX(fd,place)/EV_TPOLL_U8_BITSIZE;
107  DEBUG_PRINT( "%s:place=%lu\n", __func__, place);
108  DEBUG_PRINT( "%s:fds=%lx\n", __func__, this->fds[place].u64);
109  this->fds[place].u8[place_u8] |= (0x1) << EV_TPOLL_FDINDEX_U8(fd,place, place_u8);
110 }
112  DEBUG_PRINT("%s:fd=%d\n", __func__, fd);
113  uint64_t place = EV_TPOLL_FDSU64PLACE(fd);
114  uint16_t place_u8 = EV_TPOLL_FDINDEX(fd,place)/EV_TPOLL_U8_BITSIZE;
115  DEBUG_PRINT( "%s:place=%lu\n", __func__, place);
116  DEBUG_PRINT( "%s:fds=%lx\n", __func__, this->fds[place].u64);
117  this->fds[place].u8[place_u8] &= ~((0x1) << EV_TPOLL_FDINDEX_U8(fd,place,place_u8));
118 }
120  DEBUG_PRINT("%s:fd=%d\n", __func__, fd);
121  uint64_t place = EV_TPOLL_FDSU64PLACE(fd);
122  uint16_t place_u8 = EV_TPOLL_FDINDEX(fd,place)/EV_TPOLL_U8_BITSIZE;
123  DEBUG_PRINT( "%s:place=%lu\n", __func__, place);
124  DEBUG_PRINT( "%s:fds=%lx\n", __func__, this->fds[place].u64);
125  int ret = (this->fds[place].u8[place_u8] & ((0x1) << EV_TPOLL_FDINDEX_U8(fd,place,place_u8)) );
126  return ret;
127 }
128 
130  event_tpool_thread_set_fds(this, fd);
131  this->fdcnt++;
132 }
133 
135  return event_tpool_thread_is_set_fds(this, fd);
136 }
137 
138 static void event_tpool_free_fddata(EventTPoolThreadInfo this, int place) {
139  int i=0, fd=0;
140  for(i = 0; i < EV_TPOLL_U8_BITSIZE && this->fdcnt; i ++) {
141  while(this->fdcnt && this->fds[place].u8[i]) {
142  /*get fd place*/
143  fd = EV_TPOLL_FD(place, i, event_tpoll_get_far_right_bit_index(this->fds[place].u8[i]));
144 
145  /*delete event*/
146  event_tpool_thread_del(this->tinstance, fd);
147 
148  /*unset*/event_tpool_thread_unset_fds(this, fd);
149  this->fdcnt--;
150  }
151  }
152 }
153 
156  size_t i = 0;
157  int fd=0;
158  int fd_base=0;
159  for(i = 0; this->fdcnt && i < EV_TPOLL_MAXFDS; i ++, fd_base += EV_TPOLL_USABLE_BITSIZE) {
160  event_tpool_free_fddata(this, i);
161  }
162 }
163 
166 }
167 
170  //delete all list
172  //stop thread, instance is release in thread
173  event_tpool_thread_stop(this->tinstance);
174 }
175 
179  this->fdcnt--;
180 }
181 
182 /*************
183  * for list API
184 *************/
185 static EventTPoolThreadInfo event_tpool_thread_info_new(size_t thread_size, const char * plugin_path) {
186  EventTPoolThreadInfo info = NULL;
187 
188  if(event_tpool_thread_load_plugin(plugin_path) < 0) {
189  DEBUG_ERRPRINT("Failed to load plugin!\n" );
190  return NULL;
191  }
192 
193  size_t size = sizeof(*info) * thread_size;/*sizeof EventTPoolThreadInfo*/
194  size += (sizeof(uint64_t)*EV_TPOLL_MAXFDS*thread_size);/*sizeof fds in EventTPoolThreadInfo*/
195  info = malloc(size);
196  if(!info) {
197  DEBUG_ERRPRINT("Failed to get instance threads!\n" );
199  return NULL;
200  }
201  memset(info, 0, size);
202 
203  /*set place for fds*/
204  void *current_p = info + thread_size;
205 
206  size_t i=0;
207  for( i = 0; i < thread_size; i ++ ) {
208  //keep 64bit * EV_TPOLL_MAXFDS => 4096 fds place
209  DEBUG_PRINT("info[%d]=%p!\n", i, current_p );
210  info[i].fds = (data_fds_u *)current_p;
211  current_p = info[i].fds + EV_TPOLL_MAXFDS;
212 
213  /*create thread instance*/
214  info[i].tinstance = event_tpool_thread_new(thread_size);
215  if(!info[i].tinstance) {
217  DEBUG_ERRPRINT("Failed to create thread new!\n" );
218  return NULL;
219  }
220 
222  }
223  return info;
224 }
225 
227 static void event_tpool_thread_info_free(EventTPoolThreadInfo this, size_t thread_size) {
228  size_t i=0;
229  for( i = 0; i < thread_size ; i ++ ) {
231  }
232  free(this);
233 }
234 /*************
235  * for private API
236 *************/
238  event_tpool_thread_info_free(this->threads, this->thread_size);
239  //lock is free after unlock
240  free(this);
241 }
242 
244  cpu_set_t child_set;
245  CPU_ZERO(&child_set);
246  sched_getaffinity(0, sizeof(child_set), &child_set);
247  return CPU_COUNT(&child_set)*2;
248 }
249 
250 static int event_tpool_manager_search_insert_thread(EventTPoolManager this, int fd, int *has_fd) {
251  int threadid=-1;
252  size_t i=0;
253  *has_fd = 0;
254  for( i = 0; i < this->thread_size; i ++ ) {
255  //already add fd?
256  if(event_tpool_thread_has_fd(&this->threads[i], fd)) {
257  *has_fd = 1;
258  threadid=i;
259  DEBUG_PRINT("event_tpool_thread_has_fd, in %d\n", threadid);
260  break;
261  }
262 
263  //add to other place
264  if( 0<= threadid && this->threads[threadid].fdcnt < this->threads[i].fdcnt) {
265  continue;
266  }
267 
268  threadid=i;
269  }
270  return threadid;
271 }
272 
273 /*************
274  * for public API
275 *************/
278 EventTPoolManager event_tpool_manager_new(int thread_num, int is_threadsafe, const char * plugin_path) {
279  pthread_mutex_t *lock=NULL;
280 //DEBUG_INIT
281  EventTPoolManager instance = (EventTPoolManager)calloc(1, sizeof(*instance));
282  if(!instance) {
283  DEBUG_ERRPRINT("Failed to get instance!\n" );
284  return NULL;
285  }
286 
287  //get lock instance
288  if(is_threadsafe) {
289  instance->lock = (pthread_mutex_t *)calloc(1, sizeof(pthread_mutex_t));
290  if(!instance->lock) {
291  DEBUG_ERRPRINT("Failed to get instance lock!\n" );
292  goto err;
293  }
294  pthread_mutex_init(instance->lock, NULL);
295  }
296 
297  //get thread instance
298  if(thread_num<=0) {
300  } else {
301  instance->thread_size = (size_t)thread_num;
302  }
303 
304  instance->threads = event_tpool_thread_info_new(instance->thread_size, plugin_path);
305  if(!instance->threads) {
306  DEBUG_ERRPRINT("Failed to get instance threads!\n" );
307  goto err;
308  }
309 //DEBUG_EXIT
310  return instance;
311 
312 err:
313  lock=instance->lock;
315  free(lock);
316  return NULL;
317 }
318 
320  pthread_mutex_t *lock=NULL;
321  if(!this) {
322  return;
323  }
324 EVT_TPOOL_MNG_LOCK(this);
325  lock = this->lock;
328  free(lock);
329 }
330 
332  size_t size=0;
333  if(!this) {
334  return -1;
335  }
336 
337 EVT_TPOOL_MNG_LOCK(this);
338  size = this->thread_size;
340  return size;
341 }
342 
344  event_tpool_add_result_t result={-1, NULL};
345  int id=-1;
346  if(!this || !subscriber) {
347  return result;
348  }
349 EVT_TPOOL_MNG_LOCK(this);
350 
351  int has_fd=0;
352  id = event_tpool_manager_search_insert_thread(this, subscriber->fd, &has_fd);
353 
354  /*reject there is same fd*/
355  if(has_fd) {
356  DEBUG_ERRPRINT("There is a setting for fd %d, please use event_tpool_update!\n", subscriber->fd);
357  goto end;
358  }
359 
360  /*accept if there is no same fd*/
361  event_tpool_thread_insert_fddata(&this->threads[id], subscriber->fd);
362  result.event_handle = &this->threads[id];
363  result.result = id;
364 end:
365 
367  /*after unlock, call to add API on event_thread*/
368  if(result.event_handle) event_tpool_thread_add(result.event_handle->tinstance, subscriber, arg);
369 
370  return result;
371 }
372 
374  event_tpool_add_result_t result={.result=-1, .event_handle=NULL};
375  if(!this || !subscriber) {
376  return result;
377  }
378  if(threadid < 0 || this->thread_size <= threadid) {
379  return result;
380  }
381 
382 EVT_TPOOL_MNG_LOCK(this);
383  int id, has_fd=0;
384  id = event_tpool_manager_search_insert_thread(this, subscriber->fd, &has_fd);
385  /*reject there is same fd*/
386  if(has_fd) {
387  DEBUG_ERRPRINT("There is a setting for fd %d, please use event_tpool_update!\n", subscriber->fd);
388  goto end;
389  }
390 
391  /*accept if there is no same fd*/
392  event_tpool_thread_insert_fddata(&this->threads[threadid], subscriber->fd);
393  result.event_handle = &this->threads[threadid];
394  result.result = threadid;
395 
396 end:
398  /*after unlock, call to add API on event_thread*/
399  if(result.event_handle) event_tpool_thread_add(result.event_handle->tinstance, subscriber, arg);
400  return result;
401 }
402 
404  event_tpool_add_result_t result={.result=-1, .event_handle=NULL};
405  int id = -1;
406  if(!this || !subscriber || !event_handle) {
407  return result;
408  }
409 
410 EVT_TPOOL_MNG_LOCK(this);
411 
412  /*Check is it deleted?*/
413  int has_fd;
414  id = event_tpool_manager_search_insert_thread(this, subscriber->fd, &has_fd);
415  if(!has_fd) {
416  DEBUG_ERRPRINT("There is no setting for fd %d, please use event_tpool_add!\n", subscriber->fd);
417  goto end;
418  }
419 
420  result.event_handle = event_handle;
421  result.result = id;
422 
423 end:
425  /*after unlock, call to add API on event_thread*/
426  if(result.event_handle) event_tpool_thread_update(result.event_handle->tinstance, subscriber, arg);
427  return result;
428 }
429 
430 void event_tpool_del(EventTPoolManager this, int fd) {
431  int id=0, has_fd=0;
432  if(!this) {
433  return;
434  }
435 EVT_TPOOL_MNG_LOCK(this);
436 
437  id = event_tpool_manager_search_insert_thread(this, fd, &has_fd);
438  if(!has_fd) {
439  DEBUG_ERRPRINT("There is no setting for fd %d, please use event_tpool_update!\n", fd);
440  goto end;
441  }
442 
443  event_tpool_thread_delete_thread(&this->threads[id], fd);
444 end:
446 
447  /*after unlock, call to delete API on event_thread*/
448  if(has_fd) {
449  event_tpool_thread_del(this->threads[id].tinstance, fd);
450  }
451 }
452 
454  if(this->lock) pthread_mutex_init(this->lock, NULL);
455 
456  size_t i=0;
457  for(i = 0; i < this->thread_size; i++ ) {
458  event_thread_atfork_child(this->threads[i].tinstance);
459  }
460 }
461 
462 void event_tpool_set_stack_size(size_t stack_size) {
463  event_thread_set_stack_size(stack_size);
464 }
This is API of event thread.
static void event_tpool_free_fddata_list(EventTPoolThreadInfo this)
free fddata
EventTPoolThreadInfo threads
Utility headers
#define EVT_TPOOL_MNG_UNLOCK
size_t event_tpool_manager_get_threadnum(EventTPoolManager this)
get size of thread
int event_tpool_thread_unload_plugin(void)
Definition: event_thread.c:649
EventTPoolManager event_tpool_manager_new(int thread_num, int is_threadsafe, const char *plugin_path)
new EventTPoolManager
static void event_tpool_manager_free_without_lock(EventTPoolManager this)
static int event_tpoll_get_far_right_bit_index(uint8_t data)
static void event_tpool_thread_insert_fddata(EventTPoolThreadInfo this, int fd)
static void event_tpool_free_fddata(EventTPoolThreadInfo this, int place)
void event_tpool_thread_stop(EventTPoolThread this)
stop thread
Definition: event_thread.c:585
static void event_tpool_thread_info_stop_thread(EventTPoolThreadInfo this)
stop thread
#define EVT_TPOOL_MNG_LOCK(this)
union data_fds_u data_fds_u
void(* free)(EventInstance this)
Definition: event_thread.c:39
void event_thread_set_stack_size(size_t stack_size)
Definition: event_thread.c:619
This is API as ThreadPool design petten by using libevent.
event_tpool_add_result_t event_tpool_add_thread(EventTPoolManager this, int threadid, EventSubscriber subscriber, void *arg)
add EventSubscriber to threadpool, if you want to choose thead, please use it.
static void event_tpool_thread_delete_thread(EventTPoolThreadInfo this, int fd)
delete thread
static EventTPoolThreadInfo event_tpool_thread_info_new(size_t thread_size, const char *plugin_path)
new thread info
event_tpool_add_result_t event_tpool_update(EventTPoolManager this, EventTPoolThreadInfo event_handle, EventSubscriber subscriber, void *arg)
update EventSubscriber to threadpool.
#define DEBUG_PRINT(...)
Definition: dp_debug.h:55
#define EV_TPOLL_U8_BITSIZE
static void event_tpool_thread_info_start_thread(EventTPoolThreadInfo instance)
start thread
void event_tpool_thread_del(EventTPoolThread this, int fd)
delete subscriber
Definition: event_thread.c:609
EventTPoolManager class instance definition.
void event_tpool_del(EventTPoolManager this, int fd)
delete EventSubscriber to threadapool.
event_tpool_add_result_t event_tpool_add(EventTPoolManager this, EventSubscriber subscriber, void *arg)
add EventSubscriber to threadpool
pthread_mutex_t * lock
static void event_tpool_thread_set_fds(EventTPoolThreadInfo this, int fd)
EventSubscriber class instance definition, this is storaged in any threads.
struct event_tpool_manager_t * EventTPoolManager
EventTPoolManager class definition.
void event_tpool_thread_start(EventTPoolThread this)
start thread
Definition: event_thread.c:569
static int event_tpool_manager_get_default_thrednum(void)
static int event_tpool_thread_is_set_fds(EventTPoolThreadInfo this, int fd)
void event_tpool_thread_update(EventTPoolThread this, EventSubscriber subscriber, void *arg)
update subscriber
Definition: event_thread.c:604
#define EV_TPOLL_FDINDEX(fd, place)
#define EV_TPOLL_FDINDEX_U8(fd, place, place_u8)
void event_tpool_thread_add(EventTPoolThread this, EventSubscriber subscriber, void *arg)
add new subscriber
Definition: event_thread.c:601
struct event_tpool_thread_info_t event_tpool_thread_info_t
static void event_tpool_thread_info_free(EventTPoolThreadInfo this, size_t thread_size)
free thread info
static int event_tpool_manager_search_insert_thread(EventTPoolManager this, int fd, int *has_fd)
search insert place, to use event_tpool_thread_insert_thread
int fd
file descripter of this subscriber
thread information
Definition: event_thread.c:105
#define EV_TPOLL_FDSU64PLACE(fd)
static int event_tpool_thread_has_fd(EventTPoolThreadInfo this, int fd)
search current setting
#define EV_TPOLL_USABLE_BITSIZE
EventTPoolThreadInfo event_handle
void event_tpool_manager_free(EventTPoolManager this)
destructor of EventTPoolManager
#define EV_TPOLL_FD(place, place_u8, index)
EventTPoolThread event_tpool_thread_new(size_t thread_size)
create and thread instance
Definition: event_thread.c:537
#define DEBUG_ERRPRINT(...)
Definition: dp_debug.h:69
static void event_tpool_thread_unset_fds(EventTPoolThreadInfo this, int fd)
void event_thread_atfork_child(EventTPoolThread this)
Definition: event_thread.c:612
void event_tpool_atfork_child(EventTPoolManager this)
Update member at fork, please call this API on child process if you use fork.
int event_tpool_thread_load_plugin(const char *plugin_path)
Definition: event_thread.c:623
union event_tpool_thread_info_t::data_fds_u * fds
add result definition
void event_tpool_set_stack_size(size_t stack_size)
Set thread stack size.