tdataformat.c 26.9 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
slguan 已提交
15
#include "tdataformat.h"
S
Shengliang Guan 已提交
16
#include "ulog.h"
T
Tao Liu 已提交
17
#include "talgo.h"
H
TD-353  
Hongze Cheng 已提交
18
#include "tcoding.h"
H
Hongze Cheng 已提交
19
#include "wchar.h"
20
#include "tarray.h"
H
more  
hzcheng 已提交
21

22
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
C
Cary Xu 已提交
23
#if 0
H
TD-1438  
Hongze Cheng 已提交
24
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
25
                               int limit2, int tRows, bool forceSetNull);
C
Cary Xu 已提交
26
#endif
L
Liu Jicong 已提交
27
/**
 * Make sure the buffer of a data column can hold `maxPoints` rows and refresh
 * the pointers that are derived from the buffer start.
 *
 * The buffer starts with `bytes * maxPoints` bytes of values. For
 * variable-length types an offset array of `maxPoints` entries is placed
 * behind it; when TD_SUPPORT_BITMAP is defined the bitmap sits between the
 * values and the offset array.
 *
 * @param pCol      column whose buffer is (re)sized
 * @param maxPoints number of rows the buffer must be able to hold
 * @return 0 on success, -1 if realloc() fails (the column is left untouched)
 */
int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
  int valueBytes = pCol->bytes * maxPoints;
  int required = valueBytes;

  if (IS_VAR_DATA_TYPE(pCol->type)) {
    required += sizeof(VarDataOffsetT) * maxPoints;
  }

#ifdef TD_SUPPORT_BITMAP
  int32_t bitmapBytes = (int32_t)TD_BITMAP_BYTES(maxPoints);
  required += (int)bitmapBytes;
  // TODO: Currently, the compression of bitmap parts is affiliated to the column data parts, thus allocate 1 more
  // TYPE_BYTES as to comprise complete TYPE_BYTES. Otherwise, invalid read/write would be triggered.
  required += TYPE_BYTES[pCol->type];
#endif

  // Only grow; an already large enough buffer is reused as is.
  if (pCol->spaceSize < required) {
    void *grown = realloc(pCol->pData, required);
    if (grown == NULL) {
      uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)required,
             strerror(errno));
      return -1;
    }
    pCol->pData = grown;
    pCol->spaceSize = required;
  }

  // The derived pointers are recomputed on every call because pData may have moved.
#ifdef TD_SUPPORT_BITMAP
  pCol->pBitmap = POINTER_SHIFT(pCol->pData, valueBytes);
  if (IS_VAR_DATA_TYPE(pCol->type)) {
    pCol->dataOff = POINTER_SHIFT(pCol->pBitmap, bitmapBytes);
  }
#else
  if (IS_VAR_DATA_TYPE(pCol->type)) {
    pCol->dataOff = POINTER_SHIFT(pCol->pData, valueBytes);
  }
#endif

  return 0;
}

H
hzcheng 已提交
66 67 68
/**
 * Duplicate the schema and return a new object
 */
H
Hongze Cheng 已提交
69
STSchema *tdDupSchema(const STSchema *pSchema) {
H
Hongze Cheng 已提交
70 71 72

  int tlen = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
  STSchema *tSchema = (STSchema *)malloc(tlen);
H
hzcheng 已提交
73 74
  if (tSchema == NULL) return NULL;

H
Hongze Cheng 已提交
75
  memcpy((void *)tSchema, (void *)pSchema, tlen);
H
hzcheng 已提交
76 77 78 79

  return tSchema;
}

H
TD-27  
hzcheng 已提交
80 81 82
/**
 * Encode a schema to dst, and return the next pointer
 */
H
TD-353  
Hongze Cheng 已提交
83 84 85 86
int tdEncodeSchema(void **buf, STSchema *pSchema) {
  int tlen = 0;
  tlen += taosEncodeFixedI32(buf, schemaVersion(pSchema));
  tlen += taosEncodeFixedI32(buf, schemaNCols(pSchema));
H
TD-166  
hzcheng 已提交
87

H
TD-27  
hzcheng 已提交
88 89
  for (int i = 0; i < schemaNCols(pSchema); i++) {
    STColumn *pCol = schemaColAt(pSchema, i);
H
TD-353  
Hongze Cheng 已提交
90 91
    tlen += taosEncodeFixedI8(buf, colType(pCol));
    tlen += taosEncodeFixedI16(buf, colColId(pCol));
92
    tlen += taosEncodeFixedI16(buf, colBytes(pCol));
H
TD-27  
hzcheng 已提交
93 94
  }

H
TD-353  
Hongze Cheng 已提交
95
  return tlen;
H
TD-27  
hzcheng 已提交
96 97 98 99 100
}

/**
 * Decode a schema from a binary.
 */
H
TD-353  
Hongze Cheng 已提交
101
void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
H
Hongze Cheng 已提交
102
  int version = 0;
H
TD-353  
Hongze Cheng 已提交
103
  int numOfCols = 0;
H
TD-353  
Hongze Cheng 已提交
104
  STSchemaBuilder schemaBuilder;
H
TD-27  
hzcheng 已提交
105

H
TD-353  
Hongze Cheng 已提交
106 107
  buf = taosDecodeFixedI32(buf, &version);
  buf = taosDecodeFixedI32(buf, &numOfCols);
H
TD-27  
hzcheng 已提交
108

H
Hongze Cheng 已提交
109 110
  if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;

H
TD-353  
Hongze Cheng 已提交
111
  for (int i = 0; i < numOfCols; i++) {
H
TD-27  
hzcheng 已提交
112 113
    int8_t  type = 0;
    int16_t colId = 0;
114
    int16_t bytes = 0;
H
TD-353  
Hongze Cheng 已提交
115 116
    buf = taosDecodeFixedI8(buf, &type);
    buf = taosDecodeFixedI16(buf, &colId);
117
    buf = taosDecodeFixedI16(buf, &bytes);
H
Hongze Cheng 已提交
118 119 120 121
    if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
      tdDestroyTSchemaBuilder(&schemaBuilder);
      return NULL;
    }
