tdataformat.c 26.4 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
slguan 已提交
15
#include "tdataformat.h"
S
Shengliang Guan 已提交
16
#include "ulog.h"
T
Tao Liu 已提交
17
#include "talgo.h"
H
TD-353  
Hongze Cheng 已提交
18
#include "tcoding.h"
H
Hongze Cheng 已提交
19
#include "wchar.h"
20
#include "tarray.h"
H
more  
hzcheng 已提交
21

22
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
H
TD-1438  
Hongze Cheng 已提交
23
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
24
                               int limit2, int tRows, bool forceSetNull);
H
TD-1438  
Hongze Cheng 已提交
25

L
Liu Jicong 已提交
26
int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
L
Liu Jicong 已提交
27 28
  int spaceNeeded = pCol->bytes * maxPoints;
  if(IS_VAR_DATA_TYPE(pCol->type)) {
L
Liu Jicong 已提交
29
    spaceNeeded += sizeof(VarDataOffsetT) * maxPoints;
L
Liu Jicong 已提交
30
  }
C
Cary Xu 已提交
31
#ifdef TD_SUPPORT_BITMAP
K
Kaili Xu 已提交
32
  spaceNeeded += (int)TD_BITMAP_BYTES(maxPoints);
C
Cary Xu 已提交
33
#endif
L
Liu Jicong 已提交
34 35 36
  if(pCol->spaceSize < spaceNeeded) {
    void* ptr = realloc(pCol->pData, spaceNeeded);
    if(ptr == NULL) {
L
Liu Jicong 已提交
37
      uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)spaceNeeded,
38
             strerror(errno));
L
Liu Jicong 已提交
39
      return -1;
L
Liu Jicong 已提交
40 41 42
    } else {
      pCol->pData = ptr;
      pCol->spaceSize = spaceNeeded;
43 44
    }
  }
C
Cary Xu 已提交
45
  if (IS_VAR_DATA_TYPE(pCol->type)) {
L
Liu Jicong 已提交
46
    pCol->dataOff = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints);
C
Cary Xu 已提交
47 48 49 50 51 52 53
#ifdef TD_SUPPORT_BITMAP
    pCol->pBitmap = POINTER_SHIFT(pCol->dataOff, sizeof(VarDataOffsetT) * maxPoints);
#endif
  }
#ifdef TD_SUPPORT_BITMAP
  else {
    pCol->pBitmap = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints);
L
Liu Jicong 已提交
54
  }
C
Cary Xu 已提交
55
#endif
L
Liu Jicong 已提交
56
  return 0;
57 58
}

H
hzcheng 已提交
59 60 61
/**
 * Duplicate the schema and return a new object
 */
H
Hongze Cheng 已提交
62
STSchema *tdDupSchema(const STSchema *pSchema) {
H
Hongze Cheng 已提交
63 64 65

  int tlen = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
  STSchema *tSchema = (STSchema *)malloc(tlen);
H
hzcheng 已提交
66 67
  if (tSchema == NULL) return NULL;

H
Hongze Cheng 已提交
68
  memcpy((void *)tSchema, (void *)pSchema, tlen);
H
hzcheng 已提交
69 70 71 72

  return tSchema;
}

H
TD-27  
hzcheng 已提交
73 74 75
/**
 * Encode a schema to dst, and return the next pointer
 */
H
TD-353  
Hongze Cheng 已提交
76 77 78 79
int tdEncodeSchema(void **buf, STSchema *pSchema) {
  int tlen = 0;
  tlen += taosEncodeFixedI32(buf, schemaVersion(pSchema));
  tlen += taosEncodeFixedI32(buf, schemaNCols(pSchema));
H
TD-166  
hzcheng 已提交
80

H
TD-27  
hzcheng 已提交
81 82
  for (int i = 0; i < schemaNCols(pSchema); i++) {
    STColumn *pCol = schemaColAt(pSchema, i);
H
TD-353  
Hongze Cheng 已提交
83 84
    tlen += taosEncodeFixedI8(buf, colType(pCol));
    tlen += taosEncodeFixedI16(buf, colColId(pCol));
85
    tlen += taosEncodeFixedI16(buf, colBytes(pCol));
H
TD-27  
hzcheng 已提交
86 87
  }

H
TD-353  
Hongze Cheng 已提交
88
  return tlen;
H
TD-27  
hzcheng 已提交
89 90 91 92 93
}

/**
 * Decode a schema from a binary.
 */
H
TD-353  
Hongze Cheng 已提交
94
void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
H
Hongze Cheng 已提交
95
  int version = 0;
H
TD-353  
Hongze Cheng 已提交
96
  int numOfCols = 0;
H
TD-353  
Hongze Cheng 已提交
97
  STSchemaBuilder schemaBuilder;
H
TD-27  
hzcheng 已提交
98

H
TD-353  
Hongze Cheng 已提交
99 100
  buf = taosDecodeFixedI32(buf, &version);
  buf = taosDecodeFixedI32(buf, &numOfCols);
H
TD-27  
hzcheng 已提交
101

H
Hongze Cheng 已提交
102 103
  if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;

