schedulerTests.cpp 22.6 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <gtest/gtest.h>

#include <atomic>
#include <iostream>

19 20 21 22 23 24 25 26 27 28
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wreturn-type"
#pragma GCC diagnostic ignored "-Wformat"
#include <addr_any.h>


D
dapan1121 已提交
29 30
#include "os.h"

31
#include "tglobal.h"
D
dapan1121 已提交
32 33 34
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
D
dapan1121 已提交
35 36
#include "catalog.h"
#include "scheduler.h"
H
Haojun Liao 已提交
37 38 39
#include "taos.h"
#include "tdatablock.h"
#include "tdef.h"
D
dapan1121 已提交
40
#include "trpc.h"
H
Haojun Liao 已提交
41
#include "tvariant.h"
S
Shengliang Guan 已提交
42 43 44 45 46 47 48 49 50

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wreturn-type"
#pragma GCC diagnostic ignored "-Wformat"

D
dapan 已提交
51 52
#include "schedulerInt.h"
#include "stub.h"
D
dapan1121 已提交
53
#include "tref.h"
D
dapan1121 已提交
54

D
dapan1121 已提交
55
namespace {
D
dapan 已提交
56

D
dapan1121 已提交
57
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
D
dapan1121 已提交
58 59
extern "C" int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode);

D
dapan1121 已提交
60 61
// Job reference ids shared between the tests and their helper threads.
// insertJobRefId is filled in by schedulerExecJob and polled by schtSendRsp;
// queryJobRefId is filled in by schtRunJobThread and cleared by schtFreeQueryJob.
int64_t insertJobRefId = 0;
int64_t queryJobRefId = 0;

// Group id given to the merge subplan by the DAG builders.
uint64_t schtMergeTemplateId = 0x4;
// Task id used by schtFetchRspThread when it fabricates a fetch response.
uint64_t schtFetchTaskId = 0;
// Query id stamped on every DAG built by schtBuildQueryDag / schtBuildQueryFlowCtrlDag.
uint64_t schtQueryId = 1;

// Set by the multi-thread test to ask the worker threads to leave their loops.
// It is written by one thread and polled by others, so it is atomic.
std::atomic<bool> schtTestStop(false);
// When true the multi-thread test keeps running instead of stopping after schtTestMTRunSec.
bool schtTestDeadLoop = false;
// Run time of the multi-thread test, in seconds (passed to sleep()).
int32_t schtTestMTRunSec = 10;
// Progress is printed every schtTestPrintNum finished / freed jobs.
int32_t schtTestPrintNum = 1000;
// Hand-off flag: set to 1 by schtRunJobThread, consumed by schtFetchRspThread.
int32_t schtStartFetch = 0;
D
dapan 已提交
72

D
dapan1121 已提交
73

D
dapan1121 已提交
74 75 76 77 78 79
void schtInitLogFile() {
  const char    *defaultLogFileNamePrefix = "taoslog";
  const int32_t  maxLogFileNum = 10;

  tsAsyncLog = 0;
  qDebugFlag = 159;
D
dapan1121 已提交
80
  strcpy(tsLogDir, "/var/log/taos");
D
dapan1121 已提交
81

S
Shengliang Guan 已提交
82
  if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
S
os env  
Shengliang Guan 已提交
83
    printf("failed to open log file in directory:%s\n", tsLogDir);
D
dapan1121 已提交
84 85 86 87 88
  }

}


X
Xiaoyu Wang 已提交
89
/**
 * Fill `dag` with a two-level query plan: one merge subplan (level 0) whose
 * only child is one scan subplan (level 1).
 *
 * The scan subplan is bound to node 1 with endpoint "ep0":6030; the merge
 * subplan has an empty endpoint set. Both use TDMT_VND_QUERY as message type.
 * Everything is heap-allocated here and nothing in this function frees it.
 *
 * @param dag  plan to populate; queryId, numOfSubplans and pSubplans are overwritten.
 */
