tdataformat.c 27.3 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
common  
Shengliang Guan 已提交
15 16

#define _DEFAULT_SOURCE
S
slguan 已提交
17
#include "tdataformat.h"
S
Shengliang Guan 已提交
18
#include "tcoding.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
H
more  
hzcheng 已提交
20

21
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
C
Cary Xu 已提交
22
#if 0
H
TD-1438  
Hongze Cheng 已提交
23
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
24
                               int limit2, int tRows, bool forceSetNull);
C
Cary Xu 已提交
25
#endif
L
Liu Jicong 已提交
26
/**
 * Make sure the buffer of a column is large enough for `maxPoints` values and
 * refresh the pointers that are carved out of that buffer.
 *
 * The buffer holds `bytes * maxPoints` bytes of values, followed (when
 * TD_SUPPORT_BITMAP is defined) by the bitmap, followed (for variable-length
 * types) by one VarDataOffsetT per point. The buffer is only grown, never
 * shrunk; the derived pointers are recomputed on every call.
 *
 * @param pCol      column whose pData/spaceSize/pBitmap/dataOff are updated
 * @param maxPoints number of points the buffer must be able to hold
 * @return 0 on success, -1 if the reallocation failed (pCol is left untouched)
 */
int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
  const bool isVarType = IS_VAR_DATA_TYPE(pCol->type);
  int        spaceNeeded = pCol->bytes * maxPoints;

  if (isVarType) {
    spaceNeeded += sizeof(VarDataOffsetT) * maxPoints;
  }

#ifdef TD_SUPPORT_BITMAP
  int32_t nBitmapBytes = (int32_t)TD_BITMAP_BYTES(maxPoints);
  spaceNeeded += (int)nBitmapBytes;
  // TODO: the bitmap is currently compressed together with the column data, so reserve one extra
  // TYPE_BYTES to make up a complete element. Otherwise an invalid read/write would be triggered.
  spaceNeeded += TYPE_BYTES[pCol->type];
#endif

  if (pCol->spaceSize < spaceNeeded) {
    void *pNew = taosMemoryRealloc(pCol->pData, spaceNeeded);
    if (pNew == NULL) {
      uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)spaceNeeded, strerror(errno));
      return -1;
    }
    pCol->pData = pNew;
    pCol->spaceSize = spaceNeeded;
  }

#ifdef TD_SUPPORT_BITMAP
  // The bitmap always follows the value area; the offset array (if any) follows the bitmap.
  pCol->pBitmap = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints);
  if (isVarType) {
    pCol->dataOff = POINTER_SHIFT(pCol->pBitmap, nBitmapBytes);
  }
#else
  // Without a bitmap the offset array directly follows the value area.
  if (isVarType) {
    pCol->dataOff = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints);
  }
#endif

  return 0;
}

H
hzcheng 已提交
64 65 66
/**
 * Duplicate the schema and return a new object
 */
H
Hongze Cheng 已提交
67
STSchema *tdDupSchema(const STSchema *pSchema) {
S
Shengliang Guan 已提交
68
  int       tlen = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
wafwerar's avatar
wafwerar 已提交
69
  STSchema *tSchema = (STSchema *)taosMemoryMalloc(tlen);
H
hzcheng 已提交
70 71
  if (tSchema == NULL) return NULL;

H
Hongze Cheng 已提交
72
  memcpy((void *)tSchema, (void *)pSchema, tlen);
H
hzcheng 已提交
73 74 75 76

  return tSchema;
}

H
TD-27  
hzcheng 已提交
77 78 79
/**
 * Encode a schema to dst, and return the next pointer
 */
H
TD-353  
Hongze Cheng 已提交
80 81 82 83
int tdEncodeSchema(void **buf, STSchema *pSchema) {
  int tlen = 0;
  tlen += taosEncodeFixedI32(buf, schemaVersion(pSchema));
  tlen += taosEncodeFixedI32(buf, schemaNCols(pSchema));
H
TD-166  
hzcheng 已提交
84

H
TD-27  
hzcheng 已提交
85 86
  for (int i = 0; i < schemaNCols(pSchema); i++) {
    STColumn *pCol = schemaColAt(pSchema, i);
H
TD-353  
Hongze Cheng 已提交
87 88
    tlen += taosEncodeFixedI8(buf, colType(pCol));
    tlen += taosEncodeFixedI16(buf, colColId(pCol));
89
    tlen += taosEncodeFixedI16(buf, colBytes(pCol));
H
TD-27  
hzcheng 已提交
90 91
  }

H
TD-353  
Hongze Cheng 已提交
92
  return tlen;
H
TD-27  
hzcheng 已提交
93 94 95 96 97
}

/**
 * Decode a schema from a binary.
 */
H
TD-353  
Hongze Cheng 已提交
98
void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
S
Shengliang Guan 已提交
99 100
  int             version = 0;
  int             numOfCols = 0;
H
TD-353  
Hongze Cheng 已提交
101
  STSchemaBuilder schemaBuilder;
H
TD-27  
hzcheng 已提交
102

H
TD-353  
Hongze Cheng 已提交
103 104
  buf = taosDecodeFixedI32(buf, &version);
  buf = taosDecodeFixedI32(buf, &numOfCols);
H
TD-27  
hzcheng 已提交
105

H
Hongze Cheng 已提交
106 107
  if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;

H
TD-353  
Hongze Cheng 已提交
108
  for (int i = 0; i < numOfCols; i++) {
109 110 111
    col_type_t  type = 0;
    col_id_t    colId = 0;
    col_bytes_t bytes = 0;
H
TD-353  
Hongze Cheng 已提交
112 113
    buf = taosDecodeFixedI8(buf, &type);
    buf = taosDecodeFixedI16(buf, &colId);
114
    buf = taosDecodeFixedI32(buf, &bytes);
H
Hongze Cheng 已提交
115 116 117 118
    if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
      tdDestroyTSchemaBuilder(&schemaBuilder);
      return NULL;
    }
H
TD-27  
hzcheng 已提交
119 120
  }

H
TD-353  
Hongze Cheng 已提交
121
  *pRSchema = tdGetSchemaFromBuilder(&schemaBuilder);
H
Hongze Cheng 已提交
122
  tdDestroyTSchemaBuilder(&schemaBuilder);
