tcommon.h 8.4 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16 17
#ifndef _TD_COMMON_DEF_H_
#define _TD_COMMON_DEF_H_
H
Haojun Liao 已提交
18

H
Haojun Liao 已提交
19
#include "taosdef.h"
H
Haojun Liao 已提交
20
#include "tarray.h"
L
Liu Jicong 已提交
21
#include "tmsg.h"
22
#include "tvariant.h"
S
Shengliang Guan 已提交
23 24 25 26 27

#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
28 29 30 31 32
// Sentinel values for the TMQ "reset offset" configuration. All three are negative.
// NOTE(review): presumably negative so they cannot be confused with a real offset — confirm with users.
enum {
  TMQ_CONF__RESET_OFFSET__LATEST = -1,
  // NOTE(review): "EARLIEAST" looks like a misspelling of "EARLIEST"; renaming it would break existing users.
  TMQ_CONF__RESET_OFFSET__EARLIEAST = -2,
  TMQ_CONF__RESET_OFFSET__NONE = -3,
};
L
Liu Jicong 已提交
33

L
Liu Jicong 已提交
34
// Type tags for TMQ messages. Values are assigned sequentially starting at 0.
enum {
  TMQ_MSG_TYPE__DUMMY = 0,
  TMQ_MSG_TYPE__POLL_RSP,  // == 1
  TMQ_MSG_TYPE__EP_RSP,    // == 2
};

40 41
typedef struct {
  uint32_t  numOfTables;
L
Liu Jicong 已提交
42 43
  SArray*   pGroupList;
  SHashObj* map;  // speedup acquire the tableQueryInfo by table uid
44 45
} STableGroupInfo;

46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
// Aggregate values for one column of a data block (referenced by SSDataBlock.pBlockAgg).
typedef struct SColumnDataAgg {
  int16_t colId;
  int64_t sum;
  int64_t max;
  int64_t min;
  // NOTE(review): presumably the row positions of the max/min values — confirm with the producer.
  int16_t maxIndex;
  int16_t minIndex;
  // Number of null values; compared against SDataBlockInfo.rows to detect "all null" / "no null".
  int16_t numOfNull;
} SColumnDataAgg;

// Header information describing one SSDataBlock.
typedef struct SDataBlockInfo {
  STimeWindow window;
  int32_t     rows;       // row count; a fixed-width column occupies rows * bytes when encoded
  int32_t     numOfCols;  // size of pDataBlock plus size of pConstantList in the owning SSDataBlock
  int64_t     uid;        // encoded as the table uid by tEncodeDataBlock
} SDataBlockInfo;

63 64
typedef struct SConstantItem {
  SColumnInfo info;
H
Haojun Liao 已提交
65
  int32_t     startRow;  // run-length-encoding to save the space for multiple rows
66
  int32_t     endRow;
67 68 69
  SVariant    value;
} SConstantItem;

70
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
71
typedef struct SSDataBlock {
L
Liu Jicong 已提交
72 73 74 75
  SColumnDataAgg* pBlockAgg;
  SArray*         pDataBlock;  // SArray<SColumnInfoData>
  SArray* pConstantList;       // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
  SDataBlockInfo info;
76 77
} SSDataBlock;

H
Haojun Liao 已提交
78
// Bookkeeping for a column whose entries are located through an offset list.
typedef struct SVarColAttr {
  int32_t* offset;    // start position of each entry in the list
  uint32_t length;    // used buffer size, i.e. the part that contains valid data
  uint32_t allocLen;  // allocated buffer size
} SVarColAttr;

84 85
// pBlockAgg->numOfNull == info.rows, all data are null
// pBlockAgg->numOfNull == 0, no data are null.
86
typedef struct SColumnInfoData {
L
Liu Jicong 已提交
87 88 89
  SColumnInfo info;     // TODO filter info needs to be removed
  bool        hasNull;  // if current column data has null value.
  char*       pData;    // the corresponding block data in memory
H
Haojun Liao 已提交
90
  union {
L
Liu Jicong 已提交
91
    char*       nullbitmap;  // bitmap, one bit for each item in the list
H
Haojun Liao 已提交
92 93
    SVarColAttr varmeta;
  };
94 95
} SColumnInfoData;

H
Haojun Liao 已提交
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
/**
 * Serialize a data block through the taosEncode* helpers.
 *
 * Layout written, in order: uid (i64), numOfCols (i32), rows (i32), number of
 * entries in pDataBlock (i32), then for every entry: colId, type, bytes (i16
 * each) followed by rows * bytes bytes of pData.
 * pBlockAgg and pConstantList are not encoded.
 *
 * @param buf    encode cursor handed to the taosEncode* helpers
 * @param pBlock block to serialize
 * @return sum of the values returned by the taosEncode* helpers
 */
