栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

写一个基于 sqlite3 的 sqlite 连接类

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

写一个基于 sqlite3 的 sqlite 连接类

写一个基于 sqlite3 的 sqlite 连接类
    • 写作背景
    • 思路
      • 0. 预处理
      • 1. 创建数据库连接对象
      • 2. 返回数据库对象
      • 3. 创建数据表
      • 4. 基础操作
      • 5. 关闭数据库连接
        • 显示关闭
        • 隐式关闭
    • 代码展示
    • 结尾

写作背景

本菜鸡在练习编程的时候经常会有 链接数据库 的需求,一般 为了简便 ,连接的是 sqlite 数据库,为了 简化代码 ,于是想 自定义类 。本文就由本菜鸡带你们 自定义 一个 sqlite 连接类 。
如果觉得我这篇文章写的好的话,能不能给我 点个赞 ,评论 一波。
如果要点个 关注 的话也不是不可以珞。

思路

这个类需要实现以下功能:

  1. 既然要链接数据库,最基础的一定是 初始化数据库连接 ,在函数 __init__ 中就能实现。
  2. 如果 需要连接对象 ,就 返回上一步创建的连接对象 ,在函数 return_driver 中实现。
  3. 连接完数据库后,我们需要 创建数据表 ,使用函数 exec_sql 实现(其实就是传入 SQL 语句直接执行)。
  4. 从删库到跑路 ,既然是要操作数据库,那么基本的 增、删、改、查 一个都不能少,在函数 insert 、 delete 、 modify 和 search 中实现。
  5. 记得可不要忘了 关闭数据库连接 ,要不然会出问题哦。
  6. 记录日志,在必要的地方添加日志记录,方便查看问题。

思路有了,那我们现在就来一一实现吧!!!
注:以下各步均为类中的某一函数。

0. 预处理

首先我们要导入必要的库:

  • sqlite3 ,这个是 核心 ,以下代码都要围绕着它转。这个库是 Python 内置库 ,不需要 pip 即可使用。
  • logging ,用来 记录日志 。

然后我们设置一下 日志格式 ,如下
时间 - 文件名[line: 行数] - 日志级别 - Message:信息
那我们就设置一下日志(我选择在类外设置,也可以在类内定义个函数设置):

logging.basicConfig(
	filename='process.log',
	filemode='a',
	format='%(asctime)s - %(filename)s[line: %(lineno)d] - %(levelname)s - Message: %(message)s',
	level=logging.DEBUG
)

上述代码的意思是:

  • 日志保存在文件 process.log 中
  • 日志写入的模式是 a ,也就是 追加写
  • 日志的格式如上述
  • 日志的级别是 DEBUG 级别,也就是 各种提示信息都写入日志文件 ,包括 报错 、 信息 、 警告 等。
1. 创建数据库连接对象

这一步很简单,简简单单一句话:

def __init__(self, db_name):
	self.driver = sqlite3.connect(db_name)

也就是 实例化对象时 要传入 数据库名 ,用来 创建数据库连接 。

2. 返回数据库对象

这一步也是简简单单一句话,就直接把上一步创建的连接对象 retrun 就好了

def return_driver(self):
	return self.driver
3. 创建数据表

也就是函数直接执行传入的 SQL 语句。
创建数据表的语法如下:

CREATE TABLE 【table_name】 (
	【字段名】 【字段约束】,
	...
)

然后我们执行 SQL 语句即可。

def exec_sql(self, sql):
    try:
        # 执行 sql 语句
        self.driver.execute(sql)
        # 提交 sql 语句
        self.driver.commit()
        # 写入日志
        logging.info(f'Execute sql successfully.')
        return True
    except Exception as e:
        logging.error(e)
        return False

测试一下。

日志显示语法错误。
修改正确后:



目测代码执行没问题。

现在看一下数据库。

可以看到,我们预期的功能实现了。

4. 基础操作 增

增加数据就要用到 SQL 中的 insert 语句,语法如下:

INSERT INTO 【表名】 (【以 ',' 分隔的字段名】) VALUES (【以 ',' 分隔的值】)

其中,字段名 可以省略,代表 向所有字段插入数据 。

了解语法后,我们可以定义一个函数,实现以下功能:

  • 传入 要插入数据的表名 和 要插入的数据(字典 / 列表) 。
  • 使用第一步创建的 连接对象 执行 SQL 语句并提交即可
  • 注:不能忘了异常捕捉。
