schedulerTests.cpp 18.3 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <gtest/gtest.h>
#include <iostream>

19 20 21 22 23 24 25 26 27 28
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wreturn-type"
#pragma GCC diagnostic ignored "-Wformat"
#include <addr_any.h>


D
dapan1121 已提交
29 30
#include "os.h"

31
#include "tglobal.h"
D
dapan1121 已提交
32 33 34
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
D
dapan1121 已提交
35 36
#include "catalog.h"
#include "scheduler.h"
H
Haojun Liao 已提交
37 38 39
#include "taos.h"
#include "tdatablock.h"
#include "tdef.h"
D
dapan1121 已提交
40
#include "trpc.h"
H
Haojun Liao 已提交
41
#include "tvariant.h"
S
Shengliang Guan 已提交
42 43 44 45 46 47 48 49 50

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wreturn-type"
#pragma GCC diagnostic ignored "-Wformat"

D
dapan 已提交
51 52
#include "schedulerInt.h"
#include "stub.h"
D
dapan1121 已提交
53
#include "tref.h"
D
dapan1121 已提交
54

D
dapan1121 已提交
55
namespace {
D
dapan 已提交
56

D
dapan1121 已提交
57
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
D
dapan1121 已提交
58 59
extern "C" int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode);

D
dapan1121 已提交
60 61
int64_t insertJobRefId = 0;
int64_t queryJobRefId = 0;
D
dapan1121 已提交
62 63 64 65 66 67 68

uint64_t schtMergeTemplateId = 0x4;
uint64_t schtFetchTaskId = 0;
uint64_t schtQueryId = 1;

bool schtTestStop = false;
bool schtTestDeadLoop = false;
D
dapan1121 已提交
69
int32_t schtTestMTRunSec = 10;
D
dapan1121 已提交
70 71
int32_t schtTestPrintNum = 1000;
int32_t schtStartFetch = 0;
D
dapan 已提交
72

D
dapan1121 已提交
73

D
dapan1121 已提交
74 75 76 77 78 79
void schtInitLogFile() {
  const char    *defaultLogFileNamePrefix = "taoslog";
  const int32_t  maxLogFileNum = 10;

  tsAsyncLog = 0;
  qDebugFlag = 159;
D
dapan1121 已提交
80
  strcpy(tsLogDir, "/var/log/taos");
D
dapan1121 已提交
81

S
Shengliang Guan 已提交
82
  if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
S
os env  
Shengliang Guan 已提交
83
    printf("failed to open log file in directory:%s\n", tsLogDir);
D
dapan1121 已提交
84 85 86 87 88
  }

}


D
dapan 已提交
89
void schtBuildQueryDag(SQueryDag *dag) {
D
dapan1121 已提交
90
  uint64_t qId = schtQueryId;
D
dapan1121 已提交
91 92 93 94
  
  dag->queryId = qId;
  dag->numOfSubplans = 2;
  dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES);
D
dapan1121 已提交
95 96
  SArray *scan = taosArrayInit(1, POINTER_BYTES);
  SArray *merge = taosArrayInit(1, POINTER_BYTES);
D
dapan1121 已提交
97
  
D
dapan1121 已提交
98 99 100 101 102 103 104
  SSubplan *scanPlan = (SSubplan *)calloc(1, sizeof(SSubplan));
  SSubplan *mergePlan = (SSubplan *)calloc(1, sizeof(SSubplan));

  scanPlan->id.queryId = qId;
  scanPlan->id.templateId = 0x0000000000000002;
  scanPlan->id.subplanId = 0x0000000000000003;
  scanPlan->type = QUERY_TYPE_SCAN;
H
Haojun Liao 已提交
105

D
dapan1121 已提交
106
  scanPlan->execNode.nodeId = 1;
L
Liu Jicong 已提交
107 108
  scanPlan->execNode.epSet.inUse = 0;
  addEpIntoEpSet(&scanPlan->execNode.epSet, "ep0", 6030);
H
Haojun Liao 已提交
109

D
dapan1121 已提交
110 111 112 113
  scanPlan->pChildren = NULL;
  scanPlan->level = 1;
  scanPlan->pParents = taosArrayInit(1, POINTER_BYTES);
  scanPlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
D
dapan1121 已提交
114
  scanPlan->msgType = TDMT_VND_QUERY;
