提交 562967f1 编写于 作者: S slguan

[TD-9] add code for drop database

上级 13b5cd4c
......@@ -180,7 +180,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
pSql->ipList->ip[0] = inet_addr("192.168.0.1");
pSql->ipList->ip[0] = inet_addr(tsPrivateIp);
if (pSql->cmd.command < TSDB_SQL_MGMT) {
pSql->ipList->port = tsDnodeShellPort;
......@@ -197,7 +197,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg);
} else {
pSql->ipList->port = tsMnodeShellPort;
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
SRpcMsg rpcMsg = {
.msgType = pSql->cmd.msgType,
......@@ -213,7 +213,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
tscPrint("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code));
tscTrace("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code));
SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
if (pSql == NULL || pSql->signature != pSql) {
tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
......
......@@ -70,25 +70,31 @@ void dnodeCleanupRead() {
}
void dnodeRead(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont;
int32_t contLen = 0;
int32_t numOfVnodes = 0;
int32_t vgId = 0;
SRpcContext *pRpcContext = NULL;
// parse head, get number of vnodes;
if ( numOfVnodes > 1) {
pRpcContext = calloc(sizeof(SRpcContext), 1);
pRpcContext->numOfVnodes = 1;
// SMsgDesc *pDesc = pCont;
// pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
// pCont += sizeof(SMsgDesc);
// if (pDesc->numOfVnodes > 1) {
// pRpcContext = calloc(sizeof(SRpcContext), 1);
// pRpcContext->numOfVnodes = pDesc->numOfVnodes;
// }
if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) {
queuedMsgNum = 0;
}
while (leftLen > 0) {
// todo: parse head, get vgId, contLen
SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = 1; //htonl(pHead->vgId);
pHead->contLen = pMsg->contLen; //htonl(pHead->contLen);
// get pVnode from vgId
void *pVnode = dnodeGetVnode(vgId);
void *pVnode = dnodeGetVnode(pHead->vgId);
if (pVnode == NULL) {
leftLen -= pHead->contLen;
pCont -= pHead->contLen;
continue;
}
......@@ -96,7 +102,7 @@ void dnodeRead(SRpcMsg *pMsg) {
SReadMsg readMsg;
readMsg.rpcMsg = *pMsg;
readMsg.pCont = pCont;
readMsg.contLen = contLen;
readMsg.contLen = pHead->contLen;
readMsg.pRpcContext = pRpcContext;
readMsg.pVnode = pVnode;
......@@ -104,11 +110,23 @@ void dnodeRead(SRpcMsg *pMsg) {
taosWriteQitem(queue, &readMsg);
// next vnode
leftLen -= contLen;
pCont -= contLen;
leftLen -= pHead->contLen;
pCont -= pHead->contLen;
queuedMsgNum++;
dnodeReleaseVnode(pVnode);
}
if (queuedMsgNum == 0) {
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.pCont = NULL,
.contLen = 0,
.code = TSDB_CODE_INVALID_VGROUP_ID,
.msgType = 0
};
rpcSendResponse(&rpcRsp);
}
}
void *dnodeAllocateReadWorker() {
......
......@@ -94,9 +94,9 @@ void dnodeWrite(SRpcMsg *pMsg) {
SRpcContext *pRpcContext = NULL;
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) {
SWriteMsgDesc *pDesc = pCont;
SMsgDesc *pDesc = pCont;
pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
pCont += sizeof(SWriteMsgDesc);
pCont += sizeof(SMsgDesc);
if (pDesc->numOfVnodes > 1) {
pRpcContext = calloc(sizeof(SRpcContext), 1);
pRpcContext->numOfVnodes = pDesc->numOfVnodes;
......@@ -104,7 +104,7 @@ void dnodeWrite(SRpcMsg *pMsg) {
}
while (leftLen > 0) {
SWriteMsgHead *pHead = (SWriteMsgHead *) pCont;
SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen);
......@@ -322,7 +322,7 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
for (int i = pTable->numOfColumns; i < totalCols; i++) {
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
}
tsdbTableSetSchema(&tCfg, pDestTagSchema, false);
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
int accumBytes = 0;
......
......@@ -166,6 +166,7 @@ typedef struct _vg_obj {
typedef struct _db_obj {
char name[TSDB_DB_NAME_LEN + 1];
int8_t dirty;
int64_t createdTime;
SDbCfg cfg;
int8_t dropStatus;
......@@ -175,10 +176,8 @@ typedef struct _db_obj {
int32_t numOfVgroups;
int32_t numOfTables;
int32_t numOfSuperTables;
int32_t vgStatus;
SVgObj *pHead; // empty vgroup first
SVgObj *pTail; // empty vgroup end
void * vgTimer;
SVgObj *pHead;
SVgObj *pTail;
} SDbObj;
struct _acctObj;
......
......@@ -236,12 +236,12 @@ typedef struct {
typedef struct {
int32_t numOfVnodes;
} SWriteMsgDesc;
} SMsgDesc;
typedef struct {
int32_t contLen;
int32_t vgId;
} SWriteMsgHead;
} SMsgHead;
typedef struct {
int32_t contLen;
......
......@@ -38,6 +38,8 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName
int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp);
void mgmtDropAllChildTables(SDbObj *pDropDb);
#ifdef __cplusplus
}
#endif
......
......@@ -37,6 +37,8 @@ int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName);
int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMeta *pMeta, bool usePublicIp);
void mgmtDropAllNormalTables(SDbObj *pDropDb);
#ifdef __cplusplus
}
#endif
......
......@@ -45,6 +45,8 @@ void * mgmtGetSuperTableVgroup(SSuperTableObj *pStable);
int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName);
int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable);
void mgmtDropAllSuperTables(SDbObj *pDropDb);
#ifdef __cplusplus
}
#endif
......
......@@ -32,16 +32,9 @@ STableInfo* mgmtGetTable(char *tableId);
STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid);
int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp);
int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo);
int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore);
int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter);
void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable);
void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable);
SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable);
SMDDropSTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable);
#ifdef __cplusplus
}
#endif
......
......@@ -30,7 +30,7 @@ SVgObj *mgmtGetVgroup(int32_t vgId);
SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode);
void mgmtCreateVgroup(SQueuedMsg *pMsg);
int32_t mgmtDropVgroup(SVgObj *pVgroup);
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle);
void mgmtUpdateVgroup(SVgObj *pVgroup);
void mgmtSetVgroupIdPool();
......
......@@ -123,7 +123,6 @@ void *mgmtChildTableActionDelete(void *row, char *str, int32_t size, int32_t *ss
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId);
return NULL;
}
......@@ -462,13 +461,37 @@ int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *p
for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
if (usePublicIp) {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
} else {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
}
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId);
}
pMeta->numOfVpeers = pVgroup->numOfVnodes;
return TSDB_CODE_SUCCESS;
}
void mgmtDropAllChildTables(SDbObj *pDropDb) {
void *pNode = NULL;
void *pLastNode = NULL;
int32_t numOfTables = 0;
int32_t dbNameLen = strlen(pDropDb->name);
SChildTableObj *pTable = NULL;
while (1) {
pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) {
break;
}
if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) {
sdbDeleteRow(tsChildTableSdb, pTable);
pNode = pLastNode;
numOfTables ++;
continue;
}
}
mTrace("db:%s, all child tables:%d is dropped", pDropDb->name, numOfTables);
}
\ No newline at end of file
......@@ -27,6 +27,9 @@
#include "mgmtMnode.h"
#include "mgmtGrant.h"
#include "mgmtShell.h"
#include "mgmtNormalTable.h"
#include "mgmtChildTable.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
......@@ -34,10 +37,9 @@
static void *tsDbSdb = NULL;
static int32_t tsDbUpdateSize;
static int32_t mgmtUpdateDb(SDbObj *pDb);
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate);
static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists);
static int32_t mgmtDropDb(SDbObj *pDb);
static void mgmtDropDb(void *handle, void *tmrId);
static void mgmtSetDbDirty(SDbObj *pDb);
static int32_t mgmtGetDbMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
......@@ -60,7 +62,7 @@ static void mgmtDbActionInit() {
mgmtDbActionFp[SDB_TYPE_UPDATE] = mgmtDbActionUpdate;
mgmtDbActionFp[SDB_TYPE_ENCODE] = mgmtDbActionEncode;
mgmtDbActionFp[SDB_TYPE_DECODE] = mgmtDbActionDecode;
mgmtDbActionFp[SDB_TYPE_RESET] = mgmtDbActionReset;
mgmtDbActionFp[SDB_TYPE_RESET] = mgmtDbActionReset;
mgmtDbActionFp[SDB_TYPE_DESTROY] = mgmtDbActionDestroy;
}
......@@ -98,8 +100,6 @@ int32_t mgmtInitDbs() {
pDb->numOfTables = 0;
pDb->numOfVgroups = 0;
pDb->numOfSuperTables = 0;
pDb->vgStatus = TSDB_VG_STATUS_READY;
pDb->vgTimer = NULL;
pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct != NULL)
mgmtAddDbIntoAcct(pAcct, pDb);
......@@ -293,135 +293,6 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
return code;
}
static int32_t mgmtUpdateDb(SDbObj *pDb) {
return sdbUpdateRow(tsDbSdb, pDb, tsDbUpdateSize, 1);
}
static int32_t mgmtSetDbDropping(SDbObj *pDb) {
if (pDb->dropStatus == TSDB_DB_STATUS_DROP_FROM_SDB) return 0;
SVgObj *pVgroup = pDb->pHead;
while (pVgroup != NULL) {
for (int32_t i = 0; i < pVgroup->numOfVnodes; i++) {
SVnodeGid *pVnodeGid = pVgroup->vnodeGid + i;
SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip);
if (pDnode == NULL) continue;
SVnodeLoad *pVload = &pDnode->vload[pVnodeGid->vnode];
if (pVload->dropStatus != TSDB_VN_DROP_STATUS_DROPPING) {
pVload->dropStatus = TSDB_VN_DROP_STATUS_DROPPING;
mPrint("dnode:%s vnode:%d db:%s set to dropping status", taosIpStr(pDnode->privateIp), pVnodeGid->vnode, pDb->name);
if (mgmtUpdateDnode(pDnode) < 0) {
mError("db:%s drop failed, dnode sdb update error", pDb->name);
return TSDB_CODE_SDB_ERROR;
}
}
}
//void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
// mTrace("vgroup:%d send free vgroup msg, ahandle:%p", pVgroup->vgId, ahandle);
//
// for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
// mgmtSendDropVnodeMsg(pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
// }
//}
//
// mgmtSendDropVgroupMsg(pVgroup);
pVgroup = pVgroup->next;
}
if (pDb->dropStatus == TSDB_DB_STATUS_DROPPING) return 0;
pDb->dropStatus = TSDB_DB_STATUS_DROPPING;
if (mgmtUpdateDb(pDb) < 0) {
mError("db:%s drop failed, db sdb update error", pDb->name);
return TSDB_CODE_SDB_ERROR;
}
mPrint("db:%s set to dropping status", pDb->name);
return 0;
}
static bool mgmtCheckDropDbFinished(SDbObj *pDb) {
SVgObj *pVgroup = pDb->pHead;
while (pVgroup) {
for (int32_t i = 0; i < pVgroup->numOfVnodes; i++) {
SVnodeGid *pVnodeGid = pVgroup->vnodeGid + i;
SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip);
if (pDnode == NULL) continue;
if (pDnode->status == TSDB_DN_STATUS_OFFLINE) continue;
SVnodeLoad *pVload = &pDnode->vload[pVnodeGid->vnode];
if (pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) {
mTrace("dnode:%s, vnode:%d db:%s wait dropping", taosIpStr(pDnode->privateIp), pVnodeGid->vnode, pDb->name);
return false;
}
}
pVgroup = pVgroup->next;
}
mPrint("db:%s all vnodes drop finished", pDb->name);
return true;
}
static void mgmtDropDbFromSdb(SDbObj *pDb) {
while (pDb->pHead) mgmtDropVgroup(pDb->pHead);
// SSuperTableObj *pMetric = pDb->pSTable;
// while (pMetric) {
// SSuperTableObj *pNext = pMetric->next;
// mgmtDropTable(pDb, pMetric->tableId, 0);
// pMetric = pNext;
// }
mPrint("db:%s all meters drop finished", pDb->name);
sdbDeleteRow(tsDbSdb, pDb);
mPrint("db:%s database drop finished", pDb->name);
}
static int32_t mgmtDropDb(SDbObj *pDb) {
if (pDb->dropStatus == TSDB_DB_STATUS_DROPPING) {
bool finished = mgmtCheckDropDbFinished(pDb);
if (!finished) {
SVgObj *pVgroup = pDb->pHead;
while (pVgroup != NULL) {
//mgmtSendDropVgroupMsg(pVgroup, NULL);
pVgroup = pVgroup->next;
}
return TSDB_CODE_ACTION_IN_PROGRESS;
}
// don't sync this action
pDb->dropStatus = TSDB_DB_STATUS_DROP_FROM_SDB;
mgmtDropDbFromSdb(pDb);
return 0;
} else {
int32_t code = mgmtSetDbDropping(pDb);
if (code != 0) return code;
return TSDB_CODE_ACTION_IN_PROGRESS;
}
}
UNUSED_FUNC
static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) {
SDbObj *pDb = (SDbObj *)sdbGetRow(tsDbSdb, name);
if (pDb == NULL) {
if (ignoreNotExists) return TSDB_CODE_SUCCESS;
mWarn("db:%s is not there", name);
return TSDB_CODE_INVALID_DB;
}
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
return TSDB_CODE_MONITOR_DB_FORBIDDEN;
}
return mgmtDropDb(pDb);
}
bool mgmtCheckIsMonitorDB(char *db, char *monitordb) {
char dbName[TSDB_DB_NAME_LEN + 1] = {0};
extractDBName(db, dbName);
......@@ -430,20 +301,6 @@ bool mgmtCheckIsMonitorDB(char *db, char *monitordb) {
return (strncasecmp(dbName, monitordb, len) == 0 && len == strlen(monitordb));
}
UNUSED_FUNC
static void mgmtMonitorDbDrop(void *unused, void *unusedt) {
void * pNode = NULL;
SDbObj *pDb = NULL;
while (1) {
pNode = sdbFetchRow(tsDbSdb, pNode, (void **)&pDb);
if (pDb == NULL) break;
if (pDb->dropStatus != TSDB_DB_STATUS_DROPPING) continue;
mgmtDropDb(pDb);
break;
}
}
static int32_t mgmtAlterDb(SAcctObj *pAcct, SCMAlterDbMsg *pAlter) {
return 0;
// int32_t code = TSDB_CODE_SUCCESS;
......@@ -840,7 +697,6 @@ void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize) {
pDb->pTail = NULL;
pDb->numOfVgroups = 0;
pDb->numOfTables = 0;
pDb->vgTimer = NULL;
mgmtAddDbIntoAcct(pAcct, pDb);
return NULL;
......@@ -851,6 +707,10 @@ void *mgmtDbActionDelete(void *row, char *str, int32_t size, int32_t *ssize) {
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
mgmtRemoveDbFromAcct(pAcct, pDb);
mgmtDropAllNormalTables(pDb);
mgmtDropAllChildTables(pDb);
mgmtDropAllSuperTables(pDb);
return NULL;
}
......@@ -906,6 +766,10 @@ void mgmtRemoveTableFromDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfTables, -1);
}
static void mgmtSetDbDirty(SDbObj *pDb) {
pDb->dirty = true;
}
static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) {
if (mgmtCheckRedirect(pMsg->thandle)) return;
......@@ -919,7 +783,6 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) {
pCreate->commitTime = htonl(pCreate->commitTime);
pCreate->blocksPerTable = htons(pCreate->blocksPerTable);
pCreate->rowsInFileBlock = htonl(pCreate->rowsInFileBlock);
// pCreate->cacheNumOfBlocks = htonl(pCreate->cacheNumOfBlocks);
int32_t code;
if (mgmtCheckExpired()) {
......@@ -957,21 +820,76 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
mgmtSendSimpleResp(pMsg->thandle, code);
}
static void mgmtDropDb(void *handle, void *tmrId) {
SQueuedMsg *newMsg = handle;
SDbObj *pDb = newMsg->ahandle;
mPrint("db:%s, drop db from sdb", pDb->name);
int32_t code = sdbDeleteRow(tsDbSdb, pDb);
if (code != 0) {
code = TSDB_CODE_SDB_ERROR;
}
mgmtSendSimpleResp(newMsg->thandle, code);
rpcFreeCont(newMsg->pCont);
free(newMsg);
}
static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
if (mgmtCheckRedirect(pMsg->thandle)) return;
int32_t code;
if (pMsg->pUser->superAuth) {
//SCMDropDbMsg *pDrop = rpcMsg->pCont;
//rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists);
//if (rpcRsp.code == TSDB_CODE_SUCCESS) {
// mLPrint("DB:%s is dropped by %s", pDrop->db, pUser->user);
//}
} else {
code = TSDB_CODE_NO_RIGHTS;
SCMDropDbMsg *pDrop = pMsg->pCont;
mTrace("db:%s, drop db msg is received from thandle:%p", pDrop->db, pMsg->thandle);
if (mgmtCheckExpired()) {
mError("db:%s, failed to drop, grant expired", pDrop->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED);
return;
}
if (!pMsg->pUser->writeAuth) {
mError("db:%s, failed to drop, no rights", pDrop->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
return;
}
SDbObj *pDb = mgmtGetDb(pDrop->db);
if (pDb == NULL) {
if (pDrop->ignoreNotExists) {
mTrace("db:%s, db is not exist, think drop success", pDrop->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
return;
} else {
mError("db:%s, failed to drop, invalid db", pDrop->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB);
return;
}
}
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
mError("db:%s, can't drop monitor database", pDrop->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN);
return;
}
if (code != TSDB_CODE_SUCCESS) {
mgmtSendSimpleResp(pMsg->thandle, code);
mgmtSetDbDirty(pDb);
SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg));
memcpy(newMsg, pMsg, sizeof(SQueuedMsg));
pMsg->pCont = NULL;
SVgObj *pVgroup = pDb->pHead;
if (pVgroup != NULL) {
mPrint("vgroup:%d, will be dropped", pVgroup->vgId);
newMsg->ahandle = pVgroup;
newMsg->expected = pVgroup->numOfVnodes;
mgmtDropVgroup(pVgroup, newMsg);
return;
}
mTrace("db:%s, all vgroups is dropped", pDb->name);
void *tmpTmr;
newMsg->ahandle = pDb;
taosTmrReset(mgmtDropDb, 10, newMsg, tsMgmtTmr, &tmpTmr);
}
......@@ -125,7 +125,6 @@ void *mgmtNormalTableActionDelete(void *row, char *str, int32_t size, int32_t *s
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId);
return NULL;
}
......@@ -540,14 +539,35 @@ int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMeta
for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
if (usePublicIp) {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
} else {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
}
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId);
}
pMeta->numOfVpeers = pVgroup->numOfVnodes;
return TSDB_CODE_SUCCESS;
}
void mgmtDropAllNormalTables(SDbObj *pDropDb) {
void *pNode = NULL;
void *pLastNode = NULL;
int32_t numOfTables = 0;
int32_t dbNameLen = strlen(pDropDb->name);
SNormalTableObj *pTable = NULL;
while (1) {
pNode = sdbFetchRow(tsNormalTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) break;
if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) {
sdbDeleteRow(tsNormalTableSdb, pTable);
pNode = pLastNode;
numOfTables ++;
continue;
}
}
mTrace("db:%s, all normal tables:%d is dropped", pDropDb->name, numOfTables);
}
......@@ -242,6 +242,8 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) {
}
SShowObj *pShow = (SShowObj *)pRetrieve->qhandle;
mTrace("show:%p, type:%s, retrieve data", pShow, taosGetShowTypeStr(pShow->type));
if (!mgmtCheckQhandle(pRetrieve->qhandle)) {
mError("pShow:%p, query memory is corrupted", pShow);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MEMORY_CORRUPTED);
......
......@@ -612,6 +612,30 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
return numOfRows;
}
void mgmtDropAllSuperTables(SDbObj *pDropDb) {
void *pNode = NULL;
void *pLastNode = NULL;
int32_t numOfTables = 0;
int32_t dbNameLen = strlen(pDropDb->name);
SSuperTableObj *pTable = NULL;
while (1) {
pNode = sdbFetchRow(tsSuperTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) {
break;
}
if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) {
sdbDeleteRow(tsSuperTableSdb, pTable);
pNode = pLastNode;
numOfTables ++;
continue;
}
}
mTrace("db:%s, all super tables:%d is dropped", pDropDb->name, numOfTables);
}
void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable) {
pStable->numOfTables++;
}
......
......@@ -346,13 +346,6 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *
return numOfRows;
}
SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable) {
SMDDropTableMsg *pRemove = NULL;
return pRemove;
}
void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
if (mgmtCheckRedirect(pMsg->thandle)) return;
......@@ -439,7 +432,7 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
return;
}
pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, pTable);
pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, (SChildTableObj *) pTable);
if (pMDCreate == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
return;
......@@ -451,7 +444,7 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
return;
}
pMDCreate = mgmtBuildCreateNormalTableMsg(pTable);
pMDCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable);
if (pMDCreate == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
return;
......@@ -475,7 +468,7 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) {
if (mgmtCheckRedirect(pMsg->thandle)) return;
SCMDropTableMsg *pDrop = pMsg->pCont;
mTrace("table:%s, drop msg is received from thandle:%p", pDrop->tableId, pMsg->thandle);
mTrace("table:%s, drop table msg is received from thandle:%p", pDrop->tableId, pMsg->thandle);
if (mgmtCheckExpired()) {
mError("table:%s, failed to drop, grant expired", pDrop->tableId);
......@@ -832,7 +825,7 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) {
if (pVgroup->numOfTables <= 0) {
mPrint("vgroup:%d, all tables is dropped, drop vgroup", pVgroup->vgId);
mgmtDropVgroup(pVgroup);
mgmtDropVgroup(pVgroup, NULL);
}
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SUCCESS);
......
......@@ -138,18 +138,6 @@ SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) {
return pDb->pHead;
}
void mgmtProcessVgTimer(void *handle, void *tmrId) {
SDbObj *pDb = (SDbObj *)handle;
if (pDb == NULL) return;
if (pDb->vgStatus > TSDB_VG_STATUS_IN_PROGRESS) {
mTrace("db:%s, set vgroup status from %d to ready", pDb->name, pDb->vgStatus);
pDb->vgStatus = TSDB_VG_STATUS_READY;
}
pDb->vgTimer = NULL;
}
void mgmtCreateVgroup(SQueuedMsg *pMsg) {
SDbObj *pDb = pMsg->pDb;
if (pDb == NULL) {
......@@ -188,22 +176,14 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) {
mgmtSendCreateVgroupMsg(pVgroup, pMsg);
}
int32_t mgmtDropVgroup(SVgObj *pVgroup) {
// STableInfo *pTable;
if (pVgroup->numOfTables > 0) {
// for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) {
// if (pVgroup->tableList != NULL) {
// pTable = pVgroup->tableList[i];
// if (pTable) mgmtDropTable(pDb, pTable->tableId, 0);
// }
// }
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle) {
if (ahandle != NULL) {
mgmtSendDropVgroupMsg(pVgroup, ahandle);
} else {
mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes);
mgmtSendDropVgroupMsg(pVgroup, NULL);
sdbDeleteRow(tsVgroupSdb, pVgroup);
}
mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes);
mgmtSendDropVgroupMsg(pVgroup, NULL);
sdbDeleteRow(tsVgroupSdb, pVgroup);
return TSDB_CODE_SUCCESS;
}
void mgmtSetVgroupIdPool() {
......@@ -666,4 +646,33 @@ static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
mTrace("drop vnode msg is received");
if (rpcMsg->handle == NULL) return;
SQueuedMsg *queueMsg = rpcMsg->handle;
queueMsg->received++;
if (rpcMsg->code == TSDB_CODE_SUCCESS) {
queueMsg->code = rpcMsg->code;
queueMsg->successed++;
}
SVgObj *pVgroup = queueMsg->ahandle;
mTrace("vgroup:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p",
pVgroup->vgId, tstrerror(rpcMsg->code), queueMsg->received, queueMsg->successed, queueMsg->expected,
queueMsg->thandle, rpcMsg->handle);
if (queueMsg->received != queueMsg->expected) return;
sdbDeleteRow(tsVgroupSdb, pVgroup);
SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg));
newMsg->msgType = queueMsg->msgType;
newMsg->thandle = queueMsg->thandle;
newMsg->pDb = queueMsg->pDb;
newMsg->pUser = queueMsg->pUser;
newMsg->contLen = queueMsg->contLen;
newMsg->pCont = rpcMallocCont(newMsg->contLen);
memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen);
mgmtAddToShellQueue(newMsg);
free(queueMsg);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册