栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

MySQL审核神器Inception

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

MySQL审核神器Inception

直接上效果图

提交之后可以看到

点击SQL详情

点击“处理详情”可以看到记录下来的inception审核内容,之后可以点击“手动”、“执行”、“打回”等功能按钮;回滚功能暂时还没有写。

结果如上都已经很清楚了,那么下面我们看实现过程,因为这是个人自己玩的项目,并没有上线,所以就本着简单的原则设计了下
django涉及的models几张表如下

# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.db import models
from utils.basemodels import  baseModel
from crm.models import  User

# Create your models here.

class Instance(models.Model):
    """Django model for one MySQL instance: address, replication role, cluster flag and environment.

    The custom permissions declared in ``Meta`` are the ones the install
    views check for (``sqlmng.installsinglemysql`` / ``sqlmng.installmultimysql``).
    """
    ENVS = (('dev', '开发'), ('test', '测试'), ('prod', '生产'))

    name = models.CharField(max_length=100, verbose_name='实例名称')
    host = models.CharField(max_length=100, verbose_name='实例对应的IP地址')
    port = models.IntegerField(verbose_name='实例端口')
    # SET_NULL keeps the instance row when the creating user is deleted.
    create_user = models.ForeignKey(User, verbose_name='实例创建用户', on_delete=models.SET_NULL, blank=True, null=True)
    # NOTE(review): the choice keys are ints while the column is a CharField —
    # confirm which representation is actually stored before relying on get_dbrole_display().
    dbrole = models.CharField(max_length=100, choices=((1, 'master'), (0, 'slave')), verbose_name='主从角色关系')
    cluster = models.IntegerField(choices=((0, '单实例'), (1, '集群')), verbose_name='是否为集群0:否 1:是')
    env = models.CharField(max_length=100, choices=ENVS, verbose_name='实例环境')
    createtime = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
    updatetime = models.DateTimeField(auto_now=True, verbose_name='修改时间')
    remark = models.TextField(default='', null=True, blank=True, verbose_name='备注')

    def __str__(self):
        """Return the instance name as its display label."""
        return self.name

    # Django only recognises an inner class spelled exactly ``Meta``; with the
    # lowercase spelling the table name and permissions below were ignored.
    class Meta:
        db_table = 'instance'
        verbose_name = 'MySQL实例表'
        verbose_name_plural = verbose_name
        managed = True
        permissions = (('installsinglemysql', '安装单实例MySQL'),
                       ('installmultimysql', '安装多实例MySQL'),
                       )

class DbInfo(models.Model):
    """Django model for one database (schema) on a MySQL host, tagged with role and environment."""
    ENVS = (('dev', '开发'), ('test', '测试'), ('prod', '生产'))

    name = models.CharField(max_length=100, verbose_name='实例名称')
    host = models.CharField(max_length=100, verbose_name='实例对应的IP地址')
    port = models.IntegerField(verbose_name='实例端口')
    dbname = models.CharField(max_length=100, verbose_name='数据库名')
    # NOTE(review): int choice keys on a CharField — confirm the stored representation.
    dbrole = models.CharField(max_length=100, choices=((1, 'master'), (0, 'slave')), verbose_name='主从角色关系')
    env = models.CharField(max_length=100, choices=ENVS, verbose_name='实例环境')
    createtime = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')

    def __str__(self):
        """Return '<env label>-<dbname>-<role label>-<host>'."""
        return '%s-%s-%s-%s' % (self.get_env_display(), self.dbname, self.get_dbrole_display(), self.host)

    # Must be spelled ``Meta`` for Django to apply these options.
    class Meta:
        db_table = 'dbinfo'
        verbose_name = '数据库表'
        verbose_name_plural = verbose_name

    def get_env(self):
        """Return the human-readable label of ``env``."""
        return self.get_env_display()