H
TD-353  
Hongze Cheng 已提交
123
  return buf;
H
Hongze Cheng 已提交
124 125 126 127 128 129
}

/**
 * Initialise a schema builder with room for 256 columns and reset its
 * counters for the given schema version.
 *
 * @return 0 on success, -1 if pBuilder is NULL or the column array could not
 *         be allocated
 */
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  if (pBuilder == NULL) {
    return -1;
  }

  pBuilder->tCols = 256;
  pBuilder->columns = (STColumn *)taosMemoryMalloc(sizeof(STColumn) * pBuilder->tCols);
  if (pBuilder->columns != NULL) {
    tdResetTSchemaBuilder(pBuilder, version);
    return 0;
  }

  return -1;
}

/**
 * Release the column array owned by a schema builder. A NULL builder is
 * ignored. The builder object itself is not freed.
 */
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder == NULL) {
    return;
  }

  taosMemoryFreeClear(pBuilder->columns);
}

/**
 * Reset a schema builder so that it can be reused: the column count and the
 * accumulated lengths (tlen, flen, vlen) go back to zero and the schema
 * version is replaced. The column array and its capacity are kept.
 */
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  pBuilder->version = version;
  pBuilder->nCols = 0;

  // accumulated length counters
  pBuilder->tlen = 0;
  pBuilder->flen = 0;
  pBuilder->vlen = 0;
}

151
/**
 * Append a column to a schema builder.
 *
 * The column array is doubled when it is full. The new column's offset is the
 * previous column's offset plus TYPE_BYTES of the previous column's type (0
 * for the first column). For variable-length types `bytes` is stored as the
 * column width; for all other types TYPE_BYTES[type] is stored and `bytes` is
 * ignored.
 *
 * @param pBuilder builder to append to
 * @param type     column data type; must satisfy isValidDataType()
 * @param colId    column id
 * @param bytes    column width, only used for variable-length types
 * @return 0 on success, -1 if the type is invalid or the array could not grow
 *         (in which case the builder is left unchanged)
 */
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, col_id_t colId, col_bytes_t bytes) {
  if (!isValidDataType(type)) return -1;

  if (pBuilder->nCols >= pBuilder->tCols) {
    // Commit the new capacity only after the reallocation succeeded; otherwise a failed
    // realloc would leave tCols larger than the real array and the next add would overrun it.
    int       newCap = pBuilder->tCols * 2;
    STColumn *columns = (STColumn *)taosMemoryRealloc(pBuilder->columns, sizeof(STColumn) * newCap);
    if (columns == NULL) return -1;
    pBuilder->columns = columns;
    pBuilder->tCols = newCap;
  }

  STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
  colSetType(pCol, type);
  colSetColId(pCol, colId);
  if (pBuilder->nCols == 0) {
    colSetOffset(pCol, 0);
  } else {
    STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols - 1]);
    colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
  }

  if (IS_VAR_DATA_TYPE(type)) {
    colSetBytes(pCol, bytes);
    pBuilder->tlen += (TYPE_BYTES[type] + bytes);
    pBuilder->vlen += bytes - sizeof(VarDataLenT);
  } else {
    colSetBytes(pCol, TYPE_BYTES[type]);
    pBuilder->tlen += TYPE_BYTES[type];
    pBuilder->vlen += TYPE_BYTES[type];
  }

  pBuilder->nCols++;
  pBuilder->flen += TYPE_BYTES[type];

  ASSERT(pCol->offset < pBuilder->flen);

  return 0;
}

/**
 * Materialise the columns collected in a builder into a newly allocated
 * schema. The builder is not modified and can keep being used.
 *
 * When TD_SUPPORT_BITMAP is defined the schema's tlen additionally includes
 * TD_BITMAP_BYTES(number of columns).
 *
 * @return the new schema, or NULL if the builder holds no column or the
 *         allocation failed
 */
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder->nCols <= 0) {
    return NULL;
  }

  int schemaBytes = sizeof(STSchema) + sizeof(STColumn) * pBuilder->nCols;

  STSchema *pNew = (STSchema *)taosMemoryMalloc(schemaBytes);
  if (pNew == NULL) {
    return NULL;
  }

  // header fields
  schemaVersion(pNew) = pBuilder->version;
  schemaNCols(pNew) = pBuilder->nCols;
  schemaTLen(pNew) = pBuilder->tlen;
  schemaFLen(pNew) = pBuilder->flen;
  schemaVLen(pNew) = pBuilder->vlen;

#ifdef TD_SUPPORT_BITMAP
  schemaTLen(pNew) += (int)TD_BITMAP_BYTES(schemaNCols(pNew));
#endif

  // column descriptors
  memcpy(schemaColAt(pNew, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols);

  return pNew;
}

C
Cary Xu 已提交
212
#if 0
H
hzcheng 已提交
213 214 215
/**
 * Initialize a data row
 */
H
TD-90  
Hongze Cheng 已提交
216 217 218 219
void tdInitDataRow(SDataRow row, STSchema *pSchema) {
  dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema));
  dataRowSetVersion(row, schemaVersion(pSchema));
}
H
hzcheng 已提交
220

C
Cary Xu 已提交
221 222
/**
 * Allocate a data row of dataRowMaxBytesFromSchema(pSchema) bytes and
 * initialise its header with tdInitDataRow().
 *
 * @return the new row, or NULL if the allocation failed
 */
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
  int32_t  maxBytes = dataRowMaxBytesFromSchema(pSchema);
  SDataRow pNew = taosMemoryMalloc(maxBytes);

  if (pNew != NULL) {
    tdInitDataRow(pNew, pSchema);
  }
  return pNew;
}
H
hzcheng 已提交
230

H
hzcheng 已提交
231 232 233
/**
 * Free the SDataRow object
 */
C
Cary Xu 已提交
234
void tdFreeDataRow(SDataRow row) {
wafwerar's avatar
wafwerar 已提交
235
  if (row) taosMemoryFree(row);
C
Cary Xu 已提交
236
}
C
Cary Xu 已提交
237

C
Cary Xu 已提交
238
/**
 * Duplicate a data row into a new allocation of dataRowLen(row) bytes.
 *
 * @return the copy, or NULL if the allocation failed
 */
