sma.h 8.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_VNODE_SMA_H_
#define _TD_VNODE_SMA_H_

#include "vnodeInt.h"

#ifdef __cplusplus
extern "C" {
#endif

// smaDebug ================
// Logging helpers for the SMA module. Each macro forwards to taosPrintLog()
// only when the matching level bit is set in smaDebugFlag.
// clang-format off
#define smaFatal(...) do { if (smaDebugFlag & DEBUG_FATAL) { taosPrintLog("SMA FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}     while(0)
#define smaError(...) do { if (smaDebugFlag & DEBUG_ERROR) { taosPrintLog("SMA ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}     while(0)
#define smaWarn(...)  do { if (smaDebugFlag & DEBUG_WARN)  { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}       while(0)
#define smaInfo(...)  do { if (smaDebugFlag & DEBUG_INFO)  { taosPrintLog("SMA ", DEBUG_INFO, 255, __VA_ARGS__); }}            while(0)
// Debug and trace pass the same flag that gates them (smaDebugFlag) as the
// third taosPrintLog() argument, instead of another module's flag.
#define smaDebug(...) do { if (smaDebugFlag & DEBUG_DEBUG) { taosPrintLog("SMA ", DEBUG_DEBUG, smaDebugFlag, __VA_ARGS__); }}  while(0)
#define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, smaDebugFlag, __VA_ARGS__); }}  while(0)
// clang-format on

35 36
// Forward declarations of the SMA types. SSmaEnv, STSmaStat, SRSmaStat and
// SSmaStat are defined further down; the remaining types are only
// forward-declared in this part of the header.
typedef struct SSmaEnv       SSmaEnv;
typedef struct SSmaStat      SSmaStat;
typedef struct STSmaStat     STSmaStat;
typedef struct SRSmaStat     SRSmaStat;
typedef struct SSmaKey       SSmaKey;
typedef struct SRSmaInfo     SRSmaInfo;
typedef struct SRSmaInfoItem SRSmaInfoItem;
42 43 44 45 46 47 48

// SMA environment: a read/write lock, a type tag and a pointer to the SMA statistics.
struct SSmaEnv {
  TdThreadRwlock lock;   // taken via tdRLockSmaEnv()/tdWLockSmaEnv(), released via tdUnLockSmaEnv()
  int8_t         type;   // NOTE(review): presumably the smaType given to tdCheckAndInitSmaEnv() - confirm
  SSmaStat      *pStat;  // statistics object; SSmaStat holds either the tsma or the rsma variant
};

49
// SMA management record.
typedef struct {
  int8_t  inited;     // NOTE(review): presumably a "management initialized" flag - confirm
  int32_t rsetId;     // NOTE(review): presumably the rsetId passed to tdAcquireSmaRef()/tdReleaseSmaRef() - confirm
  void   *tmrHandle;  // shared by all fetch tasks
} SSmaMgmt;

C
Cary Xu 已提交
55 56 57
// Field accessors for an SSmaEnv pointer; each expands to an lvalue.
#define SMA_ENV_LOCK(pEnv) ((pEnv)->lock)
#define SMA_ENV_TYPE(pEnv) ((pEnv)->type)
#define SMA_ENV_STAT(pEnv) ((pEnv)->pStat)
58

C
Cary Xu 已提交
59
struct STSmaStat {
C
Cary Xu 已提交
60 61 62
  int8_t    state;  // ETsdbSmaStat
  STSma    *pTSma;  // cache schema
  STSchema *pTSchema;
63 64
};

C
Cary Xu 已提交
65 66
struct SRSmaStat {
  SSma     *pSma;
67
  int64_t   submitVer;
C
Cary Xu 已提交
68 69
  int64_t   refId;         // shared by fetch tasks
  int8_t    triggerStat;   // shared by fetch tasks
C
Cary Xu 已提交
70 71 72
  SHashObj *rsmaInfoHash;  // key: stbUid, value: SRSmaInfo;
};

73 74
struct SSmaStat {
  union {
C
Cary Xu 已提交
75 76
    STSmaStat tsmaStat;  // time-range-wise sma
    SRSmaStat rsmaStat;  // rollup sma
77 78 79
  };
  T_REF_DECLARE()
};
80

C
Cary Xu 已提交
81 82 83 84
// Accessors for SSmaStat (s) and SRSmaStat (r) pointers.
// SMA_TSMA_STAT, SMA_RSMA_STAT and RSMA_TRIGGER_STAT yield the ADDRESS of the
// member; the other macros yield the member itself.
#define SMA_TSMA_STAT(s)     (&(s)->tsmaStat)
#define SMA_RSMA_STAT(s)     (&(s)->rsmaStat)
#define RSMA_INFO_HASH(r)    ((r)->rsmaInfoHash)
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_REF_ID(r)       ((r)->refId)
#define RSMA_SUBMIT_VER(r)   ((r)->submitVer)
C
Cary Xu 已提交
87 88 89 90 91

// Trigger states of a task.
// NOTE(review): presumably the values stored in SRSmaStat.triggerStat - confirm.
enum {
  TASK_TRIGGER_STAT_INIT = 0,
  TASK_TRIGGER_STAT_ACTIVE = 1,
  TASK_TRIGGER_STAT_INACTIVE = 2,
  TASK_TRIGGER_STAT_PAUSED = 3,
  TASK_TRIGGER_STAT_CANCELLED = 4,
  TASK_TRIGGER_STAT_DROPPED = 5,
};
C
Cary Xu 已提交
96

97 98 99 100 101 102 103 104 105
// SSmaEnv teardown (implementations are not part of this header).
// NOTE(review): the names suggest tdDestroySmaEnv() releases the env's contents
// while tdFreeSmaEnv() also frees the env itself and returns a pointer - confirm.
void  tdDestroySmaEnv(SSmaEnv *pSmaEnv);
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);

