tdatablock.h 10.1 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_COMMON_EP_H_
#define _TD_COMMON_EP_H_

S
common  
Shengliang Guan 已提交
19
#include "tcommon.h"
L
Liu Jicong 已提交
20
#include "tcompression.h"
S
Shengliang Guan 已提交
21
#include "tmsg.h"
22

H
Haojun Liao 已提交
23 24 25 26
#ifdef __cplusplus
extern "C" {
#endif

27 28 29 30 31
// An endpoint set bundled with an integer version field.
// NOTE(review): this is the type taken by updateEpSet_s()/getEpSet_s() declared in this header;
// presumably `version` is used to guard updates of `epSet` — confirm against their definitions.
typedef struct SCorEpSet {
  int32_t version;
  SEpSet  epSet;
} SCorEpSet;

H
Haojun Liao 已提交
32
typedef struct SBlockOrderInfo {
H
Haojun Liao 已提交
33
  bool             nullFirst;
H
Haojun Liao 已提交
34
  int32_t          order;
H
Haojun Liao 已提交
35
  int32_t          slotId;
S
Shengliang Guan 已提交
36
  SColumnInfoData* pColData;
H
Haojun Liao 已提交
37 38
} SBlockOrderInfo;

S
Shengliang Guan 已提交
39 40
// Endpoint (FQDN / port) helpers. Only the prototypes are visible in this header, so the notes
// below are derived from the signatures. NOTE(review): confirm against the definitions.

// Fills *pEp from the endpoint string `ep`; returns an int32_t status (presumably 0 on success).
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp);

// Adds the (fqdn, port) pair to *pEpSet.
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);

// Compares two endpoint sets; neither argument is modified (both are const).
bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2);

// Accessors for an SCorEpSet: store a new endpoint set / fetch the current one by value.
// NOTE(review): the `_s` suffix presumably means "safe" (synchronized) — confirm.
void   updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet);
SEpSet getEpSet_s(SCorEpSet* pEpSet);
46

H
Haojun Liao 已提交
47
// Null-bitmap addressing for fixed-width columns: one bit per row, eight rows per byte,
// with the most significant bit of each byte belonging to the lowest row number.

// log2 of the number of rows covered by a single bitmap byte.
#define NBIT                     (3u)
// Index of row _n inside its bitmap byte (the low NBIT bits of _n).
#define BitPos(_n)               ((_n) & ((1 << NBIT) - 1))
// The bitmap byte (as an lvalue) that holds the flag of row r_.
#define BMCharPos(bm_, r_)       ((bm_)[(r_) >> NBIT])
// Evaluates to 1 when the flag of row r_ is set in bitmap bm_, otherwise 0.
#define colDataIsNull_f(bm_, r_) ((BMCharPos(bm_, r_) & (0x80u >> BitPos(r_))) != 0)
51

H
Haojun Liao 已提交
52 53 54 55
// Sets the null flag of row r_ in bitmap bm_ (MSB of each byte is the lowest row of that byte).
// r_ is expanded more than once, so it must not have side effects.
#define colDataSetNull_f(bm_, r_)                 \
  do {                                            \
    BMCharPos(bm_, r_) |= (0x80u >> BitPos(r_));  \
  } while (0)
H
Haojun Liao 已提交
56

S
shenglian zhou 已提交
57 58
// Clears the null flag of row r_ in bitmap bm_, leaving every other bit of the byte untouched.
// r_ is expanded more than once, so it must not have side effects.
#define colDataSetNotNull_f(bm_, r_)               \
  do {                                             \
    BMCharPos(bm_, r_) &= ~(0x80u >> BitPos(r_));  \
  } while (0)

