beanstalkd源码解析(2) 作者: nbboy 时间: 2020-10-23 分类: 软件架构,软件工程,设计模式,C 评论 ###开篇 在开篇先介绍下beanstalkd的一些基础结构,然后分析下第二个问题。 ####基础结构 ```c //最小堆结构(优先队列) struct Heap { int cap; int len; void **data; Less less; Record rec; }; //ADT int heapinsert(Heap *h, void *x);//向堆中插入一个新元素 void* heapremove(Heap *h, int k);//删除堆中的一个元素 ``` ```c //集合结构 struct ms { size_t used, cap, last; void **items; ms_event_fn oninsert, onremove; }; //ADT void ms_init(ms a, ms_event_fn oninsert, ms_event_fn onremove);//初始化 void ms_clear(ms a);//清空 int ms_append(ms a, void *item);//追加 int ms_remove(ms a, void *item);//移除 int ms_contains(ms a, void *item);//是否包含 void *ms_take(ms a);//获取一个 ``` #### 业务结构 ```c //任务对象,客户端可以调度的一个单元 struct job { //持久化字段,这些字段都被写入到WAL日志中 Jobrec r; // persistent fields; these get written to the wal /* bookeeping fields; these are in-memory only */ char pad[6]; //任务属于的管道对象 tube tube; //在链表中的上个和下个任务 job prev, next; /* linked list of jobs */ //在哈希表中的下个任务 job ht_next; /* Next job in a hash table list */ //任务在当前堆中的位置 size_t heap_index; /* where is this job in its current heap */ File *file; job fnext; job fprev; void *reserver; int walresv; int walused; char body[]; // written separately to the wal }; #define make_job(pri,delay,ttr,body_size,tube) make_job_with_id(pri,delay,ttr,body_size,tube,0) job allocate_job(int body_size);//分配一个job job make_job_with_id(uint pri, int64 delay, int64 ttr, int body_size, tube tube, uint64 id);//创建job void job_free(job j);//释放job /* Lookup a job by job ID */ job job_find(uint64 job_id);//查找 /* the void* parameters are really job pointers */ void job_setheappos(void*, int); int job_pri_less(void*, void*); int job_delay_less(void*, void*); job job_copy(job j);//拷贝job const char * job_state(job j);//获取job状态 int job_list_any_p(job head); job job_remove(job j);//移除job void job_insert(job head, job j);//插入job /* for unit tests */ size_t get_all_jobs_used(void); ``` ```c //管道对象,用来抽象同一种消息类型 struct tube { //被引用次数,用于监控管道被使用的情况 uint refs; //管道名称 char 
name[MAX_TUBE_NAME_LEN]; //预备执行队列 Heap ready; //延迟执行队列 Heap delay; //等待的连接集合 struct ms waiting; /* set of conns */ //统计结构,stats系列命令用到该结构 struct stats stat; //正被使用的次数 uint using_ct; //正被监听的次数 uint watching_ct; int64 pause; // int64 deadline_at; // struct job buried; }; tube make_tube(const char *name);//创建一个tube void tube_dref(tube t); void tube_iref(tube t); tube tube_find(const char *name);//查找 tube tube_find_or_make(const char *name); ``` ####服务有关结构 ```c /* 对服务器对象的封装,全局单例 */ struct Server { //端口 char *port; //地址 char *addr; //用户 char *user; //WAL日志对象 Wal wal; //服务器开启的套接字 Socket sock; //客户端连接池 Heap conns; }; void srvserve(Server *srv); void srvaccept(Server *s, int ev); ``` ```c //客户端链接对象 struct Conn { //服务器对象 Server *srv; //Client套接字 Socket sock; char state; char type; //下一个连接对象 Conn *next; //操作的管道 tube use; //运作时间 int64 tickat; // time at which to do more work //在链接池中的索引 int tickpos; // position in srv->conns //最近的任务 job soonest_job; // memoization of the soonest job int rw; // currently want: 'r', 'w', or 'h' //阻塞超时的时间 int pending_timeout; char halfclosed; //命令 char cmd[LINE_BUF_SIZE]; // this string is NOT NUL-terminated int cmd_len; int cmd_read; //回复 char *reply; int reply_len; int reply_sent; char reply_buf[LINE_BUF_SIZE]; // this string IS NUL-terminated // How many bytes of in_job->body have been read so far. If in_job is NULL // while in_job_read is nonzero, we are in bit bucket mode and // in_job_read's meaning is inverted -- then it counts the bytes that // remain to be thrown away. 
int in_job_read; job in_job; // a job to be read from the client job out_job; int out_job_sent; //监听的集合 struct ms watch; //正在被消费的任务列表 struct job reserved_jobs; // linked list header }; int connless(Conn *a, Conn *b); void connrec(Conn *c, int i); void connwant(Conn *c, int rw); void connsched(Conn *c); void connclose(Conn *c); void connsetproducer(Conn *c); void connsetworker(Conn *c); job connsoonestjob(Conn *c); int conndeadlinesoon(Conn *c); int conn_ready(Conn *c); ``` 需要说明的是,文件有关的结构和WAL有关的结构,就不在这里贴出来了,因为我也没整明白。 ###分析 看第二个问题.回顾下刚才代码 ```c //投递任务到队列 static int enqueue_job(Server *s, job j, int64 delay, char update_store) { int r; j->reserver = NULL; if (delay) { //任务开始执行时间 j->r.deadline_at = nanoseconds() + delay; r = heapinsert(&j->tube->delay, j); if (!r) return 0; //设置为被等待执行状态 j->r.state = Delayed; } else {//立即执行的任务就投递到预备队列 ``` 如果是延迟任务,就放入到tube的延迟堆中,且状态是delay 那么什么时候状态变为ready,然后放入ready堆中以供客户端去消费? 客户端投递一个任务后,下一个硬件断点看下值的变化 ```c (gdb) watch all_jobs_init[4].r. body_size bury_ct created_at deadline_at delay id kick_ct pri release_ct reserve_ct state timeout_ct ttr (gdb) watch all_jobs_init[4].r.state Hardware watchpoint 6: all_jobs_init[4].r.state (gdb) info breakpoints Num Type Disp Enb Address What 6 hw watchpoint keep y all_jobs_init[4].r.state ``` 然后客户端开始消费任务,时间到期后,即命中断点 ```c (gdb) c Continuing. 
Hardware watchpoint 6: all_jobs_init[4].r.state Old value = 4 '\004' New value = 1 '\001' enqueue_job (s=0x611520 , j=0x62a970, delay=0, update_store=0 '\000') at prot.c:479 479 ready_ct++; (gdb) bt #0 enqueue_job (s=0x611520 , j=0x62a970, delay=0, update_store=0 '\000') at prot.c:479 #1 0x0000000000409e90 in prottick (s=0x611520 ) at prot.c:1904 #2 0x000000000040b2ee in srvserve (s=0x611520 ) at serv.c:51 #3 0x000000000040d1d4 in main (argc=1, argv=0x7fffffffe488) at main.c:100 ``` 从堆栈中看出,是在prottick中进行调度。截取关键代码片段 ```c while ((j = delay_q_peek())) { d = j->r.deadline_at - now; if (d > 0) { period = min(period, d); break; } j = delay_q_take(); //从delay里取出最近需要执行的job,放入ready队列里 r = enqueue_job(s, j, 0, 0); if (r < 1) bury_job(s, j, 0); /* out of memory, so bury it */ } ``` 从上面代码可以看出把delay堆中的任务往ready堆中移动的过程,不过这段代码是怎样触发的? 往上翻一下就可以找到答案 ```c for (;;) { period = prottick(s); //sockXXX都是对平台的网络抽象 int rw = socknext(&sock, period); if (rw == -1) { twarnx("socknext"); exit(1); } //调用上面的接收链接回调函数 if (rw) { //accept开始接受网络请求 sock->f(sock->x, rw); } } int socknext(Socket **s, int64 timeout) { int r; struct epoll_event ev; r = epoll_wait(epfd, &ev, 1, (int)(timeout/1000000)); ... ``` 触发的条件有两个要么timeout到期,要么有新事件到来,不明白epoll事件机制的可以复习下epoll,那如何 计算这个超时时间呢? ```c while ((j = delay_q_peek())) { d = j->r.deadline_at - now; if (d > 0) { period = min(period, d); break; } j = delay_q_take(); //从delay里取出最近需要执行的job,放入ready队列里 r = enqueue_job(s, j, 0, 0); if (r < 1) bury_job(s, j, 0); /* out of memory, so bury it */ } for (i = 0; i < tubes.used; i++) { t = tubes.items[i]; d = t->deadline_at - now; if (t->pause && d <= 0) { t->pause = 0; process_queue(); } else if (d > 0) { period = min(period, d); } } while (s->conns.len) { Conn *c = s->conns.data[0]; d = c->tickat - now; if (d > 0) { period = min(period, d); break; } heapremove(&s->conns, 0); conn_timeout(c); } ``` 也就是根据job,tube,conn这几个对象计算一个最小时间戳 总结下第二个问题: 1.通过epoll_wait(其他平台也类似)来等待超时或者网络事件,而超时时候就做定时逻辑,包括移动delay堆中的job到ready堆中 这篇贴的代码有点多,下一篇再说第三个问题…
beanstalkd源码解析(1) 作者: nbboy 时间: 2020-10-23 分类: 软件架构,软件工程,设计模式,C 评论 ###简介 Beanstalkd是一个简单、高效的工作队列系统,其最初设计目的是通过后台异步执行耗时任务方式降低高容量Web应用的页面延时。而其简单、轻量、易用等特点,和对任务优先级、延时、超时重发等控制,以及众多语言版本的客户端的良好支持,使其可以很好的在各种需要队列系统的场景中应用。 其采用的是生产者、消费者模式,借鉴了memcached的设计,协议也很简单。先了解下其基础概念: #####job - 任务 job是一个需要异步处理的任务,是Beanstalkd中的基本单元,job需要放在一个tube中。Beanstalkd中的任务(job)类似于其它队列系统中的消息(message)的概念。 #####tube - 管道 管道即某一种类型的任务队列,其类似于消息的主题(topic),是Producer和Consumer的操作对象。一个Beanstalkd中可以有多个管道, 每个管道都有自己的发布者(Producer)和消费者Consumer,管道之间互相不影响。 #####producer - 生产者 任务(job)的生产者,通过put命令来将一个job放到一个tube中。 #####consumer - 消费者 任务(job)的消费者,通过reserve、release、bury、delete命令来获取或改变job的状态。 在网络上找了一幅图,描述的是其状态流转情况:  一个Beanstalkd任务可能会包含以下状态: **READY** - 需要立即处理的任务。当producer直接put一个任务时,任务就处于READY状态,以等待consumer来处理。当延时 (DELAYED) 任务到期后会自动成为当前READY状态的任务 **DELAYED** - 延迟执行的任务。当任务被延时put时,任务就处于DELAYED状态。等待时间过后,任务会被迁移到READY状态。当消费者处理任务后,可以将消息再次放回DELAYED队列延迟执行 **RESERVED** - 已经被消费者获取,正在执行的任务。当consumer获取了当前READY的任务后,该任务的状态就会迁移到RESERVED状态,这时其它的consumer就不能再操作该任务。Beanstalkd会检查任务是否在TTR(time-to-run)内完成 **BURIED** - 保留的任务,这时任务不会被执行,也不会消失。当consumer完成该任务后,可以选择delete、release或者bury操作。 delete后,任务会被删除,生命周期结束;release操作可以重新把任务状态迁移回READY状态或DELAYED状态,使其他consumer可以继续获取和执行该任务。bury会把任务休眠,等需要该任务时,再将休眠的任务kick回READY;也可以通过delete删除BURIED状态的任务 **DELETED** - 消息被删除,Beanstalkd不再维持这些消息。即任务生命周期结束。 ###分析 请原谅我啰里啰嗦地说了那么多,现在直接进入主题。我一般看源码都是带着问题去看,这样效率会高很多,这次也不例外,先提出几个问题吧。 - **beanstalkd如何维护job的状态?** - **delay状态的job怎么修改为ready?** - **如何实现优先级?** - **数据是如何持久化?** 对于问题1可以关注use,put,reserve,delete等命令的处理逻辑 ######use 是生产者命令 (lldb) bt * thread #1, queue = 'com.apple.main-thread', stop reason = step over * frame #0: 0x000000010000c011 beanstalkd`make_and_insert_tube(name="abc") at tube.c:70 frame #1: 0x000000010000bfd9 beanstalkd`tube_find_or_make(name="abc") at tube.c:96 frame #2: 0x00000001000087d0 beanstalkd`dispatch_cmd(c=0x0000000100400430) at prot.c:1554 frame #3: 0x0000000100006ca5 beanstalkd`do_cmd(c=0x0000000100400430) at prot.c:1686 frame #4: 0x00000001000067ca 
beanstalkd`conn_data(c=0x0000000100400430) at prot.c:1726 frame #5: 0x0000000100006623 beanstalkd`h_conn(fd=6, which=114, c=0x0000000100400430) at prot.c:1868 frame #6: 0x0000000100005968 beanstalkd`prothandle(c=0x0000000100400430, ev=114) at prot.c:1880 frame #7: 0x000000010000bb75 beanstalkd`srvserve(s=0x00000001000104b8) at serv.c:63 frame #8: 0x000000010000e19f beanstalkd`main(argc=1, argv=0x00007ffeefbff370) at main.c:100 frame #9: 0x00007fff6df26085 libdyld.dylib`start + 1 ######跟踪put命令 * * (lldb) bt * thread #1, queue = 'com.apple.main-thread', stop reason = breakpoint 6.1 * frame #0: 0x0000000100004f99 beanstalkd`enqueue_job(s=0x00000001000104b8, j=0x0000000100203b90, delay=0, update_store='\x01') at prot.c:466 frame #1: 0x00000001000070bf beanstalkd`enqueue_incoming_job(c=0x0000000100203850) at prot.c:867 frame #2: 0x0000000100006dfb beanstalkd`maybe_enqueue_incoming_job(c=0x0000000100203850) at prot.c:1160 frame #3: 0x000000010000757f beanstalkd`dispatch_cmd(c=0x0000000100203850) at prot.c:1279 frame #4: 0x0000000100006ca5 beanstalkd`do_cmd(c=0x0000000100203850) at prot.c:1686 frame #5: 0x00000001000067ca beanstalkd`conn_data(c=0x0000000100203850) at prot.c:1726 frame #6: 0x0000000100006623 beanstalkd`h_conn(fd=6, which=114, c=0x0000000100203850) at prot.c:1868 frame #7: 0x0000000100005968 beanstalkd`prothandle(c=0x0000000100203850, ev=114) at prot.c:1880 frame #8: 0x000000010000bb75 beanstalkd`srvserve(s=0x00000001000104b8) at serv.c:63 frame #9: 0x000000010000e19f beanstalkd`main(argc=1, argv=0x00007ffeefbff370) at main.c:100 frame #10: 0x00007fff6df26085 libdyld.dylib`start + 1 put命令主要是在ready堆中插入job ```c frame #0: 0x0000000100004f99 beanstalkd`enqueue_job(s=0x00000001000104b8, j=0x0000000100500370, delay=0, update_store='\x01') at prot.c:466 463 { 464 int r; 465 -> 466 j->reserver = NULL; 467 if (delay) { 468 //任务开始执行时间 469 j->r.deadline_at = nanoseconds() + delay; Target 0: (beanstalkd) stopped. 
(lldb) l 470 r = heapinsert(&j->tube->delay, j); 471 if (!r) return 0; 472 //设置为被等待执行状态 473 j->r.state = Delayed; 474 } else {//立即执行的任务就投递到预备队列 475 r = heapinsert(&j->tube->ready, j); 476 if (!r) return 0; (lldb) l 477 //设置状态为预备执行状态 478 j->r.state = Ready; 479 ready_ct++; 480 481 //对于紧急任务,进行额外的跟踪处理 482 if (j->r.pri < URGENT_THRESHOLD) { 483 global_stat.urgent_ct++; (lldb) l 484 j->tube->stat.urgent_ct++; 485 } 486 } ``` ######reserve命令 (lldb) bt * thread #1, queue = 'com.apple.main-thread', stop reason = step in * frame #0: 0x000000010000ac0c beanstalkd`enqueue_waiting_conn(c=0x0000000100500420) at prot.c:687 frame #1: 0x00000001000097f0 beanstalkd`wait_for_job(c=0x0000000100500420, timeout=3600) at prot.c:1024 frame #2: 0x00000001000079cd beanstalkd`dispatch_cmd(c=0x0000000100500420) at prot.c:1358 frame #3: 0x0000000100006ca5 beanstalkd`do_cmd(c=0x0000000100500420) at prot.c:1686 frame #4: 0x00000001000067ca beanstalkd`conn_data(c=0x0000000100500420) at prot.c:1726 frame #5: 0x0000000100006623 beanstalkd`h_conn(fd=6, which=114, c=0x0000000100500420) at prot.c:1868 frame #6: 0x0000000100005968 beanstalkd`prothandle(c=0x0000000100500420, ev=114) at prot.c:1880 frame #7: 0x000000010000bb75 beanstalkd`srvserve(s=0x00000001000104b8) at serv.c:63 frame #8: 0x000000010000e19f beanstalkd`main(argc=1, argv=0x00007ffeefbff370) at main.c:100 frame #9: 0x00007fff6df26085 libdyld.dylib`start + 1 ```c 1344 case OP_RESERVE: /* FALLTHROUGH */ 1345 /* don't allow trailing garbage */ -> 1346 if (type == OP_RESERVE && c->cmd_len != CMD_RESERVE_LEN + 2) { 1347 return reply_msg(c, MSG_BAD_FORMAT); 1348 } 1349 Target 0: (beanstalkd) stopped. 
(lldb) l 1350 op_ct[type]++; 1351 connsetworker(c); 1352 1353 if (conndeadlinesoon(c) && !conn_ready(c)) { 1354 return reply_msg(c, MSG_DEADLINE_SOON); 1355 } 1356 (lldb) l 1357 /* try to get a new job for this guy */ 1358 wait_for_job(c, timeout); 1359 process_queue(); 1360 break; ``` 把当前conn先加入到waiting集合中 ```c * thread #1, queue = 'com.apple.main-thread', stop reason = step over frame #0: 0x00000001000097e7 beanstalkd`wait_for_job(c=0x0000000100500420, timeout=3600) at prot.c:1024 1021 wait_for_job(Conn *c, int timeout) 1022 { 1023 c->state = STATE_WAIT; -> 1024 enqueue_waiting_conn(c); 1025 1026 /* Set the pending timeout to the requested timeout amount */ 1027 c->pending_timeout = timeout; Target 0: (beanstalkd) stopped. (lldb) s Process 40697 stopped * thread #1, queue = 'com.apple.main-thread', stop reason = step in frame #0: 0x000000010000ac0c beanstalkd`enqueue_waiting_conn(c=0x0000000100500420) at prot.c:687 684 tube t; 685 size_t i; 686 -> 687 global_stat.waiting_ct++; 688 c->type |= CONN_TYPE_WAITING; 689 for (i = 0; i < c->watch.used; i++) { 690 t = c->watch.items[i]; Target 0: (beanstalkd) stopped. (lldb) l 691 t->stat.waiting_ct++; //加入到tube的等待集合中 692 ms_append(&t->waiting, c); 693 } 694 } ``` 状态从ready到reserved ```c thread #1, queue = 'com.apple.main-thread', stop reason = step in frame #0: 0x0000000100006030 beanstalkd`reserve_job(c=0x0000000100500420, j=0x0000000100203b90) at prot.c:380 377 static void 378 reserve_job(Conn *c, job j) 379 { -> 380 j->r.deadline_at = nanoseconds() + j->r.ttr; 381 global_stat.reserved_ct++; /* stats */ 382 j->tube->stat.reserved_ct++; 383 j->r.reserve_ct++; Target 0: (beanstalkd) stopped. 
(lldb) l //改变状态为reserved 384 j->r.state = Reserved; //插入到conn的reserved_jobs堆中 385 job_insert(&c->reserved_jobs, j); 386 j->reserver = c; 387 c->pending_timeout = -1; 388 if (c->soonest_job && j->r.deadline_at < c->soonest_job->r.deadline_at) { 389 c->soonest_job = j; 390 } (lldb) l 391 return reply_job(c, j, MSG_RESERVED); 392 } ``` ######简单看下delete操作 ```c case OP_DELETE: -> 1363 errno = 0; 1364 id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10); 1365 if (errno) return reply_msg(c, MSG_BAD_FORMAT); 1366 op_ct[type]++; Target 0: (beanstalkd) stopped. (lldb) l 1367 //查找到job后,从几个堆中进行移除操作 1368 j = job_find(id); 1369 j = remove_reserved_job(c, j) ? : 1370 remove_ready_job(j) ? : 1371 remove_buried_job(j) ? : 1372 remove_delayed_job(j); 1373 (lldb) l 1374 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD); 1375 1376 j->tube->stat.total_delete_ct++; 1377 1378 j->r.state = Invalid; 1379 r = walwrite(&c->srv->wal, j); 1380 walmaint(&c->srv->wal); (lldb) l //最后进行释放操作 1381 job_free(j); ``` ######对第一个问题做下简单总结: 1.ready,delay堆都在tube对象进行维护,而reserved堆则在conn对象进行维护。 2.消费客户端连接都先放到对应watch的tube等待集合中(tube.waiting),在process_queue函数中进行统一消费,其用伪代码如下描述,这段是比较关键的部分 ```python while j = next_eligible_job()://从优先堆中取出job job_remove(tube.ready, j)//ready状态堆中移除job job_insert(conn.reserved, j)//把job插入客户端连接的reserved集合中 ``` 下一篇再说第二个问题...