tdataformat.c 32.6 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
common  
Shengliang Guan 已提交
15 16

#define _DEFAULT_SOURCE
S
slguan 已提交
17
#include "tdataformat.h"
S
Shengliang Guan 已提交
18
#include "tcoding.h"
L
Liu Jicong 已提交
19
#include "tdatablock.h"
S
log  
Shengliang Guan 已提交
20
#include "tlog.h"
H
more  
hzcheng 已提交
21

H
Hongze Cheng 已提交
22
/* One index entry of a KV-encoded row: locates the value stored for a column id. */
typedef struct SKVIdx {
  int32_t cid;     // column id; the key compared by tSKVIdxCmprFn
  int32_t offset;  // byte offset of the value inside the KV value area; negative means NULL
} SKVIdx;
H
Hongze Cheng 已提交
26

H
Hongze Cheng 已提交
27 28 29 30 31 32 33
#pragma pack(push, 1)
// Header of a KV-encoded row payload: a column count followed by the index array.
// Packed to 1 byte so that idx[] follows nCols without padding; readers in this file
// locate the value area at sizeof(STSKVRow) + sizeof(SKVIdx) * nCols from the payload start.
typedef struct {
  int16_t nCols;  // number of entries currently stored in idx[]
  SKVIdx  idx[];  // per-column index entries (sorted by cid before a KV row is handed out)
} STSKVRow;
#pragma pack(pop)

H
Hongze Cheng 已提交
34 35
#pragma pack(push, 1)
/*
 * Packed header of a serialized tag set. The encoded tag values are stored
 * directly after idx[nTag] (see tTagNew / tTagGet).
 */
struct STag {
  int8_t  isJson;  // 1: values are keyed by string (pKey); 0: keyed by column id (cid)
  int16_t len;     // total size in bytes of the blob: header + idx[] + encoded values
  int16_t nTag;    // number of tag values, i.e. number of entries in idx[]
  int32_t ver;     // version supplied by the caller of tTagNew
  int16_t idx[];   // offset of each encoded value, relative to the end of idx[]
};
#pragma pack(pop)

H
Hongze Cheng 已提交
44
/* Non-zero when the row's flags carry the TSROW_KV_ROW bit. */
#define TSROW_IS_KV_ROW(r) ((r)->flags & TSROW_KV_ROW)

/* Number of bytes needed by a bitmap of n slots, 1 bit (BIT1) or 2 bits (BIT2) per slot. */
#define BIT1_SIZE(n) (((n)-1) / 8 + 1)
#define BIT2_SIZE(n) (((n)-1) / 4 + 1)

/*
 * Store v into slot i of bitmap p.
 * - BIT1: 8 slots per byte, slot i lives at bit (i % 8).
 * - BIT2: 4 slots per byte, slot i lives at bits (i % 4) * 2 .. (i % 4) * 2 + 1.
 *   The shift must be doubled; shifting by (i % 4) alone makes neighbouring
 *   2-bit slots overlap and corrupt each other.
 * v is masked to the slot width so an out-of-range value cannot spill into
 * adjacent slots. Note that p and i are evaluated more than once.
 */
#define SET_BIT1(p, i, v) \
  ((p)[(i) / 8] = (uint8_t)(((p)[(i) / 8] & ~(1u << ((i) % 8))) | (((unsigned)(v)&1u) << ((i) % 8))))
#define SET_BIT2(p, i, v) \
  ((p)[(i) / 4] = (uint8_t)(((p)[(i) / 4] & ~(3u << (((i) % 4) * 2))) | (((unsigned)(v)&3u) << (((i) % 4) * 2))))

/* Read slot i of bitmap p (result is 0..1 for BIT1, 0..3 for BIT2). */
#define GET_BIT1(p, i) ((uint8_t)(((p)[(i) / 8] >> ((i) % 8)) & 1u))
#define GET_BIT2(p, i) ((uint8_t)(((p)[(i) / 4] >> (((i) % 4) * 2)) & 3u))
H
Hongze Cheng 已提交
51

H
Hongze Cheng 已提交
52 53
static FORCE_INLINE int tSKVIdxCmprFn(const void *p1, const void *p2);

H
Hongze Cheng 已提交
54
// STSRow2
H
Hongze Cheng 已提交
55 56
/*
 * Encode pRow as: ts (I64), flags (I8), sver (I32v) and, unless the row is
 * all-NONE or all-NULL, the payload (pData/nData) as a binary field.
 * p may be NULL, in which case NULL is forwarded to every encoder
 * (presumably a size-only pass -- confirm against tcoding.h).
 * Returns the sum of the lengths reported by the encoders.
 */
int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow) {
  int32_t nBytes = 0;

  nBytes += tPutI64(p ? p + nBytes : NULL, pRow->ts);
  nBytes += tPutI8(p ? p + nBytes : NULL, pRow->flags);
  nBytes += tPutI32v(p ? p + nBytes : NULL, pRow->sver);

  ASSERT(pRow->flags & 0xf);

  // Rows that are entirely NONE or entirely NULL carry no payload.
  int32_t kind = pRow->flags & 0xf;
  if (kind != TSROW_HAS_NONE && kind != TSROW_HAS_NULL) {
    nBytes += tPutBinary(p ? p + nBytes : NULL, pRow->pData, pRow->nData);
  }

  return nBytes;
}

H
Hongze Cheng 已提交
76 77 78
/*
 * Decode a row written by tPutTSRow starting at p.
 * When pRow is NULL the fields are skipped: NULL is handed to the decoders,
 * except for flags, which are always read because they decide whether a
 * payload follows.
 * Returns the sum of the lengths reported by the decoders.
 */
int32_t tGetTSRow(uint8_t *p, STSRow2 *pRow) {
  int32_t nBytes = 0;
  uint8_t flags;

  nBytes += tGetI64(p + nBytes, pRow ? &pRow->ts : NULL);
  nBytes += tGetI8(p + nBytes, pRow ? &pRow->flags : &flags);
  nBytes += tGetI32v(p + nBytes, pRow ? &pRow->sver : NULL);

  if (pRow) {
    flags = pRow->flags;
  }

  // All-NONE and all-NULL rows have no payload to decode.
  int32_t kind = flags & 0xf;
  if (kind != TSROW_HAS_NONE && kind != TSROW_HAS_NULL) {
    nBytes += tGetBinary(p + nBytes, pRow ? &pRow->pData : NULL, pRow ? &pRow->nData : NULL);
  }

  return nBytes;
}

H
Hongze Cheng 已提交
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
/*
 * Deep-copy pRow into one allocation: the STSRow2 header followed directly by
 * a copy of the payload. The result is stored in *ppRow (NULL on failure).
 * Returns 0 on success; on allocation failure sets terrno to
 * TSDB_CODE_OUT_OF_MEMORY and returns -1.
 */
