diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index 07e05803979c93f00312e17da0fbd6822b2a522b..7876069ccee841485fae9069d1116c13f385ce7b 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -28,7 +28,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql); void tscSetupOutputColumnIndex(SSqlObj* pSql); void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code); -SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index); +SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index); void tscHandleMasterJoinQuery(SSqlObj* pSql); diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 594226b1fc2925cc8f36faed58125e1be715d9bd..76a9bbac10655e6487614c6b9de230cabf19e0c5 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -109,7 +109,6 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint void* tscDestroyBlockArrayList(SArray* pDataBlockList); int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); -void tscFreeUnusedDataBlocks(SArray* pDataBlockList); int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pDataList); int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta, diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index d2f74bdd5923ce0d3aec9afab68bcf13a092ccb8..d800d62c8ad1da8c2a485411021770d6b3a368eb 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -554,27 +554,48 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm numOfGroupByCols++; } - int32_t *orderIdx = (int32_t *)calloc(numOfGroupByCols, sizeof(int32_t)); - if (orderIdx == NULL) { + int32_t *orderColIndexList = (int32_t *)calloc(numOfGroupByCols, sizeof(int32_t)); + if (orderColIndexList == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } if (numOfGroupByCols > 0) { - int32_t startCols = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols; - // tags value locate at the last columns - for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) { - orderIdx[i] = startCols++; - } + if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) { + int32_t startCols = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols; + + // the last "pQueryInfo->groupbyExpr.numOfGroupCols" columns are order-by columns + for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) { + orderColIndexList[i] = startCols++; + } + + if (pQueryInfo->interval.interval != 0) { + // the first column is the timestamp, handles queries like "interval(10m) group by tags" + orderColIndexList[numOfGroupByCols - 1] = PRIMARYKEY_TIMESTAMP_COL_INDEX; //TODO ??? + } + } else { + /* + * 1. the orderby ts asc/desc projection query for the super table + * 2. interval query without groupby clause + */ + if (pQueryInfo->interval.interval != 0) { + orderColIndexList[0] = PRIMARYKEY_TIMESTAMP_COL_INDEX; + } else { + size_t size = tscSqlExprNumOfExprs(pQueryInfo); + for (int32_t i = 0; i < size; ++i) { + SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); + if (pExpr->functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + orderColIndexList[0] = i; + } + } + } - if (pQueryInfo->interval.interval != 0) { - // the first column is the timestamp, handles queries like "interval(10m) group by tags" - orderIdx[numOfGroupByCols - 1] = PRIMARYKEY_TIMESTAMP_COL_INDEX; + assert(pQueryInfo->order.orderColId == PRIMARYKEY_TIMESTAMP_COL_INDEX); } } - *pOrderDesc = tOrderDesCreate(orderIdx, numOfGroupByCols, pModel, pQueryInfo->order.order); - taosTFree(orderIdx); + *pOrderDesc = tOrderDesCreate(orderColIndexList, numOfGroupByCols, pModel, pQueryInfo->order.order); + taosTFree(orderColIndexList); if (*pOrderDesc == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -588,7 +609,6 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage // disable merge procedure for column projection query int16_t functionId = pReducer->pCtx[0].functionId; - assert(functionId != TSDB_FUNC_ARITHM); if (pReducer->orderPrjOnSTable) { return true; } @@ -606,7 +626,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage return true; } - if (orderInfo->pData[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + if (orderInfo->colIndex[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) { /* * super table interval query * if the order columns is the primary timestamp, all result data belongs to one group @@ -620,7 +640,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage } // only one row exists - int32_t index = orderInfo->pData[0]; + int32_t index = orderInfo->colIndex[0]; int32_t offset = (pOrderDesc->pColumnModel)->pFields[index].offset; int32_t ret = memcmp(pPrev + offset, tmpBuffer->data + offset, pOrderDesc->pColumnModel->rowSize - offset); @@ -661,7 +681,6 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr pSchema[i].bytes = pExpr->resBytes; pSchema[i].type = (int8_t)pExpr->resType; - rlen += pExpr->resBytes; } @@ -701,12 +720,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr int16_t type = -1; int16_t bytes = 0; - // if ((pExpr->functionId >= TSDB_FUNC_FIRST_DST && pExpr->functionId <= TSDB_FUNC_LAST_DST) || - // (pExpr->functionId >= TSDB_FUNC_SUM && pExpr->functionId <= TSDB_FUNC_MAX) || - // pExpr->functionId == TSDB_FUNC_LAST_ROW) { // the final result size and type in the same as query on single table. // so here, set the flag to be false; - int32_t functionId = pExpr->functionId; if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) { type = pModel->pFields[i].field.type; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 7fd7ce26dd2ca72ec796e7bd49aaa839e87124fc..055d44ff1ede33a0a8f78900a1189a3289b37a6e 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -626,6 +626,11 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ return TSDB_CODE_SUCCESS; } + // orderby column not set yet, set it to be the primary timestamp column + if (pQueryInfo->order.orderColId == INT32_MIN) { + pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; + } + // interval is not null SStrToken* t = &pQuerySql->interval; if (parseNatualDuration(t->z, t->n, &pQueryInfo->interval.interval, &pQueryInfo->interval.intervalUnit) != TSDB_CODE_SUCCESS) { @@ -1347,6 +1352,32 @@ static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumn insertResultField(pQueryInfo, startPos, &ids, pExpr->resBytes, (int8_t)pExpr->resType, pExpr->aliasName, pExpr); } +static void addPrimaryTsColIntoResult(SQueryInfo* pQueryInfo) { + // primary timestamp column has been added already + size_t size = tscSqlExprNumOfExprs(pQueryInfo); + for (int32_t i = 0; i < size; ++i) { + SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); + if (pExpr->functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + return; + } + } + + SColumnIndex index = {0}; + + // set the constant column value always attached to first table. + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, PRIMARYKEY_TIMESTAMP_COL_INDEX); + + // add the timestamp column into the output columns + int32_t numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); + tscAddSpecialColumnForSelect(pQueryInfo, numOfCols, TSDB_FUNC_PRJ, &index, pSchema, TSDB_COL_NORMAL); + + SFieldSupInfo* pSupInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, numOfCols); + pSupInfo->visible = false; + + pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY; +} + int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery) { assert(pSelection != NULL && pCmd != NULL); @@ -1400,20 +1431,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel // there is only one user-defined column in the final result field, add the timestamp column. size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList); if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo)) { - SColumnIndex index = {0}; - - // set the constant column value always attached to first table. - STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, clauseIndex, 0); - SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, PRIMARYKEY_TIMESTAMP_COL_INDEX); - - // add the timestamp column into the output columns - int32_t numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); - tscAddSpecialColumnForSelect(pQueryInfo, numOfCols, TSDB_FUNC_PRJ, &index, pSchema, TSDB_COL_NORMAL); - - SFieldSupInfo* pSupInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, numOfCols); - pSupInfo->visible = false; - - pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY; + addPrimaryTsColIntoResult(pQueryInfo); } if (!functionCompatibleCheck(pQueryInfo, joinQuery)) { @@ -4371,14 +4389,13 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) { /* set default timestamp order information for all queries */ - pQueryInfo->order.order = TSDB_ORDER_ASC; STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + pQueryInfo->order.order = TSDB_ORDER_ASC; if (isTopBottomQuery(pQueryInfo)) { - pQueryInfo->order.order = TSDB_ORDER_ASC; pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; - } else { - pQueryInfo->order.orderColId = -1; + } else { // in case of select tbname from super_table, the defualt order column can not be the primary ts column + pQueryInfo->order.orderColId = INT32_MIN; } /* for super table query, set default ascending order for group output */ @@ -4482,6 +4499,11 @@ int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu } else { pQueryInfo->order.order = pSortorder->a[0].sortOrder; pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; + + // orderby ts query on super table + if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { + addPrimaryTsColIntoResult(pQueryInfo); + } } } @@ -6277,6 +6299,11 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { return TSDB_CODE_TSC_INVALID_SQL; } + // set order by info + if (parseOrderbyClause(pCmd, pQueryInfo, pQuerySql, tscGetTableSchema(pTableMetaInfo->pTableMeta)) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_TSC_INVALID_SQL; + } + // set interval value if (parseIntervalClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; @@ -6287,11 +6314,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { } } - // set order by info - if (parseOrderbyClause(pCmd, pQueryInfo, pQuerySql, tscGetTableSchema(pTableMetaInfo->pTableMeta)) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; - } - // user does not specified the query time window, twa is not allowed in such case. if ((pQueryInfo->window.skey == INT64_MIN || pQueryInfo->window.ekey == INT64_MAX || (pQueryInfo->window.ekey == INT64_MAX / 1000 && tinfo.precision == TSDB_TIME_PRECISION_MILLI)) && tscIsTWAQuery(pQueryInfo)) { diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 5ccb2aee8c62a8ad200115c6ca5b534f6a357275..49759bc4d30fa9a9a89591c33c3dc6666894d1b2 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -166,7 +166,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ } // todo handle failed to create sub query -SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index) { +SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) { SJoinSupporter* pSupporter = calloc(1, sizeof(SJoinSupporter)); if (pSupporter == NULL) { return NULL; @@ -1300,11 +1300,11 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { int32_t code = TSDB_CODE_SUCCESS; // todo add test - SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); - if (pState == NULL) { - code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _error; - } +// SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); +// if (pState == NULL) { +// code = TSDB_CODE_TSC_OUT_OF_MEMORY; +// goto _error; +// } pSql->subState.numOfSub = pQueryInfo->numOfTables; @@ -1312,7 +1312,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables); for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { - SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i); + SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i); if (pSupporter == NULL) { // failed to create support struct, abort current query tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i); @@ -1357,8 +1357,8 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { tscQueueAsyncRes(pSql); } -static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) { - assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0 && pState != NULL); +static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) { + assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0); for(int32_t i = 0; i < numOfSubs; ++i) { SSqlObj* pSub = pSql->pSubs[i]; @@ -1371,8 +1371,6 @@ static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState taos_free_result(pSub); } - - free(pState); } int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { @@ -1395,9 +1393,10 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - pSql->subState.numOfSub = pTableMetaInfo->vgroupList->numOfVgroups; - assert(pSql->subState.numOfSub > 0); + SSubqueryState *pState = &pSql->subState; + + pState->numOfSub = pTableMetaInfo->vgroupList->numOfVgroups; + assert(pState->numOfSub > 0); int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); if (ret != 0) { @@ -1407,26 +1406,24 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { return ret; } - pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); + pSql->pSubs = calloc(pState->numOfSub, POINTER_BYTES); - tscDebug("%p retrieved query data from %d vnode(s)", pSql, pSql->subState.numOfSub); - SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); + tscDebug("%p retrieved query data from %d vnode(s)", pSql, pState->numOfSub); - if (pSql->pSubs == NULL || pState == NULL) { - taosTFree(pState); + if (pSql->pSubs == NULL) { taosTFree(pSql->pSubs); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->subState.numOfSub); + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub); tscQueueAsyncRes(pSql); return ret; } - pSql->subState.numOfRemain = pSql->subState.numOfSub; + pState->numOfRemain = pState->numOfSub; pRes->code = TSDB_CODE_SUCCESS; int32_t i = 0; - for (; i < pSql->subState.numOfSub; ++i) { + for (; i < pState->numOfSub; ++i) { SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport)); if (trs == NULL) { tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); @@ -1465,22 +1462,22 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tscDebug("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex); } - if (i < pSql->subState.numOfSub) { + if (i < pState->numOfSub) { tscError("%p failed to prepare subquery structure and launch subqueries", pSql); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->subState.numOfSub); - doCleanupSubqueries(pSql, i, pState); + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub); + doCleanupSubqueries(pSql, i); return pRes->code; // free all allocated resource } if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->subState.numOfSub); - doCleanupSubqueries(pSql, i, pState); + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub); + doCleanupSubqueries(pSql, i); return pRes->code; } - for(int32_t j = 0; j < pSql->subState.numOfSub; ++j) { + for(int32_t j = 0; j < pState->numOfSub; ++j) { SSqlObj* pSub = pSql->pSubs[j]; SRetrieveSupport* pSupport = pSub->param; diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index dea90811ad83206b05e29b0e2468b8057ac8d7a8..5608cfd6d16826e399f9ea41dc026d2ab2459610 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -4,6 +4,7 @@ PROJECT(TDengine) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/sync/inc) INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) INCLUDE_DIRECTORIES(inc) diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 311bebc788bf2ef66343bd44cfdd878d38a45d91..e56bae0d7e84953af493176a585ba3e7060b751b 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -24,8 +24,10 @@ #include "twal.h" #include "tdataformat.h" #include "tglobal.h" +#include "tsync.h" #include "vnode.h" #include "dnodeInt.h" +#include "syncInt.h" #include "dnodeVWrite.h" #include "dnodeMgmt.h" @@ -239,6 +241,10 @@ static void *dnodeProcessWriteQueue(void *param) { pHead->len = pWrite->contLen; dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]); + } else if (type == TAOS_QTYPE_CQ) { + pHead = (SWalHead *)((char*)item + sizeof(SSyncHead)); + dTrace("%p, CQ wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType], + pHead->version); } else { pHead = (SWalHead *)item; dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType], diff --git a/src/query/inc/qExtbuffer.h b/src/query/inc/qExtbuffer.h index 0bdcf5c45ef6d9ce283d36ccd9d7b6296fc67472..df6e64ddd85c4ec6be693f262ea561ee23f3bf0b 100644 --- a/src/query/inc/qExtbuffer.h +++ b/src/query/inc/qExtbuffer.h @@ -89,7 +89,7 @@ typedef struct SColumnModel { typedef struct SColumnOrderInfo { int32_t numOfCols; - int16_t pData[]; + int16_t colIndex[]; } SColumnOrderInfo; typedef struct tOrderDescriptor { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 07a4bf5a05895edfd66c33464b1b9323e352c953..f742616b050887f1728d8ea3e7baf51bcdce9a95 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1537,7 +1537,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY if (isNull((const char*) &pQuery->fillVal[colIndex], pCtx->inputType)) { pCtx->param[1].nType = TSDB_DATA_TYPE_NULL; } else { // todo refactor, tVariantCreateFromBinary should handle the NULL value - if(pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) { + if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) { tVariantCreateFromBinary(&pCtx->param[1], (char*) &pQuery->fillVal[colIndex], pCtx->inputBytes, pCtx->inputType); } } diff --git a/src/query/src/qExtbuffer.c b/src/query/src/qExtbuffer.c index 1d3120ead47852da3e3ec6bdcc6f92de9c52cb60..fc9c60b39b0cfa4b591cc77c1efcdac4e6647ce9 100644 --- a/src/query/src/qExtbuffer.c +++ b/src/query/src/qExtbuffer.c @@ -343,8 +343,10 @@ static FORCE_INLINE int32_t primaryKeyComparator(int64_t f1, int64_t f2, int32_t if (f1 == f2) { return 0; } - - if (colIdx == 0 && tsOrder == TSDB_ORDER_DESC) { // primary column desc order + + assert(colIdx == 0); + + if (tsOrder == TSDB_ORDER_DESC) { // primary column desc order return (f1 < f2) ? 1 : -1; } else { // asc return (f1 < f2) ? -1 : 1; @@ -435,7 +437,7 @@ int32_t compare_a(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1, int32_t cmpCnt = pDescriptor->orderInfo.numOfCols; for (int32_t i = 0; i < cmpCnt; ++i) { - int32_t colIdx = pDescriptor->orderInfo.pData[i]; + int32_t colIdx = pDescriptor->orderInfo.colIndex[i]; char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx); char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx); @@ -467,7 +469,7 @@ int32_t compare_d(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1, int32_t cmpCnt = pDescriptor->orderInfo.numOfCols; for (int32_t i = 0; i < cmpCnt; ++i) { - int32_t colIdx = pDescriptor->orderInfo.pData[i]; + int32_t colIdx = pDescriptor->orderInfo.colIndex[i]; char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx); char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx); @@ -557,13 +559,13 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta int32_t midIdx = ((end - start) >> 1) + start; #if defined(_DEBUG_VIEW) - int32_t f = pDescriptor->orderInfo.pData[0]; + int32_t f = pDescriptor->orderInfo.colIndex[0]; char *midx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, midIdx, f); char *startx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, start, f); char *endx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, end, f); - int32_t colIdx = pDescriptor->orderInfo.pData[0]; + int32_t colIdx = pDescriptor->orderInfo.colIndex[0]; tSortDataPrint(pDescriptor->pColumnModel->pFields[colIdx].field.type, "before", startx, midx, endx); #endif @@ -591,7 +593,7 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta } static UNUSED_FUNC void tRowModelDisplay(tOrderDescriptor *pDescriptor, int32_t numOfRows, char *d, int32_t len) { - int32_t colIdx = pDescriptor->orderInfo.pData[0]; + int32_t colIdx = pDescriptor->orderInfo.colIndex[0]; for (int32_t i = 0; i < len; ++i) { char *startx = COLMODEL_GET_VAL(d, pDescriptor->pColumnModel, numOfRows, i, colIdx); @@ -1075,7 +1077,7 @@ tOrderDescriptor *tOrderDesCreate(const int32_t *orderColIdx, int32_t numOfOrder desc->orderInfo.numOfCols = numOfOrderCols; for (int32_t i = 0; i < numOfOrderCols; ++i) { - desc->orderInfo.pData[i] = orderColIdx[i]; + desc->orderInfo.colIndex[i] = orderColIdx[i]; } return desc; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 8ca71e4555534b874fd173edaf25896d9613decf..a4b18606d188859b86d529207d696d2106daad3e 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -72,7 +72,6 @@ typedef struct STableCheckInfo { SCompInfo* pCompInfo; int32_t compSize; int32_t numOfBlocks; // number of qualified data blocks not the original blocks - SDataCols* pDataCols; int32_t chosen; // indicate which iterator should move forward bool initBuf; // whether to initialize the in-memory skip list iterator or not SSkipListIterator* iter; // mem buffer skip list iterator @@ -117,10 +116,12 @@ typedef struct STsdbQueryHandle { SFileGroupIter fileIter; SRWHelper rhelper; STableBlockInfo* pDataBlockInfo; + + SDataCols *pDataCols; // in order to hold current file data block int32_t allocSize; // allocated data block size - SMemTable* mem; // mem-table - SMemTable* imem; // imem-table, acquired from snapshot - SArray* defaultLoadColumn;// default load column + SMemTable *mem; // mem-table + SMemTable *imem; // imem-table, acquired from snapshot + SArray *defaultLoadColumn;// default load column SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */ SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */ @@ -181,7 +182,72 @@ static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS return pLocalIdList; } -TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) { +static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STableGroupInfo* pGroupList, STsdbMeta* pMeta) { + size_t sizeOfGroup = taosArrayGetSize(pGroupList->pGroupList); + assert(sizeOfGroup >= 1 && pMeta != NULL); + + // allocate buffer in order to load data blocks from file + SArray* pTableCheckInfo = taosArrayInit(pGroupList->numOfTables, sizeof(STableCheckInfo)); + if (pTableCheckInfo == NULL) { + return NULL; + } + + // todo apply the lastkey of table check to avoid to load header file + for (int32_t i = 0; i < sizeOfGroup; ++i) { + SArray* group = *(SArray**) taosArrayGet(pGroupList->pGroupList, i); + + size_t gsize = taosArrayGetSize(group); + assert(gsize > 0); + + for (int32_t j = 0; j < gsize; ++j) { + STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j); + + STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable }; + info.tableId = ((STable*)(pKeyInfo->pTable))->tableId; + + assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE || + info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE)); + + info.tableId.tid = info.pTableObj->tableId.tid; + info.tableId.uid = info.pTableObj->tableId.uid; + + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + assert(info.lastKey >= pQueryHandle->window.skey); + } else { + assert(info.lastKey <= pQueryHandle->window.skey); + } + + taosArrayPush(pTableCheckInfo, &info); + tsdbDebug("%p check table uid:%"PRId64", tid:%d from lastKey:%"PRId64" %p", pQueryHandle, info.tableId.uid, + info.tableId.tid, info.lastKey, pQueryHandle->qinfo); + } + } + + taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar); + return pTableCheckInfo; +} + +static SArray* createCheckInfoFromCheckInfo(SArray* pTableCheckInfo, TSKEY skey) { + size_t si = taosArrayGetSize(pTableCheckInfo); + SArray* pNew = taosArrayInit(si, sizeof(STableCheckInfo)); + if (pNew == NULL) { + return NULL; + } + + for (int32_t j = 0; j < si; ++j) { + STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, j); + STableCheckInfo info = { .lastKey = skey, .pTableObj = pCheckInfo->pTableObj}; + + info.tableId = pCheckInfo->tableId; + taosArrayPush(pNew, &info); + } + + // it is ordered already, no need to sort again. + taosArraySort(pNew, tsdbCheckInfoCompar); + return pNew; +} + +static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, void* qinfo) { STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); if (pQueryHandle == NULL) { goto out_of_memory; @@ -205,9 +271,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab } tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem); - - size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList); - assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0); + assert(pCond != NULL && pCond->numOfCols > 0); if (ASCENDING_TRAVERSE(pCond->order)) { assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey); @@ -216,21 +280,19 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab } // allocate buffer in order to load data blocks from file - int32_t numOfCols = pCond->numOfCols; - - pQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis)); + pQueryHandle->statis = calloc(pCond->numOfCols, sizeof(SDataStatis)); if (pQueryHandle->statis == NULL) { goto out_of_memory; } - pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array? + pQueryHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array? if (pQueryHandle->pColumns == NULL) { goto out_of_memory; } - for (int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < pCond->numOfCols; ++i) { SColumnInfoData colInfo = {{0}, 0}; - + colInfo.info = pCond->colList[i]; colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes); if (colInfo.pData == NULL) { @@ -240,61 +302,47 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab pQueryHandle->statis[i].colId = colInfo.info.colId; } - pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo)); - if (pQueryHandle->pTableCheckInfo == NULL) { - goto out_of_memory; - } + pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true); STsdbMeta* pMeta = tsdbGetMeta(tsdb); - assert(pMeta != NULL && sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0); - - // todo apply the lastkey of table check to avoid to load header file - for (int32_t i = 0; i < sizeOfGroup; ++i) { - SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i); - - size_t gsize = taosArrayGetSize(group); - assert(gsize > 0); - - for (int32_t j = 0; j < gsize; ++j) { - STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j); - - STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable }; - info.tableId = ((STable*)(pKeyInfo->pTable))->tableId; + assert(pMeta != NULL); - assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE || - info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE)); - - info.tableId.tid = info.pTableObj->tableId.tid; - info.tableId.uid = info.pTableObj->tableId.uid; - - if (ASCENDING_TRAVERSE(pQueryHandle->order)) { - assert(info.lastKey >= pQueryHandle->window.skey); - } else { - assert(info.lastKey <= pQueryHandle->window.skey); - } - - taosArrayPush(pQueryHandle->pTableCheckInfo, &info); - tsdbDebug("%p check table uid:%"PRId64", tid:%d from lastKey:%"PRId64" %p", pQueryHandle, info.tableId.uid, - info.tableId.tid, info.lastKey, qinfo); - } + pQueryHandle->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pQueryHandle->pTsdb->config.maxRowsPerFileBlock); + if (pQueryHandle->pDataCols == NULL) { + tsdbError("%p failed to malloc buf for pDataCols, %p", pQueryHandle, pQueryHandle->qinfo); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto out_of_memory; } - - taosArraySort(pQueryHandle->pTableCheckInfo, tsdbCheckInfoCompar); - pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true); - - tsdbDebug("%p total numOfTable:%" PRIzu " in query, %p", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qinfo); tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo); return (TsdbQueryHandleT) pQueryHandle; -out_of_memory: + out_of_memory: tsdbCleanupQueryHandle(pQueryHandle); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return NULL; } +TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) { + STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qinfo); + + STsdbMeta* pMeta = tsdbGetMeta(tsdb); + assert(pMeta != NULL); + + // todo apply the lastkey of table check to avoid to load header file + pQueryHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pQueryHandle, groupList, pMeta); + if (pQueryHandle->pTableCheckInfo == NULL) { + tsdbCleanupQueryHandle(pQueryHandle); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + + tsdbDebug("%p total numOfTable:%" PRIzu " in query, %p", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qinfo); + return (TsdbQueryHandleT) pQueryHandle; +} + TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) { pCond->twindow = changeTableGroupByLastrow(groupList); @@ -689,22 +737,10 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo } static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) { - STsdbRepo *pRepo = pQueryHandle->pTsdb; - int64_t st = taosGetTimestampUs(); - - if (pCheckInfo->pDataCols == NULL) { - STsdbMeta* pMeta = tsdbGetMeta(pRepo); - - pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock); - if (pCheckInfo->pDataCols == NULL) { - tsdbError("%p failed to malloc buf for pDataCols, %p", pQueryHandle, pQueryHandle->qinfo); - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _error; - } - } + int64_t st = taosGetTimestampUs(); - STSchema* pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj); - int32_t code = tdInitDataCols(pCheckInfo->pDataCols, pSchema); + STSchema *pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj); + int32_t code = tdInitDataCols(pQueryHandle->pDataCols, pSchema); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p failed to malloc buf for pDataCols, %p", pQueryHandle, pQueryHandle->qinfo); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -1924,77 +1960,33 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { pQueryHandle->type = TSDB_QUERY_TYPE_ALL; return true; } else { - STsdbQueryHandle* pSecQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); - if (pSecQueryHandle == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + STimeWindow win = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX}; + STsdbQueryCond cond = { + .order = TSDB_ORDER_ASC, + .numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)), + .twindow = win}; + + cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo)); + if (cond.colList == NULL) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; return false; } - pSecQueryHandle->order = TSDB_ORDER_ASC; - pSecQueryHandle->window = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX}; - pSecQueryHandle->pTsdb = pQueryHandle->pTsdb; - pSecQueryHandle->type = TSDB_QUERY_TYPE_ALL; - pSecQueryHandle->cur.fid = -1; - pSecQueryHandle->cur.win = TSWINDOW_INITIALIZER; - pSecQueryHandle->checkFiles = true; - pSecQueryHandle->activeIndex = 0; - pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock; - - if (tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb) != 0) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - free(pSecQueryHandle); - return false; + for(int32_t i = 0; i < cond.numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, i); + memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo)); } - tsdbTakeMemSnapshot(pSecQueryHandle->pTsdb, &pSecQueryHandle->mem, &pSecQueryHandle->imem); + STsdbQueryHandle* pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qinfo); - // allocate buffer in order to load data blocks from file - int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)); + taosTFree(cond.colList); - pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis)); - pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); - if (pSecQueryHandle->statis == NULL || pSecQueryHandle->pColumns == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pQueryHandle->pTableCheckInfo, pSecQueryHandle->window.skey); + if (pSecQueryHandle->pTableCheckInfo == NULL) { tsdbCleanupQueryHandle(pSecQueryHandle); return false; } - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData colInfo = {{0}, 0}; - SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); - - colInfo.info = pCol->info; - colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes); - if (colInfo.pData == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbCleanupQueryHandle(pSecQueryHandle); - return false; - } - - taosArrayPush(pSecQueryHandle->pColumns, &colInfo); - } - - size_t si = taosArrayGetSize(pQueryHandle->pTableCheckInfo); - pSecQueryHandle->pTableCheckInfo = taosArrayInit(si, sizeof(STableCheckInfo)); - STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb); - assert(pMeta != NULL); - - for (int32_t j = 0; j < si; ++j) { - STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j); - STableCheckInfo info = { - .lastKey = pSecQueryHandle->window.skey, - .pTableObj = pCheckInfo->pTableObj, - }; - - info.tableId = pCheckInfo->tableId; - - taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info); - } - - tsdbInitDataBlockLoadInfo(&pSecQueryHandle->dataBlockLoadInfo); - tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo); - pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn); - if (!tsdbNextDataBlock((void*) pSecQueryHandle)) { tsdbCleanupQueryHandle(pSecQueryHandle); return false; @@ -2003,6 +1995,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo); tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn); + int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pSecQueryHandle)); + size_t si = taosArrayGetSize(pSecQueryHandle->pTableCheckInfo); + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); memcpy((char*)pCol->pData, (char*)pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows - 1), pCol->info.bytes); @@ -2016,11 +2011,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0); // it is ascending order - pQueryHandle->cur.win = (STimeWindow){((TSKEY*)pTSCol->pData)[0], ((TSKEY*)pTSCol->pData)[1]}; + pQueryHandle->order = TSDB_ORDER_DESC; pQueryHandle->window = pQueryHandle->cur.win; + pQueryHandle->cur.win = (STimeWindow){((TSKEY*)pTSCol->pData)[0], ((TSKEY*)pTSCol->pData)[1]}; pQueryHandle->cur.rows = 2; pQueryHandle->cur.mixBlock = true; - pQueryHandle->order = TSDB_ORDER_DESC; int32_t step = -1;// one step for ascending order traverse for (int32_t j = 0; j < si; ++j) { @@ -2686,8 +2681,6 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); destroyTableMemIterator(pTableCheckInfo); - tdFreeDataCols(pTableCheckInfo->pDataCols); - pTableCheckInfo->pDataCols = NULL; taosTFree(pTableCheckInfo->pCompInfo); } taosArrayDestroy(pQueryHandle->pTableCheckInfo); @@ -2711,6 +2704,9 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { tsdbDestroyHelper(&pQueryHandle->rhelper); + tdFreeDataCols(pQueryHandle->pDataCols); + pQueryHandle->pDataCols = NULL; + SIOCostSummary* pCost = &pQueryHandle->cost; tsdbDebug("%p :io-cost summary: statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, %p", pQueryHandle, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pQueryHandle->qinfo); diff --git a/src/vnode/CMakeLists.txt b/src/vnode/CMakeLists.txt index 09cfa632fbfaaa77b32c09b49f051c0714b33564..de0cdb028b5f9231f57c78fa3411714c93de85c7 100644 --- a/src/vnode/CMakeLists.txt +++ b/src/vnode/CMakeLists.txt @@ -4,6 +4,7 @@ PROJECT(TDengine) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/sync/inc) INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 38f7c8e6056644591993027b410283b41fb8560c..169334c6119a35f53fd0a69dd1f7cb6952fbb727 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -62,6 +62,7 @@ typedef struct { } SVnodeObj; int vnodeWriteToQueue(void *param, void *pHead, int type); +int vnodeWriteCqMsgToQueue(void *param, void *pHead, int type); void vnodeInitWriteFp(void); void vnodeInitReadFp(void); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 0c1f9d103b6f355a77521a3f9c20372f24db29d0..a9bcf948b6cb00c25a0b283a0cb63126925abd8c 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -259,7 +259,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { strcpy(cqCfg.pass, tsInternalPass); strcpy(cqCfg.db, pVnode->db); cqCfg.vgId = vnode; - cqCfg.cqWrite = vnodeWriteToQueue; + cqCfg.cqWrite = vnodeWriteCqMsgToQueue; pVnode->cq = cqOpen(pVnode, &cqCfg); if (pVnode->cq == NULL) { vnodeCleanUp(pVnode); diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 70b08b9669d0a66f3f06596305d10091e7c1d36a..c4924f312f4bf16b6c536d58dba75d92d1f75c85 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -22,9 +22,11 @@ #include "tutil.h" #include "tsdb.h" #include "twal.h" +#include "tsync.h" #include "tdataformat.h" #include "vnode.h" #include "vnodeInt.h" +#include "syncInt.h" #include "tcq.h" static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *); @@ -189,6 +191,25 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR return TSDB_CODE_SUCCESS; } + +int vnodeWriteCqMsgToQueue(void *param, void *data, int type) { + SVnodeObj *pVnode = param; + SWalHead * pHead = data; + + int size = sizeof(SWalHead) + pHead->len; + SSyncHead *pSync = (SSyncHead*) taosAllocateQitem(size + sizeof(SSyncHead)); + SWalHead *pWal = (SWalHead *)(pSync + 1); + memcpy(pWal, pHead, size); + + atomic_add_fetch_32(&pVnode->refCount, 1); + vDebug("CQ: vgId:%d, get vnode wqueue, refCount:%d", pVnode->vgId, pVnode->refCount); + + taosWriteQitem(pVnode->wqueue, type, pSync); + + return 0; +} + + int vnodeWriteToQueue(void *param, void *data, int type) { SVnodeObj *pVnode = param; SWalHead * pHead = data;