beanstalkd源码解析(2) 作者: nbboy 时间: 2020-10-23 分类: 软件架构,软件工程,设计模式,C 评论 ###开篇 在开篇先介绍下beanstalkd的一些基础结构,然后分析下第二个问题。 ####基础结构 ```c //最小堆结构(优先队列) struct Heap { int cap; int len; void **data; Less less; Record rec; }; //ADT int heapinsert(Heap *h, void *x);//向堆中插入一个新元素 void* heapremove(Heap *h, int k);//删除堆中的一个元素 ``` ```c //集合结构 struct ms { size_t used, cap, last; void **items; ms_event_fn oninsert, onremove; }; //ADT void ms_init(ms a, ms_event_fn oninsert, ms_event_fn onremove);//初始化 void ms_clear(ms a);//清空 int ms_append(ms a, void *item);//追加 int ms_remove(ms a, void *item);//移除 int ms_contains(ms a, void *item);//是否包含 void *ms_take(ms a);//获取一个 ``` #### 业务结构 ```c //任务对象,客户端可以调度的一个单元 struct job { //持久化字段,这些字段都被写入到WAL日志中 Jobrec r; // persistent fields; these get written to the wal /* bookeeping fields; these are in-memory only */ char pad[6]; //任务属于的管道对象 tube tube; //在链表中的上个和下个任务 job prev, next; /* linked list of jobs */ //在哈希表中的下个任务 job ht_next; /* Next job in a hash table list */ //任务在当前堆中的位置 size_t heap_index; /* where is this job in its current heap */ File *file; job fnext; job fprev; void *reserver; int walresv; int walused; char body[]; // written separately to the wal }; #define make_job(pri,delay,ttr,body_size,tube) make_job_with_id(pri,delay,ttr,body_size,tube,0) job allocate_job(int body_size);//分配一个job job make_job_with_id(uint pri, int64 delay, int64 ttr, int body_size, tube tube, uint64 id);//创建job void job_free(job j);//释放job /* Lookup a job by job ID */ job job_find(uint64 job_id);//查找 /* the void* parameters are really job pointers */ void job_setheappos(void*, int); int job_pri_less(void*, void*); int job_delay_less(void*, void*); job job_copy(job j);//拷贝job const char * job_state(job j);//获取job状态 int job_list_any_p(job head); job job_remove(job j);//移除job void job_insert(job head, job j);//插入job /* for unit tests */ size_t get_all_jobs_used(void); ``` ```c //管道对象,用来抽象同一种消息类型 struct tube { //被引用次数,用于监控管道被使用的情况 uint refs; //管道名称 char name[MAX_TUBE_NAME_LEN]; //预备执行队列 Heap ready; //延迟执行队列 Heap delay; //等待的连接集合 struct ms waiting; /* set of conns */ //统计结构,stats系列命令用到该结构 struct stats stat; //正被使用的次数 uint using_ct; //正被监听的次数 uint watching_ct; int64 pause; // int64 deadline_at; // struct job buried; }; tube make_tube(const char *name);//创建一个tube void tube_dref(tube t); void tube_iref(tube t); tube tube_find(const char *name);//查找 tube tube_find_or_make(const char *name); ``` ####服务有关结构 ```c /* 对服务器对象的封装,全局单例 */ struct Server { //端口 char *port; //地址 char *addr; //用户 char *user; //WAL日志对象 Wal wal; //服务器开启的套接字 Socket sock; //客户端连接池 Heap conns; }; void srvserve(Server *srv); void srvaccept(Server *s, int ev); ``` ```c //客户端链接对象 struct Conn { //服务器对象 Server *srv; //Client套接字 Socket sock; char state; char type; //下一个连接对象 Conn *next; //操作的管道 tube use; //运作时间 int64 tickat; // time at which to do more work //在链接池中的索引 int tickpos; // position in srv->conns //最近的任务 job soonest_job; // memoization of the soonest job int rw; // currently want: 'r', 'w', or 'h' //阻塞超时的时间 int pending_timeout; char halfclosed; //命令 char cmd[LINE_BUF_SIZE]; // this string is NOT NUL-terminated int cmd_len; int cmd_read; //回复 char *reply; int reply_len; int reply_sent; char reply_buf[LINE_BUF_SIZE]; // this string IS NUL-terminated // How many bytes of in_job->body have been read so far. If in_job is NULL // while in_job_read is nonzero, we are in bit bucket mode and // in_job_read's meaning is inverted -- then it counts the bytes that // remain to be thrown away. int in_job_read; job in_job; // a job to be read from the client job out_job; int out_job_sent; //监听的集合 struct ms watch; //正在被消费的任务列表 struct job reserved_jobs; // linked list header }; int connless(Conn *a, Conn *b); void connrec(Conn *c, int i); void connwant(Conn *c, int rw); void connsched(Conn *c); void connclose(Conn *c); void connsetproducer(Conn *c); void connsetworker(Conn *c); job connsoonestjob(Conn *c); int conndeadlinesoon(Conn *c); int conn_ready(Conn *c); ``` 需要说明的是,文件有关的结构和WAL有关的结构,就不在这里贴出来了,因为我也没整明白。 ###分析 看第二个问题.回顾下刚才代码 ```c //投递任务到队列 static int enqueue_job(Server *s, job j, int64 delay, char update_store) { int r; j->reserver = NULL; if (delay) { //任务开始执行时间 j->r.deadline_at = nanoseconds() + delay; r = heapinsert(&j->tube->delay, j); if (!r) return 0; //设置为被等待执行状态 j->r.state = Delayed; } else {//立即执行的任务就投递到预备队列 ``` 如果是延迟任务,就放入到tube的延迟堆中,且状态是delay 那么什么时候状态变为ready,然后放入ready堆中以供客户端去消费? 客户端投递一个任务后,下一个硬件断点看下值的变化 ```c (gdb) watch all_jobs_init[4].r. body_size bury_ct created_at deadline_at delay id kick_ct pri release_ct reserve_ct state timeout_ct ttr (gdb) watch all_jobs_init[4].r.state Hardware watchpoint 6: all_jobs_init[4].r.state (gdb) info breakpoints Num Type Disp Enb Address What 6 hw watchpoint keep y all_jobs_init[4].r.state ``` 然后客户端开始消费任务,时间到期后,即命中断点 ```c (gdb) c Continuing. Hardware watchpoint 6: all_jobs_init[4].r.state Old value = 4 '\004' New value = 1 '\001' enqueue_job (s=0x611520 , j=0x62a970, delay=0, update_store=0 '\000') at prot.c:479 479 ready_ct++; (gdb) bt #0 enqueue_job (s=0x611520 , j=0x62a970, delay=0, update_store=0 '\000') at prot.c:479 #1 0x0000000000409e90 in prottick (s=0x611520 ) at prot.c:1904 #2 0x000000000040b2ee in srvserve (s=0x611520 ) at serv.c:51 #3 0x000000000040d1d4 in main (argc=1, argv=0x7fffffffe488) at main.c:100 ``` 从堆栈中看出,是在prottick中进行调度。截取关键代码片段 ```c while ((j = delay_q_peek())) { d = j->r.deadline_at - now; if (d > 0) { period = min(period, d); break; } j = delay_q_take(); //从delay里取出最近需要执行的job,放入ready队列里 r = enqueue_job(s, j, 0, 0); if (r < 1) bury_job(s, j, 0); /* out of memory, so bury it */ } ``` 从上面代码可以看出把delay堆中的任务往ready堆中移动的过程,不过这段代码是怎样触发的? 往上翻一下就可以找到答案 ```c for (;;) { period = prottick(s); //sockXXX都是对平台的网络抽象 int rw = socknext(&sock, period); if (rw == -1) { twarnx("socknext"); exit(1); } //调用上面的接收链接回调函数 if (rw) { //accept开始接受网络请求 sock->f(sock->x, rw); } } int socknext(Socket **s, int64 timeout) { int r; struct epoll_event ev; r = epoll_wait(epfd, &ev, 1, (int)(timeout/1000000)); ... ``` 触发的条件有两个要么timeout到期,要么有新事件到来,不明白epoll事件机制的可以复习下epoll,那如何 计算这个超时时间呢? ```c while ((j = delay_q_peek())) { d = j->r.deadline_at - now; if (d > 0) { period = min(period, d); break; } j = delay_q_take(); //从delay里取出最近需要执行的job,放入ready队列里 r = enqueue_job(s, j, 0, 0); if (r < 1) bury_job(s, j, 0); /* out of memory, so bury it */ } for (i = 0; i < tubes.used; i++) { t = tubes.items[i]; d = t->deadline_at - now; if (t->pause && d <= 0) { t->pause = 0; process_queue(); } else if (d > 0) { period = min(period, d); } } while (s->conns.len) { Conn *c = s->conns.data[0]; d = c->tickat - now; if (d > 0) { period = min(period, d); break; } heapremove(&s->conns, 0); conn_timeout(c); } ``` 也就是根据job,tube,conn这几个对象计算一个最小时间戳 总结下第二个问题: 1.通过epoll_wait(其他平台也类似)来等待超时或者网络事件,而超时时候就做定时逻辑,包括移动delay堆中的job到ready堆中 这篇贴的代码有点多,下一篇再说第三个问题…
beanstalkd源码解析(1) 作者: nbboy 时间: 2020-10-23 分类: 软件架构,软件工程,设计模式,C 评论 ###简介 Beanstalkd是一个简单、高效的工作队列系统,其最初设计目的是通过后台异步执行耗时任务方式降低高容量Web应用的页面延时。而其简单、轻量、易用等特点,和对任务优先级、延时 超时重发等控制,以及众多语言版本的客户端的良好支持,使其可以很好的在各种需要队列系统的场景中应用。 其采用的是生产者、消费者模式,借鉴了memcached的设计,协议也很简单。先了解下其基础概念: #####job - 任务 job是一个需要异步处理的任务,是Beanstalkd中的基本单元,job需要放在一个tube中。Beanstalkd中的任务(job)类似于其它队列系统中的消息(message)的概念。 #####tube - 管道 管道即某一种类型的任务队列,其类似与消息的主题(topic),是Producer和Consumer的操作对象。一个Beanstalkd中可以有多个管道, 每个管道都有自己的发布者(Producer)和消费者Consumer,管道之间互相不影响。 #####producer - 生产者 任务(job)的生产者,通过put命令来将一个job放到一个tube中。 #####consumer - 消费者 任务(job)的消费者,通过reserve、release、bury、delete命令来获取或改变job的状态。 在网络上找了一副图,描述的是其状态流转情况:  一个Beanstalkd任务可能会包含以下状态: **READY** - 需要立即处理的任务。当producer直接put一个任务时,任务就处于READY状态,以等待consumer来处理。当延时 (DELAYED) 任务到期后会自动成为当前READY状态的任务 **DELAYED** - 延迟执行的任务。当任务被延时put时,任务就处于DELAYED状态。等待时间过后,任务会被迁移到READY状态。当消费者处理任务后,可以用将消息再次放回DELAYED队列延迟执行 **RESERVED** - 已经被消费者获取,正在执行的任务。当consumer获取了当前READY的任务后,该任务的状态就会迁移到RESERVED状态,这时其它的consumer就不能再操作该任务。Beanstalkd会检查任务是否在TTR(time-to-run)内完成 **BURIED** - 保留的任务,这时任务不会被执行,也不会消失。当consumer完成该任务后,可以选择delete、release或者bury操作。 delete后,任务会被删除,生命周期结束;release操作可以重新把任务状态迁移回READY状态或DELAYED状态,使其他consumer可以继续获取和执行该任务。bury会拔任务休眠,等需要该任务时,再将休眠的任务kick回READY;也可能过delete删除BURIED状态的任务 **DELETED** - 消息被删除,Beanstalkd不再维持这些消息。即任务生命周期结束。 ###分析 请原谅我罗里吧嗦得说了那么多,现在直接进入主题。我一般看源码都是带着问题去看,这样效率会高很多,这次也不例外,先提出几个问题吧。 - **beanstalkd如何维护job的状态?** - **delay状态的job怎么修改为ready?** - **如何实现优先级?** - **数据是如何持久化?** 对于问题1可以关注use,put,reserve,delete等命令的处理逻辑 ######use 是生产者命令 (lldb) bt * thread #1, queue = 'com.apple.main-thread', stop reason = step over * frame #0: 0x000000010000c011 beanstalkd`make_and_insert_tube(name="abc") at tube.c:70 frame #1: 0x000000010000bfd9 beanstalkd`tube_find_or_make(name="abc") at tube.c:96 frame #2: 0x00000001000087d0 beanstalkd`dispatch_cmd(c=0x0000000100400430) at prot.c:1554 frame #3: 0x0000000100006ca5 beanstalkd`do_cmd(c=0x0000000100400430) at prot.c:1686 frame #4: 0x00000001000067ca beanstalkd`conn_data(c=0x0000000100400430) at prot.c:1726 frame #5: 0x0000000100006623 beanstalkd`h_conn(fd=6, which=114, c=0x0000000100400430) at prot.c:1868 frame #6: 0x0000000100005968 beanstalkd`prothandle(c=0x0000000100400430, ev=114) at prot.c:1880 frame #7: 0x000000010000bb75 beanstalkd`srvserve(s=0x00000001000104b8) at serv.c:63 frame #8: 0x000000010000e19f beanstalkd`main(argc=1, argv=0x00007ffeefbff370) at main.c:100 frame #9: 0x00007fff6df26085 libdyld.dylib`start + 1 ######跟踪put命令 * * (lldb) bt * thread #1, queue = 'com.apple.main-thread', stop reason = breakpoint 6.1 * frame #0: 0x0000000100004f99 beanstalkd`enqueue_job(s=0x00000001000104b8, j=0x0000000100203b90, delay=0, update_store='\x01') at prot.c:466 frame #1: 0x00000001000070bf beanstalkd`enqueue_incoming_job(c=0x0000000100203850) at prot.c:867 frame #2: 0x0000000100006dfb beanstalkd`maybe_enqueue_incoming_job(c=0x0000000100203850) at prot.c:1160 frame #3: 0x000000010000757f beanstalkd`dispatch_cmd(c=0x0000000100203850) at prot.c:1279 frame #4: 0x0000000100006ca5 beanstalkd`do_cmd(c=0x0000000100203850) at prot.c:1686 frame #5: 0x00000001000067ca beanstalkd`conn_data(c=0x0000000100203850) at prot.c:1726 frame #6: 0x0000000100006623 beanstalkd`h_conn(fd=6, which=114, c=0x0000000100203850) at prot.c:1868 frame #7: 0x0000000100005968 beanstalkd`prothandle(c=0x0000000100203850, ev=114) at prot.c:1880 frame #8: 0x000000010000bb75 beanstalkd`srvserve(s=0x00000001000104b8) at serv.c:63 frame #9: 0x000000010000e19f beanstalkd`main(argc=1, argv=0x00007ffeefbff370) at main.c:100 frame #10: 0x00007fff6df26085 libdyld.dylib`start + 1 put命令主要是在ready堆中插入job ```c frame #0: 0x0000000100004f99 beanstalkd`enqueue_job(s=0x00000001000104b8, j=0x0000000100500370, delay=0, update_store='\x01') at prot.c:466 463 { 464 int r; 465 -> 466 j->reserver = NULL; 467 if (delay) { 468 //任务开始执行时间 469 j->r.deadline_at = nanoseconds() + delay; Target 0: (beanstalkd) stopped. (lldb) l 470 r = heapinsert(&j->tube->delay, j); 471 if (!r) return 0; 472 //设置为被等待执行状态 473 j->r.state = Delayed; 474 } else {//立即执行的任务就投递到预备队列 475 r = heapinsert(&j->tube->ready, j); 476 if (!r) return 0; (lldb) l 477 //设置状态为预备执行状态 478 j->r.state = Ready; 479 ready_ct++; 480 481 //对于紧急任务,进行额外的跟踪处理 482 if (j->r.pri < URGENT_THRESHOLD) { 483 global_stat.urgent_ct++; (lldb) l 484 j->tube->stat.urgent_ct++; 485 } 486 } ``` ######reserve命令 (lldb) bt * thread #1, queue = 'com.apple.main-thread', stop reason = step in * frame #0: 0x000000010000ac0c beanstalkd`enqueue_waiting_conn(c=0x0000000100500420) at prot.c:687 frame #1: 0x00000001000097f0 beanstalkd`wait_for_job(c=0x0000000100500420, timeout=3600) at prot.c:1024 frame #2: 0x00000001000079cd beanstalkd`dispatch_cmd(c=0x0000000100500420) at prot.c:1358 frame #3: 0x0000000100006ca5 beanstalkd`do_cmd(c=0x0000000100500420) at prot.c:1686 frame #4: 0x00000001000067ca beanstalkd`conn_data(c=0x0000000100500420) at prot.c:1726 frame #5: 0x0000000100006623 beanstalkd`h_conn(fd=6, which=114, c=0x0000000100500420) at prot.c:1868 frame #6: 0x0000000100005968 beanstalkd`prothandle(c=0x0000000100500420, ev=114) at prot.c:1880 frame #7: 0x000000010000bb75 beanstalkd`srvserve(s=0x00000001000104b8) at serv.c:63 frame #8: 0x000000010000e19f beanstalkd`main(argc=1, argv=0x00007ffeefbff370) at main.c:100 frame #9: 0x00007fff6df26085 libdyld.dylib`start + 1 ```c 1344 case OP_RESERVE: /* FALLTHROUGH */ 1345 /* don't allow trailing garbage */ -> 1346 if (type == OP_RESERVE && c->cmd_len != CMD_RESERVE_LEN + 2) { 1347 return reply_msg(c, MSG_BAD_FORMAT); 1348 } 1349 Target 0: (beanstalkd) stopped. (lldb) l 1350 op_ct[type]++; 1351 connsetworker(c); 1352 1353 if (conndeadlinesoon(c) && !conn_ready(c)) { 1354 return reply_msg(c, MSG_DEADLINE_SOON); 1355 } 1356 (lldb) l 1357 /* try to get a new job for this guy */ 1358 wait_for_job(c, timeout); 1359 process_queue(); 1360 break; ``` 把当前conn先加入到waitting集合中 ```c * thread #1, queue = 'com.apple.main-thread', stop reason = step over frame #0: 0x00000001000097e7 beanstalkd`wait_for_job(c=0x0000000100500420, timeout=3600) at prot.c:1024 1021 wait_for_job(Conn *c, int timeout) 1022 { 1023 c->state = STATE_WAIT; -> 1024 enqueue_waiting_conn(c); 1025 1026 /* Set the pending timeout to the requested timeout amount */ 1027 c->pending_timeout = timeout; Target 0: (beanstalkd) stopped. (lldb) s Process 40697 stopped * thread #1, queue = 'com.apple.main-thread', stop reason = step in frame #0: 0x000000010000ac0c beanstalkd`enqueue_waiting_conn(c=0x0000000100500420) at prot.c:687 684 tube t; 685 size_t i; 686 -> 687 global_stat.waiting_ct++; 688 c->type |= CONN_TYPE_WAITING; 689 for (i = 0; i < c->watch.used; i++) { 690 t = c->watch.items[i]; Target 0: (beanstalkd) stopped. (lldb) l 691 t->stat.waiting_ct++; //加入到tube的等待集合中 692 ms_append(&t->waiting, c); 693 } 694 } ``` 状态从ready到reserve ```c thread #1, queue = 'com.apple.main-thread', stop reason = step in frame #0: 0x0000000100006030 beanstalkd`reserve_job(c=0x0000000100500420, j=0x0000000100203b90) at prot.c:380 377 static void 378 reserve_job(Conn *c, job j) 379 { -> 380 j->r.deadline_at = nanoseconds() + j->r.ttr; 381 global_stat.reserved_ct++; /* stats */ 382 j->tube->stat.reserved_ct++; 383 j->r.reserve_ct++; Target 0: (beanstalkd) stopped. (lldb) l //改变状态为reserved 384 j->r.state = Reserved; //插入到conn的reserved_jobs堆中 385 job_insert(&c->reserved_jobs, j); 386 j->reserver = c; 387 c->pending_timeout = -1; 388 if (c->soonest_job && j->r.deadline_at < c->soonest_job->r.deadline_at) { 389 c->soonest_job = j; 390 } (lldb) l 391 return reply_job(c, j, MSG_RESERVED); 392 } ``` ######简单看下delete操作 ```c case OP_DELETE: -> 1363 errno = 0; 1364 id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10); 1365 if (errno) return reply_msg(c, MSG_BAD_FORMAT); 1366 op_ct[type]++; Target 0: (beanstalkd) stopped. (lldb) l 1367 //查找到job后,从几个堆中进行移除操作 1368 j = job_find(id); 1369 j = remove_reserved_job(c, j) ? : 1370 remove_ready_job(j) ? : 1371 remove_buried_job(j) ? : 1372 remove_delayed_job(j); 1373 (lldb) l 1374 if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD); 1375 1376 j->tube->stat.total_delete_ct++; 1377 1378 j->r.state = Invalid; 1379 r = walwrite(&c->srv->wal, j); 1380 walmaint(&c->srv->wal); (lldb) l //最后进行释放操作 1381 job_free(j); ``` ######对第一个问题做下简单总结: 1.ready,delay堆都在tube对象进行维护,而reserved堆则在conn对象进行维护。 2.消费客户端连接都先放到对应watch的tube等待集合中(tube.waiting),在process_queue函数中进行统一消费,其用伪代码如下描述,这段是比较关键的部分 ```python while j = next_eligible_job()://从优先堆中取出job job_remove(tube.ready, j)//ready状态堆中移除job job_insert(conn.reserved, j)//把job插入客户端连接的reserved集合中 ``` 下一篇再说第二个问题...
redis 多线程初体验 作者: nbboy 时间: 2020-08-25 分类: 软件架构,软件工程,设计模式 评论 ###测试环境 mbp本机测试,测试机子配置: 2.9 GHz Intel Core i5 8 GB 1867 MHz DDR3 ###比较版本 redis都开启rdb和aof持久化,比较的版本redis-5.0.7和redis-6.0.6(开启多线程支持),使用测试工具就是作者提供的redis-benchmark,测试命令如下: ```shell redis-benchmark -t set,get -n 1000000 -r 100000000 -h 192.168.1.197 -d {dataSize} -c 200 --threads 4 ``` ###图表 用图表导出后得到: Redis Set命令压测结果 Redis Get命令压测结果 ###结论 我们从上面图表中得出结论,在开启多线程模式下,性能提高确实不少,特别是在包越大的情况下,效果更加明显。其实这是和作者的实现方式有关,在redis中,真正执行命令还是在主线程中,而是把网络数据收发和命令解析单独在i/o线程中去完成而已,作者说他不想让实现变得复杂,而效果也没那么明显。具体可以看下他的文章:http://antirez.com/news/126
redis rdb机制浅析 作者: nbboy 时间: 2020-08-24 分类: 软件架构,软件工程,设计模式 评论 ###dump rdb文件的流程: 1.redis fork()调用,创建子进程 2.利用copy-on-write技术,对内存dump出来到一个临时文件 3.完成dump文件后,替换该临时文件为dump.rdb文件 ###需要注意的几个问题: 1.dump的是执行这条命令时候的数据 2.从上面步骤可以看到,任何时候dump.rdb文件其实都是完整的 3.可以执行save,bgsave来手动执行dump操作,或者配置save规则让redis自动执行,其实也是执行bgsave 4.redis重启后,读取dump.rdb文件开始从磁盘到内存的数据加载过程,这个过程是阻塞的,直到完成加载 5.rdb方式会丢失数据,这部分数据就是从上一次dump到redis挂掉为止修改的数据 通过调试跟踪,bgsave命令最终的实现的文件在rdb.c里的bgsaveCommand ```c /* BGSAVE [SCHEDULE] bgsave执行的命令 */ void bgsaveCommand(client *c) { int schedule = 0; /* The SCHEDULE option changes the behavior of BGSAVE when an AOF rewrite * is in progress. Instead of returning an error a BGSAVE gets scheduled. */ if (c->argc > 1) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"schedule")) { schedule = 1; } else { addReply(c,shared.syntaxerr); return; } } rdbSaveInfo rsi, *rsiptr; //初始化rdb集群相关信息 rsiptr = rdbPopulateSaveInfo(&rsi); //如果bgsave进程还在执行中,则返回错误 if (server.rdb_child_pid != -1) { addReplyError(c,"Background save already in progress"); //这里也一样,如果系统还在执行gsave进程,或者执行aof重写进程,或者执行模块进程则选择不执行 } else if (hasActiveChildProcess()) { if (schedule) { server.rdb_bgsave_scheduled = 1; addReplyStatus(c,"Background saving scheduled"); } else { addReplyError(c, "Another child process is active (AOF?): can't BGSAVE right now. " "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever " "possible."); } //执行真正的bgsave } else if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) { addReplyStatus(c,"Background saving started"); } else { addReply(c,shared.err); } } ``` 通过上面的代码,我们可以看到,aof重写进程在工作的时候,bgsave不会真正工作!在rdbSaveBackground里看下做了什么事情? ```c int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { pid_t childpid; //再一次检测工作进程执行情况 if (hasActiveChildProcess()) return C_ERR; server.dirty_before_bgsave = server.dirty; server.lastbgsave_try = time(NULL); //建立父子进程通信之间的管道 openChildInfoPipe(); //执行包装的fork if ((childpid = redisFork()) == 0) { int retval; /* Child */ redisSetProcTitle("redis-rdb-bgsave"); redisSetCpuAffinity(server.bgsave_cpulist); //执行真正的保存rdb文件逻辑 retval = rdbSave(filename,rsi); if (retval == C_OK) { //用建立的管道,通知父进程,子进程已经结束的通知 sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB"); } exitFromChild((retval == C_OK) ? 0 : 1); } else { /* Parent */ if (childpid == -1) { closeChildInfoPipe(); server.lastbgsave_status = C_ERR; serverLog(LL_WARNING,"Can't save in background: fork: %s", strerror(errno)); return C_ERR; } serverLog(LL_NOTICE,"Background saving started by pid %d",childpid); server.rdb_save_time_start = time(NULL); server.rdb_child_pid = childpid; server.rdb_child_type = RDB_CHILD_TYPE_DISK; return C_OK; } return C_OK; /* unreached */ } ``` 代码注释非常详细了,可以看到用fork创建了子进程,真正的dump工作都是在子进程中完成的。因为操作系统实现的copy-on-write机制,所以fork后其实子进程地址空间和父进程地址空间还是同一个,所以数据完全可以dump出来,关于copy-on-write可以看下,这篇文章的理论知识https://wingsxdu.com/post/linux/concurrency-oriented-programming/fork-and-cow/ 进一步阅读rdbSave函数,具体的dump工作就是在这个函数里进行的。 ```c /* Save the DB on disk. Return C_ERR on error, C_OK on success. */ int rdbSave(char *filename, rdbSaveInfo *rsi) { char tmpfile[256]; char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */ FILE *fp; rio rdb; int error = 0; //建立临时文件,后续的rdb内容都先写到这个临时文件中 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); fp = fopen(tmpfile,"w"); if (!fp) { char *cwdp = getcwd(cwd,MAXPATHLEN); serverLog(LL_WARNING, "Failed opening the RDB file %s (in server root dir %s) " "for saving: %s", filename, cwdp ? cwdp : "unknown", strerror(errno)); return C_ERR; } //初始化文件,这里的rio是作者抽象的流式文件对象 rioInitWithFile(&rdb,fp); //通知模块触发持久化的事件 startSaving(RDBFLAGS_NONE); if (server.rdb_save_incremental_fsync) rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES); //这里执行真正的dump逻辑,涉及到很多rdb文件格式细节 if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) { errno = error; goto werr; } //下面的几个操作,都是保证让buffer中的数据都写入到磁盘 //关于这方面的讨论具体可以看下作者的讨论:http://oldblog.antirez.com/post/redis-persistence-demystified.html /* Make sure data will not remain on the OS's output buffers */ if (fflush(fp) == EOF) goto werr; if (fsync(fileno(fp)) == -1) goto werr; if (fclose(fp) == EOF) goto werr; /* Use RENAME to make sure the DB file is changed atomically only * if the generate DB file is ok. */ //把临时文件重新命名为dump.rdb(可以配置)文件 if (rename(tmpfile,filename) == -1) { char *cwdp = getcwd(cwd,MAXPATHLEN); serverLog(LL_WARNING, "Error moving temp DB file %s on the final " "destination %s (in server root dir %s): %s", tmpfile, filename, cwdp ? cwdp : "unknown", strerror(errno)); unlink(tmpfile); stopSaving(0); return C_ERR; } serverLog(LL_NOTICE,"DB saved on disk"); server.dirty = 0; server.lastsave = time(NULL); server.lastbgsave_status = C_OK; //通知模块持久化结束的事件 stopSaving(1); return C_OK; werr: serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno)); fclose(fp); //执行错误的话,需要删除临时文件 unlink(tmpfile); stopSaving(0); return C_ERR; } ``` ###从上面代码代码可以得出结论: 1)都是先写到临时文件temp-dump.rdb中,然后再重命名为正式文件dump.rdb 2) 在持久化之前和之后,模块会收到开始和结束事件 3)rdbSaveRio根据rdb特有格式写到文件中,其实rdb也用在集群复制中 4)写到文件后,数据都会从用户缓存区和内核缓冲区强制写到磁盘驱动中去,也就是实现落盘,这样即使redis服务崩溃,或者系统崩溃这两个级别错误,都可以应对。 ###总结: rdbSaveRio中是具体的dump逻辑,根据rdb格式dump出来,这个文件格式的具体描述可以看https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format ,因为这个逻辑不是本篇文章的重点,所以不再叙述。 参考: https://blog.csdn.net/aitangyong/article/details/52045251
redis6.0 客户端缓存学习笔记 作者: nbboy 时间: 2020-08-12 分类: 软件架构,软件工程,设计模式 评论 这个功能的原因是在redis server 前面加入L1缓存,也就是进程内缓存,进程内的缓存好处是减少网络io开销和序列化、反序列化的开销,需要解决的核心问题是数据一致性问题。 Redis 同时在server side和client side改动来支持客户端缓存,这里有两种模式: ###默认模式: 该模式需要server side记录key的信息,维护一个**Invalidation Table**来记录所有的track key,客户端获取key后,记录在该表中,以后一旦有其他客户端改动该key,就向源客户端推送失效信息。需要注意的是1.这种模式是链接会话级别的,断开链接后,不在track key.2.发送失效推送后,该key不在track,除非再次get key.该种模式可以看到占用了一定的server端内存,而且也只能收到一次变更信息。 ###广播模式: 广播模式更好理解,采用Pub/Sub模式,给所有关心该key 的客户端都推送变更通知,这种方式不用server 端记录状态信息,但是推送的端稍微更多一些,但是可以指定前缀来过滤推送的信息,在指定该前缀的情况下,server端显然需要存储**Prefixes Table**,所以在文档中指出,尽量要让这个prefix key 设计得小一点,不然将很损耗cpu 通过配置项tracking-table-max-keys可以设置**Invalidation Table**的大小,默认是1M,如果达到这个上限就开始删除之前没有修改过的key 参考: https://redis.io/topics/client-side-caching#two-connections-mode https://www.slideshare.net/RedisLabs/redisconf18-techniques-for-synchronizing-inmemory-caches-with-redis http://remcarpediem.net/article/e3e7a535/