H
TD-27  
hzcheng 已提交
122 123
  }

H
TD-353  
Hongze Cheng 已提交
124
  *pRSchema = tdGetSchemaFromBuilder(&schemaBuilder);
H
Hongze Cheng 已提交
125
  tdDestroyTSchemaBuilder(&schemaBuilder);
H
TD-353  
Hongze Cheng 已提交
126
  return buf;
H
Hongze Cheng 已提交
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
}

/**
 * Prepare a schema builder: allocate room for 256 columns and reset the
 * running counters for the given schema version.
 *
 * @return 0 on success, -1 if pBuilder is NULL or the allocation fails
 */
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  if (pBuilder == NULL) {
    return -1;
  }

  pBuilder->tCols = 256;
  pBuilder->columns = (STColumn *)malloc(sizeof(STColumn) * pBuilder->tCols);
  if (pBuilder->columns != NULL) {
    tdResetTSchemaBuilder(pBuilder, version);
    return 0;
  }

  return -1;
}

/**
 * Release the column array owned by a schema builder. A NULL builder is ignored.
 */
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder == NULL) {
    return;
  }

  tfree(pBuilder->columns);
}

/**
 * Restart a schema builder for a new schema: set the version and clear the
 * column count and all accumulated lengths. The column array is kept.
 */
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  pBuilder->version = version;
  pBuilder->nCols = 0;

  // accumulated lengths
  pBuilder->tlen = 0;
  pBuilder->flen = 0;
  pBuilder->vlen = 0;
}

154
/**
 * Append one column to the schema being built.
 *
 * The column offset is the previous column's offset plus TYPE_BYTES of the
 * previous column's type. `flen` grows by TYPE_BYTES[type] for every column.
 * For variable-length types `tlen` additionally grows by `bytes` and `vlen`
 * by `bytes - sizeof(VarDataLenT)`; for other types both grow by
 * TYPE_BYTES[type] and the `bytes` argument is ignored.
 *
 * @param pBuilder builder to append to
 * @param type     column data type, checked with isValidDataType()
 * @param colId    column id
 * @param bytes    declared width, only used for variable-length types
 * @return 0 on success, -1 on an invalid type or if the column array cannot
 *         be grown (the builder is left unchanged in both cases)
 */
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes) {
  if (!isValidDataType(type)) return -1;

  if (pBuilder->nCols >= pBuilder->tCols) {
    // Commit the new capacity only after realloc() succeeded. Doubling tCols
    // first would leave the builder claiming more room than it owns when the
    // allocation fails, and a later call would then write past the array.
    int       newCapacity = pBuilder->tCols * 2;
    STColumn *columns = (STColumn *)realloc(pBuilder->columns, sizeof(STColumn) * newCapacity);
    if (columns == NULL) return -1;
    pBuilder->columns = columns;
    pBuilder->tCols = newCapacity;
  }

  STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
  colSetType(pCol, type);
  colSetColId(pCol, colId);
  if (pBuilder->nCols == 0) {
    colSetOffset(pCol, 0);
  } else {
    STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols - 1]);
    colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
  }

  if (IS_VAR_DATA_TYPE(type)) {
    colSetBytes(pCol, bytes);
    pBuilder->tlen += (TYPE_BYTES[type] + bytes);
    pBuilder->vlen += bytes - sizeof(VarDataLenT);
  } else {
    colSetBytes(pCol, TYPE_BYTES[type]);
    pBuilder->tlen += TYPE_BYTES[type];
    pBuilder->vlen += TYPE_BYTES[type];
  }

  pBuilder->nCols++;
  pBuilder->flen += TYPE_BYTES[type];

  ASSERT(pCol->offset < pBuilder->flen);

  return 0;
}

/**
 * Materialise the columns collected in a builder as a standalone schema.
 *
 * The result is malloc()ed and holds its own copy of the column array, so it
 * stays valid after the builder is reset or destroyed.
 *
 * @return the new schema, or NULL if the builder holds no column or the
 *         allocation fails
 */
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder->nCols <= 0) {
    return NULL;
  }

  size_t colsBytes = sizeof(STColumn) * pBuilder->nCols;
  int    totalBytes = sizeof(STSchema) + colsBytes;

  STSchema *pResult = (STSchema *)malloc(totalBytes);
  if (pResult == NULL) {
    return NULL;
  }

  schemaVersion(pResult) = pBuilder->version;
  schemaNCols(pResult) = pBuilder->nCols;
  schemaTLen(pResult) = pBuilder->tlen;
  schemaFLen(pResult) = pBuilder->flen;
  schemaVLen(pResult) = pBuilder->vlen;

#ifdef TD_SUPPORT_BITMAP
  // with bitmap support the total length also counts TD_BITMAP_BYTES(number of columns)
  schemaTLen(pResult) += (int)TD_BITMAP_BYTES(schemaNCols(pResult));
#endif

  memcpy(schemaColAt(pResult, 0), pBuilder->columns, colsBytes);

  return pResult;
}

C
Cary Xu 已提交
215
#if 0
H
hzcheng 已提交
216 217 218
/**
 * Initialize a data row
 */
H
TD-90  
Hongze Cheng 已提交
219 220 221 222
void tdInitDataRow(SDataRow row, STSchema *pSchema) {
  dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema));
  dataRowSetVersion(row, schemaVersion(pSchema));
}
H
hzcheng 已提交
223

C
Cary Xu 已提交
224 225
/**
 * Allocate a data row of dataRowMaxBytesFromSchema(pSchema) bytes and
 * initialise it for the schema.
 *
 * @return the new row, or NULL if the allocation fails
 */
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
  int32_t maxBytes = dataRowMaxBytesFromSchema(pSchema);

  SDataRow pNew = malloc(maxBytes);
  if (pNew != NULL) {
    tdInitDataRow(pNew, pSchema);
  }

  return pNew;
}
H
hzcheng 已提交
233

