qworkerTests.cpp 33.0 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <gtest/gtest.h>
#include <iostream>

S
Shengliang Guan 已提交
19 20
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
D
dapan1121 已提交
21 22 23
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
S
Shengliang Guan 已提交
24 25 26 27
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wformat"
#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
#pragma GCC diagnostic ignored "-Wpointer-arith"
28
#include <addr_any.h>
S
Shengliang Guan 已提交
29

wafwerar's avatar
wafwerar 已提交
30 31 32
#ifdef WINDOWS
#define TD_USE_WINSOCK
#endif
D
dapan1121 已提交
33 34
#include "os.h"

H
Hongze Cheng 已提交
35 36
#include "dataSinkMgt.h"
#include "executor.h"
D
dapan 已提交
37 38
#include "planner.h"
#include "qworker.h"
D
dapan1121 已提交
39
#include "stub.h"
H
Hongze Cheng 已提交
40 41 42 43 44 45
#include "taos.h"
#include "tdatablock.h"
#include "tdef.h"
#include "tglobal.h"
#include "trpc.h"
#include "tvariant.h"
D
dapan1121 已提交
46 47 48

namespace {

D
dapan1121 已提交
49 50 51
#define qwtTestQueryQueueSize 1000000
#define qwtTestFetchQueueSize 1000000

D
dapan1121 已提交
52 53
bool qwtEnableLog = true;

D
dapan1121 已提交
54 55
int32_t qwtTestMaxExecTaskUsec = 2;
int32_t qwtTestReqMaxDelayUsec = 2;
D
dapan1121 已提交
56

H
Hongze Cheng 已提交
57 58 59 60 61 62
int64_t  qwtTestQueryId = 0;
bool     qwtTestEnableSleep = true;
bool     qwtTestStop = false;
bool     qwtTestDeadLoop = false;
int32_t  qwtTestMTRunSec = 2;
int32_t  qwtTestPrintNum = 10000;
D
dapan1121 已提交
63 64
uint64_t qwtTestCaseIdx = 0;
uint64_t qwtTestCaseNum = 4;
H
Hongze Cheng 已提交
65 66 67 68 69 70 71 72 73
bool     qwtTestCaseFinished = false;
tsem_t   qwtTestQuerySem;
tsem_t   qwtTestFetchSem;
int32_t  qwtTestQuitThreadNum = 0;

int32_t         qwtTestQueryQueueRIdx = 0;
int32_t         qwtTestQueryQueueWIdx = 0;
int32_t         qwtTestQueryQueueNum = 0;
SRWLatch        qwtTestQueryQueueLock = 0;
D
dapan1121 已提交
74 75
struct SRpcMsg *qwtTestQueryQueue[qwtTestQueryQueueSize] = {0};

H
Hongze Cheng 已提交
76 77 78 79
int32_t         qwtTestFetchQueueRIdx = 0;
int32_t         qwtTestFetchQueueWIdx = 0;
int32_t         qwtTestFetchQueueNum = 0;
SRWLatch        qwtTestFetchQueueLock = 0;
D
dapan1121 已提交
80 81
struct SRpcMsg *qwtTestFetchQueue[qwtTestFetchQueueSize] = {0};

H
Hongze Cheng 已提交
82 83 84
int32_t  qwtTestSinkBlockNum = 0;
int32_t  qwtTestSinkMaxBlockNum = 0;
bool     qwtTestSinkQueryEnd = false;
D
dapan1121 已提交
85
SRWLatch qwtTestSinkLock = 0;
H
Hongze Cheng 已提交
86 87 88 89 90 91 92 93 94
int32_t  qwtTestSinkLastLen = 0;

SSubQueryMsg       qwtqueryMsg = {0};
SRpcMsg            qwtfetchRpc = {0};
SResFetchReq       qwtfetchMsg = {0};
SRpcMsg            qwtreadyRpc = {0};
SResReadyReq       qwtreadyMsg = {0};
SRpcMsg            qwtdropRpc = {0};
STaskDropReq       qwtdropMsg = {0};
D
dapan1121 已提交
95
SSchTasksStatusReq qwtstatusMsg = {0};
D
dapan1121 已提交
96

D
dapan1121 已提交
97
void qwtInitLogFile() {
D
dapan1121 已提交
98 99 100
  if (!qwtEnableLog) {
    return;
  }
H
Hongze Cheng 已提交
101 102
  const char   *defaultLogFileNamePrefix = "taosdlog";
  const int32_t maxLogFileNum = 10;
D
dapan1121 已提交
103 104 105

  tsAsyncLog = 0;
  qDebugFlag = 159;
wafwerar's avatar
wafwerar 已提交
106
  strcpy(tsLogDir, TD_LOG_DIR_PATH);
D
dapan1121 已提交
107

S
Shengliang Guan 已提交
108
  if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
S
os env  
Shengliang Guan 已提交
109
    printf("failed to open log file in directory:%s\n", tsLogDir);
D
dapan1121 已提交
110 111 112 113
  }
}

void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) {
D
dapan1121 已提交
114 115 116
  qwtqueryMsg.queryId = htobe64(atomic_add_fetch_64(&qwtTestQueryId, 1));
  qwtqueryMsg.sId = htobe64(1);
  qwtqueryMsg.taskId = htobe64(1);
D
dapan1121 已提交
117
  qwtqueryMsg.msgLen = htonl(100);
118
  qwtqueryMsg.sqlLen = 0;
D
dapan1121 已提交
119
  queryRpc->msgType = TDMT_SCH_QUERY;
D
dapan1121 已提交
120
  queryRpc->pCont = &qwtqueryMsg;
D
dapan1121 已提交
121 122 123 124 125
  queryRpc->contLen = sizeof(SSubQueryMsg) + 100;
}

void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
  fetchMsg->sId = htobe64(1);
D
dapan1121 已提交
126
  fetchMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
D
dapan1121 已提交
127
  fetchMsg->taskId = htobe64(1);
D
dapan1121 已提交
128
  fetchRpc->msgType = TDMT_SCH_FETCH;
D
dapan1121 已提交
129 130 131 132 133
  fetchRpc->pCont = fetchMsg;
  fetchRpc->contLen = sizeof(SResFetchReq);
}

void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
D
dapan1121 已提交
134 135 136 137 138 139 140 141 142
  dropMsg->sId = 1;
  dropMsg->queryId = atomic_load_64(&qwtTestQueryId);
  dropMsg->taskId = 1;
  
  int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, dropMsg);
  if (msgSize < 0) {
    return;
  }
  
D
dapan1121 已提交
143
  char *msg = (char*)taosMemoryCalloc(1, msgSize);
D
dapan1121 已提交
144 145 146 147 148 149 150 151 152 153
  if (NULL == msg) {
    return;
  }
  
  if (tSerializeSTaskDropReq(msg, msgSize, dropMsg) < 0) {
    taosMemoryFree(msg);
    return;
  }


D
dapan1121 已提交
154
  dropRpc->msgType = TDMT_SCH_DROP_TASK;
D
dapan1121 已提交
155 156
  dropRpc->pCont = msg;
  dropRpc->contLen = msgSize;
D
dapan1121 已提交
157 158
}

H
Hongze Cheng 已提交
159
int32_t qwtStringToPlan(const char *str, SSubplan **subplan) {
D
dapan 已提交
160
  *subplan = (SSubplan *)0x1;
D
dapan 已提交
161
  return 0;
D
dapan1121 已提交
162 163
}

D
dapan1121 已提交
164 165
int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
  taosWLockLatch(&qwtTestFetchQueueLock);
wafwerar's avatar
wafwerar 已提交
166
  struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg));
H
Hongze Cheng 已提交
167
  memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
D
dapan1121 已提交
168
  qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg;
D
dapan1121 已提交
169 170 171
  if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) {
    qwtTestFetchQueueWIdx = 0;
  }
H
Hongze Cheng 已提交
172

D
dapan1121 已提交
173 174 175 176 177 178 179
  qwtTestFetchQueueNum++;

  if (qwtTestFetchQueueWIdx == qwtTestFetchQueueRIdx) {
    printf("Fetch queue is full");
    assert(0);
  }
  taosWUnLockLatch(&qwtTestFetchQueueLock);
H
Hongze Cheng 已提交
180

D
dapan1121 已提交
181
  tsem_post(&qwtTestFetchSem);
