tdataformat.c 20.7 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
slguan 已提交
15
#include "tdataformat.h"
T
Tao Liu 已提交
16
#include "talgo.h"
H
TD-353  
Hongze Cheng 已提交
17
#include "tcoding.h"
H
Hongze Cheng 已提交
18
#include "wchar.h"
H
more  
hzcheng 已提交
19

H
hzcheng 已提交
20 21 22 23
/**
 * Duplicate the schema and return a new object
 */
STSchema *tdDupSchema(STSchema *pSchema) {
H
Hongze Cheng 已提交
24 25 26

  int tlen = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
  STSchema *tSchema = (STSchema *)malloc(tlen);
H
hzcheng 已提交
27 28
  if (tSchema == NULL) return NULL;

H
Hongze Cheng 已提交
29
  memcpy((void *)tSchema, (void *)pSchema, tlen);
H
hzcheng 已提交
30 31 32 33

  return tSchema;
}

H
TD-27  
hzcheng 已提交
34 35 36
/**
 * Encode a schema to dst, and return the next pointer
 */
H
TD-353  
Hongze Cheng 已提交
37 38 39 40
int tdEncodeSchema(void **buf, STSchema *pSchema) {
  int tlen = 0;
  tlen += taosEncodeFixedI32(buf, schemaVersion(pSchema));
  tlen += taosEncodeFixedI32(buf, schemaNCols(pSchema));
H
TD-166  
hzcheng 已提交
41

H
TD-27  
hzcheng 已提交
42 43
  for (int i = 0; i < schemaNCols(pSchema); i++) {
    STColumn *pCol = schemaColAt(pSchema, i);
H
TD-353  
Hongze Cheng 已提交
44 45 46
    tlen += taosEncodeFixedI8(buf, colType(pCol));
    tlen += taosEncodeFixedI16(buf, colColId(pCol));
    tlen += taosEncodeFixedI32(buf, colBytes(pCol));
H
TD-27  
hzcheng 已提交
47 48
  }

H
TD-353  
Hongze Cheng 已提交
49
  return tlen;
H
TD-27  
hzcheng 已提交
50 51 52 53 54
}

/**
 * Decode a schema from a binary.
 */
H
TD-353  
Hongze Cheng 已提交
55
void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
H
Hongze Cheng 已提交
56
  int version = 0;
H
TD-353  
Hongze Cheng 已提交
57
  int numOfCols = 0;
H
TD-353  
Hongze Cheng 已提交
58
  STSchemaBuilder schemaBuilder;
H
TD-27  
hzcheng 已提交
59

H
TD-353  
Hongze Cheng 已提交
60 61
  buf = taosDecodeFixedI32(buf, &version);
  buf = taosDecodeFixedI32(buf, &numOfCols);
H
TD-27  
hzcheng 已提交
62

H
Hongze Cheng 已提交
63 64
  if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;

H
TD-353  
Hongze Cheng 已提交
65
  for (int i = 0; i < numOfCols; i++) {
H
TD-27  
hzcheng 已提交
66 67 68
    int8_t  type = 0;
    int16_t colId = 0;
    int32_t bytes = 0;
H
TD-353  
Hongze Cheng 已提交
69 70 71
    buf = taosDecodeFixedI8(buf, &type);
    buf = taosDecodeFixedI16(buf, &colId);
    buf = taosDecodeFixedI32(buf, &bytes);
H
Hongze Cheng 已提交
72 73 74 75
    if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
      tdDestroyTSchemaBuilder(&schemaBuilder);
      return NULL;
    }
H
TD-27  
hzcheng 已提交
76 77
  }

H
TD-353  
Hongze Cheng 已提交
78
  *pRSchema = tdGetSchemaFromBuilder(&schemaBuilder);
H
Hongze Cheng 已提交
79
  tdDestroyTSchemaBuilder(&schemaBuilder);
H
TD-353  
Hongze Cheng 已提交
80
  return buf;
H
Hongze Cheng 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
}

int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  if (pBuilder == NULL) return -1;

  pBuilder->tCols = 256;
  pBuilder->columns = (STColumn *)malloc(sizeof(STColumn) * pBuilder->tCols);
  if (pBuilder->columns == NULL) return -1;

  tdResetTSchemaBuilder(pBuilder, version);
  return 0;
}

