tdataformat.c 17.4 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
slguan 已提交
15
#include "tdataformat.h"
T
Tao Liu 已提交
16
#include "talgo.h"
H
Hongze Cheng 已提交
17
#include "wchar.h"
H
more  
hzcheng 已提交
18

H
hzcheng 已提交
19 20 21 22 23 24 25 26 27 28
/**
 * Create a SSchema object with nCols columns
 * ASSUMPTIONS: VALID PARAMETERS
 *
 * @param nCols number of columns the schema has
 *
 * @return a STSchema object for success
 *         NULL for failure
 */
STSchema *tdNewSchema(int32_t nCols) {
H
TD-166  
hzcheng 已提交
29
  int32_t size = sizeof(STSchema) + sizeof(STColumn) * nCols;
H
hzcheng 已提交
30

H
TD-27  
hzcheng 已提交
31
  STSchema *pSchema = (STSchema *)calloc(1, size);
H
hzcheng 已提交
32
  if (pSchema == NULL) return NULL;
H
TD-166  
hzcheng 已提交
33

H
hzcheng 已提交
34
  pSchema->numOfCols = 0;
H
TD-166  
hzcheng 已提交
35 36 37
  pSchema->totalCols = nCols;
  pSchema->flen = 0;
  pSchema->tlen = 0;
H
hzcheng 已提交
38 39 40 41

  return pSchema;
}

H
hzcheng 已提交
42 43 44
/**
 * Append a column to the schema
 */
H
TD-166  
hzcheng 已提交
45 46
int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes) {
  if (!isValidDataType(type, 0) || pSchema->numOfCols >= pSchema->totalCols) return -1;
H
hzcheng 已提交
47 48 49 50

  STColumn *pCol = schemaColAt(pSchema, schemaNCols(pSchema));
  colSetType(pCol, type);
  colSetColId(pCol, colId);
H
TD-166  
hzcheng 已提交
51
  if (schemaNCols(pSchema) == 0) {
H
TD-166  
hzcheng 已提交
52 53
    colSetOffset(pCol, 0);
  } else {
H
Hongze Cheng 已提交
54
    STColumn *pTCol = schemaColAt(pSchema, schemaNCols(pSchema) - 1);
H
TD-166  
hzcheng 已提交
55
    colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
H
TD-166  
hzcheng 已提交
56
  }
H
hzcheng 已提交
57 58 59
  switch (type) {
    case TSDB_DATA_TYPE_BINARY:
    case TSDB_DATA_TYPE_NCHAR:
H
Hongze Cheng 已提交
60
      colSetBytes(pCol, bytes);  // Set as maximum bytes
H
TD-166  
hzcheng 已提交
61
      pSchema->tlen += (TYPE_BYTES[type] + sizeof(VarDataLenT) + bytes);
H
hzcheng 已提交
62 63 64
      break;
    default:
      colSetBytes(pCol, TYPE_BYTES[type]);
H
TD-166  
hzcheng 已提交
65
      pSchema->tlen += TYPE_BYTES[type];
H
hzcheng 已提交
66 67 68 69
      break;
  }

  pSchema->numOfCols++;
H
TD-166  
hzcheng 已提交
70 71 72
  pSchema->flen += TYPE_BYTES[type];

  ASSERT(pCol->offset < pSchema->flen);
H
hzcheng 已提交
73 74 75 76

  return 0;
}

H
hzcheng 已提交
77 78 79 80 81 82 83 84 85 86 87 88 89
/**
 * Duplicate the schema and return a new object.
 *
 * The copy is allocated with the same capacity (totalCols) as the source.
 * The totalCols value copied from the source header therefore describes the
 * new allocation truthfully, and further tdSchemaAddCol() calls on the copy
 * stay inside it. Only the populated columns are copied; the remaining slots
 * are left zeroed by tdNewSchema().
 *
 * @return the new schema, or NULL on allocation failure
 */
