tdataformat.c 21.1 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
slguan 已提交
15
#include "tdataformat.h"
T
Tao Liu 已提交
16
#include "talgo.h"
H
TD-353  
Hongze Cheng 已提交
17
#include "tcoding.h"
H
Hongze Cheng 已提交
18
#include "wchar.h"
H
more  
hzcheng 已提交
19

H
hzcheng 已提交
20 21 22 23
/**
 * Create a heap-allocated copy of a schema.
 *
 * The copy covers the STSchema header followed by schemaNCols(pSchema)
 * STColumn entries and is produced with a single memcpy.
 *
 * @param pSchema schema to copy; it is dereferenced without a NULL check
 * @return the newly malloc'ed copy, or NULL if the allocation fails
 */
STSchema *tdDupSchema(STSchema *pSchema) {
  int       nBytes = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
  STSchema *pDup = (STSchema *)malloc(nBytes);

  if (pDup != NULL) {
    memcpy((void *)pDup, (void *)pSchema, nBytes);
  }

  return pDup;
}

H
TD-27  
hzcheng 已提交
34 35 36
/**
 * Encode a schema to dst, and return the next pointer
 */
H
TD-353  
Hongze Cheng 已提交
37 38 39 40
int tdEncodeSchema(void **buf, STSchema *pSchema) {
  int tlen = 0;
  tlen += taosEncodeFixedI32(buf, schemaVersion(pSchema));
  tlen += taosEncodeFixedI32(buf, schemaNCols(pSchema));
H
TD-166  
hzcheng 已提交
41

H
TD-27  
hzcheng 已提交
42 43
  for (int i = 0; i < schemaNCols(pSchema); i++) {
    STColumn *pCol = schemaColAt(pSchema, i);
H
TD-353  
Hongze Cheng 已提交
44 45
    tlen += taosEncodeFixedI8(buf, colType(pCol));
    tlen += taosEncodeFixedI16(buf, colColId(pCol));
46
    tlen += taosEncodeFixedI16(buf, colBytes(pCol));
H
TD-27  
hzcheng 已提交
47 48
  }

H
TD-353  
Hongze Cheng 已提交
49
  return tlen;
H
TD-27  
hzcheng 已提交
50 51 52 53 54
}

/**
 * Decode a schema from a binary.
 */
H
TD-353  
Hongze Cheng 已提交
55
void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
H
Hongze Cheng 已提交
56
  int version = 0;
H
TD-353  
Hongze Cheng 已提交
57
  int numOfCols = 0;
H
TD-353  
Hongze Cheng 已提交
58
  STSchemaBuilder schemaBuilder;
H
TD-27  
hzcheng 已提交
59

H
TD-353  
Hongze Cheng 已提交
60 61
  buf = taosDecodeFixedI32(buf, &version);
  buf = taosDecodeFixedI32(buf, &numOfCols);
H
TD-27  
hzcheng 已提交
62

H
Hongze Cheng 已提交
63 64
  if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;

H
TD-353  
Hongze Cheng 已提交
65
  for (int i = 0; i < numOfCols; i++) {
H
TD-27  
hzcheng 已提交
66 67
    int8_t  type = 0;
    int16_t colId = 0;
68
    int16_t bytes = 0;
H
TD-353  
Hongze Cheng 已提交
69 70
    buf = taosDecodeFixedI8(buf, &type);
    buf = taosDecodeFixedI16(buf, &colId);
71
    buf = taosDecodeFixedI16(buf, &bytes);
H
Hongze Cheng 已提交
72 73 74 75
    if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
      tdDestroyTSchemaBuilder(&schemaBuilder);
      return NULL;
    }
H
TD-27  
hzcheng 已提交
76 77
  }

H
TD-353  
Hongze Cheng 已提交
78
  *pRSchema = tdGetSchemaFromBuilder(&schemaBuilder);
H
Hongze Cheng 已提交
79
  tdDestroyTSchemaBuilder(&schemaBuilder);
H
TD-353  
Hongze Cheng 已提交
80
  return buf;
H
Hongze Cheng 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
}

/**
 * Prepare a schema builder for use.
 *
 * Allocates room for 256 columns and resets the counters through
 * tdResetTSchemaBuilder().
 *
 * @param pBuilder builder to initialise
 * @param version  schema version stored in the builder
 * @return 0 on success, -1 if pBuilder is NULL or the allocation fails
 */
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  if (pBuilder != NULL) {
    pBuilder->tCols = 256;
    pBuilder->columns = (STColumn *)malloc(sizeof(STColumn) * pBuilder->tCols);

    if (pBuilder->columns != NULL) {
      tdResetTSchemaBuilder(pBuilder, version);
      return 0;
    }
  }

  return -1;
}

/**
 * Release the column array owned by a schema builder.
 *
 * The builder structure itself is not freed. A NULL builder is ignored.
 *
 * @param pBuilder builder whose column array is released via tfree()
 */
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder == NULL) return;

  tfree(pBuilder->columns);
}

/**
 * Return a schema builder to the "no columns" state.
 *
 * The column count and the three accumulated lengths (tlen, flen, vlen) are
 * zeroed and the version is replaced. The column array and its capacity are
 * left as they are.
 *
 * @param pBuilder builder to reset; dereferenced without a NULL check
 * @param version  schema version to store
 */
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  pBuilder->version = version;
  pBuilder->nCols = 0;

  pBuilder->vlen = 0;
  pBuilder->flen = 0;
  pBuilder->tlen = 0;
}

