栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

【小沐学python】实现socket网络通信(二)

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【小沐学python】实现socket网络通信(二)

【小沐学python】开发环境(一)
【小沐学python】实现socket网络通信(二)

socket网络通信
  • 1、简介
  • 2、TCP方式
    • 2.1 服务端
    • 2.2 客户端
  • 3、UDP方式
    • 3.1 发送端
    • 3.2 接收端
  • 4、HTTP方式
  • 5、WebSocket
    • 5.1 安装websockets库
    • 5.2 服务端
    • 5.3 客户端
  • 后记

1、简介

Python 提供了两个级别访问的网络服务:

  • 低级别的网络服务支持基本的 Socket,它提供了标准的 BSD Sockets API,可以访问底层操作系统 Socket 接口的全部方法。
  • 高级别的网络服务模块 SocketServer, 它提供了服务器中心类,可以简化网络服务器的开发。
2、TCP方式 2.1 服务端
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# filename:test_tcp_server.py
# creator: tomcat
# date: 2021-11-16

from socket import *
from time import ctime
from datetime import datetime

HOST = ''
PORT = 27015
BUFSIZ = 1024
ADDR = (HOST, PORT)

s = socket(AF_INET, SOCK_STREAM)
s.bind(ADDR)
s.listen(5)

while True:
    print('waiting for clients ...')
    c, addr = s.accept()
    t = datetime.now ().strftime ('%H:%M:%S')
    print('   connnecting from: [', t, "]", addr)

    while True:
        data = c.recv(BUFSIZ)
        if not data:
            break
        print("recv: ", data )
        c.send(('[%s] %s' % (ctime(), "A message from python server.")).encode())
    c.close()
    s.close()

运行结果如下:

测试对应的客户端(C++),运行结果如下:

2.2 客户端
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# filename:test_tcp_client.py
# creator: tomcat
# date: 2021-11-16

from socket import *
from time import ctime
from datetime import datetime

BUFSIZ = 1024
ADDR = ('127.0.0.1', 27015) # or 'localhost'

c = socket(AF_INET, SOCK_STREAM)
c.connect(ADDR)
while True:
    data = input('Input: ')
    if not data:
        break
    c.send(data.encode())
    data = c.recv(BUFSIZ)
    if not data:
        break

    t = datetime.now().strftime('%H:%M:%S')
    print("[", t,"]",data.decode('utf-8'))
c.close()

运行结果如下:

测试对应的服务端(C++),运行结果如下:

3、UDP方式 3.1 发送端
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# filename:test_udp_client.py
# creator: tomcat
# date: 2021-11-16

from socket import *
from datetime import *

BUFSIZ = 1024
ADDR = ('127.0.0.1', 27015) # or 'localhost'

c = socket(AF_INET, SOCK_DGRAM)

while True:
    data = input('Input: ')
    if not data:
        break
    c.sendto(data.encode(), ADDR)
    data = c.recvfrom(BUFSIZ)
    if not data:
        break

    t = datetime.now().strftime('%H:%M:%S')
    print("[", t,"]", data)
c.close()

运行结果如下:

测试对应的接收端(C++),运行结果如下:

3.2 接收端
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# filename:test_udp_server.py
# creator: tomcat
# date: 2021-11-16

from socket import *
from time import ctime
from datetime import datetime

BUFSIZ = 1024
ADDR = ('127.0.0.1', 27015) # or 'localhost'

s = socket(AF_INET, SOCK_DGRAM)
s.bind(ADDR)

while True:
    print('waiting for clients ...')
    data, addr = s.recvfrom(1024)
    t = datetime.now ().strftime ('%H:%M:%S')
    print('   connnecting from: [', t, "]", addr)

    data = data.decode()
    if not data:
        break
    print('[Received]', data)
    send = input('Input: ')
    s.sendto(send.encode(), addr) 
s.close()

运行结果如下:

测试对应的发送端(C++),运行结果如下:

4、HTTP方式
import requests
import base64

def acces_api_with_cookie(url_login,USERNAME,PASSWORD,url_access):

    # Start a session so we can have persistant cookies
    session = requests.session()

    # This is the form data that the page sends when logging in
    login_data = {
        'username': USERNAME,
        'password': PASSWORD,
        'submit': 'login',
    }

    # Authenticate
    r = session.post(url_login, data=login_data)

    # Try accessing a page that requires you to be logged in
    r = session.get(url_access)
    #print r.content
    #print r.text
    print(r.status_code)
    return r.content

    #print r.content

id = 1
title = base64.b64encode("wuhan")
studytime = "a"
operat="b"
warning="c"
report="d"
table="portal_post"
url=base64.b64encode('{"action":"portal/Article/index","param":{"id":2}}')

url_full = "http://192.168.88.116/user/favorite/add?id="+str(id)
           +"&title="+title
           +"&studytime="+studytime
           +"&operat="+operat
           +"&warning="+warning
           +"&report="+report
           +"&table="+table
           +"&url="+url
print(url_full)

acces_api_with_cookie("http://192.168.88.116/user/login/dologin.html","admin","admin888", url_full)

5、WebSocket
  • The WebSocket Protocol

The WebSocket Protocol enables two-way communication between a client running untrusted code in a controlled environment to a remote host that has opted-in to communications from that code. The security model used for this is the origin-based security model commonly used by web browsers. The protocol consists of an opening handshake followed by basic message framing, layered over TCP. The goal of this technology is to provide a mechanism for browser-based applications that need two-way communication with servers that does not rely on opening multiple HTTP connections (e.g., using XMLHttpRequest or s and long polling).

5.1 安装websockets库
$ pip install websockets

$ pip list

5.2 服务端
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# filename:test_websocket_server.py
# description: WS server example
# creator: tomcat
# date: 2021-11-16

import asyncio
import websockets

async def main_loop(ws, path):
    name = await ws.recv()
    print(f"< {name}")

    msg = f"Python server: {name}!"
    print(f"> {msg}")
    await ws.send(msg)

start_server = websockets.serve(main_loop, '127.0.0.1', 12345)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

运行结果如下:

测试对应的客户端(js)如下:




    
    
    
    
    


    


浏览器chrome的运行结果如下:

Wireshark 抓取webscoket如下:

5.3 客户端
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# filename:test_websocket_server.py
# description: WS client example
# creator: tomcat
# date: 2021-11-16

import asyncio
import websockets


async def main_loop():
    async with websockets.connect('ws://localhost:12345') as ws:
        name = input(">")

        await ws.send(name)
        print(f"> {name}")

        msg = await ws.recv()
        print(f"< {msg}")


asyncio.get_event_loop().run_until_complete(main_loop())

后记

如果你觉得该方法或代码有一点点用处,可以给作者点个赞;╮( ̄▽ ̄)╭
如果你感觉方法或代码不咋地//(ㄒoㄒ)//,就在评论处留言,作者继续改进。o_O???
谢谢各位童鞋们啦( ´ ▽´ )ノ ( ´ ▽´)っ!!!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/529423.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号