STSchema *tdDupSchema(STSchema *pSchema) {
  // Allocate for totalCols, not numOfCols: the memcpy below copies the source's
  // totalCols field, which must not exceed what was actually allocated.
  STSchema *tSchema = tdNewSchema(pSchema->totalCols);
  if (tSchema == NULL) return NULL;

  int32_t size = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
  memcpy((void *)tSchema, (void *)pSchema, size);

  return tSchema;
}

H
TD-27  
hzcheng 已提交
90 91 92 93
/**
 * Return the number of bytes tdEncodeSchema() writes for this schema:
 * the totalCols header followed by (type, colId, bytes) for every column.
 */
int tdGetSchemaEncodeSize(STSchema *pSchema) {
  size_t perColumn = T_MEMBER_SIZE(STColumn, type) + T_MEMBER_SIZE(STColumn, colId) + T_MEMBER_SIZE(STColumn, bytes);
  size_t header = T_MEMBER_SIZE(STSchema, totalCols);

  return header + schemaNCols(pSchema) * perColumn;
}

/**
 * Serialize pSchema into dst and return the pointer just past the written bytes.
 *
 * Layout: totalCols, then for each column its type, colId and bytes, in that
 * order. Only a fully populated schema (numOfCols == totalCols) may be encoded.
 */
void *tdEncodeSchema(void *dst, STSchema *pSchema) {
  ASSERT(pSchema->numOfCols == pSchema->totalCols);

  T_APPEND_MEMBER(dst, pSchema, STSchema, totalCols);

  int nCols = schemaNCols(pSchema);
  for (int idx = 0; idx < nCols; ++idx) {
    STColumn *pColumn = schemaColAt(pSchema, idx);

    T_APPEND_MEMBER(dst, pColumn, STColumn, type);
    T_APPEND_MEMBER(dst, pColumn, STColumn, colId);
    T_APPEND_MEMBER(dst, pColumn, STColumn, bytes);
  }

  return dst;
}

/**
 * Decode a schema from a binary.
 */
STSchema *tdDecodeSchema(void **psrc) {
H
TD-166  
hzcheng 已提交
120
  int totalCols = 0;
H
TD-27  
hzcheng 已提交
121

H
TD-166  
hzcheng 已提交
122
  T_READ_MEMBER(*psrc, int, totalCols);
H
TD-27  
hzcheng 已提交
123

H
TD-166  
hzcheng 已提交
124
  STSchema *pSchema = tdNewSchema(totalCols);
H
TD-27  
hzcheng 已提交
125
  if (pSchema == NULL) return NULL;
H
TD-166  
hzcheng 已提交
126
  for (int i = 0; i < totalCols; i++) {
H
TD-27  
hzcheng 已提交
127 128 129 130 131 132 133
    int8_t  type = 0;
    int16_t colId = 0;
    int32_t bytes = 0;
    T_READ_MEMBER(*psrc, int8_t, type);
    T_READ_MEMBER(*psrc, int16_t, colId);
    T_READ_MEMBER(*psrc, int32_t, bytes);

H
TD-166  
hzcheng 已提交
134
    tdSchemaAddCol(pSchema, type, colId, bytes);
H
TD-27  
hzcheng 已提交
135 136 137 138 139
  }

  return pSchema;
}

H
hzcheng 已提交
140 141 142
/**
 * Initialize a data row
 */
H
TD-166  
hzcheng 已提交
143
void tdInitDataRow(SDataRow row, STSchema *pSchema) { dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema)); }
H
hzcheng 已提交
144

H
TD-166  
hzcheng 已提交
145
/**
 * Allocate a data row of dataRowMaxBytesFromSchema(pSchema) bytes and
 * initialize its length with tdInitDataRow().
 *
 * @return the new row (freed with tdFreeDataRow), or NULL if malloc fails
 */
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
  int32_t  maxBytes = dataRowMaxBytesFromSchema(pSchema);
  SDataRow pRow = malloc(maxBytes);

  if (pRow != NULL) {
    tdInitDataRow(pRow, pSchema);
  }

  return pRow;
}
H
hzcheng 已提交
154

