Redis数据持久化机制AOF原理分析一
/* Function called at startup to load RDB or AOF file in memory. */ void loadDataFromDisk(void) { long long start = ustime(); if (server.aof_state == REDIS_AOF_ON) { if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK) redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); } else { if (rdbLoad(server.rdb_filename) == REDIS_OK) { redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds", (float)(ustime()-start)/1000000); } else if (errno != ENOENT) { redisLog(REDIS_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); exit(1); } } }Server首先判断加载AOF文件是因为AOF文件中的数据要比RDB文件中的数据要新。
int loadAppendOnlyFile(char *filename) { struct redisClient *fakeClient; FILE *fp = fopen(filename,"r"); struct redis_stat sb; int old_aof_state = server.aof_state; long loops = 0; //redis_fstat就是fstat64函数,通过fileno(fp)得到文件描述符,获取文件的状态存储于sb中, //具体可以参考stat函数,st_size就是文件的字节数 if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) { server.aof_current_size = 0; fclose(fp); return REDIS_ERR; } if (fp == NULL) {//打开文件失败 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno)); exit(1); } /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI * to the same file we're about to read. */ server.aof_state = REDIS_AOF_OFF; fakeClient = createFakeClient(); //建立伪终端 startLoading(fp); // 定义于 rdb.c ,更新服务器的载入状态 while(1) { int argc, j; unsigned long len; robj **argv; char buf[128]; sds argsds; struct redisCommand *cmd; /* Serve the clients from time to time */ // 有间隔地处理外部请求,ftello()函数得到文件的当前位置,返回值为long if (!(loops++ % 1000)) { loadingProgress(ftello(fp));//保存aof文件读取的位置,ftellno(fp)获取文件当前位置 aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);//处理事件 } //按行读取AOF数据 if (fgets(buf,sizeof(buf),fp) == NULL) { if (feof(fp))//达到文件尾EOF break; else goto readerr; } //读取AOF文件中的命令,依照Redis的协议处理 if (buf[0] != '*') goto fmterr; argc = atoi(buf+1);//参数个数 if (argc < 1) goto fmterr; argv = zmalloc(sizeof(robj*)*argc);//参数值 for (j = 0; j < argc; j++) { if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr; if (buf[0] != '$') goto fmterr; len = strtol(buf+1,NULL,10);//每个bulk的长度 argsds = sdsnewlen(NULL,len);//新建一个空sds //按照bulk的长度读取 if (len && fread(argsds,len,1,fp) == 0) goto fmterr; argv[j] = createObject(REDIS_STRING,argsds); if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF 跳过\r\n*/ } /* Command lookup */ cmd = lookupCommand(argv[0]->ptr); if (!cmd) { redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", (char*)argv[0]->ptr); exit(1); } /* Run the command in the context of a fake client */ fakeClient->argc = argc; fakeClient->argv = argv; cmd->proc(fakeClient);//执行命令 /* The fake client should not have a reply */ redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0); /* The fake client should never get blocked */ redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0); /* Clean up. Command code may have changed argv/argc so we use the * argv/argc of the client instead of the local variables. */ for (j = 0; j < fakeClient->argc; j++) decrRefCount(fakeClient->argv[j]); zfree(fakeClient->argv); } /* This point can only be reached when EOF is reached without errors. * If the client is in the middle of a MULTI/EXEC, log error and quit. */ if (fakeClient->flags & REDIS_MULTI) goto readerr; fclose(fp); freeFakeClient(fakeClient); server.aof_state = old_aof_state; stopLoading(); aofUpdateCurrentSize(); //更新server.aof_current_size,AOF文件大小 server.aof_rewrite_base_size = server.aof_current_size; return REDIS_OK; ………… }在前面一篇关于AOF参数配置的博客遗留了一个问题,server.aof_current_size参数的初始化,下面解决这个疑问。
void aofUpdateCurrentSize(void) { struct redis_stat sb; if (redis_fstat(server.aof_fd,&sb) == -1) { redisLog(REDIS_WARNING,"Unable to obtain the AOF file length. stat: %s", strerror(errno)); } else { server.aof_current_size = sb.st_size; } }redis_fstat是作者对Linux中fstat64函数的重命名,该还是就是获取文件相关的参数信息,具体可以Google之,sb.st_size就是当前AOF文件的大小。这里需要知道server.aof_fd即AOF文件描述符,该参数的初始化在initServer()函数中
/* Open the AOF file if needed. */ if (server.aof_state == REDIS_AOF_ON) { server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644); if (server.aof_fd == -1) { redisLog(REDIS_WARNING, "Can't open the append-only file: %s",strerror(errno)); exit(1); } }至此,Redis Server启动加载硬盘中AOF文件数据的操作就成功结束了。
当客户端执行Set等修改数据库中字段的指令时就会造成Server数据库中数据被修改,这些修改的数据应该被实时更新到AOF文件中,并且也要按照一定的fsync机制刷新到硬盘中,保证数据不会丢失。
/* This function gets called every time Redis is entering the * main loop of the event driven library, that is, before to sleep * for ready file descriptors. */ void beforeSleep(struct aeEventLoop *eventLoop) { REDIS_NOTUSED(eventLoop); listNode *ln; redisClient *c; /* Run a fast expire cycle (the called function will return * ASAP if a fast cycle is not needed). */ if (server.active_expire_enabled && server.masterhost == NULL) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); /* Try to process pending commands for clients that were just unblocked. */ while (listLength(server.unblocked_clients)) { ln = listFirst(server.unblocked_clients); redisAssert(ln != NULL); c = ln->value; listDelNode(server.unblocked_clients,ln); c->flags &= ~REDIS_UNBLOCKED; /* Process remaining data in the input buffer. */ //处理客户端在阻塞期间接收到的客户端发送的请求 if (c->querybuf && sdslen(c->querybuf) > 0) { server.current_client = c; processInputBuffer(c); server.current_client = NULL; } } /* Write the AOF buffer on disk */ //将server.aof_buf中的数据追加到AOF文件中并fsync到硬盘上 flushAppendOnlyFile(0); }通过上面的代码及注释可以发现,beforeSleep函数做了三件事:1、处理过期键,2、处理阻塞期间的客户端请求,3、将server.aof_buf中的数据追加到AOF文件中并fsync刷新到硬盘上,flushAppendOnlyFile函数给定了一个参数force,表示是否强制写入AOF文件,0表示非强制即支持延迟写,1表示强制写入。void flushAppendOnlyFile(int force) { ssize_t nwritten; int sync_in_progress = 0; if (sdslen(server.aof_buf) == 0) return; // 返回后台正在等待执行的 fsync 数量 if (server.aof_fsync == AOF_FSYNC_EVERYSEC) sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0; // AOF 模式为每秒 fsync ,并且 force 不为 1 如果可以的话,推延冲洗 if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) { /* With this append fsync policy we do background fsyncing. * If the fsync is still in progress we can try to delay * the write for a couple of seconds. */ // 如果 aof_fsync 队列里已经有正在等待的任务 if (sync_in_progress) { // 上一次没有推迟冲洗过,记录推延的当前时间,然后返回 if (server.aof_flush_postponed_start == 0) { /* No previous write postponinig, remember that we are * postponing the flush and return. */ server.aof_flush_postponed_start = server.unixtime; return; } else if (server.unixtime - server.aof_flush_postponed_start < 2) { // 允许在两秒之内的推延冲洗 /* We were already waiting for fsync to finish, but for less * than two seconds this is still ok. Postpone again. */ return; } /* Otherwise fall trough, and go write since we can't wait * over two seconds. */ server.aof_delayed_fsync++; redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis."); } } /* If you are following this code path, then we are going to write so * set reset the postponed flush sentinel to zero. */ server.aof_flush_postponed_start = 0; /* We want to perform a single write. This should be guaranteed atomic * at least if the filesystem we are writing is a real physical one. * While this will save us against the server being killed I don't think * there is much to do about the whole server stopping for power problems * or alike */ // 将 AOF 缓存写入到文件,如果一切幸运的话,写入会原子性地完成 nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); if (nwritten != (signed)sdslen(server.aof_buf)) {//出错 /* Ooops, we are in troubles. The best thing to do for now is * aborting instead of giving the illusion that everything is * working as expected. */ if (nwritten == -1) { redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno)); } else { redisLog(REDIS_WARNING,"Exiting on short write while writing to " "the append-only file: %s (nwritten=%ld, " "expected=%ld)", strerror(errno), (long)nwritten, (long)sdslen(server.aof_buf)); if (ftruncate(server.aof_fd, server.aof_current_size) == -1) { redisLog(REDIS_WARNING, "Could not remove short write " "from the append-only file. Redis may refuse " "to load the AOF the next time it starts. " "ftruncate: %s", strerror(errno)); } } exit(1); } server.aof_current_size += nwritten; /* Re-use AOF buffer when it is small enough. The maximum comes from the * arena size of 4k minus some overhead (but is otherwise arbitrary). */ // 如果 aof 缓存不是太大,那么重用它,否则,清空 aof 缓存 if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) { sdsclear(server.aof_buf); } else { sdsfree(server.aof_buf); server.aof_buf = sdsempty(); } /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are * children doing I/O in the background. */ //aof rdb子进程运行中不支持fsync并且aof rdb子进程正在运行,那么直接返回, //但是数据已经写到aof文件中,只是没有刷新到硬盘 if (server.aof_no_fsync_on_rewrite && (server.aof_child_pid != -1 || server.rdb_child_pid != -1)) return; /* Perform the fsync if needed. */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) {//总是fsync,那么直接进行fsync /* aof_fsync is defined as fdatasync() for Linux in order to avoid * flushing metadata. */ aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */ server.aof_last_fsync = server.unixtime; } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) { if (!sync_in_progress) aof_background_fsync(server.aof_fd);//放到后台线程进行fsync server.aof_last_fsync = server.unixtime; } }上述代码中请关注server.aof_fsync参数,即设置Redis fsync AOF文件到硬盘的策略,如果设置为AOF_FSYNC_ALWAYS,那么直接在主进程中fsync,如果设置为AOF_FSYNC_EVERYSEC,那么放入后台线程中fsync,后台线程的代码在bio.c中。