sma.h 9.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_VNODE_SMA_H_
#define _TD_VNODE_SMA_H_

#include "vnodeInt.h"

#ifdef __cplusplus
extern "C" {
#endif

// smaDebug ================
// clang-format off
// Severity-gated SMA log macros. Each macro forwards to taosPrintLog() only when the
// matching DEBUG_* bit is set in smaDebugFlag.
//  - FATAL/ERROR/WARN/INFO pass the constant 255 as the third argument.
//  - DEBUG/TRACE pass smaDebugFlag as the third argument, i.e. the same flag that gates
//    them. They previously passed tsdbDebugFlag, which coupled SMA debug/trace output to
//    another module's setting.
// NOTE(review): the third taosPrintLog() argument is presumably the output-control flag;
// confirm against the taosPrintLog() declaration.
#define smaFatal(...) do { if (smaDebugFlag & DEBUG_FATAL) { taosPrintLog("SMA FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}     while(0)
#define smaError(...) do { if (smaDebugFlag & DEBUG_ERROR) { taosPrintLog("SMA ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}     while(0)
#define smaWarn(...)  do { if (smaDebugFlag & DEBUG_WARN)  { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}       while(0)
#define smaInfo(...)  do { if (smaDebugFlag & DEBUG_INFO)  { taosPrintLog("SMA ", DEBUG_INFO, 255, __VA_ARGS__); }}            while(0)
#define smaDebug(...) do { if (smaDebugFlag & DEBUG_DEBUG) { taosPrintLog("SMA ", DEBUG_DEBUG, smaDebugFlag, __VA_ARGS__); }}  while(0)
#define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, smaDebugFlag, __VA_ARGS__); }}  while(0)
// clang-format on

C
Cary Xu 已提交
35
// Slot count constant (8).
// NOTE(review): presumably the initial slot count for the rsma task-info hash; the use site is not visible in this header — confirm.
#define RSMA_TASK_INFO_HASH_SLOT (8)
C
Cary Xu 已提交
36

37 38
// Forward typedefs for the SMA types defined further down in this header, so that the
// structs can refer to each other by their short names.
typedef struct SSmaEnv       SSmaEnv;
typedef struct SSmaStat      SSmaStat;
typedef struct STSmaStat     STSmaStat;
typedef struct SRSmaStat     SRSmaStat;
typedef struct SRSmaRef      SRSmaRef;
typedef struct SRSmaInfo     SRSmaInfo;
typedef struct SRSmaInfoItem SRSmaInfoItem;
typedef struct SRSmaFS       SRSmaFS;
typedef struct SQTaskFile    SQTaskFile;
typedef struct SQTaskFReader SQTaskFReader;
47 48

struct SSmaEnv {
C
Cary Xu 已提交
49 50
  SRWLatch  lock;
  int8_t    type;
C
Cary Xu 已提交
51
  int8_t    flag;  // 0x01 inClose
C
Cary Xu 已提交
52
  SSmaStat *pStat;
53 54
};

C
Cary Xu 已提交
55 56
// Bit 0x1 for SSmaEnv::flag, which that field's comment describes as "inClose".
#define SMA_ENV_FLG_CLOSE ((int8_t)0x1)

57 58 59 60 61
// Value type stored in SSmaMgmt::refHash (see the comment on that field): pairs a
// reference id with a super-table uid.
struct SRSmaRef {
  int64_t refId;  // for SRSmaStat
  int64_t suid;
};

62
typedef struct {
C
Cary Xu 已提交
63 64
  int8_t  inited;
  int32_t rsetId;
65
  void   *tmrHandle;  // shared by all fetch tasks
66 67 68 69 70 71
  /**
   * @brief key: void* of SRSmaInfoItem, value: SRSmaRef
   *  N.B. Although there is a very small possibility that "void*" point to different objects while with the same
   * address after release/renew, the functionality is not affected as it just used to fetch the rsma results.
   */
  SHashObj *refHash;  // shared by all vgroups
72 73
} SSmaMgmt;

C
Cary Xu 已提交
74 75 76 77
// Field accessors for SSmaEnv. `env` must be a pointer to SSmaEnv.
//  - SMA_ENV_LOCK yields the address of the latch; the other two yield the field itself.
//  - SMA_RSMA_STAT takes an object with a `pRSmaEnv` member, treats that member as an
//    SSmaEnv*, and casts its pStat to SRSmaStat*.
#define SMA_ENV_LOCK(env)  (&(env)->lock)
#define SMA_ENV_TYPE(env)  ((env)->type)
#define SMA_ENV_STAT(env)  ((env)->pStat)
#define SMA_RSMA_STAT(sma) ((SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)(sma)->pRSmaEnv))
78

C
Cary Xu 已提交
79
struct STSmaStat {
C
Cary Xu 已提交
80 81 82
  int8_t    state;  // ETsdbSmaStat
  STSma    *pTSma;  // cache schema
  STSchema *pTSchema;
83 84
};

C
Cary Xu 已提交
85 86
// Descriptor of one qtaskinfo file; SRSmaFS::aQTaskInf is documented as an array of these.
struct SQTaskFile {
  volatile int32_t nRef;     // reference count (NOTE(review): inferred from the name — confirm)
  int8_t           level;
  int64_t          suid;
  int64_t          version;
  int64_t          size;
  int64_t          mtime;
};

struct SQTaskFReader {
95
  SSma     *pSma;
K
kailixu 已提交
96 97
  int8_t    level;
  int64_t   suid;
98 99
  int64_t   version;
  TdFilePtr pReadH;
C
Cary Xu 已提交
100 101
};

102 103 104 105
// rsma file-system state; embedded in SRSmaStat as `fs` ("for recovery/snapshot r/w").
struct SRSmaFS {
  SArray *aQTaskInf;  // array of SQTaskFile
};

C
Cary Xu 已提交
106
struct SRSmaStat {
C
Cary Xu 已提交
107 108 109
  SSma            *pSma;
  int64_t          commitAppliedVer;  // vnode applied version for async commit
  int64_t          refId;             // shared by fetch tasks
C
Cary Xu 已提交
110
  volatile int64_t nBufItems;         // number of items in queue buffer
C
Cary Xu 已提交
111
  SRWLatch         lock;              // r/w lock for rsma fs(e.g. qtaskinfo)
C
Cary Xu 已提交
112
  volatile int32_t nFetchAll;         // active number of fetch all
113 114 115
  volatile int8_t  triggerStat;       // shared by fetch tasks
  volatile int8_t  commitStat;        // 0 not in committing, 1 in committing
  volatile int8_t  delFlag;           // 0 no deleted SRSmaInfo, 1 has deleted SRSmaInfo
116
  SRSmaFS          fs;                // for recovery/snapshot r/w
C
Cary Xu 已提交
117
  SHashObj        *infoHash;          // key: suid, value: SRSmaInfo
C
Cary Xu 已提交
118
  tsem_t           notEmpty;          // has items in queue buffer
C
Cary Xu 已提交
119 120
};

121 122
struct SSmaStat {
  union {
C
Cary Xu 已提交
123 124
    STSmaStat tsmaStat;  // time-range-wise sma
    SRSmaStat rsmaStat;  // rollup sma
125 126
  };
  T_REF_DECLARE()
C
Cary Xu 已提交
127
  char data[];
128
};
129

C
Cary Xu 已提交
130 131 132 133 134 135
// Accessors for SSmaStat (`s`) and SRSmaStat (`r`); every argument is a pointer.
// SMA_STAT_TSMA, SMA_STAT_RSMA, RSMA_TRIGGER_STAT, RSMA_COMMIT_STAT, RSMA_FS and
// RSMA_FS_LOCK yield the ADDRESS of the field; RSMA_INFO_HASH and RSMA_REF_ID yield
// the field itself.
#define SMA_STAT_TSMA(s)     (&(s)->tsmaStat)
#define SMA_STAT_RSMA(s)     (&(s)->rsmaStat)
#define RSMA_INFO_HASH(r)    ((r)->infoHash)
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_COMMIT_STAT(r)  (&(r)->commitStat)
#define RSMA_REF_ID(r)       ((r)->refId)
#define RSMA_FS(r)           (&(r)->fs)
#define RSMA_FS_LOCK(r)      (&(r)->lock)
C
Cary Xu 已提交
138 139

struct SRSmaInfoItem {
C
Cary Xu 已提交
140 141 142
  int8_t   level : 4;
  int8_t   fetchLevel : 4;
  int8_t   triggerStat;
C
Cary Xu 已提交
143
  uint16_t nScanned;
C
Cary Xu 已提交
144 145
  int32_t  maxDelay;  // ms
  tmr_h    tmrId;
146
  void    *pStreamState;
C
Cary Xu 已提交
147 148 149
};

struct SRSmaInfo {
C
Cary Xu 已提交
150
  SSma     *pSma;
C
Cary Xu 已提交
151 152
  STSchema *pTSchema;
  int64_t   suid;
C
Cary Xu 已提交
153 154
  int64_t   lastRecv;  // ms
  int8_t    assigned;  // 0 idle, 1 assgined for exec
C
Cary Xu 已提交
155
  int8_t    delFlag;
C
Cary Xu 已提交
156
  int16_t   padding;
C
Cary Xu 已提交
157
  T_REF_DECLARE()
C
Cary Xu 已提交
158
  SRSmaInfoItem items[TSDB_RETENTION_L2];
C
Cary Xu 已提交
159
  void         *taskInfo[TSDB_RETENTION_L2];   // qTaskInfo_t
C
Cary Xu 已提交
160 161 162 163 164
  STaosQueue   *queue;                         // buffer queue of SubmitReq
  STaosQall    *qall;                          // buffer qall of SubmitReq
  void         *iTaskInfo[TSDB_RETENTION_L2];  // immutable qTaskInfo_t
  STaosQueue   *iQueue;                        // immutable buffer queue of SubmitReq
  STaosQall    *iQall;                         // immutable buffer qall of SubmitReq
C
Cary Xu 已提交
165
};
C
Cary Xu 已提交
166

C
Cary Xu 已提交
167
// Helpers for SRSmaInfo; `r` is a pointer to SRSmaInfo and `i` an index into its per-level arrays.
//  - RSMA_INFO_HEAD_LEN: byte offset of `items`, i.e. the size of the fields before it.
//  - RSMA_INFO_IS_DEL / RSMA_INFO_SET_DEL: test / set delFlag == 1.
//  - RSMA_INFO_QTASK / RSMA_INFO_IQTASK: the i-th taskInfo / iTaskInfo slot (an lvalue).
//  - RSMA_INFO_ITEM: address of the i-th SRSmaInfoItem.
#define RSMA_INFO_HEAD_LEN     offsetof(SRSmaInfo, items)
#define RSMA_INFO_IS_DEL(r)    ((r)->delFlag == 1)
#define RSMA_INFO_SET_DEL(r)   ((r)->delFlag = 1)
#define RSMA_INFO_QTASK(r, i)  ((r)->taskInfo[i])
#define RSMA_INFO_IQTASK(r, i) ((r)->iTaskInfo[i])
#define RSMA_INFO_ITEM(r, i)   (&(r)->items[i])
C
Cary Xu 已提交
173 174 175 176 177

// Trigger states.
// NOTE(review): presumably the values stored in SRSmaStat::triggerStat and
// SRSmaInfoItem::triggerStat — confirm against the .c files.
enum {
  TASK_TRIGGER_STAT_INIT = 0,
  TASK_TRIGGER_STAT_ACTIVE = 1,
  TASK_TRIGGER_STAT_INACTIVE = 2,
  TASK_TRIGGER_STAT_PAUSED = 3,
  TASK_TRIGGER_STAT_CANCELLED = 4,
  TASK_TRIGGER_STAT_DROPPED = 5,
};
C
Cary Xu 已提交
182

183 184 185 186 187
// rsma restore kinds.
// NOTE(review): presumably the values accepted by the `type` parameter of
// tdRSmaRestore() / tdRSmaProcessRestoreImpl() — confirm.
enum {
  RSMA_RESTORE_REBOOT = 1,  // name suggests: restore after a restart
  RSMA_RESTORE_SYNC   = 2,  // name suggests: restore driven by sync
};

C
Cary Xu 已提交
188 189 190 191 192 193
// What triggered an rsma execution; taken by tdRSmaProcessExecImpl().
typedef enum {
  RSMA_EXEC_OVERFLOW = 1,  // the queue buffer overflowed
  RSMA_EXEC_TIMEOUT  = 2,  // a timer fired
  RSMA_EXEC_COMMIT   = 3,  // a commit requested it
} ERsmaExecType;

194 195 196 197
// sma
// SMA environment lifecycle, locking and ref-set helpers (implemented elsewhere).
// NOTE(review): the int32_t returns presumably follow the project's 0-on-success
// convention, and the void* returns of tdFreeSmaEnv()/tdAcquireSmaRef() carry a
// pointer result — none of that is visible in this header; confirm in the .c files.
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);
void    tdDestroySmaEnv(SSmaEnv *pSmaEnv);
void   *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
int32_t tdLockSma(SSma *pSma);
int32_t tdUnLockSma(SSma *pSma);
void   *tdAcquireSmaRef(int32_t rsetId, int64_t refId);
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId);
202