108
/**
 * Append one column definition to a schema builder.
 *
 * The column array is doubled when it is full. The new column's offset is 0
 * for the first column, otherwise the previous column's offset plus
 * TYPE_BYTES of the previous column's type.
 *
 * Length bookkeeping:
 *  - variable-length types: the column keeps `bytes` as given, tlen grows by
 *    TYPE_BYTES[type] + bytes and vlen by bytes - sizeof(VarDataLenT);
 *  - all other types: the column's bytes, tlen and vlen all use
 *    TYPE_BYTES[type];
 *  - flen always grows by TYPE_BYTES[type].
 *
 * @param pBuilder builder to extend; dereferenced without a NULL check
 * @param type     column data type, validated with isValidDataType()
 * @param colId    column id
 * @param bytes    byte width, only used for variable-length types
 * @return 0 on success, -1 if the type is invalid or the array cannot grow.
 *         On a failed growth the builder keeps its previous array and
 *         capacity, so it stays usable and can still be destroyed normally.
 */
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes) {
  if (!isValidDataType(type)) return -1;

  if (pBuilder->nCols >= pBuilder->tCols) {
    // Grow through a temporary: assigning realloc()'s result straight to
    // pBuilder->columns would leak the old array when realloc() fails and
    // leave tCols doubled without storage to back it.
    STColumn *pGrown = (STColumn *)realloc(pBuilder->columns, sizeof(STColumn) * pBuilder->tCols * 2);
    if (pGrown == NULL) return -1;

    pBuilder->columns = pGrown;
    pBuilder->tCols *= 2;
  }

  STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
  colSetType(pCol, type);
  colSetColId(pCol, colId);
  if (pBuilder->nCols == 0) {
    colSetOffset(pCol, 0);
  } else {
    // Each column starts where the previous column's fixed-size slot ends.
    STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols-1]);
    colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
  }

  if (IS_VAR_DATA_TYPE(type)) {
    colSetBytes(pCol, bytes);
    pBuilder->tlen += (TYPE_BYTES[type] + bytes);
    pBuilder->vlen += bytes - sizeof(VarDataLenT);
  } else {
    colSetBytes(pCol, TYPE_BYTES[type]);
    pBuilder->tlen += TYPE_BYTES[type];
    pBuilder->vlen += TYPE_BYTES[type];
  }

  pBuilder->nCols++;
  pBuilder->flen += TYPE_BYTES[type];

  ASSERT(pCol->offset < pBuilder->flen);

  return 0;
}

/**
 * Materialise the schema described by a builder.
 *
 * Allocates an STSchema followed by nCols STColumn entries, copies the
 * version, column count and the tlen/flen/vlen totals from the builder, then
 * copies the column array. The builder is left unchanged.
 *
 * @param pBuilder builder to read from
 * @return the newly malloc'ed schema, or NULL when the builder holds no
 *         columns or the allocation fails
 */
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder->nCols <= 0) return NULL;

  size_t colsSize = sizeof(STColumn) * pBuilder->nCols;
  int    total = sizeof(STSchema) + colsSize;

  STSchema *pResult = (STSchema *)malloc(total);
  if (pResult != NULL) {
    schemaVersion(pResult) = pBuilder->version;
    schemaNCols(pResult) = pBuilder->nCols;
    schemaTLen(pResult) = pBuilder->tlen;
    schemaFLen(pResult) = pBuilder->flen;
    schemaVLen(pResult) = pBuilder->vlen;

    memcpy(schemaColAt(pResult, 0), pBuilder->columns, colsSize);
  }

  return pResult;
}

H
hzcheng 已提交
164 165 166
/**
 * Initialize a data row
 */
H
TD-90  
Hongze Cheng 已提交
167 168 169 170
void tdInitDataRow(SDataRow row, STSchema *pSchema) {
  dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema));
  dataRowSetVersion(row, schemaVersion(pSchema));
}
H
hzcheng 已提交
171

H
TD-166  
hzcheng 已提交
172
/**
 * Allocate and initialise a data row for a schema.
 *
 * The buffer is sized with dataRowMaxBytesFromSchema() and its header is
 * filled in by tdInitDataRow().
 *
 * @param pSchema schema the row is built for
 * @return the newly malloc'ed row, or NULL if the allocation fails
 */
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
  int32_t  maxBytes = dataRowMaxBytesFromSchema(pSchema);
  SDataRow pRow = malloc(maxBytes);

  if (pRow != NULL) {
    tdInitDataRow(pRow, pSchema);
  }

  return pRow;
}
H
hzcheng 已提交
181

H
hzcheng 已提交
182 183 184 185 186 187 188
/**
 * Release a data row with free().
 *
 * @param row row to release; NULL is accepted and does nothing
 */
void tdFreeDataRow(SDataRow row) {
  // free(NULL) is defined to be a no-op, so no explicit guard is required.
  free(row);
}

H
hzcheng 已提交
189
/**
 * Duplicate a data row.
 *
 * Allocates dataRowLen(row) bytes and copies the row with dataRowCpy().
 *
 * @param row row to copy
 * @return the newly malloc'ed copy, or NULL if the allocation fails
 */
