提交 be08d243 编写于 作者: S slguan

[TD-10] add codes for create table

上级 f98efaf2
......@@ -185,9 +185,10 @@ static int32_t dnodeOpenVnodes() {
typedef void (*CleanupFp)(char *);
static void dnodeCleanupVnodes() {
int32_t num = taosGetIntHashSize(tsDnodeVnodesHash);
taosCleanUpIntHashWithFp(tsDnodeVnodesHash, dnodeCleanupVnode);
taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)dnodeCleanupVnode);
dPrint("all vnodes is opened, num:%d", num);
......@@ -207,7 +208,7 @@ static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) {
vnodeObj.vnode = vnode; //tsdbInfo->tsdbCfg.tsdbId;
vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1;
vnodeObj.version = version;
vnodeObj.version = 0;
vnodeObj.wworker = dnodeAllocateWriteWorker();
vnodeObj.rworker = dnodeAllocateReadWorker();
vnodeObj.wal = NULL;
......@@ -289,7 +290,7 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL);
if (pTsdb == NULL) {
dError("failed to create tsdb in vnode:%d, reason:%s", pVnodeCfg->vnode, tstrerror(terrno));
dError("vgroup:%d, failed to create tsdb in vnode:%d, reason:%s", pVnodeCfg->cfg.vgId, pVnodeCfg->vnode, tstrerror(terrno));
return terrno;
......@@ -309,7 +310,7 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj));
dPrint("vnode:%d is created", pVnodeCfg->vnode);
dPrint("vgroup:%d, vnode:%d is created", pVnodeCfg->cfg.vgId, pVnodeCfg->vnode);
......@@ -339,12 +340,12 @@ static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
dTrace("start to create vnode:%d", pCreate->vnode);
dTrace("vgroup:%d, start to create vnode:%d", pCreate->cfg.vgId, pCreate->vnode);
SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId);
if (pVnodeObj != NULL) {
rpcRsp.code = TSDB_CODE_SUCCESS;
dPrint("vnode:%d is already exist", pCreate->vnode);
dPrint("vgroup:%d, vnode:%d is already exist", pCreate->cfg.vgId, pCreate->vnode);
} else {
rpcRsp.code = dnodeCreateVnode(pCreate);
......@@ -291,13 +291,13 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
dTrace("start to create table:%s in vgroup:%d", pTable->tableId, pTable->vgId);
dTrace("table:%s, start to create in dnode, vgroup:%d", pTable->tableId, pTable->vgId);
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
void *pVnode = dnodeGetVnode(pTable->vgId);
if (pVnode == NULL) {
dTrace("failed to create table:%s in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code));
dTrace("table:%s, failed to create in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code));
......@@ -306,7 +306,7 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
if (pTsdb == NULL) {
dTrace("failed to create table:%s in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code));
dTrace("table:%s, failed to create in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code));
......@@ -347,10 +347,10 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
if (rpcRsp.code != TSDB_CODE_SUCCESS) {
dError("failed to create table:%s in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code));
dError("table:%s, failed to create in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code));
} else {
dTrace("create table:%s in vgroup:%d finished", pTable->tableId, pTable->vgId);
dTrace("table:%s, created in dnode", pTable->tableId);
......@@ -97,7 +97,6 @@ struct _vg_obj;
typedef struct SSuperTableObj {
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
int8_t dirty;
uint64_t uid;
int32_t sid;
int32_t vgId;
......@@ -177,15 +177,6 @@ enum _mgmt_table {
#define TSDB_KILL_MSG_LEN 30
typedef enum {
TSDB_TABLE_TYPE_SUPER_TABLE = 0, // super table
TSDB_TABLE_TYPE_CHILD_TABLE = 1, // table created from super table
TSDB_TABLE_TYPE_NORMAL_TABLE = 2, // ordinary table
TSDB_TABLE_TYPE_STREAM_TABLE = 3, // table created from stream computing
} ETableType;
#define TSDB_VN_READ_ACCCESS ((char)0x1)
#define TSDB_VN_WRITE_ACCCESS ((char)0x2)
......@@ -258,11 +249,9 @@ typedef struct {
int32_t sversion;
int32_t tagDataLen;
int32_t sqlDataLen;
int32_t numOfVPeers;
uint64_t uid;
uint64_t superTableUid;
uint64_t createdTime;
SVnodeDesc vpeerDesc[TSDB_MAX_MPEERS];
char tableId[TSDB_TABLE_ID_LEN + 1];
char superTableId[TSDB_TABLE_ID_LEN + 1];
char data[];
......@@ -38,7 +38,6 @@ int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter);
void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable);
void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable);
void mgmtSetTableDirty(STableInfo *pTable, bool isDirty);
SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable);
SMDDropSTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable);
......@@ -276,32 +276,27 @@ static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgrou
int32_t totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags;
int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen;
SMDCreateTableMsg *pCreateTable = rpcMallocCont(contLen);
if (pCreateTable == NULL) {
SMDCreateTableMsg *pCreate = rpcMallocCont(contLen);
if (pCreate == NULL) {
return NULL;
memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
memcpy(pCreateTable->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN);
pCreateTable->tableType = pTable->type;
pCreateTable->numOfColumns = htons(pTable->superTable->numOfColumns);
pCreateTable->numOfTags = htons(pTable->superTable->numOfTags);
pCreateTable->sid = htonl(pTable->sid);
pCreateTable->sversion = htonl(pTable->superTable->sversion);
pCreateTable->tagDataLen = htonl(tagDataLen);
pCreateTable->sqlDataLen = 0;
pCreateTable->contLen = htonl(contLen);
pCreateTable->numOfVPeers = htonl(pVgroup->numOfVnodes);
pCreateTable->uid = htobe64(pTable->uid);
pCreateTable->superTableUid = htobe64(pTable->superTable->uid);
pCreateTable->createdTime = htobe64(pTable->createdTime);
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
pCreateTable->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip);
pCreateTable->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
SSchema *pSchema = (SSchema *) pCreateTable->data;
memcpy(pCreate->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
memcpy(pCreate->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN);
pCreate->contLen = htonl(contLen);
pCreate->vgId = htonl(pVgroup->vgId);
pCreate->tableType = pTable->type;
pCreate->numOfColumns = htons(pTable->superTable->numOfColumns);
pCreate->numOfTags = htons(pTable->superTable->numOfTags);
pCreate->sid = htonl(pTable->sid);
pCreate->sversion = htonl(pTable->superTable->sversion);
pCreate->tagDataLen = htonl(tagDataLen);
pCreate->sqlDataLen = 0;
pCreate->uid = htobe64(pTable->uid);
pCreate->superTableUid = htobe64(pTable->superTable->uid);
pCreate->createdTime = htobe64(pTable->createdTime);
SSchema *pSchema = (SSchema *) pCreate->data;
memcpy(pSchema, pTable->superTable->schema, totalCols * sizeof(SSchema));
for (int32_t col = 0; col < totalCols; ++col) {
pSchema->bytes = htons(pSchema->bytes);
......@@ -309,20 +304,20 @@ static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgrou
memcpy(pCreateTable + sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema), pTagData, tagDataLen);
memcpy(pCreate->data + totalCols * sizeof(SSchema), pTagData, tagDataLen);
return pCreateTable;
return pCreate;
int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) {
SMDCreateTableMsg **pMDCreateOut, STableInfo **pTableOut) {
int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb);
if (numOfTables >= tsMaxTables) {
mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, tsMaxTables);
char *pTagData = (char *) pCreate->schema; // it is a tag key
char *pTagData = (char *) pCreate->schema; // it is a tag key
SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData);
if (pSuperTable == NULL) {
mError("table:%s, corresponding super table does not exist", pCreate->tableId);
......@@ -336,32 +331,31 @@ int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj
strcpy(pTable->tableId, pCreate->tableId);
strcpy(pTable->superTableId, pSuperTable->tableId);
pTable->createdTime = taosGetTimestampMs();
pTable->superTable = pSuperTable;
pTable->vgId = pVgroup->vgId;
pTable->sid = sid;
pTable->type = TSDB_CHILD_TABLE;
pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) +
((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
pTable->sid = sid;
pTable->vgId = pVgroup->vgId;
pTable->createdTime = taosGetTimestampMs();
pTable->superTable = pSuperTable;
if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) {
mError("table:%s, update sdb error", pCreate->tableId);
pTagData += (TSDB_TABLE_ID_LEN + 1);
int32_t tagDataLen = contLen - sizeof(SCMCreateTableMsg) - TSDB_TABLE_ID_LEN - 1;
*pDCreateOut = mgmtBuildCreateChildTableMsg(pTable, pVgroup, pTagData, tagDataLen);
if (*pDCreateOut == NULL) {
*pMDCreateOut = mgmtBuildCreateChildTableMsg(pTable, pVgroup, pTagData, tagDataLen);
if (*pMDCreateOut == NULL) {
mError("table:%s, failed to build create table message", pCreate->tableId);
*pTableOut = (STableInfo *) pTable;
mTrace("table:%s, create ctable in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 ,
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid);
mTrace("table:%s, create ctable in vgroup, uid:%" PRIu64 , pTable->tableId, pTable->uid);
......@@ -301,13 +301,13 @@ static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgr
pCreate->vgId = htonl(pVgroup->vgId);
pCreate->tableType = pTable->type;
pCreate->numOfColumns = htons(pTable->numOfColumns);
pCreate->numOfTags = htons(0);
pCreate->numOfTags = 0;
pCreate->sid = htonl(pTable->sid);
pCreate->sversion = htonl(pTable->sversion);
pCreate->tagDataLen = htonl(0);
pCreate->tagDataLen = 0;
pCreate->sqlDataLen = htonl(pTable->sqlLen);
pCreate->uid = htobe64(pTable->uid);
pCreate->superTableUid = htobe64(0);
pCreate->superTableUid = 0;
pCreate->createdTime = htobe64(pTable->createdTime);
SSchema *pSchema = (SSchema *) pCreate->data;
......@@ -338,11 +338,11 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb
strcpy(pTable->tableId, pCreate->tableId);
pTable->createdTime = taosGetTimestampMs();
pTable->type = TSDB_NORMAL_TABLE;
pTable->vgId = pVgroup->vgId;
pTable->sid = sid;
pTable->uid = (((uint64_t) pTable->createdTime) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
pTable->sid = sid;
pTable->createdTime = taosGetTimestampMs();
pTable->sversion = 0;
pTable->numOfColumns = pCreate->numOfColumns;
pTable->sqlLen = pTable->sqlLen;
......@@ -364,7 +364,7 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb
pTable->sqlLen = pCreate->sqlLen;
if (pTable->sqlLen != 0) {
pTable->type = TSDB_STREAM_TABLE;
pTable->sql = calloc(1, pTable->sqlLen);
if (pTable->sql == NULL) {
......@@ -377,20 +377,20 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb
if (sdbInsertRow(tsNormalTableSdb, pTable, 0) < 0) {
mError("table:%s, update sdb error", pCreate->tableId);
*pDCreateOut = mgmtBuildCreateNormalTableMsg(pTable, pVgroup);
if (*pDCreateOut == NULL) {
mError("table:%s, failed to build create table message", pCreate->tableId);
sdbDeleteRow(tsNormalTableSdb, pTable);
*pTableOut = (STableInfo *) pTable;
mTrace("table:%s, create ntable in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 ,
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid);
mTrace("table:%s, create ntable in vgroup, uid:%" PRIu64 , pTable->tableId, pTable->uid);
......@@ -184,7 +184,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
if (!tsMgmtShowMetaFp[pShowMsg->type]) {
mError("show type:%d %s is not support", pShowMsg->type, taosMsg[pShowMsg->type]);
mError("show type:%s is not support", taosGetShowTypeStr(pShowMsg->type));
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OPS_NOT_SUPPORT);
......@@ -206,6 +206,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
pShowRsp->qhandle = htobe64((uint64_t) pShow);
mTrace("show:%p, type:%s, start to get meta", pShow, taosGetShowTypeStr(pShowMsg->type));
int32_t code = (*tsMgmtShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->thandle);
if (code == 0) {
SRpcMsg rpcRsp = {
......@@ -217,7 +218,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
} else {
mError("pShow:%p, type:%d %s, failed to get Meta, code:%d", pShow, pShowMsg->type, taosMsg[pShowMsg->type], code);
mError("show:%p, type:%s, failed to get meta, reason:%s", pShow, taosGetShowTypeStr(pShowMsg->type), tstrerror(code));
......@@ -65,6 +65,8 @@ static void mgmtSuperTableActionInit() {
mgmtSuperTableActionFp[SDB_TYPE_DECODE] = mgmtSuperTableActionDecode;
mgmtSuperTableActionFp[SDB_TYPE_RESET] = mgmtSuperTableActionReset;
mgmtSuperTableActionFp[SDB_TYPE_DESTROY] = mgmtSuperTableActionDestroy;
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mgmtGetShowSuperTableMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mgmtRetrieveShowSuperTables);
void *mgmtSuperTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
......@@ -213,14 +215,14 @@ int32_t mgmtCreateSuperTable(SDbObj *pDb, SCMCreateTableMsg *pCreate) {
strcpy(pStable->tableId, pCreate->tableId);
pStable->type = TSDB_SUPER_TABLE;
pStable->createdTime = taosGetTimestampMs();
pStable->vgId = 0;
pStable->sid = 0;
pStable->uid = (((uint64_t)pStable->createdTime) << 16) + ((uint64_t)sdbGetVersion() & ((1ul << 16) - 1ul));
pStable->sversion = 0;
pStable->numOfColumns = pCreate->numOfColumns;
pStable->numOfTags = pCreate->numOfTags;
pStable->type = TSDB_SUPER_TABLE;
pStable->createdTime = taosGetTimestampMs();
pStable->vgId = 0;
pStable->sid = 0;
pStable->uid = (((uint64_t) pStable->createdTime) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
pStable->sversion = 0;
pStable->numOfColumns = htons(pCreate->numOfColumns);
pStable->numOfTags = htons(pCreate->numOfTags);
int32_t numOfCols = pCreate->numOfColumns + pCreate->numOfTags;
int32_t schemaSize = numOfCols * sizeof(SSchema);
......@@ -233,16 +235,17 @@ int32_t mgmtCreateSuperTable(SDbObj *pDb, SCMCreateTableMsg *pCreate) {
memcpy(pStable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
pStable->nextColId = 0;
for (int32_t col = 0; col < pCreate->numOfColumns; col++) {
SSchema *tschema = (SSchema *)pStable->schema;
for (int32_t col = 0; col < pStable->numOfColumns; col++) {
SSchema *tschema = pStable->schema;
tschema[col].colId = pStable->nextColId++;
if (sdbInsertRow(tsSuperTableSdb, pStable, 0) < 0) {
mError("table:%s, update sdb error", pCreate->tableId);
mError("stable:%s, update sdb error", pCreate->tableId);
mPrint("stable:%s, is created, tags:%d cols:%d", pCreate->tableId, pStable->numOfTags, pStable->numOfColumns);
......@@ -500,7 +503,7 @@ static int32_t mgmtGetShowSuperTableMeta(STableMeta *pMeta, SShowObj *pShow, voi
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created_time");
strcpy(pSchema[cols].name, "create time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
......@@ -150,7 +150,7 @@ static void mgmtCreateTable(SVgObj *pVgroup, SQueuedMsg *pMsg) {
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid < 0) {
mTrace("thandle:%p, no enough sid in vgroup:%d, start to create a new one", pMsg->thandle, pVgroup->vgId);
mTrace("tables:%s, no enough sid in vgroup:%d", pVgroup->vgId);
......@@ -160,15 +160,15 @@ static void mgmtCreateTable(SVgObj *pVgroup, SQueuedMsg *pMsg) {
SMDCreateTableMsg *pMDCreate = NULL;
if (pCreate->numOfColumns == 0) {
mTrace("thandle:%p, create ctable:%s, vgroup:%d sid:%d ahandle:%p", pMsg->thandle, pCreate->tableId, pVgroup->vgId, sid, pMsg);
mTrace("table:%s, is a child table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg);
code = mgmtCreateChildTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable);
} else {
mTrace("thandle:%p, create ntable:%s, vgroup:%d sid:%d ahandle:%p", pMsg->thandle, pCreate->tableId, pVgroup->vgId, sid, pMsg);
mTrace("table:%s, is a normal table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg);
code = mgmtCreateNormalTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable);
if (code != TSDB_CODE_SUCCESS) {
mTrace("thandle:%p, failed to create table:%s in vgroup:%d", pMsg->thandle, pCreate->tableId, pVgroup->vgId);
mTrace("table:%s, failed to create in vgroup:%d", pCreate->tableId, pVgroup->vgId);
mgmtSendSimpleResp(pMsg->thandle, code);
......@@ -282,13 +282,13 @@ int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "table_name");
strcpy(pSchema[cols].name, "table name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created_time");
strcpy(pSchema[cols].name, "create time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
......@@ -300,7 +300,7 @@ int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "stable");
strcpy(pSchema[cols].name, "super table name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
......@@ -437,25 +437,20 @@ SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable) {
return pRemove;
void mgmtSetTableDirty(STableInfo *pTable, bool isDirty) {
// TODO: if dirty, delete from sdb
pTable->dirty = isDirty;
void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
if (mgmtCheckRedirect(pMsg->thandle)) return;
SCMCreateTableMsg *pCreate = pMsg->pCont;
mTrace("thandle:%p, start to create table:%s", pMsg->thandle, pCreate->tableId);
mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->thandle);
if (mgmtCheckExpired()) {
mError("thandle:%p, failed to create table:%s, grant expired", pCreate->tableId);
mError("table:%s, failed to create, grant expired", pCreate->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED);
if (!pMsg->pUser->writeAuth) {
mError("thandle:%p, failed to create table:%s, no rights", pMsg->thandle, pCreate->tableId);
mError("table:%s, failed to create, no rights", pCreate->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
......@@ -463,14 +458,14 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
SAcctObj *pAcct = pMsg->pUser->pAcct;
int32_t code = mgmtCheckTableLimit(pAcct, htons(pCreate->numOfColumns));
if (code != TSDB_CODE_SUCCESS) {
mError("thandle:%p, failed to create table:%s, exceed the limit", pMsg->thandle, pCreate->tableId);
mError("table:%s, failed to create, exceed the limit", pCreate->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
pMsg->pDb = mgmtGetDb(pCreate->db);
if (pMsg->pDb == NULL) {
mError("thandle:%p, failed to create table:%s, db not selected", pMsg->thandle, pCreate->tableId);
mError("table:%s, failed to create, db not selected", pCreate->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED);
......@@ -478,19 +473,18 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
STableInfo *pTable = mgmtGetTable(pCreate->tableId);
if (pTable != NULL) {
if (pCreate->igExists) {
mTrace("thandle:%p, table:%s is already exist", pMsg->thandle, pCreate->tableId);
mTrace("table:%s is already exist", pCreate->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
} else {
mError("thandle:%p, failed to create table:%s, table already exist", pMsg->thandle, pCreate->tableId);
mError("table:%s, failed to create, table already exist", pCreate->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_TABLE_ALREADY_EXIST);
if (pCreate->numOfTags != 0) {
mTrace("thandle:%p, start to create super table:%s, tags:%d columns:%d",
pMsg->thandle, pCreate->tableId, pCreate->numOfTags, pCreate->numOfColumns);
mTrace("table:%s, is a super table", pCreate->tableId);
code = mgmtCreateSuperTable(pMsg->pDb, pCreate);
mgmtSendSimpleResp(pMsg->thandle, code);
......@@ -498,7 +492,7 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
code = mgmtCheckTimeSeries(pCreate->numOfColumns);
if (code != TSDB_CODE_SUCCESS) {
mError("thandle:%p, failed to create table:%s, timeseries exceed the limit", pMsg->thandle, pCreate->tableId);
mError("table:%s, failed to create, timeseries exceed the limit", pCreate->tableId);
mgmtSendSimpleResp(pMsg->thandle, code);
......@@ -509,10 +503,10 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
SVgObj *pVgroup = mgmtGetAvailableVgroup(pMsg->pDb);
if (pVgroup == NULL) {
mTrace("thandle:%p, table:%s start to create a new vgroup", newMsg->thandle, pCreate->tableId);
mTrace("table:%s, start to create a new vgroup", pCreate->tableId);
} else {
mTrace("thandle:%p, create table:%s in vgroup:%d", newMsg->thandle, pCreate->tableId, pVgroup->vgId);
mTrace("table:%s, vgroup:%d is selected", pCreate->tableId, pVgroup->vgId);
mgmtCreateTable(pVgroup, newMsg);
......@@ -761,19 +755,19 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) {
STableInfo *pTable = queueMsg->ahandle;
mTrace("thandle:%p, create table:%s rsp received, ahandle:%p code:%d received:%d",
queueMsg->thandle, pTable->tableId, rpcMsg->handle, rpcMsg->code, queueMsg->received);
mTrace("table:%s, create table rsp received, thandle:%p ahandle:%p result:%s", pTable->tableId, queueMsg->thandle,
rpcMsg->handle, tstrerror(rpcMsg->code));
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
mgmtSetTableDirty(pTable, true);
//sdbDeleteRow(tsVgroupSdb, pVgroup);
if (pTable->type == TSDB_CHILD_TABLE) {
sdbDeleteRow(tsChildTableSdb, pTable);
} else if (pTable->type == TSDB_NORMAL_TABLE){
sdbDeleteRow(tsNormalTableSdb, pTable);
} else {}
mError("table:%s, failed to create in dnode, reason:%s", pTable->tableId, tstrerror(rpcMsg->code));
mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code);
mError("table:%s, failed to create in dnode, reason:%s, set it dirty", pTable->tableId, tstrerror(rpcMsg->code));
mgmtSetTableDirty(pTable, true);
} else {
mTrace("table:%s, created in dnode", pTable->tableId);
mgmtSetTableDirty(pTable, false);
if (queueMsg->msgType != TSDB_MSG_TYPE_CM_CREATE_TABLE) {
SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg));
newMsg->msgType = queueMsg->msgType;
......@@ -783,7 +777,7 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) {
newMsg->contLen = queueMsg->contLen;
newMsg->pCont = rpcMallocCont(newMsg->contLen);
memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen);
mTrace("table:%s, start to process get meta", pTable->tableId);
mTrace("table:%s, start to get meta", pTable->tableId);
} else {
mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code);
......@@ -150,7 +150,7 @@ void mgmtProcessVgTimer(void *handle, void *tmrId) {
void mgmtCreateVgroup(SQueuedMsg *pMsg) {
SDbObj *pDb = pMsg->pDb;
if (pDb == NULL) {
mError("thandle:%p, failed to create vgroup, db not found", pMsg->thandle);
mError("failed to create vgroup, db not found");
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB);
......@@ -159,7 +159,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) {
strcpy(pVgroup->dbName, pDb->name);
pVgroup->numOfVnodes = pDb->cfg.replications;
if (mgmtAllocVnodes(pVgroup) != 0) {
mError("thandle:%p, db:%s no enough dnode to alloc %d vnodes", pMsg->thandle, pDb->name, pVgroup->numOfVnodes);
mError("db:%s, no enough dnode to alloc %d vnodes to vgroup", pDb->name, pVgroup->numOfVnodes);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_ENOUGH_DNODES);
......@@ -175,11 +175,9 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) {
sdbInsertRow(tsVgroupSdb, pVgroup, 0);
mPrint("thandle:%p, vgroup:%d is created in mnode, db:%s replica:%d", pMsg->thandle, pVgroup->vgId, pDb->name,
mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
mPrint("thandle:%p, vgroup:%d, dnode:%s vnode:%d", pMsg->thandle, pVgroup->vgId,
taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode);
mPrint("vgroup:%d, dnode:%s vnode:%d", pVgroup->vgId, taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode);
pMsg->ahandle = pVgroup;
......@@ -595,7 +593,7 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, vo
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
mTrace("send create vgroup:%d msg, ahandle:%p", pVgroup->vgId, ahandle);
mTrace("vgroup:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
......@@ -613,9 +611,9 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
SVgObj *pVgroup = queueMsg->ahandle;
mTrace("thandle:%p, vgroup:%d create vnode rsp received, ahandle:%p code:%d received:%d successed:%d expected:%d",
queueMsg->thandle, pVgroup->vgId, rpcMsg->handle, rpcMsg->code, queueMsg->received, queueMsg->successed,
mTrace("vgroup:%d, create vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p",
pVgroup->vgId, tstrerror(rpcMsg->code), queueMsg->received, queueMsg->successed, queueMsg->expected,
queueMsg->thandle, rpcMsg->handle);
if (queueMsg->received != queueMsg->expected) return;
......@@ -94,17 +94,17 @@ enum TSDB_TABLE_STATUS {
const char* taosGetVgroupStatusStr(int32_t vgroupStatus);
const char* taosGetDbStatusStr(int32_t dbStatus);
const char* taosGetVnodeStatusStr(int32_t vnodeStatus);
const char* taosGetVnodeSyncStatusStr(int32_t vnodeSyncStatus);
const char* taosGetVnodeDropStatusStr(int32_t dropping);
const char* taosGetDnodeStatusStr(int32_t dnodeStatus);
const char* taosGetDnodeLbStatusStr(int32_t dnodeBalanceStatus);
const char* taosGetVgroupLbStatusStr(int32_t vglbStatus);
const char* taosGetVnodeStreamStatusStr(int32_t vnodeStreamStatus);
const char* taosGetTableStatusStr(int32_t tableStatus);
char* taosGetVgroupStatusStr(int32_t vgroupStatus);
char* taosGetDbStatusStr(int32_t dbStatus);
char* taosGetVnodeStatusStr(int32_t vnodeStatus);
char* taosGetVnodeSyncStatusStr(int32_t vnodeSyncStatus);
char* taosGetVnodeDropStatusStr(int32_t dropping);
char* taosGetDnodeStatusStr(int32_t dnodeStatus);
char* taosGetDnodeLbStatusStr(int32_t dnodeBalanceStatus);
char* taosGetVgroupLbStatusStr(int32_t vglbStatus);
char* taosGetVnodeStreamStatusStr(int32_t vnodeStreamStatus);
char* taosGetTableStatusStr(int32_t tableStatus);
char *taosGetShowTypeStr(int32_t showType);
#ifdef __cplusplus
......@@ -16,7 +16,7 @@
#include "taosmsg.h"
#include "tstatus.h"
const char* taosGetVgroupStatusStr(int32_t vgroupStatus) {
char* taosGetVgroupStatusStr(int32_t vgroupStatus) {
switch (vgroupStatus) {
case TSDB_VG_STATUS_READY: return tstrerror(vgroupStatus);
case TSDB_VG_STATUS_IN_PROGRESS: return tstrerror(vgroupStatus);
......@@ -29,7 +29,7 @@ const char* taosGetVgroupStatusStr(int32_t vgroupStatus) {
const char* taosGetDbStatusStr(int32_t dbStatus) {
char* taosGetDbStatusStr(int32_t dbStatus) {
switch (dbStatus) {
case TSDB_DB_STATUS_READY: return "ready";
case TSDB_DB_STATUS_DROPPING: return "dropping";
......@@ -38,7 +38,7 @@ const char* taosGetDbStatusStr(int32_t dbStatus) {
const char* taosGetVnodeStatusStr(int32_t vnodeStatus) {
char* taosGetVnodeStatusStr(int32_t vnodeStatus) {
switch (vnodeStatus) {
case TSDB_VN_STATUS_OFFLINE: return "offline";
case TSDB_VN_STATUS_CREATING: return "creating";
......@@ -51,7 +51,7 @@ const char* taosGetVnodeStatusStr(int32_t vnodeStatus) {
const char* taosGetVnodeSyncStatusStr(int32_t vnodeSyncStatus) {
char* taosGetVnodeSyncStatusStr(int32_t vnodeSyncStatus) {
switch (vnodeSyncStatus) {
case TSDB_VN_SYNC_STATUS_INIT: return "ready";
case TSDB_VN_SYNC_STATUS_SYNCING: return "syncing";
......@@ -61,7 +61,7 @@ const char* taosGetVnodeSyncStatusStr(int32_t vnodeSyncStatus) {
const char* taosGetVnodeDropStatusStr(int32_t dropping) {
char* taosGetVnodeDropStatusStr(int32_t dropping) {
switch (dropping) {
case TSDB_VN_DROP_STATUS_READY: return "ready";
case TSDB_VN_DROP_STATUS_DROPPING: return "dropping";
......@@ -69,7 +69,7 @@ const char* taosGetVnodeDropStatusStr(int32_t dropping) {
const char* taosGetDnodeStatusStr(int32_t dnodeStatus) {
char* taosGetDnodeStatusStr(int32_t dnodeStatus) {
switch (dnodeStatus) {
case TSDB_DN_STATUS_OFFLINE: return "offline";
case TSDB_DN_STATUS_READY: return "ready";
......@@ -77,7 +77,7 @@ const char* taosGetDnodeStatusStr(int32_t dnodeStatus) {
const char* taosGetDnodeLbStatusStr(int32_t dnodeBalanceStatus) {
char* taosGetDnodeLbStatusStr(int32_t dnodeBalanceStatus) {
switch (dnodeBalanceStatus) {
case TSDB_DN_LB_STATUS_BALANCED: return "balanced";
case TSDB_DN_LB_STATUS_BALANCING: return "balancing";
......@@ -87,7 +87,7 @@ const char* taosGetDnodeLbStatusStr(int32_t dnodeBalanceStatus) {
const char* taosGetVgroupLbStatusStr(int32_t vglbStatus) {
char* taosGetVgroupLbStatusStr(int32_t vglbStatus) {
switch (vglbStatus) {
case TSDB_VG_LB_STATUS_READY: return "ready";
case TSDB_VG_LB_STATUS_UPDATE: return "updating";
......@@ -95,7 +95,7 @@ const char* taosGetVgroupLbStatusStr(int32_t vglbStatus) {
const char* taosGetVnodeStreamStatusStr(int32_t vnodeStreamStatus) {
char* taosGetVnodeStreamStatusStr(int32_t vnodeStreamStatus) {
switch (vnodeStreamStatus) {
case TSDB_VN_STREAM_STATUS_START: return "start";
case TSDB_VN_STREAM_STATUS_STOP: return "stop";
......@@ -103,9 +103,9 @@ const char* taosGetVnodeStreamStatusStr(int32_t vnodeStreamStatus) {
const char* taosGetTableStatusStr(int32_t tableStatus) {
char* taosGetTableStatusStr(int32_t tableStatus) {
switch(tableStatus) {
case TSDB_METER_STATE_INSERTING: return "inserting";
case TSDB_METER_STATE_INSERTING:return "inserting";
case TSDB_METER_STATE_IMPORTING:return "importing";
case TSDB_METER_STATE_UPDATING: return "updating";
case TSDB_METER_STATE_DROPPING: return "deleting";
......@@ -114,3 +114,25 @@ const char* taosGetTableStatusStr(int32_t tableStatus) {
default:return "undefined";
char *taosGetShowTypeStr(int32_t showType) {
switch (showType) {
case TSDB_MGMT_TABLE_ACCT: return "show accounts";
case TSDB_MGMT_TABLE_USER: return "show users";
case TSDB_MGMT_TABLE_DB: return "show databases";
case TSDB_MGMT_TABLE_TABLE: return "show tables";
case TSDB_MGMT_TABLE_DNODE: return "show dnodes";
case TSDB_MGMT_TABLE_MNODE: return "show mnodes";
case TSDB_MGMT_TABLE_VGROUP: return "show vgroups";
case TSDB_MGMT_TABLE_METRIC: return "show stables";
case TSDB_MGMT_TABLE_MODULE: return "show modules";
case TSDB_MGMT_TABLE_QUERIES: return "show queries";
case TSDB_MGMT_TABLE_STREAMS: return "show streams";
case TSDB_MGMT_TABLE_CONFIGS: return "show configs";
case TSDB_MGMT_TABLE_CONNS: return "show connections";
case TSDB_MGMT_TABLE_SCORES: return "show scores";
case TSDB_MGMT_TABLE_GRANTS: return "show grants";
case TSDB_MGMT_TABLE_VNODES: return "show vnodes";
default: return "undefined";
......@@ -128,4 +128,5 @@ char *taosMsg[] = {
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册