提交 2e3e7f32 编写于 作者: S slguan

[TD-15] fix sdb error while delete user

上级 d93d1093
......@@ -32,12 +32,12 @@ typedef enum {
typedef struct {
ESdbOperType type;
void * table;
void * pObj;
int64_t version;
int32_t maxRowSize;
int32_t rowSize;
void * rowData;
void * pObj;
void * table;
int64_t version;
} SSdbOperDesc;
typedef struct {
......
......@@ -290,6 +290,8 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
maxAutoIndex = MAX(maxAutoIndex, *(int32_t *) rowHead->data);
}
pTable->version = MAX(pTable->version, abs(rowHead->version));
void *pMetaRow = sdbGetRow(pTable, rowHead->data);
if (pMetaRow == NULL) {
if (rowHead->version < 0) {
......@@ -310,11 +312,11 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
rowMeta.row = oper.pObj;
(*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta);
pTable->numOfRows++;
sdbTrace("table:%s, read record:%s and insert, numOfRows:%d version:%" PRId64 " sdbversion:%" PRId64,
pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version, sdbVersion);
sdbTrace("table:%s, read new record:%s, numOfRows:%d version:%" PRId64 ,
pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version);
} else {
sdbError("table:%s, failed to decode record:%s, numOfRows:%d version:%" PRId64 " sdbversion:%" PRId64,
pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version, sdbVersion);
sdbError("table:%s, failed to decode record:%s, numOfRows:%d version:%" PRId64 ,
pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version);
}
}
} else {
......@@ -326,8 +328,8 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
(*pTable->destroyFp)(&oper);
(*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data);
pTable->numOfRows--;
sdbTrace("table:%s, read record:%s and delete, numOfRows:%d version:%" PRId64 " sdbversion:%" PRId64,
pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version, sdbVersion);
sdbTrace("table:%s, read deleted record:%s, numOfRows:%d version:%" PRId64 ,
pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version);
} else {
SRowMeta rowMeta;
rowMeta.version = rowHead->version;
......@@ -347,17 +349,14 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
if (code == TSDB_CODE_SUCCESS) {
rowMeta.row = oper.pObj;
(*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta);
sdbTrace("table:%s, read record:%s and update, numOfRows:%d version:%" PRId64 " sdbversion:%" PRId64,
pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version, sdbVersion);
sdbTrace("table:%s, read updated record:%s, numOfRows:%d version:%" PRId64 ,
pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version);
} else {
sdbError("table:%s, failed to decode record:%s, numOfRows:%d version:%" PRId64 " sdbversion:%" PRId64,
pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version, sdbVersion);
sdbError("table:%s, failed to decode record:%s, numOfRows:%d version:%" PRId64 ,
pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version);
}
}
numOfChanged++;
if (pTable->version < abs(rowHead->version)) {
pTable->version = abs(rowHead->version);
}
}
pTable->fileSize += real_size;
......@@ -424,7 +423,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
pTable->tableId = sdbNumOfTables++;
sdbTableList[pTable->tableId] = pTable;
sdbTrace("table:%s is initialized, numOfRows:%d, numOfTables:%d, version:%" PRId64 " sdbversion:%" PRId64,
sdbTrace("table:%s, is initialized, numOfRows:%d numOfTables:%d version:%" PRId64 " sdbversion:%" PRId64,
pTable->tableName, pTable->numOfRows, sdbNumOfTables, pTable->version, sdbVersion);
return pTable;
......@@ -470,6 +469,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) {
return TSDB_CODE_ALREADY_THERE;
}
pOper->maxRowSize = pTable->maxRowSize;
pthread_mutex_lock(&pTable->mutex);
if (pOper->type == SDB_OPER_TYPE_GLOBAL) {
......@@ -507,12 +507,11 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) {
pOper->rowData = rowHead->data;
(*pTable->encodeFp)(pOper);
rowHead->rowSize = pOper->rowSize;
rowHead->delimiter = SDB_DELIMITER;
rowHead->version = pTable->version;
assert(rowHead->rowSize > 0 && rowHead->rowSize <= pTable->maxRowSize);
int32_t real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM);
rowHead->delimiter = SDB_DELIMITER;
rowHead->version = pTable->version;
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) {
sdbError("table:%s, failed to get checksum while inserting", pTable->tableName);
pTable->version--;
......@@ -540,7 +539,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) {
pthread_mutex_unlock(&pTable->mutex);
sdbTrace("table:%s, a record is inserted:%s, sdbversion:%" PRId64 " version:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64,
pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pOper->version, pOper->rowSize, pTable->numOfRows, pTable->fileSize);
pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version, pOper->rowSize, pTable->numOfRows, pTable->fileSize);
(*pTable->insertFp)(pOper);
......@@ -568,11 +567,11 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) {
.type = SDB_FORWARD_TYPE_DELETE,
.tableId = pTable->tableId,
.version = pTable->version + 1,
.rowSize = pOper->rowSize,
.rowData = pOper->rowData,
.rowSize = pMeta->rowSize,
.rowData = pMeta->row,
};
if (sdbForwardDbReqToPeer(&forward) == 0) {
if (sdbForwardDbReqToPeer(&forward) != 0) {
sdbError("table:%s, failed to delete record", pTable->tableName);
pthread_mutex_unlock(&pTable->mutex);
return -1;
......@@ -593,7 +592,7 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) {
int32_t rowSize = 0;
switch (pTable->keyType) {
case SDB_KEY_TYPE_STRING:
rowSize = strlen((char *)pOper->rowData) + 1;
rowSize = strlen((char *)pOper->pObj) + 1;
break;
case SDB_KEY_TYPE_AUTO:
rowSize = sizeof(uint64_t);
......@@ -605,8 +604,9 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) {
rowHead->delimiter = SDB_DELIMITER;
rowHead->rowSize = rowSize;
rowHead->version = -(pTable->version);
memcpy(rowHead->data, pOper->rowData, pOper->rowSize);
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, total_size) < 0) {
memcpy(rowHead->data, pOper->pObj, rowSize);
int32_t real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM);
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) {
sdbError("failed to get checksum while inserting, sdb:%s", pTable->tableName);
pTable->version--;
sdbVersion--;
......@@ -615,14 +615,14 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) {
return -1;
}
twrite(pTable->fd, rowHead, total_size);
pTable->fileSize += total_size;
twrite(pTable->fd, rowHead, real_size);
pTable->fileSize += real_size;
sdbFinishCommit(pTable);
tfree(rowHead);
sdbTrace("table:%s, a record is deleted:%s, sdbversion:%" PRId64 " id:%" PRId64 " numOfRows:%d",
pTable->tableName, sdbGetkeyStr(pTable, pOper->rowData), sdbVersion, pTable->version, pTable->numOfRows);
pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version, pTable->numOfRows);
// Delete from current layer
(*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, pOper->pObj);
......@@ -642,10 +642,10 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) {
SSdbTable *pTable = (SSdbTable *)pOper->table;
if (pTable == NULL) return -1;
SRowMeta *pMeta = sdbGetRowMeta(pTable, pOper->rowData);
SRowMeta *pMeta = sdbGetRowMeta(pTable, pOper->pObj);
if (pMeta == NULL) {
sdbError("table:%s, failed to update record:%s, record is not there, sdbversion:%" PRId64 " id:%" PRId64,
pTable->tableName, sdbGetkeyStr(pTable, pOper->rowData), sdbVersion, pTable->version);
pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version);
return -1;
}
......@@ -658,11 +658,11 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) {
SForwardMsg forward = {
.type = SDB_FORWARD_TYPE_UPDATE,
.tableId = pTable->tableId,
.version = pOper->version + 1,
.version = pTable->version + 1,
.rowSize = pOper->rowSize,
.rowData = pOper->rowData,
};
if (sdbForwardDbReqToPeer(&forward) == 0) {
if (sdbForwardDbReqToPeer(&forward) != 0) {
sdbError("table:%s, failed to update record", pTable->tableName);
pthread_mutex_unlock(&pTable->mutex);
return -1;
......@@ -710,7 +710,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) {
sdbFinishCommit(pTable);
sdbTrace("table:%s, a record is updated:%s, sdbversion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64,
pTable->tableName, sdbGetkeyStr(pTable, pOper->rowData), sdbVersion, pTable->version, pTable->numOfRows);
pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version, pTable->numOfRows);
pMeta->version = pTable->version;
pMeta->offset = pTable->fileSize;
......
......@@ -38,23 +38,20 @@ static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg);
static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg);
static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg);
static int32_t mgmtUserActionDestroy(void *pObj) {
tfree(pObj);
static int32_t mgmtUserActionDestroy(SSdbOperDesc *pOper) {
tfree(pOper->pObj);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtUserActionInsert(void *pObj) {
SUserObj *pUser = (SUserObj *) pObj;
static int32_t mgmtUserActionInsert(SSdbOperDesc *pOper) {
SUserObj *pUser = pOper->pObj;
SAcctObj *pAcct = mgmtGetAcct(pUser->acct);
pUser->pAcct = pAcct;
mgmtAddUserIntoAcct(pAcct, pUser);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtUserActionDelete(void *pObj) {
SUserObj *pUser = (SUserObj *) pObj;
static int32_t mgmtUserActionDelete(SSdbOperDesc *pOper) {
SUserObj *pUser = pOper->pObj;
SAcctObj *pAcct = mgmtGetAcct(pUser->acct);
mgmtRemoveUserFromAcct(pAcct, pUser);
......@@ -62,36 +59,32 @@ static int32_t mgmtUserActionDelete(void *pObj) {
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtUserActionUpdate(void *pObj) {
static int32_t mgmtUserActionUpdate(SSdbOperDesc *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtUserActionEncode(void *pObj, void *pData, int32_t maxRowSize) {
SUserObj *pUser = (SUserObj *) pObj;
static int32_t mgmtUserActionEncode(SSdbOperDesc *pOper) {
SUserObj *pUser = pOper->pObj;
if (maxRowSize < tsUserUpdateSize) {
if (pOper->maxRowSize < tsUserUpdateSize) {
return -1;
} else {
memcpy(pData, pUser, tsUserUpdateSize);
return tsUserUpdateSize;
memcpy(pOper->rowData, pUser, tsUserUpdateSize);
pOper->rowSize = tsUserUpdateSize;
return TSDB_CODE_SUCCESS;
}
}
static void *mgmtUserActionDecode(void *pData) {
SUserObj *pUser = (SUserObj *) malloc(sizeof(SUserObj));
if (pUser == NULL) return NULL;
memset(pUser, 0, sizeof(SUserObj));
memcpy(pUser, pData, tsUserUpdateSize);
static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) {
SUserObj *pUser = (SUserObj *) calloc(1, sizeof(SUserObj));
if (pUser == NULL) return -1;
return pUser;
memcpy(pUser, pOper->rowData, tsUserUpdateSize);
pOper->pObj = pUser;
return TSDB_CODE_SUCCESS;
}
int32_t mgmtInitUsers() {
void *pNode = NULL;
SUserObj *pUser = NULL;
SAcctObj *pAcct = NULL;
int32_t numOfUsers = 0;
SUserObj tObj;
tsUserUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
......@@ -99,7 +92,7 @@ int32_t mgmtInitUsers() {
.tableName = "users",
.hashSessions = TSDB_MAX_USERS,
.maxRowSize = tsUserUpdateSize,
.keyType = SDB_KEYTYPE_STRING,
.keyType = SDB_KEY_TYPE_STRING,
.insertFp = mgmtUserActionInsert,
.deleteFp = mgmtUserActionDelete,
.updateFp = mgmtUserActionUpdate,
......@@ -114,20 +107,7 @@ int32_t mgmtInitUsers() {
return -1;
}
while (1) {
pNode = sdbFetchRow(tsUserSdb, pNode, (void **)&pUser);
if (pUser == NULL) break;
pUser->prev = NULL;
pUser->next = NULL;
pAcct = mgmtGetAcct(pUser->acct);
mgmtAddUserIntoAcct(pAcct, pUser);
numOfUsers++;
}
pAcct = mgmtGetAcct("root");
SAcctObj *pAcct = mgmtGetAcct("root");
mgmtCreateUser(pAcct, "root", "taosdata");
mgmtCreateUser(pAcct, "monitor", tsInternalPass);
mgmtCreateUser(pAcct, "_root", tsInternalPass);
......@@ -151,7 +131,20 @@ SUserObj *mgmtGetUser(char *name) {
}
static int32_t mgmtUpdateUser(SUserObj *pUser) {
return sdbUpdateRow(tsUserSdb, pUser, tsUserUpdateSize, SDB_OPER_GLOBAL);
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsUserSdb,
.pObj = pUser,
.rowSize = tsUserUpdateSize
};
int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
tfree(pUser);
code = TSDB_CODE_SDB_ERROR;
}
return code;
}
static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
......@@ -186,8 +179,15 @@ static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
pUser->superAuth = 1;
}
code = TSDB_CODE_SUCCESS;
if (sdbInsertRow(tsUserSdb, pUser, SDB_OPER_GLOBAL) < 0) {
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsUserSdb,
.pObj = pUser,
.rowSize = sizeof(SUserObj)
};
code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
tfree(pUser);
code = TSDB_CODE_SDB_ERROR;
}
......@@ -208,9 +208,19 @@ static int32_t mgmtDropUser(SAcctObj *pAcct, char *name) {
return TSDB_CODE_NO_RIGHTS;
}
sdbDeleteRow(tsUserSdb, pUser, SDB_OPER_GLOBAL);
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsUserSdb,
.pObj = pUser
};
return 0;
int32_t code = sdbDeleteRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
tfree(pUser);
code = TSDB_CODE_SDB_ERROR;
}
return code;
}
static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
......@@ -359,7 +369,7 @@ static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) {
memset(pUser->pass, 0, sizeof(pUser->pass));
taosEncryptPass((uint8_t*)pAlter->pass, strlen(pAlter->pass), pUser->pass);
code = mgmtUpdateUser(pUser);
mLPrint("user:%s password is altered by %s, code:%d", pAlter->user, pUser->user, code);
mLPrint("user:%s password is altered by %s, result:%d", pUser->user, pOperUser->user, tstrerror(code));
} else {
code = TSDB_CODE_NO_RIGHTS;
}
......@@ -394,10 +404,6 @@ static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) {
}
if (hasRight) {
//if (pAlter->privilege == 1) { // super
// pUser->superAuth = 1;
// pUser->writeAuth = 1;
//}
if (pAlter->privilege == 2) { // read
pUser->superAuth = 0;
pUser->writeAuth = 0;
......@@ -408,7 +414,7 @@ static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) {
}
code = mgmtUpdateUser(pUser);
mLPrint("user:%s privilege is altered by %s, code:%d", pAlter->user, pUser->user, code);
mLPrint("user:%s privilege is altered by %s, result:%d", pUser->user, pOperUser->user, tstrerror(code));
} else {
code = TSDB_CODE_NO_RIGHTS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册