diff --git a/src/mnode/inc/mgmtSdb.h b/src/mnode/inc/mgmtSdb.h index 7381ec6669c8ab2512759eb02cbe5e6ff582dbb6..bf5ccb26340019bc879dbbaf9d843cc488cbff51 100644 --- a/src/mnode/inc/mgmtSdb.h +++ b/src/mnode/inc/mgmtSdb.h @@ -20,42 +20,49 @@ extern "C" { #endif -enum _keytype { +typedef enum { SDB_KEYTYPE_STRING, SDB_KEYTYPE_AUTO, SDB_KEYTYPE_MAX } ESdbKeyType; -enum _sdbaction { - SDB_TYPE_INSERT, - SDB_TYPE_DELETE, - SDB_TYPE_UPDATE, - SDB_TYPE_DECODE, - SDB_TYPE_ENCODE, - SDB_TYPE_DESTROY, - SDB_MAX_ACTION_TYPES -} ESdbType; - typedef enum { SDB_OPER_GLOBAL, SDB_OPER_LOCAL, SDB_OPER_DISK -} ESdbOper; +} ESdbOperType; -uint64_t sdbGetVersion(); +enum _sdbaction { + SDB_TYPE_INSERT, + SDB_TYPE_DELETE, + SDB_TYPE_UPDATE, +} ESdbForwardType; + +typedef struct { + char *tableName; + int32_t hashSessions; + int32_t maxRowSize; + ESdbKeyType keyType; + int32_t (*insertFp)(void *pObj); + int32_t (*deleteFp)(void *pObj); + int32_t (*updateFp)(void *pObj); + int32_t (*encodeFp)(void *pObj, void *pData, int32_t maxRowSize); + void * (*decodeFp)(void *pData); + int32_t (*destroyFp)(void *pObj); +} SSdbTableDesc; -void *sdbOpenTable(int32_t maxRows, int32_t maxRowSize, char *name, uint8_t keyType, char *directory, - void *(*appTool)(char, void *, char *, int32_t, int32_t *)); +void *sdbOpenTable(SSdbTableDesc *desc); void sdbCloseTable(void *handle); +int32_t sdbInsertRow(void *handle, void *row, ESdbOperType oper); +int32_t sdbDeleteRow(void *handle, void *key, ESdbOperType oper); +int32_t sdbUpdateRow(void *handle, void *row, int32_t rowSize, ESdbOperType oper); + void *sdbGetRow(void *handle, void *key); void *sdbFetchRow(void *handle, void *pNode, void **ppRow); int64_t sdbGetId(void *handle); int64_t sdbGetNumOfRows(void *handle); - -int32_t sdbInsertRow(void *handle, void *row, ESdbOper oper); -int32_t sdbDeleteRow(void *handle, void *key, ESdbOper oper); -int32_t sdbUpdateRow(void *handle, void *row, int32_t rowSize, ESdbOper oper); +uint64_t sdbGetVersion(); #ifdef __cplusplus } diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index be78983c91c665e5ded69c1d5e51f964700a2579..5b27acf1c41c8746551e5837a9fa50221bc5cc7a 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -39,67 +39,43 @@ void *tsChildTableSdb; int32_t tsChildTableUpdateSize; -void *(*mgmtChildTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); - -void *mgmtChildTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtChildTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtChildTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtChildTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtChildTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtChildTableActionReset(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtChildTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize); static void mgmtDestroyChildTable(SChildTableObj *pTable) { - free(pTable); -} - -static void mgmtChildTableActionInit() { - mgmtChildTableActionFp[SDB_TYPE_INSERT] = mgmtChildTableActionInsert; - mgmtChildTableActionFp[SDB_TYPE_DELETE] = mgmtChildTableActionDelete; - mgmtChildTableActionFp[SDB_TYPE_UPDATE] = mgmtChildTableActionUpdate; - mgmtChildTableActionFp[SDB_TYPE_ENCODE] = mgmtChildTableActionEncode; - mgmtChildTableActionFp[SDB_TYPE_DECODE] = mgmtChildTableActionDecode; - mgmtChildTableActionFp[SDB_TYPE_DESTROY] = mgmtChildTableActionDestroy; + tfree(pTable); } -void *mgmtChildTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) { - SChildTableObj *pTable = (SChildTableObj *) row; - memcpy(pTable, str, tsChildTableUpdateSize); - return NULL; -} - -void *mgmtChildTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) { - SChildTableObj *pTable = (SChildTableObj *)row; +static int32_t mgmtChildTableActionDestroy(void *pObj) { + SChildTableObj *pTable = (SChildTableObj *)pObj; mgmtDestroyChildTable(pTable); - return NULL; + return TSDB_CODE_SUCCESS; } -void *mgmtChildTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { - SChildTableObj *pTable = (SChildTableObj *) row; +static int32_t mgmtChildTableActionInsert(void *pObj) { + SChildTableObj *pTable = (SChildTableObj *) pObj; SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { mError("ctable:%s, not in vgroup:%d", pTable->tableId, pTable->vgId); - return NULL; + return TSDB_CODE_INVALID_VGROUP_ID; } SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) { mError("ctable:%s, vgroup:%d not in db:%s", pTable->tableId, pVgroup->vgId, pVgroup->dbName); - return NULL; + return TSDB_CODE_INVALID_DB; } SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); if (pAcct == NULL) { mError("ctable:%s, account:%s not exists", pTable->tableId, pDb->cfg.acct); - return NULL; + return TSDB_CODE_INVALID_ACCT; } if (!mgmtIsMaster()) { int32_t sid = taosAllocateId(pVgroup->idPool); if (sid != pTable->sid) { mError("ctable:%s, sid:%d is not matched from the master:%d", pTable->tableId, sid, pTable->sid); - return NULL; + return TSDB_CODE_INVALID_SESSION_ID; } } @@ -114,30 +90,30 @@ void *mgmtChildTableActionInsert(void *row, char *str, int32_t size, int32_t *ss mgmtMoveVgroupToTail(pDb, pVgroup); } - return NULL; + return TSDB_CODE_SUCCESS; } -void *mgmtChildTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize) { - SChildTableObj *pTable = (SChildTableObj *) row; +static int32_t mgmtChildTableActionDelete(void *pObj) { + SChildTableObj *pTable = (SChildTableObj *) pObj; if (pTable->vgId == 0) { - return NULL; + return TSDB_CODE_INVALID_VGROUP_ID; } SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { - return NULL; + return TSDB_CODE_INVALID_VGROUP_ID; } SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) { mError("ctable:%s, vgroup:%d not in DB:%s", pTable->tableId, pVgroup->vgId, pVgroup->dbName); - return NULL; + return TSDB_CODE_INVALID_DB; } SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); if (pAcct == NULL) { mError("ctable:%s, account:%s not exists", pTable->tableId, pDb->cfg.acct); - return NULL; + return TSDB_CODE_INVALID_ACCT; } mgmtRestoreTimeSeries(pAcct, pTable->superTable->numOfColumns - 1); @@ -150,56 +126,54 @@ void *mgmtChildTableActionDelete(void *row, char *str, int32_t size, int32_t *ss mgmtMoveVgroupToHead(pDb, pVgroup); } - return NULL; + return TSDB_CODE_SUCCESS; } -void *mgmtChildTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) { - return mgmtChildTableActionReset(row, str, size, NULL); +static int32_t mgmtChildTableActionUpdate(void *pObj) { + return TSDB_CODE_SUCCESS; } -void *mgmtChildTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize) { - SChildTableObj *pTable = (SChildTableObj *) row; - assert(row != NULL && str != NULL); - - memcpy(str, pTable, tsChildTableUpdateSize); - *ssize = tsChildTableUpdateSize; +static int32_t mgmtChildTableActionEncode(void *pObj, void *pData, int32_t maxRowSize) { + SChildTableObj *pTable = (SChildTableObj *) pObj; + assert(pObj != NULL && pData != NULL); - return NULL; + memcpy(pData, pTable, tsChildTableUpdateSize); + return tsChildTableUpdateSize; } -void *mgmtChildTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize) { - assert(str != NULL); +static void *mgmtChildTableActionDecode(void *pData) { + assert(pData != NULL); SChildTableObj *pTable = (SChildTableObj *)calloc(sizeof(SChildTableObj), 1); if (pTable == NULL) return NULL; - if (size < tsChildTableUpdateSize) { - mgmtDestroyChildTable(pTable); - return NULL; - } - memcpy(pTable, str, tsChildTableUpdateSize); + memcpy(pTable, pData, tsChildTableUpdateSize); return (void *)pTable; } -void *mgmtChildTableAction(char action, void *row, char *str, int32_t size, int32_t *ssize) { - if (mgmtChildTableActionFp[(uint8_t)action] != NULL) { - return (*(mgmtChildTableActionFp[(uint8_t)action]))(row, str, size, ssize); - } - return NULL; -} - int32_t mgmtInitChildTables() { void *pNode = NULL; void *pLastNode = NULL; SChildTableObj *pTable = NULL; - mgmtChildTableActionInit(); SChildTableObj tObj; tsChildTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj; - tsChildTableSdb = sdbOpenTable(tsMaxTables, tsChildTableUpdateSize, - "ctables", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtChildTableAction); + SSdbTableDesc tableDesc = { + .tableName = "ctables", + .hashSessions = tsMaxTables, + .maxRowSize = tsChildTableUpdateSize, + .keyType = SDB_KEYTYPE_STRING, + .insertFp = mgmtChildTableActionInsert, + .deleteFp = mgmtChildTableActionDelete, + .updateFp = mgmtChildTableActionUpdate, + .encodeFp = mgmtChildTableActionEncode, + .decodeFp = mgmtChildTableActionDecode, + .destroyFp = mgmtChildTableActionDestroy, + }; + + tsChildTableSdb = sdbOpenTable(&tableDesc); if (tsChildTableSdb == NULL) { mError("failed to init child table data"); return -1; diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index e889d200f549b65bdad84a88e4eddf8bf5006fbf..2549dd4c64511bc3fe8348ce09c926a1dfe46658 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -48,29 +48,57 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg); static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg); static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg); -static void *(*mgmtDbActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtDbActionDelete(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtDbActionUpdate(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtDbActionEncode(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtDbActionDecode(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtDbActionReset(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtDbActionDestroy(void *row, char *str, int32_t size, int32_t *ssize); +static int32_t mgmtDbActionDestroy(void *pObj) { + tfree(pObj); + return TSDB_CODE_SUCCESS; +} + +static int32_t mgmtDbActionInsert(void *pObj) { + SDbObj *pDb = (SDbObj *) pObj; + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); + + pDb->pHead = NULL; + pDb->pTail = NULL; + pDb->numOfVgroups = 0; + pDb->numOfTables = 0; + mgmtAddDbIntoAcct(pAcct, pDb); + + return TSDB_CODE_SUCCESS; +} + +static int32_t mgmtDbActionDelete(void *pObj) { + SDbObj *pDb = (SDbObj *) pObj; + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); + mgmtRemoveDbFromAcct(pAcct, pDb); + + mgmtDropAllNormalTables(pDb); + mgmtDropAllChildTables(pDb); + mgmtDropAllSuperTables(pDb); -static void mgmtDbActionInit() { - mgmtDbActionFp[SDB_TYPE_INSERT] = mgmtDbActionInsert; - mgmtDbActionFp[SDB_TYPE_DELETE] = mgmtDbActionDelete; - mgmtDbActionFp[SDB_TYPE_UPDATE] = mgmtDbActionUpdate; - mgmtDbActionFp[SDB_TYPE_ENCODE] = mgmtDbActionEncode; - mgmtDbActionFp[SDB_TYPE_DECODE] = mgmtDbActionDecode; - mgmtDbActionFp[SDB_TYPE_DESTROY] = mgmtDbActionDestroy; + return TSDB_CODE_SUCCESS; } -static void *mgmtDbAction(char action, void *row, char *str, int32_t size, int32_t *ssize) { - if (mgmtDbActionFp[(uint8_t)action] != NULL) { - return (*(mgmtDbActionFp[(uint8_t)action]))(row, str, size, ssize); +static int32_t mgmtDbActionUpdate(void *pObj) { + return TSDB_CODE_SUCCESS; +} + +static int32_t mgmtDbActionEncode(void *pObj, void *pData, int32_t maxRowSize) { + SDbObj *pDb = (SDbObj *)pObj; + if (maxRowSize < tsDbUpdateSize) { + return -1; + } else { + memcpy(pData, pDb, tsDbUpdateSize); + return tsDbUpdateSize; } - return NULL; +} + +static void *mgmtDbActionDecode(void *pData) { + SDbObj *pDb = (SDbObj *) malloc(sizeof(SDbObj)); + if (pDb == NULL) return NULL; + memset(pDb, 0, sizeof(SDbObj)); + memcpy(pDb, pData, tsDbUpdateSize); + + return (void *)pDb; } int32_t mgmtInitDbs() { @@ -78,12 +106,23 @@ int32_t mgmtInitDbs() { SDbObj * pDb = NULL; SAcctObj *pAcct = NULL; - mgmtDbActionInit(); - SDbObj tObj; tsDbUpdateSize = tObj.updateEnd - (char *)&tObj; - tsDbSdb = sdbOpenTable(TSDB_MAX_DBS, tsDbUpdateSize, "dbs", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtDbAction); + SSdbTableDesc tableDesc = { + .tableName = "dbs", + .hashSessions = TSDB_MAX_DBS, + .maxRowSize = tsDbUpdateSize, + .keyType = SDB_KEYTYPE_STRING, + .insertFp = mgmtDbActionInsert, + .deleteFp = mgmtDbActionDelete, + .updateFp = mgmtDbActionUpdate, + .encodeFp = mgmtDbActionEncode, + .decodeFp = mgmtDbActionDecode, + .destroyFp = mgmtDbActionDestroy, + }; + + tsDbSdb = sdbOpenTable(&tableDesc); if (tsDbSdb == NULL) { mError("failed to init db data"); return -1; @@ -683,68 +722,6 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void * return numOfRows; } -void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { - SDbObj *pDb = (SDbObj *) row; - SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); - - pDb->pHead = NULL; - pDb->pTail = NULL; - pDb->numOfVgroups = 0; - pDb->numOfTables = 0; - mgmtAddDbIntoAcct(pAcct, pDb); - - return NULL; -} - -void *mgmtDbActionDelete(void *row, char *str, int32_t size, int32_t *ssize) { - SDbObj *pDb = (SDbObj *) row; - SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); - mgmtRemoveDbFromAcct(pAcct, pDb); - - mgmtDropAllNormalTables(pDb); - mgmtDropAllChildTables(pDb); - mgmtDropAllSuperTables(pDb); - - return NULL; -} - -void *mgmtDbActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) { - return mgmtDbActionReset(row, str, size, ssize); -} - -void *mgmtDbActionEncode(void *row, char *str, int32_t size, int32_t *ssize) { - SDbObj *pDb = (SDbObj *) row; - if (size < tsDbUpdateSize) { - *ssize = -1; - } else { - memcpy(str, pDb, tsDbUpdateSize); - *ssize = tsDbUpdateSize; - } - - return NULL; -} -void *mgmtDbActionDecode(void *row, char *str, int32_t size, int32_t *ssize) { - SDbObj *pDb = (SDbObj *) malloc(sizeof(SDbObj)); - if (pDb == NULL) return NULL; - memset(pDb, 0, sizeof(SDbObj)); - - memcpy(pDb, str, tsDbUpdateSize); - - return (void *)pDb; -} - -void *mgmtDbActionReset(void *row, char *str, int32_t size, int32_t *ssize) { - SDbObj *pDb = (SDbObj *) row; - memcpy(pDb, str, tsDbUpdateSize); - - return NULL; -} - -void *mgmtDbActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) { - tfree(row); - return NULL; -} - void mgmtAddSuperTableIntoDb(SDbObj *pDb) { atomic_add_fetch_32(&pDb->numOfSuperTables, 1); } diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index df9fe0f439f61d02c5db88a61f7bed1df9b457d3..58578faf9a3574a0981ffd6ba6266554ce617cf3 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -35,75 +35,45 @@ void *tsNormalTableSdb; int32_t tsNormalTableUpdateSize; -void *(*mgmtNormalTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); - -void *mgmtNormalTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtNormalTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtNormalTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtNormalTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtNormalTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtNormalTableActionReset(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtNormalTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize); static void mgmtDestroyNormalTable(SNormalTableObj *pTable) { - free(pTable->schema); - free(pTable->sql); - free(pTable); -} - -static void mgmtNormalTableActionInit() { - mgmtNormalTableActionFp[SDB_TYPE_INSERT] = mgmtNormalTableActionInsert; - mgmtNormalTableActionFp[SDB_TYPE_DELETE] = mgmtNormalTableActionDelete; - mgmtNormalTableActionFp[SDB_TYPE_UPDATE] = mgmtNormalTableActionUpdate; - mgmtNormalTableActionFp[SDB_TYPE_ENCODE] = mgmtNormalTableActionEncode; - mgmtNormalTableActionFp[SDB_TYPE_DECODE] = mgmtNormalTableActionDecode; - mgmtNormalTableActionFp[SDB_TYPE_DESTROY] = mgmtNormalTableActionDestroy; + tfree(pTable->schema); + tfree(pTable->sql); + tfree(pTable); } -void *mgmtNormalTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) { - SNormalTableObj *pTable = (SNormalTableObj *) row; - memcpy(pTable, str, tsNormalTableUpdateSize); - - int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns) + pTable->sqlLen; - pTable->schema = realloc(pTable->schema, schemaSize); - pTable->sql = (char*)pTable->schema + sizeof(SSchema) * (pTable->numOfColumns); - memcpy(pTable->schema, str + tsNormalTableUpdateSize, schemaSize); - - return NULL; -} - -void *mgmtNormalTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) { - SNormalTableObj *pTable = (SNormalTableObj *)row; +static int32_t mgmtNormalTableActionDestroy(void *pObj) { + SNormalTableObj *pTable = (SNormalTableObj *)pObj; mgmtDestroyNormalTable(pTable); - return NULL; + return TSDB_CODE_SUCCESS; } -void *mgmtNormalTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { - SNormalTableObj *pTable = (SNormalTableObj *) row; +static int32_t mgmtNormalTableActionInsert(void *pObj) { + SNormalTableObj *pTable = (SNormalTableObj *) pObj; SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId); - return NULL; + return TSDB_CODE_INVALID_VGROUP_ID; } SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) { mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName); - return NULL; + return TSDB_CODE_INVALID_DB; } SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); if (pAcct == NULL) { mError("account not exists"); - return NULL; + return TSDB_CODE_INVALID_ACCT; } if (!mgmtIsMaster()) { int32_t sid = taosAllocateId(pVgroup->idPool); if (sid != pTable->sid) { mError("sid:%d is not matched from the master:%d", sid, pTable->sid); - return NULL; + return TSDB_CODE_INVALID_SESSION_ID; } } @@ -115,30 +85,30 @@ void *mgmtNormalTableActionInsert(void *row, char *str, int32_t size, int32_t *s mgmtMoveVgroupToTail(pDb, pVgroup); } - return NULL; + return TSDB_CODE_SUCCESS; } -void *mgmtNormalTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize) { - SNormalTableObj *pTable = (SNormalTableObj *) row; +static int32_t mgmtNormalTableActionDelete(void *pObj) { + SNormalTableObj *pTable = (SNormalTableObj *) pObj; if (pTable->vgId == 0) { - return NULL; + return TSDB_CODE_INVALID_VGROUP_ID; } SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { - return NULL; + return TSDB_CODE_INVALID_VGROUP_ID; } SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) { mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName); - return NULL; + return TSDB_CODE_INVALID_DB; } SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); if (pAcct == NULL) { mError("account not exists"); - return NULL; + return TSDB_CODE_INVALID_ACCT; } mgmtRestoreTimeSeries(pAcct, pTable->numOfColumns - 1); @@ -149,45 +119,46 @@ void *mgmtNormalTableActionDelete(void *row, char *str, int32_t size, int32_t *s mgmtMoveVgroupToHead(pDb, pVgroup); } - return NULL; + return TSDB_CODE_SUCCESS; } -void *mgmtNormalTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) { - return mgmtNormalTableActionReset(row, str, size, NULL); +static int32_t mgmtNormalTableActionUpdate(void *pObj) { + // SNormalTableObj *pTable = (SNormalTableObj *) pObj; + // memcpy(pTable, str, tsNormalTableUpdateSize); + + // int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns) + pTable->sqlLen; + // pTable->schema = realloc(pTable->schema, schemaSize); + // pTable->sql = (char*)pTable->schema + sizeof(SSchema) * (pTable->numOfColumns); + // memcpy(pTable->schema, str + tsNormalTableUpdateSize, schemaSize); + + return TSDB_CODE_SUCCESS; } -void *mgmtNormalTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize) { - SNormalTableObj *pTable = (SNormalTableObj *) row; - assert(row != NULL && str != NULL); +static int32_t mgmtNormalTableActionEncode(void *pObj, void *pData, int32_t maxRowSize) { + SNormalTableObj *pTable = (SNormalTableObj *) pObj; + assert(pObj != NULL && pData != NULL); int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); - if (size < tsNormalTableUpdateSize + schemaSize + 1) { - *ssize = -1; - return NULL; + if (maxRowSize < tsNormalTableUpdateSize + schemaSize + 1) { + return -1; } - memcpy(str, pTable, tsNormalTableUpdateSize); - memcpy(str + tsNormalTableUpdateSize, pTable->schema, schemaSize); - memcpy(str + tsNormalTableUpdateSize + schemaSize, pTable->sql, pTable->sqlLen); - *ssize = tsNormalTableUpdateSize + schemaSize + pTable->sqlLen; - - return NULL; + memcpy(pData, pTable, tsNormalTableUpdateSize); + memcpy(pData + tsNormalTableUpdateSize, pTable->schema, schemaSize); + memcpy(pData + tsNormalTableUpdateSize + schemaSize, pTable->sql, pTable->sqlLen); + + return tsNormalTableUpdateSize + schemaSize + pTable->sqlLen; } -void *mgmtNormalTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize) { - assert(str != NULL); +static void *mgmtNormalTableActionDecode(void *pData) { + assert(pData != NULL); SNormalTableObj *pTable = (SNormalTableObj *)malloc(sizeof(SNormalTableObj)); if (pTable == NULL) { return NULL; } memset(pTable, 0, sizeof(SNormalTableObj)); - - if (size < tsNormalTableUpdateSize) { - mgmtDestroyNormalTable(pTable); - return NULL; - } - memcpy(pTable, str, tsNormalTableUpdateSize); + memcpy(pTable, pData, tsNormalTableUpdateSize); int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); pTable->schema = (SSchema *)malloc(schemaSize); @@ -196,35 +167,39 @@ void *mgmtNormalTableActionDecode(void *row, char *str, int32_t size, int32_t *s return NULL; } - memcpy(pTable->schema, str + tsNormalTableUpdateSize, schemaSize); + memcpy(pTable->schema, pData + tsNormalTableUpdateSize, schemaSize); pTable->sql = (char *)malloc(pTable->sqlLen); if (pTable->sql == NULL) { mgmtDestroyNormalTable(pTable); return NULL; } - memcpy(pTable->sql, str + tsNormalTableUpdateSize + schemaSize, pTable->sqlLen); + memcpy(pTable->sql, pData + tsNormalTableUpdateSize + schemaSize, pTable->sqlLen); return (void *)pTable; } -void *mgmtNormalTableAction(char action, void *row, char *str, int32_t size, int32_t *ssize) { - if (mgmtNormalTableActionFp[(uint8_t)action] != NULL) { - return (*(mgmtNormalTableActionFp[(uint8_t)action]))(row, str, size, ssize); - } - return NULL; -} - int32_t mgmtInitNormalTables() { void *pNode = NULL; void *pLastNode = NULL; SNormalTableObj *pTable = NULL; - mgmtNormalTableActionInit(); SNormalTableObj tObj; tsNormalTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj; - tsNormalTableSdb = sdbOpenTable(TSDB_MAX_NORMAL_TABLES, sizeof(SNormalTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS, - "ntables", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtNormalTableAction); + SSdbTableDesc tableDesc = { + .tableName = "ntables", + .hashSessions = TSDB_MAX_NORMAL_TABLES, + .maxRowSize = sizeof(SNormalTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS, + .keyType = SDB_KEYTYPE_STRING, + .insertFp = mgmtNormalTableActionInsert, + .deleteFp = mgmtNormalTableActionDelete, + .updateFp = mgmtNormalTableActionUpdate, + .encodeFp = mgmtNormalTableActionEncode, + .decodeFp = mgmtNormalTableActionDecode, + .destroyFp = mgmtNormalTableActionDestroy, + }; + + tsNormalTableSdb = sdbOpenTable(&tableDesc); if (tsNormalTableSdb == NULL) { mError("failed to init ntables data"); return -1; diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index 33bfe4622bdf569348a1fe362b55e808a294d926..8703c63541d0d404bc1b6a25f504ba46e353ed0a 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -16,8 +16,8 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taosdef.h" -#include "tutil.h" #include "tchecksum.h" +#include "tglobalcfg.h" #include "tlog.h" #include "trpc.h" #include "tutil.h" @@ -33,32 +33,37 @@ typedef struct { uint64_t swVersion; int16_t sdbFileVersion; - char reserved[6]; + char reserved[2]; TSCKSUM checkSum; } SSdbHeader; typedef struct _SSdbTable { - SSdbHeader header; - int maxRows; - int dbId; - int32_t maxRowSize; - char name[TSDB_DB_NAME_LEN]; - char fn[128]; - int keyType; - uint32_t autoIndex; - int64_t numOfRows; - int64_t id; - int64_t size; - void * iHandle; - int fd; - void *(*appTool)(char, void *, char *, int, int *); + SSdbHeader header; + char name[TSDB_DB_NAME_LEN]; + char fn[TSDB_FILENAME_LEN]; + ESdbKeyType keyType; + int32_t dbId; + int32_t hashSessions; + int32_t maxRowSize; + uint32_t autoIndex; + int64_t numOfRows; + int64_t id; + int64_t size; + void * iHandle; + int32_t fd; + int32_t (*insertFp)(void *pObj); + int32_t (*deleteFp)(void *pObj); + int32_t (*updateFp)(void *pObj); + void * (*decodeFp)(void *pData); // return pObj + int32_t (*encodeFp)(void *pObj, void *pData, int32_t maxRowSize); // return size of pData + int32_t (*destroyFp)(void *pObj); pthread_mutex_t mutex; } SSdbTable; typedef struct { int64_t id; int64_t offset; - int rowSize; + int32_t rowSize; void * row; } SRowMeta; @@ -71,9 +76,9 @@ typedef struct { typedef struct { uint8_t dbId; - char type; + int8_t type; + int16_t dataLen; uint64_t version; - short dataLen; char data[]; } SForwardMsg; @@ -283,7 +288,7 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) { // TODO: Get rid of the rowMeta.offset and rowSize rowMeta.offset = pTable->size; rowMeta.rowSize = rowHead->rowSize; - rowMeta.row = (*(pTable->appTool))(SDB_TYPE_DECODE, NULL, rowHead->data, rowHead->rowSize, NULL); + rowMeta.row = (*pTable->decodeFp)(rowHead->data); (*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta); if (pTable->keyType == SDB_KEYTYPE_AUTO) { pTable->autoIndex++; @@ -299,7 +304,7 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) { if (rowHead->id < 0) { // Delete the object (*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data); - (*(pTable->appTool))(SDB_TYPE_DESTROY, pMetaRow, NULL, 0, NULL); + (*pTable->destroyFp)(pMetaRow); pTable->numOfRows--; numOfDels++; } else { // Reset the object TODO: is it possible to merge reset and @@ -322,7 +327,7 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) { } sdbVersion += (pTable->id - oldId); - if (numOfDels > pTable->maxRows / 4) sdbSaveSnapShot(pTable); + if (numOfDels > pTable->hashSessions / 4) sdbSaveSnapShot(pTable); tfree(rowHead); return 0; @@ -332,20 +337,25 @@ sdb_exit1: return -1; } -void *sdbOpenTable(int32_t maxRows, int32_t maxRowSize, char *name, uint8_t keyType, char *directory, - void *(*appTool)(char, void *, char *, int32_t, int32_t *)) { - SSdbTable *pTable = (SSdbTable *)malloc(sizeof(SSdbTable)); +void *sdbOpenTable(SSdbTableDesc *pDesc) { + SSdbTable *pTable = (SSdbTable *)calloc(1, sizeof(SSdbTable)); if (pTable == NULL) return NULL; - memset(pTable, 0, sizeof(SSdbTable)); - strcpy(pTable->name, name); - pTable->keyType = keyType; - pTable->maxRows = maxRows; - pTable->maxRowSize = maxRowSize; - pTable->appTool = appTool; - sprintf(pTable->fn, "%s/%s.db", directory, pTable->name); - - if (sdbInitIndexFp[keyType] != NULL) pTable->iHandle = (*sdbInitIndexFp[keyType])(maxRows, sizeof(SRowMeta)); + pTable->keyType = pDesc->keyType; + pTable->hashSessions = pDesc->hashSessions; + pTable->maxRowSize = pDesc->maxRowSize; + pTable->insertFp = pDesc->insertFp; + pTable->deleteFp = pDesc->deleteFp; + pTable->updateFp = pDesc->updateFp; + pTable->encodeFp = pDesc->encodeFp; + pTable->decodeFp = pDesc->decodeFp; + pTable->destroyFp = pDesc->destroyFp; + strcpy(pTable->name, pDesc->tableName); + sprintf(pTable->fn, "%s/%s.db", tsMnodeDir, pTable->name); + + if (sdbInitIndexFp[pTable->keyType] != NULL) { + pTable->iHandle = (*sdbInitIndexFp[pTable->keyType])(pTable->maxRowSize, sizeof(SRowMeta)); + } pthread_mutex_init(&pTable->mutex, NULL); @@ -386,7 +396,7 @@ void *sdbGetRow(void *handle, void *key) { } // row here must be encoded string (rowSize > 0) or the object it self (rowSize = 0) -int32_t sdbInsertRow(void *handle, void *row, ESdbOper oper) { +int32_t sdbInsertRow(void *handle, void *row, ESdbOperType oper) { SSdbTable *pTable = (SSdbTable *)handle; SRowMeta rowMeta; void * pObj = NULL; @@ -424,7 +434,7 @@ int32_t sdbInsertRow(void *handle, void *row, ESdbOper oper) { if (oper == SDB_OPER_GLOBAL) { pObj = row; } else { - pObj = (*(pTable->appTool))(SDB_TYPE_DECODE, NULL, row, 0, NULL); + pObj = (*pTable->decodeFp)(row); } pthread_mutex_lock(&pTable->mutex); @@ -439,7 +449,7 @@ int32_t sdbInsertRow(void *handle, void *row, ESdbOper oper) { } if (oper == SDB_OPER_GLOBAL || oper == SDB_OPER_LOCAL) { - (*(pTable->appTool))(SDB_TYPE_ENCODE, pObj, rowHead->data, pTable->maxRowSize, &(rowHead->rowSize)); + rowHead->rowSize = (*pTable->encodeFp)(pObj, rowHead->data, pTable->maxRowSize); assert(rowHead->rowSize > 0 && rowHead->rowSize <= pTable->maxRowSize); real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); @@ -489,7 +499,7 @@ int32_t sdbInsertRow(void *handle, void *row, ESdbOper oper) { pthread_mutex_unlock(&pTable->mutex); - (*pTable->appTool)(SDB_TYPE_INSERT, pObj, NULL, 0, NULL); + (*pTable->insertFp)(pObj); tfree(rowHead); @@ -497,7 +507,7 @@ int32_t sdbInsertRow(void *handle, void *row, ESdbOper oper) { } // row here can be object or null-terminated string -int32_t sdbDeleteRow(void *handle, void *row, ESdbOper oper) { +int32_t sdbDeleteRow(void *handle, void *row, ESdbOperType oper) { SSdbTable *pTable = (SSdbTable *)handle; SRowMeta * pMeta = NULL; void * pMetaRow = NULL; @@ -587,13 +597,13 @@ int32_t sdbDeleteRow(void *handle, void *row, ESdbOper oper) { tfree(rowHead); - (*pTable->appTool)(SDB_TYPE_DELETE, pMetaRow, NULL, 0, NULL); + (*pTable->deleteFp)(pMetaRow); return 0; } // row here can be the object or the string info (encoded string) -int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, ESdbOper oper) { +int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, ESdbOperType oper) { SSdbTable *pTable = (SSdbTable *)handle; SRowMeta * pMeta = NULL; int32_t total_size = 0; @@ -645,7 +655,7 @@ int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, ESdbOper oper) memcpy(rowHead->data, row, updateSize); rowHead->rowSize = updateSize; } else { - (*(pTable->appTool))(SDB_TYPE_ENCODE, pMetaRow, rowHead->data, pTable->maxRowSize, &(rowHead->rowSize)); + rowHead->rowSize = (*pTable->encodeFp)(pMetaRow, rowHead->data, pTable->maxRowSize); } real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); @@ -689,7 +699,7 @@ int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, ESdbOper oper) pthread_mutex_unlock(&pTable->mutex); - (*(pTable->appTool))(SDB_TYPE_UPDATE, pMetaRow, row, updateSize, NULL); // update in upper layer + (*pTable->updateFp)(pMetaRow); // update in upper layer tfree(rowHead); @@ -706,7 +716,7 @@ void sdbCloseTable(void *handle) { while (1) { pNode = sdbFetchRow(handle, pNode, &row); if (row == NULL) break; - (*(pTable->appTool))(SDB_TYPE_DESTROY, row, NULL, 0, NULL); + (*pTable->destroyFp)(row); } if (sdbCleanUpIndexFp[pTable->keyType]) (*sdbCleanUpIndexFp[pTable->keyType])(pTable->iHandle); @@ -724,13 +734,13 @@ void sdbCloseTable(void *handle) { void sdbResetTable(SSdbTable *pTable) { /* SRowHead rowHead; */ SRowMeta rowMeta; - int32_t bytes; - int32_t total_size = 0; - int32_t real_size = 0; + int32_t bytes; + int32_t total_size = 0; + int32_t real_size = 0; SRowHead *rowHead = NULL; void * pMetaRow = NULL; int64_t oldId = pTable->id; - int32_t oldNumOfRows = pTable->numOfRows; + int32_t oldNumOfRows = pTable->numOfRows; if (sdbOpenSdbFile(pTable) < 0) return; pTable->numOfRows = oldNumOfRows; @@ -792,19 +802,19 @@ void sdbResetTable(SSdbTable *pTable) { // TODO:Get rid of the rowMeta.offset and rowSize rowMeta.offset = pTable->size; rowMeta.rowSize = rowHead->rowSize; - rowMeta.row = (*(pTable->appTool))(SDB_TYPE_DECODE, NULL, rowHead->data, rowHead->rowSize, NULL); + rowMeta.row = (*pTable->decodeFp)(rowHead->data); (*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta); pTable->numOfRows++; - (*pTable->appTool)(SDB_TYPE_INSERT, rowMeta.row, NULL, 0, NULL); + (*pTable->insertFp)(rowMeta.row); } } else { // already exists if (rowHead->id < 0) { // Delete the object (*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data); - (*(pTable->appTool))(SDB_TYPE_DESTROY, pMetaRow, NULL, 0, NULL); + (*pTable->destroyFp)(pMetaRow); pTable->numOfRows--; } else { // update the object - (*(pTable->appTool))(SDB_TYPE_UPDATE, pMetaRow, rowHead->data, rowHead->rowSize, NULL); + (*pTable->updateFp)(pMetaRow); } } } @@ -866,7 +876,7 @@ void sdbSaveSnapShot(void *handle) { rowHead->delimiter = SDB_DELIMITER; rowHead->id = pMeta->id; - (*(pTable->appTool))(SDB_TYPE_ENCODE, pMeta->row, rowHead->data, pTable->maxRowSize, &(rowHead->rowSize)); + rowHead->rowSize = (*pTable->encodeFp)(pMeta->row, rowHead->data, pTable->maxRowSize); real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) { sdbError("failed to get checksum while save sdb %s snapshot", pTable->name); diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index 9120854e4769aac95319981e1be6b8f0d1634ba0..de64f9f54e5e55866f684db5e9a804741029f681 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -39,105 +39,73 @@ static void *tsSuperTableSdb; static int32_t tsSuperTableUpdateSize; -static void *(*mgmtSuperTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtSuperTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtSuperTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtSuperTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtSuperTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtSuperTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtSuperTableActionReset(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtSuperTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize); static int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtGetShowSuperTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static void mgmtDestroySuperTable(SSuperTableObj *pTable) { - free(pTable->schema); - free(pTable); + tfree(pTable->schema); + tfree(pTable); } -static void mgmtSuperTableActionInit() { - SSuperTableObj tObj; - tsSuperTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj; - - mgmtSuperTableActionFp[SDB_TYPE_INSERT] = mgmtSuperTableActionInsert; - mgmtSuperTableActionFp[SDB_TYPE_DELETE] = mgmtSuperTableActionDelete; - mgmtSuperTableActionFp[SDB_TYPE_UPDATE] = mgmtSuperTableActionUpdate; - mgmtSuperTableActionFp[SDB_TYPE_ENCODE] = mgmtSuperTableActionEncode; - mgmtSuperTableActionFp[SDB_TYPE_DECODE] = mgmtSuperTableActionDecode; - mgmtSuperTableActionFp[SDB_TYPE_DESTROY] = mgmtSuperTableActionDestroy; -} - -void *mgmtSuperTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) { - SSuperTableObj *pTable = (SSuperTableObj *) row; - memcpy(pTable, str, tsSuperTableUpdateSize); - - int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags); - pTable->schema = realloc(pTable->schema, schemaSize); - memcpy(pTable->schema, str + tsSuperTableUpdateSize, schemaSize); - - return NULL; -} - -void *mgmtSuperTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) { - SSuperTableObj *pTable = (SSuperTableObj *) row; +static int32_t mgmtSuperTableActionDestroy(void *pObj) { + SSuperTableObj *pTable = (SSuperTableObj *) pObj; mgmtDestroySuperTable(pTable); - return NULL; + return TSDB_CODE_SUCCESS; } -void *mgmtSuperTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { - STableInfo *pTable = (STableInfo *) row; +static int32_t mgmtSuperTableActionInsert(void *pObj) { + STableInfo *pTable = (STableInfo *) pObj; SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); if (pDb) { mgmtAddSuperTableIntoDb(pDb); } - return NULL; + return TSDB_CODE_SUCCESS; } -void *mgmtSuperTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize) { - STableInfo *pTable = (STableInfo *) row; +static int32_t mgmtSuperTableActionDelete(void *pObj) { + STableInfo *pTable = (STableInfo *) pObj; SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); if (pDb) { mgmtRemoveSuperTableFromDb(pDb); } - return NULL; + return TSDB_CODE_SUCCESS; } -void *mgmtSuperTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) { - return mgmtSuperTableActionReset(row, str, size, NULL); +static int32_t mgmtSuperTableActionUpdate(void *pObj) { + SSuperTableObj *pTable = (SSuperTableObj *) pObj; + memcpy(pTable, pObj, tsSuperTableUpdateSize); + + int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags); + pTable->schema = realloc(pTable->schema, schemaSize); + memcpy(pTable->schema, pObj + tsSuperTableUpdateSize, schemaSize); + + return TSDB_CODE_SUCCESS; } -void *mgmtSuperTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize) { - SSuperTableObj *pTable = (SSuperTableObj *) row; - assert(row != NULL && str != NULL); +static int32_t mgmtSuperTableActionEncode(void *pObj, void *pData, int32_t maxRowSize) { + SSuperTableObj *pTable = (SSuperTableObj *) pObj; + assert(pObj != NULL && pData != NULL); int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags); - if (size < tsSuperTableUpdateSize + schemaSize + 1) { - *ssize = -1; - return NULL; + if (maxRowSize < tsSuperTableUpdateSize + schemaSize + 1) { + return TSDB_CODE_INVALID_MSG_LEN; } - memcpy(str, pTable, tsSuperTableUpdateSize); - memcpy(str + tsSuperTableUpdateSize, pTable->schema, schemaSize); - *ssize = tsSuperTableUpdateSize + schemaSize; - - return NULL; + memcpy(pData, pTable, tsSuperTableUpdateSize); + memcpy(pData + tsSuperTableUpdateSize, pTable->schema, schemaSize); + return tsSuperTableUpdateSize + schemaSize; } -void *mgmtSuperTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize) { - assert(str != NULL); +static void *mgmtSuperTableActionDecode(void *pData) { + assert(pData != NULL); SSuperTableObj *pTable = (SSuperTableObj *) malloc(sizeof(SSuperTableObj)); if (pTable == NULL) { return NULL; } memset(pTable, 0, sizeof(SSuperTableObj)); - - if (size < tsSuperTableUpdateSize) { - mgmtDestroySuperTable(pTable); - return NULL; - } - memcpy(pTable, str, tsSuperTableUpdateSize); + memcpy(pTable, pData, tsSuperTableUpdateSize); int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags); pTable->schema = malloc(schemaSize); @@ -146,26 +114,32 @@ void *mgmtSuperTableActionDecode(void *row, char *str, int32_t size, int32_t *ss return NULL; } - memcpy(pTable->schema, str + tsSuperTableUpdateSize, schemaSize); + memcpy(pTable->schema, pData + tsSuperTableUpdateSize, schemaSize); return (void *) pTable; } -void *mgmtSuperTableAction(char action, void *row, char *str, int32_t size, int32_t *ssize) { - if (mgmtSuperTableActionFp[(uint8_t) action] != NULL) { - return (*(mgmtSuperTableActionFp[(uint8_t) action]))(row, str, size, ssize); - } - return NULL; -} - int32_t mgmtInitSuperTables() { void *pNode = NULL; void *pLastNode = NULL; SSuperTableObj *pTable = NULL; - mgmtSuperTableActionInit(); + SSuperTableObj tObj; + tsSuperTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj; - tsSuperTableSdb = sdbOpenTable(TSDB_MAX_SUPER_TABLES, tsSuperTableUpdateSize + sizeof(SSchema) * TSDB_MAX_COLUMNS, - "stables", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtSuperTableAction); + SSdbTableDesc tableDesc = { + .tableName = "stables", + .hashSessions = TSDB_MAX_SUPER_TABLES, + .maxRowSize = tsSuperTableUpdateSize + sizeof(SSchema) * TSDB_MAX_COLUMNS, + .keyType = SDB_KEYTYPE_STRING, + .insertFp = mgmtSuperTableActionInsert, + .deleteFp = mgmtSuperTableActionDelete, + .updateFp = mgmtSuperTableActionUpdate, + .encodeFp = mgmtSuperTableActionEncode, + .decodeFp = mgmtSuperTableActionDecode, + .destroyFp = mgmtSuperTableActionDestroy, + }; + + tsSuperTableSdb = sdbOpenTable(&tableDesc); if (tsSuperTableSdb == NULL) { mError("failed to init stables data"); return -1; @@ -365,8 +339,8 @@ int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *oldTagN return TSDB_CODE_INVALID_MSG_TYPE; } - int32_t rowSize = 0; - uint32_t len = strlen(newTagName); + int32_t rowSize = 0; + uint32_t len = strlen(newTagName); if (col >= pStable->numOfTags || len >= TSDB_COL_NAME_LEN || mgmtFindSuperTableTagIndex(pStable, newTagName) >= 0) { return TSDB_CODE_APP_ERROR; @@ -382,7 +356,7 @@ int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *oldTagN if (msg == NULL) return TSDB_CODE_APP_ERROR; memset(msg, 0, size); - mgmtSuperTableActionEncode(pStable, msg, size, &rowSize); + // mgmtSuperTableActionEncode(pStable, msg, size, &rowSize); int32_t ret = sdbUpdateRow(tsSuperTableSdb, msg, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); tfree(msg); diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index 174976f4c0780b3c7e4788e541ceef6dd8a2f8d1..2ca37a45725d88258a3727ed1c83c3ae172f8101 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -18,6 +18,7 @@ #include "trpc.h" #include "tschemautil.h" #include "ttime.h" +#include "tutil.h" #include "mgmtAcct.h" #include "mgmtGrant.h" #include "mgmtMnode.h" @@ -38,16 +39,53 @@ static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg); static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg); static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg); -static void *(*mgmtUserActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtUserActionInsert(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtUserActionDelete(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtUserActionUpdate(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtUserActionEncode(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtUserActionDecode(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtUserActionReset(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtUserActionDestroy(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtUserAction(char action, void *row, char *str, int32_t size, int32_t *ssize); -static void mgmtUserActionInit(); +static int32_t mgmtUserActionDestroy(void *pObj) { + tfree(pObj); + return TSDB_CODE_SUCCESS; +} + +static int32_t mgmtUserActionInsert(void *pObj) { + SUserObj *pUser = (SUserObj *) pObj; + SAcctObj *pAcct = mgmtGetAcct(pUser->acct); + + pUser->pAcct = pAcct; + mgmtAddUserIntoAcct(pAcct, pUser); + + return TSDB_CODE_SUCCESS; +} + +static int32_t mgmtUserActionDelete(void *pObj) { + SUserObj *pUser = (SUserObj *) pObj; + SAcctObj *pAcct = mgmtGetAcct(pUser->acct); + + mgmtRemoveUserFromAcct(pAcct, pUser); + + return TSDB_CODE_SUCCESS; +} + +static int32_t mgmtUserActionUpdate(void *pObj) { + return TSDB_CODE_SUCCESS; +} + +static int32_t mgmtUserActionEncode(void *pObj, void *pData, int32_t maxRowSize) { + SUserObj *pUser = (SUserObj *) pObj; + + if (maxRowSize < tsUserUpdateSize) { + return -1; + } else { + memcpy(pData, pUser, tsUserUpdateSize); + return tsUserUpdateSize; + } +} + +static void *mgmtUserActionDecode(void *pData) { + SUserObj *pUser = (SUserObj *) malloc(sizeof(SUserObj)); + if (pUser == NULL) return NULL; + memset(pUser, 0, sizeof(SUserObj)); + memcpy(pUser, pData, tsUserUpdateSize); + + return pUser; +} int32_t mgmtInitUsers() { void *pNode = NULL; @@ -55,12 +93,23 @@ int32_t mgmtInitUsers() { SAcctObj *pAcct = NULL; int32_t numOfUsers = 0; - mgmtUserActionInit(); - SUserObj tObj; tsUserUpdateSize = tObj.updateEnd - (int8_t *)&tObj; - tsUserSdb = sdbOpenTable(TSDB_MAX_USERS, tsUserUpdateSize, "users", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtUserAction); + SSdbTableDesc tableDesc = { + .tableName = "users", + .hashSessions = TSDB_MAX_USERS, + .maxRowSize = tsUserUpdateSize, + .keyType = SDB_KEYTYPE_STRING, + .insertFp = mgmtUserActionInsert, + .deleteFp = mgmtUserActionDelete, + .updateFp = mgmtUserActionUpdate, + .encodeFp = mgmtUserActionEncode, + .decodeFp = mgmtUserActionDecode, + .destroyFp = mgmtUserActionDestroy, + }; + + tsUserSdb = sdbOpenTable(&tableDesc); if (tsUserSdb == NULL) { mError("failed to init user data"); return -1; @@ -246,82 +295,6 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void return numOfRows; } -static void mgmtUserActionInit() { - mgmtUserActionFp[SDB_TYPE_INSERT] = mgmtUserActionInsert; - mgmtUserActionFp[SDB_TYPE_DELETE] = mgmtUserActionDelete; - mgmtUserActionFp[SDB_TYPE_UPDATE] = mgmtUserActionUpdate; - mgmtUserActionFp[SDB_TYPE_ENCODE] = mgmtUserActionEncode; - mgmtUserActionFp[SDB_TYPE_DECODE] = mgmtUserActionDecode; - mgmtUserActionFp[SDB_TYPE_DESTROY] = mgmtUserActionDestroy; -} - -static void *mgmtUserAction(char action, void *row, char *str, int32_t size, int32_t *ssize) { - if (mgmtUserActionFp[(uint8_t) action] != NULL) { - return (*(mgmtUserActionFp[(uint8_t) action]))(row, str, size, ssize); - } - return NULL; -} - -static void *mgmtUserActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { - SUserObj *pUser = (SUserObj *) row; - SAcctObj *pAcct = mgmtGetAcct(pUser->acct); - - pUser->pAcct = pAcct; - mgmtAddUserIntoAcct(pAcct, pUser); - - return NULL; -} - -static void *mgmtUserActionDelete(void *row, char *str, int32_t size, int32_t *ssize) { - SUserObj *pUser = (SUserObj *) row; - SAcctObj *pAcct = mgmtGetAcct(pUser->acct); - - mgmtRemoveUserFromAcct(pAcct, pUser); - - return NULL; -} - -static void *mgmtUserActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) { - return mgmtUserActionReset(row, str, size, ssize); -} - -static void *mgmtUserActionEncode(void *row, char *str, int32_t size, int32_t *ssize) { - SUserObj *pUser = (SUserObj *) row; - - if (size < tsUserUpdateSize) { - *ssize = -1; - } else { - memcpy(str, pUser, tsUserUpdateSize); - *ssize = tsUserUpdateSize; - } - - return NULL; -} - -static void *mgmtUserActionDecode(void *row, char *str, int32_t size, int32_t *ssize) { - SUserObj *pUser = (SUserObj *) malloc(sizeof(SUserObj)); - if (pUser == NULL) return NULL; - memset(pUser, 0, sizeof(SUserObj)); - - memcpy(pUser, str, tsUserUpdateSize); - - return (void *)pUser; -} - -static void *mgmtUserActionReset(void *row, char *str, int32_t size, int32_t *ssize) { - SUserObj *pUser = (SUserObj *)row; - - memcpy(pUser, str, tsUserUpdateSize); - - return NULL; -} - -static void *mgmtUserActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) { - tfree(row); - - return NULL; -} - SUserObj *mgmtGetUserFromConn(void *pConn) { SRpcConnInfo connInfo; if (rpcGetConnInfo(pConn, &connInfo) == 0) { diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index d4df83c25c959828a54b6c264341367675d56432..12ac6fef57b8a99b904f5ca00594620a724e5cdd 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -33,15 +33,6 @@ static void *tsVgroupSdb = NULL; static int32_t tsVgUpdateSize = 0; -static void *(*mgmtVgroupActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtVgroupActionDelete(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtVgroupActionEncode(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtVgroupActionReset(void *row, char *str, int32_t size, int32_t *ssize); -static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t *ssize); - static int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg); @@ -50,32 +41,98 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg); static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle); static void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); -static void mgmtVgroupActionInit() { - SVgObj tObj; - tsVgUpdateSize = tObj.updateEnd - (int8_t *)&tObj; +static int32_t mgmtVgroupActionDestroy(void *pObj) { + SVgObj *pVgroup = (SVgObj *) pObj; + if (pVgroup->idPool) { + taosIdPoolCleanUp(pVgroup->idPool); + pVgroup->idPool = NULL; + } + if (pVgroup->tableList) tfree(pVgroup->tableList); + tfree(pObj); + return TSDB_CODE_SUCCESS; +} - mgmtVgroupActionFp[SDB_TYPE_INSERT] = mgmtVgroupActionInsert; - mgmtVgroupActionFp[SDB_TYPE_DELETE] = mgmtVgroupActionDelete; - mgmtVgroupActionFp[SDB_TYPE_UPDATE] = mgmtVgroupActionUpdate; - mgmtVgroupActionFp[SDB_TYPE_ENCODE] = mgmtVgroupActionEncode; - mgmtVgroupActionFp[SDB_TYPE_DECODE] = mgmtVgroupActionDecode; - mgmtVgroupActionFp[SDB_TYPE_DESTROY] = mgmtVgroupActionDestroy; +static int32_t mgmtVgroupActionInsert(void *pObj) { + SVgObj *pVgroup = pObj; + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { + pVgroup->vnodeGid[i].vnode = pVgroup->vgId; + } + + return TSDB_CODE_SUCCESS; } -static void *mgmtVgroupAction(char action, void *row, char *str, int32_t size, int32_t *ssize) { - if (mgmtVgroupActionFp[(uint8_t) action] != NULL) { - return (*(mgmtVgroupActionFp[(uint8_t) action]))(row, str, size, ssize); +static int32_t mgmtVgroupActionDelete(void *pObj) { + SVgObj *pVgroup = pObj; + SDbObj *pDb = mgmtGetDb(pVgroup->dbName); + + if (pDb != NULL) { + mgmtRemoveVgroupFromDb(pDb, pVgroup); } - return NULL; + + // mgmtUnSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes); + tfree(pVgroup->tableList); + + return TSDB_CODE_SUCCESS; +} + +static int32_t mgmtVgroupActionUpdate(void *pObj) { + SVgObj *pVgroup = (SVgObj *) pObj; + int32_t oldTables = taosIdPoolMaxSize(pVgroup->idPool); + + SDbObj *pDb = mgmtGetDb(pVgroup->dbName); + if (pDb != NULL) { + if (pDb->cfg.maxSessions != oldTables) { + mPrint("vgroup:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxSessions); + taosUpdateIdPool(pVgroup->idPool, pDb->cfg.maxSessions); + int32_t size = sizeof(STableInfo *) * pDb->cfg.maxSessions; + pVgroup->tableList = (STableInfo **)realloc(pVgroup->tableList, size); + } + } + + mTrace("vgroup:%d update, numOfVnode:%d", pVgroup->vgId, pVgroup->numOfVnodes); + return TSDB_CODE_SUCCESS; +} + +static int32_t mgmtVgroupActionEncode(void *pObj, void *pData, int32_t maxRowSize) { + SVgObj *pVgroup = (SVgObj *) pObj; + if (maxRowSize < tsVgUpdateSize) { + return -1; + } else { + memcpy(pData, pVgroup, tsVgUpdateSize); + return tsVgUpdateSize; + } +} + +static void *mgmtVgroupActionDecode(void *pObj) { + SVgObj *pVgroup = (SVgObj *) malloc(sizeof(SVgObj)); + if (pVgroup == NULL) return NULL; + memset(pVgroup, 0, sizeof(SVgObj)); + memcpy(pVgroup, pObj, tsVgUpdateSize); + + return pVgroup; } int32_t mgmtInitVgroups() { void *pNode = NULL; SVgObj *pVgroup = NULL; - mgmtVgroupActionInit(); + SVgObj tObj; + tsVgUpdateSize = tObj.updateEnd - (int8_t *)&tObj; + + SSdbTableDesc tableDesc = { + .tableName = "vgroups", + .hashSessions = TSDB_MAX_VGROUPS, + .maxRowSize = tsVgUpdateSize, + .keyType = SDB_KEYTYPE_AUTO, + .insertFp = mgmtVgroupActionInsert, + .deleteFp = mgmtVgroupActionDelete, + .updateFp = mgmtVgroupActionUpdate, + .encodeFp = mgmtVgroupActionEncode, + .decodeFp = mgmtVgroupActionDecode, + .destroyFp = mgmtVgroupActionDestroy, + }; - tsVgroupSdb = sdbOpenTable(TSDB_MAX_VGROUPS, tsVgUpdateSize, "vgroups", SDB_KEYTYPE_AUTO, tsMnodeDir, mgmtVgroupAction); + tsVgroupSdb = sdbOpenTable(&tableDesc); if (tsVgroupSdb == NULL) { mError("failed to init vgroups data"); return -1; @@ -389,90 +446,6 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo return numOfRows; } -static void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { - SVgObj *pVgroup = row; - for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - pVgroup->vnodeGid[i].vnode = pVgroup->vgId; - } - - return NULL; -} - -static void *mgmtVgroupActionDelete(void *row, char *str, int32_t size, int32_t *ssize) { - SVgObj *pVgroup = row; - SDbObj *pDb = mgmtGetDb(pVgroup->dbName); - - if (pDb != NULL) { - mgmtRemoveVgroupFromDb(pDb, pVgroup); - } - - // mgmtUnSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes); - tfree(pVgroup->tableList); - - return NULL; -} - -static void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) { - mgmtVgroupActionReset(row, str, size, ssize); - - SVgObj *pVgroup = (SVgObj *) row; - int32_t oldTables = taosIdPoolMaxSize(pVgroup->idPool); - - SDbObj *pDb = mgmtGetDb(pVgroup->dbName); - if (pDb != NULL) { - if (pDb->cfg.maxSessions != oldTables) { - mPrint("vgroup:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxSessions); - taosUpdateIdPool(pVgroup->idPool, pDb->cfg.maxSessions); - int32_t size = sizeof(STableInfo *) * pDb->cfg.maxSessions; - pVgroup->tableList = (STableInfo **)realloc(pVgroup->tableList, size); - } - } - - mTrace("vgroup:%d update, numOfVnode:%d", pVgroup->vgId, pVgroup->numOfVnodes); - - return NULL; -} - -static void *mgmtVgroupActionEncode(void *row, char *str, int32_t size, int32_t *ssize) { - SVgObj *pVgroup = (SVgObj *) row; - if (size < tsVgUpdateSize) { - *ssize = -1; - } else { - memcpy(str, pVgroup, tsVgUpdateSize); - *ssize = tsVgUpdateSize; - } - - return NULL; -} - -static void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t *ssize) { - SVgObj *pVgroup = (SVgObj *) malloc(sizeof(SVgObj)); - if (pVgroup == NULL) return NULL; - memset(pVgroup, 0, sizeof(SVgObj)); - - int32_t tsVgUpdateSize = pVgroup->updateEnd - (int8_t *) pVgroup; - memcpy(pVgroup, str, tsVgUpdateSize); - - return (void *) pVgroup; -} - -static void *mgmtVgroupActionReset(void *row, char *str, int32_t size, int32_t *ssize) { - SVgObj *pVgroup = (SVgObj *) row; - memcpy(pVgroup, str, tsVgUpdateSize); - return NULL; -} - -static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) { - SVgObj *pVgroup = (SVgObj *) row; - if (pVgroup->idPool) { - taosIdPoolCleanUp(pVgroup->idPool); - pVgroup->idPool = NULL; - } - if (pVgroup->tableList) tfree(pVgroup->tableList); - tfree(row); - return NULL; -} - void mgmtUpdateVgroup(SVgObj *pVgroup) { sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, SDB_OPER_LOCAL); }