void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder) {
    tfree(pBuilder->columns);
  }
}

void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  pBuilder->nCols = 0;
  pBuilder->tlen = 0;
  pBuilder->flen = 0;
T
Tao Liu 已提交
104
  pBuilder->vlen = 0;
H
Hongze Cheng 已提交
105 106 107 108
  pBuilder->version = version;
}

int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int32_t bytes) {
109
  if (!isValidDataType(type)) return -1;
H
Hongze Cheng 已提交
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128

  if (pBuilder->nCols >= pBuilder->tCols) {
    pBuilder->tCols *= 2;
    pBuilder->columns = (STColumn *)realloc(pBuilder->columns, sizeof(STColumn) * pBuilder->tCols);
    if (pBuilder->columns == NULL) return -1;
  }

  STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
  colSetType(pCol, type);
  colSetColId(pCol, colId);
  if (pBuilder->nCols == 0) {
    colSetOffset(pCol, 0);
  } else {
    STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols-1]);
    colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
  }

  if (IS_VAR_DATA_TYPE(type)) {
    colSetBytes(pCol, bytes);
T
Tao Liu 已提交
129 130
    pBuilder->tlen += (TYPE_BYTES[type] + bytes);
    pBuilder->vlen += bytes - sizeof(VarDataLenT);
H
Hongze Cheng 已提交
131 132 133
  } else {
    colSetBytes(pCol, TYPE_BYTES[type]);
    pBuilder->tlen += TYPE_BYTES[type];
T
Tao Liu 已提交
134
    pBuilder->vlen += TYPE_BYTES[type];
H
Hongze Cheng 已提交
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
  }

  pBuilder->nCols++;
  pBuilder->flen += TYPE_BYTES[type];

  ASSERT(pCol->offset < pBuilder->flen);

  return 0;
}

STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder->nCols <= 0) return NULL;

  int tlen = sizeof(STSchema) + sizeof(STColumn) * pBuilder->nCols;

  STSchema *pSchema = (STSchema *)malloc(tlen);
  if (pSchema == NULL) return NULL;

  schemaVersion(pSchema) = pBuilder->version;
  schemaNCols(pSchema) = pBuilder->nCols;
  schemaTLen(pSchema) = pBuilder->tlen;
  schemaFLen(pSchema) = pBuilder->flen;
T
Tao Liu 已提交
157
  schemaVLen(pSchema) = pBuilder->vlen;
H
Hongze Cheng 已提交
158 159 160

  memcpy(schemaColAt(pSchema, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols);

H
TD-27  
hzcheng 已提交
161 162 163
  return pSchema;
}

H
hzcheng 已提交
164 165 166
/**
 * Initialize a data row
 */
H
TD-90  
Hongze Cheng 已提交
167 168 169 170
void tdInitDataRow(SDataRow row, STSchema *pSchema) {
  dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema));
  dataRowSetVersion(row, schemaVersion(pSchema));
}
H
hzcheng 已提交
171

H
TD-166  
hzcheng 已提交
172
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
H
TD-166  
hzcheng 已提交
173
  int32_t size = dataRowMaxBytesFromSchema(pSchema);
H
hzcheng 已提交
174 175 176 177

  SDataRow row = malloc(size);
  if (row == NULL) return NULL;

H
hzcheng 已提交
178
  tdInitDataRow(row, pSchema);
H
hzcheng 已提交
179
  return row;
H
TD-166  
hzcheng 已提交
180
}
H
hzcheng 已提交
181

H
hzcheng 已提交
182 183 184 185 186 187 188
/**
 * Free the SDataRow object
 */
void tdFreeDataRow(SDataRow row) {
  if (row) free(row);
}

H
hzcheng 已提交
189
SDataRow tdDataRowDup(SDataRow row) {
H
hzcheng 已提交
190
  SDataRow trow = malloc(dataRowLen(row));
H
hzcheng 已提交
191 192 193
  if (trow == NULL) return NULL;

  dataRowCpy(trow, row);
H
hzcheng 已提交
194
  return trow;
H
hzcheng 已提交
195
}
H
hzcheng 已提交
196

