schedulerTests.cpp 25.3 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <gtest/gtest.h>
#include <iostream>

19 20 21 22 23 24 25 26 27 28
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wreturn-type"
#pragma GCC diagnostic ignored "-Wformat"
#include <addr_any.h>


D
dapan1121 已提交
29 30
#include "os.h"

31
#include "tglobal.h"
D
dapan1121 已提交
32 33 34
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
D
dapan1121 已提交
35 36
#include "catalog.h"
#include "scheduler.h"
H
Haojun Liao 已提交
37 38 39
#include "taos.h"
#include "tdatablock.h"
#include "tdef.h"
D
dapan1121 已提交
40
#include "trpc.h"
H
Haojun Liao 已提交
41
#include "tvariant.h"
S
Shengliang Guan 已提交
42 43 44 45 46 47 48 49 50

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wreturn-type"
#pragma GCC diagnostic ignored "-Wformat"

D
dapan 已提交
51 52
#include "schedulerInt.h"
#include "stub.h"
D
dapan1121 已提交
53
#include "tref.h"
D
dapan1121 已提交
54

D
dapan1121 已提交
55
namespace {
D
dapan 已提交
56

D
dapan1121 已提交
57
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
D
dapan1121 已提交
58 59
extern "C" int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode);

D
dapan1121 已提交
60 61
int64_t insertJobRefId = 0;
int64_t queryJobRefId = 0;
D
dapan1121 已提交
62 63 64 65 66 67 68

uint64_t schtMergeTemplateId = 0x4;
uint64_t schtFetchTaskId = 0;
uint64_t schtQueryId = 1;

bool schtTestStop = false;
bool schtTestDeadLoop = false;
D
dapan1121 已提交
69
int32_t schtTestMTRunSec = 10;
D
dapan1121 已提交
70 71
int32_t schtTestPrintNum = 1000;
int32_t schtStartFetch = 0;
D
dapan 已提交
72

D
dapan1121 已提交
73

D
dapan1121 已提交
74 75 76 77 78 79
void schtInitLogFile() {
  const char    *defaultLogFileNamePrefix = "taoslog";
  const int32_t  maxLogFileNum = 10;

  tsAsyncLog = 0;
  qDebugFlag = 159;
D
dapan1121 已提交
80
  strcpy(tsLogDir, "/var/log/taos");
D
dapan1121 已提交
81

S
Shengliang Guan 已提交
82
  if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
S
os env  
Shengliang Guan 已提交
83
    printf("failed to open log file in directory:%s\n", tsLogDir);
D
dapan1121 已提交
84 85 86 87 88
  }

}