from django.contrib.auth.models import  Permission
class DbAssign(models.Model):
    """Django model linking a database to the ``Permission`` rows allowed to use it."""
    ENVS = (('dev', '开发'), ('test', '测试'), ('prod', '生产'))

    name = models.CharField(max_length=100, verbose_name='实例名称')
    host = models.CharField(max_length=100, verbose_name='实例对应的IP地址')
    port = models.IntegerField(verbose_name='实例端口')
    dbname = models.CharField(max_length=100, verbose_name='数据库名')
    dbrole = models.CharField(max_length=100, choices=(('master', '主'), ('slave', '从')), verbose_name='主从角色关系')
    env = models.CharField(max_length=100, choices=ENVS, verbose_name='实例环境')
    group = models.ManyToManyField(Permission, verbose_name='可使用此数据库的组')

    def __str__(self):
        """Return the database name as the display label."""
        return self.dbname

    # Must be spelled ``Meta`` for Django to apply these options.
    class Meta:
        db_table = 'dbassign'
        verbose_name = '数据库权限分配'
        verbose_name_plural = verbose_name


class InceptSql(models.Model):
    """Django model for one SQL work order submitted for Inception review.

    ``status`` follows the ``STATUS`` choices and defaults to -1. The custom
    permissions declared in ``Meta`` are the ones the views check for
    (e.g. ``sqlmng.sqlexecute`` and ``sqlmng.sqlmanual``).
    """
    STATUS = (
        (-3, u'已打回'),
        (-2, u'已回滚'),
        (-1, u'待执行'),
        (0, u'已执行'),
        (1, u'执行失败'),
        (2, u'已手工执行'),
    )
    ENVS = (('dev', '开发'), ('test', '测试'), ('prod', '生产'))

    db = models.ForeignKey(DbInfo, on_delete=models.CASCADE)
    commiter = models.ForeignKey(User, on_delete=models.CASCADE, verbose_name='代码提交人', related_name='commiter')
    sql_content = models.TextField(verbose_name='提交的SQL代码')
    env = models.CharField(max_length=8, choices=ENVS, verbose_name='SQL应用环境')
    # Empty until somebody executes or manually handles the work order.
    treater = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE, verbose_name='代码执行人', related_name='treater')
    status = models.IntegerField(default=-1, choices=STATUS)
    execute_errors = models.TextField(default='', null=True, blank=True)
    exe_affected_rows = models.CharField(max_length=10, null=True, blank=True)
    roll_affected_rows = models.CharField(max_length=10, null=True, blank=True)
    rollback_opid = models.TextField(null=True, blank=True)
    rollback_db = models.CharField(max_length=100, null=True, blank=True)
    inception_detail = models.TextField(default='', null=True, blank=True, verbose_name='inception详情')
    createtime = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
    updatetime = models.DateTimeField(auto_now=True, verbose_name='修改时间')
    remark = models.TextField(default='', null=True, blank=True, verbose_name='备注')

    # Must be spelled ``Meta``; the lowercase spelling meant none of these
    # permissions were ever created, so permission checks could not succeed.
    class Meta:
        db_table = 'inceptsql'
        verbose_name = '数据库Inception'
        verbose_name_plural = verbose_name
        managed = True
        permissions = (('sqlcheck', 'MySQL脚本上线审核'),
                       ('sqlexecute', 'MySQL脚本普通执行'),
                       ('pt-osc', 'pt-osc执行'),
                       ('gh-ost', 'gh-ost执行'),
                       ('sqlrollback', 'MySQL脚本回退'),
                       ('sqlmanual', 'MySQL脚本手工执行'),)

    def __str__(self):
        """Return '<id>-<env label>-<db>'."""
        return '%s-%s-%s' % (self.id, self.get_env_display(), self.db)

class InceptStep(models.Model):
    """One processing step recorded against an ``InceptSql`` work order."""

    work_order = models.ForeignKey(
        InceptSql,
        on_delete=models.CASCADE,
    )
    user = models.ForeignKey(
        User,
        null=True,
        blank=True,
        on_delete=models.CASCADE,
        verbose_name='工单处理人员',
    )
    createtime = models.DateTimeField(
        auto_now_add=True,
        verbose_name='创建时间',
    )
    updatetime = models.DateTimeField(
        auto_now=True,
        verbose_name='修改时间',
    )
    remark = models.TextField(
        default='',
        null=True,
        blank=True,
        verbose_name='备注',
    )