int32_t tTSRowDup(const STSRow2 *pRow, STSRow2 **ppRow) {
  STSRow2 *pCopy = taosMemoryMalloc(sizeof(*pRow) + pRow->nData);

  *ppRow = pCopy;
  if (pCopy == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pCopy->ts = pRow->ts;
  pCopy->flags = pRow->flags;
  pCopy->sver = pRow->sver;
  pCopy->nData = pRow->nData;
  pCopy->pData = NULL;

  if (pRow->nData) {
    // The payload lives right behind the header inside the same allocation.
    pCopy->pData = (uint8_t *)(pCopy + 1);
    memcpy(pCopy->pData, pRow->pData, pRow->nData);
  }

  return 0;
}

/* Release a row with taosMemoryFree; a NULL pRow is ignored. */
void tTSRowFree(STSRow2 *pRow) {
  if (pRow == NULL) {
    return;
  }
  taosMemoryFree(pRow);
}

H
Hongze Cheng 已提交
122
/*
 * Look up the value of schema column iCol in pRow and store it in *pColVal.
 *
 * iCol is the column's position in pTSchema and must not be 0 (the timestamp
 * column is kept in pRow->ts, not in the payload). *pColVal becomes ColValNONE,
 * ColValNULL, or a COL_VAL_DATA value whose pData points into pRow->pData
 * (no copy is made). Always returns 0.
 *
 * The payload layout mirrors what tTSRowBuilderPut / tTSRowBuilderGetRow write:
 *  - KV row:    STSKVRow header, index sorted by cid, then the values. Variable
 *               length values are length-prefixed (tPutBinary), fixed-width
 *               values are stored raw.
 *  - tuple row: optional bitmap, a fixed area of flen bytes, then the variable
 *               area. A var column's fixed slot holds an int32 offset into the
 *               variable area.
 */
int32_t tTSRowGet(const STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
  uint8_t  *p;
  uint8_t  *pFixed;  // start of the fixed-length area of a tuple row
  uint8_t   v;
  int32_t   vOffset;
  int32_t   bidx = iCol - 1;  // bitmap slot; the timestamp column has none
  STColumn *pTColumn = &pTSchema->columns[iCol];
  STSKVRow *pTSKVRow;
  SKVIdx   *pKVIdx;

  ASSERT(iCol != 0);
  ASSERT(pTColumn->colId != 0);

  // Parentheses are required: `!=` binds tighter than `&`.
  ASSERT((pRow->flags & 0xf) != 0);
  switch (pRow->flags & 0xf) {
    case TSROW_HAS_NONE:
      *pColVal = ColValNONE;
      return 0;
    case TSROW_HAS_NULL:
      *pColVal = ColValNULL;
      return 0;
    default:
      break;
  }

  if (TSROW_IS_KV_ROW(pRow)) {
    ASSERT((pRow->flags & 0xf) != TSROW_HAS_VAL);

    pTSKVRow = (STSKVRow *)pRow->pData;
    pKVIdx =
        bsearch(&((SKVIdx){.cid = pTColumn->colId}), pTSKVRow->idx, pTSKVRow->nCols, sizeof(SKVIdx), tSKVIdxCmprFn);
    if (pKVIdx == NULL) {
      *pColVal = ColValNONE;
    } else if (pKVIdx->offset < 0) {
      *pColVal = ColValNULL;
    } else {
      p = pRow->pData + sizeof(STSKVRow) + sizeof(SKVIdx) * pTSKVRow->nCols + pKVIdx->offset;
      pColVal->type = COL_VAL_DATA;
      if (IS_VAR_DATA_TYPE(pTColumn->type)) {
        tGetBinary(p, &pColVal->pData, &pColVal->nData);
      } else {
        // The builder memcpy()s fixed-width values without a length prefix.
        pColVal->pData = p;
        pColVal->nData = pTColumn->bytes;
      }
    }
    return 0;
  }

  // Tuple row: consult the bitmap (if any) and step over it.
  p = pRow->pData;
  switch (pRow->flags & 0xf) {
    case TSROW_HAS_NULL | TSROW_HAS_NONE:
      v = GET_BIT1(p, bidx);
      if (v == 0) {
        *pColVal = ColValNONE;
      } else {
        *pColVal = ColValNULL;
      }
      return 0;
    case TSROW_HAS_VAL | TSROW_HAS_NONE:
      v = GET_BIT1(p, bidx);
      if (v != 1) {
        *pColVal = ColValNONE;
        return 0;
      }
      p = p + BIT1_SIZE(pTSchema->numOfCols - 1);
      break;
    case TSROW_HAS_VAL | TSROW_HAS_NULL:
      v = GET_BIT1(p, bidx);
      if (v != 1) {
        *pColVal = ColValNULL;
        return 0;
      }
      p = p + BIT1_SIZE(pTSchema->numOfCols - 1);
      break;
    case TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE:
      v = GET_BIT2(p, bidx);
      if (v == 0) {
        *pColVal = ColValNONE;
        return 0;
      } else if (v == 1) {
        *pColVal = ColValNULL;
        return 0;
      } else if (v != 2) {
        ASSERT(0);  // 3 is not a valid 2-bit state
        break;
      }
      p = p + BIT2_SIZE(pTSchema->numOfCols - 1);
      break;
    default:
      // TSROW_HAS_VAL only: no bitmap in front of the data.
      break;
  }

  // p now points at the fixed-length area.
  pFixed = p;
  p = pFixed + pTColumn->offset;
  pColVal->type = COL_VAL_DATA;
  if (IS_VAR_DATA_TYPE(pTColumn->type)) {
    // The slot stores an offset relative to the start of the variable area,
    // which begins flen bytes after the start of the fixed area (not after
    // this column's slot). memcpy avoids an unaligned int32 load.
    memcpy(&vOffset, p, sizeof(vOffset));
    tGetBinary(pFixed + pTSchema->flen + vOffset, &pColVal->pData, &pColVal->nData);
  } else {
    pColVal->pData = p;
    pColVal->nData = pTColumn->bytes;
  }

  return 0;
}

// STSchema
H
Hongze Cheng 已提交
222 223 224 225 226 227 228
/*
 * Build an STSchema with ncols columns from the SSchema array pSchema and
 * store it in *ppTSchema (NULL on allocation failure).
 *
 * Every column's offset is the value of flen at the time it is visited. The
 * first column is not accounted for in flen/vlen, so column 1 also starts at
 * offset 0. Variable-length columns add (bytes + 5) to vlen.
 *
 * Returns 0 on success; on allocation failure sets terrno to
 * TSDB_CODE_OUT_OF_MEMORY and returns -1.
 */
int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t ncols, STSchema **ppTSchema) {
  STSchema *pTSchema = (STSchema *)taosMemoryMalloc(sizeof(STSchema) + sizeof(STColumn) * ncols);

  *ppTSchema = pTSchema;
  if (pTSchema == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pTSchema->numOfCols = ncols;
  pTSchema->version = sver;
  pTSchema->flen = 0;
  pTSchema->vlen = 0;
  pTSchema->tlen = 0;

  for (int32_t i = 0; i < ncols; ++i) {
    SSchema  *pSrc = pSchema + i;
    STColumn *pDst = pTSchema->columns + i;

    pDst->colId = pSrc->colId;
    pDst->type = pSrc->type;
    pDst->flags = pSrc->flags;
    pDst->bytes = pSrc->bytes;
    pDst->offset = pTSchema->flen;

    // The first column does not contribute to the fixed/variable lengths.
    if (i == 0) {
      continue;
    }

    pTSchema->flen += TYPE_BYTES[pSrc->type];
    if (IS_VAR_DATA_TYPE(pSrc->type)) {
      pTSchema->vlen += (pSrc->bytes + 5);
    }
  }

  return 0;
}

H
Hongze Cheng 已提交
257 258 259
/* Free a schema created by tTSchemaCreate; a NULL pTSchema is ignored. */
void tTSchemaDestroy(STSchema *pTSchema) {
  if (pTSchema == NULL) {
    return;
  }
  taosMemoryFree(pTSchema);
}
H
Hongze Cheng 已提交
260

H
Hongze Cheng 已提交
261
// STSRowBuilder
H
Hongze Cheng 已提交
262
/*
 * Initialise a row builder: create its schema and allocate the two scratch
 * buffers, one for the KV encoding and one for the tuple encoding, each sized
 * for the worst case.
 *
 * Returns 0 on success, -1 on failure (terrno is TSDB_CODE_OUT_OF_MEMORY for
 * buffer allocation failures). On failure after the schema was created, every
 * resource is released and pTSchema / pKVBuf / pTPBuf are reset to NULL, so a
 * later tTSRowBuilderClear() on the same builder cannot free them twice.
 */
int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, int32_t nCols, SSchema *pSchema) {
  if (tTSchemaCreate(sver, pSchema, nCols, &pBuilder->pTSchema) < 0) return -1;

  // Bitmaps cover every column except the first (timestamp) one.
  pBuilder->szBitMap1 = BIT1_SIZE(nCols - 1);
  pBuilder->szBitMap2 = BIT2_SIZE(nCols - 1);
  pBuilder->szKVBuf =
      sizeof(STSKVRow) + sizeof(SKVIdx) * (nCols - 1) + pBuilder->pTSchema->flen + pBuilder->pTSchema->vlen;
  pBuilder->szTPBuf = pBuilder->szBitMap2 + pBuilder->pTSchema->flen + pBuilder->pTSchema->vlen;

  pBuilder->pKVBuf = taosMemoryMalloc(pBuilder->szKVBuf);
  if (pBuilder->pKVBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pBuilder->pTPBuf = NULL;
    tTSchemaDestroy(pBuilder->pTSchema);
    pBuilder->pTSchema = NULL;
    return -1;
  }

  pBuilder->pTPBuf = taosMemoryMalloc(pBuilder->szTPBuf);
  if (pBuilder->pTPBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pBuilder->pKVBuf);
    pBuilder->pKVBuf = NULL;
    tTSchemaDestroy(pBuilder->pTSchema);
    pBuilder->pTSchema = NULL;
    return -1;
  }

  return 0;
}

