提交 ed2254db 编写于 作者: S slguan

[TD-114] let create table wait until master selected

上级 909876fa
......@@ -149,7 +149,8 @@ typedef struct _vg_obj {
int32_t lbDnodeId;
int32_t lbTime;
int8_t status;
int8_t reserved[14];
int8_t inUse;
int8_t reserved[13];
int8_t updateEnd[1];
int32_t refCount;
struct _vg_obj *prev, *next;
......@@ -243,6 +244,8 @@ typedef struct {
int8_t received;
int8_t successed;
int8_t expected;
int8_t retry;
int8_t maxRetry;
int32_t contLen;
int32_t code;
void *ahandle;
......
......@@ -510,7 +510,6 @@ typedef struct SRetrieveTableRsp {
typedef struct {
int32_t vgId;
int32_t vnode;
int64_t totalStorage;
int64_t compStorage;
int64_t pointsWritten;
......
......@@ -31,6 +31,7 @@ void mgmtAddShellShowMetaHandle(uint8_t showType, SShowMetaFp fp);
void mgmtAddShellShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp);
void mgmtAddToShellQueue(SQueuedMsg *queuedMsg);
void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg);
void mgmtSendSimpleResp(void *thandle, int32_t code);
#ifdef __cplusplus
......
......@@ -37,6 +37,7 @@ void mgmtDropAllVgroups(SDbObj *pDropDb);
void * mgmtGetNextVgroup(void *pNode, SVgObj **pVgroup);
void mgmtUpdateVgroup(SVgObj *pVgroup);
void mgmtUpdateVgroupStatus(SVgObj *pVgroup, int32_t dnodeId, SVnodeLoad *pVload);
void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb);
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle);
......
......@@ -189,18 +189,21 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
int32_t openVnodes = htons(pStatus->openVnodes);
for (int32_t j = 0; j < openVnodes; ++j) {
pDnode->vload[j].vgId = htonl(pStatus->load[j].vgId);
pDnode->vload[j].totalStorage = htobe64(pStatus->load[j].totalStorage);
pDnode->vload[j].compStorage = htobe64(pStatus->load[j].compStorage);
pDnode->vload[j].pointsWritten = htobe64(pStatus->load[j].pointsWritten);
SVnodeLoad *pVload = &pStatus->load[j];
pDnode->vload[j].vgId = htonl(pVload->vgId);
pDnode->vload[j].totalStorage = htobe64(pVload->totalStorage);
pDnode->vload[j].compStorage = htobe64(pVload->compStorage);
pDnode->vload[j].pointsWritten = htobe64(pVload->pointsWritten);
SVgObj *pVgroup = mgmtGetVgroup(pDnode->vload[j].vgId);
if (pVgroup == NULL) {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp);
mPrint("dnode:%d, vgroup:%d not exist in mnode, drop it", pDnode->dnodeId, pDnode->vload[j].vgId);
mgmtSendDropVnodeMsg(pDnode->vload[j].vgId, &ipSet, NULL);
} else {
mgmtUpdateVgroupStatus(pVgroup, pDnode->dnodeId, pVload);
mgmtReleaseVgroup(pVgroup);
}
mgmtReleaseVgroup(pVgroup);
}
if (pDnode->status == TAOS_DN_STATUS_OFFLINE) {
......
......@@ -806,6 +806,8 @@ void* mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg) {
pDestMsg->msgType = pSrcMsg->msgType;
pDestMsg->pCont = pSrcMsg->pCont;
pDestMsg->contLen = pSrcMsg->contLen;
pDestMsg->retry = pSrcMsg->retry;
pDestMsg->maxRetry= pSrcMsg->maxRetry;
pDestMsg->pUser = pSrcMsg->pUser;
pDestMsg->usePublicIp = pSrcMsg->usePublicIp;
......
......@@ -128,6 +128,15 @@ void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) {
taosScheduleTask(tsMgmtTranQhandle, &schedMsg);
}
static void mgmtDoDealyedAddToShellQueue(void *param, void *tmrId) {
mgmtAddToShellQueue(param);
}
void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) {
void *unUsed = NULL;
taosTmrReset(mgmtDoDealyedAddToShellQueue, 1000, queuedMsg, tsMgmtTmr, &unUsed);
}
static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
if (rpcMsg == NULL || rpcMsg->pCont == NULL) {
return;
......
......@@ -540,7 +540,7 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
SCMCreateTableMsg *pCreate = pMsg->pCont;
pMsg->pTable = mgmtGetTable(pCreate->tableId);
if (pMsg->pTable != NULL) {
if (pMsg->pTable != NULL && pMsg->retry == 0) {
if (pCreate->igExists) {
mTrace("table:%s, is already exist", pCreate->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
......@@ -1300,7 +1300,11 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
return;
}
pMsg->pTable = (STableObj *)mgmtDoCreateChildTable(pCreate, pVgroup, sid);
if (pMsg->retry == 0) {
pMsg->pTable = (STableObj *)mgmtDoCreateChildTable(pCreate, pVgroup, sid);
} else {
pMsg->pTable = mgmtGetTable(pCreate->tableId);
}
if (pMsg->pTable == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
return;
......@@ -1315,6 +1319,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
newMsg->ahandle = pMsg->pTable;
newMsg->maxRetry = 5;
mgmtIncTableRef(pMsg->pTable);
SRpcMsg rpcMsg = {
.handle = newMsg,
......@@ -1737,30 +1742,40 @@ static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
queueMsg->received++;
SChildTableObj *pTable = queueMsg->ahandle;
mTrace("table:%s, create table rsp received, thandle:%p ahandle:%p result:%s", pTable->info.tableId, queueMsg->thandle,
rpcMsg->handle, tstrerror(rpcMsg->code));
mTrace("table:%s, create table rsp received, thandle:%p result:%s", pTable->info.tableId, queueMsg->thandle,
tstrerror(rpcMsg->code));
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsChildTableSdb,
.pObj = pTable
};
sdbDeleteRow(&oper);
if (queueMsg->retry++ < queueMsg->maxRetry) {
mTrace("table:%s, create table rsp received, retry:%d thandle:%p result:%s", pTable->info.tableId,
queueMsg->retry, queueMsg->thandle, tstrerror(rpcMsg->code));
mgmtDealyedAddToShellQueue(queueMsg);
} else {
mError("table:%s, failed to create in dnode, thandle:%p result:%s", pTable->info.tableId,
queueMsg->thandle, tstrerror(rpcMsg->code));
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsChildTableSdb,
.pObj = pTable
};
sdbDeleteRow(&oper);
mError("table:%s, failed to create in dnode, reason:%s", pTable->info.tableId, tstrerror(rpcMsg->code));
mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code);
mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code);
mgmtFreeQueuedMsg(queueMsg);
}
} else {
mTrace("table:%s, created in dnode", pTable->info.tableId);
mTrace("table:%s, created in dnode, thandle:%p result:%s", pTable->info.tableId, queueMsg->thandle,
tstrerror(rpcMsg->code));
if (queueMsg->msgType != TSDB_MSG_TYPE_CM_CREATE_TABLE) {
mTrace("table:%s, start to get meta", pTable->info.tableId);
mgmtAddToShellQueue(mgmtCloneQueuedMsg(queueMsg));
mgmtAddToShellQueue(queueMsg);
} else {
mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code);
mgmtFreeQueuedMsg(queueMsg);
}
}
mgmtFreeQueuedMsg(queueMsg);
}
// not implemented yet
......
......@@ -18,6 +18,7 @@
#include "taoserror.h"
#include "tlog.h"
#include "tbalance.h"
#include "tsync.h"
#include "tcluster.h"
#include "mnode.h"
#include "mgmtDb.h"
......@@ -209,6 +210,18 @@ void mgmtUpdateVgroup(SVgObj *pVgroup) {
mgmtSendCreateVgroupMsg(pVgroup, NULL);
}
void mgmtUpdateVgroupStatus(SVgObj *pVgroup, int32_t dnodeId, SVnodeLoad *pVload) {
if (pVload->role == TAOS_SYNC_ROLE_MASTER) {
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
if (pVgid->dnodeId == dnodeId) {
pVgroup->inUse = i;
break;
}
}
}
}
SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) {
return pDb->pHead;
}
......
......@@ -285,7 +285,6 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) {
SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
pLoad->vgId = htonl(pVnode->vgId);
pLoad->vnode = htonl(pVnode->vgId);
pLoad->status = pVnode->status;
pLoad->role = pVnode->role;
}
......
......@@ -57,7 +57,8 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
if (pVnode->status != TAOS_VN_STATUS_READY)
return TSDB_CODE_NOT_ACTIVE_VNODE;
// if (pVnode->replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER)
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER)
return TSDB_CODE_NO_MASTER;
// assign version
pVnode->version++;
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1
system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2
\ No newline at end of file
system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2
system sh/deploy.sh -n dnode3 -m 192.168.0.1 -i 192.168.0.3
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册