栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

WideTableMultiDimSQLParser 解析说明:ClickHouse / Hive 数组交并差运算

WideTableMultiDimSQLParser 解析说明:ClickHouse / Hive 数组交并差运算

WideTableMultiDimSQLParser 解析说明

1. ClickHouse 数组交并差运算
-- Intersection (交) t[1] ∩ t[2] : arrayIntersect(t[1], t[2])
-- t is an array of three scalar subqueries; each groupUniqArray(UserID) collects the distinct
-- UserIDs of hits_v1 that match one condition, so t[i] is the user set of condition i.
-- ClickHouse lets res refer to the alias t even though t is defined later in the same SELECT.
-- This query computes res = t[3] ∩ (t[1] ∩ t[2]): users with Sex = 1 AND Age > 18 AND RequestNum > 0.
-- The outer query drops NULLs, removes duplicates and counts what is left.
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
         select arrayIntersect(t[3], arrayIntersect(t[1], t[2])) as res,
                array(
                            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                    )                                               t
         ) t;

-- Union (并) t[1] ∪ t[2]: arrayConcat(t[1], t[2])
-- arrayConcat keeps duplicates; the outer arrayDistinct removes them before counting.
-- This query computes res = t[3] ∪ t[1] ∪ t[2]: users matching at least one of the three conditions.
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
         select arrayConcat(t[3], arrayConcat(t[1], t[2])) as res,
                array(
                            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                    )                                         t
         ) t;

-- Difference (差) t[1]-t[2] : arrayMap(x->multiIf(x not in arrayIntersect(t[1], t[2]), x, NULL), t[1])
-- Each x of t[1] is kept when it is not in t[1] ∩ t[2] (equivalently: not in t[2]).
-- Otherwise it is replaced by NULL; the outer arrayFilter(x->x is not null, ...) drops those placeholders.
-- Note: this example additionally intersects the difference with t[3], i.e. res = t[3] ∩ (t[1] - t[2]).
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
         select arrayIntersect(t[3], arrayMap(x->multiIf(x not in arrayIntersect(t[1], t[2]), x, NULL), t[1])) as res,
                array(
                            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                    )                                                                                             t
         ) t;

--并
-- Union (并) without the NULL filter: arrayConcat of the groupUniqArray results introduces no
-- NULL placeholders, so arrayDistinct alone suffices; this returns the same count as the
-- earlier union query.
select length(arrayDistinct(t.res))
from (
         select arrayConcat(t[3], arrayConcat(t[1], t[2])) as res,
                array(
                            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                    )                                         t
         ) t;

ClickHouse 解析输出示例(main() 打印的 expr 与 arrayLines;其中第 3 个子查询 f3 属于非叶子节点 e3 自身的条件,已生成但未被 expr 引用):

(arrayMap(x->multiIf(x not in arrayIntersect(t[1],t[2],(arrayIntersect(t[4],t[5],t[6]))), x, NULL), t[1]))
(select collect_set(user_id) from db1.table1 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f1     = '1'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f2     = '22'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f3     = 333   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f4     = '4'   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f5     = 5   )),
(select collect_set(user_id) from db3.table3 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f6     = 6   ))
2. Hive 数组交并差运算(注意:array_intersect / array_union / array_except 都只接受两个数组参数):
-- Array set operations in Hive/Spark SQL; each function takes exactly two arrays.
-- Expected result per the Spark SQL function reference: i = [2], u = [1,2,3], e = [1].
-- NOTE(review): these are Spark SQL (2.4+) built-ins; confirm the target engine/version provides them.
select
    array_intersect(array(1, 2), array(2, 3)) i,
    array_union(array(1, 2), array(2, 3)) u,
    array_except(array(1, 2), array(2, 3)) e;

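Hive 解析输出示例(expr 与 arrayLines)。注意:其中 array_except 传入了三个参数,而该函数只接受两个;另外 Hive/Spark 的数组下标从 0 开始,而这里的 t[i] 按 1 开始编号(只有 6 个子查询却引用了 t[6]),因此这段输出不能直接在 Hive/Spark 上执行: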

(array_except(t[1],t[2],(array_intersect(t[4],t[5],t[6]))))
(select collect_set(user_id) from db1.table1 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f1     = '1'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f2     = '22'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f3     = 333   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f4     = '4'   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f5     = 5   )),
(select collect_set(user_id) from db3.table3 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f6     = 6   ))
附源码(Kotlin):
/**
 * One flattened tag condition taken from a rule expression tree.
 *
 * @property kexprId `tfId` of the expression node the condition was taken from
 * @property tagCode table code of the tag
 * @property tagOptionCode field code of the tag option
 * @property conditionExpr scalar subquery that collects the target ids matching the condition
 * @property index 1-based slot of the subquery in the generated `t` array; -1 until assigned
 */
data class TagIdx(
    var kexprId: Int,
    var tagCode: String,
    var tagOptionCode: String,
    var conditionExpr: String,
    var index: Int,
)

/**
 * Tells whether [e] is a leaf of the rule tree, i.e. its child list is null or empty.
 */
fun isLeafNode(e: KunLunexpression): Boolean {
    val children = e.subexpression
    return CollectionUtils.isEmpty(children)
}

/**
 * Flattens the request's rule tree into the list of tag conditions used to build the `t` array.
 *
 * The conditions appear in the order [parseTagIdx] visits the tree. Each one gets its 1-based
 * position as [TagIdx.index], so the first condition is `t[1]`.
 *
 * @param requestDTO request whose `expression` is flattened
 * @param tableMappingMap table mappings keyed by table code
 * @return the flattened conditions with their index assigned
 */
fun tagOptionConditions(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): List<TagIdx> {
    val tagIdxList = mutableListOf<TagIdx>()
    // Recursively parse the rule expression and flatten it into a list of filter conditions.
    parseTagIdx(requestDTO.expression, tagIdxList, tableMappingMap)
    // Array slots are 1-based, so slot = list position + 1.
    tagIdxList.forEachIndexed { position, tagIdx -> tagIdx.index = position + 1 }
    return tagIdxList
}


/**
 * Recursively flattens a rule tree into [tagIdxList], visiting each node before its children.
 *
 * Every node that carries a fieldCondition, leaf or not, contributes one [TagIdx]. Its
 * conditionExpr is a scalar subquery of the form
 * `select collect_set(<target>) from <db>.<table> where ( <dim filter> ) and ( <predicate> )`.
 * The new entry's index is left at -1 for the caller to assign.
 *
 * @param kexpr node to flatten (together with its subtree)
 * @param tagIdxList receives one entry per node with a fieldCondition
 * @param tableMappingMap table mappings keyed by table code; only the first mapping of a code is used
 * @throws IllegalArgumentException if a table code has no mapping or a field code has no physical field
 */
fun parseTagIdx(kexpr: KunLunexpression, tagIdxList: MutableList<TagIdx>, tableMappingMap: Map<String, List<KTableMapping>>) {
    val fieldCondition = kexpr.fieldCondition
    if (fieldCondition != null) {
        // Each tag table has its own dimensions; they narrow the rows before the tag predicate applies.
        val dimFilter = genDimFilter(fieldCondition.dimConditionList, tableMappingMap)

        val tagCode = fieldCondition.tableCode
        val fieldCode = fieldCondition.fieldCode
        val kTableMapping = firstTableMapping(tableMappingMap, tagCode)
        val physicalField = physicalFieldOf(kTableMapping, fieldCode)
        val filterConditionClause = genFilterConditionClause(fieldCondition, physicalField.columnCode, physicalField.fieldType)

        val targetFieldCode = kTableMapping.targetField.columnCode
        val dbName = kTableMapping.physicDBName
        val tableName = kTableMapping.getkTableCode()
        val line = "select collect_set($targetFieldCode) from $dbName.$tableName where ( $dimFilter ) and ( $filterConditionClause )"
        // index is assigned later by the caller; -1 marks it as not yet set
        tagIdxList.add(TagIdx(kexprId = kexpr.tfId, tagCode = tagCode, tagOptionCode = fieldCode, conditionExpr = line, index = -1))
    }
    // Recurse into the child expressions.
    kexpr.subexpression?.forEach { parseTagIdx(it, tagIdxList, tableMappingMap) }
}

/** Returns the first table mapping registered for [tableCode]; the parsers only ever use the first one. */
private fun firstTableMapping(tableMappingMap: Map<String, List<KTableMapping>>, tableCode: String): KTableMapping =
    tableMappingMap[tableCode]?.firstOrNull()
        ?: throw IllegalArgumentException("no table mapping for table code [$tableCode]")

/** Returns the physical (destination) field that the source field [fieldCode] is mapped to in [mapping]. */
private fun physicalFieldOf(mapping: KTableMapping, fieldCode: String): KField =
    mapping.fields.firstOrNull { it.srcField.columnCode == fieldCode }?.dstField
        ?: throw IllegalArgumentException("no physical field for field code [$fieldCode] in table [${mapping.tableCode}]")

/**
 * Builds the dimension part of a tag subquery's WHERE clause.
 *
 * Each dimension condition becomes ` <column> = <first value> `, and the terms are joined with "and ".
 * The dimension's own operator is not consulted. With no dimensions the result is ` 1=1 `.
 */
private fun genDimFilter(dimConditionList: List<FieldCondition>?, tableMappingMap: Map<String, List<KTableMapping>>): String {
    if (dimConditionList.isNullOrEmpty()) {
        return " 1=1 "
    }
    return dimConditionList.joinToString(separator = "and ") { dimField ->
        val dimPhysicalField = physicalFieldOf(firstTableMapping(tableMappingMap, dimField.tableCode), dimField.fieldCode)
        val singlevalue = parseFieldValue(dimField, dimPhysicalField.fieldType)[0]?.sqlCondition
        " ${dimPhysicalField.columnCode} = $singlevalue "
    }
}

/**
 * Renders one field condition as a SQL predicate on [physicalField], padded with spaces.
 *
 * @param fieldCondition condition whose operator and values are rendered
 * @param physicalField physical column name the predicate applies to
 * @param fieldValueType physical type used to parse the condition values
 * @return the predicate text, e.g. `f1 = <value>` surrounded by spaces
 * @throws IllegalArgumentException if the condition has no value, or BETWEEN has fewer than two values
 * @throws IllegalStateException if the operator is not supported
 */
fun genFilterConditionClause(fieldCondition: FieldCondition, physicalField: String, fieldValueType: KFieldValueType): String {
    val fv = parseFieldValue(fieldCondition, fieldValueType)
    if (CollectionUtils.isEmpty(fv)) {
        throw IllegalArgumentException("fieldCondition must have fieldValue!")
    }
    // Multi-value list "(v1,v2,...)" used by IN / NOT IN.
    val listValue = fv.joinToString(separator = ",", prefix = "(", postfix = ")") { "${it?.sqlCondition}" }
    // Single value used by the comparison operators.
    val singlevalue = fv[0]?.sqlCondition
    // NOTE(review): the LIKE pattern embeds this value between quotes without escaping;
    // confirm the values are trusted or escape quotes before building the SQL.
    val singlevalueNoQuote = fv[0]?.qlCondition

    val conditionExpr = when (fieldCondition.operator) {
        ArithmeticOperatorEnum.LIKE -> "  like '%${singlevalueNoQuote}%' "
        ArithmeticOperatorEnum.EQUAL -> "    = ${singlevalue} "
        ArithmeticOperatorEnum.GREATER_EQUAL_THAN -> "    >= ${singlevalue} "
        ArithmeticOperatorEnum.LESS_THAN -> "    < ${singlevalue} "
        ArithmeticOperatorEnum.LESS_EQUAL_THAN -> "    <= ${singlevalue} "
        ArithmeticOperatorEnum.GREATER_THAN -> "    > ${singlevalue} "
        ArithmeticOperatorEnum.BETWEEN -> {
            // Without this check a single value failed with an opaque IndexOutOfBoundsException.
            require(fv.size >= 2) { "BETWEEN needs two values but got ${fv.size}" }
            "    between ${fv[0]?.sqlCondition} and ${fv[1]?.sqlCondition} "
        }
        ArithmeticOperatorEnum.IN -> "    in ${listValue} "
        ArithmeticOperatorEnum.NOT_IN -> "    not in ${listValue} "

        else -> throw IllegalStateException("${fieldCondition.operator} not supported yet")
    }

    return " $physicalField $conditionExpr "
}


/**
 * Converts the raw values of [fieldCondition] into typed field values for [fieldValueType].
 *
 * @param fieldCondition condition whose `values` are converted
 * @param fieldValueType physical type; only STRING, LONG and DOUBLE are supported
 * @return the values created by `FieldValue.create`
 * @throws Exception the exception built by `ExceptionHelper.bizError` when the condition has no
 *   values or the type is unsupported
 */
fun parseFieldValue(fieldCondition: FieldCondition, fieldValueType: KFieldValueType): List<FieldValue<*>?> {
    val values = fieldCondition.values
    if (values == null || values.isEmpty()) {
        // bizError builds the exception (it is thrown explicitly elsewhere in this file); without
        // `throw` the validation was silently skipped.
        throw ExceptionHelper.bizError("illegal value size,values length must greater than 0.")
    }

    // Concrete FieldValue class for the physical value type.
    val clazz: Class<out FieldValue<*>> = when (fieldValueType) {
        KFieldValueType.STRING -> StringFieldValue::class.java
        KFieldValueType.LONG -> LongFieldValue::class.java
        KFieldValueType.DOUBLE -> DoubleFieldValue::class.java
        // Previously not thrown, which left a lateinit class uninitialized and crashed below.
        else -> throw ExceptionHelper.bizError("$fieldValueType fieldValueType not supported!")
    }
    return FieldValue.create(clazz, *values.toTypedArray())
}






/**
 * Walks a rule tree and collects what its leaf nodes reference.
 *
 * A leaf whose condition has a non-empty objectSetId (an object set / 分群) adds that id to
 * [objectSetList]. Any other leaf adds a [TagbaseField] with its tableCode and fieldCode to
 * [tagbaseFieldList]. Conditions attached to non-leaf nodes are not visited.
 *
 * @throws IllegalArgumentException if a leaf has no fieldCondition
 */
fun recurExtractTagCodeAndObjectSet(expression: KunLunexpression, tagbaseFieldList: MutableList<TagbaseField>, objectSetList: MutableList<String>) {
    // No child expressions: the recursion ends here.
    if (isLeafNode(expression)) {
        val fieldCondition = requireNotNull(expression.fieldCondition) {
            "leaf expression ${expression.tfId} has no fieldCondition"
        }
        if (StringUtils.isNotEmpty(fieldCondition.objectSetId)) {
            // Object set reference.
            objectSetList.add(fieldCondition.objectSetId)
        } else {
            // Tag reference.
            tagbaseFieldList.add(TagbaseField().apply {
                tableCode = fieldCondition.tableCode
                fieldCode = fieldCondition.fieldCode
            })
        }
        return
    }

    // Recurse into the child nodes.
    expression.subexpression.forEach { recurExtractTagCodeAndObjectSet(it, tagbaseFieldList, objectSetList) }
}

/**
 * Resolves and validates the table mappings needed to translate a rule into SQL.
 */
@Service
class CommonParseUtils {

    /**
     * Collects the tags referenced by the leaf nodes of the request's rule and returns their table
     * mappings grouped by table code.
     *
     * Object sets referenced by leaves are collected too, but are not used yet.
     */
    fun getTableMappingMap(tenant: Tenant, requestDTO: SQLQueryReqDTO): Map<String, List<KTableMapping>> {
        // Tags & object sets (分群)
        val tagbaseFieldList = mutableListOf<TagbaseField>()
        val objectSetList = mutableListOf<String>()
        recurExtractTagCodeAndObjectSet(requestDTO.getexpression(), tagbaseFieldList, objectSetList)
        // Metadata
        val tableMappingList: List<KTableMapping> = getTagCodeTableMapping(tenant.id, tagbaseFieldList, requestDTO.getDriverType())
        return tableMappingList.groupBy { it.tableCode }
    }

    /**
     * Loads the table mappings and checks that every referenced tag and tag option exists.
     *
     * A tag exists when some mapping has its table code. A tag option exists when the first such
     * mapping has a field whose source column equals the option's field code (the parser also
     * uses only that first mapping).
     *
     * @return the loaded mappings; empty when no tags are referenced
     * @throws Exception the exception built by `ExceptionHelper.bizError` for a missing tag or tag option
     */
    fun getTagCodeTableMapping(tenantId: Long, tagbaseFieldList: List<TagbaseField>, driverType: DriverType): List<KTableMapping> {
        if (CollectionUtils.isEmpty(tagbaseFieldList)) {
            return emptyList()
        }

        // Load the mapping relations.
        // TODO metadata: kTableMappings (tenantId and driverType are unused until this is implemented)
        val kTableMappings: List<KTableMapping> = ArrayList()

        // groupBy rather than Collectors.toMap: toMap throws on duplicate table codes.
        val mappingsByTableCode = kTableMappings.groupBy { it.tableCode }

        // check
        for (tagbaseField in tagbaseFieldList) {
            val kTableMapping = mappingsByTableCode[tagbaseField.tableCode]?.firstOrNull()
                ?: throw ExceptionHelper.bizError(String.format("tag code [%s] is non-exists", tagbaseField.tableCode))
            val tagOptionExists = kTableMapping.fields.any { it.srcField.columnCode == tagbaseField.fieldCode }
            if (!tagOptionExists) {
                throw ExceptionHelper.bizError(String.format("tag option [%s] is non-exists", tagbaseField.fieldCode))
            }
        }
        return kTableMappings
    }

}
/**
 * Translates a [KunLunexpression] rule tree into ClickHouse SQL over wide tag tables.
 *
 * Every node carrying a fieldCondition becomes one scalar subquery (see [tagOptionConditions]).
 * The subqueries are placed in an array aliased `t`, so the condition with [TagIdx.index] i is
 * `t[i]` (ClickHouse arrays are 1-based). The rule's logic is evaluated with array functions:
 * - AND    -> `arrayIntersect(a, b, ...)`
 * - OR     -> `arrayConcat(a, b, ...)`; duplicates are removed by `arrayDistinct` in the outer query
 * - EXCEPT -> `arrayFilter(x -> NOT has(<b, c, ...>, x), a)`, i.e. a - b - c - ...
 *
 * NOTE(review): the subqueries are built with `collect_set(...)` by [parseTagIdx], while the
 * ClickHouse examples in this document use `groupUniqArray`. Confirm the target accepts `collect_set`.
 */
@Service
class WideTableMultiDimCHSQLParser {
    val log = LoggerFactory.getLogger(WideTableMultiDimCHSQLParser::class.java)

    @Resource
    lateinit var commonParseUtils: CommonParseUtils

    /** Builds the SQL that counts the distinct ids selected by the request's rule. */
    fun parseCount(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        return WIDE_TABLE_COUNT_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
        )
    }

    /** Builds the SQL that exports the distinct ids selected by the request's rule as CSV. */
    fun parseCircle(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        // No export target is configured yet; the template falls back to its default location.
        val csvFile = ""
        return WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
            csvFile = csvFile,
        )
    }

    /**
     * Renders the request's rule as a ClickHouse array expression over `t`.
     * A rule consisting of a single condition renders as `t[1]`.
     */
    fun expr(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val exprMap = tagIdxs.groupBy { it.kexprId }
        return genWhereClause(exprMap, requestDTO.expression)
    }

    /**
     * Recursively renders [kunLunexpression]: a leaf renders as its `t[i]` slot, and an inner node
     * combines the renderings of its children with its logic operator.
     */
    private fun genWhereClause(exprMap: Map<Int, List<TagIdx>>, kunLunexpression: KunLunexpression): String {
        if (isLeafNode(kunLunexpression)) {
            return leafArrayRef(exprMap, kunLunexpression)
        }

        val logic = kunLunexpression.logic
        // Pick the combinator before recursing so an unsupported operator fails first.
        val combine: (List<String>) -> String = when (logic) {
            LogicOperatorEnum.AND -> { operands -> "(arrayIntersect(${operands.joinToString(",")}))" }
            LogicOperatorEnum.OR -> { operands -> "(arrayConcat(${operands.joinToString(",")}))" }
            LogicOperatorEnum.EXCEPT -> { operands -> exceptClause(operands) }
            else -> throw IllegalArgumentException("logic $logic not supported!")
        }
        // Rendering every child (leaf or nested) uniformly keeps the comma separators correct.
        return combine(kunLunexpression.subexpression.map { genWhereClause(exprMap, it) })
    }

    /**
     * Renders `first - second - third - ...`: keeps the elements of the first operand that occur in
     * none of the others. With a single operand nothing is removed.
     */
    private fun exceptClause(operands: List<String>): String {
        val minuend = operands.first()
        val subtrahends = operands.drop(1)
        if (subtrahends.isEmpty()) {
            return minuend
        }
        val removed = subtrahends.singleOrNull() ?: "arrayConcat(${subtrahends.joinToString(",")})"
        return "(arrayFilter(x -> NOT has($removed, x), $minuend))"
    }

    /** Returns `t[i]` for a leaf node, where i is the 1-based slot assigned to its condition. */
    private fun leafArrayRef(exprMap: Map<Int, List<TagIdx>>, leaf: KunLunexpression): String {
        val tagIdx = exprMap[leaf.tfId]?.firstOrNull()
            ?: throw IllegalArgumentException("no tag condition found for expression ${leaf.tfId}")
        return "t[${tagIdx.index}]"
    }

    /**
     * Renders every flattened condition as a parenthesised scalar subquery, one per line and
     * comma-separated, for use inside `array(...)`. Line i (1-based) is referenced as `t[i]`.
     */
    fun arrayLines(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String =
        tagOptionConditions(requestDTO, tableMappingMap)
            .joinToString(separator = ",\n", postfix = "\n") { tagIdx -> "(${tagIdx.conditionExpr})" }

    /** Counts the distinct, non-null ids in the evaluated rule set. */
    private fun WIDE_TABLE_COUNT_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
    ) = """
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
    select $expr as res,
    array(
    $arrayLines
    ) t
) t
"""

    /**
     * Exports the distinct, non-null ids of the evaluated rule set as CSV to [csvFile].
     * A blank [csvFile] falls back to [DEFAULT_CIRCLE_OUTFILE].
     */
    private fun WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
        csvFile: String,
    ): String {
        val outFile = csvFile.ifBlank { DEFAULT_CIRCLE_OUTFILE }
        return """
select arrayJoin(arrayDistinct(arrayFilter(x->x is not null, t.res)))
from (
    select $expr as res,
    array(
    $arrayLines
    ) t
) t
INTO OUTFILE '$outFile' FORMAT CSV
settings distributed_perfect_shard=1,max_execution_time = 600
"""
    }

    companion object {
        /** Export location used when no csv file is supplied (the previously hard-coded value). */
        private const val DEFAULT_CIRCLE_OUTFILE = "tos:///xxx"
    }
}



/**
 * Manual smoke test for [WideTableMultiDimCHSQLParser].
 *
 * Builds the rule `EXCEPT(f1, f2, AND(f4, f5, f6))` over three sample tag tables and prints the
 * generated `expr` and `arrayLines`. Node e3 also carries its own condition (f3) next to its AND
 * children.
 *
 * NOTE(review): tfId is never set on the sample expressions, but the parser keys conditions by
 * tfId. This presumably relies on KunLunexpression assigning distinct ids itself — confirm.
 * NOTE(review): this page concatenates several source files and another top-level `main()`
 * follows; the two must live in separate files (or packages) to compile.
 */
fun main() {
    // Every sample field maps to a physical column with the same name and type.
    fun fieldMapping(columnCode: String, fieldType: KFieldValueType) = KFieldMapping(
        KField(columnCode, "", fieldType, ""), // srcField
        KField(columnCode, "", fieldType, ""), // dstField
    )

    // KTableMapping(tableCode, kTableCode, physicDBName, targetField, source, fields); every sample
    // table targets user_id. NOTE(review): the Java constructor was documented with a leading
    // rowMapping flag that is not passed here — presumably an overload without it exists.
    fun tableMapping(tableCode: String, kTableCode: String, dbName: String, vararg fields: KFieldMapping) = KTableMapping(
        tableCode,
        kTableCode,
        dbName,
        KField("user_id", "", KFieldValueType.STRING, ""),
        KSource(0, dbName, kTableCode, "user_id"),
        fields.toList(),
    )

    // Dimension filters shared by several conditions.
    val dimList = listOf(
        FieldCondition("", "t1", "cate_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("10001")),
        FieldCondition("", "t1", "shop_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("798322")),
    )

    val e1 = KunLunexpression().apply {
        fieldCondition = FieldCondition("", "t1", "f1", dimList, ArithmeticOperatorEnum.EQUAL, listOf("1"))
    }
    val e2 = KunLunexpression().apply {
        fieldCondition = FieldCondition("", "t2", "f2", dimList, ArithmeticOperatorEnum.EQUAL, listOf("22"))
    }
    val e31 = KunLunexpression().apply {
        fieldCondition = FieldCondition("", "t3", "f4", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("4"))
    }
    val e32 = KunLunexpression().apply {
        fieldCondition = FieldCondition("", "t3", "f5", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("5"))
    }
    val e33 = KunLunexpression().apply {
        fieldCondition = FieldCondition("", "t3", "f6", dimList, ArithmeticOperatorEnum.EQUAL, listOf("6"))
    }
    val e3 = KunLunexpression().apply {
        fieldCondition = FieldCondition("", "t2", "f3", dimList, ArithmeticOperatorEnum.EQUAL, listOf("333"))
        logic = LogicOperatorEnum.AND
        subexpression = arrayListOf(e31, e32, e33)
    }
    val rootExpression = KunLunexpression().apply {
        logic = LogicOperatorEnum.EXCEPT
        subexpression = arrayListOf(e1, e2, e3)
    }
    val requestDTO = SQLQueryReqDTO()
    requestDTO.expression = rootExpression

    val tableMappingMap: HashMap<String, List<KTableMapping>> = hashMapOf(
        "t1" to listOf(
            tableMapping(
                "t1", "table1", "db1",
                fieldMapping("f1", KFieldValueType.STRING),
                fieldMapping("cate_id", KFieldValueType.STRING),
                fieldMapping("shop_id", KFieldValueType.LONG),
            )
        ),
        "t2" to listOf(
            tableMapping(
                "t2", "table2", "db2",
                fieldMapping("f2", KFieldValueType.STRING),
                fieldMapping("f3", KFieldValueType.LONG),
                fieldMapping("cate_id", KFieldValueType.STRING),
                fieldMapping("shop_id", KFieldValueType.LONG),
            )
        ),
        "t3" to listOf(
            tableMapping(
                "t3", "table3", "db3",
                fieldMapping("f4", KFieldValueType.STRING),
                fieldMapping("f5", KFieldValueType.LONG),
                fieldMapping("f6", KFieldValueType.LONG),
                fieldMapping("cate_id", KFieldValueType.STRING),
                fieldMapping("shop_id", KFieldValueType.LONG),
            )
        ),
    )

    val parser = WideTableMultiDimCHSQLParser()
    val expr = parser.expr(requestDTO, tableMappingMap)
    val arrayLines = parser.arrayLines(requestDTO, tableMappingMap)

    println(expr)
    println(arrayLines)
}










/**
 * Translates a [KunLunexpression] rule tree into Hive/Spark SQL over wide tag tables.
 *
 * Every node carrying a fieldCondition becomes one scalar subquery (see [tagOptionConditions]),
 * collected into an array column `t`. Hive/Spark arrays are 0-based, so the condition with
 * [TagIdx.index] i is referenced as `t[i - 1]`.
 *
 * The rule's logic is evaluated with the two-argument array set functions, folded left over the
 * operands:
 * - AND    -> `array_intersect(array_intersect(a, b), c)`
 * - OR     -> `array_union(array_union(a, b), c)`
 * - EXCEPT -> `array_except(a, array_union(b, c))`, i.e. a - b - c
 *
 * The array is built in an inner derived table because Hive (and Spark before 3.4) does not let a
 * SELECT list reference an alias defined in the same list.
 */
@Service
class WideTableMultiDimHiveSQLParser {

    val log = LoggerFactory.getLogger(WideTableMultiDimHiveSQLParser::class.java)
    @Resource
    lateinit var commonParseUtils: CommonParseUtils

    /** Builds the SQL that counts the ids selected by the request's rule. */
    fun parseCount(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        return WIDE_TABLE_COUNT_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
        )
    }

    /** Builds the SQL that lists (one row per id) the ids selected by the request's rule. */
    fun parseCircle(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        val csvFile = ""
        return WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
            csvFile = csvFile,
        )
    }

    /**
     * Renders the request's rule as a Hive/Spark array expression over `t`.
     * A rule consisting of a single condition renders as `t[0]`.
     */
    fun expr(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val exprMap = tagIdxs.groupBy { it.kexprId }
        return genWhereClause(exprMap, requestDTO.expression)
    }

    /**
     * Recursively renders [kunLunexpression]: a leaf renders as its `t[i]` slot, and an inner node
     * folds the renderings of its children with its logic operator.
     */
    private fun genWhereClause(exprMap: Map<Int, List<TagIdx>>, kunLunexpression: KunLunexpression): String {
        if (isLeafNode(kunLunexpression)) {
            return leafArrayRef(exprMap, kunLunexpression)
        }

        val logic = kunLunexpression.logic
        // Pick the combinator before recursing so an unsupported operator fails first.
        val combine: (List<String>) -> String = when (logic) {
            LogicOperatorEnum.AND -> { operands -> foldBinary("array_intersect", operands) }
            LogicOperatorEnum.OR -> { operands -> foldBinary("array_union", operands) }
            LogicOperatorEnum.EXCEPT -> { operands -> exceptClause(operands) }
            else -> throw IllegalArgumentException("logic $logic not supported!")
        }
        return "(${combine(kunLunexpression.subexpression.map { genWhereClause(exprMap, it) })})"
    }

    /** Left-folds [operands] with a two-argument array [function]; a single operand is returned as-is. */
    private fun foldBinary(function: String, operands: List<String>): String =
        operands.reduce { acc, next -> "$function($acc, $next)" }

    /**
     * Renders `first - second - third - ...` as `array_except(first, union of the rest)`.
     * With a single operand nothing is removed.
     */
    private fun exceptClause(operands: List<String>): String {
        val minuend = operands.first()
        val subtrahends = operands.drop(1)
        return if (subtrahends.isEmpty()) minuend else "array_except($minuend, ${foldBinary("array_union", subtrahends)})"
    }

    /** Returns the 0-based `t[...]` reference of a leaf node's condition (its 1-based index minus one). */
    private fun leafArrayRef(exprMap: Map<Int, List<TagIdx>>, leaf: KunLunexpression): String {
        val tagIdx = exprMap[leaf.tfId]?.firstOrNull()
            ?: throw IllegalArgumentException("no tag condition found for expression ${leaf.tfId}")
        return "t[${tagIdx.index - 1}]"
    }

    /**
     * Renders every flattened condition as a parenthesised scalar subquery, one per line and
     * comma-separated, for use inside `array(...)`. Line i (1-based) is referenced as `t[i - 1]`.
     */
    fun arrayLines(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String =
        tagOptionConditions(requestDTO, tableMappingMap)
            .joinToString(separator = ",\n", postfix = "\n") { tagIdx -> "(${tagIdx.conditionExpr})" }

    /** Counts the elements of the evaluated rule set. */
    private fun WIDE_TABLE_COUNT_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
    ) = """
select size(t.res) as cnt
from (
    select $expr as res
    from (
        select array(
        $arrayLines
        ) t
    ) tag_sets
) t
"""

    /**
     * Explodes the evaluated rule set into one row per id.
     * [csvFile] is accepted for parity with the ClickHouse parser but not used.
     */
    private fun WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
        csvFile: String,
    ) = """
select explode(t.res) as ids
from (
    select $expr as res
    from (
        select array(
        $arrayLines
        ) t
    ) tag_sets
) t
"""

}



/**
 * Manual smoke test for [WideTableMultiDimHiveSQLParser].
 *
 * Builds the rule `EXCEPT(f1, f2, AND(f4, f5, f6))` over three sample tag tables and prints the
 * generated `expr` and `arrayLines`. Node e3 also carries its own condition (f3) next to its AND
 * children. The dimension conditions reference table t1, so their columns resolve through t1's
 * mapping.
 *
 * NOTE(review): tfId is never set on the sample expressions, but the parser keys conditions by
 * tfId. This presumably relies on KunLunexpression assigning distinct ids itself — confirm.
 * NOTE(review): this page concatenates several source files and another top-level `main()`
 * precedes this one; the two must live in separate files (or packages) to compile.
 */
fun main() {
    // Every sample field maps to a physical column with the same name and type.
    fun fieldMapping(columnCode: String, fieldType: KFieldValueType) = KFieldMapping(
        KField(columnCode, "", fieldType, ""), // srcField
        KField(columnCode, "", fieldType, ""), // dstField
    )

    // KTableMapping(tableCode, kTableCode, physicDBName, targetField, source, fields); every sample
    // table targets user_id. NOTE(review): the Java constructor was documented with a leading
    // rowMapping flag that is not passed here — presumably an overload without it exists.
    fun tableMapping(tableCode: String, kTableCode: String, dbName: String, vararg fields: KFieldMapping) = KTableMapping(
        tableCode,
        kTableCode,
        dbName,
        KField("user_id", "", KFieldValueType.STRING, ""),
        KSource(0, dbName, kTableCode, "user_id"),
        fields.toList(),
    )

    // Dimension filters shared by several conditions.
    val dimList = listOf(
        FieldCondition("", "t1", "cate_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("10001")),
        FieldCondition("", "t1", "shop_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("798322")),
    )

    val e1 = KunLunexpression().apply {
        fieldCondition = FieldCondition("", "t1", "f1", dimList, ArithmeticOperatorEnum.EQUAL, listOf("1"))
    }
    val e2 = KunLunexpression().apply {
        fieldCondition = FieldCondition("", "t2", "f2", dimList, ArithmeticOperatorEnum.EQUAL, listOf("22"))
    }
    val e31 = KunLunexpression().apply {
        fieldCondition = FieldCondition("", "t3", "f4", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("4"))
    }
    val e32 = KunLunexpression().apply {
        fieldCondition = FieldCondition("", "t3", "f5", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("5"))
    }
    val e33 = KunLunexpression().apply {
        fieldCondition = FieldCondition("", "t3", "f6", dimList, ArithmeticOperatorEnum.EQUAL, listOf("6"))
    }
    val e3 = KunLunexpression().apply {
        fieldCondition = FieldCondition("", "t2", "f3", dimList, ArithmeticOperatorEnum.EQUAL, listOf("333"))
        logic = LogicOperatorEnum.AND
        subexpression = arrayListOf(e31, e32, e33)
    }
    val rootExpression = KunLunexpression().apply {
        logic = LogicOperatorEnum.EXCEPT
        subexpression = arrayListOf(e1, e2, e3)
    }
    val requestDTO = SQLQueryReqDTO()
    requestDTO.expression = rootExpression

    val tableMappingMap: HashMap<String, List<KTableMapping>> = hashMapOf(
        "t1" to listOf(
            tableMapping(
                "t1", "table1", "db1",
                fieldMapping("f1", KFieldValueType.STRING),
                fieldMapping("cate_id", KFieldValueType.STRING),
                fieldMapping("shop_id", KFieldValueType.LONG),
            )
        ),
        "t2" to listOf(
            tableMapping(
                "t2", "table2", "db2",
                fieldMapping("f2", KFieldValueType.STRING),
                fieldMapping("f3", KFieldValueType.LONG),
            )
        ),
        "t3" to listOf(
            tableMapping(
                "t3", "table3", "db3",
                fieldMapping("f4", KFieldValueType.STRING),
                fieldMapping("f5", KFieldValueType.LONG),
                fieldMapping("f6", KFieldValueType.LONG),
            )
        ),
    )

    val parser = WideTableMultiDimHiveSQLParser()
    val expr = parser.expr(requestDTO, tableMappingMap)
    val arrayLines = parser.arrayLines(requestDTO, tableMappingMap)

    println(expr)
    println(arrayLines)
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/760970.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号