SDataRow tdDataRowDup(SDataRow row) {
  SDataRow pCopy = malloc(dataRowLen(row));

  if (pCopy != NULL) {
    dataRowCpy(pCopy, row);
  }

  return pCopy;
}
H
hzcheng 已提交
196

H
TD-166  
hzcheng 已提交
197 198 199 200
/**
 * Bind a data column to its slice of a shared buffer.
 *
 * Type, column id and byte width are taken from the schema column, and the
 * column offset is the schema offset plus TD_DATA_ROW_HEAD_SIZE. The column
 * starts empty (len == 0) with spaceSize = bytes * maxPoints.
 *
 * For BINARY/NCHAR columns the slice starts with an array of maxPoints
 * VarDataOffsetT entries (dataOff) followed by the value area (pData). For
 * every other type dataOff is NULL and pData is the start of the slice.
 *
 * @param pDataCol  column to initialise
 * @param pCol      schema column describing it
 * @param pBuf      in/out cursor into the shared buffer; advanced past the
 *                  slice claimed by this column
 * @param maxPoints maximum number of rows the column must hold
 */
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) {
  void *pSlice = *pBuf;

  pDataCol->type = colType(pCol);
  pDataCol->colId = colColId(pCol);
  pDataCol->bytes = colBytes(pCol);
  pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE;
  pDataCol->len = 0;
  pDataCol->spaceSize = pDataCol->bytes * maxPoints;

  if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
    size_t offsetBytes = sizeof(VarDataOffsetT) * maxPoints;

    pDataCol->dataOff = (VarDataOffsetT *)pSlice;
    pDataCol->pData = POINTER_SHIFT(pSlice, offsetBytes);
    *pBuf = POINTER_SHIFT(pSlice, pDataCol->spaceSize + offsetBytes);
  } else {
    pDataCol->dataOff = NULL;
    pDataCol->pData = pSlice;
    *pBuf = POINTER_SHIFT(pSlice, pDataCol->spaceSize);
  }
}

H
Haojun Liao 已提交
217
/**
 * Append one value to the end of a data column.
 *
 * BINARY/NCHAR: the current column length is recorded as the offset of row
 * `numOfRows`, then varDataTLen(value) bytes are copied to the end of the
 * value area. Other types: pCol->bytes bytes are copied to the end of the
 * value area, which is asserted to hold exactly numOfRows values already.
 *
 * @param pCol      column to append to (must not be NULL)
 * @param value     value to copy (must not be NULL)
 * @param numOfRows number of rows already stored in the column
 * @param maxPoints not used by this function
 */
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) {
  ASSERT(pCol != NULL && value != NULL);

  if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR) {
    // Remember where this row's value starts, then append it.
    pCol->dataOff[numOfRows] = pCol->len;
    memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
    pCol->len += varDataTLen(value);
  } else {
    ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
    memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
    pCol->len += pCol->bytes;
  }
}

H
Haojun Liao 已提交
238 239
/**
 * Drop the first `pointsToPop` values of a data column.
 *
 * The surviving values are moved to the front of the value area and the
 * column length is reduced accordingly. For BINARY/NCHAR columns the offset
 * array is rebuilt with dataColSetOffset() afterwards.
 *
 * At least one value must survive (asserted).
 *
 * @param pCol        column to shrink
 * @param pointsToPop number of leading values to discard
 * @param numOfRows   number of values currently stored
 */
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows) {
  int nRemain = numOfRows - pointsToPop;

  ASSERT(nRemain > 0);

  if (pCol->type != TSDB_DATA_TYPE_BINARY && pCol->type != TSDB_DATA_TYPE_NCHAR) {
    ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
    pCol->len = TYPE_BYTES[pCol->type] * nRemain;
    memmove(pCol->pData, POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * pointsToPop), pCol->len);
    return;
  }

  ASSERT(pCol->len > 0);

  // Offset of the first surviving value == number of bytes being discarded.
  VarDataOffsetT cut = pCol->dataOff[pointsToPop];
  pCol->len -= cut;
  ASSERT(pCol->len > 0);

  memmove(pCol->pData, POINTER_SHIFT(pCol->pData, cut), pCol->len);
  dataColSetOffset(pCol, nRemain);
}

H
TD-166  
hzcheng 已提交
257 258 259 260 261
/**
 * Check whether the first `nEle` values of a column are all NULL.
 *
 * Each value is located with tdGetColDataOfRow() and tested with isNull()
 * using the column's type.
 *
 * @param pCol column to inspect
 * @param nEle number of leading values to test
 * @return true when every tested value is NULL (also when nEle <= 0),
 *         false as soon as a non-NULL value is found
 */
bool isNEleNull(SDataCol *pCol, int nEle) {
  int iRow = 0;

  while (iRow < nEle && isNull(tdGetColDataOfRow(pCol, iRow), pCol->type)) {
    iRow++;
  }

  // The scan only reaches nEle when no non-NULL value stopped it early.
  return iRow >= nEle;
}

H
TD-90  
Hongze Cheng 已提交
273 274 275 276
/**
 * Store a NULL value for row `index` of a column.
 *
 * Variable-length types: a NULL var-data value is written at the current end
 * of the value area, its start is recorded in dataOff[index], and the column
 * length grows by the size of that value. Other types: the NULL pattern is
 * written into slot `index` and the column length grows by TYPE_BYTES of the
 * column type.
 *
 * @param pCol  column to modify
 * @param index row index receiving the NULL
 */
