tdataformat.c 27.9 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
common  
Shengliang Guan 已提交
15 16

#define _DEFAULT_SOURCE
S
slguan 已提交
17
#include "tdataformat.h"
S
Shengliang Guan 已提交
18
#include "tcoding.h"
L
Liu Jicong 已提交
19
#include "tdatablock.h"
S
log  
Shengliang Guan 已提交
20
#include "tlog.h"
H
more  
hzcheng 已提交
21

H
Hongze Cheng 已提交
22
typedef struct SKVIdx {
H
Hongze Cheng 已提交
23 24
  int32_t cid;
  int32_t offset;
H
Hongze Cheng 已提交
25
} SKVIdx;
H
Hongze Cheng 已提交
26

H
Hongze Cheng 已提交
27 28 29 30 31 32 33
#pragma pack(push, 1)
typedef struct {
  int16_t nCols;
  SKVIdx  idx[];
} STSKVRow;
#pragma pack(pop)

H
Hongze Cheng 已提交
34
#define TSROW_IS_KV_ROW(r) ((r)->flags & TSROW_KV_ROW)
H
Hongze Cheng 已提交
35 36
#define SET_BIT1(p, i, v)  ((p)[(i) / 8] = (p)[(i) / 8] & (~(((uint8_t)1) << ((i) % 8))) | ((v) << ((i) % 8)))
#define SET_BIT2(p, i, v)  ((p)[(i) / 4] = (p)[(i) / 4] & (~(((uint8_t)3) << ((i) % 4))) | ((v) << ((i) % 4)))
H
Hongze Cheng 已提交
37 38
#define GET_BIT1(p, i)     (((p)[(i) / 8] >> ((i) % 8)) & ((uint8_t)1))
#define GET_BIT2(p, i)     (((p)[(i) / 4] >> ((i) % 4)) & ((uint8_t)3))
H
Hongze Cheng 已提交
39

H
Hongze Cheng 已提交
40
// STSRow2
H
Hongze Cheng 已提交
41 42
int32_t tEncodeTSRow(SEncoder *pEncoder, const STSRow2 *pRow) {
  if (tEncodeI64(pEncoder, pRow->ts) < 0) return -1;
H
Hongze Cheng 已提交
43
  if (tEncodeU8(pEncoder, pRow->flags) < 0) return -1;
H
Hongze Cheng 已提交
44
  if (tEncodeI32v(pEncoder, pRow->sver) < 0) return -1;
H
Hongze Cheng 已提交
45 46 47 48 49 50 51 52 53 54 55 56 57

  ASSERT(pRow->flags & 0xf != 0);

  switch (pRow->flags & 0xf) {
    case TSROW_HAS_NONE:
    case TSROW_HAS_NULL:
      return 0;
    case TSROW_HAS_VAL:
      ASSERT(!TSROW_IS_KV_ROW(pRow));
    default:
      ASSERT(pRow->nData && pRow->pData);
      if (tEncodeBinary(pEncoder, pRow->pData, pRow->nData)) return -1;
      break;
H
Hongze Cheng 已提交
58
  }
H
Hongze Cheng 已提交
59

H
Hongze Cheng 已提交
60 61 62 63 64
  return 0;
}

int32_t tDecodeTSRow(SDecoder *pDecoder, STSRow2 *pRow) {
  if (tDecodeI64(pDecoder, &pRow->ts) < 0) return -1;
H
Hongze Cheng 已提交
65
  if (tDecodeU8(pDecoder, &pRow->flags) < 0) return -1;
H
Hongze Cheng 已提交
66
  if (tDecodeI32v(pDecoder, &pRow->sver) < 0) return -1;
H
Hongze Cheng 已提交
67 68 69 70 71 72 73 74 75 76 77 78 79 80

  ASSERT(pRow->flags & 0xf != 0);

  switch (pRow->flags & 0xf) {
    case TSROW_HAS_NONE:
    case TSROW_HAS_NULL:
      pRow->nData = 0;
      pRow->pData = NULL;
      return 0;
    case TSROW_HAS_VAL:
      ASSERT(!TSROW_IS_KV_ROW(pRow));
    default:
      if (tDecodeBinary(pDecoder, &pRow->pData, &pRow->nData)) return -1;
      break;
H
Hongze Cheng 已提交
81
  }
H
Hongze Cheng 已提交
82

H
Hongze Cheng 已提交
83 84 85
  return 0;
}

H
Hongze Cheng 已提交
86 87 88
static FORCE_INLINE int kvRowCmprFn(const void *p1, const void *p2) {
  col_id_t cid = *(col_id_t *)p1;
  SKVIdx  *pKVIdx = (SKVIdx *)p2;
H
Hongze Cheng 已提交
89

H
Hongze Cheng 已提交
90 91 92 93 94 95 96 97 98
  if (cid < pKVIdx->cid) {
    return -1;
  } else if (cid > pKVIdx->cid) {
    return 1;
  }
  return 0;
}

