schedulerTests.cpp 25.5 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <gtest/gtest.h>
#include <iostream>

19 20 21 22 23 24 25 26 27
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wreturn-type"
#pragma GCC diagnostic ignored "-Wformat"
#include <addr_any.h>

wafwerar's avatar
wafwerar 已提交
28 29 30
#ifdef WINDOWS
#define TD_USE_WINSOCK
#endif
D
dapan1121 已提交
31 32
#include "os.h"

33
#include "tglobal.h"
D
dapan1121 已提交
34 35 36
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
D
dapan1121 已提交
37 38
#include "catalog.h"
#include "scheduler.h"
H
Haojun Liao 已提交
39 40 41
#include "taos.h"
#include "tdatablock.h"
#include "tdef.h"
D
dapan1121 已提交
42
#include "trpc.h"
H
Haojun Liao 已提交
43
#include "tvariant.h"
S
Shengliang Guan 已提交
44 45 46 47 48 49 50 51 52

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wreturn-type"
#pragma GCC diagnostic ignored "-Wformat"

D
dapan1121 已提交
53
#include "schInt.h"
D
dapan 已提交
54
#include "stub.h"
D
dapan1121 已提交
55
#include "tref.h"
D
dapan1121 已提交
56

D
dapan1121 已提交
57
namespace {
D
dapan 已提交
58

D
dapan1121 已提交
59
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
D
dapan1121 已提交
60 61
extern "C" int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode);

D
dapan1121 已提交
62 63
int64_t insertJobRefId = 0;
int64_t queryJobRefId = 0;
D
dapan1121 已提交
64 65 66 67 68 69 70

uint64_t schtMergeTemplateId = 0x4;
uint64_t schtFetchTaskId = 0;
uint64_t schtQueryId = 1;

bool schtTestStop = false;
bool schtTestDeadLoop = false;
D
dapan1121 已提交
71
int32_t schtTestMTRunSec = 10;
D
dapan1121 已提交
72 73
int32_t schtTestPrintNum = 1000;
int32_t schtStartFetch = 0;
D
dapan 已提交
74

D
dapan1121 已提交
75

D
dapan1121 已提交
76 77 78 79 80 81
void schtInitLogFile() {
  const char    *defaultLogFileNamePrefix = "taoslog";
  const int32_t  maxLogFileNum = 10;

  tsAsyncLog = 0;
  qDebugFlag = 159;
wafwerar's avatar
wafwerar 已提交
82
  strcpy(tsLogDir, TD_LOG_DIR_PATH);
D
dapan1121 已提交
83

S
Shengliang Guan 已提交
84
  if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
S
os env  
Shengliang Guan 已提交
85
    printf("failed to open log file in directory:%s\n", tsLogDir);
D
dapan1121 已提交
86 87 88 89
  }

}

D
dapan1121 已提交
90
void schtQueryCb(SExecResult* pResult, void* param, int32_t code) {
D
dapan1121 已提交
91 92 93 94
  assert(TSDB_CODE_SUCCESS == code);
  *(int32_t*)param = 1;
}

D
dapan1121 已提交
95

X
Xiaoyu Wang 已提交
96
void schtBuildQueryDag(SQueryPlan *dag) {
D
dapan1121 已提交
97
  uint64_t qId = schtQueryId;
D
dapan1121 已提交
98 99 100
  
  dag->queryId = qId;
  dag->numOfSubplans = 2;
X
Xiaoyu Wang 已提交
101 102 103
  dag->pSubplans = nodesMakeList();
  SNodeListNode *scan = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
  SNodeListNode *merge = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
D
dapan1121 已提交
104
  
wafwerar's avatar
wafwerar 已提交
105 106
  SSubplan *scanPlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));
  SSubplan *mergePlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));
D
dapan1121 已提交
107 108

  scanPlan->id.queryId = qId;
X
Xiaoyu Wang 已提交
109
  scanPlan->id.groupId = 0x0000000000000002;
D
dapan1121 已提交
110
  scanPlan->id.subplanId = 0x0000000000000003;
X
Xiaoyu Wang 已提交
111
  scanPlan->subplanType = SUBPLAN_TYPE_SCAN;
H
Haojun Liao 已提交
112

D
dapan1121 已提交
113
  scanPlan->execNode.nodeId = 1;
L
Liu Jicong 已提交
114 115
  scanPlan->execNode.epSet.inUse = 0;
  addEpIntoEpSet(&scanPlan->execNode.epSet, "ep0", 6030);
H
Haojun Liao 已提交
116

D
dapan1121 已提交
117 118
  scanPlan->pChildren = NULL;
  scanPlan->level = 1;
X
Xiaoyu Wang 已提交
119
  scanPlan->pParents = nodesMakeList();
wafwerar's avatar
wafwerar 已提交
120
  scanPlan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
D
dapan1121 已提交
121
  scanPlan->msgType = TDMT_SCH_QUERY;
D
dapan1121 已提交
122 123

  mergePlan->id.queryId = qId;
X
Xiaoyu Wang 已提交
124
  mergePlan->id.groupId = schtMergeTemplateId;
X
Xiaoyu Wang 已提交
125 126
  mergePlan->id.subplanId = 0x5555;
  mergePlan->subplanType = SUBPLAN_TYPE_MERGE;
D
dapan1121 已提交
127
  mergePlan->level = 0;
L
Liu Jicong 已提交
128
  mergePlan->execNode.epSet.numOfEps = 0;