H
Hongze Cheng 已提交
182

D
dapan1121 已提交
183 184 185
  return 0;
}

S
Shengliang Guan 已提交
186
int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) {
D
dapan1121 已提交
187
  taosWLockLatch(&qwtTestQueryQueueLock);
wafwerar's avatar
wafwerar 已提交
188
  struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg));
D
dapan1121 已提交
189 190
  memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
  qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg;
D
dapan1121 已提交
191 192 193
  if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) {
    qwtTestQueryQueueWIdx = 0;
  }
S
Shengliang Guan 已提交
194

D
dapan1121 已提交
195 196 197 198 199 200 201
  qwtTestQueryQueueNum++;

  if (qwtTestQueryQueueWIdx == qwtTestQueryQueueRIdx) {
    printf("query queue is full");
    assert(0);
  }
  taosWUnLockLatch(&qwtTestQueryQueueLock);
H
Hongze Cheng 已提交
202

D
dapan1121 已提交
203
  tsem_post(&qwtTestQuerySem);
D
dapan1121 已提交
204

H
Hongze Cheng 已提交
205
  return 0;
D
dapan1121 已提交
206
}
D
dapan1121 已提交
207

H
Hongze Cheng 已提交
208
void qwtSendReqToDnode(void *pVnode, struct SEpSet *epSet, struct SRpcMsg *pReq) {}
D
dapan1121 已提交
209

D
dapan1121 已提交
210
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
D
dapan1121 已提交
211
  switch (pRsp->msgType) {
D
dapan1121 已提交
212 213
    case TDMT_SCH_QUERY_RSP:
    case TDMT_SCH_MERGE_QUERY_RSP: {
D
dapan1121 已提交
214 215
      SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont;

D
dapan1121 已提交
216
      if (pRsp->code) {
D
dapan1121 已提交
217
        qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
D
dapan 已提交
218
        qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
D
dapan1121 已提交
219
      }
H
Hongze Cheng 已提交
220

D
dapan1121 已提交
221
      rpcFreeCont(rsp);
D
dapan1121 已提交
222 223
      break;
    }
D
dapan1121 已提交
224 225
    case TDMT_SCH_FETCH_RSP:
    case TDMT_SCH_MERGE_FETCH_RSP: {
D
dapan1121 已提交
226
      SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)pRsp->pCont;
H
Hongze Cheng 已提交
227

D
dapan1121 已提交
228 229
      if (0 == pRsp->code && 0 == rsp->completed) {
        qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc);
D
dapan 已提交
230
        qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc);
D
dapan1121 已提交
231
        rpcFreeCont(rsp);
D
dapan1121 已提交
232 233 234 235
        return;
      }

      qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
D
dapan 已提交
236
      qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
D
dapan1121 已提交
237
      rpcFreeCont(rsp);
H
Hongze Cheng 已提交
238

D
dapan1121 已提交
239 240
      break;
    }
D
dapan1121 已提交
241
    case TDMT_SCH_DROP_TASK_RSP: {
D
dapan1121 已提交
242
      STaskDropRsp *rsp = (STaskDropRsp *)pRsp->pCont;
D
dapan1121 已提交
243
      rpcFreeCont(rsp);
D
dapan1121 已提交
244 245 246

      qwtTestCaseFinished = true;
      break;
D
dapan1121 已提交
247 248
    }
  }
D
dapan1121 已提交
249

D
dapan1121 已提交
250 251 252
  return;
}

H
Hongze Cheng 已提交
253 254
int32_t qwtCreateExecTask(void *tsdb, int32_t vgId, uint64_t taskId, struct SSubplan *pPlan, qTaskInfo_t *pTaskInfo,
                          DataSinkHandle *handle) {
D
dapan1121 已提交
255
  qwtTestSinkBlockNum = 0;
wafwerar's avatar
wafwerar 已提交
256
  qwtTestSinkMaxBlockNum = taosRand() % 100 + 1;
D
dapan1121 已提交
257
  qwtTestSinkQueryEnd = false;
H
Hongze Cheng 已提交
258 259 260

  *pTaskInfo = (qTaskInfo_t)((char *)qwtTestCaseIdx + 1);
  *handle = (DataSinkHandle)((char *)qwtTestCaseIdx + 2);
D
dapan1121 已提交
261 262

  ++qwtTestCaseIdx;
H
Hongze Cheng 已提交
263

D
dapan1121 已提交
264 265 266
  return 0;
}

H
Hongze Cheng 已提交
267
int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) {
D
dapan1121 已提交
268
  int32_t endExec = 0;
H
Hongze Cheng 已提交
269

D
dapan1121 已提交
270 271 272 273
  if (NULL == tinfo) {
    *pRes = NULL;
    *useconds = 0;
  } else {
D
dapan1121 已提交
274 275
    if (qwtTestSinkQueryEnd) {
      *pRes = NULL;
wafwerar's avatar
wafwerar 已提交
276
      *useconds = taosRand() % 10;
D
dapan1121 已提交
277 278
      return 0;
    }
H
Hongze Cheng 已提交
279

wafwerar's avatar
wafwerar 已提交
280
    endExec = taosRand() % 5;
H
Hongze Cheng 已提交
281

D
dapan1121 已提交
282 283
    int32_t runTime = 0;
    if (qwtTestEnableSleep && qwtTestMaxExecTaskUsec > 0) {
wafwerar's avatar
wafwerar 已提交
284
      runTime = taosRand() % qwtTestMaxExecTaskUsec;
D
dapan1121 已提交
285 286 287 288
    }

    if (qwtTestEnableSleep) {
      if (runTime) {
wafwerar's avatar
wafwerar 已提交
289
        taosUsleep(runTime);
D
dapan1121 已提交
290 291
      }
    }
H
Hongze Cheng 已提交
292

D
dapan1121 已提交
293
    if (endExec) {
H
Hongze Cheng 已提交
294
      *pRes = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock));
D
dapan1121 已提交
295
      (*pRes)->info.rows = taosRand() % 1000 + 1;
D
dapan1121 已提交
296 297
    } else {
      *pRes = NULL;
wafwerar's avatar
wafwerar 已提交
298
      *useconds = taosRand() % 10;
D
dapan1121 已提交
299 300
    }
  }
D
dapan1121 已提交
301 302 303 304

  return 0;
}

D
dapan1121 已提交
305
int32_t qwtKillTask(qTaskInfo_t qinfo, int32_t rspCode) { return 0; }
D
dapan1121 已提交
306

H
Hongze Cheng 已提交
307
void qwtDestroyTask(qTaskInfo_t qHandle) {}
D
dapan1121 已提交
308

H
Hongze Cheng 已提交
309
int32_t qwtPutDataBlock(DataSinkHandle handle, const SInputData *pInput, bool *pContinue) {
D
dapan1121 已提交
310 311 312 313
  if (NULL == handle || NULL == pInput || NULL == pContinue) {
    assert(0);
  }

wafwerar's avatar
wafwerar 已提交
314
  taosMemoryFree((void *)pInput->pData);
D
dapan1121 已提交
315

D
dapan1121 已提交
316 317 318 319 320 321
  taosWLockLatch(&qwtTestSinkLock);

  qwtTestSinkBlockNum++;

  if (qwtTestSinkBlockNum >= qwtTestSinkMaxBlockNum) {
    *pContinue = false;
D
dapan1121 已提交
322 323
  } else {
    *pContinue = true;
D
dapan1121 已提交
324 325
  }
  taosWUnLockLatch(&qwtTestSinkLock);
H
Hongze Cheng 已提交
326

D
dapan1121 已提交
327 328 329 330
  return 0;
}

void qwtEndPut(DataSinkHandle handle, uint64_t useconds) {
D
dapan1121 已提交
331 332 333 334 335
  if (NULL == handle) {
    assert(0);
  }

  qwtTestSinkQueryEnd = true;
D
dapan1121 已提交
336 337
}

