直接上效果图
提交之后可以看到
点击SQL详情
点击“处理详情”可以看到记录下来的inception审核内容,然后可以使用手动、执行、打回功能,回滚功能暂时没写
结果如上都已经很清楚了,那么下面我们看实现过程,因为这是个人自己玩的项目,并没有上线,所以就本着简单的原则设计了下
django涉及的models几张表如下
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models
from utils.basemodels import baseModel
from crm.models import User
# Create your models here.
class Instance(models.Model):
    """A MySQL instance (host and port) with its role, cluster flag and environment."""

    ENVS = (('dev', '开发'), ('test', '测试'), ('prod', '生产'))

    name = models.CharField(max_length=100, verbose_name='实例名称')
    host = models.CharField(max_length=100, verbose_name='实例对应的IP地址')
    port = models.IntegerField(verbose_name='实例端口')
    create_user = models.ForeignKey(User, verbose_name='实例创建用户', on_delete=models.SET_NULL, blank=True, null=True)
    # NOTE(review): this is a CharField but the choice keys are integers, so a
    # value read back from the database ('1'/'0') will not match them — confirm
    # whether the keys or the field type should change.
    dbrole = models.CharField(max_length=100, choices=((1, 'master'), (0, 'slave')), verbose_name='主从角色关系')
    cluster = models.IntegerField(choices=((0, '单实例'), (1, '集群')), verbose_name='是否为集群0:否 1:是')
    env = models.CharField(max_length=100, choices=ENVS, verbose_name='实例环境')
    createtime = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
    updatetime = models.DateTimeField(auto_now=True, verbose_name='修改时间')
    remark = models.TextField(default='', null=True, blank=True, verbose_name='备注')

    def __str__(self):
        return self.name

    # Django only reads model options from an inner class named exactly
    # ``Meta``; the previous lowercase ``meta`` was ignored, so the table name,
    # verbose names and custom permissions below were never applied.
    class Meta:
        db_table = 'instance'
        verbose_name = 'MySQL实例表'
        verbose_name_plural = verbose_name
        managed = True
        permissions = (
            ('installsinglemysql', '安装单实例MySQL'),
            ('installmultimysql', '安装多实例MySQL'),
        )
class DbInfo(models.Model):
    """A database (schema) living on a given host/port, tagged with role and environment."""

    ENVS = (('dev', '开发'), ('test', '测试'), ('prod', '生产'))

    name = models.CharField(max_length=100, verbose_name='实例名称')
    host = models.CharField(max_length=100, verbose_name='实例对应的IP地址')
    port = models.IntegerField(verbose_name='实例端口')
    dbname = models.CharField(max_length=100, verbose_name='数据库名')
    # NOTE(review): CharField with integer choice keys — values loaded from the
    # database are strings, so get_dbrole_display() may not resolve the label.
    dbrole = models.CharField(max_length=100, choices=((1, 'master'), (0, 'slave')), verbose_name='主从角色关系')
    env = models.CharField(max_length=100, choices=ENVS, verbose_name='实例环境')
    createtime = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')

    def __str__(self):
        return '%s-%s-%s-%s' % (self.get_env_display(), self.dbname, self.get_dbrole_display(), self.host)

    # Must be spelled ``Meta``: Django ignores an inner class named ``meta``,
    # which left db_table and the verbose names without effect.
    class Meta:
        db_table = 'dbinfo'
        verbose_name = '数据库表'
        verbose_name_plural = verbose_name

    def get_env(self):
        """Return the human-readable label of ``env``."""
        return self.get_env_display()