H
Hongze Cheng 已提交
287
/*
 * Release everything owned by the builder (tuple buffer, KV buffer, schema)
 * and reset the corresponding pointers to NULL.
 */
void tTSRowBuilderClear(STSRowBuilder *pBuilder) {
  if (pBuilder->pTPBuf != NULL) {
    taosMemoryFree(pBuilder->pTPBuf);
    pBuilder->pTPBuf = NULL;
  }

  if (pBuilder->pKVBuf != NULL) {
    taosMemoryFree(pBuilder->pKVBuf);
    pBuilder->pKVBuf = NULL;
  }

  // tTSchemaDestroy tolerates NULL, so no guard is needed here.
  tTSchemaDestroy(pBuilder->pTSchema);
  pBuilder->pTSchema = NULL;
}

H
Hongze Cheng 已提交
300
/*
 * Prepare the builder for a new row: clear the "set" marks on every schema
 * column, rewind the column cursor, empty the KV index and zero the
 * variable-length counters and the row flags.
 */
void tTSRowBuilderReset(STSRowBuilder *pBuilder) {
  STSchema *pTSchema = pBuilder->pTSchema;
  STSKVRow *pKVHead = (STSKVRow *)pBuilder->pKVBuf;

  for (int32_t i = 0; i < pTSchema->numOfCols; ++i) {
    COL_CLR_SET(pTSchema->columns[i].flags);
  }

  pBuilder->iCol = 0;
  pKVHead->nCols = 0;
  pBuilder->vlenKV = 0;
  pBuilder->vlenTP = 0;
  pBuilder->row.flags = 0;
}

H
Hongze Cheng 已提交
313
/*
 * Record the value of column `cid` for the row being built.
 *
 * pData == NULL stores a NULL for the column; otherwise nData bytes at pData
 * are copied into both the KV and the tuple scratch buffers. cid == 0 is the
 * timestamp column: its value goes to row.ts and it gets no KV index entry.
 *
 * The column is located by scanning the schema left or right from the
 * position of the previous put (pBuilder->iCol).
 *
 * Returns 0 on success, -1 when cid is not in the schema or the column has
 * already been set for this row.
 */
int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, uint32_t nData) {
  // Start from the cursor. iCol must be initialised here: when the cursor
  // column already matches cid neither search loop below runs.
  int32_t   iCol = pBuilder->iCol;
  STColumn *pTColumn = &pBuilder->pTSchema->columns[iCol];
  uint8_t  *p;
  STSKVRow *pTSKVRow = (STSKVRow *)pBuilder->pKVBuf;

  // use interp search
  if (pTColumn->colId < cid) {  // right search
    for (iCol = pBuilder->iCol + 1; iCol < pBuilder->pTSchema->numOfCols; iCol++) {
      pTColumn = &pBuilder->pTSchema->columns[iCol];
      if (pTColumn->colId >= cid) break;
    }
  } else if (pTColumn->colId > cid) {  // left search
    for (iCol = pBuilder->iCol - 1; iCol >= 0; iCol--) {
      pTColumn = &pBuilder->pTSchema->columns[iCol];
      if (pTColumn->colId <= cid) break;
    }
  }

  // If a scan ran off either end, pTColumn is the last column visited and its
  // id differs from cid, so the out-of-range iCol is never stored.
  if (pTColumn->colId != cid || COL_IS_SET(pTColumn->flags)) {
    return -1;
  }

  pBuilder->iCol = iCol;

  // set value
  if (cid == 0) {
    ASSERT(pData && nData == sizeof(TSKEY) && iCol == 0);
    pBuilder->row.ts = *(TSKEY *)pData;
    pTColumn->flags |= COL_SET_VAL;
  } else {
    if (pData) {
      // set VAL
      pBuilder->row.flags |= TSROW_HAS_VAL;
      pTColumn->flags |= COL_SET_VAL;

      /* KV */
      if (1) {  // avoid KV at some threshold (todo)
        pTSKVRow->idx[pTSKVRow->nCols].cid = cid;
        pTSKVRow->idx[pTSKVRow->nCols].offset = pBuilder->vlenKV;

        // Values are staged behind a full-size index (numOfCols - 1 entries).
        p = pBuilder->pKVBuf + sizeof(STSKVRow) + sizeof(SKVIdx) * (pBuilder->pTSchema->numOfCols - 1) +
            pBuilder->vlenKV;
        if (IS_VAR_DATA_TYPE(pTColumn->type)) {
          ASSERT(nData <= pTColumn->bytes);
          pBuilder->vlenKV += tPutBinary(p, pData, nData);
        } else {
          ASSERT(nData == pTColumn->bytes);
          memcpy(p, pData, nData);
          pBuilder->vlenKV += nData;
        }
      }

      /* TUPLE */
      p = pBuilder->pTPBuf + pBuilder->szBitMap2 + pTColumn->offset;
      if (IS_VAR_DATA_TYPE(pTColumn->type)) {
        ASSERT(nData <= pTColumn->bytes);
        // The fixed slot of a var column holds its offset in the variable area.
        *(int32_t *)p = pBuilder->vlenTP;

        p = pBuilder->pTPBuf + pBuilder->szBitMap2 + pBuilder->pTSchema->flen + pBuilder->vlenTP;
        pBuilder->vlenTP += tPutBinary(p, pData, nData);
      } else {
        ASSERT(nData == pTColumn->bytes);
        memcpy(p, pData, nData);
      }
    } else {
      // set NULL
      pBuilder->row.flags |= TSROW_HAS_NULL;
      pTColumn->flags |= COL_SET_NULL;

      // A negative offset marks the column as NULL in the KV index.
      pTSKVRow->idx[pTSKVRow->nCols].cid = cid;
      pTSKVRow->idx[pTSKVRow->nCols].offset = -1;
    }

    pTSKVRow->nCols++;
  }

  return 0;
}

H
Hongze Cheng 已提交
395 396 397 398 399 400 401 402 403 404
/*
 * Three-way comparison of two SKVIdx entries by column id, in the form
 * expected by qsort()/bsearch(): returns -1, 0 or 1.
 */
static FORCE_INLINE int tSKVIdxCmprFn(const void *p1, const void *p2) {
  const int32_t lhs = ((const SKVIdx *)p1)->cid;
  const int32_t rhs = ((const SKVIdx *)p2)->cid;

  return (lhs > rhs) - (lhs < rhs);
}
H
Hongze Cheng 已提交
405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423
/*
 * Fill the bitmap at p for every column except the first, based on each
 * column's COL_SET_* marks. The encoding depends on the row flags:
 *  - HAS_NULL|HAS_NONE:          1 bit per column, 1 = NULL, 0 = NONE
 *  - HAS_VAL|HAS_NULL|HAS_NONE:  2 bits per column, 0 = NONE, 1 = NULL, 2 = VAL
 *  - anything else:              1 bit per column, 1 = VAL, 0 = otherwise
 */