X
Xiaoyu Wang 已提交
89
// Fills `dag` with a minimal two-level query plan: one merge subplan
// (level 0) whose single child is one scan subplan (level 1). Both subplans
// and their physical nodes are calloc'ed, so unset fields are zero.
void schtBuildQueryDag(SQueryPlan *dag) {
  const uint64_t queryId = schtQueryId;

  dag->queryId = queryId;
  dag->numOfSubplans = 2;
  dag->pSubplans = nodesMakeList();

  // Scan subplan: level 1, bound to node 1 with the single endpoint ep0:6030.
  SSubplan *scanSub = (SSubplan *)calloc(1, sizeof(SSubplan));
  scanSub->id.queryId = queryId;
  scanSub->id.groupId = 0x2;
  scanSub->id.subplanId = 0x3;
  scanSub->subplanType = SUBPLAN_TYPE_SCAN;
  scanSub->level = 1;
  scanSub->msgType = TDMT_VND_QUERY;
  scanSub->execNode.nodeId = 1;
  scanSub->execNode.epSet.inUse = 0;
  addEpIntoEpSet(&scanSub->execNode.epSet, "ep0", 6030);
  scanSub->pChildren = NULL;
  scanSub->pParents = nodesMakeList();
  scanSub->pNode = (SPhysiNode *)calloc(1, sizeof(SPhysiNode));

  // Merge subplan: level 0, no endpoints assigned.
  SSubplan *mergeSub = (SSubplan *)calloc(1, sizeof(SSubplan));
  mergeSub->id.queryId = queryId;
  mergeSub->id.groupId = schtMergeTemplateId;
  mergeSub->id.subplanId = 0x5555;
  mergeSub->subplanType = SUBPLAN_TYPE_MERGE;
  mergeSub->level = 0;
  mergeSub->msgType = TDMT_VND_QUERY;
  mergeSub->execNode.epSet.numOfEps = 0;
  mergeSub->pChildren = nodesMakeList();
  mergeSub->pParents = NULL;
  mergeSub->pNode = (SPhysiNode *)calloc(1, sizeof(SPhysiNode));

  // One list node per level, each holding that level's subplan.
  SNodeListNode *mergeLevel = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
  SNodeListNode *scanLevel = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
  mergeLevel->pNodeList = nodesMakeList();
  scanLevel->pNodeList = nodesMakeList();
  nodesListAppend(mergeLevel->pNodeList, (SNode *)mergeSub);
  nodesListAppend(scanLevel->pNodeList, (SNode *)scanSub);

  // Parent/child links between the two subplans.
  nodesListAppend(mergeSub->pChildren, (SNode *)scanSub);
  nodesListAppend(scanSub->pParents, (SNode *)mergeSub);

  // The merge level goes first, then the scan level.
  nodesListAppend(dag->pSubplans, (SNode *)mergeLevel);
  nodesListAppend(dag->pSubplans, (SNode *)scanLevel);
}

D
dapan 已提交
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
// Fills `dag` with a two-level plan made of one merge subplan (level 0) and
// 20 scan subplans (level 1). Every scan subplan gets three endpoints, a
// random table count (taosRand() % 30) and a random in-use endpoint index.
void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
  const int32_t  scanCount = 20;
  const uint64_t queryId = schtQueryId;

  dag->queryId = queryId;
  dag->numOfSubplans = 2;
  dag->pSubplans = nodesMakeList();

  SNodeListNode *scanLevel = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
  SNodeListNode *mergeLevel = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);

  // All scan subplans live in one contiguous zeroed array.
  SSubplan *scanSubs = (SSubplan *)calloc(scanCount, sizeof(SSubplan));
  SSubplan *mergeSub = (SSubplan *)calloc(1, sizeof(SSubplan));

  mergeLevel->pNodeList = nodesMakeList();
  scanLevel->pNodeList = nodesMakeList();

  mergeSub->pChildren = nodesMakeList();

  for (int32_t idx = 0; idx < scanCount; ++idx) {
    SSubplan *cur = scanSubs + idx;

    cur->id.queryId = queryId;
    cur->id.groupId = 0x2;
    cur->id.subplanId = 0x3 + idx;
    cur->subplanType = SUBPLAN_TYPE_SCAN;

    cur->execNode.nodeId = 1 + idx;
    cur->execNode.epSet.inUse = 0;
    cur->execNodeStat.tableNum = taosRand() % 30;
    addEpIntoEpSet(&cur->execNode.epSet, "ep0", 6030);
    addEpIntoEpSet(&cur->execNode.epSet, "ep1", 6030);
    addEpIntoEpSet(&cur->execNode.epSet, "ep2", 6030);
    // Pick one of the three endpoints at random as the one in use.
    cur->execNode.epSet.inUse = taosRand() % 3;

    cur->pChildren = NULL;
    cur->level = 1;
    cur->pParents = nodesMakeList();
    cur->pNode = (SPhysiNode *)calloc(1, sizeof(SPhysiNode));
    cur->msgType = TDMT_VND_QUERY;

    // Link the scan subplan to the merge subplan and register it in its level.
    nodesListAppend(cur->pParents, (SNode *)mergeSub);
    nodesListAppend(mergeSub->pChildren, (SNode *)cur);
    nodesListAppend(scanLevel->pNodeList, (SNode *)cur);
  }

  mergeSub->id.queryId = queryId;
  mergeSub->id.groupId = schtMergeTemplateId;
  mergeSub->id.subplanId = 0x5555;
  mergeSub->subplanType = SUBPLAN_TYPE_MERGE;
  mergeSub->level = 0;
  mergeSub->execNode.epSet.numOfEps = 0;

  mergeSub->pParents = NULL;
  mergeSub->pNode = (SPhysiNode *)calloc(1, sizeof(SPhysiNode));
  mergeSub->msgType = TDMT_VND_QUERY;

  nodesListAppend(mergeLevel->pNodeList, (SNode *)mergeSub);

  // The merge level goes first, then the scan level.
  nodesListAppend(dag->pSubplans, (SNode *)mergeLevel);
  nodesListAppend(dag->pSubplans, (SNode *)scanLevel);
}


