提交 d2e3ed8b 编写于 作者: S slguan

[TD-15] refact code of sdb

上级 51171c75
......@@ -53,8 +53,8 @@ typedef struct {
int8_t numOfMnodes;
int32_t numOfDnodes;
char mnodeName[TSDB_DNODE_NAME_LEN + 1];
char reserved[7];
char updateEnd[1];
int8_t reserved[15];
int8_t updateEnd[1];
int syncFd;
void *hbTimer;
void *pSync;
......@@ -78,8 +78,8 @@ typedef struct {
float lbScore; // calc in balance function
int32_t customScore; // config by user
char dnodeName[TSDB_DNODE_NAME_LEN + 1];
char reserved[7];
char updateEnd[1];
int8_t reserved[15];
int8_t updateEnd[1];
SVnodeLoad vload[TSDB_MAX_VNODES];
int32_t status;
uint32_t lastReboot; // time stamp for last reboot
......@@ -120,7 +120,7 @@ typedef struct SSuperTableObj {
int32_t sversion;
int32_t numOfColumns;
int32_t numOfTags;
int8_t reserved[5];
int8_t reserved[15];
int8_t updateEnd[1];
int32_t numOfTables;
int16_t nextColId;
......@@ -152,7 +152,7 @@ typedef struct {
int32_t sversion;
int32_t numOfColumns;
int32_t sqlLen;
int8_t reserved[3];
int8_t reserved[7];
int8_t updateEnd[1];
char* sql; //null-terminated string
int16_t nextColId;
......@@ -167,7 +167,6 @@ typedef struct _vg_obj {
int64_t createdTime;
SVnodeGid vnodeGid[TSDB_VNODES_SUPPORT];
int32_t numOfVnodes;
int32_t numOfTables;
int32_t lbIp;
int32_t lbTime;
int8_t lbStatus;
......@@ -175,6 +174,7 @@ typedef struct _vg_obj {
int8_t updateEnd[1];
struct _vg_obj *prev, *next;
struct _db_obj *pDb;
int32_t numOfTables;
void * idPool;
STableInfo ** tableList;
} SVgObj;
......@@ -184,8 +184,8 @@ typedef struct _db_obj {
int8_t dirty;
int64_t createdTime;
SDbCfg cfg;
char reserved[15];
char updateEnd[1];
int8_t reserved[15];
int8_t updateEnd[1];
struct _db_obj *prev, *next;
int32_t numOfVgroups;
int32_t numOfTables;
......@@ -203,7 +203,7 @@ typedef struct _user_obj {
int64_t createdTime;
int8_t superAuth;
int8_t writeAuth;
int8_t reserved[16];
int8_t reserved[13];
int8_t updateEnd[1];
struct _user_obj *prev, *next;
struct _acctObj * pAcct;
......
......@@ -43,8 +43,7 @@ static void mgmtDestroyChildTable(SChildTableObj *pTable) {
}
static int32_t mgmtChildTableActionDestroy(SSdbOperDesc *pOper) {
SChildTableObj *pTable = pOper->pObj;
mgmtDestroyChildTable(pTable);
mgmtDestroyChildTable(pOper->pObj);
return TSDB_CODE_SUCCESS;
}
......@@ -69,14 +68,6 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) {
return TSDB_CODE_INVALID_ACCT;
}
if (!mgmtIsMaster()) {
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid != pTable->sid) {
mError("ctable:%s, sid:%d is not matched from the master:%d", pTable->tableId, sid, pTable->sid);
return TSDB_CODE_INVALID_SESSION_ID;
}
}
pTable->superTable = mgmtGetSuperTable(pTable->superTableId);
mgmtAddTableIntoSuperTable(pTable->superTable);
......@@ -84,10 +75,6 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) {
mgmtAddTableIntoDb(pDb);
mgmtAddTableIntoVgroup(pVgroup, (STableInfo *) pTable);
if (pVgroup->numOfTables >= pDb->cfg.maxSessions && pDb->numOfVgroups > 1) {
mgmtMoveVgroupToTail(pVgroup);
}
return TSDB_CODE_SUCCESS;
}
......@@ -117,13 +104,8 @@ static int32_t mgmtChildTableActionDelete(SSdbOperDesc *pOper) {
mgmtRestoreTimeSeries(pAcct, pTable->superTable->numOfColumns - 1);
mgmtRemoveTableFromDb(pDb);
mgmtRemoveTableFromVgroup(pVgroup, (STableInfo *) pTable);
mgmtRemoveTableFromSuperTable(pTable->superTable);
if (pVgroup->numOfTables > 0) {
mgmtMoveVgroupToHead(pVgroup);
}
return TSDB_CODE_SUCCESS;
}
......@@ -136,7 +118,9 @@ static int32_t mgmtChildTableActionEncode(SSdbOperDesc *pOper) {
assert(pTable != NULL && pOper->rowData != NULL);
memcpy(pOper->rowData, pTable, tsChildTableUpdateSize);
return tsChildTableUpdateSize;
pOper->rowSize = tsChildTableUpdateSize;
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtChildTableActionDecode(SSdbOperDesc *pOper) {
......@@ -158,7 +142,7 @@ int32_t mgmtInitChildTables() {
SChildTableObj *pTable = NULL;
SChildTableObj tObj;
tsChildTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
tsChildTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj;
SSdbTableDesc tableDesc = {
.tableName = "ctables",
......@@ -236,9 +220,6 @@ int32_t mgmtInitChildTables() {
continue;
}
pVgroup->tableList[pTable->sid] = (STableInfo*)pTable;
taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid);
SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTable->superTableId);
if (pSuperTable == NULL) {
mError("ctable:%s, stable:%s not exist", pTable->tableId, pTable->superTableId);
......@@ -251,12 +232,6 @@ int32_t mgmtInitChildTables() {
pNode = pLastNode;
continue;
}
pTable->superTable = pSuperTable;
mgmtAddTableIntoSuperTable(pSuperTable);
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
mgmtAddTimeSeries(pAcct, pTable->superTable->numOfColumns - 1);
}
mTrace("child table is initialized");
......@@ -268,8 +243,13 @@ void mgmtCleanUpChildTables() {
}
void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableObj *pTable) {
char *pTagData = pMsg->schema + TSDB_TABLE_ID_LEN + 1;
int32_t tagDataLen = htonl(pMsg->contLen) - sizeof(SCMCreateTableMsg) - TSDB_TABLE_ID_LEN - 1;
char * pTagData = NULL;
int32_t tagDataLen = 0;
if (pMsg != NULL) {
pTagData = pMsg->schema + TSDB_TABLE_ID_LEN + 1;
tagDataLen = htonl(pMsg->contLen) - sizeof(SCMCreateTableMsg) - TSDB_TABLE_ID_LEN - 1;
}
int32_t totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags;
int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen;
......@@ -302,7 +282,10 @@ void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableObj *pTab
pSchema++;
}
memcpy(pCreate->data + totalCols * sizeof(SSchema), pTagData, tagDataLen);
if (pMsg != NULL) {
memcpy(pCreate->data + totalCols * sizeof(SSchema), pTagData, tagDataLen);
}
return pCreate;
}
......
......@@ -117,7 +117,7 @@ static int32_t mgmtDbActionDecode(SSdbOperDesc *pOper) {
int32_t mgmtInitDbs() {
SDbObj tObj;
tsDbUpdateSize = tObj.updateEnd - (char *)&tObj;
tsDbUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj;
SSdbTableDesc tableDesc = {
.tableName = "dbs",
......@@ -276,8 +276,6 @@ static int32_t mgmtCheckDbParams(SCMCreateDbMsg *pCreate) {
pCreate->blocksPerTable = TSDB_MIN_AVG_BLOCKS;
}
pCreate->maxSessions++;
return TSDB_CODE_SUCCESS;
}
......
......@@ -33,7 +33,7 @@
#include "mgmtTable.h"
#include "mgmtVgroup.h"
void *tsNormalTableSdb;
void *tsNormalTableSdb;
int32_t tsNormalTableUpdateSize;
static void mgmtDestroyNormalTable(SNormalTableObj *pTable) {
......@@ -43,8 +43,7 @@ static void mgmtDestroyNormalTable(SNormalTableObj *pTable) {
}
static int32_t mgmtNormalTableActionDestroy(SSdbOperDesc *pOper) {
SNormalTableObj *pTable = pOper->pObj;
mgmtDestroyNormalTable(pTable);
mgmtDestroyNormalTable(pOper->pObj);
return TSDB_CODE_SUCCESS;
}
......@@ -53,38 +52,26 @@ static int32_t mgmtNormalTableActionInsert(SSdbOperDesc *pOper) {
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId);
mError("ntable:%s not in vgroup:%d", pTable->tableId, pTable->vgId);
return TSDB_CODE_INVALID_VGROUP_ID;
}
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) {
mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName);
mError("ntable:%s, vgroup:%d not in DB:%s", pTable->tableId, pVgroup->vgId, pVgroup->dbName);
return TSDB_CODE_INVALID_DB;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("account not exists");
mError("ntable:%s, account:%s not exists", pTable->tableId, pDb->cfg.acct);
return TSDB_CODE_INVALID_ACCT;
}
if (!mgmtIsMaster()) {
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid != pTable->sid) {
mError("sid:%d is not matched from the master:%d", sid, pTable->sid);
return TSDB_CODE_INVALID_SESSION_ID;
}
}
mgmtAddTimeSeries(pAcct, pTable->numOfColumns - 1);
mgmtAddTableIntoDb(pDb);
mgmtAddTableIntoVgroup(pVgroup, (STableInfo *) pTable);
if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) {
mgmtMoveVgroupToTail(pVgroup);
}
return TSDB_CODE_SUCCESS;
}
......@@ -101,7 +88,7 @@ static int32_t mgmtNormalTableActionDelete(SSdbOperDesc *pOper) {
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) {
mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName);
mError("ntable:%s, vgroup:%d not in DB:%s", pTable->tableId, pVgroup->vgId, pVgroup->dbName);
return TSDB_CODE_INVALID_DB;
}
......@@ -115,22 +102,10 @@ static int32_t mgmtNormalTableActionDelete(SSdbOperDesc *pOper) {
mgmtRemoveTableFromDb(pDb);
mgmtRemoveTableFromVgroup(pVgroup, (STableInfo *) pTable);
if (pVgroup->numOfTables > 0) {
mgmtMoveVgroupToHead(pVgroup);
}
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtNormalTableActionUpdate(SSdbOperDesc *pOper) {
// SNormalTableObj *pTable = (SNormalTableObj *) pObj;
// memcpy(pTable, str, tsNormalTableUpdateSize);
// int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns) + pTable->sqlLen;
// pTable->schema = realloc(pTable->schema, schemaSize);
// pTable->sql = (char*)pTable->schema + sizeof(SSchema) * (pTable->numOfColumns);
// memcpy(pTable->schema, str + tsNormalTableUpdateSize, schemaSize);
return TSDB_CODE_SUCCESS;
}
......@@ -139,25 +114,24 @@ static int32_t mgmtNormalTableActionEncode(SSdbOperDesc *pOper) {
assert(pOper->pObj != NULL && pOper->rowData != NULL);
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
if (pOper->maxRowSize < tsNormalTableUpdateSize + schemaSize + 1) {
return -1;
if (pOper->maxRowSize < tsNormalTableUpdateSize + schemaSize) {
return TSDB_CODE_INVALID_MSG_LEN;
}
memcpy(pOper->rowData, pTable, tsNormalTableUpdateSize);
memcpy(pOper->rowData + tsNormalTableUpdateSize, pTable->schema, schemaSize);
memcpy(pOper->rowData + tsNormalTableUpdateSize + schemaSize, pTable->sql, pTable->sqlLen);
return tsNormalTableUpdateSize + schemaSize + pTable->sqlLen;
pOper->rowSize = tsNormalTableUpdateSize + schemaSize + pTable->sqlLen;
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtNormalTableActionDecode(SSdbOperDesc *pOper) {
assert(pOper->rowData != NULL);
SNormalTableObj *pTable = (SNormalTableObj *)malloc(sizeof(SNormalTableObj));
if (pTable == NULL) {
return -1;
}
memset(pTable, 0, sizeof(SNormalTableObj));
SNormalTableObj *pTable = (SNormalTableObj *)calloc(1, sizeof(SNormalTableObj));
if (pTable == NULL) TSDB_CODE_SERV_OUT_OF_MEMORY;
memcpy(pTable, pOper->rowData, tsNormalTableUpdateSize);
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
......@@ -176,6 +150,7 @@ static int32_t mgmtNormalTableActionDecode(SSdbOperDesc *pOper) {
}
memcpy(pTable->sql, pOper->rowData + tsNormalTableUpdateSize + schemaSize, pTable->sqlLen);
pOper->pObj = pTable;
return TSDB_CODE_SUCCESS;
}
......@@ -185,7 +160,7 @@ int32_t mgmtInitNormalTables() {
SNormalTableObj *pTable = NULL;
SNormalTableObj tObj;
tsNormalTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
tsNormalTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj;
SSdbTableDesc tableDesc = {
.tableName = "ntables",
......@@ -261,15 +236,6 @@ int32_t mgmtInitNormalTables() {
pNode = pLastNode;
continue;
}
mgmtAddTableIntoVgroup(pVgroup, (STableInfo *)pTable);
//pVgroup->tableList[pTable->sid] = (STableInfo*)pTable;
taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid);
pTable->sql = (char *)pTable->schema + sizeof(SSchema) * pTable->numOfColumns;
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
mgmtAddTimeSeries(pAcct, pTable->numOfColumns - 1);
}
mTrace("ntables is initialized");
......
......@@ -309,11 +309,11 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
rowMeta.row = oper.pObj;
(*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta);
pTable->numOfRows++;
sdbTrace("table:%s, read new record:%s, numOfRows:%d version:%" PRId64 ,
pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version);
sdbTrace("table:%s, version:%" PRId64 " numOfRows:%d, read new record:%s",
pTable->tableName, pTable->version, pTable->numOfRows, sdbGetkeyStr(pTable, rowHead->data));
} else {
sdbError("table:%s, failed to decode record:%s, numOfRows:%d version:%" PRId64 ,
pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version);
sdbTrace("table:%s, version:%" PRId64 " numOfRows:%d, failed to decode record:%s",
pTable->tableName, pTable->version, pTable->numOfRows, sdbGetkeyStr(pTable, rowHead->data));
}
}
} else {
......@@ -325,8 +325,8 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
(*pTable->destroyFp)(&oper);
(*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data);
pTable->numOfRows--;
sdbTrace("table:%s, read deleted record:%s, numOfRows:%d version:%" PRId64 ,
pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version);
sdbTrace("table:%s, version:%" PRId64 " numOfRows:%d, read deleted record:%s",
pTable->tableName, pTable->version, pTable->numOfRows, sdbGetkeyStr(pTable, rowHead->data));
} else {
SRowMeta rowMeta;
rowMeta.version = rowHead->version;
......@@ -346,11 +346,11 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
if (code == TSDB_CODE_SUCCESS) {
rowMeta.row = oper.pObj;
(*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta);
sdbTrace("table:%s, read updated record:%s, numOfRows:%d version:%" PRId64 ,
pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version);
sdbTrace("table:%s, version:%" PRId64 " numOfRows:%d, read updated record:%s",
pTable->tableName, pTable->version, pTable->numOfRows, sdbGetkeyStr(pTable, rowHead->data));
} else {
sdbError("table:%s, failed to decode record:%s, numOfRows:%d version:%" PRId64 ,
pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version);
sdbTrace("table:%s, version:%" PRId64 " numOfRows:%d, failed to decode record:%s",
pTable->tableName, pTable->version, pTable->numOfRows, sdbGetkeyStr(pTable, rowHead->data));
}
}
numOfChanged++;
......@@ -416,8 +416,8 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
pTable->tableId = sdbNumOfTables++;
sdbTableList[pTable->tableId] = pTable;
sdbTrace("table:%s, is initialized, numOfRows:%d numOfTables:%d version:%" PRId64 " sdbversion:%" PRId64,
pTable->tableName, pTable->numOfRows, sdbNumOfTables, pTable->version, sdbVersion);
sdbTrace("table:%s, is initialized, sdbversion:%" PRId64 " version:%" PRId64 " numOfRows:%d numOfTables:%d",
pTable->tableName, sdbVersion, pTable->version, pTable->numOfRows, sdbNumOfTables);
return pTable;
}
......@@ -531,8 +531,8 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) {
pthread_mutex_unlock(&pTable->mutex);
sdbTrace("table:%s, inserte record:%s, sdbversion:%" PRId64 " version:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64,
pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version, pOper->rowSize, pTable->numOfRows, pTable->fileSize);
sdbTrace("table:%s, sdbversion:%" PRId64 " version:%" PRId64 " numOfRows:%d, insert record:%s, rowSize:%d fileSize:%" PRId64,
pTable->tableName, sdbVersion, pTable->version, pTable->numOfRows, sdbGetkeyStr(pTable, pOper->pObj), pOper->rowSize, pTable->fileSize);
(*pTable->insertFp)(pOper);
......@@ -614,8 +614,9 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) {
tfree(rowHead);
sdbTrace("table:%s, delete record:%s, sdbversion:%" PRId64 " version:%" PRId64 " numOfRows:%d",
pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version, pTable->numOfRows);
sdbTrace("table:%s, sdbversion:%" PRId64 " version:%" PRId64 " numOfRows:%d, delete record:%s, rowSize:%d fileSize:%" PRId64,
pTable->tableName, sdbVersion, pTable->version, pTable->numOfRows, sdbGetkeyStr(pTable, pOper->pObj), pOper->rowSize, pTable->fileSize);
// Delete from current layer
(*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, pOper->pObj);
......@@ -702,9 +703,9 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) {
pTable->fileSize += real_size;
sdbFinishCommit(pTable);
sdbTrace("table:%s, update record:%s, sdbversion:%" PRId64 " version:%" PRId64 " numOfRows:%" PRId64,
pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version, pTable->numOfRows);
sdbTrace("table:%s, sdbversion:%" PRId64 " version:%" PRId64 " numOfRows:%d, update record:%s, rowSize:%d fileSize:%" PRId64,
pTable->tableName, sdbVersion, pTable->version, pTable->numOfRows, sdbGetkeyStr(pTable, pOper->pObj), pOper->rowSize, pTable->fileSize);
pMeta->version = pTable->version;
pMeta->offset = pTable->fileSize;
pMeta->rowSize = rowHead->rowSize;
......
......@@ -111,7 +111,7 @@ static int32_t mgmtSuperTableActionDecode(SSdbOperDesc *pOper) {
int32_t mgmtInitSuperTables() {
SSuperTableObj tObj;
tsSuperTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
tsSuperTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj;
SSdbTableDesc tableDesc = {
.tableName = "stables",
......
......@@ -22,6 +22,7 @@
#include "mgmtDClient.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "mgmtDServer.h"
#include "mgmtGrant.h"
#include "mgmtMnode.h"
#include "mgmtNormalTable.h"
......@@ -61,6 +62,7 @@ static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg);
static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle);
static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg);
int32_t mgmtInitTables() {
int32_t code = mgmtInitSuperTables();
......@@ -90,6 +92,7 @@ int32_t mgmtInitTables() {
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropTableRsp);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, mgmtProcessAlterTableRsp);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropStableRsp);
mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_TABLE, mgmtProcessTableCfgMsg);
return TSDB_CODE_SUCCESS;
}
......@@ -760,9 +763,19 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
if (pTable->type == TSDB_CHILD_TABLE) {
// sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL);
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsChildTableSdb,
.pObj = pTable
};
sdbDeleteRow(&oper);
} else if (pTable->type == TSDB_NORMAL_TABLE){
// sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL);
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsNormalTableSdb,
.pObj = pTable
};
sdbDeleteRow(&oper);
} else {}
mError("table:%s, failed to create in dnode, reason:%s", pTable->tableId, tstrerror(rpcMsg->code));
mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code);
......@@ -816,19 +829,31 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) {
}
if (pTable->type == TSDB_CHILD_TABLE) {
// if (sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL) < 0) {
// mError("table:%s, update ctables sdb error", pTable->tableId);
// mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR);
// free(queueMsg);
// return;
// }
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsChildTableSdb,
.pObj = pTable
};
int32_t code = sdbDeleteRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
mError("table:%s, update ctables sdb error", pTable->tableId);
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR);
free(queueMsg);
return;
}
} else if (pTable->type == TSDB_NORMAL_TABLE){
// if (sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL) < 0) {
// mError("table:%s, update ntables sdb error", pTable->tableId);
// mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR);
// free(queueMsg);
// return;
// }
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsNormalTableSdb,
.pObj = pTable
};
int32_t code = sdbDeleteRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
mError("table:%s, update ntables sdb error", pTable->tableId);
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR);
free(queueMsg);
return;
}
}
if (pVgroup->numOfTables <= 0) {
......@@ -844,32 +869,48 @@ static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg) {
mTrace("drop stable rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
}
//
//
//static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle) {
// SDMConfigTableMsg *pCfg = (SDMConfigTableMsg *) pCont;
// pCfg->dnode = htonl(pCfg->dnode);
// pCfg->vnode = htonl(pCfg->vnode);
// pCfg->sid = htonl(pCfg->sid);
// mTrace("dnode:%s, vnode:%d, sid:%d, receive table config msg", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
//
// if (!sdbMaster) {
// mError("dnode:%s, vnode:%d, sid:%d, not master, redirect it", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0);
// return;
// }
//
// STableInfo *pTable = mgmtGetTableByPos(pCfg->dnode, pCfg->vnode, pCfg->sid);
// if (pTable == NULL) {
// mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_INVALID_TABLE, NULL, 0);
// return;
// }
//
// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0);
//
// //TODO
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode);
// mgmtSendCreateTableMsg(NULL, &ipSet, NULL);
//}
//
\ No newline at end of file
static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) {
if (mgmtCheckRedirect(rpcMsg->handle)) return;
SDMConfigTableMsg *pCfg = (SDMConfigTableMsg *) rpcMsg->pCont;
pCfg->dnode = htonl(pCfg->dnode);
pCfg->vnode = htonl(pCfg->vnode);
pCfg->sid = htonl(pCfg->sid);
mTrace("dnode:%s, vnode:%d, sid:%d, receive table config msg", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
STableInfo *pTable = mgmtGetTableByPos(pCfg->dnode, pCfg->vnode, pCfg->sid);
if (pTable == NULL) {
mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_TABLE);
return;
}
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS);
SMDCreateTableMsg *pMDCreate = NULL;
if (pTable->type == TSDB_CHILD_TABLE) {
mTrace("table:%s, is a child table, vgroup:%d sid:%d", pTable->tableId, pCfg->vnode, pCfg->sid);
pMDCreate = mgmtBuildCreateChildTableMsg(NULL, (SChildTableObj *) pTable);
if (pMDCreate == NULL) {
return;
}
} else if (pTable->type == TSDB_NORMAL_TABLE) {
mTrace("table:%s, is a normal table, vgroup:%d sid:%d", pTable->tableId, pCfg->vnode, pCfg->sid);
pMDCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable);
if (pMDCreate == NULL) {
return;
}
} else {
mError("table:%s, invalid msg type, vgroup:%d sid:%d", pTable->tableId, pCfg->vnode, pCfg->sid);
}
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode);
SRpcMsg rpcRsp = {
.handle = NULL,
.pCont = pMDCreate,
.contLen = htonl(pMDCreate->contLen),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
};
mgmtSendMsgToDnode(&ipSet, &rpcRsp);
}
......@@ -94,7 +94,7 @@ static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) {
int32_t mgmtInitUsers() {
SUserObj tObj;
tsUserUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
tsUserUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj;
SSdbTableDesc tableDesc = {
.tableName = "users",
......@@ -471,7 +471,7 @@ static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) {
if (hasRight) {
code = mgmtDropUser(pUser->pAcct, pDrop->user);
if (code == TSDB_CODE_SUCCESS) {
mLPrint("user:%s is dropped by %s", pDrop->user, pUser->user);
mLPrint("user:%s is dropped by %s, result:%d", pUser->user, pOperUser->user, tstrerror(code));
}
} else {
code = TSDB_CODE_NO_RIGHTS;
......
......@@ -142,7 +142,7 @@ static int32_t mgmtVgroupActionDecode(SSdbOperDesc *pOper) {
int32_t mgmtInitVgroups() {
SVgObj tObj;
tsVgUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
tsVgUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj;
SSdbTableDesc tableDesc = {
.tableName = "vgroups",
......@@ -433,13 +433,10 @@ void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable) {
pVgroup->tableList[pTable->sid] = pTable;
taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid);
pVgroup->numOfTables++;
pVgroup->pDb->numOfTables++;
}
if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxSessions)
mgmtAddVgroupIntoDbTail(pVgroup);
else
mgmtAddVgroupIntoDb(pVgroup);
}
void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable) {
......@@ -447,13 +444,10 @@ void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable) {
pVgroup->tableList[pTable->sid] = NULL;
taosFreeId(pVgroup->idPool, pTable->sid);
pVgroup->numOfTables--;
pVgroup->pDb->numOfTables--;
}
if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxSessions)
mgmtAddVgroupIntoDbTail(pVgroup);
else
mgmtAddVgroupIntoDb(pVgroup);
}
SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
......
......@@ -15,12 +15,13 @@
#include "os.h"
#include "tlog.h"
#include <stdbool.h>
typedef struct {
int maxId;
int numOfFree;
int freeSlot;
int * freeList;
bool * freeList;
pthread_mutex_t mutex;
} id_pool_t;
......@@ -28,7 +29,7 @@ void *taosInitIdPool(int maxId) {
id_pool_t *pIdPool = calloc(1, sizeof(id_pool_t));
if (pIdPool == NULL) return NULL;
pIdPool->freeList = malloc(sizeof(int) * (size_t)maxId);
pIdPool->freeList = calloc(maxId, sizeof(bool));
if (pIdPool->freeList == NULL) {
free(pIdPool);
return NULL;
......@@ -40,10 +41,6 @@ void *taosInitIdPool(int maxId) {
pthread_mutex_init(&pIdPool->mutex, NULL);
for (int i = 0; i < maxId; ++i) {
pIdPool->freeList[i] = -1;
}
pTrace("pool:%p is setup, maxId:%d", pIdPool, pIdPool->maxId);
return pIdPool;
......@@ -61,8 +58,8 @@ int taosAllocateId(void *handle) {
if (pIdPool->numOfFree > 0) {
for (int i = 0; i < pIdPool->maxId; ++i) {
slot = (i + pIdPool->freeSlot) % pIdPool->maxId;
if (pIdPool->freeList[slot] == -1) {
pIdPool->freeList[slot] = slot;
if (!pIdPool->freeList[slot]) {
pIdPool->freeList[slot] = true;
pIdPool->freeSlot = slot + 1;
pIdPool->numOfFree--;
break;
......@@ -71,7 +68,7 @@ int taosAllocateId(void *handle) {
}
pthread_mutex_unlock(&pIdPool->mutex);
return slot;
return slot + 1;
}
void taosFreeId(void *handle, int id) {
......@@ -80,9 +77,9 @@ void taosFreeId(void *handle, int id) {
pthread_mutex_lock(&pIdPool->mutex);
int slot = id % pIdPool->maxId;
if (pIdPool->freeList[slot] != -1) {
pIdPool->freeList[slot] = -1;
int slot = (id - 1) % pIdPool->maxId;
if (pIdPool->freeList[slot]) {
pIdPool->freeList[slot] = false;
pIdPool->numOfFree++;
}
......@@ -114,9 +111,9 @@ void taosIdPoolMarkStatus(void *handle, int id) {
id_pool_t *pIdPool = handle;
pthread_mutex_lock(&pIdPool->mutex);
int slot = id % pIdPool->maxId;
if (pIdPool->freeList[slot] == -1) {
pIdPool->freeList[slot] = slot;
int slot = (id - 1) % pIdPool->maxId;
if (!pIdPool->freeList[slot]) {
pIdPool->freeList[slot] = true;
pIdPool->numOfFree--;
}
......@@ -129,18 +126,14 @@ int taosUpdateIdPool(id_pool_t *handle, int maxId) {
return -1;
}
int *idList = malloc(sizeof(int) * maxId);
int *idList = calloc(maxId, sizeof(bool));
if (idList == NULL) {
return -1;
}
pthread_mutex_lock(&pIdPool->mutex);
for (int i = 0; i < maxId; ++i) {
idList[i] = -1;
}
memcpy(idList, pIdPool->freeList, sizeof(int) * pIdPool->maxId);
memcpy(idList, pIdPool->freeList, sizeof(bool) * pIdPool->maxId);
pIdPool->numOfFree += (maxId - pIdPool->maxId);
pIdPool->maxId = maxId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册