tdatablock.h 11.1 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_COMMON_EP_H_
#define _TD_COMMON_EP_H_

S
common  
Shengliang Guan 已提交
19
#include "tcommon.h"
L
Liu Jicong 已提交
20
#include "tcompression.h"
S
Shengliang Guan 已提交
21
#include "tmsg.h"
22

H
Haojun Liao 已提交
23 24 25 26
#ifdef __cplusplus
extern "C" {
#endif

27 28 29 30 31
// An endpoint set paired with a version field.
// NOTE(review): presumably `version` is used by updateEpSet_s()/getEpSet_s() to guard concurrent
// access to `epSet` — confirm against the implementation, which is not in this header.
typedef struct SCorEpSet {
  int32_t version;  // version field stored alongside epSet; exact protocol not visible here
  SEpSet  epSet;    // the endpoint set itself
} SCorEpSet;

H
Haojun Liao 已提交
32
typedef struct SBlockOrderInfo {
H
Haojun Liao 已提交
33
  bool             nullFirst;
H
Haojun Liao 已提交
34
  int32_t          order;
H
Haojun Liao 已提交
35
  int32_t          slotId;
S
Shengliang Guan 已提交
36
  SColumnInfoData* pColData;
H
Haojun Liao 已提交
37 38
} SBlockOrderInfo;

S
Shengliang Guan 已提交
39 40
// NOTE(review): defined outside this header. Presumably splits the endpoint string `ep` into the
// fields of `pEp` and returns a status code (0 on success) — confirm against the implementation.
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp);
// NOTE(review): presumably appends the endpoint (fqdn, port) to `pEpSet`; what happens when the set
// is already full is not visible here — confirm.
void    addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);
H
Haojun Liao 已提交
41

S
Shengliang Guan 已提交
42
// Compares two endpoint sets; neither argument is modified (both are const).
// NOTE(review): which fields take part in the comparison is not visible in this header — confirm.
bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2);
43

S
Shengliang Guan 已提交
44 45
// Writer/reader pair for a versioned endpoint set (SCorEpSet). getEpSet_s returns the set by value.
// NOTE(review): the `_s` suffix presumably means "safe" with respect to concurrent access — the
// synchronization scheme is not visible in this header; confirm against the implementation.
void   updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet);
SEpSet getEpSet_s(SCorEpSet* pEpSet);
46

H
Haojun Liao 已提交
47
// Shift width used by the bitmap macros in this header: a row index is split into a byte index
// (row >> NBIT) and a bit position (row & ((1 << NBIT) - 1)), i.e. 8 rows per bitmap byte.
#define NBIT                     (3u)
H
Haojun Liao 已提交
48
// Position of row _n inside its bitmap byte: the low NBIT bits of _n (0..7 for NBIT == 3).
#define BitPos(_n)               ((_n) & ((1 << NBIT) - 1))
H
Haojun Liao 已提交
49 50
// The byte of bitmap `bitmap_` that carries the flag of row `row_` (byte index is row_ >> NBIT).
#define BMCharPos(bitmap_, row_) ((bitmap_)[(row_) >> NBIT])
// Non-zero when the flag bit of row `row_` is set in `bitmap_`. Bits are counted from the most
// significant end of each byte (bit index 7 - BitPos(row_)). `row_` is expanded more than once,
// so it must not have side effects.
#define colDataIsNull_f(bitmap_, row_) ((BMCharPos(bitmap_, row_) & (1u << (7u - BitPos(row_)))) != 0)
51

H
Haojun Liao 已提交
52 53 54 55
// Sets the flag bit of row `row_` in `bitmap_` (bit index 7 - BitPos(row_) of byte row_ >> NBIT).
// Statement-like macro; `row_` is expanded more than once, so it must not have side effects.
#define colDataSetNull_f(bitmap_, row_)                        \
  do {                                                         \
    BMCharPos(bitmap_, row_) |= (1u << (7u - BitPos(row_)));   \
  } while (0)
H
Haojun Liao 已提交
56