from django.contrib.auth.models import Permission
class DbAssign(models.Model):
    """Associates a database with the auth permissions that are allowed to use it."""

    ENVS = (('dev', '开发'), ('test', '测试'), ('prod', '生产'))

    name = models.CharField(max_length=100, verbose_name='实例名称')
    host = models.CharField(max_length=100, verbose_name='实例对应的IP地址')
    port = models.IntegerField(verbose_name='实例端口')
    dbname = models.CharField(max_length=100, verbose_name='数据库名')
    dbrole = models.CharField(max_length=100, choices=(('master', '主'), ('slave', '从')), verbose_name='主从角色关系')
    env = models.CharField(max_length=100, choices=ENVS, verbose_name='实例环境')
    # Named ``group`` but points at django.contrib.auth Permission rows.
    group = models.ManyToManyField(Permission, verbose_name='可使用此数据库的组')

    def __str__(self):
        return self.dbname

    # Must be spelled ``Meta``: the lowercase ``meta`` used before is not
    # recognised by Django, so these options were silently dropped.
    class Meta:
        db_table = 'dbassign'
        verbose_name = '数据库权限分配'
        verbose_name_plural = verbose_name
class InceptSql(models.Model):
    """A SQL work order: the submitted script, its target database and its processing state."""

    STATUS = (
        (-3, u'已打回'),
        (-2, u'已回滚'),
        (-1, u'待执行'),
        (0, u'已执行'),
        (1, u'执行失败'),
        (2, u'已手工执行'),
    )
    ENVS = (('dev', '开发'), ('test', '测试'), ('prod', '生产'))

    db = models.ForeignKey(DbInfo, on_delete=models.CASCADE)
    commiter = models.ForeignKey(User, on_delete=models.CASCADE, verbose_name='代码提交人', related_name='commiter')
    sql_content = models.TextField(verbose_name='提交的SQL代码')
    env = models.CharField(max_length=8, choices=ENVS, verbose_name='SQL应用环境')
    treater = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE, verbose_name='代码执行人', related_name='treater')
    # New orders start as -1 (pending execution).
    status = models.IntegerField(default=-1, choices=STATUS)
    execute_errors = models.TextField(default='', null=True, blank=True)
    exe_affected_rows = models.CharField(max_length=10, null=True, blank=True)
    roll_affected_rows = models.CharField(max_length=10, null=True, blank=True)
    rollback_opid = models.TextField(null=True, blank=True)
    rollback_db = models.CharField(max_length=100, null=True, blank=True)
    inception_detail = models.TextField(default='', null=True, blank=True, verbose_name='inception详情')
    createtime = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
    updatetime = models.DateTimeField(auto_now=True, verbose_name='修改时间')
    remark = models.TextField(default='', null=True, blank=True, verbose_name='备注')

    # Must be spelled ``Meta``. With the previous lowercase ``meta`` Django
    # ignored this class, so none of the permissions below were registered.
    class Meta:
        db_table = 'inceptsql'
        verbose_name = '数据库Inception'
        verbose_name_plural = verbose_name
        managed = True
        permissions = (
            ('sqlcheck', 'MySQL脚本上线审核'),
            ('sqlexecute', 'MySQL脚本普通执行'),
            ('pt-osc', 'pt-osc执行'),
            ('gh-ost', 'gh-ost执行'),
            ('sqlrollback', 'MySQL脚本回退'),
            ('sqlmanual', 'MySQL脚本手工执行'),
        )

    def __str__(self):
        return '%s-%s-%s' % (self.id, self.get_env_display(), self.db)
class InceptStep(models.Model):
    """One processing step recorded against a SQL work order."""

    work_order = models.ForeignKey(
        InceptSql,
        on_delete=models.CASCADE,
    )
    user = models.ForeignKey(
        User,
        null=True,
        blank=True,
        on_delete=models.CASCADE,
        verbose_name='工单处理人员',
    )
    createtime = models.DateTimeField(
        auto_now_add=True,
        verbose_name='创建时间',
    )
    updatetime = models.DateTimeField(
        auto_now=True,
        verbose_name='修改时间',
    )
    remark = models.TextField(
        default='',
        null=True,
        blank=True,
        verbose_name='备注',
    )
