1. 只用 FTP 实现上传下载功能
from ftplib import FTP
import os
class MyFTP:
    """Small wrapper around ``ftplib.FTP`` for uploading and downloading files.

    The connection settings are stored on the instance by ``__init__``.
    Call ``connect()`` and then ``login()`` before any transfer method.
    """

    def __init__(self):
        """Store the FTP server host, port and credentials and create the client."""
        self.ftp_host = "***.**.**.***"
        self.ftp_port = 21
        self.ftp_user = '***'
        self.ftp_passwd = '***'
        self.ftp = FTP()

    def get_ftp_host(self):
        """Return the configured FTP host."""
        return self.ftp_host

    def get_ftp_port(self):
        """Return the configured FTP port."""
        return self.ftp_port

    def get_ftp_user(self):
        """Return the configured FTP user name."""
        return self.ftp_user

    def get_ftp_passwd(self):
        """Return the configured FTP password."""
        return self.ftp_passwd

    def connect(self):
        """Open the control connection to the configured host and port."""
        print('is connecting to ftp server %s on %s' % (self.ftp_host, self.ftp_port))
        self.ftp.connect(self.ftp_host, self.ftp_port)

    def login(self):
        """Log in with the configured credentials and print the server welcome."""
        print('ready to login ftp server')
        self.ftp.login(self.ftp_user, self.ftp_passwd)
        print('login ftp server successfully')
        print(self.ftp.getwelcome())

    def quit(self):
        """Close the connection politely; errors are printed, not raised."""
        try:
            self.ftp.quit()
            print('colose ftp connection successfully')
        except Exception as e:
            print('%s' % e)

    def upload_folder(self, local_path, remote_path):
        """Prepare the upload of a local folder below ``remote_path``.

        Args:
            local_path: Local directory to upload.
            remote_path: Existing remote directory that will contain it.

        Returns:
            None. Prints a message and returns early when ``local_path`` is
            not a directory.
        """
        # Normalise first so stray whitespace or trailing separators do not
        # make the directory check fail.
        local_path = local_path.strip().rstrip('/\\')
        if not os.path.isdir(local_path):
            print('出错了,请选择要上传的文件夹')
            return
        # Stripping the separators from a root path leaves '', which ftplib
        # would turn into "cwd('.')"; fall back to '/' in that case.
        remote_path = remote_path.strip().rstrip('/\\') or '/'
        self.ftp.cwd(remote_path)
        last_dir = os.path.basename(local_path)
        print(last_dir)
        remote_path = os.path.join(remote_path, last_dir)
        print(remote_path)
        remote_path = remote_path.replace('\\', '/')  # FTP paths use forward slashes
        print(remote_path)
        # NOTE: the recursive upload below is disabled in the original code,
        # so this method currently only changes into remote_path and prints
        # the target directory it computed.
        # try:
        #     self.ftp.mkd(last_dir)  # create the directory if it is missing
        # except Exception:
        #     pass  # directory already exists
        # sub_items = os.listdir(local_path)[:2]
        # print(sub_items)
        # for sub_item in sub_items:
        #     sub_item_path = os.path.join(local_path, sub_item)
        #     if os.path.isdir(sub_item_path):  # sub-item is a directory
        #         self.upload_folder(sub_item_path, remote_path)
        #     else:
        #         self.upload_file(sub_item_path, remote_path)

    def upload_file(self, src_file_path, remote_path):
        """Upload one local file into the remote directory ``remote_path``.

        Args:
            src_file_path: Path of the local file to send.
            remote_path: Remote directory; the local file name is appended.
        """
        remote_file_name = os.path.split(src_file_path)[1]
        remote_path = remote_path + '/' + remote_file_name
        # NOTE: a "skip if the remote file already exists" check (via
        # FTP.size) is disabled in the original code, so the file is always sent.
        with open(src_file_path, 'rb') as file_handler:
            self.ftp.storbinary('STOR %s' % remote_path, file_handler)
        print('文件:%s 已经上传到ftp' % src_file_path)

    def download_dir(self, local_path, remote_path):
        """Recursively download the remote directory ``remote_path``.

        A directory named after the last component of ``remote_path`` is
        created inside ``local_path`` and filled with the remote contents.

        Args:
            local_path: Local directory that receives the download.
            remote_path: Remote directory to fetch.

        Returns:
            None. Prints a message and returns early when ``local_path`` is
            an existing file.
        """
        from ftplib import error_perm

        local_path = local_path.strip()
        if os.path.isfile(local_path):
            print('出错了,请选择文件保存位置')
            return
        remote_path = remote_path.strip().rstrip('/\\') or '/'
        last_dir = os.path.basename(remote_path)
        # normpath yields backslashes on Windows and leaves POSIX paths alone,
        # unlike an unconditional '/' -> '\\' replacement.
        local_path = os.path.normpath(os.path.join(local_path, last_dir))
        os.makedirs(local_path, exist_ok=True)
        for sub_item in self.ftp.nlst(remote_path):
            # Some servers list '.' and '..'; following them would recurse forever.
            if os.path.basename(sub_item.rstrip('/')) in ('.', '..'):
                continue
            # Only the cwd probe is guarded: a permission error means the entry
            # is not a directory. Errors raised while downloading must propagate
            # instead of being mistaken for "this is a file".
            try:
                self.ftp.cwd(sub_item)
            except error_perm:
                self.download_file(local_path, sub_item)
            else:
                self.download_dir(local_path, sub_item)

    def download_file(self, local_path, remote_file_path):
        """Download one remote file into the local directory ``local_path``.

        Args:
            local_path: Existing local directory to save into.
            remote_file_path: Remote path of the file to fetch.

        Returns:
            None. Prints a message and returns early when ``local_path`` is
            not a directory or the target file already exists.
        """
        # The target must be a directory (the original test was inverted and
        # therefore refused every valid target directory).
        if not os.path.isdir(local_path):
            print('请选择文件保存目录路径')
            return
        last_file_name = os.path.split(remote_file_path)[1]
        local_file_path = os.path.join(local_path, last_file_name)
        if os.path.isfile(local_file_path):
            print('文件:%s 已存在' % local_file_path.replace('\\', '/'))
            return
        try:
            with open(local_file_path, 'wb') as file_handle:
                self.ftp.retrbinary('RETR %s' % remote_file_path, file_handle.write)
        except Exception:
            # Remove the partial file so a later run does not skip it as
            # "already exists".
            if os.path.exists(local_file_path):
                os.remove(local_file_path)
            raise
