未验证 提交 501bec87 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #14261 from taosdata/feature/stream

enh(stream): refine tqRetrieveDataBlock api
...@@ -199,10 +199,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { ...@@ -199,10 +199,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
return pResInfo->userFields; return pResInfo->userFields;
} }
TAOS_RES *taos_query(TAOS *taos, const char *sql) { return taosQueryImpl(taos, sql, false); }
TAOS_RES *taos_query(TAOS *taos, const char *sql) {
return taosQueryImpl(taos, sql, false);
}
TAOS_ROW taos_fetch_row(TAOS_RES *res) { TAOS_ROW taos_fetch_row(TAOS_RES *res) {
if (res == NULL) { if (res == NULL) {
...@@ -594,7 +591,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { ...@@ -594,7 +591,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
} }
int taos_validate_sql(TAOS *taos, const char *sql) { int taos_validate_sql(TAOS *taos, const char *sql) {
TAOS_RES* pObj = taosQueryImpl(taos, sql, true); TAOS_RES *pObj = taosQueryImpl(taos, sql, true);
int code = taos_errno(pObj); int code = taos_errno(pObj);
...@@ -911,13 +908,13 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -911,13 +908,13 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
goto _return; goto _return;
} }
SCatalog* pCtg = NULL; SCatalog *pCtg = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg); code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _return; goto _return;
} }
char* sql = "taos_load_table_info"; char *sql = "taos_load_table_info";
code = buildRequest(pTscObj, sql, strlen(sql), &pRequest); code = buildRequest(pTscObj, sql, strlen(sql), &pRequest);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
...@@ -928,9 +925,8 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -928,9 +925,8 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
tsem_init(&param.sem, 0, 0); tsem_init(&param.sem, 0, 0);
param.pRequest = pRequest; param.pRequest = pRequest;
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {
.requestId = pRequest->requestId, .pTrans = pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
.requestObjRefId = pRequest->self};
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
...@@ -951,7 +947,6 @@ _return: ...@@ -951,7 +947,6 @@ _return:
return code; return code;
} }
TAOS_STMT *taos_stmt_init(TAOS *taos) { TAOS_STMT *taos_stmt_init(TAOS *taos) {
STscObj *pObj = acquireTscObj(*(int64_t *)taos); STscObj *pObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pObj) { if (NULL == pObj) {
......
...@@ -1164,7 +1164,7 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) ...@@ -1164,7 +1164,7 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows)
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
int32_t code = 0; int32_t code = 0;
//ASSERT(numOfRows > 0); // ASSERT(numOfRows > 0);
if (numOfRows == 0) { if (numOfRows == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1662,7 +1662,8 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) ...@@ -1662,7 +1662,8 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
int32_t len = 0; int32_t len = 0;
len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId); len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|\n", flag,
(int32_t)pDataBlock->info.type, pDataBlock->info.childId);
for (int32_t j = 0; j < rows; j++) { for (int32_t j = 0; j < rows; j++) {
len += snprintf(dumpBuf + len, size - len, "%s |", flag); len += snprintf(dumpBuf + len, size - len, "%s |", flag);
for (int32_t k = 0; k < colNum; k++) { for (int32_t k = 0; k < colNum; k++) {
......
...@@ -116,7 +116,7 @@ typedef void *tsdbReaderT; ...@@ -116,7 +116,7 @@ typedef void *tsdbReaderT;
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3 #define BLOCK_LOAD_TABLE_RR_ORDER 3
int32_t tsdbSetTableList(tsdbReaderT reader, SArray* tableList); int32_t tsdbSetTableList(tsdbReaderT reader, SArray *tableList);
tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *tableList, uint64_t qId, tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *tableList, uint64_t qId,
uint64_t taskId); uint64_t taskId);
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId, tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId,
...@@ -150,8 +150,7 @@ int32_t tqReadHandleRemoveTbUidList(STqReadHandle *pHandle, const SArray *tbUidL ...@@ -150,8 +150,7 @@ int32_t tqReadHandleRemoveTbUidList(STqReadHandle *pHandle, const SArray *tbUidL
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle); bool tqNextDataBlock(STqReadHandle *pHandle);
bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids); bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t *pUid, int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReadHandle *pHandle);
int32_t *pNumOfRows);
// sma // sma
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
......
...@@ -112,7 +112,7 @@ int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkR ...@@ -112,7 +112,7 @@ int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkR
tqReadHandleSetMsg(pReader, pReq, 0); tqReadHandleSetMsg(pReader, pReq, 0);
while (tqNextDataBlock(pReader)) { while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block, pReader, &block.info.groupId, &block.info.uid, &block.info.rows) < 0) { if (tqRetrieveDataBlock(&block, pReader) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
ASSERT(0); ASSERT(0);
} }
...@@ -129,7 +129,7 @@ int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkR ...@@ -129,7 +129,7 @@ int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkR
tqReadHandleSetMsg(pReader, pReq, 0); tqReadHandleSetMsg(pReader, pReq, 0);
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block, pReader, &block.info.groupId, &block.info.uid, &block.info.rows) < 0) { if (tqRetrieveDataBlock(&block, pReader) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
ASSERT(0); ASSERT(0);
} }
......
...@@ -146,10 +146,7 @@ bool tqNextDataBlockFilterOut(STqReadHandle* pHandle, SHashObj* filterOutUids) { ...@@ -146,10 +146,7 @@ bool tqNextDataBlockFilterOut(STqReadHandle* pHandle, SHashObj* filterOutUids) {
return false; return false;
} }
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_t* pGroupId, uint64_t* pUid, int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle) {
int32_t* pNumOfRows) {
*pUid = 0;
// TODO: cache multiple schema // TODO: cache multiple schema
int32_t sversion = htonl(pHandle->pBlock->sversion); int32_t sversion = htonl(pHandle->pBlock->sversion);
if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion || if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion ||
...@@ -180,7 +177,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_ ...@@ -180,7 +177,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_
STSchema* pTschema = pHandle->pSchema; STSchema* pTschema = pHandle->pSchema;
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper; SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
*pNumOfRows = pHandle->msgIter.numOfRows;
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
if (colNumNeed == 0) { if (colNumNeed == 0) {
...@@ -221,22 +217,22 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_ ...@@ -221,22 +217,22 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_
} }
} }
if (blockDataEnsureCapacity(pBlock, *pNumOfRows) < 0) { if (blockDataEnsureCapacity(pBlock, pHandle->msgIter.numOfRows) < 0) {
goto FAIL; goto FAIL;
} }
int32_t colActual = blockDataGetNumOfCols(pBlock); int32_t colActual = blockDataGetNumOfCols(pBlock);
// TODO in stream shuffle case, fetch groupId
*pGroupId = 0;
STSRowIter iter = {0}; STSRowIter iter = {0};
tdSTSRowIterInit(&iter, pTschema); tdSTSRowIterInit(&iter, pTschema);
STSRow* row; STSRow* row;
int32_t curRow = 0; int32_t curRow = 0;
tInitSubmitBlkIter(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter); tInitSubmitBlkIter(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter);
*pUid = pHandle->msgIter.uid; // set the uid of table for submit block
pBlock->info.groupId = 0;
pBlock->info.uid = pHandle->msgIter.uid; // set the uid of table for submit block
pBlock->info.rows = pHandle->msgIter.numOfRows;
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
tdSTSRowIterReset(&iter, row); tdSTSRowIterReset(&iter, row);
......
...@@ -507,20 +507,21 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -507,20 +507,21 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
STableScanInfo* pInfo = pOperator->info; STableScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if(pInfo->currentGroupId == -1){ if (pInfo->currentGroupId == -1) {
pInfo->currentGroupId++; pInfo->currentGroupId++;
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
setTaskStatus(pTaskInfo, TASK_COMPLETED); setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL; return NULL;
} }
SArray *tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId); SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
tsdbCleanupReadHandle(pInfo->dataReader); tsdbCleanupReadHandle(pInfo->dataReader);
tsdbReaderT* pReader = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, pInfo->queryId, pInfo->taskId); tsdbReaderT* pReader =
tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, pInfo->queryId, pInfo->taskId);
pInfo->dataReader = pReader; pInfo->dataReader = pReader;
} }
SSDataBlock* result = doTableScanGroup(pOperator); SSDataBlock* result = doTableScanGroup(pOperator);
if(result){ if (result) {
return result; return result;
} }
...@@ -530,7 +531,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -530,7 +531,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
SArray *tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId); SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
tsdbSetTableList(pInfo->dataReader, tableList); tsdbSetTableList(pInfo->dataReader, tableList);
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0); tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
...@@ -538,7 +539,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -538,7 +539,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
pInfo->scanTimes = 0; pInfo->scanTimes = 0;
result = doTableScanGroup(pOperator); result = doTableScanGroup(pOperator);
if(result){ if (result) {
return result; return result;
} }
...@@ -794,13 +795,12 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3 ...@@ -794,13 +795,12 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
SResultWindowInfo* pCurWin = SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup, tsCols[(*pRowIndex)], INT64_MIN, pSDB->info.groupId, gap, &winIndex); getSessionTimeWindow(pAggSup, tsCols[(*pRowIndex)], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
win = pCurWin->win; win = pCurWin->win;
(*pRowIndex) += (*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, (*pRowIndex), gap, NULL);
updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, (*pRowIndex), gap, NULL);
} else { } else {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[(*pRowIndex)], &pInfo->interval, win =
pInfo->interval.precision, NULL); getActiveTimeWindow(NULL, &dumyInfo, tsCols[(*pRowIndex)], &pInfo->interval, pInfo->interval.precision, NULL);
(*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, (*pRowIndex), win.ekey, (*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, (*pRowIndex), win.ekey, binarySearchForKey, NULL,
binarySearchForKey, NULL, TSDB_ORDER_ASC); TSDB_ORDER_ASC);
} }
needRead = true; needRead = true;
} else if (isStateWindow(pInfo)) { } else if (isStateWindow(pInfo)) {
...@@ -821,7 +821,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3 ...@@ -821,7 +821,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info; STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
pTableScanInfo->cond.twindows[0] = win; pTableScanInfo->cond.twindows[0] = win;
pTableScanInfo->curTWinIdx = 0; pTableScanInfo->curTWinIdx = 0;
// tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); // tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
// if (!pTableScanInfo->dataReader) { // if (!pTableScanInfo->dataReader) {
// return false; // return false;
// } // }
...@@ -1033,12 +1033,13 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -1033,12 +1033,13 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
while (tqNextDataBlock(pInfo->streamBlockReader)) { while (tqNextDataBlock(pInfo->streamBlockReader)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
uint64_t groupId = 0;
uint64_t uid = 0;
int32_t numOfRows = 0;
// todo refactor // todo refactor
int32_t code = tqRetrieveDataBlock(&block, pInfo->streamBlockReader, &groupId, &uid, &numOfRows); int32_t code = tqRetrieveDataBlock(&block, pInfo->streamBlockReader);
uint64_t groupId = block.info.groupId;
uint64_t uid = block.info.uid;
int32_t numOfRows = block.info.rows;
if (code != TSDB_CODE_SUCCESS || numOfRows == 0) { if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
pTaskInfo->code = code; pTaskInfo->code = code;
...@@ -1154,9 +1155,9 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { ...@@ -1154,9 +1155,9 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
return tableIdList; return tableIdList;
} }
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId,
STimeWindowAggSupp* pTwSup, uint64_t queryId, uint64_t taskId) { uint64_t taskId) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
...@@ -1746,7 +1747,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan ...@@ -1746,7 +1747,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID); SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
pInfo->accountId = pScanPhyNode->accountId; pInfo->accountId = pScanPhyNode->accountId;
pInfo->pUser = taosMemoryStrDup((void*) pUser); pInfo->pUser = taosMemoryStrDup((void*)pUser);
pInfo->showRewrite = pScanPhyNode->showRewrite; pInfo->showRewrite = pScanPhyNode->showRewrite;
pInfo->pRes = pResBlock; pInfo->pRes = pResBlock;
pInfo->pCondition = pScanNode->node.pConditions; pInfo->pCondition = pScanNode->node.pConditions;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册