- redis集群概述
- 客户端重定向
- 槽位迁移
- redis cluster 的运行流程
哨兵、主从、集群,一串下来。
redis cluster 主要作用如下(虽然是显而易见了):
数据分片,流量分发。
cluster 将不同的数据分发给不同的节点,不过没有使用一致性hash算法,而是引入了Hash槽位的概念。cluster有16384个槽位,每个键通过 CRC16(key) mod 16384 计算出自己所属的槽位,每个槽位只能指派给一个节点。
所以本文的重心也就很明确了:redis集群是如何实现通信及数据分片、流量分发的
关于实操:【redis】闲得无聊,来聊聊当下爆火的 redis集群,顺便搭一个玩玩呗
客户端重定向
如果cluster中的某个节点收到客户端请求,但请求中查询的键不是当前节点负责的,则它将通知客户端进行重定向,客户端重新发送请求给真正的数据存储节点。
那是怎么实现?包发过来,不在我这儿,我告诉她在他那儿,让她重发给他?
还是包发过来,不在我这儿,我告诉她不在我这儿,因为我也不知道在谁那儿,让她一个一个自己去试试?
还是什么其他的方式?
我想是第一种,但是现实是什么样的,再看看。
/* Excerpt of processCommand(): only the cluster-redirection step is shown,
 * the "......" lines stand for code elided by the article.
 *
 * When cluster mode is enabled, the node that should serve the command is
 * looked up before the command is executed; if that node is not this one,
 * the client is answered with a redirection instead and C_OK is returned. */