if __name__ == '__main__':
    # Demo: connect, log in, run upload_folder on a local folder, then disconnect.
    ftp = MyFTP()
    ftp.connect()
    try:
        ftp.login()
        print(os.getcwd())
        # local_path = '/home/work/models'
        # Raw string: in a normal literal the "\U" of "C:\Users" is an invalid
        # unicode escape and a SyntaxError.
        local_path = r'C:\Users\Desktop\models'
        ftp.upload_folder(local_path, '/home/models')
        # ftp.download_dir(local_path, '/home/models')
    finally:
        # Close the connection even when the transfer raises.
        ftp.quit()
2. PyQt5 和 FTP 结合实现上传功能,并修改文件名
update_model.py
# -*- coding: utf-8 -*-
# Form implementation generated from reading ui file 'update_model.ui'
#
# Created by: PyQt5 UI code generator 5.12.3
#
# WARNING! All changes made in this file will be lost!
from PyQt5 import QtCore, QtGui, QtWidgets
class Ui_MainWindow(object):
    """Widget layout of the upload window (pyuic5 output for 'update_model.ui')."""

    def setupUi(self, MainWindow):
        """Create all child widgets on ``MainWindow`` and set their geometry."""
        MainWindow.setObjectName("MainWindow")
        MainWindow.resize(800, 600)
        self.centralwidget = QtWidgets.QWidget(MainWindow)
        self.centralwidget.setObjectName("centralwidget")
        self.pushButton = QtWidgets.QPushButton(self.centralwidget)
        self.pushButton.setGeometry(QtCore.QRect(550, 170, 151, 34))
        self.pushButton.setObjectName("pushButton")
        self.label_3 = QtWidgets.QLabel(self.centralwidget)
        self.label_3.setGeometry(QtCore.QRect(250, 40, 311, 61))
        font = QtGui.QFont()
        font.setPointSize(18)
        self.label_3.setFont(font)
        self.label_3.setObjectName("label_3")
        self.label_4 = QtWidgets.QLabel(self.centralwidget)
        self.label_4.setGeometry(QtCore.QRect(200, 270, 81, 18))
        self.label_4.setObjectName("label_4")
        self.comboBox = QtWidgets.QComboBox(self.centralwidget)
        self.comboBox.setGeometry(QtCore.QRect(320, 270, 99, 24))
        self.comboBox.setEditable(True)
        self.comboBox.setObjectName("comboBox")
        self.comboBox.addItem("")
        self.comboBox.addItem("")
        self.comboBox.addItem("")
        self.comboBox.setItemText(2, "")
        self.label = QtWidgets.QLabel(self.centralwidget)
        self.label.setGeometry(QtCore.QRect(200, 160, 81, 18))
        self.label.setObjectName("label")
        self.label_2 = QtWidgets.QLabel(self.centralwidget)
        self.label_2.setGeometry(QtCore.QRect(200, 210, 81, 18))
        self.label_2.setObjectName("label_2")
        self.lineEdit = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit.setGeometry(QtCore.QRect(320, 160, 113, 25))
        self.lineEdit.setObjectName("lineEdit")
        self.lineEdit_2 = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit_2.setGeometry(QtCore.QRect(320, 210, 113, 25))
        self.lineEdit_2.setObjectName("lineEdit_2")
        MainWindow.setCentralWidget(self.centralwidget)
        self.menubar = QtWidgets.QMenuBar(MainWindow)
        self.menubar.setGeometry(QtCore.QRect(0, 0, 800, 30))
        self.menubar.setObjectName("menubar")
        MainWindow.setMenuBar(self.menubar)
        self.statusbar = QtWidgets.QStatusBar(MainWindow)
        self.statusbar.setObjectName("statusbar")
        MainWindow.setStatusBar(self.statusbar)
        self.retranslateUi(MainWindow)
        # The Qt class is spelled QMetaObject; "QmetaObject" does not exist
        # and raised AttributeError at the end of setupUi.
        QtCore.QMetaObject.connectSlotsByName(MainWindow)

    def retranslateUi(self, MainWindow):
        """Set the user-visible texts of the window and its widgets."""
        _translate = QtCore.QCoreApplication.translate
        MainWindow.setWindowTitle(_translate("MainWindow", "MainWindow"))
        self.pushButton.setText(_translate("MainWindow", "更新"))
        self.label_3.setText(_translate("MainWindow", "自动化上传"))
        self.label_4.setText(_translate("MainWindow", "业务类型"))
        self.comboBox.setCurrentText(_translate("MainWindow", "***"))
        self.comboBox.setItemText(0, _translate("MainWindow", "***"))
        self.comboBox.setItemText(1, _translate("MainWindow", "***"))
        self.label.setText(_translate("MainWindow", "cid"))
        self.label_2.setText(_translate("MainWindow", "did"))
