schedulerTests.cpp 25.3 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <gtest/gtest.h>
#include <iostream>

19 20 21 22 23 24 25 26 27
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wreturn-type"
#pragma GCC diagnostic ignored "-Wformat"
#include <addr_any.h>

wafwerar's avatar
wafwerar 已提交
28 29 30
#ifdef WINDOWS
#define TD_USE_WINSOCK
#endif
D
dapan1121 已提交
31 32
#include "os.h"

33
#include "tglobal.h"
D
dapan1121 已提交
34 35 36
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
D
dapan1121 已提交
37 38
#include "catalog.h"
#include "scheduler.h"
H
Haojun Liao 已提交
39 40 41
#include "taos.h"
#include "tdatablock.h"
#include "tdef.h"
D
dapan1121 已提交
42
#include "trpc.h"
H
Haojun Liao 已提交
43
#include "tvariant.h"
S
Shengliang Guan 已提交
44 45 46 47 48 49 50 51 52

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wreturn-type"
#pragma GCC diagnostic ignored "-Wformat"

D
dapan 已提交
53 54
#include "schedulerInt.h"
#include "stub.h"
D
dapan1121 已提交
55
#include "tref.h"
D
dapan1121 已提交
56

D
dapan1121 已提交
57
namespace {
D
dapan 已提交
58

D
dapan1121 已提交
59
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
D
dapan1121 已提交
60 61
extern "C" int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode);

D
dapan1121 已提交
62 63
int64_t insertJobRefId = 0;
int64_t queryJobRefId = 0;
D
dapan1121 已提交
64 65 66 67 68 69 70

uint64_t schtMergeTemplateId = 0x4;
uint64_t schtFetchTaskId = 0;
uint64_t schtQueryId = 1;

bool schtTestStop = false;
bool schtTestDeadLoop = false;
D
dapan1121 已提交
71
int32_t schtTestMTRunSec = 10;
D
dapan1121 已提交
72 73
int32_t schtTestPrintNum = 1000;
int32_t schtStartFetch = 0;
D
dapan 已提交
74

D
dapan1121 已提交
75

D
dapan1121 已提交
76 77 78 79 80 81
void schtInitLogFile() {
  const char    *defaultLogFileNamePrefix = "taoslog";
  const int32_t  maxLogFileNum = 10;

  tsAsyncLog = 0;
  qDebugFlag = 159;
wafwerar's avatar
wafwerar 已提交
82
  strcpy(tsLogDir, TD_LOG_DIR_PATH);
D
dapan1121 已提交
83

S
Shengliang Guan 已提交
84
  if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
S
os env  
Shengliang Guan 已提交
85
    printf("failed to open log file in directory:%s\n", tsLogDir);
D
dapan1121 已提交
86 87 88 89
  }

}

D
dapan1121 已提交
90 91 92 93 94
void schtQueryCb(SQueryResult* pResult, void* param, int32_t code) {
  assert(TSDB_CODE_SUCCESS == code);
  *(int32_t*)param = 1;
}

D
dapan1121 已提交
95

X
Xiaoyu Wang 已提交
96
void schtBuildQueryDag(SQueryPlan *dag) {
D
dapan1121 已提交
97
  uint64_t qId = schtQueryId;
D
dapan1121 已提交
98 99 100
  
  dag->queryId = qId;
  dag->numOfSubplans = 2;
X
Xiaoyu Wang 已提交
101 102 103
  dag->pSubplans = nodesMakeList();
  SNodeListNode *scan = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
  SNodeListNode *merge = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
D
dapan1121 已提交
104
  
wafwerar's avatar
wafwerar 已提交
105 106
  SSubplan *scanPlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));
  SSubplan *mergePlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));
D
dapan1121 已提交
107 108

  scanPlan->id.queryId = qId;
X
Xiaoyu Wang 已提交
109
  scanPlan->id.groupId = 0x0000000000000002;
D
dapan1121 已提交
110
  scanPlan->id.subplanId = 0x0000000000000003;
X
Xiaoyu Wang 已提交
111
  scanPlan->subplanType = SUBPLAN_TYPE_SCAN;
H
Haojun Liao 已提交
112

D
dapan1121 已提交
113
  scanPlan->execNode.nodeId = 1;
L
Liu Jicong 已提交
114 115
  scanPlan->execNode.epSet.inUse = 0;
  addEpIntoEpSet(&scanPlan->execNode.epSet, "ep0", 6030);
H
Haojun Liao 已提交
116

D
dapan1121 已提交
117 118
  scanPlan->pChildren = NULL;
  scanPlan->level = 1;
X
Xiaoyu Wang 已提交
119
  scanPlan->pParents = nodesMakeList();
wafwerar's avatar
wafwerar 已提交
120
  scanPlan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
D
dapan1121 已提交
121
  scanPlan->msgType = TDMT_SCH_QUERY;
D
dapan1121 已提交
122 123

  mergePlan->id.queryId = qId;
X
Xiaoyu Wang 已提交
124
  mergePlan->id.groupId = schtMergeTemplateId;
X
Xiaoyu Wang 已提交
125 126
  mergePlan->id.subplanId = 0x5555;
  mergePlan->subplanType = SUBPLAN_TYPE_MERGE;
D
dapan1121 已提交
127
  mergePlan->level = 0;
L
Liu Jicong 已提交
128
  mergePlan->execNode.epSet.numOfEps = 0;
H
Haojun Liao 已提交
129