X
Xiaoyu Wang 已提交
203
// Placeholder for releasing a plan built by the schtBuild*Dag helpers.
// It currently does nothing, so the plan's allocations are not freed here.
void schtFreeQueryDag(SQueryPlan *dag) {
  (void)dag;
}


X
Xiaoyu Wang 已提交
208
// Fills `dag` with a single-level insert plan: two MODIFY subplans (subplan
// ids 0x4 and 0x5) that differ only in their subplan id. Both target node 1
// at ep0:6030 and carry a zeroed data sink node.
void schtBuildInsertDag(SQueryPlan *dag) {
  const uint64_t queryId = 0x2;
  const int32_t  planCount = 2;

  dag->queryId = queryId;
  dag->numOfSubplans = 2;
  dag->pSubplans = nodesMakeList();

  SNodeListNode *insertLevel = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);

  // Both subplans share one zeroed allocation.
  SSubplan *plans = (SSubplan *)calloc(planCount, sizeof(SSubplan));

  for (int32_t idx = 0; idx < planCount; ++idx) {
    SSubplan *plan = &plans[idx];

    plan->id.queryId = queryId;
    plan->id.groupId = 0x3;
    plan->id.subplanId = 0x4 + idx;
    plan->subplanType = SUBPLAN_TYPE_MODIFY;
    plan->level = 0;

    plan->execNode.nodeId = 1;
    plan->execNode.epSet.inUse = 0;
    addEpIntoEpSet(&plan->execNode.epSet, "ep0", 6030);

    plan->pChildren = NULL;
    plan->pParents = NULL;
    plan->pNode = NULL;
    plan->pDataSink = (SDataSinkNode *)calloc(1, sizeof(SDataSinkNode));
    plan->msgType = TDMT_VND_SUBMIT;
  }

  insertLevel->pNodeList = nodesMakeList();
  for (int32_t idx = 0; idx < planCount; ++idx) {
    nodesListAppend(insertLevel->pNodeList, (SNode *)(plans + idx));
  }

  nodesListAppend(dag->pSubplans, (SNode *)insertLevel);
}


D
dapan 已提交
260 261 262 263 264 265
// Test replacement for qSubPlanToString (installed by schtSetPlanToString):
// ignores `subplan`, hands back a zero-filled 20-byte buffer in *str, stores
// 20 in *len and returns 0.
int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
  const int32_t bufSize = 20;

  *str = (char *)calloc(1, bufSize);
  *len = bufSize;

  return 0;
}

X
Xiaoyu Wang 已提交
266
// Test replacement for qSetSubplanExecutionNode (installed by
// schtSetExecNode): intentionally does nothing.
void schtExecNode(SSubplan* subplan, uint64_t groupId, SQueryNodeAddr* ep) {
  (void)subplan;
  (void)groupId;
  (void)ep;
}

D
dapan1121 已提交
270 271 272 273
// Test replacement for rpcSendRequest (installed by schtSetRpcSendRequest):
// intentionally does nothing.
void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
  (void)shandle;
  (void)pEpSet;
  (void)pMsg;
  (void)pRid;
}

