tdataformat.c 22.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
slguan 已提交
15
#include "tdataformat.h"
H
Haojun Liao 已提交
16
#include "tulog.h"
T
Tao Liu 已提交
17
#include "talgo.h"
H
TD-353  
Hongze Cheng 已提交
18
#include "tcoding.h"
H
Hongze Cheng 已提交
19
#include "wchar.h"
H
more  
hzcheng 已提交
20

H
hzcheng 已提交
21 22 23 24
/**
 * Make a heap-allocated, byte-for-byte copy of a schema: the STSchema header
 * followed by schemaNCols(pSchema) STColumn entries.
 *
 * @param pSchema schema to copy
 * @return the newly malloc'ed copy, or NULL when the allocation fails
 */
STSchema *tdDupSchema(STSchema *pSchema) {
  int nBytes = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);

  STSchema *pCopy = (STSchema *)malloc(nBytes);
  if (pCopy != NULL) {
    memcpy((void *)pCopy, (void *)pSchema, nBytes);
  }

  return pCopy;
}

H
TD-27  
hzcheng 已提交
35 36 37
/**
 * Encode a schema to dst, and return the next pointer
 */
H
TD-353  
Hongze Cheng 已提交
38 39 40 41
int tdEncodeSchema(void **buf, STSchema *pSchema) {
  int tlen = 0;
  tlen += taosEncodeFixedI32(buf, schemaVersion(pSchema));
  tlen += taosEncodeFixedI32(buf, schemaNCols(pSchema));
H
TD-166  
hzcheng 已提交
42

H
TD-27  
hzcheng 已提交
43 44
  for (int i = 0; i < schemaNCols(pSchema); i++) {
    STColumn *pCol = schemaColAt(pSchema, i);
H
TD-353  
Hongze Cheng 已提交
45 46
    tlen += taosEncodeFixedI8(buf, colType(pCol));
    tlen += taosEncodeFixedI16(buf, colColId(pCol));
47
    tlen += taosEncodeFixedI16(buf, colBytes(pCol));
H
TD-27  
hzcheng 已提交
48 49
  }

H
TD-353  
Hongze Cheng 已提交
50
  return tlen;
H
TD-27  
hzcheng 已提交
51 52 53 54 55
}

/**
 * Decode a schema from a binary.
 */
H
TD-353  
Hongze Cheng 已提交
56
void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
H
Hongze Cheng 已提交
57
  int version = 0;
H
TD-353  
Hongze Cheng 已提交
58
  int numOfCols = 0;
H
TD-353  
Hongze Cheng 已提交
59
  STSchemaBuilder schemaBuilder;
H
TD-27  
hzcheng 已提交
60

H
TD-353  
Hongze Cheng 已提交
61 62
  buf = taosDecodeFixedI32(buf, &version);
  buf = taosDecodeFixedI32(buf, &numOfCols);
H
TD-27  
hzcheng 已提交
63

H
Hongze Cheng 已提交
64 65
  if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;

H
TD-353  
Hongze Cheng 已提交
66
  for (int i = 0; i < numOfCols; i++) {
H
TD-27  
hzcheng 已提交
67 68
    int8_t  type = 0;
    int16_t colId = 0;
69
    int16_t bytes = 0;
H
TD-353  
Hongze Cheng 已提交
70 71
    buf = taosDecodeFixedI8(buf, &type);
    buf = taosDecodeFixedI16(buf, &colId);
72
    buf = taosDecodeFixedI16(buf, &bytes);
H
Hongze Cheng 已提交
73 74 75 76
    if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
      tdDestroyTSchemaBuilder(&schemaBuilder);
      return NULL;
    }
H
TD-27  
hzcheng 已提交
77 78
  }

H
TD-353  
Hongze Cheng 已提交
79
  *pRSchema = tdGetSchemaFromBuilder(&schemaBuilder);
H
Hongze Cheng 已提交
80
  tdDestroyTSchemaBuilder(&schemaBuilder);
H
TD-353  
Hongze Cheng 已提交
81
  return buf;
H
Hongze Cheng 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
}

/**
 * Prepare a schema builder: allocate room for 256 columns and reset the
 * running counters for the given schema version.
 *
 * @param pBuilder builder to initialize
 * @param version  schema version stored in the builder
 * @return 0 on success, -1 when pBuilder is NULL or the allocation fails
 */
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  if (!pBuilder) {
    return -1;
  }

  pBuilder->tCols = 256;
  pBuilder->columns = (STColumn *)malloc(sizeof(STColumn) * pBuilder->tCols);

  if (pBuilder->columns != NULL) {
    tdResetTSchemaBuilder(pBuilder, version);
    return 0;
  }

  return -1;
}

/**
 * Release the column array held by a schema builder. The builder structure
 * itself is not freed. A NULL builder is ignored.
 */
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder == NULL) {
    return;
  }

  taosTFree(pBuilder->columns);
}

/**
 * Put a builder back into the "no columns added" state for a new schema
 * version. The column array and its capacity (tCols) are left untouched.
 */
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
  pBuilder->version = version;
  pBuilder->nCols = 0;

  // Accumulated lengths start over.
  pBuilder->flen = 0;
  pBuilder->tlen = 0;
  pBuilder->vlen = 0;
}