static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
  const SDataBlockInfo* pInfo = &pBlock->info;
  int32_t               numOfEntries = taosArrayGetSize(pBlock->pDataBlock);

  // block header
  int32_t total = taosEncodeFixedI64(buf, pInfo->uid);
  total += taosEncodeFixedI32(buf, pInfo->numOfCols);
  total += taosEncodeFixedI32(buf, pInfo->rows);
  total += taosEncodeFixedI32(buf, numOfEntries);

  // one record per column: descriptor followed by the raw payload
  for (int32_t idx = 0; idx < numOfEntries; idx++) {
    SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, idx);
    int32_t          payloadLen = pInfo->rows * pCol->info.bytes;

    total += taosEncodeFixedI16(buf, pCol->info.colId);
    total += taosEncodeFixedI16(buf, pCol->info.type);
    total += taosEncodeFixedI16(buf, pCol->info.bytes);
    total += taosEncodeBinary(buf, pCol->pData, payloadLen);
  }

  return total;
}

L
Liu Jicong 已提交
118
/**
 * Deserialize a data block written by tEncodeDataBlock.
 *
 * Reads uid, numOfCols and rows into pBlock->info, then creates
 * pBlock->pDataBlock and appends one SColumnInfoData per encoded column.
 * Each column's pData is filled by taosDecodeBinary; tDeleteSSDataBlock
 * releases it with tfree.
 *
 * NOTE(review): the result of taosArrayInit is not checked here.
 *
 * @param buf    position to start decoding from
 * @param pBlock block to fill; its pDataBlock is overwritten
 * @return position just past the decoded block
 */
static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
  int32_t sz;

  buf = taosDecodeFixedI64(buf, &pBlock->info.uid);
  buf = taosDecodeFixedI32(buf, &pBlock->info.numOfCols);
  buf = taosDecodeFixedI32(buf, &pBlock->info.rows);
  buf = taosDecodeFixedI32(buf, &sz);
  pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
  for (int32_t i = 0; i < sz; i++) {
    SColumnInfoData data = {0};
    buf = taosDecodeFixedI16(buf, &data.info.colId);
    buf = taosDecodeFixedI16(buf, &data.info.type);
    buf = taosDecodeFixedI16(buf, &data.info.bytes);
    // payload size mirrors the encoder: rows * bytes per value
    int32_t colSz = pBlock->info.rows * data.info.bytes;
    buf = taosDecodeBinary(buf, (void**)&data.pData, colSz);
    taosArrayPush(pBlock->pDataBlock, &data);
  }
  return (void*)buf;
}

/**
 * Serialize a consume response.
 *
 * Always writes consumerId, reqOffset, rspOffset (i64 each), skipLogNum and
 * numOfTopics (i32 each). When numOfTopics is 0 nothing else is written.
 * Otherwise the schema wrapper, the number of data blocks (0 when pBlockData
 * is NULL) and each block via tEncodeDataBlock follow.
 *
 * @param buf  encode cursor handed to the encode helpers
 * @param pRsp response to serialize
 * @return sum of the values returned by the encode helpers
 */
static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp* pRsp) {
  int32_t tlen = 0;
  int32_t sz = 0;
  tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
  tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
  tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
  tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
  tlen += taosEncodeFixedI32(buf, pRsp->numOfTopics);
  if (pRsp->numOfTopics == 0) return tlen;
  tlen += tEncodeSSchemaWrapper(buf, pRsp->schemas);
  if (pRsp->pBlockData) {
    sz = taosArrayGetSize(pRsp->pBlockData);
  }
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
    tlen += tEncodeDataBlock(buf, pBlock);
  }
  return tlen;
}

/**
 * Deserialize a consume response written by tEncodeSMqConsumeRsp.
 *
 * Reads the fixed header fields first. When numOfTopics is 0 decoding stops
 * there and schemas/pBlockData are left untouched. Otherwise pRsp->schemas is
 * allocated with calloc and pRsp->pBlockData is created; both are released by
 * tDeleteSMqConsumeRsp.
 *
 * @param buf  position to start decoding from
 * @param pRsp response to fill
 * @return position just past the decoded response, or NULL when the schema
 *         wrapper cannot be allocated
 */