SDataRow tdDataRowDup(SDataRow row) {
  SDataRow pCopy = taosMemoryMalloc(dataRowLen(row));

  if (pCopy != NULL) {
    dataRowCpy(pCopy, row);
  }
  return pCopy;
}
C
Cary Xu 已提交
245 246

/**
 * Duplicate a mem row into a new allocation of memRowTLen(row) bytes.
 *
 * @return the copy, or NULL if the allocation failed
 */
SMemRow tdMemRowDup(SMemRow row) {
  SMemRow pCopy = taosMemoryMalloc(memRowTLen(row));

  if (pCopy != NULL) {
    memRowCpy(pCopy, row);
  }
  return pCopy;
}
C
Cary Xu 已提交
253
#endif
C
Cary Xu 已提交
254

255
/**
 * Bind a data column to a schema column: type, column id, byte width and
 * offset are copied from `pCol`, and the used length is reset to 0.
 *
 * `maxPoints` is not used by this function; no memory is allocated here.
 */
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) {
  pDataCol->len = 0;

  pDataCol->type = colType(pCol);
  pDataCol->colId = colColId(pCol);
  pDataCol->bytes = colBytes(pCol);
  pDataCol->offset = colOffset(pCol);  // no TD_DATA_ROW_HEAD_SIZE added
}
H
Hongze Cheng 已提交
263
// value from timestamp should be TKEY here instead of TSKEY
L
Liu Jicong 已提交
264
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
H
TD-166  
hzcheng 已提交
265 266
  ASSERT(pCol != NULL && value != NULL);

K
kailixu 已提交
267 268 269
  if (isAllRowsNull(pCol)) {
    if (isNull(value, pCol->type)) {
      // all null value yet, just return
L
Liu Jicong 已提交
270
      return 0;
K
kailixu 已提交
271 272
    }

S
Shengliang Guan 已提交
273
    if (tdAllocMemForCol(pCol, maxPoints) < 0) return -1;
K
kailixu 已提交
274 275
    if (numOfRows > 0) {
      // Find the first not null value, fill all previouse values as NULL
L
Liu Jicong 已提交
276
      dataColSetNEleNull(pCol, numOfRows);
K
kailixu 已提交
277 278 279
    }
  }

H
Hongze Cheng 已提交
280 281 282 283 284 285 286
  if (IS_VAR_DATA_TYPE(pCol->type)) {
    // set offset
    pCol->dataOff[numOfRows] = pCol->len;
    // Copy data
    memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
    // Update the length
    pCol->len += varDataTLen(value);
H
TD-166  
hzcheng 已提交
287
  } else {
H
Haojun Liao 已提交
288
    ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
H
Hongze Cheng 已提交
289 290
    memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
    pCol->len += pCol->bytes;
H
TD-166  
hzcheng 已提交
291
  }
L
Liu Jicong 已提交
292 293 294 295 296 297 298 299 300
  return 0;
}

/**
 * Return the address of the value stored at `row` in a column, without
 * checking whether the column holds any data: variable-length columns are
 * addressed through dataOff[row], fixed-length ones by TYPE_BYTES * row.
 */
static FORCE_INLINE const void *tdGetColDataOfRowUnsafe(SDataCol *pCol, int row) {
  return IS_VAR_DATA_TYPE(pCol->type) ? POINTER_SHIFT(pCol->pData, pCol->dataOff[row])
                                      : POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
}

H
TD-166  
hzcheng 已提交
303
/**
 * Tell whether the first `nEle` values of a column are all NULL.
 *
 * @return true if the column is flagged by isAllRowsNull() or if each of the
 *         first `nEle` stored values is NULL; false as soon as one is not
 */
bool isNEleNull(SDataCol *pCol, int nEle) {
  if (isAllRowsNull(pCol)) {
    return true;
  }

  int iRow = 0;
  while (iRow < nEle) {
    if (!isNull(tdGetColDataOfRowUnsafe(pCol, iRow), pCol->type)) {
      return false;
    }
    ++iRow;
  }

  return true;
}

311
/**
 * Write a NULL value at row `index` and grow the column's used length.
 *
 * Fixed-length columns are written at TYPE_BYTES * index. Variable-length
 * columns are written at the current end of the data (pCol->len), which is
 * also recorded as the row's offset.
 */
static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) {
  if (!IS_VAR_DATA_TYPE(pCol->type)) {
    setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes);
    pCol->len += TYPE_BYTES[pCol->type];
    return;
  }

  pCol->dataOff[index] = pCol->len;
  char *pNull = POINTER_SHIFT(pCol->pData, pCol->len);
  setVardataNull(pNull, pCol->type);
  pCol->len += varDataTLen(pNull);
}

323
/**
 * Overwrite the first `nEle` rows of a column with NULL and set the column's
 * used length accordingly (the previous length is discarded).
 */
static void dataColSetNEleNull(SDataCol *pCol, int nEle) {
  if (!IS_VAR_DATA_TYPE(pCol->type)) {
    setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
    pCol->len = TYPE_BYTES[pCol->type] * nEle;
    return;
  }

  // Variable-length NULLs are appended one by one starting from an empty column.
  pCol->len = 0;
  for (int iRow = 0; iRow < nEle; ++iRow) {
    dataColSetNullAt(pCol, iRow);
  }
}

C
Cary Xu 已提交
335
/**
 * Rebuild the offset array of a BINARY/NCHAR column by walking `nEle`
 * consecutive variable-length values starting at pData.
 *
 * @return the walk position after `nEle` values, advanced once more by the
 *         varDataTLen() read at that position
 *
 * NOTE(review): the return expression reads a length header located after
 * the last walked value; confirm that this is intended and that the memory
 * there is always valid.
 */
void *dataColSetOffset(SDataCol *pCol, int nEle) {
  ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));

  void          *pCursor = pCol->pData;
  VarDataOffsetT used = 0;

  for (int iRow = 0; iRow < nEle; ++iRow) {
    pCol->dataOff[iRow] = used;
    used += varDataTLen(pCursor);
    pCursor = POINTER_SHIFT(pCursor, varDataTLen(pCursor));
  }

  return POINTER_SHIFT(pCursor, varDataTLen(pCursor));
}

