tdataformat.c 26.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
slguan 已提交
15
#include "tdataformat.h"
S
Shengliang Guan 已提交
16
#include "ulog.h"
T
Tao Liu 已提交
17
#include "talgo.h"
H
TD-353  
Hongze Cheng 已提交
18
#include "tcoding.h"
H
Hongze Cheng 已提交
19
#include "wchar.h"
20
#include "tarray.h"
H
more  
hzcheng 已提交
21

22
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
H
TD-1438  
Hongze Cheng 已提交
23
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
24
                               int limit2, int tRows, bool forceSetNull);
H
TD-1438  
Hongze Cheng 已提交
25

L
Liu Jicong 已提交
26
/**
 * Make sure a column buffer is large enough for `maxPoints` values.
 *
 * The space requested is `pCol->bytes * maxPoints`; variable-length types
 * additionally reserve `sizeof(VarDataOffsetT) * maxPoints` for the offset
 * table, which is placed right after the data area inside the same buffer.
 *
 * @param pCol      column whose pData/spaceSize/dataOff are updated
 * @param maxPoints number of values the buffer must be able to hold
 * @return 0 on success, -1 if realloc() fails (pData is left untouched)
 */
int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
  const int isVarType = IS_VAR_DATA_TYPE(pCol->type);

  int required = pCol->bytes * maxPoints;
  if (isVarType) {
    required += sizeof(VarDataOffsetT) * maxPoints;
  }

  // Only grow; a buffer that is already big enough is reused as is.
  if (pCol->spaceSize < required) {
    void *grown = realloc(pCol->pData, required);
    if (grown == NULL) {
      uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)required,
             strerror(errno));
      return -1;
    }
    pCol->pData = grown;
    pCol->spaceSize = required;
  }

  // The offset table lives behind the value area, so it has to be re-derived
  // from pData on every call (realloc may have moved the buffer).
  if (isVarType) {
    pCol->dataOff = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints);
  }

  return 0;
}

H
hzcheng 已提交
48 49 50 51
/**
 * Duplicate a schema.
 *
 * Allocates `sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema)` bytes
 * and copies the source schema into them verbatim.
 *
 * @return the newly allocated copy, or NULL if malloc() fails
 */
STSchema *tdDupSchema(STSchema *pSchema) {
  int       nbytes = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
  STSchema *pCopy = (STSchema *)malloc(nbytes);

  if (pCopy != NULL) {
    memcpy(pCopy, pSchema, nbytes);
  }

  return pCopy;
}

H
TD-27  
hzcheng 已提交
62 63 64
/**
 * Encode a schema to dst, and return the next pointer
 */
H
TD-353  
Hongze Cheng 已提交
65 66 67 68
int tdEncodeSchema(void **buf, STSchema *pSchema) {
  int tlen = 0;
  tlen += taosEncodeFixedI32(buf, schemaVersion(pSchema));
  tlen += taosEncodeFixedI32(buf, schemaNCols(pSchema));
H
TD-166  
hzcheng 已提交
69

H
TD-27  
hzcheng 已提交
70 71
  for (int i = 0; i < schemaNCols(pSchema); i++) {
    STColumn *pCol = schemaColAt(pSchema, i);
H
TD-353  
Hongze Cheng 已提交
72 73
    tlen += taosEncodeFixedI8(buf, colType(pCol));
    tlen += taosEncodeFixedI16(buf, colColId(pCol));
74
    tlen += taosEncodeFixedI16(buf, colBytes(pCol));
H
TD-27  
hzcheng 已提交
75 76
  }

H
TD-353  
Hongze Cheng 已提交
77
  return tlen;
H
TD-27  
hzcheng 已提交
78 79 80 81 82
}

/**
 * Decode a schema from a binary.
 */
H
TD-353  
Hongze Cheng 已提交
83
void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
H
Hongze Cheng 已提交
84
  int version = 0;
H
TD-353  
Hongze Cheng 已提交
85
  int numOfCols = 0;
H
TD-353  
Hongze Cheng 已提交
86
  STSchemaBuilder schemaBuilder;
H
TD-27  
hzcheng 已提交
87

H
TD-353  
Hongze Cheng 已提交
88 89
  buf = taosDecodeFixedI32(buf, &version);
  buf = taosDecodeFixedI32(buf, &numOfCols);
H
TD-27  
hzcheng 已提交
90

H
Hongze Cheng 已提交
91 92
  if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;

H
TD-353  
Hongze Cheng 已提交
93
  for (int i = 0; i < numOfCols; i++) {
H
TD-27  
hzcheng 已提交
94 95
    int8_t  type = 0;
    int16_t colId = 0;
96
    int16_t bytes = 0;
H
TD-353  
Hongze Cheng 已提交
97 98
    buf = taosDecodeFixedI8(buf, &type);
    buf = taosDecodeFixedI16(buf, &colId);
99
    buf = taosDecodeFixedI16(buf, &bytes);
H
Hongze Cheng 已提交
100 101 102 103
    if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
      tdDestroyTSchemaBuilder(&schemaBuilder);
      return NULL;
    }
H
TD-27  
hzcheng 已提交
104 105
  }

H
TD-353  
Hongze Cheng 已提交
106
  *pRSchema = tdGetSchemaFromBuilder(&schemaBuilder);
H
Hongze Cheng 已提交
107
  tdDestroyTSchemaBuilder(&schemaBuilder);
H
TD-353  
Hongze Cheng 已提交
108
  return buf;
H
Hongze Cheng 已提交
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
}

/**
 * Prepare a schema builder for use.
 *
 * Allocates room for 256 columns and resets all counters via
 * tdResetTSchemaBuilder().
 *
 * @return 0 on success, -1 if pBuilder is NULL or the allocation fails
 */
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  if (pBuilder == NULL) {
    return -1;
  }

  pBuilder->tCols = 256;
  pBuilder->columns = (STColumn *)malloc(sizeof(STColumn) * pBuilder->tCols);
  if (pBuilder->columns != NULL) {
    tdResetTSchemaBuilder(pBuilder, version);
    return 0;
  }

  return -1;
}