57 58 59
// True when row `row` of the variable-length column `pColumnInfoData` is marked NULL, i.e. its
// entry in varmeta.offset is -1. Both arguments are parenthesized so that expressions such as
// `&cols[i]` or `pCol + 1` can be passed safely.
#define colDataIsNull_var(pColumnInfoData, row)  ((pColumnInfoData)->varmeta.offset[(row)] == -1)
// Marks row `row` of the variable-length column `pColumnInfoData` as NULL by storing -1 in its
// varmeta.offset entry. The expansion is an assignment expression.
#define colDataSetNull_var(pColumnInfoData, row) ((pColumnInfoData)->varmeta.offset[(row)] = -1)

60 61 62 63 64 65 66 67 68 69
// Number of bitmap bytes needed for nRows_ rows: one bit per row, rounded up to whole bytes.
#define BitmapLen(nRows_) (((nRows_) + ((1 << NBIT) - 1)) >> NBIT)

// Address of row `row_` in a variable-length column: pData plus the row's varmeta.offset entry.
#define colDataGetVarData(pCol_, row_) ((pCol_)->pData + (pCol_)->varmeta.offset[(row_)])

// Address of row `row_` in a fixed-width column: pData plus row_ * info.bytes.
#define colDataGetNumData(pCol_, row_) ((pCol_)->pData + ((row_) * (pCol_)->info.bytes))

// Address of row `row_` in column `pCol_` (SColumnInfoData*), picking the variable-length or the
// fixed-width lookup according to the column type.
#define colDataGetData(pCol_, row_) \
  ((IS_VAR_DATA_TYPE((pCol_)->info.type)) ? colDataGetVarData(pCol_, row_) : colDataGetNumData(pCol_, row_))

L
Liu Jicong 已提交
70
static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) {
71 72 73 74 75 76
  if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON){
    if(colDataIsNull_var(pColumnInfoData, row)){
      return true;
    }
    char *data = colDataGetVarData(pColumnInfoData, row);
    return (*data == TSDB_DATA_TYPE_NULL);
77 78
  }

L
Liu Jicong 已提交
79 80 81
  if (!pColumnInfoData->hasNull) {
    return false;
  }
82 83

  if (pColumnInfoData->info.type== TSDB_DATA_TYPE_VARCHAR || pColumnInfoData->info.type == TSDB_DATA_TYPE_NCHAR) {
84
    return colDataIsNull_var(pColumnInfoData, row);
L
Liu Jicong 已提交
85 86 87 88 89 90 91 92 93
  } else {
    if (pColumnInfoData->nullbitmap == NULL) {
      return false;
    }

    return colDataIsNull_f(pColumnInfoData->nullbitmap, row);
  }
}

S
Shengliang Guan 已提交
94 95
// Reports whether row `row` of `pColumnInfoData` is NULL, optionally short-circuiting on the
// column aggregate `pColAgg` (may be NULL).
//
// A column with hasNull == false never reports NULL. When an aggregate is supplied and it says
// that all `totalRows` rows are NULL, or that none is, the answer is taken from the aggregate;
// in both cases the column is asserted to carry no null bitmap. Otherwise variable-length
// columns use the offset entry and fixed-width columns use the null bitmap (a missing bitmap
// means "not NULL").
static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row,
                                       SColumnDataAgg* pColAgg) {
  if (!pColumnInfoData->hasNull) {
    return false;
  }

  if (pColAgg != NULL) {
    // the all-NULL case is evaluated first, so it wins when totalRows is 0
    const bool allNull = (pColAgg->numOfNull == totalRows);
    const bool noneNull = (pColAgg->numOfNull == 0);
    if (allNull || noneNull) {
      ASSERT(pColumnInfoData->nullbitmap == NULL);
      return allNull;
    }
  }

  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    return colDataIsNull_var(pColumnInfoData, row);
  }

  if (pColumnInfoData->nullbitmap == NULL) {
    return false;
  }

  return colDataIsNull_f(pColumnInfoData->nullbitmap, row);
}

H
Haojun Liao 已提交
121 122 123
// Marks row `currentRow` of `pColumnInfoData` as NULL and raises the column's hasNull flag.
// Variable-length columns record the NULL in the row's offset entry (each NULL still occupies an
// offset slot); fixed-width columns set the row's bit in nullbitmap, which must already exist.
static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uint32_t currentRow) {
  const bool isVarType = IS_VAR_DATA_TYPE(pColumnInfoData->info.type);

  if (!isVarType) {
    colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow);
  } else {
    colDataSetNull_var(pColumnInfoData, currentRow);
  }

  pColumnInfoData->hasNull = true;
}

