schedulerTests.cpp 25.3 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <gtest/gtest.h>
#include <iostream>

19 20 21 22 23 24 25 26 27
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wreturn-type"
#pragma GCC diagnostic ignored "-Wformat"
#include <addr_any.h>

wafwerar's avatar
wafwerar 已提交
28 29 30
#ifdef WINDOWS
#define TD_USE_WINSOCK
#endif
D
dapan1121 已提交
31 32
#include "os.h"

33
#include "tglobal.h"
D
dapan1121 已提交
34 35 36
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
D
dapan1121 已提交
37 38
#include "catalog.h"
#include "scheduler.h"
H
Haojun Liao 已提交
39 40 41
#include "taos.h"
#include "tdatablock.h"
#include "tdef.h"
D
dapan1121 已提交
42
#include "trpc.h"
H
Haojun Liao 已提交
43
#include "tvariant.h"
S
Shengliang Guan 已提交
44 45 46 47 48 49 50 51 52

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wreturn-type"
#pragma GCC diagnostic ignored "-Wformat"

D
dapan 已提交
53 54
#include "schedulerInt.h"
#include "stub.h"
D
dapan1121 已提交
55
#include "tref.h"
D
dapan1121 已提交
56

D
dapan1121 已提交
57
namespace {
D
dapan 已提交
58

D
dapan1121 已提交
59
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
D
dapan1121 已提交
60 61
extern "C" int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode);

D
dapan1121 已提交
62 63
int64_t insertJobRefId = 0;
int64_t queryJobRefId = 0;
D
dapan1121 已提交
64 65 66 67 68 69 70

uint64_t schtMergeTemplateId = 0x4;
uint64_t schtFetchTaskId = 0;
uint64_t schtQueryId = 1;

bool schtTestStop = false;
bool schtTestDeadLoop = false;
D
dapan1121 已提交
71
int32_t schtTestMTRunSec = 10;
D
dapan1121 已提交
72 73
int32_t schtTestPrintNum = 1000;
int32_t schtStartFetch = 0;
D
dapan 已提交
74

D
dapan1121 已提交
75

D
dapan1121 已提交
76 77 78 79 80 81
void schtInitLogFile() {
  const char    *defaultLogFileNamePrefix = "taoslog";
  const int32_t  maxLogFileNum = 10;

  tsAsyncLog = 0;
  qDebugFlag = 159;
wafwerar's avatar
wafwerar 已提交
82
  strcpy(tsLogDir, TD_LOG_DIR_PATH);
D
dapan1121 已提交
83

S
Shengliang Guan 已提交
84
  if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
S
os env  
Shengliang Guan 已提交
85
    printf("failed to open log file in directory:%s\n", tsLogDir);
D
dapan1121 已提交
86 87 88 89
  }

}

D
dapan1121 已提交
90 91 92 93 94
// Job completion callback used by the tests: expects a successful result code
// and raises the int32_t flag that `param` points to.
void schtQueryCb(SQueryResult* pResult, void* param, int32_t code) {
  assert(code == TSDB_CODE_SUCCESS);

  int32_t *doneFlag = (int32_t *)param;
  *doneFlag = 1;
}

D
dapan1121 已提交
95