void dataColSetNullAt(SDataCol *pCol, int index) {
  if (!IS_VAR_DATA_TYPE(pCol->type)) {
    setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes);
    pCol->len += TYPE_BYTES[pCol->type];
    return;
  }

  char *pTail = POINTER_SHIFT(pCol->pData, pCol->len);

  pCol->dataOff[index] = pCol->len;
  setVardataNull(pTail, pCol->type);
  pCol->len += varDataTLen(pTail);
}

H
TD-166  
hzcheng 已提交
285
/**
 * Fill the first `nEle` rows of a column with NULL values, discarding
 * whatever the column held before.
 *
 * Variable-length types are rebuilt row by row with dataColSetNullAt()
 * starting from an empty column. Other types are filled with setNullN() and
 * the length is set to TYPE_BYTES[type] * nEle.
 *
 * @param pCol      column to fill
 * @param nEle      number of rows to set to NULL
 * @param maxPoints not used by this function
 */
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
  if (!IS_VAR_DATA_TYPE(pCol->type)) {
    setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
    pCol->len = TYPE_BYTES[pCol->type] * nEle;
    return;
  }

  pCol->len = 0;

  int iRow = 0;
  while (iRow < nEle) {
    dataColSetNullAt(pCol, iRow);
    iRow++;
  }
}

H
TD-166  
hzcheng 已提交
298 299 300
/**
 * Rebuild the offset array of a BINARY/NCHAR column.
 *
 * Walks the value area from its start, treating it as `nEle` consecutive
 * var-data values, and stores the starting offset of each value in dataOff.
 *
 * @param pCol column whose offsets are rebuilt (type is asserted)
 * @param nEle number of values stored in the column
 */
void dataColSetOffset(SDataCol *pCol, int nEle) {
  ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));

  void *         pCursor = pCol->pData;
  VarDataOffsetT runningOff = 0;

  int iRow = 0;
  while (iRow < nEle) {
    pCol->dataOff[iRow] = runningOff;

    // Step both the byte offset and the cursor over the current value.
    runningOff += varDataTLen(pCursor);
    pCursor = POINTER_SHIFT(pCursor, varDataTLen(pCursor));

    iRow++;
  }
}

H
TD-166  
hzcheng 已提交
312
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
H
TD-34  
hzcheng 已提交
313 314 315 316 317 318
  SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols);
  if (pCols == NULL) return NULL;

  pCols->maxRowSize = maxRowSize;
  pCols->maxCols = maxCols;
  pCols->maxPoints = maxRows;
H
TD-166  
hzcheng 已提交
319
  pCols->bufSize = maxRowSize * maxRows;
H
TD-34  
hzcheng 已提交
320

H
Hongze Cheng 已提交
321
  pCols->buf = malloc(pCols->bufSize);
H
TD-34  
hzcheng 已提交
322 323
  if (pCols->buf == NULL) {
    free(pCols);
H
TD-34  
hzcheng 已提交
324 325
    return NULL;
  }
H
TD-34  
hzcheng 已提交
326

H
TD-34  
hzcheng 已提交
327 328 329 330 331 332 333 334
  return pCols;
}

/**
 * Lay out the columns of an SDataCols container according to a schema.
 *
 * The container is reset, its column count is set to the schema's, and each
 * column is given consecutive space in the container's buffer by
 * dataColInit(). An assertion checks after every column that the space
 * handed out so far still fits in bufSize.
 *
 * @param pCols   container to initialise
 * @param pSchema schema describing the columns
 */
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
  tdResetDataCols(pCols);
  pCols->numOfCols = schemaNCols(pSchema);

  void *pCursor = pCols->buf;

  int iCol = 0;
  while (iCol < schemaNCols(pSchema)) {
    dataColInit(&(pCols->cols[iCol]), schemaColAt(pSchema, iCol), &pCursor, pCols->maxPoints);
    ASSERT((char *)pCursor - (char *)(pCols->buf) <= pCols->bufSize);
    iCol++;
  }
}

/**
 * Release an SDataCols container together with its data buffer.
 *
 * @param pCols container to release; NULL is accepted and does nothing
 */
void tdFreeDataCols(SDataCols *pCols) {
  if (pCols == NULL) return;

  tfree(pCols->buf);
  free(pCols);
}