H
TD-353  
Hongze Cheng 已提交
104
  for (int i = 0; i < numOfCols; i++) {
H
TD-27  
hzcheng 已提交
105 106
    int8_t  type = 0;
    int16_t colId = 0;
107
    int16_t bytes = 0;
H
TD-353  
Hongze Cheng 已提交
108 109
    buf = taosDecodeFixedI8(buf, &type);
    buf = taosDecodeFixedI16(buf, &colId);
110
    buf = taosDecodeFixedI16(buf, &bytes);
H
Hongze Cheng 已提交
111 112 113 114
    if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
      tdDestroyTSchemaBuilder(&schemaBuilder);
      return NULL;
    }
H
TD-27  
hzcheng 已提交
115 116
  }

H
TD-353  
Hongze Cheng 已提交
117
  *pRSchema = tdGetSchemaFromBuilder(&schemaBuilder);
H
Hongze Cheng 已提交
118
  tdDestroyTSchemaBuilder(&schemaBuilder);
H
TD-353  
Hongze Cheng 已提交
119
  return buf;
H
Hongze Cheng 已提交
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
}

int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  if (pBuilder == NULL) return -1;

  pBuilder->tCols = 256;
  pBuilder->columns = (STColumn *)malloc(sizeof(STColumn) * pBuilder->tCols);
  if (pBuilder->columns == NULL) return -1;

  tdResetTSchemaBuilder(pBuilder, version);
  return 0;
}

void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder) {
S
TD-1848  
Shengliang Guan 已提交
135
    tfree(pBuilder->columns);
H
Hongze Cheng 已提交
136 137 138 139 140 141 142
  }
}

void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  pBuilder->nCols = 0;
  pBuilder->tlen = 0;
  pBuilder->flen = 0;
T
Tao Liu 已提交
143
  pBuilder->vlen = 0;
H
Hongze Cheng 已提交
144 145 146
  pBuilder->version = version;
}

147
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes) {
148
  if (!isValidDataType(type)) return -1;
H
Hongze Cheng 已提交
149 150 151

  if (pBuilder->nCols >= pBuilder->tCols) {
    pBuilder->tCols *= 2;
T
tickduan 已提交
152 153 154
    STColumn* columns = (STColumn *)realloc(pBuilder->columns, sizeof(STColumn) * pBuilder->tCols);
    if (columns == NULL) return -1;
    pBuilder->columns = columns;
H
Hongze Cheng 已提交
155 156 157 158 159 160 161 162 163 164 165 166 167 168
  }

  STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
  colSetType(pCol, type);
  colSetColId(pCol, colId);
  if (pBuilder->nCols == 0) {
    colSetOffset(pCol, 0);
  } else {
    STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols-1]);
    colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
  }

  if (IS_VAR_DATA_TYPE(type)) {
    colSetBytes(pCol, bytes);
T
Tao Liu 已提交
169 170
    pBuilder->tlen += (TYPE_BYTES[type] + bytes);
    pBuilder->vlen += bytes - sizeof(VarDataLenT);
H
Hongze Cheng 已提交
171 172 173
  } else {
    colSetBytes(pCol, TYPE_BYTES[type]);
    pBuilder->tlen += TYPE_BYTES[type];
T
Tao Liu 已提交
174
    pBuilder->vlen += TYPE_BYTES[type];
H
Hongze Cheng 已提交
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
  }

  pBuilder->nCols++;
  pBuilder->flen += TYPE_BYTES[type];

  ASSERT(pCol->offset < pBuilder->flen);

  return 0;
}

STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder->nCols <= 0) return NULL;

  int tlen = sizeof(STSchema) + sizeof(STColumn) * pBuilder->nCols;

  STSchema *pSchema = (STSchema *)malloc(tlen);
  if (pSchema == NULL) return NULL;

  schemaVersion(pSchema) = pBuilder->version;
  schemaNCols(pSchema) = pBuilder->nCols;
  schemaTLen(pSchema) = pBuilder->tlen;
  schemaFLen(pSchema) = pBuilder->flen;
T
Tao Liu 已提交
197
  schemaVLen(pSchema) = pBuilder->vlen;
H
Hongze Cheng 已提交
198

C
Cary Xu 已提交
199
#ifdef TD_SUPPORT_BITMAP
C
Cary Xu 已提交
200
  schemaTLen(pSchema) += (int)TD_BITMAP_BYTES(schemaNCols(pSchema));
C
Cary Xu 已提交
201 202
#endif

H
Hongze Cheng 已提交
203 204
  memcpy(schemaColAt(pSchema, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols);

H
TD-27  
hzcheng 已提交
205 206 207
  return pSchema;
}

H
hzcheng 已提交
208 209 210
/**
 * Initialize a data row
 */
H
TD-90  
Hongze Cheng 已提交
211 212 213 214
void tdInitDataRow(SDataRow row, STSchema *pSchema) {
  dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema));
  dataRowSetVersion(row, schemaVersion(pSchema));
}
H
hzcheng 已提交
215

C
Cary Xu 已提交
216 217
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
  int32_t size = dataRowMaxBytesFromSchema(pSchema);
H
hzcheng 已提交
218

C
Cary Xu 已提交
219 220
  SDataRow row = malloc(size);
  if (row == NULL) return NULL;
H
hzcheng 已提交
221

C
Cary Xu 已提交
222 223 224
  tdInitDataRow(row, pSchema);
  return row;
}
H
hzcheng 已提交
225

H
hzcheng 已提交
226 227 228
/**
 * Free the SDataRow object
 */
C
Cary Xu 已提交
229 230 231
void tdFreeDataRow(SDataRow row) {
  if (row) free(row);
}
C
Cary Xu 已提交
232

