10 #include <sys/eventfd.h> 19 #define EVENT_THREAD_WAIT_TIMEOUT (2) 21 #define EVENT_THREAD_STACKSIZE (256 * 1024) 87 #ifdef CHECK_STACKSIZE 88 #define MAGIC_NUMBER 'Z' 100 #define event_thread_pop(this) (EventSubscriberData)dputil_list_pop((DPUtilList)(this)) 101 #define event_thread_pull(this, data) dputil_list_pull((DPUtilList)(this), (DPUtilListData)(data)) 102 #define event_thread_push(this, data) dputil_list_push((DPUtilList)(this), (DPUtilListData)(data)) 115 #ifdef CHECK_STACKSIZE 122 #define EVMSG_LOCK(this) DPUTIL_LOCK(&this->msgdata.lock); 123 #define EVMSG_UNLOCK DPUTIL_UNLOCK 187 ret = eventfd_write(this->eventfd, 1);
189 DEBUG_PRINT(
"################Failed to send event\n");
192 struct timespec timeout;
193 clock_gettime(CLOCK_REALTIME, &timeout);
195 ret = pthread_cond_timedwait(&this->msgdata.cond, &this->msgdata.lock, &timeout);
196 if(ret == ETIMEDOUT) {
206 return eventfd_write(this->eventfd, 1);
210 int is_ownthread = ((this->tid==0) || (pthread_self() == this->tid));
213 msg = &this->msgdata.store_msgs[this->msgdata.store_msg_cnt++];
215 msg = &this->msgdata.msg;
218 memset(msg, 0,
sizeof(*msg));
225 if(ret < 0) this->msgdata.store_msg_cnt--;
241 int is_ownthread = ((this->tid==0) || (pthread_self() == this->tid));
244 msg = &this->msgdata.store_msgs[this->msgdata.store_msg_cnt++];
246 msg = &this->msgdata.msg;
249 memset(msg, 0,
sizeof(*msg));
250 msg->
type=EVE_THREAD_MSG_TYPE_DEL;
257 if(ret < 0) this->msgdata.store_msg_cnt--;
264 int is_ownthread = ((this->tid==0) || (pthread_self() == this->tid));
267 msg = &this->msgdata.store_msgs[this->msgdata.store_msg_cnt++];
269 msg = &this->msgdata.msg;
272 memset(msg, 0,
sizeof(*msg));
273 msg->
type=EVE_THREAD_MSG_TYPE_STOP;
278 if(ret < 0) this->msgdata.store_msg_cnt--;
332 if(!this->event_base) {
341 if(!this->msg_evinfo) {
353 if(this->msg_evinfo) {
356 if(this->event_base) {
369 close(this->eventfd);
379 subscriber=subscriber->
next;
388 while(!this->is_stop) {
398 if(pthread_detach(pthread_self())) {
458 while(this->msgdata.store_msg_cnt && cnt) {
462 DEBUG_PRINT(
"cond signal from event thread to %p\n", &this->msgdata.cond );
463 pthread_cond_signal(&this->msgdata.cond);
473 int ret = eventfd_read(this->eventfd, &cnt);
482 static const char * pluginlist[] = {
483 EVENT_IF_PLUGIN_PATH
"/libevent_if_libevent.so",
484 EVENT_IF_PLUGIN_PATH
"/libevent_if_epoll.so",
485 EVENT_IF_PLUGIN_PATH
"/libevent_if_select.so",
491 struct stat stat_buf;
492 for(i = 0; pluginlist[i]; i ++ ) {
494 if((stat(pluginlist[i], &stat_buf)==0) && ( S_ISREG((stat_buf.st_mode & S_IFMT)) || S_ISLNK(stat_buf.st_mode & S_IFMT))) {
498 return pluginlist[i];
545 instance->
eventfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC );
572 pthread_attr_init(&attr);
575 #ifdef CHECK_STACKSIZE 581 pthread_attr_destroy(&attr);
586 pthread_t tid = this->tid;
589 pthread_join(tid, NULL);
591 #ifdef CHECK_STACKSIZE 594 if (this->stack_adr[i] != MAGIC_NUMBER)
break;
615 pthread_mutex_init(&this->msgdata.lock, NULL);
616 pthread_cond_init(&this->msgdata.cond, NULL);
636 DEBUG_ERRPRINT(
"There is no install plugin, please check it!\n" );
642 DEBUG_ERRPRINT(
"Failed to open %s, err=%s!\n", plugin_path , dlerror() );
This is the API of the event thread.
static const char * event_tpool_thread_get_defaullt_plugin(void)
static void event_tpool_thread_call_msgs(EventTPoolThread this, eventfd_t cnt)
main messages caller
event_thread_msg_body_add_t add
when msg is update, body is subscriber
static void event_subscriber_data_free(EventTPoolThread this, EventSubscriberData data)
free instance
EventHandler(* add)(EventInstance this, EventSubscriber subscriber, void *arg)
EventSubscriberData head
list of subscriber
event_subscriber_t subscriber
static void event_tpool_thread_msg_cb_add(EventTPoolThread this, event_thread_msg_t *msg)
for add
int event_tpool_thread_unload_plugin(void)
void(* exit)(EventInstance this)
struct event_thread_msg_body_del_t event_thread_msg_body_del_t
static void event_tpool_thread_msg_cb_stop(EventTPoolThread this, event_thread_msg_t *msg)
for stop
struct event_thread_msg_body_add_t event_thread_msg_body_add_t
message definition for manage subscriber
union event_thread_msg_t::@2 data
void event_tpool_thread_stop(EventTPoolThread this)
stop thread
static void event_tpool_thread_msg_cb_del(EventTPoolThread this, event_thread_msg_t *msg)
for del
static EventSubscriberData event_subscriber_data_new(EventTPoolThread this, EventSubscriber subscriber, void *arg)
new instance
static struct @0 event_if_instance_g
interface definition
void(* free)(EventInstance this)
void event_thread_set_stack_size(size_t stack_size)
EventSubscriberData tail
list of subscriber
static void event_tpool_thread_cb(int, int, void *)
callback main
static int event_tpool_thread_load_all_fun(void)
static void event_thread_msg_send_del(EventTPoolThread this, int fd)
static void event_thread_set_func(void *handle, const char *func_name, void **func)
struct event_tpool_thread_t * EventTPoolThread
#define event_thread_push(this, data)
#define EVENT_THREAD_WAIT_TIMEOUT
static void event_thread_msg_send_update(EventTPoolThread this, EventSubscriber subscriber, void *arg)
static int event_thread_msg_send(EventTPoolThread this, EventThreadMsg msg)
static void event_thread_msg_send_add(EventTPoolThread this, EventSubscriber subscriber, void *arg)
static void event_thread_msg_send_subscribe(EventTPoolThread this, EventSubscriber subscriber, void *arg, int type)
void(* event_tpool_thread_msg_cb)(EventTPoolThread this, event_thread_msg_t *msg)
void event_tpool_thread_del(EventTPoolThread this, int fd)
delete subscriber
message definition for manage subscriber
void * EventHandler
Event handler related to fd.
static int event_thread_msg_send_stop(EventTPoolThread this)
EventHandler(* update)(EventInstance this, EventHandler handler, EventSubscriber subscriber, void *arg)
event_thread_msg_body_del_t del
when msg is del, body is fd
message struct definition
int(* loop)(EventInstance this)
void(* loopbreak)(EventInstance this)
#define event_thread_pull(this, data)
EventSubscriber class instance definition; this is stored in any of the threads.
void event_tpool_thread_start(EventTPoolThread this)
start thread
void event_tpool_thread_update(EventTPoolThread this, EventSubscriber subscriber, void *arg)
update subscriber
#define EVENT_THREAD_STACKSIZE
void event_tpool_thread_add(EventTPoolThread this, EventSubscriber subscriber, void *arg)
add new subscriber
subscriber information define
event_thread_msg_info_t msgdata
int fd
file descriptor of this subscriber
static void event_tpool_thread_msg_cb_call(EventTPoolThread this, event_thread_msg_t *msg)
static void event_tpool_thread_msg_cb_update(EventTPoolThread this, event_thread_msg_t *msg)
for update
void * EventInstance
Event management instance obtained from event_if_new.
int eventfd
eventfd used to signal that a message has been added
static int event_thread_msg_send_without_lock(EventTPoolThread this, EventThreadMsg msg)
void * eventinfo
event related to subscriber
struct event_subscriber_data_t * EventSubscriberData
subscriber information define
struct event_thread_msg_t * EventThreadMsg
EventTPoolThread event_tpool_thread_new(size_t thread_size)
create and thread instance
struct event_thread_msg_info_t event_thread_msg_info_t
static void event_tpool_thread_free(EventTPoolThread this)
free thread instance, please call stop before calling it
static size_t event_thread_stack_size_g
thread stack size, user can change it by using event_tpool_set_stack_size
void(* del)(EventInstance this, EventHandler handler)
#define DEBUG_ERRPRINT(...)
static void event_tpool_thread_remove_event_base(EventTPoolThread this)
remove event_base
event_thread_msg_t * store_msgs
static int event_subscriber_data_get_fd(EventSubscriberData this)
get fd
static EventSubscriberData event_tpool_thread_get_subscriber(EventTPoolThread this, int fd)
get subscriber
static void * event_tpool_thread_main(void *arg)
main thread
int(* getfd)(EventHandler handler)
void event_thread_atfork_child(EventTPoolThread this)
static event_tpool_thread_msg_cb event_tpool_thread_msg_cb_table[]
callback table
static int event_tpool_thread_set_event_base(EventTPoolThread this)
set event_base
int event_tpool_thread_load_plugin(const char *plugin_path)
#define event_thread_pop(this)
enum event_thread_msg_t::@1 type