/**
 * Release the column array owned by a schema builder.
 * A NULL builder is accepted and ignored.
 */
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder == NULL) {
    return;
  }

  tfree(pBuilder->columns);
}

/**
 * Reset a schema builder so it can be reused for a new schema.
 *
 * Clears the column count and the accumulated tlen/flen/vlen totals and
 * stores the schema version. The column array and its capacity are kept.
 */
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  pBuilder->version = version;
  pBuilder->nCols = 0;

  // accumulated length totals
  pBuilder->tlen = 0;
  pBuilder->flen = 0;
  pBuilder->vlen = 0;
}

136
/**
 * Append one column description to a schema builder.
 *
 * The column's offset is the previous column's offset plus
 * TYPE_BYTES[previous type] (0 for the first column). For variable-length
 * types the caller-supplied `bytes` is stored and tlen/vlen grow by
 * `TYPE_BYTES[type] + bytes` / `bytes - sizeof(VarDataLenT)`; for all other
 * types `bytes` is ignored and TYPE_BYTES[type] is used. flen always grows by
 * TYPE_BYTES[type].
 *
 * @param pBuilder builder to extend
 * @param type     column data type; must satisfy isValidDataType()
 * @param colId    column id
 * @param bytes    declared width, only used for variable-length types
 * @return 0 on success, -1 on an invalid type or if the column array could
 *         not be grown (the builder is left unchanged in both cases)
 */
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes) {
  if (!isValidDataType(type)) return -1;

  if (pBuilder->nCols >= pBuilder->tCols) {
    // Commit the larger capacity only after realloc() succeeded. Updating
    // tCols first would leave the builder claiming more slots than the array
    // really has after a failed realloc, and the next append would write
    // past the end of the allocation.
    int       newCap = pBuilder->tCols * 2;
    STColumn *columns = (STColumn *)realloc(pBuilder->columns, sizeof(STColumn) * newCap);
    if (columns == NULL) return -1;
    pBuilder->columns = columns;
    pBuilder->tCols = newCap;
  }

  STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
  colSetType(pCol, type);
  colSetColId(pCol, colId);
  if (pBuilder->nCols == 0) {
    colSetOffset(pCol, 0);
  } else {
    STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols - 1]);
    colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
  }

  if (IS_VAR_DATA_TYPE(type)) {
    colSetBytes(pCol, bytes);
    pBuilder->tlen += (TYPE_BYTES[type] + bytes);
    pBuilder->vlen += bytes - sizeof(VarDataLenT);
  } else {
    colSetBytes(pCol, TYPE_BYTES[type]);
    pBuilder->tlen += TYPE_BYTES[type];
    pBuilder->vlen += TYPE_BYTES[type];
  }

  pBuilder->nCols++;
  pBuilder->flen += TYPE_BYTES[type];

  ASSERT(pCol->offset < pBuilder->flen);

  return 0;
}

/**
 * Materialise the schema collected in a builder.
 *
 * The result is one heap block holding the STSchema header followed by a
 * copy of the builder's column array; the builder itself is not modified.
 *
 * @return the new schema, or NULL if the builder holds no columns or
 *         malloc() fails
 */
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder->nCols <= 0) {
    return NULL;
  }

  int       total = sizeof(STSchema) + sizeof(STColumn) * pBuilder->nCols;
  STSchema *pResult = (STSchema *)malloc(total);
  if (pResult == NULL) {
    return NULL;
  }

  // header fields
  schemaVersion(pResult) = pBuilder->version;
  schemaNCols(pResult) = pBuilder->nCols;
  schemaTLen(pResult) = pBuilder->tlen;
  schemaFLen(pResult) = pBuilder->flen;
  schemaVLen(pResult) = pBuilder->vlen;

  // column descriptors
  memcpy(schemaColAt(pResult, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols);

  return pResult;
}

H
hzcheng 已提交
193 194 195
/**
 * Initialize a data row
 */
H
TD-90  
Hongze Cheng 已提交
196 197 198 199
void tdInitDataRow(SDataRow row, STSchema *pSchema) {
  dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema));
  dataRowSetVersion(row, schemaVersion(pSchema));
}
H
hzcheng 已提交
200

C
Cary Xu 已提交
201 202
/**
 * Allocate a data row sized by dataRowMaxBytesFromSchema() and initialise
 * its header with tdInitDataRow().
 *
 * @return the new row (release with tdFreeDataRow()), or NULL if malloc() fails
 */
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
  int32_t  capacity = dataRowMaxBytesFromSchema(pSchema);
  SDataRow newRow = malloc(capacity);

  if (newRow != NULL) {
    tdInitDataRow(newRow, pSchema);
  }

  return newRow;
}
H
hzcheng 已提交
210

H
hzcheng 已提交
211 212 213
/**
 * Free the SDataRow object
 */
C
Cary Xu 已提交
214 215 216
void tdFreeDataRow(SDataRow row) {
  if (row) free(row);
}
C
Cary Xu 已提交
217

C
Cary Xu 已提交
218 219 220
/**
 * Duplicate a data row into a fresh allocation of dataRowLen(row) bytes.
 *
 * @return the copy, or NULL if malloc() fails
 */
SDataRow tdDataRowDup(SDataRow row) {
  SDataRow copy = malloc(dataRowLen(row));

  if (copy != NULL) {
    dataRowCpy(copy, row);
  }

  return copy;
}
C
Cary Xu 已提交
225 226 227

/**
 * Duplicate a mem row into a fresh allocation of memRowTLen(row) bytes.
 *
 * @return the copy, or NULL if malloc() fails
 */
