From 9277029da7607d5c318afd925210e6254e0a1524 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 18 May 2021 10:42:18 +0800 Subject: [PATCH] [td-3186] --- src/client/inc/tscUtil.h | 2 +- src/client/inc/tsclient.h | 2 +- src/client/src/tscSQLParser.c | 4 +- src/client/src/tscServer.c | 2 +- src/client/src/tscSubquery.c | 47 +++++++----- src/client/src/tscUtil.c | 136 +++++++++++++++++++++++++++++----- src/query/inc/qExecutor.h | 2 +- src/query/src/qExecutor.c | 30 +++++--- 8 files changed, 172 insertions(+), 53 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index de69e23b9b..ca1ea54e16 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -327,7 +327,7 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta); int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr); void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema); -void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage); +void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage); void* malloc_throw(size_t size); void* calloc_throw(size_t nmemb, size_t size); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 87da9c8b00..875b485cf2 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -454,7 +454,7 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo); void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock); -void handleDownstreamOperator(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSqlRes* pOutput); +void handleDownstreamOperator(SSqlRes** pRes, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput); void destroyTableNameList(SSqlCmd* pCmd); void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index c179ed1ec6..365ff4147a 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -7192,7 +7192,9 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { STableMeta* pMeta = tscTableMetaDup(pTableMeta); STableMetaVgroupInfo p = {.pTableMeta = pMeta,}; - taosHashPut(pCmd->pTableMetaMap, name, strlen(name), &p, sizeof(STableMetaVgroupInfo)); + + const char* px = tNameGetTableName(pname); + taosHashPut(pCmd->pTableMetaMap, px, strlen(px), &p, sizeof(STableMetaVgroupInfo)); } else {// add to the retrieve table meta array list. char* t = strdup(name); taosArrayPush(plist, &t); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 08085d7216..f35be615a4 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1637,7 +1637,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { list[i] = *pExprInfo; } - pQueryInfo->pQInfo = createQueryInfoFromQueryNode(pQueryInfo, list, &tableGroupInfo, NULL, NULL, pRes->pLocalMerger, MERGE_STAGE); + pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, list, &tableGroupInfo, NULL, NULL, pRes->pLocalMerger, MERGE_STAGE); } uint64_t localQueryId = 0; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 90bc5c985f..406214aba0 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3483,10 +3483,11 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { return hasData; } -void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, - SOperatorInfo* pSourceOperator, char* sql, void* merger, int32_t stage) { +// todo remove pExprs +void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, + SOperatorInfo* pSourceOperator, char* sql, void* merger, int32_t stage) { assert(pQueryInfo != NULL); - int16_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput; +// int16_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput; SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); if (pQInfo == NULL) { @@ -3505,13 +3506,25 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST pQueryAttr->tableGroupInfo = *pTableGroupInfo; // calculate the result row size - for (int16_t col = 0; col < numOfOutput; ++col) { - assert(pExprs[col].base.resBytes > 0); - pQueryAttr->resultRowSize += pExprs[col].base.resBytes; + SExprInfo* pei = NULL; + int32_t num = 0; + if (pQueryAttr->pExpr3 != NULL) { + pei = pQueryAttr->pExpr3; + num = pQueryAttr->numOfExpr3; + } else if (pQueryAttr->pExpr2 != NULL) { + pei = pQueryAttr->pExpr2; + num = pQueryAttr->numOfExpr2; + } else { + pei = pQueryAttr->pExpr1; + num = pQueryAttr->numOfOutput; + } + + for (int16_t col = 0; col < num; ++col) { + pQueryAttr->resultRowSize += pei[col].base.resBytes; // keep the tag length - if (TSDB_COL_IS_TAG(pExprs[col].base.colInfo.flag)) { - pQueryAttr->tagLen += pExprs[col].base.resBytes; + if (TSDB_COL_IS_TAG(pei[col].base.colInfo.flag)) { + pQueryAttr->tagLen += pei[col].base.resBytes; } } @@ -3569,15 +3582,15 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST } } - for (int32_t i = 0; i < numOfOutput; ++i) { - SExprInfo* pExprInfo = &pExprs[i]; - if (pExprInfo->pExpr != NULL) { - tExprTreeDestroy(pExprInfo->pExpr, NULL); - pExprInfo->pExpr = NULL; - } - } - - tfree(pExprs); +// for (int32_t i = 0; i < numOfOutput; ++i) { +// SExprInfo* pExprInfo = &pExprs[i]; +// if (pExprInfo->pExpr != NULL) { +// tExprTreeDestroy(pExprInfo->pExpr, NULL); +// pExprInfo->pExpr = NULL; +// } +// } +// +// tfree(pExprs); createFilterInfo(pQueryAttr, 0); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index a35d44bba7..b1a7a34011 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -651,8 +651,16 @@ typedef struct SDummyInputInfo { SSqlRes *pRes; // refactor: remove it } SDummyInputInfo; +typedef struct SJoinStatus { + SSDataBlock* pBlock; // point to the upstream block + int32_t index; + bool completed;// current upstream is completed or not +} SJoinStatus; + typedef struct SJoinOperatorInfo { - int32_t a; + SSDataBlock *pRes; + SJoinStatus *status; + int32_t numOfUpstream; } SJoinOperatorInfo; SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { @@ -685,6 +693,64 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { return pBlock; } +SSDataBlock* doBlockJoin(void* param, bool* newgroup) { + SOperatorInfo *pOperator = (SOperatorInfo*) param; + assert(pOperator->numOfUpstream > 1); + + SSDataBlock* block0 = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); + SSDataBlock* block1 = pOperator->upstream[1]->exec(pOperator->upstream[1], newgroup); + + if (block1 == NULL || block0 == NULL) { + return NULL; + } + + assert(block0 != block1); + + SJoinOperatorInfo* pJoinInfo = pOperator->info; + pJoinInfo->status[0].pBlock = block0; + pJoinInfo->status[1].pBlock = block1; + + SJoinStatus* st0 = &pJoinInfo->status[0]; + SJoinStatus* st1 = &pJoinInfo->status[1]; + + while (st0->index < st0->pBlock->info.rows && st1->index < st1->pBlock->info.rows) { + SColumnInfoData* p0 = taosArrayGet(st0->pBlock->pDataBlock, 0); + SColumnInfoData* p1 = taosArrayGet(st1->pBlock->pDataBlock, 0); + + int64_t* ts0 = (int64_t*) p0->pData; + int64_t* ts1 = (int64_t*) p1->pData; + if (ts0[st0->index] == ts1[st1->index]) { // add to the final result buffer + // check if current output buffer is over the threshold to pause current loop + int32_t rows = pJoinInfo->pRes->info.rows; + for(int32_t j = 0; j < st0->pBlock->info.numOfCols; ++j) { + SColumnInfoData* pCol1 = taosArrayGet(pJoinInfo->pRes->pDataBlock, j); + SColumnInfoData* pSrc = taosArrayGet(st0->pBlock->pDataBlock, j); + + int32_t bytes = pSrc->info.bytes; + memcpy(pCol1->pData + rows * bytes, pSrc->pData + st0->index * bytes, bytes); + } + + for(int32_t j = 0; j < st1->pBlock->info.numOfCols; ++j) { + SColumnInfoData* pCol1 = taosArrayGet(pJoinInfo->pRes->pDataBlock, j + st0->pBlock->info.numOfCols); + SColumnInfoData* pSrc = taosArrayGet(st1->pBlock->pDataBlock, j); + + int32_t bytes = pSrc->info.bytes; + memcpy(pCol1->pData + rows * bytes, pSrc->pData + st1->index * bytes, bytes); + } + + st0->index++; + st1->index++; + pJoinInfo->pRes->info.rows++; + } else if (ts0[st0->index] < ts1[st1->index]) { + st0->index++; + } else { + st1->index++; + } + } + + return pJoinInfo->pRes; +} + static void destroyDummyInputOperator(void* param, int32_t numOfOutput) { SDummyInputInfo* pInfo = (SDummyInputInfo*) param; @@ -728,34 +794,36 @@ SOperatorInfo* createDummyInputOperator(char* pResult, SSchema* pSchema, int32_t return pOptr; } -SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SExprInfo* pExprInfo, int32_t numOfOutput) { - SJoinInfo* pInfo = calloc(1, sizeof(SJoinInfo)); -/* - pInfo->pRes = (SSqlRes*) pResult; - pInfo->block = calloc(numOfCols, sizeof(SSDataBlock)); - pInfo->block->info.numOfCols = numOfCols; +SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput) { + SJoinOperatorInfo* pInfo = calloc(1, sizeof(SJoinOperatorInfo)); + pInfo->numOfUpstream = numOfUpstream; + pInfo->status = calloc(numOfUpstream, sizeof(SJoinStatus)); - pInfo->block->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); - for(int32_t i = 0; i < numOfCols; ++i) { + pInfo->pRes = calloc(1, sizeof(SSDataBlock)); + pInfo->pRes->info.numOfCols = numOfOutput; + + pInfo->pRes->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); + for(int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData colData = {{0}}; colData.info.bytes = pSchema[i].bytes; colData.info.type = pSchema[i].type; colData.info.colId = pSchema[i].colId; + colData.pData = calloc(1, colData.info.bytes * 4096); - taosArrayPush(pInfo->block->pDataBlock, &colData); + taosArrayPush(pInfo->pRes->pDataBlock, &colData); } -*/ + SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); pOptr->name = "JoinOperator"; pOptr->operatorType = OP_Join; pOptr->numOfOutput = numOfOutput; pOptr->blockingOptr = false; pOptr->info = pInfo; - pOptr->exec = doGetDataBlock; + pOptr->exec = doBlockJoin; pOptr->cleanup = destroyDummyInputOperator; for(int32_t i = 0; i < numOfUpstream; ++i) { - appendUpstream(pOptr, pUpstream[0]); + appendUpstream(pOptr, pUpstream[i]); } return pOptr; @@ -775,7 +843,7 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) { pRes->completed = (pRes->numOfRows == 0); } -void handleDownstreamOperator(SSqlRes* pRes, SQueryInfo* px, SSqlRes* pOutput) { +void handleDownstreamOperator(SSqlRes** pRes, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput) { // handle the following query process if (px->pQInfo == NULL) { SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList); @@ -805,15 +873,44 @@ void handleDownstreamOperator(SSqlRes* pRes, SQueryInfo* px, SSqlRes* pOutput) { taosArrayPush(tableGroupInfo.pGroupList, &group); // if it is a join query, create join operator here - SOperatorInfo* pSourceOperator = createDummyInputOperator((char*)pRes, pSchema, numOfCols); + int32_t numOfCol1 = px->pTableMetaInfo[0]->pTableMeta->tableInfo.numOfColumns; + + SOperatorInfo* pSourceOperator = createDummyInputOperator((char*)pRes[0], pSchema, numOfCol1); + + SSchema* schema = NULL; if (px->numOfTables > 1) { + SOperatorInfo* p[2] = {0}; + p[0] = pSourceOperator; + + SSchema* pSchema1 = tscGetTableSchema(px->pTableMetaInfo[1]->pTableMeta); + numOfCol1 = px->pTableMetaInfo[1]->pTableMeta->tableInfo.numOfColumns; + + SOperatorInfo* pSourceOperator1 = createDummyInputOperator((char*)pRes[1], pSchema1, numOfCol1); + p[1] = pSourceOperator1; + + int32_t num = pSourceOperator->numOfOutput + pSourceOperator1->numOfOutput; + schema = calloc(num, sizeof(SSchema)); + + memcpy(&schema[0], pSchema, pSourceOperator->numOfOutput * sizeof(SSchema)); - pSourceOperator = createJoinOperator(&pSourceOperator, 1, NULL, pSourceOperator->numOfOutput); + memcpy(&schema[pSourceOperator->numOfOutput], pSchema1, pSourceOperator1->numOfOutput * sizeof(SSchema)); + pSourceOperator = createJoinOperator(p, px->numOfTables, schema, num); } SExprInfo* exprInfo = NULL; /*int32_t code = */ createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL); - px->pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN); + for(int32_t i = 0; i < numOfOutput; ++i) { + SExprInfo* pex = taosArrayGetP(px->exprList, i); + int32_t colId = pex->base.colInfo.colId; + for(int32_t j = 0; j < pSourceOperator->numOfOutput; ++j) { + if (colId == schema[j].colId) { + pex->base.colInfo.colIndex = j; + break; + } + } + } + + px->pQInfo = createQInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN); tfree(pColumnInfo); } @@ -3011,11 +3108,10 @@ static void doRetrieveSubqueryData(SSchedMsg *pMsg) { if (numOfRows > 0) { SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd); - SSqlObj *pSub = pSql->pSubs[0]; - handleDownstreamOperator(&pSub->res, pQueryInfo, &pSql->res); + SSqlRes* list[2] = {&pSql->pSubs[0]->res, &pSql->pSubs[1]->res}; + handleDownstreamOperator(list, 2, pQueryInfo, &pSql->res); } -// int32_t code = pSql->res.code; pSql->res.qId = -1; if (pSql->res.code == TSDB_CODE_SUCCESS) { (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows); diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 8e22b4e208..a39819cb51 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -521,7 +521,7 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); -SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SExprInfo* pExprInfo, int32_t numOfOutput); +SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput); SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 5035e9cbd7..6c92293bfd 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1750,7 +1750,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf SOperatorInfo* prev = pRuntimeEnv->proot; if (i == 0) { pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - if (pRuntimeEnv->proot != NULL && prev->operatorType != OP_DummyInput) { // TODO refactor + if (pRuntimeEnv->proot != NULL && prev->operatorType != OP_DummyInput && prev->operatorType != OP_Join) { // TODO refactor setTableScanFilterOperatorInfo(prev->info, pRuntimeEnv->proot); } } else { @@ -3929,11 +3929,11 @@ void queryCostStatis(SQInfo *pQInfo) { void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream) { if (p->upstream == NULL) { - assert(p->numOfOutput == 0); + assert(p->numOfUpstream == 0); } - p->upstream = realloc(p->upstream, POINTER_BYTES * (p->numOfOutput + 1)); - p->upstream[p->numOfOutput++] = pUpstream; + p->upstream = realloc(p->upstream, POINTER_BYTES * (p->numOfUpstream + 1)); + p->upstream[p->numOfUpstream++] = pUpstream; } static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo); @@ -4824,7 +4824,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) { bool prevVal = *newgroup; // The upstream exec may change the value of the newgroup, so use a local variable instead. - SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup); + SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); if (pBlock == NULL) { assert(*newgroup == false); @@ -4878,7 +4878,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { SSDataBlock* pBlock = NULL; while (1) { - pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup); + pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); if (pBlock == NULL) { setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; @@ -4949,7 +4949,7 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) { SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; while (1) { - SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup); + SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); if (pBlock == NULL) { break; } @@ -5219,7 +5219,7 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { } while(1) { - SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup); + SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); if (*newgroup) { assert(pBlock != NULL); } @@ -5295,7 +5295,15 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { pOperator->cleanup(pOperator->info, pOperator->numOfOutput); } - destroyOperatorInfo(pOperator->upstream[0]); + if (pOperator->upstream != NULL) { + for(int32_t i = 0; i < pOperator->numOfUpstream; ++i) { + destroyOperatorInfo(pOperator->upstream[i]); + } + + tfree(pOperator->upstream); + pOperator->numOfUpstream = 0; + } + tfree(pOperator->info); tfree(pOperator); } @@ -5844,7 +5852,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { pRes->info.rows = 0; SSDataBlock* pBlock = NULL; while(1) { - pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup); + pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); if (pBlock == NULL) { setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; @@ -6458,7 +6466,7 @@ static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SCol return TSDB_CODE_SUCCESS; } -// TODO tag length should be passed from client +// TODO tag length should be passed from client, refactor int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo, SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg) { *pExprInfo = NULL; -- GitLab