tdataformat.c 20.3 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
slguan 已提交
15
#include "tdataformat.h"
T
Tao Liu 已提交
16
#include "talgo.h"
H
Hongze Cheng 已提交
17
#include "wchar.h"
H
more  
hzcheng 已提交
18

H
hzcheng 已提交
19 20 21 22
/**
 * Duplicate the schema and return a new object
 */
STSchema *tdDupSchema(STSchema *pSchema) {
H
Hongze Cheng 已提交
23 24 25

  int tlen = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
  STSchema *tSchema = (STSchema *)malloc(tlen);
H
hzcheng 已提交
26 27
  if (tSchema == NULL) return NULL;

H
Hongze Cheng 已提交
28
  memcpy((void *)tSchema, (void *)pSchema, tlen);
H
hzcheng 已提交
29 30 31 32

  return tSchema;
}

H
TD-27  
hzcheng 已提交
33 34 35 36
/**
 * Encode a schema to dst, and return the next pointer
 */
void *tdEncodeSchema(void *dst, STSchema *pSchema) {
H
TD-166  
hzcheng 已提交
37

H
Hongze Cheng 已提交
38 39
  T_APPEND_MEMBER(dst, pSchema, STSchema, version);
  T_APPEND_MEMBER(dst, pSchema, STSchema, numOfCols);
H
TD-27  
hzcheng 已提交
40 41 42 43 44 45 46 47 48 49 50 51 52 53
  for (int i = 0; i < schemaNCols(pSchema); i++) {
    STColumn *pCol = schemaColAt(pSchema, i);
    T_APPEND_MEMBER(dst, pCol, STColumn, type);
    T_APPEND_MEMBER(dst, pCol, STColumn, colId);
    T_APPEND_MEMBER(dst, pCol, STColumn, bytes);
  }

  return dst;
}

/**
 * Decode a schema from a binary.
 */
STSchema *tdDecodeSchema(void **psrc) {
H
TD-166  
hzcheng 已提交
54
  int totalCols = 0;
H
Hongze Cheng 已提交
55 56
  int version = 0;
  STSchemaBuilder schemaBuilder = {0};
H
TD-27  
hzcheng 已提交
57

H
Hongze Cheng 已提交
58
  T_READ_MEMBER(*psrc, int, version);
H
TD-166  
hzcheng 已提交
59
  T_READ_MEMBER(*psrc, int, totalCols);
H
TD-27  
hzcheng 已提交
60

H
Hongze Cheng 已提交
61 62
  if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;

H
TD-166  
hzcheng 已提交
63
  for (int i = 0; i < totalCols; i++) {
H
TD-27  
hzcheng 已提交
64 65 66 67 68 69 70
    int8_t  type = 0;
    int16_t colId = 0;
    int32_t bytes = 0;
    T_READ_MEMBER(*psrc, int8_t, type);
    T_READ_MEMBER(*psrc, int16_t, colId);
    T_READ_MEMBER(*psrc, int32_t, bytes);

H
Hongze Cheng 已提交
71 72 73 74
    if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
      tdDestroyTSchemaBuilder(&schemaBuilder);
      return NULL;
    }
H
TD-27  
hzcheng 已提交
75 76
  }

H
Hongze Cheng 已提交
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
  STSchema *pSchema = tdGetSchemaFromBuilder(&schemaBuilder);
  tdDestroyTSchemaBuilder(&schemaBuilder);
  return pSchema;
}

int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  if (pBuilder == NULL) return -1;

  pBuilder->tCols = 256;
  pBuilder->columns = (STColumn *)malloc(sizeof(STColumn) * pBuilder->tCols);
  if (pBuilder->columns == NULL) return -1;

  tdResetTSchemaBuilder(pBuilder, version);
  return 0;
}

void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder) {
    tfree(pBuilder->columns);
  }
}

void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  pBuilder->nCols = 0;
  pBuilder->tlen = 0;
  pBuilder->flen = 0;
T
Tao Liu 已提交
103
  pBuilder->vlen = 0;
H
Hongze Cheng 已提交
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
  pBuilder->version = version;
}

