栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

python网络作业:使用python的socket库实现ICMP协议的ping

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

python网络作业:使用python的socket库实现ICMP协议的ping

为什么要在python中实现?

  • 很多名牌大学喜欢考试用python的socket库实现ICMP协议的ping

  • 个别环境没有ping

直接上代码:

#!/usr/bin/python3# -*- coding: utf-8 -*-# 技术支持:https://www.jianshu.com/u/69f40328d4f0 # 技术支持 https://china-testing.github.io/# https://github.com/china-testing/python-api-tesing/blob/master/practices/ping.py#qq群144081101 567351477# CreateDate: 2018-11-22import os 
import argparse 
import socketimport structimport selectimport time


ICMP_ECHO_REQUEST = 8 # Platform specificDEFAULT_TIMEOUT = 2DEFAULT_COUNT = 4 class Pinger(object):
    """ Pings to a host -- the Pythonic way"""

    def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT):
        self.target_host = target_host
        self.count = count
        self.timeout = timeout    def do_checksum(self, source_string):
        """  Verify the packet integritity """
        sum = 0
        max_count = (len(source_string)/2)*2
        count = 0
        while count < max_count:

            val = source_string[count + 1]*256 + source_string[count]                   
            sum = sum + val
            sum = sum & 0xffffffff 
            count = count + 2

        if max_count> 16)  +  (sum & 0xffff)
        sum = sum + (sum >> 16)
        answer = ~sum
        answer = answer & 0xffff
        answer = answer >> 8 | (answer << 8 & 0xff00)        return answer    def receive_pong(self, sock, ID, timeout):
        """
        Receive ping from the socket.
        """
        time_remaining = timeout        while True:
            start_time = time.time()
            readable = select.select([sock], [], [], time_remaining)
            time_spent = (time.time() - start_time)            if readable[0] == []: # Timeout
                return

            time_received = time.time()
            recv_packet, addr = sock.recvfrom(1024)
            icmp_header = recv_packet[20:28]
            type, code, checksum, packet_ID, sequence = struct.unpack(       "bbHHh", icmp_header
   )            if packet_ID == ID:
                bytes_In_double = struct.calcsize("d")
                time_sent = struct.unpack("d", recv_packet[28:28 + bytes_In_double])[0]                return time_received - time_sent

            time_remaining = time_remaining - time_spent            if time_remaining <= 0:                return


    def send_ping(self, sock,  ID):
        """
        Send ping to the target host
        """
        target_addr  =  socket.gethostbyname(self.target_host)

        my_checksum = 0

        # Create a dummy heder with a 0 checksum.
        header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)
        bytes_In_double = struct.calcsize("d")
        data = (192 - bytes_In_double) * "Q"
        data = struct.pack("d", time.time()) + bytes(data.encode('utf-8'))        # Get the checksum on the data and the dummy header.
        my_checksum = self.do_checksum(header + data)
        header = struct.pack(      "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
  )
        packet = header + data
        sock.sendto(packet, (target_addr, 1))    def ping_once(self):
        """
        Returns the delay (in seconds) or none on timeout.
        """
        icmp = socket.getprotobyname("icmp")        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)        except socket.error as e:            if e.errno == 1:                # Not superuser, so operation not permitted
                e.msg +=  "ICMP messages can only be sent from root user processes"
                raise socket.error(e.msg)        except Exception as e:            print ("Exception: %s" %(e))

        my_ID = os.getpid() & 0xFFFF

        self.send_ping(sock, my_ID)
        delay = self.receive_pong(sock, my_ID, self.timeout)
        sock.close()        return delay    def ping(self):
        """
        Run the ping process
        """
        for i in range(self.count):            print ("Ping to %s..." % self.target_host,)            try:
                delay  =  self.ping_once()            except socket.gaierror as e:                print ("Ping failed. (socket error: '%s')" % e[1])                break

            if delay  ==  None:                print ("Ping failed. (timeout within %ssec.)" % self.timeout)            else:
                delay  =  delay * 1000
                print ("Get pong in %0.4fms" % delay)if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Python ping')
    parser.add_argument('host', action="store", help=u'主机名')
    given_args = parser.parse_args()  
    target_host = given_args.host
    pinger = Pinger(target_host=target_host)
    pinger.ping()

执行

注意要有root或管理员权限:

# python3 ping.py china-testing.github.ioPing to china-testing.github.io...
Get pong in 160.7175ms
Ping to china-testing.github.io...
Get pong in 160.8465ms
Ping to china-testing.github.io...
Get pong in 12.0983ms
Ping to china-testing.github.io...
Get pong in 161.3324ms# python3 ping.py www.so.comPing to www.so.com...
Get pong in 29.0303ms
Ping to www.so.com...
Get pong in 28.8599ms
Ping to www.so.com...
Get pong in 28.9860ms
Ping to www.so.com...
Get pong in 29.0167ms



作者:python作业AI毕业设计
链接:https://www.jianshu.com/p/435a0f2755e9


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/221199.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号