H
hzcheng 已提交
155 156 157 158 159 160 161
/**
 * Free the SDataRow object. Passing NULL is allowed and does nothing.
 */
void tdFreeDataRow(SDataRow row) {
  free(row);  // free(NULL) is a no-op, so no guard is needed
}

H
hzcheng 已提交
162
/**
 * Return a heap copy of row (dataRowLen(row) bytes), or NULL if malloc fails.
 */
SDataRow tdDataRowDup(SDataRow row) {
  SDataRow pCopy = malloc(dataRowLen(row));
  if (pCopy != NULL) dataRowCpy(pCopy, row);
  return pCopy;
}
H
hzcheng 已提交
169

H
TD-166  
hzcheng 已提交
170 171 172 173
/**
 * Initialize pDataCol from schema column pCol and carve its storage out of the
 * buffer at *pBuf. *pBuf is advanced past the carved region.
 *
 * BINARY/NCHAR columns get an offset array (dataOff) of
 * TYPE_BYTES[type] * maxPoints bytes, followed by a data area of
 * (sizeof(VarDataLenT) + bytes) * maxPoints bytes. Other columns get only a
 * data area of bytes * maxPoints bytes, and dataOff is NULL.
 *
 * The stored offset includes TD_DATA_ROW_HEAD_SIZE. This is the form passed to
 * tdGetRowDataOfCol() when appending data rows.
 */
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) {
  pDataCol->type = colType(pCol);
  pDataCol->colId = colColId(pCol);
  pDataCol->bytes = colBytes(pCol);
  pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE;

  pDataCol->len = 0;

  bool isVarType = (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR);
  if (!isVarType) {
    pDataCol->spaceSize = pDataCol->bytes * maxPoints;
    pDataCol->dataOff = NULL;
    pDataCol->pData = *pBuf;
    *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize);
    return;
  }

  // Variable-length layout: [offset array][packed var-data values]
  pDataCol->spaceSize = (sizeof(VarDataLenT) + pDataCol->bytes) * maxPoints;
  pDataCol->dataOff = (VarDataOffsetT *)(*pBuf);
  pDataCol->pData = POINTER_SHIFT(*pBuf, TYPE_BYTES[pDataCol->type] * maxPoints);
  *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize + TYPE_BYTES[pDataCol->type] * maxPoints);
}

H
Haojun Liao 已提交
190
/**
 * Append one value at the end of a column.
 *
 * For BINARY/NCHAR, value points to a var-data item. Its byte offset in pData
 * is stored in dataOff[numOfRows], and varDataTLen(value) bytes are copied.
 * For other types, pCol->bytes bytes are copied.
 *
 * @param numOfRows number of values already in the column
 * @param maxPoints not used by this function
 */
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) {
  ASSERT(pCol != NULL && value != NULL);

  if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR) {
    // Remember where this item starts, then copy the whole var-data item
    pCol->dataOff[numOfRows] = pCol->len;
    memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
    pCol->len += varDataTLen(value);
  } else {
    // Fixed-size values are packed back to back
    ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
    memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
    pCol->len += pCol->bytes;
  }
}

H
Haojun Liao 已提交
211 212
/**
 * Drop the first pointsToPop values from a column holding numOfRows values,
 * moving the remaining values to the front. At least one value must remain.
 */
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows) {
  int pointsLeft = numOfRows - pointsToPop;

  ASSERT(pointsLeft > 0);

  if (pCol->type != TSDB_DATA_TYPE_BINARY && pCol->type != TSDB_DATA_TYPE_NCHAR) {
    ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
    pCol->len = TYPE_BYTES[pCol->type] * pointsLeft;
    memmove(pCol->pData, POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * pointsToPop), pCol->len);
    return;
  }

  ASSERT(pCol->len > 0);
  // Byte offset of the first value that survives the pop
  VarDataOffsetT firstKept = pCol->dataOff[pointsToPop];
  pCol->len -= firstKept;
  ASSERT(pCol->len > 0);
  memmove(pCol->pData, POINTER_SHIFT(pCol->pData, firstKept), pCol->len);
  // Offsets of the remaining values changed; rebuild them from the data
  dataColSetOffset(pCol, pointsLeft);
}

