tdataformat.c 19.2 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
slguan 已提交
15
#include "tdataformat.h"
T
Tao Liu 已提交
16
#include "talgo.h"
H
Hongze Cheng 已提交
17
#include "wchar.h"
H
more  
hzcheng 已提交
18

H
hzcheng 已提交
19 20 21 22
/**
 * Make a heap copy of a schema.
 *
 * The copy spans the STSchema header plus one STColumn for each column
 * reported by schemaNCols(pSchema). The result comes from malloc(); NULL is
 * returned when the allocation fails.
 */
STSchema *tdDupSchema(STSchema *pSchema) {
  int nbytes = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);

  STSchema *pDup = (STSchema *)malloc(nbytes);
  if (pDup != NULL) {
    memcpy((void *)pDup, (void *)pSchema, nbytes);
  }

  return pDup;
}

H
TD-27  
hzcheng 已提交
33 34 35 36
/**
 * Serialize a schema into dst and return the position following the data.
 *
 * Layout: version, numOfCols, then (type, colId, bytes) for each column in
 * schema order. Advancing of dst is done by T_APPEND_MEMBER (defined
 * elsewhere -- presumably it moves dst past the member it writes).
 */
void *tdEncodeSchema(void *dst, STSchema *pSchema) {
  // Schema header fields.
  T_APPEND_MEMBER(dst, pSchema, STSchema, version);
  T_APPEND_MEMBER(dst, pSchema, STSchema, numOfCols);

  // Per-column description.
  int iCol = 0;
  while (iCol < schemaNCols(pSchema)) {
    STColumn *pColumn = schemaColAt(pSchema, iCol);
    T_APPEND_MEMBER(dst, pColumn, STColumn, type);
    T_APPEND_MEMBER(dst, pColumn, STColumn, colId);
    T_APPEND_MEMBER(dst, pColumn, STColumn, bytes);
    iCol++;
  }

  return dst;
}

/**
 * Decode a schema from a binary.
 */
STSchema *tdDecodeSchema(void **psrc) {
H
TD-166  
hzcheng 已提交
54
  int totalCols = 0;
H
Hongze Cheng 已提交
55 56
  int version = 0;
  STSchemaBuilder schemaBuilder = {0};
H
TD-27  
hzcheng 已提交
57

H
Hongze Cheng 已提交
58
  T_READ_MEMBER(*psrc, int, version);
H
TD-166  
hzcheng 已提交
59
  T_READ_MEMBER(*psrc, int, totalCols);
H
TD-27  
hzcheng 已提交
60

H
Hongze Cheng 已提交
61 62
  if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;

H
TD-166  
hzcheng 已提交
63
  for (int i = 0; i < totalCols; i++) {
H
TD-27  
hzcheng 已提交
64 65 66 67 68 69 70
    int8_t  type = 0;
    int16_t colId = 0;
    int32_t bytes = 0;
    T_READ_MEMBER(*psrc, int8_t, type);
    T_READ_MEMBER(*psrc, int16_t, colId);
    T_READ_MEMBER(*psrc, int32_t, bytes);

H
Hongze Cheng 已提交
71 72 73 74
    if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
      tdDestroyTSchemaBuilder(&schemaBuilder);
      return NULL;
    }
H
TD-27  
hzcheng 已提交
75 76
  }

H
Hongze Cheng 已提交
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
  STSchema *pSchema = tdGetSchemaFromBuilder(&schemaBuilder);
  tdDestroyTSchemaBuilder(&schemaBuilder);
  return pSchema;
}

/**
 * Prepare a schema builder for use.
 *
 * Allocates room for 256 columns and resets the builder counters for the
 * given schema version. Returns 0 on success, -1 if pBuilder is NULL or the
 * allocation fails.
 */
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  if (!pBuilder) return -1;

  pBuilder->tCols = 256;

  STColumn *pArr = (STColumn *)malloc(sizeof(STColumn) * pBuilder->tCols);
  pBuilder->columns = pArr;
  if (!pArr) return -1;

  tdResetTSchemaBuilder(pBuilder, version);
  return 0;
}

/**
 * Release the column array held by a schema builder. A NULL builder is ignored.
 */
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder == NULL) return;

  tfree(pBuilder->columns);
}

/**
 * Put a schema builder back into its empty state for the given version.
 * The column array and its capacity are left as they are.
 */
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  pBuilder->version = version;

  // No columns yet, so both accumulated lengths start from zero.
  pBuilder->nCols = 0;
  pBuilder->flen = 0;
  pBuilder->tlen = 0;
}