int32_t tTSRowGet(const STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
H
Hongze Cheng 已提交
99 100 101 102 103 104 105
  uint32_t       n;
  const uint8_t *p;
  uint8_t        v;
  int32_t        bidx = iCol - 1;
  STColumn      *pTColumn = &pTSchema->columns[iCol];
  STSKVRow      *pTSKVRow;
  SKVIdx        *pKVIdx;
H
Hongze Cheng 已提交
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134

  ASSERT(pTColumn->colId != 0);

  ASSERT(pRow->flags & 0xf != 0);
  switch (pRow->flags & 0xf) {
    case TSROW_HAS_NONE:
      COL_VAL_SET_NONE(pColVal);
      return 0;
    case TSROW_HAS_NULL:
      COL_VAL_SET_NULL(pColVal);
      return 0;
  }

  if (TSROW_IS_KV_ROW(pRow)) {
    ASSERT((pRow->flags & 0xf) != TSROW_HAS_VAL);

    pTSKVRow = (STSKVRow *)pRow->pData;
    pKVIdx = bsearch(&pTColumn->colId, pTSKVRow->idx, pTSKVRow->nCols, sizeof(SKVIdx), kvRowCmprFn);
    if (pKVIdx == NULL) {
      COL_VAL_SET_NONE(pColVal);
    } else if (pKVIdx->offset < 0) {
      COL_VAL_SET_NULL(pColVal);
    } else {
      p = pRow->pData + sizeof(STSKVRow) + sizeof(SKVIdx) * pTSKVRow->nCols + pKVIdx->offset;
      tGetBinary(p, &p, &n);
      COL_VAL_SET_VAL(pColVal, p, n);
    }
  } else {
    // get bitmap
H
Hongze Cheng 已提交
135
    p = pRow->pData;
H
Hongze Cheng 已提交
136
    switch (pRow->flags & 0xf) {
H
Hongze Cheng 已提交
137
      case TSROW_HAS_NULL | TSROW_HAS_NONE:
H
Hongze Cheng 已提交
138 139 140
        v = GET_BIT1(p, bidx);
        if (v == 0) {
          COL_VAL_SET_NONE(pColVal);
H
Hongze Cheng 已提交
141
        } else {
H
Hongze Cheng 已提交
142
          COL_VAL_SET_NULL(pColVal);
H
Hongze Cheng 已提交
143
        }
H
Hongze Cheng 已提交
144
        return 0;
H
Hongze Cheng 已提交
145
      case TSROW_HAS_VAL | TSROW_HAS_NONE:
H
Hongze Cheng 已提交
146 147 148 149 150 151 152 153
        v = GET_BIT1(p, bidx);
        if (v == 1) {
          p = p + (pTSchema->numOfCols - 2) / 8 + 1;
          break;
        } else {
          COL_VAL_SET_NONE(pColVal);
          return 0;
        }
H
Hongze Cheng 已提交
154
      case TSROW_HAS_VAL | TSROW_HAS_NULL:
H
Hongze Cheng 已提交
155 156 157 158
        v = GET_BIT1(p, bidx);
        if (v == 1) {
          p = p + (pTSchema->numOfCols - 2) / 8 + 1;
          break;
H
Hongze Cheng 已提交
159
        } else {
H
Hongze Cheng 已提交
160 161
          COL_VAL_SET_NULL(pColVal);
          return 0;
H
Hongze Cheng 已提交
162
        }
H
Hongze Cheng 已提交
163
      case TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE:
H
Hongze Cheng 已提交
164 165 166 167 168 169 170 171 172 173 174 175 176
        v = GET_BIT2(p, bidx);
        if (v == 0) {
          COL_VAL_SET_NONE(pColVal);
          return 0;
        } else if (v == 1) {
          COL_VAL_SET_NULL(pColVal);
          return 0;
        } else if (v == 2) {
          p = p + (pTSchema->numOfCols - 2) / 4 + 1;
          break;
        } else {
          ASSERT(0);
        }
H
Hongze Cheng 已提交
177
      default:
H
Hongze Cheng 已提交
178
        break;
H
Hongze Cheng 已提交
179
    }
H
Hongze Cheng 已提交
180 181 182 183

    // get real value
    p = p + pTColumn->offset;
    if (IS_VAR_DATA_TYPE(pTColumn->type)) {
H
Hongze Cheng 已提交
184
      tGetBinary(p + pTSchema->flen + *(int32_t *)p, &p, &n);
H
Hongze Cheng 已提交
185 186 187 188
    } else {
      n = pTColumn->bytes;
    }
    COL_VAL_SET_VAL(pColVal, p, n);
H
Hongze Cheng 已提交
189
  }
H
Hongze Cheng 已提交
190

H
Hongze Cheng 已提交
191 192 193 194
  return 0;
}

// STSchema
H
Hongze Cheng 已提交
195 196 197 198 199 200 201
int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t ncols, STSchema **ppTSchema) {
  *ppTSchema = (STSchema *)taosMemoryMalloc(sizeof(STSchema) + sizeof(STColumn) * ncols);
  if (*ppTSchema == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

H
Hongze Cheng 已提交
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
  (*ppTSchema)->numOfCols = ncols;
  (*ppTSchema)->version = sver;
  (*ppTSchema)->flen = 0;
  (*ppTSchema)->vlen = 0;
  (*ppTSchema)->tlen = 0;

  for (int32_t iCol = 0; iCol < ncols; iCol++) {
    SSchema  *pColumn = &pSchema[iCol];
    STColumn *pTColumn = &((*ppTSchema)->columns[iCol]);

    pTColumn->colId = pColumn->colId;
    pTColumn->type = pColumn->type;
    pTColumn->flags = pColumn->flags;
    pTColumn->bytes = pColumn->bytes;
    pTColumn->offset = (*ppTSchema)->flen;

    // skip first column
    if (iCol) {
      (*ppTSchema)->flen += TYPE_BYTES[pColumn->type];
      if (IS_VAR_DATA_TYPE(pColumn->type)) {
        (*ppTSchema)->vlen += (pColumn->bytes + 5);
      }
    }
  }

H
Hongze Cheng 已提交
227 228 229
  return 0;
}

H
Hongze Cheng 已提交
230 231 232
void tTSchemaDestroy(STSchema *pTSchema) {
  if (pTSchema) taosMemoryFree(pTSchema);
}
H
Hongze Cheng 已提交
233

H
Hongze Cheng 已提交
234
// STSRowBuilder
H
Hongze Cheng 已提交
235
int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, SSchema *pSchema, int32_t nCols) {
H
Hongze Cheng 已提交
236 237
  if (tTSchemaCreate(sver, pSchema, nCols, &pBuilder->pTSchema) < 0) return -1;

H
Hongze Cheng 已提交
238 239 240 241
  pBuilder->szBitMap1 = (nCols - 2) / 8 + 1;
  pBuilder->szBitMap2 = (nCols - 2) / 4 + 1;
  pBuilder->szKVBuf =
      sizeof(STSKVRow) + sizeof(SKVIdx) * (nCols - 1) + pBuilder->pTSchema->flen + pBuilder->pTSchema->vlen;
H
Hongze Cheng 已提交
242 243 244
  pBuilder->szTPBuf = pBuilder->szBitMap2 + pBuilder->pTSchema->flen + pBuilder->pTSchema->vlen;
  pBuilder->pKVBuf = taosMemoryMalloc(pBuilder->szKVBuf);
  if (pBuilder->pKVBuf == NULL) {
H
Hongze Cheng 已提交
245 246
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tTSchemaDestroy(pBuilder->pTSchema);
H
Hongze Cheng 已提交
247
    return -1;
H
Hongze Cheng 已提交
248
  }
H
Hongze Cheng 已提交
249 250 251
  pBuilder->pTPBuf = taosMemoryMalloc(pBuilder->szTPBuf);
  if (pBuilder->pTPBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
252 253
    taosMemoryFree(pBuilder->pKVBuf);
    tTSchemaDestroy(pBuilder->pTSchema);
H
Hongze Cheng 已提交
254
    return -1;
H
Hongze Cheng 已提交
255 256
  }

H
Hongze Cheng 已提交
257 258 259
  return 0;
}

H
Hongze Cheng 已提交
260
void tTSRowBuilderClear(STSRowBuilder *pBuilder) {
H
Hongze Cheng 已提交
261 262 263 264
  if (pBuilder->pTPBuf) {
    taosMemoryFree(pBuilder->pTPBuf);
    pBuilder->pTPBuf = NULL;
  }
H
Hongze Cheng 已提交
265 266 267 268 269 270
  if (pBuilder->pKVBuf) {
    taosMemoryFree(pBuilder->pKVBuf);
    pBuilder->pKVBuf = NULL;
  }
  tTSchemaDestroy(pBuilder->pTSchema);
  pBuilder->pTSchema = NULL;
H
Hongze Cheng 已提交
271 272
}