int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int32_t bytes) {
  if (!isValidDataType(type, 0)) return -1;

  if (pBuilder->nCols >= pBuilder->tCols) {
    pBuilder->tCols *= 2;
    pBuilder->columns = (STColumn *)realloc(pBuilder->columns, sizeof(STColumn) * pBuilder->tCols);
    if (pBuilder->columns == NULL) return -1;
  }

  STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
  colSetType(pCol, type);
  colSetColId(pCol, colId);
  if (pBuilder->nCols == 0) {
    colSetOffset(pCol, 0);
  } else {
    STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols-1]);
    colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
  }

  if (IS_VAR_DATA_TYPE(type)) {
    colSetBytes(pCol, bytes);
T
Tao Liu 已提交
128 129
    pBuilder->tlen += (TYPE_BYTES[type] + bytes);
    pBuilder->vlen += bytes - sizeof(VarDataLenT);
H
Hongze Cheng 已提交
130 131 132
  } else {
    colSetBytes(pCol, TYPE_BYTES[type]);
    pBuilder->tlen += TYPE_BYTES[type];
T
Tao Liu 已提交
133
    pBuilder->vlen += TYPE_BYTES[type];
H
Hongze Cheng 已提交
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
  }

  pBuilder->nCols++;
  pBuilder->flen += TYPE_BYTES[type];

  ASSERT(pCol->offset < pBuilder->flen);

  return 0;
}

STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder->nCols <= 0) return NULL;

  int tlen = sizeof(STSchema) + sizeof(STColumn) * pBuilder->nCols;

  STSchema *pSchema = (STSchema *)malloc(tlen);
  if (pSchema == NULL) return NULL;

  schemaVersion(pSchema) = pBuilder->version;
  schemaNCols(pSchema) = pBuilder->nCols;
  schemaTLen(pSchema) = pBuilder->tlen;
  schemaFLen(pSchema) = pBuilder->flen;
T
Tao Liu 已提交
156
  schemaVLen(pSchema) = pBuilder->vlen;
H
Hongze Cheng 已提交
157 158 159

  memcpy(schemaColAt(pSchema, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols);

H
TD-27  
hzcheng 已提交
160 161 162
  return pSchema;
}

H
hzcheng 已提交
163 164 165
/**
 * Initialize a data row
 */
H
TD-90  
Hongze Cheng 已提交
166 167 168 169
void tdInitDataRow(SDataRow row, STSchema *pSchema) {
  dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema));
  dataRowSetVersion(row, schemaVersion(pSchema));
}
H
hzcheng 已提交
170

H
TD-166  
hzcheng 已提交
171
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
H
TD-166  
hzcheng 已提交
172
  int32_t size = dataRowMaxBytesFromSchema(pSchema);
H
hzcheng 已提交
173 174 175 176

  SDataRow row = malloc(size);
  if (row == NULL) return NULL;

H
hzcheng 已提交
177
  tdInitDataRow(row, pSchema);
H
hzcheng 已提交
178
  return row;
H
TD-166  
hzcheng 已提交
179
}
H
hzcheng 已提交
180

H
hzcheng 已提交
181 182 183 184 185 186 187
/**
 * Free the SDataRow object
 */
void tdFreeDataRow(SDataRow row) {
  if (row) free(row);
}

H
hzcheng 已提交
188
SDataRow tdDataRowDup(SDataRow row) {
H
hzcheng 已提交
189
  SDataRow trow = malloc(dataRowLen(row));
H
hzcheng 已提交
190 191 192
  if (trow == NULL) return NULL;

  dataRowCpy(trow, row);
H
hzcheng 已提交
193
  return trow;
H
hzcheng 已提交
194
}
H
hzcheng 已提交
195

H
TD-166  
hzcheng 已提交
196 197 198 199
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) {
  pDataCol->type = colType(pCol);
  pDataCol->colId = colColId(pCol);
  pDataCol->bytes = colBytes(pCol);
H
TD-166  
hzcheng 已提交
200
  pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE;
H
TD-166  
hzcheng 已提交
201 202 203 204

  pDataCol->len = 0;
  if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
    pDataCol->dataOff = (VarDataOffsetT *)(*pBuf);
H
Hongze Cheng 已提交
205 206 207
    pDataCol->pData = POINTER_SHIFT(*pBuf, sizeof(VarDataOffsetT) * maxPoints);
    pDataCol->spaceSize = pDataCol->bytes * maxPoints;
    *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize + sizeof(VarDataOffsetT) * maxPoints);
