tdatablock.h 11.3 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_COMMON_EP_H_
#define _TD_COMMON_EP_H_

S
common  
Shengliang Guan 已提交
19
#include "tcommon.h"
L
Liu Jicong 已提交
20
#include "tcompression.h"
S
Shengliang Guan 已提交
21
#include "tmsg.h"
22

H
Haojun Liao 已提交
23 24 25 26
#ifdef __cplusplus
extern "C" {
#endif

27 28 29 30 31
/*
 * An endpoint set stored together with a version counter.
 * NOTE(review): presumably `version` is what updateEpSet_s()/getEpSet_s() use
 * to publish/read `epSet` consistently across threads - confirm against their
 * implementation, which is not part of this header.
 */
typedef struct SCorEpSet {
  int32_t version;  // counter associated with epSet (see note above)
  SEpSet  epSet;    // the wrapped endpoint set
} SCorEpSet;

H
Haojun Liao 已提交
32
typedef struct SBlockOrderInfo {
H
Haojun Liao 已提交
33
  bool             nullFirst;
H
Haojun Liao 已提交
34
  int32_t          order;
H
Haojun Liao 已提交
35
  int32_t          slotId;
S
Shengliang Guan 已提交
36
  SColumnInfoData* pColData;
H
Haojun Liao 已提交
37 38
} SBlockOrderInfo;

S
Shengliang Guan 已提交
39 40
// Fills `pEp` from the endpoint string `ep`.
// NOTE(review): presumably `ep` is "fqdn[:port]" and 0 means success - confirm against the implementation.
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp);
// Adds the endpoint (`fqdn`, `port`) to `pEpSet`.
// NOTE(review): behaviour when the set is already full is not visible from this header.
void    addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);
H
Haojun Liao 已提交
41

S
Shengliang Guan 已提交
42
// Compares two endpoint sets; neither argument is modified (both are const).
// NOTE(review): which fields take part in the comparison is decided by the implementation - confirm there.
bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2);
43

S
Shengliang Guan 已提交
44 45
// Accessors for the versioned endpoint set SCorEpSet.
// NOTE(review): the "_s" suffix presumably means "safe" (usable concurrently) - confirm against the implementation.
// updateEpSet_s: replaces the stored endpoint set with *pNewEpSet.
// getEpSet_s:    returns a copy (by value) of the stored endpoint set.
void   updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet);
SEpSet getEpSet_s(SCorEpSet* pEpSet);
46

H
Haojun Liao 已提交
47
/*
 * Null-bitmap helpers for fixed-width columns.
 *
 * Layout: one bit per row, 8 rows per byte (NBIT = 3 = log2(8)). Row r is
 * stored in byte (r >> NBIT); inside that byte the rows are laid out
 * MSB-first, i.e. the first row of a byte uses bit 7 and the last uses bit 0.
 * A set bit means "this row is NULL".
 *
 * All of these are macros and evaluate `r_` more than once: never pass an
 * expression with side effects (such as `i++`) as the row argument.
 */
#define NBIT                     (3u)
// Position of row _n inside its bitmap byte (0..7).
#define BitPos(_n)               ((_n) & ((1 << NBIT) - 1))
// The bitmap byte (as an lvalue) that holds the bit of row r_.
#define BMCharPos(bm_, r_)       ((bm_)[(r_) >> NBIT])
// Non-zero when the bit of row r_ is set in bitmap bm_.
#define colDataIsNull_f(bm_, r_) ((BMCharPos(bm_, r_) & (1u << (7u - BitPos(r_)))) == (1u << (7u - BitPos(r_))))

// Marks row r_ as NULL in bitmap bm_.
#define colDataSetNull_f(bm_, r_)                    \
  do {                                               \
    BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
  } while (0)

// Clears the NULL mark of row r_ in bitmap bm_.
#define colDataSetNotNull_f(bm_, r_)                  \
  do {                                                \
    BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \
  } while (0)

62 63 64
// Variable-length columns have no null bitmap: a row is NULL when its entry in
// varmeta.offset[] is -1. Arguments are fully parenthesized so that expressions
// such as `&col` or `pCols + i` can be passed safely.
#define colDataIsNull_var(pColumnInfoData, row)  ((pColumnInfoData)->varmeta.offset[(row)] == -1)
#define colDataSetNull_var(pColumnInfoData, row) ((pColumnInfoData)->varmeta.offset[(row)] = -1)

