schedulerTests.cpp 26.3 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <gtest/gtest.h>
#include <iostream>

19 20 21 22 23 24 25 26 27
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wreturn-type"
#pragma GCC diagnostic ignored "-Wformat"
#include <addr_any.h>

wafwerar's avatar
wafwerar 已提交
28 29 30
#ifdef WINDOWS
#define TD_USE_WINSOCK
#endif
D
dapan1121 已提交
31 32
#include "os.h"

33
#include "tglobal.h"
D
dapan1121 已提交
34 35 36
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
D
dapan1121 已提交
37 38
#include "catalog.h"
#include "scheduler.h"
H
Haojun Liao 已提交
39 40 41
#include "taos.h"
#include "tdatablock.h"
#include "tdef.h"
D
dapan1121 已提交
42
#include "trpc.h"
H
Haojun Liao 已提交
43
#include "tvariant.h"
S
Shengliang Guan 已提交
44 45 46 47 48 49 50 51 52

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wreturn-type"
#pragma GCC diagnostic ignored "-Wformat"

D
dapan 已提交
53 54
#include "schedulerInt.h"
#include "stub.h"
D
dapan1121 已提交
55
#include "tref.h"
D
dapan1121 已提交
56

D
dapan1121 已提交
57
namespace {
D
dapan 已提交
58

D
dapan1121 已提交
59
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
D
dapan1121 已提交
60 61
extern "C" int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode);

D
dapan1121 已提交
62 63
// Job ref ids shared between the test threads. queryJobRefId is filled in by
// schedulerAsyncExecJob() in schtRunJobThread and reset to 0 by schtFreeQueryJob().
int64_t insertJobRefId = 0;
int64_t queryJobRefId = 0;

// Ids used while building the test DAGs and the callback parameters.
uint64_t schtMergeTemplateId = 0x4;  // group id assigned to the merge subplan
uint64_t schtFetchTaskId = 0;        // task id reported by schtFetchRspThread
uint64_t schtQueryId = 1;            // query id of the DAG being built; bumped per job in schtRunJobThread

// Flags and knobs for the helper threads.
bool schtTestStop = false;        // helper thread loops run while this is false
bool schtTestDeadLoop = false;
int32_t schtTestMTRunSec = 10;
int32_t schtTestPrintNum = 1000;  // progress counters are printed every schtTestPrintNum events
int32_t schtStartFetch = 0;       // set to 1 by schtRunJobThread, consumed (1 -> 0) by schtFetchRspThread
D
dapan 已提交
74

D
dapan1121 已提交
75

D
dapan1121 已提交
76 77 78 79 80 81
void schtInitLogFile() {
  const char    *defaultLogFileNamePrefix = "taoslog";
  const int32_t  maxLogFileNum = 10;

  tsAsyncLog = 0;
  qDebugFlag = 159;
wafwerar's avatar
wafwerar 已提交
82
  strcpy(tsLogDir, TD_LOG_DIR_PATH);
D
dapan1121 已提交
83

S
Shengliang Guan 已提交
84
  if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
S
os env  
Shengliang Guan 已提交
85
    printf("failed to open log file in directory:%s\n", tsLogDir);
D
dapan1121 已提交
86 87 88 89
  }

}

D
dapan1121 已提交
90 91 92 93 94
// Completion callback handed to schedulerAsyncExecJob(): expects a successful
// result code and raises the int32_t flag that `param` points to.
void schtQueryCb(SQueryResult* pResult, void* param, int32_t code) {
  assert(code == TSDB_CODE_SUCCESS);

  int32_t *doneFlag = (int32_t *)param;
  *doneFlag = 1;
}

D
dapan1121 已提交
95