X
Xiaoyu Wang 已提交
130
  mergePlan->pChildren = nodesMakeList();
D
dapan1121 已提交
131
  mergePlan->pParents = NULL;
wafwerar's avatar
wafwerar 已提交
132
  mergePlan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
D
dapan1121 已提交
133
  mergePlan->msgType = TDMT_SCH_QUERY;
D
dapan1121 已提交
134

D
dapan 已提交
135 136 137
  merge->pNodeList = nodesMakeList();
  scan->pNodeList = nodesMakeList();

X
Xiaoyu Wang 已提交
138 139
  nodesListAppend(merge->pNodeList, (SNode*)mergePlan);
  nodesListAppend(scan->pNodeList, (SNode*)scanPlan);
D
dapan1121 已提交
140

X
Xiaoyu Wang 已提交
141 142
  nodesListAppend(mergePlan->pChildren, (SNode*)scanPlan);
  nodesListAppend(scanPlan->pParents, (SNode*)mergePlan);
D
dapan1121 已提交
143

X
Xiaoyu Wang 已提交
144 145
  nodesListAppend(dag->pSubplans, (SNode*)merge);  
  nodesListAppend(dag->pSubplans, (SNode*)scan);
D
dapan1121 已提交
146 147
}

D
dapan 已提交
148 149 150 151 152 153 154 155 156 157
void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
  uint64_t qId = schtQueryId;
  int32_t scanPlanNum = 20;
  
  dag->queryId = qId;
  dag->numOfSubplans = 2;
  dag->pSubplans = nodesMakeList();
  SNodeListNode *scan = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
  SNodeListNode *merge = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
  
wafwerar's avatar
wafwerar 已提交
158 159
  SSubplan *scanPlan = (SSubplan *)taosMemoryCalloc(scanPlanNum, sizeof(SSubplan));
  SSubplan *mergePlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));
D
dapan 已提交
160 161 162 163 164 165 166 167

  merge->pNodeList = nodesMakeList();
  scan->pNodeList = nodesMakeList();

  mergePlan->pChildren = nodesMakeList();

  for (int32_t i = 0; i < scanPlanNum; ++i) {
    scanPlan[i].id.queryId = qId;
X
Xiaoyu Wang 已提交
168
    scanPlan[i].id.groupId = 0x0000000000000002;
D
dapan 已提交
169 170 171 172
    scanPlan[i].id.subplanId = 0x0000000000000003 + i;
    scanPlan[i].subplanType = SUBPLAN_TYPE_SCAN;

    scanPlan[i].execNode.nodeId = 1 + i;
X
Xiaoyu Wang 已提交
173 174 175 176 177 178
    scanPlan[i].execNode.epSet.inUse = 0;
    scanPlan[i].execNodeStat.tableNum = taosRand() % 30;
    addEpIntoEpSet(&scanPlan[i].execNode.epSet, "ep0", 6030);
    addEpIntoEpSet(&scanPlan[i].execNode.epSet, "ep1", 6030);
    addEpIntoEpSet(&scanPlan[i].execNode.epSet, "ep2", 6030);
    scanPlan[i].execNode.epSet.inUse = taosRand() % 3;
D
dapan 已提交
179 180 181 182

    scanPlan[i].pChildren = NULL;
    scanPlan[i].level = 1;
    scanPlan[i].pParents = nodesMakeList();
wafwerar's avatar
wafwerar 已提交
183
    scanPlan[i].pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
D
dapan1121 已提交
184
    scanPlan[i].msgType = TDMT_SCH_QUERY;
D
dapan 已提交
185 186 187 188 189 190 191 192

    nodesListAppend(scanPlan[i].pParents, (SNode*)mergePlan);
    nodesListAppend(mergePlan->pChildren, (SNode*)(scanPlan + i));

    nodesListAppend(scan->pNodeList, (SNode*)(scanPlan + i));
  }

  mergePlan->id.queryId = qId;
X
Xiaoyu Wang 已提交
193
  mergePlan->id.groupId = schtMergeTemplateId;
D
dapan 已提交
194 195 196
  mergePlan->id.subplanId = 0x5555;
  mergePlan->subplanType = SUBPLAN_TYPE_MERGE;
  mergePlan->level = 0;
X
Xiaoyu Wang 已提交
197
  mergePlan->execNode.epSet.numOfEps = 0;
D
dapan 已提交
198 199

  mergePlan->pParents = NULL;
wafwerar's avatar
wafwerar 已提交
200
  mergePlan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
D
dapan1121 已提交
201
  mergePlan->msgType = TDMT_SCH_QUERY;
D
dapan 已提交
202 203 204 205 206 207 208 209

  nodesListAppend(merge->pNodeList, (SNode*)mergePlan);

  nodesListAppend(dag->pSubplans, (SNode*)merge);  
  nodesListAppend(dag->pSubplans, (SNode*)scan);
}


X
Xiaoyu Wang 已提交
210
void schtFreeQueryDag(SQueryPlan *dag) {
D
dapan1121 已提交
211 212 213 214

}


