0
点赞
收藏
分享

微信扫一扫

WideTableMultiDimSQLParser 解析说明:ClickHouse / Hive 数组交并差运算

WideTableMultiDimSQLParser 解析说明

1. ClickHouse 数组交、并、差运算

-- ClickHouse set operations over arrays of user ids.
-- Each element of the array `t` is the de-duplicated set of UserIDs matching one condition
-- (groupUniqArray); t[1], t[2], t[3] index into that array.
-- The outer query removes NULLs (arrayFilter), de-duplicates (arrayDistinct) and counts (length).

-- Intersection t[1] ∩ t[2]: arrayIntersect(t[1], t[2]); this example computes t[3] ∩ (t[1] ∩ t[2])
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
select arrayIntersect(t[3], arrayIntersect(t[1], t[2])) as res,
array(
(select groupUniqArray(UserID) from hits_v1 where Sex = 1),
(select groupUniqArray(UserID) from hits_v1 where Age > 18),
(select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
) t
) t;

-- Union t[1] ∪ t[2]: arrayConcat(t[1], t[2]); duplicates are removed by arrayDistinct in the outer query
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
select arrayConcat(t[3], arrayConcat(t[1], t[2])) as res,
array(
(select groupUniqArray(UserID) from hits_v1 where Sex = 1),
(select groupUniqArray(UserID) from hits_v1 where Age > 18),
(select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
) t
) t;

-- Difference t[1] - t[2]: arrayMap(x->multiIf(x not in arrayIntersect(t[1], t[2]), x, NULL), t[1])
-- Elements of t[1] that also occur in t[2] are mapped to NULL and then dropped by arrayFilter.
-- This example computes t[3] ∩ (t[1] - t[2]).
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
select arrayIntersect(t[3], arrayMap(x->multiIf(x not in arrayIntersect(t[1], t[2]), x, NULL), t[1])) as res,
array(
(select groupUniqArray(UserID) from hits_v1 where Sex = 1),
(select groupUniqArray(UserID) from hits_v1 where Age > 18),
(select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
) t
) t;