H
Haojun Liao 已提交
129

X
Xiaoyu Wang 已提交
130
  mergePlan->pChildren = nodesMakeList();
D
dapan1121 已提交
131
  mergePlan->pParents = NULL;
wafwerar's avatar
wafwerar 已提交
132
  mergePlan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
D
dapan1121 已提交
133
  mergePlan->msgType = TDMT_SCH_QUERY;
D
dapan1121 已提交
134

D
dapan 已提交
135 136 137
  merge->pNodeList = nodesMakeList();
  scan->pNodeList = nodesMakeList();

X
Xiaoyu Wang 已提交
138 139
  nodesListAppend(merge->pNodeList, (SNode*)mergePlan);
  nodesListAppend(scan->pNodeList, (SNode*)scanPlan);
D
dapan1121 已提交
140

X
Xiaoyu Wang 已提交
141 142
  nodesListAppend(mergePlan->pChildren, (SNode*)scanPlan);
  nodesListAppend(scanPlan->pParents, (SNode*)mergePlan);
D
dapan1121 已提交
143

X
Xiaoyu Wang 已提交
144 145
  nodesListAppend(dag->pSubplans, (SNode*)merge);  
  nodesListAppend(dag->pSubplans, (SNode*)scan);
D
dapan1121 已提交
146 147
}

D
dapan 已提交
148 149 150 151 152 153 154 155 156 157
void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
  uint64_t qId = schtQueryId;
  int32_t scanPlanNum = 20;
  
  dag->queryId = qId;
  dag->numOfSubplans = 2;
  dag->pSubplans = nodesMakeList();
  SNodeListNode *scan = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
  SNodeListNode *merge = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
  
wafwerar's avatar
wafwerar 已提交
158 159
  SSubplan *scanPlan = (SSubplan *)taosMemoryCalloc(scanPlanNum, sizeof(SSubplan));
  SSubplan *mergePlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));
D
dapan 已提交
160 161 162 163 164 165 166 167

  merge->pNodeList = nodesMakeList();
  scan->pNodeList = nodesMakeList();

  mergePlan->pChildren = nodesMakeList();

  for (int32_t i = 0; i < scanPlanNum; ++i) {
    scanPlan[i].id.queryId = qId;
X
Xiaoyu Wang 已提交
168
    scanPlan[i].id.groupId = 0x0000000000000002;
D
dapan 已提交
169 170 171 172
    scanPlan[i].id.subplanId = 0x0000000000000003 + i;
    scanPlan[i].subplanType = SUBPLAN_TYPE_SCAN;

    scanPlan[i].execNode.nodeId = 1 + i;
X
Xiaoyu Wang 已提交
173 174 175 176 177 178
    scanPlan[i].execNode.epSet.inUse = 0;
    scanPlan[i].execNodeStat.tableNum = taosRand() % 30;
    addEpIntoEpSet(&scanPlan[i].execNode.epSet, "ep0", 6030);
    addEpIntoEpSet(&scanPlan[i].execNode.epSet, "ep1", 6030);
    addEpIntoEpSet(&scanPlan[i].execNode.epSet, "ep2", 6030);
    scanPlan[i].execNode.epSet.inUse = taosRand() % 3;
D
dapan 已提交
179 180 181 182

    scanPlan[i].pChildren = NULL;
    scanPlan[i].level = 1;
    scanPlan[i].pParents = nodesMakeList();
wafwerar's avatar
wafwerar 已提交
183
    scanPlan[i].pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
D
dapan1121 已提交
184
    scanPlan[i].msgType = TDMT_SCH_QUERY;
D
dapan 已提交
185 186 187 188 189 190 191 192

    nodesListAppend(scanPlan[i].pParents, (SNode*)mergePlan);
    nodesListAppend(mergePlan->pChildren, (SNode*)(scanPlan + i));

    nodesListAppend(scan->pNodeList, (SNode*)(scanPlan + i));
  }

  mergePlan->id.queryId = qId;
X
Xiaoyu Wang 已提交
193
  mergePlan->id.groupId = schtMergeTemplateId;
D
dapan 已提交
194 195 196
  mergePlan->id.subplanId = 0x5555;
  mergePlan->subplanType = SUBPLAN_TYPE_MERGE;
  mergePlan->level = 0;
X
Xiaoyu Wang 已提交
197
  mergePlan->execNode.epSet.numOfEps = 0;
D
dapan 已提交
198 199

  mergePlan->pParents = NULL;
wafwerar's avatar
wafwerar 已提交
200
  mergePlan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
D
dapan1121 已提交
201
  mergePlan->msgType = TDMT_SCH_QUERY;
D
dapan 已提交
202 203 204 205 206 207 208 209

  nodesListAppend(merge->pNodeList, (SNode*)mergePlan);

  nodesListAppend(dag->pSubplans, (SNode*)merge);  
  nodesListAppend(dag->pSubplans, (SNode*)scan);
}


X
Xiaoyu Wang 已提交
210
void schtFreeQueryDag(SQueryPlan *dag) {
D
dapan1121 已提交
211 212 213 214

}