109
/**
 * Append one column to the schema under construction and update the
 * builder's running lengths (tlen, flen, vlen).
 *
 * The column array is doubled when it is full. The growth goes through a
 * temporary pointer so that a failed realloc() neither leaks the existing
 * array nor leaves tCols larger than the memory actually held; the builder
 * stays valid and can still be destroyed normally.
 *
 * @param pBuilder builder to append to
 * @param type     column data type; rejected when isValidDataType() fails
 * @param colId    column id
 * @param bytes    declared width, used only for variable-length types
 *                 (fixed-length types take TYPE_BYTES[type])
 * @return 0 on success, -1 on invalid type or allocation failure
 */
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes) {
  if (!isValidDataType(type)) return -1;

  if (pBuilder->nCols >= pBuilder->tCols) {
    STColumn *pGrown = (STColumn *)realloc(pBuilder->columns, sizeof(STColumn) * pBuilder->tCols * 2);
    if (pGrown == NULL) return -1;  // pBuilder->columns is still valid and owned by the builder

    pBuilder->columns = pGrown;
    pBuilder->tCols *= 2;
  }

  STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
  colSetType(pCol, type);
  colSetColId(pCol, colId);
  if (pBuilder->nCols == 0) {
    colSetOffset(pCol, 0);
  } else {
    // Each column starts right after the previous column's TYPE_BYTES slot.
    STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols - 1]);
    colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
  }

  if (IS_VAR_DATA_TYPE(type)) {
    colSetBytes(pCol, bytes);
    pBuilder->tlen += (TYPE_BYTES[type] + bytes);
    pBuilder->vlen += bytes - sizeof(VarDataLenT);
  } else {
    colSetBytes(pCol, TYPE_BYTES[type]);
    pBuilder->tlen += TYPE_BYTES[type];
    pBuilder->vlen += TYPE_BYTES[type];
  }

  pBuilder->nCols++;
  pBuilder->flen += TYPE_BYTES[type];

  ASSERT(pCol->offset < pBuilder->flen);

  return 0;
}

/**
 * Materialize the columns collected in a builder as a standalone schema.
 * The schema gets its own copy of the column array, so the builder can be
 * reset or destroyed afterwards.
 *
 * @param pBuilder builder holding at least one column
 * @return a malloc'ed schema, or NULL when the builder has no columns or
 *         the allocation fails
 */
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
  if (pBuilder->nCols <= 0) {
    return NULL;
  }

  int total = sizeof(STSchema) + sizeof(STColumn) * pBuilder->nCols;

  STSchema *pOut = (STSchema *)malloc(total);
  if (pOut == NULL) {
    return NULL;
  }

  schemaVersion(pOut) = pBuilder->version;
  schemaNCols(pOut) = pBuilder->nCols;
  schemaTLen(pOut) = pBuilder->tlen;
  schemaFLen(pOut) = pBuilder->flen;
  schemaVLen(pOut) = pBuilder->vlen;

  memcpy(schemaColAt(pOut, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols);

  return pOut;
}

H
hzcheng 已提交
165 166 167
/**
 * Initialize a data row
 */
H
TD-90  
Hongze Cheng 已提交
168 169 170 171
void tdInitDataRow(SDataRow row, STSchema *pSchema) {
  dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema));
  dataRowSetVersion(row, schemaVersion(pSchema));
}
H
hzcheng 已提交
172

H
TD-166  
hzcheng 已提交
173
/**
 * Allocate a data row of dataRowMaxBytesFromSchema(pSchema) bytes and
 * initialize its header with tdInitDataRow().
 *
 * @return the new row (release with tdFreeDataRow), or NULL when the
 *         allocation fails
 */
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
  int32_t  maxBytes = dataRowMaxBytesFromSchema(pSchema);
  SDataRow newRow = malloc(maxBytes);

  if (newRow != NULL) {
    tdInitDataRow(newRow, pSchema);
  }

  return newRow;
}
H
hzcheng 已提交
182

H
hzcheng 已提交
183 184 185 186 187 188 189
/**
 * Release a data row. A NULL row is accepted, since free(NULL) is a no-op.
 */
void tdFreeDataRow(SDataRow row) {
  free(row);
}

H
hzcheng 已提交
190
/**
 * Duplicate a data row into a fresh allocation of dataRowLen(row) bytes.
 *
 * @return the copy, or NULL when the allocation fails
 */
SDataRow tdDataRowDup(SDataRow row) {
  SDataRow pCopy = malloc(dataRowLen(row));

  if (pCopy != NULL) {
    dataRowCpy(pCopy, row);
  }

  return pCopy;
}
H
hzcheng 已提交
197

