-- Intersection, t[1] ∩ t[2]: arrayIntersect(t[1], t[2])
-- Counts the distinct non-null UserIDs that appear in all three filtered sets.
select
    length(arrayDistinct(arrayFilter(x -> x is not null, t.res))) as cnt
from (
    select
        arrayIntersect(t[3], arrayIntersect(t[1], t[2])) as res,
        -- t[1], t[2], t[3]: one UserID array per filter
        array(
            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
        ) as t
) as t;
-- Union, t[1] ∪ t[2]: arrayConcat(t[1], t[2])
-- Counts the distinct non-null UserIDs that appear in any of the three filtered sets.
select
    length(arrayDistinct(arrayFilter(x -> x is not null, t.res))) as cnt
from (
    select
        arrayConcat(t[3], arrayConcat(t[1], t[2])) as res,
        -- t[1], t[2], t[3]: one UserID array per filter
        array(
            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
        ) as t
) as t;
-- Difference, t[1] - t[2]:
--   arrayMap(x->multiIf(x not in arrayIntersect(t[1], t[2]), x, NULL), t[1])
-- Elements of t[1] that also occur in t[2] are mapped to NULL; the outer
-- arrayFilter drops those NULLs before counting. The difference is then
-- intersected with t[3].
select
    length(arrayDistinct(arrayFilter(x -> x is not null, t.res))) as cnt
from (
    select
        arrayIntersect(
            t[3],
            arrayMap(x -> multiIf(x not in arrayIntersect(t[1], t[2]), x, NULL), t[1])
        ) as res,
        -- t[1], t[2], t[3]: one UserID array per filter
        array(
            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
        ) as t
) as t;
-- Union again, this time without the NULL filter: the concatenated arrays
-- are de-duplicated and counted directly.
select length(arrayDistinct(t.res))
from (
    select
        arrayConcat(t[3], arrayConcat(t[1], t[2])) as res,
        -- t[1], t[2], t[3]: one UserID array per filter
        array(
            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
        ) as t
) as t;
ClickHouse:
(arrayMap(x->multiIf(x not in arrayIntersect(t[1],t[2],(arrayIntersect(t[4],t[5],t[6]))), x, NULL), t[1]))
(select collect_set(user_id) from db1.table1 where ( cate_id = '10001' and shop_id = 798322 ) and ( f1 = '1' )),
(select collect_set(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f2 = '22' )),
(select collect_set(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f3 = 333 )),
(select collect_set(user_id) from db3.table3 where ( 1=1 ) and ( f4 = '4' )),
(select collect_set(user_id) from db3.table3 where ( 1=1 ) and ( f5 = 5 )),
(select collect_set(user_id) from db3.table3 where ( cate_id = '10001' and shop_id = 798322 ) and ( f6 = 6 ))

2. Hive 数组交并差运算:
-- Hive: intersection (i), union (u) and difference (e) of two literal arrays.
select
    array_intersect(array(1, 2), array(2, 3)) as i,
    array_union(array(1, 2), array(2, 3)) as u,
    array_except(array(1, 2), array(2, 3)) as e;
Hive:
(array_except(t[1],t[2],(array_intersect(t[4],t[5],t[6]))))
(select collect_set(user_id) from db1.table1 where ( cate_id = '10001' and shop_id = 798322 ) and ( f1 = '1' )),
(select collect_set(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f2 = '22' )),
(select collect_set(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f3 = 333 )),
(select collect_set(user_id) from db3.table3 where ( 1=1 ) and ( f4 = '4' )),
(select collect_set(user_id) from db3.table3 where ( 1=1 ) and ( f5 = 5 )),
(select collect_set(user_id) from db3.table3 where ( cate_id = '10001' and shop_id = 798322 ) and ( f6 = 6 ))

附源码
// NOTE(review): the published listing lost its line breaks and generic type
// arguments (e.g. `Map>`, `List ?>`). The generics below are reconstructed
// from how the values are used — confirm them against the real declarations.

/**
 * One flattened filter condition of a rule expression.
 *
 * @property kexprId       id (`tfId`) of the expression node the condition came from
 * @property tagCode       table code of the condition
 * @property tagOptionCode field code of the condition
 * @property conditionExpr generated `select collect_set(...) from ... where ...` statement
 * @property index         1-based position in the flattened list (-1 until assigned)
 */
