wal.h 6.3 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifndef _TD_WAL_H_
#define _TD_WAL_H_

L
Liu Jicong 已提交
18
#include "os.h"
L
Liu Jicong 已提交
19
#include "tarray.h"
L
Liu Jicong 已提交
20 21
#include "tdef.h"
#include "tlog.h"
L
Liu Jicong 已提交
22
#include "tmsg.h"
H
refact  
Hongze Cheng 已提交
23 24 25 26
#ifdef __cplusplus
extern "C" {
#endif

S
Shengliang Guan 已提交
27 28 29 30 31 32 33 34
// clang-format off
// WAL logging macros, one per severity. Each macro forwards its arguments to
// taosPrintLog only when the matching DEBUG_* bit is set in wDebugFlag.
// The fatal/error/warn/info variants pass the constant 255 as the third
// argument; debug/trace pass wDebugFlag itself.
// Every body is wrapped in do { ... } while (0) so that an invocation followed
// by ';' is exactly one statement and is safe in an unbraced if/else.
#define wFatal(...) do { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", DEBUG_FATAL, 255,        __VA_ARGS__); } } while (0)
#define wError(...) do { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", DEBUG_ERROR, 255,        __VA_ARGS__); } } while (0)
#define wWarn(...)  do { if (wDebugFlag & DEBUG_WARN)  { taosPrintLog("WAL WARN ",  DEBUG_WARN, 255,         __VA_ARGS__); } } while (0)
#define wInfo(...)  do { if (wDebugFlag & DEBUG_INFO)  { taosPrintLog("WAL ",       DEBUG_INFO, 255,         __VA_ARGS__); } } while (0)
#define wDebug(...) do { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ",       DEBUG_DEBUG, wDebugFlag, __VA_ARGS__); } } while (0)
#define wTrace(...) do { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ",       DEBUG_TRACE, wDebugFlag, __VA_ARGS__); } } while (0)
// clang-format on
L
Liu Jicong 已提交
35

L
Liu Jicong 已提交
36 37 38 39 40 41 42 43 44 45
// Naming and sizing constants for the WAL module.
#define WAL_PROTO_VER     0  // NOTE(review): presumably the value stored in SWalCont.protoVer - confirm
#define WAL_NOSUFFIX_LEN  20  // presumably the length of a WAL file name without its suffix - confirm
#define WAL_SUFFIX_AT     (WAL_NOSUFFIX_LEN + 1)  // one past WAL_NOSUFFIX_LEN; presumably skips a separator char - confirm
#define WAL_LOG_SUFFIX    "log"  // presumably the suffix of log files - confirm against the file-naming code
#define WAL_INDEX_SUFFIX  "idx"  // presumably the suffix of index files - confirm against the file-naming code
#define WAL_REFRESH_MS    1000  // milliseconds, going by the _MS suffix; the consumer is not visible in this header
#define WAL_PATH_LEN      (TSDB_FILENAME_LEN + 12)  // size of the SWal.path buffer
#define WAL_FILE_LEN      (WAL_PATH_LEN + 32)  // WAL_PATH_LEN plus 32 extra characters
#define WAL_MAGIC         0xFAFBFCFDF4F3F2F1ULL  // 64-bit constant; presumably the expected SWalCkHead.magic - confirm
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)  // 3 * 1024 * 1024 (3 MiB if the unit is bytes - confirm)
L
Liu Jicong 已提交
46

L
Liu Jicong 已提交
47 48 49 50
// WAL level, stored in SWalCfg.level.
// NOTE(review): going by the names, WRITE presumably means "write without a
// forced fsync" and FSYNC "fsync on write" - confirm against the implementation.
typedef enum {
  TAOS_WAL_WRITE = 1,
  TAOS_WAL_FSYNC = 2,
} EWalType;
51

L
Liu Jicong 已提交
52 53 54 55 56
// WAL configuration. Passed by pointer to walOpen() and walAlter(); SWal embeds
// one instance by value in its cfg member.
typedef struct {
  int32_t  vgId;             // presumably the id of the owning vgroup - TODO confirm
  int32_t  fsyncPeriod;      // millisecond
  int32_t  retentionPeriod;  // secs
  int32_t  rollPeriod;       // secs
L
Liu Jicong 已提交
57
  int64_t  retentionSize;    // NOTE(review): unit is not stated in this header (bytes?) - confirm
L
Liu Jicong 已提交
58
  int64_t  segSize;          // NOTE(review): unit is not stated in this header (bytes?) - confirm
L
Liu Jicong 已提交
59
  EWalType level;  // wal level
L
Liu Jicong 已提交
60 61
} SWalCfg;