/**
 * Append one column to the schema under construction.
 *
 * @param pBuilder builder to extend
 * @param type     column data type; rejected if isValidDataType(type, 0) fails
 * @param colId    column id
 * @param bytes    declared size, only used for variable-length types
 * @return 0 on success, -1 on an invalid type or when the column array cannot
 *         be grown. On failure the builder is left unchanged and still usable.
 */
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int32_t bytes) {
  if (!isValidDataType(type, 0)) return -1;

  if (pBuilder->nCols >= pBuilder->tCols) {
    // Grow through a temporary so the existing array is neither leaked nor
    // lost when realloc() fails, and only bump the capacity once it succeeded.
    int       newCap = pBuilder->tCols * 2;
    STColumn *pGrown = (STColumn *)realloc(pBuilder->columns, sizeof(STColumn) * newCap);
    if (pGrown == NULL) return -1;

    pBuilder->columns = pGrown;
    pBuilder->tCols = newCap;
  }

  STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
  colSetType(pCol, type);
  colSetColId(pCol, colId);

  // A column starts right after the previous column's fixed-size slot.
  if (pBuilder->nCols == 0) {
    colSetOffset(pCol, 0);
  } else {
    STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols - 1]);
    colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
  }

  if (IS_VAR_DATA_TYPE(type)) {
    // Variable-length column: slot + length prefix + declared payload size.
    colSetBytes(pCol, bytes);
    pBuilder->tlen += (TYPE_BYTES[type] + sizeof(VarDataLenT) + bytes);
  } else {
    colSetBytes(pCol, TYPE_BYTES[type]);
    pBuilder->tlen += TYPE_BYTES[type];
  }

  pBuilder->nCols++;
  pBuilder->flen += TYPE_BYTES[type];

  ASSERT(pCol->offset < pBuilder->flen);

  return 0;
}

/**
 * Materialize the schema described by a builder.
 *
 * Returns a malloc()'ed STSchema carrying the builder's version, column
 * count, tlen, flen and a copy of its columns. Returns NULL when the builder
 * holds no columns or the allocation fails.
 */
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder->nCols <= 0) return NULL;

  int       nbytes = sizeof(STSchema) + sizeof(STColumn) * pBuilder->nCols;
  STSchema *pResult = (STSchema *)malloc(nbytes);
  if (pResult == NULL) return NULL;

  // Header fields.
  schemaVersion(pResult) = pBuilder->version;
  schemaNCols(pResult) = pBuilder->nCols;
  schemaTLen(pResult) = pBuilder->tlen;
  schemaFLen(pResult) = pBuilder->flen;

  // Column descriptions.
  memcpy(schemaColAt(pResult, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols);

  return pResult;
}

H
hzcheng 已提交
159 160 161
/**
 * Initialize a data row
 */
H
TD-166  
hzcheng 已提交
162
void tdInitDataRow(SDataRow row, STSchema *pSchema) { dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema)); }
H
hzcheng 已提交
163

H
TD-166  
hzcheng 已提交
164
/**
 * Allocate a data row large enough for dataRowMaxBytesFromSchema(pSchema)
 * and initialize it for that schema. Returns NULL if the allocation fails.
 */
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
  int32_t  maxBytes = dataRowMaxBytesFromSchema(pSchema);
  SDataRow pNew = malloc(maxBytes);

  if (pNew != NULL) {
    tdInitDataRow(pNew, pSchema);
  }

  return pNew;
}
H
hzcheng 已提交
173

H
hzcheng 已提交
174 175 176 177 178 179 180
/**
 * Free an SDataRow object. Passing NULL is allowed and does nothing.
 */
void tdFreeDataRow(SDataRow row) {
  // free(NULL) is a no-op, so no explicit guard is required.
  free(row);
}

H
hzcheng 已提交
181
/**
 * Duplicate a data row into a fresh malloc()'ed buffer of dataRowLen(row)
 * bytes. Returns NULL if the allocation fails.
 */
SDataRow tdDataRowDup(SDataRow row) {
  SDataRow pCopy = malloc(dataRowLen(row));

  if (pCopy != NULL) {
    dataRowCpy(pCopy, row);
  }

  return pCopy;
}
H
hzcheng 已提交
188