-- Union again, without the NULL filter: only the difference form above introduces NULLs.
select length(arrayDistinct(t.res))
from (
select arrayConcat(t[3], arrayConcat(t[1], t[2])) as res,
array(
(select groupUniqArray(UserID) from hits_v1 where Sex = 1),
(select groupUniqArray(UserID) from hits_v1 where Age > 18),
(select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
) t
) t;

ClickHouse 解析输出示例(res 表达式 + array 元素):

-- Sample parser output: the first line is the `res` expression and the remaining lines are
-- the elements of the array `t` (one scalar sub-query per condition, in index order).
-- t[3] is listed but not referenced by the expression.
(arrayMap(x->multiIf(x not in arrayIntersect(t[1],t[2],(arrayIntersect(t[4],t[5],t[6]))), x, NULL), t[1]))
(select groupUniqArray(user_id) from db1.table1 where ( cate_id = '10001' and shop_id = 798322 ) and ( f1 = '1' )),
(select groupUniqArray(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f2 = '22' )),
(select groupUniqArray(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f3 = 333 )),
(select groupUniqArray(user_id) from db3.table3 where ( 1=1 ) and ( f4 = '4' )),
(select groupUniqArray(user_id) from db3.table3 where ( 1=1 ) and ( f5 = 5 )),
(select groupUniqArray(user_id) from db3.table3 where ( cate_id = '10001' and shop_id = 798322 ) and ( f6 = 6 ))

2. Hive 数组交、并、差运算:

-- Hive array set functions; expected results: i = [2], u = [1,2,3], e = [1].
-- NOTE(review): per the Hive/Spark documentation these functions take exactly two arrays;
-- confirm availability on the deployed Hive version.
select
array_intersect(array(1, 2), array(2, 3)) i,
array_union(array(1, 2), array(2, 3)) u,
array_except(array(1, 2), array(2, 3)) e;

Hive 解析输出示例(res 表达式 + array 元素):

-- Sample parser output: the `res` expression followed by the elements of the array `t`.
-- NOTE(review): Hive arrays are 0-indexed (t[0] is the first element), and array_except /
-- array_intersect take two arrays, so this n-ary, 1-based form needs adjusting before it can run.
(array_except(t[1],t[2],(array_intersect(t[4],t[5],t[6]))))
(select collect_set(user_id) from db1.table1 where ( cate_id = '10001' and shop_id = 798322 ) and ( f1 = '1' )),
(select collect_set(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f2 = '22' )),
(select collect_set(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f3 = 333 )),
(select collect_set(user_id) from db3.table3 where ( 1=1 ) and ( f4 = '4' )),
(select collect_set(user_id) from db3.table3 where ( 1=1 ) and ( f5 = 5 )),
(select collect_set(user_id) from db3.table3 where ( cate_id = '10001' and shop_id = 798322 ) and ( f6 = 6 ))

附源码

/**
 * One flattened tag condition produced while walking a KunLun expression tree.
 *
 * @property kexprId id (`tfId`) of the expression node the condition came from
 * @property tagCode logical table code of the tag
 * @property tagOptionCode logical field code of the tag option
 * @property conditionExpr scalar sub-query that collects the matching ids into an array
 * @property index position of this entry in the generated `array(...)`; -1 until assigned
 */
data class TagIdx(
    var kexprId: Int,
    var tagCode: String,
    var tagOptionCode: String,
    var conditionExpr: String,
    var index: Int,
)

/** Returns true when [e] has no sub-expressions (its sub-expression list is null or empty). */
fun isLeafNode(e: KunLunExpression): Boolean = e.subExpression.isNullOrEmpty()

/**
 * Flattens the request's expression tree into the list of tag conditions, in visit order,
 * and numbers them 1, 2, 3, ... in the `index` field.
 */
fun tagOptionConditions(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): List<TagIdx> {
    val root: KunLunExpression = requestDTO.expression
    return mutableListOf<TagIdx>().also { collected ->
        // Recursively parse the rule expression into a flat list of filter conditions.
        parseTagIdx(root, collected, tableMappingMap)
        // Positions are 1-based: list position + 1.
        for ((position, tagIdx) in collected.withIndex()) {
            tagIdx.index = position + 1
        }
    }
}


/**
 * Recursively walks [kexpr] and appends one [TagIdx] for every node that carries a `fieldCondition`.
 *
 * The walk is pre-order: the node first, then its children.
 *
 * Each entry's `conditionExpr` is a scalar sub-query of the form
 * `select <aggregateFunction>(<target field>) from <db>.<table> where ( <dimension filter> ) and ( <field filter> )`.
 * The dimension filter is ` 1=1 ` when the condition has no dimension conditions.
 * `index` is left at -1; the caller ([tagOptionConditions]) assigns the final positions.
 *
 * A node that has both a `fieldCondition` and sub-expressions also produces an entry.
 *
 * @param kexpr expression (sub)tree to walk
 * @param tagIdxList output list, appended to in visit order
 * @param tableMappingMap logical table code -> physical mappings; only the first mapping per code is used
 * @param aggregateFunction aggregate that collects the target field into an array. Defaults to
 *   `collect_set` (Hive); a ClickHouse caller would presumably pass `groupUniqArray`.
 * @throws IllegalArgumentException if a referenced table code or field code has no mapping
 */
fun parseTagIdx(
    kexpr: KunLunExpression,
    tagIdxList: MutableList<TagIdx>,
    tableMappingMap: Map<String, List<KTableMapping>>,
    aggregateFunction: String = "collect_set",
) {
    val fieldCondition = kexpr.fieldCondition
    if (fieldCondition != null) {
        // Dimension filter: each tag's table has its own dimension columns. The tag filter itself is
        // on the FieldCode, so the dimension conditions are ANDed in front of it.
        // NOTE(review): dimension conditions are always rendered as `column = firstValue`; their own
        // operator and any further values are ignored.
        val dimConditionList = fieldCondition.dimConditionList
        val dimFilter = if (dimConditionList.isNullOrEmpty()) {
            " 1=1 "
        } else {
            dimConditionList.joinToString(separator = "and ") { dimField ->
                // The dimension column is resolved via the dimension's own table code.
                val dimPhysicalField = physicalFieldOf(firstMappingOf(tableMappingMap, dimField.tableCode), dimField.fieldCode)
                val singleValue = parseFieldValue(dimField, dimPhysicalField.fieldType)[0]?.sqlCondition
                " ${dimPhysicalField.columnCode} = $singleValue "
            }
        }

        val tagCode = fieldCondition.tableCode
        val fieldCode = fieldCondition.fieldCode
        val kTableMapping = firstMappingOf(tableMappingMap, tagCode)
        val physicalField = physicalFieldOf(kTableMapping, fieldCode)
        val targetFieldCode = kTableMapping.targetField.columnCode
        val dbName = kTableMapping.physicDBName
        val tableName = kTableMapping.getkTableCode()
        val filterConditionClause = genFilterConditionClause(fieldCondition, physicalField.columnCode, physicalField.fieldType)

        val line = "select $aggregateFunction($targetFieldCode) from $dbName.$tableName where ( $dimFilter ) and ( $filterConditionClause )"
        // index is a placeholder (-1) until the caller assigns the final position.
        tagIdxList.add(TagIdx(kexprId = kexpr.tfId, tagCode = tagCode, tagOptionCode = fieldCode, conditionExpr = line, index = -1))
    }
    // Recurse into sub-expressions.
    kexpr.subExpression?.forEach { parseTagIdx(it, tagIdxList, tableMappingMap, aggregateFunction) }
}

/** Returns the first physical mapping registered for [tableCode], or fails with a descriptive message. */
private fun firstMappingOf(tableMappingMap: Map<String, List<KTableMapping>>, tableCode: String): KTableMapping =
    tableMappingMap[tableCode]?.firstOrNull()
        ?: throw IllegalArgumentException("no table mapping found for table code [$tableCode]")

/** Returns the physical (dst) field that logical column [fieldCode] of [kTableMapping] maps to. */
private fun physicalFieldOf(kTableMapping: KTableMapping, fieldCode: String): KField =
    kTableMapping.fields.firstOrNull { it.srcField.columnCode == fieldCode }?.dstField
        ?: throw IllegalArgumentException("no field mapping found for [$fieldCode] in table [${kTableMapping.tableCode}]")

/**
 * Renders one tag condition as a SQL predicate on [physicalField], e.g. ` f1  = '1'  `.
 *
 * Supported operators: LIKE, EQUAL, GREATER_EQUAL_THAN, LESS_THAN, LESS_EQUAL_THAN,
 * GREATER_THAN, BETWEEN (needs two values), IN and NOT_IN (all values).
 * Single-value operators use the first value only.
 *
 * @throws IllegalArgumentException if there are no values, or BETWEEN has fewer than two values
 * @throws IllegalStateException for any other operator
 */
fun genFilterConditionClause(fieldCondition: FieldCondition, physicalField: String, fieldValueType: KFieldValueType): String {
    val fv = parseFieldValue(fieldCondition, fieldValueType)
    if (CollectionUtils.isEmpty(fv)) {
        throw IllegalArgumentException("fieldCondition must have fieldValue!")
    }

    // Single-value form (quoted as appropriate by sqlCondition).
    val singleValue = fv[0]?.sqlCondition

    // Multi-value form: (v1,v2,v3)
    fun sqlList(): String = fv.joinToString(separator = ",", prefix = "(", postfix = ")") { "${it?.sqlCondition}" }

    val conditionExpr = when (fieldCondition.operator) {
        // NOTE(review): qlCondition is presumably the unquoted value; it is interpolated without
        // escaping, so a value containing a quote breaks (or injects into) the generated SQL.
        ArithmeticOperatorEnum.LIKE -> " like '%${fv[0]?.qlCondition}%' "
        ArithmeticOperatorEnum.EQUAL -> " = ${singleValue} "
        ArithmeticOperatorEnum.GREATER_EQUAL_THAN -> " >= ${singleValue} "
        ArithmeticOperatorEnum.LESS_THAN -> " < ${singleValue} "
        ArithmeticOperatorEnum.LESS_EQUAL_THAN -> " <= ${singleValue} "
        ArithmeticOperatorEnum.GREATER_THAN -> " > ${singleValue} "
        ArithmeticOperatorEnum.BETWEEN -> {
            require(fv.size >= 2) { "BETWEEN requires two values, got ${fv.size}" }
            " between ${singleValue} and ${fv[1]?.sqlCondition} "
        }
        ArithmeticOperatorEnum.IN -> " in ${sqlList()} "
        ArithmeticOperatorEnum.NOT_IN -> " not in ${sqlList()} "
        else -> throw IllegalStateException("${fieldCondition.operator} not supported yet")
    }

    return " $physicalField $conditionExpr "
}

/**
 * Converts the raw values of [fieldCondition] into typed [FieldValue]s according to [fieldValueType].
 *
 * `ExceptionHelper.bizError(...)` is thrown explicitly, as elsewhere in this file. Without the `throw`,
 * a returned (not thrown) error would fall through to a null dereference or an uninitialised variable.
 *
 * @throws the business error from `ExceptionHelper.bizError` when there are no values or the type is unsupported
 */
fun parseFieldValue(fieldCondition: FieldCondition, fieldValueType: KFieldValueType): List<FieldValue<*>?> {
    val values = fieldCondition.values
    if (values == null || values.isEmpty()) {
        throw ExceptionHelper.bizError("illegal value size,values length must greater than 0.")
    }

    // Concrete FieldValue subclass for the field's value type.
    val clazz: Class<out FieldValue<*>> = when (fieldValueType) {
        KFieldValueType.STRING -> StringFieldValue::class.java
        KFieldValueType.LONG -> LongFieldValue::class.java
        KFieldValueType.DOUBLE -> DoubleFieldValue::class.java
        else -> throw ExceptionHelper.bizError("$fieldValueType fieldValueType not supported!")
    }
    return FieldValue.create(clazz, *values.toTypedArray())
}





/**
 * Recursively visits the leaves of a KunLun expression.
 *
 * A leaf whose condition has a non-empty `objectSetId` contributes that id to [objectSetList].
 * Every other leaf contributes its (tableCode, fieldCode) as a [TagBaseField] to [tagBaseFieldList].
 * Non-leaf nodes contribute nothing themselves.
 */
fun recurExtractTagCodeAndObjectSet(expression: KunLunExpression, tagBaseFieldList: MutableList<TagBaseField>, objectSetList: MutableList<String>) {

    // Inner node: descend into every child, then stop.
    if (!isLeafNode(expression)) {
        expression.subExpression.forEach { child ->
            recurExtractTagCodeAndObjectSet(child, tagBaseFieldList, objectSetList)
        }
        return
    }

    // Leaf: either a segment (object set) or a tag option.
    val condition = expression.fieldCondition
    val objectSetId = condition.objectSetId
    if (StringUtils.isNotEmpty(objectSetId)) {
        objectSetList.add(objectSetId)
    } else {
        tagBaseFieldList.add(TagBaseField().apply {
            tableCode = condition.tableCode
            fieldCode = condition.fieldCode
        })
    }
}

@Service
class CommonParseUtils {

    /**
     * Collects the tags referenced by the request's expression and resolves their physical table mappings.
     *
     * @return physical mappings grouped by logical table code
     */
    fun getTableMappingMap(tenant: Tenant, requestDTO: SQLQueryReqDTO): Map<String, List<KTableMapping>> {
        // Tags & segments (object sets)
        val tagBaseFieldList: MutableList<TagBaseField> = mutableListOf()
        val objectSetList: MutableList<String> = mutableListOf()
        recurExtractTagCodeAndObjectSet(requestDTO.getExpression(), tagBaseFieldList, objectSetList)
        // Metadata
        val tableMappingList: List<KTableMapping> = getTagCodeTableMapping(tenant.id, tagBaseFieldList, requestDTO.getDriverType())
        return tableMappingList.groupBy { it.tableCode }
    }

    /**
     * Returns the physical-table mappings for the tags in [tagBaseFieldList], validating that every
     * tag's table code has a mapping and that the mapping contains the tag's field (option).
     *
     * NOTE(review): the mapping source is still a TODO stub (an empty list), and [tenantId] /
     * [driverType] are not used yet.
     *
     * @throws the business error from `ExceptionHelper.bizError` if a table code or tag option is unknown
     */
    fun getTagCodeTableMapping(tenantId: Long, tagBaseFieldList: List<TagBaseField>, driverType: DriverType): List<KTableMapping> {
        if (tagBaseFieldList.isEmpty()) {
            return emptyList()
        }

        // Load mappings
        // TODO metadata: kTableMappings
        val kTableMappings: List<KTableMapping> = ArrayList()

        // Group rather than toMap: several mappings per table code must not crash the lookup. The first
        // one is validated, consistent with how the parsers pick `[0]`.
        val mappingsByTableCode = kTableMappings.groupBy { it.tableCode }

        for (tagBaseField in tagBaseFieldList) {
            val kTableMapping = mappingsByTableCode[tagBaseField.tableCode]?.firstOrNull()
                ?: throw ExceptionHelper.bizError(String.format("tag code [%s] is non-exists", tagBaseField.tableCode))
            // The option exists when at least one source field matches. The previous `noneMatch`
            // inverted this check.
            val tagOptionExists = kTableMapping.fields.any { it.srcField.columnCode == tagBaseField.fieldCode }
            if (!tagOptionExists) {
                throw ExceptionHelper.bizError(String.format("tag option [%s] is non-exists", tagBaseField.fieldCode))
            }
        }
        return kTableMappings
    }

}
/**
 * ClickHouse SQL parser for multi-dimension wide-table tags.
 *
 * Every tag condition becomes one element of an `array(...)` of scalar sub-queries aliased `t`.
 * The KunLun expression tree is rendered as ClickHouse array set operations over `t[i]`, using
 * the 1-based [TagIdx.index]:
 * - AND    -> `arrayIntersect(...)`
 * - OR     -> `arrayConcat(...)` (duplicates are removed later by `arrayDistinct`)
 * - EXCEPT -> `arrayMap(x->multiIf(x not in arrayIntersect(...), x, NULL), first)`. Excluded elements
 *   become NULL and are dropped by the outer `arrayFilter(x->x is not null, ...)`.
 *
 * NOTE(review): the array elements come from the shared `tagOptionConditions`, whose sub-queries
 * currently use `collect_set(...)` (the Hive aggregate); the ClickHouse samples use
 * `groupUniqArray(...)`. Confirm the generated SQL runs on the target ClickHouse cluster.
 *
 * @author chenguangjian.jk
 * @date 2022-03-09 02:28:48
 */
@Service
class WideTableMultiDimCHSQLParser {
    val log = LoggerFactory.getLogger(WideTableMultiDimCHSQLParser::class.java)

    @Resource
    lateinit var commonParseUtils: CommonParseUtils

    /**
     * Builds the audience-size estimation (count) SQL.
     */
    fun parseCount(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        // Parse KunLunExpression
        return WIDE_TABLE_COUNT_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap)
        )
    }

    /**
     * Builds the audience selection (export) SQL.
     */
    fun parseCircle(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)

        // Empty -> the template falls back to its default output path.
        val csvFile = ""
        // Parse KunLunExpression
        return WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
            csvFile = csvFile,
        )
    }

    /**
     * Renders the request's expression tree as the `res` array expression over `t[i]`.
     */
    fun expr(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val exprMap = tagIdxs.groupBy { it.kexprId }
        return genWhereClause(exprMap, requestDTO.expression)
    }

    /**
     * Renders [kunLunExpression] as a ClickHouse array expression.
     *
     * Each child becomes one comma-separated operand: a leaf is rendered as `t[index]`, and a nested
     * expression is rendered recursively, so it may appear in any position.
     * EXCEPT keeps the elements of the first operand that are not in the intersection of all
     * operands, i.e. `first − (second ∩ third ∩ …)`.
     */
    private fun genWhereClause(exprMap: Map<Int, List<TagIdx>>, kunLunExpression: KunLunExpression): String {
        if (isLeafNode(kunLunExpression)) {
            // Only reachable for a root that is a single condition; child leaves are rendered below.
            return exprMap[kunLunExpression.tfId]?.firstOrNull()?.let { "t[${it.index}]" } ?: ""
        }

        val logic = kunLunExpression.logic
        if (logic != LogicOperatorEnum.AND && logic != LogicOperatorEnum.OR && logic != LogicOperatorEnum.EXCEPT) {
            throw IllegalArgumentException("logic $logic not supported!")
        }

        val operands = kunLunExpression.subExpression.map { child ->
            if (isLeafNode(child)) "t[${leafTagIndex(exprMap, child)}]" else genWhereClause(exprMap, child)
        }
        val joined = operands.joinToString(",")

        return when (logic) {
            LogicOperatorEnum.AND -> "(arrayIntersect($joined))"
            LogicOperatorEnum.OR -> "(arrayConcat($joined))"
            // EXCEPT: map excluded elements of the first operand to NULL (filtered out by the outer query).
            else -> "(arrayMap(x->multiIf(x not in arrayIntersect($joined), x, NULL), ${operands.first()}))"
        }
    }

    /** Returns the 1-based position in `t` that [tagOptionConditions] assigned to [leaf]. */
    private fun leafTagIndex(exprMap: Map<Int, List<TagIdx>>, leaf: KunLunExpression): Int =
        exprMap[leaf.tfId]?.firstOrNull()?.index
            ?: throw IllegalStateException("no tag condition was parsed for leaf expression ${leaf.tfId}")

    /**
     * Builds the element lines of `array(...)`, one parenthesised sub-query per line; the last line has no comma.
     * (select groupUniqArray(UserID) from db.hits_v1 where Sex = 1),
     * (select groupUniqArray(UserID) from db.hits_v1 where Age > 18),
     * (select groupUniqArray(UserID) from db.hits_v1 where RequestNum > 0)
     */
    fun arrayLines(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val size = tagIdxs.size
        val arrayLines = StringBuffer()

        tagIdxs.forEachIndexed { index, tagIdx ->
            if (index != size - 1) {
                arrayLines.append("(${tagIdx.conditionExpr}), \n")
            } else {
                arrayLines.append("(${tagIdx.conditionExpr}) \n")
            }
        }
        return arrayLines.toString()
    }

    /**
     * Count template, e.g.:
     * select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
     * from (
     * select arrayIntersect(t[3], arrayIntersect(t[1], t[2])) as res,
     * array(
     * (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
     * (select groupUniqArray(UserID) from hits_v1 where Age > 18),
     * (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
     * ) t
     * ) t
     */
    private fun WIDE_TABLE_COUNT_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
    ) = """
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
select $expr as res,
array(
$arrayLines
) t
) t
"""

    /**
     * Selection template: explodes the distinct non-null ids and writes them as CSV to [csvFile].
     * A blank [csvFile] falls back to `tos:///xxx`, the path this template previously hard-coded.
     */
    private fun WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
        csvFile: String,
    ) = """
select arrayJoin(arrayDistinct(arrayFilter(x->x is not null, t.res)))
from (
select $expr as res,
array(
$arrayLines
) t
) t
INTO OUTFILE '${csvFile.ifBlank { "tos:///xxx" }}' FORMAT CSV
settings distributed_perfect_shard=1,max_execution_time = 600
"""

}