def insert(self, table, item):

	# 如果传入的是字典,就对应字段增加数据
	if type(item) == dict:
	
		# 字段部分初始化
        key_sql = ''
        
        # 数据部分初始化
        value_sql = ''
        
        # 当 item 中有元素时,取出一个元素,并拼接字符串
        while item:
            key, value = item.popitem()
            
			# 字段名直接加入字段名字符串
            key_sql += f'`{key}`'
            
            # 如果数据部分是字符串
            if type(value) == str:
            
            	# 删除影响插入的特殊字符
                value = value.replace('\', '').replace('/', '').replace('t', '').replace('n', '').replace('[', '【').replace(']', '】').replace('"', '\"')
                
                # 拼接字符串
                # 如果字符串有单引号就用双引号
                # 否则相反
                value_sql += f'"{value}"' if "'" in value else f"'{value}'"

			# 如果数据不是字符串,直接将其变成字符串
			# 针对 整形,浮点型 等数据类型
            else:
                value_sql += str(value)
            
            # 如果不是最后一组键值对,加入 ',' 分隔
            if item:
               key_sql += ', '
               value_sql += ', '
       
       # 把字段部分和数据部分的字符串拼接到 sql 语句上
        sql = f'insert into {table} ({key_sql}) values ({value_sql})'
    
    # item 为列表,代表插入 所有字段 的数据
    else:
    	
    	# 初始化数据字符串
        value_sql = ''
        
		# 遍历数据
        for i, t in enumerate(item):

			# 同上处理方法
            if type(t) == str:
            
                t = t.replace('\', '').replace('/', '').replace('t', '').replace('n', '').replace('[', '【').replace(']', '】').replace('"', '\"')
                
                value_sql += f'"{t}"' if "'" in t else f"'{t}'"
                
            else:
            
                value_sql += str(t)
                
            if i != len(item)-1:
            
                value_sql += ', '

        value_sql = '(' + value_sql + ')' if not value_sql.startswith('(') and not value_sql.endswith(')') else value_sql
        
        sql = f'insert into {table} values {value_sql}'

	# 输出 SQL 语句,便于查看是否有误
    print('insert_sql:', sql)
    
    # 尝试提交 SQL 语句
    # 提交成功返回 True
    # 提交失败返回 False
    try:
        self.driver.execute(sql)
        self.driver.commit()
        return True
    except:
        return False

我们可以把上述处理 value 的过程单独写成一个函数,以便调用。

def __check_value(self, value):

	# 如果数据是字符串,删除特殊符号
    if type(value) == str:
    
        value = value.replace('\', '').replace('/', '').replace('t', '')
        	.replace('n', '').replace('[', '【').replace(']', '】')
        	.replace('"', "'")
        	
        # 为数据加上引号,因为插入数据时字符串数据需要引号
        return f'"{value}"'
    
    else:
        return str(value)

而 value_sql += value 就应该变成 value_sql += self.__check_value(value) 。

感觉还是有点麻烦,所以可以使用 列表生成器 再简化一下代码,如下:

def insert(self, table, item):

	# 如果是字典
    if type(item) == dict:
    
        # 字段部分初始化
        key_sql = []
        # 数据部分初始化
        value_sql = []
        
        # 当 item 中有元素时,随机取出一个元素,添加到列表中
        while item:
            key, value = item.popitem()
			
			# 将列名加入列表中,为防止与 SQL 内置关键字重名,使用 `` 包含列名
            key_sql.append(f'`{key}`')
            
            # 数据处理后加入列表中
            value_sql.append(self.__check_value(value))
        
        # 生成字符串
        key_sql, value_sql = ', '.join(key_sql), ', '.join(value_sql)
        
        # 把字段部分和数据部分的字符串拼接到 sql 语句上
        sql = f'insert into {table} ({key_sql}) values ({value_sql})'
        
    else:
		
        value_sql = '(' + ', '.join([self.__check_value(t) for t in item]) + ')'

        sql = f'insert into {table} values {value_sql}'
        
    print('insert_sql:', sql)
	
	# 执行并提交
    try:
        self.driver.execute(sql)
        self.driver.commit()
        logging.info('Execute insert sql successfully.')
        return True
    except Exception as e:
    	logging.error(e)
        return False

测试一下。
看一下日志。

看一下数据库。
成功完成。

SQL 中 删除数据 时要用到 delete 关键字,语法如下:

DELETE FROM TABLE 【table_name】 【限定条件】

那我们就可以编写函数,传入 表名 和 限制条件(默认为空) ,代码如下:

def delete(self, table, limit=''):
    sql = f'delete from {table} '
    
    # 如果有限制条件就加上
    if limit:
        sql += limit
        
    try:
        # 执行删除语句
        self.driver.execute(sql)
        # 提交
        self.driver.commit()
        
        logging.info('Execute delete sql successfully.')
        return True
        
    except Exception as e:
        logging.error(e)
        return False

测试一下。
看一下日志。

看一下数据库中的数据。

只剩下了一条数据,说明删除操作成功执行。

修改数据 也是业务中经常遇到的一种操作,要使用 update 关键字,语法如下:

UPDATE 【table_name】 SET 【键】=【值】, ... 【限制条件】

省略号 ... 代表可能要修改多个键值。
因为同样有设置值的部分,所以我们可以借鉴一下上述 增 的代码,只不过传入的参数要增加一个:限制条件,如果没有限制条件的话,就会 对所有数据进行修改操作 。函数代码如下:

def modify(self, table, item, limit=None):

    # 同上,拼接 sql 语句
    sql = f'update {table} set '

    set_sql = []
    
    while item:
        key, value = item.popitem()
        
        if type(value) == str:
            value = self.__check_value(value)
            set_sql.append(f'`{key}`="{value}"')
            
        else:
            set_sql.append(f'`{key}`={value}')
            
    sql += ', '.join(set_sql)
    
    # 如果有限制条件就加上
    if limit:
        sql += f' {limit}'
        
    print('update_sql:', sql)

    # 执行 sql 语句并返回结果
    try:
        self.driver.execute(sql)
        self.driver.commit()
        logging.info('Execute update sql successfully.')
        return True
    except Exception as e:
        logging.error(e)
        return False

测试一下。

看一下日志。
看一下数据库。

成功完成 修改数据 操作。

查找数据 可谓是 最频繁使用 的业务了。因为我们很多时候都是要查找数据库中的数据,为了 数据分析 、机器学习 之类的,又或者只是 单纯的显示数据 ,而该操作基于 select 关键字,语法如下:

SELECT 【字段名】 FROM 【table_name】 【限制条件】

关键点在于 字段名 和 限制条件 ,我们定义的函数接受三个参数:表名 、字段名 和 限制条件 。代码如下:

def search(self, table, field='*', limit=None):
    
    # 如果要搜索的字段为列表,就把列表转换成 sql 语句的形式
    s_field = ', '.join(field) if type(field) == list else field
    
    # 拼接 sql 字符串
    sql = f'select {s_field} from {table}'
    
    # 如果有限制条件就加上
    if limit:
        sql += f' {limit}'
        
    print('search_sql:', sql)
    logging.info(f'Search sql: {sql}.')
    
    # 创建游标
    cs = self.driver.cursor()
    # 执行查询语句
    cs.execute(sql)
    # 获得返回的所有结果
    result = cs.fetchall()
    
    print('search_result:', result)
	# 写入日志
	logging.info(f'Search result: {result}.')
    return result

为了测试,我们先多增加一些数据。增加后的数据表如下。

  1. 让我们查找一下表中的 所有数据 。

    返回了正确结果。

  2. 查找一下 姓名 为 lhys 的数据。

  3. 查找一下 count 值为 13 的数据。

  4. 查找一下 count 值为 偶数 的数据。

    就先测试到这里,如果各位看官还有想测试的可以 自行测试 哦。

5. 关闭数据库连接

我们可以选择关闭的模式:显示关闭 和 隐式关闭 。

显示关闭

显示关闭好理解,就调用函数,关闭连接就行了,这个就不设置返回值了。代码如下:

def close(self):
	logging.info('Connection closed.')
    self.driver.close()
    return 


还行。

隐式关闭

这个就要利用 Python 中的关键字 with ,也就是像我们打开文件一样:

with open('xxx.xx', 'r') as f:
	f.read()

我们也让我们这个类支持这种功能,怎么实现呢?这就需要设置 __enter__ 函数和 __exit__ 函数了。参考 这篇文章。

def __enter__(self):
	print('使用了 with 关键字.')
    logging.info('使用了 with 关键字.')
    # 返回实例化对象
    return self

def __exit__(self, exc_type, exc_val, exc_tb):
	print('退出了.')
    logging.info('退出了.')
    # 调用 显示关闭 函数,关闭连接
    self.close()
    return True

尝试一下。
看一下日志。

乱码了,不过可以看出来功能还是正常实现了。

代码展示

将上述代码合到一个类中,代码如下:

#!/usr/bin/python3
# -*- coding=utf-8 -*-

import logging
import sqlite3

logging.basicConfig(
    filename='process.log',
    filemode='a',
    format='%(asctime)s - %(filename)s[line: %(lineno)d] - %(levelname)s - Message: %(message)s',
    level=logging.DEBUG
)

class SqliteOperation:

    def __init__(self, db_name):
        self.driver = sqlite3.connect(db_name)

    def return_driver(self):
        return self.driver

    def exec_sql(self, sql):
        try:
            # 执行 sql 语句
            self.driver.execute(sql)
            # 提交 sql 语句
            self.driver.commit()
            logging.info(f'Execute sql successfully.')
            return True
        except Exception as e:
            logging.error(e)
            return False

    def __check_value(self, value):
        if type(value) == str:
            value = value.replace('\', '').replace('/', '')
                .replace('t', '').replace('n', '').replace('[', '【')
                .replace(']', '】').replace('"', "'")
            return f'"{value}"'
        else:
            return str(value)

    def insert(self, table, item):
        if type(item) == dict:
            # 字段部分初始化
            key_sql = []
            # 数据部分初始化
            value_sql = []
            # 当 item 中有元素时,随机取出一个元素,并拼接字符串
            while item:
                key, value = item.popitem()
                key_sql.append(f'`{key}`')
                value_sql.append(self.__check_value(value))

            key_sql, value_sql = ', '.join(key_sql), ', '.join(value_sql)
            # 把字段部分和数据部分的字符串拼接到 sql 语句上
            sql = f'insert into {table} ({key_sql}) values ({value_sql})'
        else:

            value_sql = '(' + ', '.join([self.__check_value(t) for t in item]) + ')'

            sql = f'insert into {table} values {value_sql}'

        print('insert_sql:', sql)
        logging.info(f'Insert sql: {sql}.')

        try:
            self.driver.execute(sql)
            self.driver.commit()
            logging.info('Execute insert sql successfully.')
            return True
        except Exception as e:
            logging.error(e)
            return False

    def delete(self, table, limit=None):

        sql = f'delete from {table} '
        # 如果有限制条件就加上
        if limit:
            sql += limit

        logging.info(f'Delete sql: {sql}.')

        try:
            # 执行删除语句
            self.driver.execute(sql)
            # 提交
            self.driver.commit()

            logging.info('Execute delete sql successfully.')

            return True
        except Exception as e:
            logging.error(e)
            return False

    def modify(self, table, item, limit=None):
        # 同上,拼接 sql 语句
        sql = f'update {table} set '
        set_sql = []

        while item:
            key, value = item.popitem()

            if type(value) == str:
                value = self.__check_value(value)
                set_sql.append(f'`{key}`={value}')

            else:
                set_sql.append(f'`{key}`={value}')

        sql += ', '.join(set_sql)

        # 如果有限制条件就加上
        if limit:
            sql += f' {limit}'

        print('update_sql:', sql)
        logging.info(f'Update sql: {sql}.')

        # 执行 sql 语句并返回结果
        try:
            self.driver.execute(sql)
            self.driver.commit()
            logging.info('Execute update sql successfully.')
            return True
        except Exception as e:
            logging.error(e)
            return False

    def search(self, table, field='*', limit=None):

        # 如果要搜索的字段为列表,就把列表转换成 sql 语句的形式
        s_field = ', '.join(field) if type(field) == list else field

        # 拼接 sql 字符串
        sql = f'select {s_field} from {table}'

        # 如果有限制条件就加上
        if limit:
            sql += f' {limit}'

        print('search_sql:', sql)
        logging.info(f'Search sql: {sql}.')

        cs = self.driver.cursor()
        cs.execute(sql)
        result = cs.fetchall()

        print('search_result:', result)
        logging.info(f'Search result: {result}.')
        return result

    def close(self):
        logging.info('Connection closed.')
        self.driver.close()

    def __enter__(self):
        print('使用了 with 关键字.')
        logging.info('使用了 with 关键字.')
        # 返回实例化对象
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print('退出了.')
        logging.info('退出了.')
        self.close()
        return True



结尾

有想要一起学习 python 的小伙伴可以 私信我 进群哦。

以上就是我要分享的内容,因为学识尚浅,会有不足,还请各位大佬指正。
有什么问题也可在评论区留言。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/445399.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号