X
Xiaoyu Wang 已提交
96
// Fills *dag with a minimal two-level query plan:
//   level 0: one merge subplan (group id schtMergeTemplateId, no endpoints)
//   level 1: one scan subplan bound to endpoint "ep0":6030
// The scan subplan is the merge subplan's only child. Only queryId,
// numOfSubplans and pSubplans of *dag are assigned here.
void schtBuildQueryDag(SQueryPlan *dag) {
  uint64_t qId = schtQueryId;

  dag->queryId = qId;
  dag->numOfSubplans = 2;
  dag->pSubplans = nodesMakeList();
  SNodeListNode *scan = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
  SNodeListNode *merge = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);

  SSubplan *scanPlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));
  SSubplan *mergePlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));

  // Scan subplan (leaf).
  scanPlan->id.queryId = qId;
  scanPlan->id.groupId = 0x0000000000000002;
  scanPlan->id.subplanId = 0x0000000000000003;
  scanPlan->subplanType = SUBPLAN_TYPE_SCAN;

  scanPlan->execNode.nodeId = 1;
  scanPlan->execNode.epSet.inUse = 0;
  addEpIntoEpSet(&scanPlan->execNode.epSet, "ep0", 6030);

  scanPlan->pChildren = NULL;
  scanPlan->level = 1;
  scanPlan->pParents = nodesMakeList();
  scanPlan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
  scanPlan->msgType = TDMT_VND_QUERY;

  // Merge subplan (root).
  mergePlan->id.queryId = qId;
  mergePlan->id.groupId = schtMergeTemplateId;
  mergePlan->id.subplanId = 0x5555;
  mergePlan->subplanType = SUBPLAN_TYPE_MERGE;
  mergePlan->level = 0;
  mergePlan->execNode.epSet.numOfEps = 0;

  mergePlan->pChildren = nodesMakeList();
  mergePlan->pParents = NULL;
  mergePlan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
  mergePlan->msgType = TDMT_VND_QUERY;

  // One node list per level, each holding that level's single subplan.
  merge->pNodeList = nodesMakeList();
  scan->pNodeList = nodesMakeList();

  nodesListAppend(merge->pNodeList, (SNode*)mergePlan);
  nodesListAppend(scan->pNodeList, (SNode*)scanPlan);

  // Parent/child links between the two subplans.
  nodesListAppend(mergePlan->pChildren, (SNode*)scanPlan);
  nodesListAppend(scanPlan->pParents, (SNode*)mergePlan);

  // The merge level is appended first, then the scan level.
  nodesListAppend(dag->pSubplans, (SNode*)merge);
  nodesListAppend(dag->pSubplans, (SNode*)scan);
}

D
dapan 已提交
148 149 150 151 152 153 154 155 156 157
// Fills *dag with a plan for the flow-control test: one merge subplan at
// level 0 with 20 scan subplans at level 1 as its children. Every scan subplan
// gets three endpoints ("ep0".."ep2", port 6030), a random endpoint in use and
// a random execNodeStat.tableNum in [0, 30).
// NOTE(review): numOfSubplans is set to 2 although 21 subplans are appended —
// confirm what the scheduler expects in this field.
void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
  uint64_t qId = schtQueryId;
  int32_t scanPlanNum = 20;

  dag->queryId = qId;
  dag->numOfSubplans = 2;
  dag->pSubplans = nodesMakeList();
  SNodeListNode *scan = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
  SNodeListNode *merge = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);

  // All scan subplans live in one contiguous array.
  SSubplan *scanPlan = (SSubplan *)taosMemoryCalloc(scanPlanNum, sizeof(SSubplan));
  SSubplan *mergePlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));

  merge->pNodeList = nodesMakeList();
  scan->pNodeList = nodesMakeList();

  mergePlan->pChildren = nodesMakeList();

  for (int32_t i = 0; i < scanPlanNum; ++i) {
    SSubplan *plan = scanPlan + i;

    plan->id.queryId = qId;
    plan->id.groupId = 0x0000000000000002;
    plan->id.subplanId = 0x0000000000000003 + i;
    plan->subplanType = SUBPLAN_TYPE_SCAN;

    plan->execNode.nodeId = 1 + i;
    plan->execNode.epSet.inUse = 0;
    plan->execNodeStat.tableNum = taosRand() % 30;
    addEpIntoEpSet(&plan->execNode.epSet, "ep0", 6030);
    addEpIntoEpSet(&plan->execNode.epSet, "ep1", 6030);
    addEpIntoEpSet(&plan->execNode.epSet, "ep2", 6030);
    plan->execNode.epSet.inUse = taosRand() % 3;

    plan->pChildren = NULL;
    plan->level = 1;
    plan->pParents = nodesMakeList();
    plan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
    plan->msgType = TDMT_VND_QUERY;

    // Link this scan subplan to the merge subplan and register it in the scan level.
    nodesListAppend(plan->pParents, (SNode*)mergePlan);
    nodesListAppend(mergePlan->pChildren, (SNode*)plan);

    nodesListAppend(scan->pNodeList, (SNode*)plan);
  }

  mergePlan->id.queryId = qId;
  mergePlan->id.groupId = schtMergeTemplateId;
  mergePlan->id.subplanId = 0x5555;
  mergePlan->subplanType = SUBPLAN_TYPE_MERGE;
  mergePlan->level = 0;
  mergePlan->execNode.epSet.numOfEps = 0;

  mergePlan->pParents = NULL;
  mergePlan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
  mergePlan->msgType = TDMT_VND_QUERY;

  nodesListAppend(merge->pNodeList, (SNode*)mergePlan);

  // The merge level is appended first, then the scan level.
  nodesListAppend(dag->pSubplans, (SNode*)merge);
  nodesListAppend(dag->pSubplans, (SNode*)scan);
}