H
TD-166  
hzcheng 已提交
230 231 232 233 234
/**
 * Return true if each of the first nEle values of the column is NULL
 * (vacuously true when nEle <= 0). For BINARY/NCHAR the NULL marker is
 * checked on the payload (varDataVal), not on the length prefix.
 */
bool isNEleNull(SDataCol *pCol, int nEle) {
  bool isVarType = (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR);

  for (int row = 0; row < nEle; row++) {
    void *pVal = tdGetColDataOfRow(pCol, row);
    if (isVarType) pVal = varDataVal(pVal);
    if (!isNull(pVal, pCol->type)) return false;
  }

  return true;
}

/**
 * Overwrite the column with nEle NULL values and set len accordingly.
 *
 * Each BINARY/NCHAR NULL is a var-data item whose length is sizeof(char)
 * (BINARY) or TSDB_NCHAR_SIZE (NCHAR). Its payload is filled by setNull() and
 * its offset is recorded in dataOff. maxPoints is not used.
 */
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
  if (pCol->type != TSDB_DATA_TYPE_BINARY && pCol->type != TSDB_DATA_TYPE_NCHAR) {
    setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
    pCol->len = TYPE_BYTES[pCol->type] * nEle;
    return;
  }

  pCol->len = 0;
  for (int idx = 0; idx < nEle; idx++) {
    char *pItem = (char *)pCol->pData + pCol->len;

    pCol->dataOff[idx] = pCol->len;
    varDataLen(pItem) = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE;
    setNull(pItem + sizeof(VarDataLenT), pCol->type, pCol->bytes);
    pCol->len += varDataTLen(pItem);
  }
}

H
TD-166  
hzcheng 已提交
268 269 270
/**
 * Rebuild dataOff[0..nEle) of a BINARY/NCHAR column by walking the packed
 * var-data items in pData from the beginning.
 */
void dataColSetOffset(SDataCol *pCol, int nEle) {
  ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));

  void          *pItem = pCol->pData;
  VarDataOffsetT offset = 0;

  for (int idx = 0; idx < nEle; idx++) {
    pCol->dataOff[idx] = offset;
    // Step over the length prefix and payload of the current item
    offset += varDataTLen(pItem);
    pItem = POINTER_SHIFT(pItem, varDataTLen(pItem));
  }
}

H
TD-166  
hzcheng 已提交
282
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
H
TD-34  
hzcheng 已提交
283 284 285 286 287 288
  SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols);
  if (pCols == NULL) return NULL;

  pCols->maxRowSize = maxRowSize;
  pCols->maxCols = maxCols;
  pCols->maxPoints = maxRows;
H
TD-166  
hzcheng 已提交
289
  pCols->bufSize = maxRowSize * maxRows;
H
TD-34  
hzcheng 已提交
290

H
TD-166  
hzcheng 已提交
291
  pCols->buf = malloc(pCols->bufSize);
H
TD-34  
hzcheng 已提交
292 293
  if (pCols->buf == NULL) {
    free(pCols);
H
TD-34  
hzcheng 已提交
294 295
    return NULL;
  }
H
TD-34  
hzcheng 已提交
296

H
TD-34  
hzcheng 已提交
297 298 299 300 301 302 303 304
  return pCols;
}

/**
 * Prepare pCols to receive rows of pSchema.
 *
 * Resets pCols, sets numOfCols to the schema's column count, and lays out
 * each column's storage consecutively in pCols->buf via dataColInit().
 */
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
  // cols[] only has room for maxCols entries (see tdNewDataCols); a schema with
  // more columns would write past the end of the allocation. The previous,
  // commented-out check compared against numOfCols, which is the wrong bound.
  ASSERT(schemaNCols(pSchema) <= pCols->maxCols);

  tdResetDataCols(pCols);
  pCols->numOfCols = schemaNCols(pSchema);

  void *ptr = pCols->buf;
  for (int i = 0; i < schemaNCols(pSchema); i++) {
    dataColInit(pCols->cols + i, schemaColAt(pSchema, i), &ptr, pCols->maxPoints);
    // The carved-out column storage must still fit inside the shared buffer
    ASSERT((char *)ptr - (char *)(pCols->buf) <= pCols->bufSize);
  }
}