D
dapan 已提交
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
void schtSetPlanToString() {
  static Stub stub;
  stub.set(qSubPlanToString, schtPlanToString);
  {
    AddrAny any("libplanner.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qSubPlanToString$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtPlanToString);
    }
  }
}

void schtSetExecNode() {
  static Stub stub;
  stub.set(qSetSubplanExecutionNode, schtExecNode);
  {
    AddrAny any("libplanner.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qSetSubplanExecutionNode$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtExecNode);
    }
  }
}

D
dapan1121 已提交
300 301 302 303 304 305 306 307 308 309 310 311 312
void schtSetRpcSendRequest() {
  static Stub stub;
  stub.set(rpcSendRequest, schtRpcSendRequest);
  {
    AddrAny any("libtransport.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^rpcSendRequest$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtRpcSendRequest);
    }
  }
}

D
dapan1121 已提交
313 314 315 316 317 318
// Test replacement for asyncSendMsgToServer (installed by
// schtSetAsyncSendMsgToServer): sends nothing, releases the send-info object
// together with its param and message payload, and returns 0.
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
  if (NULL == pInfo) {
    return 0;
  }

  tfree(pInfo->param);
  tfree(pInfo->msgInfo.pData);
  free(pInfo);

  return 0;
}


void schtSetAsyncSendMsgToServer() {
  static Stub stub;
  stub.set(asyncSendMsgToServer, schtAsyncSendMsgToServer);
  {
    AddrAny any("libtransport.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^asyncSendMsgToServer$", result);
    for (const auto& f : result) {
      stub.set(f.second, schtAsyncSendMsgToServer);
    }
  }
}

D
dapan1121 已提交
336

D
dapan 已提交
337
void *schtSendRsp(void *param) {
D
dapan1121 已提交
338 339
  SSchJob *pJob = NULL;
  int64_t job = 0;
D
dapan 已提交
340 341 342
  int32_t code = 0;

  while (true) {
D
dapan1121 已提交
343
    job = *(int64_t *)param;
D
dapan 已提交
344 345 346 347
    if (job) {
      break;
    }

wafwerar's avatar
wafwerar 已提交
348
    taosMsleep(1);
D
dapan 已提交
349
  }
D
dapan1121 已提交
350 351

  pJob = schAcquireJob(job);
D
dapan 已提交
352
  
D
dapan1121 已提交
353
  void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan 已提交
354 355 356
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

S
Shengliang Guan 已提交
357
    SSubmitRsp rsp = {0};
D
dapan 已提交
358
    rsp.affectedRows = 10;
D
dapan1121 已提交
359
    schHandleResponseMsg(pJob, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
360
    
D
dapan1121 已提交
361
    pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan 已提交
362 363
  }    

D
dapan1121 已提交
364 365
  schReleaseJob(job);

D
dapan 已提交
366 367 368
  return NULL;
}

D
dapan1121 已提交
369
void *schtCreateFetchRspThread(void *param) {
D
dapan1121 已提交
370 371
  int64_t job = *(int64_t *)param;
  SSchJob* pJob = schAcquireJob(job);
D
dapan1121 已提交
372

wafwerar's avatar
wafwerar 已提交
373
  taosSsleep(1);
D
dapan1121 已提交
374 375 376 377 378

  int32_t code = 0;
  SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp));
  rsp->completed = 1;
  rsp->numOfRows = 10;
D
dapan1121 已提交
379
 
D
dapan1121 已提交
380 381 382 383
  code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0);

  schReleaseJob(job);
  
D
dapan1121 已提交
384 385 386 387
  assert(code == 0);
}