X
Xiaoyu Wang 已提交
210
// Placeholder for releasing a plan built by the schtBuild*Dag helpers.
// It currently does nothing, so the memory allocated by those helpers is not freed here.
void schtFreeQueryDag(SQueryPlan *dag) {
}


X
Xiaoyu Wang 已提交
215
// Fills *dag with a single-level insert plan: two SUBPLAN_TYPE_MODIFY subplans
// (subplan ids 4 and 5, group id 3), both targeting node 1 at "ep0":6030 and
// both carrying an empty data sink, with message type TDMT_VND_SUBMIT.
void schtBuildInsertDag(SQueryPlan *dag) {
  uint64_t qId = 0x0000000000000002;
  const int32_t insertPlanNum = 2;

  dag->queryId = qId;
  dag->numOfSubplans = 2;
  dag->pSubplans = nodesMakeList();
  SNodeListNode *inserta = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);

  // Both subplans live in one contiguous array.
  SSubplan *insertPlan = (SSubplan *)taosMemoryCalloc(insertPlanNum, sizeof(SSubplan));

  for (int32_t i = 0; i < insertPlanNum; ++i) {
    SSubplan *plan = insertPlan + i;

    plan->id.queryId = qId;
    plan->id.groupId = 0x0000000000000003;
    plan->id.subplanId = 0x0000000000000004 + i;
    plan->subplanType = SUBPLAN_TYPE_MODIFY;
    plan->level = 0;

    plan->execNode.nodeId = 1;
    plan->execNode.epSet.inUse = 0;
    addEpIntoEpSet(&plan->execNode.epSet, "ep0", 6030);

    plan->pChildren = NULL;
    plan->pParents = NULL;
    plan->pNode = NULL;
    plan->pDataSink = (SDataSinkNode*)taosMemoryCalloc(1, sizeof(SDataSinkNode));
    plan->msgType = TDMT_VND_SUBMIT;
  }

  inserta->pNodeList = nodesMakeList();

  for (int32_t i = 0; i < insertPlanNum; ++i) {
    nodesListAppend(inserta->pNodeList, (SNode*)(insertPlan + i));
  }

  nodesListAppend(dag->pSubplans, (SNode*)inserta);
}


D
dapan 已提交
267
int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
wafwerar's avatar
wafwerar 已提交
268
  *str = (char *)taosMemoryCalloc(1, 20);
D
dapan 已提交
269 270 271 272
  *len = 20;
  return 0;
}

X
Xiaoyu Wang 已提交
273
// Stub installed in place of qSetSubplanExecutionNode: intentionally does nothing.
void schtExecNode(SSubplan* subplan, uint64_t groupId, SQueryNodeAddr* ep) {
}

D
dapan1121 已提交
277 278 279 280
// Stub installed in place of rpcSendRequest by schtSetRpcSendRequest(): nothing is sent.
void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
  return;
}

D
dapan 已提交
281 282 283 284
// Replaces qSubPlanToString with schtPlanToString: first through the directly
// linked symbol, then through every address AddrAny finds for that name.
void schtSetPlanToString() {
  static Stub stub;
  stub.set(qSubPlanToString, schtPlanToString);

  // Declared outside the platform branches so the loop below still compiles
  // (as a no-op) when neither WINDOWS nor LINUX is defined.
  std::map<std::string, void*> result;
#if defined(WINDOWS)
  AddrAny any;
  any.get_func_addr("qSubPlanToString", result);
#elif defined(LINUX)
  AddrAny any("libplanner.so");
  any.get_global_func_addr_dynsym("^qSubPlanToString$", result);
#endif

  for (const auto& f : result) {
    stub.set(f.second, schtPlanToString);
  }
}

