- redis 的线程模型设置
- 请求解析
- Redis I/O 线程的启停时机
- redis 命令执行过程
- RESP协议
- 命令调用
- 返回响应
- 执行命令
曾经确实是单线程,至于原因主要还是性能。在 redis6 之后,发现在数据量特别大的时候,网络 I/O 数据的读/写将占用执行期间大部分 CPU 时间,成为 Redis 主要性能瓶颈之一。后来便创建了I/O线程,将不同的客户端的I/O数据的读/写操作分配到不同的I/O线程中进行处理。
可通过 io-threads 配置项设置I/O线程数量。
设计意义:
1、redis瓶颈不在数据处理,在网络I/O
2、单线程降低了数据操作的复杂度
3、多线程可能存在线程切换、资源竟态、死锁等情况
redis 在 networking.c 中定义了如下变量:
pthread_t io_threads[IO_THREADS_MAX_NUM]; //存储所有线程的线程标识符 pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM]; //用于启停 I/O 线程 _Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; //每个I/O线程待处理的客户端数量 int io_threads_active; int io_threads_op; list *io_threads_list[IO_THREADS_MAX_NUM]; //每个线程的客户端队列
看来一会儿还要补一下 redis对客户端相关处理 相关代码,不然不全。
Redis 启动时,会调用 initThreadedIO 函数创建 I/O 线程,默认处于停用状态。
请求解析
redis认为多线程执行I/O读操作对性能影响不大,默认使用单线程执行I/O读操作。
如果要开启多线程读,可以修改配置项:io-threads-do-reads yes
下面函数会将待读客户端分配给各个I/O线程,等待IO线程读取并解析请求数据:
int handleClientsWithPendingReadsUsingThreads(void) {
if (!io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
if (tio_debug) printf("%d TOTAL READ pending clientsn", processed);
//划分之后,每个线程只需要处理自己队列上的客户端,从而将数据分隔,线程之间独立执行,互不影响
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodevalue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodevalue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshedn");
//当所有客户端数据都已经被读取并解析完成,主线程开始遍历所有的客户端,执行命令
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodevalue(ln);
c->flags &= ~CLIENT_PENDING_READ;
listDelNode(server.clients_pending_read,ln);
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
continue;
}
}
processInputBuffer(c);
}
return processed;
}
Redis I/O 线程的启停时机
是我们配置了使用IO线程就一定要用吗?怎么选用配套的CPU呢?这些都是问题。
1、需要我们修改配置文件
2、待处理客户端数量大于或等于指定I/O线程数量的2倍
当没有任务的时候,I/O 线程处于忙等状态,为避免这些I/O与主线程争夺 CPU 资源,redis建议运行机器的CPU不少于4核,I/O数小于核数(超频另说)。
redis 命令执行过程 RESP协议
RESP可以序列化以下几种数据类型:整数、错误信息、单行字符串、多行字符串、数组。
了解就好,还是更喜欢 PB。
命令调用
解析完命令请求后,会调用下面的函数处理该命令请求:
void processInputBuffer(client *c) {
while(c->qb_pos < sdslen(c->querybuf)) {
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
if (c->flags & CLIENT_BLOCKED) break;
if (c->flags & CLIENT_PENDING_COMMAND) break;
if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
if (!c->reqtype) {
if (c->querybuf[c->qb_pos] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
}
}
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
if (server.gopher_enabled &&
((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
c->argc == 0))
{
processGopherRequest(c);
resetClient(c);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
break;
}
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
if (c->argc == 0) {
resetClient(c);
} else {
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
if (processCommandAndResetClient(c) == C_ERR) {
return;
}
}
}
if (c->qb_pos) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
qb_pos 为查询缓冲区最新读取位置,该位置小于查询缓冲区内容长度时,循环继续执行。
}
processMultibulkBuffer 函数从查询缓冲区的数据中解析请求报文,获取命令及命令参数:
int processMultibulkBuffer(client *c) {
char *newline = NULL;
int ok;
long long ll;
if (c->multibulklen == 0) {
serverAssertWithInfo(c,NULL,c->argc == 0);
newline = strchr(c->querybuf+c->qb_pos,'r');
if (newline == NULL) {
if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
addReplyError(c,"Protocol error: too big mbulk count string");
setProtocolError("too big mbulk count string",c);
}
return C_ERR;
}
if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
return C_ERR;
serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*');
ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
if (!ok || ll > 1024*1024) {
addReplyError(c,"Protocol error: invalid multibulk length");
setProtocolError("invalid mbulk count",c);
return C_ERR;
}
c->qb_pos = (newline-c->querybuf)+2;
if (ll <= 0) return C_OK;
c->multibulklen = ll;
if (c->argv) zfree(c->argv);
c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
}
serverAssertWithInfo(c,NULL,c->multibulklen > 0);
while(c->multibulklen) {
if (c->bulklen == -1) {
newline = strchr(c->querybuf+c->qb_pos,'r');
if (newline == NULL) {
if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
addReplyError(c,
"Protocol error: too big bulk count string");
setProtocolError("too big bulk count string",c);
return C_ERR;
}
break;
}
if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
break;
if (c->querybuf[c->qb_pos] != '$') {
addReplyErrorFormat(c,
"Protocol error: expected '$', got '%c'",
c->querybuf[c->qb_pos]);
setProtocolError("expected $ but got something else",c);
return C_ERR;
}
ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
if (!ok || ll < 0 || ll > server.proto_max_bulk_len) {
addReplyError(c,"Protocol error: invalid bulk length");
setProtocolError("invalid bulk length",c);
return C_ERR;
}
c->qb_pos = newline-c->querybuf+2;
if (ll >= PROTO_MBULK_BIG_ARG) {
if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2);
}
}
c->bulklen = ll;
}
if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
break;
} else {
if (c->qb_pos == 0 &&
c->bulklen >= PROTO_MBULK_BIG_ARG &&
sdslen(c->querybuf) == (size_t)(c->bulklen+2))
{
c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
sdsIncrLen(c->querybuf,-2);
c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
sdsclear(c->querybuf);
} else {
c->argv[c->argc++] =
createStringObject(c->querybuf+c->qb_pos,c->bulklen);
c->qb_pos += c->bulklen+2;
}
c->bulklen = -1;
c->multibulklen--;
}
}
if (c->multibulklen == 0) return C_OK;
return C_ERR;
}
用户请求触达,触发 AE_READABLE 事件,调用 readQueryFromClient 函数处理事件。
返回响应
client中定义了两个回复缓冲区:
一个字符数组,大小为16KB;一个结构体链表:
char buf[PROTO_REPLY_CHUNK_BYTES]; list *reply;
先尝试写入 client.buf,如果client.buf写不下,则尝试写入client.reply中。
执行命令
上面一波操作之后,命令参数已经存储在 client.argv中。
processCommandAndResetClient 函数调用 processCommand 函数执行命令,并在命令执行后调用 commandProcessed 执行后续逻辑。



