parInsertTest.cpp 10.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16 17
#include <functional>

18 19
#include <gtest/gtest.h>

20
#include "mockCatalogService.h"
wafwerar's avatar
wafwerar 已提交
21
#include "os.h"
X
Xiaoyu Wang 已提交
22
#include "parInt.h"
23 24

using namespace std;
25
using namespace std::placeholders;
26 27 28
using namespace testing;

namespace {
X
Xiaoyu Wang 已提交
29 30
string toString(int32_t code) { return tstrerror(code); }
}  // namespace
31 32 33 34 35 36 37 38 39

// syntax:
// INSERT INTO
//   tb_name
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
//       [(field1_name, ...)]
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
//   [...];
class InsertTest : public Test {
X
Xiaoyu Wang 已提交
40
 protected:
X
Xiaoyu Wang 已提交
41
  InsertTest() : res_(nullptr) {}
42 43
  ~InsertTest() { reset(); }

44 45 46 47 48 49 50
  void setDatabase(const string& acctId, const string& db) {
    acctId_ = acctId;
    db_ = db;
  }

  void bind(const char* sql) {
    reset();
H
Haojun Liao 已提交
51
    cxt_.acctId = atoi(acctId_.c_str());
X
Xiaoyu Wang 已提交
52
    cxt_.db = (char*)db_.c_str();
53 54 55 56
    strcpy(sqlBuf_, sql);
    cxt_.sqlLen = strlen(sql);
    sqlBuf_[cxt_.sqlLen] = '\0';
    cxt_.pSql = sqlBuf_;
X
Xiaoyu Wang 已提交
57
    cxt_.pUser = "root";
58 59 60
  }

  int32_t run() {
61
    code_ = parseInsertSql(&cxt_, &res_, nullptr);
62 63 64 65 66 67
    if (code_ != TSDB_CODE_SUCCESS) {
      cout << "code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
    }
    return code_;
  }

68
  int32_t runAsync() {
X
Xiaoyu Wang 已提交
69
    cxt_.async = true;
70 71 72
    bool                                                           request = true;
    unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache(
        new SParseMetaCache(), std::bind(_destoryParseMetaCache, _1, cref(request)));
73
    code_ = parseInsertSyntax(&cxt_, &res_, metaCache.get());
74 75 76 77 78
    if (code_ != TSDB_CODE_SUCCESS) {
      cout << "parseInsertSyntax code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
      return code_;
    }

79 80
    unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
                                                               MockCatalogService::destoryCatalogReq);
X
Xiaoyu Wang 已提交
81
    code_ = buildCatalogReq(&cxt_, metaCache.get(), catalogReq.get());
82 83 84 85 86
    if (code_ != TSDB_CODE_SUCCESS) {
      cout << "buildCatalogReq code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
      return code_;
    }

87 88
    unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
    g_mockCatalogService->catalogGetAllMeta(catalogReq.get(), metaData.get());
89

90 91
    metaCache.reset(new SParseMetaCache());
    request = false;
X
Xiaoyu Wang 已提交
92
    code_ = putMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get(), true);
93 94 95 96 97
    if (code_ != TSDB_CODE_SUCCESS) {
      cout << "putMetaDataToCache code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
      return code_;
    }

98
    code_ = parseInsertSql(&cxt_, &res_, metaCache.get());
99 100 101 102 103 104 105 106
    if (code_ != TSDB_CODE_SUCCESS) {
      cout << "parseInsertSql code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
      return code_;
    }

    return code_;
  }

107
  void dumpReslut() {
X
Xiaoyu Wang 已提交
108
    SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
X
Xiaoyu Wang 已提交
109 110 111
    size_t             num = taosArrayGetSize(pStmt->pDataBlocks);
    cout << "payloadType:" << (int32_t)pStmt->payloadType << ", insertType:" << pStmt->insertType
         << ", numOfVgs:" << num << endl;
112
    for (size_t i = 0; i < num; ++i) {
X
Xiaoyu Wang 已提交
113
      SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(pStmt->pDataBlocks, i);
X
Xiaoyu Wang 已提交
114
      cout << "vgId:" << vg->vg.vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl;
S
Shengliang Guan 已提交
115
      SSubmitReq* submit = (SSubmitReq*)vg->pData;
116
      cout << "length:" << ntohl(submit->length) << ", numOfBlocks:" << ntohl(submit->numOfBlocks) << endl;
X
Xiaoyu Wang 已提交
117
      int32_t     numOfBlocks = ntohl(submit->numOfBlocks);
118 119 120
      SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
      for (int32_t i = 0; i < numOfBlocks; ++i) {
        cout << "Block:" << i << endl;
C
Cary Xu 已提交
121 122 123
        cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << be64toh(blk->suid) << ", sversion:" << ntohl(blk->sversion)
             << ", dataLen:" << ntohl(blk->dataLen) << ", schemaLen:" << ntohl(blk->schemaLen)
             << ", numOfRows:" << ntohl(blk->numOfRows) << endl;
124 125 126 127 128
        blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
      }
    }
  }