SMemRow tdMemRowDup(SMemRow row) {
  SMemRow copy = malloc(memRowTLen(row));

  if (copy != NULL) {
    memRowCpy(copy, row);
  }

  return copy;
}
C
Cary Xu 已提交
233

234
/**
 * Bind a data column to a schema column.
 *
 * Copies type, column id and byte width from the schema column, stores the
 * row offset shifted by TD_DATA_ROW_HEAD_SIZE and marks the column as empty
 * (len = 0). `maxPoints` is accepted but not used here.
 */
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) {
  pDataCol->len = 0;

  pDataCol->colId = colColId(pCol);
  pDataCol->type = colType(pCol);
  pDataCol->bytes = colBytes(pCol);
  pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE;
}
H
Hongze Cheng 已提交
242
// value from timestamp should be TKEY here instead of TSKEY
L
Liu Jicong 已提交
243
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
H
TD-166  
hzcheng 已提交
244 245
  ASSERT(pCol != NULL && value != NULL);

K
kailixu 已提交
246 247 248
  if (isAllRowsNull(pCol)) {
    if (isNull(value, pCol->type)) {
      // all null value yet, just return
L
Liu Jicong 已提交
249
      return 0;
K
kailixu 已提交
250 251
    }

L
Liu Jicong 已提交
252
    if(tdAllocMemForCol(pCol, maxPoints) < 0) return -1;
K
kailixu 已提交
253 254
    if (numOfRows > 0) {
      // Find the first not null value, fill all previouse values as NULL
L
Liu Jicong 已提交
255
      dataColSetNEleNull(pCol, numOfRows);
K
kailixu 已提交
256 257 258
    }
  }

H
Hongze Cheng 已提交
259 260 261 262 263 264 265
  if (IS_VAR_DATA_TYPE(pCol->type)) {
    // set offset
    pCol->dataOff[numOfRows] = pCol->len;
    // Copy data
    memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
    // Update the length
    pCol->len += varDataTLen(value);
H
TD-166  
hzcheng 已提交
266
  } else {
H
Haojun Liao 已提交
267
    ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
H
Hongze Cheng 已提交
268 269
    memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
    pCol->len += pCol->bytes;
H
TD-166  
hzcheng 已提交
270
  }
L
Liu Jicong 已提交
271 272 273 274 275 276 277 278 279
  return 0;
}

/**
 * Return the address of the value stored for `row` in a column.
 *
 * "Unsafe" because the column buffer is dereferenced without checking that
 * it has been allocated; callers must rule out the all-NULL case first.
 */
static FORCE_INLINE const void *tdGetColDataOfRowUnsafe(SDataCol *pCol, int row) {
  if (!IS_VAR_DATA_TYPE(pCol->type)) {
    // fixed width: plain array indexing
    return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
  }

  // variable width: go through the offset table
  return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
}

H
TD-166  
hzcheng 已提交
282
/**
 * Tell whether the first `nEle` values of a column are all NULL.
 *
 * @return true if the column reports isAllRowsNull() or none of the first
 *         `nEle` stored values is non-NULL; false otherwise
 */
bool isNEleNull(SDataCol *pCol, int nEle) {
  if (isAllRowsNull(pCol)) {
    return true;
  }

  // Advance past leading NULLs; stopping early means a real value was found.
  int idx = 0;
  while (idx < nEle && isNull(tdGetColDataOfRowUnsafe(pCol, idx), pCol->type)) {
    ++idx;
  }

  return idx >= nEle;
}

290
/**
 * Write a NULL value for row `index` and grow the column length accordingly.
 *
 * Fixed-width columns write at slot `index`; variable-length columns append
 * the NULL marker at the current end of data (pCol->len) and record that
 * position in the offset table.
 */
static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) {
  if (!IS_VAR_DATA_TYPE(pCol->type)) {
    setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes);
    pCol->len += TYPE_BYTES[pCol->type];
    return;
  }

  char *slot = POINTER_SHIFT(pCol->pData, pCol->len);
  pCol->dataOff[index] = pCol->len;
  setVardataNull(slot, pCol->type);
  pCol->len += varDataTLen(slot);
}

302
/**
 * Overwrite the column so that it contains exactly `nEle` NULL values.
 *
 * The column length is rebuilt from scratch: fixed-width columns are filled
 * in one setNullN() call, variable-length columns one element at a time.
 */
static void dataColSetNEleNull(SDataCol *pCol, int nEle) {
  if (!IS_VAR_DATA_TYPE(pCol->type)) {
    setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
    pCol->len = TYPE_BYTES[pCol->type] * nEle;
    return;
  }

  pCol->len = 0;
  int idx = 0;
  while (idx < nEle) {
    dataColSetNullAt(pCol, idx);
    idx++;
  }
}

H
TD-166  
hzcheng 已提交
314 315 316
/**
 * Rebuild the offset table of a BINARY/NCHAR column.
 *
 * Walks the first `nEle` values stored back to back in pData, using each
 * value's own varDataTLen() to find the start of the next one.
 */
void dataColSetOffset(SDataCol *pCol, int nEle) {
  ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));

  VarDataOffsetT running = 0;
  void *         cursor = pCol->pData;
  int            idx = 0;

  while (idx < nEle) {
    pCol->dataOff[idx] = running;
    running += varDataTLen(cursor);
    cursor = POINTER_SHIFT(cursor, varDataTLen(cursor));
    ++idx;
  }
}

L
Liu Jicong 已提交
328
SDataCols *tdNewDataCols(int maxCols, int maxRows) {
H
Hongze Cheng 已提交
329
  SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols));
H
Haojun Liao 已提交
330
  if (pCols == NULL) {
S
Shengliang Guan 已提交
331
    uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno));
H
Haojun Liao 已提交
332 333
    return NULL;
  }
