dataformat.c 9.9 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
H
Hongze Cheng 已提交
15
#include "dataformat.h"
H
more  
hzcheng 已提交
16

H
hzcheng 已提交
17 18
static int tdFLenFromSchema(STSchema *pSchema);

H
hzcheng 已提交
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
/**
 * Allocate and initialize a standalone STColumn.
 *
 * BINARY and NCHAR columns record the caller-supplied width; every other type
 * takes its fixed width from TYPE_BYTES. The offset is left at -1 (unassigned).
 *
 * @param type  column data type; rejected when isValidDataType() fails
 * @param colId column ID
 * @param bytes column width, only used for BINARY/NCHAR
 *
 * @return the new column (release with tdFreeCol), or NULL if the type is
 *         invalid or the allocation fails
 */
STColumn *tdNewCol(int8_t type, int16_t colId, int16_t bytes) {
  if (!isValidDataType(type, 0)) {
    return NULL;
  }

  STColumn *pNewCol = (STColumn *)calloc(1, sizeof(*pNewCol));
  if (pNewCol != NULL) {
    colSetType(pNewCol, type);
    colSetColId(pNewCol, colId);
    colSetOffset(pNewCol, -1);
    if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
      colSetBytes(pNewCol, bytes);
    } else {
      colSetBytes(pNewCol, TYPE_BYTES[type]);
    }
  }

  return pNewCol;
}

/**
 * Release a column returned by tdNewCol. Passing NULL is harmless.
 */
void tdFreeCol(STColumn *pCol) {
  free(pCol);  // free(NULL) is a no-op, so no guard is needed
}

/**
 * Copy the whole column descriptor `src` into `dst` (byte-for-byte).
 */
void tdColCpy(STColumn *dst, STColumn *src) {
  memcpy(dst, src, sizeof(*dst));
}

/**
 * Overwrite the type, ID and width of an existing column in place.
 *
 * BINARY/NCHAR columns take `bytes` as their width; all other types use
 * TYPE_BYTES[type]. The column's offset field is left untouched.
 * `type` is not validated; it is used directly as an index into TYPE_BYTES.
 */
void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes) {
  int isVarLen = (type == TSDB_DATA_TYPE_BINARY) || (type == TSDB_DATA_TYPE_NCHAR);

  colSetType(pCol, type);
  colSetColId(pCol, colId);
  if (isVarLen) {
    colSetBytes(pCol, bytes);
  } else {
    colSetBytes(pCol, TYPE_BYTES[type]);
  }
}

/**
 * Create an empty STSchema with room for nCols columns.
 *
 * The returned schema has numOfCols == 0. Columns are added with
 * tdSchemaAppendCol(), which does not check capacity itself, so nCols must
 * cover every column that will be appended.
 *
 * @param nCols number of column slots to reserve (must be >= 0)
 *
 * @return a zero-initialized STSchema (release with tdFreeSchema), or NULL if
 *         nCols is negative, the size computation would overflow, or the
 *         allocation fails
 */
STSchema *tdNewSchema(int32_t nCols) {
  // nCols may come from decoded (untrusted) data: reject negative counts and
  // sizes that would wrap size_t instead of allocating a too-small block.
  if (nCols < 0) return NULL;
  if ((size_t)nCols > ((size_t)-1 - sizeof(STSchema)) / sizeof(STColumn)) return NULL;

  // Keep the size in size_t; an int32_t would silently truncate large values.
  size_t size = sizeof(STSchema) + sizeof(STColumn) * (size_t)nCols;

  STSchema *pSchema = (STSchema *)calloc(1, size);
  if (pSchema == NULL) return NULL;

  pSchema->numOfCols = 0;  // already zeroed by calloc; kept explicit for clarity

  return pSchema;
}

H
hzcheng 已提交
101 102 103
/**
 * Append a column to the schema
 */
H
TD-27  
hzcheng 已提交
104 105
int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes) {
  // if (pSchema->numOfCols >= pSchema->totalCols) return -1;
H
hzcheng 已提交
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
  if (!isValidDataType(type, 0)) return -1;

  STColumn *pCol = schemaColAt(pSchema, schemaNCols(pSchema));
  colSetType(pCol, type);
  colSetColId(pCol, colId);
  colSetOffset(pCol, -1);
  switch (type) {
    case TSDB_DATA_TYPE_BINARY:
    case TSDB_DATA_TYPE_NCHAR:
      colSetBytes(pCol, bytes);
      break;
    default:
      colSetBytes(pCol, TYPE_BYTES[type]);
      break;
  }

  pSchema->numOfCols++;

  return 0;
}