H
TD-166  
hzcheng 已提交
197 198 199 200
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) {
  pDataCol->type = colType(pCol);
  pDataCol->colId = colColId(pCol);
  pDataCol->bytes = colBytes(pCol);
H
TD-166  
hzcheng 已提交
201
  pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE;
H
TD-166  
hzcheng 已提交
202 203 204 205

  pDataCol->len = 0;
  if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
    pDataCol->dataOff = (VarDataOffsetT *)(*pBuf);
H
Hongze Cheng 已提交
206 207 208
    pDataCol->pData = POINTER_SHIFT(*pBuf, sizeof(VarDataOffsetT) * maxPoints);
    pDataCol->spaceSize = pDataCol->bytes * maxPoints;
    *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize + sizeof(VarDataOffsetT) * maxPoints);
H
TD-166  
hzcheng 已提交
209 210 211 212
  } else {
    pDataCol->spaceSize = pDataCol->bytes * maxPoints;
    pDataCol->dataOff = NULL;
    pDataCol->pData = *pBuf;
H
hzcheng 已提交
213
    *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize);
H
TD-166  
hzcheng 已提交
214 215 216
  }
}

H
Haojun Liao 已提交
217
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) {
H
TD-166  
hzcheng 已提交
218 219 220 221 222 223
  ASSERT(pCol != NULL && value != NULL);

  switch (pCol->type) {
    case TSDB_DATA_TYPE_BINARY:
    case TSDB_DATA_TYPE_NCHAR:
      // set offset
H
Haojun Liao 已提交
224
      pCol->dataOff[numOfRows] = pCol->len;
H
TD-166  
hzcheng 已提交
225
      // Copy data
H
hzcheng 已提交
226
      memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
H
TD-166  
hzcheng 已提交
227
      // Update the length
H
TD-166  
hzcheng 已提交
228
      pCol->len += varDataTLen(value);
H
TD-166  
hzcheng 已提交
229 230
      break;
    default:
H
Haojun Liao 已提交
231
      ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
H
hzcheng 已提交
232
      memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
H
TD-166  
hzcheng 已提交
233 234 235 236 237
      pCol->len += pCol->bytes;
      break;
  }
}

H
Haojun Liao 已提交
238 239
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows) {
  int pointsLeft = numOfRows - pointsToPop;
H
TD-166  
hzcheng 已提交
240 241 242 243 244

  ASSERT(pointsLeft > 0);

  if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR) {
    ASSERT(pCol->len > 0);
H
TD-166  
hzcheng 已提交
245
    VarDataOffsetT toffset = pCol->dataOff[pointsToPop];
H
TD-166  
hzcheng 已提交
246 247
    pCol->len = pCol->len - toffset;
    ASSERT(pCol->len > 0);
H
hzcheng 已提交
248
    memmove(pCol->pData, POINTER_SHIFT(pCol->pData, toffset), pCol->len);
H
TD-166  
hzcheng 已提交
249 250
    dataColSetOffset(pCol, pointsLeft);
  } else {
H
Haojun Liao 已提交
251
    ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
H
TD-166  
hzcheng 已提交
252
    pCol->len = TYPE_BYTES[pCol->type] * pointsLeft;
H
hzcheng 已提交
253
    memmove(pCol->pData, POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * pointsToPop), pCol->len);
H
TD-166  
hzcheng 已提交
254 255 256
  }
}

H
TD-166  
hzcheng 已提交
257 258 259 260 261
bool isNEleNull(SDataCol *pCol, int nEle) {
  switch (pCol->type) {
    case TSDB_DATA_TYPE_BINARY:
    case TSDB_DATA_TYPE_NCHAR:
      for (int i = 0; i < nEle; i++) {
H
Hongze Cheng 已提交
262
        if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false;
H
TD-166  
hzcheng 已提交
263 264 265 266 267 268 269 270 271 272
      }
      return true;
    default:
      for (int i = 0; i < nEle; i++) {
        if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false;
      }
      return true;
  }
}

H
TD-90  
Hongze Cheng 已提交
273 274 275 276
void dataColSetNullAt(SDataCol *pCol, int index) {
  if (IS_VAR_DATA_TYPE(pCol->type)) {
    pCol->dataOff[index] = pCol->len;
    char *ptr = POINTER_SHIFT(pCol->pData, pCol->len);
277
    setVardataNull(ptr, pCol->type);
H
TD-90  
Hongze Cheng 已提交
278 279 280 281 282 283 284
    pCol->len += varDataTLen(ptr);
  } else {
    setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes);
    pCol->len += TYPE_BYTES[pCol->type];
  }
}