H
TD-34  
hzcheng 已提交
334

H
Hongze Cheng 已提交
335
  pCols->maxPoints = maxRows;
L
Liu Jicong 已提交
336 337 338
  pCols->maxCols = maxCols;
  pCols->numOfRows = 0;
  pCols->numOfCols = 0;
H
Hongze Cheng 已提交
339 340 341 342 343 344 345 346 347

  if (maxCols > 0) {
    pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol));
    if (pCols->cols == NULL) {
      uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols,
             strerror(errno));
      tdFreeDataCols(pCols);
      return NULL;
    }
L
Liu Jicong 已提交
348 349 350
    int i;
    for(i = 0; i < maxCols; i++) {
      pCols->cols[i].spaceSize = 0;
L
Liu Jicong 已提交
351
      pCols->cols[i].len = 0;
L
Liu Jicong 已提交
352 353 354
      pCols->cols[i].pData = NULL;
      pCols->cols[i].dataOff = NULL;
    }
H
Hongze Cheng 已提交
355 356
  }

H
TD-34  
hzcheng 已提交
357 358 359
  return pCols;
}

H
Hongze Cheng 已提交
360
/**
 * Bind a column set to a schema.
 *
 * Grows the column array when the schema has more columns than there are
 * slots, resets all columns and then initialises one data column per schema
 * column with dataColInit().
 *
 * @param pCols   column set to (re)initialise
 * @param pSchema schema describing the columns
 * @return 0 on success, -1 if the column array could not be grown; in that
 *         case pCols is left exactly as it was
 */
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
  int i;
  int nSchemaCols = schemaNCols(pSchema);
  int oldMaxCols = pCols->maxCols;

  if (nSchemaCols > oldMaxCols) {
    // Update maxCols only after realloc() succeeded. Raising it first would,
    // on failure, make tdResetDataCols()/tdFreeDataCols() iterate over slots
    // that were never allocated.
    SDataCol *cols = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * nSchemaCols);
    if (cols == NULL) return -1;

    pCols->cols = cols;
    pCols->maxCols = nSchemaCols;

    // realloc() does not zero the new tail; give the fresh slots the same
    // empty state tdNewDataCols() sets up.
    for (i = oldMaxCols; i < nSchemaCols; i++) {
      cols[i].pData = NULL;
      cols[i].dataOff = NULL;
      cols[i].spaceSize = 0;
      cols[i].len = 0;
    }
  }

  tdResetDataCols(pCols);
  pCols->numOfCols = nSchemaCols;

  for (i = 0; i < nSchemaCols; i++) {
    dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints);
  }

  return 0;
}

H
Hongze Cheng 已提交
385
/**
 * Destroy a column set together with every column buffer it owns.
 *
 * Only pData is released per column; dataOff is not freed separately
 * (tdAllocMemForCol() makes it point into the pData allocation).
 *
 * @param pCols set to destroy; NULL is accepted
 * @return always NULL, so callers can write `p = tdFreeDataCols(p);`
 */
SDataCols *tdFreeDataCols(SDataCols *pCols) {
  if (pCols == NULL) {
    return NULL;
  }

  if (pCols->cols != NULL) {
    int nSlots = pCols->maxCols;
    for (int idx = 0; idx < nSlots; ++idx) {
      tfree(pCols->cols[idx].pData);
    }
    free(pCols->cols);
    pCols->cols = NULL;
  }

  free(pCols);
  return NULL;
}

H
TD-100  
hzcheng 已提交
402
/**
 * Clone a column set.
 *
 * The clone has the same capacity (maxCols/maxPoints), column count, schema
 * version and per-column metadata. With `keepData` the row count and the
 * contents of every non-empty column (values and, for variable-length
 * types, the offset table) are copied as well.
 *
 * @return the clone, or NULL if any allocation fails
 */
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
  SDataCols *pRet = tdNewDataCols(pDataCols->maxCols, pDataCols->maxPoints);
  if (pRet == NULL) return NULL;

  pRet->sversion = pDataCols->sversion;
  pRet->numOfCols = pDataCols->numOfCols;
  if (keepData) pRet->numOfRows = pDataCols->numOfRows;

  for (int i = 0; i < pDataCols->numOfCols; i++) {
    SDataCol *pSrc = &pDataCols->cols[i];
    SDataCol *pDst = &pRet->cols[i];

    pDst->type = pSrc->type;
    pDst->colId = pSrc->colId;
    pDst->bytes = pSrc->bytes;
    pDst->offset = pSrc->offset;

    // Empty source columns have no buffer worth copying.
    if (!keepData || pSrc->len <= 0) continue;

    if (tdAllocMemForCol(pDst, pRet->maxPoints) < 0) {
      tdFreeDataCols(pRet);
      return NULL;
    }

    pDst->len = pSrc->len;
    memcpy(pDst->pData, pSrc->pData, pSrc->len);

    if (IS_VAR_DATA_TYPE(pDst->type)) {
      int dataOffSize = sizeof(VarDataOffsetT) * pDataCols->maxPoints;
      memcpy(pDst->dataOff, pSrc->dataOff, dataOffSize);
    }
  }

  return pRet;
}

H
TD-34  
hzcheng 已提交
435
/**
 * Empty a column set without releasing memory: the row count is cleared
 * and dataColReset() is applied to all maxCols column slots.
 * A NULL set is accepted and ignored.
 */
void tdResetDataCols(SDataCols *pCols) {
  if (pCols == NULL) {
    return;
  }

  pCols->numOfRows = 0;
  for (int idx = 0; idx < pCols->maxCols; ++idx) {
    dataColReset(&pCols->cols[idx]);
  }
}
443 444