C
Cary Xu 已提交
129
  void checkReslut(int32_t numOfTables, int32_t numOfRows1, int32_t numOfRows2 = -1) {
X
Xiaoyu Wang 已提交
130 131 132 133
    SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
    ASSERT_EQ(pStmt->payloadType, PAYLOAD_TYPE_KV);
    ASSERT_EQ(pStmt->insertType, TSDB_QUERY_TYPE_INSERT);
    size_t num = taosArrayGetSize(pStmt->pDataBlocks);
134 135
    ASSERT_GE(num, 0);
    for (size_t i = 0; i < num; ++i) {
X
Xiaoyu Wang 已提交
136
      SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(pStmt->pDataBlocks, i);
137 138
      ASSERT_EQ(vg->numOfTables, numOfTables);
      ASSERT_GE(vg->size, 0);
S
Shengliang Guan 已提交
139
      SSubmitReq* submit = (SSubmitReq*)vg->pData;
140 141
      ASSERT_GE(ntohl(submit->length), 0);
      ASSERT_GE(ntohl(submit->numOfBlocks), 0);
X
Xiaoyu Wang 已提交
142
      int32_t     numOfBlocks = ntohl(submit->numOfBlocks);
143 144
      SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
      for (int32_t i = 0; i < numOfBlocks; ++i) {
C
Cary Xu 已提交
145
        ASSERT_EQ(ntohl(blk->numOfRows), (0 == i ? numOfRows1 : (numOfRows2 > 0 ? numOfRows2 : numOfRows1)));
146 147 148 149 150
        blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
      }
    }
  }

X
Xiaoyu Wang 已提交
151
 private:
152 153 154
  static const int max_err_len = 1024;
  static const int max_sql_len = 1024 * 1024;

155 156
  static void _destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
    destoryParseMetaCache(pMetaCache, request);
157 158 159
    delete pMetaCache;
  }

160 161 162 163 164 165
  void reset() {
    memset(&cxt_, 0, sizeof(cxt_));
    memset(errMagBuf_, 0, max_err_len);
    cxt_.pMsg = errMagBuf_;
    cxt_.msgLen = max_err_len;
    code_ = TSDB_CODE_SUCCESS;
166
    qDestroyQuery(res_);
167
    res_ = nullptr;
X
Xiaoyu Wang 已提交
168 169
  }

X
Xiaoyu Wang 已提交
170
  SVnodeModifOpStmt* getVnodeModifStmt(SQuery* pQuery) { return (SVnodeModifOpStmt*)pQuery->pRoot; }
171

X
Xiaoyu Wang 已提交
172 173 174 175
  string        acctId_;
  string        db_;
  char          errMagBuf_[max_err_len];
  char          sqlBuf_[max_sql_len];
176
  SParseContext cxt_;
X
Xiaoyu Wang 已提交
177 178
  int32_t       code_;
  SQuery*       res_;
179 180
};

181
// INSERT INTO tb_name [(field1_name, ...)] VALUES (field1_value, ...)
182 183 184
TEST_F(InsertTest, singleTableSingleRowTest) {
  setDatabase("root", "test");

X
Xiaoyu Wang 已提交
185
  bind("insert into t1 values (now, 1, 'beijing', 3, 4, 5)");
186 187 188
  ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
  dumpReslut();
  checkReslut(1, 1);
189 190 191 192 193 194 195 196 197 198 199

  bind("insert into t1 (ts, c1, c2, c3, c4, c5) values (now, 1, 'beijing', 3, 4, 5)");
  ASSERT_EQ(run(), TSDB_CODE_SUCCESS);

  bind("insert into t1 values (now, 1, 'beijing', 3, 4, 5)");
  ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
  dumpReslut();
  checkReslut(1, 1);

  bind("insert into t1 (ts, c1, c2, c3, c4, c5) values (now, 1, 'beijing', 3, 4, 5)");
  ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
200 201 202 203 204 205
}