目前dbassign这张表没有使用
数据库inception审核需要安装inception,但是由于inception需要安装很多组件,我安装了goInception,很好移植安装,安装完成之后按照配置要求启动goInception,链接
goinception
vue前端展示页面code:
在这里插入代码片{{scope.row.envdisplay}} SQL详情 {{scope.row.dbinfo}}{{ scope.row.dbinfo.substr(0,10)+'...' }}流程 手动 执行 打回 回滚 处理详情 用户:{{wf.username}}-{{ wf.remark }} [{{icr}}]
{{sqlcontent}} {{rollbackres}}
sqlcheck.vue:
请输入上线要上线的SQL语句:
选择执行条件:
审核 重置 SQL语句:{{cfr.sql}}-----报错信息:{{ cfr.error_message }}
goinception.js
import axios from '@/config/httpConfig'
/** POST `data` to the SQL review endpoint. */
export function InceptionCheck(data) {
  const url = '/inception/check/'
  return axios.post(url, data)
}
/** POST `data` to the execute action of the work order identified by `data.id`. */
export function InceptionExecute(data) {
  const url = `/inception/${data.id}/execute/`
  return axios.post(url, data)
}
/** POST `data` to the rollback action of the work order identified by `data.id`. */
export function InceptionRollback(data) {
  const url = `/inception/${data.id}/rollback/`
  return axios.post(url, data)
}
/** POST `data` to the manual action of the work order identified by `data.id`. */
export function InceptionManual(data) {
  const url = `/inception/${data.id}/manual/`
  return axios.post(url, data)
}
/** GET the work-order list endpoint. */
export function Inception() {
  const url = '/inception/'
  return axios.get(url)
}
/** GET a single work order by `id`. */
export function InceptionDetail(id) {
  const url = `/inception/${id}/`
  return axios.get(url)
}
/** POST `data` to the work-order step (workflow) endpoint. */
export function InceptionStepDetail(data) {
  const url = '/inceptstep/workflow/'
  return axios.post(url, data)
}
instance.js
import axios from '@/config/httpConfig'
/** GET the instance list endpoint. */
export function InstanceList() {
  const url = '/instance/'
  return axios.get(url)
}
/** GET a single instance by `id`. */
export function InstanceDetail(id) {
  const url = `/instance/${id}/`
  return axios.get(url)
}
/** POST `data` to the instance list endpoint. */
export function InstanceAdd(data) {
  const url = '/instance/'
  return axios.post(url, data)
}
/** Send DELETE for the instance identified by `id`. */
export function InstanceDel(id) {
  const url = `/instance/${id}/`
  return axios.delete(url)
}
/**
 * PUT `data` to update an instance.
 *
 * The back end registers the instance viewset on a DRF router, where PUT is
 * only routed on the detail URL (`/instance/<id>/`). When `data.id` is present
 * the request therefore targets that URL; without an id it falls back to the
 * original list URL so existing callers keep their behaviour.
 */
export function InstancePut(data) {
  const hasId = data !== null && data !== undefined && data.id !== null && data.id !== undefined
  const url = hasId ? '/instance/' + data.id + '/' : '/instance/'
  return axios.put(url, data)
}
/** POST `data` to the single-instance install endpoint. */
export function installsinglemysql(data) {
  const url = '/singleinstall/'
  return axios.post(url, data)
}
/** POST `data` to the multi-instance install endpoint. */
export function installmultimysql(data) {
  const url = '/multiinstall/'
  return axios.post(url, data)
}
后端django实现代码:
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.http import JsonResponse
from django.shortcuts import render
# Create your views here.
from sqlmng.serializers import InstanceModelSerializer,DbAssignModelSerializer,DbInfoModelSerializer,InceptSqlModelSerializer,InceptStepModelSerializer
from sqlmng.models import Instance,DbInfo,DbAssign,InceptSql,InceptStep
from utils.dbtools.mysql_single_install import full_single_install
from utils.dbtools.mysql_multi_install import full_multil_install
from utils.dbtools.MySQLdb import MysqlHelper
from crm.models import User
from django.db.models.signals import post_save
from django.dispatch import receiver
import configparser
cnf = configparser.ConfigParser()
from restful_test.settings import INCEPTION_DIR
cnf.read(INCEPTION_DIR)
# Every setting below comes from the [inception] section of the file at
# INCEPTION_DIR; the options are read in the order they are listed.
(
    inception_host,
    inception_backup_host,
    inception_backup_port,
    inception_backup_user,
    inception_backup_password,
) = (
    cnf.get('inception', option)
    for option in (
        'inception_host',
        'inception_backup_host',
        'inception_backup_port',
        'inception_backup_user',
        'inception_backup_password',
    )
)
from django.core.exceptions import PermissionDenied
from rest_framework.response import Response
from utils.jwt_auth import parse_payload
from rest_framework.decorators import action
from rest_framework.viewsets import ModelViewSet
from rest_framework.views import APIView
import pymysql
class InstanceModelViewSet(ModelViewSet):
    """Model viewset over all ``Instance`` rows, serialized with ``InstanceModelSerializer``."""

    serializer_class = InstanceModelSerializer
    queryset = Instance.objects.all()
