From 51171c75de64ec31a2185c9547360a28d45ce373 Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 23 Mar 2020 22:58:09 +0800 Subject: [PATCH] [TD-15] --- src/mnode/inc/mgmtChildTable.h | 1 + src/mnode/src/mgmtChildTable.c | 123 +++++++++++++++++------ src/mnode/src/mgmtDnode.c | 3 - src/mnode/src/mgmtNormalTable.c | 115 +++++++++++++++------- src/mnode/src/mgmtSuperTable.c | 168 +++++++++++++++----------------- src/mnode/src/mgmtTable.c | 43 ++++---- 6 files changed, 271 insertions(+), 182 deletions(-) diff --git a/src/mnode/inc/mgmtChildTable.h b/src/mnode/inc/mgmtChildTable.h index b16dd58f67..9252a7d485 100644 --- a/src/mnode/inc/mgmtChildTable.h +++ b/src/mnode/inc/mgmtChildTable.h @@ -39,6 +39,7 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp); void mgmtDropAllChildTables(SDbObj *pDropDb); +void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable); #ifdef __cplusplus } diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 9d3c3c634d..d717a4e5e9 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -42,14 +42,14 @@ static void mgmtDestroyChildTable(SChildTableObj *pTable) { tfree(pTable); } -static int32_t mgmtChildTableActionDestroy(void *pObj) { - SChildTableObj *pTable = (SChildTableObj *)pObj; +static int32_t mgmtChildTableActionDestroy(SSdbOperDesc *pOper) { + SChildTableObj *pTable = pOper->pObj; mgmtDestroyChildTable(pTable); return TSDB_CODE_SUCCESS; } -static int32_t mgmtChildTableActionInsert(void *pObj) { - SChildTableObj *pTable = (SChildTableObj *) pObj; +static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) { + SChildTableObj *pTable = pOper->pObj; SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { @@ -84,15 +84,15 @@ static int32_t mgmtChildTableActionInsert(void *pObj) { mgmtAddTableIntoDb(pDb); mgmtAddTableIntoVgroup(pVgroup, (STableInfo *) pTable); - if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) { - mgmtMoveVgroupToTail(pDb, pVgroup); + if (pVgroup->numOfTables >= pDb->cfg.maxSessions && pDb->numOfVgroups > 1) { + mgmtMoveVgroupToTail(pVgroup); } return TSDB_CODE_SUCCESS; } -static int32_t mgmtChildTableActionDelete(void *pObj) { - SChildTableObj *pTable = (SChildTableObj *) pObj; +static int32_t mgmtChildTableActionDelete(SSdbOperDesc *pOper) { + SChildTableObj *pTable = pOper->pObj; if (pTable->vgId == 0) { return TSDB_CODE_INVALID_VGROUP_ID; } @@ -121,33 +121,35 @@ static int32_t mgmtChildTableActionDelete(void *pObj) { mgmtRemoveTableFromSuperTable(pTable->superTable); if (pVgroup->numOfTables > 0) { - mgmtMoveVgroupToHead(pDb, pVgroup); + mgmtMoveVgroupToHead(pVgroup); } return TSDB_CODE_SUCCESS; } -static int32_t mgmtChildTableActionUpdate(void *pObj) { +static int32_t mgmtChildTableActionUpdate(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtChildTableActionEncode(void *pObj, void *pData, int32_t maxRowSize) { - SChildTableObj *pTable = (SChildTableObj *) pObj; - assert(pObj != NULL && pData != NULL); +static int32_t mgmtChildTableActionEncode(SSdbOperDesc *pOper) { + SChildTableObj *pTable = pOper->pObj; + assert(pTable != NULL && pOper->rowData != NULL); - memcpy(pData, pTable, tsChildTableUpdateSize); + memcpy(pOper->rowData, pTable, tsChildTableUpdateSize); return tsChildTableUpdateSize; } -static void *mgmtChildTableActionDecode(void *pData) { - assert(pData != NULL); +static int32_t mgmtChildTableActionDecode(SSdbOperDesc *pOper) { + assert(pOper->rowData != NULL); - SChildTableObj *pTable = (SChildTableObj *)calloc(sizeof(SChildTableObj), 1); - if (pTable == NULL) return NULL; + pOper->pObj = calloc(1, sizeof(SChildTableObj)); + if (pOper->pObj == NULL) { + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } - memcpy(pTable, pData, tsChildTableUpdateSize); + memcpy(pOper->pObj, pOper->rowData, tsChildTableUpdateSize); - return (void *)pTable; + return TSDB_CODE_SUCCESS; } int32_t mgmtInitChildTables() { @@ -162,7 +164,7 @@ int32_t mgmtInitChildTables() { .tableName = "ctables", .hashSessions = tsMaxTables, .maxRowSize = tsChildTableUpdateSize, - .keyType = SDB_KEYTYPE_STRING, + .keyType = SDB_KEY_TYPE_STRING, .insertFp = mgmtChildTableActionInsert, .deleteFp = mgmtChildTableActionDelete, .updateFp = mgmtChildTableActionUpdate, @@ -187,7 +189,11 @@ int32_t mgmtInitChildTables() { SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); if (pDb == NULL) { mError("ctable:%s, failed to get db, discard it", pTable->tableId); - sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK); + SSdbOperDesc desc = {0}; + desc.type = SDB_OPER_TYPE_LOCAL; + desc.pObj = pTable; + desc.table = tsChildTableSdb; + sdbDeleteRow(&desc); pNode = pLastNode; continue; } @@ -196,7 +202,11 @@ int32_t mgmtInitChildTables() { if (pVgroup == NULL) { mError("ctable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->tableId, pTable->vgId, pTable->sid); pTable->vgId = 0; - sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK); + SSdbOperDesc desc = {0}; + desc.type = SDB_OPER_TYPE_LOCAL; + desc.pObj = pTable; + desc.table = tsChildTableSdb; + sdbDeleteRow(&desc); pNode = pLastNode; continue; } @@ -205,7 +215,11 @@ int32_t mgmtInitChildTables() { mError("ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it", pTable->tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid); pTable->vgId = 0; - sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK); + SSdbOperDesc desc = {0}; + desc.type = SDB_OPER_TYPE_LOCAL; + desc.pObj = pTable; + desc.table = tsChildTableSdb; + sdbDeleteRow(&desc); pNode = pLastNode; continue; } @@ -213,19 +227,27 @@ int32_t mgmtInitChildTables() { if (pVgroup->tableList == NULL) { mError("ctable:%s, vgroup:%d tableList is null", pTable->tableId, pTable->vgId); pTable->vgId = 0; - sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK); + SSdbOperDesc desc = {0}; + desc.type = SDB_OPER_TYPE_LOCAL; + desc.pObj = pTable; + desc.table = tsChildTableSdb; + sdbDeleteRow(&desc); pNode = pLastNode; continue; } pVgroup->tableList[pTable->sid] = (STableInfo*)pTable; - taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid, 1); + taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid); SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTable->superTableId); if (pSuperTable == NULL) { mError("ctable:%s, stable:%s not exist", pTable->tableId, pTable->superTableId); pTable->vgId = 0; - sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK); + SSdbOperDesc desc = {0}; + desc.type = SDB_OPER_TYPE_LOCAL; + desc.pObj = pTable; + desc.table = tsChildTableSdb; + sdbDeleteRow(&desc); pNode = pLastNode; continue; } @@ -310,7 +332,13 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t pTable->vgId = pVgroup->vgId; pTable->superTable = pSuperTable; - if (sdbInsertRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL) < 0) { + SSdbOperDesc desc = {0}; + desc.type = SDB_OPER_TYPE_GLOBAL; + desc.pObj = pTable; + desc.table = tsChildTableSdb; + sdbInsertRow(&desc); + + if (sdbInsertRow(&desc) < 0) { free(pTable); mError("ctable:%s, update sdb error", pCreate->tableId); terrno = TSDB_CODE_SDB_ERROR; @@ -453,12 +481,45 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) { } if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { - sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_LOCAL); + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_LOCAL, + .table = tsChildTableSdb, + .pObj = pTable, + }; + sdbDeleteRow(&oper); + pNode = pLastNode; + numOfTables++; + continue; + } + } + + mTrace("db:%s, all child tables:%d is dropped from sdb", pDropDb->name, numOfTables); +} + +void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) { + void *pNode = NULL; + void *pLastNode = NULL; + int32_t numOfTables = 0; + SChildTableObj *pTable = NULL; + + while (1) { + pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable); + if (pTable == NULL) { + break; + } + + if (pTable->superTable == pStable) { + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_LOCAL, + .table = tsChildTableSdb, + .pObj = pTable, + }; + sdbDeleteRow(&oper); pNode = pLastNode; - numOfTables ++; + numOfTables++; continue; } } - mTrace("db:%s, all child tables:%d is dropped", pDropDb->name, numOfTables); + mTrace("stable:%s, all child tables:%d is dropped from sdb", pStable->tableId, numOfTables); } \ No newline at end of file diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 0d83e262ae..0aab083471 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -547,9 +547,6 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { return ; } - uint32_t lastPrivateIp = pDnode->privateIp; - uint32_t lastPublicIp = pDnode->publicIp; - pDnode->privateIp = htonl(pStatus->privateIp); pDnode->publicIp = htonl(pStatus->publicIp); pDnode->lastReboot = htonl(pStatus->lastReboot); diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index 9140a4c752..0ffd4fb28e 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -42,14 +42,14 @@ static void mgmtDestroyNormalTable(SNormalTableObj *pTable) { tfree(pTable); } -static int32_t mgmtNormalTableActionDestroy(void *pObj) { - SNormalTableObj *pTable = (SNormalTableObj *)pObj; +static int32_t mgmtNormalTableActionDestroy(SSdbOperDesc *pOper) { + SNormalTableObj *pTable = pOper->pObj; mgmtDestroyNormalTable(pTable); return TSDB_CODE_SUCCESS; } -static int32_t mgmtNormalTableActionInsert(void *pObj) { - SNormalTableObj *pTable = (SNormalTableObj *) pObj; +static int32_t mgmtNormalTableActionInsert(SSdbOperDesc *pOper) { + SNormalTableObj *pTable = pOper->pObj; SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { @@ -82,14 +82,14 @@ static int32_t mgmtNormalTableActionInsert(void *pObj) { mgmtAddTableIntoVgroup(pVgroup, (STableInfo *) pTable); if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) { - mgmtMoveVgroupToTail(pDb, pVgroup); + mgmtMoveVgroupToTail(pVgroup); } return TSDB_CODE_SUCCESS; } -static int32_t mgmtNormalTableActionDelete(void *pObj) { - SNormalTableObj *pTable = (SNormalTableObj *) pObj; +static int32_t mgmtNormalTableActionDelete(SSdbOperDesc *pOper) { + SNormalTableObj *pTable = pOper->pObj; if (pTable->vgId == 0) { return TSDB_CODE_INVALID_VGROUP_ID; } @@ -116,13 +116,13 @@ static int32_t mgmtNormalTableActionDelete(void *pObj) { mgmtRemoveTableFromVgroup(pVgroup, (STableInfo *) pTable); if (pVgroup->numOfTables > 0) { - mgmtMoveVgroupToHead(pDb, pVgroup); + mgmtMoveVgroupToHead(pVgroup); } return TSDB_CODE_SUCCESS; } -static int32_t mgmtNormalTableActionUpdate(void *pObj) { +static int32_t mgmtNormalTableActionUpdate(SSdbOperDesc *pOper) { // SNormalTableObj *pTable = (SNormalTableObj *) pObj; // memcpy(pTable, str, tsNormalTableUpdateSize); @@ -134,48 +134,49 @@ static int32_t mgmtNormalTableActionUpdate(void *pObj) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtNormalTableActionEncode(void *pObj, void *pData, int32_t maxRowSize) { - SNormalTableObj *pTable = (SNormalTableObj *) pObj; - assert(pObj != NULL && pData != NULL); +static int32_t mgmtNormalTableActionEncode(SSdbOperDesc *pOper) { + SNormalTableObj *pTable = pOper->pObj; + assert(pOper->pObj != NULL && pOper->rowData != NULL); int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); - if (maxRowSize < tsNormalTableUpdateSize + schemaSize + 1) { + if (pOper->maxRowSize < tsNormalTableUpdateSize + schemaSize + 1) { return -1; } - memcpy(pData, pTable, tsNormalTableUpdateSize); - memcpy(pData + tsNormalTableUpdateSize, pTable->schema, schemaSize); - memcpy(pData + tsNormalTableUpdateSize + schemaSize, pTable->sql, pTable->sqlLen); + memcpy(pOper->rowData, pTable, tsNormalTableUpdateSize); + memcpy(pOper->rowData + tsNormalTableUpdateSize, pTable->schema, schemaSize); + memcpy(pOper->rowData + tsNormalTableUpdateSize + schemaSize, pTable->sql, pTable->sqlLen); return tsNormalTableUpdateSize + schemaSize + pTable->sqlLen; } -static void *mgmtNormalTableActionDecode(void *pData) { - assert(pData != NULL); +static int32_t mgmtNormalTableActionDecode(SSdbOperDesc *pOper) { + assert(pOper->rowData != NULL); SNormalTableObj *pTable = (SNormalTableObj *)malloc(sizeof(SNormalTableObj)); if (pTable == NULL) { - return NULL; + return -1; } memset(pTable, 0, sizeof(SNormalTableObj)); - memcpy(pTable, pData, tsNormalTableUpdateSize); + memcpy(pTable, pOper->rowData, tsNormalTableUpdateSize); int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); pTable->schema = (SSchema *)malloc(schemaSize); if (pTable->schema == NULL) { mgmtDestroyNormalTable(pTable); - return NULL; + return -1; } - memcpy(pTable->schema, pData + tsNormalTableUpdateSize, schemaSize); + memcpy(pTable->schema, pOper->rowData + tsNormalTableUpdateSize, schemaSize); pTable->sql = (char *)malloc(pTable->sqlLen); if (pTable->sql == NULL) { mgmtDestroyNormalTable(pTable); - return NULL; + return -1; } - memcpy(pTable->sql, pData + tsNormalTableUpdateSize + schemaSize, pTable->sqlLen); - return (void *)pTable; + memcpy(pTable->sql, pOper->rowData + tsNormalTableUpdateSize + schemaSize, pTable->sqlLen); + + return TSDB_CODE_SUCCESS; } int32_t mgmtInitNormalTables() { @@ -190,7 +191,7 @@ int32_t mgmtInitNormalTables() { .tableName = "ntables", .hashSessions = TSDB_MAX_NORMAL_TABLES, .maxRowSize = sizeof(SNormalTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS, - .keyType = SDB_KEYTYPE_STRING, + .keyType = SDB_KEY_TYPE_STRING, .insertFp = mgmtNormalTableActionInsert, .deleteFp = mgmtNormalTableActionDelete, .updateFp = mgmtNormalTableActionUpdate, @@ -213,7 +214,11 @@ int32_t mgmtInitNormalTables() { SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); if (pDb == NULL) { mError("ntable:%s, failed to get db, discard it", pTable->tableId); - sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_DISK); + SSdbOperDesc desc = {0}; + desc.type = SDB_OPER_TYPE_LOCAL; + desc.pObj = pTable; + desc.table = tsNormalTableSdb; + sdbDeleteRow(&desc); pNode = pLastNode; continue; } @@ -222,7 +227,11 @@ int32_t mgmtInitNormalTables() { if (pVgroup == NULL) { mError("ntable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->tableId, pTable->vgId, pTable->sid); pTable->vgId = 0; - sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_DISK); + SSdbOperDesc desc = {0}; + desc.type = SDB_OPER_TYPE_LOCAL; + desc.pObj = pTable; + desc.table = tsNormalTableSdb; + sdbDeleteRow(&desc); pNode = pLastNode; continue; } @@ -231,7 +240,12 @@ int32_t mgmtInitNormalTables() { mError("ntable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it", pTable->tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid); pTable->vgId = 0; - sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_DISK); + + SSdbOperDesc desc = {0}; + desc.type = SDB_OPER_TYPE_LOCAL; + desc.pObj = pTable; + desc.table = tsNormalTableSdb; + sdbDeleteRow(&desc); pNode = pLastNode; continue; } @@ -239,14 +253,18 @@ int32_t mgmtInitNormalTables() { if (pVgroup->tableList == NULL) { mError("ntable:%s, vgroup:%d tableList is null", pTable->tableId, pTable->vgId); pTable->vgId = 0; - sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_DISK); + SSdbOperDesc desc = {0}; + desc.type = SDB_OPER_TYPE_LOCAL; + desc.pObj = pTable; + desc.table = tsNormalTableSdb; + sdbDeleteRow(&desc); pNode = pLastNode; continue; } mgmtAddTableIntoVgroup(pVgroup, (STableInfo *)pTable); //pVgroup->tableList[pTable->sid] = (STableInfo*)pTable; - taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid, 1); + taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid); pTable->sql = (char *)pTable->schema + sizeof(SSchema) * pTable->numOfColumns; @@ -346,7 +364,11 @@ void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t mTrace("table:%s, stream sql len:%d sql:%s", pTable->tableId, pTable->sqlLen, pTable->sql); } - if (sdbInsertRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL) < 0) { + SSdbOperDesc desc = {0}; + desc.type = SDB_OPER_TYPE_GLOBAL; + desc.pObj = pTable; + desc.table = tsNormalTableSdb; + if (sdbInsertRow(&desc) < 0) { mError("table:%s, update sdb error", pTable->tableId); free(pTable); terrno = TSDB_CODE_SDB_ERROR; @@ -443,7 +465,14 @@ int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int3 pTable->sversion++; pAcct->acctInfo.numOfTimeSeries += ncols; - sdbUpdateRow(tsNormalTableSdb, pTable, tsNormalTableUpdateSize, SDB_OPER_GLOBAL); + SSdbOperDesc desc = {0}; + desc.type = SDB_OPER_TYPE_GLOBAL; + desc.pObj = pTable; + desc.table = tsNormalTableSdb; + desc.rowData = pTable; + desc.rowSize = tsNormalTableUpdateSize; + sdbUpdateRow(&desc); + return TSDB_CODE_SUCCESS; } @@ -472,7 +501,14 @@ int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName) pTable->sversion++; pAcct->acctInfo.numOfTimeSeries--; - sdbUpdateRow(tsNormalTableSdb, pTable, tsNormalTableUpdateSize, SDB_OPER_GLOBAL); + + SSdbOperDesc desc = {0}; + desc.type = SDB_OPER_TYPE_GLOBAL; + desc.pObj = pTable; + desc.table = tsNormalTableSdb; + desc.rowData = pTable; + desc.rowSize = tsNormalTableUpdateSize; + sdbUpdateRow(&desc); return TSDB_CODE_SUCCESS; } @@ -533,12 +569,17 @@ void mgmtDropAllNormalTables(SDbObj *pDropDb) { if (pTable == NULL) break; if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { - sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_LOCAL); + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_LOCAL, + .table = tsNormalTableSdb, + .pObj = pTable, + }; + sdbDeleteRow(&oper); pNode = pLastNode; - numOfTables ++; + numOfTables++; continue; } } - mTrace("db:%s, all normal tables:%d is dropped", pDropDb->name, numOfTables); + mTrace("db:%s, all normal tables:%d is dropped from sdb", pDropDb->name, numOfTables); } diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index a4949a74c4..d3a0dcb6bc 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -32,93 +32,84 @@ #include "name.h" #include "tsqlfunction.h" -static void *tsSuperTableSdb; +static void *tsSuperTableSdb; static int32_t tsSuperTableUpdateSize; static int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); -static void mgmtDestroySuperTable(SSuperTableObj *pTable) { - tfree(pTable->schema); - tfree(pTable); +static void mgmtDestroySuperTable(SSuperTableObj *pStable) { + tfree(pStable->schema); + tfree(pStable); } -static int32_t mgmtSuperTableActionDestroy(void *pObj) { - SSuperTableObj *pTable = (SSuperTableObj *) pObj; - mgmtDestroySuperTable(pTable); +static int32_t mgmtSuperTableActionDestroy(SSdbOperDesc *pOper) { + mgmtDestroySuperTable(pOper->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mgmtSuperTableActionInsert(void *pObj) { - STableInfo *pTable = (STableInfo *) pObj; - SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); - if (pDb) { +static int32_t mgmtSuperTableActionInsert(SSdbOperDesc *pOper) { + STableInfo *pStable = pOper->pObj; + SDbObj *pDb = mgmtGetDbByTableId(pStable->tableId); + if (pDb != NULL) { mgmtAddSuperTableIntoDb(pDb); } return TSDB_CODE_SUCCESS; } -static int32_t mgmtSuperTableActionDelete(void *pObj) { - STableInfo *pTable = (STableInfo *) pObj; - SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); - if (pDb) { +static int32_t mgmtSuperTableActionDelete(SSdbOperDesc *pOper) { + STableInfo *pStable = pOper->pObj; + SDbObj *pDb = mgmtGetDbByTableId(pStable->tableId); + if (pDb != NULL) { mgmtRemoveSuperTableFromDb(pDb); + mgmtDropAllChildTablesInStable((SSuperTableObj *)pStable); } return TSDB_CODE_SUCCESS; } -static int32_t mgmtSuperTableActionUpdate(void *pObj) { - SSuperTableObj *pTable = (SSuperTableObj *) pObj; - memcpy(pTable, pObj, tsSuperTableUpdateSize); - - int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags); - pTable->schema = realloc(pTable->schema, schemaSize); - memcpy(pTable->schema, pObj + tsSuperTableUpdateSize, schemaSize); - +static int32_t mgmtSuperTableActionUpdate(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtSuperTableActionEncode(void *pObj, void *pData, int32_t maxRowSize) { - SSuperTableObj *pTable = (SSuperTableObj *) pObj; - assert(pObj != NULL && pData != NULL); +static int32_t mgmtSuperTableActionEncode(SSdbOperDesc *pOper) { + SSuperTableObj *pStable = pOper->pObj; + assert(pOper->pObj != NULL && pOper->rowData != NULL); - int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags); + int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags); - if (maxRowSize < tsSuperTableUpdateSize + schemaSize + 1) { + if (pOper->maxRowSize < tsSuperTableUpdateSize + schemaSize) { return TSDB_CODE_INVALID_MSG_LEN; } - memcpy(pData, pTable, tsSuperTableUpdateSize); - memcpy(pData + tsSuperTableUpdateSize, pTable->schema, schemaSize); - return tsSuperTableUpdateSize + schemaSize; + memcpy(pOper->rowData, pStable, tsSuperTableUpdateSize); + memcpy(pOper->rowData + tsSuperTableUpdateSize, pStable->schema, schemaSize); + pOper->rowSize = tsSuperTableUpdateSize + schemaSize; + + return TSDB_CODE_SUCCESS; } -static void *mgmtSuperTableActionDecode(void *pData) { - assert(pData != NULL); +static int32_t mgmtSuperTableActionDecode(SSdbOperDesc *pOper) { + assert(pOper->rowData != NULL); - SSuperTableObj *pTable = (SSuperTableObj *) malloc(sizeof(SSuperTableObj)); - if (pTable == NULL) { - return NULL; - } - memset(pTable, 0, sizeof(SSuperTableObj)); - memcpy(pTable, pData, tsSuperTableUpdateSize); + SSuperTableObj *pStable = (SSuperTableObj *) calloc(1, sizeof(SSuperTableObj)); + if (pStable == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; - int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags); - pTable->schema = malloc(schemaSize); - if (pTable->schema == NULL) { - mgmtDestroySuperTable(pTable); - return NULL; + memcpy(pStable, pOper->rowData, tsSuperTableUpdateSize); + + int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags); + pStable->schema = malloc(schemaSize); + if (pStable->schema == NULL) { + mgmtDestroySuperTable(pStable); + return -1; } - memcpy(pTable->schema, pData + tsSuperTableUpdateSize, schemaSize); - return (void *) pTable; + memcpy(pStable->schema, pOper->rowData + tsSuperTableUpdateSize, schemaSize); + pOper->pObj = pStable; + + return TSDB_CODE_SUCCESS; } int32_t mgmtInitSuperTables() { - void *pNode = NULL; - void *pLastNode = NULL; - SSuperTableObj *pTable = NULL; - SSuperTableObj tObj; tsSuperTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj; @@ -126,7 +117,7 @@ int32_t mgmtInitSuperTables() { .tableName = "stables", .hashSessions = TSDB_MAX_SUPER_TABLES, .maxRowSize = tsSuperTableUpdateSize + sizeof(SSchema) * TSDB_MAX_COLUMNS, - .keyType = SDB_KEYTYPE_STRING, + .keyType = SDB_KEY_TYPE_STRING, .insertFp = mgmtSuperTableActionInsert, .deleteFp = mgmtSuperTableActionDelete, .updateFp = mgmtSuperTableActionUpdate, @@ -141,24 +132,6 @@ int32_t mgmtInitSuperTables() { return -1; } - pNode = NULL; - while (1) { - pNode = sdbFetchRow(tsSuperTableSdb, pNode, (void **) &pTable); - if (pTable == NULL) { - break; - } - - SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); - if (pDb == NULL) { - mError("super table:%s, failed to get db, discard it", pTable->tableId); - sdbDeleteRow(tsSuperTableSdb, pTable, SDB_OPER_DISK); - pNode = pLastNode; - continue; - } - - mgmtAddSuperTableIntoDb(pDb); - } - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mgmtGetShowSuperTableMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mgmtRetrieveShowSuperTables); @@ -171,7 +144,7 @@ void mgmtCleanUpSuperTables() { } int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) { - SSuperTableObj *pStable = (SSuperTableObj *)calloc(sizeof(SSuperTableObj), 1); + SSuperTableObj *pStable = (SSuperTableObj *)calloc(1, sizeof(SSuperTableObj)); if (pStable == NULL) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -203,13 +176,21 @@ int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) { tschema[col].bytes = htons(tschema[col].bytes); } - if (sdbInsertRow(tsSuperTableSdb, pStable, SDB_OPER_GLOBAL) < 0) { - mError("stable:%s, update sdb error", pStable->tableId); + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsSuperTableSdb, + .pObj = pStable, + .rowSize = sizeof(SSuperTableObj) + schemaSize + }; + + int32_t code = sdbInsertRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + mgmtDestroySuperTable(pStable); return TSDB_CODE_SDB_ERROR; + } else { + mLPrint("stable:%s, is created, tags:%d cols:%d", pStable->tableId, pStable->numOfTags, pStable->numOfColumns); + return TSDB_CODE_SUCCESS; } - - mPrint("stable:%s, is created, tags:%d cols:%d", pStable->tableId, pStable->numOfTags, pStable->numOfColumns); - return TSDB_CODE_SUCCESS; } int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pStable) { @@ -217,10 +198,14 @@ int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pSta mError("stable:%s, numOfTables:%d not 0", pStable->tableId, pStable->numOfTables); return TSDB_CODE_OTHERS; } else { - //TODO: drop child tables - mError("stable:%s, is dropped from sdb", pStable->tableId); - mgmtRemoveSuperTableFromDb(pDb); - return TSDB_CODE_OTHERS; + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsSuperTableSdb, + .pObj = pStable + }; + int32_t code = sdbDeleteRow(&oper); + mLPrint("stable:%s, is dropped from sdb, result:%s", pStable->tableId, tstrerror(code)); + return code; } } @@ -229,10 +214,9 @@ void* mgmtGetSuperTable(char *tableId) { } void *mgmtGetSuperTableVgroup(SSuperTableObj *pStable) { - //TODO get vgroup of dnodes SCMSuperTableInfoRsp *rsp = rpcMallocCont(sizeof(SCMSuperTableInfoRsp) + sizeof(uint32_t) * mgmtGetDnodesNum()); - rsp->numOfDnodes = 1; - rsp->dnodeIps[0] = 0; + rsp->numOfDnodes = htonl(1); + rsp->dnodeIps[0] = htonl(inet_addr(tsPrivateIp)); return rsp; } @@ -289,7 +273,7 @@ int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t pStable->sversion++; pAcct->acctInfo.numOfTimeSeries += (ntags * pStable->numOfTables); - sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); + // sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); mTrace("Succeed to add tag column %s to table %s", schema[0].name, pStable->tableId); return TSDB_CODE_SUCCESS; @@ -322,7 +306,7 @@ int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns); pStable->schema = realloc(pStable->schema, schemaSize); - sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); + // sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); return TSDB_CODE_SUCCESS; } @@ -354,7 +338,8 @@ int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *oldTagN // mgmtSuperTableActionEncode(pStable, msg, size, &rowSize); - int32_t ret = sdbUpdateRow(tsSuperTableSdb, msg, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); + int32_t ret = 0; + // int32_t ret = sdbUpdateRow(tsSuperTableSdb, msg, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); tfree(msg); if (ret < 0) { @@ -416,7 +401,7 @@ int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[], int32 pStable->sversion++; pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables); - sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); + // sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); return TSDB_CODE_SUCCESS; } @@ -449,7 +434,7 @@ int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pStable, char *colName) { pStable->schema = realloc(pStable->schema, schemaSize); pAcct->acctInfo.numOfTimeSeries -= (pStable->numOfTables); - sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); + // sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); return TSDB_CODE_SUCCESS; } @@ -588,14 +573,19 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) { } if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { - sdbDeleteRow(tsSuperTableSdb, pTable, SDB_OPER_GLOBAL); + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_LOCAL, + .table = tsSuperTableSdb, + .pObj = pTable, + }; + sdbDeleteRow(&oper); pNode = pLastNode; numOfTables ++; continue; } } - mTrace("db:%s, all super tables:%d is dropped", pDropDb->name, numOfTables); + mTrace("db:%s, all super tables:%d is dropped from sdb", pDropDb->name, numOfTables); } void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable) { diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 255b2a6f27..918547fa12 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -78,8 +78,6 @@ int32_t mgmtInitTables() { return code; } - mgmtSetVgroupIdPool(); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_TABLE, mgmtProcessCreateTableMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_TABLE, mgmtProcessDropTableMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_TABLE, mgmtProcessAlterTableMsg); @@ -202,13 +200,13 @@ int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) pShow->bytes[cols] = TSDB_TABLE_NAME_LEN; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "table_name"); + strcpy(pSchema[cols].name, "table name"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "created_time"); + strcpy(pSchema[cols].name, "create time"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -220,7 +218,7 @@ int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) pShow->bytes[cols] = TSDB_TABLE_NAME_LEN; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "stable_name"); + strcpy(pSchema[cols].name, "stable name"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -238,9 +236,6 @@ int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) return 0; } -/* - * remove the hole in result set - */ static void mgmtVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) { if (rows < capacity) { for (int32_t i = 0; i < numOfCols; ++i) { @@ -434,11 +429,13 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { pTable = mgmtCreateChildTable(pCreate, pVgroup, sid); if (pTable == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); + mgmtFreeQueuedMsg(newMsg); return; } pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, (SChildTableObj *) pTable); if (pMDCreate == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); + mgmtFreeQueuedMsg(newMsg); return; } } else { @@ -446,11 +443,13 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { pTable = mgmtCreateNormalTable(pCreate, pVgroup, sid); if (pTable == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); + mgmtFreeQueuedMsg(newMsg); return; } pMDCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable); if (pMDCreate == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); + mgmtFreeQueuedMsg(newMsg); return; } } @@ -761,9 +760,9 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { if (rpcMsg->code != TSDB_CODE_SUCCESS) { if (pTable->type == TSDB_CHILD_TABLE) { - sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL); + // sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL); } else if (pTable->type == TSDB_NORMAL_TABLE){ - sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL); + // sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL); } else {} mError("table:%s, failed to create in dnode, reason:%s", pTable->tableId, tstrerror(rpcMsg->code)); mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); @@ -817,19 +816,19 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { } if (pTable->type == TSDB_CHILD_TABLE) { - if (sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL) < 0) { - mError("table:%s, update ctables sdb error", pTable->tableId); - mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); - free(queueMsg); - return; - } + // if (sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL) < 0) { + // mError("table:%s, update ctables sdb error", pTable->tableId); + // mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); + // free(queueMsg); + // return; + // } } else if (pTable->type == TSDB_NORMAL_TABLE){ - if (sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL) < 0) { - mError("table:%s, update ntables sdb error", pTable->tableId); - mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); - free(queueMsg); - return; - } + // if (sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL) < 0) { + // mError("table:%s, update ntables sdb error", pTable->tableId); + // mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); + // free(queueMsg); + // return; + // } } if (pVgroup->numOfTables <= 0) { -- GitLab