参考:
https://docs.djangoproject.com/zh-hans/3.2/topics/db/optimization
https://www.jianshu.com/p/aded5b2029f6
https://angysmark.medium.com
https://www.kawabangga.com/posts/4484
Django 的 ORM 属于 Python 众多 ORM 选项中综合性能较高的 ORM 之一,同时作为 Django 高耦合的核心模块,我们在此必须投入十足的学习成本
ORM 帮助我们节省了很多 CURD 操作,数据之间的关联属性无论是外键,一对多还是多对多都能够很好的处理,极大的提高了开发效率,但是同样的,ORM 本身如果使用不当将造成严重的性能问题,比如对外键的渲染,很可能造成 N+1 查询问题或其他性能问题,尤其是使用 django-rest-framework 过程中,Serializer 中 Queryset 使用 .all() 的方式去获取等
1. 基本实践 1.1 常见分析方法常见用于 ORM 性能分析的,可以使用 QuerySet.explain() 获取 ORM 翻译的 SQL 的执行计划,与原始 SQL 的 explan 类似,当然也可以使用 py-spy、Django-silk、Django-debug-toolbar 查看语句或 SQL 执行耗时情况,也可以自定义中间件或者 logger 打印 SQL 及耗时
1.2 在开发前必须要做的事情在 model 设计阶段,要提前规划好可能被用做搜索条件的字段,简历好对应索引,编写 ORM 语句时要尽量依靠索引。
同时,对字段大小,长度及类型合理分析,选择合适的类型
1.3 理解 QuerySet QuerySet 是惰性的创建 QuerySet 并不会引发任何数据库活动。你可以将一整天的过滤器都堆积在一起,Django 只会在 QuerySet 被计算时执行查询操作,如下,只有在第四步 QuerySet 被使用时才执行 SQL
>>> q = Entry.objects.filter(headline__startswith="What") >>> q = q.filter(pub_date__lte=datetime.date.today()) >>> q = q.exclude(body_text__icontains="food") >>> print(q)QuerySet 只在被计算时执行
>QuerySet 本身可以被构造,过滤,切片,或者复制赋值等,是无需访问数据库的。只有在你需要从数据库取出数据或者,向数据库存入数据时才需要访问数据库。
你可以用以下方式执行一个 QuerySet:
-
迭代:一个 QuerySet 是可迭代的,当你第一次迭代它时,它就会执行其数据库查询。例如,这将打印数据库中所有条目的标题:
for e in Entry.objects.all(): print(e.headline)注意:如果你想做的只是确定至少一个结果是否存在,不要使用这个。使用 exists() 会更有效。
-
切片
正如在 限制 QuerySet 条目数 中所解释的那样,QuerySet 可以使用 Python 的数组切片语法进行切片。切片一个未执行的 QuerySet 通常会返回另一个未执行的 QuerySet,但如果使用切片语法的 step 参数,Django 会执行数据库查询,并返回一个列表。切片一个已经执行过的 QuerySet 也会返回一个列表。
还要注意的是,即使对一个未执行的 QuerySet 进行切片,返回另一个未执行的 QuerySet,也不允许进一步修改它(例如,添加更多的过滤器,或修改排序),因为这不能很好地翻译成 SQL,也没有明确的含义。 -
Pickle 序列化/缓存
关于 pickling QuerySets 时涉及的细节,请参见下一节。就本节而言,重要的是,结果是从数据库中读取的。
-
repr( )
当你调用 repr() 时,所在 QuerySet 会被执行。这是为了方便 Python 交互式解释器,所以当你交互式使用 API 时,可以立即看到你的结果。
-
len( )
当你调用 len() 时,会执行 QuerySet。正如你所期望的,这将返回结果列表的长度。
注意:如果你只需要确定集合中的记录数(而不需要实际的对象),那么使用 SQL 的 SELECt COUNT(*) 在数据库层面上处理计数会更有效率。Django 提供了一个 count()方法正是为了这个原因。 -
list( )
通过调用 list() 强制执行 QuerySet,例如:
entry_list = list(Entry.objects.all())
-
bool( )
在布尔语境中测试 QuerySet,如使用 bool()、or、and 或 if 语句,将导致查询被执行。如果至少有一个结果,则 QuerySet 为 True,否则为 False,例如:
if Entry.objects.filter(headline="Test"): print("There is at least one Entry with the headline Test")注意:如果你只想确定至少一个结果是否存在(而不需要实际的对象),使用 exences() 更高效。
如果你将一个 ORM 语句赋值给一个变量,比如 a1=Book.objects.first() 和 a2=Book.objects.first() ,a1 和 a2 在内存中是两个完全不同的对象,a1 的变化无法影响到 a2
再或者 a3=Book.objects.all(),这里已经被执行,我们此时进行切片等操作如 a3[1:3] 等,ORM 将从内存中检索结果
2. 使用iterator()执行 QuerySet (通过执行查询),并在结果上返回一个迭代器(见 PEP 234)。QuerySet 通常会在内部缓存其结果,因此重复执行不会导致额外的查询。相反,iterator() 将直接读取结果,而不在 QuerySet 级别做任何缓存(在内部,默认的迭代器调用 iterator() 并缓存返回值)。对于一个只需要访问一次就能返回大量对象的 QuerySet 来说,这可以带来更好的性能,并显著减少内存。
简单来说,当查询结果有很多对象时,QuerySet 的缓存行为会导致使用大量内存。如果你需要对查询结果进行好几次循环,这种缓存是有意义的,但是对于 queryset 只循环一次的情况,缓存就没什么意义了。在这种情况下,iterator() 可能会有所帮助。
for book in Books.objects.all():
do_xxx(book)
上面的查询,Django 会把 Books 的所有数据载入内存,然后进行一次循环,其实我们更想要保持这个数据库 connection, 每次循环的取出一条 book 数据,然后调用 do_xxx 操作。iterator 就是我们的救星。
for book in Books.objects.all().iterator():
do_xxx(book)
有了 iterator,你就可以编写线性数据表或者 CSV 流了。就能增量写入文件或者发送给用户。
特别是跟 values,values_list 结合在一起的时候,能尽可能少的使用内存。在需要对表中的每一行进行修改的迁移期间,使用 iterator 也非常方便。 不能因为迁移不是面向客户的就可以降低对效率的要求。 长时间运行的迁移可能意味着事务锁定或停机。
请注意,在已经被执行的 QuerySet 上使用 iterator() 会迫使它再次执行,重复查询。
另外,使用 iterator() 会导致之前的 refetch_related() 调用被忽略,因为这两种优化方式放在一起没有意义。
根据数据库后端,查询结果将被一次性加载或使用服务器端的游标从数据库中流转。具体查看
3. 在数据库中执行数据库操作,而不是在 Python 代码中例如:
- 在最基本的层面上,使用 filter 和 exclude 在数据库中进行过滤。
- 使用 F 表达式根据同一模型中的其他字段进行过滤。
- 利用 annotate 等在数据库中执行聚合查询
若其不足以生成你需要的 SQL:
3.1 使用 RawSQLPS:使用原始 SQL 查询时,需要额外注意 SQL 注入问题
最简单直接的方法是 RawSQL表达式,它允许一些 SQL 显式的添加到查询中。例如
>>> from django.db.models.expressions import RawSQL
>>> queryset.annotate(val=RawSQL("select col from sometable where othercol = %s", (param,)))
这些额外的查找可能无法移植到不同的数据库引擎中(因为你是显式地编写 SQL 代码),并且违反了 DRY 原则,所以你应该尽可能地避免它们。
# RawSQL expressions can also be used as the target of __in filters
>>> queryset.filter(id__in=RawSQL("select id from sometable where col = %s", (param,)))
3.2 使用原生 SQL
编写你自己的 自定义 SQL 来检索数据或填充模型。使用 django.db.connection.query 找出 Django 为你写的东西,例如
from django.db import connection
def my_custom_sql(self):
with connection.cursor() as cursor:
cursor.execute("UPDATe bar SET foo = 1 WHERe baz = %s", [self.baz])
cursor.execute("SELECT foo FROM bar WHERe baz = %s", [self.baz])
row = cursor.fetchone()
return row
4. 使用唯一索引列来检索单个对象
当使用 unique() 或 db_index 的列来检索单个对象时,有两个原因。首先,由于底层数据库索引的存在,查询的速度会更快。另外,如果多个对象与查找对象相匹配,查询的运行速度可能会慢很多;在列上有一个唯一约束保证这种情况永远不会发生。
因此使用 示例 Weblog 模型 :
>>> entry = Entry.objects.get(id=10)
会比以下更快:
>>> entry = Entry.objects.get(headline="News Item Title")
因为 id 通过数据库索引,并且保证是唯一的。
执行以下操作可能非常慢:
>>> entry = Entry.objects.get(headline__startswith="News")
首先,headline 没有被索引,这将使得底层数据库获取变慢。
其次,查找不保证只返回一个对象。如果查询匹配多于一个对象,它将从数据库中检索并传递所有对象。如果数据库位于单独的服务器上,那这个损失将更复杂,网络开销和延迟也是一个因素。
5. 提前对数据进行预取使用 select_related() 和 prefetch_related() 对 model 关联的外键或者多对多,一对多关系进行预取,将多个操作合并为一个 SQL,可以参考本站 《Django ORM中的N+1问题》 一文
需要注意的是
- 在 管理器和默认管理器 中使用。请注意管理器何时被使用;有时这很棘手,所以不要做出假设。
- 在视图代码层或其他层中,可能在需要时使用 prefetch_related_objects() 。
当您只想要一个 dict 或值 list,而不需要 ORM 模型对象时,请适当地使用 values()。
这些可以用于替换模板代码中的模型对象——只要您提供的 dicts 与模板中使用的 dicts 具有相同的属性,就没有问题。values() 结果为列表中的字典的查询集, values_list() 结果为列表中的元组的查询集,如果想要获取传统 python 的列表元组格式,可以在 values_list 中加入 flat=True 参数
6.2 使用 QuerySet.defer() 和 only()当您知道不需要(或在大多数情况下不需要)某些数据库列来避免加载它们时,才使用 defer()和 only()。请注意,如果确实使用它们,ORM 将不得不在一个单独的查询中获取它们,如果您不恰当地使用它们,那么这将是一个悲观化。
不要在没有分析的情况下过分使用延迟字段,因为数据库必须从磁盘中读取结果中单行的大部分非文本、非 VARCHAR 数据,即使它最终只使用的几列。当你不想加载许多文本数据或需要大量处理来转换回 Python 的字段, defer() 和 only() 方法最有用。总之,先分析,再优化。
6.3 使用 QuerySet.count()如果您只想要计数使用 queryset.count(),而不是 len(queryset)。
6.4 使用 QuerySet.exists()如果只是想要确认一个 ORM 是否能获取到数据,可以使用 queryset.exists()
6.5 不要过度使用 count() 和 exists()如果您需要 QuerySet 中的其他数据,只需计算它。
例如,假设电子邮件模型具有主题属性和与用户的多对多关系,则以下代码是最佳的:
if display_emails:
emails = user.emails.all()
if emails:
print('You have', len(emails), 'emails:')
for email in emails:
print(email.subject)
else:
print('You do not have any emails.')
这是最佳的,因为:
- 由于查询集是懒惰的,因此如果 display_email 为 False,则不会进行数据库查询。
- 将 user.emails.all() 存储在 emails 变量中允许重新使用其结果缓存。
- 如果电子邮件导致 QuerySet.__bool__() 被调用,这将导致在数据库上运行 user.Email.all() 查询。如果没有任何结果,它将返回 False,否则返回 True。
- 使用 len 调用 QuerySet.__len__(),重用结果缓存。
- for 循环迭代已经填充的缓存。
与其检索一堆对象、设置一些值并单独保存它们,不如通过 QuerySet.update() 使用批量 SQL UPDATE 语句。类似地,尽可能进行批量删除。
注意,尽管这些批量更新方法不会调用单独实例的 save() 或 delete() 方法,这意味着你为这些方法添加的任何自定义行为都不会执行,包括来自正常数据库对象信号( signals )的任何内容。
6.7 直接使用外键值如果只需要外键值,那么使用已有对象上的外键值,而不是获取所有相关对象并获取它的主键。比如:
entry.blog_id
替换为
entry.blog.id6.8 如无需要,不要排序结果
排序是耗时的;对每个字段的排序是数据库必须执行的操作。如果模型有一个默认排序( meta.ordering )并且不需要它,那么可以通过调用没有参数的 order_by() 在查询集上删除它。
添加索引到你的数据库上可以帮助改进排序性能。
7. 使用批量方法 7.1 批量插入当创建对象时,尽可能使用 bulk_create() 方法来减少 SQL 查询数量。比如:
Entry.objects.bulk_create([
Entry(headline='This is a test'),
Entry(headline='This is only a test'),
])
要优于:
Entry.objects.create(headline='This is a test') Entry.objects.create(headline='This is only a test')
注意这个方法有一些注意事项( caveats to this method ),因此要确保它适用于你的情况。
7.2 批量更新当更新对象时,尽可能使用 bulk_update() 方法来减少 SQL 查询数。给定对象的列表或查询集:
entries = Entry.objects.bulk_create([
Entry(headline='This is a test'),
Entry(headline='This is only a test'),
])
下面示例:
entries[0].headline = 'This is not a test' entries[1].headline = 'This is no longer a test' Entry.objects.bulk_update(entries, ['headline'])
要优于
entries[0].headline = 'This is not a test' entries[0].save() entries[1].headline = 'This is no longer a test' entries[1].save()
注意此方法有一些 注意事项 ,因此确保它适合你的案例。
7.3 批量插入当插入对象到 ManyToManyFields 时,使用带有多个对象的 add() 来减少 SQL 查询的数量。举例:
my_band.members.add(me, my_friend)
要优于:
my_band.members.add(me) my_band.members.add(my_friend)
其中 Bands 和 Artists 有多对多关系。
当不同的对象对插入到 ManyToManyField 或者自定义的 through表被定义时,可以使用 bulk_create()方法来减少 SQL 查询的数量。比如:
PizzaToppingRelationship = Pizza.toppings.through
PizzaToppingRelationship.objects.bulk_create([
PizzaToppingRelationship(pizza=my_pizza, topping=pepperoni),
PizzaToppingRelationship(pizza=your_pizza, topping=pepperoni),
PizzaToppingRelationship(pizza=your_pizza, topping=mushroom),
], ignore_conflicts=True)
要优于:
my_pizza.toppings.add(pepperoni) your_pizza.toppings.add(pepperoni, mushroom)
其中 Pizza 和 Topping 是多对多关系。注意这里有一些注意事项( caveats to this method ),因此要确保它适用于你的案例。
7.4 批量删除当从 ManyToManyFields 删除对象时,可以使用带有多个对象的 remove() 来减少 SQL 查询的数量。比如:
my_band.members.remove(me, my_friend)
要优于:
my_band.members.remove(me) my_band.members.remove(my_friend)
其中 Bands 和 Artists 有多对多关系。
当从 ManyToManyFields 里删除不同的对象对时,可以在带有多种 through 模型实例的 Q 表达式上使用 delete() 来减少 SQL 查询的数量。比如:
from django.db.models import Q
PizzaToppingRelationship = Pizza.toppings.through
PizzaToppingRelationship.objects.filter(
Q(pizza=my_pizza, topping=pepperoni) |
Q(pizza=your_pizza, topping=pepperoni) |
Q(pizza=your_pizza, topping=mushroom)
).delete()
要优于:
my_pizza.toppings.remove(pepperoni) your_pizza.toppings.remove(pepperoni, mushroom)
其中 Pizza 和 Topping 有多对多关系。
8. 使用 Cached PropertyDjango 提供了一很实用的装饰器 @cached_property ,用这个替换 @property 的话,一个对象在读取这个 property 的时候只会计算一次,同一个对象在第一次之后来读取这个 property 都会使用缓存。
有点类似于 Python 中的 @lru_cache。
9. DRF 实践无论是 Cache 还是 prefetch 的方法,都是有一些复杂的。如果前端用户到一些字段,就没有必要一次性返回。
刚开始写 DRF 中的 Serializer 的时候,倾向于每一个 Model 都有一个 Serializer,然后这些 Serializer 都互相关联。最终,导致查询一个列表页的时候,每一个 item 相关的数据,以及这些数据相关的数据,都被一次性展示出来了。即使优化过后也难以维护。
后来总结出来一个比较好的实践,是每一个 Model 都有两个 Serializer:
-
ListSerialzer:对于所有的外键只展开一层,不展开外键的外键
- 用于列表页 API 的显示
- 这样查询的时候,只需要对于每一个外键查询一次 in 就可以了
-
DetailSerializer:按需求展示所有的外键
- 用于详情页的渲染
- 对于每一个外键关联的 row,可能都要再进行一次查询,把所有关联的外键都展开,方便展示。但是因为只有一个对象,所以也不会特别慢。但是依然要注意 N + 1,如果嵌套的太深,考虑不一次展示那么多,新提供一个 API 进行查询
这样的好处是我们可以按需进行 prefetch,List 页面的 API 只需要 prefetch 直接关联的外键就可以了,Detail 的 API 可以按需进行级联 prefetch. 总体的原则就是尽量避免多重外键的 prefetch.
值得一提的是在 django-rest-framework 中,是可以在同一个 ModelViewSet 里面,针对不同的 API,使用不同的 Serializer 的:
def get_serializer_class(self):
if self.action == "list":
return ExperimentListSerializer
return super().get_serializer_class()
10. 常见调试 ORM 工具及执行时间查看
在 ipython 等交互环境中查看原始 SQL
确保 Django DEBUG 设置为 True。然后,
>>> from django.db import connection >>> connection.queries [] >>> Author.objects.all()]> >>> connection.queries [{u'time': u'0.002', u'sql': u'SELECt "library_author"."id", "library_author"."name" FROM "library_author" LIMIT 21'}]
connection.queries 只有在调试为真时。它是一个按查询执行顺序排列的字典列表。每个词典都有以下内容:
``sql`` -- The raw SQL statement ``time`` -- How long the statement took to execute, in seconds.
connection.queries 查询包括所有 SQL 语句——插入、更新、选择等。每次应用程序访问数据库时,查询都会被记录下来。
如果您使用多个数据库,您可以在 connections 字典的每个成员上使用相同的接口:
>>> from django.db import connections >>> connections['my_db_alias'].queries
如果需要在函数的任意位置手动清除查询列表,只需调用 reset_queries(),如下所示:
from django.db import reset_queries reset_queries()django-exensions 显示原始 SQL
在 shell 命令行的环境下,可以使用 django-exensions 的 shell_plus 命令并打开 --print-sql 选项。
python manage.py shell_plus --print-sql
运行该 shell 后可以在命令行输入查询语句,会自动打印 SQL 内容。
>>> Author.objects.all() SELECt "library_author"."id", "library_author"."name" FROM "library_author" LIMIT 21 Execution time: 0.001393s [Database: default]调试配置面板 django-debug-toolbar]>
这个教程很多就不说了,可以根据它展示的 SQL 情况针对性优化
自定义中间件自动打印 sql 执行情况新建文件 sql_middleware.py
"""
Gist code by vstoykov, you can check his original gist at:
https://gist.github.com/vstoykov/1390853/5d2e8fac3ca2b2ada8c7de2fb70c021e50927375
Changes:
Ignoring static file requests and a certain useless admin request from triggering the logger.
Updated statements to make it Python 3 friendly.
"""
from django.db import connection
from django.conf import settings
import os
def terminal_width():
"""
Function to compute the terminal width.
"""
width = 0
try:
import struct, fcntl, termios
s = struct.pack('HHHH', 0, 0, 0, 0)
x = fcntl.ioctl(1, termios.TIOCGWINSZ, s)
width = struct.unpack('HHHH', x)[1]
except:
pass
if width <= 0:
try:
width = int(os.environ['COLUMNS'])
except:
pass
if width <= 0:
width = 80
return width
def SqlPrintingMiddleware(get_response):
def middleware(request):
response = get_response(request)
if not settings.DEBUG or
len(connection.queries) == 0 or
request.path_info.startswith(settings.MEDIA_URL) or
'/admin/jsi18n/' in request.path_info:
return response
indentation = 2
print("nn%s 33[1;35m[SQL Queries for] 33[1;34m %s 33[0mn" % (" " * indentation, request.path_info))
width = terminal_width()
total_time = 0.0
for query in connection.queries:
nice_sql = query['sql'].replace('"', '').replace(',', ', ')
sql = " 33[1;31m[%s] 33[0m %s" % (query['time'], nice_sql)
total_time = total_time + float(query['time'])
while len(sql) > width - indentation:
print("%s%s" % (" " * indentation, sql[:width - indentation]))
sql = sql[width - indentation:]
print("%s%sn" % (" " * indentation, sql))
replace_tuple = (" " * indentation, str(total_time))
print("%s 33[1;32m[TOTAL TIME: %s seconds] 33[0m" % replace_tuple)
print("%s 33[1;32m[TOTAL QUERIES: %s] 33[0m" % (" " * indentation, len(connection.queries)))
return response
return middleware
然后在 settings.py 文件中的 MIDDLEWARE 部分添加
MIDDLEWARE = [
...
"sql_middleware.SqlPrintingMiddleware", # 注意你自己的导入位置
]
可以得到类似的输出
配置 Django logger 打印慢 SQL配置一个新的 logger,然后配置
"filters": {
"slow_sql_above_50ms": {
"()": "django.utils.log.CallbackFilter",
"callback": lambda record: not hasattr(record, "duration")
or record.duration > 0.05, # output slow queries only
},
},
就可以将 SQL 日志过滤出来,然后只 log 请求时间 >50ms 的。