L
Liu Jicong 已提交
62
// Set of version markers kept per WAL; SWal embeds one instance as its vers
// member. The walGet*Ver() getters declared in this header have matching names.
// NOTE(review): the exact meaning and ordering invariants of the markers are not
// visible here - confirm against the implementation before relying on them.
typedef struct {
L
Liu Jicong 已提交
63 64 65 66
  int64_t firstVer;           // cf. walGetFirstVer()
  int64_t verInSnapshotting;  // presumably the version passed to walBeginSnapshot() - confirm
  int64_t snapshotVer;        // cf. walGetSnapshotVer()
  int64_t commitVer;          // cf. walGetCommittedVer()
L
Liu Jicong 已提交
67
  int64_t appliedVer;         // cf. walGetAppliedVer()
L
Liu Jicong 已提交
68 69 70
  int64_t lastVer;            // cf. walGetLastVer()
} SWalVer;

L
Liu Jicong 已提交
71 72 73 74 75 76
// Everything from here to the matching #pragma pack(pop) is laid out with
// 1-byte packing, i.e. without padding between members.
#pragma pack(push, 1)
// Sync metadata used by the sync module. Embedded in SWalCont as syncMeta and
// passed by value to walWriteWithSyncInfo() and walAppendLog().
typedef struct {
  int8_t   isWeek;  // NOTE(review): name looks like a typo for "isWeak" - confirm; renaming would change the interface
  uint64_t seqNum;
  uint64_t term;
L
Liu Jicong 已提交
77
} SWalSyncInfo;
L
Liu Jicong 已提交
78 79 80

// Fixed header of one WAL entry followed by its variable-length body.
// Declared inside the #pragma pack(push, 1) region, so there is no padding
// between members.
typedef struct {
  int64_t version;
81
  int64_t ingestTs;  // NOTE(review): presumably an ingest timestamp; unit not stated here - confirm
L
Liu Jicong 已提交
82
  int32_t bodyLen;   // presumably the number of bytes in body[] - confirm
83 84
  int16_t msgType;
  int8_t  protoVer;  // cf. WAL_PROTO_VER
L
Liu Jicong 已提交
85 86

  // sync meta
L
Liu Jicong 已提交
87
  SWalSyncInfo syncMeta;
L
Liu Jicong 已提交
88 89 90 91 92 93 94 95 96 97 98 99

  char body[];  // flexible array member: adds nothing to sizeof(SWalCont)
} SWalCont;

// Checksummed wrapper around SWalCont. Still inside the 1-byte packing region,
// which the #pragma pack(pop) below closes.
typedef struct {
  uint64_t magic;      // presumably expected to equal WAL_MAGIC - confirm
  uint32_t cksumHead;  // presumably a checksum over the head - confirm algorithm and coverage
  uint32_t cksumBody;  // presumably a checksum over head.body - confirm algorithm and coverage
  SWalCont head;       // kept last: SWalCont ends in a flexible array member (embedding it relies on a compiler extension)
} SWalCkHead;
#pragma pack(pop)