X
Xiaoyu Wang 已提交
215
void schtBuildInsertDag(SQueryPlan *dag) {
D
dapan 已提交
216 217 218 219
  uint64_t qId = 0x0000000000000002;
  
  dag->queryId = qId;
  dag->numOfSubplans = 2;
X
Xiaoyu Wang 已提交
220 221
  dag->pSubplans = nodesMakeList();
  SNodeListNode *inserta = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
D
dapan 已提交
222
  
wafwerar's avatar
wafwerar 已提交
223
  SSubplan *insertPlan = (SSubplan *)taosMemoryCalloc(2, sizeof(SSubplan));
D
dapan 已提交
224 225

  insertPlan[0].id.queryId = qId;
X
Xiaoyu Wang 已提交
226
  insertPlan[0].id.groupId = 0x0000000000000003;
D
dapan 已提交
227
  insertPlan[0].id.subplanId = 0x0000000000000004;
X
Xiaoyu Wang 已提交
228
  insertPlan[0].subplanType = SUBPLAN_TYPE_MODIFY;
D
dapan 已提交
229
  insertPlan[0].level = 0;
H
Haojun Liao 已提交
230

D
dapan1121 已提交
231
  insertPlan[0].execNode.nodeId = 1;
L
Liu Jicong 已提交
232 233
  insertPlan[0].execNode.epSet.inUse = 0;
  addEpIntoEpSet(&insertPlan[0].execNode.epSet, "ep0", 6030);
H
Haojun Liao 已提交
234

H
Haojun Liao 已提交
235
  insertPlan[0].pChildren = NULL;
D
dapan 已提交
236 237
  insertPlan[0].pParents = NULL;
  insertPlan[0].pNode = NULL;
wafwerar's avatar
wafwerar 已提交
238
  insertPlan[0].pDataSink = (SDataSinkNode*)taosMemoryCalloc(1, sizeof(SDataSinkNode));
D
dapan1121 已提交
239
  insertPlan[0].msgType = TDMT_VND_SUBMIT;
D
dapan 已提交
240 241

  insertPlan[1].id.queryId = qId;
X
Xiaoyu Wang 已提交
242
  insertPlan[1].id.groupId = 0x0000000000000003;
D
dapan 已提交
243
  insertPlan[1].id.subplanId = 0x0000000000000005;
X
Xiaoyu Wang 已提交
244
  insertPlan[1].subplanType = SUBPLAN_TYPE_MODIFY;
D
dapan 已提交
245
  insertPlan[1].level = 0;
H
Haojun Liao 已提交
246

D
dapan1121 已提交
247
  insertPlan[1].execNode.nodeId = 1;
L
Liu Jicong 已提交
248 249
  insertPlan[1].execNode.epSet.inUse = 0;
  addEpIntoEpSet(&insertPlan[1].execNode.epSet, "ep0", 6030);
H
Haojun Liao 已提交
250

H
Haojun Liao 已提交
251
  insertPlan[1].pChildren = NULL;
D
dapan 已提交
252 253
  insertPlan[1].pParents = NULL;
  insertPlan[1].pNode = NULL;
wafwerar's avatar
wafwerar 已提交
254
  insertPlan[1].pDataSink = (SDataSinkNode*)taosMemoryCalloc(1, sizeof(SDataSinkNode));
D
dapan1121 已提交
255
  insertPlan[1].msgType = TDMT_VND_SUBMIT;
D
dapan 已提交
256

D
dapan 已提交
257 258
  inserta->pNodeList = nodesMakeList();

X
Xiaoyu Wang 已提交
259
  nodesListAppend(inserta->pNodeList, (SNode*)insertPlan);
D
dapan1121 已提交
260
  insertPlan += 1;
X
Xiaoyu Wang 已提交
261
  nodesListAppend(inserta->pNodeList, (SNode*)insertPlan);
D
dapan 已提交
262

X
Xiaoyu Wang 已提交
263
  nodesListAppend(dag->pSubplans, (SNode*)inserta);  
D
dapan 已提交
264 265 266
}


D
dapan 已提交
267
int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
wafwerar's avatar
wafwerar 已提交
268
  *str = (char *)taosMemoryCalloc(1, 20);
D
dapan 已提交
269 270 271 272
  *len = 20;
  return 0;
}

X
Xiaoyu Wang 已提交
273
void schtExecNode(SSubplan* subplan, uint64_t groupId, SQueryNodeAddr* ep) {
H
Haojun Liao 已提交
274

D
dapan 已提交
275 276
}

D
dapan1121 已提交
277 278 279 280
void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {

}

D
dapan 已提交
281 282 283 284
void schtSetPlanToString() {
  static Stub stub;
  stub.set(qSubPlanToString, schtPlanToString);
  {
wafwerar's avatar
wafwerar 已提交
285 286 287 288 289 290
#ifdef WINDOWS
    AddrAny any;
    std::map<std::string,void*> result;
    any.get_func_addr("qSubPlanToString", result);
#endif
#ifdef LINUX
D
dapan 已提交
291 292 293
    AddrAny any("libplanner.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qSubPlanToString$", result);
wafwerar's avatar
wafwerar 已提交
294
#endif
D
dapan 已提交
295 296 297 298 299 300 301 302 303 304
    for (const auto& f : result) {
      stub.set(f.second, schtPlanToString);
    }
  }
}

void schtSetExecNode() {
  static Stub stub;
  stub.set(qSetSubplanExecutionNode, schtExecNode);
  {
wafwerar's avatar
wafwerar 已提交
305 306 307 308 309 310
#ifdef WINDOWS
    AddrAny any;
    std::map<std::string,void*> result;
    any.get_func_addr("qSetSubplanExecutionNode", result);
#endif
#ifdef LINUX
D
dapan 已提交
311 312 313
    AddrAny any("libplanner.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qSetSubplanExecutionNode$", result);
wafwerar's avatar
wafwerar 已提交
314
#endif
D
dapan 已提交
315 316 317 318 319 320
    for (const auto& f : result) {
      stub.set(f.second, schtExecNode);
    }
  }
}

