提交 d9cde31b 编写于 作者: D dapan1121

feature/scheduler

上级 9b0f5df9
......@@ -213,6 +213,7 @@ typedef struct SSubplan {
ESubplanType subplanType;
int32_t msgType; // message type for subplan, used to denote the send message type to vnode.
int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner.
char dbFName[TSDB_DB_FNAME_LEN];
SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node
SQueryNodeStat execNodeStat; // only for scan subplan
SNodeList* pChildren; // the datasource subplan,from which to fetch the result
......
......@@ -181,9 +181,11 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE
#define ONLY_RSP_HEAD_ERROR(_code) ((_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_INVALID_TABLE_ID || (_code) == TSDB_CODE_VND_TB_NOT_EXIST)
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH)
#define NEED_CLIENT_HANDLE_ERROR(_code) (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code))
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED)
#define NEED_CLIENT_HANDLE_ERROR(_code) (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
#define NEED_SCHEDULER_RETRY_ERROR(_code) ((_code) == TSDB_CODE_RPC_REDIRECT)
......
......@@ -354,6 +354,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614)
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615)
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616)
#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x0617)
// query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
......
......@@ -278,13 +278,13 @@ int32_t clientProcessErrorList(SArray **pList) {
for (int32_t i = 0; i < errNum; ++i) {
SQueryErrorInfo *errInfo = taosArrayGet(errList, i);
if (TSDB_CODE_VND_HASH_MISMATCH == errInfo->code) {
if (NEED_CLIENT_REFRESH_VG_ERROR(errInfo->code)) {
if (i == (errNum - 1)) {
break;
}
// TODO REMOVE SAME DB ERROR
} else if (NEED_CLIENT_RM_TBLMETA_ERROR(errInfo->code)) {
} else if (NEED_CLIENT_REFRESH_TBLMETA_ERROR(errInfo->code) || NEED_CLIENT_RM_TBLMETA_ERROR(errInfo->code)) {
continue;
} else {
taosArrayRemove(errList, i);
......@@ -355,6 +355,28 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
}
catalogRemoveTableMeta(pCatalog, &errInfo->tableName);
} else if (NEED_CLIENT_REFRESH_TBLMETA_ERROR(errInfo->code)) {
++needRetryNum;
SCatalog *pCatalog = NULL;
tcode = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (tcode != TSDB_CODE_SUCCESS) {
++needRetryFailNum;
code = tcode;
continue;
}
SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(&errInfo->tableName, dbFName);
tcode = catalogRefreshTableMeta(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, &errInfo->tableName, -1);
if (tcode != TSDB_CODE_SUCCESS) {
++needRetryFailNum;
code = tcode;
continue;
}
}
}
......
......@@ -805,6 +805,24 @@ static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg,
return code;
}
static void dndGenerateResponseHead(SRpcMsg *pMsg, void **pRspHead, int *contLen) {
if (TDMT_VND_SUBMIT != pMsg->msgType && TDMT_VND_QUERY != pMsg->msgType
&& TDMT_VND_CREATE_TABLE != pMsg->msgType && TDMT_VND_TABLE_META != pMsg->msgType) {
return;
}
*pRspHead = rpcMallocCont(sizeof(SRspHead));
if (NULL == *pRspHead) {
return;
}
SMsgHead *pHead = pMsg->pCont;
strcpy(((SRspHead *)(*pRspHead))->dbFName, pHead->dbFName);
*contLen = sizeof(SRspHead);
}
static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
SMsgHead *pHead = pMsg->pCont;
pHead->contLen = htonl(pHead->contLen);
......@@ -815,6 +833,7 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
dError("vgId:%d, failed to acquire vnode while process req", pHead->vgId);
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
dndGenerateResponseHead(pMsg, &rsp.pCont, &rsp.contLen);
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
......
......@@ -83,21 +83,20 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
for (int i = 0; i < reqNum; i++) {
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
// TODO OPEN THIS
#if 0
char tableFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&pCreateTbReq->name, tableFName);
SMsgHead *pHead = (SMsgHead *)pMsg->pCont;
sprintf(tableFName, "%s.%s", pHead->dbFName, pCreateTbReq->name);
int32_t code = vnodeValidateTableHash(&pVnode->config, tableFName);
if (code) {
SVCreateTbRsp rsp;
rsp.code = code;
memcpy(rsp.tableName, pCreateTbReq->name, sizeof(rsp.tableName));
tNameFromString(&rsp.tableName, tableFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (NULL == vCreateTbBatchRsp.rspList) {
vCreateTbBatchRsp.rspList = taosArrayInit(reqNum - i, sizeof(SVCreateTbRsp));
if (NULL == vCreateTbBatchRsp.rspList) {
vError("vgId:%d, failed to init array: %d", reqNum - i);
vError("vgId:%d, failed to init array: %d", pVnode->vgId, reqNum - i);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -106,8 +105,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
taosArrayPush(vCreateTbBatchRsp.rspList, &rsp);
}
#endif
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
// TODO: handle error
vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name);
......
......@@ -1722,18 +1722,17 @@ int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, c
if (inCache) {
input.dbId = dbCache->dbId;
input.vgVersion = dbCache->vgInfo->vgVersion;
input.numOfTable = dbCache->vgInfo->numOfTable;
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
} else {
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
}
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
input.numOfTable = 0;
code = ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut);
if (code) {
if (CTG_DB_NOT_EXIST(code) && input.vgVersion > CTG_DEFAULT_INVALID_VERSION) {
if (CTG_DB_NOT_EXIST(code) && inCache) {
ctgDebug("db no longer exist, dbFName:%s, dbId:%" PRIx64, input.db, input.dbId);
ctgPushRmDBMsgInQueue(pCtg, input.db, input.dbId);
}
......@@ -2660,7 +2659,7 @@ int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTrans, pMgmtEps, pTableName, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable), NULL, false));
CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTrans, pMgmtEps, pTableName, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable), NULL, true));
}
int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
......
......@@ -8201,6 +8201,22 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
}
}
STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->meta, pScanPhyNode->uid);
if (pTbCfg == NULL) {
tb_uid_t uid = 0;
pTbCfg = metaGetTbInfoByName(pHandle->meta, pScanPhyNode->tableName.tname, &uid);
if (pTbCfg) {
errInfo->code = TSDB_CODE_TDB_TABLE_RECREATED;
errInfo->tableName = pScanPhyNode->tableName;
return NULL;
}
errInfo->code = TSDB_CODE_TDB_INVALID_TABLE_ID;
errInfo->tableName = pScanPhyNode->tableName;
return NULL;
}
size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols);
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId);
if (NULL == pDataReader) {
......
......@@ -78,6 +78,8 @@ typedef struct STableDataBlocks {
char *pData;
bool cloned;
STagData tagData;
char tableName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
SParsedDataColInfo boundColumnInfo;
SRowBuilder rowBuilder;
......@@ -115,10 +117,11 @@ static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t rowType,
}
}
static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
pBlocks->tid = pTableMeta->suid;
pBlocks->uid = pTableMeta->uid;
pBlocks->sversion = pTableMeta->sversion;
static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks* dataBuf, int32_t numOfRows) {
pBlocks->tid = dataBuf->pTableMeta->suid;
pBlocks->uid = dataBuf->pTableMeta->uid;
pBlocks->sversion = dataBuf->pTableMeta->sversion;
strcpy(pBlocks->tableName, dataBuf->tableName);
if (pBlocks->numOfRows + numOfRows >= INT16_MAX) {
return TSDB_CODE_TSC_INVALID_OPERATION;
......
......@@ -50,6 +50,8 @@ typedef struct SInsertParseContext {
SParseContext* pComCxt; // input
char *pSql; // input
SMsgBuf msg; // input
char dbFName[TSDB_DB_FNAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
STableMeta* pTableMeta; // each table
SParsedDataColInfo tags; // each table
SKVRowBuilder tagsBuilder; // each table
......@@ -228,6 +230,9 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
pCxt->pTableMeta->vgId = vg.vgId; // todo remove
strcpy(pCxt->tableName, name.tname);
tNameGetFullDbName(&name, pCxt->dbFName);
return TSDB_CODE_SUCCESS;
}
......@@ -241,10 +246,11 @@ static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pS
return -1;
}
static void buildMsgHeader(SVgDataBlocks* blocks) {
static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
SSubmitReq* submit = (SSubmitReq*)blocks->pData;
submit->header.vgId = htonl(blocks->vg.vgId);
submit->header.contLen = htonl(blocks->size);
strcpy(submit->header.dbFName, src->dbFName);
submit->length = submit->header.contLen;
submit->numOfBlocks = htonl(blocks->numOfTables);
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
......@@ -278,7 +284,7 @@ static int32_t buildOutput(SInsertParseContext* pCxt) {
dst->numOfTables = src->numOfTables;
dst->size = src->size;
TSWAP(dst->pData, src->pData, char*);
buildMsgHeader(dst);
buildMsgHeader(src, dst);
taosArrayPush(pCxt->pOutput->pDataBlocks, &dst);
}
return TSDB_CODE_SUCCESS;
......@@ -893,7 +899,7 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da
CHECK_CODE(parseValues(pCxt, dataBuf, maxNumOfRows, &numOfRows));
SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf->pTableMeta, numOfRows)) {
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) {
return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
}
......@@ -970,7 +976,9 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
STableDataBlocks *dataBuf = NULL;
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL));
strcpy(dataBuf->tableName, pCxt->tableName);
strcpy(dataBuf->dbFName, pCxt->dbFName);
if (TK_NK_LP == sToken.type) {
// pSql -> field1_name, ...)
CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)));
......
......@@ -1239,6 +1239,7 @@ static void destroyTranslateContext(STranslateContext* pCxt) {
typedef struct SVgroupTablesBatch {
SVCreateTbBatchReq req;
SVgroupInfo info;
char dbName[TSDB_DB_NAME_LEN];
} SVgroupTablesBatch;
static void toSchema(const SColumnDefNode* pCol, int32_t colId, SSchema* pSchema) {
......@@ -1254,7 +1255,7 @@ static void destroyCreateTbReq(SVCreateTbReq* pReq) {
}
static int32_t buildNormalTableBatchReq(
const char* pTableName, const SNodeList* pColumns, const SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) {
const char* pDbName, const char* pTableName, const SNodeList* pColumns, const SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) {
SVCreateTbReq req = {0};
req.type = TD_NORMAL_TABLE;
req.name = strdup(pTableName);
......@@ -1272,6 +1273,7 @@ static int32_t buildNormalTableBatchReq(
}
pBatch->info = *pVgroupInfo;
strcpy(pBatch->dbName, pDbName);
pBatch->req.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
if (NULL == pBatch->req.pArray) {
destroyCreateTbReq(&req);
......@@ -1282,7 +1284,7 @@ static int32_t buildNormalTableBatchReq(
return TSDB_CODE_SUCCESS;
}
static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) {
static int32_t serializeVgroupTablesBatch(int32_t acctId, SVgroupTablesBatch* pTbBatch, SArray* pBufArray) {
int tlen = sizeof(SMsgHead) + tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req));
void* buf = malloc(tlen);
if (NULL == buf) {
......@@ -1290,6 +1292,7 @@ static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray*
}
((SMsgHead*)buf)->vgId = htonl(pTbBatch->info.vgId);
((SMsgHead*)buf)->contLen = htonl(tlen);
sprintf(((SMsgHead*)buf)->dbFName, "%d.%s", acctId, pTbBatch->dbName);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tSerializeSVCreateTbBatchReq(&pBuf, &(pTbBatch->req));
......@@ -1351,16 +1354,16 @@ static void destroyCreateTbReqArray(SArray* pArray) {
taosArrayDestroy(pArray);
}
static int32_t buildCreateTableDataBlock(const SCreateTableStmt* pStmt, const SVgroupInfo* pInfo, SArray** pBufArray) {
static int32_t buildCreateTableDataBlock(int32_t acctId, const SCreateTableStmt* pStmt, const SVgroupInfo* pInfo, SArray** pBufArray) {
*pBufArray = taosArrayInit(1, POINTER_BYTES);
if (NULL == *pBufArray) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SVgroupTablesBatch tbatch = {0};
int32_t code = buildNormalTableBatchReq(pStmt->tableName, pStmt->pCols, pInfo, &tbatch);
int32_t code = buildNormalTableBatchReq(pStmt->dbName, pStmt->tableName, pStmt->pCols, pInfo, &tbatch);
if (TSDB_CODE_SUCCESS == code) {
code = serializeVgroupTablesBatch(&tbatch, *pBufArray);
code = serializeVgroupTablesBatch(acctId, &tbatch, *pBufArray);
}
destroyCreateTbReqBatch(&tbatch);
......@@ -1377,7 +1380,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
int32_t code = getTableHashVgroup(pCxt->pParseCxt, pStmt->dbName, pStmt->tableName, &info);
SArray* pBufArray = NULL;
if (TSDB_CODE_SUCCESS == code) {
code = buildCreateTableDataBlock(pStmt, &info, &pBufArray);
code = buildCreateTableDataBlock(pCxt->pParseCxt->acctId, pStmt, &info, &pBufArray);
}
if (TSDB_CODE_SUCCESS == code) {
code = rewriteToVnodeModifOpStmt(pQuery, pBufArray);
......@@ -1389,7 +1392,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
return code;
}
static void addCreateTbReqIntoVgroup(SHashObj* pVgroupHashmap, const char* pTableName, SKVRow row, uint64_t suid, SVgroupInfo* pVgInfo) {
static void addCreateTbReqIntoVgroup(SHashObj* pVgroupHashmap, const char* pDbName, const char* pTableName, SKVRow row, uint64_t suid, SVgroupInfo* pVgInfo) {
struct SVCreateTbReq req = {0};
req.type = TD_CHILD_TABLE;
req.name = strdup(pTableName);
......@@ -1400,6 +1403,7 @@ static void addCreateTbReqIntoVgroup(SHashObj* pVgroupHashmap, const char* pTabl
if (pTableBatch == NULL) {
SVgroupTablesBatch tBatch = {0};
tBatch.info = *pVgInfo;
strcpy(tBatch.dbName, pDbName);
tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
taosArrayPush(tBatch.req.pArray, &req);
......@@ -1546,7 +1550,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
code = getTableHashVgroup(pCxt->pParseCxt, pStmt->dbName, pStmt->tableName, &info);
}
if (TSDB_CODE_SUCCESS == code) {
addCreateTbReqIntoVgroup(pVgroupHashmap, pStmt->tableName, row, pSuperTableMeta->uid, &info);
addCreateTbReqIntoVgroup(pVgroupHashmap, pStmt->dbName, pStmt->tableName, row, pSuperTableMeta->uid, &info);
}
tfree(pSuperTableMeta);
......@@ -1554,7 +1558,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
return code;
}
static SArray* serializeVgroupsTablesBatch(SHashObj* pVgroupHashmap) {
static SArray* serializeVgroupsTablesBatch(int32_t acctId, SHashObj* pVgroupHashmap) {
SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
if (NULL == pBufArray) {
return NULL;
......@@ -1568,7 +1572,7 @@ static SArray* serializeVgroupsTablesBatch(SHashObj* pVgroupHashmap) {
break;
}
serializeVgroupTablesBatch(pTbBatch, pBufArray);
serializeVgroupTablesBatch(acctId, pTbBatch, pBufArray);
destroyCreateTbReqBatch(pTbBatch);
} while (true);
......@@ -1593,7 +1597,7 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery)
}
}
SArray* pBufArray = serializeVgroupsTablesBatch(pVgroupHashmap);
SArray* pBufArray = serializeVgroupsTablesBatch(pCxt->pParseCxt->acctId, pVgroupHashmap);
taosHashCleanup(pVgroupHashmap);
if (NULL == pBufArray) {
return TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -256,6 +256,7 @@ static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* p
pTableScan->scanRange = pScanLogicNode->scanRange;
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
return (SPhysiNode*)pTableScan;
}
......
......@@ -933,6 +933,29 @@ _return:
SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code, NULL));
}
int32_t schRspHeadToErrList(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SRspHead *head, SArray **errList) {
SQueryErrorInfo errInfo = {0};
errInfo.code = errCode;
if (tNameFromString(&errInfo.tableName, head->dbFName, T_NAME_ACCT | T_NAME_DB)) {
SCH_TASK_ELOG("invalid rsp head, dbFName:%s", head->dbFName);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
*errList = taosArrayInit(1, sizeof(SQueryErrorInfo));
if (NULL == *errList) {
SCH_TASK_ELOG("taskArrayInit %d errInfo failed", 1);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (NULL == taosArrayPush(*errList, &errInfo)) {
SCH_TASK_ELOG("taosArrayPush err to errList failed, dbFName:%s", head->dbFName);
taosArrayDestroy(*errList);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
return TSDB_CODE_SUCCESS;
}
// Note: no more task error processing, handled in function internal
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
......@@ -954,6 +977,12 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
case TDMT_VND_CREATE_TABLE_RSP: {
SVCreateTbBatchRsp batchRsp = {0};
if (msg) {
if (ONLY_RSP_HEAD_ERROR(rspCode)) {
SCH_ERR_JRET(schRspHeadToErrList(pJob, pTask, rspCode, (SRspHead *)msg, &errList));
errInfoGot = true;
SCH_ERR_JRET(rspCode);
}
tDeserializeSVCreateTbBatchRsp(msg, msgSize, &batchRsp);
if (batchRsp.rspList) {
int32_t num = taosArrayGetSize(batchRsp.rspList);
......@@ -973,7 +1002,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
taosArrayPush(errList, &errInfo);
}
taosArrayDestroy(batchRsp.rspList);
taosArrayDestroy(batchRsp.rspList);
errInfoGot = true;
}
}
......@@ -984,23 +1013,22 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
break;
}
case TDMT_VND_SUBMIT_RSP: {
#if 0 //TODO OPEN THIS
SSubmitRsp *rsp = (SSubmitRsp *)msg;
if (msg) {
if (ONLY_RSP_HEAD_ERROR(rspCode)) {
SCH_ERR_JRET(schRspHeadToErrList(pJob, pTask, rspCode, (SRspHead *)msg, &errList));
errInfoGot = true;
SCH_ERR_JRET(rspCode);
}
SSubmitRsp *rsp = (SSubmitRsp *)msg;
SCH_ERR_JRET(rsp->code);
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
pJob->resNumOfRows += rsp->affectedRows;
}
pJob->resNumOfRows += rsp->affectedRows;
#else
SCH_ERR_JRET(rspCode);
SSubmitRsp *rsp = (SSubmitRsp *)msg;
if (rsp) {
pJob->resNumOfRows += rsp->affectedRows;
}
#endif
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break;
......@@ -1008,6 +1036,12 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
case TDMT_VND_QUERY_RSP: {
SQueryTableRsp rsp = {0};
if (msg) {
if (ONLY_RSP_HEAD_ERROR(rspCode)) {
SCH_ERR_JRET(schRspHeadToErrList(pJob, pTask, rspCode, (SRspHead *)msg, &errList));
errInfoGot = true;
SCH_ERR_JRET(rspCode);
}
tDeserializeSQueryTableRsp(msg, msgSize, &rsp);
if (rsp.code) {
errInfo.code = rsp.code;
......@@ -1326,6 +1360,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SSubQueryMsg *pMsg = msg;
pMsg->header.vgId = htonl(addr->nodeId);
strcpy(pMsg->header.dbFName, pTask->plan->dbFName);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
......
......@@ -350,6 +350,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "No available disk")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECREATED, "Table re-created")
// query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册