/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_VNODE_SMA_H_
#define _TD_VNODE_SMA_H_

#include "vnodeInt.h"

#ifdef __cplusplus
extern "C" {
#endif

// smaDebug ================
// clang-format off
// Level-gated SMA log helpers: each macro calls taosPrintLog() only when the matching
// DEBUG_* bit is set in smaDebugFlag; the variadic arguments are forwarded unchanged.
// NOTE(review): smaDebug/smaTrace pass tsdbDebugFlag (not smaDebugFlag) as the third
// taosPrintLog() argument, while the other levels pass 255 — confirm this is intended.
#define smaFatal(...) do { if (smaDebugFlag & DEBUG_FATAL) { taosPrintLog("SMA FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}     while(0)
#define smaError(...) do { if (smaDebugFlag & DEBUG_ERROR) { taosPrintLog("SMA ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}     while(0)
#define smaWarn(...)  do { if (smaDebugFlag & DEBUG_WARN)  { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}       while(0)
#define smaInfo(...)  do { if (smaDebugFlag & DEBUG_INFO)  { taosPrintLog("SMA ", DEBUG_INFO, 255, __VA_ARGS__); }}            while(0)
#define smaDebug(...) do { if (smaDebugFlag & DEBUG_DEBUG) { taosPrintLog("SMA ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on

// NOTE(review): presumably the slot count used when the rsma info hash tables are created;
// the user of this constant is outside this header — confirm.
#define RSMA_TASK_INFO_HASH_SLOT 8

// Typedef names for the SMA structs. Most are defined further down in this header;
// the rest are only declared here.
typedef struct SSmaEnv       SSmaEnv;
typedef struct SSmaStat      SSmaStat;
typedef struct STSmaStat     STSmaStat;
typedef struct SRSmaStat     SRSmaStat;
typedef struct SSmaKey       SSmaKey;
typedef struct SRSmaInfo     SRSmaInfo;
typedef struct SRSmaInfoItem SRSmaInfoItem;
typedef struct SQTaskFile    SQTaskFile;
typedef struct SQTaskFReader SQTaskFReader;
typedef struct SQTaskFWriter SQTaskFWriter;

struct SSmaEnv {
C
Cary Xu 已提交
49 50 51
  SRWLatch  lock;
  int8_t    type;
  SSmaStat *pStat;
52 53
};

54
// SMA management state: an init flag, a reference-set id and a timer handle.
typedef struct {
  int8_t  inited;
  int32_t rsetId;     // NOTE(review): presumably the rsetId passed to tdAcquireSmaRef()/tdReleaseSmaRef() — confirm
  void   *tmrHandle;  // shared by all fetch tasks
} SSmaMgmt;

// Field accessors for SSmaEnv; `env` is a pointer. SMA_ENV_LOCK yields the address of
// the latch, the other two yield the field itself (usable as an lvalue).
#define SMA_ENV_LOCK(env) (&(env)->lock)
#define SMA_ENV_TYPE(env) ((env)->type)
#define SMA_ENV_STAT(env) ((env)->pStat)

struct STSmaStat {
C
Cary Xu 已提交
65 66 67
  int8_t    state;  // ETsdbSmaStat
  STSma    *pTSma;  // cache schema
  STSchema *pTSchema;
68 69
};

C
Cary Xu 已提交
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
// Descriptor of a qtask file: a reference count plus a commit id and a size.
// NOTE(review): nRef is declared volatile; volatile alone does not make updates atomic —
// presumably users go through atomic helpers; confirm at the call sites.
struct SQTaskFile {
  volatile int32_t nRef;
  int64_t          commitID;
  int64_t          size;
};

// Reader over a qtask file: owning SSma, an embedded file descriptor and the read handle.
struct SQTaskFReader {
  SSma      *pSma;
  SQTaskFile fTask;   // embedded by value, not a pointer
  TdFilePtr  pReadH;
};
// Writer for a qtask file: owning SSma, an embedded file descriptor, the write handle
// and a file name.
struct SQTaskFWriter {
  SSma      *pSma;
  SQTaskFile fTask;    // embedded by value, not a pointer
  TdFilePtr  pWriteH;
  char      *fname;    // NOTE(review): ownership of this string is not visible here — confirm who frees it
};

struct SRSmaStat {
  SSma     *pSma;
C
Cary Xu 已提交
90 91
  int64_t   commitAppliedVer;  // vnode applied version for async commit
  int64_t   refId;             // shared by fetch tasks
C
Cary Xu 已提交
92
  SRWLatch  lock;              // r/w lock for rsma fs(e.g. qtaskinfo)
C
Cary Xu 已提交
93 94
  int8_t    triggerStat;       // shared by fetch tasks
  int8_t    commitStat;        // 0 not in committing, 1 in committing
C
Cary Xu 已提交
95
  SArray   *aTaskFile;         // qTaskFiles committed recently(for recovery/snapshot r/w)
C
Cary Xu 已提交
96 97
  SHashObj *rsmaInfoHash;      // key: stbUid, value: SRSmaInfo;
  SHashObj *iRsmaInfoHash;     // key: stbUid, value: SRSmaInfo; immutable rsmaInfoHash
C
Cary Xu 已提交
98 99
};

100 101
struct SSmaStat {
  union {
C
Cary Xu 已提交
102 103
    STSmaStat tsmaStat;  // time-range-wise sma
    SRSmaStat rsmaStat;  // rollup sma
104 105 106
  };
  T_REF_DECLARE()
};
107

C
Cary Xu 已提交
108 109 110 111 112 113 114
// Accessors: `s` is an SSmaStat pointer, `r` an SRSmaStat pointer. The *_STAT and
// RSMA_FS_LOCK macros yield the address of the member; the others yield the member itself.
#define SMA_TSMA_STAT(s)      (&(s)->tsmaStat)
#define SMA_RSMA_STAT(s)      (&(s)->rsmaStat)
#define RSMA_INFO_HASH(r)     ((r)->rsmaInfoHash)
#define RSMA_IMU_INFO_HASH(r) ((r)->iRsmaInfoHash)
#define RSMA_TRIGGER_STAT(r)  (&(r)->triggerStat)
#define RSMA_COMMIT_STAT(r)   (&(r)->commitStat)
#define RSMA_REF_ID(r)        ((r)->refId)
#define RSMA_FS_LOCK(r)       (&(r)->lock)

struct SRSmaInfoItem {
  int8_t  level;
  int8_t  triggerStat;
C
Cary Xu 已提交
120 121
  int32_t maxDelay;
  tmr_h   tmrId;
C
Cary Xu 已提交
122 123 124
};

struct SRSmaInfo {
C
Cary Xu 已提交
125 126
  STSchema *pTSchema;
  int64_t   suid;
C
Cary Xu 已提交
127
  int64_t   refId;  // refId of SRSmaStat
C
Cary Xu 已提交
128 129
  int8_t    delFlag;
  T_REF_DECLARE()
C
Cary Xu 已提交
130
  SRSmaInfoItem items[TSDB_RETENTION_L2];
C
Cary Xu 已提交
131 132
  void         *taskInfo[TSDB_RETENTION_L2];   // qTaskInfo_t
  void         *iTaskInfo[TSDB_RETENTION_L2];  // immutable
C
Cary Xu 已提交
133
};
C
Cary Xu 已提交
134 135 136 137 138 139 140

// Helpers for SRSmaInfo; `r` is a pointer and `i` an index into the per-level arrays.
// RSMA_INFO_SET_DEL is a plain (non-atomic) store of 1 into delFlag.
// NOTE(review): what the 32 bytes of RSMA_INFO_HEAD_LEN cover is not visible here — confirm.
#define RSMA_INFO_HEAD_LEN     32
#define RSMA_INFO_IS_DEL(r)    ((r)->delFlag == 1)
#define RSMA_INFO_SET_DEL(r)   ((r)->delFlag = 1)
#define RSMA_INFO_QTASK(r, i)  ((r)->taskInfo[i])
#define RSMA_INFO_IQTASK(r, i) ((r)->iTaskInfo[i])
#define RSMA_INFO_ITEM(r, i)   (&(r)->items[i])

// Trigger states. NOTE(review): presumably the values stored in the triggerStat fields
// of SRSmaStat / SRSmaInfoItem — confirm at the users.
enum {
  TASK_TRIGGER_STAT_INIT = 0,
  TASK_TRIGGER_STAT_ACTIVE = 1,
  TASK_TRIGGER_STAT_INACTIVE = 2,
  TASK_TRIGGER_STAT_PAUSED = 3,
  TASK_TRIGGER_STAT_CANCELLED = 4,
  TASK_TRIGGER_STAT_DROPPED = 5,
};

// rsma role identifiers. NOTE(review): where these are consumed is not visible in this
// header — confirm before renumbering.
enum {
  RSMA_ROLE_CREATE = 0,
  RSMA_ROLE_DROP = 1,
  RSMA_ROLE_SUBMIT = 2,
  RSMA_ROLE_FETCH = 3,
  RSMA_ROLE_ITERATE = 4,
};

// rsma restore kinds. NOTE(review): presumably the `type` argument of tdRsmaRestore() /
// tdProcessRSmaRestoreImpl() — confirm. Numbering starts at 1, not 0.
enum {
  RSMA_RESTORE_REBOOT = 1,
  RSMA_RESTORE_SYNC = 2,
};

// SMA environment teardown.
// NOTE(review): tdFreeSmaEnv() returns void*; presumably NULL so callers can reset their
// pointer in one statement — confirm in the implementation.
void  tdDestroySmaEnv(SSmaEnv *pSmaEnv);
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);