203 204 205 206 207 208 209 210 211
/**
 * Apply T_REF_INC to the SMA stat object and log the value it yields at debug level.
 *
 * @param pSma  SMA handle; only used to obtain the vgroup id for the log line.
 * @param pStat stat object whose reference count is incremented.
 */
static FORCE_INLINE void tdRefSmaStat(SSma *pSma, SSmaStat *pStat) {
  const int32_t nRef = T_REF_INC(pStat);
  smaDebug("vgId:%d, ref sma stat:%p, val:%d", SMA_VID(pSma), pStat, nRef);
}
/**
 * Apply T_REF_DEC to the SMA stat object and log the value it yields at debug level.
 * The object is not freed here.
 *
 * @param pSma  SMA handle; only used to obtain the vgroup id for the log line.
 * @param pStat stat object whose reference count is decremented.
 */
static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
  const int32_t nRef = T_REF_DEC(pStat);
  smaDebug("vgId:%d, unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, nRef);
}

212 213
// rsma
void   *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
K
kailixu 已提交
214
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback);
215
void    tdRSmaFSClose(SRSmaFS *fs);
K
kailixu 已提交
216 217
int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew);
int32_t tdRSmaFSCommit(SSma *pSma);
K
kailixu 已提交
218
int32_t tdRSmaFSFinishCommit(SSma *pSma);
K
kailixu 已提交
219 220 221 222 223
int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS);
int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS);
int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS);
void    tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS);
int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize);
K
kailixu 已提交
224 225
int32_t tdRSmaFSRollback(SSma *pSma);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback);
226 227 228
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
K
kailixu 已提交
229 230 231 232
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback);
void    tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName);
void    tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path,
                                   char *outputName);