D
dapan1121 已提交
321 322 323 324
void schtSetRpcSendRequest() {
  static Stub stub;
  stub.set(rpcSendRequest, schtRpcSendRequest);
  {
wafwerar's avatar
wafwerar 已提交
325 326 327 328 329 330
#ifdef WINDOWS
    AddrAny any;
    std::map<std::string,void*> result;
    any.get_func_addr("rpcSendRequest", result);
#endif
#ifdef LINUX
D
dapan1121 已提交
331 332 333
    AddrAny any("libtransport.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^rpcSendRequest$", result);
wafwerar's avatar
wafwerar 已提交
334
#endif
D
dapan1121 已提交
335 336 337 338 339 340
    for (const auto& f : result) {
      stub.set(f.second, schtRpcSendRequest);
    }
  }
}

D
dapan1121 已提交
341 342
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
  if (pInfo) {
wafwerar's avatar
wafwerar 已提交
343 344 345
    taosMemoryFreeClear(pInfo->param);
    taosMemoryFreeClear(pInfo->msgInfo.pData);
    taosMemoryFree(pInfo);
D
dapan1121 已提交
346
  }
D
dapan1121 已提交
347 348 349 350 351 352 353 354
  return 0;
}


void schtSetAsyncSendMsgToServer() {
  static Stub stub;
  stub.set(asyncSendMsgToServer, schtAsyncSendMsgToServer);
  {
wafwerar's avatar
wafwerar 已提交
355 356 357 358 359 360
#ifdef WINDOWS
    AddrAny any;
    std::map<std::string,void*> result;
    any.get_func_addr("asyncSendMsgToServer", result);
#endif
#ifdef LINUX
D
dapan1121 已提交
361 362 363
    AddrAny any("libtransport.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^asyncSendMsgToServer$", result);
wafwerar's avatar
wafwerar 已提交
364
#endif
D
dapan1121 已提交
365 366 367 368 369 370
    for (const auto& f : result) {
      stub.set(f.second, schtAsyncSendMsgToServer);
    }
  }
}

D
dapan1121 已提交
371

D
dapan 已提交
372
void *schtSendRsp(void *param) {
D
dapan1121 已提交
373 374
  SSchJob *pJob = NULL;
  int64_t job = 0;
D
dapan 已提交
375 376 377
  int32_t code = 0;

  while (true) {
D
dapan1121 已提交
378
    job = *(int64_t *)param;
D
dapan 已提交
379 380 381 382
    if (job) {
      break;
    }

wafwerar's avatar
wafwerar 已提交
383
    taosMsleep(1);
D
dapan 已提交
384
  }
D
dapan1121 已提交
385 386

  pJob = schAcquireJob(job);
D
dapan 已提交
387
  
D
dapan1121 已提交
388
  void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan 已提交
389 390 391
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

S
Shengliang Guan 已提交
392
    SSubmitRsp rsp = {0};
D
dapan 已提交
393
    rsp.affectedRows = 10;
D
dapan1121 已提交
394
    schHandleResponseMsg(pJob, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
395
    
D
dapan1121 已提交
396
    pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan 已提交
397 398
  }    

D
dapan1121 已提交
399 400
  schReleaseJob(job);

D
dapan 已提交
401 402 403
  return NULL;
}

D
dapan1121 已提交
404
void *schtCreateFetchRspThread(void *param) {
D
dapan1121 已提交
405 406
  int64_t job = *(int64_t *)param;
  SSchJob* pJob = schAcquireJob(job);
D
dapan1121 已提交
407

wafwerar's avatar
wafwerar 已提交
408
  taosSsleep(1);
D
dapan1121 已提交
409 410

  int32_t code = 0;
wafwerar's avatar
wafwerar 已提交
411
  SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
D
dapan1121 已提交
412 413
  rsp->completed = 1;
  rsp->numOfRows = 10;
D
dapan1121 已提交
414
 
D
dapan1121 已提交
415
  code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_SCH_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0);
D
dapan1121 已提交
416 417 418

  schReleaseJob(job);
  
D
dapan1121 已提交
419
  assert(code == 0);
wafwerar's avatar
wafwerar 已提交
420
  return NULL;
D
dapan1121 已提交
421 422 423
}


D
dapan1121 已提交
424 425
void *schtFetchRspThread(void *aa) {
  SDataBuf dataBuf = {0};
D
dapan1121 已提交
426
  SSchTaskCallbackParam* param = NULL;
D
dapan1121 已提交
427 428 429 430 431 432

  while (!schtTestStop) {
    if (0 == atomic_val_compare_exchange_32(&schtStartFetch, 1, 0)) {
      continue;
    }

wafwerar's avatar
wafwerar 已提交
433
    taosUsleep(1);
D
dapan1121 已提交
434
    
435
    param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
D
dapan1121 已提交
436 437 438 439 440

    param->queryId = schtQueryId;  
    param->taskId = schtFetchTaskId;

    int32_t code = 0;
wafwerar's avatar
wafwerar 已提交
441
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
D
dapan1121 已提交
442 443 444 445 446 447
    rsp->completed = 1;
    rsp->numOfRows = 10;

    dataBuf.pData = rsp;
    dataBuf.len = sizeof(*rsp);

D
dapan1121 已提交
448
    code = schHandleCallback(param, &dataBuf, TDMT_SCH_FETCH_RSP, 0);
D
dapan1121 已提交
449 450 451
      
    assert(code == 0 || code);
  }
wafwerar's avatar
wafwerar 已提交
452
  return NULL;
D
dapan1121 已提交
453 454 455 456
}