D
dapan1121 已提交
115 116

  mergePlan->id.queryId = qId;
D
dapan1121 已提交
117
  mergePlan->id.templateId = schtMergeTemplateId;
D
dapan1121 已提交
118 119 120
  mergePlan->id.subplanId = 0x5555555555;
  mergePlan->type = QUERY_TYPE_MERGE;
  mergePlan->level = 0;
L
Liu Jicong 已提交
121
  mergePlan->execNode.epSet.numOfEps = 0;
H
Haojun Liao 已提交
122

D
dapan1121 已提交
123 124 125
  mergePlan->pChildren = taosArrayInit(1, POINTER_BYTES);
  mergePlan->pParents = NULL;
  mergePlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
D
dapan1121 已提交
126
  mergePlan->msgType = TDMT_VND_QUERY;
D
dapan1121 已提交
127 128 129 130

  SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
  SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);

D
dapan1121 已提交
131 132
  taosArrayPush(mergePlan->pChildren, &scanPlan);
  taosArrayPush(scanPlan->pParents, &mergePlan);
D
dapan1121 已提交
133 134 135 136 137

  taosArrayPush(dag->pSubplans, &merge);  
  taosArrayPush(dag->pSubplans, &scan);
}

D
dapan1121 已提交
138 139 140 141 142
void schtFreeQueryDag(SQueryDag *dag) {

}


D
dapan 已提交
143 144 145 146 147 148
void schtBuildInsertDag(SQueryDag *dag) {
  uint64_t qId = 0x0000000000000002;
  
  dag->queryId = qId;
  dag->numOfSubplans = 2;
  dag->pSubplans = taosArrayInit(1, POINTER_BYTES);
D
dapan1121 已提交
149
  SArray *inserta = taosArrayInit(dag->numOfSubplans, POINTER_BYTES);
D
dapan 已提交
150
  
D
dapan1121 已提交
151
  SSubplan *insertPlan = (SSubplan *)calloc(2, sizeof(SSubplan));
D
dapan 已提交
152 153 154 155 156 157

  insertPlan[0].id.queryId = qId;
  insertPlan[0].id.templateId = 0x0000000000000003;
  insertPlan[0].id.subplanId = 0x0000000000000004;
  insertPlan[0].type = QUERY_TYPE_MODIFY;
  insertPlan[0].level = 0;
H
Haojun Liao 已提交
158

D
dapan1121 已提交
159
  insertPlan[0].execNode.nodeId = 1;
L
Liu Jicong 已提交
160 161
  insertPlan[0].execNode.epSet.inUse = 0;
  addEpIntoEpSet(&insertPlan[0].execNode.epSet, "ep0", 6030);
H
Haojun Liao 已提交
162

H
Haojun Liao 已提交
163
  insertPlan[0].pChildren = NULL;
D
dapan 已提交
164 165 166
  insertPlan[0].pParents = NULL;
  insertPlan[0].pNode = NULL;
  insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
D
dapan1121 已提交
167
  insertPlan[0].msgType = TDMT_VND_SUBMIT;
D
dapan 已提交
168 169 170 171 172 173

  insertPlan[1].id.queryId = qId;
  insertPlan[1].id.templateId = 0x0000000000000003;
  insertPlan[1].id.subplanId = 0x0000000000000005;
  insertPlan[1].type = QUERY_TYPE_MODIFY;
  insertPlan[1].level = 0;
H
Haojun Liao 已提交
174

D
dapan1121 已提交
175
  insertPlan[1].execNode.nodeId = 1;
L
Liu Jicong 已提交
176 177
  insertPlan[1].execNode.epSet.inUse = 0;
  addEpIntoEpSet(&insertPlan[1].execNode.epSet, "ep0", 6030);
H
Haojun Liao 已提交
178

H
Haojun Liao 已提交
179
  insertPlan[1].pChildren = NULL;
D
dapan 已提交
180 181 182
  insertPlan[1].pParents = NULL;
  insertPlan[1].pNode = NULL;
  insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
D
dapan1121 已提交
183
  insertPlan[1].msgType = TDMT_VND_SUBMIT;
D
dapan 已提交
184

D
dapan1121 已提交
185 186 187
  taosArrayPush(inserta, &insertPlan);
  insertPlan += 1;
  taosArrayPush(inserta, &insertPlan);
D
dapan 已提交
188 189 190 191 192

  taosArrayPush(dag->pSubplans, &inserta);  
}