too_update.py
import os
from update_model import Ui_MainWindow
import sys
from PyQt5.QtWidgets import QMainWindow,QFileDialog,QApplication,QMessageBox
from ftplib import FTP
import configparser
import os
class MyFTP:
    """FTP client used by the upload tool; settings come from ``./config.ini``.

    The ``[FTP]`` section may define ``ftp_host``, ``ftp_port``, ``ftp_user``
    and ``ftp_passwd``. Missing values are stored as ``None``.
    """

    def __init__(self):
        """Read the FTP settings from ``./config.ini`` and create the client."""
        config = configparser.ConfigParser()
        # The attributes below were never assigned in the original (the
        # reading code was commented out), so the getters and connect() raised
        # AttributeError. read() ignores a missing file, and the fallbacks keep
        # construction from failing when the file or a key is absent.
        config.read('./config.ini')
        self.ftp_host = config.get('FTP', 'ftp_host', fallback=None)
        self.ftp_port = config.get('FTP', 'ftp_port', fallback=None)
        self.ftp_user = config.get('FTP', 'ftp_user', fallback=None)
        self.ftp_passwd = config.get('FTP', 'ftp_passwd', fallback=None)
        self.ftp = FTP()

    def get_ftp_host(self):
        """Return the configured FTP host, or None when not configured."""
        return self.ftp_host

    def get_ftp_port(self):
        """Return the configured FTP port (a string as read from the config), or None."""
        return self.ftp_port

    def get_ftp_user(self):
        """Return the configured FTP user name, or None when not configured."""
        return self.ftp_user

    def get_ftp_passwd(self):
        """Return the configured FTP password, or None when not configured."""
        return self.ftp_passwd

    def connect(self):
        """Open the control connection to the configured host and port.

        Raises:
            ValueError: If the host or port is not configured.
        """
        if not self.ftp_host or self.ftp_port is None:
            raise ValueError(
                'FTP host/port are not configured; expected an [FTP] section '
                'in ./config.ini'
            )
        print('is connecting to ftp server %s on %s' % (self.ftp_host, self.ftp_port))
        self.ftp.connect(self.ftp_host, int(self.ftp_port))

    def login(self):
        """Log in with the configured credentials and print the server welcome."""
        print('ready to login ftp server')
        self.ftp.login(self.ftp_user, self.ftp_passwd)
        print('login ftp server successfully')
        print(self.ftp.getwelcome())

    def quit(self):
        """Close the connection politely; errors are printed, not raised."""
        try:
            self.ftp.quit()
            print('colose ftp connection successfully')
        except Exception as e:
            print('%s' % e)

    def upload_file(self, src_file_path, cid, model_name, target_path):
        """Upload a local file into ``target_path/cid/`` and rename it to ``model_name``.

        Args:
            src_file_path: Path of the local file to send.
            cid: Name of the sub-directory below ``target_path``; it is
                created when missing.
            model_name: Final file name on the server.
            target_path: Remote base directory.

        Returns:
            None. Nothing is uploaded when ``model_name`` already exists in
            the remote directory.
        """
        from ftplib import error_perm

        remote_file_name = os.path.split(src_file_path)[1]
        print(remote_file_name)
        target_path = target_path + '/' + cid + '/'
        try:
            self.ftp.mkd(target_path)
        except error_perm:
            # Best effort: the server refuses MKD when the directory already exists.
            pass
        rename_path = target_path + model_name
        remote_path = target_path + remote_file_name
        print(remote_path)
        print(rename_path)
        # SIZE is rejected with a permanent error when the file does not exist.
        try:
            already_uploaded = self.ftp.size(rename_path) is not None
        except error_perm:
            already_uploaded = False
        if already_uploaded:
            # Report the path that was actually checked.
            print("文件%s已存在" % rename_path)
            return
        with open(src_file_path, 'rb') as file_handler:
            self.ftp.storbinary('STOR %s' % remote_path, file_handler)
        print('文件:%s 已经上传到ftp' % src_file_path)
        # Rename inside the target directory using relative names.
        print(self.ftp.cwd(target_path))
        self.ftp.rename(remote_file_name, model_name)