/**
 * Free an SDataCols and its shared data buffer. NULL is ignored.
 */
void tdFreeDataCols(SDataCols *pCols) {
  if (pCols == NULL) return;

  tfree(pCols->buf);
  free(pCols);
}

H
TD-100  
hzcheng 已提交
319
/**
 * Create a new SDataCols with the same capacity and column layout as pDataCols.
 *
 * Each column's pData/dataOff is re-pointed at the same relative position
 * inside the new buffer. When keepData is true, numOfRows, each column's len,
 * its data bytes and (for BINARY/NCHAR) its full offset array (maxPoints
 * entries) are copied too. Otherwise the copy starts empty.
 *
 * @return the copy (freed with tdFreeDataCols), or NULL on allocation failure
 */
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
  SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints);
  if (pRet == NULL) return NULL;

  pRet->numOfCols = pDataCols->numOfCols;
  pRet->sversion = pDataCols->sversion;
  if (keepData) pRet->numOfRows = pDataCols->numOfRows;

  for (int i = 0; i < pDataCols->numOfCols; i++) {
    SDataCol *pDst = pRet->cols + i;
    SDataCol *pSrc = pDataCols->cols + i;

    pDst->type = pSrc->type;
    pDst->colId = pSrc->colId;
    pDst->bytes = pSrc->bytes;
    pDst->offset = pSrc->offset;

    pDst->spaceSize = pSrc->spaceSize;
    // Rebase the data pointer from the source buffer onto the new buffer
    pDst->pData = (void *)((char *)pRet->buf + ((char *)(pSrc->pData) - (char *)(pDataCols->buf)));

    if (pDst->type == TSDB_DATA_TYPE_BINARY || pDst->type == TSDB_DATA_TYPE_NCHAR) {
      ASSERT(pSrc->dataOff != NULL);
      // Use the declared element type (as dataColInit does) instead of int32_t
      pDst->dataOff = (VarDataOffsetT *)((char *)pRet->buf + ((char *)(pSrc->dataOff) - (char *)(pDataCols->buf)));
    }

    if (keepData) {
      pDst->len = pSrc->len;
      memcpy(pDst->pData, pSrc->pData, pSrc->len);
      if (pDst->type == TSDB_DATA_TYPE_BINARY || pDst->type == TSDB_DATA_TYPE_NCHAR) {
        memcpy(pDst->dataOff, pSrc->dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints);
      }
    }
  }

  return pRet;
}

H
TD-34  
hzcheng 已提交
354
/**
 * Empty pCols: set numOfRows to 0 and apply dataColReset to all maxCols
 * column slots.
 */
void tdResetDataCols(SDataCols *pCols) {
  pCols->numOfRows = 0;

  SDataCol *pEnd = pCols->cols + pCols->maxCols;
  for (SDataCol *pCol = pCols->cols; pCol < pEnd; pCol++) {
    dataColReset(pCol);
  }
}

H
TD-34  
hzcheng 已提交
361
/**
 * Append one data row to pCols, splitting it into its columns.
 * The row's key must be greater than the last key already in pCols.
 */
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) {
  ASSERT(dataColsKeyLast(pCols) < dataRowKey(row));

  int nRows = pCols->numOfRows;
  for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
    SDataCol *pCol = &pCols->cols[iCol];
    dataColAppendVal(pCol, tdGetRowDataOfCol(row, pCol->type, pCol->offset), nRows, pCols->maxPoints);
  }
  pCols->numOfRows = nRows + 1;
}
H
TD-166  
hzcheng 已提交
372