H
hzcheng 已提交
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
/**
 * Make an independent copy of a schema.
 *
 * A new schema with the same number of column slots is allocated, then the
 * header and all schemaNCols(pSchema) columns are copied into it byte-for-byte.
 *
 * @return the copy (release with tdFreeSchema), or NULL if allocation fails
 */
STSchema *tdDupSchema(STSchema *pSchema) {
  STSchema *pCopy = tdNewSchema(schemaNCols(pSchema));

  if (pCopy != NULL) {
    int32_t copyBytes = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
    memcpy(pCopy, pSchema, copyBytes);
  }
  return pCopy;
}

/**
 * Free a schema created by tdNewSchema, tdDupSchema or tdDecodeSchema.
 *
 * Passing NULL is a no-op. The condition must not be inverted: freeing only
 * when the pointer is NULL leaks every real schema.
 */
void tdFreeSchema(STSchema *pSchema) {
  free(pSchema);  // free(NULL) is defined to do nothing, so no guard is needed
}

/**
 * Recompute the offset of every column in the schema.
 *
 * Columns are placed back to back after the TD_DATA_ROW_HEAD_SIZE-byte row
 * header, each taking TYPE_BYTES[type] bytes of the row's fixed part.
 * The schema is assumed to be valid.
 */
void tdUpdateSchema(STSchema *pSchema) {
  int32_t nextOffset = TD_DATA_ROW_HEAD_SIZE;
  int     iCol = 0;

  while (iCol < schemaNCols(pSchema)) {
    STColumn *pCurCol = schemaColAt(pSchema, iCol);

    colSetOffset(pCurCol, nextOffset);
    nextOffset += TYPE_BYTES[pCurCol->type];
    iCol++;
  }
}

H
TD-27  
hzcheng 已提交
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
/**
 * Return the buffer size needed for tdEncodeSchema()'s output.
 *
 * Each column contributes the sizes of its type, colId and bytes members. The
 * header term is sizeof(STSchema), which is at least the size of the single
 * numOfCols member that tdEncodeSchema writes, so the result may over-estimate.
 */
int tdGetSchemaEncodeSize(STSchema *pSchema) {
  size_t perColSize =
      T_MEMBER_SIZE(STColumn, type) + T_MEMBER_SIZE(STColumn, colId) + T_MEMBER_SIZE(STColumn, bytes);

  return sizeof(STSchema) + schemaNCols(pSchema) * perColSize;
}

/**
 * Serialize a schema into the buffer at `dst`.
 *
 * Layout: the numOfCols member, then for each column its type, colId and
 * bytes members, all written with T_APPEND_MEMBER.
 *
 * @return `dst` after all appends, i.e. the next write position
 *         (T_APPEND_MEMBER is presumed to advance it — confirm in the header)
 */
void *tdEncodeSchema(void *dst, STSchema *pSchema) {
  T_APPEND_MEMBER(dst, pSchema, STSchema, numOfCols);

  for (int iCol = 0; iCol < schemaNCols(pSchema); ++iCol) {
    STColumn *pEncCol = schemaColAt(pSchema, iCol);

    T_APPEND_MEMBER(dst, pEncCol, STColumn, type);
    T_APPEND_MEMBER(dst, pEncCol, STColumn, colId);
    T_APPEND_MEMBER(dst, pEncCol, STColumn, bytes);
  }

  return dst;
}

/**
 * Decode a schema produced by tdEncodeSchema.
 *
 * Reads an int column count, then for each column an int8_t type, an int16_t
 * colId and an int32_t bytes, advancing *psrc past everything consumed.
 *
 * @param psrc in/out pointer to the encoded data
 *
 * @return a new schema (release with tdFreeSchema), or NULL if the column
 *         count is negative, allocation fails, or a column has an invalid data
 *         type. On failure *psrc may have been partially advanced.
 */