H
TD-100  
hzcheng 已提交
349
/**
 * Duplicate an SDataCols container.
 *
 * A new container with the same capacity is created with tdNewDataCols().
 * Column metadata is copied, and each column's pData / dataOff pointer is
 * placed at the same byte distance from the new buffer as the source pointer
 * is from the source buffer.
 *
 * When keepData is true the row count, the column lengths and the stored
 * values are copied too (for BINARY/NCHAR columns the whole offset array of
 * maxPoints entries). When it is false those fields keep the zero values left
 * by tdNewDataCols()'s calloc.
 *
 * @param pDataCols container to duplicate
 * @param keepData  whether to copy the stored rows as well
 * @return the new container, or NULL if it cannot be allocated
 */
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
  SDataCols *pDup = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints);
  if (pDup == NULL) return NULL;

  pDup->numOfCols = pDataCols->numOfCols;
  pDup->sversion = pDataCols->sversion;
  if (keepData) pDup->numOfRows = pDataCols->numOfRows;

  for (int iCol = 0; iCol < pDataCols->numOfCols; iCol++) {
    SDataCol *pSrc = pDataCols->cols + iCol;
    SDataCol *pDst = pDup->cols + iCol;
    bool      isVarLen = (pSrc->type == TSDB_DATA_TYPE_BINARY || pSrc->type == TSDB_DATA_TYPE_NCHAR);

    pDst->type = pSrc->type;
    pDst->colId = pSrc->colId;
    pDst->bytes = pSrc->bytes;
    pDst->offset = pSrc->offset;
    pDst->spaceSize = pSrc->spaceSize;

    // Re-base the pointers: same distance from the buffer start as in the source.
    pDst->pData = (void *)((char *)pDup->buf + ((char *)(pSrc->pData) - (char *)(pDataCols->buf)));
    if (isVarLen) {
      ASSERT(pSrc->dataOff != NULL);
      pDst->dataOff = (int32_t *)((char *)pDup->buf + ((char *)(pSrc->dataOff) - (char *)(pDataCols->buf)));
    }

    if (!keepData) continue;

    pDst->len = pSrc->len;
    if (pSrc->len <= 0) continue;

    memcpy(pDst->pData, pSrc->pData, pSrc->len);
    if (isVarLen) {
      memcpy(pDst->dataOff, pSrc->dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints);
    }
  }

  return pDup;
}

H
TD-34  
hzcheng 已提交
386
/**
 * Empty an SDataCols container without releasing any memory.
 *
 * The row count is set to 0 and dataColReset() is applied to every column
 * slot up to maxCols (not just numOfCols).
 *
 * @param pCols container to reset; NULL is accepted and does nothing
 */
void tdResetDataCols(SDataCols *pCols) {
  if (pCols == NULL) return;

  pCols->numOfRows = 0;

  int iCol = 0;
  while (iCol < pCols->maxCols) {
    dataColReset(pCols->cols + iCol);
    iCol++;
  }
}

H
TD-90  
Hongze Cheng 已提交
395
/**
 * Append one data row to a column container.
 *
 * The row's schema columns and the container's columns are walked together,
 * matched by column id:
 *  - ids equal: the row's value is appended to the container column;
 *  - row column id smaller: the row column is skipped;
 *  - row column id larger, or the row's columns are exhausted: a NULL is
 *    stored in the container column.
 * (This walk presumes both column lists are ordered by ascending column id --
 * confirm against the callers.)
 *
 * The row key is asserted to be greater than the container's last key, and
 * the container's row count is incremented at the end.
 *
 * @param row     row to append
 * @param pSchema schema describing `row`
 * @param pCols   container receiving the row
 */
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) {
  ASSERT(dataColsKeyLast(pCols) < dataRowKey(row));

  int iRowCol = 0;
  int iDataCol = 0;

  while (iDataCol < pCols->numOfCols) {
    SDataCol *pDataCol = &(pCols->cols[iDataCol]);
    STColumn *pRowCol = (iRowCol < schemaNCols(pSchema)) ? schemaColAt(pSchema, iRowCol) : NULL;

    if (pRowCol == NULL || pRowCol->colId > pDataCol->colId) {
      // The row carries no value for this container column.
      dataColSetNullAt(pDataCol, pCols->numOfRows);
      iDataCol++;
    } else if (pRowCol->colId < pDataCol->colId) {
      // The container has no column for this row value; skip it.
      iRowCol++;
    } else {
      void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset+TD_DATA_ROW_HEAD_SIZE);
      dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
      iDataCol++;
      iRowCol++;
    }
  }

  pCols->numOfRows++;
}
H
TD-166  
hzcheng 已提交
424

H
TD-34  
hzcheng 已提交
425 426
/**
 * Remove the first `pointsToPop` rows from an SDataCols container.
 *
 * When no row would remain the container is simply reset. Otherwise every
 * column is shrunk with dataColPopPoints() and the row count is updated.
 *
 * @param pCols       container to shrink
 * @param pointsToPop number of leading rows to discard
 */
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
  int nRemain = pCols->numOfRows - pointsToPop;

  if (nRemain > 0) {
    for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
      dataColPopPoints(&(pCols->cols[iCol]), pointsToPop, pCols->numOfRows);
    }
    pCols->numOfRows = nRemain;
  } else {
    tdResetDataCols(pCols);
  }
}
H
TD-34  
hzcheng 已提交
439

H
hzcheng 已提交
440
/**
 * Merge rows of `source` into `target`.
 *
 * If the last key of `target` is smaller than the first key of `source`, the
 * first `rowsToMerge` rows of `source` are appended to `target` directly
 * (columns of `source` with len == 0 are skipped). Otherwise `target` is
 * duplicated and then rebuilt by tdMergeTwoDataCols() from that duplicate and
 * `source`, up to (rows previously in target + rowsToMerge) rows.
 *
 * Asserted preconditions: 0 < rowsToMerge <= source->numOfRows, the merged
 * rows fit into target->maxPoints, and both containers have the same number
 * of columns.
 *
 * @param target      container receiving the rows
 * @param source      container supplying the rows
 * @param rowsToMerge number of rows to merge
 * @return 0 on success, -1 if the temporary duplicate cannot be allocated
 */
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
  ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
  ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
  ASSERT(target->numOfCols == source->numOfCols);

  if (dataColsKeyLast(target) < dataColsKeyFirst(source)) {  // No overlap: plain append
    for (int iRow = 0; iRow < rowsToMerge; iRow++) {
      for (int iCol = 0; iCol < source->numOfCols; iCol++) {
        if (source->cols[iCol].len > 0) {
          dataColAppendVal(target->cols + iCol, tdGetColDataOfRow(source->cols + iCol, iRow), target->numOfRows,
                           target->maxPoints);
        }
      }
      target->numOfRows++;
    }
    return 0;
  }

  // Key ranges overlap: merge a snapshot of the target with the source.
  SDataCols *pSnapshot = tdDupDataCols(target, true);
  if (pSnapshot == NULL) return -1;

  int iterSnapshot = 0;
  int iterSource = 0;
  tdMergeTwoDataCols(target, pSnapshot, &iterSnapshot, pSnapshot->numOfRows, source, &iterSource, source->numOfRows,
                     pSnapshot->numOfRows + rowsToMerge);

  tdFreeDataCols(pSnapshot);
  return 0;
}
H
TD-100  
hzcheng 已提交
474