void schtBuildQueryDag(SQueryPlan *dag) {
  const uint64_t queryId = schtQueryId;

  dag->queryId = queryId;
  dag->numOfSubplans = 2;
  dag->pSubplans = nodesMakeList();

  SNodeListNode *scanLevel = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
  SNodeListNode *mergeLevel = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);

  SSubplan *leaf = (SSubplan *)calloc(1, sizeof(SSubplan));
  SSubplan *root = (SSubplan *)calloc(1, sizeof(SSubplan));

  // Scan subplan (leaf of the DAG).
  leaf->id.queryId = queryId;
  leaf->id.groupId = 0x2;
  leaf->id.subplanId = 0x3;
  leaf->subplanType = SUBPLAN_TYPE_SCAN;
  leaf->execNode.nodeId = 1;
  leaf->execNode.epset.inUse = 0;
  addEpIntoEpSet(&leaf->execNode.epset, "ep0", 6030);
  leaf->level = 1;
  leaf->pChildren = NULL;
  leaf->pParents = nodesMakeList();
  leaf->pNode = (SPhysiNode *)calloc(1, sizeof(SPhysiNode));
  leaf->msgType = TDMT_VND_QUERY;

  // Merge subplan (root of the DAG).
  root->id.queryId = queryId;
  root->id.groupId = schtMergeTemplateId;
  root->id.subplanId = 0x5555;
  root->subplanType = SUBPLAN_TYPE_MERGE;
  root->level = 0;
  root->execNode.epset.numOfEps = 0;
  root->pChildren = nodesMakeList();
  root->pParents = NULL;
  root->pNode = (SPhysiNode *)calloc(1, sizeof(SPhysiNode));
  root->msgType = TDMT_VND_QUERY;

  // One node list per level, each holding its single subplan.
  mergeLevel->pNodeList = nodesMakeList();
  scanLevel->pNodeList = nodesMakeList();
  nodesListAppend(mergeLevel->pNodeList, (SNode *)root);
  nodesListAppend(scanLevel->pNodeList, (SNode *)leaf);

  // Parent/child wiring between the two subplans.
  nodesListAppend(root->pChildren, (SNode *)leaf);
  nodesListAppend(leaf->pParents, (SNode *)root);

  // The merge level is appended first, then the scan level.
  nodesListAppend(dag->pSubplans, (SNode *)mergeLevel);
  nodesListAppend(dag->pSubplans, (SNode *)scanLevel);
}

D
dapan 已提交
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
/**
 * Fill `dag` with a plan made of one merge subplan (level 0) fed by 20 scan
 * subplans (level 1), used by the flow-control test.
 *
 * Each scan subplan gets its own node id (1 + i), three endpoints
 * ("ep0".."ep2", port 6030), a random table count in [0, 30) and a random
 * in-use endpoint index in [0, 3). The scan subplans live in one calloc'ed array.
 *
 * @param dag  plan to populate; queryId, numOfSubplans and pSubplans are overwritten.
 */
void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
  const uint64_t queryId = schtQueryId;
  const int32_t  scanCount = 20;

  dag->queryId = queryId;
  dag->numOfSubplans = 2;
  dag->pSubplans = nodesMakeList();

  SNodeListNode *scanLevel = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
  SNodeListNode *mergeLevel = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);

  SSubplan *scans = (SSubplan *)calloc(scanCount, sizeof(SSubplan));
  SSubplan *root = (SSubplan *)calloc(1, sizeof(SSubplan));

  mergeLevel->pNodeList = nodesMakeList();
  scanLevel->pNodeList = nodesMakeList();

  root->pChildren = nodesMakeList();

  for (int32_t idx = 0; idx < scanCount; ++idx) {
    SSubplan *cur = scans + idx;

    cur->id.queryId = queryId;
    cur->id.groupId = 0x2;
    cur->id.subplanId = 0x3 + idx;
    cur->subplanType = SUBPLAN_TYPE_SCAN;

    cur->execNode.nodeId = 1 + idx;
    cur->execNode.epset.inUse = 0;
    cur->execNodeStat.tableNum = rand() % 30;
    addEpIntoEpSet(&cur->execNode.epset, "ep0", 6030);
    addEpIntoEpSet(&cur->execNode.epset, "ep1", 6030);
    addEpIntoEpSet(&cur->execNode.epset, "ep2", 6030);
    // Pick the active endpoint only after all three have been added.
    cur->execNode.epset.inUse = rand() % 3;

    cur->pChildren = NULL;
    cur->level = 1;
    cur->pParents = nodesMakeList();
    cur->pNode = (SPhysiNode *)calloc(1, sizeof(SPhysiNode));
    cur->msgType = TDMT_VND_QUERY;

    // Link this scan to the merge subplan and register it on the scan level.
    nodesListAppend(cur->pParents, (SNode *)root);
    nodesListAppend(root->pChildren, (SNode *)cur);
    nodesListAppend(scanLevel->pNodeList, (SNode *)cur);
  }

  root->id.queryId = queryId;
  root->id.groupId = schtMergeTemplateId;
  root->id.subplanId = 0x5555;
  root->subplanType = SUBPLAN_TYPE_MERGE;
  root->level = 0;
  root->execNode.epset.numOfEps = 0;

  root->pParents = NULL;
  root->pNode = (SPhysiNode *)calloc(1, sizeof(SPhysiNode));
  root->msgType = TDMT_VND_QUERY;

  nodesListAppend(mergeLevel->pNodeList, (SNode *)root);

  // The merge level is appended first, then the scan level.
  nodesListAppend(dag->pSubplans, (SNode *)mergeLevel);
  nodesListAppend(dag->pSubplans, (SNode *)scanLevel);
}


