schedulerTests.cpp 9.8 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <gtest/gtest.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic ignored "-Wwrite-strings"

#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"

#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
D
dapan1121 已提交
29 30
#include "catalog.h"
#include "scheduler.h"
D
dapan1121 已提交
31 32
#include "tep.h"
#include "trpc.h"
D
dapan 已提交
33 34 35
#include "schedulerInt.h"
#include "stub.h"
#include "addr_any.h"
D
dapan1121 已提交
36

D
dapan1121 已提交
37
namespace {
D
dapan 已提交
38

D
dapan1121 已提交
39
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
D
dapan 已提交
40

D
dapan1121 已提交
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
void schtInitLogFile() {
  const char    *defaultLogFileNamePrefix = "taoslog";
  const int32_t  maxLogFileNum = 10;

  tsAsyncLog = 0;
  qDebugFlag = 159;

  char temp[128] = {0};
  sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix);
  if (taosInitLog(temp, tsNumOfLogLines, maxLogFileNum) < 0) {
    printf("failed to open log file in directory:%s\n", tsLogDir);
  }

}


D
dapan 已提交
57
void schtBuildQueryDag(SQueryDag *dag) {
D
dapan 已提交
58
  uint64_t qId = 0x0000000000000001;
D
dapan1121 已提交
59 60 61 62
  
  dag->queryId = qId;
  dag->numOfSubplans = 2;
  dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES);
D
dapan1121 已提交
63 64
  SArray *scan = taosArrayInit(1, POINTER_BYTES);
  SArray *merge = taosArrayInit(1, POINTER_BYTES);
D
dapan1121 已提交
65
  
D
dapan1121 已提交
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
  SSubplan *scanPlan = (SSubplan *)calloc(1, sizeof(SSubplan));
  SSubplan *mergePlan = (SSubplan *)calloc(1, sizeof(SSubplan));

  scanPlan->id.queryId = qId;
  scanPlan->id.templateId = 0x0000000000000002;
  scanPlan->id.subplanId = 0x0000000000000003;
  scanPlan->type = QUERY_TYPE_SCAN;
  scanPlan->execNode.numOfEps = 1;
  scanPlan->execNode.nodeId = 1;
  scanPlan->execNode.inUse = 0;
  scanPlan->execNode.epAddr[0].port = 6030;
  strcpy(scanPlan->execNode.epAddr[0].fqdn, "ep0");
  scanPlan->pChildren = NULL;
  scanPlan->level = 1;
  scanPlan->pParents = taosArrayInit(1, POINTER_BYTES);
  scanPlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));

  mergePlan->id.queryId = qId;
  mergePlan->id.templateId = 0x4444444444;
  mergePlan->id.subplanId = 0x5555555555;
  mergePlan->type = QUERY_TYPE_MERGE;
  mergePlan->level = 0;
  mergePlan->execNode.numOfEps = 0;
  mergePlan->pChildren = taosArrayInit(1, POINTER_BYTES);
  mergePlan->pParents = NULL;
  mergePlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
D
dapan1121 已提交
92 93 94 95

  SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
  SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);

D
dapan1121 已提交
96 97
  taosArrayPush(mergePlan->pChildren, &scanPlan);
  taosArrayPush(scanPlan->pParents, &mergePlan);
D
dapan1121 已提交
98 99 100 101 102

  taosArrayPush(dag->pSubplans, &merge);  
  taosArrayPush(dag->pSubplans, &scan);
}

D
dapan1121 已提交
103 104 105 106 107
void schtFreeQueryDag(SQueryDag *dag) {

}