X
Xiaoyu Wang 已提交
215
void schtBuildInsertDag(SQueryPlan *dag) {
D
dapan 已提交
216 217 218 219
  uint64_t qId = 0x0000000000000002;
  
  dag->queryId = qId;
  dag->numOfSubplans = 2;
X
Xiaoyu Wang 已提交
220 221
  dag->pSubplans = nodesMakeList();
  SNodeListNode *inserta = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
D
dapan 已提交
222
  
wafwerar's avatar
wafwerar 已提交
223
  SSubplan *insertPlan = (SSubplan *)taosMemoryCalloc(2, sizeof(SSubplan));
D
dapan 已提交
224 225

  insertPlan[0].id.queryId = qId;
X
Xiaoyu Wang 已提交
226
  insertPlan[0].id.groupId = 0x0000000000000003;
D
dapan 已提交
227
  insertPlan[0].id.subplanId = 0x0000000000000004;
X
Xiaoyu Wang 已提交
228
  insertPlan[0].subplanType = SUBPLAN_TYPE_MODIFY;
D
dapan 已提交
229
  insertPlan[0].level = 0;
H
Haojun Liao 已提交
230

D
dapan1121 已提交
231
  insertPlan[0].execNode.nodeId = 1;
L
Liu Jicong 已提交
232 233
  insertPlan[0].execNode.epSet.inUse = 0;
  addEpIntoEpSet(&insertPlan[0].execNode.epSet, "ep0", 6030);
H
Haojun Liao 已提交
234

H
Haojun Liao 已提交
235
  insertPlan[0].pChildren = NULL;
D
dapan 已提交
236 237
  insertPlan[0].pParents = NULL;
  insertPlan[0].pNode = NULL;
wafwerar's avatar
wafwerar 已提交
238
  insertPlan[0].pDataSink = (SDataSinkNode*)taosMemoryCalloc(1, sizeof(SDataSinkNode));
D
dapan1121 已提交
239
  insertPlan[0].msgType = TDMT_VND_SUBMIT;
D
dapan 已提交
240 241

  insertPlan[1].id.queryId = qId;
X
Xiaoyu Wang 已提交
242
  insertPlan[1].id.groupId = 0x0000000000000003;
D
dapan 已提交
243
  insertPlan[1].id.subplanId = 0x0000000000000005;
X
Xiaoyu Wang 已提交
244
  insertPlan[1].subplanType = SUBPLAN_TYPE_MODIFY;
D
dapan 已提交
245
  insertPlan[1].level = 0;
H
Haojun Liao 已提交
246

D
dapan1121 已提交
247
  insertPlan[1].execNode.nodeId = 1;
L
Liu Jicong 已提交
248 249
  insertPlan[1].execNode.epSet.inUse = 0;
  addEpIntoEpSet(&insertPlan[1].execNode.epSet, "ep0", 6030);
H
Haojun Liao 已提交
250

H
Haojun Liao 已提交
251
  insertPlan[1].pChildren = NULL;
D
dapan 已提交
252 253
  insertPlan[1].pParents = NULL;
  insertPlan[1].pNode = NULL;
wafwerar's avatar
wafwerar 已提交
254
  insertPlan[1].pDataSink = (SDataSinkNode*)taosMemoryCalloc(1, sizeof(SDataSinkNode));
D
dapan1121 已提交
255
  insertPlan[1].msgType = TDMT_VND_SUBMIT;
D
dapan 已提交
256

D
dapan 已提交
257 258
  inserta->pNodeList = nodesMakeList();

X
Xiaoyu Wang 已提交
259
  nodesListAppend(inserta->pNodeList, (SNode*)insertPlan);
D
dapan1121 已提交
260
  insertPlan += 1;
X
Xiaoyu Wang 已提交
261
  nodesListAppend(inserta->pNodeList, (SNode*)insertPlan);
D
dapan 已提交
262

X
Xiaoyu Wang 已提交
263
  nodesListAppend(dag->pSubplans, (SNode*)inserta);  
D
dapan 已提交
264 265 266
}


D
dapan 已提交
267
int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
wafwerar's avatar
wafwerar 已提交
268
  *str = (char *)taosMemoryCalloc(1, 20);
D
dapan 已提交
269 270 271 272
  *len = 20;
  return 0;
}

X
Xiaoyu Wang 已提交
273
void schtExecNode(SSubplan* subplan, uint64_t groupId, SQueryNodeAddr* ep) {
H
Haojun Liao 已提交
274

D
dapan 已提交
275 276
}

D
dapan1121 已提交
277 278 279 280
void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {

}

D
dapan 已提交
281 282 283 284
void schtSetPlanToString() {
  static Stub stub;
  stub.set(qSubPlanToString, schtPlanToString);
  {
wafwerar's avatar
wafwerar 已提交
285 286 287 288 289 290
#ifdef WINDOWS
    AddrAny any;
    std::map<std::string,void*> result;
    any.get_func_addr("qSubPlanToString", result);
#endif
#ifdef LINUX
D
dapan 已提交
291 292 293
    AddrAny any("libplanner.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qSubPlanToString$", result);
wafwerar's avatar
wafwerar 已提交
294
#endif
D
dapan 已提交
295 296 297 298 299 300 301 302 303 304
    for (const auto& f : result) {
      stub.set(f.second, schtPlanToString);
    }
  }
}