目前dbassign这张表没有使用
SQL审核依赖inception服务,但是原版inception需要安装很多组件,所以我改用了goInception,它移植和安装都很方便。安装完成之后,按照配置要求启动goInception即可,链接:
goinception

vue前端展示页面code:

在这里插入代码片


sqlcheck.vue:






goinception.js

import axios from '@/config/httpConfig'

/**
 * POST `data` to the Inception check endpoint.
 * @param {Object} data request body
 * @returns whatever `axios.post` returns
 */
export function InceptionCheck(data){
    const endpoint = '/inception/check/'
    return axios.post(endpoint, data)
}

/**
 * POST `data` to the execute endpoint of the work order identified by `data.id`.
 * @param {Object} data request body; `data.id` selects the work order
 * @returns whatever `axios.post` returns
 */
export function InceptionExecute(data){
    const endpoint = `/inception/${data.id}/execute/`
    return axios.post(endpoint, data)
}

/**
 * POST `data` to the rollback endpoint of the work order identified by `data.id`.
 * @param {Object} data request body; `data.id` selects the work order
 * @returns whatever `axios.post` returns
 */
export function InceptionRollback(data){
    const endpoint = `/inception/${data.id}/rollback/`
    return axios.post(endpoint, data)
}

/**
 * POST `data` to the manual endpoint of the work order identified by `data.id`.
 * @param {Object} data request body; `data.id` selects the work order
 * @returns whatever `axios.post` returns
 */
export function InceptionManual(data){
    const endpoint = `/inception/${data.id}/manual/`
    return axios.post(endpoint, data)
}


/**
 * GET the work-order collection endpoint.
 * @returns whatever `axios.get` returns
 */
export function Inception(){
    const endpoint = '/inception/'
    return axios.get(endpoint)
}

/**
 * GET a single work order.
 * @param id identifier placed in the URL path
 * @returns whatever `axios.get` returns
 */
export function InceptionDetail(id){
    const endpoint = `/inception/${id}/`
    return axios.get(endpoint)
}

/**
 * POST `data` to the step-workflow endpoint.
 * @param {Object} data request body
 * @returns whatever `axios.post` returns
 */
export function InceptionStepDetail(data){
    const endpoint = '/inceptstep/workflow/'
    return axios.post(endpoint, data)
}


instance.js

import axios from '@/config/httpConfig'

/**
 * GET the instance collection endpoint.
 * @returns whatever `axios.get` returns
 */
export function InstanceList(){
    const endpoint = '/instance/'
    return axios.get(endpoint)
}

/**
 * GET a single instance.
 * @param id identifier placed in the URL path
 * @returns whatever `axios.get` returns
 */
export function InstanceDetail(id){
    const endpoint = `/instance/${id}/`
    return axios.get(endpoint)
}

/**
 * POST `data` to the instance collection endpoint.
 * @param {Object} data request body
 * @returns whatever `axios.post` returns
 */
export function InstanceAdd(data){
    const endpoint = '/instance/'
    return axios.post(endpoint, data)
}
/**
 * Send a DELETE request for a single instance.
 * @param id identifier placed in the URL path
 * @returns whatever `axios.delete` returns
 */
export function InstanceDel(id){
    const endpoint = `/instance/${id}/`
    return axios.delete(endpoint)
}
/**
 * PUT `data` to the detail endpoint of the instance identified by `data.id`.
 *
 * The backend registers `/instance/` through a DRF router, which accepts PUT
 * only on the detail route (`/instance/<id>/`), so the id must be in the path.
 * NOTE(review): assumes the payload carries the record's `id` — confirm against the caller.
 *
 * @param {Object} data request body; `data.id` selects the instance
 * @returns whatever `axios.put` returns
 */