// TSMA drop and RSMA insert entry points (implementations not visible here).
int32_t tdDropTSma(SSma *pSma, char *pMsg);
int32_t tdDropTSmaData(SSma *pSma, int64_t indexUid);
int32_t tdInsertRSmaData(SSma *pSma, char *msg);

// Reference / unreference an SSmaStat.
// NOTE(review): presumably operates on the member declared by T_REF_DECLARE() - confirm.
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
C
Cary Xu 已提交
106 107 108 109

// Acquire / release a reference identified by (rsetId, refId).
// NOTE(review): `tags` and `ln` are presumably a caller tag string and source
// line number used for diagnostics - confirm against the implementation.
void   *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln);
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln);

// NOTE(review): by name, checks for and lazily creates the env of `smaType` - confirm.
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);

// Lock / unlock at SSma level (implementations not visible in this header).
int32_t tdLockSma(SSma *pSma);
int32_t tdUnLockSma(SSma *pSma);

// Calls taosThreadRwlockRdlock() on the environment's lock.
// Returns 0 on success. On failure stores TAOS_SYSTEM_ERROR(code) in terrno
// and returns -1.
static FORCE_INLINE int32_t tdRLockSmaEnv(SSmaEnv *pEnv) {
  const int rc = taosThreadRwlockRdlock(&pEnv->lock);
  if (rc == 0) {
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(rc);
  return -1;
}

// Calls taosThreadRwlockWrlock() on the environment's lock.
// Returns 0 on success. On failure stores TAOS_SYSTEM_ERROR(code) in terrno
// and returns -1.
static FORCE_INLINE int32_t tdWLockSmaEnv(SSmaEnv *pEnv) {
  const int rc = taosThreadRwlockWrlock(&pEnv->lock);
  if (rc == 0) {
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(rc);
  return -1;
}

// Calls taosThreadRwlockUnlock() on the environment's lock.
// Returns 0 on success. On failure stores TAOS_SYSTEM_ERROR(code) in terrno
// and returns -1.
static FORCE_INLINE int32_t tdUnLockSmaEnv(SSmaEnv *pEnv) {
  const int rc = taosThreadRwlockUnlock(&pEnv->lock);
  if (rc == 0) {
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(rc);
  return -1;
}

C
Cary Xu 已提交
142 143 144
// Atomically reads the tsma state.
// Returns TSDB_SMA_STAT_UNKNOWN when pTStat is NULL.
static FORCE_INLINE int8_t tdSmaStat(STSmaStat *pTStat) {
  if (pTStat) {
    return atomic_load_8(&pTStat->state);
  }
  return TSDB_SMA_STAT_UNKNOWN;
}

C
Cary Xu 已提交
149 150
// Reports whether the tsma state equals TSDB_SMA_STAT_OK.
//   pTStat: statistics to inspect; NULL yields false and leaves *state untouched.
//   state:  optional out-parameter; when non-NULL it receives the state value
//           that was atomically loaded for the comparison.
static FORCE_INLINE bool tdSmaStatIsOK(STSmaStat *pTStat, int8_t *state) {
  if (!pTStat) {
    return false;
  }

  if (state) {
    // Load once so the value reported to the caller is the one compared.
    *state = atomic_load_8(&pTStat->state);
    return *state == TSDB_SMA_STAT_OK;
  }
  return atomic_load_8(&pTStat->state) == TSDB_SMA_STAT_OK;
}

C
Cary Xu 已提交
161 162
// True when the TSDB_SMA_STAT_EXPIRED bit is set in the atomically loaded
// state. A NULL pTStat is reported as expired (returns true).
static FORCE_INLINE bool tdSmaStatIsExpired(STSmaStat *pTStat) {
  return pTStat ? (atomic_load_8(&pTStat->state) & TSDB_SMA_STAT_EXPIRED) : true;
}

C
Cary Xu 已提交
165 166
// True when the TSDB_SMA_STAT_DROPPED bit is set in the atomically loaded
// state. A NULL pTStat is reported as dropped (returns true).
static FORCE_INLINE bool tdSmaStatIsDropped(STSmaStat *pTStat) {
  return pTStat ? (atomic_load_8(&pTStat->state) & TSDB_SMA_STAT_DROPPED) : true;
}

C
Cary Xu 已提交
169 170 171
// Atomically stores TSDB_SMA_STAT_OK into the state, replacing the whole
// previous value. No-op when pTStat is NULL.
static FORCE_INLINE void tdSmaStatSetOK(STSmaStat *pTStat) {
  if (pTStat) {
    atomic_store_8(&pTStat->state, TSDB_SMA_STAT_OK);
  }
}

C
Cary Xu 已提交
175 176 177
// Atomically ORs TSDB_SMA_STAT_EXPIRED into the state; bits already set are
// kept. No-op when pTStat is NULL.
static FORCE_INLINE void tdSmaStatSetExpired(STSmaStat *pTStat) {
  if (pTStat) {
    atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_EXPIRED);
  }
}

C
Cary Xu 已提交
181 182 183
// Atomically ORs TSDB_SMA_STAT_DROPPED into the state; bits already set are
// kept. No-op when pTStat is NULL.
static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
  if (pTStat) {
    atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_DROPPED);
  }
}

