DataX 安装及基本使用请查看上一篇文章:https://blog.csdn.net/a924382407/article/details/120952339?spm=1001.2014.3001.5501
| 功能清单 |
|---|
| CRUD增删改查 、启动任务、停止任务 |
ConfigAddForm4.0.0 data-sync-config com.geespace.microservices.bd-platform all1.0-SNAPSHOT 1.0-SNAPSHOT 1.8 2.8.1 com.github.pagehelper pagehelper-spring-boot-startercom.google.code.gson gson${gson.version} org.springframework.boot spring-boot-starter-weborg.elasticsearch elasticsearch6.8.12 org.elasticsearch.client elasticsearch-rest-client6.8.12 org.elasticsearch.client elasticsearch-rest-high-level-client6.8.12 org.springframework.boot spring-boot-devtoolsruntime true mysql mysql-connector-javaruntime org.springframework.boot spring-boot-configuration-processortrue org.projectlombok lomboktrue org.springframework.boot spring-boot-starter-testtest org.junit.vintage junit-vintage-enginecom.alibaba fastjsonorg.codehaus.jackson jackson-core-asl1.9.13 compile com.geespace.microservices.bd-platform data-config1.0-SNAPSHOT compile commons-httpclient commons-httpclient3.1 org.apache.maven.plugins maven-shade-plugin2.2 org.springframework.boot spring-boot-maven-plugin2.1.4.RELEASE false true *:* meta-INF @Slf4j @RestController @RequestMapping("/dolphinscheduler/v1") public class DataxDolphinschedulerController { @Autowired private RestTemplate restTemplate; @Value("${dolphinscheduler.token}") String token; @Value("${dolphinscheduler.address}") String address; public static final int ZERO = 0; public static final int SUCCESS = 200; public static final String CREATE = "create"; public static final String UPDATe = "update"; public static final String ADD = "add"; public static final String DELETE = "delete"; public static final String onLINE = "ONLINE"; public static final String OFFLINE = "OFFLINE"; public static final int ONE_THOUSAND_AND_FIVE_HUNDRED = 1500; public static final int SIX = 6; public static final int EIGHTY = 80; public static final int THREE = 3; @Autowired private SyncConfigService syncConfigService; @PostMapping("/project/process/datax") @Transactional(rollbackFor = Exception.class) public ReturnResult operatorDataxTask(HttpServletRequest request, 
@RequestBody @Validated ConfigAddForm form) { Long userId = Long.valueOf(request.getUserPrincipal().getName()); form.setUserId(userId); ReturnResult dataxTaskReturnResult = syncConfigService.addConfig(form); if (dataxTaskReturnResult.getCode() != SUCCESS) { return dataxTaskReturnResult; } log.info("--(1)addDataxTaskResult--success"); form.setId(dataxTaskReturnResult.getData().getId()); if (dataxTaskReturnResult.getCode() == SUCCESS) { Boolean verifyResult = verifyProcessExist(userId + "-dataxTask", form.getProjectName()); log.info("--(2)verifyProcessExist--success:{}", verifyResult); if (!verifyResult) { ProcessDto processDto = packageProcessParam( "create", userId + "-dataxTask", dataxTaskReturnResult.getData(), null); log.info("--(3)packageProcessParam--success"); processDto.setProjectName(form.getProjectName()); processDto.setProjectId(form.getProjectId()); dataxTaskReturnResult = createProcess(processDto); } else { //获取用户下唯一工作流ID DolphinschedulerResponse processInfoList = getUserProcess(form.getProjectName()); JSonObject processJson = new JSonObject(); log.info("--(3)getUserProcess--success:{}", processInfoList); List
package com.geespace.microservices.builder.request;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
/**
 * Request body for creating a sync configuration.
 *
 * <p>The JSON-typed fields use fastjson's {@code JSONObject}/{@code JSONArray}, matching the
 * imports of this file. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class ConfigAddForm {
    /** Name of the configuration; checked for uniqueness per user when the config is added. */
    @NotEmpty(message = "name不能为空")
    private String name;

    /** Optional free-text description. */
    private String description;

    // NOTE(review): syncType is a primitive int, so it can never be null and @NotNull never
    // fails; an omitted value silently becomes 0. Switch to Integer if the check must be enforced.
    @NotNull(message = "同步方式不能为空")
    private int syncType;

    /** Id of the data source the reader side is built from. */
    @NotNull(message = "读取数据源id不能为空")
    private Long readerConfigId;

    /** Reader parameters handed to the reader JSON builder. */
    @NotEmpty(message = "读取参数不能为空")
    private JSONObject readerParam;

    /** Id of the data source the writer side is built from. */
    @NotNull(message = "写入数据源id不能为空")
    private Long writerConfigId;

    /** Writer parameters handed to the writer JSON builder. */
    @NotEmpty(message = "写入参数不能为空")
    private JSONObject writerParam;

    /** Column mapping: an array of objects carrying a "reader" and a "writer" column name. */
    @NotEmpty(message = "字段对照表不能为空")
    private JSONArray columnMap;

    /** Id of the calling user; filled in by the controller from the request principal. */
    private Long userId;

    /** Name of the project the configuration belongs to. */
    String projectName;

    /** Id of the project the configuration belongs to. */
    @NotNull(message = "projectId not null")
    Long projectId;

    /** Id of the persisted configuration; set by the controller after the insert succeeds. */
    Long id;
}
ConfigUpdateForm
package com.geespace.microservices.builder.request;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
/**
 * Request body for updating an existing sync configuration.
 *
 * <p>The JSON-typed fields use fastjson's {@code JSONObject}/{@code JSONArray}, matching the
 * imports of this file. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class ConfigUpdateForm {
    /** Id of the configuration to update. */
    @NotNull(message = "同步配置id不能为空")
    private Long id;

    /** New name; checked for uniqueness per user, excluding the row being updated. */
    @NotEmpty(message = "name不能为空")
    private String name;

    /** Optional free-text description. */
    private String description;

    // NOTE(review): syncType is a primitive int, so it can never be null and @NotNull never
    // fails; an omitted value silently becomes 0. Switch to Integer if the check must be enforced.
    @NotNull(message = "同步方式不能为空")
    private int syncType;

    /** Id of the data source the reader side is built from. */
    @NotNull(message = "读取数据源id不能为空")
    private Long readerConfigId;

    /** Reader parameters handed to the reader JSON builder. */
    @NotEmpty(message = "读取参数不能为空")
    private JSONObject readerParam;

    /** Id of the data source the writer side is built from. */
    @NotNull(message = "写入数据源id不能为空")
    private Long writerConfigId;

    /** Writer parameters handed to the writer JSON builder. */
    @NotEmpty(message = "写入参数不能为空")
    private JSONObject writerParam;

    /** Column mapping: an array of objects carrying a "reader" and a "writer" column name. */
    @NotEmpty(message = "字段对照表不能为空")
    private JSONArray columnMap;

    /** Id of the calling user; compared with the stored creator before the update is applied. */
    private Long userId;

    /** Id of the project the configuration belongs to. */
    @NotNull(message = "projectId not null")
    Long projectId;

    /** Name of the project the configuration belongs to. */
    String projectName;
}
ProcessDto
package com.geespace.microservices.builder.dto;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
/**
 * Carrier for the parameters of a workflow (process) definition request.
 *
 * <p>Accessors, equals/hashCode and toString are generated by Lombok.
 * NOTE(review): the field names look like DolphinScheduler process-definition parameters;
 * confirm against the scheduler API that consumes this object.
 */
@Data
@EqualsAndHashCode(callSuper = false)
// NOTE(review): callSuper = true on a class with no explicit superclass includes
// Object.toString() in the generated output; confirm this is intended.
@ToString(callSuper = true)
public class ProcessDto {
// Identifier of the process definition (presumably null when creating; TODO confirm).
private Integer id;
// Presumably the serialized task connections of the workflow; TODO confirm format.
private String connects;
// Presumably the serialized node layout of the workflow; TODO confirm format.
private String locations;
// Name of the process definition.
private String name;
// The process definition as a JSON string.
private String processDefinitionJson;
// Project name; set by the controller from the incoming form.
String projectName;
// Project id; set by the controller from the incoming form.
Long projectId;
}
SyncConfigDto
package com.geespace.microservices.builder.dto;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
/**
 * Outward-facing view of a stored sync configuration.
 *
 * <p>Populated from the {@code SyncConfig} entity by property copy, so field names must stay
 * aligned with the entity's property names.
 */
@Data
public class SyncConfigDto {
    /** Primary key of the configuration. */
    private Long id;

    /** Name of the configuration. */
    private String name;

    /** Optional free-text description. */
    private String description;

    /** Synchronisation mode code. */
    private int syncType;

    /** The assembled job JSON (a top-level "job" object with "setting" and "content"). */
    private JSONObject content;

    /** Name of the owning project. */
    String projectName;

    /** Id of the owning project. */
    Long projectId;
}
SyncConfigService
package com.geespace.microservices.builder.service;
import com.geespace.microservices.builder.dto.SyncConfigDto;
import com.geespace.microservices.builder.request.ConfigAddForm;
import com.geespace.microservices.builder.request.ConfigSelectForm;
import com.geespace.microservices.builder.request.ConfigUpdateForm;
import com.geespace.microservices.builder.response.PageResult;
import com.geespace.microservices.builder.response.ReturnResult;
public interface SyncConfigService {
ReturnResult addConfig(ConfigAddForm form);
ReturnResult updateConfig(ConfigUpdateForm form);
ReturnResult findById(Long id, Long userId);
ReturnResult delete(Long id, Long userId);
ReturnResult> list(ConfigSelectForm form, Long userId);
}
SyncConfigServiceImpl
package com.geespace.microservices.builder.service.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.geespace.microservices.builder.biz.Contants;
import com.geespace.microservices.builder.dao.SyncConfigMapper;
import com.geespace.microservices.builder.dto.ColumnMap;
import com.geespace.microservices.builder.dto.SyncConfigDto;
import com.geespace.microservices.builder.entity.SyncConfig;
import com.geespace.microservices.builder.factory.baseParamTool;
import com.geespace.microservices.builder.factory.ParamToolFactory;
import com.geespace.microservices.builder.request.ConfigAddForm;
import com.geespace.microservices.builder.request.ConfigSelectForm;
import com.geespace.microservices.builder.request.ConfigUpdateForm;
import com.geespace.microservices.builder.response.BizCode;
import com.geespace.microservices.builder.response.PageResult;
import com.geespace.microservices.builder.response.ReturnResult;
import com.geespace.microservices.builder.service.SyncConfigService;
import com.geespace.microservices.datasource.dto.JdbcDataSourceDto;
import com.geespace.microservices.datasource.response.Response;
import com.geespace.microservices.datasource.service.JdbcDataSourceService;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@Service
@Slf4j
public class SyncConfigServiceImpl implements SyncConfigService {
public static final int ZERO = 0;
public static final String Hbase = "hbase";
@Autowired
private SyncConfigMapper syncConfigMapper;
@Autowired
private JdbcDataSourceService dataSourceService;
@Override
public ReturnResult addConfig(ConfigAddForm form) {
Integer checkResult = syncConfigMapper.checkNameUnique(form.getUserId(), form.getName(), null);
if (checkResult == ZERO) {
ColumnMap columnMap = makeColumnMap(form.getColumnMap());
if (columnMap == null) {
return ReturnResult.error(BizCode.COLUMN_MATCHING_ERROR);
}
// 查询reader数据源 填充reader
JSonObject reader = packageReader(form.getReaderConfigId(), form.getReaderParam(), columnMap.getReader());
// 查询writer数据源 填充writer
JSonObject writer = packageWriter(form.getWriterConfigId(), form.getWriterParam(), columnMap.getWriter());
JSonArray contentArray = new JSonArray();
JSonObject content = new JSonObject();
content.put("reader", reader);
content.put("writer", writer);
contentArray.add(content);
SyncConfig syncConfig = new SyncConfig();
syncConfig.setContent(packageJob(contentArray));
syncConfig.setName(form.getName());
syncConfig.setDescription(form.getDescription());
syncConfig.setSyncType(form.getSyncType());
syncConfig.setCreatedTimestamp(System.currentTimeMillis());
syncConfig.setCreatedUser(form.getUserId());
syncConfig.setModifiedTimestamp(System.currentTimeMillis());
syncConfig.setProjectName(form.getProjectName());
syncConfig.setProjectId(form.getProjectId());
syncConfigMapper.insert(syncConfig);
return ReturnResult.success(entityToDto(syncConfig));
}
log.error("SyncConfigServiceImpl--addConfig--NAME_IS_EXIST!");
return ReturnResult.error(BizCode.NAME_IS_EXIST);
}
@Override
public ReturnResult updateConfig(ConfigUpdateForm form) {
SyncConfig syncConfig = syncConfigMapper.findById(form.getId());
if (syncConfig == null || syncConfig.getCreatedUser() != form.getUserId()) {
return ReturnResult.error(BizCode.UPDATE_OBJECT_NOT_EXIST);
}
Integer checkResult = syncConfigMapper.checkNameUnique(form.getUserId(), form.getName(), form.getId());
if (checkResult == ZERO) {
ColumnMap columnMap = makeColumnMap(form.getColumnMap());
JSonObject reader = packageReader(form.getReaderConfigId(), form.getReaderParam(), columnMap.getReader());
JSonObject writer = packageWriter(form.getWriterConfigId(), form.getWriterParam(), columnMap.getWriter());
JSonArray contentArray = new JSonArray();
JSonObject content = new JSonObject();
content.put("reader", reader);
content.put("writer", writer);
contentArray.add(content);
syncConfig.setContent(packageJob(contentArray));
syncConfig.setName(form.getName());
syncConfig.setDescription(form.getDescription());
syncConfig.setSyncType(form.getSyncType());
syncConfig.setModifiedTimestamp(System.currentTimeMillis());
syncConfig.setProjectName(form.getProjectName());
syncConfig.setProjectId(form.getProjectId());
syncConfigMapper.update(syncConfig);
return ReturnResult.success(entityToDto(syncConfig));
}
log.error("SyncConfigServiceImpl--updateConfig--NAME_IS_EXIST!");
return ReturnResult.error(BizCode.NAME_IS_EXIST);
}
@Override
public ReturnResult findById(Long id, Long userId) {
SyncConfig syncConfig = syncConfigMapper.findById(id);
if (syncConfig == null || syncConfig.getCreatedUser() != userId) {
return ReturnResult.success(new SyncConfigDto());
}
return ReturnResult.success(entityToDto(syncConfig));
}
@Override
public ReturnResult delete(Long id, Long userId) {
log.debug("****id:{},userId:{}****", id, userId);
SyncConfig syncConfig = syncConfigMapper.findById(id);
log.debug("****syncConfig:{}****", syncConfig);
log.debug("****syncConfig != null:{}", syncConfig != null);
log.debug("****syncConfig.getCreatedUser():{},userId:{},syncConfig.getCreatedUser().equals(userId):{}",
syncConfig.getCreatedUser(), userId, syncConfig.getCreatedUser().equals(userId));
if (syncConfig != null && syncConfig.getCreatedUser().equals(userId)) {
syncConfigMapper.delete(id);
log.debug("****delete success!");
}
return ReturnResult.success();
}
@Override
public ReturnResult> list(ConfigSelectForm form, Long userId) {
SyncConfig syncConfig = new SyncConfig();
syncConfig.setCreatedUser(userId);
syncConfig.setName(form.getName());
syncConfig.setProjectId(form.getProjectId());
PageHelper.startPage(form.getPageNum(), form.getPageSize());
PageInfo configPageInfo = new PageInfo<>(syncConfigMapper.list(syncConfig));
PageResult result = new PageResult<>();
result.setPageNum(configPageInfo.getPageNum());
result.setPageSize(configPageInfo.getPageSize());
result.setTotalCount(configPageInfo.getTotal());
result.setTotalPage(configPageInfo.getPages());
List dtoList =
configPageInfo.getList().stream().map(this::entityToDto).collect(Collectors.toList());
result.setList(dtoList);
return ReturnResult.success(result);
}
private ColumnMap makeColumnMap(JSonArray columnMap) {
List readerColumns = new ArrayList<>();
List writerColumns = new ArrayList<>();
for (int i = 0; i < columnMap.size(); i++) {
JSonObject column = columnMap.getJSonObject(i);
readerColumns.add(column.getString("reader"));
writerColumns.add(column.getString("writer"));
}
if (CollectionUtils.isEmpty(readerColumns) || CollectionUtils.isEmpty(writerColumns)) {
return null;
}
ColumnMap column = new ColumnMap();
column.setReader(readerColumns);
column.setWriter(writerColumns);
return column;
}
private JSonObject packageReader(Long readerConfigId, JSonObject readerParam, List readerColumns) {
Response descrypt = dataSourceService.findDescrypt(readerConfigId);
if (!descrypt.responseSuccess()) {
return null;
}
JdbcDataSourceDto jdbcDataSource = descrypt.getInfo();
String sourceType = jdbcDataSource.getSourceType();
baseParamTool baseParamTool = ParamToolFactory.getByType(sourceType);
JSonObject reader = baseParamTool.makeReaderJson(jdbcDataSource, readerParam, readerColumns);
return reader;
}
private JSonObject packageWriter(Long writerConfigId, JSonObject writerParam, List writerColumns) {
Response descrypt = dataSourceService.findDescrypt(writerConfigId);
if (!descrypt.responseSuccess()) {
return null;
}
JdbcDataSourceDto jdbcDataSource = descrypt.getInfo();
String sourceType = jdbcDataSource.getSourceType();
baseParamTool baseParamTool = ParamToolFactory.getByType(sourceType);
JSonObject writer = baseParamTool.makeWriterJson(jdbcDataSource, writerParam, writerColumns);
return writer;
}
private JSonObject packageJob(JSonArray content) {
JSonObject job = new JSonObject();
JSonObject setting = new JSonObject();
JSonObject speed = new JSonObject();
speed.put("channel", 1);
JSonObject errorLimit = new JSonObject();
errorLimit.put("record", 0);
errorLimit.put("percentage", Contants.PERCENTAGE);
setting.put("speed", speed);
setting.put("errorLimit", errorLimit);
job.put("setting", setting);
job.put("content", content);
JSonObject jobContent = new JSonObject();
jobContent.put("job", job);
return jobContent;
}
private SyncConfigDto entityToDto(SyncConfig syncConfig) {
SyncConfigDto configDto = new SyncConfigDto();
BeanUtils.copyProperties(syncConfig, configDto);
return configDto;
}
}
SyncConfigMapper
package com.geespace.microservices.builder.dao;
import java.util.List;
import com.geespace.microservices.builder.entity.SyncConfig;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Result;
import org.apache.ibatis.annotations.ResultMap;
import org.apache.ibatis.annotations.Results;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.SelectKey;
import org.apache.ibatis.annotations.SelectProvider;
import org.apache.ibatis.annotations.Update;
import org.apache.ibatis.type.JdbcType;
/**
 * MyBatis mapper for the {@code sync_config} table.
 */
@Mapper
public interface SyncConfigMapper {
    /**
     * Inserts a row and writes the generated key back into {@code syncConfig.id}.
     *
     * @param syncConfig entity to persist
     * @return the value MyBatis reports for the insert statement
     */
    @Insert({"insert into sync_config (name, description, content, sync_type, created_timestamp, created_user, ",
        "modified_timestamp,project_id,project_name) values (#{name,jdbcType=VARCHAR},#{description,jdbcType=VARCHAR},",
        "#{content,jdbcType=OTHER, typeHandler=com.geespace.microservices.builder.handler.MySqlJsonHandler}, ",
        "#{syncType,jdbcType=TINYINT}, #{createdTimestamp,jdbcType=BIGINT}, #{createdUser,jdbcType=BIGINT}, ",
        "#{modifiedTimestamp,jdbcType=BIGINT},#{projectId},#{projectName})"})
    @SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "id", before = false, resultType = Long.class)
    Long insert(SyncConfig syncConfig);

    /**
     * Overwrites every column of the row identified by {@code syncConfig.id}.
     *
     * @param syncConfig entity carrying the new values
     * @return number of affected rows
     */
    // project_id is assigned exactly once; the statement previously listed it twice.
    @Update({"update sync_config set ",
        "name = #{name,jdbcType=VARCHAR}, description = #{description,jdbcType=VARCHAR}, ",
        "sync_type = #{syncType,jdbcType=TINYINT}, content = #{content,jdbcType=OTHER,",
        "typeHandler=com.geespace.microservices.builder.handler.MySqlJsonHandler},project_id=#{projectId}, ",
        "created_timestamp = #{createdTimestamp,jdbcType=BIGINT}, created_user = #{createdUser,jdbcType=BIGINT}, ",
        "modified_timestamp = #{modifiedTimestamp,jdbcType=BIGINT},project_name=#{projectName}",
        " where id = #{id,jdbcType=BIGINT}"})
    int update(SyncConfig syncConfig);

    /**
     * Deletes the row with the given id.
     *
     * @param id primary key
     * @return number of affected rows
     */
    @Delete("delete from sync_config where id = #{id,jdbcType=BIGINT}")
    int delete(Long id);

    /**
     * Lists rows matching the criteria built by {@link SyncConfigSqlProvider#select}.
     * Also declares the shared result map {@code "resultMap"}.
     *
     * @param syncConfig filter criteria
     * @return matching rows
     */
    @SelectProvider(type = SyncConfigSqlProvider.class, method = "select")
    @Results(id = "resultMap",
        value = {@Result(column = "id", property = "id", jdbcType = JdbcType.BIGINT, id = true),
            @Result(column = "name", property = "name", jdbcType = JdbcType.VARCHAR),
            @Result(column = "description", property = "description", jdbcType = JdbcType.VARCHAR),
            @Result(column = "sync_type", property = "syncType", jdbcType = JdbcType.TINYINT),
            @Result(column = "content", property = "content", jdbcType = JdbcType.OTHER,
                typeHandler = com.geespace.microservices.builder.handler.MySqlJsonHandler.class),
            @Result(column = "created_timestamp", property = "createdTimestamp", jdbcType = JdbcType.BIGINT),
            @Result(column = "created_user", property = "createdUser", jdbcType = JdbcType.BIGINT),
            @Result(column = "project_id", property = "projectId", jdbcType = JdbcType.BIGINT),
            @Result(column = "project_name", property = "projectName", jdbcType = JdbcType.VARCHAR),
            @Result(column = "modified_timestamp", property = "modifiedTimestamp", jdbcType = JdbcType.BIGINT)})
    List<SyncConfig> list(SyncConfig syncConfig);

    /**
     * Loads one row by primary key.
     *
     * @param id primary key
     * @return the row, or {@code null} when none matches
     */
    @Select({"select id,project_id,project_name,name, description, sync_type, content,"
        + " created_timestamp, created_user, modified_timestamp ",
        "from sync_config where id = #{id,jdbcType=BIGINT}"})
    @ResultMap("resultMap")
    SyncConfig findById(Long id);

    /**
     * Counts rows of the given user that carry the given name, optionally excluding one id.
     *
     * @param createdUser owner of the rows
     * @param name        name to look for
     * @param id          row to exclude, or {@code null} to exclude none
     * @return number of matching rows
     */
    // NOTE(review): no @Param annotations here, so binding #{createdUser}/#{name}/#{id} by name
    // presumably relies on the build retaining parameter names (-parameters); confirm.
    @SelectProvider(type = SyncConfigSqlProvider.class, method = "checkNameUnique")
    Integer checkNameUnique(Long createdUser, String name, Long id);
}
SyncConfigSqlProvider
package com.geespace.microservices.builder.dao;
import com.geespace.microservices.builder.entity.SyncConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.jdbc.SQL;
/**
 * Builds the dynamic SQL used by {@code SyncConfigMapper}.
 */
public class SyncConfigSqlProvider {
    /**
     * Builds the list query: always filtered by creator, optionally by project id and by a
     * name substring, newest id first.
     *
     * @param syncConfig filter criteria
     * @return the SQL text with MyBatis placeholders
     */
    public String select(SyncConfig syncConfig) {
        SQL sql = new SQL();
        sql.SELECT("id,project_id,project_name,name, description, sync_type, content, created_timestamp,"
            + " created_user, modified_timestamp");
        sql.FROM("sync_config");
        // Successive WHERE calls are combined with AND by the SQL builder.
        sql.WHERE("created_user = #{createdUser,jdbcType=BIGINT}");
        if (syncConfig.getProjectId() != null) {
            sql.WHERE("project_id=#{projectId}");
        }
        if (!StringUtils.isBlank(syncConfig.getName())) {
            sql.WHERE("name like concat('%', #{name,jdbcType=VARCHAR}, '%')");
        }
        sql.ORDER_BY("id desc");
        return sql.toString();
    }

    /**
     * Builds the name-uniqueness count query.
     *
     * @param createdUser owner of the rows
     * @param name        name to look for
     * @param id          row to exclude from the count, or {@code null} to exclude none
     * @return the SQL text with MyBatis placeholders
     */
    public String checkNameUnique(Long createdUser, String name, Long id) {
        SQL sql = new SQL();
        sql.SELECT("COUNT(name)");
        sql.FROM("sync_config");
        if (id != null) {
            sql.WHERE("id != #{id}");
        }
        sql.WHERE("created_user=#{createdUser} and name=#{name}");
        return sql.toString();
    }
}
JdbcDataSourceService
package com.geespace.microservices.datasource.service;
import java.util.List;
import com.geespace.microservices.datasource.dto.JdbcDataSourceDto;
import com.geespace.microservices.datasource.form.datasource.DataSourceAddForm;
import com.geespace.microservices.datasource.form.datasource.DataSourceSelectForm;
import com.geespace.microservices.datasource.form.datasource.DataSourceUpdateForm;
import com.geespace.microservices.datasource.response.PageResult;
import com.geespace.microservices.datasource.response.Response;
public interface JdbcDataSourceService {
Response addDataSource(DataSourceAddForm dataSourceAddForm);
Response updateDataSource(DataSourceUpdateForm dataSourceUpdateForm);
Response deleteDataSource(Long id);
Response> list(Long creator);
Response> listByType(Long creator, List type);
Response> listmeta();
Response> select(DataSourceSelectForm form);
Response find(Long id);
Response findmetaDataSource(Long id);
Response findDescrypt(Long id);
}
JdbcDataSourceServiceImpl
package com.geespace.microservices.datasource.service.impl;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import com.geespace.microservices.datasource.dao.DataSourceMapper;
import com.geespace.microservices.datasource.dao.metaDataSourceMapper;
import com.geespace.microservices.datasource.dto.JdbcDataSourceDto;
import com.geespace.microservices.datasource.entity.JdbcDataSource;
import com.geespace.microservices.datasource.enums.JdbcDataSourceStatusEnum;
import com.geespace.microservices.datasource.form.datasource.DataSourceAddForm;
import com.geespace.microservices.datasource.form.datasource.DataSourceSelectForm;
import com.geespace.microservices.datasource.form.datasource.DataSourceUpdateForm;
import com.geespace.microservices.datasource.response.Msg;
import com.geespace.microservices.datasource.response.PageResult;
import com.geespace.microservices.datasource.response.Response;
import com.geespace.microservices.datasource.service.JdbcDataSourceService;
import com.geespace.microservices.datasource.util.AesUtil;
import com.geespace.microservices.datasource.util.LocalCacheUtil;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class JdbcDataSourceServiceImpl implements JdbcDataSourceService {
@Autowired
private DataSourceMapper dataSourceMapper;
@Autowired
private metaDataSourceMapper metaDataSourceMapper;
@Override
public Response addDataSource(DataSourceAddForm dataSourceAddForm) {
JdbcDataSource dataSource = new JdbcDataSource();
BeanUtils.copyProperties(dataSourceAddForm, dataSource);
JdbcDataSource exist = dataSourceMapper.nameExist(dataSource);
if (exist != null) {
return Response.error(Msg.DATASOURCE_NAME_ALREAD_EXIST);
}
String userName = AesUtil.decrypt(dataSource.getUserName());
// 判断账密是否为密文
if (userName == null) {
dataSource.setUserName(AesUtil.encrypt(dataSource.getUserName()));
}
String pwd = AesUtil.decrypt(dataSource.getPassword());
if (pwd == null) {
dataSource.setPassword(AesUtil.encrypt(dataSource.getPassword()));
}
dataSource.setCreateTime(new Date());
dataSource.setUpdateTime(new Date());
dataSource.setStatus(JdbcDataSourceStatusEnum.USING.getStatus());
dataSourceMapper.insert(dataSource);
JdbcDataSourceDto dataSourceDto = new JdbcDataSourceDto();
BeanUtils.copyProperties(dataSource, dataSourceDto);
return Response.success(dataSourceDto);
}
@Override
public Response updateDataSource(DataSourceUpdateForm dataSourceUpdateForm) {
JdbcDataSource dataSource = dataSourceMapper.find(dataSourceUpdateForm.getId());
if (dataSource == null || dataSourceUpdateForm.getCreator() != dataSource.getCreator()) {
return Response.error(Msg.DATASOURCE_NOT_EXIST);
}
String userName = AesUtil.decrypt(dataSourceUpdateForm.getUserName());
// 判断账密是否为密文
if (userName == null) {
dataSourceUpdateForm.setUserName(AesUtil.encrypt(dataSourceUpdateForm.getUserName()));
}
String pwd = AesUtil.decrypt(dataSourceUpdateForm.getPassword());
if (pwd == null) {
dataSourceUpdateForm.setPassword(AesUtil.encrypt(dataSourceUpdateForm.getPassword()));
}
String originName = dataSource.getSourceName();
// 注意copyProperties是将source中的属性全部copy到target中
BeanUtils.copyProperties(dataSourceUpdateForm, dataSource);
JdbcDataSource exist = dataSourceMapper.nameExist(dataSource);
if (exist != null && !exist.getSourceName().equals(originName)) {
return Response.error(Msg.DATASOURCE_NAME_ALREAD_EXIST);
}
dataSource.setUpdateTime(new Date());
dataSourceMapper.update(dataSource);
LocalCacheUtil.remove(dataSource.getCreator() + originName);
JdbcDataSourceDto dataSourceDto = new JdbcDataSourceDto();
BeanUtils.copyProperties(dataSource, dataSourceDto);
return Response.success(dataSourceDto);
}
@Override
public Response deleteDataSource(Long id) {
dataSourceMapper.delete(id);
return Response.success();
}
@Override
public Response> list(Long creator) {
List list = dataSourceMapper.list(creator);
List listDto = list.stream().map(this::getDto).collect(Collectors.toList());
return Response.success(listDto);
}
@Override
public Response> listByType(Long creator, List type) {
List list = dataSourceMapper.listByType(creator, type);
List listDto = list.stream().map(this::getDto).collect(Collectors.toList());
return Response.success(listDto);
}
@Override
public Response> listmeta() {
List list = metaDataSourceMapper.list();
List listDto = list.stream().map(this::getDto).collect(Collectors.toList());
return Response.success(listDto);
}
@Override
public Response> select(DataSourceSelectForm form) {
JdbcDataSource select = new JdbcDataSource();
select.setSourceName(form.getSourceName());
select.setCreator(form.getCreator());
PageHelper.startPage(form.getPageNum(), form.getPageSize());
List list = dataSourceMapper.select(select);
PageInfo pageInfo = new PageInfo<>(list);
PageResult pageResult = new PageResult<>();
pageResult.setPageNum(pageInfo.getPageNum());
pageResult.setPageSize(pageInfo.getPageSize());
pageResult.setTotalPage(pageInfo.getPages());
pageResult.setTotalCount(pageInfo.getTotal());
pageResult.setList(pageInfo.getList().stream().map(this::getDto).collect(Collectors.toList()));
return Response.success(pageResult);
}
@Override
public Response find(Long id) {
JdbcDataSource jdbcDataSource = dataSourceMapper.find(id);
if (jdbcDataSource == null) {
return Response.error(Msg.DATASOURCE_NOT_EXIST);
}
return Response.success(getDto(jdbcDataSource));
}
@Override
public Response findmetaDataSource(Long id) {
JdbcDataSource jdbcDataSource = this.metaDataSourceMapper.find(id);
if (jdbcDataSource == null) {
return Response.error(Msg.DATASOURCE_NOT_EXIST);
}
return Response.success(getDto(jdbcDataSource));
}
@Override
public Response findDescrypt(Long id) {
JdbcDataSource jdbcDataSource = dataSourceMapper.find(id);
if (jdbcDataSource == null) {
return Response.error(Msg.DATASOURCE_NOT_EXIST);
}
if (!StringUtils.isBlank(jdbcDataSource.getUserName())) {
jdbcDataSource.setUserName(AesUtil.decrypt(jdbcDataSource.getUserName()));
}
if (!StringUtils.isBlank(jdbcDataSource.getPassword())) {
jdbcDataSource.setPassword(AesUtil.decrypt(jdbcDataSource.getPassword()));
}
return Response.success(getDto(jdbcDataSource));
}
private JdbcDataSourceDto getDto(JdbcDataSource jdbcDataSource) {
JdbcDataSourceDto dto = new JdbcDataSourceDto();
BeanUtils.copyProperties(jdbcDataSource, dto);
return dto;
}
}
DataSourceMapper
package com.geespace.microservices.datasource.dao;
import java.util.List;
import com.geespace.microservices.datasource.entity.JdbcDataSource;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Result;
import org.apache.ibatis.annotations.ResultMap;
import org.apache.ibatis.annotations.Results;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.SelectKey;
import org.apache.ibatis.annotations.SelectProvider;
import org.apache.ibatis.annotations.Update;
import org.apache.ibatis.type.JdbcType;
/**
 * MyBatis mapper for the {@code ge_jdbc_datasource} table.
 */
@Mapper
public interface DataSourceMapper {
    /**
     * Inserts a row and writes the generated key back into {@code source.id}.
     *
     * @param source entity to persist
     * @return number of affected rows
     */
    @Insert({
        "insert into ge_jdbc_datasource (source_type, source_name, jdbc_url, user_name, password, zk_address, znode, ",
        "database_name, jdbc_driver_class, remark, creator, create_time, update_time, status)",
        "values (#{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{jdbcUrl,jdbcType=VARCHAR}, ",
        "#{userName,jdbcType=VARCHAR}, #{password,jdbcType=VARCHAR}, #{zkAddress,jdbcType=VARCHAR}, ",
        "#{znode,jdbcType=VARCHAR}, #{databaseName,jdbcType=VARCHAR}, #{jdbcDriverClass,jdbcType=VARCHAR}, ",
        "#{remark,jdbcType=VARCHAR}, #{creator,jdbcType=BIGINT}, #{createTime,jdbcType=TIMESTAMP}, ",
        "#{updateTime,jdbcType=TIMESTAMP}, #{status,jdbcType=TINYINT})"})
    @SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "id", before = false, resultType = Long.class)
    int insert(JdbcDataSource source);

    /**
     * Updates the active (status = 1) row identified by {@code source.id}.
     *
     * @param source entity carrying the new values
     * @return number of affected rows
     */
    @Update({"update ge_jdbc_datasource",
        "set source_type = #{sourceType,jdbcType=VARCHAR}, source_name = #{sourceName,jdbcType=VARCHAR}, ",
        "jdbc_url = #{jdbcUrl,jdbcType=VARCHAR}, user_name = #{userName,jdbcType=VARCHAR}, ",
        "password = #{password,jdbcType=VARCHAR}, zk_address = #{zkAddress,jdbcType=VARCHAR}, ",
        "znode = #{znode,jdbcType=VARCHAR}, database_name = #{databaseName,jdbcType=VARCHAR}, ",
        "jdbc_driver_class = #{jdbcDriverClass,jdbcType=VARCHAR}, remark = #{remark,jdbcType=VARCHAR}, ",
        "update_time = #{updateTime,jdbcType=TIMESTAMP}, status = #{status,jdbcType=TINYINT}",
        "where id = #{id,jdbcType=BIGINT} and status = 1"})
    int update(JdbcDataSource source);

    /**
     * Deletes the row with the given id.
     *
     * @param id primary key
     * @return number of affected rows
     */
    @Delete("delete from ge_jdbc_datasource where id = #{id,jdbcType=BIGINT}")
    int delete(Long id);

    /**
     * Lists the active rows of one creator, newest id first.
     * Also declares the shared result map {@code "resultMap"}.
     *
     * @param creator id of the creating user
     * @return matching rows
     */
    @Select({"select * from ge_jdbc_datasource where creator = #{creator,jdbcType=BIGINT} and status = 1",
        " order by id desc"})
    @Results(id = "resultMap",
        value = {@Result(column = "id", property = "id", jdbcType = JdbcType.BIGINT, id = true),
            @Result(column = "source_type", property = "sourceType", jdbcType = JdbcType.VARCHAR),
            @Result(column = "source_name", property = "sourceName", jdbcType = JdbcType.VARCHAR),
            @Result(column = "jdbc_url", property = "jdbcUrl", jdbcType = JdbcType.VARCHAR),
            @Result(column = "user_name", property = "userName", jdbcType = JdbcType.VARCHAR),
            @Result(column = "password", property = "password", jdbcType = JdbcType.VARCHAR),
            @Result(column = "zk_address", property = "zkAddress", jdbcType = JdbcType.VARCHAR),
            @Result(column = "znode", property = "znode", jdbcType = JdbcType.VARCHAR),
            @Result(column = "database_name", property = "databaseName", jdbcType = JdbcType.VARCHAR),
            @Result(column = "jdbc_driver_class", property = "jdbcDriverClass", jdbcType = JdbcType.VARCHAR),
            @Result(column = "remark", property = "remark", jdbcType = JdbcType.VARCHAR),
            @Result(column = "creator", property = "creator", jdbcType = JdbcType.BIGINT),
            @Result(column = "create_time", property = "createTime", jdbcType = JdbcType.TIMESTAMP),
            @Result(column = "update_time", property = "updateTime", jdbcType = JdbcType.TIMESTAMP),
            @Result(column = "status", property = "status", jdbcType = JdbcType.TINYINT)})
    List<JdbcDataSource> list(Long creator);

    /**
     * Lists the active rows of one creator whose source type is in the given list.
     *
     * @param creator id of the creating user
     * @param type    accepted source types (element type assumed to be String; TODO confirm)
     * @return matching rows
     */
    // NOTE(review): the statement text was empty in this file, which cannot execute. This query
    // is reconstructed from the sibling queries and the parameter names; verify it against the
    // intended behaviour, in particular the source_type column and the null/empty-list handling.
    @Select({"<script>",
        "select * from ge_jdbc_datasource where creator = #{creator,jdbcType=BIGINT} and status = 1",
        "<if test='type != null and type.size() > 0'>",
        " and source_type in",
        "<foreach collection='type' item='item' open='(' separator=',' close=')'>",
        "#{item,jdbcType=VARCHAR}",
        "</foreach>",
        "</if>",
        " order by id desc",
        "</script>"})
    @ResultMap("resultMap")
    List<JdbcDataSource> listByType(@Param("creator") Long creator, @Param("type") List<String> type);

    /**
     * Lists rows matching the criteria built by {@link DataSourceSqlProvider#select}.
     *
     * @param jdbcDataSource filter criteria
     * @return matching rows
     */
    @SelectProvider(type = DataSourceSqlProvider.class, method = "select")
    @ResultMap("resultMap")
    List<JdbcDataSource> select(JdbcDataSource jdbcDataSource);

    /**
     * Loads one active row by primary key; the statement is configured to flush the cache.
     *
     * @param id primary key
     * @return the row, or {@code null} when none matches
     */
    @Options(flushCache = Options.FlushCachePolicy.TRUE)
    @Select("select * from ge_jdbc_datasource where id = #{id,jdbcType=BIGINT} and status = 1")
    @ResultMap("resultMap")
    JdbcDataSource find(Long id);

    /**
     * Finds a row with the same source name and creator as the argument.
     *
     * @param jdbcDataSource carries the source name and creator to look for
     * @return the matching row, or {@code null} when none exists
     */
    @Select({"select * from ge_jdbc_datasource ",
        "where source_name = #{sourceName,jdbcType=VARCHAR} and creator = #{creator,jdbcType=BIGINT}"})
    @ResultMap("resultMap")
    JdbcDataSource nameExist(JdbcDataSource jdbcDataSource);
}
DataSourceSqlProvider
package com.geespace.microservices.datasource.dao;
import com.geespace.microservices.datasource.entity.JdbcDataSource;
import org.apache.commons.lang.StringUtils;
import org.apache.ibatis.jdbc.SQL;
/**
 * Builds the dynamic SQL for the provider-based {@code select} query on
 * {@code ge_jdbc_datasource}.
 */