X
Xiaoyu Wang 已提交
96
// Fills `dag` with a two-level query plan: one merge subplan (level 0) whose
// only child is one scan subplan (level 1). The merge level is appended to
// dag->pSubplans first, then the scan level. Only queryId, numOfSubplans and
// pSubplans of `dag` are written here.
void schtBuildQueryDag(SQueryPlan *dag) {
  const uint64_t queryId = schtQueryId;

  dag->queryId = queryId;
  dag->numOfSubplans = 2;
  dag->pSubplans = nodesMakeList();

  SNodeListNode *scanLevel = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
  SNodeListNode *mergeLevel = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);

  SSubplan *scanSub = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));
  SSubplan *mergeSub = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));

  // Scan subplan: runs on node 1, reachable through the single endpoint ep0:6030.
  scanSub->id.queryId = queryId;
  scanSub->id.groupId = 0x2;
  scanSub->id.subplanId = 0x3;
  scanSub->subplanType = SUBPLAN_TYPE_SCAN;

  scanSub->execNode.nodeId = 1;
  scanSub->execNode.epSet.inUse = 0;
  addEpIntoEpSet(&scanSub->execNode.epSet, "ep0", 6030);

  scanSub->pChildren = NULL;
  scanSub->level = 1;
  scanSub->pParents = nodesMakeList();
  scanSub->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
  scanSub->msgType = TDMT_SCH_QUERY;

  // Merge subplan: no endpoints are assigned to it here.
  mergeSub->id.queryId = queryId;
  mergeSub->id.groupId = schtMergeTemplateId;
  mergeSub->id.subplanId = 0x5555;
  mergeSub->subplanType = SUBPLAN_TYPE_MERGE;
  mergeSub->level = 0;
  mergeSub->execNode.epSet.numOfEps = 0;

  mergeSub->pChildren = nodesMakeList();
  mergeSub->pParents = NULL;
  mergeSub->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
  mergeSub->msgType = TDMT_SCH_QUERY;

  // One subplan per level.
  mergeLevel->pNodeList = nodesMakeList();
  scanLevel->pNodeList = nodesMakeList();
  nodesListAppend(mergeLevel->pNodeList, (SNode*)mergeSub);
  nodesListAppend(scanLevel->pNodeList, (SNode*)scanSub);

  // Parent/child links between the two subplans.
  nodesListAppend(mergeSub->pChildren, (SNode*)scanSub);
  nodesListAppend(scanSub->pParents, (SNode*)mergeSub);

  // Levels are registered top-down: merge first, scan second.
  nodesListAppend(dag->pSubplans, (SNode*)mergeLevel);
  nodesListAppend(dag->pSubplans, (SNode*)scanLevel);
}

D
dapan 已提交
148 149 150 151 152 153 154 155 156 157
// Fills `dag` with a two-level plan for the flow-control test: one merge
// subplan (level 0) fed by 20 scan subplans (level 1). Every scan subplan gets
// three endpoints, a random table count (0..29) and a random in-use endpoint
// index (0..2). Only queryId, numOfSubplans and pSubplans of `dag` are written.
void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
  const uint64_t queryId = schtQueryId;
  const int32_t  scanCount = 20;

  dag->queryId = queryId;
  dag->numOfSubplans = 2;
  dag->pSubplans = nodesMakeList();
  SNodeListNode *scanLevel = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
  SNodeListNode *mergeLevel = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);

  // All scan subplans live in one contiguous array.
  SSubplan *scanSubs = (SSubplan *)taosMemoryCalloc(scanCount, sizeof(SSubplan));
  SSubplan *mergeSub = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));

  mergeLevel->pNodeList = nodesMakeList();
  scanLevel->pNodeList = nodesMakeList();

  mergeSub->pChildren = nodesMakeList();

  for (int32_t idx = 0; idx < scanCount; ++idx) {
    SSubplan *cur = scanSubs + idx;

    cur->id.queryId = queryId;
    cur->id.groupId = 0x2;
    cur->id.subplanId = 0x3 + idx;
    cur->subplanType = SUBPLAN_TYPE_SCAN;

    cur->execNode.nodeId = 1 + idx;
    cur->execNode.epSet.inUse = 0;
    cur->execNodeStat.tableNum = taosRand() % 30;
    addEpIntoEpSet(&cur->execNode.epSet, "ep0", 6030);
    addEpIntoEpSet(&cur->execNode.epSet, "ep1", 6030);
    addEpIntoEpSet(&cur->execNode.epSet, "ep2", 6030);
    // Pick one of the three endpoints just added as the active one.
    cur->execNode.epSet.inUse = taosRand() % 3;

    cur->pChildren = NULL;
    cur->level = 1;
    cur->pParents = nodesMakeList();
    cur->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
    cur->msgType = TDMT_SCH_QUERY;

    // Link the scan to the merge subplan and register it in the scan level.
    nodesListAppend(cur->pParents, (SNode*)mergeSub);
    nodesListAppend(mergeSub->pChildren, (SNode*)cur);

    nodesListAppend(scanLevel->pNodeList, (SNode*)cur);
  }

  mergeSub->id.queryId = queryId;
  mergeSub->id.groupId = schtMergeTemplateId;
  mergeSub->id.subplanId = 0x5555;
  mergeSub->subplanType = SUBPLAN_TYPE_MERGE;
  mergeSub->level = 0;
  mergeSub->execNode.epSet.numOfEps = 0;

  mergeSub->pParents = NULL;
  mergeSub->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
  mergeSub->msgType = TDMT_SCH_QUERY;

  nodesListAppend(mergeLevel->pNodeList, (SNode*)mergeSub);

  // Levels are registered top-down: merge first, scan second.
  nodesListAppend(dag->pSubplans, (SNode*)mergeLevel);
  nodesListAppend(dag->pSubplans, (SNode*)scanLevel);
}