H
TD-166  
hzcheng 已提交
208 209 210 211
  } else {
    pDataCol->spaceSize = pDataCol->bytes * maxPoints;
    pDataCol->dataOff = NULL;
    pDataCol->pData = *pBuf;
H
hzcheng 已提交
212
    *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize);
H
TD-166  
hzcheng 已提交
213 214 215
  }
}

H
Haojun Liao 已提交
216
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) {
H
TD-166  
hzcheng 已提交
217 218 219 220 221 222
  ASSERT(pCol != NULL && value != NULL);

  switch (pCol->type) {
    case TSDB_DATA_TYPE_BINARY:
    case TSDB_DATA_TYPE_NCHAR:
      // set offset
H
Haojun Liao 已提交
223
      pCol->dataOff[numOfRows] = pCol->len;
H
TD-166  
hzcheng 已提交
224
      // Copy data
H
hzcheng 已提交
225
      memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
H
TD-166  
hzcheng 已提交
226
      // Update the length
H
TD-166  
hzcheng 已提交
227
      pCol->len += varDataTLen(value);
H
TD-166  
hzcheng 已提交
228 229
      break;
    default:
H
Haojun Liao 已提交
230
      ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
H
hzcheng 已提交
231
      memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
H
TD-166  
hzcheng 已提交
232 233 234 235 236
      pCol->len += pCol->bytes;
      break;
  }
}

H
Haojun Liao 已提交
237 238
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows) {
  int pointsLeft = numOfRows - pointsToPop;
H
TD-166  
hzcheng 已提交
239 240 241 242 243

  ASSERT(pointsLeft > 0);

  if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR) {
    ASSERT(pCol->len > 0);
H
TD-166  
hzcheng 已提交
244
    VarDataOffsetT toffset = pCol->dataOff[pointsToPop];
H
TD-166  
hzcheng 已提交
245 246
    pCol->len = pCol->len - toffset;
    ASSERT(pCol->len > 0);
H
hzcheng 已提交
247
    memmove(pCol->pData, POINTER_SHIFT(pCol->pData, toffset), pCol->len);
H
TD-166  
hzcheng 已提交
248 249
    dataColSetOffset(pCol, pointsLeft);
  } else {
H
Haojun Liao 已提交
250
    ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
H
TD-166  
hzcheng 已提交
251
    pCol->len = TYPE_BYTES[pCol->type] * pointsLeft;
H
hzcheng 已提交
252
    memmove(pCol->pData, POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * pointsToPop), pCol->len);
H
TD-166  
hzcheng 已提交
253 254 255
  }
}

H
TD-166  
hzcheng 已提交
256 257 258 259 260
bool isNEleNull(SDataCol *pCol, int nEle) {
  switch (pCol->type) {
    case TSDB_DATA_TYPE_BINARY:
    case TSDB_DATA_TYPE_NCHAR:
      for (int i = 0; i < nEle; i++) {
H
TD-166  
hzcheng 已提交
261
        if (!isNull(varDataVal(tdGetColDataOfRow(pCol, i)), pCol->type)) return false;
H
TD-166  
hzcheng 已提交
262 263 264 265 266 267 268 269 270 271
      }
      return true;
    default:
      for (int i = 0; i < nEle; i++) {
        if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false;
      }
      return true;
  }
}

H
TD-90  
Hongze Cheng 已提交
272 273 274 275
void dataColSetNullAt(SDataCol *pCol, int index) {
  if (IS_VAR_DATA_TYPE(pCol->type)) {
    pCol->dataOff[index] = pCol->len;
    char *ptr = POINTER_SHIFT(pCol->pData, pCol->len);
276
    setVardataNull(ptr, pCol->type);
H
TD-90  
Hongze Cheng 已提交
277 278 279 280 281 282 283
    pCol->len += varDataTLen(ptr);
  } else {
    setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes);
    pCol->len += TYPE_BYTES[pCol->type];
  }
}