65 66 67 68 69 70
// Number of bytes needed for a null bitmap covering _n rows (one bit per row, rounded up to a whole byte).
#define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT)

// Address of row r_ in a variable-length column: pData plus the row's entry in varmeta.offset[].
// Does not check for the NULL sentinel (-1) in the offset array.
#define colDataGetVarData(p1_, r_) ((p1_)->pData + (p1_)->varmeta.offset[(r_)])

// Address of row r_ in a fixed-width column: rows are info.bytes apart, starting at pData.
#define colDataGetNumData(p1_, r_) ((p1_)->pData + ((r_) * (p1_)->info.bytes))

// Address of row r_ for any column type; dispatches on IS_VAR_DATA_TYPE(info.type).
// Arguments: SColumnInfoData*, row number. Both are evaluated more than once.
#define colDataGetData(p1_, r_) \
  ((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) : colDataGetNumData(p1_, r_))
73

L
Liu Jicong 已提交
74
static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) {
L
Liu Jicong 已提交
75 76
  if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
    if (colDataIsNull_var(pColumnInfoData, row)) {
77 78
      return true;
    }
L
Liu Jicong 已提交
79
    char* data = colDataGetVarData(pColumnInfoData, row);
80
    return (*data == TSDB_DATA_TYPE_NULL);
81 82
  }

L
Liu Jicong 已提交
83 84 85
  if (!pColumnInfoData->hasNull) {
    return false;
  }
86

L
Liu Jicong 已提交
87
  if (pColumnInfoData->info.type == TSDB_DATA_TYPE_VARCHAR || pColumnInfoData->info.type == TSDB_DATA_TYPE_NCHAR) {
88
    return colDataIsNull_var(pColumnInfoData, row);
L
Liu Jicong 已提交
89 90 91 92 93 94 95 96 97
  } else {
    if (pColumnInfoData->nullbitmap == NULL) {
      return false;
    }

    return colDataIsNull_f(pColumnInfoData->nullbitmap, row);
  }
}

S
Shengliang Guan 已提交
98 99
/*
 * Returns true when `row` of the column holds NULL, optionally short-cutting
 * through pre-computed column statistics.
 *
 * @param pColumnInfoData  column to inspect
 * @param totalRows        number of rows in the block; only compared against pColAgg->numOfNull
 * @param row              row index (not range-checked)
 * @param pColAgg          optional statistics, may be NULL. When given and
 *                         numOfNull == totalRows the answer is true, when
 *                         numOfNull == 0 the answer is false; in both cases the
 *                         column is asserted to carry no nullbitmap.
 *
 * Without a usable shortcut: hasNull == false means "not NULL";
 * variable-length columns use the -1 offset sentinel; fixed-width columns use
 * the null bitmap, and a missing bitmap is treated as "no NULLs".
 */
static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row,
                                       SColumnDataAgg* pColAgg) {
  if (!pColumnInfoData->hasNull) {
    return false;
  }

  if (pColAgg != NULL) {
    if (pColAgg->numOfNull == totalRows) {
      ASSERT(pColumnInfoData->nullbitmap == NULL);
      return true;
    } else if (pColAgg->numOfNull == 0) {
      ASSERT(pColumnInfoData->nullbitmap == NULL);
      return false;
    }
  }

  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    return colDataIsNull_var(pColumnInfoData, row);
  }

  if (pColumnInfoData->nullbitmap == NULL) {
    return false;
  }

  return colDataIsNull_f(pColumnInfoData->nullbitmap, row);
}

H
Haojun Liao 已提交
125 126 127
/*
 * Marks row `currentRow` of the column as NULL and sets the column's hasNull flag.
 *
 * Variable-length columns store the -1 sentinel in varmeta.offset[currentRow];
 * fixed-width columns set the row's bit in nullbitmap. Neither the row index
 * nor the presence of the offset array / bitmap is checked here.
 */
static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uint32_t currentRow) {
  // There is a placeholder for each NULL value of binary or nchar type.
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    colDataSetNull_var(pColumnInfoData, currentRow);  // it is a null value of VAR type.
  } else {
    colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow);
  }

  pColumnInfoData->hasNull = true;
}

136 137
/*
 * Marks the `nRows` consecutive rows starting at `start` as NULL and sets the
 * column's hasNull flag (the flag is set even when nRows is 0).
 *
 * Variable-length columns get the -1 offset sentinel for each row; fixed-width
 * columns get the corresponding bits set in nullbitmap. No bounds checks are
 * performed.
 *
 * The loop index is a size_t so that it has the same signedness and width as
 * the `start + nRows` bound; a signed 32-bit index would be converted to an
 * unsigned value for the comparison and misbehave for large row numbers.
 */