C
Cary Xu 已提交
233 234 235
SDataRow tdDataRowDup(SDataRow row) {
  SDataRow trow = malloc(dataRowLen(row));
  if (trow == NULL) return NULL;
H
hzcheng 已提交
236

C
Cary Xu 已提交
237 238 239
  dataRowCpy(trow, row);
  return trow;
}
C
Cary Xu 已提交
240 241 242

SMemRow tdMemRowDup(SMemRow row) {
  SMemRow trow = malloc(memRowTLen(row));
H
hzcheng 已提交
243 244
  if (trow == NULL) return NULL;

C
Cary Xu 已提交
245
  memRowCpy(trow, row);
H
hzcheng 已提交
246
  return trow;
H
hzcheng 已提交
247
}
C
Cary Xu 已提交
248

249
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) {
H
TD-166  
hzcheng 已提交
250 251 252
  pDataCol->type = colType(pCol);
  pDataCol->colId = colColId(pCol);
  pDataCol->bytes = colBytes(pCol);
C
Cary Xu 已提交
253
  pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE;
H
TD-166  
hzcheng 已提交
254 255 256

  pDataCol->len = 0;
}
H
Hongze Cheng 已提交
257
// value from timestamp should be TKEY here instead of TSKEY
L
Liu Jicong 已提交
258
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
H
TD-166  
hzcheng 已提交
259 260
  ASSERT(pCol != NULL && value != NULL);

K
kailixu 已提交
261 262 263
  if (isAllRowsNull(pCol)) {
    if (isNull(value, pCol->type)) {
      // all null value yet, just return
L
Liu Jicong 已提交
264
      return 0;
K
kailixu 已提交
265 266
    }

L
Liu Jicong 已提交
267
    if(tdAllocMemForCol(pCol, maxPoints) < 0) return -1;
K
kailixu 已提交
268 269
    if (numOfRows > 0) {
      // Find the first not null value, fill all previouse values as NULL
L
Liu Jicong 已提交
270
      dataColSetNEleNull(pCol, numOfRows);
K
kailixu 已提交
271 272 273
    }
  }

H
Hongze Cheng 已提交
274 275 276 277 278 279 280
  if (IS_VAR_DATA_TYPE(pCol->type)) {
    // set offset
    pCol->dataOff[numOfRows] = pCol->len;
    // Copy data
    memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
    // Update the length
    pCol->len += varDataTLen(value);
H
TD-166  
hzcheng 已提交
281
  } else {
H
Haojun Liao 已提交
282
    ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
H
Hongze Cheng 已提交
283 284
    memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
    pCol->len += pCol->bytes;
H
TD-166  
hzcheng 已提交
285
  }
L
Liu Jicong 已提交
286 287 288 289 290 291 292 293 294
  return 0;
}

static FORCE_INLINE const void *tdGetColDataOfRowUnsafe(SDataCol *pCol, int row) {
  if (IS_VAR_DATA_TYPE(pCol->type)) {
    return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
  } else {
    return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
  }
H
TD-166  
hzcheng 已提交
295 296
}

H
TD-166  
hzcheng 已提交
297
bool isNEleNull(SDataCol *pCol, int nEle) {
298
  if(isAllRowsNull(pCol)) return true;
H
Hongze Cheng 已提交
299
  for (int i = 0; i < nEle; i++) {
L
Liu Jicong 已提交
300
    if (!isNull(tdGetColDataOfRowUnsafe(pCol, i), pCol->type)) return false;
H
TD-166  
hzcheng 已提交
301
  }
H
Hongze Cheng 已提交
302
  return true;
H
TD-166  
hzcheng 已提交
303 304
}

305
static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) {
H
TD-90  
Hongze Cheng 已提交
306 307 308
  if (IS_VAR_DATA_TYPE(pCol->type)) {
    pCol->dataOff[index] = pCol->len;
    char *ptr = POINTER_SHIFT(pCol->pData, pCol->len);
309
    setVardataNull(ptr, pCol->type);
H
TD-90  
Hongze Cheng 已提交
310 311 312 313 314 315 316
    pCol->len += varDataTLen(ptr);
  } else {
    setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes);
    pCol->len += TYPE_BYTES[pCol->type];
  }
}

317
static void dataColSetNEleNull(SDataCol *pCol, int nEle) {
H
TD-90  
Hongze Cheng 已提交
318 319 320 321 322 323 324 325
  if (IS_VAR_DATA_TYPE(pCol->type)) {
    pCol->len = 0;
    for (int i = 0; i < nEle; i++) {
      dataColSetNullAt(pCol, i);
    }
  } else {
    setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
    pCol->len = TYPE_BYTES[pCol->type] * nEle;
H
TD-166  
hzcheng 已提交
326 327 328
  }
}

H
TD-166  
hzcheng 已提交
329 330 331
void dataColSetOffset(SDataCol *pCol, int nEle) {
  ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));

H
Hongze Cheng 已提交
332
  void *tptr = pCol->pData;
H
TD-166  
hzcheng 已提交
333
  // char *tptr = (char *)(pCol->pData);
H
TD-166  
hzcheng 已提交
334

H
TD-166  
hzcheng 已提交
335
  VarDataOffsetT offset = 0;