H
Hongze Cheng 已提交
273
void tTSRowBuilderReset(STSRowBuilder *pBuilder) {
H
Hongze Cheng 已提交
274
  for (int32_t iCol = pBuilder->pTSchema->numOfCols - 1; iCol >= 0; iCol--) {
H
Hongze Cheng 已提交
275 276
    STColumn *pTColumn = &pBuilder->pTSchema->columns[iCol];
    COL_CLR_SET(pTColumn->flags);
H
Hongze Cheng 已提交
277 278
  }

H
Hongze Cheng 已提交
279
  pBuilder->iCol = 0;
H
Hongze Cheng 已提交
280
  ((STSKVRow *)pBuilder->pKVBuf)->nCols = 0;
H
Hongze Cheng 已提交
281 282
  pBuilder->vlenKV = 0;
  pBuilder->vlenTP = 0;
H
Hongze Cheng 已提交
283
  pBuilder->row.flags = 0;
H
Hongze Cheng 已提交
284 285 286
}

int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, const uint8_t *pData, uint32_t nData) {
H
Hongze Cheng 已提交
287 288 289
  STColumn *pTColumn = &pBuilder->pTSchema->columns[pBuilder->iCol];
  uint8_t  *p;
  int32_t   iCol;
H
Hongze Cheng 已提交
290
  STSKVRow *pTSKVRow = (STSKVRow *)pBuilder->pKVBuf;
H
Hongze Cheng 已提交
291 292 293 294 295 296

  // use interp search (todo)
  if (pTColumn->colId > cid) {
    for (iCol = pBuilder->iCol + 1; iCol < pBuilder->pTSchema->numOfCols; iCol++) {
      pTColumn = &pBuilder->pTSchema->columns[iCol];
      if (pTColumn->colId == cid) break;
H
Hongze Cheng 已提交
297
    }
H
Hongze Cheng 已提交
298 299 300 301
  } else if (pTColumn->colId < cid) {
    for (iCol = pBuilder->iCol - 1; iCol >= 0; iCol--) {
      pTColumn = &pBuilder->pTSchema->columns[iCol];
      if (pTColumn->colId == cid) break;
H
Hongze Cheng 已提交
302
    }
H
Hongze Cheng 已提交
303 304
  }

H
Hongze Cheng 已提交
305
  if (pTColumn->colId != cid || COL_IS_SET(pTColumn->flags)) {
H
Hongze Cheng 已提交
306 307 308
    return -1;
  }

H
Hongze Cheng 已提交
309 310
  pBuilder->iCol = iCol;

H
Hongze Cheng 已提交
311 312
  // set value
  if (cid == 0) {
H
Hongze Cheng 已提交
313
    ASSERT(pData && nData == sizeof(TSKEY) && iCol == 0);
H
Hongze Cheng 已提交
314
    pBuilder->row.ts = *(TSKEY *)pData;
H
Hongze Cheng 已提交
315
    pTColumn->flags |= COL_SET_VAL;
H
Hongze Cheng 已提交
316
  } else {
H
Hongze Cheng 已提交
317 318
    if (pData) {
      // set VAL
H
Hongze Cheng 已提交
319

H
Hongze Cheng 已提交
320 321 322 323 324
      pBuilder->row.flags |= TSROW_HAS_VAL;
      pTColumn->flags |= COL_SET_VAL;

      /* KV */
      if (1) {  // avoid KV at some threshold (todo)
H
Hongze Cheng 已提交
325 326
        pTSKVRow->idx[pTSKVRow->nCols].cid = cid;
        pTSKVRow->idx[pTSKVRow->nCols].offset = pBuilder->vlenKV;
H
Hongze Cheng 已提交
327

H
Hongze Cheng 已提交
328 329
        p = pBuilder->pKVBuf + sizeof(STSKVRow) + sizeof(SKVIdx) * (pBuilder->pTSchema->numOfCols - 1) +
            pBuilder->vlenKV;
H
Hongze Cheng 已提交
330 331 332 333 334 335 336 337
        if (IS_VAR_DATA_TYPE(pTColumn->type)) {
          ASSERT(nData <= pTColumn->bytes);
          pBuilder->vlenKV += tPutBinary(p, pData, nData);
        } else {
          ASSERT(nData == pTColumn->bytes);
          memcpy(p, pData, nData);
          pBuilder->vlenKV += nData;
        }
H
Hongze Cheng 已提交
338 339
      }

H
Hongze Cheng 已提交
340 341 342 343 344
      /* TUPLE */
      p = pBuilder->pTPBuf + pBuilder->szBitMap2 + pTColumn->offset;
      if (IS_VAR_DATA_TYPE(pTColumn->type)) {
        ASSERT(nData <= pTColumn->bytes);
        *(int32_t *)p = pBuilder->vlenTP;
H
Hongze Cheng 已提交
345

H
Hongze Cheng 已提交
346
        p = pBuilder->pTPBuf + pBuilder->szBitMap2 + pBuilder->pTSchema->flen + pBuilder->vlenTP;
H
Hongze Cheng 已提交
347
        pBuilder->vlenTP += tPutBinary(p, pData, nData);
H
Hongze Cheng 已提交
348
      } else {
H
Hongze Cheng 已提交
349
        ASSERT(nData == pTColumn->bytes);
H
Hongze Cheng 已提交
350 351
        memcpy(p, pData, nData);
      }
H
Hongze Cheng 已提交
352 353 354
    } else {
      // set NULL

H
Hongze Cheng 已提交
355
      pBuilder->row.flags |= TSROW_HAS_NULL;
H
Hongze Cheng 已提交
356
      pTColumn->flags |= COL_SET_NULL;
H
Hongze Cheng 已提交
357

H
Hongze Cheng 已提交
358 359
      pTSKVRow->idx[pTSKVRow->nCols].cid = cid;
      pTSKVRow->idx[pTSKVRow->nCols].offset = -1;
H
Hongze Cheng 已提交
360
    }
H
Hongze Cheng 已提交
361

H
Hongze Cheng 已提交
362
    pTSKVRow->nCols++;
H
Hongze Cheng 已提交
363 364 365 366 367
  }

  return 0;
}