// Replaces qSetSubplanExecutionNode with schtExecNode: first through the
// directly linked symbol, then through every address AddrAny finds for that name.
void schtSetExecNode() {
  static Stub stub;
  stub.set(qSetSubplanExecutionNode, schtExecNode);

  // Declared outside the platform branches so the loop below still compiles
  // (as a no-op) when neither WINDOWS nor LINUX is defined.
  std::map<std::string, void*> result;
#if defined(WINDOWS)
  AddrAny any;
  any.get_func_addr("qSetSubplanExecutionNode", result);
#elif defined(LINUX)
  AddrAny any("libplanner.so");
  any.get_global_func_addr_dynsym("^qSetSubplanExecutionNode$", result);
#endif

  for (const auto& f : result) {
    stub.set(f.second, schtExecNode);
  }
}

D
dapan1121 已提交
321 322 323 324
// Replaces rpcSendRequest with schtRpcSendRequest: first through the directly
// linked symbol, then through every address AddrAny finds for that name.
void schtSetRpcSendRequest() {
  static Stub stub;
  stub.set(rpcSendRequest, schtRpcSendRequest);

  // Declared outside the platform branches so the loop below still compiles
  // (as a no-op) when neither WINDOWS nor LINUX is defined.
  std::map<std::string, void*> result;
#if defined(WINDOWS)
  AddrAny any;
  any.get_func_addr("rpcSendRequest", result);
#elif defined(LINUX)
  AddrAny any("libtransport.so");
  any.get_global_func_addr_dynsym("^rpcSendRequest$", result);
#endif

  for (const auto& f : result) {
    stub.set(f.second, schtRpcSendRequest);
  }
}

D
dapan1121 已提交
341 342
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
  if (pInfo) {
wafwerar's avatar
wafwerar 已提交
343 344 345
    taosMemoryFreeClear(pInfo->param);
    taosMemoryFreeClear(pInfo->msgInfo.pData);
    taosMemoryFree(pInfo);
D
dapan1121 已提交
346
  }
D
dapan1121 已提交
347 348 349 350 351 352 353 354
  return 0;
}


// Replaces asyncSendMsgToServer with schtAsyncSendMsgToServer: first through the
// directly linked symbol, then through every address AddrAny finds for that name.
void schtSetAsyncSendMsgToServer() {
  static Stub stub;
  stub.set(asyncSendMsgToServer, schtAsyncSendMsgToServer);

  // Declared outside the platform branches so the loop below still compiles
  // (as a no-op) when neither WINDOWS nor LINUX is defined.
  std::map<std::string, void*> result;
#if defined(WINDOWS)
  AddrAny any;
  any.get_func_addr("asyncSendMsgToServer", result);
#elif defined(LINUX)
  AddrAny any("libtransport.so");
  any.get_global_func_addr_dynsym("^asyncSendMsgToServer$", result);
#endif

  for (const auto& f : result) {
    stub.set(f.second, schtAsyncSendMsgToServer);
  }
}

D
dapan1121 已提交
371

D
dapan 已提交
372
void *schtSendRsp(void *param) {
D
dapan1121 已提交
373 374
  SSchJob *pJob = NULL;
  int64_t job = 0;
D
dapan 已提交
375 376 377
  int32_t code = 0;

  while (true) {
D
dapan1121 已提交
378
    job = *(int64_t *)param;
D
dapan 已提交
379 380 381 382
    if (job) {
      break;
    }

wafwerar's avatar
wafwerar 已提交
383
    taosMsleep(1);
D
dapan 已提交
384
  }
D
dapan1121 已提交
385 386

  pJob = schAcquireJob(job);
D
dapan 已提交
387
  
D
dapan1121 已提交
388
  void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan 已提交
389 390 391
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

S
Shengliang Guan 已提交
392
    SSubmitRsp rsp = {0};
D
dapan 已提交
393
    rsp.affectedRows = 10;
D
dapan1121 已提交
394
    schHandleResponseMsg(pJob, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
395
    
D
dapan1121 已提交
396
    pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan 已提交
397 398
  }    

D
dapan1121 已提交
399 400
  schReleaseJob(job);

D
dapan 已提交
401 402 403
  return NULL;
}

D
dapan1121 已提交
404
void *schtCreateFetchRspThread(void *param) {
D
dapan1121 已提交
405 406
  int64_t job = *(int64_t *)param;
  SSchJob* pJob = schAcquireJob(job);
D
dapan1121 已提交
407

wafwerar's avatar
wafwerar 已提交
408
  taosSsleep(1);
D
dapan1121 已提交
409 410

  int32_t code = 0;
wafwerar's avatar
wafwerar 已提交
411
  SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
D
dapan1121 已提交
412 413
  rsp->completed = 1;
  rsp->numOfRows = 10;
D
dapan1121 已提交
414
 
D
dapan1121 已提交
415 416 417 418
  code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0);

  schReleaseJob(job);
  
