提交 6cbedc5d 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 40ffe0cd
......@@ -503,6 +503,55 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap
return 0;
}
static int32_t buildResSDataBlock(SSDataBlock* pBlock, SSchemaWrapper* pSchema, const SArray* pColIdList) {
if (blockDataGetNumOfCols(pBlock) > 0) {
return TSDB_CODE_SUCCESS;
}
int32_t numOfCols = taosArrayGetSize(pColIdList);
if (numOfCols == 0) { // all columns are required
for (int32_t i = 0; i < pSchema->nCols; ++i) {
SSchema* pColSchema = &pSchema->pSchema[i];
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
blockDataFreeRes(pBlock);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
} else {
if (numOfCols > pSchema->nCols) {
numOfCols = pSchema->nCols;
}
int32_t i = 0;
int32_t j = 0;
while (i < pSchema->nCols && j < numOfCols) {
SSchema* pColSchema = &pSchema->pSchema[i];
col_id_t colIdSchema = pColSchema->colId;
col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pColIdList, j);
if (colIdSchema < colIdNeed) {
i++;
} else if (colIdSchema > colIdNeed) {
j++;
} else {
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
return -1;
}
i++;
j++;
}
}
}
return TSDB_CODE_SUCCESS;
}
int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) {
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
......@@ -534,57 +583,16 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet
return -1;
}
ASSERT(pReader->cachedSchemaVer == pReader->pSchemaWrapper->version);
pReader->cachedSchemaUid = uid;
pReader->cachedSchemaSuid = suid;
pReader->cachedSchemaVer = sversion;
SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
if (blockDataGetNumOfCols(pBlock) > 0) {
blockDataDestroy(pReader->pResBlock);
pReader->pResBlock = createDataBlock();
pBlock = pReader->pResBlock;
pBlock->info.id.uid = uid;
pBlock->info.version = pReader->msg.ver;
}
int32_t numOfCols = taosArrayGetSize(pReader->pColIdList);
if (numOfCols == 0) { // all columns are required
for (int32_t i = 0; i < pSchemaWrapper->nCols; ++i) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[i];
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
blockDataFreeRes(pBlock);
return -1;
}
}
} else {
if (numOfCols > pSchemaWrapper->nCols) {
numOfCols = pSchemaWrapper->nCols;
}
int32_t i = 0;
int32_t j = 0;
while (i < pSchemaWrapper->nCols && j < numOfCols) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[i];
col_id_t colIdSchema = pColSchema->colId;
col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, j);
if (colIdSchema < colIdNeed) {
i++;
} else if (colIdSchema > colIdNeed) {
j++;
} else {
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
return -1;
}
i++;
j++;
}
if (blockDataGetNumOfCols(pBlock) == 0) {
int32_t code = buildResSDataBlock(pReader->pResBlock, pReader->pSchemaWrapper, pReader->pColIdList);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册