tdataformat.c 17.3 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
slguan 已提交
15
#include "tdataformat.h"
H
TD-166  
hzcheng 已提交
16
#include "wchar.h"
T
Tao Liu 已提交
17
#include "talgo.h"
H
more  
hzcheng 已提交
18

H
hzcheng 已提交
19 20 21 22 23 24 25 26 27 28
/**
 * Create a SSchema object with nCols columns
 * ASSUMPTIONS: VALID PARAMETERS
 *
 * @param nCols number of columns the schema has
 *
 * @return a STSchema object for success
 *         NULL for failure
 */
STSchema *tdNewSchema(int32_t nCols) {
H
TD-166  
hzcheng 已提交
29
  int32_t size = sizeof(STSchema) + sizeof(STColumn) * nCols;
H
hzcheng 已提交
30

H
TD-27  
hzcheng 已提交
31
  STSchema *pSchema = (STSchema *)calloc(1, size);
H
hzcheng 已提交
32
  if (pSchema == NULL) return NULL;
H
TD-166  
hzcheng 已提交
33

H
hzcheng 已提交
34
  pSchema->numOfCols = 0;
H
TD-166  
hzcheng 已提交
35 36 37
  pSchema->totalCols = nCols;
  pSchema->flen = 0;
  pSchema->tlen = 0;
H
hzcheng 已提交
38 39 40 41

  return pSchema;
}

H
hzcheng 已提交
42 43 44
/**
 * Append a column to the schema
 */
H
TD-166  
hzcheng 已提交
45 46
int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes) {
  if (!isValidDataType(type, 0) || pSchema->numOfCols >= pSchema->totalCols) return -1;
H
hzcheng 已提交
47 48 49 50

  STColumn *pCol = schemaColAt(pSchema, schemaNCols(pSchema));
  colSetType(pCol, type);
  colSetColId(pCol, colId);
H
TD-166  
hzcheng 已提交
51
  if (schemaNCols(pSchema) == 0) {
H
TD-166  
hzcheng 已提交
52 53
    colSetOffset(pCol, 0);
  } else {
H
TD-166  
hzcheng 已提交
54
    STColumn *pTCol = schemaColAt(pSchema, schemaNCols(pSchema)-1);
H
TD-166  
hzcheng 已提交
55
    colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
H
TD-166  
hzcheng 已提交
56
  }
H
hzcheng 已提交
57 58 59
  switch (type) {
    case TSDB_DATA_TYPE_BINARY:
    case TSDB_DATA_TYPE_NCHAR:
H
TD-166  
hzcheng 已提交
60 61
      colSetBytes(pCol, bytes); // Set as maximum bytes
      pSchema->tlen += (TYPE_BYTES[type] + sizeof(VarDataLenT) + bytes);
H
hzcheng 已提交
62 63 64
      break;
    default:
      colSetBytes(pCol, TYPE_BYTES[type]);
H
TD-166  
hzcheng 已提交
65
      pSchema->tlen += TYPE_BYTES[type];
H
hzcheng 已提交
66 67 68 69
      break;
  }

  pSchema->numOfCols++;
H
TD-166  
hzcheng 已提交
70 71 72
  pSchema->flen += TYPE_BYTES[type];

  ASSERT(pCol->offset < pSchema->flen);
H
hzcheng 已提交
73 74 75 76

  return 0;
}

H
hzcheng 已提交
77 78 79 80 81 82 83 84 85 86 87 88 89
/**
 * Duplicate the schema and return a new object.
 *
 * The copy is allocated with room for exactly the columns that have been
 * appended to pSchema (numOfCols). Because the whole header is copied from
 * the source, totalCols is reset afterwards so that the capacity recorded in
 * the copy never exceeds what was actually allocated for it.
 *
 * @param pSchema schema to copy
 *
 * @return the newly allocated copy, or NULL when the allocation fails
 */