H
TD-521  
Hongze Cheng 已提交
475
/**
 * Copy row `iRow` of `src` to the end of `target` and count it.
 * Columns of `src` with len == 0 are skipped; column types must match.
 */
static void tdMergeAppendRow(SDataCols *target, SDataCols *src, int iRow) {
  for (int iCol = 0; iCol < src->numOfCols; iCol++) {
    ASSERT(target->cols[iCol].type == src->cols[iCol].type);
    if (src->cols[iCol].len > 0) {
      dataColAppendVal(&(target->cols[iCol]), tdGetColDataOfRow(src->cols + iCol, iRow), target->numOfRows,
                       target->maxPoints);
    }
  }

  target->numOfRows++;
}

/**
 * Merge two column containers into `target`.
 *
 * `target` is reset first. Rows are then taken from `src1` and `src2` in
 * ascending key order, where the key of a row is read from column 0 as a
 * TSKEY. When both sources carry the same key, the row of `src1` is taken and
 * the row of `src2` is skipped. Merging stops once `target` holds `tRows`
 * rows or both sources are exhausted.
 *
 * @param target receives the merged rows
 * @param src1   first source
 * @param iter1  in/out position in src1
 * @param limit1 end position (exclusive) in src1; asserted <= src1->numOfRows
 * @param src2   second source
 * @param iter2  in/out position in src2
 * @param limit2 end position (exclusive) in src2; asserted <= src2->numOfRows
 * @param tRows  maximum number of rows to place in target
 */
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows) {
  tdResetDataCols(target);
  ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);

  while (target->numOfRows < tRows) {
    if (*iter1 >= limit1 && *iter2 >= limit2) break;

    // An exhausted source is given the largest possible key.
    TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
    TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];

    if (key1 > key2) {
      tdMergeAppendRow(target, src2, *iter2);
      (*iter2)++;
      continue;
    }

    tdMergeAppendRow(target, src1, *iter1);
    (*iter1)++;
    if (key1 == key2) (*iter2)++;  // duplicate key: the src2 row is dropped
  }
}

H
Hongze Cheng 已提交
512 513
/**
 * Duplicate a KV row.
 *
 * Allocates kvRowLen(row) bytes and copies the row with kvRowCpy().
 *
 * @param row row to copy
 * @return the newly malloc'ed copy, or NULL if the allocation fails
 */
SKVRow tdKVRowDup(SKVRow row) {
  SKVRow pCopy = malloc(kvRowLen(row));

  if (pCopy != NULL) {
    kvRowCpy(pCopy, row);
  }

  return pCopy;
}

B
Bomin Zhang 已提交
520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535
/**
 * qsort() comparator ordering SColIdx entries by ascending colId.
 *
 * @return 1 if a's colId is greater, -1 if it is smaller, 0 if equal
 */
static int compareColIdx(const void* a, const void* b) {
  const SColIdx* lhs = (const SColIdx*)a;
  const SColIdx* rhs = (const SColIdx*)b;

  // (greater) - (less) evaluates to exactly 1, -1 or 0.
  return (lhs->colId > rhs->colId) - (lhs->colId < rhs->colId);
}

/**
 * Sort the column index entries of a KV row by ascending colId.
 *
 * Only the SColIdx entries are reordered; the value area is not touched.
 *
 * @param row row whose column index is sorted in place
 */
void tdSortKVRowByColIdx(SKVRow row) {
  SColIdx *pIndex = kvRowColIdx(row);
  size_t   nEntries = kvRowNCols(row);

  qsort(pIndex, nEntries, sizeof(SColIdx), compareColIdx);
}

