提交 7427a37f 编写于 作者: K kailixu

chore: meta sync

上级 ea3f0289
...@@ -232,7 +232,7 @@ int32_t tsKeepTimeOffset = 0; ...@@ -232,7 +232,7 @@ int32_t tsKeepTimeOffset = 0;
int32_t tsDiskCfgNum = 0; int32_t tsDiskCfgNum = 0;
int32_t tsTopicBianryLen = 16000; int32_t tsTopicBianryLen = 16000;
int32_t tsMetaSyncOption = 0; int32_t tsMetaSyncOption = 1;
#ifndef _STORAGE #ifndef _STORAGE
SDiskCfg tsDiskCfg[1]; SDiskCfg tsDiskCfg[1];
......
...@@ -222,6 +222,8 @@ int32_t* taosGetErrno(); ...@@ -222,6 +222,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_TOPIC_PARTITONS TAOS_DEF_ERROR_CODE(0, 0x0394) //"Invalid topic partitons num, valid range: [1, 1000]) #define TSDB_CODE_MND_INVALID_TOPIC_PARTITONS TAOS_DEF_ERROR_CODE(0, 0x0394) //"Invalid topic partitons num, valid range: [1, 1000])
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0395) //"Topic already exists) #define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0395) //"Topic already exists)
#define TSDB_CODE_MND_INVALID_FORMAT TAOS_DEF_ERROR_CODE(0, 0x0396) //"Invalid format)
// dnode // dnode
#define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) //"Message not processed" #define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) //"Message not processed"
#define TSDB_CODE_DND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0401) //"Dnode out of memory" #define TSDB_CODE_DND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0401) //"Dnode out of memory"
......
...@@ -14,60 +14,59 @@ ...@@ -14,60 +14,59 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "mnodeTable.h"
#include "taosmsg.h" #include "dnode.h"
#include "tutil.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tscompression.h"
#include "tname.h"
#include "tidpool.h"
#include "tglobal.h"
#include "tcompare.h"
#include "tdataformat.h"
#include "tgrant.h"
#include "tqueue.h"
#include "hash.h" #include "hash.h"
#include "mnode.h" #include "mnode.h"
#include "dnode.h"
#include "mnodeDef.h"
#include "mnodeInt.h"
#include "mnodeAcct.h" #include "mnodeAcct.h"
#include "mnodeDb.h" #include "mnodeDb.h"
#include "mnodeDef.h"
#include "mnodeDnode.h" #include "mnodeDnode.h"
#include "mnodeFunc.h"
#include "mnodeInt.h"
#include "mnodePeer.h"
#include "mnodeRead.h"
#include "mnodeSdb.h" #include "mnodeSdb.h"
#include "mnodeShow.h" #include "mnodeShow.h"
#include "mnodeTable.h"
#include "mnodeUser.h" #include "mnodeUser.h"
#include "mnodeVgroup.h" #include "mnodeVgroup.h"
#include "mnodeWrite.h" #include "mnodeWrite.h"
#include "mnodeRead.h" #include "os.h"
#include "mnodePeer.h" #include "taoserror.h"
#include "mnodeFunc.h" #include "taosmsg.h"
#include "tcompare.h"
#include "tdataformat.h"
#include "tglobal.h"
#include "tgrant.h"
#include "tidpool.h"
#include "tname.h"
#include "tqueue.h"
#include "tscompression.h"
#include "tutil.h"
#define ALTER_CTABLE_RETRY_TIMES 3 #define ALTER_CTABLE_RETRY_TIMES 3
#define CREATE_CTABLE_RETRY_TIMES 10 #define CREATE_CTABLE_RETRY_TIMES 10
#define CREATE_CTABLE_RETRY_SEC 14 #define CREATE_CTABLE_RETRY_SEC 14
// informal // informal
#define META_SYNC_TABLE_NAME "_taos_meta_sync_table_name_taos_"
#define META_SYNC_TABLE_NAME_LEN 32 #define META_SYNC_TABLE_NAME_LEN 32
#define META_SYNC_TABLE_NAME "_taos_meta_sync_table_name_taos_"
#define META_SYNC_CRET_MNDTB "_taos_meta_sync_cret_mndtb_taos_"
#define META_SYNC_DROP_MNDTB "_taos_meta_sync_drop_mndtb_taos_" #define META_SYNC_DROP_MNDTB "_taos_meta_sync_drop_mndtb_taos_"
#define META_SYNC_DROP_VNDTB "_taos_meta_sync_drop_vndtb_taos_" #define META_SYNC_DROP_VNDTB "_taos_meta_sync_drop_vndtb_taos_"
#define META_SYNC_DROP_TABLE_LEN 32
// informal // informal
int64_t tsCTableRid = -1; int64_t tsCTableRid = -1;
static void * tsChildTableSdb; static void *tsChildTableSdb;
int64_t tsSTableRid = -1; int64_t tsSTableRid = -1;
static void * tsSuperTableSdb; static void *tsSuperTableSdb;
static SHashObj *tsSTableUidHash; static SHashObj *tsSTableUidHash;
static int32_t tsChildTableUpdateSize; static int32_t tsChildTableUpdateSize;
static int32_t tsSuperTableUpdateSize; static int32_t tsSuperTableUpdateSize;
static void * mnodeGetChildTable(char *tableId); static void *mnodeGetChildTable(char *tableId);
static void * mnodeGetSuperTable(char *tableId); static void *mnodeGetSuperTable(char *tableId);
static void * mnodeGetSuperTableByUid(uint64_t uid); static void *mnodeGetSuperTableByUid(uint64_t uid);
static void mnodeDropAllChildTablesInStable(SSTableObj *pStable); static void mnodeDropAllChildTablesInStable(SSTableObj *pStable);
static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable); static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable);
static void mnodeRemoveTableFromStable(SSTableObj *pStable, SCTableObj *pCtable); static void mnodeRemoveTableFromStable(SSTableObj *pStable, SCTableObj *pCtable);
...@@ -83,6 +82,7 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg); ...@@ -83,6 +82,7 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg);
static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg); static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessMetaSyncCreateChildTableMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg);
...@@ -115,10 +115,10 @@ static void mnodeDestroyChildTable(SCTableObj *pTable) { ...@@ -115,10 +115,10 @@ static void mnodeDestroyChildTable(SCTableObj *pTable) {
tfree(pTable); tfree(pTable);
} }
static char* mnodeGetTableShowPattern(SShowObj *pShow) { static char *mnodeGetTableShowPattern(SShowObj *pShow) {
char* pattern = NULL; char *pattern = NULL;
if (pShow != NULL && pShow->payloadLen > 0) { if (pShow != NULL && pShow->payloadLen > 0) {
pattern = (char*)malloc(pShow->payloadLen + 1); pattern = (char *)malloc(pShow->payloadLen + 1);
if (pattern == NULL) { if (pattern == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL; return NULL;
...@@ -441,11 +441,11 @@ static int32_t mnodeInitChildTables() { ...@@ -441,11 +441,11 @@ static int32_t mnodeInitChildTables() {
SCTableObj tObj; SCTableObj tObj;
tsChildTableUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type); tsChildTableUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type);
SSdbTableDesc desc = { SSdbTableDesc desc = {.id = SDB_TABLE_CTABLE,
.id = SDB_TABLE_CTABLE,
.name = "ctables", .name = "ctables",
.hashSessions = TSDB_DEFAULT_CTABLES_HASH_SIZE, .hashSessions = TSDB_DEFAULT_CTABLES_HASH_SIZE,
.maxRowSize = sizeof(SCTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_FNAME_LEN + TSDB_CQ_SQL_SIZE, .maxRowSize = sizeof(SCTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) +
TSDB_TABLE_FNAME_LEN + TSDB_CQ_SQL_SIZE,
.refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj), .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj),
.keyType = SDB_KEY_VAR_STRING, .keyType = SDB_KEY_VAR_STRING,
.fpInsert = mnodeChildTableActionInsert, .fpInsert = mnodeChildTableActionInsert,
...@@ -454,8 +454,7 @@ static int32_t mnodeInitChildTables() { ...@@ -454,8 +454,7 @@ static int32_t mnodeInitChildTables() {
.fpEncode = mnodeChildTableActionEncode, .fpEncode = mnodeChildTableActionEncode,
.fpDecode = mnodeChildTableActionDecode, .fpDecode = mnodeChildTableActionDecode,
.fpDestroy = mnodeChildTableActionDestroy, .fpDestroy = mnodeChildTableActionDestroy,
.fpRestored = mnodeChildTableActionRestored .fpRestored = mnodeChildTableActionRestored};
};
tsCTableRid = sdbOpenTable(&desc); tsCTableRid = sdbOpenTable(&desc);
tsChildTableSdb = sdbGetTableByRid(tsCTableRid); tsChildTableSdb = sdbGetTableByRid(tsCTableRid);
...@@ -473,13 +472,9 @@ static void mnodeCleanupChildTables() { ...@@ -473,13 +472,9 @@ static void mnodeCleanupChildTables() {
tsChildTableSdb = NULL; tsChildTableSdb = NULL;
} }
int64_t mnodeGetSuperTableNum() { int64_t mnodeGetSuperTableNum() { return sdbGetNumOfRows(tsSuperTableSdb); }
return sdbGetNumOfRows(tsSuperTableSdb);
}
int64_t mnodeGetChildTableNum() { int64_t mnodeGetChildTableNum() { return sdbGetNumOfRows(tsChildTableSdb); }
return sdbGetNumOfRows(tsChildTableSdb);
}
static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable) { static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable) {
atomic_add_fetch_32(&pStable->numOfTables, 1); atomic_add_fetch_32(&pStable->numOfTables, 1);
...@@ -492,8 +487,8 @@ static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable) { ...@@ -492,8 +487,8 @@ static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable) {
if (pStable->vgHash != NULL) { if (pStable->vgHash != NULL) {
if (taosHashGet(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)) == NULL) { if (taosHashGet(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)) == NULL) {
taosHashPut(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId)); taosHashPut(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId));
mDebug("stable:%s, vgId:%d is put into stable vgId hash:%p, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId, mDebug("stable:%s, vgId:%d is put into stable vgId hash:%p, sizeOfVgList:%d", pStable->info.tableId,
pStable->vgHash, taosHashGetSize(pStable->vgHash)); pCtable->vgId, pStable->vgHash, taosHashGetSize(pStable->vgHash));
} }
} }
} }
...@@ -530,7 +525,7 @@ static int32_t mnodeSuperTableActionDestroy(SSdbRow *pRow) { ...@@ -530,7 +525,7 @@ static int32_t mnodeSuperTableActionDestroy(SSdbRow *pRow) {
static int32_t mnodeSuperTableActionInsert(SSdbRow *pRow) { static int32_t mnodeSuperTableActionInsert(SSdbRow *pRow) {
SSTableObj *pStable = pRow->pObj; SSTableObj *pStable = pRow->pObj;
SDbObj * pDb = mnodeGetDbByTableName(pStable->info.tableId); SDbObj *pDb = mnodeGetDbByTableName(pStable->info.tableId);
if (pDb != NULL && pDb->status == TSDB_DB_STATUS_READY) { if (pDb != NULL && pDb->status == TSDB_DB_STATUS_READY) {
mnodeAddSuperTableIntoDb(pDb); mnodeAddSuperTableIntoDb(pDb);
} }
...@@ -544,7 +539,7 @@ static int32_t mnodeSuperTableActionInsert(SSdbRow *pRow) { ...@@ -544,7 +539,7 @@ static int32_t mnodeSuperTableActionInsert(SSdbRow *pRow) {
static int32_t mnodeSuperTableActionDelete(SSdbRow *pRow) { static int32_t mnodeSuperTableActionDelete(SSdbRow *pRow) {
SSTableObj *pStable = pRow->pObj; SSTableObj *pStable = pRow->pObj;
SDbObj * pDb = mnodeGetDbByTableName(pStable->info.tableId); SDbObj *pDb = mnodeGetDbByTableName(pStable->info.tableId);
if (pDb != NULL) { if (pDb != NULL) {
mnodeRemoveSuperTableFromDb(pDb); mnodeRemoveSuperTableFromDb(pDb);
mnodeDropAllChildTablesInStable((SSTableObj *)pStable); mnodeDropAllChildTablesInStable((SSTableObj *)pStable);
...@@ -616,11 +611,11 @@ static int32_t mnodeSuperTableActionEncode(SSdbRow *pRow) { ...@@ -616,11 +611,11 @@ static int32_t mnodeSuperTableActionEncode(SSdbRow *pRow) {
static int32_t mnodeSuperTableActionDecode(SSdbRow *pRow) { static int32_t mnodeSuperTableActionDecode(SSdbRow *pRow) {
assert(pRow->rowData != NULL); assert(pRow->rowData != NULL);
SSTableObj *pStable = (SSTableObj *) calloc(1, sizeof(SSTableObj)); SSTableObj *pStable = (SSTableObj *)calloc(1, sizeof(SSTableObj));
if (pStable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; if (pStable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
int32_t len = (int32_t)strlen(pRow->rowData); int32_t len = (int32_t)strlen(pRow->rowData);
if (len >= TSDB_TABLE_FNAME_LEN){ if (len >= TSDB_TABLE_FNAME_LEN) {
free(pStable); free(pStable);
return TSDB_CODE_MND_INVALID_TABLE_ID; return TSDB_CODE_MND_INVALID_TABLE_ID;
} }
...@@ -643,19 +638,17 @@ static int32_t mnodeSuperTableActionDecode(SSdbRow *pRow) { ...@@ -643,19 +638,17 @@ static int32_t mnodeSuperTableActionDecode(SSdbRow *pRow) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeSuperTableActionRestored() { static int32_t mnodeSuperTableActionRestored() { return 0; }
return 0;
}
static int32_t mnodeInitSuperTables() { static int32_t mnodeInitSuperTables() {
SSTableObj tObj; SSTableObj tObj;
tsSuperTableUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type); tsSuperTableUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type);
SSdbTableDesc desc = { SSdbTableDesc desc = {.id = SDB_TABLE_STABLE,
.id = SDB_TABLE_STABLE,
.name = "stables", .name = "stables",
.hashSessions = TSDB_DEFAULT_STABLES_HASH_SIZE, .hashSessions = TSDB_DEFAULT_STABLES_HASH_SIZE,
.maxRowSize = sizeof(SSTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_FNAME_LEN, .maxRowSize = sizeof(SSTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) +
TSDB_TABLE_FNAME_LEN,
.refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj), .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj),
.keyType = SDB_KEY_VAR_STRING, .keyType = SDB_KEY_VAR_STRING,
.fpInsert = mnodeSuperTableActionInsert, .fpInsert = mnodeSuperTableActionInsert,
...@@ -664,8 +657,7 @@ static int32_t mnodeInitSuperTables() { ...@@ -664,8 +657,7 @@ static int32_t mnodeInitSuperTables() {
.fpEncode = mnodeSuperTableActionEncode, .fpEncode = mnodeSuperTableActionEncode,
.fpDecode = mnodeSuperTableActionDecode, .fpDecode = mnodeSuperTableActionDecode,
.fpDestroy = mnodeSuperTableActionDestroy, .fpDestroy = mnodeSuperTableActionDestroy,
.fpRestored = mnodeSuperTableActionRestored .fpRestored = mnodeSuperTableActionRestored};
};
tsSTableUidHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); tsSTableUidHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
tsSTableRid = sdbOpenTable(&desc); tsSTableRid = sdbOpenTable(&desc);
...@@ -725,13 +717,9 @@ int32_t mnodeInitTables() { ...@@ -725,13 +717,9 @@ int32_t mnodeInitTables() {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void *mnodeGetChildTable(char *tableId) { static void *mnodeGetChildTable(char *tableId) { return sdbGetRow(tsChildTableSdb, tableId); }
return sdbGetRow(tsChildTableSdb, tableId);
}
static void *mnodeGetSuperTable(char *tableId) { static void *mnodeGetSuperTable(char *tableId) { return sdbGetRow(tsSuperTableSdb, tableId); }
return sdbGetRow(tsSuperTableSdb, tableId);
}
static void *mnodeGetSuperTableByUid(uint64_t uid) { static void *mnodeGetSuperTableByUid(uint64_t uid) {
SSTableObj **ppStable = taosHashGet(tsSTableUidHash, &uid, sizeof(int64_t)); SSTableObj **ppStable = taosHashGet(tsSTableUidHash, &uid, sizeof(int64_t));
...@@ -760,17 +748,13 @@ void *mnodeGetNextChildTable(void *pIter, SCTableObj **pTable) { ...@@ -760,17 +748,13 @@ void *mnodeGetNextChildTable(void *pIter, SCTableObj **pTable) {
return sdbFetchRow(tsChildTableSdb, pIter, (void **)pTable); return sdbFetchRow(tsChildTableSdb, pIter, (void **)pTable);
} }
void mnodeCancelGetNextChildTable(void *pIter) { void mnodeCancelGetNextChildTable(void *pIter) { sdbFreeIter(tsChildTableSdb, pIter); }
sdbFreeIter(tsChildTableSdb, pIter);
}
void *mnodeGetNextSuperTable(void *pIter, SSTableObj **pTable) { void *mnodeGetNextSuperTable(void *pIter, SSTableObj **pTable) {
return sdbFetchRow(tsSuperTableSdb, pIter, (void **)pTable); return sdbFetchRow(tsSuperTableSdb, pIter, (void **)pTable);
} }
void mnodeCancelGetNextSuperTable(void *pIter) { void mnodeCancelGetNextSuperTable(void *pIter) { sdbFreeIter(tsSuperTableSdb, pIter); }
sdbFreeIter(tsSuperTableSdb, pIter);
}
void mnodeIncTableRef(void *p1) { void mnodeIncTableRef(void *p1) {
STableObj *pTable = (STableObj *)p1; STableObj *pTable = (STableObj *)p1;
...@@ -798,7 +782,7 @@ void mnodeCleanupTables() { ...@@ -798,7 +782,7 @@ void mnodeCleanupTables() {
} }
// todo move to name.h, add length of table name // todo move to name.h, add length of table name
static void mnodeExtractTableName(char* tableId, char* name) { static void mnodeExtractTableName(char *tableId, char *name) {
int pos = -1; int pos = -1;
int num = 0; int num = 0;
for (pos = 0; tableId[pos] != 0; ++pos) { for (pos = 0; tableId[pos] != 0; ++pos) {
...@@ -815,7 +799,7 @@ static SMnodeMsg *mnodeCreateSubMsg(SMnodeMsg *pBatchMasterMsg, int32_t contSize ...@@ -815,7 +799,7 @@ static SMnodeMsg *mnodeCreateSubMsg(SMnodeMsg *pBatchMasterMsg, int32_t contSize
SMnodeMsg *pSubMsg = taosAllocateQitem(sizeof(*pBatchMasterMsg) + contSize); SMnodeMsg *pSubMsg = taosAllocateQitem(sizeof(*pBatchMasterMsg) + contSize);
*pSubMsg = *pBatchMasterMsg; *pSubMsg = *pBatchMasterMsg;
//pSubMsg->pCont = (char *) pSubMsg + sizeof(SMnodeMsg); // pSubMsg->pCont = (char *) pSubMsg + sizeof(SMnodeMsg);
pSubMsg->rpcMsg.pCont = pSubMsg->pCont; pSubMsg->rpcMsg.pCont = pSubMsg->pCont;
pSubMsg->successed = 0; pSubMsg->successed = 0;
pSubMsg->expected = 0; pSubMsg->expected = 0;
...@@ -846,7 +830,8 @@ static int32_t mnodeValidateCreateTableMsg(SCreateTableMsg *pCreateTable, SMnode ...@@ -846,7 +830,8 @@ static int32_t mnodeValidateCreateTableMsg(SCreateTableMsg *pCreateTable, SMnode
} }
if (pMsg->pDb == NULL) { if (pMsg->pDb == NULL) {
mError("msg:%p, app:%p table:%s, failed to create, db not selected", pMsg, pMsg->rpcMsg.ahandle, pCreateTable->tableName); mError("msg:%p, app:%p table:%s, failed to create, db not selected", pMsg, pMsg->rpcMsg.ahandle,
pCreateTable->tableName);
return TSDB_CODE_MND_DB_NOT_SELECTED; return TSDB_CODE_MND_DB_NOT_SELECTED;
} }
...@@ -891,13 +876,14 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) { ...@@ -891,13 +876,14 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) {
pMsg->expected = numOfTables; pMsg->expected = numOfTables;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SCreateTableMsg *pCreateTable = (SCreateTableMsg*) ((char*) pCreate + sizeof(SCMCreateTableMsg)); SCreateTableMsg *pCreateTable = (SCreateTableMsg *)((char *)pCreate + sizeof(SCMCreateTableMsg));
for (SCreateTableMsg *p = pCreateTable; p < (SCreateTableMsg *) ((char *) pCreate + contentLen); p = (SCreateTableMsg *) ((char *) p + htonl(p->len))) { for (SCreateTableMsg *p = pCreateTable; p < (SCreateTableMsg *)((char *)pCreate + contentLen);
p = (SCreateTableMsg *)((char *)p + htonl(p->len))) {
SMnodeMsg *pSubMsg = mnodeCreateSubMsg(pMsg, sizeof(SCMCreateTableMsg) + htonl(p->len)); SMnodeMsg *pSubMsg = mnodeCreateSubMsg(pMsg, sizeof(SCMCreateTableMsg) + htonl(p->len));
memcpy(pSubMsg->pCont + sizeof(SCMCreateTableMsg), p, htonl(p->len)); memcpy(pSubMsg->pCont + sizeof(SCMCreateTableMsg), p, htonl(p->len));
code = mnodeValidateCreateTableMsg(p, pSubMsg); code = mnodeValidateCreateTableMsg(p, pSubMsg);
if (code == TSDB_CODE_SUCCESS || ( p->igExists == 1 && code == TSDB_CODE_MND_TABLE_ALREADY_EXIST )) { if (code == TSDB_CODE_SUCCESS || (p->igExists == 1 && code == TSDB_CODE_MND_TABLE_ALREADY_EXIST)) {
++pSubMsg->pBatchMasterMsg->successed; ++pSubMsg->pBatchMasterMsg->successed;
mnodeDestroySubMsg(pSubMsg); mnodeDestroySubMsg(pSubMsg);
continue; continue;
...@@ -917,7 +903,7 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) { ...@@ -917,7 +903,7 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) {
} else { } else {
if (pMsg->pBatchMasterMsg != pMsg) { // batch sub replay if (pMsg->pBatchMasterMsg != pMsg) { // batch sub replay
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
SCreateTableMsg *pCreateTable = (SCreateTableMsg*) ((char*) pCreate + sizeof(SCMCreateTableMsg)); SCreateTableMsg *pCreateTable = (SCreateTableMsg *)((char *)pCreate + sizeof(SCMCreateTableMsg));
int32_t code = mnodeValidateCreateTableMsg(pCreateTable, pMsg); int32_t code = mnodeValidateCreateTableMsg(pCreateTable, pMsg);
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) { if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) {
++pMsg->pBatchMasterMsg->successed; ++pMsg->pBatchMasterMsg->successed;
...@@ -930,8 +916,7 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) { ...@@ -930,8 +916,7 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) {
mnodeDestroySubMsg(pMsg); mnodeDestroySubMsg(pMsg);
} }
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received >= pMsg->pBatchMasterMsg->expected) {
>= pMsg->pBatchMasterMsg->expected) {
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, pMsg->pBatchMasterMsg->code); dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, pMsg->pBatchMasterMsg->code);
} }
...@@ -957,7 +942,7 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) { ...@@ -957,7 +942,7 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
return mnodeProcessBatchCreateTableMsg(pMsg); return mnodeProcessBatchCreateTableMsg(pMsg);
} }
SCreateTableMsg *p = (SCreateTableMsg*)((char*) pCreate + sizeof(SCMCreateTableMsg)); SCreateTableMsg *p = (SCreateTableMsg *)((char *)pCreate + sizeof(SCMCreateTableMsg));
if (pMsg->pDb == NULL) { if (pMsg->pDb == NULL) {
pMsg->pDb = mnodeGetDbByTableName(p->tableName); pMsg->pDb = mnodeGetDbByTableName(p->tableName);
} }
...@@ -981,7 +966,8 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) { ...@@ -981,7 +966,8 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
mDebug("msg:%p, app:%p table:%s, is already exist", pMsg, pMsg->rpcMsg.ahandle, p->tableName); mDebug("msg:%p, app:%p table:%s, is already exist", pMsg, pMsg->rpcMsg.ahandle, p->tableName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
mError("msg:%p, app:%p table:%s, failed to create, table already exist", pMsg, pMsg->rpcMsg.ahandle, p->tableName); mError("msg:%p, app:%p table:%s, failed to create, table already exist", pMsg, pMsg->rpcMsg.ahandle,
p->tableName);
return TSDB_CODE_MND_TABLE_ALREADY_EXIST; return TSDB_CODE_MND_TABLE_ALREADY_EXIST;
} }
} }
...@@ -1055,8 +1041,8 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) { ...@@ -1055,8 +1041,8 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) {
STableInfoMsg *pInfo = pMsg->rpcMsg.pCont; STableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
pInfo->createFlag = htons(pInfo->createFlag); pInfo->createFlag = htons(pInfo->createFlag);
mDebug("msg:%p, app:%p table:%s, table meta msg is received from thandle:%p, createFlag:%d", pMsg, pMsg->rpcMsg.ahandle, mDebug("msg:%p, app:%p table:%s, table meta msg is received from thandle:%p, createFlag:%d", pMsg,
pInfo->tableFname, pMsg->rpcMsg.handle, pInfo->createFlag); pMsg->rpcMsg.ahandle, pInfo->tableFname, pMsg->rpcMsg.handle, pInfo->createFlag);
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableName(pInfo->tableFname); if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableName(pInfo->tableFname);
if (pMsg->pDb == NULL) { if (pMsg->pDb == NULL) {
...@@ -1096,20 +1082,18 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -1096,20 +1082,18 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
mLInfo("stable:%s, is created in sdb, uid:%" PRIu64, pTable->info.tableId, pTable->uid); mLInfo("stable:%s, is created in sdb, uid:%" PRIu64, pTable->info.tableId, pTable->uid);
if(pMsg->pBatchMasterMsg) if (pMsg->pBatchMasterMsg) pMsg->pBatchMasterMsg->successed++;
pMsg->pBatchMasterMsg->successed ++;
} else { } else {
mError("msg:%p, app:%p stable:%s, failed to create in sdb, reason:%s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, mError("msg:%p, app:%p stable:%s, failed to create in sdb, reason:%s", pMsg, pMsg->rpcMsg.ahandle,
tstrerror(code)); pTable->info.tableId, tstrerror(code));
SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .pTable = tsSuperTableSdb}; SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .pTable = tsSuperTableSdb};
sdbDeleteRow(&desc); sdbDeleteRow(&desc);
if(pMsg->pBatchMasterMsg) if (pMsg->pBatchMasterMsg) pMsg->pBatchMasterMsg->received++;
pMsg->pBatchMasterMsg->received ++;
} }
monSaveAuditLog(MON_DDL_CMD_CREATE_SUPER_TABLE, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, !code); monSaveAuditLog(MON_DDL_CMD_CREATE_SUPER_TABLE, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, !code);
// if super table create by batch msg, check done and send finished to client // if super table create by batch msg, check done and send finished to client
if(pMsg->pBatchMasterMsg) { if (pMsg->pBatchMasterMsg) {
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received >= pMsg->pBatchMasterMsg->expected) if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received >= pMsg->pBatchMasterMsg->expected)
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, code); dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, code);
} }
...@@ -1119,7 +1103,7 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -1119,7 +1103,7 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
static uint64_t mnodeCreateSuperTableUid() { static uint64_t mnodeCreateSuperTableUid() {
int64_t us = taosGetTimestampUs(); int64_t us = taosGetTimestampUs();
uint64_t x = (us & ((((uint64_t)1)<<40) - 1)); uint64_t x = (us & ((((uint64_t)1) << 40) - 1));
x = x << 24; x = x << 24;
return x + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); return x + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
...@@ -1139,7 +1123,7 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -1139,7 +1123,7 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_INVALID_CREATE_TABLE_MSG; return TSDB_CODE_MND_INVALID_CREATE_TABLE_MSG;
} }
SCreateTableMsg* pCreate = (SCreateTableMsg*)((char*)pCreate1 + sizeof(SCMCreateTableMsg)); SCreateTableMsg *pCreate = (SCreateTableMsg *)((char *)pCreate1 + sizeof(SCMCreateTableMsg));
int16_t numOfTags = htons(pCreate->numOfTags); int16_t numOfTags = htons(pCreate->numOfTags);
if (numOfTags > TSDB_MAX_TAGS) { if (numOfTags > TSDB_MAX_TAGS) {
...@@ -1150,13 +1134,15 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -1150,13 +1134,15 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
int16_t numOfColumns = htons(pCreate->numOfColumns); int16_t numOfColumns = htons(pCreate->numOfColumns);
int32_t numOfCols = numOfColumns + numOfTags; int32_t numOfCols = numOfColumns + numOfTags;
if (numOfCols > TSDB_MAX_COLUMNS) { if (numOfCols > TSDB_MAX_COLUMNS) {
mError("msg:%p, app:%p table:%s, failed to create, too many columns", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName); mError("msg:%p, app:%p table:%s, failed to create, too many columns", pMsg, pMsg->rpcMsg.ahandle,
pCreate->tableName);
return TSDB_CODE_MND_TOO_MANY_COLUMNS; return TSDB_CODE_MND_TOO_MANY_COLUMNS;
} }
SSTableObj *pStable = calloc(1, sizeof(SSTableObj)); SSTableObj *pStable = calloc(1, sizeof(SSTableObj));
if (pStable == NULL) { if (pStable == NULL) {
mError("msg:%p, app:%p table:%s, failed to create, no enough memory", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName); mError("msg:%p, app:%p table:%s, failed to create, no enough memory", pMsg, pMsg->rpcMsg.ahandle,
pCreate->tableName);
return TSDB_CODE_MND_OUT_OF_MEMORY; return TSDB_CODE_MND_OUT_OF_MEMORY;
} }
...@@ -1174,7 +1160,8 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -1174,7 +1160,8 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
if (pStable->schema == NULL) { if (pStable->schema == NULL) {
tfree(pStable->info.tableId); tfree(pStable->info.tableId);
tfree(pStable); tfree(pStable);
mError("msg:%p, app:%p table:%s, failed to create, no schema input", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName); mError("msg:%p, app:%p table:%s, failed to create, no schema input", pMsg, pMsg->rpcMsg.ahandle,
pCreate->tableName);
return TSDB_CODE_MND_INVALID_TABLE_NAME; return TSDB_CODE_MND_INVALID_TABLE_NAME;
} }
...@@ -1189,7 +1176,8 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -1189,7 +1176,8 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
} }
if (!tIsValidSchema(pStable->schema, pStable->numOfColumns, pStable->numOfTags)) { if (!tIsValidSchema(pStable->schema, pStable->numOfColumns, pStable->numOfTags)) {
mError("msg:%p, app:%p table:%s, failed to create table, invalid schema", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName); mError("msg:%p, app:%p table:%s, failed to create table, invalid schema", pMsg, pMsg->rpcMsg.ahandle,
pCreate->tableName);
tfree(pStable->info.tableId); tfree(pStable->info.tableId);
tfree(pStable->schema); tfree(pStable->schema);
tfree(pStable); tfree(pStable);
...@@ -1199,14 +1187,12 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -1199,14 +1187,12 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
pMsg->pTable = (STableObj *)pStable; pMsg->pTable = (STableObj *)pStable;
mnodeIncTableRef(pMsg->pTable); mnodeIncTableRef(pMsg->pTable);
SSdbRow row = { SSdbRow row = {.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.rowSize = sizeof(SSTableObj) + schemaSize, .rowSize = sizeof(SSTableObj) + schemaSize,
.pMsg = pMsg, .pMsg = pMsg,
.fpRsp = mnodeCreateSuperTableCb .fpRsp = mnodeCreateSuperTableCb};
};
int32_t code = sdbInsertRow(&row); int32_t code = sdbInsertRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
...@@ -1267,13 +1253,11 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -1267,13 +1253,11 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
mInfo("msg:%p, app:%p stable:%s will be dropped, hash:%p sizeOfVgList:%d", pMsg, pMsg->rpcMsg.ahandle, mInfo("msg:%p, app:%p stable:%s will be dropped, hash:%p sizeOfVgList:%d", pMsg, pMsg->rpcMsg.ahandle,
pStable->info.tableId, pStable->vgHash, taosHashGetSize(pStable->vgHash)); pStable->info.tableId, pStable->vgHash, taosHashGetSize(pStable->vgHash));
SSdbRow row = { SSdbRow row = {.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.fpRsp = mnodeDropSuperTableCb .fpRsp = mnodeDropSuperTableCb};
};
int32_t code = sdbDeleteRow(&row); int32_t code = sdbDeleteRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
...@@ -1285,7 +1269,7 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -1285,7 +1269,7 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
} }
static int32_t mnodeFindSuperTableTagIndex(SSTableObj *pStable, const char *tagName) { static int32_t mnodeFindSuperTableTagIndex(SSTableObj *pStable, const char *tagName) {
SSchema *schema = (SSchema *) pStable->schema; SSchema *schema = (SSchema *)pStable->schema;
for (int32_t tag = 0; tag < pStable->numOfTags; tag++) { for (int32_t tag = 0; tag < pStable->numOfTags; tag++) {
if (strcmp(schema[pStable->numOfColumns + tag].name, tagName) == 0) { if (strcmp(schema[pStable->numOfColumns + tag].name, tagName) == 0) {
return tag; return tag;
...@@ -1344,13 +1328,11 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t ...@@ -1344,13 +1328,11 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t
mInfo("msg:%p, app:%p stable %s, start to add tag %s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId, mInfo("msg:%p, app:%p stable %s, start to add tag %s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
schema[0].name); schema[0].name);
SSdbRow row = { SSdbRow row = {.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.fpRsp = mnodeAddSuperTableTagCb .fpRsp = mnodeAddSuperTableTagCb};
};
return sdbUpdateRow(&row); return sdbUpdateRow(&row);
} }
...@@ -1383,13 +1365,11 @@ static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) { ...@@ -1383,13 +1365,11 @@ static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) {
mInfo("msg:%p, app:%p stable %s, start to drop tag %s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId, tagName); mInfo("msg:%p, app:%p stable %s, start to drop tag %s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId, tagName);
SSdbRow row = { SSdbRow row = {.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.fpRsp = mnodeDropSuperTableTagCb .fpRsp = mnodeDropSuperTableTagCb};
};
return sdbUpdateRow(&row); return sdbUpdateRow(&row);
} }
...@@ -1426,25 +1406,23 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c ...@@ -1426,25 +1406,23 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c
} }
// update // update
SSchema *schema = (SSchema *) (pStable->schema + pStable->numOfColumns + col); SSchema *schema = (SSchema *)(pStable->schema + pStable->numOfColumns + col);
tstrncpy(schema->name, newTagName, sizeof(schema->name)); tstrncpy(schema->name, newTagName, sizeof(schema->name));
mInfo("msg:%p, app:%p stable %s, start to modify tag %s to %s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId, mInfo("msg:%p, app:%p stable %s, start to modify tag %s to %s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
oldTagName, newTagName); oldTagName, newTagName);
SSdbRow row = { SSdbRow row = {.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.fpRsp = mnodeModifySuperTableTagNameCb .fpRsp = mnodeModifySuperTableTagNameCb};
};
return sdbUpdateRow(&row); return sdbUpdateRow(&row);
} }
static int32_t mnodeFindSuperTableColumnIndex(SSTableObj *pStable, char *colName) { static int32_t mnodeFindSuperTableColumnIndex(SSTableObj *pStable, char *colName) {
SSchema *schema = (SSchema *) pStable->schema; SSchema *schema = (SSchema *)pStable->schema;
for (int32_t col = 0; col < pStable->numOfColumns; col++) { for (int32_t col = 0; col < pStable->numOfColumns; col++) {
if (strcmp(schema[col].name, colName) == 0) { if (strcmp(schema[col].name, colName) == 0) {
return col; return col;
...@@ -1470,7 +1448,8 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32 ...@@ -1470,7 +1448,8 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32
SDbObj *pDb = pMsg->pDb; SDbObj *pDb = pMsg->pDb;
SSTableObj *pStable = (SSTableObj *)pMsg->pTable; SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
if (ncols <= 0) { if (ncols <= 0) {
mError("msg:%p, app:%p stable:%s, add column, ncols:%d <= 0", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId, ncols); mError("msg:%p, app:%p stable:%s, add column, ncols:%d <= 0", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
ncols);
return TSDB_CODE_MND_APP_ERROR; return TSDB_CODE_MND_APP_ERROR;
} }
...@@ -1500,7 +1479,7 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32 ...@@ -1500,7 +1479,7 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32
sizeof(SSchema) * pStable->numOfTags); sizeof(SSchema) * pStable->numOfTags);
memcpy(pStable->schema + pStable->numOfColumns, schema, sizeof(SSchema) * ncols); memcpy(pStable->schema + pStable->numOfColumns, schema, sizeof(SSchema) * ncols);
SSchema *tschema = (SSchema *) (pStable->schema + pStable->numOfColumns); SSchema *tschema = (SSchema *)(pStable->schema + pStable->numOfColumns);
for (int32_t i = 0; i < ncols; i++) { for (int32_t i = 0; i < ncols; i++) {
tschema[i].colId = pStable->nextColId++; tschema[i].colId = pStable->nextColId++;
} }
...@@ -1517,13 +1496,11 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32 ...@@ -1517,13 +1496,11 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32
mInfo("msg:%p, app:%p stable %s, start to add column", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId); mInfo("msg:%p, app:%p stable %s, start to add column", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId);
SSdbRow row = { SSdbRow row = {.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.fpRsp = mnodeAddSuperTableColumnCb .fpRsp = mnodeAddSuperTableColumnCb};
};
return sdbUpdateRow(&row); return sdbUpdateRow(&row);
} }
...@@ -1545,8 +1522,8 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) { ...@@ -1545,8 +1522,8 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) {
SSTableObj *pStable = (SSTableObj *)pMsg->pTable; SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
int32_t col = mnodeFindSuperTableColumnIndex(pStable, colName); int32_t col = mnodeFindSuperTableColumnIndex(pStable, colName);
if (col <= 0) { if (col <= 0) {
mError("msg:%p, app:%p stable:%s, drop column, column:%s not exist", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId, mError("msg:%p, app:%p stable:%s, drop column, column:%s not exist", pMsg, pMsg->rpcMsg.ahandle,
colName); pStable->info.tableId, colName);
return TSDB_CODE_MND_FIELD_NOT_EXIST; return TSDB_CODE_MND_FIELD_NOT_EXIST;
} }
...@@ -1568,13 +1545,11 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) { ...@@ -1568,13 +1545,11 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) {
mInfo("msg:%p, app:%p stable %s, start to delete column", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId); mInfo("msg:%p, app:%p stable %s, start to delete column", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId);
SSdbRow row = { SSdbRow row = {.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.fpRsp = mnodeDropSuperTableColumnCb .fpRsp = mnodeDropSuperTableColumnCb};
};
return sdbUpdateRow(&row); return sdbUpdateRow(&row);
} }
...@@ -1593,12 +1568,11 @@ static int32_t mnodeChangeSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -1593,12 +1568,11 @@ static int32_t mnodeChangeSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg) { static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg) {
SAlterTableMsg *pAlter = pMsg->rpcMsg.pCont; SAlterTableMsg *pAlter = pMsg->rpcMsg.pCont;
char* name = pAlter->schema[0].name; char *name = pAlter->schema[0].name;
SSTableObj *pStable = (SSTableObj *)pMsg->pTable; SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
int32_t col = mnodeFindSuperTableColumnIndex(pStable, name); int32_t col = mnodeFindSuperTableColumnIndex(pStable, name);
if (col < 0) { if (col < 0) {
mError("msg:%p, app:%p stable:%s, change column, name:%s", pMsg, pMsg->rpcMsg.ahandle, mError("msg:%p, app:%p stable:%s, change column, name:%s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId, name);
pStable->info.tableId, name);
return TSDB_CODE_MND_FIELD_NOT_EXIST; return TSDB_CODE_MND_FIELD_NOT_EXIST;
} }
...@@ -1619,7 +1593,7 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg) { ...@@ -1619,7 +1593,7 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg) {
} }
// update // update
SSchema *schema = (SSchema *) (pStable->schema + col); SSchema *schema = (SSchema *)(pStable->schema + col);
ASSERT(schema->type == TSDB_DATA_TYPE_BINARY || schema->type == TSDB_DATA_TYPE_NCHAR); ASSERT(schema->type == TSDB_DATA_TYPE_BINARY || schema->type == TSDB_DATA_TYPE_NCHAR);
if (pAlter->schema[0].bytes <= schema->bytes) { if (pAlter->schema[0].bytes <= schema->bytes) {
...@@ -1630,33 +1604,30 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg) { ...@@ -1630,33 +1604,30 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg) {
schema->bytes = pAlter->schema[0].bytes; schema->bytes = pAlter->schema[0].bytes;
pStable->sversion++; pStable->sversion++;
mInfo("msg:%p, app:%p stable %s, start to modify column %s len to %d", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId, mInfo("msg:%p, app:%p stable %s, start to modify column %s len to %d", pMsg, pMsg->rpcMsg.ahandle,
name, schema->bytes); pStable->info.tableId, name, schema->bytes);
SSdbRow row = { SSdbRow row = {.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.fpRsp = mnodeChangeSuperTableColumnCb .fpRsp = mnodeChangeSuperTableColumnCb};
};
return sdbUpdateRow(&row); return sdbUpdateRow(&row);
} }
static int32_t mnodeChangeSuperTableTag(SMnodeMsg *pMsg) { static int32_t mnodeChangeSuperTableTag(SMnodeMsg *pMsg) {
SAlterTableMsg *pAlter = pMsg->rpcMsg.pCont; SAlterTableMsg *pAlter = pMsg->rpcMsg.pCont;
char* name = pAlter->schema[0].name; char *name = pAlter->schema[0].name;
SSTableObj *pStable = (SSTableObj *)pMsg->pTable; SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
int32_t col = mnodeFindSuperTableTagIndex(pStable, name); int32_t col = mnodeFindSuperTableTagIndex(pStable, name);
if (col < 0) { if (col < 0) {
mError("msg:%p, app:%p stable:%s, change column, name:%s", pMsg, pMsg->rpcMsg.ahandle, mError("msg:%p, app:%p stable:%s, change column, name:%s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId, name);
pStable->info.tableId, name);
return TSDB_CODE_MND_FIELD_NOT_EXIST; return TSDB_CODE_MND_FIELD_NOT_EXIST;
} }
// update // update
SSchema *schema = (SSchema *) (pStable->schema + col + pStable->numOfColumns); SSchema *schema = (SSchema *)(pStable->schema + col + pStable->numOfColumns);
ASSERT(schema->type == TSDB_DATA_TYPE_BINARY || schema->type == TSDB_DATA_TYPE_NCHAR); ASSERT(schema->type == TSDB_DATA_TYPE_BINARY || schema->type == TSDB_DATA_TYPE_NCHAR);
if (pAlter->schema[0].bytes <= schema->bytes) { if (pAlter->schema[0].bytes <= schema->bytes) {
mError("msg:%p, app:%p stable:%s, modify tag len. tag:%s, len from %d to %d", pMsg, pMsg->rpcMsg.ahandle, mError("msg:%p, app:%p stable:%s, modify tag len. tag:%s, len from %d to %d", pMsg, pMsg->rpcMsg.ahandle,
...@@ -1669,13 +1640,11 @@ static int32_t mnodeChangeSuperTableTag(SMnodeMsg *pMsg) { ...@@ -1669,13 +1640,11 @@ static int32_t mnodeChangeSuperTableTag(SMnodeMsg *pMsg) {
mInfo("msg:%p, app:%p stable %s, start to modify tag len %s to %d", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId, mInfo("msg:%p, app:%p stable %s, start to modify tag len %s to %d", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
name, schema->bytes); name, schema->bytes);
SSdbRow row = { SSdbRow row = {.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.fpRsp = mnodeChangeSuperTableColumnCb .fpRsp = mnodeChangeSuperTableColumnCb};
};
return sdbUpdateRow(&row); return sdbUpdateRow(&row);
} }
...@@ -1694,7 +1663,7 @@ static int32_t mnodeGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, ...@@ -1694,7 +1663,7 @@ static int32_t mnodeGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow,
int32_t cols = 0; int32_t cols = 0;
SSchema *pSchema = pMeta->schema; SSchema *pSchema = pMeta->schema;
SSchema* tbnameSchema = tGetTbnameColumnSchema(); SSchema *tbnameSchema = tGetTbnameColumnSchema();
pShow->bytes[cols] = tbnameSchema->bytes; pShow->bytes[cols] = tbnameSchema->bytes;
pSchema[cols].type = tbnameSchema->type; pSchema[cols].type = tbnameSchema->type;
strcpy(pSchema[cols].name, "name"); strcpy(pSchema[cols].name, "name");
...@@ -1741,7 +1710,7 @@ static int32_t mnodeGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, ...@@ -1741,7 +1710,7 @@ static int32_t mnodeGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow,
// retrieve super tables // retrieve super tables
int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0; int32_t numOfRows = 0;
char * pWrite; char *pWrite;
int32_t cols = 0; int32_t cols = 0;
SSTableObj *pTable = NULL; SSTableObj *pTable = NULL;
char prefix[64] = {0}; char prefix[64] = {0};
...@@ -1763,7 +1732,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -1763,7 +1732,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char stableName[TSDB_TABLE_NAME_LEN] = {0}; char stableName[TSDB_TABLE_NAME_LEN] = {0};
char* pattern = mnodeGetTableShowPattern(pShow); char *pattern = mnodeGetTableShowPattern(pShow);
if (pShow->payloadLen > 0 && pattern == NULL) { if (pShow->payloadLen > 0 && pattern == NULL) {
return 0; return 0;
} }
...@@ -1779,7 +1748,8 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -1779,7 +1748,8 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
memset(stableName, 0, tListLen(stableName)); memset(stableName, 0, tListLen(stableName));
mnodeExtractTableName(pTable->info.tableId, stableName); mnodeExtractTableName(pTable->info.tableId, stableName);
if (pShow->payloadLen > 0 && patternMatch(pattern, stableName, sizeof(stableName) - 1, &info) != TSDB_PATTERN_MATCH) { if (pShow->payloadLen > 0 &&
patternMatch(pattern, stableName, sizeof(stableName) - 1, &info) != TSDB_PATTERN_MATCH) {
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
continue; continue;
} }
...@@ -1789,7 +1759,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -1789,7 +1759,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
int16_t len = (int16_t)strnlen(stableName, TSDB_TABLE_NAME_LEN - 1); int16_t len = (int16_t)strnlen(stableName, TSDB_TABLE_NAME_LEN - 1);
*(int16_t*) pWrite = len; *(int16_t *)pWrite = len;
pWrite += sizeof(int16_t); // todo refactor pWrite += sizeof(int16_t); // todo refactor
strncpy(pWrite, stableName, len); strncpy(pWrite, stableName, len);
...@@ -1828,7 +1798,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -1828,7 +1798,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
} }
void mnodeDropAllSuperTables(SDbObj *pDropDb) { void mnodeDropAllSuperTables(SDbObj *pDropDb) {
void * pIter= NULL; void *pIter = NULL;
int32_t numOfTables = 0; int32_t numOfTables = 0;
SSTableObj *pTable = NULL; SSTableObj *pTable = NULL;
...@@ -1850,7 +1820,7 @@ void mnodeDropAllSuperTables(SDbObj *pDropDb) { ...@@ -1850,7 +1820,7 @@ void mnodeDropAllSuperTables(SDbObj *pDropDb) {
.pObj = pTable, .pObj = pTable,
}; };
sdbDeleteRow(&row); sdbDeleteRow(&row);
numOfTables ++; numOfTables++;
} }
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
...@@ -1874,7 +1844,7 @@ static int32_t mnodeSetSchemaFromSuperTable(SSchema *pSchema, SSTableObj *pTable ...@@ -1874,7 +1844,7 @@ static int32_t mnodeSetSchemaFromSuperTable(SSchema *pSchema, SSTableObj *pTable
return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema); return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema);
} }
static int32_t mnodeDoGetSuperTableMeta(SMnodeMsg *pMsg, STableMetaMsg* pMeta) { static int32_t mnodeDoGetSuperTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
SSTableObj *pTable = (SSTableObj *)pMsg->pTable; SSTableObj *pTable = (SSTableObj *)pMsg->pTable;
pMeta->uid = htobe64(pTable->uid); pMeta->uid = htobe64(pTable->uid);
...@@ -1893,7 +1863,8 @@ static int32_t mnodeDoGetSuperTableMeta(SMnodeMsg *pMsg, STableMetaMsg* pMeta) { ...@@ -1893,7 +1863,8 @@ static int32_t mnodeDoGetSuperTableMeta(SMnodeMsg *pMsg, STableMetaMsg* pMeta) {
static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) { static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) {
SSTableObj *pTable = (SSTableObj *)pMsg->pTable; SSTableObj *pTable = (SSTableObj *)pMsg->pTable;
STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)); STableMetaMsg *pMeta =
rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16));
if (pMeta == NULL) { if (pMeta == NULL) {
return TSDB_CODE_MND_OUT_OF_MEMORY; return TSDB_CODE_MND_OUT_OF_MEMORY;
} }
...@@ -1910,7 +1881,7 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) { ...@@ -1910,7 +1881,7 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t doGetVgroupInfoLength(char* name) { static int32_t doGetVgroupInfoLength(char *name) {
SSTableObj *pTable = mnodeGetSuperTable(name); SSTableObj *pTable = mnodeGetSuperTable(name);
int32_t len = 0; int32_t len = 0;
if (pTable != NULL && pTable->vgHash != NULL) { if (pTable != NULL && pTable->vgHash != NULL) {
...@@ -1921,7 +1892,7 @@ static int32_t doGetVgroupInfoLength(char* name) { ...@@ -1921,7 +1892,7 @@ static int32_t doGetVgroupInfoLength(char* name) {
return len; return len;
} }
static int32_t getVgroupInfoLength(SSTableVgroupMsg* pInfo, int32_t numOfTable) { static int32_t getVgroupInfoLength(SSTableVgroupMsg *pInfo, int32_t numOfTable) {
int32_t contLen = sizeof(SSTableVgroupRspMsg) + 32 * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg); int32_t contLen = sizeof(SSTableVgroupRspMsg) + 32 * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg);
for (int32_t i = 0; i < numOfTable; ++i) { for (int32_t i = 0; i < numOfTable; ++i) {
char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i; char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i;
...@@ -1931,7 +1902,7 @@ static int32_t getVgroupInfoLength(SSTableVgroupMsg* pInfo, int32_t numOfTable) ...@@ -1931,7 +1902,7 @@ static int32_t getVgroupInfoLength(SSTableVgroupMsg* pInfo, int32_t numOfTable)
return contLen; return contLen;
} }
static char* serializeVgroupInfo(SSTableObj *pTable, char* name, char* msg, SMnodeMsg* pMsgBody, void* handle) { static char *serializeVgroupInfo(SSTableObj *pTable, char *name, char *msg, SMnodeMsg *pMsgBody, void *handle) {
strncpy(msg, name, TSDB_TABLE_FNAME_LEN); strncpy(msg, name, TSDB_TABLE_FNAME_LEN);
msg += TSDB_TABLE_FNAME_LEN; msg += TSDB_TABLE_FNAME_LEN;
...@@ -1946,8 +1917,8 @@ static char* serializeVgroupInfo(SSTableObj *pTable, char* name, char* msg, SMno ...@@ -1946,8 +1917,8 @@ static char* serializeVgroupInfo(SSTableObj *pTable, char* name, char* msg, SMno
msg += sizeof(SVgroupsMsg); msg += sizeof(SVgroupsMsg);
} else { } else {
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg; SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
mDebug("msg:%p, app:%p stable:%s, hash:%p sizeOfVgList:%d will be returned", pMsgBody, handle, mDebug("msg:%p, app:%p stable:%s, hash:%p sizeOfVgList:%d will be returned", pMsgBody, handle, pTable->info.tableId,
pTable->info.tableId, pTable->vgHash, taosHashGetSize(pTable->vgHash)); pTable->vgHash, taosHashGetSize(pTable->vgHash));
int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL); int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL);
int32_t vgSize = 0; int32_t vgSize = 0;
...@@ -2006,7 +1977,8 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { ...@@ -2006,7 +1977,8 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
SSTableObj *pTable = mnodeGetSuperTable(stableName); SSTableObj *pTable = mnodeGetSuperTable(stableName);
if (pTable == NULL) { if (pTable == NULL) {
mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, stableName); mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle,
stableName);
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
continue; continue;
} }
...@@ -2032,9 +2004,9 @@ static void mnodeProcessDropSuperTableRsp(SRpcMsg *rpcMsg) { ...@@ -2032,9 +2004,9 @@ static void mnodeProcessDropSuperTableRsp(SRpcMsg *rpcMsg) {
} }
static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pCreateMsg, SCTableObj *pTable) { static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pCreateMsg, SCTableObj *pTable) {
SCreateTableMsg* pMsg = (SCreateTableMsg*) ((char*)pCreateMsg + sizeof(SCMCreateTableMsg)); SCreateTableMsg *pMsg = (SCreateTableMsg *)((char *)pCreateMsg + sizeof(SCMCreateTableMsg));
char* tagData = NULL; char *tagData = NULL;
int32_t tagDataLen = 0; int32_t tagDataLen = 0;
int32_t totalCols = 0; int32_t totalCols = 0;
...@@ -2043,10 +2015,10 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pCreateMsg, SCTabl ...@@ -2043,10 +2015,10 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pCreateMsg, SCTabl
totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags; totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags;
contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen; contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen;
if (pMsg != NULL) { if (pMsg != NULL) {
int32_t nameLen = htonl(*(int32_t*)pMsg->schema); int32_t nameLen = htonl(*(int32_t *)pMsg->schema);
char* p = pMsg->schema + nameLen + sizeof(int32_t); char *p = pMsg->schema + nameLen + sizeof(int32_t);
tagDataLen = htonl(*(int32_t*) p); tagDataLen = htonl(*(int32_t *)p);
contLen += tagDataLen; contLen += tagDataLen;
tagData = p + sizeof(int32_t); tagData = p + sizeof(int32_t);
...@@ -2088,7 +2060,7 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pCreateMsg, SCTabl ...@@ -2088,7 +2060,7 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pCreateMsg, SCTabl
pCreate->superTableUid = 0; pCreate->superTableUid = 0;
} }
SSchema *pSchema = (SSchema *) pCreate->data; SSchema *pSchema = (SSchema *)pCreate->data;
if (pTable->info.type == TSDB_CHILD_TABLE) { if (pTable->info.type == TSDB_CHILD_TABLE) {
memcpy(pSchema, pTable->superTable->schema, totalCols * sizeof(SSchema)); memcpy(pSchema, pTable->superTable->schema, totalCols * sizeof(SSchema));
} else { } else {
...@@ -2125,13 +2097,11 @@ static int32_t mnodeDoCreateChildTableFp(SMnodeMsg *pMsg) { ...@@ -2125,13 +2097,11 @@ static int32_t mnodeDoCreateChildTableFp(SMnodeMsg *pMsg) {
} }
SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup); SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {.ahandle = pMsg,
.ahandle = pMsg,
.pCont = pMDCreate, .pCont = pMDCreate,
.contLen = htonl(pMDCreate->contLen), .contLen = htonl(pMDCreate->contLen),
.code = 0, .code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE};
};
dnodeSendMsgToDnode(&epSet, &rpcMsg); dnodeSendMsgToDnode(&epSet, &rpcMsg);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
...@@ -2140,7 +2110,7 @@ static int32_t mnodeDoCreateChildTableFp(SMnodeMsg *pMsg) { ...@@ -2140,7 +2110,7 @@ static int32_t mnodeDoCreateChildTableFp(SMnodeMsg *pMsg) {
static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
SCTableObj *pTable = (SCTableObj *)pMsg->pTable; SCTableObj *pTable = (SCTableObj *)pMsg->pTable;
SCreateTableMsg *pCreate = (SCreateTableMsg*) ((char*)pMsg->rpcMsg.pCont + sizeof(SCMCreateTableMsg)); SCreateTableMsg *pCreate = (SCreateTableMsg *)((char *)pMsg->rpcMsg.pCont + sizeof(SCMCreateTableMsg));
assert(pTable); assert(pTable);
monSaveAuditLog((pTable->info.type == TSDB_CHILD_TABLE) ? MON_DDL_CMD_CREATE_CHILD_TABLE : MON_DDL_CMD_CREATE_TABLE, monSaveAuditLog((pTable->info.type == TSDB_CHILD_TABLE) ? MON_DDL_CMD_CREATE_CHILD_TABLE : MON_DDL_CMD_CREATE_TABLE,
...@@ -2198,7 +2168,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { ...@@ -2198,7 +2168,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
SVgObj *pVgroup = pMsg->pVgroup; SVgObj *pVgroup = pMsg->pVgroup;
SCMCreateTableMsg *p1 = pMsg->rpcMsg.pCont; SCMCreateTableMsg *p1 = pMsg->rpcMsg.pCont;
SCreateTableMsg *pCreate = (SCreateTableMsg*)((char*)p1 + sizeof(SCMCreateTableMsg)); SCreateTableMsg *pCreate = (SCreateTableMsg *)((char *)p1 + sizeof(SCMCreateTableMsg));
SCTableObj *pTable = calloc(1, sizeof(SCTableObj)); SCTableObj *pTable = calloc(1, sizeof(SCTableObj));
if (pTable == NULL) { if (pTable == NULL) {
...@@ -2206,15 +2176,15 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { ...@@ -2206,15 +2176,15 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
return TSDB_CODE_MND_OUT_OF_MEMORY; return TSDB_CODE_MND_OUT_OF_MEMORY;
} }
pTable->info.type = (pCreate->numOfColumns == 0)? TSDB_CHILD_TABLE:TSDB_NORMAL_TABLE; pTable->info.type = (pCreate->numOfColumns == 0) ? TSDB_CHILD_TABLE : TSDB_NORMAL_TABLE;
pTable->info.tableId = strdup(pCreate->tableName); pTable->info.tableId = strdup(pCreate->tableName);
pTable->createdTime = taosGetTimestampMs(); pTable->createdTime = taosGetTimestampMs();
pTable->tid = tid; pTable->tid = tid;
pTable->vgId = pVgroup->vgId; pTable->vgId = pVgroup->vgId;
if (pTable->info.type == TSDB_CHILD_TABLE) { if (pTable->info.type == TSDB_CHILD_TABLE) {
int32_t nameLen = htonl(*(int32_t*) pCreate->schema); int32_t nameLen = htonl(*(int32_t *)pCreate->schema);
char* name = (char*)pCreate->schema + sizeof(int32_t); char *name = (char *)pCreate->schema + sizeof(int32_t);
char stableName[TSDB_TABLE_FNAME_LEN] = {0}; char stableName[TSDB_TABLE_FNAME_LEN] = {0};
memcpy(stableName, name, nameLen); memcpy(stableName, name, nameLen);
...@@ -2252,7 +2222,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { ...@@ -2252,7 +2222,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
int32_t numOfCols = pTable->numOfColumns; int32_t numOfCols = pTable->numOfColumns;
int32_t schemaSize = numOfCols * sizeof(SSchema); int32_t schemaSize = numOfCols * sizeof(SSchema);
pTable->schema = (SSchema *) calloc(1, schemaSize); pTable->schema = (SSchema *)calloc(1, schemaSize);
if (pTable->schema == NULL) { if (pTable->schema == NULL) {
free(pTable); free(pTable);
return TSDB_CODE_MND_OUT_OF_MEMORY; return TSDB_CODE_MND_OUT_OF_MEMORY;
...@@ -2273,7 +2243,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { ...@@ -2273,7 +2243,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
free(pTable); free(pTable);
return TSDB_CODE_MND_OUT_OF_MEMORY; return TSDB_CODE_MND_OUT_OF_MEMORY;
} }
memcpy(pTable->sql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pTable->sqlLen); memcpy(pTable->sql, (char *)(pCreate->schema) + numOfCols * sizeof(SSchema), pTable->sqlLen);
pTable->sql[pTable->sqlLen - 1] = 0; pTable->sql[pTable->sqlLen - 1] = 0;
mDebug("msg:%p, app:%p table:%s, stream sql len:%d sql:%s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, mDebug("msg:%p, app:%p table:%s, stream sql len:%d sql:%s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId,
pTable->sqlLen, pTable->sql); pTable->sqlLen, pTable->sql);
...@@ -2283,13 +2253,11 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { ...@@ -2283,13 +2253,11 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
pMsg->pTable = (STableObj *)pTable; pMsg->pTable = (STableObj *)pTable;
mnodeIncTableRef(pMsg->pTable); mnodeIncTableRef(pMsg->pTable);
SSdbRow desc = { SSdbRow desc = {.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.pObj = pTable, .pObj = pTable,
.pTable = tsChildTableSdb, .pTable = tsChildTableSdb,
.pMsg = pMsg, .pMsg = pMsg,
.fpReq = mnodeDoCreateChildTableFp .fpReq = mnodeDoCreateChildTableFp};
};
int32_t code = sdbInsertRow(&desc); int32_t code = sdbInsertRow(&desc);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
...@@ -2305,9 +2273,58 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { ...@@ -2305,9 +2273,58 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
return code; return code;
} }
static int32_t mnodeProcessMetaSyncCreateChildTableMsg(SMnodeMsg *pMsg, int32_t *tid, int32_t *vgId, uint64_t *uid,
uint64_t *suid) {
SCreateTableMsg *pCreate = (SCreateTableMsg *)((char *)pMsg->rpcMsg.pCont + sizeof(SCMCreateTableMsg));
int32_t code = 0;
// 0.db0._taos_meta_sync_cret_mndtb_taos_vgId.suid.uid.tid
if (strstr(pCreate->tableName, META_SYNC_CRET_MNDTB)) {
char *pTbName = strchr(pCreate->tableName, '.');
if (pTbName && (pTbName = strchr(pTbName + 1, '.'))) {
if (0 == strncmp(META_SYNC_CRET_MNDTB, ++pTbName, META_SYNC_TABLE_NAME_LEN)) {
pTbName += META_SYNC_TABLE_NAME_LEN;
*vgId = atoi(pTbName);
if ((pTbName = strchr(pTbName, '.'))) {
*suid = strtoull(++pTbName, NULL, 10);
if ((pTbName = strchr(pTbName, '.'))) {
*uid = strtoull(++pTbName, NULL, 10);
if ((pTbName = strchr(pTbName, '.'))) {
*tid = atoi(++pTbName);
}
}
}
}
}
if (*tid <= 0 || *uid == (uint64_t)-1 || *vgId <= 0) {
code = TSDB_CODE_MND_INVALID_FORMAT;
mError("msg:%p, app:%p table:%s, failed to create table, reason:%s", pMsg, pMsg->rpcMsg.ahandle,
pCreate->tableName, tstrerror(code));
return code;
}
mInfo("msg:%p, app:%p table:%s, start to create table, vgId:%d suid:%" PRIu64 " uid:%" PRIu64 " tid:%d", pMsg,
pMsg->rpcMsg.ahandle, pCreate->tableName, *vgId, *suid, *uid, *tid);
} else if (strstr(pCreate->tableName, META_SYNC_TABLE_NAME)) {
char *pTbName = strchr(pCreate->tableName, '.');
if (pTbName && (pTbName = strchr(pTbName + 1, '.'))) {
if (0 == strncmp(META_SYNC_TABLE_NAME, ++pTbName, META_SYNC_TABLE_NAME_LEN)) {
*vgId = atoi(pTbName + META_SYNC_TABLE_NAME_LEN);
}
}
if (*vgId <= 0) {
code = TSDB_CODE_MND_INVALID_FORMAT;
mError("msg:%p, app:%p table:%s, failed to create table, reason:%s", pMsg, pMsg->rpcMsg.ahandle,
pCreate->tableName, tstrerror(code));
return code;
}
}
return code;
}
static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
//SCMCreateTableMsg* p1 = pMsg->rpcMsg.pCont; // there are several tables here. // SCMCreateTableMsg* p1 = pMsg->rpcMsg.pCont; // there are several tables here.
SCreateTableMsg* pCreate = (SCreateTableMsg*)((char *)pMsg->rpcMsg.pCont + sizeof(SCMCreateTableMsg)); SCreateTableMsg *pCreate = (SCreateTableMsg *)((char *)pMsg->rpcMsg.pCont + sizeof(SCMCreateTableMsg));
int32_t code = grantCheck(TSDB_GRANT_TIMESERIES); int32_t code = grantCheck(TSDB_GRANT_TIMESERIES);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2321,14 +2338,12 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { ...@@ -2321,14 +2338,12 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
int32_t tid = 0; int32_t tid = 0;
int32_t vgId = 0; int32_t vgId = 0;
uint64_t uid = -1;
uint64_t suid = -1;
if (tsMetaSyncOption) { if (tsMetaSyncOption) {
char *pTbName = strchr(pCreate->tableName, '.'); code = mnodeProcessMetaSyncCreateChildTableMsg(pMsg, &tid, &vgId, &uid, &suid);
if (pTbName && (pTbName = strchr(pTbName + 1, '.'))) { if (code != TSDB_CODE_SUCCESS) return code;
if (0 == strncmp(META_SYNC_TABLE_NAME, ++pTbName, META_SYNC_TABLE_NAME_LEN)) {
vgId = atoi(pTbName + META_SYNC_TABLE_NAME_LEN);
}
}
} }
code = mnodeGetAvailableVgroup(pMsg, &pVgroup, &tid, vgId); code = mnodeGetAvailableVgroup(pMsg, &pVgroup, &tid, vgId);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2351,8 +2366,8 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { ...@@ -2351,8 +2366,8 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
} }
if (pMsg->pTable == NULL) { if (pMsg->pTable == NULL) {
mError("msg:%p, app:%p table:%s, object not found, retry:%d reason:%s", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName, pMsg->retry, mError("msg:%p, app:%p table:%s, object not found, retry:%d reason:%s", pMsg, pMsg->rpcMsg.ahandle,
tstrerror(terrno)); pCreate->tableName, pMsg->retry, tstrerror(terrno));
return terrno; return terrno;
} else { } else {
mDebug("msg:%p, app:%p table:%s, send create msg to vnode again", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName); mDebug("msg:%p, app:%p table:%s, send create msg to vnode again", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName);
...@@ -2387,13 +2402,11 @@ static int32_t mnodeSendDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) { ...@@ -2387,13 +2402,11 @@ static int32_t mnodeSendDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) {
monSaveAuditLog((pTable->info.type == TSDB_CHILD_TABLE) ? MON_DDL_CMD_DROP_CHILD_TABLE : MON_DDL_CMD_DROP_TABLE, monSaveAuditLog((pTable->info.type == TSDB_CHILD_TABLE) ? MON_DDL_CMD_DROP_CHILD_TABLE : MON_DDL_CMD_DROP_TABLE,
mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true); mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {.ahandle = pMsg,
.ahandle = pMsg,
.pCont = pDrop, .pCont = pDrop,
.contLen = sizeof(SMDDropTableMsg), .contLen = sizeof(SMDDropTableMsg),
.code = 0, .code = 0,
.msgType = TSDB_MSG_TYPE_MD_DROP_TABLE .msgType = TSDB_MSG_TYPE_MD_DROP_TABLE};
};
if (!needReturn) rpcMsg.ahandle = NULL; if (!needReturn) rpcMsg.ahandle = NULL;
...@@ -2429,12 +2442,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { ...@@ -2429,12 +2442,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
} }
SSdbRow row = { SSdbRow row = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL, .pTable = tsChildTableSdb, .pObj = pTable, .pMsg = pMsg, .fpRsp = mnodeDropChildTableCb};
.pTable = tsChildTableSdb,
.pObj = pTable,
.pMsg = pMsg,
.fpRsp = mnodeDropChildTableCb
};
int32_t code = sdbDeleteRow(&row); int32_t code = sdbDeleteRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
...@@ -2480,12 +2488,12 @@ static int32_t mnodeProcessMetaSyncDropTableMsg(SMnodeMsg *pMsg) { ...@@ -2480,12 +2488,12 @@ static int32_t mnodeProcessMetaSyncDropTableMsg(SMnodeMsg *pMsg) {
// META_SYNC_DROP_VNDTB: _taos_meta_sync_drop_vndtb_taos_tableType_vgId_uid_tid_tbName; // META_SYNC_DROP_VNDTB: _taos_meta_sync_drop_vndtb_taos_tableType_vgId_uid_tid_tbName;
fStr = strndup(pDrop->name, strlen(pDrop->name)); fStr = strndup(pDrop->name, strlen(pDrop->name));
if(!fStr) { if (!fStr) {
code = TSDB_CODE_MND_OUT_OF_MEMORY; code = TSDB_CODE_MND_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
fnameList = strsplit(fStr, ".", &fnum); fnameList = strsplit(fStr, ".", &fnum);
if (fnum > 6 && 0 == strncmp(fnameList[2], META_SYNC_DROP_VNDTB, META_SYNC_DROP_TABLE_LEN)) { if (fnum > 6 && 0 == strncmp(fnameList[2], META_SYNC_DROP_VNDTB, META_SYNC_TABLE_NAME_LEN)) {
tableType = atoi(fnameList[3]); tableType = atoi(fnameList[3]);
if (tableType != TSDB_CHILD_TABLE && tableType != TSDB_NORMAL_TABLE) { if (tableType != TSDB_CHILD_TABLE && tableType != TSDB_NORMAL_TABLE) {
code = TSDB_CODE_MND_INVALID_TABLE_NAME; code = TSDB_CODE_MND_INVALID_TABLE_NAME;
...@@ -2557,7 +2565,7 @@ _exit: ...@@ -2557,7 +2565,7 @@ _exit:
} }
static int32_t mnodeFindNormalTableColumnIndex(SCTableObj *pTable, char *colName) { static int32_t mnodeFindNormalTableColumnIndex(SCTableObj *pTable, char *colName) {
SSchema *schema = (SSchema *) pTable->schema; SSchema *schema = (SSchema *)pTable->schema;
for (int32_t col = 0; col < pTable->numOfColumns; col++) { for (int32_t col = 0; col < pTable->numOfColumns; col++) {
if (strcmp(schema[col].name, colName) == 0) { if (strcmp(schema[col].name, colName) == 0) {
return col; return col;
...@@ -2570,8 +2578,8 @@ static int32_t mnodeFindNormalTableColumnIndex(SCTableObj *pTable, char *colName ...@@ -2570,8 +2578,8 @@ static int32_t mnodeFindNormalTableColumnIndex(SCTableObj *pTable, char *colName
static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
SCTableObj *pTable = (SCTableObj *)pMsg->pTable; SCTableObj *pTable = (SCTableObj *)pMsg->pTable;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mError("msg:%p, app:%p ctable %s, failed to alter column, reason:%s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, mError("msg:%p, app:%p ctable %s, failed to alter column, reason:%s", pMsg, pMsg->rpcMsg.ahandle,
tstrerror(code)); pTable->info.tableId, tstrerror(code));
return code; return code;
} }
...@@ -2591,13 +2599,11 @@ static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -2591,13 +2599,11 @@ static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
} }
SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup); SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {.ahandle = pMsg,
.ahandle = pMsg,
.pCont = pMDCreate, .pCont = pMDCreate,
.contLen = htonl(pMDCreate->contLen), .contLen = htonl(pMDCreate->contLen),
.code = 0, .code = 0,
.msgType = TSDB_MSG_TYPE_MD_ALTER_TABLE .msgType = TSDB_MSG_TYPE_MD_ALTER_TABLE};
};
mDebug("msg:%p, app:%p ctable %s, send alter column msg to vgId:%d", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, mDebug("msg:%p, app:%p ctable %s, send alter column msg to vgId:%d", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId,
pMsg->pVgroup->vgId); pMsg->pVgroup->vgId);
...@@ -2610,7 +2616,8 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3 ...@@ -2610,7 +2616,8 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
SCTableObj *pTable = (SCTableObj *)pMsg->pTable; SCTableObj *pTable = (SCTableObj *)pMsg->pTable;
SDbObj *pDb = pMsg->pDb; SDbObj *pDb = pMsg->pDb;
if (ncols <= 0) { if (ncols <= 0) {
mError("msg:%p, app:%p ctable:%s, add column, ncols:%d <= 0", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, ncols); mError("msg:%p, app:%p ctable:%s, add column, ncols:%d <= 0", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId,
ncols);
monSaveAuditLog(MON_DDL_CMD_ADD_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, false); monSaveAuditLog(MON_DDL_CMD_ADD_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, false);
return TSDB_CODE_MND_APP_ERROR; return TSDB_CODE_MND_APP_ERROR;
} }
...@@ -2629,7 +2636,7 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3 ...@@ -2629,7 +2636,7 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
memcpy(pTable->schema + pTable->numOfColumns, schema, sizeof(SSchema) * ncols); memcpy(pTable->schema + pTable->numOfColumns, schema, sizeof(SSchema) * ncols);
SSchema *tschema = (SSchema *) (pTable->schema + pTable->numOfColumns); SSchema *tschema = (SSchema *)(pTable->schema + pTable->numOfColumns);
for (int32_t i = 0; i < ncols; i++) { for (int32_t i = 0; i < ncols; i++) {
tschema[i].colId = pTable->nextColId++; tschema[i].colId = pTable->nextColId++;
} }
...@@ -2647,13 +2654,11 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3 ...@@ -2647,13 +2654,11 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
mInfo("msg:%p, app:%p ctable %s, start to add column", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId); mInfo("msg:%p, app:%p ctable %s, start to add column", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId);
monSaveAuditLog(MON_DDL_CMD_ADD_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true); monSaveAuditLog(MON_DDL_CMD_ADD_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true);
SSdbRow row = { SSdbRow row = {.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.pTable = tsChildTableSdb, .pTable = tsChildTableSdb,
.pObj = pTable, .pObj = pTable,
.pMsg = pMsg, .pMsg = pMsg,
.fpRsp = mnodeAlterNormalTableColumnCb .fpRsp = mnodeAlterNormalTableColumnCb};
};
return sdbUpdateRow(&row); return sdbUpdateRow(&row);
} }
...@@ -2683,45 +2688,40 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { ...@@ -2683,45 +2688,40 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
mInfo("msg:%p, app:%p ctable %s, start to drop column %s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, colName); mInfo("msg:%p, app:%p ctable %s, start to drop column %s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, colName);
monSaveAuditLog(MON_DDL_CMD_DROP_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true); monSaveAuditLog(MON_DDL_CMD_DROP_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true);
SSdbRow row = { SSdbRow row = {.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.pTable = tsChildTableSdb, .pTable = tsChildTableSdb,
.pObj = pTable, .pObj = pTable,
.pMsg = pMsg, .pMsg = pMsg,
.fpRsp = mnodeAlterNormalTableColumnCb .fpRsp = mnodeAlterNormalTableColumnCb};
};
return sdbUpdateRow(&row); return sdbUpdateRow(&row);
} }
static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg) { static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg) {
SAlterTableMsg *pAlter = pMsg->rpcMsg.pCont; SAlterTableMsg *pAlter = pMsg->rpcMsg.pCont;
char* name = pAlter->schema[0].name; char *name = pAlter->schema[0].name;
SCTableObj *pTable = (SCTableObj *)pMsg->pTable; SCTableObj *pTable = (SCTableObj *)pMsg->pTable;
int32_t col = mnodeFindNormalTableColumnIndex(pTable, name); int32_t col = mnodeFindNormalTableColumnIndex(pTable, name);
if (col < 0) { if (col < 0) {
mError("msg:%p, app:%p ctable:%s, change column, name: %s", pMsg, pMsg->rpcMsg.ahandle, mError("msg:%p, app:%p ctable:%s, change column, name: %s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, name);
pTable->info.tableId, name);
monSaveAuditLog(MON_DDL_CMD_MODIFY_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, false); monSaveAuditLog(MON_DDL_CMD_MODIFY_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, false);
return TSDB_CODE_MND_FIELD_NOT_EXIST; return TSDB_CODE_MND_FIELD_NOT_EXIST;
} }
SSchema *schema = (SSchema *) (pTable->schema + col); SSchema *schema = (SSchema *)(pTable->schema + col);
ASSERT(schema->type == TSDB_DATA_TYPE_BINARY || schema->type == TSDB_DATA_TYPE_NCHAR); ASSERT(schema->type == TSDB_DATA_TYPE_BINARY || schema->type == TSDB_DATA_TYPE_NCHAR);
schema->bytes = pAlter->schema[0].bytes; schema->bytes = pAlter->schema[0].bytes;
++pTable->sversion; ++pTable->sversion;
mInfo("msg:%p, app:%p ctable %s, start to modify column %s len to %d", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, mInfo("msg:%p, app:%p ctable %s, start to modify column %s len to %d", pMsg, pMsg->rpcMsg.ahandle,
name, schema->bytes); pTable->info.tableId, name, schema->bytes);
monSaveAuditLog(MON_DDL_CMD_MODIFY_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true); monSaveAuditLog(MON_DDL_CMD_MODIFY_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true);
SSdbRow row = { SSdbRow row = {.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.pTable = tsChildTableSdb, .pTable = tsChildTableSdb,
.pObj = pTable, .pObj = pTable,
.pMsg = pMsg, .pMsg = pMsg,
.fpRsp = mnodeAlterNormalTableColumnCb .fpRsp = mnodeAlterNormalTableColumnCb};
};
return sdbUpdateRow(&row); return sdbUpdateRow(&row);
} }
...@@ -2800,15 +2800,15 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) { ...@@ -2800,15 +2800,15 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_TAG_NOT_EXIST; return TSDB_CODE_MND_TAG_NOT_EXIST;
} }
char* p = pInfo->tags; char *p = pInfo->tags;
int32_t nameLen = htonl(*(int32_t*) p); int32_t nameLen = htonl(*(int32_t *)p);
p += sizeof(int32_t); p += sizeof(int32_t);
p += nameLen; p += nameLen;
int32_t tagLen = htonl(*(int32_t*) p); int32_t tagLen = htonl(*(int32_t *)p);
p += sizeof(int32_t); p += sizeof(int32_t);
int32_t totalLen = nameLen + tagLen + sizeof(int32_t)*2; int32_t totalLen = nameLen + tagLen + sizeof(int32_t) * 2;
if (tagLen == 0 || nameLen == 0) { if (tagLen == 0 || nameLen == 0) {
mError("msg:%p, app:%p table:%s, failed to create table on demand for super table is empty, tagLen:%d", pMsg, mError("msg:%p, app:%p table:%s, failed to create table on demand for super table is empty, tagLen:%d", pMsg,
pMsg->rpcMsg.ahandle, pInfo->tableFname, tagLen); pMsg->rpcMsg.ahandle, pInfo->tableFname, tagLen);
...@@ -2823,7 +2823,7 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) { ...@@ -2823,7 +2823,7 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_OUT_OF_MEMORY; return TSDB_CODE_MND_OUT_OF_MEMORY;
} }
SCreateTableMsg* pCreate = (SCreateTableMsg*) ((char*) pCreateMsg + sizeof(SCMCreateTableMsg)); SCreateTableMsg *pCreate = (SCreateTableMsg *)((char *)pCreateMsg + sizeof(SCMCreateTableMsg));
size_t size = tListLen(pInfo->tableFname); size_t size = tListLen(pInfo->tableFname);
tstrncpy(pCreate->tableName, pInfo->tableFname, size); tstrncpy(pCreate->tableName, pInfo->tableFname, size);
...@@ -2870,7 +2870,7 @@ static int32_t mnodeGetChildTableMeta(SMnodeMsg *pMsg) { ...@@ -2870,7 +2870,7 @@ static int32_t mnodeGetChildTableMeta(SMnodeMsg *pMsg) {
} }
void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup) { void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup) {
void * pIter = NULL; void *pIter = NULL;
int32_t numOfTables = 0; int32_t numOfTables = 0;
SCTableObj *pTable = NULL; SCTableObj *pTable = NULL;
...@@ -2896,7 +2896,7 @@ void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup) { ...@@ -2896,7 +2896,7 @@ void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup) {
} }
void mnodeDropAllChildTables(SDbObj *pDropDb) { void mnodeDropAllChildTables(SDbObj *pDropDb) {
void * pIter = NULL; void *pIter = NULL;
int32_t numOfTables = 0; int32_t numOfTables = 0;
SCTableObj *pTable = NULL; SCTableObj *pTable = NULL;
...@@ -2927,7 +2927,7 @@ void mnodeDropAllChildTables(SDbObj *pDropDb) { ...@@ -2927,7 +2927,7 @@ void mnodeDropAllChildTables(SDbObj *pDropDb) {
} }
static void mnodeDropAllChildTablesInStable(SSTableObj *pStable) { static void mnodeDropAllChildTablesInStable(SSTableObj *pStable) {
void * pIter = NULL; void *pIter = NULL;
int32_t numOfTables = 0; int32_t numOfTables = 0;
SCTableObj *pTable = NULL; SCTableObj *pTable = NULL;
...@@ -3048,7 +3048,9 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { ...@@ -3048,7 +3048,9 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
// If the table is deleted by another thread during creation, stop creating and send drop msg to vnode // If the table is deleted by another thread during creation, stop creating and send drop msg to vnode
if (sdbCheckRowDeleted(tsChildTableSdb, pTable)) { if (sdbCheckRowDeleted(tsChildTableSdb, pTable)) {
mDebug("msg:%p, app:%p table:%s, create table rsp received, but a deleting opertion incoming, vgId:%d sid:%d uid:%" PRIu64, mDebug(
"msg:%p, app:%p table:%s, create table rsp received, but a deleting opertion incoming, vgId:%d sid:%d "
"uid:%" PRIu64,
pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid); pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid);
// if the vgroup is already dropped from hash, it can't be accquired by pTable->vgId // if the vgroup is already dropped from hash, it can't be accquired by pTable->vgId
...@@ -3078,13 +3080,11 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { ...@@ -3078,13 +3080,11 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
} }
if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
SSdbRow desc = { SSdbRow desc = {.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.pObj = pTable, .pObj = pTable,
.pTable = tsChildTableSdb, .pTable = tsChildTableSdb,
.pMsg = pMsg, .pMsg = pMsg,
.fpRsp = mnodeDoCreateChildTableCb .fpRsp = mnodeDoCreateChildTableCb};
};
int32_t code = sdbInsertRowToQueue(&desc); int32_t code = sdbInsertRowToQueue(&desc);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
...@@ -3094,8 +3094,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { ...@@ -3094,8 +3094,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
if (pMsg->pBatchMasterMsg) { if (pMsg->pBatchMasterMsg) {
++pMsg->pBatchMasterMsg->received; ++pMsg->pBatchMasterMsg->received;
pMsg->pBatchMasterMsg->code = code; pMsg->pBatchMasterMsg->code = code;
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received >= pMsg->pBatchMasterMsg->expected) {
>= pMsg->pBatchMasterMsg->expected) {
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, code); dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, code);
} }
...@@ -3126,15 +3125,14 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { ...@@ -3126,15 +3125,14 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
sdbDeleteRow(&row); sdbDeleteRow(&row);
if (rpcMsg->code == TSDB_CODE_APP_NOT_READY) { if (rpcMsg->code == TSDB_CODE_APP_NOT_READY) {
//Avoid retry again in client // Avoid retry again in client
rpcMsg->code = TSDB_CODE_MND_VGROUP_NOT_READY; rpcMsg->code = TSDB_CODE_MND_VGROUP_NOT_READY;
} }
if (pMsg->pBatchMasterMsg) { if (pMsg->pBatchMasterMsg) {
++pMsg->pBatchMasterMsg->received; ++pMsg->pBatchMasterMsg->received;
pMsg->pBatchMasterMsg->code = rpcMsg->code; pMsg->pBatchMasterMsg->code = rpcMsg->code;
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received >= pMsg->pBatchMasterMsg->expected) {
>= pMsg->pBatchMasterMsg->expected) {
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, rpcMsg->code); dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, rpcMsg->code);
} }
...@@ -3177,7 +3175,8 @@ static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) { ...@@ -3177,7 +3175,8 @@ static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) {
} }
} }
static SMultiTableMeta* ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray* pList, int32_t* totalMallocLen, int32_t numOfVgroupList) { static SMultiTableMeta *ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray *pList, int32_t *totalMallocLen,
int32_t numOfVgroupList) {
int32_t len = 0; int32_t len = 0;
for (int32_t i = 0; i < numOfVgroupList; ++i) { for (int32_t i = 0; i < numOfVgroupList; ++i) {
char *name = taosArrayGetP(pList, i); char *name = taosArrayGetP(pList, i);
...@@ -3189,7 +3188,7 @@ static SMultiTableMeta* ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray ...@@ -3189,7 +3188,7 @@ static SMultiTableMeta* ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray
(*totalMallocLen) *= 2; (*totalMallocLen) *= 2;
} }
SMultiTableMeta* pMultiMeta1 = realloc(pMultiMeta, *totalMallocLen); SMultiTableMeta *pMultiMeta1 = realloc(pMultiMeta, *totalMallocLen);
if (pMultiMeta1 == NULL) { if (pMultiMeta1 == NULL) {
return NULL; return NULL;
} }
...@@ -3210,9 +3209,9 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -3210,9 +3209,9 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
int32_t num = 0; int32_t num = 0;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
char* str = strndup(pInfo->tableNames, contLen); char *str = strndup(pInfo->tableNames, contLen);
char** nameList = strsplit(str, "`", &num); char **nameList = strsplit(str, "`", &num);
SArray* pList = taosArrayInit(4, POINTER_BYTES); SArray *pList = taosArrayInit(4, POINTER_BYTES);
SMultiTableMeta *pMultiMeta = NULL; SMultiTableMeta *pMultiMeta = NULL;
if (num != pInfo->numOfTables + pInfo->numOfVgroups + pInfo->numOfUdfs) { if (num != pInfo->numOfTables + pInfo->numOfVgroups + pInfo->numOfUdfs) {
...@@ -3222,7 +3221,8 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -3222,7 +3221,8 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
} }
// first malloc 80KB, subsequent reallocation will expand the size as twice of the original size // first malloc 80KB, subsequent reallocation will expand the size as twice of the original size
int32_t totalMallocLen = sizeof(SMultiTableMeta) + sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16); int32_t totalMallocLen =
sizeof(SMultiTableMeta) + sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16);
pMultiMeta = calloc(1, totalMallocLen); pMultiMeta = calloc(1, totalMallocLen);
if (pMultiMeta == NULL) { if (pMultiMeta == NULL) {
code = TSDB_CODE_MND_OUT_OF_MEMORY; code = TSDB_CODE_MND_OUT_OF_MEMORY;
...@@ -3238,7 +3238,8 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -3238,7 +3238,8 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
pMsg->pTable = mnodeGetTable(fullName); pMsg->pTable = mnodeGetTable(fullName);
if (pMsg->pTable == NULL) { if (pMsg->pTable == NULL) {
mError("msg:%p, app:%p table:%s, failed to get table meta, table not exist", pMsg, pMsg->rpcMsg.ahandle, fullName); mError("msg:%p, app:%p table:%s, failed to get table meta, table not exist", pMsg, pMsg->rpcMsg.ahandle,
fullName);
code = TSDB_CODE_MND_INVALID_TABLE_NAME; code = TSDB_CODE_MND_INVALID_TABLE_NAME;
goto _end; goto _end;
} }
...@@ -3264,7 +3265,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -3264,7 +3265,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
} }
} }
STableMetaMsg *pMeta = (STableMetaMsg *)((char*) pMultiMeta + pMultiMeta->contLen); STableMetaMsg *pMeta = (STableMetaMsg *)((char *)pMultiMeta + pMultiMeta->contLen);
if (pMsg->pTable->type == TSDB_SUPER_TABLE) { if (pMsg->pTable->type == TSDB_SUPER_TABLE) {
code = mnodeDoGetSuperTableMeta(pMsg, pMeta); code = mnodeDoGetSuperTableMeta(pMsg, pMeta);
...@@ -3286,19 +3287,19 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -3286,19 +3287,19 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
} else { } else {
// ignore error and continue. // ignore error and continue.
// Otherwise the client may found that the responding message is inconsistent. // Otherwise the client may found that the responding message is inconsistent.
// goto _end; // goto _end;
} }
} }
int32_t tableNum = pInfo->numOfTables + pInfo->numOfVgroups; int32_t tableNum = pInfo->numOfTables + pInfo->numOfVgroups;
// add the additional super table names that needs the vgroup info // add the additional super table names that needs the vgroup info
for(;t < tableNum; ++t) { for (; t < tableNum; ++t) {
taosArrayPush(pList, &nameList[t]); taosArrayPush(pList, &nameList[t]);
} }
// add the pVgroupList into the pList // add the pVgroupList into the pList
int32_t numOfVgroupList = (int32_t) taosArrayGetSize(pList); int32_t numOfVgroupList = (int32_t)taosArrayGetSize(pList);
pMultiMeta->numOfVgroup = htonl(numOfVgroupList); pMultiMeta->numOfVgroup = htonl(numOfVgroupList);
pMultiMeta = ensureMsgBufferSpace(pMultiMeta, pList, &totalMallocLen, numOfVgroupList); pMultiMeta = ensureMsgBufferSpace(pMultiMeta, pList, &totalMallocLen, numOfVgroupList);
...@@ -3307,9 +3308,9 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -3307,9 +3308,9 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
goto _end; goto _end;
} }
char* msg = (char*) pMultiMeta + pMultiMeta->contLen; char *msg = (char *)pMultiMeta + pMultiMeta->contLen;
for(int32_t i = 0; i < numOfVgroupList; ++i) { for (int32_t i = 0; i < numOfVgroupList; ++i) {
char* name = taosArrayGetP(pList, i); char *name = taosArrayGetP(pList, i);
SSTableObj *pTable = mnodeGetSuperTable(name); SSTableObj *pTable = mnodeGetSuperTable(name);
if (pTable == NULL) { if (pTable == NULL) {
...@@ -3321,23 +3322,23 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -3321,23 +3322,23 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
msg = serializeVgroupInfo(pTable, name, msg, pMsg, pMsg->rpcMsg.ahandle); msg = serializeVgroupInfo(pTable, name, msg, pMsg, pMsg->rpcMsg.ahandle);
} }
pMultiMeta->contLen = (int32_t) (msg - (char*) pMultiMeta); pMultiMeta->contLen = (int32_t)(msg - (char *)pMultiMeta);
pMultiMeta->numOfTables = htonl(pMultiMeta->numOfTables); pMultiMeta->numOfTables = htonl(pMultiMeta->numOfTables);
// add the user-defined-function information // add the user-defined-function information
for(int32_t i = 0; i < pInfo->numOfUdfs; ++i, ++t) { for (int32_t i = 0; i < pInfo->numOfUdfs; ++i, ++t) {
char buf[TSDB_FUNC_NAME_LEN] = {0}; char buf[TSDB_FUNC_NAME_LEN] = {0};
tstrncpy(buf, nameList[t], TSDB_FUNC_NAME_LEN); tstrncpy(buf, nameList[t], TSDB_FUNC_NAME_LEN);
SFuncObj* pFuncObj = mnodeGetFunc(buf); SFuncObj *pFuncObj = mnodeGetFunc(buf);
if (pFuncObj == NULL) { if (pFuncObj == NULL) {
mError("function %s does not exist", buf); mError("function %s does not exist", buf);
code = TSDB_CODE_MND_INVALID_FUNC; code = TSDB_CODE_MND_INVALID_FUNC;
goto _end; goto _end;
} }
SFunctionInfoMsg* pFuncInfo = (SFunctionInfoMsg*) msg; SFunctionInfoMsg *pFuncInfo = (SFunctionInfoMsg *)msg;
strcpy(pFuncInfo->name, buf); strcpy(pFuncInfo->name, buf);
pFuncInfo->len = htonl(pFuncObj->contLen); pFuncInfo->len = htonl(pFuncObj->contLen);
...@@ -3351,14 +3352,14 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -3351,14 +3352,14 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
msg += sizeof(SFunctionInfoMsg) + pFuncObj->contLen; msg += sizeof(SFunctionInfoMsg) + pFuncObj->contLen;
} }
pMultiMeta->contLen = (int32_t) (msg - (char*) pMultiMeta); pMultiMeta->contLen = (int32_t)(msg - (char *)pMultiMeta);
pMultiMeta->numOfUdf = htonl(pInfo->numOfUdfs); pMultiMeta->numOfUdf = htonl(pInfo->numOfUdfs);
pMsg->rpcRsp.rsp = pMultiMeta; pMsg->rpcRsp.rsp = pMultiMeta;
pMsg->rpcRsp.len = pMultiMeta->contLen; pMsg->rpcRsp.len = pMultiMeta->contLen;
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
char* tmp = rpcMallocCont(pMultiMeta->contLen + 2); char *tmp = rpcMallocCont(pMultiMeta->contLen + 2);
if (tmp == NULL) { if (tmp == NULL) {
code = TSDB_CODE_MND_OUT_OF_MEMORY; code = TSDB_CODE_MND_OUT_OF_MEMORY;
goto _end; goto _end;
...@@ -3384,11 +3385,11 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -3384,11 +3385,11 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
pMsg->rpcRsp.rsp = tmp; pMsg->rpcRsp.rsp = tmp;
pMsg->rpcRsp.len = pMultiMeta->contLen; pMsg->rpcRsp.len = pMultiMeta->contLen;
SMultiTableMeta* p = (SMultiTableMeta*) tmp; SMultiTableMeta *p = (SMultiTableMeta *)tmp;
mDebug("multiTable info build completed, original:%d, compressed:%d, comp:%d", p->rawLen, p->contLen, p->compressed); mDebug("multiTable info build completed, original:%d, compressed:%d, comp:%d", p->rawLen, p->contLen, p->compressed);
_end: _end:
tfree(str); tfree(str);
tfree(nameList); tfree(nameList);
taosArrayDestroy(&pList); taosArrayDestroy(&pList);
...@@ -3412,7 +3413,7 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void ...@@ -3412,7 +3413,7 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
int32_t cols = 0; int32_t cols = 0;
SSchema *pSchema = pMeta->schema; SSchema *pSchema = pMeta->schema;
SSchema* s = tGetTbnameColumnSchema(); SSchema *s = tGetTbnameColumnSchema();
pShow->bytes[cols] = s->bytes; pShow->bytes[cols] = s->bytes;
pSchema[cols].type = s->type; pSchema[cols].type = s->type;
strcpy(pSchema[cols].name, "table_name"); strcpy(pSchema[cols].name, "table_name");
...@@ -3431,7 +3432,7 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void ...@@ -3431,7 +3432,7 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
SSchema* tbCol = tGetTbnameColumnSchema(); SSchema *tbCol = tGetTbnameColumnSchema();
pShow->bytes[cols] = tbCol->bytes + VARSTR_HEADER_SIZE; pShow->bytes[cols] = tbCol->bytes + VARSTR_HEADER_SIZE;
pSchema[cols].type = tbCol->type; pSchema[cols].type = tbCol->type;
strcpy(pSchema[cols].name, "stable_name"); strcpy(pSchema[cols].name, "stable_name");
...@@ -3456,7 +3457,6 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void ...@@ -3456,7 +3457,6 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pMeta->numOfColumns = htons(cols); pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols; pShow->numOfColumns = cols;
...@@ -3490,7 +3490,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows ...@@ -3490,7 +3490,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
char prefix[64] = {0}; char prefix[64] = {0};
int32_t prefixLen = (int32_t)tableIdPrefix(pDb->name, prefix, 64); int32_t prefixLen = (int32_t)tableIdPrefix(pDb->name, prefix, 64);
char* pattern = mnodeGetTableShowPattern(pShow); char *pattern = mnodeGetTableShowPattern(pShow);
if (pShow->payloadLen > 0 && pattern == NULL) { if (pShow->payloadLen > 0 && pattern == NULL) {
return 0; return 0;
} }
...@@ -3522,7 +3522,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows ...@@ -3522,7 +3522,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *) pWrite = pTable->createdTime; *(int64_t *)pWrite = pTable->createdTime;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
...@@ -3546,18 +3546,17 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows ...@@ -3546,18 +3546,17 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
// uid // uid
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t*) pWrite = pTable->uid; *(int64_t *)pWrite = pTable->uid;
cols++; cols++;
// tid // tid
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t*) pWrite = pTable->tid; *(int32_t *)pWrite = pTable->tid;
cols++; cols++;
//vgid // vgid
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t*) pWrite = pTable->vgId; *(int32_t *)pWrite = pTable->vgId;
cols++; cols++;
numOfRows++; numOfRows++;
...@@ -3580,7 +3579,8 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) { ...@@ -3580,7 +3579,8 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableName(pAlter->tableFname); if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableName(pAlter->tableFname);
if (pMsg->pDb == NULL) { if (pMsg->pDb == NULL) {
mError("msg:%p, app:%p table:%s, failed to alter table, db not selected", pMsg, pMsg->rpcMsg.ahandle, pAlter->tableFname); mError("msg:%p, app:%p table:%s, failed to alter table, db not selected", pMsg, pMsg->rpcMsg.ahandle,
pAlter->tableFname);
return TSDB_CODE_MND_DB_NOT_SELECTED; return TSDB_CODE_MND_DB_NOT_SELECTED;
} }
...@@ -3598,7 +3598,8 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) { ...@@ -3598,7 +3598,8 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pAlter->tableFname); if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pAlter->tableFname);
if (pMsg->pTable == NULL) { if (pMsg->pTable == NULL) {
mError("msg:%p, app:%p table:%s, failed to alter table, table not exist", pMsg, pMsg->rpcMsg.ahandle, pAlter->tableFname); mError("msg:%p, app:%p table:%s, failed to alter table, table not exist", pMsg, pMsg->rpcMsg.ahandle,
pAlter->tableFname);
return TSDB_CODE_MND_INVALID_TABLE_NAME; return TSDB_CODE_MND_INVALID_TABLE_NAME;
} }
...@@ -3665,7 +3666,7 @@ static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, vo ...@@ -3665,7 +3666,7 @@ static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, vo
int32_t cols = 0; int32_t cols = 0;
SSchema *pSchema = pMeta->schema; SSchema *pSchema = pMeta->schema;
SSchema* tbnameColSchema = tGetTbnameColumnSchema(); SSchema *tbnameColSchema = tGetTbnameColumnSchema();
pShow->bytes[cols] = tbnameColSchema->bytes; pShow->bytes[cols] = tbnameColSchema->bytes;
pSchema[cols].type = tbnameColSchema->type; pSchema[cols].type = tbnameColSchema->type;
strcpy(pSchema[cols].name, "table_name"); strcpy(pSchema[cols].name, "table_name");
...@@ -3724,7 +3725,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro ...@@ -3724,7 +3725,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
strcat(prefix, TS_PATH_DELIMITER); strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = (int32_t)strlen(prefix); int32_t prefixLen = (int32_t)strlen(prefix);
char* pattern = mnodeGetTableShowPattern(pShow); char *pattern = mnodeGetTableShowPattern(pShow);
if (pShow->payloadLen > 0 && pattern == NULL) { if (pShow->payloadLen > 0 && pattern == NULL) {
return 0; return 0;
} }
...@@ -3757,7 +3758,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro ...@@ -3757,7 +3758,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *) pWrite = pTable->createdTime; *(int64_t *)pWrite = pTable->createdTime;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
...@@ -3799,7 +3800,7 @@ static int32_t mnodeCompactSuperTables() { ...@@ -3799,7 +3800,7 @@ static int32_t mnodeCompactSuperTables() {
.rowSize = sizeof(SSTableObj) + schemaSize, .rowSize = sizeof(SSTableObj) + schemaSize,
}; };
//mInfo("compact super %" PRIu64, pTable->uid); // mInfo("compact super %" PRIu64, pTable->uid);
sdbInsertCompactRow(&row); sdbInsertCompactRow(&row);
} }
...@@ -3825,7 +3826,7 @@ static int32_t mnodeCompactChildTables() { ...@@ -3825,7 +3826,7 @@ static int32_t mnodeCompactChildTables() {
.pTable = tsChildTableSdb, .pTable = tsChildTableSdb,
}; };
//mInfo("compact child %" PRIu64 ":%d", pTable->uid, pTable->tid); // mInfo("compact child %" PRIu64 ":%d", pTable->uid, pTable->tid);
sdbInsertCompactRow(&row); sdbInsertCompactRow(&row);
} }
......
...@@ -459,7 +459,9 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi ...@@ -459,7 +459,9 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi
continue; continue;
} }
int32_t sid = taosAllocateId(pVgroup->idPool); int32_t sid = 0;
if (*pSid <= 0) {
sid = taosAllocateId(pVgroup->idPool);
if (sid <= 0) { if (sid <= 0) {
int curMaxId = taosIdPoolMaxSize(pVgroup->idPool); int curMaxId = taosIdPoolMaxSize(pVgroup->idPool);
if ((taosUpdateIdPool(pVgroup->idPool, curMaxId + 1) < 0) || ((sid = taosAllocateId(pVgroup->idPool)) <= 0)) { if ((taosUpdateIdPool(pVgroup->idPool, curMaxId + 1) < 0) || ((sid = taosAllocateId(pVgroup->idPool)) <= 0)) {
...@@ -469,6 +471,11 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi ...@@ -469,6 +471,11 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi
return TSDB_CODE_MND_APP_ERROR; return TSDB_CODE_MND_APP_ERROR;
} }
} }
} else {
}
mDebug("vgId:%d, alloc tid:%d", pVgroup->vgId, sid); mDebug("vgId:%d, alloc tid:%d", pVgroup->vgId, sid);
*pSid = sid; *pSid = sid;
......
...@@ -220,9 +220,9 @@ int tsdbDumpTables(STsdbRepo *pRepo, uint64_t qId) { ...@@ -220,9 +220,9 @@ int tsdbDumpTables(STsdbRepo *pRepo, uint64_t qId) {
if (pMeta->tables[i] != NULL) { if (pMeta->tables[i] != NULL) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
// keep the output format // keep the output format
tsdbInfo("vgId:%d QID:%" PRIu64 " stb:%s %s:%s tid:%d uid:%" PRIu64, REPO_ID(pRepo), qId, tsdbInfo("vgId:%d, type:%d stb:%s suid:%" PRIu64 " %s:%s tid:%d uid:%" PRIu64, REPO_ID(pRepo), pTable->type,
pTable->pSuper ? pTable->pSuper->name->data : "", "msynctbn", pTable->name->data, pTable->tableId.tid, pTable->pSuper ? pTable->pSuper->name->data : "", pTable->suid, "msynctbn", pTable->name->data,
pTable->tableId.uid); pTable->tableId.tid, pTable->tableId.uid);
} }
} }
if (tsdbUnlockRepoMeta(pRepo) < 0) return -1; if (tsdbUnlockRepoMeta(pRepo) < 0) return -1;
......
...@@ -229,6 +229,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Invalid topic option" ...@@ -229,6 +229,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Invalid topic option"
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_PARTITONS, "Invalid topic partitons num, valid range: [1, 1000]") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_PARTITONS, "Invalid topic partitons num, valid range: [1, 1000]")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FORMAT, "Invalid format")
// dnode // dnode
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MSG_NOT_PROCESSED, "Message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_DND_MSG_NOT_PROCESSED, "Message not processed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_OUT_OF_MEMORY, "Dnode out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_DND_OUT_OF_MEMORY, "Dnode out of memory")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册