提交 43d79129 编写于 作者: D dapan1121

feature/scheduler

上级 bfe1a136
......@@ -190,20 +190,14 @@ typedef struct SEp {
} SEp;
typedef struct {
char dbFName[TSDB_DB_FNAME_LEN];
int32_t contLen;
int32_t vgId;
} SMsgHead;
typedef struct {
char dbFName[TSDB_DB_FNAME_LEN];
} SRspHead;
// Submit message for one table
typedef struct SSubmitBlk {
int64_t uid; // table unique id
int32_t tid; // table id
char tableName[TSDB_TABLE_NAME_LEN];
int32_t padding; // TODO just for padding here
int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head
......@@ -466,7 +460,6 @@ typedef struct {
typedef struct {
int32_t code;
SName tableName;
} SQueryTableRsp;
int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp);
......@@ -1356,6 +1349,7 @@ typedef struct {
typedef struct SVCreateTbReq {
int64_t ver; // use a general definition
char* dbFName;
char* name;
uint32_t ttl;
uint32_t keep;
......@@ -1381,7 +1375,6 @@ typedef struct SVCreateTbReq {
typedef struct {
int32_t code;
SName tableName;
int tmp; // TODO: to avoid compile error
} SVCreateTbRsp, SVUpdateTbRsp;
......
......@@ -69,7 +69,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA
* @param qId
* @return
*/
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, SQueryErrorInfo *errInfo);
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle);
/**
* The main task execution function, including query on both table and multiple tables,
......
......@@ -134,11 +134,6 @@ typedef struct SQueryNodeStat {
int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT
} SQueryNodeStat;
typedef struct SQueryErrorInfo {
int32_t code;
SName tableName;
} SQueryErrorInfo;
int32_t initTaskQueue();
int32_t cleanupTaskQueue();
......@@ -181,7 +176,6 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE
#define ONLY_RSP_HEAD_ERROR(_code) ((_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_INVALID_TABLE_ID || (_code) == TSDB_CODE_VND_TB_NOT_EXIST)
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED)
......@@ -189,6 +183,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_SCHEDULER_RETRY_ERROR(_code) ((_code) == TSDB_CODE_RPC_REDIRECT)
#define REQUEST_MAX_RETRY_NUM 3
#define qFatal(...) \
do { \
if (qDebugFlag & DEBUG_FATAL) { \
......
......@@ -53,7 +53,6 @@ typedef struct SQueryProfileSummary {
typedef struct SQueryResult {
int32_t code;
SArray *errList; // SArray<SQueryErrorInfo>
uint64_t numOfRows;
int32_t msgSize;
char *msg;
......
......@@ -183,7 +183,8 @@ typedef struct SRequestObj {
char* msgBuf;
void* pInfo; // sql parse info, generated by parser module
int32_t code;
SArray* errList; // SArray<SQueryErrorInfo>
SArray* dbList;
SArray* tableList;
SQueryExecMetric metric;
SRequestSendRecvBody body;
} SRequestObj;
......
......@@ -231,7 +231,6 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
schedulerFreeJob(pRequest->body.queryJob);
}
pRequest->errList = res.errList;
pRequest->code = code;
terrno = code;
return pRequest->code;
......@@ -245,7 +244,6 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
}
}
pRequest->errList = res.errList;
pRequest->code = res.code;
terrno = res.code;
return pRequest->code;
......@@ -277,125 +275,61 @@ _return:
return pRequest;
}
int32_t clientProcessErrorList(SArray **pList) {
SArray *errList = *pList;
int32_t errNum = (int32_t)taosArrayGetSize(errList);
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
SCatalog *pCatalog = NULL;
int32_t code = 0;
int32_t dbNum = taosArrayGetSize(pRequest->dbList);
int32_t tblNum = taosArrayGetSize(pRequest->tableList);
if (dbNum <= 0 && tblNum <= 0) {
return TSDB_CODE_QRY_APP_ERROR;
}
for (int32_t i = 0; i < errNum; ++i) {
SQueryErrorInfo *errInfo = taosArrayGet(errList, i);
if (NEED_CLIENT_REFRESH_VG_ERROR(errInfo->code)) {
if (i == (errNum - 1)) {
break;
}
// TODO REMOVE SAME DB ERROR
} else if (NEED_CLIENT_REFRESH_TBLMETA_ERROR(errInfo->code) || NEED_CLIENT_RM_TBLMETA_ERROR(errInfo->code)) {
continue;
} else {
taosArrayRemove(errList, i);
--i;
--errNum;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
for (int32_t i = 0; i < dbNum; ++i) {
char *dbFName = taosArrayGet(pRequest->dbList, i);
code = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
if (0 == errNum) {
taosArrayDestroy(*pList);
*pList = NULL;
for (int32_t i = 0; i < tblNum; ++i) {
SName *tableName = taosArrayGet(pRequest->tableList, i);
code = catalogRefreshTableMeta(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, tableName, -1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
return TSDB_CODE_SUCCESS;
return code;
}
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
SRequestObj* pRequest = NULL;
int32_t retryNum = 0;
int32_t code = 0;
int32_t needRetryNum = 0;
int32_t needRetryFailNum = 0;
while (true) {
while (retryNum++ < REQUEST_MAX_RETRY_NUM) {
pRequest = execQueryImpl(pTscObj, sql, sqlLen);
if (TSDB_CODE_SUCCESS == pRequest->code || NULL == pRequest->errList) {
if (TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
break;
}
code = clientProcessErrorList(&pRequest->errList);
if (code != TSDB_CODE_SUCCESS || NULL == pRequest->errList) {
code = refreshMeta(pTscObj, pRequest);
if (code) {
pRequest->code = code;
break;
}
int32_t errNum = (int32_t)taosArrayGetSize(pRequest->errList);
for (int32_t i = 0; i < errNum; ++i) {
SQueryErrorInfo *errInfo = taosArrayGet(pRequest->errList, i);
int32_t tcode = 0;
if (NEED_CLIENT_REFRESH_VG_ERROR(errInfo->code)) {
++needRetryNum;
SCatalog *pCatalog = NULL;
tcode = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (tcode != TSDB_CODE_SUCCESS) {
++needRetryFailNum;
code = tcode;
continue;
}
SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(&errInfo->tableName, dbFName);
tcode = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName);
if (tcode != TSDB_CODE_SUCCESS) {
++needRetryFailNum;
code = tcode;
continue;
}
} else if (NEED_CLIENT_RM_TBLMETA_ERROR(errInfo->code)) {
SCatalog *pCatalog = NULL;
tcode = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (tcode != TSDB_CODE_SUCCESS) {
code = tcode;
continue;
}
catalogRemoveTableMeta(pCatalog, &errInfo->tableName);
} else if (NEED_CLIENT_REFRESH_TBLMETA_ERROR(errInfo->code)) {
++needRetryNum;
SCatalog *pCatalog = NULL;
tcode = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (tcode != TSDB_CODE_SUCCESS) {
++needRetryFailNum;
code = tcode;
continue;
}
SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(&errInfo->tableName, dbFName);
tcode = catalogRefreshTableMeta(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, &errInfo->tableName, -1);
if (tcode != TSDB_CODE_SUCCESS) {
++needRetryFailNum;
code = tcode;
continue;
}
}
}
if ((needRetryNum && (0 == needRetryFailNum) && (TDMT_VND_SUBMIT != pRequest->type && TDMT_VND_CREATE_TABLE != pRequest->type))
|| (needRetryNum && (needRetryNum > needRetryFailNum) && (TDMT_VND_SUBMIT == pRequest->type && TDMT_VND_CREATE_TABLE == pRequest->type))) {
destroyRequest(pRequest);
continue;
}
break;
}
if (code) {
pRequest->code = code;
}
return pRequest;
......
......@@ -2631,10 +2631,6 @@ int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->code) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->tableName.type) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->tableName.acctId) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->tableName.dbname) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->tableName.tname) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -2648,10 +2644,6 @@ int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pR
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->code) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->tableName.type) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->tableName.acctId) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->tableName.dbname) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->tableName.tname) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
......@@ -2669,10 +2661,6 @@ int32_t tSerializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchR
for (int32_t i = 0; i < num; ++i) {
SVCreateTbRsp *rsp = taosArrayGet(pRsp->rspList, i);
if (tEncodeI32(&encoder, rsp->code) < 0) return -1;
if (tEncodeU8(&encoder, rsp->tableName.type) < 0) return -1;
if (tEncodeI32(&encoder, rsp->tableName.acctId) < 0) return -1;
if (tEncodeCStr(&encoder, rsp->tableName.dbname) < 0) return -1;
if (tEncodeCStr(&encoder, rsp->tableName.tname) < 0) return -1;
}
} else {
if (tEncodeI32(&encoder, 0) < 0) return -1;
......@@ -2697,10 +2685,6 @@ int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatc
for (int32_t i = 0; i < num; ++i) {
SVCreateTbRsp rsp = {0};
if (tDecodeI32(&decoder, &rsp.code) < 0) return -1;
if (tDecodeU8(&decoder, &rsp.tableName.type) < 0) return -1;
if (tDecodeI32(&decoder, &rsp.tableName.acctId) < 0) return -1;
if (tDecodeCStrTo(&decoder, rsp.tableName.dbname) < 0) return -1;
if (tDecodeCStrTo(&decoder, rsp.tableName.tname) < 0) return -1;
if (NULL == taosArrayPush(pRsp->rspList, &rsp)) return -1;
}
} else {
......
......@@ -805,24 +805,6 @@ static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg,
return code;
}
static void dndGenerateResponseHead(SRpcMsg *pMsg, void **pRspHead, int *contLen) {
if (TDMT_VND_SUBMIT != pMsg->msgType && TDMT_VND_QUERY != pMsg->msgType
&& TDMT_VND_CREATE_TABLE != pMsg->msgType && TDMT_VND_TABLE_META != pMsg->msgType) {
return;
}
*pRspHead = rpcMallocCont(sizeof(SRspHead));
if (NULL == *pRspHead) {
return;
}
SMsgHead *pHead = pMsg->pCont;
strcpy(((SRspHead *)(*pRspHead))->dbFName, pHead->dbFName);
*contLen = sizeof(SRspHead);
}
static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
SMsgHead *pHead = pMsg->pCont;
pHead->contLen = htonl(pHead->contLen);
......@@ -833,7 +815,6 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
dError("vgId:%d, failed to acquire vnode while process req", pHead->vgId);
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
dndGenerateResponseHead(pMsg, &rsp.pCont, &rsp.contLen);
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
......
......@@ -61,6 +61,7 @@ STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
STSma * metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid);
STSmaWrapper * metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid);
SArray * metaGetSmaTbUids(SMeta *pMeta, bool isDup);
int metaGetTbNum(SMeta *pMeta);
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur);
......
......@@ -704,6 +704,18 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
return pTbCur;
}
int metaGetTbNum(SMeta *pMeta) {
SMetaDB *pDB = pMeta->pDB;
DB_BTREE_STAT *sp1;
pDB->pTbDB->stat(pDB->pNtbIdx, NULL, &sp1, 0);
DB_BTREE_STAT *sp2;
pDB->pTbDB->stat(pDB->pCtbIdx, NULL, &sp2, 0);
return sp1->bt_nkeys + sp2->bt_nkeys;
}
void metaCloseTbCursor(SMTbCursor *pTbCur) {
if (pTbCur) {
if (pTbCur->pCur) {
......
......@@ -27,7 +27,7 @@ int32_t vnodeSync(SVnode *pVnode) { return 0; }
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
pLoad->vgId = pVnode->vgId;
pLoad->role = TAOS_SYNC_STATE_LEADER;
pLoad->numOfTables = 500;
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
pLoad->numOfTimeSeries = 400;
pLoad->totalStorage = 300;
pLoad->compStorage = 200;
......
......@@ -85,22 +85,12 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
char tableFName[TSDB_TABLE_FNAME_LEN];
SMsgHead *pHead = (SMsgHead *)pMsg->pCont;
sprintf(tableFName, "%s.%s", pHead->dbFName, pCreateTbReq->name);
sprintf(tableFName, "%s.%s", pCreateTbReq->dbFName, pCreateTbReq->name);
int32_t code = vnodeValidateTableHash(&pVnode->config, tableFName);
if (code) {
SVCreateTbRsp rsp;
rsp.code = code;
tNameFromString(&rsp.tableName, tableFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (NULL == vCreateTbBatchRsp.rspList) {
vCreateTbBatchRsp.rspList = taosArrayInit(reqNum - i, sizeof(SVCreateTbRsp));
if (NULL == vCreateTbBatchRsp.rspList) {
vError("vgId:%d, failed to init array: %d", pVnode->vgId, reqNum - i);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
taosArrayPush(vCreateTbBatchRsp.rspList, &rsp);
}
......
......@@ -711,7 +711,7 @@ int32_t getMaximumIdleDurationSec();
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, SQueryErrorInfo *errInfo);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId);
#ifdef __cplusplus
}
......
......@@ -84,7 +84,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL, NULL);
code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL);
if (code != TSDB_CODE_SUCCESS) {
// TODO: destroy SSubplan & pTaskInfo
terrno = code;
......
......@@ -51,11 +51,11 @@ static void freeqinfoFn(void *qhandle) {
qDestroyTask(*handle);
}
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, SQueryErrorInfo *errInfo) {
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
assert(readHandle != NULL && pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, errInfo);
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......
......@@ -8126,7 +8126,7 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
static SArray* extractScanColumnId(SNodeList* pNodeList);
SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, SQueryErrorInfo *errInfo) {
SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node
pPhyNode = nodesListGetNode(pPhyNode->pChildren, 0);
}
......@@ -8135,39 +8135,9 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
if (TSDB_SUPER_TABLE != pScanPhyNode->tableType) {
char tableFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&pScanPhyNode->tableName, tableFName);
int32_t code = vnodeValidateTableHash(pHandle->config, tableFName);
if (code) {
errInfo->code = code;
errInfo->tableName = pScanPhyNode->tableName;
return NULL;
}
}
STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->meta, pScanPhyNode->uid);
if (pTbCfg == NULL) {
tb_uid_t uid = 0;
pTbCfg = metaGetTbInfoByName(pHandle->meta, pScanPhyNode->tableName.tname, &uid);
if (pTbCfg) {
errInfo->code = TSDB_CODE_TDB_TABLE_RECREATED;
errInfo->tableName = pScanPhyNode->tableName;
return NULL;
}
errInfo->code = TSDB_CODE_TDB_INVALID_TABLE_ID;
errInfo->tableName = pScanPhyNode->tableName;
return NULL;
}
size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols);
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId);
if (NULL == pDataReader) {
errInfo->code = terrno;
errInfo->tableName = pScanPhyNode->tableName;
return NULL;
}
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
......@@ -8208,10 +8178,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo, errInfo);
if (errInfo->code) {
return NULL;
}
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SArray* pExprInfo = createExprInfo((SAggPhysiNode*)pPhyNode);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
......@@ -8330,7 +8297,7 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle*
return NULL;
}
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, SQueryErrorInfo *errInfo) {
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId) {
uint64_t queryId = pPlan->id.queryId;
int32_t code = TSDB_CODE_SUCCESS;
......@@ -8341,9 +8308,9 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
}
STableGroupInfo group = {0};
(*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group, errInfo);
if (errInfo->code) {
code = errInfo->code;
(*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group);
if (NULL == (*pTaskInfo)->pRoot) {
code = terrno;
goto _complete;
}
......
......@@ -1230,4 +1230,4 @@ TEST(testCase, time_interval_Operator_Test) {
}
#endif
#pragma GCC diagnostic pop
#pragma GCC diagnosti
\ No newline at end of file
......@@ -121,7 +121,6 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks*
pBlocks->tid = dataBuf->pTableMeta->suid;
pBlocks->uid = dataBuf->pTableMeta->uid;
pBlocks->sversion = dataBuf->pTableMeta->sversion;
strcpy(pBlocks->tableName, dataBuf->tableName);
if (pBlocks->numOfRows + numOfRows >= INT16_MAX) {
return TSDB_CODE_TSC_INVALID_OPERATION;
......
......@@ -1049,6 +1049,4 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
if (TSDB_CODE_SUCCESS == code) {
code = parseInsertBody(&context);
}
destroyInsertParseContext(&context);
return code;
}
dest
\ No newline at end of file
......@@ -1743,8 +1743,4 @@ int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery) {
code = translateQuery(&cxt, pQuery->pRoot);
}
if (TSDB_CODE_SUCCESS == code) {
code = setQuery(&cxt, pQuery);
}
destroyTranslateContext(&cxt);
return code;
}
code = setQu
\ No newline at end of file
......@@ -36,7 +36,7 @@ int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection);
int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code);
int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code, SQueryErrorInfo *errInfo);
int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code);
void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp);
......
......@@ -998,7 +998,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL;
SQWTaskCtx *ctx = NULL;
SQueryErrorInfo errInfo = {0};
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, &output));
......@@ -1020,7 +1019,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
QW_ERR_JRET(code);
}
code = qCreateExecTask(qwMsg->node, 0, tId, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle, &errInfo);
code = qCreateExecTask(qwMsg->node, 0, tId, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%s", tstrerror(code));
QW_ERR_JRET(code);
......@@ -1033,7 +1032,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
//TODO OPTIMIZE EMTYP RESULT QUERY RSP TO AVOID FURTHER FETCH
QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, code, NULL));
QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, code));
QW_TASK_DLOG("query msg rsped, code:%d", code);
queryRsped = true;
......@@ -1052,7 +1051,7 @@ _return:
}
if (!queryRsped) {
qwBuildAndSendQueryRsp(qwMsg->connection, rspCode, &errInfo);
qwBuildAndSendQueryRsp(qwMsg->connection, rspCode);
QW_TASK_DLOG("query msg rsped, code:%x", rspCode);
}
......
......@@ -44,12 +44,9 @@ void qwFreeFetchRsp(void *msg) {
}
}
int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code, SQueryErrorInfo *errInfo) {
int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
SQueryTableRsp rsp = {.code = code};
if (errInfo && errInfo->code) {
rsp.tableName = errInfo->tableName;
}
int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp);
void *msg = rpcMallocCont(contLen);
......
......@@ -1363,4 +1363,4 @@ int main(int argc, char** argv) {
return RUN_ALL_TESTS();
}
#pragma GCC diagnostic pop
#
\ No newline at end of file
......@@ -223,7 +223,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
int32_t schFetchFromRemote(SSchJob *pJob);
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SArray *errList);
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
#ifdef __cplusplus
......
......@@ -259,7 +259,7 @@ _return:
SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
if (code) {
code = schProcessOnTaskFailure(pJob, pTask, code, NULL);
code = schProcessOnTaskFailure(pJob, pTask, code);
}
SCH_RET(code);
......
......@@ -671,14 +671,42 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchHbTrans *trans) {
return TSDB_CODE_SUCCESS;
}
int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) {
// if already FAILED, no more processing
SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status));
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
if (TSDB_CODE_SUCCESS == errCode) {
return;
}
int32_t origCode = atomic_load_32(&pJob->errCode);
if (TSDB_CODE_SUCCESS == origCode) {
if (origCode == atomic_val_compare_exchange_32(&pJob->errCode, origCode, errCode)) {
goto _return;
}
origCode = atomic_load_32(&pJob->errCode);
}
if (NEED_CLIENT_HANDLE_ERROR(origCode)) {
return;
}
if (errCode) {
if (NEED_CLIENT_HANDLE_ERROR(errCode)) {
atomic_store_32(&pJob->errCode, errCode);
goto _return;
}
return;
_return:
SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
}
int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) {
// if already FAILED, no more processing
SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status));
schUpdateJobErrCode(pJob, errCode);
if (atomic_load_8(&pJob->userFetch) || pJob->attr.syncSchedule) {
tsem_post(&pJob->rspSem);
}
......@@ -729,48 +757,12 @@ int32_t schProcessOnDataFetched(SSchJob *job) {
tsem_post(&job->rspSem);
}
int32_t schPushToErrInfoList(SSchJob *pJob, SSchTask *pTask, SArray *errList) {
if (NULL == errList || !SCH_IS_DATA_SRC_TASK(pTask)) {
return TSDB_CODE_SUCCESS;
}
if (NULL == pJob->errList) {
SSchLevel *level = taosArrayGetLast(pJob->levels);
pJob->errList = taosArrayInit(level->taskNum, sizeof(SQueryErrorInfo));
if (NULL == pJob->errList) {
SCH_TASK_ELOG("taosArrayInit %d errInfofailed", pJob->taskNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
SQueryErrorInfo *errInfo = NULL;
int32_t errNum = taosArrayGetSize(errList);
for (int32_t i = 0; i < errNum; ++i) {
errInfo = taosArrayGet(errList, i);
if (!NEED_CLIENT_HANDLE_ERROR(errInfo->code)) {
continue;
}
if (NULL == taosArrayPush(pJob->errList, errInfo)) {
SCH_TASK_ELOG("taosArrayPush errInfo to list failed, errCode:%x", errInfo->code);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
return TSDB_CODE_SUCCESS;
}
// Note: no more task error processing, handled in function internal
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SArray *errList) {
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("task failed not processed cause of job status, job status:%d", status);
taosArrayDestroy(errList);
SCH_RET(atomic_load_32(&pJob->errCode));
}
......@@ -794,8 +786,6 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode,
}
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED);
SCH_ERR_JRET(schPushToErrInfoList(pJob, pTask, errList));
if (SCH_IS_WAIT_ALL_JOB(pJob)) {
SCH_LOCK(SCH_WRITE, &pTask->level->lock);
......@@ -803,17 +793,14 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode,
taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
atomic_store_32(&pJob->errCode, errCode);
schUpdateJobErrCode(pJob, errCode);
if (taskDone < pTask->level->taskNum) {
SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
taosArrayDestroy(errList);
SCH_RET(errCode);
}
}
} else {
taosArrayDestroy(errList);
SCH_ERR_JRET(schHandleTaskRetry(pJob, pTask));
return TSDB_CODE_SUCCESS;
......@@ -821,8 +808,6 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode,
_return:
taosArrayDestroy(errList);
SCH_RET(schProcessOnJobFailure(pJob, errCode));
}
......@@ -930,30 +915,7 @@ _return:
atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code, NULL));
}
int32_t schRspHeadToErrList(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SRspHead *head, SArray **errList) {
SQueryErrorInfo errInfo = {0};
errInfo.code = errCode;
if (tNameFromString(&errInfo.tableName, head->dbFName, T_NAME_ACCT | T_NAME_DB)) {
SCH_TASK_ELOG("invalid rsp head, dbFName:%s", head->dbFName);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
*errList = taosArrayInit(1, sizeof(SQueryErrorInfo));
if (NULL == *errList) {
SCH_TASK_ELOG("taskArrayInit %d errInfo failed", 1);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (NULL == taosArrayPush(*errList, &errInfo)) {
SCH_TASK_ELOG("taosArrayPush err to errList failed, dbFName:%s", head->dbFName);
taosArrayDestroy(*errList);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
return TSDB_CODE_SUCCESS;
SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
}
......@@ -961,9 +923,6 @@ int32_t schRspHeadToErrList(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SRs
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
int32_t code = 0;
int8_t status = 0;
bool errInfoGot = false;
SQueryErrorInfo errInfo = {0};
SArray *errList = NULL;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("rsp not processed cause of job status, job status:%d", status);
......@@ -977,33 +936,18 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
case TDMT_VND_CREATE_TABLE_RSP: {
SVCreateTbBatchRsp batchRsp = {0};
if (msg) {
if (ONLY_RSP_HEAD_ERROR(rspCode)) {
SCH_ERR_JRET(schRspHeadToErrList(pJob, pTask, rspCode, (SRspHead *)msg, &errList));
errInfoGot = true;
SCH_ERR_JRET(rspCode);
}
tDeserializeSVCreateTbBatchRsp(msg, msgSize, &batchRsp);
if (batchRsp.rspList) {
int32_t num = taosArrayGetSize(batchRsp.rspList);
errList = taosArrayInit(num, sizeof(SQueryErrorInfo));
if (NULL == errList) {
SCH_TASK_ELOG("taskArrayInit %d errInfo failed", num);
taosArrayDestroy(batchRsp.rspList);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
for (int32_t i = 0; i < num; ++i) {
SVCreateTbRsp *rsp = taosArrayGet(batchRsp.rspList, i);
errInfo.code = rsp->code;
errInfo.tableName = rsp->tableName;
taosArrayPush(errList, &errInfo);
if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) {
taosArrayDestroy(batchRsp.rspList);
SCH_ERR_JRET(rsp->code);
}
}
taosArrayDestroy(batchRsp.rspList);
errInfoGot = true;
}
}
......@@ -1014,12 +958,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
case TDMT_VND_SUBMIT_RSP: {
if (msg) {
if (ONLY_RSP_HEAD_ERROR(rspCode)) {
SCH_ERR_JRET(schRspHeadToErrList(pJob, pTask, rspCode, (SRspHead *)msg, &errList));
errInfoGot = true;
SCH_ERR_JRET(rspCode);
}
SSubmitRsp *rsp = (SSubmitRsp *)msg;
SCH_ERR_JRET(rsp->code);
......@@ -1036,27 +974,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
case TDMT_VND_QUERY_RSP: {
SQueryTableRsp rsp = {0};
if (msg) {
if (ONLY_RSP_HEAD_ERROR(rspCode)) {
SCH_ERR_JRET(schRspHeadToErrList(pJob, pTask, rspCode, (SRspHead *)msg, &errList));
errInfoGot = true;
SCH_ERR_JRET(rspCode);
}
tDeserializeSQueryTableRsp(msg, msgSize, &rsp);
if (rsp.code) {
errInfo.code = rsp.code;
errInfo.tableName = rsp.tableName;
errList = taosArrayInit(1, sizeof(SQueryErrorInfo));
if (NULL == errList) {
SCH_TASK_ELOG("taskArrayInit %d errInfo failed", 1);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
taosArrayPush(errList, &errInfo);
errInfoGot = true;
}
SCH_ERR_JRET(rsp.code);
}
......@@ -1124,7 +1042,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
_return:
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code, errInfoGot ? errList : NULL));
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
......@@ -1360,7 +1278,6 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SSubQueryMsg *pMsg = msg;
pMsg->header.vgId = htonl(addr->nodeId);
strcpy(pMsg->header.dbFName, pTask->plan->dbFName);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
......@@ -1550,7 +1467,7 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
_return:
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code, NULL));
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
......@@ -1666,7 +1583,6 @@ void schFreeJobImpl(void *job) {
taosArrayDestroy(pJob->levels);
taosArrayDestroy(pJob->nodeList);
taosArrayDestroy(pJob->errList);
tfree(pJob->resData);
......@@ -1806,8 +1722,6 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan* pDag, in
pRes->code = atomic_load_32(&job->errCode);
pRes->numOfRows = job->resNumOfRows;
pRes->errList = job->errList;
job->errList = NULL;
schReleaseJob(*pJob);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册