STSchema *tdDupSchema(STSchema *pSchema) {
  STSchema *tSchema = tdNewSchema(schemaNCols(pSchema));
  if (tSchema == NULL) return NULL;

  int32_t size = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
  memcpy((void *)tSchema, (void *)pSchema, size);

  // memcpy brought over the source's totalCols, which may be larger than the
  // number of column slots allocated above; keep capacity and allocation in sync.
  tSchema->totalCols = schemaNCols(pSchema);

  return tSchema;
}

H
TD-27  
hzcheng 已提交
90 91 92 93
/**
 * Return the size of the encoded schema: the size of the totalCols member
 * plus, for every appended column, the sizes of its type, colId and bytes
 * members (all as reported by T_MEMBER_SIZE).
 */
int tdGetSchemaEncodeSize(STSchema *pSchema) {
  int perColumn = T_MEMBER_SIZE(STColumn, type) + T_MEMBER_SIZE(STColumn, colId) + T_MEMBER_SIZE(STColumn, bytes);

  return T_MEMBER_SIZE(STSchema, totalCols) + schemaNCols(pSchema) * perColumn;
}

/**
 * Encode a schema into dst and return the pointer that follows the encoded
 * data. The schema must be full (numOfCols == totalCols).
 *
 * Layout written: totalCols, then type, colId and bytes of each column.
 */
void *tdEncodeSchema(void *dst, STSchema *pSchema) {
  ASSERT(pSchema->numOfCols == pSchema->totalCols);

  T_APPEND_MEMBER(dst, pSchema, STSchema, totalCols);

  int iCol = 0;
  while (iCol < schemaNCols(pSchema)) {
    STColumn *pColumn = schemaColAt(pSchema, iCol);

    T_APPEND_MEMBER(dst, pColumn, STColumn, type);
    T_APPEND_MEMBER(dst, pColumn, STColumn, colId);
    T_APPEND_MEMBER(dst, pColumn, STColumn, bytes);

    iCol++;
  }

  return dst;
}

/**
 * Decode a schema from a binary.
 */
STSchema *tdDecodeSchema(void **psrc) {
H
TD-166  
hzcheng 已提交
120
  int totalCols = 0;
H
TD-27  
hzcheng 已提交
121

H
TD-166  
hzcheng 已提交
122
  T_READ_MEMBER(*psrc, int, totalCols);
H
TD-27  
hzcheng 已提交
123

H
TD-166  
hzcheng 已提交
124
  STSchema *pSchema = tdNewSchema(totalCols);
H
TD-27  
hzcheng 已提交
125
  if (pSchema == NULL) return NULL;
H
TD-166  
hzcheng 已提交
126
  for (int i = 0; i < totalCols; i++) {
H
TD-27  
hzcheng 已提交
127 128 129 130 131 132 133
    int8_t  type = 0;
    int16_t colId = 0;
    int32_t bytes = 0;
    T_READ_MEMBER(*psrc, int8_t, type);
    T_READ_MEMBER(*psrc, int16_t, colId);
    T_READ_MEMBER(*psrc, int32_t, bytes);

H
TD-166  
hzcheng 已提交
134
    tdSchemaAddCol(pSchema, type, colId, bytes);
H
TD-27  
hzcheng 已提交
135 136 137 138 139
  }

  return pSchema;
}

H
hzcheng 已提交
140 141 142
/**
 * Initialize a data row
 */
H
TD-166  
hzcheng 已提交
143
void tdInitDataRow(SDataRow row, STSchema *pSchema) { dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema)); }
H
hzcheng 已提交
144

H
TD-166  
hzcheng 已提交
145
/**
 * Allocate a data row sized by dataRowMaxBytesFromSchema() and initialize it
 * with tdInitDataRow().
 *
 * @return the new row (release with tdFreeDataRow()), or NULL when the
 *         allocation fails
 */
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
  int32_t  maxBytes = dataRowMaxBytesFromSchema(pSchema);
  SDataRow newRow = malloc(maxBytes);

  if (newRow != NULL) {
    tdInitDataRow(newRow, pSchema);
  }

  return newRow;
}
H
hzcheng 已提交
154

