提交 51171c75 编写于 作者: S slguan

[TD-15]

上级 00e18b70
......@@ -39,6 +39,7 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName
int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp);
void mgmtDropAllChildTables(SDbObj *pDropDb);
void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable);
#ifdef __cplusplus
}
......
......@@ -42,14 +42,14 @@ static void mgmtDestroyChildTable(SChildTableObj *pTable) {
tfree(pTable);
}
static int32_t mgmtChildTableActionDestroy(void *pObj) {
SChildTableObj *pTable = (SChildTableObj *)pObj;
static int32_t mgmtChildTableActionDestroy(SSdbOperDesc *pOper) {
SChildTableObj *pTable = pOper->pObj;
mgmtDestroyChildTable(pTable);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtChildTableActionInsert(void *pObj) {
SChildTableObj *pTable = (SChildTableObj *) pObj;
static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) {
SChildTableObj *pTable = pOper->pObj;
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
......@@ -84,15 +84,15 @@ static int32_t mgmtChildTableActionInsert(void *pObj) {
mgmtAddTableIntoDb(pDb);
mgmtAddTableIntoVgroup(pVgroup, (STableInfo *) pTable);
if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) {
mgmtMoveVgroupToTail(pDb, pVgroup);
if (pVgroup->numOfTables >= pDb->cfg.maxSessions && pDb->numOfVgroups > 1) {
mgmtMoveVgroupToTail(pVgroup);
}
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtChildTableActionDelete(void *pObj) {
SChildTableObj *pTable = (SChildTableObj *) pObj;
static int32_t mgmtChildTableActionDelete(SSdbOperDesc *pOper) {
SChildTableObj *pTable = pOper->pObj;
if (pTable->vgId == 0) {
return TSDB_CODE_INVALID_VGROUP_ID;
}
......@@ -121,33 +121,35 @@ static int32_t mgmtChildTableActionDelete(void *pObj) {
mgmtRemoveTableFromSuperTable(pTable->superTable);
if (pVgroup->numOfTables > 0) {
mgmtMoveVgroupToHead(pDb, pVgroup);
mgmtMoveVgroupToHead(pVgroup);
}
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtChildTableActionUpdate(void *pObj) {
static int32_t mgmtChildTableActionUpdate(SSdbOperDesc *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtChildTableActionEncode(void *pObj, void *pData, int32_t maxRowSize) {
SChildTableObj *pTable = (SChildTableObj *) pObj;
assert(pObj != NULL && pData != NULL);
static int32_t mgmtChildTableActionEncode(SSdbOperDesc *pOper) {
SChildTableObj *pTable = pOper->pObj;
assert(pTable != NULL && pOper->rowData != NULL);
memcpy(pData, pTable, tsChildTableUpdateSize);
memcpy(pOper->rowData, pTable, tsChildTableUpdateSize);
return tsChildTableUpdateSize;
}
static void *mgmtChildTableActionDecode(void *pData) {
assert(pData != NULL);
static int32_t mgmtChildTableActionDecode(SSdbOperDesc *pOper) {
assert(pOper->rowData != NULL);
SChildTableObj *pTable = (SChildTableObj *)calloc(sizeof(SChildTableObj), 1);
if (pTable == NULL) return NULL;
pOper->pObj = calloc(1, sizeof(SChildTableObj));
if (pOper->pObj == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
memcpy(pTable, pData, tsChildTableUpdateSize);
memcpy(pOper->pObj, pOper->rowData, tsChildTableUpdateSize);
return (void *)pTable;
return TSDB_CODE_SUCCESS;
}
int32_t mgmtInitChildTables() {
......@@ -162,7 +164,7 @@ int32_t mgmtInitChildTables() {
.tableName = "ctables",
.hashSessions = tsMaxTables,
.maxRowSize = tsChildTableUpdateSize,
.keyType = SDB_KEYTYPE_STRING,
.keyType = SDB_KEY_TYPE_STRING,
.insertFp = mgmtChildTableActionInsert,
.deleteFp = mgmtChildTableActionDelete,
.updateFp = mgmtChildTableActionUpdate,
......@@ -187,7 +189,11 @@ int32_t mgmtInitChildTables() {
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL) {
mError("ctable:%s, failed to get db, discard it", pTable->tableId);
sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK);
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_LOCAL;
desc.pObj = pTable;
desc.table = tsChildTableSdb;
sdbDeleteRow(&desc);
pNode = pLastNode;
continue;
}
......@@ -196,7 +202,11 @@ int32_t mgmtInitChildTables() {
if (pVgroup == NULL) {
mError("ctable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->tableId, pTable->vgId, pTable->sid);
pTable->vgId = 0;
sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK);
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_LOCAL;
desc.pObj = pTable;
desc.table = tsChildTableSdb;
sdbDeleteRow(&desc);
pNode = pLastNode;
continue;
}
......@@ -205,7 +215,11 @@ int32_t mgmtInitChildTables() {
mError("ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it",
pTable->tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid);
pTable->vgId = 0;
sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK);
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_LOCAL;
desc.pObj = pTable;
desc.table = tsChildTableSdb;
sdbDeleteRow(&desc);
pNode = pLastNode;
continue;
}
......@@ -213,19 +227,27 @@ int32_t mgmtInitChildTables() {
if (pVgroup->tableList == NULL) {
mError("ctable:%s, vgroup:%d tableList is null", pTable->tableId, pTable->vgId);
pTable->vgId = 0;
sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK);
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_LOCAL;
desc.pObj = pTable;
desc.table = tsChildTableSdb;
sdbDeleteRow(&desc);
pNode = pLastNode;
continue;
}
pVgroup->tableList[pTable->sid] = (STableInfo*)pTable;
taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid, 1);
taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid);
SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTable->superTableId);
if (pSuperTable == NULL) {
mError("ctable:%s, stable:%s not exist", pTable->tableId, pTable->superTableId);
pTable->vgId = 0;
sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK);
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_LOCAL;
desc.pObj = pTable;
desc.table = tsChildTableSdb;
sdbDeleteRow(&desc);
pNode = pLastNode;
continue;
}
......@@ -310,7 +332,13 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
pTable->vgId = pVgroup->vgId;
pTable->superTable = pSuperTable;
if (sdbInsertRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL) < 0) {
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_GLOBAL;
desc.pObj = pTable;
desc.table = tsChildTableSdb;
sdbInsertRow(&desc);
if (sdbInsertRow(&desc) < 0) {
free(pTable);
mError("ctable:%s, update sdb error", pCreate->tableId);
terrno = TSDB_CODE_SDB_ERROR;
......@@ -453,12 +481,45 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) {
}
if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) {
sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_LOCAL);
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_LOCAL,
.table = tsChildTableSdb,
.pObj = pTable,
};
sdbDeleteRow(&oper);
pNode = pLastNode;
numOfTables++;
continue;
}
}
mTrace("db:%s, all child tables:%d is dropped from sdb", pDropDb->name, numOfTables);
}
void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) {
void *pNode = NULL;
void *pLastNode = NULL;
int32_t numOfTables = 0;
SChildTableObj *pTable = NULL;
while (1) {
pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) {
break;
}
if (pTable->superTable == pStable) {
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_LOCAL,
.table = tsChildTableSdb,
.pObj = pTable,
};
sdbDeleteRow(&oper);
pNode = pLastNode;
numOfTables ++;
numOfTables++;
continue;
}
}
mTrace("db:%s, all child tables:%d is dropped", pDropDb->name, numOfTables);
mTrace("stable:%s, all child tables:%d is dropped from sdb", pStable->tableId, numOfTables);
}
\ No newline at end of file
......@@ -547,9 +547,6 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
return ;
}
uint32_t lastPrivateIp = pDnode->privateIp;
uint32_t lastPublicIp = pDnode->publicIp;
pDnode->privateIp = htonl(pStatus->privateIp);
pDnode->publicIp = htonl(pStatus->publicIp);
pDnode->lastReboot = htonl(pStatus->lastReboot);
......
......@@ -42,14 +42,14 @@ static void mgmtDestroyNormalTable(SNormalTableObj *pTable) {
tfree(pTable);
}
static int32_t mgmtNormalTableActionDestroy(void *pObj) {
SNormalTableObj *pTable = (SNormalTableObj *)pObj;
static int32_t mgmtNormalTableActionDestroy(SSdbOperDesc *pOper) {
SNormalTableObj *pTable = pOper->pObj;
mgmtDestroyNormalTable(pTable);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtNormalTableActionInsert(void *pObj) {
SNormalTableObj *pTable = (SNormalTableObj *) pObj;
static int32_t mgmtNormalTableActionInsert(SSdbOperDesc *pOper) {
SNormalTableObj *pTable = pOper->pObj;
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
......@@ -82,14 +82,14 @@ static int32_t mgmtNormalTableActionInsert(void *pObj) {
mgmtAddTableIntoVgroup(pVgroup, (STableInfo *) pTable);
if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) {
mgmtMoveVgroupToTail(pDb, pVgroup);
mgmtMoveVgroupToTail(pVgroup);
}
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtNormalTableActionDelete(void *pObj) {
SNormalTableObj *pTable = (SNormalTableObj *) pObj;
static int32_t mgmtNormalTableActionDelete(SSdbOperDesc *pOper) {
SNormalTableObj *pTable = pOper->pObj;
if (pTable->vgId == 0) {
return TSDB_CODE_INVALID_VGROUP_ID;
}
......@@ -116,13 +116,13 @@ static int32_t mgmtNormalTableActionDelete(void *pObj) {
mgmtRemoveTableFromVgroup(pVgroup, (STableInfo *) pTable);
if (pVgroup->numOfTables > 0) {
mgmtMoveVgroupToHead(pDb, pVgroup);
mgmtMoveVgroupToHead(pVgroup);
}
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtNormalTableActionUpdate(void *pObj) {
static int32_t mgmtNormalTableActionUpdate(SSdbOperDesc *pOper) {
// SNormalTableObj *pTable = (SNormalTableObj *) pObj;
// memcpy(pTable, str, tsNormalTableUpdateSize);
......@@ -134,48 +134,49 @@ static int32_t mgmtNormalTableActionUpdate(void *pObj) {
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtNormalTableActionEncode(void *pObj, void *pData, int32_t maxRowSize) {
SNormalTableObj *pTable = (SNormalTableObj *) pObj;
assert(pObj != NULL && pData != NULL);
static int32_t mgmtNormalTableActionEncode(SSdbOperDesc *pOper) {
SNormalTableObj *pTable = pOper->pObj;
assert(pOper->pObj != NULL && pOper->rowData != NULL);
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
if (maxRowSize < tsNormalTableUpdateSize + schemaSize + 1) {
if (pOper->maxRowSize < tsNormalTableUpdateSize + schemaSize + 1) {
return -1;
}
memcpy(pData, pTable, tsNormalTableUpdateSize);
memcpy(pData + tsNormalTableUpdateSize, pTable->schema, schemaSize);
memcpy(pData + tsNormalTableUpdateSize + schemaSize, pTable->sql, pTable->sqlLen);
memcpy(pOper->rowData, pTable, tsNormalTableUpdateSize);
memcpy(pOper->rowData + tsNormalTableUpdateSize, pTable->schema, schemaSize);
memcpy(pOper->rowData + tsNormalTableUpdateSize + schemaSize, pTable->sql, pTable->sqlLen);
return tsNormalTableUpdateSize + schemaSize + pTable->sqlLen;
}
static void *mgmtNormalTableActionDecode(void *pData) {
assert(pData != NULL);
static int32_t mgmtNormalTableActionDecode(SSdbOperDesc *pOper) {
assert(pOper->rowData != NULL);
SNormalTableObj *pTable = (SNormalTableObj *)malloc(sizeof(SNormalTableObj));
if (pTable == NULL) {
return NULL;
return -1;
}
memset(pTable, 0, sizeof(SNormalTableObj));
memcpy(pTable, pData, tsNormalTableUpdateSize);
memcpy(pTable, pOper->rowData, tsNormalTableUpdateSize);
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
pTable->schema = (SSchema *)malloc(schemaSize);
if (pTable->schema == NULL) {
mgmtDestroyNormalTable(pTable);
return NULL;
return -1;
}
memcpy(pTable->schema, pData + tsNormalTableUpdateSize, schemaSize);
memcpy(pTable->schema, pOper->rowData + tsNormalTableUpdateSize, schemaSize);
pTable->sql = (char *)malloc(pTable->sqlLen);
if (pTable->sql == NULL) {
mgmtDestroyNormalTable(pTable);
return NULL;
return -1;
}
memcpy(pTable->sql, pData + tsNormalTableUpdateSize + schemaSize, pTable->sqlLen);
return (void *)pTable;
memcpy(pTable->sql, pOper->rowData + tsNormalTableUpdateSize + schemaSize, pTable->sqlLen);
return TSDB_CODE_SUCCESS;
}
int32_t mgmtInitNormalTables() {
......@@ -190,7 +191,7 @@ int32_t mgmtInitNormalTables() {
.tableName = "ntables",
.hashSessions = TSDB_MAX_NORMAL_TABLES,
.maxRowSize = sizeof(SNormalTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS,
.keyType = SDB_KEYTYPE_STRING,
.keyType = SDB_KEY_TYPE_STRING,
.insertFp = mgmtNormalTableActionInsert,
.deleteFp = mgmtNormalTableActionDelete,
.updateFp = mgmtNormalTableActionUpdate,
......@@ -213,7 +214,11 @@ int32_t mgmtInitNormalTables() {
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL) {
mError("ntable:%s, failed to get db, discard it", pTable->tableId);
sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_DISK);
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_LOCAL;
desc.pObj = pTable;
desc.table = tsNormalTableSdb;
sdbDeleteRow(&desc);
pNode = pLastNode;
continue;
}
......@@ -222,7 +227,11 @@ int32_t mgmtInitNormalTables() {
if (pVgroup == NULL) {
mError("ntable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->tableId, pTable->vgId, pTable->sid);
pTable->vgId = 0;
sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_DISK);
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_LOCAL;
desc.pObj = pTable;
desc.table = tsNormalTableSdb;
sdbDeleteRow(&desc);
pNode = pLastNode;
continue;
}
......@@ -231,7 +240,12 @@ int32_t mgmtInitNormalTables() {
mError("ntable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it",
pTable->tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid);
pTable->vgId = 0;
sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_DISK);
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_LOCAL;
desc.pObj = pTable;
desc.table = tsNormalTableSdb;
sdbDeleteRow(&desc);
pNode = pLastNode;
continue;
}
......@@ -239,14 +253,18 @@ int32_t mgmtInitNormalTables() {
if (pVgroup->tableList == NULL) {
mError("ntable:%s, vgroup:%d tableList is null", pTable->tableId, pTable->vgId);
pTable->vgId = 0;
sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_DISK);
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_LOCAL;
desc.pObj = pTable;
desc.table = tsNormalTableSdb;
sdbDeleteRow(&desc);
pNode = pLastNode;
continue;
}
mgmtAddTableIntoVgroup(pVgroup, (STableInfo *)pTable);
//pVgroup->tableList[pTable->sid] = (STableInfo*)pTable;
taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid, 1);
taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid);
pTable->sql = (char *)pTable->schema + sizeof(SSchema) * pTable->numOfColumns;
......@@ -346,7 +364,11 @@ void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
mTrace("table:%s, stream sql len:%d sql:%s", pTable->tableId, pTable->sqlLen, pTable->sql);
}
if (sdbInsertRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL) < 0) {
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_GLOBAL;
desc.pObj = pTable;
desc.table = tsNormalTableSdb;
if (sdbInsertRow(&desc) < 0) {
mError("table:%s, update sdb error", pTable->tableId);
free(pTable);
terrno = TSDB_CODE_SDB_ERROR;
......@@ -443,7 +465,14 @@ int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int3
pTable->sversion++;
pAcct->acctInfo.numOfTimeSeries += ncols;
sdbUpdateRow(tsNormalTableSdb, pTable, tsNormalTableUpdateSize, SDB_OPER_GLOBAL);
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_GLOBAL;
desc.pObj = pTable;
desc.table = tsNormalTableSdb;
desc.rowData = pTable;
desc.rowSize = tsNormalTableUpdateSize;
sdbUpdateRow(&desc);
return TSDB_CODE_SUCCESS;
}
......@@ -472,7 +501,14 @@ int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName)
pTable->sversion++;
pAcct->acctInfo.numOfTimeSeries--;
sdbUpdateRow(tsNormalTableSdb, pTable, tsNormalTableUpdateSize, SDB_OPER_GLOBAL);
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_GLOBAL;
desc.pObj = pTable;
desc.table = tsNormalTableSdb;
desc.rowData = pTable;
desc.rowSize = tsNormalTableUpdateSize;
sdbUpdateRow(&desc);
return TSDB_CODE_SUCCESS;
}
......@@ -533,12 +569,17 @@ void mgmtDropAllNormalTables(SDbObj *pDropDb) {
if (pTable == NULL) break;
if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) {
sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_LOCAL);
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_LOCAL,
.table = tsNormalTableSdb,
.pObj = pTable,
};
sdbDeleteRow(&oper);
pNode = pLastNode;
numOfTables ++;
numOfTables++;
continue;
}
}
mTrace("db:%s, all normal tables:%d is dropped", pDropDb->name, numOfTables);
mTrace("db:%s, all normal tables:%d is dropped from sdb", pDropDb->name, numOfTables);
}
......@@ -32,93 +32,84 @@
#include "name.h"
#include "tsqlfunction.h"
static void *tsSuperTableSdb;
static void *tsSuperTableSdb;
static int32_t tsSuperTableUpdateSize;
static int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static void mgmtDestroySuperTable(SSuperTableObj *pTable) {
tfree(pTable->schema);
tfree(pTable);
static void mgmtDestroySuperTable(SSuperTableObj *pStable) {
tfree(pStable->schema);
tfree(pStable);
}
static int32_t mgmtSuperTableActionDestroy(void *pObj) {
SSuperTableObj *pTable = (SSuperTableObj *) pObj;
mgmtDestroySuperTable(pTable);
static int32_t mgmtSuperTableActionDestroy(SSdbOperDesc *pOper) {
mgmtDestroySuperTable(pOper->pObj);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtSuperTableActionInsert(void *pObj) {
STableInfo *pTable = (STableInfo *) pObj;
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb) {
static int32_t mgmtSuperTableActionInsert(SSdbOperDesc *pOper) {
STableInfo *pStable = pOper->pObj;
SDbObj *pDb = mgmtGetDbByTableId(pStable->tableId);
if (pDb != NULL) {
mgmtAddSuperTableIntoDb(pDb);
}
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtSuperTableActionDelete(void *pObj) {
STableInfo *pTable = (STableInfo *) pObj;
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb) {
static int32_t mgmtSuperTableActionDelete(SSdbOperDesc *pOper) {
STableInfo *pStable = pOper->pObj;
SDbObj *pDb = mgmtGetDbByTableId(pStable->tableId);
if (pDb != NULL) {
mgmtRemoveSuperTableFromDb(pDb);
mgmtDropAllChildTablesInStable((SSuperTableObj *)pStable);
}
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtSuperTableActionUpdate(void *pObj) {
SSuperTableObj *pTable = (SSuperTableObj *) pObj;
memcpy(pTable, pObj, tsSuperTableUpdateSize);
int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags);
pTable->schema = realloc(pTable->schema, schemaSize);
memcpy(pTable->schema, pObj + tsSuperTableUpdateSize, schemaSize);
static int32_t mgmtSuperTableActionUpdate(SSdbOperDesc *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtSuperTableActionEncode(void *pObj, void *pData, int32_t maxRowSize) {
SSuperTableObj *pTable = (SSuperTableObj *) pObj;
assert(pObj != NULL && pData != NULL);
static int32_t mgmtSuperTableActionEncode(SSdbOperDesc *pOper) {
SSuperTableObj *pStable = pOper->pObj;
assert(pOper->pObj != NULL && pOper->rowData != NULL);
int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags);
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags);
if (maxRowSize < tsSuperTableUpdateSize + schemaSize + 1) {
if (pOper->maxRowSize < tsSuperTableUpdateSize + schemaSize) {
return TSDB_CODE_INVALID_MSG_LEN;
}
memcpy(pData, pTable, tsSuperTableUpdateSize);
memcpy(pData + tsSuperTableUpdateSize, pTable->schema, schemaSize);
return tsSuperTableUpdateSize + schemaSize;
memcpy(pOper->rowData, pStable, tsSuperTableUpdateSize);
memcpy(pOper->rowData + tsSuperTableUpdateSize, pStable->schema, schemaSize);
pOper->rowSize = tsSuperTableUpdateSize + schemaSize;
return TSDB_CODE_SUCCESS;
}
static void *mgmtSuperTableActionDecode(void *pData) {
assert(pData != NULL);
static int32_t mgmtSuperTableActionDecode(SSdbOperDesc *pOper) {
assert(pOper->rowData != NULL);
SSuperTableObj *pTable = (SSuperTableObj *) malloc(sizeof(SSuperTableObj));
if (pTable == NULL) {
return NULL;
}
memset(pTable, 0, sizeof(SSuperTableObj));
memcpy(pTable, pData, tsSuperTableUpdateSize);
SSuperTableObj *pStable = (SSuperTableObj *) calloc(1, sizeof(SSuperTableObj));
if (pStable == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY;
int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags);
pTable->schema = malloc(schemaSize);
if (pTable->schema == NULL) {
mgmtDestroySuperTable(pTable);
return NULL;
memcpy(pStable, pOper->rowData, tsSuperTableUpdateSize);
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags);
pStable->schema = malloc(schemaSize);
if (pStable->schema == NULL) {
mgmtDestroySuperTable(pStable);
return -1;
}
memcpy(pTable->schema, pData + tsSuperTableUpdateSize, schemaSize);
return (void *) pTable;
memcpy(pStable->schema, pOper->rowData + tsSuperTableUpdateSize, schemaSize);
pOper->pObj = pStable;
return TSDB_CODE_SUCCESS;
}
int32_t mgmtInitSuperTables() {
void *pNode = NULL;
void *pLastNode = NULL;
SSuperTableObj *pTable = NULL;
SSuperTableObj tObj;
tsSuperTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
......@@ -126,7 +117,7 @@ int32_t mgmtInitSuperTables() {
.tableName = "stables",
.hashSessions = TSDB_MAX_SUPER_TABLES,
.maxRowSize = tsSuperTableUpdateSize + sizeof(SSchema) * TSDB_MAX_COLUMNS,
.keyType = SDB_KEYTYPE_STRING,
.keyType = SDB_KEY_TYPE_STRING,
.insertFp = mgmtSuperTableActionInsert,
.deleteFp = mgmtSuperTableActionDelete,
.updateFp = mgmtSuperTableActionUpdate,
......@@ -141,24 +132,6 @@ int32_t mgmtInitSuperTables() {
return -1;
}
pNode = NULL;
while (1) {
pNode = sdbFetchRow(tsSuperTableSdb, pNode, (void **) &pTable);
if (pTable == NULL) {
break;
}
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL) {
mError("super table:%s, failed to get db, discard it", pTable->tableId);
sdbDeleteRow(tsSuperTableSdb, pTable, SDB_OPER_DISK);
pNode = pLastNode;
continue;
}
mgmtAddSuperTableIntoDb(pDb);
}
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mgmtGetShowSuperTableMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mgmtRetrieveShowSuperTables);
......@@ -171,7 +144,7 @@ void mgmtCleanUpSuperTables() {
}
int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) {
SSuperTableObj *pStable = (SSuperTableObj *)calloc(sizeof(SSuperTableObj), 1);
SSuperTableObj *pStable = (SSuperTableObj *)calloc(1, sizeof(SSuperTableObj));
if (pStable == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
......@@ -203,13 +176,21 @@ int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) {
tschema[col].bytes = htons(tschema[col].bytes);
}
if (sdbInsertRow(tsSuperTableSdb, pStable, SDB_OPER_GLOBAL) < 0) {
mError("stable:%s, update sdb error", pStable->tableId);
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsSuperTableSdb,
.pObj = pStable,
.rowSize = sizeof(SSuperTableObj) + schemaSize
};
int32_t code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
mgmtDestroySuperTable(pStable);
return TSDB_CODE_SDB_ERROR;
} else {
mLPrint("stable:%s, is created, tags:%d cols:%d", pStable->tableId, pStable->numOfTags, pStable->numOfColumns);
return TSDB_CODE_SUCCESS;
}
mPrint("stable:%s, is created, tags:%d cols:%d", pStable->tableId, pStable->numOfTags, pStable->numOfColumns);
return TSDB_CODE_SUCCESS;
}
int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pStable) {
......@@ -217,10 +198,14 @@ int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pSta
mError("stable:%s, numOfTables:%d not 0", pStable->tableId, pStable->numOfTables);
return TSDB_CODE_OTHERS;
} else {
//TODO: drop child tables
mError("stable:%s, is dropped from sdb", pStable->tableId);
mgmtRemoveSuperTableFromDb(pDb);
return TSDB_CODE_OTHERS;
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsSuperTableSdb,
.pObj = pStable
};
int32_t code = sdbDeleteRow(&oper);
mLPrint("stable:%s, is dropped from sdb, result:%s", pStable->tableId, tstrerror(code));
return code;
}
}
......@@ -229,10 +214,9 @@ void* mgmtGetSuperTable(char *tableId) {
}
void *mgmtGetSuperTableVgroup(SSuperTableObj *pStable) {
//TODO get vgroup of dnodes
SCMSuperTableInfoRsp *rsp = rpcMallocCont(sizeof(SCMSuperTableInfoRsp) + sizeof(uint32_t) * mgmtGetDnodesNum());
rsp->numOfDnodes = 1;
rsp->dnodeIps[0] = 0;
rsp->numOfDnodes = htonl(1);
rsp->dnodeIps[0] = htonl(inet_addr(tsPrivateIp));
return rsp;
}
......@@ -289,7 +273,7 @@ int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t
pStable->sversion++;
pAcct->acctInfo.numOfTimeSeries += (ntags * pStable->numOfTables);
sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
// sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
mTrace("Succeed to add tag column %s to table %s", schema[0].name, pStable->tableId);
return TSDB_CODE_SUCCESS;
......@@ -322,7 +306,7 @@ int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) {
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns);
pStable->schema = realloc(pStable->schema, schemaSize);
sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
// sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
return TSDB_CODE_SUCCESS;
}
......@@ -354,7 +338,8 @@ int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *oldTagN
// mgmtSuperTableActionEncode(pStable, msg, size, &rowSize);
int32_t ret = sdbUpdateRow(tsSuperTableSdb, msg, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
int32_t ret = 0;
// int32_t ret = sdbUpdateRow(tsSuperTableSdb, msg, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
tfree(msg);
if (ret < 0) {
......@@ -416,7 +401,7 @@ int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[], int32
pStable->sversion++;
pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables);
sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
// sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
return TSDB_CODE_SUCCESS;
}
......@@ -449,7 +434,7 @@ int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pStable, char *colName) {
pStable->schema = realloc(pStable->schema, schemaSize);
pAcct->acctInfo.numOfTimeSeries -= (pStable->numOfTables);
sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
// sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
return TSDB_CODE_SUCCESS;
}
......@@ -588,14 +573,19 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) {
}
if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) {
sdbDeleteRow(tsSuperTableSdb, pTable, SDB_OPER_GLOBAL);
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_LOCAL,
.table = tsSuperTableSdb,
.pObj = pTable,
};
sdbDeleteRow(&oper);
pNode = pLastNode;
numOfTables ++;
continue;
}
}
mTrace("db:%s, all super tables:%d is dropped", pDropDb->name, numOfTables);
mTrace("db:%s, all super tables:%d is dropped from sdb", pDropDb->name, numOfTables);
}
void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable) {
......
......@@ -78,8 +78,6 @@ int32_t mgmtInitTables() {
return code;
}
mgmtSetVgroupIdPool();
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_TABLE, mgmtProcessCreateTableMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_TABLE, mgmtProcessDropTableMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_TABLE, mgmtProcessAlterTableMsg);
......@@ -202,13 +200,13 @@ int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "table_name");
strcpy(pSchema[cols].name, "table name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created_time");
strcpy(pSchema[cols].name, "create time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
......@@ -220,7 +218,7 @@ int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "stable_name");
strcpy(pSchema[cols].name, "stable name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
......@@ -238,9 +236,6 @@ int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
return 0;
}
/*
* remove the hole in result set
*/
static void mgmtVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) {
if (rows < capacity) {
for (int32_t i = 0; i < numOfCols; ++i) {
......@@ -434,11 +429,13 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
pTable = mgmtCreateChildTable(pCreate, pVgroup, sid);
if (pTable == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
mgmtFreeQueuedMsg(newMsg);
return;
}
pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, (SChildTableObj *) pTable);
if (pMDCreate == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
mgmtFreeQueuedMsg(newMsg);
return;
}
} else {
......@@ -446,11 +443,13 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
pTable = mgmtCreateNormalTable(pCreate, pVgroup, sid);
if (pTable == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
mgmtFreeQueuedMsg(newMsg);
return;
}
pMDCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable);
if (pMDCreate == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
mgmtFreeQueuedMsg(newMsg);
return;
}
}
......@@ -761,9 +760,9 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
if (pTable->type == TSDB_CHILD_TABLE) {
sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL);
// sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL);
} else if (pTable->type == TSDB_NORMAL_TABLE){
sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL);
// sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL);
} else {}
mError("table:%s, failed to create in dnode, reason:%s", pTable->tableId, tstrerror(rpcMsg->code));
mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code);
......@@ -817,19 +816,19 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) {
}
if (pTable->type == TSDB_CHILD_TABLE) {
if (sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL) < 0) {
mError("table:%s, update ctables sdb error", pTable->tableId);
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR);
free(queueMsg);
return;
}
// if (sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL) < 0) {
// mError("table:%s, update ctables sdb error", pTable->tableId);
// mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR);
// free(queueMsg);
// return;
// }
} else if (pTable->type == TSDB_NORMAL_TABLE){
if (sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL) < 0) {
mError("table:%s, update ntables sdb error", pTable->tableId);
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR);
free(queueMsg);
return;
}
// if (sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL) < 0) {
// mError("table:%s, update ntables sdb error", pTable->tableId);
// mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR);
// free(queueMsg);
// return;
// }
}
if (pVgroup->numOfTables <= 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册