H
hzcheng 已提交
234 235 236
/**
 * Free the SDataRow object
 */
C
Cary Xu 已提交
237 238 239
void tdFreeDataRow(SDataRow row) {
  if (row) free(row);
}
C
Cary Xu 已提交
240

C
Cary Xu 已提交
241 242 243
/**
 * Duplicate a data row into a buffer of dataRowLen(row) bytes.
 *
 * @return the copy, or NULL if the allocation fails
 */
SDataRow tdDataRowDup(SDataRow row) {
  SDataRow pCopy = malloc(dataRowLen(row));
  if (pCopy != NULL) {
    dataRowCpy(pCopy, row);
  }

  return pCopy;
}
C
Cary Xu 已提交
248 249 250

/**
 * Duplicate a memory row into a buffer of memRowTLen(row) bytes.
 *
 * @return the copy, or NULL if the allocation fails
 */
SMemRow tdMemRowDup(SMemRow row) {
  SMemRow pCopy = malloc(memRowTLen(row));
  if (pCopy != NULL) {
    memRowCpy(pCopy, row);
  }

  return pCopy;
}
C
Cary Xu 已提交
256
#endif
C
Cary Xu 已提交
257

258
/**
 * Bind a data column to a schema column: copy type, column id, width and
 * offset, and mark the column as holding no data (len = 0).
 *
 * No memory is allocated here; `maxPoints` is not used by this function.
 */
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) {
  pDataCol->len = 0;

  pDataCol->type = colType(pCol);
  pDataCol->colId = colColId(pCol);
  pDataCol->bytes = colBytes(pCol);
  // TD_DATA_ROW_HEAD_SIZE is deliberately not added to the offset (see the trailing "+ 0")
  pDataCol->offset = colOffset(pCol) + 0;
}
H
Hongze Cheng 已提交
266
// value from timestamp should be TKEY here instead of TSKEY
L
Liu Jicong 已提交
267
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
H
TD-166  
hzcheng 已提交
268 269
  ASSERT(pCol != NULL && value != NULL);

K
kailixu 已提交
270 271 272
  if (isAllRowsNull(pCol)) {
    if (isNull(value, pCol->type)) {
      // all null value yet, just return
L
Liu Jicong 已提交
273
      return 0;
K
kailixu 已提交
274 275
    }

L
Liu Jicong 已提交
276
    if(tdAllocMemForCol(pCol, maxPoints) < 0) return -1;
K
kailixu 已提交
277 278
    if (numOfRows > 0) {
      // Find the first not null value, fill all previouse values as NULL
L
Liu Jicong 已提交
279
      dataColSetNEleNull(pCol, numOfRows);
K
kailixu 已提交
280 281 282
    }
  }

H
Hongze Cheng 已提交
283 284 285 286 287 288 289
  if (IS_VAR_DATA_TYPE(pCol->type)) {
    // set offset
    pCol->dataOff[numOfRows] = pCol->len;
    // Copy data
    memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
    // Update the length
    pCol->len += varDataTLen(value);
H
TD-166  
hzcheng 已提交
290
  } else {
H
Haojun Liao 已提交
291
    ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
H
Hongze Cheng 已提交
292 293
    memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
    pCol->len += pCol->bytes;
H
TD-166  
hzcheng 已提交
294
  }
L
Liu Jicong 已提交
295 296 297 298 299 300 301 302 303
  return 0;
}

/**
 * Address of the value stored for `row` in a column.
 *
 * "Unsafe": neither the row index nor the presence of a buffer is checked.
 */
static FORCE_INLINE const void *tdGetColDataOfRowUnsafe(SDataCol *pCol, int row) {
  if (!IS_VAR_DATA_TYPE(pCol->type)) {
    // fixed-width values are stored back to back
    return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
  }

  // variable-length values are located through the offset array
  return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
}

H
TD-166  
hzcheng 已提交
306
/**
 * Tell whether the first `nEle` rows of a column are all NULL.
 *
 * A column for which isAllRowsNull() holds is reported as NULL without
 * looking at any row.
 */
bool isNEleNull(SDataCol *pCol, int nEle) {
  if (isAllRowsNull(pCol)) {
    return true;
  }

  // advance to the first non-NULL row, if there is one
  int row = 0;
  while (row < nEle && isNull(tdGetColDataOfRowUnsafe(pCol, row), pCol->type)) {
    row++;
  }

  return row >= nEle;
}

314
/**
 * Store a NULL for row `index` and grow the column length accordingly.
 *
 * Fixed-width types write the NULL into the slot of `index`; variable-length
 * types append a NULL value at the current end of the data and record its
 * offset for `index`.
 */
static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) {
  if (!IS_VAR_DATA_TYPE(pCol->type)) {
    setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes);
    pCol->len += TYPE_BYTES[pCol->type];
    return;
  }

  char *pTail = POINTER_SHIFT(pCol->pData, pCol->len);
  pCol->dataOff[index] = pCol->len;
  setVardataNull(pTail, pCol->type);
  pCol->len += varDataTLen(pTail);
}

326
/**
 * Overwrite the column so that it consists of exactly `nEle` NULL rows.
 * The previous column length is discarded.
 */
static void dataColSetNEleNull(SDataCol *pCol, int nEle) {
  if (!IS_VAR_DATA_TYPE(pCol->type)) {
    setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
    pCol->len = TYPE_BYTES[pCol->type] * nEle;
    return;
  }

  // variable-length: restart from an empty column and append NULLs one by one
  pCol->len = 0;
  for (int row = 0; row < nEle; ++row) {
    dataColSetNullAt(pCol, row);
  }
}

C
Cary Xu 已提交
338
/**
 * Rebuild the offset array of a BINARY/NCHAR column by walking the
 * variable-length values stored back to back in pData.
 *
 * @param pCol column whose dataOff[0 .. nEle-1] is rewritten
 * @param nEle number of values to scan
 * @return the position one further entry behind the cursor left by the scan
 */
