diff --git a/src/mnode/inc/mgmtMnode.h b/src/mnode/inc/mgmtMnode.h index d381b09055b8ca9cc315ad53c5c8865f9a0e5f98..729d31544f4c49f729e8c6d958a5a038661e225b 100644 --- a/src/mnode/inc/mgmtMnode.h +++ b/src/mnode/inc/mgmtMnode.h @@ -23,6 +23,9 @@ extern "C" { int32_t mgmtInitMnodes(); void mgmtCleanupMnodes(); +bool mgmtInServerStatus(); +bool mgmtIsMaster(); + bool mgmtCheckRedirect(void *handle); void mgmtGetMnodePrivateIpList(SRpcIpSet *ipSet); void mgmtGetMnodePublicIpList(SRpcIpSet *ipSet); diff --git a/src/mnode/inc/mgmtSdb.h b/src/mnode/inc/mgmtSdb.h index c64db286c6802562d51b74ddf0955ee005147635..7381ec6669c8ab2512759eb02cbe5e6ff582dbb6 100644 --- a/src/mnode/inc/mgmtSdb.h +++ b/src/mnode/inc/mgmtSdb.h @@ -20,27 +20,11 @@ extern "C" { #endif -#include -#include -#include -#include -#include -#include -#include -#include - -#include "hashint.h" -#include "hashstr.h" -#include "tchecksum.h" -#include "tlog.h" -#include "trpc.h" -#include "tutil.h" - enum _keytype { SDB_KEYTYPE_STRING, SDB_KEYTYPE_AUTO, SDB_KEYTYPE_MAX -}; +} ESdbKeyType; enum _sdbaction { SDB_TYPE_INSERT, @@ -50,11 +34,15 @@ enum _sdbaction { SDB_TYPE_ENCODE, SDB_TYPE_DESTROY, SDB_MAX_ACTION_TYPES -}; +} ESdbType; + +typedef enum { + SDB_OPER_GLOBAL, + SDB_OPER_LOCAL, + SDB_OPER_DISK +} ESdbOper; uint64_t sdbGetVersion(); -bool sdbInServerState(); -bool sdbIsMaster(); void *sdbOpenTable(int32_t maxRows, int32_t maxRowSize, char *name, uint8_t keyType, char *directory, void *(*appTool)(char, void *, char *, int32_t, int32_t *)); @@ -65,9 +53,9 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow); int64_t sdbGetId(void *handle); int64_t sdbGetNumOfRows(void *handle); -int64_t sdbInsertRow(void *handle, void *row, int32_t rowSize); -int32_t sdbDeleteRow(void *handle, void *key); -int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, char isUpdated); +int32_t sdbInsertRow(void *handle, void *row, ESdbOper oper); +int32_t sdbDeleteRow(void *handle, void *key, ESdbOper oper); +int32_t sdbUpdateRow(void *handle, void *row, int32_t rowSize, ESdbOper oper); #ifdef __cplusplus } diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 589beeaaba23c9d2428a822774ba32a1bf88f891..be78983c91c665e5ded69c1d5e51f964700a2579 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -29,6 +29,7 @@ #include "mgmtDb.h" #include "mgmtDClient.h" #include "mgmtGrant.h" +#include "mgmtMnode.h" #include "mgmtProfile.h" #include "mgmtSdb.h" #include "mgmtShell.h" @@ -94,7 +95,7 @@ void *mgmtChildTableActionInsert(void *row, char *str, int32_t size, int32_t *ss return NULL; } - if (!sdbIsMaster()) { + if (!mgmtIsMaster()) { int32_t sid = taosAllocateId(pVgroup->idPool); if (sid != pTable->sid) { mError("ctable:%s, sid:%d is not matched from the master:%d", pTable->tableId, sid, pTable->sid); @@ -214,7 +215,7 @@ int32_t mgmtInitChildTables() { SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); if (pDb == NULL) { mError("ctable:%s, failed to get db, discard it", pTable->tableId); - sdbDeleteRow(tsChildTableSdb, pTable); + sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK); pNode = pLastNode; continue; } @@ -223,7 +224,7 @@ int32_t mgmtInitChildTables() { if (pVgroup == NULL) { mError("ctable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->tableId, pTable->vgId, pTable->sid); pTable->vgId = 0; - sdbDeleteRow(tsChildTableSdb, pTable); + sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK); pNode = pLastNode; continue; } @@ -232,7 +233,7 @@ int32_t mgmtInitChildTables() { mError("ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it", pTable->tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid); pTable->vgId = 0; - sdbDeleteRow(tsChildTableSdb, pTable); + sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK); pNode = pLastNode; continue; } @@ -240,7 +241,7 @@ int32_t mgmtInitChildTables() { if (pVgroup->tableList == NULL) { mError("ctable:%s, vgroup:%d tableList is null", pTable->tableId, pTable->vgId); pTable->vgId = 0; - sdbDeleteRow(tsChildTableSdb, pTable); + sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK); pNode = pLastNode; continue; } @@ -252,7 +253,7 @@ int32_t mgmtInitChildTables() { if (pSuperTable == NULL) { mError("ctable:%s, stable:%s not exist", pTable->tableId, pTable->superTableId); pTable->vgId = 0; - sdbDeleteRow(tsChildTableSdb, pTable); + sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK); pNode = pLastNode; continue; } @@ -337,7 +338,7 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t pTable->vgId = pVgroup->vgId; pTable->superTable = pSuperTable; - if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) { + if (sdbInsertRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL) < 0) { free(pTable); mError("ctable:%s, update sdb error", pCreate->tableId); terrno = TSDB_CODE_SDB_ERROR; @@ -481,7 +482,7 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) { } if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { - sdbDeleteRow(tsChildTableSdb, pTable); + sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_LOCAL); pNode = pLastNode; numOfTables ++; continue; diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 5cd12594860fab715b116a965c660eaaf88c8224..e889d200f549b65bdad84a88e4eddf8bf5006fbf 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -279,7 +279,7 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) { pDb->createdTime = taosGetTimestampMs(); pDb->cfg = *pCreate; - if (sdbInsertRow(tsDbSdb, pDb, 0) < 0) { + if (sdbInsertRow(tsDbSdb, pDb, SDB_OPER_GLOBAL) < 0) { code = TSDB_CODE_SDB_ERROR; tfree(pDb); } @@ -819,7 +819,7 @@ static void mgmtDropDb(void *handle, void *tmrId) { SDbObj *pDb = newMsg->ahandle; mPrint("db:%s, drop db from sdb", pDb->name); - int32_t code = sdbDeleteRow(tsDbSdb, pDb); + int32_t code = sdbDeleteRow(tsDbSdb, pDb, SDB_OPER_GLOBAL); if (code != 0) { code = TSDB_CODE_SDB_ERROR; } diff --git a/src/mnode/src/mgmtMain.c b/src/mnode/src/mgmtMain.c index 6aec06970e22d7d782000df1823b099d646663af..ab0b99fa586512e64d1550b29ab77e1b4cbe4332 100644 --- a/src/mnode/src/mgmtMain.c +++ b/src/mnode/src/mgmtMain.c @@ -127,7 +127,7 @@ int32_t mgmtStartSystem() { void mgmtStopSystem() { - if (sdbIsMaster()) { + if (mgmtIsMaster()) { mTrace("it is a master mgmt node, it could not be stopped"); return; } diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index ac18af01585afcd72679a542b57b5176cfebdf45..8e459bf5afa8de8673a436d0299eb043c11e1636 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -29,6 +29,8 @@ int32_t (*mpeerInitMnodesFp)() = NULL; void (*mpeerCleanUpMnodesFp)() = NULL; static SMnodeObj tsMnodeObj = {0}; +static bool tsMnodeIsMaster = false; +static bool tsMnodeIsServing = false; static int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -52,6 +54,8 @@ int32_t mgmtInitMnodes() { if (mpeerInitMnodesFp) { return (*mpeerInitMnodesFp)(); } else { + tsMnodeIsServing = true; + tsMnodeIsMaster = true; return 0; } } @@ -62,6 +66,14 @@ void mgmtCleanupMnodes() { } } +bool mgmtInServerStatus() { + return tsMnodeIsServing; +} + +bool mgmtIsMaster() { + return tsMnodeIsMaster; +} + bool mgmtCheckRedirect(void *handle) { return false; } diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index d42a596008293c218f61ce340a3e1800093940ac..df9fe0f439f61d02c5db88a61f7bed1df9b457d3 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -26,6 +26,7 @@ #include "mgmtDb.h" #include "mgmtDClient.h" #include "mgmtGrant.h" +#include "mgmtMnode.h" #include "mgmtNormalTable.h" #include "mgmtSdb.h" #include "mgmtSuperTable.h" @@ -98,7 +99,7 @@ void *mgmtNormalTableActionInsert(void *row, char *str, int32_t size, int32_t *s return NULL; } - if (!sdbIsMaster()) { + if (!mgmtIsMaster()) { int32_t sid = taosAllocateId(pVgroup->idPool); if (sid != pTable->sid) { mError("sid:%d is not matched from the master:%d", sid, pTable->sid); @@ -237,7 +238,7 @@ int32_t mgmtInitNormalTables() { SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); if (pDb == NULL) { mError("ntable:%s, failed to get db, discard it", pTable->tableId); - sdbDeleteRow(tsNormalTableSdb, pTable); + sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_DISK); pNode = pLastNode; continue; } @@ -246,7 +247,7 @@ int32_t mgmtInitNormalTables() { if (pVgroup == NULL) { mError("ntable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->tableId, pTable->vgId, pTable->sid); pTable->vgId = 0; - sdbDeleteRow(tsNormalTableSdb, pTable); + sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_DISK); pNode = pLastNode; continue; } @@ -255,7 +256,7 @@ int32_t mgmtInitNormalTables() { mError("ntable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it", pTable->tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid); pTable->vgId = 0; - sdbDeleteRow(tsNormalTableSdb, pTable); + sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_DISK); pNode = pLastNode; continue; } @@ -263,7 +264,7 @@ int32_t mgmtInitNormalTables() { if (pVgroup->tableList == NULL) { mError("ntable:%s, vgroup:%d tableList is null", pTable->tableId, pTable->vgId); pTable->vgId = 0; - sdbDeleteRow(tsNormalTableSdb, pTable); + sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_DISK); pNode = pLastNode; continue; } @@ -370,7 +371,7 @@ void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t mTrace("table:%s, stream sql len:%d sql:%s", pTable->tableId, pTable->sqlLen, pTable->sql); } - if (sdbInsertRow(tsNormalTableSdb, pTable, 0) < 0) { + if (sdbInsertRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL) < 0) { mError("table:%s, update sdb error", pTable->tableId); free(pTable); terrno = TSDB_CODE_SDB_ERROR; @@ -467,7 +468,7 @@ int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int3 pTable->sversion++; pAcct->acctInfo.numOfTimeSeries += ncols; - sdbUpdateRow(tsNormalTableSdb, pTable, 0, 1); + sdbUpdateRow(tsNormalTableSdb, pTable, tsNormalTableUpdateSize, SDB_OPER_GLOBAL); return TSDB_CODE_SUCCESS; } @@ -496,7 +497,7 @@ int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName) pTable->sversion++; pAcct->acctInfo.numOfTimeSeries--; - sdbUpdateRow(tsNormalTableSdb, pTable, 0, 1); + sdbUpdateRow(tsNormalTableSdb, pTable, tsNormalTableUpdateSize, SDB_OPER_GLOBAL); return TSDB_CODE_SUCCESS; } @@ -555,7 +556,7 @@ void mgmtDropAllNormalTables(SDbObj *pDropDb) { if (pTable == NULL) break; if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { - sdbDeleteRow(tsNormalTableSdb, pTable); + sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_LOCAL); pNode = pLastNode; numOfTables ++; continue; diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index 3564392ed1b8f0bcf639134128c7453cf71465e4..33bfe4622bdf569348a1fe362b55e808a294d926 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -17,15 +17,18 @@ #include "os.h" #include "taosdef.h" #include "tutil.h" +#include "tchecksum.h" +#include "tlog.h" +#include "trpc.h" +#include "tutil.h" +#include "hashint.h" +#include "hashstr.h" #include "mgmtSdb.h" #define abs(x) (((x) < 0) ? -(x) : (x)) #define SDB_MAX_PEERS 4 #define SDB_DELIMITER 0xFFF00F00 #define SDB_ENDCOMMIT 0xAFFFAAAF -#define SDB_STATUS_OFFLINE 0 -#define SDB_STATUS_SERVING 1 - typedef struct { uint64_t swVersion; @@ -81,15 +84,6 @@ int32_t (*mpeerForwardRequestFp)(SSdbTable *pTable, char type, void *cont, int32 static SSdbTable *sdbTableList[10] = {0}; static int32_t sdbNumOfTables = 0; static uint64_t sdbVersion = 0; -static int32_t sdbMaster = 0; -static int32_t sdbStatus = SDB_STATUS_OFFLINE; - - -// #ifdef CLUSTER -// int32_t sdbMaster = 0; -// #else -// int32_t sdbMaster = 1; -// #endif static void *(*sdbInitIndexFp[])(int32_t maxRows, int32_t dataSize) = {sdbOpenStrHash, sdbOpenIntHash}; static void *(*sdbAddIndexFp[])(void *handle, void *key, void *data) = {sdbAddStrHash, sdbAddIntHash}; @@ -102,8 +96,6 @@ void sdbResetTable(SSdbTable *pTable); void sdbSaveSnapShot(void *handle); uint64_t sdbGetVersion() { return sdbVersion; } -bool sdbInServerState() { return sdbStatus == SDB_STATUS_SERVING; } -bool sdbIsMaster() { return sdbMaster; } int64_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->id; } int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; } @@ -319,6 +311,10 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) { pTable->size += real_size; if (pTable->id < abs(rowHead->id)) pTable->id = abs(rowHead->id); + + //TODO: check this valid + pTable->size += 4; + lseek(pTable->fd, 4, SEEK_CUR); } if (pTable->keyType == SDB_KEYTYPE_AUTO) { @@ -390,81 +386,67 @@ void *sdbGetRow(void *handle, void *key) { } // row here must be encoded string (rowSize > 0) or the object it self (rowSize = 0) -int64_t sdbInsertRow(void *handle, void *row, int32_t rowSize) { +int32_t sdbInsertRow(void *handle, void *row, ESdbOper oper) { SSdbTable *pTable = (SSdbTable *)handle; SRowMeta rowMeta; - int64_t id = -1; void * pObj = NULL; - int32_t total_size = 0; - int32_t real_size = 0; - /* char action = SDB_TYPE_INSERT; */ + int32_t total_size = 0; + int32_t real_size = 0; if (pTable == NULL) { sdbError("sdb tables is null"); return -1; } - - if ((pTable->keyType != SDB_KEYTYPE_AUTO) || *((int64_t *)row)) - if (sdbGetRow(handle, row)) { - if (strcmp(pTable->name, "mnode") == 0) { - /* - * The first mnode created when the system just start, so the insert action may failed - * see sdbPeer.c : sdbInitPeers - */ - pTable->id++; - sdbVersion++; - sdbPrint("table:%s, record:%s already exist, think it successed, sdbVersion:%" PRId64 " id:%" PRId64, - pTable->name, taosIpStr(*(int32_t *)row), sdbVersion, pTable->id); - return 0; - } else { - switch (pTable->keyType) { - case SDB_KEYTYPE_STRING: - sdbError("table:%s, failed to insert record:%s sdbVersion:%" PRId64 " id:%" PRId64 , pTable->name, (char *)row, sdbVersion, pTable->id); - break; - case SDB_KEYTYPE_AUTO: - sdbError("table:%s, failed to insert record:%d sdbVersion:%" PRId64 " id:%" PRId64, pTable->name, *(int32_t *)row, sdbVersion, pTable->id); - break; - default: - sdbError("table:%s, failed to insert record sdbVersion:%" PRId64 " id:%" PRId64, pTable->name, sdbVersion, pTable->id); - break; - } - return -1; - } + + if (sdbGetRow(handle, row)) { + switch (pTable->keyType) { + case SDB_KEYTYPE_STRING: + sdbError("table:%s, failed to insert record:%s sdbVersion:%" PRId64 " id:%" PRId64 , pTable->name, (char *)row, sdbVersion, pTable->id); + break; + case SDB_KEYTYPE_AUTO: + sdbError("table:%s, failed to insert record:%d sdbVersion:%" PRId64 " id:%" PRId64, pTable->name, *(int32_t *)row, sdbVersion, pTable->id); + break; + default: + sdbError("table:%s, failed to insert record sdbVersion:%" PRId64 " id:%" PRId64, pTable->name, sdbVersion, pTable->id); + break; } + return -1; + } total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); SRowHead *rowHead = (SRowHead *)malloc(total_size); if (rowHead == NULL) { - sdbError("failed to allocate row head memory, sdb: %s", pTable->name); + sdbError("table:%s, failed to allocate row head memory", pTable->name); return -1; } memset(rowHead, 0, total_size); - if (rowSize == 0) { // object is created already + if (oper == SDB_OPER_GLOBAL) { pObj = row; - } else { // encoded string, to create object - pObj = (*(pTable->appTool))(SDB_TYPE_DECODE, NULL, row, rowSize, NULL); - } - (*(pTable->appTool))(SDB_TYPE_ENCODE, pObj, rowHead->data, pTable->maxRowSize, &(rowHead->rowSize)); - assert(rowHead->rowSize > 0 && rowHead->rowSize <= pTable->maxRowSize); - + } else { + pObj = (*(pTable->appTool))(SDB_TYPE_DECODE, NULL, row, 0, NULL); + } + pthread_mutex_lock(&pTable->mutex); - if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_INSERT, rowHead->data, rowHead->rowSize) == 0) { - pTable->id++; - sdbVersion++; - if (pTable->keyType == SDB_KEYTYPE_AUTO) { - // TODO:here need to change - *((uint32_t *)pObj) = ++pTable->autoIndex; - (*(pTable->appTool))(SDB_TYPE_ENCODE, pObj, rowHead->data, pTable->maxRowSize, &(rowHead->rowSize)); - } + if (oper == SDB_OPER_GLOBAL) { + if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_INSERT, rowHead->data, rowHead->rowSize) != 0) { + sdbError("table:%s, failed to insert record", pTable->name); + pthread_mutex_unlock(&pTable->mutex); + tfree(rowHead); + return -1; + } + } - real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); + if (oper == SDB_OPER_GLOBAL || oper == SDB_OPER_LOCAL) { + (*(pTable->appTool))(SDB_TYPE_ENCODE, pObj, rowHead->data, pTable->maxRowSize, &(rowHead->rowSize)); + assert(rowHead->rowSize > 0 && rowHead->rowSize <= pTable->maxRowSize); + real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); rowHead->delimiter = SDB_DELIMITER; - rowHead->id = pTable->id; + rowHead->id = pTable->id + 1; if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) { - sdbError("failed to get checksum while inserting, sdb:%s", pTable->name); + sdbError("table:%s, failed to get checksum while inserting", pTable->name); pthread_mutex_unlock(&pTable->mutex); tfree(rowHead); return -1; @@ -477,14 +459,10 @@ int64_t sdbInsertRow(void *handle, void *row, int32_t rowSize) { rowMeta.row = pObj; (*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, pObj, &rowMeta); - /* Update the disk content */ - /* write(pTable->fd, &action, sizeof(action)); */ - /* pTable->size += sizeof(action); */ twrite(pTable->fd, rowHead, real_size); pTable->size += real_size; sdbFinishCommit(pTable); - pTable->numOfRows++; switch (pTable->keyType) { case SDB_KEYTYPE_STRING: sdbTrace("table:%s, a record is inserted:%s, sdbVersion:%" PRId64 " id:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64, @@ -499,32 +477,33 @@ int64_t sdbInsertRow(void *handle, void *row, int32_t rowSize) { pTable->name, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size); break; } + } - id = rowMeta.id; - } else { - sdbError("table:%s, failed to insert record", pTable->name); + if (pTable->keyType == SDB_KEYTYPE_AUTO) { + *((uint32_t *)pObj) = ++pTable->autoIndex; } - tfree(rowHead); + pTable->numOfRows++; + pTable->id++; + sdbVersion++; pthread_mutex_unlock(&pTable->mutex); - /* callback function to update the MGMT layer */ - if (id >= 0 && pTable->appTool) (*pTable->appTool)(SDB_TYPE_INSERT, pObj, NULL, 0, NULL); + (*pTable->appTool)(SDB_TYPE_INSERT, pObj, NULL, 0, NULL); + + tfree(rowHead); - return id; + return 0; } // row here can be object or null-terminated string -int32_t sdbDeleteRow(void *handle, void *row) { +int32_t sdbDeleteRow(void *handle, void *row, ESdbOper oper) { SSdbTable *pTable = (SSdbTable *)handle; SRowMeta * pMeta = NULL; - int32_t code = -1; void * pMetaRow = NULL; SRowHead * rowHead = NULL; - int32_t rowSize = 0; - int32_t total_size = 0; - /* char action = SDB_TYPE_DELETE; */ + int32_t rowSize = 0; + int32_t total_size = 0; if (pTable == NULL) return -1; @@ -558,67 +537,67 @@ int32_t sdbDeleteRow(void *handle, void *row) { pthread_mutex_lock(&pTable->mutex); - if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_DELETE, (char *)row, rowSize) == 0) { - pTable->id++; - sdbVersion++; - - rowHead->delimiter = SDB_DELIMITER; - rowHead->rowSize = rowSize; - rowHead->id = -(pTable->id); - memcpy(rowHead->data, row, rowSize); - if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, total_size) < 0) { - sdbError("failed to get checksum while inserting, sdb:%s", pTable->name); + if (oper == SDB_OPER_GLOBAL) { + if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_DELETE, (char *)row, rowSize) == 0) { + sdbError("table:%s, failed to delete record", pTable->name); pthread_mutex_unlock(&pTable->mutex); tfree(rowHead); return -1; - } - /* write(pTable->fd, &action, sizeof(action)); */ - /* pTable->size += sizeof(action); */ - twrite(pTable->fd, rowHead, total_size); - pTable->size += total_size; - sdbFinishCommit(pTable); + } + } - pTable->numOfRows--; - - switch (pTable->keyType) { - case SDB_KEYTYPE_STRING: - sdbTrace("table:%s, a record is deleted:%s, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%d", - pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows); - break; - case SDB_KEYTYPE_AUTO: - sdbTrace("table:%s, a record is deleted:%d, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%d", - pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows); - break; - default: - sdbTrace("table:%s, a record is deleted, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%d", - pTable->name, sdbVersion, pTable->id, pTable->numOfRows); - break; - } + rowHead->delimiter = SDB_DELIMITER; + rowHead->rowSize = rowSize; + rowHead->id = -(pTable->id); + memcpy(rowHead->data, row, rowSize); + if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, total_size) < 0) { + sdbError("failed to get checksum while inserting, sdb:%s", pTable->name); + pthread_mutex_unlock(&pTable->mutex); + tfree(rowHead); + return -1; + } - // Delete from current layer - (*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, row); + twrite(pTable->fd, rowHead, total_size); + pTable->size += total_size; + sdbFinishCommit(pTable); - code = 0; + switch (pTable->keyType) { + case SDB_KEYTYPE_STRING: + sdbTrace("table:%s, a record is deleted:%s, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%d", + pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows); + break; + case SDB_KEYTYPE_AUTO: + sdbTrace("table:%s, a record is deleted:%d, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%d", + pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows); + break; + default: + sdbTrace("table:%s, a record is deleted, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%d", + pTable->name, sdbVersion, pTable->id, pTable->numOfRows); + break; } + // Delete from current layer + (*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, row); + + pTable->numOfRows--; + pTable->id++; + sdbVersion++; + pthread_mutex_unlock(&pTable->mutex); tfree(rowHead); - // callback function of the delete - if (code == 0 && pTable->appTool) (*pTable->appTool)(SDB_TYPE_DELETE, pMetaRow, NULL, 0, NULL); + (*pTable->appTool)(SDB_TYPE_DELETE, pMetaRow, NULL, 0, NULL); - return code; + return 0; } // row here can be the object or the string info (encoded string) -int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, char isUpdated) { +int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, ESdbOper oper) { SSdbTable *pTable = (SSdbTable *)handle; SRowMeta * pMeta = NULL; - int32_t code = -1; - int32_t total_size = 0; - int32_t real_size = 0; - /* char action = SDB_TYPE_UPDATE; */ + int32_t total_size = 0; + int32_t real_size = 0; if (pTable == NULL || row == NULL) return -1; pMeta = sdbGetRowMeta(handle, row); @@ -651,8 +630,15 @@ int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, char isUpdated } memset(rowHead, 0, total_size); - if (!isUpdated) { - (*(pTable->appTool))(SDB_TYPE_UPDATE, pMetaRow, row, updateSize, NULL); // update in upper layer + pthread_mutex_lock(&pTable->mutex); + + if (oper == SDB_OPER_GLOBAL) { + if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_UPDATE, rowHead->data, rowHead->rowSize) == 0) { + sdbError("table:%s, failed to update record", pTable->name); + pthread_mutex_unlock(&pTable->mutex); + tfree(rowHead); + return -1; + } } if (pMetaRow != row) { @@ -663,57 +649,51 @@ int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, char isUpdated } real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); - ; - - pthread_mutex_lock(&pTable->mutex); - if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_UPDATE, rowHead->data, rowHead->rowSize) == 0) { - pTable->id++; - sdbVersion++; - - // write to the new position - rowHead->delimiter = SDB_DELIMITER; - rowHead->id = pTable->id; - if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) { - sdbError("failed to get checksum, sdb:%s id:%d", pTable->name, rowHead->id); - pthread_mutex_unlock(&pTable->mutex); - tfree(rowHead); - return -1; - } - /* write(pTable->fd, &action, sizeof(action)); */ - /* pTable->size += sizeof(action); */ - twrite(pTable->fd, rowHead, real_size); + // write to the new position + rowHead->delimiter = SDB_DELIMITER; + rowHead->id = pTable->id; + if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) { + sdbError("failed to get checksum, sdb:%s id:%d", pTable->name, rowHead->id); + pthread_mutex_unlock(&pTable->mutex); + tfree(rowHead); + return -1; + } + + twrite(pTable->fd, rowHead, real_size); - pMeta->id = pTable->id; - pMeta->offset = pTable->size; - pMeta->rowSize = rowHead->rowSize; - pTable->size += real_size; + pMeta->id = pTable->id; + pMeta->offset = pTable->size; + pMeta->rowSize = rowHead->rowSize; + pTable->size += real_size; - sdbFinishCommit(pTable); - - switch (pTable->keyType) { - case SDB_KEYTYPE_STRING: - sdbTrace("table:%s, a record is updated:%s, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64, - pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows); - break; - case SDB_KEYTYPE_AUTO: - sdbTrace("table:%s, a record is updated:%d, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64, - pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows); - break; - default: - sdbTrace("table:%s, a record is updated, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64, pTable->name, sdbVersion, - pTable->id, pTable->numOfRows); - break; - } + sdbFinishCommit(pTable); - code = 0; + switch (pTable->keyType) { + case SDB_KEYTYPE_STRING: + sdbTrace("table:%s, a record is updated:%s, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64, + pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows); + break; + case SDB_KEYTYPE_AUTO: + sdbTrace("table:%s, a record is updated:%d, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64, + pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows); + break; + default: + sdbTrace("table:%s, a record is updated, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64, pTable->name, sdbVersion, + pTable->id, pTable->numOfRows); + break; } + pTable->id++; + sdbVersion++; + pthread_mutex_unlock(&pTable->mutex); + (*(pTable->appTool))(SDB_TYPE_UPDATE, pMetaRow, row, updateSize, NULL); // update in upper layer + tfree(rowHead); - return code; + return 0; } void sdbCloseTable(void *handle) { diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index c50ecc01c7d758fbca6553328a312130db9b157f..501faaab97b1808e308799b71c71de56565a796a 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -132,7 +132,7 @@ void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) { } static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { - if (!sdbInServerState()) { + if (!mgmtInServerStatus()) { mgmtProcessMsgWhileNotReady(rpcMsg); rpcFreeCont(rpcMsg->pCont); return; diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index 075633cd4d29c894bfe609102771baed1dcba044..9120854e4769aac95319981e1be6b8f0d1634ba0 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -181,7 +181,7 @@ int32_t mgmtInitSuperTables() { SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); if (pDb == NULL) { mError("super table:%s, failed to get db, discard it", pTable->tableId); - sdbDeleteRow(tsSuperTableSdb, pTable); + sdbDeleteRow(tsSuperTableSdb, pTable, SDB_OPER_DISK); pNode = pLastNode; continue; } @@ -233,7 +233,7 @@ int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) { tschema[col].bytes = htons(tschema[col].bytes); } - if (sdbInsertRow(tsSuperTableSdb, pStable, 0) < 0) { + if (sdbInsertRow(tsSuperTableSdb, pStable, SDB_OPER_GLOBAL) < 0) { mError("stable:%s, update sdb error", pStable->tableId); return TSDB_CODE_SDB_ERROR; } @@ -319,7 +319,7 @@ int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t pStable->sversion++; pAcct->acctInfo.numOfTimeSeries += (ntags * pStable->numOfTables); - sdbUpdateRow(tsSuperTableSdb, pStable, 0, 1); + sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); mTrace("Succeed to add tag column %s to table %s", schema[0].name, pStable->tableId); return TSDB_CODE_SUCCESS; @@ -352,7 +352,7 @@ int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns); pStable->schema = realloc(pStable->schema, schemaSize); - sdbUpdateRow(tsSuperTableSdb, pStable, 0, 1); + sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); return TSDB_CODE_SUCCESS; } @@ -384,7 +384,7 @@ int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *oldTagN mgmtSuperTableActionEncode(pStable, msg, size, &rowSize); - int32_t ret = sdbUpdateRow(tsSuperTableSdb, msg, rowSize, 1); + int32_t ret = sdbUpdateRow(tsSuperTableSdb, msg, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); tfree(msg); if (ret < 0) { @@ -446,7 +446,7 @@ int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[], int32 pStable->sversion++; pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables); - sdbUpdateRow(tsSuperTableSdb, pStable, 0, 1); + sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); return TSDB_CODE_SUCCESS; } @@ -479,7 +479,7 @@ int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pStable, char *colName) { pStable->schema = realloc(pStable->schema, schemaSize); pAcct->acctInfo.numOfTimeSeries -= (pStable->numOfTables); - sdbUpdateRow(tsSuperTableSdb, pStable, 0, 1); + sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); return TSDB_CODE_SUCCESS; } @@ -618,7 +618,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) { } if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { - sdbDeleteRow(tsSuperTableSdb, pTable); + sdbDeleteRow(tsSuperTableSdb, pTable, SDB_OPER_GLOBAL); pNode = pLastNode; numOfTables ++; continue; diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 34c1a69cb1363c8fb5e6a17626a491d8919a3959..a907cd85f53da70b69c2c0ff6e9130f3212770e8 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -757,9 +757,9 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { if (rpcMsg->code != TSDB_CODE_SUCCESS) { if (pTable->type == TSDB_CHILD_TABLE) { - sdbDeleteRow(tsChildTableSdb, pTable); + sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL); } else if (pTable->type == TSDB_NORMAL_TABLE){ - sdbDeleteRow(tsNormalTableSdb, pTable); + sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL); } else {} mError("table:%s, failed to create in dnode, reason:%s", pTable->tableId, tstrerror(rpcMsg->code)); mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); @@ -813,14 +813,14 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { } if (pTable->type == TSDB_CHILD_TABLE) { - if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) { + if (sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL) < 0) { mError("table:%s, update ctables sdb error", pTable->tableId); mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); free(queueMsg); return; } } else if (pTable->type == TSDB_NORMAL_TABLE){ - if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) { + if (sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL) < 0) { mError("table:%s, update ntables sdb error", pTable->tableId); mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); free(queueMsg); diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index 33caa759880e3bfffb6f151173f8f9c9cc7f63dd..174976f4c0780b3c7e4788e541ceef6dd8a2f8d1 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -103,7 +103,7 @@ SUserObj *mgmtGetUser(char *name) { } static int32_t mgmtUpdateUser(SUserObj *pUser) { - return sdbUpdateRow(tsUserSdb, pUser, 0, 1); + return sdbUpdateRow(tsUserSdb, pUser, tsUserUpdateSize, SDB_OPER_GLOBAL); } static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) { @@ -140,7 +140,7 @@ static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) { } code = TSDB_CODE_SUCCESS; - if (sdbInsertRow(tsUserSdb, pUser, 0) < 0) { + if (sdbInsertRow(tsUserSdb, pUser, SDB_OPER_GLOBAL) < 0) { tfree(pUser); code = TSDB_CODE_SDB_ERROR; } @@ -161,7 +161,7 @@ static int32_t mgmtDropUser(SAcctObj *pAcct, char *name) { return TSDB_CODE_NO_RIGHTS; } - sdbDeleteRow(tsUserSdb, pUser); + sdbDeleteRow(tsUserSdb, pUser, SDB_OPER_GLOBAL); return 0; } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 62a2abd17d7e690c4249714231dbe026a8220d91..d4df83c25c959828a54b6c264341367675d56432 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -112,7 +112,7 @@ int32_t mgmtInitVgroups() { if (tsIsCluster && pVgroup->vnodeGid[0].publicIp == 0) { pVgroup->vnodeGid[0].publicIp = inet_addr(tsPublicIp); pVgroup->vnodeGid[0].privateIp = inet_addr(tsPrivateIp); - sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, 1); + sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, SDB_OPER_GLOBAL); } // mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId); @@ -161,7 +161,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) { mgmtAddVgroupIntoDb(pDb, pVgroup); // mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId); - sdbInsertRow(tsVgroupSdb, pVgroup, 0); + sdbInsertRow(tsVgroupSdb, pVgroup, SDB_OPER_GLOBAL); mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { @@ -179,7 +179,7 @@ void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle) { } else { mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes); mgmtSendDropVgroupMsg(pVgroup, NULL); - sdbDeleteRow(tsVgroupSdb, pVgroup); + sdbDeleteRow(tsVgroupSdb, pVgroup, SDB_OPER_GLOBAL); } } @@ -474,7 +474,7 @@ static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t } void mgmtUpdateVgroup(SVgObj *pVgroup) { - sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, 0); + sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, SDB_OPER_LOCAL); } void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable) { @@ -607,7 +607,7 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen); mgmtAddToShellQueue(newMsg); } else { - sdbDeleteRow(tsVgroupSdb, pVgroup); + sdbDeleteRow(tsVgroupSdb, pVgroup, SDB_OPER_GLOBAL); mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); } @@ -661,7 +661,7 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { if (queueMsg->received != queueMsg->expected) return; - sdbDeleteRow(tsVgroupSdb, pVgroup); + sdbDeleteRow(tsVgroupSdb, pVgroup, SDB_OPER_GLOBAL); SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); newMsg->msgType = queueMsg->msgType; @@ -691,7 +691,7 @@ void mgmtUpdateVgroupIp(SDnodeObj *pDnode) { pDnode->publicIp, taosIpStr(vnodeGid->publicIp)); vnodeGid->publicIp = pDnode->publicIp; vnodeGid->privateIp = pDnode->privateIp; - sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, 1); + sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, SDB_OPER_GLOBAL); } } }