/* parInsertData.h */
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef TDENGINE_DATABLOCKMGT_H
#define TDENGINE_DATABLOCKMGT_H

#include "catalog.h"
#include "os.h"
#include "ttypes.h"
#include "tname.h"

// Evaluates to non-zero when the `orderStatus` field reached through `spd` equals ORDER_STATUS_ORDERED.
// `spd` is parenthesized so that arbitrary pointer expressions (e.g. `base + i`) expand correctly.
#define IS_DATA_COL_ORDERED(spd) (((spd)->orderStatus) == (int8_t)ORDER_STATUS_ORDERED)

// Order status values; IS_DATA_COL_ORDERED compares an int8_t `orderStatus` field against ORDER_STATUS_ORDERED.
typedef enum EOrderStatus {
  ORDER_STATUS_UNKNOWN    = 0,  // order status not known
  ORDER_STATUS_ORDERED    = 1,  // ordered
  ORDER_STATUS_DISORDERED = 2,  // not ordered
} EOrderStatus;

// Value presence flag for a column (see SBoundColumn::valStat).
typedef enum EValStat {
  VAL_STAT_HAS  = 0x00,  // 0: the column has a value
  VAL_STAT_NONE = 0x01,  // 1: the column has no value
} EValStat;

// Offsets and value-presence flag for a single column.
typedef struct SBoundColumn {
  int32_t offset;   // all column offset value
  int32_t toffset;  // first part offset for SDataRow. TODO: get offset from STSchema in the future
  uint8_t valStat;  // EValStat: denotes whether the column is bound (0 means has val, 1 means no val)
} SBoundColumn;

typedef struct {
44 45 46
  col_id_t schemaColIdx;
  col_id_t boundIdx;
  col_id_t finalIdx;
47 48 49
} SBoundIdxInfo;

typedef struct SParsedDataColInfo {
50 51
  col_id_t       numOfCols;
  col_id_t       numOfBound;
52
  uint16_t       flen;        // TODO: get from STSchema
C
Cary Xu 已提交
53
  uint16_t       allNullLen;  // TODO: get from STSchema(base on SDataRow)
54
  uint16_t       extendedVarLen;
C
Cary Xu 已提交
55
  uint16_t       boundNullLen;    // bound column len with all NULL value(without VarDataOffsetT/SColIdx part)
56 57
  col_id_t      *boundColumns;    // bound column idx according to schema
  SBoundColumn  *cols;
C
Cary Xu 已提交
58
  SBoundIdxInfo *colIdxInfo;
59 60 61 62
  int8_t         orderStatus;  // bound columns
} SParsedDataColInfo;

// Row type and row size pair.
typedef struct {
  uint8_t rowType;  // default is 0, that is SDataRow
  int32_t rowSize;
} SMemRowBuilder;

typedef struct STableDataBlocks {
  int8_t      tsSource;     // where does the UNIX timestamp come from, server or client
  bool        ordered;      // if current rows are ordered or not
X
Xiaoyu Wang 已提交
70
  int32_t     vgId;         // virtual group id
71 72 73 74 75 76 77 78 79 80
  int64_t     prevTS;       // previous timestamp, recorded to decide if the records array is ts ascending
  int32_t     numOfTables;  // number of tables in current submit block
  int32_t     rowSize;      // row size for current table
  uint32_t    nAllocSize;
  uint32_t    headerSize;   // header for table info (uid, tid, submit metadata)
  uint32_t    size;
  STableMeta *pTableMeta;   // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
  char       *pData;
  bool        cloned;
  STagData    tagData; 
D
dapan1121 已提交
81 82
  char        tableName[TSDB_TABLE_NAME_LEN];
  char        dbFName[TSDB_DB_FNAME_LEN];
83 84
  
  SParsedDataColInfo boundColumnInfo;
C
Cary Xu 已提交
85
  SRowBuilder        rowBuilder;
86 87 88
} STableDataBlocks;