H
TD-166  
hzcheng 已提交
284
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
H
TD-166  
hzcheng 已提交
285

H
TD-90  
Hongze Cheng 已提交
286 287 288 289 290 291 292 293
  if (IS_VAR_DATA_TYPE(pCol->type)) {
    pCol->len = 0;
    for (int i = 0; i < nEle; i++) {
      dataColSetNullAt(pCol, i);
    }
  } else {
    setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
    pCol->len = TYPE_BYTES[pCol->type] * nEle;
H
TD-166  
hzcheng 已提交
294 295 296
  }
}

H
TD-166  
hzcheng 已提交
297 298 299
void dataColSetOffset(SDataCol *pCol, int nEle) {
  ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));

H
Hongze Cheng 已提交
300
  void *tptr = pCol->pData;
H
TD-166  
hzcheng 已提交
301
  // char *tptr = (char *)(pCol->pData);
H
TD-166  
hzcheng 已提交
302

H
TD-166  
hzcheng 已提交
303
  VarDataOffsetT offset = 0;
H
TD-166  
hzcheng 已提交
304
  for (int i = 0; i < nEle; i++) {
H
TD-166  
hzcheng 已提交
305
    pCol->dataOff[i] = offset;
H
TD-166  
hzcheng 已提交
306
    offset += varDataTLen(tptr);
H
hzcheng 已提交
307
    tptr = POINTER_SHIFT(tptr, varDataTLen(tptr));
H
TD-166  
hzcheng 已提交
308 309 310
  }
}

H
TD-166  
hzcheng 已提交
311
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
H
TD-34  
hzcheng 已提交
312 313 314 315 316 317
  SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols);
  if (pCols == NULL) return NULL;

  pCols->maxRowSize = maxRowSize;
  pCols->maxCols = maxCols;
  pCols->maxPoints = maxRows;
H
TD-166  
hzcheng 已提交
318
  pCols->bufSize = maxRowSize * maxRows;
H
TD-34  
hzcheng 已提交
319

H
TD-166  
hzcheng 已提交
320
  pCols->buf = malloc(pCols->bufSize);
H
TD-34  
hzcheng 已提交
321 322
  if (pCols->buf == NULL) {
    free(pCols);
H
TD-34  
hzcheng 已提交
323 324
    return NULL;
  }
H
TD-34  
hzcheng 已提交
325

H
TD-34  
hzcheng 已提交
326 327 328 329 330 331 332 333
  return pCols;
}

void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
  // assert(schemaNCols(pSchema) <= pCols->numOfCols);
  tdResetDataCols(pCols);
  pCols->numOfCols = schemaNCols(pSchema);

H
TD-166  
hzcheng 已提交
334
  void *ptr = pCols->buf;
H
TD-34  
hzcheng 已提交
335
  for (int i = 0; i < schemaNCols(pSchema); i++) {
H
TD-166  
hzcheng 已提交
336
    dataColInit(pCols->cols + i, schemaColAt(pSchema, i), &ptr, pCols->maxPoints);
H
TD-166  
hzcheng 已提交
337
    ASSERT((char *)ptr - (char *)(pCols->buf) <= pCols->bufSize);
H
TD-34  
hzcheng 已提交
338
  }
H
TD-34  
hzcheng 已提交
339 340 341 342
}

void tdFreeDataCols(SDataCols *pCols) {
  if (pCols) {
H
TD-166  
hzcheng 已提交
343
    tfree(pCols->buf);
H
TD-34  
hzcheng 已提交
344 345 346 347
    free(pCols);
  }
}

H
TD-100  
hzcheng 已提交
348
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
H
TD-166  
hzcheng 已提交
349
  SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints);
H
TD-100  
hzcheng 已提交
350 351 352 353
  if (pRet == NULL) return NULL;

  pRet->numOfCols = pDataCols->numOfCols;
  pRet->sversion = pDataCols->sversion;
H
Haojun Liao 已提交
354
  if (keepData) pRet->numOfRows = pDataCols->numOfRows;