class DbInfoModelViewSet(ModelViewSet):
    """Model viewset over all ``DbInfo`` rows, declaring name/host/env as filter fields."""

    serializer_class = DbInfoModelSerializer
    queryset = DbInfo.objects.all()
    filter_fields = (
        'name',
        'host',
        'env',
    )
import json
class InceptSqlModelViewSet(ModelViewSet):
    """Work-order endpoints plus the goInception actions.

    Besides the standard model routes this viewset exposes four POST actions:
    ``check`` (list route) and ``execute``, ``rollback``, ``manual`` (detail
    routes).
    """

    queryset = InceptSql.objects.all()
    serializer_class = InceptSqlModelSerializer

    # Environment label sent by the front end -> key stored in InceptSql.env.
    _ENV_CODES = {'开发': 'dev', '测试': 'test', '生产': 'prod'}

    @staticmethod
    def _has_perms(request, perms):
        """Return True when the request's user holds every ``app_label.codename`` in ``perms``.

        An anonymous request is first resolved to a user from the
        ``Authorization`` header via ``parse_payload``.
        """
        if request.user.is_anonymous:
            # Request headers live in ``META`` (upper case); ``request.meta``
            # does not exist and raised AttributeError.
            token = request.META.get('HTTP_AUTHORIZATION', '')
            request.user = parse_payload(token)
        pairs = request.user.roles.values_list(
            'permissions__permission__content_type__app_label',
            'permissions__permission__codename')
        granted = {'.'.join(pair) for pair in pairs if pair[0] is not None}
        return set(perms) <= granted

    @staticmethod
    def _build_inception_sql(host, port, dbname, sqlcontent):
        """Wrap ``sqlcontent`` in the inception_magic_start/commit envelope for ``dbname``."""
        # Explicit indexes are required: the template used to have two
        # anonymous placeholders fed with four arguments, so ``use`` received
        # the host and the statement slot received the port.
        # NOTE(review): host and port are still not referenced by the template.
        # goInception presumably expects the target connection in a leading
        # /*--user=...;--password=...;--host=...;--port=...;*/ option comment,
        # which is absent here — confirm and restore it with real credentials.
        return '''
inception_magic_start;
use {2};
{3}
inception_magic_commit;'''.format(host, port, dbname, sqlcontent)

    @staticmethod
    def _run_inception(sql):
        """Send ``sql`` to the inception server and return the result rows as dicts.

        The cursor and the connection are closed even when the statement fails.
        """
        conn = pymysql.connect(host='%s' % (inception_host), user='', passwd='',
                               db='', port=4000, charset="utf8mb4")
        try:
            cur = conn.cursor()
            try:
                cur.execute(sql)
                rows = cur.fetchall()
                columns = [col[0] for col in cur.description]
            finally:
                cur.close()
        finally:
            conn.close()
        return [dict(zip(columns, row)) for row in rows]

    @staticmethod
    def _has_errors(res_list):
        """Return True when any inception result row has a non-zero ``error_level``."""
        return any(row['error_level'] != 0 for row in res_list)

    @action(detail=False, methods=['POST'])
    def check(self, request):
        """Review the submitted SQL and, if it passes, store it as a new work order.

        Responds with ``{'status': 0}`` on success, ``{'status': 1,
        'failreason': [...]}`` when the review reports errors, and
        ``{'status': 1}`` when the work order fails serializer validation.
        """
        data = request.data
        print("data的值是", data)
        dbtag = data['dbtag']
        host = dbtag['host']
        port = dbtag['port']
        dbname = dbtag['dbname']
        db = dbtag['db']
        # Unknown labels map to '' exactly as before.
        env = self._ENV_CODES.get(data['env'], '')
        comment = data['comment']
        userid = data['userid']
        # Kept from the original: raises if the submitting user does not exist.
        User.objects.get(id=userid)
        sqlcontent = data['sqlcontent']

        res_list = self._run_inception(
            self._build_inception_sql(host, port, dbname, sqlcontent))
        print(res_list)
        if self._has_errors(res_list):
            return Response({'status': 1, 'failreason': res_list})

        sdata = {'db': db, 'commiter': userid, 'sql_content': sqlcontent,
                 'env': env, 'remark': comment, 'inception_detail': json.dumps(res_list)}
        serializer = InceptSqlModelSerializer(data=sdata)
        if serializer.is_valid():
            serializer.save()
            return Response({'status': 0})
        print(serializer.errors)
        return Response({'status': 1})

    @action(detail=True, methods=['POST'])
    def execute(self, request, pk):
        """Run the work order's SQL through inception; requires ``sqlmng.sqlexecute``.

        Raises ``PermissionDenied`` when the user lacks the permission.
        """
        if not self._has_perms(request, ('sqlmng.sqlexecute',)):
            raise PermissionDenied
        obj = self.get_object()
        print(pk, obj.sql_content, obj.pk)
        data = request.data
        userid = data['userid']
        remark = data['remark']

        res_list = self._run_inception(self._build_inception_sql(
            obj.db.host, obj.db.port, obj.db.dbname, obj.sql_content))
        print(res_list)
        if self._has_errors(res_list):
            return Response({'status': 1})

        serializer = InceptStepModelSerializer(
            data={'work_order': obj.pk, 'user': userid, 'remark': remark})
        step_ok = serializer.is_valid()
        if step_ok:
            serializer.save()
        else:
            print(serializer.errors)
        # The order is updated in both cases; only the status differs
        # (0 = executed, 1 = failed).
        status_code = 0 if step_ok else 1
        obj.treater = User.objects.get(id=userid)
        obj.inception_detail = json.dumps(res_list)
        obj.status = status_code
        obj.save()
        return Response({'status': status_code})

    @action(detail=True, methods=['POST'])
    def rollback(self, request, pk):
        """Collect the rollback statements inception backed up for this work order.

        Responds with the statements as text, each preceded by a newline, or
        with a fixed error message if anything goes wrong while collecting them.
        """
        data = request.data
        print(type(data['inception_detail']))
        try:
            backup_list = [
                {'sequence': item['sequence'], 'backup_dbname': item['backup_dbname']}
                for item in data['inception_detail'] if item['backup_dbname']
            ]
            dbname = backup_list[0]['backup_dbname']
            condition = [item['sequence'] for item in backup_list]
            print(condition)
            mydb = MysqlHelper(inception_backup_host, inception_backup_user,
                               inception_backup_password, dbname, inception_backup_port)
            sql = '''
select opid_time,tablename from $_$inception_backup_information$_$;
'''
            res = mydb.get_all(sql)
            res_filter = [row for row in res if row[0] in condition]
            print(res_filter)
            # NOTE(review): values are interpolated straight into the SQL text;
            # they come from the backup database, not from the request.
            sqlset = ['''
select b.rollback_statement from $_$inception_backup_information$_$ a,%s b where a.opid_time='%s' and a.opid_time=b.opid_time and a.tablename='%s';
''' % (row[1], row[0], row[1]) for row in res_filter]
            resset = [mydb.get_all(stam) for stam in sqlset]
            print(len(resset))
            # Each statement goes on its own line; the separator used to be the
            # literal letter 'n', which glued an "n" onto every statement.
            sqlres = ''.join('\n' + row[0] for rows in resset for row in rows)
            return Response(sqlres)
        except Exception as e:
            # Still best-effort, but no longer silent about the cause.
            print(e)
            return Response("回滚语句生成失败,请检查是否进行备份,联系DBA大数据开发组")

    @action(detail=True, methods=['POST'])
    def manual(self, request, pk):
        """Mark the work order as manually executed; requires ``sqlmng.sqlmanual``.

        Raises ``PermissionDenied`` when the user lacks the permission.
        """
        if not self._has_perms(request, ('sqlmng.sqlmanual',)):
            raise PermissionDenied
        data = request.data
        print(data)
        obj = self.get_object()
        userid = data['userid']
        remark = data['remark']
        serializer = InceptStepModelSerializer(
            data={'work_order': obj.pk, 'user': userid, 'remark': remark})
        # The order is flagged before the step is validated, as in the original.
        obj.treater = User.objects.get(id=userid)
        obj.status = 2
        obj.save()
        if serializer.is_valid():
            serializer.save()
            return Response({'status': 0})
        return Response({'status': 1})
