schedulerTests.cpp 9.2 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <gtest/gtest.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic ignored "-Wwrite-strings"

#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"

#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
D
dapan1121 已提交
29 30
#include "catalog.h"
#include "scheduler.h"
D
dapan1121 已提交
31 32
#include "tep.h"
#include "trpc.h"
D
dapan 已提交
33 34 35
#include "schedulerInt.h"
#include "stub.h"
#include "addr_any.h"
D
dapan1121 已提交
36

D
dapan1121 已提交
37
namespace {
D
dapan 已提交
38

D
dapan1121 已提交
39
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
D
dapan 已提交
40

D
dapan 已提交
41
void schtBuildQueryDag(SQueryDag *dag) {
D
dapan 已提交
42
  uint64_t qId = 0x0000000000000001;
D
dapan1121 已提交
43 44 45 46 47 48 49 50 51 52 53
  
  dag->queryId = qId;
  dag->numOfSubplans = 2;
  dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES);
  SArray *scan = taosArrayInit(1, sizeof(SSubplan));
  SArray *merge = taosArrayInit(1, sizeof(SSubplan));
  
  SSubplan scanPlan = {0};
  SSubplan mergePlan = {0};

  scanPlan.id.queryId = qId;
D
dapan 已提交
54 55
  scanPlan.id.templateId = 0x0000000000000002;
  scanPlan.id.subplanId = 0x0000000000000003;
D
dapan1121 已提交
56
  scanPlan.type = QUERY_TYPE_SCAN;
D
dapan1121 已提交
57 58 59 60 61
  scanPlan.execNode.numOfEps = 1;
  scanPlan.execNode.nodeId = 1;
  scanPlan.execNode.inUse = 0;
  scanPlan.execNode.epAddr[0].port = 6030;
  strcpy(scanPlan.execNode.epAddr[0].fqdn, "ep0");
H
Haojun Liao 已提交
62
  scanPlan.pChildren = NULL;
63
  scanPlan.level = 1;
D
dapan1121 已提交
64
  scanPlan.pParents = taosArrayInit(1, POINTER_BYTES);
D
dapan 已提交
65
  scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
D
dapan1121 已提交
66 67 68 69 70 71

  mergePlan.id.queryId = qId;
  mergePlan.id.templateId = 0x4444444444;
  mergePlan.id.subplanId = 0x5555555555;
  mergePlan.type = QUERY_TYPE_MERGE;
  mergePlan.level = 0;
D
dapan1121 已提交
72
  mergePlan.execNode.numOfEps = 0;
H
Haojun Liao 已提交
73
  mergePlan.pChildren = taosArrayInit(1, POINTER_BYTES);
D
dapan1121 已提交
74
  mergePlan.pParents = NULL;
D
dapan 已提交
75
  mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
D
dapan1121 已提交
76 77 78 79

  SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
  SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);

H
Haojun Liao 已提交
80
  taosArrayPush(mergePointer->pChildren, &scanPointer);
D
dapan1121 已提交
81 82 83 84 85 86
  taosArrayPush(scanPointer->pParents, &mergePointer);

  taosArrayPush(dag->pSubplans, &merge);  
  taosArrayPush(dag->pSubplans, &scan);
}

D
dapan 已提交
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
void schtBuildInsertDag(SQueryDag *dag) {
  uint64_t qId = 0x0000000000000002;
  
  dag->queryId = qId;
  dag->numOfSubplans = 2;
  dag->pSubplans = taosArrayInit(1, POINTER_BYTES);
  SArray *inserta = taosArrayInit(dag->numOfSubplans, sizeof(SSubplan));
  
  SSubplan insertPlan[2] = {0};

  insertPlan[0].id.queryId = qId;
  insertPlan[0].id.templateId = 0x0000000000000003;
  insertPlan[0].id.subplanId = 0x0000000000000004;
  insertPlan[0].type = QUERY_TYPE_MODIFY;
  insertPlan[0].level = 0;
D
dapan1121 已提交
102 103 104 105 106
  insertPlan[0].execNode.numOfEps = 1;
  insertPlan[0].execNode.nodeId = 1;
  insertPlan[0].execNode.inUse = 0;
  insertPlan[0].execNode.epAddr[0].port = 6030;
  strcpy(insertPlan[0].execNode.epAddr[0].fqdn, "ep0");
H
Haojun Liao 已提交
107
  insertPlan[0].pChildren = NULL;
D
dapan 已提交
108 109 110 111 112 113 114 115 116
  insertPlan[0].pParents = NULL;
  insertPlan[0].pNode = NULL;
  insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));

  insertPlan[1].id.queryId = qId;
  insertPlan[1].id.templateId = 0x0000000000000003;
  insertPlan[1].id.subplanId = 0x0000000000000005;
  insertPlan[1].type = QUERY_TYPE_MODIFY;
  insertPlan[1].level = 0;
D
dapan1121 已提交
117 118 119 120 121
  insertPlan[1].execNode.numOfEps = 1;
  insertPlan[1].execNode.nodeId = 1;
  insertPlan[1].execNode.inUse = 1;
  insertPlan[1].execNode.epAddr[0].port = 6030;
  strcpy(insertPlan[1].execNode.epAddr[0].fqdn, "ep1");
H
Haojun Liao 已提交
122
  insertPlan[1].pChildren = NULL;
D
dapan 已提交
123 124 125 126 127 128 129 130 131 132 133 134
  insertPlan[1].pParents = NULL;
  insertPlan[1].pNode = NULL;
  insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));


  taosArrayPush(inserta, &insertPlan[0]);
  taosArrayPush(inserta, &insertPlan[1]);

  taosArrayPush(dag->pSubplans, &inserta);  
}