data class TagIdx(
    var kexprId: Int,
    var tagCode: String,
    var tagOptionCode: String,
    var conditionExpr: String,
    var index: Int
)

/** An expression node is a leaf when it has no sub-expressions. */
fun isLeafNode(e: KunLunexpression) = CollectionUtils.isEmpty(e.subexpression)

/**
 * Flattens the request's rule expression into a list of [TagIdx] and numbers
 * the entries starting at 1.
 */
fun tagOptionConditions(
    requestDTO: SQLQueryReqDTO,
    tableMappingMap: Map<String, List<KTableMapping>>
): List<TagIdx> {
    val tagIdxList = mutableListOf<TagIdx>()
    // Recursively parse the rule expression and flatten it into a list of filter conditions.
    val kexpr: KunLunexpression = requestDTO.expression
    parseTagIdx(kexpr, tagIdxList, tableMappingMap)
    // The index field is the list position + 1.
    tagIdxList.forEachIndexed { index, tagIdx -> tagIdx.index = index + 1 }
    return tagIdxList
}

/**
 * Visits [kexpr] and all of its sub-expressions depth-first, appending one
 * [TagIdx] to [tagIdxList] for every node that carries a field condition.
 *
 * NOTE(review): the generated statement always uses `collect_set`, while the
 * hand-written ClickHouse examples in this article use `groupUniqArray` —
 * confirm that the ClickHouse target accepts `collect_set`.
 */
fun parseTagIdx(
    kexpr: KunLunexpression,
    tagIdxList: MutableList<TagIdx>,
    tableMappingMap: Map<String, List<KTableMapping>>
) {
    val fieldCondition = kexpr.fieldCondition
    if (null != fieldCondition) {
        // Dimension filter: every tag table has its own dimensions; the physical
        // field code is what is actually used for filtering.
        val dimConditionList = kexpr.fieldCondition.dimConditionList
        val dimFilter: String =
            if (CollectionUtils.isEmpty(dimConditionList)) {
                " 1=1 "
            } else {
                // Each part is " col = value "; parts are glued with "and ".
                dimConditionList.joinToString(separator = "and ") { dimField ->
                    val dimTagCode = dimField.tableCode
                    val dimFieldCode = dimField.fieldCode
                    val dimKTableMapping = tableMappingMap[dimTagCode]!![0]
                    val dimPhysicalField =
                        dimKTableMapping.fields.first { it.srcField.columnCode == dimFieldCode }.dstField
                    val dimPhysicalcolumnCode = dimPhysicalField.columnCode
                    val dimFieldValueType = dimPhysicalField.fieldType
                    val v = parseFieldValue(dimField, dimFieldValueType)
                    val singlevalue = v.get(0)?.sqlCondition
                    " $dimPhysicalcolumnCode = $singlevalue "
                }
            }

        val tagCode = fieldCondition.tableCode
        val fieldCode = fieldCondition.fieldCode
        val tableMapping = tableMappingMap[tagCode]!![0]
        val physicalField = tableMapping.fields.first { it.srcField.columnCode == fieldCode }.dstField
        val physicalcolumnCode = physicalField.columnCode
        val fieldValueType = physicalField.fieldType
        val targetFieldCode = tableMapping.targetField.columnCode
        val dbName = tableMapping.physicDBName
        val tableName = tableMapping.getkTableCode()
        val filterConditionClause = genFilterConditionClause(fieldCondition, physicalcolumnCode, fieldValueType)
        val line =
            "select collect_set($targetFieldCode) from $dbName.$tableName where ( $dimFilter ) and ( $filterConditionClause )"
        // index is a placeholder (-1) here; tagOptionConditions assigns the real value.
        val tagIdx = TagIdx(
            kexprId = kexpr.tfId,
            tagCode = tagCode,
            tagOptionCode = fieldCode,
            conditionExpr = line,
            index = -1
        )
        tagIdxList.add(tagIdx)
    }
    // Recurse into the sub-expressions.
    kexpr.subexpression?.forEach { parseTagIdx(it, tagIdxList, tableMappingMap) }
}

