栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

Python postgresql 悲观锁与乐观锁

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Python postgresql 悲观锁与乐观锁

高并发情况下,我们经常遇到这样的问题,就是多个用户同一时间更新表中的同一行的同一个字段,比如给count加1,此时不考虑锁的问题会造成数据丢失,比如A B 同时操作,A B 读取的count初始值都是1, A B 分别执行加 1 操作,操作后数据库的值是2,而不是3。
乐观锁
EFCore中用并发Token可以避免此类事情的发生,其原理就是执行update操作时添加了一个条件就是count=旧值 sql 大致如下

update testblock set count = 2 where count=oldcount(读取的旧值)

使用了乐观锁之后A B 会有一个人操作失败。因为他们读取的初始值都是1, 假定 A 先操作完,此时数据库的count值变成 2 , B 操作时 count=1(旧值)的条件不在成立。

悲观锁

就是执行下面这类sql,锁住表中的一行,待本次操作commit后,操作这一行的其他方法才会继续执行,悲观锁容易造成数据库的死锁

select 。。。。from testblock for update

关于Demo
demo里对必要的函数做了解释,其中使用tenacity 包实现retry功能,即操作失败后继续执行7(我的设置)次

PGTools.py

import psycopg2


class PGServer:

    def __init__(self, user, password, host, port, db) -> None:
        self.user = user
        self.password = password
        self.host = host
        self.port = port
        self.db = db

    def connect(self):
        self.conn = psycopg2.connect(
            host=self.host, database=self.db, user=self.user, password=self.password, port=self.port)
        self.cour = self.conn.cursor()
    
    def execute(self,sql,data):
        self.cour.execute(sql,data)
        rowcount = self.cour.rowcount
        self.conn.commit()
        if rowcount<=0:
            raise Exception('another has changed')

    def fetchOne(self,sql,data):
        self.cour.execute(sql,data)
        return self.cour.fetchone()
    
    def fetchAll(self,sql,data):
        self.cour.execute(sql,data)
        return self.cour.fetchall()

    
    def closeConnect(self):
        self.cour.close()
        self.conn.close()

main.py

from PGTool.PGTools import PGServer
from threading import Thread
import time
from tenacity import retry,stop_after_attempt,RetryError

class Test:
    def __init__(self) -> None:
        self.server = PGServer('postgres', 'postgres',
                               '127.0.0.1', 5432, 'postgres')
        self.server.connect()

    
    # remove for update  will occurred concurrency error
    def update(self,sql):
        id,count = self.server.fetchOne('select id,count from testlock where id=%(id)s for update',{'id':1})
        print(id)
        time.sleep(10)
        self.server.execute(sql,{"count":int(count)+1,"id":id})

class TestOptimistic:
    def __init__(self) -> None:
        self.server = PGServer('postgres', 'postgres',
                               '127.0.0.1', 5432, 'postgres')
        self.server.connect()
        
    # will retry 7 times
    @retry(stop=stop_after_attempt(7))
    def update(self,sql):
        try:
            id,oldcount = self.server.fetchOne('select id,count from testlock where id=%(id)s',{'id':1})
            print(oldcount)
            self.server.execute(sql,{"count":int(oldcount)+1,"id":id,'oldcount':oldcount})
            self.server.closeConnect()
        except RetryError as e:
            print(e)
        
    def disconnetDB(self):
        self.server.closeConnect()
        print('after retry')
       
# 悲观锁 
# t1 = Thread(target=Test().update,args=('update testlock set count=%(count)s where id=%(id)s',))   
# t2 = Thread(target=Test().update,args=('update testlock set count=%(count)s where id=%(id)s',))   
# t1.start()
# t2.start()


# 乐观锁
t1 = Thread(target=TestOptimistic().update,args=('update testlock set count=%(count)s where id=%(id)s and count=%(oldcount)s',))   
t2 = Thread(target=TestOptimistic().update,args=('update testlock set count=%(count)s where id=%(id)s and count=%(oldcount)s',))   
t1.start()
t2.start()

https://tenacity.readthedocs.io/en/latest/

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/498745.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号