/**
 * Compute the extended row size for the table held by `pBlock`.
 *
 * The result is the block's rowSize plus TD_ROW_HEAD_LEN, minus sizeof(TSKEY), plus the
 * bound-column extendedVarLen, plus the bitmap bytes for (numOfColumns - 1) columns.
 * Asserts that the block's rowSize matches the rowSize recorded in its table meta.
 *
 * @param pBlock data block whose pTableMeta and boundColumnInfo are read
 * @return the extended row size in bytes
 */
static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) {
  STableComInfo *pTableInfo = &pBlock->pTableMeta->tableInfo;
  ASSERT(pBlock->rowSize == pTableInfo->rowSize);
  return pBlock->rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + pBlock->boundColumnInfo.extendedVarLen +
         (int32_t)TD_BITMAP_BYTES(pTableInfo->numOfColumns - 1);
}

95 96 97
static FORCE_INLINE void getSTSRowAppendInfo(SSchema *pSchema, uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx,
                                             int32_t *toffset, col_id_t *colIdx) {
  col_id_t schemaIdx = 0;
98
  if (IS_DATA_COL_ORDERED(spd)) {
99
    schemaIdx = spd->boundColumns[idx] - PRIMARYKEY_TIMESTAMP_COL_ID;
C
Cary Xu 已提交
100
    if (TD_IS_TP_ROW_T(rowType)) {
101
      *toffset = (spd->cols + schemaIdx)->toffset;  // the offset of firstPart
C
Cary Xu 已提交
102
      *colIdx = schemaIdx;
103 104
    } else {
      *toffset = idx * sizeof(SColIdx);  // the offset of SColIdx
C
Cary Xu 已提交
105
      *colIdx = idx;
106 107 108
    }
  } else {
    ASSERT(idx == (spd->colIdxInfo + idx)->boundIdx);
C
Cary Xu 已提交
109
    schemaIdx = (spd->colIdxInfo + idx)->schemaColIdx - PRIMARYKEY_TIMESTAMP_COL_ID;
C
Cary Xu 已提交
110
    if (TD_IS_TP_ROW_T(rowType)) {
111
      *toffset = (spd->cols + schemaIdx)->toffset;
C
Cary Xu 已提交
112
      *colIdx = schemaIdx;
113 114
    } else {
      *toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SColIdx);
C
Cary Xu 已提交
115
      *colIdx = (spd->colIdxInfo + idx)->finalIdx;
116 117 118 119
    }
  }
}

D
dapan1121 已提交
120
static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks* dataBuf, int32_t numOfRows) {
X
Xiaoyu Wang 已提交
121
  pBlocks->suid = (TSDB_NORMAL_TABLE == dataBuf->pTableMeta->tableType ? dataBuf->pTableMeta->uid : dataBuf->pTableMeta->suid);
D
dapan1121 已提交
122 123
  pBlocks->uid = dataBuf->pTableMeta->uid;
  pBlocks->sversion = dataBuf->pTableMeta->sversion;
124 125 126 127 128 129 130 131 132 133 134

  if (pBlocks->numOfRows + numOfRows >= INT16_MAX) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  } else {
    pBlocks->numOfRows += numOfRows;
    return TSDB_CODE_SUCCESS;
  }
}

int32_t schemaIdxCompar(const void *lhs, const void *rhs);
int32_t boundIdxCompar(const void *lhs, const void *rhs);
135
void    setBoundColumnInfo(SParsedDataColInfo *pColList, SSchema *pSchema, col_id_t numOfCols);
136
void destroyBoundColumnInfo(SParsedDataColInfo* pColList);
137
void destroyBlockArrayList(SArray* pDataBlockList);
138
void destroyBlockHashmap(SHashObj* pDataBlockHash);
C
Cary Xu 已提交
139
int  initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo);
140 141
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
142
    const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList);
143
int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, SArray** pVgDataBlocks);
144 145

#endif  // TDENGINE_DATABLOCKMGT_H