// tsma drop / rsma insert entry points; int32_t results are presumably 0 on success — confirm.
int32_t tdDropTSma(SSma *pSma, char *pMsg);
int32_t tdDropTSmaData(SSma *pSma, int64_t indexUid);
int32_t tdInsertRSmaData(SSma *pSma, char *msg);

// Reference handling for SSmaStat and SRSmaInfo objects.
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);

// Acquire/release an object identified by (rsetId, refId).
void   *tdAcquireSmaRef(int32_t rsetId, int64_t refId);
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId);

int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);

int32_t tdLockSma(SSma *pSma);
int32_t tdUnLockSma(SSma *pSma);

// Atomically reads the state byte of a tsma stat object.
// Returns TSDB_SMA_STAT_UNKNOWN when pTStat is NULL.
static FORCE_INLINE int8_t tdSmaStat(STSmaStat *pTStat) {
  if (!pTStat) {
    return TSDB_SMA_STAT_UNKNOWN;
  }
  return atomic_load_8(&pTStat->state);
}

// Reports whether the tsma state equals TSDB_SMA_STAT_OK.
//   pTStat: stat object; NULL yields false and leaves *state untouched.
//   state:  optional out-parameter; when non-NULL it receives the state that was loaded.
// The state is loaded once, so the stored value and the result always agree.
static FORCE_INLINE bool tdSmaStatIsOK(STSmaStat *pTStat, int8_t *state) {
  if (!pTStat) {
    return false;
  }

  int8_t current = atomic_load_8(&pTStat->state);
  if (state) {
    *state = current;
  }
  return current == TSDB_SMA_STAT_OK;
}

