parInsertUtil.h 7.0 KB
Newer Older
X
Xiaoyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef TDENGINE_PAR_INSERT_UTIL_H
#define TDENGINE_PAR_INSERT_UTIL_H

#include "parUtil.h"

struct SToken;

#define NEXT_TOKEN(pSql, sToken)                \
  do {                                          \
    int32_t index = 0;                          \
    sToken = tStrGetToken(pSql, &index, false); \
    pSql += index;                              \
  } while (0)

#define CHECK_CODE(expr)             \
  do {                               \
    int32_t code = expr;             \
    if (TSDB_CODE_SUCCESS != code) { \
      return code;                   \
    }                                \
  } while (0)

X
Xiaoyu Wang 已提交
38 39
#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED)

X
Xiaoyu Wang 已提交
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
typedef enum EOrderStatus {
  ORDER_STATUS_UNKNOWN = 0,
  ORDER_STATUS_ORDERED = 1,
  ORDER_STATUS_DISORDERED = 2,
} EOrderStatus;

typedef enum EValStat {
  VAL_STAT_HAS = 0x0,    // 0 means has val
  VAL_STAT_NONE = 0x01,  // 1 means no val
} EValStat;

typedef struct SBoundColumn {
  int32_t offset;   // all column offset value
  int32_t toffset;  // first part offset for SDataRow TODO: get offset from STSchema on future
  uint8_t valStat;  // EValStat. denote if current column bound or not(0 means has val, 1 means no val)
} SBoundColumn;

typedef struct {
  col_id_t schemaColIdx;
  col_id_t boundIdx;
  col_id_t finalIdx;
} SBoundIdxInfo;

typedef struct SParsedDataColInfo {
  col_id_t       numOfCols;
  col_id_t       numOfBound;
  uint16_t       flen;        // TODO: get from STSchema
  uint16_t       allNullLen;  // TODO: get from STSchema(base on SDataRow)
  uint16_t       extendedVarLen;
  uint16_t       boundNullLen;  // bound column len with all NULL value(without VarDataOffsetT/SColIdx part)
  col_id_t      *boundColumns;  // bound column idx according to schema
  SBoundColumn  *cols;
  SBoundIdxInfo *colIdxInfo;
  int8_t         orderStatus;  // bound columns
} SParsedDataColInfo;

typedef struct SInsertParseBaseContext {
  SParseContext *pComCxt;
  char          *pSql;
  SMsgBuf        msg;
} SInsertParseBaseContext;

typedef struct SInsertParseSyntaxCxt {
  SParseContext   *pComCxt;
  char            *pSql;
  SMsgBuf          msg;
  SParseMetaCache *pMetaCache;
} SInsertParseSyntaxCxt;

typedef struct SMemParam {
  SRowBuilder *rb;
  SSchema     *schema;
  int32_t      toffset;
  col_id_t     colIdx;
} SMemParam;

typedef struct {
  uint8_t rowType;  // default is 0, that is SDataRow
  int32_t rowSize;
} SMemRowBuilder;

typedef struct STableDataBlocks {
  int8_t      tsSource;     // where does the UNIX timestamp come from, server or client
  bool        ordered;      // if current rows are ordered or not
  int32_t     vgId;         // virtual group id
  int64_t     prevTS;       // previous timestamp, recorded to decide if the records array is ts ascending
  int32_t     numOfTables;  // number of tables in current submit block
  int32_t     rowSize;      // row size for current table
  uint32_t    nAllocSize;
  uint32_t    headerSize;  // header for table info (uid, tid, submit metadata)
  uint32_t    size;
  STableMeta *pTableMeta;  // the tableMeta of current table, the table meta will be used during submit, keep a ref to
                           // avoid to be removed from cache
  char              *pData;
  bool               cloned;
  int32_t            createTbReqLen;
  SParsedDataColInfo boundColumnInfo;
  SRowBuilder        rowBuilder;
} STableDataBlocks;

int32_t insGetExtendedRowSize(STableDataBlocks *pBlock);
void insGetSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx, int32_t *toffset, col_id_t *colIdx);
X
Xiaoyu Wang 已提交
122
int32_t insSetBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks *dataBuf, int32_t numOfRows, SMsgBuf *pMsg);
X
Xiaoyu Wang 已提交
123 124 125 126 127 128 129 130 131
int32_t insSchemaIdxCompar(const void *lhs, const void *rhs);
int32_t insBoundIdxCompar(const void *lhs, const void *rhs);
void    insSetBoundColumnInfo(SParsedDataColInfo *pColList, SSchema *pSchema, col_id_t numOfCols);
void    insDestroyBlockArrayList(SArray *pDataBlockList);
void    insDestroyBlockHashmap(SHashObj *pDataBlockHash);
int32_t insInitRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo);
int32_t insGetDataBlockFromList(SHashObj *pHashList, void *id, int32_t idLen, int32_t size, int32_t startOffset,
                                int32_t rowSize, STableMeta *pTableMeta, STableDataBlocks **dataBlocks,
                                SArray *pBlockList, SVCreateTbReq *pCreateTbReq);
132
int32_t insMergeTableDataBlocks(SHashObj *pHashObj, SArray **pVgDataBlocks);
X
Xiaoyu Wang 已提交
133 134 135 136 137 138 139 140
int32_t insBuildCreateTbMsg(STableDataBlocks *pBlocks, SVCreateTbReq *pCreateTbReq);
int32_t insAllocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize);
int32_t insCreateSName(SName *pName, struct SToken *pTableName, int32_t acctId, const char *dbName, SMsgBuf *pMsgBuf);
int32_t insFindCol(struct SToken *pColname, int32_t start, int32_t end, SSchema *pSchema);
void    insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag, int64_t suid, const char *sname,
                            SArray *tagName, uint8_t tagNum);
int32_t insMemRowAppend(SMsgBuf *pMsgBuf, const void *value, int32_t len, void *param);
int32_t insCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start);
X
Xiaoyu Wang 已提交
141
int32_t insBuildOutput(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks);
X
Xiaoyu Wang 已提交
142 143
void    insDestroyDataBlock(STableDataBlocks *pDataBlock);

X
Xiaoyu Wang 已提交
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
typedef struct SBoundColInfo {
  int32_t *pColIndex;  // bound index => schema index
  int32_t  numOfCols;
  int32_t  numOfBound;
} SBoundColInfo;

typedef struct STableDataCxt {
  STableMeta    *pMeta;
  STSchema      *pSchema;
  SBoundColInfo  boundColsInfo;
  SArray        *pValues;
  SVCreateTbReq *pCreateTblReq;
  SSubmitTbData  data;
} STableDataCxt;

typedef struct SVgroupDataCxt {
  int32_t     vgId;
  SSubmitReq2 data;
} SVgroupDataCxt;

int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo);
int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta,
                           SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt);
int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks);
int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks);
void    insDestroyTableDataCxtHashMap(SHashObj *pTableCxtHash);
void    insDestroyVgroupDataCxt(SVgroupDataCxt *pVgCxt);
void    insDestroyVgroupDataCxtList(SArray *pVgCxtList);
void    insDestroyVgroupDataCxtHashMap(SHashObj *pVgCxtHash);

X
Xiaoyu Wang 已提交
174
#endif  // TDENGINE_PAR_INSERT_UTIL_H