L
Liu Jicong 已提交
62 63
// Variable-length columns flag a NULL row by storing -1 in that row's varmeta.offset slot.
// Both macro arguments are parenthesized so that argument expressions such as `&col` or
// `base + i` bind as a whole instead of being split by `->` / `[]` precedence.
#define colDataIsNull_var(pColumnInfoData, row)  ((pColumnInfoData)->varmeta.offset[(row)] == -1)
#define colDataSetNull_var(pColumnInfoData, row) ((pColumnInfoData)->varmeta.offset[(row)] = -1)
64

65 66 67 68 69 70
// Number of bitmap bytes required for _n rows (rounds up to whole bytes of 2^NBIT rows each).
#define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT)

// Address of row r_ in a variable-length column: pData advanced by the row's varmeta.offset entry.
#define colDataGetVarData(p1_, r_) ((p1_)->pData + (p1_)->varmeta.offset[(r_)])

// Address of row r_ in a fixed-width column: rows lie info.bytes apart, starting at pData.
#define colDataGetNumData(p1_, r_) ((p1_)->pData + ((p1_)->info.bytes * (r_)))

// p1_: SColumnInfoData*, r_: row number.
// Picks the variable-length or fixed-width accessor according to IS_VAR_DATA_TYPE(info.type).
#define colDataGetData(p1_, r_) \
  ((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) : colDataGetNumData(p1_, r_))
73

L
Liu Jicong 已提交
74
static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) {
L
Liu Jicong 已提交
75 76
  if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
    if (colDataIsNull_var(pColumnInfoData, row)) {
77 78
      return true;
    }
L
Liu Jicong 已提交
79
    char* data = colDataGetVarData(pColumnInfoData, row);
80
    return (*data == TSDB_DATA_TYPE_NULL);
81 82
  }

L
Liu Jicong 已提交
83 84 85
  if (!pColumnInfoData->hasNull) {
    return false;
  }
86

L
Liu Jicong 已提交
87
  if (pColumnInfoData->info.type == TSDB_DATA_TYPE_VARCHAR || pColumnInfoData->info.type == TSDB_DATA_TYPE_NCHAR) {
88
    return colDataIsNull_var(pColumnInfoData, row);
L
Liu Jicong 已提交
89 90 91 92 93 94 95 96 97
  } else {
    if (pColumnInfoData->nullbitmap == NULL) {
      return false;
    }

    return colDataIsNull_f(pColumnInfoData->nullbitmap, row);
  }
}

S
Shengliang Guan 已提交
98 99
// Reports whether `row` of the given column is NULL, optionally short-circuiting through the
// column aggregate.
//
//  - A column with hasNull unset has no NULL rows.
//  - When pColAgg is supplied and its numOfNull equals totalRows, every row is NULL; when it is 0,
//    no row is NULL. In both cases the column is asserted to carry no null bitmap. The "all NULL"
//    comparison is evaluated first, so it wins if both comparisons hold.
//  - Otherwise variable-length columns use the offset sentinel and fixed-width columns use the
//    null bitmap (a missing bitmap means "not NULL").
static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row,
                                       SColumnDataAgg* pColAgg) {
  if (!pColumnInfoData->hasNull) {
    return false;
  }

  if (pColAgg != NULL) {
    const bool everyRowNull = (pColAgg->numOfNull == totalRows);
    const bool noRowNull = (pColAgg->numOfNull == 0);
    if (everyRowNull || noRowNull) {
      ASSERT(pColumnInfoData->nullbitmap == NULL);
      return everyRowNull;
    }
  }

  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    return colDataIsNull_var(pColumnInfoData, row);
  }

  if (pColumnInfoData->nullbitmap == NULL) {
    return false;
  }
  return colDataIsNull_f(pColumnInfoData->nullbitmap, row);
}

H
Haojun Liao 已提交
125 126 127
// Marks `currentRow` of the column as NULL and records that the column now contains a NULL.
// Variable-length columns store the -1 offset sentinel for the row; all other columns set the
// row's bit in the null bitmap.
static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uint32_t currentRow) {
  const bool isVarLen = IS_VAR_DATA_TYPE(pColumnInfoData->info.type);

  if (isVarLen) {
    colDataSetNull_var(pColumnInfoData, currentRow);
  } else {
    colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow);
  }

  pColumnInfoData->hasNull = true;
}

