tcommon.h 8.7 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16 17
#ifndef _TD_COMMON_DEF_H_
#define _TD_COMMON_DEF_H_
H
Haojun Liao 已提交
18

H
Haojun Liao 已提交
19
#include "taosdef.h"
H
Haojun Liao 已提交
20
#include "tarray.h"
L
Liu Jicong 已提交
21
#include "tmsg.h"
22
#include "tvariant.h"
S
Shengliang Guan 已提交
23 24 25 26 27

#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
// typedef struct STimeWindow {
//   TSKEY skey;
//   TSKEY ekey;
// } STimeWindow;

// typedef struct {
//   int32_t dataLen;
//   char    name[TSDB_TABLE_FNAME_LEN];
//   char   *data;
// } STagData;

// typedef struct SSchema {
//   uint8_t type;
//   char    name[TSDB_COL_NAME_LEN];
//   int16_t colId;
//   int16_t bytes;
// } SSchema;

L
Liu Jicong 已提交
46 47 48 49 50
enum {
  TMQ_CONF__RESET_OFFSET__LATEST = -1,
  TMQ_CONF__RESET_OFFSET__EARLIEAST = -2,
  TMQ_CONF__RESET_OFFSET__NONE = -3,
};
L
Liu Jicong 已提交
51

52 53
typedef struct {
  uint32_t  numOfTables;
L
Liu Jicong 已提交
54 55
  SArray*   pGroupList;
  SHashObj* map;  // speedup acquire the tableQueryInfo by table uid
56 57
} STableGroupInfo;

58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
typedef struct SColumnDataAgg {
  int16_t colId;
  int64_t sum;
  int64_t max;
  int64_t min;
  int16_t maxIndex;
  int16_t minIndex;
  int16_t numOfNull;
} SColumnDataAgg;

typedef struct SDataBlockInfo {
  STimeWindow window;
  int32_t     rows;
  int32_t     numOfCols;
  int64_t     uid;
} SDataBlockInfo;

75 76
typedef struct SConstantItem {
  SColumnInfo info;
H
Haojun Liao 已提交
77
  int32_t     startRow;  // run-length-encoding to save the space for multiple rows
78
  int32_t     endRow;
79 80 81
  SVariant    value;
} SConstantItem;

82
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
83
typedef struct SSDataBlock {
L
Liu Jicong 已提交
84 85 86 87
  SColumnDataAgg* pBlockAgg;
  SArray*         pDataBlock;  // SArray<SColumnInfoData>
  SArray* pConstantList;       // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
  SDataBlockInfo info;
88 89
} SSDataBlock;

H
Haojun Liao 已提交
90
typedef struct SVarColAttr {
L
Liu Jicong 已提交
91
  int32_t* offset;    // start position for each entry in the list
H
Haojun Liao 已提交
92 93 94 95
  uint32_t length;    // used buffer size that contain the valid data
  uint32_t allocLen;  // allocated buffer size
} SVarColAttr;

96 97
// pBlockAgg->numOfNull == info.rows, all data are null
// pBlockAgg->numOfNull == 0, no data are null.
98
typedef struct SColumnInfoData {
L
Liu Jicong 已提交
99 100 101
  SColumnInfo info;     // TODO filter info needs to be removed
  bool        hasNull;  // if current column data has null value.
  char*       pData;    // the corresponding block data in memory
H
Haojun Liao 已提交
102
  union {
L
Liu Jicong 已提交
103
    char*       nullbitmap;  // bitmap, one bit for each item in the list
H
Haojun Liao 已提交
104 105
    SVarColAttr varmeta;
  };
106 107
} SColumnInfoData;

H
Haojun Liao 已提交
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
  int64_t tbUid = pBlock->info.uid;
  int32_t numOfCols = pBlock->info.numOfCols;
  int32_t rows = pBlock->info.rows;
  int32_t sz = taosArrayGetSize(pBlock->pDataBlock);

  int32_t tlen = 0;
  tlen += taosEncodeFixedI64(buf, tbUid);
  tlen += taosEncodeFixedI32(buf, numOfCols);
  tlen += taosEncodeFixedI32(buf, rows);
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
    SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
    tlen += taosEncodeFixedI16(buf, pColData->info.colId);
    tlen += taosEncodeFixedI16(buf, pColData->info.type);
    tlen += taosEncodeFixedI16(buf, pColData->info.bytes);
    int32_t colSz = rows * pColData->info.bytes;
    tlen += taosEncodeBinary(buf, pColData->pData, colSz);
  }
  return tlen;
}

L
Liu Jicong 已提交
130
static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
H
Haojun Liao 已提交
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
  int32_t sz;

  buf = taosDecodeFixedI64(buf, &pBlock->info.uid);
  buf = taosDecodeFixedI32(buf, &pBlock->info.numOfCols);
  buf = taosDecodeFixedI32(buf, &pBlock->info.rows);
  buf = taosDecodeFixedI32(buf, &sz);
  pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
  for (int32_t i = 0; i < sz; i++) {
    SColumnInfoData data = {0};
    buf = taosDecodeFixedI16(buf, &data.info.colId);
    buf = taosDecodeFixedI16(buf, &data.info.type);
    buf = taosDecodeFixedI16(buf, &data.info.bytes);
    int32_t colSz = pBlock->info.rows * data.info.bytes;
    buf = taosDecodeBinary(buf, (void**)&data.pData, colSz);
    taosArrayPush(pBlock->pDataBlock, &data);
  }