155 156 157 158
/**
 * Insert or update a tag value and refresh the related bookkeeping.
 * Not implemented yet: only checks that the tag row has a data buffer and
 * reports success.
 *
 * @return always 0
 */
int tdSetTagCol(SDataRow row, void *value, int16_t len, int8_t type, int16_t colId) {
  STagRow *pTagRow = (STagRow *)row;

  ASSERT(pTagRow->pData != NULL);
  // TODO: look the column up (see tdQueryTagColByID) and insert/update it.

  return 0;
}

/**
 * Delete a tag value and refresh the related bookkeeping.
 * Not implemented yet.
 *
 * @return always 0
 */
int tdDeleteTagCol(SDataRow row, int16_t colId) {
  // TODO: implement
  return 0;
}

T
Tao Liu 已提交
167 168
static int compTagId(const void *key1, const void *key2) {
  if (((STagCol *)key1)->colId > ((STagCol *)key2)->colId) {
T
Tao Liu 已提交
169
    return 1;
T
Tao Liu 已提交
170
  } else if (((STagCol *)key1)->colId == ((STagCol *)key2)->colId) {
T
Tao Liu 已提交
171 172 173 174 175 176
    return 0;
  } else {
    return -1;
  }
}

177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
/**
 * Look up the STagCol entry for colId in a tag row.
 *
 * The tagCols array is searched with taosbsearch() using compTagId as the
 * comparator; flags is passed through to taosbsearch() unchanged.
 *
 * @return the entry returned by taosbsearch(), NULL if none was found
 */
STagCol *tdQueryTagColByID(SDataRow row, int16_t colId, int flags) {
  STagRow *pTagRow = (STagRow *)row;
  ASSERT(pTagRow->pData != NULL);

  int16_t nCols = pTagRow->ncols;
  STagCol key = {colId, 0, 0};

  return taosbsearch(&key, pTagRow->tagCols, nCols, sizeof(STagCol), compTagId, flags);
}

/**
 * Find a tag value by colId.
 *
 * @param row   tag row to search; its data buffer must not be NULL
 * @param colId column id of the tag
 * @param type  out: receives the tag's column type when the tag is found;
 *              may be NULL if the caller does not need it
 *
 * @return pointer to the value inside the row's data buffer, or NULL when
 *         no tag with this colId exists
 */
void *tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type) {
  // Exact-match lookup; tdQueryTagColByID() also asserts that pData is set.
  STagCol *stCol = tdQueryTagColByID(row, colId, TD_EQ);
  if (NULL == stCol) {
    return NULL;
  }

  if (type != NULL) {
    *type = stCol->colType;
  }

  // Byte arithmetic must go through char *: arithmetic on void * is a
  // compiler extension, not standard C.
  return (char *)((STagRow *)row)->pData + stCol->offset;
}

T
Tao Liu 已提交
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
/**
 * Append one tag value to a tag row.
 *
 * The next STagCol entry records colId, type and the current dataLen as the
 * value's offset; the value bytes are then copied to the end of pData.
 * BINARY/NCHAR values are copied with the length given by varDataTLen(),
 * every other type with TYPE_BYTES[type].
 *
 * @param row   tag row to extend (must not be NULL)
 * @param value value to copy (must not be NULL)
 * @param type  data type of the value
 * @param bytes currently unused
 * @param colId column id of the tag
 *
 * @return 0 on success, -1 when the row has no free STagCol slot left
 */
int tdAppendTagColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int16_t colId) {
  ASSERT(value != NULL);
  ASSERT(row != NULL);

  STagRow *pTagrow = row;

  // tdNewTagRowFromSchema() stores sizeof(STagRow) + numofTags * sizeof(STagCol)
  // in len, so len bounds the number of STagCol slots that were allocated.
  // NOTE(review): assumes len is not repurposed elsewhere - confirm.
  size_t needed = sizeof(STagRow) + ((size_t)pTagrow->ncols + 1) * sizeof(STagCol);
  if (needed > (size_t)pTagrow->len) return -1;

  STagCol *pTagCol = &pTagrow->tagCols[pTagrow->ncols];
  pTagCol->colId = colId;
  pTagCol->colType = type;
  pTagCol->offset = pTagrow->dataLen;

  switch (type) {
    case TSDB_DATA_TYPE_BINARY:
    case TSDB_DATA_TYPE_NCHAR:
      memcpy((char *)pTagrow->pData + pTagrow->dataLen, value, varDataTLen(value));
      pTagrow->dataLen += varDataTLen(value);
      break;
    default:
      memcpy((char *)pTagrow->pData + pTagrow->dataLen, value, TYPE_BYTES[type]);
      pTagrow->dataLen += TYPE_BYTES[type];
      break;
  }

  pTagrow->ncols++;

  return 0;
}

