提交 b58f9459 编写于 作者: S slguan

[TD-17]

上级 036b3d35
......@@ -48,7 +48,7 @@ int32_t mgmtInitMnodes() {
void mgmtCleanupMnodes() {}
bool mgmtInServerStatus() { return tsMnodeObj.status == TSDB_MN_STATUS_SERVING; }
bool mgmtIsMaster() { return tsMnodeObj.role == TSDB_MN_ROLE_MASTER; }
bool mgmtCheckRedirect(void *handle) { return false; }
bool mgmtCheckRedirect(void *thandle) { return false; }
static int32_t mgmtGetMnodesNum() {
return 1;
......@@ -117,7 +117,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = mgmtGetDnodesNum();
pShow->numOfRows = mgmtGetMnodesNum();
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL;
......
......@@ -40,7 +40,7 @@ typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon
typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont);
static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg);
static void mgmtProcessMsgFromShell(SRpcMsg *pMsg);
static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg);
static void mgmtProcessMsgWhileNotReady(SRpcMsg *rpcMsg);
......@@ -135,7 +135,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
}
if (mgmtCheckRedirect(rpcMsg->handle)) {
// send resp in redirect func
// rpcSendRedirectRsp(rpcMsg->handle, mgmtGetMnodeIpListForRedirect());
rpcFreeCont(rpcMsg->pCont);
return;
}
......@@ -165,7 +165,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
return;
}
if (mgmtCheckMsgReadOnly(rpcMsg->msgType, rpcMsg->pCont)) {
if (mgmtCheckMsgReadOnly(pMsg)) {
(*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(pMsg);
mgmtFreeQueuedMsg(pMsg);
} else {
......@@ -185,7 +185,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
return;
}
if (!tsMgmtShowMetaFp[pShowMsg->type]) {
if (!tsMgmtShowMetaFp[pShowMsg->type] || !tsMgmtShowRetrieveFp[pShowMsg->type]) {
mError("show type:%s is not support", taosGetShowTypeStr(pShowMsg->type));
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OPS_NOT_SUPPORT);
return;
......@@ -299,22 +299,13 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) {
}
static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) {
//SCMHeartBeatMsg *pHBMsg = (SCMHeartBeatMsg *) rpcMsg->pCont;
//mgmtSaveQueryStreamList(pHBMsg);
SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp));
if (pHBRsp == NULL) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
return;
}
SRpcConnInfo connInfo;
if (rpcGetConnInfo(pMsg->thandle, &connInfo) != 0) {
mError("conn:%p is already released while process heart beat msg", pMsg->thandle);
return;
}
if (connInfo.serverIp == tsPublicIpInt) {
if (pMsg->usePublicIp) {
mgmtGetMnodePublicIpList(&pHBRsp->ipList);
} else {
mgmtGetMnodePrivateIpList(&pHBRsp->ipList);
......@@ -424,10 +415,10 @@ static void mgmtProcessUseMsg(SQueuedMsg *pMsg) {
SCMUseDbMsg *pUseDbMsg = pMsg->pCont;
// todo check for priority of current user
SDbObj* pDbObj = mgmtGetDb(pUseDbMsg->db);
pMsg->pDb = mgmtGetDb(pUseDbMsg->db);
int32_t code = TSDB_CODE_SUCCESS;
if (pDbObj == NULL) {
if (pMsg->pDb == NULL) {
code = TSDB_CODE_INVALID_DB;
}
......@@ -438,26 +429,29 @@ static void mgmtProcessUseMsg(SQueuedMsg *pMsg) {
/**
* check if we need to add mgmtProcessTableMetaMsg into tranQueue, which will be executed one-by-one.
*/
static bool mgmtCheckMeterMetaMsgType(void *pMsg) {
static bool mgmtCheckTableMetaMsgReadOnly(SQueuedMsg *pMsg) {
SCMTableInfoMsg *pInfo = (SCMTableInfoMsg *) pMsg;
int16_t autoCreate = htons(pInfo->createFlag);
STableInfo *pTable = mgmtGetTable(pInfo->tableId);
pMsg->pTable = mgmtGetTable(pInfo->tableId);
if (pMsg->pTable != NULL) return true;
// If table does not exists and autoCreate flag is set, we add the handler into task queue
bool addIntoTranQueue = (pTable == NULL && autoCreate == 1);
if (addIntoTranQueue) {
int16_t autoCreate = htons(pInfo->createFlag);
if (autoCreate == 1) {
mTrace("table:%s auto created task added", pInfo->tableId);
return false;
}
mgmtDecTableRef(pTable);
return addIntoTranQueue;
return true;
}
static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) {
if ((type == TSDB_MSG_TYPE_CM_TABLE_META && (!mgmtCheckMeterMetaMsgType(pCont))) ||
type == TSDB_MSG_TYPE_CM_STABLE_VGROUP || type == TSDB_MSG_TYPE_RETRIEVE ||
type == TSDB_MSG_TYPE_CM_SHOW || type == TSDB_MSG_TYPE_CM_TABLES_META ||
type == TSDB_MSG_TYPE_CM_CONNECT) {
static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg) {
if (pMsg->msgType == TSDB_MSG_TYPE_CM_TABLE_META) {
return mgmtCheckTableMetaMsgReadOnly(pMsg);
}
if (pMsg->msgType == TSDB_MSG_TYPE_CM_STABLE_VGROUP || pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE ||
pMsg->msgType == TSDB_MSG_TYPE_CM_SHOW || pMsg->msgType == TSDB_MSG_TYPE_CM_TABLES_META ||
pMsg->msgType == TSDB_MSG_TYPE_CM_CONNECT) {
return true;
}
......
......@@ -77,8 +77,6 @@ static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg);
static void mgmtGetChildTableMeta(SQueuedMsg *pMsg);
static void mgmtProcessAlterTableMsg(SQueuedMsg *queueMsg);
static void mgmtAlterChildTable(SQueuedMsg *pMsg);
static void mgmtAlterSuperTable(SQueuedMsg *pMsg);
static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg);
static void mgmtDestroyChildTable(SChildTableObj *pTable) {
......@@ -328,6 +326,7 @@ static int32_t mgmtInitChildTables() {
pNode = pLastNode;
continue;
}
mgmtDecTableRef(pSuperTable);
}
}
......@@ -607,50 +606,6 @@ static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) {
}
}
static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) {
SCMAlterTableMsg *pAlter = pMsg->pCont;
mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->thandle);
pMsg->pDb = mgmtGetDbByTableId(pAlter->tableId);
if (pMsg->pDb == NULL || pMsg->pDb->dirty) {
mError("table:%s, failed to alter table, db not selected", pAlter->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED);
return;
}
if (mgmtCheckIsMonitorDB(pMsg->pDb->name, tsMonitorDbName)) {
mError("table:%s, failed to alter table, its log db", pAlter->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN);
return;
}
pMsg->pTable = mgmtGetTable(pAlter->tableId);
if (pMsg->pTable == NULL) {
mError("table:%s, failed to alter table, table not exist", pMsg->pTable->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
return;
}
pAlter->numOfCols = htons(pAlter->numOfCols);
if (pAlter->numOfCols > 2) {
mError("table:%s, error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_APP_ERROR);
return;
}
for (int32_t i = 0; i < pAlter->numOfCols; ++i) {
pAlter->schema[i].bytes = htons(pAlter->schema[i].bytes);
}
if (pMsg->pTable->type == TSDB_SUPER_TABLE) {
mTrace("table:%s, start to alter stable", pAlter->tableId);
mgmtAlterSuperTable(pMsg);
} else {
mTrace("table:%s, start to alter ctable", pAlter->tableId);
mgmtAlterChildTable(pMsg);
}
}
static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) {
SCMTableInfoMsg *pInfo = pMsg->pCont;
mTrace("table:%s, table meta msg is received from thandle:%p", pInfo->tableId, pMsg->thandle);
......@@ -662,7 +617,6 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) {
return;
}
pMsg->pTable = mgmtGetTable(pInfo->tableId);
if (pMsg->pTable == NULL) {
mgmtGetChildTableMeta(pMsg);
} else {
......@@ -784,13 +738,23 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i
pStable->numOfColumns += ntags;
pStable->sversion++;
// sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsSuperTableSdb,
.pObj = pStable,
.rowSize = tsSuperTableUpdateSize
};
mTrace("Succeed to add tag column %s to table %s", schema[0].name, pStable->info.tableId);
int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_SDB_ERROR;
}
mPrint("table %s, succeed to add tag %s", pStable->info.tableId, schema[0].name);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtProcessDropSuperTableMsgTag(SSuperTableObj *pStable, char *tagName) {
static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) {
int32_t col = mgmtFindSuperTableTagIndex(pStable, tagName);
if (col <= 0 || col >= pStable->numOfTags) {
return TSDB_CODE_APP_ERROR;
......@@ -805,16 +769,27 @@ static int32_t mgmtProcessDropSuperTableMsgTag(SSuperTableObj *pStable, char *ta
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns);
pStable->schema = realloc(pStable->schema, schemaSize);
// sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsSuperTableSdb,
.pObj = pStable,
.rowSize = tsSuperTableUpdateSize
};
int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_SDB_ERROR;
}
mPrint("table %s, succeed to drop tag %s", pStable->info.tableId, tagName);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *oldTagName, char *newTagName) {
static int32_t mgmtModifySuperTableTagName(SSuperTableObj *pStable, char *oldTagName, char *newTagName) {
int32_t col = mgmtFindSuperTableTagIndex(pStable, oldTagName);
if (col < 0) {
// Tag name does not exist
mError("Failed to modify table %s tag column, oname: %s, nname: %s", pStable->info.tableId, oldTagName, newTagName);
mError("table:%s, failed to modify table tag, oldName: %s, newName: %s", pStable->info.tableId, oldTagName, newTagName);
return TSDB_CODE_INVALID_MSG_TYPE;
}
......@@ -829,24 +804,19 @@ static int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *
SSchema *schema = (SSchema *) (pStable->schema + (pStable->numOfColumns + col) * sizeof(SSchema));
strncpy(schema->name, newTagName, TSDB_COL_NAME_LEN);
// Encode string
int32_t size = 1 + sizeof(SSuperTableObj) + TSDB_MAX_BYTES_PER_ROW;
char *msg = (char *) malloc(size);
if (msg == NULL) return TSDB_CODE_APP_ERROR;
memset(msg, 0, size);
// mgmtSuperTableActionEncode(pStable, msg, size, &rowSize);
int32_t ret = 0;
// int32_t ret = sdbUpdateRow(tsSuperTableSdb, msg, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
tfree(msg);
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsSuperTableSdb,
.pObj = pStable,
.rowSize = tsSuperTableUpdateSize
};
if (ret < 0) {
mError("Failed to modify table %s tag column", pStable->info.tableId);
return TSDB_CODE_APP_ERROR;
int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_SDB_ERROR;
}
mTrace("Succeed to modify table %s tag column", pStable->info.tableId);
mPrint("table %s, succeed to modify tag %s to %s", pStable->info.tableId, oldTagName, newTagName);
return TSDB_CODE_SUCCESS;
}
......@@ -861,7 +831,7 @@ static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colN
return -1;
}
static int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[], int32_t ncols) {
static int32_t mgmtAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSchema schema[], int32_t ncols) {
if (ncols <= 0) {
return TSDB_CODE_APP_ERROR;
}
......@@ -872,18 +842,6 @@ static int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[]
}
}
// pMsg->pDb = mgmtGetDbByTableId(pStable->info.tableId);
// if (pMsg->pDb == NULL) {
// mError("meter: %s not belongs to any database", pStable->info.tableId);
// return TSDB_CODE_APP_ERROR;
// }
// pMsg->pAcct = acctGetAcct(pMsg->pDb->cfg.acct);
// if (pMsg->pAcct == NULL) {
// mError("DB: %s not belongs to andy account", pMsg->pDb->name);
// return TSDB_CODE_APP_ERROR;
// }
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns);
pStable->schema = realloc(pStable->schema, schemaSize + sizeof(SSchema) * ncols);
......@@ -899,30 +857,34 @@ static int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[]
pStable->numOfColumns += ncols;
pStable->sversion++;
// pMsg->pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables);
// sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct != NULL) {
pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables);
acctDecRef(pAcct);
}
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsSuperTableSdb,
.pObj = pStable,
.rowSize = tsSuperTableUpdateSize
};
int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_SDB_ERROR;
}
mPrint("table %s, succeed to add column", pStable->info.tableId);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtProcessDropSuperTableMsgColumnByName(SSuperTableObj *pStable, char *colName) {
static int32_t mgmtDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, char *colName) {
int32_t col = mgmtFindSuperTableColumnIndex(pStable, colName);
if (col < 0) {
return TSDB_CODE_APP_ERROR;
}
// pMsg->pDb = mgmtGetDbByTableId(pStable->info.tableId);
// if (pMsg->pDb == NULL) {
// mError("meter: %s not belongs to any database", pStable->info.tableId);
// return TSDB_CODE_APP_ERROR;
// }
// pMsg->pAcct = acctGetAcct(pMsg->pDb->cfg.acct);
// if (pMsg->pAcct == NULL) {
// mError("DB: %s not belongs to andy account", pMsg->pDb->name);
// return TSDB_CODE_APP_ERROR;
// }
memmove(pStable->schema + sizeof(SSchema) * col, pStable->schema + sizeof(SSchema) * (col + 1),
sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags - col - 1));
......@@ -932,18 +894,32 @@ static int32_t mgmtProcessDropSuperTableMsgColumnByName(SSuperTableObj *pStable,
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns);
pStable->schema = realloc(pStable->schema, schemaSize);
// pMsg->pAcct->acctInfo.numOfTimeSeries -= (pStable->numOfTables);
// sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct != NULL) {
pAcct->acctInfo.numOfTimeSeries -= pStable->numOfTables;
acctDecRef(pAcct);
}
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsSuperTableSdb,
.pObj = pStable,
.rowSize = tsSuperTableUpdateSize
};
int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_SDB_ERROR;
}
mPrint("table %s, succeed to delete column", pStable->info.tableId);
return TSDB_CODE_SUCCESS;
}
// show super tables
static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
SDbObj *pDb = mgmtGetDb(pShow->db);
if (pDb == NULL) {
return TSDB_CODE_DB_NOT_SELECTED;
}
if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
......@@ -1146,26 +1122,6 @@ static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
rpcSendResponse(&rpcRsp);
}
static void mgmtAlterSuperTable(SQueuedMsg *pMsg) {
SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable;
int32_t code = TSDB_CODE_OPS_NOT_SUPPORT;
SCMAlterTableMsg *pAlter = pMsg->pCont;
if (pAlter->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) {
code = mgmtAddSuperTableTag((SSuperTableObj *) pTable, pAlter->schema, 1);
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) {
code = mgmtProcessDropSuperTableMsgTag((SSuperTableObj *) pTable, pAlter->schema[0].name);
} else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) {
code = mgmtModifySuperTableTagNameByName((SSuperTableObj *) pTable, pAlter->schema[0].name, pAlter->schema[1].name);
} else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
code = mgmtAddSuperTableColumn((SSuperTableObj *) pTable, pAlter->schema, 1);
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) {
code = mgmtProcessDropSuperTableMsgColumnByName((SSuperTableObj *) pTable, pAlter->schema[0].name);
} else {}
mgmtSendSimpleResp(pMsg->thandle, code);
}
static void mgmtProcessDropSuperTableRsp(SRpcMsg *rpcMsg) {
mTrace("drop stable rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
}
......@@ -1410,53 +1366,8 @@ static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) {
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
}
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent) {
// TODO: send message to dnode
// int32_t col = mgmtFindSuperTableTagIndex(pTable->superTable, tagName);
// if (col < 0 || col > pTable->superTable->numOfTags) {
// return TSDB_CODE_APP_ERROR;
// }
//
// //TODO send msg to dnode
// mTrace("Succeed to modify tag column %d of table %s", col, pTable->info.tableId);
// return TSDB_CODE_SUCCESS;
// int32_t rowSize = 0;
// SSchema *schema = (SSchema *)(pSuperTable->schema + (pSuperTable->numOfColumns + col) * sizeof(SSchema));
//
// if (col == 0) {
// pTable->isDirty = 1;
// removeMeterFromMetricIndex(pSuperTable, pTable);
// }
// memcpy(pTable->pTagData + mgmtGetTagsLength(pMetric, col) + TSDB_TABLE_ID_LEN, nContent, schema->bytes);
// if (col == 0) {
// addMeterIntoMetricIndex(pMetric, pTable);
// }
//
// // Encode the string
// int32_t size = sizeof(STabObj) + TSDB_MAX_BYTES_PER_ROW + 1;
// char *msg = (char *)malloc(size);
// if (msg == NULL) {
// mError("failed to allocate message memory while modify tag value");
// return TSDB_CODE_APP_ERROR;
// }
// memset(msg, 0, size);
//
// mgmtMeterActionEncode(pTable, msg, size, &rowSize);
//
// int32_t ret = sdbUpdateRow(tsChildTableSdb, msg, rowSize, 1); // Need callback function
// tfree(msg);
//
// if (pTable->isDirty) pTable->isDirty = 0;
//
// if (ret < 0) {
// mError("Failed to modify tag column %d of table %s", col, pTable->info.tableId);
// return TSDB_CODE_APP_ERROR;
// }
//
// mTrace("Succeed to modify tag column %d of table %s", col, pTable->info.tableId);
// return TSDB_CODE_SUCCESS;
return 0;
static int32_t mgmtModifyChildTableTagValue(SChildTableObj *pTable, char *tagName, char *nContent) {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
static int32_t mgmtFindNormalTableColumnIndex(SChildTableObj *pTable, char *colName) {
......@@ -1470,7 +1381,7 @@ static int32_t mgmtFindNormalTableColumnIndex(SChildTableObj *pTable, char *colN
return -1;
}
static int32_t mgmtAddNormalTableColumn(SChildTableObj *pTable, SSchema schema[], int32_t ncols) {
static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSchema schema[], int32_t ncols) {
if (ncols <= 0) {
return TSDB_CODE_APP_ERROR;
}
......@@ -1481,18 +1392,6 @@ static int32_t mgmtAddNormalTableColumn(SChildTableObj *pTable, SSchema schema[]
}
}
SDbObj *pDb = mgmtGetDbByTableId(pTable->info.tableId);
if (pDb == NULL) {
mError("table: %s not belongs to any database", pTable->info.tableId);
return TSDB_CODE_APP_ERROR;
}
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("DB: %s not belongs to andy account", pDb->name);
return TSDB_CODE_APP_ERROR;
}
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
pTable->schema = realloc(pTable->schema, schemaSize + sizeof(SSchema) * ncols);
......@@ -1505,53 +1404,60 @@ static int32_t mgmtAddNormalTableColumn(SChildTableObj *pTable, SSchema schema[]
pTable->numOfColumns += ncols;
pTable->sversion++;
pAcct->acctInfo.numOfTimeSeries += ncols;
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_GLOBAL;
desc.pObj = pTable;
desc.table = tsChildTableSdb;
desc.rowData = pTable;
desc.rowSize = tsChildTableUpdateSize;
sdbUpdateRow(&desc);
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct != NULL) {
pAcct->acctInfo.numOfTimeSeries += ncols;
acctDecRef(pAcct);
}
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsChildTableSdb,
.pObj = pTable,
.rowSize = tsChildTableUpdateSize
};
int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_SDB_ERROR;
}
mPrint("table %s, succeed to add column", pTable->info.tableId);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtDropNormalTableColumnByName(SChildTableObj *pTable, char *colName) {
static int32_t mgmtDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, char *colName) {
int32_t col = mgmtFindNormalTableColumnIndex(pTable, colName);
if (col < 0) {
return TSDB_CODE_APP_ERROR;
}
SDbObj *pDb = mgmtGetDbByTableId(pTable->info.tableId);
if (pDb == NULL) {
mError("table: %s not belongs to any database", pTable->info.tableId);
return TSDB_CODE_APP_ERROR;
}
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("DB: %s not belongs to any account", pDb->name);
return TSDB_CODE_APP_ERROR;
}
memmove(pTable->schema + sizeof(SSchema) * col, pTable->schema + sizeof(SSchema) * (col + 1),
sizeof(SSchema) * (pTable->numOfColumns - col - 1));
pTable->numOfColumns--;
pTable->sversion++;
pAcct->acctInfo.numOfTimeSeries--;
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_GLOBAL;
desc.pObj = pTable;
desc.table = tsChildTableSdb;
desc.rowData = pTable;
desc.rowSize = tsChildTableUpdateSize;
sdbUpdateRow(&desc);
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct != NULL) {
pAcct->acctInfo.numOfTimeSeries--;
acctDecRef(pAcct);
}
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsChildTableSdb,
.pObj = pTable,
.rowSize = tsChildTableUpdateSize
};
int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_SDB_ERROR;
}
mPrint("table %s, succeed to add column %s", pTable->info.tableId, colName);
return TSDB_CODE_SUCCESS;
}
......@@ -1617,13 +1523,7 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
void mgmtGetChildTableMeta(SQueuedMsg *pMsg) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
SCMTableInfoMsg *pInfo = pMsg->pCont;
SDbObj *pDb = pMsg->pDb;
if (pDb == NULL || pDb->dirty) {
mError("table:%s, failed to get table meta, db not selected", pInfo->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED);
return;
}
if (pTable == NULL) {
if (htons(pInfo->createFlag) != 1) {
mError("table:%s, failed to get table meta, table not exist", pInfo->tableId);
......@@ -1869,19 +1769,6 @@ static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg) {
}
static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) {
SRpcConnInfo connInfo;
if (rpcGetConnInfo(pMsg->thandle, &connInfo) != 0) {
mError("conn:%p is already released while get mulit table meta", pMsg->thandle);
return;
}
bool usePublicIp = (connInfo.serverIp == tsPublicIpInt);
SUserObj *pUser = mgmtGetUser(connInfo.user);
if (pUser == NULL) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER);
return;
}
SCMMultiTableInfoMsg *pInfo = pMsg->pCont;
pInfo->numOfTables = htonl(pInfo->numOfTables);
......@@ -1932,7 +1819,6 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) {
rpcSendResponse(&rpcRsp);
}
// show tables
static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
SDbObj *pDb = mgmtGetDb(pShow->db);
if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
......@@ -2058,19 +1944,69 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows,
return numOfRows;
}
void mgmtAlterChildTable(SQueuedMsg *pMsg) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) {
SCMAlterTableMsg *pAlter = pMsg->pCont;
mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->thandle);
pMsg->pDb = mgmtGetDbByTableId(pAlter->tableId);
if (pMsg->pDb == NULL || pMsg->pDb->dirty) {
mError("table:%s, failed to alter table, db not selected", pAlter->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED);
return;
}
if (mgmtCheckIsMonitorDB(pMsg->pDb->name, tsMonitorDbName)) {
mError("table:%s, failed to alter table, its log db", pAlter->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN);
return;
}
pMsg->pTable = mgmtGetTable(pAlter->tableId);
if (pMsg->pTable == NULL) {
mError("table:%s, failed to alter table, table not exist", pMsg->pTable->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
return;
}
pAlter->numOfCols = htons(pAlter->numOfCols);
if (pAlter->numOfCols > 2) {
mError("table:%s, error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_APP_ERROR);
return;
}
for (int32_t i = 0; i < pAlter->numOfCols; ++i) {
pAlter->schema[i].bytes = htons(pAlter->schema[i].bytes);
}
int32_t code = TSDB_CODE_OPS_NOT_SUPPORT;
SCMAlterTableMsg *pAlter = pMsg->pCont;;
if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) {
code = mgmtModifyChildTableTagValueByName(pTable, pAlter->schema[0].name, pAlter->tagVal);
} else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
code = mgmtAddNormalTableColumn(pTable, pAlter->schema, 1);
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) {
code = mgmtDropNormalTableColumnByName(pTable, pAlter->schema[0].name);
if (pMsg->pTable->type == TSDB_SUPER_TABLE) {
SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable;
mTrace("table:%s, start to alter stable", pAlter->tableId);
if (pAlter->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) {
code = mgmtAddSuperTableTag(pTable, pAlter->schema, 1);
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) {
code = mgmtDropSuperTableTag(pTable, pAlter->schema[0].name);
} else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) {
code = mgmtModifySuperTableTagName(pTable, pAlter->schema[0].name, pAlter->schema[1].name);
} else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
code = mgmtAddSuperTableColumn(pMsg->pDb, pTable, pAlter->schema, 1);
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) {
code = mgmtDropSuperTableColumn(pMsg->pDb, pTable, pAlter->schema[0].name);
} else {
}
} else {
mTrace("table:%s, start to alter ctable", pAlter->tableId);
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) {
code = mgmtModifyChildTableTagValue(pTable, pAlter->schema[0].name, pAlter->tagVal);
} else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
code = mgmtAddNormalTableColumn(pMsg->pDb, pTable, pAlter->schema, 1);
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) {
code = mgmtDropNormalTableColumn(pMsg->pDb, pTable, pAlter->schema[0].name);
} else {
}
}
mgmtSendSimpleResp(pMsg->thandle, code);
}
\ No newline at end of file
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册