X
Xiaoyu Wang 已提交
210
// Placeholder for releasing a plan built by the schtBuild*Dag helpers.
// Currently a no-op: nothing reachable from `dag` is freed here.
void schtFreeQueryDag(SQueryPlan *dag) {
  (void)dag;
}


X
Xiaoyu Wang 已提交
215
// Fills `dag` with a single level holding two modify (insert) subplans. Both
// target node 1 through endpoint ep0:6030 and differ only in their subplan id
// (0x4 and 0x5). Each gets a zeroed data sink node and no physical plan node.
void schtBuildInsertDag(SQueryPlan *dag) {
  const uint64_t queryId = 0x0000000000000002;
  const int32_t  planCount = 2;

  dag->queryId = queryId;
  dag->numOfSubplans = 2;
  dag->pSubplans = nodesMakeList();
  SNodeListNode *insertLevel = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);

  // Both subplans share one contiguous allocation.
  SSubplan *plans = (SSubplan *)taosMemoryCalloc(planCount, sizeof(SSubplan));

  for (int32_t idx = 0; idx < planCount; ++idx) {
    SSubplan *plan = plans + idx;

    plan->id.queryId = queryId;
    plan->id.groupId = 0x0000000000000003;
    plan->id.subplanId = 0x0000000000000004 + idx;
    plan->subplanType = SUBPLAN_TYPE_MODIFY;
    plan->level = 0;

    plan->execNode.nodeId = 1;
    plan->execNode.epSet.inUse = 0;
    addEpIntoEpSet(&plan->execNode.epSet, "ep0", 6030);

    plan->pChildren = NULL;
    plan->pParents = NULL;
    plan->pNode = NULL;
    plan->pDataSink = (SDataSinkNode*)taosMemoryCalloc(1, sizeof(SDataSinkNode));
    plan->msgType = TDMT_VND_SUBMIT;
  }

  insertLevel->pNodeList = nodesMakeList();
  for (int32_t idx = 0; idx < planCount; ++idx) {
    nodesListAppend(insertLevel->pNodeList, (SNode*)(plans + idx));
  }

  nodesListAppend(dag->pSubplans, (SNode*)insertLevel);
}


D
dapan 已提交
267
int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
wafwerar's avatar
wafwerar 已提交
268
  *str = (char *)taosMemoryCalloc(1, 20);
D
dapan 已提交
269 270 271 272
  *len = 20;
  return 0;
}

X
Xiaoyu Wang 已提交
273
// Stub installed in place of qSetSubplanExecutionNode: intentionally does nothing.
void schtExecNode(SSubplan* subplan, uint64_t groupId, SQueryNodeAddr* ep) {
  (void)subplan;
  (void)groupId;
  (void)ep;
}

D
dapan1121 已提交
277 278 279 280
// Stub installed in place of rpcSendRequest: the message is dropped and nothing is sent.
void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
  (void)shandle;
  (void)pEpSet;
  (void)pMsg;
  (void)pRid;
}

D
dapan 已提交
281 282 283 284
void schtSetPlanToString() {
  static Stub stub;
  stub.set(qSubPlanToString, schtPlanToString);
  {
wafwerar's avatar
wafwerar 已提交
285 286 287 288 289 290
#ifdef WINDOWS
    AddrAny any;
    std::map<std::string,void*> result;
    any.get_func_addr("qSubPlanToString", result);
#endif
#ifdef LINUX
D
dapan 已提交
291 292 293
    AddrAny any("libplanner.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qSubPlanToString$", result);
wafwerar's avatar
wafwerar 已提交
294
#endif
D
dapan 已提交
295 296 297 298 299 300 301 302 303 304
    for (const auto& f : result) {
      stub.set(f.second, schtPlanToString);
    }
  }
}

void schtSetExecNode() {
  static Stub stub;
  stub.set(qSetSubplanExecutionNode, schtExecNode);
  {
wafwerar's avatar
wafwerar 已提交
305 306 307 308 309 310
#ifdef WINDOWS
    AddrAny any;
    std::map<std::string,void*> result;
    any.get_func_addr("qSetSubplanExecutionNode", result);
#endif
#ifdef LINUX
D
dapan 已提交
311 312 313
    AddrAny any("libplanner.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qSetSubplanExecutionNode$", result);
wafwerar's avatar
wafwerar 已提交
314
#endif
D
dapan 已提交
315 316 317 318 319 320
    for (const auto& f : result) {
      stub.set(f.second, schtExecNode);
    }
  }
}