H
TD-166  
hzcheng 已提交
336
  for (int i = 0; i < nEle; i++) {
H
TD-166  
hzcheng 已提交
337
    pCol->dataOff[i] = offset;
H
TD-166  
hzcheng 已提交
338
    offset += varDataTLen(tptr);
H
hzcheng 已提交
339
    tptr = POINTER_SHIFT(tptr, varDataTLen(tptr));
H
TD-166  
hzcheng 已提交
340 341 342
  }
}

L
Liu Jicong 已提交
343
SDataCols *tdNewDataCols(int maxCols, int maxRows) {
H
Hongze Cheng 已提交
344
  SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols));
H
Haojun Liao 已提交
345
  if (pCols == NULL) {
S
Shengliang Guan 已提交
346
    uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno));
H
Haojun Liao 已提交
347 348
    return NULL;
  }
H
TD-34  
hzcheng 已提交
349

H
Hongze Cheng 已提交
350
  pCols->maxPoints = maxRows;
L
Liu Jicong 已提交
351 352 353
  pCols->maxCols = maxCols;
  pCols->numOfRows = 0;
  pCols->numOfCols = 0;
H
Hongze Cheng 已提交
354 355 356 357 358 359 360 361 362

  if (maxCols > 0) {
    pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol));
    if (pCols->cols == NULL) {
      uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols,
             strerror(errno));
      tdFreeDataCols(pCols);
      return NULL;
    }
L
Liu Jicong 已提交
363 364 365
    int i;
    for(i = 0; i < maxCols; i++) {
      pCols->cols[i].spaceSize = 0;
L
Liu Jicong 已提交
366
      pCols->cols[i].len = 0;
L
Liu Jicong 已提交
367 368 369
      pCols->cols[i].pData = NULL;
      pCols->cols[i].dataOff = NULL;
    }
H
Hongze Cheng 已提交
370 371
  }

H
TD-34  
hzcheng 已提交
372 373 374
  return pCols;
}

H
Hongze Cheng 已提交
375
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
376 377
  int i;
  int oldMaxCols = pCols->maxCols;
L
Liu Jicong 已提交
378
  if (schemaNCols(pSchema) > oldMaxCols) {
H
Hongze Cheng 已提交
379
    pCols->maxCols = schemaNCols(pSchema);
L
Liu Jicong 已提交
380 381 382
    void* ptr = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols);
    if (ptr == NULL) return -1;
    pCols->cols = ptr;
383 384 385
    for(i = oldMaxCols; i < pCols->maxCols; i++) {
      pCols->cols[i].pData = NULL;
      pCols->cols[i].dataOff = NULL;
L
Liu Jicong 已提交
386
      pCols->cols[i].spaceSize = 0;
387
    }
L
Liu Jicong 已提交
388
  }
H
Hongze Cheng 已提交
389

H
TD-34  
hzcheng 已提交
390 391 392
  tdResetDataCols(pCols);
  pCols->numOfCols = schemaNCols(pSchema);

393
  for (i = 0; i < schemaNCols(pSchema); i++) {
394
    dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints);
H
TD-34  
hzcheng 已提交
395
  }
H
Hongze Cheng 已提交
396 397
  
  return 0;
H
TD-34  
hzcheng 已提交
398 399
}

H
Hongze Cheng 已提交
400
SDataCols *tdFreeDataCols(SDataCols *pCols) {
401
  int i;
H
TD-34  
hzcheng 已提交
402
  if (pCols) {
403 404 405 406 407 408 409 410 411
    if(pCols->cols) {
      int maxCols = pCols->maxCols;
      for(i = 0; i < maxCols; i++) {
        SDataCol *pCol = &pCols->cols[i];
        tfree(pCol->pData);
      }
      free(pCols->cols);
      pCols->cols = NULL;
    }
H
TD-34  
hzcheng 已提交
412 413
    free(pCols);
  }
H
Hongze Cheng 已提交
414
  return NULL;
H
TD-34  
hzcheng 已提交
415 416
}

H
TD-100  
hzcheng 已提交
417
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
L
Liu Jicong 已提交
418
  SDataCols *pRet = tdNewDataCols(pDataCols->maxCols, pDataCols->maxPoints);
H
TD-100  
hzcheng 已提交
419 420 421 422
  if (pRet == NULL) return NULL;

  pRet->numOfCols = pDataCols->numOfCols;
  pRet->sversion = pDataCols->sversion;
H
Haojun Liao 已提交
423
  if (keepData) pRet->numOfRows = pDataCols->numOfRows;
H
TD-100  
hzcheng 已提交
424 425 426 427 428 429

  for (int i = 0; i < pDataCols->numOfCols; i++) {
    pRet->cols[i].type = pDataCols->cols[i].type;
    pRet->cols[i].colId = pDataCols->cols[i].colId;
    pRet->cols[i].bytes = pDataCols->cols[i].bytes;
    pRet->cols[i].offset = pDataCols->cols[i].offset;
H
TD-166  
hzcheng 已提交
430 431

    if (keepData) {
L
Liu Jicong 已提交
432
      if (pDataCols->cols[i].len > 0) {
L
Liu Jicong 已提交
433 434 435 436
        if(tdAllocMemForCol(&pRet->cols[i], pRet->maxPoints) < 0) {
          tdFreeDataCols(pRet);
          return NULL;
        }
L
Liu Jicong 已提交
437
        pRet->cols[i].len = pDataCols->cols[i].len;
H
Hongze Cheng 已提交
438
        memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
H
Hongze Cheng 已提交
439
        if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) {
L
Liu Jicong 已提交
440 441
          int dataOffSize = sizeof(VarDataOffsetT) * pDataCols->maxPoints;
          memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, dataOffSize);
H
Hongze Cheng 已提交
442
        }
H
TD-166  
hzcheng 已提交
443 444
      }
    }