H
TD-34  
hzcheng 已提交
373 374
// Pop pointsToPop points from the front of the SDataCols; popping all (or
// more) points simply resets it.
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
  int pointsLeft = pCols->numOfRows - pointsToPop;

  if (pointsLeft > 0) {
    for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
      dataColPopPoints(pCols->cols + iCol, pointsToPop, pCols->numOfRows);
    }
    pCols->numOfRows = pointsLeft;
  } else {
    tdResetDataCols(pCols);
  }
}
H
TD-34  
hzcheng 已提交
387

H
hzcheng 已提交
388
/**
 * Merge rows of source into target (both ordered by the key in column 0).
 *
 * If target's last key is before source's first key, the first rowsToMerge
 * source rows are appended directly. Otherwise target is duplicated and
 * rebuilt by tdMergeTwoDataCols() from the copy and source, capped at target's
 * original row count plus rowsToMerge.
 *
 * NOTE(review): in the overlap path, equal keys keep target's row and skip the
 * source row (see tdMergeTwoDataCols), so source rows beyond the first
 * rowsToMerge may be consumed to reach the cap. Confirm this is intended.
 *
 * @return 0 on success, -1 if the temporary copy cannot be allocated
 */
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
  ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
  ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
  ASSERT(target->numOfCols == source->numOfCols);

  if (dataColsKeyLast(target) >= dataColsKeyFirst(source)) {
    // Key ranges overlap: merge through a temporary copy of target
    SDataCols *pCopy = tdDupDataCols(target, true);
    if (pCopy == NULL) return -1;

    int iter1 = 0;
    int iter2 = 0;
    tdMergeTwoDataCols(target, pCopy, &iter1, source, &iter2, pCopy->numOfRows + rowsToMerge);

    tdFreeDataCols(pCopy);
    return 0;
  }

  // No overlap: plain append
  for (int iRow = 0; iRow < rowsToMerge; iRow++) {
    for (int iCol = 0; iCol < source->numOfCols; iCol++) {
      dataColAppendVal(target->cols + iCol, tdGetColDataOfRow(source->cols + iCol, iRow), target->numOfRows,
                       target->maxPoints);
    }
    target->numOfRows++;
  }

  return 0;
}
H
TD-100  
hzcheng 已提交
419

H
TD-100  
hzcheng 已提交
420
/**
 * Reset target and fill it with a key-ordered merge of src1 (from row *iter1)
 * and src2 (from row *iter2). Stops once target holds tRows rows or both
 * sources are exhausted. *iter1 / *iter2 are left at the next unconsumed rows.
 *
 * Keys are read from column 0 as TSKEY. An exhausted source is treated as
 * having key INT64_MAX. On equal keys, src1's row is taken and src2's row is
 * skipped.
 */
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows) {
  // TODO: add resolve duplicate key here
  tdResetDataCols(target);

  while (target->numOfRows < tRows) {
    bool src1Done = (*iter1 >= src1->numOfRows);
    bool src2Done = (*iter2 >= src2->numOfRows);
    if (src1Done && src2Done) break;

    TSKEY key1 = src1Done ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
    TSKEY key2 = src2Done ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];

    // Pick the source with the smaller key (src1 wins ties)
    bool       takeFirst = (key1 <= key2);
    SDataCols *pSrc = takeFirst ? src1 : src2;
    int       *pIter = takeFirst ? iter1 : iter2;

    for (int i = 0; i < pSrc->numOfCols; i++) {
      ASSERT(target->cols[i].type == pSrc->cols[i].type);
      dataColAppendVal(target->cols + i, tdGetColDataOfRow(pSrc->cols + i, *pIter), target->numOfRows,
                       target->maxPoints);
    }

    target->numOfRows++;
    (*pIter)++;

    // Duplicate key: drop src2's row as well
    if (key1 == key2) (*iter2)++;
  }
}

H
Hongze Cheng 已提交
453 454
/**
 * Return a heap copy of a KV row (kvRowLen(row) bytes), or NULL if malloc fails.
 */
SKVRow tdKVRowDup(SKVRow row) {
  SKVRow pCopy = malloc(kvRowLen(row));
  if (pCopy != NULL) kvRowCpy(pCopy, row);
  return pCopy;
}