D
dapan1121 已提交
321 322 323 324
void schtSetRpcSendRequest() {
  static Stub stub;
  stub.set(rpcSendRequest, schtRpcSendRequest);
  {
wafwerar's avatar
wafwerar 已提交
325 326 327 328 329 330
#ifdef WINDOWS
    AddrAny any;
    std::map<std::string,void*> result;
    any.get_func_addr("rpcSendRequest", result);
#endif
#ifdef LINUX
D
dapan1121 已提交
331 332 333
    AddrAny any("libtransport.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^rpcSendRequest$", result);
wafwerar's avatar
wafwerar 已提交
334
#endif
D
dapan1121 已提交
335 336 337 338 339 340
    for (const auto& f : result) {
      stub.set(f.second, schtRpcSendRequest);
    }
  }
}

D
dapan1121 已提交
341 342
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
  if (pInfo) {
wafwerar's avatar
wafwerar 已提交
343 344 345
    taosMemoryFreeClear(pInfo->param);
    taosMemoryFreeClear(pInfo->msgInfo.pData);
    taosMemoryFree(pInfo);
D
dapan1121 已提交
346
  }
D
dapan1121 已提交
347 348 349 350 351 352 353 354
  return 0;
}


void schtSetAsyncSendMsgToServer() {
  static Stub stub;
  stub.set(asyncSendMsgToServer, schtAsyncSendMsgToServer);
  {
wafwerar's avatar
wafwerar 已提交
355 356 357 358 359 360
#ifdef WINDOWS
    AddrAny any;
    std::map<std::string,void*> result;
    any.get_func_addr("asyncSendMsgToServer", result);
#endif
#ifdef LINUX
D
dapan1121 已提交
361 362 363
    AddrAny any("libtransport.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^asyncSendMsgToServer$", result);
wafwerar's avatar
wafwerar 已提交
364
#endif
D
dapan1121 已提交
365 366 367 368 369 370
    for (const auto& f : result) {
      stub.set(f.second, schtAsyncSendMsgToServer);
    }
  }
}

D
dapan1121 已提交
371

D
dapan 已提交
372
void *schtSendRsp(void *param) {
D
dapan1121 已提交
373 374
  SSchJob *pJob = NULL;
  int64_t job = 0;
D
dapan 已提交
375 376 377
  int32_t code = 0;

  while (true) {
D
dapan1121 已提交
378
    job = *(int64_t *)param;
D
dapan 已提交
379 380 381 382
    if (job) {
      break;
    }

wafwerar's avatar
wafwerar 已提交
383
    taosMsleep(1);
D
dapan 已提交
384
  }
D
dapan1121 已提交
385 386

  pJob = schAcquireJob(job);
D
dapan 已提交
387
  
D
dapan1121 已提交
388
  void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan 已提交
389 390 391
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

S
Shengliang Guan 已提交
392
    SSubmitRsp rsp = {0};
D
dapan 已提交
393
    rsp.affectedRows = 10;
D
dapan1121 已提交
394
    schHandleResponseMsg(pJob, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
395
    
D
dapan1121 已提交
396
    pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan 已提交
397 398
  }    

D
dapan1121 已提交
399 400
  schReleaseJob(job);

D
dapan 已提交
401 402 403
  return NULL;
}

D
dapan1121 已提交
404
void *schtCreateFetchRspThread(void *param) {
D
dapan1121 已提交
405 406
  int64_t job = *(int64_t *)param;
  SSchJob* pJob = schAcquireJob(job);
D
dapan1121 已提交
407

wafwerar's avatar
wafwerar 已提交
408
  taosSsleep(1);
D
dapan1121 已提交
409 410

  int32_t code = 0;
wafwerar's avatar
wafwerar 已提交
411
  SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
D
dapan1121 已提交
412 413
  rsp->completed = 1;
  rsp->numOfRows = 10;
D
dapan1121 已提交
414
 
D
dapan1121 已提交
415 416 417 418
  code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0);

  schReleaseJob(job);
  
D
dapan1121 已提交
419
  assert(code == 0);
wafwerar's avatar
wafwerar 已提交
420
  return NULL;
D
dapan1121 已提交
421 422 423
}