/**
Manual smoke test for WideTableMultiDimCHSQLParser: builds the expression
EXCEPT(f1, f2, AND(f4, f5, f6)) over tables t1, t2, t3, then prints the generated `res`
expression and the `array(...)` element lines.

e3 carries both a fieldCondition (f3) and sub-expressions. Its condition still gets an array
slot (t[3]), but because e3 is not a leaf, t[3] is never referenced by the generated expression.

The dimension conditions in dimList all use table code "t1", so their physical columns are
resolved from t1's mapping even when attached to conditions on t2/t3.

NOTE(review): if this main lives in the same file as the Hive parser's main, the two top-level
`main` functions conflict; presumably they come from separate source files.
NOTE(review): the sample output below was captured from an earlier revision (groupUniqArray,
different dimension values); the current output may differ.

tagIdxList=[{"conditionExpr":"select groupUniqArray(user_id) from db1.table1 where ( cate_id = '1001' ) and ( f1 = '1' )","index":1,"kexprId":684563482,"tagCode":"t1","tagOptionCode":"f1"},{"conditionExpr":"select groupUniqArray(user_id) from db2.table2 where ( cate_id = '1002' ) and ( f2 = '22' )","index":2,"kexprId":684642314,"tagCode":"t2","tagOptionCode":"f2"},{"conditionExpr":"select groupUniqArray(user_id) from db2.table2 where ( shop_id = '798322' ) and ( f3 = 333 )","index":3,"kexprId":568144263,"tagCode":"t2","tagOptionCode":"f3"},{"conditionExpr":"select groupUniqArray(user_id) from db3.table3 where ( cate_id = '1004' ) and ( f4 = '4' )","index":4,"kexprId":684626037,"tagCode":"t3","tagOptionCode":"f4"},{"conditionExpr":"select groupUniqArray(user_id) from db3.table3 where ( cate_id = '1005' ) and ( f5 = 5 )","index":5,"kexprId":684627036,"tagCode":"t3","tagOptionCode":"f5"},{"conditionExpr":"select groupUniqArray(user_id) from db3.table3 where ( cate_id = '1006' ) and ( f6 = 6 )","index":6,"kexprId":684628027,"tagCode":"t3","tagOptionCode":"f6"}]
(arrayMap(x->multiIf(x not in arrayIntersect(t[1],t[2],(arrayIntersect(t[4],t[5],t[6]))), x, NULL), t[1]))
(select groupUniqArray(user_id) from db1.table1 where ( cate_id = '1001' ) and ( f1 = '1' )),
(select groupUniqArray(user_id) from db2.table2 where ( cate_id = '1002' ) and ( f2 = '22' )),
(select groupUniqArray(user_id) from db2.table2 where ( shop_id = '798322' ) and ( f3 = 333 )),
(select groupUniqArray(user_id) from db3.table3 where ( cate_id = '1004' ) and ( f4 = '4' )),
(select groupUniqArray(user_id) from db3.table3 where ( cate_id = '1005' ) and ( f5 = 5 )),
(select groupUniqArray(user_id) from db3.table3 where ( cate_id = '1006' ) and ( f6 = 6 ))
*/
fun main() {
val requestDTO = SQLQueryReqDTO()
val tableMappingMap: HashMap<String, List<KTableMapping>> = hashMapOf()
// Root expression: EXCEPT over e1, e2, e3.
val expression = KunLunExpression()
expression.logic = LogicOperatorEnum.EXCEPT
val subExpressionList = arrayListOf<KunLunExpression>()
val e1 = KunLunExpression()
val e2 = KunLunExpression()
val e3 = KunLunExpression()

// Shared dimension filters: cate_id = '10001' and shop_id = 798322 (resolved via table t1).
val dimList = listOf(
FieldCondition("", "t1", "cate_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("10001")),
FieldCondition("", "t1", "shop_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("798322"))
)

e1.fieldCondition = FieldCondition("", "t1", "f1", dimList, ArithmeticOperatorEnum.EQUAL, listOf("1"))
e2.fieldCondition = FieldCondition("", "t2", "f2", dimList, ArithmeticOperatorEnum.EQUAL, listOf("22"))
// e3 has its own condition AND children; only its children are used in the expression.
e3.fieldCondition = FieldCondition("", "t2", "f3", dimList, ArithmeticOperatorEnum.EQUAL, listOf("333"))
e3.logic = LogicOperatorEnum.AND

// e3's children: f4 and f5 without dimensions, f6 with the shared dimensions.
val e3SubExpressionList = arrayListOf<KunLunExpression>()
val e31 = KunLunExpression()
val e32 = KunLunExpression()
val e33 = KunLunExpression()
e3SubExpressionList.add(e31)
e3SubExpressionList.add(e32)
e3SubExpressionList.add(e33)
e31.fieldCondition = FieldCondition("", "t3", "f4", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("4"))
e32.fieldCondition = FieldCondition("", "t3", "f5", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("5"))
e33.fieldCondition = FieldCondition("", "t3", "f6", dimList, ArithmeticOperatorEnum.EQUAL, listOf("6"))
e3.subExpression = e3SubExpressionList

subExpressionList.add(e1)
subExpressionList.add(e2)
subExpressionList.add(e3)
expression.subExpression = subExpressionList
requestDTO.expression = expression

// KTableMapping(boolean rowMapping, String tableCode, String kTableCode, String physicDBName, KField targetField, KSource source, List<KFieldMapping> fields)
// KField(String columnCode, String fieldCode, KFieldValueType fieldType, String description)
// KSource(Long tagSrcTaskId, String tagSrcDb, String tagSrcTable, String tagSrcTableJoinField)
// KFieldMapping(KField srcField, KField dstField)
// NOTE(review): the calls below pass six arguments (no rowMapping); presumably a constructor overload exists — confirm.

// t1 -> db1.table1, target column user_id.
tableMappingMap["t1"] = listOf(KTableMapping(
"t1",
"table1",
"db1",
KField("user_id", "", KFieldValueType.STRING, ""),
KSource(0, "db1", "table1", "user_id"),
listOf(
KFieldMapping(
KField("f1", "", KFieldValueType.STRING, ""), // srcField
KField("f1", "", KFieldValueType.STRING, "") // dstField
),
KFieldMapping(
KField("cate_id", "", KFieldValueType.STRING, ""), // srcField
KField("cate_id", "", KFieldValueType.STRING, "") // dstField
),
KFieldMapping(
KField("shop_id", "", KFieldValueType.LONG, ""), // srcField
KField("shop_id", "", KFieldValueType.LONG, "") // dstField
),
)
))

// t2 -> db2.table2, target column user_id.
tableMappingMap["t2"] = listOf(KTableMapping(
"t2",
"table2",
"db2",
KField("user_id", "", KFieldValueType.STRING, ""),
KSource(0, "db2", "table2", "user_id"),
listOf(
KFieldMapping(
KField("f2", "", KFieldValueType.STRING, ""), // srcField
KField("f2", "", KFieldValueType.STRING, "") // dstField
),
KFieldMapping(
KField("f3", "", KFieldValueType.LONG, ""), // srcField
KField("f3", "", KFieldValueType.LONG, "") // dstField
),
KFieldMapping(
KField("cate_id", "", KFieldValueType.STRING, ""), // srcField
KField("cate_id", "", KFieldValueType.STRING, "") // dstField
),
KFieldMapping(
KField("shop_id", "", KFieldValueType.LONG, ""), // srcField
KField("shop_id", "", KFieldValueType.LONG, "") // dstField
),
)
))

// t3 -> db3.table3, target column user_id.
tableMappingMap["t3"] = listOf(KTableMapping(
"t3",
"table3",
"db3",
KField("user_id", "", KFieldValueType.STRING, ""),
KSource(0, "db3", "table3", "user_id"),
listOf(
KFieldMapping(
KField("f4", "", KFieldValueType.STRING, ""), // srcField
KField("f4", "", KFieldValueType.STRING, "") // dstField
),
KFieldMapping(
KField("f5", "", KFieldValueType.LONG, ""), // srcField
KField("f5", "", KFieldValueType.LONG, "") // dstField
),
KFieldMapping(
KField("f6", "", KFieldValueType.LONG, ""), // srcField
KField("f6", "", KFieldValueType.LONG, "") // dstField
),
KFieldMapping(
KField("cate_id", "", KFieldValueType.STRING, ""), // srcField
KField("cate_id", "", KFieldValueType.STRING, "") // dstField
),
KFieldMapping(
KField("shop_id", "", KFieldValueType.LONG, ""), // srcField
KField("shop_id", "", KFieldValueType.LONG, "") // dstField
),
)
))

// Instantiated directly (no Spring): expr/arrayLines do not use the injected commonParseUtils.
val WideTableMultiDimCHSQLParser = WideTableMultiDimCHSQLParser()
val expr = WideTableMultiDimCHSQLParser.expr(requestDTO, tableMappingMap)
val arrayLines = WideTableMultiDimCHSQLParser.arrayLines(requestDTO, tableMappingMap)

println(expr)
println(arrayLines)
}









/**
 * Hive SQL parser for multi-dimension wide-table tags.
 *
 * Every tag condition becomes one element of an `array(...)` of scalar `collect_set` sub-queries
 * aliased `t`. The KunLun expression tree is rendered with Hive's array set functions, which take
 * two arrays each, so n-ary operations are folded pairwise:
 * - AND    -> `array_intersect(array_intersect(a,b),c)`
 * - OR     -> `array_union(array_union(a,b),c)`
 * - EXCEPT -> `array_except(first, array_intersect(rest…))`, i.e. `first − (second ∩ third ∩ …)`,
 *   which matches what the ClickHouse parser computes for the same expression.
 *
 * Hive arrays are 0-based, so the 1-based [TagIdx.index] `i` is rendered as `t[i-1]`.
 *
 * @author chenguangjian.jk
 * @date 2022-03-09 02:28:48
 */
@Service
class WideTableMultiDimHiveSQLParser {

    val log = LoggerFactory.getLogger(WideTableMultiDimHiveSQLParser::class.java)

    @Resource
    lateinit var commonParseUtils: CommonParseUtils

    /**
     * Builds the audience-size estimation (count) SQL.
     */
    fun parseCount(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        // Parse KunLunExpression
        return WIDE_TABLE_COUNT_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap)
        )
    }

    /**
     * Builds the audience selection SQL.
     */
    fun parseCircle(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)

        val csvFile = ""
        // Parse KunLunExpression
        return WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
            csvFile = csvFile,
        )
    }

    /**
     * Renders the request's expression tree as the `res` array expression over `t[i]`.
     */
    fun expr(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val exprMap = tagIdxs.groupBy { it.kexprId }
        return genWhereClause(exprMap, requestDTO.expression)
    }

    /**
     * Renders [kunLunExpression] as a Hive array expression.
     *
     * Each child is one operand: a leaf is rendered as `t[index-1]`, and a nested expression is
     * rendered recursively, so it may appear in any position.
     */
    private fun genWhereClause(exprMap: Map<Int, List<TagIdx>>, kunLunExpression: KunLunExpression): String {
        if (isLeafNode(kunLunExpression)) {
            // Only reachable for a root that is a single condition; child leaves are rendered below.
            return exprMap[kunLunExpression.tfId]?.firstOrNull()?.let { hiveElement(it.index) } ?: ""
        }

        val logic = kunLunExpression.logic
        if (logic != LogicOperatorEnum.AND && logic != LogicOperatorEnum.OR && logic != LogicOperatorEnum.EXCEPT) {
            throw IllegalArgumentException("logic $logic not supported!")
        }

        val operands = kunLunExpression.subExpression.map { child ->
            if (isLeafNode(child)) hiveElement(leafTagIndex(exprMap, child)) else genWhereClause(exprMap, child)
        }

        val body = when (logic) {
            LogicOperatorEnum.AND -> foldPairwise("array_intersect", operands)
            LogicOperatorEnum.OR -> foldPairwise("array_union", operands)
            // EXCEPT: first − (second ∩ third ∩ …), consistent with the ClickHouse parser.
            else -> if (operands.size == 1) {
                operands.first()
            } else {
                "array_except(${operands.first()},${foldPairwise("array_intersect", operands.drop(1))})"
            }
        }
        return "($body)"
    }

    /** Left-folds [operands] with the two-argument Hive function [function]: f(f(a,b),c). */
    private fun foldPairwise(function: String, operands: List<String>): String =
        operands.reduce { acc, next -> "$function($acc,$next)" }

    /** Converts a 1-based [TagIdx.index] into a 0-based Hive element access on `t`. */
    private fun hiveElement(tagIndex: Int): String = "t[${tagIndex - 1}]"

    /** Returns the 1-based position that [tagOptionConditions] assigned to [leaf]. */
    private fun leafTagIndex(exprMap: Map<Int, List<TagIdx>>, leaf: KunLunExpression): Int =
        exprMap[leaf.tfId]?.firstOrNull()?.index
            ?: throw IllegalStateException("no tag condition was parsed for leaf expression ${leaf.tfId}")

    /**
     * Builds the element lines of `array(...)`, one parenthesised sub-query per line; the last line has no comma.
     * (select collect_set(UserID) from db.hits_v1 where Sex = 1),
     * (select collect_set(UserID) from db.hits_v1 where Age > 18),
     * (select collect_set(UserID) from db.hits_v1 where RequestNum > 0)
     */
    fun arrayLines(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val size = tagIdxs.size
        val arrayLines = StringBuffer()

        tagIdxs.forEachIndexed { index, tagIdx ->
            if (index != size - 1) {
                arrayLines.append("(${tagIdx.conditionExpr}), \n")
            } else {
                arrayLines.append("(${tagIdx.conditionExpr}) \n")
            }
        }
        return arrayLines.toString()
    }

    /**
     * Count template, e.g. (0-based element access):
     * select size(t.res) as cnt
     * from (
     * select array_intersect(t[2], array_intersect(t[0], t[1])) as res,
     * array(
     * (select collect_set(UserID) from hits_v1 where Sex = 1),
     * (select collect_set(UserID) from hits_v1 where Age > 18),
     * (select collect_set(UserID) from hits_v1 where RequestNum > 0)
     * ) t
     * ) t
     */
    private fun WIDE_TABLE_COUNT_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
    ) = """
select size(t.res) as cnt
from (
select $expr as res,
array(
$arrayLines
) t
) t
"""

    /**
     * Selection template: one row per id in the result array.
     * NOTE(review): [csvFile] is accepted for symmetry with the ClickHouse parser but not used here.
     */
    private fun WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
        csvFile: String,
    ) = """
select explode(t.res) as ids
from (
select $expr as res,
array(
$arrayLines
) t
) t
"""

}