void *dataColSetOffset(SDataCol *pCol, int nEle) {
  ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));

  void          *pCursor = pCol->pData;
  VarDataOffsetT running = 0;
  int            row = 0;

  while (row < nEle) {
    pCol->dataOff[row] = running;
    running += varDataTLen(pCursor);
    pCursor = POINTER_SHIFT(pCursor, varDataTLen(pCursor));
    row++;
  }

  // NOTE(review): pCursor already points behind the nEle-th value here, so this
  // reads the length header of whatever follows and skips it too - confirm that
  // callers rely on this.
  return POINTER_SHIFT(pCursor, varDataTLen(pCursor));
}

L
Liu Jicong 已提交
353
SDataCols *tdNewDataCols(int maxCols, int maxRows) {
H
Hongze Cheng 已提交
354
  SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols));
H
Haojun Liao 已提交
355
  if (pCols == NULL) {
S
Shengliang Guan 已提交
356
    uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno));
H
Haojun Liao 已提交
357 358
    return NULL;
  }
H
TD-34  
hzcheng 已提交
359

H
Hongze Cheng 已提交
360
  pCols->maxPoints = maxRows;
L
Liu Jicong 已提交
361 362 363
  pCols->maxCols = maxCols;
  pCols->numOfRows = 0;
  pCols->numOfCols = 0;
H
Hongze Cheng 已提交
364 365 366 367 368 369 370 371 372

  if (maxCols > 0) {
    pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol));
    if (pCols->cols == NULL) {
      uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols,
             strerror(errno));
      tdFreeDataCols(pCols);
      return NULL;
    }
L
Liu Jicong 已提交
373 374 375
    int i;
    for(i = 0; i < maxCols; i++) {
      pCols->cols[i].spaceSize = 0;
L
Liu Jicong 已提交
376
      pCols->cols[i].len = 0;
L
Liu Jicong 已提交
377 378 379
      pCols->cols[i].pData = NULL;
      pCols->cols[i].dataOff = NULL;
    }
H
Hongze Cheng 已提交
380 381
  }

H
TD-34  
hzcheng 已提交
382 383 384
  return pCols;
}

H
Hongze Cheng 已提交
385
/**
 * Re-initialise a column set for a schema.
 *
 * The descriptor array is grown when the schema has more columns than the
 * set can currently hold, all columns are reset, and the first
 * schemaNCols(pSchema) descriptors are bound to the schema columns.
 *
 * @param pCols   column set to (re)initialise
 * @param pSchema schema describing the columns
 * @return 0 on success, -1 if the descriptor array cannot be grown (the set
 *         is left unchanged in that case)
 */
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
  int i;
  int oldMaxCols = pCols->maxCols;
  int nSchemaCols = schemaNCols(pSchema);

  if (nSchemaCols > oldMaxCols) {
    // Raise maxCols only after realloc() succeeded. Otherwise a failed
    // allocation leaves maxCols larger than the array, and both
    // tdResetDataCols() and tdFreeDataCols() would walk past its end.
    void *ptr = realloc(pCols->cols, sizeof(SDataCol) * nSchemaCols);
    if (ptr == NULL) return -1;
    pCols->cols = ptr;
    pCols->maxCols = nSchemaCols;

    // realloc() does not initialise the new tail: mark those slots as "no buffer"
    for (i = oldMaxCols; i < pCols->maxCols; i++) {
      pCols->cols[i].pData = NULL;
      pCols->cols[i].dataOff = NULL;
      pCols->cols[i].spaceSize = 0;
    }
  }

  tdResetDataCols(pCols);
  pCols->numOfCols = nSchemaCols;

  for (i = 0; i < nSchemaCols; i++) {
    dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints);
  }

  return 0;
}

H
Hongze Cheng 已提交
410
/**
 * Release a column set together with every per-column data buffer.
 *
 * @param pCols set to free; NULL is accepted
 * @return always NULL
 */
SDataCols *tdFreeDataCols(SDataCols *pCols) {
  if (pCols == NULL) {
    return NULL;
  }

  if (pCols->cols != NULL) {
    // all maxCols slots may own a buffer, not just the numOfCols in use
    int nSlots = pCols->maxCols;
    for (int c = 0; c < nSlots; ++c) {
      tfree(pCols->cols[c].pData);
    }
    free(pCols->cols);
    pCols->cols = NULL;
  }

  free(pCols);
  return NULL;
}

H
TD-100  
hzcheng 已提交
427
/**
 * Duplicate a column set.
 *
 * The column descriptors (type, id, width, offset) are always copied. When
 * `keepData` is true the row count is copied as well, and for every column
 * that holds data (len > 0) the first `len` bytes of its buffer are copied,
 * plus the offset array for variable-length types.
 *
 * @return the duplicate, or NULL if an allocation fails
 */
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
  SDataCols *pRet = tdNewDataCols(pDataCols->maxCols, pDataCols->maxPoints);
  if (pRet == NULL) {
    return NULL;
  }

  pRet->numOfCols = pDataCols->numOfCols;
  pRet->sversion = pDataCols->sversion;
  if (keepData) {
    pRet->numOfRows = pDataCols->numOfRows;
  }

  for (int c = 0; c < pDataCols->numOfCols; ++c) {
    SDataCol *pSrc = &pDataCols->cols[c];
    SDataCol *pDst = &pRet->cols[c];

    pDst->type = pSrc->type;
    pDst->colId = pSrc->colId;
    pDst->bytes = pSrc->bytes;
    pDst->offset = pSrc->offset;

    // nothing more to do for columns without data, or when data is not wanted
    if (!keepData || pSrc->len <= 0) {
      continue;
    }

    if (tdAllocMemForCol(pDst, pRet->maxPoints) < 0) {
      tdFreeDataCols(pRet);
      return NULL;
    }

    pDst->len = pSrc->len;
    memcpy(pDst->pData, pSrc->pData, pSrc->len);

    if (IS_VAR_DATA_TYPE(pDst->type)) {
      // the whole offset array (maxPoints entries) is copied, not just the used rows
      int dataOffSize = sizeof(VarDataOffsetT) * pDataCols->maxPoints;
      memcpy(pDst->dataOff, pSrc->dataOff, dataOffSize);
    }
  }

  return pRet;
}

