diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 7342af4d1844f2a561bd15240f82326d6e20d618..c435cdb03bcd67e13fc7ac819b1b4089fa04e0aa 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -61,30 +61,41 @@ extern "C" { } \ } -#define WAL_HEAD_VER 0 +#define WAL_HEAD_VER 0 #define WAL_NOSUFFIX_LEN 20 -#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1) -#define WAL_LOG_SUFFIX "log" +#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1) +#define WAL_LOG_SUFFIX "log" #define WAL_INDEX_SUFFIX "idx" -#define WAL_REFRESH_MS 1000 -#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead)) -#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) -#define WAL_FILE_LEN (WAL_PATH_LEN + 32) -#define WAL_MAGIC 0xFAFBFCFDULL +#define WAL_REFRESH_MS 1000 +#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead)) +#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) +#define WAL_FILE_LEN (WAL_PATH_LEN + 32) +#define WAL_MAGIC 0xFAFBFCFDULL #define WAL_CUR_FAILED 1 #pragma pack(push, 1) typedef enum { TAOS_WAL_NOLOG = 0, TAOS_WAL_WRITE = 1, TAOS_WAL_FSYNC = 2 } EWalType; +// used by sync module +typedef struct { + int8_t isWeek; + uint64_t seqNum; + uint64_t term; +} SSyncInfo; + typedef struct SWalReadHead { int8_t headVer; - int16_t msgType; int8_t reserved; + int16_t msgType; int32_t len; int64_t ingestTs; // not implemented int64_t version; - char body[]; + + // sync info + SSyncInfo syncInfo; + + char body[]; } SWalReadHead; typedef struct { @@ -117,16 +128,16 @@ typedef struct SWal { SWalCfg cfg; int32_t fsyncSeq; // meta - SWalVer vers; + SWalVer vers; TdFilePtr pWriteLogTFile; TdFilePtr pWriteIdxTFile; - int32_t writeCur; - SArray *fileInfoSet; + int32_t writeCur; + SArray *fileInfoSet; // status int64_t totSize; int64_t lastRollSeq; // ctl - int64_t refId; + int64_t refId; TdThreadMutex mutex; // path char path[WAL_PATH_LEN]; @@ -158,6 +169,7 @@ int32_t walAlter(SWal *, SWalCfg *pCfg); void walClose(SWal *); // write +int64_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SSyncInfo info, const void *body, int32_t bodyLen); int64_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen); void walFsync(SWal *, bool force); diff --git a/source/client/stream/stream.c b/source/client/stream/stream.c deleted file mode 100644 index 6dea4a4e57392be988126c579648f39a8270b9bf..0000000000000000000000000000000000000000 --- a/source/client/stream/stream.c +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ \ No newline at end of file diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 8392e66e58107914021b14dae9b690d4d24ccf09..e1b347d2b8215ff420676827a0ad0bd5cb901415 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -253,7 +253,8 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { return 0; } -int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) { +int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncInfo syncInfo, const void *body, + int32_t bodyLen) { if (pWal == NULL) return -1; int code = 0; @@ -296,6 +297,10 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in int64_t offset = walGetCurFileOffset(pWal); pWal->writeHead.head.len = bodyLen; pWal->writeHead.head.msgType = msgType; + + // sync info + pWal->writeHead.head.syncInfo = syncInfo; + pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead); pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen); @@ -332,6 +337,15 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in return 0; } +int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) { + SSyncInfo info = { + .isWeek = -1, + .seqNum = UINT64_MAX, + .term = UINT64_MAX, + }; + return walWriteWithSyncInfo(pWal, index, msgType, info, body, bodyLen); +} + void walFsync(SWal *pWal, bool forceFsync) { if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) { wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));