void schtFreeQueryJob(int32_t freeThread) {
  static uint32_t freeNum = 0;
D
dapan1121 已提交
457
  int64_t job = queryJobRefId;
D
dapan1121 已提交
458
  
D
dapan1121 已提交
459
  if (job && atomic_val_compare_exchange_64(&queryJobRefId, job, 0)) {
D
dapan1121 已提交
460
    schedulerFreeJob(&job, 0);
D
dapan1121 已提交
461 462 463 464 465 466 467 468 469 470 471 472 473 474
    if (freeThread) {
      if (++freeNum % schtTestPrintNum == 0) {
        printf("FreeNum:%d\n", freeNum);
      }
    }
  }
}

void* schtRunJobThread(void *aa) {
  void *mockPointer = (void *)0x1;
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
  SVgroupInfo vgInfo = {0};
X
Xiaoyu Wang 已提交
475
  SQueryPlan dag;
D
dapan1121 已提交
476 477 478 479 480 481 482 483 484 485 486 487

  schtInitLogFile();

  
  int32_t code = schedulerInit(NULL);
  assert(code == 0);


  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();

D
dapan1121 已提交
488
  SSchJob *pJob = NULL;
D
dapan1121 已提交
489
  SSchTaskCallbackParam *param = NULL;
D
dapan1121 已提交
490 491 492
  SHashObj *execTasks = NULL;
  SDataBuf dataBuf = {0};
  uint32_t jobFinished = 0;
D
dapan1121 已提交
493
  int32_t queryDone = 0;
D
dapan1121 已提交
494 495 496 497

  while (!schtTestStop) {
    schtBuildQueryDag(&dag);

H
Haojun Liao 已提交
498
    SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
D
dapan1121 已提交
499

H
Haojun Liao 已提交
500
    SEp qnodeAddr = {0};
D
dapan1121 已提交
501 502 503 504
    strcpy(qnodeAddr.fqdn, "qnode0.ep");
    qnodeAddr.port = 6031;
    taosArrayPush(qnodeList, &qnodeAddr);

D
dapan1121 已提交
505
    queryDone = 0;
D
dapan1121 已提交
506 507 508 509
    
    SRequestConnInfo conn = {0};
    conn.pTrans = mockPointer;
    SSchedulerReq req = {0};    
D
dapan1121 已提交
510
    req.syncReq = false;
D
dapan1121 已提交
511 512 513 514
    req.pConn = &conn;
    req.pNodeList = qnodeList;
    req.pDag = &dag;
    req.sql = "select * from tb";
D
dapan1121 已提交
515 516
    req.execFp = schtQueryCb;
    req.execParam = &queryDone;
D
dapan1121 已提交
517
    
D
dapan1121 已提交
518
    code = schedulerExecJob(&req, &queryJobRefId);      
D
dapan1121 已提交
519 520
    assert(code == 0);

D
dapan1121 已提交
521 522 523 524 525 526 527
    pJob = schAcquireJob(queryJobRefId);
    if (NULL == pJob) {
      taosArrayDestroy(qnodeList);
      schtFreeQueryDag(&dag);
      continue;
    }
    
D
dapan1121 已提交
528
    execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
529
    void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan1121 已提交
530 531 532 533 534
    while (pIter) {
      SSchTask *task = *(SSchTask **)pIter;
      schtFetchTaskId = task->taskId - 1;
      
      taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task));
D
dapan1121 已提交
535
      pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan1121 已提交
536 537
    }    

538
    param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
D
dapan1121 已提交
539 540
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
D
dapan1121 已提交
541 542 543 544 545 546 547 548 549 550

    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId;
      SQueryTableRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
D
dapan1121 已提交
551
      code = schHandleCallback(param, &dataBuf, TDMT_SCH_QUERY_RSP, 0);
D
dapan1121 已提交
552 553 554 555 556
      assert(code == 0 || code);

      pIter = taosHashIterate(execTasks, pIter);
    }    

557
    param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
D
dapan1121 已提交
558 559
    param->refId = queryJobRefId;
    param->queryId = pJob->queryId;   
D
dapan1121 已提交
560 561 562 563 564 565 566 567 568 569

    pIter = taosHashIterate(execTasks, NULL);
    while (pIter) {
      SSchTask *task = (SSchTask *)pIter;

      param->taskId = task->taskId - 1;
      SQueryTableRsp rsp = {0};
      dataBuf.pData = &rsp;
      dataBuf.len = sizeof(rsp);
      
D
dapan1121 已提交
570
      code = schHandleCallback(param, &dataBuf, TDMT_SCH_QUERY_RSP, 0);
D
dapan1121 已提交
571 572 573 574 575 576
      assert(code == 0 || code);
      
      pIter = taosHashIterate(execTasks, pIter);
    }    


D
dapan1121 已提交
577 578 579 580 581 582 583 584
    while (true) {
      if (queryDone) {
        break;
      }
    
      taosUsleep(10000);
    }

D
dapan1121 已提交
585 586 587
    atomic_store_32(&schtStartFetch, 1);

    void *data = NULL;  
D
dapan1121 已提交
588
    code = schedulerFetchRows(queryJobRefId, &data);