export function InstancePut(data){
    return axios.put('/instance/'+data.id+'/',data)
}

/**
 * POST `data` to the single-instance install endpoint.
 * @param {Object} data request body
 * @returns whatever `axios.post` returns
 */
export function installsinglemysql(data){
    const endpoint = '/singleinstall/'
    return axios.post(endpoint, data)
}

/**
 * POST `data` to the multi-instance install endpoint.
 * @param {Object} data request body
 * @returns whatever `axios.post` returns
 */
export function installmultimysql(data){
    const endpoint = '/multiinstall/'
    return axios.post(endpoint, data)
}

后端django实现代码:

# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.http import JsonResponse
from django.shortcuts import render

# Create your views here.

from sqlmng.serializers import  InstanceModelSerializer,DbAssignModelSerializer,DbInfoModelSerializer,InceptSqlModelSerializer,InceptStepModelSerializer
from sqlmng.models import Instance,DbInfo,DbAssign,InceptSql,InceptStep
from utils.dbtools.mysql_single_install import full_single_install
from utils.dbtools.mysql_multi_install import  full_multil_install
from utils.dbtools.MySQLdb import MysqlHelper
from crm.models import  User
from django.db.models.signals import post_save
from django.dispatch import receiver
import configparser

# Read the [inception] section of the INI file whose path is given by
# INCEPTION_DIR; this runs once, at import time.
cnf = configparser.ConfigParser()
from  restful_test.settings import INCEPTION_DIR
cnf.read(INCEPTION_DIR)
# Host passed to pymysql.connect() when talking to the Inception service.
inception_host=cnf.get('inception','inception_host')
# Connection settings handed to MysqlHelper when reading the backup database.
inception_backup_host=cnf.get('inception','inception_backup_host')
inception_backup_port=cnf.get('inception','inception_backup_port')
inception_backup_user=cnf.get('inception','inception_backup_user')
inception_backup_password=cnf.get('inception','inception_backup_password')
from django.core.exceptions import PermissionDenied

from rest_framework.response import Response
from utils.jwt_auth import  parse_payload
from rest_framework.decorators import action

from rest_framework.viewsets import  ModelViewSet
from rest_framework.views import  APIView

import pymysql

class InstanceModelViewSet(ModelViewSet):
    """ModelViewSet over all ``Instance`` rows, serialized with ``InstanceModelSerializer``."""

    serializer_class = InstanceModelSerializer
    queryset = Instance.objects.all()

class DbInfoModelViewSet(ModelViewSet):
    """ModelViewSet over all ``DbInfo`` rows; declares name/host/env as filter fields."""

    serializer_class = DbInfoModelSerializer
    queryset = DbInfo.objects.all()
    filter_fields = (
        'name',
        'host',
        'env',
    )

import  json
def _incept_options(host, port, mode):
    """Build the option header that tells Inception which MySQL server to target.

    NOTE(review): the published listing passes host and port to ``.format()``
    but contains no placeholder for them, so the header line appears to have
    been lost. The target credentials are read from the optional
    ``inception_target_user`` / ``inception_target_password`` keys of the
    ``[inception]`` config section (empty when absent) — confirm the key names
    and the credentials used in the deployment.
    """
    user = cnf.get('inception', 'inception_target_user', fallback='')
    password = cnf.get('inception', 'inception_target_password', fallback='')
    return '--user={};--password={};--host={};--port={};{}'.format(
        user, password, host, port, mode)


def _incept_run(options, dbname, sql_content):
    """Send one statement batch to the Inception service; return its result rows as dicts.

    The connection and cursor are always closed, even when the query raises.
    """
    sql = '''
        /*{}*/
        inception_magic_start;
        use {};
       {}
        inception_magic_commit;'''.format(options, dbname, sql_content)
    conn = pymysql.connect(host='%s' % (inception_host), user='', passwd='',
                           db='', port=4000, charset="utf8mb4")
    try:
        cur = conn.cursor()
        try:
            cur.execute(sql)
            rows = cur.fetchall()
            columns = [col[0] for col in cur.description]
        finally:
            cur.close()
    finally:
        conn.close()
    return [dict(zip(columns, row)) for row in rows]