233 234
void    tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName);
void    tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName);
235 236 237

/**
 * Apply T_REF_INC to the rsma info object and log the value it yields at trace level.
 *
 * @param pSma      SMA handle; only used to obtain the vgroup id for the log line.
 * @param pRSmaInfo rsma info whose reference count is incremented.
 */
static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
  int32_t ref = T_REF_INC(pRSmaInfo);
  smaTrace("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
}
/**
 * Apply T_REF_DEC to the rsma info object and log the value it yields at trace level.
 * The object is not freed here.
 *
 * @param pSma      SMA handle; only used to obtain the vgroup id for the log line.
 * @param pRSmaInfo rsma info whose reference count is decremented.
 */
static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
  int32_t ref = T_REF_DEC(pRSmaInfo);
  smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
}
C
Cary Xu 已提交
244

K
kailixu 已提交
245 246 247
// Generic rsma file / directory name builders; the result goes to `outputName`.
// NOTE(review): presumably composed from the vgroup id, parent dir (`pdname`), dir (`dname`)
// and, for files, `fname`/suid/level/version. The exact format and the required capacity
// of `outputName` are not visible in this header — confirm at the definitions.
void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid,
                       int8_t level, int64_t version, char *outputName);
void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName);
C
Cary Xu 已提交
248

249 250 251 252
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
253
#endif /*_TD_VNODE_SMA_H_*/