X
Xiaoyu Wang 已提交
203
/**
 * Placeholder for releasing a DAG built by the schtBuild*Dag helpers.
 * It currently does nothing, so the allocations made by those helpers are
 * not released here.
 */
void schtFreeQueryDag(SQueryPlan *dag) {
  (void)dag;
}


X
Xiaoyu Wang 已提交
208
/**
 * Fill `dag` with a single-level insert plan holding two MODIFY subplans.
 *
 * Both subplans share query id 0x2 and group id 0x3, run on node 1 with
 * endpoint "ep0":6030, carry a zeroed data sink and use TDMT_VND_SUBMIT.
 * They differ only in subplan id (0x4 and 0x5). The two subplans live in
 * one calloc'ed array.
 *
 * @param dag  plan to populate; queryId, numOfSubplans and pSubplans are overwritten.
 */
void schtBuildInsertDag(SQueryPlan *dag) {
  const uint64_t queryId = 0x2;
  const int32_t  planCount = 2;

  dag->queryId = queryId;
  dag->numOfSubplans = 2;
  dag->pSubplans = nodesMakeList();
  SNodeListNode *insertLevel = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);

  SSubplan *plans = (SSubplan *)calloc(planCount, sizeof(SSubplan));

  for (int32_t idx = 0; idx < planCount; ++idx) {
    SSubplan *plan = &plans[idx];

    plan->id.queryId = queryId;
    plan->id.groupId = 0x3;
    plan->id.subplanId = 0x4 + idx;
    plan->subplanType = SUBPLAN_TYPE_MODIFY;
    plan->level = 0;

    plan->execNode.nodeId = 1;
    plan->execNode.epset.inUse = 0;
    addEpIntoEpSet(&plan->execNode.epset, "ep0", 6030);

    plan->pChildren = NULL;
    plan->pParents = NULL;
    plan->pNode = NULL;
    plan->pDataSink = (SDataSinkNode *)calloc(1, sizeof(SDataSinkNode));
    plan->msgType = TDMT_VND_SUBMIT;
  }

  insertLevel->pNodeList = nodesMakeList();
  for (int32_t idx = 0; idx < planCount; ++idx) {
    nodesListAppend(insertLevel->pNodeList, (SNode *)&plans[idx]);
  }

  nodesListAppend(dag->pSubplans, (SNode *)insertLevel);
}


D
dapan 已提交
260 261 262 263 264 265
/**
 * Stand-in for qSubPlanToString: ignores the subplan and hands back a
 * zero-filled 20-byte buffer.
 *
 * @param subplan  unused.
 * @param str      receives the newly calloc'ed buffer.
 * @param len      receives the buffer length (20).
 * @return always 0.
 */
int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
  const int32_t bufLen = 20;

  *str = (char *)calloc(1, bufLen);
  *len = bufLen;
  return 0;
}

X
Xiaoyu Wang 已提交
266
/** Stand-in for qSetSubplanExecutionNode: deliberately does nothing. */
void schtExecNode(SSubplan* subplan, uint64_t groupId, SQueryNodeAddr* ep) {
  (void)subplan;
  (void)groupId;
  (void)ep;
}

D
dapan1121 已提交
270 271 272 273
/** Stand-in for rpcSendRequest: deliberately does nothing, so no message is sent. */
void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
  (void)shandle;
  (void)pEpSet;
  (void)pMsg;
  (void)pRid;
}

D
dapan 已提交
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
void schtSetPlanToString() {
  static Stub stub;
  stub.set(qSubPlanToString, schtPlanToString);
  {
    AddrAny any("libplanner.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qSubPlanToString$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtPlanToString);
    }
  }
}