/**
 * Append one SDataRow to a column set.
 *
 * Schema columns and data columns are walked in parallel and matched by
 * column id:
 *  - matching ids: the row's value is appended to the data column;
 *  - schema id smaller: the schema column is skipped;
 *  - schema id larger, or schema exhausted: the data column has no value in
 *    this row (see the inline notes for when a NULL is appended).
 * The row count is incremented once at the end. Return values of
 * dataColAppendVal() are not checked.
 */
static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
  ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row));

  const int nSchemaCols = schemaNCols(pSchema);
  const int curRow = pCols->numOfRows;
  const int maxPoints = pCols->maxPoints;

  int schemaIdx = 0;
  int colIdx = 0;

  while (colIdx < pCols->numOfCols) {
    bool      sawValue = false;
    SDataCol *pDataCol = &(pCols->cols[colIdx]);

    // Schema exhausted: every remaining data column gets a NULL.
    if (schemaIdx >= nSchemaCols) {
      dataColAppendVal(pDataCol, getNullValue(pDataCol->type), curRow, maxPoints);
      colIdx++;
      continue;
    }

    STColumn *pRowCol = schemaColAt(pSchema, schemaIdx);

    if (pRowCol->colId < pDataCol->colId) {
      schemaIdx++;
    } else if (pRowCol->colId == pDataCol->colId) {
      void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
      if (!isNull(value, pDataCol->type)) sawValue = true;
      dataColAppendVal(pDataCol, value, curRow, maxPoints);
      colIdx++;
      schemaIdx++;
    } else {
      // NOTE(review): sawValue is re-initialised to false on every iteration
      // and only set in the matching branch above, so in this branch the
      // condition is effectively just forceSetNull.
      if (forceSetNull || sawValue) {
        dataColAppendVal(pDataCol, getNullValue(pDataCol->type), curRow, maxPoints);
      }
      colIdx++;
    }
  }

  pCols->numOfRows++;
}

478
/**
 * Append one SKVRow to a column set.
 *
 * The row's column index entries and the data columns are walked in parallel
 * and matched by column id:
 *  - matching ids: the row's value is appended to the data column;
 *  - row id smaller: the row entry is skipped;
 *  - row id larger, or row/schema exhausted: the data column has no value in
 *    this row (see the inline notes for when a NULL is appended).
 * The row count is incremented once at the end. Return values of
 * dataColAppendVal() are not checked.
 */
static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
  ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row));

  const int nRowCols = kvRowNCols(row);
  const int curRow = pCols->numOfRows;
  const int maxPoints = pCols->maxPoints;

  int rowIdx = 0;
  int colIdx = 0;

  while (colIdx < pCols->numOfCols) {
    bool      sawValue = false;
    SDataCol *pDataCol = &(pCols->cols[colIdx]);

    // Row (or schema) exhausted: every remaining data column gets a NULL.
    if (rowIdx >= nRowCols || rowIdx >= schemaNCols(pSchema)) {
      dataColAppendVal(pDataCol, getNullValue(pDataCol->type), curRow, maxPoints);
      ++colIdx;
      continue;
    }

    SColIdx *pEntry = kvRowColIdxAt(row, rowIdx);

    if (pEntry->colId < pDataCol->colId) {
      ++rowIdx;
    } else if (pEntry->colId == pDataCol->colId) {
      void *value = tdGetKvRowDataOfCol(row, pEntry->offset);
      if (!isNull(value, pDataCol->type)) sawValue = true;
      dataColAppendVal(pDataCol, value, curRow, maxPoints);
      ++colIdx;
      ++rowIdx;
    } else {
      // NOTE(review): sawValue is re-initialised to false on every iteration
      // and only set in the matching branch above, so in this branch the
      // condition is effectively just forceSetNull.
      if (forceSetNull || sawValue) {
        dataColAppendVal(pDataCol, getNullValue(pDataCol->type), curRow, maxPoints);
      }
      ++colIdx;
    }
  }

  pCols->numOfRows++;
}
H
TD-166  
hzcheng 已提交
514

515
/**
 * Append a mem row to a column set, dispatching on the row's storage format
 * (data row or KV row). Any other format trips ASSERT(0).
 */
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
  if (isDataRow(row)) {
    tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols, forceSetNull);
    return;
  }

  if (isKvRow(row)) {
    tdAppendKvRowToDataCol(memRowKvBody(row), pSchema, pCols, forceSetNull);
    return;
  }

  ASSERT(0);
}

525
/**
 * Merge rows from `source` into `target`.
 *
 * If `target` is empty or its last key is smaller than the source key at
 * `*pOffset`, `rowsToMerge` rows are appended directly and `*pOffset` is
 * advanced by `rowsToMerge`. Otherwise `target` is duplicated and rebuilt by
 * tdMergeTwoDataCols() from that duplicate and `source`, which advances
 * `*pOffset` itself.
 *
 * @param target       destination column set
 * @param source       column set to take rows from
 * @param rowsToMerge  number of rows to merge (0 < rowsToMerge <= source rows)
 * @param pOffset      in/out read position in `source`; NULL means start at 0
 * @param forceSetNull forwarded to tdMergeTwoDataCols()
 * @return 0 on success, -1 if duplicating `target` fails
 */
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) {
  ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
  ASSERT(target->numOfCols == source->numOfCols);

  int localOffset = 0;
  if (pOffset == NULL) {
    pOffset = &localOffset;
  }

  bool noOverlap = (target->numOfRows == 0) || (dataColsKeyLast(target) < dataColsKeyAtRow(source, *pOffset));

  if (!noOverlap) {
    // Key ranges interleave: rebuild target from a snapshot of itself.
    SDataCols *pSnapshot = tdDupDataCols(target, true);
    if (pSnapshot == NULL) {
      return -1;
    }

    int iter1 = 0;
    tdMergeTwoDataCols(target, pSnapshot, &iter1, pSnapshot->numOfRows, source, pOffset, source->numOfRows,
                       pSnapshot->numOfRows + rowsToMerge, forceSetNull);

    tdFreeDataCols(pSnapshot);
    return 0;
  }

  // Plain append.
  ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
  for (int r = 0; r < rowsToMerge; r++) {
    for (int c = 0; c < source->numOfCols; c++) {
      // Columns that are empty on both sides stay empty.
      if (source->cols[c].len > 0 || target->cols[c].len > 0) {
        dataColAppendVal(target->cols + c, tdGetColDataOfRow(source->cols + c, r + (*pOffset)), target->numOfRows,
                         target->maxPoints);
      }
    }
    target->numOfRows++;
  }
  (*pOffset) += rowsToMerge;

  return 0;
}
H
TD-100  
hzcheng 已提交
564