D
dapan1121 已提交
419
  assert(code == 0);
wafwerar's avatar
wafwerar 已提交
420
  return NULL;
D
dapan1121 已提交
421 422 423
}


D
dapan1121 已提交
424 425
void *schtFetchRspThread(void *aa) {
  SDataBuf dataBuf = {0};
D
dapan1121 已提交
426
  SSchTaskCallbackParam* param = NULL;
D
dapan1121 已提交
427 428 429 430 431 432

  while (!schtTestStop) {
    if (0 == atomic_val_compare_exchange_32(&schtStartFetch, 1, 0)) {
      continue;
    }

wafwerar's avatar
wafwerar 已提交
433
    taosUsleep(1);
D
dapan1121 已提交
434
    
435
    param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
D
dapan1121 已提交
436 437 438 439 440

    param->queryId = schtQueryId;  
    param->taskId = schtFetchTaskId;

    int32_t code = 0;
wafwerar's avatar
wafwerar 已提交
441
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
D
dapan1121 已提交
442 443 444 445 446 447 448 449 450 451
    rsp->completed = 1;
    rsp->numOfRows = 10;

    dataBuf.pData = rsp;
    dataBuf.len = sizeof(*rsp);

    code = schHandleCallback(param, &dataBuf, TDMT_VND_FETCH_RSP, 0);
      
    assert(code == 0 || code);
  }
wafwerar's avatar
wafwerar 已提交
452
  return NULL;
D
dapan1121 已提交
453 454 455 456
}

void schtFreeQueryJob(int32_t freeThread) {
  static uint32_t freeNum = 0;
D
dapan1121 已提交
457
  int64_t job = queryJobRefId;
D
dapan1121 已提交
458
  
D
dapan1121 已提交
459
  if (job && atomic_val_compare_exchange_64(&queryJobRefId, job, 0)) {
D
dapan1121 已提交
460
    schedulerFreeJob(job);
D
dapan1121 已提交
461 462 463 464 465 466 467 468 469 470 471 472 473 474
    if (freeThread) {
      if (++freeNum % schtTestPrintNum == 0) {
        printf("FreeNum:%d\n", freeNum);
      }
    }
  }
}

void* schtRunJobThread(void *aa) {
  void *mockPointer = (void *)0x1;
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
  SVgroupInfo vgInfo = {0};
X
Xiaoyu Wang 已提交
475
  SQueryPlan dag;
D
dapan1121 已提交
476 477 478 479 480 481 482 483 484 485 486 487

  schtInitLogFile();

  
  int32_t code = schedulerInit(NULL);
  assert(code == 0);


  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();

D
dapan1121 已提交
488
  SSchJob *pJob = NULL;
D
dapan1121 已提交
489
  SSchTaskCallbackParam *param = NULL;
D
dapan1121 已提交
490 491 492
  SHashObj *execTasks = NULL;
  SDataBuf dataBuf = {0};
  uint32_t jobFinished = 0;
D
dapan1121 已提交
493
  int32_t queryDone = 0;
D
dapan1121 已提交
494 495 496 497

  while (!schtTestStop) {
    schtBuildQueryDag(&dag);

H
Haojun Liao 已提交
498
    SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
D
dapan1121 已提交
499

H
Haojun Liao 已提交
500
    SEp qnodeAddr = {0};
D
dapan1121 已提交
501 502 503 504
    strcpy(qnodeAddr.fqdn, "qnode0.ep");
    qnodeAddr.port = 6031;
    taosArrayPush(qnodeList, &qnodeAddr);

D
dapan1121 已提交
505
    queryDone = 0;
D
dapan1121 已提交
506 507 508 509 510 511 512 513 514 515 516 517 518
    SRequestConnInfo conn = {.pTrans = mockPointer, 
                             .requestId = 0,
                             .requestObjRefId = 0
    };
    SSchedulerReq req = {.pConn = &conn,
                         .pNodeList = qnodeList,
                         .pDag = &dag,
                         .sql = "select * from tb",
                         .startTs = 0,
                         .fp = schtQueryCb,
                         .cbParam = &queryDone
    };    
    code = schedulerAsyncExecJob(&req, &queryJobRefId);      
D
dapan1121 已提交
519 520
    assert(code == 0);

D
dapan1121 已提交
521 522 523 524 525 526 527
    pJob = schAcquireJob(queryJobRefId);
    if (NULL == pJob) {
      taosArrayDestroy(qnodeList);
      schtFreeQueryDag(&dag);
      continue;
    }
    
D
dapan1121 已提交
528
    execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
529
    void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan1121 已提交
530 531 532 533 534
    while (pIter) {
      SSchTask *task = *(SSchTask **)pIter;
      schtFetchTaskId = task->taskId - 1;
      
      taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task));
D
dapan1121 已提交
535
      pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan1121 已提交
536 537
    }    