H
TD-34  
hzcheng 已提交
460
/**
 * Empty a column set: zero the row count and call dataColReset() on every
 * one of its maxCols descriptors. A NULL set is ignored.
 */
void tdResetDataCols(SDataCols *pCols) {
  if (pCols == NULL) {
    return;
  }

  pCols->numOfRows = 0;

  int c = 0;
  while (c < pCols->maxCols) {
    dataColReset(pCols->cols + c);
    c++;
  }
}
C
Cary Xu 已提交
468
#if 0
469
/**
 * Append one data row to a column set as a new row.
 *
 * Schema columns and target columns are walked in parallel and matched on
 * column id, so both are expected to be ordered by id. Target columns behind
 * the end of the schema always receive NULL; target columns missing from the
 * schema receive NULL when `forceSetNull` is set.
 */
static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
  ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row));

  int iSchema = 0;  // cursor in the row schema
  int iTarget = 0;  // cursor in the target columns

  while (iTarget < pCols->numOfCols) {
    bool      hasValue = 0;
    SDataCol *pDataCol = &(pCols->cols[iTarget]);

    if (iSchema >= schemaNCols(pSchema)) {
      // schema exhausted: the remaining target columns get NULL
      dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
      iTarget++;
      continue;
    }

    STColumn *pRowCol = schemaColAt(pSchema, iSchema);

    if (pRowCol->colId < pDataCol->colId) {
      // the row carries a column the target does not have: skip it
      iSchema++;
    } else if (pRowCol->colId == pDataCol->colId) {
      void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
      if (!isNull(value, pDataCol->type)) hasValue = 1;
      dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
      iTarget++;
      iSchema++;
    } else {
      // the target has a column the row lacks
      if (forceSetNull || hasValue) {
        dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
      }
      iTarget++;
    }
  }

  pCols->numOfRows++;
}

C
Cary Xu 已提交
503
/**
 * Append one key-value row to a column set as a new row.
 *
 * The row's column index entries and the target columns are walked in
 * parallel and matched on column id. Target columns behind the end of the row
 * (or of the schema) always receive NULL; target columns missing from the row
 * receive NULL when `forceSetNull` is set.
 */
static void tdAppendKVRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
  ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row));

  int iRow = 0;     // cursor in the row's column index
  int iTarget = 0;  // cursor in the target columns
  int nRowCols = kvRowNCols(row);

  while (iTarget < pCols->numOfCols) {
    bool      hasValue = 0;
    SDataCol *pDataCol = &(pCols->cols[iTarget]);

    if (iRow >= nRowCols || iRow >= schemaNCols(pSchema)) {
      // row exhausted: the remaining target columns get NULL
      dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
      ++iTarget;
      continue;
    }

    SColIdx *colIdx = kvRowColIdxAt(row, iRow);

    if (colIdx->colId < pDataCol->colId) {
      // the row carries a column the target does not have: skip it
      ++iRow;
    } else if (colIdx->colId == pDataCol->colId) {
      void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
      if (!isNull(value, pDataCol->type)) hasValue = 1;
      dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
      ++iTarget;
      ++iRow;
    } else {
      // the target has a column the row lacks
      if (forceSetNull || hasValue) {
        dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
      }
      ++iTarget;
    }
  }

  pCols->numOfRows++;
}
H
TD-166  
hzcheng 已提交
539

540
/**
 * Append a memory row to a column set, dispatching on its layout
 * (data row or key-value row). Any other layout trips an assertion.
 */
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
  if (isDataRow(row)) {
    tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols, forceSetNull);
    return;
  }

  if (isKvRow(row)) {
    tdAppendKVRowToDataCol(memRowKvBody(row), pSchema, pCols, forceSetNull);
    return;
  }

  ASSERT(0);
}

550
/**
 * Merge `rowsToMerge` rows of `source`, starting at `*pOffset`, into `target`.
 *
 * When the target is empty or its last key is smaller than the first source
 * key to merge, the rows are simply appended and `*pOffset` is advanced.
 * Otherwise the target is duplicated and rebuilt by merging the duplicate
 * with the source through tdMergeTwoDataCols().
 *
 * @param pOffset start row in `source`; may be NULL, which means row 0
 * @return 0 on success, -1 if the target could not be duplicated
 */
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) {
  ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
  ASSERT(target->numOfCols == source->numOfCols);

  int localOffset = 0;
  if (pOffset == NULL) {
    pOffset = &localOffset;
  }

  if ((target->numOfRows == 0) || (dataColsKeyLast(target) < dataColsKeyAtRow(source, *pOffset))) {
    // No overlap: plain append, row by row
    ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);

    for (int r = 0; r < rowsToMerge; r++) {
      for (int c = 0; c < source->numOfCols; c++) {
        // columns that are empty on both sides stay empty
        if (source->cols[c].len > 0 || target->cols[c].len > 0) {
          dataColAppendVal(target->cols + c, tdGetColDataOfRow(source->cols + c, r + (*pOffset)), target->numOfRows,
                           target->maxPoints);
        }
      }
      target->numOfRows++;
    }

    (*pOffset) += rowsToMerge;
    return 0;
  }

  // Overlap: merge a snapshot of the target with the source back into the target
  SDataCols *pSnapshot = tdDupDataCols(target, true);
  if (pSnapshot == NULL) {
    return -1;
  }

  int iter1 = 0;
  tdMergeTwoDataCols(target, pSnapshot, &iter1, pSnapshot->numOfRows, source, pOffset, source->numOfRows,
                     pSnapshot->numOfRows + rowsToMerge, forceSetNull);

  tdFreeDataCols(pSnapshot);
  return 0;
}
H
TD-100  
hzcheng 已提交
589