L
Liu Jicong 已提交
350
SDataCols *tdNewDataCols(int maxCols, int maxRows) {
wafwerar's avatar
wafwerar 已提交
351
  SDataCols *pCols = (SDataCols *)taosMemoryCalloc(1, sizeof(SDataCols));
H
Haojun Liao 已提交
352
  if (pCols == NULL) {
S
Shengliang Guan 已提交
353
    uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno));
H
Haojun Liao 已提交
354 355
    return NULL;
  }
H
TD-34  
hzcheng 已提交
356

H
Hongze Cheng 已提交
357
  pCols->maxPoints = maxRows;
L
Liu Jicong 已提交
358 359 360
  pCols->maxCols = maxCols;
  pCols->numOfRows = 0;
  pCols->numOfCols = 0;
H
Hongze Cheng 已提交
361 362

  if (maxCols > 0) {
wafwerar's avatar
wafwerar 已提交
363
    pCols->cols = (SDataCol *)taosMemoryCalloc(maxCols, sizeof(SDataCol));
H
Hongze Cheng 已提交
364 365 366 367 368 369
    if (pCols->cols == NULL) {
      uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols,
             strerror(errno));
      tdFreeDataCols(pCols);
      return NULL;
    }
L
Liu Jicong 已提交
370
    int i;
S
Shengliang Guan 已提交
371
    for (i = 0; i < maxCols; i++) {
L
Liu Jicong 已提交
372
      pCols->cols[i].spaceSize = 0;
L
Liu Jicong 已提交
373
      pCols->cols[i].len = 0;
L
Liu Jicong 已提交
374 375 376
      pCols->cols[i].pData = NULL;
      pCols->cols[i].dataOff = NULL;
    }
H
Hongze Cheng 已提交
377 378
  }

H
TD-34  
hzcheng 已提交
379 380 381
  return pCols;
}

H
Hongze Cheng 已提交
382
/**
 * Prepare a column set for a schema.
 *
 * The column array is grown when the schema has more columns than the set
 * can currently describe; the set is then reset and every column is bound to
 * its schema column through dataColInit().
 *
 * @param pCols   column set to (re)initialise
 * @param pSchema schema describing the columns
 * @return 0 on success, -1 if the column array could not be grown (the
 *         column set is left unchanged in that case)
 */
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
  int i;
  int oldMaxCols = pCols->maxCols;
  int nSchemaCols = schemaNCols(pSchema);

  if (nSchemaCols > oldMaxCols) {
    // Publish the new capacity only once the array really has that size: tdResetDataCols() and
    // tdFreeDataCols() iterate up to maxCols and would overrun the old array after a failed realloc.
    void *ptr = taosMemoryRealloc(pCols->cols, sizeof(SDataCol) * nSchemaCols);
    if (ptr == NULL) return -1;
    pCols->cols = ptr;
    pCols->maxCols = nSchemaCols;

    // Newly added slots start without a buffer, like the ones created by tdNewDataCols().
    for (i = oldMaxCols; i < pCols->maxCols; i++) {
      pCols->cols[i].pData = NULL;
      pCols->cols[i].dataOff = NULL;
      pCols->cols[i].spaceSize = 0;
      pCols->cols[i].len = 0;
    }
  }

  tdResetDataCols(pCols);
  pCols->numOfCols = nSchemaCols;

  for (i = 0; i < nSchemaCols; i++) {
    dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints);
  }

  return 0;
}

H
Hongze Cheng 已提交
407
/**
 * Free a column set together with the data buffer of each of its `maxCols`
 * columns. A NULL argument is accepted.
 *
 * @return always NULL, so that callers can write `p = tdFreeDataCols(p);`
 */
SDataCols *tdFreeDataCols(SDataCols *pCols) {
  if (pCols == NULL) {
    return NULL;
  }

  if (pCols->cols != NULL) {
    const int nCols = pCols->maxCols;
    for (int iCol = 0; iCol < nCols; ++iCol) {
      taosMemoryFreeClear(pCols->cols[iCol].pData);
    }
    taosMemoryFree(pCols->cols);
    pCols->cols = NULL;
  }

  taosMemoryFree(pCols);
  return NULL;
}

C
Cary Xu 已提交
424
#if 0
H
TD-100  
hzcheng 已提交
425
/**
 * Duplicate a column set.
 *
 * The column descriptors (type, bitmap, colId, bytes, offset) are always
 * copied. When `keepData` is true the row count and, for every column that
 * holds data, the data buffer (plus the offset array of variable-length
 * columns) are copied as well.
 *
 * @return the duplicate, or NULL if an allocation failed
 */
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
  SDataCols *pRet = tdNewDataCols(pDataCols->maxCols, pDataCols->maxPoints);
  if (pRet == NULL) {
    return NULL;
  }

  pRet->numOfCols = pDataCols->numOfCols;
  pRet->sversion = pDataCols->sversion;
  if (keepData) {
    pRet->numOfRows = pDataCols->numOfRows;
  }

  for (int iCol = 0; iCol < pDataCols->numOfCols; ++iCol) {
    SDataCol *pSrc = &pDataCols->cols[iCol];
    SDataCol *pDst = &pRet->cols[iCol];

    pDst->type = pSrc->type;
    pDst->bitmap = pSrc->bitmap;
    pDst->colId = pSrc->colId;
    pDst->bytes = pSrc->bytes;
    pDst->offset = pSrc->offset;

    // Only columns that actually hold data get a buffer in the copy.
    if (!keepData || pSrc->len <= 0) {
      continue;
    }

    if (tdAllocMemForCol(pDst, pRet->maxPoints) < 0) {
      tdFreeDataCols(pRet);
      return NULL;
    }

    pDst->len = pSrc->len;
    memcpy(pDst->pData, pSrc->pData, pSrc->len);
    if (IS_VAR_DATA_TYPE(pDst->type)) {
      int dataOffSize = sizeof(VarDataOffsetT) * pDataCols->maxPoints;
      memcpy(pDst->dataOff, pSrc->dataOff, dataOffSize);
    }
  }

  return pRet;
}
C
Cary Xu 已提交
458
#endif
H
TD-100  
hzcheng 已提交
459

H
TD-34  
hzcheng 已提交
460
/**
 * Empty a column set: the row count goes back to 0 and dataColReset() is
 * applied to each of the `maxCols` columns. A NULL argument is ignored.
 */
