tcommon.h 7.3 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16 17
#ifndef _TD_COMMON_DEF_H_
#define _TD_COMMON_DEF_H_
H
Haojun Liao 已提交
18

H
Haojun Liao 已提交
19
#include "taosdef.h"
H
Haojun Liao 已提交
20
#include "tarray.h"
L
Liu Jicong 已提交
21
#include "tmsg.h"
22
#include "tvariant.h"
S
Shengliang Guan 已提交
23 24 25 26 27

#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
28 29 30 31 32
// Negative sentinel values named for the TMQ "reset offset" configuration.
// NOTE(review): presumably they select where consumption starts when no
// committed offset is available -- confirm against the consumer code.
// NOTE(review): "EARLIEAST" is a misspelling of "EARLIEST"; the identifier is
// visible to callers, so it is left unchanged here.
enum {
  TMQ_CONF__RESET_OFFSET__LATEST = -1,
  TMQ_CONF__RESET_OFFSET__EARLIEAST = -2,
  TMQ_CONF__RESET_OFFSET__NONE = -3,
};
L
Liu Jicong 已提交
33

L
Liu Jicong 已提交
34
// Type tags named for TMQ messages. Values are assigned in declaration order
// starting from 0 (DUMMY = 0, POLL_RSP = 1, EP_RSP = 2).
enum {
  TMQ_MSG_TYPE__DUMMY = 0,
  TMQ_MSG_TYPE__POLL_RSP,
  TMQ_MSG_TYPE__EP_RSP,
};

L
fix  
Liu Jicong 已提交
40 41 42 43 44 45 46 47
// Stream trigger kinds. Values start at 1 and increase in declaration order
// (AT_ONCE = 1 ... BY_EVENT_TIME = 5); 0 is not assigned to any member.
// NOTE(review): the exact semantics of each trigger are defined by the stream
// engine, not visible in this header.
enum {
  STREAM_TRIGGER__AT_ONCE = 1,
  STREAM_TRIGGER__WINDOW_CLOSE,
  STREAM_TRIGGER__BY_COUNT,
  STREAM_TRIGGER__BY_BATCH_COUNT,
  STREAM_TRIGGER__BY_EVENT_TIME,
};

48 49
typedef struct {
  uint32_t  numOfTables;
L
Liu Jicong 已提交
50 51
  SArray*   pGroupList;
  SHashObj* map;  // speedup acquire the tableQueryInfo by table uid
52 53
} STableGroupInfo;

54 55 56 57 58
// Per-column aggregate record; SSDataBlock::pBlockAgg points at these.
typedef struct SColumnDataAgg {
  int16_t colId;
  int16_t maxIndex;
  int16_t minIndex;
  int16_t numOfNull;  // == block row count: all values null; == 0: no value is null
  int64_t sum;
  int64_t max;
  int64_t min;
} SColumnDataAgg;

typedef struct SDataBlockInfo {
L
fix  
Liu Jicong 已提交
65 66 67 68
  STimeWindow window;
  int32_t     rows;
  int32_t     rowSize;
  union {
69
    int64_t uid;        // from which table of uid, comes from this data block
L
fix  
Liu Jicong 已提交
70 71
    int64_t blockId;
  };
H
Haojun Liao 已提交
72
  uint64_t    groupId;  // no need to serialize
73 74 75
  int16_t     numOfCols;
  int16_t     hasVarCol;
  int16_t     capacity;
76 77 78
} SDataBlockInfo;

typedef struct SSDataBlock {
L
Liu Jicong 已提交
79 80
  SColumnDataAgg* pBlockAgg;
  SArray*         pDataBlock;  // SArray<SColumnInfoData>
H
Haojun Liao 已提交
81
  SDataBlockInfo  info;
82 83
} SSDataBlock;

H
Haojun Liao 已提交
84
// Bookkeeping for a variable-length column buffer.
typedef struct SVarColAttr {
  int32_t* offset;    // start position for each entry in the list
  uint32_t length;    // used buffer size that contains the valid data
  uint32_t allocLen;  // allocated buffer size
} SVarColAttr;

90 91
// pBlockAgg->numOfNull == info.rows, all data are null
// pBlockAgg->numOfNull == 0, no data are null.
92
typedef struct SColumnInfoData {
L
Liu Jicong 已提交
93 94 95
  SColumnInfo info;     // TODO filter info needs to be removed
  bool        hasNull;  // if current column data has null value.
  char*       pData;    // the corresponding block data in memory
H
Haojun Liao 已提交
96
  union {
L
Liu Jicong 已提交
97
    char*       nullbitmap;  // bitmap, one bit for each item in the list
H
Haojun Liao 已提交
98 99
    SVarColAttr varmeta;
  };
100 101
} SColumnInfoData;