/**
 * Renders `<physicalField> <operator> <value(s)>` for one field condition.
 *
 * NOTE(review): values are spliced into the SQL text as-is; make sure
 * `sqlCondition` / `qlCondition` are escaped upstream.
 *
 * @throws IllegalArgumentException if the condition has no value, or BETWEEN has fewer than two
 * @throws IllegalStateException    if the operator is not supported
 */
fun genFilterConditionClause(
    fieldCondition: FieldCondition,
    physicalField: String,
    fieldValueType: KFieldValueType
): String {
    val fv = parseFieldValue(fieldCondition, fieldValueType)
    if (CollectionUtils.isEmpty(fv)) {
        throw IllegalArgumentException("fieldCondition must have fieldValue!")
    }
    // Multi-value form: (1,2,3,4)
    val listValue = fv.joinToString(separator = ",", prefix = "(", postfix = ")") {
        it?.sqlCondition.toString()
    }
    // Single-value forms
    val singlevalue = fv.get(0)?.sqlCondition
    // NOTE(review): presumably the unquoted form of the value — confirm the property name.
    val singlevalueNoQuote = fv.get(0)?.qlCondition
    val conditionExpr = when (fieldCondition.operator) {
        ArithmeticOperatorEnum.LIKE -> " like '%${singlevalueNoQuote}%' "
        ArithmeticOperatorEnum.EQUAL -> " = ${singlevalue} "
        ArithmeticOperatorEnum.GREATER_EQUAL_THAN -> " >= ${singlevalue} "
        ArithmeticOperatorEnum.LESS_THAN -> " < ${singlevalue} "
        ArithmeticOperatorEnum.LESS_EQUAL_THAN -> " <= ${singlevalue} "
        ArithmeticOperatorEnum.GREATER_THAN -> " > ${singlevalue} "
        ArithmeticOperatorEnum.BETWEEN -> {
            // Fail with a clear message instead of an index error on the second bound.
            if (fv.size < 2) {
                throw IllegalArgumentException("BETWEEN must have two fieldValues!")
            }
            " between ${fv.get(0)?.sqlCondition} and ${fv.get(1)?.sqlCondition} "
        }
        ArithmeticOperatorEnum.IN -> " in ${listValue} "
        ArithmeticOperatorEnum.NOT_IN -> " not in ${listValue} "
        else -> throw IllegalStateException("${fieldCondition.operator} not supported yet")
    }
    return " $physicalField $conditionExpr "
}

/**
 * Converts the raw values of [fieldCondition] into typed field values.
 *
 * The `bizError` results are thrown explicitly, matching how
 * `getTagCodeTableMapping` uses the same helper.
 */
fun parseFieldValue(fieldCondition: FieldCondition, fieldValueType: KFieldValueType): List<FieldValue<*>?> {
    val values = fieldCondition.values
    if (values == null || values.isEmpty()) {
        throw ExceptionHelper.bizError("illegal value size,values length must greater than 0.")
    }
    // Concrete value class for the field's type.
    val clazz: Class<out FieldValue<*>> = when (fieldValueType) {
        KFieldValueType.STRING -> StringFieldValue::class.java
        KFieldValueType.LONG -> LongFieldValue::class.java
        KFieldValueType.DOUBLE -> DoubleFieldValue::class.java
        else -> throw ExceptionHelper.bizError("$fieldValueType fieldValueType not supported!")
    }
    return FieldValue.create(clazz, *values.toTypedArray())
}

/**
 * Walks [expression] and collects, from every leaf node, either its object-set
 * id (into [objectSetList]) or its table/field code pair (into [tagbaseFieldList]).
 */
fun recurExtractTagCodeAndObjectSet(
    expression: KunLunexpression,
    tagbaseFieldList: MutableList<TagbaseField>,
    objectSetList: MutableList<String>
) {
    // No sub-expressions: the recursion ends here.
    if (isLeafNode(expression)) {
        val fieldCondition = expression.fieldCondition
        if (StringUtils.isNotEmpty(fieldCondition.objectSetId)) {
            // Object set (crowd)
            objectSetList.add(fieldCondition.objectSetId)
        } else {
            // Tag
            val tagbaseField = TagbaseField()
            tagbaseField.tableCode = fieldCondition.tableCode
            tagbaseField.fieldCode = fieldCondition.fieldCode
            tagbaseFieldList.add(tagbaseField)
        }
        return
    }
    // Recurse into the child nodes.
    for (subexpression in expression.subexpression) {
        recurExtractTagCodeAndObjectSet(subexpression, tagbaseFieldList, objectSetList)
    }
}

