From 25f451ba819beb71f7a8f01c0b52e60502c3bc73 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 4 May 2023 23:34:35 +0800 Subject: [PATCH] fix(tmq): fix result data block info. --- source/dnode/mnode/impl/src/mndSubscribe.c | 4 ++-- source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/tq/tqRead.c | 15 ++++++++++++--- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 573c60549e..f4d6e27dea 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -197,7 +197,7 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { return pRebSub; } -static void doRemoveExistedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) { +static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) { int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers); const char *pSubKey = pOutput->pSub->key; @@ -339,7 +339,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); // 2. check and get actual removed consumers, put their vg into pHash - doRemoveExistedConsumers(pOutput, pHash, pInput); + doRemoveLostConsumers(pOutput, pHash, pInput); // 3. if previously no consumer, there are vgs not assigned, put these vg into pHash addUnassignedVgroups(pOutput, pHash); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 817e6fdae4..c7424cd233 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -241,6 +241,7 @@ typedef struct STqReader { SArray *pColIdList; // SArray int32_t cachedSchemaVer; int64_t cachedSchemaSuid; + int64_t cachedSchemaUid; SSchemaWrapper *pSchemaWrapper; SSDataBlock *pResBlock; } STqReader; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 3ee706cd39..8622216b28 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -530,22 +530,27 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa pBlock->info.id.uid = uid; pBlock->info.version = pReader->msg.ver; - if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion || pReader->cachedSchemaSuid != suid) { + if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) || (pReader->cachedSchemaVer != sversion)) { tDeleteSchemaWrapper(pReader->pSchemaWrapper); pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); if (pReader->pSchemaWrapper == NULL) { - tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", - pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer); + tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 "version %d, possibly dropped table", + pReader->pWalReader->pWal->cfg.vgId, suid, uid, pReader->cachedSchemaVer); pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; } + pReader->cachedSchemaUid = uid; pReader->cachedSchemaSuid = suid; pReader->cachedSchemaVer = sversion; SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper; + if (blockDataGetNumOfCols(pBlock) > 0) { + blockDataDestroy(pReader->pResBlock); + pReader->pResBlock = createDataBlock(); + } int32_t numOfCols = taosArrayGetSize(pReader->pColIdList); if (numOfCols == 0) { // all columns are required @@ -671,8 +676,12 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j); while (1) { SColVal colVal; + tqDebug("start to extract column id:%d, index:%d", pColData->info.colId, sourceIdx); + tRowGet(pRow, pTSchema, sourceIdx, &colVal); if (colVal.cid < pColData->info.colId) { + tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in schema:%d", + sourceIdx, colVal.cid, pColData->info.colId, pTSchema->numOfCols); sourceIdx++; continue; } else if (colVal.cid == pColData->info.colId) { -- GitLab