STSchema *tdDecodeSchema(void **psrc) {
  int numOfCols = 0;

  T_READ_MEMBER(*psrc, int, numOfCols);
  if (numOfCols < 0) return NULL;  // only possible with corrupt input

  STSchema *pSchema = tdNewSchema(numOfCols);
  if (pSchema == NULL) return NULL;

  for (int i = 0; i < numOfCols; i++) {
    int8_t  type = 0;
    int16_t colId = 0;
    int32_t bytes = 0;
    T_READ_MEMBER(*psrc, int8_t, type);
    T_READ_MEMBER(*psrc, int16_t, colId);
    T_READ_MEMBER(*psrc, int32_t, bytes);

    // Skipping a rejected column would leave the decoded schema with fewer
    // columns than were encoded, misaligning it with the original; fail instead.
    if (tdSchemaAppendCol(pSchema, type, colId, bytes) != 0) {
      free(pSchema);  // allocated by tdNewSchema as a single calloc block
      return NULL;
    }
  }

  return pSchema;
}

H
hzcheng 已提交
208 209 210 211 212 213 214 215
/**
 * Reset a data row to the empty state for `pSchema`.
 *
 * The fixed-part length is set to just the row header. The total row length is
 * set to the header plus the schema's fixed-part width (tdFLenFromSchema), so
 * variable-length data appended later lands after the fixed part.
 */
void tdInitDataRow(SDataRow row, STSchema *pSchema) {
  int fixedPartLen = tdFLenFromSchema(pSchema);

  dataRowSetFLen(row, TD_DATA_ROW_HEAD_SIZE);
  dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + fixedPartLen);
}

H
hzcheng 已提交
216 217 218 219 220 221 222 223 224 225
/**
 * Create a data row with maximum row length bytes.
 *
 * NOTE: THE AAPLICATION SHOULD MAKE SURE BYTES IS LARGE ENOUGH TO
 * HOLD THE WHOE ROW.
 *
 * @param bytes max bytes a row can take
 * @return SDataRow object for success
 *         NULL for failure
 */
H
hzcheng 已提交
226
SDataRow tdNewDataRow(int32_t bytes, STSchema *pSchema) {
H
hzcheng 已提交
227 228 229 230 231
  int32_t size = sizeof(int32_t) + bytes;

  SDataRow row = malloc(size);
  if (row == NULL) return NULL;

H
hzcheng 已提交
232
  tdInitDataRow(row, pSchema);
H
hzcheng 已提交
233 234 235 236

  return row;
}

H
hzcheng 已提交
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
/**
 * Compute the largest number of bytes a row of `pSchema` can occupy.
 *
 * The total is the row header, plus TYPE_BYTES[type] for every column, plus
 * the declared width of each BINARY/NCHAR column for its variable-length data.
 * The schema is assumed to be valid.
 */
int tdMaxRowBytesFromSchema(STSchema *pSchema) {
  int total = TD_DATA_ROW_HEAD_SIZE;

  for (int iCol = 0; iCol < schemaNCols(pSchema); iCol++) {
    STColumn *pCurCol = schemaColAt(pSchema, iCol);
    int       isVarLen = (pCurCol->type == TSDB_DATA_TYPE_BINARY) || (pCurCol->type == TSDB_DATA_TYPE_NCHAR);

    total += TYPE_BYTES[pCurCol->type];
    if (isVarLen) {
      total += pCurCol->bytes;
    }
  }
  return total;
}

H
hzcheng 已提交
256
/**
 * Allocate a data row sized by tdMaxRowBytesFromSchema(pSchema) and initialize
 * it for that schema. Returns NULL if the allocation fails.
 */
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
  int maxBytes = tdMaxRowBytesFromSchema(pSchema);
  return tdNewDataRow(maxBytes, pSchema);
}
H
hzcheng 已提交
257 258 259 260 261 262 263 264 265

/**
 * Release a data row allocated by tdNewDataRow / tdDataRowDup. NULL is accepted.
 */
void tdFreeDataRow(SDataRow row) {
  free(row);  // free(NULL) is a no-op
}

/**
H
hzcheng 已提交
266
 * Append a column value to the data row
H
hzcheng 已提交
267
 */
