该小工具可以 实现两个redis数据库某个库(redis 16个库)之间的复制,支持不同网络的redis之间的复制;
(可用于将开发本地的内网数据复制到正式线上)
ps:若是复制整个redis库,可以直接copy aof文件或是dump文件进行复制
from redis import StrictRedis
import json
import os
def read_connect():
host1 = input('源数据redis连接ip: (默认 127.0.0.1)') # '172.16.11.33'
if host1 == '':
host1 = '127.0.0.1'
password1 = input('源数据redis密码: (默认空)') # 'xtzGpdi123'
if password1 == '':
password1 = ''
database1 = input('源数据redis的数据库: (默认 0)') # 1
if database1 == '':
database1 = 0
redis = StrictRedis(host=host1, port=6379, db=database1, password=password1, decode_responses=True)
return redis
def write_connect():
host1 = input('目标数据redis连接ip: (默认 127.0.0.1)') # '172.16.11.33'
if host1 == '':
host1 = '127.0.0.1'
password1 = input('目标数据redis密码: (默认空)') # 'xtzGpdi123'
if password1 == '':
password1 = ''
database1 = input('目标数据redis的数据库: (默认 15)') # 1
if database1 == '':
database1 = 15
redis = StrictRedis(host=host1, port=6379, db=database1, password=password1, decode_responses=True)
return redis
def copy_data():
redis1 = read_connect()
redis2 = write_connect()
keys = redis1.keys('*')
for key in keys:
redis_type = redis1.type(key)
if 'string' == redis_type:
value = redis1.get(key)
redis2.set(key, value)
elif 'hash' == redis_type:
value = redis1.hgetall(key)
redis2.hmset(key, value)
elif 'list' == redis_type:
value = redis1.lrange(key, 0, -1)
redis2.lpush(key, *value)
elif 'set' == redis_type:
value = redis1.smembers(key)
redis2.sadd(key, *value)
elif 'zset' == redis_type:
values = redis1.zrange(key, 0, -1, False, True)
value = dict(values)
redis2.zadd(key, value)
def write_data():
redis1 = read_connect()
keys = redis1.keys('*')
print(len(keys))
file_name = 'temp.txt'
with open(file_name, 'a', encoding='UTF-8') as w:
for key in keys:
redis_type = redis1.type(key)
if 'string' == redis_type:
value = redis1.get(key)
w.write(key + '*string*' + value + 'n')
elif 'hash' == redis_type:
value = redis1.hgetall(key)
w.write(key + '*hash*' + str(value) + 'n')
elif 'list' == redis_type:
value = redis1.lrange(key, 0, -1)
w.write(key + '*list*' + str(value) + 'n')
elif 'set' == redis_type:
value = redis1.smembers(key)
w.write(key + '*set*' + str(value) + 'n')
elif 'zset' == redis_type:
values = redis1.zrange(key, 0, -1, False, True)
w.write(key + '*zset*' + str(values) + 'n')
def read_data():
redis = write_connect()
with open('./temp.txt') as lines: # 一次性读入txt文件,并把内容放在变量lines中
array = lines.readlines() # 返回的是一个列表,该列表每一个元素是txt文件的每一行
print(len(array))
for item in array: # 遍历array中的每个元素
item = item.strip('n') # 去掉换行符n
value_list = item.split('*')
redis_type = value_list[1]
key = value_list[0]
value = value_list[2]
if 'string' == redis_type:
redis.set(key, value)
elif 'hash' == redis_type:
json_value = json.loads(value.replace(''', '"'))
redis.hmset(key, json_value)
elif 'list' == redis_type:
value = list(eval(value))
redis.lpush(key, *value)
elif 'set' == redis_type:
value = tuple(eval(value))
redis.sadd(key, *value)
elif 'zset' == redis_type:
value = list(eval(value))
value_dict = {}
for temp in value:
value_dict[temp[0]] = temp[1]
redis.zadd(key, value_dict)
if __name__ == '__main__':
print('解释说明:')
print('两个redis数据库网络连通,按照提示执行【一次】程序即可')
print('两个redis数据库网络不通,需要按照提示执行【两次】程序,先在源数据数据库(所在的windows,或是可以连接源数据库的windows)执行一次程序,生成的temp.txt')
print('将temp.txt拷贝到目标数据库(所在的windows,或是可以连接目标数据库的windows),再按照提示执行一次程序(程序跟temp.txt同一个目录下)')
connect = input("两个redis数据库网络是否连通:(Y/N)")
if connect == '' or connect == 'Y' or connect == 'y':
copy_data() # 复制数据
else:
connect = input("是否读取源数据:(Y/N)")
if connect == '' or connect == 'Y' or connect == 'y':
write_data() # 写数据到temp.txt
else:
connect = input("是否写到目标数据库:(Y/N)")
if connect == '' or connect == 'Y' or connect == 'y':
read_data() # 从temp.txt读取数据
path = './temp.txt'
if os.path.exists(path): # 如果文件存在
os.remove(path)
else:
print('文件不存在,复制数据出错!!!')
# 编译exe文件 命令 pyinstaller -F 文件名.py
附上exe执行文件百度网盘
链接: https://pan.baidu.com/s/19fRTLqT0unllqVBdnIRmNQ 提取码: f6ev