H
Haojun Liao 已提交
102 103 104
void*   blockDataDestroy(SSDataBlock* pBlock);
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
void*   tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
H
Haojun Liao 已提交
105

L
Liu Jicong 已提交
106
int32_t tEncodeDataBlocks(void** buf, const SArray* blocks);
L
Liu Jicong 已提交
107
void*   tDecodeDataBlocks(const void* buf, SArray** blocks);
L
fix  
Liu Jicong 已提交
108
void    colDataDestroy(SColumnInfoData* pColData);
L
Liu Jicong 已提交
109

L
Liu Jicong 已提交
110 111 112 113 114 115
/**
 * Release everything a data block owns; the SSDataBlock object itself is not freed.
 *
 * Each column in pBlock->pDataBlock is destroyed with colDataDestroy(), then the
 * column array and the aggregate buffer are released. Both owning pointers are
 * cleared afterwards so the block does not keep dangling references.
 *
 * @param pBlock block whose contents are released; must not be NULL.
 */
static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) {
  // WARNING: do not use info.numOfCols,
  // sometimes info.numOfCols != array size
  int32_t numOfOutput = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfOutput; ++i) {
    SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
    colDataDestroy(pColInfoData);
  }

  taosArrayDestroy(pBlock->pDataBlock);
  pBlock->pDataBlock = NULL;  // match pBlockAgg, which taosMemoryFreeClear() also resets
  taosMemoryFreeClear(pBlock->pBlockAgg);
}

L
Liu Jicong 已提交
123 124
// Alias of blockDestroyInner(): releases the block's contents, not the block object itself.
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
  blockDestroyInner(pBlock);
}

L
Liu Jicong 已提交
125
/**
 * Encode a poll response.
 *
 * Field order: reqOffset, rspOffset, skipLogNum, numOfTopics. When numOfTopics
 * is 0 encoding stops there; otherwise the schema wrapper, the block count and
 * each data block follow. tDecodeSMqPollRsp() reads the fields in the same order.
 *
 * @param buf  encode cursor, forwarded to every taosEncode* helper.
 * @param pRsp response to encode; a NULL pBlockData is encoded as zero blocks.
 * @return sum of the values returned by the encode helpers.
 */
static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) {
  int32_t tlen = 0;
  int32_t sz = 0;

  // NOTE: consumerId is not part of the encoding (its encode call is disabled).
  tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
  tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
  tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
  tlen += taosEncodeFixedI32(buf, pRsp->numOfTopics);
  if (pRsp->numOfTopics == 0) return tlen;

  tlen += taosEncodeSSchemaWrapper(buf, pRsp->schema);

  if (pRsp->pBlockData) {
    sz = (int32_t)taosArrayGetSize(pRsp->pBlockData);
  }
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
    tlen += tEncodeDataBlock(buf, pBlock);
  }
  return tlen;
}

L
Liu Jicong 已提交
146
/**
 * Decode a poll response written by tEncodeSMqPollRsp().
 *
 * Reads reqOffset, rspOffset, skipLogNum and numOfTopics. When numOfTopics is 0
 * decoding stops there. Otherwise a schema wrapper is allocated and decoded,
 * followed by the block count and each data block.
 *
 * @param buf  position to start decoding from.
 * @param pRsp destination. On success it owns pRsp->schema and pRsp->pBlockData;
 *             release them with tDeleteSMqConsumeRsp().
 * @return the position after the last decoded field, or NULL when an allocation
 *         fails. On failure pRsp may be partially populated and should still be
 *         passed to tDeleteSMqConsumeRsp().
 */
static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) {
  int32_t sz = 0;

  // NOTE: consumerId is not part of the encoding (its decode call is disabled).
  buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
  buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
  buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
  buf = taosDecodeFixedI32(buf, &pRsp->numOfTopics);
  if (pRsp->numOfTopics == 0) return buf;

  pRsp->schema = (SSchemaWrapper*)taosMemoryCalloc(1, sizeof(SSchemaWrapper));
  if (pRsp->schema == NULL) return NULL;
  buf = taosDecodeSSchemaWrapper(buf, pRsp->schema);

  buf = taosDecodeFixedI32(buf, &sz);
  pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock));
  if (pRsp->pBlockData == NULL) return NULL;

  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock block = {0};
    // Advance the cursor past each block; otherwise every iteration would
    // re-decode the same bytes and the returned position would be wrong.
    buf = tDecodeDataBlock(buf, &block);
    taosArrayPush(pRsp->pBlockData, &block);
  }
  return buf;
}

L
Liu Jicong 已提交
167
/**
 * Release the heap data owned by a poll response: the schema wrapper (and its
 * column array when nCols is non-zero) and the array of data blocks. The
 * SMqPollRsp object itself is not freed.
 *
 * Both owning pointers are cleared, so calling this again on the same response
 * does not free anything twice.
 *
 * @param pRsp response to clean up; must not be NULL.
 */
