diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 575f7ee8f43a4a4bc7c7aeec8f062e0a829bac63..251d4079e3cc186636d032427d2d106c218f47ee 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -808,18 +808,19 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource * } } -void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, - SFillInfo *pFillInfo) { +void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) { // discard following dataset in the same group and reset the interpolation information STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - int16_t prec = tinfo.precision; - int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey; - int64_t revisedSTime = - taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec); - taosResetFillInfo(pFillInfo, revisedSTime); + if (pFillInfo != NULL) { + int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey; + int64_t revisedSTime = + taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, tinfo.precision); + + taosResetFillInfo(pFillInfo, revisedSTime); + } pLocalReducer->discard = true; pLocalReducer->discardData->num = 0; @@ -915,13 +916,12 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo if (pQueryInfo->limit.limit >= 0 && pRes->numOfClauseTotal > pQueryInfo->limit.limit) { /* impose the limitation of output rows on the final result */ int32_t prevSize = pFinalDataPage->num; - int32_t overFlow = pRes->numOfClauseTotal - pQueryInfo->limit.limit; - - assert(overFlow < pRes->numOfRows); + int32_t overflow = pRes->numOfClauseTotal - pQueryInfo->limit.limit; + assert(overflow < pRes->numOfRows); pRes->numOfClauseTotal = pQueryInfo->limit.limit; - pRes->numOfRows -= overFlow; - pFinalDataPage->num -= overFlow; + pRes->numOfRows -= overflow; + pFinalDataPage->num -= overflow; tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize); @@ -988,13 +988,13 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo if (pRes->numOfRows > 0) { if (pQueryInfo->limit.limit >= 0 && pRes->numOfClauseTotal > pQueryInfo->limit.limit) { - int32_t overFlow = pRes->numOfClauseTotal - pQueryInfo->limit.limit; - pRes->numOfRows -= overFlow; + int32_t overflow = pRes->numOfClauseTotal - pQueryInfo->limit.limit; + pRes->numOfRows -= overflow; assert(pRes->numOfRows >= 0); pRes->numOfClauseTotal = pQueryInfo->limit.limit; - pFinalDataPage->num -= overFlow; + pFinalDataPage->num -= overflow; /* set remain data to be discarded, and reset the interpolation information */ savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pFillInfo); diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 51a5dad4869f3d464d83397157d9fda4482dc6a9..956121086ca5f8fc13c716428156d2754f9fb12b 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -106,9 +106,28 @@ typedef void *SDataRow; SDataRow tdNewDataRowFromSchema(STSchema *pSchema); void tdFreeDataRow(SDataRow row); void tdInitDataRow(SDataRow row, STSchema *pSchema); -int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset); SDataRow tdDataRowDup(SDataRow row); +static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) { + ASSERT(value != NULL); + int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE; + char * ptr = (char *)POINTER_SHIFT(row, dataRowLen(row)); + + switch (type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + *(VarDataOffsetT *)POINTER_SHIFT(row, toffset) = dataRowLen(row); + memcpy(ptr, value, varDataTLen(value)); + dataRowLen(row) += varDataTLen(value); + break; + default: + memcpy(POINTER_SHIFT(row, toffset), value, TYPE_BYTES[type]); + break; + } + + return 0; +} + // NOTE: offset here including the header size static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t offset) { switch (type) { diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 9d81cd07af47ec20b727fc0a1e945bb289823964..bd082db16c4dab6db04ea4f322b8ceccb21b58e7 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -158,32 +158,6 @@ void tdFreeDataRow(SDataRow row) { if (row) free(row); } -/** - * Append a column value to the data row - * @param type: column type - * @param bytes: column bytes - * @param offset: offset in the data row tuple, not including the data row header - */ -int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) { - ASSERT(value != NULL); - int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE; - char * ptr = POINTER_SHIFT(row, dataRowLen(row)); - - switch (type) { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - *(VarDataOffsetT *)POINTER_SHIFT(row, toffset) = dataRowLen(row); - memcpy(ptr, value, varDataTLen(value)); - dataRowLen(row) += varDataTLen(value); - break; - default: - memcpy(POINTER_SHIFT(row, toffset), value, TYPE_BYTES[type]); - break; - } - - return 0; -} - SDataRow tdDataRowDup(SDataRow row) { SDataRow trow = malloc(dataRowLen(row)); if (trow == NULL) return NULL; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index f444b7b5c87f6feee64e056f4c4696d1141d0397..6b348b7fc7e7cf2aee7fb8210cb715f2bfb27ba9 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -140,7 +140,7 @@ int32_t rpcDebugFlag = 135; int32_t uDebugFlag = 131; int32_t debugFlag = 131; int32_t sDebugFlag = 135; -int32_t tsdbDebugFlag = 135; +int32_t tsdbDebugFlag = 131; // the maximum number of results for projection query on super table that are returned from // one virtual node, to order according to timestamp diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index e4f3142b8962ea41f7661968827fa18073860d14..7935bb7ff5f8660e021818a8b4f4b1887ed71194 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -40,6 +40,7 @@ typedef struct { int num; // number of continuous streams struct SCqObj *pHead; void *dbConn; + int master; pthread_mutex_t mutex; } SCqContext; @@ -58,6 +59,7 @@ typedef struct SCqObj { int cqDebugFlag = 135; static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); +static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); void *cqOpen(void *ahandle, const SCqCfg *pCfg) { @@ -69,6 +71,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { pContext->vgId = pCfg->vgId; pContext->cqWrite = pCfg->cqWrite; pContext->ahandle = ahandle; + tscEmbedded = 1; pthread_mutex_init(&pContext->mutex, NULL); @@ -84,6 +87,8 @@ void cqClose(void *handle) { cqStop(pContext); // free all resources + pthread_mutex_lock(&pContext->mutex); + SCqObj *pObj = pContext->pHead; while (pObj) { SCqObj *pTemp = pObj; @@ -91,6 +96,8 @@ void cqClose(void *handle) { free(pTemp); } + pthread_mutex_unlock(&pContext->mutex); + pthread_mutex_destroy(&pContext->mutex); cTrace("vgId:%d, CQ is closed", pContext->vgId); @@ -100,28 +107,15 @@ void cqClose(void *handle) { void cqStart(void *handle) { SCqContext *pContext = handle; cTrace("vgId:%d, start all CQs", pContext->vgId); - if (pContext->dbConn) return; + if (pContext->dbConn || pContext->master) return; pthread_mutex_lock(&pContext->mutex); - tscEmbedded = 1; - pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0); - if (pContext->dbConn == NULL) { - cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); - pthread_mutex_unlock(&pContext->mutex); - return; - } + pContext->master = 1; SCqObj *pObj = pContext->pHead; while (pObj) { - int64_t lastKey = 0; - pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); - if (pObj->pStream) { - pContext->num++; - cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); - } else { - cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr); - } + cqCreateStream(pContext, pObj); pObj = pObj->next; } @@ -131,10 +125,11 @@ void cqStart(void *handle) { void cqStop(void *handle) { SCqContext *pContext = handle; cTrace("vgId:%d, stop all CQs", pContext->vgId); - if (pContext->dbConn == NULL) return; + if (pContext->dbConn == NULL || pContext->master == 0) return; pthread_mutex_lock(&pContext->mutex); + pContext->master = 0; SCqObj *pObj = pContext->pHead; while (pObj) { if (pObj->pStream) { @@ -176,16 +171,7 @@ void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int column if (pContext->pHead) pContext->pHead->prev = pObj; pContext->pHead = pObj; - if (pContext->dbConn) { - int64_t lastKey = 0; - pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); - if (pObj->pStream) { - pContext->num++; - cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); - } else { - cError("vgId:%d, id:%d CQ:%s, failed to launch", pContext->vgId, pObj->tid, pObj->sqlStr); - } - } + cqCreateStream(pContext, pObj); pthread_mutex_unlock(&pContext->mutex); @@ -218,6 +204,26 @@ void cqDrop(void *handle) { pthread_mutex_lock(&pContext->mutex); } +static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { + + if (pContext->dbConn == NULL) { + pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0); + if (pContext->dbConn == NULL) { + cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); + } + return; + } + + int64_t lastKey = 0; + pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); + if (pObj->pStream) { + pContext->num++; + cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); + } else { + cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr); + } +} + static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { SCqObj *pObj = (SCqObj *)param; SCqContext *pContext = pObj->pContext; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 8c8c89bdd1817398b66c6ea5bbcaf3ec5f780b16..cdb847654624753fb617c0ca393766d89604c000 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -3030,11 +3030,13 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo * // order has change already! int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - if (!QUERY_IS_ASC_QUERY(pQuery)) { - assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step); - } else { - assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step); - } + + // TODO validate the assertion +// if (!QUERY_IS_ASC_QUERY(pQuery)) { +// assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step); +// } else { +// assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step); +// } pTableQueryInfo->win.ekey = pTableQueryInfo->lastKey + step; @@ -3113,7 +3115,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) { void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SWITCH_ORDER(pRuntimeEnv->pCtx[i] .order); + SWITCH_ORDER(pRuntimeEnv->pCtx[i].order); } } @@ -4384,10 +4386,11 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis); - TSKEY nextKey = blockInfo.window.skey; if (!isIntervalQuery(pQuery)) { - setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIdx, nextKey); + int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1; + setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIdx, blockInfo.window.ekey + step); } else { // interval query + TSKEY nextKey = blockInfo.window.skey; setIntervalQueryRange(pQInfo, nextKey); int32_t ret = setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 7477838e72d68a6843d0a43fa9480cac093168a6..de8304aa6251782a97ac6866f3fb8174ca294251 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -16,7 +16,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable); static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable); static int tsdbEstimateTableEncodeSize(STable *pTable); -static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable); +static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFromIdx); /** * Encode a TSDB table object as a binary content @@ -411,7 +411,7 @@ int tsdbDropTable(TsdbRepoT *repo, STableId tableId) { tsdbTrace("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, varDataVal(pTable->name), tableId.tid, tableId.uid); - if (tsdbRemoveTableFromMeta(pMeta, pTable) < 0) return -1; + if (tsdbRemoveTableFromMeta(pMeta, pTable, true) < 0) return -1; return 0; @@ -501,7 +501,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { return 0; } -static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) { +static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFromIdx) { if (pTable->type == TSDB_SUPER_TABLE) { SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex); while (tSkipListIterNext(pIter)) { @@ -510,7 +510,7 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) { ASSERT(tTable != NULL && tTable->type == TSDB_CHILD_TABLE); - tsdbRemoveTableFromMeta(pMeta, tTable); + tsdbRemoveTableFromMeta(pMeta, tTable, false); } tSkipListDestroyIter(pIter); @@ -526,7 +526,7 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) { } } else { pMeta->tables[pTable->tableId.tid] = NULL; - if (pTable->type == TSDB_CHILD_TABLE) { + if (pTable->type == TSDB_CHILD_TABLE && rmFromIdx) { tsdbRemoveTableFromIndex(pMeta, pTable); } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 115a32567c734ff5182554ce8111a5eaa50e9f41..d3d890f36115faf65659bfd1670ee1c65a597732 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -545,9 +545,10 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL; - if (pCheckInfo->iter != NULL) { + if (pCheckInfo->iter != NULL && tSkipListIterGet(pCheckInfo->iter) != NULL) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); - SDataRow row = SL_GET_NODE_DATA(node); + + SDataRow row = SL_GET_NODE_DATA(node); k1 = dataRowKey(row); if (k1 == binfo.window.skey) { @@ -561,9 +562,10 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock } } - if (pCheckInfo->iiter != NULL) { + if (pCheckInfo->iiter != NULL && tSkipListIterGet(pCheckInfo->iiter) != NULL) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); - SDataRow row = SL_GET_NODE_DATA(node); + + SDataRow row = SL_GET_NODE_DATA(node); k2 = dataRowKey(row); if (k2 == binfo.window.skey) { @@ -583,6 +585,12 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa); } else { pQueryHandle->realNumOfRows = binfo.rows; + + cur->rows = binfo.rows; + cur->win = binfo.window; + cur->mixBlock = false; + cur->blockCompleted = true; + cur->lastKey = binfo.window.ekey + (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 1:-1); } } } else { //desc order @@ -914,7 +922,10 @@ static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, key, order); - + if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it + tSkipListIterNext(pCheckInfo->iter); + } + int32_t start = -1; if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { int32_t remain = end - pos + 1; diff --git a/tests/How-To-Run-Test-And-How-To-Add-New-Test-Case.md b/tests/How-To-Run-Test-And-How-To-Add-New-Test-Case.md index 7c42d47d1b9c95a863539e3dbe1f4b94abf6c753..b285fe815596a55d8e6689aeab1401f4388e94ec 100644 --- a/tests/How-To-Run-Test-And-How-To-Add-New-Test-Case.md +++ b/tests/How-To-Run-Test-And-How-To-Add-New-Test-Case.md @@ -11,6 +11,19 @@ 4. pip install src/connector/python/linux/python2 ; pip3 install src/connector/python/linux/python3 +> Note: Both Python2 and Python3 are currently supported by the Python test +> framework. Since Python2 is no longer officially supported by Python Software +> Foundation since January 1, 2020, it is recommended that subsequent test case +> development be guaranteed to run correctly on Python3. + +> For Python2, please consider being compatible if appropriate without +> additional burden. +> +> If you use some new Linux distribution like Ubuntu 20.04 which already do not +> include Python2, please do not install Python2-related packages. +> +>   + ### How to run Python test suite 1. cd \/tests/pytest @@ -211,13 +224,6 @@ def checkAffectedRows(self, expectAffectedRows): ... -> Note: Both Python2 and Python3 are currently supported by the Python test -> case. Since Python2 is no longer officially supported by January 1, 2020, it -> is recommended that subsequent test case development be guaranteed to run -> correctly on Python3. For Python2, please consider being compatible if -> appropriate without additional -> burden.   - ### CI submission adoption principle. - Every commit / PR compilation must pass. Currently, the warning is treated