void schtSetExecNode() {
  static Stub stub;
  stub.set(qSetSubplanExecutionNode, schtExecNode);
  {
wafwerar's avatar
wafwerar 已提交
305 306 307 308 309 310
#ifdef WINDOWS
    AddrAny any;
    std::map<std::string,void*> result;
    any.get_func_addr("qSetSubplanExecutionNode", result);
#endif
#ifdef LINUX
D
dapan 已提交
311 312 313
    AddrAny any("libplanner.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qSetSubplanExecutionNode$", result);
wafwerar's avatar
wafwerar 已提交
314
#endif
D
dapan 已提交
315 316 317 318 319 320
    for (const auto& f : result) {
      stub.set(f.second, schtExecNode);
    }
  }
}

D
dapan1121 已提交
321 322 323 324
void schtSetRpcSendRequest() {
  static Stub stub;
  stub.set(rpcSendRequest, schtRpcSendRequest);
  {
wafwerar's avatar
wafwerar 已提交
325 326 327 328 329 330
#ifdef WINDOWS
    AddrAny any;
    std::map<std::string,void*> result;
    any.get_func_addr("rpcSendRequest", result);
#endif
#ifdef LINUX
D
dapan1121 已提交
331 332 333
    AddrAny any("libtransport.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^rpcSendRequest$", result);
wafwerar's avatar
wafwerar 已提交
334
#endif
D
dapan1121 已提交
335 336 337 338 339 340
    for (const auto& f : result) {
      stub.set(f.second, schtRpcSendRequest);
    }
  }
}

D
dapan1121 已提交
341 342
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
  if (pInfo) {
wafwerar's avatar
wafwerar 已提交
343 344 345
    taosMemoryFreeClear(pInfo->param);
    taosMemoryFreeClear(pInfo->msgInfo.pData);
    taosMemoryFree(pInfo);
D
dapan1121 已提交
346
  }
D
dapan1121 已提交
347 348 349 350 351 352 353 354
  return 0;
}


void schtSetAsyncSendMsgToServer() {
  static Stub stub;
  stub.set(asyncSendMsgToServer, schtAsyncSendMsgToServer);
  {
wafwerar's avatar
wafwerar 已提交
355 356 357 358 359 360
#ifdef WINDOWS
    AddrAny any;
    std::map<std::string,void*> result;
    any.get_func_addr("asyncSendMsgToServer", result);
#endif
#ifdef LINUX
D
dapan1121 已提交
361 362 363
    AddrAny any("libtransport.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^asyncSendMsgToServer$", result);
wafwerar's avatar
wafwerar 已提交
364
#endif
D
dapan1121 已提交
365 366 367 368 369 370
    for (const auto& f : result) {
      stub.set(f.second, schtAsyncSendMsgToServer);
    }
  }
}

D
dapan1121 已提交
371

D
dapan 已提交
372
void *schtSendRsp(void *param) {
D
dapan1121 已提交
373 374
  SSchJob *pJob = NULL;
  int64_t job = 0;
D
dapan 已提交
375 376 377
  int32_t code = 0;

  while (true) {
D
dapan1121 已提交
378
    job = *(int64_t *)param;
D
dapan 已提交
379 380 381 382
    if (job) {
      break;
    }

wafwerar's avatar
wafwerar 已提交
383
    taosMsleep(1);
D
dapan 已提交
384
  }
D
dapan1121 已提交
385 386

  pJob = schAcquireJob(job);
D
dapan 已提交
387
  
D
dapan1121 已提交
388
  void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan 已提交
389 390 391
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

S
Shengliang Guan 已提交
392
    SSubmitRsp rsp = {0};
D
dapan 已提交
393
    rsp.affectedRows = 10;
D
dapan1121 已提交
394
    schHandleResponseMsg(pJob, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
395
    
D
dapan1121 已提交
396
    pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan 已提交
397 398
  }    

D
dapan1121 已提交
399 400
  schReleaseJob(job);

D
dapan 已提交
401 402 403
  return NULL;
}

D
dapan1121 已提交
404
void *schtCreateFetchRspThread(void *param) {
D
dapan1121 已提交
405 406
  int64_t job = *(int64_t *)param;
  SSchJob* pJob = schAcquireJob(job);
D
dapan1121 已提交
407

wafwerar's avatar
wafwerar 已提交
408
  taosSsleep(1);
D
dapan1121 已提交
409 410

  int32_t code = 0;
wafwerar's avatar
wafwerar 已提交
411
  SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
D
dapan1121 已提交
412 413
  rsp->completed = 1;
  rsp->numOfRows = 10;
D
dapan1121 已提交
414
 
D
dapan1121 已提交
415
  code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_SCH_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0);
D
dapan1121 已提交
416 417 418

  schReleaseJob(job);
  
D
dapan1121 已提交
419
  assert(code == 0);
wafwerar's avatar
wafwerar 已提交
420
  return NULL;
D
dapan1121 已提交
421 422 423
}


D
dapan1121 已提交
424 425
void *schtFetchRspThread(void *aa) {
  SDataBuf dataBuf = {0};
D
dapan1121 已提交
426
  SSchTaskCallbackParam* param = NULL;
D
dapan1121 已提交
427 428 429 430 431 432

  while (!schtTestStop) {
    if (0 == atomic_val_compare_exchange_32(&schtStartFetch, 1, 0)) {
      continue;
    }

wafwerar's avatar
wafwerar 已提交
433
    taosUsleep(1);
D
dapan1121 已提交
434
    
435
    param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
D
dapan1121 已提交
436 437 438 439 440

    param->queryId = schtQueryId;  
    param->taskId = schtFetchTaskId;

    int32_t code = 0;
wafwerar's avatar
wafwerar 已提交
441
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
D
dapan1121 已提交
442 443 444 445 446 447
    rsp->completed = 1;
    rsp->numOfRows = 10;

    dataBuf.pData = rsp;
    dataBuf.len = sizeof(*rsp);

D
dapan1121 已提交
448
    code = schHandleCallback(param, &dataBuf, TDMT_SCH_FETCH_RSP, 0);
D
dapan1121 已提交
449 450 451
      
    assert(code == 0 || code);
  }
wafwerar's avatar
wafwerar 已提交
452
  return NULL;
D
dapan1121 已提交
453 454 455 456
}