class MainWindow(QMainWindow):
    """Main window: builds a model file name from the form and uploads it via FTP."""

    def __init__(self):
        """Build the UI and connect the push button to ``import_data``."""
        super().__init__()
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self)
        self.ui.pushButton.clicked.connect(self.import_data)

    def up_date(self, model_name):
        """Upload ``./model/<type>.pkl`` to the FTP server as ``model_name``.

        Args:
            model_name: Target file name; it is split on '_' and the second
                field is used as cid, the fourth (without extension) as the
                data type, e.g. 'model_12_34_footfall.pkl'.

        Raises:
            IndexError: If ``model_name`` has fewer than four '_'-separated fields.
        """
        # Parse before touching the network so a malformed name fails early.
        date_type = model_name.split('_')[3].split('.')[0]
        print(date_type)
        cid = model_name.split('_')[1]
        local_path = './model/' + date_type + '.pkl'
        print(local_path)
        ftp = MyFTP()
        ftp.connect()
        try:
            ftp.login()
            ftp.upload_file(local_path, cid, model_name, '/home/woqu/v2nfs/appdir/sandbox/pos/models')
            # ftp.upload_file(local_path,cid,model_name, '/data/yu1/dump/pos/models-yu1')
        finally:
            # The original never closed the connection.
            ftp.quit()

    def datetype_revice(self, date_type):
        """Map the combo-box text to the name used in file names.

        Returns 'footfall' for any text that matches neither branch.
        """
        # NOTE(review): both branches compare against the same placeholder
        # literal in this listing, so the elif is unreachable as written;
        # presumably the real values were redacted. Confirm against the
        # original source.
        date_type_english = 'footfall'
        if date_type == '***':
            date_type_english = "***english"
        elif date_type == '***':
            date_type_english = "***english"
        return date_type_english

    def import_data(self):
        """Slot for the push button: read the form, upload, and report the result."""
        cid = self.ui.lineEdit.text()
        did = self.ui.lineEdit_2.text()
        date_type = self.ui.comboBox.currentText()
        date_type_english = self.datetype_revice(date_type)
        print(cid, did, date_type)
        model_name = 'model_' + cid + '_' + did + '_' + date_type_english + '.pkl'
        try:
            self.up_date(model_name)
        except Exception as exc:
            # UI boundary: tell the user instead of letting the exception
            # escape from the Qt slot.
            print('%s' % exc)
            self.meassage('温馨提示', date_type + ' ' + model_name + ' 上传失败: ' + str(exc))
            return
        print(model_name)
        self.meassage('温馨提示', date_type + ' ' + model_name + ' 上传成功')

    def meassage(self, str1, str2):
        """Show a message box with title ``str1`` and text ``str2``."""
        QMessageBox.about(self, str1, str2)
if __name__ == "__main__":
    # Create the Qt application, show the main window, and exit the process
    # with the value returned by the event loop.
    qt_app = QApplication(sys.argv)
    main_window = MainWindow()
    main_window.show()
    exit_code = qt_app.exec_()
    sys.exit(exit_code)



