tdataformat.c 20.2 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
slguan 已提交
15
#include "tdataformat.h"
T
Tao Liu 已提交
16
#include "talgo.h"
H
Hongze Cheng 已提交
17
#include "wchar.h"
H
more  
hzcheng 已提交
18

H
hzcheng 已提交
19 20 21 22
/**
 * Create a deep copy of a schema.
 *
 * The schema header and its trailing column array occupy one contiguous
 * block of sizeof(STSchema) + nCols * sizeof(STColumn) bytes, so a single
 * memcpy duplicates everything.
 *
 * @param pSchema  schema to copy
 * @return newly malloc'ed copy owned by the caller, or NULL on allocation failure
 */
STSchema *tdDupSchema(STSchema *pSchema) {
  int size = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);

  STSchema *pCopy = (STSchema *)malloc(size);
  if (pCopy != NULL) {
    memcpy((void *)pCopy, (void *)pSchema, size);
  }

  return pCopy;
}

H
TD-27  
hzcheng 已提交
33 34 35 36
/**
 * Serialize a schema into dst.
 *
 * Written in order: version, numOfCols, then (type, colId, bytes) for every
 * column. Column offsets are not written; tdDecodeSchema recomputes them by
 * re-adding the columns to a builder.
 *
 * @param dst      output cursor
 * @param pSchema  schema to encode
 * @return dst advanced past the encoded bytes (the next write position)
 */
void *tdEncodeSchema(void *dst, STSchema *pSchema) {
  T_APPEND_MEMBER(dst, pSchema, STSchema, version);
  T_APPEND_MEMBER(dst, pSchema, STSchema, numOfCols);

  int nCols = schemaNCols(pSchema);
  int iCol = 0;
  while (iCol < nCols) {
    STColumn *pColumn = schemaColAt(pSchema, iCol);
    T_APPEND_MEMBER(dst, pColumn, STColumn, type);
    T_APPEND_MEMBER(dst, pColumn, STColumn, colId);
    T_APPEND_MEMBER(dst, pColumn, STColumn, bytes);
    iCol++;
  }

  return dst;
}

/**
 * Decode a schema from a binary.
 */
STSchema *tdDecodeSchema(void **psrc) {
H
TD-166  
hzcheng 已提交
54
  int totalCols = 0;
H
Hongze Cheng 已提交
55 56
  int version = 0;
  STSchemaBuilder schemaBuilder = {0};
H
TD-27  
hzcheng 已提交
57

H
Hongze Cheng 已提交
58
  T_READ_MEMBER(*psrc, int, version);
H
TD-166  
hzcheng 已提交
59
  T_READ_MEMBER(*psrc, int, totalCols);
H
TD-27  
hzcheng 已提交
60

H
Hongze Cheng 已提交
61 62
  if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;

H
TD-166  
hzcheng 已提交
63
  for (int i = 0; i < totalCols; i++) {
H
TD-27  
hzcheng 已提交
64 65 66 67 68 69 70
    int8_t  type = 0;
    int16_t colId = 0;
    int32_t bytes = 0;
    T_READ_MEMBER(*psrc, int8_t, type);
    T_READ_MEMBER(*psrc, int16_t, colId);
    T_READ_MEMBER(*psrc, int32_t, bytes);

H
Hongze Cheng 已提交
71 72 73 74
    if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
      tdDestroyTSchemaBuilder(&schemaBuilder);
      return NULL;
    }
H
TD-27  
hzcheng 已提交
75 76
  }

H
Hongze Cheng 已提交
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
  STSchema *pSchema = tdGetSchemaFromBuilder(&schemaBuilder);
  tdDestroyTSchemaBuilder(&schemaBuilder);
  return pSchema;
}

/**
 * Prepare a schema builder for use.
 *
 * Allocates room for an initial 256 columns (tdAddColToSchema grows it on
 * demand) and resets the counters via tdResetTSchemaBuilder.
 *
 * @return 0 on success, -1 if pBuilder is NULL or allocation fails
 */
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  if (pBuilder == NULL) {
    return -1;
  }

  const int initialCols = 256;
  pBuilder->tCols = initialCols;
  pBuilder->columns = (STColumn *)malloc(sizeof(STColumn) * initialCols);
  if (pBuilder->columns == NULL) {
    return -1;
  }

  tdResetTSchemaBuilder(pBuilder, version);
  return 0;
}

/**
 * Release the column array owned by a schema builder (via tfree).
 * A NULL builder is ignored.
 */
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder == NULL) return;
  tfree(pBuilder->columns);
}