// Reports whether the TSDB_SMA_STAT_EXPIRED bit is set in the atomically loaded state.
// A NULL pTStat is treated as expired (returns true).
static FORCE_INLINE bool tdSmaStatIsExpired(STSmaStat *pTStat) {
  if (!pTStat) {
    return true;
  }
  // Compare against 0 so the result is a proper boolean regardless of the flag's bit position.
  return (atomic_load_8(&pTStat->state) & TSDB_SMA_STAT_EXPIRED) != 0;
}

// Reports whether the TSDB_SMA_STAT_DROPPED bit is set in the atomically loaded state.
// A NULL pTStat is treated as dropped (returns true).
static FORCE_INLINE bool tdSmaStatIsDropped(STSmaStat *pTStat) {
  if (!pTStat) {
    return true;
  }
  // Compare against 0 so the result is a proper boolean regardless of the flag's bit position.
  return (atomic_load_8(&pTStat->state) & TSDB_SMA_STAT_DROPPED) != 0;
}

// Atomically overwrites the state with TSDB_SMA_STAT_OK (any previously set bits are
// replaced, not merged). No-op when pTStat is NULL.
static FORCE_INLINE void tdSmaStatSetOK(STSmaStat *pTStat) {
  if (!pTStat) {
    return;
  }
  atomic_store_8(&pTStat->state, TSDB_SMA_STAT_OK);
}

