(目录在右侧→
官网
文档
配置参数
本文的代码文件地址
准备 一节系个人兴趣,可以直接看locust使用一节,测试可以找一个公共接口测试,如https://www.baidu.com
准备 数据库 起个数据库# 起容器 docker run -itd --name test_db -p 3396:3306 -e MYSQL_ROOT_PASSWORD=123456 mariadb # 登录 mysql -h192.168.1.105 -P3396 -uroot -p123456 # 创建数据库 MariaDB [(none)]> create database tdb;造一些数据
import pymysql
import numpy as np
HOST = '192.168.1.105'
PORT = 3396
USER = 'root'
PWD = '123456'
DB = 'tdb'
TABLE = 'employee'
def getEmployee(bit=8):
chars = [chr(i) for i in range(65, 91)] + [chr(i) for i in range(97, 123)]
name = ''.join(np.random.choice(chars, bit))
age = np.random.randint(18, 60)
sex = np.random.choice(['0', '1'])
return str((0, name, age, sex))
def dataGen():
database = pymysql.connect(user=USER, password=PWD, host=HOST, port=PORT, database=DB, charset='utf8')
cursor = database.cursor()
# 创建表
sql_ct = "CREATE TABLE IF NOT EXISTS {} ( "
"eid INT AUTO_INCREMENT, "
"ename VARCHAr(20) NOT NULL, "
"age INT, "
"sex VARCHAr(1), "
"PRIMARY KEY(eid))".format(TABLE)
cursor.execute(sql_ct)
for i in range(100000):
employee = getEmployee()
sql_i = "INSERT INTO {} VALUE {}".format(TABLE, employee)
cursor.execute(sql_i)
if (i+1)%100==0:
print('r[{}/100000]'.format(i+1), end='')
database.commit()
database.commit()
database.close()
if __name__ == '__main__':
dataGen()
构建出来的数据如下
起个服务 做个http服务,dbaserver.pyimport pymysql
from socketserver import ThreadingMixIn
from http.server import HTTPServer
from http.server import SimpleHTTPRequestHandler
from sys import argv
import logging
HOST = '192.168.5.217'
PORT = 3396
USER = 'root'
PWD = '123456'
DB = 'tdb'
TABLE = 'employee'
def dataSelect(eid):
database = pymysql.connect(user=USER, password=PWD, host=HOST, port=PORT, database=DB, charset='utf8')
cursor = database.cursor()
sql_s = "SELECT * FROM {} WHERe eid={}".format(TABLE, eid)
cursor.execute(sql_s)
res = cursor.fetchall() # 获得结果
database.close()
if res:
ee = res[0]
return {'name': ee[1], 'age': ee[2], 'sex': '女' if ee[3]=='0' else '男'}
return 'no this employee by eid={}'.format(eid)
class ThreadingServer(ThreadingMixIn, HTTPServer):
pass
class RequestHandler(SimpleHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header('Content-type', 'text/plain;charset=utf-8')
try:
eid = int(self.path[1:])
except:
eid = -1
response = dataSelect(eid)
logging.info('request of {} by eid={}'.format(response, eid))
self.end_headers()
self.wfile.write(str(response).encode())
def run(server_class=ThreadingServer, handler_class=RequestHandler, port=8888):
server_address = ('0.0.0.0', port)
httpd = server_class(server_address, handler_class)
logging.info('server start in http://{}:{}'.format(*server_address))
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
httpd.server_close()
def main():
logging.basicConfig(level=logging.INFO, format='%(levelname)-8s %(asctime)s: %(message)s',
datefmt='%m-%d %H:%M')
if len(argv) == 2:
run(port=int(argv[1]))
else:
run()
if __name__ == '__main__':
main()
运行这个python脚本,访问这个服务是这样的
locust压力测试 安装pip install locust基于python测试脚本
from locust import HttpUser, between, task
import numpy as np
class StressHandler(HttpUser): # 继承locust的HttpUser类
wait_time = between(0, 0) # 在请求之前发做下随机延时p1-p2秒
@task # 添加该注解的函数才会被执行
def testApi(self):
# 随机一个id
eid = np.random.randint(1, 100000)
path = '/{}'.format(eid)
# 这里只传path即可,域名将会从web前端传入
self.client.get(path)
# @task
def testBaidu(self):
path = '/'
self.client.get(path)
运行locust
locust -f stresshandler.py
运行效果如下
然后可以从前端进行访问,0.0.0.0替换成启动locust的机器的ip或者127.0.0.1,如果不自行制定,默认端口是8089。
新建任务,注意host如果不是域名另外需要指定端口。
启动之后
进一步(多进程)在真实的场景中,发压能力会受到当前机器性能的限制,CPU、带宽是比较常见的掣肘点,带宽可能不那么容易提高。合理的利用CPU的多核也可以进一步提高发压强度,locust也提供了这样的支持,甚至可以通过多台机器实现分布式发压。
可以用top命令观察CPU、内存占用
iftop观察带宽使用
iostat可以观察磁盘状态(这里用不到)
locust -f stresshandler.py --master # master # 另起一个控制台 locust -f stresshandler.py --worker # worker # 可以再起控制台起多个worker
在master的启动命令中,可以指定--master-bind-host和--master-bind-port,在worker的启动命令中,可以指定--master-host和--master-port,这样既可以在多台机器上起worker来实现更大规模的压力测试。默认host 127.0.0.1,默认port 5557。
这里起了两个worker,用户数和用户增加速度将会被均分到两个worker实现。
可以追加的配置项众多,可以通过配置文件来统一配置。
# master.conf # https://docs.locust.io/en/stable/configuration.html locustfile = stresshandler.py master = true web-port = 6789 print-stats = true only-summary = true
locust --config master.conf
# worker.conf # https://docs.locust.io/en/stable/configuration.html locustfile = stresshandler.py headless = true worker = true # master-host = 127.0.0.1 # master-port = 5557
# 需要新开命令行窗口启动 locust --config salve.conf
master和worker的交互端口为5557,如果端口冲突,可以参见配置文档修改端口。
更进一步(跳过前端起任务)可以配置不通过web端传入参数,直接写在配置文件里,启动后等结果就成了。
# master.conf locustfile = stresshandler.py master = true web-port = 6789 host = https://www.baidu.com users = 9 spawn-rate = 3 run-time = 20s headless = true # 不在前端启动,需要配合上面的配置传入测试参数 csv = ./data/csv_prefix # 生成csv结果文件,_stats.csv, _stats_history.csv and _failures.csv print-stats = true # 在控制台打印状态 html = ./data/web_report.html # html前端报告 print-stats = true only-summary = true
# worker.conf locustfile = stresshandler.py headless = true worker = true # master-host = 127.0.0.1 # master-port = 5557
locust --conf worker.conf再完善下(脚本起多worker)
启动的过程也可以借助python的多线程。
import os
import subprocess
N_WORKER = 3
WORK_DIR = os.path.abspath(os.path.dirname(__file__))
def runMaster():
print('start了master')
subprocess.run('locust --config ./data/master.conf', shell=True, cwd=WORK_DIR)
def runWorker():
print('start了一个worker')
subprocess.Popen('locust --config ./data/worker.conf', shell=True, cwd=WORK_DIR)
def clear():
# 清理掉跟5557的任务,是master跟worker默认交互的端口
subprocess.run("kill -9 $(lsof -i:5557 | awk 'NR>1{print $2}')", shell=True)
print('清理了任务')
def main():
for i in range(N_WORKER):
runWorker()
runMaster()
if __name__ == '__main__':
clear()
main()
一个完整demo
最后,怎么用我这个起一个压测(本文的代码文件地址)
以这个url为例
https://www.baidu.com/s?wd=test # 功能是百度搜索“test” Host: https://www.baidu.com path: /s params: wd=test
修改测试脚本
-
在stresshandler.py中添加这样一个task
@task def testSearch(self): path = '/s?wd=test' self.client.get(path) -
修改配置文件(data/master.conf,data/worker.conf),如果不从前端的话,这里的配置文件使用“再进一步”中的配置文件
-
修改启动文件start.py,主要是修改worker进程数量,默认是3,
N_WORKER = 3
-
启动start.py,python3 start.py,报告会生成在data目录下