public class DataSourceSqlProvider {

    /**
     * Builds a select over {@code ge_jdbc_datasource} that always requires
     * {@code status = 1} and adds further conditions only for the filter values present.
     *
     * @param jdbcDataSource filter object; a non-null {@code creator} adds an equality
     *        condition, a non-blank {@code sourceName} adds a "contains" match on
     *        {@code source_name}
     * @return the SQL text, ordered by {@code id} descending, with MyBatis
     *         {@code #{...}} placeholders for the bind values
     */
    public String select(JdbcDataSource jdbcDataSource) {
        SQL sql = new SQL();
        sql.SELECT("*");
        sql.FROM("ge_jdbc_datasource");
        // The builder method is WHERE (all upper case); repeated calls are combined with AND.
        sql.WHERE("status = 1");
        if (jdbcDataSource.getCreator() != null) {
            sql.WHERE("creator = #{creator,jdbcType=BIGINT}");
        }
        if (!StringUtils.isBlank(jdbcDataSource.getSourceName())) {
            sql.WHERE("source_name like concat('%', #{sourceName,jdbcType=VARCHAR}, '%')");
        }
        sql.ORDER_BY("id desc");
        return sql.toString();
    }
}
metaDataSourceMapper
package com.geespace.microservices.datasource.dao;
import java.util.List;
import com.geespace.microservices.datasource.entity.JdbcDataSource;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Result;
import org.apache.ibatis.annotations.ResultMap;
import org.apache.ibatis.annotations.Results;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.type.JdbcType;
/**
 * MyBatis mapper for the {@code ge_meta_datasource} table; rows are mapped onto
 * {@link JdbcDataSource}.
 *
 * NOTE(review): the interface name starts with a lowercase letter, which goes against the
 * usual Java type-naming convention. It is kept unchanged here so that existing
 * references keep compiling — confirm whether a rename is wanted.
 */