// INSERT INTO tb_name VALUES (field1_value, ...)(field1_value, ...)
TEST_F(InsertTest, singleTableMultiRowTest) {
  setDatabase("root", "test");

X
Xiaoyu Wang 已提交
206
  bind(
207 208
      "insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)"
      "(now+2s, 3, 'guangzhou', 9, 10, 11)");
209 210 211
  ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
  dumpReslut();
  checkReslut(1, 3);
212 213 214 215 216

  bind(
      "insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)"
      "(now+2s, 3, 'guangzhou', 9, 10, 11)");
  ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
217 218 219 220 221 222 223 224 225 226
}

// INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...)
TEST_F(InsertTest, multiTableSingleRowTest) {
  setDatabase("root", "test");

  bind("insert into st1s1 values (now, 1, \"beijing\") st1s2 values (now, 10, \"131028\")");
  ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
  dumpReslut();
  checkReslut(2, 1);
227 228 229

  bind("insert into st1s1 values (now, 1, \"beijing\") st1s2 values (now, 10, \"131028\")");
  ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
230 231 232 233 234 235
}

// INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...)
TEST_F(InsertTest, multiTableMultiRowTest) {
  setDatabase("root", "test");

X
Xiaoyu Wang 已提交
236 237 238
  bind(
      "insert into st1s1 values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"
      " st1s2 values (now, 10, \"131028\")(now+1s, 20, \"132028\")");
239 240 241
  ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
  dumpReslut();
  checkReslut(2, 3, 2);
242 243 244 245 246

  bind(
      "insert into st1s1 values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"
      " st1s2 values (now, 10, \"131028\")(now+1s, 20, \"132028\")");
  ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
247 248
}

X
Xiaoyu Wang 已提交
249
// INSERT INTO
X
Xiaoyu Wang 已提交
250 251 252 253 254
//    tb1_name USING st1_name [(tag1_name, ...)] TAGS (tag1_value, ...) VALUES (field1_value, ...)
//    tb2_name USING st2_name [(tag1_name, ...)] TAGS (tag1_value, ...) VALUES (field1_value, ...)
TEST_F(InsertTest, autoCreateTableTest) {
  setDatabase("root", "test");

X
Xiaoyu Wang 已提交
255
  bind(
X
Xiaoyu Wang 已提交
256 257
      "insert into st1s1 using st1 tags(1, 'wxy', now) "
      "values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
X
Xiaoyu Wang 已提交
258 259 260
  ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
  dumpReslut();
  checkReslut(1, 3);
261 262 263 264 265 266 267

  bind(
      "insert into st1s1 using st1 (tag1, tag2) tags(1, 'wxy') values (now, 1, \"beijing\")"
      "(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
  ASSERT_EQ(run(), TSDB_CODE_SUCCESS);

  bind(
X
Xiaoyu Wang 已提交
268 269
      "insert into st1s1 using st1 tags(1, 'wxy', now) "
      "values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
270 271 272 273 274 275
  ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);

  bind(
      "insert into st1s1 using st1 (tag1, tag2) tags(1, 'wxy') values (now, 1, \"beijing\")"
      "(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
  ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
276 277 278 279 280

  bind(
      "insert into st1s1 using st1 tags(1, 'wxy', now) values (now, 1, \"beijing\")"
      "st1s1 using st1 tags(1, 'wxy', now) values (now+1s, 2, \"shanghai\")");
  ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
X
Xiaoyu Wang 已提交
281 282
}

283 284 285 286 287 288 289
TEST_F(InsertTest, toleranceTest) {
  setDatabase("root", "test");

  bind("insert into");
  ASSERT_NE(run(), TSDB_CODE_SUCCESS);
  bind("insert into t");
  ASSERT_NE(run(), TSDB_CODE_SUCCESS);
290 291 292 293 294

  bind("insert into");
  ASSERT_NE(runAsync(), TSDB_CODE_SUCCESS);
  bind("insert into t");
  ASSERT_NE(runAsync(), TSDB_CODE_SUCCESS);
295
}