tcommon.h 8.7 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16 17
#ifndef _TD_COMMON_DEF_H_
#define _TD_COMMON_DEF_H_
H
Haojun Liao 已提交
18

H
Haojun Liao 已提交
19
#include "taosdef.h"
H
Haojun Liao 已提交
20
#include "tarray.h"
L
Liu Jicong 已提交
21
#include "tmsg.h"
22
#include "tvariant.h"
S
Shengliang Guan 已提交
23 24 25 26 27

#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
28 29 30 31 32
// Sentinel values for the TMQ "reset offset" configuration. All are negative.
// NOTE(review): presumably negative so they cannot collide with a real offset — confirm.
enum {
  TMQ_CONF__RESET_OFFSET__LATEST = -1,
  TMQ_CONF__RESET_OFFSET__EARLIEAST = -2,  // sic: identifier is spelled "EARLIEAST"; renaming would affect users of this header
  TMQ_CONF__RESET_OFFSET__NONE = -3,
};
L
Liu Jicong 已提交
33

L
Liu Jicong 已提交
34
// Type tags for TMQ messages. Values are assigned sequentially starting at 0.
enum {
  TMQ_MSG_TYPE__DUMMY = 0,
  TMQ_MSG_TYPE__POLL_RSP,
  TMQ_MSG_TYPE__EP_RSP,
};

40 41
typedef struct {
  uint32_t  numOfTables;
L
Liu Jicong 已提交
42 43
  SArray*   pGroupList;
  SHashObj* map;  // speedup acquire the tableQueryInfo by table uid
44 45
} STableGroupInfo;

46 47 48 49 50 51 52 53 54 55 56
// Aggregate values for one column, identified by colId.
// numOfNull semantics are described next to SColumnInfoData:
//   numOfNull == info.rows -> all data are null; numOfNull == 0 -> no data are null.
typedef struct SColumnDataAgg {
  int16_t colId;
  int64_t sum;
  int64_t max;
  int64_t min;
  int16_t maxIndex;  // NOTE(review): presumably the position of the max value — confirm
  int16_t minIndex;  // NOTE(review): presumably the position of the min value — confirm
  int16_t numOfNull;
} SColumnDataAgg;

typedef struct SDataBlockInfo {
57 58 59 60 61
  STimeWindow    window;
  int32_t        rows;
  int32_t        rowSize;
  int16_t        numOfCols;
  int16_t        hasVarCol;
H
Haojun Liao 已提交
62
  union {int64_t uid; int64_t blockId;};
63 64
} SDataBlockInfo;

65 66
typedef struct SConstantItem {
  SColumnInfo info;
H
Haojun Liao 已提交
67
  int32_t     startRow;  // run-length-encoding to save the space for multiple rows
68
  int32_t     endRow;
69 70 71
  SVariant    value;
} SConstantItem;

72
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
73
typedef struct SSDataBlock {
H
Haojun Liao 已提交
74 75 76 77
  SColumnDataAgg *pBlockAgg;
  SArray         *pDataBlock;    // SArray<SColumnInfoData>
  SArray         *pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
  SDataBlockInfo  info;
78 79
} SSDataBlock;

H
Haojun Liao 已提交
80
// Bookkeeping for a variable-length column buffer.
typedef struct SVarColAttr {
  int32_t* offset;    // start position for each entry in the list
  uint32_t length;    // used buffer size that contain the valid data
  uint32_t allocLen;  // allocated buffer size
} SVarColAttr;

86 87
// pBlockAgg->numOfNull == info.rows, all data are null
// pBlockAgg->numOfNull == 0, no data are null.
88
typedef struct SColumnInfoData {
L
Liu Jicong 已提交
89 90 91
  SColumnInfo info;     // TODO filter info needs to be removed
  bool        hasNull;  // if current column data has null value.
  char*       pData;    // the corresponding block data in memory
H
Haojun Liao 已提交
92
  union {
L
Liu Jicong 已提交
93
    char*       nullbitmap;  // bitmap, one bit for each item in the list
H
Haojun Liao 已提交
94 95
    SVarColAttr varmeta;
  };
96 97
} SColumnInfoData;