H
TD-166  
hzcheng 已提交
198 199 200 201
/**
 * Bind a data column to its schema column and carve its storage out of a
 * shared buffer.
 *
 * For BINARY/NCHAR columns the slice starts with maxPoints offset slots
 * (VarDataOffsetT each) followed by bytes * maxPoints of value space. Every
 * other type gets only bytes * maxPoints of value space and no offset array.
 *
 * @param pDataCol  column to initialize
 * @param pCol      schema column supplying type, id, bytes and offset
 * @param pBuf      in: start of the free area; out: advanced past the slice
 *                  taken by this column
 * @param maxPoints number of rows the column must be able to hold
 */
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) {
  pDataCol->type = colType(pCol);
  pDataCol->colId = colColId(pCol);
  pDataCol->bytes = colBytes(pCol);
  pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE;
  pDataCol->len = 0;

  pDataCol->spaceSize = pDataCol->bytes * maxPoints;

  bool isVar = (pDataCol->type == TSDB_DATA_TYPE_BINARY) || (pDataCol->type == TSDB_DATA_TYPE_NCHAR);
  if (!isVar) {
    pDataCol->dataOff = NULL;
    pDataCol->pData = *pBuf;
    *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize);
    return;
  }

  // Variable-length layout: [offset slots][value bytes]
  pDataCol->dataOff = (VarDataOffsetT *)(*pBuf);
  pDataCol->pData = POINTER_SHIFT(*pBuf, sizeof(VarDataOffsetT) * maxPoints);
  *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize + sizeof(VarDataOffsetT) * maxPoints);
}

H
Haojun Liao 已提交
218
/**
 * Append one value at the current end (pCol->len) of a data column.
 *
 * BINARY/NCHAR: the current length is recorded as the offset of row
 * numOfRows, then varDataTLen(value) bytes are copied.
 * Other types: pCol->bytes bytes are copied.
 *
 * @param pCol      destination column (must not be NULL)
 * @param value     value to append (must not be NULL)
 * @param numOfRows index of the row being written
 * @param maxPoints not used by this function
 */
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) {
  ASSERT(pCol != NULL && value != NULL);

  if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR) {
    pCol->dataOff[numOfRows] = pCol->len;

    void *pDst = POINTER_SHIFT(pCol->pData, pCol->len);
    memcpy(pDst, value, varDataTLen(value));
    pCol->len += varDataTLen(value);
  } else {
    // Fixed-width columns are densely packed, one slot per row.
    ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);

    void *pDst = POINTER_SHIFT(pCol->pData, pCol->len);
    memcpy(pDst, value, pCol->bytes);
    pCol->len += pCol->bytes;
  }
}

H
Haojun Liao 已提交
239 240
/**
 * Drop the first pointsToPop rows of a column and slide the remaining rows
 * to the front. At least one row must remain (asserted).
 *
 * @param pCol        column to shrink
 * @param pointsToPop number of leading rows to remove
 * @param numOfRows   number of rows currently stored in the column
 */
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows) {
  int remaining = numOfRows - pointsToPop;
  ASSERT(remaining > 0);

  bool isVar = (pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR);
  if (!isVar) {
    ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
    pCol->len = TYPE_BYTES[pCol->type] * remaining;
    memmove(pCol->pData, POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * pointsToPop), pCol->len);
    return;
  }

  ASSERT(pCol->len > 0);

  // Everything stored before the first surviving row is discarded.
  VarDataOffsetT cut = pCol->dataOff[pointsToPop];
  pCol->len -= cut;
  ASSERT(pCol->len > 0);

  memmove(pCol->pData, POINTER_SHIFT(pCol->pData, cut), pCol->len);

  // The offsets no longer match the moved data; rebuild them.
  dataColSetOffset(pCol, remaining);
}

H
TD-166  
hzcheng 已提交
258 259 260 261 262
/**
 * Tell whether the first nEle values of a column are all null, as judged by
 * isNull() for the column's type.
 *
 * @return true when every inspected value is null (also when nEle <= 0),
 *         false as soon as a non-null value is found
 */
bool isNEleNull(SDataCol *pCol, int nEle) {
  // The same check applies to variable-length and fixed-length columns.
  for (int iRow = 0; iRow < nEle; iRow++) {
    if (!isNull(tdGetColDataOfRow(pCol, iRow), pCol->type)) {
      return false;
    }
  }

  return true;
}

H
TD-90  
Hongze Cheng 已提交
274 275 276 277
/**
 * Write a null value for row `index` and grow pCol->len accordingly.
 *
 * Variable-length columns append the null at the current end of the data
 * and record its offset. Fixed-length columns write the null into the slot
 * at TYPE_BYTES[type] * index.
 */
void dataColSetNullAt(SDataCol *pCol, int index) {
  if (IS_VAR_DATA_TYPE(pCol->type)) {
    pCol->dataOff[index] = pCol->len;

    char *pTail = POINTER_SHIFT(pCol->pData, pCol->len);
    setVardataNull(pTail, pCol->type);
    pCol->len += varDataTLen(pTail);
    return;
  }

  setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes);
  pCol->len += TYPE_BYTES[pCol->type];
}

