design_pattern_for_c  V 1.00
event_thread.c
Go to the documentation of this file.
1 
5 #include <stdio.h>
6 #include <string.h>
7 #include <pthread.h>
8 #include <unistd.h>
9 #include <sys/types.h>
10 #include <sys/eventfd.h>
11 #include <errno.h>
12 #include <sys/stat.h>
13 #include <dlfcn.h>
14 #include "event_thread.h"
15 #include "tpool_event_if.h"
16 #include "dp_util.h"
17 #include "config.h"
18 
19 #define EVENT_THREAD_WAIT_TIMEOUT (2)/*sec*/
20 
21 #define EVENT_THREAD_STACKSIZE (256 * 1024)/*suitable stack size*/
22 
23 /*************
24  * public define
25 *************/
29 static struct {
30  void * handle;
31  EventInstance (*new)(void);
32  EventHandler (*add)(EventInstance this, EventSubscriber subscriber, void *arg);
33  EventHandler (*update)(EventInstance this, EventHandler handler, EventSubscriber subscriber, void *arg);
34  void (*del)(EventInstance this, EventHandler handler);
35  int (*getfd)(EventHandler handler);
36  int (*loop)(EventInstance this);
37  void (*loopbreak)(EventInstance this);
38  void (*exit)(EventInstance this);
39  void (*free)(EventInstance this);
41 
42 static void event_thread_set_func(void *handle, const char *func_name, void ** func);
43 static int event_tpool_thread_load_all_fun(void);
44 static const char * event_tpool_thread_get_defaullt_plugin(void);
45 
48 
52  void * arg;
54 
56  int fd;
58 
60 struct event_thread_msg_t;
62 
65  enum {
70  } type;
71  union {
75  } data;
76 };
77 
78 typedef struct event_thread_msg_info_t {
80  size_t store_msg_cnt;
82  pthread_mutex_t lock;
83  pthread_cond_t cond;
85 
86 //#define CHECK_STACKSIZE
87 #ifdef CHECK_STACKSIZE
88 #define MAGIC_NUMBER 'Z'
89 #endif
90 
97  void * eventinfo;
98 };
99 
100 #define event_thread_pop(this) (EventSubscriberData)dputil_list_pop((DPUtilList)(this))
101 #define event_thread_pull(this, data) dputil_list_pull((DPUtilList)(this), (DPUtilListData)(data))
102 #define event_thread_push(this, data) dputil_list_push((DPUtilList)(this), (DPUtilListData)(data))
103 
109  event_thread_msg_info_t msgdata;/*<! message queue*/
110  int eventfd;
111  EventInstance event_base;/*<! base event*/
112  EventHandler msg_evinfo;/*<! msg event info*/
113  pthread_t tid;/*<! thread id*/
114  int is_stop;/*<! stop flag*/
115 #ifdef CHECK_STACKSIZE
116  char * stack_adr;
117 #endif
118 };
122 #define EVMSG_LOCK(this) DPUTIL_LOCK(&this->msgdata.lock);
123 #define EVMSG_UNLOCK DPUTIL_UNLOCK
124 
127 static void event_thread_msg_send_subscribe(EventTPoolThread this, EventSubscriber subscriber, void *arg, int type);
128 static void event_thread_msg_send_add(EventTPoolThread this, EventSubscriber subscriber, void *arg);
129 static void event_thread_msg_send_update(EventTPoolThread this, EventSubscriber subscriber, void *arg);
130 static void event_thread_msg_send_del(EventTPoolThread this, int fd);
152 static void * event_tpool_thread_main(void *arg);
171 };
172 
174 
176 static void event_tpool_thread_call_msgs(EventTPoolThread this, eventfd_t cnt);
178 static void event_tpool_thread_cb(int, int, void *);
181 /*************
182  * for EventSubscriberData.
183 *************/
185  int ret = 0;
186 EVMSG_LOCK(this)
187  ret = eventfd_write(this->eventfd, 1);
188  if(ret < 0) {
189  DEBUG_PRINT("################Failed to send event\n");
190  } else {
191  /*wait receive message notification from event thread main*/
192  struct timespec timeout;
193  clock_gettime(CLOCK_REALTIME, &timeout);
194  timeout.tv_sec += EVENT_THREAD_WAIT_TIMEOUT;
195  ret = pthread_cond_timedwait(&this->msgdata.cond, &this->msgdata.lock, &timeout);
196  if(ret == ETIMEDOUT) {
197  DEBUG_ERRPRINT("#####################timeout!!!!!!\n");
198  ret = -1;
199  }
200  }
202  return ret;
203 }
204 
206  return eventfd_write(this->eventfd, 1);
207 }
208 
209 static void event_thread_msg_send_subscribe(EventTPoolThread this, EventSubscriber subscriber, void *arg, int type) {
210  int is_ownthread = ((this->tid==0) || (pthread_self() == this->tid));
211  EventThreadMsg msg;
212  if(!is_ownthread) {
213  msg = &this->msgdata.store_msgs[this->msgdata.store_msg_cnt++];
214  } else {
215  msg = &this->msgdata.msg;
216  }
217 
218  memset(msg, 0, sizeof(*msg));
219  msg->type = type;
220  memcpy(&msg->data.add.subscriber, subscriber, sizeof(*subscriber));
221  msg->data.add.arg = arg;
222  if(!is_ownthread) {
223  int ret = event_thread_msg_send(this, msg);
224  /*to care stop thread */
225  if(ret < 0) this->msgdata.store_msg_cnt--;
226  } else {
228  }
229 }
230 static void event_thread_msg_send_add(EventTPoolThread this, EventSubscriber subscriber, void *arg) {
231  DEBUG_PRINT("add, subscriber->%d!\n", subscriber->fd);
232 
233  event_thread_msg_send_subscribe(this, subscriber, arg, EVE_THREAD_MSG_TYPE_ADD);
234 }
235 
236 static void event_thread_msg_send_update(EventTPoolThread this, EventSubscriber subscriber, void *arg) {
237  DEBUG_PRINT("update, subscriber->%d!\n", subscriber->fd);
238  event_thread_msg_send_subscribe(this, subscriber, arg, EVE_THREAD_MSG_TYPE_UPDATE);
239 }
240 static void event_thread_msg_send_del(EventTPoolThread this, int fd) {
241  int is_ownthread = ((this->tid==0) || (pthread_self() == this->tid));
242  EventThreadMsg msg;
243  if(!is_ownthread) {
244  msg = &this->msgdata.store_msgs[this->msgdata.store_msg_cnt++];
245  } else {
246  msg = &this->msgdata.msg;
247  }
248 
249  memset(msg, 0, sizeof(*msg));
250  msg->type=EVE_THREAD_MSG_TYPE_DEL;
251  msg->data.del.fd = fd;
252  DEBUG_PRINT("del, subscriber->%d!\n", fd);
253 
254  if(!is_ownthread) {
255  int ret = event_thread_msg_send(this, msg);
256  /*to care stop thread */
257  if(ret < 0) this->msgdata.store_msg_cnt--;
258  } else {
260  }
261 }
262 
264  int is_ownthread = ((this->tid==0) || (pthread_self() == this->tid));
265  EventThreadMsg msg;
266  if(!is_ownthread) {
267  msg = &this->msgdata.store_msgs[this->msgdata.store_msg_cnt++];
268  } else {
269  msg = &this->msgdata.msg;
270  }
271 
272  memset(msg, 0, sizeof(*msg));
273  msg->type=EVE_THREAD_MSG_TYPE_STOP;
274  int ret = 0;
275  if (!is_ownthread) {
276  ret = event_thread_msg_send_without_lock(this, msg);
277  /*to care stop thread */
278  if(ret < 0) this->msgdata.store_msg_cnt--;
279  ret=1;
280  } else {
282  }
283  DEBUG_PRINT("send stop, return %d!\n", ret);
284  return ret;
285 }
286 
288  event_tpool_thread_msg_cb_table[msg->type](this, msg);
289 }
290 
294  DEBUG_PRINT("add subscriber->%d!\n", subscriber->fd);
295  EventSubscriberData instance = calloc(1, sizeof(*instance));
296  if(!instance) {
297  return NULL;
298  }
299 
300  instance->eventinfo = event_if_instance_g.add(this->event_base, subscriber, arg);
301  if(!instance->eventinfo) {
302  DEBUG_ERRPRINT("Failed to new event!\n" );
303  goto err;
304  }
305 
306  return instance;
307 
308 err:
309  event_subscriber_data_free(this, instance);
310  return NULL;
311 }
312 
315  if(data->eventinfo) {
316  event_if_instance_g.del(this->event_base, data->eventinfo);
317  }
318  free(data);
319 }
322  return event_if_instance_g.getfd(this->eventinfo);
323 }
324 /*************
325  * for EventTPoolThread private API
326 *************/
331  this->event_base = event_if_instance_g.new();
332  if(!this->event_base) {
333  DEBUG_ERRPRINT("Failed to new event_base!\n" );
334  goto err;
335  }
336 
337  /*add event*/
338  event_subscriber_t subscriber={this->eventfd, EV_TPOOL_READ, event_tpool_thread_cb};
339  DEBUG_PRINT("base fd=%d\n", this->eventfd);
340  this->msg_evinfo = event_if_instance_g.add(this->event_base, &subscriber, this);
341  if(!this->msg_evinfo) {
342  DEBUG_ERRPRINT("Failed to new event!\n" );
343  goto err;
344  }
345 
346  return 0;
347 err:
349  return -1;
350 }
351 
353  if(this->msg_evinfo) {
354  event_if_instance_g.del(this->event_base, this->msg_evinfo);
355  }
356  if(this->event_base) {
357  event_if_instance_g.free(this->event_base);
358  }
359 }
360 
364  while(data) {
365  event_subscriber_data_free(this, data);
366  data = event_thread_pop(this);
367  }
369  close(this->eventfd);
370  free(this);
371 }
372 
374  EventSubscriberData subscriber = this->head;
375  while(subscriber) {
376  if(event_subscriber_data_get_fd(subscriber) == fd) {
377  break;
378  }
379  subscriber=subscriber->next;
380  }
381  return subscriber;
382 }
383 
385 static void * event_tpool_thread_main(void *arg) {
387  /*add atfork handler*/
388  while(!this->is_stop) {
389  if(event_if_instance_g.loop(this->event_base) < 0) {
390  break;
391  }
392  }
393 
394  DEBUG_PRINT("exit main thread!\n" );
395  event_if_instance_g.exit(this->event_base);
396 
398  if(pthread_detach(pthread_self())) {
399  //already wait join, call exit
400  pthread_exit(NULL);
401  }
402  return NULL;
403 }
409  /*add event*/
411 
412  EventSubscriberData instance = event_subscriber_data_new(this, &body->subscriber, body->arg);
413  if(!instance) {
414  DEBUG_ERRPRINT("Failed to new subscriber!\n" );
415  return;
416  }
417 
418  event_thread_push(this, instance);
419 }
420 
423  /*add event*/
426  if(!subscriber) {
427  DEBUG_ERRPRINT("Failed to find subscriber!\n" );
428  return;
429  }
430 
431  /*update event*/
432  subscriber->eventinfo = event_if_instance_g.update(this->event_base, subscriber->eventinfo, &body->subscriber, body->arg);
433 }
434 
438  EventSubscriberData subscriber = event_tpool_thread_get_subscriber(this, body->fd);
439  if(!subscriber) {
440  return;
441  }
442  //pull data from list
443  event_thread_pull(this, subscriber);
444  //delete event
445  event_subscriber_data_free(this, subscriber);
446 }
447 
450  (void)msg;
451  this->is_stop=1;
452  event_if_instance_g.loopbreak(this->event_base);
453 }
454 
455 static void event_tpool_thread_call_msgs(EventTPoolThread this, eventfd_t cnt) {
456 EVMSG_LOCK(this)
457  /* call */
458  while(this->msgdata.store_msg_cnt && cnt) {
459  cnt--;
460  event_tpool_thread_msg_cb_call(this, &this->msgdata.store_msgs[--this->msgdata.store_msg_cnt]);
461  /*notify event message to called API thread*/
462  DEBUG_PRINT("cond signal from event thread to %p\n", &this->msgdata.cond );
463  pthread_cond_signal(&this->msgdata.cond);
464  }
466 }
467 
469 static void event_tpool_thread_cb(int fd, int flag, void * arg) {
471 
472  eventfd_t cnt=0;
473  int ret = eventfd_read(this->eventfd, &cnt);
474  if(ret < 0) {
475  DEBUG_ERRPRINT("Failed to read event!\n" );
476  }
477 
478  event_tpool_thread_call_msgs(this, cnt);
479 }
480 
481 static const char * event_tpool_thread_get_defaullt_plugin(void) {
482  static const char * pluginlist[] = {
483  EVENT_IF_PLUGIN_PATH"/libevent_if_libevent.so",
484  EVENT_IF_PLUGIN_PATH"/libevent_if_epoll.so",
485  EVENT_IF_PLUGIN_PATH"/libevent_if_select.so",
486  NULL,
487  };
488 
489  size_t i = 0;
490  mode_t fmode;
491  struct stat stat_buf;
492  for(i = 0; pluginlist[i]; i ++ ) {
493  //check is it library file?
494  if((stat(pluginlist[i], &stat_buf)==0) && ( S_ISREG((stat_buf.st_mode & S_IFMT)) || S_ISLNK(stat_buf.st_mode & S_IFMT))) {
495  break;
496  }
497  }
498  return pluginlist[i];
499 }
500 
502  event_if_instance_g.new = dlsym(event_if_instance_g.handle, "event_if_new");
503  if(!event_if_instance_g.new) return -1;
504 
505  event_if_instance_g.add = dlsym(event_if_instance_g.handle, "event_if_add");
506  if(!event_if_instance_g.add) return -1;
507 
508  event_if_instance_g.update = dlsym(event_if_instance_g.handle, "event_if_update");
509  if(!event_if_instance_g.update) return -1;
510 
511  event_if_instance_g.del = dlsym(event_if_instance_g.handle, "event_if_del");
512  if(!event_if_instance_g.del) return -1;
513 
514  event_if_instance_g.getfd = dlsym(event_if_instance_g.handle, "event_if_getfd");
515  if(!event_if_instance_g.getfd) return -1;
516 
517  event_if_instance_g.loop = dlsym(event_if_instance_g.handle, "event_if_loop");
518  if(!event_if_instance_g.loop) return -1;
519 
520  event_if_instance_g.loopbreak = dlsym(event_if_instance_g.handle, "event_if_loopbreak");
521  if(!event_if_instance_g.loopbreak) return -1;
522 
523  event_if_instance_g.exit = dlsym(event_if_instance_g.handle, "event_if_exit");
524  if(!event_if_instance_g.exit) return -1;
525 
526  event_if_instance_g.free = dlsym(event_if_instance_g.handle, "event_if_free");
527  if(!event_if_instance_g.free) return -1;
528 
529  return 0;
530 }
531 
533 /*************
534  * for public API
535 *************/
538  EventTPoolThread instance = calloc(1, sizeof(*instance) + sizeof(event_thread_msg_t)*thread_size);
539  if(!instance) {
540  DEBUG_ERRPRINT("Failed to get instance!\n" );
541  }
542 
543  instance->msgdata.store_msgs = (EventThreadMsg)(instance + 1);
544 
545  instance->eventfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC );
546  if(instance->eventfd == -1) {
547  DEBUG_ERRPRINT("Failed to create revc socket pair!\n");
548  goto err;
549  }
550 
551  pthread_mutex_init(&instance->msgdata.lock, NULL);
552  pthread_cond_init(&instance->msgdata.cond, NULL);
553 
554  if(event_tpool_thread_set_event_base(instance)) {
555  DEBUG_ERRPRINT("Failed to set base event!\n" );
556  goto err;
557  }
558 
559  return instance;
560 err:
561  if(instance->eventfd) {
562  close(instance->eventfd);
563  }
564  free(instance);
565  return NULL;
566 }
567 
570  this->tid=0;
571  pthread_attr_t attr;
572  pthread_attr_init(&attr);
573  pthread_attr_setstacksize(&attr, event_thread_stack_size_g);
574 
575 #ifdef CHECK_STACKSIZE
576  this->stack_adr = (char *) malloc(event_thread_stack_size_g);
577  memset(this->stack_adr, MAGIC_NUMBER, event_thread_stack_size_g);
578  pthread_attr_setstack(&attr, (void *) this->stack_adr, event_thread_stack_size_g);
579 #endif
580  pthread_create(&this->tid, &attr, event_tpool_thread_main, this);
581  pthread_attr_destroy(&attr);
582 }
583 
586  pthread_t tid = this->tid;
587  int ret = event_thread_msg_send_stop(this);
588  if(0<ret) {
589  pthread_join(tid, NULL);
590  }
591 #ifdef CHECK_STACKSIZE
592  int i=0;
593  for(i=0;i<event_thread_stack_size_g;i++) {
594  if (this->stack_adr[i] != MAGIC_NUMBER) break;
595  }
596  fprintf(stderr, "Used %d byte\n", event_thread_stack_size_g - i);
597 #endif
598 }
599 
601 void event_tpool_thread_add(EventTPoolThread this, EventSubscriber subscriber, void * arg) {
602  event_thread_msg_send_add(this, subscriber, arg);
603 }
604 void event_tpool_thread_update(EventTPoolThread this, EventSubscriber subscriber, void * arg) {
605  event_thread_msg_send_update(this, subscriber, arg);
606 }
607 
610  event_thread_msg_send_del(this, fd);
611 }
613  /*because there is no other thread, so always work like on pooled thread */
614  this->tid = 0;
615  pthread_mutex_init(&this->msgdata.lock, NULL);
616  pthread_cond_init(&this->msgdata.cond, NULL);
617 }
618 
619 void event_thread_set_stack_size(size_t stack_size) {
620  event_thread_stack_size_g = stack_size;
621 }
622 
623 int event_tpool_thread_load_plugin(const char *plugin_path) {
624  if(event_if_instance_g.handle) {
625  return 0;
626  }
627 
628  const char * path;
629  if(!plugin_path) {
631  } else {
632  path = plugin_path;
633  }
634 
635  if(!path) {
636  DEBUG_ERRPRINT("There is no install plugin, please check it!\n" );
637  return -1;
638  }
639 
640  event_if_instance_g.handle = dlopen(path, RTLD_NOW);
641  if(!event_if_instance_g.handle) {
642  DEBUG_ERRPRINT("Failed to open %s, err=%s!\n", plugin_path , dlerror() );
643  return -1;
644  }
645 
647 }
648 
650  if(event_if_instance_g.handle) {
651  dlclose(event_if_instance_g.handle);
652  }
653  memset(&event_if_instance_g, 0, sizeof(event_if_instance_g));
654 }
This is API of event thread.
static const char * event_tpool_thread_get_defaullt_plugin(void)
Definition: event_thread.c:481
#define EV_TPOOL_READ
static void event_tpool_thread_call_msgs(EventTPoolThread this, eventfd_t cnt)
main messages caller
Definition: event_thread.c:455
event_thread_msg_body_add_t add
when msg is update, body is subscribe
Definition: event_thread.c:72
static void event_subscriber_data_free(EventTPoolThread this, EventSubscriberData data)
free instance
Definition: event_thread.c:314
EventHandler(* add)(EventInstance this, EventSubscriber subscriber, void *arg)
Definition: event_thread.c:32
EventSubscriberData head
list of subscribers
Definition: event_thread.c:106
pthread_cond_t cond
Definition: event_thread.c:83
event_subscriber_t subscriber
Definition: event_thread.c:51
Utility headers
static void event_tpool_thread_msg_cb_add(EventTPoolThread this, event_thread_msg_t *msg)
for add
Definition: event_thread.c:408
int event_tpool_thread_unload_plugin(void)
Definition: event_thread.c:649
void(* exit)(EventInstance this)
Definition: event_thread.c:38
#define EVMSG_UNLOCK
Definition: event_thread.c:123
struct event_thread_msg_body_del_t event_thread_msg_body_del_t
static void event_tpool_thread_msg_cb_stop(EventTPoolThread this, event_thread_msg_t *msg)
for stop
Definition: event_thread.c:449
struct event_thread_msg_body_add_t event_thread_msg_body_add_t
message definition for manage subscriber
union event_thread_msg_t::@2 data
void event_tpool_thread_stop(EventTPoolThread this)
stop thread
Definition: event_thread.c:585
static void event_tpool_thread_msg_cb_del(EventTPoolThread this, event_thread_msg_t *msg)
for del
Definition: event_thread.c:436
EventSubscriberData next
Definition: event_thread.c:95
static EventSubscriberData event_subscriber_data_new(EventTPoolThread this, EventSubscriber subscriber, void *arg)
new instance
Definition: event_thread.c:293
static struct @0 event_if_instance_g
interface definition
void(* free)(EventInstance this)
Definition: event_thread.c:39
EventHandler msg_evinfo
Definition: event_thread.c:112
void event_thread_set_stack_size(size_t stack_size)
Definition: event_thread.c:619
EventSubscriberData tail
list of subscribers
Definition: event_thread.c:107
static void event_tpool_thread_cb(int, int, void *)
callback main
Definition: event_thread.c:469
static int event_tpool_thread_load_all_fun(void)
Definition: event_thread.c:501
static void event_thread_msg_send_del(EventTPoolThread this, int fd)
Definition: event_thread.c:240
EventInstance event_base
Definition: event_thread.c:111
static void event_thread_set_func(void *handle, const char *func_name, void **func)
struct event_tpool_thread_t * EventTPoolThread
Definition: event_thread.h:13
#define event_thread_push(this, data)
Definition: event_thread.c:102
pthread_mutex_t lock
Definition: event_thread.c:82
#define EVENT_THREAD_WAIT_TIMEOUT
Definition: event_thread.c:19
event_thread_msg_t msg
Definition: event_thread.c:79
static void event_thread_msg_send_update(EventTPoolThread this, EventSubscriber subscriber, void *arg)
Definition: event_thread.c:236
#define DEBUG_PRINT(...)
Definition: dp_debug.h:55
static int event_thread_msg_send(EventTPoolThread this, EventThreadMsg msg)
Definition: event_thread.c:184
void * handle
Definition: event_thread.c:30
static void event_thread_msg_send_add(EventTPoolThread this, EventSubscriber subscriber, void *arg)
Definition: event_thread.c:230
static void event_thread_msg_send_subscribe(EventTPoolThread this, EventSubscriber subscriber, void *arg, int type)
Definition: event_thread.c:209
void(* event_tpool_thread_msg_cb)(EventTPoolThread this, event_thread_msg_t *msg)
Definition: event_thread.c:156
void event_tpool_thread_del(EventTPoolThread this, int fd)
delete subscriber
Definition: event_thread.c:609
message definition for manage subscriber
Definition: event_thread.c:50
void * EventHandler
Event handler related to fd.
static int event_thread_msg_send_stop(EventTPoolThread this)
Definition: event_thread.c:263
EventHandler(* update)(EventInstance this, EventHandler handler, EventSubscriber subscriber, void *arg)
Definition: event_thread.c:33
event_thread_msg_body_del_t del
when msg is del, body is fd
Definition: event_thread.c:73
EventSubscriberData prev
Definition: event_thread.c:96
message struct definition
Definition: event_thread.c:64
int(* loop)(EventInstance this)
Definition: event_thread.c:36
void(* loopbreak)(EventInstance this)
Definition: event_thread.c:37
#define event_thread_pull(this, data)
Definition: event_thread.c:101
EventSubscriber class instance definition; instances are stored per thread.
void event_tpool_thread_start(EventTPoolThread this)
start thread
Definition: event_thread.c:569
void event_tpool_thread_update(EventTPoolThread this, EventSubscriber subscriber, void *arg)
update subscriber
Definition: event_thread.c:604
#define EVENT_THREAD_STACKSIZE
Definition: event_thread.c:21
void event_tpool_thread_add(EventTPoolThread this, EventSubscriber subscriber, void *arg)
add new subscriber
Definition: event_thread.c:601
subscriber information define
Definition: event_thread.c:94
event_thread_msg_info_t msgdata
Definition: event_thread.c:109
int fd
file descriptor of this subscriber
static void event_tpool_thread_msg_cb_call(EventTPoolThread this, event_thread_msg_t *msg)
Definition: event_thread.c:287
static void event_tpool_thread_msg_cb_update(EventTPoolThread this, event_thread_msg_t *msg)
for update
Definition: event_thread.c:422
#define EVMSG_LOCK(this)
Definition: event_thread.c:122
thread information
Definition: event_thread.c:105
void * EventInstance
Event management instance obtained from event_if_new.
int eventfd
eventfd used to signal the thread that a message was added
Definition: event_thread.c:110
static int event_thread_msg_send_without_lock(EventTPoolThread this, EventThreadMsg msg)
Definition: event_thread.c:205
void * eventinfo
event related to subscriber
Definition: event_thread.c:97
struct event_subscriber_data_t * EventSubscriberData
subscriber information define
Definition: event_thread.c:92
struct event_thread_msg_t * EventThreadMsg
Definition: event_thread.c:61
EventTPoolThread event_tpool_thread_new(size_t thread_size)
create a thread instance
Definition: event_thread.c:537
struct event_thread_msg_info_t event_thread_msg_info_t
static void event_tpool_thread_free(EventTPoolThread this)
free thread instance, please call stop before calling it
Definition: event_thread.c:362
static size_t event_thread_stack_size_g
thread stack size, user can change it by using event_tpool_set_stack_size
Definition: event_thread.c:47
void(* del)(EventInstance this, EventHandler handler)
Definition: event_thread.c:34
#define DEBUG_ERRPRINT(...)
Definition: dp_debug.h:69
static void event_tpool_thread_remove_event_base(EventTPoolThread this)
remove event_base
Definition: event_thread.c:352
event_thread_msg_t * store_msgs
Definition: event_thread.c:81
static int event_subscriber_data_get_fd(EventSubscriberData this)
get fd
Definition: event_thread.c:321
static EventSubscriberData event_tpool_thread_get_subscriber(EventTPoolThread this, int fd)
get subscriber
Definition: event_thread.c:373
static void * event_tpool_thread_main(void *arg)
main thread
Definition: event_thread.c:385
int(* getfd)(EventHandler handler)
Definition: event_thread.c:35
void event_thread_atfork_child(EventTPoolThread this)
Definition: event_thread.c:612
static event_tpool_thread_msg_cb event_tpool_thread_msg_cb_table[]
callback table
Definition: event_thread.c:166
static int event_tpool_thread_set_event_base(EventTPoolThread this)
set event_base
Definition: event_thread.c:330
int event_tpool_thread_load_plugin(const char *plugin_path)
Definition: event_thread.c:623
#define event_thread_pop(this)
Definition: event_thread.c:100
enum event_thread_msg_t::@1 type