提交 36454c73 编写于 作者: wmmhello's avatar wmmhello

fix:add lock to pHandle for safety

上级 51d70ca2
......@@ -208,8 +208,7 @@ SWalRef *walRefCommittedVer(SWal *);
SWalRef *walOpenRef(SWal *);
void walCloseRef(SWal *pWal, int64_t refId);
int32_t walRefVer(SWalRef *, int64_t ver);
void walUnrefVer(SWalRef *);
int32_t walSetRefVer(SWalRef *, int64_t ver);
// helper function for raft
bool walLogExist(SWal *, int64_t ver);
......
......@@ -46,23 +46,23 @@ typedef struct STqOffsetStore STqOffsetStore;
// tqPush
typedef struct {
// msg info
int64_t consumerId;
int64_t reqOffset;
int64_t processedVer;
int32_t epoch;
// rpc info
int64_t reqId;
SRpcHandleInfo rpcInfo;
tmr_h timerId;
int8_t tmrStopped;
// exec
int8_t inputStatus;
int8_t execStatus;
SStreamQueue inputQ;
SRWLatch lock;
} STqPushHandle;
//typedef struct {
// // msg info
// int64_t consumerId;
// int64_t reqOffset;
// int64_t processedVer;
// int32_t epoch;
// // rpc info
// int64_t reqId;
// SRpcHandleInfo rpcInfo;
// tmr_h timerId;
// int8_t tmrStopped;
// // exec
// int8_t inputStatus;
// int8_t execStatus;
// SStreamQueue inputQ;
// SRWLatch lock;
//} STqPushHandle;
// tqExec
......@@ -90,6 +90,11 @@ typedef struct {
int32_t numOfCols; // number of out pout column, temporarily used
} STqExecHandle;
typedef enum tq_handle_status{
TMQ_HANDLE_STATUS_IDLE = 0,
TMQ_HANDLE_STATUS_EXEC = 1,
}tq_handle_status;
typedef struct {
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int64_t consumerId;
......@@ -98,18 +103,18 @@ typedef struct {
int64_t snapshotVer;
SWalReader* pWalReader;
SWalRef* pRef;
STqPushHandle pushHandle; // push
// STqPushHandle pushHandle; // push
STqExecHandle execHandle; // exec
SRpcMsg* msg;
int32_t noDataPollCnt;
int8_t exec;
tq_handle_status status;
} STqHandle;
typedef struct {
SMqDataRsp* pDataRsp;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
SRpcHandleInfo info;
} STqPushEntry;
//typedef struct {
// SMqDataRsp* pDataRsp;
// char subKey[TSDB_SUBSCRIBE_KEY_LEN];
// SRpcHandleInfo info;
//} STqPushEntry;
struct STQ {
SVnode* pVnode;
......@@ -185,7 +190,9 @@ int32_t tqStreamTasksScanWal(STQ* pTq);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
bool tqIsHandleExecuting(STqHandle* pHandle);
FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; }
FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_EXEC;}
FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_IDLE;}
#ifdef __cplusplus
}
......
......@@ -61,6 +61,9 @@ void tqCleanUp() {
static void destroyTqHandle(void* data) {
STqHandle* pData = (STqHandle*)data;
qDestroyTask(pData->execHandle.task);
if (pData->pRef) {
walCloseRef(pData->pWalReader->pWal, pData->pRef->refId);
}
if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
......@@ -292,10 +295,13 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
}
if (offset.val.type == TMQ_OFFSET__LOG) {
taosWLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
if (pHandle && (walRefVer(pHandle->pRef, offset.val.version) < 0)) {
if (pHandle && (walSetRefVer(pHandle->pRef, offset.val.version) < 0)) {
taosWUnLockLatch(&pTq->lock);
return -1;
}
taosWUnLockLatch(&pTq->lock);
}
return 0;
......@@ -340,34 +346,38 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
STqOffsetVal reqOffset = req.reqOffset;
int32_t vgId = TD_VID(pTq->pVnode);
taosWLockLatch(&pTq->lock);
// 1. find handle
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_MSG;
taosWUnLockLatch(&pTq->lock);
return -1;
}
while (tqIsHandleExec(pHandle)) {
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", consumerId, vgId, req.subKey);
taosMsleep(5);
}
tqSetHandleExec(pHandle);
taosWUnLockLatch(&pTq->lock);
// 2. check re-balance status
taosRLockLatch(&pTq->lock);
if (pHandle->consumerId != consumerId) {
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
taosRUnLockLatch(&pTq->lock);
return -1;
}
taosRUnLockLatch(&pTq->lock);
// 3. update the epoch value
taosWLockLatch(&pTq->lock);
int32_t savedEpoch = pHandle->epoch;
if (savedEpoch < reqEpoch) {
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
reqEpoch);
pHandle->epoch = reqEpoch;
}
taosWUnLockLatch(&pTq->lock);
char buf[80];
tFormatOffset(buf, 80, &reqOffset);
......@@ -384,18 +394,13 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
int32_t code = 0;
taosWLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (pHandle) {
// walCloseRef(pHandle->pWalReader->pWal, pHandle->pRef->refId);
if (pHandle->pRef) {
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
}
while (tqIsHandleExecuting(pHandle)) {
while (tqIsHandleExec(pHandle)) {
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
taosMsleep(5);
}
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (code != 0) {
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
......@@ -410,6 +415,8 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
}
taosWUnLockLatch(&pTq->lock);
return 0;
}
......@@ -456,6 +463,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
req.oldConsumerId, req.newConsumerId);
taosWLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
if (req.oldConsumerId != -1) {
......@@ -507,7 +515,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
(SSnapContext**)(&handle.sContext));
......@@ -538,6 +546,11 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
goto end;
} else {
while (tqIsHandleExec(pHandle)) {
tqDebug("sub req vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
taosMsleep(5);
}
if (pHandle->consumerId == req.newConsumerId) { // do nothing
tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
atomic_add_fetch_32(&pHandle->epoch, 1);
......@@ -553,22 +566,17 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if (pTaskInfo != NULL) {
qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
}
taosWLockLatch(&pTq->lock);
// remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle(pTq, pHandle);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
qStreamCloseTsdbReader(pTaskInfo);
}
taosWUnLockLatch(&pTq->lock);
// remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle(pTq, pHandle);
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
goto end;
}
end:
taosWUnLockLatch(&pTq->lock);
taosMemoryFree(req.qmsg);
return ret;
}
......
......@@ -54,7 +54,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1;
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) return -1;
for (int32_t i = 0; i < size; i++) {
......@@ -295,7 +295,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
code = -1;
goto end;
}
walRefVer(handle.pRef, handle.snapshotVer);
walSetRefVer(handle.pRef, handle.snapshotVer);
SReadHandle reader = {
.meta = pTq->pVnode->pMeta,
......@@ -352,7 +352,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
}
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId);
taosWLockLatch(&pTq->lock);
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
taosWUnLockLatch(&pTq->lock);
}
end:
......
......@@ -78,13 +78,15 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
// todo remove this
if (offset.val.type == TMQ_OFFSET__LOG) {
taosWLockLatch(&pStore->pTq->lock);
STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey));
if (pHandle) {
if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
if (walSetRefVer(pHandle->pRef, offset.val.version) < 0) {
// tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId, pHandle->subKey,
// offset.val.version);
}
}
taosWUnLockLatch(&pStore->pTq->lock);
}
taosMemoryFree(pMemBuf);
......
......@@ -1017,6 +1017,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
int32_t vgId = TD_VID(pTq->pVnode);
// update the table list for each consumer handle
taosWLockLatch(&pTq->lock);
while (1) {
pIter = taosHashIterate(pTq->pHandle, pIter);
if (pIter == NULL) {
......@@ -1073,6 +1074,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
}
}
taosWUnLockLatch(&pTq->lock);
// update the table list handle for each stream scanner/wal reader
taosWLockLatch(&pTq->pStreamMeta->lock);
......
......@@ -162,8 +162,6 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0;
}
bool tqIsHandleExecuting(STqHandle* pHandle) { return 1 == atomic_load_8(&pHandle->exec); }
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
uint64_t consumerId = pRequest->consumerId;
......@@ -173,12 +171,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
while(tqIsHandleExecuting(pHandle)){
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
taosMsleep(5);
}
atomic_store_8(&pHandle->exec, 1);
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
if(code != 0) {
......@@ -193,9 +185,9 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
// lock
taosWLockLatch(&pTq->lock);
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
tqSetHandleIdle(pHandle);
taosWUnLockLatch(&pTq->lock);
tDeleteSMqDataRsp(&dataRsp);
atomic_store_8(&pHandle->exec, 0);
return code;
}
else{
......@@ -214,7 +206,9 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
tDeleteSMqDataRsp(&dataRsp);
}
atomic_store_8(&pHandle->exec, 0);
taosWLockLatch(&pTq->lock);
tqSetHandleIdle(pHandle);
taosWUnLockLatch(&pTq->lock);
return code;
}
......@@ -228,13 +222,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest);
while(tqIsHandleExecuting(pHandle)){
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
taosMsleep(5);
}
atomic_store_8(&pHandle->exec, 1);
if (offset->type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
code = -1;
......@@ -329,7 +316,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
}
end:
atomic_store_8(&pHandle->exec, 0);
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
......
......@@ -1060,7 +1060,10 @@ void qStreamSetOpen(qTaskInfo_t tinfo) {
void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){
// if offset version is small than first version , let's seek to first version
taosThreadMutexLock(&((SWalReader*)pWalReader)->pWal->mutex);
int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal);
taosThreadMutexUnlock(&((SWalReader*)pWalReader)->pWal->mutex);
if (pOffset->version + 1 < firstVer){
pOffset->version = firstVer - 1;
}
......
......@@ -45,7 +45,7 @@ void walCloseRef(SWal *pWal, int64_t refId) {
taosMemoryFree(pRef);
}
int32_t walRefVer(SWalRef *pRef, int64_t ver) {
int32_t walSetRefVer(SWalRef *pRef, int64_t ver) {
SWal *pWal = pRef->pWal;
wDebug("vgId:%d, wal ref version %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, ver, pRef->refId);
if (pRef->refVer != ver) {
......@@ -57,26 +57,12 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
}
pRef->refVer = ver;
// bsearch in fileSet
// SWalFileInfo tmpInfo;
// tmpInfo.firstVer = ver;
// SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
// ASSERT(pRet != NULL);
// pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock(&pWal->mutex);
}
return 0;
}
#if 1
void walUnrefVer(SWalRef *pRef) {
pRef->refId = -1;
// pRef->refFile = -1;
}
#endif
SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) {
if (pRef == NULL) {
pRef = walOpenRef(pWal);
......@@ -87,12 +73,6 @@ SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) {
taosThreadMutexLock(&pWal->mutex);
int64_t ver = walGetFirstVer(pWal);
pRef->refVer = ver;
// bsearch in fileSet
// SWalFileInfo tmpInfo;
// tmpInfo.firstVer = ver;
// SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
// ASSERT(pRet != NULL);
// pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock(&pWal->mutex);
wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册