H
Haojun Liao 已提交
98 99
/**
 * Serialize a data block through the taosEncode* helpers.
 *
 * Layout written, in order: uid (i64), numOfCols (i16), hasVarCol (i16),
 * rows (i32), number of entries in pDataBlock (i32), then for every column:
 * colId (i16), type (i16), bytes (i32) and rows * bytes bytes of pData.
 *
 * Only pBlock->pDataBlock is serialized; pBlockAgg and pConstantList are not.
 *
 * @param buf    encoder cursor, forwarded unchanged to the taosEncode* helpers
 * @param pBlock block to serialize
 * @return sum of the lengths reported by the taosEncode* helpers
 */
static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
  int64_t tbUid = pBlock->info.uid;
  int16_t numOfCols = pBlock->info.numOfCols;
  int16_t hasVarCol = pBlock->info.hasVarCol;
  int32_t rows = pBlock->info.rows;
  int32_t sz = taosArrayGetSize(pBlock->pDataBlock);

  int32_t tlen = 0;
  tlen += taosEncodeFixedI64(buf, tbUid);
  tlen += taosEncodeFixedI16(buf, numOfCols);
  tlen += taosEncodeFixedI16(buf, hasVarCol);
  tlen += taosEncodeFixedI32(buf, rows);
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
    SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
    tlen += taosEncodeFixedI16(buf, pColData->info.colId);
    tlen += taosEncodeFixedI16(buf, pColData->info.type);
    tlen += taosEncodeFixedI32(buf, pColData->info.bytes);
    // Column payload is treated as fixed width: rows * bytes.
    int32_t colSz = rows * pColData->info.bytes;
    tlen += taosEncodeBinary(buf, pColData->pData, colSz);
  }
  return tlen;
}

L
Liu Jicong 已提交
122
/**
 * Deserialize a data block written by tEncodeDataBlock.
 *
 * Fills pBlock->info (uid, numOfCols, hasVarCol, rows) and creates
 * pBlock->pDataBlock with one SColumnInfoData per encoded column. Each
 * column's pData is produced by taosDecodeBinary; tDeleteSSDataBlock
 * releases it.
 *
 * @param buf    read cursor into the encoded bytes
 * @param pBlock block to populate; fields not listed above are left untouched
 * @return cursor positioned after the block, or NULL if the column array
 *         could not be created or a column decode returned NULL
 */
static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
  int32_t sz = 0;

  buf = taosDecodeFixedI64(buf, &pBlock->info.uid);
  buf = taosDecodeFixedI16(buf, &pBlock->info.numOfCols);
  buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
  buf = taosDecodeFixedI32(buf, &pBlock->info.rows);
  buf = taosDecodeFixedI32(buf, &sz);

  pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
  if (pBlock->pDataBlock == NULL) return NULL;  // do not push into a missing array

  for (int32_t i = 0; i < sz; i++) {
    SColumnInfoData data = {0};
    buf = taosDecodeFixedI16(buf, &data.info.colId);
    buf = taosDecodeFixedI16(buf, &data.info.type);
    buf = taosDecodeFixedI32(buf, &data.info.bytes);
    // Mirrors the encoder: payload is rows * bytes.
    int32_t colSz = pBlock->info.rows * data.info.bytes;
    buf = taosDecodeBinary(buf, (void**)&data.pData, colSz);
    // Push first so that whatever was decoded is owned by the block and can be
    // released by tDeleteSSDataBlock even when we bail out below.
    taosArrayPush(pBlock->pDataBlock, &data);
    // NOTE(review): assumes taosDecodeBinary signals failure with NULL — confirm.
    // Stopping here avoids feeding a NULL cursor into the next decode call.
    if (buf == NULL) return NULL;
  }
  return (void*)buf;
}