H
TD-90  
Hongze Cheng 已提交
536 537 538 539 540 541 542
/**
 * Set the value of column `colId` in a KV row, adding the column if the row
 * does not have it yet.
 *
 * The column index is searched with taosbsearch(..., TD_GE).
 * NOTE(review): this function relies on that search returning the first index
 * entry whose colId is >= `colId`, or NULL when there is none -- confirm
 * against talgo.h.
 *
 *  - No entry found (NULL): the column is appended after all existing ones.
 *  - Entry found with a larger colId: the column is inserted in front of it;
 *    the offsets of that entry and all following ones grow by the size of the
 *    new value.
 *  - Entry found with the same colId: the value is replaced. Fixed-size
 *    values, and var-data values whose total length is unchanged, are
 *    overwritten in place; otherwise the row is rebuilt with the new length.
 *
 * Whenever the row has to be rebuilt, a new row is malloc'ed, `*orow` is
 * pointed at it and the old row is freed. In-place updates leave `*orow`
 * unchanged.
 *
 * @param orow  in/out pointer to the row; may be replaced as described above
 * @param colId id of the column to set
 * @param type  data type of the value
 * @param value value to store; for variable-length types its size is taken
 *              from varDataTLen(value), otherwise from TYPE_BYTES[type]
 * @return 0 on success, -1 if a new row cannot be allocated (in which case
 *         `*orow` and the old row are left untouched)
 */
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
  SColIdx *pColIdx = NULL;
  SKVRow   row = *orow;
  SKVRow   nrow = NULL;
  void *   ptr = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE);

  // The column is missing when nothing >= colId exists or when the entry found
  // belongs to a larger column id. (The comparison used to be "<", which a
  // TD_GE search can never satisfy, so inserting in front of an existing
  // column fell through to the update branch and tripped its assertion.)
  if (ptr == NULL || ((SColIdx *)ptr)->colId > colId) { // need to add a column value to the row
    int diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
    nrow = malloc(kvRowLen(row) + sizeof(SColIdx) + diff);
    if (nrow == NULL) return -1;

    // The header must be written first: kvRowValues(nrow) is used below after
    // the new column count has been stored.
    kvRowSetLen(nrow, kvRowLen(row) + sizeof(SColIdx) + diff);
    kvRowSetNCols(nrow, kvRowNCols(row) + 1);

    if (ptr == NULL) {
      // Append: keep every existing index entry and value, add the new one last.
      memcpy(kvRowColIdx(nrow), kvRowColIdx(row), sizeof(SColIdx) * kvRowNCols(row));
      memcpy(kvRowValues(nrow), kvRowValues(row), POINTER_DISTANCE(kvRowEnd(row), kvRowValues(row)));
      int colIdx = kvRowNCols(nrow) - 1;
      kvRowColIdxAt(nrow, colIdx)->colId = colId;
      kvRowColIdxAt(nrow, colIdx)->offset = POINTER_DISTANCE(kvRowEnd(row), kvRowValues(row));
      memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx)), value, diff);
    } else {
      // Insert in front of `ptr`: copy the part ahead of it unchanged.
      int16_t tlen = POINTER_DISTANCE(ptr, kvRowColIdx(row));
      if (tlen > 0) {
        memcpy(kvRowColIdx(nrow), kvRowColIdx(row), tlen);
        memcpy(kvRowValues(nrow), kvRowValues(row), ((SColIdx *)ptr)->offset);
      }

      // The new value takes over the offset of the entry it is inserted before.
      int colIdx = tlen / sizeof(SColIdx);
      kvRowColIdxAt(nrow, colIdx)->colId = colId;
      kvRowColIdxAt(nrow, colIdx)->offset = ((SColIdx *)ptr)->offset;
      memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx)), value, diff);

      // Shift the remaining index entries by one slot and their offsets by diff.
      for (int i = colIdx; i < kvRowNCols(row); i++) {
        kvRowColIdxAt(nrow, i + 1)->colId = kvRowColIdxAt(row, i)->colId;
        kvRowColIdxAt(nrow, i + 1)->offset = kvRowColIdxAt(row, i)->offset + diff;
      }
      memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx + 1)), kvRowColVal(row, kvRowColIdxAt(row, colIdx)),
             POINTER_DISTANCE(kvRowEnd(row), kvRowColVal(row, kvRowColIdxAt(row, colIdx))));
    }

    *orow = nrow;
    free(row);
  } else {
    ASSERT(((SColIdx *)ptr)->colId == colId);
    if (IS_VAR_DATA_TYPE(type)) {
      void *pOldVal = kvRowColVal(row, (SColIdx *)ptr);

      if (varDataTLen(value) == varDataTLen(pOldVal)) { // just update the column value in place
        memcpy(pOldVal, value, varDataTLen(value));
      } else { // need to reallocate the memory
        int16_t diff = varDataTLen(value) - varDataTLen(pOldVal);
        int16_t nlen = kvRowLen(row) + diff;
        ASSERT(nlen > 0);
        nrow = malloc(nlen);
        if (nrow == NULL) return -1;

        kvRowSetLen(nrow, nlen);
        kvRowSetNCols(nrow, kvRowNCols(row));

        // Copy part ahead (nlen is reused here as the byte size of the index
        // entries located before `ptr`)
        nlen = POINTER_DISTANCE(ptr, kvRowColIdx(row));
        ASSERT(nlen % sizeof(SColIdx) == 0);
        if (nlen > 0) {
          ASSERT(((SColIdx *)ptr)->offset > 0);
          memcpy(kvRowColIdx(nrow), kvRowColIdx(row), nlen);
          memcpy(kvRowValues(nrow), kvRowValues(row), ((SColIdx *)ptr)->offset);
        }

        // Construct current column value
        int colIdx = nlen / sizeof(SColIdx);
        pColIdx = kvRowColIdxAt(nrow, colIdx);
        pColIdx->colId = ((SColIdx *)ptr)->colId;
        pColIdx->offset = ((SColIdx *)ptr)->offset;
        memcpy(kvRowColVal(nrow, pColIdx), value, varDataTLen(value));

        // Construct columns after: same ids, offsets shifted by the size change
        if (kvRowNCols(nrow) - colIdx - 1 > 0) {
          for (int i = colIdx + 1; i < kvRowNCols(nrow); i++) {
            kvRowColIdxAt(nrow, i)->colId = kvRowColIdxAt(row, i)->colId;
            kvRowColIdxAt(nrow, i)->offset = kvRowColIdxAt(row, i)->offset + diff;
          }
          memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx + 1)), kvRowColVal(row, kvRowColIdxAt(row, colIdx + 1)),
                 POINTER_DISTANCE(kvRowEnd(row), kvRowColVal(row, kvRowColIdxAt(row, colIdx + 1))));
        }

        *orow = nrow;
        free(row);
      }
    } else {
      memcpy(kvRowColVal(row, (SColIdx *)ptr), value, TYPE_BYTES[type]);
    }
  }

  return 0;
}