H
Hongze Cheng 已提交
368 369 370 371 372 373 374 375 376 377
static FORCE_INLINE int tSKVIdxCmprFn(const void *p1, const void *p2) {
  SKVIdx *pKVIdx1 = (SKVIdx *)p1;
  SKVIdx *pKVIdx2 = (SKVIdx *)p2;
  if (pKVIdx1->cid > pKVIdx2->cid) {
    return 1;
  } else if (pKVIdx1->cid < pKVIdx2->cid) {
    return -1;
  }
  return 0;
}
H
Hongze Cheng 已提交
378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413
static void setBitMap(uint8_t *p, STSchema *pTSchema, uint8_t flags) {
  int32_t   bidx;
  STColumn *pTColumn;

  for (int32_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
    pTColumn = &pTSchema->columns[iCol];
    bidx = iCol - 1;

    switch (flags) {
      case TSROW_HAS_NULL | TSROW_HAS_NONE:
        if (pTColumn->flags & COL_SET_NULL) {
          SET_BIT1(p, bidx, (uint8_t)1);
        } else {
          SET_BIT1(p, bidx, (uint8_t)0);
        }
        break;
      case TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE:
        if (pTColumn->flags & COL_SET_NULL) {
          SET_BIT2(p, bidx, (uint8_t)1);
        } else if (pTColumn->flags & COL_SET_NULL) {
          SET_BIT2(p, bidx, (uint8_t)2);
        } else {
          SET_BIT2(p, bidx, (uint8_t)0);
        }
        break;
      default:
        if (pTColumn->flags & COL_SET_VAL) {
          SET_BIT1(p, bidx, (uint8_t)1);
        } else {
          SET_BIT1(p, bidx, (uint8_t)0);
        }

        break;
    }
  }
}
H
Hongze Cheng 已提交
414
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) {
H
Hongze Cheng 已提交
415 416 417
  int32_t   nDataTP, nDataKV;
  uint32_t  flags;
  STSKVRow *pTSKVRow = (STSKVRow *)pBuilder->pKVBuf;
H
Hongze Cheng 已提交
418 419

  // error not set ts
H
Hongze Cheng 已提交
420
  if (!COL_IS_SET(pBuilder->pTSchema->columns->flags)) {
H
Hongze Cheng 已提交
421 422 423
    return -1;
  }

H
Hongze Cheng 已提交
424 425
  ASSERT(pTSKVRow->nCols < pBuilder->pTSchema->numOfCols);
  if (pTSKVRow->nCols < pBuilder->pTSchema->numOfCols - 1) {
H
Hongze Cheng 已提交
426
    pBuilder->row.flags |= TSROW_HAS_NONE;
H
Hongze Cheng 已提交
427
  }
H
Hongze Cheng 已提交
428

H
Hongze Cheng 已提交
429 430 431 432 433
  ASSERT(pBuilder->row.flags & 0xf != 0);
  *(ppRow) = &pBuilder->row;
  switch (pBuilder->row.flags & 0xf) {
    case TSROW_HAS_NONE:
    case TSROW_HAS_NULL:
H
Hongze Cheng 已提交
434 435
      pBuilder->row.nData = 0;
      pBuilder->row.pData = NULL;
H
Hongze Cheng 已提交
436 437
      return 0;
    case TSROW_HAS_NULL | TSROW_HAS_NONE:
H
Hongze Cheng 已提交
438
      nDataTP = pBuilder->szBitMap1;
H
Hongze Cheng 已提交
439
      break;
H
Hongze Cheng 已提交
440
    case TSROW_HAS_VAL:
H
Hongze Cheng 已提交
441
      nDataTP = pBuilder->pTSchema->flen + pBuilder->vlenTP;
H
Hongze Cheng 已提交
442
      break;
H
Hongze Cheng 已提交
443 444
    case TSROW_HAS_VAL | TSROW_HAS_NONE:
    case TSROW_HAS_VAL | TSROW_HAS_NULL:
H
Hongze Cheng 已提交
445
      nDataTP = pBuilder->szBitMap1 + pBuilder->pTSchema->flen + pBuilder->vlenTP;
H
Hongze Cheng 已提交
446
      break;
H
Hongze Cheng 已提交
447
    case TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE:
H
Hongze Cheng 已提交
448
      nDataTP = pBuilder->szBitMap2 + pBuilder->pTSchema->flen + pBuilder->vlenTP;
H
Hongze Cheng 已提交
449 450
      break;
    default:
H
Hongze Cheng 已提交
451
      ASSERT(0);
H
Hongze Cheng 已提交
452 453
  }

H
Hongze Cheng 已提交
454 455
  nDataKV = sizeof(STSKVRow) + sizeof(SKVIdx) * pTSKVRow->nCols + pBuilder->vlenKV;
  pBuilder->row.sver = pBuilder->pTSchema->version;
H
Hongze Cheng 已提交
456 457 458
  if (nDataKV < nDataTP) {
    // generate KV row

H
Hongze Cheng 已提交
459
    pBuilder->row.flags |= TSROW_KV_ROW;
H
Hongze Cheng 已提交
460
    pBuilder->row.nData = nDataKV;
H
Hongze Cheng 已提交
461
    pBuilder->row.pData = pBuilder->pKVBuf;
H
Hongze Cheng 已提交
462

H
Hongze Cheng 已提交
463 464 465
    qsort(pTSKVRow->idx, pTSKVRow->nCols, sizeof(SKVIdx), tSKVIdxCmprFn);
    if (pTSKVRow->nCols < pBuilder->pTSchema->numOfCols - 1) {
      memmove(&pTSKVRow->idx[pTSKVRow->nCols], &pTSKVRow->idx[pBuilder->pTSchema->numOfCols - 1], pBuilder->vlenKV);
H
Hongze Cheng 已提交
466 467
    }
  } else {
H
Hongze Cheng 已提交
468 469 470 471
    // generate TUPLE row

    pBuilder->row.nData = nDataTP;

H
Hongze Cheng 已提交
472 473
    uint8_t *p;
    uint8_t  flags = pBuilder->row.flags & 0xf;
H
Hongze Cheng 已提交
474

H
Hongze Cheng 已提交
475 476
    if (flags == TSROW_HAS_VAL) {
      pBuilder->row.pData = pBuilder->pTPBuf + pBuilder->szBitMap2;
H
Hongze Cheng 已提交
477
    } else {
H
Hongze Cheng 已提交
478 479 480 481
      if (flags == TSROW_HAS_VAL) {
        p = pBuilder->pTPBuf;
      } else {
        p = pBuilder->pTPBuf + pBuilder->szBitMap2 - pBuilder->szBitMap1;
H
Hongze Cheng 已提交
482 483
      }

H
Hongze Cheng 已提交
484
      setBitMap(p, pBuilder->pTSchema, flags);
H
Hongze Cheng 已提交
485 486
      pBuilder->row.pData = p;
    }
H
Hongze Cheng 已提交
487 488 489 490 491
  }

  return 0;
}

H
Hongze Cheng 已提交
492
#if 1  // ====================
493
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
H
Hongze Cheng 已提交
494
int         tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
L
Liu Jicong 已提交
495
  int spaceNeeded = pCol->bytes * maxPoints;
S
Shengliang Guan 已提交
496
  if (IS_VAR_DATA_TYPE(pCol->type)) {
L
Liu Jicong 已提交
497
    spaceNeeded += sizeof(VarDataOffsetT) * maxPoints;
L
Liu Jicong 已提交
498
  }