H
hzcheng 已提交
268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
int tdAppendColVal(SDataRow row, void *value, STColumn *pCol) {
  switch (colType(pCol))
  {
  case TSDB_DATA_TYPE_BINARY:
  case TSDB_DATA_TYPE_NCHAR:
    *(int32_t *)dataRowAt(row, dataRowFLen(row)) = dataRowLen(row);
    dataRowFLen(row) += TYPE_BYTES[colType(pCol)];
    memcpy((void *)dataRowAt(row, dataRowLen(row)), value, strlen(value));
    dataRowLen(row) += strlen(value);
    break;
  default:
    memcpy(dataRowAt(row, dataRowFLen(row)), value, TYPE_BYTES[colType(pCol)]);
    dataRowFLen(row) += TYPE_BYTES[colType(pCol)];
    break;
  }
H
TD-27  
hzcheng 已提交
283 284

  return 0;
H
hzcheng 已提交
285 286 287
}

/**
 * Reset `row` to the empty state for `pSchema`; identical to tdInitDataRow.
 */
void tdDataRowReset(SDataRow row, STSchema *pSchema) {
  tdInitDataRow(row, pSchema);
}
H
hzcheng 已提交
288

H
hzcheng 已提交
289
/**
 * Duplicate a data row.
 *
 * Allocates dataRowLen(row) bytes and copies the row into them with dataRowCpy.
 *
 * @return the copy (release with tdFreeDataRow), or NULL if allocation fails
 */
SDataRow tdDataRowDup(SDataRow row) {
  SDataRow pCopy = malloc(dataRowLen(row));

  if (pCopy != NULL) {
    dataRowCpy(pCopy, row);
  }
  return pCopy;
}
H
hzcheng 已提交
296

H
TD-34  
hzcheng 已提交
297 298 299 300 301 302 303 304 305 306 307
/**
 * Allocate an SDataCols container with `maxCols` column slots and a single
 * data buffer of maxRowSize * maxRows bytes shared by all columns.
 *
 * @param maxRowSize per-row byte budget used to size the shared buffer
 * @param maxCols    number of SDataCol slots to allocate
 * @param maxRows    maximum number of rows (points) the buffer is sized for
 *
 * @return the container (release with tdFreeDataCols), or NULL if any argument
 *         is negative, the buffer size would overflow, or an allocation fails
 */
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
  if (maxRowSize < 0 || maxCols < 0 || maxRows < 0) return NULL;

  // Multiply in size_t: in int, maxRowSize * maxRows overflows (undefined
  // behavior) for moderately large inputs such as 65536 * 65536.
  size_t bufSize = (size_t)maxRowSize * (size_t)maxRows;
  if (maxRows != 0 && bufSize / (size_t)maxRows != (size_t)maxRowSize) return NULL;

  SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * (size_t)maxCols);
  if (pCols == NULL) return NULL;

  pCols->maxRowSize = maxRowSize;
  pCols->maxCols = maxCols;
  pCols->maxPoints = maxRows;

  pCols->buf = malloc(bufSize);
  if (pCols->buf == NULL) {
    free(pCols);
    return NULL;
  }

  return pCols;
}

/**
 * Lay out `pCols` for rows of `pSchema`.
 *
 * Resets the container, sets numOfCols from the schema, and splits the shared
 * buffer into consecutive per-column regions. Column i starts at column i-1's
 * start plus (column i-1's bytes * maxPoints). Each SDataCol also receives the
 * column's type, bytes and colId, plus its offset within a data row: the row
 * header size plus the TYPE_BYTES of all preceding columns.
 *
 * NOTE(review): the schema's column count is not checked against
 * pCols->maxCols, and cols[0].pData is written even for an empty schema. The
 * caller must ensure the container has enough column slots.
 */
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
  tdResetDataCols(pCols);
  pCols->numOfCols = schemaNCols(pSchema);

  // The first column's region starts at the beginning of the shared buffer.
  pCols->cols[0].pData = pCols->buf;

  int rowOffset = TD_DATA_ROW_HEAD_SIZE;
  for (int iCol = 0; iCol < schemaNCols(pSchema); iCol++) {
    STColumn *pSchemaCol = schemaColAt(pSchema, iCol);
    SDataCol *pDataCol = &pCols->cols[iCol];

    if (iCol != 0) {
      STColumn *pPrevSchemaCol = schemaColAt(pSchema, iCol - 1);
      pDataCol->pData = (char *)(pCols->cols[iCol - 1].pData) + pPrevSchemaCol->bytes * pCols->maxPoints;
    }
    pDataCol->type = colType(pSchemaCol);
    pDataCol->bytes = colBytes(pSchemaCol);
    pDataCol->offset = rowOffset;
    pDataCol->colId = colColId(pSchemaCol);

    rowOffset += TYPE_BYTES[pDataCol->type];
  }
}