void schtSetExecNode() {
  static Stub stub;
  stub.set(qSetSubplanExecutionNode, schtExecNode);
  {
    AddrAny any("libplanner.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qSetSubplanExecutionNode$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtExecNode);
    }
  }
}

D
dapan1121 已提交
300 301 302 303 304 305 306 307 308 309 310 311 312
void schtSetRpcSendRequest() {
  static Stub stub;
  stub.set(rpcSendRequest, schtRpcSendRequest);
  {
    AddrAny any("libtransport.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^rpcSendRequest$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtRpcSendRequest);
    }
  }
}

D
dapan1121 已提交
313 314 315 316 317 318
/**
 * Stand-in for asyncSendMsgToServer: sends nothing and just disposes of the
 * send-info record it was given.
 *
 * @param pInfo  may be NULL; otherwise its param, its message payload and the
 *               record itself are freed.
 * @return always 0.
 */
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
  if (NULL == pInfo) {
    return 0;
  }

  tfree(pInfo->param);
  tfree(pInfo->msgInfo.pData);
  free(pInfo);
  return 0;
}


void schtSetAsyncSendMsgToServer() {
  static Stub stub;
  stub.set(asyncSendMsgToServer, schtAsyncSendMsgToServer);
  {
    AddrAny any("libtransport.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^asyncSendMsgToServer$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtAsyncSendMsgToServer);
    }
  }
}

D
dapan1121 已提交
336

D
dapan 已提交
337
void *schtSendRsp(void *param) {
D
dapan1121 已提交
338 339
  SSchJob *pJob = NULL;
  int64_t job = 0;
D
dapan 已提交
340 341 342
  int32_t code = 0;

  while (true) {
D
dapan1121 已提交
343
    job = *(int64_t *)param;
D
dapan 已提交
344 345 346 347 348 349
    if (job) {
      break;
    }

    usleep(1000);
  }
D
dapan1121 已提交
350 351

  pJob = schAcquireJob(job);
D
dapan 已提交
352
  
D
dapan1121 已提交
353
  void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan 已提交
354 355 356
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

S
Shengliang Guan 已提交
357
    SSubmitRsp rsp = {0};
D
dapan 已提交
358
    rsp.affectedRows = 10;
D
dapan1121 已提交
359
    schHandleResponseMsg(pJob, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
360
    
D
dapan1121 已提交
361
    pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan 已提交
362 363
  }    

D
dapan1121 已提交
364 365
  schReleaseJob(job);

D
dapan 已提交
366 367 368
  return NULL;
}

D
dapan1121 已提交
369
void *schtCreateFetchRspThread(void *param) {
D
dapan1121 已提交
370 371
  int64_t job = *(int64_t *)param;
  SSchJob* pJob = schAcquireJob(job);
D
dapan1121 已提交
372 373 374 375 376 377 378

  sleep(1);

  int32_t code = 0;
  SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp));
  rsp->completed = 1;
  rsp->numOfRows = 10;
D
dapan1121 已提交
379
 
D
dapan1121 已提交
380 381 382 383
  code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0);

  schReleaseJob(job);
  
D
dapan1121 已提交
384 385 386 387
  assert(code == 0);
}


D
dapan1121 已提交
388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419
/**
 * Thread body for the multi-thread test: each time schtStartFetch is raised,
 * fabricates a fetch response (completed = 1, numOfRows = 10) for
 * schtQueryId / schtFetchTaskId and pushes it through schHandleCallback.
 * Runs until schtTestStop is set.
 *
 * @param aa  unused.
 * @return always NULL.
 */
void *schtFetchRspThread(void *aa) {
  SDataBuf dataBuf = {0};
  SSchCallbackParam* param = NULL;

  while (!schtTestStop) {
    // Consume the start flag (1 -> 0); keep spinning while it is not raised.
    if (0 == atomic_val_compare_exchange_32(&schtStartFetch, 1, 0)) {
      continue;
    }

    usleep(1);

    param = (SSchCallbackParam *)calloc(1, sizeof(*param));

    param->queryId = schtQueryId;
    param->taskId = schtFetchTaskId;

    int32_t code = 0;
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp));
    rsp->completed = 1;
    rsp->numOfRows = 10;

    dataBuf.pData = rsp;
    dataBuf.len = sizeof(*rsp);

    code = schHandleCallback(param, &dataBuf, TDMT_VND_FETCH_RSP, 0);

    // Always true: any return code is accepted here.
    assert(code == 0 || code);
  }

  // A pthread start routine must return a value; falling off the end is undefined.
  return NULL;
}