C
Cary Xu 已提交
499
#ifdef TD_SUPPORT_BITMAP
C
Cary Xu 已提交
500 501 502 503
  int32_t nBitmapBytes = (int32_t)TD_BITMAP_BYTES(maxPoints);
  spaceNeeded += (int)nBitmapBytes;
  // TODO: Currently, the compression of bitmap parts is affiliated to the column data parts, thus allocate 1 more
  // TYPE_BYTES as to comprise complete TYPE_BYTES. Otherwise, invalid read/write would be triggered.
H
Hongze Cheng 已提交
504 505
  // spaceNeeded += TYPE_BYTES[pCol->type]; // the bitmap part is append as a single part since 2022.04.03, thus
  // remove the additional space
C
Cary Xu 已提交
506
#endif
C
Cary Xu 已提交
507

S
Shengliang Guan 已提交
508
  if (pCol->spaceSize < spaceNeeded) {
wafwerar's avatar
wafwerar 已提交
509
    void *ptr = taosMemoryRealloc(pCol->pData, spaceNeeded);
S
Shengliang Guan 已提交
510 511
    if (ptr == NULL) {
      uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)spaceNeeded, strerror(errno));
L
Liu Jicong 已提交
512
      return -1;
L
Liu Jicong 已提交
513 514 515
    } else {
      pCol->pData = ptr;
      pCol->spaceSize = spaceNeeded;
516 517
    }
  }
C
Cary Xu 已提交
518
#ifdef TD_SUPPORT_BITMAP
519

C
Cary Xu 已提交
520 521 522 523
  if (IS_VAR_DATA_TYPE(pCol->type)) {
    pCol->pBitmap = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints);
    pCol->dataOff = POINTER_SHIFT(pCol->pBitmap, nBitmapBytes);
  } else {
C
Cary Xu 已提交
524
    pCol->pBitmap = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints);
L
Liu Jicong 已提交
525
  }
C
Cary Xu 已提交
526 527 528 529
#else
  if (IS_VAR_DATA_TYPE(pCol->type)) {
    pCol->dataOff = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints);
  }
C
Cary Xu 已提交
530
#endif
L
Liu Jicong 已提交
531
  return 0;
532 533
}

H
hzcheng 已提交
534 535 536
/**
 * Duplicate the schema and return a new object
 */
H
Hongze Cheng 已提交
537
STSchema *tdDupSchema(const STSchema *pSchema) {
S
Shengliang Guan 已提交
538
  int       tlen = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
wafwerar's avatar
wafwerar 已提交
539
  STSchema *tSchema = (STSchema *)taosMemoryMalloc(tlen);
H
hzcheng 已提交
540 541
  if (tSchema == NULL) return NULL;

H
Hongze Cheng 已提交
542
  memcpy((void *)tSchema, (void *)pSchema, tlen);
H
hzcheng 已提交
543 544 545 546

  return tSchema;
}

H
TD-27  
hzcheng 已提交
547 548 549
/**
 * Encode a schema to dst, and return the next pointer
 */
H
TD-353  
Hongze Cheng 已提交
550 551 552 553
int tdEncodeSchema(void **buf, STSchema *pSchema) {
  int tlen = 0;
  tlen += taosEncodeFixedI32(buf, schemaVersion(pSchema));
  tlen += taosEncodeFixedI32(buf, schemaNCols(pSchema));
H
TD-166  
hzcheng 已提交
554

H
TD-27  
hzcheng 已提交
555 556
  for (int i = 0; i < schemaNCols(pSchema); i++) {
    STColumn *pCol = schemaColAt(pSchema, i);
H
TD-353  
Hongze Cheng 已提交
557
    tlen += taosEncodeFixedI8(buf, colType(pCol));
C
Cary Xu 已提交
558
    tlen += taosEncodeFixedI8(buf, colFlags(pCol));
H
TD-353  
Hongze Cheng 已提交
559
    tlen += taosEncodeFixedI16(buf, colColId(pCol));
560
    tlen += taosEncodeFixedI16(buf, colBytes(pCol));
H
TD-27  
hzcheng 已提交
561 562
  }

H
TD-353  
Hongze Cheng 已提交
563
  return tlen;
H
TD-27  
hzcheng 已提交
564 565 566 567 568
}

/**
 * Decode a schema from a binary.
 */
H
TD-353  
Hongze Cheng 已提交
569
void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
S
Shengliang Guan 已提交
570 571
  int             version = 0;
  int             numOfCols = 0;
H
TD-353  
Hongze Cheng 已提交
572
  STSchemaBuilder schemaBuilder;
H
TD-27  
hzcheng 已提交
573

H
TD-353  
Hongze Cheng 已提交
574 575
  buf = taosDecodeFixedI32(buf, &version);
  buf = taosDecodeFixedI32(buf, &numOfCols);
H
TD-27  
hzcheng 已提交
576

H
Hongze Cheng 已提交
577 578
  if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;

H
TD-353  
Hongze Cheng 已提交
579
  for (int i = 0; i < numOfCols; i++) {
580
    col_type_t  type = 0;
C
Cary Xu 已提交
581
    int8_t      flags = 0;
582 583
    col_id_t    colId = 0;
    col_bytes_t bytes = 0;
H
TD-353  
Hongze Cheng 已提交
584
    buf = taosDecodeFixedI8(buf, &type);
C
Cary Xu 已提交
585
    buf = taosDecodeFixedI8(buf, &flags);
H
TD-353  
Hongze Cheng 已提交
586
    buf = taosDecodeFixedI16(buf, &colId);
587
    buf = taosDecodeFixedI32(buf, &bytes);
C
Cary Xu 已提交
588
    if (tdAddColToSchema(&schemaBuilder, type, flags, colId, bytes) < 0) {
H
Hongze Cheng 已提交
589 590 591
      tdDestroyTSchemaBuilder(&schemaBuilder);
      return NULL;
    }
H
TD-27  
hzcheng 已提交
592 593
  }

H
TD-353  
Hongze Cheng 已提交
594
  *pRSchema = tdGetSchemaFromBuilder(&schemaBuilder);
H
Hongze Cheng 已提交
595
  tdDestroyTSchemaBuilder(&schemaBuilder);
H
TD-353  
Hongze Cheng 已提交
596
  return buf;
H
Hongze Cheng 已提交
597 598
}

C
Cary Xu 已提交
599
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) {
H
Hongze Cheng 已提交
600 601 602
  if (pBuilder == NULL) return -1;

  pBuilder->tCols = 256;
wafwerar's avatar
wafwerar 已提交
603
  pBuilder->columns = (STColumn *)taosMemoryMalloc(sizeof(STColumn) * pBuilder->tCols);
H
Hongze Cheng 已提交
604 605 606 607 608 609 610 611
  if (pBuilder->columns == NULL) return -1;

  tdResetTSchemaBuilder(pBuilder, version);
  return 0;
}