D
dapan 已提交
135 136 137 138 139 140
int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
  *str = (char *)calloc(1, 20);
  *len = 20;
  return 0;
}

D
dapan1121 已提交
141
int32_t schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
D
dapan 已提交
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
  return 0;
}


void schtSetPlanToString() {
  static Stub stub;
  stub.set(qSubPlanToString, schtPlanToString);
  {
    AddrAny any("libplanner.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qSubPlanToString$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtPlanToString);
    }
  }
}

void schtSetExecNode() {
  static Stub stub;
  stub.set(qSetSubplanExecutionNode, schtExecNode);
  {
    AddrAny any("libplanner.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qSetSubplanExecutionNode$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtExecNode);
    }
  }
}

D
dapan 已提交
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
void *schtSendRsp(void *param) {
  SSchJob *job = NULL;
  int32_t code = 0;

  while (true) {
    job = *(SSchJob **)param;
    if (job) {
      break;
    }

    usleep(1000);
  }
  
  void *pIter = taosHashIterate(job->execTasks, NULL);
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

S
Shengliang Guan 已提交
189
    SShellSubmitRsp rsp = {0};
D
dapan 已提交
190
    rsp.affectedRows = 10;
D
dapan1121 已提交
191
    schHandleResponseMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
192 193 194 195 196 197 198 199 200
    
    pIter = taosHashIterate(job->execTasks, pIter);
  }    

  return NULL;
}

void *pInsertJob = NULL;

D
dapan 已提交
201

D
dapan1121 已提交
202 203
}

D
dapan 已提交
204
TEST(queryTest, normalCase) {
D
dapan1121 已提交
205
  void *mockPointer = (void *)0x1;
D
dapan1121 已提交
206 207 208
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
D
dapan1121 已提交
209 210 211 212
  SVgroupInfo vgInfo = {0};
  void *pJob = NULL;
  SQueryDag dag = {0};
  SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));
D
dapan 已提交
213 214 215 216 217

  SEpAddr qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);
D
dapan1121 已提交
218 219
  
  int32_t code = schedulerInit(NULL);
D
dapan1121 已提交
220
  ASSERT_EQ(code, 0);
D
dapan1121 已提交
221

D
dapan 已提交
222
  schtBuildQueryDag(&dag);
D
dapan 已提交
223 224 225

  schtSetPlanToString();
  schtSetExecNode();
D
dapan1121 已提交
226 227
  
  code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob);
D
dapan1121 已提交
228
  ASSERT_EQ(code, 0);
D
dapan 已提交
229

D
dapan 已提交
230
  SSchJob *job = (SSchJob *)pJob;
D
dapan 已提交
231 232
  void *pIter = taosHashIterate(job->execTasks, NULL);
  while (pIter) {
D
dapan 已提交
233
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
234 235

    SQueryTableRsp rsp = {0};
D
dapan1121 已提交
236
    code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
237 238 239 240 241 242 243
    
    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(job->execTasks, pIter);
  }    

  pIter = taosHashIterate(job->execTasks, NULL);
  while (pIter) {
D
dapan 已提交
244
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
245 246

    SResReadyRsp rsp = {0};
D
dapan1121 已提交
247
    code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
248 249 250 251 252 253 254
    
    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(job->execTasks, pIter);
  }  

  pIter = taosHashIterate(job->execTasks, NULL);
  while (pIter) {
D
dapan 已提交
255
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
256 257

    SQueryTableRsp rsp = {0};
D
dapan1121 已提交
258
    code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
259 260 261 262 263 264 265
    
    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(job->execTasks, pIter);
  }    

  pIter = taosHashIterate(job->execTasks, NULL);
  while (pIter) {
D
dapan 已提交
266
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
267 268

    SResReadyRsp rsp = {0};
D
dapan1121 已提交
269
    code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
270 271 272 273 274 275 276 277
    ASSERT_EQ(code, 0);
    
    pIter = taosHashIterate(job->execTasks, pIter);
  }  

  SRetrieveTableRsp rsp = {0};
  rsp.completed = 1;
  rsp.numOfRows = 10;
D
dapan1121 已提交
278
  code = schHandleResponseMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297
    
  ASSERT_EQ(code, 0);


  void *data = NULL;
  
  code = scheduleFetchRows(job, &data);
  ASSERT_EQ(code, 0);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);

  data = NULL;
  code = scheduleFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_EQ(data, (void*)NULL);

  scheduleFreeJob(pJob);
D
dapan1121 已提交
298
}
D
dapan1121 已提交
299 300


D
dapan 已提交
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329


TEST(insertTest, normalCase) {
  void *mockPointer = (void *)0x1;
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
  SVgroupInfo vgInfo = {0};
  SQueryDag dag = {0};
  uint64_t numOfRows = 0;
  SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));

  SEpAddr qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);
  
  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildInsertDag(&dag);

  schtSetPlanToString();

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);

  pthread_t thread1;
  pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob);
D
dapan1121 已提交
330 331 332

  SQueryResult res = {0};
  code = scheduleExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &res);
D
dapan 已提交
333
  ASSERT_EQ(code, 0);
D
dapan1121 已提交
334
  ASSERT_EQ(res.numOfRows, 20);
D
dapan 已提交
335 336 337 338 339 340 341

  scheduleFreeJob(pInsertJob);
}




D
dapan1121 已提交
342 343 344 345 346 347 348
int main(int argc, char** argv) {
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}



D
dapan1121 已提交
349