diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 573c60549eb7eed1864ad44145199d4ca24b0ce2..f4d6e27dea7f07d7570b02ed5479d1375896910c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -197,7 +197,7 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { return pRebSub; } -static void doRemoveExistedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) { +static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) { int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers); const char *pSubKey = pOutput->pSub->key; @@ -339,7 +339,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); // 2. check and get actual removed consumers, put their vg into pHash - doRemoveExistedConsumers(pOutput, pHash, pInput); + doRemoveLostConsumers(pOutput, pHash, pInput); // 3. if previously no consumer, there are vgs not assigned, put these vg into pHash addUnassignedVgroups(pOutput, pHash); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 817e6fdae4e9b2d430436cb2628fe5dcfb37ace2..c7424cd2330f094f385e71bb1fc8dba185a4f4dd 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -241,6 +241,7 @@ typedef struct STqReader { SArray *pColIdList; // SArray int32_t cachedSchemaVer; int64_t cachedSchemaSuid; + int64_t cachedSchemaUid; SSchemaWrapper *pSchemaWrapper; SSDataBlock *pResBlock; } STqReader; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 3ee706cd3982215d4a1711fec563f38164fd4cfe..8622216b28ddb135943504926ff06401e4cd60db 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -530,22 +530,27 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa pBlock->info.id.uid = uid; pBlock->info.version = pReader->msg.ver; - if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion || pReader->cachedSchemaSuid != suid) { + if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) || (pReader->cachedSchemaVer != sversion)) { tDeleteSchemaWrapper(pReader->pSchemaWrapper); pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); if (pReader->pSchemaWrapper == NULL) { - tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", - pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer); + tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 "version %d, possibly dropped table", + pReader->pWalReader->pWal->cfg.vgId, suid, uid, pReader->cachedSchemaVer); pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; } + pReader->cachedSchemaUid = uid; pReader->cachedSchemaSuid = suid; pReader->cachedSchemaVer = sversion; SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper; + if (blockDataGetNumOfCols(pBlock) > 0) { + blockDataDestroy(pReader->pResBlock); + pReader->pResBlock = createDataBlock(); + } int32_t numOfCols = taosArrayGetSize(pReader->pColIdList); if (numOfCols == 0) { // all columns are required @@ -671,8 +676,12 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j); while (1) { SColVal colVal; + tqDebug("start to extract column id:%d, index:%d", pColData->info.colId, sourceIdx); + tRowGet(pRow, pTSchema, sourceIdx, &colVal); if (colVal.cid < pColData->info.colId) { + tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in schema:%d", + sourceIdx, colVal.cid, pColData->info.colId, pTSchema->numOfCols); sourceIdx++; continue; } else if (colVal.cid == pColData->info.colId) {