D
dapan 已提交
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
void schtBuildInsertDag(SQueryDag *dag) {
  uint64_t qId = 0x0000000000000002;
  
  dag->queryId = qId;
  dag->numOfSubplans = 2;
  dag->pSubplans = taosArrayInit(1, POINTER_BYTES);
  SArray *inserta = taosArrayInit(dag->numOfSubplans, sizeof(SSubplan));
  
  SSubplan insertPlan[2] = {0};

  insertPlan[0].id.queryId = qId;
  insertPlan[0].id.templateId = 0x0000000000000003;
  insertPlan[0].id.subplanId = 0x0000000000000004;
  insertPlan[0].type = QUERY_TYPE_MODIFY;
  insertPlan[0].level = 0;
D
dapan1121 已提交
123 124 125 126 127
  insertPlan[0].execNode.numOfEps = 1;
  insertPlan[0].execNode.nodeId = 1;
  insertPlan[0].execNode.inUse = 0;
  insertPlan[0].execNode.epAddr[0].port = 6030;
  strcpy(insertPlan[0].execNode.epAddr[0].fqdn, "ep0");
H
Haojun Liao 已提交
128
  insertPlan[0].pChildren = NULL;
D
dapan 已提交
129 130 131 132 133 134 135 136 137
  insertPlan[0].pParents = NULL;
  insertPlan[0].pNode = NULL;
  insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));

  insertPlan[1].id.queryId = qId;
  insertPlan[1].id.templateId = 0x0000000000000003;
  insertPlan[1].id.subplanId = 0x0000000000000005;
  insertPlan[1].type = QUERY_TYPE_MODIFY;
  insertPlan[1].level = 0;
D
dapan1121 已提交
138 139 140 141 142
  insertPlan[1].execNode.numOfEps = 1;
  insertPlan[1].execNode.nodeId = 1;
  insertPlan[1].execNode.inUse = 1;
  insertPlan[1].execNode.epAddr[0].port = 6030;
  strcpy(insertPlan[1].execNode.epAddr[0].fqdn, "ep1");
H
Haojun Liao 已提交
143
  insertPlan[1].pChildren = NULL;
D
dapan 已提交
144 145 146 147 148 149 150 151 152 153 154 155
  insertPlan[1].pParents = NULL;
  insertPlan[1].pNode = NULL;
  insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));


  taosArrayPush(inserta, &insertPlan[0]);
  taosArrayPush(inserta, &insertPlan[1]);

  taosArrayPush(dag->pSubplans, &inserta);  
}


D
dapan 已提交
156 157 158 159 160 161
int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
  *str = (char *)calloc(1, 20);
  *len = 20;
  return 0;
}

D
dapan1121 已提交
162 163
void schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
  
D
dapan 已提交
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
}


void schtSetPlanToString() {
  static Stub stub;
  stub.set(qSubPlanToString, schtPlanToString);
  {
    AddrAny any("libplanner.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qSubPlanToString$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtPlanToString);
    }
  }
}

void schtSetExecNode() {
  static Stub stub;
  stub.set(qSetSubplanExecutionNode, schtExecNode);
  {
    AddrAny any("libplanner.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qSetSubplanExecutionNode$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtExecNode);
    }
  }
}

D
dapan 已提交
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
void *schtSendRsp(void *param) {
  SSchJob *job = NULL;
  int32_t code = 0;

  while (true) {
    job = *(SSchJob **)param;
    if (job) {
      break;
    }

    usleep(1000);
  }
  
  void *pIter = taosHashIterate(job->execTasks, NULL);
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

S
Shengliang Guan 已提交
210
    SShellSubmitRsp rsp = {0};
D
dapan 已提交
211
    rsp.affectedRows = 10;
D
dapan1121 已提交
212
    schHandleResponseMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
213 214 215 216 217 218 219
    
    pIter = taosHashIterate(job->execTasks, pIter);
  }    

  return NULL;
}

D
dapan1121 已提交
220
struct SSchJob *pInsertJob = NULL;
D
dapan 已提交
221

D
dapan 已提交
222

D
dapan1121 已提交
223 224
}

D
dapan 已提交
225
TEST(queryTest, normalCase) {
D
dapan1121 已提交
226
  void *mockPointer = (void *)0x1;
D
dapan1121 已提交
227 228 229
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
D
dapan1121 已提交
230
  SVgroupInfo vgInfo = {0};
D
dapan1121 已提交
231
  SSchJob *pJob = NULL;
D
dapan1121 已提交
232
  SQueryDag dag = {0};
D
dapan1121 已提交
233 234 235

  schtInitLogFile();

D
dapan1121 已提交
236
  SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));