void tdResetDataCols(SDataCols *pCols) {
  if (pCols == NULL) {
    return;
  }

  pCols->numOfRows = 0;
  for (int iCol = 0; iCol < pCols->maxCols; ++iCol) {
    dataColReset(&pCols->cols[iCol]);
  }
}
C
Cary Xu 已提交
468
#if 0
469
/**
 * Append one data row to a column set.
 *
 * Schema columns and target columns are walked in parallel and matched by
 * column id. Target columns beyond the end of the schema receive NULL;
 * schema columns missing from the target are skipped.
 *
 * NOTE(review): `setCol` is re-initialised to 0 on every iteration and only
 * set in the matching-id branch, so in the final branch the condition
 * `forceSetNull || setCol` reduces to `forceSetNull`.
 */
static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
  ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row));

  int iSchemaCol = 0;  // cursor over the row's schema columns
  int iDataCol = 0;    // cursor over the target columns

  while (iDataCol < pCols->numOfCols) {
    bool      setCol = 0;
    SDataCol *pDataCol = &(pCols->cols[iDataCol]);

    // Schema exhausted: the remaining target columns get NULL.
    if (iSchemaCol >= schemaNCols(pSchema)) {
      dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
      iDataCol++;
      continue;
    }

    STColumn *pRowCol = schemaColAt(pSchema, iSchemaCol);

    if (pRowCol->colId < pDataCol->colId) {
      // The row has a column the target does not know about.
      iSchemaCol++;
      continue;
    }

    if (pRowCol->colId == pDataCol->colId) {
      void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
      if (!isNull(value, pDataCol->type)) setCol = 1;
      dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
      iDataCol++;
      iSchemaCol++;
      continue;
    }

    // The target has a column the row lacks.
    if (forceSetNull || setCol) {
      dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
    }
    iDataCol++;
  }

  pCols->numOfRows++;
}

C
Cary Xu 已提交
503
/**
 * Append one KV row to a column set.
 *
 * The row's column index entries and the target columns are walked in
 * parallel and matched by column id. Once the row (or the schema) is
 * exhausted the remaining target columns receive NULL.
 *
 * NOTE(review): `setCol` is re-initialised to 0 on every iteration and only
 * set in the matching-id branch, so in the final branch the condition
 * `forceSetNull || setCol` reduces to `forceSetNull`.
 */
static void tdAppendKVRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
  ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row));

  int iRowCol = 0;   // cursor over the KV row's column index entries
  int iDataCol = 0;  // cursor over the target columns

  int nRowCols = kvRowNCols(row);

  while (iDataCol < pCols->numOfCols) {
    bool      setCol = 0;
    SDataCol *pDataCol = &(pCols->cols[iDataCol]);

    if (iRowCol >= nRowCols || iRowCol >= schemaNCols(pSchema)) {
      dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
      ++iDataCol;
      continue;
    }

    SColIdx *colIdx = kvRowColIdxAt(row, iRowCol);

    if (colIdx->colId < pDataCol->colId) {
      // The row has a column the target does not know about.
      ++iRowCol;
      continue;
    }

    if (colIdx->colId == pDataCol->colId) {
      void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
      if (!isNull(value, pDataCol->type)) setCol = 1;
      dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
      ++iDataCol;
      ++iRowCol;
      continue;
    }

    // The target has a column the row lacks.
    if (forceSetNull || setCol) {
      dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
    }
    ++iDataCol;
  }

  pCols->numOfRows++;
}
H
TD-166  
hzcheng 已提交
539

540
/**
 * Append a mem row to a column set, dispatching on the row's physical
 * layout (data row or KV row). Any other layout trips an assertion.
 */
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
  if (isDataRow(row)) {
    tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols, forceSetNull);
    return;
  }

  if (isKvRow(row)) {
    tdAppendKVRowToDataCol(memRowKvBody(row), pSchema, pCols, forceSetNull);
    return;
  }

  ASSERT(0);
}

550
/**
 * Merge `rowsToMerge` rows of `source`, starting at `*pOffset`, into `target`.
 *
 * When the target is empty or its last key is smaller than the first source
 * key to merge, the rows are simply appended and `*pOffset` advances by
 * `rowsToMerge`. Otherwise the target is duplicated and rebuilt by
 * tdMergeTwoDataCols() from the duplicate and the source.
 *
 * @param pOffset in/out read position in `source`; NULL means "start at 0"
 * @return 0 on success, -1 if the target could not be duplicated
 */
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) {
  ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
  ASSERT(target->numOfCols == source->numOfCols);

  int localOffset = 0;
  if (pOffset == NULL) {
    pOffset = &localOffset;
  }

  int        code = 0;
  SDataCols *pTarget = NULL;

  if ((target->numOfRows == 0) || (dataColsKeyLast(target) < dataColsKeyAtRow(source, *pOffset))) {  // No overlap
    ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);

    for (int iRow = 0; iRow < rowsToMerge; iRow++) {
      for (int iCol = 0; iCol < source->numOfCols; iCol++) {
        // Columns that are empty on both sides stay empty.
        if (source->cols[iCol].len > 0 || target->cols[iCol].len > 0) {
          dataColAppendVal(target->cols + iCol, tdGetColDataOfRow(source->cols + iCol, iRow + (*pOffset)),
                           target->numOfRows, target->maxPoints);
        }
      }
      target->numOfRows++;
    }
    (*pOffset) += rowsToMerge;
  } else {
    pTarget = tdDupDataCols(target, true);
    if (pTarget == NULL) {
      code = -1;
    } else {
      int iter1 = 0;
      tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, pOffset, source->numOfRows,
                         pTarget->numOfRows + rowsToMerge, forceSetNull);
    }
  }

  tdFreeDataCols(pTarget);
  return code;
}
H
TD-100  
hzcheng 已提交
589

