diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 597342d3cbe030a86eb41b33292514ae99de1c09..2fbceb06dfccf5ebac4d7c461698248bbfa043ef 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -250,6 +250,8 @@ typedef struct { void *ahandle; void *thandle; void *pCont; + SAcctObj *pAcct; + SDnodeObj*pDnode; SUserObj *pUser; SDbObj *pDb; SVgObj *pVgroup; diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index ae921a0505f4f28e2545ee2b39ffb65b1db829ee..4a8b4b1ea19be0fa1a384caee214b9e440eb6974 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -791,6 +791,8 @@ void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) { if (pMsg->pDb) mgmtDecDbRef(pMsg->pDb); if (pMsg->pVgroup) mgmtDecVgroupRef(pMsg->pVgroup); if (pMsg->pTable) mgmtDecTableRef(pMsg->pTable); + // if (pMsg->pAcct) acctDecRef(pMsg->pAcct); + // if (pMsg->pDnode) mgmtDecTableRef(pMsg->pDnode); free(pMsg); } } diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 08eba1434d5a370790f70ffe7b403ac6ec5d1ca4..a4e4e0bcae76d3a8448484ce53d5ceedd9f3fa9f 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -49,14 +49,13 @@ static void * tsChildTableSdb; static void * tsSuperTableSdb; static int32_t tsChildTableUpdateSize; static int32_t tsSuperTableUpdateSize; - static void * mgmtGetChildTable(char *tableId); static void * mgmtGetSuperTable(char *tableId); - -void mgmtGetChildTableMeta(SQueuedMsg *pMsg); -void mgmtAlterChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable); -void mgmtDropAllChildTables(SDbObj *pDropDb); -void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable); +static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable); +static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static void mgmtProcessCreateTableMsg(SQueuedMsg *queueMsg); static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg); @@ -65,28 +64,22 @@ static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg); static void mgmtProcessDropTableMsg(SQueuedMsg *queueMsg); static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg); -static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg); +static void mgmtProcessDropSuperTableRsp(SRpcMsg *rpcMsg); static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg); -static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg); +static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg); -void mgmtGetSuperTableMeta(SQueuedMsg *pMsg); -void mgmtAlterSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable); -void mgmtDropAllSuperTables(SDbObj *pDropDb); -int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable); - -static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *queueMsg); -static int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); - -static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg); -static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg); -static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg); -static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); -static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *queueMsg); +static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg); +static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg); +static void mgmtProcessTableMetaMsg(SQueuedMsg *queueMsg); +static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg); +static void mgmtGetChildTableMeta(SQueuedMsg *pMsg); static void mgmtProcessAlterTableMsg(SQueuedMsg *queueMsg); -static void mgmtProcessTableMetaMsg(SQueuedMsg *queueMsg); +static void mgmtAlterChildTable(SQueuedMsg *pMsg); +static void mgmtAlterSuperTable(SQueuedMsg *pMsg); +static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg); static void mgmtDestroyChildTable(SChildTableObj *pTable) { tfree(pTable->schema); @@ -465,31 +458,32 @@ int32_t mgmtInitTables() { } mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_TABLES_META, mgmtProcessMultiTableMetaMsg); - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_TABLE, mgmtProcessCreateTableMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_TABLE, mgmtProcessDropTableMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_TABLE, mgmtProcessAlterTableMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_TABLE_META, mgmtProcessTableMetaMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_STABLE_VGROUP, mgmtProcessSuperTableVgroupMsg); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateChildTableRsp); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropTableRsp); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropChildTableRsp); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropSuperTableRsp); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, mgmtProcessAlterTableRsp); + mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_TABLE, mgmtProcessTableCfgMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_STABLE_VGROUP, mgmtProcessSuperTableVgroupMsg); + mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta); + mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mgmtGetShowSuperTableMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mgmtRetrieveShowSuperTables); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropStableRsp); - - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_TABLE, mgmtProcessCreateTableMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_TABLE, mgmtProcessDropTableMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_TABLE, mgmtProcessAlterTableMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_TABLE_META, mgmtProcessTableMetaMsg); - + return TSDB_CODE_SUCCESS; } -static void * mgmtGetChildTable(char *tableId) { +static void *mgmtGetChildTable(char *tableId) { return sdbGetRow(tsChildTableSdb, tableId); } -static void * mgmtGetSuperTable(char *tableId) { +static void *mgmtGetSuperTable(char *tableId) { return sdbGetRow(tsSuperTableSdb, tableId); } @@ -532,7 +526,7 @@ void mgmtCleanUpTables() { mgmtCleanUpSuperTables(); } -void mgmtExtractTableName(char* tableId, char* name) { +static void mgmtExtractTableName(char* tableId, char* name) { int pos = -1; int num = 0; for (pos = 0; tableId[pos] != 0; ++pos) { @@ -578,8 +572,6 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { SCMDropTableMsg *pDrop = pMsg->pCont; - if (mgmtCheckRedirect(pMsg->thandle)) return; - pMsg->pDb = mgmtGetDbByTableId(pDrop->tableId); if (pMsg->pDb == NULL || pMsg->pDb->dirty) { mError("table:%s, failed to drop table, db not selected", pDrop->tableId); @@ -619,30 +611,22 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { SCMAlterTableMsg *pAlter = pMsg->pCont; mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->thandle); - if (mgmtCheckRedirect(pMsg->thandle)) return; - - if (!pMsg->pUser->writeAuth) { - mTrace("table:%s, failed to alter table, no rights", pAlter->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); - return; - } - - SDbObj *pDb = mgmtGetDbByTableId(pAlter->tableId); - if (pDb == NULL || pDb->dirty) { + pMsg->pDb = mgmtGetDbByTableId(pAlter->tableId); + if (pMsg->pDb == NULL || pMsg->pDb->dirty) { mError("table:%s, failed to alter table, db not selected", pAlter->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); return; } - if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { + if (mgmtCheckIsMonitorDB(pMsg->pDb->name, tsMonitorDbName)) { mError("table:%s, failed to alter table, its log db", pAlter->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); return; } - STableInfo *pTable = mgmtGetTable(pAlter->tableId); - if (pTable == NULL) { - mError("table:%s, failed to alter table, table not exist", pTable->tableId); + pMsg->pTable = mgmtGetTable(pAlter->tableId); + if (pMsg->pTable == NULL) { + mError("table:%s, failed to alter table, table not exist", pMsg->pTable->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); return; } @@ -651,7 +635,6 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { if (pAlter->numOfCols > 2) { mError("table:%s, error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_APP_ERROR); - mgmtDecTableRef(pTable); return; } @@ -659,12 +642,12 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { pAlter->schema[i].bytes = htons(pAlter->schema[i].bytes); } - if (pTable->type == TSDB_SUPER_TABLE) { + if (pMsg->pTable->type == TSDB_SUPER_TABLE) { mTrace("table:%s, start to alter stable", pAlter->tableId); - mgmtAlterSuperTable(pMsg, (SSuperTableObj *)pTable); + mgmtAlterSuperTable(pMsg); } else { mTrace("table:%s, start to alter ctable", pAlter->tableId); - mgmtAlterChildTable(pMsg, (SChildTableObj *)pTable); + mgmtAlterChildTable(pMsg); } } @@ -744,7 +727,7 @@ static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) { } } -void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) { +static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) { SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; if (pStable->numOfTables != 0) { mError("stable:%s, numOfTables:%d not 0", pStable->info.tableId, pStable->numOfTables); @@ -761,13 +744,6 @@ void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) { } } -static void *mgmtGetSuperTableVgroup(SSuperTableObj *pStable) { - SCMSTableVgroupRspMsg *rsp = rpcMallocCont(sizeof(SCMSTableVgroupRspMsg) + sizeof(uint32_t) * mgmtGetDnodesNum()); - rsp->numOfDnodes = htonl(1); - rsp->dnodeIps[0] = htonl(inet_addr(tsPrivateIp)); - return rsp; -} - static int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pStable, const char *tagName) { for (int32_t i = 0; i < pStable->numOfTags; i++) { SSchema *schema = (SSchema *)(pStable->schema + (pStable->numOfColumns + i) * sizeof(SSchema)); @@ -793,18 +769,6 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i } } - SDbObj *pDb = mgmtGetDbByTableId(pStable->info.tableId); - if (pDb == NULL) { - mError("meter: %s not belongs to any database", pStable->info.tableId); - return TSDB_CODE_APP_ERROR; - } - - SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); - if (pAcct == NULL) { - mError("DB: %s not belongs to andy account", pDb->name); - return TSDB_CODE_APP_ERROR; - } - int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns); pStable->schema = realloc(pStable->schema, schemaSize + sizeof(SSchema) * ntags); @@ -820,7 +784,6 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i pStable->numOfColumns += ntags; pStable->sversion++; - pAcct->acctInfo.numOfTimeSeries += (ntags * pStable->numOfTables); // sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); mTrace("Succeed to add tag column %s to table %s", schema[0].name, pStable->info.tableId); @@ -833,18 +796,6 @@ static int32_t mgmtProcessDropSuperTableMsgTag(SSuperTableObj *pStable, char *ta return TSDB_CODE_APP_ERROR; } - SDbObj *pDb = mgmtGetDbByTableId(pStable->info.tableId); - if (pDb == NULL) { - mError("table: %s not belongs to any database", pStable->info.tableId); - return TSDB_CODE_APP_ERROR; - } - - SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); - if (pAcct == NULL) { - mError("DB: %s not belongs to any account", pDb->name); - return TSDB_CODE_APP_ERROR; - } - memmove(pStable->schema + sizeof(SSchema) * col, pStable->schema + sizeof(SSchema) * (col + 1), sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags - col - 1)); @@ -921,15 +872,15 @@ static int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[] } } - SDbObj *pDb = mgmtGetDbByTableId(pStable->info.tableId); - if (pDb == NULL) { + pMsg->pDb = mgmtGetDbByTableId(pStable->info.tableId); + if (pMsg->pDb == NULL) { mError("meter: %s not belongs to any database", pStable->info.tableId); return TSDB_CODE_APP_ERROR; } - SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); - if (pAcct == NULL) { - mError("DB: %s not belongs to andy account", pDb->name); + pMsg->pAcct = acctGetAcct(pMsg->pDb->cfg.acct); + if (pMsg->pAcct == NULL) { + mError("DB: %s not belongs to andy account", pMsg->pDb->name); return TSDB_CODE_APP_ERROR; } @@ -948,7 +899,7 @@ static int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[] pStable->numOfColumns += ncols; pStable->sversion++; - pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables); + pMsg->pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables); // sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); return TSDB_CODE_SUCCESS; @@ -960,15 +911,15 @@ static int32_t mgmtProcessDropSuperTableMsgColumnByName(SSuperTableObj *pStable, return TSDB_CODE_APP_ERROR; } - SDbObj *pDb = mgmtGetDbByTableId(pStable->info.tableId); - if (pDb == NULL) { - mError("table: %s not belongs to any database", pStable->info.tableId); + pMsg->pDb = mgmtGetDbByTableId(pStable->info.tableId); + if (pMsg->pDb == NULL) { + mError("meter: %s not belongs to any database", pStable->info.tableId); return TSDB_CODE_APP_ERROR; } - SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); - if (pAcct == NULL) { - mError("DB: %s not belongs to any account", pDb->name); + pMsg->pAcct = acctGetAcct(pMsg->pDb->cfg.acct); + if (pMsg->pAcct == NULL) { + mError("DB: %s not belongs to andy account", pMsg->pDb->name); return TSDB_CODE_APP_ERROR; } @@ -981,12 +932,13 @@ static int32_t mgmtProcessDropSuperTableMsgColumnByName(SSuperTableObj *pStable, int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns); pStable->schema = realloc(pStable->schema, schemaSize); - pAcct->acctInfo.numOfTimeSeries -= (pStable->numOfTables); + pMsg->pAcct->acctInfo.numOfTimeSeries -= (pStable->numOfTables); // sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); return TSDB_CODE_SUCCESS; } +// show super tables static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { SDbObj *pDb = mgmtGetDb(pShow->db); if (pDb == NULL) { @@ -1039,6 +991,7 @@ static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, return 0; } +// retrieve super tables int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; char * pWrite; @@ -1057,7 +1010,8 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; char stableName[TSDB_TABLE_NAME_LEN] = {0}; - while (numOfRows < rows) { + while (numOfRows < rows) { + mgmtDecTableRef(pTable); pShow->pNode = sdbFetchRow(tsSuperTableSdb, pShow->pNode, (void **) &pTable); if (pTable == NULL) break; if (strncmp(pTable->info.tableId, prefix, prefixLen)) { @@ -1094,7 +1048,6 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v cols++; numOfRows++; - mgmtDecTableRef(pTable); } pShow->numOfReads += numOfRows; @@ -1132,7 +1085,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) { mTrace("db:%s, all super tables:%d is dropped from sdb", pDropDb->name, numOfTables); } -int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { +static int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { int32_t numOfCols = pTable->numOfColumns + pTable->numOfTags; for (int32_t i = 0; i < numOfCols; ++i) { strcpy(pSchema->name, pTable->schema[i].name); @@ -1145,7 +1098,7 @@ int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema); } -void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) { +static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) { SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS); pMeta->uid = htobe64(pTable->uid); @@ -1170,26 +1123,31 @@ void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) { static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { SCMSTableVgroupMsg *pInfo = pMsg->pCont; - STableInfo *pTable = mgmtGetSuperTable(pInfo->tableId); - if (pTable == NULL) { + pMsg->pTable = mgmtGetSuperTable(pInfo->tableId); + if (pMsg->pTable == NULL) { mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); return; } - SCMSTableVgroupRspMsg *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable); - if (pRsp != NULL) { - int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t); - SRpcMsg rpcRsp = {0}; - rpcRsp.handle = pMsg->thandle; - rpcRsp.pCont = pRsp; - rpcRsp.contLen = msgLen; - rpcSendResponse(&rpcRsp); - } else { + SCMSTableVgroupRspMsg *pRsp = rpcMallocCont(sizeof(SCMSTableVgroupRspMsg) + sizeof(uint32_t) * mgmtGetDnodesNum()); + if (pRsp == NULL) { mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); - } + return; + } + + pRsp->numOfDnodes = htonl(1); + pRsp->dnodeIps[0] = htonl(inet_addr(tsPrivateIp)); + + int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t); + SRpcMsg rpcRsp = {0}; + rpcRsp.handle = pMsg->thandle; + rpcRsp.pCont = pRsp; + rpcRsp.contLen = msgLen; + rpcSendResponse(&rpcRsp); } -void mgmtAlterSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable) { +static void mgmtAlterSuperTable(SQueuedMsg *pMsg) { + SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; int32_t code = TSDB_CODE_OPS_NOT_SUPPORT; SCMAlterTableMsg *pAlter = pMsg->pCont; @@ -1208,7 +1166,7 @@ void mgmtAlterSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable) { mgmtSendSimpleResp(pMsg->thandle, code); } -static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg) { +static void mgmtProcessDropSuperTableRsp(SRpcMsg *rpcMsg) { mTrace("drop stable rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); } @@ -1280,7 +1238,7 @@ static void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableOb static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t tid) { SChildTableObj *pTable = (SChildTableObj *) calloc(1, sizeof(SChildTableObj)); if (pTable == NULL) { - mError("ctable:%s, failed to alloc memory", pCreate->tableId); + mError("table:%s, failed to alloc memory", pCreate->tableId); terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; return NULL; } @@ -1300,7 +1258,7 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj char *pTagData = (char *) pCreate->schema; // it is a tag key SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData); if (pSuperTable == NULL) { - mError("ctable:%s, corresponding super table does not exist", pCreate->tableId); + mError("table:%s, corresponding super table does not exist", pCreate->tableId); free(pTable); terrno = TSDB_CODE_INVALID_TABLE; return NULL; @@ -1355,12 +1313,12 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj if (sdbInsertRow(&desc) != TSDB_CODE_SUCCESS) { free(pTable); - mError("ctable:%s, update sdb error", pCreate->tableId); + mError("table:%s, update sdb error", pCreate->tableId); terrno = TSDB_CODE_SDB_ERROR; return NULL; } - mTrace("ctable:%s, create ctable in vgroup, uid:%" PRIu64 , pTable->info.tableId, pTable->uid); + mTrace("table:%s, create ctable in vgroup, uid:%" PRIu64 , pTable->info.tableId, pTable->uid); return pTable; } @@ -1414,7 +1372,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { mgmtSendMsgToDnode(&ipSet, &rpcMsg); } -void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) { +static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) { SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; SVgObj *pVgroup = pMsg->pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { @@ -1501,7 +1459,6 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName return 0; } - static int32_t mgmtFindNormalTableColumnIndex(SChildTableObj *pTable, char *colName) { SSchema *schema = (SSchema *) pTable->schema; for (int32_t i = 0; i < pTable->numOfColumns; i++) { @@ -1744,7 +1701,7 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) { mTrace("db:%s, all child tables:%d is dropped from sdb", pDropDb->name, numOfTables); } -void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) { +static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) { void *pNode = NULL; void *pLastNode = NULL; int32_t numOfTables = 0; @@ -1824,7 +1781,8 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) { mgmtDecTableRef(pTable); } -static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { +// handle drop child response +static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg) { if (rpcMsg->handle == NULL) return; SQueuedMsg *queueMsg = rpcMsg->handle; @@ -1870,6 +1828,8 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { mgmtFreeQueuedMsg(queueMsg); } +// handle create table response from dnode +// if failed, drop the table cached static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { if (rpcMsg->handle == NULL) return; @@ -1903,6 +1863,7 @@ static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { mgmtFreeQueuedMsg(queueMsg); } +// not implemented yet static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg) { mTrace("alter table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); } @@ -1971,11 +1932,10 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) { rpcSendResponse(&rpcRsp); } +// show tables static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { SDbObj *pDb = mgmtGetDb(pShow->db); - if (pDb == NULL) { - return TSDB_CODE_DB_NOT_SELECTED; - } + if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED; int32_t cols = 0; SSchema *pSchema = pMeta->schema; @@ -2098,7 +2058,8 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, return numOfRows; } -void mgmtAlterChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable) { +void mgmtAlterChildTable(SQueuedMsg *pMsg) { + SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; int32_t code = TSDB_CODE_OPS_NOT_SUPPORT; SCMAlterTableMsg *pAlter = pMsg->pCont;;