H
TD-1438  
Hongze Cheng 已提交
590 591
/**
 * Merge two key-ordered column sets into `target` (which is reset first)
 * until `tRows` rows were produced or both inputs are exhausted.
 *
 * src2 data has more priority than src1: on equal keys the src2 row wins,
 * unless its key is flagged as deleted, in which case no row is emitted and
 * both inputs advance.
 *
 * @param iter1,iter2   in/out read positions in src1 / src2
 * @param limit1,limit2 exclusive end positions in src1 / src2
 * @param forceSetNull  when false, a column that src2 does not provide on an
 *                      equal key falls back to the src1 value
 */
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
                               int limit2, int tRows, bool forceSetNull) {
  tdResetDataCols(target);
  ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);

  while (target->numOfRows < tRows) {
    bool done1 = (*iter1 >= limit1);
    bool done2 = (*iter2 >= limit2);
    if (done1 && done2) break;

    // an exhausted input behaves like an infinitely large key
    TSKEY key1 = done1 ? INT64_MAX : dataColsKeyAt(src1, *iter1);
    TKEY  tkey1 = done1 ? TKEY_NULL : dataColsTKeyAt(src1, *iter1);
    TSKEY key2 = done2 ? INT64_MAX : dataColsKeyAt(src2, *iter2);
    TKEY  tkey2 = done2 ? TKEY_NULL : dataColsTKeyAt(src2, *iter2);

    ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1)));

    if (key1 < key2) {
      // take the row from src1
      for (int c = 0; c < src1->numOfCols; c++) {
        ASSERT(target->cols[c].type == src1->cols[c].type);
        if (src1->cols[c].len > 0 || target->cols[c].len > 0) {
          dataColAppendVal(&(target->cols[c]), tdGetColDataOfRow(src1->cols + c, *iter1), target->numOfRows,
                           target->maxPoints);
        }
      }

      target->numOfRows++;
      (*iter1)++;
    } else {
      // here key1 >= key2; on equal keys a deleted src2 key suppresses the row
      if ((key1 > key2) || !TKEY_IS_DELETED(tkey2)) {
        for (int c = 0; c < src2->numOfCols; c++) {
          ASSERT(target->cols[c].type == src2->cols[c].type);
          if (src2->cols[c].len > 0 && !isNull(src2->cols[c].pData, src2->cols[c].type)) {
            dataColAppendVal(&(target->cols[c]), tdGetColDataOfRow(src2->cols + c, *iter2), target->numOfRows,
                             target->maxPoints);
          } else if (!forceSetNull && key1 == key2 && src1->cols[c].len > 0) {
            dataColAppendVal(&(target->cols[c]), tdGetColDataOfRow(src1->cols + c, *iter1), target->numOfRows,
                             target->maxPoints);
          } else if (target->cols[c].len > 0) {
            dataColSetNullAt(&target->cols[c], target->numOfRows);
          }
        }
        target->numOfRows++;
      }

      (*iter2)++;
      if (key1 == key2) (*iter1)++;
    }

    ASSERT(target->numOfRows <= target->maxPoints);
  }
}
C
Cary Xu 已提交
641
#endif
H
Hongze Cheng 已提交
642

H
Hongze Cheng 已提交
643 644
/**
 * Duplicate a key-value row into a buffer of kvRowLen(row) bytes.
 *
 * @return the copy, or NULL if the allocation fails
 */
SKVRow tdKVRowDup(SKVRow row) {
  SKVRow pCopy = malloc(kvRowLen(row));
  if (pCopy != NULL) {
    kvRowCpy(pCopy, row);
  }

  return pCopy;
}

B
Bomin Zhang 已提交
651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666
/**
 * qsort() comparator ordering SColIdx entries by ascending column id.
 *
 * @return 1, -1 or 0 when a's id is greater than, less than or equal to b's
 */
static int compareColIdx(const void* a, const void* b) {
  const SColIdx* lhs = (const SColIdx*)a;
  const SColIdx* rhs = (const SColIdx*)b;

  // (greater) - (less) yields exactly 1, -1 or 0
  return (lhs->colId > rhs->colId) - (lhs->colId < rhs->colId);
}

/**
 * Sort the column index entries of a key-value row by column id.
 * Only the index entries are reordered; the value area is not touched.
 */
void tdSortKVRowByColIdx(SKVRow row) {
  void *pIndex = kvRowColIdx(row);

  qsort(pIndex, kvRowNCols(row), sizeof(SColIdx), compareColIdx);
}

