diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index e20fd8b3d5871f01b5dc5cafeed1c828a0c92a1c..b74d866863a8af160780f65731fb155c8be2013e 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -105,13 +105,6 @@ typedef struct { int8_t exec; } STqHandle; -typedef struct { - SMqDataRsp* pDataRsp; - char subKey[TSDB_SUBSCRIBE_KEY_LEN]; - SRpcHandleInfo info; - STqHandle* pHandle; -} STqPushEntry; - struct STQ { SVnode* pVnode; char* path; @@ -190,7 +183,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type, int64_t sver, int64_t ever); int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq); - +bool tqIsHandleExecuting(STqHandle* pHandle); #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0791dec301ba9882b92fb15423adb8fef25fc00b..5874062dff39a2479b0406a4f828b5398f08c79e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -545,8 +545,9 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; + int32_t vgId = TD_VID(pTq->pVnode); - tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey); + tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey); int32_t code = 0; // taosWLockLatch(&pTq->lock); // int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); @@ -561,6 +562,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg if (pHandle->pRef) { walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId); } + + while (tqIsHandleExecuting(pHandle)) { + tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey); + taosMsleep(5); + } + code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (code != 0) { tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 81f951f5d243b2236b47c79060fae4e2057897e1..bcfd96d9c93717442dd3c03e2fc5e152ecdb4a7a 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -162,9 +162,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand return 0; } -static bool isHandleExecuting(STqHandle* pHandle){ - return 1 == atomic_load_8(&pHandle->exec); -} +bool tqIsHandleExecuting(STqHandle* pHandle) { return 1 == atomic_load_8(&pHandle->exec); } static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* pOffset) { @@ -181,8 +179,9 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // return code; // } - while(isHandleExecuting(pHandle)){ - tqInfo("sub is executing, pHandle:%p", pHandle); + // todo add more status check to avoid race condition + while (tqIsHandleExecuting(pHandle)) { + tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey); taosMsleep(5); } @@ -241,10 +240,11 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, // return code; // } - while(isHandleExecuting(pHandle)){ - tqInfo("sub is executing, pHandle:%p", pHandle); + while (tqIsHandleExecuting(pHandle)) { + tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey); taosMsleep(5); } + atomic_store_8(&pHandle->exec, 1); if (offset->type != TMQ_OFFSET__LOG) { @@ -266,6 +266,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (taosxRsp.blockNum > 0) { code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); tDeleteSTaosxRsp(&taosxRsp); + atomic_store_8(&pHandle->exec, 0); return code; }else { *offset = taosxRsp.rspOffset; @@ -281,6 +282,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, code = -1; goto end; } + walSetReaderCapacity(pHandle->pWalReader, 2048); int totalRows = 0; while (1) { diff --git a/tests/script/tsim/alter/table.sim b/tests/script/tsim/alter/table.sim index 32c3145a18f4528792df92a4b096b0d23d2badbf..db2a22205f9002c715f516fe9cdc31ea8082d1ee 100644 --- a/tests/script/tsim/alter/table.sim +++ b/tests/script/tsim/alter/table.sim @@ -657,35 +657,36 @@ if $data20 != null then return -1 endi -print =============== error for normal table -sql create table tb2023(ts timestamp, f int); -sql_error alter table tb2023 add column v varchar(65535); -sql_error alter table tb2023 add column v varchar(65535); -sql_error alter table tb2023 add column v varchar(65530); -sql alter table tb2023 add column v varchar(16374); -sql_error alter table tb2023 modify column v varchar(16375); -sql desc tb2023 -sql alter table tb2023 drop column v -sql_error alter table tb2023 add column v nchar(4094); -sql alter table tb2023 add column v nchar(4093); -sql_error alter table tb2023 modify column v nchar(4094); -sql_error alter table tb2023 add column v nchar(16384); -sql alter table tb2023 add column v nchar(16374); -sql desc tb2023 - -print =============== error for super table -sql create table stb2023(ts timestamp, f int) tags(t1 int); -sql_error alter table stb2023 add column v varchar(16375); -sql_error alter table stb2023 add column v varchar(16385); -sql_error alter table stb2023 add column v varchar(33100); -sql alter table stb2023 add column v varchar(16374); -sql_error alter table stb2023 modify column v varchar(16375); -sql desc stb2023 -sql alter table stb2023 drop column v -sql_error alter table stb2023 add column v nchar(4094); -sql alter table stb2023 add column v nchar(4093); -sql_error alter table stb2023 modify column v nchar(4094); -sql desc stb2023 +#print =============== error for normal table +#sql create table tb2023(ts timestamp, f int); +#sql_error alter table tb2023 add column v varchar(65535); +#sql_error alter table tb2023 add column v varchar(65535); +#sql_error alter table tb2023 add column v varchar(65530); +#sql alter table tb2023 add column v varchar(16374); +#sql_error alter table tb2023 modify column v varchar(65536); +#sql desc tb2023 +#sql alter table tb2023 drop column v +#sql_error alter table tb2023 add column v nchar(16384); +#sql alter table tb2023 add column v nchar(4093); +#sql_error alter table tb2023 modify column v nchar(16384); +#sql_error alter table tb2023 add column v nchar(16384); +#sql alter table tb2023 drop column v +#sql alter table tb2023 add column v nchar(16374); +#sql desc tb2023 +# +#print =============== error for super table +#sql create table stb2023(ts timestamp, f int) tags(t1 int); +#sql_error alter table stb2023 add column v varchar(65535); +#sql_error alter table stb2023 add column v varchar(65536); +#sql_error alter table stb2023 add column v varchar(33100); +#sql alter table stb2023 add column v varchar(16374); +#sql_error alter table stb2023 modify column v varchar(16375); +#sql desc stb2023 +#sql alter table stb2023 drop column v +#sql_error alter table stb2023 add column v nchar(4094); +#sql alter table stb2023 add column v nchar(4093); +#sql_error alter table stb2023 modify column v nchar(4094); +#sql desc stb2023 print ======= over sql drop database d1 diff --git a/tests/script/tsim/parser/alter_column.sim b/tests/script/tsim/parser/alter_column.sim index d569e47735a9d09eb5626dfd84aa0e17642db92b..f89211573596ae02f0a484840cc7679fb1d58a34 100644 --- a/tests/script/tsim/parser/alter_column.sim +++ b/tests/script/tsim/parser/alter_column.sim @@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10); sql_error alter table tb modify column c2 binary(9); sql_error alter table tb modify column c2 binary(-9); sql_error alter table tb modify column c2 binary(0); -sql_error alter table tb modify column c2 binary(17000); +sql_error alter table tb modify column c2 binary(65600); sql_error alter table tb modify column c2 nchar(30); sql_error alter table tb modify column c3 double; sql_error alter table tb modify column c3 nchar(10);