H
TD-166  
hzcheng 已提交
189 190 191 192
/**
 * Bind a data column to a schema column and carve its storage out of *pBuf.
 *
 * type/colId/bytes are taken from pCol; the offset is the schema offset plus
 * the data row head size. For BINARY/NCHAR columns the region starts with an
 * offset array of TYPE_BYTES[type] * maxPoints bytes, followed by the value
 * area. For all other types only a value area of bytes * maxPoints is used
 * and dataOff is NULL. On return *pBuf points just past the claimed region.
 */
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) {
  // Column description taken from the schema.
  pDataCol->type = colType(pCol);
  pDataCol->colId = colColId(pCol);
  pDataCol->bytes = colBytes(pCol);
  pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE;

  pDataCol->len = 0;

  if (pDataCol->type != TSDB_DATA_TYPE_BINARY && pDataCol->type != TSDB_DATA_TYPE_NCHAR) {
    // Fixed-size values: a plain array of maxPoints entries.
    pDataCol->spaceSize = pDataCol->bytes * maxPoints;
    pDataCol->dataOff = NULL;
    pDataCol->pData = *pBuf;
    *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize);
    return;
  }

  // Variable-length values: offset array first, then length-prefixed payloads.
  pDataCol->spaceSize = (sizeof(VarDataLenT) + pDataCol->bytes) * maxPoints;
  pDataCol->dataOff = (VarDataOffsetT *)(*pBuf);
  pDataCol->pData = POINTER_SHIFT(*pBuf, TYPE_BYTES[pDataCol->type] * maxPoints);
  *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize + TYPE_BYTES[pDataCol->type] * maxPoints);
}

H
Haojun Liao 已提交
209
/**
 * Append one value at row index numOfRows to a data column.
 *
 * BINARY/NCHAR values are copied with their full varDataTLen() size and their
 * start offset is recorded in dataOff[numOfRows]. Other types copy pCol->bytes
 * bytes to the current end of the value area. maxPoints is not used here.
 */
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) {
  ASSERT(pCol != NULL && value != NULL);

  if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR) {
    // Remember where this value begins, then copy it and grow the used length.
    pCol->dataOff[numOfRows] = pCol->len;
    memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
    pCol->len += varDataTLen(value);
  } else {
    // Fixed-size columns must already hold exactly numOfRows entries.
    ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
    memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
    pCol->len += pCol->bytes;
  }
}

H
Haojun Liao 已提交
230 231
/**
 * Drop the first pointsToPop values of a column that currently holds
 * numOfRows values; at least one value must remain.
 *
 * The remaining data is moved to the front of the value area. For
 * BINARY/NCHAR columns the offset array is rebuilt afterwards.
 */
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows) {
  int nRemain = numOfRows - pointsToPop;

  ASSERT(nRemain > 0);

  if (pCol->type != TSDB_DATA_TYPE_BINARY && pCol->type != TSDB_DATA_TYPE_NCHAR) {
    ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
    pCol->len = TYPE_BYTES[pCol->type] * nRemain;
    memmove(pCol->pData, POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * pointsToPop), pCol->len);
    return;
  }

  ASSERT(pCol->len > 0);

  // Everything before the first surviving value is discarded.
  VarDataOffsetT firstKept = pCol->dataOff[pointsToPop];
  pCol->len = pCol->len - firstKept;
  ASSERT(pCol->len > 0);

  memmove(pCol->pData, POINTER_SHIFT(pCol->pData, firstKept), pCol->len);
  dataColSetOffset(pCol, nRemain);
}

H
TD-166  
hzcheng 已提交
249 250 251 252 253
/**
 * Return true when the first nEle values of the column are all null
 * according to isNull(). For BINARY/NCHAR columns the check is applied to
 * the payload (varDataVal) of each value.
 */
bool isNEleNull(SDataCol *pCol, int nEle) {
  bool isVarLen = (pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR);

  for (int iRow = 0; iRow < nEle; iRow++) {
    if (isVarLen) {
      if (!isNull(varDataVal(tdGetColDataOfRow(pCol, iRow)), pCol->type)) return false;
    } else {
      if (!isNull(tdGetColDataOfRow(pCol, iRow), pCol->type)) return false;
    }
  }

  return true;
}