static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) {
  int32_t sz;
  buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
  buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
  buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
  buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
  buf = taosDecodeFixedI32(buf, &pRsp->numOfTopics);
  if (pRsp->numOfTopics == 0) return buf;
  pRsp->schemas = (SSchemaWrapper*)calloc(1, sizeof(SSchemaWrapper));
  if (pRsp->schemas == NULL) return NULL;
  buf = tDecodeSSchemaWrapper(buf, pRsp->schemas);
  buf = taosDecodeFixedI32(buf, &sz);
  pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock));
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock block = {0};
    // Keep the returned cursor: without it every block would be decoded from
    // the same bytes and the caller would receive a position that does not
    // account for the block data.
    buf = tDecodeDataBlock(buf, &block);
    taosArrayPush(pRsp->pBlockData, &block);
  }
  return buf;
}

/**
 * Release the memory owned by a data block: every column's pData, the
 * pDataBlock array itself and pBlockAgg.
 *
 * The SSDataBlock object itself is NOT freed, so this can be used as the
 * element destructor of an SArray<SSDataBlock> (see tDeleteSMqConsumeRsp).
 * NOTE(review): pConstantList is not released here — confirm who owns it.
 *
 * @param pBlock block to clean up; NULL is a no-op
 */
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
  if (pBlock == NULL) {
    return;
  }

  int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < sz; ++i) {
    SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
    tfree(pColInfoData->pData);
  }

  taosArrayDestroy(pBlock->pDataBlock);
  tfree(pBlock->pBlockAgg);
}

/**
 * Release the memory owned by a consume response: the schema wrapper (and its
 * pSchema array when nCols is non-zero) and every data block in pBlockData.
 *
 * The SMqConsumeRsp object itself is not freed. Both owned pointers are reset
 * to NULL so that a repeated call does not free them again.
 *
 * @param pRsp response to clean up; NULL is a no-op
 */
static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqConsumeRsp* pRsp) {
  if (pRsp == NULL) {
    return;
  }

  if (pRsp->schemas) {
    if (pRsp->schemas->nCols) {
      tfree(pRsp->schemas->pSchema);
    }
    free(pRsp->schemas);
    pRsp->schemas = NULL;  // do not leave a dangling pointer behind
  }

  // tDeleteSSDataBlock cleans up each element; the array storage is released by taosArrayDestroyEx.
  taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))tDeleteSSDataBlock);
  pRsp->pBlockData = NULL;
}

212 213
//======================================================================================================================
// the following structure shared by parser and executor
214
typedef struct SColumn {
H
Haojun Liao 已提交
215 216 217 218
  uint64_t    uid;
  char        name[TSDB_COL_NAME_LEN];
  int8_t      flag;  // column type: normal column, tag, or user-input column (integer/float/string)
  SColumnInfo info;
219 220
} SColumn;

221
// A limit/offset pair.
typedef struct SLimit {
  int64_t limit;
  int64_t offset;
} SLimit;

typedef struct SOrder {
H
Haojun Liao 已提交
227 228
  uint32_t order;
  SColumn  col;
229 230 231
} SOrder;

typedef struct SGroupbyExpr {
L
Liu Jicong 已提交
232
  SArray* columnInfo;  // SArray<SColIndex>, group by columns information
H
Haojun Liao 已提交
233
  bool    groupbyTag;  // group by tag or column
234 235 236 237
} SGroupbyExpr;

// the structure for sql function in select clause
typedef struct SSqlExpr {
H
Haojun Liao 已提交
238 239 240
  char    token[TSDB_COL_NAME_LEN];  // original token
  SSchema resSchema;

L
Liu Jicong 已提交
241 242 243 244 245
  int32_t  numOfCols;
  SColumn* pColumns;     // data columns that are required by query
  int32_t  interBytes;   // inter result buffer size
  int16_t  numOfParams;  // argument value of each function
  SVariant param[3];     // parameters are not more than 3
246 247 248
} SSqlExpr;

typedef struct SExprInfo {
H
Haojun Liao 已提交
249
  struct SSqlExpr   base;
L
Liu Jicong 已提交
250
  struct tExprNode* pExpr;
251 252
} SExprInfo;

253 254 255 256 257
// State window description; carries a single column.
// NOTE(review): presumably the column whose value defines the state — confirm.
typedef struct SStateWindow {
  SColumn col;
} SStateWindow;

typedef struct SSessionWindow {
H
Haojun Liao 已提交
258
  int64_t gap;  // gap between two session window(in microseconds)
259 260 261
  SColumn col;
} SSessionWindow;

L
Liu Jicong 已提交
262
// Step applied to a cursor when scanning in ascending / descending order.
// The negative value is parenthesized so it expands safely inside any expression.
#define QUERY_ASC_FORWARD_STEP  1
#define QUERY_DESC_FORWARD_STEP (-1)

// Maps an order value to its step: TSDB_ORDER_ASC yields the ascending step, anything else the descending step.
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

H
Haojun Liao 已提交
267 268 269 270
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
271
#endif /*_TD_COMMON_DEF_H_*/