static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) {
  if (pRsp->schema) {
    if (pRsp->schema->nCols) {
      taosMemoryFreeClear(pRsp->schema->pSchema);
    }
    // Clear the pointer as well as freeing it, the same way pBlockData is reset below.
    taosMemoryFreeClear(pRsp->schema);
  }

  // Each element is an SSDataBlock whose contents are released by blockDestroyInner().
  taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))blockDestroyInner);
  pRsp->pBlockData = NULL;
}

178 179
//======================================================================================================================
// the following structure shared by parser and executor
180
typedef struct SColumn {
H
Haojun Liao 已提交
181 182 183 184 185
  union {
    uint64_t uid;
    int64_t  dataBlockId;
  };

H
Haojun Liao 已提交
186 187
  int16_t colId;
  int16_t slotId;
H
Haojun Liao 已提交
188

H
Haojun Liao 已提交
189 190
  char    name[TSDB_COL_NAME_LEN];
  int8_t  flag;  // column type: normal column, tag, or user-input column (integer/float/string)
H
Haojun Liao 已提交
191 192 193 194
  int16_t type;
  int32_t bytes;
  uint8_t precision;
  uint8_t scale;
195 196
} SColumn;

H
Haojun Liao 已提交
197 198 199 200 201 202 203 204 205 206 207 208 209
// Statistics record named for the block distribution of a table.
// NOTE(review): field meanings below are read from the names only; units and
// exact definitions are not visible in this header -- confirm before relying on them.
typedef struct STableBlockDistInfo {
  uint16_t  rowSize;
  uint16_t  numOfFiles;
  uint32_t  numOfTables;
  uint64_t  totalSize;
  uint64_t  totalRows;
  int32_t   maxRows;
  int32_t   minRows;
  int32_t   firstSeekTimeUs;  // presumably microseconds, per the "Us" suffix -- confirm
  uint32_t  numOfRowsInMemTable;
  uint32_t  numOfSmallBlocks;
  SArray   *dataBlockInfos;   // element type is not visible in this header
} STableBlockDistInfo;
210

G
Ganlin Zhao 已提交
211
// Kinds of function parameter. The values are distinct single bits (0x1, 0x2).
// NOTE(review): presumably these are the values stored in SFunctParam::type -- confirm.
enum {
  FUNC_PARAM_TYPE_VALUE = 0x1,
  FUNC_PARAM_TYPE_COLUMN = 0x2,
};

H
Haojun Liao 已提交
216 217
typedef struct SFunctParam {
  int32_t  type;
L
Liu Jicong 已提交
218
  SColumn* pCol;
H
Haojun Liao 已提交
219 220
  SVariant param;
} SFunctParam;
H
Haojun Liao 已提交
221

H
Haojun Liao 已提交
222
// the structure for sql function in select clause
H
Haojun Liao 已提交
223 224
typedef struct SResSchame {
  int8_t  type;
225
  int32_t slotId;
H
Haojun Liao 已提交
226 227 228 229 230 231 232
  int32_t bytes;
  int32_t precision;
  int32_t scale;
  char    name[TSDB_COL_NAME_LEN];
} SResSchema;

// TODO move away to executor.h
H
Haojun Liao 已提交
233
typedef struct SExprBasicInfo {
H
Haojun Liao 已提交
234
  SResSchema   resSchema;
H
Haojun Liao 已提交
235
  int16_t      numOfParams;  // argument value of each function
L
Liu Jicong 已提交
236
  SFunctParam* pParam;
H
Haojun Liao 已提交
237
} SExprBasicInfo;
238 239

typedef struct SExprInfo {
L
Liu Jicong 已提交
240 241
  struct SExprBasicInfo base;
  struct tExprNode*     pExpr;
242 243
} SExprInfo;

wmmhello's avatar
wmmhello 已提交
244 245 246 247 248 249 250 251 252
// Key/value pair with explicit lengths for both strings.
// NOTE(review): presumably used by the schemaless ("sml") ingestion path, and
// the strings may not be NUL-terminated since lengths are carried -- confirm.
typedef struct {
  const char* key;
  int32_t keyLen;    // length of `key`
  uint8_t type;
  int16_t length;
  const char* value;
  int32_t valueLen;  // length of `value`
} SSmlKv;

L
fix  
Liu Jicong 已提交
253
// Step applied per iteration when scanning in ascending / descending order.
#define QUERY_ASC_FORWARD_STEP 1
// Parenthesized so the negative literal stays a single operand wherever the macro expands.
#define QUERY_DESC_FORWARD_STEP (-1)

// Maps an order value to its step: TSDB_ORDER_ASC gives +1, any other value gives -1.
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

H
Haojun Liao 已提交
258 259 260 261
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
262
#endif /*_TD_COMMON_DEF_H_*/