栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

python练手

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

python练手

实现服务端基本功能,用户访问与文件上传简单版。

# 语音转文字 服务端
import socketserver
import json
import os

class MyServer(socketserver.BaseRequestHandler):

    def handle(self):
        # print('conn is:',self.request)
        # print('addr is:', self.client_address) # ('127.0.0.1', 30987)
        self.add_ip = self.client_address[0]
        self.add_post = str(self.client_address[1])

        while True:
            try:
                data = self.request.recv(102400)
                #如果获取为空就退出
                if not data:break
                #否则解码处理数据
                self.data = json.loads(data.decode('utf-8'))
                # data 是获取字典内容,self.client_address 是 ip地址与 端口
                print('客户端的消息:',self.data,self.client_address)
                #如果 leixing 是 文件传递 ,触发接收函数
                self.visit_response()
                #将结果反馈给客户端
                self.request.sendall(self.fankui.encode('utf-8'))

            except :
                print('连接异常')
                break

    #定义处理规则
    def visit_response(self):
        if self.data['leixing'] == '文件传递':
            try:
                file_download_statr = self.file_download()
                self.fankui = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%s","ziduan3":"%s" }' % (
                self.data['leixing'], self.add_ip, self.add_post, file_download_statr)
            except:
                self.fankui = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%s","ziduan3":"%s" }' % (
                self.data['leixing'], self.add_ip, self.add_post, '传递失败')

        elif self.data['leixing'] == '发起访问':
            try:
                self.fankui = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%s","ziduan3":"%s" }' % (
                self.data['leixing'], self.add_ip, self.add_post, '访问成功')
            except:
                self.fankui = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%s","ziduan3":"%s" }' % (
                self.data['leixing'], self.add_ip, self.add_post, '访问失败')

        else:
            try:
                self.fankui = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%s","ziduan3":"%s" }' % (
                self.data['leixing'], self.add_ip, self.add_post, '访问成功')
            except:
                self.fankui = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%s","ziduan3":"%s" }' % (
                self.data['leixing'], self.add_ip, self.add_post, '访问失败')

    #文件下载函数
    def file_download(self):

        file_path = self.data['ziduan1']
        file_size = self.data['ziduan2']
        file_hz = file_path.split('/')[-1]
        print('接收文件名:',file_hz,' 接收文件大小:',file_size,' 字节')
        # 文件传输的缓冲区
        BUFFER_SIZE = 4096
        # 接受客户端信息
        filename, file_size, new_filename = self.data['ziduan1'], self.data['ziduan2'], self.data['ziduan3']

        #判断月文件夹是否存在,不存在创建一个
        file_path_state1 = os.path.exists('./file_server/file_work_order/'+new_filename)
        if file_path_state1 == False:
            os.mkdir('./file_server/file_work_order/'+new_filename)
        # 获取文件的名字
        filename = os.path.basename(filename)
        file_size = int(file_size)

        if os.path.isfile(filename):
            f = open('./file_server/file_work_order/'+new_filename +'/' + file_hz, "wb")
        else:
            f = open('./file_server/file_work_order/'+new_filename +'/' + file_hz, "wb")
        rece_size = 0
        while rece_size < file_size:
            data = self.request.recv(BUFFER_SIZE)
            f.write(data)
            rece_size += len(data)
        else:
            return '传递成功'

#这里用作程序预备,目前建立必要的文件夹
def server_init():
    # 程序执行前先确认 ./file_server/file_work_order/ 是否存在,不存在新建
    file_path_state1 = os.path.exists('./file_server')
    if file_path_state1 == False:os.mkdir('./file_server')

    file_path_state2 = os.path.exists('./file_server/file_work_order/')
    if file_path_state2 == False:os.mkdir('./file_server/file_work_order/')

if __name__ == '__main__':

    #服务器文件夹准备
    server_init()
    #服务器开始
    s = socketserver.ThreadingTCPServer(('127.0.0.1', 8564), MyServer)
    #类似实现连接循环
    s.serve_forever()
# 语音转文字 客户端
import socket
import json
import time
import os
import tqdm

class My_Main():
    def __init__(self,ip_num,port_num):
        self.ip_num=ip_num
        self.port_num=port_num

    def socket_dlgc(self,leixing, name_text, pass_text, in_time):
        tcp_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        tcp_client.connect((self.ip_num, self.port_num))
        if None == name_text:print('与服务器断开连接')
        #发送
        msg = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%s","ziduan3":"%s" }' % (str(leixing), str(name_text), str(pass_text), str(in_time))
        tcp_client.send(msg.encode("utf-8"))  # 说话    #
        data = tcp_client.recv(102400)  # 听话
        js_data = json.loads(data.decode('utf-8'))
        tcp_client.close()
        return js_data

    def file_up(self,fujian_label,in_time):
        # 文件传输的缓冲区
        BUFFER_SIZE = 4096
        # 创建连接
        s = socket.socket()
        s.connect((self.ip_num, self.port_num))
        # 传递文件到指定目录下
        filename = fujian_label.replace('/', '//')
        # 文件大小
        file_size = os.path.getsize(filename)
        print('发送文件名:',filename,' 发送文件大小:', file_size,' 字节')
        chuandi_tup = '{"leixing":"%s","ziduan1":"%s","ziduan2":"%d","ziduan3":"%s" }' % ('文件传递', filename, file_size, in_time)
        s.send(chuandi_tup.encode())
        # 文件传输
        progress = tqdm.tqdm(range(file_size), f"发送{filename}", unit="B", unit_divisor=BUFFER_SIZE)

        with open(filename, "rb") as f:
            for _ in progress:
                # 读取文件
                bytes_read = f.read(BUFFER_SIZE)
                if not bytes_read:
                    break
                # sendall确保及时网络忙碌的时候,数据仍然可以传输
                s.sendall(bytes_read)
                progress.update(len(bytes_read))
            data = s.recv(102400)  # 听话
            js_data = json.loads(data.decode('utf-8'))
        # 关闭资源
        s.close()
        return js_data

# 时间计算
def get_current_time(input_date='0'):
    # 如果时间传入为空
    if input_date == '0':
        ct = time.time() # - 24 * 60 * 60  #如果是取昨天日期是减数值
        local_time = time.localtime(ct)
        data_head = time.strftime("%Y%m%d%H%M%S", local_time)
        data_secs = abs(ct - round(ct)) * 1000
        time_stamp = "%s%03d" % (data_head, data_secs)
    else:
        time_stamp = input_date + '120000001'
    return time_stamp

if __name__ == '__main__':
    ip_num = '127.0.0.1'
    port_num = 8564
    #加载类,传入目标ip与端口
    user = My_Main(ip_num,port_num)

    #访问时间年月日时分秒14位
    in_time =get_current_time()[0:14]
    in_month = get_current_time()[0:6]

    #发起访问
    dlgc_fk = user.socket_dlgc('发起访问','shazhicheng', '沙',in_time)
    print('服务器反馈', dlgc_fk)

    #文件传递给服务器
    file_path ='./wav/50465d2c6902_20220415103960自.mp3'
    file_statr = user.file_up(file_path,in_month)
    #如果执行结果不成功,再次执行一次,保底
    if file_statr['ziduan3'] != '传递成功':
        file_statr = user.file_up(file_path,in_month)
    print('服务器反馈', file_statr)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/834591.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号