538
    param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
D
dapan1121 已提交
539 540
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
D
dapan1121 已提交
541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556

    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId;
      SQueryTableRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
      code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0);
      assert(code == 0 || code);

      pIter = taosHashIterate(execTasks, pIter);
    }    

557
    param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
D
dapan1121 已提交
558 559
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
D
dapan1121 已提交
560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576

    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId - 1;
      SQueryTableRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
      code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0);
      assert(code == 0 || code);
      
      pIter = taosHashIterate(execTasks, pIter);
    }    


D
dapan1121 已提交
577 578 579 580 581 582 583 584
    while (true) {
      if (queryDone) {
        break;
      }
    
      taosUsleep(10000);
    }

D
dapan1121 已提交
585 586 587
    atomic_store_32(&schtStartFetch, 1);

    void *data = NULL;  
D
dapan1121 已提交
588
    code = schedulerFetchRows(queryJobRefId, &data);
D
dapan1121 已提交
589 590 591 592 593 594 595 596 597
    assert(code == 0 || code);

    if (0 == code) {
      SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
      assert(pRsp->completed == 1);
      assert(pRsp->numOfRows == 10);
    }

    data = NULL;
D
dapan1121 已提交
598
    code = schedulerFetchRows(queryJobRefId, &data);
D
dapan1121 已提交
599 600 601 602 603
    assert(code == 0 || code);
    
    schtFreeQueryJob(0);

    taosHashCleanup(execTasks);
D
dapan1121 已提交
604
    taosArrayDestroy(qnodeList);
D
dapan1121 已提交
605 606 607 608 609 610 611 612 613 614 615 616

    schtFreeQueryDag(&dag);

    if (++jobFinished % schtTestPrintNum == 0) {
      printf("jobFinished:%d\n", jobFinished);
    }

    ++schtQueryId;
  }

  schedulerDestroy();

wafwerar's avatar
wafwerar 已提交
617
  return NULL;
D
dapan1121 已提交
618 619 620 621
}

void* schtFreeJobThread(void *aa) {
  while (!schtTestStop) {
wafwerar's avatar
wafwerar 已提交
622
    taosUsleep(taosRand() % 100);
D
dapan1121 已提交
623 624
    schtFreeQueryJob(1);
  }
wafwerar's avatar
wafwerar 已提交
625
  return NULL;
D
dapan1121 已提交
626
}
D
dapan1121 已提交
627 628


D
dapan1121 已提交
629 630
}

D
dapan 已提交
631
// Runs one two-level query job end to end against the stubbed transport:
// start the job, answer every task's query message twice, wait for the query
// callback, then fetch rows. The first fetch is answered by
// schtCreateFetchRspThread; the second is expected to return no data.
TEST(queryTest, normalCase) {
  void *mockPointer = (void *)0x1;
  int64_t job = 0;
  SQueryPlan dag;

  memset(&dag, 0, sizeof(dag));

  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);

  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildQueryDag(&dag);

  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();

  int32_t queryDone = 0;
  SRequestConnInfo conn = {.pTrans = mockPointer,
                           .requestId = 0,
                           .requestObjRefId = 0
  };
  SSchedulerReq req = {.pConn = &conn,
                       .pNodeList = qnodeList,
                       .pDag = &dag,
                       .sql = "select * from tb",
                       .startTs = 0,
                       .fp = schtQueryCb,
                       .cbParam = &queryDone
  };
  code = schedulerAsyncExecJob(&req, &job);
  ASSERT_EQ(code, 0);

  SSchJob *pJob = schAcquireJob(job);
  // Fail the test cleanly instead of dereferencing a NULL job below.
  ASSERT_TRUE(pJob != NULL);

  // Answer the outstanding query messages in two passes over the executing tasks.
  for (int32_t pass = 0; pass < 2; ++pass) {
    void *pIter = taosHashIterate(pJob->execTasks, NULL);
    while (pIter) {
      SSchTask *task = *(SSchTask **)pIter;

      SQueryTableRsp rsp = {0};
      code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);

      ASSERT_EQ(code, 0);
      pIter = taosHashIterate(pJob->execTasks, pIter);
    }
  }

  // Wait until schtQueryCb has raised queryDone.
  while (!queryDone) {
    taosUsleep(10000);
  }

  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);

  // The helper thread delivers the fetch response about one second later.
  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);

  void *data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data != NULL);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);
  taosMemoryFreeClear(data);

  // The job is complete, so a second fetch must succeed with no data.
  data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data == NULL);

  schReleaseJob(job);

  schedulerFreeJob(job);

  // Same cleanup order as schtRunJobThread: node list goes after the job is freed.
  taosArrayDestroy(qnodeList);

  schtFreeQueryDag(&dag);

  schedulerDestroy();
}
D
dapan1121 已提交
735