void schtFreeQueryJob(int32_t freeThread) {
  static uint32_t freeNum = 0;
D
dapan1121 已提交
420
  int64_t job = queryJobRefId;
D
dapan1121 已提交
421
  
D
dapan1121 已提交
422
  if (job && atomic_val_compare_exchange_64(&queryJobRefId, job, 0)) {
D
dapan1121 已提交
423
    schedulerFreeJob(job);
D
dapan1121 已提交
424 425 426 427 428 429 430 431 432 433 434 435 436 437
    if (freeThread) {
      if (++freeNum % schtTestPrintNum == 0) {
        printf("FreeNum:%d\n", freeNum);
      }
    }
  }
}

void* schtRunJobThread(void *aa) {
  void *mockPointer = (void *)0x1;
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
  SVgroupInfo vgInfo = {0};
X
Xiaoyu Wang 已提交
438
  SQueryPlan dag;
D
dapan1121 已提交
439 440 441 442 443 444 445 446 447 448 449 450

  schtInitLogFile();

  
  int32_t code = schedulerInit(NULL);
  assert(code == 0);


  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();

D
dapan1121 已提交
451
  SSchJob *pJob = NULL;
D
dapan1121 已提交
452 453 454 455 456 457 458 459
  SSchCallbackParam *param = NULL;
  SHashObj *execTasks = NULL;
  SDataBuf dataBuf = {0};
  uint32_t jobFinished = 0;

  while (!schtTestStop) {
    schtBuildQueryDag(&dag);

H
Haojun Liao 已提交
460
    SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
D
dapan1121 已提交
461

H
Haojun Liao 已提交
462
    SEp qnodeAddr = {0};
D
dapan1121 已提交
463 464 465 466
    strcpy(qnodeAddr.fqdn, "qnode0.ep");
    qnodeAddr.port = 6031;
    taosArrayPush(qnodeList, &qnodeAddr);

D
dapan1121 已提交
467
    code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &queryJobRefId);
D
dapan1121 已提交
468 469
    assert(code == 0);

D
dapan1121 已提交
470 471 472 473 474 475 476
    pJob = schAcquireJob(queryJobRefId);
    if (NULL == pJob) {
      taosArrayDestroy(qnodeList);
      schtFreeQueryDag(&dag);
      continue;
    }
    
D
dapan1121 已提交
477
    execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
478
    void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan1121 已提交
479 480 481 482 483
    while (pIter) {
      SSchTask *task = *(SSchTask **)pIter;
      schtFetchTaskId = task->taskId - 1;
      
      taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task));
D
dapan1121 已提交
484
      pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan1121 已提交
485 486 487
    }    

    param = (SSchCallbackParam *)calloc(1, sizeof(*param));
D
dapan1121 已提交
488 489
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
D
dapan1121 已提交
490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507

    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId;
      SQueryTableRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
      code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0);
      assert(code == 0 || code);

      pIter = taosHashIterate(execTasks, pIter);
    }    


    param = (SSchCallbackParam *)calloc(1, sizeof(*param));
D
dapan1121 已提交
508 509 510
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
    
D
dapan1121 已提交
511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527
    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId;
      SResReadyRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
      code = schHandleCallback(param, &dataBuf, TDMT_VND_RES_READY_RSP, 0);
      assert(code == 0 || code);
      
      pIter = taosHashIterate(execTasks, pIter);
    }  


    param = (SSchCallbackParam *)calloc(1, sizeof(*param));
D
dapan1121 已提交
528 529
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
D
dapan1121 已提交
530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547

    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId - 1;
      SQueryTableRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
      code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0);
      assert(code == 0 || code);
      
      pIter = taosHashIterate(execTasks, pIter);
    }    


    param = (SSchCallbackParam *)calloc(1, sizeof(*param));
D
dapan1121 已提交
548 549
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
D
dapan1121 已提交
550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568

    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId - 1;
      SResReadyRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
      code = schHandleCallback(param, &dataBuf, TDMT_VND_RES_READY_RSP, 0);
      assert(code == 0 || code);
      
      pIter = taosHashIterate(execTasks, pIter);
    }  

    atomic_store_32(&schtStartFetch, 1);

    void *data = NULL;  