D
dapan1121 已提交
424 425
void *schtFetchRspThread(void *aa) {
  SDataBuf dataBuf = {0};
D
dapan1121 已提交
426
  SSchTaskCallbackParam* param = NULL;
D
dapan1121 已提交
427 428 429 430 431 432

  while (!schtTestStop) {
    if (0 == atomic_val_compare_exchange_32(&schtStartFetch, 1, 0)) {
      continue;
    }

wafwerar's avatar
wafwerar 已提交
433
    taosUsleep(1);
D
dapan1121 已提交
434
    
435
    param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
D
dapan1121 已提交
436 437 438 439 440

    param->queryId = schtQueryId;  
    param->taskId = schtFetchTaskId;

    int32_t code = 0;
wafwerar's avatar
wafwerar 已提交
441
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
D
dapan1121 已提交
442 443 444 445 446 447 448 449 450 451
    rsp->completed = 1;
    rsp->numOfRows = 10;

    dataBuf.pData = rsp;
    dataBuf.len = sizeof(*rsp);

    code = schHandleCallback(param, &dataBuf, TDMT_VND_FETCH_RSP, 0);
      
    assert(code == 0 || code);
  }
wafwerar's avatar
wafwerar 已提交
452
  return NULL;
D
dapan1121 已提交
453 454 455 456
}

void schtFreeQueryJob(int32_t freeThread) {
  static uint32_t freeNum = 0;
D
dapan1121 已提交
457
  int64_t job = queryJobRefId;
D
dapan1121 已提交
458
  
D
dapan1121 已提交
459
  if (job && atomic_val_compare_exchange_64(&queryJobRefId, job, 0)) {
D
dapan1121 已提交
460
    schedulerFreeJob(job, 0);
D
dapan1121 已提交
461 462 463 464 465 466 467 468 469 470 471 472 473 474
    if (freeThread) {
      if (++freeNum % schtTestPrintNum == 0) {
        printf("FreeNum:%d\n", freeNum);
      }
    }
  }
}

void* schtRunJobThread(void *aa) {
  void *mockPointer = (void *)0x1;
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
  SVgroupInfo vgInfo = {0};
X
Xiaoyu Wang 已提交
475
  SQueryPlan dag;
D
dapan1121 已提交
476 477 478 479 480 481 482 483 484 485 486 487

  schtInitLogFile();

  
  int32_t code = schedulerInit(NULL);
  assert(code == 0);


  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();

D
dapan1121 已提交
488
  SSchJob *pJob = NULL;
D
dapan1121 已提交
489
  SSchTaskCallbackParam *param = NULL;
D
dapan1121 已提交
490 491 492
  SHashObj *execTasks = NULL;
  SDataBuf dataBuf = {0};
  uint32_t jobFinished = 0;
D
dapan1121 已提交
493
  int32_t queryDone = 0;
D
dapan1121 已提交
494 495 496 497

  while (!schtTestStop) {
    schtBuildQueryDag(&dag);

H
Haojun Liao 已提交
498
    SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
D
dapan1121 已提交
499

H
Haojun Liao 已提交
500
    SEp qnodeAddr = {0};
D
dapan1121 已提交
501 502 503 504
    strcpy(qnodeAddr.fqdn, "qnode0.ep");
    qnodeAddr.port = 6031;
    taosArrayPush(qnodeList, &qnodeAddr);

D
dapan1121 已提交
505
    queryDone = 0;
D
dapan1121 已提交
506 507 508 509 510 511 512 513 514 515 516
    
    SRequestConnInfo conn = {0};
    conn.pTrans = mockPointer;
    SSchedulerReq req = {0};    
    req.pConn = &conn;
    req.pNodeList = qnodeList;
    req.pDag = &dag;
    req.sql = "select * from tb";
    req.fp = schtQueryCb;
    req.cbParam = &queryDone;
    
D
dapan1121 已提交
517
    code = schedulerAsyncExecJob(&req, &queryJobRefId);      
D
dapan1121 已提交
518 519
    assert(code == 0);

D
dapan1121 已提交
520 521 522 523 524 525 526
    pJob = schAcquireJob(queryJobRefId);
    if (NULL == pJob) {
      taosArrayDestroy(qnodeList);
      schtFreeQueryDag(&dag);
      continue;
    }
    
D
dapan1121 已提交
527
    execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
528
    void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan1121 已提交
529 530 531 532 533
    while (pIter) {
      SSchTask *task = *(SSchTask **)pIter;
      schtFetchTaskId = task->taskId - 1;
      
      taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task));
D
dapan1121 已提交
534
      pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan1121 已提交
535 536
    }    

