schedulerTests.cpp 10.3 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <gtest/gtest.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic ignored "-Wwrite-strings"

#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"

#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
D
dapan1121 已提交
29 30
#include "catalog.h"
#include "scheduler.h"
D
dapan1121 已提交
31 32
#include "tep.h"
#include "trpc.h"
D
dapan 已提交
33 34 35
#include "schedulerInt.h"
#include "stub.h"
#include "addr_any.h"
D
dapan1121 已提交
36

D
dapan1121 已提交
37
namespace {
D
dapan 已提交
38

D
dapan1121 已提交
39
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
D
dapan 已提交
40

D
dapan1121 已提交
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
void schtInitLogFile() {
  const char    *defaultLogFileNamePrefix = "taoslog";
  const int32_t  maxLogFileNum = 10;

  tsAsyncLog = 0;
  qDebugFlag = 159;

  char temp[128] = {0};
  sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix);
  if (taosInitLog(temp, tsNumOfLogLines, maxLogFileNum) < 0) {
    printf("failed to open log file in directory:%s\n", tsLogDir);
  }

}


D
dapan 已提交
57
void schtBuildQueryDag(SQueryDag *dag) {
D
dapan 已提交
58
  uint64_t qId = 0x0000000000000001;
D
dapan1121 已提交
59 60 61 62
  
  dag->queryId = qId;
  dag->numOfSubplans = 2;
  dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES);
D
dapan1121 已提交
63 64
  SArray *scan = taosArrayInit(1, POINTER_BYTES);
  SArray *merge = taosArrayInit(1, POINTER_BYTES);
D
dapan1121 已提交
65
  
D
dapan1121 已提交
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
  SSubplan *scanPlan = (SSubplan *)calloc(1, sizeof(SSubplan));
  SSubplan *mergePlan = (SSubplan *)calloc(1, sizeof(SSubplan));

  scanPlan->id.queryId = qId;
  scanPlan->id.templateId = 0x0000000000000002;
  scanPlan->id.subplanId = 0x0000000000000003;
  scanPlan->type = QUERY_TYPE_SCAN;
  scanPlan->execNode.numOfEps = 1;
  scanPlan->execNode.nodeId = 1;
  scanPlan->execNode.inUse = 0;
  scanPlan->execNode.epAddr[0].port = 6030;
  strcpy(scanPlan->execNode.epAddr[0].fqdn, "ep0");
  scanPlan->pChildren = NULL;
  scanPlan->level = 1;
  scanPlan->pParents = taosArrayInit(1, POINTER_BYTES);
  scanPlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
D
dapan1121 已提交
82
  scanPlan->msgType = TDMT_VND_QUERY;
D
dapan1121 已提交
83 84 85 86 87 88 89 90 91 92

  mergePlan->id.queryId = qId;
  mergePlan->id.templateId = 0x4444444444;
  mergePlan->id.subplanId = 0x5555555555;
  mergePlan->type = QUERY_TYPE_MERGE;
  mergePlan->level = 0;
  mergePlan->execNode.numOfEps = 0;
  mergePlan->pChildren = taosArrayInit(1, POINTER_BYTES);
  mergePlan->pParents = NULL;
  mergePlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
D
dapan1121 已提交
93
  mergePlan->msgType = TDMT_VND_QUERY;
D
dapan1121 已提交
94 95 96 97

  SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
  SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);

D
dapan1121 已提交
98 99
  taosArrayPush(mergePlan->pChildren, &scanPlan);
  taosArrayPush(scanPlan->pParents, &mergePlan);
D
dapan1121 已提交
100 101 102 103 104

  taosArrayPush(dag->pSubplans, &merge);  
  taosArrayPush(dag->pSubplans, &scan);
}

D
dapan1121 已提交
105 106 107 108 109
void schtFreeQueryDag(SQueryDag *dag) {

}


