栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

datax之关于毒丸对象的使用

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

datax之关于毒丸对象的使用

前言:

    毒丸对象,在日常开发中倒不是经常使用到。笔者在阅读datax的源码时,发现到这个骚操作,赶紧去查了下相关概念和使用场景。本文就当做一个简单的记录,以后在类似的开发场景中能够及时想到这么一种方案。

1.毒丸对象

    毒丸,是指放在一个队列上的对象。一般在FIFO的队列中,生产者生产消息,消费者消费消息,当生产者生产完所有的消息后,一般会最终发送一个毒丸对象(告诉消费者这是最后一个对象);而当消费者按顺序消费对象时,若消费到毒丸对象,则可以判定这是生产者生产的最后一个对象了,后续就可以关闭消费者。

    所以,针对毒丸对象的使用场景一般就是如此,消费者可以通过毒丸对象来了解是否已经消费完所有的消息,若全部完成,可顺利结束当前消费线程。

2.datax的使用示例

    笔者不多介绍datax的一些知识,读者可以从其官网获取。

    在笔者的示例中,从一个MysqlReader读取数据,同步到一个MysqlWriter。从源码看来这分别是通过两个线程来执行的任务。MysqlReader产生的数据则交给Channel来暂存,MysqlWriter则从Channel中拉取数据。具体代码如下:

2.1 生产数据代码:
public class RecordExchanger extends TransformerExchanger implements RecordSender, RecordReceiver {
	public void sendToWriter(Record record) {
		if(shutdown){
			throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
		}
		record = doTransformer(record);
		if (record == null) {
			return;
		}
        // 托管给channel
		this.channel.push(record);
		doStat();
	}
}

2.1.1 毒丸对象的放置

    当Channel关闭时或Exchange关闭时,则放置毒丸对象,告诉消费者数据已全部发送完毕

public class MemoryChannel extends Channel {
	public void close() {
		super.close();
		try {
            // 放置毒丸对象
			this.queue.put(TerminateRecord.get());
		} catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
		}
	}
}
2.2 拉取数据代码:
public class RecordExchanger extends TransformerExchanger implements RecordSender, RecordReceiver {
	@Override
	public Record getFromReader() {
		if(shutdown){
			throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
		}
        // 通过channel拉取数据
		Record record = this.channel.pull();
        // TerminateRecord则是毒丸数据
		return (record instanceof TerminateRecord ? null : record);
	}
}

2.2.1 消费者获取到毒丸对象之后的操作

public class CommonRdbmsWriter {
	public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {
        ...
        try {
            Record record;
            // 获取到毒丸对象时返回null,此时则跳出while循环
            while((record = recordReceiver.getFromReader()) != null) {
                ...

                writeBuffer.add(record);
                bufferBytes += record.getMemorySize();
                if (writeBuffer.size() >= this.batchSize || bufferBytes >= this.batchByteSize) {
                    this.doBatchInsert(connection, writeBuffer);
                    writeBuffer.clear();
                    bufferBytes = 0;
                }
            }
        }
2.3 Channel(默认使用MemoryChannel)
public class MemoryChannel extends Channel {

	private int bufferSize = 0;
	// 使用ArrayBlockingQueue来暂存数据
	private ArrayBlockingQueue queue = null;
    
    // 存放数据
    protected void doPush(Record r) {
		try {
			long startTime = System.nanoTime();
			this.queue.put(r);
			waitWriterTime += System.nanoTime() - startTime;
            memoryBytes.addAndGet(r.getMemorySize());
		} catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
		}
	}
    
    // 拉取数据
    protected Record doPull() {
		try {
			long startTime = System.nanoTime();
			Record r = this.queue.take();
			waitReaderTime += System.nanoTime() - startTime;
			memoryBytes.addAndGet(-r.getMemorySize());
			return r;
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new IllegalStateException(e);
		}
	}
}
总结:

    多学习些源码还是有好处的,有很多意想不到的知识点运用。与君共勉。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/305478.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号