D
dapan 已提交
193 194 195 196 197 198
int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
  *str = (char *)calloc(1, 20);
  *len = 20;
  return 0;
}

D
dapan1121 已提交
199
void schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
H
Haojun Liao 已提交
200

D
dapan 已提交
201 202
}

D
dapan1121 已提交
203 204 205 206
void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {

}

D
dapan 已提交
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
void schtSetPlanToString() {
  static Stub stub;
  stub.set(qSubPlanToString, schtPlanToString);
  {
    AddrAny any("libplanner.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qSubPlanToString$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtPlanToString);
    }
  }
}

void schtSetExecNode() {
  static Stub stub;
  stub.set(qSetSubplanExecutionNode, schtExecNode);
  {
    AddrAny any("libplanner.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qSetSubplanExecutionNode$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtExecNode);
    }
  }
}

D
dapan1121 已提交
233 234 235 236 237 238 239 240 241 242 243 244 245
void schtSetRpcSendRequest() {
  static Stub stub;
  stub.set(rpcSendRequest, schtRpcSendRequest);
  {
    AddrAny any("libtransport.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^rpcSendRequest$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtRpcSendRequest);
    }
  }
}

D
dapan1121 已提交
246 247 248 249 250 251
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
  if (pInfo) {
    tfree(pInfo->param);
    tfree(pInfo->msgInfo.pData);
    free(pInfo);
  }
D
dapan1121 已提交
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268
  return 0;
}


void schtSetAsyncSendMsgToServer() {
  static Stub stub;
  stub.set(asyncSendMsgToServer, schtAsyncSendMsgToServer);
  {
    AddrAny any("libtransport.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^asyncSendMsgToServer$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtAsyncSendMsgToServer);
    }
  }
}

D
dapan1121 已提交
269

D
dapan 已提交
270
void *schtSendRsp(void *param) {
D
dapan1121 已提交
271 272
  SSchJob *pJob = NULL;
  int64_t job = 0;
D
dapan 已提交
273 274 275
  int32_t code = 0;

  while (true) {
D
dapan1121 已提交
276
    job = *(int64_t *)param;
D
dapan 已提交
277 278 279 280
    if (job) {
      break;
    }

wafwerar's avatar
wafwerar 已提交
281
    taosMsleep(1);
D
dapan 已提交
282
  }
D
dapan1121 已提交
283 284

  pJob = schAcquireJob(job);
D
dapan 已提交
285
  
D
dapan1121 已提交
286
  void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan 已提交
287 288 289
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

S
Shengliang Guan 已提交
290
    SSubmitRsp rsp = {0};
D
dapan 已提交
291
    rsp.affectedRows = 10;
D
dapan1121 已提交
292
    schHandleResponseMsg(pJob, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
293
    
D
dapan1121 已提交
294
    pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan 已提交
295 296
  }    

D
dapan1121 已提交
297 298
  schReleaseJob(job);

D
dapan 已提交
299 300 301
  return NULL;
}

D
dapan1121 已提交
302
void *schtCreateFetchRspThread(void *param) {
D
dapan1121 已提交
303 304
  int64_t job = *(int64_t *)param;
  SSchJob* pJob = schAcquireJob(job);
D
dapan1121 已提交
305

wafwerar's avatar
wafwerar 已提交
306
  taosSsleep(1);
D
dapan1121 已提交
307 308 309 310 311

  int32_t code = 0;
  SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp));
  rsp->completed = 1;
  rsp->numOfRows = 10;
D
dapan1121 已提交
312
 
D
dapan1121 已提交
313 314 315 316
  code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0);

  schReleaseJob(job);
  
D
dapan1121 已提交
317 318 319 320
  assert(code == 0);
}


D
dapan1121 已提交
321 322 323 324 325 326 327 328 329
void *schtFetchRspThread(void *aa) {
  SDataBuf dataBuf = {0};
  SSchCallbackParam* param = NULL;

  while (!schtTestStop) {
    if (0 == atomic_val_compare_exchange_32(&schtStartFetch, 1, 0)) {
      continue;
    }

wafwerar's avatar
wafwerar 已提交
330
    taosUsleep(1);
D
dapan1121 已提交
331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
    
    param = (SSchCallbackParam *)calloc(1, sizeof(*param));

    param->queryId = schtQueryId;  
    param->taskId = schtFetchTaskId;

    int32_t code = 0;
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp));
    rsp->completed = 1;
    rsp->numOfRows = 10;

    dataBuf.pData = rsp;
    dataBuf.len = sizeof(*rsp);

    code = schHandleCallback(param, &dataBuf, TDMT_VND_FETCH_RSP, 0);
      
    assert(code == 0 || code);
  }
}