L
Liu Jicong 已提交
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
/**
 * Release the memory held by a data block: every column's pData, the
 * pDataBlock array itself, and pBlockAgg. A NULL pBlock is a no-op.
 *
 * The SSDataBlock object itself is NOT freed, and pConstantList is not touched.
 */
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
  if (pBlock != NULL) {
    int32_t nCols = taosArrayGetSize(pBlock->pDataBlock);
    for (int32_t col = 0; col < nCols; ++col) {
      SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);
      tfree(pCol->pData);
    }

    taosArrayDestroy(pBlock->pDataBlock);
    tfree(pBlock->pBlockAgg);
  }
}

L
Liu Jicong 已提交
160
/**
 * Serialize a poll response.
 *
 * Layout written, in order: consumerId (i64), reqOffset (i64), rspOffset (i64),
 * skipLogNum (i32), numOfTopics (i32). When numOfTopics is 0 nothing else is
 * written. Otherwise the schema wrapper follows, then the block count (i32,
 * 0 when pBlockData is NULL) and each block via tEncodeDataBlock.
 *
 * @param buf  encoder cursor, forwarded unchanged to the encode helpers
 * @param pRsp response to serialize
 * @return sum of the lengths reported by the encode helpers
 */
static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) {
  int32_t tlen = 0;
  int32_t sz = 0;
  tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
  tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
  tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
  tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
  tlen += taosEncodeFixedI32(buf, pRsp->numOfTopics);
  if (pRsp->numOfTopics == 0) return tlen;
  tlen += tEncodeSSchemaWrapper(buf, pRsp->schemas);
  if (pRsp->pBlockData) {
    sz = taosArrayGetSize(pRsp->pBlockData);
  }
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
    tlen += tEncodeDataBlock(buf, pBlock);
  }
  return tlen;
}

L
Liu Jicong 已提交
181
/**
 * Deserialize a poll response written by tEncodeSMqPollRsp.
 *
 * Reads the fixed header into pRsp. When numOfTopics is 0 decoding stops
 * there. Otherwise pRsp->schemas is allocated and decoded, and pRsp->pBlockData
 * is created and filled with one SSDataBlock per encoded block.
 * tDeleteSMqConsumeRsp releases what this function allocates.
 *
 * @param buf  read cursor into the encoded bytes
 * @param pRsp response to populate
 * @return cursor positioned after the response, or NULL on allocation or
 *         block-decode failure (pRsp may then be partially populated)
 */
static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) {
  int32_t sz = 0;
  buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
  buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
  buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
  buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
  buf = taosDecodeFixedI32(buf, &pRsp->numOfTopics);
  if (pRsp->numOfTopics == 0) return buf;

  // The cast is kept because this header is also consumable from C++ (see the extern "C" guard).
  pRsp->schemas = (SSchemaWrapper*)calloc(1, sizeof(SSchemaWrapper));
  if (pRsp->schemas == NULL) return NULL;
  buf = tDecodeSSchemaWrapper(buf, pRsp->schemas);
  buf = taosDecodeFixedI32(buf, &sz);

  pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock));
  if (pRsp->pBlockData == NULL) return NULL;

  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock block = {0};
    // The cursor must advance past each block; discarding this return value
    // would decode every block from the same bytes and return a stale cursor.
    buf = tDecodeDataBlock(buf, &block);
    // Hand the block to the response before checking for failure so its
    // buffers are reachable from tDeleteSMqConsumeRsp.
    taosArrayPush(pRsp->pBlockData, &block);
    if (buf == NULL) return NULL;
  }
  return buf;
}

L
Liu Jicong 已提交
202
/**
 * Release the memory owned by a poll response: the schema wrapper (and its
 * pSchema array when nCols is non-zero) and every block in pBlockData.
 *
 * Both pRsp->schemas and pRsp->pBlockData are reset to NULL afterwards, so no
 * dangling pointer is left in the response. pRsp itself is not freed and must
 * not be NULL.
 */