@Mapper
public interface metaDataSourceMapper {

    /**
     * Returns all rows of {@code ge_meta_datasource}.
     *
     * This method also declares the column-to-property mapping named {@code resultMap},
     * which {@link #find(Long)} reuses. The element type is spelled out instead of a raw
     * {@code List} so that the row type is part of the signature.
     *
     * @return every row of the table as a {@link JdbcDataSource}
     */
    @Select("select * from ge_meta_datasource ")
    @Results(id = "resultMap",
        value = {
            @Result(column = "id", property = "id", jdbcType = JdbcType.BIGINT, id = true),
            @Result(column = "source_type", property = "sourceType", jdbcType = JdbcType.VARCHAR),
            @Result(column = "source_name", property = "sourceName", jdbcType = JdbcType.VARCHAR),
            @Result(column = "jdbc_url", property = "jdbcUrl", jdbcType = JdbcType.VARCHAR),
            @Result(column = "user_name", property = "userName", jdbcType = JdbcType.VARCHAR),
            @Result(column = "password", property = "password", jdbcType = JdbcType.VARCHAR),
            @Result(column = "zk_address", property = "zkAddress", jdbcType = JdbcType.VARCHAR),
            @Result(column = "znode", property = "znode", jdbcType = JdbcType.VARCHAR),
            @Result(column = "database_name", property = "databaseName", jdbcType = JdbcType.VARCHAR),
            @Result(column = "jdbc_driver_class", property = "jdbcDriverClass", jdbcType = JdbcType.VARCHAR),
            @Result(column = "remark", property = "remark", jdbcType = JdbcType.VARCHAR),
            @Result(column = "creator", property = "creator", jdbcType = JdbcType.BIGINT),
            @Result(column = "create_time", property = "createTime", jdbcType = JdbcType.TIMESTAMP),
            @Result(column = "update_time", property = "updateTime", jdbcType = JdbcType.TIMESTAMP),
            @Result(column = "status", property = "status", jdbcType = JdbcType.TINYINT)
        })
    List<JdbcDataSource> list();