H
TD-1438  
Hongze Cheng 已提交
590 591
/**
 * Rebuild `target` as the key-ordered merge of src1[*iter1 .. limit1) and
 * src2[*iter2 .. limit2), producing at most `tRows` rows. Both iterators are
 * advanced as rows are consumed.
 *
 * src2 has priority over src1: on equal keys the src2 row wins, unless its
 * TKEY is flagged as deleted, in which case both rows are dropped. When
 * `forceSetNull` is false, a column that src2 does not provide falls back to
 * the src1 value on equal keys.
 *
 * NOTE(review): the "does src2 provide this column" test inspects the value
 * at the start of the src2 column buffer (pData), not the value at *iter2.
 */
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
                               int limit2, int tRows, bool forceSetNull) {
  tdResetDataCols(target);
  ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);

  while (target->numOfRows < tRows) {
    const bool done1 = (*iter1 >= limit1);
    const bool done2 = (*iter2 >= limit2);
    if (done1 && done2) break;

    // An exhausted source compares as "greater than everything".
    TSKEY key1 = done1 ? INT64_MAX : dataColsKeyAt(src1, *iter1);
    TKEY  tkey1 = done1 ? TKEY_NULL : dataColsTKeyAt(src1, *iter1);
    TSKEY key2 = done2 ? INT64_MAX : dataColsKeyAt(src2, *iter2);
    TKEY  tkey2 = done2 ? TKEY_NULL : dataColsTKeyAt(src2, *iter2);

    ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1)));

    if (key1 < key2) {
      // Take the row from src1.
      for (int iCol = 0; iCol < src1->numOfCols; iCol++) {
        ASSERT(target->cols[iCol].type == src1->cols[iCol].type);
        if (src1->cols[iCol].len > 0 || target->cols[iCol].len > 0) {
          dataColAppendVal(&(target->cols[iCol]), tdGetColDataOfRow(src1->cols + iCol, *iter1), target->numOfRows,
                           target->maxPoints);
        }
      }

      target->numOfRows++;
      (*iter1)++;
    } else {
      // key1 >= key2: the row comes from src2 (possibly overriding src1).
      const bool sameKey = (key1 == key2);

      if (!sameKey || !TKEY_IS_DELETED(tkey2)) {
        for (int iCol = 0; iCol < src2->numOfCols; iCol++) {
          ASSERT(target->cols[iCol].type == src2->cols[iCol].type);
          if (src2->cols[iCol].len > 0 && !isNull(src2->cols[iCol].pData, src2->cols[iCol].type)) {
            dataColAppendVal(&(target->cols[iCol]), tdGetColDataOfRow(src2->cols + iCol, *iter2), target->numOfRows,
                             target->maxPoints);
          } else if (!forceSetNull && sameKey && src1->cols[iCol].len > 0) {
            dataColAppendVal(&(target->cols[iCol]), tdGetColDataOfRow(src1->cols + iCol, *iter1), target->numOfRows,
                             target->maxPoints);
          } else if (target->cols[iCol].len > 0) {
            dataColSetNullAt(&target->cols[iCol], target->numOfRows);
          }
        }
        target->numOfRows++;
      }

      (*iter2)++;
      if (sameKey) (*iter1)++;
    }

    ASSERT(target->numOfRows <= target->maxPoints);
  }
}
C
Cary Xu 已提交
641
#endif
H
Hongze Cheng 已提交
642

H
Hongze Cheng 已提交
643
/**
 * Duplicate a KV row into a new allocation of kvRowLen(row) bytes.
 *
 * @return the copy, or NULL if the allocation failed
 */
SKVRow tdKVRowDup(SKVRow row) {
  SKVRow pCopy = taosMemoryMalloc(kvRowLen(row));

  if (pCopy != NULL) {
    kvRowCpy(pCopy, row);
  }
  return pCopy;
}

S
Shengliang Guan 已提交
651 652 653
/**
 * qsort() comparator ordering SColIdx entries by ascending column id.
 *
 * @return 1, -1 or 0 when the first id is greater than, less than or equal
 *         to the second one
 */
static int compareColIdx(const void *a, const void *b) {
  const SColIdx *lhs = (const SColIdx *)a;
  const SColIdx *rhs = (const SColIdx *)b;

  return (lhs->colId > rhs->colId) - (lhs->colId < rhs->colId);
}

S
Shengliang Guan 已提交
663
/**
 * Sort the column index entries of a KV row in place by ascending column id.
 * Only the index entries are reordered; the value area is not touched.
 */
void tdSortKVRowByColIdx(SKVRow row) {
  SColIdx *pIdx = kvRowColIdx(row);

  qsort(pIdx, kvRowNCols(row), sizeof(SColIdx), compareColIdx);
}
B
Bomin Zhang 已提交
664