static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, uint32_t start, size_t nRows) {
  const size_t end = (size_t)start + nRows;

  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    for (size_t i = start; i < end; ++i) {
      colDataSetNull_var(pColumnInfoData, i);  // it is a null value of VAR type.
    }
  } else {
    for (size_t i = start; i < end; ++i) {
      colDataSetNull_f(pColumnInfoData->nullbitmap, i);
    }
  }

  pColumnInfoData->hasNull = true;
}

150
/*
 * Stores the 8-bit value *v into row `currentRow` of a fixed-width column.
 * Asserts that the column type is TINYINT, UTINYINT or BOOL. The destination
 * is pData + info.bytes * currentRow; the null bitmap and hasNull flag are not
 * touched and the row index is not range-checked.
 */
static FORCE_INLINE void colDataAppendInt8(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int8_t* v) {
  ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_TINYINT ||
         pColumnInfoData->info.type == TSDB_DATA_TYPE_UTINYINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_BOOL);
  char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
  *(int8_t*)p = *v;
}

157
/*
 * Stores the 16-bit value *v into row `currentRow` of a fixed-width column.
 * Asserts that the column type is SMALLINT or USMALLINT. The destination is
 * pData + info.bytes * currentRow; the null bitmap and hasNull flag are not
 * touched and the row index is not range-checked.
 */
static FORCE_INLINE void colDataAppendInt16(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int16_t* v) {
  ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT ||
         pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT);
  char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
  *(int16_t*)p = *v;
}

164
/*
 * Stores the 32-bit value *v into row `currentRow` of a fixed-width column.
 * Asserts that the column type is INT or UINT. The destination is
 * pData + info.bytes * currentRow; the null bitmap and hasNull flag are not
 * touched and the row index is not range-checked.
 */
static FORCE_INLINE void colDataAppendInt32(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int32_t* v) {
  ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_INT || pColumnInfoData->info.type == TSDB_DATA_TYPE_UINT);
  char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
  *(int32_t*)p = *v;
}

170
/*
 * Stores the 64-bit value *v into row `currentRow` of a fixed-width column.
 * Asserts that the column type is BIGINT, UBIGINT or TIMESTAMP. The
 * destination is pData + info.bytes * currentRow; the null bitmap and hasNull
 * flag are not touched and the row index is not range-checked.
 */
static FORCE_INLINE void colDataAppendInt64(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int64_t* v) {
  int32_t type = pColumnInfoData->info.type;
  ASSERT(type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT || type == TSDB_DATA_TYPE_TIMESTAMP);
  char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
  *(int64_t*)p = *v;
}

177
/*
 * Stores the float value *v into row `currentRow` of a fixed-width column.
 * Asserts that the column type is FLOAT. The destination is
 * pData + info.bytes * currentRow; the null bitmap and hasNull flag are not
 * touched and the row index is not range-checked.
 */
static FORCE_INLINE void colDataAppendFloat(SColumnInfoData* pColumnInfoData, uint32_t currentRow, float* v) {
  ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_FLOAT);
  char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
  *(float*)p = *v;
}

183
/*
 * Stores the double value *v into row `currentRow` of a fixed-width column.
 * Asserts that the column type is DOUBLE. The destination is
 * pData + info.bytes * currentRow; the null bitmap and hasNull flag are not
 * touched and the row index is not range-checked.
 */
static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, uint32_t currentRow, double* v) {
  ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_DOUBLE);
  char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
  *(double*)p = *v;
}

H
Haojun Liao 已提交
189
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
190
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource,
L
Liu Jicong 已提交
191
                        uint32_t numOfRow2);
192
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows);
H
Haojun Liao 已提交
193
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
H
Haojun Liao 已提交
194

195 196
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
void    colDataTrim(SColumnInfoData* pColumnInfoData);
H
Haojun Liao 已提交
197

H
Haojun Liao 已提交
198 199
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock);
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock);
H
Haojun Liao 已提交
200

201
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pIndexMap);
L
Liu Jicong 已提交
202 203
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
                           int32_t pageSize);
H
Haojun Liao 已提交
204
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
205
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf);
H
Haojun Liao 已提交
206
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity);
H
Haojun Liao 已提交
207

208 209
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount);