H
TD-166  
hzcheng 已提交
285
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
H
TD-166  
hzcheng 已提交
286

H
TD-90  
Hongze Cheng 已提交
287 288 289 290 291 292 293 294
  if (IS_VAR_DATA_TYPE(pCol->type)) {
    pCol->len = 0;
    for (int i = 0; i < nEle; i++) {
      dataColSetNullAt(pCol, i);
    }
  } else {
    setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
    pCol->len = TYPE_BYTES[pCol->type] * nEle;
H
TD-166  
hzcheng 已提交
295 296 297
  }
}

H
TD-166  
hzcheng 已提交
298 299 300
void dataColSetOffset(SDataCol *pCol, int nEle) {
  ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));

H
Hongze Cheng 已提交
301
  void *tptr = pCol->pData;
H
TD-166  
hzcheng 已提交
302
  // char *tptr = (char *)(pCol->pData);
H
TD-166  
hzcheng 已提交
303

H
TD-166  
hzcheng 已提交
304
  VarDataOffsetT offset = 0;
H
TD-166  
hzcheng 已提交
305
  for (int i = 0; i < nEle; i++) {
H
TD-166  
hzcheng 已提交
306
    pCol->dataOff[i] = offset;
H
TD-166  
hzcheng 已提交
307
    offset += varDataTLen(tptr);
H
hzcheng 已提交
308
    tptr = POINTER_SHIFT(tptr, varDataTLen(tptr));
H
TD-166  
hzcheng 已提交
309 310 311
  }
}

H
TD-166  
hzcheng 已提交
312
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
H
TD-34  
hzcheng 已提交
313 314 315 316 317 318
  SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols);
  if (pCols == NULL) return NULL;

  pCols->maxRowSize = maxRowSize;
  pCols->maxCols = maxCols;
  pCols->maxPoints = maxRows;
H
TD-166  
hzcheng 已提交
319
  pCols->bufSize = maxRowSize * maxRows;
H
TD-34  
hzcheng 已提交
320

H
TD-166  
hzcheng 已提交
321
  pCols->buf = malloc(pCols->bufSize);
H
TD-34  
hzcheng 已提交
322 323
  if (pCols->buf == NULL) {
    free(pCols);
H
TD-34  
hzcheng 已提交
324 325
    return NULL;
  }
H
TD-34  
hzcheng 已提交
326

H
TD-34  
hzcheng 已提交
327 328 329 330 331 332 333 334
  return pCols;
}

void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
  // assert(schemaNCols(pSchema) <= pCols->numOfCols);
  tdResetDataCols(pCols);
  pCols->numOfCols = schemaNCols(pSchema);

H
TD-166  
hzcheng 已提交
335
  void *ptr = pCols->buf;
H
TD-34  
hzcheng 已提交
336
  for (int i = 0; i < schemaNCols(pSchema); i++) {
H
TD-166  
hzcheng 已提交
337
    dataColInit(pCols->cols + i, schemaColAt(pSchema, i), &ptr, pCols->maxPoints);
H
TD-166  
hzcheng 已提交
338
    ASSERT((char *)ptr - (char *)(pCols->buf) <= pCols->bufSize);
H
TD-34  
hzcheng 已提交
339
  }
H
TD-34  
hzcheng 已提交
340 341 342 343
}

void tdFreeDataCols(SDataCols *pCols) {
  if (pCols) {
H
TD-166  
hzcheng 已提交
344
    tfree(pCols->buf);
H
TD-34  
hzcheng 已提交
345 346 347 348
    free(pCols);
  }
}

H
TD-100  
hzcheng 已提交
349
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
H
TD-166  
hzcheng 已提交
350
  SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints);
H
TD-100  
hzcheng 已提交
351 352 353 354
  if (pRet == NULL) return NULL;

  pRet->numOfCols = pDataCols->numOfCols;
  pRet->sversion = pDataCols->sversion;
H
Haojun Liao 已提交
355
  if (keepData) pRet->numOfRows = pDataCols->numOfRows;
