提交 f1bd8299 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 27b7d1ec
...@@ -200,7 +200,7 @@ STqReader *tqReaderOpen(SVnode *pVnode); ...@@ -200,7 +200,7 @@ STqReader *tqReaderOpen(SVnode *pVnode);
void tqReaderClose(STqReader *); void tqReaderClose(STqReader *);
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList, const char* id);
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
......
...@@ -708,8 +708,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -708,8 +708,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid); tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
} }
pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList); tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList, NULL);
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
......
...@@ -344,8 +344,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { ...@@ -344,8 +344,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid); tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
} }
handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode); handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList); tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList, NULL);
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
buildSnapContext(reader.vnode, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, buildSnapContext(reader.vnode, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
......
...@@ -394,8 +394,8 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) { ...@@ -394,8 +394,8 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) { if (pReader->tbIdHash == NULL) {
SSDataBlock* pRes = NULL; SSDataBlock* pRes = NULL;
int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL); int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) { if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) {
return true; return true;
} }
...@@ -457,7 +457,7 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { ...@@ -457,7 +457,7 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData); int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < numOfBlocks) { while (pReader->nextBlk < numOfBlocks) {
tqDebug("tq reader next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver, tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
pReader->nextBlk, numOfBlocks, idstr); pReader->nextBlk, numOfBlocks, idstr);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
...@@ -467,10 +467,11 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { ...@@ -467,10 +467,11 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
if (ret != NULL) { if (ret != NULL) {
tqDebug("tq reader block found, ver:%" PRId64 ", uid:%" PRId64, pReader->msg.ver, pSubmitTbData->uid); tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64", %s", pReader->msg.ver, pSubmitTbData->uid, idstr);
return true; return true;
} else { } else {
tqDebug("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid); tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
taosHashGetSize(pReader->tbIdHash), idstr);
} }
pReader->nextBlk++; pReader->nextBlk++;
...@@ -604,7 +605,6 @@ static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SCol ...@@ -604,7 +605,6 @@ static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SCol
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) { int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk); tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
SSDataBlock* pBlock = pReader->pResBlock; SSDataBlock* pBlock = pReader->pResBlock;
...@@ -612,6 +612,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* ...@@ -612,6 +612,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
blockDataCleanup(pBlock); blockDataCleanup(pBlock);
int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
int32_t sversion = pSubmitTbData->sver; int32_t sversion = pSubmitTbData->sver;
int64_t suid = pSubmitTbData->suid; int64_t suid = pSubmitTbData->suid;
int64_t uid = pSubmitTbData->uid; int64_t uid = pSubmitTbData->uid;
...@@ -628,7 +629,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* ...@@ -628,7 +629,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
if (pReader->pSchemaWrapper == NULL) { if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
"version %d, possibly dropped table", "version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, suid, uid, pReader->cachedSchemaVer); vgId, suid, uid, pReader->cachedSchemaVer);
pReader->cachedSchemaSuid = 0; pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1; return -1;
...@@ -642,6 +643,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* ...@@ -642,6 +643,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
if (blockDataGetNumOfCols(pBlock) == 0) { if (blockDataGetNumOfCols(pBlock) == 0) {
int32_t code = buildResSDataBlock(pReader->pResBlock, pReader->pSchemaWrapper, pReader->pColIdList); int32_t code = buildResSDataBlock(pReader->pResBlock, pReader->pSchemaWrapper, pReader->pColIdList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d failed to build data block, code:%s", vgId, tstrerror(code));
return code; return code;
} }
} }
...@@ -998,7 +1000,7 @@ FAIL: ...@@ -998,7 +1000,7 @@ FAIL:
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; } void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; }
int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) { int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
if (pReader->tbIdHash) { if (pReader->tbIdHash) {
taosHashClear(pReader->tbIdHash); taosHashClear(pReader->tbIdHash);
} else { } else {
...@@ -1015,6 +1017,7 @@ int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) { ...@@ -1015,6 +1017,7 @@ int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) {
taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0); taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
} }
tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t) taosArrayGetSize(tbUidList));
return 0; return 0;
} }
......
...@@ -401,7 +401,7 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI ...@@ -401,7 +401,7 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI
int32_t code = 0; int32_t code = 0;
if (isAdd) { if (isAdd) {
qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), id); qDebug("try to add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), id);
} }
// traverse to the stream scanner node to add this table id // traverse to the stream scanner node to add this table id
......
...@@ -2308,7 +2308,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2308,7 +2308,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
SArray* pColIds = NULL; SArray* pColIds = NULL;
SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo)); SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
const char* idstr = pTaskInfo->id.str;
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -2419,7 +2420,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2419,7 +2420,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
// set the extract column id to streamHandle // set the extract column id to streamHandle
pAPI->tqReaderFn.tqReaderSetColIdList(pInfo->tqReader, pColIds); pAPI->tqReaderFn.tqReaderSetColIdList(pInfo->tqReader, pColIds);
SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo); SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo);
code = pAPI->tqReaderFn.tqReaderSetQueryTableList(pInfo->tqReader, tableIdList); code = pAPI->tqReaderFn.tqReaderSetQueryTableList(pInfo->tqReader, tableIdList, idstr);
if (code != 0) { if (code != 0) {
taosArrayDestroy(tableIdList); taosArrayDestroy(tableIdList);
goto _error; goto _error;
......
...@@ -318,7 +318,7 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in ...@@ -318,7 +318,7 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in
msg.pCont = buf; msg.pCont = buf;
msg.msgType = pTask->dispatchMsgType; msg.msgType = pTask->dispatchMsgType;
qDebug("dispatch from s-task:%s to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId);
tmsgSendReq(pEpSet, &msg); tmsgSendReq(pEpSet, &msg);
code = 0; code = 0;
......
...@@ -133,7 +133,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i ...@@ -133,7 +133,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
taosArrayPush(pRes, &block); taosArrayPush(pRes, &block);
qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr, qDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
pTask->selfChildId, numOfBlocks, size / 1048576.0); pTask->selfChildId, numOfBlocks, size / 1048576.0);
// current output should be dispatched to down stream nodes // current output should be dispatched to down stream nodes
...@@ -371,7 +371,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -371,7 +371,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
streamQueueProcessSuccess(pTask->inputQueue); streamQueueProcessSuccess(pTask->inputQueue);
if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) { if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
qDebug("s-task:%s maximum batch limit:%d reached, processing this batch of blocks", id, qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id,
MAX_STREAM_EXEC_BATCH_NUM); MAX_STREAM_EXEC_BATCH_NUM);
break; break;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册