void schtFreeQueryJob(int32_t freeThread) {
  static uint32_t freeNum = 0;
D
dapan1121 已提交
353
  int64_t job = queryJobRefId;
D
dapan1121 已提交
354
  
D
dapan1121 已提交
355
  if (job && atomic_val_compare_exchange_64(&queryJobRefId, job, 0)) {
D
dapan1121 已提交
356
    schedulerFreeJob(job);
D
dapan1121 已提交
357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383
    if (freeThread) {
      if (++freeNum % schtTestPrintNum == 0) {
        printf("FreeNum:%d\n", freeNum);
      }
    }
  }
}

void* schtRunJobThread(void *aa) {
  void *mockPointer = (void *)0x1;
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
  SVgroupInfo vgInfo = {0};
  SQueryDag dag = {0};

  schtInitLogFile();

  
  int32_t code = schedulerInit(NULL);
  assert(code == 0);


  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();

D
dapan1121 已提交
384
  SSchJob *pJob = NULL;
D
dapan1121 已提交
385 386 387 388 389 390 391 392
  SSchCallbackParam *param = NULL;
  SHashObj *execTasks = NULL;
  SDataBuf dataBuf = {0};
  uint32_t jobFinished = 0;

  while (!schtTestStop) {
    schtBuildQueryDag(&dag);

H
Haojun Liao 已提交
393
    SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
D
dapan1121 已提交
394

H
Haojun Liao 已提交
395
    SEp qnodeAddr = {0};
D
dapan1121 已提交
396 397 398 399
    strcpy(qnodeAddr.fqdn, "qnode0.ep");
    qnodeAddr.port = 6031;
    taosArrayPush(qnodeList, &qnodeAddr);

D
dapan1121 已提交
400
    code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &queryJobRefId);
D
dapan1121 已提交
401 402
    assert(code == 0);

D
dapan1121 已提交
403 404 405 406 407 408 409
    pJob = schAcquireJob(queryJobRefId);
    if (NULL == pJob) {
      taosArrayDestroy(qnodeList);
      schtFreeQueryDag(&dag);
      continue;
    }
    
D
dapan1121 已提交
410
    execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
411
    void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan1121 已提交
412 413 414 415 416
    while (pIter) {
      SSchTask *task = *(SSchTask **)pIter;
      schtFetchTaskId = task->taskId - 1;
      
      taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task));
D
dapan1121 已提交
417
      pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan1121 已提交
418 419 420
    }    

    param = (SSchCallbackParam *)calloc(1, sizeof(*param));
D
dapan1121 已提交
421 422
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
D
dapan1121 已提交
423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440

    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId;
      SQueryTableRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
      code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0);
      assert(code == 0 || code);

      pIter = taosHashIterate(execTasks, pIter);
    }    


    param = (SSchCallbackParam *)calloc(1, sizeof(*param));
D
dapan1121 已提交
441 442 443
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
    
D
dapan1121 已提交
444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460
    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId;
      SResReadyRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
      code = schHandleCallback(param, &dataBuf, TDMT_VND_RES_READY_RSP, 0);
      assert(code == 0 || code);
      
      pIter = taosHashIterate(execTasks, pIter);
    }  


    param = (SSchCallbackParam *)calloc(1, sizeof(*param));
D
dapan1121 已提交
461 462
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
D
dapan1121 已提交
463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480

    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId - 1;
      SQueryTableRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
      code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0);
      assert(code == 0 || code);
      
      pIter = taosHashIterate(execTasks, pIter);
    }    


    param = (SSchCallbackParam *)calloc(1, sizeof(*param));
D
dapan1121 已提交
481 482
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
D
dapan1121 已提交
483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501

    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId - 1;
      SResReadyRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
      code = schHandleCallback(param, &dataBuf, TDMT_VND_RES_READY_RSP, 0);
      assert(code == 0 || code);
      
      pIter = taosHashIterate(execTasks, pIter);
    }  

    atomic_store_32(&schtStartFetch, 1);

    void *data = NULL;  