void schtFreeQueryJob(int32_t freeThread) {
  static uint32_t freeNum = 0;
D
dapan1121 已提交
457
  int64_t job = queryJobRefId;
D
dapan1121 已提交
458
  
D
dapan1121 已提交
459
  if (job && atomic_val_compare_exchange_64(&queryJobRefId, job, 0)) {
D
dapan1121 已提交
460
    schedulerFreeJob(&job, 0);
D
dapan1121 已提交
461 462 463 464 465 466 467 468 469 470 471 472 473 474
    if (freeThread) {
      if (++freeNum % schtTestPrintNum == 0) {
        printf("FreeNum:%d\n", freeNum);
      }
    }
  }
}

void* schtRunJobThread(void *aa) {
  void *mockPointer = (void *)0x1;
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
  SVgroupInfo vgInfo = {0};
X
Xiaoyu Wang 已提交
475
  SQueryPlan dag;
D
dapan1121 已提交
476 477 478 479

  schtInitLogFile();

  
D
dapan1121 已提交
480
  int32_t code = schedulerInit();
D
dapan1121 已提交
481 482 483 484 485 486 487
  assert(code == 0);


  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();

D
dapan1121 已提交
488
  SSchJob *pJob = NULL;
D
dapan1121 已提交
489
  SSchTaskCallbackParam *param = NULL;
D
dapan1121 已提交
490 491 492
  SHashObj *execTasks = NULL;
  SDataBuf dataBuf = {0};
  uint32_t jobFinished = 0;
D
dapan1121 已提交
493
  int32_t queryDone = 0;
D
dapan1121 已提交
494 495 496 497

  while (!schtTestStop) {
    schtBuildQueryDag(&dag);

H
Haojun Liao 已提交
498
    SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
D
dapan1121 已提交
499

H
Haojun Liao 已提交
500
    SEp qnodeAddr = {0};
D
dapan1121 已提交
501 502 503 504
    strcpy(qnodeAddr.fqdn, "qnode0.ep");
    qnodeAddr.port = 6031;
    taosArrayPush(qnodeList, &qnodeAddr);

D
dapan1121 已提交
505
    queryDone = 0;
D
dapan1121 已提交
506 507 508 509
    
    SRequestConnInfo conn = {0};
    conn.pTrans = mockPointer;
    SSchedulerReq req = {0};    
D
dapan1121 已提交
510
    req.syncReq = false;
D
dapan1121 已提交
511 512 513 514
    req.pConn = &conn;
    req.pNodeList = qnodeList;
    req.pDag = &dag;
    req.sql = "select * from tb";
D
dapan1121 已提交
515
    req.execFp = schtQueryCb;
D
dapan1121 已提交
516
    req.cbParam = &queryDone;
D
dapan1121 已提交
517
    
D
dapan1121 已提交
518
    code = schedulerExecJob(&req, &queryJobRefId);      
D
dapan1121 已提交
519 520
    assert(code == 0);

D
dapan1121 已提交
521 522 523 524 525 526 527
    pJob = schAcquireJob(queryJobRefId);
    if (NULL == pJob) {
      taosArrayDestroy(qnodeList);
      schtFreeQueryDag(&dag);
      continue;
    }
    
D
dapan1121 已提交
528
    execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
529
    void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan1121 已提交
530 531 532 533 534
    while (pIter) {
      SSchTask *task = *(SSchTask **)pIter;
      schtFetchTaskId = task->taskId - 1;
      
      taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task));
D
dapan1121 已提交
535
      pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan1121 已提交
536 537
    }    

538
    param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
D
dapan1121 已提交
539 540
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
D
dapan1121 已提交
541 542 543 544 545 546 547 548 549 550

    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId;
      SQueryTableRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
D
dapan1121 已提交
551
      code = schHandleCallback(param, &dataBuf, TDMT_SCH_QUERY_RSP, 0);
D
dapan1121 已提交
552 553 554 555 556
      assert(code == 0 || code);

      pIter = taosHashIterate(execTasks, pIter);
    }    

557
    param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
D
dapan1121 已提交
558 559
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
D
dapan1121 已提交
560 561 562 563 564 565 566 567 568 569

    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId - 1;
      SQueryTableRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
D
dapan1121 已提交
570
      code = schHandleCallback(param, &dataBuf, TDMT_SCH_QUERY_RSP, 0);
D
dapan1121 已提交
571 572 573 574 575 576
      assert(code == 0 || code);
      
      pIter = taosHashIterate(execTasks, pIter);
    }    


D
dapan1121 已提交
577 578 579 580 581 582 583 584
    while (true) {
      if (queryDone) {
        break;
      }
    
      taosUsleep(10000);
    }

D
dapan1121 已提交
585 586 587
    atomic_store_32(&schtStartFetch, 1);

    void *data = NULL;  