D
dapan1121 已提交
388 389 390 391 392 393 394 395 396
void *schtFetchRspThread(void *aa) {
  SDataBuf dataBuf = {0};
  SSchCallbackParam* param = NULL;

  while (!schtTestStop) {
    if (0 == atomic_val_compare_exchange_32(&schtStartFetch, 1, 0)) {
      continue;
    }

wafwerar's avatar
wafwerar 已提交
397
    taosUsleep(1);
D
dapan1121 已提交
398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419
    
    param = (SSchCallbackParam *)calloc(1, sizeof(*param));

    param->queryId = schtQueryId;  
    param->taskId = schtFetchTaskId;

    int32_t code = 0;
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp));
    rsp->completed = 1;
    rsp->numOfRows = 10;

    dataBuf.pData = rsp;
    dataBuf.len = sizeof(*rsp);

    code = schHandleCallback(param, &dataBuf, TDMT_VND_FETCH_RSP, 0);
      
    assert(code == 0 || code);
  }
}

void schtFreeQueryJob(int32_t freeThread) {
  static uint32_t freeNum = 0;
D
dapan1121 已提交
420
  int64_t job = queryJobRefId;
D
dapan1121 已提交
421
  
D
dapan1121 已提交
422
  if (job && atomic_val_compare_exchange_64(&queryJobRefId, job, 0)) {
D
dapan1121 已提交
423
    schedulerFreeJob(job);
D
dapan1121 已提交
424 425 426 427 428 429 430 431 432 433 434 435 436 437
    if (freeThread) {
      if (++freeNum % schtTestPrintNum == 0) {
        printf("FreeNum:%d\n", freeNum);
      }
    }
  }
}

void* schtRunJobThread(void *aa) {
  void *mockPointer = (void *)0x1;
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
  SVgroupInfo vgInfo = {0};
X
Xiaoyu Wang 已提交
438
  SQueryPlan dag;
D
dapan1121 已提交
439 440 441 442 443 444 445 446 447 448 449 450

  schtInitLogFile();

  
  int32_t code = schedulerInit(NULL);
  assert(code == 0);


  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();

D
dapan1121 已提交
451
  SSchJob *pJob = NULL;
D
dapan1121 已提交
452 453 454 455 456 457 458 459
  SSchCallbackParam *param = NULL;
  SHashObj *execTasks = NULL;
  SDataBuf dataBuf = {0};
  uint32_t jobFinished = 0;

  while (!schtTestStop) {
    schtBuildQueryDag(&dag);

H
Haojun Liao 已提交
460
    SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
D
dapan1121 已提交
461

H
Haojun Liao 已提交
462
    SEp qnodeAddr = {0};
D
dapan1121 已提交
463 464 465 466
    strcpy(qnodeAddr.fqdn, "qnode0.ep");
    qnodeAddr.port = 6031;
    taosArrayPush(qnodeList, &qnodeAddr);

D
dapan1121 已提交
467
    code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &queryJobRefId);
D
dapan1121 已提交
468 469
    assert(code == 0);

D
dapan1121 已提交
470 471 472 473 474 475 476
    pJob = schAcquireJob(queryJobRefId);
    if (NULL == pJob) {
      taosArrayDestroy(qnodeList);
      schtFreeQueryDag(&dag);
      continue;
    }
    
D
dapan1121 已提交
477
    execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
478
    void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan1121 已提交
479 480 481 482 483
    while (pIter) {
      SSchTask *task = *(SSchTask **)pIter;
      schtFetchTaskId = task->taskId - 1;
      
      taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task));
D
dapan1121 已提交
484
      pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan1121 已提交
485 486 487
    }    

    param = (SSchCallbackParam *)calloc(1, sizeof(*param));
D
dapan1121 已提交
488 489
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
D
dapan1121 已提交
490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507

    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId;
      SQueryTableRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
      code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0);
      assert(code == 0 || code);

      pIter = taosHashIterate(execTasks, pIter);
    }    


    param = (SSchCallbackParam *)calloc(1, sizeof(*param));
D
dapan1121 已提交
508 509 510
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
    
