import mysql.connector.pooling
# Connection settings for the shared MySQL pool.
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a config file that is not committed.
__config = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "cheng.1023",
    "database": "vega",
}

try:
    # Module-level pool shared by every DAO (they import ``pool`` from here).
    pool = mysql.connector.pooling.MySQLConnectionPool(
        **__config,
        pool_size=10,
    )
except Exception as e:
    # Without a pool nothing else can work. Printing and carrying on would
    # leave ``pool`` unbound and make every importer fail later with an
    # unrelated "cannot import name 'pool'" error, so surface the real cause.
    print(e)
    raise
二、创建 user_dao.py,定义用户表相关的 SQL 操作
from db.mysql_db import pool
class UserDao:
    """Data-access object for the ``t_user`` table.

    Every method borrows a connection from the module-level ``pool`` and
    closes it (returning it to the pool) in ``finally``. Exceptions are
    printed and swallowed, so a failed call returns ``None``.

    NOTE(review): passwords are stored with reversible AES encryption using
    a key embedded in the SQL text; a salted one-way hash would be safer.
    """

    def login(self, username, password):
        """Check a username/password pair.

        Returns ``True`` when exactly one row matches, ``False`` when the
        count is anything else, and ``None`` if an error was printed.
        """
        con = None
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            # Parenthesised so the adjacent literals form one statement.
            sql = (
                "SELECt COUNT(*) FROM t_user WHERe username=%s AND "
                "AES_DECRYPT(UNHEx(password),'HelloWorld')=%s"
            )
            cursor.execute(sql, (username, password))
            count = cursor.fetchone()[0]
            return count == 1
        except Exception as e:
            print(e)
        finally:
            if con is not None:
                con.close()

    def search_user_role(self, username):
        """Return the role name of ``username``.

        Returns ``None`` when the user has no matching row or an error was
        printed (indexing the missing row raises and is caught below).
        """
        con = None
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            sql = (
                "SELECt r.role FROM t_user u JOIN t_role r ON u.role_id=r.id "
                "WHERe u.username=%s"
            )
            cursor.execute(sql, [username])
            role = cursor.fetchone()[0]
            return role
        except Exception as e:
            print(e)
        finally:
            if con is not None:
                con.close()

    def insert(self, username, password, email, role_id):
        """Insert a new user inside a transaction; roll back on failure."""
        con = None
        try:
            con = pool.get_connection()
            con.start_transaction()
            cursor = con.cursor()
            sql = (
                "INSERT INTO t_user(username,password,email,role_id) "
                "VALUES(%s,HEx(AES_ENCRYPT(%s,'HelloWorld')),%s,%s)"
            )
            cursor.execute(sql, (username, password, email, role_id))
            con.commit()
        except Exception as e:
            if con is not None:
                con.rollback()
            print(e)
        finally:
            if con is not None:
                con.close()

    def search_count_page(self):
        """Return the number of user pages (10 users per page)."""
        con = None
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            sql = "SELECt CEIL(COUNT(*)/10) FROM t_user"
            cursor.execute(sql)
            count_page = cursor.fetchone()[0]
            return count_page
        except Exception as e:
            print(e)
        finally:
            if con is not None:
                con.close()

    def search_list(self, page):
        """Return the ``(id, username, role)`` rows of a 1-based ``page``."""
        con = None
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            sql = (
                "SELECt u.id,u.username,r.role "
                "FROM t_user u JOIN t_role r "
                "ON u.role_id=r.id "
                "ORDER BY u.id "
                "LIMIT %s,%s"
            )
            # LIMIT offset,count: skip the rows of the preceding pages.
            cursor.execute(sql, ((page - 1) * 10, 10))
            result = cursor.fetchall()
            return result
        except Exception as e:
            print(e)
        finally:
            if con is not None:
                con.close()

    def update(self, id, username, password, email, role_id):
        """Overwrite all editable fields of user ``id`` in a transaction."""
        con = None
        try:
            con = pool.get_connection()
            con.start_transaction()
            cursor = con.cursor()
            sql = (
                "UPDATE t_user SET username=%s,"
                "password=HEx(AES_ENCRYPT(%s,'HelloWorld')),"
                "email=%s,role_id=%s "
                "WHERe id=%s"
            )
            cursor.execute(sql, (username, password, email, role_id, id))
            con.commit()
        except Exception as e:
            if con is not None:
                con.rollback()
            print(e)
        finally:
            if con is not None:
                con.close()

    def delete_by_id(self, id):
        """Delete user ``id`` inside a transaction; roll back on failure."""
        con = None
        try:
            con = pool.get_connection()
            con.start_transaction()
            cursor = con.cursor()
            sql = "DELETE FROM t_user WHERe id=%s"
            cursor.execute(sql, [id])
            con.commit()
        except Exception as e:
            if con is not None:
                con.rollback()
            print(e)
        finally:
            if con is not None:
                con.close()