@receiver(post_save, sender=InceptSql)
def inceptionflow_handler(sender, **kwargs):
    """Record an ``InceptStep`` every time an ``InceptSql`` row is saved.

    The step is attributed to the order's treater when one is set, otherwise
    to its commiter. Validation errors are printed, not raised.
    """
    order = kwargs.get('instance')
    commiter_id = order.commiter.id
    treater = order.treater
    # Both branches must hand the serializer a primary key. The treater branch
    # used to pass the User object itself while the commiter branch passed an id.
    # NOTE(review): views that create a step and then save the order will now
    # also get a step from this handler — confirm that is intended.
    user_id = treater.id if treater else commiter_id
    sdata = {'work_order': order.id, 'user': user_id, 'remark': order.remark}
    serializer = InceptStepModelSerializer(data=sdata)
    if serializer.is_valid():
        serializer.save()
    else:
        print(serializer.errors)
# print(kwargs.get('instance').id,kwargs.get('instance').sql_content)
class InceptStepModelViewSet(ModelViewSet):
    """Model viewset over ``InceptStep`` with an extra ``workflow`` list action."""

    serializer_class = InceptStepModelSerializer
    queryset = InceptStep.objects.all()

    @action(detail=False, methods=['POST'])
    def workflow(self, request):
        """Respond with the steps of the posted ``work_order``, newest first."""
        payload = request.data
        print(payload)
        order_id = payload['work_order']
        steps = (
            self.get_queryset()
            .filter(work_order=order_id)
            .order_by('-createtime')
        )
        serialized = InceptStepModelSerializer(instance=steps, many=True)
        return Response(serialized.data)