132 133
// Marks the `nRows` rows starting at `start` (rows start .. start + nRows - 1) as NULL and raises
// the column's hasNull flag (also when nRows is 0).
// Variable-length columns record each NULL in the row's offset entry; fixed-width columns set
// the row's bit in nullbitmap, which must already exist.
//
// The row index is a size_t so that it has the same type as the `start + nRows` bound; a signed
// 32-bit index would turn negative for start > INT32_MAX and skip the loop.
static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, uint32_t start, size_t nRows) {
  const size_t end = start + nRows;

  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    for (size_t i = start; i < end; ++i) {
      colDataSetNull_var(pColumnInfoData, i);  // it is a null value of VAR type.
    }
  } else {
    for (size_t i = start; i < end; ++i) {
      colDataSetNull_f(pColumnInfoData->nullbitmap, i);
    }
  }

  pColumnInfoData->hasNull = true;
}

146
// Stores the 8-bit value *v into row `currentRow` of a TINYINT, UTINYINT or BOOL column
// (asserted). The row lives at pData + info.bytes * currentRow; no capacity check is done here.
static FORCE_INLINE void colDataAppendInt8(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int8_t* v) {
  ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_TINYINT ||
         pColumnInfoData->info.type == TSDB_DATA_TYPE_UTINYINT ||
         pColumnInfoData->info.type == TSDB_DATA_TYPE_BOOL);

  int8_t* pSlot = (int8_t*)(pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow);
  *pSlot = *v;
}

153
// Stores the 16-bit value *v into row `currentRow` of a SMALLINT or USMALLINT column (asserted).
// The row lives at pData + info.bytes * currentRow; no capacity check is done here.
static FORCE_INLINE void colDataAppendInt16(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int16_t* v) {
  ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT ||
         pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT);

  int16_t* pSlot = (int16_t*)(pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow);
  *pSlot = *v;
}

160
// Stores the 32-bit value *v into row `currentRow` of an INT or UINT column (asserted).
// The row lives at pData + info.bytes * currentRow; no capacity check is done here.
static FORCE_INLINE void colDataAppendInt32(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int32_t* v) {
  ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_INT ||
         pColumnInfoData->info.type == TSDB_DATA_TYPE_UINT);

  int32_t* pSlot = (int32_t*)(pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow);
  *pSlot = *v;
}

166
// Stores the 64-bit value *v into row `currentRow` of a BIGINT, UBIGINT or TIMESTAMP column
// (asserted). The row lives at pData + info.bytes * currentRow; no capacity check is done here.
static FORCE_INLINE void colDataAppendInt64(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int64_t* v) {
  int32_t colType = pColumnInfoData->info.type;
  ASSERT(colType == TSDB_DATA_TYPE_BIGINT ||
         colType == TSDB_DATA_TYPE_UBIGINT ||
         colType == TSDB_DATA_TYPE_TIMESTAMP);

  int64_t* pSlot = (int64_t*)(pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow);
  *pSlot = *v;
}

173
// Stores the float *v into row `currentRow` of a FLOAT column (asserted).
// The row lives at pData + info.bytes * currentRow; no capacity check is done here.
static FORCE_INLINE void colDataAppendFloat(SColumnInfoData* pColumnInfoData, uint32_t currentRow, float* v) {
  ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_FLOAT);

  float* pSlot = (float*)(pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow);
  *pSlot = *v;
}

179
// Stores the double *v into row `currentRow` of a DOUBLE column (asserted).
// The row lives at pData + info.bytes * currentRow; no capacity check is done here.
static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, uint32_t currentRow, double* v) {
  ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_DOUBLE);

  double* pSlot = (double*)(pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow);
  *pSlot = *v;
}

H
Haojun Liao 已提交
185
// Type-generic counterpart of the colDataAppendXxx helpers; defined outside this header.
// NOTE(review): presumably writes `pData` into row `currentRow`, or marks the row NULL when
// `isNull` is true, and returns a status code — confirm against the implementation.
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
186
// NOTE(review): defined outside this header. Presumably appends the first `numOfRow2` rows of
// `pSource` after the `numOfRow1` rows already held by `pColumnInfoData`, growing the column and
// updating `*capacity` as needed; returns a status code — confirm against the implementation.
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity,
                        const SColumnInfoData* pSource, uint32_t numOfRow2);