H
TD-166  
hzcheng 已提交
286
/**
 * Fill the first nEle rows of a column with nulls. Previous content is
 * overwritten and pCol->len afterwards covers exactly those nEle nulls.
 *
 * @param pCol      column to fill
 * @param nEle      number of rows to set to null
 * @param maxPoints not used by this function
 */
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
  if (IS_VAR_DATA_TYPE(pCol->type)) {
    // Start from an empty column; each call appends one null and its offset.
    pCol->len = 0;

    int iRow = 0;
    while (iRow < nEle) {
      dataColSetNullAt(pCol, iRow);
      iRow++;
    }
    return;
  }

  setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
  pCol->len = TYPE_BYTES[pCol->type] * nEle;
}

H
TD-166  
hzcheng 已提交
299 300 301
/**
 * Rebuild the offset array of a BINARY/NCHAR column by walking its packed
 * values from the start of pData: entry i receives the byte offset of the
 * i-th value.
 *
 * @param pCol variable-length column (asserted)
 * @param nEle number of values stored back to back in pCol->pData
 */
void dataColSetOffset(SDataCol *pCol, int nEle) {
  ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));

  VarDataOffsetT running = 0;
  void *         pEntry = pCol->pData;

  int iRow = 0;
  while (iRow < nEle) {
    pCol->dataOff[iRow] = running;
    running += varDataTLen(pEntry);
    pEntry = POINTER_SHIFT(pEntry, varDataTLen(pEntry));
    iRow++;
  }
}

H
TD-166  
hzcheng 已提交
313
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
H
Hongze Cheng 已提交
314
  SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols));
H
Haojun Liao 已提交
315
  if (pCols == NULL) {
S
Shengliang Guan 已提交
316
    uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno));
H
Haojun Liao 已提交
317 318
    return NULL;
  }
H
TD-34  
hzcheng 已提交
319

H
Hongze Cheng 已提交
320 321
  pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol));
  if (pCols->cols == NULL) {
S
Shengliang Guan 已提交
322
    uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols, strerror(errno));
H
Hongze Cheng 已提交
323 324 325 326
    tdFreeDataCols(pCols);
    return NULL;
  }

H
TD-34  
hzcheng 已提交
327 328 329
  pCols->maxRowSize = maxRowSize;
  pCols->maxCols = maxCols;
  pCols->maxPoints = maxRows;
H
TD-166  
hzcheng 已提交
330
  pCols->bufSize = maxRowSize * maxRows;
H
TD-34  
hzcheng 已提交
331

H
Hongze Cheng 已提交
332
  pCols->buf = malloc(pCols->bufSize);
H
TD-34  
hzcheng 已提交
333
  if (pCols->buf == NULL) {
S
Shengliang Guan 已提交
334
    uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols, strerror(errno));
H
Hongze Cheng 已提交
335
    tdFreeDataCols(pCols);
H
TD-34  
hzcheng 已提交
336 337
    return NULL;
  }
H
TD-34  
hzcheng 已提交
338

H
TD-34  
hzcheng 已提交
339 340 341
  return pCols;
}

H
Hongze Cheng 已提交
342 343 344 345 346 347 348 349 350 351 352 353 354 355
/**
 * Lay out an SDataCols container for a schema: grow the column array and
 * the data buffer if the schema needs more than the container currently
 * has, reset the container, then bind each column to its slice of the
 * buffer with dataColInit().
 *
 * Both growths go through a temporary pointer, and the recorded capacities
 * (maxCols, maxRowSize, bufSize) are updated only after realloc() succeeds.
 * On failure the container keeps its previous memory and sizes, so nothing
 * leaks and it can still be freed with tdFreeDataCols().
 *
 * @param pCols   container to initialize
 * @param pSchema schema describing the columns
 * @return 0 on success, -1 when a reallocation fails
 */
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
  if (schemaNCols(pSchema) > pCols->maxCols) {
    SDataCol *pNewCols = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * schemaNCols(pSchema));
    if (pNewCols == NULL) return -1;

    pCols->cols = pNewCols;
    pCols->maxCols = schemaNCols(pSchema);
  }

  if (schemaTLen(pSchema) > pCols->maxRowSize) {
    void *pNewBuf = realloc(pCols->buf, schemaTLen(pSchema) * pCols->maxPoints);
    if (pNewBuf == NULL) return -1;

    pCols->buf = pNewBuf;
    pCols->bufSize = schemaTLen(pSchema) * pCols->maxPoints;
    pCols->maxRowSize = schemaTLen(pSchema);
  }

  tdResetDataCols(pCols);
  pCols->numOfCols = schemaNCols(pSchema);

  void *ptr = pCols->buf;
  for (int i = 0; i < schemaNCols(pSchema); i++) {
    dataColInit(pCols->cols + i, schemaColAt(pSchema, i), &ptr, pCols->maxPoints);
    // The slices handed out so far must still fit inside the buffer.
    ASSERT((char *)ptr - (char *)(pCols->buf) <= pCols->bufSize);
  }

  return 0;
}

/**
 * Release an SDataCols container: its data buffer, its column array and the
 * container itself. A NULL container is ignored.
 */
void tdFreeDataCols(SDataCols *pCols) {
  if (pCols == NULL) {
    return;
  }

  taosTFree(pCols->buf);
  taosTFree(pCols->cols);
  free(pCols);
}