D
dapan1121 已提交
511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527
    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId;
      SResReadyRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
      code = schHandleCallback(param, &dataBuf, TDMT_VND_RES_READY_RSP, 0);
      assert(code == 0 || code);
      
      pIter = taosHashIterate(execTasks, pIter);
    }  


    param = (SSchCallbackParam *)calloc(1, sizeof(*param));
D
dapan1121 已提交
528 529
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
D
dapan1121 已提交
530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547

    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId - 1;
      SQueryTableRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
      code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0);
      assert(code == 0 || code);
      
      pIter = taosHashIterate(execTasks, pIter);
    }    


    param = (SSchCallbackParam *)calloc(1, sizeof(*param));
D
dapan1121 已提交
548 549
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
D
dapan1121 已提交
550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568

    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId - 1;
      SResReadyRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
      code = schHandleCallback(param, &dataBuf, TDMT_VND_RES_READY_RSP, 0);
      assert(code == 0 || code);
      
      pIter = taosHashIterate(execTasks, pIter);
    }  

    atomic_store_32(&schtStartFetch, 1);

    void *data = NULL;  
D
dapan1121 已提交
569
    code = schedulerFetchRows(queryJobRefId, &data);
D
dapan1121 已提交
570 571 572 573 574 575 576 577 578
    assert(code == 0 || code);

    if (0 == code) {
      SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
      assert(pRsp->completed == 1);
      assert(pRsp->numOfRows == 10);
    }

    data = NULL;
D
dapan1121 已提交
579
    code = schedulerFetchRows(queryJobRefId, &data);
D
dapan1121 已提交
580 581 582 583 584
    assert(code == 0 || code);
    
    schtFreeQueryJob(0);

    taosHashCleanup(execTasks);
D
dapan1121 已提交
585
    taosArrayDestroy(qnodeList);
D
dapan1121 已提交
586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601

    schtFreeQueryDag(&dag);

    if (++jobFinished % schtTestPrintNum == 0) {
      printf("jobFinished:%d\n", jobFinished);
    }

    ++schtQueryId;
  }

  schedulerDestroy();

}

void* schtFreeJobThread(void *aa) {
  while (!schtTestStop) {
wafwerar's avatar
wafwerar 已提交
602
    taosUsleep(taosRand() % 100);
D
dapan1121 已提交
603 604 605
    schtFreeQueryJob(1);
  }
}
D
dapan1121 已提交
606 607


D
dapan1121 已提交
608 609
}

D
dapan 已提交
610
// Runs one query job through the scheduler with all outgoing traffic stubbed.
// Every task is answered with QUERY_RSP, RES_READY_RSP, QUERY_RSP and
// RES_READY_RSP (in that order); then a fetch response is delivered from a
// helper thread and the fetched rows are checked.
//
// Compared with the original, the acquired job and the fetched data are
// checked for NULL before being dereferenced, and unused locals are gone.
TEST(queryTest, normalCase) {
  void *mockPointer = (void *)0x1;
  int64_t job = 0;
  SQueryPlan dag;

  memset(&dag, 0, sizeof(dag));

  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);

  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildQueryDag(&dag);

  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();

  code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
  ASSERT_EQ(code, 0);

  SSchJob *pJob = schAcquireJob(job);
  ASSERT_TRUE(pJob != NULL);

  // Round 1: query responses.
  for (void *pIter = taosHashIterate(pJob->execTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pJob->execTasks, pIter)) {
    SSchTask *task = *(SSchTask **)pIter;

    SQueryTableRsp rsp = {0};
    code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
    ASSERT_EQ(code, 0);
  }

  // Round 1: res-ready responses.
  for (void *pIter = taosHashIterate(pJob->execTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pJob->execTasks, pIter)) {
    SSchTask *task = *(SSchTask **)pIter;

    SResReadyRsp rsp = {0};
    code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
    printf("code:%d", code);
    ASSERT_EQ(code, 0);
  }

  // Round 2: query responses.
  for (void *pIter = taosHashIterate(pJob->execTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pJob->execTasks, pIter)) {
    SSchTask *task = *(SSchTask **)pIter;

    SQueryTableRsp rsp = {0};
    code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
    ASSERT_EQ(code, 0);
  }

  // Round 2: res-ready responses.
  for (void *pIter = taosHashIterate(pJob->execTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pJob->execTasks, pIter)) {
    SSchTask *task = *(SSchTask **)pIter;

    SResReadyRsp rsp = {0};
    code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
    ASSERT_EQ(code, 0);
  }

  // The fetch response arrives from a separate thread.
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);

  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);

  void *data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data != NULL);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);
  tfree(data);

  // A second fetch on the completed job must yield no data.
  data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data == NULL);

  schReleaseJob(job);

  schedulerFreeJob(job);

  schtFreeQueryDag(&dag);

  schedulerDestroy();
}
D
dapan1121 已提交
715