/**
Manual smoke test for WideTableMultiDimHiveSQLParser: builds the expression
EXCEPT(f1, f2, AND(f4, f5, f6)) over tables t1, t2, t3, then prints the generated `res`
expression and the `array(...)` element lines.

e3 carries both a fieldCondition (f3) and sub-expressions. Its condition still gets an array
slot, but because e3 is not a leaf, that slot is never referenced by the generated expression.

The dimension conditions in dimList all use table code "t1", so cate_id/shop_id are resolved
from t1's mapping even though the t2/t3 mappings below do not declare those columns.

NOTE(review): if this main lives in the same file as the ClickHouse parser's main, the two
top-level `main` functions conflict; presumably they come from separate source files.
NOTE(review): the sample output below was captured from an earlier revision. The log prefix reads
"WideTableMultiDimCHSQLParser" because the Hive parser's logger was created for the CH class, and
the current output may differ.

WideTableMultiDimCHSQLParser - tagIdxList=[{"conditionExpr":"select collect_set(user_id) from db1.table1 where ( cate_id = '10001' and shop_id = 798322 ) and ( f1 = '1' )","index":1,"kexprId":-316732738,"tagCode":"t1","tagOptionCode":"f1"},{"conditionExpr":"select collect_set(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f2 = '22' )","index":2,"kexprId":-316653905,"tagCode":"t2","tagOptionCode":"f2"},{"conditionExpr":"select collect_set(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f3 = 333 )","index":3,"kexprId":-315132611,"tagCode":"t2","tagOptionCode":"f3"},{"conditionExpr":"select collect_set(user_id) from db3.table3 where ( 1=1 ) and ( f4 = '4' )","index":4,"kexprId":127438862,"tagCode":"t3","tagOptionCode":"f4"},{"conditionExpr":"select collect_set(user_id) from db3.table3 where ( 1=1 ) and ( f5 = 5 )","index":5,"kexprId":127439854,"tagCode":"t3","tagOptionCode":"f5"},{"conditionExpr":"select collect_set(user_id) from db3.table3 where ( cate_id = '10001' and shop_id = 798322 ) and ( f6 = 6 )","index":6,"kexprId":-316668196,"tagCode":"t3","tagOptionCode":"f6"}]
(array_except(t[1],t[2],(array_intersect(t[4],t[5],t[6]))))
(select collect_set(user_id) from db1.table1 where ( cate_id = '10001' and shop_id = 798322 ) and ( f1 = '1' )),
(select collect_set(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f2 = '22' )),
(select collect_set(user_id) from db2.table2 where ( cate_id = '10001' and shop_id = 798322 ) and ( f3 = 333 )),
(select collect_set(user_id) from db3.table3 where ( 1=1 ) and ( f4 = '4' )),
(select collect_set(user_id) from db3.table3 where ( 1=1 ) and ( f5 = 5 )),
(select collect_set(user_id) from db3.table3 where ( cate_id = '10001' and shop_id = 798322 ) and ( f6 = 6 ))
*/
fun main() {
val requestDTO = SQLQueryReqDTO()
val tableMappingMap: HashMap<String, List<KTableMapping>> = hashMapOf()
// Root expression: EXCEPT over e1, e2, e3.
val expression = KunLunExpression()
expression.logic = LogicOperatorEnum.EXCEPT
val subExpressionList = arrayListOf<KunLunExpression>()
val e1 = KunLunExpression()
val e2 = KunLunExpression()
val e3 = KunLunExpression()

// Shared dimension filters: cate_id = '10001' and shop_id = 798322 (resolved via table t1).
val dimList = listOf(
FieldCondition("", "t1", "cate_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("10001")),
FieldCondition("", "t1", "shop_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("798322"))
)

e1.fieldCondition = FieldCondition("", "t1", "f1", dimList, ArithmeticOperatorEnum.EQUAL, listOf("1"))
e2.fieldCondition = FieldCondition("", "t2", "f2", dimList, ArithmeticOperatorEnum.EQUAL, listOf("22"))
// e3 has its own condition AND children; only its children are used in the expression.
e3.fieldCondition = FieldCondition("", "t2", "f3", dimList, ArithmeticOperatorEnum.EQUAL, listOf("333"))
e3.logic = LogicOperatorEnum.AND

// e3's children: f4 and f5 without dimensions, f6 with the shared dimensions.
val e3SubExpressionList = arrayListOf<KunLunExpression>()
val e31 = KunLunExpression()
val e32 = KunLunExpression()
val e33 = KunLunExpression()
e3SubExpressionList.add(e31)
e3SubExpressionList.add(e32)
e3SubExpressionList.add(e33)
e31.fieldCondition = FieldCondition("", "t3", "f4", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("4"))
e32.fieldCondition = FieldCondition("", "t3", "f5", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("5"))
e33.fieldCondition = FieldCondition("", "t3", "f6", dimList, ArithmeticOperatorEnum.EQUAL, listOf("6"))
e3.subExpression = e3SubExpressionList

subExpressionList.add(e1)
subExpressionList.add(e2)
subExpressionList.add(e3)
expression.subExpression = subExpressionList
requestDTO.expression = expression

// KTableMapping(boolean rowMapping, String tableCode, String kTableCode, String physicDBName, KField targetField, KSource source, List<KFieldMapping> fields)
// KField(String columnCode, String fieldCode, KFieldValueType fieldType, String description)
// KSource(Long tagSrcTaskId, String tagSrcDb, String tagSrcTable, String tagSrcTableJoinField)
// KFieldMapping(KField srcField, KField dstField)
// NOTE(review): the calls below pass six arguments (no rowMapping); presumably a constructor overload exists — confirm.

// t1 -> db1.table1, target column user_id; also declares the dimension columns.
tableMappingMap["t1"] = listOf(KTableMapping(
"t1",
"table1",
"db1",
KField("user_id", "", KFieldValueType.STRING, ""),
KSource(0, "db1", "table1", "user_id"),
listOf(
KFieldMapping(
KField("f1", "", KFieldValueType.STRING, ""), // srcField
KField("f1", "", KFieldValueType.STRING, "") // dstField
),
KFieldMapping(
KField("cate_id", "", KFieldValueType.STRING, ""), // srcField
KField("cate_id", "", KFieldValueType.STRING, "") // dstField
),
KFieldMapping(
KField("shop_id", "", KFieldValueType.LONG, ""), // srcField
KField("shop_id", "", KFieldValueType.LONG, "") // dstField
),
)
))

// t2 -> db2.table2, target column user_id.
tableMappingMap["t2"] = listOf(KTableMapping(
"t2",
"table2",
"db2",
KField("user_id", "", KFieldValueType.STRING, ""),
KSource(0, "db2", "table2", "user_id"),
listOf(
KFieldMapping(
KField("f2", "", KFieldValueType.STRING, ""), // srcField
KField("f2", "", KFieldValueType.STRING, "") // dstField
),
KFieldMapping(
KField("f3", "", KFieldValueType.LONG, ""), // srcField
KField("f3", "", KFieldValueType.LONG, "") // dstField
),
)
))

// t3 -> db3.table3, target column user_id.
tableMappingMap["t3"] = listOf(KTableMapping(
"t3",
"table3",
"db3",
KField("user_id", "", KFieldValueType.STRING, ""),
KSource(0, "db3", "table3", "user_id"),
listOf(
KFieldMapping(
KField("f4", "", KFieldValueType.STRING, ""), // srcField
KField("f4", "", KFieldValueType.STRING, "") // dstField
),
KFieldMapping(
KField("f5", "", KFieldValueType.LONG, ""), // srcField
KField("f5", "", KFieldValueType.LONG, "") // dstField
),
KFieldMapping(
KField("f6", "", KFieldValueType.LONG, ""), // srcField
KField("f6", "", KFieldValueType.LONG, "") // dstField
),
)
))

// Instantiated directly (no Spring): expr/arrayLines do not use the injected commonParseUtils.
val WideTableMultiDimHiveSQLParser = WideTableMultiDimHiveSQLParser()
val expr = WideTableMultiDimHiveSQLParser.expr(requestDTO, tableMappingMap)
val arrayLines = WideTableMultiDimHiveSQLParser.arrayLines(requestDTO, tableMappingMap)

println(expr)
println(arrayLines)
}


举报

相关推荐

0 条评论