/**
 * Drop all columns from a builder and set the version of the schema it will
 * produce. The column array itself is not freed.
 */
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  pBuilder->version = version;
  pBuilder->flen = 0;
  pBuilder->tlen = 0;
  pBuilder->nCols = 0;
}

/**
 * Append a column definition to a schema builder.
 *
 * Columns are placed back to back in the fixed part of a row: a column's
 * offset is the previous column's offset plus TYPE_BYTES of the previous
 * column's type. flen grows by TYPE_BYTES[type] for every column. For
 * variable-length types the payload size `bytes` (plus its length prefix) is
 * added to tlen as well.
 *
 * @param pBuilder  builder initialized with tdInitTSchemaBuilder
 * @param type      column data type; must pass isValidDataType
 * @param colId     column id
 * @param bytes     payload size, only used for variable-length types
 * @return 0 on success; -1 on an invalid type or if growing the column array
 *         fails (the builder is left unchanged in that case)
 */
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int32_t bytes) {
  if (!isValidDataType(type, 0)) return -1;

  if (pBuilder->nCols >= pBuilder->tCols) {
    // Grow through a temporary: assigning realloc's result directly would leak
    // the old array on failure, and tCols must only change once the larger
    // array really exists.
    int       newTCols = pBuilder->tCols * 2;
    STColumn *columns = (STColumn *)realloc(pBuilder->columns, sizeof(STColumn) * newTCols);
    if (columns == NULL) return -1;
    pBuilder->columns = columns;
    pBuilder->tCols = newTCols;
  }

  STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
  colSetType(pCol, type);
  colSetColId(pCol, colId);
  if (pBuilder->nCols == 0) {
    colSetOffset(pCol, 0);
  } else {
    STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols - 1]);
    colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
  }

  if (IS_VAR_DATA_TYPE(type)) {
    colSetBytes(pCol, bytes);
    pBuilder->tlen += (TYPE_BYTES[type] + sizeof(VarDataLenT) + bytes);
  } else {
    colSetBytes(pCol, TYPE_BYTES[type]);
    pBuilder->tlen += TYPE_BYTES[type];
  }

  pBuilder->nCols++;
  pBuilder->flen += TYPE_BYTES[type];

  ASSERT(pCol->offset < pBuilder->flen);

  return 0;
}

/**
 * Materialize the columns collected in a builder into a standalone schema.
 *
 * The result is one allocation: the STSchema header followed by a copy of
 * the builder's nCols columns. The builder is not modified.
 *
 * @return new schema owned by the caller, or NULL if the builder has no
 *         columns or allocation fails
 */
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
  int nCols = pBuilder->nCols;
  if (nCols <= 0) return NULL;

  size_t    colsSize = sizeof(STColumn) * nCols;
  STSchema *pSchema = (STSchema *)malloc(sizeof(STSchema) + colsSize);
  if (pSchema == NULL) return NULL;

  schemaVersion(pSchema) = pBuilder->version;
  schemaNCols(pSchema) = nCols;
  schemaTLen(pSchema) = pBuilder->tlen;
  schemaFLen(pSchema) = pBuilder->flen;

  memcpy(schemaColAt(pSchema, 0), pBuilder->columns, colsSize);

  return pSchema;
}

H
hzcheng 已提交
159 160 161
/**
 * Initialize a data row
 */
H
TD-90  
Hongze Cheng 已提交
162 163 164 165
void tdInitDataRow(SDataRow row, STSchema *pSchema) {
  dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema));
  dataRowSetVersion(row, schemaVersion(pSchema));
}
H
hzcheng 已提交
166

H
TD-166  
hzcheng 已提交
167
/**
 * Allocate a data row of dataRowMaxBytesFromSchema(pSchema) bytes and
 * initialize its header with tdInitDataRow.
 *
 * @return new row owned by the caller, or NULL on allocation failure
 */
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
  int32_t  maxBytes = dataRowMaxBytesFromSchema(pSchema);
  SDataRow row = malloc(maxBytes);

  if (row != NULL) {
    tdInitDataRow(row, pSchema);
  }
  return row;
}
H
hzcheng 已提交
176

H
hzcheng 已提交
177 178 179 180 181 182 183
/**
 * Release a data row obtained from malloc (e.g. tdNewDataRowFromSchema or
 * tdDataRowDup). Passing NULL is a no-op.
 */
void tdFreeDataRow(SDataRow row) {
  free(row);  // free(NULL) is already a no-op, no guard needed
}

H
hzcheng 已提交
184
/**
 * Duplicate a data row (dataRowLen(row) bytes copied with dataRowCpy).
 *
 * @return new row owned by the caller, or NULL on allocation failure
 */