H
TD-100  
hzcheng 已提交
355 356 357 358 359 360

  for (int i = 0; i < pDataCols->numOfCols; i++) {
    pRet->cols[i].type = pDataCols->cols[i].type;
    pRet->cols[i].colId = pDataCols->cols[i].colId;
    pRet->cols[i].bytes = pDataCols->cols[i].bytes;
    pRet->cols[i].offset = pDataCols->cols[i].offset;
H
TD-166  
hzcheng 已提交
361 362

    pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize;
H
TD-100  
hzcheng 已提交
363
    pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf)));
H
TD-100  
hzcheng 已提交
364

H
TD-166  
hzcheng 已提交
365 366 367 368 369 370 371 372 373 374
    if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) {
      ASSERT(pDataCols->cols[i].dataOff != NULL);
      pRet->cols[i].dataOff =
          (int32_t *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].dataOff) - (char *)(pDataCols->buf)));
    }

    if (keepData) {
      pRet->cols[i].len = pDataCols->cols[i].len;
      memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
      if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) {
H
TD-166  
hzcheng 已提交
375
        memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints);
H
TD-166  
hzcheng 已提交
376 377
      }
    }
H
TD-100  
hzcheng 已提交
378 379 380 381 382
  }

  return pRet;
}

H
TD-34  
hzcheng 已提交
383
void tdResetDataCols(SDataCols *pCols) {
H
Haojun Liao 已提交
384
  pCols->numOfRows = 0;
H
TD-34  
hzcheng 已提交
385
  for (int i = 0; i < pCols->maxCols; i++) {
H
TD-166  
hzcheng 已提交
386
    dataColReset(pCols->cols + i);
H
TD-34  
hzcheng 已提交
387 388 389
  }
}

H
TD-90  
Hongze Cheng 已提交
390
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) {
H
TD-166  
hzcheng 已提交
391 392
  ASSERT(dataColsKeyLast(pCols) < dataRowKey(row));

H
TD-90  
Hongze Cheng 已提交
393 394 395 396 397 398 399 400 401 402
  int rcol = 0;
  int dcol = 0;

  while (dcol < pCols->numOfCols) {
    SDataCol *pDataCol = &(pCols->cols[dcol]);
    if (rcol >= schemaNCols(pSchema)) {
      dataColSetNullAt(pDataCol, pCols->numOfRows);
      dcol++;
      continue;
    }
H
TD-166  
hzcheng 已提交
403

H
TD-90  
Hongze Cheng 已提交
404 405
    STColumn *pRowCol = schemaColAt(pSchema, rcol);
    if (pRowCol->colId == pDataCol->colId) {
H
TD-90  
Hongze Cheng 已提交
406 407
      void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset+TD_DATA_ROW_HEAD_SIZE);
      dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
H
TD-90  
Hongze Cheng 已提交
408 409 410 411 412 413 414 415
      dcol++;
      rcol++;
    } else if (pRowCol->colId < pDataCol->colId) {
      rcol++;
    } else {
      dataColSetNullAt(pDataCol, pCols->numOfRows);
      dcol++;
    }
H
TD-34  
hzcheng 已提交
416
  }
H
Haojun Liao 已提交
417
  pCols->numOfRows++;
H
TD-34  
hzcheng 已提交
418
}
H
TD-166  
hzcheng 已提交
419

H
TD-34  
hzcheng 已提交
420 421
// Pop pointsToPop points from the SDataCols
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
H
Haojun Liao 已提交
422
  int pointsLeft = pCols->numOfRows - pointsToPop;
H
TD-166  
hzcheng 已提交
423
  if (pointsLeft <= 0) {
H
TD-166  
hzcheng 已提交
424 425 426
    tdResetDataCols(pCols);
    return;
  }
H
TD-34  
hzcheng 已提交
427 428

  for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
H
TD-166  
hzcheng 已提交
429
    SDataCol *pCol = pCols->cols + iCol;
H
Haojun Liao 已提交
430
    dataColPopPoints(pCol, pointsToPop, pCols->numOfRows);
H
TD-34  
hzcheng 已提交
431
  }
H
Haojun Liao 已提交
432
  pCols->numOfRows = pointsLeft;
