未验证 提交 e6c5fd87 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #19175 from taosdata/szhou/fixbugs2

enhance: remove assert
...@@ -65,6 +65,8 @@ typedef struct SSysTableScanInfo { ...@@ -65,6 +65,8 @@ typedef struct SSysTableScanInfo {
SSDataBlock* pRes; SSDataBlock* pRes;
int64_t numOfBlocks; // extract basic running information. int64_t numOfBlocks; // extract basic running information.
SLoadRemoteDataInfo loadInfo; SLoadRemoteDataInfo loadInfo;
int32_t tbnameSlotId;
} SSysTableScanInfo; } SSysTableScanInfo;
typedef struct { typedef struct {
...@@ -346,6 +348,11 @@ static int32_t optSysTabFilteImpl(void* arg, SNode* cond, SArray* result); ...@@ -346,6 +348,11 @@ static int32_t optSysTabFilteImpl(void* arg, SNode* cond, SArray* result);
static int32_t optSysCheckOper(SNode* pOpear); static int32_t optSysCheckOper(SNode* pOpear);
static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt); static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt);
static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name,
SExecTaskInfo* pTaskInfo);
void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNode);
static SSDataBlock* sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo,
const char* name, SSDataBlock* pBlock);
__optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) { __optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) {
if (ctype == OP_TYPE_LOWER_EQUAL || ctype == OP_TYPE_LOWER_THAN) { if (ctype == OP_TYPE_LOWER_EQUAL || ctype == OP_TYPE_LOWER_THAN) {
*reverse = true; *reverse = true;
...@@ -1309,84 +1316,112 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -1309,84 +1316,112 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
getDBNameFromCondition(pInfo->pCondition, dbName); getDBNameFromCondition(pInfo->pCondition, dbName);
sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName); sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
} }
SSDataBlock* pBlock = NULL;
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
return sysTableScanUserTables(pOperator); pBlock = sysTableScanUserTables(pOperator);
} else if (strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0) { } else if (strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0) {
return sysTableScanUserTags(pOperator); pBlock = sysTableScanUserTags(pOperator);
} else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && pInfo->showRewrite && } else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && pInfo->showRewrite &&
IS_SYS_DBNAME(dbName)) { IS_SYS_DBNAME(dbName)) {
return sysTableScanUserSTables(pOperator); pBlock = sysTableScanUserSTables(pOperator);
} else { // load the meta from mnode of the given epset } else { // load the meta from mnode of the given epset
if (pOperator->status == OP_EXEC_DONE) { pBlock = sysTableScanFromMNode(pOperator, pInfo, name, pTaskInfo);
return NULL; }
}
while (1) {
int64_t startTs = taosGetTimestampUs();
tstrncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
tstrncpy(pInfo->req.user, pInfo->pUser, tListLen(pInfo->req.user));
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); return sysTableScanFillTbName(pOperator, pInfo, name, pBlock);
char* buf1 = taosMemoryCalloc(1, contLen); }
tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);
// send the fetch remote task result reques static SSDataBlock* sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo,
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); const char* name, SSDataBlock* pBlock) {
if (NULL == pMsgSendInfo) { if (pBlock != NULL) {
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); if (pInfo->tbnameSlotId != -1) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId);
return NULL; char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
memcpy(varDataVal(varTbName), name, strlen(name));
varDataSetLen(varTbName, strlen(name));
for (int i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColumnInfoData, i, varTbName, NULL);
} }
doFilterResult(pBlock, pOperator->exprSupp.pFilterInfo);
}
}
if (pBlock && pBlock->info.rows != 0) {
return pBlock;
} else {
return NULL;
}
}
int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name,
: TDMT_MND_SYSTABLE_RETRIEVE; SExecTaskInfo* pTaskInfo) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
pMsgSendInfo->param = pOperator; while (1) {
pMsgSendInfo->msgInfo.pData = buf1; int64_t startTs = taosGetTimestampUs();
pMsgSendInfo->msgInfo.len = contLen; tstrncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
pMsgSendInfo->msgType = msgType; tstrncpy(pInfo->req.user, pInfo->pUser, tListLen(pInfo->req.user));
pMsgSendInfo->fp = loadSysTableCallback;
pMsgSendInfo->requestId = pTaskInfo->id.queryId;
int64_t transporterId = 0; int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
int32_t code = char* buf1 = taosMemoryCalloc(1, contLen);
asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo); tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);
tsem_wait(&pInfo->ready);
if (pTaskInfo->code) { // send the fetch remote task result reques
qDebug("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo), SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
pInfo->loadInfo.totalRows, tstrerror(pTaskInfo->code)); if (NULL == pMsgSendInfo) {
return NULL; qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
} pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SRetrieveMetaTableRsp* pRsp = pInfo->pRsp; int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE
pInfo->req.showId = pRsp->handle; : TDMT_MND_SYSTABLE_RETRIEVE;
if (pRsp->numOfRows == 0 || pRsp->completed) { pMsgSendInfo->param = pOperator;
pOperator->status = OP_EXEC_DONE; pMsgSendInfo->msgInfo.pData = buf1;
qDebug("%s load meta data from mnode completed, rowsOfSource:%d, totalRows:%" PRIu64, GET_TASKID(pTaskInfo), pMsgSendInfo->msgInfo.len = contLen;
pRsp->numOfRows, pInfo->loadInfo.totalRows); pMsgSendInfo->msgType = msgType;
pMsgSendInfo->fp = loadSysTableCallback;
pMsgSendInfo->requestId = pTaskInfo->id.queryId;
if (pRsp->numOfRows == 0) { int64_t transporterId = 0;
taosMemoryFree(pRsp); int32_t code =
return NULL; asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo);
} tsem_wait(&pInfo->ready);
}
char* pStart = pRsp->data; if (pTaskInfo->code) {
extractDataBlockFromFetchRsp(pInfo->pRes, pRsp->data, pInfo->matchInfo.pList, &pStart); qDebug("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo),
updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator); pInfo->loadInfo.totalRows, tstrerror(pTaskInfo->code));
return NULL;
}
// todo log the filter info SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); pInfo->req.showId = pRsp->handle;
taosMemoryFree(pRsp);
if (pInfo->pRes->info.rows > 0) { if (pRsp->numOfRows == 0 || pRsp->completed) {
return pInfo->pRes; pOperator->status = OP_EXEC_DONE;
} else if (pOperator->status == OP_EXEC_DONE) { qDebug("%s load meta data from mnode completed, rowsOfSource:%d, totalRows:%" PRIu64, GET_TASKID(pTaskInfo),
pRsp->numOfRows, pInfo->loadInfo.totalRows);
if (pRsp->numOfRows == 0) {
taosMemoryFree(pRsp);
return NULL; return NULL;
} }
} }
char* pStart = pRsp->data;
extractDataBlockFromFetchRsp(pInfo->pRes, pRsp->data, pInfo->matchInfo.pList, &pStart);
updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator);
// todo log the filter info
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
taosMemoryFree(pRsp);
if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes;
} else if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
} }
} }
...@@ -1407,6 +1442,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan ...@@ -1407,6 +1442,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
goto _error; goto _error;
} }
extractTbnameSlotId(pInfo, pScanNode);
pInfo->accountId = pScanPhyNode->accountId; pInfo->accountId = pScanPhyNode->accountId;
pInfo->pUser = taosMemoryStrDup((void*)pUser); pInfo->pUser = taosMemoryStrDup((void*)pUser);
pInfo->sysInfo = pScanPhyNode->sysInfo; pInfo->sysInfo = pScanPhyNode->sysInfo;
...@@ -1449,6 +1486,26 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan ...@@ -1449,6 +1486,26 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
return NULL; return NULL;
} }
void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNode) {
pInfo->tbnameSlotId = -1;
if (pScanNode->pScanPseudoCols != NULL) {
SNode* pNode = NULL;
FOREACH(pNode, pScanNode->pScanPseudoCols) {
STargetNode* pTargetNode = NULL;
if (nodeType(pNode) == QUERY_NODE_TARGET) {
pTargetNode = (STargetNode*)pNode;
SNode* expr = pTargetNode->pExpr;
if (nodeType(expr) == QUERY_NODE_FUNCTION) {
SFunctionNode* pFuncNode = (SFunctionNode*)expr;
if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) {
pInfo->tbnameSlotId = pTargetNode->slotId;
}
}
}
}
}
}
void destroySysScanOperator(void* param) { void destroySysScanOperator(void* param) {
SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param; SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param;
tsem_destroy(&pInfo->ready); tsem_destroy(&pInfo->ready);
......
...@@ -1230,7 +1230,6 @@ int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode * ...@@ -1230,7 +1230,6 @@ int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *
if (uvTask->rspBuf.base != NULL) { if (uvTask->rspBuf.base != NULL) {
SUdfResponse rsp = {0}; SUdfResponse rsp = {0};
void *buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp); void *buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp);
assert(uvTask->rspBuf.len == POINTER_DISTANCE(buf, uvTask->rspBuf.base));
task->errCode = rsp.code; task->errCode = rsp.code;
switch (task->type) { switch (task->type) {
......
...@@ -400,7 +400,6 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { ...@@ -400,7 +400,6 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle; SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle;
ASSERT(pMsg->info.ahandle != NULL);
if (pEpSet) { if (pEpSet) {
if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) { if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
......
...@@ -2388,6 +2388,9 @@ static bool tagScanOptShouldBeOptimized(SLogicNode* pNode) { ...@@ -2388,6 +2388,9 @@ static bool tagScanOptShouldBeOptimized(SLogicNode* pNode) {
if (pScan->hasNormalCols) { if (pScan->hasNormalCols) {
return false; return false;
} }
if (pScan->tableType == TSDB_SYSTEM_TABLE) {
return false;
}
if (NULL == pNode->pParent || QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode->pParent) || if (NULL == pNode->pParent || QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode->pParent) ||
1 != LIST_LENGTH(pNode->pParent->pChildren)) { 1 != LIST_LENGTH(pNode->pParent->pChildren)) {
return false; return false;
......
...@@ -333,13 +333,23 @@ static bool stbSplHasPartTbname(SNodeList* pPartKeys) { ...@@ -333,13 +333,23 @@ static bool stbSplHasPartTbname(SNodeList* pPartKeys) {
return false; return false;
} }
static bool stbSplIsPartTableAgg(SAggLogicNode* pAgg) { static bool stbSplNotSystemScan(SLogicNode* pNode) {
if (NULL != pAgg->pGroupKeys) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
return stbSplHasPartTbname(pAgg->pGroupKeys); return SCAN_TYPE_SYSTEM_TABLE != ((SScanLogicNode*)pNode)->scanType;
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
return stbSplNotSystemScan((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
} else {
return true;
} }
}
static bool stbSplIsPartTableAgg(SAggLogicNode* pAgg) {
if (1 != LIST_LENGTH(pAgg->node.pChildren)) { if (1 != LIST_LENGTH(pAgg->node.pChildren)) {
return false; return false;
} }
if (NULL != pAgg->pGroupKeys) {
return stbSplHasPartTbname(pAgg->pGroupKeys) && stbSplNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0));
}
return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0))); return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)));
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册