diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index d3e2bbb1be8d9578b26e8d10066465e6e85a4765..46dc179295ac9481e78234a609e8c1f9427f4308 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -132,7 +132,7 @@ typedef struct { } SWalRef; typedef struct { - int8_t scanUncommited; +// int8_t scanUncommited; int8_t scanNotApplied; int8_t scanMeta; int8_t enableRef; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 19d08ad66ddc2f81000a910f26142dc36d664413..ce06e0eac42ff2186a880b330f5fc953e811f8e1 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1580,7 +1580,9 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL code = smlModifyDBSchemas(info); if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA || code == TSDB_CODE_PAR_TOO_MANY_COLUMNS || code == TSDB_CODE_PAR_INVALID_TAGS_NUM || code == TSDB_CODE_PAR_INVALID_TAGS_LENGTH - || code == TSDB_CODE_PAR_INVALID_ROW_LENGTH) break; + || code == TSDB_CODE_PAR_INVALID_ROW_LENGTH || code == TSDB_CODE_MND_FIELD_VALUE_OVERFLOW) { + break; + } taosMsleep(100); uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum); } while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 844ad89b6c0de25a379cc67ff8bc325792fdfe56..b29e36efdce9f6496da99d212bfd08c552a96bc7 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -37,7 +37,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { if (cond) { pReader->cond = *cond; } else { - pReader->cond.scanUncommited = 0; +// pReader->cond.scanUncommited = 0; pReader->cond.scanNotApplied = 0; pReader->cond.scanMeta = 0; pReader->cond.enableRef = 0; @@ -74,13 +74,18 @@ int32_t walNextValidMsg(SWalReader *pReader) { int64_t lastVer = walGetLastVer(pReader->pWal); int64_t committedVer = walGetCommittedVer(pReader->pWal); int64_t appliedVer = walGetAppliedVer(pReader->pWal); - int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer; + while(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] + wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64", so sleep 1ms", pReader->pWal->cfg.vgId, appliedVer, committedVer); + taosMsleep(1); + appliedVer = walGetAppliedVer(pReader->pWal); + } +// int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer; // endVer = TMIN(appliedVer, endVer); wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 - ", applied index:%" PRId64 ", end index:%" PRId64, - pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); - while (fetchVer <= endVer) { + ", applied index:%" PRId64, + pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer); + while (fetchVer <= committedVer) { if (walFetchHeadNew(pReader, fetchVer) < 0) { return -1; }