H
TD-100  
hzcheng 已提交
356 357 358 359 360 361

  for (int i = 0; i < pDataCols->numOfCols; i++) {
    pRet->cols[i].type = pDataCols->cols[i].type;
    pRet->cols[i].colId = pDataCols->cols[i].colId;
    pRet->cols[i].bytes = pDataCols->cols[i].bytes;
    pRet->cols[i].offset = pDataCols->cols[i].offset;
H
TD-166  
hzcheng 已提交
362 363

    pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize;
H
TD-100  
hzcheng 已提交
364
    pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf)));
H
TD-100  
hzcheng 已提交
365

H
TD-166  
hzcheng 已提交
366 367 368 369 370 371 372 373
    if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) {
      ASSERT(pDataCols->cols[i].dataOff != NULL);
      pRet->cols[i].dataOff =
          (int32_t *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].dataOff) - (char *)(pDataCols->buf)));
    }

    if (keepData) {
      pRet->cols[i].len = pDataCols->cols[i].len;
H
Hongze Cheng 已提交
374 375 376 377 378
      if (pDataCols->cols[i].len > 0) {
        memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
        if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) {
          memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints);
        }
H
TD-166  
hzcheng 已提交
379 380
      }
    }
H
TD-100  
hzcheng 已提交
381 382 383 384 385
  }

  return pRet;
}

H
TD-34  
hzcheng 已提交
386
void tdResetDataCols(SDataCols *pCols) {
H
Haojun Liao 已提交
387
  pCols->numOfRows = 0;
H
TD-34  
hzcheng 已提交
388
  for (int i = 0; i < pCols->maxCols; i++) {
H
TD-166  
hzcheng 已提交
389
    dataColReset(pCols->cols + i);
H
TD-34  
hzcheng 已提交
390 391 392
  }
}

H
TD-90  
Hongze Cheng 已提交
393
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) {
H
TD-166  
hzcheng 已提交
394 395
  ASSERT(dataColsKeyLast(pCols) < dataRowKey(row));

H
TD-90  
Hongze Cheng 已提交
396 397 398 399 400 401 402 403 404 405
  int rcol = 0;
  int dcol = 0;

  while (dcol < pCols->numOfCols) {
    SDataCol *pDataCol = &(pCols->cols[dcol]);
    if (rcol >= schemaNCols(pSchema)) {
      dataColSetNullAt(pDataCol, pCols->numOfRows);
      dcol++;
      continue;
    }
H
TD-166  
hzcheng 已提交
406

H
TD-90  
Hongze Cheng 已提交
407 408
    STColumn *pRowCol = schemaColAt(pSchema, rcol);
    if (pRowCol->colId == pDataCol->colId) {
H
TD-90  
Hongze Cheng 已提交
409 410
      void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset+TD_DATA_ROW_HEAD_SIZE);
      dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
H
TD-90  
Hongze Cheng 已提交
411 412 413 414 415 416 417 418
      dcol++;
      rcol++;
    } else if (pRowCol->colId < pDataCol->colId) {
      rcol++;
    } else {
      dataColSetNullAt(pDataCol, pCols->numOfRows);
      dcol++;
    }
H
TD-34  
hzcheng 已提交
419
  }
H
Haojun Liao 已提交
420
  pCols->numOfRows++;
H
TD-34  
hzcheng 已提交
421
}
H
TD-166  
hzcheng 已提交
422

H
TD-34  
hzcheng 已提交
423 424
// Pop pointsToPop points from the SDataCols
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
H
Haojun Liao 已提交
425
  int pointsLeft = pCols->numOfRows - pointsToPop;
H
TD-166  
hzcheng 已提交
426
  if (pointsLeft <= 0) {
H
TD-166  
hzcheng 已提交
427 428 429
    tdResetDataCols(pCols);
    return;
  }
H
TD-34  
hzcheng 已提交
430 431

  for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
H
TD-166  
hzcheng 已提交
432
    SDataCol *pCol = pCols->cols + iCol;
H
Haojun Liao 已提交
433
    dataColPopPoints(pCol, pointsToPop, pCols->numOfRows);
H
TD-34  
hzcheng 已提交
434
  }
H
Haojun Liao 已提交
435
  pCols->numOfRows = pointsLeft;
