提交 fed7f170 编写于 作者: P plum-lihui

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

......@@ -232,9 +232,9 @@ typedef struct SSelectStmt {
char stmtName[TSDB_TABLE_NAME_LEN];
uint8_t precision;
bool isEmptyResult;
bool isTimeOrderQuery;
bool hasAggFuncs;
bool hasRepeatScanFuncs;
bool isTimeOrderQuery;
} SSelectStmt;
typedef enum ESetOperatorType { SET_OP_TYPE_UNION_ALL = 1, SET_OP_TYPE_UNION } ESetOperatorType;
......
......@@ -634,6 +634,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_TAGS_NUM TAOS_DEF_ERROR_CODE(0, 0x2643)
#define TSDB_CODE_PAR_PERMISSION_DENIED TAOS_DEF_ERROR_CODE(0, 0x2644)
#define TSDB_CODE_PAR_INVALID_STREAM_QUERY TAOS_DEF_ERROR_CODE(0, 0x2645)
#define TSDB_CODE_PAR_INVALID_INTERNAL_PK TAOS_DEF_ERROR_CODE(0, 0x2646)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
......
......@@ -46,6 +46,7 @@ SSchema* getTableTagSchema(const STableMeta* pTableMeta);
int32_t getNumOfColumns(const STableMeta* pTableMeta);
int32_t getNumOfTags(const STableMeta* pTableMeta);
STableComInfo getTableInfo(const STableMeta* pTableMeta);
STableMeta* tableMetaDup(const STableMeta* pTableMeta);
int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* errMsg, int16_t startColId);
int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
......
......@@ -78,7 +78,7 @@ static bool checkUserName(SAstCreateContext* pCxt, SToken* pUserName) {
static bool checkPassword(SAstCreateContext* pCxt, const SToken* pPasswordToken, char* pPassword) {
if (NULL == pPasswordToken) {
pCxt->errCode = TSDB_CODE_PAR_SYNTAX_ERROR;
} else if (pPasswordToken->n >= (TSDB_USET_PASSWORD_LEN - 2)) {
} else if (pPasswordToken->n >= (TSDB_USET_PASSWORD_LEN + 2)) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NAME_OR_PASSWD_TOO_LONG);
} else {
strncpy(pPassword, pPasswordToken->z, pPasswordToken->n);
......
......@@ -53,6 +53,7 @@ typedef struct SInsertParseContext {
SHashObj* pTableBlockHashObj; // global
SHashObj* pSubTableHashObj; // global
SArray* pVgDataBlocks; // global
SHashObj* pTableNameHashObj; // global
int32_t totalNum;
SVnodeModifOpStmt* pOutput;
SStmtCallback* pStmtCb;
......@@ -237,7 +238,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
return code;
}
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char *dbFname, bool isStb) {
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) {
SParseContext* pBasicCtx = pCxt->pComCxt;
bool pass = false;
......@@ -252,6 +253,7 @@ static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char *db
} else {
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name,
&pCxt->pTableMeta));
ASSERT(pCxt->pTableMeta->tableInfo.rowSize > 0);
SVgroupInfo vg;
CHECK_CODE(
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name, &vg));
......@@ -260,9 +262,13 @@ static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char *db
return TSDB_CODE_SUCCESS;
}
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* name, char *dbFname) { return getTableMetaImpl(pCxt, name, dbFname, false); }
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
return getTableMetaImpl(pCxt, name, dbFname, false);
}
static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char *dbFname) { return getTableMetaImpl(pCxt, name, dbFname, true); }
static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) {
return getTableMetaImpl(pCxt, name, dbFname, true);
}
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
while (start < end) {
......@@ -863,7 +869,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb
createSName(&sname, &sToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
char stbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&sname, stbFName);
CHECK_CODE(getSTableMeta(pCxt, &sname, stbFName));
if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
......@@ -1063,8 +1069,8 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
// [...];
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
int32_t tbNum = 0;
char tbFName[TSDB_TABLE_FNAME_LEN];
bool autoCreateTbl = false;
char tbFName[TSDB_TABLE_FNAME_LEN];
bool autoCreateTbl = false;
// for each table
while (1) {
......@@ -1106,6 +1112,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
tNameExtractFullName(&name, tbFName);
CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));
// USING cluase
if (TK_USING == sToken.type) {
CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
......@@ -1158,7 +1166,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
(*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);
(*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl,
pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);
memset(&pCxt->tags, 0, sizeof(pCxt->tags));
pCxt->pVgroupsHashObj = NULL;
......@@ -1187,7 +1196,8 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
.pSql = (char*)pContext->pSql,
.msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
.pTableMeta = NULL,
.pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, false),
.pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
.pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
.totalNum = 0,
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
.pStmtCb = pContext->pStmtCb};
......@@ -1196,12 +1206,13 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
(*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj,
&context.pTableBlockHashObj);
} else {
context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
context.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
context.pTableBlockHashObj =
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj ||
NULL == context.pOutput) {
NULL == context.pTableNameHashObj || NULL == context.pOutput) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -1214,6 +1225,10 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pQuery)->pTableList = taosArrayInit(taosHashGetSize(context.pTableNameHashObj), sizeof(SName));
if (NULL == (*pQuery)->pTableList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
(*pQuery)->haveResultSet = false;
(*pQuery)->msgType = TDMT_VND_SUBMIT;
......@@ -1226,6 +1241,13 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
if (TSDB_CODE_SUCCESS == code) {
code = parseInsertBody(&context);
}
if (TSDB_CODE_SUCCESS == code) {
SName* pTable = taosHashIterate(context.pTableNameHashObj, NULL);
while (NULL != pTable) {
taosArrayPush((*pQuery)->pTableList, pTable);
pTable = taosHashIterate(context.pTableNameHashObj, pTable);
}
}
destroyInsertParseContext(&context);
return code;
}
......@@ -1479,7 +1501,6 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
taosMemoryFree(pSTSchema);
}
#endif
}
if (rowEnd) {
......@@ -1677,10 +1698,10 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsFormat, SArray* cols
buildCreateTbReq(&smlHandle->createTblReq, tableName, row, pTableMeta->suid);
STableDataBlocks* pDataBlock = NULL;
ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid), TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize, pTableMeta,
&pDataBlock, NULL, &smlHandle->createTblReq);
if(ret != TSDB_CODE_SUCCESS){
ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid),
TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize,
pTableMeta, &pDataBlock, NULL, &smlHandle->createTblReq);
if (ret != TSDB_CODE_SUCCESS) {
buildInvalidOperationMsg(&pBuf, "create data block error");
return ret;
}
......
......@@ -137,7 +137,7 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
}
memset(dataBuf->pData, 0, sizeof(SSubmitBlk));
dataBuf->pTableMeta = pTableMeta;
dataBuf->pTableMeta = tableMetaDup(pTableMeta);
SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
SSchema* pSchema = getTableColumnSchema(dataBuf->pTableMeta);
......@@ -465,7 +465,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
return ret;
}
ASSERT(pOneTableBlock->pTableMeta->tableInfo.rowSize > 0);
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
......
......@@ -368,11 +368,15 @@ static int32_t createColumnsByTable(STranslateContext* pCxt, const STableNode* p
return TSDB_CODE_SUCCESS;
}
static bool isInternalPrimaryKey(const SColumnNode* pCol) {
return PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && 0 == strcmp(pCol->colName, PK_TS_COL_INTERNAL_NAME);
}
static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) {
bool found = false;
if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
const STableMeta* pMeta = ((SRealTableNode*)pTable)->pMeta;
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && 0 == strcmp(pCol->colName, PK_TS_COL_INTERNAL_NAME)) {
if (isInternalPrimaryKey(pCol)) {
setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema, false, pCol);
return true;
}
......@@ -389,7 +393,9 @@ static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) {
SNode* pNode;
FOREACH(pNode, pProjectList) {
SExprNode* pExpr = (SExprNode*)pNode;
if (0 == strcmp(pCol->colName, pExpr->aliasName)) {
if (0 == strcmp(pCol->colName, pExpr->aliasName) ||
((QUERY_NODE_COLUMN == nodeType(pExpr) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId) &&
isInternalPrimaryKey(pCol))) {
setColumnInfoByExpr(pTable, pExpr, pCol);
found = true;
break;
......@@ -433,7 +439,11 @@ static EDealRes translateColumnWithoutPrefix(STranslateContext* pCxt, SColumnNod
}
}
if (!found) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_COLUMN, pCol->colName);
if (isInternalPrimaryKey(pCol)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_INTERNAL_PK);
} else {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_COLUMN, pCol->colName);
}
}
return DEAL_RES_CONTINUE;
}
......@@ -3655,6 +3665,9 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
destroyCreateTbReq(&req);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pStmt->ignoreExists) {
req.flags |= TD_CREATE_IF_NOT_EXISTS;
}
SNode* pCol;
col_id_t index = 0;
FOREACH(pCol, pStmt->pCols) {
......@@ -3785,24 +3798,27 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
return code;
}
static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, const char* pDbName,
const char* pTableName, SKVRow row, uint64_t suid, SVgroupInfo* pVgInfo) {
static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, SCreateSubTableClause* pStmt, SKVRow row,
uint64_t suid, SVgroupInfo* pVgInfo) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId};
strcpy(name.dbname, pDbName);
strcpy(name.dbname, pStmt->dbName);
tNameGetFullDbName(&name, dbFName);
struct SVCreateTbReq req = {0};
req.type = TD_CHILD_TABLE;
req.name = strdup(pTableName);
req.name = strdup(pStmt->tableName);
req.ctb.suid = suid;
req.ctb.pTag = row;
if (pStmt->ignoreExists) {
req.flags |= TD_CREATE_IF_NOT_EXISTS;
}
SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
if (pTableBatch == NULL) {
SVgroupCreateTableBatch tBatch = {0};
tBatch.info = *pVgInfo;
strcpy(tBatch.dbName, pDbName);
strcpy(tBatch.dbName, pStmt->dbName);
tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
taosArrayPush(tBatch.req.pArray, &req);
......@@ -3964,8 +3980,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info);
}
if (TSDB_CODE_SUCCESS == code) {
addCreateTbReqIntoVgroup(pCxt->pParseCxt->acctId, pVgroupHashmap, pStmt->dbName, pStmt->tableName, row,
pSuperTableMeta->uid, &info);
addCreateTbReqIntoVgroup(pCxt->pParseCxt->acctId, pVgroupHashmap, pStmt, row, pSuperTableMeta->uid, &info);
}
taosMemoryFreeClear(pSuperTableMeta);
......
......@@ -146,6 +146,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Invalid binary/nchar column length";
case TSDB_CODE_PAR_INVALID_TAGS_NUM:
return "Invalid number of tag columns";
case TSDB_CODE_PAR_INVALID_INTERNAL_PK:
return "Invalid _c0 or _rowts expression";
case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory";
default:
......@@ -191,7 +193,7 @@ int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char*
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
SSchema *getTableColumnSchema(const STableMeta *pTableMeta) {
SSchema* getTableColumnSchema(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
return (SSchema*)pTableMeta->schema;
}
......@@ -226,6 +228,23 @@ STableComInfo getTableInfo(const STableMeta* pTableMeta) {
return pTableMeta->tableInfo;
}
static uint32_t getTableMetaSize(const STableMeta* pTableMeta) {
int32_t totalCols = 0;
if (pTableMeta->tableInfo.numOfColumns >= 0) {
totalCols = pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags;
}
return sizeof(STableMeta) + totalCols * sizeof(SSchema);
}
STableMeta* tableMetaDup(const STableMeta* pTableMeta) {
size_t size = getTableMetaSize(pTableMeta);
STableMeta* p = taosMemoryMalloc(size);
memcpy(p, pTableMeta, size);
return p;
}
int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen) {
if (len <= 0 || dlen <= 0) return 0;
......
......@@ -151,20 +151,32 @@ static bool needOptimizeDynamicScan(const SFunctionNode* pFunc) {
static int32_t osdGetRelatedFuncs(SScanLogicNode* pScan, SNodeList** pSdrFuncs, SNodeList** pDsoFuncs) {
SNodeList* pAllFuncs = osdGetAllFuncs(pScan->node.pParent);
SNodeList* pTmpSdrFuncs = NULL;
SNodeList* pTmpDsoFuncs = NULL;
SNode* pFunc = NULL;
bool otherFunc = false;
FOREACH(pFunc, pAllFuncs) {
int32_t code = TSDB_CODE_SUCCESS;
if (needOptimizeDataRequire((SFunctionNode*)pFunc)) {
code = nodesListMakeStrictAppend(pSdrFuncs, nodesCloneNode(pFunc));
code = nodesListMakeStrictAppend(&pTmpSdrFuncs, nodesCloneNode(pFunc));
} else if (needOptimizeDynamicScan((SFunctionNode*)pFunc)) {
code = nodesListMakeStrictAppend(pDsoFuncs, nodesCloneNode(pFunc));
code = nodesListMakeStrictAppend(&pTmpDsoFuncs, nodesCloneNode(pFunc));
} else {
otherFunc = true;
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(*pSdrFuncs);
nodesDestroyList(*pDsoFuncs);
nodesDestroyList(pTmpSdrFuncs);
nodesDestroyList(pTmpDsoFuncs);
return code;
}
}
if (otherFunc) {
nodesDestroyList(pTmpSdrFuncs);
nodesDestroyList(pTmpDsoFuncs);
} else {
*pSdrFuncs = pTmpSdrFuncs;
*pDsoFuncs = pTmpDsoFuncs;
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -18,7 +18,7 @@
static char* getUsageErrFormat(int32_t errCode) {
switch (errCode) {
case TSDB_CODE_PLAN_EXPECTED_TS_EQUAL:
return "l.ts = r.ts is expected in join expression";
return "left.ts = right.ts is expected in join expression";
case TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN:
return "not support cross join";
default:
......
......@@ -28,6 +28,8 @@ TEST_F(PlanOptimizeTest, optimizeScanData) {
run("SELECT COUNT(c1) FROM t1");
run("SELECT COUNT(CAST(c1 AS BIGINT)) FROM t1");
run("SELECT PERCENTILE(c1, 40), COUNT(*) FROM t1");
}
TEST_F(PlanOptimizeTest, orderByPrimaryKey) {
......
......@@ -1092,11 +1092,10 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
SVCreateTbRsp *rsp = batchRsp.pRsps + i;
if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) {
tDecoderClear(&coder);
SCH_ERR_JRET(rsp->code);
} else if (TSDB_CODE_SUCCESS != rsp->code) {
if (TSDB_CODE_SUCCESS != rsp->code) {
code = rsp->code;
tDecoderClear(&coder);
SCH_ERR_JRET(code);
}
}
}
......@@ -1117,11 +1116,10 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
SVDropTbRsp *rsp = batchRsp.pRsps + i;
if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) {
tDecoderClear(&coder);
SCH_ERR_JRET(rsp->code);
} else if (TSDB_CODE_SUCCESS != rsp->code) {
if (TSDB_CODE_SUCCESS != rsp->code) {
code = rsp->code;
tDecoderClear(&coder);
SCH_ERR_JRET(code);
}
}
}
......@@ -1137,7 +1135,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_JRET(rspCode);
if (msg) {
SDecoder coder = {0};
SDecoder coder = {0};
SSubmitRsp *rsp = taosMemoryMalloc(sizeof(*rsp));
tDecoderInit(&coder, msg, msgSize);
code = tDecodeSSubmitRsp(&coder, rsp);
......@@ -1146,10 +1144,21 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
tFreeSSubmitRsp(rsp);
SCH_ERR_JRET(code);
}
if (rsp->nBlocks > 0) {
for (int32_t i = 0; i < rsp->nBlocks; ++i) {
SSubmitBlkRsp *blk = rsp->pBlocks + i;
if (TSDB_CODE_SUCCESS != blk->code) {
code = blk->code;
tFreeSSubmitRsp(rsp);
SCH_ERR_JRET(code);
}
}
}
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows);
if (pJob->attr.needRes) {
SCH_LOCK(SCH_WRITE, &pJob->resLock);
if (pJob->resData) {
......@@ -1160,7 +1169,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks));
taosMemoryFree(rsp->pBlocks);
taosMemoryFree(rsp);
} else {
} else {
pJob->resData = rsp;
}
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册