H
TD-90  
Hongze Cheng 已提交
667 668 669 670 671 672
/**
 * Set the value of column `colId` in a key-value row.
 *
 * Three cases are handled:
 *  - the column is not in the row yet: a larger row is allocated, the value
 *    is appended and the column index is re-sorted;
 *  - the column exists and the new value occupies the same number of bytes
 *    (always true for fixed-width types): the value is overwritten in place;
 *  - the column exists with a variable-length value of different size: a new
 *    row is allocated and the offsets of the values behind it are shifted.
 *
 * @param orow  in/out row; when a new row had to be allocated the old one is
 *              freed and `*orow` is replaced
 * @param colId column to set
 * @param type  data type of `value`
 * @param value value to store; variable-length values carry their own length
 * @return 0 on success, -1 if an allocation fails (`*orow` is then unchanged)
 */
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
  SKVRow   row = *orow;
  SKVRow   nrow = NULL;
  SColIdx *pColIdx = NULL;
  // first index entry whose column id is >= colId, if any
  SColIdx *pHit = (SColIdx *)taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE);

  if (pHit == NULL || pHit->colId > colId) {
    // Case 1: need to add a column value to the row
    int diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
    int nRowLen = kvRowLen(row) + sizeof(SColIdx) + diff;
    int oRowCols = kvRowNCols(row);

    ASSERT(diff > 0);
    nrow = malloc(nRowLen);
    if (nrow == NULL) return -1;

    kvRowSetLen(nrow, nRowLen);
    kvRowSetNCols(nrow, oRowCols + 1);

    memcpy(kvRowColIdx(nrow), kvRowColIdx(row), sizeof(SColIdx) * oRowCols);
    memcpy(kvRowValues(nrow), kvRowValues(row), kvRowValLen(row));

    // the new value goes behind all existing values
    pColIdx = kvRowColIdxAt(nrow, oRowCols);
    pColIdx->colId = colId;
    pColIdx->offset = kvRowValLen(row);

    memcpy(kvRowColVal(nrow, pColIdx), value, diff);  // copy new value

    tdSortKVRowByColIdx(nrow);

    *orow = nrow;
    free(row);
    return 0;
  }

  ASSERT(pHit->colId == colId);

  if (!IS_VAR_DATA_TYPE(type)) {
    // Case 2a: fixed-width value, overwrite in place
    memcpy(kvRowColVal(row, pHit), value, TYPE_BYTES[type]);
    return 0;
  }

  void *pOldVal = kvRowColVal(row, pHit);

  if (varDataTLen(value) == varDataTLen(pOldVal)) {
    // Case 2b: same size, just update the column value in place
    memcpy(pOldVal, value, varDataTLen(value));
    return 0;
  }

  // Case 3: size changed, need to reallocate the memory
  int16_t nlen = kvRowLen(row) + (varDataTLen(value) - varDataTLen(pOldVal));
  ASSERT(nlen > 0);
  nrow = malloc(nlen);
  if (nrow == NULL) return -1;

  kvRowSetLen(nrow, nlen);
  kvRowSetNCols(nrow, kvRowNCols(row));

  // copy the index entries plus every value in front of the replaced one
  int zsize = sizeof(SColIdx) * kvRowNCols(row) + pHit->offset;
  memcpy(kvRowColIdx(nrow), kvRowColIdx(row), zsize);
  memcpy(kvRowColVal(nrow, pHit), value, varDataTLen(value));

  // copy the values behind the replaced one
  int lsize = kvRowLen(row) - TD_KV_ROW_HEAD_SIZE - zsize - varDataTLen(pOldVal);
  if (lsize > 0) {
    memcpy(POINTER_SHIFT(nrow, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(value)),
           POINTER_SHIFT(row, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(pOldVal)), lsize);
  }

  // values stored behind the replaced one moved by the size difference
  for (int i = 0; i < kvRowNCols(nrow); i++) {
    pColIdx = kvRowColIdxAt(nrow, i);

    if (pColIdx->offset > pHit->offset) {
      pColIdx->offset = pColIdx->offset - varDataTLen(pOldVal) + varDataTLen(value);
    }
  }

  // pHit and pOldVal point into the old row, so it is freed last
  *orow = nrow;
  free(row);
  return 0;
}

H
TD-353  
Hongze Cheng 已提交
743
/**
 * Encode a key-value row by copying it verbatim.
 *
 * @param buf in/out write cursor; when NULL nothing is written and only the
 *            length is reported
 * @param row row to encode
 * @return kvRowLen(row)
 */
int tdEncodeKVRow(void **buf, SKVRow row) {
  // May change the encode purpose
  if (buf == NULL) {
    return kvRowLen(row);
  }

  kvRowCpy(*buf, row);
  *buf = POINTER_SHIFT(*buf, kvRowLen(row));

  return kvRowLen(row);
}

H
Hongze Cheng 已提交
753 754
/**
 * Decode a KV row located at buf.
 *
 * The row is duplicated with tdKVRowDup() and the duplicate is stored in *row
 * (NULL is stored when duplication fails).
 *
 * @param buf  Start of the encoded row.
 * @param row  Output: receives the result of tdKVRowDup(buf).
 * @return     buf shifted by kvRowLen(*row), or NULL when tdKVRowDup()
 *             returned NULL.
 */
void *tdDecodeKVRow(void *buf, SKVRow *row) {
  SKVRow dup = tdKVRowDup(buf);
  *row = dup;

  return (dup != NULL) ? POINTER_SHIFT(buf, kvRowLen(dup)) : NULL;
}

H
Hongze Cheng 已提交
759
/**
 * Initialise a KV row builder with room for 128 column indexes and a
 * 1024-byte value buffer.
 *
 * On failure neither pColIdx nor buf is left pointing at freed or
 * uninitialised memory: both are NULL, so a subsequent
 * tdDestroyKVRowBuilder() on the same builder cannot double-free.
 *
 * @param pBuilder  Builder to initialise; must not be NULL.
 * @return          0 on success, -1 if either allocation fails.
 */
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) {
  pBuilder->tCols = 128;
  pBuilder->nCols = 0;
  // Make sure buf holds a defined value even if the first allocation fails.
  pBuilder->buf = NULL;
  pBuilder->pColIdx = (SColIdx *)malloc(sizeof(SColIdx) * pBuilder->tCols);
  if (pBuilder->pColIdx == NULL) return -1;
  pBuilder->alloc = 1024;
  pBuilder->size = 0;
  pBuilder->buf = malloc(pBuilder->alloc);
  if (pBuilder->buf == NULL) {
    free(pBuilder->pColIdx);
    // Do not leave a dangling pointer behind for the destroy routine.
    pBuilder->pColIdx = NULL;
    return -1;
  }
  return 0;
}

H
Hongze Cheng 已提交
774
/**
 * Release the memory owned by a KV row builder.
 *
 * Both the value buffer and the column-index array are released with tfree().
 * The builder structure itself is not freed here.
 *
 * @param pBuilder  Builder whose buffers are released.
 */
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) {
  tfree(pBuilder->buf);
  tfree(pBuilder->pColIdx);
}

H
Hongze Cheng 已提交
779
/**
 * Rewind a KV row builder so it can be reused for a new row.
 *
 * Only the counters are cleared (used value bytes and number of columns);
 * the allocated buffers are kept as they are.
 *
 * @param pBuilder  Builder to reset.
 */
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
  pBuilder->size = 0;
  pBuilder->nCols = 0;
}