H
TD-1438  
Hongze Cheng 已提交
565 566
/**
 * Merge two key-ordered column sets into `target` (which is reset first).
 * src2 data has more priority than src1.
 *
 * Rows are taken in ascending key order until `target` holds `tRows` rows or
 * both inputs are exhausted. On equal keys the src2 row wins unless its TKEY
 * is marked deleted, in which case both rows are dropped. For a winning src2
 * row each column is resolved as:
 *   1. the src2 value of THIS row if it is not NULL;
 *   2. else, when !forceSetNull and the keys are equal, the src1 value;
 *   3. else NULL (only materialised if the target column already has data).
 *
 * @param target       output column set
 * @param src1,src2    inputs; iter1/iter2 are their in/out read positions
 * @param limit1,limit2 exclusive upper bounds for iter1/iter2
 * @param tRows        maximum number of rows to produce
 * @param forceSetNull when true a NULL in src2 overrides a value in src1
 */
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
                               int limit2, int tRows, bool forceSetNull) {
  tdResetDataCols(target);
  ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);

  while (target->numOfRows < tRows) {
    if (*iter1 >= limit1 && *iter2 >= limit2) break;

    // An exhausted input compares as "infinitely late".
    TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : dataColsKeyAt(src1, *iter1);
    TKEY  tkey1 = (*iter1 >= limit1) ? TKEY_NULL : dataColsTKeyAt(src1, *iter1);
    TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : dataColsKeyAt(src2, *iter2);
    TKEY  tkey2 = (*iter2 >= limit2) ? TKEY_NULL : dataColsTKeyAt(src2, *iter2);

    ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1)));

    if (key1 < key2) {
      for (int i = 0; i < src1->numOfCols; i++) {
        ASSERT(target->cols[i].type == src1->cols[i].type);
        if (src1->cols[i].len > 0 || target->cols[i].len > 0) {
          dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
                           target->maxPoints);
        }
      }

      target->numOfRows++;
      (*iter1)++;
    } else {  // key1 >= key2
      if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) {
        for (int i = 0; i < src2->numOfCols; i++) {
          ASSERT(target->cols[i].type == src2->cols[i].type);
          // The NULL test must look at the value of the row being merged
          // (*iter2). Testing cols[i].pData would always inspect row 0 of the
          // column and pick the wrong branch for every later row.
          if (src2->cols[i].len > 0 &&
              !isNull(tdGetColDataOfRow(src2->cols + i, *iter2), src2->cols[i].type)) {
            dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
                             target->maxPoints);
          } else if (!forceSetNull && key1 == key2 && src1->cols[i].len > 0) {
            dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
                             target->maxPoints);
          } else if (target->cols[i].len > 0) {
            dataColSetNullAt(&target->cols[i], target->numOfRows);
          }
        }
        target->numOfRows++;
      }

      (*iter2)++;
      if (key1 == key2) (*iter1)++;
    }

    ASSERT(target->numOfRows <= target->maxPoints);
  }
}

H
Hongze Cheng 已提交
617 618
/**
 * Duplicate a KV row into a fresh allocation of kvRowLen(row) bytes.
 *
 * @return the copy, or NULL if malloc() fails
 */
SKVRow tdKVRowDup(SKVRow row) {
  SKVRow copy = malloc(kvRowLen(row));

  if (copy != NULL) {
    kvRowCpy(copy, row);
  }

  return copy;
}

B
Bomin Zhang 已提交
625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640
/**
 * qsort() comparator ordering SColIdx entries by ascending column id.
 *
 * @return 1, -1 or 0 when a's colId is greater than, less than or equal to b's
 */
static int compareColIdx(const void* a, const void* b) {
  const SColIdx *lhs = (const SColIdx *)a;
  const SColIdx *rhs = (const SColIdx *)b;

  return (lhs->colId > rhs->colId) - (lhs->colId < rhs->colId);
}

/**
 * Sort the column index array of a KV row in place by ascending column id.
 * Only the index entries are reordered; they keep their stored offsets.
 */
void tdSortKVRowByColIdx(SKVRow row) {
  void *entries = kvRowColIdx(row);

  qsort(entries, kvRowNCols(row), sizeof(SColIdx), compareColIdx);
}

