tsdbWrite.c 3.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

18 19 20 21 22 23 24
/**
 * @brief Maximum timestamp key for each time precision.
 *  Approximate calculation:
 *  ms: 3600*1000*8765*1000         // 1970 + 1000 years
 *  us: 3600*1000000*8765*1000      // 1970 + 1000 years
 *  ns: 3600*1000000000*8765*292    // 1970 + 292 years
 */
H
Hongze Cheng 已提交
25
// Upper bound of an acceptable timestamp key, one entry per time precision.
int64_t tsMaxKeyByPrecision[] = {
    31556995200000L,       // ms
    31556995200000000L,    // us
    9214646400000000000L,  // ns
};
26

C
Cary Xu 已提交
27
// static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
H
Hongze Cheng 已提交
28

29 30 31 32
/**
 * @brief Validate a submit request and insert each of its table-data entries.
 *
 * The request is first checked by tsdbScanAndConvertSubmitMsg(); only when that
 * succeeds is every element of pMsg->aSubmitTbData handed to
 * tsdbInsertTableData().
 *
 * @param pTsdb    tsdb instance; pTsdb->mem must not be NULL
 * @param version  version forwarded to tsdbInsertTableData() for every entry
 * @param pMsg     submit request; a NULL request holds nothing to insert and
 *                 is treated as a successful no-op
 * @param pRsp     response object; it is currently not filled in here
 * @return 0 on success, -1 on failure. For validation and insertion failures
 *         terrno holds the error code.
 */
int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2 *pRsp) {
  int32_t arrSize = 0;
  int32_t affectedrows = 0;
  int32_t code = 0;

  // The response is not populated yet; affected-row reporting is still to be wired up.
  (void)pRsp;

  if (ASSERTS(pTsdb->mem != NULL, "vgId:%d, mem is NULL", TD_VID(pTsdb->pVnode))) {
    return -1;
  }

  // Nothing to validate or insert; also keeps the scan below from dereferencing NULL.
  if (pMsg == NULL) {
    return 0;
  }

  arrSize = taosArrayGetSize(pMsg->aSubmitTbData);

  // scan and convert
  code = tsdbScanAndConvertSubmitMsg(pTsdb, pMsg);
  if (code < 0) {
    // Publish the failure code so the check and the log below (and the caller)
    // see the real reason instead of a stale terrno.
    terrno = code;
    if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
      tsdbError("vgId:%d, failed to insert data since %s", TD_VID(pTsdb->pVnode), tstrerror(terrno));
    }
    return -1;
  }

  // loop to insert
  for (int32_t i = 0; i < arrSize; ++i) {
    if ((terrno = tsdbInsertTableData(pTsdb, version, taosArrayGet(pMsg->aSubmitTbData, i), &affectedrows)) < 0) {
      return -1;
    }
  }

  return 0;
}

65
/**
 * @brief Check that a row timestamp lies inside the closed range [minKey, maxKey].
 *
 * @param pTsdb   tsdb instance, used only to obtain the vgroup id for logging
 * @param uid     table uid, used only for logging
 * @param rowKey  timestamp of the row being checked
 * @param minKey  smallest accepted timestamp (inclusive)
 * @param maxKey  largest accepted timestamp (inclusive)
 * @param now     current timestamp, used only for logging
 * @return 0 when rowKey is in range, otherwise TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE
 *         after logging the offending values
 */
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, TSKEY rowKey, TSKEY minKey, TSKEY maxKey,
                                          TSKEY now) {
  // Fast path: the key is within bounds.
  if (rowKey >= minKey && rowKey <= maxKey) {
    return 0;
  }

  tsdbError("vgId:%d, table uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
            " maxKey %" PRId64 " row key %" PRId64,
            TD_VID(pTsdb->pVnode), uid, now, minKey, maxKey, rowKey);
  return TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
}

77
/**
 * @brief Check that every row timestamp in a submit request is within the accepted range.
 *
 * The accepted range is [now - tsTickPerMin[precision] * keep2,
 * tsMaxKeyByPrecision[precision]], with precision and keep2 taken from
 * pTsdb->keepCfg. Scanning stops at the first row that is out of range.
 *
 * @param pTsdb  tsdb instance supplying the keep configuration
 * @param pMsg   submit request; a NULL request contains no rows and passes
 * @return 0 when all rows are in range, otherwise the error code returned by
 *         tsdbCheckRowRange() for the first offending row
 */
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
  // tsdbInsertData() treats the request as optional, so tolerate NULL here as well.
  if (pMsg == NULL) {
    return 0;
  }

  int32_t       code = 0;
  STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
  TSKEY         now = taosGetTimestamp(pCfg->precision);
  TSKEY         minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
  TSKEY         maxKey = tsMaxKeyByPrecision[pCfg->precision];
  int32_t       size = taosArrayGetSize(pMsg->aSubmitTbData);

  for (int32_t i = 0; i < size; ++i) {
    SSubmitTbData *pData = TARRAY_GET_ELEM(pMsg->aSubmitTbData, i);

    if (pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      // Column format: the keys are read from the first column's data buffer.
      // NOTE(review): presumably column 0 is always the timestamp column - confirm.
      uint64_t  nColData = TARRAY_SIZE(pData->aCol);
      SColData *aColData = (SColData *)TARRAY_DATA(pData->aCol);
      if (nColData > 0) {
        int32_t nRows = aColData[0].nVal;
        TSKEY  *aKey = (TSKEY *)aColData[0].pData;
        for (int32_t r = 0; r < nRows; ++r) {
          if ((code = tsdbCheckRowRange(pTsdb, pData->uid, aKey[r], minKey, maxKey, now)) < 0) {
            return code;
          }
        }
      }
    } else {
      // Row format: each row carries its own timestamp.
      int32_t nRows = taosArrayGetSize(pData->aRowP);
      for (int32_t r = 0; r < nRows; ++r) {
        SRow *pRow = (SRow *)taosArrayGetP(pData->aRowP, r);
        if ((code = tsdbCheckRowRange(pTsdb, pData->uid, pRow->ts, minKey, maxKey, now)) < 0) {
          return code;
        }
      }
    }
  }

  return code;
}