提交 dbd943b8 编写于 作者: H Hongze Cheng

Merge branch 'feature/vnode_refact1' of https://github.com/taosdata/TDengine...

Merge branch 'feature/vnode_refact1' of https://github.com/taosdata/TDengine into feature/vnode_refact1
......@@ -1604,6 +1604,7 @@ int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatc
// TDMT_VND_DROP_TABLE =================
typedef struct {
const char* name;
int8_t igNotExists;
} SVDropTbReq;
typedef struct {
......
......@@ -616,6 +616,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_OFFSET_LESS_ZERO TAOS_DEF_ERROR_CODE(0, 0x2637)
#define TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY TAOS_DEF_ERROR_CODE(0, 0x2638)
#define TSDB_CODE_PAR_INVALID_TOPIC_QUERY TAOS_DEF_ERROR_CODE(0, 0x2639)
#define TSDB_CODE_PAR_INVALID_DROP_STABLE TAOS_DEF_ERROR_CODE(0, 0x263A)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
......
......@@ -3800,6 +3800,7 @@ static int32_t tEncodeSVDropTbReq(SCoder *pCoder, const SVDropTbReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->name) < 0) return -1;
if (tEncodeI8(pCoder, pReq->igNotExists) < 0) return -1;
tEndEncode(pCoder);
return 0;
......@@ -3809,6 +3810,7 @@ static int32_t tDecodeSVDropTbReq(SCoder *pCoder, SVDropTbReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->igNotExists) < 0) return -1;
tEndDecode(pCoder);
return 0;
......
......@@ -3134,11 +3134,11 @@ static int32_t rewriteShow(STranslateContext* pCxt, SQuery* pQuery) {
return code;
}
typedef struct SVgroupTablesBatch {
typedef struct SVgroupCreateTableBatch {
SVCreateTbBatchReq req;
SVgroupInfo info;
char dbName[TSDB_DB_NAME_LEN];
} SVgroupTablesBatch;
} SVgroupCreateTableBatch;
static void destroyCreateTbReq(SVCreateTbReq* pReq) {
taosMemoryFreeClear(pReq->name);
......@@ -3146,7 +3146,7 @@ static void destroyCreateTbReq(SVCreateTbReq* pReq) {
}
static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* pStmt, const SVgroupInfo* pVgroupInfo,
SVgroupTablesBatch* pBatch) {
SVgroupCreateTableBatch* pBatch) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId};
strcpy(name.dbname, pStmt->dbName);
......@@ -3180,13 +3180,13 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
return TSDB_CODE_SUCCESS;
}
static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) {
static int32_t serializeVgroupCreateTableBatch(SVgroupCreateTableBatch* pTbBatch, SArray* pBufArray) {
int tlen;
SCoder coder = {0};
int32_t ret = 0;
tEncodeSize(tEncodeSVCreateTbBatchReq, &pTbBatch->req, tlen, ret);
tlen += sizeof(SMsgHead); //+ tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req));
tlen += sizeof(SMsgHead);
void* buf = taosMemoryMalloc(tlen);
if (NULL == buf) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -3212,7 +3212,7 @@ static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray*
return TSDB_CODE_SUCCESS;
}
static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) {
static void destroyCreateTbReqBatch(SVgroupCreateTableBatch* pTbBatch) {
size_t size = taosArrayGetSize(pTbBatch->req.pArray);
for (int32_t i = 0; i < size; ++i) {
SVCreateTbReq* pTableReq = taosArrayGet(pTbBatch->req.pArray, i);
......@@ -3257,10 +3257,10 @@ static int32_t buildCreateTableDataBlock(int32_t acctId, const SCreateTableStmt*
return TSDB_CODE_OUT_OF_MEMORY;
}
SVgroupTablesBatch tbatch = {0};
SVgroupCreateTableBatch tbatch = {0};
int32_t code = buildNormalTableBatchReq(acctId, pStmt, pInfo, &tbatch);
if (TSDB_CODE_SUCCESS == code) {
code = serializeVgroupTablesBatch(&tbatch, *pBufArray);
code = serializeVgroupCreateTableBatch(&tbatch, *pBufArray);
}
destroyCreateTbReqBatch(&tbatch);
......@@ -3305,9 +3305,9 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, c
req.ctb.suid = suid;
req.ctb.pTag = row;
SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
if (pTableBatch == NULL) {
SVgroupTablesBatch tBatch = {0};
SVgroupCreateTableBatch tBatch = {0};
tBatch.info = *pVgInfo;
strcpy(tBatch.dbName, pDbName);
......@@ -3480,21 +3480,21 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
return code;
}
static SArray* serializeVgroupsTablesBatch(int32_t acctId, SHashObj* pVgroupHashmap) {
static SArray* serializeVgroupsCreateTableBatch(int32_t acctId, SHashObj* pVgroupHashmap) {
SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
if (NULL == pBufArray) {
return NULL;
}
int32_t code = TSDB_CODE_SUCCESS;
SVgroupTablesBatch* pTbBatch = NULL;
SVgroupCreateTableBatch* pTbBatch = NULL;
do {
pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch);
if (pTbBatch == NULL) {
break;
}
serializeVgroupTablesBatch(pTbBatch, pBufArray);
serializeVgroupCreateTableBatch(pTbBatch, pBufArray);
destroyCreateTbReqBatch(pTbBatch);
} while (true);
......@@ -3519,7 +3519,143 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery)
}
}
SArray* pBufArray = serializeVgroupsTablesBatch(pCxt->pParseCxt->acctId, pVgroupHashmap);
SArray* pBufArray = serializeVgroupsCreateTableBatch(pCxt->pParseCxt->acctId, pVgroupHashmap);
taosHashCleanup(pVgroupHashmap);
if (NULL == pBufArray) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return rewriteToVnodeModifOpStmt(pQuery, pBufArray);
}
typedef struct SVgroupDropTableBatch {
SVDropTbBatchReq req;
SVgroupInfo info;
char dbName[TSDB_DB_NAME_LEN];
} SVgroupDropTableBatch;
static void addDropTbReqIntoVgroup(SHashObj* pVgroupHashmap, SDropTableClause* pClause, SVgroupInfo* pVgInfo) {
SVDropTbReq req = {.name = pClause->tableName, .igNotExists = pClause->ignoreNotExists};
SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
if (NULL == pTableBatch) {
SVgroupDropTableBatch tBatch = {0};
tBatch.info = *pVgInfo;
tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
taosArrayPush(tBatch.req.pArray, &req);
taosHashPut(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &tBatch, sizeof(tBatch));
} else { // add to the correct vgroup
taosArrayPush(pTableBatch->req.pArray, &req);
}
}
static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableClause* pClause, bool* pIsSuperTable,
SHashObj* pVgroupHashmap) {
STableMeta* pTableMeta = NULL;
int32_t code = getTableMeta(pCxt, pClause->dbName, pClause->tableName, &pTableMeta);
if (TSDB_CODE_SUCCESS == code && TSDB_SUPER_TABLE == pTableMeta->tableType) {
*pIsSuperTable = true;
goto over;
}
*pIsSuperTable = false;
SVgroupInfo info = {0};
if (TSDB_CODE_SUCCESS == code) {
code = getTableHashVgroup(pCxt, pClause->dbName, pClause->tableName, &info);
}
if (TSDB_CODE_SUCCESS == code) {
addDropTbReqIntoVgroup(pVgroupHashmap, pClause, &info);
}
over:
taosMemoryFreeClear(pTableMeta);
return code;
}
static void destroyDropTbReqBatch(SVgroupDropTableBatch* pTbBatch) { taosArrayDestroy(pTbBatch->req.pArray); }
static int32_t serializeVgroupDropTableBatch(SVgroupDropTableBatch* pTbBatch, SArray* pBufArray) {
int tlen;
SCoder coder = {0};
int32_t ret = 0;
tEncodeSize(tEncodeSVDropTbBatchReq, &pTbBatch->req, tlen, ret);
tlen += sizeof(SMsgHead);
void* buf = taosMemoryMalloc(tlen);
if (NULL == buf) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SMsgHead*)buf)->vgId = htonl(pTbBatch->info.vgId);
((SMsgHead*)buf)->contLen = htonl(tlen);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, tlen - sizeof(SMsgHead), TD_ENCODER);
tEncodeSVDropTbBatchReq(&coder, &pTbBatch->req);
tCoderClear(&coder);
SVgDataBlocks* pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == pVgData) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pVgData->vg = pTbBatch->info;
pVgData->pData = buf;
pVgData->size = tlen;
pVgData->numOfTables = (int32_t)taosArrayGetSize(pTbBatch->req.pArray);
taosArrayPush(pBufArray, &pVgData);
return TSDB_CODE_SUCCESS;
}
static SArray* serializeVgroupsDropTableBatch(int32_t acctId, SHashObj* pVgroupHashmap) {
SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
if (NULL == pBufArray) {
return NULL;
}
int32_t code = TSDB_CODE_SUCCESS;
SVgroupDropTableBatch* pTbBatch = NULL;
do {
pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch);
if (pTbBatch == NULL) {
break;
}
serializeVgroupDropTableBatch(pTbBatch, pBufArray);
destroyDropTbReqBatch(pTbBatch);
} while (true);
return pBufArray;
}
static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
SDropTableStmt* pStmt = (SDropTableStmt*)pQuery->pRoot;
SHashObj* pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == pVgroupHashmap) {
return TSDB_CODE_OUT_OF_MEMORY;
}
bool isSuperTable = false;
SNode* pNode;
FOREACH(pNode, pStmt->pTables) {
int32_t code = buildDropTableVgroupHashmap(pCxt, (SDropTableClause*)pNode, &isSuperTable, pVgroupHashmap);
if (TSDB_CODE_SUCCESS != code) {
taosHashCleanup(pVgroupHashmap);
return code;
}
if (isSuperTable && LIST_LENGTH(pStmt->pTables) > 1) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DROP_STABLE);
}
}
if (isSuperTable) {
taosHashCleanup(pVgroupHashmap);
return TSDB_CODE_SUCCESS;
}
SArray* pBufArray = serializeVgroupsDropTableBatch(pCxt->pParseCxt->acctId, pVgroupHashmap);
taosHashCleanup(pVgroupHashmap);
if (NULL == pBufArray) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -3565,6 +3701,9 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
code = rewriteCreateMultiTable(pCxt, pQuery);
break;
case QUERY_NODE_DROP_TABLE_STMT:
code = rewriteDropTable(pCxt, pQuery);
break;
case QUERY_NODE_ALTER_TABLE_STMT:
if (TSDB_ALTER_TABLE_UPDATE_TAG_VAL == ((SAlterTableStmt*)pQuery->pRoot)->alterType) {
code = rewriteAlterTable(pCxt, pQuery);
......
......@@ -126,6 +126,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "slimit/soffset only available for PARTITION BY query";
case TSDB_CODE_PAR_INVALID_TOPIC_QUERY:
return "Invalid topic query";
case TSDB_CODE_PAR_INVALID_DROP_STABLE:
return "Cannot drop super table in batch";
case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory";
default:
......
......@@ -944,9 +944,16 @@ static int32_t createSetOperatorLogicNode(SLogicPlanContext* pCxt, SSetOperator*
}
static int32_t getMsgType(ENodeType sqlType) {
return (QUERY_NODE_CREATE_TABLE_STMT == sqlType || QUERY_NODE_CREATE_MULTI_TABLE_STMT == sqlType)
? TDMT_VND_CREATE_TABLE
: TDMT_VND_SUBMIT;
switch (sqlType) {
case QUERY_NODE_CREATE_TABLE_STMT:
case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
return TDMT_VND_CREATE_TABLE;
case QUERY_NODE_DROP_TABLE_STMT:
return TDMT_VND_DROP_TABLE;
default:
break;
}
return TDMT_VND_SUBMIT;
}
static int32_t createVnodeModifLogicNode(SLogicPlanContext* pCxt, SVnodeModifOpStmt* pStmt, SLogicNode** pLogicNode) {
......
......@@ -245,6 +245,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return TSDB_CODE_SUCCESS;
case TDMT_VND_CREATE_TABLE_RSP:
case TDMT_VND_DROP_TABLE_RSP:
case TDMT_VND_SUBMIT_RSP:
break;
default:
......@@ -369,7 +370,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
for (int32_t n = 0; n < childNum; ++n) {
SSubplan * child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
SSubplan *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
if (NULL == childTask || NULL == *childTask) {
SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
......@@ -401,7 +402,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
for (int32_t n = 0; n < parentNum; ++n) {
SSubplan * parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
SSubplan *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
if (NULL == parentTask || NULL == *parentTask) {
SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
......@@ -491,7 +492,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SSchLevel level = {0};
SNodeListNode *plans = NULL;
int32_t taskNum = 0;
SSchLevel * pLevel = NULL;
SSchLevel *pLevel = NULL;
level.status = JOB_TASK_STATUS_NOT_START;
......@@ -1094,6 +1095,30 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break;
}
case TDMT_VND_DROP_TABLE_RSP: {
SVDropTbBatchRsp batchRsp = {0};
if (msg) {
SCoder coder = {0};
tCoderInit(&coder, TD_LITTLE_ENDIAN, msg, msgSize, TD_DECODER);
code = tDecodeSVDropTbBatchRsp(&coder, &batchRsp);
if (TSDB_CODE_SUCCESS == code && batchRsp.pArray) {
int32_t num = taosArrayGetSize(batchRsp.pArray);
for (int32_t i = 0; i < num; ++i) {
SVDropTbRsp *rsp = taosArrayGet(batchRsp.pArray, i);
if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) {
tCoderClear(&coder);
SCH_ERR_JRET(rsp->code);
}
}
}
tCoderClear(&coder);
SCH_ERR_JRET(code);
}
SCH_ERR_JRET(rspCode);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break;
}
case TDMT_VND_SUBMIT_RSP: {
if (msg) {
SSubmitRsp *rsp = (SSubmitRsp *)msg;
......@@ -1267,7 +1292,7 @@ int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCo
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
int32_t code = 0;
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
SSchTask * pTask = NULL;
SSchTask *pTask = NULL;
SSchJob *pJob = schAcquireJob(pParam->refId);
if (NULL == pJob) {
......@@ -1316,6 +1341,10 @@ int32_t schHandleCreateTableCallback(void *param, const SDataBuf *pMsg, int32_t
return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
}
int32_t schHandleDropTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_DROP_TABLE_RSP, code);
}
int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}
......@@ -1412,6 +1441,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
case TDMT_VND_CREATE_TABLE:
*fp = schHandleCreateTableCallback;
break;
case TDMT_VND_DROP_TABLE:
*fp = schHandleDropTableCallback;
break;
case TDMT_VND_SUBMIT:
*fp = schHandleSubmitCallback;
break;
......@@ -1617,8 +1649,8 @@ _return:
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
int32_t code = 0;
SSchHbCallbackParam *param = NULL;
SMsgSendInfo * pMsgSendInfo = NULL;
SQueryNodeAddr * addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SMsgSendInfo *pMsgSendInfo = NULL;
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId;
......@@ -1759,10 +1791,10 @@ int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
}
SRpcCtxVal dst = {0};
void * pIter = taosHashIterate(pSrc->args, NULL);
void *pIter = taosHashIterate(pSrc->args, NULL);
while (pIter) {
SRpcCtxVal *pVal = (SRpcCtxVal *)pIter;
int32_t * msgType = taosHashGetKey(pIter, NULL);
int32_t *msgType = taosHashGetKey(pIter, NULL);
dst = *pVal;
dst.val = NULL;
......@@ -1916,7 +1948,7 @@ _return:
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
uint32_t msgSize = 0;
void * msg = NULL;
void *msg = NULL;
int32_t code = 0;
bool isCandidateAddr = false;
bool persistHandle = false;
......@@ -1931,6 +1963,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
switch (msgType) {
case TDMT_VND_CREATE_TABLE:
case TDMT_VND_DROP_TABLE:
case TDMT_VND_SUBMIT: {
msgSize = pTask->msgLen;
msg = taosMemoryCalloc(1, msgSize);
......@@ -2673,7 +2706,7 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
for (int32_t m = 0; m < pLevel->taskNum; ++m) {
SSchTask * pTask = taosArrayGet(pLevel->subTasks, m);
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status};
taosArrayPush(pSub, &subDesc);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册