三、创建 news_dao.py,定义新闻表相关的 SQL 操作
from db.mysql_db import pool
class NewsDao:
    """Data-access object for the ``t_news`` table.

    Every method borrows a connection from the module-level ``pool`` and
    closes it (returning it to the pool) in ``finally``. Exceptions are
    printed and swallowed, so a failed call returns ``None``.
    """

    def search_unreview_list(self, page):
        """Return one 1-based ``page`` of news still awaiting approval.

        Rows are ``(id, title, type, editor username)``, newest first.
        """
        con = None
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            # Parenthesised so the adjacent literals form one statement.
            sql = (
                "SELECt n.id,n.title,t.type,u.username "
                "FROM t_news n JOIN t_type t ON n.type_id=t.id "
                "JOIN t_user u ON n.editor_id=u.id "
                "WHERe n.state=%s "
                "ORDER BY n.create_time DESC "
                "LIMIT %s,%s"
            )
            cursor.execute(sql, ("待审批", (page - 1) * 10, 10))
            result = cursor.fetchall()
            return result
        except Exception as e:
            print(e)
        finally:
            if con is not None:
                con.close()

    def search_unreview_count_page(self):
        """Return the number of pages of unapproved news (10 per page)."""
        con = None
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            sql = "SELECt CEIL(COUNT(*)/10) FROM t_news WHERe state=%s"
            cursor.execute(sql, ["待审批"])
            count_page = cursor.fetchone()[0]
            return count_page
        except Exception as e:
            print(e)
        finally:
            if con is not None:
                con.close()

    def update_unreview_news(self, id):
        """Mark news ``id`` as approved inside a transaction."""
        con = None
        try:
            con = pool.get_connection()
            con.start_transaction()
            cursor = con.cursor()
            sql = "UPDATE t_news SET state=%s WHERe id=%s"
            cursor.execute(sql, ("已审批", id))
            con.commit()
        except Exception as e:
            if con is not None:
                con.rollback()
            print(e)
        finally:
            if con is not None:
                con.close()

    def search_list(self, page):
        """Return one 1-based ``page`` of all news, newest first.

        Rows are ``(id, title, type, editor username)``.
        """
        con = None
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            sql = (
                "SELECT n.id,n.title,t.type,u.username "
                "FROM t_news n JOIN t_type t ON n.type_id=t.id "
                "JOIN t_user u ON n.editor_id=u.id "
                "ORDER BY n.create_time DESC "
                "LIMIT %s,%s"
            )
            cursor.execute(sql, ((page - 1) * 10, 10))
            result = cursor.fetchall()
            return result
        except Exception as e:
            print(e)
        finally:
            if con is not None:
                con.close()

    def search_count_page(self):
        """Return the number of news pages (10 items per page)."""
        con = None
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            sql = "SELECt CEIL(COUNT(*)/10) FROM t_news"
            cursor.execute(sql)
            count_page = cursor.fetchone()[0]
            return count_page
        except Exception as e:
            print(e)
        finally:
            if con is not None:
                con.close()

    def delete_by_id(self, id):
        """Delete news ``id`` inside a transaction; roll back on failure."""
        con = None
        try:
            con = pool.get_connection()
            con.start_transaction()
            cursor = con.cursor()
            sql = "DELETE FROM t_news WHERe id=%s"
            cursor.execute(sql, [id])
            con.commit()
        except Exception as e:
            if con is not None:
                con.rollback()
            print(e)
        finally:
            if con is not None:
                con.close()
