提交 8ff09041 编写于 作者: S Shengliang Guan

Merge branches 'feature/wal' and 'feature/wal' of...

Merge branches 'feature/wal' and 'feature/wal' of https://github.com/taosdata/TDengine into feature/wal
...@@ -243,7 +243,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -243,7 +243,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
STscObj* pObj = pSql->pTscObj; STscObj* pObj = pSql->pTscObj;
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
char *pMsg = rpcMallocCont(sizeof(SMsgVersion) + pCmd->payloadLen); char *pMsg = rpcMallocCont(pCmd->payloadLen);
if (NULL == pMsg) { if (NULL == pMsg) {
tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]); tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -254,13 +254,12 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -254,13 +254,12 @@ int tscSendMsgToServer(SSqlObj *pSql) {
tscDumpMgmtEpSet(pSql); tscDumpMgmtEpSet(pSql);
} }
tstrncpy(pMsg, version, sizeof(SMsgVersion)); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
memcpy(pMsg + sizeof(SMsgVersion), pSql->cmd.payload, pSql->cmd.payloadLen);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = pSql->cmd.msgType, .msgType = pSql->cmd.msgType,
.pCont = pMsg, .pCont = pMsg,
.contLen = pSql->cmd.payloadLen + sizeof(SMsgVersion), .contLen = pSql->cmd.payloadLen,
.ahandle = (void*)pSql->self, .ahandle = (void*)pSql->self,
.handle = NULL, .handle = NULL,
.code = 0 .code = 0
......
...@@ -124,6 +124,8 @@ void dnodeDispatchToMReadQueue(SRpcMsg *pMsg) { ...@@ -124,6 +124,8 @@ void dnodeDispatchToMReadQueue(SRpcMsg *pMsg) {
SMnodeMsg *pRead = mnodeCreateMsg(pMsg); SMnodeMsg *pRead = mnodeCreateMsg(pMsg);
taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead); taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead);
} }
rpcFreeCont(pMsg->pCont);
} }
static void dnodeFreeMReadMsg(SMnodeMsg *pRead) { static void dnodeFreeMReadMsg(SMnodeMsg *pRead) {
......
...@@ -125,6 +125,8 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) { ...@@ -125,6 +125,8 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) {
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
} }
rpcFreeCont(pMsg->pCont);
} }
static void dnodeFreeMWriteMsg(SMnodeMsg *pWrite) { static void dnodeFreeMWriteMsg(SMnodeMsg *pWrite) {
......
...@@ -127,20 +127,7 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -127,20 +127,7 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
} else {} } else {}
if ( dnodeProcessShellMsgFp[pMsg->msgType] ) { if ( dnodeProcessShellMsgFp[pMsg->msgType] ) {
SMsgVersion *pMsgVersion = pMsg->pCont;
if (taosCheckVersion(pMsgVersion->clientVersion, version, 3) != TSDB_CODE_SUCCESS) {
rpcMsg.code = TSDB_CODE_TSC_INVALID_VERSION;
rpcSendResponse(&rpcMsg);
rpcFreeCont(pMsg->pCont);
return; // todo change the error code
}
pMsg->pCont += sizeof(*pMsgVersion);
pMsg->contLen -= sizeof(*pMsgVersion);
(*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg); (*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg);
//pMsg->contLen += sizeof(*pMsgVersion);
rpcFreeCont(pMsg->pCont - sizeof(*pMsgVersion));
} else { } else {
dError("RPC %p, shell msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); dError("RPC %p, shell msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]);
rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
...@@ -244,4 +231,4 @@ SStatisInfo dnodeGetStatisInfo() { ...@@ -244,4 +231,4 @@ SStatisInfo dnodeGetStatisInfo() {
} }
return info; return info;
} }
\ No newline at end of file
...@@ -77,6 +77,8 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { ...@@ -77,6 +77,8 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
rpcFreeCont(pMsg->pCont);
} }
void *dnodeAllocVQueryQueue(void *pVnode) { void *dnodeAllocVQueryQueue(void *pVnode) {
......
...@@ -102,6 +102,7 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) { ...@@ -102,6 +102,7 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) {
} }
vnodeRelease(pVnode); vnodeRelease(pVnode);
rpcFreeCont(pRpcMsg->pCont);
} }
void *dnodeAllocVWriteQueue(void *pVnode) { void *dnodeAllocVWriteQueue(void *pVnode) {
......
...@@ -198,11 +198,6 @@ typedef struct { ...@@ -198,11 +198,6 @@ typedef struct {
int32_t numOfVnodes; int32_t numOfVnodes;
} SMsgDesc; } SMsgDesc;
typedef struct SMsgVersion {
char clientVersion[TSDB_VERSION_LEN];
uint32_t crc;
} SMsgVersion;
typedef struct SMsgHead { typedef struct SMsgHead {
int32_t contLen; int32_t contLen;
int32_t vgId; int32_t vgId;
......
...@@ -294,6 +294,11 @@ void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet, bool redirect) { ...@@ -294,6 +294,11 @@ void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet, bool redirect) {
*epSet = tsMEpForShell; *epSet = tsMEpForShell;
mnodeMnodeUnLock(); mnodeMnodeUnLock();
if (mnodeGetDnodesNum() <= 1) {
epSet->numOfEps = 0;
return;
}
mTrace("vgId:1, mnodes epSet for shell is returned, num:%d inUse:%d", tsMEpForShell.numOfEps, tsMEpForShell.inUse); mTrace("vgId:1, mnodes epSet for shell is returned, num:%d inUse:%d", tsMEpForShell.numOfEps, tsMEpForShell.inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) { for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (redirect && strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) { if (redirect && strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
......
...@@ -346,8 +346,8 @@ create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) AS select(S). { ...@@ -346,8 +346,8 @@ create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) AS select(S). {
A = tSetCreateSQLElems(NULL, NULL, S, TSQL_CREATE_STREAM); A = tSetCreateSQLElems(NULL, NULL, S, TSQL_CREATE_STREAM);
setSQLInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE); setSQLInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
U.n += Z.n; V.n += Z.n;
setCreatedTableName(pInfo, &U, &V); setCreatedTableName(pInfo, &V, &U);
} }
%type column{TAOS_FIELD} %type column{TAOS_FIELD}
......
...@@ -2420,8 +2420,8 @@ static void yy_reduce( ...@@ -2420,8 +2420,8 @@ static void yy_reduce(
yylhsminor.yy538 = tSetCreateSQLElems(NULL, NULL, yymsp[0].minor.yy84, TSQL_CREATE_STREAM); yylhsminor.yy538 = tSetCreateSQLElems(NULL, NULL, yymsp[0].minor.yy84, TSQL_CREATE_STREAM);
setSQLInfo(pInfo, yylhsminor.yy538, NULL, TSDB_SQL_CREATE_TABLE); setSQLInfo(pInfo, yylhsminor.yy538, NULL, TSDB_SQL_CREATE_TABLE);
yymsp[-4].minor.yy0.n += yymsp[-2].minor.yy0.n; yymsp[-3].minor.yy0.n += yymsp[-2].minor.yy0.n;
setCreatedTableName(pInfo, &yymsp[-4].minor.yy0, &yymsp[-3].minor.yy0); setCreatedTableName(pInfo, &yymsp[-3].minor.yy0, &yymsp[-4].minor.yy0);
} }
yymsp[-4].minor.yy538 = yylhsminor.yy538; yymsp[-4].minor.yy538 = yylhsminor.yy538;
break; break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册