D
dapan 已提交
716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748
// Same scenario as queryTest.normalCase, but each round delivers the
// RES_READY_RSP before the QUERY_RSP (order: ready, query, ready, query).
//
// Compared with the original, the acquired job and the fetched data are
// checked for NULL before being dereferenced, and unused locals are gone.
TEST(queryTest, readyFirstCase) {
  void *mockPointer = (void *)0x1;
  int64_t job = 0;
  SQueryPlan dag;

  memset(&dag, 0, sizeof(dag));

  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);

  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildQueryDag(&dag);

  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();

  code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
  ASSERT_EQ(code, 0);

  SSchJob *pJob = schAcquireJob(job);
  ASSERT_TRUE(pJob != NULL);

  // Round 1: res-ready responses arrive first.
  for (void *pIter = taosHashIterate(pJob->execTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pJob->execTasks, pIter)) {
    SSchTask *task = *(SSchTask **)pIter;

    SResReadyRsp rsp = {0};
    code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
    printf("code:%d", code);
    ASSERT_EQ(code, 0);
  }

  // Round 1: query responses.
  for (void *pIter = taosHashIterate(pJob->execTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pJob->execTasks, pIter)) {
    SSchTask *task = *(SSchTask **)pIter;

    SQueryTableRsp rsp = {0};
    code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
    ASSERT_EQ(code, 0);
  }

  // Round 2: res-ready responses.
  for (void *pIter = taosHashIterate(pJob->execTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pJob->execTasks, pIter)) {
    SSchTask *task = *(SSchTask **)pIter;

    SResReadyRsp rsp = {0};
    code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
    ASSERT_EQ(code, 0);
  }

  // Round 2: query responses.
  for (void *pIter = taosHashIterate(pJob->execTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pJob->execTasks, pIter)) {
    SSchTask *task = *(SSchTask **)pIter;

    SQueryTableRsp rsp = {0};
    code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
    ASSERT_EQ(code, 0);
  }

  // The fetch response arrives from a separate thread.
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);

  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);

  void *data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data != NULL);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);
  tfree(data);

  // A second fetch on the completed job must yield no data.
  data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data == NULL);

  schReleaseJob(job);

  schedulerFreeJob(job);

  schtFreeQueryDag(&dag);

  schedulerDestroy();
}



