提交 2d149dae 编写于 作者: S slguan

[TD-73] fix error in table

上级 aeac5f9e
...@@ -250,6 +250,8 @@ typedef struct { ...@@ -250,6 +250,8 @@ typedef struct {
void *ahandle; void *ahandle;
void *thandle; void *thandle;
void *pCont; void *pCont;
SAcctObj *pAcct;
SDnodeObj*pDnode;
SUserObj *pUser; SUserObj *pUser;
SDbObj *pDb; SDbObj *pDb;
SVgObj *pVgroup; SVgObj *pVgroup;
......
...@@ -791,6 +791,8 @@ void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) { ...@@ -791,6 +791,8 @@ void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) {
if (pMsg->pDb) mgmtDecDbRef(pMsg->pDb); if (pMsg->pDb) mgmtDecDbRef(pMsg->pDb);
if (pMsg->pVgroup) mgmtDecVgroupRef(pMsg->pVgroup); if (pMsg->pVgroup) mgmtDecVgroupRef(pMsg->pVgroup);
if (pMsg->pTable) mgmtDecTableRef(pMsg->pTable); if (pMsg->pTable) mgmtDecTableRef(pMsg->pTable);
// if (pMsg->pAcct) acctDecRef(pMsg->pAcct);
// if (pMsg->pDnode) mgmtDecTableRef(pMsg->pDnode);
free(pMsg); free(pMsg);
} }
} }
......
...@@ -49,14 +49,13 @@ static void * tsChildTableSdb; ...@@ -49,14 +49,13 @@ static void * tsChildTableSdb;
static void * tsSuperTableSdb; static void * tsSuperTableSdb;
static int32_t tsChildTableUpdateSize; static int32_t tsChildTableUpdateSize;
static int32_t tsSuperTableUpdateSize; static int32_t tsSuperTableUpdateSize;
static void * mgmtGetChildTable(char *tableId); static void * mgmtGetChildTable(char *tableId);
static void * mgmtGetSuperTable(char *tableId); static void * mgmtGetSuperTable(char *tableId);
static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable);
void mgmtGetChildTableMeta(SQueuedMsg *pMsg); static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
void mgmtAlterChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable); static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
void mgmtDropAllChildTables(SDbObj *pDropDb); static int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable); static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static void mgmtProcessCreateTableMsg(SQueuedMsg *queueMsg); static void mgmtProcessCreateTableMsg(SQueuedMsg *queueMsg);
static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg); static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg);
...@@ -65,28 +64,22 @@ static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg); ...@@ -65,28 +64,22 @@ static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg);
static void mgmtProcessDropTableMsg(SQueuedMsg *queueMsg); static void mgmtProcessDropTableMsg(SQueuedMsg *queueMsg);
static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg); static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg);
static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg); static void mgmtProcessDropSuperTableRsp(SRpcMsg *rpcMsg);
static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg); static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg);
static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg); static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg);
void mgmtGetSuperTableMeta(SQueuedMsg *pMsg);
void mgmtAlterSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable);
void mgmtDropAllSuperTables(SDbObj *pDropDb);
int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable);
static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *queueMsg); static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *queueMsg);
static int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg); static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg);
static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg);
static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg); static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg);
static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtProcessTableMetaMsg(SQueuedMsg *queueMsg);
static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg);
static void mgmtGetChildTableMeta(SQueuedMsg *pMsg);
static void mgmtProcessAlterTableMsg(SQueuedMsg *queueMsg); static void mgmtProcessAlterTableMsg(SQueuedMsg *queueMsg);
static void mgmtProcessTableMetaMsg(SQueuedMsg *queueMsg); static void mgmtAlterChildTable(SQueuedMsg *pMsg);
static void mgmtAlterSuperTable(SQueuedMsg *pMsg);
static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg);
static void mgmtDestroyChildTable(SChildTableObj *pTable) { static void mgmtDestroyChildTable(SChildTableObj *pTable) {
tfree(pTable->schema); tfree(pTable->schema);
...@@ -465,31 +458,32 @@ int32_t mgmtInitTables() { ...@@ -465,31 +458,32 @@ int32_t mgmtInitTables() {
} }
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_TABLES_META, mgmtProcessMultiTableMetaMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_TABLES_META, mgmtProcessMultiTableMetaMsg);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_TABLE, mgmtProcessCreateTableMsg);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_TABLE, mgmtProcessDropTableMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_TABLE, mgmtProcessAlterTableMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_TABLE_META, mgmtProcessTableMetaMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_STABLE_VGROUP, mgmtProcessSuperTableVgroupMsg);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateChildTableRsp); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateChildTableRsp);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropTableRsp); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropChildTableRsp);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropSuperTableRsp);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, mgmtProcessAlterTableRsp); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, mgmtProcessAlterTableRsp);
mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_TABLE, mgmtProcessTableCfgMsg); mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_TABLE, mgmtProcessTableCfgMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_STABLE_VGROUP, mgmtProcessSuperTableVgroupMsg); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mgmtGetShowSuperTableMeta); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mgmtGetShowSuperTableMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mgmtRetrieveShowSuperTables); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mgmtRetrieveShowSuperTables);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropStableRsp);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_TABLE, mgmtProcessCreateTableMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_TABLE, mgmtProcessDropTableMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_TABLE, mgmtProcessAlterTableMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_TABLE_META, mgmtProcessTableMetaMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void * mgmtGetChildTable(char *tableId) { static void *mgmtGetChildTable(char *tableId) {
return sdbGetRow(tsChildTableSdb, tableId); return sdbGetRow(tsChildTableSdb, tableId);
} }
static void * mgmtGetSuperTable(char *tableId) { static void *mgmtGetSuperTable(char *tableId) {
return sdbGetRow(tsSuperTableSdb, tableId); return sdbGetRow(tsSuperTableSdb, tableId);
} }
...@@ -532,7 +526,7 @@ void mgmtCleanUpTables() { ...@@ -532,7 +526,7 @@ void mgmtCleanUpTables() {
mgmtCleanUpSuperTables(); mgmtCleanUpSuperTables();
} }
void mgmtExtractTableName(char* tableId, char* name) { static void mgmtExtractTableName(char* tableId, char* name) {
int pos = -1; int pos = -1;
int num = 0; int num = 0;
for (pos = 0; tableId[pos] != 0; ++pos) { for (pos = 0; tableId[pos] != 0; ++pos) {
...@@ -578,8 +572,6 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { ...@@ -578,8 +572,6 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) {
SCMDropTableMsg *pDrop = pMsg->pCont; SCMDropTableMsg *pDrop = pMsg->pCont;
if (mgmtCheckRedirect(pMsg->thandle)) return;
pMsg->pDb = mgmtGetDbByTableId(pDrop->tableId); pMsg->pDb = mgmtGetDbByTableId(pDrop->tableId);
if (pMsg->pDb == NULL || pMsg->pDb->dirty) { if (pMsg->pDb == NULL || pMsg->pDb->dirty) {
mError("table:%s, failed to drop table, db not selected", pDrop->tableId); mError("table:%s, failed to drop table, db not selected", pDrop->tableId);
...@@ -619,30 +611,22 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { ...@@ -619,30 +611,22 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) {
SCMAlterTableMsg *pAlter = pMsg->pCont; SCMAlterTableMsg *pAlter = pMsg->pCont;
mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->thandle); mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->thandle);
if (mgmtCheckRedirect(pMsg->thandle)) return; pMsg->pDb = mgmtGetDbByTableId(pAlter->tableId);
if (pMsg->pDb == NULL || pMsg->pDb->dirty) {
if (!pMsg->pUser->writeAuth) {
mTrace("table:%s, failed to alter table, no rights", pAlter->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
return;
}
SDbObj *pDb = mgmtGetDbByTableId(pAlter->tableId);
if (pDb == NULL || pDb->dirty) {
mError("table:%s, failed to alter table, db not selected", pAlter->tableId); mError("table:%s, failed to alter table, db not selected", pAlter->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED);
return; return;
} }
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { if (mgmtCheckIsMonitorDB(pMsg->pDb->name, tsMonitorDbName)) {
mError("table:%s, failed to alter table, its log db", pAlter->tableId); mError("table:%s, failed to alter table, its log db", pAlter->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN);
return; return;
} }
STableInfo *pTable = mgmtGetTable(pAlter->tableId); pMsg->pTable = mgmtGetTable(pAlter->tableId);
if (pTable == NULL) { if (pMsg->pTable == NULL) {
mError("table:%s, failed to alter table, table not exist", pTable->tableId); mError("table:%s, failed to alter table, table not exist", pMsg->pTable->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
return; return;
} }
...@@ -651,7 +635,6 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { ...@@ -651,7 +635,6 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) {
if (pAlter->numOfCols > 2) { if (pAlter->numOfCols > 2) {
mError("table:%s, error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols); mError("table:%s, error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_APP_ERROR); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_APP_ERROR);
mgmtDecTableRef(pTable);
return; return;
} }
...@@ -659,12 +642,12 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { ...@@ -659,12 +642,12 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) {
pAlter->schema[i].bytes = htons(pAlter->schema[i].bytes); pAlter->schema[i].bytes = htons(pAlter->schema[i].bytes);
} }
if (pTable->type == TSDB_SUPER_TABLE) { if (pMsg->pTable->type == TSDB_SUPER_TABLE) {
mTrace("table:%s, start to alter stable", pAlter->tableId); mTrace("table:%s, start to alter stable", pAlter->tableId);
mgmtAlterSuperTable(pMsg, (SSuperTableObj *)pTable); mgmtAlterSuperTable(pMsg);
} else { } else {
mTrace("table:%s, start to alter ctable", pAlter->tableId); mTrace("table:%s, start to alter ctable", pAlter->tableId);
mgmtAlterChildTable(pMsg, (SChildTableObj *)pTable); mgmtAlterChildTable(pMsg);
} }
} }
...@@ -744,7 +727,7 @@ static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) { ...@@ -744,7 +727,7 @@ static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) {
} }
} }
void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) { static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) {
SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable;
if (pStable->numOfTables != 0) { if (pStable->numOfTables != 0) {
mError("stable:%s, numOfTables:%d not 0", pStable->info.tableId, pStable->numOfTables); mError("stable:%s, numOfTables:%d not 0", pStable->info.tableId, pStable->numOfTables);
...@@ -761,13 +744,6 @@ void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) { ...@@ -761,13 +744,6 @@ void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) {
} }
} }
static void *mgmtGetSuperTableVgroup(SSuperTableObj *pStable) {
SCMSTableVgroupRspMsg *rsp = rpcMallocCont(sizeof(SCMSTableVgroupRspMsg) + sizeof(uint32_t) * mgmtGetDnodesNum());
rsp->numOfDnodes = htonl(1);
rsp->dnodeIps[0] = htonl(inet_addr(tsPrivateIp));
return rsp;
}
static int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pStable, const char *tagName) { static int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pStable, const char *tagName) {
for (int32_t i = 0; i < pStable->numOfTags; i++) { for (int32_t i = 0; i < pStable->numOfTags; i++) {
SSchema *schema = (SSchema *)(pStable->schema + (pStable->numOfColumns + i) * sizeof(SSchema)); SSchema *schema = (SSchema *)(pStable->schema + (pStable->numOfColumns + i) * sizeof(SSchema));
...@@ -793,18 +769,6 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i ...@@ -793,18 +769,6 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i
} }
} }
SDbObj *pDb = mgmtGetDbByTableId(pStable->info.tableId);
if (pDb == NULL) {
mError("meter: %s not belongs to any database", pStable->info.tableId);
return TSDB_CODE_APP_ERROR;
}
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("DB: %s not belongs to andy account", pDb->name);
return TSDB_CODE_APP_ERROR;
}
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns); int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns);
pStable->schema = realloc(pStable->schema, schemaSize + sizeof(SSchema) * ntags); pStable->schema = realloc(pStable->schema, schemaSize + sizeof(SSchema) * ntags);
...@@ -820,7 +784,6 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i ...@@ -820,7 +784,6 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i
pStable->numOfColumns += ntags; pStable->numOfColumns += ntags;
pStable->sversion++; pStable->sversion++;
pAcct->acctInfo.numOfTimeSeries += (ntags * pStable->numOfTables);
// sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); // sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
mTrace("Succeed to add tag column %s to table %s", schema[0].name, pStable->info.tableId); mTrace("Succeed to add tag column %s to table %s", schema[0].name, pStable->info.tableId);
...@@ -833,18 +796,6 @@ static int32_t mgmtProcessDropSuperTableMsgTag(SSuperTableObj *pStable, char *ta ...@@ -833,18 +796,6 @@ static int32_t mgmtProcessDropSuperTableMsgTag(SSuperTableObj *pStable, char *ta
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
SDbObj *pDb = mgmtGetDbByTableId(pStable->info.tableId);
if (pDb == NULL) {
mError("table: %s not belongs to any database", pStable->info.tableId);
return TSDB_CODE_APP_ERROR;
}
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("DB: %s not belongs to any account", pDb->name);
return TSDB_CODE_APP_ERROR;
}
memmove(pStable->schema + sizeof(SSchema) * col, pStable->schema + sizeof(SSchema) * (col + 1), memmove(pStable->schema + sizeof(SSchema) * col, pStable->schema + sizeof(SSchema) * (col + 1),
sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags - col - 1)); sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags - col - 1));
...@@ -921,15 +872,15 @@ static int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[] ...@@ -921,15 +872,15 @@ static int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[]
} }
} }
SDbObj *pDb = mgmtGetDbByTableId(pStable->info.tableId); pMsg->pDb = mgmtGetDbByTableId(pStable->info.tableId);
if (pDb == NULL) { if (pMsg->pDb == NULL) {
mError("meter: %s not belongs to any database", pStable->info.tableId); mError("meter: %s not belongs to any database", pStable->info.tableId);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); pMsg->pAcct = acctGetAcct(pMsg->pDb->cfg.acct);
if (pAcct == NULL) { if (pMsg->pAcct == NULL) {
mError("DB: %s not belongs to andy account", pDb->name); mError("DB: %s not belongs to andy account", pMsg->pDb->name);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
...@@ -948,7 +899,7 @@ static int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[] ...@@ -948,7 +899,7 @@ static int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[]
pStable->numOfColumns += ncols; pStable->numOfColumns += ncols;
pStable->sversion++; pStable->sversion++;
pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables); pMsg->pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables);
// sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); // sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -960,15 +911,15 @@ static int32_t mgmtProcessDropSuperTableMsgColumnByName(SSuperTableObj *pStable, ...@@ -960,15 +911,15 @@ static int32_t mgmtProcessDropSuperTableMsgColumnByName(SSuperTableObj *pStable,
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
SDbObj *pDb = mgmtGetDbByTableId(pStable->info.tableId); pMsg->pDb = mgmtGetDbByTableId(pStable->info.tableId);
if (pDb == NULL) { if (pMsg->pDb == NULL) {
mError("table: %s not belongs to any database", pStable->info.tableId); mError("meter: %s not belongs to any database", pStable->info.tableId);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); pMsg->pAcct = acctGetAcct(pMsg->pDb->cfg.acct);
if (pAcct == NULL) { if (pMsg->pAcct == NULL) {
mError("DB: %s not belongs to any account", pDb->name); mError("DB: %s not belongs to andy account", pMsg->pDb->name);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
...@@ -981,12 +932,13 @@ static int32_t mgmtProcessDropSuperTableMsgColumnByName(SSuperTableObj *pStable, ...@@ -981,12 +932,13 @@ static int32_t mgmtProcessDropSuperTableMsgColumnByName(SSuperTableObj *pStable,
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns); int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns);
pStable->schema = realloc(pStable->schema, schemaSize); pStable->schema = realloc(pStable->schema, schemaSize);
pAcct->acctInfo.numOfTimeSeries -= (pStable->numOfTables); pMsg->pAcct->acctInfo.numOfTimeSeries -= (pStable->numOfTables);
// sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); // sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// show super tables
static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
SDbObj *pDb = mgmtGetDb(pShow->db); SDbObj *pDb = mgmtGetDb(pShow->db);
if (pDb == NULL) { if (pDb == NULL) {
...@@ -1039,6 +991,7 @@ static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, ...@@ -1039,6 +991,7 @@ static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow,
return 0; return 0;
} }
// retrieve super tables
int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0; int32_t numOfRows = 0;
char * pWrite; char * pWrite;
...@@ -1058,6 +1011,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v ...@@ -1058,6 +1011,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
char stableName[TSDB_TABLE_NAME_LEN] = {0}; char stableName[TSDB_TABLE_NAME_LEN] = {0};
while (numOfRows < rows) { while (numOfRows < rows) {
mgmtDecTableRef(pTable);
pShow->pNode = sdbFetchRow(tsSuperTableSdb, pShow->pNode, (void **) &pTable); pShow->pNode = sdbFetchRow(tsSuperTableSdb, pShow->pNode, (void **) &pTable);
if (pTable == NULL) break; if (pTable == NULL) break;
if (strncmp(pTable->info.tableId, prefix, prefixLen)) { if (strncmp(pTable->info.tableId, prefix, prefixLen)) {
...@@ -1094,7 +1048,6 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v ...@@ -1094,7 +1048,6 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
cols++; cols++;
numOfRows++; numOfRows++;
mgmtDecTableRef(pTable);
} }
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
...@@ -1132,7 +1085,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) { ...@@ -1132,7 +1085,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) {
mTrace("db:%s, all super tables:%d is dropped from sdb", pDropDb->name, numOfTables); mTrace("db:%s, all super tables:%d is dropped from sdb", pDropDb->name, numOfTables);
} }
int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { static int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) {
int32_t numOfCols = pTable->numOfColumns + pTable->numOfTags; int32_t numOfCols = pTable->numOfColumns + pTable->numOfTags;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
strcpy(pSchema->name, pTable->schema[i].name); strcpy(pSchema->name, pTable->schema[i].name);
...@@ -1145,7 +1098,7 @@ int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { ...@@ -1145,7 +1098,7 @@ int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) {
return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema); return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema);
} }
void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) { static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) {
SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable;
STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS); STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS);
pMeta->uid = htobe64(pTable->uid); pMeta->uid = htobe64(pTable->uid);
...@@ -1170,26 +1123,31 @@ void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) { ...@@ -1170,26 +1123,31 @@ void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) {
static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
SCMSTableVgroupMsg *pInfo = pMsg->pCont; SCMSTableVgroupMsg *pInfo = pMsg->pCont;
STableInfo *pTable = mgmtGetSuperTable(pInfo->tableId); pMsg->pTable = mgmtGetSuperTable(pInfo->tableId);
if (pTable == NULL) { if (pMsg->pTable == NULL) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
return; return;
} }
SCMSTableVgroupRspMsg *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable); SCMSTableVgroupRspMsg *pRsp = rpcMallocCont(sizeof(SCMSTableVgroupRspMsg) + sizeof(uint32_t) * mgmtGetDnodesNum());
if (pRsp != NULL) { if (pRsp == NULL) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
return;
}
pRsp->numOfDnodes = htonl(1);
pRsp->dnodeIps[0] = htonl(inet_addr(tsPrivateIp));
int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t); int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t);
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
rpcRsp.handle = pMsg->thandle; rpcRsp.handle = pMsg->thandle;
rpcRsp.pCont = pRsp; rpcRsp.pCont = pRsp;
rpcRsp.contLen = msgLen; rpcRsp.contLen = msgLen;
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} else {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
}
} }
void mgmtAlterSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable) { static void mgmtAlterSuperTable(SQueuedMsg *pMsg) {
SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable;
int32_t code = TSDB_CODE_OPS_NOT_SUPPORT; int32_t code = TSDB_CODE_OPS_NOT_SUPPORT;
SCMAlterTableMsg *pAlter = pMsg->pCont; SCMAlterTableMsg *pAlter = pMsg->pCont;
...@@ -1208,7 +1166,7 @@ void mgmtAlterSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable) { ...@@ -1208,7 +1166,7 @@ void mgmtAlterSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable) {
mgmtSendSimpleResp(pMsg->thandle, code); mgmtSendSimpleResp(pMsg->thandle, code);
} }
static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg) { static void mgmtProcessDropSuperTableRsp(SRpcMsg *rpcMsg) {
mTrace("drop stable rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); mTrace("drop stable rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
} }
...@@ -1280,7 +1238,7 @@ static void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableOb ...@@ -1280,7 +1238,7 @@ static void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableOb
static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t tid) { static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t tid) {
SChildTableObj *pTable = (SChildTableObj *) calloc(1, sizeof(SChildTableObj)); SChildTableObj *pTable = (SChildTableObj *) calloc(1, sizeof(SChildTableObj));
if (pTable == NULL) { if (pTable == NULL) {
mError("ctable:%s, failed to alloc memory", pCreate->tableId); mError("table:%s, failed to alloc memory", pCreate->tableId);
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL; return NULL;
} }
...@@ -1300,7 +1258,7 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj ...@@ -1300,7 +1258,7 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj
char *pTagData = (char *) pCreate->schema; // it is a tag key char *pTagData = (char *) pCreate->schema; // it is a tag key
SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData); SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData);
if (pSuperTable == NULL) { if (pSuperTable == NULL) {
mError("ctable:%s, corresponding super table does not exist", pCreate->tableId); mError("table:%s, corresponding super table does not exist", pCreate->tableId);
free(pTable); free(pTable);
terrno = TSDB_CODE_INVALID_TABLE; terrno = TSDB_CODE_INVALID_TABLE;
return NULL; return NULL;
...@@ -1355,12 +1313,12 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj ...@@ -1355,12 +1313,12 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj
if (sdbInsertRow(&desc) != TSDB_CODE_SUCCESS) { if (sdbInsertRow(&desc) != TSDB_CODE_SUCCESS) {
free(pTable); free(pTable);
mError("ctable:%s, update sdb error", pCreate->tableId); mError("table:%s, update sdb error", pCreate->tableId);
terrno = TSDB_CODE_SDB_ERROR; terrno = TSDB_CODE_SDB_ERROR;
return NULL; return NULL;
} }
mTrace("ctable:%s, create ctable in vgroup, uid:%" PRIu64 , pTable->info.tableId, pTable->uid); mTrace("table:%s, create ctable in vgroup, uid:%" PRIu64 , pTable->info.tableId, pTable->uid);
return pTable; return pTable;
} }
...@@ -1414,7 +1372,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { ...@@ -1414,7 +1372,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
mgmtSendMsgToDnode(&ipSet, &rpcMsg); mgmtSendMsgToDnode(&ipSet, &rpcMsg);
} }
void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) { static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
SVgObj *pVgroup = pMsg->pVgroup = mgmtGetVgroup(pTable->vgId); SVgObj *pVgroup = pMsg->pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
...@@ -1501,7 +1459,6 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName ...@@ -1501,7 +1459,6 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName
return 0; return 0;
} }
static int32_t mgmtFindNormalTableColumnIndex(SChildTableObj *pTable, char *colName) { static int32_t mgmtFindNormalTableColumnIndex(SChildTableObj *pTable, char *colName) {
SSchema *schema = (SSchema *) pTable->schema; SSchema *schema = (SSchema *) pTable->schema;
for (int32_t i = 0; i < pTable->numOfColumns; i++) { for (int32_t i = 0; i < pTable->numOfColumns; i++) {
...@@ -1744,7 +1701,7 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) { ...@@ -1744,7 +1701,7 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) {
mTrace("db:%s, all child tables:%d is dropped from sdb", pDropDb->name, numOfTables); mTrace("db:%s, all child tables:%d is dropped from sdb", pDropDb->name, numOfTables);
} }
void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) { static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) {
void *pNode = NULL; void *pNode = NULL;
void *pLastNode = NULL; void *pLastNode = NULL;
int32_t numOfTables = 0; int32_t numOfTables = 0;
...@@ -1824,7 +1781,8 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) { ...@@ -1824,7 +1781,8 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) {
mgmtDecTableRef(pTable); mgmtDecTableRef(pTable);
} }
static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { // handle drop child response
static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->handle == NULL) return; if (rpcMsg->handle == NULL) return;
SQueuedMsg *queueMsg = rpcMsg->handle; SQueuedMsg *queueMsg = rpcMsg->handle;
...@@ -1870,6 +1828,8 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { ...@@ -1870,6 +1828,8 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) {
mgmtFreeQueuedMsg(queueMsg); mgmtFreeQueuedMsg(queueMsg);
} }
// handle create table response from dnode
// if failed, drop the table cached
static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->handle == NULL) return; if (rpcMsg->handle == NULL) return;
...@@ -1903,6 +1863,7 @@ static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { ...@@ -1903,6 +1863,7 @@ static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
mgmtFreeQueuedMsg(queueMsg); mgmtFreeQueuedMsg(queueMsg);
} }
// not implemented yet
static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg) { static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg) {
mTrace("alter table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); mTrace("alter table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
} }
...@@ -1971,11 +1932,10 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) { ...@@ -1971,11 +1932,10 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) {
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
// show tables
static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
SDbObj *pDb = mgmtGetDb(pShow->db); SDbObj *pDb = mgmtGetDb(pShow->db);
if (pDb == NULL) { if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
return TSDB_CODE_DB_NOT_SELECTED;
}
int32_t cols = 0; int32_t cols = 0;
SSchema *pSchema = pMeta->schema; SSchema *pSchema = pMeta->schema;
...@@ -2098,7 +2058,8 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -2098,7 +2058,8 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows,
return numOfRows; return numOfRows;
} }
void mgmtAlterChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable) { void mgmtAlterChildTable(SQueuedMsg *pMsg) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
int32_t code = TSDB_CODE_OPS_NOT_SUPPORT; int32_t code = TSDB_CODE_OPS_NOT_SUPPORT;
SCMAlterTableMsg *pAlter = pMsg->pCont;; SCMAlterTableMsg *pAlter = pMsg->pCont;;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册