H
TD-90  
Hongze Cheng 已提交
665 666 667 668
/**
 * Set the value of column `colId` in a KV row, adding the column if the row
 * does not have it yet.
 *
 * Three cases are handled:
 *  - the column is missing: a larger row is allocated, the new index entry
 *    and value are appended and the index is re-sorted;
 *  - the column exists and the new value has the same size (always true for
 *    fixed-length types): the value is overwritten in place;
 *  - the column exists with a variable-length value of a different size: a
 *    new row is allocated and the offsets of the values stored after the
 *    changed one are shifted.
 *
 * Whenever a new row is allocated, `*orow` is updated to point to it and the
 * old row is freed.
 *
 * @param orow  in/out pointer to the row
 * @param colId id of the column to set
 * @param type  data type of the column
 * @param value new value (a var-data blob for variable-length types)
 * @return 0 on success, -1 if an allocation failed (`*orow` is then unchanged)
 */
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
  SColIdx *pColIdx = NULL;
  SKVRow   row = *orow;
  SKVRow   nrow = NULL;
  void    *ptr = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE);

  if (ptr == NULL || ((SColIdx *)ptr)->colId > colId) {  // need to add a column value to the row
    int diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
    int nRowLen = kvRowLen(row) + sizeof(SColIdx) + diff;
    int oRowCols = kvRowNCols(row);

    ASSERT(diff > 0);
    nrow = taosMemoryMalloc(nRowLen);
    if (nrow == NULL) return -1;

    kvRowSetLen(nrow, nRowLen);
    kvRowSetNCols(nrow, oRowCols + 1);

    // Copy the old index entries and the old values; the new entry/value go at the end.
    memcpy(kvRowColIdx(nrow), kvRowColIdx(row), sizeof(SColIdx) * oRowCols);
    memcpy(kvRowValues(nrow), kvRowValues(row), kvRowValLen(row));

    pColIdx = kvRowColIdxAt(nrow, oRowCols);
    pColIdx->colId = colId;
    pColIdx->offset = kvRowValLen(row);

    memcpy(kvRowColVal(nrow, pColIdx), value, diff);  // copy new value

    // The new entry was appended, so restore the ordering by column id.
    tdSortKVRowByColIdx(nrow);

    *orow = nrow;
    taosMemoryFree(row);
  } else {
    ASSERT(((SColIdx *)ptr)->colId == colId);
    if (IS_VAR_DATA_TYPE(type)) {
      void *pOldVal = kvRowColVal(row, (SColIdx *)ptr);

      if (varDataTLen(value) == varDataTLen(pOldVal)) {  // just update the column value in place
        memcpy(pOldVal, value, varDataTLen(value));
      } else {  // need to reallocate the memory
        // Use a plain int, like nRowLen above: a 16-bit signed length would wrap for rows > 32767 bytes.
        int nlen = kvRowLen(row) + (varDataTLen(value) - varDataTLen(pOldVal));
        ASSERT(nlen > 0);
        nrow = taosMemoryMalloc(nlen);
        if (nrow == NULL) return -1;

        kvRowSetLen(nrow, nlen);
        kvRowSetNCols(nrow, kvRowNCols(row));

        // zsize covers the whole index area plus the values stored before the changed one.
        int zsize = sizeof(SColIdx) * kvRowNCols(row) + ((SColIdx *)ptr)->offset;
        memcpy(kvRowColIdx(nrow), kvRowColIdx(row), zsize);
        memcpy(kvRowColVal(nrow, ((SColIdx *)ptr)), value, varDataTLen(value));
        // Copy left value part
        int lsize = kvRowLen(row) - TD_KV_ROW_HEAD_SIZE - zsize - varDataTLen(pOldVal);
        if (lsize > 0) {
          memcpy(POINTER_SHIFT(nrow, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(value)),
                 POINTER_SHIFT(row, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(pOldVal)), lsize);
        }

        // Values stored after the changed one moved by the size difference.
        for (int i = 0; i < kvRowNCols(nrow); i++) {
          pColIdx = kvRowColIdxAt(nrow, i);

          if (pColIdx->offset > ((SColIdx *)ptr)->offset) {
            pColIdx->offset = pColIdx->offset - varDataTLen(pOldVal) + varDataTLen(value);
          }
        }

        *orow = nrow;
        taosMemoryFree(row);
      }
    } else {
      memcpy(kvRowColVal(row, (SColIdx *)ptr), value, TYPE_BYTES[type]);
    }
  }

  return 0;
}

H
TD-353  
Hongze Cheng 已提交
741
/*
 * Encode a KV row through an optional output cursor.
 *
 * buf: address of the output cursor. When it is NULL nothing is written and only the
 *      length is reported. Otherwise the row is passed to kvRowCpy() with *buf as the
 *      destination and *buf is then shifted by kvRowLen(row).
 * row: the KV row to encode.
 *
 * Returns kvRowLen(row) in both cases.
 */
int tdEncodeKVRow(void **buf, SKVRow row) {
  // May change the encode purpose
  if (buf == NULL) {
    // Length query only: no destination to write to.
    return kvRowLen(row);
  }

  kvRowCpy(*buf, row);
  *buf = POINTER_SHIFT(*buf, kvRowLen(row));
  return kvRowLen(row);
}

H
Hongze Cheng 已提交
751 752
/*
 * Decode a KV row located at `buf`.
 *
 * buf: start of the encoded row; it is handed to tdKVRowDup().
 * row: output; receives whatever tdKVRowDup(buf) returns (NULL on failure).
 *
 * Returns NULL when the duplicate could not be produced, otherwise `buf` shifted by
 * the length of the duplicated row.
 */
void *tdDecodeKVRow(void *buf, SKVRow *row) {
  SKVRow dupRow = tdKVRowDup(buf);

  *row = dupRow;
  if (dupRow == NULL) {
    return NULL;
  }

  return POINTER_SHIFT(buf, kvRowLen(dupRow));
}

H
Hongze Cheng 已提交
757
/*
 * Initialise a KV row builder with room for 128 column indexes and a 1024-byte value
 * buffer.
 *
 * pBuilder: builder to initialise; all of tCols, nCols, alloc, size, pColIdx and buf
 *           are overwritten.
 *
 * Returns 0 on success, -1 when either allocation fails. On failure both pBuilder->pColIdx
 * and pBuilder->buf are NULL, so the builder never holds a dangling or uninitialised
 * pointer that a later cleanup could free.
 */
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) {
  pBuilder->tCols = 128;
  pBuilder->nCols = 0;
  pBuilder->alloc = 1024;
  pBuilder->size = 0;
  // Make sure buf has a defined value even if the first allocation fails.
  pBuilder->buf = NULL;

  pBuilder->pColIdx = (SColIdx *)taosMemoryMalloc(sizeof(SColIdx) * pBuilder->tCols);
  if (pBuilder->pColIdx == NULL) return -1;

  pBuilder->buf = taosMemoryMalloc(pBuilder->alloc);
  if (pBuilder->buf == NULL) {
    // Release the index array and drop the stale pointer so it cannot be freed twice.
    taosMemoryFree(pBuilder->pColIdx);
    pBuilder->pColIdx = NULL;
    return -1;
  }

  return 0;
}

H
Hongze Cheng 已提交
772
/*
 * Release the two heap members of a KV row builder.
 *
 * pBuilder: builder whose `buf` and `pColIdx` members are passed to
 *           taosMemoryFreeClear(). The builder structure itself is not freed here.
 */
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) {
  // The two members are independent allocations, so the release order does not matter.
  taosMemoryFreeClear(pBuilder->buf);
  taosMemoryFreeClear(pBuilder->pColIdx);
}

H
Hongze Cheng 已提交
777
/*
 * Rewind a KV row builder so it can be reused.
 *
 * pBuilder: builder whose column count and used value-buffer size are set back to 0.
 *           The allocated capacities (tCols/alloc) and the buffers are left untouched.
 */
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
  pBuilder->size = 0;
  pBuilder->nCols = 0;
}