/** Resolves the table mappings needed to translate a request's rule expression. */
@Service
class CommonParseUtils {

    /** Returns the table mappings referenced by the request, grouped by table code. */
    fun getTableMappingMap(tenant: Tenant, requestDTO: SQLQueryReqDTO): Map<String, List<KTableMapping>> {
        // Tags & object sets
        val tagbaseFieldList: MutableList<TagbaseField> = mutableListOf()
        val objectSetList: MutableList<String> = mutableListOf()
        recurExtractTagCodeAndObjectSet(requestDTO.getexpression(), tagbaseFieldList, objectSetList)
        // Metadata
        val tableMappingList: List<KTableMapping> =
            getTagCodeTableMapping(tenant.id, tagbaseFieldList, requestDTO.getDriverType())
        return tableMappingList.groupBy { it.tableCode }
    }

    /**
     * Loads the table mappings and verifies that every requested tag and tag
     * option exists in them.
     */
    fun getTagCodeTableMapping(
        tenantId: Long,
        tagbaseFieldList: List<TagbaseField>,
        driverType: DriverType
    ): List<KTableMapping> {
        if (CollectionUtils.isEmpty(tagbaseFieldList)) {
            return emptyList()
        }
        // Fetch the mapping relations.
        // TODO metadata: kTableMappings
        val kTableMappings: List<KTableMapping> = ArrayList()
        val tagCodeTableMapping = kTableMappings.stream()
            .collect(Collectors.toMap({ obj: KTableMapping -> obj.tableCode }, Function.identity()))
        // Validate every requested tag / tag option.
        for (tagbaseField in tagbaseFieldList) {
            val kTableMapping = tagCodeTableMapping[tagbaseField.tableCode]
                ?: throw ExceptionHelper.bizError(
                    String.format("tag code [%s] is non-exists", tagbaseField.tableCode)
                )
            // The option exists when at least one mapped source field carries its code.
            // (The listing used noneMatch here, which rejected exactly the options that exist.)
            val existsTagOption = kTableMapping.fields.any { kFieldMapping ->
                kFieldMapping.srcField.columnCode == tagbaseField.fieldCode
            }
            if (!existsTagOption) {
                throw ExceptionHelper.bizError(
                    String.format("tag option [%s] is non-exists", tagbaseField.fieldCode)
                )
            }
        }
        return kTableMappings
    }
}
/**
 * Translates a rule expression into ClickHouse SQL built on array set
 * operations: every flattened tag condition becomes one element `t[i]` of an
 * array of sub-selects, and the expression tree is rendered over those elements.
 *
 * NOTE(review): generic type arguments were stripped from the published listing
 * and are reconstructed here from usage — confirm against the real declarations.
 */
@Service
class WideTableMultiDimCHSQLParser {
    val log = LoggerFactory.getLogger(WideTableMultiDimCHSQLParser::class.java)

    @Resource
    lateinit var commonParseUtils: CommonParseUtils