H
TD-100  
hzcheng 已提交
376
/**
 * Create a new SDataCols with the same capacities and column layout as
 * pDataCols, optionally copying the stored rows as well.
 *
 * Column pointers (pData, dataOff) are rebased: each one keeps the same
 * byte distance from the new buffer as the source pointer has from the
 * source buffer.
 *
 * @param pDataCols container to duplicate
 * @param keepData  when true, numOfRows, per-column len and the column
 *                  contents are copied too; when false the copy keeps the
 *                  zeroed counters it got from tdNewDataCols()
 * @return the duplicate (release with tdFreeDataCols), or NULL when
 *         tdNewDataCols() fails
 */
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
  SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints);
  if (pRet == NULL) return NULL;

  pRet->numOfCols = pDataCols->numOfCols;
  pRet->sversion = pDataCols->sversion;
  if (keepData) pRet->numOfRows = pDataCols->numOfRows;

  for (int i = 0; i < pDataCols->numOfCols; i++) {
    SDataCol *pSrc = pDataCols->cols + i;
    SDataCol *pDst = pRet->cols + i;
    bool      isVar = (pSrc->type == TSDB_DATA_TYPE_BINARY) || (pSrc->type == TSDB_DATA_TYPE_NCHAR);

    pDst->type = pSrc->type;
    pDst->colId = pSrc->colId;
    pDst->bytes = pSrc->bytes;
    pDst->offset = pSrc->offset;

    pDst->spaceSize = pSrc->spaceSize;
    pDst->pData = (void *)((char *)pRet->buf + ((char *)(pSrc->pData) - (char *)(pDataCols->buf)));

    if (isVar) {
      ASSERT(pSrc->dataOff != NULL);
      // Use the offset array's own element type, as dataColInit() does.
      pDst->dataOff = (VarDataOffsetT *)((char *)pRet->buf + ((char *)(pSrc->dataOff) - (char *)(pDataCols->buf)));
    }

    if (keepData) {
      pDst->len = pSrc->len;
      if (pSrc->len > 0) {
        memcpy(pDst->pData, pSrc->pData, pSrc->len);
        if (isVar) {
          memcpy(pDst->dataOff, pSrc->dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints);
        }
      }
    }
  }

  return pRet;
}

H
TD-34  
hzcheng 已提交
413
/**
 * Mark a container as holding no rows: numOfRows becomes 0 and
 * dataColReset() is applied to all maxCols column entries. A NULL container
 * is ignored.
 */
void tdResetDataCols(SDataCols *pCols) {
  if (pCols == NULL) {
    return;
  }

  pCols->numOfRows = 0;

  int iCol = 0;
  while (iCol < pCols->maxCols) {
    dataColReset(pCols->cols + iCol);
    iCol++;
  }
}

H
TD-90  
Hongze Cheng 已提交
422
/**
 * Append one data row to a column container as row pCols->numOfRows.
 *
 * The row's schema columns and the container's columns are walked together
 * in order, matching on column id:
 *   - ids equal: the row value is appended to the data column;
 *   - row column id smaller: that row column is skipped (the container does
 *     not hold it);
 *   - otherwise, or when the row columns are exhausted: the data column
 *     receives a null for this row.
 *
 * The row key must be greater than the container's last key (asserted).
 *
 * @param row     row to append
 * @param pSchema schema describing `row`
 * @param pCols   destination container
 */
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) {
  ASSERT(dataColsKeyLast(pCols) < dataRowKey(row));

  int iRowCol = 0;
  int iDataCol = 0;

  while (iDataCol < pCols->numOfCols) {
    SDataCol *pDataCol = pCols->cols + iDataCol;
    STColumn *pRowCol = (iRowCol < schemaNCols(pSchema)) ? schemaColAt(pSchema, iRowCol) : NULL;

    if (pRowCol != NULL && pRowCol->colId < pDataCol->colId) {
      // Row column not present in the container: move on to the next one.
      iRowCol++;
      continue;
    }

    if (pRowCol != NULL && pRowCol->colId == pDataCol->colId) {
      void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
      dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
      iRowCol++;
    } else {
      // No matching row column for this data column.
      dataColSetNullAt(pDataCol, pCols->numOfRows);
    }

    iDataCol++;
  }

  pCols->numOfRows++;
}
H
TD-166  
hzcheng 已提交
451

H
TD-34  
hzcheng 已提交
452 453
/**
 * Remove the first pointsToPop rows from every column of the container.
 * When that would leave no rows, the container is simply reset.
 */
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
  int remaining = pCols->numOfRows - pointsToPop;

  if (remaining <= 0) {
    tdResetDataCols(pCols);
    return;
  }

  int iCol = 0;
  while (iCol < pCols->numOfCols) {
    dataColPopPoints(pCols->cols + iCol, pointsToPop, pCols->numOfRows);
    iCol++;
  }

  pCols->numOfRows = remaining;
}
H
TD-34  
hzcheng 已提交
466