T
Tao Liu 已提交
234 235 236 237 238 239
/**
 * Allocate an empty tag row with numofTags STagCol slots and a separate data
 * buffer of pSchema->tlen bytes.
 *
 * @return the new tag row (release with tdFreeTagRow()), or NULL when either
 *         allocation fails
 */
void *tdNewTagRowFromSchema(STSchema *pSchema, int16_t numofTags) {
  int32_t  headSize = sizeof(STagRow) + numofTags * sizeof(STagCol);
  STagRow *pTagRow = malloc(headSize);
  if (pTagRow == NULL) return NULL;

  int32_t dataSize = pSchema->tlen;
  pTagRow->pData = malloc(dataSize);
  if (pTagRow->pData == NULL) {
    free(pTagRow);
    return NULL;
  }

  // len covers the header and the STagCol slots only, not the data buffer.
  pTagRow->len = headSize;
  pTagRow->dataLen = 0;
  pTagRow->ncols = 0;

  return pTagRow;
}
/**
 * Free a tag row together with its data buffer. NULL is accepted.
 */
void tdFreeTagRow(SDataRow row) {
  if (row == NULL) return;

  STagRow *pTagRow = (STagRow *)row;
  free(pTagRow->pData);
  free(pTagRow);
}

/**
 * Duplicate a tag row: the header part (dataRowLen(row) bytes) is copied with
 * dataRowCpy(), then a fresh data buffer of dataLen bytes is allocated and
 * filled from the source's pData.
 *
 * @return the copy, or NULL when an allocation fails
 */
SDataRow tdTagRowDup(SDataRow row) {
  STagRow *pSrc = (STagRow *)row;
  STagRow *pDup = malloc(dataRowLen(row));
  if (pDup == NULL) return NULL;

  dataRowCpy(pDup, row);

  // The copied header still carries the source's pData; give the copy its own.
  pDup->pData = malloc(pDup->dataLen);
  if (pDup->pData == NULL) {
    free(pDup);
    return NULL;
  }

  memcpy(pDup->pData, pSrc->pData, pDup->dataLen);
  return pDup;
}

/**
 * Rebuild a tag row from a flat buffer in which the tag data directly follows
 * the header (the layout tdTagRowCpy() produces).
 *
 * The header (dataRowLen(row) bytes) is copied with dataRowCpy(); the dataLen
 * bytes located right after it are copied into a newly allocated pData.
 *
 * @return the decoded tag row, or NULL when an allocation fails
 */
SDataRow tdTagRowDecode(SDataRow row) {
  STagRow *pDecoded = malloc(dataRowLen(row));
  if (pDecoded == NULL) return NULL;

  dataRowCpy(pDecoded, row);

  pDecoded->pData = malloc(pDecoded->dataLen);
  if (pDecoded->pData == NULL) {
    free(pDecoded);
    return NULL;
  }

  char *pPayload = (char *)row + dataRowLen(row);
  memcpy(pDecoded->pData, pPayload, pDecoded->dataLen);

  return pDecoded;
}

T
Tao Liu 已提交
292 293 294 295 296 297 298 299
/**
 * Copy a tag row into a flat buffer: the header is copied with dataRowCpy()
 * and the tag data (dataLen bytes from src's pData) is placed directly after
 * it, at byte offset dataRowLen(src).
 *
 * @param dst destination buffer; the caller must provide at least
 *            dataRowLen(src) + dataLen bytes
 * @param src tag row to copy
 *
 * @return 0 on success, -1 when dst or src is NULL
 */