    /** Builds the SQL that counts the ids matching the request's expression. */
    fun parseCount(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        // Parse KunLunexpression
        return WIDE_TABLE_COUNT_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap)
        )
    }

    /** Builds the SQL that exports the ids matching the request's expression. */
    fun parseCircle(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        val csvFile = ""
        // Parse KunLunexpression
        return WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
            csvFile = csvFile,
        )
    }

    /** Renders the request's expression tree as a ClickHouse array expression over `t[i]`. */
    fun expr(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val exprMap = tagIdxs.groupBy { it.kexprId }
        return genWhereClause(exprMap, requestDTO.expression)
    }

    /**
     * Renders one expression node:
     *  - AND    -> `(arrayIntersect(a,b,...))`
     *  - OR     -> `(arrayConcat(a,b,...))`
     *  - EXCEPT -> `(arrayMap(x->multiIf(x not in arrayIntersect(a,b,...), x, NULL), a))`
     *
     * A leaf operand is rendered as `t[index]`, a non-leaf operand recursively.
     * Operands are always comma-separated, also after a nested operand, and the
     * EXCEPT minuend is the first operand whatever its kind.
     *
     * @return the rendered expression, or an empty string when the node itself is a leaf
     * @throws IllegalArgumentException if the node's logic operator is not supported
     * @throws IllegalStateException    if a leaf operand has no entry in [exprMap]
     */
    private fun genWhereClause(exprMap: Map<Int, List<TagIdx>>, kunLunexpression: KunLunexpression): String {
        val subexpression = kunLunexpression.subexpression
        if (CollectionUtils.isEmpty(subexpression)) { // leaf node
            return ""
        }
        val logic = kunLunexpression.logic
        // Reject unsupported operators before descending into the children.
        if (logic != LogicOperatorEnum.AND && logic != LogicOperatorEnum.OR && logic != LogicOperatorEnum.EXCEPT) {
            throw IllegalArgumentException("logic $logic not supported!")
        }
        val operands = subexpression.map { child ->
            if (isLeafNode(child)) {
                val tagIdx = checkNotNull(exprMap[child.tfId]?.firstOrNull()) {
                    "no tag condition found for expression ${child.tfId}"
                }
                "t[${tagIdx.index}]"
            } else {
                genWhereClause(exprMap, child)
            }
        }
        val joined = operands.joinToString(",")
        return when (logic) {
            LogicOperatorEnum.AND -> "(arrayIntersect($joined))"
            LogicOperatorEnum.OR -> "(arrayConcat($joined))"
            // EXCEPT: elements of the first operand that are in every operand become NULL.
            else -> "(arrayMap(x->multiIf(x not in arrayIntersect($joined), x, NULL), ${operands.first()}))"
        }
    }

    /**
     * Renders the flattened tag conditions as the elements of the `array(...)`
     * in the SQL templates: one parenthesised sub-select per line, comma-separated.
     */
    fun arrayLines(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val lastIndex = tagIdxs.lastIndex
        return buildString {
            tagIdxs.forEachIndexed { index, tagIdx ->
                append("(").append(tagIdx.conditionExpr)
                // The listing showed ", n" / " n": the backslash of the line break was lost.
                append(if (index != lastIndex) "), \n" else ") \n")
            }
        }
    }

    /** Count template: number of distinct non-null ids in the result array. */
    private fun WIDE_TABLE_COUNT_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
    ) = """
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
select $expr as res,
array(
$arrayLines
) t
) t
"""

    /**
     * Export template: one row per distinct non-null id, written as CSV.
     * The output location is hard-coded in the template; [csvFile] is not used.
     */
    private fun WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
        csvFile: String,
    ) = """
select arrayJoin(arrayDistinct(arrayFilter(x->x is not null, t.res)))
from (
select $expr as res,
array(
$arrayLines
) t
) t
INTO OUTFILe 'tos:///xxx' FORMAT CSV
settings distributed_perfect_shard=1,max_execution_time = 600
"""
}
/**
 * Demo for the ClickHouse parser: builds the expression
 * `e1 EXCEPT e2 EXCEPT (e31 AND e32 AND e33)` together with in-memory table
 * mappings, then prints the generated array expression and array lines.
 *
 * NOTE(review): generic type arguments were stripped from the published listing
 * and are reconstructed here — confirm against the real declarations. This
 * article also contains a second `main()` for the Hive parser; the two
 * presumably live in separate files.
 */