H
TD-34  
hzcheng 已提交
436
}
H
TD-34  
hzcheng 已提交
437

H
hzcheng 已提交
438
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
H
Haojun Liao 已提交
439 440
  ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
  ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
H
TD-166  
hzcheng 已提交
441
  ASSERT(target->numOfCols == source->numOfCols);
H
TD-100  
hzcheng 已提交
442

H
TD-166  
hzcheng 已提交
443
  SDataCols *pTarget = NULL;
H
TD-100  
hzcheng 已提交
444

H
TD-166  
hzcheng 已提交
445 446 447
  if (dataColsKeyLast(target) < dataColsKeyFirst(source)) {  // No overlap
    for (int i = 0; i < rowsToMerge; i++) {
      for (int j = 0; j < source->numOfCols; j++) {
H
Hongze Cheng 已提交
448 449 450 451
        if (source->cols[j].len > 0) {
          dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfRows,
                           target->maxPoints);
        }
H
TD-166  
hzcheng 已提交
452
      }
H
Haojun Liao 已提交
453
      target->numOfRows++;
H
TD-166  
hzcheng 已提交
454 455 456 457 458 459 460
    }
  } else {
    pTarget = tdDupDataCols(target, true);
    if (pTarget == NULL) goto _err;

    int iter1 = 0;
    int iter2 = 0;
H
TD-521  
Hongze Cheng 已提交
461 462
    tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, &iter2, source->numOfRows,
                       pTarget->numOfRows + rowsToMerge);
H
TD-166  
hzcheng 已提交
463
  }
H
TD-100  
hzcheng 已提交
464 465 466 467 468 469 470 471

  tdFreeDataCols(pTarget);
  return 0;

_err:
  tdFreeDataCols(pTarget);
  return -1;
}
H
TD-100  
hzcheng 已提交
472

H
TD-521  
Hongze Cheng 已提交
473
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows) {
H
TD-100  
hzcheng 已提交
474
  tdResetDataCols(target);
H
TD-521  
Hongze Cheng 已提交
475
  ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);
H
TD-100  
hzcheng 已提交
476

H
Haojun Liao 已提交
477
  while (target->numOfRows < tRows) {
H
TD-521  
Hongze Cheng 已提交
478
    if (*iter1 >= limit1 && *iter2 >= limit2) break;
H
TD-100  
hzcheng 已提交
479

H
TD-521  
Hongze Cheng 已提交
480 481
    TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
    TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];
H
TD-100  
hzcheng 已提交
482

H
Hongze Cheng 已提交
483
    if (key1 <= key2) {
H
TD-100  
hzcheng 已提交
484 485
      for (int i = 0; i < src1->numOfCols; i++) {
        ASSERT(target->cols[i].type == src1->cols[i].type);
H
Hongze Cheng 已提交
486 487 488 489
        if (src1->cols[i].len > 0) {
          dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
                           target->maxPoints);
        }
H
TD-100  
hzcheng 已提交
490 491
      }

H
Haojun Liao 已提交
492
      target->numOfRows++;
H
TD-100  
hzcheng 已提交
493
      (*iter1)++;
H
Hongze Cheng 已提交
494 495
      if (key1 == key2) (*iter2)++;
    } else {
H
TD-100  
hzcheng 已提交
496 497
      for (int i = 0; i < src2->numOfCols; i++) {
        ASSERT(target->cols[i].type == src2->cols[i].type);
H
Hongze Cheng 已提交
498 499 500 501
        if (src2->cols[i].len > 0) {
          dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
                           target->maxPoints);
        }
H
TD-100  
hzcheng 已提交
502
      }
H
TD-100  
hzcheng 已提交
503

H
Haojun Liao 已提交
504
      target->numOfRows++;
H
TD-100  
hzcheng 已提交
505
      (*iter2)++;
H
TD-100  
hzcheng 已提交
506 507
    }
  }
H
Hongze Cheng 已提交
508 509
}

H
Hongze Cheng 已提交
510 511
SKVRow tdKVRowDup(SKVRow row) {
  SKVRow trow = malloc(kvRowLen(row));
H
Hongze Cheng 已提交
512 513
  if (trow == NULL) return NULL;

H
Hongze Cheng 已提交
514
  kvRowCpy(trow, row);
H
Hongze Cheng 已提交
515 516 517
  return trow;
}