H
TD-353  
Hongze Cheng 已提交
635
/**
 * Encode a KV row by copying it verbatim.
 *
 * When `buf` is not NULL the row is copied to `*buf` with kvRowCpy() and
 * `*buf` is advanced by kvRowLen(row). When `buf` is NULL nothing is written.
 * (The original author noted that the purpose of this encoder may change.)
 *
 * @param buf in/out write cursor, or NULL to only obtain the length
 * @param row row to encode
 * @return kvRowLen(row)
 */
int tdEncodeKVRow(void **buf, SKVRow row) {
  if (buf == NULL) return kvRowLen(row);

  kvRowCpy(*buf, row);
  *buf = POINTER_SHIFT(*buf, kvRowLen(row));

  return kvRowLen(row);
}

H
Hongze Cheng 已提交
645 646
/**
 * Decode a KV row by duplicating the row found at `buf`.
 *
 * @param buf buffer positioned at an encoded KV row
 * @param row receives the tdKVRowDup() copy (NULL if that copy failed)
 * @return `buf` advanced by the length of the decoded row, or NULL if the
 *         copy could not be allocated
 */
void *tdDecodeKVRow(void *buf, SKVRow *row) {
  SKVRow pDecoded = tdKVRowDup(buf);

  *row = pDecoded;
  if (pDecoded == NULL) return NULL;

  return POINTER_SHIFT(buf, kvRowLen(pDecoded));
}

H
Hongze Cheng 已提交
651
/**
 * Prepare a KV row builder for use.
 *
 * Allocates a column index with room for 128 entries and a 1024-byte value
 * buffer, and starts with no columns and no value bytes.
 *
 * On failure both pColIdx and buf are left NULL, so that a subsequent
 * tdDestroyKVRowBuilder() on the half-initialised builder cannot free a
 * dangling or uninitialised pointer.
 *
 * @param pBuilder builder to initialise; dereferenced without a NULL check
 * @return 0 on success, -1 if either allocation fails
 */
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) {
  pBuilder->tCols = 128;
  pBuilder->nCols = 0;
  pBuilder->alloc = 1024;
  pBuilder->size = 0;
  pBuilder->buf = NULL;

  pBuilder->pColIdx = (SColIdx *)malloc(sizeof(SColIdx) * pBuilder->tCols);
  if (pBuilder->pColIdx == NULL) return -1;

  pBuilder->buf = malloc(pBuilder->alloc);
  if (pBuilder->buf == NULL) {
    free(pBuilder->pColIdx);
    pBuilder->pColIdx = NULL;  // do not leave a dangling pointer behind
    return -1;
  }

  return 0;
}

H
Hongze Cheng 已提交
666
/**
 * Release the memory owned by a KV row builder.
 *
 * Both the value buffer and the column index are released with tfree(). The
 * builder structure itself is not freed.
 *
 * @param pBuilder builder to tear down; dereferenced without a NULL check
 */
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) {
  tfree(pBuilder->buf);
  tfree(pBuilder->pColIdx);
}

H
Hongze Cheng 已提交
671
/**
 * Discard the columns collected so far in a KV row builder.
 *
 * Only the column count and the used value size are zeroed; the allocated
 * index and value buffer are kept for reuse.
 *
 * @param pBuilder builder to reset; dereferenced without a NULL check
 */
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
  pBuilder->size = 0;
  pBuilder->nCols = 0;
}

H
Hongze Cheng 已提交
676
/**
 * Materialise the KV row described by a builder.
 *
 * The row consists of a header of TD_KV_ROW_HEAD_SIZE bytes, the builder's
 * nCols column index entries and the builder's `size` value bytes. The
 * builder is left unchanged.
 *
 * @param pBuilder builder to read from
 * @return the newly malloc'ed row, or NULL when the builder holds neither
 *         index entries nor value bytes, or when the allocation fails
 */
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
  size_t idxBytes = sizeof(SColIdx) * pBuilder->nCols;
  int    payload = idxBytes + pBuilder->size;

  if (payload == 0) return NULL;

  int    total = payload + TD_KV_ROW_HEAD_SIZE;
  SKVRow pRow = malloc(total);

  if (pRow != NULL) {
    // The column count goes in first: kvRowValues() is used after it below.
    kvRowSetNCols(pRow, pBuilder->nCols);
    kvRowSetLen(pRow, total);

    memcpy(kvRowColIdx(pRow), pBuilder->pColIdx, idxBytes);
    memcpy(kvRowValues(pRow), pBuilder->buf, pBuilder->size);
  }

  return pRow;
}