栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark过滤数据根据事件进行属性解密

spark过滤数据根据事件进行属性解密

需求背景:

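需要按事件类型(type)过滤数据:丢弃 track_signup 事件;对保留下来的 track、profile_set 事件,将其中经 SM4 加密的属性(distinct_id、account_number、phone_number)解密处理。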

代码实现

package com.sensordata.zxjt.hispro;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.yelong.security.sm4.SM4Utils;

import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class zxjtHistoryProcess {


    public static  void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("sensordata");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        JavaRDD  javaRDD=  javaSparkContext.textFile(args[0]);

        JavaRDD result= javaRDD.filter(new Function() {
            @Override
            public Boolean call(String s) throws Exception {
                JSonObject jsonObject1 = JSON.parseObject(s);
                if(jsonObject1.getString("type").equals("track_signup")) {
                    return false;
                }else {
                    return true;
                }
            }
        }).map(new Function() {
            @Override
            public Object call(String s) throws Exception {
                JSonObject jsonObject = JSON.parseObject(s);
                if(jsonObject.getString("type").equals("track")) {
                   return returnEventJsonStr(jsonObject);

                }else if(jsonObject.getString("type").equals("profile_set")) {
                    return  returnProfileSetJsonStr(jsonObject);
                }
                return jsonObject.toJSonString();
            }
        });

        result.saveAsTextFile(args[1]);

    }

    public static boolean isbase64(String str) {

        if (str == null || str.trim().length() == 0 || str.length() % 4 != 0) return false;
        char[] strChars = str.toCharArray();
        for (char c : strChars) {
            if (!(c >= 'a' && c <= 'z') && !(c >= 'A' && c <= 'Z') && !(c >= '0' && c <= '9')
                    && c != '+' && c != '/' && c != '=') return false;
        }
        return  true;
    }

    public static boolean isbase64Regx(String str) {
        String base64Pattern = "^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{4}|[A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)$";
        return Pattern.matches(base64Pattern, str);
    }

    public static boolean isMessyCode(String strName) {
        try {
            Pattern p = Pattern.compile("\s*|t*|r*|n*");
            Matcher m = p.matcher(strName);
            String after = m.replaceAll("");
            String temp = after.replaceAll("\p{P}", "");
            char[] ch = temp.trim().toCharArray();

            int length = (ch != null) ? ch.length : 0;
            for (int i = 0; i < length; i++) {
                char c = ch[i];
                if (!Character.isLetterOrDigit(c)) {
                    String str = "" + ch[i];
                    if (!str.matches("[u4e00-u9fa5]+")) {
                        return true;
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        return false;
    }

    //数据校验
    public  static String returnStrVal(String value,String key) {

        String decodeval="-1";

        if (isbase64(value) && isbase64Regx(value)) {

            try {
                decodeval = SM4Utils.decodeBybase64(value, key.getBytes());
                if(isMessyCode(decodeval)){
                    decodeval="-1";
                }
            } catch (Exception e) {
            }
        }
        return decodeval;
    }


    public static JSonObject returnProfileSetJsonStr(JSonObject jsonObject){

        if(jsonObject.getString("distinct_id")!=null){

            String distinct_id = jsonObject.getString("distinct_id");
            String decodeval = returnStrVal(distinct_id, "Asdf5678ghjk1234");
            if(!decodeval.equals("-1")&&!isMessyCode(decodeval)) {
                jsonObject.put("distinct_id", decodeval);
            }
        }

        return jsonObject;
    }


    //返回event表的track格式json
    public static JSonObject returnEventJsonStr(JSonObject jsonObject){

        String distinctidRegx = "distinct_id";
        String propertiesRegx = "properties";
        String accountNumberRegx = "account_number";
        String phonenumberRegx = "phone_number";
        
        #秘钥自定义
        String key = "########";

        if(jsonObject.getString(distinctidRegx)!=null){
            String distinct_id = jsonObject.getString(distinctidRegx);
            String decodeval = returnStrVal(distinct_id,key);
            if(!decodeval.equals("-1")) {
                jsonObject.put(distinctidRegx, decodeval);
            }
        }

        //处理account_number,在properties属性里面
        if( jsonObject.get(propertiesRegx)!=null ){
            JSonObject jsonNode= (JSONObject) jsonObject.get(propertiesRegx);
            if (jsonNode.getString(accountNumberRegx)!=null) {
                String accountnumberVal = returnStrVal(jsonNode.getString(accountNumberRegx), key);
                if (!accountnumberVal.equals("-1")) {
                    jsonNode.put("account_number_plaintext",accountnumberVal);
                }
            }
        }

        //处理phone_number
        if( jsonObject.get(propertiesRegx)!=null ){
            JSonObject jsonNode= (JSONObject) jsonObject.get(propertiesRegx);
            if( jsonNode.getString(phonenumberRegx)!=null ) {
                String phonenumberVal = returnStrVal(jsonNode.getString(phonenumberRegx), key);
                if (!phonenumberVal.equals("-1")) {
                    jsonNode.put("phone_number_plaintext",phonenumberVal);
                }
            }
        }
        return jsonObject;
    }

}
 

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号