    /**
     * Loads a single row of {@code ge_meta_datasource} by its {@code id}.
     * The statement has no {@code status} condition.
     *
     * @param id value bound to the {@code id} column
     * @return the row mapped through {@code resultMap}
     */
    @Select("select * from ge_meta_datasource where id = #{id,jdbcType=BIGINT} ")
    @ResultMap("resultMap")
    JdbcDataSource find(Long id);
}
DataSourceSelectForm
package com.geespace.microservices.datasource.form.datasource;
import javax.validation.constraints.NotNull;
import lombok.Data;
/**
 * Form object holding the filter and paging values of a data source query.
 *
 * {@code pageSize} and {@code pageNum} are required; {@code sourceName} and
 * {@code creator} carry no validation constraint.
 */
@Data
public class DataSourceSelectForm {

    /** Filter value; no constraint declared. */
    private String sourceName;

    /** Filter value; no constraint declared. */
    private Long creator;

    /**
     * Requested page size. Declared as {@code Integer} rather than {@code int}: a
     * primitive can never be null, so {@code @NotNull} would never fire and a missing
     * value would silently become 0.
     */
    @NotNull(message = "pageSize不能为空")
    private Integer pageSize;

    /**
     * Requested page number. Boxed for the same reason as {@code pageSize}, so that a
     * missing value is reported by {@code @NotNull}.
     */
    @NotNull(message = "pageNum不能为空")
    private Integer pageNum;
}
DataSourceAddForm
package com.geespace.microservices.datasource.form.datasource;
import javax.validation.constraints.NotBlank;
import lombok.Data;
/**
 * Form object for adding a data source (per its name).
 *
 * Only {@code sourceType} and {@code sourceName} are validated here (they must not be
 * blank); all other fields have no constraint declared on them. Field order is left
 * untouched because Lombok's {@code @Data} derives the generated methods from it.
 */