D
dapan 已提交
736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761
// Same flow as the normal query case: start a two-level query job, answer every
// task's query message in two passes, wait for the query callback, then fetch
// rows. The first fetch is answered by schtCreateFetchRspThread; the second is
// expected to return no data.
TEST(queryTest, readyFirstCase) {
  void *mockPointer = (void *)0x1;
  int64_t job = 0;
  SQueryPlan dag;

  memset(&dag, 0, sizeof(dag));

  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);

  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildQueryDag(&dag);

  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();

  int32_t queryDone = 0;
  SRequestConnInfo conn = {.pTrans = mockPointer,
                           .requestId = 0,
                           .requestObjRefId = 0
  };
  SSchedulerReq req = {.pConn = &conn,
                       .pNodeList = qnodeList,
                       .pDag = &dag,
                       .sql = "select * from tb",
                       .startTs = 0,
                       .fp = schtQueryCb,
                       .cbParam = &queryDone
  };
  code = schedulerAsyncExecJob(&req, &job);
  ASSERT_EQ(code, 0);

  SSchJob *pJob = schAcquireJob(job);
  // Fail the test cleanly instead of dereferencing a NULL job below.
  ASSERT_TRUE(pJob != NULL);

  // Answer the outstanding query messages in two passes over the executing tasks.
  for (int32_t pass = 0; pass < 2; ++pass) {
    void *pIter = taosHashIterate(pJob->execTasks, NULL);
    while (pIter) {
      SSchTask *task = *(SSchTask **)pIter;

      SQueryTableRsp rsp = {0};
      code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);

      ASSERT_EQ(code, 0);
      pIter = taosHashIterate(pJob->execTasks, pIter);
    }
  }

  // Wait until schtQueryCb has raised queryDone.
  while (!queryDone) {
    taosUsleep(10000);
  }

  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);

  // The helper thread delivers the fetch response about one second later.
  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);

  void *data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data != NULL);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);
  taosMemoryFreeClear(data);

  // The job is complete, so a second fetch must succeed with no data.
  data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data == NULL);

  schReleaseJob(job);

  schedulerFreeJob(job);

  // Same cleanup order as schtRunJobThread: node list goes after the job is freed.
  taosArrayDestroy(qnodeList);

  schtFreeQueryDag(&dag);

  schedulerDestroy();
}



D
dapan 已提交
844 845 846 847 848 849 850 851 852 853 854
// Runs a flow-control query job through the scheduler from submission to teardown:
//   1. build the flow-control DAG and install the schtSet* test hooks,
//   2. submit the job asynchronously,
//   3. answer every task that is still waiting on TDMT_VND_QUERY with an empty
//      SQueryTableRsp,
//   4. wait for the query-done flag, fetch the rows, and verify the result,
//   5. release and free the job, the DAG and the scheduler.
TEST(queryTest, flowCtrlCase) {
  void      *mockPointer = (void *)0x1;  // placeholder handle, only stored in conn.pTrans below
  int64_t    job = 0;
  SQueryPlan dag;

  schtInitLogFile();

  taosSeedRand(taosGetTimestampSec());

  // Single query node endpoint handed to the scheduler through req.pNodeList
  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);

  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildQueryFlowCtrlDag(&dag);

  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();

  // Passed as cbParam; presumably set to non-zero by schtQueryCb when the query finishes
  int32_t queryDone = 0;
  SRequestConnInfo conn = {.pTrans = mockPointer,
                           .requestId = 0,
                           .requestObjRefId = 0
  };
  SSchedulerReq req = {.pConn = &conn,
                       .pNodeList = qnodeList,
                       .pDag = &dag,
                       .sql = "select * from tb",
                       .startTs = 0,
                       .fp = schtQueryCb,
                       .cbParam = &queryDone
  };
  code = schedulerAsyncExecJob(&req, &job);
  ASSERT_EQ(code, 0);

  SSchJob *pJob = schAcquireJob(job);
  // Fail the test cleanly instead of dereferencing a NULL job below
  ASSERT_TRUE(pJob != NULL);

  // Keep answering the first task in execTasks for as long as its last message is a
  // query request; stop when the table is empty or the task has a different
  // last message type.
  while (true) {
    void *pIter = taosHashIterate(pJob->execTasks, NULL);
    if (NULL == pIter) {
      break;
    }

    SSchTask *task = *(SSchTask **)pIter;

    // Only the first entry is inspected per pass, so the iteration is cancelled right away
    taosHashCancelIterate(pJob->execTasks, pIter);

    if (task->lastMsgType != TDMT_VND_QUERY) {
      break;
    }

    SQueryTableRsp rsp = {0};
    code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
    ASSERT_EQ(code, 0);
  }

  // Poll every 10 ms until the query-done flag has been raised
  while (!queryDone) {
    taosUsleep(10000);
  }

  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);

  // Helper thread receives the job id; presumably it delivers the fetch response
  // that the schedulerFetchRows call below waits for.
  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);

  void *data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  // Guard the dereference below: a successful fetch must hand back a response buffer
  ASSERT_TRUE(data != NULL);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);
  taosMemoryFreeClear(data);

  // A second fetch after completion must succeed and return no data
  data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data == NULL);

  schReleaseJob(job);

  schedulerFreeJob(job);

  schtFreeQueryDag(&dag);

  schedulerDestroy();
}
D
dapan1121 已提交
955