D
dapan 已提交
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
void schtBuildInsertDag(SQueryDag *dag) {
  uint64_t qId = 0x0000000000000002;
  
  dag->queryId = qId;
  dag->numOfSubplans = 2;
  dag->pSubplans = taosArrayInit(1, POINTER_BYTES);
  SArray *inserta = taosArrayInit(dag->numOfSubplans, sizeof(SSubplan));
  
  SSubplan insertPlan[2] = {0};

  insertPlan[0].id.queryId = qId;
  insertPlan[0].id.templateId = 0x0000000000000003;
  insertPlan[0].id.subplanId = 0x0000000000000004;
  insertPlan[0].type = QUERY_TYPE_MODIFY;
  insertPlan[0].level = 0;
D
dapan1121 已提交
125 126 127 128 129
  insertPlan[0].execNode.numOfEps = 1;
  insertPlan[0].execNode.nodeId = 1;
  insertPlan[0].execNode.inUse = 0;
  insertPlan[0].execNode.epAddr[0].port = 6030;
  strcpy(insertPlan[0].execNode.epAddr[0].fqdn, "ep0");
H
Haojun Liao 已提交
130
  insertPlan[0].pChildren = NULL;
D
dapan 已提交
131 132 133 134 135 136 137 138 139
  insertPlan[0].pParents = NULL;
  insertPlan[0].pNode = NULL;
  insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));

  insertPlan[1].id.queryId = qId;
  insertPlan[1].id.templateId = 0x0000000000000003;
  insertPlan[1].id.subplanId = 0x0000000000000005;
  insertPlan[1].type = QUERY_TYPE_MODIFY;
  insertPlan[1].level = 0;
D
dapan1121 已提交
140 141 142 143 144
  insertPlan[1].execNode.numOfEps = 1;
  insertPlan[1].execNode.nodeId = 1;
  insertPlan[1].execNode.inUse = 1;
  insertPlan[1].execNode.epAddr[0].port = 6030;
  strcpy(insertPlan[1].execNode.epAddr[0].fqdn, "ep1");
H
Haojun Liao 已提交
145
  insertPlan[1].pChildren = NULL;
D
dapan 已提交
146 147 148 149 150 151 152 153 154 155 156 157
  insertPlan[1].pParents = NULL;
  insertPlan[1].pNode = NULL;
  insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));


  taosArrayPush(inserta, &insertPlan[0]);
  taosArrayPush(inserta, &insertPlan[1]);

  taosArrayPush(dag->pSubplans, &inserta);  
}


D
dapan 已提交
158 159 160 161 162 163
int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
  *str = (char *)calloc(1, 20);
  *len = 20;
  return 0;
}

D
dapan1121 已提交
164
void schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
H
Haojun Liao 已提交
165

D
dapan 已提交
166 167
}

D
dapan1121 已提交
168 169 170 171 172
void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {

}


D
dapan 已提交
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199

void schtSetPlanToString() {
  static Stub stub;
  stub.set(qSubPlanToString, schtPlanToString);
  {
    AddrAny any("libplanner.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qSubPlanToString$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtPlanToString);
    }
  }
}

void schtSetExecNode() {
  static Stub stub;
  stub.set(qSetSubplanExecutionNode, schtExecNode);
  {
    AddrAny any("libplanner.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qSetSubplanExecutionNode$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtExecNode);
    }
  }
}

D
dapan1121 已提交
200 201 202 203 204 205 206 207 208 209 210 211 212 213
void schtSetRpcSendRequest() {
  static Stub stub;
  stub.set(rpcSendRequest, schtRpcSendRequest);
  {
    AddrAny any("libtransport.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^rpcSendRequest$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtRpcSendRequest);
    }
  }
}


D
dapan 已提交
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
void *schtSendRsp(void *param) {
  SSchJob *job = NULL;
  int32_t code = 0;

  while (true) {
    job = *(SSchJob **)param;
    if (job) {
      break;
    }

    usleep(1000);
  }
  
  void *pIter = taosHashIterate(job->execTasks, NULL);
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

S
Shengliang Guan 已提交
231
    SShellSubmitRsp rsp = {0};
D
dapan 已提交
232
    rsp.affectedRows = 10;
D
dapan1121 已提交
233
    schHandleResponseMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
234 235 236 237 238 239 240
    
    pIter = taosHashIterate(job->execTasks, pIter);
  }    

  return NULL;
}

D
dapan1121 已提交
241
struct SSchJob *pInsertJob = NULL;
D
dapan1121 已提交
242 243
}