四、创建 role_dao.py,定义查询角色的 SQL 操作
from db.mysql_db import pool
class RoleDao:
    """Data-access object for the ``t_role`` table."""

    def search_list(self):
        """Return every ``(id, role)`` row.

        The connection comes from the module-level ``pool`` and is closed in
        ``finally``. Exceptions are printed and swallowed, in which case the
        method returns ``None``.
        """
        con = None
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            sql = "SELECt id,role FROM t_role"
            cursor.execute(sql)
            result = cursor.fetchall()
            return result
        except Exception as e:
            print(e)
        finally:
            if con is not None:
                con.close()
service 文件
一、创建 user_service.py,定义用户相关的业务方法
from db.user_dao import UserDao
class UserService:
    """Service facade for user operations; each call forwards to UserDao."""

    # One DAO instance shared by every UserService object.
    __user_dao = UserDao()

    def login(self, username, password):
        """Forward the credential check to the DAO and return its result."""
        return self.__user_dao.login(username, password)

    def search_user_role(self, username):
        """Return whatever role the DAO reports for ``username``."""
        return self.__user_dao.search_user_role(username)

    def insert(self, username, password, email, role_id):
        """Create a user record through the DAO."""
        self.__user_dao.insert(username, password, email, role_id)

    def search_count_page(self):
        """Return the DAO's user page count."""
        return self.__user_dao.search_count_page()

    def search_list(self, page):
        """Return the DAO's user rows for ``page``."""
        return self.__user_dao.search_list(page)

    def update(self, id, username, password, email, role_id):
        """Update user ``id`` through the DAO."""
        self.__user_dao.update(id, username, password, email, role_id)

    def delete_by_id(self, id):
        """Delete user ``id`` through the DAO."""
        self.__user_dao.delete_by_id(id)
二、创建 news_service.py,定义新闻相关的业务方法
from db.news_dao import NewsDao
class NewsService:
    """Service facade for news operations; each call forwards to NewsDao."""

    # One DAO instance shared by every NewsService object.
    __news_dao = NewsDao()

    def search_unreview_list(self, page):
        """Return the DAO's unapproved-news rows for ``page``."""
        return self.__news_dao.search_unreview_list(page)

    def search_unreview_count_page(self):
        """Return the DAO's page count for unapproved news."""
        return self.__news_dao.search_unreview_count_page()

    def update_unreview_news(self, id):
        """Approve news ``id`` through the DAO."""
        self.__news_dao.update_unreview_news(id)

    def search_list(self, page):
        """Return the DAO's news rows for ``page``."""
        return self.__news_dao.search_list(page)

    def search_count_page(self):
        """Return the DAO's page count for all news."""
        return self.__news_dao.search_count_page()

    def delete_by_id(self, id):
        """Delete news ``id`` through the DAO."""
        self.__news_dao.delete_by_id(id)
三、创建 role_service.py,定义角色相关的业务方法
from db.role_dao import RoleDao
class RoleService:
    """Service facade for role lookups; forwards to RoleDao."""

    # One DAO instance shared by every RoleService object.
    __role_dao = RoleDao()

    def search_list(self):
        """Return the DAO's list of role rows."""
        return self.__role_dao.search_list()
最终执行文件app.py
from colorama import Fore,Style,init
init()
from getpass import getpass
from service.user_service import UserService
from service.news_service import NewsService
from service.role_service import RoleService
import os
import sys
import time
__user_service = UserService()
__news_service = NewsService()
__role_service = RoleService()


def _clear_screen():
    """Clear the terminal (``cls`` on Windows, ``clear`` elsewhere)."""
    os.system("cls" if os.name == "nt" else "clear")