H
Hongze Cheng 已提交
338
void qwtGetDataLength(DataSinkHandle handle, int64_t *pLen, bool *pQueryEnd) {
D
dapan1121 已提交
339 340 341 342 343 344 345
  static int32_t in = 0;

  if (in > 0) {
    assert(0);
  }

  atomic_add_fetch_32(&in, 1);
H
Hongze Cheng 已提交
346

D
dapan1121 已提交
347 348 349 350 351 352
  if (NULL == handle) {
    assert(0);
  }

  taosWLockLatch(&qwtTestSinkLock);
  if (qwtTestSinkBlockNum > 0) {
wafwerar's avatar
wafwerar 已提交
353
    *pLen = taosRand() % 100 + 1;
D
dapan1121 已提交
354 355 356 357
    qwtTestSinkBlockNum--;
  } else {
    *pLen = 0;
  }
D
dapan1121 已提交
358
  qwtTestSinkLastLen = *pLen;
D
dapan1121 已提交
359 360 361 362 363
  taosWUnLockLatch(&qwtTestSinkLock);

  *pQueryEnd = qwtTestSinkQueryEnd;

  atomic_sub_fetch_32(&in, 1);
D
dapan1121 已提交
364 365
}

H
Hongze Cheng 已提交
366
int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData *pOutput) {
D
dapan1121 已提交
367
  taosWLockLatch(&qwtTestSinkLock);
D
dapan1121 已提交
368
  if (qwtTestSinkLastLen > 0) {
wafwerar's avatar
wafwerar 已提交
369
    pOutput->numOfRows = taosRand() % 10 + 1;
D
dapan1121 已提交
370
    pOutput->compressed = 1;
D
dapan1121 已提交
371 372 373
    pOutput->queryEnd = qwtTestSinkQueryEnd;
    if (qwtTestSinkBlockNum == 0) {
      pOutput->bufStatus = DS_BUF_EMPTY;
H
Hongze Cheng 已提交
374
    } else if (qwtTestSinkBlockNum <= qwtTestSinkMaxBlockNum * 0.5) {
D
dapan1121 已提交
375 376 377 378
      pOutput->bufStatus = DS_BUF_LOW;
    } else {
      pOutput->bufStatus = DS_BUF_FULL;
    }
wafwerar's avatar
wafwerar 已提交
379
    pOutput->useconds = taosRand() % 10 + 1;
D
dapan1121 已提交
380 381 382 383 384
    pOutput->precision = 1;
  } else if (qwtTestSinkLastLen == 0) {
    pOutput->numOfRows = 0;
    pOutput->compressed = 1;
    pOutput->pData = NULL;
D
dapan1121 已提交
385 386 387
    pOutput->queryEnd = qwtTestSinkQueryEnd;
    if (qwtTestSinkBlockNum == 0) {
      pOutput->bufStatus = DS_BUF_EMPTY;
H
Hongze Cheng 已提交
388
    } else if (qwtTestSinkBlockNum <= qwtTestSinkMaxBlockNum * 0.5) {
D
dapan1121 已提交
389 390 391 392
      pOutput->bufStatus = DS_BUF_LOW;
    } else {
      pOutput->bufStatus = DS_BUF_FULL;
    }
wafwerar's avatar
wafwerar 已提交
393
    pOutput->useconds = taosRand() % 10 + 1;
D
dapan1121 已提交
394 395 396 397 398
    pOutput->precision = 1;
  } else {
    assert(0);
  }
  taosWUnLockLatch(&qwtTestSinkLock);
D
dapan1121 已提交
399

H
Hongze Cheng 已提交
400
  return 0;
D
dapan1121 已提交
401 402
}

H
Hongze Cheng 已提交
403
void qwtDestroyDataSinker(DataSinkHandle handle) {}
D
dapan1121 已提交
404