/**
 * Fill the first nEle entries of a column with null values.
 *
 * BINARY/NCHAR columns are rewritten from the start: each entry gets its
 * offset, a length of sizeof(char) (BINARY) or TSDB_NCHAR_SIZE (NCHAR), and a
 * null payload written by setNull(). Other types delegate to setNullN().
 * pCol->len is updated to the resulting size. maxPoints is not used here.
 */
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
  if (pCol->type != TSDB_DATA_TYPE_BINARY && pCol->type != TSDB_DATA_TYPE_NCHAR) {
    setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
    pCol->len = TYPE_BYTES[pCol->type] * nEle;
    return;
  }

  pCol->len = 0;
  for (int iRow = 0; iRow < nEle; iRow++) {
    char *pEntry = (char *)pCol->pData + pCol->len;

    pCol->dataOff[iRow] = pCol->len;
    varDataLen(pEntry) = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE;
    setNull(pEntry + sizeof(VarDataLenT), pCol->type, pCol->bytes);
    pCol->len += varDataTLen(pEntry);
  }
}

H
TD-166  
hzcheng 已提交
287 288 289
/**
 * Recompute the offset array of a BINARY/NCHAR column for its first nEle
 * values by walking the length-prefixed values stored back to back in pData.
 */
void dataColSetOffset(SDataCol *pCol, int nEle) {
  ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));

  VarDataOffsetT runOff = 0;
  void *         pCur = pCol->pData;

  int iRow = 0;
  while (iRow < nEle) {
    pCol->dataOff[iRow] = runOff;
    runOff += varDataTLen(pCur);
    pCur = POINTER_SHIFT(pCur, varDataTLen(pCur));
    iRow++;
  }
}

H
TD-166  
hzcheng 已提交
301
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
H
TD-34  
hzcheng 已提交
302 303 304 305 306 307
  SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols);
  if (pCols == NULL) return NULL;

  pCols->maxRowSize = maxRowSize;
  pCols->maxCols = maxCols;
  pCols->maxPoints = maxRows;
H
TD-166  
hzcheng 已提交
308
  pCols->bufSize = maxRowSize * maxRows;
H
TD-34  
hzcheng 已提交
309

H
TD-166  
hzcheng 已提交
310
  pCols->buf = malloc(pCols->bufSize);
H
TD-34  
hzcheng 已提交
311 312
  if (pCols->buf == NULL) {
    free(pCols);
H
TD-34  
hzcheng 已提交
313 314
    return NULL;
  }
H
TD-34  
hzcheng 已提交
315

H
TD-34  
hzcheng 已提交
316 317 318 319 320 321 322 323
  return pCols;
}

/**
 * Lay out the columns of pCols according to pSchema.
 *
 * The container is reset first, then each schema column is bound to a
 * consecutive slice of pCols->buf sized for pCols->maxPoints rows.
 */
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
  tdResetDataCols(pCols);
  pCols->numOfCols = schemaNCols(pSchema);

  void *pCursor = pCols->buf;
  int   iCol = 0;
  while (iCol < schemaNCols(pSchema)) {
    dataColInit(pCols->cols + iCol, schemaColAt(pSchema, iCol), &pCursor, pCols->maxPoints);
    // The slices handed out so far must still fit into the buffer.
    ASSERT((char *)pCursor - (char *)(pCols->buf) <= pCols->bufSize);
    iCol++;
  }
}

/**
 * Release an SDataCols container together with its data buffer.
 * A NULL container is ignored.
 */
void tdFreeDataCols(SDataCols *pCols) {
  if (pCols == NULL) return;

  tfree(pCols->buf);
  free(pCols);
}

H
TD-100  
hzcheng 已提交
338
/**
 * Duplicate an SDataCols container.
 *
 * The copy has the same capacity (maxRowSize, maxCols, maxPoints) and the
 * same column layout: every pData/dataOff pointer is placed at the same
 * distance from the copy's buffer as it has from the source buffer.
 *
 * @param pDataCols container to duplicate
 * @param keepData  when true the row count, the per-column lengths, the value
 *                  bytes and (for BINARY/NCHAR) the offset arrays are copied
 *                  too; when false the copy keeps the zeroed values it got
 *                  from tdNewDataCols()
 * @return the new container, or NULL if allocation fails
 */
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
  SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints);
  if (pRet == NULL) return NULL;

  pRet->numOfCols = pDataCols->numOfCols;
  pRet->sversion = pDataCols->sversion;
  if (keepData) pRet->numOfRows = pDataCols->numOfRows;

  for (int i = 0; i < pDataCols->numOfCols; i++) {
    pRet->cols[i].type = pDataCols->cols[i].type;
    pRet->cols[i].colId = pDataCols->cols[i].colId;
    pRet->cols[i].bytes = pDataCols->cols[i].bytes;
    pRet->cols[i].offset = pDataCols->cols[i].offset;

    pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize;
    // Relocate the value area into the new buffer.
    pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf)));

    if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) {
      ASSERT(pDataCols->cols[i].dataOff != NULL);
      // Relocate the offset array; the element type matches the one used in
      // dataColInit() and in the memcpy below.
      pRet->cols[i].dataOff =
          (VarDataOffsetT *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].dataOff) - (char *)(pDataCols->buf)));
    }

    if (keepData) {
      pRet->cols[i].len = pDataCols->cols[i].len;
      memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
      if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) {
        memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints);
      }
    }
  }

  return pRet;
}