SDataRow tdDataRowDup(SDataRow row) {
  SDataRow copy = malloc(dataRowLen(row));
  if (copy != NULL) dataRowCpy(copy, row);
  return copy;
}
H
hzcheng 已提交
191

H
TD-166  
hzcheng 已提交
192 193 194 195
/**
 * Bind one column descriptor of an SDataCols to a schema column and to its
 * storage.
 *
 * Copies type/colId/bytes from pCol. The stored offset is colOffset(pCol)
 * plus TD_DATA_ROW_HEAD_SIZE, i.e. the column's position in a data row
 * including the row header. Storage for maxPoints values is carved out of
 * *pBuf, and *pBuf is advanced past it:
 *   - BINARY/NCHAR: TYPE_BYTES[type] * maxPoints bytes for the offset array
 *     (dataOff), then (sizeof(VarDataLenT) + bytes) * maxPoints bytes of
 *     payload (pData);
 *   - other types: bytes * maxPoints bytes of payload (pData), dataOff NULL.
 * The column starts empty (len = 0).
 */
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) {
  pDataCol->type = colType(pCol);
  pDataCol->colId = colColId(pCol);
  pDataCol->bytes = colBytes(pCol);
  pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE;
  pDataCol->len = 0;

  if (pDataCol->type != TSDB_DATA_TYPE_BINARY && pDataCol->type != TSDB_DATA_TYPE_NCHAR) {
    pDataCol->dataOff = NULL;
    pDataCol->pData = *pBuf;
    pDataCol->spaceSize = pDataCol->bytes * maxPoints;
  } else {
    pDataCol->dataOff = (VarDataOffsetT *)(*pBuf);
    pDataCol->pData = POINTER_SHIFT(*pBuf, TYPE_BYTES[pDataCol->type] * maxPoints);
    pDataCol->spaceSize = (sizeof(VarDataLenT) + pDataCol->bytes) * maxPoints;
  }

  // This column's storage ends spaceSize bytes after its payload start.
  *pBuf = POINTER_SHIFT(pDataCol->pData, pDataCol->spaceSize);
}

H
Haojun Liao 已提交
212
/**
 * Append one value to a column as row number numOfRows.
 *
 * BINARY/NCHAR values are copied with their length prefix
 * (varDataTLen(value) bytes) to the end of the payload area, and their start
 * offset is recorded in dataOff[numOfRows]. Other types copy pCol->bytes
 * bytes. In both cases pCol->len grows by the number of bytes copied.
 * maxPoints is currently unused.
 */
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) {
  ASSERT(pCol != NULL && value != NULL);

  void *dst = POINTER_SHIFT(pCol->pData, pCol->len);

  if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR) {
    pCol->dataOff[numOfRows] = pCol->len;
    memcpy(dst, value, varDataTLen(value));
    pCol->len += varDataTLen(value);
  } else {
    // Fixed-width columns are densely packed: one slot per existing row.
    ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
    memcpy(dst, value, pCol->bytes);
    pCol->len += pCol->bytes;
  }
}

H
Haojun Liao 已提交
233 234
/**
 * Remove the first pointsToPop values from a column that currently holds
 * numOfRows values, moving the rest to the front of pData.
 * At least one value must remain (asserted).
 */
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows) {
  int pointsLeft = numOfRows - pointsToPop;
  ASSERT(pointsLeft > 0);

  if (pCol->type != TSDB_DATA_TYPE_BINARY && pCol->type != TSDB_DATA_TYPE_NCHAR) {
    ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
    void *firstKept = POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * pointsToPop);
    pCol->len = TYPE_BYTES[pCol->type] * pointsLeft;
    memmove(pCol->pData, firstKept, pCol->len);
    return;
  }

  // Variable-length: the first kept value starts at dataOff[pointsToPop].
  ASSERT(pCol->len > 0);
  VarDataOffsetT skip = pCol->dataOff[pointsToPop];
  pCol->len -= skip;
  ASSERT(pCol->len > 0);
  memmove(pCol->pData, POINTER_SHIFT(pCol->pData, skip), pCol->len);

  // Offsets of the surviving values have all shifted; rebuild them.
  dataColSetOffset(pCol, pointsLeft);
}

H
TD-166  
hzcheng 已提交
252 253 254 255 256
/**
 * Return true if each of the first nEle values of the column is NULL
 * according to isNull (true for nEle <= 0). For BINARY/NCHAR the check is
 * applied to the value part after the length prefix (varDataVal).
 */
