23 #define EV_TPOLL_U64_BITSIZE (64) 24 #define EV_TPOLL_U8_BITSIZE (8) 25 #define EV_TPOLL_USABLE_BITSIZE (64) 77 #define EVT_TPOOL_MNG_LOCK(this) DPUTIL_LOCK(this->lock); 78 #define EVT_TPOOL_MNG_UNLOCK DPUTIL_UNLOCK; 91 if((data)&(0x1<<index))
break;
97 #define EV_TPOLL_FD_START (3) 99 #define EV_TPOLL_FDSU64PLACE(fd) (((fd)-EV_TPOLL_FD_START)/EV_TPOLL_U64_BITSIZE) 100 #define EV_TPOLL_FDINDEX(fd, place) ((fd) - ((place)*EV_TPOLL_USABLE_BITSIZE) - EV_TPOLL_FD_START) 101 #define EV_TPOLL_FDINDEX_U8(fd, place, place_u8) (EV_TPOLL_FDINDEX(fd,place) - ((place_u8) * EV_TPOLL_U8_BITSIZE)) 102 #define EV_TPOLL_FD(place, place_u8, index) (index + ((place)*EV_TPOLL_USABLE_BITSIZE) + ((place_u8) * EV_TPOLL_U8_BITSIZE) + EV_TPOLL_FD_START) 108 DEBUG_PRINT(
"%s:fds=%lx\n", __func__, this->fds[place].u64);
116 DEBUG_PRINT(
"%s:fds=%lx\n", __func__, this->fds[place].u64);
124 DEBUG_PRINT(
"%s:fds=%lx\n", __func__, this->fds[place].u64);
125 int ret = (this->fds[place].u8[place_u8] & ((0x1) <<
EV_TPOLL_FDINDEX_U8(fd,place,place_u8)) );
141 while(this->fdcnt && this->fds[place].u8[i]) {
193 size_t size =
sizeof(*info) * thread_size;
194 size += (
sizeof(uint64_t)*EV_TPOLL_MAXFDS*thread_size);
201 memset(info, 0, size);
204 void *current_p = info + thread_size;
207 for( i = 0; i < thread_size; i ++ ) {
211 current_p = info[i].
fds + EV_TPOLL_MAXFDS;
215 if(!info[i].tinstance) {
229 for( i = 0; i < thread_size ; i ++ ) {
245 CPU_ZERO(&child_set);
246 sched_getaffinity(0,
sizeof(child_set), &child_set);
247 return CPU_COUNT(&child_set)*2;
254 for( i = 0; i < this->thread_size; i ++ ) {
259 DEBUG_PRINT(
"event_tpool_thread_has_fd, in %d\n", threadid);
264 if( 0<= threadid && this->threads[threadid].fdcnt < this->threads[i].fdcnt) {
279 pthread_mutex_t *lock=NULL;
289 instance->
lock = (pthread_mutex_t *)calloc(1,
sizeof(pthread_mutex_t));
290 if(!instance->
lock) {
294 pthread_mutex_init(instance->
lock, NULL);
320 pthread_mutex_t *lock=NULL;
338 size = this->thread_size;
346 if(!
this || !subscriber) {
356 DEBUG_ERRPRINT(
"There is a setting for fd %d, please use event_tpool_update!\n", subscriber->
fd);
375 if(!
this || !subscriber) {
378 if(threadid < 0 || this->thread_size <= threadid) {
387 DEBUG_ERRPRINT(
"There is a setting for fd %d, please use event_tpool_update!\n", subscriber->
fd);
406 if(!
this || !subscriber || !event_handle) {
416 DEBUG_ERRPRINT(
"There is no setting for fd %d, please use event_tpool_add!\n", subscriber->
fd);
439 DEBUG_ERRPRINT(
"There is no setting for fd %d, please use event_tpool_update!\n", fd);
454 if(this->lock) pthread_mutex_init(this->lock, NULL);
457 for(i = 0; i < this->thread_size; i++ ) {
This is the API of the event thread.
static void event_tpool_free_fddata_list(EventTPoolThreadInfo this)
free fddata
EventTPoolThreadInfo threads
#define EVT_TPOOL_MNG_UNLOCK
size_t event_tpool_manager_get_threadnum(EventTPoolManager this)
get size of thread
int event_tpool_thread_unload_plugin(void)
EventTPoolManager event_tpool_manager_new(int thread_num, int is_threadsafe, const char *plugin_path)
new EventTPoolManager
static void event_tpool_manager_free_without_lock(EventTPoolManager this)
static int event_tpoll_get_far_right_bit_index(uint8_t data)
static void event_tpool_thread_insert_fddata(EventTPoolThreadInfo this, int fd)
static void event_tpool_free_fddata(EventTPoolThreadInfo this, int place)
void event_tpool_thread_stop(EventTPoolThread this)
stop thread
static void event_tpool_thread_info_stop_thread(EventTPoolThreadInfo this)
stop thread
uint8_t u8[EV_TPOLL_U8_BITSIZE]
#define EVT_TPOOL_MNG_LOCK(this)
union data_fds_u data_fds_u
void(* free)(EventInstance this)
void event_thread_set_stack_size(size_t stack_size)
This is an API implementing the thread-pool design pattern using libevent.
event_tpool_add_result_t event_tpool_add_thread(EventTPoolManager this, int threadid, EventSubscriber subscriber, void *arg)
add EventSubscriber to the thread pool; use this function if you want to choose the thread.
static void event_tpool_thread_delete_thread(EventTPoolThreadInfo this, int fd)
delete thread
static EventTPoolThreadInfo event_tpool_thread_info_new(size_t thread_size, const char *plugin_path)
new thread info
event_tpool_add_result_t event_tpool_update(EventTPoolManager this, EventTPoolThreadInfo event_handle, EventSubscriber subscriber, void *arg)
update EventSubscriber to threadpool.
#define EV_TPOLL_U8_BITSIZE
static void event_tpool_thread_info_start_thread(EventTPoolThreadInfo instance)
start thread
void event_tpool_thread_del(EventTPoolThread this, int fd)
delete subscriber
EventTPoolManager class instance definition.
void event_tpool_del(EventTPoolManager this, int fd)
delete EventSubscriber from the thread pool.
event_tpool_add_result_t event_tpool_add(EventTPoolManager this, EventSubscriber subscriber, void *arg)
add EventSubscriber to threadpool
static void event_tpool_thread_set_fds(EventTPoolThreadInfo this, int fd)
EventSubscriber class instance definition; this is stored in one of the threads.
struct event_tpool_manager_t * EventTPoolManager
EventTPoolManager class definition.
void event_tpool_thread_start(EventTPoolThread this)
start thread
static int event_tpool_manager_get_default_thrednum(void)
static int event_tpool_thread_is_set_fds(EventTPoolThreadInfo this, int fd)
void event_tpool_thread_update(EventTPoolThread this, EventSubscriber subscriber, void *arg)
update subscriber
#define EV_TPOLL_FDINDEX(fd, place)
#define EV_TPOLL_FDINDEX_U8(fd, place, place_u8)
void event_tpool_thread_add(EventTPoolThread this, EventSubscriber subscriber, void *arg)
add new subscriber
struct event_tpool_thread_info_t event_tpool_thread_info_t
EventTPoolThread tinstance
static void event_tpool_thread_info_free(EventTPoolThreadInfo this, size_t thread_size)
free thread info
static int event_tpool_manager_search_insert_thread(EventTPoolManager this, int fd, int *has_fd)
search for the insertion place, for use with event_tpool_thread_insert_thread
int fd
file descriptor of this subscriber
#define EV_TPOLL_FDSU64PLACE(fd)
static int event_tpool_thread_has_fd(EventTPoolThreadInfo this, int fd)
search current setting
#define EV_TPOLL_USABLE_BITSIZE
EventTPoolThreadInfo event_handle
void event_tpool_manager_free(EventTPoolManager this)
destructor of EventTPoolManager
#define EV_TPOLL_FD(place, place_u8, index)
EventTPoolThread event_tpool_thread_new(size_t thread_size)
create a thread instance
#define DEBUG_ERRPRINT(...)
static void event_tpool_thread_unset_fds(EventTPoolThreadInfo this, int fd)
void event_thread_atfork_child(EventTPoolThread this)
void event_tpool_atfork_child(EventTPoolManager this)
Update members after fork; call this API in the child process if you use fork.
int event_tpool_thread_load_plugin(const char *plugin_path)
union event_tpool_thread_info_t::data_fds_u * fds
void event_tpool_set_stack_size(size_t stack_size)
Set thread stack size.