D
dapan1121 已提交
589 590 591 592 593 594 595 596 597
    assert(code == 0 || code);

    if (0 == code) {
      SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
      assert(pRsp->completed == 1);
      assert(pRsp->numOfRows == 10);
    }

    data = NULL;
D
dapan1121 已提交
598
    code = schedulerFetchRows(queryJobRefId, &data);
D
dapan1121 已提交
599 600 601 602 603
    assert(code == 0 || code);
    
    schtFreeQueryJob(0);

    taosHashCleanup(execTasks);
D
dapan1121 已提交
604
    taosArrayDestroy(qnodeList);
D
dapan1121 已提交
605 606 607 608 609 610 611 612 613 614 615 616

    schtFreeQueryDag(&dag);

    if (++jobFinished % schtTestPrintNum == 0) {
      printf("jobFinished:%d\n", jobFinished);
    }

    ++schtQueryId;
  }

  schedulerDestroy();

wafwerar's avatar
wafwerar 已提交
617
  return NULL;
D
dapan1121 已提交
618 619 620 621
}

void* schtFreeJobThread(void *aa) {
  while (!schtTestStop) {
wafwerar's avatar
wafwerar 已提交
622
    taosUsleep(taosRand() % 100);
D
dapan1121 已提交
623 624
    schtFreeQueryJob(1);
  }
wafwerar's avatar
wafwerar 已提交
625
  return NULL;
D
dapan1121 已提交
626
}
D
dapan1121 已提交
627 628


D
dapan1121 已提交
629 630
}

D
dapan 已提交
631
TEST(queryTest, normalCase) {
D
dapan1121 已提交
632
  void *mockPointer = (void *)0x1;
D
dapan1121 已提交
633 634 635
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
D
dapan1121 已提交
636
  SVgroupInfo vgInfo = {0};
D
dapan1121 已提交
637
  int64_t job = 0;
H
Haojun Liao 已提交
638
  SQueryPlan dag;
D
dapan1121 已提交
639

640
  memset(&dag, 0, sizeof(dag));
D
dapan1121 已提交
641

H
Haojun Liao 已提交
642
  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
D
dapan 已提交
643

H
Haojun Liao 已提交
644
  SEp qnodeAddr = {0};
D
dapan 已提交
645 646 647
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);
D
dapan1121 已提交
648 649
  
  int32_t code = schedulerInit(NULL);
D
dapan1121 已提交
650
  ASSERT_EQ(code, 0);
D
dapan1121 已提交
651

D
dapan 已提交
652
  schtBuildQueryDag(&dag);
D
dapan 已提交
653 654 655

  schtSetPlanToString();
  schtSetExecNode();
D
dapan1121 已提交
656
  schtSetAsyncSendMsgToServer();
D
dapan1121 已提交
657 658

  int32_t queryDone = 0;
D
dapan1121 已提交
659 660 661

  SRequestConnInfo conn = {0};
  conn.pTrans = mockPointer;
D
dapan1121 已提交
662
  SSchedulerReq req = {0};   
D
dapan1121 已提交
663 664 665 666
  req.pConn = &conn;
  req.pNodeList = qnodeList;
  req.pDag = &dag;
  req.sql = "select * from tb";
D
dapan1121 已提交
667 668
  req.execFp = schtQueryCb;
  req.execParam = &queryDone;
D
dapan1121 已提交
669
    
D
dapan1121 已提交
670
  code = schedulerExecJob(&req, &job);  
D
dapan1121 已提交
671
  ASSERT_EQ(code, 0);
D
dapan 已提交
672

D
dapan1121 已提交
673 674 675 676
  
  SSchJob *pJob = schAcquireJob(job);
  
  void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan 已提交
677
  while (pIter) {
D
dapan 已提交
678
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
679 680

    SQueryTableRsp rsp = {0};
D
dapan1121 已提交
681
    code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
682 683
    
    ASSERT_EQ(code, 0);
D
dapan1121 已提交
684
    pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan 已提交
685 686
  }    

D
dapan1121 已提交
687
  pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan 已提交
688
  while (pIter) {
D
dapan 已提交
689
    SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
690 691

    SQueryTableRsp rsp = {0};
D
dapan1121 已提交
692
    code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
693 694
    
    ASSERT_EQ(code, 0);
D
dapan1121 已提交
695
    pIter = taosHashIterate(pJob->execTasks, pIter);
D
dapan 已提交
696 697
  }    

D
dapan1121 已提交
698 699 700 701 702 703 704 705
  while (true) {
    if (queryDone) {
      break;
    }

    taosUsleep(10000);
  }
  
wafwerar's avatar
wafwerar 已提交
706 707
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan 已提交
708

wafwerar's avatar
wafwerar 已提交
709 710
  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);
D
dapan 已提交
711

D
dapan1121 已提交
712
  void *data = NULL;  
D
dapan1121 已提交
713
  code = schedulerFetchRows(job, &data);
D
dapan 已提交
714 715 716 717 718
  ASSERT_EQ(code, 0);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);
wafwerar's avatar
wafwerar 已提交
719
  taosMemoryFreeClear(data);
D
dapan 已提交
720 721

  data = NULL;
D
dapan1121 已提交
722
  code = schedulerFetchRows(job, &data);
D
dapan 已提交
723
  ASSERT_EQ(code, 0);
D
dapan1121 已提交
724 725 726
  ASSERT_TRUE(data == NULL);

  schReleaseJob(job);
