class RedisManager(object):
    """Build a Redis client, optionally reaching the server(s) through an SSH jump host.

    After construction ``self.client`` holds either a ``redis.Redis`` instance
    (single node), a ``StrictRedisCluster`` instance (cluster), or ``None`` when
    the connection attempt failed (the error is logged, not raised).
    """

    # Every SSH tunnel is bound on the local loopback interface.
    _LOCALHOST = "127.0.0.1"

    def __init__(self, host=None, port=None, cluster_nodes: list = None, password=None, jump=None):
        """Connect to Redis.

        Args:
            host: Redis host for a single-node connection.
            port: Redis port for a single-node connection.
            cluster_nodes: Startup nodes as dicts with ``"host"`` and ``"port"``
                keys. When given, a cluster client is created and ``host`` /
                ``port`` are ignored.
            password: Password handed to the Redis client (single node and cluster).
            jump: SSH jump host passed to ``SSHTunnelForwarder``. When truthy,
                all Redis traffic goes through SSH tunnels opened via this host.

        Any exception raised while connecting is logged at debug level and
        swallowed; ``self.client`` is then whatever was assigned before the
        failure (``None`` if nothing was).
        """
        # Assigned up front so a failed connection leaves a well-defined state
        # instead of a missing attribute.
        self.client = None
        # Started tunnels are tracked so that close() can stop them later.
        self._tunnels = []
        try:
            if jump:
                ssh_config_file = jump_machine_config_file()
                if cluster_nodes:
                    self._connect_cluster_via_jump(jump, ssh_config_file, cluster_nodes, password)
                else:
                    tunnel = self._open_tunnel(jump, ssh_config_file, remote_bind_address=(host, port))
                    pool = redis.ConnectionPool(host=self._LOCALHOST, port=tunnel.local_bind_port,
                                                password=password)
                    self.client = redis.Redis(connection_pool=pool)
            elif cluster_nodes:
                self.client = StrictRedisCluster(startup_nodes=cluster_nodes, skip_full_coverage_check=True,
                                                 nodemanager_follow_cluster=True, password=password)
            else:
                pool = redis.ConnectionPool(host=host, port=port, password=password)
                self.client = redis.Redis(connection_pool=pool)
        except Exception as e:
            # Deliberately best-effort: callers get client=None rather than an exception.
            logger.debug("Redis Error: %s", e)

    def close(self):
        """Stop every SSH tunnel that was opened by this manager."""
        while self._tunnels:
            self._tunnels.pop().stop()

    def _open_tunnel(self, jump, ssh_config_file, **bind_kwargs):
        """Start an SSH tunnel through ``jump`` and register it for ``close()``."""
        tunnel = SSHTunnelForwarder(jump, ssh_config_file=ssh_config_file, **bind_kwargs)
        tunnel.start()
        self._tunnels.append(tunnel)
        return tunnel

    def _connect_cluster_via_jump(self, jump, ssh_config_file, cluster_nodes, password):
        """Create the cluster client behind ``jump`` and point its node table at local tunnels."""
        # First tunnel: reach the startup nodes so the client can discover
        # the addresses of every node in the cluster.
        seed_addresses = [(node["host"], node["port"]) for node in cluster_nodes]
        seed_tunnel = self._open_tunnel(jump, ssh_config_file, remote_bind_addresses=seed_addresses)
        startup_nodes = [{"host": self._LOCALHOST, "port": local_port}
                         for local_port in seed_tunnel.local_bind_ports]
        self.client = StrictRedisCluster(startup_nodes=startup_nodes, skip_full_coverage_check=True,
                                         nodemanager_follow_cluster=True, password=password)

        # The discovered topology: ``nodes`` holds each node's host/port,
        # ``slots`` maps each slot to the nodes that serve it.
        node_manager = self.client.connection_pool.nodes
        real_nodes = node_manager.nodes
        real_slots = node_manager.slots

        # Second tunnel: one forward per discovered node, used for the actual key access.
        node_addresses = [(node["host"], node["port"]) for node in real_nodes.values()]
        node_tunnel = self._open_tunnel(jump, ssh_config_file, remote_bind_addresses=node_addresses)

        # Remote (host, port) -> local port. Keyed by the full address so that
        # several nodes running on the same host keep distinct local ports.
        local_port_by_remote = {
            (remote[0], remote[1]): local[1]
            for remote, local in node_tunnel.tunnel_bindings.items()
        }

        # Replace the node table with the local tunnel endpoints.
        # NOTE(review): every entry is registered as "master", as in the
        # original code — confirm this is intended for replica nodes.
        tunneled_nodes = {}
        for local_port in node_tunnel.local_bind_ports:
            name = f"{self._LOCALHOST}:{local_port}"
            tunneled_nodes[name] = {"host": self._LOCALHOST, "port": local_port,
                                    "name": name, "server_type": "master"}
        logger.debug("Redis cluster nodes via tunnel: %s", tunneled_nodes)
        node_manager.nodes = tunneled_nodes

        # Rewrite the slot table in place so each slot points at its tunnel endpoint.
        for slot_nodes in real_slots.values():
            for node in slot_nodes:
                local_port = local_port_by_remote.get((node["host"], node["port"]))
                if local_port is None:
                    # Not a tunneled address, e.g. an entry that was already rewritten.
                    continue
                node["host"] = self._LOCALHOST
                node["port"] = local_port
                node["name"] = f"{self._LOCALHOST}:{local_port}"
# Connect to Cassandra over SSH.
import json
import traceback
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from sshtunnel import SSHTunnelForwarder
from config.settings import jump_machine_config_file