diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index e9277651c7267fcb03c5535f3381251e7d709df8..448b36b22434a53ef053d5fe94728827df49fdd6 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -48,7 +48,7 @@ int32_t mgmtInitMnodes() { void mgmtCleanupMnodes() {} bool mgmtInServerStatus() { return tsMnodeObj.status == TSDB_MN_STATUS_SERVING; } bool mgmtIsMaster() { return tsMnodeObj.role == TSDB_MN_ROLE_MASTER; } -bool mgmtCheckRedirect(void *handle) { return false; } +bool mgmtCheckRedirect(void *thandle) { return false; } static int32_t mgmtGetMnodesNum() { return 1; @@ -117,7 +117,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; } - pShow->numOfRows = mgmtGetDnodesNum(); + pShow->numOfRows = mgmtGetMnodesNum(); pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->pNode = NULL; diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index b84fe08546ca1be4c6c14acda0222212db3091e1..66cecbdc3f7b6202f062669436d56e936a4c21d8 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -40,7 +40,7 @@ typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); -static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont); +static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg); static void mgmtProcessMsgFromShell(SRpcMsg *pMsg); static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg); static void mgmtProcessMsgWhileNotReady(SRpcMsg *rpcMsg); @@ -135,7 +135,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { } if (mgmtCheckRedirect(rpcMsg->handle)) { - // send resp in redirect func + // rpcSendRedirectRsp(rpcMsg->handle, mgmtGetMnodeIpListForRedirect()); rpcFreeCont(rpcMsg->pCont); return; } @@ -165,7 +165,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { return; } - if (mgmtCheckMsgReadOnly(rpcMsg->msgType, rpcMsg->pCont)) { + if (mgmtCheckMsgReadOnly(pMsg)) { (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(pMsg); mgmtFreeQueuedMsg(pMsg); } else { @@ -185,7 +185,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { return; } - if (!tsMgmtShowMetaFp[pShowMsg->type]) { + if (!tsMgmtShowMetaFp[pShowMsg->type] || !tsMgmtShowRetrieveFp[pShowMsg->type]) { mError("show type:%s is not support", taosGetShowTypeStr(pShowMsg->type)); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OPS_NOT_SUPPORT); return; @@ -299,22 +299,13 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { } static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) { - //SCMHeartBeatMsg *pHBMsg = (SCMHeartBeatMsg *) rpcMsg->pCont; - //mgmtSaveQueryStreamList(pHBMsg); - SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp)); if (pHBRsp == NULL) { mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); return; } - SRpcConnInfo connInfo; - if (rpcGetConnInfo(pMsg->thandle, &connInfo) != 0) { - mError("conn:%p is already released while process heart beat msg", pMsg->thandle); - return; - } - - if (connInfo.serverIp == tsPublicIpInt) { + if (pMsg->usePublicIp) { mgmtGetMnodePublicIpList(&pHBRsp->ipList); } else { mgmtGetMnodePrivateIpList(&pHBRsp->ipList); @@ -424,10 +415,10 @@ static void mgmtProcessUseMsg(SQueuedMsg *pMsg) { SCMUseDbMsg *pUseDbMsg = pMsg->pCont; // todo check for priority of current user - SDbObj* pDbObj = mgmtGetDb(pUseDbMsg->db); + pMsg->pDb = mgmtGetDb(pUseDbMsg->db); int32_t code = TSDB_CODE_SUCCESS; - if (pDbObj == NULL) { + if (pMsg->pDb == NULL) { code = TSDB_CODE_INVALID_DB; } @@ -438,26 +429,29 @@ static void mgmtProcessUseMsg(SQueuedMsg *pMsg) { /** * check if we need to add mgmtProcessTableMetaMsg into tranQueue, which will be executed one-by-one. */ -static bool mgmtCheckMeterMetaMsgType(void *pMsg) { +static bool mgmtCheckTableMetaMsgReadOnly(SQueuedMsg *pMsg) { SCMTableInfoMsg *pInfo = (SCMTableInfoMsg *) pMsg; - int16_t autoCreate = htons(pInfo->createFlag); - STableInfo *pTable = mgmtGetTable(pInfo->tableId); + pMsg->pTable = mgmtGetTable(pInfo->tableId); + if (pMsg->pTable != NULL) return true; // If table does not exists and autoCreate flag is set, we add the handler into task queue - bool addIntoTranQueue = (pTable == NULL && autoCreate == 1); - if (addIntoTranQueue) { + int16_t autoCreate = htons(pInfo->createFlag); + if (autoCreate == 1) { mTrace("table:%s auto created task added", pInfo->tableId); + return false; } - - mgmtDecTableRef(pTable); - return addIntoTranQueue; + + return true; } -static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) { - if ((type == TSDB_MSG_TYPE_CM_TABLE_META && (!mgmtCheckMeterMetaMsgType(pCont))) || - type == TSDB_MSG_TYPE_CM_STABLE_VGROUP || type == TSDB_MSG_TYPE_RETRIEVE || - type == TSDB_MSG_TYPE_CM_SHOW || type == TSDB_MSG_TYPE_CM_TABLES_META || - type == TSDB_MSG_TYPE_CM_CONNECT) { +static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg) { + if (pMsg->msgType == TSDB_MSG_TYPE_CM_TABLE_META) { + return mgmtCheckTableMetaMsgReadOnly(pMsg); + } + + if (pMsg->msgType == TSDB_MSG_TYPE_CM_STABLE_VGROUP || pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE || + pMsg->msgType == TSDB_MSG_TYPE_CM_SHOW || pMsg->msgType == TSDB_MSG_TYPE_CM_TABLES_META || + pMsg->msgType == TSDB_MSG_TYPE_CM_CONNECT) { return true; } diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 16c801416b8708521ae58a23b8099b1243d1dfe3..492ceeff05edbe78db071110ff45ceea29f01aca 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -77,8 +77,6 @@ static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg); static void mgmtGetChildTableMeta(SQueuedMsg *pMsg); static void mgmtProcessAlterTableMsg(SQueuedMsg *queueMsg); -static void mgmtAlterChildTable(SQueuedMsg *pMsg); -static void mgmtAlterSuperTable(SQueuedMsg *pMsg); static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg); static void mgmtDestroyChildTable(SChildTableObj *pTable) { @@ -328,6 +326,7 @@ static int32_t mgmtInitChildTables() { pNode = pLastNode; continue; } + mgmtDecTableRef(pSuperTable); } } @@ -607,50 +606,6 @@ static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { } } -static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { - SCMAlterTableMsg *pAlter = pMsg->pCont; - mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->thandle); - - pMsg->pDb = mgmtGetDbByTableId(pAlter->tableId); - if (pMsg->pDb == NULL || pMsg->pDb->dirty) { - mError("table:%s, failed to alter table, db not selected", pAlter->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); - return; - } - - if (mgmtCheckIsMonitorDB(pMsg->pDb->name, tsMonitorDbName)) { - mError("table:%s, failed to alter table, its log db", pAlter->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); - return; - } - - pMsg->pTable = mgmtGetTable(pAlter->tableId); - if (pMsg->pTable == NULL) { - mError("table:%s, failed to alter table, table not exist", pMsg->pTable->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); - return; - } - - pAlter->numOfCols = htons(pAlter->numOfCols); - if (pAlter->numOfCols > 2) { - mError("table:%s, error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_APP_ERROR); - return; - } - - for (int32_t i = 0; i < pAlter->numOfCols; ++i) { - pAlter->schema[i].bytes = htons(pAlter->schema[i].bytes); - } - - if (pMsg->pTable->type == TSDB_SUPER_TABLE) { - mTrace("table:%s, start to alter stable", pAlter->tableId); - mgmtAlterSuperTable(pMsg); - } else { - mTrace("table:%s, start to alter ctable", pAlter->tableId); - mgmtAlterChildTable(pMsg); - } -} - static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) { SCMTableInfoMsg *pInfo = pMsg->pCont; mTrace("table:%s, table meta msg is received from thandle:%p", pInfo->tableId, pMsg->thandle); @@ -662,7 +617,6 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) { return; } - pMsg->pTable = mgmtGetTable(pInfo->tableId); if (pMsg->pTable == NULL) { mgmtGetChildTableMeta(pMsg); } else { @@ -784,13 +738,23 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i pStable->numOfColumns += ntags; pStable->sversion++; - // sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsSuperTableSdb, + .pObj = pStable, + .rowSize = tsSuperTableUpdateSize + }; - mTrace("Succeed to add tag column %s to table %s", schema[0].name, pStable->info.tableId); + int32_t code = sdbUpdateRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_SDB_ERROR; + } + + mPrint("table %s, succeed to add tag %s", pStable->info.tableId, schema[0].name); return TSDB_CODE_SUCCESS; } -static int32_t mgmtProcessDropSuperTableMsgTag(SSuperTableObj *pStable, char *tagName) { +static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { int32_t col = mgmtFindSuperTableTagIndex(pStable, tagName); if (col <= 0 || col >= pStable->numOfTags) { return TSDB_CODE_APP_ERROR; @@ -805,16 +769,27 @@ static int32_t mgmtProcessDropSuperTableMsgTag(SSuperTableObj *pStable, char *ta int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns); pStable->schema = realloc(pStable->schema, schemaSize); - // sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsSuperTableSdb, + .pObj = pStable, + .rowSize = tsSuperTableUpdateSize + }; + int32_t code = sdbUpdateRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_SDB_ERROR; + } + + mPrint("table %s, succeed to drop tag %s", pStable->info.tableId, tagName); return TSDB_CODE_SUCCESS; } -static int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *oldTagName, char *newTagName) { +static int32_t mgmtModifySuperTableTagName(SSuperTableObj *pStable, char *oldTagName, char *newTagName) { int32_t col = mgmtFindSuperTableTagIndex(pStable, oldTagName); if (col < 0) { // Tag name does not exist - mError("Failed to modify table %s tag column, oname: %s, nname: %s", pStable->info.tableId, oldTagName, newTagName); + mError("table:%s, failed to modify table tag, oldName: %s, newName: %s", pStable->info.tableId, oldTagName, newTagName); return TSDB_CODE_INVALID_MSG_TYPE; } @@ -829,24 +804,19 @@ static int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char * SSchema *schema = (SSchema *) (pStable->schema + (pStable->numOfColumns + col) * sizeof(SSchema)); strncpy(schema->name, newTagName, TSDB_COL_NAME_LEN); - // Encode string - int32_t size = 1 + sizeof(SSuperTableObj) + TSDB_MAX_BYTES_PER_ROW; - char *msg = (char *) malloc(size); - if (msg == NULL) return TSDB_CODE_APP_ERROR; - memset(msg, 0, size); - - // mgmtSuperTableActionEncode(pStable, msg, size, &rowSize); - - int32_t ret = 0; - // int32_t ret = sdbUpdateRow(tsSuperTableSdb, msg, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); - tfree(msg); + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsSuperTableSdb, + .pObj = pStable, + .rowSize = tsSuperTableUpdateSize + }; - if (ret < 0) { - mError("Failed to modify table %s tag column", pStable->info.tableId); - return TSDB_CODE_APP_ERROR; + int32_t code = sdbUpdateRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_SDB_ERROR; } - - mTrace("Succeed to modify table %s tag column", pStable->info.tableId); + + mPrint("table %s, succeed to modify tag %s to %s", pStable->info.tableId, oldTagName, newTagName); return TSDB_CODE_SUCCESS; } @@ -861,7 +831,7 @@ static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colN return -1; } -static int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[], int32_t ncols) { +static int32_t mgmtAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSchema schema[], int32_t ncols) { if (ncols <= 0) { return TSDB_CODE_APP_ERROR; } @@ -872,18 +842,6 @@ static int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[] } } - // pMsg->pDb = mgmtGetDbByTableId(pStable->info.tableId); - // if (pMsg->pDb == NULL) { - // mError("meter: %s not belongs to any database", pStable->info.tableId); - // return TSDB_CODE_APP_ERROR; - // } - - // pMsg->pAcct = acctGetAcct(pMsg->pDb->cfg.acct); - // if (pMsg->pAcct == NULL) { - // mError("DB: %s not belongs to andy account", pMsg->pDb->name); - // return TSDB_CODE_APP_ERROR; - // } - int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns); pStable->schema = realloc(pStable->schema, schemaSize + sizeof(SSchema) * ncols); @@ -899,30 +857,34 @@ static int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[] pStable->numOfColumns += ncols; pStable->sversion++; - // pMsg->pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables); - // sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); + SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); + if (pAcct != NULL) { + pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables); + acctDecRef(pAcct); + } + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsSuperTableSdb, + .pObj = pStable, + .rowSize = tsSuperTableUpdateSize + }; + + int32_t code = sdbUpdateRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_SDB_ERROR; + } + + mPrint("table %s, succeed to add column", pStable->info.tableId); return TSDB_CODE_SUCCESS; } -static int32_t mgmtProcessDropSuperTableMsgColumnByName(SSuperTableObj *pStable, char *colName) { +static int32_t mgmtDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, char *colName) { int32_t col = mgmtFindSuperTableColumnIndex(pStable, colName); if (col < 0) { return TSDB_CODE_APP_ERROR; } - // pMsg->pDb = mgmtGetDbByTableId(pStable->info.tableId); - // if (pMsg->pDb == NULL) { - // mError("meter: %s not belongs to any database", pStable->info.tableId); - // return TSDB_CODE_APP_ERROR; - // } - - // pMsg->pAcct = acctGetAcct(pMsg->pDb->cfg.acct); - // if (pMsg->pAcct == NULL) { - // mError("DB: %s not belongs to andy account", pMsg->pDb->name); - // return TSDB_CODE_APP_ERROR; - // } - memmove(pStable->schema + sizeof(SSchema) * col, pStable->schema + sizeof(SSchema) * (col + 1), sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags - col - 1)); @@ -932,18 +894,32 @@ static int32_t mgmtProcessDropSuperTableMsgColumnByName(SSuperTableObj *pStable, int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns); pStable->schema = realloc(pStable->schema, schemaSize); - // pMsg->pAcct->acctInfo.numOfTimeSeries -= (pStable->numOfTables); - // sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); + SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); + if (pAcct != NULL) { + pAcct->acctInfo.numOfTimeSeries -= pStable->numOfTables; + acctDecRef(pAcct); + } + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsSuperTableSdb, + .pObj = pStable, + .rowSize = tsSuperTableUpdateSize + }; + + int32_t code = sdbUpdateRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_SDB_ERROR; + } + + mPrint("table %s, succeed to delete column", pStable->info.tableId); return TSDB_CODE_SUCCESS; } // show super tables static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { SDbObj *pDb = mgmtGetDb(pShow->db); - if (pDb == NULL) { - return TSDB_CODE_DB_NOT_SELECTED; - } + if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED; int32_t cols = 0; SSchema *pSchema = pMeta->schema; @@ -1146,26 +1122,6 @@ static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { rpcSendResponse(&rpcRsp); } -static void mgmtAlterSuperTable(SQueuedMsg *pMsg) { - SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; - int32_t code = TSDB_CODE_OPS_NOT_SUPPORT; - SCMAlterTableMsg *pAlter = pMsg->pCont; - - if (pAlter->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) { - code = mgmtAddSuperTableTag((SSuperTableObj *) pTable, pAlter->schema, 1); - } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) { - code = mgmtProcessDropSuperTableMsgTag((SSuperTableObj *) pTable, pAlter->schema[0].name); - } else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) { - code = mgmtModifySuperTableTagNameByName((SSuperTableObj *) pTable, pAlter->schema[0].name, pAlter->schema[1].name); - } else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) { - code = mgmtAddSuperTableColumn((SSuperTableObj *) pTable, pAlter->schema, 1); - } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) { - code = mgmtProcessDropSuperTableMsgColumnByName((SSuperTableObj *) pTable, pAlter->schema[0].name); - } else {} - - mgmtSendSimpleResp(pMsg->thandle, code); -} - static void mgmtProcessDropSuperTableRsp(SRpcMsg *rpcMsg) { mTrace("drop stable rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); } @@ -1410,53 +1366,8 @@ static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) { mgmtSendMsgToDnode(&ipSet, &rpcMsg); } -int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent) { -// TODO: send message to dnode -// int32_t col = mgmtFindSuperTableTagIndex(pTable->superTable, tagName); -// if (col < 0 || col > pTable->superTable->numOfTags) { -// return TSDB_CODE_APP_ERROR; -// } -// -// //TODO send msg to dnode -// mTrace("Succeed to modify tag column %d of table %s", col, pTable->info.tableId); -// return TSDB_CODE_SUCCESS; - -// int32_t rowSize = 0; -// SSchema *schema = (SSchema *)(pSuperTable->schema + (pSuperTable->numOfColumns + col) * sizeof(SSchema)); -// -// if (col == 0) { -// pTable->isDirty = 1; -// removeMeterFromMetricIndex(pSuperTable, pTable); -// } -// memcpy(pTable->pTagData + mgmtGetTagsLength(pMetric, col) + TSDB_TABLE_ID_LEN, nContent, schema->bytes); -// if (col == 0) { -// addMeterIntoMetricIndex(pMetric, pTable); -// } -// -// // Encode the string -// int32_t size = sizeof(STabObj) + TSDB_MAX_BYTES_PER_ROW + 1; -// char *msg = (char *)malloc(size); -// if (msg == NULL) { -// mError("failed to allocate message memory while modify tag value"); -// return TSDB_CODE_APP_ERROR; -// } -// memset(msg, 0, size); -// -// mgmtMeterActionEncode(pTable, msg, size, &rowSize); -// -// int32_t ret = sdbUpdateRow(tsChildTableSdb, msg, rowSize, 1); // Need callback function -// tfree(msg); -// -// if (pTable->isDirty) pTable->isDirty = 0; -// -// if (ret < 0) { -// mError("Failed to modify tag column %d of table %s", col, pTable->info.tableId); -// return TSDB_CODE_APP_ERROR; -// } -// -// mTrace("Succeed to modify tag column %d of table %s", col, pTable->info.tableId); -// return TSDB_CODE_SUCCESS; - return 0; +static int32_t mgmtModifyChildTableTagValue(SChildTableObj *pTable, char *tagName, char *nContent) { + return TSDB_CODE_OPS_NOT_SUPPORT; } static int32_t mgmtFindNormalTableColumnIndex(SChildTableObj *pTable, char *colName) { @@ -1470,7 +1381,7 @@ static int32_t mgmtFindNormalTableColumnIndex(SChildTableObj *pTable, char *colN return -1; } -static int32_t mgmtAddNormalTableColumn(SChildTableObj *pTable, SSchema schema[], int32_t ncols) { +static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSchema schema[], int32_t ncols) { if (ncols <= 0) { return TSDB_CODE_APP_ERROR; } @@ -1481,18 +1392,6 @@ static int32_t mgmtAddNormalTableColumn(SChildTableObj *pTable, SSchema schema[] } } - SDbObj *pDb = mgmtGetDbByTableId(pTable->info.tableId); - if (pDb == NULL) { - mError("table: %s not belongs to any database", pTable->info.tableId); - return TSDB_CODE_APP_ERROR; - } - - SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); - if (pAcct == NULL) { - mError("DB: %s not belongs to andy account", pDb->name); - return TSDB_CODE_APP_ERROR; - } - int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); pTable->schema = realloc(pTable->schema, schemaSize + sizeof(SSchema) * ncols); @@ -1505,53 +1404,60 @@ static int32_t mgmtAddNormalTableColumn(SChildTableObj *pTable, SSchema schema[] pTable->numOfColumns += ncols; pTable->sversion++; - pAcct->acctInfo.numOfTimeSeries += ncols; - - SSdbOperDesc desc = {0}; - desc.type = SDB_OPER_TYPE_GLOBAL; - desc.pObj = pTable; - desc.table = tsChildTableSdb; - desc.rowData = pTable; - desc.rowSize = tsChildTableUpdateSize; - sdbUpdateRow(&desc); + + SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); + if (pAcct != NULL) { + pAcct->acctInfo.numOfTimeSeries += ncols; + acctDecRef(pAcct); + } + + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsChildTableSdb, + .pObj = pTable, + .rowSize = tsChildTableUpdateSize + }; + int32_t code = sdbUpdateRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_SDB_ERROR; + } + + mPrint("table %s, succeed to add column", pTable->info.tableId); return TSDB_CODE_SUCCESS; } -static int32_t mgmtDropNormalTableColumnByName(SChildTableObj *pTable, char *colName) { +static int32_t mgmtDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, char *colName) { int32_t col = mgmtFindNormalTableColumnIndex(pTable, colName); if (col < 0) { return TSDB_CODE_APP_ERROR; } - SDbObj *pDb = mgmtGetDbByTableId(pTable->info.tableId); - if (pDb == NULL) { - mError("table: %s not belongs to any database", pTable->info.tableId); - return TSDB_CODE_APP_ERROR; - } - - SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); - if (pAcct == NULL) { - mError("DB: %s not belongs to any account", pDb->name); - return TSDB_CODE_APP_ERROR; - } - memmove(pTable->schema + sizeof(SSchema) * col, pTable->schema + sizeof(SSchema) * (col + 1), sizeof(SSchema) * (pTable->numOfColumns - col - 1)); pTable->numOfColumns--; pTable->sversion++; - pAcct->acctInfo.numOfTimeSeries--; - - SSdbOperDesc desc = {0}; - desc.type = SDB_OPER_TYPE_GLOBAL; - desc.pObj = pTable; - desc.table = tsChildTableSdb; - desc.rowData = pTable; - desc.rowSize = tsChildTableUpdateSize; - sdbUpdateRow(&desc); + SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); + if (pAcct != NULL) { + pAcct->acctInfo.numOfTimeSeries--; + acctDecRef(pAcct); + } + + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsChildTableSdb, + .pObj = pTable, + .rowSize = tsChildTableUpdateSize + }; + int32_t code = sdbUpdateRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_SDB_ERROR; + } + + mPrint("table %s, succeed to add column %s", pTable->info.tableId, colName); return TSDB_CODE_SUCCESS; } @@ -1617,13 +1523,7 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { void mgmtGetChildTableMeta(SQueuedMsg *pMsg) { SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; SCMTableInfoMsg *pInfo = pMsg->pCont; - SDbObj *pDb = pMsg->pDb; - if (pDb == NULL || pDb->dirty) { - mError("table:%s, failed to get table meta, db not selected", pInfo->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); - return; - } - + if (pTable == NULL) { if (htons(pInfo->createFlag) != 1) { mError("table:%s, failed to get table meta, table not exist", pInfo->tableId); @@ -1869,19 +1769,6 @@ static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg) { } static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) { - SRpcConnInfo connInfo; - if (rpcGetConnInfo(pMsg->thandle, &connInfo) != 0) { - mError("conn:%p is already released while get mulit table meta", pMsg->thandle); - return; - } - - bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); - SUserObj *pUser = mgmtGetUser(connInfo.user); - if (pUser == NULL) { - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); - return; - } - SCMMultiTableInfoMsg *pInfo = pMsg->pCont; pInfo->numOfTables = htonl(pInfo->numOfTables); @@ -1932,7 +1819,6 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) { rpcSendResponse(&rpcRsp); } -// show tables static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { SDbObj *pDb = mgmtGetDb(pShow->db); if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED; @@ -2058,19 +1944,69 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, return numOfRows; } -void mgmtAlterChildTable(SQueuedMsg *pMsg) { - SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; +static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { + SCMAlterTableMsg *pAlter = pMsg->pCont; + mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->thandle); + + pMsg->pDb = mgmtGetDbByTableId(pAlter->tableId); + if (pMsg->pDb == NULL || pMsg->pDb->dirty) { + mError("table:%s, failed to alter table, db not selected", pAlter->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); + return; + } + + if (mgmtCheckIsMonitorDB(pMsg->pDb->name, tsMonitorDbName)) { + mError("table:%s, failed to alter table, its log db", pAlter->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); + return; + } + + pMsg->pTable = mgmtGetTable(pAlter->tableId); + if (pMsg->pTable == NULL) { + mError("table:%s, failed to alter table, table not exist", pMsg->pTable->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); + return; + } + + pAlter->numOfCols = htons(pAlter->numOfCols); + if (pAlter->numOfCols > 2) { + mError("table:%s, error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_APP_ERROR); + return; + } + + for (int32_t i = 0; i < pAlter->numOfCols; ++i) { + pAlter->schema[i].bytes = htons(pAlter->schema[i].bytes); + } + int32_t code = TSDB_CODE_OPS_NOT_SUPPORT; - SCMAlterTableMsg *pAlter = pMsg->pCont;; - - if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) { - code = mgmtModifyChildTableTagValueByName(pTable, pAlter->schema[0].name, pAlter->tagVal); - } else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) { - code = mgmtAddNormalTableColumn(pTable, pAlter->schema, 1); - } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) { - code = mgmtDropNormalTableColumnByName(pTable, pAlter->schema[0].name); + if (pMsg->pTable->type == TSDB_SUPER_TABLE) { + SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; + mTrace("table:%s, start to alter stable", pAlter->tableId); + if (pAlter->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) { + code = mgmtAddSuperTableTag(pTable, pAlter->schema, 1); + } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) { + code = mgmtDropSuperTableTag(pTable, pAlter->schema[0].name); + } else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) { + code = mgmtModifySuperTableTagName(pTable, pAlter->schema[0].name, pAlter->schema[1].name); + } else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) { + code = mgmtAddSuperTableColumn(pMsg->pDb, pTable, pAlter->schema, 1); + } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) { + code = mgmtDropSuperTableColumn(pMsg->pDb, pTable, pAlter->schema[0].name); + } else { + } } else { + mTrace("table:%s, start to alter ctable", pAlter->tableId); + SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; + if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) { + code = mgmtModifyChildTableTagValue(pTable, pAlter->schema[0].name, pAlter->tagVal); + } else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) { + code = mgmtAddNormalTableColumn(pMsg->pDb, pTable, pAlter->schema, 1); + } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) { + code = mgmtDropNormalTableColumn(pMsg->pDb, pTable, pAlter->schema[0].name); + } else { + } } mgmtSendSimpleResp(pMsg->thandle, code); -} \ No newline at end of file +}