188
// NOTE(review): defined outside this header. Presumably copies `numOfRows` rows from `pSource`
// into `pColumnInfoData` and returns a status code — confirm against the implementation.
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows);
H
Haojun Liao 已提交
189
// NOTE(review): defined outside this header. Presumably recomputes the block's timestamp window
// from its rows and returns a status code — confirm against the implementation.
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
H
Haojun Liao 已提交
190

191 192
// Length of a column's data for `numOfRows` rows. blockCompressEncode() in this header uses the
// result as the number of bytes to copy from pData; the computation itself is defined elsewhere.
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
// NOTE(review): defined outside this header; what exactly is trimmed is not visible here — confirm.
void    colDataTrim(SColumnInfoData* pColumnInfoData);
H
Haojun Liao 已提交
193

H
Haojun Liao 已提交
194 195
// Read-only accessors (the block is const) for a block's column and row counts; defined outside
// this header.
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock);
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock);
H
Haojun Liao 已提交
196

197
// NOTE(review): defined outside this header. Presumably appends the rows of `pSrc` to `pDest`,
// with `pIndexMap` mapping source columns to destination columns; returns a status code — confirm.
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pIndexMap);
L
Liu Jicong 已提交
198 199
// NOTE(review): defined outside this header. Presumably finds, starting at `startIndex`, the last
// row that still fits into a page of `pageSize` bytes and reports it through `*stopIndex`;
// returns a status code — confirm against the implementation.
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
                           int32_t pageSize);
H
Haojun Liao 已提交
200
// NOTE(review): defined outside this header. Presumably serializes `pBlock` into `buf`; the
// required buffer size and the meaning of the return value are not visible here — confirm.
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
201
// NOTE(review): defined outside this header. Presumably the inverse of blockDataToBuf(): fills
// `pBlock` from the serialized bytes in `buf` — confirm against the implementation.
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf);
H
Haojun Liao 已提交
202
// NOTE(review): defined outside this header. Variant of blockDataFromBuf() that additionally
// takes a `capacity`; how the capacity is used is not visible here — confirm.
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity);
H
Haojun Liao 已提交
203

204 205
// NOTE(review): defined outside this header. Presumably returns a new block holding `rowCount`
// rows of `pBlock` starting at `startIndex`; ownership of the returned block (who frees it) and
// the failure value are not visible here — confirm.
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount);

H
Haojun Liao 已提交
206
// Size of a block's data. blockGetEncodeSize() in this header adds it to
// blockDataGetSerialMetaSize() to obtain the encoded size; the computation is defined elsewhere.
size_t blockDataGetSize(const SSDataBlock* pBlock);
207
// NOTE(review): defined outside this header. Presumably the size in bytes of one row of `pBlock`.
// Unlike the neighbouring getters the block is not const, so the call may cache the result in
// the block — confirm.
size_t blockDataGetRowSize(SSDataBlock* pBlock);
H
Haojun Liao 已提交
208 209 210
// NOTE(review): defined outside this header. Presumably the per-row size of a serialized block;
// it is a double, so it may be fractional (e.g. bitmap bits per row) — confirm.
double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
// Size of a serialized block's metadata. blockGetEncodeSize() in this header adds it to
// blockDataGetSize() to obtain the encoded size; the computation is defined elsewhere.
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);

H
Haojun Liao 已提交
211
// NOTE(review): defined outside this header. Presumably sorts the rows of `pDataBlock` by the
// keys in `pOrderInfo` (likely an array of SBlockOrderInfo) and returns a status code — confirm.
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
H
Haojun Liao 已提交
212
// NOTE(review): defined outside this header. Variant of blockDataSort() that takes `nullFirst`
// as an explicit argument; how it differs otherwise is not visible here — confirm.
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
H
Haojun Liao 已提交
213

214
// NOTE(review): defined outside this header. Presumably grows `pColumn` so it can hold
// `numOfRows` rows while keeping the first `existRows` rows; returns a status code — confirm.
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, size_t existRows, uint32_t numOfRows);
215
// NOTE(review): defined outside this header. Presumably grows every column of `pDataBlock` so
// the block can hold `numOfRows` rows; returns a status code — confirm.
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
216

L
Liu Jicong 已提交
217 218
// NOTE(review): defined outside this header. Presumably reset a column / a whole block for reuse
// (clearing null information for `numOfRows` rows) rather than freeing it — confirm.
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
void blockDataCleanup(SSDataBlock* pDataBlock);
219