int tdTagRowCpy(SDataRow dst, SDataRow src) {
  if (dst == NULL || src == NULL) return -1;

  STagRow *pSrc = (STagRow *)src;

  dataRowCpy(dst, src);

  // Byte arithmetic must go through char *: arithmetic on void * is a
  // compiler extension, not standard C.
  void *pData = (char *)dst + dataRowLen(src);
  memcpy(pData, pSrc->pData, pSrc->dataLen);

  return 0;
}
H
hzcheng 已提交
300 301 302 303 304 305 306
/**
 * Free the SDataRow object. NULL is accepted (free(NULL) is a no-op).
 */
void tdFreeDataRow(SDataRow row) {
  free(row);
}

H
hzcheng 已提交
307
/**
 * Duplicate a data row into a new allocation of dataRowLen(row) bytes.
 *
 * @return the copy (release with tdFreeDataRow()), or NULL when the
 *         allocation fails
 */
SDataRow tdDataRowDup(SDataRow row) {
  SDataRow copy = malloc(dataRowLen(row));

  if (copy != NULL) {
    dataRowCpy(copy, row);
  }

  return copy;
}
H
hzcheng 已提交
314

H
TD-166  
hzcheng 已提交
315 316 317 318
/**
 * Initialize a data column from a schema column and carve its storage out of
 * a shared buffer.
 *
 * @param pDataCol  column to initialize
 * @param pCol      schema column supplying type, colId, bytes and offset
 * @param pBuf      in/out cursor into the shared buffer; advanced past the
 *                  space reserved for this column
 * @param maxPoints maximum number of rows the column must hold
 */
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) {
  pDataCol->type = colType(pCol);
  pDataCol->colId = colColId(pCol);
  pDataCol->bytes = colBytes(pCol);
  pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE;
  pDataCol->len = 0;

  if (pDataCol->type != TSDB_DATA_TYPE_BINARY && pDataCol->type != TSDB_DATA_TYPE_NCHAR) {
    // Fixed-length column: values are stored back to back, no offset array.
    pDataCol->spaceSize = pDataCol->bytes * maxPoints;
    pDataCol->dataOff = NULL;
    pDataCol->pData = *pBuf;
    *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize);
    return;
  }

  // Variable-length column: the offset array comes first, the value area
  // follows TYPE_BYTES[type] * maxPoints bytes later.
  pDataCol->spaceSize = (sizeof(VarDataLenT) + pDataCol->bytes) * maxPoints;
  pDataCol->dataOff = (VarDataOffsetT *)(*pBuf);
  pDataCol->pData = POINTER_SHIFT(*pBuf, TYPE_BYTES[pDataCol->type] * maxPoints);
  *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize + TYPE_BYTES[pDataCol->type] * maxPoints);
}

H
Haojun Liao 已提交
336
/**
 * Append one value to the end of a data column.
 *
 * @param pCol      column to append to
 * @param value     value to copy; for BINARY/NCHAR its length is taken from
 *                  varDataTLen(value)
 * @param numOfRows number of rows already stored in the column
 * @param maxPoints currently unused
 */
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) {
  ASSERT(pCol != NULL && value != NULL);

  if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR) {
    // Record where this row's value starts, then copy it and grow the column.
    pCol->dataOff[numOfRows] = pCol->len;
    memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
    pCol->len += varDataTLen(value);
    return;
  }

  ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
  memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
  pCol->len += pCol->bytes;
}