D
dapan 已提交
956 957 958 959 960 961 962

// Executes an insert job synchronously through the scheduler and checks the
// reported row count. A helper thread running schtSendRsp is started first and is
// given the address of insertJobRefId; presumably it supplies the insert response
// once the job id has been published there (its body is not in this section).
TEST(insertTest, normalCase) {
  void      *mockPointer = (void *)0x1;  // placeholder handle, only stored in conn.pTrans below
  SQueryPlan dag;

  // Single query node endpoint handed to the scheduler through req.pNodeList
  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);

  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildInsertDag(&dag);

  schtSetPlanToString();
  schtSetAsyncSendMsgToServer();

  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);

  // Must be started before schedulerExecJob, which is called synchronously below
  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);

  SQueryResult res = {0};
  SRequestConnInfo conn = {.pTrans = mockPointer,
                           .requestId = 0,
                           .requestObjRefId = 0
  };

  // No completion callback: the result is returned through res
  SSchedulerReq req = {.pConn = &conn,
                       .pNodeList = qnodeList,
                       .pDag = &dag,
                       .sql = "insert into tb values(now,1)",
                       .startTs = 0,
                       .fp = NULL,
                       .cbParam = NULL
  };
  code = schedulerExecJob(&req, &insertJobRefId, &res);
  ASSERT_EQ(code, 0);
  ASSERT_EQ(res.numOfRows, 20);

  schedulerFreeJob(insertJobRefId);

  schedulerDestroy();
}

D
dapan1121 已提交
1010
// Starts three threads (schtRunJobThread, schtFreeJobThread, schtFetchRspThread)
// and lets them run concurrently. While schtTestDeadLoop is set the test keeps
// sleeping in one-second steps; once it is clear, the threads get schtTestMTRunSec
// more seconds before schtTestStop is raised, followed by a three-second pause.
TEST(multiThread, forceFree) {
  TdThreadAttr attr;
  taosThreadAttrInit(&attr);

  TdThread runThread, freeThread, fetchThread;
  taosThreadCreate(&runThread, &attr, schtRunJobThread, NULL);
  taosThreadCreate(&freeThread, &attr, schtFreeJobThread, NULL);
  taosThreadCreate(&fetchThread, &attr, schtFetchRspThread, NULL);

  // Dead-loop mode: stay here, re-checking the flag once per second
  while (schtTestDeadLoop) {
    taosSsleep(1);
  }

  // Normal mode: let the threads run for the configured duration
  taosSsleep(schtTestMTRunSec);

  // Presumably the worker threads poll this flag; the pause gives them time to exit
  schtTestStop = true;
  taosSsleep(3);
}
D
dapan 已提交
1031

D
dapan1121 已提交
1032
int main(int argc, char** argv) {
wafwerar's avatar
wafwerar 已提交
1033
  taosSeedRand(taosGetTimestampSec());
D
dapan1121 已提交
1034 1035 1036 1037
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

D
dapan1121 已提交
1038
#pragma GCC diagnostic pop