136 137
// Marks the `nRows` consecutive rows starting at `start` as NULL and sets hasNull on the column
// (hasNull is set even when nRows is 0).
//
// The loop index is a size_t and the end bound is computed once in size_t. A signed 32-bit index
// compared against the unsigned `start + nRows` bound would be converted to unsigned for the
// comparison, so a start above INT32_MAX would silently skip the loop and a large range would
// overflow the signed counter.
static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, uint32_t start, size_t nRows) {
  const size_t end = (size_t)start + nRows;

  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    for (size_t i = start; i < end; ++i) {
      colDataSetNull_var(pColumnInfoData, i);  // -1 offset sentinel marks a NULL of VAR type
    }
  } else {
    for (size_t i = start; i < end; ++i) {
      colDataSetNull_f(pColumnInfoData->nullbitmap, i);
    }
  }

  pColumnInfoData->hasNull = true;
}

150
// Stores the 8-bit value *v into row `currentRow` of the column.
// The column type is asserted to be TINYINT, UTINYINT or BOOL; the destination is
// pData + info.bytes * currentRow.
static FORCE_INLINE void colDataAppendInt8(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int8_t* v) {
  ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_TINYINT ||
         pColumnInfoData->info.type == TSDB_DATA_TYPE_UTINYINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_BOOL);

  int8_t* pSlot = (int8_t*)(pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow);
  *pSlot = *v;
}

157
// Stores the 16-bit value *v into row `currentRow` of the column.
// The column type is asserted to be SMALLINT or USMALLINT; the destination is
// pData + info.bytes * currentRow.
static FORCE_INLINE void colDataAppendInt16(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int16_t* v) {
  ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT ||
         pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT);

  int16_t* pSlot = (int16_t*)(pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow);
  *pSlot = *v;
}

164
// Stores the 32-bit value *v into row `currentRow` of the column.
// The column type is asserted to be INT or UINT; the destination is
// pData + info.bytes * currentRow.
static FORCE_INLINE void colDataAppendInt32(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int32_t* v) {
  ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_INT || pColumnInfoData->info.type == TSDB_DATA_TYPE_UINT);

  int32_t* pSlot = (int32_t*)(pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow);
  *pSlot = *v;
}

170
// Stores the 64-bit value *v into row `currentRow` of the column.
// The column type is asserted to be BIGINT, UBIGINT or TIMESTAMP; the destination is
// pData + info.bytes * currentRow.
static FORCE_INLINE void colDataAppendInt64(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int64_t* v) {
  int32_t type = pColumnInfoData->info.type;
  ASSERT(type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT || type == TSDB_DATA_TYPE_TIMESTAMP);

  int64_t* pSlot = (int64_t*)(pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow);
  *pSlot = *v;
}

177
// Stores the float value *v into row `currentRow` of the column.
// The column type is asserted to be FLOAT; the destination is pData + info.bytes * currentRow.
static FORCE_INLINE void colDataAppendFloat(SColumnInfoData* pColumnInfoData, uint32_t currentRow, float* v) {
  ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_FLOAT);

  float* pSlot = (float*)(pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow);
  *pSlot = *v;
}

183
// Stores the double value *v into row `currentRow` of the column.
// The column type is asserted to be DOUBLE; the destination is pData + info.bytes * currentRow.
static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, uint32_t currentRow, double* v) {
  ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_DOUBLE);

  double* pSlot = (double*)(pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow);
  *pSlot = *v;
}

H
Haojun Liao 已提交
189
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
L
Liu Jicong 已提交
190 191
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity,
                        const SColumnInfoData* pSource, uint32_t numOfRow2);
192
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows);
193
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex);
H
Haojun Liao 已提交
194

