栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

大数据实时传输组件Maxwell-时区问题源码修改

大数据实时传输组件Maxwell-时区问题源码修改

项目场景:

大数据实时平台搭建,数据采集部分…


问题描述

Maxwell在处理timestamp字段类型时,默认使用utc时间,会差8个小时。设置配置文件中的参数jdbc_options和replication_jdbc_options,发现只对datetime字段生效,通过修改源码解决了这个问题。

各版本源码下载地址:https://github.com/zendesk/maxwell/tags
下载*.zip包就可以,本地解压后用idea打开修改编译


1.修改

类名:com.zendesk.maxwell.schema.columndef.ColumnDef.java

修改前:
case "datetime":
	case "timestamp":
	return new DateTimeColumnDef(name, type, pos, columnLength);
 
修改后:
case "datetime":
	return new DateTimeColumnDef(name, type, pos, columnLength);
case "timestamp":
	return new TimeStampColumnDef(name, type, pos, columnLength);

2.新增:

类名:com.zendesk.maxwell.schema.columndef.TimeStampColumnDef.java

package com.zendesk.maxwell.schema.columndef;
 
import com.zendesk.maxwell.producer.MaxwellOutputConfig;
 
import java.sql.Timestamp;
 
public class TimeStampColumnDef extends ColumnDefWithLength {
	public TimeStampColumnDef(String name, String type, short pos, Long columnLength) {
		super(name, type, pos, columnLength);
	}
 
	@Override
	protected String formatValue(Object value, MaxwellOutputConfig config) {
		Timestamp ts = TimeStampFormatter.extractTimestamp(value);
		String dateString = TimeStampFormatter.formatDateTime(value, ts);
 
		if ( dateString == null )
			return null;
		else
			return appendFractionalSeconds(dateString, ts.getNanos(), columnLength);
	}
 
}

类名:com.zendesk.maxwell.schema.columndef.TimeStampFormatter.java

package com.zendesk.maxwell.schema.columndef;
 
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
 
public class TimeStampFormatter {
	private static SimpleDateFormat makeFormatter(String format, boolean utc) {
		SimpleDateFormat dateFormatter = new SimpleDateFormat(format);
		dateFormatter.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
		return dateFormatter;
	}
 
	private static SimpleDateFormat dateFormatter           = makeFormatter("yyyy-MM-dd", false);
	private static SimpleDateFormat dateUTCFormatter        = makeFormatter("yyyy-MM-dd", true);
	private static SimpleDateFormat dateTimeFormatter       = makeFormatter("yyyy-MM-dd HH:mm:ss", false);
	private static SimpleDateFormat dateTimeUTCFormatter    = makeFormatter("yyyy-MM-dd HH:mm:ss", true);
 
	public static Timestamp extractTimestamp(Object value) {
		if (value instanceof Long) {
			Long micros = (Long) value;
			long millis = floorDiv(micros, 1000L);
			Timestamp t = new Timestamp(millis);
			long microsonly = floorMod(micros, (long) 1000000);
			t.setNanos((int) microsonly * 1000);
			return t;
		} else if (value instanceof Timestamp) {
			return (Timestamp) value;
		} else if ( value instanceof Date ) {
			Long time = ((Date) value).getTime();
			return new Timestamp(time);
		} else
			throw new RuntimeException("couldn't extract date/time out of " + value);
	}
 
	private static Timestamp MIN_DATE = Timestamp.valueOf("1000-01-01 00:00:00");
 
	private static String format(SimpleDateFormat formatter, Timestamp ts) {
		if ( ts.before(MIN_DATE) ) {
			return null;
		} else {
			synchronized(formatter) {
					return formatter.format(ts);
			}
		}
	}
 
	public static String formatDate(Object value) {
		SimpleDateFormat formatter;
		if ( value instanceof Long )
			formatter = dateUTCFormatter;
		else
			formatter = dateFormatter;
 
		return format(formatter, extractTimestamp(value));
	}
 
	public static String formatDateTime(Object value, Timestamp ts) {
		SimpleDateFormat formatter;
 
		if ( value instanceof Long )
			formatter = dateTimeUTCFormatter;
		else
			formatter = dateTimeFormatter;
 
		return format(formatter, ts);
	}
 
	private static long floorDiv(long a, long b) {
		return ((a < 0)?(a - (b - 1)):a) / b;
	}
 
	private static long floorMod(long x, long y) {
		return x - floorDiv(x, y) * y;
	}
}

3.编译

编译成功后,将maxwell-1.xx.x.jar放到Maxwell的lib目录下,替换原有jar包(记得备份)

4.说明

config.conf配置文件中jdbc_options和replication_jdbc_options这两个参数还是需要配的,参数配置解决的是datetime字段时区的问题,修改源码解决的是timestamp字段时区的问题

5.其他问题

很遗憾平台最后还是没能用到该组件,由于在使用过程中该组件频繁出现宕机,查看日志报的是关于heartbeat心跳方面的错误,查了很多资料并请教还是没能解决该问题,可能和mysql配置有关,最后更换了备用方案canal.


原文章可见https://blog.csdn.net/woloqun/article/details/119907719

记得点赞收藏噢,关注不迷路~~~

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/773870.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号