D
dapan 已提交
237 238 239 240 241

  SEpAddr qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);
D
dapan1121 已提交
242 243
  
  int32_t code = schedulerInit(NULL);
D
dapan1121 已提交
244
  ASSERT_EQ(code, 0);
D
dapan1121 已提交
245

D
dapan 已提交
246
  schtBuildQueryDag(&dag);
D
dapan 已提交
247 248 249

  schtSetPlanToString();
  schtSetExecNode();
D
dapan1121 已提交
250 251
  
  code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob);
D
dapan1121 已提交
252
  ASSERT_EQ(code, 0);
D
dapan 已提交
253

D
dapan 已提交
254
  SSchJob *job = (SSchJob *)pJob;
D
dapan 已提交
255 256
  void *pIter = taosHashIterate(job->execTasks, NULL);
  while (pIter) {
D
dapan 已提交
257
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
258 259

    SQueryTableRsp rsp = {0};
D
dapan1121 已提交
260
    code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
261 262 263 264 265 266 267
    
    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(job->execTasks, pIter);
  }    

  pIter = taosHashIterate(job->execTasks, NULL);
  while (pIter) {
D
dapan 已提交
268
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
269 270

    SResReadyRsp rsp = {0};
D
dapan1121 已提交
271
    code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
272 273 274 275 276 277 278
    
    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(job->execTasks, pIter);
  }  

  pIter = taosHashIterate(job->execTasks, NULL);
  while (pIter) {
D
dapan 已提交
279
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
280 281

    SQueryTableRsp rsp = {0};
D
dapan1121 已提交
282
    code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
283 284 285 286 287 288 289
    
    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(job->execTasks, pIter);
  }    

  pIter = taosHashIterate(job->execTasks, NULL);
  while (pIter) {
D
dapan 已提交
290
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
291 292

    SResReadyRsp rsp = {0};
D
dapan1121 已提交
293
    code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
294 295 296 297 298 299 300 301
    ASSERT_EQ(code, 0);
    
    pIter = taosHashIterate(job->execTasks, pIter);
  }  

  SRetrieveTableRsp rsp = {0};
  rsp.completed = 1;
  rsp.numOfRows = 10;
D
dapan1121 已提交
302
  code = schHandleResponseMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321
    
  ASSERT_EQ(code, 0);


  void *data = NULL;
  
  code = scheduleFetchRows(job, &data);
  ASSERT_EQ(code, 0);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);

  data = NULL;
  code = scheduleFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_EQ(data, (void*)NULL);

  scheduleFreeJob(pJob);
D
dapan1121 已提交
322 323

  schtFreeQueryDag(&dag);
D
dapan1121 已提交
324
}
D
dapan1121 已提交
325 326


D
dapan 已提交
327 328 329 330 331 332 333 334 335 336


TEST(insertTest, normalCase) {
  void *mockPointer = (void *)0x1;
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
  SVgroupInfo vgInfo = {0};
  SQueryDag dag = {0};
  uint64_t numOfRows = 0;
D
dapan1121 已提交
337 338 339

  schtInitLogFile();
  
D
dapan 已提交
340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358
  SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));

  SEpAddr qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);
  
  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildInsertDag(&dag);

  schtSetPlanToString();

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);

  pthread_t thread1;
  pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob);
D
dapan1121 已提交
359 360 361

  SQueryResult res = {0};
  code = scheduleExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &res);
D
dapan 已提交
362
  ASSERT_EQ(code, 0);
D
dapan1121 已提交
363
  ASSERT_EQ(res.numOfRows, 20);
D
dapan 已提交
364 365 366 367

  scheduleFreeJob(pInsertJob);
}

D
dapan1121 已提交
368
TEST(multiThread, forceFree) {
D
dapan 已提交
369

D
dapan1121 已提交
370 371 372
  schtInitLogFile();

}
D
dapan 已提交
373 374


D
dapan1121 已提交
375 376 377 378 379 380 381
int main(int argc, char** argv) {
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}



D
dapan1121 已提交
382