栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

Locust压力测试

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Locust压力测试

(目录在右侧→

官网

文档

配置参数

本文的代码文件地址

准备 一节系个人兴趣,可以直接看locust使用一节,测试可以找一个公共接口测试,如https://www.baidu.com

准备 数据库 起个数据库
# 起容器
docker run -itd --name test_db -p 3396:3306 -e MYSQL_ROOT_PASSWORD=123456 mariadb
# 登录
mysql -h192.168.1.105 -P3396 -uroot -p123456
# 创建数据库
MariaDB [(none)]> create database tdb;

造一些数据
import pymysql
import numpy as np

HOST = '192.168.1.105'
PORT = 3396
USER = 'root'
PWD = '123456'
DB = 'tdb'
TABLE = 'employee'

def getEmployee(bit=8):
    chars = [chr(i) for i in range(65, 91)] + [chr(i) for i in range(97, 123)]
    name = ''.join(np.random.choice(chars, bit))
    age = np.random.randint(18, 60)
    sex = np.random.choice(['0', '1'])
    return str((0, name, age, sex))

def dataGen():
    database = pymysql.connect(user=USER, password=PWD, host=HOST, port=PORT, database=DB, charset='utf8')
    cursor = database.cursor()

    # 创建表
    sql_ct = "CREATE TABLE IF NOT EXISTS {} ( " 
             "eid INT AUTO_INCREMENT, " 
             "ename  VARCHAr(20) NOT NULL, " 
             "age INT, " 
             "sex VARCHAr(1), " 
             "PRIMARY KEY(eid))".format(TABLE)
    cursor.execute(sql_ct)

    for i in range(100000):

        employee = getEmployee()
        sql_i = "INSERT INTO {} VALUE {}".format(TABLE, employee)
        cursor.execute(sql_i)
        if (i+1)%100==0:
            print('r[{}/100000]'.format(i+1), end='')
            database.commit()
    database.commit()

    database.close()

if __name__ == '__main__':
    dataGen()

构建出来的数据如下

起个服务 做个http服务,dbaserver.py
import pymysql
from socketserver import ThreadingMixIn
from http.server import HTTPServer
from http.server import SimpleHTTPRequestHandler
from sys import argv
import logging

HOST = '192.168.5.217'
PORT = 3396
USER = 'root'
PWD = '123456'
DB = 'tdb'
TABLE = 'employee'

def dataSelect(eid):
    database = pymysql.connect(user=USER, password=PWD, host=HOST, port=PORT, database=DB, charset='utf8')
    cursor = database.cursor()

    sql_s = "SELECT * FROM {} WHERe eid={}".format(TABLE, eid)
    cursor.execute(sql_s)
    res = cursor.fetchall()  # 获得结果
    database.close()
    if res:
        ee = res[0]
        return {'name': ee[1], 'age': ee[2], 'sex': '女' if ee[3]=='0' else '男'}
    return 'no this employee by eid={}'.format(eid)

class ThreadingServer(ThreadingMixIn, HTTPServer):
    pass

class RequestHandler(SimpleHTTPRequestHandler):
    def do_GET(self):
        self.send_response(200)
        self.send_header('Content-type', 'text/plain;charset=utf-8')
        try:
            eid = int(self.path[1:])
        except:
            eid = -1
        response = dataSelect(eid)
        logging.info('request of {} by eid={}'.format(response, eid))
        self.end_headers()
        self.wfile.write(str(response).encode())

def run(server_class=ThreadingServer, handler_class=RequestHandler, port=8888):
    server_address = ('0.0.0.0', port)
    httpd = server_class(server_address, handler_class)
    logging.info('server start in http://{}:{}'.format(*server_address))
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    httpd.server_close()

def main():
    logging.basicConfig(level=logging.INFO, format='%(levelname)-8s %(asctime)s: %(message)s',
                        datefmt='%m-%d %H:%M')
    if len(argv) == 2:
        run(port=int(argv[1]))
    else:
        run()

if __name__ == '__main__':
    main()

运行这个python脚本,访问这个服务是这样的

locust压力测试 安装
pip install locust
基于python测试脚本
from locust import HttpUser, between, task
import numpy as np

class StressHandler(HttpUser):							# 继承locust的HttpUser类
    wait_time = between(0, 0)   						# 在请求之前发做下随机延时p1-p2秒

    @task   											# 添加该注解的函数才会被执行
    def testApi(self):
	      # 随机一个id
        eid = np.random.randint(1, 100000)	
        path = '/{}'.format(eid)
        # 这里只传path即可,域名将会从web前端传入
        self.client.get(path)
        
    # @task
    def testBaidu(self):
        path = '/'
        self.client.get(path)
运行locust
locust -f stresshandler.py

运行效果如下

然后可以从前端进行访问,0.0.0.0替换成启动locust的机器的ip或者127.0.0.1,如果不自行制定,默认端口是8089。

新建任务,注意host如果不是域名另外需要指定端口。

启动之后

进一步(多进程)

在真实的场景中,发压能力会受到当前机器性能的限制,CPU、带宽是比较常见的掣肘点,带宽可能不那么容易提高。合理的利用CPU的多核也可以进一步提高发压强度,locust也提供了这样的支持,甚至可以通过多台机器实现分布式发压。

可以用top命令观察CPU、内存占用

iftop观察带宽使用

iostat可以观察磁盘状态(这里用不到)

locust -f stresshandler.py --master		# master
# 另起一个控制台
locust -f stresshandler.py --worker		# worker
# 可以再起控制台起多个worker

在master的启动命令中,可以指定--master-bind-host和--master-bind-port,在worker的启动命令中,可以指定--master-host和--master-port,这样既可以在多台机器上起worker来实现更大规模的压力测试。默认host 127.0.0.1,默认port 5557。


这里起了两个worker,用户数和用户增加速度将会被均分到两个worker实现。

再进一步(配置文件)

可以追加的配置项众多,可以通过配置文件来统一配置。

# master.conf
# https://docs.locust.io/en/stable/configuration.html
locustfile = stresshandler.py
master = true
web-port = 6789
print-stats = true
only-summary = true
locust --config master.conf
# worker.conf
# https://docs.locust.io/en/stable/configuration.html
locustfile = stresshandler.py
headless = true
worker = true
# master-host = 127.0.0.1
# master-port = 5557
# 需要新开命令行窗口启动
locust --config salve.conf

master和worker的交互端口为5557,如果端口冲突,可以参见配置文档修改端口。

更进一步(跳过前端起任务)

可以配置不通过web端传入参数,直接写在配置文件里,启动后等结果就成了。

  # master.conf
  locustfile = stresshandler.py
  master = true
  web-port = 6789
  host = https://www.baidu.com
  users = 9
  spawn-rate = 3
  run-time = 20s
  headless = true                 		# 不在前端启动,需要配合上面的配置传入测试参数
  csv = ./data/csv_prefix		        # 生成csv结果文件,_stats.csv, _stats_history.csv and _failures.csv
  print-stats = true			    	# 在控制台打印状态
  html = ./data/web_report.html			# html前端报告
  print-stats = true
  only-summary = true
  # worker.conf
  locustfile = stresshandler.py
  headless = true
  worker = true
  # master-host = 127.0.0.1
  # master-port = 5557
locust --conf worker.conf
再完善下(脚本起多worker)

启动的过程也可以借助python的多线程。

import os
import subprocess

N_WORKER = 3
WORK_DIR = os.path.abspath(os.path.dirname(__file__))

def runMaster():
    print('start了master')
    subprocess.run('locust --config ./data/master.conf', shell=True, cwd=WORK_DIR)

def runWorker():
    print('start了一个worker')
    subprocess.Popen('locust --config ./data/worker.conf', shell=True, cwd=WORK_DIR)

def clear():
    # 清理掉跟5557的任务,是master跟worker默认交互的端口
    subprocess.run("kill -9 $(lsof -i:5557 | awk 'NR>1{print $2}')", shell=True)
    print('清理了任务')

def main():
    for i in range(N_WORKER):
        runWorker()
    runMaster()


if __name__ == '__main__':
    clear()
    main()

一个完整demo

最后,怎么用我这个起一个压测(本文的代码文件地址)

以这个url为例

https://www.baidu.com/s?wd=test		# 功能是百度搜索“test”
Host:	 https://www.baidu.com
path:	 /s
params:	 wd=test

修改测试脚本

  • 在stresshandler.py中添加这样一个task

    	@task
        def testSearch(self):
            path = '/s?wd=test'
            self.client.get(path)
    
  • 修改配置文件(data/master.conf,data/worker.conf),如果不从前端的话,这里的配置文件使用“再进一步”中的配置文件

  • 修改启动文件start.py,主要是修改worker进程数量,默认是3,

    N_WORKER = 3
    
  • 启动start.py,python3 start.py,报告会生成在data目录下

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/589238.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号