def _incept_has_permission(request, required):
    """Return True when the requesting user's roles grant ``required`` ('app_label.codename').

    Anonymous requests are first resolved to a user from the Authorization
    header via ``parse_payload``; the resolved user replaces ``request.user``.
    """
    if request.user.is_anonymous:
        # Django exposes request headers on ``META`` (upper case).
        token = request.META.get('HTTP_AUTHORIZATION', '')
        request.user = parse_payload(token)
    pairs = request.user.roles.values_list('permissions__permission__content_type__app_label',
                                           'permissions__permission__codename')
    granted = {'.'.join(pair) for pair in pairs if pair[0] is not None}
    return required in granted


class InceptSqlModelViewSet(ModelViewSet):
    """CRUD endpoints for SQL work orders plus the check / execute / rollback / manual actions."""
    queryset = InceptSql.objects.all()
    serializer_class = InceptSqlModelSerializer

    @action(detail=False, methods=['POST'])
    def check(self, request):
        """Audit the submitted SQL with Inception and, if it is clean, store it as a work order.

        Responds with ``{'status': 0}`` when the order was saved,
        ``{'status': 1, 'failreason': rows}`` when Inception reported a
        non-zero ``error_level``, and ``{'status': 1}`` when the serializer
        rejected the payload.
        """
        data = request.data
        target = data['dbtag']
        host = target['host']
        port = target['port']
        dbname = target['dbname']
        db = target['db']
        # Map the display label sent by the front end back to the stored key.
        env = {'开发': 'dev', '测试': 'test', '生产': 'prod'}.get(data['env'], '')
        comment = data['comment']
        userid = data['userid']
        sqlcontent = data['sqlcontent']

        res_list = _incept_run(_incept_options(host, port, '--check=1;'), dbname, sqlcontent)
        if any(row['error_level'] != 0 for row in res_list):
            return Response({'status': 1, 'failreason': res_list})

        sdata = {'db': db, 'commiter': userid, 'sql_content': sqlcontent,
                 'env': env, 'remark': comment, 'inception_detail': json.dumps(res_list)}
        serializer = InceptSqlModelSerializer(data=sdata)
        if serializer.is_valid():
            serializer.save()
            return Response({'status': 0})
        print(serializer.errors)
        return Response({'status': 1})

    @action(detail=True, methods=['POST'])
    def execute(self, request, pk):
        """Run the work order's SQL through Inception; requires ``sqlmng.sqlexecute``.

        Raises ``PermissionDenied`` when the user lacks the permission.
        Responds with ``{'status': 0}`` on success, otherwise ``{'status': 1}``.
        """
        if not _incept_has_permission(request, 'sqlmng.sqlexecute'):
            raise PermissionDenied
        obj = self.get_object()
        data = request.data
        userid = data['userid']
        remark = data['remark']

        # Backup is requested because the rollback action reads Inception's backup tables.
        options = _incept_options(obj.db.host, obj.db.port, '--execute=1;--backup=1;')
        res_list = _incept_run(options, obj.db.dbname, obj.sql_content)
        if any(row['error_level'] != 0 for row in res_list):
            return Response({'status': 1})

        serializer = InceptStepModelSerializer(
            data={'work_order': obj.pk, 'user': userid, 'remark': remark})
        step_saved = serializer.is_valid()
        if step_saved:
            serializer.save()
        else:
            print(serializer.errors)
        obj.treater = User.objects.get(id=userid)
        obj.inception_detail = json.dumps(res_list)
        # The order is marked failed (1) when the step record could not be saved.
        obj.status = 0 if step_saved else 1
        obj.save()
        return Response({'status': 0 if step_saved else 1})

    @action(detail=True, methods=['POST'])
    def rollback(self, request, pk):
        """Collect the rollback statements Inception backed up for this work order.

        Responds with the statements as one newline-separated string, or with
        a fixed failure message when anything goes wrong.
        NOTE(review): unlike execute/manual this action performs no permission
        check although a ``sqlrollback`` permission is declared — confirm intent.
        """
        data = request.data
        try:
            backed_up = [step for step in data['inception_detail'] if step['backup_dbname']]
            dbname = backed_up[0]['backup_dbname']
            sequences = {step['sequence'] for step in backed_up}
            mydb = MysqlHelper(inception_backup_host, inception_backup_user,
                               inception_backup_password, dbname, inception_backup_port)
            sql = '''
            select opid_time,tablename from $_$inception_backup_information$_$;
            '''
            matches = [row for row in mydb.get_all(sql) if row[0] in sequences]
            sqlset = ['''
                select b.rollback_statement  from $_$inception_backup_information$_$ a,%s b where a.opid_time='%s' and a.opid_time=b.opid_time and a.tablename='%s';
                ''' % (row[1], row[0], row[1]) for row in matches]
            resset = [mydb.get_all(stmt) for stmt in sqlset]
            # Each statement goes on its own line (the listing appended a literal 'n').
            sqlres = ''.join('\n' + row[0] for rows in resset for row in rows)
            return Response(sqlres)
        except Exception as exc:
            # Best effort: any failure is reported to the client as one message.
            print(exc)
            return Response("回滚语句生成失败,请检查是否进行备份,联系DBA大数据开发组")

    @action(detail=True, methods=['POST'])
    def manual(self, request, pk):
        """Mark the work order as manually executed; requires ``sqlmng.sqlmanual``.

        Raises ``PermissionDenied`` when the user lacks the permission.
        Responds with ``{'status': 0}`` when the step record was saved,
        otherwise ``{'status': 1}``.
        """
        if not _incept_has_permission(request, 'sqlmng.sqlmanual'):
            raise PermissionDenied
        data = request.data
        obj = self.get_object()
        userid = data['userid']
        remark = data['remark']
        serializer = InceptStepModelSerializer(
            data={'work_order': obj.pk, 'user': userid, 'remark': remark})
        obj.treater = User.objects.get(id=userid)
        obj.status = 2
        obj.save()
        if serializer.is_valid():
            serializer.save()
            return Response({'status': 0})
        return Response({'status': 1})