static void setBitMap(uint8_t *p, STSchema *pTSchema, uint8_t flags) {
  for (int32_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
    STColumn *pTColumn = &pTSchema->columns[iCol];
    int32_t   bidx = iCol - 1;

    if (flags == (TSROW_HAS_NULL | TSROW_HAS_NONE)) {
      if (pTColumn->flags & COL_SET_NULL) {
        SET_BIT1(p, bidx, (uint8_t)1);
      } else {
        SET_BIT1(p, bidx, (uint8_t)0);
      }
    } else if (flags == (TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE)) {
      if (pTColumn->flags & COL_SET_NULL) {
        SET_BIT2(p, bidx, (uint8_t)1);
      } else if (pTColumn->flags & COL_SET_VAL) {
        SET_BIT2(p, bidx, (uint8_t)2);
      } else {
        SET_BIT2(p, bidx, (uint8_t)0);
      }
    } else {
      if (pTColumn->flags & COL_SET_VAL) {
        SET_BIT1(p, bidx, (uint8_t)1);
      } else {
        SET_BIT1(p, bidx, (uint8_t)0);
      }
    }
  }
}
H
Hongze Cheng 已提交
441
/*
 * Finish the row under construction and return a pointer to it in *ppRow.
 * The row and its payload live inside the builder; nothing is allocated.
 *
 * Columns that were never put are reported as NONE. The sizes of the KV and
 * the tuple encodings are compared and the smaller one is used (tuple on a
 * tie). For a tuple row the bitmap is written directly in front of the data:
 * 2-bit bitmaps start at pTPBuf, 1-bit bitmaps at
 * pTPBuf + szBitMap2 - szBitMap1, and rows holding only values have no bitmap.
 *
 * Returns 0 on success, -1 if the timestamp column was not set or the row
 * flags are in an unexpected state.
 */
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) {
  int32_t   nDataTP, nDataKV;
  STSKVRow *pTSKVRow = (STSKVRow *)pBuilder->pKVBuf;
  int32_t   nCols = pBuilder->pTSchema->numOfCols;

  // error not set ts
  if (!COL_IS_SET(pBuilder->pTSchema->columns->flags)) {
    return -1;
  }

  ASSERT(pTSKVRow->nCols < nCols);
  if (pTSKVRow->nCols < nCols - 1) {
    pBuilder->row.flags |= TSROW_HAS_NONE;
  }

  // Parentheses are required: `!=` binds tighter than `&`.
  ASSERT((pBuilder->row.flags & 0xf) != 0);
  *(ppRow) = &pBuilder->row;
  switch (pBuilder->row.flags & 0xf) {
    case TSROW_HAS_NONE:
    case TSROW_HAS_NULL:
      pBuilder->row.nData = 0;
      pBuilder->row.pData = NULL;
      return 0;
    case TSROW_HAS_NULL | TSROW_HAS_NONE:
      nDataTP = pBuilder->szBitMap1;
      break;
    case TSROW_HAS_VAL:
      nDataTP = pBuilder->pTSchema->flen + pBuilder->vlenTP;
      break;
    case TSROW_HAS_VAL | TSROW_HAS_NONE:
    case TSROW_HAS_VAL | TSROW_HAS_NULL:
      nDataTP = pBuilder->szBitMap1 + pBuilder->pTSchema->flen + pBuilder->vlenTP;
      break;
    case TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE:
      nDataTP = pBuilder->szBitMap2 + pBuilder->pTSchema->flen + pBuilder->vlenTP;
      break;
    default:
      ASSERT(0);
      // Do not continue with an indeterminate nDataTP when ASSERT is compiled out.
      return -1;
  }

  nDataKV = sizeof(STSKVRow) + sizeof(SKVIdx) * pTSKVRow->nCols + pBuilder->vlenKV;
  pBuilder->row.sver = pBuilder->pTSchema->version;
  if (nDataKV < nDataTP) {
    // generate KV row
    ASSERT((pBuilder->row.flags & 0xf) != TSROW_HAS_VAL);

    pBuilder->row.flags |= TSROW_KV_ROW;
    pBuilder->row.nData = nDataKV;
    pBuilder->row.pData = pBuilder->pKVBuf;

    qsort(pTSKVRow->idx, pTSKVRow->nCols, sizeof(SKVIdx), tSKVIdxCmprFn);
    if (pTSKVRow->nCols < nCols - 1) {
      // Values were staged behind a full-size index; close the gap left by
      // the unused index entries.
      memmove(&pTSKVRow->idx[pTSKVRow->nCols], &pTSKVRow->idx[nCols - 1], pBuilder->vlenKV);
    }
  } else {
    // generate TUPLE row
    uint8_t rowFlags = pBuilder->row.flags & 0xf;

    pBuilder->row.nData = nDataTP;

    if (rowFlags == TSROW_HAS_VAL) {
      pBuilder->row.pData = pBuilder->pTPBuf + pBuilder->szBitMap2;
    } else {
      // The OR-ed constant must be parenthesized: `==` binds tighter than `|`,
      // otherwise the condition is always true.
      if (rowFlags == (TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE)) {
        pBuilder->row.pData = pBuilder->pTPBuf;
      } else {
        pBuilder->row.pData = pBuilder->pTPBuf + pBuilder->szBitMap2 - pBuilder->szBitMap1;
      }

      setBitMap(pBuilder->row.pData, pBuilder->pTSchema, rowFlags);
    }
  }

  return 0;
}

H
Hongze Cheng 已提交
521 522
static int tTagValCmprFn(const void *p1, const void *p2) {
  if (((STagVal *)p1)->cid < ((STagVal *)p2)->cid) {
H
Hongze Cheng 已提交
523
    return -1;
H
Hongze Cheng 已提交
524
  } else if (((STagVal *)p1)->cid > ((STagVal *)p2)->cid) {
H
Hongze Cheng 已提交
525 526
    return 1;
  }
H
Hongze Cheng 已提交
527 528

  ASSERT(0);
H
Hongze Cheng 已提交
529 530
  return 0;
}
H
Hongze Cheng 已提交
531 532 533 534 535
static int tTagValJsonCmprFn(const void *p1, const void *p2) {
  return strcmp(((STagVal *)p1)[0].pKey, ((STagVal *)p2)[0].pKey);
}
/*
 * Encode one tag value as: key (C string when isJson, otherwise cid as I16v),
 * type (I8), then the value (length-prefixed binary for variable-length
 * types, raw bytes otherwise).
 * p may be NULL: nothing is copied for fixed-width values and NULL is
 * forwarded to the encoders; tTagNew uses this form to compute the size.
 * Returns the encoded length.
 */
static int32_t tPutTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
  int32_t off = 0;

  // key
  if (isJson) {
    off += tPutCStr(p ? p + off : NULL, pTagVal->pKey);
  } else {
    off += tPutI16v(p ? p + off : NULL, pTagVal->cid);
  }

  // type
  off += tPutI8(p ? p + off : NULL, pTagVal->type);

  // value
  if (IS_VAR_DATA_TYPE(pTagVal->type)) {
    off += tPutBinary(p ? p + off : NULL, pTagVal->pData, pTagVal->nData);
    return off;
  }

  ASSERT(pTagVal->nData == TYPE_BYTES[pTagVal->type]);
  if (p != NULL) {
    memcpy(p + off, pTagVal->pData, pTagVal->nData);
  }
  off += pTagVal->nData;

  return off;
}
/*
 * Decode one tag value written by tPutTagVal from p into *pTagVal.
 * For fixed-width types pData points into the buffer at p (no copy) and
 * nData is TYPE_BYTES[type].
 * Returns the number of bytes consumed.
 */