H
TD-34  
hzcheng 已提交
373
/**
 * Empty an SDataCols container: the row count goes back to zero and
 * dataColReset() is applied to every one of its maxCols column slots.
 */
void tdResetDataCols(SDataCols *pCols) {
  pCols->numOfRows = 0;

  int iCol = 0;
  while (iCol < pCols->maxCols) {
    dataColReset(pCols->cols + iCol);
    iCol++;
  }
}

H
TD-34  
hzcheng 已提交
380
/**
 * Append one data row to the column store.
 *
 * The row key must be greater than the last key already stored. Each
 * column's value is extracted from the row and appended to the matching
 * data column, then the row count is incremented.
 */
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) {
  ASSERT(dataColsKeyLast(pCols) < dataRowKey(row));

  int iCol = 0;
  while (iCol < pCols->numOfCols) {
    SDataCol *pDataCol = pCols->cols + iCol;
    void *    pVal = tdGetRowDataOfCol(row, pDataCol->type, pDataCol->offset);

    dataColAppendVal(pDataCol, pVal, pCols->numOfRows, pCols->maxPoints);
    iCol++;
  }

  pCols->numOfRows++;
}
H
TD-166  
hzcheng 已提交
391

H
TD-34  
hzcheng 已提交
392 393
/**
 * Remove the first pointsToPop rows from the column store. If that leaves no
 * rows, the whole store is reset instead.
 */
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
  int nRemain = pCols->numOfRows - pointsToPop;

  if (nRemain > 0) {
    for (int i = 0; i < pCols->numOfCols; i++) {
      dataColPopPoints(pCols->cols + i, pointsToPop, pCols->numOfRows);
    }
    pCols->numOfRows = nRemain;
  } else {
    tdResetDataCols(pCols);
  }
}
H
TD-34  
hzcheng 已提交
406

H
hzcheng 已提交
407
/**
 * Merge the first rowsToMerge rows of source into target.
 *
 * When target's last key is smaller than source's first key the rows are
 * simply appended. Otherwise target is duplicated and rebuilt by a key-ordered
 * merge of that duplicate with source.
 *
 * @return 0 on success, -1 if the temporary duplicate cannot be allocated
 */
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
  ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
  ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
  ASSERT(target->numOfCols == source->numOfCols);

  if (dataColsKeyLast(target) >= dataColsKeyFirst(source)) {
    // Key ranges overlap: merge a snapshot of target with source back into target.
    SDataCols *pSnapshot = tdDupDataCols(target, true);
    if (pSnapshot == NULL) return -1;

    int iter1 = 0;
    int iter2 = 0;
    tdMergeTwoDataCols(target, pSnapshot, &iter1, source, &iter2, pSnapshot->numOfRows + rowsToMerge);

    tdFreeDataCols(pSnapshot);
    return 0;
  }

  // No overlap: append row by row, column by column.
  for (int iRow = 0; iRow < rowsToMerge; iRow++) {
    for (int iCol = 0; iCol < source->numOfCols; iCol++) {
      dataColAppendVal(target->cols + iCol, tdGetColDataOfRow(source->cols + iCol, iRow), target->numOfRows,
                       target->maxPoints);
    }
    target->numOfRows++;
  }

  return 0;
}
H
TD-100  
hzcheng 已提交
438