static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) {
  if (pRsp->schemas) {
    if (pRsp->schemas->nCols) {
      tfree(pRsp->schemas->pSchema);
    }
    free(pRsp->schemas);
    pRsp->schemas = NULL;  // match the pBlockData reset below; avoids a double free on reuse
  }
  taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))tDeleteSSDataBlock);
  pRsp->pBlockData = NULL;
}

213 214
//======================================================================================================================
// the following structure shared by parser and executor
215
typedef struct SColumn {
H
Haojun Liao 已提交
216 217 218 219 220 221 222 223 224 225
  union {
    uint64_t uid;
    int64_t  dataBlockId;
  };

  union {
    int16_t colId;
    int16_t slotId;
  };

H
Haojun Liao 已提交
226 227
  char    name[TSDB_COL_NAME_LEN];
  int8_t  flag;  // column type: normal column, tag, or user-input column (integer/float/string)
H
Haojun Liao 已提交
228 229 230 231
  int16_t type;
  int32_t bytes;
  uint8_t precision;
  uint8_t scale;
232 233
} SColumn;

234
// A limit/offset pair, both 64-bit signed.
typedef struct SLimit {
  int64_t limit;
  int64_t offset;
} SLimit;

typedef struct SOrder {
H
Haojun Liao 已提交
240 241
  uint32_t order;
  SColumn  col;
242 243 244
} SOrder;

typedef struct SGroupbyExpr {
L
Liu Jicong 已提交
245
  SArray* columnInfo;  // SArray<SColIndex>, group by columns information
H
Haojun Liao 已提交
246
  bool    groupbyTag;  // group by tag or column
247 248
} SGroupbyExpr;

H
Haojun Liao 已提交
249 250 251 252 253
// One function parameter: a type tag plus a column pointer and a variant value.
// NOTE(review): presumably `type` selects whether pCol or param is meaningful — confirm.
typedef struct SFunctParam {
  int32_t  type;
  SColumn *pCol;
  SVariant param;
} SFunctParam;
H
Haojun Liao 已提交
254

H
Haojun Liao 已提交
255
// the structure for sql function in select clause
H
Haojun Liao 已提交
256 257 258 259 260 261 262 263 264 265
typedef struct SResSchame {
  int8_t  type;
  int32_t colId;
  int32_t bytes;
  int32_t precision;
  int32_t scale;
  char    name[TSDB_COL_NAME_LEN];
} SResSchema;

// TODO move away to executor.h
H
Haojun Liao 已提交
266
typedef struct SExprBasicInfo {
H
Haojun Liao 已提交
267
  SResSchema   resSchema;
H
Haojun Liao 已提交
268 269 270
  int16_t      numOfParams;  // argument value of each function
  SFunctParam *pParam;
} SExprBasicInfo;
271 272

typedef struct SExprInfo {
H
Haojun Liao 已提交
273 274
  struct SExprBasicInfo  base;
  struct tExprNode      *pExpr;
275 276
} SExprInfo;

277 278 279 280 281
// State window description: carries the single column it is defined on.
typedef struct SStateWindow {
  SColumn col;
} SStateWindow;

typedef struct SSessionWindow {
H
Haojun Liao 已提交
282
  int64_t gap;  // gap between two session window(in microseconds)
283 284 285
  SColumn col;
} SSessionWindow;

L
Liu Jicong 已提交
286
// Step applied when moving through data in ascending / descending order.
#define QUERY_ASC_FORWARD_STEP  1
#define QUERY_DESC_FORWARD_STEP -1

// Yields +1 when `ord` equals TSDB_ORDER_ASC and -1 for any other value.
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

H
Haojun Liao 已提交
291 292 293 294
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
295
#endif /*_TD_COMMON_DEF_H_*/