H
hzcheng 已提交
467
/**
 * Merge rows from `source` into `target`.
 *
 * When the last key of `target` is smaller than the first key of `source`,
 * the first rowsToMerge source rows are appended directly. Otherwise the
 * current content of `target` is duplicated and `target` is rebuilt from
 * that duplicate and `source` with tdMergeTwoDataCols(), which resets
 * `target` before writing to it.
 *
 * @param target      container receiving the rows; must have room for
 *                    rowsToMerge more rows (asserted)
 * @param source      container supplying rows; same column count (asserted)
 * @param rowsToMerge number of rows to merge, 1..source->numOfRows (asserted)
 * @return 0 on success, -1 when the temporary duplicate cannot be allocated
 */
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
  ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
  ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
  ASSERT(target->numOfCols == source->numOfCols);

  if (dataColsKeyLast(target) < dataColsKeyFirst(source)) {
    // No overlap: plain append, row by row.
    for (int iRow = 0; iRow < rowsToMerge; iRow++) {
      for (int iCol = 0; iCol < source->numOfCols; iCol++) {
        if (source->cols[iCol].len <= 0) continue;

        dataColAppendVal(target->cols + iCol, tdGetColDataOfRow(source->cols + iCol, iRow), target->numOfRows,
                         target->maxPoints);
      }
      target->numOfRows++;
    }
    return 0;
  }

  // Overlap: merge a snapshot of the target with the source back into target.
  SDataCols *pSnapshot = tdDupDataCols(target, true);
  if (pSnapshot == NULL) {
    return -1;
  }

  int iSnap = 0;
  int iSrc = 0;
  tdMergeTwoDataCols(target, pSnapshot, &iSnap, pSnapshot->numOfRows, source, &iSrc, source->numOfRows,
                     pSnapshot->numOfRows + rowsToMerge);

  tdFreeDataCols(pSnapshot);
  return 0;
}
H
TD-100  
hzcheng 已提交
501

H
TD-521  
Hongze Cheng 已提交
502
/**
 * Merge two containers into `target`, ordered by the key stored in column 0.
 *
 * `target` is reset first. Rows are then taken from whichever source has the
 * smaller current key until `target` holds tRows rows or both sources reach
 * their limits. When both current keys are equal, the row from src1 is taken
 * and the src2 row is skipped.
 *
 * @param target destination container (reset on entry)
 * @param src1   first source
 * @param iter1  in/out: current row index in src1
 * @param limit1 row index at which src1 is considered exhausted
 * @param src2   second source
 * @param iter2  in/out: current row index in src2
 * @param limit2 row index at which src2 is considered exhausted
 * @param tRows  maximum number of rows to place in target
 */
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows) {
  tdResetDataCols(target);
  ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);

  while (target->numOfRows < tRows && (*iter1 < limit1 || *iter2 < limit2)) {
    // An exhausted source compares as the largest possible key.
    TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
    TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];

    bool       takeFirst = (key1 <= key2);
    SDataCols *pSrc = takeFirst ? src1 : src2;
    int        iRow = takeFirst ? *iter1 : *iter2;

    for (int iCol = 0; iCol < pSrc->numOfCols; iCol++) {
      ASSERT(target->cols[iCol].type == pSrc->cols[iCol].type);
      if (pSrc->cols[iCol].len > 0) {
        dataColAppendVal(&(target->cols[iCol]), tdGetColDataOfRow(pSrc->cols + iCol, iRow), target->numOfRows,
                         target->maxPoints);
      }
    }
    target->numOfRows++;

    if (takeFirst) {
      (*iter1)++;
      if (key1 == key2) (*iter2)++;  // duplicate key: drop the src2 row
    } else {
      (*iter2)++;
    }
  }
}

H
Hongze Cheng 已提交
539 540
/**
 * Duplicate a key-value row into a fresh allocation of kvRowLen(row) bytes.
 *
 * @return the copy, or NULL when the allocation fails
 */
SKVRow tdKVRowDup(SKVRow row) {
  SKVRow pCopy = malloc(kvRowLen(row));

  if (pCopy != NULL) {
    kvRowCpy(pCopy, row);
  }

  return pCopy;
}

B
Bomin Zhang 已提交
547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562
/**
 * qsort() comparator ordering SColIdx entries by ascending colId.
 *
 * @return 1, -1 or 0 when a's colId is greater than, less than or equal to
 *         b's colId
 */
static int compareColIdx(const void* a, const void* b) {
  const SColIdx* lhs = (const SColIdx*)a;
  const SColIdx* rhs = (const SColIdx*)b;

  return (lhs->colId > rhs->colId) - (lhs->colId < rhs->colId);
}

/**
 * Sort the column-index entries of a key-value row in place by ascending
 * colId. Only the index entries are reordered.
 */
void tdSortKVRowByColIdx(SKVRow row) {
  SColIdx *pFirstIdx = kvRowColIdx(row);

  qsort(pFirstIdx, kvRowNCols(row), sizeof(*pFirstIdx), compareColIdx);
}