537
    param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
D
dapan1121 已提交
538 539
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
D
dapan1121 已提交
540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555

    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId;
      SQueryTableRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
      code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0);
      assert(code == 0 || code);

      pIter = taosHashIterate(execTasks, pIter);
    }    

556
    param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
D
dapan1121 已提交
557 558
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
D
dapan1121 已提交
559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575

    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId - 1;
      SQueryTableRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
      code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0);
      assert(code == 0 || code);
      
      pIter = taosHashIterate(execTasks, pIter);
    }    


D
dapan1121 已提交
576 577 578 579 580 581 582 583
    while (true) {
      if (queryDone) {
        break;
      }
    
      taosUsleep(10000);
    }

D
dapan1121 已提交
584 585 586
    atomic_store_32(&schtStartFetch, 1);

    void *data = NULL;  
D
dapan1121 已提交
587
    code = schedulerFetchRows(queryJobRefId, &data);
D
dapan1121 已提交
588 589 590 591 592 593 594 595 596
    assert(code == 0 || code);

    if (0 == code) {
      SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
      assert(pRsp->completed == 1);
      assert(pRsp->numOfRows == 10);
    }

    data = NULL;
D
dapan1121 已提交
597
    code = schedulerFetchRows(queryJobRefId, &data);
D
dapan1121 已提交
598 599 600 601 602
    assert(code == 0 || code);
    
    schtFreeQueryJob(0);

    taosHashCleanup(execTasks);
D
dapan1121 已提交
603
    taosArrayDestroy(qnodeList);
D
dapan1121 已提交
604 605 606 607 608 609 610 611 612 613 614 615

    schtFreeQueryDag(&dag);

    if (++jobFinished % schtTestPrintNum == 0) {
      printf("jobFinished:%d\n", jobFinished);
    }

    ++schtQueryId;
  }

  schedulerDestroy();

wafwerar's avatar
wafwerar 已提交
616
  return NULL;
D
dapan1121 已提交
617 618 619 620
}

void* schtFreeJobThread(void *aa) {
  while (!schtTestStop) {
wafwerar's avatar
wafwerar 已提交
621
    taosUsleep(taosRand() % 100);
D
dapan1121 已提交
622 623
    schtFreeQueryJob(1);
  }
wafwerar's avatar
wafwerar 已提交
624
  return NULL;
D
dapan1121 已提交
625
}
D
dapan1121 已提交
626 627


D
dapan1121 已提交
628 629
}

D
dapan 已提交
630
// Single-threaded happy path for a two-level query job: submit the plan, feed
// query responses to the executing tasks, wait for the completion callback,
// then fetch once with data (delivered by a helper thread) and once without.
TEST(queryTest, normalCase) {
  void *mockPointer = (void *)0x1;
  int64_t job = 0;
  SQueryPlan dag;

  memset(&dag, 0, sizeof(dag));

  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);

  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildQueryDag(&dag);

  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();

  // Raised by schtQueryCb once the job reports completion.
  int32_t queryDone = 0;

  SRequestConnInfo conn = {0};
  conn.pTrans = mockPointer;
  SSchedulerReq req = {0};
  req.pConn = &conn;
  req.pNodeList = qnodeList;
  req.pDag = &dag;
  req.sql = "select * from tb";
  req.fp = schtQueryCb;
  req.cbParam = &queryDone;

  code = schedulerAsyncExecJob(&req, &job);
  ASSERT_EQ(code, 0);

  SSchJob *pJob = schAcquireJob(job);
  // Fail the test cleanly instead of dereferencing NULL below.
  ASSERT_TRUE(pJob != NULL);

  // First pass: answer every task currently in the job's execTasks table.
  void *pIter = taosHashIterate(pJob->execTasks, NULL);
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

    SQueryTableRsp rsp = {0};
    code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);

    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(pJob->execTasks, pIter);
  }

  // Second pass over execTasks (presumably reaching the next plan level
  // launched after the first pass — confirm against the scheduler).
  pIter = taosHashIterate(pJob->execTasks, NULL);
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

    SQueryTableRsp rsp = {0};
    code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);

    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(pJob->execTasks, pIter);
  }

  // Wait for the completion callback.
  while (true) {
    if (queryDone) {
      break;
    }

    taosUsleep(10000);
  }

  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);

  // Helper thread delivers the fetch response about one second later.
  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);

  void *data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  // Guard the dereference of the fetched block.
  ASSERT_TRUE(data != NULL);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);
  taosMemoryFreeClear(data);

  // A second fetch must succeed and return no data.
  data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data == NULL);

  schReleaseJob(job);

  schedulerFreeJob(job, 0);

  schtFreeQueryDag(&dag);

  schedulerDestroy();
}
D
dapan1121 已提交
733