H
TD-34  
hzcheng 已提交
433
}
H
TD-34  
hzcheng 已提交
434

H
hzcheng 已提交
435
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
H
Haojun Liao 已提交
436 437
  ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
  ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
H
TD-166  
hzcheng 已提交
438
  ASSERT(target->numOfCols == source->numOfCols);
H
TD-100  
hzcheng 已提交
439

H
TD-166  
hzcheng 已提交
440
  SDataCols *pTarget = NULL;
H
TD-100  
hzcheng 已提交
441

H
TD-166  
hzcheng 已提交
442 443 444
  if (dataColsKeyLast(target) < dataColsKeyFirst(source)) {  // No overlap
    for (int i = 0; i < rowsToMerge; i++) {
      for (int j = 0; j < source->numOfCols; j++) {
H
Haojun Liao 已提交
445
        dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfRows,
H
TD-166  
hzcheng 已提交
446 447
                         target->maxPoints);
      }
H
Haojun Liao 已提交
448
      target->numOfRows++;
H
TD-166  
hzcheng 已提交
449 450 451 452 453 454 455
    }
  } else {
    pTarget = tdDupDataCols(target, true);
    if (pTarget == NULL) goto _err;

    int iter1 = 0;
    int iter2 = 0;
H
TD-521  
Hongze Cheng 已提交
456 457
    tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, &iter2, source->numOfRows,
                       pTarget->numOfRows + rowsToMerge);
H
TD-166  
hzcheng 已提交
458
  }
H
TD-100  
hzcheng 已提交
459 460 461 462 463 464 465 466

  tdFreeDataCols(pTarget);
  return 0;

_err:
  tdFreeDataCols(pTarget);
  return -1;
}
H
TD-100  
hzcheng 已提交
467

H
TD-521  
Hongze Cheng 已提交
468
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows) {
H
TD-100  
hzcheng 已提交
469
  tdResetDataCols(target);
H
TD-521  
Hongze Cheng 已提交
470
  ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);
H
TD-100  
hzcheng 已提交
471

H
Haojun Liao 已提交
472
  while (target->numOfRows < tRows) {
H
TD-521  
Hongze Cheng 已提交
473
    if (*iter1 >= limit1 && *iter2 >= limit2) break;
H
TD-100  
hzcheng 已提交
474

H
TD-521  
Hongze Cheng 已提交
475 476
    TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
    TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];
H
TD-100  
hzcheng 已提交
477

H
Hongze Cheng 已提交
478
    if (key1 <= key2) {
H
TD-100  
hzcheng 已提交
479 480
      for (int i = 0; i < src1->numOfCols; i++) {
        ASSERT(target->cols[i].type == src1->cols[i].type);
H
Haojun Liao 已提交
481
        dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
H
TD-166  
hzcheng 已提交
482
                         target->maxPoints);
H
TD-100  
hzcheng 已提交
483 484
      }

H
Haojun Liao 已提交
485
      target->numOfRows++;
H
TD-100  
hzcheng 已提交
486
      (*iter1)++;
H
Hongze Cheng 已提交
487 488
      if (key1 == key2) (*iter2)++;
    } else {
H
TD-100  
hzcheng 已提交
489 490
      for (int i = 0; i < src2->numOfCols; i++) {
        ASSERT(target->cols[i].type == src2->cols[i].type);
H
Haojun Liao 已提交
491
        dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
H
TD-166  
hzcheng 已提交
492
                         target->maxPoints);
H
TD-100  
hzcheng 已提交
493
      }
H
TD-100  
hzcheng 已提交
494

H
Haojun Liao 已提交
495
      target->numOfRows++;
H
TD-100  
hzcheng 已提交
496
      (*iter2)++;
H
TD-100  
hzcheng 已提交
497 498
    }
  }
H
Hongze Cheng 已提交
499 500
}

H
Hongze Cheng 已提交
501 502
SKVRow tdKVRowDup(SKVRow row) {
  SKVRow trow = malloc(kvRowLen(row));
H
Hongze Cheng 已提交
503 504
  if (trow == NULL) return NULL;

H
Hongze Cheng 已提交
505
  kvRowCpy(trow, row);
H
Hongze Cheng 已提交
506 507 508
  return trow;
}

