未验证 提交 fd379310 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #19979 from taosdata/fix/tq_oom

fix: handle insufficient resource
......@@ -816,7 +816,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
// TODO version should be assigned and refed during preprocess
SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
if (pRef == NULL) {
ASSERT(0);
return -1;
}
int64_t ver = pRef->refVer;
......@@ -837,12 +836,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
pHandle->execHandle.task =
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, NULL);
ASSERT(pHandle->execHandle.task);
void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.task, &scanner);
ASSERT(scanner);
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(pHandle->execHandle.pExecReader);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
......@@ -875,8 +871,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
// TODO
ASSERT(0);
return -1;
}
} else {
/*ASSERT(pExec->consumerId == req.oldConsumerId);*/
......@@ -886,8 +881,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
atomic_add_fetch_32(&pHandle->epoch, 1);
taosMemoryFree(req.qmsg);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
// TODO
ASSERT(0);
return -1;
}
// close handle
}
......
......@@ -71,17 +71,14 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
int32_t tqMetaOpen(STQ* pTq) {
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0) < 0) {
ASSERT(0);
return -1;
}
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0) < 0) {
ASSERT(0);
return -1;
}
if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0) < 0) {
ASSERT(0);
return -1;
}
......@@ -197,40 +194,49 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
int32_t code;
int32_t vlen;
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
ASSERT(code == 0);
tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey),
pHandle->consumerId, TD_VID(pTq->pVnode));
void* buf = taosMemoryCalloc(1, vlen);
if (buf == NULL) {
ASSERT(0);
return -1;
}
SEncoder encoder;
tEncoderInit(&encoder, buf, vlen);
if (tEncodeSTqHandle(&encoder, pHandle) < 0) {
ASSERT(0);
tEncoderClear(&encoder);
taosMemoryFree(buf);
return -1;
}
TXN* txn;
if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
ASSERT(0);
tEncoderClear(&encoder);
taosMemoryFree(buf);
return -1;
}
if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn) < 0) {
ASSERT(0);
tEncoderClear(&encoder);
taosMemoryFree(buf);
return -1;
}
if (tdbCommit(pTq->pMetaDB, txn) < 0) {
ASSERT(0);
tEncoderClear(&encoder);
taosMemoryFree(buf);
return -1;
}
if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
ASSERT(0);
tEncoderClear(&encoder);
taosMemoryFree(buf);
return -1;
}
tEncoderClear(&encoder);
......
......@@ -87,8 +87,6 @@ int32_t walApplyVer(SWal *pWal, int64_t ver) {
}
int32_t walCommit(SWal *pWal, int64_t ver) {
ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer);
ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer);
if (ver < pWal->vers.commitVer) {
return 0;
}
......@@ -138,25 +136,21 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pIdxFile == NULL) {
ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
code = taosLSeekFile(pIdxFile, idxOff, SEEK_SET);
if (code < 0) {
ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
// read idx file and get log file pos
SWalIdxEntry entry;
if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
ASSERT(entry.ver == ver);
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
......@@ -176,24 +170,19 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
}
// validate offset
SWalCkHead head;
ASSERT(taosValidFile(pLogFile));
int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead));
int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead));
if (size != sizeof(SWalCkHead)) {
ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
code = walValidHeadCksum(&head);
ASSERT(code == 0);
if (code != 0) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
if (head.head.version != ver) {
ASSERT(0);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
taosThreadMutexUnlock(&pWal->mutex);
return -1;
......@@ -202,22 +191,17 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
// truncate old files
code = taosFtruncateFile(pLogFile, entry.offset);
if (code < 0) {
ASSERT(0);
terrno = TAOS_SYSTEM_ERROR(errno);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
code = taosFtruncateFile(pIdxFile, idxOff);
if (code < 0) {
ASSERT(0);
terrno = TAOS_SYSTEM_ERROR(errno);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
pWal->vers.lastVer = ver - 1;
if (pWal->vers.lastVer < pWal->vers.firstVer) {
ASSERT(pWal->vers.lastVer == pWal->vers.firstVer - 1);
}
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;
taosCloseFile(&pIdxFile);
......@@ -386,7 +370,7 @@ int32_t walEndSnapshot(SWal *pWal) {
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr);
if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) {
ASSERT(0);
goto END;
}
}
taosArrayClear(pWal->toDeleteFiles);
......@@ -441,7 +425,6 @@ int32_t walRollImpl(SWal *pWal) {
pWal->pIdxFile = pIdxFile;
pWal->pLogFile = pLogFile;
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
ASSERT(pWal->writeCur >= 0);
pWal->lastRollSeq = walGetSeq();
......@@ -458,9 +441,7 @@ END:
static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
SWalIdxEntry entry = {.ver = ver, .offset = offset};
SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal);
ASSERT(pFileInfo != NULL);
ASSERT(pFileInfo->firstVer >= 0);
int64_t idxOffset = (entry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry);
int64_t idxOffset = (entry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry);
wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset,
idxOffset);
......@@ -476,7 +457,6 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
if (endOffset < 0) {
wFatal("vgId:%d, failed to seek end of idxfile due to %s. ver:%" PRId64 "", pWal->cfg.vgId, strerror(errno), ver);
}
ASSERT(endOffset == idxOffset + sizeof(SWalIdxEntry) && "Offset of idx entries misaligned");
return 0;
}
......@@ -486,9 +466,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
int64_t offset = walGetCurFileOffset(pWal);
SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal);
ASSERT(pFileInfo != NULL);
ASSERT(pFileInfo->firstVer != -1);
pWal->writeHead.head.version = index;
pWal->writeHead.head.bodyLen = bodyLen;
pWal->writeHead.head.msgType = msgType;
......@@ -525,7 +503,6 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
// set status
if (pWal->vers.firstVer == -1) {
ASSERT(index == 0);
pWal->vers.firstVer = 0;
}
pWal->vers.lastVer = index;
......@@ -541,7 +518,6 @@ END:
wFatal("vgId:%d, failed to ftruncate logfile to offset:%" PRId64 " during recovery due to %s", pWal->cfg.vgId,
offset, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
ASSERT(0 && "failed to recover from error");
}
int64_t idxOffset = (index - pFileInfo->firstVer) * sizeof(SWalIdxEntry);
......@@ -549,7 +525,6 @@ END:
wFatal("vgId:%d, failed to ftruncate idxfile to offset:%" PRId64 "during recovery due to %s", pWal->cfg.vgId,
idxOffset, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
ASSERT(0 && "failed to recover from error");
}
return -1;
}
......@@ -576,8 +551,6 @@ int64_t walAppendLog(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syn
}
}
ASSERT(pWal->pLogFile != NULL && pWal->pIdxFile != NULL && pWal->writeCur >= 0);
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
......@@ -614,8 +587,6 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSync
}
}
ASSERT(pWal->pIdxFile != NULL && pWal->pLogFile != NULL && pWal->writeCur >= 0);
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册