D
dapan 已提交
826 827 828 829 830 831 832 833 834 835 836
// Runs the 20-scan-subplan DAG through the scheduler. Tasks are answered one
// at a time: the first task in execTasks gets the response matching its last
// sent message (QUERY -> QUERY_RSP, RES_READY -> RES_READY_RSP). This repeats
// until execTasks is empty or the first task has some other lastMsgType.
// Afterwards a fetch response is delivered from a helper thread.
//
// Compared with the original, `dag` is zero-initialised before being built
// (as normalCase/readyFirstCase already do), the acquired job and fetched
// data are checked for NULL, unused locals are gone, and the inner loop that
// could only ever run once is flattened.
TEST(queryTest, flowCtrlCase) {
  void *mockPointer = (void *)0x1;
  int64_t job = 0;
  SQueryPlan dag;

  memset(&dag, 0, sizeof(dag));

  schtInitLogFile();

  taosSeedRand(taosGetTimestampSec());

  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);

  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildQueryFlowCtrlDag(&dag);

  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();

  code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
  ASSERT_EQ(code, 0);

  SSchJob *pJob = schAcquireJob(job);
  ASSERT_TRUE(pJob != NULL);

  bool queryDone = false;

  while (!queryDone) {
    void *pIter = taosHashIterate(pJob->execTasks, NULL);
    if (NULL == pIter) {
      break;
    }

    SSchTask *task = *(SSchTask **)pIter;

    // Only the first entry is needed; stop iterating before the response is
    // handled.
    taosHashCancelIterate(pJob->execTasks, pIter);

    if (task->lastMsgType == TDMT_VND_QUERY) {
      SQueryTableRsp rsp = {0};
      code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
      ASSERT_EQ(code, 0);
    } else if (task->lastMsgType == TDMT_VND_RES_READY) {
      SResReadyRsp rsp = {0};
      code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
      ASSERT_EQ(code, 0);
    } else {
      queryDone = true;
    }
  }

  // The fetch response arrives from a separate thread.
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);

  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);

  void *data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data != NULL);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);
  tfree(data);

  // A second fetch on the completed job must yield no data.
  data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data == NULL);

  schReleaseJob(job);

  schedulerFreeJob(job);

  schtFreeQueryDag(&dag);

  schedulerDestroy();
}
D
dapan1121 已提交
921

D
dapan 已提交
922 923 924 925 926 927 928

// Executes the two-subplan insert DAG synchronously. A helper thread
// (schtSendRsp) waits for insertJobRefId to be published and answers every
// task with a submit response of 10 affected rows, so the job is expected to
// report 20 rows in total.
//
// Compared with the original, `dag` is zero-initialised before being built
// (as the query tests already do) and unused locals are gone.
TEST(insertTest, normalCase) {
  void *mockPointer = (void *)0x1;
  SQueryPlan dag;

  memset(&dag, 0, sizeof(dag));

  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);

  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildInsertDag(&dag);

  schtSetPlanToString();
  schtSetAsyncSendMsgToServer();

  // Start the responder before the blocking exec call.
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);

  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);

  SQueryResult res = {0};
  code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", &res);
  ASSERT_EQ(code, 0);
  ASSERT_EQ(res.numOfRows, 20);

  schedulerFreeJob(insertJobRefId);

  schedulerDestroy();
}

D
dapan1121 已提交
963
// Stress test: starts a job-running thread, a job-freeing thread and a
// fetch-response thread, lets them race for schtTestMTRunSec (passed to
// taosSsleep), then raises schtTestStop and waits taosSsleep(3) for the
// workers to wind down. While schtTestDeadLoop is set, the test keeps
// sleeping instead of stopping.
TEST(multiThread, forceFree) {
  TdThreadAttr attr;
  taosThreadAttrInit(&attr);

  TdThread runner, freer, fetcher;
  taosThreadCreate(&runner, &attr, schtRunJobThread, NULL);
  taosThreadCreate(&freer, &attr, schtFreeJobThread, NULL);
  taosThreadCreate(&fetcher, &attr, schtFetchRspThread, NULL);

  while (schtTestDeadLoop) {
    taosSsleep(1);
  }
  taosSsleep(schtTestMTRunSec);

  schtTestStop = true;
  taosSsleep(3);
}
D
dapan 已提交
984

D
dapan1121 已提交
985
int main(int argc, char** argv) {
wafwerar's avatar
wafwerar 已提交
986
  taosSeedRand(taosGetTimestampSec());
D
dapan1121 已提交
987 988 989 990
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

D
dapan1121 已提交
991
#pragma GCC diagnostic pop