H
TD-100  
hzcheng 已提交
445 446 447 448 449
  }

  return pRet;
}

H
TD-34  
hzcheng 已提交
450
void tdResetDataCols(SDataCols *pCols) {
B
Bomin Zhang 已提交
451 452 453 454 455
  if (pCols != NULL) {
    pCols->numOfRows = 0;
    for (int i = 0; i < pCols->maxCols; i++) {
      dataColReset(pCols->cols + i);
    }
H
TD-34  
hzcheng 已提交
456 457
  }
}
458 459

static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
H
TD-1548  
Hongze Cheng 已提交
460
  ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row));
H
TD-166  
hzcheng 已提交
461

C
Cary Xu 已提交
462
  int rcol = 0;
H
TD-90  
Hongze Cheng 已提交
463 464
  int dcol = 0;

465
  while (dcol < pCols->numOfCols) {
466
    bool setCol = 0;
467 468 469 470 471
    SDataCol *pDataCol = &(pCols->cols[dcol]);
    if (rcol >= schemaNCols(pSchema)) {
      dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
      dcol++;
      continue;
H
TD-90  
Hongze Cheng 已提交
472
    }
H
TD-166  
hzcheng 已提交
473

474 475 476
    STColumn *pRowCol = schemaColAt(pSchema, rcol);
    if (pRowCol->colId == pDataCol->colId) {
      void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
477
      if(!isNull(value, pDataCol->type)) setCol = 1;
478 479 480 481 482 483
      dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
      dcol++;
      rcol++;
    } else if (pRowCol->colId < pDataCol->colId) {
      rcol++;
    } else {
484
      if(forceSetNull || setCol) {
485
        dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
C
Cary Xu 已提交
486
      }
487
      dcol++;
C
Cary Xu 已提交
488 489 490 491 492
    }
  }
  pCols->numOfRows++;
}

C
Cary Xu 已提交
493
static void tdAppendKVRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
C
Cary Xu 已提交
494 495 496 497 498
  ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row));

  int rcol = 0;
  int dcol = 0;

499 500 501
  int nRowCols = kvRowNCols(row);

  while (dcol < pCols->numOfCols) {
502
    bool setCol = 0;
503 504 505 506 507
    SDataCol *pDataCol = &(pCols->cols[dcol]);
    if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
      dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
      ++dcol;
      continue;
C
Cary Xu 已提交
508 509
    }

510 511 512 513
    SColIdx *colIdx = kvRowColIdxAt(row, rcol);

    if (colIdx->colId == pDataCol->colId) {
      void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
514
      if(!isNull(value, pDataCol->type)) setCol = 1;
515 516 517 518 519 520
      dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
      ++dcol;
      ++rcol;
    } else if (colIdx->colId < pDataCol->colId) {
      ++rcol;
    } else {
521
      if(forceSetNull || setCol) {
C
Cary Xu 已提交
522
        dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
H
TD-1548  
Hongze Cheng 已提交
523
      }
524
      ++dcol;
H
TD-90  
Hongze Cheng 已提交
525
    }
H
TD-34  
hzcheng 已提交
526
  }
H
Haojun Liao 已提交
527
  pCols->numOfRows++;
H
TD-34  
hzcheng 已提交
528
}
H
TD-166  
hzcheng 已提交
529

530
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
C
Cary Xu 已提交
531
  if (isDataRow(row)) {
532
    tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols, forceSetNull);
C
Cary Xu 已提交
533
  } else if (isKvRow(row)) {
C
Cary Xu 已提交
534
    tdAppendKVRowToDataCol(memRowKvBody(row), pSchema, pCols, forceSetNull);
C
Cary Xu 已提交
535 536 537 538 539
  } else {
    ASSERT(0);
  }
}

540
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) {
H
Haojun Liao 已提交
541
  ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
H
TD-166  
hzcheng 已提交
542
  ASSERT(target->numOfCols == source->numOfCols);
H
Hongze Cheng 已提交
543 544 545 546 547
  int offset = 0;

  if (pOffset == NULL) {
    pOffset = &offset;
  }
H
TD-100  
hzcheng 已提交
548

H
TD-166  
hzcheng 已提交
549
  SDataCols *pTarget = NULL;
H
TD-100  
hzcheng 已提交
550

551
  if ((target->numOfRows == 0) || (dataColsKeyLast(target) < dataColsKeyAtRow(source, *pOffset))) {  // No overlap
H
Hongze Cheng 已提交
552
    ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
H
TD-166  
hzcheng 已提交
553 554
    for (int i = 0; i < rowsToMerge; i++) {
      for (int j = 0; j < source->numOfCols; j++) {
555
        if (source->cols[j].len > 0 || target->cols[j].len > 0) {
H
Hongze Cheng 已提交
556
          dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows,
H
Hongze Cheng 已提交
557 558
                           target->maxPoints);
        }
H
TD-166  
hzcheng 已提交
559
      }
H
Haojun Liao 已提交
560
      target->numOfRows++;
H
TD-166  
hzcheng 已提交
561
    }
L
lichuang 已提交
562
    (*pOffset) += rowsToMerge;
H
TD-166  
hzcheng 已提交
563 564 565 566 567
  } else {
    pTarget = tdDupDataCols(target, true);
    if (pTarget == NULL) goto _err;

    int iter1 = 0;
H
Hongze Cheng 已提交
568
    tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, pOffset, source->numOfRows,
569
                       pTarget->numOfRows + rowsToMerge, forceSetNull);