void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder) {
wafwerar's avatar
wafwerar 已提交
612
    taosMemoryFreeClear(pBuilder->columns);
H
Hongze Cheng 已提交
613 614 615
  }
}

C
Cary Xu 已提交
616
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) {
H
Hongze Cheng 已提交
617 618 619
  pBuilder->nCols = 0;
  pBuilder->tlen = 0;
  pBuilder->flen = 0;
T
Tao Liu 已提交
620
  pBuilder->vlen = 0;
H
Hongze Cheng 已提交
621 622 623
  pBuilder->version = version;
}

C
Cary Xu 已提交
624
int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t flags, col_id_t colId, col_bytes_t bytes) {
625
  if (!isValidDataType(type)) return -1;
H
Hongze Cheng 已提交
626 627 628

  if (pBuilder->nCols >= pBuilder->tCols) {
    pBuilder->tCols *= 2;
wafwerar's avatar
wafwerar 已提交
629
    STColumn *columns = (STColumn *)taosMemoryRealloc(pBuilder->columns, sizeof(STColumn) * pBuilder->tCols);
T
tickduan 已提交
630 631
    if (columns == NULL) return -1;
    pBuilder->columns = columns;
H
Hongze Cheng 已提交
632 633 634 635 636
  }

  STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
  colSetType(pCol, type);
  colSetColId(pCol, colId);
C
Cary Xu 已提交
637
  colSetFlags(pCol, flags);
H
Hongze Cheng 已提交
638 639 640
  if (pBuilder->nCols == 0) {
    colSetOffset(pCol, 0);
  } else {
S
Shengliang Guan 已提交
641
    STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols - 1]);
H
Hongze Cheng 已提交
642 643 644 645 646
    colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
  }

  if (IS_VAR_DATA_TYPE(type)) {
    colSetBytes(pCol, bytes);
T
Tao Liu 已提交
647 648
    pBuilder->tlen += (TYPE_BYTES[type] + bytes);
    pBuilder->vlen += bytes - sizeof(VarDataLenT);
H
Hongze Cheng 已提交
649 650 651
  } else {
    colSetBytes(pCol, TYPE_BYTES[type]);
    pBuilder->tlen += TYPE_BYTES[type];
T
Tao Liu 已提交
652
    pBuilder->vlen += TYPE_BYTES[type];
H
Hongze Cheng 已提交
653 654 655 656 657 658 659 660 661 662 663 664 665 666 667
  }

  pBuilder->nCols++;
  pBuilder->flen += TYPE_BYTES[type];

  ASSERT(pCol->offset < pBuilder->flen);

  return 0;
}

STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder->nCols <= 0) return NULL;

  int tlen = sizeof(STSchema) + sizeof(STColumn) * pBuilder->nCols;

wafwerar's avatar
wafwerar 已提交
668
  STSchema *pSchema = (STSchema *)taosMemoryMalloc(tlen);
H
Hongze Cheng 已提交
669 670 671 672 673 674
  if (pSchema == NULL) return NULL;

  schemaVersion(pSchema) = pBuilder->version;
  schemaNCols(pSchema) = pBuilder->nCols;
  schemaTLen(pSchema) = pBuilder->tlen;
  schemaFLen(pSchema) = pBuilder->flen;
T
Tao Liu 已提交
675
  schemaVLen(pSchema) = pBuilder->vlen;
H
Hongze Cheng 已提交
676

C
Cary Xu 已提交
677
#ifdef TD_SUPPORT_BITMAP
C
Cary Xu 已提交
678
  schemaTLen(pSchema) += (int)TD_BITMAP_BYTES(schemaNCols(pSchema));
C
Cary Xu 已提交
679 680
#endif

H
Hongze Cheng 已提交
681 682
  memcpy(schemaColAt(pSchema, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols);

H
TD-27  
hzcheng 已提交
683 684 685
  return pSchema;
}

686
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) {
H
TD-166  
hzcheng 已提交
687 688 689
  pDataCol->type = colType(pCol);
  pDataCol->colId = colColId(pCol);
  pDataCol->bytes = colBytes(pCol);
S
Shengliang Guan 已提交
690
  pDataCol->offset = colOffset(pCol) + 0;  // TD_DATA_ROW_HEAD_SIZE;
H
TD-166  
hzcheng 已提交
691 692 693

  pDataCol->len = 0;
}
C
Cary Xu 已提交
694

L
Liu Jicong 已提交
695 696 697 698 699 700
static FORCE_INLINE const void *tdGetColDataOfRowUnsafe(SDataCol *pCol, int row) {
  if (IS_VAR_DATA_TYPE(pCol->type)) {
    return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
  } else {
    return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
  }
H
TD-166  
hzcheng 已提交
701 702
}

H
TD-166  
hzcheng 已提交
703
bool isNEleNull(SDataCol *pCol, int nEle) {
S
Shengliang Guan 已提交
704
  if (isAllRowsNull(pCol)) return true;
705
  for (int i = 0; i < nEle; ++i) {
L
Liu Jicong 已提交
706
    if (!isNull(tdGetColDataOfRowUnsafe(pCol, i), pCol->type)) return false;
H
TD-166  
hzcheng 已提交
707
  }
H
Hongze Cheng 已提交
708
  return true;
H
TD-166  
hzcheng 已提交
709 710
}

C
Cary Xu 已提交
711
void *dataColSetOffset(SDataCol *pCol, int nEle) {
H
TD-166  
hzcheng 已提交
712 713
  ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));

H
Hongze Cheng 已提交
714
  void *tptr = pCol->pData;
H
TD-166  
hzcheng 已提交
715
  // char *tptr = (char *)(pCol->pData);
H
TD-166  
hzcheng 已提交
716

H
TD-166  
hzcheng 已提交
717
  VarDataOffsetT offset = 0;
718
  for (int i = 0; i < nEle; ++i) {
H
TD-166  
hzcheng 已提交
719
    pCol->dataOff[i] = offset;
H
TD-166  
hzcheng 已提交
720
    offset += varDataTLen(tptr);
H
hzcheng 已提交
721
    tptr = POINTER_SHIFT(tptr, varDataTLen(tptr));
H
TD-166  
hzcheng 已提交
722
  }
C
Cary Xu 已提交
723
  return POINTER_SHIFT(tptr, varDataTLen(tptr));
H
TD-166  
hzcheng 已提交
724 725
}

L
Liu Jicong 已提交
726
SDataCols *tdNewDataCols(int maxCols, int maxRows) {
wafwerar's avatar
wafwerar 已提交
727
  SDataCols *pCols = (SDataCols *)taosMemoryCalloc(1, sizeof(SDataCols));
H
Haojun Liao 已提交
728
  if (pCols == NULL) {
S
Shengliang Guan 已提交
729
    uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno));
H
Haojun Liao 已提交
730 731
    return NULL;
  }
H
TD-34  
hzcheng 已提交
732

