提交 fafdfc73 编写于 作者: dengyihao's avatar dengyihao

fix TD-19686

上级 d53ab62f
......@@ -30,10 +30,10 @@
// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
struct STableListInfo {
bool oneTableForEachGroup;
int32_t numOfOuputGroups; // the data block will be generated one by one
int32_t* groupOffset; // keep the offset value for each group in the tableList
int32_t numOfOuputGroups; // the data block will be generated one by one
int32_t* groupOffset; // keep the offset value for each group in the tableList
SArray* pTableList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
uint64_t suid;
};
......@@ -1678,9 +1678,7 @@ uint64_t tableListGetSize(const STableListInfo* pTableList) {
return taosArrayGetSize(pTableList->pTableList);
}
uint64_t tableListGetSuid(const STableListInfo* pTableList) {
return pTableList->suid;
}
uint64_t tableListGetSuid(const STableListInfo* pTableList) { return pTableList->suid; }
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) {
if (taosArrayGetSize(pTableList->pTableList) == 0) {
......@@ -1718,7 +1716,7 @@ int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t
}
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
int32_t* size) {
int32_t* size) {
int32_t total = tableListGetOutputGroups(pTableList);
if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) {
return TSDB_CODE_INVALID_PARA;
......@@ -1728,7 +1726,7 @@ int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalG
// 1. only one group exists, and 2. one table exists for each group.
if (total == 1) {
*size = tableListGetSize(pTableList);
*pKeyInfo = (*size == 0)? NULL:taosArrayGet(pTableList->pTableList, 0);
*pKeyInfo = (*size == 0) ? NULL : taosArrayGet(pTableList->pTableList, 0);
return TSDB_CODE_SUCCESS;
} else if (total == tableListGetSize(pTableList)) {
*size = 1;
......@@ -1806,13 +1804,13 @@ void tableListClear(STableListInfo* pTableListInfo) {
}
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
STableKeyInfo* pInfo1 = (STableKeyInfo*) p1;
STableKeyInfo* pInfo2 = (STableKeyInfo*) p2;
STableKeyInfo* pInfo1 = (STableKeyInfo*)p1;
STableKeyInfo* pInfo2 = (STableKeyInfo*)p2;
if (pInfo1->groupId == pInfo2->groupId) {
return 0;
} else {
return pInfo1->groupId < pInfo2->groupId? -1:1;
return pInfo1->groupId < pInfo2->groupId ? -1 : 1;
}
}
......@@ -1825,12 +1823,12 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
SArray* pList = taosArrayInit(4, sizeof(int32_t));
STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
uint64_t gid = pInfo->groupId;
uint64_t gid = pInfo->groupId;
int32_t start = 0;
taosArrayPush(pList, &start);
for(int32_t i = 1; i < size; ++i) {
for (int32_t i = 1; i < size; ++i) {
pInfo = taosArrayGet(pTableListInfo->pTableList, i);
if (pInfo->groupId != gid) {
taosArrayPush(pList, &i);
......@@ -1845,16 +1843,17 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
return TDB_CODE_SUCCESS;
}
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) {
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group,
bool groupSort) {
int32_t code = TSDB_CODE_SUCCESS;
ASSERT(pTableListInfo->map != NULL);
bool groupByTbname = groupbyTbname(group);
bool groupByTbname = groupbyTbname(group);
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
if (group == NULL || groupByTbname) {
for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
info->groupId = groupByTbname? info->uid:0;
info->groupId = groupByTbname ? info->uid : 0;
}
pTableListInfo->oneTableForEachGroup = groupByTbname;
......@@ -1878,7 +1877,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
// add all table entry in the hash map
size_t size = taosArrayGetSize(pTableListInfo->pTableList);
for(int32_t i = 0; i < size; ++i) {
for (int32_t i = 0; i < size; ++i) {
STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
}
......@@ -1889,7 +1888,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
struct SExecTaskInfo* pTaskInfo) {
int64_t st = taosGetTimestampUs();
int64_t st = taosGetTimestampUs();
const char* idStr = GET_TASKID(pTaskInfo);
if (pHandle == NULL) {
......@@ -1919,7 +1918,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
return code;
}
pTaskInfo->cost.groupIdMapTime = (taosGetTimestampUs() - st1)/1000.0;
pTaskInfo->cost.groupIdMapTime = (taosGetTimestampUs() - st1) / 1000.0;
qDebug("generate group id map completed, elapsed time:%.2f ms %s", pTaskInfo->cost.groupIdMapTime, idStr);
return TSDB_CODE_SUCCESS;
......
......@@ -344,7 +344,8 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
return true;
}
static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, int32_t rows) {
static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
int32_t rows) {
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
......@@ -1068,8 +1069,11 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0);
tsdbReaderOpen(readHandle->vnode, &cond, pList, num, &pInfo->pHandle, pTaskInfo->id.str);
cleanupQueryTableDataCond(&cond);
code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, &pInfo->pHandle, pTaskInfo->id.str);
if (code != 0) {
cleanupQueryTableDataCond(&cond);
goto _error;
}
}
pInfo->readHandle = *readHandle;
......@@ -1163,6 +1167,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
T_LONG_JMP(pTaskInfo->env, code);
return NULL;
}
......@@ -2419,8 +2424,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if (pHandle->initTableReader) {
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
pTSInfo->dataReader = NULL;
if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, pList, num, &pTSInfo->dataReader, NULL) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, pList, num, &pTSInfo->dataReader, NULL);
if (code != 0) {
terrno = code;
destroyTableScanOperatorInfo(pTableScanOp);
goto _error;
}
......@@ -4285,130 +4291,6 @@ int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle*
return TSDB_CODE_SUCCESS;
}
int32_t createMultipleDataReaders2(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle,
STableListInfo* pTableListInfo, int32_t tableStartIdx, int32_t tableEndIdx,
STsdbReader** ppReader, const char* idstr) {
STsdbReader* pReader = NULL;
void* pStart = tableListGetInfo(pTableListInfo, tableStartIdx);
int32_t num = tableEndIdx - tableStartIdx + 1;
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, pStart, num, &pReader, idstr);
if (code != 0) {
return code;
}
*ppReader = pReader;
return TSDB_CODE_SUCCESS;
}
static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
SSDataBlock* pBlock, uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableMergeScanInfo* pInfo = pOperator->info;
uint64_t uid = pBlock->info.uid;
SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
pCost->totalBlocks += 1;
pCost->totalRows += pBlock->info.rows;
*status = pInfo->dataBlockLoadFlag;
if (pTableScanInfo->pFilterNode != NULL ||
overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
(*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
}
SDataBlockInfo* pBlockInfo = &pBlock->info;
taosMemoryFreeClear(pBlock->pBlockAgg);
if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->filterOutBlocks += 1;
return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->skipBlocks += 1;
// clear all data in pBlock that are set when handing the previous block
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
pcol->pData = NULL;
}
return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
pCost->loadBlockStatis += 1;
bool allColumnsHaveAgg = true;
SColumnDataAgg** pColAgg = NULL;
STsdbReader* reader = pTableScanInfo->pReader;
tsdbRetrieveDatablockSMA(reader, &pColAgg, &allColumnsHaveAgg);
if (allColumnsHaveAgg == true) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
// todo create this buffer during creating operator
if (pBlock->pBlockAgg == NULL) {
pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
}
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchItem* pColMatchInfo = taosArrayGet(pTableScanInfo->matchInfo.pList, i);
if (!pColMatchInfo->needOutput) {
continue;
}
pBlock->pBlockAgg[pColMatchInfo->dstSlotId] = pColAgg[i];
}
return TSDB_CODE_SUCCESS;
} else { // failed to load the block sma data, data block statistics does not exist, load data block instead
*status = FUNC_DATA_REQUIRED_DATA_LOAD;
}
}
ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);
pCost->totalCheckedRows += pBlock->info.rows;
pCost->loadBlocks += 1;
STsdbReader* reader = pTableScanInfo->pReader;
SArray* pCols = tsdbRetrieveDataBlock(reader, NULL);
if (pCols == NULL) {
return terrno;
}
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
// currently only the tbname pseudo column
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
}
if (pTableScanInfo->pFilterNode != NULL) {
int64_t st = taosGetTimestampMs();
doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, NULL);
double el = (taosGetTimestampUs() - st) / 1000.0;
pTableScanInfo->readRecorder.filterTime += el;
if (pBlock->info.rows == 0) {
pCost->filterOutBlocks += 1;
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
} else {
qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
}
}
return TSDB_CODE_SUCCESS;
}
// todo refactor
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
......@@ -4501,8 +4383,9 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
// currently only the tbname pseudo column
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo));
int32_t code =
addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -4534,7 +4417,7 @@ typedef struct STableMergeScanSortSourceParam {
SSDataBlock* inputBlock;
} STableMergeScanSortSourceParam;
static SSDataBlock* getTableDataBlockTemp(void* param) {
static SSDataBlock* getTableDataBlockImpl(void* param) {
STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator;
STableMergeScanInfo* pInfo = pOperator->info;
......@@ -4551,7 +4434,11 @@ static SSDataBlock* getTableDataBlockTemp(void* param) {
void* p = tableListGetInfo(pInfo->tableListInfo, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->readHandle;
tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->pReader, GET_TASKID(pTaskInfo));
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->pReader, GET_TASKID(pTaskInfo));
if (code != 0) {
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
STsdbReader* reader = pInfo->pReader;
while (tsdbNextDataBlock(reader)) {
......@@ -4602,55 +4489,6 @@ static SSDataBlock* getTableDataBlockTemp(void* param) {
pInfo->pReader = NULL;
return NULL;
}
static SSDataBlock* getTableDataBlock2(void* param) {
STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator;
int64_t uid = source->uid;
SSDataBlock* pBlock = source->inputBlock;
STableMergeScanInfo* pTableScanInfo = pOperator->info;
int64_t st = taosGetTimestampUs();
blockDataCleanup(pBlock);
STsdbReader* reader = pTableScanInfo->pReader;
while (tsdbTableNextDataBlock(reader, uid)) {
if (isTaskKilled(pOperator->pTaskInfo)) {
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
// process this data block based on the probabilities
bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
if (!processThisBlock) {
continue;
}
blockDataCleanup(pBlock);
int32_t rows = 0;
tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.uid, &pBlock->info.window);
blockDataEnsureCapacity(pBlock, rows);
pBlock->info.rows = rows;
uint32_t status = 0;
int32_t code = loadDataBlockFromOneTable2(pOperator, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
// current block is filter out according to filter condition, continue load the next block
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
continue;
}
pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid);
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
return pBlock;
}
return NULL;
}
static SSDataBlock* getTableDataBlock(void* param) {
STableMergeScanSortSourceParam* source = param;
......@@ -4760,7 +4598,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockTemp, NULL, NULL);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
// one table has one data block
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
......
......@@ -127,7 +127,7 @@ static void uvFreeCb(uv_handle_t* handle);
static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg);
static void uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb);
static int uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSvrMsg* msg);
static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
......@@ -382,7 +382,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
taosMemoryFree(req);
}
static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
SSvrConn* pConn = smsg->pConn;
STransMsg* pMsg = &smsg->msg;
if (pMsg->pCont == 0) {
......@@ -395,6 +395,13 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
pHead->hasEpSet = pMsg->info.hasEpSet;
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
// handle invalid drop_task resp, TD-20098
if (pMsg->msgType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
transQueuePop(&pConn->srvMsgs);
destroySmsg(smsg);
return -1;
}
if (pConn->status == ConnNormal) {
pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
if (smsg->type == Release) pHead->msgType = 0;
......@@ -429,6 +436,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
wb->base = (char*)pHead;
wb->len = len;
return 0;
}
static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
......@@ -438,7 +446,9 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
}
uv_buf_t wb;
uvPrepareSendData(smsg, &wb);
if (uvPrepareSendData(smsg, &wb) < 0) {
return;
}
transRefSrvHandle(pConn);
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
......@@ -449,8 +459,9 @@ static void uvStartSendResp(SSvrMsg* smsg) {
SSvrConn* pConn = smsg->pConn;
if (pConn->broken == true) {
// persist by
transFreeMsg(smsg->msg.pCont);
taosMemoryFree(smsg);
destroySmsg(smsg);
// transFreeMsg(smsg->msg.pCont);
// taosMemoryFree(smsg);
transUnrefSrvHandle(pConn);
return;
}
......@@ -746,10 +757,11 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
return;
}
transSockInfo2Str(&sockname, pConn->src);
struct sockaddr_in addr = *(struct sockaddr_in*)&sockname;
struct sockaddr_in addr = *(struct sockaddr_in*)&peername;
pConn->clientIp = addr.sin_addr.s_addr;
pConn->port = ntohs(addr.sin_port);
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册