1.redis客户端启动主流程分析
目前研读的代码是redis5.0,启动流程在redis-cli.c的main函数中
先列一下main函数中,我认为比较重要的函数
main(){
// 客户端默认配置设置
config.hostip = sdsnew("127.0.0.1");
....
// 2.解析命令行携带的参数
firstarg = parseOptions(argc,argv);
// 3. 针对不同的模式进行处理
CLUSTER_MANAGER_MODE()
config.latency_mode
config.latency_dist_mode
// 4.当参数为0且没有任何命令提供时,则进入交互模式
if (argc == 0 && !config.eval) {
signal(SIGPIPE, SIG_IGN);
//客户端连接,授权,选择默认的db0库
cliConnect(0);
repl();
}
// 后面就可以不用看了,如果进入了我们经常使用的交互模式后,会在上面进行循环,直到执行退出命令,
// 网络断开连接啥的
}
从上面我们可以看出,重点内容其实就两点:
1.客户端连接
2.进入交互模式
2. 客户端连接 cliConnect(0)
函数功能:
- context = redisConnect(config.hostip,config.hostport); 进行redis server的tcp连接
- anetKeepAlive(NULL, context->fd, REDIS_CLI_KEEPALIVE_INTERVAL); socket 保活
- cliAuth() 发送授权命令到redis server
- 发送选择db命令到redis server,默认是数据库0
static int cliConnect(int flags) {
if (context == NULL || flags & CC_FORCE) {
// 连接未初始化的情况
if (context != NULL) {
// 上线文释放
redisFree(context);
}
// 没有连接
if (config.hostsocket == NULL) {
//进行连接
context = redisConnect(config.hostip,config.hostport);
} else {
// 连接unix的socket
context = redisConnectUnix(config.hostsocket);
}
if (context->err) {
// 连接失败
if (!(flags & CC_QUIET)) {
fprintf(stderr,"Could not connect to Redis at ");
if (config.hostsocket == NULL)
fprintf(stderr,"%s:%d: %sn",
config.hostip,config.hostport,context->errstr);
else
fprintf(stderr,"%s: %sn",
config.hostsocket,context->errstr);
}
redisFree(context);
context = NULL;
return REDIS_ERR;
}
anetKeepAlive(NULL, context->fd, REDIS_CLI_KEEPALIVE_INTERVAL);
if (cliAuth() != REDIS_OK)
return REDIS_ERR;
if (cliSelect() != REDIS_OK)
return REDIS_ERR;
}
return REDIS_OK;
}
3.进入交互模式repl(void)
核心内容:
- 初始化帮助信息cliInitHelp();
- cliRefreshPrompt(); 刷新输入框
- linenoise(context ? config.prompt : "not connected> ") 读取命令行
- argv = cliSplitArgs(line,&argc); 将读取的命令行进行拆分
- repeat = strtol(argv[0], &endptr, 10); 命令行重复命令检查
- 存储命令行到历史记录中去
- 判断是否为quit或者exit,是则退出
- restart 命令进行会话重启
- connect命令 进行连接
- clear命令进行屏幕内容清理
- 核心环节(issueCommandRepeat(argc-skipargs, argv+skipargs, repeat);)
- cliReadReply(0); 客户端接收回复
其中从3步到第12步是一个循环的过程,阻塞是发生在监听用户输入。
static void repl(void) {
sds historyfile = NULL;
int history = 0;
char *line;
int argc;
sds *argv;
//初始化帮助
cliInitHelp();
// 客户端整合帮助信息
cliIntegrateHelp();
config.interactive = 1;
linenoiseSetMultiLine(1);
//设置回调
linenoiseSetCompletionCallback(completionCallback);
linenoiseSetHintsCallback(hintsCallback);
linenoiseSetFreeHintsCallback(freeHintsCallback);
if (isatty(fileno(stdin))) {
historyfile = getDotfilePath(REDIS_CLI_HISTFILE_ENV,REDIS_CLI_HISTFILE_DEFAULT);
//keep in-memory history always regardless if history file can be determined
history = 1;
if (historyfile != NULL) {
linenoiseHistoryLoad(historyfile);
}
cliLoadPreferences();
}
//刷新输入框
cliRefreshPrompt();
//利用linenoise获得输入信息
while((line = linenoise(context ? config.prompt : "not connected> ")) != NULL) {
if (line[0] != ' ') {
long repeat = 1;
int skipargs = 0;
char *endptr = NULL;
//把输入信息分割成字符串数组
argv = cliSplitArgs(line,&argc);
if (argv && argc > 0) {
errno = 0;
repeat = strtol(argv[0], &endptr, 10);
if (argc > 1 && *endptr == ' ') {
if (errno == ERANGE || errno == EINVAL || repeat <= 0) {
fputs("Invalid redis-cli repeat command option value.n", stdout);
sdsfreesplitres(argv, argc);
linenoiseFree(line);
continue;
}
skipargs = 1;
} else {
repeat = 1;
}
}
if (!(argv && argc > 0 && !strcasecmp(argv[0+skipargs], "auth"))) {
if (history) linenoiseHistoryAdd(line);
if (historyfile) linenoiseHistorySave(historyfile);
}
if (argv == NULL) {
printf("Invalid argument(s)n");
linenoiseFree(line);
continue;
} else if (argc > 0) {
//判断是否为qiut,exit
if (strcasecmp(argv[0],"quit") == 0 ||
strcasecmp(argv[0],"exit") == 0)
{
exit(0);
} else if (argv[0][0] == ':') {
cliSetPreferences(argv,argc,1);
sdsfreesplitres(argv,argc);
linenoiseFree(line);
continue;
} else if (strcasecmp(argv[0],"restart") == 0) {
if (config.eval) {
config.eval_ldb = 1;
config.output = OUTPUT_RAW;
return;
} else {
printf("Use 'restart' only in Lua debugging mode.");
}
} else if (argc == 3 && !strcasecmp(argv[0],"connect")) {
sdsfree(config.hostip);
config.hostip = sdsnew(argv[1]);
config.hostport = atoi(argv[2]);
cliRefreshPrompt();
cliConnect(CC_FORCE);
} else if (argc == 1 && !strcasecmp(argv[0],"clear")) {
linenoiseClearScreen();
}
//进行命令发送
else {
long long start_time = mstime(), elapsed;
// 发送命令
issueCommandRepeat(argc-skipargs, argv+skipargs, repeat);
if (config.eval_ldb_end) {
config.eval_ldb_end = 0;
cliReadReply(0);
printf("n(Lua debugging session ended%s)nn",
config.eval_ldb_sync ? "" :
" -- dataset changes rolled back");
}
// 耗时
elapsed = mstime()-start_time;
if (elapsed >= 500 &&
config.output == OUTPUT_STANDARD)
{
printf("(%.2fs)n",(double)elapsed/1000);
}
}
}
sdsfreesplitres(argv,argc);
}
linenoiseFree(line);
}
// 退出
exit(0);
}
4.命令重复发送issueCommandRepeat
这个部分主要是处理发送失败后重连发送的。
核心还是cliSendCommand 方法
static int issueCommandRepeat(int argc, char **argv, long repeat) {
while (1) {
config.cluster_reissue_command = 0;
if (cliSendCommand(argc,argv,repeat) != REDIS_OK) {
// 发送失败进行重连
cliConnect(CC_FORCE);
if (cliSendCommand(argc,argv,repeat) != REDIS_OK) {
cliPrintContextError();
return REDIS_ERR;
}
}
if (config.cluster_mode && config.cluster_reissue_command) {
cliConnect(CC_FORCE);
} else {
break;
}
}
return REDIS_OK;
}
5.cliSendCommand 发送redis命令
函数的主要功能:
- 申请内存存储命令
- 命令行转化为resp协议redisAppendCommandArgv(context,argc,(const char**)argv,argvlen);
- 将resp协议命令传入cliReadReply 进行命令发送,并读取响应结果
- 至于后续的读取服务器返回信息redisBufferRead
- cliRefreshPrompt 刷新客户端交互窗口,进行返回结果输出。



