diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index fe38225ac8599fb650b3c47a5e39f4a245bf3f09..c4396387b199f20553aadb83f7ece4c69f6dd33f 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -122,7 +122,7 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const return 0; } -int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) { +static int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) { if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { return TSDB_CODE_SUCCESS; } @@ -1677,101 +1677,6 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { return buf; } -#if 0 -void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag) { - SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock*)); - taosArrayPush(dataBlocks, &pBlock); - blockDebugShowDataBlocks(dataBlocks, flag); - taosArrayDestroy(dataBlocks); -} - -void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) { - char pBuf[128] = {0}; - int32_t sz = taosArrayGetSize(dataBlocks); - for (int32_t i = 0; i < sz; i++) { - SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i); - size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); - - int32_t rows = pDataBlock->info.rows; - printf("%s |block ver %" PRIi64 " |block type %d |child id %d|group id %" PRIu64 "\n", flag, - pDataBlock->info.version, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, - pDataBlock->info.id.groupId); - for (int32_t j = 0; j < rows; j++) { - printf("%s |", flag); - for (int32_t k = 0; k < numOfCols; k++) { - SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); - void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); - if (k == 0) { - printf("cols:%d |", (int32_t)numOfCols); - } - if (colDataIsNull(pColInfoData, rows, j, NULL)) { - printf(" %15s |", "NULL"); - continue; - } - - switch (pColInfoData->info.type) { - case TSDB_DATA_TYPE_TIMESTAMP: - formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI); - printf(" %25s |", pBuf); - break; - case TSDB_DATA_TYPE_BOOL: - printf(" %15" PRIi8 " |", *(int8_t*)var); - break; - case TSDB_DATA_TYPE_TINYINT: - printf(" %15" PRIi8 " |", *(int8_t*)var); - break; - case TSDB_DATA_TYPE_SMALLINT: - printf(" %15" PRIi16 " |", *(int16_t*)var); - break; - case TSDB_DATA_TYPE_INT: - printf(" %15d |", *(int32_t*)var); - break; - case TSDB_DATA_TYPE_UTINYINT: - printf(" %15" PRIu8 " |", *(uint8_t*)var); - break; - case TSDB_DATA_TYPE_USMALLINT: - printf(" %15" PRIu16 " |", *(uint16_t*)var); - break; - case TSDB_DATA_TYPE_UINT: - printf(" %15u |", *(uint32_t*)var); - break; - case TSDB_DATA_TYPE_BIGINT: - printf(" %15" PRId64 " |", *(int64_t*)var); - break; - case TSDB_DATA_TYPE_UBIGINT: - printf(" %15" PRIu64 " |", *(uint64_t*)var); - break; - case TSDB_DATA_TYPE_FLOAT: - printf(" %15f |", *(float*)var); - break; - case TSDB_DATA_TYPE_DOUBLE: - printf(" %15lf |", *(double*)var); - break; - case TSDB_DATA_TYPE_VARCHAR: - case TSDB_DATA_TYPE_GEOMETRY: { - char* pData = colDataGetVarData(pColInfoData, j); - int32_t dataSize = TMIN(sizeof(pBuf) - 1, varDataLen(pData)); - memset(pBuf, 0, dataSize + 1); - strncpy(pBuf, varDataVal(pData), dataSize); - printf(" %15s |", pBuf); - } break; - case TSDB_DATA_TYPE_NCHAR: { - char* pData = colDataGetVarData(pColInfoData, j); - int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData)); - memset(pBuf, 0, dataSize); - (void)taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf); - printf(" %15s |", pBuf); - } break; - default: - break; - } - } - printf("\n"); - } - } -} -#endif - // for debug char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) { int32_t size = 2048*1024; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index da05e950ce1323d083099eb128004bf80784ca07..18291ac70f67f44ff2f207b4ef55efc7e173dfaf 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1045,12 +1045,21 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms pRange->window.ekey = INT64_MAX; pRange->range.minVer = 0; pRange->range.maxVer = ver; + + tqDebug("s-task:%s fill-history task exists, update stream time window:%" PRId64 " - %" PRId64 + ", ver range:%" PRId64 " - %" PRId64, + pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); + } else { + SHistDataRange* pRange = &pTask->dataRange; + tqDebug("s-task:%s no associated task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64 + " - %" PRId64, + pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); } streamTaskCheckDownstreamTasks(pTask); } - tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr, + tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, numOfTasks:%d", vgId, pTask->id.idStr, pTask->status.taskStatus, numOfTasks); return 0; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4434a29870550e078e2f12c90290de11575d1ebd..12e56800aa8a23691bf95a28b96363fa6575b2df 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2093,16 +2093,24 @@ FETCH_NEXT_BLOCK: { // do additional time window filter STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow; - if (pWindow->skey != 0) { + if (pWindow->skey != INT64_MIN) { bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); + bool hasUnqualified = false; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); for(int32_t i = 0; i < pBlock->info.rows; ++i) { int64_t* ts = (int64_t*) colDataGetData(pCol, i); p[i] = (*ts >= pWindow->skey); + + if (!p[i]) { + hasUnqualified = true; + } + } + + if (hasUnqualified) { + trimDataBlock(pBlock, pBlock->info.rows, p); } - trimDataBlock(pBlock, pBlock->info.rows, p); taosMemoryFree(p); } } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index ec52e6fbe8e7344796d5ceb066ab8a3248771418..f1e43df23006d6b2ecbb005aa55c0c1443787fef 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -321,8 +321,8 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { pHTask->dataRange.range.minVer = 0; pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer; - qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 "-%" PRId64 - " verrange:%" PRId64 "-%" PRId64, + qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 " - %" PRId64 + " ver range:%" PRId64 " - %" PRId64, pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey, pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);