H
Hongze Cheng 已提交
733
  pCols->maxPoints = maxRows;
L
Liu Jicong 已提交
734 735 736
  pCols->maxCols = maxCols;
  pCols->numOfRows = 0;
  pCols->numOfCols = 0;
C
Cary Xu 已提交
737
  // pCols->bitmapMode = 0; // calloc already set 0
H
Hongze Cheng 已提交
738 739

  if (maxCols > 0) {
wafwerar's avatar
wafwerar 已提交
740
    pCols->cols = (SDataCol *)taosMemoryCalloc(maxCols, sizeof(SDataCol));
H
Hongze Cheng 已提交
741 742 743 744 745 746
    if (pCols->cols == NULL) {
      uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols,
             strerror(errno));
      tdFreeDataCols(pCols);
      return NULL;
    }
747
#if 0  // no need as calloc used
L
Liu Jicong 已提交
748
    int i;
S
Shengliang Guan 已提交
749
    for (i = 0; i < maxCols; i++) {
L
Liu Jicong 已提交
750
      pCols->cols[i].spaceSize = 0;
L
Liu Jicong 已提交
751
      pCols->cols[i].len = 0;
L
Liu Jicong 已提交
752 753 754
      pCols->cols[i].pData = NULL;
      pCols->cols[i].dataOff = NULL;
    }
755
#endif
H
Hongze Cheng 已提交
756 757
  }

H
TD-34  
hzcheng 已提交
758 759 760
  return pCols;
}

H
Hongze Cheng 已提交
761
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
762 763
  int i;
  int oldMaxCols = pCols->maxCols;
L
Liu Jicong 已提交
764
  if (schemaNCols(pSchema) > oldMaxCols) {
H
Hongze Cheng 已提交
765
    pCols->maxCols = schemaNCols(pSchema);
wafwerar's avatar
wafwerar 已提交
766
    void *ptr = (SDataCol *)taosMemoryRealloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols);
L
Liu Jicong 已提交
767 768
    if (ptr == NULL) return -1;
    pCols->cols = ptr;
769
    for (i = oldMaxCols; i < pCols->maxCols; ++i) {
770 771
      pCols->cols[i].pData = NULL;
      pCols->cols[i].dataOff = NULL;
772
      pCols->cols[i].pBitmap = NULL;
L
Liu Jicong 已提交
773
      pCols->cols[i].spaceSize = 0;
774
    }
L
Liu Jicong 已提交
775
  }
776 777 778
#if 0
  tdResetDataCols(pCols); // redundant loop to reset len/blen to 0, already reset in following dataColInit(...)
#endif
H
Hongze Cheng 已提交
779

780
  pCols->numOfRows = 0;
C
Cary Xu 已提交
781
  pCols->bitmapMode = 0;
H
TD-34  
hzcheng 已提交
782 783
  pCols->numOfCols = schemaNCols(pSchema);

784
  for (i = 0; i < schemaNCols(pSchema); ++i) {
785
    dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints);
H
TD-34  
hzcheng 已提交
786
  }
S
Shengliang Guan 已提交
787

H
Hongze Cheng 已提交
788
  return 0;
H
TD-34  
hzcheng 已提交
789 790
}

H
Hongze Cheng 已提交
791
SDataCols *tdFreeDataCols(SDataCols *pCols) {
792
  int i;
H
TD-34  
hzcheng 已提交
793
  if (pCols) {
S
Shengliang Guan 已提交
794
    if (pCols->cols) {
795
      int maxCols = pCols->maxCols;
796
      for (i = 0; i < maxCols; ++i) {
797
        SDataCol *pCol = &pCols->cols[i];
wafwerar's avatar
wafwerar 已提交
798
        taosMemoryFreeClear(pCol->pData);
799
      }
wafwerar's avatar
wafwerar 已提交
800
      taosMemoryFree(pCols->cols);
801 802
      pCols->cols = NULL;
    }
wafwerar's avatar
wafwerar 已提交
803
    taosMemoryFree(pCols);
H
TD-34  
hzcheng 已提交
804
  }
H
Hongze Cheng 已提交
805
  return NULL;
H
TD-34  
hzcheng 已提交
806 807 808
}

void tdResetDataCols(SDataCols *pCols) {
B
Bomin Zhang 已提交
809 810
  if (pCols != NULL) {
    pCols->numOfRows = 0;
C
Cary Xu 已提交
811
    pCols->bitmapMode = 0;
812
    for (int i = 0; i < pCols->maxCols; ++i) {
B
Bomin Zhang 已提交
813 814
      dataColReset(pCols->cols + i);
    }
H
TD-34  
hzcheng 已提交
815 816
  }
}
H
Hongze Cheng 已提交
817

H
Hongze Cheng 已提交
818
SKVRow tdKVRowDup(SKVRow row) {
wafwerar's avatar
wafwerar 已提交
819
  SKVRow trow = taosMemoryMalloc(kvRowLen(row));
H
Hongze Cheng 已提交
820 821
  if (trow == NULL) return NULL;

H
Hongze Cheng 已提交
822
  kvRowCpy(trow, row);
H
Hongze Cheng 已提交
823 824 825
  return trow;
}

S
Shengliang Guan 已提交
826 827 828
static int compareColIdx(const void *a, const void *b) {
  const SColIdx *x = (const SColIdx *)a;
  const SColIdx *y = (const SColIdx *)b;
B
Bomin Zhang 已提交
829 830 831 832 833 834 835 836 837
  if (x->colId > y->colId) {
    return 1;
  }
  if (x->colId < y->colId) {
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
838
void tdSortKVRowByColIdx(SKVRow row) { qsort(kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), compareColIdx); }
B
Bomin Zhang 已提交
839

H
TD-90  
Hongze Cheng 已提交
840 841 842 843
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
  SColIdx *pColIdx = NULL;
  SKVRow   row = *orow;
  SKVRow   nrow = NULL;
S
Shengliang Guan 已提交
844
  void    *ptr = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE);
H
TD-90  
Hongze Cheng 已提交
845