def _browse(count_pages, list_rows, on_select):
    """Show a paginated list and act on the row the user picks.

    ``count_pages()`` returns the total page count and ``list_rows(page)``
    the rows of a 1-based page; both may return ``None`` when the DAO
    printed an error. The first column of each row is its id (not shown);
    the remaining columns are printed tab-separated. ``on_select(row)`` is
    called for a valid row number and may return a true value to leave the
    list. Entering ``back`` also leaves the list.
    """
    page = 1
    while True:
        _clear_screen()
        count_page = int(count_pages() or 0)
        # Deleting or approving rows can shrink the list below ``page``.
        page = max(1, min(page, count_page))
        result = list_rows(page) or []
        for index, one in enumerate(result, start=1):
            fields = "\t".join(str(value) for value in one[1:])
            print(Fore.LIGHTBLUE_EX, "\n\t%d\t%s" % (index, fields))
        print(Fore.LIGHTBLUE_EX, "\n\t-------------------")
        print(Fore.LIGHTBLUE_EX, "\n\t%d/%d" % (page, count_page))
        print(Fore.LIGHTBLUE_EX, "\n\t-------------------")
        print(Fore.LIGHTRED_EX, "\n\tback.返回上一层")
        print(Fore.LIGHTRED_EX, "\n\tprev.上一页")
        print(Fore.LIGHTRED_EX, "\n\tnext.下一页")
        print(Style.RESET_ALL)
        opt = input("\n\t输入操作编号:")
        if opt == "back":
            break
        elif opt == "prev" and page > 1:
            page -= 1
        elif opt == "next" and page < count_page:
            page += 1
        elif opt.isdecimal() and 1 <= int(opt) <= len(result):
            # Only numbers that map to a displayed row are accepted; any
            # other input just redraws the list instead of raising.
            if on_select(result[int(opt) - 1]):
                break


def _choose_role():
    """List the roles and return the chosen role id, or ``None`` if invalid."""
    result = __role_service.search_list() or []
    for index, one in enumerate(result, start=1):
        print(Fore.LIGHTBLUE_EX, "\n\t%d.%s" % (index, one[1]))
    print(Style.RESET_ALL)
    opt = input("\n\t角色编号:")
    if opt.isdecimal() and 1 <= int(opt) <= len(result):
        return result[int(opt) - 1][0]
    return None


def _add_user():
    """Prompt for a new user's details and save them."""
    _clear_screen()
    username = input("\n\t用户名:")
    password = getpass("\n\t密码:")
    repassword = getpass("\n\t重复密码:")
    if password != repassword:
        print("\n\t两次密码不一致(3秒自动返回)")
        time.sleep(3)
        return
    email = input("\n\t邮箱:")
    role_id = _choose_role()
    if role_id is None:
        print("\n\t角色编号无效(3秒自动返回)")
        time.sleep(3)
        return
    __user_service.insert(username, password, email, role_id)
    print("\n\t保存成功(3秒自动返回)")
    time.sleep(3)


def _edit_user(row):
    """Prompt for new details of the user in ``row`` and optionally save.

    Returns ``True`` when the passwords do not match, which sends the user
    back to the user menu; otherwise the list is shown again.
    """
    _clear_screen()
    user_id = row[0]
    username = input("\n\t新用户名:")
    password = getpass("\n\t新密码:")
    repassword = getpass("\n\t再次输入密码:")
    if password != repassword:
        print(Fore.LIGHTRED_EX, "\n\t两次密码不一致(3秒自动返回)")
        print(Style.RESET_ALL)
        time.sleep(3)
        return True
    email = input("\n\t新邮箱:")
    role_id = _choose_role()
    if role_id is None:
        print("\n\t角色编号无效(3秒自动返回)")
        time.sleep(3)
        return False
    opt = input("\n\t是否保存(Y/N)")
    if opt == "Y" or opt == "y":
        __user_service.update(user_id, username, password, email, role_id)
        print("\n\t保存成功(3秒自动返回)")
        time.sleep(3)
    return False


def _delete_user(row):
    """Delete the user in ``row`` and stay in the list."""
    _clear_screen()
    __user_service.delete_by_id(row[0])
    print("\n\t删除成功(3秒自动返回)")
    time.sleep(3)
    return False