L
Liu Jicong 已提交
100
// WAL handle: the object returned by walOpen() and taken by most functions
// declared in this header.
typedef struct SWal {
L
Liu Jicong 已提交
101
  // cfg
L
Liu Jicong 已提交
102
  SWalCfg cfg;
L
Liu Jicong 已提交
103
  int32_t fsyncSeq;
L
Liu Jicong 已提交
104
  // meta
L
Liu Jicong 已提交
105
  SWalVer   vers;
L
Liu Jicong 已提交
106 107
  TdFilePtr pLogFile;
  TdFilePtr pIdxFile;
L
Liu Jicong 已提交
108
  int32_t   writeCur;  // NOTE(review): presumably an index into fileInfoSet - confirm
L
Liu Jicong 已提交
109
  SArray   *fileInfoSet;  // SArray<SWalFileInfo>
110 111
  // gc
  SArray *toDeleteFiles;  // SArray<SWalFileInfo>
L
Liu Jicong 已提交
112
  // status
L
Liu Jicong 已提交
113 114
  int64_t totSize;
  int64_t lastRollSeq;
L
Liu Jicong 已提交
115
  // ctl
L
Liu Jicong 已提交
116
  int64_t       refId;
wafwerar's avatar
wafwerar 已提交
117
  TdThreadMutex mutex;  // NOTE(review): which members this mutex guards is not visible here - confirm
118
  // ref
119
  SHashObj *pRefHash;  // refId -> SWalRef
L
Liu Jicong 已提交
120
  // path
L
Liu Jicong 已提交
121
  char path[WAL_PATH_LEN];
L
Liu Jicong 已提交
122
  // reusable write head
L
Liu Jicong 已提交
123
  SWalCkHead writeHead;  // kept as the final member; SWalCkHead ends in a flexible array member
124 125 126 127 128
} SWal;

// Reference onto a WAL version. Returned by walOpenRef(), walRefFirstVer() and
// walRefCommittedVer(); SWal.pRefHash maps refId to these objects.
typedef struct {
  int64_t refId;   // key used in SWal.pRefHash
  int64_t refVer;  // presumably the referenced version (cf. walRefVer()) - confirm
wmmhello's avatar
wmmhello 已提交
129
//  int64_t refFile;
130 131
  SWal   *pWal;    // WAL this reference belongs to
} SWalRef;
L
Liu Jicong 已提交
132

L
Liu Jicong 已提交
133 134
// Filter options for a reader; passed to walOpenReader() and embedded in
// SWalReader as cond. Each member is an int8_t, presumably used as a boolean
// flag - confirm against the reader implementation.
typedef struct {
  int8_t scanUncommited;
135
  int8_t scanNotApplied;
L
Liu Jicong 已提交
136
  int8_t scanMeta;
L
Liu Jicong 已提交
137
  int8_t enableRef;
L
Liu Jicong 已提交
138 139 140 141
} SWalFilterCond;

// Reader state: the object returned by walOpenReader() and taken by the
// walRead*/walFetch* functions declared in this header.
typedef struct {
  SWal          *pWal;      // WAL being read
142
  int64_t        readerId;
L
Liu Jicong 已提交
143 144 145 146 147
  TdFilePtr      pLogFile;
  TdFilePtr      pIdxFile;
  int64_t        curFileFirstVer;
  int64_t        curVersion;
  int64_t        capacity;  // NOTE(review): int64_t here, but walSetReaderCapacity() takes int32_t
L
Liu Jicong 已提交
148
  int8_t         curInvalid;
149
  int8_t         curStopped;
L
Liu Jicong 已提交
150 151
  TdThreadMutex  mutex;
  SWalFilterCond cond;      // filter options, same type as the pCond argument of walOpenReader()
152 153
  // TODO remove it
  SWalCkHead *pHead;
L
Liu Jicong 已提交
154
} SWalReader;
155

S
Shengliang Guan 已提交
156 157 158
// module initialization
// Both functions take no arguments. They are declared with (void) so that the
// declarations are real prototypes: in C an empty parameter list "()" leaves
// the arguments unspecified and disables argument checking at call sites.
// walInit returns an int32_t status (NOTE(review): presumably 0 on success - confirm).
int32_t walInit(void);
void    walCleanUp(void);
159

S
Shengliang Guan 已提交
160
// handle open and ctl
// walOpen takes a path and a configuration and returns a WAL handle
// (NOTE(review): presumably NULL on failure - confirm).
L
Liu Jicong 已提交
161
SWal   *walOpen(const char *path, SWalCfg *pCfg);
S
Shengliang Guan 已提交
162
// Takes an existing handle and a configuration; returns an int32_t status.
int32_t walAlter(SWal *, SWalCfg *pCfg);
163
// Returns an int32_t status. NOTE(review): what exactly is persisted is not visible here.
int32_t walPersist(SWal *);
S
Shengliang Guan 已提交
164
// Counterpart of walOpen.
void    walClose(SWal *);
165

L
Liu Jicong 已提交
166 167 168
// write interfaces

// Because the caller assigns the index, the WAL guarantees linearizability.
L
Liu Jicong 已提交
169
int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen);
L
Liu Jicong 已提交
170 171 172
// Same parameters as walWrite plus sync metadata, which is passed by value.
int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
                             int32_t bodyLen);

