提交 2b803d8b 编写于 作者: H Haojun Liao

enh(query): set uid for the result datablock of stream scanner when handling the submit block.

上级 92300e95
......@@ -126,7 +126,7 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle);
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows,
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t* pUid, int32_t *pNumOfRows,
int16_t *pNumOfCols);
// need to reposition
......
......@@ -161,7 +161,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
tqReadHandleSetMsg(pReader, pReq, 0);
while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.rows,
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
&block.info.numOfCols) < 0) {
ASSERT(0);
}
......@@ -540,7 +540,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
tqReadHandleSetMsg(pReader, pCont, 0);
while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.rows,
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
&block.info.numOfCols) < 0) {
ASSERT(0);
}
......
......@@ -84,10 +84,12 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
return false;
}
int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, int32_t* pNumOfRows,
int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, uint64_t* pUid, int32_t* pNumOfRows,
int16_t* pNumOfCols) {
/*int32_t sversion = pHandle->pBlock->sversion;*/
// TODO set to real sversion
*pUid = 0;
int32_t sversion = 0;
if (pHandle->sver != sversion) {
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
......@@ -169,7 +171,10 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
tdSTSRowIterInit(&iter, pTschema);
STSRow* row;
int32_t curRow = 0;
tInitSubmitBlkIter(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter);
*pUid = pHandle->pBlock->uid; // set the uid of table for submit block
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
tdSTSRowIterReset(&iter, row);
// get all wanted col of that block
......
......@@ -539,10 +539,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
while (tqNextDataBlock(pInfo->readerHandle)) {
SArray* pCols = NULL;
uint64_t groupId;
int32_t numOfRows;
int16_t outputCol;
int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &numOfRows, &outputCol);
uint64_t groupId = 0;
uint64_t uid = 0;
int32_t numOfRows = 0;
int16_t outputCol = 0;
int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &uid, &numOfRows, &outputCol);
if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
pTaskInfo->code = code;
......@@ -551,6 +553,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pInfo->pRes->info.groupId = groupId;
pInfo->pRes->info.rows = numOfRows;
pInfo->pRes->info.uid = uid;
int32_t numOfCols = pInfo->pRes->info.numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
......@@ -606,10 +609,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
goto _error;
}
int32_t numOfOutput = taosArrayGetSize(pColList);
......@@ -626,16 +627,13 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
tqReadHandleSetColIdList((STqReadHandle*)streamReadHandle, pColIds);
int32_t code = tqReadHandleSetTbUidList(streamReadHandle, pTableIdList);
if (code != 0) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
goto _error;
}
pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
if (pInfo->pBlockLists == NULL) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
pInfo->readerHandle = streamReadHandle;
......@@ -647,7 +645,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->fpSet._openFn = operatorDummyOpenFn;
pOperator->fpSet.getNextFn = doStreamBlockScan;
pOperator->fpSet.closeFn = operatorDummyCloseFn;
......@@ -656,6 +654,11 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
return pOperator;
_error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
}
static void destroySysScanOperator(void* param, int32_t numOfOutput) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册