bool isNEleNull(SDataCol *pCol, int nEle) {
  bool isVarType = (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR);

  for (int i = 0; i < nEle; i++) {
    void *val = tdGetColDataOfRow(pCol, i);
    if (isVarType) val = varDataVal(val);
    if (!isNull(val, pCol->type)) return false;
  }

  return true;
}

H
TD-90  
Hongze Cheng 已提交
268 269 270 271 272 273 274 275 276 277 278 279 280
/**
 * Store a NULL value for row `index` of a column and advance pCol->len.
 *
 * Fixed-length types: the NULL marker is written in place at slot `index`.
 * Variable-length types: a length-prefixed NULL marker (payload length
 * sizeof(char) for BINARY, TSDB_NCHAR_SIZE otherwise) is appended at the end
 * of the payload area and its offset is recorded in dataOff[index].
 */
void dataColSetNullAt(SDataCol *pCol, int index) {
  if (!IS_VAR_DATA_TYPE(pCol->type)) {
    void *slot = POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index);
    setNull(slot, pCol->type, pCol->bytes);
    pCol->len += TYPE_BYTES[pCol->type];
    return;
  }

  char *cell = POINTER_SHIFT(pCol->pData, pCol->len);
  pCol->dataOff[index] = pCol->len;
  varDataLen(cell) = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE;
  setNull(varDataVal(cell), pCol->type, pCol->bytes);
  pCol->len += varDataTLen(cell);
}

H
TD-166  
hzcheng 已提交
281
/**
 * Reset a column so that it holds exactly nEle NULL values (rows 0..nEle-1).
 * maxPoints is currently unused.
 */
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
  if (!IS_VAR_DATA_TYPE(pCol->type)) {
    setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
    pCol->len = TYPE_BYTES[pCol->type] * nEle;
    return;
  }

  // Variable-length NULLs are appended one at a time from an empty column.
  pCol->len = 0;
  int row = 0;
  while (row < nEle) {
    dataColSetNullAt(pCol, row);
    row++;
  }
}

H
TD-166  
hzcheng 已提交
294 295 296
/**
 * Rebuild dataOff[0..nEle-1] for a BINARY/NCHAR column by walking the
 * length-prefixed values packed back to back from the start of pData.
 */
void dataColSetOffset(SDataCol *pCol, int nEle) {
  ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));

  VarDataOffsetT offset = 0;
  void          *cursor = pCol->pData;

  for (int i = 0; i < nEle; i++) {
    pCol->dataOff[i] = offset;
    offset += varDataTLen(cursor);
    cursor = POINTER_SHIFT(pCol->pData, offset);
  }
}

H
TD-166  
hzcheng 已提交
308
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
H
TD-34  
hzcheng 已提交
309 310 311 312 313 314
  SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols);
  if (pCols == NULL) return NULL;

  pCols->maxRowSize = maxRowSize;
  pCols->maxCols = maxCols;
  pCols->maxPoints = maxRows;
H
TD-166  
hzcheng 已提交
315
  pCols->bufSize = maxRowSize * maxRows;
H
TD-34  
hzcheng 已提交
316

H
TD-166  
hzcheng 已提交
317
  pCols->buf = malloc(pCols->bufSize);
H
TD-34  
hzcheng 已提交
318 319
  if (pCols->buf == NULL) {
    free(pCols);
H
TD-34  
hzcheng 已提交
320 321
    return NULL;
  }
H
TD-34  
hzcheng 已提交
322

H
TD-34  
hzcheng 已提交
323 324 325 326 327 328 329 330
  return pCols;
}

/**
 * Reset pCols and lay out its columns according to pSchema.
 *
 * Each schema column is bound to a descriptor in pCols->cols via dataColInit.
 * The descriptors' storage is carved sequentially out of pCols->buf; the
 * running total must stay within bufSize (asserted per column).
 */
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
  // cols[] was allocated with room for maxCols descriptors (tdNewDataCols);
  // a schema with more columns would write past its end.
  ASSERT(schemaNCols(pSchema) <= pCols->maxCols);

  tdResetDataCols(pCols);
  pCols->numOfCols = schemaNCols(pSchema);

  void *ptr = pCols->buf;
  for (int i = 0; i < schemaNCols(pSchema); i++) {
    dataColInit(pCols->cols + i, schemaColAt(pSchema, i), &ptr, pCols->maxPoints);
    ASSERT((char *)ptr - (char *)(pCols->buf) <= pCols->bufSize);
  }
}

/**
 * Free an SDataCols together with its data buffer. NULL is ignored.
 */
void tdFreeDataCols(SDataCols *pCols) {
  if (pCols == NULL) return;

  tfree(pCols->buf);
  free(pCols);
}