D
dapan 已提交
734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759
// Same flow as the normal query case: submit a two-level plan, feed query
// responses to the executing tasks in two passes, wait for the completion
// callback, then fetch once with data and once without.
TEST(queryTest, readyFirstCase) {
  void *mockPointer = (void *)0x1;
  int64_t job = 0;
  SQueryPlan dag;

  memset(&dag, 0, sizeof(dag));

  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);

  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildQueryDag(&dag);

  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();

  // Raised by schtQueryCb once the job reports completion.
  int32_t queryDone = 0;

  SRequestConnInfo conn = {0};
  conn.pTrans = mockPointer;
  SSchedulerReq req = {0};
  req.pConn = &conn;
  req.pNodeList = qnodeList;
  req.pDag = &dag;
  req.sql = "select * from tb";
  req.fp = schtQueryCb;
  req.cbParam = &queryDone;
  code = schedulerAsyncExecJob(&req, &job);
  ASSERT_EQ(code, 0);

  SSchJob *pJob = schAcquireJob(job);
  // Fail the test cleanly instead of dereferencing NULL below.
  ASSERT_TRUE(pJob != NULL);

  // First pass: answer every task currently in the job's execTasks table.
  void *pIter = taosHashIterate(pJob->execTasks, NULL);
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

    SQueryTableRsp rsp = {0};
    code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);

    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(pJob->execTasks, pIter);
  }

  // Second pass over execTasks (presumably reaching the next plan level
  // launched after the first pass — confirm against the scheduler).
  pIter = taosHashIterate(pJob->execTasks, NULL);
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

    SQueryTableRsp rsp = {0};
    code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);

    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(pJob->execTasks, pIter);
  }

  // Wait for the completion callback.
  while (true) {
    if (queryDone) {
      break;
    }

    taosUsleep(10000);
  }

  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);

  // Helper thread delivers the fetch response about one second later.
  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);

  void *data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  // Guard the dereference of the fetched block.
  ASSERT_TRUE(data != NULL);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);
  taosMemoryFreeClear(data);

  // A second fetch must succeed and return no data.
  data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data == NULL);

  schReleaseJob(job);

  schedulerFreeJob(job, 0);

  schtFreeQueryDag(&dag);

  schedulerDestroy();
}



D
dapan 已提交
840 841 842 843 844 845 846 847 848 849 850
// Flow-control variant: the plan has 20 scan subplans under one merge subplan.
// Query responses are fed one task at a time, always restarting from the head
// of execTasks, until a task whose last message is not a query request is
// found. The fetch sequence then matches the other query tests.
TEST(queryTest, flowCtrlCase) {
  void *mockPointer = (void *)0x1;
  int64_t job = 0;
  SQueryPlan dag;

  // schtBuildQueryFlowCtrlDag only assigns queryId, numOfSubplans and
  // pSubplans, so zero the plan first (the other query tests do the same).
  memset(&dag, 0, sizeof(dag));

  schtInitLogFile();

  taosSeedRand(taosGetTimestampSec());

  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);

  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildQueryFlowCtrlDag(&dag);

  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();

  // Raised by schtQueryCb once the job reports completion.
  int32_t queryDone = 0;
  SRequestConnInfo conn = {0};
  conn.pTrans = mockPointer;
  SSchedulerReq req = {0};
  req.pConn = &conn;
  req.pNodeList = qnodeList;
  req.pDag = &dag;
  req.sql = "select * from tb";
  req.fp = schtQueryCb;
  req.cbParam = &queryDone;

  code = schedulerAsyncExecJob(&req, &job);
  ASSERT_EQ(code, 0);

  SSchJob *pJob = schAcquireJob(job);
  // Fail the test cleanly instead of dereferencing NULL below.
  ASSERT_TRUE(pJob != NULL);

  bool qDone = false;

  while (!qDone) {
    void *pIter = taosHashIterate(pJob->execTasks, NULL);
    if (NULL == pIter) {
      break;
    }

    while (pIter) {
      SSchTask *task = *(SSchTask **)pIter;

      // Only the first entry is used per round; release the iterator before
      // the response handler runs.
      taosHashCancelIterate(pJob->execTasks, pIter);

      if (task->lastMsgType == TDMT_SCH_QUERY) {
        SQueryTableRsp rsp = {0};
        code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);

        ASSERT_EQ(code, 0);
      } else {
        qDone = true;
        break;
      }

      // Leave the inner loop and restart iteration from the head.
      pIter = NULL;
    }
  }

  // Wait for the completion callback.
  while (true) {
    if (queryDone) {
      break;
    }

    taosUsleep(10000);
  }

  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);

  // Helper thread delivers the fetch response about one second later.
  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);

  void *data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  // Guard the dereference of the fetched block.
  ASSERT_TRUE(data != NULL);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);
  taosMemoryFreeClear(data);

  // A second fetch must succeed and return no data.
  data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data == NULL);

  schReleaseJob(job);

  schedulerFreeJob(job, 0);

  schtFreeQueryDag(&dag);

  schedulerDestroy();
}
D
dapan1121 已提交
949