H
TD-90  
Hongze Cheng 已提交
641 642 643 644 645 646
/**
 * Set the value of one column in a KV row, inserting the column if needed.
 *
 * - Column absent: a larger row is allocated, the new value is appended
 *   behind the existing values and the index is re-sorted by column id.
 * - Column present, fixed width or same var-data length: updated in place.
 * - Column present, different var-data length: the row is rebuilt with the
 *   new value spliced in and the offsets of later values adjusted.
 *
 * Whenever a new row is built, the old one is freed and `*orow` is replaced.
 *
 * @param orow  in/out pointer to the row
 * @param colId id of the column to set
 * @param type  data type of `value`
 * @param value new value (var-data values carry their own length)
 * @return 0 on success, -1 if allocation fails (`*orow` is left untouched)
 */
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
  SColIdx *pColIdx = NULL;
  SKVRow   row = *orow;
  SKVRow   nrow = NULL;
  void *   ptr = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE);

  if (ptr == NULL || ((SColIdx *)ptr)->colId > colId) {  // need to add a column value to the row
    int diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
    int nRowLen = kvRowLen(row) + sizeof(SColIdx) + diff;
    int oRowCols = kvRowNCols(row);

    ASSERT(diff > 0);
    nrow = malloc(nRowLen);
    if (nrow == NULL) return -1;

    kvRowSetLen(nrow, nRowLen);
    kvRowSetNCols(nrow, oRowCols + 1);

    memcpy(kvRowColIdx(nrow), kvRowColIdx(row), sizeof(SColIdx) * oRowCols);
    memcpy(kvRowValues(nrow), kvRowValues(row), kvRowValLen(row));

    // The new entry goes last; its value is placed right behind the old values.
    pColIdx = kvRowColIdxAt(nrow, oRowCols);
    pColIdx->colId = colId;
    pColIdx->offset = kvRowValLen(row);

    memcpy(kvRowColVal(nrow, pColIdx), value, diff);  // copy new value

    tdSortKVRowByColIdx(nrow);

    *orow = nrow;
    free(row);
  } else {
    SColIdx *pOldIdx = (SColIdx *)ptr;
    ASSERT(pOldIdx->colId == colId);

    if (IS_VAR_DATA_TYPE(type)) {
      void *pOldVal = kvRowColVal(row, pOldIdx);

      if (varDataTLen(value) == varDataTLen(pOldVal)) {  // just update the column value in place
        memcpy(pOldVal, value, varDataTLen(value));
      } else {  // need to reallocate the memory
        // Use int like the insert path above: an int16_t turns negative for
        // rows of 32 KiB or more and makes the allocation fail.
        int nlen = kvRowLen(row) + (varDataTLen(value) - varDataTLen(pOldVal));
        ASSERT(nlen > 0);
        nrow = malloc(nlen);
        if (nrow == NULL) return -1;

        kvRowSetLen(nrow, nlen);
        kvRowSetNCols(nrow, kvRowNCols(row));

        // Copy the index array plus all value bytes in front of the old value.
        int zsize = sizeof(SColIdx) * kvRowNCols(row) + pOldIdx->offset;
        memcpy(kvRowColIdx(nrow), kvRowColIdx(row), zsize);
        memcpy(kvRowColVal(nrow, pOldIdx), value, varDataTLen(value));
        // Copy left value part
        int lsize = kvRowLen(row) - TD_KV_ROW_HEAD_SIZE - zsize - varDataTLen(pOldVal);
        if (lsize > 0) {
          memcpy(POINTER_SHIFT(nrow, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(value)),
                 POINTER_SHIFT(row, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(pOldVal)), lsize);
        }

        // Values stored behind the replaced one moved by the size difference.
        for (int i = 0; i < kvRowNCols(nrow); i++) {
          pColIdx = kvRowColIdxAt(nrow, i);

          if (pColIdx->offset > pOldIdx->offset) {
            pColIdx->offset = pColIdx->offset - varDataTLen(pOldVal) + varDataTLen(value);
          }
        }

        *orow = nrow;
        free(row);
      }
    } else {
      memcpy(kvRowColVal(row, pOldIdx), value, TYPE_BYTES[type]);
    }
  }

  return 0;
}

H
TD-353  
Hongze Cheng 已提交
717
/**
 * Encode a KV row by copying it verbatim.
 *
 * @param buf if non-NULL, the row is copied to *buf and *buf is advanced by
 *            kvRowLen(row); if NULL nothing is written
 * @param row row to encode
 * @return kvRowLen(row) in both cases
 */
int tdEncodeKVRow(void **buf, SKVRow row) {
  // May change the encode purpose
  if (buf == NULL) {
    return kvRowLen(row);
  }

  kvRowCpy(*buf, row);
  *buf = POINTER_SHIFT(*buf, kvRowLen(row));

  return kvRowLen(row);
}

H
Hongze Cheng 已提交
727 728
/**
 * Decode a KV row by duplicating the one stored at `buf`.
 *
 * @param buf encoded input, starting with a KV row
 * @param row receives the newly allocated copy (NULL on failure)
 * @return the buffer position right after the row, or NULL if the
 *         duplication failed
 */
void *tdDecodeKVRow(void *buf, SKVRow *row) {
  SKVRow copy = tdKVRowDup(buf);
  *row = copy;

  if (copy == NULL) {
    return NULL;
  }

  return POINTER_SHIFT(buf, kvRowLen(copy));
}

H
Hongze Cheng 已提交
733
/**
 * Prepare a KV row builder: room for 128 column index entries and a
 * 1024-byte value buffer.
 *
 * On failure both pointers are left NULL, so calling
 * tdDestroyKVRowBuilder() on a builder whose init failed cannot free a
 * dangling or uninitialised pointer.
 *
 * @return 0 on success, -1 if either allocation fails
 */
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) {
  pBuilder->tCols = 128;
  pBuilder->nCols = 0;
  pBuilder->alloc = 1024;
  pBuilder->size = 0;
  pBuilder->buf = NULL;

  pBuilder->pColIdx = (SColIdx *)malloc(sizeof(SColIdx) * pBuilder->tCols);
  if (pBuilder->pColIdx == NULL) return -1;

  pBuilder->buf = malloc(pBuilder->alloc);
  if (pBuilder->buf == NULL) {
    free(pBuilder->pColIdx);
    pBuilder->pColIdx = NULL;  // do not leave a dangling pointer behind
    return -1;
  }

  return 0;
}

H
Hongze Cheng 已提交
748
/**
 * Release the value buffer and the column index array of a KV row builder.
 */
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) {
  tfree(pBuilder->buf);
  tfree(pBuilder->pColIdx);
}