H
TD-100  
hzcheng 已提交
345
/**
 * Create a copy of pDataCols with the same capacity and column layout.
 *
 * Column descriptors are copied. Their pData (and, for BINARY/NCHAR, dataOff)
 * pointers are rebased onto the new buffer at the same relative positions.
 * When keepData is true, the row count, column lengths, payload bytes and the
 * full maxPoints-entry dataOff arrays are copied as well; otherwise the copy
 * holds no rows.
 *
 * @return new object (release with tdFreeDataCols), or NULL on allocation failure
 */
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
  SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints);
  if (pRet == NULL) return NULL;

  pRet->numOfCols = pDataCols->numOfCols;
  pRet->sversion = pDataCols->sversion;
  if (keepData) pRet->numOfRows = pDataCols->numOfRows;

  for (int i = 0; i < pDataCols->numOfCols; i++) {
    SDataCol *pSrc = &(pDataCols->cols[i]);
    SDataCol *pDst = &(pRet->cols[i]);

    pDst->type = pSrc->type;
    pDst->colId = pSrc->colId;
    pDst->bytes = pSrc->bytes;
    pDst->offset = pSrc->offset;
    pDst->spaceSize = pSrc->spaceSize;

    // Same relative position inside the new buffer as inside the old one.
    pDst->pData = (void *)((char *)pRet->buf + ((char *)(pSrc->pData) - (char *)(pDataCols->buf)));

    bool isVarType = (pDst->type == TSDB_DATA_TYPE_BINARY || pDst->type == TSDB_DATA_TYPE_NCHAR);
    if (isVarType) {
      ASSERT(pSrc->dataOff != NULL);
      // dataOff is a VarDataOffsetT array (see dataColInit); cast accordingly.
      pDst->dataOff =
          (VarDataOffsetT *)((char *)pRet->buf + ((char *)(pSrc->dataOff) - (char *)(pDataCols->buf)));
    }

    if (keepData) {
      pDst->len = pSrc->len;
      memcpy(pDst->pData, pSrc->pData, pSrc->len);
      if (isVarType) {
        memcpy(pDst->dataOff, pSrc->dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints);
      }
    }
  }

  return pRet;
}

H
TD-34  
hzcheng 已提交
380
/**
 * Mark pCols as holding no rows: numOfRows = 0 and dataColReset applied to
 * every one of the maxCols column descriptors.
 */
void tdResetDataCols(SDataCols *pCols) {
  pCols->numOfRows = 0;

  SDataCol *pCol = pCols->cols;
  for (int remaining = pCols->maxCols; remaining > 0; remaining--, pCol++) {
    dataColReset(pCol);
  }
}

H
TD-90  
Hongze Cheng 已提交
387
/**
 * Append one data row to pCols as row number pCols->numOfRows.
 *
 * Target columns are matched to schema columns by colId in a single forward
 * pass over both lists, so both are assumed to be sorted by ascending colId.
 * A target column with no matching schema column receives a NULL value;
 * schema columns absent from the target are skipped.
 * The row's key must be greater than the current last key (asserted).
 */
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) {
  ASSERT(dataColsKeyLast(pCols) < dataRowKey(row));

  int nRowCols = schemaNCols(pSchema);
  int rcol = 0;

  for (int dcol = 0; dcol < pCols->numOfCols; dcol++) {
    SDataCol *pDataCol = &(pCols->cols[dcol]);

    // Skip schema columns that sort before this target column.
    while (rcol < nRowCols && schemaColAt(pSchema, rcol)->colId < pDataCol->colId) rcol++;

    STColumn *pRowCol = (rcol < nRowCols) ? schemaColAt(pSchema, rcol) : NULL;
    if (pRowCol != NULL && pRowCol->colId == pDataCol->colId) {
      void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
      dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
      rcol++;
    } else {
      dataColSetNullAt(pDataCol, pCols->numOfRows);
    }
  }

  pCols->numOfRows++;
}
H
TD-166  
hzcheng 已提交
416

H
TD-34  
hzcheng 已提交
417 418
/**
 * Remove the first pointsToPop rows from every column of pCols.
 * When no rows would remain, pCols is reset instead.
 */
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
  int pointsLeft = pCols->numOfRows - pointsToPop;

  if (pointsLeft > 0) {
    for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
      dataColPopPoints(&pCols->cols[iCol], pointsToPop, pCols->numOfRows);
    }
    pCols->numOfRows = pointsLeft;
  } else {
    tdResetDataCols(pCols);
  }
}
H
TD-34  
hzcheng 已提交
431

