From d2e3ed8b7ddd7d6a3f6932c30584931a638284db Mon Sep 17 00:00:00 2001 From: slguan Date: Tue, 24 Mar 2020 12:49:22 +0800 Subject: [PATCH] [TD-15] refact code of sdb --- src/inc/mnode.h | 20 ++--- src/mnode/src/mgmtChildTable.c | 49 ++++-------- src/mnode/src/mgmtDb.c | 4 +- src/mnode/src/mgmtNormalTable.c | 66 ++++------------- src/mnode/src/mgmtSdb.c | 39 +++++----- src/mnode/src/mgmtSuperTable.c | 2 +- src/mnode/src/mgmtTable.c | 127 +++++++++++++++++++++----------- src/mnode/src/mgmtUser.c | 4 +- src/mnode/src/mgmtVgroup.c | 8 +- src/util/src/tidpool.c | 35 ++++----- 10 files changed, 165 insertions(+), 189 deletions(-) diff --git a/src/inc/mnode.h b/src/inc/mnode.h index cde26dad4a..792a8ec1cc 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -53,8 +53,8 @@ typedef struct { int8_t numOfMnodes; int32_t numOfDnodes; char mnodeName[TSDB_DNODE_NAME_LEN + 1]; - char reserved[7]; - char updateEnd[1]; + int8_t reserved[15]; + int8_t updateEnd[1]; int syncFd; void *hbTimer; void *pSync; @@ -78,8 +78,8 @@ typedef struct { float lbScore; // calc in balance function int32_t customScore; // config by user char dnodeName[TSDB_DNODE_NAME_LEN + 1]; - char reserved[7]; - char updateEnd[1]; + int8_t reserved[15]; + int8_t updateEnd[1]; SVnodeLoad vload[TSDB_MAX_VNODES]; int32_t status; uint32_t lastReboot; // time stamp for last reboot @@ -120,7 +120,7 @@ typedef struct SSuperTableObj { int32_t sversion; int32_t numOfColumns; int32_t numOfTags; - int8_t reserved[5]; + int8_t reserved[15]; int8_t updateEnd[1]; int32_t numOfTables; int16_t nextColId; @@ -152,7 +152,7 @@ typedef struct { int32_t sversion; int32_t numOfColumns; int32_t sqlLen; - int8_t reserved[3]; + int8_t reserved[7]; int8_t updateEnd[1]; char* sql; //null-terminated string int16_t nextColId; @@ -167,7 +167,6 @@ typedef struct _vg_obj { int64_t createdTime; SVnodeGid vnodeGid[TSDB_VNODES_SUPPORT]; int32_t numOfVnodes; - int32_t numOfTables; int32_t lbIp; int32_t lbTime; int8_t lbStatus; @@ -175,6 +174,7 @@ typedef struct _vg_obj { int8_t updateEnd[1]; struct _vg_obj *prev, *next; struct _db_obj *pDb; + int32_t numOfTables; void * idPool; STableInfo ** tableList; } SVgObj; @@ -184,8 +184,8 @@ typedef struct _db_obj { int8_t dirty; int64_t createdTime; SDbCfg cfg; - char reserved[15]; - char updateEnd[1]; + int8_t reserved[15]; + int8_t updateEnd[1]; struct _db_obj *prev, *next; int32_t numOfVgroups; int32_t numOfTables; @@ -203,7 +203,7 @@ typedef struct _user_obj { int64_t createdTime; int8_t superAuth; int8_t writeAuth; - int8_t reserved[16]; + int8_t reserved[13]; int8_t updateEnd[1]; struct _user_obj *prev, *next; struct _acctObj * pAcct; diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index d717a4e5e9..9076681caa 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -43,8 +43,7 @@ static void mgmtDestroyChildTable(SChildTableObj *pTable) { } static int32_t mgmtChildTableActionDestroy(SSdbOperDesc *pOper) { - SChildTableObj *pTable = pOper->pObj; - mgmtDestroyChildTable(pTable); + mgmtDestroyChildTable(pOper->pObj); return TSDB_CODE_SUCCESS; } @@ -69,14 +68,6 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) { return TSDB_CODE_INVALID_ACCT; } - if (!mgmtIsMaster()) { - int32_t sid = taosAllocateId(pVgroup->idPool); - if (sid != pTable->sid) { - mError("ctable:%s, sid:%d is not matched from the master:%d", pTable->tableId, sid, pTable->sid); - return TSDB_CODE_INVALID_SESSION_ID; - } - } - pTable->superTable = mgmtGetSuperTable(pTable->superTableId); mgmtAddTableIntoSuperTable(pTable->superTable); @@ -84,10 +75,6 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) { mgmtAddTableIntoDb(pDb); mgmtAddTableIntoVgroup(pVgroup, (STableInfo *) pTable); - if (pVgroup->numOfTables >= pDb->cfg.maxSessions && pDb->numOfVgroups > 1) { - mgmtMoveVgroupToTail(pVgroup); - } - return TSDB_CODE_SUCCESS; } @@ -117,13 +104,8 @@ static int32_t mgmtChildTableActionDelete(SSdbOperDesc *pOper) { mgmtRestoreTimeSeries(pAcct, pTable->superTable->numOfColumns - 1); mgmtRemoveTableFromDb(pDb); mgmtRemoveTableFromVgroup(pVgroup, (STableInfo *) pTable); - mgmtRemoveTableFromSuperTable(pTable->superTable); - if (pVgroup->numOfTables > 0) { - mgmtMoveVgroupToHead(pVgroup); - } - return TSDB_CODE_SUCCESS; } @@ -136,7 +118,9 @@ static int32_t mgmtChildTableActionEncode(SSdbOperDesc *pOper) { assert(pTable != NULL && pOper->rowData != NULL); memcpy(pOper->rowData, pTable, tsChildTableUpdateSize); - return tsChildTableUpdateSize; + pOper->rowSize = tsChildTableUpdateSize; + + return TSDB_CODE_SUCCESS; } static int32_t mgmtChildTableActionDecode(SSdbOperDesc *pOper) { @@ -158,7 +142,7 @@ int32_t mgmtInitChildTables() { SChildTableObj *pTable = NULL; SChildTableObj tObj; - tsChildTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj; + tsChildTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; SSdbTableDesc tableDesc = { .tableName = "ctables", @@ -236,9 +220,6 @@ int32_t mgmtInitChildTables() { continue; } - pVgroup->tableList[pTable->sid] = (STableInfo*)pTable; - taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid); - SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTable->superTableId); if (pSuperTable == NULL) { mError("ctable:%s, stable:%s not exist", pTable->tableId, pTable->superTableId); @@ -251,12 +232,6 @@ int32_t mgmtInitChildTables() { pNode = pLastNode; continue; } - - pTable->superTable = pSuperTable; - mgmtAddTableIntoSuperTable(pSuperTable); - - SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); - mgmtAddTimeSeries(pAcct, pTable->superTable->numOfColumns - 1); } mTrace("child table is initialized"); @@ -268,8 +243,13 @@ void mgmtCleanUpChildTables() { } void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableObj *pTable) { - char *pTagData = pMsg->schema + TSDB_TABLE_ID_LEN + 1; - int32_t tagDataLen = htonl(pMsg->contLen) - sizeof(SCMCreateTableMsg) - TSDB_TABLE_ID_LEN - 1; + char * pTagData = NULL; + int32_t tagDataLen = 0; + if (pMsg != NULL) { + pTagData = pMsg->schema + TSDB_TABLE_ID_LEN + 1; + tagDataLen = htonl(pMsg->contLen) - sizeof(SCMCreateTableMsg) - TSDB_TABLE_ID_LEN - 1; + } + int32_t totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags; int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen; @@ -302,7 +282,10 @@ void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableObj *pTab pSchema++; } - memcpy(pCreate->data + totalCols * sizeof(SSchema), pTagData, tagDataLen); + if (pMsg != NULL) { + memcpy(pCreate->data + totalCols * sizeof(SSchema), pTagData, tagDataLen); + } + return pCreate; } diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index e8ab5aaa8c..0cc793f2c0 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -117,7 +117,7 @@ static int32_t mgmtDbActionDecode(SSdbOperDesc *pOper) { int32_t mgmtInitDbs() { SDbObj tObj; - tsDbUpdateSize = tObj.updateEnd - (char *)&tObj; + tsDbUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; SSdbTableDesc tableDesc = { .tableName = "dbs", @@ -276,8 +276,6 @@ static int32_t mgmtCheckDbParams(SCMCreateDbMsg *pCreate) { pCreate->blocksPerTable = TSDB_MIN_AVG_BLOCKS; } - pCreate->maxSessions++; - return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index 0ffd4fb28e..4e5e4ce7cc 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -33,7 +33,7 @@ #include "mgmtTable.h" #include "mgmtVgroup.h" -void *tsNormalTableSdb; +void *tsNormalTableSdb; int32_t tsNormalTableUpdateSize; static void mgmtDestroyNormalTable(SNormalTableObj *pTable) { @@ -43,8 +43,7 @@ static void mgmtDestroyNormalTable(SNormalTableObj *pTable) { } static int32_t mgmtNormalTableActionDestroy(SSdbOperDesc *pOper) { - SNormalTableObj *pTable = pOper->pObj; - mgmtDestroyNormalTable(pTable); + mgmtDestroyNormalTable(pOper->pObj); return TSDB_CODE_SUCCESS; } @@ -53,38 +52,26 @@ static int32_t mgmtNormalTableActionInsert(SSdbOperDesc *pOper) { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { - mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId); + mError("ntable:%s not in vgroup:%d", pTable->tableId, pTable->vgId); return TSDB_CODE_INVALID_VGROUP_ID; } SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) { - mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName); + mError("ntable:%s, vgroup:%d not in DB:%s", pTable->tableId, pVgroup->vgId, pVgroup->dbName); return TSDB_CODE_INVALID_DB; } SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); if (pAcct == NULL) { - mError("account not exists"); + mError("ntable:%s, account:%s not exists", pTable->tableId, pDb->cfg.acct); return TSDB_CODE_INVALID_ACCT; } - if (!mgmtIsMaster()) { - int32_t sid = taosAllocateId(pVgroup->idPool); - if (sid != pTable->sid) { - mError("sid:%d is not matched from the master:%d", sid, pTable->sid); - return TSDB_CODE_INVALID_SESSION_ID; - } - } - mgmtAddTimeSeries(pAcct, pTable->numOfColumns - 1); mgmtAddTableIntoDb(pDb); mgmtAddTableIntoVgroup(pVgroup, (STableInfo *) pTable); - if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) { - mgmtMoveVgroupToTail(pVgroup); - } - return TSDB_CODE_SUCCESS; } @@ -101,7 +88,7 @@ static int32_t mgmtNormalTableActionDelete(SSdbOperDesc *pOper) { SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) { - mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName); + mError("ntable:%s, vgroup:%d not in DB:%s", pTable->tableId, pVgroup->vgId, pVgroup->dbName); return TSDB_CODE_INVALID_DB; } @@ -115,22 +102,10 @@ static int32_t mgmtNormalTableActionDelete(SSdbOperDesc *pOper) { mgmtRemoveTableFromDb(pDb); mgmtRemoveTableFromVgroup(pVgroup, (STableInfo *) pTable); - if (pVgroup->numOfTables > 0) { - mgmtMoveVgroupToHead(pVgroup); - } - return TSDB_CODE_SUCCESS; } static int32_t mgmtNormalTableActionUpdate(SSdbOperDesc *pOper) { - // SNormalTableObj *pTable = (SNormalTableObj *) pObj; - // memcpy(pTable, str, tsNormalTableUpdateSize); - - // int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns) + pTable->sqlLen; - // pTable->schema = realloc(pTable->schema, schemaSize); - // pTable->sql = (char*)pTable->schema + sizeof(SSchema) * (pTable->numOfColumns); - // memcpy(pTable->schema, str + tsNormalTableUpdateSize, schemaSize); - return TSDB_CODE_SUCCESS; } @@ -139,25 +114,24 @@ static int32_t mgmtNormalTableActionEncode(SSdbOperDesc *pOper) { assert(pOper->pObj != NULL && pOper->rowData != NULL); int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); - if (pOper->maxRowSize < tsNormalTableUpdateSize + schemaSize + 1) { - return -1; + if (pOper->maxRowSize < tsNormalTableUpdateSize + schemaSize) { + return TSDB_CODE_INVALID_MSG_LEN; } memcpy(pOper->rowData, pTable, tsNormalTableUpdateSize); memcpy(pOper->rowData + tsNormalTableUpdateSize, pTable->schema, schemaSize); memcpy(pOper->rowData + tsNormalTableUpdateSize + schemaSize, pTable->sql, pTable->sqlLen); - - return tsNormalTableUpdateSize + schemaSize + pTable->sqlLen; + + pOper->rowSize = tsNormalTableUpdateSize + schemaSize + pTable->sqlLen; + return TSDB_CODE_SUCCESS; } static int32_t mgmtNormalTableActionDecode(SSdbOperDesc *pOper) { assert(pOper->rowData != NULL); - SNormalTableObj *pTable = (SNormalTableObj *)malloc(sizeof(SNormalTableObj)); - if (pTable == NULL) { - return -1; - } - memset(pTable, 0, sizeof(SNormalTableObj)); + SNormalTableObj *pTable = (SNormalTableObj *)calloc(1, sizeof(SNormalTableObj)); + if (pTable == NULL) TSDB_CODE_SERV_OUT_OF_MEMORY; + memcpy(pTable, pOper->rowData, tsNormalTableUpdateSize); int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); @@ -176,6 +150,7 @@ static int32_t mgmtNormalTableActionDecode(SSdbOperDesc *pOper) { } memcpy(pTable->sql, pOper->rowData + tsNormalTableUpdateSize + schemaSize, pTable->sqlLen); + pOper->pObj = pTable; return TSDB_CODE_SUCCESS; } @@ -185,7 +160,7 @@ int32_t mgmtInitNormalTables() { SNormalTableObj *pTable = NULL; SNormalTableObj tObj; - tsNormalTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj; + tsNormalTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; SSdbTableDesc tableDesc = { .tableName = "ntables", @@ -261,15 +236,6 @@ int32_t mgmtInitNormalTables() { pNode = pLastNode; continue; } - - mgmtAddTableIntoVgroup(pVgroup, (STableInfo *)pTable); - //pVgroup->tableList[pTable->sid] = (STableInfo*)pTable; - taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid); - - pTable->sql = (char *)pTable->schema + sizeof(SSchema) * pTable->numOfColumns; - - SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); - mgmtAddTimeSeries(pAcct, pTable->numOfColumns - 1); } mTrace("ntables is initialized"); diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index eee8c0f1a0..e950761a31 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -309,11 +309,11 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) { rowMeta.row = oper.pObj; (*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta); pTable->numOfRows++; - sdbTrace("table:%s, read new record:%s, numOfRows:%d version:%" PRId64 , - pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version); + sdbTrace("table:%s, version:%" PRId64 " numOfRows:%d, read new record:%s", + pTable->tableName, pTable->version, pTable->numOfRows, sdbGetkeyStr(pTable, rowHead->data)); } else { - sdbError("table:%s, failed to decode record:%s, numOfRows:%d version:%" PRId64 , - pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version); + sdbTrace("table:%s, version:%" PRId64 " numOfRows:%d, failed to decode record:%s", + pTable->tableName, pTable->version, pTable->numOfRows, sdbGetkeyStr(pTable, rowHead->data)); } } } else { @@ -325,8 +325,8 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) { (*pTable->destroyFp)(&oper); (*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data); pTable->numOfRows--; - sdbTrace("table:%s, read deleted record:%s, numOfRows:%d version:%" PRId64 , - pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version); + sdbTrace("table:%s, version:%" PRId64 " numOfRows:%d, read deleted record:%s", + pTable->tableName, pTable->version, pTable->numOfRows, sdbGetkeyStr(pTable, rowHead->data)); } else { SRowMeta rowMeta; rowMeta.version = rowHead->version; @@ -346,11 +346,11 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) { if (code == TSDB_CODE_SUCCESS) { rowMeta.row = oper.pObj; (*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta); - sdbTrace("table:%s, read updated record:%s, numOfRows:%d version:%" PRId64 , - pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version); + sdbTrace("table:%s, version:%" PRId64 " numOfRows:%d, read updated record:%s", + pTable->tableName, pTable->version, pTable->numOfRows, sdbGetkeyStr(pTable, rowHead->data)); } else { - sdbError("table:%s, failed to decode record:%s, numOfRows:%d version:%" PRId64 , - pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version); + sdbTrace("table:%s, version:%" PRId64 " numOfRows:%d, failed to decode record:%s", + pTable->tableName, pTable->version, pTable->numOfRows, sdbGetkeyStr(pTable, rowHead->data)); } } numOfChanged++; @@ -416,8 +416,8 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { pTable->tableId = sdbNumOfTables++; sdbTableList[pTable->tableId] = pTable; - sdbTrace("table:%s, is initialized, numOfRows:%d numOfTables:%d version:%" PRId64 " sdbversion:%" PRId64, - pTable->tableName, pTable->numOfRows, sdbNumOfTables, pTable->version, sdbVersion); + sdbTrace("table:%s, is initialized, sdbversion:%" PRId64 " version:%" PRId64 " numOfRows:%d numOfTables:%d", + pTable->tableName, sdbVersion, pTable->version, pTable->numOfRows, sdbNumOfTables); return pTable; } @@ -531,8 +531,8 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) { pthread_mutex_unlock(&pTable->mutex); - sdbTrace("table:%s, inserte record:%s, sdbversion:%" PRId64 " version:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64, - pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version, pOper->rowSize, pTable->numOfRows, pTable->fileSize); + sdbTrace("table:%s, sdbversion:%" PRId64 " version:%" PRId64 " numOfRows:%d, insert record:%s, rowSize:%d fileSize:%" PRId64, + pTable->tableName, sdbVersion, pTable->version, pTable->numOfRows, sdbGetkeyStr(pTable, pOper->pObj), pOper->rowSize, pTable->fileSize); (*pTable->insertFp)(pOper); @@ -614,8 +614,9 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) { tfree(rowHead); - sdbTrace("table:%s, delete record:%s, sdbversion:%" PRId64 " version:%" PRId64 " numOfRows:%d", - pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version, pTable->numOfRows); + + sdbTrace("table:%s, sdbversion:%" PRId64 " version:%" PRId64 " numOfRows:%d, delete record:%s, rowSize:%d fileSize:%" PRId64, + pTable->tableName, sdbVersion, pTable->version, pTable->numOfRows, sdbGetkeyStr(pTable, pOper->pObj), pOper->rowSize, pTable->fileSize); // Delete from current layer (*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, pOper->pObj); @@ -702,9 +703,9 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) { pTable->fileSize += real_size; sdbFinishCommit(pTable); - sdbTrace("table:%s, update record:%s, sdbversion:%" PRId64 " version:%" PRId64 " numOfRows:%" PRId64, - pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version, pTable->numOfRows); - + sdbTrace("table:%s, sdbversion:%" PRId64 " version:%" PRId64 " numOfRows:%d, update record:%s, rowSize:%d fileSize:%" PRId64, + pTable->tableName, sdbVersion, pTable->version, pTable->numOfRows, sdbGetkeyStr(pTable, pOper->pObj), pOper->rowSize, pTable->fileSize); + pMeta->version = pTable->version; pMeta->offset = pTable->fileSize; pMeta->rowSize = rowHead->rowSize; diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index d3a0dcb6bc..f2b54ec000 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -111,7 +111,7 @@ static int32_t mgmtSuperTableActionDecode(SSdbOperDesc *pOper) { int32_t mgmtInitSuperTables() { SSuperTableObj tObj; - tsSuperTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj; + tsSuperTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; SSdbTableDesc tableDesc = { .tableName = "stables", diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 918547fa12..675de57924 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -22,6 +22,7 @@ #include "mgmtDClient.h" #include "mgmtDb.h" #include "mgmtDnode.h" +#include "mgmtDServer.h" #include "mgmtGrant.h" #include "mgmtMnode.h" #include "mgmtNormalTable.h" @@ -61,6 +62,7 @@ static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg); static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle); +static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg); int32_t mgmtInitTables() { int32_t code = mgmtInitSuperTables(); @@ -90,6 +92,7 @@ int32_t mgmtInitTables() { mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropTableRsp); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, mgmtProcessAlterTableRsp); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropStableRsp); + mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_TABLE, mgmtProcessTableCfgMsg); return TSDB_CODE_SUCCESS; } @@ -760,9 +763,19 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { if (rpcMsg->code != TSDB_CODE_SUCCESS) { if (pTable->type == TSDB_CHILD_TABLE) { - // sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL); + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsChildTableSdb, + .pObj = pTable + }; + sdbDeleteRow(&oper); } else if (pTable->type == TSDB_NORMAL_TABLE){ - // sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL); + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsNormalTableSdb, + .pObj = pTable + }; + sdbDeleteRow(&oper); } else {} mError("table:%s, failed to create in dnode, reason:%s", pTable->tableId, tstrerror(rpcMsg->code)); mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); @@ -816,19 +829,31 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { } if (pTable->type == TSDB_CHILD_TABLE) { - // if (sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL) < 0) { - // mError("table:%s, update ctables sdb error", pTable->tableId); - // mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); - // free(queueMsg); - // return; - // } + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsChildTableSdb, + .pObj = pTable + }; + int32_t code = sdbDeleteRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + mError("table:%s, update ctables sdb error", pTable->tableId); + mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); + free(queueMsg); + return; + } } else if (pTable->type == TSDB_NORMAL_TABLE){ - // if (sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL) < 0) { - // mError("table:%s, update ntables sdb error", pTable->tableId); - // mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); - // free(queueMsg); - // return; - // } + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsNormalTableSdb, + .pObj = pTable + }; + int32_t code = sdbDeleteRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + mError("table:%s, update ntables sdb error", pTable->tableId); + mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); + free(queueMsg); + return; + } } if (pVgroup->numOfTables <= 0) { @@ -844,32 +869,48 @@ static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg) { mTrace("drop stable rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); } -// -// -//static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle) { -// SDMConfigTableMsg *pCfg = (SDMConfigTableMsg *) pCont; -// pCfg->dnode = htonl(pCfg->dnode); -// pCfg->vnode = htonl(pCfg->vnode); -// pCfg->sid = htonl(pCfg->sid); -// mTrace("dnode:%s, vnode:%d, sid:%d, receive table config msg", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); -// -// if (!sdbMaster) { -// mError("dnode:%s, vnode:%d, sid:%d, not master, redirect it", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); -// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0); -// return; -// } -// -// STableInfo *pTable = mgmtGetTableByPos(pCfg->dnode, pCfg->vnode, pCfg->sid); -// if (pTable == NULL) { -// mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); -// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_INVALID_TABLE, NULL, 0); -// return; -// } -// -// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0); -// -// //TODO -// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode); -// mgmtSendCreateTableMsg(NULL, &ipSet, NULL); -//} -// \ No newline at end of file +static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) { + if (mgmtCheckRedirect(rpcMsg->handle)) return; + + SDMConfigTableMsg *pCfg = (SDMConfigTableMsg *) rpcMsg->pCont; + pCfg->dnode = htonl(pCfg->dnode); + pCfg->vnode = htonl(pCfg->vnode); + pCfg->sid = htonl(pCfg->sid); + mTrace("dnode:%s, vnode:%d, sid:%d, receive table config msg", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); + + STableInfo *pTable = mgmtGetTableByPos(pCfg->dnode, pCfg->vnode, pCfg->sid); + if (pTable == NULL) { + mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_TABLE); + return; + } + + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS); + + SMDCreateTableMsg *pMDCreate = NULL; + if (pTable->type == TSDB_CHILD_TABLE) { + mTrace("table:%s, is a child table, vgroup:%d sid:%d", pTable->tableId, pCfg->vnode, pCfg->sid); + pMDCreate = mgmtBuildCreateChildTableMsg(NULL, (SChildTableObj *) pTable); + if (pMDCreate == NULL) { + return; + } + } else if (pTable->type == TSDB_NORMAL_TABLE) { + mTrace("table:%s, is a normal table, vgroup:%d sid:%d", pTable->tableId, pCfg->vnode, pCfg->sid); + pMDCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable); + if (pMDCreate == NULL) { + return; + } + } else { + mError("table:%s, invalid msg type, vgroup:%d sid:%d", pTable->tableId, pCfg->vnode, pCfg->sid); + } + + SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode); + SRpcMsg rpcRsp = { + .handle = NULL, + .pCont = pMDCreate, + .contLen = htonl(pMDCreate->contLen), + .code = 0, + .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE + }; + mgmtSendMsgToDnode(&ipSet, &rpcRsp); +} diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index e55ba9ea60..576b326669 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -94,7 +94,7 @@ static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) { int32_t mgmtInitUsers() { SUserObj tObj; - tsUserUpdateSize = tObj.updateEnd - (int8_t *)&tObj; + tsUserUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; SSdbTableDesc tableDesc = { .tableName = "users", @@ -471,7 +471,7 @@ static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) { if (hasRight) { code = mgmtDropUser(pUser->pAcct, pDrop->user); if (code == TSDB_CODE_SUCCESS) { - mLPrint("user:%s is dropped by %s", pDrop->user, pUser->user); + mLPrint("user:%s is dropped by %s, result:%d", pUser->user, pOperUser->user, tstrerror(code)); } } else { code = TSDB_CODE_NO_RIGHTS; diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index b37b363077..9e52574b8c 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -142,7 +142,7 @@ static int32_t mgmtVgroupActionDecode(SSdbOperDesc *pOper) { int32_t mgmtInitVgroups() { SVgObj tObj; - tsVgUpdateSize = tObj.updateEnd - (int8_t *)&tObj; + tsVgUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; SSdbTableDesc tableDesc = { .tableName = "vgroups", @@ -433,13 +433,10 @@ void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable) { pVgroup->tableList[pTable->sid] = pTable; taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid); pVgroup->numOfTables++; - pVgroup->pDb->numOfTables++; } if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxSessions) mgmtAddVgroupIntoDbTail(pVgroup); - else - mgmtAddVgroupIntoDb(pVgroup); } void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable) { @@ -447,13 +444,10 @@ void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable) { pVgroup->tableList[pTable->sid] = NULL; taosFreeId(pVgroup->idPool, pTable->sid); pVgroup->numOfTables--; - pVgroup->pDb->numOfTables--; } if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxSessions) mgmtAddVgroupIntoDbTail(pVgroup); - else - mgmtAddVgroupIntoDb(pVgroup); } SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { diff --git a/src/util/src/tidpool.c b/src/util/src/tidpool.c index c3f429ac43..5ed6b8cf23 100644 --- a/src/util/src/tidpool.c +++ b/src/util/src/tidpool.c @@ -15,12 +15,13 @@ #include "os.h" #include "tlog.h" +#include typedef struct { int maxId; int numOfFree; int freeSlot; - int * freeList; + bool * freeList; pthread_mutex_t mutex; } id_pool_t; @@ -28,7 +29,7 @@ void *taosInitIdPool(int maxId) { id_pool_t *pIdPool = calloc(1, sizeof(id_pool_t)); if (pIdPool == NULL) return NULL; - pIdPool->freeList = malloc(sizeof(int) * (size_t)maxId); + pIdPool->freeList = calloc(maxId, sizeof(bool)); if (pIdPool->freeList == NULL) { free(pIdPool); return NULL; @@ -40,10 +41,6 @@ void *taosInitIdPool(int maxId) { pthread_mutex_init(&pIdPool->mutex, NULL); - for (int i = 0; i < maxId; ++i) { - pIdPool->freeList[i] = -1; - } - pTrace("pool:%p is setup, maxId:%d", pIdPool, pIdPool->maxId); return pIdPool; @@ -61,8 +58,8 @@ int taosAllocateId(void *handle) { if (pIdPool->numOfFree > 0) { for (int i = 0; i < pIdPool->maxId; ++i) { slot = (i + pIdPool->freeSlot) % pIdPool->maxId; - if (pIdPool->freeList[slot] == -1) { - pIdPool->freeList[slot] = slot; + if (!pIdPool->freeList[slot]) { + pIdPool->freeList[slot] = true; pIdPool->freeSlot = slot + 1; pIdPool->numOfFree--; break; @@ -71,7 +68,7 @@ int taosAllocateId(void *handle) { } pthread_mutex_unlock(&pIdPool->mutex); - return slot; + return slot + 1; } void taosFreeId(void *handle, int id) { @@ -80,9 +77,9 @@ void taosFreeId(void *handle, int id) { pthread_mutex_lock(&pIdPool->mutex); - int slot = id % pIdPool->maxId; - if (pIdPool->freeList[slot] != -1) { - pIdPool->freeList[slot] = -1; + int slot = (id - 1) % pIdPool->maxId; + if (pIdPool->freeList[slot]) { + pIdPool->freeList[slot] = false; pIdPool->numOfFree++; } @@ -114,9 +111,9 @@ void taosIdPoolMarkStatus(void *handle, int id) { id_pool_t *pIdPool = handle; pthread_mutex_lock(&pIdPool->mutex); - int slot = id % pIdPool->maxId; - if (pIdPool->freeList[slot] == -1) { - pIdPool->freeList[slot] = slot; + int slot = (id - 1) % pIdPool->maxId; + if (!pIdPool->freeList[slot]) { + pIdPool->freeList[slot] = true; pIdPool->numOfFree--; } @@ -129,18 +126,14 @@ int taosUpdateIdPool(id_pool_t *handle, int maxId) { return -1; } - int *idList = malloc(sizeof(int) * maxId); + int *idList = calloc(maxId, sizeof(bool)); if (idList == NULL) { return -1; } pthread_mutex_lock(&pIdPool->mutex); - for (int i = 0; i < maxId; ++i) { - idList[i] = -1; - } - - memcpy(idList, pIdPool->freeList, sizeof(int) * pIdPool->maxId); + memcpy(idList, pIdPool->freeList, sizeof(bool) * pIdPool->maxId); pIdPool->numOfFree += (maxId - pIdPool->maxId); pIdPool->maxId = maxId; -- GitLab