def _approve_news(row):
    """Approve the news item in ``row`` and stay in the list."""
    __news_service.update_unreview_news(row[0])
    return False


def _delete_news(row):
    """Delete the news item in ``row`` and stay in the list."""
    __news_service.delete_by_id(row[0])
    return False


def _news_menu():
    """News management sub-menu: approve or delete news."""
    while True:
        _clear_screen()
        print(Fore.LIGHTGREEN_EX, "\n\t1.审批新闻")
        print(Fore.LIGHTGREEN_EX, "\n\t2.删除新闻")
        print(Fore.LIGHTRED_EX, "\n\tback.返回上一层")
        print(Style.RESET_ALL)
        opt = input("\n\t输入操作编号:")
        if opt == "1":
            _browse(
                __news_service.search_unreview_count_page,
                __news_service.search_unreview_list,
                _approve_news,
            )
        elif opt == "2":
            _browse(
                __news_service.search_count_page,
                __news_service.search_list,
                _delete_news,
            )
        elif opt == "back":
            break


def _user_menu():
    """User management sub-menu: add, modify or delete users."""
    while True:
        _clear_screen()
        print(Fore.LIGHTGREEN_EX, "\n\t1.添加用户")
        print(Fore.LIGHTGREEN_EX, "\n\t2.修改用户")
        print(Fore.LIGHTGREEN_EX, "\n\t3.删除用户")
        print(Fore.LIGHTRED_EX, "\n\tback.返回上一层")
        print(Style.RESET_ALL)
        opt = input("\n\t输入操作编号:")
        if opt == "back":
            break
        elif opt == "1":
            _add_user()
        elif opt == "2":
            _browse(
                __user_service.search_count_page,
                __user_service.search_list,
                _edit_user,
            )
        elif opt == "3":
            _browse(
                __user_service.search_count_page,
                __user_service.search_list,
                _delete_user,
            )


def _admin_menu():
    """Top-level administrator menu; returns when the user logs out."""
    while True:
        _clear_screen()
        print(Fore.LIGHTGREEN_EX, "\n\t1.新闻管理")
        print(Fore.LIGHTGREEN_EX, "\n\t2.用户管理")
        print(Fore.LIGHTRED_EX, "\n\tback.退出登陆")
        print(Fore.LIGHTRED_EX, "\n\texit.退出系统")
        print(Style.RESET_ALL)
        opt = input("\n\t输入操作编号:")
        # Each sub-menu reads its own input, so a "back" typed there only
        # leaves that sub-menu and never logs the user out here.
        if opt == "1":
            _news_menu()
        elif opt == "2":
            _user_menu()
        elif opt == "back":
            break
        elif opt == "exit":
            sys.exit(0)


def main():
    """Run the login screen loop of the news management console."""
    while True:
        _clear_screen()
        print(Fore.LIGHTBLUE_EX, "\n\t==================")
        print(Fore.LIGHTBLUE_EX, "\n\t欢迎使用新闻管理系统")
        print(Fore.LIGHTBLUE_EX, "\n\t==================")
        print(Fore.LIGHTGREEN_EX, "\n\t1.登陆系统")
        print(Fore.LIGHTGREEN_EX, "\n\t2.退出系统")
        print(Style.RESET_ALL)
        opt = input("\n\t输入操作编号:")
        if opt == "1":
            username = input("\n\t用户名:")
            password = getpass("\n\t密码:")
            # login() yields True only on success (False/None otherwise).
            if __user_service.login(username, password):
                role = __user_service.search_user_role(username)
                if role == "新闻编辑":
                    # The editor menu is only a placeholder in this script;
                    # show it once and return to the login screen.
                    _clear_screen()
                    print('test')
                    time.sleep(3)
                elif role == "管理员":
                    _admin_menu()
            else:
                print("\n\t登录失败(3秒自动返回)")
                time.sleep(3)
        elif opt == "2":
            sys.exit(0)


# The script starts the console as soon as the module is executed.
main()
测试
…



