提交 6639ef6e 编写于 作者: S slguan

fix bug in dnode write module

上级 66e5cd34
...@@ -60,7 +60,7 @@ static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) { ...@@ -60,7 +60,7 @@ static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) {
if (dnodeProcessMgmtRspFp[pMsg->msgType]) { if (dnodeProcessMgmtRspFp[pMsg->msgType]) {
(*dnodeProcessMgmtRspFp[pMsg->msgType])(pMsg); (*dnodeProcessMgmtRspFp[pMsg->msgType])(pMsg);
} else { } else {
dError("%s is not processed", taosMsg[pMsg->msgType]); dError("%s is not processed in mclient", taosMsg[pMsg->msgType]);
} }
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
......
...@@ -81,7 +81,7 @@ static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) { ...@@ -81,7 +81,7 @@ static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) {
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
(*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
} else { } else {
dError("%s is not processed", taosMsg[pMsg->msgType]); dError("%s is not processed in mserver", taosMsg[pMsg->msgType]);
rspMsg.code = TSDB_CODE_MSG_NOT_PROCESSED; rspMsg.code = TSDB_CODE_MSG_NOT_PROCESSED;
rpcSendResponse(&rspMsg); rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
......
...@@ -84,27 +84,33 @@ void dnodeCleanupWrite() { ...@@ -84,27 +84,33 @@ void dnodeCleanupWrite() {
} }
void dnodeWrite(SRpcMsg *pMsg) { void dnodeWrite(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen; int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont; char *pCont = (char *) pMsg->pCont;
int32_t contLen = 0;
int32_t numOfVnodes = 0;
int32_t vgId = 0;
SRpcContext *pRpcContext = NULL; SRpcContext *pRpcContext = NULL;
// parse head, get number of vnodes; int32_t numOfVnodes = 0;
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
// TODO parse head, get number of vnodes;
numOfVnodes = 1;
} else {
numOfVnodes = 1;
}
if ( numOfVnodes > 1) { if (numOfVnodes > 1) {
pRpcContext = calloc(sizeof(SRpcContext), 1); pRpcContext = calloc(sizeof(SRpcContext), 1);
pRpcContext->numOfVnodes = numOfVnodes; pRpcContext->numOfVnodes = numOfVnodes;
} }
while (leftLen > 0) { while (leftLen > 0) {
// todo: parse head, get vgId, contLen SWriteMsgHead *pHead = (SWriteMsgHead *) pCont;
int32_t vgId = htonl(pHead->vgId);
int32_t contLen = htonl(pHead->contLen);
// get pVnode from vgId
void *pVnode = dnodeGetVnode(vgId); void *pVnode = dnodeGetVnode(vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
leftLen -= contLen;
pCont -= contLen;
continue; continue;
} }
...@@ -118,10 +124,22 @@ void dnodeWrite(SRpcMsg *pMsg) { ...@@ -118,10 +124,22 @@ void dnodeWrite(SRpcMsg *pMsg) {
taos_queue queue = dnodeGetVnodeWworker(pVnode); taos_queue queue = dnodeGetVnodeWworker(pVnode);
taosWriteQitem(queue, &writeMsg); taosWriteQitem(queue, &writeMsg);
// next vnode // next vnode
leftLen -= contLen; leftLen -= contLen;
pCont -= contLen; pCont -= contLen;
queuedMsgNum++;
}
if (queuedMsgNum == 0) {
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.pCont = NULL,
.contLen = 0,
.code = TSDB_CODE_INVALID_VGROUP_ID,
.msgType = 0
};
rpcSendResponse(&rpcRsp);
} }
} }
......
...@@ -244,6 +244,14 @@ typedef struct { ...@@ -244,6 +244,14 @@ typedef struct {
} SVnodeDesc; } SVnodeDesc;
typedef struct { typedef struct {
int32_t contLen;
int32_t vgId;
} SWriteMsgHead;
typedef struct {
int32_t contLen;
int32_t vgId;
int8_t tableType; int8_t tableType;
int16_t numOfColumns; int16_t numOfColumns;
int16_t numOfTags; int16_t numOfTags;
...@@ -251,7 +259,6 @@ typedef struct { ...@@ -251,7 +259,6 @@ typedef struct {
int32_t sversion; int32_t sversion;
int32_t tagDataLen; int32_t tagDataLen;
int32_t sqlDataLen; int32_t sqlDataLen;
int32_t contLen;
int32_t numOfVPeers; int32_t numOfVPeers;
uint64_t uid; uint64_t uid;
uint64_t superTableUid; uint64_t superTableUid;
...@@ -337,6 +344,7 @@ typedef struct { ...@@ -337,6 +344,7 @@ typedef struct {
} SMgmtHead; } SMgmtHead;
typedef struct { typedef struct {
int32_t vgId;
int32_t sid; int32_t sid;
int32_t numOfVPeers; int32_t numOfVPeers;
uint64_t uid; uint64_t uid;
......
...@@ -359,7 +359,7 @@ int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj ...@@ -359,7 +359,7 @@ int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj
*pTableOut = (STableInfo *) pTable; *pTableOut = (STableInfo *) pTable;
mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , mTrace("table:%s, create ctable in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 ,
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid); pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -78,7 +78,7 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) { ...@@ -78,7 +78,7 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) {
if (mgmtProcessDnodeRspFp[rpcMsg->msgType]) { if (mgmtProcessDnodeRspFp[rpcMsg->msgType]) {
(*mgmtProcessDnodeRspFp[rpcMsg->msgType])(rpcMsg); (*mgmtProcessDnodeRspFp[rpcMsg->msgType])(rpcMsg);
} else { } else {
dError("%s is not processed", taosMsg[rpcMsg->msgType]); mError("%s is not processed in dclient", taosMsg[rpcMsg->msgType]);
} }
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
......
...@@ -76,7 +76,7 @@ static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) { ...@@ -76,7 +76,7 @@ static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) {
if (mgmtProcessDnodeMsgFp[rpcMsg->msgType]) { if (mgmtProcessDnodeMsgFp[rpcMsg->msgType]) {
(*mgmtProcessDnodeMsgFp[rpcMsg->msgType])(rpcMsg); (*mgmtProcessDnodeMsgFp[rpcMsg->msgType])(rpcMsg);
} else { } else {
mError("%s is not processed", taosMsg[rpcMsg->msgType]); mError("%s is not processed in dserver", taosMsg[rpcMsg->msgType]);
} }
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
......
...@@ -393,7 +393,7 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb ...@@ -393,7 +393,7 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb
*pTableOut = (STableInfo *) pTable; *pTableOut = (STableInfo *) pTable;
mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , mTrace("table:%s, create ntable in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 ,
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid); pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -145,7 +145,6 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { ...@@ -145,7 +145,6 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) { if (pUser == NULL) {
mError("thandle:%p, failed to retrieve user info", rpcMsg->handle);
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_USER); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_USER);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return; return;
...@@ -242,7 +241,7 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { ...@@ -242,7 +241,7 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) {
} }
SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; SShowObj *pShow = (SShowObj *)pRetrieve->qhandle;
if (!mgmtCheckQhandle(pShow)) { if (!mgmtCheckQhandle(pRetrieve->qhandle)) {
mError("pShow:%p, query memory is corrupted", pShow); mError("pShow:%p, query memory is corrupted", pShow);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MEMORY_CORRUPTED); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MEMORY_CORRUPTED);
return; return;
...@@ -466,7 +465,7 @@ static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) { ...@@ -466,7 +465,7 @@ static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) {
} }
static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) { static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) {
mError("%s is not processed", taosMsg[rpcMsg->msgType]); mError("%s is not processed in shell", taosMsg[rpcMsg->msgType]);
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = 0, .msgType = 0,
.pCont = 0, .pCont = 0,
......
...@@ -181,6 +181,8 @@ static void mgmtCreateTable(SVgObj *pVgroup, SQueuedMsg *pMsg) { ...@@ -181,6 +181,8 @@ static void mgmtCreateTable(SVgObj *pVgroup, SQueuedMsg *pMsg) {
.code = 0, .code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
}; };
pMsg->ahandle = pTable;
mgmtSendMsgToDnode(&ipSet, &rpcMsg); mgmtSendMsgToDnode(&ipSet, &rpcMsg);
} }
...@@ -441,14 +443,11 @@ void mgmtSetTableDirty(STableInfo *pTable, bool isDirty) { ...@@ -441,14 +443,11 @@ void mgmtSetTableDirty(STableInfo *pTable, bool isDirty) {
} }
void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
if (mgmtCheckRedirect(pMsg->thandle)) return;
SCMCreateTableMsg *pCreate = pMsg->pCont; SCMCreateTableMsg *pCreate = pMsg->pCont;
mTrace("thandle:%p, start to create table:%s", pMsg->thandle, pCreate->tableId); mTrace("thandle:%p, start to create table:%s", pMsg->thandle, pCreate->tableId);
if (mgmtCheckRedirect(pMsg->thandle)) {
mError("thandle:%p, failed to create table:%s, need redirect", pMsg->thandle, pCreate->tableId);
return;
}
if (mgmtCheckExpired()) { if (mgmtCheckExpired()) {
mError("thandle:%p, failed to create table:%s, grant expired", pCreate->tableId); mError("thandle:%p, failed to create table:%s, grant expired", pCreate->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED);
...@@ -469,8 +468,8 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { ...@@ -469,8 +468,8 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
return; return;
} }
SDbObj *pDb = mgmtGetDb(pCreate->db); pMsg->pDb = mgmtGetDb(pCreate->db);
if (pDb == NULL) { if (pMsg->pDb == NULL) {
mError("thandle:%p, failed to create table:%s, db not selected", pMsg->thandle, pCreate->tableId); mError("thandle:%p, failed to create table:%s, db not selected", pMsg->thandle, pCreate->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED);
return; return;
...@@ -492,7 +491,7 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { ...@@ -492,7 +491,7 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
if (pCreate->numOfTags != 0) { if (pCreate->numOfTags != 0) {
mTrace("thandle:%p, start to create super table:%s, tags:%d columns:%d", mTrace("thandle:%p, start to create super table:%s, tags:%d columns:%d",
pMsg->thandle, pCreate->tableId, pCreate->numOfTags, pCreate->numOfColumns); pMsg->thandle, pCreate->tableId, pCreate->numOfTags, pCreate->numOfColumns);
code = mgmtCreateSuperTable(pDb, pCreate); code = mgmtCreateSuperTable(pMsg->pDb, pCreate);
mgmtSendSimpleResp(pMsg->thandle, code); mgmtSendSimpleResp(pMsg->thandle, code);
return; return;
} }
...@@ -508,13 +507,13 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { ...@@ -508,13 +507,13 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); memcpy(newMsg, pMsg, sizeof(SQueuedMsg));
pMsg->pCont = NULL; pMsg->pCont = NULL;
SVgObj *pVgroup = mgmtGetAvailableVgroup(pDb); SVgObj *pVgroup = mgmtGetAvailableVgroup(pMsg->pDb);
if (pVgroup == NULL) { if (pVgroup == NULL) {
mTrace("thandle:%p, table:%s start to create a new vgroup", pMsg->thandle, pCreate->tableId); mTrace("thandle:%p, table:%s start to create a new vgroup", newMsg->thandle, pCreate->tableId);
mgmtCreateVgroup(pMsg); mgmtCreateVgroup(newMsg);
} else { } else {
mTrace("thandle:%p, create table:%s in vgroup:%d", pMsg->thandle, pCreate->tableId, pVgroup->vgId); mTrace("thandle:%p, create table:%s in vgroup:%d", newMsg->thandle, pCreate->tableId, pVgroup->vgId);
mgmtCreateTable(pVgroup, pMsg); mgmtCreateTable(pVgroup, newMsg);
} }
} }
...@@ -769,7 +768,7 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { ...@@ -769,7 +768,7 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) {
mgmtSetTableDirty(pTable, true); mgmtSetTableDirty(pTable, true);
//sdbDeleteRow(tsVgroupSdb, pVgroup); //sdbDeleteRow(tsVgroupSdb, pVgroup);
mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code);
mError("table:%s, failed to create in dnode, code:%d, set it dirty", pTable->tableId, rpcMsg->code); mError("table:%s, failed to create in dnode, reason:%s, set it dirty", pTable->tableId, tstrerror(rpcMsg->code));
mgmtSetTableDirty(pTable, true); mgmtSetTableDirty(pTable, true);
} else { } else {
mTrace("table:%s, created in dnode", pTable->tableId); mTrace("table:%s, created in dnode", pTable->tableId);
......
...@@ -182,6 +182,8 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) { ...@@ -182,6 +182,8 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) {
taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode); taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode);
} }
pMsg->ahandle = pVgroup;
pMsg->expected = pVgroup->numOfVnodes;
mgmtSendCreateVgroupMsg(pVgroup, pMsg); mgmtSendCreateVgroupMsg(pVgroup, pMsg);
} }
...@@ -561,7 +563,7 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) { ...@@ -561,7 +563,7 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) {
SRpcIpSet ipSet = { SRpcIpSet ipSet = {
.numOfIps = pVgroup->numOfVnodes, .numOfIps = pVgroup->numOfVnodes,
.inUse = 0, .inUse = 0,
.port = tsMnodeDnodePort .port = tsDnodeMnodePort
}; };
for (int i = 0; i < pVgroup->numOfVnodes; ++i) { for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
ipSet.ip[i] = pVgroup->vnodeGid[i].ip; ipSet.ip[i] = pVgroup->vnodeGid[i].ip;
...@@ -574,7 +576,7 @@ SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) { ...@@ -574,7 +576,7 @@ SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) {
.ip[0] = ip, .ip[0] = ip,
.numOfIps = 1, .numOfIps = 1,
.inUse = 0, .inUse = 0,
.port = tsMnodeDnodePort .port = tsDnodeMnodePort
}; };
return ipSet; return ipSet;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册