/**
 * Free an SDataCols created by tdNewDataCols, including its shared data
 * buffer. NULL is accepted.
 */
void tdFreeDataCols(SDataCols *pCols) {
  if (pCols == NULL) return;

  free(pCols->buf);  // free(NULL) is a no-op
  free(pCols);
}

/**
 * Empty the container: zero the point count and the length of each of the
 * maxCols column slots. The column layout (pData, type, offset, ...) is kept.
 */
void tdResetDataCols(SDataCols *pCols) {
  pCols->numOfPoints = 0;

  SDataCol *pCol = pCols->cols;
  for (int remaining = pCols->maxCols; remaining > 0; remaining--, pCol++) {
    pCol->len = 0;
  }
}

H
TD-34  
hzcheng 已提交
348
/**
 * Append one data row to the column store.
 *
 * For each of the first numOfCols columns, pCol->bytes bytes are copied from
 * the row at pCol->offset to the end of that column's data (pData + len), and
 * len grows by pCol->bytes. numOfPoints is then incremented. Nothing checks
 * against maxPoints.
 *
 * NOTE(review): tdInitDataCols advances each column's row offset by
 * TYPE_BYTES[type], but pCol->bytes can be larger for BINARY/NCHAR columns.
 * In that case this copy reads past the column's fixed-part slot. Confirm the
 * intended handling of variable-length columns.
 */
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) {
  for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
    SDataCol *pDst = &pCols->cols[iCol];
    char     *pTail = (char *)(pDst->pData) + pDst->len;

    memcpy(pTail, dataRowAt(row, pDst->offset), pDst->bytes);
    pDst->len += pDst->bytes;
  }

  pCols->numOfPoints += 1;
}
H
TD-34  
hzcheng 已提交
356 357
/**
 * Drop the first `pointsToPop` points from every column of `pCols`.
 *
 * The remaining data of each non-empty column is moved to the start of its
 * region, and the column lengths and point count are updated.
 * - Popping zero or a negative number of points is a no-op. A negative count
 *   would otherwise memmove from before the column region.
 * - Popping all points, or more than all, empties the container. Otherwise
 *   len and numOfPoints would become negative.
 *
 * NOTE(review): this function sizes each point as TYPE_BYTES[type], while
 * tdAppendDataRowToDataCol grows len by pCol->bytes per point. The two agree
 * only when bytes == TYPE_BYTES[type]; confirm for BINARY/NCHAR columns.
 */
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
  if (pointsToPop <= 0) return;

  int pointsLeft = pCols->numOfPoints - pointsToPop;

  if (pointsLeft <= 0) {
    // Everything is popped: no data needs to move.
    for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
      pCols->cols[iCol].len = 0;
    }
    pCols->numOfPoints = 0;
    return;
  }

  for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
    SDataCol *p_col = pCols->cols + iCol;
    if (p_col->len > 0) {
      p_col->len = TYPE_BYTES[p_col->type] * pointsLeft;
      // Source and destination overlap, hence memmove.
      memmove(p_col->pData, (char *)(p_col->pData) + TYPE_BYTES[p_col->type] * pointsToPop, p_col->len);
    }
  }
  pCols->numOfPoints = pointsLeft;
}
H
TD-34  
hzcheng 已提交
371

H
hzcheng 已提交
372 373 374 375 376 377 378 379 380 381 382
/**
 * Return the width of the fixed part of a data row for `pSchema`: the sum of
 * TYPE_BYTES[type] over all columns. The row header is not included.
 */
static int tdFLenFromSchema(STSchema *pSchema) {
  int fixedLen = 0;
  int nCols = schemaNCols(pSchema);

  for (int iCol = nCols - 1; iCol >= 0; iCol--) {
    fixedLen += TYPE_BYTES[schemaColAt(pSchema, iCol)->type];
  }
  return fixedLen;
}
H
TD-100  
hzcheng 已提交
384

H
hzcheng 已提交
385 386
/**
 * Placeholder. Judging by its name and parameters, this is presumably meant to
 * merge up to `rowsToMerge` rows of `source` into `target`.
 *
 * TODO: not implemented. It currently does nothing and always returns 0.
 */
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
  (void)target;
  (void)source;
  (void)rowsToMerge;
  return 0;
}