@receiver(post_save,sender=InceptSql)
def inceptionflow_handler(sender,**kwargs):
    """Record an ``InceptStep`` row every time an ``InceptSql`` work order is saved.

    The step is attributed to the order's treater when one is set, otherwise
    to the committer. Validation errors are printed, not raised.
    """
    order = kwargs.get('instance')
    # The serializer is given primary keys, so use the raw FK ids; the listing
    # passed the treater User object itself here.
    if order.treater_id is not None:
        actor_id = order.treater_id
    else:
        actor_id = order.commiter_id
    sdata = {'work_order': order.id, 'user': actor_id, 'remark': order.remark}
    serializer = InceptStepModelSerializer(data=sdata)
    if serializer.is_valid():
        serializer.save()
    else:
        print(serializer.errors)

class InceptStepModelViewSet(ModelViewSet):
    """CRUD endpoints for ``InceptStep`` rows plus a per-work-order history lookup."""

    serializer_class = InceptStepModelSerializer
    queryset = InceptStep.objects.all()

    @action(detail=False, methods=['POST'])
    def workflow(self, request):
        """Return the serialized steps of the work order named in the request body, newest first."""
        payload = request.data
        print(payload)
        order_id = payload['work_order']
        steps = self.get_queryset().filter(work_order=order_id)
        newest_first = steps.order_by('-createtime')
        serialized = InceptStepModelSerializer(instance=newest_first, many=True)
        return Response(serialized.data)


        # return JsonResponse(json.dumps(list(res.values()),cls=DateEncoder),safe=False)



class DbAssignModelViewSet(ModelViewSet):
    """ModelViewSet over all ``DbAssign`` rows, serialized with ``DbAssignModelSerializer``."""

    serializer_class = DbAssignModelSerializer
    queryset = DbAssign.objects.all()