D
dapan1121 已提交
588 589 590 591
    req.syncReq = true;
    req.pFetchRes = &data;
    
    code = schedulerFetchRows(queryJobRefId, &req);
D
dapan1121 已提交
592 593 594 595 596 597 598 599 600
    assert(code == 0 || code);

    if (0 == code) {
      SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
      assert(pRsp->completed == 1);
      assert(pRsp->numOfRows == 10);
    }

    data = NULL;
D
dapan1121 已提交
601
    code = schedulerFetchRows(queryJobRefId, &req);
D
dapan1121 已提交
602 603 604 605 606
    assert(code == 0 || code);
    
    schtFreeQueryJob(0);

    taosHashCleanup(execTasks);
D
dapan1121 已提交
607
    taosArrayDestroy(qnodeList);
D
dapan1121 已提交
608 609 610 611 612 613 614 615 616 617 618 619

    schtFreeQueryDag(&dag);

    if (++jobFinished % schtTestPrintNum == 0) {
      printf("jobFinished:%d\n", jobFinished);
    }

    ++schtQueryId;
  }

  schedulerDestroy();

wafwerar's avatar
wafwerar 已提交
620
  return NULL;
D
dapan1121 已提交
621 622 623 624
}

void* schtFreeJobThread(void *aa) {
  while (!schtTestStop) {
wafwerar's avatar
wafwerar 已提交
625
    taosUsleep(taosRand() % 100);
D
dapan1121 已提交
626 627
    schtFreeQueryJob(1);
  }
wafwerar's avatar
wafwerar 已提交
628
  return NULL;
D
dapan1121 已提交
629
}
D
dapan1121 已提交
630 631


D
dapan1121 已提交
632 633
}

D
dapan 已提交
634
TEST(queryTest, normalCase) {
D
dapan1121 已提交
635
  void *mockPointer = (void *)0x1;
D
dapan1121 已提交
636 637 638
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
D
dapan1121 已提交
639
  SVgroupInfo vgInfo = {0};
D
dapan1121 已提交
640
  int64_t job = 0;
H
Haojun Liao 已提交
641
  SQueryPlan dag;
D
dapan1121 已提交
642

643
  memset(&dag, 0, sizeof(dag));
D
dapan1121 已提交
644

H
Haojun Liao 已提交
645
  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
D
dapan 已提交
646

H
Haojun Liao 已提交
647
  SEp qnodeAddr = {0};
D
dapan 已提交
648 649 650
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);
D
dapan1121 已提交
651 652
  
  int32_t code = schedulerInit(NULL);
D
dapan1121 已提交
653
  ASSERT_EQ(code, 0);
D
dapan1121 已提交
654

D
dapan 已提交
655
  schtBuildQueryDag(&dag);
D
dapan 已提交
656 657 658

  schtSetPlanToString();
  schtSetExecNode();
D
dapan1121 已提交
659
  schtSetAsyncSendMsgToServer();
D
dapan1121 已提交
660 661

  int32_t queryDone = 0;
D
dapan1121 已提交
662 663 664

  SRequestConnInfo conn = {0};
  conn.pTrans = mockPointer;
D
dapan1121 已提交
665
  SSchedulerReq req = {0};   
D
dapan1121 已提交
666 667 668 669
  req.pConn = &conn;
  req.pNodeList = qnodeList;
  req.pDag = &dag;
  req.sql = "select * from tb";
D
dapan1121 已提交
670
  req.execFp = schtQueryCb;
D
dapan1121 已提交
671
  req.cbParam = &queryDone;
D
dapan1121 已提交
672
    
D
dapan1121 已提交
673
  code = schedulerExecJob(&req, &job);  
D
dapan1121 已提交
674
  ASSERT_EQ(code, 0);
D
dapan 已提交
675

D
dapan1121 已提交
676 677 678 679
  
  SSchJob *pJob = schAcquireJob(job);
  
  void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan 已提交
680
  while (pIter) {
D
dapan 已提交
681
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
682 683

    SQueryTableRsp rsp = {0};
D
dapan1121 已提交
684
    code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
685 686
    
    ASSERT_EQ(code, 0);
D
dapan1121 已提交
687
    pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan 已提交
688 689
  }    

D
dapan1121 已提交
690
  pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan 已提交
691
  while (pIter) {
D
dapan 已提交
692
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
693 694

    SQueryTableRsp rsp = {0};
D
dapan1121 已提交
695
    code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
696 697
    
    ASSERT_EQ(code, 0);
D
dapan1121 已提交
698
    pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan 已提交
699 700
  }    

D
dapan1121 已提交
701 702 703 704 705 706 707 708
  while (true) {
    if (queryDone) {
      break;
    }

    taosUsleep(10000);
  }
  
wafwerar's avatar
wafwerar 已提交
709 710
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan 已提交
711

wafwerar's avatar
wafwerar 已提交
712 713
  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);
D
dapan 已提交
714

D
dapan1121 已提交
715
  void *data = NULL;  
D
dapan1121 已提交
716 717 718 719
  req.syncReq = true;
  req.pFetchRes = &data;

  code = schedulerFetchRows(job, &req);
D
dapan 已提交
720 721 722 723 724
  ASSERT_EQ(code, 0);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);
wafwerar's avatar
wafwerar 已提交
725
  taosMemoryFreeClear(data);
D
dapan 已提交
726 727

  data = NULL;
D
dapan1121 已提交
728
  code = schedulerFetchRows(job, &req);
