栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

CoProcessFunction实战三部曲之二:状态处理

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

CoProcessFunction实战三部曲之二:状态处理

| 名称 | 链接 | 备注 |

| :-- | :-- | :-- |

| 项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |

| git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |

| git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |

这个git项目中有多个文件夹,本章的应用在flinkstudy文件夹下,如下图红框所示:

[](

)编码

  1. 字符串转Tuple2的Map函数,以及抽象类AbstractCoProcessFunctionExecutor都和上一篇[《CoProcessFunction实战三部曲之一:基本功能》](

)一模一样;

  1. 新增AbstractCoProcessFunctionExecutor的子类AddTwoSourcevalue.java,源码如下,稍后会说明几个关键点:

package com.bolingcavalry.coprocessfunction;

import org.apache.flink.api.common.state.ValueState;

import org.apache.flink.api.common.state.ValueStateDescriptor;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.functions.co.CoProcessFunction;

import org.apache.flink.util.Collector;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class AddTwoSourcevalue extends AbstractCoProcessFunctionExecutor {

private static final Logger logger = LoggerFactory.getLogger(AddTwoSourcevalue.class);

@Override

protected CoProcessFunction, Tuple2, Tuple2> getCoProcessFunctionInstance() {

return new CoProcessFunction, Tuple2, Tuple2>() {

// 某个key在processElement1中存入的状态

private ValueState state1;

// 某个key在processElement2中存入的状态

private ValueState state2;

@Override

public void open(Configuration parameters) throws Exception {

// 初始化状态

state1 = getRuntimeContext().getState(new ValueStateDescriptor<>(“myState1”, Integer.class));

state2 = getRuntimeContext().getState(new ValueStateDescriptor<>(“myState2”, Integer.class));

}

@Override

public void processElement1(Tuple2 value, Context ctx, Collector> out) throws Exception {

logger.info(“处理元素1:{}”, value);

String key = value.f0;

Integer value2 = state2.value();

// value2为空,就表示processElement2还没有处理或这个key,

// 这时候就把value1保存起来

if(null==value2) {

logger.info(“2号流还未收到过[{}],把1号流收到的值[{}]保存起来”, key, value.f1);

state1.update(value.f1);

} else {

logger.info(“2号流收到过[{}],值是[{}],现在把两个值相加后输出”, key, value2);

// 输出一个新的元素到下游节点

out.collect(new Tuple2<>(key, value.f1 + value2));

// 把2号流的状态清理掉

state2.clear();

}

}

@Override

public void processElement2(Tuple2 value, Context ctx, Collector> out) throws Ex

【一线大厂Java面试题解析+核心总结学习笔记+最新架构讲解视频+实战项目源码讲义】

浏览器打开:qq.cn.hn/FTf 免费领取

ception {

logger.info(“处理元素2:{}”, value);

String key = value.f0;

Integer value1 = state1.value();

// value1为空,就表示processElement1还没有处理或这个key,

// 这时候就把value2保存起来

if(null==value1) {

logger.info(“1号流还未收到过[{}],把2号流收到的值[{}]保存起来”, key, value.f1);

state2.update(value.f1);

} else {

logger.info(“1号流收到过[{}],值是[{}],现在把两个值相加后输出”, key, value1);

// 输出一个新的元素到下游节点

out.collect(new Tuple2<>(key, value.f1 + value1));

// 把1号流的状态清理掉

state1.clear();

}

}

};

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/361390.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号