“0e6e30bad186f399543d9554308035bd53131f10”上不存在“git@gitcode.net:s920243400/PaddleOCR.git”
提交 37896bf1 编写于 作者: K kailixu

chore: code format

上级 49098547
......@@ -14,39 +14,40 @@
*/
#define _DEFAULT_SOURCE
#include "mnodeTable.h"
#include "dnode.h"
#include "os.h"
#include "taosmsg.h"
#include "tutil.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tscompression.h"
#include "tname.h"
#include "tidpool.h"
#include "tglobal.h"
#include "tcompare.h"
#include "tdataformat.h"
#include "tgrant.h"
#include "tqueue.h"
#include "hash.h"
#include "mnode.h"
#include "dnode.h"
#include "mnodeDef.h"
#include "mnodeInt.h"
#include "mnodeAcct.h"
#include "mnodeDb.h"
#include "mnodeDef.h"
#include "mnodeDnode.h"
#include "mnodeFunc.h"
#include "mnodeInt.h"
#include "mnodePeer.h"
#include "mnodeRead.h"
#include "mnodeSdb.h"
#include "mnodeShow.h"
#include "mnodeTable.h"
#include "mnodeUser.h"
#include "mnodeVgroup.h"
#include "mnodeWrite.h"
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tcompare.h"
#include "tdataformat.h"
#include "tglobal.h"
#include "tgrant.h"
#include "tidpool.h"
#include "tname.h"
#include "tqueue.h"
#include "tscompression.h"
#include "tutil.h"
#include "mnodeRead.h"
#include "mnodePeer.h"
#include "mnodeFunc.h"
#define ALTER_CTABLE_RETRY_TIMES 3
#define ALTER_CTABLE_RETRY_TIMES 3
#define CREATE_CTABLE_RETRY_TIMES 10
#define CREATE_CTABLE_RETRY_SEC 14
#define CREATE_CTABLE_RETRY_SEC 14
// informal
#define META_SYNC_TABLE_NAME_LEN 32
......@@ -57,9 +58,9 @@
// informal
int64_t tsCTableRid = -1;
static void *tsChildTableSdb;
static void * tsChildTableSdb;
int64_t tsSTableRid = -1;
static void *tsSuperTableSdb;
static void * tsSuperTableSdb;
static SHashObj *tsSTableUidHash;
static int32_t tsChildTableUpdateSize;
static int32_t tsSuperTableUpdateSize;
......@@ -71,12 +72,12 @@ typedef struct {
uint64_t suid;
} SMetaInfo;
static void *mnodeGetChildTable(char *tableId);
static void *mnodeGetSuperTable(char *tableId);
static void *mnodeGetSuperTableByUid(uint64_t uid);
static void mnodeDropAllChildTablesInStable(SSTableObj *pStable);
static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable);
static void mnodeRemoveTableFromStable(SSTableObj *pStable, SCTableObj *pCtable);
static void * mnodeGetChildTable(char *tableId);
static void * mnodeGetSuperTable(char *tableId);
static void * mnodeGetSuperTableByUid(uint64_t uid);
static void mnodeDropAllChildTablesInStable(SSTableObj *pStable);
static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable);
static void mnodeRemoveTableFromStable(SSTableObj *pStable, SCTableObj *pCtable);
static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
......@@ -122,10 +123,10 @@ static void mnodeDestroyChildTable(SCTableObj *pTable) {
tfree(pTable);
}
static char *mnodeGetTableShowPattern(SShowObj *pShow) {
char *pattern = NULL;
static char* mnodeGetTableShowPattern(SShowObj *pShow) {
char* pattern = NULL;
if (pShow != NULL && pShow->payloadLen > 0) {
pattern = (char *)malloc(pShow->payloadLen + 1);
pattern = (char*)malloc(pShow->payloadLen + 1);
if (pattern == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
......@@ -143,7 +144,7 @@ static int32_t mnodeChildTableActionDestroy(SSdbRow *pRow) {
static int32_t mnodeChildTableActionInsert(SSdbRow *pRow) {
SCTableObj *pTable = pRow->pObj;
int32_t code = 0;
int32_t code = 0;
SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
......@@ -214,8 +215,8 @@ static int32_t mnodeChildTableActionDelete(SSdbRow *pRow) {
return TSDB_CODE_MND_VGROUP_NOT_EXIST;
}
SVgObj *pVgroup = NULL;
SDbObj *pDb = NULL;
SVgObj *pVgroup = NULL;
SDbObj *pDb = NULL;
SAcctObj *pAcct = NULL;
pVgroup = mnodeGetVgroup(pTable->vgId);
......@@ -285,10 +286,10 @@ static int32_t mnodeChildTableActionUpdate(SSdbRow *pRow) {
SCTableObj *pNew = pRow->pObj;
SCTableObj *pTable = mnodeGetChildTable(pNew->info.tableId);
if (pTable != pNew) {
void *oldTableId = pTable->info.tableId;
void *oldSql = pTable->sql;
void *oldSchema = pTable->schema;
void *oldSTable = pTable->superTable;
void *oldTableId = pTable->info.tableId;
void *oldSql = pTable->sql;
void *oldSchema = pTable->schema;
void *oldSTable = pTable->superTable;
int32_t oldRefCount = pTable->refCount;
if (pTable->info.type == TSDB_NORMAL_TABLE) {
......@@ -448,20 +449,21 @@ static int32_t mnodeInitChildTables() {
SCTableObj tObj;
tsChildTableUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type);
SSdbTableDesc desc = {.id = SDB_TABLE_CTABLE,
.name = "ctables",
.hashSessions = TSDB_DEFAULT_CTABLES_HASH_SIZE,
.maxRowSize = sizeof(SCTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) +
TSDB_TABLE_FNAME_LEN + TSDB_CQ_SQL_SIZE,
.refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj),
.keyType = SDB_KEY_VAR_STRING,
.fpInsert = mnodeChildTableActionInsert,
.fpDelete = mnodeChildTableActionDelete,
.fpUpdate = mnodeChildTableActionUpdate,
.fpEncode = mnodeChildTableActionEncode,
.fpDecode = mnodeChildTableActionDecode,
.fpDestroy = mnodeChildTableActionDestroy,
.fpRestored = mnodeChildTableActionRestored};
SSdbTableDesc desc = {
.id = SDB_TABLE_CTABLE,
.name = "ctables",
.hashSessions = TSDB_DEFAULT_CTABLES_HASH_SIZE,
.maxRowSize = sizeof(SCTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_FNAME_LEN + TSDB_CQ_SQL_SIZE,
.refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj),
.keyType = SDB_KEY_VAR_STRING,
.fpInsert = mnodeChildTableActionInsert,
.fpDelete = mnodeChildTableActionDelete,
.fpUpdate = mnodeChildTableActionUpdate,
.fpEncode = mnodeChildTableActionEncode,
.fpDecode = mnodeChildTableActionDecode,
.fpDestroy = mnodeChildTableActionDestroy,
.fpRestored = mnodeChildTableActionRestored
};
tsCTableRid = sdbOpenTable(&desc);
tsChildTableSdb = sdbGetTableByRid(tsCTableRid);
......@@ -479,9 +481,13 @@ static void mnodeCleanupChildTables() {
tsChildTableSdb = NULL;
}
int64_t mnodeGetSuperTableNum() { return sdbGetNumOfRows(tsSuperTableSdb); }
int64_t mnodeGetSuperTableNum() {
return sdbGetNumOfRows(tsSuperTableSdb);
}
int64_t mnodeGetChildTableNum() { return sdbGetNumOfRows(tsChildTableSdb); }
int64_t mnodeGetChildTableNum() {
return sdbGetNumOfRows(tsChildTableSdb);
}
static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable) {
atomic_add_fetch_32(&pStable->numOfTables, 1);
......@@ -494,8 +500,8 @@ static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable) {
if (pStable->vgHash != NULL) {
if (taosHashGet(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)) == NULL) {
taosHashPut(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId));
mDebug("stable:%s, vgId:%d is put into stable vgId hash:%p, sizeOfVgList:%d", pStable->info.tableId,
pCtable->vgId, pStable->vgHash, taosHashGetSize(pStable->vgHash));
mDebug("stable:%s, vgId:%d is put into stable vgId hash:%p, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
pStable->vgHash, taosHashGetSize(pStable->vgHash));
}
}
}
......@@ -532,7 +538,7 @@ static int32_t mnodeSuperTableActionDestroy(SSdbRow *pRow) {
static int32_t mnodeSuperTableActionInsert(SSdbRow *pRow) {
SSTableObj *pStable = pRow->pObj;
SDbObj *pDb = mnodeGetDbByTableName(pStable->info.tableId);
SDbObj * pDb = mnodeGetDbByTableName(pStable->info.tableId);
if (pDb != NULL && pDb->status == TSDB_DB_STATUS_READY) {
mnodeAddSuperTableIntoDb(pDb);
}
......@@ -546,7 +552,7 @@ static int32_t mnodeSuperTableActionInsert(SSdbRow *pRow) {
static int32_t mnodeSuperTableActionDelete(SSdbRow *pRow) {
SSTableObj *pStable = pRow->pObj;
SDbObj *pDb = mnodeGetDbByTableName(pStable->info.tableId);
SDbObj * pDb = mnodeGetDbByTableName(pStable->info.tableId);
if (pDb != NULL) {
mnodeRemoveSuperTableFromDb(pDb);
mnodeDropAllChildTablesInStable((SSTableObj *)pStable);
......@@ -566,9 +572,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) {
mDebug("table:%s, will be updated, hash:%p sizeOfVgList:%d, new hash:%p sizeOfVgList:%d", pTable->info.tableId,
pTable->vgHash, taosHashGetSize(pTable->vgHash), pNew->vgHash, taosHashGetSize(pNew->vgHash));
void *oldTableId = pTable->info.tableId;
void *oldSchema = pTable->schema;
void *oldVgHash = pTable->vgHash;
void *oldTableId = pTable->info.tableId;
void *oldSchema = pTable->schema;
void *oldVgHash = pTable->vgHash;
int32_t oldRefCount = pTable->refCount;
int32_t oldNumOfTables = pTable->numOfTables;
......@@ -618,11 +624,11 @@ static int32_t mnodeSuperTableActionEncode(SSdbRow *pRow) {
static int32_t mnodeSuperTableActionDecode(SSdbRow *pRow) {
assert(pRow->rowData != NULL);
SSTableObj *pStable = (SSTableObj *)calloc(1, sizeof(SSTableObj));
SSTableObj *pStable = (SSTableObj *) calloc(1, sizeof(SSTableObj));
if (pStable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
int32_t len = (int32_t)strlen(pRow->rowData);
if (len >= TSDB_TABLE_FNAME_LEN) {
if (len >= TSDB_TABLE_FNAME_LEN){
free(pStable);
return TSDB_CODE_MND_INVALID_TABLE_ID;
}
......@@ -645,26 +651,29 @@ static int32_t mnodeSuperTableActionDecode(SSdbRow *pRow) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeSuperTableActionRestored() { return 0; }
static int32_t mnodeSuperTableActionRestored() {
return 0;
}
static int32_t mnodeInitSuperTables() {
SSTableObj tObj;
tsSuperTableUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type);
SSdbTableDesc desc = {.id = SDB_TABLE_STABLE,
.name = "stables",
.hashSessions = TSDB_DEFAULT_STABLES_HASH_SIZE,
.maxRowSize = sizeof(SSTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) +
TSDB_TABLE_FNAME_LEN,
.refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj),
.keyType = SDB_KEY_VAR_STRING,
.fpInsert = mnodeSuperTableActionInsert,
.fpDelete = mnodeSuperTableActionDelete,
.fpUpdate = mnodeSuperTableActionUpdate,
.fpEncode = mnodeSuperTableActionEncode,
.fpDecode = mnodeSuperTableActionDecode,
.fpDestroy = mnodeSuperTableActionDestroy,
.fpRestored = mnodeSuperTableActionRestored};
SSdbTableDesc desc = {
.id = SDB_TABLE_STABLE,
.name = "stables",
.hashSessions = TSDB_DEFAULT_STABLES_HASH_SIZE,
.maxRowSize = sizeof(SSTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_FNAME_LEN,
.refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj),
.keyType = SDB_KEY_VAR_STRING,
.fpInsert = mnodeSuperTableActionInsert,
.fpDelete = mnodeSuperTableActionDelete,
.fpUpdate = mnodeSuperTableActionUpdate,
.fpEncode = mnodeSuperTableActionEncode,
.fpDecode = mnodeSuperTableActionDecode,
.fpDestroy = mnodeSuperTableActionDestroy,
.fpRestored = mnodeSuperTableActionRestored
};
tsSTableUidHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
tsSTableRid = sdbOpenTable(&desc);
......@@ -724,9 +733,13 @@ int32_t mnodeInitTables() {
return TSDB_CODE_SUCCESS;
}
static void *mnodeGetChildTable(char *tableId) { return sdbGetRow(tsChildTableSdb, tableId); }
static void *mnodeGetChildTable(char *tableId) {
return sdbGetRow(tsChildTableSdb, tableId);
}
static void *mnodeGetSuperTable(char *tableId) { return sdbGetRow(tsSuperTableSdb, tableId); }
static void *mnodeGetSuperTable(char *tableId) {
return sdbGetRow(tsSuperTableSdb, tableId);
}
static void *mnodeGetSuperTableByUid(uint64_t uid) {
SSTableObj **ppStable = taosHashGet(tsSTableUidHash, &uid, sizeof(int64_t));
......@@ -755,13 +768,17 @@ void *mnodeGetNextChildTable(void *pIter, SCTableObj **pTable) {
return sdbFetchRow(tsChildTableSdb, pIter, (void **)pTable);
}
void mnodeCancelGetNextChildTable(void *pIter) { sdbFreeIter(tsChildTableSdb, pIter); }
void mnodeCancelGetNextChildTable(void *pIter) {
sdbFreeIter(tsChildTableSdb, pIter);
}
void *mnodeGetNextSuperTable(void *pIter, SSTableObj **pTable) {
return sdbFetchRow(tsSuperTableSdb, pIter, (void **)pTable);
}
void mnodeCancelGetNextSuperTable(void *pIter) { sdbFreeIter(tsSuperTableSdb, pIter); }
void mnodeCancelGetNextSuperTable(void *pIter) {
sdbFreeIter(tsSuperTableSdb, pIter);
}
void mnodeIncTableRef(void *p1) {
STableObj *pTable = (STableObj *)p1;
......@@ -789,7 +806,7 @@ void mnodeCleanupTables() {
}
// todo move to name.h, add length of table name
static void mnodeExtractTableName(char *tableId, char *name) {
static void mnodeExtractTableName(char* tableId, char* name) {
int pos = -1;
int num = 0;
for (pos = 0; tableId[pos] != 0; ++pos) {
......@@ -806,7 +823,7 @@ static SMnodeMsg *mnodeCreateSubMsg(SMnodeMsg *pBatchMasterMsg, int32_t contSize
SMnodeMsg *pSubMsg = taosAllocateQitem(sizeof(*pBatchMasterMsg) + contSize);
*pSubMsg = *pBatchMasterMsg;
// pSubMsg->pCont = (char *) pSubMsg + sizeof(SMnodeMsg);
//pSubMsg->pCont = (char *) pSubMsg + sizeof(SMnodeMsg);
pSubMsg->rpcMsg.pCont = pSubMsg->pCont;
pSubMsg->successed = 0;
pSubMsg->expected = 0;
......@@ -837,8 +854,7 @@ static int32_t mnodeValidateCreateTableMsg(SCreateTableMsg *pCreateTable, SMnode
}
if (pMsg->pDb == NULL) {
mError("msg:%p, app:%p table:%s, failed to create, db not selected", pMsg, pMsg->rpcMsg.ahandle,
pCreateTable->tableName);
mError("msg:%p, app:%p table:%s, failed to create, db not selected", pMsg, pMsg->rpcMsg.ahandle, pCreateTable->tableName);
return TSDB_CODE_MND_DB_NOT_SELECTED;
}
......@@ -874,23 +890,22 @@ static int32_t mnodeValidateCreateTableMsg(SCreateTableMsg *pCreateTable, SMnode
}
static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) {
if (pMsg->pBatchMasterMsg == NULL) { // batch master first round
if (pMsg->pBatchMasterMsg == NULL) { // batch master first round
pMsg->pBatchMasterMsg = pMsg;
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
int32_t numOfTables = htonl(pCreate->numOfTables);
int32_t contentLen = htonl(pCreate->contLen);
int32_t numOfTables = htonl(pCreate->numOfTables);
int32_t contentLen = htonl(pCreate->contLen);
pMsg->expected = numOfTables;
int32_t code = TSDB_CODE_SUCCESS;
SCreateTableMsg *pCreateTable = (SCreateTableMsg *)((char *)pCreate + sizeof(SCMCreateTableMsg));
for (SCreateTableMsg *p = pCreateTable; p < (SCreateTableMsg *)((char *)pCreate + contentLen);
p = (SCreateTableMsg *)((char *)p + htonl(p->len))) {
int32_t code = TSDB_CODE_SUCCESS;
SCreateTableMsg *pCreateTable = (SCreateTableMsg*) ((char*) pCreate + sizeof(SCMCreateTableMsg));
for (SCreateTableMsg *p = pCreateTable; p < (SCreateTableMsg *) ((char *) pCreate + contentLen); p = (SCreateTableMsg *) ((char *) p + htonl(p->len))) {
SMnodeMsg *pSubMsg = mnodeCreateSubMsg(pMsg, sizeof(SCMCreateTableMsg) + htonl(p->len));
memcpy(pSubMsg->pCont + sizeof(SCMCreateTableMsg), p, htonl(p->len));
code = mnodeValidateCreateTableMsg(p, pSubMsg);
if (code == TSDB_CODE_SUCCESS || (p->igExists == 1 && code == TSDB_CODE_MND_TABLE_ALREADY_EXIST)) {
if (code == TSDB_CODE_SUCCESS || ( p->igExists == 1 && code == TSDB_CODE_MND_TABLE_ALREADY_EXIST )) {
++pSubMsg->pBatchMasterMsg->successed;
mnodeDestroySubMsg(pSubMsg);
continue;
......@@ -908,10 +923,10 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
} else {
if (pMsg->pBatchMasterMsg != pMsg) { // batch sub replay
if (pMsg->pBatchMasterMsg != pMsg) { // batch sub replay
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
SCreateTableMsg *pCreateTable = (SCreateTableMsg *)((char *)pCreate + sizeof(SCMCreateTableMsg));
int32_t code = mnodeValidateCreateTableMsg(pCreateTable, pMsg);
SCreateTableMsg *pCreateTable = (SCreateTableMsg*) ((char*) pCreate + sizeof(SCMCreateTableMsg));
int32_t code = mnodeValidateCreateTableMsg(pCreateTable, pMsg);
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) {
++pMsg->pBatchMasterMsg->successed;
mnodeDestroySubMsg(pMsg);
......@@ -923,12 +938,13 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) {
mnodeDestroySubMsg(pMsg);
}
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received >= pMsg->pBatchMasterMsg->expected) {
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
>= pMsg->pBatchMasterMsg->expected) {
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, pMsg->pBatchMasterMsg->code);
}
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} else { // batch master replay, reprocess the whole batch
} else { // batch master replay, reprocess the whole batch
assert(0);
return TSDB_CODE_MND_MSG_NOT_PROCESSED;
}
......@@ -949,7 +965,7 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
return mnodeProcessBatchCreateTableMsg(pMsg);
}
SCreateTableMsg *p = (SCreateTableMsg *)((char *)pCreate + sizeof(SCMCreateTableMsg));
SCreateTableMsg *p = (SCreateTableMsg*)((char*) pCreate + sizeof(SCMCreateTableMsg));
if (pMsg->pDb == NULL) {
pMsg->pDb = mnodeGetDbByTableName(p->tableName);
}
......@@ -973,8 +989,7 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
mDebug("msg:%p, app:%p table:%s, is already exist", pMsg, pMsg->rpcMsg.ahandle, p->tableName);
return TSDB_CODE_SUCCESS;
} else {
mError("msg:%p, app:%p table:%s, failed to create, table already exist", pMsg, pMsg->rpcMsg.ahandle,
p->tableName);
mError("msg:%p, app:%p table:%s, failed to create, table already exist", pMsg, pMsg->rpcMsg.ahandle, p->tableName);
return TSDB_CODE_MND_TABLE_ALREADY_EXIST;
}
}
......@@ -1048,8 +1063,8 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) {
STableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
pInfo->createFlag = htons(pInfo->createFlag);
mDebug("msg:%p, app:%p table:%s, table meta msg is received from thandle:%p, createFlag:%d", pMsg,
pMsg->rpcMsg.ahandle, pInfo->tableFname, pMsg->rpcMsg.handle, pInfo->createFlag);
mDebug("msg:%p, app:%p table:%s, table meta msg is received from thandle:%p, createFlag:%d", pMsg, pMsg->rpcMsg.ahandle,
pInfo->tableFname, pMsg->rpcMsg.handle, pInfo->createFlag);
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableName(pInfo->tableFname);
if (pMsg->pDb == NULL) {
......@@ -1089,18 +1104,20 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
if (code == TSDB_CODE_SUCCESS) {
mLInfo("stable:%s, is created in sdb, uid:%" PRIu64, pTable->info.tableId, pTable->uid);
if (pMsg->pBatchMasterMsg) pMsg->pBatchMasterMsg->successed++;
if(pMsg->pBatchMasterMsg)
pMsg->pBatchMasterMsg->successed ++;
} else {
mError("msg:%p, app:%p stable:%s, failed to create in sdb, reason:%s", pMsg, pMsg->rpcMsg.ahandle,
pTable->info.tableId, tstrerror(code));
mError("msg:%p, app:%p stable:%s, failed to create in sdb, reason:%s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId,
tstrerror(code));
SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .pTable = tsSuperTableSdb};
sdbDeleteRow(&desc);
if (pMsg->pBatchMasterMsg) pMsg->pBatchMasterMsg->received++;
if(pMsg->pBatchMasterMsg)
pMsg->pBatchMasterMsg->received ++;
}
monSaveAuditLog(MON_DDL_CMD_CREATE_SUPER_TABLE, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, !code);
// if super table create by batch msg, check done and send finished to client
if (pMsg->pBatchMasterMsg) {
if(pMsg->pBatchMasterMsg) {
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received >= pMsg->pBatchMasterMsg->expected)
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, code);
}
......@@ -1109,8 +1126,8 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
}
static uint64_t mnodeCreateSuperTableUid() {
int64_t us = taosGetTimestampUs();
uint64_t x = (us & ((((uint64_t)1) << 40) - 1));
int64_t us = taosGetTimestampUs();
uint64_t x = (us & ((((uint64_t)1)<<40) - 1));
x = x << 24;
return x + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
......@@ -1118,7 +1135,7 @@ static uint64_t mnodeCreateSuperTableUid() {
static uint64_t mnodeCreateTableUid(int32_t vgId, int32_t tid) {
uint64_t uid = (((uint64_t)vgId) << 48) + ((((uint64_t)tid) & ((1ul << 24) - 1ul)) << 24) +
((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
return uid;
}
......@@ -1130,7 +1147,7 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_INVALID_CREATE_TABLE_MSG;
}
SCreateTableMsg *pCreate = (SCreateTableMsg *)((char *)pCreate1 + sizeof(SCMCreateTableMsg));
SCreateTableMsg* pCreate = (SCreateTableMsg*)((char*)pCreate1 + sizeof(SCMCreateTableMsg));
int16_t numOfTags = htons(pCreate->numOfTags);
if (numOfTags > TSDB_MAX_TAGS) {
......@@ -1141,34 +1158,31 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
int16_t numOfColumns = htons(pCreate->numOfColumns);
int32_t numOfCols = numOfColumns + numOfTags;
if (numOfCols > TSDB_MAX_COLUMNS) {
mError("msg:%p, app:%p table:%s, failed to create, too many columns", pMsg, pMsg->rpcMsg.ahandle,
pCreate->tableName);
mError("msg:%p, app:%p table:%s, failed to create, too many columns", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName);
return TSDB_CODE_MND_TOO_MANY_COLUMNS;
}
SSTableObj *pStable = calloc(1, sizeof(SSTableObj));
if (pStable == NULL) {
mError("msg:%p, app:%p table:%s, failed to create, no enough memory", pMsg, pMsg->rpcMsg.ahandle,
pCreate->tableName);
mError("msg:%p, app:%p table:%s, failed to create, no enough memory", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName);
return TSDB_CODE_MND_OUT_OF_MEMORY;
}
pStable->info.tableId = strdup(pCreate->tableName);
pStable->info.type = TSDB_SUPER_TABLE;
pStable->createdTime = taosGetTimestampMs();
pStable->uid = mnodeCreateSuperTableUid();
pStable->sversion = 0;
pStable->tversion = 0;
pStable->info.type = TSDB_SUPER_TABLE;
pStable->createdTime = taosGetTimestampMs();
pStable->uid = mnodeCreateSuperTableUid();
pStable->sversion = 0;
pStable->tversion = 0;
pStable->numOfColumns = numOfColumns;
pStable->numOfTags = numOfTags;
pStable->numOfTags = numOfTags;
int32_t schemaSize = numOfCols * sizeof(SSchema);
pStable->schema = (SSchema *)calloc(1, schemaSize);
if (pStable->schema == NULL) {
tfree(pStable->info.tableId);
tfree(pStable);
mError("msg:%p, app:%p table:%s, failed to create, no schema input", pMsg, pMsg->rpcMsg.ahandle,
pCreate->tableName);
mError("msg:%p, app:%p table:%s, failed to create, no schema input", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName);
return TSDB_CODE_MND_INVALID_TABLE_NAME;
}
......@@ -1183,8 +1197,7 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
}
if (!tIsValidSchema(pStable->schema, pStable->numOfColumns, pStable->numOfTags)) {
mError("msg:%p, app:%p table:%s, failed to create table, invalid schema", pMsg, pMsg->rpcMsg.ahandle,
pCreate->tableName);
mError("msg:%p, app:%p table:%s, failed to create table, invalid schema", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName);
tfree(pStable->info.tableId);
tfree(pStable->schema);
tfree(pStable);
......@@ -1194,12 +1207,14 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
pMsg->pTable = (STableObj *)pStable;
mnodeIncTableRef(pMsg->pTable);
SSdbRow row = {.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.rowSize = sizeof(SSTableObj) + schemaSize,
.pMsg = pMsg,
.fpRsp = mnodeCreateSuperTableCb};
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.rowSize = sizeof(SSTableObj) + schemaSize,
.pMsg = pMsg,
.fpRsp = mnodeCreateSuperTableCb
};
int32_t code = sdbInsertRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
......@@ -1260,11 +1275,13 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
mInfo("msg:%p, app:%p stable:%s will be dropped, hash:%p sizeOfVgList:%d", pMsg, pMsg->rpcMsg.ahandle,
pStable->info.tableId, pStable->vgHash, taosHashGetSize(pStable->vgHash));
SSdbRow row = {.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.fpRsp = mnodeDropSuperTableCb};
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.fpRsp = mnodeDropSuperTableCb
};
int32_t code = sdbDeleteRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
......@@ -1276,7 +1293,7 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
}
static int32_t mnodeFindSuperTableTagIndex(SSTableObj *pStable, const char *tagName) {
SSchema *schema = (SSchema *)pStable->schema;
SSchema *schema = (SSchema *) pStable->schema;
for (int32_t tag = 0; tag < pStable->numOfTags; tag++) {
if (strcmp(schema[pStable->numOfColumns + tag].name, tagName) == 0) {
return tag;
......@@ -1289,7 +1306,7 @@ static int32_t mnodeFindSuperTableTagIndex(SSTableObj *pStable, const char *tagN
static int32_t mnodeAddSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) {
SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
mLInfo("msg:%p, app:%p stable %s, add tag result:%s, numOfTags:%d", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
tstrerror(code), pStable->numOfTags);
tstrerror(code), pStable->numOfTags);
if (code == TSDB_CODE_SUCCESS) {
code = mnodeGetSuperTableMeta(pMsg);
}
......@@ -1333,13 +1350,15 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t
pStable->tversion++;
mInfo("msg:%p, app:%p stable %s, start to add tag %s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
schema[0].name);
schema[0].name);
SSdbRow row = {.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.fpRsp = mnodeAddSuperTableTagCb};
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.fpRsp = mnodeAddSuperTableTagCb
};
return sdbUpdateRow(&row);
}
......@@ -1347,7 +1366,7 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t
static int32_t mnodeDropSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) {
SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
mLInfo("msg:%p, app:%p stable %s, drop tag result:%s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
tstrerror(code));
tstrerror(code));
if (code == TSDB_CODE_SUCCESS) {
code = mnodeGetSuperTableMeta(pMsg);
}
......@@ -1358,7 +1377,7 @@ static int32_t mnodeDropSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) {
static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) {
SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
int32_t col = mnodeFindSuperTableTagIndex(pStable, tagName);
int32_t col = mnodeFindSuperTableTagIndex(pStable, tagName);
if (col < 0) {
mError("msg:%p, app:%p stable:%s, drop tag, tag:%s not exist", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
tagName);
......@@ -1372,11 +1391,13 @@ static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) {
mInfo("msg:%p, app:%p stable %s, start to drop tag %s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId, tagName);
SSdbRow row = {.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.fpRsp = mnodeDropSuperTableTagCb};
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.fpRsp = mnodeDropSuperTableTagCb
};
return sdbUpdateRow(&row);
}
......@@ -1395,7 +1416,7 @@ static int32_t mnodeModifySuperTableTagNameCb(SMnodeMsg *pMsg, int32_t code) {
static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, char *newTagName) {
SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
int32_t col = mnodeFindSuperTableTagIndex(pStable, oldTagName);
int32_t col = mnodeFindSuperTableTagIndex(pStable, oldTagName);
if (col < 0) {
mError("msg:%p, app:%p stable:%s, failed to modify table tag, oldName: %s, newName: %s", pMsg, pMsg->rpcMsg.ahandle,
pStable->info.tableId, oldTagName, newTagName);
......@@ -1413,23 +1434,25 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c
}
// update
SSchema *schema = (SSchema *)(pStable->schema + pStable->numOfColumns + col);
SSchema *schema = (SSchema *) (pStable->schema + pStable->numOfColumns + col);
tstrncpy(schema->name, newTagName, sizeof(schema->name));
mInfo("msg:%p, app:%p stable %s, start to modify tag %s to %s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
oldTagName, newTagName);
oldTagName, newTagName);
SSdbRow row = {.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.fpRsp = mnodeModifySuperTableTagNameCb};
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.fpRsp = mnodeModifySuperTableTagNameCb
};
return sdbUpdateRow(&row);
}
static int32_t mnodeFindSuperTableColumnIndex(SSTableObj *pStable, char *colName) {
SSchema *schema = (SSchema *)pStable->schema;
SSchema *schema = (SSchema *) pStable->schema;
for (int32_t col = 0; col < pStable->numOfColumns; col++) {
if (strcmp(schema[col].name, colName) == 0) {
return col;
......@@ -1442,7 +1465,7 @@ static int32_t mnodeFindSuperTableColumnIndex(SSTableObj *pStable, char *colName
static int32_t mnodeAddSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
mLInfo("msg:%p, app:%p stable %s, add column result:%s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
tstrerror(code));
tstrerror(code));
if (code == TSDB_CODE_SUCCESS) {
code = mnodeGetSuperTableMeta(pMsg);
}
......@@ -1452,11 +1475,10 @@ static int32_t mnodeAddSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
}
static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32_t ncols) {
SDbObj *pDb = pMsg->pDb;
SDbObj *pDb = pMsg->pDb;
SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
if (ncols <= 0) {
mError("msg:%p, app:%p stable:%s, add column, ncols:%d <= 0", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
ncols);
mError("msg:%p, app:%p stable:%s, add column, ncols:%d <= 0", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId, ncols);
return TSDB_CODE_MND_APP_ERROR;
}
......@@ -1486,7 +1508,7 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32
sizeof(SSchema) * pStable->numOfTags);
memcpy(pStable->schema + pStable->numOfColumns, schema, sizeof(SSchema) * ncols);
SSchema *tschema = (SSchema *)(pStable->schema + pStable->numOfColumns);
SSchema *tschema = (SSchema *) (pStable->schema + pStable->numOfColumns);
for (int32_t i = 0; i < ncols; i++) {
tschema[i].colId = pStable->nextColId++;
}
......@@ -1503,13 +1525,15 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32
mInfo("msg:%p, app:%p stable %s, start to add column", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId);
SSdbRow row = {.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.fpRsp = mnodeAddSuperTableColumnCb};
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.fpRsp = mnodeAddSuperTableColumnCb
};
return sdbUpdateRow(&row);
return sdbUpdateRow(&row);
}
static int32_t mnodeDropSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
......@@ -1525,12 +1549,12 @@ static int32_t mnodeDropSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
}
static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) {
SDbObj *pDb = pMsg->pDb;
SDbObj *pDb = pMsg->pDb;
SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
int32_t col = mnodeFindSuperTableColumnIndex(pStable, colName);
int32_t col = mnodeFindSuperTableColumnIndex(pStable, colName);
if (col <= 0) {
mError("msg:%p, app:%p stable:%s, drop column, column:%s not exist", pMsg, pMsg->rpcMsg.ahandle,
pStable->info.tableId, colName);
mError("msg:%p, app:%p stable:%s, drop column, column:%s not exist", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
colName);
return TSDB_CODE_MND_FIELD_NOT_EXIST;
}
......@@ -1552,11 +1576,13 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) {
mInfo("msg:%p, app:%p stable %s, start to delete column", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId);
SSdbRow row = {.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.fpRsp = mnodeDropSuperTableColumnCb};
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.fpRsp = mnodeDropSuperTableColumnCb
};
return sdbUpdateRow(&row);
}
......@@ -1575,11 +1601,12 @@ static int32_t mnodeChangeSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg) {
SAlterTableMsg *pAlter = pMsg->rpcMsg.pCont;
char *name = pAlter->schema[0].name;
SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
int32_t col = mnodeFindSuperTableColumnIndex(pStable, name);
char* name = pAlter->schema[0].name;
SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
int32_t col = mnodeFindSuperTableColumnIndex(pStable, name);
if (col < 0) {
mError("msg:%p, app:%p stable:%s, change column, name:%s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId, name);
mError("msg:%p, app:%p stable:%s, change column, name:%s", pMsg, pMsg->rpcMsg.ahandle,
pStable->info.tableId, name);
return TSDB_CODE_MND_FIELD_NOT_EXIST;
}
......@@ -1600,7 +1627,7 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg) {
}
// update
SSchema *schema = (SSchema *)(pStable->schema + col);
SSchema *schema = (SSchema *) (pStable->schema + col);
ASSERT(schema->type == TSDB_DATA_TYPE_BINARY || schema->type == TSDB_DATA_TYPE_NCHAR);
if (pAlter->schema[0].bytes <= schema->bytes) {
......@@ -1611,30 +1638,33 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg) {
schema->bytes = pAlter->schema[0].bytes;
pStable->sversion++;
mInfo("msg:%p, app:%p stable %s, start to modify column %s len to %d", pMsg, pMsg->rpcMsg.ahandle,
pStable->info.tableId, name, schema->bytes);
mInfo("msg:%p, app:%p stable %s, start to modify column %s len to %d", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
name, schema->bytes);
SSdbRow row = {.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.fpRsp = mnodeChangeSuperTableColumnCb};
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.fpRsp = mnodeChangeSuperTableColumnCb
};
return sdbUpdateRow(&row);
}
static int32_t mnodeChangeSuperTableTag(SMnodeMsg *pMsg) {
SAlterTableMsg *pAlter = pMsg->rpcMsg.pCont;
char *name = pAlter->schema[0].name;
SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
int32_t col = mnodeFindSuperTableTagIndex(pStable, name);
char* name = pAlter->schema[0].name;
SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
int32_t col = mnodeFindSuperTableTagIndex(pStable, name);
if (col < 0) {
mError("msg:%p, app:%p stable:%s, change column, name:%s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId, name);
mError("msg:%p, app:%p stable:%s, change column, name:%s", pMsg, pMsg->rpcMsg.ahandle,
pStable->info.tableId, name);
return TSDB_CODE_MND_FIELD_NOT_EXIST;
}
// update
SSchema *schema = (SSchema *)(pStable->schema + col + pStable->numOfColumns);
SSchema *schema = (SSchema *) (pStable->schema + col + pStable->numOfColumns);
ASSERT(schema->type == TSDB_DATA_TYPE_BINARY || schema->type == TSDB_DATA_TYPE_NCHAR);
if (pAlter->schema[0].bytes <= schema->bytes) {
mError("msg:%p, app:%p stable:%s, modify tag len. tag:%s, len from %d to %d", pMsg, pMsg->rpcMsg.ahandle,
......@@ -1645,13 +1675,15 @@ static int32_t mnodeChangeSuperTableTag(SMnodeMsg *pMsg) {
schema->bytes = pAlter->schema[0].bytes;
pStable->tversion++;
mInfo("msg:%p, app:%p stable %s, start to modify tag len %s to %d", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
name, schema->bytes);
name, schema->bytes);
SSdbRow row = {.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.fpRsp = mnodeChangeSuperTableColumnCb};
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.fpRsp = mnodeChangeSuperTableColumnCb
};
return sdbUpdateRow(&row);
}
......@@ -1667,10 +1699,10 @@ static int32_t mnodeGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow,
return TSDB_CODE_MND_DB_IN_DROPPING;
}
int32_t cols = 0;
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
SSchema *tbnameSchema = tGetTbnameColumnSchema();
SSchema* tbnameSchema = tGetTbnameColumnSchema();
pShow->bytes[cols] = tbnameSchema->bytes;
pSchema[cols].type = tbnameSchema->type;
strcpy(pSchema[cols].name, "name");
......@@ -1716,12 +1748,12 @@ static int32_t mnodeGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow,
// retrieve super tables
int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
char *pWrite;
int32_t cols = 0;
int32_t numOfRows = 0;
char * pWrite;
int32_t cols = 0;
SSTableObj *pTable = NULL;
char prefix[64] = {0};
int32_t prefixLen;
char prefix[64] = {0};
int32_t prefixLen;
SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return 0;
......@@ -1737,9 +1769,9 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
prefixLen = (int32_t)strlen(prefix);
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char stableName[TSDB_TABLE_NAME_LEN] = {0};
char stableName[TSDB_TABLE_NAME_LEN] = {0};
char *pattern = mnodeGetTableShowPattern(pShow);
char* pattern = mnodeGetTableShowPattern(pShow);
if (pShow->payloadLen > 0 && pattern == NULL) {
return 0;
}
......@@ -1755,8 +1787,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
memset(stableName, 0, tListLen(stableName));
mnodeExtractTableName(pTable->info.tableId, stableName);
if (pShow->payloadLen > 0 &&
patternMatch(pattern, stableName, sizeof(stableName) - 1, &info) != TSDB_PATTERN_MATCH) {
if (pShow->payloadLen > 0 && patternMatch(pattern, stableName, sizeof(stableName) - 1, &info) != TSDB_PATTERN_MATCH) {
mnodeDecTableRef(pTable);
continue;
}
......@@ -1766,8 +1797,8 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
int16_t len = (int16_t)strnlen(stableName, TSDB_TABLE_NAME_LEN - 1);
*(int16_t *)pWrite = len;
pWrite += sizeof(int16_t); // todo refactor
*(int16_t*) pWrite = len;
pWrite += sizeof(int16_t); // todo refactor
strncpy(pWrite, stableName, len);
cols++;
......@@ -1805,8 +1836,8 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
}
void mnodeDropAllSuperTables(SDbObj *pDropDb) {
void *pIter = NULL;
int32_t numOfTables = 0;
void * pIter= NULL;
int32_t numOfTables = 0;
SSTableObj *pTable = NULL;
char prefix[64] = {0};
......@@ -1822,12 +1853,12 @@ void mnodeDropAllSuperTables(SDbObj *pDropDb) {
if (strncmp(prefix, pTable->info.tableId, prefixLen) == 0) {
SSdbRow row = {
.type = SDB_OPER_LOCAL,
.pTable = tsSuperTableSdb,
.pObj = pTable,
.type = SDB_OPER_LOCAL,
.pTable = tsSuperTableSdb,
.pObj = pTable,
};
sdbDeleteRow(&row);
numOfTables++;
numOfTables ++;
}
mnodeDecTableRef(pTable);
......@@ -1842,7 +1873,7 @@ static int32_t mnodeSetSchemaFromSuperTable(SSchema *pSchema, SSTableObj *pTable
for (int32_t i = 0; i < numOfCols; ++i) {
tstrncpy(pSchema->name, pTable->schema[i].name, sizeof(pSchema->name));
pSchema->type = pTable->schema[i].type;
pSchema->type = pTable->schema[i].type;
pSchema->bytes = htons(pTable->schema[i].bytes);
pSchema->colId = htons(pTable->schema[i].colId);
pSchema++;
......@@ -1851,27 +1882,26 @@ static int32_t mnodeSetSchemaFromSuperTable(SSchema *pSchema, SSTableObj *pTable
return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema);
}
static int32_t mnodeDoGetSuperTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
static int32_t mnodeDoGetSuperTableMeta(SMnodeMsg *pMsg, STableMetaMsg* pMeta) {
SSTableObj *pTable = (SSTableObj *)pMsg->pTable;
pMeta->uid = htobe64(pTable->uid);
pMeta->sversion = htons(pTable->sversion);
pMeta->tversion = htons(pTable->tversion);
pMeta->precision = pMsg->pDb->cfg.precision;
pMeta->update = pMsg->pDb->cfg.update;
pMeta->numOfTags = (uint8_t)pTable->numOfTags;
pMeta->uid = htobe64(pTable->uid);
pMeta->sversion = htons(pTable->sversion);
pMeta->tversion = htons(pTable->tversion);
pMeta->precision = pMsg->pDb->cfg.precision;
pMeta->update = pMsg->pDb->cfg.update;
pMeta->numOfTags = (uint8_t)pTable->numOfTags;
pMeta->numOfColumns = htons((int16_t)pTable->numOfColumns);
pMeta->tableType = pTable->info.type;
pMeta->contLen = sizeof(STableMetaMsg) + mnodeSetSchemaFromSuperTable(pMeta->schema, pTable);
pMeta->tableType = pTable->info.type;
pMeta->contLen = sizeof(STableMetaMsg) + mnodeSetSchemaFromSuperTable(pMeta->schema, pTable);
tstrncpy(pMeta->tableFname, pTable->info.tableId, sizeof(pMeta->tableFname));
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) {
SSTableObj *pTable = (SSTableObj *)pMsg->pTable;
STableMetaMsg *pMeta =
rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16));
SSTableObj *pTable = (SSTableObj *)pMsg->pTable;
STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16));
if (pMeta == NULL) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
}
......@@ -1888,9 +1918,9 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) {
return TSDB_CODE_SUCCESS;
}
static int32_t doGetVgroupInfoLength(char *name) {
static int32_t doGetVgroupInfoLength(char* name) {
SSTableObj *pTable = mnodeGetSuperTable(name);
int32_t len = 0;
int32_t len = 0;
if (pTable != NULL && pTable->vgHash != NULL) {
len = (taosHashGetSize(pTable->vgHash) * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg));
}
......@@ -1899,7 +1929,7 @@ static int32_t doGetVgroupInfoLength(char *name) {
return len;
}
static int32_t getVgroupInfoLength(SSTableVgroupMsg *pInfo, int32_t numOfTable) {
static int32_t getVgroupInfoLength(SSTableVgroupMsg* pInfo, int32_t numOfTable) {
int32_t contLen = sizeof(SSTableVgroupRspMsg) + 32 * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg);
for (int32_t i = 0; i < numOfTable; ++i) {
char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i;
......@@ -1909,7 +1939,7 @@ static int32_t getVgroupInfoLength(SSTableVgroupMsg *pInfo, int32_t numOfTable)
return contLen;
}
static char *serializeVgroupInfo(SSTableObj *pTable, char *name, char *msg, SMnodeMsg *pMsgBody, void *handle) {
static char* serializeVgroupInfo(SSTableObj *pTable, char* name, char* msg, SMnodeMsg* pMsgBody, void* handle) {
strncpy(msg, name, TSDB_TABLE_FNAME_LEN);
msg += TSDB_TABLE_FNAME_LEN;
......@@ -1924,8 +1954,8 @@ static char *serializeVgroupInfo(SSTableObj *pTable, char *name, char *msg, SMno
msg += sizeof(SVgroupsMsg);
} else {
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
mDebug("msg:%p, app:%p stable:%s, hash:%p sizeOfVgList:%d will be returned", pMsgBody, handle, pTable->info.tableId,
pTable->vgHash, taosHashGetSize(pTable->vgHash));
mDebug("msg:%p, app:%p stable:%s, hash:%p sizeOfVgList:%d will be returned", pMsgBody, handle,
pTable->info.tableId, pTable->vgHash, taosHashGetSize(pTable->vgHash));
int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL);
int32_t vgSize = 0;
......@@ -1967,10 +1997,10 @@ static char *serializeVgroupInfo(SSTableObj *pTable, char *name, char *msg, SMno
static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
SSTableVgroupMsg *pInfo = pMsg->rpcMsg.pCont;
int32_t numOfTable = htonl(pInfo->numOfTables);
int32_t numOfTable = htonl(pInfo->numOfTables);
// calculate the required space.
int32_t contLen = getVgroupInfoLength(pInfo, numOfTable);
int32_t contLen = getVgroupInfoLength(pInfo, numOfTable);
SSTableVgroupRspMsg *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
......@@ -1984,8 +2014,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
SSTableObj *pTable = mnodeGetSuperTable(stableName);
if (pTable == NULL) {
mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle,
stableName);
mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, stableName);
mnodeDecTableRef(pTable);
continue;
}
......@@ -2011,9 +2040,9 @@ static void mnodeProcessDropSuperTableRsp(SRpcMsg *rpcMsg) {
}
static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pCreateMsg, SCTableObj *pTable) {
SCreateTableMsg *pMsg = (SCreateTableMsg *)((char *)pCreateMsg + sizeof(SCMCreateTableMsg));
SCreateTableMsg* pMsg = (SCreateTableMsg*) ((char*)pCreateMsg + sizeof(SCMCreateTableMsg));
char *tagData = NULL;
char* tagData = NULL;
int32_t tagDataLen = 0;
int32_t totalCols = 0;
......@@ -2022,10 +2051,10 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pCreateMsg, SCTabl
totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags;
contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen;
if (pMsg != NULL) {
int32_t nameLen = htonl(*(int32_t *)pMsg->schema);
char *p = pMsg->schema + nameLen + sizeof(int32_t);
int32_t nameLen = htonl(*(int32_t*)pMsg->schema);
char* p = pMsg->schema + nameLen + sizeof(int32_t);
tagDataLen = htonl(*(int32_t *)p);
tagDataLen = htonl(*(int32_t*) p);
contLen += tagDataLen;
tagData = p + sizeof(int32_t);
......@@ -2042,32 +2071,32 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pCreateMsg, SCTabl
}
mnodeExtractTableName(pTable->info.tableId, pCreate->tableFname);
pCreate->contLen = htonl(contLen);
pCreate->vgId = htonl(pTable->vgId);
pCreate->tableType = pTable->info.type;
pCreate->createdTime = htobe64(pTable->createdTime);
pCreate->tid = htonl(pTable->tid);
pCreate->sqlDataLen = htonl(pTable->sqlLen);
pCreate->uid = htobe64(pTable->uid);
pCreate->contLen = htonl(contLen);
pCreate->vgId = htonl(pTable->vgId);
pCreate->tableType = pTable->info.type;
pCreate->createdTime = htobe64(pTable->createdTime);
pCreate->tid = htonl(pTable->tid);
pCreate->sqlDataLen = htonl(pTable->sqlLen);
pCreate->uid = htobe64(pTable->uid);
if (pTable->info.type == TSDB_CHILD_TABLE) {
mnodeExtractTableName(pTable->superTable->info.tableId, pCreate->stableFname);
pCreate->numOfColumns = htons(pTable->superTable->numOfColumns);
pCreate->numOfTags = htons(pTable->superTable->numOfTags);
pCreate->sversion = htonl(pTable->superTable->sversion);
pCreate->tversion = htonl(pTable->superTable->tversion);
pCreate->tagDataLen = htonl(tagDataLen);
pCreate->numOfColumns = htons(pTable->superTable->numOfColumns);
pCreate->numOfTags = htons(pTable->superTable->numOfTags);
pCreate->sversion = htonl(pTable->superTable->sversion);
pCreate->tversion = htonl(pTable->superTable->tversion);
pCreate->tagDataLen = htonl(tagDataLen);
pCreate->superTableUid = htobe64(pTable->superTable->uid);
} else {
pCreate->numOfColumns = htons(pTable->numOfColumns);
pCreate->numOfTags = 0;
pCreate->sversion = htonl(pTable->sversion);
pCreate->tversion = 0;
pCreate->tagDataLen = 0;
pCreate->numOfColumns = htons(pTable->numOfColumns);
pCreate->numOfTags = 0;
pCreate->sversion = htonl(pTable->sversion);
pCreate->tversion = 0;
pCreate->tagDataLen = 0;
pCreate->superTableUid = 0;
}
SSchema *pSchema = (SSchema *)pCreate->data;
SSchema *pSchema = (SSchema *) pCreate->data;
if (pTable->info.type == TSDB_CHILD_TABLE) {
memcpy(pSchema, pTable->superTable->schema, totalCols * sizeof(SSchema));
} else {
......@@ -2104,11 +2133,13 @@ static int32_t mnodeDoCreateChildTableFp(SMnodeMsg *pMsg) {
}
SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup);
SRpcMsg rpcMsg = {.ahandle = pMsg,
.pCont = pMDCreate,
.contLen = htonl(pMDCreate->contLen),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE};
SRpcMsg rpcMsg = {
.ahandle = pMsg,
.pCont = pMDCreate,
.contLen = htonl(pMDCreate->contLen),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
};
dnodeSendMsgToDnode(&epSet, &rpcMsg);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
......@@ -2117,7 +2148,7 @@ static int32_t mnodeDoCreateChildTableFp(SMnodeMsg *pMsg) {
static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
SCTableObj *pTable = (SCTableObj *)pMsg->pTable;
SCreateTableMsg *pCreate = (SCreateTableMsg *)((char *)pMsg->rpcMsg.pCont + sizeof(SCMCreateTableMsg));
SCreateTableMsg *pCreate = (SCreateTableMsg*) ((char*)pMsg->rpcMsg.pCont + sizeof(SCMCreateTableMsg));
assert(pTable);
monSaveAuditLog((pTable->info.type == TSDB_CHILD_TABLE) ? MON_DDL_CMD_CREATE_CHILD_TABLE : MON_DDL_CMD_CREATE_TABLE,
......@@ -2175,7 +2206,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, SMetaInfo *pInf) {
SVgObj *pVgroup = pMsg->pVgroup;
SCMCreateTableMsg *p1 = pMsg->rpcMsg.pCont;
SCreateTableMsg *pCreate = (SCreateTableMsg *)((char *)p1 + sizeof(SCMCreateTableMsg));
SCreateTableMsg *pCreate = (SCreateTableMsg*)((char*)p1 + sizeof(SCMCreateTableMsg));
SCTableObj *pTable = calloc(1, sizeof(SCTableObj));
if (pTable == NULL) {
......@@ -2183,20 +2214,20 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, SMetaInfo *pInf) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
}
pTable->info.type = (pCreate->numOfColumns == 0) ? TSDB_CHILD_TABLE : TSDB_NORMAL_TABLE;
pTable->info.type = (pCreate->numOfColumns == 0)? TSDB_CHILD_TABLE:TSDB_NORMAL_TABLE;
pTable->info.tableId = strdup(pCreate->tableName);
pTable->createdTime = taosGetTimestampMs();
pTable->tid = pInf->tid;
pTable->vgId = pVgroup->vgId;
pTable->createdTime = taosGetTimestampMs();
pTable->tid = pInf->tid;
pTable->vgId = pVgroup->vgId;
if (pTable->info.type == TSDB_CHILD_TABLE) {
int32_t nameLen = htonl(*(int32_t *)pCreate->schema);
char *name = (char *)pCreate->schema + sizeof(int32_t);
int32_t nameLen = htonl(*(int32_t*) pCreate->schema);
char* name = (char*)pCreate->schema + sizeof(int32_t);
char stableName[TSDB_TABLE_FNAME_LEN] = {0};
memcpy(stableName, name, nameLen);
char prefix[64] = {0};
char prefix[64] = {0};
size_t prefixLen = tableIdPrefix(pMsg->pDb->name, prefix, 64);
if (0 != strncasecmp(prefix, stableName, prefixLen)) {
mError("msg:%p, app:%p table:%s, corresponding super table:%s not in this db", pMsg, pMsg->rpcMsg.ahandle,
......@@ -2231,13 +2262,13 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, SMetaInfo *pInf) {
}
}
pTable->sversion = 0;
pTable->sversion = 0;
pTable->numOfColumns = htons(pCreate->numOfColumns);
pTable->sqlLen = htons(pCreate->sqlLen);
pTable->sqlLen = htons(pCreate->sqlLen);
int32_t numOfCols = pTable->numOfColumns;
int32_t numOfCols = pTable->numOfColumns;
int32_t schemaSize = numOfCols * sizeof(SSchema);
pTable->schema = (SSchema *)calloc(1, schemaSize);
pTable->schema = (SSchema *) calloc(1, schemaSize);
if (pTable->schema == NULL) {
free(pTable);
return TSDB_CODE_MND_OUT_OF_MEMORY;
......@@ -2246,7 +2277,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, SMetaInfo *pInf) {
pTable->nextColId = 0;
for (int32_t col = 0; col < numOfCols; col++) {
SSchema *tschema = pTable->schema;
SSchema *tschema = pTable->schema;
tschema[col].colId = pTable->nextColId++;
tschema[col].bytes = htons(tschema[col].bytes);
}
......@@ -2258,7 +2289,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, SMetaInfo *pInf) {
free(pTable);
return TSDB_CODE_MND_OUT_OF_MEMORY;
}
memcpy(pTable->sql, (char *)(pCreate->schema) + numOfCols * sizeof(SSchema), pTable->sqlLen);
memcpy(pTable->sql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pTable->sqlLen);
pTable->sql[pTable->sqlLen - 1] = 0;
mDebug("msg:%p, app:%p table:%s, stream sql len:%d sql:%s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId,
pTable->sqlLen, pTable->sql);
......@@ -2268,11 +2299,13 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, SMetaInfo *pInf) {
pMsg->pTable = (STableObj *)pTable;
mnodeIncTableRef(pMsg->pTable);
SSdbRow desc = {.type = SDB_OPER_GLOBAL,
.pObj = pTable,
.pTable = tsChildTableSdb,
.pMsg = pMsg,
.fpReq = mnodeDoCreateChildTableFp};
SSdbRow desc = {
.type = SDB_OPER_GLOBAL,
.pObj = pTable,
.pTable = tsChildTableSdb,
.pMsg = pMsg,
.fpReq = mnodeDoCreateChildTableFp
};
int32_t code = sdbInsertRow(&desc);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
......@@ -2281,7 +2314,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, SMetaInfo *pInf) {
mError("msg:%p, app:%p table:%s, failed to create, reason:%s", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName,
tstrerror(code));
} else {
mInfo("msg:%p, app:%p table:%s, allocated in vgroup, vgId:%d sid:%d uid:%" PRIu64, pMsg, pMsg->rpcMsg.ahandle,
mDebug("msg:%p, app:%p table:%s, allocated in vgroup, vgId:%d sid:%d uid:%" PRIu64, pMsg, pMsg->rpcMsg.ahandle,
pTable->info.tableId, pVgroup->vgId, pTable->tid, pTable->uid);
}
......@@ -2350,8 +2383,8 @@ static int32_t mnodeProcessMetaSyncCreateChildTableMsg(SMnodeMsg *pMsg, SMetaInf
}
static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
// SCMCreateTableMsg* p1 = pMsg->rpcMsg.pCont; // there are several tables here.
SCreateTableMsg *pCreate = (SCreateTableMsg *)((char *)pMsg->rpcMsg.pCont + sizeof(SCMCreateTableMsg));
//SCMCreateTableMsg* p1 = pMsg->rpcMsg.pCont; // there are several tables here.
SCreateTableMsg* pCreate = (SCreateTableMsg*)((char *)pMsg->rpcMsg.pCont + sizeof(SCMCreateTableMsg));
int32_t code = grantCheck(TSDB_GRANT_TIMESERIES);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2362,7 +2395,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
if (pMsg->retry == 0) {
if (pMsg->pTable == NULL) {
SVgObj *pVgroup = NULL;
SVgObj *pVgroup = NULL;
SMetaInfo metaInfo = {.tid = 0, .vgId = 0, .uid = -1, .suid = -1};
if (tsMetaSyncOption) {
......@@ -2372,7 +2405,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
code = mnodeGetAvailableVgroup(pMsg, &pVgroup, &metaInfo.tid, metaInfo.vgId);
if (code != TSDB_CODE_SUCCESS) {
mInfo("msg:%p, app:%p table:%s, failed to get available vgroup, reason:%s", pMsg, pMsg->rpcMsg.ahandle,
pCreate->tableName, tstrerror(code));
pCreate->tableName, tstrerror(code));
return code;
}
......@@ -2390,8 +2423,8 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
}
if (pMsg->pTable == NULL) {
mError("msg:%p, app:%p table:%s, object not found, retry:%d reason:%s", pMsg, pMsg->rpcMsg.ahandle,
pCreate->tableName, pMsg->retry, tstrerror(terrno));
mError("msg:%p, app:%p table:%s, object not found, retry:%d reason:%s", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName, pMsg->retry,
tstrerror(terrno));
return terrno;
} else {
mDebug("msg:%p, app:%p table:%s, send create msg to vnode again", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName);
......@@ -2413,10 +2446,10 @@ static int32_t mnodeSendDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) {
}
tstrncpy(pDrop->tableFname, pTable->info.tableId, TSDB_TABLE_FNAME_LEN);
pDrop->vgId = htonl(pTable->vgId);
pDrop->vgId = htonl(pTable->vgId);
pDrop->contLen = htonl(sizeof(SMDDropTableMsg));
pDrop->tid = htonl(pTable->tid);
pDrop->uid = htobe64(pTable->uid);
pDrop->tid = htonl(pTable->tid);
pDrop->uid = htobe64(pTable->uid);
SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup);
......@@ -2426,11 +2459,13 @@ static int32_t mnodeSendDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) {
monSaveAuditLog((pTable->info.type == TSDB_CHILD_TABLE) ? MON_DDL_CMD_DROP_CHILD_TABLE : MON_DDL_CMD_DROP_TABLE,
mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true);
SRpcMsg rpcMsg = {.ahandle = pMsg,
.pCont = pDrop,
.contLen = sizeof(SMDDropTableMsg),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_DROP_TABLE};
SRpcMsg rpcMsg = {
.ahandle = pMsg,
.pCont = pDrop,
.contLen = sizeof(SMDDropTableMsg),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_DROP_TABLE
};
if (!needReturn) rpcMsg.ahandle = NULL;
......@@ -2453,6 +2488,7 @@ static int32_t mnodeDropChildTableCb(SMnodeMsg *pMsg, int32_t code) {
return TSDB_CODE_SUCCESS;
}
}
return mnodeSendDropChildTableMsg(pMsg, true);
}
......@@ -2466,7 +2502,12 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
}
SSdbRow row = {
.type = SDB_OPER_GLOBAL, .pTable = tsChildTableSdb, .pObj = pTable, .pMsg = pMsg, .fpRsp = mnodeDropChildTableCb};
.type = SDB_OPER_GLOBAL,
.pTable = tsChildTableSdb,
.pObj = pTable,
.pMsg = pMsg,
.fpRsp = mnodeDropChildTableCb
};
int32_t code = sdbDeleteRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
......@@ -2589,7 +2630,7 @@ _exit:
}
static int32_t mnodeFindNormalTableColumnIndex(SCTableObj *pTable, char *colName) {
SSchema *schema = (SSchema *)pTable->schema;
SSchema *schema = (SSchema *) pTable->schema;
for (int32_t col = 0; col < pTable->numOfColumns; col++) {
if (strcmp(schema[col].name, colName) == 0) {
return col;
......@@ -2602,8 +2643,8 @@ static int32_t mnodeFindNormalTableColumnIndex(SCTableObj *pTable, char *colName
static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
SCTableObj *pTable = (SCTableObj *)pMsg->pTable;
if (code != TSDB_CODE_SUCCESS) {
mError("msg:%p, app:%p ctable %s, failed to alter column, reason:%s", pMsg, pMsg->rpcMsg.ahandle,
pTable->info.tableId, tstrerror(code));
mError("msg:%p, app:%p ctable %s, failed to alter column, reason:%s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId,
tstrerror(code));
return code;
}
......@@ -2623,11 +2664,13 @@ static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
}
SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup);
SRpcMsg rpcMsg = {.ahandle = pMsg,
.pCont = pMDCreate,
.contLen = htonl(pMDCreate->contLen),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_ALTER_TABLE};
SRpcMsg rpcMsg = {
.ahandle = pMsg,
.pCont = pMDCreate,
.contLen = htonl(pMDCreate->contLen),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_ALTER_TABLE
};
mDebug("msg:%p, app:%p ctable %s, send alter column msg to vgId:%d", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId,
pMsg->pVgroup->vgId);
......@@ -2638,10 +2681,9 @@ static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32_t ncols) {
SCTableObj *pTable = (SCTableObj *)pMsg->pTable;
SDbObj *pDb = pMsg->pDb;
SDbObj *pDb = pMsg->pDb;
if (ncols <= 0) {
mError("msg:%p, app:%p ctable:%s, add column, ncols:%d <= 0", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId,
ncols);
mError("msg:%p, app:%p ctable:%s, add column, ncols:%d <= 0", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, ncols);
monSaveAuditLog(MON_DDL_CMD_ADD_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, false);
return TSDB_CODE_MND_APP_ERROR;
}
......@@ -2660,7 +2702,7 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
memcpy(pTable->schema + pTable->numOfColumns, schema, sizeof(SSchema) * ncols);
SSchema *tschema = (SSchema *)(pTable->schema + pTable->numOfColumns);
SSchema *tschema = (SSchema *) (pTable->schema + pTable->numOfColumns);
for (int32_t i = 0; i < ncols; i++) {
tschema[i].colId = pTable->nextColId++;
}
......@@ -2678,19 +2720,21 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
mInfo("msg:%p, app:%p ctable %s, start to add column", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId);
monSaveAuditLog(MON_DDL_CMD_ADD_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true);
SSdbRow row = {.type = SDB_OPER_GLOBAL,
.pTable = tsChildTableSdb,
.pObj = pTable,
.pMsg = pMsg,
.fpRsp = mnodeAlterNormalTableColumnCb};
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsChildTableSdb,
.pObj = pTable,
.pMsg = pMsg,
.fpRsp = mnodeAlterNormalTableColumnCb
};
return sdbUpdateRow(&row);
}
static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
SDbObj *pDb = pMsg->pDb;
SDbObj *pDb = pMsg->pDb;
SCTableObj *pTable = (SCTableObj *)pMsg->pTable;
int32_t col = mnodeFindNormalTableColumnIndex(pTable, colName);
int32_t col = mnodeFindNormalTableColumnIndex(pTable, colName);
if (col <= 0) {
mError("msg:%p, app:%p ctable:%s, drop column, column:%s not exist", pMsg, pMsg->rpcMsg.ahandle,
pTable->info.tableId, colName);
......@@ -2712,40 +2756,45 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
mInfo("msg:%p, app:%p ctable %s, start to drop column %s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, colName);
monSaveAuditLog(MON_DDL_CMD_DROP_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true);
SSdbRow row = {.type = SDB_OPER_GLOBAL,
.pTable = tsChildTableSdb,
.pObj = pTable,
.pMsg = pMsg,
.fpRsp = mnodeAlterNormalTableColumnCb};
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsChildTableSdb,
.pObj = pTable,
.pMsg = pMsg,
.fpRsp = mnodeAlterNormalTableColumnCb
};
return sdbUpdateRow(&row);
}
static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg) {
SAlterTableMsg *pAlter = pMsg->rpcMsg.pCont;
char *name = pAlter->schema[0].name;
SCTableObj *pTable = (SCTableObj *)pMsg->pTable;
int32_t col = mnodeFindNormalTableColumnIndex(pTable, name);
char* name = pAlter->schema[0].name;
SCTableObj *pTable = (SCTableObj *)pMsg->pTable;
int32_t col = mnodeFindNormalTableColumnIndex(pTable, name);
if (col < 0) {
mError("msg:%p, app:%p ctable:%s, change column, name: %s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, name);
mError("msg:%p, app:%p ctable:%s, change column, name: %s", pMsg, pMsg->rpcMsg.ahandle,
pTable->info.tableId, name);
monSaveAuditLog(MON_DDL_CMD_MODIFY_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, false);
return TSDB_CODE_MND_FIELD_NOT_EXIST;
}
SSchema *schema = (SSchema *)(pTable->schema + col);
SSchema *schema = (SSchema *) (pTable->schema + col);
ASSERT(schema->type == TSDB_DATA_TYPE_BINARY || schema->type == TSDB_DATA_TYPE_NCHAR);
schema->bytes = pAlter->schema[0].bytes;
++pTable->sversion;
mInfo("msg:%p, app:%p ctable %s, start to modify column %s len to %d", pMsg, pMsg->rpcMsg.ahandle,
pTable->info.tableId, name, schema->bytes);
mInfo("msg:%p, app:%p ctable %s, start to modify column %s len to %d", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId,
name, schema->bytes);
monSaveAuditLog(MON_DDL_CMD_MODIFY_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true);
SSdbRow row = {.type = SDB_OPER_GLOBAL,
.pTable = tsChildTableSdb,
.pObj = pTable,
.pMsg = pMsg,
.fpRsp = mnodeAlterNormalTableColumnCb};
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsChildTableSdb,
.pObj = pTable,
.pMsg = pMsg,
.fpRsp = mnodeAlterNormalTableColumnCb
};
return sdbUpdateRow(&row);
}
......@@ -2754,7 +2803,7 @@ static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SCTableObj *pTabl
int32_t numOfCols = pTable->numOfColumns;
for (int32_t i = 0; i < numOfCols; ++i) {
strcpy(pSchema->name, pTable->schema[i].name);
pSchema->type = pTable->schema[i].type;
pSchema->type = pTable->schema[i].type;
pSchema->bytes = htons(pTable->schema[i].bytes);
pSchema->colId = htons(pTable->schema[i].colId);
pSchema++;
......@@ -2764,13 +2813,13 @@ static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SCTableObj *pTabl
}
static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
SDbObj *pDb = pMsg->pDb;
SDbObj *pDb = pMsg->pDb;
SCTableObj *pTable = (SCTableObj *)pMsg->pTable;
pMeta->uid = htobe64(pTable->uid);
pMeta->tid = htonl(pTable->tid);
pMeta->uid = htobe64(pTable->uid);
pMeta->tid = htonl(pTable->tid);
pMeta->precision = pDb->cfg.precision;
pMeta->update = pDb->cfg.update;
pMeta->update = pDb->cfg.update;
pMeta->tableType = pTable->info.type;
tstrncpy(pMeta->tableFname, pTable->info.tableId, TSDB_TABLE_FNAME_LEN);
......@@ -2778,18 +2827,18 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
assert(pTable->superTable != NULL);
tstrncpy(pMeta->sTableName, pTable->superTable->info.tableId, TSDB_TABLE_FNAME_LEN);
pMeta->suid = pTable->superTable->uid;
pMeta->sversion = htons(pTable->superTable->sversion);
pMeta->tversion = htons(pTable->superTable->tversion);
pMeta->numOfTags = (int8_t)pTable->superTable->numOfTags;
pMeta->suid = pTable->superTable->uid;
pMeta->sversion = htons(pTable->superTable->sversion);
pMeta->tversion = htons(pTable->superTable->tversion);
pMeta->numOfTags = (int8_t)pTable->superTable->numOfTags;
pMeta->numOfColumns = htons((int16_t)pTable->superTable->numOfColumns);
pMeta->contLen = sizeof(STableMetaMsg) + mnodeSetSchemaFromSuperTable(pMeta->schema, pTable->superTable);
pMeta->contLen = sizeof(STableMetaMsg) + mnodeSetSchemaFromSuperTable(pMeta->schema, pTable->superTable);
} else {
pMeta->sversion = htons(pTable->sversion);
pMeta->tversion = 0;
pMeta->numOfTags = 0;
pMeta->sversion = htons(pTable->sversion);
pMeta->tversion = 0;
pMeta->numOfTags = 0;
pMeta->numOfColumns = htons((int16_t)pTable->numOfColumns);
pMeta->contLen = sizeof(STableMetaMsg) + mnodeSetSchemaFromNormalTable(pMeta->schema, pTable);
pMeta->contLen = sizeof(STableMetaMsg) + mnodeSetSchemaFromNormalTable(pMeta->schema, pTable);
}
if (pMsg->pVgroup == NULL) pMsg->pVgroup = mnodeGetVgroup(pTable->vgId);
......@@ -2824,22 +2873,22 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_TAG_NOT_EXIST;
}
char *p = pInfo->tags;
int32_t nameLen = htonl(*(int32_t *)p);
char* p = pInfo->tags;
int32_t nameLen = htonl(*(int32_t*) p);
p += sizeof(int32_t);
p += nameLen;
int32_t tagLen = htonl(*(int32_t *)p);
int32_t tagLen = htonl(*(int32_t*) p);
p += sizeof(int32_t);
int32_t totalLen = nameLen + tagLen + sizeof(int32_t) * 2;
int32_t totalLen = nameLen + tagLen + sizeof(int32_t)*2;
if (tagLen == 0 || nameLen == 0) {
mError("msg:%p, app:%p table:%s, failed to create table on demand for super table is empty, tagLen:%d", pMsg,
pMsg->rpcMsg.ahandle, pInfo->tableFname, tagLen);
return TSDB_CODE_MND_INVALID_STABLE_NAME;
}
int32_t contLen = sizeof(SCMCreateTableMsg) + sizeof(SCreateTableMsg) + totalLen;
int32_t contLen = sizeof(SCMCreateTableMsg) + sizeof(SCreateTableMsg) + totalLen;
SCMCreateTableMsg *pCreateMsg = calloc(1, contLen);
if (pCreateMsg == NULL) {
mError("msg:%p, app:%p table:%s, failed to create table while get meta info, no enough memory", pMsg,
......@@ -2847,7 +2896,7 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
}
SCreateTableMsg *pCreate = (SCreateTableMsg *)((char *)pCreateMsg + sizeof(SCMCreateTableMsg));
SCreateTableMsg* pCreate = (SCreateTableMsg*) ((char*) pCreateMsg + sizeof(SCMCreateTableMsg));
size_t size = tListLen(pInfo->tableFname);
tstrncpy(pCreate->tableName, pInfo->tableFname, size);
......@@ -2894,8 +2943,8 @@ static int32_t mnodeGetChildTableMeta(SMnodeMsg *pMsg) {
}
void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup) {
void *pIter = NULL;
int32_t numOfTables = 0;
void * pIter = NULL;
int32_t numOfTables = 0;
SCTableObj *pTable = NULL;
mInfo("vgId:%d, all child tables will be dropped from sdb", pVgroup->vgId);
......@@ -2906,9 +2955,9 @@ void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup) {
if (pTable->vgId == pVgroup->vgId) {
SSdbRow row = {
.type = SDB_OPER_LOCAL,
.pTable = tsChildTableSdb,
.pObj = pTable,
.type = SDB_OPER_LOCAL,
.pTable = tsChildTableSdb,
.pObj = pTable,
};
sdbDeleteRow(&row);
numOfTables++;
......@@ -2920,8 +2969,8 @@ void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup) {
}
void mnodeDropAllChildTables(SDbObj *pDropDb) {
void *pIter = NULL;
int32_t numOfTables = 0;
void * pIter = NULL;
int32_t numOfTables = 0;
SCTableObj *pTable = NULL;
char prefix[64] = {0};
......@@ -2937,9 +2986,9 @@ void mnodeDropAllChildTables(SDbObj *pDropDb) {
if (strncmp(prefix, pTable->info.tableId, prefixLen) == 0) {
SSdbRow row = {
.type = SDB_OPER_LOCAL,
.pTable = tsChildTableSdb,
.pObj = pTable,
.type = SDB_OPER_LOCAL,
.pTable = tsChildTableSdb,
.pObj = pTable,
};
sdbDeleteRow(&row);
numOfTables++;
......@@ -2951,8 +3000,8 @@ void mnodeDropAllChildTables(SDbObj *pDropDb) {
}
static void mnodeDropAllChildTablesInStable(SSTableObj *pStable) {
void *pIter = NULL;
int32_t numOfTables = 0;
void * pIter = NULL;
int32_t numOfTables = 0;
SCTableObj *pTable = NULL;
mInfo("stable:%s uid:%" PRIu64 ", all child tables:%d will be dropped from sdb", pStable->info.tableId, pStable->uid,
......@@ -2964,9 +3013,9 @@ static void mnodeDropAllChildTablesInStable(SSTableObj *pStable) {
if (pTable->superTable == pStable) {
SSdbRow row = {
.type = SDB_OPER_LOCAL,
.pTable = tsChildTableSdb,
.pObj = pTable,
.type = SDB_OPER_LOCAL,
.pTable = tsChildTableSdb,
.pObj = pTable,
};
sdbDeleteRow(&row);
numOfTables++;
......@@ -3072,10 +3121,8 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
// If the table is deleted by another thread during creation, stop creating and send drop msg to vnode
if (sdbCheckRowDeleted(tsChildTableSdb, pTable)) {
mDebug(
"msg:%p, app:%p table:%s, create table rsp received, but a deleting opertion incoming, vgId:%d sid:%d "
"uid:%" PRIu64,
pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid);
mDebug("msg:%p, app:%p table:%s, create table rsp received, but a deleting opertion incoming, vgId:%d sid:%d uid:%" PRIu64,
pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid);
// if the vgroup is already dropped from hash, it can't be accquired by pTable->vgId
// so the refCount of vgroup can not be decreased
......@@ -3104,11 +3151,13 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
}
if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
SSdbRow desc = {.type = SDB_OPER_GLOBAL,
.pObj = pTable,
.pTable = tsChildTableSdb,
.pMsg = pMsg,
.fpRsp = mnodeDoCreateChildTableCb};
SSdbRow desc = {
.type = SDB_OPER_GLOBAL,
.pObj = pTable,
.pTable = tsChildTableSdb,
.pMsg = pMsg,
.fpRsp = mnodeDoCreateChildTableCb
};
int32_t code = sdbInsertRowToQueue(&desc);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
......@@ -3116,15 +3165,16 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
mnodeDestroyChildTable(pTable);
if (pMsg->pBatchMasterMsg) {
++pMsg->pBatchMasterMsg->received;
pMsg->pBatchMasterMsg->code = code;
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received >= pMsg->pBatchMasterMsg->expected) {
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, code);
}
++pMsg->pBatchMasterMsg->received;
pMsg->pBatchMasterMsg->code = code;
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
>= pMsg->pBatchMasterMsg->expected) {
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, code);
}
mnodeDestroySubMsg(pMsg);
mnodeDestroySubMsg(pMsg);
return;
return;
}
dnodeSendRpcMWriteRsp(pMsg, code);
......@@ -3149,20 +3199,21 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
sdbDeleteRow(&row);
if (rpcMsg->code == TSDB_CODE_APP_NOT_READY) {
// Avoid retry again in client
//Avoid retry again in client
rpcMsg->code = TSDB_CODE_MND_VGROUP_NOT_READY;
}
if (pMsg->pBatchMasterMsg) {
++pMsg->pBatchMasterMsg->received;
pMsg->pBatchMasterMsg->code = rpcMsg->code;
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received >= pMsg->pBatchMasterMsg->expected) {
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, rpcMsg->code);
}
++pMsg->pBatchMasterMsg->received;
pMsg->pBatchMasterMsg->code = rpcMsg->code;
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
>= pMsg->pBatchMasterMsg->expected) {
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, rpcMsg->code);
}
mnodeDestroySubMsg(pMsg);
mnodeDestroySubMsg(pMsg);
return;
return;
}
dnodeSendRpcMWriteRsp(pMsg, rpcMsg->code);
......@@ -3199,8 +3250,7 @@ static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) {
}
}
static SMultiTableMeta *ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray *pList, int32_t *totalMallocLen,
int32_t numOfVgroupList) {
static SMultiTableMeta* ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray* pList, int32_t* totalMallocLen, int32_t numOfVgroupList) {
int32_t len = 0;
for (int32_t i = 0; i < numOfVgroupList; ++i) {
char *name = taosArrayGetP(pList, i);
......@@ -3212,7 +3262,7 @@ static SMultiTableMeta *ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray
(*totalMallocLen) *= 2;
}
SMultiTableMeta *pMultiMeta1 = realloc(pMultiMeta, *totalMallocLen);
SMultiTableMeta* pMultiMeta1 = realloc(pMultiMeta, *totalMallocLen);
if (pMultiMeta1 == NULL) {
return NULL;
}
......@@ -3225,17 +3275,17 @@ static SMultiTableMeta *ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray
static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
SMultiTableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
pInfo->numOfTables = htonl(pInfo->numOfTables);
pInfo->numOfTables = htonl(pInfo->numOfTables);
pInfo->numOfVgroups = htonl(pInfo->numOfVgroups);
pInfo->numOfUdfs = htonl(pInfo->numOfUdfs);
pInfo->numOfUdfs = htonl(pInfo->numOfUdfs);
int32_t contLen = pMsg->rpcMsg.contLen - sizeof(SMultiTableInfoMsg);
int32_t num = 0;
int32_t code = TSDB_CODE_SUCCESS;
char *str = strndup(pInfo->tableNames, contLen);
char **nameList = strsplit(str, "`", &num);
SArray *pList = taosArrayInit(4, POINTER_BYTES);
int32_t num = 0;
int32_t code = TSDB_CODE_SUCCESS;
char* str = strndup(pInfo->tableNames, contLen);
char** nameList = strsplit(str, "`", &num);
SArray* pList = taosArrayInit(4, POINTER_BYTES);
SMultiTableMeta *pMultiMeta = NULL;
if (num != pInfo->numOfTables + pInfo->numOfVgroups + pInfo->numOfUdfs) {
......@@ -3245,8 +3295,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
}
// first malloc 80KB, subsequent reallocation will expand the size as twice of the original size
int32_t totalMallocLen =
sizeof(SMultiTableMeta) + sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16);
int32_t totalMallocLen = sizeof(SMultiTableMeta) + sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16);
pMultiMeta = calloc(1, totalMallocLen);
if (pMultiMeta == NULL) {
code = TSDB_CODE_MND_OUT_OF_MEMORY;
......@@ -3262,8 +3311,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
pMsg->pTable = mnodeGetTable(fullName);
if (pMsg->pTable == NULL) {
mError("msg:%p, app:%p table:%s, failed to get table meta, table not exist", pMsg, pMsg->rpcMsg.ahandle,
fullName);
mError("msg:%p, app:%p table:%s, failed to get table meta, table not exist", pMsg, pMsg->rpcMsg.ahandle, fullName);
code = TSDB_CODE_MND_INVALID_TABLE_NAME;
goto _end;
}
......@@ -3289,7 +3337,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
}
}
STableMetaMsg *pMeta = (STableMetaMsg *)((char *)pMultiMeta + pMultiMeta->contLen);
STableMetaMsg *pMeta = (STableMetaMsg *)((char*) pMultiMeta + pMultiMeta->contLen);
if (pMsg->pTable->type == TSDB_SUPER_TABLE) {
code = mnodeDoGetSuperTableMeta(pMsg, pMeta);
......@@ -3311,19 +3359,19 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
} else {
// ignore error and continue.
// Otherwise the client may found that the responding message is inconsistent.
// goto _end;
// goto _end;
}
}
int32_t tableNum = pInfo->numOfTables + pInfo->numOfVgroups;
// add the additional super table names that needs the vgroup info
for (; t < tableNum; ++t) {
for(;t < tableNum; ++t) {
taosArrayPush(pList, &nameList[t]);
}
// add the pVgroupList into the pList
int32_t numOfVgroupList = (int32_t)taosArrayGetSize(pList);
int32_t numOfVgroupList = (int32_t) taosArrayGetSize(pList);
pMultiMeta->numOfVgroup = htonl(numOfVgroupList);
pMultiMeta = ensureMsgBufferSpace(pMultiMeta, pList, &totalMallocLen, numOfVgroupList);
......@@ -3332,9 +3380,9 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
goto _end;
}
char *msg = (char *)pMultiMeta + pMultiMeta->contLen;
for (int32_t i = 0; i < numOfVgroupList; ++i) {
char *name = taosArrayGetP(pList, i);
char* msg = (char*) pMultiMeta + pMultiMeta->contLen;
for(int32_t i = 0; i < numOfVgroupList; ++i) {
char* name = taosArrayGetP(pList, i);
SSTableObj *pTable = mnodeGetSuperTable(name);
if (pTable == NULL) {
......@@ -3346,44 +3394,44 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
msg = serializeVgroupInfo(pTable, name, msg, pMsg, pMsg->rpcMsg.ahandle);
}
pMultiMeta->contLen = (int32_t)(msg - (char *)pMultiMeta);
pMultiMeta->contLen = (int32_t) (msg - (char*) pMultiMeta);
pMultiMeta->numOfTables = htonl(pMultiMeta->numOfTables);
// add the user-defined-function information
for (int32_t i = 0; i < pInfo->numOfUdfs; ++i, ++t) {
for(int32_t i = 0; i < pInfo->numOfUdfs; ++i, ++t) {
char buf[TSDB_FUNC_NAME_LEN] = {0};
tstrncpy(buf, nameList[t], TSDB_FUNC_NAME_LEN);
SFuncObj *pFuncObj = mnodeGetFunc(buf);
SFuncObj* pFuncObj = mnodeGetFunc(buf);
if (pFuncObj == NULL) {
mError("function %s does not exist", buf);
code = TSDB_CODE_MND_INVALID_FUNC;
goto _end;
}
SFunctionInfoMsg *pFuncInfo = (SFunctionInfoMsg *)msg;
SFunctionInfoMsg* pFuncInfo = (SFunctionInfoMsg*) msg;
strcpy(pFuncInfo->name, buf);
pFuncInfo->len = htonl(pFuncObj->contLen);
memcpy(pFuncInfo->content, pFuncObj->cont, pFuncObj->contLen);
pFuncInfo->funcType = htonl(pFuncObj->funcType);
pFuncInfo->resType = pFuncObj->resType;
pFuncInfo->resType = pFuncObj->resType;
pFuncInfo->resBytes = htons(pFuncObj->resBytes);
pFuncInfo->bufSize = htonl(pFuncObj->bufSize);
pFuncInfo->bufSize = htonl(pFuncObj->bufSize);
msg += sizeof(SFunctionInfoMsg) + pFuncObj->contLen;
}
pMultiMeta->contLen = (int32_t)(msg - (char *)pMultiMeta);
pMultiMeta->contLen = (int32_t) (msg - (char*) pMultiMeta);
pMultiMeta->numOfUdf = htonl(pInfo->numOfUdfs);
pMsg->rpcRsp.rsp = pMultiMeta;
pMsg->rpcRsp.len = pMultiMeta->contLen;
code = TSDB_CODE_SUCCESS;
char *tmp = rpcMallocCont(pMultiMeta->contLen + 2);
char* tmp = rpcMallocCont(pMultiMeta->contLen + 2);
if (tmp == NULL) {
code = TSDB_CODE_MND_OUT_OF_MEMORY;
goto _end;
......@@ -3395,7 +3443,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
pMultiMeta->metaClone = pInfo->metaClone;
pMultiMeta->rawLen = pMultiMeta->contLen;
if (len == -1 || len >= dataLen + 2) { // compress failed, do not compress this binary data
if (len == -1 || len >= dataLen + 2) { // compress failed, do not compress this binary data
pMultiMeta->compressed = 0;
memcpy(tmp, pMultiMeta, sizeof(SMultiTableMeta) + pMultiMeta->contLen);
} else {
......@@ -3409,15 +3457,15 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
pMsg->rpcRsp.rsp = tmp;
pMsg->rpcRsp.len = pMultiMeta->contLen;
SMultiTableMeta *p = (SMultiTableMeta *)tmp;
SMultiTableMeta* p = (SMultiTableMeta*) tmp;
mDebug("multiTable info build completed, original:%d, compressed:%d, comp:%d", p->rawLen, p->contLen, p->compressed);
_end:
_end:
tfree(str);
tfree(nameList);
taosArrayDestroy(&pList);
pMsg->pTable = NULL;
pMsg->pTable = NULL;
pMsg->pVgroup = NULL;
tfree(pMultiMeta);
......@@ -3434,10 +3482,10 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
return TSDB_CODE_MND_DB_IN_DROPPING;
}
int32_t cols = 0;
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
SSchema *s = tGetTbnameColumnSchema();
SSchema* s = tGetTbnameColumnSchema();
pShow->bytes[cols] = s->bytes;
pSchema[cols].type = s->type;
strcpy(pSchema[cols].name, "table_name");
......@@ -3456,14 +3504,14 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
SSchema *tbCol = tGetTbnameColumnSchema();
SSchema* tbCol = tGetTbnameColumnSchema();
pShow->bytes[cols] = tbCol->bytes + VARSTR_HEADER_SIZE;
pSchema[cols].type = tbCol->type;
strcpy(pSchema[cols].name, "stable_name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8; // table uid
pShow->bytes[cols] = 8; // table uid
pSchema[cols].type = TSDB_DATA_TYPE_BIGINT;
strcpy(pSchema[cols].name, "uid");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
......@@ -3481,6 +3529,7 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
......@@ -3490,7 +3539,7 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
}
pShow->numOfRows = pDb->numOfTables;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
mnodeDecDbRef(pDb);
return 0;
......@@ -3506,15 +3555,15 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
return 0;
}
int32_t cols = 0;
int32_t numOfRows = 0;
SCTableObj *pTable = NULL;
int32_t cols = 0;
int32_t numOfRows = 0;
SCTableObj *pTable = NULL;
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char prefix[64] = {0};
char prefix[64] = {0};
int32_t prefixLen = (int32_t)tableIdPrefix(pDb->name, prefix, 64);
char *pattern = mnodeGetTableShowPattern(pShow);
char* pattern = mnodeGetTableShowPattern(pShow);
if (pShow->payloadLen > 0 && pattern == NULL) {
return 0;
}
......@@ -3546,7 +3595,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pTable->createdTime;
*(int64_t *) pWrite = pTable->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......@@ -3570,17 +3619,18 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
// uid
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pTable->uid;
*(int64_t*) pWrite = pTable->uid;
cols++;
// tid
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pTable->tid;
*(int32_t*) pWrite = pTable->tid;
cols++;
// vgid
//vgid
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pTable->vgId;
*(int32_t*) pWrite = pTable->vgId;
cols++;
numOfRows++;
......@@ -3603,8 +3653,7 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableName(pAlter->tableFname);
if (pMsg->pDb == NULL) {
mError("msg:%p, app:%p table:%s, failed to alter table, db not selected", pMsg, pMsg->rpcMsg.ahandle,
pAlter->tableFname);
mError("msg:%p, app:%p table:%s, failed to alter table, db not selected", pMsg, pMsg->rpcMsg.ahandle, pAlter->tableFname);
return TSDB_CODE_MND_DB_NOT_SELECTED;
}
......@@ -3622,8 +3671,7 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pAlter->tableFname);
if (pMsg->pTable == NULL) {
mError("msg:%p, app:%p table:%s, failed to alter table, table not exist", pMsg, pMsg->rpcMsg.ahandle,
pAlter->tableFname);
mError("msg:%p, app:%p table:%s, failed to alter table, table not exist", pMsg, pMsg->rpcMsg.ahandle, pAlter->tableFname);
return TSDB_CODE_MND_INVALID_TABLE_NAME;
}
......@@ -3674,7 +3722,7 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
}
}
return code;
return code;
}
static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
......@@ -3687,10 +3735,10 @@ static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, vo
return TSDB_CODE_MND_DB_IN_DROPPING;
}
int32_t cols = 0;
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
SSchema *tbnameColSchema = tGetTbnameColumnSchema();
SSchema* tbnameColSchema = tGetTbnameColumnSchema();
pShow->bytes[cols] = tbnameColSchema->bytes;
pSchema[cols].type = tbnameColSchema->type;
strcpy(pSchema[cols].name, "table_name");
......@@ -3724,7 +3772,7 @@ static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, vo
}
pShow->numOfRows = pDb->numOfTables;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
mnodeDecDbRef(pDb);
return 0;
......@@ -3740,8 +3788,8 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
return 0;
}
int32_t numOfRows = 0;
SCTableObj *pTable = NULL;
int32_t numOfRows = 0;
SCTableObj *pTable = NULL;
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char prefix[64] = {0};
......@@ -3749,7 +3797,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = (int32_t)strlen(prefix);
char *pattern = mnodeGetTableShowPattern(pShow);
char* pattern = mnodeGetTableShowPattern(pShow);
if (pShow->payloadLen > 0 && pattern == NULL) {
return 0;
}
......@@ -3782,7 +3830,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pTable->createdTime;
*(int64_t *) pWrite = pTable->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......@@ -3807,7 +3855,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
}
static int32_t mnodeCompactSuperTables() {
void *pIter = NULL;
void *pIter = NULL;
SSTableObj *pTable = NULL;
mInfo("start to compact super table...");
......@@ -3818,13 +3866,13 @@ static int32_t mnodeCompactSuperTables() {
int32_t schemaSize = (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema);
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pTable,
.rowSize = sizeof(SSTableObj) + schemaSize,
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pTable,
.rowSize = sizeof(SSTableObj) + schemaSize,
};
// mInfo("compact super %" PRIu64, pTable->uid);
//mInfo("compact super %" PRIu64, pTable->uid);
sdbInsertCompactRow(&row);
}
......@@ -3835,7 +3883,7 @@ static int32_t mnodeCompactSuperTables() {
}
static int32_t mnodeCompactChildTables() {
void *pIter = NULL;
void *pIter = NULL;
SCTableObj *pTable = NULL;
mInfo("start to compact child table...");
......@@ -3845,12 +3893,12 @@ static int32_t mnodeCompactChildTables() {
if (pTable == NULL) break;
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pObj = pTable,
.pTable = tsChildTableSdb,
.type = SDB_OPER_GLOBAL,
.pObj = pTable,
.pTable = tsChildTableSdb,
};
// mInfo("compact child %" PRIu64 ":%d", pTable->uid, pTable->tid);
//mInfo("compact child %" PRIu64 ":%d", pTable->uid, pTable->tid);
sdbInsertCompactRow(&row);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册