D
dapan 已提交
727

D
dapan1121 已提交
728
  schedulerFreeJob(&job, 0);
D
dapan1121 已提交
729 730

  schtFreeQueryDag(&dag);
D
dapan1121 已提交
731 732

  schedulerDestroy();
D
dapan1121 已提交
733
}
D
dapan1121 已提交
734

D
dapan 已提交
735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760
TEST(queryTest, readyFirstCase) {
  void *mockPointer = (void *)0x1;
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
  SVgroupInfo vgInfo = {0};
  int64_t job = 0;
  SQueryPlan dag;

  memset(&dag, 0, sizeof(dag));

  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);
  
  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildQueryDag(&dag);

  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();
D
dapan1121 已提交
761 762

  int32_t queryDone = 0;  
D
dapan1121 已提交
763 764 765 766 767 768 769 770

  SRequestConnInfo conn = {0};
  conn.pTrans = mockPointer;
  SSchedulerReq req = {0};    
  req.pConn = &conn;
  req.pNodeList = qnodeList;
  req.pDag = &dag;
  req.sql = "select * from tb";
D
dapan1121 已提交
771 772
  req.execFp = schtQueryCb;
  req.execParam = &queryDone;
D
dapan1121 已提交
773
  code = schedulerExecJob(&req, &job);
D
dapan 已提交
774 775 776 777 778
  ASSERT_EQ(code, 0);

  
  SSchJob *pJob = schAcquireJob(job);
  
D
dapan1121 已提交
779
  void *pIter = taosHashIterate(pJob->execTasks, NULL);
D
dapan 已提交
780 781 782 783
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

    SQueryTableRsp rsp = {0};
D
dapan1121 已提交
784
    code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
785 786 787 788 789 790 791 792 793 794
    
    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(pJob->execTasks, pIter);
  }    

  pIter = taosHashIterate(pJob->execTasks, NULL);
  while (pIter) {
    SSchTask *task = *(SSchTask **)pIter;

    SQueryTableRsp rsp = {0};
D
dapan1121 已提交
795
    code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
796 797 798 799 800
    
    ASSERT_EQ(code, 0);
    pIter = taosHashIterate(pJob->execTasks, pIter);
  }    

D
dapan1121 已提交
801 802 803 804 805 806 807
  while (true) {
    if (queryDone) {
      break;
    }

    taosUsleep(10000);
  }
D
dapan 已提交
808 809


D
dapan1121 已提交
810 811
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan 已提交
812

D
dapan1121 已提交
813 814
  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);
D
dapan 已提交
815 816 817 818 819 820 821 822

  void *data = NULL;  
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);
D
dapan1121 已提交
823
  taosMemoryFreeClear(data);
D
dapan 已提交
824 825 826 827 828 829 830 831

  data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data == NULL);

  schReleaseJob(job);

D
dapan1121 已提交
832
  schedulerFreeJob(&job, 0);
D
dapan 已提交
833 834 835 836 837 838 839 840

  schtFreeQueryDag(&dag);

  schedulerDestroy();
}



D
dapan 已提交
841 842 843 844 845 846 847 848 849 850 851
TEST(queryTest, flowCtrlCase) {
  void *mockPointer = (void *)0x1;
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
  SVgroupInfo vgInfo = {0};
  int64_t job = 0;
  SQueryPlan dag;

  schtInitLogFile();

wafwerar's avatar
wafwerar 已提交
852
  taosSeedRand(taosGetTimestampSec());
D
dapan 已提交
853
  
D
dapan 已提交
854 855 856 857 858 859 860 861 862 863
  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));

  SEp qnodeAddr = {0};
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);
  
  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

D
dapan 已提交
864
  schtBuildQueryFlowCtrlDag(&dag);
D
dapan 已提交
865 866 867 868

  schtSetPlanToString();
  schtSetExecNode();
  schtSetAsyncSendMsgToServer();
D
dapan1121 已提交
869 870

  int32_t queryDone = 0;  
D
dapan1121 已提交
871 872 873 874 875 876 877
  SRequestConnInfo conn = {0};
  conn.pTrans = mockPointer;
  SSchedulerReq req = {0};    
  req.pConn = &conn;
  req.pNodeList = qnodeList;
  req.pDag = &dag;
  req.sql = "select * from tb";
D
dapan1121 已提交
878 879
  req.execFp = schtQueryCb;
  req.execParam = &queryDone;
D
dapan1121 已提交
880

D
dapan1121 已提交
881
  code = schedulerExecJob(&req, &job);
D
dapan 已提交
882 883 884 885 886
  ASSERT_EQ(code, 0);

  
  SSchJob *pJob = schAcquireJob(job);

D
dapan1121 已提交
887
  bool qDone = false;
D
dapan 已提交
888
  
D
dapan1121 已提交
889
  while (!qDone) {
D
dapan 已提交
890 891 892 893
    void *pIter = taosHashIterate(pJob->execTasks, NULL);
    if (NULL == pIter) {
      break;
    }
D
dapan 已提交
894
    
D
dapan 已提交
895 896
    while (pIter) {
      SSchTask *task = *(SSchTask **)pIter;
D
dapan 已提交
897

D
dapan 已提交
898 899
      taosHashCancelIterate(pJob->execTasks, pIter);

D
dapan1121 已提交
900
      if (task->lastMsgType == TDMT_SCH_QUERY) {
D
dapan 已提交
901
        SQueryTableRsp rsp = {0};
D
dapan1121 已提交
902
        code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
D
dapan 已提交
903 904 905
        
        ASSERT_EQ(code, 0);
      } else {
D
dapan1121 已提交
906
        qDone = true;
D
dapan 已提交
907 908 909 910 911 912
        break;
      }
      
      pIter = NULL;
    }    
  }