H
TD-90  
Hongze Cheng 已提交
518 519 520 521 522 523 524
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
  SColIdx *pColIdx = NULL;
  SKVRow   row = *orow;
  SKVRow   nrow = NULL;
  void *   ptr = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE);

  if (ptr == NULL || ((SColIdx *)ptr)->colId < colId) { // need to add a column value to the row
H
TD-90  
Hongze Cheng 已提交
525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562
    int diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
    nrow = malloc(kvRowLen(row) + sizeof(SColIdx) + diff);
    if (nrow == NULL) return -1;

    kvRowSetLen(nrow, kvRowLen(row) + sizeof(SColIdx) + diff);
    kvRowSetNCols(nrow, kvRowNCols(row) + 1);

    if (ptr == NULL) {
      memcpy(kvRowColIdx(nrow), kvRowColIdx(row), sizeof(SColIdx) * kvRowNCols(row));
      memcpy(kvRowValues(nrow), kvRowValues(row), POINTER_DISTANCE(kvRowEnd(row), kvRowValues(row)));
      int colIdx = kvRowNCols(nrow) - 1;
      kvRowColIdxAt(nrow, colIdx)->colId = colId;
      kvRowColIdxAt(nrow, colIdx)->offset = POINTER_DISTANCE(kvRowEnd(row), kvRowValues(row));
      memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx)), value, diff);
    } else {
      int16_t tlen = POINTER_DISTANCE(ptr, kvRowColIdx(row));
      if (tlen > 0) {
        memcpy(kvRowColIdx(nrow), kvRowColIdx(row), tlen);
        memcpy(kvRowValues(nrow), kvRowValues(row), ((SColIdx *)ptr)->offset);
      }

      int colIdx = tlen / sizeof(SColIdx);
      kvRowColIdxAt(nrow, colIdx)->colId = colId;
      kvRowColIdxAt(nrow, colIdx)->offset = ((SColIdx *)ptr)->offset;
      memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx)), value, diff);

      for (int i = colIdx; i < kvRowNCols(row); i++) {
        kvRowColIdxAt(nrow, i + 1)->colId = kvRowColIdxAt(row, i)->colId;
        kvRowColIdxAt(nrow, i + 1)->offset = kvRowColIdxAt(row, i)->offset + diff;
      }
      memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx + 1)), kvRowColVal(row, kvRowColIdxAt(row, colIdx)),
             POINTER_DISTANCE(kvRowEnd(row), kvRowColVal(row, kvRowColIdxAt(row, colIdx)))

      );
    }

    *orow = nrow;
    free(row);
H
TD-90  
Hongze Cheng 已提交
563 564 565 566 567 568 569 570 571 572 573 574
  } else {
    ASSERT(((SColIdx *)ptr)->colId == colId);
    if (IS_VAR_DATA_TYPE(type)) {
      void *pOldVal = kvRowColVal(row, (SColIdx *)ptr);

      if (varDataTLen(value) == varDataTLen(pOldVal)) { // just update the column value in place
        memcpy(pOldVal, value, varDataTLen(value));
      } else { // need to reallocate the memory
        int16_t diff = varDataTLen(value) - varDataTLen(pOldVal);
        int16_t nlen = kvRowLen(row) + diff;
        ASSERT(nlen > 0);
        nrow = malloc(nlen);
H
TD-90  
Hongze Cheng 已提交
575
        if (nrow == NULL) return -1;
H
TD-90  
Hongze Cheng 已提交
576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599

        kvRowSetLen(nrow, nlen);
        kvRowSetNCols(nrow, kvRowNCols(row));

        // Copy part ahead
        nlen = POINTER_DISTANCE(ptr, kvRowColIdx(row));
        ASSERT(nlen % sizeof(SColIdx) == 0);
        if (nlen > 0) {
          ASSERT(((SColIdx *)ptr)->offset > 0);
          memcpy(kvRowColIdx(nrow), kvRowColIdx(row), nlen);
          memcpy(kvRowValues(nrow), kvRowValues(row), ((SColIdx *)ptr)->offset);
        }

        // Construct current column value
        int colIdx = nlen / sizeof(SColIdx);
        pColIdx = kvRowColIdxAt(nrow, colIdx);
        pColIdx->colId = ((SColIdx *)ptr)->colId;
        pColIdx->offset = ((SColIdx *)ptr)->offset;
        memcpy(kvRowColVal(nrow, pColIdx), value, varDataTLen(value));
 
        // Construct columns after
        if (kvRowNCols(nrow) - colIdx - 1 > 0) {
          for (int i = colIdx + 1; i < kvRowNCols(nrow); i++) {
            kvRowColIdxAt(nrow, i)->colId = kvRowColIdxAt(row, i)->colId;
H
Hongze Cheng 已提交
600
            kvRowColIdxAt(nrow, i)->offset = kvRowColIdxAt(row, i)->offset + diff;
H
TD-90  
Hongze Cheng 已提交
601 602 603 604 605 606
          }
          memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx + 1)), kvRowColVal(row, kvRowColIdxAt(row, colIdx + 1)),
                 POINTER_DISTANCE(kvRowEnd(row), kvRowColVal(row, kvRowColIdxAt(row, colIdx + 1))));
        }

        *orow = nrow;