L
Liu Jicong 已提交
220
// NOTE(review): defined outside this header. Presumably the number of rows of `pBlock` that fit
// into a page of `pageSize` bytes — confirm against the implementation.
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
221

L
Liu Jicong 已提交
222
// NOTE(review): defined outside this header. Presumably removes the first `n` rows of `pBlock`
// and returns a status code; behavior for n larger than the row count is not visible — confirm.
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
H
Haojun Liao 已提交
223

224
// NOTE(review): defined outside this header. Presumably allocates a new block with the same
// column layout as `pDataBlock`, copying the rows as well when `copyData` is true; ownership of
// the result and the failure value are not visible here — confirm.
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
225

L
Liu Jicong 已提交
226
// NOTE(review): defined outside this header. Presumably a debugging aid that prints the contents
// of every block in `dataBlocks`; the output destination is not visible here — confirm.
void blockDebugShowData(const SArray* dataBlocks);
L
Liu Jicong 已提交
227

C
Cary Xu 已提交
228 229 230
// NOTE(review): defined outside this header. Presumably builds a submit request from the blocks
// in `pDataBlocks` using schema `pTSchema` and stores it in `*pReq`. It returns void, so failure
// is presumably signalled through `*pReq`; who frees the request is not visible here — confirm.
void buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
                                 tb_uid_t uid, tb_uid_t suid);

L
Liu Jicong 已提交
231 232
// Encoded size of `pBlock`: serialized metadata size plus data size, narrowed from size_t to
// int32_t on return.
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
  const size_t metaBytes = blockDataGetSerialMetaSize(pBlock);
  const size_t dataBytes = blockDataGetSize(pBlock);
  return (int32_t)(metaBytes + dataBytes);
}

L
Liu Jicong 已提交
235
// Compresses the data of column `pColRes` for `numOfRows` rows into `data` by calling the
// compFunc registered for the column's type in tDataTypes, and returns that function's result.
// The output capacity passed to compFunc is the uncompressed length plus COMP_OVERFLOW_BYTES, so
// `data` must have at least that much room. The last two compFunc arguments are passed as NULL
// and 0.
static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data,
                                                 int8_t compressed) {
  int32_t rawLen = colDataGetLength(pColRes, numOfRows);
  return tDataTypes[pColRes->info.type].compFunc(pColRes->pData, rawLen, numOfRows, data,
                                                 rawLen + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}

static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
L
Liu Jicong 已提交
243
                                             int8_t needCompress) {
244 245 246 247 248
  int32_t* actualLen = (int32_t*) data;
  data += sizeof(int32_t);

  uint64_t* groupId = (uint64_t*) data;
  data += sizeof(uint64_t);
L
Liu Jicong 已提交
249

250
  int32_t* colSizes = (int32_t*)data;
L
Liu Jicong 已提交
251
  data += numOfCols * sizeof(int32_t);
252 253

  *dataLen = (numOfCols * sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t));
L
Liu Jicong 已提交
254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284

  int32_t numOfRows = pBlock->info.rows;
  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);

    // copy the null bitmap
    if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
      size_t metaSize = numOfRows * sizeof(int32_t);
      memcpy(data, pColRes->varmeta.offset, metaSize);
      data += metaSize;
      (*dataLen) += metaSize;
    } else {
      int32_t len = BitmapLen(numOfRows);
      memcpy(data, pColRes->nullbitmap, len);
      data += len;
      (*dataLen) += len;
    }

    if (needCompress) {
      colSizes[col] = blockCompressColData(pColRes, numOfRows, data, needCompress);
      data += colSizes[col];
      (*dataLen) += colSizes[col];
    } else {
      colSizes[col] = colDataGetLength(pColRes, numOfRows);
      (*dataLen) += colSizes[col];
      memmove(data, pColRes->pData, colSizes[col]);
      data += colSizes[col];
    }

    colSizes[col] = htonl(colSizes[col]);
  }
285 286 287

  *actualLen = *dataLen;
  *groupId = pBlock->info.groupId;
L
Liu Jicong 已提交
288 289
}

H
Haojun Liao 已提交
290 291 292 293
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
294
#endif /*_TD_COMMON_EP_H_*/
295