static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
  int32_t off = 0;

  // key
  if (isJson) {
    off += tGetCStr(p + off, &pTagVal->pKey);
  } else {
    off += tGetI16v(p + off, &pTagVal->cid);
  }

  // type
  off += tGetI8(p + off, &pTagVal->type);

  // value
  if (IS_VAR_DATA_TYPE(pTagVal->type)) {
    off += tGetBinary(p + off, &pTagVal->pData, &pTagVal->nData);
    return off;
  }

  pTagVal->pData = p + off;
  pTagVal->nData = TYPE_BYTES[pTagVal->type];
  off += pTagVal->nData;

  return off;
}
H
Hongze Cheng 已提交
582
/*
 * Build a serialized STag from the STagVal elements of pArray.
 *
 * pArray is sorted in place (by pKey when isJson, otherwise by cid). The
 * result is one allocation holding the header, the offset index and the
 * encoded values; it is stored in *ppTag and is released with tTagFree().
 *
 * Returns 0 on success or TSDB_CODE_OUT_OF_MEMORY when the allocation fails
 * (*ppTag is NULL in that case).
 */
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag) {
  int16_t nTag = taosArrayGetSize(pArray);
  int32_t szTag = sizeof(STag) + sizeof(int16_t) * nTag;

  // sort
  qsort(pArray->pData, nTag, sizeof(STagVal), isJson ? tTagValJsonCmprFn : tTagValCmprFn);

  // get size: a NULL destination makes tPutTagVal only report the length
  for (int16_t iTag = 0; iTag < nTag; iTag++) {
    szTag += tPutTagVal(NULL, (STagVal *)taosArrayGet(pArray, iTag), isJson);
  }

  // TODO
  // if (szTag >= 16 * 1024) {
  //   code = TSDB_CODE_IVLD_TAG;
  //   goto _err;
  // }

  // build tag
  STag *pTag = (STag *)taosMemoryMalloc(szTag);
  *ppTag = pTag;
  if (pTag == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pTag->isJson = isJson ? 1 : 0;
  pTag->len = szTag;
  pTag->nTag = nTag;
  pTag->ver = version;

  // Values start right after the index; idx[i] is relative to that point.
  uint8_t *pVal = (uint8_t *)&pTag->idx[nTag];
  int16_t  off = 0;
  for (int16_t iTag = 0; iTag < nTag; iTag++) {
    pTag->idx[iTag] = off;
    off += tPutTagVal(pVal + off, (STagVal *)taosArrayGet(pArray, iTag), isJson);
  }

  return 0;
}

/* Release a tag blob with taosMemoryFree; a NULL pTag is ignored. */
void tTagFree(STag *pTag) {
  if (pTag == NULL) {
    return;
  }
  taosMemoryFree(pTag);
}

H
Hongze Cheng 已提交
635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651
/*
 * Binary-search pTag for the value whose key matches pTagVal (pKey for JSON
 * tags, cid otherwise).
 * On a hit, type/nData/pData of *pTagVal are filled in; pData points into the
 * tag blob for fixed-width types. On a miss, type is TSDB_DATA_TYPE_NULL,
 * pData is NULL and nData is 0.
 */
void tTagGet(STag *pTag, STagVal *pTagVal) {
  uint8_t *pValues = (uint8_t *)&pTag->idx[pTag->nTag];
  int16_t  lo = 0;
  int16_t  hi = pTag->nTag - 1;

  // Default result: not found.
  pTagVal->type = TSDB_DATA_TYPE_NULL;
  pTagVal->pData = NULL;
  pTagVal->nData = 0;

  while (lo <= hi) {
    int16_t mid = (lo + hi) / 2;
    STagVal probe;
    int     cmp;

    tGetTagVal(pValues + pTag->idx[mid], &probe, pTag->isJson);
    cmp = pTag->isJson ? tTagValJsonCmprFn(pTagVal, &probe) : tTagValCmprFn(pTagVal, &probe);

    if (cmp == 0) {
      pTagVal->type = probe.type;
      pTagVal->nData = probe.nData;
      pTagVal->pData = probe.pData;
      return;
    }

    if (cmp < 0) {
      hi = mid - 1;
    } else {
      lo = mid + 1;
    }
  }
}

H
more  
Hongze Cheng 已提交
669 670
/* Encode the whole tag blob (pTag->len bytes starting at pTag) as one binary field. */
int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag) {
  const uint8_t *pRaw = (const uint8_t *)pTag;

  return tEncodeBinary(pEncoder, pRaw, pTag->len);
}

H
Hongze Cheng 已提交
673
/*
 * Decode a binary field into *ppTag. The length output of tDecodeBinary is
 * not requested (NULL is passed).
 */
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag) {
  uint8_t **ppRaw = (uint8_t **)ppTag;

  return tDecodeBinary(pDecoder, ppRaw, NULL);
}
H
Hongze Cheng 已提交
674

H
Hongze Cheng 已提交
675
/*
 * Expand pTag into a newly created SArray of STagVal stored in *ppArray.
 * Each element is produced by tGetTagVal, so fixed-width values point into
 * the tag blob rather than owning a copy.
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the array cannot be created.
 */
int32_t tTagToValArray(STag *pTag, SArray **ppArray) {
  uint8_t *pValues = (uint8_t *)&pTag->idx[pTag->nTag];
  STagVal  tv;

  *ppArray = taosArrayInit(pTag->nTag, sizeof(STagVal));
  if (*ppArray == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int16_t iTag = 0; iTag < pTag->nTag; iTag++) {
    tGetTagVal(pValues + pTag->idx[iTag], &tv, pTag->isJson);
    taosArrayPush(*ppArray, &tv);
  }

  return 0;
}

H
Hongze Cheng 已提交
697
#if 1  // ===================================================================================================================
698
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
H
Hongze Cheng 已提交
699
/*
 * Make sure pCol's buffer can hold maxPoints values and (re)derive the
 * section pointers inside it.
 *
 * Space is bytes * maxPoints, plus one VarDataOffsetT per point for
 * variable-length types, plus the bitmap when TD_SUPPORT_BITMAP is defined.
 * The buffer is only reallocated when it is currently smaller than that.
 *
 * Returns 0 on success, -1 if the reallocation fails (pCol is left unchanged).
 */
int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
  const int isVarType = IS_VAR_DATA_TYPE(pCol->type);
  int       spaceNeeded = pCol->bytes * maxPoints;

  if (isVarType) {
    spaceNeeded += sizeof(VarDataOffsetT) * maxPoints;
  }
#ifdef TD_SUPPORT_BITMAP
  // The bitmap is appended as a separate part, so no extra TYPE_BYTES padding
  // is reserved for it.
  int32_t nBitmapBytes = (int32_t)TD_BITMAP_BYTES(maxPoints);
  spaceNeeded += (int)nBitmapBytes;
#endif

  if (pCol->spaceSize < spaceNeeded) {
    void *pNew = taosMemoryRealloc(pCol->pData, spaceNeeded);
    if (pNew == NULL) {
      uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)spaceNeeded, strerror(errno));
      return -1;
    }
    pCol->pData = pNew;
    pCol->spaceSize = spaceNeeded;
  }

#ifdef TD_SUPPORT_BITMAP
  // Layout: [values][bitmap][offsets (variable-length types only)]
  pCol->pBitmap = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints);
  if (isVarType) {
    pCol->dataOff = POINTER_SHIFT(pCol->pBitmap, nBitmapBytes);
  }
#else
  // Layout: [values][offsets (variable-length types only)]
  if (isVarType) {
    pCol->dataOff = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints);
  }
#endif

  return 0;
}

H
hzcheng 已提交
739 740 741
/**
 * Duplicate the schema and return a new object
 */
H
Hongze Cheng 已提交
742
STSchema *tdDupSchema(const STSchema *pSchema) {
S
Shengliang Guan 已提交
743
  int       tlen = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
wafwerar's avatar
wafwerar 已提交
744
  STSchema *tSchema = (STSchema *)taosMemoryMalloc(tlen);
H
hzcheng 已提交
745 746
  if (tSchema == NULL) return NULL;

H
Hongze Cheng 已提交
747
  memcpy((void *)tSchema, (void *)pSchema, tlen);
H
hzcheng 已提交
748 749 750 751

  return tSchema;
}

H
TD-27  
hzcheng 已提交
752 753 754
/**
 * Encode a schema through *buf and return the accumulated encoded length (not a pointer).
 */
H
TD-353  
Hongze Cheng 已提交
755 756 757 758
/*
 * Encode pSchema through *buf: version (I32), column count (I32), then for
 * every column its type (I8), flags (I8), column id (I16) and byte width (I32).
 *
 * The byte width is written as I32 because tdDecodeSchema reads it back with
 * taosDecodeFixedI32; writing it as I16 desynchronises the decoder after the
 * first column.
 *
 * Returns the sum of the lengths reported by the encoders.
 */