H
TD-100  
hzcheng 已提交
439
/**
 * Rebuild target as the key-ordered merge of src1 and src2.
 *
 * target is reset, then rows are taken from whichever source has the smaller
 * current key (read from column 0) until target holds tRows rows or both
 * sources are exhausted. On equal keys the row from src1 is taken and the
 * src2 row is skipped. *iter1 / *iter2 are the read positions and are
 * advanced in place.
 */
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows) {
  // TODO: add resolve duplicate key here
  tdResetDataCols(target);

  while (target->numOfRows < tRows) {
    if (*iter1 >= src1->numOfRows && *iter2 >= src2->numOfRows) break;

    // An exhausted source is represented by the largest possible key.
    TSKEY key1 = (*iter1 >= src1->numOfRows) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
    TSKEY key2 = (*iter2 >= src2->numOfRows) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];

    bool       takeFirst = (key1 <= key2);
    SDataCols *pFrom = takeFirst ? src1 : src2;
    int        fromRow = takeFirst ? *iter1 : *iter2;

    for (int iCol = 0; iCol < pFrom->numOfCols; iCol++) {
      ASSERT(target->cols[iCol].type == pFrom->cols[iCol].type);
      dataColAppendVal(&(target->cols[iCol]), tdGetColDataOfRow(pFrom->cols + iCol, fromRow), target->numOfRows,
                       target->maxPoints);
    }

    target->numOfRows++;

    if (takeFirst) {
      (*iter1)++;
      if (key1 == key2) (*iter2)++;
    } else {
      (*iter2)++;
    }
  }
}

H
Hongze Cheng 已提交
472 473
/**
 * Duplicate a key-value row into a fresh malloc()'ed buffer of kvRowLen(row)
 * bytes. Returns NULL if the allocation fails.
 */
SKVRow tdKVRowDup(SKVRow row) {
  SKVRow pCopy = malloc(kvRowLen(row));

  if (pCopy != NULL) {
    kvRowCpy(pCopy, row);
  }

  return pCopy;
}

H
TD-90  
Hongze Cheng 已提交
480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569
/**
 * Overwrite the value of column colId in a key-value row.
 *
 * - Column not present in the row: nothing is changed (adding a column is not
 *   implemented yet) and 0 is returned.
 * - Fixed-size type, or variable-length value of unchanged total length: the
 *   value is updated in place.
 * - Variable-length value of different length: a new row is allocated, the
 *   old row is freed and *orow is redirected to the new row.
 *
 * @param orow  in/out pointer to the row; may be replaced
 * @param colId id of the column to update
 * @param type  data type of the column
 * @param value new value (length-prefixed for variable-length types)
 * @return 0 on success, -1 if the replacement row cannot be allocated (the
 *         original row is then left untouched)
 */
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
  SKVRow row = *orow;
  void * ptr = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE);

  if (ptr == NULL || ((SColIdx *)ptr)->colId < colId) {
    // TODO: the column is not in the row yet; inserting a new column value is
    // not implemented, so the row is left as it is.
    return 0;
  }

  SColIdx *pOldIdx = (SColIdx *)ptr;
  ASSERT(pOldIdx->colId == colId);

  if (!IS_VAR_DATA_TYPE(type)) {
    memcpy(kvRowColVal(row, pOldIdx), value, TYPE_BYTES[type]);
    return 0;
  }

  void *pOldVal = kvRowColVal(row, pOldIdx);
  if (varDataTLen(value) == varDataTLen(pOldVal)) {
    // Same size: just update the column value in place.
    memcpy(pOldVal, value, varDataTLen(value));
    return 0;
  }

  // The size changed: rebuild the row in a new buffer.
  int16_t diff = varDataTLen(value) - varDataTLen(pOldVal);
  int16_t nlen = kvRowLen(row) + diff;
  ASSERT(nlen > 0);

  SKVRow nrow = malloc(nlen);
  if (nrow == NULL) return -1;

  kvRowSetLen(nrow, nlen);
  kvRowSetNCols(nrow, kvRowNCols(row));

  // Copy the index entries and values located before the updated column.
  int headBytes = POINTER_DISTANCE(ptr, kvRowColIdx(row));
  ASSERT(headBytes % sizeof(SColIdx) == 0);
  if (headBytes > 0) {
    ASSERT(pOldIdx->offset > 0);
    memcpy(kvRowColIdx(nrow), kvRowColIdx(row), headBytes);
    memcpy(kvRowValues(nrow), kvRowValues(row), pOldIdx->offset);
  }

  // Write the updated column at the same offset it had before.
  int      colIdx = headBytes / sizeof(SColIdx);
  SColIdx *pColIdx = kvRowColIdxAt(nrow, colIdx);
  pColIdx->colId = pOldIdx->colId;
  pColIdx->offset = pOldIdx->offset;
  memcpy(kvRowColVal(nrow, pColIdx), value, varDataTLen(value));

  // Columns after the updated one keep their values but shift by diff.
  if (kvRowNCols(nrow) - colIdx - 1 > 0) {
    for (int i = colIdx + 1; i < kvRowNCols(nrow); i++) {
      kvRowColIdxAt(nrow, i)->colId = kvRowColIdxAt(row, i)->colId;
      // Derive the offset from the old row; the new row's entry is still
      // uninitialized at this point.
      kvRowColIdxAt(nrow, i)->offset = kvRowColIdxAt(row, i)->offset + diff;
    }
    memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx + 1)), kvRowColVal(row, kvRowColIdxAt(row, colIdx + 1)),
           POINTER_DISTANCE(kvRowEnd(row), kvRowColVal(row, kvRowColIdxAt(row, colIdx + 1))));
  }

  free(row);
  *orow = nrow;

  return 0;
}