H
Hongze Cheng 已提交
461
/**
 * Set the value of column colId in a KV row.
 *
 * NOT IMPLEMENTED: this always returns NULL and leaves row untouched. Callers
 * must treat NULL as failure.
 */
SKVRow tdSetKVRowDataOfCol(SKVRow row, int16_t colId, int8_t type, void *value) {
  // TODO: insert or update the value of colId in row
  (void)row;
  (void)colId;
  (void)type;
  (void)value;
  return NULL;
}

H
Hongze Cheng 已提交
526
/**
 * Write row into buf (kvRowLen(row) bytes) and return the position right
 * after it.
 */
void *tdEncodeKVRow(void *buf, SKVRow row) {
  // The encoding is currently a raw copy of the row; this may change
  kvRowCpy(buf, row);
  void *pNext = POINTER_SHIFT(buf, kvRowLen(row));
  return pNext;
}

H
Hongze Cheng 已提交
532 533 534
/**
 * Decode a KV row from buf into a newly allocated copy stored in *row, and
 * return the position in buf just past the encoded row.
 *
 * If the copy cannot be allocated, *row is set to NULL. The returned pointer
 * still skips the encoded row, because its length is read from buf.
 */
void *tdDecodeKVRow(void *buf, SKVRow *row) {
  *row = tdKVRowDup(buf);
  // Read the length from the encoded data, not from *row: *row is NULL when
  // tdKVRowDup fails, and kvRowLen(*row) would then dereference NULL.
  return POINTER_SHIFT(buf, kvRowLen(buf));
}

H
Hongze Cheng 已提交
537
/**
 * Initialize a KV row builder with room for 128 column index entries and a
 * 1024-byte value buffer.
 *
 * Every field is set before any allocation. On failure, both pointers are
 * left NULL, so calling tdDestroyKVRowBuilder() afterwards is safe (no
 * dangling or uninitialized pointer is freed).
 *
 * @return 0 on success, -1 if an allocation fails
 */
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) {
  pBuilder->tCols = 128;
  pBuilder->nCols = 0;
  pBuilder->alloc = 1024;
  pBuilder->size = 0;
  pBuilder->buf = NULL;

  pBuilder->pColIdx = (SColIdx *)malloc(sizeof(SColIdx) * pBuilder->tCols);
  if (pBuilder->pColIdx == NULL) return -1;

  pBuilder->buf = malloc(pBuilder->alloc);
  if (pBuilder->buf == NULL) {
    free(pBuilder->pColIdx);
    pBuilder->pColIdx = NULL;  // avoid a later double free via tdDestroyKVRowBuilder
    return -1;
  }

  return 0;
}

H
Hongze Cheng 已提交
552
/**
 * Release the builder's value buffer and column index array via tfree.
 */
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) {
  tfree(pBuilder->buf);
  tfree(pBuilder->pColIdx);
}

H
Hongze Cheng 已提交
557
/**
 * Empty the builder for reuse: clear the column and value counters while
 * keeping the allocated buffers.
 */
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
  pBuilder->size = 0;
  pBuilder->nCols = 0;
}

H
Hongze Cheng 已提交
562
/**
 * Build a new KV row from the builder's contents.
 *
 * The row is TD_KV_ROW_HEAD_SIZE header bytes, plus nCols SColIdx entries
 * (copied to kvRowColIdx), plus size value bytes (copied to kvRowValues).
 *
 * @return the new row, or NULL if the builder is empty or malloc fails
 */
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
  int payload = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
  if (payload == 0) return NULL;

  int    tlen = payload + TD_KV_ROW_HEAD_SIZE;
  SKVRow pRow = malloc(tlen);
  if (pRow == NULL) return NULL;

  kvRowSetNCols(pRow, pBuilder->nCols);
  kvRowSetLen(pRow, tlen);

  memcpy(kvRowColIdx(pRow), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
  memcpy(kvRowValues(pRow), pBuilder->buf, pBuilder->size);

  return pRow;
}