H
Hongze Cheng 已提交
753
/**
 * Forget everything collected so far (column count and value bytes) while
 * keeping the builder's allocations for reuse.
 */
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
  pBuilder->size = 0;
  pBuilder->nCols = 0;
}

H
Hongze Cheng 已提交
758
/**
 * Materialise the KV row collected in a builder.
 *
 * The row consists of the KV row head, the builder's column index entries
 * and the builder's value bytes, in that order.
 *
 * @return the newly allocated row, or NULL if the builder is empty or
 *         malloc() fails
 */
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
  int payload = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
  if (payload == 0) {
    return NULL;
  }

  int    total = payload + TD_KV_ROW_HEAD_SIZE;
  SKVRow result = malloc(total);
  if (result == NULL) {
    return NULL;
  }

  // Set the header first, before the index and value areas are filled.
  kvRowSetNCols(result, pBuilder->nCols);
  kvRowSetLen(result, total);

  memcpy(kvRowColIdx(result), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
  memcpy(kvRowValues(result), pBuilder->buf, pBuilder->size);

  return result;
}

/**
 * Merge two mem rows into `buffer`, column by column along pSchema1.
 *
 * For each schema-1 column the non-NULL value of row1 is used; otherwise the
 * value of the column with the same id in row2 (looked up via pSchema2), or
 * NULL if there is none. The result is first laid out as a data row; every
 * non-NULL value is also stashed, and if the KV encoding would be shorter the
 * buffer is rewritten as a KV row from the stash.
 *
 * @param buffer   caller-provided output memory (sizing is up to the caller)
 * @param row1     preferred row, described by pSchema1
 * @param row2     fallback row, described by pSchema2
 * @return `buffer` on success, NULL if the stash array cannot be created
 */
SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2) {
#if 0
  ASSERT(memRowKey(row1) == memRowKey(row2));
  ASSERT(schemaVersion(pSchema1) == memRowVersion(row1));
  ASSERT(schemaVersion(pSchema2) == memRowVersion(row2));
  ASSERT(schemaVersion(pSchema1) >= schemaVersion(pSchema2));
#endif

  SArray *stash = taosArrayInit(pSchema1->numOfCols, sizeof(SColInfo));
  if (stash == NULL) {
    return NULL;
  }

  // Start out in data-row layout.
  SMemRow  pRow = buffer;
  SDataRow dataRow = memRowDataBody(pRow);
  memRowSetType(pRow, SMEM_ROW_DATA);
  dataRowSetVersion(dataRow, schemaVersion(pSchema1));  // use latest schema version
  dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pSchema1->flen));

  TDRowTLenT dataLen = 0, kvLen = TD_MEM_ROW_KV_HEAD_SIZE;

  int32_t  nCols1 = schemaNCols(pSchema1);
  int32_t  nCols2 = schemaNCols(pSchema2);
  int32_t  j = 0;  // cursor into pSchema2, only ever moves forward
  int32_t  kvIdx1 = 0, kvIdx2 = 0;
  SColInfo colInfo = {0};

  for (int32_t i = 0; i < nCols1; ++i) {
    STColumn *pCol = schemaColAt(pSchema1, i);
    void *    val1 = tdGetMemRowDataOfColEx(row1, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx1);

    // row1 has a real value: take it.
    if (val1 != NULL && !isNull(val1, pCol->type)) {
      tdAppendColVal(dataRow, val1, pCol->type, pCol->offset);
      kvLen += tdGetColAppendLen(SMEM_ROW_KV, val1, pCol->type);
      setSColInfo(&colInfo, pCol->colId, pCol->type, val1);
      taosArrayPush(stash, &colInfo);
      continue;
    }

    // Otherwise look for the same column id in row2's schema.
    void *val2 = NULL;
    while (j < nCols2 && schemaColAt(pSchema2, j)->colId < pCol->colId) {
      ++j;
    }
    if (j < nCols2) {
      STColumn *tCol = schemaColAt(pSchema2, j);
      if (tCol->colId == pCol->colId) {
        val2 = tdGetMemRowDataOfColEx(row2, tCol->colId, tCol->type, TD_DATA_ROW_HEAD_SIZE + tCol->offset, &kvIdx2);
      }
    }
    if (val2 == NULL) {
      val2 = (void *)getNullValue(pCol->type);
    }

    tdAppendColVal(dataRow, val2, pCol->type, pCol->offset);
    if (!isNull(val2, pCol->type)) {
      kvLen += tdGetColAppendLen(SMEM_ROW_KV, val2, pCol->type);
      setSColInfo(&colInfo, pCol->colId, pCol->type, val2);
      taosArrayPush(stash, &colInfo);
    }
  }

  dataLen = memRowTLen(pRow);

  if (kvLen < dataLen) {
    // scan the stash and generate SKVRow
    // NOTE(review): this clears sizeof(dataLen) bytes, i.e. the size of the
    // length variable's type, not dataLen bytes - confirm this is intended.
    memset(buffer, 0, sizeof(dataLen));
    SMemRow tRow = buffer;
    memRowSetType(tRow, SMEM_ROW_KV);
    SKVRow  kvRow = (SKVRow)memRowKvBody(tRow);
    int16_t nKvNCols = (int16_t) taosArrayGetSize(stash);
    kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nKvNCols));
    kvRowSetNCols(kvRow, nKvNCols);
    memRowSetKvVersion(tRow, pSchema1->version);

    int32_t idxOffset = 0;
    for (int16_t k = 0; k < nKvNCols; ++k) {
      SColInfo *pStashed = taosArrayGet(stash, k);
      tdAppendKvColVal(kvRow, pStashed->colVal, true, pStashed->colId, pStashed->colType, idxOffset);
      idxOffset += sizeof(SColIdx);
    }
    ASSERT(kvLen == memRowTLen(tRow));
  }

  taosArrayDestroy(stash);
  return buffer;
}