diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 67819fe27f3263316adf081e3c3701b3344679ed..c1baf7d7d23043378b6ad25ce4f5b1182076cf67 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -3999,8 +3999,17 @@ static int32_t checkAndSetJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tS return checkAndSetJoinCondInfo(pCmd, pQueryInfo, pExpr->pRight); } + tSqlExpr* pLeft = pExpr->pLeft; + tSqlExpr* pRight = pExpr->pRight; + if(pLeft->tokenId == TK_ARROW){ + pLeft = pLeft->pLeft; + } + if(pRight->tokenId == TK_ARROW){ + pRight = pRight->pLeft; + } + SColumnIndex index = COLUMN_INDEX_INITIALIZER; - if (getColumnIndexByName(&pExpr->pLeft->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) { + if (getColumnIndexByName(&pLeft->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4); } @@ -4034,7 +4043,7 @@ static int32_t checkAndSetJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tS int16_t leftIdx = index.tableIndex; index = (SColumnIndex)COLUMN_INDEX_INITIALIZER; - if (getColumnIndexByName(&pExpr->pRight->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) { + if (getColumnIndexByName(&pRight->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4); } @@ -4293,8 +4302,23 @@ static bool validateJoinExprNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr const char* msg3 = "join column must have same type"; const char* msg4 = "self join is not allowed"; const char* msg5 = "join table must be the same type(table to table, super table to super table)"; + const char* msg6 = "tag json key must be string"; + const char* msg7 = "tag json key in json must be same"; tSqlExpr* pRight = pExpr->pRight; + if(pRight->tokenId == TK_ARROW){ + if(!IS_VAR_DATA_TYPE(pExpr->pLeft->pRight->value.nType) || pExpr->pLeft->pRight->value.nType != pExpr->pRight->pRight->value.nType){ + invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); + return false; + } + if(pExpr->pLeft->pRight->value.nLen != pExpr->pRight->pRight->value.nLen + || strncmp(pExpr->pLeft->pRight->value.pz, pExpr->pRight->pRight->value.pz, pExpr->pRight->pRight->value.nLen) != 0){ + invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7); + return false; + } + + pRight = pExpr->pRight->pLeft; + } if (pRight->tokenId != TK_ID) { return true; @@ -4648,7 +4672,7 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql } } - if (pRight != NULL && pRight->tokenId == TK_ID) { // join on tag columns for stable query + if (pRight != NULL && (pRight->tokenId == TK_ID || pRight->tokenId == TK_ARROW)) { // join on tag columns for stable query if (!validateJoinExprNode(pCmd, pQueryInfo, *pExpr, &index)) { return TSDB_CODE_TSC_INVALID_OPERATION; } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 27b50d36a47aaef5f83694d4dfd01ac25ed2d74d..e8cbc70a16ad1e83ddc9a32f543862d88176fcf2 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -137,6 +137,14 @@ do { \ } \ } while (0) +#define GET_JSON_KEY(exprInfo) \ +char* param = NULL; \ +int32_t paramLen = 0; \ +if(exprInfo->base.numOfParams > 0){ \ + param = exprInfo->base.param[0].pz; \ + paramLen = exprInfo->base.param[0].nLen; \ +} + uint64_t queryHandleId = 0; int32_t getMaximumIdleDurationSec() { @@ -3047,7 +3055,7 @@ void filterColRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId); -static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes); +static void doSetTagValueInParam(void* pTable, char* param, int32_t paraLen, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes); static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) { SQLFunctionCtx* pCtx = pTableScanInfo->pCtx; @@ -3129,7 +3137,8 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa // compare tag first tVariant t = {0}; - doSetTagValueInParam(pRuntimeEnv->current->pTable, tagId, &t, pColInfo->type, pColInfo->bytes); + GET_JSON_KEY(pExprInfo) + doSetTagValueInParam(pRuntimeEnv->current->pTable, param, paramLen, tagId, &t, pColInfo->type, pColInfo->bytes); setTimestampListJoinInfo(pRuntimeEnv, &t, pRuntimeEnv->current); STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf); @@ -3336,7 +3345,7 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { * set tag value in SQLFunctionCtx * e.g.,tag information into input buffer */ -static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes) { +static void doSetTagValueInParam(void* pTable, char* param, int32_t paramLen, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes) { tVariantDestroy(tag); char* val = NULL; @@ -3359,10 +3368,13 @@ static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag, //tVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), type); } else if(type == TSDB_DATA_TYPE_JSON){ char jsonVal[TSDB_MAX_JSON_TAGS_LEN] = {0}; - getJsonTagValueAll(val, jsonVal, TSDB_MAX_JSON_TAGS_LEN); + if(param && paramLen > 0){ + getJsonTagValueElment(pTable, param, paramLen, jsonVal, bytes); + }else{ + getJsonTagValueAll(val, jsonVal, TSDB_MAX_JSON_TAGS_LEN); + } tVariantCreateFromBinary(tag, jsonVal, varDataTLen(jsonVal), type); - } - else { + } else { tVariantCreateFromBinary(tag, val, bytes, type); } } @@ -3391,8 +3403,8 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCt int16_t tagColId = (int16_t)pExprInfo->base.param[0].i64; SColumnInfo* pColInfo = doGetTagColumnInfoById(pQueryAttr->tagColList, pQueryAttr->numOfTags, tagColId); - - doSetTagValueInParam(pTable, tagColId, &pCtx[0].tag, pColInfo->type, pColInfo->bytes); + GET_JSON_KEY(pExprInfo) + doSetTagValueInParam(pTable, param, paramLen, tagColId, &pCtx[0].tag, pColInfo->type, pColInfo->bytes); return; } else { // set tag value, by which the results are aggregated. @@ -3408,7 +3420,8 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCt } // todo use tag column index to optimize performance - doSetTagValueInParam(pTable, pLocalExprInfo->base.colInfo.colId, &pCtx[idx].tag, pLocalExprInfo->base.resType, + GET_JSON_KEY(pLocalExprInfo) + doSetTagValueInParam(pTable, param, paramLen,pLocalExprInfo->base.colInfo.colId, &pCtx[idx].tag, pLocalExprInfo->base.resType, pLocalExprInfo->base.resBytes); if (IS_NUMERIC_TYPE(pLocalExprInfo->base.resType) @@ -4011,7 +4024,8 @@ void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExpr int16_t tagColId = (int16_t)pExprInfo->base.param[0].i64; SColumnInfo* pColInfo = doGetTagColumnInfoById(pQueryAttr->tagColList, pQueryAttr->numOfTags, tagColId); - doSetTagValueInParam(pTable, tagColId, &pCtx->tag, pColInfo->type, pColInfo->bytes); + GET_JSON_KEY(pExprInfo) + doSetTagValueInParam(pTable, param, paramLen, tagColId, &pCtx->tag, pColInfo->type, pColInfo->bytes); int16_t tagType = pCtx[0].tag.nType; if (tagType == TSDB_DATA_TYPE_BINARY || tagType == TSDB_DATA_TYPE_NCHAR) {