fun main() {
    // KFieldMapping(KField srcField, KField dstField)
    // KField(String columnCode, String fieldCode, KFieldValueType fieldType, String description)
    // The demo maps every source field onto an identical destination field.
    fun field(code: String, type: KFieldValueType) =
        KFieldMapping(
            KField(code, "", type, ""), // srcField
            KField(code, "", type, "")  // dstField
        )

    // KTableMapping(boolean rowMapping, String tableCode, String kTableCode, String physicDBName, KField targetField, KSource source, List fields)
    // KSource(Long tagSrcTaskId, String tagSrcDb, String tagSrcTable, String tagSrcTableJoinField)
    // NOTE(review): the signature comment lists a leading rowMapping flag that the
    // calls in the listing do not pass — confirm the constructor.
    fun tableMapping(tableCode: String, kTableCode: String, dbName: String, vararg fields: KFieldMapping) =
        KTableMapping(
            tableCode,
            kTableCode,
            dbName,
            KField("user_id", "", KFieldValueType.STRING, ""),
            KSource(0, dbName, kTableCode, "user_id"),
            listOf(*fields)
        )

    val requestDTO = SQLQueryReqDTO()
    val tableMappingMap: HashMap<String, List<KTableMapping>> = hashMapOf()

    val expression = KunLunexpression()
    expression.logic = LogicOperatorEnum.EXCEPT
    val subexpressionList = arrayListOf<KunLunexpression>()
    val e1 = KunLunexpression()
    val e2 = KunLunexpression()
    val e3 = KunLunexpression()

    // Dimension conditions shared by several tag conditions (resolved against table "t1").
    val dimList = listOf(
        FieldCondition("", "t1", "cate_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("10001")),
        FieldCondition("", "t1", "shop_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("798322"))
    )
    e1.fieldCondition = FieldCondition("", "t1", "f1", dimList, ArithmeticOperatorEnum.EQUAL, listOf("1"))
    e2.fieldCondition = FieldCondition("", "t2", "f2", dimList, ArithmeticOperatorEnum.EQUAL, listOf("22"))
    e3.fieldCondition = FieldCondition("", "t2", "f3", dimList, ArithmeticOperatorEnum.EQUAL, listOf("333"))

    // e3 is also a nested AND node with three leaf children.
    e3.logic = LogicOperatorEnum.AND
    val e3SubexpressionList = arrayListOf<KunLunexpression>()
    val e31 = KunLunexpression()
    val e32 = KunLunexpression()
    val e33 = KunLunexpression()
    e3SubexpressionList.add(e31)
    e3SubexpressionList.add(e32)
    e3SubexpressionList.add(e33)
    e31.fieldCondition = FieldCondition("", "t3", "f4", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("4"))
    e32.fieldCondition = FieldCondition("", "t3", "f5", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("5"))
    e33.fieldCondition = FieldCondition("", "t3", "f6", dimList, ArithmeticOperatorEnum.EQUAL, listOf("6"))
    e3.subexpression = e3SubexpressionList

    subexpressionList.add(e1)
    subexpressionList.add(e2)
    subexpressionList.add(e3)
    expression.subexpression = subexpressionList
    requestDTO.expression = expression

    tableMappingMap["t1"] = listOf(
        tableMapping(
            "t1", "table1", "db1",
            field("f1", KFieldValueType.STRING),
            field("cate_id", KFieldValueType.STRING),
            field("shop_id", KFieldValueType.LONG),
        )
    )
    tableMappingMap["t2"] = listOf(
        tableMapping(
            "t2", "table2", "db2",
            field("f2", KFieldValueType.STRING),
            field("f3", KFieldValueType.LONG),
            field("cate_id", KFieldValueType.STRING),
            field("shop_id", KFieldValueType.LONG),
        )
    )
    tableMappingMap["t3"] = listOf(
        tableMapping(
            "t3", "table3", "db3",
            field("f4", KFieldValueType.STRING),
            field("f5", KFieldValueType.LONG),
            field("f6", KFieldValueType.LONG),
            field("cate_id", KFieldValueType.STRING),
            field("shop_id", KFieldValueType.LONG),
        )
    )

    // Lower-case local name so it does not shadow the class it instantiates.
    val parser = WideTableMultiDimCHSQLParser()
    val expr = parser.expr(requestDTO, tableMappingMap)
    val arrayLines = parser.arrayLines(requestDTO, tableMappingMap)
    println(expr)
    println(arrayLines)
}
/**
 * Translates a rule expression into Hive SQL built on array set functions:
 * every flattened tag condition becomes one element `t[i]` of an array of
 * sub-selects, and the expression tree is rendered over those elements.
 *
 * NOTE(review): generic type arguments were stripped from the published listing
 * and are reconstructed here from usage — confirm against the real declarations.
 */
@Service
class WideTableMultiDimHiveSQLParser {
    // The listing created this logger for the ClickHouse parser class (copy/paste slip).
    val log = LoggerFactory.getLogger(WideTableMultiDimHiveSQLParser::class.java)

    @Resource
    lateinit var commonParseUtils: CommonParseUtils

    /** Builds the SQL that counts the ids matching the request's expression. */
    fun parseCount(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        // Parse KunLunexpression
        return WIDE_TABLE_COUNT_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap)
        )
    }

    /** Builds the SQL that lists the ids matching the request's expression. */
    fun parseCircle(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        val csvFile = ""
        // Parse KunLunexpression
        return WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
            csvFile = csvFile,
        )
    }

    /** Renders the request's expression tree as a Hive array expression over `t[i]`. */
    fun expr(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val exprMap = tagIdxs.groupBy { it.kexprId }
        return genWhereClause(exprMap, requestDTO.expression)
    }

    /**
     * Renders one expression node:
     *  - AND    -> `(array_intersect(a,b,...))`
     *  - OR     -> `(array_union(a,b,...))`
     *  - EXCEPT -> `(array_except(a,b,...))`
     *
     * A leaf operand is rendered as `t[index]`, a non-leaf operand recursively.
     * Operands are always comma-separated, also after a nested operand.
     *
     * NOTE(review): all operands are passed to a single function call; confirm
     * that the target engine's array functions accept more than two arguments.
     *
     * @return the rendered expression, or an empty string when the node itself is a leaf
     * @throws IllegalArgumentException if the node's logic operator is not supported
     * @throws IllegalStateException    if a leaf operand has no entry in [exprMap]
     */
    private fun genWhereClause(exprMap: Map<Int, List<TagIdx>>, kunLunexpression: KunLunexpression): String {
        val subexpression = kunLunexpression.subexpression
        if (CollectionUtils.isEmpty(subexpression)) { // leaf node
            return ""
        }
        val function = when (kunLunexpression.logic) {
            LogicOperatorEnum.AND -> "array_intersect"
            LogicOperatorEnum.OR -> "array_union"
            LogicOperatorEnum.EXCEPT -> "array_except"
            else -> throw IllegalArgumentException("logic ${kunLunexpression.logic} not supported!")
        }
        val operands = subexpression.map { child ->
            if (isLeafNode(child)) {
                val tagIdx = checkNotNull(exprMap[child.tfId]?.firstOrNull()) {
                    "no tag condition found for expression ${child.tfId}"
                }
                "t[${tagIdx.index}]"
            } else {
                genWhereClause(exprMap, child)
            }
        }
        return "($function(${operands.joinToString(",")}))"
    }

    /**
     * Renders the flattened tag conditions as the elements of the `array(...)`
     * in the SQL templates: one parenthesised sub-select per line, comma-separated.
     */
    fun arrayLines(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val lastIndex = tagIdxs.lastIndex
        return buildString {
            tagIdxs.forEachIndexed { index, tagIdx ->
                append("(").append(tagIdx.conditionExpr)
                // The listing showed ", n" / " n": the backslash of the line break was lost.
                append(if (index != lastIndex) "), \n" else ") \n")
            }
        }
    }

    /** Count template: size of the result array. */
    private fun WIDE_TABLE_COUNT_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
    ) = """
select size(t.res) as cnt
from (
select $expr as res,
array(
$arrayLines
) t
) t
"""

    /** List template: one row per element of the result array. [csvFile] is not used. */
    private fun WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
        csvFile: String,
    ) = """
select explode(t.res) as ids
from (
select $expr as res,
array(
$arrayLines
) t
) t
"""
}
/**
 * Demo for the Hive parser: builds the expression
 * `e1 EXCEPT e2 EXCEPT (e31 AND e32 AND e33)` together with in-memory table
 * mappings, then prints the generated array expression and array lines.
 *
 * NOTE(review): generic type arguments were stripped from the published listing
 * and are reconstructed here — confirm against the real declarations. This
 * article also contains a second `main()` for the ClickHouse parser; the two
 * presumably live in separate files.
 */