C
Cary Xu 已提交
187 188
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
void          *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
189
void          *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
C
Cary Xu 已提交
190
int32_t        tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat);
191

192 193
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdProcessRSmaRestoreImpl(SSma *pSma);
C
Cary Xu 已提交
194

C
Cary Xu 已提交
195
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
C
Cary Xu 已提交
196 197
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
C
Cary Xu 已提交
198

C
Cary Xu 已提交
199 200
// smaFileUtil ================

201 202
// Head size constant for TFile-managed files.
// NOTE(review): presumably the number of bytes reserved for the header written
// by tdUpdateTFileHeader() - confirm against the implementation.
#define TD_FILE_HEAD_SIZE 512

C
Cary Xu 已提交
203 204 205 206
// Short type names for the TFile handle and the file info record it embeds.
typedef struct STFile  STFile;
typedef struct STFInfo STFInfo;

// File info record: fields common to every file type followed by a union of
// per-type fields.
// NOTE(review): presumably the content of the on-disk file header - confirm
// against tdLoadTFileHeader()/tdUpdateTFileHeader().
struct STFInfo {
  // common fields
  uint32_t magic;
  uint32_t ftype;  // NOTE(review): presumably a TD_FTYPE_* value - confirm
  uint32_t fver;
  int64_t  fsize;

  // specific fields
  union {
    struct {
      int64_t submitVer;
    } qTaskInfo;
  };
};

221 222 223 224
// File type identifiers.
// NOTE(review): presumably the values stored in STFInfo.ftype and passed as
// fType to tdCreateTFile() - confirm.
enum {
  TD_FTYPE_RSMA_QTASKINFO = 0,
};

C
Cary Xu 已提交
225
struct STFile {
C
Cary Xu 已提交
226
  uint8_t   state;
C
Cary Xu 已提交
227
  STFInfo   info;
C
Cary Xu 已提交
228
  char     *fname;
C
Cary Xu 已提交
229 230 231
  TdFilePtr pFile;
};

C
Cary Xu 已提交
232 233
// Accessors and state helpers for an STFile pointer. A file counts as opened
// while its pFile handle is non-NULL. TD_TFILE_OPENED is defined exactly once.
#define TD_TFILE_PFILE(tf)        ((tf)->pFile)
#define TD_TFILE_OPENED(tf)       (TD_TFILE_PFILE(tf) != NULL)
#define TD_TFILE_CLOSED(tf)       (!TD_TFILE_OPENED(tf))
#define TD_TFILE_FULL_NAME(tf)    ((tf)->fname)
// Only resets the handle field to NULL; the macro itself closes nothing.
#define TD_TFILE_SET_CLOSED(tf)   (TD_TFILE_PFILE(tf) = NULL)
#define TD_TFILE_SET_STATE(tf, s) ((tf)->state = (s))
C
Cary Xu 已提交
239

C
Cary Xu 已提交
240 241
// STFile API (implementations are not part of this header).
// NOTE(review): return conventions (0 / -1, byte counts) are not visible here -
// confirm against the implementation before relying on them.
int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname);
int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType);
int32_t tdOpenTFile(STFile *pTFile, int flags);
int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte);
int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence);
int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte);
int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset);
int64_t tdGetTFileSize(STFile *pTFile, int64_t *size);
int32_t tdRemoveTFile(STFile *pTFile);
int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo);
int32_t tdUpdateTFileHeader(STFile *pTFile);
void    tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
void    tdCloseTFile(STFile *pTFile);
void    tdDestroyTFile(STFile *pTFile);

// Path builders; the result is written to the caller-provided outputName buffer.
// NOTE(review): no buffer length is passed - the required capacity must be
// checked against the implementation.
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
                      char *outputName);
void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName);
C
Cary Xu 已提交
257

258 259 260 261
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
262
#endif /*_TD_VNODE_SMA_H_*/