H
hzcheng 已提交
432
/**
 * Merge the first rowsToMerge rows of source into target.
 *
 * If target's last key precedes source's first key, the rows are appended
 * directly. Otherwise target is copied and rebuilt by a key-ordered merge of
 * that copy with the first rowsToMerge rows of source (tdMergeTwoDataCols).
 * source is left unchanged on return.
 *
 * @return 0 on success, -1 if the temporary copy of target cannot be allocated
 */
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
  ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
  ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
  ASSERT(target->numOfCols == source->numOfCols);

  SDataCols *pTarget = NULL;

  if (dataColsKeyLast(target) < dataColsKeyFirst(source)) {  // No overlap
    for (int i = 0; i < rowsToMerge; i++) {
      for (int j = 0; j < source->numOfCols; j++) {
        dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfRows,
                         target->maxPoints);
      }
      target->numOfRows++;
    }
  } else {
    pTarget = tdDupDataCols(target, true);
    if (pTarget == NULL) goto _err;

    // tdMergeTwoDataCols consumes src2 up to src2->numOfRows. Without a limit,
    // source rows beyond rowsToMerge with small keys would be merged in and
    // push target's own rows out of the tRows-bounded result. Expose only
    // rowsToMerge rows during the merge, then restore the real count.
    int sourceRows = source->numOfRows;
    source->numOfRows = rowsToMerge;

    int iter1 = 0;
    int iter2 = 0;
    tdMergeTwoDataCols(target, pTarget, &iter1, source, &iter2, pTarget->numOfRows + rowsToMerge);

    source->numOfRows = sourceRows;
  }

  tdFreeDataCols(pTarget);
  return 0;

_err:
  tdFreeDataCols(pTarget);
  return -1;
}
H
TD-100  
hzcheng 已提交
463

H
TD-100  
hzcheng 已提交
464
/**
 * Rebuild target as a key-ordered merge of src1 (starting at *iter1) and
 * src2 (starting at *iter2).
 *
 * Keys are read as TSKEY from column 0 of each source. Merging stops once
 * target holds tRows rows or both sources are exhausted. When both sources
 * have the same key, src1's row is kept and src2's row is skipped.
 * On return *iter1 and *iter2 index the next unconsumed rows.
 */
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows) {
  tdResetDataCols(target);

  while (target->numOfRows < tRows) {
    bool has1 = (*iter1 < src1->numOfRows);
    bool has2 = (*iter2 < src2->numOfRows);
    if (!has1 && !has2) break;

    TSKEY key1 = has1 ? ((TSKEY *)(src1->cols[0].pData))[*iter1] : INT64_MAX;
    TSKEY key2 = has2 ? ((TSKEY *)(src2->cols[0].pData))[*iter2] : INT64_MAX;

    // Pick the source holding the smaller key (src1 wins ties).
    SDataCols *pSrc = (key1 <= key2) ? src1 : src2;
    int       *pIter = (key1 <= key2) ? iter1 : iter2;

    for (int i = 0; i < pSrc->numOfCols; i++) {
      ASSERT(target->cols[i].type == pSrc->cols[i].type);
      dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(pSrc->cols + i, *pIter), target->numOfRows,
                       target->maxPoints);
    }
    target->numOfRows++;
    (*pIter)++;

    // Duplicate key: src1's row was taken, drop the matching src2 row.
    if (key1 == key2) (*iter2)++;
  }
}

H
Hongze Cheng 已提交
497 498
/**
 * Duplicate a KV row (kvRowLen(row) bytes copied with kvRowCpy).
 *
 * @return new row owned by the caller, or NULL on allocation failure
 */
SKVRow tdKVRowDup(SKVRow row) {
  SKVRow copy = malloc(kvRowLen(row));
  if (copy != NULL) kvRowCpy(copy, row);
  return copy;
}

