栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

Python3 HTTP服务器文件上传

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Python3 HTTP服务器文件上传

#!/usr/bin/env python3
import datetime
import email
import html
import http.server
import io
import mimetypes
import os
import posixpath
import re
import shutil
import sys
import urllib.error
import urllib.parse
import urllib.request
from http import HTTPStatus

__version__ = "0.1"
__all__ = ["SimpleHTTPRequestHandler"]


class SimpleHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
    """Serve files from a directory and accept single-file uploads.

    GET and HEAD requests serve files (or a directory listing that also
    contains an upload form) relative to ``directory``.  POST requests are
    expected to carry a ``multipart/form-data`` body with one part named
    ``file``; its content is stored in the directory addressed by the
    request path.
    """

    server_version = "SimpleHTTP/" + __version__
    # Extension -> MIME type overrides consulted before the ``mimetypes``
    # module.  Both names are kept because both were bound originally.
    extensions_map = _encodings_map_default = {
        '.gz': 'application/gzip',
        '.Z': 'application/octet-stream',
        '.bz2': 'application/x-bzip2',
        '.xz': 'application/x-xz',
    }

    # Upload form shown at the top of every directory listing.  The file
    # input must be named "file" because deal_post_data() looks for
    # name="file" in the Content-Disposition header of the posted part.
    # NOTE(review): the submit button label is a reconstruction -- confirm.
    _upload_form = (
        '<h1>文件上传</h1>\n'
        '<form ENCTYPE="multipart/form-data" method="post">\n'
        '<input name="file" type="file"/>\n'
        '<input type="submit" value="upload"/>\n'
        '</form>\n'
    )

    def __init__(self, *args, directory=None, **kwargs):
        """Create the handler.

        ``directory`` is the root that request paths are resolved against;
        it defaults to the current working directory.
        """
        if directory is None:
            directory = os.getcwd()
        self.directory = os.fspath(directory)
        super().__init__(*args, **kwargs)

    def do_GET(self):
        """Serve a GET request: send headers, then the body if there is one."""
        f = self.send_head()
        if f:
            try:
                self.copyfile(f, self.wfile)
            finally:
                f.close()

    def do_HEAD(self):
        """Serve a HEAD request: send headers only."""
        f = self.send_head()
        if f:
            f.close()

    def send_head(self):
        """Send the response status and headers for GET/HEAD.

        Returns an open binary file object positioned at the start of the
        body (the caller must close it), or ``None`` when the response is
        already complete (redirect, 304, or error).
        """
        path = self.translate_path(self.path)
        f = None
        if os.path.isdir(path):
            parts = urllib.parse.urlsplit(self.path)
            if not parts.path.endswith('/'):
                # Redirect the browser to the slash-terminated URL so that
                # relative links in the listing resolve correctly.
                self.send_response(HTTPStatus.MOVED_PERMANENTLY)
                new_parts = (parts[0], parts[1], parts[2] + '/',
                             parts[3], parts[4])
                new_url = urllib.parse.urlunsplit(new_parts)
                self.send_header("Location", new_url)
                self.end_headers()
                return None
            for index in "index.html", "index.htm":
                index = os.path.join(path, index)
                if os.path.exists(index):
                    path = index
                    break
            else:
                return self.list_directory(path)

        ctype = self.guess_type(path)
        # A trailing slash on something that is not a directory is a 404,
        # even on platforms where open() would accept such a name.
        if path.endswith("/"):
            self.send_error(HTTPStatus.NOT_FOUND, "File not found")
            return None
        try:
            f = open(path, 'rb')
        except OSError:
            self.send_error(HTTPStatus.NOT_FOUND, "File not found")
            return None
        try:
            fs = os.fstat(f.fileno())
            # Use browser cache if possible
            if ("If-Modified-Since" in self.headers
                    and "If-None-Match" not in self.headers):
                # compare If-Modified-Since and time of last file modification
                try:
                    ims = email.utils.parsedate_to_datetime(
                        self.headers["If-Modified-Since"])
                except (TypeError, IndexError, OverflowError, ValueError):
                    # ignore ill-formed values
                    pass
                else:
                    if ims.tzinfo is None:
                        # obsolete format with no timezone, cf.
                        # https://tools.ietf.org/html/rfc7231#section-7.1.1.1
                        ims = ims.replace(tzinfo=datetime.timezone.utc)
                    if ims.tzinfo is datetime.timezone.utc:
                        # compare to UTC datetime of last modification
                        last_modif = datetime.datetime.fromtimestamp(
                            fs.st_mtime, datetime.timezone.utc)
                        # remove microseconds, like in If-Modified-Since
                        last_modif = last_modif.replace(microsecond=0)

                        if last_modif <= ims:
                            self.send_response(HTTPStatus.NOT_MODIFIED)
                            self.end_headers()
                            f.close()
                            return None

            self.send_response(HTTPStatus.OK)
            self.send_header("Content-type", ctype)
            self.send_header("Content-Length", str(fs[6]))
            self.send_header("Last-Modified",
                             self.date_time_string(fs.st_mtime))
            self.end_headers()
            return f
        except BaseException:
            # Never leak the file handle, whatever went wrong; re-raise so
            # the server's normal error handling still sees the exception.
            f.close()
            raise

    def list_directory(self, path):
        """Send an HTML directory listing (with the upload form) for *path*.

        Returns a ``BytesIO`` holding the page body after the headers have
        been sent, or ``None`` if the directory could not be read (a 404 is
        sent in that case).
        """
        try:
            entries = os.listdir(path)
        except OSError:
            self.send_error(HTTPStatus.NOT_FOUND,
                            "No permission to list directory")
            return None
        entries.sort(key=lambda a: a.lower())
        try:
            display_path = urllib.parse.unquote(self.path,
                                                errors='surrogatepass')
        except UnicodeDecodeError:
            display_path = urllib.parse.unquote(path)
        display_path = html.escape(display_path, quote=False)
        enc = sys.getfilesystemencoding()
        title = 'Directory listing for %s' % display_path

        r = []
        r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
                 '"http://www.w3.org/TR/html4/strict.dtd">')
        r.append('<html>\n<head>')
        r.append('<meta http-equiv="Content-Type" '
                 'content="text/html; charset=%s">' % enc)
        r.append('<title>%s</title>\n</head>' % title)
        r.append('<body>\n%s\n<h1>%s</h1>' % (self._upload_form, title))
        r.append('<hr>\n<ul>')
        for name in entries:
            fullname = os.path.join(path, name)
            displayname = linkname = name
            # Append / for directories or @ for symbolic links
            if os.path.isdir(fullname):
                displayname = name + "/"
                linkname = name + "/"
            if os.path.islink(fullname):
                displayname = name + "@"
                # Note: a link to a directory displays with @ and links with /
            r.append('<li><a href="%s">%s</a></li>'
                     % (urllib.parse.quote(linkname, errors='surrogatepass'),
                        html.escape(displayname, quote=False)))
        r.append('</ul>\n<hr>\n</body>\n</html>\n')

        # 'surrogateescape' round-trips undecodable file-name bytes.
        encoded = '\n'.join(r).encode(enc, 'surrogateescape')
        f = io.BytesIO()
        f.write(encoded)
        f.seek(0)
        self.send_response(HTTPStatus.OK)
        self.send_header("Content-type", "text/html; charset=%s" % enc)
        self.send_header("Content-Length", str(len(encoded)))
        self.end_headers()
        return f

    def translate_path(self, path):
        """Map a URL path onto a local filesystem path below ``directory``.

        Query string and fragment are dropped, percent-escapes are decoded,
        and components that are not plain names (``.``, ``..``, anything
        containing a directory separator) are ignored.
        """
        # abandon query parameters
        path = path.split('?', 1)[0]
        path = path.split('#', 1)[0]
        # Don't forget explicit trailing slash when normalizing. Issue17324
        trailing_slash = path.rstrip().endswith('/')
        try:
            path = urllib.parse.unquote(path, errors='surrogatepass')
        except UnicodeDecodeError:
            path = urllib.parse.unquote(path)
        path = posixpath.normpath(path)
        words = filter(None, path.split('/'))
        path = self.directory
        for word in words:
            if os.path.dirname(word) or word in (os.curdir, os.pardir):
                # Ignore components that are not a simple file/directory name
                continue
            path = os.path.join(path, word)
        if trailing_slash:
            path += '/'
        return path

    def copyfile(self, source, outputfile):
        """Copy all data from file object *source* to *outputfile*."""
        shutil.copyfileobj(source, outputfile)

    def guess_type(self, path):
        """Return the MIME type for *path*.

        ``extensions_map`` is tried first (exact extension, then
        lower-cased), then ``mimetypes``; the fallback is
        ``application/octet-stream``.
        """
        base, ext = posixpath.splitext(path)
        if ext in self.extensions_map:
            return self.extensions_map[ext]
        ext = ext.lower()
        if ext in self.extensions_map:
            return self.extensions_map[ext]
        guess, _ = mimetypes.guess_type(path)
        if guess:
            return guess
        return 'application/octet-stream'

    def do_POST(self):
        """Handle a file upload and answer with an HTML result page."""
        r, info = self.deal_post_data()
        # Pass the values as arguments: log_message() applies the format
        # itself, so a '%' inside a file name must not be part of it.
        self.log_message('%s, %s => %s', r, info, self.client_address)
        enc = sys.getfilesystemencoding()
        # ``info`` embeds the client-supplied file name and the Referer
        # header is client-controlled too, so both are escaped.
        safe_info = html.escape(info)
        back_url = html.escape(self.headers.get('referer') or self.path)
        res = [
            '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
            '"http://www.w3.org/TR/html4/strict.dtd">',
            '<html>\n<head>',
            '<meta http-equiv="Content-Type" '
            'content="text/html; charset=%s">' % enc,
            '<title>%s</title>\n</head>' % "Upload Result Page",
            '<body>\n<h1>%s</h1>\n' % "Upload Result",
        ]
        if r:
            res.append('<p>SUCCESS: %s</p>\n' % safe_info)
        else:
            res.append('<p>FAILURE: %s</p>' % safe_info)
        res.append('<a href="%s">back</a>' % back_url)
        res.append('</body></html>')
        encoded = '\n'.join(res).encode(enc, 'surrogateescape')
        self.send_response(200)
        self.send_header("Content-type", "text/html; charset=%s" % enc)
        self.send_header("Content-Length", str(len(encoded)))
        self.end_headers()
        self.copyfile(io.BytesIO(encoded), self.wfile)

    def deal_post_data(self):
        """Store the uploaded file from a ``multipart/form-data`` body.

        Only the first part is read, and it must be the one named ``file``.
        The file is written into the directory the request path maps to,
        using only the base name of the client-supplied file name.

        Returns a ``(success, message)`` tuple; no exception is raised for
        malformed requests.
        """
        content_type = self.headers.get('content-type')
        if not content_type:
            return False, "Content-Type header doesn't contain boundary"
        _, found, boundary = content_type.partition('boundary=')
        # The boundary may be quoted and may be followed by other parameters.
        boundary = boundary.split(';', 1)[0].strip().strip('"')
        if not found or not boundary:
            return False, "Content-Type header doesn't contain boundary"
        boundary = boundary.encode()

        try:
            remain_bytes = int(self.headers.get('content-length'))
        except (TypeError, ValueError):
            return False, "Content-Length header is missing or invalid"

        line = self.rfile.readline()
        remain_bytes -= len(line)
        if boundary not in line:
            return False, "Content NOT begin with boundary"

        line = self.rfile.readline()
        remain_bytes -= len(line)
        fn = re.findall(
            r'Content-Disposition.*name="file"; filename="(.*)"',
            line.decode('utf-8', 'replace'))
        if not fn:
            return False, "Can't find out file name..."
        # Keep only the final path component so a crafted file name such as
        # "../../x" (or a Windows-style path) cannot escape the target
        # directory.
        name = os.path.basename(fn[0].replace('\\', '/'))
        if name in ('', os.curdir, os.pardir):
            return False, "Can't find out file name..."
        fn = os.path.join(self.translate_path(self.path), name)

        # Skip the part's Content-Type header and the blank separator line.
        for _ in range(2):
            line = self.rfile.readline()
            remain_bytes -= len(line)

        try:
            out = open(fn, 'wb')
        except OSError:
            return False, ("Can't create file to write, "
                           "do you have permission to write?")

        with out:
            # Stay one line behind the reader: the line break before the
            # closing boundary belongs to the multipart framing, not to the
            # file, so it is stripped from the last line.
            preline = self.rfile.readline()
            remain_bytes -= len(preline)
            while remain_bytes > 0:
                line = self.rfile.readline()
                if not line:
                    # The client went away before sending Content-Length
                    # bytes; stop instead of spinning forever.
                    break
                remain_bytes -= len(line)
                if boundary in line:
                    if preline.endswith(b'\n'):
                        preline = preline[:-1]
                    if preline.endswith(b'\r'):
                        preline = preline[:-1]
                    out.write(preline)
                    return True, "File '%s' upload success!" % fn
                out.write(preline)
                preline = line
        return False, "Unexpect Ends of data."


if __name__ == '__main__':
    http.server.test(
        HandlerClass=SimpleHTTPRequestHandler,
        ServerClass=http.server.HTTPServer
    )
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/675901.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号