diff --git a/src/mnode/inc/mgmtSdb.h b/src/mnode/inc/mgmtSdb.h index e33bfce8243f6be94250b2d517b026e0948f856f..ccddc11e04fc32cf2ee8cd0b503c51a993025d4d 100644 --- a/src/mnode/inc/mgmtSdb.h +++ b/src/mnode/inc/mgmtSdb.h @@ -32,12 +32,12 @@ typedef enum { typedef struct { ESdbOperType type; + void * table; + void * pObj; + int64_t version; int32_t maxRowSize; int32_t rowSize; void * rowData; - void * pObj; - void * table; - int64_t version; } SSdbOperDesc; typedef struct { diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index f957dc4a839d8c5ea95923604b44acf71954c069..ea05cb59f42f818d04c46c506515b75793d38e08 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -290,6 +290,8 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) { maxAutoIndex = MAX(maxAutoIndex, *(int32_t *) rowHead->data); } + pTable->version = MAX(pTable->version, abs(rowHead->version)); + void *pMetaRow = sdbGetRow(pTable, rowHead->data); if (pMetaRow == NULL) { if (rowHead->version < 0) { @@ -310,11 +312,11 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) { rowMeta.row = oper.pObj; (*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta); pTable->numOfRows++; - sdbTrace("table:%s, read record:%s and insert, numOfRows:%d version:%" PRId64 " sdbversion:%" PRId64, - pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version, sdbVersion); + sdbTrace("table:%s, read new record:%s, numOfRows:%d version:%" PRId64 , + pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version); } else { - sdbError("table:%s, failed to decode record:%s, numOfRows:%d version:%" PRId64 " sdbversion:%" PRId64, - pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version, sdbVersion); + sdbError("table:%s, failed to decode record:%s, numOfRows:%d version:%" PRId64 , + pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version); } } } else { @@ -326,8 +328,8 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) { (*pTable->destroyFp)(&oper); (*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data); pTable->numOfRows--; - sdbTrace("table:%s, read record:%s and delete, numOfRows:%d version:%" PRId64 " sdbversion:%" PRId64, - pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version, sdbVersion); + sdbTrace("table:%s, read deleted record:%s, numOfRows:%d version:%" PRId64 , + pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version); } else { SRowMeta rowMeta; rowMeta.version = rowHead->version; @@ -347,17 +349,14 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) { if (code == TSDB_CODE_SUCCESS) { rowMeta.row = oper.pObj; (*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta); - sdbTrace("table:%s, read record:%s and update, numOfRows:%d version:%" PRId64 " sdbversion:%" PRId64, - pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version, sdbVersion); + sdbTrace("table:%s, read updated record:%s, numOfRows:%d version:%" PRId64 , + pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version); } else { - sdbError("table:%s, failed to decode record:%s, numOfRows:%d version:%" PRId64 " sdbversion:%" PRId64, - pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version, sdbVersion); + sdbError("table:%s, failed to decode record:%s, numOfRows:%d version:%" PRId64 , + pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version); } } numOfChanged++; - if (pTable->version < abs(rowHead->version)) { - pTable->version = abs(rowHead->version); - } } pTable->fileSize += real_size; @@ -424,7 +423,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { pTable->tableId = sdbNumOfTables++; sdbTableList[pTable->tableId] = pTable; - sdbTrace("table:%s is initialized, numOfRows:%d, numOfTables:%d, version:%" PRId64 " sdbversion:%" PRId64, + sdbTrace("table:%s, is initialized, numOfRows:%d numOfTables:%d version:%" PRId64 " sdbversion:%" PRId64, pTable->tableName, pTable->numOfRows, sdbNumOfTables, pTable->version, sdbVersion); return pTable; @@ -470,6 +469,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) { return TSDB_CODE_ALREADY_THERE; } + pOper->maxRowSize = pTable->maxRowSize; pthread_mutex_lock(&pTable->mutex); if (pOper->type == SDB_OPER_TYPE_GLOBAL) { @@ -507,12 +507,11 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) { pOper->rowData = rowHead->data; (*pTable->encodeFp)(pOper); rowHead->rowSize = pOper->rowSize; - + rowHead->delimiter = SDB_DELIMITER; + rowHead->version = pTable->version; assert(rowHead->rowSize > 0 && rowHead->rowSize <= pTable->maxRowSize); int32_t real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); - rowHead->delimiter = SDB_DELIMITER; - rowHead->version = pTable->version; if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) { sdbError("table:%s, failed to get checksum while inserting", pTable->tableName); pTable->version--; @@ -540,7 +539,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) { pthread_mutex_unlock(&pTable->mutex); sdbTrace("table:%s, a record is inserted:%s, sdbversion:%" PRId64 " version:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64, - pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pOper->version, pOper->rowSize, pTable->numOfRows, pTable->fileSize); + pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version, pOper->rowSize, pTable->numOfRows, pTable->fileSize); (*pTable->insertFp)(pOper); @@ -568,11 +567,11 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) { .type = SDB_FORWARD_TYPE_DELETE, .tableId = pTable->tableId, .version = pTable->version + 1, - .rowSize = pOper->rowSize, - .rowData = pOper->rowData, + .rowSize = pMeta->rowSize, + .rowData = pMeta->row, }; - if (sdbForwardDbReqToPeer(&forward) == 0) { + if (sdbForwardDbReqToPeer(&forward) != 0) { sdbError("table:%s, failed to delete record", pTable->tableName); pthread_mutex_unlock(&pTable->mutex); return -1; @@ -593,7 +592,7 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) { int32_t rowSize = 0; switch (pTable->keyType) { case SDB_KEY_TYPE_STRING: - rowSize = strlen((char *)pOper->rowData) + 1; + rowSize = strlen((char *)pOper->pObj) + 1; break; case SDB_KEY_TYPE_AUTO: rowSize = sizeof(uint64_t); @@ -605,8 +604,9 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) { rowHead->delimiter = SDB_DELIMITER; rowHead->rowSize = rowSize; rowHead->version = -(pTable->version); - memcpy(rowHead->data, pOper->rowData, pOper->rowSize); - if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, total_size) < 0) { + memcpy(rowHead->data, pOper->pObj, rowSize); + int32_t real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); + if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) { sdbError("failed to get checksum while inserting, sdb:%s", pTable->tableName); pTable->version--; sdbVersion--; @@ -615,14 +615,14 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) { return -1; } - twrite(pTable->fd, rowHead, total_size); - pTable->fileSize += total_size; + twrite(pTable->fd, rowHead, real_size); + pTable->fileSize += real_size; sdbFinishCommit(pTable); tfree(rowHead); sdbTrace("table:%s, a record is deleted:%s, sdbversion:%" PRId64 " id:%" PRId64 " numOfRows:%d", - pTable->tableName, sdbGetkeyStr(pTable, pOper->rowData), sdbVersion, pTable->version, pTable->numOfRows); + pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version, pTable->numOfRows); // Delete from current layer (*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, pOper->pObj); @@ -642,10 +642,10 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) { SSdbTable *pTable = (SSdbTable *)pOper->table; if (pTable == NULL) return -1; - SRowMeta *pMeta = sdbGetRowMeta(pTable, pOper->rowData); + SRowMeta *pMeta = sdbGetRowMeta(pTable, pOper->pObj); if (pMeta == NULL) { sdbError("table:%s, failed to update record:%s, record is not there, sdbversion:%" PRId64 " id:%" PRId64, - pTable->tableName, sdbGetkeyStr(pTable, pOper->rowData), sdbVersion, pTable->version); + pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version); return -1; } @@ -658,11 +658,11 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) { SForwardMsg forward = { .type = SDB_FORWARD_TYPE_UPDATE, .tableId = pTable->tableId, - .version = pOper->version + 1, + .version = pTable->version + 1, .rowSize = pOper->rowSize, .rowData = pOper->rowData, }; - if (sdbForwardDbReqToPeer(&forward) == 0) { + if (sdbForwardDbReqToPeer(&forward) != 0) { sdbError("table:%s, failed to update record", pTable->tableName); pthread_mutex_unlock(&pTable->mutex); return -1; @@ -710,7 +710,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) { sdbFinishCommit(pTable); sdbTrace("table:%s, a record is updated:%s, sdbversion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64, - pTable->tableName, sdbGetkeyStr(pTable, pOper->rowData), sdbVersion, pTable->version, pTable->numOfRows); + pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version, pTable->numOfRows); pMeta->version = pTable->version; pMeta->offset = pTable->fileSize; diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index 6e72a6c8c2de4a95f23c6ac5fccb6d22e1f46438..11285764b8db2ba0071e2cc90293eeb0b8f4f3a9 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -38,23 +38,20 @@ static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg); static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg); static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg); -static int32_t mgmtUserActionDestroy(void *pObj) { - tfree(pObj); +static int32_t mgmtUserActionDestroy(SSdbOperDesc *pOper) { + tfree(pOper->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mgmtUserActionInsert(void *pObj) { - SUserObj *pUser = (SUserObj *) pObj; +static int32_t mgmtUserActionInsert(SSdbOperDesc *pOper) { + SUserObj *pUser = pOper->pObj; SAcctObj *pAcct = mgmtGetAcct(pUser->acct); - - pUser->pAcct = pAcct; mgmtAddUserIntoAcct(pAcct, pUser); - return TSDB_CODE_SUCCESS; } -static int32_t mgmtUserActionDelete(void *pObj) { - SUserObj *pUser = (SUserObj *) pObj; +static int32_t mgmtUserActionDelete(SSdbOperDesc *pOper) { + SUserObj *pUser = pOper->pObj; SAcctObj *pAcct = mgmtGetAcct(pUser->acct); mgmtRemoveUserFromAcct(pAcct, pUser); @@ -62,36 +59,32 @@ static int32_t mgmtUserActionDelete(void *pObj) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtUserActionUpdate(void *pObj) { +static int32_t mgmtUserActionUpdate(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtUserActionEncode(void *pObj, void *pData, int32_t maxRowSize) { - SUserObj *pUser = (SUserObj *) pObj; +static int32_t mgmtUserActionEncode(SSdbOperDesc *pOper) { + SUserObj *pUser = pOper->pObj; - if (maxRowSize < tsUserUpdateSize) { + if (pOper->maxRowSize < tsUserUpdateSize) { return -1; } else { - memcpy(pData, pUser, tsUserUpdateSize); - return tsUserUpdateSize; + memcpy(pOper->rowData, pUser, tsUserUpdateSize); + pOper->rowSize = tsUserUpdateSize; + return TSDB_CODE_SUCCESS; } } -static void *mgmtUserActionDecode(void *pData) { - SUserObj *pUser = (SUserObj *) malloc(sizeof(SUserObj)); - if (pUser == NULL) return NULL; - memset(pUser, 0, sizeof(SUserObj)); - memcpy(pUser, pData, tsUserUpdateSize); +static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) { + SUserObj *pUser = (SUserObj *) calloc(1, sizeof(SUserObj)); + if (pUser == NULL) return -1; - return pUser; + memcpy(pUser, pOper->rowData, tsUserUpdateSize); + pOper->pObj = pUser; + return TSDB_CODE_SUCCESS; } int32_t mgmtInitUsers() { - void *pNode = NULL; - SUserObj *pUser = NULL; - SAcctObj *pAcct = NULL; - int32_t numOfUsers = 0; - SUserObj tObj; tsUserUpdateSize = tObj.updateEnd - (int8_t *)&tObj; @@ -99,7 +92,7 @@ int32_t mgmtInitUsers() { .tableName = "users", .hashSessions = TSDB_MAX_USERS, .maxRowSize = tsUserUpdateSize, - .keyType = SDB_KEYTYPE_STRING, + .keyType = SDB_KEY_TYPE_STRING, .insertFp = mgmtUserActionInsert, .deleteFp = mgmtUserActionDelete, .updateFp = mgmtUserActionUpdate, @@ -114,20 +107,7 @@ int32_t mgmtInitUsers() { return -1; } - while (1) { - pNode = sdbFetchRow(tsUserSdb, pNode, (void **)&pUser); - if (pUser == NULL) break; - - pUser->prev = NULL; - pUser->next = NULL; - - pAcct = mgmtGetAcct(pUser->acct); - mgmtAddUserIntoAcct(pAcct, pUser); - - numOfUsers++; - } - - pAcct = mgmtGetAcct("root"); + SAcctObj *pAcct = mgmtGetAcct("root"); mgmtCreateUser(pAcct, "root", "taosdata"); mgmtCreateUser(pAcct, "monitor", tsInternalPass); mgmtCreateUser(pAcct, "_root", tsInternalPass); @@ -151,7 +131,20 @@ SUserObj *mgmtGetUser(char *name) { } static int32_t mgmtUpdateUser(SUserObj *pUser) { - return sdbUpdateRow(tsUserSdb, pUser, tsUserUpdateSize, SDB_OPER_GLOBAL); + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsUserSdb, + .pObj = pUser, + .rowSize = tsUserUpdateSize + }; + + int32_t code = sdbUpdateRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + tfree(pUser); + code = TSDB_CODE_SDB_ERROR; + } + + return code; } static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) { @@ -186,8 +179,15 @@ static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) { pUser->superAuth = 1; } - code = TSDB_CODE_SUCCESS; - if (sdbInsertRow(tsUserSdb, pUser, SDB_OPER_GLOBAL) < 0) { + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsUserSdb, + .pObj = pUser, + .rowSize = sizeof(SUserObj) + }; + + code = sdbInsertRow(&oper); + if (code != TSDB_CODE_SUCCESS) { tfree(pUser); code = TSDB_CODE_SDB_ERROR; } @@ -208,9 +208,19 @@ static int32_t mgmtDropUser(SAcctObj *pAcct, char *name) { return TSDB_CODE_NO_RIGHTS; } - sdbDeleteRow(tsUserSdb, pUser, SDB_OPER_GLOBAL); + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsUserSdb, + .pObj = pUser + }; - return 0; + int32_t code = sdbDeleteRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + tfree(pUser); + code = TSDB_CODE_SDB_ERROR; + } + + return code; } static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { @@ -359,7 +369,7 @@ static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) { memset(pUser->pass, 0, sizeof(pUser->pass)); taosEncryptPass((uint8_t*)pAlter->pass, strlen(pAlter->pass), pUser->pass); code = mgmtUpdateUser(pUser); - mLPrint("user:%s password is altered by %s, code:%d", pAlter->user, pUser->user, code); + mLPrint("user:%s password is altered by %s, result:%d", pUser->user, pOperUser->user, tstrerror(code)); } else { code = TSDB_CODE_NO_RIGHTS; } @@ -394,10 +404,6 @@ static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) { } if (hasRight) { - //if (pAlter->privilege == 1) { // super - // pUser->superAuth = 1; - // pUser->writeAuth = 1; - //} if (pAlter->privilege == 2) { // read pUser->superAuth = 0; pUser->writeAuth = 0; @@ -408,7 +414,7 @@ static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) { } code = mgmtUpdateUser(pUser); - mLPrint("user:%s privilege is altered by %s, code:%d", pAlter->user, pUser->user, code); + mLPrint("user:%s privilege is altered by %s, result:%d", pUser->user, pOperUser->user, tstrerror(code)); } else { code = TSDB_CODE_NO_RIGHTS; }