未验证 提交 ea230c91 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4784 from taosdata/feature/TD-2426

[TD-2426]<feature>: first round batch create table
......@@ -184,7 +184,19 @@ void dnodeReprocessMWriteMsg(void *pMsg) {
dDebug("msg:%p, app:%p type:%s is redirected for mnode not running, retry times:%d", pWrite, pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType], pWrite->retry);
dnodeSendRedirectMsg(pMsg, true);
if (pWrite->pBatchMasterMsg) {
++pWrite->pBatchMasterMsg->received;
if (pWrite->pBatchMasterMsg->successed + pWrite->pBatchMasterMsg->received
>= pWrite->pBatchMasterMsg->expected) {
dnodeSendRedirectMsg(&pWrite->rpcMsg, true);
dnodeFreeMWriteMsg(pWrite);
}
mnodeDestroySubMsg(pWrite);
return;
}
dnodeSendRedirectMsg(&pWrite->rpcMsg, true);
dnodeFreeMWriteMsg(pWrite);
} else {
dDebug("msg:%p, app:%p type:%s is reput into mwrite queue:%p, retry times:%d", pWrite, pWrite->rpcMsg.ahandle,
......
......@@ -42,11 +42,12 @@ typedef struct SMnodeMsg {
struct SVgObj * pVgroup;
struct STableObj *pTable;
struct SSTableObj*pSTable;
struct SMnodeMsg *pBatchMasterMsg;
SMnodeRsp rpcRsp;
int8_t received;
int8_t successed;
int8_t expected;
int8_t retry;
int16_t received;
int16_t successed;
int16_t expected;
int16_t retry;
int32_t incomingTs;
int32_t code;
void * pObj;
......@@ -57,6 +58,7 @@ typedef struct SMnodeMsg {
void * mnodeCreateMsg(SRpcMsg *pRpcMsg);
int32_t mnodeInitMsg(SMnodeMsg *pMsg);
void mnodeCleanupMsg(SMnodeMsg *pMsg);
void mnodeDestroySubMsg(SMnodeMsg *pSubMsg);
int32_t mnodeInitSystem();
int32_t mnodeStartSystem();
......
......@@ -26,6 +26,7 @@
#include "tcompare.h"
#include "tdataformat.h"
#include "tgrant.h"
#include "tqueue.h"
#include "hash.h"
#include "mnode.h"
#include "dnode.h"
......@@ -720,6 +721,133 @@ static void mnodeExtractTableName(char* tableId, char* name) {
}
}
static SMnodeMsg *mnodeCreateSubMsg(SMnodeMsg *pBatchMasterMsg, int32_t contSize) {
SMnodeMsg *pSubMsg = taosAllocateQitem(sizeof(*pBatchMasterMsg) + contSize);
*pSubMsg = *pBatchMasterMsg;
//pSubMsg->pCont = (char *) pSubMsg + sizeof(SMnodeMsg);
pSubMsg->rpcMsg.pCont = pSubMsg->pCont;
pSubMsg->successed = 0;
pSubMsg->expected = 0;
SCMCreateTableMsg *pCM = pSubMsg->rpcMsg.pCont;
pCM->numOfTables = htonl(1);
pCM->contLen = htonl(contSize);
return pSubMsg;
}
void mnodeDestroySubMsg(SMnodeMsg *pSubMsg) {
if (pSubMsg) {
// pUser is retained in batch master msg
if (pSubMsg->pDb) mnodeDecDbRef(pSubMsg->pDb);
if (pSubMsg->pVgroup) mnodeDecVgroupRef(pSubMsg->pVgroup);
if (pSubMsg->pTable) mnodeDecTableRef(pSubMsg->pTable);
if (pSubMsg->pSTable) mnodeDecTableRef(pSubMsg->pSTable);
if (pSubMsg->pAcct) mnodeDecAcctRef(pSubMsg->pAcct);
if (pSubMsg->pDnode) mnodeDecDnodeRef(pSubMsg->pDnode);
taosFreeQitem(pSubMsg);
}
}
static int32_t mnodeValidateCreateTableMsg(SCreateTableMsg *pCreateTable, SMnodeMsg *pMsg) {
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pCreateTable->db);
if (pMsg->pDb == NULL) {
mError("msg:%p, app:%p table:%s, failed to create, db not selected", pMsg, pMsg->rpcMsg.ahandle, pCreateTable->tableId);
return TSDB_CODE_MND_DB_NOT_SELECTED;
}
if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pMsg->pDb->name, pMsg->pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pCreateTable->tableId);
if (pMsg->pTable != NULL && pMsg->retry == 0) {
if (pCreateTable->getMeta) {
mDebug("msg:%p, app:%p table:%s, continue to get meta", pMsg, pMsg->rpcMsg.ahandle, pCreateTable->tableId);
return mnodeGetChildTableMeta(pMsg);
} else if (pCreateTable->igExists) {
mDebug("msg:%p, app:%p table:%s, is already exist", pMsg, pMsg->rpcMsg.ahandle, pCreateTable->tableId);
return TSDB_CODE_SUCCESS;
} else {
mError("msg:%p, app:%p table:%s, failed to create, table already exist", pMsg, pMsg->rpcMsg.ahandle,
pCreateTable->tableId);
return TSDB_CODE_MND_TABLE_ALREADY_EXIST;
}
}
if (pCreateTable->numOfTags != 0) {
mDebug("msg:%p, app:%p table:%s, create stable msg is received from thandle:%p", pMsg, pMsg->rpcMsg.ahandle,
pCreateTable->tableId, pMsg->rpcMsg.handle);
return mnodeProcessCreateSuperTableMsg(pMsg);
} else {
mDebug("msg:%p, app:%p table:%s, create ctable msg is received from thandle:%p", pMsg, pMsg->rpcMsg.ahandle,
pCreateTable->tableId, pMsg->rpcMsg.handle);
return mnodeProcessCreateChildTableMsg(pMsg);
}
}
static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) {
if (pMsg->pBatchMasterMsg == NULL) { // batch master first round
pMsg->pBatchMasterMsg = pMsg;
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
int32_t numOfTables = htonl(pCreate->numOfTables);
int32_t contentLen = htonl(pCreate->contLen);
pMsg->expected = numOfTables;
int32_t code = TSDB_CODE_SUCCESS;
SCreateTableMsg *pCreateTable = (SCreateTableMsg*) ((char*) pCreate + sizeof(SCMCreateTableMsg));
for (SCreateTableMsg *p = pCreateTable; p < (SCreateTableMsg *) ((char *) pCreate + contentLen); p = (SCreateTableMsg *) ((char *) p + htonl(p->len))) {
SMnodeMsg *pSubMsg = mnodeCreateSubMsg(pMsg, sizeof(SCMCreateTableMsg) + htonl(p->len));
memcpy(pSubMsg->pCont + sizeof(SCMCreateTableMsg), p, htonl(p->len));
code = mnodeValidateCreateTableMsg(p, pSubMsg);
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) {
++pSubMsg->pBatchMasterMsg->successed;
mnodeDestroySubMsg(pSubMsg);
continue;
}
if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mnodeDestroySubMsg(pSubMsg);
return code;
}
}
if (pMsg->successed >= pMsg->expected) {
return code;
} else {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
} else {
if (pMsg->pBatchMasterMsg != pMsg) { // batch sub replay
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
SCreateTableMsg *pCreateTable = (SCreateTableMsg*) ((char*) pCreate + sizeof(SCMCreateTableMsg));
int32_t code = mnodeValidateCreateTableMsg(pCreateTable, pMsg);
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) {
++pMsg->pBatchMasterMsg->successed;
mnodeDestroySubMsg(pMsg);
}
if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mnodeDestroySubMsg(pMsg);
return code;
}
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
>= pMsg->pBatchMasterMsg->expected) {
return code;
} else {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
} else { // batch master replay, reprocess the whole batch
assert(0);
}
}
}
static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
......@@ -729,6 +857,11 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
// todo return error
}
// batch master msg first round or reprocessing and batch sub msg reprocessing
if (numOfTables > 1 || pMsg->pBatchMasterMsg != NULL) {
return mnodeProcessBatchCreateTableMsg(pMsg);
}
SCreateTableMsg *p = (SCreateTableMsg*)((char*) pCreate + sizeof(SCMCreateTableMsg));
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(p->db);
if (pMsg->pDb == NULL) {
......@@ -1737,6 +1870,18 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
mDebug("msg:%p, app:%p table:%s, created in dnode, thandle:%p", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId,
pMsg->rpcMsg.handle);
if (pMsg->pBatchMasterMsg) {
++pMsg->pBatchMasterMsg->successed;
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
>= pMsg->pBatchMasterMsg->expected) {
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, code);
}
mnodeDestroySubMsg(pMsg);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
dnodeSendRpcMWriteRsp(pMsg, TSDB_CODE_SUCCESS);
}
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
......@@ -2477,6 +2622,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
mnodeSendDropChildTableMsg(pMsg, false);
rpcMsg->code = TSDB_CODE_SUCCESS;
if (pMsg->pBatchMasterMsg) {
++pMsg->pBatchMasterMsg->successed;
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
>= pMsg->pBatchMasterMsg->expected) {
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, rpcMsg->code);
}
mnodeDestroySubMsg(pMsg);
return;
}
dnodeSendRpcMWriteRsp(pMsg, rpcMsg->code);
return;
}
......@@ -2494,6 +2652,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
pMsg->pTable = NULL;
mnodeDestroyChildTable(pTable);
if (pMsg->pBatchMasterMsg) {
++pMsg->pBatchMasterMsg->received;
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
>= pMsg->pBatchMasterMsg->expected) {
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, code);
}
mnodeDestroySubMsg(pMsg);
return;
}
dnodeSendRpcMWriteRsp(pMsg, code);
}
} else {
......@@ -2519,6 +2690,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
//Avoid retry again in client
rpcMsg->code = TSDB_CODE_MND_VGROUP_NOT_READY;
}
if (pMsg->pBatchMasterMsg) {
++pMsg->pBatchMasterMsg->received;
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
>= pMsg->pBatchMasterMsg->expected) {
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, rpcMsg->code);
}
mnodeDestroySubMsg(pMsg);
return;
}
dnodeSendRpcMWriteRsp(pMsg, rpcMsg->code);
}
}
......
......@@ -28,6 +28,7 @@ class TDTestCase:
def run(self):
time.sleep(5)
tdSql.execute("use log")
tdSql.execute("create table cpustrm as select count(*), avg(cpu_taosd), max(cpu_taosd), min(cpu_taosd), avg(cpu_system), max(cpu_cores), min(cpu_cores), last(cpu_cores) from log.dn1 interval(4s)")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册