195 196
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
void    colDataTrim(SColumnInfoData* pColumnInfoData);
H
Haojun Liao 已提交
197

H
Haojun Liao 已提交
198 199
// Block shape queries and (de)serialization. Only the prototypes are visible in this header, so
// the notes below are derived from the signatures. NOTE(review): confirm against the definitions.

// Column / row counts of a block; the block is not modified (const).
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock);
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock);

// Merges pSrc into pDest; pSrc is not modified (const).
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc);

// Determines, starting at startIndex, a row range bounded by pageSize and reports its end through
// *stopIndex (presumed).
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
                           int32_t pageSize);

// Serializes a block into `buf` / rebuilds a block from `buf`.
// blockDataFromBuf1 additionally takes a `capacity` argument.
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf);
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity);
H
Haojun Liao 已提交
207

208 209
// Block extraction and size queries. Only the prototypes are visible in this header, so the notes
// below are derived from the signatures. NOTE(review): confirm against the definitions.

// Returns a block built from rowCount rows of pBlock starting at startIndex.
// NOTE(review): ownership of the returned block is not visible here — check who frees it.
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount);

// Size queries. blockGetEncodeSize() in this header adds blockDataGetSerialMetaSize() and
// blockDataGetSize() to obtain the encoded size of a block.
size_t blockDataGetSize(const SSDataBlock* pBlock);
size_t blockDataGetRowSize(SSDataBlock* pBlock);
double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);

H
Haojun Liao 已提交
215
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
H
Haojun Liao 已提交
216
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
H
Haojun Liao 已提交
217

218
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, size_t existRows, uint32_t numOfRows);
219
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
220

L
Liu Jicong 已提交
221 222
// Cleanup, paging and cloning. Only the prototypes are visible in this header, so the notes below
// are derived from the signatures. NOTE(review): confirm against the definitions.

// Clean up a single column (for numOfRows rows) / a whole block.
// NOTE(review): whether memory is released or only reset is not visible here.
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
void blockDataCleanup(SSDataBlock* pDataBlock);

// Row capacity of the block for the given pageSize (presumed).
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);

// Trims the first n rows of the block; returns an int32_t status.
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);

// Creates a new block from pDataBlock; `copyData` presumably selects whether row data is copied
// as well. The source block is not modified (const).
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
229

230 231 232
void  blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress);
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData);

L
Liu Jicong 已提交
233
void blockDebugShowData(const SArray* dataBlocks);
L
Liu Jicong 已提交
234

C
Cary Xu 已提交
235
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
236
                                    tb_uid_t suid);
C
Cary Xu 已提交
237

L
Liu Jicong 已提交
238 239
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
                            const char* stbFullName, int32_t vgId);
L
Liu Jicong 已提交
240

L
Liu Jicong 已提交
241 242
// Encoded size of a block: serialized metadata size plus data size.
// Both parts are size_t values; their sum is narrowed to the int32_t return type.
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
  size_t metaBytes = blockDataGetSerialMetaSize(pBlock);
  size_t dataBytes = blockDataGetSize(pBlock);
  return (int32_t)(metaBytes + dataBytes);
}

L
Liu Jicong 已提交
245
// Compresses one column into `data` using the compFunc registered in tDataTypes[] for the
// column's type, and returns that function's result.
// The input length comes from colDataGetLength(); the fifth argument is that length plus
// COMP_OVERFLOW_BYTES (presumably the output buffer capacity — confirm against compFunc's
// signature). The last two arguments are passed as NULL and 0.
static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data,
                                                 int8_t compressed) {
  int32_t rawLen = colDataGetLength(pColRes, numOfRows);
  return tDataTypes[pColRes->info.type].compFunc(pColRes->pData, rawLen, numOfRows, data,
                                                 rawLen + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}

H
Haojun Liao 已提交
252 253 254 255
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
256
#endif /*_TD_COMMON_EP_H_*/