173
// Assigns the version automatically and returns it to the caller;
L
Liu Jicong 已提交
174
// -1 is returned for failed writes.
// NOTE(review): the signature still takes an index argument - confirm how it
// relates to the automatically assigned version.
B
Benguang Zhao 已提交
175
int64_t walAppendLog(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
L
Liu Jicong 已提交
176 177

// NOTE(review): presumably force == true fsyncs regardless of the configured
// level/period - confirm against the implementation.
void walFsync(SWal *, bool force);
L
Liu Jicong 已提交
178

S
Shengliang Guan 已提交
179 180 181
// APIs for lifecycle management. Each returns an int32_t status and, where a
// ver argument is present, operates relative to that version.
int32_t walCommit(SWal *, int64_t ver);
int32_t walRollback(SWal *, int64_t ver);
L
Liu Jicong 已提交
182
// notify that previous logs can be pruned safely
L
Liu Jicong 已提交
183 184
int32_t walBeginSnapshot(SWal *, int64_t ver);
int32_t walEndSnapshot(SWal *);
185
int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
L
Liu Jicong 已提交
186 187 188
// for tq
// NOTE(review): presumably records ver as the applied version (cf. SWalVer.appliedVer) - confirm.
int32_t walApplyVer(SWal *, int64_t ver);

L
Liu Jicong 已提交
189
// int32_t  walDataCorrupted(SWal*);
190

S
Shengliang Guan 已提交
191
// read
// walOpenReader creates a reader on a WAL with the given filter condition;
// walCloseReader is its counterpart.
L
Liu Jicong 已提交
192 193
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
void        walCloseReader(SWalReader *pRead);
194
void        walReadReset(SWalReader *pReader);
L
Liu Jicong 已提交
195
// The functions below return an int32_t status.
// NOTE(review): where the read entry is stored (presumably SWalReader.pHead) is
// not visible from this header - confirm.
int32_t     walReadVer(SWalReader *pRead, int64_t ver);
L
Liu Jicong 已提交
196
int32_t     walReadSeekVer(SWalReader *pRead, int64_t ver);
L
Liu Jicong 已提交
197
int32_t     walNextValidMsg(SWalReader *pRead);
198 199

// only for tq usage
// NOTE(review): walSetReaderCapacity takes int32_t while SWalReader.capacity is int64_t.
// walFetchHead fills a caller-provided head for version ver; walFetchBody takes
// SWalCkHead ** and so may replace the caller's pointer (presumably to make room
// for the body - confirm); walSkipFetchBody takes the head as const.
L
Liu Jicong 已提交
200 201 202 203
void    walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
204

L
Liu Jicong 已提交
205
// Reference management. The SWalRef-returning functions hand back a reference
// object (NOTE(review): ownership and failure value are not visible here - confirm).
SWalRef *walRefFirstVer(SWal *, SWalRef *);
206
SWalRef *walRefCommittedVer(SWal *);
207 208

SWalRef *walOpenRef(SWal *);
209
// Closes a reference identified by its refId rather than by pointer.
void     walCloseRef(SWal *pWal, int64_t refId);
210
int32_t  walRefVer(SWalRef *, int64_t ver);
211
void     walUnrefVer(SWalRef *);
212

213
// Helper functions for raft. Both return a bool answer about the given WAL;
// walLogExist asks about one specific version.
214
bool walLogExist(SWal *, int64_t ver);
215
bool walIsEmpty(SWal *);
216

S
Shengliang Guan 已提交
217
// lifecycle check
// Getters returning an int64_t version for the given WAL. Their names mirror
// the members of SWalVer (presumably read from SWal.vers - confirm).
L
Liu Jicong 已提交
218 219 220
int64_t walGetFirstVer(SWal *);
int64_t walGetSnapshotVer(SWal *);
int64_t walGetLastVer(SWal *);
L
Liu Jicong 已提交
221
int64_t walGetCommittedVer(SWal *);
L
Liu Jicong 已提交
222
int64_t walGetAppliedVer(SWal *);
223

H
refact  
Hongze Cheng 已提交
224 225 226 227
#ifdef __cplusplus
}
#endif

228
#endif  // _TD_WAL_H_