From 006d59bfd79e1a82f0ae70fbc710a5335b0fcdce Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 29 Apr 2021 15:05:54 +0800 Subject: [PATCH] [td-3299] --- src/client/inc/tscUtil.h | 1 + src/client/inc/tsclient.h | 6 ++ src/client/src/tscLocalMerge.c | 36 ++++---- src/client/src/tscSubquery.c | 18 +--- src/client/src/tscUtil.c | 3 +- src/query/src/qPlan.c | 145 ++++++++++++++++++++++++++++----- 6 files changed, 152 insertions(+), 57 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 9220754330..68f55c9912 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -200,6 +200,7 @@ SColumn* tscColumnClone(const SColumn* src); bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid); SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema); void tscColumnListDestroy(SArray* pColList); +void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid); void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index ec3b0c4421..749a896bfd 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -238,6 +238,12 @@ typedef struct SQueryInfo { SArray *pUpstream; // SArray struct SQueryInfo *pDownstream; int32_t havingFieldNum; + bool stableQuery; + bool groupbyColumn; + bool simpleAgg; + bool arithmeticOnAgg; + bool projectionQuery; + bool hasFilter; } SQueryInfo; typedef struct { diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index f1e0196d68..f192d6d1f0 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -14,7 +14,7 @@ */ #include "tscLocalMerge.h" -//#include "tscSubquery.h" +#include "tscSubquery.h" #include "os.h" #include "texpr.h" #include "tlosertree.h" @@ -96,9 +96,7 @@ int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tO return TSDB_CODE_TSC_APP_ERROR; } - size_t size = sizeof(SLocalMerger) + POINTER_BYTES * numOfFlush; - - *pMerger = (SLocalMerger *) calloc(1, size); + *pMerger = (SLocalMerger *) calloc(1, sizeof(SLocalMerger)); if ((*pMerger) == NULL) { tscError("0x%"PRIx64" failed to create local merge structure, out of memory", id); @@ -107,7 +105,7 @@ int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tO } (*pMerger)->pExtMemBuffer = pMemBuffer; - (*pMerger)->pLocalDataSrc = (SLocalDataSource **)&pMerger[1]; + (*pMerger)->pLocalDataSrc = calloc(numOfFlush, POINTER_BYTES); assert((*pMerger)->pLocalDataSrc != NULL); (*pMerger)->numOfBuffer = numOfFlush; @@ -304,19 +302,22 @@ void tscDestroyLocalMerger(SLocalMerger* pLocalMerger) { return; } - if (pLocalMerger->pLoserTree) { - tfree(pLocalMerger->pLoserTree->param); - tfree(pLocalMerger->pLoserTree); - } - - tscLocalReducerEnvDestroy(pLocalMerger->pExtMemBuffer, pLocalMerger->pDesc, pLocalMerger->numOfVnode); for (int32_t i = 0; i < pLocalMerger->numOfBuffer; ++i) { tfree(pLocalMerger->pLocalDataSrc[i]); } pLocalMerger->numOfBuffer = 0; + tscLocalReducerEnvDestroy(pLocalMerger->pExtMemBuffer, pLocalMerger->pDesc, pLocalMerger->numOfVnode); + pLocalMerger->numOfCompleted = 0; + + if (pLocalMerger->pLoserTree) { + tfree(pLocalMerger->pLoserTree->param); + tfree(pLocalMerger->pLoserTree); + } + tfree(pLocalMerger->buf); + tfree(pLocalMerger->pLocalDataSrc); free(pLocalMerger); } @@ -786,11 +787,10 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) { SColIndex * pIndex = taosArrayGet(pInfo->orderColumnList, i); SColumnInfoData *pColInfo = taosArrayGet(pInfo->binfo.pRes->pDataBlock, pIndex->colIndex); - char *newRow = - COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pOneDataSrc->pMemBuffer->pColumnModel, - pOneDataSrc->rowIdx, pIndex->colIndex); + char *newRow = COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pOneDataSrc->pMemBuffer->pColumnModel, + pOneDataSrc->rowIdx, pIndex->colIndex); - char * data = pInfo->prevRow[i]; + char *data = pInfo->prevRow[i]; int32_t ret = columnValueAscendingComparator(data, newRow, pColInfo->info.type, pColInfo->info.bytes); if (ret == 0) { continue; @@ -809,9 +809,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) { SColIndex * pIndex = taosArrayGet(pInfo->orderColumnList, i); SColumnInfoData *pColInfo = taosArrayGet(pInfo->binfo.pRes->pDataBlock, pIndex->colIndex); - char *curCol = - COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pOneDataSrc->pMemBuffer->pColumnModel, - pOneDataSrc->rowIdx, pIndex->colIndex); + char *curCol = COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pOneDataSrc->pMemBuffer->pColumnModel, + pOneDataSrc->rowIdx, pIndex->colIndex); memcpy(pInfo->prevRow[i], curCol, pColInfo->info.bytes); } @@ -956,7 +955,6 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pRes->info.rows > 0) { STimeWindow* w = &pRes->info.window; - // TODO in case of desc order, swap it w->skey = *(int64_t*)pInfoData->pData; w->ekey = *(int64_t*)(((char*)pInfoData->pData) + TSDB_KEYSIZE * (pRes->info.rows - 1)); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 5e78901d6f..8e5d26337d 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -65,15 +65,11 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) { } static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) { - assert(idx < subState->numOfSub); - assert(subState->states); + assert(idx < subState->numOfSub && subState->states != NULL); + tscDebug("subquery:0x%"PRIx64",%d state set to %d", pSql->self, idx, state); pthread_mutex_lock(&subState->mutex); - - tscDebug("subquery:0x%"PRIx64",%d state set to %d", pSql->self, idx, state); - subState->states[idx] = state; - pthread_mutex_unlock(&subState->mutex); } @@ -110,13 +106,9 @@ static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { pthread_mutex_lock(&subState->mutex); bool done = allSubqueryDone(pParentSql); - if (done) { - tscDebug("0x%"PRIx64" subquery:0x%"PRIx64",%d all subs already done", pParentSql->self, - pSql->self, idx); - + tscDebug("0x%"PRIx64" subquery:0x%"PRIx64",%d all subs already done", pParentSql->self, pSql->self, idx); pthread_mutex_unlock(&subState->mutex); - return false; } @@ -125,9 +117,7 @@ static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { subState->states[idx] = 1; done = allSubqueryDone(pParentSql); - pthread_mutex_unlock(&subState->mutex); - return done; } @@ -701,8 +691,6 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { } tfree(pSql->subState.states); - - pSql->subState.numOfSub = 0; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 95acce3d77..7e30e92b9c 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -829,7 +829,9 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) { void tscFreeSqlResult(SSqlObj* pSql) { SSqlRes* pRes = &pSql->res; + tscDestroyLocalMerger(pRes->pLocalMerger); + pRes->pLocalMerger = NULL; tscDestroyResPointerInfo(pRes); memset(&pSql->res, 0, sizeof(SSqlRes)); @@ -853,7 +855,6 @@ void tscFreeSubobj(SSqlObj* pSql) { } tfree(pSql->subState.states); - pSql->subState.numOfSub = 0; } diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index 0554a887ec..89a3e07b10 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -1,37 +1,138 @@ +#include #include "os.h" -#include "tsclient.h" #include "qUtil.h" #include "texpr.h" +#include "tsclient.h" -#define QNODE_PROJECT 1 -#define QNODE_FILTER 2 -#define QNODE_RELATION 3 -#define QNODE_AGGREGATE 4 -#define QNODE_GROUPBY 5 -#define QNODE_LIMIT 6 -#define QNODE_JOIN 7 -#define QNODE_DIST 8 -#define QNODE_SORT 9 -#define QNODE_UNIONALL 10 -#define QNODE_TIMEWINDOW 11 +#define QNODE_PROJECT 1 +#define QNODE_FILTER 2 +#define QNODE_TABLESCAN 3 +#define QNODE_AGGREGATE 4 +#define QNODE_GROUPBY 5 +#define QNODE_LIMIT 6 +#define QNODE_JOIN 7 +#define QNODE_DISTINCT 8 +#define QNODE_SORT 9 +#define QNODE_UNIONALL 10 +#define QNODE_TIMEWINDOW 11 +#define QNODE_SESSIONWINDOW 12 +#define QNODE_FILL 13 + +typedef struct SQueryNodeBasicInfo { + int32_t type; + char *name; +} SQueryNodeBasicInfo; typedef struct SQueryNode { - int32_t type; // the type of logic node - char *name; // the name of logic node + SQueryNodeBasicInfo info; +// char *name; // the name of logic node +// int32_t type; // the type of logic node - SSchema *pSchema; // the schema of the input SSDatablock - int32_t numOfCols; // number of input columns - SExprInfo *pExpr; // the query functions or sql aggregations - int32_t numOfOutput; // number of result columns, which is also the number of pExprs + SSchema *pSchema; // the schema of the input SSDatablock + int32_t numOfCols; // number of input columns + SExprInfo *pExpr; // the query functions or sql aggregations + int32_t numOfOutput; // number of result columns, which is also the number of pExprs // previous operator to generated result for current node to process // in case of join, multiple prev nodes exist. - struct SQueryNode* prevNode; - struct SQueryNode* nextNode; + SArray *pPrevNodes;// upstream nodes + struct SQueryNode *nextNode; } SQueryNode; -// TODO create the query plan -SQueryNode* qCreateQueryPlan(SQueryInfo* pQueryInfo) { +static SQueryNode* createQueryNode(int32_t type, const char* name, SQueryNode** prev, int32_t numOfPrev) { + SQueryNode* pNode = calloc(1, sizeof(SQueryNode)); + pNode->info.type = type; + pNode->info.name = strdup(name); + pNode->pPrevNodes = taosArrayInit(4, POINTER_BYTES); + for(int32_t i = 0; i < numOfPrev; ++i) { + taosArrayPush(pNode->pPrevNodes, &prev[i]); + } + + return pNode; +} + +static SQueryNode* doCreateQueryPlanForOneTable(SQueryInfo* pQueryInfo) { + SQueryNode* pNode = createQueryNode(QNODE_TABLESCAN, "", NULL, 0); + + // check for filter + if (pQueryInfo->hasFilter) { + pNode = createQueryNode(QNODE_FILTER, "", &pNode, 1); + } + + if (pQueryInfo->distinctTag) { + pNode = createQueryNode(QNODE_DISTINCT, "", &pNode, 0); + + } else if (pQueryInfo->projectionQuery) { + pNode = createQueryNode(QNODE_PROJECT, "", &pNode, 1); + } else { // check for aggregation + if (pQueryInfo->interval.interval > 0) { + pNode = createQueryNode(QNODE_TIMEWINDOW, "", &pNode, 1); + } else if (pQueryInfo->groupbyColumn) { + pNode = createQueryNode(QNODE_GROUPBY, "", &pNode, 1); + } else if (pQueryInfo->sessionWindow.gap > 0) { + pNode = createQueryNode(QNODE_SESSIONWINDOW, "", &pNode, 1); + } else if (pQueryInfo->simpleAgg) { + pNode = createQueryNode(QNODE_AGGREGATE, "", &pNode, 1); + } + + if (pQueryInfo->havingFieldNum > 0) { + pNode = createQueryNode(QNODE_FILTER, "", &pNode, 1); + } + + if (pQueryInfo->arithmeticOnAgg) { + pNode = createQueryNode(QNODE_PROJECT, "", &pNode, 1); + } + + if (pQueryInfo->fillType != TSDB_FILL_NONE) { + pNode = createQueryNode(QNODE_FILL, "", &pNode, 1); + } + } + + if (pQueryInfo->limit.limit != -1 || pQueryInfo->limit.offset != 0) { + pNode = createQueryNode(QNODE_LIMIT, "", &pNode, 1); + } + + return pNode; +} + +SArray* qCreateQueryPlan(SQueryInfo* pQueryInfo) { + // join and subquery + SArray* upstream = NULL; + if (pQueryInfo->pUpstream != NULL) { // subquery in the from clause + upstream = taosArrayInit(4, POINTER_BYTES); + + size_t size = taosArrayGetSize(pQueryInfo->pUpstream); + for(int32_t i = 0; i < size; ++i) { + SQueryInfo* pq = taosArrayGet(pQueryInfo->pUpstream, i); + SArray* p = qCreateQueryPlan(pq); + taosArrayPushBatch(upstream, p->pData, (int32_t) taosArrayGetSize(p)); + } + } + + if (pQueryInfo->numOfTables > 1) { // it is a join query + // 1. separate the select clause according to table + int32_t tableIndex = 0; + STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[tableIndex]; + uint64_t uid = pTableMetaInfo->pTableMeta->id.uid; + + SArray* exprList = taosArrayInit(4, POINTER_BYTES); + if (tscSqlExprCopy(exprList, pQueryInfo->exprList, uid, true) != 0) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + exit(-1); + } + + SArray* tableColumnList = taosArrayInit(4, sizeof(SColumn)); + tscColumnListCopy(tableColumnList, pQueryInfo->colList, uid); + + + // 2. + SQueryNode* pNode = doCreateQueryPlanForOneTable(pQueryInfo); + UNUSED(pNode); + } else { // only one table, normal query process + SQueryNode* pNode = doCreateQueryPlanForOneTable(pQueryInfo); + UNUSED(pNode); + } + return NULL; } -- GitLab