未验证 提交 bb99a407 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #15057 from taosdata/fix/valgrind

fix: let certain type of write messages to be executed sequentially
......@@ -179,6 +179,16 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
} else {
dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pWriteQ, pMsg);
#if 0 // tests for batch writes
if (pMsg->msgType == TDMT_VND_CREATE_TABLE) {
SRpcMsg *pDup = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
memcpy(pDup, pMsg, sizeof(SRpcMsg));
pDup->pCont = rpcMallocCont(pMsg->contLen);
memcpy(pDup->pCont, pMsg->pCont, pMsg->contLen);
pDup->info.handle = NULL;
taosWriteQitem(pVnode->pWriteQ, pDup);
}
#endif
}
break;
case SYNC_QUEUE:
......
......@@ -53,6 +53,7 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
*(int64_t *)(dc.data + dc.pos) = uid;
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
vTrace("vgId:%d, table:%s uid:%" PRId64 " is generated", pVnode->config.vgId, name, uid);
tEndDecode(&dc);
}
......@@ -381,7 +382,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p
goto end;
}
vInfo("vgId:%d, drop ttl table req will be processed, time:%d", pVnode->config.vgId, ttlReq.timestamp);
vDebug("vgId:%d, drop ttl table req will be processed, time:%d", pVnode->config.vgId, ttlReq.timestamp);
int32_t ret = metaTtlDropTable(pVnode->pMeta, ttlReq.timestamp, tbUids);
if (ret != 0) {
goto end;
......
......@@ -17,35 +17,22 @@
#include "vnd.h"
static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_ALTER_REPLICA);
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) ||
(type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL);
}
static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
static inline void vnodeAccumBlockMsg(SVnode *pVnode, tmsg_t type) {
if (!vnodeIsMsgBlock(type)) return;
int32_t count = atomic_add_fetch_32(&pVnode->blockCount, 1);
vTrace("vgId:%d, accum block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type));
}
static inline void vnodeWaitBlockMsg(SVnode *pVnode) {
int32_t count = atomic_load_32(&pVnode->blockCount);
if (count <= 0) return;
vTrace("vgId:%d, wait block finish, count:%d", pVnode->config.vgId, count);
tsem_wait(&pVnode->syncSem);
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
if (vnodeIsMsgBlock(pMsg->msgType)) {
vTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
tsem_wait(&pVnode->syncSem);
}
}
static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) {
if (!vnodeIsMsgBlock(type)) return;
int32_t count = atomic_load_32(&pVnode->blockCount);
if (count <= 0) return;
count = atomic_sub_fetch_32(&pVnode->blockCount, 1);
vTrace("vgId:%d, post block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type));
if (count <= 0) {
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
if (vnodeIsMsgBlock(pMsg->msgType)) {
vTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
tsem_post(&pVnode->syncSem);
}
}
......@@ -143,6 +130,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
int32_t code = 0;
SRpcMsg *pMsg = NULL;
vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);
for (int32_t m = 0; m < numOfMsgs; m++) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
const STraceId *trace = &pMsg->info.traceId;
......@@ -165,13 +154,14 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
} else if (code == 0) {
vnodeWaitBlockMsg(pVnode, pMsg);
} else {
}
}
}
if (code == 0) {
vnodeAccumBlockMsg(pVnode, pMsg->msgType);
} else if (code < 0) {
if (code < 0) {
if (terrno == TSDB_CODE_SYN_NOT_LEADER) {
vnodeRedirectRpcMsg(pVnode, pMsg);
} else {
......@@ -182,15 +172,12 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
tmsgSendRsp(&rsp);
}
}
} else {
}
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
vnodeWaitBlockMsg(pVnode);
}
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
......@@ -213,7 +200,7 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
}
}
vnodePostBlockMsg(pVnode, pMsg->msgType);
vnodePostBlockMsg(pVnode, pMsg);
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
......@@ -418,7 +405,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
tmsgSendRsp(&rpcMsg);
}
vnodePostBlockMsg(pVnode, TDMT_VND_ALTER_REPLICA);
vnodePostBlockMsg(pVnode, pMsg);
}
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
......
......@@ -312,6 +312,7 @@
./test.sh -f tsim/valgrind/checkError3.sim
./test.sh -f tsim/valgrind/checkError4.sim
./test.sh -f tsim/valgrind/checkError5.sim
./test.sh -f tsim/valgrind/checkError6.sim
# --- vnode
# unsupport ./test.sh -f tsim/vnode/replica3_basic.sim
......
......@@ -111,6 +111,15 @@ if $hasleader != 1 then
goto step2
endi
# sql use db;
# sql create table stb (ts timestamp, c int) tags (t int);
# sql create table t0 using stb tags (0);
# sql insert into t0 values(now, 1);
# sql show db.stables;
# sql show db.tables;
# sql show db.vgroups;
return
sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 binary(16)) comment "abd"
sql create table db.ctb using db.stb tags(101, "102")
sql insert into db.ctb values(now, 1, "2")
......
......@@ -26,6 +26,7 @@ print =============== step2: create db
sql create database db
sql use db
sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 float, t3 binary(16)) comment "abd"
sql create table db.c1 using db.stb tags(101, 102, "103")
print =============== step3: alter stb
sql_error alter table db.stb add column ts int
......@@ -42,9 +43,8 @@ sql alter table db.stb drop tag c1
sql alter table db.stb drop tag t5
sql alter table db.stb MODIFY tag t3 binary(32)
sql alter table db.stb rename tag t1 tx
sql alter table db.stb comment 'abcde' ;
goto _OVER
sql drop table db.stb
print =============== step4: alter tb
sql create table tb (ts timestamp, a int)
......@@ -66,6 +66,35 @@ sql alter table tb add column h binary(10)
sql select count(a), count(b), count(c), count(d), count(e), count(f), count(g), count(h) from tb
sql select * from tb order by ts desc
print =============== step5: alter stb and insert data
sql create table stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 float, t3 binary(16)) comment "abd"
sql show db.stables
sql describe stb
sql_error alter table stb add column ts int
sql create table db.ctb using db.stb tags(101, 102, "103")
sql insert into db.ctb values(now, 1, "2")
sql show db.tables
sql select * from db.stb
sql select * from tb
sql alter table stb add column c3 int
sql describe stb
sql select * from db.stb
sql select * from tb
sql insert into db.ctb values(now+1s, 1, 2, 3)
sql select * from db.stb
sql alter table db.stb add column c4 bigint
sql select * from db.stb
sql insert into db.ctb values(now+2s, 1, 2, 3, 4)
sql alter table db.stb drop column c1
sql reset query cache
sql select * from tb
sql insert into db.ctb values(now+3s, 2, 3, 4)
sql select * from db.stb
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check
......
......@@ -68,7 +68,7 @@ $null=
system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content > 0 then
if $system_content > 3 then
return -1
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册