int processCommand(client *c) {
......
/* The redirection check is skipped when:
 * 1) cluster mode is disabled,
 * 2) the client carries the CLIENT_MASTER flag,
 * 3) the client is the Lua client and the script caller carries
 *    CLIENT_MASTER,
 * 4) the command has no key arguments (no getkeys_proc and firstkey == 0),
 *    unless the command is EXEC. */
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA &&
server.lua_caller->flags & CLIENT_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
c->cmd->proc != execCommand))
{
int hashslot;
int error_code;
// Look up the node that actually stores the data for this query.
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
&hashslot,&error_code);
/* No node can serve the query, or the node is not this instance. */
if (n == NULL || n != server.cluster->myself) {
/* The command will not run here: EXEC discards the transaction,
 * any other command flags the transaction. */
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
}
// Reply with the ASK or MOVED redirection flag and the target node,
// telling the client to redirect.
clusterRedirectClient(c,n,hashslot,error_code);
return C_OK;
}
}
......
}
getNodeByQuery函数负责查找数据存储节点:
/* Return the node that should serve the command `cmd` (arguments
 * `argv`/`argc`) issued by client `c`, or NULL if the command cannot be
 * served at the moment.
 *
 * For EXEC issued inside a MULTI block, every queued command of the
 * transaction is inspected; for any other command only the command itself.
 *
 * Parameters:
 *   hashslot   - optional out parameter. It receives the hash slot of the
 *                keys, but only when the function gets past the cluster-state
 *                check; it is left untouched on the earlier returns.
 *   error_code - optional out parameter. Set to CLUSTER_REDIR_NONE first and
 *                then to one of:
 *                  CLUSTER_REDIR_DOWN_UNBOUND  slot not assigned to any node
 *                  CLUSTER_REDIR_CROSS_SLOT    keys hash to different slots
 *                  CLUSTER_REDIR_DOWN_STATE    cluster state is not OK
 *                  CLUSTER_REDIR_DOWN_RO_STATE cluster not OK, only reads
 *                                              are allowed
 *                  CLUSTER_REDIR_ASK           slot migrating, key missing
 *                  CLUSTER_REDIR_UNSTABLE      multi-key op on importing
 *                                              slot with missing keys
 *                  CLUSTER_REDIR_MOVED         slot owned by another node
 *
 * Return value: `myself` when the command can be executed locally, another
 * node when the client has to be redirected (ASK / MOVED), NULL on the
 * error conditions listed above. */
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
    clusterNode *n = NULL;
    robj *firstkey = NULL;
    int multiple_keys = 0;
    multiState *ms, _ms;
    multiCmd mc;
    int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;

    /* Redirection disabled through the module flags: always serve locally. */
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return myself;

    /* No error unless one of the checks below says otherwise. */
    if (error_code) *error_code = CLUSTER_REDIR_NONE;

    if (cmd->proc == execCommand) {
        /* EXEC without MULTI: nothing queued to inspect, serve locally. */
        if (!(c->flags & CLIENT_MULTI)) return myself;
        ms = &c->mstate;
    } else {
        /* Wrap the single command into a fake one-entry multi state so the
         * loop below handles both cases the same way. */
        ms = &_ms;
        _ms.commands = &mc;
        _ms.count = 1;
        mc.argv = argv;
        mc.argc = argc;
        mc.cmd = cmd;
    }

    /* Check that every key of every command hashes to the same slot, and
     * find out which node that slot is assigned to. */
    for (i = 0; i < ms->count; i++) {
        struct redisCommand *mcmd;
        robj **margv;
        int margc, *keyindex, numkeys, j;

        mcmd = ms->commands[i].cmd;
        margc = ms->commands[i].argc;
        margv = ms->commands[i].argv;

        keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
        for (j = 0; j < numkeys; j++) {
            robj *thiskey = margv[keyindex[j]];
            int thisslot = keyHashSlot((char*)thiskey->ptr,
                                       sdslen(thiskey->ptr));

            if (firstkey == NULL) {
                /* First key seen: it fixes the slot and the owning node. */
                firstkey = thiskey;
                slot = thisslot;
                n = server.cluster->slots[slot];

                /* Slot not assigned to any node. */
                if (n == NULL) {
                    getKeysFreeResult(keyindex);
                    if (error_code)
                        *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
                    return NULL;
                }

                /* Remember whether this slot is being migrated away from
                 * this node or imported into it; both need the missing-key
                 * bookkeeping done below. */
                if (n == myself &&
                    server.cluster->migrating_slots_to[slot] != NULL)
                {
                    migrating_slot = 1;
                } else if (server.cluster->importing_slots_from[slot] != NULL) {
                    importing_slot = 1;
                }
            } else {
                /* Any later key must live in the same slot as the first. */
                if (!equalStringObjects(firstkey,thiskey)) {
                    if (slot != thisslot) {
                        getKeysFreeResult(keyindex);
                        if (error_code)
                            *error_code = CLUSTER_REDIR_CROSS_SLOT;
                        return NULL;
                    } else {
                        /* Same slot, different key: a multi-key request. */
                        multiple_keys = 1;
                    }
                }
            }

            /* While the slot is migrating or importing, count the keys that
             * are not present in the local database (db 0). */
            if ((migrating_slot || importing_slot) &&
                lookupKeyRead(&server.db[0],thiskey) == NULL)
            {
                missing_keys++;
            }
        }
        getKeysFreeResult(keyindex);
    }

    /* No key at all: the command can be executed by this node. */
    if (n == NULL) return myself;

    /* The cluster is not in the OK state. */
    if (server.cluster->state != CLUSTER_OK) {
        if (!server.cluster_allow_reads_when_down) {
            /* Nothing may be served while the cluster is down. */
            if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
            return NULL;
        } else if (!(cmd->flags & CMD_READONLY) && !(cmd->proc == evalCommand)
                && !(cmd->proc == evalShaCommand))
        {
            /* Reads are allowed while down, but this command is neither
             * read-only nor EVAL/EVALSHA. */
            if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE;
            return NULL;
        } else {
            /* Read-only command (or EVAL/EVALSHA): fall through and let it
             * be evaluated by the checks below. */
        }
    }

    /* From here on the slot is known; report it to the caller. */
    if (hashslot) *hashslot = slot;

    /* MIGRATE is always served locally while the slot is migrating or
     * importing. */
    if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
        return myself;

    /* Slot is being migrated away and at least one key is not here:
     * send the client to the migration target with an ASK redirection. */
    if (migrating_slot && missing_keys) {
        if (error_code) *error_code = CLUSTER_REDIR_ASK;
        return server.cluster->migrating_slots_to[slot];
    }

    /* Slot is being imported and the client sent ASKING (or the command
     * itself carries the ASKING flag): serve it here, unless it is a
     * multi-key request with some keys missing. */
    if (importing_slot &&
        (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
    {
        if (multiple_keys && missing_keys) {
            if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            return myself;
        }
    }

    /* A client flagged read-only, running a read-only command (or
     * EVAL/EVALSHA) against a slave whose master owns the slot, is served
     * by the slave itself. */
    if (c->flags & CLIENT_READONLY &&
        (cmd->flags & CMD_READONLY || cmd->proc == evalCommand ||
         cmd->proc == evalShaCommand) &&
        nodeIsSlave(myself) &&
        myself->slaveof == n)
    {
        return myself;
    }

    /* Base case: return the slot owner; if that is not this node, the
     * caller must answer with a MOVED redirection. */
    if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
    return n;
}
槽位迁移
什么是槽位迁移?rehash,就这样理解嘛。
什么时候发生?主要是在集群扩容、缩容的时候:比方新加入(或者重新上线)的节点可以分摊压力了,又比方某个节点准备下线、要先把它负责的槽位交给别的节点。(节点直接挂掉时走的是后面说的故障转移,由它的从节点接管槽位。)迁移键时用到的是 MIGRATE 命令,实现如下:
/* MIGRATE command implementation.
 *
 * Argument layout as read by this function:
 *   argv[1], argv[2] - identify the target instance (passed to
 *                      migrateGetSocket / migrateCloseSocket)
 *   argv[3]          - the key, or the empty string when KEYS is used
 *   argv[4]          - destination database id
 *   argv[5]          - timeout
 *   argv[6...]       - options: COPY, REPLACE, AUTH <password>,
 *                      AUTH2 <username> <password>, KEYS <key> [<key> ...]
 *
 * The function serializes every existing, non expired key into a
 * RESTORE (RESTORE-ASKING in cluster mode) command, sends the whole
 * pipeline to the target, reads one reply per command and, unless COPY was
 * given, deletes locally the keys the target acknowledged.
 *
 * Replies to the client: +NOKEY when none of the keys exists, +OK on
 * success, the target's error when it answered with one, or -IOERR on a
 * socket error or timeout. */
void migrateCommand(client *c) {
    migrateCachedSocket *cs;
    int copy = 0, replace = 0, j;
    char *username = NULL;
    char *password = NULL;
    long timeout;
    long dbid;
    robj **ov = NULL;       /* Objects to migrate. */
    robj **kv = NULL;       /* Key names, parallel to ov. */
    robj **newargv = NULL;  /* Rewritten command vector (DEL key ...). */
    rio cmd, payload;
    int may_retry = 1;
    int write_error = 0;
    int argv_rewritten = 0;

    /* Single-key form by default: the key is argv[3]. */
    int first_key = 3;
    int num_keys = 1;

    /* Parse the options following the timeout argument. */
    for (j = 6; j < c->argc; j++) {
        int moreargs = (c->argc-1) - j;
        if (!strcasecmp(c->argv[j]->ptr,"copy")) {
            copy = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
            replace = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
            /* AUTH needs one more argument: the password. */
            if (!moreargs) {
                addReply(c,shared.syntaxerr);
                return;
            }
            j++;
            password = c->argv[j]->ptr;
        } else if (!strcasecmp(c->argv[j]->ptr,"auth2")) {
            /* AUTH2 needs two more arguments: username and password. */
            if (moreargs < 2) {
                addReply(c,shared.syntaxerr);
                return;
            }
            username = c->argv[++j]->ptr;
            password = c->argv[++j]->ptr;
        } else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
            /* With KEYS the positional key argument must be empty. */
            if (sdslen(c->argv[3]->ptr) != 0) {
                addReplyError(c,
                    "When using MIGRATE KEYS option, the key argument"
                    " must be set to the empty string");
                return;
            }
            /* Everything after KEYS is a key name. */
            first_key = j+1;
            num_keys = c->argc - j - 1;
            break;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    /* Sanity check on the numeric arguments (an error reply is sent by
     * getLongFromObjectOrReply on failure). */
    if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
        getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
    {
        return;
    }
    /* Non-positive timeouts fall back to 1000 (presumably milliseconds -
     * TODO confirm against connSyncWrite/connSyncReadLine). */
    if (timeout <= 0) timeout = 1000;

    /* Collect the keys that actually exist. ov/kv are NULL here, so
     * zrealloc acts as a plain allocation. */
    ov = zrealloc(ov,sizeof(robj*)*num_keys);
    kv = zrealloc(kv,sizeof(robj*)*num_keys);
    int oi = 0;

    for (j = 0; j < num_keys; j++) {
        if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
            kv[oi] = c->argv[first_key+j];
            oi++;
        }
    }
    num_keys = oi;
    if (num_keys == 0) {
        /* Nothing to migrate. The reply is a RESP simple string and must be
         * terminated by CRLF. */
        zfree(ov); zfree(kv);
        addReplySds(c,sdsnew("+NOKEY\r\n"));
        return;
    }

try_again:
    write_error = 0;

    /* Connect to the target (or reuse a cached connection). On failure
     * nothing is replied here - presumably migrateGetSocket already sent
     * the error to the client; verify against its implementation. */
    cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
    if (cs == NULL) {
        zfree(ov); zfree(kv);
        return;
    }

    /* Build the whole pipeline in an in-memory buffer. */
    rioInitWithBuffer(&cmd,sdsempty());

    /* Authentication: AUTH <password> or AUTH <username> <password>. */
    if (password) {
        int arity = username ? 3 : 2;
        serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',arity));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
        if (username) {
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,username,
                                 sdslen(username)));
        }
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
            sdslen(password)));
    }

    /* Send SELECT only when the connection's last selected DB differs. */
    int select = cs->last_dbid != dbid;
    if (select) {
        serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
        serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
    }

    int non_expired = 0; /* Number of keys that are not expired by now. */

    /* Emit one RESTORE command per key. */
    for (j = 0; j < num_keys; j++) {
        long long ttl = 0;
        long long expireat = getExpire(c->db,kv[j]);

        if (expireat != -1) {
            ttl = expireat-mstime();
            if (ttl < 0) {
                /* Already expired: skip the key. */
                continue;
            }
            /* A TTL of 0 would mean "no expire" on the wire, so use at
             * least 1 for a key that does have an expire. */
            if (ttl < 1) ttl = 1;
        }

        /* Compact the surviving keys to the front of kv, removing the holes
         * left by the keys skipped above. non_expired never exceeds j, so
         * kv[j] is still valid for the rest of this iteration. */
        kv[non_expired++] = kv[j];

        serverAssertWithInfo(c,NULL,
            rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));

        /* In cluster mode the -ASKING variant of RESTORE is used. */
        if (server.cluster_enabled)
            serverAssertWithInfo(c,NULL,
                rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
        else
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
        serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
                sdslen(kv[j]->ptr)));
        serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));

        /* The payload argument: the object serialized by
         * createDumpPayload. */
        createDumpPayload(&payload,ov[j],kv[j]);
        serverAssertWithInfo(c,NULL,
            rioWriteBulkString(&cmd,payload.io.buffer.ptr,
                               sdslen(payload.io.buffer.ptr)));
        sdsfree(payload.io.buffer.ptr);

        /* Optional REPLACE argument. */
        if (replace)
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
    }

    /* Only the compacted, non expired keys are considered from now on. */
    num_keys = non_expired;

    /* Transfer the pipeline to the target in chunks of at most 64 KB. */
    errno = 0;
    {
        sds buf = cmd.io.buffer.ptr;
        size_t pos = 0, towrite;
        int nwritten = 0;

        while ((towrite = sdslen(buf)-pos) > 0) {
            towrite = (towrite > (64*1024) ? (64*1024) : towrite);
            nwritten = connSyncWrite(cs->conn,buf+pos,towrite,timeout);
            if (nwritten != (signed)towrite) {
                write_error = 1;
                goto socket_err;
            }
            pos += nwritten;
        }
    }

    char buf0[1024]; /* AUTH reply. */
    char buf1[1024]; /* SELECT reply. */
    char buf2[1024]; /* RESTORE reply. */

    /* Read the AUTH reply, if AUTH was sent. */
    if (password && connSyncReadLine(cs->conn, buf0, sizeof(buf0), timeout) <= 0)
        goto socket_err;

    /* Read the SELECT reply, if SELECT was sent. */
    if (select && connSyncReadLine(cs->conn, buf1, sizeof(buf1), timeout) <= 0)
        goto socket_err;

    /* Read the RESTORE replies. */
    int error_from_target = 0;
    int socket_error = 0;
    int del_idx = 1; /* Slot 0 of newargv is reserved for the "DEL" name. */

    /* Unless COPY was given, the acknowledged keys are deleted locally and
     * collected into a DEL command vector. */
    if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));

    for (j = 0; j < num_keys; j++) {
        if (connSyncReadLine(cs->conn, buf2, sizeof(buf2), timeout) <= 0) {
            socket_error = 1;
            break;
        }
        if ((password && buf0[0] == '-') ||
            (select && buf1[0] == '-') ||
            buf2[0] == '-')
        {
            /* The target answered with an error. Report only the first
             * one, but keep reading the remaining replies. */
            if (!error_from_target) {
                /* Invalidate the cached DB id so that the next MIGRATE on
                 * this connection sends SELECT again. */
                cs->last_dbid = -1;
                char *errbuf;
                if (password && buf0[0] == '-') errbuf = buf0;
                else if (select && buf1[0] == '-') errbuf = buf1;
                else errbuf = buf2;

                error_from_target = 1;
                addReplyErrorFormat(c,"Target instance replied with error: %s",
                    errbuf+1);
            }
        } else {
            if (!copy) {
                /* Key acknowledged by the target: delete it here. */
                dbDelete(c->db,kv[j]);
                signalModifiedKey(c,c->db,kv[j]);
                notifyKeyspaceEvent(NOTIFY_GENERIC,"del",kv[j],c->db->id);
                server.dirty++;

                /* Append the key to the DEL command vector. */
                newargv[del_idx++] = kv[j];
                incrRefCount(kv[j]);
            }
        }
    }

    /* A socket error on the very first RESTORE reply, with no error from
     * the target and no timeout: nothing was processed yet, so the whole
     * operation can be retried once. */
    if (!error_from_target && socket_error && j == 0 && may_retry &&
        errno != ETIMEDOUT)
    {
        goto socket_err; /* A retry is guaranteed because of the tests above. */
    }

    /* Close the connection on socket error now, while argv[1]/argv[2] still
     * hold the original arguments (the vector may be rewritten below). */
    if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);

    if (!copy) {
        if (del_idx > 1) {
            /* At least one key was deleted: replace the client's command
             * vector with "DEL key1 ... keyN" (presumably so that DEL, not
             * MIGRATE, is what gets propagated - confirm against
             * replaceClientCommandVector). */
            newargv[0] = createStringObject("DEL",3);
            replaceClientCommandVector(c,del_idx,newargv);
            argv_rewritten = 1;
        } else {
            /* No key deleted: the vector is not needed. */
            zfree(newargv);
        }
        /* Either handed over to the client or freed: do not free it again
         * in the cleanup paths. */
        newargv = NULL;
    }

    /* Socket error after some replies were read and without an error from
     * the target: report it to the client, without retrying. */
    if (!error_from_target && socket_error) {
        may_retry = 0;
        goto socket_err;
    }

    if (!error_from_target) {
        /* Success: remember the DB now selected on this connection so the
         * next MIGRATE to the same DB can skip SELECT. */
        cs->last_dbid = dbid;
        addReply(c,shared.ok);
    }
    /* When the target replied with an error, the error reply was already
     * sent to the client inside the loop above. */

    sdsfree(cmd.io.buffer.ptr);
    zfree(ov); zfree(kv); zfree(newargv);
    return;

/* On socket errors either retry once from scratch or reply with -IOERR. */
socket_err:
    /* Cleanup common to the retry and the error path. */
    sdsfree(cmd.io.buffer.ptr);

    /* If the command vector was rewritten, argv[1]/argv[2] are no longer
     * the original arguments; in that case socket_error was set and the
     * connection has already been closed above. */
    if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
    zfree(newargv);
    newargv = NULL; /* The retry path will allocate it again. */

    /* Retry only once, and never after a timeout. */
    if (errno != ETIMEDOUT && may_retry) {
        may_retry = 0;
        goto try_again;
    }

    /* Final cleanup and error reply. The reply is a RESP error and must be
     * terminated by CRLF. */
    zfree(ov); zfree(kv);
    addReplySds(c,
        sdscatprintf(sdsempty(),
            "-IOERR error or timeout %s to target instance\r\n",
            write_error ? "writing" : "reading"));
    return;
}
redis cluster 的运行流程
这个启动过程我就不放代码了,和前面的主从、哨兵有很多相似之处。
1、节点启动
2、节点握手,参见主从握手
3、指派槽位
4、建立主从关系
5、节点通信:Gossip算法
6、故障转移,故障转移机制与哨兵基本一致