D
dapan1121 已提交
569
    code = schedulerFetchRows(queryJobRefId, &data);
D
dapan1121 已提交
570 571 572 573 574 575 576 577 578
    assert(code == 0 || code);

    if (0 == code) {
      SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
      assert(pRsp->completed == 1);
      assert(pRsp->numOfRows == 10);
    }

    data = NULL;
D
dapan1121 已提交
579
    code = schedulerFetchRows(queryJobRefId, &data);
D
dapan1121 已提交
580 581 582 583 584
    assert(code == 0 || code);
    
    schtFreeQueryJob(0);

    taosHashCleanup(execTasks);
D
dapan1121 已提交
585
    taosArrayDestroy(qnodeList);
D
dapan1121 已提交
586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601

    schtFreeQueryDag(&dag);

    if (++jobFinished % schtTestPrintNum == 0) {
      printf("jobFinished:%d\n", jobFinished);
    }

    ++schtQueryId;
  }

  schedulerDestroy();

}

void* schtFreeJobThread(void *aa) {
  while (!schtTestStop) {
D
dapan1121 已提交
602
    usleep(rand() % 100);
D
dapan1121 已提交
603 604 605
    schtFreeQueryJob(1);
  }
}
D
dapan1121 已提交
606 607


D
dapan1121 已提交
608 609
}

D
dapan 已提交
610
// Happy path for a two-level query: start the job, answer every task with
// query and res-ready responses (two rounds each), then fetch the result rows
// delivered by a helper thread and check that a second fetch returns no data.
TEST(queryTest, normalCase) {
  void *mockPointer = (void *)0x1;
  int64_t job = 0;
  SQueryPlan dag;

  memset(&dag, 0, sizeof(dag));

  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);

  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildQueryDag(&dag);

  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();

  code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
  ASSERT_EQ(code, 0);

  SSchJob *pJob = schAcquireJob(job);
  ASSERT_TRUE(pJob != NULL);

  // Round 1: query responses.
  void *pIter = taosHashIterate(pJob->execTasks, NULL);
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

    SQueryTableRsp rsp = {0};
    code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);

    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(pJob->execTasks, pIter);
  }

  // Round 2: res-ready responses.
  pIter = taosHashIterate(pJob->execTasks, NULL);
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

    SResReadyRsp rsp = {0};
    code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
    printf("code:%d", code);
    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(pJob->execTasks, pIter);
  }

  // Round 3: query responses for the tasks now in the exec table.
  pIter = taosHashIterate(pJob->execTasks, NULL);
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

    SQueryTableRsp rsp = {0};
    code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);

    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(pJob->execTasks, pIter);
  }

  // Round 4: res-ready responses.
  pIter = taosHashIterate(pJob->execTasks, NULL);
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

    SResReadyRsp rsp = {0};
    code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
    ASSERT_EQ(code, 0);

    pIter = taosHashIterate(pJob->execTasks, pIter);
  }

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);

  pthread_t thread1;
  int32_t   thrCode = pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, &job);
  pthread_attr_destroy(&thattr);
  ASSERT_EQ(thrCode, 0);

  void *data = NULL;
  code = schedulerFetchRows(job, &data);

  // The helper thread reads `job` and uses the scheduler; wait for it before
  // this function can return or tear anything down.
  pthread_join(thread1, NULL);

  ASSERT_EQ(code, 0);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);
  tfree(data);

  data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data == NULL);

  schReleaseJob(job);

  schedulerFreeJob(job);

  // Same cleanup as schtRunJobThread performs after its job is freed.
  taosArrayDestroy(qnodeList);

  schtFreeQueryDag(&dag);

  schedulerDestroy();
}
D
dapan1121 已提交
715