// Atomically ORs TSDB_SMA_STAT_EXPIRED into the state, keeping the other bits.
// No-op when pTStat is NULL.
static FORCE_INLINE void tdSmaStatSetExpired(STSmaStat *pTStat) {
  if (!pTStat) {
    return;
  }
  atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_EXPIRED);
}

// Atomically ORs TSDB_SMA_STAT_DROPPED into the state, keeping the other bits.
// No-op when pTStat is NULL.
static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
  if (!pTStat) {
    return;
  }
  atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_DROPPED);
}

void           tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName);
void           tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName);
C
Cary Xu 已提交
231
int32_t        tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc);
C
Cary Xu 已提交
232
void           tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
C
Cary Xu 已提交
233 234
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
void          *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
C
Cary Xu 已提交
235 236
void          *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t        tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
237

238
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
239 240
int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);
int32_t tdRsmaRestore(SSma *pSma, int8_t type, int64_t committedVer);
C
Cary Xu 已提交
241

C
Cary Xu 已提交
242
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
C
Cary Xu 已提交
243 244
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
C
Cary Xu 已提交
245

C
Cary Xu 已提交
246 247
// smaFileUtil ================

// NOTE(review): presumably the number of bytes reserved at the start of an STFile for its
// header (see tdLoadTFileHeader/tdUpdateTFileHeader) — confirm in the implementation.
#define TD_FILE_HEAD_SIZE 512

// Typedef names for the file-utility structs defined right below.
typedef struct STFInfo STFInfo;
typedef struct STFile  STFile;

// File header information embedded in STFile and filled in by tdLoadTFileHeader().
struct STFInfo {
  // common fields
  uint32_t magic;
  uint32_t ftype;  // NOTE(review): presumably one of the TD_FTYPE_* values — confirm
  uint32_t fver;
  int64_t  fsize;
};

// File types handled by the td*TFile helpers; only the rsma qtaskinfo file is defined.
enum {
  TD_FTYPE_RSMA_QTASKINFO = 0,
};

struct STFile {
C
Cary Xu 已提交
266
  uint8_t   state;
C
Cary Xu 已提交
267
  STFInfo   info;
C
Cary Xu 已提交
268
  char     *fname;
C
Cary Xu 已提交
269 270 271
  TdFilePtr pFile;
};

C
Cary Xu 已提交
272 273
// Accessors and state helpers for STFile; `tf` is a pointer to the file object.
// "Opened" means the handle is non-NULL. TD_TFILE_SET_CLOSED only clears the handle
// field, it does not close the underlying file.
// TD_TFILE_OPENED was previously defined twice with identical text; it is kept once.
#define TD_TFILE_PFILE(tf)        ((tf)->pFile)
#define TD_TFILE_FULL_NAME(tf)    ((tf)->fname)
#define TD_TFILE_OPENED(tf)       (TD_TFILE_PFILE(tf) != NULL)
#define TD_TFILE_CLOSED(tf)       (!TD_TFILE_OPENED(tf))
#define TD_TFILE_SET_CLOSED(tf)   (TD_TFILE_PFILE(tf) = NULL)
#define TD_TFILE_SET_STATE(tf, s) ((tf)->state = (s))

// STFile lifecycle and I/O helpers. Every function takes the file object as its first argument.
// NOTE(review): the int32_t results are presumably 0 on success and the int64_t results a
// byte count or offset — the implementations are not visible here; confirm.
int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname);
int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType);
int32_t tdOpenTFile(STFile *pTFile, int flags);
int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte);
int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence);
int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte);
int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset);
int64_t tdGetTFileSize(STFile *pTFile, int64_t *size);
int32_t tdRemoveTFile(STFile *pTFile);
int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo);
int32_t tdUpdateTFileHeader(STFile *pTFile);
void    tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
void    tdCloseTFile(STFile *pTFile);
void    tdDestroyTFile(STFile *pTFile);

// Vnode file/directory name builders.
// NOTE(review): outputName is presumably the destination buffer; no length is passed, so
// callers must supply one large enough for the result — confirm the required size.
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
                      char *outputName);
void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName);

#ifdef __cplusplus
}
#endif

#endif /*_TD_VNODE_SMA_H_*/