class DbAssignModelViewSet(ModelViewSet):
    """Model viewset over all ``DbAssign`` rows, serialized with ``DbAssignModelSerializer``."""

    serializer_class = DbAssignModelSerializer
    queryset = DbAssign.objects.all()
class SingleInstanceInstall(APIView):
    """Install a single MySQL instance; gated by ``sqlmng.installsinglemysql``."""

    permission_classes = ()

    @staticmethod
    def _granted_perms(user):
        """Return the ``app_label.codename`` strings reachable through ``user.roles``."""
        pairs = user.roles.values_list(
            'permissions__permission__content_type__app_label',
            'permissions__permission__codename')
        return {'.'.join(pair) for pair in pairs if pair[0] is not None}

    def post(self, request):
        """Run the single-instance install for the posted host/port and echo the payload.

        Raises ``PermissionDenied`` when the user lacks the permission.
        """
        if request.user.is_anonymous:
            # Request headers live in ``META`` (upper case); ``request.meta``
            # does not exist and raised AttributeError.
            token = request.META.get('HTTP_AUTHORIZATION', '')
            request.user = parse_payload(token)
        perms = ('sqlmng.installsinglemysql',)
        if not set(perms) <= self._granted_perms(request.user):
            raise PermissionDenied
        res = request.data
        print('res的结果结果集是:', res)
        full_single_install(res['host'], res['port'])
        return Response(res)

    def get(self, request):
        """Report whether the current user may call the install endpoint.

        Raises ``PermissionDenied`` when the user lacks the permission.
        """
        print(request.user)
        print(request.user.has_perm('sqlmng.installsinglemysql'))
        perms = ('sqlmng.installsinglemysql',)
        if not set(perms) <= self._granted_perms(request.user):
            # Raising lets the 403 handler produce the response.
            raise PermissionDenied
        return Response("可以调用installsinglemysql方法")