H
TD-166  
hzcheng 已提交
570
  }
H
TD-100  
hzcheng 已提交
571 572 573 574 575 576 577 578

  tdFreeDataCols(pTarget);
  return 0;

_err:
  tdFreeDataCols(pTarget);
  return -1;
}
H
TD-100  
hzcheng 已提交
579

H
TD-1438  
Hongze Cheng 已提交
580 581
// src2 data has more priority than src1
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
582
                               int limit2, int tRows, bool forceSetNull) {
H
TD-100  
hzcheng 已提交
583
  tdResetDataCols(target);
H
TD-521  
Hongze Cheng 已提交
584
  ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);
H
TD-100  
hzcheng 已提交
585

H
Haojun Liao 已提交
586
  while (target->numOfRows < tRows) {
H
TD-521  
Hongze Cheng 已提交
587
    if (*iter1 >= limit1 && *iter2 >= limit2) break;
H
TD-100  
hzcheng 已提交
588

H
TD-1548  
Hongze Cheng 已提交
589 590 591 592 593 594
    TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : dataColsKeyAt(src1, *iter1);
    TKEY  tkey1 = (*iter1 >= limit1) ? TKEY_NULL : dataColsTKeyAt(src1, *iter1);
    TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : dataColsKeyAt(src2, *iter2);
    TKEY  tkey2 = (*iter2 >= limit2) ? TKEY_NULL : dataColsTKeyAt(src2, *iter2);

    ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1)));
H
TD-100  
hzcheng 已提交
595

H
TD-1438  
Hongze Cheng 已提交
596
    if (key1 < key2) {
H
TD-100  
hzcheng 已提交
597 598
      for (int i = 0; i < src1->numOfCols; i++) {
        ASSERT(target->cols[i].type == src1->cols[i].type);
599
        if (src1->cols[i].len > 0 || target->cols[i].len > 0) {
H
Hongze Cheng 已提交
600 601 602
          dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
                           target->maxPoints);
        }
H
TD-100  
hzcheng 已提交
603 604
      }

H
Haojun Liao 已提交
605
      target->numOfRows++;
H
TD-100  
hzcheng 已提交
606
      (*iter1)++;
H
TD-1548  
Hongze Cheng 已提交
607 608 609 610
    } else if (key1 >= key2) {
      if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) {
        for (int i = 0; i < src2->numOfCols; i++) {
          ASSERT(target->cols[i].type == src2->cols[i].type);
L
Liu Jicong 已提交
611
          if (src2->cols[i].len > 0 && !isNull(src2->cols[i].pData, src2->cols[i].type)) {
H
TD-1548  
Hongze Cheng 已提交
612 613
            dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
                             target->maxPoints);
L
Liu Jicong 已提交
614 615 616
          } else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) {
            dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
                             target->maxPoints);
617 618
          } else if(target->cols[i].len > 0) {
            dataColSetNullAt(&target->cols[i], target->numOfRows);
H
TD-1548  
Hongze Cheng 已提交
619
          }
H
Hongze Cheng 已提交
620
        }
H
Hongze Cheng 已提交
621
        target->numOfRows++;
H
TD-100  
hzcheng 已提交
622
      }
H
TD-100  
hzcheng 已提交
623

H
TD-100  
hzcheng 已提交
624
      (*iter2)++;
H
TD-1438  
Hongze Cheng 已提交
625
      if (key1 == key2) (*iter1)++;
H
TD-100  
hzcheng 已提交
626
    }
H
Hongze Cheng 已提交
627 628

    ASSERT(target->numOfRows <= target->maxPoints);
H
TD-100  
hzcheng 已提交
629
  }
H
Hongze Cheng 已提交
630 631
}

H
Hongze Cheng 已提交
632 633
SKVRow tdKVRowDup(SKVRow row) {
  SKVRow trow = malloc(kvRowLen(row));
H
Hongze Cheng 已提交
634 635
  if (trow == NULL) return NULL;

H
Hongze Cheng 已提交
636
  kvRowCpy(trow, row);
H
Hongze Cheng 已提交
637 638 639
  return trow;
}

B
Bomin Zhang 已提交
640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655
static int compareColIdx(const void* a, const void* b) {
  const SColIdx* x = (const SColIdx*)a;
  const SColIdx* y = (const SColIdx*)b;
  if (x->colId > y->colId) {
    return 1;
  }
  if (x->colId < y->colId) {
    return -1;
  }
  return 0;
}

void tdSortKVRowByColIdx(SKVRow row) {
  qsort(kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), compareColIdx);
}

H
TD-90  
Hongze Cheng 已提交
656 657 658 659 660 661
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
  SColIdx *pColIdx = NULL;
  SKVRow   row = *orow;
  SKVRow   nrow = NULL;
  void *   ptr = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE);