H
Haojun Liao 已提交
357 358
/**
 * Drop the first pointsToPop rows of a column and move the remaining rows to
 * the front. At least one row must remain.
 *
 * @param pCol        column to shrink
 * @param pointsToPop number of leading rows to remove
 * @param numOfRows   number of rows currently stored
 */
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows) {
  int remaining = numOfRows - pointsToPop;

  ASSERT(remaining > 0);

  if (pCol->type != TSDB_DATA_TYPE_BINARY && pCol->type != TSDB_DATA_TYPE_NCHAR) {
    ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
    pCol->len = TYPE_BYTES[pCol->type] * remaining;
    memmove(pCol->pData, POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * pointsToPop), pCol->len);
    return;
  }

  ASSERT(pCol->len > 0);

  // Everything before the first surviving row's offset is discarded.
  VarDataOffsetT firstKept = pCol->dataOff[pointsToPop];
  pCol->len = pCol->len - firstKept;
  ASSERT(pCol->len > 0);

  memmove(pCol->pData, POINTER_SHIFT(pCol->pData, firstKept), pCol->len);
  // Offsets are relative to pData, so rebuild them for the shifted values.
  dataColSetOffset(pCol, remaining);
}

H
TD-166  
hzcheng 已提交
376 377 378 379 380
/**
 * Check whether the first nEle values of a column are all NULL according to
 * isNull(). For BINARY/NCHAR columns the check is applied to the payload
 * (varDataVal) of each value.
 *
 * @return true when every checked value is NULL (also when nEle <= 0)
 */
bool isNEleNull(SDataCol *pCol, int nEle) {
  int isVarLen = (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR);

  for (int iRow = 0; iRow < nEle; iRow++) {
    if (isVarLen) {
      if (!isNull(varDataVal(tdGetColDataOfRow(pCol, iRow)), pCol->type)) return false;
    } else {
      if (!isNull(tdGetColDataOfRow(pCol, iRow), pCol->type)) return false;
    }
  }

  return true;
}

/**
 * Overwrite the first nEle rows of a column with NULL values and set the
 * column length accordingly.
 *
 * @param pCol      column to fill
 * @param nEle      number of rows to set to NULL
 * @param maxPoints currently unused
 */
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
  if (pCol->type != TSDB_DATA_TYPE_BINARY && pCol->type != TSDB_DATA_TYPE_NCHAR) {
    setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
    pCol->len = TYPE_BYTES[pCol->type] * nEle;
    return;
  }

  // Variable-length column: write one minimal NULL value per row, recording
  // each row's start offset as we go.
  pCol->len = 0;
  for (int iRow = 0; iRow < nEle; iRow++) {
    char *pVal = (char *)pCol->pData + pCol->len;

    pCol->dataOff[iRow] = pCol->len;
    varDataLen(pVal) = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE;
    setNull(pVal + sizeof(VarDataLenT), pCol->type, pCol->bytes);
    pCol->len += varDataTLen(pVal);
  }
}

H
TD-166  
hzcheng 已提交
414 415 416
/**
 * Rebuild the offset array of a BINARY/NCHAR column by walking the first
 * nEle values stored back to back in pData; each value's size is taken from
 * varDataTLen().
 */
void dataColSetOffset(SDataCol *pCol, int nEle) {
  ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));

  void *         pVal = pCol->pData;
  VarDataOffsetT running = 0;

  int iRow = 0;
  while (iRow < nEle) {
    pCol->dataOff[iRow] = running;
    running += varDataTLen(pVal);
    pVal = POINTER_SHIFT(pVal, varDataTLen(pVal));
    iRow++;
  }
}

H
TD-166  
hzcheng 已提交
428
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
H
TD-34  
hzcheng 已提交
429 430 431 432 433 434
  SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols);
  if (pCols == NULL) return NULL;

  pCols->maxRowSize = maxRowSize;
  pCols->maxCols = maxCols;
  pCols->maxPoints = maxRows;
H
TD-166  
hzcheng 已提交
435
  pCols->bufSize = maxRowSize * maxRows;
H
TD-34  
hzcheng 已提交
436

H
TD-166  
hzcheng 已提交
437
  pCols->buf = malloc(pCols->bufSize);
H
TD-34  
hzcheng 已提交
438 439
  if (pCols->buf == NULL) {
    free(pCols);
H
TD-34  
hzcheng 已提交
440 441
    return NULL;
  }
H
TD-34  
hzcheng 已提交
442

