栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

springboot项目集成dolphinscheduler调度器 实现datax数据同步任务

springboot项目集成dolphinscheduler调度器 实现datax数据同步任务


Datax安装及基本使用请查看上一篇文章:https://blog.csdn.net/a924382407/article/details/120952339?spm=1001.2014.3001.5501

Datax概述 1.概述

2.功能清单
功能清单
CRUD增删改查 、启动任务、停止任务
3.说明:本项目只支持 MySQL 与 HBase 之间的数据同步。

代码模块

配置文件

pom.xml


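<project>
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.geespace.microservices.bd-platform</groupId>
        <artifactId>all</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>data-sync-config</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <gson.version>2.8.1</gson.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>${gson.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.8.12</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>6.8.12</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.8.12</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-core-asl</artifactId>
            <version>1.9.13</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.geespace.microservices.bd-platform</groupId>
            <artifactId>data-config</artifactId>
            <version>1.0-SNAPSHOT</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>commons-httpclient</groupId>
            <artifactId>commons-httpclient</artifactId>
            <version>3.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.2</version>
                <dependencies>
                    <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-maven-plugin</artifactId>
                        <version>2.1.4.RELEASE</version>
                    </dependency>
                </dependencies>
                <configuration>
                    <keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF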
@Slf4j
@RestController
@RequestMapping("/dolphinscheduler/v1")
public class DataxDolphinschedulerController {
    /** HTTP client used for the GET calls against the DolphinScheduler REST API. */
    @Autowired
    private RestTemplate restTemplate;
    /** Value sent in the {@code token} header of every DolphinScheduler request. */
    @Value("${dolphinscheduler.token}")
    String token;
    /** Base address that every DolphinScheduler URL in this class is appended to. */
    @Value("${dolphinscheduler.address}")
    String address;
    /** Response code this class treats as success for DolphinScheduler calls. */
    public static final int ZERO = 0;
    /** Result code this class treats as success for {@code ReturnResult}. */
    public static final int SUCCESS = 200;
    /** Operation keyword: build a new workflow holding a single task. */
    public static final String CREATE = "create";
    /** Operation keyword: replace the DataX JSON of an existing task. */
    public static final String UPDATE = "update";
    /** Operation keyword: append a task to an existing workflow. */
    public static final String ADD = "add";
    /** Operation keyword: remove a task from an existing workflow. */
    public static final String DELETE = "delete";
    /** Release state of a workflow that is online. */
    public static final String ONLINE = "ONLINE";
    /** Release state of a workflow that is offline. */
    public static final String OFFLINE = "OFFLINE";
    /** X coordinate at which a newly added node wraps to the next row. */
    public static final int ONE_THOUSAND_AND_FIVE_HUNDRED = 1500;
    /** Length of the {@code "datax-"} task-id prefix. */
    public static final int SIX = 6;
    /** Distance in canvas units between two neighbouring nodes. */
    public static final int EIGHTY = 80;
    /** Tenant id written into every newly created workflow definition. */
    public static final int THREE = 3;
    /** Persists the sync configurations that back the workflow tasks. */
    @Autowired
    private SyncConfigService syncConfigService;


    
    @PostMapping("/project/process/datax")
    @Transactional(rollbackFor = Exception.class)
    public ReturnResult operatorDataxTask(HttpServletRequest request, @RequestBody @Validated ConfigAddForm form) {
        Long userId = Long.valueOf(request.getUserPrincipal().getName());
        form.setUserId(userId);

        ReturnResult dataxTaskReturnResult = syncConfigService.addConfig(form);
        if (dataxTaskReturnResult.getCode() != SUCCESS) {
            return dataxTaskReturnResult;
        }
        log.info("--(1)addDataxTaskResult--success");
        form.setId(dataxTaskReturnResult.getData().getId());
        if (dataxTaskReturnResult.getCode() == SUCCESS) {
            Boolean verifyResult = verifyProcessExist(userId + "-dataxTask", form.getProjectName());
            log.info("--(2)verifyProcessExist--success:{}", verifyResult);
            if (!verifyResult) {
                ProcessDto processDto = packageProcessParam(
                        "create", userId + "-dataxTask", dataxTaskReturnResult.getData(), null);
                log.info("--(3)packageProcessParam--success");
                processDto.setProjectName(form.getProjectName());
                processDto.setProjectId(form.getProjectId());
                dataxTaskReturnResult =  createProcess(processDto);
            } else {
                //获取用户下唯一工作流ID
                DolphinschedulerResponse processInfoList = getUserProcess(form.getProjectName());
                JSonObject processJson = new JSonObject();
                log.info("--(3)getUserProcess--success:{}", processInfoList);
                List> list = (List>) processInfoList.getData();
                for (Map map : list) {
                    if (map.get("name").equals(userId + "-dataxTask")) {
                        processJson.fluentPutAll(map);
                    }
                }
                ProcessDto processDto = packageProcessParam(
                        "add", userId + "-dataxTask", dataxTaskReturnResult.getData(), processJson);
                processDto.setId(processJson.getInteger("id"));
                log.info("--(4)packageProcessParam--success");
                if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {
                    releaseProcessDefinition(form.getProjectName(), userId + "-dataxTask",
                            processDto.getId(), 0);
                    log.info("--(5)releaseProcessDefinition--OFFLINE--success");
                }
                dataxTaskReturnResult =  updateProcess(form, processDto);
            }
        }
        return dataxTaskReturnResult;
    }
    
    @PutMapping("/project/process/datax")
    @Transactional(rollbackFor = Exception.class)
    public ReturnResult updateDataxTask(HttpServletRequest request, @RequestBody @Validated ConfigUpdateForm form) {
        Long userId = Long.valueOf(request.getUserPrincipal().getName());
        form.setUserId(userId);
        ReturnResult dataxTaskReturnResult = syncConfigService.updateConfig(form);
        log.info("--(1)updateDataxTaskResult--mysql--success");
        if (dataxTaskReturnResult.getCode() == SUCCESS) {
            //获取用户下唯一工作流ID
            DolphinschedulerResponse processInfoList = getUserProcess(form.getProjectName());
            JSonObject processJson = new JSonObject();
            log.info("--(2)getUserProcess--success:{}", processInfoList);
            List> list = (List>) processInfoList.getData();
            for (Map map : list) {
                if (map.get("name").equals(userId + "-dataxTask")) {
                    processJson.fluentPutAll(map);
                }
            }
            ProcessDto processDto = packageProcessParam(
                    "update", userId + "-dataxTask", dataxTaskReturnResult.getData(), processJson);
            processDto.setProjectName(form.getProjectName());
            processDto.setProjectId(form.getProjectId());
            processDto.setId(processJson.getInteger("id"));
            log.info("--(3)packageProcessParam--success");
            if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {
                releaseProcessDefinition(form.getProjectName(), userId + "-dataxTask",
                        processDto.getId(), 0);
                log.info("--(4)releaseProcessDefinition--OFFLINE--success");
            }
            ConfigAddForm configAddForm = new ConfigAddForm();
            BeanUtils.copyProperties(form, configAddForm);
            return updateProcess(configAddForm, processDto);
        }
        return dataxTaskReturnResult;
    }
    
    @DeleteMapping("/project/process/datax/{projectName}/{id}")
    @Transactional(rollbackFor = Exception.class)
    public ReturnResult deleteDataxTask(HttpServletRequest request, @PathVariable("projectName") String projectName,
                                        @PathVariable("id") Long id) {
        Long userId = Long.valueOf(request.getUserPrincipal().getName());
        SyncConfigDto syncConfigDto = new SyncConfigDto();
        syncConfigDto.setId(id);
        ConfigAddForm configAddForm = new ConfigAddForm();
        configAddForm.setProjectName(projectName);
        ReturnResult dataxTaskReturnResult = syncConfigService.delete(id, userId);
        log.info("--(1)deleteDataxTask--mysql--success");
        if (dataxTaskReturnResult.getCode() == SUCCESS) {
            //获取用户下唯一工作流ID
            DolphinschedulerResponse processInfoList = getUserProcess(projectName);
            JSonObject processJson = new JSonObject();
            log.info("--(2)getUserProcess--success:{}", processInfoList);
            List> list = (List>) processInfoList.getData();
            for (Map map : list) {
                if (map.get("name").equals(userId + "-dataxTask")) {
                    processJson.fluentPutAll(map);
                }
            }
            ProcessDto processDto = packageProcessParam(
                    "delete", userId + "-dataxTask", syncConfigDto, processJson);
            processDto.setId(processJson.getInteger("id"));
            log.info("--(3)packageProcessParam--success");
            if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {
                releaseProcessDefinition(projectName, userId + "-dataxTask",
                        processDto.getId(), 0);
                log.info("--(4)releaseProcessDefinition--OFFLINE--success");
            }
            if (JSONObject.parseObject(processDto.getLocations()).keySet().size() == 0) {
                //删除工作流
                deleteProcess(configAddForm, processDto);
            } else {
                //更新工作流
                updateProcess(configAddForm, processDto);
            }
        }
        return dataxTaskReturnResult;
    }

    
    /**
     * Asks DolphinScheduler's {@code process/verify-name} endpoint about a workflow name.
     *
     * @param processName workflow name to check
     * @param projectName project to check in
     * @return {@code false} when the response code is {@link #ZERO}, {@code true} for any other code
     */
    public Boolean verifyProcessExist(String processName, String projectName) {
        HttpHeaders headers = new HttpHeaders();
        headers.set("token", token);
        headers.set("Content-Type", "application/json");
        HttpEntity<Void> requestEntity = new HttpEntity<>(headers);
        String url = address + "/dolphinscheduler/projects/" + projectName
                + "/process/verify-name?name=" + processName;
        ResponseEntity<DolphinschedulerResponse> returnResult =
            restTemplate.exchange(url, HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
        return returnResult.getBody().getCode() != ZERO;
    }

    
    /**
     * Creates a workflow by posting the definition to DolphinScheduler's {@code process/save}.
     *
     * @param processDto carries project name, workflow name, connects, locations and definition JSON
     * @return success when the response code is {@link #ZERO}; otherwise an error result carrying
     *         the scheduler's {@code msg}, or the exception message if the request itself failed
     */
    public ReturnResult createProcess(ProcessDto processDto) {
        PostMethod postMethod = null;
        try {
            String postURL = address + "/dolphinscheduler/projects/"
                   + URLEncoder.encode(processDto.getProjectName(), "utf-8") + "/process/save";
            postMethod = new PostMethod(postURL);
            postMethod.setRequestHeader("Content-Type",
                    "application/x-www-form-urlencoded;charset=utf-8");
            postMethod.setRequestHeader("token", token);
            NameValuePair[] data = {new NameValuePair("connects", processDto.getConnects()),
                new NameValuePair("name", processDto.getName()),
                new NameValuePair("locations", processDto.getLocations()),
                new NameValuePair("processDefinitionJson", processDto.getProcessDefinitionJson())};
            postMethod.setRequestBody(data);
            org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
            httpClient.executeMethod(postMethod);
            JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
            log.info("--(5)httpCreateProcess:{}", result);
            if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
                return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg"));
            }
            return ReturnResult.success();
        } catch (Exception e) {
            // A failed request must not be reported as success to the caller.
            log.error("请求异常:{}", e.getMessage(), e);
            return ReturnResult.error(BizCode.DB_INSERT_ERROR, e.getMessage());
        } finally {
            if (postMethod != null) {
                // Hand the connection back to the connection manager.
                postMethod.releaseConnection();
            }
        }
    }

    
    /**
     * Updates a workflow by posting the definition to DolphinScheduler's {@code process/update}.
     *
     * @param vo supplies the project name used in the URL
     * @param processDto carries workflow id, name, connects, locations and definition JSON
     * @return success when the response code is {@link #ZERO}; otherwise an error result carrying
     *         the scheduler's {@code msg}, or the exception message if the request itself failed
     */
    public ReturnResult updateProcess(ConfigAddForm vo, ProcessDto processDto) {
        PostMethod postMethod = null;
        try {
            String postURL = address + "/dolphinscheduler/projects/"
                   + URLEncoder.encode(vo.getProjectName(), "utf-8") + "/process/update";
            postMethod = new PostMethod(postURL);
            postMethod.setRequestHeader("Content-Type",
                    "application/x-www-form-urlencoded;charset=utf-8");
            postMethod.setRequestHeader("token", token);
            // Form parameters: none of them may be null; pass an empty string instead.
            NameValuePair[] data = {new NameValuePair("connects", processDto.getConnects()),
                new NameValuePair("name", processDto.getName()),
                new NameValuePair("locations", processDto.getLocations()),
                new NameValuePair("id", processDto.getId().toString()),
                new NameValuePair("processDefinitionJson", processDto.getProcessDefinitionJson())};
            postMethod.setRequestBody(data);
            org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
            httpClient.executeMethod(postMethod);
            JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
            log.info("--(5)httpUpdateProcess:{}", result);
            if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
                return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg"));
            }
            return ReturnResult.success();
        } catch (Exception e) {
            // A failed request must not be reported as success to the caller.
            log.error("请求异常:{}", e.getMessage(), e);
            return ReturnResult.error(BizCode.DB_INSERT_ERROR, e.getMessage());
        } finally {
            if (postMethod != null) {
                // Hand the connection back to the connection manager.
                postMethod.releaseConnection();
            }
        }
    }
    
    /**
     * Deletes a workflow through DolphinScheduler's {@code process/delete} endpoint.
     *
     * @param dto supplies the project name used in the URL
     * @param processDto supplies the id sent as {@code processDefinitionId}
     * @return the response body returned by DolphinScheduler
     */
    public DolphinschedulerResponse deleteProcess(ConfigAddForm dto, ProcessDto processDto) {
        HttpHeaders headers = new HttpHeaders();
        headers.set("token", token);
        headers.set("Content-Type", "application/json");
        HttpEntity<Void> requestEntity = new HttpEntity<>(headers);
        String url = address + "/dolphinscheduler/projects/" + dto.getProjectName()
                + "/process/delete?processDefinitionId=" + processDto.getId();
        ResponseEntity<DolphinschedulerResponse> returnResult =
                restTemplate.exchange(url, HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
        log.info("--(5)httpDeleteProcess:{}", returnResult);
        return returnResult.getBody();
    }

    
    public Integer getSparkResourceJarId() {
        Integer resourceId = null;
        HttpHeaders headers = new HttpHeaders();
        headers.set("token", token);
        headers.set("Content-Type", "application/json");
        HttpEntity requestEntity = new HttpEntity(headers);
        ResponseEntity returnResult =
            restTemplate.exchange(address + "/dolphinscheduler/resources/authorize-resource-tree?userId=1",
                HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
        List> list = (List>) returnResult.getBody().getData();
        for (Map map : list) {
            if (map.get("name").equals("big_data02.jar")) {
                resourceId = Integer.valueOf(map.get("id").toString());
            }
        }
        return resourceId;
    }
    
    /**
     * Fetches the workflow list of a project from DolphinScheduler's {@code process/list}.
     *
     * @param projectName project whose workflows are listed
     * @return the response body returned by DolphinScheduler
     */
    public DolphinschedulerResponse getUserProcess(String projectName) {
        HttpHeaders headers = new HttpHeaders();
        headers.set("token", token);
        headers.set("Content-Type", "application/json");
        HttpEntity<Void> requestEntity = new HttpEntity<>(headers);
        ResponseEntity<DolphinschedulerResponse> returnResult =
            restTemplate.exchange(address + "/dolphinscheduler/projects/" + projectName + "/process/list",
                HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
        return returnResult.getBody();
    }
    
    /**
     * Builds the request payload for a workflow operation and dispatches to the builder that
     * matches {@code type}.
     *
     * @param type one of {@link #CREATE}, {@link #ADD}, {@code UPDATE} or {@link #DELETE}; any other
     *             value yields a payload with only connects and name set
     * @param processName workflow name to put into the payload
     * @param dto sync configuration the task node is derived from
     * @param processJson current workflow as returned by DolphinScheduler; not read for {@link #CREATE}
     * @return the populated payload
     */
    public ProcessDto packageProcessParam(String type, String processName, SyncConfigDto dto, JSONObject processJson) {
        ProcessDto processDto = new ProcessDto();
        processDto.setConnects("[]");
        processDto.setName(processName);

        // Canvas entry for the new node, placed at the origin by default.
        String nodeName = "datax-" + dto.getId();
        JSONObject locationsTwo = new JSONObject();
        locationsTwo.fluentPut("name", nodeName).fluentPut("targetarr", "").fluentPut("nodenumber", "0");
        locationsTwo.fluentPut("x", 0).fluentPut("y", 0);
        JSONObject locationsOne = new JSONObject();
        locationsOne.put(nodeName, locationsTwo);

        if (CREATE.equals(type)) {
            // Create a new workflow.
            return packageProcessParamOfCreate(processDto, dto, locationsOne);
        }
        if (ADD.equals(type)) {
            // Append a node to the existing workflow.
            return packageProcessParamOfAdd(processDto, dto, processJson, locationsOne, locationsTwo);
        }
        if (UPDATE.equals(type)) {
            // Update the workflow: only the tasks entry of processDefinitionJson changes.
            return packageProcessParamOfUpdate(processDto, processJson, dto);
        }
        if (DELETE.equals(type)) {
            // Update or delete the workflow: the task is removed from processDefinitionJson.
            return packageProcessParamOfDelete(processDto, processJson, dto);
        }
        return processDto;
    }
    
    public ProcessDto packageProcessParamOfCreate(ProcessDto processDto, SyncConfigDto dto, JSonObject locationsOne) {
        processDto.setLocations(locationsOne.toString());
        JSonObject processOne = new JSonObject();
        processOne.fluentPut("globalParams", new ArrayList<>()).fluentPut("tenantId", THREE).fluentPut("timeout", 0);
        JSonObject processTwo = new JSonObject();
        processTwo.fluentPut("type", "DATAX").fluentPut("id", "datax-" + dto.getId());
        processTwo.fluentPut("name", "datax-" + dto.getId()).fluentPut("description", "");

        String taskJsonString = dto.getContent().toString();
        processTwo.put("params", JSONObject.parseObject("{"localParams":[],"customConfig":1,"
               + ""json":"" + taskJsonString.replace(""", "\"") + ""}"));

        JSonObject jsonTimeout = new JSonObject();
        jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false);
        processTwo.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL");
        JSonObject processTree = new JSonObject();
        processTree.fluentPut("successNode", new ArrayList<>()).fluentPut("failedNode", new ArrayList<>());
        JSonObject jsonconditionResult = new JSonObject();
        jsonconditionResult.put("successNode", new ArrayList<>());
        jsonconditionResult.put("failedNode", new ArrayList<>());
        processTwo.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSonObject());
        processTwo.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1");
        processTwo.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default");
        processTwo.fluentPut("preTasks", new ArrayList<>());
        JSonArray processTaskArray = new JSonArray();
        processTaskArray.add(processTwo);
        processOne.put("tasks", processTaskArray);
        processDto.setProcessDefinitionJson(processOne.toString());
        return processDto;
    }
    
    public ProcessDto packageProcessParamOfAdd(ProcessDto processDto, SyncConfigDto dto, JSonObject processJson,
                                               JSonObject locationsOne, JSonObject locationsTwo) {
        String maxTaskKey = JsonTools.getJsonStringMaxKey(processJson.getString("locations"));
        Integer x = processJson.getJSonObject("locations").getJSonObject(maxTaskKey).getInteger("x");
        Integer y = processJson.getJSonObject("locations").getJSonObject(maxTaskKey).getInteger("y");
        if (x < ONE_THOUSAND_AND_FIVE_HUNDRED) {
            locationsTwo.fluentPut("x", x + EIGHTY).fluentPut("y", y);
        } else if (x >= ONE_THOUSAND_AND_FIVE_HUNDRED) {
            locationsTwo.fluentPut("y", y + EIGHTY).fluentPut("x", 0);
        }
        locationsOne = processJson.getJSonObject("locations").fluentPut("datax-" + dto.getId(), locationsTwo);
        processDto.setLocations(locationsOne.toString());
        processDto.setId(processJson.getInteger("id"));
        JSonObject processTwo = new JSonObject();
        processTwo.fluentPut("type", "DATAX").fluentPut("id", "datax-" + dto.getId());
        processTwo.fluentPut("name", "datax-" + dto.getId()).fluentPut("description", "");
        String taskJsonString = dto.getContent().toString().replace("}}", "} }").replace("{{", "{ {");
        processTwo.put("params", JSONObject.parseObject("{"localParams":[],"customConfig":1,"
                + ""json":"" + taskJsonString.replace(""", "\"") + ""}"));
        JSonObject jsonTimeout = new JSonObject();
        jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false);
        processTwo.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL");
        JSonObject processTree = new JSonObject();
        processTree.fluentPut("successNode", new ArrayList<>()).fluentPut("failedNode", new ArrayList<>());
        JSonObject jsonconditionResult = new JSonObject();
        jsonconditionResult.put("successNode", new ArrayList<>());
        jsonconditionResult.put("failedNode", new ArrayList<>());
        processTwo.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSonObject());
        processTwo.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1");
        processTwo.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default");
        processTwo.fluentPut("preTasks", new ArrayList<>());
        JSonObject jsonNew = processJson.getJSonObject("processDefinitionJson");
        JSonArray jsonArray = jsonNew.getJSonArray("tasks");
        jsonArray.add(processTwo);
        jsonNew.put("tasks", jsonArray);
        processDto.setProcessDefinitionJson(jsonNew.toString());
        return processDto;
    }
    
    /**
     * Fills the payload for replacing the DataX JSON of one task in an existing workflow.
     *
     * <p>The task whose id is {@code datax-<dto id>} is removed from the task list, given the new
     * {@code params.json}, and appended at the end. When no task matches, the current definition
     * is sent back unchanged instead of an empty one.
     *
     * @param processDto payload to fill; receives id, locations and processDefinitionJson
     * @param processJson current workflow; {@code id}, {@code locations} and
     *                    {@code processDefinitionJson} are read from it
     * @param dto sync configuration; its id selects the task and its content is the new JSON
     * @return the same {@code processDto}
     */
    public ProcessDto packageProcessParamOfUpdate(ProcessDto processDto, JSONObject processJson, SyncConfigDto dto) {
        processDto.setLocations(processJson.getString("locations"));
        processDto.setId(processJson.getInteger("id"));

        JSONObject processDefinitionJson = processJson.getJSONObject("processDefinitionJson");
        JSONArray currentTasks = processDefinitionJson.getJSONArray("tasks");
        JSONArray updatedTasks = new JSONArray();
        updatedTasks.addAll(currentTasks);

        String taskJsonString = dto.getContent().toString();
        // Compare full task ids as strings: comparing boxed Long values with == checks identity,
        // and parsing the suffix would throw for a task id that is not "datax-<number>".
        String targetTaskId = "datax-" + dto.getId();
        for (Object task : currentTasks) {
            JSONObject taskJson = JSONObject.parseObject(task.toString());
            if (targetTaskId.equals(taskJson.getString("id"))) {
                updatedTasks.remove(task);
                taskJson.getJSONObject("params").put("json", taskJsonString);
                updatedTasks.add(taskJson);
            }
        }
        processDefinitionJson.put("tasks", updatedTasks);
        processDto.setProcessDefinitionJson(processDefinitionJson.toString());
        return processDto;
    }
    
    /**
     * Fills the payload for removing one task node from an existing workflow.
     *
     * <p>If the locations contain the key {@code DictionaryEnum.DATAX + <dto id>} (presumably
     * {@code "datax-<id>"} — confirm against the enum), the {@code datax-<id>} location and the
     * task with that id are removed. Otherwise locations and tasks are passed through unchanged.
     *
     * @param processDto payload to fill; receives id, locations and processDefinitionJson
     * @param processJson current workflow; {@code id}, {@code locations} and
     *                    {@code processDefinitionJson} are read from it
     * @param dto sync configuration whose id selects the node to remove
     * @return the same {@code processDto}
     */
    public ProcessDto packageProcessParamOfDelete(ProcessDto processDto, JSONObject processJson, SyncConfigDto dto) {
        processDto.setId(processJson.getInteger("id"));
        JSONObject locationsJson = processJson.getJSONObject("locations");
        JSONObject processDefinitionJson = processJson.getJSONObject("processDefinitionJson");

        if (locationsJson.containsKey(DictionaryEnum.DATAX.getFiledString() + dto.getId())) {
            String taskId = "datax-" + dto.getId();
            locationsJson.remove(taskId);

            // Filter into a copy so the list being iterated is never modified.
            JSONArray processDefinitionArray = processDefinitionJson.getJSONArray("tasks");
            JSONArray copyProcessDefinitionArray = new JSONArray();
            copyProcessDefinitionArray.addAll(processDefinitionArray);
            for (Object object : processDefinitionArray) {
                if (JSONObject.parseObject(object.toString()).getString("id").equals(taskId)) {
                    copyProcessDefinitionArray.remove(object);
                }
            }
            processDefinitionJson.put("tasks", copyProcessDefinitionArray);
        }
        processDto.setLocations(locationsJson.toString());
        processDto.setProcessDefinitionJson(processDefinitionJson.toString());
        return processDto;
    }

    
    /**
     * Changes the release state of a workflow through DolphinScheduler's {@code process/release}.
     *
     * @param projectName project that owns the workflow
     * @param processName workflow name, sent as {@code name}
     * @param processId workflow id, sent as {@code processId}
     * @param releaseState value sent as {@code releaseState}; callers in this class pass 0 to take
     *                     the workflow offline and 1 to bring it online
     * @return success when the response code is {@link #ZERO}; otherwise an error result carrying
     *         the scheduler's {@code msg}, or the exception message if the request itself failed
     */
    public ReturnResult releaseProcessDefinition(String projectName, String processName, Integer processId,
                  Integer releaseState) {
        PostMethod postMethod = null;
        try {
            String postURL = address + "/dolphinscheduler/projects/"
                   + URLEncoder.encode(projectName, "utf-8") + "/process/release";
            postMethod = new PostMethod(postURL);
            postMethod.setRequestHeader("Content-Type",
                    "application/x-www-form-urlencoded;charset=utf-8");
            postMethod.setRequestHeader("token", token);
            // Form parameters: none of them may be null; pass an empty string instead.
            NameValuePair[] data = {new NameValuePair("name", processName),
                    new NameValuePair("processId", processId.toString()),
                    new NameValuePair("releaseState", releaseState.toString())};
            postMethod.setRequestBody(data);
            org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
            httpClient.executeMethod(postMethod);
            JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
            if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
                return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg"));
            }
            return ReturnResult.success();
        } catch (Exception e) {
            // A failed request must not be reported as success to the caller.
            log.error("请求异常:{}", e.getMessage(), e);
            return ReturnResult.error(BizCode.DB_INSERT_ERROR, e.getMessage());
        } finally {
            if (postMethod != null) {
                // Hand the connection back to the connection manager.
                postMethod.releaseConnection();
            }
        }
    }
    
    @GetMapping("/project/process/datax/start")
    public DolphinschedulerResponse startProcessDataxTask(
            @RequestParam("projectName") String projectName, @RequestParam("id") Integer id,
            HttpServletRequest request) {
        try {
            Long userId = Long.valueOf(request.getUserPrincipal().getName());
            DolphinschedulerResponse processInfoList = getUserProcess(projectName);
            if (processInfoList.getCode() != ZERO) {
                return processInfoList;
            }
            JSonObject processJson = new JSonObject();
            log.info("--(1)getUserProcess--success:{}", processInfoList);
            List> list = (List>) processInfoList.getData();
            for (Map map : list) {
                if (map.get("name").equals(userId + "-dataxTask")) {
                    processJson.fluentPutAll(map);
                }
            }
            if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(OFFLINE)) {
                releaseProcessDefinition(projectName, userId + "-dataxTask",
                        processJson.getInteger("id"), 1);
                log.info("--(2)releaseProcessDefinition--ONLINE--success");
            }
            String postURL = address + "/dolphinscheduler/projects/" + URLEncoder.encode(projectName, "utf-8")
                   + "/executors/start-process-instance";
            PostMethod postMethod = new PostMethod(postURL);
            postMethod.setRequestHeader("Content-Type",
                    "application/x-www-form-urlencoded;charset=utf-8");
            postMethod.setRequestHeader("token", token);
            // 参数设置,需要注意的就是里边不能传NULL,要传空字符串
            NamevaluePair[] data = packageNamevaluePair(processJson, id);
            postMethod.setRequestBody(data);
            org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
            httpClient.executeMethod(postMethod);
            JSonObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
            log.info("--(2)startProcessInstance--result:{}", result);
            if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
                return DolphinschedulerResponse.error(result.getInteger("code"), result.getString("msg"));
            }
        } catch (Exception e) {
            log.info("请求异常:{}", e);
        }
        return DolphinschedulerResponse.success();
    }
    
    /**
     * Builds the form parameters for {@code executors/start-process-instance}.
     *
     * @param processJson workflow whose {@code id} is sent as {@code processDefinitionId}
     * @param dragSparkTaskId task id; {@code datax-<id>} is sent as {@code startNodeList}
     * @return the parameter array, with fixed values for every other setting
     */
    public NameValuePair[] packageNamevaluePair(JSONObject processJson, Integer dragSparkTaskId) {
        return new NameValuePair[] {
                new NameValuePair("failureStrategy", "CONTINUE"),
                new NameValuePair("processDefinitionId", processJson.getString("id")),
                new NameValuePair("processInstancePriority", "MEDIUM"),
                new NameValuePair("warningGroupId", "0"),
                new NameValuePair("warningType", "NONE"),
                new NameValuePair("runMode", "RUN_MODE_SERIAL"),
                new NameValuePair("startNodeList", "datax-" + dragSparkTaskId),
                new NameValuePair("taskDependType", "TASK_POST"),
                new NameValuePair("workerGroup", "default")};
    }


    
    @GetMapping(value = "/project/process/datax/execute/{projectName}/{id}/{executeType}")
    public DolphinschedulerResponse stopProcessDataxTask(@PathVariable("projectName") String projectName,
                                 @PathVariable("id") Long id, @PathVariable("executeType") String executeType) {
        log.info("--(1)stopProcessDataxTask--begin--projectName:{},id:{},executeType:{}", projectName, id, executeType);
        try {
            HttpHeaders headers = new HttpHeaders();
            headers.set("token", token);
            headers.set("Content-Type", "application/json");
            HttpEntity requestEntity = new HttpEntity(headers);
            ResponseEntity returnResult = restTemplate.exchange(address + "/"
   + "dolphinscheduler/projects/" + projectName + "/task-instance/list-paging?"
   + "pageNo=1&pageSize=100&taskName=datax-" + id, HttpMethod.GET, requestEntity, JSONObject.class);
            List> list =
                    (List>) returnResult.getBody().getJSonObject("data").get("totalList");
            Integer processInstanceId = null;
            for (Map map : list) {
                if (map.get("state").equals("RUNNING_EXEUTION")) {
                    processInstanceId = Integer.valueOf(map.get("processInstanceId").toString());
                }

            }
            if (StringUtils.isEmpty(processInstanceId)) {
                return DolphinschedulerResponse.error(Msg.TASK_HAS_BEEN_STOPPED);
            }
            log.info("--(2)getProcessInstanceId--success--:{}", processInstanceId);
            String postURL = address + "/dolphinscheduler/projects/"
                    + URLEncoder.encode(projectName, "utf-8") + "/executors/execute";
            PostMethod postMethod = new PostMethod(postURL);
            postMethod.setRequestHeader("Content-Type",
                    "application/x-www-form-urlencoded;charset=utf-8");
            postMethod.setRequestHeader("token", token);
            NamevaluePair[] data = {new NamevaluePair("executeType", executeType),
                    new NamevaluePair("processInstanceId", processInstanceId.toString())};
            postMethod.setRequestBody(data);
            org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
            httpClient.executeMethod(postMethod);
            JSonObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
            if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
                return DolphinschedulerResponse.error(result.getInteger("code"), result.getString("msg"));
            }
            log.info("--(3)stopProcessSparkTask--success--:{}", result);
        } catch (UnsupportedEncodingException e) {
            log.info("UnsupportedEncodingException:{}", e);
        } catch (HttpException e) {
            log.info("HttpException:{}", e);
        } catch (IOException e) {
            log.info("IOException:{}", e);
        }
        return DolphinschedulerResponse.success();
    }

    
    @RequestMapping(value = "/project/process/datax/list", method = RequestMethod.POST)
    public ReturnResult> findAll(@RequestBody @Validated ConfigSelectForm form,
                                                           HttpServletRequest request) {
        Long userId = Long.valueOf(request.getUserPrincipal().getName());
        return syncConfigService.list(form, userId);
    }

    
    /**
     * Fetches one sync configuration of the caller.
     *
     * @param id id of the sync configuration
     * @param request current request; the principal name is parsed as the numeric user id
     * @return whatever {@code syncConfigService.findById} returns
     */
    @GetMapping("/project/process/datax")
    public ReturnResult findById(@RequestParam Long id, HttpServletRequest request) {
        String principalName = request.getUserPrincipal().getName();
        Long callerId = Long.valueOf(principalName);
        return syncConfigService.findById(id, callerId);
    }
}

ConfigAddForm
package com.geespace.microservices.builder.request;

import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import lombok.Data;


/**
 * Request body for creating a sync configuration.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}. The JSON-typed fields use the fastjson
 * {@code JSONObject}/{@code JSONArray} classes imported by this file.
 */
@Data
public class ConfigAddForm {

    /** Configuration name; must not be empty. */
    @NotEmpty(message = "name不能为空")
    private String name;

    /** Optional free-text description. */
    private String description;

    /**
     * Sync type.
     * NOTE(review): {@code @NotNull} cannot fail on a primitive {@code int}; an omitted value
     * binds as 0. Switch to {@code Integer} if "missing" must be rejected.
     */
    @NotNull(message = "同步方式不能为空")
    private int syncType;

    /** Id of the data source to read from. */
    @NotNull(message = "读取数据源id不能为空")
    private Long readerConfigId;

    /** Reader parameters; must contain at least one entry. */
    @NotEmpty(message = "读取参数不能为空")
    private JSONObject readerParam;

    /** Id of the data source to write to. */
    @NotNull(message = "写入数据源id不能为空")
    private Long writerConfigId;

    /** Writer parameters; must contain at least one entry. */
    @NotEmpty(message = "写入参数不能为空")
    private JSONObject writerParam;

    /** Column mapping entries; each element is read as an object with "reader" and "writer" keys. */
    @NotEmpty(message = "字段对照表不能为空")
    private JSONArray columnMap;

    /** Id of the user creating the configuration. */
    private Long userId;

    /** Name of the owning project. */
    String projectName;

    /** Id of the owning project; required. */
    @NotNull(message = "projectId not null")
    Long projectId;

    /** Configuration id; not validated on this form. */
    Long id;
}

ConfigUpdateForm
package com.geespace.microservices.builder.request;

import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import lombok.Data;


/**
 * Request body for updating an existing sync configuration.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}. The JSON-typed fields use the fastjson
 * {@code JSONObject}/{@code JSONArray} classes imported by this file.
 */
@Data
public class ConfigUpdateForm {
    /** Id of the sync configuration to update; required. */
    @NotNull(message = "同步配置id不能为空")
    private Long id;

    /** Configuration name; must not be empty. */
    @NotEmpty(message = "name不能为空")
    private String name;

    /** Optional free-text description. */
    private String description;

    /**
     * Sync type.
     * NOTE(review): {@code @NotNull} cannot fail on a primitive {@code int}; an omitted value
     * binds as 0. Switch to {@code Integer} if "missing" must be rejected.
     */
    @NotNull(message = "同步方式不能为空")
    private int syncType;

    /** Id of the data source to read from. */
    @NotNull(message = "读取数据源id不能为空")
    private Long readerConfigId;

    /** Reader parameters; must contain at least one entry. */
    @NotEmpty(message = "读取参数不能为空")
    private JSONObject readerParam;

    /** Id of the data source to write to. */
    @NotNull(message = "写入数据源id不能为空")
    private Long writerConfigId;

    /** Writer parameters; must contain at least one entry. */
    @NotEmpty(message = "写入参数不能为空")
    private JSONObject writerParam;

    /** Column mapping entries; each element is read as an object with "reader" and "writer" keys. */
    @NotEmpty(message = "字段对照表不能为空")
    private JSONArray columnMap;

    /** Id of the user performing the update. */
    private Long userId;

    /** Id of the owning project; required. */
    @NotNull(message = "projectId not null")
    Long projectId;

    /** Name of the owning project. */
    String projectName;

}

ProcessDto
package com.geespace.microservices.builder.dto;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;


/**
 * Plain data holder describing a process definition within a project.
 *
 * <p>Accessors, equals/hashCode and toString are generated by Lombok. The class declares no
 * superclass, so {@code callSuper = true} on {@code @ToString} pulls in {@code Object#toString()}.
 * NOTE(review): the scheduler-side meaning of the JSON string fields is not visible here — confirm
 * against the code that fills this DTO.
 */
@Data
@EqualsAndHashCode(callSuper = false)
@ToString(callSuper = true)
public class ProcessDto {
    /** Process identifier. */
    private Integer id;
    /** Connection data, carried as a string. */
    private String connects;
    /** Location data, carried as a string. */
    private String locations;
    /** Process name. */
    private String name;
    /** Process definition, carried as a JSON string. */
    private String processDefinitionJson;
    /** Name of the owning project (package-private field; Lombok still generates accessors). */
    String projectName;
    /** Id of the owning project (package-private field; Lombok still generates accessors). */
    Long projectId;
}

SyncConfigDto
package com.geespace.microservices.builder.dto;

import com.alibaba.fastjson.JSONObject;

import lombok.Data;


/**
 * Outward-facing view of a stored sync configuration.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}; {@code content} uses the fastjson
 * {@code JSONObject} imported by this file.
 */
@Data
public class SyncConfigDto {
    /** Configuration id. */
    private Long id;

    /** Configuration name. */
    private String name;

    /** Free-text description. */
    private String description;

    /** Sync type. */
    private int syncType;

    /** Job document stored for this configuration. */
    private JSONObject content;

    /** Name of the owning project. */
    String projectName;

    /** Id of the owning project. */
    Long projectId;
}

SyncConfigService
package com.geespace.microservices.builder.service;

import com.geespace.microservices.builder.dto.SyncConfigDto;
import com.geespace.microservices.builder.request.ConfigAddForm;
import com.geespace.microservices.builder.request.ConfigSelectForm;
import com.geespace.microservices.builder.request.ConfigUpdateForm;
import com.geespace.microservices.builder.response.PageResult;
import com.geespace.microservices.builder.response.ReturnResult;


public interface SyncConfigService {
    
    ReturnResult addConfig(ConfigAddForm form);

    
    ReturnResult updateConfig(ConfigUpdateForm form);

    
    ReturnResult findById(Long id, Long userId);

    
    ReturnResult delete(Long id, Long userId);

    
    ReturnResult> list(ConfigSelectForm form, Long userId);
}

SyncConfigServiceImpl
package com.geespace.microservices.builder.service.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.geespace.microservices.builder.biz.Contants;
import com.geespace.microservices.builder.dao.SyncConfigMapper;
import com.geespace.microservices.builder.dto.ColumnMap;
import com.geespace.microservices.builder.dto.SyncConfigDto;
import com.geespace.microservices.builder.entity.SyncConfig;
import com.geespace.microservices.builder.factory.baseParamTool;
import com.geespace.microservices.builder.factory.ParamToolFactory;
import com.geespace.microservices.builder.request.ConfigAddForm;
import com.geespace.microservices.builder.request.ConfigSelectForm;
import com.geespace.microservices.builder.request.ConfigUpdateForm;
import com.geespace.microservices.builder.response.BizCode;
import com.geespace.microservices.builder.response.PageResult;
import com.geespace.microservices.builder.response.ReturnResult;
import com.geespace.microservices.builder.service.SyncConfigService;
import com.geespace.microservices.datasource.dto.JdbcDataSourceDto;
import com.geespace.microservices.datasource.response.Response;
import com.geespace.microservices.datasource.service.JdbcDataSourceService;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;


@Service
@Slf4j
public class SyncConfigServiceImpl implements SyncConfigService {
    public static final int ZERO = 0;
    public static final String Hbase = "hbase";
    @Autowired
    private SyncConfigMapper syncConfigMapper;

    @Autowired
    private JdbcDataSourceService dataSourceService;

    @Override
    public ReturnResult addConfig(ConfigAddForm form) {
        Integer checkResult = syncConfigMapper.checkNameUnique(form.getUserId(), form.getName(), null);
        if (checkResult == ZERO) {
            ColumnMap columnMap = makeColumnMap(form.getColumnMap());
            if (columnMap == null) {
                return ReturnResult.error(BizCode.COLUMN_MATCHING_ERROR);
            }
            // 查询reader数据源 填充reader
            JSonObject reader = packageReader(form.getReaderConfigId(), form.getReaderParam(), columnMap.getReader());
            // 查询writer数据源 填充writer
            JSonObject writer = packageWriter(form.getWriterConfigId(), form.getWriterParam(), columnMap.getWriter());
            JSonArray contentArray = new JSonArray();
            JSonObject content = new JSonObject();
            content.put("reader", reader);
            content.put("writer", writer);
            contentArray.add(content);
            SyncConfig syncConfig = new SyncConfig();
            syncConfig.setContent(packageJob(contentArray));
            syncConfig.setName(form.getName());
            syncConfig.setDescription(form.getDescription());
            syncConfig.setSyncType(form.getSyncType());
            syncConfig.setCreatedTimestamp(System.currentTimeMillis());
            syncConfig.setCreatedUser(form.getUserId());
            syncConfig.setModifiedTimestamp(System.currentTimeMillis());
            syncConfig.setProjectName(form.getProjectName());
            syncConfig.setProjectId(form.getProjectId());
            syncConfigMapper.insert(syncConfig);
            return ReturnResult.success(entityToDto(syncConfig));
        }
        log.error("SyncConfigServiceImpl--addConfig--NAME_IS_EXIST!");
        return ReturnResult.error(BizCode.NAME_IS_EXIST);
    }

    @Override
    public ReturnResult updateConfig(ConfigUpdateForm form) {
        SyncConfig syncConfig = syncConfigMapper.findById(form.getId());
        if (syncConfig == null || syncConfig.getCreatedUser() != form.getUserId()) {
            return ReturnResult.error(BizCode.UPDATE_OBJECT_NOT_EXIST);
        }
        Integer checkResult = syncConfigMapper.checkNameUnique(form.getUserId(), form.getName(), form.getId());
        if (checkResult == ZERO) {
            ColumnMap columnMap = makeColumnMap(form.getColumnMap());
            JSonObject reader = packageReader(form.getReaderConfigId(), form.getReaderParam(), columnMap.getReader());
            JSonObject writer = packageWriter(form.getWriterConfigId(), form.getWriterParam(), columnMap.getWriter());
            JSonArray contentArray = new JSonArray();
            JSonObject content = new JSonObject();
            content.put("reader", reader);
            content.put("writer", writer);
            contentArray.add(content);
            syncConfig.setContent(packageJob(contentArray));
            syncConfig.setName(form.getName());
            syncConfig.setDescription(form.getDescription());
            syncConfig.setSyncType(form.getSyncType());
            syncConfig.setModifiedTimestamp(System.currentTimeMillis());
            syncConfig.setProjectName(form.getProjectName());
            syncConfig.setProjectId(form.getProjectId());
            syncConfigMapper.update(syncConfig);
            return ReturnResult.success(entityToDto(syncConfig));
        }
        log.error("SyncConfigServiceImpl--updateConfig--NAME_IS_EXIST!");
        return ReturnResult.error(BizCode.NAME_IS_EXIST);
    }

    @Override
    public ReturnResult findById(Long id, Long userId) {
        SyncConfig syncConfig = syncConfigMapper.findById(id);
        if (syncConfig == null || syncConfig.getCreatedUser() != userId) {
            return ReturnResult.success(new SyncConfigDto());
        }
        return ReturnResult.success(entityToDto(syncConfig));
    }

    @Override
    public ReturnResult delete(Long id, Long userId) {
        log.debug("****id:{},userId:{}****", id, userId);
        SyncConfig syncConfig = syncConfigMapper.findById(id);
        log.debug("****syncConfig:{}****", syncConfig);
        log.debug("****syncConfig != null:{}", syncConfig != null);
        log.debug("****syncConfig.getCreatedUser():{},userId:{},syncConfig.getCreatedUser().equals(userId):{}",
                syncConfig.getCreatedUser(), userId, syncConfig.getCreatedUser().equals(userId));
        if (syncConfig != null && syncConfig.getCreatedUser().equals(userId)) {
            syncConfigMapper.delete(id);
            log.debug("****delete success!");
        }
        return ReturnResult.success();
    }

    @Override
    public ReturnResult> list(ConfigSelectForm form, Long userId) {
        SyncConfig syncConfig = new SyncConfig();
        syncConfig.setCreatedUser(userId);
        syncConfig.setName(form.getName());
        syncConfig.setProjectId(form.getProjectId());
        PageHelper.startPage(form.getPageNum(), form.getPageSize());
        PageInfo configPageInfo = new PageInfo<>(syncConfigMapper.list(syncConfig));
        PageResult result = new PageResult<>();
        result.setPageNum(configPageInfo.getPageNum());
        result.setPageSize(configPageInfo.getPageSize());
        result.setTotalCount(configPageInfo.getTotal());
        result.setTotalPage(configPageInfo.getPages());
        List dtoList =
            configPageInfo.getList().stream().map(this::entityToDto).collect(Collectors.toList());
        result.setList(dtoList);
        return ReturnResult.success(result);
    }

    
    private ColumnMap makeColumnMap(JSonArray columnMap) {
        List readerColumns = new ArrayList<>();
        List writerColumns = new ArrayList<>();
        for (int i = 0; i < columnMap.size(); i++) {
            JSonObject column = columnMap.getJSonObject(i);
            readerColumns.add(column.getString("reader"));
            writerColumns.add(column.getString("writer"));
        }
        if (CollectionUtils.isEmpty(readerColumns) || CollectionUtils.isEmpty(writerColumns)) {
            return null;
        }
        ColumnMap column = new ColumnMap();
        column.setReader(readerColumns);
        column.setWriter(writerColumns);
        return column;
    }

    
    private JSonObject packageReader(Long readerConfigId, JSonObject readerParam, List readerColumns) {
        Response descrypt = dataSourceService.findDescrypt(readerConfigId);
        if (!descrypt.responseSuccess()) {
            return null;
        }
        JdbcDataSourceDto jdbcDataSource = descrypt.getInfo();
        String sourceType = jdbcDataSource.getSourceType();
        baseParamTool baseParamTool = ParamToolFactory.getByType(sourceType);
        JSonObject reader = baseParamTool.makeReaderJson(jdbcDataSource, readerParam, readerColumns);
        return reader;
    }

    
    private JSonObject packageWriter(Long writerConfigId, JSonObject writerParam, List writerColumns) {
        Response descrypt = dataSourceService.findDescrypt(writerConfigId);
        if (!descrypt.responseSuccess()) {
            return null;
        }
        JdbcDataSourceDto jdbcDataSource = descrypt.getInfo();
        String sourceType = jdbcDataSource.getSourceType();
        baseParamTool baseParamTool = ParamToolFactory.getByType(sourceType);
        JSonObject writer = baseParamTool.makeWriterJson(jdbcDataSource, writerParam, writerColumns);
        return writer;
    }

    
    private JSonObject packageJob(JSonArray content) {
        JSonObject job = new JSonObject();
        JSonObject setting = new JSonObject();
        JSonObject speed = new JSonObject();
        speed.put("channel", 1);
        JSonObject errorLimit = new JSonObject();
        errorLimit.put("record", 0);
        errorLimit.put("percentage", Contants.PERCENTAGE);
        setting.put("speed", speed);
        setting.put("errorLimit", errorLimit);
        job.put("setting", setting);
        job.put("content", content);
        JSonObject jobContent = new JSonObject();
        jobContent.put("job", job);
        return jobContent;
    }

    
    private SyncConfigDto entityToDto(SyncConfig syncConfig) {
        SyncConfigDto configDto = new SyncConfigDto();
        BeanUtils.copyProperties(syncConfig, configDto);
        return configDto;
    }
}

SyncConfigMapper
package com.geespace.microservices.builder.dao;

import java.util.List;

import com.geespace.microservices.builder.entity.SyncConfig;

import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Result;
import org.apache.ibatis.annotations.ResultMap;
import org.apache.ibatis.annotations.Results;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.SelectKey;
import org.apache.ibatis.annotations.SelectProvider;
import org.apache.ibatis.annotations.Update;
import org.apache.ibatis.type.JdbcType;


/**
 * MyBatis mapper for the {@code sync_config} table.
 */
@Mapper
public interface SyncConfigMapper {
    /**
     * Inserts a configuration and writes the generated key back into {@code id}.
     *
     * @param syncConfig entity to insert
     * @return value reported by MyBatis for the insert statement
     */
    @Insert({"insert into sync_config (name, description, content, sync_type, created_timestamp, created_user, ",
        "modified_timestamp,project_id,project_name) values (#{name,jdbcType=VARCHAR},#{description,jdbcType=VARCHAR},",
        "#{content,jdbcType=OTHER, typeHandler=com.geespace.microservices.builder.handler.MySqlJsonHandler}, ",
        "#{syncType,jdbcType=TINYINT}, #{createdTimestamp,jdbcType=BIGINT}, #{createdUser,jdbcType=BIGINT}, ",
        "#{modifiedTimestamp,jdbcType=BIGINT},#{projectId},#{projectName})"})
    @SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "id", before = false, resultType = Long.class)
    Long insert(SyncConfig syncConfig);

    /**
     * Overwrites every mutable column of the row matching {@code id}.
     * {@code project_id} is assigned once; the statement previously listed it twice.
     *
     * @param syncConfig entity carrying the new values
     * @return number of rows updated
     */
    @Update({"update sync_config set ",
        "name = #{name,jdbcType=VARCHAR}, description = #{description,jdbcType=VARCHAR}, ",
        "sync_type = #{syncType,jdbcType=TINYINT}, content = #{content,jdbcType=OTHER,",
        "typeHandler=com.geespace.microservices.builder.handler.MySqlJsonHandler},project_id=#{projectId}, ",
        "created_timestamp = #{createdTimestamp,jdbcType=BIGINT}, created_user = #{createdUser,jdbcType=BIGINT}, ",
        "modified_timestamp = #{modifiedTimestamp,jdbcType=BIGINT},project_name=#{projectName}",
        " where id = #{id,jdbcType=BIGINT}"})
    int update(SyncConfig syncConfig);

    /**
     * Deletes the row with the given id.
     *
     * @param id configuration id
     * @return number of rows deleted
     */
    @Delete("delete from sync_config where id = #{id,jdbcType=BIGINT}")
    int delete(Long id);

    /**
     * Lists configurations using the SQL built by {@link SyncConfigSqlProvider#select}.
     * Also declares the shared {@code resultMap}. The element type is spelled out because MyBatis
     * derives the row type from the method's generic return type.
     *
     * @param syncConfig filter criteria
     * @return matching configurations
     */
    @SelectProvider(type = SyncConfigSqlProvider.class, method = "select")
    @Results(id = "resultMap",
        value = {@Result(column = "id", property = "id", jdbcType = JdbcType.BIGINT, id = true),
            @Result(column = "name", property = "name", jdbcType = JdbcType.VARCHAR),
            @Result(column = "description", property = "description", jdbcType = JdbcType.VARCHAR),
            @Result(column = "sync_type", property = "syncType", jdbcType = JdbcType.TINYINT),
            @Result(column = "content", property = "content", jdbcType = JdbcType.OTHER,
                typeHandler = com.geespace.microservices.builder.handler.MySqlJsonHandler.class),
            @Result(column = "created_timestamp", property = "createdTimestamp", jdbcType = JdbcType.BIGINT),
            @Result(column = "created_user", property = "createdUser", jdbcType = JdbcType.BIGINT),
            @Result(column = "project_id", property = "projectId", jdbcType = JdbcType.BIGINT),
            @Result(column = "project_name", property = "projectName", jdbcType = JdbcType.VARCHAR),
            @Result(column = "modified_timestamp", property = "modifiedTimestamp", jdbcType = JdbcType.BIGINT)})
    List<SyncConfig> list(SyncConfig syncConfig);

    /**
     * Loads one configuration by primary key.
     *
     * @param id configuration id
     * @return the configuration, or {@code null} when no row matches
     */
    @Select({"select id,project_id,project_name,name, description, sync_type, content,"
            + " created_timestamp, created_user, modified_timestamp ",
        "from sync_config where id = #{id,jdbcType=BIGINT}"})
    @ResultMap("resultMap")
    SyncConfig findById(Long id);

    /**
     * Counts the user's configurations with the given name, optionally excluding one id.
     * NOTE(review): the arguments carry no {@code @Param} names — confirm the build exposes
     * parameter names so {@code #{createdUser}}, {@code #{name}} and {@code #{id}} resolve.
     *
     * @param createdUser owner id
     * @param name name to check
     * @param id id to exclude from the count, or {@code null}
     * @return number of matching rows
     */
    @SelectProvider(type = SyncConfigSqlProvider.class, method = "checkNameUnique")
    Integer checkNameUnique(Long createdUser, String name, Long id);
}

SyncConfigSqlProvider
package com.geespace.microservices.builder.dao;

import com.geespace.microservices.builder.entity.SyncConfig;

import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.jdbc.SQL;


/**
 * Builds the dynamic SQL used by {@code SyncConfigMapper}.
 */
public class SyncConfigSqlProvider {
    /**
     * Builds the list query: always restricted to the creator, optionally to a project id and a
     * name fragment, newest id first.
     *
     * @param syncConfig filter criteria
     * @return SQL text with MyBatis placeholders
     */
    public String select(SyncConfig syncConfig) {
        SQL sql = new SQL();
        sql.SELECT("id,project_id,project_name,name, description, sync_type, content, created_timestamp,"
                + " created_user, modified_timestamp");
        sql.FROM("sync_config");
        // Successive WHERE calls are combined with AND by the SQL builder.
        sql.WHERE("created_user = #{createdUser,jdbcType=BIGINT}");
        if (!org.springframework.util.StringUtils.isEmpty(syncConfig.getProjectId())) {
            sql.WHERE("project_id=#{projectId}");
        }
        if (!StringUtils.isBlank(syncConfig.getName())) {
            sql.WHERE("name like concat('%', #{name,jdbcType=VARCHAR}, '%')");
        }
        sql.ORDER_BY("id desc");
        return sql.toString();
    }

    /**
     * Builds the query counting a user's configurations that carry a given name.
     *
     * @param createdUser owner id
     * @param name name to check
     * @param id id to exclude from the count (used when updating); {@code null} to count all
     * @return SQL text with MyBatis placeholders
     */
    public String checkNameUnique(Long createdUser, String name, Long id) {
        SQL sql = new SQL();
        sql.SELECT("COUNT(name)");
        sql.FROM("sync_config");
        if (id != null) {
            sql.WHERE("id != #{id}");
        }
        sql.WHERE("created_user=#{createdUser} and name=#{name}");
        return sql.toString();
    }
}

JdbcDataSourceService
package com.geespace.microservices.datasource.service;

import java.util.List;

import com.geespace.microservices.datasource.dto.JdbcDataSourceDto;
import com.geespace.microservices.datasource.form.datasource.DataSourceAddForm;
import com.geespace.microservices.datasource.form.datasource.DataSourceSelectForm;
import com.geespace.microservices.datasource.form.datasource.DataSourceUpdateForm;
import com.geespace.microservices.datasource.response.PageResult;
import com.geespace.microservices.datasource.response.Response;


public interface JdbcDataSourceService {
    
    Response addDataSource(DataSourceAddForm dataSourceAddForm);

    
    Response updateDataSource(DataSourceUpdateForm dataSourceUpdateForm);

    
    Response deleteDataSource(Long id);

    
    Response> list(Long creator);

    
    Response> listByType(Long creator, List type);

    
    Response> listmeta();

    
    Response> select(DataSourceSelectForm form);

    
    Response find(Long id);

    
    Response findmetaDataSource(Long id);

    
    Response findDescrypt(Long id);
}

JdbcDataSourceServiceImpl
package com.geespace.microservices.datasource.service.impl;

import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

import com.geespace.microservices.datasource.dao.DataSourceMapper;
import com.geespace.microservices.datasource.dao.metaDataSourceMapper;
import com.geespace.microservices.datasource.dto.JdbcDataSourceDto;
import com.geespace.microservices.datasource.entity.JdbcDataSource;
import com.geespace.microservices.datasource.enums.JdbcDataSourceStatusEnum;
import com.geespace.microservices.datasource.form.datasource.DataSourceAddForm;
import com.geespace.microservices.datasource.form.datasource.DataSourceSelectForm;
import com.geespace.microservices.datasource.form.datasource.DataSourceUpdateForm;
import com.geespace.microservices.datasource.response.Msg;
import com.geespace.microservices.datasource.response.PageResult;
import com.geespace.microservices.datasource.response.Response;
import com.geespace.microservices.datasource.service.JdbcDataSourceService;
import com.geespace.microservices.datasource.util.AesUtil;
import com.geespace.microservices.datasource.util.LocalCacheUtil;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


@Slf4j
@Service
public class JdbcDataSourceServiceImpl implements JdbcDataSourceService {
    @Autowired
    private DataSourceMapper dataSourceMapper;

    @Autowired
    private metaDataSourceMapper metaDataSourceMapper;

    @Override
    public Response addDataSource(DataSourceAddForm dataSourceAddForm) {
        JdbcDataSource dataSource = new JdbcDataSource();
        BeanUtils.copyProperties(dataSourceAddForm, dataSource);
        JdbcDataSource exist = dataSourceMapper.nameExist(dataSource);
        if (exist != null) {
            return Response.error(Msg.DATASOURCE_NAME_ALREAD_EXIST);
        }
        String userName = AesUtil.decrypt(dataSource.getUserName());
        // 判断账密是否为密文
        if (userName == null) {
            dataSource.setUserName(AesUtil.encrypt(dataSource.getUserName()));
        }
        String pwd = AesUtil.decrypt(dataSource.getPassword());
        if (pwd == null) {
            dataSource.setPassword(AesUtil.encrypt(dataSource.getPassword()));
        }
        dataSource.setCreateTime(new Date());
        dataSource.setUpdateTime(new Date());
        dataSource.setStatus(JdbcDataSourceStatusEnum.USING.getStatus());
        dataSourceMapper.insert(dataSource);
        JdbcDataSourceDto dataSourceDto = new JdbcDataSourceDto();
        BeanUtils.copyProperties(dataSource, dataSourceDto);
        return Response.success(dataSourceDto);
    }

    @Override
    public Response updateDataSource(DataSourceUpdateForm dataSourceUpdateForm) {
        JdbcDataSource dataSource = dataSourceMapper.find(dataSourceUpdateForm.getId());
        if (dataSource == null || dataSourceUpdateForm.getCreator() != dataSource.getCreator()) {
            return Response.error(Msg.DATASOURCE_NOT_EXIST);
        }
        String userName = AesUtil.decrypt(dataSourceUpdateForm.getUserName());
        // 判断账密是否为密文
        if (userName == null) {
            dataSourceUpdateForm.setUserName(AesUtil.encrypt(dataSourceUpdateForm.getUserName()));
        }
        String pwd = AesUtil.decrypt(dataSourceUpdateForm.getPassword());
        if (pwd == null) {
            dataSourceUpdateForm.setPassword(AesUtil.encrypt(dataSourceUpdateForm.getPassword()));
        }
        String originName = dataSource.getSourceName();
        // 注意copyProperties是将source中的属性全部copy到target中
        BeanUtils.copyProperties(dataSourceUpdateForm, dataSource);
        JdbcDataSource exist = dataSourceMapper.nameExist(dataSource);
        if (exist != null && !exist.getSourceName().equals(originName)) {
            return Response.error(Msg.DATASOURCE_NAME_ALREAD_EXIST);
        }
        dataSource.setUpdateTime(new Date());
        dataSourceMapper.update(dataSource);
        LocalCacheUtil.remove(dataSource.getCreator() + originName);
        JdbcDataSourceDto dataSourceDto = new JdbcDataSourceDto();
        BeanUtils.copyProperties(dataSource, dataSourceDto);
        return Response.success(dataSourceDto);
    }

    @Override
    public Response deleteDataSource(Long id) {
        dataSourceMapper.delete(id);
        return Response.success();
    }

    @Override
    public Response> list(Long creator) {
        List list = dataSourceMapper.list(creator);
        List listDto = list.stream().map(this::getDto).collect(Collectors.toList());
        return Response.success(listDto);
    }

    @Override
    public Response> listByType(Long creator, List type) {
        List list = dataSourceMapper.listByType(creator, type);
        List listDto = list.stream().map(this::getDto).collect(Collectors.toList());
        return Response.success(listDto);
    }

    @Override
    public Response> listmeta() {
        List list = metaDataSourceMapper.list();
        List listDto = list.stream().map(this::getDto).collect(Collectors.toList());
        return Response.success(listDto);
    }

    @Override
    public Response> select(DataSourceSelectForm form) {
        JdbcDataSource select = new JdbcDataSource();
        select.setSourceName(form.getSourceName());
        select.setCreator(form.getCreator());
        PageHelper.startPage(form.getPageNum(), form.getPageSize());
        List list = dataSourceMapper.select(select);
        PageInfo pageInfo = new PageInfo<>(list);
        PageResult pageResult = new PageResult<>();
        pageResult.setPageNum(pageInfo.getPageNum());
        pageResult.setPageSize(pageInfo.getPageSize());
        pageResult.setTotalPage(pageInfo.getPages());
        pageResult.setTotalCount(pageInfo.getTotal());
        pageResult.setList(pageInfo.getList().stream().map(this::getDto).collect(Collectors.toList()));
        return Response.success(pageResult);
    }

    @Override
    public Response find(Long id) {
        JdbcDataSource jdbcDataSource = dataSourceMapper.find(id);
        if (jdbcDataSource == null) {
            return Response.error(Msg.DATASOURCE_NOT_EXIST);
        }
        return Response.success(getDto(jdbcDataSource));
    }

    @Override
    public Response findmetaDataSource(Long id) {
        JdbcDataSource jdbcDataSource = this.metaDataSourceMapper.find(id);
        if (jdbcDataSource == null) {
            return Response.error(Msg.DATASOURCE_NOT_EXIST);
        }
        return Response.success(getDto(jdbcDataSource));
    }

    @Override
    public Response findDescrypt(Long id) {
        JdbcDataSource jdbcDataSource = dataSourceMapper.find(id);
        if (jdbcDataSource == null) {
            return Response.error(Msg.DATASOURCE_NOT_EXIST);
        }
        if (!StringUtils.isBlank(jdbcDataSource.getUserName())) {
            jdbcDataSource.setUserName(AesUtil.decrypt(jdbcDataSource.getUserName()));
        }
        if (!StringUtils.isBlank(jdbcDataSource.getPassword())) {
            jdbcDataSource.setPassword(AesUtil.decrypt(jdbcDataSource.getPassword()));
        }
        return Response.success(getDto(jdbcDataSource));
    }

    
    private JdbcDataSourceDto getDto(JdbcDataSource jdbcDataSource) {
        JdbcDataSourceDto dto = new JdbcDataSourceDto();
        BeanUtils.copyProperties(jdbcDataSource, dto);
        return dto;
    }
}

DataSourceMapper
package com.geespace.microservices.datasource.dao;

import java.util.List;

import com.geespace.microservices.datasource.entity.JdbcDataSource;

import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Result;
import org.apache.ibatis.annotations.ResultMap;
import org.apache.ibatis.annotations.Results;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.SelectKey;
import org.apache.ibatis.annotations.SelectProvider;
import org.apache.ibatis.annotations.Update;
import org.apache.ibatis.type.JdbcType;


@Mapper
public interface DataSourceMapper {
    
    @Insert({
        "insert into ge_jdbc_datasource (source_type, source_name, jdbc_url, user_name, password, zk_address, znode, ",
        "database_name, jdbc_driver_class, remark, creator, create_time, update_time, status)",
        "values (#{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{jdbcUrl,jdbcType=VARCHAR}, ",
        "#{userName,jdbcType=VARCHAR}, #{password,jdbcType=VARCHAR}, #{zkAddress,jdbcType=VARCHAR}, ",
        "#{znode,jdbcType=VARCHAR}, #{databaseName,jdbcType=VARCHAR}, #{jdbcDriverClass,jdbcType=VARCHAR}, ",
        "#{remark,jdbcType=VARCHAR}, #{creator,jdbcType=BIGINT}, #{createTime,jdbcType=TIMESTAMP}, ",
        "#{updateTime,jdbcType=TIMESTAMP}, #{status,jdbcType=TINYINT})"})
    @SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "id", before = false, resultType = Long.class)
    int insert(JdbcDataSource source);

    
    @Update({"update ge_jdbc_datasource",
        "set source_type = #{sourceType,jdbcType=VARCHAR}, source_name = #{sourceName,jdbcType=VARCHAR}, ",
        "jdbc_url = #{jdbcUrl,jdbcType=VARCHAR}, user_name = #{userName,jdbcType=VARCHAR}, ",
        "password = #{password,jdbcType=VARCHAR}, zk_address = #{zkAddress,jdbcType=VARCHAR}, ",
        "znode = #{znode,jdbcType=VARCHAR}, database_name = #{databaseName,jdbcType=VARCHAR}, ",
        "jdbc_driver_class = #{jdbcDriverClass,jdbcType=VARCHAR}, remark = #{remark,jdbcType=VARCHAR}, ",
        "update_time = #{updateTime,jdbcType=TIMESTAMP}, status = #{status,jdbcType=TINYINT}",
        "where id = #{id,jdbcType=BIGINT} and status = 1"})
    int update(JdbcDataSource source);

    
    @Delete("delete from ge_jdbc_datasource where id = #{id,jdbcType=BIGINT}")
    int delete(Long id);

    
    @Select({"select * from ge_jdbc_datasource where creator = #{creator,jdbcType=BIGINT} and status = 1",
        " order by id desc"})
    @Results(id = "resultMap",
        value = {@Result(column = "id", property = "id", jdbcType = JdbcType.BIGINT, id = true),
            @Result(column = "source_type", property = "sourceType", jdbcType = JdbcType.VARCHAR),
            @Result(column = "source_name", property = "sourceName", jdbcType = JdbcType.VARCHAR),
            @Result(column = "jdbc_url", property = "jdbcUrl", jdbcType = JdbcType.VARCHAR),
            @Result(column = "user_name", property = "userName", jdbcType = JdbcType.VARCHAR),
            @Result(column = "password", property = "password", jdbcType = JdbcType.VARCHAR),
            @Result(column = "zk_address", property = "zkAddress", jdbcType = JdbcType.VARCHAR),
            @Result(column = "znode", property = "znode", jdbcType = JdbcType.VARCHAR),
            @Result(column = "database_name", property = "databaseName", jdbcType = JdbcType.VARCHAR),
            @Result(column = "jdbc_driver_class", property = "jdbcDriverClass", jdbcType = JdbcType.VARCHAR),
            @Result(column = "remark", property = "remark", jdbcType = JdbcType.VARCHAR),
            @Result(column = "creator", property = "creator", jdbcType = JdbcType.BIGINT),
            @Result(column = "create_time", property = "createTime", jdbcType = JdbcType.TIMESTAMP),
            @Result(column = "update_time", property = "updateTime", jdbcType = JdbcType.TIMESTAMP),
            @Result(column = "status", property = "status", jdbcType = JdbcType.TINYINT)})
    List list(Long creator);

    
    @Select({""})
    @ResultMap("resultMap")
    List listByType(@Param("creator") Long creator, @Param("type") List type);

    
    @SelectProvider(type = DataSourceSqlProvider.class, method = "select")
    @ResultMap("resultMap")
    List select(JdbcDataSource jdbcDataSource);

    
    @Options(flushCache = Options.FlushCachePolicy.TRUE)
    @Select("select * from ge_jdbc_datasource where id = #{id,jdbcType=BIGINT} and status = 1")
    @ResultMap("resultMap")
    JdbcDataSource find(Long id);

    
    @Select({"select * from ge_jdbc_datasource ",
        "where source_name = #{sourceName,jdbcType=VARCHAR} and creator = #{creator,jdbcType=BIGINT}"})
    @ResultMap("resultMap")
    JdbcDataSource nameExist(JdbcDataSource jdbcDataSource);
}

DataSourceSqlProvider
package com.geespace.microservices.datasource.dao;

import com.geespace.microservices.datasource.entity.JdbcDataSource;

import org.apache.commons.lang.StringUtils;
import org.apache.ibatis.jdbc.SQL;


/**
 * Builds the dynamic SQL used by the {@code @SelectProvider} statement that queries
 * {@code ge_jdbc_datasource}.
 */
public class DataSourceSqlProvider {

    /**
     * Builds the filtered listing query for {@code ge_jdbc_datasource}.
     *
     * <p>The query always restricts to {@code status = 1}. It adds a {@code creator} equality filter
     * when the creator is set, and a {@code source_name} substring filter when the source name is
     * not blank. Results are ordered by {@code id} descending.
     *
     * @param jdbcDataSource filter values; only {@code creator} and {@code sourceName} are read
     * @return the SQL text with MyBatis {@code #{...}} placeholders
     */
    public String select(JdbcDataSource jdbcDataSource) {
        SQL sql = new SQL();
        sql.SELECT("*");
        sql.FROM("ge_jdbc_datasource");
        // The builder method is WHERE; successive WHERE calls are joined with AND.
        sql.WHERE("status = 1");
        if (jdbcDataSource.getCreator() != null) {
            sql.WHERE("creator = #{creator,jdbcType=BIGINT}");
        }
        if (StringUtils.isNotBlank(jdbcDataSource.getSourceName())) {
            // Values stay bound as parameters; only the wildcards are concatenated in SQL.
            sql.WHERE("source_name like concat('%', #{sourceName,jdbcType=VARCHAR}, '%')");
        }
        sql.ORDER_BY("id desc");
        return sql.toString();
    }
}

metaDataSourceMapper
package com.geespace.microservices.datasource.dao;

import java.util.List;

import com.geespace.microservices.datasource.entity.JdbcDataSource;

import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Result;
import org.apache.ibatis.annotations.ResultMap;
import org.apache.ibatis.annotations.Results;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.type.JdbcType;


/**
 * MyBatis mapper for the {@code ge_meta_datasource} table.
 *
 * <p>The interface name is kept as-is (lower-case first letter) so existing injection points and
 * references keep working.
 */
@Mapper
public interface metaDataSourceMapper {

    /**
     * Selects every row of {@code ge_meta_datasource} and declares the reusable result map
     * {@code resultMap}.
     *
     * <p>The return type is parameterized (it was a raw {@code List}). MyBatis derives the result
     * type of an annotation-defined {@code @Results} map from the method's return type, so the
     * element type must be visible for the property mappings to target {@link JdbcDataSource}.
     *
     * @return all rows mapped to {@link JdbcDataSource}
     */
    @Select("select * from ge_meta_datasource ")
    @Results(id = "resultMap",
        value = {@Result(column = "id", property = "id", jdbcType = JdbcType.BIGINT, id = true),
            @Result(column = "source_type", property = "sourceType", jdbcType = JdbcType.VARCHAR),
            @Result(column = "source_name", property = "sourceName", jdbcType = JdbcType.VARCHAR),
            @Result(column = "jdbc_url", property = "jdbcUrl", jdbcType = JdbcType.VARCHAR),
            @Result(column = "user_name", property = "userName", jdbcType = JdbcType.VARCHAR),
            @Result(column = "password", property = "password", jdbcType = JdbcType.VARCHAR),
            @Result(column = "zk_address", property = "zkAddress", jdbcType = JdbcType.VARCHAR),
            @Result(column = "znode", property = "znode", jdbcType = JdbcType.VARCHAR),
            @Result(column = "database_name", property = "databaseName", jdbcType = JdbcType.VARCHAR),
            @Result(column = "jdbc_driver_class", property = "jdbcDriverClass", jdbcType = JdbcType.VARCHAR),
            @Result(column = "remark", property = "remark", jdbcType = JdbcType.VARCHAR),
            @Result(column = "creator", property = "creator", jdbcType = JdbcType.BIGINT),
            @Result(column = "create_time", property = "createTime", jdbcType = JdbcType.TIMESTAMP),
            @Result(column = "update_time", property = "updateTime", jdbcType = JdbcType.TIMESTAMP),
            @Result(column = "status", property = "status", jdbcType = JdbcType.TINYINT)})
    List<JdbcDataSource> list();

    /**
     * Selects the row of {@code ge_meta_datasource} with the given id.
     *
     * @param id primary key, bound as {@code BIGINT}
     * @return the row mapped through {@code resultMap}
     */
    @Select("select * from ge_meta_datasource where id = #{id,jdbcType=BIGINT} ")
    @ResultMap("resultMap")
    JdbcDataSource find(Long id);
}

DataSourceSelectForm
package com.geespace.microservices.datasource.form.datasource;

import javax.validation.constraints.NotNull;

import lombok.Data;


/**
 * Request form for the paged data source query.
 *
 * <p>{@code pageSize} and {@code pageNum} are declared as {@link Integer} rather than {@code int}.
 * A primitive can never be {@code null}, so {@code @NotNull} on an {@code int} field never fails
 * and an omitted value silently becomes {@code 0}. With the wrapper type the constraint actually
 * rejects a missing value; callers reading the getters into an {@code int} keep working through
 * auto-unboxing.
 */
@Data
public class DataSourceSelectForm {
    /** Source name filter; no constraint is declared, so it may be absent. */
    private String sourceName;
    /** Creator filter; no constraint is declared, so it may be absent. */
    private Long creator;
    /** Page size; required. */
    @NotNull(message = "pageSize不能为空")
    private Integer pageSize;
    /** Page number; required. */
    @NotNull(message = "pageNum不能为空")
    private Integer pageNum;
}

DataSourceAddForm
package com.geespace.microservices.datasource.form.datasource;

import javax.validation.constraints.NotBlank;

import lombok.Data;


/**
 * Request form carrying the fields submitted when adding a data source.
 *
 * <p>Only {@code sourceType} and {@code sourceName} are constrained (must not be blank); every
 * other field has no validation declared here. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class DataSourceAddForm {
    /** Data source type; must not be blank. */
    @NotBlank(message = "数据源类型不能为空")
    private String sourceType;
    /** Data source name; must not be blank. */
    @NotBlank(message = "数据源名称不能为空")
    private String sourceName;
    /** JDBC URL - presumably only relevant for JDBC-based sources; verify against the service. */
    private String jdbcUrl;
    /** Login user name for the source (unconstrained). */
    private String userName;
    /** Login password for the source (unconstrained). */
    private String password;
    /** ZooKeeper address - presumably for HBase sources; TODO confirm. */
    private String zkAddress;
    /** ZooKeeper znode - presumably for HBase sources; TODO confirm. */
    private String znode;
    /** Database name (unconstrained). */
    private String databaseName;
    /** JDBC driver class name (unconstrained). */
    private String jdbcDriverClass;
    /** Free-text remark (unconstrained). */
    private String remark;
    /** Creator id (unconstrained). */
    private Long creator;

}

DataSourceUpdateForm
package com.geespace.microservices.datasource.form.datasource;

import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;

import lombok.Data;


/**
 * Request form carrying the fields submitted when updating a data source.
 *
 * <p>{@code id} must be non-null and {@code sourceType} / {@code sourceName} must not be blank;
 * every other field has no validation declared here. Accessors are generated by Lombok's
 * {@code @Data}.
 */
@Data
public class DataSourceUpdateForm {
    /** Id of the data source to update; must not be null. */
    @NotNull(message = "id不能为空")
    private Long id;
    /** Data source type; must not be blank. */
    @NotBlank(message = "数据源类型不能为空")
    private String sourceType;
    /** Data source name; must not be blank. */
    @NotBlank(message = "数据源名称不能为空")
    private String sourceName;
    /** JDBC URL - presumably only relevant for JDBC-based sources; verify against the service. */
    private String jdbcUrl;
    /** Login user name for the source (unconstrained). */
    private String userName;
    /** Login password for the source (unconstrained). */
    private String password;
    /** ZooKeeper address - presumably for HBase sources; TODO confirm. */
    private String zkAddress;
    /** ZooKeeper znode - presumably for HBase sources; TODO confirm. */
    private String znode;
    /** Database name (unconstrained). */
    private String databaseName;
    /** JDBC driver class name (unconstrained). */
    private String jdbcDriverClass;
    /** Free-text remark (unconstrained). */
    private String remark;
    /** Creator id (unconstrained). */
    private Long creator;
}

YAPI测试用例

5.1 查询全部同步任务配置(分页)

{
  "pageNum": 1,
  "pageSize": 10,
  "projectId": 28,
  "name": "测试"
}
5.2 创建同步任务配置-mysql->mysql
{
  "name": "测试同步任务配置-mysql-mysql-1",
  "description": "测试同步任务配置-mysql-mysql-1",
  "projectName": "test测试1",
  "projectId": 28,
  "syncType": 2,
  "readerConfigId": 1,
  "readerParam": {
    "table": "test_test"
  },
  "writerConfigId": 1,
  "writerParam": {
    "table": "test_test_1"
  },
  "columnMap": [
    {
      "reader": "id",
      "writer": "id"
    },
    {
      "reader": "name",
      "writer": "name"
    }
  ]
}
5.6 创建同步任务配置-hbase->hbase
{
  "name": "测试同步任务配置-hbase-hbase-1",
  "description": "测试同步任务配置-hbase-hbase-1",
  "projectName": "test测试1",
  "projectId": 28,
  "syncType": 2,
  "readerConfigId": 130,
  "readerParam": {
    "table": "test_test"
  },
  "writerConfigId": 130,
  "writerParam": {
    "table": "test_test_1",
    "rowkeyColumns": [
      "f:id",
      "f:name"
    ]
  },
  "columnMap": [
    {
      "reader": "f:id",
      "writer": "f:id"
    },
    {
      "reader": "f:name",
      "writer": "f:name"
    }
  ]
}
5.7 创建同步任务配置-mysql->hbase
{
  "name": "测试同步任务配置-mysql-hbase-1",
  "description": "测试同步任务配置mysql-hbase-1",
  "projectName": "test测试1",
  "projectId": 28,
  "syncType": 2,
  "readerConfigId": 1,
  "readerParam": {
    "table": "test_test"
  },
  "writerConfigId": 130,
  "writerParam": {
    "table": "test_test_1",
    "rowkeyColumns": [
      "f:id",
      "f:name"
    ]
  },
  "columnMap": [
    {
      "reader": "id",
      "writer": "f:id"
    },
    {
      "reader": "name",
      "writer": "f:name"
    }
  ]
}
5.8 创建同步任务配置-hbase->mysql
{
  "name": "测试同步任务配置-hbase-mysql-1",
  "description": "测试同步任务配置-hbase-mysql-1",
  "projectName": "test测试1",
  "projectId": 28,
  "syncType": 2,
  "readerConfigId": 130,
  "readerParam": {
    "table": "test_test",
    "rowkeyColumns": [
      "f:id",
      "f:name"
    ]
  },
  "writerConfigId": 1,
  "writerParam": {
    "table": "test_test_1"
  },
  "columnMap": [
    {
      "reader": "f:id",
      "writer": "id"
    },
    {
      "reader": "f:name",
      "writer": "name"
    }
  ]
}
5.3 更新同步任务配置
{
  "id": 82,
  "name": "测试同步任务配置-mysql-3",
  "description": "测试同步任务配置-mysql-3",
  "projectName": "数据同步任务",
  "projectId": 19,
  "syncType": 2,
  "readerConfigId": 1,
  "readerParam": {
    "table": "test_test"
  },
  "writerConfigId": 1,
  "writerParam": {
    "table": "test_test_1"
  },
  "columnMap": [
    {
      "reader": "id",
      "writer": "id"
    },
    {
      "reader": "name",
      "writer": "name"
    }
  ]
}
5.5 删除同步任务配置

5.4 查询同步任务配置

3.3 执行数据同步任务

3.4 停止数据同步任务

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/350401.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号