L
Liu Jicong 已提交
147
  return (void*)buf;
H
Haojun Liao 已提交
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
}

static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp* pRsp) {
  int32_t tlen = 0;
  int32_t sz = 0;
  tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
  tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
  tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
  tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
  tlen += taosEncodeFixedI32(buf, pRsp->numOfTopics);
  if (pRsp->numOfTopics == 0) return tlen;
  tlen += tEncodeSSchemaWrapper(buf, pRsp->schemas);
  if (pRsp->pBlockData) {
    sz = taosArrayGetSize(pRsp->pBlockData);
  }
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
165
    SSDataBlock* pBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
H
Haojun Liao 已提交
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
    tlen += tEncodeDataBlock(buf, pBlock);
  }
  return tlen;
}

static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) {
  int32_t sz;
  buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
  buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
  buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
  buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
  buf = taosDecodeFixedI32(buf, &pRsp->numOfTopics);
  if (pRsp->numOfTopics == 0) return buf;
  pRsp->schemas = (SSchemaWrapper*)calloc(1, sizeof(SSchemaWrapper));
  if (pRsp->schemas == NULL) return NULL;
  buf = tDecodeSSchemaWrapper(buf, pRsp->schemas);
  buf = taosDecodeFixedI32(buf, &sz);
  pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock));
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock block = {0};
    tDecodeDataBlock(buf, &block);
    taosArrayPush(pRsp->pBlockData, &block);
  }
  return buf;
}

static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
  if (pBlock == NULL) {
    return;
  }

L
Liu Jicong 已提交
197
  // int32_t numOfOutput = pBlock->info.numOfCols;
H
Haojun Liao 已提交
198
  int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
L
Liu Jicong 已提交
199
  for (int32_t i = 0; i < sz; ++i) {
H
Haojun Liao 已提交
200 201 202 203 204 205
    SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
    tfree(pColInfoData->pData);
  }

  taosArrayDestroy(pBlock->pDataBlock);
  tfree(pBlock->pBlockAgg);
L
Liu Jicong 已提交
206
  // tfree(pBlock);
H
Haojun Liao 已提交
207 208 209 210 211 212 213 214 215
}

static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqConsumeRsp* pRsp) {
  if (pRsp->schemas) {
    if (pRsp->schemas->nCols) {
      tfree(pRsp->schemas->pSchema);
    }
    free(pRsp->schemas);
  }
L
Liu Jicong 已提交
216 217
  taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))tDeleteSSDataBlock);
  pRsp->pBlockData = NULL;
S
Shengliang Guan 已提交
218
  // for (int32_t i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) {
L
Liu Jicong 已提交
219 220
  // SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
  // tDeleteSSDataBlock(pDataBlock);
H
Haojun Liao 已提交
221 222 223
  //}
}

224 225
//======================================================================================================================
// the following structure shared by parser and executor
226
typedef struct SColumn {
H
Haojun Liao 已提交
227 228 229 230
  uint64_t    uid;
  char        name[TSDB_COL_NAME_LEN];
  int8_t      flag;  // column type: normal column, tag, or user-input column (integer/float/string)
  SColumnInfo info;
231 232
} SColumn;

233
typedef struct SLimit {
H
Haojun Liao 已提交
234 235
  int64_t limit;
  int64_t offset;
236 237 238
} SLimit;

typedef struct SOrder {
H
Haojun Liao 已提交
239 240
  uint32_t order;
  SColumn  col;
241 242 243
} SOrder;

typedef struct SGroupbyExpr {
L
Liu Jicong 已提交
244
  SArray* columnInfo;  // SArray<SColIndex>, group by columns information
H
Haojun Liao 已提交
245
  bool    groupbyTag;  // group by tag or column
246 247 248 249
} SGroupbyExpr;

// the structure for sql function in select clause
typedef struct SSqlExpr {
H
Haojun Liao 已提交
250 251 252
  char    token[TSDB_COL_NAME_LEN];  // original token
  SSchema resSchema;

L
Liu Jicong 已提交
253 254 255 256 257
  int32_t  numOfCols;
  SColumn* pColumns;     // data columns that are required by query
  int32_t  interBytes;   // inter result buffer size
  int16_t  numOfParams;  // argument value of each function
  SVariant param[3];     // parameters are not more than 3
258 259 260
} SSqlExpr;

typedef struct SExprInfo {
H
Haojun Liao 已提交
261
  struct SSqlExpr   base;
L
Liu Jicong 已提交
262
  struct tExprNode* pExpr;
263 264
} SExprInfo;

265 266 267 268 269
typedef struct SStateWindow {
  SColumn col;
} SStateWindow;

typedef struct SSessionWindow {
H
Haojun Liao 已提交
270
  int64_t gap;  // gap between two session window(in microseconds)
271 272 273
  SColumn col;
} SSessionWindow;

L
Liu Jicong 已提交
274
#define QUERY_ASC_FORWARD_STEP  1
275 276 277 278
#define QUERY_DESC_FORWARD_STEP -1

#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

H
Haojun Liao 已提交
279 280 281 282
#ifdef __cplusplus
}
#endif

S
Shengliang Guan 已提交
283
#endif  /*_TD_COMMON_DEF_H_*/