D
dapan 已提交
950 951 952 953 954 955 956

// Runs a synchronous insert job built by schtBuildInsertDag() and checks that
// schedulerExecJob() succeeds and reports 20 rows in the result.
TEST(insertTest, normalCase) {
  void      *mockPointer = (void *)0x1;  // placeholder transport handle; only stored in conn.pTrans here
  SQueryPlan dag;

  // Node list handed to the scheduler: a single endpoint.
  // NOTE(review): qnodeList is never destroyed in this test; whether the
  // scheduler takes ownership of req.pNodeList is not visible here - confirm.
  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);

  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  // NOTE(review): unlike queryTest.flowCtrlCase, the dag is not released with
  // schtFreeQueryDag() at the end - confirm whether that is intentional.
  schtBuildInsertDag(&dag);

  // Test hooks defined elsewhere in this file (presumably stubs for plan
  // serialization and message sending).
  schtSetPlanToString();
  schtSetAsyncSendMsgToServer();

  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);

  // Started before the blocking schedulerExecJob() call below and given the
  // address of the file-level insertJobRefId (presumably to answer the job).
  // NOTE(review): thread1 is never joined and thattr is never destroyed.
  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);

  SQueryResult res = {0};

  SRequestConnInfo conn = {0};
  conn.pTrans = mockPointer;
  SSchedulerReq req = {0};
  req.pConn = &conn;
  req.pNodeList = qnodeList;
  req.pDag = &dag;
  req.sql = "insert into tb values(now,1)";
  req.fp = schtQueryCb;
  req.cbParam = NULL;

  code = schedulerExecJob(&req, &insertJobRefId, &res);
  ASSERT_EQ(code, 0);
  ASSERT_EQ(res.numOfRows, 20);

  schedulerFreeJob(insertJobRefId, 0);

  schedulerDestroy();
}

D
dapan1121 已提交
1002
// Starts three worker threads (schtRunJobThread, schtFreeJobThread and
// schtFetchRspThread), lets them run for schtTestMTRunSec seconds, then raises
// schtTestStop and waits three more seconds before the test returns.
TEST(multiThread, forceFree) {
  TdThreadAttr attr;
  taosThreadAttrInit(&attr);

  TdThread runWorker;
  TdThread freeWorker;
  TdThread fetchWorker;
  taosThreadCreate(&runWorker, &attr, schtRunJobThread, NULL);
  taosThreadCreate(&freeWorker, &attr, schtFreeJobThread, NULL);
  taosThreadCreate(&fetchWorker, &attr, schtFetchRspThread, NULL);

  // While schtTestDeadLoop is set the test keeps sleeping in 1 s steps and
  // never reaches the timed run below.
  while (schtTestDeadLoop) {
    taosSsleep(1);
  }
  taosSsleep(schtTestMTRunSec);

  // The workers are not joined; the test only sleeps after raising the flag.
  schtTestStop = true;
  taosSsleep(3);
}
D
dapan 已提交
1023

D
dapan1121 已提交
1024
int main(int argc, char** argv) {
wafwerar's avatar
wafwerar 已提交
1025
  taosSeedRand(taosGetTimestampSec());
D
dapan1121 已提交
1026 1027 1028 1029
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

D
dapan1121 已提交
1030
#pragma GCC diagnostic pop