H
Hongze Cheng 已提交
572
/**
 * Write a key-value row into buf and return the position right after it
 * (buf advanced by kvRowLen(row)).
 */
void *tdEncodeKVRow(void *buf, SKVRow row) {
  // May change the encode purpose
  kvRowCpy(buf, row);

  void *pNext = POINTER_SHIFT(buf, kvRowLen(row));
  return pNext;
}

H
Hongze Cheng 已提交
578 579 580
/**
 * Read a key-value row from buf.
 *
 * *row receives a heap copy of the encoded row, or NULL if that copy cannot
 * be allocated. The return value is buf advanced past the encoded row in
 * either case, so callers should check *row to detect the failure.
 */
void *tdDecodeKVRow(void *buf, SKVRow *row) {
  *row = tdKVRowDup(buf);

  // Take the length from the encoded data rather than from *row, which is
  // NULL when the duplication failed.
  return POINTER_SHIFT(buf, kvRowLen(buf));
}

H
Hongze Cheng 已提交
583
/**
 * Prepare a key-value row builder: room for 128 column index entries and a
 * 1024-byte value buffer, both empty.
 *
 * @return 0 on success, -1 if an allocation fails. On failure both pColIdx
 *         and buf are NULL, so no dangling or uninitialized pointer is left
 *         behind in the builder.
 */
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) {
  pBuilder->tCols = 128;
  pBuilder->nCols = 0;
  pBuilder->alloc = 1024;
  pBuilder->size = 0;
  pBuilder->buf = NULL;

  pBuilder->pColIdx = (SColIdx *)malloc(sizeof(SColIdx) * pBuilder->tCols);
  if (pBuilder->pColIdx == NULL) return -1;

  pBuilder->buf = malloc(pBuilder->alloc);
  if (pBuilder->buf == NULL) {
    // Clear the pointer after freeing so it cannot be freed a second time.
    free(pBuilder->pColIdx);
    pBuilder->pColIdx = NULL;
    return -1;
  }

  return 0;
}

H
Hongze Cheng 已提交
598
/**
 * Release the value buffer and the column index array of a key-value row
 * builder.
 */
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) {
  tfree(pBuilder->buf);
  tfree(pBuilder->pColIdx);
}

H
Hongze Cheng 已提交
603
/**
 * Empty a key-value row builder (no columns, no value bytes) while keeping
 * its allocated storage.
 */
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
  pBuilder->size = 0;
  pBuilder->nCols = 0;
}

H
Hongze Cheng 已提交
608
/**
 * Build a key-value row from the builder's current content.
 *
 * The row consists of the row head, the column index entries and the value
 * bytes. Returns NULL when the builder is empty or the allocation fails.
 */
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
  int payload = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
  if (payload == 0) return NULL;

  int    total = payload + TD_KV_ROW_HEAD_SIZE;
  SKVRow pNew = malloc(total);
  if (pNew == NULL) return NULL;

  // Head: column count and total length.
  kvRowSetNCols(pNew, pBuilder->nCols);
  kvRowSetLen(pNew, total);

  // Body: index entries followed by the values.
  memcpy(kvRowColIdx(pNew), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
  memcpy(kvRowValues(pNew), pBuilder->buf, pBuilder->size);

  return pNew;
}