未验证 提交 808ae094 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #17562 from taosdata/feature/stream

fix(wal): reference
...@@ -550,7 +550,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -550,7 +550,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
// 6. execution // 6. execution
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
ASSERT(0); mError("failed to prepare trans rebalance since %s", terrstr());
goto REB_FAIL; goto REB_FAIL;
} }
......
...@@ -167,7 +167,7 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { ...@@ -167,7 +167,7 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
if (rollback) { if (rollback) {
ASSERT(0); ASSERT(0);
} else { } else {
code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn); code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
if (code) goto _err; if (code) goto _err;
} }
......
...@@ -175,11 +175,15 @@ void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t pr ...@@ -175,11 +175,15 @@ void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t pr
maxTs = TMAX(maxTs, ts); maxTs = TMAX(maxTs, ts);
SScalableBf *pSBf = getSBf(pInfo, ts); SScalableBf *pSBf = getSBf(pInfo, ts);
if (pSBf) { if (pSBf) {
tScalableBfPut(pSBf, &ts, sizeof(TSKEY)); SUpdateKey updateKey = {
.tbUid = tbUid,
.ts = ts,
};
tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey));
} }
} }
TSKEY *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t)); TSKEY *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t));
if (pMaxTs == NULL || *pMaxTs > tbUid) { if (pMaxTs == NULL || *pMaxTs > maxTs) {
taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), &maxTs, sizeof(TSKEY)); taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), &maxTs, sizeof(TSKEY));
} }
} }
......
...@@ -42,6 +42,7 @@ void walCloseRef(SWal *pWal, int64_t refId) { ...@@ -42,6 +42,7 @@ void walCloseRef(SWal *pWal, int64_t refId) {
int32_t walRefVer(SWalRef *pRef, int64_t ver) { int32_t walRefVer(SWalRef *pRef, int64_t ver) {
SWal *pWal = pRef->pWal; SWal *pWal = pRef->pWal;
wDebug("vgId:%d, wal ref version %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, ver, pRef->refId);
if (pRef->refVer != ver) { if (pRef->refVer != ver) {
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) { if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) {
......
...@@ -257,6 +257,8 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) { ...@@ -257,6 +257,8 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
int32_t walBeginSnapshot(SWal *pWal, int64_t ver) { int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
pWal->vers.verInSnapshotting = ver; pWal->vers.verInSnapshotting = ver;
wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64,
pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer);
// check file rolling // check file rolling
if (pWal->cfg.retentionPeriod == 0) { if (pWal->cfg.retentionPeriod == 0) {
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
...@@ -273,6 +275,10 @@ int32_t walEndSnapshot(SWal *pWal) { ...@@ -273,6 +275,10 @@ int32_t walEndSnapshot(SWal *pWal) {
int32_t code = 0; int32_t code = 0;
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
int64_t ver = pWal->vers.verInSnapshotting; int64_t ver = pWal->vers.verInSnapshotting;
wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, pWal->cfg.vgId,
ver, pWal->vers.firstVer, pWal->vers.lastVer);
if (ver == -1) { if (ver == -1) {
code = -1; code = -1;
goto END; goto END;
...@@ -287,7 +293,8 @@ int32_t walEndSnapshot(SWal *pWal) { ...@@ -287,7 +293,8 @@ int32_t walEndSnapshot(SWal *pWal) {
if (pIter == NULL) break; if (pIter == NULL) break;
SWalRef *pRef = *(SWalRef **)pIter; SWalRef *pRef = *(SWalRef **)pIter;
if (pRef->refVer == -1) continue; if (pRef->refVer == -1) continue;
ver = TMIN(ver, pRef->refVer); ver = TMIN(ver, pRef->refVer - 1);
wDebug("vgId:%d, wal found ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId);
} }
int deleteCnt = 0; int deleteCnt = 0;
...@@ -298,7 +305,12 @@ int32_t walEndSnapshot(SWal *pWal) { ...@@ -298,7 +305,12 @@ int32_t walEndSnapshot(SWal *pWal) {
SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
if (pInfo) { if (pInfo) {
if (ver >= pInfo->lastVer) { if (ver >= pInfo->lastVer) {
pInfo++; pInfo--;
}
if (POINTER_DISTANCE(pInfo, pWal->fileInfoSet->pData) > 0) {
wDebug("vgId:%d, begin remove from %" PRId64, pWal->cfg.vgId, pInfo->firstVer);
} else {
wDebug("vgId:%d, no remove", pWal->cfg.vgId);
} }
// iterate files, until the searched result // iterate files, until the searched result
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
...@@ -315,10 +327,12 @@ int32_t walEndSnapshot(SWal *pWal) { ...@@ -315,10 +327,12 @@ int32_t walEndSnapshot(SWal *pWal) {
for (int i = 0; i < deleteCnt; i++) { for (int i = 0; i < deleteCnt; i++) {
pInfo = taosArrayGet(pWal->fileInfoSet, i); pInfo = taosArrayGet(pWal->fileInfoSet, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr); walBuildLogName(pWal, pInfo->firstVer, fnameStr);
wDebug("vgId:%d, remove file %s", pWal->cfg.vgId, fnameStr);
if (taosRemoveFile(fnameStr) < 0) { if (taosRemoveFile(fnameStr) < 0) {
goto UPDATE_META; goto UPDATE_META;
} }
walBuildIdxName(pWal, pInfo->firstVer, fnameStr); walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
wDebug("vgId:%d, remove file %s", pWal->cfg.vgId, fnameStr);
if (taosRemoveFile(fnameStr) < 0) { if (taosRemoveFile(fnameStr) < 0) {
ASSERT(0); ASSERT(0);
} }
...@@ -409,7 +423,7 @@ END: ...@@ -409,7 +423,7 @@ END:
} }
static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
SWalIdxEntry entry = {.ver = ver, .offset = offset}; SWalIdxEntry entry = {.ver = ver, .offset = offset};
SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal); SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal);
ASSERT(pFileInfo != NULL); ASSERT(pFileInfo != NULL);
ASSERT(pFileInfo->firstVer >= 0); ASSERT(pFileInfo->firstVer >= 0);
...@@ -429,7 +443,7 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { ...@@ -429,7 +443,7 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
if (endOffset < 0) { if (endOffset < 0) {
wFatal("vgId:%d, failed to seek end of idxfile due to %s. ver:%" PRId64 "", pWal->cfg.vgId, strerror(errno), ver); wFatal("vgId:%d, failed to seek end of idxfile due to %s. ver:%" PRId64 "", pWal->cfg.vgId, strerror(errno), ver);
} }
ASSERT(endOffset == idxOffset + sizeof(SWalIdxEntry) && "Offset of idx entries misaligned"); ASSERT(endOffset == idxOffset + sizeof(SWalIdxEntry) && "Offset of idx entries misaligned");
return 0; return 0;
} }
...@@ -437,7 +451,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy ...@@ -437,7 +451,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
const void *body, int32_t bodyLen) { const void *body, int32_t bodyLen) {
int64_t code = 0; int64_t code = 0;
int64_t offset = walGetCurFileOffset(pWal); int64_t offset = walGetCurFileOffset(pWal);
SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal); SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal);
ASSERT(pFileInfo != NULL); ASSERT(pFileInfo != NULL);
......
...@@ -137,8 +137,10 @@ SBloomFilter *tBloomFilterDecode(SDecoder *pDecoder) { ...@@ -137,8 +137,10 @@ SBloomFilter *tBloomFilterDecode(SDecoder *pDecoder) {
if (tDecodeU64(pDecoder, pUnits + i) < 0) goto _error; if (tDecodeU64(pDecoder, pUnits + i) < 0) goto _error;
} }
if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) goto _error; if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) goto _error;
pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); /*pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);*/
pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR); /*pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);*/
pBF->hashFn1 = taosFastHash;
pBF->hashFn2 = taosDJB2Hash;
return pBF; return pBF;
_error: _error:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册