int tdEncodeSchema(void **buf, STSchema *pSchema) {
  int tlen = 0;
  tlen += taosEncodeFixedI32(buf, schemaVersion(pSchema));
  tlen += taosEncodeFixedI32(buf, schemaNCols(pSchema));

  for (int i = 0; i < schemaNCols(pSchema); i++) {
    STColumn *pCol = schemaColAt(pSchema, i);
    tlen += taosEncodeFixedI8(buf, colType(pCol));
    tlen += taosEncodeFixedI8(buf, colFlags(pCol));
    tlen += taosEncodeFixedI16(buf, colColId(pCol));
    tlen += taosEncodeFixedI32(buf, colBytes(pCol));
  }

  return tlen;
}

/**
 * Decode a schema from a binary.
 */
H
TD-353  
Hongze Cheng 已提交
774
/*
 * Decode a schema from buf: version and column count (both I32), then per
 * column type (I8), flags (I8), column id (I16) and byte width (I32). The
 * columns are fed into a temporary STSchemaBuilder and the resulting schema
 * is stored in *pRSchema.
 *
 * Returns the position just past the decoded data, or NULL when the builder
 * cannot be initialised or a column is rejected (*pRSchema is not written in
 * that case).
 */
void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
  int             version = 0;
  int             numOfCols = 0;
  STSchemaBuilder schemaBuilder;
  void           *pCur = buf;

  pCur = taosDecodeFixedI32(pCur, &version);
  pCur = taosDecodeFixedI32(pCur, &numOfCols);

  if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) {
    return NULL;
  }

  for (int iCol = 0; iCol < numOfCols; ++iCol) {
    col_type_t  type = 0;
    int8_t      flags = 0;
    col_id_t    colId = 0;
    col_bytes_t bytes = 0;

    pCur = taosDecodeFixedI8(pCur, &type);
    pCur = taosDecodeFixedI8(pCur, &flags);
    pCur = taosDecodeFixedI16(pCur, &colId);
    pCur = taosDecodeFixedI32(pCur, &bytes);

    if (tdAddColToSchema(&schemaBuilder, type, flags, colId, bytes) < 0) {
      tdDestroyTSchemaBuilder(&schemaBuilder);
      return NULL;
    }
  }

  *pRSchema = tdGetSchemaFromBuilder(&schemaBuilder);
  tdDestroyTSchemaBuilder(&schemaBuilder);

  return pCur;
}

C
Cary Xu 已提交
804
/**
 * Prepare a schema builder: allocate room for 256 columns and reset its
 * counters for the given schema version.
 *
 * @return 0 on success, -1 if pBuilder is NULL or the allocation fails
 */
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) {
  if (pBuilder != NULL) {
    pBuilder->tCols = 256;
    pBuilder->columns = (STColumn *)taosMemoryMalloc(sizeof(STColumn) * pBuilder->tCols);
    if (pBuilder->columns != NULL) {
      tdResetTSchemaBuilder(pBuilder, version);
      return 0;
    }
  }

  return -1;
}

/**
 * Release the column array owned by a schema builder. A NULL builder is a
 * no-op; the builder structure itself is not freed.
 */
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder == NULL) {
    return;
  }

  taosMemoryFreeClear(pBuilder->columns);
}

C
Cary Xu 已提交
821
/**
 * Reset a schema builder so it can be reused for a new schema: the column
 * count and the accumulated tlen/flen/vlen go back to zero and the version is
 * replaced. The column array and its capacity are kept.
 */
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) {
  pBuilder->version = version;
  pBuilder->nCols = 0;

  // Accumulated length counters.
  pBuilder->tlen = 0;
  pBuilder->flen = 0;
  pBuilder->vlen = 0;
}

C
Cary Xu 已提交
829
/**
 * Append one column to a schema builder.
 *
 * The column array grows by doubling when it is full. The new column's offset
 * is the previous column's offset plus TYPE_BYTES of the previous column's
 * type (0 for the first column).
 *
 * @param pBuilder builder to append to
 * @param type     data type; rejected unless isValidDataType() accepts it
 * @param flags    column flags, stored as given
 * @param colId    column id, stored as given
 * @param bytes    column size; only used for variable-length types, fixed
 *                 types always take TYPE_BYTES[type]
 * @return 0 on success, -1 on an invalid type or allocation failure. On
 *         failure the builder is left unchanged.
 */
int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t flags, col_id_t colId, col_bytes_t bytes) {
  if (!isValidDataType(type)) return -1;

  if (pBuilder->nCols >= pBuilder->tCols) {
    // Commit the doubled capacity only after the reallocation succeeded;
    // otherwise tCols would claim more room than the array really has and a
    // later call could write past its end.
    int       newCap = pBuilder->tCols * 2;
    STColumn *columns = (STColumn *)taosMemoryRealloc(pBuilder->columns, sizeof(STColumn) * newCap);
    if (columns == NULL) return -1;
    pBuilder->columns = columns;
    pBuilder->tCols = newCap;
  }

  STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
  colSetType(pCol, type);
  colSetColId(pCol, colId);
  colSetFlags(pCol, flags);
  if (pBuilder->nCols == 0) {
    colSetOffset(pCol, 0);
  } else {
    STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols - 1]);
    colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
  }

  if (IS_VAR_DATA_TYPE(type)) {
    // Variable-length column: the caller-supplied size is recorded, and the
    // length header is excluded from vlen.
    colSetBytes(pCol, bytes);
    pBuilder->tlen += (TYPE_BYTES[type] + bytes);
    pBuilder->vlen += bytes - sizeof(VarDataLenT);
  } else {
    colSetBytes(pCol, TYPE_BYTES[type]);
    pBuilder->tlen += TYPE_BYTES[type];
    pBuilder->vlen += TYPE_BYTES[type];
  }

  pBuilder->nCols++;
  pBuilder->flen += TYPE_BYTES[type];

  ASSERT(pCol->offset < pBuilder->flen);

  return 0;
}

/**
 * Materialize the columns collected in a builder as a newly allocated schema.
 *
 * @return the new schema (allocated with taosMemoryMalloc), or NULL when the
 *         builder holds no columns or the allocation fails
 */
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder->nCols <= 0) {
    return NULL;
  }

  // Schema header followed directly by the column array.
  size_t colsSize = sizeof(STColumn) * pBuilder->nCols;
  int    total = sizeof(STSchema) + colsSize;

  STSchema *pResult = (STSchema *)taosMemoryMalloc(total);
  if (pResult == NULL) {
    return NULL;
  }

  schemaVersion(pResult) = pBuilder->version;
  schemaNCols(pResult) = pBuilder->nCols;
  schemaTLen(pResult) = pBuilder->tlen;
  schemaFLen(pResult) = pBuilder->flen;
  schemaVLen(pResult) = pBuilder->vlen;

#ifdef TD_SUPPORT_BITMAP
  // With bitmap support the total length also includes the bitmap bytes.
  schemaTLen(pResult) += (int)TD_BITMAP_BYTES(schemaNCols(pResult));
#endif

  memcpy(schemaColAt(pResult, 0), pBuilder->columns, colsSize);

  return pResult;
}

891
/**
 * Initialize a data column from its schema column: copies type, column id,
 * bytes and offset, and sets the current length to 0.
 *
 * maxPoints is not used by this function.
 */
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) {
  pDataCol->len = 0;

  pDataCol->colId = colColId(pCol);
  pDataCol->type = colType(pCol);
  pDataCol->bytes = colBytes(pCol);
  pDataCol->offset = colOffset(pCol);  // no row-head size is added
}
C
Cary Xu 已提交
899

L
Liu Jicong 已提交
900 901 902 903 904 905
/**
 * Locate the value of a row inside a data column, without any bounds check.
 *
 * Fixed-size types are addressed as row * TYPE_BYTES[type]; variable-length
 * types go through the per-row offset table dataOff.
 */