846
  if (ptr == NULL || ((SColIdx *)ptr)->colId > colId) {  // need to add a column value to the row
C
Cary Xu 已提交
847
    int diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
848 849 850 851
    int nRowLen = kvRowLen(row) + sizeof(SColIdx) + diff;
    int oRowCols = kvRowNCols(row);

    ASSERT(diff > 0);
wafwerar's avatar
wafwerar 已提交
852
    nrow = taosMemoryMalloc(nRowLen);
H
TD-90  
Hongze Cheng 已提交
853 854
    if (nrow == NULL) return -1;

855 856
    kvRowSetLen(nrow, nRowLen);
    kvRowSetNCols(nrow, oRowCols + 1);
H
TD-90  
Hongze Cheng 已提交
857

858 859
    memcpy(kvRowColIdx(nrow), kvRowColIdx(row), sizeof(SColIdx) * oRowCols);
    memcpy(kvRowValues(nrow), kvRowValues(row), kvRowValLen(row));
H
TD-90  
Hongze Cheng 已提交
860

861 862 863
    pColIdx = kvRowColIdxAt(nrow, oRowCols);
    pColIdx->colId = colId;
    pColIdx->offset = kvRowValLen(row);
H
TD-90  
Hongze Cheng 已提交
864

865
    memcpy(kvRowColVal(nrow, pColIdx), value, diff);  // copy new value
H
TD-90  
Hongze Cheng 已提交
866

867
    tdSortKVRowByColIdx(nrow);
H
TD-90  
Hongze Cheng 已提交
868 869

    *orow = nrow;
wafwerar's avatar
wafwerar 已提交
870
    taosMemoryFree(row);
H
TD-90  
Hongze Cheng 已提交
871 872 873 874 875
  } else {
    ASSERT(((SColIdx *)ptr)->colId == colId);
    if (IS_VAR_DATA_TYPE(type)) {
      void *pOldVal = kvRowColVal(row, (SColIdx *)ptr);

S
Shengliang Guan 已提交
876
      if (varDataTLen(value) == varDataTLen(pOldVal)) {  // just update the column value in place
H
TD-90  
Hongze Cheng 已提交
877
        memcpy(pOldVal, value, varDataTLen(value));
878 879
      } else {  // need to reallocate the memory
        int16_t nlen = kvRowLen(row) + (varDataTLen(value) - varDataTLen(pOldVal));
H
TD-90  
Hongze Cheng 已提交
880
        ASSERT(nlen > 0);
wafwerar's avatar
wafwerar 已提交
881
        nrow = taosMemoryMalloc(nlen);
H
TD-90  
Hongze Cheng 已提交
882
        if (nrow == NULL) return -1;
H
TD-90  
Hongze Cheng 已提交
883 884 885 886

        kvRowSetLen(nrow, nlen);
        kvRowSetNCols(nrow, kvRowNCols(row));

887 888 889 890 891 892 893 894
        int zsize = sizeof(SColIdx) * kvRowNCols(row) + ((SColIdx *)ptr)->offset;
        memcpy(kvRowColIdx(nrow), kvRowColIdx(row), zsize);
        memcpy(kvRowColVal(nrow, ((SColIdx *)ptr)), value, varDataTLen(value));
        // Copy left value part
        int lsize = kvRowLen(row) - TD_KV_ROW_HEAD_SIZE - zsize - varDataTLen(pOldVal);
        if (lsize > 0) {
          memcpy(POINTER_SHIFT(nrow, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(value)),
                 POINTER_SHIFT(row, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(pOldVal)), lsize);
H
TD-90  
Hongze Cheng 已提交
895 896
        }

897 898 899 900 901
        for (int i = 0; i < kvRowNCols(nrow); i++) {
          pColIdx = kvRowColIdxAt(nrow, i);

          if (pColIdx->offset > ((SColIdx *)ptr)->offset) {
            pColIdx->offset = pColIdx->offset - varDataTLen(pOldVal) + varDataTLen(value);
H
TD-90  
Hongze Cheng 已提交
902 903 904 905
          }
        }

        *orow = nrow;
wafwerar's avatar
wafwerar 已提交
906
        taosMemoryFree(row);
H
TD-90  
Hongze Cheng 已提交
907 908 909 910 911 912 913
      }
    } else {
      memcpy(kvRowColVal(row, (SColIdx *)ptr), value, TYPE_BYTES[type]);
    }
  }

  return 0;
H
Hongze Cheng 已提交
914 915
}

H
TD-353  
Hongze Cheng 已提交
916
int tdEncodeKVRow(void **buf, SKVRow row) {
H
Hongze Cheng 已提交
917
  // May change the encode purpose
H
TD-353  
Hongze Cheng 已提交
918 919 920 921 922 923
  if (buf != NULL) {
    kvRowCpy(*buf, row);
    *buf = POINTER_SHIFT(*buf, kvRowLen(row));
  }

  return kvRowLen(row);
H
Hongze Cheng 已提交
924 925
}

H
Hongze Cheng 已提交
926 927
void *tdDecodeKVRow(void *buf, SKVRow *row) {
  *row = tdKVRowDup(buf);
H
TD-353  
Hongze Cheng 已提交
928
  if (*row == NULL) return NULL;
H
Hongze Cheng 已提交
929
  return POINTER_SHIFT(buf, kvRowLen(*row));
H
Hongze Cheng 已提交
930 931
}

H
Hongze Cheng 已提交
932
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) {
H
Hongze Cheng 已提交
933 934
  pBuilder->tCols = 128;
  pBuilder->nCols = 0;
wafwerar's avatar
wafwerar 已提交
935
  pBuilder->pColIdx = (SColIdx *)taosMemoryMalloc(sizeof(SColIdx) * pBuilder->tCols);
H
Hongze Cheng 已提交
936 937 938
  if (pBuilder->pColIdx == NULL) return -1;
  pBuilder->alloc = 1024;
  pBuilder->size = 0;
wafwerar's avatar
wafwerar 已提交
939
  pBuilder->buf = taosMemoryMalloc(pBuilder->alloc);
H
Hongze Cheng 已提交
940
  if (pBuilder->buf == NULL) {
wafwerar's avatar
wafwerar 已提交
941
    taosMemoryFree(pBuilder->pColIdx);
H
Hongze Cheng 已提交
942 943 944 945 946
    return -1;
  }
  return 0;
}

H
Hongze Cheng 已提交
947
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) {
wafwerar's avatar
wafwerar 已提交
948 949
  taosMemoryFreeClear(pBuilder->pColIdx);
  taosMemoryFreeClear(pBuilder->buf);
H
Hongze Cheng 已提交
950 951
}

H
Hongze Cheng 已提交
952
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
H
Hongze Cheng 已提交
953 954 955 956
  pBuilder->nCols = 0;
  pBuilder->size = 0;
}

H
Hongze Cheng 已提交
957
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
C
Cary Xu 已提交
958
  int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
H
Hongze Cheng 已提交
959 960
  if (tlen == 0) return NULL;

H
Hongze Cheng 已提交
961 962
  tlen += TD_KV_ROW_HEAD_SIZE;

wafwerar's avatar
wafwerar 已提交
963
  SKVRow row = taosMemoryMalloc(tlen);
H
Hongze Cheng 已提交
964 965
  if (row == NULL) return NULL;

H
Hongze Cheng 已提交
966
  kvRowSetNCols(row, pBuilder->nCols);
H
Hongze Cheng 已提交
967
  kvRowSetLen(row, tlen);
H
Hongze Cheng 已提交
968

H
Hongze Cheng 已提交
969 970
  memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
  memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
H
Hongze Cheng 已提交
971 972

  return row;
973
}
H
Hongze Cheng 已提交
974
#endif