D
dapan 已提交
913

D
dapan1121 已提交
914 915 916 917 918 919 920
  while (true) {
    if (queryDone) {
      break;
    }

    taosUsleep(10000);
  }
D
dapan 已提交
921

wafwerar's avatar
wafwerar 已提交
922 923
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan 已提交
924

wafwerar's avatar
wafwerar 已提交
925 926
  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);
D
dapan 已提交
927 928 929 930 931 932 933 934

  void *data = NULL;  
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);

  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
  ASSERT_EQ(pRsp->completed, 1);
  ASSERT_EQ(pRsp->numOfRows, 10);
wafwerar's avatar
wafwerar 已提交
935
  taosMemoryFreeClear(data);
D
dapan 已提交
936 937 938 939 940 941 942 943

  data = NULL;
  code = schedulerFetchRows(job, &data);
  ASSERT_EQ(code, 0);
  ASSERT_TRUE(data == NULL);

  schReleaseJob(job);

D
dapan1121 已提交
944
  schedulerFreeJob(&job, 0);
D
dapan 已提交
945 946 947 948 949

  schtFreeQueryDag(&dag);

  schedulerDestroy();
}
D
dapan1121 已提交
950

D
dapan 已提交
951 952 953 954 955 956 957

TEST(insertTest, normalCase) {
  void *mockPointer = (void *)0x1;
  char *clusterId = "cluster1";
  char *dbname = "1.db1";
  char *tablename = "table1";
  SVgroupInfo vgInfo = {0};
X
Xiaoyu Wang 已提交
958
  SQueryPlan dag;
D
dapan 已提交
959
  uint64_t numOfRows = 0;
D
dapan1121 已提交
960

H
Haojun Liao 已提交
961
  SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
D
dapan 已提交
962

H
Haojun Liao 已提交
963
  SEp qnodeAddr = {0};
D
dapan 已提交
964 965 966 967 968 969 970 971 972 973
  strcpy(qnodeAddr.fqdn, "qnode0.ep");
  qnodeAddr.port = 6031;
  taosArrayPush(qnodeList, &qnodeAddr);
  
  int32_t code = schedulerInit(NULL);
  ASSERT_EQ(code, 0);

  schtBuildInsertDag(&dag);

  schtSetPlanToString();
D
dapan1121 已提交
974
  schtSetAsyncSendMsgToServer();
D
dapan 已提交
975

wafwerar's avatar
wafwerar 已提交
976 977
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan 已提交
978

wafwerar's avatar
wafwerar 已提交
979 980
  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
D
dapan1121 已提交
981 982

  SQueryResult res = {0};
D
dapan1121 已提交
983 984 985 986 987 988 989 990

  SRequestConnInfo conn = {0};
  conn.pTrans = mockPointer;
  SSchedulerReq req = {0};    
  req.pConn = &conn;
  req.pNodeList = qnodeList;
  req.pDag = &dag;
  req.sql = "insert into tb values(now,1)";
D
dapan1121 已提交
991 992
  req.execFp = schtQueryCb;
  req.execParam = NULL;
D
dapan1121 已提交
993 994
  
  code = schedulerExecJob(&req, &insertJobRefId, &res);
D
dapan 已提交
995
  ASSERT_EQ(code, 0);
D
dapan1121 已提交
996
  ASSERT_EQ(res.numOfRows, 20);
D
dapan 已提交
997

D
dapan1121 已提交
998
  schedulerFreeJob(&insertJobRefId, 0);
D
dapan1121 已提交
999 1000

  schedulerDestroy();  
D
dapan 已提交
1001 1002
}

D
dapan1121 已提交
1003
TEST(multiThread, forceFree) {
wafwerar's avatar
wafwerar 已提交
1004 1005
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan 已提交
1006

wafwerar's avatar
wafwerar 已提交
1007 1008 1009 1010
  TdThread thread1, thread2, thread3;
  taosThreadCreate(&(thread1), &thattr, schtRunJobThread, NULL);
  taosThreadCreate(&(thread2), &thattr, schtFreeJobThread, NULL);
  taosThreadCreate(&(thread3), &thattr, schtFetchRspThread, NULL);
D
dapan1121 已提交
1011

D
dapan1121 已提交
1012 1013
  while (true) {
    if (schtTestDeadLoop) {
wafwerar's avatar
wafwerar 已提交
1014
      taosSsleep(1);
D
dapan1121 已提交
1015
    } else {
wafwerar's avatar
wafwerar 已提交
1016
      taosSsleep(schtTestMTRunSec);
D
dapan1121 已提交
1017 1018 1019 1020 1021
      break;
    }
  }
  
  schtTestStop = true;
wafwerar's avatar
wafwerar 已提交
1022
  taosSsleep(3);
D
dapan1121 已提交
1023
}
D
dapan 已提交
1024

D
dapan1121 已提交
1025
int main(int argc, char** argv) {
wafwerar's avatar
wafwerar 已提交
1026
  taosSeedRand(taosGetTimestampSec());
D
dapan1121 已提交
1027 1028 1029 1030
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

D
dapan1121 已提交
1031
#pragma GCC diagnostic pop