H
TD-90  
Hongze Cheng 已提交
563 564 565 566 567 568
/**
 * Set the value of column `colId` in a key-value row, inserting the column
 * when the row does not have it yet.
 *
 * The column index is searched with taosbsearch(..., TD_GE). Three outcomes:
 *   1. No entry with that id: a larger row is allocated and the new column
 *      is appended at the end or inserted before the entry that was found.
 *   2. Entry found, and the value is fixed-length or has the same total
 *      length as the old one: the value is overwritten in place.
 *   3. Entry found, variable-length value of a different size: a new row of
 *      the adjusted size is allocated and rebuilt around the new value.
 * In cases 1 and 3 the old row is freed and *orow is pointed at the new row.
 *
 * @param orow  in/out: the row; may be replaced by a new allocation
 * @param colId id of the column to set
 * @param type  data type of `value`
 * @param value value to store
 * @return 0 on success, -1 when an allocation fails (*orow is then unchanged)
 */
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
  SKVRow   oldRow = *orow;
  SKVRow   newRow = NULL;
  void *   found = taosbsearch(&colId, kvRowColIdx(oldRow), kvRowNCols(oldRow), sizeof(SColIdx), comparTagId, TD_GE);
  SColIdx *pHit = (SColIdx *)found;

  if (pHit == NULL || pHit->colId > colId) {
    // Case 1: the column has to be added to the row.
    int valBytes = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];

    newRow = malloc(kvRowLen(oldRow) + sizeof(SColIdx) + valBytes);
    if (newRow == NULL) return -1;

    kvRowSetLen(newRow, kvRowLen(oldRow) + (int16_t)sizeof(SColIdx) + valBytes);
    kvRowSetNCols(newRow, kvRowNCols(oldRow) + 1);

    if (pHit == NULL) {
      // Every existing id is smaller: keep all old entries, append the new one.
      memcpy(kvRowColIdx(newRow), kvRowColIdx(oldRow), sizeof(SColIdx) * kvRowNCols(oldRow));
      memcpy(kvRowValues(newRow), kvRowValues(oldRow), POINTER_DISTANCE(kvRowEnd(oldRow), kvRowValues(oldRow)));

      int lastIdx = kvRowNCols(newRow) - 1;
      kvRowColIdxAt(newRow, lastIdx)->colId = colId;
      kvRowColIdxAt(newRow, lastIdx)->offset = (int16_t)(POINTER_DISTANCE(kvRowEnd(oldRow), kvRowValues(oldRow)));
      memcpy(kvRowColVal(newRow, kvRowColIdxAt(newRow, lastIdx)), value, valBytes);
    } else {
      // Insert before the entry that was found.
      int16_t idxBytes = (int16_t)(POINTER_DISTANCE(found, kvRowColIdx(oldRow)));
      if (idxBytes > 0) {
        memcpy(kvRowColIdx(newRow), kvRowColIdx(oldRow), idxBytes);
        memcpy(kvRowValues(newRow), kvRowValues(oldRow), pHit->offset);
      }

      int insertAt = idxBytes / sizeof(SColIdx);
      kvRowColIdxAt(newRow, insertAt)->colId = colId;
      kvRowColIdxAt(newRow, insertAt)->offset = pHit->offset;
      memcpy(kvRowColVal(newRow, kvRowColIdxAt(newRow, insertAt)), value, valBytes);

      // Entries from insertAt onwards move one slot down and valBytes further.
      for (int i = insertAt; i < kvRowNCols(oldRow); i++) {
        kvRowColIdxAt(newRow, i + 1)->colId = kvRowColIdxAt(oldRow, i)->colId;
        kvRowColIdxAt(newRow, i + 1)->offset = kvRowColIdxAt(oldRow, i)->offset + valBytes;
      }
      memcpy(kvRowColVal(newRow, kvRowColIdxAt(newRow, insertAt + 1)),
             kvRowColVal(oldRow, kvRowColIdxAt(oldRow, insertAt)),
             POINTER_DISTANCE(kvRowEnd(oldRow), kvRowColVal(oldRow, kvRowColIdxAt(oldRow, insertAt))));
    }

    *orow = newRow;
    free(oldRow);
    return 0;
  }

  ASSERT(pHit->colId == colId);

  if (IS_VAR_DATA_TYPE(type)) {
    void *pOldVal = kvRowColVal(oldRow, pHit);

    if (varDataTLen(value) == varDataTLen(pOldVal)) {
      // Case 2 (variable-length, same size): overwrite in place.
      memcpy(pOldVal, value, varDataTLen(value));
      return 0;
    }

    // Case 3: the value size changed, so the row has to be rebuilt.
    int16_t diff = varDataTLen(value) - varDataTLen(pOldVal);
    int16_t newLen = kvRowLen(oldRow) + diff;
    ASSERT(newLen > 0);

    newRow = malloc(newLen);
    if (newRow == NULL) return -1;

    kvRowSetLen(newRow, newLen);
    kvRowSetNCols(newRow, kvRowNCols(oldRow));

    // Index entries and values located before the updated column
    int16_t idxBytes = (int16_t)(POINTER_DISTANCE(found, kvRowColIdx(oldRow)));
    ASSERT(idxBytes % sizeof(SColIdx) == 0);
    if (idxBytes > 0) {
      ASSERT(pHit->offset > 0);
      memcpy(kvRowColIdx(newRow), kvRowColIdx(oldRow), idxBytes);
      memcpy(kvRowValues(newRow), kvRowValues(oldRow), pHit->offset);
    }

    // The updated column itself
    int      colIdx = idxBytes / sizeof(SColIdx);
    SColIdx *pNewIdx = kvRowColIdxAt(newRow, colIdx);
    pNewIdx->colId = pHit->colId;
    pNewIdx->offset = pHit->offset;
    memcpy(kvRowColVal(newRow, pNewIdx), value, varDataTLen(value));

    // Columns after it keep their ids; their offsets shift by diff.
    if (kvRowNCols(newRow) - colIdx - 1 > 0) {
      for (int i = colIdx + 1; i < kvRowNCols(newRow); i++) {
        kvRowColIdxAt(newRow, i)->colId = kvRowColIdxAt(oldRow, i)->colId;
        kvRowColIdxAt(newRow, i)->offset = kvRowColIdxAt(oldRow, i)->offset + diff;
      }
      memcpy(kvRowColVal(newRow, kvRowColIdxAt(newRow, colIdx + 1)),
             kvRowColVal(oldRow, kvRowColIdxAt(oldRow, colIdx + 1)),
             POINTER_DISTANCE(kvRowEnd(oldRow), kvRowColVal(oldRow, kvRowColIdxAt(oldRow, colIdx + 1))));
    }

    *orow = newRow;
    free(oldRow);
    return 0;
  }

  // Case 2 (fixed-length): overwrite in place.
  memcpy(kvRowColVal(oldRow, pHit), value, TYPE_BYTES[type]);
  return 0;
}