static FORCE_INLINE const void *tdGetColDataOfRowUnsafe(SDataCol *pCol, int row) {
  if (!IS_VAR_DATA_TYPE(pCol->type)) {
    return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
  }

  return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
}

H
TD-166  
hzcheng 已提交
908
/**
 * Tell whether the first nEle values of a column are all NULL.
 *
 * Returns true immediately when isAllRowsNull() already says so; otherwise
 * scans the rows and stops at the first non-NULL value.
 */
bool isNEleNull(SDataCol *pCol, int nEle) {
  if (isAllRowsNull(pCol)) {
    return true;
  }

  int row = 0;
  while (row < nEle && isNull(tdGetColDataOfRowUnsafe(pCol, row), pCol->type)) {
    ++row;
  }

  // Every inspected row was NULL iff the scan ran to completion.
  return row >= nEle;
}

C
Cary Xu 已提交
916
/**
 * Rebuild the offset table of a BINARY/NCHAR column by walking nEle
 * consecutive variable-length values stored in pData.
 *
 * @return a pointer derived from the walk position after nEle values (see the
 *         note at the return statement)
 */
void *dataColSetOffset(SDataCol *pCol, int nEle) {
  ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));

  void          *cursor = pCol->pData;
  VarDataOffsetT running = 0;
  int            k = 0;

  while (k < nEle) {
    pCol->dataOff[k] = running;
    running += varDataTLen(cursor);
    cursor = POINTER_SHIFT(cursor, varDataTLen(cursor));
    ++k;
  }

  // NOTE(review): at this point cursor already sits just past the nEle-th
  // value, yet one more length is read there and skipped -- confirm with the
  // callers that this extra shift is intended.
  return POINTER_SHIFT(cursor, varDataTLen(cursor));
}

L
Liu Jicong 已提交
931
SDataCols *tdNewDataCols(int maxCols, int maxRows) {
wafwerar's avatar
wafwerar 已提交
932
  SDataCols *pCols = (SDataCols *)taosMemoryCalloc(1, sizeof(SDataCols));
H
Haojun Liao 已提交
933
  if (pCols == NULL) {
S
Shengliang Guan 已提交
934
    uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno));
H
Haojun Liao 已提交
935 936
    return NULL;
  }
H
TD-34  
hzcheng 已提交
937

H
Hongze Cheng 已提交
938
  pCols->maxPoints = maxRows;
L
Liu Jicong 已提交
939 940 941
  pCols->maxCols = maxCols;
  pCols->numOfRows = 0;
  pCols->numOfCols = 0;
942
  pCols->bitmapMode = TSDB_BITMODE_DEFAULT;
H
Hongze Cheng 已提交
943 944

  if (maxCols > 0) {
wafwerar's avatar
wafwerar 已提交
945
    pCols->cols = (SDataCol *)taosMemoryCalloc(maxCols, sizeof(SDataCol));
H
Hongze Cheng 已提交
946 947 948 949 950 951
    if (pCols->cols == NULL) {
      uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols,
             strerror(errno));
      tdFreeDataCols(pCols);
      return NULL;
    }
952
#if 0  // no need as calloc used
L
Liu Jicong 已提交
953
    int i;
S
Shengliang Guan 已提交
954
    for (i = 0; i < maxCols; i++) {
L
Liu Jicong 已提交
955
      pCols->cols[i].spaceSize = 0;
L
Liu Jicong 已提交
956
      pCols->cols[i].len = 0;
L
Liu Jicong 已提交
957 958 959
      pCols->cols[i].pData = NULL;
      pCols->cols[i].dataOff = NULL;
    }
960
#endif
H
Hongze Cheng 已提交
961 962
  }

H
TD-34  
hzcheng 已提交
963 964 965
  return pCols;
}

H
Hongze Cheng 已提交
966
/**
 * (Re)initialize a column container for a schema.
 *
 * Grows the slot array when the schema has more columns than the container
 * can hold, then resets the row count and bitmap mode and initializes one
 * slot per schema column with dataColInit().
 *
 * @return 0 on success, -1 if the slot array could not be grown; in that case
 *         the container is left exactly as it was
 */
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
  int i;
  int oldMaxCols = pCols->maxCols;
  int nSchemaCols = schemaNCols(pSchema);

  if (nSchemaCols > oldMaxCols) {
    SDataCol *pNewCols = (SDataCol *)taosMemoryRealloc(pCols->cols, sizeof(SDataCol) * nSchemaCols);
    if (pNewCols == NULL) return -1;

    // Raise maxCols only now that the array really has that many slots;
    // tdFreeDataCols() walks maxCols entries and must not run past the end
    // of the old, smaller array after a failed reallocation.
    pCols->cols = pNewCols;
    pCols->maxCols = nSchemaCols;

    // The freshly added slots own no buffers yet.
    for (i = oldMaxCols; i < pCols->maxCols; ++i) {
      pCols->cols[i].pData = NULL;
      pCols->cols[i].dataOff = NULL;
      pCols->cols[i].pBitmap = NULL;
      pCols->cols[i].spaceSize = 0;
    }
  }

  // tdResetDataCols() is not called here: dataColInit() below already sets
  // each used column's len back to 0.
  pCols->numOfRows = 0;
  pCols->bitmapMode = TSDB_BITMODE_DEFAULT;
  pCols->numOfCols = nSchemaCols;

  for (i = 0; i < nSchemaCols; ++i) {
    dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints);
  }

  return 0;
}

H
Hongze Cheng 已提交
996
/**
 * Free a column container: the pData buffer of each of its maxCols slots,
 * the slot array, and the container itself. A NULL argument is a no-op.
 *
 * @return always NULL, so callers can write `p = tdFreeDataCols(p)`
 */
SDataCols *tdFreeDataCols(SDataCols *pCols) {
  if (pCols == NULL) {
    return NULL;
  }

  if (pCols->cols) {
    int nSlots = pCols->maxCols;
    int idx = 0;
    while (idx < nSlots) {
      taosMemoryFreeClear(pCols->cols[idx].pData);
      ++idx;
    }

    taosMemoryFree(pCols->cols);
    pCols->cols = NULL;
  }

  taosMemoryFree(pCols);
  return NULL;
}

/**
 * Reset a column container for reuse: the row count and bitmap mode are set
 * to 0 and dataColReset() is applied to each of the maxCols slots.
 * A NULL argument is a no-op.
 */
void tdResetDataCols(SDataCols *pCols) {
  if (pCols == NULL) {
    return;
  }

  pCols->numOfRows = 0;
  pCols->bitmapMode = 0;

  int idx = 0;
  while (idx < pCols->maxCols) {
    dataColReset(&pCols->cols[idx]);
    ++idx;
  }
}
H
Hongze Cheng 已提交
1022

H
Hongze Cheng 已提交
1023
/**
 * Duplicate a KV row into a newly allocated buffer of kvRowLen(row) bytes.
 *
 * @return the copy, or NULL if the allocation fails
 */
SKVRow tdKVRowDup(SKVRow row) {
  SKVRow copy = taosMemoryMalloc(kvRowLen(row));

  if (copy != NULL) {
    kvRowCpy(copy, row);
  }

  return copy;
}

S
Shengliang Guan 已提交
1031 1032 1033
/**
 * qsort() comparator ordering SColIdx entries by ascending colId.
 *
 * @return 1, -1 or 0 when a's colId is greater than, less than or equal to b's
 */
static int compareColIdx(const void *a, const void *b) {
  const SColIdx *lhs = (const SColIdx *)a;
  const SColIdx *rhs = (const SColIdx *)b;

  // Each comparison yields 0 or 1, so the difference is exactly -1, 0 or 1.
  return (lhs->colId > rhs->colId) - (lhs->colId < rhs->colId);
}

S
Shengliang Guan 已提交
1043
/**
 * Sort the column index entries of a KV row by ascending column id.
 * Only the index entries are reordered.
 */
void tdSortKVRowByColIdx(SKVRow row) {
  qsort(kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), compareColIdx);
}
B
Bomin Zhang 已提交
1044

