未验证 提交 5fb25473 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18013 from taosdata/feature/stream

fix(stream): partition tbname reset to null
......@@ -899,7 +899,10 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
void* pData = colDataGetVarData(pCol, 0);
// TODO check tbname validity
if (pData != (void*)-1) {
memcpy(pDest->info.parTbName, varDataVal(pData), varDataLen(pData));
memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN);
memcpy(pDest->info.parTbName, varDataVal(pData), len);
/*pDest->info.parTbName[len + 1] = 0;*/
} else {
pDest->info.parTbName[0] = 0;
}
......
......@@ -284,7 +284,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo*
}
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
int32_t numOfRows) {
int32_t numOfRows) {
if (pColsAgg == NULL || pFilterInfo == NULL) {
return true;
}
......@@ -345,7 +345,7 @@ static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlo
// todo handle the slimit info
void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
SLimit* pLimit = &pLimitInfo->limit;
SLimit* pLimit = &pLimitInfo->limit;
const char* id = GET_TASKID(pTaskInfo);
if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) {
......@@ -499,7 +499,7 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
typedef struct STableCachedVal {
const char* pName;
STag* pTags;
STag* pTags;
} STableCachedVal;
static void freeTableCachedVal(void* param) {
......@@ -513,13 +513,11 @@ static void freeTableCachedVal(void* param) {
taosMemoryFree(pVal);
}
//const void *key, size_t keyLen, void *value
static void freeCachedMetaItem(const void *key, size_t keyLen, void *value) {
freeTableCachedVal(value);
}
// const void *key, size_t keyLen, void *value
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value) { freeTableCachedVal(value); }
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr,
SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
// currently only the tbname pseudo column
if (numOfExpr <= 0) {
return TSDB_CODE_SUCCESS;
......@@ -531,11 +529,11 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
int32_t backupRows = pBlock->info.rows;
pBlock->info.rows = rows;
bool freeReader = false;
bool freeReader = false;
STableCachedVal val = {0};
SMetaReader mr = {0};
LRUHandle* h = NULL;
LRUHandle* h = NULL;
// 1. check if it is existed in meta cache
if (pCache == NULL) {
......@@ -582,7 +580,8 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
val = *pVal;
freeReader = true;
int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.uid, sizeof(uint64_t), pVal, sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.uid, sizeof(uint64_t), pVal,
sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
if (ret != TAOS_LRU_STATUS_OK) {
qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
freeTableCachedVal(pVal);
......@@ -594,13 +593,13 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
}
qDebug("retrieve table meta from cache:%"PRIu64", hit:%"PRIu64 " miss:%"PRIu64", %s", pCache->metaFetch, pCache->cacheHit,
(pCache->metaFetch - pCache->cacheHit), idStr);
qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,
pCache->cacheHit, (pCache->metaFetch - pCache->cacheHit), idStr);
}
for (int32_t j = 0; j < numOfExpr; ++j) {
const SExprInfo* pExpr1 = &pExpr[j];
int32_t dstSlotId = pExpr1->base.resSchema.slotId;
int32_t dstSlotId = pExpr1->base.resSchema.slotId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
colInfoDataCleanup(pColInfoData, pBlock->info.rows);
......@@ -652,7 +651,7 @@ void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData,
fmGetScalarFuncExecFuncs(functionId, &fpSet);
size_t len = TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE;
char buf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char buf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(buf, name)
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, len, 1);
......@@ -904,12 +903,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
goto _error;
}
SScanPhysiNode* pScanNode = &pTableScanNode->scan;
SScanPhysiNode* pScanNode = &pTableScanNode->scan;
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
int32_t numOfCols = 0;
int32_t code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
&pInfo->matchInfo);
int32_t code =
extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -950,7 +949,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols;
pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024*128, -1, .5);
pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
if (pInfo->metaCache.pTableMetaEntryCache == NULL) {
code = terrno;
goto _error;
......@@ -1038,8 +1037,8 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN};
int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, (int32_t*)&blockDistInfo.rowSize,
GET_TASKID(pTaskInfo));
int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid,
(int32_t*)&blockDistInfo.rowSize, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -1581,8 +1580,10 @@ static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) {
void* pData = colDataGetData(pCol, 0);
// TODO check tbname validation
if (pData != (void*)-1 && pData != NULL) {
memcpy(pBlock->info.parTbName, varDataVal(pData), TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN));
pBlock->info.parTbName[TSDB_TABLE_NAME_LEN - 1] = 0;
memset(pBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN);
memcpy(pBlock->info.parTbName, varDataVal(pData), len);
/*pBlock->info.parTbName[len + 1] = 0;*/
} else {
pBlock->info.parTbName[0] = 0;
}
......@@ -4288,7 +4289,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
int32_t numOfExprs = 0;
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -4417,7 +4418,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
T_LONG_JMP(pTaskInfo->env, code);
}
if (pOperator->exprSupp.pFilterInfo!= NULL) {
if (pOperator->exprSupp.pFilterInfo != NULL) {
int64_t st = taosGetTimestampMs();
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);
......@@ -4793,7 +4794,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->sample.seed = taosGetTimestampSec();
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册