H
TD-353  
Hongze Cheng 已提交
662
/**
 * Copy a key-value row to *buf and advance *buf past it.
 *
 * @param buf in/out write position; when `buf` itself is NULL nothing is
 *            written and only the length is reported
 * @param row row to encode
 * @return kvRowLen(row)
 */
int tdEncodeKVRow(void **buf, SKVRow row) {
  if (buf == NULL) {
    return kvRowLen(row);
  }

  kvRowCpy(*buf, row);
  *buf = POINTER_SHIFT(*buf, kvRowLen(row));

  return kvRowLen(row);
}

H
Hongze Cheng 已提交
672 673
/**
 * Decode a key-value row stored at `buf` by duplicating it with tdKVRowDup().
 *
 * @param buf position of the encoded row
 * @param row receives the duplicate (NULL when the duplication fails)
 * @return the position just past the row, or NULL when the duplication fails
 */
void *tdDecodeKVRow(void *buf, SKVRow *row) {
  SKVRow pDup = tdKVRowDup(buf);

  *row = pDup;
  if (pDup == NULL) {
    return NULL;
  }

  return POINTER_SHIFT(buf, kvRowLen(pDup));
}

H
Hongze Cheng 已提交
678
/**
 * Prepare a key-value row builder with room for 128 column-index entries
 * and a 1024-byte value buffer.
 *
 * On failure both pColIdx and buf are left NULL, so a later
 * tdDestroyKVRowBuilder() never passes a stale or uninitialized pointer to
 * taosTFree().
 *
 * @param pBuilder builder to initialize
 * @return 0 on success, -1 when an allocation fails
 */
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) {
  pBuilder->tCols = 128;
  pBuilder->nCols = 0;
  pBuilder->alloc = 1024;
  pBuilder->size = 0;
  pBuilder->buf = NULL;

  pBuilder->pColIdx = (SColIdx *)malloc(sizeof(SColIdx) * pBuilder->tCols);
  if (pBuilder->pColIdx == NULL) return -1;

  pBuilder->buf = malloc(pBuilder->alloc);
  if (pBuilder->buf == NULL) {
    free(pBuilder->pColIdx);
    pBuilder->pColIdx = NULL;  // do not leave a dangling pointer behind
    return -1;
  }

  return 0;
}

H
Hongze Cheng 已提交
693
/**
 * Release the value buffer and the column-index array of a key-value row
 * builder. The builder structure itself is not freed.
 */
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) {
  taosTFree(pBuilder->buf);
  taosTFree(pBuilder->pColIdx);
}

H
Hongze Cheng 已提交
698
/**
 * Empty a key-value row builder for reuse: the value size and the column
 * count go back to zero. The allocated buffers are kept.
 */
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
  pBuilder->size = 0;
  pBuilder->nCols = 0;
}

H
Hongze Cheng 已提交
703
/**
 * Build a standalone key-value row from the builder's current content:
 * row head, then the column-index entries, then the value bytes.
 *
 * @param pBuilder builder to read from (left unchanged)
 * @return a malloc'ed row, or NULL when the builder holds no index entries
 *         and no value bytes, or when the allocation fails
 */
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
  int payload = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
  if (payload == 0) {
    return NULL;
  }

  int total = payload + TD_KV_ROW_HEAD_SIZE;

  SKVRow pRow = malloc(total);
  if (pRow == NULL) {
    return NULL;
  }

  kvRowSetNCols(pRow, pBuilder->nCols);
  kvRowSetLen(pRow, total);

  memcpy(kvRowColIdx(pRow), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
  memcpy(kvRowValues(pRow), pBuilder->buf, pBuilder->size);

  return pRow;
}