H
Hongze Cheng 已提交
784
/**
 * Materialise the content accumulated in a builder as a newly allocated KV row.
 *
 * The row consists of a header of TD_KV_ROW_HEAD_SIZE bytes, followed by the
 * builder's column-index array and then its value bytes.
 *
 * @param pBuilder  Source builder; it is only read.
 * @return          A malloc()ed row, or NULL when the builder holds nothing
 *                  (no index bytes and no value bytes) or when the allocation
 *                  fails.
 */
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
  size_t idxBytes = sizeof(SColIdx) * pBuilder->nCols;
  int    bodyLen = idxBytes + pBuilder->size;
  if (bodyLen == 0) return NULL;

  int    tlen = bodyLen + TD_KV_ROW_HEAD_SIZE;
  SKVRow row = malloc(tlen);
  if (row == NULL) return NULL;

  // Header first, then the index table, then the packed values.
  kvRowSetLen(row, tlen);
  kvRowSetNCols(row, pBuilder->nCols);
  memcpy(kvRowColIdx(row), pBuilder->pColIdx, idxBytes);
  memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);

  return row;
}
C
Cary Xu 已提交
801
#if 0
802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820
/**
 * Merge two memory rows column by column into `buffer`.
 *
 * For every column of pSchema1 the value from row1 is used when it is present
 * and not null; otherwise the column with the same colId is looked up in
 * pSchema2/row2, and if that yields nothing the type's null value is written.
 * The result is first laid out as a data row. Every non-null value is also
 * stashed, and if the accumulated KV length is smaller than the data-row
 * length, the buffer is rewritten as a KV row from the stash.
 *
 * @param buffer    Output memory for the merged row (written in place).
 * @param row1      First row; its non-null values take precedence.
 * @param row2      Second row; consulted only for columns row1 does not provide.
 * @param pSchema1  Schema walked column by column; also supplies the version.
 * @param pSchema2  Schema used to locate columns in row2.
 * @return          `buffer`, or NULL if the stash array cannot be created.
 */
SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2) {
#if 0
  ASSERT(memRowKey(row1) == memRowKey(row2));
  ASSERT(schemaVersion(pSchema1) == memRowVersion(row1));
  ASSERT(schemaVersion(pSchema2) == memRowVersion(row2));
  ASSERT(schemaVersion(pSchema1) >= schemaVersion(pSchema2));
#endif

  SArray *stashRow = taosArrayInit(pSchema1->numOfCols, sizeof(SColInfo));
  if (stashRow == NULL) {
    return NULL;
  }

  // Start by assuming the result is a data row using the latest schema version.
  SMemRow  pRow = buffer;
  SDataRow dataRow = memRowDataBody(pRow);
  memRowSetType(pRow, SMEM_ROW_DATA);
  dataRowSetVersion(dataRow, schemaVersion(pSchema1));
  dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pSchema1->flen));

  TDRowLenT kvLen = TD_MEM_ROW_KV_HEAD_SIZE;

  int32_t  c2 = 0;  // cursor over pSchema2; only ever moves forward
  int32_t  nCols1 = schemaNCols(pSchema1);
  int32_t  nCols2 = schemaNCols(pSchema2);
  SColInfo colInfo = {0};
  int32_t  kvIdx1 = 0, kvIdx2 = 0;

  for (int32_t c1 = 0; c1 < nCols1; ++c1) {
    STColumn *pCol = schemaColAt(pSchema1, c1);
    void *    val = tdGetMemRowDataOfColEx(row1, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx1);

    if (val == NULL || isNull(val, pCol->type)) {
      // row1 has nothing usable for this column: look for the same colId in row2.
      val = NULL;
      for (; c2 < nCols2; ++c2) {
        STColumn *tCol = schemaColAt(pSchema2, c2);
        if (tCol->colId < pCol->colId) {
          continue;  // skip columns that precede the wanted one
        }
        if (tCol->colId == pCol->colId) {
          val = tdGetMemRowDataOfColEx(row2, tCol->colId, tCol->type, TD_DATA_ROW_HEAD_SIZE + tCol->offset, &kvIdx2);
        }
        // On a larger colId the column is absent from pSchema2: leave val NULL.
        break;
      }
      if (val == NULL) {
        val = (void *)getNullValue(pCol->type);
      }
      tdAppendColVal(dataRow, val, pCol->type, pCol->offset);
      if (isNull(val, pCol->type)) {
        continue;  // null values are not part of the KV representation
      }
    } else {
      tdAppendColVal(dataRow, val, pCol->type, pCol->offset);
    }

    // Non-null value: account for its KV size and remember it for a possible KV rebuild.
    kvLen += tdGetColAppendLen(SMEM_ROW_KV, val, pCol->type);
    setSColInfo(&colInfo, pCol->colId, pCol->type, val);
    taosArrayPush(stashRow, &colInfo);
  }

  TDRowLenT dataLen = memRowTLen(pRow);

  if (kvLen < dataLen) {
    // The KV form is more compact: regenerate the row from the stashed values.
    // NOTE(review): this clears sizeof(dataLen) bytes (the size of a TDRowLenT), not
    // dataLen bytes - confirm that zeroing only the leading bytes is intended.
    memset(buffer, 0, sizeof(dataLen));
    SMemRow tRow = buffer;
    memRowSetType(tRow, SMEM_ROW_KV);
    SKVRow  kvRow = (SKVRow)memRowKvBody(tRow);
    int16_t nKvNCols = (int16_t) taosArrayGetSize(stashRow);
    kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nKvNCols));
    kvRowSetNCols(kvRow, nKvNCols);
    memRowSetKvVersion(tRow, pSchema1->version);

    int32_t idxOffset = 0;
    for (int16_t k = 0; k < nKvNCols; ++k, idxOffset += sizeof(SColIdx)) {
      SColInfo *pColInfo = taosArrayGet(stashRow, k);
      tdAppendKvColVal(kvRow, pColInfo->colVal, true, pColInfo->colId, pColInfo->colType, idxOffset);
    }
    ASSERT(kvLen == memRowTLen(tRow));
  }

  taosArrayDestroy(stashRow);
  return buffer;
}
C
Cary Xu 已提交
895
#endif