提交 25f451ba 编写于 作者: H Haojun Liao

fix(tmq): fix result data block info.

上级 1a8a834a
......@@ -197,7 +197,7 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
return pRebSub;
}
static void doRemoveExistedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
const char *pSubKey = pOutput->pSub->key;
......@@ -339,7 +339,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
// 2. check and get actual removed consumers, put their vg into pHash
doRemoveExistedConsumers(pOutput, pHash, pInput);
doRemoveLostConsumers(pOutput, pHash, pInput);
// 3. if previously no consumer, there are vgs not assigned, put these vg into pHash
addUnassignedVgroups(pOutput, pHash);
......
......@@ -241,6 +241,7 @@ typedef struct STqReader {
SArray *pColIdList; // SArray<int16_t>
int32_t cachedSchemaVer;
int64_t cachedSchemaSuid;
int64_t cachedSchemaUid;
SSchemaWrapper *pSchemaWrapper;
SSDataBlock *pResBlock;
} STqReader;
......
......@@ -530,22 +530,27 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
pBlock->info.id.uid = uid;
pBlock->info.version = pReader->msg.ver;
if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion || pReader->cachedSchemaSuid != suid) {
if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) || (pReader->cachedSchemaVer != sversion)) {
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 "version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, suid, uid, pReader->cachedSchemaVer);
pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
pReader->cachedSchemaUid = uid;
pReader->cachedSchemaSuid = suid;
pReader->cachedSchemaVer = sversion;
SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
if (blockDataGetNumOfCols(pBlock) > 0) {
blockDataDestroy(pReader->pResBlock);
pReader->pResBlock = createDataBlock();
}
int32_t numOfCols = taosArrayGetSize(pReader->pColIdList);
if (numOfCols == 0) { // all columns are required
......@@ -671,8 +676,12 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
while (1) {
SColVal colVal;
tqDebug("start to extract column id:%d, index:%d", pColData->info.colId, sourceIdx);
tRowGet(pRow, pTSchema, sourceIdx, &colVal);
if (colVal.cid < pColData->info.colId) {
tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in schema:%d",
sourceIdx, colVal.cid, pColData->info.colId, pTSchema->numOfCols);
sourceIdx++;
continue;
} else if (colVal.cid == pColData->info.colId) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册