D
dapan 已提交
729
  ASSERT_EQ(code, 0);
D
dapan1121 已提交
730 731 732
  ASSERT_TRUE(data == NULL);

  schReleaseJob(job);
D
dapan 已提交
733

D
dapan1121 已提交
734
  schedulerFreeJob(&job, 0);
D
dapan1121 已提交
735 736

  schtFreeQueryDag(&dag);
D
dapan1121 已提交
737 738

  schedulerDestroy();
D
dapan1121 已提交
739
}
D
dapan1121 已提交
740

D
dapan 已提交
741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766
TEST(queryTest, readyFirstCase) {
  void *mockPointer = (void *)0x1;
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
  SVgroupInfo vgInfo = {0};
  int64_t job = 0;
  SQueryPlan dag;

  memset(&dag, 0, sizeof(dag));

  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);
  
  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildQueryDag(&dag);

  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();
D
dapan1121 已提交
767 768

  int32_t queryDone = 0;  
D
dapan1121 已提交
769 770 771 772 773 774 775 776

  SRequestConnInfo conn = {0};
  conn.pTrans = mockPointer;
  SSchedulerReq req = {0};    
  req.pConn = &conn;
  req.pNodeList = qnodeList;
  req.pDag = &dag;
  req.sql = "select * from tb";
D
dapan1121 已提交
777
  req.execFp = schtQueryCb;
D
dapan1121 已提交
778
  req.cbParam = &queryDone;
D
dapan1121 已提交
779
  code = schedulerExecJob(&req, &job);
D
dapan 已提交
780 781 782 783 784
  ASSERT_EQ(code, 0);

  
  SSchJob *pJob = schAcquireJob(job);
  
D
dapan1121 已提交
785
  void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan 已提交
786 787 788 789
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

    SQueryTableRsp rsp = {0};
D
dapan1121 已提交
790
    code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
791 792 793 794 795 796 797 798 799 800
    
    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(pJob->execTasks, pIter);
  }    

  pIter = taosHashIterate(pJob->execTasks, NULL);
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

    SQueryTableRsp rsp = {0};
D
dapan1121 已提交
801
    code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
802 803 804 805 806
    
    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(pJob->execTasks, pIter);
  }    

D
dapan1121 已提交
807 808 809 810 811 812 813
  while (true) {
    if (queryDone) {
      break;
    }

    taosUsleep(10000);
  }
D
dapan 已提交
814 815


D
dapan1121 已提交
816 817
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan 已提交
818

D
dapan1121 已提交
819 820
  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);
D
dapan 已提交
821 822

  void *data = NULL;  
D
dapan1121 已提交
823 824 825
  req.syncReq = true;
  req.pFetchRes = &data;
  code = schedulerFetchRows(job, &req);
D
dapan 已提交
826 827 828 829 830
  ASSERT_EQ(code, 0);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);
D
dapan1121 已提交
831
  taosMemoryFreeClear(data);
D
dapan 已提交
832 833

  data = NULL;
D
dapan1121 已提交
834
  code = schedulerFetchRows(job, &req);
D
dapan 已提交
835 836 837 838 839
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data == NULL);

  schReleaseJob(job);

D
dapan1121 已提交
840
  schedulerFreeJob(&job, 0);
D
dapan 已提交
841 842 843 844 845 846 847 848

  schtFreeQueryDag(&dag);

  schedulerDestroy();
}



D
dapan 已提交
849 850 851 852 853 854 855 856 857 858 859
TEST(queryTest, flowCtrlCase) {
  void *mockPointer = (void *)0x1;
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
  SVgroupInfo vgInfo = {0};
  int64_t job = 0;
  SQueryPlan dag;

  schtInitLogFile();

wafwerar's avatar
wafwerar 已提交
860
  taosSeedRand(taosGetTimestampSec());
D
dapan 已提交
861
  
D
dapan 已提交
862 863 864 865 866 867 868 869 870 871
  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);
  
  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

D
dapan 已提交
872
  schtBuildQueryFlowCtrlDag(&dag);
D
dapan 已提交
873 874 875 876

  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();
D
dapan1121 已提交
877 878

  int32_t queryDone = 0;  
D
dapan1121 已提交
879 880 881 882 883 884 885
  SRequestConnInfo conn = {0};
  conn.pTrans = mockPointer;
  SSchedulerReq req = {0};    
  req.pConn = &conn;
  req.pNodeList = qnodeList;
  req.pDag = &dag;
  req.sql = "select * from tb";
D
dapan1121 已提交
886
  req.execFp = schtQueryCb;
D
dapan1121 已提交
887
  req.cbParam = &queryDone;
D
dapan1121 已提交
888

D
dapan1121 已提交
889
  code = schedulerExecJob(&req, &job);
D
dapan 已提交
890 891 892 893 894
  ASSERT_EQ(code, 0);

  
  SSchJob *pJob = schAcquireJob(job);

D
dapan1121 已提交
895
  bool qDone = false;
D
dapan 已提交
896
  