H
Hongze Cheng 已提交
782
/*
 * Materialise the content collected in a KV row builder as a newly allocated KV row.
 *
 * pBuilder: builder providing nCols column indexes (pColIdx) and `size` bytes of
 *           values (buf).
 *
 * Returns NULL when the builder holds no index and no value bytes, or when the
 * allocation fails. Otherwise returns a row obtained from taosMemoryMalloc() whose
 * header, index area and value area have been filled in.
 */
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
  int payloadLen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
  if (payloadLen == 0) {
    return NULL;
  }

  // Total row length = fixed KV row header + index area + value area.
  int    rowLen = payloadLen + TD_KV_ROW_HEAD_SIZE;
  SKVRow newRow = taosMemoryMalloc(rowLen);

  if (newRow != NULL) {
    kvRowSetNCols(newRow, pBuilder->nCols);
    kvRowSetLen(newRow, rowLen);

    memcpy(kvRowColIdx(newRow), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
    memcpy(kvRowValues(newRow), pBuilder->buf, pBuilder->size);
  }

  return newRow;
}
C
Cary Xu 已提交
799
#if 0
800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818
/*
 * Merge two memory rows into `buffer`, column by column, following pSchema1.
 *
 * For every column of pSchema1 the value is taken from row1 when it exists and is not
 * null; otherwise from the column of pSchema2 with the same colId in row2; otherwise
 * the type's null value is used. The result is first laid out as a data row. Every
 * non-null value is also stashed, and if the accumulated KV length turns out smaller
 * than the data row length, `buffer` is rebuilt as a KV row from the stash.
 *
 * buffer:    destination memory for the merged row (capacity is not checked here).
 * row1/row2: the rows to merge; row1 has priority.
 * pSchema1:  schema driving the merge and providing the result's version.
 * pSchema2:  schema used to locate columns in row2.
 *
 * Returns `buffer`, or NULL when the stash array cannot be created.
 */
SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2) {
#if 0
  ASSERT(memRowKey(row1) == memRowKey(row2));
  ASSERT(schemaVersion(pSchema1) == memRowVersion(row1));
  ASSERT(schemaVersion(pSchema2) == memRowVersion(row2));
  ASSERT(schemaVersion(pSchema1) >= schemaVersion(pSchema2));
#endif

  SArray *stashRow = taosArrayInit(pSchema1->numOfCols, sizeof(SColInfo));
  if (stashRow == NULL) {
    return NULL;
  }

  // Prepare the header of the result as a data row.
  SMemRow  pRow = buffer;
  SDataRow dataRow = memRowDataBody(pRow);
  memRowSetType(pRow, SMEM_ROW_DATA);
  dataRowSetVersion(dataRow, schemaVersion(pSchema1));  // use latest schema version
  dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pSchema1->flen));

  TDRowLenT kvLen = TD_MEM_ROW_KV_HEAD_SIZE;

  int32_t  nCols1 = schemaNCols(pSchema1);
  int32_t  nCols2 = schemaNCols(pSchema2);
  int32_t  j = 0;  // cursor into pSchema2; only ever moves forward
  SColInfo colInfo = {0};
  int32_t  kvIdx1 = 0, kvIdx2 = 0;

  for (int32_t i = 0; i < nCols1; ++i) {
    STColumn *pCol = schemaColAt(pSchema1, i);

    // Preferred source: row1.
    void *val1 = tdGetMemRowDataOfColEx(row1, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx1);
    if (val1 != NULL && !isNull(val1, pCol->type)) {
      tdAppendColVal(dataRow, val1, pCol->type, pCol->offset);
      kvLen += tdGetColAppendLen(SMEM_ROW_KV, val1, pCol->type);
      setSColInfo(&colInfo, pCol->colId, pCol->type, val1);
      taosArrayPush(stashRow, &colInfo);
      continue;
    }

    // Fallback source: row2. Skip schema-2 columns with a smaller colId, then look
    // for an exact colId match without consuming the matching column.
    void *val2 = NULL;
    while (j < nCols2) {
      STColumn *skipCol = schemaColAt(pSchema2, j);
      if (skipCol->colId >= pCol->colId) {
        break;
      }
      ++j;
    }
    if (j < nCols2) {
      STColumn *tCol = schemaColAt(pSchema2, j);
      if (tCol->colId == pCol->colId) {
        val2 = tdGetMemRowDataOfColEx(row2, tCol->colId, tCol->type, TD_DATA_ROW_HEAD_SIZE + tCol->offset, &kvIdx2);
      }
      // A larger colId means row2 has no such column: val2 stays NULL.
    }

    if (val2 == NULL) {
      val2 = (void *)getNullValue(pCol->type);
    }
    tdAppendColVal(dataRow, val2, pCol->type, pCol->offset);

    // Only non-null values take part in the KV representation.
    if (!isNull(val2, pCol->type)) {
      kvLen += tdGetColAppendLen(SMEM_ROW_KV, val2, pCol->type);
      setSColInfo(&colInfo, pCol->colId, pCol->type, val2);
      taosArrayPush(stashRow, &colInfo);
    }
  }

  TDRowLenT dataLen = memRowTLen(pRow);

  if (kvLen < dataLen) {
    // The KV form is shorter: scan stashRow and generate SKVRow in place of the data row.
    // NOTE(review): this clears only sizeof(dataLen) bytes, not dataLen bytes - confirm
    // that zeroing just the leading bytes is what is intended.
    memset(buffer, 0, sizeof(dataLen));
    SMemRow tRow = buffer;
    memRowSetType(tRow, SMEM_ROW_KV);
    SKVRow  kvRow = (SKVRow)memRowKvBody(tRow);
    int16_t nKvNCols = (int16_t)taosArrayGetSize(stashRow);
    kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nKvNCols));
    kvRowSetNCols(kvRow, nKvNCols);
    memRowSetKvVersion(tRow, pSchema1->version);

    int32_t toffset = 0;
    for (int16_t k = 0; k < nKvNCols; ++k) {
      SColInfo *pColInfo = taosArrayGet(stashRow, k);
      tdAppendKvColVal(kvRow, pColInfo->colVal, true, pColInfo->colId, pColInfo->colType, toffset);
      toffset += sizeof(SColIdx);
    }
    ASSERT(kvLen == memRowTLen(tRow));
  }

  taosArrayDestroy(stashRow);
  return buffer;
}
C
Cary Xu 已提交
893
#endif