D
dapan 已提交
244
TEST(queryTest, normalCase) {
D
dapan1121 已提交
245
  void *mockPointer = (void *)0x1;
D
dapan1121 已提交
246 247 248
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
D
dapan1121 已提交
249
  SVgroupInfo vgInfo = {0};
D
dapan1121 已提交
250
  SSchJob *pJob = NULL;
D
dapan1121 已提交
251
  SQueryDag dag = {0};
D
dapan1121 已提交
252 253 254

  schtInitLogFile();

D
dapan1121 已提交
255
  SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));
D
dapan 已提交
256 257 258 259 260

  SEpAddr qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);
D
dapan1121 已提交
261 262
  
  int32_t code = schedulerInit(NULL);
D
dapan1121 已提交
263
  ASSERT_EQ(code, 0);
D
dapan1121 已提交
264

D
dapan 已提交
265
  schtBuildQueryDag(&dag);
D
dapan 已提交
266 267 268

  schtSetPlanToString();
  schtSetExecNode();
D
dapan1121 已提交
269 270
  
  code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob);
D
dapan1121 已提交
271
  ASSERT_EQ(code, 0);
D
dapan 已提交
272

D
dapan 已提交
273
  SSchJob *job = (SSchJob *)pJob;
D
dapan 已提交
274 275
  void *pIter = taosHashIterate(job->execTasks, NULL);
  while (pIter) {
D
dapan 已提交
276
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
277 278

    SQueryTableRsp rsp = {0};
D
dapan1121 已提交
279
    code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
280 281 282 283 284 285 286
    
    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(job->execTasks, pIter);
  }    

  pIter = taosHashIterate(job->execTasks, NULL);
  while (pIter) {
D
dapan 已提交
287
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
288 289

    SResReadyRsp rsp = {0};
D
dapan1121 已提交
290
    code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
291 292 293 294 295 296 297
    
    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(job->execTasks, pIter);
  }  

  pIter = taosHashIterate(job->execTasks, NULL);
  while (pIter) {
D
dapan 已提交
298
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
299 300

    SQueryTableRsp rsp = {0};
D
dapan1121 已提交
301
    code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
302 303 304 305 306 307 308
    
    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(job->execTasks, pIter);
  }    

  pIter = taosHashIterate(job->execTasks, NULL);
  while (pIter) {
D
dapan 已提交
309
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
310 311

    SResReadyRsp rsp = {0};
D
dapan1121 已提交
312
    code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
313 314 315 316 317 318 319 320
    ASSERT_EQ(code, 0);
    
    pIter = taosHashIterate(job->execTasks, pIter);
  }  

  SRetrieveTableRsp rsp = {0};
  rsp.completed = 1;
  rsp.numOfRows = 10;
D
dapan1121 已提交
321
  code = schHandleResponseMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340
    
  ASSERT_EQ(code, 0);


  void *data = NULL;
  
  code = scheduleFetchRows(job, &data);
  ASSERT_EQ(code, 0);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);

  data = NULL;
  code = scheduleFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_EQ(data, (void*)NULL);

  scheduleFreeJob(pJob);
D
dapan1121 已提交
341 342

  schtFreeQueryDag(&dag);
D
dapan1121 已提交
343
}
D
dapan1121 已提交
344 345


D
dapan 已提交
346 347 348 349 350 351 352 353 354 355


TEST(insertTest, normalCase) {
  void *mockPointer = (void *)0x1;
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
  SVgroupInfo vgInfo = {0};
  SQueryDag dag = {0};
  uint64_t numOfRows = 0;
D
dapan1121 已提交
356 357

  schtInitLogFile();
H
Haojun Liao 已提交
358

D
dapan 已提交
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
  SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));

  SEpAddr qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);
  
  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildInsertDag(&dag);

  schtSetPlanToString();

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);

  pthread_t thread1;
  pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob);
D
dapan1121 已提交
378 379 380

  SQueryResult res = {0};
  code = scheduleExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &res);
D
dapan 已提交
381
  ASSERT_EQ(code, 0);
D
dapan1121 已提交
382
  ASSERT_EQ(res.numOfRows, 20);
D
dapan 已提交
383 384 385 386

  scheduleFreeJob(pInsertJob);
}

D
dapan1121 已提交
387
TEST(multiThread, forceFree) {
D
dapan 已提交
388

D
dapan1121 已提交
389 390 391
  schtInitLogFile();

}
D
dapan 已提交
392 393


D
dapan1121 已提交
394 395 396 397 398 399 400
int main(int argc, char** argv) {
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}



D
dapan1121 已提交
401