662
  if (ptr == NULL || ((SColIdx *)ptr)->colId > colId) {  // need to add a column value to the row
C
Cary Xu 已提交
663
    int diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
664 665 666 667 668
    int nRowLen = kvRowLen(row) + sizeof(SColIdx) + diff;
    int oRowCols = kvRowNCols(row);

    ASSERT(diff > 0);
    nrow = malloc(nRowLen);
H
TD-90  
Hongze Cheng 已提交
669 670
    if (nrow == NULL) return -1;

671 672
    kvRowSetLen(nrow, nRowLen);
    kvRowSetNCols(nrow, oRowCols + 1);
H
TD-90  
Hongze Cheng 已提交
673

674 675
    memcpy(kvRowColIdx(nrow), kvRowColIdx(row), sizeof(SColIdx) * oRowCols);
    memcpy(kvRowValues(nrow), kvRowValues(row), kvRowValLen(row));
H
TD-90  
Hongze Cheng 已提交
676

677 678 679
    pColIdx = kvRowColIdxAt(nrow, oRowCols);
    pColIdx->colId = colId;
    pColIdx->offset = kvRowValLen(row);
H
TD-90  
Hongze Cheng 已提交
680

681
    memcpy(kvRowColVal(nrow, pColIdx), value, diff);  // copy new value
H
TD-90  
Hongze Cheng 已提交
682

683
    tdSortKVRowByColIdx(nrow);
H
TD-90  
Hongze Cheng 已提交
684 685 686

    *orow = nrow;
    free(row);
H
TD-90  
Hongze Cheng 已提交
687 688 689 690 691 692 693
  } else {
    ASSERT(((SColIdx *)ptr)->colId == colId);
    if (IS_VAR_DATA_TYPE(type)) {
      void *pOldVal = kvRowColVal(row, (SColIdx *)ptr);

      if (varDataTLen(value) == varDataTLen(pOldVal)) { // just update the column value in place
        memcpy(pOldVal, value, varDataTLen(value));
694 695
      } else {  // need to reallocate the memory
        int16_t nlen = kvRowLen(row) + (varDataTLen(value) - varDataTLen(pOldVal));
H
TD-90  
Hongze Cheng 已提交
696 697
        ASSERT(nlen > 0);
        nrow = malloc(nlen);
H
TD-90  
Hongze Cheng 已提交
698
        if (nrow == NULL) return -1;
H
TD-90  
Hongze Cheng 已提交
699 700 701 702

        kvRowSetLen(nrow, nlen);
        kvRowSetNCols(nrow, kvRowNCols(row));

703 704 705 706 707 708 709 710
        int zsize = sizeof(SColIdx) * kvRowNCols(row) + ((SColIdx *)ptr)->offset;
        memcpy(kvRowColIdx(nrow), kvRowColIdx(row), zsize);
        memcpy(kvRowColVal(nrow, ((SColIdx *)ptr)), value, varDataTLen(value));
        // Copy left value part
        int lsize = kvRowLen(row) - TD_KV_ROW_HEAD_SIZE - zsize - varDataTLen(pOldVal);
        if (lsize > 0) {
          memcpy(POINTER_SHIFT(nrow, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(value)),
                 POINTER_SHIFT(row, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(pOldVal)), lsize);
H
TD-90  
Hongze Cheng 已提交
711 712
        }

713 714 715 716 717
        for (int i = 0; i < kvRowNCols(nrow); i++) {
          pColIdx = kvRowColIdxAt(nrow, i);

          if (pColIdx->offset > ((SColIdx *)ptr)->offset) {
            pColIdx->offset = pColIdx->offset - varDataTLen(pOldVal) + varDataTLen(value);
H
TD-90  
Hongze Cheng 已提交
718 719 720 721
          }
        }

        *orow = nrow;
H
TD-90  
Hongze Cheng 已提交
722
        free(row);
H
TD-90  
Hongze Cheng 已提交
723 724 725 726 727 728 729
      }
    } else {
      memcpy(kvRowColVal(row, (SColIdx *)ptr), value, TYPE_BYTES[type]);
    }
  }

  return 0;
H
Hongze Cheng 已提交
730 731
}

H
TD-353  
Hongze Cheng 已提交
732
int tdEncodeKVRow(void **buf, SKVRow row) {
H
Hongze Cheng 已提交
733
  // May change the encode purpose
H
TD-353  
Hongze Cheng 已提交
734 735 736 737 738 739
  if (buf != NULL) {
    kvRowCpy(*buf, row);
    *buf = POINTER_SHIFT(*buf, kvRowLen(row));
  }

  return kvRowLen(row);
H
Hongze Cheng 已提交
740 741
}

H
Hongze Cheng 已提交
742 743
void *tdDecodeKVRow(void *buf, SKVRow *row) {
  *row = tdKVRowDup(buf);
H
TD-353  
Hongze Cheng 已提交
744
  if (*row == NULL) return NULL;
H
Hongze Cheng 已提交
745
  return POINTER_SHIFT(buf, kvRowLen(*row));
H
Hongze Cheng 已提交
746 747
}

H
Hongze Cheng 已提交
748
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) {
H
Hongze Cheng 已提交
749 750 751 752 753 754 755 756 757 758 759 760 761 762
  pBuilder->tCols = 128;
  pBuilder->nCols = 0;
  pBuilder->pColIdx = (SColIdx *)malloc(sizeof(SColIdx) * pBuilder->tCols);
  if (pBuilder->pColIdx == NULL) return -1;
  pBuilder->alloc = 1024;
  pBuilder->size = 0;
  pBuilder->buf = malloc(pBuilder->alloc);
  if (pBuilder->buf == NULL) {
    free(pBuilder->pColIdx);
    return -1;
  }
  return 0;
}

