栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark SQL & Elasticsearch

Spark SQL & Elasticsearch

Spark SQL & Elasticsearch

一、读取二、转换三、写入四、适配分析器方案五、示例


一、读取

使用spark-sql读取es数据如下代码所示:

SparkSession session = SparkSession.builder()
    .appName("esTest")
    .master("local[*]")
    .getOrCreate();

Dataset dataset = session.read()
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes", "192.168.6.3:9200") // 设置参数
    .load("test"); // es索引名,多个使用逗号分隔,可以使用*号

注意:

    索引名使用多个或通配符时,mapping不能存在冲突,否则会导致读取时失败

    存储嵌套结构时,mapping可以嵌套,也可以平铺(字段名包含点号),但数据必须嵌套,不能平铺,否则会导致读取时失败。示例如下。

    (1)嵌套mapping

    {
      "properties": {
        "ip": {
          "type": "object",
          "properties": {
            "src": {
              "type": "long"
            }
          }
        }
      }
    }
    

    (2)平铺mapping

    {
      "properties": {
        "ip.src": {
          "type": "long"
          }
        }
      }
    }
    

    (3)嵌套数据

    {
      "ip": {
        "src": 123456
      }
    }
    

    (4)平铺数据(不能这样存储)

    {
      "ip.src": 123456
    }
    

    若某字段格式为数组,由于mapping不区分数组,会导致访问时失败,解决方式为设置es.read.field.as.array.include参数,值为所有以数组格式存储的字段名,多个以逗号分隔

    更多参数参考官网


二、转换

    Row一般使用GenericRow或GenericRowWithSchema,从es读取到的为GenericRowWithSchema。

    GenericRow:可以使用RowFactory.create(...values)创建。只能使用下标获取字段值,可以通过schema.fieldIndex("fieldName")获取字段下标

    GenericRowWithSchema:使用构造方法创建。可以直接使用row.getAs("fieldName")获取字段值

    创建schema比较简便的方式如下

    StructType schema = DataTypes.createStructType(new StructField[]{
        // p1: 字段名 p2: 数据类型 p3: 允许为空
        DataTypes.createStructField("ip", DataTypes.LongType, true),
        // 数组类型
        DataTypes.createStructField("ports", DataTypes.createArrayType(DataTypes.IntegerType), true)
    });
    

    创建Row类型的Encoder比较简便的方式如下

    expressionEncoder rowEncoder = RowEncoder.apply(schema);
    

    读数组

    WrappedArray ports = row.getAs("ports");
    

    写数组。可以不区分类型,直接写Object数组即可

    new Object[]{value1, value2, ...}
    

    读对象

    Row ip = row.getAs("ip");
    Long sip = ip.getAs("src");
    

    写对象

    // 带schema
    new GenericRowWithSchema(new Object[]{value1, value2, ...}, schema)
    // 不带schema
    new RowFactory.create(value1, value2, ...)
    

三、写入

使用spark-sql将Dataset写入es如下代码所示:

JavaEsSparkSQL.saveToEs(
    dataset,
    "test2", // 索引名
    Collections.singletonMap("es.nodes", "192.168.6.3:9200") // 参数设置
);

注意:

    建议提前创建mapping,参考官网说明

四、适配分析器方案

基本思路如下

    使用spark-sql从es读取数据(seaTunnel已支持)使用flatMap算子,将元数据转换为分析数据(analyse阶段)使用groupByKey算子,按id分组使用reduceGroups算子,聚合各组数据(merge阶段)使用map算子,将数据格式转换回Dataset将Dataset保存到es(seaTunnel已支持)

我们需要做的就是开发一个插件,集成第2到5步,通过读取资产配置,生成分析和合并工具,放到flatMap和reduceGroups中执行。目前资产配置已有,分析和合并工具需要做部分改动。


五、示例

业务介绍:

    原始数据包含源IP、目的IP、源端口、目的端口统计每个IP出现的次数和对应的所有端口号

原始数据表为test,mapping格式如下

{
  "properties": {
    "sip": {
      "type": "long"
    },
    "dip": {
      "type": "long"
    },
    "sp": {
      "type": "integer"
    },
    "dp": {
      "type": "integer"
    }
  }
}

统计数据表为test2,mapping格式如下

{
  "properties": {
    "ip": {
      "type": "long"
    },
    "ports": {
      "type": "integer"
    },
    "count": {
      "type": "integer"
    }
  }
}

计算代码如下

package com.example.spark;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.expressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;
import scala.Tuple2;
import scala.collection.mutable.WrappedArray;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;


public class ElasticsearchTest {

    public static void main(String[] args) {
        SparkSession session = SparkSession.builder().appName("esTest").master("local[*]").getOrCreate();
        
        StructType schema = DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("ip", DataTypes.LongType, true),
                DataTypes.createStructField("ports", DataTypes.createArrayType(DataTypes.IntegerType), true),
                DataTypes.createStructField("count", DataTypes.IntegerType, false)
        });

        expressionEncoder rowEncoder = RowEncoder.apply(schema);

        // 读取原始数据
        Dataset dataset = session.read()
                .format("org.elasticsearch.spark.sql")
                .option("es.nodes", "192.168.6.3:9200")
                .load("test");
        
        // 根据原始数据生成统计数据
        Dataset flatMapDataset = dataset.flatMap((FlatMapFunction) row -> {
            List list = new ArrayList<>();
            list.add(new GenericRowWithSchema(new Object[] {
                    row.getAs("sip"),
                    new Object[] {row.getAs("sp")},
                    1
            }, schema));
            list.add(new GenericRowWithSchema(new Object[] {
                    row.getAs("dip"),
                    new Object[] {row.getAs("dp")},
                    1
            }, schema));
            return list.iterator();
        }, rowEncoder);

        // 将统计数据按ip分组
        KeyValueGroupedDataset groupDataset = flatMapDataset.groupByKey(
                (MapFunction) row -> row.getAs("ip"),
                Encoders.LONG());

        // 聚合各组的统计数据
        Dataset> reduceDataset = groupDataset.reduceGroups((ReduceFunction) (row1, row2) -> {
            long ip = row1.getAs("ip");
            WrappedArray ports1 = row1.getAs("ports");
            WrappedArray ports2 = row2.getAs("ports");
            int count1 = row1.getAs("count");
            int count2 = row2.getAs("count");
            Integer[] ports = new Integer[ports1.length() + ports2.length()];
            ports1.copyToArray(ports);
            ports2.copyToArray(ports, ports1.length());
            return RowFactory.create(ip, ports, count1 + count2);
        });

        // 转换数据结构
        Dataset mapDataset = reduceDataset.map((MapFunction, Row>) tuple -> tuple._2, rowEncoder);

        // 写入统计数据
        JavaEsSparkSQL.saveToEs(mapDataset, "test2", Collections.singletonMap("es.nodes", "192.168.6.3:9200"));
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/734994.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号