栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

python telnet 交互_python的telnet?

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

python telnet 交互_python的telnet?

文章目录

一、说明二、代码三、用法总结

一、说明

通过telnetlib库,telnet到设备上并做一些测试。包括重启设备、等待重启完成、其它测试操作等。

二、代码
#! /usr/bin/python3.6
import logging
import telnetlib
import time


host_addr = '10.229.105.14'
port_num = '10017'


# 环境初始化
def init_env():
    global host_addr, port_num
    host_addr = input('请输入串口服务器地址:')
    if len(host_addr) == 0:
        host_addr = '10.229.105.14'
    print('串口服务器:', host_addr)

    port_num = input('请输入端口号:')
    if len(port_num) == 0:
        port_num = '10017'
    print('端口号: ', port_num)


class TelnetClient:
    def __init__(self,):
        self.session = telnetlib.Telnet()

    # 此函数实现telnet登录主机
    def login_host(self, host_addr, port_num):
        try:
            # self.session = telnetlib.Telnet(host_addr, port=port_num)
            self.session.open(host_addr, port=port_num)
        except:
            logging.warning('%s网络连接失败'%host_addr)
            return False
        # 延时两秒再收取返回结果,给服务端足够响应时间
        time.sleep(2)
        # 获取登录结果
        # read_very_eager()获取到的是的是上次获取之后本次获取之前的所有输出
        command_result = self.session.read_very_eager().decode('ascii')
        print('telnet result:', command_result)
        # if 'Login incorrect' not in command_result:
        #     logging.warning('%s登录成功'%host_addr)
        #     return True
        # else:
        #     logging.warning('%s登录失败,用户名或密码错误'%host_addr)
        #     return False
        return True

    # 退出telnet
    def logout_host(self):
        self.session.close()

	# 执行命令,不关心返回值
    def execute_command_without_res(self, command):
        # 执行命令
        self.session.write(command.encode('ascii')+b'n')

    # 此函数实现执行传过来的命令,并输出其执行结果
    def execute_some_command(self, command, time_wait):
        # 执行命令
        self.execute_command_without_res(command)
        time.sleep(time_wait)
        # 获取命令结果
        command_result = self.session.read_very_eager().decode()
        # logging.warning(command + '执行结果:n%s' % command_result)
        return command_result

	# 在timeout时间内,等待一些返回值(某些命令执行时间很长用得到)
    def wait_some_res(self, res, timeout):
        while timeout > 0:
            load_res = self.execute_some_command('r', 1)
            # enter_res = self.session.read_very_eager().decode('ascii')
            if res in load_res:
                return True
            time.sleep(2)
            timeout -= 3
        return False

	# 执行一些命令,并在timeout时间内等待返回值
    def execute_command_wait_some_res(self, command, res, timeout):
        self.session.write(command.encode('ascii')+b'n')
        while timeout>0:
            load_res = self.session.read_very_eager().decode('ascii')
            if res in load_res:
                return True
            time.sleep(2)
            timeout -= 2
        return False

	# 测试命令,升级boot
    def upgrade_boot(self, times):
        msg = 'echo upgrade boot the %d times' % (times)
        self.execute_command_without_res('r')
        self.execute_command_without_res(msg)
        now_ver_cmd = 'dmidecode -t 0 |grep "BIOS Revision"'
        now_ver_res = self.execute_some_command(now_ver_cmd, 5)
        if '2.3' in now_ver_res:
            print('now version is 2.3')
            up_boot_cmd = 'echo 1 /home/admin/ZBOOT-TXCPC-X86-REL_4_4.bin > /sys/bsp/bootflash/std_program'
            # self.execute_some_command(up_boot_cmd, 2)
            if self.execute_command_wait_some_res(up_boot_cmd, 'Bootflash program success', 10*60):
                switch_cmd = 'echo 1 > /sys/bsp/bootflash/std_switch'
                self.execute_command_wait_some_res(switch_cmd, 'Bootflash switch success', 5)

        elif '4.4' in now_ver_res:
            print('now version is 4.4')
            up_boot_cmd = 'echo 1 /home/admin/ZBOOT-TXCPC-X86-REL_3_8.bin > /sys/bsp/bootflash/std_program'
            # self.execute_some_command(up_boot_cmd, 2)
            if self.execute_command_wait_some_res(up_boot_cmd, 'Bootflash program success', 10*60):
                switch_cmd = 'echo 1 > /sys/bsp/bootflash/std_switch'
                self.execute_command_wait_some_res(switch_cmd, 'Bootflash switch success', 5)
        else:
            print('get boot version error')

	# 测试命令,升级dbg版本boot
    def upgrade_dbg_boot(self, times):
        msg = 'echo upgrade boot the %d times' % (times)
        self.execute_command_without_res('r')
        self.execute_command_without_res(msg)
        now_ver_cmd = 'dmidecode -t 0 |grep "BIOS Revision"'
        now_ver_res = self.execute_some_command(now_ver_cmd, 5)

        up_boot_cmd = 'echo 1 /home/admin/ZBOOT-TXCPC-X86-DBG_4_9.bin > /sys/bsp/bootflash/std_program'
        # self.execute_some_command(up_boot_cmd, 2)
        if self.execute_command_wait_some_res(up_boot_cmd, 'Bootflash program success', 10*60):
            switch_cmd = 'echo 1 > /sys/bsp/bootflash/std_switch'
            self.execute_command_wait_some_res(switch_cmd, 'Bootflash switch success', 5)

	# 测试命令,升级bootepld
    def upgrade_bootepld(self, times):
        msg = 'echo upgrade bootepld the %d times' % (times)
        self.execute_command_without_res('r')
        self.execute_command_without_res(msg)
        bootepld_ver_cmd = 'cat /sys/bsp/epldflash/bootepld0/info |grep epldver'
        bootepld_ver_res = self.execute_some_command(bootepld_ver_cmd, 3)
        if '1.03' in bootepld_ver_res:
            print('now version is 1.03')
            up_epld_cmd = 'echo 2 0 /home/admin/TXCPC_00_210900_EPLD_D3_CFG2_106.vpd >/sys/bsp/epldflash/std_program'
            self.execute_command_without_res('r')
            self.execute_command_without_res(up_epld_cmd)
        elif '1.06' in bootepld_ver_res:
            print('now version is 1.03')
            up_epld_cmd = 'echo 2 0 /home/admin/TXCPC_00_210900_EPLD_D3_CFG2_103.vpd >/sys/bsp/epldflash/std_program'
            self.execute_command_without_res('r')
            self.execute_command_without_res(up_epld_cmd)
        else:
            print('get bootepld version error')
            return

        self.wait_some_res('root@kis:', 5)
        if self.wait_some_res('root@kis:', 2*60):
            success_msg = 'upgrade bootepld the %d times success!!' % (times)
            print(success_msg)

	# 测试命令,升级workepld
    def upgrade_workepld(self, times):
        msg = 'echo upgrade workepld the %d times' % (times)
        self.execute_command_without_res('r')
        self.execute_command_without_res(msg)
        workepld_ver_cmd = 'cat /sys/bsp/epldflash/workepld1/info |grep epldver'
        workepld_ver_res = self.execute_some_command(workepld_ver_cmd, 3)
        if '1.08' in workepld_ver_res:
            print('now version is 1.08')
            up_epld_cmd = 'echo 1 0 /home/admin/ZXR10_5960M-8M-HX-TVMCCB-210900-D1A15-107.vpd > /sys/bsp/epldflash/std_program'
            self.execute_command_without_res('r')
            self.execute_command_without_res(up_epld_cmd)
        elif '1.07' in workepld_ver_res:
            print('now version is 1.07')
            up_epld_cmd = 'echo 1 0 /home/admin/ZXR10_5960M-8M-HX-TVMCCB-210900-D1A15-108.vpd > /sys/bsp/epldflash/std_program'
            self.execute_command_without_res('r')
            self.execute_command_without_res(up_epld_cmd)
        else:
            print('get bootepld version error')
            return

        self.wait_some_res('root@kis:', 5)
        if self.wait_some_res('root@kis:', 2*60):
            success_msg = 'upgrade bootepld the %d times success!!' % (times)
            print(success_msg)

	# 测试命令,重启环境
    def reboot_test(self):
        reboot_cmd = '/usr/sbin/reboot'
        if self.execute_command_wait_some_res(reboot_cmd, 'login:', 20*60):
            self.execute_command_wait_some_res('admin', 'Password:', 5)
            self.execute_command_without_res('kis1234')
            if self.wait_some_res('kis#', 10):
                self.execute_command_without_res('exit')
                self.execute_command_without_res('sudo su')
            elif self.wait_some_res('admin@kis:', 10):
                self.execute_command_without_res('sudo su')
        if self.wait_some_res('root@kis:', 10):
            self.execute_command_without_res('echo reboot success!!!!!!!')
        else:
            self.logout_host()


if __name__ == '__main__':
    # init_env()
    telnet_client = TelnetClient()
    # 如果登录结果返加True,则执行命令,然后退出
    if telnet_client.login_host(host_addr, port_num):
        for i in range(0, 500):
            # telnet_client.upgrade_dbg_boot(i)
            telnet_client.upgrade_bootepld(i)
            # telnet_client.upgrade_workepld(i)
            telnet_client.reboot_test()
    telnet_client.logout_host()


三、用法总结

具体用法看手册吧

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/786611.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号