H
Hongze Cheng 已提交
763
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) {
S
TD-1848  
Shengliang Guan 已提交
764 765
  tfree(pBuilder->pColIdx);
  tfree(pBuilder->buf);
H
Hongze Cheng 已提交
766 767
}

H
Hongze Cheng 已提交
768
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
H
Hongze Cheng 已提交
769 770 771 772
  pBuilder->nCols = 0;
  pBuilder->size = 0;
}

H
Hongze Cheng 已提交
773
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
C
Cary Xu 已提交
774
  int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
H
Hongze Cheng 已提交
775 776
  if (tlen == 0) return NULL;

H
Hongze Cheng 已提交
777 778 779
  tlen += TD_KV_ROW_HEAD_SIZE;

  SKVRow row = malloc(tlen);
H
Hongze Cheng 已提交
780 781
  if (row == NULL) return NULL;

H
Hongze Cheng 已提交
782
  kvRowSetNCols(row, pBuilder->nCols);
H
Hongze Cheng 已提交
783
  kvRowSetLen(row, tlen);
H
Hongze Cheng 已提交
784

H
Hongze Cheng 已提交
785 786
  memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
  memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
H
Hongze Cheng 已提交
787 788

  return row;
789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809
}

SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2) {
#if 0
  ASSERT(memRowKey(row1) == memRowKey(row2));
  ASSERT(schemaVersion(pSchema1) == memRowVersion(row1));
  ASSERT(schemaVersion(pSchema2) == memRowVersion(row2));
  ASSERT(schemaVersion(pSchema1) >= schemaVersion(pSchema2));
#endif

  SArray *stashRow = taosArrayInit(pSchema1->numOfCols, sizeof(SColInfo));
  if (stashRow == NULL) {
    return NULL;
  }

  SMemRow  pRow = buffer;
  SDataRow dataRow = memRowDataBody(pRow);
  memRowSetType(pRow, SMEM_ROW_DATA);
  dataRowSetVersion(dataRow, schemaVersion(pSchema1));  // use latest schema version
  dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pSchema1->flen));

C
Cary Xu 已提交
810
  TDRowLenT dataLen = 0, kvLen = TD_MEM_ROW_KV_HEAD_SIZE;
811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875

  int32_t  i = 0;  // row1
  int32_t  j = 0;  // row2
  int32_t  nCols1 = schemaNCols(pSchema1);
  int32_t  nCols2 = schemaNCols(pSchema2);
  SColInfo colInfo = {0};
  int32_t  kvIdx1 = 0, kvIdx2 = 0;

  while (i < nCols1) {
    STColumn *pCol = schemaColAt(pSchema1, i);
    void *    val1 = tdGetMemRowDataOfColEx(row1, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx1);
    // if val1 != NULL, use val1;
    if (val1 != NULL && !isNull(val1, pCol->type)) {
      tdAppendColVal(dataRow, val1, pCol->type, pCol->offset);
      kvLen += tdGetColAppendLen(SMEM_ROW_KV, val1, pCol->type);
      setSColInfo(&colInfo, pCol->colId, pCol->type, val1);
      taosArrayPush(stashRow, &colInfo);
      ++i;  // next col
      continue;
    }

    void *val2 = NULL;
    while (j < nCols2) {
      STColumn *tCol = schemaColAt(pSchema2, j);
      if (tCol->colId < pCol->colId) {
        ++j;
        continue;
      }
      if (tCol->colId == pCol->colId) {
        val2 = tdGetMemRowDataOfColEx(row2, tCol->colId, tCol->type, TD_DATA_ROW_HEAD_SIZE + tCol->offset, &kvIdx2);
      } else if (tCol->colId > pCol->colId) {
        // set NULL
      }
      break;
    }  // end of while(j<nCols2)
    if (val2 == NULL) {
      val2 = (void *)getNullValue(pCol->type);
    }
    tdAppendColVal(dataRow, val2, pCol->type, pCol->offset);
    if (!isNull(val2, pCol->type)) {
      kvLen += tdGetColAppendLen(SMEM_ROW_KV, val2, pCol->type);
      setSColInfo(&colInfo, pCol->colId, pCol->type, val2);
      taosArrayPush(stashRow, &colInfo);
    }

    ++i;  // next col
  }

  dataLen = memRowTLen(pRow);

  if (kvLen < dataLen) {
    // scan stashRow and generate SKVRow
    memset(buffer, 0, sizeof(dataLen));
    SMemRow tRow = buffer;
    memRowSetType(tRow, SMEM_ROW_KV);
    SKVRow kvRow = (SKVRow)memRowKvBody(tRow);
    int16_t nKvNCols = (int16_t) taosArrayGetSize(stashRow);
    kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nKvNCols));
    kvRowSetNCols(kvRow, nKvNCols);
    memRowSetKvVersion(tRow, pSchema1->version);

    int32_t toffset = 0;
    int16_t k;
    for (k = 0; k < nKvNCols; ++k) {
      SColInfo *pColInfo = taosArrayGet(stashRow, k);
876 877
      tdAppendKvColVal(kvRow, pColInfo->colVal, true, pColInfo->colId, pColInfo->colType, toffset);
      toffset += sizeof(SColIdx);
878 879 880 881 882 883
    }
    ASSERT(kvLen == memRowTLen(tRow));
  }
  taosArrayDestroy(stashRow);
  return buffer;
}