H
TD-90  
Hongze Cheng 已提交
607
        free(row);
H
TD-90  
Hongze Cheng 已提交
608 609 610 611 612 613 614
      }
    } else {
      memcpy(kvRowColVal(row, (SColIdx *)ptr), value, TYPE_BYTES[type]);
    }
  }

  return 0;
H
Hongze Cheng 已提交
615 616
}

H
TD-353  
Hongze Cheng 已提交
617
int tdEncodeKVRow(void **buf, SKVRow row) {
H
Hongze Cheng 已提交
618
  // May change the encode purpose
H
TD-353  
Hongze Cheng 已提交
619 620 621 622 623 624
  if (buf != NULL) {
    kvRowCpy(*buf, row);
    *buf = POINTER_SHIFT(*buf, kvRowLen(row));
  }

  return kvRowLen(row);
H
Hongze Cheng 已提交
625 626
}

H
Hongze Cheng 已提交
627 628
void *tdDecodeKVRow(void *buf, SKVRow *row) {
  *row = tdKVRowDup(buf);
H
TD-353  
Hongze Cheng 已提交
629
  if (*row == NULL) return NULL;
H
Hongze Cheng 已提交
630
  return POINTER_SHIFT(buf, kvRowLen(*row));
H
Hongze Cheng 已提交
631 632
}

H
Hongze Cheng 已提交
633
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) {
H
Hongze Cheng 已提交
634 635 636 637 638 639 640 641 642 643 644 645 646 647
  pBuilder->tCols = 128;
  pBuilder->nCols = 0;
  pBuilder->pColIdx = (SColIdx *)malloc(sizeof(SColIdx) * pBuilder->tCols);
  if (pBuilder->pColIdx == NULL) return -1;
  pBuilder->alloc = 1024;
  pBuilder->size = 0;
  pBuilder->buf = malloc(pBuilder->alloc);
  if (pBuilder->buf == NULL) {
    free(pBuilder->pColIdx);
    return -1;
  }
  return 0;
}

H
Hongze Cheng 已提交
648
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) {
H
Hongze Cheng 已提交
649 650 651 652
  tfree(pBuilder->pColIdx);
  tfree(pBuilder->buf);
}

H
Hongze Cheng 已提交
653
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
H
Hongze Cheng 已提交
654 655 656 657
  pBuilder->nCols = 0;
  pBuilder->size = 0;
}

H
Hongze Cheng 已提交
658
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
H
Hongze Cheng 已提交
659 660 661
  int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
  if (tlen == 0) return NULL;

H
Hongze Cheng 已提交
662 663 664
  tlen += TD_KV_ROW_HEAD_SIZE;

  SKVRow row = malloc(tlen);
H
Hongze Cheng 已提交
665 666
  if (row == NULL) return NULL;

H
Hongze Cheng 已提交
667
  kvRowSetNCols(row, pBuilder->nCols);
H
Hongze Cheng 已提交
668
  kvRowSetLen(row, tlen);
H
Hongze Cheng 已提交
669

H
Hongze Cheng 已提交
670 671
  memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
  memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
H
Hongze Cheng 已提交
672 673

  return row;
H
TD-100  
hzcheng 已提交
674
}