fun main() {
    // KFieldMapping(KField srcField, KField dstField)
    // KField(String columnCode, String fieldCode, KFieldValueType fieldType, String description)
    // The demo maps every source field onto an identical destination field.
    fun field(code: String, type: KFieldValueType) =
        KFieldMapping(
            KField(code, "", type, ""), // srcField
            KField(code, "", type, "")  // dstField
        )

    // KTableMapping(boolean rowMapping, String tableCode, String kTableCode, String physicDBName, KField targetField, KSource source, List fields)
    // KSource(Long tagSrcTaskId, String tagSrcDb, String tagSrcTable, String tagSrcTableJoinField)
    // NOTE(review): the signature comment lists a leading rowMapping flag that the
    // calls in the listing do not pass — confirm the constructor.
    fun tableMapping(tableCode: String, kTableCode: String, dbName: String, vararg fields: KFieldMapping) =
        KTableMapping(
            tableCode,
            kTableCode,
            dbName,
            KField("user_id", "", KFieldValueType.STRING, ""),
            KSource(0, dbName, kTableCode, "user_id"),
            listOf(*fields)
        )

    val requestDTO = SQLQueryReqDTO()
    val tableMappingMap: HashMap<String, List<KTableMapping>> = hashMapOf()

    val expression = KunLunexpression()
    expression.logic = LogicOperatorEnum.EXCEPT
    val subexpressionList = arrayListOf<KunLunexpression>()
    val e1 = KunLunexpression()
    val e2 = KunLunexpression()
    val e3 = KunLunexpression()

    // Dimension conditions shared by several tag conditions (resolved against table "t1").
    val dimList = listOf(
        FieldCondition("", "t1", "cate_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("10001")),
        FieldCondition("", "t1", "shop_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("798322"))
    )
    e1.fieldCondition = FieldCondition("", "t1", "f1", dimList, ArithmeticOperatorEnum.EQUAL, listOf("1"))
    e2.fieldCondition = FieldCondition("", "t2", "f2", dimList, ArithmeticOperatorEnum.EQUAL, listOf("22"))
    e3.fieldCondition = FieldCondition("", "t2", "f3", dimList, ArithmeticOperatorEnum.EQUAL, listOf("333"))

    // e3 is also a nested AND node with three leaf children.
    e3.logic = LogicOperatorEnum.AND
    val e3SubexpressionList = arrayListOf<KunLunexpression>()
    val e31 = KunLunexpression()
    val e32 = KunLunexpression()
    val e33 = KunLunexpression()
    e3SubexpressionList.add(e31)
    e3SubexpressionList.add(e32)
    e3SubexpressionList.add(e33)
    e31.fieldCondition = FieldCondition("", "t3", "f4", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("4"))
    e32.fieldCondition = FieldCondition("", "t3", "f5", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("5"))
    e33.fieldCondition = FieldCondition("", "t3", "f6", dimList, ArithmeticOperatorEnum.EQUAL, listOf("6"))
    e3.subexpression = e3SubexpressionList

    subexpressionList.add(e1)
    subexpressionList.add(e2)
    subexpressionList.add(e3)
    expression.subexpression = subexpressionList
    requestDTO.expression = expression

    tableMappingMap["t1"] = listOf(
        tableMapping(
            "t1", "table1", "db1",
            field("f1", KFieldValueType.STRING),
            field("cate_id", KFieldValueType.STRING),
            field("shop_id", KFieldValueType.LONG),
        )
    )
    tableMappingMap["t2"] = listOf(
        tableMapping(
            "t2", "table2", "db2",
            field("f2", KFieldValueType.STRING),
            field("f3", KFieldValueType.LONG),
        )
    )
    tableMappingMap["t3"] = listOf(
        tableMapping(
            "t3", "table3", "db3",
            field("f4", KFieldValueType.STRING),
            field("f5", KFieldValueType.LONG),
            field("f6", KFieldValueType.LONG),
        )
    )

    // Lower-case local name so it does not shadow the class it instantiates.
    val parser = WideTableMultiDimHiveSQLParser()
    val expr = parser.expr(requestDTO, tableMappingMap)
    val arrayLines = parser.arrayLines(requestDTO, tableMappingMap)
    println(expr)
    println(arrayLines)
}


