提交 1d9dd153 编写于 作者: wmmhello's avatar wmmhello

fix:error in pHandle lock

上级 4bb25c28
......@@ -93,6 +93,7 @@ typedef struct {
typedef enum tq_handle_status{
TMQ_HANDLE_STATUS_IDLE = 0,
TMQ_HANDLE_STATUS_EXEC = 1,
TMQ_HANDLE_STATUS_DELETE = 2,
}tq_handle_status;
typedef struct {
......
......@@ -25,6 +25,7 @@ static int32_t tqInitialize(STQ* pTq);
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; }
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_EXEC;}
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_IDLE;}
static FORCE_INLINE void tqSetHandleDelete(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_DELETE;}
int32_t tqInit() {
int8_t old;
......@@ -297,13 +298,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
}
if (offset.val.type == TMQ_OFFSET__LOG) {
taosWLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
if (pHandle && (walSetRefVer(pHandle->pRef, offset.val.version) < 0)) {
taosWUnLockLatch(&pTq->lock);
return -1;
}
taosWUnLockLatch(&pTq->lock);
}
return 0;
......@@ -337,6 +335,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SMqPollReq req = {0};
int code = 0;
if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG;
......@@ -347,20 +346,29 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t reqEpoch = req.epoch;
STqOffsetVal reqOffset = req.reqOffset;
int32_t vgId = TD_VID(pTq->pVnode);
STqHandle* pHandle = NULL;
taosWLockLatch(&pTq->lock);
// 1. find handle
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_MSG;
while (1) {
taosWLockLatch(&pTq->lock);
// 1. find handle
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_MSG;
taosWUnLockLatch(&pTq->lock);
return -1;
}
bool exec = tqIsHandleExec(pHandle);
if(!exec) {
tqSetHandleExec(pHandle);
taosWUnLockLatch(&pTq->lock);
break;
}
taosWUnLockLatch(&pTq->lock);
return -1;
}
while (tqIsHandleExec(pHandle)) {
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
taosMsleep(5);
taosMsleep(10);
}
// 2. check re-balance status
......@@ -368,12 +376,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
taosWUnLockLatch(&pTq->lock);
return -1;
code = -1;
goto end;
}
tqSetHandleExec(pHandle);
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
taosWUnLockLatch(&pTq->lock);
// 3. update the epoch value
int32_t savedEpoch = pHandle->epoch;
......@@ -388,7 +395,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
int code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
end:
tqSetHandleIdle(pHandle);
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
return code;
......@@ -405,8 +414,8 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (pHandle) {
while (tqIsHandleExec(pHandle)) {
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
taosMsleep(5);
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry, pHandle:%p", vgId, pHandle->subKey, pHandle);
taosMsleep(10);
}
if (pHandle->pRef) {
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
......@@ -473,7 +482,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
req.oldConsumerId, req.newConsumerId);
taosWLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
if (req.oldConsumerId != -1) {
......@@ -556,20 +564,14 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
goto end;
} else {
while (tqIsHandleExec(pHandle)) {
tqDebug("sub req vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry, pHandle:%p", vgId, pHandle->subKey, pHandle);
taosMsleep(5);
}
if (pHandle->consumerId == req.newConsumerId) { // do nothing
tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
atomic_add_fetch_32(&pHandle->epoch, 1);
// atomic_add_fetch_32(&pHandle->epoch, 1);
} else {
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
req.newConsumerId);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_store_32(&pHandle->epoch, 0);
// atomic_store_32(&pHandle->epoch, 0);
}
// kill executing task
qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
......@@ -580,13 +582,15 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
qStreamCloseTsdbReader(pTaskInfo);
}
// remove if it has been register in the push manager, and return one empty block to consumer
taosWLockLatch(&pTq->lock);
tqUnregisterPushHandle(pTq, pHandle);
taosWUnLockLatch(&pTq->lock);
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
goto end;
}
end:
taosWUnLockLatch(&pTq->lock);
taosMemoryFree(req.qmsg);
return ret;
}
......
......@@ -352,9 +352,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
}
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId);
taosWLockLatch(&pTq->lock);
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
taosWUnLockLatch(&pTq->lock);
}
end:
......
......@@ -78,7 +78,6 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
// todo remove this
if (offset.val.type == TMQ_OFFSET__LOG) {
taosWLockLatch(&pStore->pTq->lock);
STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey));
if (pHandle) {
if (walSetRefVer(pHandle->pRef, offset.val.version) < 0) {
......@@ -86,7 +85,6 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
// offset.val.version);
}
}
taosWUnLockLatch(&pStore->pTq->lock);
}
taosMemoryFree(pMemBuf);
......
......@@ -1035,7 +1035,6 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
int32_t vgId = TD_VID(pTq->pVnode);
// update the table list for each consumer handle
taosWLockLatch(&pTq->lock);
while (1) {
pIter = taosHashIterate(pTq->pHandle, pIter);
if (pIter == NULL) {
......@@ -1092,7 +1091,6 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
}
}
taosWUnLockLatch(&pTq->lock);
// update the table list handle for each stream scanner/wal reader
taosWLockLatch(&pTq->pStreamMeta->lock);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册