高并发情况下,我们经常遇到这样的问题,就是多个用户同一时间更新表中的同一行的同一个字段,比如给count加1,此时不考虑锁的问题会造成数据丢失,比如A B 同时操作,A B 读取的count初始值都是1, A B 分别执行加 1 操作,操作后数据库的值是2,而不是3。
乐观锁
EFCore中用并发Token可以避免此类事情的发生,其原理就是执行update操作时添加了一个条件就是count=旧值 sql 大致如下
update testblock set count = 2 where count=oldcount(读取的旧值)
使用了乐观锁之后A B 会有一个人操作失败。因为他们读取的初始值都是1, 假定 A 先操作完,此时数据库的count值变成 2 , B 操作时 count=1(旧值)的条件不在成立。
悲观锁
就是执行下面这类sql,锁住表中的一行,待本次操作commit后,操作这一行的其他方法才会继续执行,悲观锁容易造成数据库的死锁
select 。。。。from testblock for update
关于Demo
demo里对必要的函数做了解释,其中使用tenacity 包实现retry功能,即操作失败后继续执行7(我的设置)次
PGTools.py
import psycopg2
class PGServer:
def __init__(self, user, password, host, port, db) -> None:
self.user = user
self.password = password
self.host = host
self.port = port
self.db = db
def connect(self):
self.conn = psycopg2.connect(
host=self.host, database=self.db, user=self.user, password=self.password, port=self.port)
self.cour = self.conn.cursor()
def execute(self,sql,data):
self.cour.execute(sql,data)
rowcount = self.cour.rowcount
self.conn.commit()
if rowcount<=0:
raise Exception('another has changed')
def fetchOne(self,sql,data):
self.cour.execute(sql,data)
return self.cour.fetchone()
def fetchAll(self,sql,data):
self.cour.execute(sql,data)
return self.cour.fetchall()
def closeConnect(self):
self.cour.close()
self.conn.close()
main.py
from PGTool.PGTools import PGServer
from threading import Thread
import time
from tenacity import retry,stop_after_attempt,RetryError
class Test:
def __init__(self) -> None:
self.server = PGServer('postgres', 'postgres',
'127.0.0.1', 5432, 'postgres')
self.server.connect()
# remove for update will occurred concurrency error
def update(self,sql):
id,count = self.server.fetchOne('select id,count from testlock where id=%(id)s for update',{'id':1})
print(id)
time.sleep(10)
self.server.execute(sql,{"count":int(count)+1,"id":id})
class TestOptimistic:
def __init__(self) -> None:
self.server = PGServer('postgres', 'postgres',
'127.0.0.1', 5432, 'postgres')
self.server.connect()
# will retry 7 times
@retry(stop=stop_after_attempt(7))
def update(self,sql):
try:
id,oldcount = self.server.fetchOne('select id,count from testlock where id=%(id)s',{'id':1})
print(oldcount)
self.server.execute(sql,{"count":int(oldcount)+1,"id":id,'oldcount':oldcount})
self.server.closeConnect()
except RetryError as e:
print(e)
def disconnetDB(self):
self.server.closeConnect()
print('after retry')
# 悲观锁
# t1 = Thread(target=Test().update,args=('update testlock set count=%(count)s where id=%(id)s',))
# t2 = Thread(target=Test().update,args=('update testlock set count=%(count)s where id=%(id)s',))
# t1.start()
# t2.start()
# 乐观锁
t1 = Thread(target=TestOptimistic().update,args=('update testlock set count=%(count)s where id=%(id)s and count=%(oldcount)s',))
t2 = Thread(target=TestOptimistic().update,args=('update testlock set count=%(count)s where id=%(id)s and count=%(oldcount)s',))
t1.start()
t2.start()
https://tenacity.readthedocs.io/en/latest/



