栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flinksql的初始化

flinksql的初始化

Mavn的依赖
 
        1.8
        ${java.version}
        ${java.version}
        1.12.0
        2.12
        3.1.3
    

    
        
            org.apache.flink
            flink-java
            ${flink.version}
        

        
            org.apache.flink
            flink-streaming-java_${scala.version}
            ${flink.version}
        

        
            org.apache.flink
            flink-connector-kafka_${scala.version}
            ${flink.version}
        

        
            org.apache.flink
            flink-clients_${scala.version}
            ${flink.version}
        

        
            org.apache.flink
            flink-cep_${scala.version}
            ${flink.version}
        

        
            org.apache.flink
            flink-json
            ${flink.version}
        

        
            com.alibaba
            fastjson
            1.2.68
        

        
        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
        

        
            mysql
            mysql-connector-java
            5.1.49
        

        
            com.alibaba.ververica
            flink-connector-mysql-cdc
            1.2.0
        

        
        
            org.slf4j
            slf4j-api
            1.7.25
        

        
            org.slf4j
            slf4j-log4j12
            1.7.25
        

        
            org.apache.logging.log4j
            log4j-to-slf4j
            2.14.0
        

        
        
            org.projectlombok
            lombok
            1.18.12
        

        
            org.apache.flink
            flink-connector-jdbc_${scala.version}
            ${flink.version}
        

        
            org.apache.phoenix
            phoenix-spark
            5.0.0-Hbase-2.0
            
                
                    org.glassfish
                    javax.el
                
            
        

        
        
            commons-beanutils
            commons-beanutils
            1.9.3
        

        
        
            com.google.guava
            guava
            29.0-jre
        

        
            redis.clients
            jedis
            3.3.0
        

        
            ru.yandex.clickhouse
            clickhouse-jdbc
            0.2.4
            
                
                    com.fasterxml.jackson.core
                    jackson-databind
                
                
                    com.fasterxml.jackson.core
                    jackson-core
                
            
        

        
            org.apache.flink
            flink-table-api-java-bridge_${scala.version}
            ${flink.version}
        

        
            org.apache.flink
            flink-table-planner-blink_${scala.version}
            ${flink.version}
        

        
            com.janeluo
            ikanalyzer
            2012_u6
        
        
            
            com.clickhouse
            clickhouse-jdbc
            0.3.2-patch4
            
            http
            
                
                    *
                    *
                
            
        

    

    
        
            
                org.apache.maven.plugins
                maven-assembly-plugin
                3.0.0
                
                    
                        jar-with-dependencies
                    
                
                
                    
                        make-assembly
                        package
                        
                            single
                        
                    
                
            
        
    
核心代码
 //流代码
        EnvironmentSettings environment = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env, environment);
        //dataStream.print();
        //以一个的分钟作为周期
        SingleOutputStreamOperator> streamOperator = dataStreams.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction, TimeWindow>() {
            @Override
            public void apply(TimeWindow timeWindow, Iterable iterable, Collector> collector) throws Exception {
                ArrayList list = Lists.newArrayList(iterable);
                if (list.size() > 0) {
                    collector.collect(list);
                }
            }
        });
        //dataStreams.print();
        
        Table table = streamTableEnvironment.fromDataStream(dataStreams, "user_id,item_id,cate_id,times,name,keyword,factory,price,pro,city,par,brank");
        streamTableEnvironment.createTemporaryView("t1", table);
        streamOperator.addSink(new OrderSinkFunc());
        //,tumble(times, interval '1' day)
        Table table1 = streamTableEnvironment.sqlQuery("select item_id,name,count(*)as num ,sum(price) as total from t1 group by item_id,name ");
        //支持撤回
        streamTableEnvironment.toRetractStream(table1, Row.class).print("输出结果");

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/742207.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号