H
TD-90  
Hongze Cheng 已提交
1045 1046 1047 1048
/**
 * Set the value of column colId in a KV row, adding the column if the row
 * does not have it yet.
 *
 * Three outcomes are possible:
 *  - the column is absent: a larger row is allocated, the new index entry and
 *    value are appended and the index is re-sorted;
 *  - the column exists and the new value has the same size (always the case
 *    for fixed-size types): the value is overwritten in place;
 *  - the column exists with a variable-length value of a different size: a
 *    resized row is allocated and the offsets of the values stored after it
 *    are adjusted.
 * Whenever a new row is allocated, *orow is replaced and the old row is freed.
 *
 * @param orow  in/out: the row to modify
 * @param colId id of the column to set
 * @param type  data type of the value
 * @param value the value to store
 * @return 0 on success, -1 on allocation failure (*orow is then unchanged)
 */
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
  SKVRow   row = *orow;
  void    *ptr = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE);
  SColIdx *pFound = (SColIdx *)ptr;

  if (pFound == NULL || pFound->colId > colId) {  // need to add a column value to the row
    int diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
    int nRowLen = kvRowLen(row) + sizeof(SColIdx) + diff;
    int oRowCols = kvRowNCols(row);

    ASSERT(diff > 0);
    SKVRow grown = taosMemoryMalloc(nRowLen);
    if (grown == NULL) return -1;

    // Length and column count are written first: the accessors used below
    // are applied to the new row.
    kvRowSetLen(grown, nRowLen);
    kvRowSetNCols(grown, oRowCols + 1);

    memcpy(kvRowColIdx(grown), kvRowColIdx(row), sizeof(SColIdx) * oRowCols);
    memcpy(kvRowValues(grown), kvRowValues(row), kvRowValLen(row));

    // The new value goes right after the existing values.
    SColIdx *pNewIdx = kvRowColIdxAt(grown, oRowCols);
    pNewIdx->colId = colId;
    pNewIdx->offset = kvRowValLen(row);

    memcpy(kvRowColVal(grown, pNewIdx), value, diff);  // copy new value

    tdSortKVRowByColIdx(grown);

    *orow = grown;
    taosMemoryFree(row);
    return 0;
  }

  ASSERT(((SColIdx *)ptr)->colId == colId);

  if (!IS_VAR_DATA_TYPE(type)) {
    // Fixed-size value: overwrite in place.
    memcpy(kvRowColVal(row, pFound), value, TYPE_BYTES[type]);
    return 0;
  }

  void *pOldVal = kvRowColVal(row, pFound);

  if (varDataTLen(value) == varDataTLen(pOldVal)) {  // just update the column value in place
    memcpy(pOldVal, value, varDataTLen(value));
    return 0;
  }

  // Different size: need to reallocate the memory.
  // NOTE(review): nlen is 16-bit here while the insert path above computes
  // its row length as int -- confirm row lengths always fit in int16_t.
  int16_t nlen = kvRowLen(row) + (varDataTLen(value) - varDataTLen(pOldVal));
  ASSERT(nlen > 0);
  SKVRow resized = taosMemoryMalloc(nlen);
  if (resized == NULL) return -1;

  kvRowSetLen(resized, nlen);
  kvRowSetNCols(resized, kvRowNCols(row));

  // zsize: the whole index plus the values stored in front of the old value.
  int zsize = sizeof(SColIdx) * kvRowNCols(row) + pFound->offset;
  memcpy(kvRowColIdx(resized), kvRowColIdx(row), zsize);
  memcpy(kvRowColVal(resized, pFound), value, varDataTLen(value));

  // Copy the values that follow the replaced one, if any.
  int lsize = kvRowLen(row) - TD_KV_ROW_HEAD_SIZE - zsize - varDataTLen(pOldVal);
  if (lsize > 0) {
    memcpy(POINTER_SHIFT(resized, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(value)),
           POINTER_SHIFT(row, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(pOldVal)), lsize);
  }

  // Shift the offsets of the values located after the replaced one.
  // pFound and pOldVal still refer to the old row, which is freed only below.
  for (int i = 0; i < kvRowNCols(resized); i++) {
    SColIdx *pIdx = kvRowColIdxAt(resized, i);

    if (pIdx->offset > pFound->offset) {
      pIdx->offset = pIdx->offset - varDataTLen(pOldVal) + varDataTLen(value);
    }
  }

  *orow = resized;
  taosMemoryFree(row);
  return 0;
}

H
TD-353  
Hongze Cheng 已提交
1121
/**
 * Encode a KV row by copying it verbatim into *buf and advancing *buf past it.
 * With buf == NULL nothing is written and only the size is reported.
 *
 * @return kvRowLen(row)
 */
int tdEncodeKVRow(void **buf, SKVRow row) {
  // May change the encode purpose
  if (buf == NULL) {
    return kvRowLen(row);
  }

  kvRowCpy(*buf, row);
  *buf = POINTER_SHIFT(*buf, kvRowLen(row));

  return kvRowLen(row);
}

H
Hongze Cheng 已提交
1131 1132
/**
 * Decode a KV row from buf by duplicating it with tdKVRowDup().
 *
 * @param buf start of the encoded row
 * @param row receives the duplicated row (NULL when the duplication fails)
 * @return the position just past the row in buf, or NULL on failure
 */
void *tdDecodeKVRow(void *buf, SKVRow *row) {
  SKVRow copy = tdKVRowDup(buf);
  *row = copy;

  if (copy == NULL) {
    return NULL;
  }

  return POINTER_SHIFT(buf, kvRowLen(copy));
}

H
Hongze Cheng 已提交
1137
/**
 * Initialize a KV row builder with room for 128 column index entries and a
 * 1024-byte value buffer.
 *
 * On failure both pColIdx and buf are left NULL, so calling
 * tdDestroyKVRowBuilder() on the builder afterwards is harmless.
 *
 * @return 0 on success, -1 if either allocation fails
 */
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) {
  pBuilder->tCols = 128;
  pBuilder->nCols = 0;
  pBuilder->alloc = 1024;
  pBuilder->size = 0;
  // Never leave buf uninitialized: a failure path followed by a destroy call
  // would otherwise free an indeterminate pointer.
  pBuilder->buf = NULL;

  pBuilder->pColIdx = (SColIdx *)taosMemoryMalloc(sizeof(SColIdx) * pBuilder->tCols);
  if (pBuilder->pColIdx == NULL) return -1;

  pBuilder->buf = taosMemoryMalloc(pBuilder->alloc);
  if (pBuilder->buf == NULL) {
    // Free and clear, so that a later destroy does not free pColIdx twice.
    taosMemoryFreeClear(pBuilder->pColIdx);
    return -1;
  }
  return 0;
}

H
Hongze Cheng 已提交
1152
/**
 * Release the two buffers owned by a KV row builder (value buffer and column
 * index array). The builder structure itself is not freed.
 */
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) {
  taosMemoryFreeClear(pBuilder->buf);
  taosMemoryFreeClear(pBuilder->pColIdx);
}

H
Hongze Cheng 已提交
1157
/**
 * Empty a KV row builder for reuse: the used size and the column count go
 * back to zero, the allocated buffers are kept.
 */
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
  pBuilder->size = 0;
  pBuilder->nCols = 0;
}

H
Hongze Cheng 已提交
1162
/**
 * Build a newly allocated KV row from the contents of a builder.
 *
 * The row consists of the KV row head, the column index entries and the value
 * bytes. A builder without columns still yields a row (nCols == 0 means no
 * tags); only the head is filled in that case.
 *
 * @return the new row, or NULL if the allocation fails
 */
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
  int rowLen = TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;

  SKVRow result = taosMemoryMalloc(rowLen);
  if (result == NULL) {
    return NULL;
  }

  // The column count is set before the value area is addressed below.
  kvRowSetNCols(result, pBuilder->nCols);
  kvRowSetLen(result, rowLen);

  if (pBuilder->nCols <= 0) {
    return result;
  }

  memcpy(kvRowColIdx(result), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
  memcpy(kvRowValues(result), pBuilder->buf, pBuilder->size);

  return result;
}
H
Hongze Cheng 已提交
1181
#endif