D
dapan1121 已提交
502
    code = schedulerFetchRows(queryJobRefId, &data);
D
dapan1121 已提交
503 504 505 506 507 508 509 510 511
    assert(code == 0 || code);

    if (0 == code) {
      SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
      assert(pRsp->completed == 1);
      assert(pRsp->numOfRows == 10);
    }

    data = NULL;
D
dapan1121 已提交
512
    code = schedulerFetchRows(queryJobRefId, &data);
D
dapan1121 已提交
513 514 515 516 517
    assert(code == 0 || code);
    
    schtFreeQueryJob(0);

    taosHashCleanup(execTasks);
D
dapan1121 已提交
518
    taosArrayDestroy(qnodeList);
D
dapan1121 已提交
519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534

    schtFreeQueryDag(&dag);

    if (++jobFinished % schtTestPrintNum == 0) {
      printf("jobFinished:%d\n", jobFinished);
    }

    ++schtQueryId;
  }

  schedulerDestroy();

}

void* schtFreeJobThread(void *aa) {
  while (!schtTestStop) {
wafwerar's avatar
wafwerar 已提交
535
    taosUsleep(taosRand() % 100);
D
dapan1121 已提交
536 537 538
    schtFreeQueryJob(1);
  }
}
D
dapan1121 已提交
539 540


D
dapan1121 已提交
541 542
}

D
dapan 已提交
543
TEST(queryTest, normalCase) {
D
dapan1121 已提交
544
  void *mockPointer = (void *)0x1;
D
dapan1121 已提交
545 546 547
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
D
dapan1121 已提交
548
  SVgroupInfo vgInfo = {0};
D
dapan1121 已提交
549
  int64_t job = 0;
D
dapan1121 已提交
550
  SQueryDag dag = {0};
D
dapan1121 已提交
551 552 553

  schtInitLogFile();

H
Haojun Liao 已提交
554
  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
D
dapan 已提交
555

H
Haojun Liao 已提交
556
  SEp qnodeAddr = {0};
D
dapan 已提交
557 558 559
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);
D
dapan1121 已提交
560 561
  
  int32_t code = schedulerInit(NULL);
D
dapan1121 已提交
562
  ASSERT_EQ(code, 0);
D
dapan1121 已提交
563

D
dapan 已提交
564
  schtBuildQueryDag(&dag);
D
dapan 已提交
565 566 567

  schtSetPlanToString();
  schtSetExecNode();
D
dapan1121 已提交
568
  schtSetAsyncSendMsgToServer();
D
dapan1121 已提交
569
  
D
dapan1121 已提交
570
  code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
D
dapan1121 已提交
571
  ASSERT_EQ(code, 0);
D
dapan 已提交
572

D
dapan1121 已提交
573 574 575 576
  
  SSchJob *pJob = schAcquireJob(job);
  
  void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan 已提交
577
  while (pIter) {
D
dapan 已提交
578
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
579 580

    SQueryTableRsp rsp = {0};
D
dapan1121 已提交
581
    code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
582 583
    
    ASSERT_EQ(code, 0);
D
dapan1121 已提交
584
    pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan 已提交
585 586
  }    

D
dapan1121 已提交
587
  pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan 已提交
588
  while (pIter) {
D
dapan 已提交
589
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
590 591

    SResReadyRsp rsp = {0};
D
dapan1121 已提交
592
    code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan1121 已提交
593
    printf("code:%d", code);
D
dapan 已提交
594
    ASSERT_EQ(code, 0);
D
dapan1121 已提交
595
    pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan 已提交
596 597
  }  

D
dapan1121 已提交
598
  pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan 已提交
599
  while (pIter) {
D
dapan 已提交
600
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
601 602

    SQueryTableRsp rsp = {0};
D
dapan1121 已提交
603
    code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
604 605
    
    ASSERT_EQ(code, 0);
D
dapan1121 已提交
606
    pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan 已提交
607 608
  }    

D
dapan1121 已提交
609
  pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan 已提交
610
  while (pIter) {
D
dapan 已提交
611
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
612 613

    SResReadyRsp rsp = {0};
D
dapan1121 已提交
614
    code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
615 616
    ASSERT_EQ(code, 0);
    
D
dapan1121 已提交
617
    pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan 已提交
618 619
  }  