D
dapan 已提交
716 717 718 719 720 721 722 723 724 725 726
// Flow-control scenario: one merge subplan fed by 20 scan subplans. Tasks are
// answered one at a time according to the last message each one sent, until a
// task is found whose last message is neither a query nor a res-ready
// request; then the result rows are fetched via a helper thread.
TEST(queryTest, flowCtrlCase) {
  void *mockPointer = (void *)0x1;
  int64_t job = 0;
  SQueryPlan dag;

  // schtBuildQueryFlowCtrlDag only sets three fields; zero the rest first.
  memset(&dag, 0, sizeof(dag));

  schtInitLogFile();

  srand(time(NULL));

  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);

  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildQueryFlowCtrlDag(&dag);

  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();

  code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
  ASSERT_EQ(code, 0);

  SSchJob *pJob = schAcquireJob(job);
  ASSERT_TRUE(pJob != NULL);

  bool queryDone = false;

  while (!queryDone) {
    void *pIter = taosHashIterate(pJob->execTasks, NULL);
    if (NULL == pIter) {
      break;
    }

    // Only the first task of each pass is handled: the iteration is
    // cancelled right away and restarted by the outer loop.
    while (pIter) {
      SSchTask *task = *(SSchTask **)pIter;

      taosHashCancelIterate(pJob->execTasks, pIter);

      if (task->lastMsgType == TDMT_VND_QUERY) {
        SQueryTableRsp rsp = {0};
        code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);

        ASSERT_EQ(code, 0);
      } else if (task->lastMsgType == TDMT_VND_RES_READY) {
        SResReadyRsp rsp = {0};
        code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
        ASSERT_EQ(code, 0);
      } else {
        queryDone = true;
        break;
      }

      pIter = NULL;
    }
  }

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);

  pthread_t thread1;
  int32_t   thrCode = pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, &job);
  pthread_attr_destroy(&thattr);
  ASSERT_EQ(thrCode, 0);

  void *data = NULL;
  code = schedulerFetchRows(job, &data);

  // The helper thread reads `job` and uses the scheduler; wait for it before
  // this function can return or tear anything down.
  pthread_join(thread1, NULL);

  ASSERT_EQ(code, 0);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);
  tfree(data);

  data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data == NULL);

  schReleaseJob(job);

  schedulerFreeJob(job);

  // Same cleanup as schtRunJobThread performs after its job is freed.
  taosArrayDestroy(qnodeList);

  schtFreeQueryDag(&dag);

  schedulerDestroy();
}
D
dapan1121 已提交
811

D
dapan 已提交
812 813 814 815 816 817 818

// Synchronous insert: a helper thread answers both submit subplans with 10
// affected rows each, so the job is expected to report 20 rows in total.
TEST(insertTest, normalCase) {
  void *mockPointer = (void *)0x1;
  SQueryPlan dag;

  // schtBuildInsertDag only sets three fields; zero the rest first.
  memset(&dag, 0, sizeof(dag));

  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);

  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildInsertDag(&dag);

  schtSetPlanToString();
  schtSetAsyncSendMsgToServer();

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);

  // The responder polls insertJobRefId until schedulerExecJob publishes the job.
  pthread_t thread1;
  int32_t   thrCode = pthread_create(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
  pthread_attr_destroy(&thattr);
  ASSERT_EQ(thrCode, 0);

  SQueryResult res = {0};
  code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", &res);
  ASSERT_EQ(code, 0);

  // Joined only after the success check above, never on the failure path.
  // Wait for the responder to finish before the job is freed.
  pthread_join(thread1, NULL);

  ASSERT_EQ(res.numOfRows, 20);

  schedulerFreeJob(insertJobRefId);

  // Same cleanup as schtRunJobThread performs after its job is freed.
  taosArrayDestroy(qnodeList);

  schedulerDestroy();
}

D
dapan1121 已提交
853
// Stress test: one thread runs query jobs, one frees them at random moments
// and one delivers fetch responses. The workers run for schtTestMTRunSec
// seconds (or for as long as schtTestDeadLoop stays set, plus that duration),
// then are asked to stop and given three seconds to wind down. They are not
// joined.
TEST(multiThread, forceFree) {
  pthread_attr_t attr;
  pthread_attr_init(&attr);

  pthread_t runner, freer, fetcher;
  pthread_create(&runner, &attr, schtRunJobThread, NULL);
  pthread_create(&freer, &attr, schtFreeJobThread, NULL);
  pthread_create(&fetcher, &attr, schtFetchRspThread, NULL);

  // Nap in one-second steps while dead-loop mode is on.
  while (schtTestDeadLoop) {
    sleep(1);
  }
  sleep(schtTestMTRunSec);

  schtTestStop = true;
  sleep(3);
}
D
dapan 已提交
874

D
dapan1121 已提交
875
int main(int argc, char** argv) {
D
dapan1121 已提交
876
  srand(time(NULL));
D
dapan1121 已提交
877 878 879 880
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

D
dapan1121 已提交
881
#pragma GCC diagnostic pop