H
TD-34  
hzcheng 已提交
443 444 445 446 447 448 449 450
  return pCols;
}

/**
 * Reset a column set and lay its columns out in the shared buffer according
 * to the schema: one dataColInit() call per schema column, each taking its
 * storage from where the previous one ended.
 */
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
  tdResetDataCols(pCols);
  pCols->numOfCols = schemaNCols(pSchema);

  void *pCursor = pCols->buf;
  int   iCol = 0;
  while (iCol < schemaNCols(pSchema)) {
    dataColInit(pCols->cols + iCol, schemaColAt(pSchema, iCol), &pCursor, pCols->maxPoints);
    // The columns laid out so far must still fit into the shared buffer.
    ASSERT((char *)pCursor - (char *)(pCols->buf) <= pCols->bufSize);
    iCol++;
  }
}

/**
 * Free a column set together with its shared data buffer. NULL is accepted.
 */
void tdFreeDataCols(SDataCols *pCols) {
  if (pCols == NULL) return;

  tfree(pCols->buf);
  free(pCols);
}

H
TD-100  
hzcheng 已提交
465
/**
 * Duplicate a column set.
 *
 * The copy gets its own buffer of the same dimensions; every column's pData
 * (and dataOff for BINARY/NCHAR columns) points at the same byte offset in
 * the new buffer as the original does in the source buffer.
 *
 * @param pDataCols column set to copy
 * @param keepData  when true, the row count, the column lengths and the
 *                  stored data are copied too; when false only the layout is
 *                  duplicated
 *
 * @return the copy (release with tdFreeDataCols()), or NULL when an
 *         allocation fails
 */
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
  SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints);
  if (pRet == NULL) return NULL;

  pRet->numOfCols = pDataCols->numOfCols;
  pRet->sversion = pDataCols->sversion;
  if (keepData) pRet->numOfRows = pDataCols->numOfRows;

  for (int i = 0; i < pDataCols->numOfCols; i++) {
    SDataCol *pSrc = pDataCols->cols + i;
    SDataCol *pDst = pRet->cols + i;
    bool      isVarLen = (pSrc->type == TSDB_DATA_TYPE_BINARY || pSrc->type == TSDB_DATA_TYPE_NCHAR);

    pDst->type = pSrc->type;
    pDst->colId = pSrc->colId;
    pDst->bytes = pSrc->bytes;
    pDst->offset = pSrc->offset;
    pDst->spaceSize = pSrc->spaceSize;

    // Rebase the pointers: same distance from the start of the new buffer.
    pDst->pData = (void *)((char *)pRet->buf + ((char *)(pSrc->pData) - (char *)(pDataCols->buf)));

    if (isVarLen) {
      ASSERT(pSrc->dataOff != NULL);
      // Use the offset type of the field itself, as dataColInit() does.
      pDst->dataOff = (VarDataOffsetT *)((char *)pRet->buf + ((char *)(pSrc->dataOff) - (char *)(pDataCols->buf)));
    }

    if (keepData) {
      pDst->len = pSrc->len;
      memcpy(pDst->pData, pSrc->pData, pSrc->len);
      if (isVarLen) {
        memcpy(pDst->dataOff, pSrc->dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints);
      }
    }
  }

  return pRet;
}

H
TD-34  
hzcheng 已提交
500
/**
 * Empty a column set: the row count is set to 0 and dataColReset() is
 * applied to every one of the maxCols column slots.
 */
void tdResetDataCols(SDataCols *pCols) {
  pCols->numOfRows = 0;

  int iCol = 0;
  while (iCol < pCols->maxCols) {
    dataColReset(pCols->cols + iCol);
    iCol++;
  }
}

H
TD-34  
hzcheng 已提交
507
/**
 * Append one data row to a column set, splitting it into per-column values.
 * The row's key must be greater than the last key already stored.
 */
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) {
  ASSERT(dataColsKeyLast(pCols) < dataRowKey(row));

  int iCol = 0;
  while (iCol < pCols->numOfCols) {
    SDataCol *pDataCol = pCols->cols + iCol;
    void *    pVal = tdGetRowDataOfCol(row, pDataCol->type, pDataCol->offset);

    dataColAppendVal(pDataCol, pVal, pCols->numOfRows, pCols->maxPoints);
    iCol++;
  }

  pCols->numOfRows++;
}
H
TD-166  
hzcheng 已提交
518

