现象
项目使用springbatch框架,多个分片使用jsch获取sftp连接去读文件,报错
其实就是进行多线程用jsch获取session连接,报错
com.jcraft.jsch.JSchException: Session.connect: java.net.SocketException: Connection reset
jsch版本
version=0.1.54
groupId=com.jcraft
artifactId=jsch
网上找了各种原因,有说ssh终端连接个数限制,有说tcp的连接问题,暂未找到最终原因,欢迎评论区告知
复现
public static Session getSshSession(String sftpHost, int sftpPort, String userName, String password) {
// 实例化
JSch jsch = new JSch();
// 获取sshSession
Session sshSession = null;
try {
sshSession = jsch.getSession(userName, sftpHost, sftpPort);
} catch (JSchException e) {
e.printStackTrace();
}
// 传入密码,添加密码
if (StringUtils.isNotBlank(password)) {
sshSession.setPassword(password);
}
// 实例化配置实体
Properties sshConfig = new Properties();
// 严格主机密钥检查
sshConfig.put("StrictHostKeyChecking", "no");
// 设置配置
sshSession.setConfig(sshConfig);
// 返回会话
return sshSession;
}
static void test() {
for (int i = 1; i < 50; i++) {
new Thread(() -> {
Session sshSession = getSshSession("*.*.*.*", 22, "root", "***");
try {
Thread.sleep(100);
sshSession.connect();
} catch (Exception e) {
e.printStackTrace();
} finally {
sshSession.disconnect();
}
}).start();
}
}
解决方案
使用apache.commons.pool2创建channel池
由于项目的sftp配置是动态变化,不是固定的,因此以下代码并未封装为springboot管理的bean
连接池配置:
public class ConnPoolConfig extends GenericObjectPoolConfig {
public ConnPoolConfig() {
// https://blog.csdn.net/weixin_42340670/article/details/108431381
// 对象池中最少需要有几个空闲对象
setMinIdle(4);
// 对象池的最大容量。池里最多存放多少个对象
setMaxTotal(10);
// 当从对象池里借走一个对象的时候,是否校验该对象的有效性
setTestonBorrow(true);
// 回收器线程多久执行一次空闲对象回收(轮询间隔时间,单位毫秒)
setTimeBetweenEvictionRunsMillis(60 * 60000);
// 当回收器在扫描空闲对象时,是否校验对象的有效性。
// 如果某个对象空闲时间还没达到规定的阈值,如果testWhileIdle配置为true,
// 那么就会检查该对象是否还有效,如果该对象的资源已经失效(例如:连接断开),那么他就可以被回收。
setTestWhileIdle(true);
}
}
连接池工厂:
public class ConnPoolFactory extends basePooledObjectFactory{ private String host; private Integer port; private String userName; private String password; private final String strictHostKeyChecking = "no"; public ConnPoolFactory(String host, Integer port, String userName, String password) { this.host = host; this.port = port; this.userName = userName; this.password = password; } @Override public ChannelSftp create() throws Exception { JSch jsch = new JSch(); Session session = jsch.getSession(userName, host, port); session.setPassword(password); Properties config = new Properties(); config.put("StrictHostKeyChecking", strictHostKeyChecking); session.setConfig(config); session.connect(); ChannelSftp channel = (ChannelSftp) session.openChannel("sftp"); channel.connect(); return channel; } @Override public PooledObject wrap(ChannelSftp obj) { return new DefaultPooledObject<>(obj); } // https://segmentfault.com/a/1190000003920723 // 销毁对象,如果对象池中检测到某个"对象"idle的时间超时, // 或者操作者向对象池"归还对象"时检测到"对象"已经无效,那么此时将会导致"对象销毁"; // "销毁对象"的操作设计相差甚远,但是必须明确: // 当调用此方法时,"对象"的生命周期必须结束.如果object是线程,那么此时线程必须退出; // 如果object是socket操作,那么此时socket必须关闭; // 如果object是文件流操作,那么此时"数据flush"且正常关闭. @Override public void destroyObject(PooledObject pooledObject) throws Exception { Channel channel = pooledObject.getObject(); Session session = channel.getSession(); channel.disconnect(); session.disconnect(); } // 检测对象是否"有效"; // Pool中不能保存无效的"对象",因此"后台检测线程"会周期性的检测Pool中"对象"的有效性, // 如果对象无效则会导致此对象从Pool中移除,并destroy; // 此外在调用者从Pool获取一个"对象"时,也会检测"对象"的有效性,确保不能讲"无效"的对象输出给调用者; // 当调用者使用完毕将"对象归还"到Pool时,仍然会检测对象的有效性.所谓有效性, // 就是此"对象"的状态是否符合预期,是否可以对调用者直接使用; // 如果对象是Socket,那么它的有效性就是socket的通道是否畅通/阻塞是否超时等. @Override public boolean validateObject(PooledObject pooledObject) { return pooledObject.getObject().isConnected(); } // "激活"对象,当Pool中决定移除一个对象交付给调用者时额外的"激活"操作, // 比如可以在activateObject方法中"重置"参数列表让调用者使用时感觉像一个"新创建"的对象一样; // 如果object是一个线程,可以在"激活"操作中重置"线程中断标记",或者让线程从阻塞中唤醒等; // 如果object是一个socket,那么可以在"激活操作"中刷新通道, // 或者对socket进行链接重建(假如socket意外关闭)等. @Override public void activateObject(PooledObject pooledObject) throws Exception { ChannelSftp channelSftp = pooledObject.getObject(); Session session = channelSftp.getSession(); if (!session.isConnected()) { session.connect(); channelSftp.connect(); } } // "钝化"对象,当调用者"归还对象"时,Pool将会"钝化对象"; // 钝化的言外之意,就是此"对象"暂且需要"休息"一下. // 如果object是一个socket,那么可以passivateObject中清除buffer,将socket阻塞; // 如果object是一个线程,可以在"钝化"操作中将线程sleep或者将线程中的某个对象wait. // 需要注意的时,activateObject和passivateObject两个方法需要对应,避免死锁或者"对象"状态的混乱. @Override public void passivateObject(PooledObject pooledObject) throws Exception { } }
连接池:
public class ConnPool extends GenericObjectPool{ private static final Map MAP = new ConcurrentHashMap<>(); private ConnPool(String host, Integer port, String userName, String password) { super(new ConnPoolFactory(host, port, userName, password), new ConnPoolConfig()); } public static ConnPool getConnPool(String host, Integer port, String userName, String password) { String key = host + ":" + port; ConnPool connPool = MAP.get(key); if (connPool == null) { synchronized (ConnPool.class) { connPool = MAP.get(key); if (connPool == null) { connPool = new ConnPool(host, port, userName, password); MAP.put(key, connPool); } } } return connPool; } }
该连接池支持针对不同的远程ip建立不同的池子
工具类封装:
public static ChannelSftp borrowChannel(ConnectionConfig connCfg) {
ConnPool connPool = ConnPool.getConnPool(connCfg.getHost(), connCfg.getPort(), connCfg.getUserName(),
connCfg.getPassword());
try {
return connPool.borrowObject();
} catch (Exception e) {
logger.error("Get channelSftp from pool fail", e);
}
}
public static void returnChannel(ConnectionConfig connCfg, ChannelSftp channel) {
ConnPool connPool = ConnPool.getConnPool(connCfg.getHost(), connCfg.getPort(), connCfg.getUserName(),
connCfg.getPassword());
try {
connPool.returnObject(channel);
} catch (Exception e) {
logger.error("Return channelSftp to pool fail", e);
}
}
测试无问题:
static void test2() {
AtomicInteger j = new AtomicInteger(0);
for (int i = 0; i < 50; i++) {
new Thread(() -> {
ConnPool connPool = ConnPool.getConnPool("*", 22, "root", "*");
System.out.println(connPool + "--" + j.getAndIncrement());
ChannelSftp channelSftp = null;
try {
channelSftp = connPool.borrowObject();
} catch (Exception e) {
e.printStackTrace();
} finally {
connPool.returnObject(channelSftp);
}
}).start();
}
}