D
dapan1121 已提交
620 621
  pthread_attr_t thattr;
  pthread_attr_init(&thattr);
D
dapan 已提交
622

D
dapan1121 已提交
623
  pthread_t thread1;
D
dapan1121 已提交
624
  pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, &job);
D
dapan 已提交
625

D
dapan1121 已提交
626
  void *data = NULL;  
D
dapan1121 已提交
627
  code = schedulerFetchRows(job, &data);
D
dapan 已提交
628 629 630 631 632
  ASSERT_EQ(code, 0);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);
D
dapan1121 已提交
633
  tfree(data);
D
dapan 已提交
634 635

  data = NULL;
D
dapan1121 已提交
636
  code = schedulerFetchRows(job, &data);
D
dapan 已提交
637
  ASSERT_EQ(code, 0);
D
dapan1121 已提交
638 639 640
  ASSERT_TRUE(data == NULL);

  schReleaseJob(job);
D
dapan 已提交
641

D
dapan1121 已提交
642
  schedulerFreeJob(job);
D
dapan1121 已提交
643 644

  schtFreeQueryDag(&dag);
D
dapan1121 已提交
645 646

  schedulerDestroy();
D
dapan1121 已提交
647
}
D
dapan1121 已提交
648 649


D
dapan 已提交
650 651 652 653 654 655 656 657 658

TEST(insertTest, normalCase) {
  void *mockPointer = (void *)0x1;
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
  SVgroupInfo vgInfo = {0};
  SQueryDag dag = {0};
  uint64_t numOfRows = 0;
D
dapan1121 已提交
659 660

  schtInitLogFile();
H
Haojun Liao 已提交
661

H
Haojun Liao 已提交
662
  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
D
dapan 已提交
663

H
Haojun Liao 已提交
664
  SEp qnodeAddr = {0};
D
dapan 已提交
665 666 667 668 669 670 671 672 673 674
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);
  
  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildInsertDag(&dag);

  schtSetPlanToString();
D
dapan1121 已提交
675
  schtSetAsyncSendMsgToServer();
D
dapan 已提交
676 677 678 679 680

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);

  pthread_t thread1;
D
dapan1121 已提交
681
  pthread_create(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
D
dapan1121 已提交
682 683

  SQueryResult res = {0};
D
dapan1121 已提交
684
  code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", &res);
D
dapan 已提交
685
  ASSERT_EQ(code, 0);
D
dapan1121 已提交
686
  ASSERT_EQ(res.numOfRows, 20);
D
dapan 已提交
687

D
dapan1121 已提交
688
  schedulerFreeJob(insertJobRefId);
D
dapan1121 已提交
689 690

  schedulerDestroy();  
D
dapan 已提交
691 692
}

D
dapan1121 已提交
693
TEST(multiThread, forceFree) {
D
dapan1121 已提交
694 695
  pthread_attr_t thattr;
  pthread_attr_init(&thattr);
D
dapan 已提交
696

D
dapan1121 已提交
697 698 699 700
  pthread_t thread1, thread2, thread3;
  pthread_create(&(thread1), &thattr, schtRunJobThread, NULL);
  pthread_create(&(thread2), &thattr, schtFreeJobThread, NULL);
  pthread_create(&(thread3), &thattr, schtFetchRspThread, NULL);
D
dapan1121 已提交
701

D
dapan1121 已提交
702 703
  while (true) {
    if (schtTestDeadLoop) {
wafwerar's avatar
wafwerar 已提交
704
      taosSsleep(1);
D
dapan1121 已提交
705
    } else {
wafwerar's avatar
wafwerar 已提交
706
      taosSsleep(schtTestMTRunSec);
D
dapan1121 已提交
707 708 709 710 711
      break;
    }
  }
  
  schtTestStop = true;
wafwerar's avatar
wafwerar 已提交
712
  taosSsleep(3);
D
dapan1121 已提交
713
}
D
dapan 已提交
714

D
dapan1121 已提交
715
int main(int argc, char** argv) {
wafwerar's avatar
wafwerar 已提交
716
  taosSeedRand(time(NULL));
D
dapan1121 已提交
717 718 719 720
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

D
dapan1121 已提交
721
#pragma GCC diagnostic pop