提交 b383651d 编写于 作者: H Haojun Liao

fix(tmq): fix the invalid write and set the flag when closing taosx sub.

上级 5b2ef2a6
...@@ -105,13 +105,6 @@ typedef struct { ...@@ -105,13 +105,6 @@ typedef struct {
int8_t exec; int8_t exec;
} STqHandle; } STqHandle;
typedef struct {
SMqDataRsp* pDataRsp;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
SRpcHandleInfo info;
STqHandle* pHandle;
} STqPushEntry;
struct STQ { struct STQ {
SVnode* pVnode; SVnode* pVnode;
char* path; char* path;
...@@ -190,7 +183,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ ...@@ -190,7 +183,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever); int32_t type, int64_t sver, int64_t ever);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq); int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq);
bool tqIsHandleExecuting(STqHandle* pHandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -545,8 +545,9 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -545,8 +545,9 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey); tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
int32_t code = 0; int32_t code = 0;
// taosWLockLatch(&pTq->lock); // taosWLockLatch(&pTq->lock);
// int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); // int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
...@@ -561,6 +562,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -561,6 +562,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if (pHandle->pRef) { if (pHandle->pRef) {
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId); walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
} }
while (tqIsHandleExecuting(pHandle)) {
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
taosMsleep(5);
}
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (code != 0) { if (code != 0) {
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
......
...@@ -162,9 +162,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand ...@@ -162,9 +162,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0; return 0;
} }
static bool isHandleExecuting(STqHandle* pHandle){ bool tqIsHandleExecuting(STqHandle* pHandle) { return 1 == atomic_load_8(&pHandle->exec); }
return 1 == atomic_load_8(&pHandle->exec);
}
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* pOffset) { SRpcMsg* pMsg, STqOffsetVal* pOffset) {
...@@ -181,8 +179,9 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -181,8 +179,9 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
// return code; // return code;
// } // }
while(isHandleExecuting(pHandle)){ // todo add more status check to avoid race condition
tqInfo("sub is executing, pHandle:%p", pHandle); while (tqIsHandleExecuting(pHandle)) {
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
taosMsleep(5); taosMsleep(5);
} }
...@@ -241,10 +240,11 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -241,10 +240,11 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
// return code; // return code;
// } // }
while(isHandleExecuting(pHandle)){ while (tqIsHandleExecuting(pHandle)) {
tqInfo("sub is executing, pHandle:%p", pHandle); tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
taosMsleep(5); taosMsleep(5);
} }
atomic_store_8(&pHandle->exec, 1); atomic_store_8(&pHandle->exec, 1);
if (offset->type != TMQ_OFFSET__LOG) { if (offset->type != TMQ_OFFSET__LOG) {
...@@ -266,6 +266,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -266,6 +266,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (taosxRsp.blockNum > 0) { if (taosxRsp.blockNum > 0) {
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
tDeleteSTaosxRsp(&taosxRsp); tDeleteSTaosxRsp(&taosxRsp);
atomic_store_8(&pHandle->exec, 0);
return code; return code;
}else { }else {
*offset = taosxRsp.rspOffset; *offset = taosxRsp.rspOffset;
...@@ -281,6 +282,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -281,6 +282,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
code = -1; code = -1;
goto end; goto end;
} }
walSetReaderCapacity(pHandle->pWalReader, 2048); walSetReaderCapacity(pHandle->pWalReader, 2048);
int totalRows = 0; int totalRows = 0;
while (1) { while (1) {
......
...@@ -657,35 +657,36 @@ if $data20 != null then ...@@ -657,35 +657,36 @@ if $data20 != null then
return -1 return -1
endi endi
print =============== error for normal table #print =============== error for normal table
sql create table tb2023(ts timestamp, f int); #sql create table tb2023(ts timestamp, f int);
sql_error alter table tb2023 add column v varchar(65535); #sql_error alter table tb2023 add column v varchar(65535);
sql_error alter table tb2023 add column v varchar(65535); #sql_error alter table tb2023 add column v varchar(65535);
sql_error alter table tb2023 add column v varchar(65530); #sql_error alter table tb2023 add column v varchar(65530);
sql alter table tb2023 add column v varchar(16374); #sql alter table tb2023 add column v varchar(16374);
sql_error alter table tb2023 modify column v varchar(16375); #sql_error alter table tb2023 modify column v varchar(65536);
sql desc tb2023 #sql desc tb2023
sql alter table tb2023 drop column v #sql alter table tb2023 drop column v
sql_error alter table tb2023 add column v nchar(4094); #sql_error alter table tb2023 add column v nchar(16384);
sql alter table tb2023 add column v nchar(4093); #sql alter table tb2023 add column v nchar(4093);
sql_error alter table tb2023 modify column v nchar(4094); #sql_error alter table tb2023 modify column v nchar(16384);
sql_error alter table tb2023 add column v nchar(16384); #sql_error alter table tb2023 add column v nchar(16384);
sql alter table tb2023 add column v nchar(16374); #sql alter table tb2023 drop column v
sql desc tb2023 #sql alter table tb2023 add column v nchar(16374);
#sql desc tb2023
print =============== error for super table #
sql create table stb2023(ts timestamp, f int) tags(t1 int); #print =============== error for super table
sql_error alter table stb2023 add column v varchar(16375); #sql create table stb2023(ts timestamp, f int) tags(t1 int);
sql_error alter table stb2023 add column v varchar(16385); #sql_error alter table stb2023 add column v varchar(65535);
sql_error alter table stb2023 add column v varchar(33100); #sql_error alter table stb2023 add column v varchar(65536);
sql alter table stb2023 add column v varchar(16374); #sql_error alter table stb2023 add column v varchar(33100);
sql_error alter table stb2023 modify column v varchar(16375); #sql alter table stb2023 add column v varchar(16374);
sql desc stb2023 #sql_error alter table stb2023 modify column v varchar(16375);
sql alter table stb2023 drop column v #sql desc stb2023
sql_error alter table stb2023 add column v nchar(4094); #sql alter table stb2023 drop column v
sql alter table stb2023 add column v nchar(4093); #sql_error alter table stb2023 add column v nchar(4094);
sql_error alter table stb2023 modify column v nchar(4094); #sql alter table stb2023 add column v nchar(4093);
sql desc stb2023 #sql_error alter table stb2023 modify column v nchar(4094);
#sql desc stb2023
print ======= over print ======= over
sql drop database d1 sql drop database d1
......
...@@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10); ...@@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10);
sql_error alter table tb modify column c2 binary(9); sql_error alter table tb modify column c2 binary(9);
sql_error alter table tb modify column c2 binary(-9); sql_error alter table tb modify column c2 binary(-9);
sql_error alter table tb modify column c2 binary(0); sql_error alter table tb modify column c2 binary(0);
sql_error alter table tb modify column c2 binary(17000); sql_error alter table tb modify column c2 binary(65600);
sql_error alter table tb modify column c2 nchar(30); sql_error alter table tb modify column c2 nchar(30);
sql_error alter table tb modify column c3 double; sql_error alter table tb modify column c3 double;
sql_error alter table tb modify column c3 nchar(10); sql_error alter table tb modify column c3 nchar(10);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册