class SingleInstanceInstall(APIView):
    """Endpoint that triggers a single-instance MySQL install for permitted users."""
    permission_classes = ()
    # Permission ('app_label.codename') a user's roles must grant.
    _required_perm = 'sqlmng.installsinglemysql'

    def _check_permission(self, request):
        """Raise ``PermissionDenied`` unless the user's roles grant ``_required_perm``.

        Anonymous requests are first resolved to a user from the Authorization
        header via ``parse_payload``; the resolved user replaces ``request.user``.
        """
        if request.user.is_anonymous:
            # Django exposes request headers on ``META`` (upper case).
            token = request.META.get('HTTP_AUTHORIZATION', '')
            request.user = parse_payload(token)
        pairs = request.user.roles.values_list('permissions__permission__content_type__app_label',
                                               'permissions__permission__codename')
        granted = {'.'.join(pair) for pair in pairs if pair[0] is not None}
        if self._required_perm not in granted:
            raise PermissionDenied

    def post(self, request):
        """Run ``full_single_install`` with the posted host and port, then echo the payload back."""
        self._check_permission(request)
        params = request.data
        full_single_install(params['host'], params['port'])
        return Response(params)

    def get(self, request):
        """Report whether the caller may trigger the install."""
        self._check_permission(request)
        return Response("可以调用installsinglemysql方法")

class MultiInstanceInstall(APIView):
    """Endpoint that triggers a multi-instance (master/slave) MySQL install for permitted users."""
    permission_classes = ()
    # Permission ('app_label.codename') a user's roles must grant.
    _required_perm = 'sqlmng.installmultimysql'

    def _check_permission(self, request):
        """Raise ``PermissionDenied`` unless the user's roles grant ``_required_perm``.

        Anonymous requests are first resolved to a user from the Authorization
        header via ``parse_payload``; the resolved user replaces ``request.user``.
        """
        if request.user.is_anonymous:
            # Django exposes request headers on ``META`` (upper case).
            token = request.META.get('HTTP_AUTHORIZATION', '')
            request.user = parse_payload(token)
        pairs = request.user.roles.values_list('permissions__permission__content_type__app_label',
                                               'permissions__permission__codename')
        granted = {'.'.join(pair) for pair in pairs if pair[0] is not None}
        if self._required_perm not in granted:
            raise PermissionDenied

    def post(self, request):
        """Run ``full_multil_install`` with the posted master, slave and port, then echo the payload back."""
        self._check_permission(request)
        params = request.data
        full_multil_install(params['master'], params['slave'], params['port'])
        return Response(params)

    def get(self, request):
        """Report whether the caller may trigger the install."""
        self._check_permission(request)
        return Response("可以调用installmultimysql方法")

在restframework的基础上,我们需要自己定义一些url作为api接口使用,action的价值就体现在这里:它既可以基于detail(针对单个对象),也可以不基于detail(针对整个集合)。receiver可以理解成触发器,在触发某种事件(例如模型保存)的时候会自动执行。
url.py

# URL configuration: router-generated routes for the viewsets, plus the two
# install endpoints that are plain APIViews.
from django.conf.urls import include, url
from rest_framework import routers

from .views import (
    DbAssignModelViewSet,
    DbInfoModelViewSet,
    InceptSqlModelViewSet,
    InceptStepModelViewSet,
    InstanceModelViewSet,
    MultiInstanceInstall,
    SingleInstanceInstall,
)

router = routers.DefaultRouter()
router.register('dbinfo', DbInfoModelViewSet)
router.register('dbassign', DbAssignModelViewSet)
router.register('instance', InstanceModelViewSet)
router.register('inception', InceptSqlModelViewSet)
router.register('inceptstep', InceptStepModelViewSet)

urlpatterns = [
    url(r'^', include(router.urls)),
    url('^singleinstall', SingleInstanceInstall.as_view()),
    url('^multiinstall', MultiInstanceInstall.as_view()),
]

以上为这个项目实施的关键代码

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/743999.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号