D
dapan 已提交
405
void stubSetStringToPlan() {
D
dapan1121 已提交
406
  static Stub stub;
D
dapan 已提交
407
  stub.set(qStringToSubplan, qwtStringToPlan);
D
dapan1121 已提交
408
  {
wafwerar's avatar
wafwerar 已提交
409
#ifdef WINDOWS
H
Hongze Cheng 已提交
410 411
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
412 413 414
    any.get_func_addr("qStringToSubplan", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
415 416
    AddrAny                       any("libplanner.so");
    std::map<std::string, void *> result;
D
dapan 已提交
417
    any.get_global_func_addr_dynsym("^qStringToSubplan$", result);
wafwerar's avatar
wafwerar 已提交
418
#endif
H
Hongze Cheng 已提交
419
    for (const auto &f : result) {
D
dapan 已提交
420
      stub.set(f.second, qwtStringToPlan);
D
dapan1121 已提交
421 422 423 424
    }
  }
}

D
dapan1121 已提交
425 426 427 428
void stubSetExecTask() {
  static Stub stub;
  stub.set(qExecTask, qwtExecTask);
  {
wafwerar's avatar
wafwerar 已提交
429
#ifdef WINDOWS
H
Hongze Cheng 已提交
430 431
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
432 433 434
    any.get_func_addr("qExecTask", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
435 436
    AddrAny                       any("libexecutor.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
437
    any.get_global_func_addr_dynsym("^qExecTask$", result);
wafwerar's avatar
wafwerar 已提交
438
#endif
H
Hongze Cheng 已提交
439
    for (const auto &f : result) {
D
dapan1121 已提交
440 441 442 443 444 445 446 447 448
      stub.set(f.second, qwtExecTask);
    }
  }
}

void stubSetCreateExecTask() {
  static Stub stub;
  stub.set(qCreateExecTask, qwtCreateExecTask);
  {
wafwerar's avatar
wafwerar 已提交
449
#ifdef WINDOWS
H
Hongze Cheng 已提交
450 451
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
452 453 454
    any.get_func_addr("qCreateExecTask", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
455 456
    AddrAny                       any("libexecutor.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
457
    any.get_global_func_addr_dynsym("^qCreateExecTask$", result);
wafwerar's avatar
wafwerar 已提交
458
#endif
H
Hongze Cheng 已提交
459
    for (const auto &f : result) {
D
dapan1121 已提交
460 461 462 463 464 465 466 467 468
      stub.set(f.second, qwtCreateExecTask);
    }
  }
}

void stubSetAsyncKillTask() {
  static Stub stub;
  stub.set(qAsyncKillTask, qwtKillTask);
  {
wafwerar's avatar
wafwerar 已提交
469
#ifdef WINDOWS
H
Hongze Cheng 已提交
470 471
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
472 473 474
    any.get_func_addr("qAsyncKillTask", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
475 476
    AddrAny                       any("libexecutor.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
477
    any.get_global_func_addr_dynsym("^qAsyncKillTask$", result);
wafwerar's avatar
wafwerar 已提交
478
#endif
H
Hongze Cheng 已提交
479
    for (const auto &f : result) {
D
dapan1121 已提交
480 481 482 483 484 485 486 487 488
      stub.set(f.second, qwtKillTask);
    }
  }
}

void stubSetDestroyTask() {
  static Stub stub;
  stub.set(qDestroyTask, qwtDestroyTask);
  {
wafwerar's avatar
wafwerar 已提交
489
#ifdef WINDOWS
H
Hongze Cheng 已提交
490 491
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
492 493 494
    any.get_func_addr("qDestroyTask", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
495 496
    AddrAny                       any("libexecutor.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
497
    any.get_global_func_addr_dynsym("^qDestroyTask$", result);
wafwerar's avatar
wafwerar 已提交
498
#endif
H
Hongze Cheng 已提交
499
    for (const auto &f : result) {
D
dapan1121 已提交
500 501 502 503 504 505 506 507 508
      stub.set(f.second, qwtDestroyTask);
    }
  }
}

void stubSetDestroyDataSinker() {
  static Stub stub;
  stub.set(dsDestroyDataSinker, qwtDestroyDataSinker);
  {
wafwerar's avatar
wafwerar 已提交
509
#ifdef WINDOWS
H
Hongze Cheng 已提交
510 511
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
512 513 514
    any.get_func_addr("dsDestroyDataSinker", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
515 516
    AddrAny                       any("libexecutor.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
517
    any.get_global_func_addr_dynsym("^dsDestroyDataSinker$", result);
wafwerar's avatar
wafwerar 已提交
518
#endif
H
Hongze Cheng 已提交
519
    for (const auto &f : result) {
D
dapan1121 已提交
520 521 522 523 524 525 526 527 528
      stub.set(f.second, qwtDestroyDataSinker);
    }
  }
}

void stubSetGetDataLength() {
  static Stub stub;
  stub.set(dsGetDataLength, qwtGetDataLength);
  {
wafwerar's avatar
wafwerar 已提交
529
#ifdef WINDOWS
H
Hongze Cheng 已提交
530 531
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
532 533 534
    any.get_func_addr("dsGetDataLength", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
535 536
    AddrAny                       any("libexecutor.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
537
    any.get_global_func_addr_dynsym("^dsGetDataLength$", result);
wafwerar's avatar
wafwerar 已提交
538
#endif
H
Hongze Cheng 已提交
539
    for (const auto &f : result) {
D
dapan1121 已提交
540 541 542 543 544 545 546 547 548
      stub.set(f.second, qwtGetDataLength);
    }
  }
}

void stubSetEndPut() {
  static Stub stub;
  stub.set(dsEndPut, qwtEndPut);
  {
wafwerar's avatar
wafwerar 已提交
549
#ifdef WINDOWS
H
Hongze Cheng 已提交
550 551
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
552 553 554
    any.get_func_addr("dsEndPut", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
555 556
    AddrAny                       any("libexecutor.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
557
    any.get_global_func_addr_dynsym("^dsEndPut$", result);
wafwerar's avatar
wafwerar 已提交
558
#endif
H
Hongze Cheng 已提交
559
    for (const auto &f : result) {
D
dapan1121 已提交
560 561 562 563 564 565 566 567 568
      stub.set(f.second, qwtEndPut);
    }
  }
}

void stubSetPutDataBlock() {
  static Stub stub;
  stub.set(dsPutDataBlock, qwtPutDataBlock);
  {
wafwerar's avatar
wafwerar 已提交
569
#ifdef WINDOWS
H
Hongze Cheng 已提交
570 571
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
572 573 574
    any.get_func_addr("dsPutDataBlock", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
575 576
    AddrAny                       any("libexecutor.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
577
    any.get_global_func_addr_dynsym("^dsPutDataBlock$", result);
wafwerar's avatar
wafwerar 已提交
578
#endif
H
Hongze Cheng 已提交
579
    for (const auto &f : result) {
D
dapan1121 已提交
580 581 582 583 584
      stub.set(f.second, qwtPutDataBlock);
    }
  }
}

D
dapan1121 已提交
585 586 587 588
void stubSetRpcSendResponse() {
  static Stub stub;
  stub.set(rpcSendResponse, qwtRpcSendResponse);
  {
wafwerar's avatar
wafwerar 已提交
589
#ifdef WINDOWS
H
Hongze Cheng 已提交
590 591
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
592 593 594
    any.get_func_addr("rpcSendResponse", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
595 596
    AddrAny                       any("libtransport.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
597
    any.get_global_func_addr_dynsym("^rpcSendResponse$", result);
wafwerar's avatar
wafwerar 已提交
598
#endif
H
Hongze Cheng 已提交
599
    for (const auto &f : result) {
D
dapan1121 已提交
600 601 602 603 604
      stub.set(f.second, qwtRpcSendResponse);
    }
  }
}

D
dapan1121 已提交
605 606 607 608
void stubSetGetDataBlock() {
  static Stub stub;
  stub.set(dsGetDataBlock, qwtGetDataBlock);
  {
wafwerar's avatar
wafwerar 已提交
609
#ifdef WINDOWS
H
Hongze Cheng 已提交
610 611
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
612 613 614
    any.get_func_addr("dsGetDataBlock", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
615 616
    AddrAny                       any("libtransport.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
617
    any.get_global_func_addr_dynsym("^dsGetDataBlock$", result);
wafwerar's avatar
wafwerar 已提交
618
#endif
H
Hongze Cheng 已提交
619
    for (const auto &f : result) {
D
dapan1121 已提交
620 621 622 623 624
      stub.set(f.second, qwtGetDataBlock);
    }
  }
}

D
dapan1121 已提交
625
void *queryThread(void *param) {
H
Hongze Cheng 已提交
626 627
  SRpcMsg  queryRpc = {0};
  int32_t  code = 0;
D
dapan1121 已提交
628
  uint32_t n = 0;
H
Hongze Cheng 已提交
629 630
  void    *mockPointer = (void *)0x1;
  void    *mgmt = param;
D
dapan1121 已提交
631

D
dapan1121 已提交
632 633
  while (!qwtTestStop) {
    qwtBuildQueryReqMsg(&queryRpc);
H
Hongze Cheng 已提交
634
    qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
D
dapan1121 已提交
635
    if (qwtTestEnableSleep) {
H
Hongze Cheng 已提交
636
      taosUsleep(taosRand() % 5);
D
dapan1121 已提交
637 638
    }
    if (++n % qwtTestPrintNum == 0) {
D
dapan1121 已提交
639 640 641 642 643 644 645 646
      printf("query:%d\n", n);
    }
  }

  return NULL;
}

void *fetchThread(void *param) {
H
Hongze Cheng 已提交
647 648 649 650 651
  SRpcMsg      fetchRpc = {0};
  int32_t      code = 0;
  uint32_t     n = 0;
  void        *mockPointer = (void *)0x1;
  void        *mgmt = param;
S
Shengliang Guan 已提交
652
  SResFetchReq fetchMsg = {0};
D
dapan1121 已提交
653

D
dapan1121 已提交
654 655
  while (!qwtTestStop) {
    qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
D
dapan1121 已提交
656
    code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0);
D
dapan1121 已提交
657
    if (qwtTestEnableSleep) {
H
Hongze Cheng 已提交
658
      taosUsleep(taosRand() % 5);
D
dapan1121 已提交
659 660
    }
    if (++n % qwtTestPrintNum == 0) {
D
dapan1121 已提交
661
      printf("fetch:%d\n", n);
H
Hongze Cheng 已提交
662
    }
D
dapan1121 已提交
663 664 665 666 667 668
  }

  return NULL;
}

void *dropThread(void *param) {
H
Hongze Cheng 已提交
669 670 671 672 673 674
  SRpcMsg      dropRpc = {0};
  int32_t      code = 0;
  uint32_t     n = 0;
  void        *mockPointer = (void *)0x1;
  void        *mgmt = param;
  STaskDropReq dropMsg = {0};
D
dapan1121 已提交
675

D
dapan1121 已提交
676 677
  while (!qwtTestStop) {
    qwtBuildDropReqMsg(&dropMsg, &dropRpc);
D
dapan1121 已提交
678
    code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
D
dapan1121 已提交
679
    if (qwtTestEnableSleep) {
H
Hongze Cheng 已提交
680
      taosUsleep(taosRand() % 5);
D
dapan1121 已提交
681 682
    }
    if (++n % qwtTestPrintNum == 0) {
D
dapan1121 已提交
683
      printf("drop:%d\n", n);
H
Hongze Cheng 已提交
684
    }
D
dapan1121 已提交
685 686 687 688 689
  }

  return NULL;
}

D
dapan1121 已提交
690
void *qwtclientThread(void *param) {
H
Hongze Cheng 已提交
691
  int32_t  code = 0;
D
dapan1121 已提交
692
  uint32_t n = 0;
H
Hongze Cheng 已提交
693 694 695
  void    *mgmt = param;
  void    *mockPointer = (void *)0x1;
  SRpcMsg  queryRpc = {0};
D
dapan1121 已提交
696

wafwerar's avatar
wafwerar 已提交
697
  taosSsleep(1);
D
dapan1121 已提交
698 699

  while (!qwtTestStop) {
D
dapan1121 已提交
700
    qwtTestCaseFinished = false;
S
Shengliang Guan 已提交
701

D
dapan1121 已提交
702
    qwtBuildQueryReqMsg(&queryRpc);
S
Shengliang Guan 已提交
703
    qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc);
D
dapan1121 已提交
704 705

    while (!qwtTestCaseFinished) {
wafwerar's avatar
wafwerar 已提交
706
      taosUsleep(1);
D
dapan1121 已提交
707
    }
H
Hongze Cheng 已提交
708

D
dapan1121 已提交
709
    if (++n % qwtTestPrintNum == 0) {
D
dapan1121 已提交
710
      printf("case run:%d\n", n);
D
dapan1121 已提交
711 712 713
    }
  }

D
dapan1121 已提交
714 715
  atomic_add_fetch_32(&qwtTestQuitThreadNum, 1);

D
dapan1121 已提交
716 717
  return NULL;
}
D
dapan1121 已提交
718

D
dapan1121 已提交
719
void *queryQueueThread(void *param) {
H
Hongze Cheng 已提交
720
  void    *mockPointer = (void *)0x1;
D
dapan1121 已提交
721
  SRpcMsg *queryRpc = NULL;
H
Hongze Cheng 已提交
722
  void    *mgmt = param;
D
dapan1121 已提交
723

D
dapan1121 已提交
724
  while (true) {
D
dapan1121 已提交
725 726
    tsem_wait(&qwtTestQuerySem);

D
dapan1121 已提交
727
    if (qwtTestStop && qwtTestQueryQueueNum <= 0 && qwtTestCaseFinished) {
D
dapan1121 已提交
728 729 730
      break;
    }

D
dapan1121 已提交
731 732 733 734 735
    taosWLockLatch(&qwtTestQueryQueueLock);
    if (qwtTestQueryQueueNum <= 0 || qwtTestQueryQueueRIdx == qwtTestQueryQueueWIdx) {
      printf("query queue is empty\n");
      assert(0);
    }
H
Hongze Cheng 已提交
736

D
dapan1121 已提交
737
    queryRpc = qwtTestQueryQueue[qwtTestQueryQueueRIdx++];
H
Hongze Cheng 已提交
738

D
dapan1121 已提交
739 740 741
    if (qwtTestQueryQueueRIdx >= qwtTestQueryQueueSize) {
      qwtTestQueryQueueRIdx = 0;
    }
H
Hongze Cheng 已提交
742

D
dapan1121 已提交
743 744 745
    qwtTestQueryQueueNum--;
    taosWUnLockLatch(&qwtTestQueryQueueLock);

D
dapan1121 已提交
746
    if (qwtTestEnableSleep && qwtTestReqMaxDelayUsec > 0) {
wafwerar's avatar
wafwerar 已提交
747
      int32_t delay = taosRand() % qwtTestReqMaxDelayUsec;
D
dapan1121 已提交
748 749

      if (delay) {
wafwerar's avatar
wafwerar 已提交
750
        taosUsleep(delay);
D
dapan1121 已提交
751 752
      }
    }
H
Hongze Cheng 已提交
753

D
dapan1121 已提交
754
    if (TDMT_SCH_QUERY == queryRpc->msgType) {
D
dapan1121 已提交
755
      qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0);
D
dapan1121 已提交
756
    } else if (TDMT_SCH_QUERY_CONTINUE == queryRpc->msgType) {
D
dapan1121 已提交
757
      qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0);
D
dapan1121 已提交
758 759 760 761
    } else {
      printf("unknown msg in query queue, type:%d\n", queryRpc->msgType);
      assert(0);
    }
D
dapan1121 已提交
762

wafwerar's avatar
wafwerar 已提交
763
    taosMemoryFree(queryRpc);
D
dapan1121 已提交
764

D
dapan1121 已提交
765
    if (qwtTestStop && qwtTestQueryQueueNum <= 0 && qwtTestCaseFinished) {
D
dapan1121 已提交
766 767
      break;
    }
D
dapan1121 已提交
768
  }
D
dapan1121 已提交
769

D
dapan1121 已提交
770 771
  atomic_add_fetch_32(&qwtTestQuitThreadNum, 1);

D
dapan 已提交
772
  return NULL;
D
dapan1121 已提交
773 774 775
}

void *fetchQueueThread(void *param) {
H
Hongze Cheng 已提交
776
  void    *mockPointer = (void *)0x1;
D
dapan1121 已提交
777
  SRpcMsg *fetchRpc = NULL;
H
Hongze Cheng 已提交
778
  void    *mgmt = param;
D
dapan1121 已提交
779

D
dapan1121 已提交
780
  while (true) {
D
dapan1121 已提交
781 782
    tsem_wait(&qwtTestFetchSem);

D
dapan1121 已提交
783 784
    if (qwtTestStop && qwtTestFetchQueueNum <= 0 && qwtTestCaseFinished) {
      break;
H
Hongze Cheng 已提交
785
    }
D
dapan1121 已提交
786

D
dapan1121 已提交
787 788 789 790 791
    taosWLockLatch(&qwtTestFetchQueueLock);
    if (qwtTestFetchQueueNum <= 0 || qwtTestFetchQueueRIdx == qwtTestFetchQueueWIdx) {
      printf("Fetch queue is empty\n");
      assert(0);
    }
H
Hongze Cheng 已提交
792

D
dapan1121 已提交
793
    fetchRpc = qwtTestFetchQueue[qwtTestFetchQueueRIdx++];
H
Hongze Cheng 已提交
794

D
dapan1121 已提交
795 796 797
    if (qwtTestFetchQueueRIdx >= qwtTestFetchQueueSize) {
      qwtTestFetchQueueRIdx = 0;
    }
H
Hongze Cheng 已提交
798

D
dapan1121 已提交
799 800 801
    qwtTestFetchQueueNum--;
    taosWUnLockLatch(&qwtTestFetchQueueLock);

D
dapan1121 已提交
802
    if (qwtTestEnableSleep && qwtTestReqMaxDelayUsec > 0) {
wafwerar's avatar
wafwerar 已提交
803
      int32_t delay = taosRand() % qwtTestReqMaxDelayUsec;
D
dapan1121 已提交
804 805

      if (delay) {
wafwerar's avatar
wafwerar 已提交
806
        taosUsleep(delay);
D
dapan1121 已提交
807 808 809
      }
    }

D
dapan1121 已提交
810
    switch (fetchRpc->msgType) {
D
dapan1121 已提交
811
      case TDMT_SCH_FETCH:
D
dapan1121 已提交
812
      case TDMT_SCH_MERGE_FETCH:
D
dapan1121 已提交
813
        qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0);
D
dapan1121 已提交
814
        break;
D
dapan1121 已提交
815
      case TDMT_SCH_CANCEL_TASK:
D
dapan1121 已提交
816
        //qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0);
D
dapan1121 已提交
817
        break;
D
dapan1121 已提交
818
      case TDMT_SCH_DROP_TASK:
D
dapan1121 已提交
819
        qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0);
D
dapan1121 已提交
820
        break;
D
dapan1121 已提交
821 822 823
      default:
        printf("unknown msg type:%d in fetch queue", fetchRpc->msgType);
        assert(0);
D
dapan1121 已提交
824
        break;
D
dapan1121 已提交
825
    }
D
dapan1121 已提交
826

wafwerar's avatar
wafwerar 已提交
827
    taosMemoryFree(fetchRpc);
D
dapan1121 已提交
828

D
dapan1121 已提交
829
    if (qwtTestStop && qwtTestFetchQueueNum <= 0 && qwtTestCaseFinished) {
D
dapan1121 已提交
830
      break;
H
Hongze Cheng 已提交
831
    }
D
dapan1121 已提交
832
  }
D
dapan1121 已提交
833

D
dapan1121 已提交
834 835
  atomic_add_fetch_32(&qwtTestQuitThreadNum, 1);

D
dapan 已提交
836
  return NULL;
D
dapan1121 已提交
837 838
}

H
Hongze Cheng 已提交
839
}  // namespace
D
dapan1121 已提交
840

D
dapan1121 已提交
841
TEST(seqTest, normalCase) {
H
Hongze Cheng 已提交
842
  void   *mgmt = NULL;
D
dapan 已提交
843
  int32_t code = 0;
H
Hongze Cheng 已提交
844
  void   *mockPointer = (void *)0x1;
D
dapan 已提交
845 846 847
  SRpcMsg queryRpc = {0};
  SRpcMsg fetchRpc = {0};
  SRpcMsg dropRpc = {0};
D
dapan1121 已提交
848 849

  qwtInitLogFile();
D
dapan 已提交
850

D
dapan1121 已提交
851 852 853
  qwtBuildQueryReqMsg(&queryRpc);
  qwtBuildFetchReqMsg(&qwtfetchMsg, &fetchRpc);
  qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc);
H
Hongze Cheng 已提交
854

D
dapan 已提交
855
  stubSetStringToPlan();
D
dapan1121 已提交
856
  stubSetRpcSendResponse();
D
dapan1121 已提交
857 858 859 860 861 862 863 864 865
  stubSetExecTask();
  stubSetCreateExecTask();
  stubSetAsyncKillTask();
  stubSetDestroyTask();
  stubSetDestroyDataSinker();
  stubSetGetDataLength();
  stubSetEndPut();
  stubSetPutDataBlock();
  stubSetGetDataBlock();
H
Hongze Cheng 已提交
866

X
Xiaoyu Wang 已提交
867
  SMsgCb msgCb = {0};
868
  msgCb.mgmt = (void *)mockPointer;
S
Shengliang Guan 已提交
869
  msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
D
dapan1121 已提交
870
  code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
D
dapan 已提交
871 872
  ASSERT_EQ(code, 0);

D
dapan1121 已提交
873
  code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
D
dapan 已提交
874 875
  ASSERT_EQ(code, 0);

H
Hongze Cheng 已提交
876 877
  // code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
  // ASSERT_EQ(code, 0);
D
dapan 已提交
878

D
dapan1121 已提交
879
  code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0);
D
dapan 已提交
880 881
  ASSERT_EQ(code, 0);

D
dapan1121 已提交
882
  code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
D
dapan1121 已提交
883 884 885 886 887 888
  ASSERT_EQ(code, 0);

  qWorkerDestroy(&mgmt);
}

TEST(seqTest, cancelFirst) {
H
Hongze Cheng 已提交
889
  void   *mgmt = NULL;
D
dapan1121 已提交
890
  int32_t code = 0;
H
Hongze Cheng 已提交
891
  void   *mockPointer = (void *)0x1;
D
dapan1121 已提交
892 893
  SRpcMsg queryRpc = {0};
  SRpcMsg dropRpc = {0};
D
dapan1121 已提交
894 895

  qwtInitLogFile();
H
Hongze Cheng 已提交
896

D
dapan1121 已提交
897 898
  qwtBuildQueryReqMsg(&queryRpc);
  qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc);
D
dapan1121 已提交
899 900 901

  stubSetStringToPlan();
  stubSetRpcSendResponse();
S
Shengliang Guan 已提交
902

X
Xiaoyu Wang 已提交
903
  SMsgCb msgCb = {0};
904
  msgCb.mgmt = (void *)mockPointer;
S
Shengliang Guan 已提交
905
  msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
D
dapan1121 已提交
906
  code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
D
dapan1121 已提交
907 908
  ASSERT_EQ(code, 0);

D
dapan1121 已提交
909
  code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
D
dapan 已提交
910
  ASSERT_EQ(code, 0);
D
dapan1121 已提交
911

D
dapan1121 已提交
912
  code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
D
dapan1121 已提交
913
  ASSERT_TRUE(0 != code);
D
dapan1121 已提交
914 915 916 917 918

  qWorkerDestroy(&mgmt);
}

TEST(seqTest, randCase) {
H
Hongze Cheng 已提交
919 920 921 922 923 924 925 926 927 928 929
  void              *mgmt = NULL;
  int32_t            code = 0;
  void              *mockPointer = (void *)0x1;
  SRpcMsg            queryRpc = {0};
  SRpcMsg            readyRpc = {0};
  SRpcMsg            fetchRpc = {0};
  SRpcMsg            dropRpc = {0};
  SRpcMsg            statusRpc = {0};
  SResReadyReq       readyMsg = {0};
  SResFetchReq       fetchMsg = {0};
  STaskDropReq       dropMsg = {0};
S
Shengliang Guan 已提交
930
  SSchTasksStatusReq statusMsg = {0};
D
dapan1121 已提交
931 932

  qwtInitLogFile();
H
Hongze Cheng 已提交
933

D
dapan1121 已提交
934 935
  stubSetStringToPlan();
  stubSetRpcSendResponse();
D
dapan1121 已提交
936
  stubSetCreateExecTask();
D
dapan1121 已提交
937

wafwerar's avatar
wafwerar 已提交
938
  taosSeedRand(taosGetTimestampSec());
S
Shengliang Guan 已提交
939

X
Xiaoyu Wang 已提交
940
  SMsgCb msgCb = {0};
941
  msgCb.mgmt = (void *)mockPointer;
S
Shengliang Guan 已提交
942
  msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
D
dapan1121 已提交
943
  code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
D
dapan1121 已提交
944 945 946 947 948
  ASSERT_EQ(code, 0);

  int32_t t = 0;
  int32_t maxr = 10001;
  while (true) {
wafwerar's avatar
wafwerar 已提交
949
    int32_t r = taosRand() % maxr;
H
Hongze Cheng 已提交
950 951 952

    if (r >= 0 && r < maxr / 5) {
      printf("Query,%d\n", t++);
D
dapan1121 已提交
953
      qwtBuildQueryReqMsg(&queryRpc);
D
dapan1121 已提交
954
      code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
H
Hongze Cheng 已提交
955 956 957 958 959 960 961 962
    } else if (r >= maxr / 5 && r < maxr * 2 / 5) {
      // printf("Ready,%d\n", t++);
      // qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
      // code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
      // if (qwtTestEnableSleep) {
      //   taosUsleep(1);
      // }
    } else if (r >= maxr * 2 / 5 && r < maxr * 3 / 5) {
D
dapan1121 已提交
963
      printf("Fetch,%d\n", t++);
D
dapan1121 已提交
964
      qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
D
dapan1121 已提交
965
      code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0);
D
dapan1121 已提交
966
      if (qwtTestEnableSleep) {
wafwerar's avatar
wafwerar 已提交
967
        taosUsleep(1);
D
dapan1121 已提交
968
      }
H
Hongze Cheng 已提交
969
    } else if (r >= maxr * 3 / 5 && r < maxr * 4 / 5) {
D
dapan1121 已提交
970
      printf("Drop,%d\n", t++);
D
dapan1121 已提交
971
      qwtBuildDropReqMsg(&dropMsg, &dropRpc);
D
dapan1121 已提交
972
      code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
D
dapan1121 已提交
973
      if (qwtTestEnableSleep) {
wafwerar's avatar
wafwerar 已提交
974
        taosUsleep(1);
D
dapan1121 已提交
975
      }
H
Hongze Cheng 已提交
976
    } else if (r >= maxr * 4 / 5 && r < maxr - 1) {
D
dapan1121 已提交
977
      printf("Status,%d\n", t++);
D
dapan1121 已提交
978
      if (qwtTestEnableSleep) {
wafwerar's avatar
wafwerar 已提交
979
        taosUsleep(1);
H
Hongze Cheng 已提交
980
      }
D
dapan1121 已提交
981 982 983 984 985 986 987 988 989 990
    } else {
      printf("QUIT RAND NOW");
      break;
    }
  }

  qWorkerDestroy(&mgmt);
}

TEST(seqTest, multithreadRand) {
H
Hongze Cheng 已提交
991
  void   *mgmt = NULL;
D
dapan1121 已提交
992
  int32_t code = 0;
H
Hongze Cheng 已提交
993
  void   *mockPointer = (void *)0x1;
D
dapan1121 已提交
994 995

  qwtInitLogFile();
H
Hongze Cheng 已提交
996

D
dapan1121 已提交
997 998
  stubSetStringToPlan();
  stubSetRpcSendResponse();
D
dapan1121 已提交
999 1000 1001 1002 1003 1004 1005 1006 1007
  stubSetExecTask();
  stubSetCreateExecTask();
  stubSetAsyncKillTask();
  stubSetDestroyTask();
  stubSetDestroyDataSinker();
  stubSetGetDataLength();
  stubSetEndPut();
  stubSetPutDataBlock();
  stubSetGetDataBlock();
D
dapan1121 已提交
1008

wafwerar's avatar
wafwerar 已提交
1009
  taosSeedRand(taosGetTimestampSec());
S
Shengliang Guan 已提交
1010

X
Xiaoyu Wang 已提交
1011
  SMsgCb msgCb = {0};
1012
  msgCb.mgmt = (void *)mockPointer;
S
Shengliang Guan 已提交
1013
  msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
D
dapan1121 已提交
1014
  code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
D
dapan1121 已提交
1015 1016
  ASSERT_EQ(code, 0);

wafwerar's avatar
wafwerar 已提交
1017 1018
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan1121 已提交
1019

H
Hongze Cheng 已提交
1020
  TdThread t1, t2, t3, t4, t5, t6;
wafwerar's avatar
wafwerar 已提交
1021
  taosThreadCreate(&(t1), &thattr, queryThread, mgmt);
H
Hongze Cheng 已提交
1022
  // taosThreadCreate(&(t2), &thattr, readyThread, NULL);
wafwerar's avatar
wafwerar 已提交
1023 1024
  taosThreadCreate(&(t3), &thattr, fetchThread, NULL);
  taosThreadCreate(&(t4), &thattr, dropThread, NULL);
X
Xiaoyu Wang 已提交
1025
  taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt);
D
dapan1121 已提交
1026

D
dapan1121 已提交
1027 1028
  while (true) {
    if (qwtTestDeadLoop) {
wafwerar's avatar
wafwerar 已提交
1029
      taosSsleep(1);
D
dapan1121 已提交
1030
    } else {
wafwerar's avatar
wafwerar 已提交
1031
      taosSsleep(qwtTestMTRunSec);
D
dapan1121 已提交
1032 1033 1034
      break;
    }
  }
H
Hongze Cheng 已提交
1035

D
dapan1121 已提交
1036
  qwtTestStop = true;
wafwerar's avatar
wafwerar 已提交
1037
  taosSsleep(3);
D
dapan1121 已提交
1038 1039 1040 1041 1042 1043 1044 1045 1046

  qwtTestQueryQueueNum = 0;
  qwtTestQueryQueueRIdx = 0;
  qwtTestQueryQueueWIdx = 0;
  qwtTestQueryQueueLock = 0;
  qwtTestFetchQueueNum = 0;
  qwtTestFetchQueueRIdx = 0;
  qwtTestFetchQueueWIdx = 0;
  qwtTestFetchQueueLock = 0;
H
Hongze Cheng 已提交
1047

D
dapan1121 已提交
1048 1049 1050
  qWorkerDestroy(&mgmt);
}

D
dapan1121 已提交
1051
TEST(rcTest, shortExecshortDelay) {
H
Hongze Cheng 已提交
1052
  void   *mgmt = NULL;
D
dapan1121 已提交
1053
  int32_t code = 0;
H
Hongze Cheng 已提交
1054
  void   *mockPointer = (void *)0x1;
D
dapan1121 已提交
1055 1056

  qwtInitLogFile();
H
Hongze Cheng 已提交
1057

D
dapan1121 已提交
1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069
  stubSetStringToPlan();
  stubSetRpcSendResponse();
  stubSetExecTask();
  stubSetCreateExecTask();
  stubSetAsyncKillTask();
  stubSetDestroyTask();
  stubSetDestroyDataSinker();
  stubSetGetDataLength();
  stubSetEndPut();
  stubSetPutDataBlock();
  stubSetGetDataBlock();

wafwerar's avatar
wafwerar 已提交
1070
  taosSeedRand(taosGetTimestampSec());
D
dapan1121 已提交
1071 1072 1073
  qwtTestStop = false;
  qwtTestQuitThreadNum = 0;

S
Shengliang Guan 已提交
1074
  SMsgCb msgCb = {0};
1075
  msgCb.mgmt = (void *)mockPointer;
S
Shengliang Guan 已提交
1076
  msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
D
dapan1121 已提交
1077
  code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
D
dapan1121 已提交
1078 1079 1080 1081 1082 1083 1084 1085
  ASSERT_EQ(code, 0);

  qwtTestMaxExecTaskUsec = 0;
  qwtTestReqMaxDelayUsec = 0;

  tsem_init(&qwtTestQuerySem, 0, 0);
  tsem_init(&qwtTestFetchSem, 0, 0);

wafwerar's avatar
wafwerar 已提交
1086 1087
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan1121 已提交
1088

H
Hongze Cheng 已提交
1089
  TdThread t1, t2, t3, t4, t5;
wafwerar's avatar
wafwerar 已提交
1090 1091 1092
  taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
  taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
  taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
D
dapan1121 已提交
1093 1094 1095

  while (true) {
    if (qwtTestDeadLoop) {
wafwerar's avatar
wafwerar 已提交
1096
      taosSsleep(1);
D
dapan1121 已提交
1097
    } else {
wafwerar's avatar
wafwerar 已提交
1098
      taosSsleep(qwtTestMTRunSec);
D
dapan1121 已提交
1099 1100 1101
      break;
    }
  }
H
Hongze Cheng 已提交
1102

D
dapan1121 已提交
1103 1104 1105 1106 1107 1108
  qwtTestStop = true;

  while (true) {
    if (qwtTestQuitThreadNum == 3) {
      break;
    }
H
Hongze Cheng 已提交
1109

wafwerar's avatar
wafwerar 已提交
1110
    taosSsleep(1);
D
dapan1121 已提交
1111 1112

    if (qwtTestCaseFinished) {
H
Hongze Cheng 已提交
1113
      if (qwtTestQuitThreadNum < 3) {
D
dapan1121 已提交
1114 1115 1116
        tsem_post(&qwtTestQuerySem);
        tsem_post(&qwtTestFetchSem);

wafwerar's avatar
wafwerar 已提交
1117
        taosUsleep(10);
D
dapan1121 已提交
1118 1119
      }
    }
D
dapan1121 已提交
1120 1121 1122 1123 1124 1125 1126 1127 1128 1129
  }

  qwtTestQueryQueueNum = 0;
  qwtTestQueryQueueRIdx = 0;
  qwtTestQueryQueueWIdx = 0;
  qwtTestQueryQueueLock = 0;
  qwtTestFetchQueueNum = 0;
  qwtTestFetchQueueRIdx = 0;
  qwtTestFetchQueueWIdx = 0;
  qwtTestFetchQueueLock = 0;
H
Hongze Cheng 已提交
1130 1131

  qWorkerDestroy(&mgmt);
D
dapan1121 已提交
1132 1133 1134
}

TEST(rcTest, longExecshortDelay) {
H
Hongze Cheng 已提交
1135
  void   *mgmt = NULL;
D
dapan1121 已提交
1136
  int32_t code = 0;
H
Hongze Cheng 已提交
1137
  void   *mockPointer = (void *)0x1;
D
dapan1121 已提交
1138 1139

  qwtInitLogFile();
H
Hongze Cheng 已提交
1140

D
dapan1121 已提交
1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152
  stubSetStringToPlan();
  stubSetRpcSendResponse();
  stubSetExecTask();
  stubSetCreateExecTask();
  stubSetAsyncKillTask();
  stubSetDestroyTask();
  stubSetDestroyDataSinker();
  stubSetGetDataLength();
  stubSetEndPut();
  stubSetPutDataBlock();
  stubSetGetDataBlock();

wafwerar's avatar
wafwerar 已提交
1153
  taosSeedRand(taosGetTimestampSec());
D
dapan1121 已提交
1154 1155 1156
  qwtTestStop = false;
  qwtTestQuitThreadNum = 0;

S
Shengliang Guan 已提交
1157
  SMsgCb msgCb = {0};
1158
  msgCb.mgmt = (void *)mockPointer;
S
Shengliang Guan 已提交
1159
  msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
D
dapan1121 已提交
1160
  code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
D
dapan1121 已提交
1161 1162 1163 1164 1165 1166 1167 1168
  ASSERT_EQ(code, 0);

  qwtTestMaxExecTaskUsec = 1000000;
  qwtTestReqMaxDelayUsec = 0;

  tsem_init(&qwtTestQuerySem, 0, 0);
  tsem_init(&qwtTestFetchSem, 0, 0);

wafwerar's avatar
wafwerar 已提交
1169 1170
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan1121 已提交
1171

H
Hongze Cheng 已提交
1172
  TdThread t1, t2, t3, t4, t5;
wafwerar's avatar
wafwerar 已提交
1173 1174 1175
  taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
  taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
  taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
D
dapan1121 已提交
1176 1177 1178

  while (true) {
    if (qwtTestDeadLoop) {
wafwerar's avatar
wafwerar 已提交
1179
      taosSsleep(1);
D
dapan1121 已提交
1180
    } else {
wafwerar's avatar
wafwerar 已提交
1181
      taosSsleep(qwtTestMTRunSec);
D
dapan1121 已提交
1182 1183 1184
      break;
    }
  }
H
Hongze Cheng 已提交
1185

D
dapan1121 已提交
1186 1187 1188 1189 1190 1191
  qwtTestStop = true;

  while (true) {
    if (qwtTestQuitThreadNum == 3) {
      break;
    }
H
Hongze Cheng 已提交
1192

wafwerar's avatar
wafwerar 已提交
1193
    taosSsleep(1);
D
dapan1121 已提交
1194 1195

    if (qwtTestCaseFinished) {
H
Hongze Cheng 已提交
1196
      if (qwtTestQuitThreadNum < 3) {
D
dapan1121 已提交
1197 1198
        tsem_post(&qwtTestQuerySem);
        tsem_post(&qwtTestFetchSem);
H
Hongze Cheng 已提交
1199

wafwerar's avatar
wafwerar 已提交
1200
        taosUsleep(10);
D
dapan1121 已提交
1201 1202
      }
    }
D
dapan1121 已提交
1203 1204 1205 1206 1207 1208 1209 1210 1211 1212
  }

  qwtTestQueryQueueNum = 0;
  qwtTestQueryQueueRIdx = 0;
  qwtTestQueryQueueWIdx = 0;
  qwtTestQueryQueueLock = 0;
  qwtTestFetchQueueNum = 0;
  qwtTestFetchQueueRIdx = 0;
  qwtTestFetchQueueWIdx = 0;
  qwtTestFetchQueueLock = 0;
H
Hongze Cheng 已提交
1213

D
dapan1121 已提交
1214 1215 1216 1217
  qWorkerDestroy(&mgmt);
}

TEST(rcTest, shortExeclongDelay) {
H
Hongze Cheng 已提交
1218
  void   *mgmt = NULL;
D
dapan1121 已提交
1219
  int32_t code = 0;
H
Hongze Cheng 已提交
1220
  void   *mockPointer = (void *)0x1;
D
dapan1121 已提交
1221 1222

  qwtInitLogFile();
H
Hongze Cheng 已提交
1223

D
dapan1121 已提交
1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235
  stubSetStringToPlan();
  stubSetRpcSendResponse();
  stubSetExecTask();
  stubSetCreateExecTask();
  stubSetAsyncKillTask();
  stubSetDestroyTask();
  stubSetDestroyDataSinker();
  stubSetGetDataLength();
  stubSetEndPut();
  stubSetPutDataBlock();
  stubSetGetDataBlock();

wafwerar's avatar
wafwerar 已提交
1236
  taosSeedRand(taosGetTimestampSec());
D
dapan1121 已提交
1237 1238 1239
  qwtTestStop = false;
  qwtTestQuitThreadNum = 0;

S
Shengliang Guan 已提交
1240
  SMsgCb msgCb = {0};
1241
  msgCb.mgmt = (void *)mockPointer;
S
Shengliang Guan 已提交
1242
  msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
D
dapan1121 已提交
1243
  code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
D
dapan1121 已提交
1244 1245 1246 1247 1248 1249 1250 1251
  ASSERT_EQ(code, 0);

  qwtTestMaxExecTaskUsec = 0;
  qwtTestReqMaxDelayUsec = 1000000;

  tsem_init(&qwtTestQuerySem, 0, 0);
  tsem_init(&qwtTestFetchSem, 0, 0);

wafwerar's avatar
wafwerar 已提交
1252 1253
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan1121 已提交
1254

H
Hongze Cheng 已提交
1255
  TdThread t1, t2, t3, t4, t5;
wafwerar's avatar
wafwerar 已提交
1256 1257 1258
  taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
  taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
  taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
D
dapan1121 已提交
1259 1260 1261

  while (true) {
    if (qwtTestDeadLoop) {
wafwerar's avatar
wafwerar 已提交
1262
      taosSsleep(1);
D
dapan1121 已提交
1263
    } else {
wafwerar's avatar
wafwerar 已提交
1264
      taosSsleep(qwtTestMTRunSec);
D
dapan1121 已提交
1265 1266 1267 1268
      break;
    }
  }

H
Hongze Cheng 已提交
1269
  qwtTestStop = true;
D
dapan1121 已提交
1270 1271 1272 1273 1274

  while (true) {
    if (qwtTestQuitThreadNum == 3) {
      break;
    }
H
Hongze Cheng 已提交
1275

wafwerar's avatar
wafwerar 已提交
1276
    taosSsleep(1);
D
dapan1121 已提交
1277 1278

    if (qwtTestCaseFinished) {
H
Hongze Cheng 已提交
1279
      if (qwtTestQuitThreadNum < 3) {
D
dapan1121 已提交
1280 1281
        tsem_post(&qwtTestQuerySem);
        tsem_post(&qwtTestFetchSem);
H
Hongze Cheng 已提交
1282

wafwerar's avatar
wafwerar 已提交
1283
        taosUsleep(10);
D
dapan1121 已提交
1284 1285
      }
    }
D
dapan1121 已提交
1286 1287 1288 1289 1290 1291 1292 1293 1294 1295
  }

  qwtTestQueryQueueNum = 0;
  qwtTestQueryQueueRIdx = 0;
  qwtTestQueryQueueWIdx = 0;
  qwtTestQueryQueueLock = 0;
  qwtTestFetchQueueNum = 0;
  qwtTestFetchQueueRIdx = 0;
  qwtTestFetchQueueWIdx = 0;
  qwtTestFetchQueueLock = 0;
H
Hongze Cheng 已提交
1296

D
dapan1121 已提交
1297 1298 1299 1300
  qWorkerDestroy(&mgmt);
}

TEST(rcTest, dropTest) {
H
Hongze Cheng 已提交
1301
  void   *mgmt = NULL;
D
dapan1121 已提交
1302
  int32_t code = 0;
H
Hongze Cheng 已提交
1303
  void   *mockPointer = (void *)0x1;
D
dapan1121 已提交
1304 1305

  qwtInitLogFile();
H
Hongze Cheng 已提交
1306

D
dapan1121 已提交
1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318
  stubSetStringToPlan();
  stubSetRpcSendResponse();
  stubSetExecTask();
  stubSetCreateExecTask();
  stubSetAsyncKillTask();
  stubSetDestroyTask();
  stubSetDestroyDataSinker();
  stubSetGetDataLength();
  stubSetEndPut();
  stubSetPutDataBlock();
  stubSetGetDataBlock();

wafwerar's avatar
wafwerar 已提交
1319
  taosSeedRand(taosGetTimestampSec());
S
Shengliang Guan 已提交
1320

X
Xiaoyu Wang 已提交
1321
  SMsgCb msgCb = {0};
1322
  msgCb.mgmt = (void *)mockPointer;
S
Shengliang Guan 已提交
1323
  msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
D
dapan1121 已提交
1324
  code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
D
dapan1121 已提交
1325 1326
  ASSERT_EQ(code, 0);

D
dapan1121 已提交
1327 1328 1329
  tsem_init(&qwtTestQuerySem, 0, 0);
  tsem_init(&qwtTestFetchSem, 0, 0);

wafwerar's avatar
wafwerar 已提交
1330 1331
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan1121 已提交
1332

H
Hongze Cheng 已提交
1333
  TdThread t1, t2, t3, t4, t5;
X
Xiaoyu Wang 已提交
1334
  taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
wafwerar's avatar
wafwerar 已提交
1335 1336
  taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
  taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
D
dapan1121 已提交
1337 1338 1339

  while (true) {
    if (qwtTestDeadLoop) {
wafwerar's avatar
wafwerar 已提交
1340
      taosSsleep(1);
D
dapan1121 已提交
1341
    } else {
wafwerar's avatar
wafwerar 已提交
1342
      taosSsleep(qwtTestMTRunSec);
D
dapan1121 已提交
1343 1344 1345
      break;
    }
  }
H
Hongze Cheng 已提交
1346

D
dapan1121 已提交
1347
  qwtTestStop = true;
wafwerar's avatar
wafwerar 已提交
1348
  taosSsleep(3);
H
Hongze Cheng 已提交
1349

D
dapan1121 已提交
1350
  qWorkerDestroy(&mgmt);
D
dapan 已提交
1351 1352
}

H
Hongze Cheng 已提交
1353
int main(int argc, char **argv) {
wafwerar's avatar
wafwerar 已提交
1354
  taosSeedRand(taosGetTimestampSec());
D
dapan1121 已提交
1355 1356 1357 1358
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

D
dapan1121 已提交
1359
#pragma GCC diagnostic pop