未验证 提交 04e0fa80 编写于 作者: H hzcheng 提交者: GitHub

Merge pull request #1381 from taosdata/refact/slguan

Refact/slguan
......@@ -180,7 +180,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
pSql->ipList->ip[0] = inet_addr("192.168.0.1");
pSql->ipList->ip[0] = inet_addr(tsPrivateIp);
if (pSql->cmd.command < TSDB_SQL_MGMT) {
pSql->ipList->port = tsDnodeShellPort;
......@@ -197,7 +197,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg);
} else {
pSql->ipList->port = tsMnodeShellPort;
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
SRpcMsg rpcMsg = {
.msgType = pSql->cmd.msgType,
......@@ -213,7 +213,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
tscPrint("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code));
tscTrace("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code));
SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
if (pSql == NULL || pSql->signature != pSql) {
tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
......@@ -2032,6 +2032,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tscClearFieldInfo(&pQueryInfo->fieldsInfo);
msgLen = pMsg - (char*)pCreateTableMsg;
pCreateTableMsg->contLen = htonl(msgLen);
pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
......
......@@ -70,25 +70,31 @@ void dnodeCleanupRead() {
}
void dnodeRead(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont;
int32_t contLen = 0;
int32_t numOfVnodes = 0;
int32_t vgId = 0;
SRpcContext *pRpcContext = NULL;
// parse head, get number of vnodes;
if ( numOfVnodes > 1) {
pRpcContext = calloc(sizeof(SRpcContext), 1);
pRpcContext->numOfVnodes = 1;
// SMsgDesc *pDesc = pCont;
// pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
// pCont += sizeof(SMsgDesc);
// if (pDesc->numOfVnodes > 1) {
// pRpcContext = calloc(sizeof(SRpcContext), 1);
// pRpcContext->numOfVnodes = pDesc->numOfVnodes;
// }
if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) {
queuedMsgNum = 0;
}
while (leftLen > 0) {
// todo: parse head, get vgId, contLen
SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = 1; //htonl(pHead->vgId);
pHead->contLen = pMsg->contLen; //htonl(pHead->contLen);
// get pVnode from vgId
void *pVnode = dnodeGetVnode(vgId);
void *pVnode = dnodeGetVnode(pHead->vgId);
if (pVnode == NULL) {
leftLen -= pHead->contLen;
pCont -= pHead->contLen;
continue;
}
......@@ -96,7 +102,7 @@ void dnodeRead(SRpcMsg *pMsg) {
SReadMsg readMsg;
readMsg.rpcMsg = *pMsg;
readMsg.pCont = pCont;
readMsg.contLen = contLen;
readMsg.contLen = pHead->contLen;
readMsg.pRpcContext = pRpcContext;
readMsg.pVnode = pVnode;
......@@ -104,11 +110,23 @@ void dnodeRead(SRpcMsg *pMsg) {
taosWriteQitem(queue, &readMsg);
// next vnode
leftLen -= contLen;
pCont -= contLen;
leftLen -= pHead->contLen;
pCont -= pHead->contLen;
queuedMsgNum++;
dnodeReleaseVnode(pVnode);
}
if (queuedMsgNum == 0) {
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.pCont = NULL,
.contLen = 0,
.code = TSDB_CODE_INVALID_VGROUP_ID,
.msgType = 0
};
rpcSendResponse(&rpcRsp);
}
}
void *dnodeAllocateReadWorker() {
......
......@@ -93,21 +93,18 @@ void dnodeWrite(SRpcMsg *pMsg) {
char *pCont = (char *) pMsg->pCont;
SRpcContext *pRpcContext = NULL;
int32_t numOfVnodes = 0;
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
// TODO parse head, get number of vnodes;
numOfVnodes = 1;
} else {
numOfVnodes = 1;
}
if (numOfVnodes > 1) {
pRpcContext = calloc(sizeof(SRpcContext), 1);
pRpcContext->numOfVnodes = numOfVnodes;
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) {
SMsgDesc *pDesc = pCont;
pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
pCont += sizeof(SMsgDesc);
if (pDesc->numOfVnodes > 1) {
pRpcContext = calloc(sizeof(SRpcContext), 1);
pRpcContext->numOfVnodes = pDesc->numOfVnodes;
}
}
while (leftLen > 0) {
SWriteMsgHead *pHead = (SWriteMsgHead *) pCont;
SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen);
......@@ -291,26 +288,9 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
dTrace("table:%s, start to create in dnode, vgroup:%d", pTable->tableId, pTable->vgId);
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
void *pVnode = dnodeGetVnode(pTable->vgId);
if (pVnode == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_VGROUP_ID;
dTrace("table:%s, failed to create in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp);
return;
}
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
if (pTsdb == NULL) {
dnodeReleaseVnode(pVnode);
rpcRsp.code = TSDB_CODE_NOT_ACTIVE_VNODE;
dTrace("table:%s, failed to create in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp);
return;
}
dTrace("table:%s, start to create in dnode, vgroup:%d", pTable->tableId, pTable->vgId);
pTable->numOfColumns = htons(pTable->numOfColumns);
pTable->numOfTags = htons(pTable->numOfTags);
pTable->sid = htonl(pTable->sid);
......@@ -342,9 +322,8 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
for (int i = pTable->numOfColumns; i < totalCols; i++) {
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
}
tsdbTableSetSchema(&tCfg, pDestTagSchema, false);
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
// TODO: add data row
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
int accumBytes = 0;
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
......@@ -356,49 +335,107 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
tsdbTableSetTagValue(&tCfg, dataRow, false);
}
void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
rpcRsp.code = tsdbCreateTable(pTsdb, &tCfg);
dnodeReleaseVnode(pVnode);
dnodeReleaseVnode(pMsg->pVnode);
if (rpcRsp.code != TSDB_CODE_SUCCESS) {
dError("table:%s, failed to create in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp);
} else {
dTrace("table:%s, created in dnode", pTable->tableId);
rpcSendResponse(&rpcRsp);
}
dTrace("table:%s, create table result:%s", pTable->tableId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp);
}
static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) {
SMDDropTableMsg *pTable = pMsg->rpcMsg.pCont;
dPrint("table:%s, sid:%d is dropped", pTable->tableId, pTable->sid);
// pTable->sid = htonl(pTable->sid);
// pTable->numOfVPeers = htonl(pTable->numOfVPeers);
// pTable->uid = htobe64(pTable->uid);
//
// for (int i = 0; i < pTable->numOfVPeers; ++i) {
// pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip);
// pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode);
// }
//
// int32_t code = dnodeDropTable(pTable);
//
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
dTrace("table:%s, start to drop in dnode, vgroup:%d", pTable->tableId, pTable->vgId);
STableId tableId = {
.uid = htobe64(pTable->uid),
.tid = htonl(pTable->sid)
};
void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
rpcRsp.code = tsdbDropTable(pTsdb, tableId);
dnodeReleaseVnode(pMsg->pVnode);
dTrace("table:%s, drop table result:%s", pTable->tableId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp);
}
static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg) {
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
dPrint("table:%s, sid:%d is alterd", pTable->tableId, pTable->sid);
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
dTrace("table:%s, start to alter in dnode, vgroup:%d", pTable->tableId, pTable->vgId);
pTable->numOfColumns = htons(pTable->numOfColumns);
pTable->numOfTags = htons(pTable->numOfTags);
pTable->sid = htonl(pTable->sid);
pTable->sversion = htonl(pTable->sversion);
pTable->tagDataLen = htonl(pTable->tagDataLen);
pTable->sqlDataLen = htonl(pTable->sqlDataLen);
pTable->uid = htobe64(pTable->uid);
pTable->superTableUid = htobe64(pTable->superTableUid);
pTable->createdTime = htobe64(pTable->createdTime);
SSchema *pSchema = (SSchema *) pTable->data;
int totalCols = pTable->numOfColumns + pTable->numOfTags;
for (int i = 0; i < totalCols; i++) {
pSchema[i].colId = htons(pSchema[i].colId);
pSchema[i].bytes = htons(pSchema[i].bytes);
}
STableCfg tCfg;
tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid);
STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns);
for (int i = 0; i < pTable->numOfColumns; i++) {
tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
}
tsdbTableSetSchema(&tCfg, pDestSchema, false);
if (pTable->numOfTags != 0) {
STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags);
for (int i = pTable->numOfColumns; i < totalCols; i++) {
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
}
tsdbTableSetSchema(&tCfg, pDestTagSchema, false);
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
int accumBytes = 0;
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
for (int i = 0; i < pTable->numOfTags; i++) {
tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i);
accumBytes += pSchema[i + pTable->numOfColumns].bytes;
}
tsdbTableSetTagValue(&tCfg, dataRow, false);
}
void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
rpcRsp.code = tsdbAlterTable(pTsdb, &tCfg);
dnodeReleaseVnode(pMsg->pVnode);
dTrace("table:%s, alter table result:%s", pTable->tableId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp);
}
static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) {
SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont;
dPrint("stable:%s, is dropped", pTable->tableId);
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
dTrace("stable:%s, start to it drop in dnode, vgroup:%d", pTable->tableId, pTable->vgId);
pTable->uid = htobe64(pTable->uid);
// TODO: drop stable in vvnode
//void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
//rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid);
rpcRsp.code = TSDB_CODE_SUCCESS;
dnodeReleaseVnode(pMsg->pVnode);
dTrace("stable:%s, drop stable result:%s", pTable->tableId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp);
}
......@@ -39,7 +39,7 @@ extern "C" {
#include "ttimer.h"
#include "tutil.h"
typedef struct {
typedef struct {
uint32_t privateIp;
int32_t sid;
uint32_t moduleStatus;
......@@ -77,6 +77,7 @@ extern "C" {
} SDnodeObj;
typedef struct {
int32_t dnodeId;
uint32_t ip;
uint32_t publicIp;
int32_t vnode;
......@@ -97,6 +98,7 @@ struct _vg_obj;
typedef struct SSuperTableObj {
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
int8_t dirty;
uint64_t uid;
int32_t sid;
int32_t vgId;
......@@ -104,7 +106,7 @@ typedef struct SSuperTableObj {
int32_t sversion;
int32_t numOfColumns;
int32_t numOfTags;
int8_t reserved[7];
int8_t reserved[5];
int8_t updateEnd[1];
int32_t numOfTables;
int16_t nextColId;
......@@ -114,12 +116,13 @@ typedef struct SSuperTableObj {
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
int8_t dirty;
uint64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
char superTableId[TSDB_TABLE_ID_LEN + 1];
int8_t reserved[7];
int8_t reserved[1];
int8_t updateEnd[1];
SSuperTableObj *superTable;
} SChildTableObj;
......@@ -127,13 +130,14 @@ typedef struct {
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
int8_t dirty;
uint64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
int32_t sversion;
int32_t numOfColumns;
int16_t sqlLen;
int32_t sqlLen;
int8_t reserved[3];
int8_t updateEnd[1];
char* sql; //null-terminated string
......@@ -162,6 +166,7 @@ typedef struct _vg_obj {
typedef struct _db_obj {
char name[TSDB_DB_NAME_LEN + 1];
int8_t dirty;
int64_t createdTime;
SDbCfg cfg;
int8_t dropStatus;
......@@ -171,10 +176,8 @@ typedef struct _db_obj {
int32_t numOfVgroups;
int32_t numOfTables;
int32_t numOfSuperTables;
int32_t vgStatus;
SVgObj *pHead; // empty vgroup first
SVgObj *pTail; // empty vgroup end
void * vgTimer;
SVgObj *pHead;
SVgObj *pTail;
} SDbObj;
struct _acctObj;
......
......@@ -234,10 +234,14 @@ typedef struct {
uint32_t ip;
} SVnodeDesc;
typedef struct {
int32_t numOfVnodes;
} SMsgDesc;
typedef struct {
int32_t contLen;
int32_t vgId;
} SWriteMsgHead;
} SMsgHead;
typedef struct {
int32_t contLen;
......@@ -264,7 +268,8 @@ typedef struct {
int16_t numOfTags;
int16_t numOfColumns;
int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string
int16_t reserved[16];
int32_t contLen;
int8_t reserved[16];
char schema[];
} SCMCreateTableMsg;
......@@ -332,22 +337,22 @@ typedef struct {
} SMgmtHead;
typedef struct {
int32_t contLen;
int32_t vgId;
int32_t sid;
int32_t numOfVPeers;
uint64_t uid;
SVnodeDesc vpeerDesc[TSDB_MAX_MPEERS];
char tableId[TSDB_TABLE_ID_LEN + 1];
} SMDDropTableMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
int32_t contLen;
int32_t vgId;
int64_t uid;
char tableId[TSDB_TABLE_ID_LEN + 1];
} SMDDropSTableMsg;
typedef struct {
int32_t vgId;
int32_t vnode;
int32_t vgId;
} SMDDropVnodeMsg;
typedef struct SColIndexEx {
......
......@@ -30,13 +30,16 @@ int32_t mgmtInitChildTables();
void mgmtCleanUpChildTables();
void * mgmtGetChildTable(char *tableId);
int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable);
void *mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pCreate, SChildTableObj *pTable);
int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable);
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent);
int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp);
void mgmtDropAllChildTables(SDbObj *pDropDb);
#ifdef __cplusplus
}
#endif
......
......@@ -28,14 +28,17 @@ int32_t mgmtInitNormalTables();
void mgmtCleanUpNormalTables();
void * mgmtGetNormalTable(char *tableId);
int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable);
void * mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
void * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable);
int32_t mgmtDropNormalTable(SQueuedMsg *newMsg, SNormalTableObj *pTable);
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols);
int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName);
int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMeta *pMeta, bool usePublicIp);
void mgmtDropAllNormalTables(SDbObj *pDropDb);
#ifdef __cplusplus
}
#endif
......
......@@ -31,8 +31,8 @@ void mgmtCleanUpSuperTables();
void * mgmtGetSuperTable(char *tableId);
int32_t mgmtCreateSuperTable(SDbObj *pDb, SCMCreateTableMsg *pCreate);
int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pTable);
int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate);
int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pTable);
int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags);
int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName);
int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pTable, char *oldTagName, char *newTagName);
......@@ -45,6 +45,8 @@ void * mgmtGetSuperTableVgroup(SSuperTableObj *pStable);
int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName);
int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable);
void mgmtDropAllSuperTables(SDbObj *pDropDb);
#ifdef __cplusplus
}
#endif
......
......@@ -32,16 +32,9 @@ STableInfo* mgmtGetTable(char *tableId);
STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid);
int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp);
int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo);
int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore);
int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter);
void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable);
void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable);
SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable);
SMDDropSTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable);
#ifdef __cplusplus
}
#endif
......
......@@ -30,7 +30,7 @@ SVgObj *mgmtGetVgroup(int32_t vgId);
SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode);
void mgmtCreateVgroup(SQueuedMsg *pMsg);
int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup);
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle);
void mgmtUpdateVgroup(SVgObj *pVgroup);
void mgmtSetVgroupIdPool();
......@@ -41,7 +41,6 @@ void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable);
SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode);
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup);
SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip);
......
......@@ -123,7 +123,6 @@ void *mgmtChildTableActionDelete(void *row, char *str, int32_t size, int32_t *ss
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId);
return NULL;
}
......@@ -272,19 +271,22 @@ void mgmtCleanUpChildTables() {
sdbCloseTable(tsChildTableSdb);
}
static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, void *pTagData, int32_t tagDataLen) {
int32_t totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags;
int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen;
void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableObj *pTable) {
char *pTagData = pMsg->schema + TSDB_TABLE_ID_LEN + 1;
int32_t tagDataLen = htonl(pMsg->contLen) - sizeof(SCMCreateTableMsg) - TSDB_TABLE_ID_LEN - 1;
int32_t totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags;
int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen;
SMDCreateTableMsg *pCreate = rpcMallocCont(contLen);
if (pCreate == NULL) {
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL;
}
memcpy(pCreate->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
memcpy(pCreate->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN);
memcpy(pCreate->tableId, pTable->tableId, TSDB_TABLE_ID_LEN + 1);
memcpy(pCreate->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN + 1);
pCreate->contLen = htonl(contLen);
pCreate->vgId = htonl(pVgroup->vgId);
pCreate->vgId = htonl(pTable->vgId);
pCreate->tableType = pTable->type;
pCreate->numOfColumns = htons(pTable->superTable->numOfColumns);
pCreate->numOfTags = htons(pTable->superTable->numOfTags);
......@@ -305,103 +307,85 @@ static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgrou
}
memcpy(pCreate->data + totalCols * sizeof(SSchema), pTagData, tagDataLen);
return pCreate;
}
int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SMDCreateTableMsg **pMDCreateOut, STableInfo **pTableOut) {
void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t tid) {
int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb);
if (numOfTables >= tsMaxTables) {
mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, tsMaxTables);
return TSDB_CODE_TOO_MANY_TABLES;
terrno = TSDB_CODE_TOO_MANY_TABLES;
return NULL;
}
char *pTagData = (char *) pCreate->schema; // it is a tag key
SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData);
if (pSuperTable == NULL) {
mError("table:%s, corresponding super table does not exist", pCreate->tableId);
return TSDB_CODE_INVALID_TABLE;
terrno = TSDB_CODE_INVALID_TABLE;
return NULL;
}
SChildTableObj *pTable = (SChildTableObj *) calloc(sizeof(SChildTableObj), 1);
if (pTable == NULL) {
mError("table:%s, failed to alloc memory", pCreate->tableId);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL;
}
strcpy(pTable->tableId, pCreate->tableId);
strcpy(pTable->superTableId, pSuperTable->tableId);
pTable->type = TSDB_CHILD_TABLE;
pTable->createdTime = taosGetTimestampMs();
pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) +
((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
pTable->sid = sid;
pTable->sid = tid;
pTable->vgId = pVgroup->vgId;
pTable->createdTime = taosGetTimestampMs();
pTable->superTable = pSuperTable;
if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) {
free(pTable);
mError("table:%s, update sdb error", pCreate->tableId);
return TSDB_CODE_SDB_ERROR;
}
pTagData += (TSDB_TABLE_ID_LEN + 1);
int32_t tagDataLen = contLen - sizeof(SCMCreateTableMsg) - TSDB_TABLE_ID_LEN - 1;
*pMDCreateOut = mgmtBuildCreateChildTableMsg(pTable, pVgroup, pTagData, tagDataLen);
if (*pMDCreateOut == NULL) {
mError("table:%s, failed to build create table message", pCreate->tableId);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
terrno = TSDB_CODE_SDB_ERROR;
return NULL;
}
*pTableOut = (STableInfo *) pTable;
mTrace("table:%s, create ctable in vgroup, uid:%" PRIu64 , pTable->tableId, pTable->uid);
return TSDB_CODE_SUCCESS;
return pTable;
}
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) {
int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable) {
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("table:%s, failed to drop child table, vgroup not exist", pTable->tableId);
return TSDB_CODE_OTHERS;
}
SMDDropTableMsg *pRemove = rpcMallocCont(sizeof(SMDDropTableMsg));
if (pRemove == NULL) {
SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg));
if (pDrop == NULL) {
mError("table:%s, failed to drop child table, no enough memory", pTable->tableId);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
strcpy(pRemove->tableId, pTable->tableId);
pRemove->sid = htonl(pTable->sid);
pRemove->uid = htobe64(pTable->uid);
pRemove->numOfVPeers = htonl(pVgroup->numOfVnodes);
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
pRemove->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip);
pRemove->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
}
strcpy(pDrop->tableId, pTable->tableId);
pDrop->vgId = htonl(pTable->vgId);
pDrop->contLen = htonl(sizeof(SMDDropTableMsg));
pDrop->sid = htonl(pTable->sid);
pDrop->uid = htobe64(pTable->uid);
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
mTrace("table:%s, send drop table msg", pRemove->tableId);
mTrace("table:%s, send drop table msg", pDrop->tableId);
SRpcMsg rpcMsg = {
.handle = 0,
.pCont = pRemove,
.handle = newMsg,
.pCont = pDrop,
.contLen = sizeof(SMDDropTableMsg),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_DROP_TABLE
};
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) {
mError("table:%s, update ctables sdb error", pTable->tableId);
return TSDB_CODE_SDB_ERROR;
}
if (pVgroup->numOfTables <= 0) {
mgmtDropVgroup(pDb, pVgroup);
}
newMsg->ahandle = pTable;
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
return TSDB_CODE_SUCCESS;
}
......@@ -477,13 +461,37 @@ int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *p
for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
if (usePublicIp) {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
} else {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
}
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId);
}
pMeta->numOfVpeers = pVgroup->numOfVnodes;
return TSDB_CODE_SUCCESS;
}
void mgmtDropAllChildTables(SDbObj *pDropDb) {
void *pNode = NULL;
void *pLastNode = NULL;
int32_t numOfTables = 0;
int32_t dbNameLen = strlen(pDropDb->name);
SChildTableObj *pTable = NULL;
while (1) {
pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) {
break;
}
if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) {
sdbDeleteRow(tsChildTableSdb, pTable);
pNode = pLastNode;
numOfTables ++;
continue;
}
}
mTrace("db:%s, all child tables:%d is dropped", pDropDb->name, numOfTables);
}
\ No newline at end of file
......@@ -27,6 +27,9 @@
#include "mgmtMnode.h"
#include "mgmtGrant.h"
#include "mgmtShell.h"
#include "mgmtNormalTable.h"
#include "mgmtChildTable.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
......@@ -34,10 +37,9 @@
static void *tsDbSdb = NULL;
static int32_t tsDbUpdateSize;
static int32_t mgmtUpdateDb(SDbObj *pDb);
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate);
static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists);
static int32_t mgmtDropDb(SDbObj *pDb);
static void mgmtDropDb(void *handle, void *tmrId);
static void mgmtSetDbDirty(SDbObj *pDb);
static int32_t mgmtGetDbMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
......@@ -60,7 +62,7 @@ static void mgmtDbActionInit() {
mgmtDbActionFp[SDB_TYPE_UPDATE] = mgmtDbActionUpdate;
mgmtDbActionFp[SDB_TYPE_ENCODE] = mgmtDbActionEncode;
mgmtDbActionFp[SDB_TYPE_DECODE] = mgmtDbActionDecode;
mgmtDbActionFp[SDB_TYPE_RESET] = mgmtDbActionReset;
mgmtDbActionFp[SDB_TYPE_RESET] = mgmtDbActionReset;
mgmtDbActionFp[SDB_TYPE_DESTROY] = mgmtDbActionDestroy;
}
......@@ -98,8 +100,6 @@ int32_t mgmtInitDbs() {
pDb->numOfTables = 0;
pDb->numOfVgroups = 0;
pDb->numOfSuperTables = 0;
pDb->vgStatus = TSDB_VG_STATUS_READY;
pDb->vgTimer = NULL;
pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct != NULL)
mgmtAddDbIntoAcct(pAcct, pDb);
......@@ -293,135 +293,6 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
return code;
}
static int32_t mgmtUpdateDb(SDbObj *pDb) {
return sdbUpdateRow(tsDbSdb, pDb, tsDbUpdateSize, 1);
}
static int32_t mgmtSetDbDropping(SDbObj *pDb) {
if (pDb->dropStatus == TSDB_DB_STATUS_DROP_FROM_SDB) return 0;
SVgObj *pVgroup = pDb->pHead;
while (pVgroup != NULL) {
for (int32_t i = 0; i < pVgroup->numOfVnodes; i++) {
SVnodeGid *pVnodeGid = pVgroup->vnodeGid + i;
SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip);
if (pDnode == NULL) continue;
SVnodeLoad *pVload = &pDnode->vload[pVnodeGid->vnode];
if (pVload->dropStatus != TSDB_VN_DROP_STATUS_DROPPING) {
pVload->dropStatus = TSDB_VN_DROP_STATUS_DROPPING;
mPrint("dnode:%s vnode:%d db:%s set to dropping status", taosIpStr(pDnode->privateIp), pVnodeGid->vnode, pDb->name);
if (mgmtUpdateDnode(pDnode) < 0) {
mError("db:%s drop failed, dnode sdb update error", pDb->name);
return TSDB_CODE_SDB_ERROR;
}
}
}
//void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
// mTrace("vgroup:%d send free vgroup msg, ahandle:%p", pVgroup->vgId, ahandle);
//
// for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
// mgmtSendDropVnodeMsg(pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
// }
//}
//
// mgmtSendDropVgroupMsg(pVgroup);
pVgroup = pVgroup->next;
}
if (pDb->dropStatus == TSDB_DB_STATUS_DROPPING) return 0;
pDb->dropStatus = TSDB_DB_STATUS_DROPPING;
if (mgmtUpdateDb(pDb) < 0) {
mError("db:%s drop failed, db sdb update error", pDb->name);
return TSDB_CODE_SDB_ERROR;
}
mPrint("db:%s set to dropping status", pDb->name);
return 0;
}
static bool mgmtCheckDropDbFinished(SDbObj *pDb) {
SVgObj *pVgroup = pDb->pHead;
while (pVgroup) {
for (int32_t i = 0; i < pVgroup->numOfVnodes; i++) {
SVnodeGid *pVnodeGid = pVgroup->vnodeGid + i;
SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip);
if (pDnode == NULL) continue;
if (pDnode->status == TSDB_DN_STATUS_OFFLINE) continue;
SVnodeLoad *pVload = &pDnode->vload[pVnodeGid->vnode];
if (pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) {
mTrace("dnode:%s, vnode:%d db:%s wait dropping", taosIpStr(pDnode->privateIp), pVnodeGid->vnode, pDb->name);
return false;
}
}
pVgroup = pVgroup->next;
}
mPrint("db:%s all vnodes drop finished", pDb->name);
return true;
}
static void mgmtDropDbFromSdb(SDbObj *pDb) {
while (pDb->pHead) mgmtDropVgroup(pDb, pDb->pHead);
// SSuperTableObj *pMetric = pDb->pSTable;
// while (pMetric) {
// SSuperTableObj *pNext = pMetric->next;
// mgmtDropTable(pDb, pMetric->tableId, 0);
// pMetric = pNext;
// }
mPrint("db:%s all meters drop finished", pDb->name);
sdbDeleteRow(tsDbSdb, pDb);
mPrint("db:%s database drop finished", pDb->name);
}
static int32_t mgmtDropDb(SDbObj *pDb) {
if (pDb->dropStatus == TSDB_DB_STATUS_DROPPING) {
bool finished = mgmtCheckDropDbFinished(pDb);
if (!finished) {
SVgObj *pVgroup = pDb->pHead;
while (pVgroup != NULL) {
//mgmtSendDropVgroupMsg(pVgroup, NULL);
pVgroup = pVgroup->next;
}
return TSDB_CODE_ACTION_IN_PROGRESS;
}
// don't sync this action
pDb->dropStatus = TSDB_DB_STATUS_DROP_FROM_SDB;
mgmtDropDbFromSdb(pDb);
return 0;
} else {
int32_t code = mgmtSetDbDropping(pDb);
if (code != 0) return code;
return TSDB_CODE_ACTION_IN_PROGRESS;
}
}
UNUSED_FUNC
static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) {
SDbObj *pDb = (SDbObj *)sdbGetRow(tsDbSdb, name);
if (pDb == NULL) {
if (ignoreNotExists) return TSDB_CODE_SUCCESS;
mWarn("db:%s is not there", name);
return TSDB_CODE_INVALID_DB;
}
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
return TSDB_CODE_MONITOR_DB_FORBIDDEN;
}
return mgmtDropDb(pDb);
}
bool mgmtCheckIsMonitorDB(char *db, char *monitordb) {
char dbName[TSDB_DB_NAME_LEN + 1] = {0};
extractDBName(db, dbName);
......@@ -430,20 +301,6 @@ bool mgmtCheckIsMonitorDB(char *db, char *monitordb) {
return (strncasecmp(dbName, monitordb, len) == 0 && len == strlen(monitordb));
}
UNUSED_FUNC
static void mgmtMonitorDbDrop(void *unused, void *unusedt) {
void * pNode = NULL;
SDbObj *pDb = NULL;
while (1) {
pNode = sdbFetchRow(tsDbSdb, pNode, (void **)&pDb);
if (pDb == NULL) break;
if (pDb->dropStatus != TSDB_DB_STATUS_DROPPING) continue;
mgmtDropDb(pDb);
break;
}
}
static int32_t mgmtAlterDb(SAcctObj *pAcct, SCMAlterDbMsg *pAlter) {
return 0;
// int32_t code = TSDB_CODE_SUCCESS;
......@@ -840,7 +697,6 @@ void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize) {
pDb->pTail = NULL;
pDb->numOfVgroups = 0;
pDb->numOfTables = 0;
pDb->vgTimer = NULL;
mgmtAddDbIntoAcct(pAcct, pDb);
return NULL;
......@@ -851,6 +707,10 @@ void *mgmtDbActionDelete(void *row, char *str, int32_t size, int32_t *ssize) {
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
mgmtRemoveDbFromAcct(pAcct, pDb);
mgmtDropAllNormalTables(pDb);
mgmtDropAllChildTables(pDb);
mgmtDropAllSuperTables(pDb);
return NULL;
}
......@@ -906,6 +766,10 @@ void mgmtRemoveTableFromDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfTables, -1);
}
static void mgmtSetDbDirty(SDbObj *pDb) {
pDb->dirty = true;
}
static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) {
if (mgmtCheckRedirect(pMsg->thandle)) return;
......@@ -919,7 +783,6 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) {
pCreate->commitTime = htonl(pCreate->commitTime);
pCreate->blocksPerTable = htons(pCreate->blocksPerTable);
pCreate->rowsInFileBlock = htonl(pCreate->rowsInFileBlock);
// pCreate->cacheNumOfBlocks = htonl(pCreate->cacheNumOfBlocks);
int32_t code;
if (mgmtCheckExpired()) {
......@@ -957,22 +820,76 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
mgmtSendSimpleResp(pMsg->thandle, code);
}
static void mgmtDropDb(void *handle, void *tmrId) {
SQueuedMsg *newMsg = handle;
SDbObj *pDb = newMsg->ahandle;
mPrint("db:%s, drop db from sdb", pDb->name);
int32_t code = sdbDeleteRow(tsDbSdb, pDb);
if (code != 0) {
code = TSDB_CODE_SDB_ERROR;
}
mgmtSendSimpleResp(newMsg->thandle, code);
rpcFreeCont(newMsg->pCont);
free(newMsg);
}
static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
if (mgmtCheckRedirect(pMsg->thandle)) return;
int32_t code;
if (pMsg->pUser->superAuth) {
code = TSDB_CODE_OPS_NOT_SUPPORT;
//SCMDropDbMsg *pDrop = rpcMsg->pCont;
//rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists);
//if (rpcRsp.code == TSDB_CODE_SUCCESS) {
// mLPrint("DB:%s is dropped by %s", pDrop->db, pUser->user);
//}
} else {
code = TSDB_CODE_NO_RIGHTS;
SCMDropDbMsg *pDrop = pMsg->pCont;
mTrace("db:%s, drop db msg is received from thandle:%p", pDrop->db, pMsg->thandle);
if (mgmtCheckExpired()) {
mError("db:%s, failed to drop, grant expired", pDrop->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED);
return;
}
if (!pMsg->pUser->writeAuth) {
mError("db:%s, failed to drop, no rights", pDrop->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
return;
}
SDbObj *pDb = mgmtGetDb(pDrop->db);
if (pDb == NULL) {
if (pDrop->ignoreNotExists) {
mTrace("db:%s, db is not exist, think drop success", pDrop->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
return;
} else {
mError("db:%s, failed to drop, invalid db", pDrop->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB);
return;
}
}
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
mError("db:%s, can't drop monitor database", pDrop->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN);
return;
}
if (code != TSDB_CODE_SUCCESS) {
mgmtSendSimpleResp(pMsg->thandle, code);
mgmtSetDbDirty(pDb);
SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg));
memcpy(newMsg, pMsg, sizeof(SQueuedMsg));
pMsg->pCont = NULL;
SVgObj *pVgroup = pDb->pHead;
if (pVgroup != NULL) {
mPrint("vgroup:%d, will be dropped", pVgroup->vgId);
newMsg->ahandle = pVgroup;
newMsg->expected = pVgroup->numOfVnodes;
mgmtDropVgroup(pVgroup, newMsg);
return;
}
mTrace("db:%s, all vgroups is dropped", pDb->name);
void *tmpTmr;
newMsg->ahandle = pDb;
taosTmrReset(mgmtDropDb, 10, newMsg, tsMgmtTmr, &tmpTmr);
}
......@@ -125,7 +125,6 @@ void *mgmtNormalTableActionDelete(void *row, char *str, int32_t size, int32_t *s
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId);
return NULL;
}
......@@ -287,18 +286,19 @@ void mgmtCleanUpNormalTables() {
sdbCloseTable(tsNormalTableSdb);
}
static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) {
void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable) {
int32_t totalCols = pTable->numOfColumns;
int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen;
SMDCreateTableMsg *pCreate = rpcMallocCont(contLen);
if (pCreate == NULL) {
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL;
}
memcpy(pCreate->tableId, pTable->tableId, TSDB_TABLE_ID_LEN + 1);
pCreate->contLen = htonl(contLen);
pCreate->vgId = htonl(pVgroup->vgId);
pCreate->vgId = htonl(pTable->vgId);
pCreate->tableType = pTable->type;
pCreate->numOfColumns = htons(pTable->numOfColumns);
pCreate->numOfTags = 0;
......@@ -319,30 +319,30 @@ static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgr
}
memcpy(pCreate + sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen);
return pCreate;
}
int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) {
void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) {
int32_t numOfTables = sdbGetNumOfRows(tsNormalTableSdb);
if (numOfTables >= TSDB_MAX_NORMAL_TABLES) {
mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_NORMAL_TABLES);
return TSDB_CODE_TOO_MANY_TABLES;
terrno = TSDB_CODE_TOO_MANY_TABLES;
return NULL;
}
SNormalTableObj *pTable = (SNormalTableObj *) calloc(sizeof(SNormalTableObj), 1);
if (pTable == NULL) {
mError("table:%s, failed to alloc memory", pCreate->tableId);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL;
}
strcpy(pTable->tableId, pCreate->tableId);
pTable->type = TSDB_NORMAL_TABLE;
pTable->vgId = pVgroup->vgId;
pTable->createdTime = taosGetTimestampMs();
pTable->uid = (((uint64_t) pTable->createdTime) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
pTable->sid = sid;
pTable->createdTime = taosGetTimestampMs();
pTable->sversion = 0;
pTable->numOfColumns = htons(pCreate->numOfColumns);
pTable->sqlLen = htons(pCreate->sqlLen);
......@@ -352,7 +352,8 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb
pTable->schema = (SSchema *) calloc(1, schemaSize);
if (pTable->schema == NULL) {
free(pTable);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL;
}
memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
......@@ -368,7 +369,8 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb
pTable->sql = calloc(1, pTable->sqlLen);
if (pTable->sql == NULL) {
free(pTable);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL;
}
memcpy(pTable->sql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pTable->sqlLen);
pTable->sql[pTable->sqlLen - 1] = 0;
......@@ -378,65 +380,45 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb
if (sdbInsertRow(tsNormalTableSdb, pTable, 0) < 0) {
mError("table:%s, update sdb error", pTable->tableId);
free(pTable);
return TSDB_CODE_SDB_ERROR;
}
*pDCreateOut = mgmtBuildCreateNormalTableMsg(pTable, pVgroup);
if (*pDCreateOut == NULL) {
mError("table:%s, failed to build create table message", pTable->tableId);
sdbDeleteRow(tsNormalTableSdb, pTable);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
terrno = TSDB_CODE_SDB_ERROR;
return NULL;
}
*pTableOut = (STableInfo *) pTable;
mTrace("table:%s, create ntable in vgroup, uid:%" PRIu64 , pTable->tableId, pTable->uid);
return TSDB_CODE_SUCCESS;
return pTable;
}
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) {
int32_t mgmtDropNormalTable(SQueuedMsg *newMsg, SNormalTableObj *pTable) {
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("table:%s, failed to drop normal table, vgroup not exist", pTable->tableId);
return TSDB_CODE_OTHERS;
}
SMDDropTableMsg *pRemove = rpcMallocCont(sizeof(SMDDropTableMsg));
if (pRemove == NULL) {
SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg));
if (pDrop == NULL) {
mError("table:%s, failed to drop normal table, no enough memory", pTable->tableId);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
strcpy(pRemove->tableId, pTable->tableId);
pRemove->sid = htonl(pTable->sid);
pRemove->uid = htobe64(pTable->uid);
pRemove->numOfVPeers = htonl(pVgroup->numOfVnodes);
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
pRemove->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip);
pRemove->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
}
strcpy(pDrop->tableId, pTable->tableId);
pDrop->contLen = htonl(sizeof(SMDDropTableMsg));
pDrop->vgId = htonl(pVgroup->vgId);
pDrop->sid = htonl(pTable->sid);
pDrop->uid = htobe64(pTable->uid);
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
mTrace("table:%s, send drop table msg", pRemove->tableId);
mTrace("table:%s, send drop table msg", pDrop->tableId);
SRpcMsg rpcMsg = {
.handle = 0,
.pCont = pRemove,
.handle = newMsg,
.pCont = pDrop,
.contLen = sizeof(SMDDropTableMsg),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_DROP_TABLE
};
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) {
mError("table:%s, update ntables sdb error", pTable->tableId);
return TSDB_CODE_SDB_ERROR;
}
if (pVgroup->numOfTables <= 0) {
mgmtDropVgroup(pDb, pVgroup);
}
newMsg->ahandle = pTable;
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
return TSDB_CODE_SUCCESS;
}
......@@ -557,14 +539,35 @@ int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMeta
for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
if (usePublicIp) {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
} else {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
}
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId);
}
pMeta->numOfVpeers = pVgroup->numOfVnodes;
return TSDB_CODE_SUCCESS;
}
void mgmtDropAllNormalTables(SDbObj *pDropDb) {
void *pNode = NULL;
void *pLastNode = NULL;
int32_t numOfTables = 0;
int32_t dbNameLen = strlen(pDropDb->name);
SNormalTableObj *pTable = NULL;
while (1) {
pNode = sdbFetchRow(tsNormalTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) break;
if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) {
sdbDeleteRow(tsNormalTableSdb, pTable);
pNode = pLastNode;
numOfTables ++;
continue;
}
}
mTrace("db:%s, all normal tables:%d is dropped", pDropDb->name, numOfTables);
}
......@@ -242,6 +242,8 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) {
}
SShowObj *pShow = (SShowObj *)pRetrieve->qhandle;
mTrace("show:%p, type:%s, retrieve data", pShow, taosGetShowTypeStr(pShow->type));
if (!mgmtCheckQhandle(pRetrieve->qhandle)) {
mError("pShow:%p, query memory is corrupted", pShow);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MEMORY_CORRUPTED);
......
......@@ -202,7 +202,7 @@ void mgmtCleanUpSuperTables() {
sdbCloseTable(tsSuperTableSdb);
}
int32_t mgmtCreateSuperTable(SDbObj *pDb, SCMCreateTableMsg *pCreate) {
int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) {
int32_t numOfTables = sdbGetNumOfRows(tsSuperTableSdb);
if (numOfTables >= TSDB_MAX_SUPER_TABLES) {
mError("stable:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_SUPER_TABLES);
......@@ -250,11 +250,16 @@ int32_t mgmtCreateSuperTable(SDbObj *pDb, SCMCreateTableMsg *pCreate) {
return TSDB_CODE_SUCCESS;
}
int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pSuperTable) {
//TODO drop all child tables
mgmtRemoveSuperTableFromDb(pDb);
return sdbDeleteRow(tsSuperTableSdb, pSuperTable);
int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pStable) {
if (pStable->numOfTables != 0) {
mError("stable:%s, numOfTables:%d not 0", pStable->tableId, pStable->numOfTables);
return TSDB_CODE_OTHERS;
} else {
//TODO: drop child tables
mError("stable:%s, is dropped from sdb", pStable->tableId);
mgmtRemoveSuperTableFromDb(pDb);
return TSDB_CODE_OTHERS;
}
}
void* mgmtGetSuperTable(char *tableId) {
......@@ -607,6 +612,30 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
return numOfRows;
}
void mgmtDropAllSuperTables(SDbObj *pDropDb) {
void *pNode = NULL;
void *pLastNode = NULL;
int32_t numOfTables = 0;
int32_t dbNameLen = strlen(pDropDb->name);
SSuperTableObj *pTable = NULL;
while (1) {
pNode = sdbFetchRow(tsSuperTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) {
break;
}
if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) {
sdbDeleteRow(tsSuperTableSdb, pTable);
pNode = pLastNode;
numOfTables ++;
continue;
}
}
mTrace("db:%s, all super tables:%d is dropped", pDropDb->name, numOfTables);
}
void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable) {
pStable->numOfTables++;
}
......
......@@ -51,6 +51,7 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *queueMsg);
static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg);
static void mgmtProcessSuperTableMetaMsg(SQueuedMsg *queueMsg);
static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg);
static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg);
static int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle);
......@@ -82,6 +83,9 @@ int32_t mgmtInitTables() {
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateTableRsp);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropTableRsp);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, NULL);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, NULL);
return TSDB_CODE_SUCCESS;
}
......@@ -134,83 +138,6 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo
return TSDB_CODE_SUCCESS;
}
static void mgmtCreateTable(SVgObj *pVgroup, SQueuedMsg *pMsg) {
SCMCreateTableMsg *pCreate = pMsg->pCont;
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid < 0) {
mTrace("tables:%s, no enough sid in vgroup:%d", pVgroup->vgId);
mgmtCreateVgroup(pMsg);
return;
}
int32_t code;
STableInfo *pTable;
SMDCreateTableMsg *pMDCreate = NULL;
if (pCreate->numOfColumns == 0) {
mTrace("table:%s, is a child table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg);
code = mgmtCreateChildTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable);
} else {
mTrace("table:%s, is a normal table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg);
code = mgmtCreateNormalTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable);
}
if (code != TSDB_CODE_SUCCESS) {
mTrace("table:%s, failed to create in vgroup:%d", pCreate->tableId, pVgroup->vgId);
mgmtSendSimpleResp(pMsg->thandle, code);
return;
}
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
SRpcMsg rpcMsg = {
.handle = pMsg,
.pCont = pMDCreate,
.contLen = htonl(pMDCreate->contLen),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
};
pMsg->ahandle = pTable;
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
}
int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore) {
STableInfo *pTable = mgmtGetTable(tableId);
if (pTable == NULL) {
if (ignore) {
mTrace("table:%s, table is not exist, think it success", tableId);
return TSDB_CODE_SUCCESS;
} else {
mError("table:%s, failed to create table, table not exist", tableId);
return TSDB_CODE_INVALID_TABLE;
}
}
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
mError("table:%s, failed to create table, in monitor database", tableId);
return TSDB_CODE_MONITOR_DB_FORBIDDEN;
}
switch (pTable->type) {
case TSDB_SUPER_TABLE:
mTrace("table:%s, start to drop super table", tableId);
return mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable);
case TSDB_CHILD_TABLE:
mTrace("table:%s, start to drop child table", tableId);
return mgmtDropChildTable(pDb, (SChildTableObj *) pTable);
case TSDB_NORMAL_TABLE:
mTrace("table:%s, start to drop normal table", tableId);
return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable);
case TSDB_STREAM_TABLE:
mTrace("table:%s, start to drop stream table", tableId);
return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable);
default:
mError("table:%s, invalid table type:%d", tableId, pTable->type);
return TSDB_CODE_INVALID_TABLE;
}
}
int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter) {
STableInfo *pTable = mgmtGetTable(pAlter->tableId);
if (pTable == NULL) {
......@@ -419,13 +346,6 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *
return numOfRows;
}
SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable) {
SMDDropTableMsg *pRemove = NULL;
return pRemove;
}
void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
if (mgmtCheckRedirect(pMsg->thandle)) return;
......@@ -474,7 +394,7 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
if (pCreate->numOfTags != 0) {
mTrace("table:%s, is a super table", pCreate->tableId);
code = mgmtCreateSuperTable(pMsg->pDb, pCreate);
code = mgmtCreateSuperTable(pCreate);
mgmtSendSimpleResp(pMsg->thandle, code);
return;
}
......@@ -494,29 +414,70 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
if (pVgroup == NULL) {
mTrace("table:%s, start to create a new vgroup", pCreate->tableId);
mgmtCreateVgroup(newMsg);
return;
}
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid < 0) {
mTrace("tables:%s, no enough sid in vgroup:%d", pVgroup->vgId);
mgmtCreateVgroup(newMsg);
return;
}
SMDCreateTableMsg *pMDCreate = NULL;
if (pCreate->numOfColumns == 0) {
mTrace("table:%s, is a child table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg);
pTable = mgmtCreateChildTable(pCreate, pVgroup, sid);
if (pTable == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
return;
}
pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, (SChildTableObj *) pTable);
if (pMDCreate == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
return;
}
} else {
mTrace("table:%s, vgroup:%d is selected", pCreate->tableId, pVgroup->vgId);
mgmtCreateTable(pVgroup, newMsg);
mTrace("table:%s, is a normal table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg);
pTable = mgmtCreateNormalTable(pCreate, pVgroup, sid);
if (pTable == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
return;
}
pMDCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable);
if (pMDCreate == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
return;
}
}
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
SRpcMsg rpcMsg = {
.handle = newMsg,
.pCont = pMDCreate,
.contLen = htonl(pMDCreate->contLen),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
};
newMsg->ahandle = pTable;
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
}
void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) {
SCMDropTableMsg *pDrop = pMsg->pCont;
if (mgmtCheckRedirect(pMsg->thandle)) return;
if (mgmtCheckRedirect(pMsg->thandle)) {
mError("thandle:%p, failed to drop table:%s, need redirect message", pMsg->thandle, pDrop->tableId);
return;
}
SCMDropTableMsg *pDrop = pMsg->pCont;
mTrace("table:%s, drop table msg is received from thandle:%p", pDrop->tableId, pMsg->thandle);
SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle);
if (pUser == NULL) {
mError("table:%s, failed to drop table, invalid user", pDrop->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER);
if (mgmtCheckExpired()) {
mError("table:%s, failed to drop, grant expired", pDrop->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED);
return;
}
if (!pUser->writeAuth) {
mError("table:%s, failed to drop table, no rights", pDrop->tableId);
if (!pMsg->pUser->writeAuth) {
mError("table:%s, failed to drop, no rights", pDrop->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
return;
}
......@@ -528,8 +489,54 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) {
return;
}
int32_t code = mgmtDropTable(pDb, pDrop->tableId, pDrop->igNotExists);
if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
STableInfo *pTable = mgmtGetTable(pDrop->tableId);
if (pTable == NULL) {
if (pDrop->igNotExists) {
mTrace("table:%s, table is not exist, think drop success", pDrop->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
return;
} else {
mError("table:%s, failed to drop table, table not exist", pDrop->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
return;
}
}
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
mError("table:%s, failed to drop table, in monitor database", pDrop->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN);
return;
}
SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg));
memcpy(newMsg, pMsg, sizeof(SQueuedMsg));
pMsg->pCont = NULL;
int32_t code;
switch (pTable->type) {
case TSDB_SUPER_TABLE:
mTrace("table:%s, start to drop super table", pDrop->tableId);
code = mgmtDropSuperTable(newMsg, pDb, (SSuperTableObj *) pTable);
break;
case TSDB_CHILD_TABLE:
mTrace("table:%s, start to drop child table", pDrop->tableId);
code = mgmtDropChildTable(newMsg, (SChildTableObj *) pTable);
break;
case TSDB_NORMAL_TABLE:
mTrace("table:%s, start to drop normal table", pDrop->tableId);
code = mgmtDropNormalTable(newMsg, (SNormalTableObj *) pTable);
break;
case TSDB_STREAM_TABLE:
mTrace("table:%s, start to drop stream table", pDrop->tableId);
code = mgmtDropNormalTable(newMsg, (SNormalTableObj *) pTable);
break;
default:
code = TSDB_CODE_INVALID_TABLE_TYPE;
mError("table:%s, invalid table type:%d", pDrop->tableId, pTable->type);
}
if (code != TSDB_CODE_SUCCESS) {
free(newMsg);
mgmtSendSimpleResp(pMsg->thandle, code);
}
}
......@@ -775,3 +782,52 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) {
free(queueMsg);
}
static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->handle == NULL) return;
SQueuedMsg *queueMsg = rpcMsg->handle;
queueMsg->received++;
STableInfo *pTable = queueMsg->ahandle;
mTrace("table:%s, drop table rsp received, thandle:%p result:%s", pTable->tableId, queueMsg->thandle, tstrerror(rpcMsg->code));
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
mError("table:%s, failed to drop in dnode, reason:%s", pTable->tableId, tstrerror(rpcMsg->code));
mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code);
free(queueMsg);
return;
}
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("table:%s, failed to get vgroup", pTable->tableId);
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_INVALID_VGROUP_ID);
free(queueMsg);
return;
}
if (pTable->type == TSDB_CHILD_TABLE) {
if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) {
mError("table:%s, update ctables sdb error", pTable->tableId);
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR);
free(queueMsg);
return;
}
} else if (pTable->type == TSDB_NORMAL_TABLE){
if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) {
mError("table:%s, update ntables sdb error", pTable->tableId);
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR);
free(queueMsg);
return;
}
}
if (pVgroup->numOfTables <= 0) {
mPrint("vgroup:%d, all tables is dropped, drop vgroup", pVgroup->vgId);
mgmtDropVgroup(pVgroup, NULL);
}
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SUCCESS);
free(queueMsg);
}
......@@ -44,8 +44,10 @@ static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t
static int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg);
static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg);
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle);
static void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
static void mgmtVgroupActionInit() {
SVgObj tObj;
......@@ -119,6 +121,7 @@ int32_t mgmtInitVgroups() {
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp);
mTrace("vgroup is initialized");
return 0;
......@@ -135,18 +138,6 @@ SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) {
return pDb->pHead;
}
void mgmtProcessVgTimer(void *handle, void *tmrId) {
SDbObj *pDb = (SDbObj *)handle;
if (pDb == NULL) return;
if (pDb->vgStatus > TSDB_VG_STATUS_IN_PROGRESS) {
mTrace("db:%s, set vgroup status from %d to ready", pDb->name, pDb->vgStatus);
pDb->vgStatus = TSDB_VG_STATUS_READY;
}
pDb->vgTimer = NULL;
}
void mgmtCreateVgroup(SQueuedMsg *pMsg) {
SDbObj *pDb = pMsg->pDb;
if (pDb == NULL) {
......@@ -185,25 +176,14 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) {
mgmtSendCreateVgroupMsg(pVgroup, pMsg);
}
int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) {
STableInfo *pTable;
if (pVgroup->numOfTables > 0) {
for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) {
if (pVgroup->tableList != NULL) {
pTable = pVgroup->tableList[i];
if (pTable) mgmtDropTable(pDb, pTable->tableId, 0);
}
}
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle) {
if (ahandle != NULL) {
mgmtSendDropVgroupMsg(pVgroup, ahandle);
} else {
mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes);
mgmtSendDropVgroupMsg(pVgroup, NULL);
sdbDeleteRow(tsVgroupSdb, pVgroup);
}
mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
//mgmtSendDropVgroupMsg(pVgroup, NULL);
sdbDeleteRow(tsVgroupSdb, pVgroup);
return TSDB_CODE_SUCCESS;
}
void mgmtSetVgroupIdPool() {
......@@ -632,5 +612,67 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code);
}
free(queueMsg);
}
static SMDDropVnodeMsg *mgmtBuildDropVnodeMsg(SVgObj *pVgroup) {
SMDDropVnodeMsg *pDrop = rpcMallocCont(sizeof(SMDDropVnodeMsg));
if (pDrop == NULL) return NULL;
pDrop->vgId = htonl(pVgroup->vgId);
return pDrop;
}
static void mgmtSendDropVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) {
mTrace("vgroup:%d, send drop vnode msg, ahandle:%p", pVgroup->vgId, ahandle);
SMDDropVnodeMsg *pDrop = mgmtBuildDropVnodeMsg(pVgroup);
SRpcMsg rpcMsg = {
.handle = ahandle,
.pCont = pDrop,
.contLen = pDrop ? sizeof(SMDDropVnodeMsg) : 0,
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_DROP_VNODE
};
mgmtSendMsgToDnode(ipSet, &rpcMsg);
}
static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
mTrace("vgroup:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
mgmtSendDropVnodeMsg(pVgroup, &ipSet, ahandle);
}
}
static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
mTrace("drop vnode msg is received");
if (rpcMsg->handle == NULL) return;
SQueuedMsg *queueMsg = rpcMsg->handle;
queueMsg->received++;
if (rpcMsg->code == TSDB_CODE_SUCCESS) {
queueMsg->code = rpcMsg->code;
queueMsg->successed++;
}
SVgObj *pVgroup = queueMsg->ahandle;
mTrace("vgroup:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p",
pVgroup->vgId, tstrerror(rpcMsg->code), queueMsg->received, queueMsg->successed, queueMsg->expected,
queueMsg->thandle, rpcMsg->handle);
if (queueMsg->received != queueMsg->expected) return;
sdbDeleteRow(tsVgroupSdb, pVgroup);
SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg));
newMsg->msgType = queueMsg->msgType;
newMsg->thandle = queueMsg->thandle;
newMsg->pDb = queueMsg->pDb;
newMsg->pUser = queueMsg->pUser;
newMsg->contLen = queueMsg->contLen;
newMsg->pCont = rpcMallocCont(newMsg->contLen);
memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen);
mgmtAddToShellQueue(newMsg);
free(queueMsg);
}
\ No newline at end of file
......@@ -15,5 +15,5 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
TARGET_LINK_LIBRARIES(tsdb common tutil)
# Someone has no gtest directory, so comment it
ADD_SUBDIRECTORY(tests)
# ADD_SUBDIRECTORY(tests)
ENDIF ()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册