@Data
public class DataSourceAddForm {
// Must not be blank; the message reads "data source type must not be empty".
@NotBlank(message = "数据源类型不能为空")
private String sourceType;
// Must not be blank; the message reads "data source name must not be empty".
@NotBlank(message = "数据源名称不能为空")
private String sourceName;
// The remaining fields are plain values with no validation declared.
private String jdbcUrl;
private String userName;
private String password;
private String zkAddress;
private String znode;
private String databaseName;
private String jdbcDriverClass;
private String remark;
private Long creator;
}
DataSourceUpdateForm
package com.geespace.microservices.datasource.form.datasource;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import lombok.Data;
/**
 * Form object for updating a data source (per its name).
 *
 * {@code id} must be non-null and {@code sourceType} / {@code sourceName} must not be
 * blank; all other fields have no constraint declared on them. Field order is left
 * untouched because Lombok's {@code @Data} derives the generated methods from it.
 */
@Data
public class DataSourceUpdateForm {
// Required; the message reads "id must not be empty".
@NotNull(message = "id不能为空")
private Long id;
// Must not be blank; the message reads "data source type must not be empty".
@NotBlank(message = "数据源类型不能为空")
private String sourceType;
// Must not be blank; the message reads "data source name must not be empty".
@NotBlank(message = "数据源名称不能为空")
private String sourceName;
// The remaining fields are plain values with no validation declared.
private String jdbcUrl;
private String userName;
private String password;
private String zkAddress;
private String znode;
private String databaseName;
private String jdbcDriverClass;
private String remark;
private Long creator;
}
YAPI测试用例
5.1 查询全部同步任务配置(分页)
{
"pageNum": 1,
"pageSize": 10,
"projectId": 28,
"name": "测试"
}
5.2 创建同步任务配置-mysql->mysql
{
"name": "测试同步任务配置-mysql-mysql-1",
"description": "测试同步任务配置-mysql-mysql-1",
"projectName": "test测试1",
"projectId": 28,
"syncType": 2,
"readerConfigId": 1,
"readerParam": {
"table": "test_test"
},
"writerConfigId": 1,
"writerParam": {
"table": "test_test_1"
},
"columnMap": [
{
"reader": "id",
"writer": "id"
},
{
"reader": "name",
"writer": "name"
}
]
}
5.6 创建同步任务配置-hbase->hbase
{
"name": "测试同步任务配置-hbase-hbase-1",
"description": "测试同步任务配置-hbase-hbase-1",
"projectName": "test测试1",
"projectId": 28,
"syncType": 2,
"readerConfigId": 130,
"readerParam": {
"table": "test_test"
},
"writerConfigId": 130,
"writerParam": {
"table": "test_test_1",
"rowkeyColumns": [
"f:id",
"f:name"
]
},
"columnMap": [
{
"reader": "f:id",
"writer": "f:id"
},
{
"reader": "f:name",
"writer": "f:name"
}
]
}
5.7 创建同步任务配置-mysql->hbase
{
"name": "测试同步任务配置-mysql-hbase-1",
"description": "测试同步任务配置mysql-hbase-1",
"projectName": "test测试1",
"projectId": 28,
"syncType": 2,
"readerConfigId": 1,
"readerParam": {
"table": "test_test"
},
"writerConfigId": 130,
"writerParam": {
"table": "test_test_1",
"rowkeyColumns": [
"f:id",
"f:name"
]
},
"columnMap": [
{
"reader": "id",
"writer": "f:id"
},
{
"reader": "name",
"writer": "f:name"
}
]
}
5.8 创建同步任务配置-hbase->mysql
{
"name": "测试同步任务配置-hbase-mysql-1",
"description": "测试同步任务配置-hbase-mysql-1",
"projectName": "test测试1",
"projectId": 28,
"syncType": 2,
"readerConfigId": 130,
"readerParam": {
"table": "test_test",
"rowkeyColumns": [
"f:id",
"f:name"
]
},
"writerConfigId": 1,
"writerParam": {
"table": "test_test_1"
},
"columnMap": [
{
"reader": "f:id",
"writer": "id"
},
{
"reader": "f:name",
"writer": "name"
}
]
}
5.3 更新同步任务配置
{
"id": 82,
"name": "测试同步任务配置-mysql-3",
"description": "测试同步任务配置-mysql-3",
"projectName": "数据同步任务",
"projectId": 19,
"syncType": 2,
"readerConfigId": 1,
"readerParam": {
"table": "test_test"
},
"writerConfigId": 1,
"writerParam": {
"table": "test_test_1"
},
"columnMap": [
{
"reader": "id",
"writer": "id"
},
{
"reader": "name",
"writer": "name"
}
]
}
5.5 删除同步任务配置
5.4 查询同步任务配置
3.3 执行数据同步任务
3.4 停止数据同步任务