H
TD-90  
Hongze Cheng 已提交
505 506 507 508 509 510 511
/**
 * Set the value of column colId in a KV row, inserting the column if absent.
 *
 * A KV row holds an SColIdx array (searched with taosbsearch, so kept sorted
 * by colId) followed by the packed values. Each SColIdx.offset is relative to
 * kvRowValues(). When the row must change size, a new row is allocated,
 * *orow is replaced with it and the old row is freed. Fixed-length values and
 * same-length variable-length values are overwritten in place.
 *
 * @param orow   in/out: row to update; may be replaced by a new allocation
 * @param colId  column to set
 * @param type   data type of the value
 * @param value  new value; for variable-length types a length-prefixed buffer
 * @return 0 on success, -1 on allocation failure (*orow is left unchanged)
 */
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
  SColIdx *pColIdx = NULL;
  SKVRow   row = *orow;
  SKVRow   nrow = NULL;
  // With TD_GE, presumably the first index entry whose colId >= colId, or
  // NULL when every existing colId is smaller.
  void *   ptr = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE);

  // The column is absent either when no entry is >= colId or when the entry
  // found is strictly greater (insert in front of it). The branch below that
  // handles ptr != NULL inserts *before* ptr, which requires colId > colId.
  if (ptr == NULL || ((SColIdx *)ptr)->colId > colId) {  // need to add a column value to the row
    int diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
    nrow = malloc(kvRowLen(row) + sizeof(SColIdx) + diff);
    if (nrow == NULL) return -1;

    kvRowSetLen(nrow, kvRowLen(row) + sizeof(SColIdx) + diff);
    kvRowSetNCols(nrow, kvRowNCols(row) + 1);

    if (ptr == NULL) {
      // Append after all existing columns.
      memcpy(kvRowColIdx(nrow), kvRowColIdx(row), sizeof(SColIdx) * kvRowNCols(row));
      memcpy(kvRowValues(nrow), kvRowValues(row), POINTER_DISTANCE(kvRowEnd(row), kvRowValues(row)));
      int colIdx = kvRowNCols(nrow) - 1;
      kvRowColIdxAt(nrow, colIdx)->colId = colId;
      kvRowColIdxAt(nrow, colIdx)->offset = POINTER_DISTANCE(kvRowEnd(row), kvRowValues(row));
      memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx)), value, diff);
    } else {
      // Insert in front of *ptr: the new column takes over ptr's value offset,
      // and every following column moves one index up and diff bytes later.
      int16_t tlen = POINTER_DISTANCE(ptr, kvRowColIdx(row));
      if (tlen > 0) {
        memcpy(kvRowColIdx(nrow), kvRowColIdx(row), tlen);
        memcpy(kvRowValues(nrow), kvRowValues(row), ((SColIdx *)ptr)->offset);
      }

      int colIdx = tlen / sizeof(SColIdx);
      kvRowColIdxAt(nrow, colIdx)->colId = colId;
      kvRowColIdxAt(nrow, colIdx)->offset = ((SColIdx *)ptr)->offset;
      memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx)), value, diff);

      for (int i = colIdx; i < kvRowNCols(row); i++) {
        kvRowColIdxAt(nrow, i + 1)->colId = kvRowColIdxAt(row, i)->colId;
        kvRowColIdxAt(nrow, i + 1)->offset = kvRowColIdxAt(row, i)->offset + diff;
      }
      memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx + 1)), kvRowColVal(row, kvRowColIdxAt(row, colIdx)),
             POINTER_DISTANCE(kvRowEnd(row), kvRowColVal(row, kvRowColIdxAt(row, colIdx))));
    }

    *orow = nrow;
    free(row);
  } else {
    ASSERT(((SColIdx *)ptr)->colId == colId);
    if (IS_VAR_DATA_TYPE(type)) {
      void *pOldVal = kvRowColVal(row, (SColIdx *)ptr);

      if (varDataTLen(value) == varDataTLen(pOldVal)) {  // just update the column value in place
        memcpy(pOldVal, value, varDataTLen(value));
      } else {  // need to reallocate the memory
        int16_t diff = varDataTLen(value) - varDataTLen(pOldVal);
        int16_t nlen = kvRowLen(row) + diff;
        ASSERT(nlen > 0);
        nrow = malloc(nlen);
        if (nrow == NULL) return -1;

        kvRowSetLen(nrow, nlen);
        kvRowSetNCols(nrow, kvRowNCols(row));

        // Copy the index entries and values that precede the updated column.
        nlen = POINTER_DISTANCE(ptr, kvRowColIdx(row));
        ASSERT(nlen % sizeof(SColIdx) == 0);
        if (nlen > 0) {
          ASSERT(((SColIdx *)ptr)->offset > 0);
          memcpy(kvRowColIdx(nrow), kvRowColIdx(row), nlen);
          memcpy(kvRowValues(nrow), kvRowValues(row), ((SColIdx *)ptr)->offset);
        }

        // Write the updated column at its unchanged value offset.
        int colIdx = nlen / sizeof(SColIdx);
        pColIdx = kvRowColIdxAt(nrow, colIdx);
        pColIdx->colId = ((SColIdx *)ptr)->colId;
        pColIdx->offset = ((SColIdx *)ptr)->offset;
        memcpy(kvRowColVal(nrow, pColIdx), value, varDataTLen(value));

        // Columns after the updated one keep their ids; their values shift by diff.
        if (kvRowNCols(nrow) - colIdx - 1 > 0) {
          for (int i = colIdx + 1; i < kvRowNCols(nrow); i++) {
            kvRowColIdxAt(nrow, i)->colId = kvRowColIdxAt(row, i)->colId;
            // nrow was just malloc'ed, so derive the offset from the old row
            // instead of adjusting uninitialized memory.
            kvRowColIdxAt(nrow, i)->offset = kvRowColIdxAt(row, i)->offset + diff;
          }
          memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx + 1)), kvRowColVal(row, kvRowColIdxAt(row, colIdx + 1)),
                 POINTER_DISTANCE(kvRowEnd(row), kvRowColVal(row, kvRowColIdxAt(row, colIdx + 1))));
        }

        *orow = nrow;
        free(row);
      }
    } else {
      // Fixed-length values always have the same size: overwrite in place.
      memcpy(kvRowColVal(row, (SColIdx *)ptr), value, TYPE_BYTES[type]);
    }
  }

  return 0;
}