H
Haojun Liao 已提交
210
size_t blockDataGetSize(const SSDataBlock* pBlock);
211
size_t blockDataGetRowSize(SSDataBlock* pBlock);
H
Haojun Liao 已提交
212 213 214
double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);

H
Haojun Liao 已提交
215
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
H
Haojun Liao 已提交
216
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
H
Haojun Liao 已提交
217

218
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, size_t existRows, uint32_t numOfRows);
219
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
220

L
Liu Jicong 已提交
221 222
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
void blockDataCleanup(SSDataBlock* pDataBlock);
223

L
Liu Jicong 已提交
224
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
225

L
Liu Jicong 已提交
226
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
H
Haojun Liao 已提交
227

228
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
229

L
Liu Jicong 已提交
230
void blockDebugShowData(const SArray* dataBlocks);
L
Liu Jicong 已提交
231

C
Cary Xu 已提交
232
/*
 * Conversion of data blocks into submit requests; implemented outside this
 * header.
 * NOTE(review): ownership of the returned / output SSubmitReq is not visible
 * from the signatures - confirm who frees it.
 */

// `pReq` is an output parameter (pointer to pointer).
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
                                    tb_uid_t uid, tb_uid_t suid);

SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema);

L
Liu Jicong 已提交
237 238
/*
 * Returns the sum of blockDataGetSerialMetaSize() and blockDataGetSize() for
 * `pBlock`, narrowed to int32_t.
 * NOTE(review): presumably used to size the output buffer for
 * blockCompressEncode() - confirm at the call sites. The two size_t results
 * are added before the narrowing conversion, exactly as an implicit conversion
 * on return would do.
 */
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
  return (int32_t)(blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock));
}

L
Liu Jicong 已提交
241
/*
 * Compresses the first `numOfRows` rows of a column into `data`.
 *
 * The input is pColRes->pData, colDataGetLength(pColRes, numOfRows) bytes long.
 * The work is delegated to the compression function registered for the
 * column's type in tDataTypes[]; it is told that the output buffer can hold
 * the input length plus COMP_OVERFLOW_BYTES, so `data` must be at least that
 * large.
 *
 * @param compressed  passed through unchanged to the compression function
 * @return            whatever the compression function returns; the caller in
 *                    this header treats it as the number of bytes written to `data`
 *
 * NOTE(review): the trailing (NULL, 0) arguments are presumably an optional
 * scratch buffer and its size - confirm against the compFunc signature.
 */
static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data,
                                                 int8_t compressed) {
  int32_t colSize = colDataGetLength(pColRes, numOfRows);
  return (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colSize, numOfRows, data,
                                                      colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}

static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
L
Liu Jicong 已提交
249
                                             int8_t needCompress) {
L
Liu Jicong 已提交
250
  int32_t* actualLen = (int32_t*)data;
251 252
  data += sizeof(int32_t);

L
Liu Jicong 已提交
253
  uint64_t* groupId = (uint64_t*)data;
254
  data += sizeof(uint64_t);
L
Liu Jicong 已提交
255

256
  int32_t* colSizes = (int32_t*)data;
L
Liu Jicong 已提交
257
  data += numOfCols * sizeof(int32_t);
258 259

  *dataLen = (numOfCols * sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t));
L
Liu Jicong 已提交
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290

  int32_t numOfRows = pBlock->info.rows;
  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);

    // copy the null bitmap
    if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
      size_t metaSize = numOfRows * sizeof(int32_t);
      memcpy(data, pColRes->varmeta.offset, metaSize);
      data += metaSize;
      (*dataLen) += metaSize;
    } else {
      int32_t len = BitmapLen(numOfRows);
      memcpy(data, pColRes->nullbitmap, len);
      data += len;
      (*dataLen) += len;
    }

    if (needCompress) {
      colSizes[col] = blockCompressColData(pColRes, numOfRows, data, needCompress);
      data += colSizes[col];
      (*dataLen) += colSizes[col];
    } else {
      colSizes[col] = colDataGetLength(pColRes, numOfRows);
      (*dataLen) += colSizes[col];
      memmove(data, pColRes->pData, colSizes[col]);
      data += colSizes[col];
    }

    colSizes[col] = htonl(colSizes[col]);
  }
291 292 293

  *actualLen = *dataLen;
  *groupId = pBlock->info.groupId;
L
Liu Jicong 已提交
294 295
}

H
Haojun Liao 已提交
296 297 298 299
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
300
#endif /*_TD_COMMON_EP_H_*/
301