H
TD-34  
hzcheng 已提交
519 520
/**
 * Pop the first pointsToPop rows from a column set. When no row would be
 * left, the whole set is reset instead.
 */
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
  int remaining = pCols->numOfRows - pointsToPop;

  if (remaining > 0) {
    for (int i = 0; i < pCols->numOfCols; i++) {
      dataColPopPoints(pCols->cols + i, pointsToPop, pCols->numOfRows);
    }
    pCols->numOfRows = remaining;
  } else {
    tdResetDataCols(pCols);
  }
}
H
TD-34  
hzcheng 已提交
533

H
hzcheng 已提交
534
/**
 * Merge the first rowsToMerge rows of source into target.
 *
 * When every key in target is smaller than the first key of source the rows
 * are simply appended. Otherwise target is duplicated and rebuilt by merging
 * the duplicate with source via tdMergeTwoDataCols().
 *
 * @return 0 on success, -1 when the temporary duplicate cannot be allocated
 */
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
  ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
  ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
  ASSERT(target->numOfCols == source->numOfCols);

  if (dataColsKeyLast(target) < dataColsKeyFirst(source)) {
    // No overlap: plain append, row by row.
    for (int iRow = 0; iRow < rowsToMerge; iRow++) {
      for (int iCol = 0; iCol < source->numOfCols; iCol++) {
        dataColAppendVal(target->cols + iCol, tdGetColDataOfRow(source->cols + iCol, iRow), target->numOfRows,
                         target->maxPoints);
      }
      target->numOfRows++;
    }
    return 0;
  }

  // Overlapping key ranges: merge a snapshot of target with source back
  // into target.
  SDataCols *pSnapshot = tdDupDataCols(target, true);
  if (pSnapshot == NULL) return -1;

  int iter1 = 0;
  int iter2 = 0;
  tdMergeTwoDataCols(target, pSnapshot, &iter1, source, &iter2, pSnapshot->numOfRows + rowsToMerge);

  tdFreeDataCols(pSnapshot);
  return 0;
}
H
TD-100  
hzcheng 已提交
565

H
TD-100  
hzcheng 已提交
566
/**
 * Merge rows from two key-ordered column sets into target.
 *
 * target is reset first. Rows are then taken from whichever source has the
 * smaller current key (read from column 0) until target holds tRows rows or
 * both sources are exhausted. On equal keys the row from src1 is taken and
 * the row from src2 is skipped.
 *
 * @param iter1 in/out read position in src1
 * @param iter2 in/out read position in src2
 * @param tRows upper bound on the number of rows written to target
 */
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows) {
  // TODO: add resolve duplicate key here
  tdResetDataCols(target);

  while (target->numOfRows < tRows) {
    int src1Done = (*iter1 >= src1->numOfRows);
    int src2Done = (*iter2 >= src2->numOfRows);
    if (src1Done && src2Done) break;

    // An exhausted source behaves as if its next key were INT64_MAX.
    TSKEY key1 = src1Done ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
    TSKEY key2 = src2Done ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];

    SDataCols *pFrom = (key1 <= key2) ? src1 : src2;
    int *      pIter = (key1 <= key2) ? iter1 : iter2;

    for (int iCol = 0; iCol < pFrom->numOfCols; iCol++) {
      ASSERT(target->cols[iCol].type == pFrom->cols[iCol].type);
      dataColAppendVal(&(target->cols[iCol]), tdGetColDataOfRow(pFrom->cols + iCol, *pIter), target->numOfRows,
                       target->maxPoints);
    }

    target->numOfRows++;
    (*pIter)++;

    // Equal keys: the src1 row was taken above, so drop the src2 row.
    if (key1 == key2) (*iter2)++;
  }
}