H
Hongze Cheng 已提交
604
/**
 * Serialize a KV row into buf and return the position just past it.
 *
 * The encoding is currently a verbatim copy of the row's kvRowLen(row)
 * bytes; this may change in the future.
 */
void *tdEncodeKVRow(void *buf, SKVRow row) {
  void *next = POINTER_SHIFT(buf, kvRowLen(row));
  kvRowCpy(buf, row);
  return next;
}

H
Hongze Cheng 已提交
610 611 612
/**
 * Deserialize a KV row written by tdEncodeKVRow into a new allocation.
 *
 * @param buf  encoded row
 * @param row  out: the decoded (duplicated) row owned by the caller, or NULL
 *             if allocation failed
 * @return position in buf just past the encoded row
 */
void *tdDecodeKVRow(void *buf, SKVRow *row) {
  *row = tdKVRowDup(buf);
  // Read the length from the encoded bytes rather than from *row: *row is
  // NULL when the allocation fails and must not be dereferenced.
  return POINTER_SHIFT(buf, kvRowLen(buf));
}

H
Hongze Cheng 已提交
615
/**
 * Initialize a KV row builder with room for 128 column index entries and a
 * 1024-byte value buffer.
 *
 * @return 0 on success, -1 on allocation failure. On failure pColIdx and buf
 *         are both NULL, so a later tdDestroyKVRowBuilder does not free a
 *         stale or uninitialized pointer.
 */
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) {
  pBuilder->tCols = 128;
  pBuilder->nCols = 0;
  pBuilder->alloc = 1024;
  pBuilder->size = 0;
  pBuilder->buf = NULL;  // defined value even if the first allocation fails

  pBuilder->pColIdx = (SColIdx *)malloc(sizeof(SColIdx) * pBuilder->tCols);
  if (pBuilder->pColIdx == NULL) return -1;

  pBuilder->buf = malloc(pBuilder->alloc);
  if (pBuilder->buf == NULL) {
    free(pBuilder->pColIdx);
    pBuilder->pColIdx = NULL;  // avoid a dangling pointer / double free later
    return -1;
  }
  return 0;
}

H
Hongze Cheng 已提交
630
/**
 * Release the value buffer and column index array owned by a KV row builder
 * (both via tfree).
 */
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) {
  tfree(pBuilder->buf);
  tfree(pBuilder->pColIdx);
}

H
Hongze Cheng 已提交
635
/**
 * Discard every column added to a KV row builder. The allocated buffers are
 * not freed and are reused by subsequent additions.
 */
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
  pBuilder->size = 0;
  pBuilder->nCols = 0;
}

H
Hongze Cheng 已提交
640
/**
 * Build a standalone KV row from a builder.
 *
 * Layout: TD_KV_ROW_HEAD_SIZE header, then nCols SColIdx entries, then the
 * builder's `size` bytes of packed values.
 *
 * @return new row owned by the caller, or NULL if the builder holds nothing
 *         or allocation fails
 */
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
  int idxBytes = sizeof(SColIdx) * pBuilder->nCols;
  int bodyBytes = idxBytes + pBuilder->size;
  if (bodyBytes == 0) return NULL;

  int    tlen = TD_KV_ROW_HEAD_SIZE + bodyBytes;
  SKVRow row = malloc(tlen);
  if (row == NULL) return NULL;

  kvRowSetNCols(row, pBuilder->nCols);
  kvRowSetLen(row, tlen);

  memcpy(kvRowColIdx(row), pBuilder->pColIdx, idxBytes);
  memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);

  return row;
}