tsdbWrite.c 3.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

18 19 20 21 22 23 24 25 26
/**
 * @brief Largest accepted timestamp key, one entry per time precision.
 *
 * Entries are ordered ms, us, ns. The values are approximations of
 * "seconds per hour * ticks per second * ~8765 hours per year * years":
 *  ms: 3600 * 1000       * 8765 * 1000   // 1970 + 1000 years
 *  us: 3600 * 1000000    * 8765 * 1000   // 1970 + 1000 years
 *  ns: 3600 * 1000000000 * 8765 * 292    // 1970 + 292 years
 */
static int64_t tsMaxKeyByPrecision[] = {
    31556995200000L,       // milliseconds
    31556995200000000L,    // microseconds
    9214646400000000000L,  // nanoseconds
};

C
Cary Xu 已提交
27
// static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
H
Hongze Cheng 已提交
28

29 30 31 32
/**
 * @brief Validate a submit request and insert its per-table data.
 *
 * The whole request is first checked by tsdbScanAndConvertSubmitMsg(); only if
 * that succeeds is each element of pMsg->aSubmitTbData passed, in order, to
 * tsdbInsertTableData(). Insertion stops at the first failing table.
 *
 * @param pTsdb   tsdb instance; pTsdb->mem must be non-NULL (asserted).
 * @param version passed through unchanged to tsdbInsertTableData().
 * @param pMsg    submit request; NULL is treated as a request with no table data.
 * @param pRsp    response object; not populated by this function at present.
 * @return 0 on success, -1 on failure with terrno set to the failing call's code.
 */
int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2 *pRsp) {
  int32_t arrSize = 0;
  int32_t affectedrows = 0;
  int32_t code = 0;

  ASSERT(pTsdb->mem != NULL);

  // The scan below dereferences pMsg, so a NULL request must not reach it.
  if (pMsg == NULL) {
    return 0;
  }

  arrSize = taosArrayGetSize(pMsg->aSubmitTbData);

  // scan and convert
  code = tsdbScanAndConvertSubmitMsg(pTsdb, pMsg);
  if (code < 0) {
    // Publish the scan's own error code; otherwise the log line and the caller
    // would see whatever value terrno happened to hold before this call.
    terrno = code;
    if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
      tsdbError("vgId:%d, failed to insert data since %s", TD_VID(pTsdb->pVnode), tstrerror(terrno));
    }
    return -1;
  }

  // loop to insert
  for (int32_t i = 0; i < arrSize; ++i) {
    if ((terrno = tsdbInsertTableData(pTsdb, version, taosArrayGet(pMsg->aSubmitTbData, i), &affectedrows)) < 0) {
      return -1;
    }
  }

  // NOTE: pRsp is intentionally left untouched; filling in its affectedRows /
  // numOfRows fields is disabled in this version of the code.
  (void)pRsp;

  return 0;
}

63
/**
 * @brief Check that a row timestamp lies inside the closed range [minKey, maxKey].
 *
 * @param pTsdb  tsdb instance, used only to obtain the vgroup id for the log line.
 * @param uid    table uid, reported in the log line.
 * @param rowKey timestamp of the row being checked.
 * @param minKey smallest accepted timestamp (inclusive).
 * @param maxKey largest accepted timestamp (inclusive).
 * @param now    current timestamp, reported in the log line only.
 * @return 0 when rowKey is in range; otherwise logs an error and returns
 *         TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE.
 */
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, TSKEY rowKey, TSKEY minKey, TSKEY maxKey,
                                          TSKEY now) {
  // Common case first: the key is within bounds, nothing to report.
  if (minKey <= rowKey && rowKey <= maxKey) {
    return 0;
  }

  tsdbError("vgId:%d, table uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
            " maxKey %" PRId64 " row key %" PRId64,
            TD_VID(pTsdb->pVnode), uid, now, minKey, maxKey, rowKey);
  return TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
}

75
/**
 * @brief Range-check every row timestamp carried by a submit request.
 *
 * The lower bound is "now - tsTickPerMin[precision] * keep2" and the upper bound
 * is tsMaxKeyByPrecision[precision], both taken from pTsdb->keepCfg. Each table
 * entry is read either in column format (timestamps taken from the first column's
 * data buffer) or in row format (timestamp taken from each SRow), depending on
 * its SUBMIT_REQ_COLUMN_DATA_FORMAT flag.
 *
 * @param pTsdb tsdb instance supplying the keep configuration.
 * @param pMsg  submit request; dereferenced unconditionally, so it must be non-NULL.
 * @return the negative code of the first failing tsdbCheckRowRange() call,
 *         otherwise the result of the last check performed (0 if none ran).
 */
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
  STsdbKeepCfg *pKeep = &pTsdb->keepCfg;
  TSKEY         tsNow = taosGetTimestamp(pKeep->precision);
  TSKEY         tsLower = tsNow - tsTickPerMin[pKeep->precision] * pKeep->keep2;
  TSKEY         tsUpper = tsMaxKeyByPrecision[pKeep->precision];
  int32_t       nTables = taosArrayGetSize(pMsg->aSubmitTbData);
  int32_t       rc = 0;

  for (int32_t iTb = 0; iTb < nTables; ++iTb) {
    SSubmitTbData *pTbData = TARRAY_GET_ELEM(pMsg->aSubmitTbData, iTb);

    if (!(pTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT)) {
      // Row format: every SRow carries its own timestamp.
      int32_t nRows = taosArrayGetSize(pTbData->aRowP);
      for (int32_t iRow = 0; iRow < nRows; ++iRow) {
        SRow *pRow = (SRow *)taosArrayGetP(pTbData->aRowP, iRow);
        rc = tsdbCheckRowRange(pTsdb, pTbData->uid, pRow->ts, tsLower, tsUpper, tsNow);
        if (rc < 0) {
          return rc;
        }
      }
      continue;
    }

    // Column format: the first column's buffer is read as the array of keys.
    uint64_t  nCols = TARRAY_SIZE(pTbData->aCol);
    SColData *aCols = (SColData *)TARRAY_DATA(pTbData->aCol);
    if (nCols == 0) {
      continue;
    }

    int32_t nKeys = aCols[0].nVal;
    TSKEY  *aKeys = (TSKEY *)aCols[0].pData;
    for (int32_t iKey = 0; iKey < nKeys; ++iKey) {
      rc = tsdbCheckRowRange(pTsdb, pTbData->uid, aKeys[iKey], tsLower, tsUpper, tsNow);
      if (rc < 0) {
        return rc;
      }
    }
  }

  return rc;
}