beanstalkd源码解析(2) 作者: nbboy 时间: 2020-10-23 分类: 软件架构,软件工程,设计模式,C 评论 ###开篇 在开篇先介绍下beanstalkd的一些基础结构,然后分析下第二个问题。 ####基础结构 ```c //最小堆结构(优先队列) struct Heap { int cap; int len; void **data; Less less; Record rec; }; //ADT int heapinsert(Heap *h, void *x);//向堆中插入一个新元素 void* heapremove(Heap *h, int k);//删除堆中的一个元素 ``` ```c //集合结构 struct ms { size_t used, cap, last; void **items; ms_event_fn oninsert, onremove; }; //ADT void ms_init(ms a, ms_event_fn oninsert, ms_event_fn onremove);//初始化 void ms_clear(ms a);//清空 int ms_append(ms a, void *item);//追加 int ms_remove(ms a, void *item);//移除 int ms_contains(ms a, void *item);//是否包含 void *ms_take(ms a);//获取一个 ``` #### 业务结构 ```c //任务对象,客户端可以调度的一个单元 struct job { //持久化字段,这些字段都被写入到WAL日志中 Jobrec r; // persistent fields; these get written to the wal /* bookeeping fields; these are in-memory only */ char pad[6]; //任务属于的管道对象 tube tube; //在链表中的上个和下个任务 job prev, next; /* linked list of jobs */ //在哈希表中的下个任务 job ht_next; /* Next job in a hash table list */ //任务在当前堆中的位置 size_t heap_index; /* where is this job in its current heap */ File *file; job fnext; job fprev; void *reserver; int walresv; int walused; char body[]; // written separately to the wal }; #define make_job(pri,delay,ttr,body_size,tube) make_job_with_id(pri,delay,ttr,body_size,tube,0) job allocate_job(int body_size);//分配一个job job make_job_with_id(uint pri, int64 delay, int64 ttr, int body_size, tube tube, uint64 id);//创建job void job_free(job j);//释放job /* Lookup a job by job ID */ job job_find(uint64 job_id);//查找 /* the void* parameters are really job pointers */ void job_setheappos(void*, int); int job_pri_less(void*, void*); int job_delay_less(void*, void*); job job_copy(job j);//拷贝job const char * job_state(job j);//获取job状态 int job_list_any_p(job head); job job_remove(job j);//移除job void job_insert(job head, job j);//插入job /* for unit tests */ size_t get_all_jobs_used(void); ``` ```c //管道对象,用来抽象同一种消息类型 struct tube { //被引用次数,用于监控管道被使用的情况 uint refs; //管道名称 char 
name[MAX_TUBE_NAME_LEN]; //预备执行队列 Heap ready; //延迟执行队列 Heap delay; //等待的连接集合 struct ms waiting; /* set of conns */ //统计结构,stats系列命令用到该结构 struct stats stat; //正被使用的次数 uint using_ct; //正被监听的次数 uint watching_ct; int64 pause; // int64 deadline_at; // struct job buried; }; tube make_tube(const char *name);//创建一个tube void tube_dref(tube t); void tube_iref(tube t); tube tube_find(const char *name);//查找 tube tube_find_or_make(const char *name); ``` ####服务有关结构 ```c /* 对服务器对象的封装,全局单例 */ struct Server { //端口 char *port; //地址 char *addr; //用户 char *user; //WAL日志对象 Wal wal; //服务器开启的套接字 Socket sock; //客户端连接池 Heap conns; }; void srvserve(Server *srv); void srvaccept(Server *s, int ev); ``` ```c //客户端链接对象 struct Conn { //服务器对象 Server *srv; //Client套接字 Socket sock; char state; char type; //下一个连接对象 Conn *next; //操作的管道 tube use; //运作时间 int64 tickat; // time at which to do more work //在链接池中的索引 int tickpos; // position in srv->conns //最近的任务 job soonest_job; // memoization of the soonest job int rw; // currently want: 'r', 'w', or 'h' //阻塞超时的时间 int pending_timeout; char halfclosed; //命令 char cmd[LINE_BUF_SIZE]; // this string is NOT NUL-terminated int cmd_len; int cmd_read; //回复 char *reply; int reply_len; int reply_sent; char reply_buf[LINE_BUF_SIZE]; // this string IS NUL-terminated // How many bytes of in_job->body have been read so far. If in_job is NULL // while in_job_read is nonzero, we are in bit bucket mode and // in_job_read's meaning is inverted -- then it counts the bytes that // remain to be thrown away. 
int in_job_read; job in_job; // a job to be read from the client job out_job; int out_job_sent; //监听的集合 struct ms watch; //正在被消费的任务列表 struct job reserved_jobs; // linked list header }; int connless(Conn *a, Conn *b); void connrec(Conn *c, int i); void connwant(Conn *c, int rw); void connsched(Conn *c); void connclose(Conn *c); void connsetproducer(Conn *c); void connsetworker(Conn *c); job connsoonestjob(Conn *c); int conndeadlinesoon(Conn *c); int conn_ready(Conn *c); ``` 需要说明的是,文件有关的结构和WAL有关的结构,就不在这里贴出来了,因为我也没整明白。 ###分析 看第二个问题。回顾下刚才的代码 ```c //投递任务到队列 static int enqueue_job(Server *s, job j, int64 delay, char update_store) { int r; j->reserver = NULL; if (delay) { //任务开始执行时间 j->r.deadline_at = nanoseconds() + delay; r = heapinsert(&j->tube->delay, j); if (!r) return 0; //设置为被等待执行状态 j->r.state = Delayed; } else {//立即执行的任务就投递到预备队列 ``` 如果是延迟任务,就放入到tube的延迟堆中,且状态被设置为Delayed。那么什么时候状态变为ready,然后放入ready堆中以供客户端去消费? 客户端投递一个任务后,用gdb的watch命令对该任务的state字段下一个硬件观察点(watchpoint),看下值的变化 ```c (gdb) watch all_jobs_init[4].r. body_size bury_ct created_at deadline_at delay id kick_ct pri release_ct reserve_ct state timeout_ct ttr (gdb) watch all_jobs_init[4].r.state Hardware watchpoint 6: all_jobs_init[4].r.state (gdb) info breakpoints Num Type Disp Enb Address What 6 hw watchpoint keep y all_jobs_init[4].r.state ``` 然后客户端开始消费任务,时间到期后,即命中该观察点 ```c (gdb) c Continuing. 
Hardware watchpoint 6: all_jobs_init[4].r.state Old value = 4 '\004' New value = 1 '\001' enqueue_job (s=0x611520 , j=0x62a970, delay=0, update_store=0 '\000') at prot.c:479 479 ready_ct++; (gdb) bt #0 enqueue_job (s=0x611520 , j=0x62a970, delay=0, update_store=0 '\000') at prot.c:479 #1 0x0000000000409e90 in prottick (s=0x611520 ) at prot.c:1904 #2 0x000000000040b2ee in srvserve (s=0x611520 ) at serv.c:51 #3 0x000000000040d1d4 in main (argc=1, argv=0x7fffffffe488) at main.c:100 ``` 从堆栈中看出,是在prottick中进行调度。截取关键代码片段 ```c while ((j = delay_q_peek())) { d = j->r.deadline_at - now; if (d > 0) { period = min(period, d); break; } j = delay_q_take(); //从delay里取出最近需要执行的job,放入ready队列里 r = enqueue_job(s, j, 0, 0); if (r < 1) bury_job(s, j, 0); /* out of memory, so bury it */ } ``` 从上面代码可以看出把delay堆中的任务往ready堆中移动的过程,不过这段代码是怎样触发的? 往上翻一下就可以找到答案 ```c for (;;) { period = prottick(s); //sockXXX都是对平台的网络抽象 int rw = socknext(&sock, period); if (rw == -1) { twarnx("socknext"); exit(1); } //调用上面的接收链接回调函数 if (rw) { //accept开始接受网络请求 sock->f(sock->x, rw); } } int socknext(Socket **s, int64 timeout) { int r; struct epoll_event ev; r = epoll_wait(epfd, &ev, 1, (int)(timeout/1000000)); ... ``` 触发的条件有两个:要么timeout到期,要么有新事件到来。不明白epoll事件机制的可以复习下epoll。那如何计算这个超时时间呢? ```c while ((j = delay_q_peek())) { d = j->r.deadline_at - now; if (d > 0) { period = min(period, d); break; } j = delay_q_take(); //从delay里取出最近需要执行的job,放入ready队列里 r = enqueue_job(s, j, 0, 0); if (r < 1) bury_job(s, j, 0); /* out of memory, so bury it */ } for (i = 0; i < tubes.used; i++) { t = tubes.items[i]; d = t->deadline_at - now; if (t->pause && d <= 0) { t->pause = 0; process_queue(); } else if (d > 0) { period = min(period, d); } } while (s->conns.len) { Conn *c = s->conns.data[0]; d = c->tickat - now; if (d > 0) { period = min(period, d); break; } heapremove(&s->conns, 0); conn_timeout(c); } ``` 也就是根据job、tube、conn这几个对象各自距离到期还剩的时间,取其中最小的一个作为等待时长(period,单位是纳秒),它经socknext换算成毫秒后就是epoll_wait的超时时间。 总结下第二个问题: 1.通过epoll_wait(其他平台也类似)来等待超时或者网络事件;epoll_wait返回后,下一轮循环会再次调用prottick执行定时逻辑,包括把delay堆中已到期的job移动到ready堆中。 这篇贴的代码有点多,下一篇再说第三个问题…