
Flink Source Code Analysis Series: The ProcessingTimeService Timer


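The Flink source version used in this article is 1.15-SNAPSHOT; readers can clone it from GitHub.

The base interface, org.apache.flink.api.common.operators.ProcessingTimeService, defines how processing-time timers are registered: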

@PublicEvolving
public interface ProcessingTimeService {
    // Returns the current processing time
    long getCurrentProcessingTime();

    // Registers a timer together with a callback.
    // When the processing time reaches timestamp, the onProcessingTime method of the
    // given ProcessingTimeCallback is invoked.
    ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target);

    // Callback interface
    @PublicEvolving
    interface ProcessingTimeCallback {

        // Invoked when the timer reaches its registered time
        void onProcessingTime(long time) throws IOException, InterruptedException, Exception;
    }
}
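To make the contract concrete, here is a minimal sketch of a processing-time service backed by a JDK ScheduledExecutorService. The class SimpleProcessingTimeService and its Callback interface are hypothetical stand-ins, not Flink classes. The sketch only illustrates that registerTimer takes an absolute timestamp, converts it into a relative delay, and passes the registered timestamp (not the actual firing time) to the callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SimpleProcessingTimeService {

    /** Hypothetical stand-in for ProcessingTimeService.ProcessingTimeCallback. */
    @FunctionalInterface
    public interface Callback {
        void onProcessingTime(long time) throws Exception;
    }

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    public long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }

    /** Turns the absolute timestamp into a relative delay and schedules the callback. */
    public ScheduledFuture<?> registerTimer(long timestamp, Callback target) {
        long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);
        return executor.schedule(
                () -> {
                    try {
                        // The callback gets the registered timestamp, not the firing time.
                        target.onProcessingTime(timestamp);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                },
                delay,
                TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        executor.shutdownNow();
    }

    /** Registers a timer 50 ms ahead; returns {registered timestamp, timestamp seen}. */
    public static long[] demo() {
        SimpleProcessingTimeService service = new SimpleProcessingTimeService();
        AtomicLong seen = new AtomicLong(-1);
        CountDownLatch fired = new CountDownLatch(1);
        long target = service.getCurrentProcessingTime() + 50;
        service.registerTimer(target, time -> {
            seen.set(time);
            fired.countDown();
        });
        try {
            fired.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            service.shutdown();
        }
        return new long[] {target, seen.get()};
    }
}
```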

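In a streaming environment, tasks often have to run at a fixed rate (FixedRate) or with a fixed delay (FixedDelay). Flink therefore extends this interface with a sub-interface of the same name, ProcessingTimeService, in the org.apache.flink.streaming.runtime.tasks package: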

public interface ProcessingTimeService
        extends org.apache.flink.api.common.operators.ProcessingTimeService {
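    // Runs a task periodically at a fixed rate.
    // Behaves like JUC's ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit).
    ScheduledFuture<?> scheduleAtFixedRate(
            ProcessingTimeCallback callback, long initialDelay, long period);

    // Runs a task repeatedly, waiting a fixed delay after one run ends before starting the next.
    ScheduledFuture<?> scheduleWithFixedDelay(
            ProcessingTimeCallback callback, long initialDelay, long period);

    // Quiesces the service: timers no longer fire, and the returned future completes
    // once no timer callback is running any more.
    CompletableFuture<Void> quiesce();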
}
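The difference between the two scheduling modes is easiest to see as arithmetic. The sketch below is a simplified model of JUC semantics, not Flink code. It assumes one thread, no scheduling overhead, and that each run takes a fixed `duration`. With a fixed rate, run k is due at initialDelay + k * period but never starts before the previous run ends. With a fixed delay, each run starts period ms after the previous one ended.

```java
import java.util.Arrays;

public class FixedRateVsFixedDelay {

    /**
     * Start times of the first n runs when each run takes `duration` ms.
     * Fixed rate: run k is due at initialDelay + k * period, but cannot overlap run k-1.
     * Fixed delay: run k starts `period` ms after run k-1 has ended.
     */
    public static long[] startTimes(
            long initialDelay, long period, long duration, int n, boolean fixedDelay) {
        long[] starts = new long[n];
        long previousEnd = 0;
        for (int k = 0; k < n; k++) {
            long start;
            if (k == 0) {
                start = initialDelay;
            } else if (fixedDelay) {
                start = previousEnd + period;
            } else {
                start = Math.max(initialDelay + k * period, previousEnd);
            }
            starts[k] = start;
            previousEnd = start + duration;
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(startTimes(0, 100, 30, 4, false))); // [0, 100, 200, 300]
        System.out.println(Arrays.toString(startTimes(0, 100, 30, 4, true)));  // [0, 130, 260, 390]
    }
}
```

With a 100 ms period and 30 ms runs, fixed rate keeps the 0/100/200/300 grid, while fixed delay drifts to 0/130/260/390 because each gap is measured from the end of the previous run.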

Next, let's look at its implementation class, ProcessingTimeServiceImpl:

@Internal
class ProcessingTimeServiceImpl implements ProcessingTimeService {

    private final TimerService timerService;

    private final Function<ProcessingTimeCallback, ProcessingTimeCallback>
            processingTimeCallbackWrapper;

    private final AtomicInteger numRunningTimers;

    private final CompletableFuture<Void> quiesceCompletedFuture;

    private volatile boolean quiesced;

    ProcessingTimeServiceImpl(
            TimerService timerService,
            Function<ProcessingTimeCallback, ProcessingTimeCallback>
                    processingTimeCallbackWrapper) {
        this.timerService = timerService;
        this.processingTimeCallbackWrapper = processingTimeCallbackWrapper;

        this.numRunningTimers = new AtomicInteger(0);
        this.quiesceCompletedFuture = new CompletableFuture<>();
        this.quiesced = false;
    }

    @Override
    public long getCurrentProcessingTime() {
        // Delegate to the corresponding timerService method
        return timerService.getCurrentProcessingTime();
    }

    @Override
    public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
        if (isQuiesced()) {
            return new NeverCompleteFuture(
                    ProcessingTimeServiceUtil.getProcessingTimeDelay(
                            timestamp, getCurrentProcessingTime()));
        }
        // Wrap the callback with the quiesce handling, then delegate to timerService
        return timerService.registerTimer(
                timestamp,
                addQuiesceProcessingToCallback(processingTimeCallbackWrapper.apply(target)));
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(
            ProcessingTimeCallback callback, long initialDelay, long period) {
        if (isQuiesced()) {
            return new NeverCompleteFuture(initialDelay);
        }
        // Wrap the callback with the quiesce handling, then delegate to timerService
        return timerService.scheduleAtFixedRate(
                addQuiesceProcessingToCallback(processingTimeCallbackWrapper.apply(callback)),
                initialDelay,
                period);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(
            ProcessingTimeCallback callback, long initialDelay, long period) {
        if (isQuiesced()) {
            return new NeverCompleteFuture(initialDelay);
        }
        // Wrap the callback with the quiesce handling, then delegate to timerService
        return timerService.scheduleWithFixedDelay(
                addQuiesceProcessingToCallback(processingTimeCallbackWrapper.apply(callback)),
                initialDelay,
                period);
    }

    @Override
    public CompletableFuture<Void> quiesce() {
        if (!quiesced) {
            quiesced = true;

            if (numRunningTimers.get() == 0) {
                quiesceCompletedFuture.complete(null);
            }
        }

        return quiesceCompletedFuture;
    }

    private boolean isQuiesced() {
        return quiesced;
    }

    private ProcessingTimeCallback addQuiesceProcessingToCallback(ProcessingTimeCallback callback) {

        return timestamp -> {
            if (isQuiesced()) {
                return;
            }

            numRunningTimers.incrementAndGet();
            try {
                // double check to deal with the race condition:
                // before executing the previous line to increase the number of running timers,
                // the quiesce-completed future is already completed as the number of running
                // timers is 0 and "quiesced" is true
                if (!isQuiesced()) {
                    callback.onProcessingTime(timestamp);
                }
            } finally {
                if (numRunningTimers.decrementAndGet() == 0 && isQuiesced()) {
                    quiesceCompletedFuture.complete(null);
                }
            }
        };
    }
}

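As the code shows, ProcessingTimeServiceImpl does little work of its own. It delegates every timing operation to the TimerService passed into its constructor, and only wraps each callback (via addQuiesceProcessingToCallback) so that callbacks are skipped after quiesce() and the quiesce future completes once no callback is still running.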
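The quiesce handling in addQuiesceProcessingToCallback is a reusable pattern: a volatile flag, a counter of running callbacks, and a future that is completed by whoever sees the counter drop to zero after the flag is set. The hypothetical QuiesceGuard class below reproduces that pattern for plain Runnables so it can be exercised in isolation. It is a sketch, not part of Flink.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class QuiesceGuard {
    private final AtomicInteger numRunning = new AtomicInteger(0);
    private final CompletableFuture<Void> quiesceCompleted = new CompletableFuture<>();
    private volatile boolean quiesced = false;

    /** Wraps a task so it is skipped after quiesce() and counted while it runs. */
    public Runnable wrap(Runnable task) {
        return () -> {
            if (quiesced) {
                return;
            }
            numRunning.incrementAndGet();
            try {
                // Double check: quiesce() may have completed the future (with zero running
                // tasks) between the check above and the increment.
                if (!quiesced) {
                    task.run();
                }
            } finally {
                // The last task to finish after quiesce() completes the future.
                if (numRunning.decrementAndGet() == 0 && quiesced) {
                    quiesceCompleted.complete(null);
                }
            }
        };
    }

    /** Stops future runs; the returned future completes once no wrapped task is running. */
    public CompletableFuture<Void> quiesce() {
        if (!quiesced) {
            quiesced = true;
            if (numRunning.get() == 0) {
                quiesceCompleted.complete(null);
            }
        }
        return quiesceCompleted;
    }

    /** Returns {runs, doneWhileRunning ? 1 : 0, doneAfterRun ? 1 : 0, runsAfterSecondCall}. */
    public static int[] demo() {
        QuiesceGuard guard = new QuiesceGuard();
        int[] runs = {0};
        boolean[] doneWhileRunning = {true};
        Runnable wrapped = guard.wrap(() -> {
            runs[0]++;
            // Quiesce while this task is still running.
            doneWhileRunning[0] = guard.quiesce().isDone();
        });
        wrapped.run();
        int runsAfterFirst = runs[0];
        boolean doneAfterRun = guard.quiesce().isDone();
        wrapped.run(); // skipped: the guard is quiesced
        return new int[] {runsAfterFirst, doneWhileRunning[0] ? 1 : 0, doneAfterRun ? 1 : 0, runs[0]};
    }
}
```

In the demo, quiesce() is called from inside a running task. The future is not done at that point because the counter is 1, and it completes in the finally block once that task returns. Any later run of the wrapped task is skipped.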

TimerService, in turn, simply extends ProcessingTimeService with a few lifecycle methods:

@Internal
public interface TimerService extends ProcessingTimeService {

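    // Returns whether the service has been shut down
    boolean isTerminated();

    // Shuts down the service and cleans up all registered timers.
    // This is hard and immediate: it does not wait for any timer to complete.
    // Any further call to registerTimer results in a hard exception.
    void shutdownService();

    // Shuts down the service in the same hard way, but blocks until the shutdown
    // has completed or timeoutMs has elapsed; the wait cannot be interrupted.
    // Returns true only if the shutdown completed within the timeout.
    boolean shutdownServiceUninterruptible(long timeoutMs);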
}
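shutdownServiceUninterruptible describes a common JDK pattern: shut the executor down hard, then wait for termination while swallowing interrupts and restoring the interrupt flag afterwards. The sketch below shows that pattern on a plain ScheduledThreadPoolExecutor. The helper name shutdownUninterruptible is hypothetical, and this is not Flink's actual implementation.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UninterruptibleShutdown {

    /**
     * Shuts the executor down hard (queued tasks are dropped, workers interrupted) and
     * waits up to timeoutMs for termination. Interrupts received while waiting are
     * remembered and re-asserted on return. Returns true iff the executor terminated in time.
     */
    public static boolean shutdownUninterruptible(
            ScheduledThreadPoolExecutor executor, long timeoutMs) {
        executor.shutdownNow();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    // Returns false if the deadline passes before termination.
                    return executor.awaitTermination(
                            deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    interrupted = true; // keep waiting, restore the flag later
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns {terminated, interruptFlagStillSet}. */
    public static boolean[] demo() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.schedule(() -> { }, 1, TimeUnit.HOURS);
        // Simulate an interrupt that arrives while the caller is shutting down.
        Thread.currentThread().interrupt();
        boolean terminated = shutdownUninterruptible(executor, 5_000);
        boolean flagSet = Thread.interrupted(); // reads and clears the flag
        return new boolean[] {terminated, flagSet};
    }
}
```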

Next, let's focus on SystemProcessingTimeService, the implementation of the TimerService interface.

SystemProcessingTimeService has two constructors:

@VisibleForTesting
// ExceptionHandler handles exceptions thrown by timer callbacks
SystemProcessingTimeService(ExceptionHandler exceptionHandler) {
	this(exceptionHandler, null);
}

SystemProcessingTimeService(ExceptionHandler exceptionHandler, ThreadFactory threadFactory) {

	this.exceptionHandler = checkNotNull(exceptionHandler);
	this.status = new AtomicInteger(STATUS_ALIVE);
	this.quiesceCompletedFuture = new CompletableFuture<>();
	
	// Note: ScheduledTaskExecutor is implemented by extending JUC's ScheduledThreadPoolExecutor;
	// it is created with a single core thread (corePoolSize = 1)
	if (threadFactory == null) {
		this.timerService = new ScheduledTaskExecutor(1);
	} else {
		this.timerService = new ScheduledTaskExecutor(1, threadFactory);
	}

	// Remove a task from the work queue as soon as its future is cancelled
	this.timerService.setRemoveOnCancelPolicy(true);

	// Make sure shutdown drops all pending tasks, both periodic and delayed
	this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
	this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
}
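The three policy setters are standard ScheduledThreadPoolExecutor methods, and their effect can be observed through getQueue(). In this JUC-only sketch (the class and method names are illustrative), a cancelled far-future task stays in the queue unless setRemoveOnCancelPolicy(true) is set. With both shutdown policies set to false, a plain shutdown() drops every queued delayed and periodic task instead of waiting for pending delayed tasks to fire.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CancelPolicyDemo {

    /** Queue size after scheduling one task an hour ahead and cancelling it. */
    public static int queueSizeAfterCancel(boolean removeOnCancel) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setRemoveOnCancelPolicy(removeOnCancel);
        ScheduledFuture<?> future = executor.schedule(() -> { }, 1, TimeUnit.HOURS);
        future.cancel(false);
        int size = executor.getQueue().size(); // the cancelled task stays queued unless removed
        executor.shutdownNow();
        return size;
    }

    /** Queue size right after shutdown() with the same shutdown policies as the constructor above. */
    public static int queueSizeAfterShutdown() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        executor.schedule(() -> { }, 1, TimeUnit.HOURS);
        executor.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.HOURS);
        executor.shutdown(); // not shutdownNow(): the policies alone make it drop the tasks
        return executor.getQueue().size();
    }

    public static void main(String[] args) {
        System.out.println(queueSizeAfterCancel(false)); // 1
        System.out.println(queueSizeAfterCancel(true));  // 0
        System.out.println(queueSizeAfterShutdown());    // 0
    }
}
```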

SystemProcessingTimeService does not use JUC's ScheduledThreadPoolExecutor directly. Instead, it defines its own thread pool, ScheduledTaskExecutor, by extending ScheduledThreadPoolExecutor:

private class ScheduledTaskExecutor extends ScheduledThreadPoolExecutor {

	public ScheduledTaskExecutor(int corePoolSize) {
		super(corePoolSize);
	}

	public ScheduledTaskExecutor(int corePoolSize, ThreadFactory threadFactory) {
		super(corePoolSize, threadFactory);
	}

	@Override
	// Only terminated() is overridden: once the pool has fully terminated,
	// it completes quiesceCompletedFuture
	protected void terminated() {
		super.terminated();
		quiesceCompletedFuture.complete(null);
	}
}
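Overriding terminated() is the standard ThreadPoolExecutor hook for running code once the pool has fully shut down. The sketch below uses it the same way ScheduledTaskExecutor does, by completing a CompletableFuture, and checks that the future is done only after termination. SignallingExecutor is an illustrative name, not a Flink class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TerminationHookDemo {

    /** Same idea as ScheduledTaskExecutor: complete a future from the terminated() hook. */
    static class SignallingExecutor extends ScheduledThreadPoolExecutor {
        final CompletableFuture<Void> terminatedFuture = new CompletableFuture<>();

        SignallingExecutor(int corePoolSize) {
            super(corePoolSize);
        }

        @Override
        protected void terminated() {
            super.terminated();
            // Invoked once by the pool after shutdown, when its worker count has dropped to zero.
            terminatedFuture.complete(null);
        }
    }

    /** Returns {doneBeforeShutdown, terminatedInTime, doneAfterTermination}. */
    public static boolean[] demo() {
        SignallingExecutor executor = new SignallingExecutor(1);
        executor.execute(() -> { });
        boolean doneBefore = executor.terminatedFuture.isDone();
        executor.shutdown();
        boolean terminatedInTime = false;
        try {
            terminatedInTime = executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {doneBefore, terminatedInTime, executor.terminatedFuture.isDone()};
    }
}
```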

getCurrentProcessingTime()

@Override
public long getCurrentProcessingTime() {
	return System.currentTimeMillis();
}

registerTimer(long timestamp, ProcessingTimeCallback callback)

@Override
public ScheduledFuture registerTimer(long timestamp, ProcessingTimeCallback callback) {

	long delay =
			ProcessingTimeServiceUtil.getProcessingTimeDelay(
					timestamp, getCurrentProcessingTime());

	// we directly try to register the timer and only react to the status on exception
	// that way we save unnecessary volatile accesses for each timer
	try {
		// Delegates to timerService.schedule. Since timerService extends
		// JUC's ScheduledThreadPoolExecutor, the actual scheduling is done
		// by ScheduledThreadPoolExecutor itself
		return timerService.schedule(
				wrapOnTimerCallback(callback, timestamp), delay, TimeUnit.MILLISECONDS);
	} catch (RejectedExecutionException e) {
		final int status = this.status.get();
		if (status == STATUS_QUIESCED) {
			return new NeverCompleteFuture(delay);
		} else if (status == STATUS_SHUTDOWN) {
			throw new IllegalStateException("Timer service is shut down");
		} else {
			// something else happened, so propagate the exception
			throw e;
		}
	}
}
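The error handling above avoids reading the volatile status on the fast path: it schedules first and consults the status only when the executor rejects the task. The sketch below isolates that control flow. RegisterTimerSketch, its status constants and the small NeverCompleteFuture stand-in are hypothetical. The delay is simplified to max(timestamp - now, 0); Flink computes it with ProcessingTimeServiceUtil.getProcessingTimeDelay, whose exact formula may differ.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RegisterTimerSketch {
    static final int STATUS_ALIVE = 0;
    static final int STATUS_QUIESCED = 1;
    static final int STATUS_SHUTDOWN = 2;

    private final AtomicInteger status = new AtomicInteger(STATUS_ALIVE);
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

    /** Hypothetical stand-in for Flink's NeverCompleteFuture; get() throws instead of blocking. */
    static final class NeverCompleteFuture implements ScheduledFuture<Object> {
        private final long delayMs;
        NeverCompleteFuture(long delayMs) { this.delayMs = delayMs; }
        @Override public long getDelay(TimeUnit unit) { return unit.convert(delayMs, TimeUnit.MILLISECONDS); }
        @Override public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
        }
        @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; }
        @Override public boolean isCancelled() { return false; }
        @Override public boolean isDone() { return false; }
        @Override public Object get() { throw new UnsupportedOperationException("never completes"); }
        @Override public Object get(long timeout, TimeUnit unit) { throw new UnsupportedOperationException("never completes"); }
    }

    public ScheduledFuture<?> registerTimer(long timestamp, Runnable task) {
        // Simplified delay (assumption); Flink uses ProcessingTimeServiceUtil.getProcessingTimeDelay.
        long delay = Math.max(timestamp - System.currentTimeMillis(), 0);
        try {
            // Fast path: schedule without reading the status first.
            return executor.schedule(task, delay, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            int s = status.get();
            if (s == STATUS_QUIESCED) {
                return new NeverCompleteFuture(delay); // silently ignore the timer
            } else if (s == STATUS_SHUTDOWN) {
                throw new IllegalStateException("Timer service is shut down");
            }
            throw e; // rejected for some other reason
        }
    }

    public void quiesce() {
        if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) {
            executor.shutdown(); // graceful: new tasks are rejected from now on
        }
    }

    public void shutdown() {
        status.set(STATUS_SHUTDOWN);
        executor.shutdownNow();
    }

    /** Returns true if alive, quiesced and shut-down registrations behave as described. */
    public static boolean demo() {
        RegisterTimerSketch service = new RegisterTimerSketch();
        try {
            service.registerTimer(System.currentTimeMillis(), () -> { }).get(5, TimeUnit.SECONDS);
            service.quiesce();
            if (service.registerTimer(System.currentTimeMillis() + 1_000, () -> { }).isDone()) {
                return false;
            }
            service.shutdown();
            try {
                service.registerTimer(0, () -> { });
                return false;
            } catch (IllegalStateException expected) {
                return true;
            }
        } catch (Exception e) {
            return false;
        } finally {
            service.shutdown();
        }
    }
}
```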

scheduleAtFixedRate 和 scheduleWithFixedDelay

@Override
public ScheduledFuture scheduleAtFixedRate(
		ProcessingTimeCallback callback, long initialDelay, long period) {
	return scheduleRepeatedly(callback, initialDelay, period, false);
}

@Override
public ScheduledFuture scheduleWithFixedDelay(
		ProcessingTimeCallback callback, long initialDelay, long period) {
	return scheduleRepeatedly(callback, initialDelay, period, true);
}

private ScheduledFuture scheduleRepeatedly(
		ProcessingTimeCallback callback, long initialDelay, long period, boolean fixedDelay) {
	final long nextTimestamp = getCurrentProcessingTime() + initialDelay;
	final Runnable task = wrapOnTimerCallback(callback, nextTimestamp, period);

	// we directly try to register the timer and only react to the status on exception
	// that way we save unnecessary volatile accesses for each timer
	try {
		return fixedDelay
				// Either way, this ends up calling JUC's ScheduledThreadPoolExecutor implementation
				? timerService.scheduleWithFixedDelay(
						task, initialDelay, period, TimeUnit.MILLISECONDS)
				: timerService.scheduleAtFixedRate(
						task, initialDelay, period, TimeUnit.MILLISECONDS);
	} catch (RejectedExecutionException e) {
		final int status = this.status.get();
		if (status == STATUS_QUIESCED) {
			return new NeverCompleteFuture(initialDelay);
		} else if (status == STATUS_SHUTDOWN) {
			throw new IllegalStateException("Timer service is shut down");
		} else {
			// something else happened, so propagate the exception
			throw e;
		}
	}
}

If you are not familiar with how JUC's ScheduledThreadPoolExecutor implements these methods, see my earlier post:

ScheduledThreadPoolExecutor Source Code Analysis

As you can see, registerTimer, scheduleAtFixedRate and scheduleWithFixedDelay all rely on wrapOnTimerCallback to wrap the ProcessingTimeCallback into a Runnable (a ScheduledTask) that the executor can run:

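// Used by registerTimer: a one-shot timer, so the period is 0
private Runnable wrapOnTimerCallback(ProcessingTimeCallback callback, long timestamp) {
	return new ScheduledTask(status, exceptionHandler, callback, timestamp, 0);
}

// Used by scheduleRepeatedly: nextTimestamp advances by period after each run
private Runnable wrapOnTimerCallback(
		ProcessingTimeCallback callback, long nextTimestamp, long period) {
	return new ScheduledTask(status, exceptionHandler, callback, nextTimestamp, period);
}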

private static final class ScheduledTask implements Runnable {
	private final AtomicInteger serviceStatus;
	private final ExceptionHandler exceptionHandler;
	// The callback to invoke
	private final ProcessingTimeCallback callback;

	private long nextTimestamp;
	private final long period;

	ScheduledTask(
			AtomicInteger serviceStatus,
			ExceptionHandler exceptionHandler,
			ProcessingTimeCallback callback,
			long timestamp,
			long period) {
		this.serviceStatus = serviceStatus;
		this.exceptionHandler = exceptionHandler;
		this.callback = callback;
		this.nextTimestamp = timestamp;
		this.period = period;
	}

	@Override
	public void run() {
		// First check that the service status is still ALIVE.
		// If not, the service has been quiesced or shut down: return without invoking the callback
		if (serviceStatus.get() != STATUS_ALIVE) {
			return;
		}
		try {
			// Invoke the callback's onProcessingTime method
			callback.onProcessingTime(nextTimestamp);
		} catch (Exception ex) {
			exceptionHandler.handleException(ex);
		}
		// Advance the logical timestamp for the next periodic run
		nextTimestamp += period;
	}
}
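Two details are worth noting. The callback always receives the task's logical nextTimestamp, which advances by period after every run regardless of when the run actually happened. Exceptions also go to the handler instead of escaping; in ScheduledThreadPoolExecutor an exception escaping a periodic task would suppress all of its later runs. The simplified copy below drives run() by hand to show this. ScheduledTaskSketch is hypothetical, and a Consumer replaces Flink's ExceptionHandler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class ScheduledTaskSketch {
    static final int STATUS_ALIVE = 0;
    static final int STATUS_QUIESCED = 1;

    @FunctionalInterface
    interface Callback {
        void onProcessingTime(long time) throws Exception;
    }

    /** Simplified copy of ScheduledTask; a Consumer replaces Flink's ExceptionHandler. */
    static final class Task implements Runnable {
        private final AtomicInteger serviceStatus;
        private final Consumer<Exception> exceptionHandler;
        private final Callback callback;
        private final long period;
        private long nextTimestamp;

        Task(AtomicInteger serviceStatus, Consumer<Exception> exceptionHandler,
                Callback callback, long timestamp, long period) {
            this.serviceStatus = serviceStatus;
            this.exceptionHandler = exceptionHandler;
            this.callback = callback;
            this.nextTimestamp = timestamp;
            this.period = period;
        }

        @Override
        public void run() {
            if (serviceStatus.get() != STATUS_ALIVE) {
                return; // quiesced or shut down: skip the callback
            }
            try {
                callback.onProcessingTime(nextTimestamp);
            } catch (Exception ex) {
                exceptionHandler.accept(ex); // never let the exception escape to the executor
            }
            nextTimestamp += period; // logical time, independent of when run() actually ran
        }
    }

    /** Runs the task by hand and reports the timestamps the callback saw. */
    public static String demo() {
        AtomicInteger status = new AtomicInteger(STATUS_ALIVE);
        List<Long> seen = new ArrayList<>();
        List<Exception> errors = new ArrayList<>();
        Task task = new Task(status, errors::add, time -> {
            seen.add(time);
            if (time == 1100) {
                throw new IllegalStateException("callback failed");
            }
        }, 1000, 100);
        task.run(); // callback(1000)
        task.run(); // callback(1100) throws; the handler records it
        task.run(); // callback(1200)
        status.set(STATUS_QUIESCED);
        task.run(); // skipped
        return seen + " errors=" + errors.size();
    }
}
```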

A typical application of ProcessingTimeService is emitting periodic watermarks.

TimestampsAndWatermarksOperator implements the ProcessingTimeCallback interface and its callback method onProcessingTime:

@Override
public void onProcessingTime(long timestamp) throws Exception {
	// (1) Call the watermark generator's onPeriodicEmit method to emit a watermark
	watermarkGenerator.onPeriodicEmit(wmOutput);

	final long now = getProcessingTimeService().getCurrentProcessingTime();
	// Take the ProcessingTimeService held by this operator and register a timer,
	// passing this as the callback.
	// When the clock reaches now + watermarkInterval, onProcessingTime is invoked again,
	// i.e. code (1) runs again every watermarkInterval ms, which yields periodic watermarks.
	getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
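The self-rescheduling loop can be simulated deterministically with a manual clock. In the sketch below, ManualTimerService and PeriodicEmitter are hypothetical. The clock jumps to each timer's timestamp as it fires, emittedAt stands in for watermarkGenerator.onPeriodicEmit, and a hypothetical open() registers the first timer, since some initial registration is needed to start the loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class PeriodicWatermarkSketch {

    interface Callback {
        void onProcessingTime(long time);
    }

    /** Hypothetical manual-clock timer service: time moves only in advanceTo(). */
    static final class ManualTimerService {
        private static final class Timer {
            final long timestamp;
            final Callback callback;

            Timer(long timestamp, Callback callback) {
                this.timestamp = timestamp;
                this.callback = callback;
            }
        }

        private final PriorityQueue<Timer> timers =
                new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
        private long now = 0;

        long getCurrentProcessingTime() {
            return now;
        }

        void registerTimer(long timestamp, Callback callback) {
            timers.add(new Timer(timestamp, callback));
        }

        /** Fires due timers in order; the clock jumps to each timer's timestamp as it fires. */
        void advanceTo(long time) {
            while (!timers.isEmpty() && timers.peek().timestamp <= time) {
                Timer timer = timers.poll();
                now = timer.timestamp;
                timer.callback.onProcessingTime(timer.timestamp);
            }
            now = time;
        }
    }

    /** Plays the role of TimestampsAndWatermarksOperator: emit, then re-register itself. */
    static final class PeriodicEmitter implements Callback {
        private final ManualTimerService timerService;
        private final long watermarkInterval;
        final List<Long> emittedAt = new ArrayList<>();

        PeriodicEmitter(ManualTimerService timerService, long watermarkInterval) {
            this.timerService = timerService;
            this.watermarkInterval = watermarkInterval;
        }

        /** Hypothetical start hook: registers the first timer to kick off the loop. */
        void open() {
            timerService.registerTimer(
                    timerService.getCurrentProcessingTime() + watermarkInterval, this);
        }

        @Override
        public void onProcessingTime(long timestamp) {
            emittedAt.add(timestamp); // stands in for watermarkGenerator.onPeriodicEmit(wmOutput)
            long now = timerService.getCurrentProcessingTime();
            timerService.registerTimer(now + watermarkInterval, this);
        }
    }

    public static List<Long> demo() {
        ManualTimerService timerService = new ManualTimerService();
        PeriodicEmitter emitter = new PeriodicEmitter(timerService, 200);
        emitter.open();
        timerService.advanceTo(1000);
        return emitter.emittedAt;
    }
}
```

Advancing the clock from 0 to 1000 with a 200 ms interval fires five times, at 200, 400, 600, 800 and 1000. After each emission the emitter re-registers itself 200 ms later, so the timer for 1200 is left pending.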

That's the end of this article. Thanks for reading!

Please credit the source when reposting: this article is reposted from www.mshxw.com
Article address: https://www.mshxw.com/it/780657.html