H
TD-90  
Hongze Cheng 已提交
509 510 511 512 513 514 515
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
  SColIdx *pColIdx = NULL;
  SKVRow   row = *orow;
  SKVRow   nrow = NULL;
  void *   ptr = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE);

  if (ptr == NULL || ((SColIdx *)ptr)->colId < colId) { // need to add a column value to the row
H
TD-90  
Hongze Cheng 已提交
516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553
    int diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
    nrow = malloc(kvRowLen(row) + sizeof(SColIdx) + diff);
    if (nrow == NULL) return -1;

    kvRowSetLen(nrow, kvRowLen(row) + sizeof(SColIdx) + diff);
    kvRowSetNCols(nrow, kvRowNCols(row) + 1);

    if (ptr == NULL) {
      memcpy(kvRowColIdx(nrow), kvRowColIdx(row), sizeof(SColIdx) * kvRowNCols(row));
      memcpy(kvRowValues(nrow), kvRowValues(row), POINTER_DISTANCE(kvRowEnd(row), kvRowValues(row)));
      int colIdx = kvRowNCols(nrow) - 1;
      kvRowColIdxAt(nrow, colIdx)->colId = colId;
      kvRowColIdxAt(nrow, colIdx)->offset = POINTER_DISTANCE(kvRowEnd(row), kvRowValues(row));
      memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx)), value, diff);
    } else {
      int16_t tlen = POINTER_DISTANCE(ptr, kvRowColIdx(row));
      if (tlen > 0) {
        memcpy(kvRowColIdx(nrow), kvRowColIdx(row), tlen);
        memcpy(kvRowValues(nrow), kvRowValues(row), ((SColIdx *)ptr)->offset);
      }

      int colIdx = tlen / sizeof(SColIdx);
      kvRowColIdxAt(nrow, colIdx)->colId = colId;
      kvRowColIdxAt(nrow, colIdx)->offset = ((SColIdx *)ptr)->offset;
      memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx)), value, diff);

      for (int i = colIdx; i < kvRowNCols(row); i++) {
        kvRowColIdxAt(nrow, i + 1)->colId = kvRowColIdxAt(row, i)->colId;
        kvRowColIdxAt(nrow, i + 1)->offset = kvRowColIdxAt(row, i)->offset + diff;
      }
      memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx + 1)), kvRowColVal(row, kvRowColIdxAt(row, colIdx)),
             POINTER_DISTANCE(kvRowEnd(row), kvRowColVal(row, kvRowColIdxAt(row, colIdx)))

      );
    }

    *orow = nrow;
    free(row);
H
TD-90  
Hongze Cheng 已提交
554 555 556 557 558 559 560 561 562 563 564 565
  } else {
    ASSERT(((SColIdx *)ptr)->colId == colId);
    if (IS_VAR_DATA_TYPE(type)) {
      void *pOldVal = kvRowColVal(row, (SColIdx *)ptr);

      if (varDataTLen(value) == varDataTLen(pOldVal)) { // just update the column value in place
        memcpy(pOldVal, value, varDataTLen(value));
      } else { // need to reallocate the memory
        int16_t diff = varDataTLen(value) - varDataTLen(pOldVal);
        int16_t nlen = kvRowLen(row) + diff;
        ASSERT(nlen > 0);
        nrow = malloc(nlen);
H
TD-90  
Hongze Cheng 已提交
566
        if (nrow == NULL) return -1;
H
TD-90  
Hongze Cheng 已提交
567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590

        kvRowSetLen(nrow, nlen);
        kvRowSetNCols(nrow, kvRowNCols(row));

        // Copy part ahead
        nlen = POINTER_DISTANCE(ptr, kvRowColIdx(row));
        ASSERT(nlen % sizeof(SColIdx) == 0);
        if (nlen > 0) {
          ASSERT(((SColIdx *)ptr)->offset > 0);
          memcpy(kvRowColIdx(nrow), kvRowColIdx(row), nlen);
          memcpy(kvRowValues(nrow), kvRowValues(row), ((SColIdx *)ptr)->offset);
        }

        // Construct current column value
        int colIdx = nlen / sizeof(SColIdx);
        pColIdx = kvRowColIdxAt(nrow, colIdx);
        pColIdx->colId = ((SColIdx *)ptr)->colId;
        pColIdx->offset = ((SColIdx *)ptr)->offset;
        memcpy(kvRowColVal(nrow, pColIdx), value, varDataTLen(value));
 
        // Construct columns after
        if (kvRowNCols(nrow) - colIdx - 1 > 0) {
          for (int i = colIdx + 1; i < kvRowNCols(nrow); i++) {
            kvRowColIdxAt(nrow, i)->colId = kvRowColIdxAt(row, i)->colId;
H
Hongze Cheng 已提交
591
            kvRowColIdxAt(nrow, i)->offset = kvRowColIdxAt(row, i)->offset + diff;
H
TD-90  
Hongze Cheng 已提交
592 593 594 595 596 597
          }
          memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx + 1)), kvRowColVal(row, kvRowColIdxAt(row, colIdx + 1)),
                 POINTER_DISTANCE(kvRowEnd(row), kvRowColVal(row, kvRowColIdxAt(row, colIdx + 1))));
        }

        *orow = nrow;