class MultiInstanceInstall(APIView):
    """Install a master/slave MySQL pair; gated by ``sqlmng.installmultimysql``."""

    permission_classes = ()

    @staticmethod
    def _granted_perms(user):
        """Return the ``app_label.codename`` strings reachable through ``user.roles``."""
        pairs = user.roles.values_list(
            'permissions__permission__content_type__app_label',
            'permissions__permission__codename')
        return {'.'.join(pair) for pair in pairs if pair[0] is not None}

    def post(self, request):
        """Run the multi-instance install for the posted master/slave/port and echo the payload.

        Raises ``PermissionDenied`` when the user lacks the permission.
        """
        if request.user.is_anonymous:
            # Request headers live in ``META`` (upper case); ``request.meta``
            # does not exist and raised AttributeError.
            token = request.META.get('HTTP_AUTHORIZATION', '')
            request.user = parse_payload(token)
        perms = ('sqlmng.installmultimysql',)
        if not set(perms) <= self._granted_perms(request.user):
            raise PermissionDenied
        res = request.data
        print('res的结果结果集是:', res)
        full_multil_install(res['master'], res['slave'], res['port'])
        return Response(res)

    def get(self, request):
        """Report whether the current user may call the install endpoint.

        Raises ``PermissionDenied`` when the user lacks the permission.
        """
        print(request.user)
        print(request.user.has_perm('sqlmng.installmultimysql'))
        perms = ('sqlmng.installmultimysql',)
        if not set(perms) <= self._granted_perms(request.user):
            # Raising lets the 403 handler produce the response.
            raise PermissionDenied
        return Response("可以调用installmultimysql方法")
在restframework的基础上,我们需要自己定义一些url作为api接口使用,action的价值就体现在这里:它既可以基于detail(针对单个对象),也可以不基于detail(针对列表)。receiver可以理解成触发器,当某种事件被触发时会自动执行
url.py
from django.conf.urls import url, include
from rest_framework import routers
from .views import (
    DbInfoModelViewSet,
    DbAssignModelViewSet,
    InstanceModelViewSet,
    SingleInstanceInstall,
    MultiInstanceInstall,
    InceptSqlModelViewSet,
    InceptStepModelViewSet,
)

# Viewsets are exposed through the router under the prefixes below.
router = routers.DefaultRouter()
router.register('dbinfo', DbInfoModelViewSet)
router.register('dbassign', DbAssignModelViewSet)
router.register('instance', InstanceModelViewSet)
router.register('inception', InceptSqlModelViewSet)
router.register('inceptstep', InceptStepModelViewSet)

# Router URLs come first; the two install views are plain APIViews and are
# wired by hand.
urlpatterns = [
    url(r'^', include(router.urls)),
    url('^singleinstall', SingleInstanceInstall.as_view()),
    url('^multiinstall', MultiInstanceInstall.as_view()),
]
以上为这个项目实施的关键代码