D
dapan1121 已提交
897
  while (!qDone) {
D
dapan 已提交
898 899 900 901
    void *pIter = taosHashIterate(pJob->execTasks, NULL);
    if (NULL == pIter) {
      break;
    }
D
dapan 已提交
902
    
D
dapan 已提交
903 904
    while (pIter) {
      SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
905

D
dapan 已提交
906 907
      taosHashCancelIterate(pJob->execTasks, pIter);

D
dapan1121 已提交
908
      if (task->lastMsgType == TDMT_SCH_QUERY) {
D
dapan 已提交
909
        SQueryTableRsp rsp = {0};
D
dapan1121 已提交
910
        code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
911 912 913
        
        ASSERT_EQ(code, 0);
      } else {
D
dapan1121 已提交
914
        qDone = true;
D
dapan 已提交
915 916 917 918 919 920
        break;
      }
      
      pIter = NULL;
    }    
  }
D
dapan 已提交
921

D
dapan1121 已提交
922 923 924 925 926 927 928
  while (true) {
    if (queryDone) {
      break;
    }

    taosUsleep(10000);
  }
D
dapan 已提交
929

wafwerar's avatar
wafwerar 已提交
930 931
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan 已提交
932

wafwerar's avatar
wafwerar 已提交
933 934
  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);
D
dapan 已提交
935 936

  void *data = NULL;  
D
dapan1121 已提交
937 938 939
  req.syncReq = true;
  req.pFetchRes = &data;
  code = schedulerFetchRows(job, &req);
D
dapan 已提交
940 941 942 943 944
  ASSERT_EQ(code, 0);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);
wafwerar's avatar
wafwerar 已提交
945
  taosMemoryFreeClear(data);
D
dapan 已提交
946 947

  data = NULL;
D
dapan1121 已提交
948
  code = schedulerFetchRows(job, &req);
D
dapan 已提交
949 950 951 952 953
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data == NULL);

  schReleaseJob(job);

D
dapan1121 已提交
954
  schedulerFreeJob(&job, 0);
D
dapan 已提交
955 956 957 958 959

  schtFreeQueryDag(&dag);

  schedulerDestroy();
}
D
dapan1121 已提交
960

D
dapan 已提交
961 962 963 964 965 966 967

TEST(insertTest, normalCase) {
  void *mockPointer = (void *)0x1;
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
  SVgroupInfo vgInfo = {0};
X
Xiaoyu Wang 已提交
968
  SQueryPlan dag;
D
dapan 已提交
969
  uint64_t numOfRows = 0;
D
dapan1121 已提交
970

H
Haojun Liao 已提交
971
  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
D
dapan 已提交
972

H
Haojun Liao 已提交
973
  SEp qnodeAddr = {0};
D
dapan 已提交
974 975 976 977 978 979 980 981 982 983
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);
  
  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildInsertDag(&dag);

  schtSetPlanToString();
D
dapan1121 已提交
984
  schtSetAsyncSendMsgToServer();
D
dapan 已提交
985

wafwerar's avatar
wafwerar 已提交
986 987
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan 已提交
988

wafwerar's avatar
wafwerar 已提交
989 990
  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
D
dapan1121 已提交
991

D
dapan1121 已提交
992
  SExecResult res = {0};
D
dapan1121 已提交
993 994 995 996 997 998 999 1000

  SRequestConnInfo conn = {0};
  conn.pTrans = mockPointer;
  SSchedulerReq req = {0};    
  req.pConn = &conn;
  req.pNodeList = qnodeList;
  req.pDag = &dag;
  req.sql = "insert into tb values(now,1)";
D
dapan1121 已提交
1001
  req.execFp = schtQueryCb;
D
dapan1121 已提交
1002
  req.cbParam = NULL;
D
dapan1121 已提交
1003
  
D
dapan1121 已提交
1004
  code = schedulerExecJob(&req, &insertJobRefId);
D
dapan 已提交
1005
  ASSERT_EQ(code, 0);
D
dapan1121 已提交
1006
  ASSERT_EQ(res.numOfRows, 20);
D
dapan 已提交
1007

D
dapan1121 已提交
1008
  schedulerFreeJob(&insertJobRefId, 0);
D
dapan1121 已提交
1009 1010

  schedulerDestroy();  
D
dapan 已提交
1011 1012
}

D
dapan1121 已提交
1013
TEST(multiThread, forceFree) {
wafwerar's avatar
wafwerar 已提交
1014 1015
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan 已提交
1016

wafwerar's avatar
wafwerar 已提交
1017 1018 1019 1020
  TdThread thread1, thread2, thread3;
  taosThreadCreate(&(thread1), &thattr, schtRunJobThread, NULL);
  taosThreadCreate(&(thread2), &thattr, schtFreeJobThread, NULL);
  taosThreadCreate(&(thread3), &thattr, schtFetchRspThread, NULL);
D
dapan1121 已提交
1021

D
dapan1121 已提交
1022 1023
  while (true) {
    if (schtTestDeadLoop) {
wafwerar's avatar
wafwerar 已提交
1024
      taosSsleep(1);
D
dapan1121 已提交
1025
    } else {
wafwerar's avatar
wafwerar 已提交
1026
      taosSsleep(schtTestMTRunSec);
D
dapan1121 已提交
1027 1028 1029 1030 1031
      break;
    }
  }
  
  schtTestStop = true;
wafwerar's avatar
wafwerar 已提交
1032
  taosSsleep(3);
D
dapan1121 已提交
1033
}
D
dapan 已提交
1034

D
dapan1121 已提交
1035
int main(int argc, char** argv) {
wafwerar's avatar
wafwerar 已提交
1036
  taosSeedRand(taosGetTimestampSec());
D
dapan1121 已提交
1037 1038 1039 1040
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

D
dapan1121 已提交
1041
#pragma GCC diagnostic pop