H
TD-90  
Hongze Cheng 已提交
598
        free(row);
H
TD-90  
Hongze Cheng 已提交
599 600 601 602 603 604 605
      }
    } else {
      memcpy(kvRowColVal(row, (SColIdx *)ptr), value, TYPE_BYTES[type]);
    }
  }

  return 0;
H
Hongze Cheng 已提交
606 607
}

H
Hongze Cheng 已提交
608
void *tdEncodeKVRow(void *buf, SKVRow row) {
H
Hongze Cheng 已提交
609
  // May change the encode purpose
H
Hongze Cheng 已提交
610 611
  kvRowCpy(buf, row);
  return POINTER_SHIFT(buf, kvRowLen(row));
H
Hongze Cheng 已提交
612 613
}

H
Hongze Cheng 已提交
614 615 616
void *tdDecodeKVRow(void *buf, SKVRow *row) {
  *row = tdKVRowDup(buf);
  return POINTER_SHIFT(buf, kvRowLen(*row));
H
Hongze Cheng 已提交
617 618
}

H
Hongze Cheng 已提交
619
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) {
H
Hongze Cheng 已提交
620 621 622 623 624 625 626 627 628 629 630 631 632 633
  pBuilder->tCols = 128;
  pBuilder->nCols = 0;
  pBuilder->pColIdx = (SColIdx *)malloc(sizeof(SColIdx) * pBuilder->tCols);
  if (pBuilder->pColIdx == NULL) return -1;
  pBuilder->alloc = 1024;
  pBuilder->size = 0;
  pBuilder->buf = malloc(pBuilder->alloc);
  if (pBuilder->buf == NULL) {
    free(pBuilder->pColIdx);
    return -1;
  }
  return 0;
}

H
Hongze Cheng 已提交
634
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) {
H
Hongze Cheng 已提交
635 636 637 638
  tfree(pBuilder->pColIdx);
  tfree(pBuilder->buf);
}

H
Hongze Cheng 已提交
639
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
H
Hongze Cheng 已提交
640 641 642 643
  pBuilder->nCols = 0;
  pBuilder->size = 0;
}

H
Hongze Cheng 已提交
644
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
H
Hongze Cheng 已提交
645 646 647
  int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
  if (tlen == 0) return NULL;

H
Hongze Cheng 已提交
648 649 650
  tlen += TD_KV_ROW_HEAD_SIZE;

  SKVRow row = malloc(tlen);
H
Hongze Cheng 已提交
651 652
  if (row == NULL) return NULL;

H
Hongze Cheng 已提交
653
  kvRowSetNCols(row, pBuilder->nCols);
H
Hongze Cheng 已提交
654
  kvRowSetLen(row, tlen);
H
Hongze Cheng 已提交
655

H
Hongze Cheng 已提交
656 657
  memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
  memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
H
Hongze Cheng 已提交
658 659

  return row;
H
TD-100  
hzcheng 已提交
660
}