qworkerTests.cpp 32.8 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <gtest/gtest.h>
#include <iostream>

S
Shengliang Guan 已提交
19 20
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
D
dapan1121 已提交
21 22 23
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
S
Shengliang Guan 已提交
24 25 26 27
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wformat"
#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
#pragma GCC diagnostic ignored "-Wpointer-arith"
28
#include <addr_any.h>
S
Shengliang Guan 已提交
29

wafwerar's avatar
wafwerar 已提交
30 31 32
#ifdef WINDOWS
#define TD_USE_WINSOCK
#endif
D
dapan1121 已提交
33 34
#include "os.h"

H
Hongze Cheng 已提交
35 36
#include "dataSinkMgt.h"
#include "executor.h"
D
dapan 已提交
37 38
#include "planner.h"
#include "qworker.h"
D
dapan1121 已提交
39
#include "stub.h"
H
Hongze Cheng 已提交
40 41 42 43 44 45
#include "taos.h"
#include "tdatablock.h"
#include "tdef.h"
#include "tglobal.h"
#include "trpc.h"
#include "tvariant.h"
D
dapan1121 已提交
46 47 48

namespace {

D
dapan1121 已提交
49 50 51
#define qwtTestQueryQueueSize 1000000
#define qwtTestFetchQueueSize 1000000

D
dapan1121 已提交
52 53
bool qwtEnableLog = true;

D
dapan1121 已提交
54 55
int32_t qwtTestMaxExecTaskUsec = 2;
int32_t qwtTestReqMaxDelayUsec = 2;
D
dapan1121 已提交
56

H
Hongze Cheng 已提交
57 58 59 60 61 62
int64_t  qwtTestQueryId = 0;
bool     qwtTestEnableSleep = true;
bool     qwtTestStop = false;
bool     qwtTestDeadLoop = false;
int32_t  qwtTestMTRunSec = 2;
int32_t  qwtTestPrintNum = 10000;
D
dapan1121 已提交
63 64
uint64_t qwtTestCaseIdx = 0;
uint64_t qwtTestCaseNum = 4;
H
Hongze Cheng 已提交
65 66 67 68 69 70 71 72 73
bool     qwtTestCaseFinished = false;
tsem_t   qwtTestQuerySem;
tsem_t   qwtTestFetchSem;
int32_t  qwtTestQuitThreadNum = 0;

int32_t         qwtTestQueryQueueRIdx = 0;
int32_t         qwtTestQueryQueueWIdx = 0;
int32_t         qwtTestQueryQueueNum = 0;
SRWLatch        qwtTestQueryQueueLock = 0;
D
dapan1121 已提交
74 75
struct SRpcMsg *qwtTestQueryQueue[qwtTestQueryQueueSize] = {0};

H
Hongze Cheng 已提交
76 77 78 79
int32_t         qwtTestFetchQueueRIdx = 0;
int32_t         qwtTestFetchQueueWIdx = 0;
int32_t         qwtTestFetchQueueNum = 0;
SRWLatch        qwtTestFetchQueueLock = 0;
D
dapan1121 已提交
80 81
struct SRpcMsg *qwtTestFetchQueue[qwtTestFetchQueueSize] = {0};

H
Hongze Cheng 已提交
82 83 84
int32_t  qwtTestSinkBlockNum = 0;
int32_t  qwtTestSinkMaxBlockNum = 0;
bool     qwtTestSinkQueryEnd = false;
D
dapan1121 已提交
85
SRWLatch qwtTestSinkLock = 0;
H
Hongze Cheng 已提交
86 87 88 89 90 91 92 93 94
int32_t  qwtTestSinkLastLen = 0;

SSubQueryMsg       qwtqueryMsg = {0};
SRpcMsg            qwtfetchRpc = {0};
SResFetchReq       qwtfetchMsg = {0};
SRpcMsg            qwtreadyRpc = {0};
SResReadyReq       qwtreadyMsg = {0};
SRpcMsg            qwtdropRpc = {0};
STaskDropReq       qwtdropMsg = {0};
D
dapan1121 已提交
95
SSchTasksStatusReq qwtstatusMsg = {0};
D
dapan1121 已提交
96

D
dapan1121 已提交
97
void qwtInitLogFile() {
D
dapan1121 已提交
98 99 100
  if (!qwtEnableLog) {
    return;
  }
H
Hongze Cheng 已提交
101 102
  const char   *defaultLogFileNamePrefix = "taosdlog";
  const int32_t maxLogFileNum = 10;
D
dapan1121 已提交
103 104 105

  tsAsyncLog = 0;
  qDebugFlag = 159;
wafwerar's avatar
wafwerar 已提交
106
  strcpy(tsLogDir, TD_LOG_DIR_PATH);
D
dapan1121 已提交
107

S
Shengliang Guan 已提交
108
  if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
S
os env  
Shengliang Guan 已提交
109
    printf("failed to open log file in directory:%s\n", tsLogDir);
D
dapan1121 已提交
110 111 112 113
  }
}

void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) {
D
dapan1121 已提交
114 115 116
  qwtqueryMsg.queryId = htobe64(atomic_add_fetch_64(&qwtTestQueryId, 1));
  qwtqueryMsg.sId = htobe64(1);
  qwtqueryMsg.taskId = htobe64(1);
117 118
  qwtqueryMsg.phyLen = htonl(100);
  qwtqueryMsg.sqlLen = 0;
D
dapan1121 已提交
119
  queryRpc->msgType = TDMT_SCH_QUERY;
D
dapan1121 已提交
120
  queryRpc->pCont = &qwtqueryMsg;
D
dapan1121 已提交
121 122 123 124 125
  queryRpc->contLen = sizeof(SSubQueryMsg) + 100;
}

void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
  fetchMsg->sId = htobe64(1);
D
dapan1121 已提交
126
  fetchMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
D
dapan1121 已提交
127
  fetchMsg->taskId = htobe64(1);
D
dapan1121 已提交
128
  fetchRpc->msgType = TDMT_SCH_FETCH;
D
dapan1121 已提交
129 130 131 132 133 134
  fetchRpc->pCont = fetchMsg;
  fetchRpc->contLen = sizeof(SResFetchReq);
}

void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
  dropMsg->sId = htobe64(1);
D
dapan1121 已提交
135
  dropMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
D
dapan1121 已提交
136
  dropMsg->taskId = htobe64(1);
D
dapan1121 已提交
137
  dropRpc->msgType = TDMT_SCH_DROP_TASK;
D
dapan1121 已提交
138 139 140 141
  dropRpc->pCont = dropMsg;
  dropRpc->contLen = sizeof(STaskDropReq);
}

H
Hongze Cheng 已提交
142
int32_t qwtStringToPlan(const char *str, SSubplan **subplan) {
D
dapan 已提交
143
  *subplan = (SSubplan *)0x1;
D
dapan 已提交
144
  return 0;
D
dapan1121 已提交
145 146
}

D
dapan1121 已提交
147 148
int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
  taosWLockLatch(&qwtTestFetchQueueLock);
wafwerar's avatar
wafwerar 已提交
149
  struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg));
H
Hongze Cheng 已提交
150
  memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
D
dapan1121 已提交
151
  qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg;
D
dapan1121 已提交
152 153 154
  if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) {
    qwtTestFetchQueueWIdx = 0;
  }
H
Hongze Cheng 已提交
155

D
dapan1121 已提交
156 157 158 159 160 161 162
  qwtTestFetchQueueNum++;

  if (qwtTestFetchQueueWIdx == qwtTestFetchQueueRIdx) {
    printf("Fetch queue is full");
    assert(0);
  }
  taosWUnLockLatch(&qwtTestFetchQueueLock);
H
Hongze Cheng 已提交
163

D
dapan1121 已提交
164
  tsem_post(&qwtTestFetchSem);
H
Hongze Cheng 已提交
165

D
dapan1121 已提交
166 167 168
  return 0;
}

S
Shengliang Guan 已提交
169
int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) {
D
dapan1121 已提交
170
  taosWLockLatch(&qwtTestQueryQueueLock);
wafwerar's avatar
wafwerar 已提交
171
  struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg));
D
dapan1121 已提交
172 173
  memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
  qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg;
D
dapan1121 已提交
174 175 176
  if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) {
    qwtTestQueryQueueWIdx = 0;
  }
S
Shengliang Guan 已提交
177

D
dapan1121 已提交
178 179 180 181 182 183 184
  qwtTestQueryQueueNum++;

  if (qwtTestQueryQueueWIdx == qwtTestQueryQueueRIdx) {
    printf("query queue is full");
    assert(0);
  }
  taosWUnLockLatch(&qwtTestQueryQueueLock);
H
Hongze Cheng 已提交
185

D
dapan1121 已提交
186
  tsem_post(&qwtTestQuerySem);
D
dapan1121 已提交
187

H
Hongze Cheng 已提交
188
  return 0;
D
dapan1121 已提交
189
}
D
dapan1121 已提交
190

H
Hongze Cheng 已提交
191
void qwtSendReqToDnode(void *pVnode, struct SEpSet *epSet, struct SRpcMsg *pReq) {}
D
dapan1121 已提交
192

D
dapan1121 已提交
193
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
D
dapan1121 已提交
194
  switch (pRsp->msgType) {
D
dapan1121 已提交
195 196
    case TDMT_SCH_QUERY_RSP:
    case TDMT_SCH_MERGE_QUERY_RSP: {
D
dapan1121 已提交
197 198
      SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont;

D
dapan1121 已提交
199
      if (pRsp->code) {
D
dapan1121 已提交
200
        qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
D
dapan 已提交
201
        qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
D
dapan1121 已提交
202
      }
H
Hongze Cheng 已提交
203

D
dapan1121 已提交
204
      rpcFreeCont(rsp);
D
dapan1121 已提交
205 206
      break;
    }
D
dapan1121 已提交
207 208
    case TDMT_SCH_FETCH_RSP:
    case TDMT_SCH_MERGE_FETCH_RSP: {
D
dapan1121 已提交
209
      SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)pRsp->pCont;
H
Hongze Cheng 已提交
210

D
dapan1121 已提交
211 212
      if (0 == pRsp->code && 0 == rsp->completed) {
        qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc);
D
dapan 已提交
213
        qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc);
D
dapan1121 已提交
214
        rpcFreeCont(rsp);
D
dapan1121 已提交
215 216 217 218
        return;
      }

      qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
D
dapan 已提交
219
      qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
D
dapan1121 已提交
220
      rpcFreeCont(rsp);
H
Hongze Cheng 已提交
221

D
dapan1121 已提交
222 223
      break;
    }
D
dapan1121 已提交
224
    case TDMT_SCH_DROP_TASK_RSP: {
D
dapan1121 已提交
225
      STaskDropRsp *rsp = (STaskDropRsp *)pRsp->pCont;
D
dapan1121 已提交
226
      rpcFreeCont(rsp);
D
dapan1121 已提交
227 228 229

      qwtTestCaseFinished = true;
      break;
D
dapan1121 已提交
230 231
    }
  }
D
dapan1121 已提交
232

D
dapan1121 已提交
233 234 235
  return;
}

H
Hongze Cheng 已提交
236 237
int32_t qwtCreateExecTask(void *tsdb, int32_t vgId, uint64_t taskId, struct SSubplan *pPlan, qTaskInfo_t *pTaskInfo,
                          DataSinkHandle *handle) {
D
dapan1121 已提交
238
  qwtTestSinkBlockNum = 0;
wafwerar's avatar
wafwerar 已提交
239
  qwtTestSinkMaxBlockNum = taosRand() % 100 + 1;
D
dapan1121 已提交
240
  qwtTestSinkQueryEnd = false;
H
Hongze Cheng 已提交
241 242 243

  *pTaskInfo = (qTaskInfo_t)((char *)qwtTestCaseIdx + 1);
  *handle = (DataSinkHandle)((char *)qwtTestCaseIdx + 2);
D
dapan1121 已提交
244 245

  ++qwtTestCaseIdx;
H
Hongze Cheng 已提交
246

D
dapan1121 已提交
247 248 249
  return 0;
}

H
Hongze Cheng 已提交
250
int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) {
D
dapan1121 已提交
251
  int32_t endExec = 0;
H
Hongze Cheng 已提交
252

D
dapan1121 已提交
253 254 255 256
  if (NULL == tinfo) {
    *pRes = NULL;
    *useconds = 0;
  } else {
D
dapan1121 已提交
257 258
    if (qwtTestSinkQueryEnd) {
      *pRes = NULL;
wafwerar's avatar
wafwerar 已提交
259
      *useconds = taosRand() % 10;
D
dapan1121 已提交
260 261
      return 0;
    }
H
Hongze Cheng 已提交
262

wafwerar's avatar
wafwerar 已提交
263
    endExec = taosRand() % 5;
H
Hongze Cheng 已提交
264

D
dapan1121 已提交
265 266
    int32_t runTime = 0;
    if (qwtTestEnableSleep && qwtTestMaxExecTaskUsec > 0) {
wafwerar's avatar
wafwerar 已提交
267
      runTime = taosRand() % qwtTestMaxExecTaskUsec;
D
dapan1121 已提交
268 269 270 271
    }

    if (qwtTestEnableSleep) {
      if (runTime) {
wafwerar's avatar
wafwerar 已提交
272
        taosUsleep(runTime);
D
dapan1121 已提交
273 274
      }
    }
H
Hongze Cheng 已提交
275

D
dapan1121 已提交
276
    if (endExec) {
H
Hongze Cheng 已提交
277
      *pRes = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock));
D
dapan1121 已提交
278
      (*pRes)->info.rows = taosRand() % 1000 + 1;
D
dapan1121 已提交
279 280
    } else {
      *pRes = NULL;
wafwerar's avatar
wafwerar 已提交
281
      *useconds = taosRand() % 10;
D
dapan1121 已提交
282 283
    }
  }
D
dapan1121 已提交
284 285 286 287

  return 0;
}

H
Hongze Cheng 已提交
288
int32_t qwtKillTask(qTaskInfo_t qinfo) { return 0; }
D
dapan1121 已提交
289

H
Hongze Cheng 已提交
290
void qwtDestroyTask(qTaskInfo_t qHandle) {}
D
dapan1121 已提交
291

H
Hongze Cheng 已提交
292
int32_t qwtPutDataBlock(DataSinkHandle handle, const SInputData *pInput, bool *pContinue) {
D
dapan1121 已提交
293 294 295 296
  if (NULL == handle || NULL == pInput || NULL == pContinue) {
    assert(0);
  }

wafwerar's avatar
wafwerar 已提交
297
  taosMemoryFree((void *)pInput->pData);
D
dapan1121 已提交
298

D
dapan1121 已提交
299 300 301 302 303 304
  taosWLockLatch(&qwtTestSinkLock);

  qwtTestSinkBlockNum++;

  if (qwtTestSinkBlockNum >= qwtTestSinkMaxBlockNum) {
    *pContinue = false;
D
dapan1121 已提交
305 306
  } else {
    *pContinue = true;
D
dapan1121 已提交
307 308
  }
  taosWUnLockLatch(&qwtTestSinkLock);
H
Hongze Cheng 已提交
309

D
dapan1121 已提交
310 311 312 313
  return 0;
}

void qwtEndPut(DataSinkHandle handle, uint64_t useconds) {
D
dapan1121 已提交
314 315 316 317 318
  if (NULL == handle) {
    assert(0);
  }

  qwtTestSinkQueryEnd = true;
D
dapan1121 已提交
319 320
}

H
Hongze Cheng 已提交
321
void qwtGetDataLength(DataSinkHandle handle, int64_t *pLen, bool *pQueryEnd) {
D
dapan1121 已提交
322 323 324 325 326 327 328
  static int32_t in = 0;

  if (in > 0) {
    assert(0);
  }

  atomic_add_fetch_32(&in, 1);
H
Hongze Cheng 已提交
329

D
dapan1121 已提交
330 331 332 333 334 335
  if (NULL == handle) {
    assert(0);
  }

  taosWLockLatch(&qwtTestSinkLock);
  if (qwtTestSinkBlockNum > 0) {
wafwerar's avatar
wafwerar 已提交
336
    *pLen = taosRand() % 100 + 1;
D
dapan1121 已提交
337 338 339 340
    qwtTestSinkBlockNum--;
  } else {
    *pLen = 0;
  }
D
dapan1121 已提交
341
  qwtTestSinkLastLen = *pLen;
D
dapan1121 已提交
342 343 344 345 346
  taosWUnLockLatch(&qwtTestSinkLock);

  *pQueryEnd = qwtTestSinkQueryEnd;

  atomic_sub_fetch_32(&in, 1);
D
dapan1121 已提交
347 348
}

H
Hongze Cheng 已提交
349
int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData *pOutput) {
D
dapan1121 已提交
350
  taosWLockLatch(&qwtTestSinkLock);
D
dapan1121 已提交
351
  if (qwtTestSinkLastLen > 0) {
wafwerar's avatar
wafwerar 已提交
352
    pOutput->numOfRows = taosRand() % 10 + 1;
D
dapan1121 已提交
353
    pOutput->compressed = 1;
D
dapan1121 已提交
354 355 356
    pOutput->queryEnd = qwtTestSinkQueryEnd;
    if (qwtTestSinkBlockNum == 0) {
      pOutput->bufStatus = DS_BUF_EMPTY;
H
Hongze Cheng 已提交
357
    } else if (qwtTestSinkBlockNum <= qwtTestSinkMaxBlockNum * 0.5) {
D
dapan1121 已提交
358 359 360 361
      pOutput->bufStatus = DS_BUF_LOW;
    } else {
      pOutput->bufStatus = DS_BUF_FULL;
    }
wafwerar's avatar
wafwerar 已提交
362
    pOutput->useconds = taosRand() % 10 + 1;
D
dapan1121 已提交
363 364 365 366 367
    pOutput->precision = 1;
  } else if (qwtTestSinkLastLen == 0) {
    pOutput->numOfRows = 0;
    pOutput->compressed = 1;
    pOutput->pData = NULL;
D
dapan1121 已提交
368 369 370
    pOutput->queryEnd = qwtTestSinkQueryEnd;
    if (qwtTestSinkBlockNum == 0) {
      pOutput->bufStatus = DS_BUF_EMPTY;
H
Hongze Cheng 已提交
371
    } else if (qwtTestSinkBlockNum <= qwtTestSinkMaxBlockNum * 0.5) {
D
dapan1121 已提交
372 373 374 375
      pOutput->bufStatus = DS_BUF_LOW;
    } else {
      pOutput->bufStatus = DS_BUF_FULL;
    }
wafwerar's avatar
wafwerar 已提交
376
    pOutput->useconds = taosRand() % 10 + 1;
D
dapan1121 已提交
377 378 379 380 381
    pOutput->precision = 1;
  } else {
    assert(0);
  }
  taosWUnLockLatch(&qwtTestSinkLock);
D
dapan1121 已提交
382

H
Hongze Cheng 已提交
383
  return 0;
D
dapan1121 已提交
384 385
}

H
Hongze Cheng 已提交
386
void qwtDestroyDataSinker(DataSinkHandle handle) {}
D
dapan1121 已提交
387

D
dapan 已提交
388
void stubSetStringToPlan() {
D
dapan1121 已提交
389
  static Stub stub;
D
dapan 已提交
390
  stub.set(qStringToSubplan, qwtStringToPlan);
D
dapan1121 已提交
391
  {
wafwerar's avatar
wafwerar 已提交
392
#ifdef WINDOWS
H
Hongze Cheng 已提交
393 394
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
395 396 397
    any.get_func_addr("qStringToSubplan", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
398 399
    AddrAny                       any("libplanner.so");
    std::map<std::string, void *> result;
D
dapan 已提交
400
    any.get_global_func_addr_dynsym("^qStringToSubplan$", result);
wafwerar's avatar
wafwerar 已提交
401
#endif
H
Hongze Cheng 已提交
402
    for (const auto &f : result) {
D
dapan 已提交
403
      stub.set(f.second, qwtStringToPlan);
D
dapan1121 已提交
404 405 406 407
    }
  }
}

D
dapan1121 已提交
408 409 410 411
void stubSetExecTask() {
  static Stub stub;
  stub.set(qExecTask, qwtExecTask);
  {
wafwerar's avatar
wafwerar 已提交
412
#ifdef WINDOWS
H
Hongze Cheng 已提交
413 414
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
415 416 417
    any.get_func_addr("qExecTask", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
418 419
    AddrAny                       any("libexecutor.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
420
    any.get_global_func_addr_dynsym("^qExecTask$", result);
wafwerar's avatar
wafwerar 已提交
421
#endif
H
Hongze Cheng 已提交
422
    for (const auto &f : result) {
D
dapan1121 已提交
423 424 425 426 427 428 429 430 431
      stub.set(f.second, qwtExecTask);
    }
  }
}

void stubSetCreateExecTask() {
  static Stub stub;
  stub.set(qCreateExecTask, qwtCreateExecTask);
  {
wafwerar's avatar
wafwerar 已提交
432
#ifdef WINDOWS
H
Hongze Cheng 已提交
433 434
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
435 436 437
    any.get_func_addr("qCreateExecTask", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
438 439
    AddrAny                       any("libexecutor.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
440
    any.get_global_func_addr_dynsym("^qCreateExecTask$", result);
wafwerar's avatar
wafwerar 已提交
441
#endif
H
Hongze Cheng 已提交
442
    for (const auto &f : result) {
D
dapan1121 已提交
443 444 445 446 447 448 449 450 451
      stub.set(f.second, qwtCreateExecTask);
    }
  }
}

void stubSetAsyncKillTask() {
  static Stub stub;
  stub.set(qAsyncKillTask, qwtKillTask);
  {
wafwerar's avatar
wafwerar 已提交
452
#ifdef WINDOWS
H
Hongze Cheng 已提交
453 454
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
455 456 457
    any.get_func_addr("qAsyncKillTask", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
458 459
    AddrAny                       any("libexecutor.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
460
    any.get_global_func_addr_dynsym("^qAsyncKillTask$", result);
wafwerar's avatar
wafwerar 已提交
461
#endif
H
Hongze Cheng 已提交
462
    for (const auto &f : result) {
D
dapan1121 已提交
463 464 465 466 467 468 469 470 471
      stub.set(f.second, qwtKillTask);
    }
  }
}

void stubSetDestroyTask() {
  static Stub stub;
  stub.set(qDestroyTask, qwtDestroyTask);
  {
wafwerar's avatar
wafwerar 已提交
472
#ifdef WINDOWS
H
Hongze Cheng 已提交
473 474
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
475 476 477
    any.get_func_addr("qDestroyTask", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
478 479
    AddrAny                       any("libexecutor.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
480
    any.get_global_func_addr_dynsym("^qDestroyTask$", result);
wafwerar's avatar
wafwerar 已提交
481
#endif
H
Hongze Cheng 已提交
482
    for (const auto &f : result) {
D
dapan1121 已提交
483 484 485 486 487 488 489 490 491
      stub.set(f.second, qwtDestroyTask);
    }
  }
}

void stubSetDestroyDataSinker() {
  static Stub stub;
  stub.set(dsDestroyDataSinker, qwtDestroyDataSinker);
  {
wafwerar's avatar
wafwerar 已提交
492
#ifdef WINDOWS
H
Hongze Cheng 已提交
493 494
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
495 496 497
    any.get_func_addr("dsDestroyDataSinker", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
498 499
    AddrAny                       any("libexecutor.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
500
    any.get_global_func_addr_dynsym("^dsDestroyDataSinker$", result);
wafwerar's avatar
wafwerar 已提交
501
#endif
H
Hongze Cheng 已提交
502
    for (const auto &f : result) {
D
dapan1121 已提交
503 504 505 506 507 508 509 510 511
      stub.set(f.second, qwtDestroyDataSinker);
    }
  }
}

void stubSetGetDataLength() {
  static Stub stub;
  stub.set(dsGetDataLength, qwtGetDataLength);
  {
wafwerar's avatar
wafwerar 已提交
512
#ifdef WINDOWS
H
Hongze Cheng 已提交
513 514
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
515 516 517
    any.get_func_addr("dsGetDataLength", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
518 519
    AddrAny                       any("libexecutor.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
520
    any.get_global_func_addr_dynsym("^dsGetDataLength$", result);
wafwerar's avatar
wafwerar 已提交
521
#endif
H
Hongze Cheng 已提交
522
    for (const auto &f : result) {
D
dapan1121 已提交
523 524 525 526 527 528 529 530 531
      stub.set(f.second, qwtGetDataLength);
    }
  }
}

void stubSetEndPut() {
  static Stub stub;
  stub.set(dsEndPut, qwtEndPut);
  {
wafwerar's avatar
wafwerar 已提交
532
#ifdef WINDOWS
H
Hongze Cheng 已提交
533 534
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
535 536 537
    any.get_func_addr("dsEndPut", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
538 539
    AddrAny                       any("libexecutor.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
540
    any.get_global_func_addr_dynsym("^dsEndPut$", result);
wafwerar's avatar
wafwerar 已提交
541
#endif
H
Hongze Cheng 已提交
542
    for (const auto &f : result) {
D
dapan1121 已提交
543 544 545 546 547 548 549 550 551
      stub.set(f.second, qwtEndPut);
    }
  }
}

void stubSetPutDataBlock() {
  static Stub stub;
  stub.set(dsPutDataBlock, qwtPutDataBlock);
  {
wafwerar's avatar
wafwerar 已提交
552
#ifdef WINDOWS
H
Hongze Cheng 已提交
553 554
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
555 556 557
    any.get_func_addr("dsPutDataBlock", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
558 559
    AddrAny                       any("libexecutor.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
560
    any.get_global_func_addr_dynsym("^dsPutDataBlock$", result);
wafwerar's avatar
wafwerar 已提交
561
#endif
H
Hongze Cheng 已提交
562
    for (const auto &f : result) {
D
dapan1121 已提交
563 564 565 566 567
      stub.set(f.second, qwtPutDataBlock);
    }
  }
}

D
dapan1121 已提交
568 569 570 571
void stubSetRpcSendResponse() {
  static Stub stub;
  stub.set(rpcSendResponse, qwtRpcSendResponse);
  {
wafwerar's avatar
wafwerar 已提交
572
#ifdef WINDOWS
H
Hongze Cheng 已提交
573 574
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
575 576 577
    any.get_func_addr("rpcSendResponse", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
578 579
    AddrAny                       any("libtransport.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
580
    any.get_global_func_addr_dynsym("^rpcSendResponse$", result);
wafwerar's avatar
wafwerar 已提交
581
#endif
H
Hongze Cheng 已提交
582
    for (const auto &f : result) {
D
dapan1121 已提交
583 584 585 586 587
      stub.set(f.second, qwtRpcSendResponse);
    }
  }
}

D
dapan1121 已提交
588 589 590 591
void stubSetGetDataBlock() {
  static Stub stub;
  stub.set(dsGetDataBlock, qwtGetDataBlock);
  {
wafwerar's avatar
wafwerar 已提交
592
#ifdef WINDOWS
H
Hongze Cheng 已提交
593 594
    AddrAny                       any;
    std::map<std::string, void *> result;
wafwerar's avatar
wafwerar 已提交
595 596 597
    any.get_func_addr("dsGetDataBlock", result);
#endif
#ifdef LINUX
H
Hongze Cheng 已提交
598 599
    AddrAny                       any("libtransport.so");
    std::map<std::string, void *> result;
D
dapan1121 已提交
600
    any.get_global_func_addr_dynsym("^dsGetDataBlock$", result);
wafwerar's avatar
wafwerar 已提交
601
#endif
H
Hongze Cheng 已提交
602
    for (const auto &f : result) {
D
dapan1121 已提交
603 604 605 606 607
      stub.set(f.second, qwtGetDataBlock);
    }
  }
}

D
dapan1121 已提交
608
void *queryThread(void *param) {
H
Hongze Cheng 已提交
609 610
  SRpcMsg  queryRpc = {0};
  int32_t  code = 0;
D
dapan1121 已提交
611
  uint32_t n = 0;
H
Hongze Cheng 已提交
612 613
  void    *mockPointer = (void *)0x1;
  void    *mgmt = param;
D
dapan1121 已提交
614

D
dapan1121 已提交
615 616
  while (!qwtTestStop) {
    qwtBuildQueryReqMsg(&queryRpc);
H
Hongze Cheng 已提交
617
    qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
D
dapan1121 已提交
618
    if (qwtTestEnableSleep) {
H
Hongze Cheng 已提交
619
      taosUsleep(taosRand() % 5);
D
dapan1121 已提交
620 621
    }
    if (++n % qwtTestPrintNum == 0) {
D
dapan1121 已提交
622 623 624 625 626 627 628 629
      printf("query:%d\n", n);
    }
  }

  return NULL;
}

void *fetchThread(void *param) {
H
Hongze Cheng 已提交
630 631 632 633 634
  SRpcMsg      fetchRpc = {0};
  int32_t      code = 0;
  uint32_t     n = 0;
  void        *mockPointer = (void *)0x1;
  void        *mgmt = param;
S
Shengliang Guan 已提交
635
  SResFetchReq fetchMsg = {0};
D
dapan1121 已提交
636

D
dapan1121 已提交
637 638
  while (!qwtTestStop) {
    qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
D
dapan1121 已提交
639
    code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0);
D
dapan1121 已提交
640
    if (qwtTestEnableSleep) {
H
Hongze Cheng 已提交
641
      taosUsleep(taosRand() % 5);
D
dapan1121 已提交
642 643
    }
    if (++n % qwtTestPrintNum == 0) {
D
dapan1121 已提交
644
      printf("fetch:%d\n", n);
H
Hongze Cheng 已提交
645
    }
D
dapan1121 已提交
646 647 648 649 650 651
  }

  return NULL;
}

void *dropThread(void *param) {
H
Hongze Cheng 已提交
652 653 654 655 656 657
  SRpcMsg      dropRpc = {0};
  int32_t      code = 0;
  uint32_t     n = 0;
  void        *mockPointer = (void *)0x1;
  void        *mgmt = param;
  STaskDropReq dropMsg = {0};
D
dapan1121 已提交
658

D
dapan1121 已提交
659 660
  while (!qwtTestStop) {
    qwtBuildDropReqMsg(&dropMsg, &dropRpc);
D
dapan1121 已提交
661
    code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
D
dapan1121 已提交
662
    if (qwtTestEnableSleep) {
H
Hongze Cheng 已提交
663
      taosUsleep(taosRand() % 5);
D
dapan1121 已提交
664 665
    }
    if (++n % qwtTestPrintNum == 0) {
D
dapan1121 已提交
666
      printf("drop:%d\n", n);
H
Hongze Cheng 已提交
667
    }
D
dapan1121 已提交
668 669 670 671 672
  }

  return NULL;
}

D
dapan1121 已提交
673
void *qwtclientThread(void *param) {
H
Hongze Cheng 已提交
674
  int32_t  code = 0;
D
dapan1121 已提交
675
  uint32_t n = 0;
H
Hongze Cheng 已提交
676 677 678
  void    *mgmt = param;
  void    *mockPointer = (void *)0x1;
  SRpcMsg  queryRpc = {0};
D
dapan1121 已提交
679

wafwerar's avatar
wafwerar 已提交
680
  taosSsleep(1);
D
dapan1121 已提交
681 682

  while (!qwtTestStop) {
D
dapan1121 已提交
683
    qwtTestCaseFinished = false;
S
Shengliang Guan 已提交
684

D
dapan1121 已提交
685
    qwtBuildQueryReqMsg(&queryRpc);
S
Shengliang Guan 已提交
686
    qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc);
D
dapan1121 已提交
687 688

    while (!qwtTestCaseFinished) {
wafwerar's avatar
wafwerar 已提交
689
      taosUsleep(1);
D
dapan1121 已提交
690
    }
H
Hongze Cheng 已提交
691

D
dapan1121 已提交
692
    if (++n % qwtTestPrintNum == 0) {
D
dapan1121 已提交
693
      printf("case run:%d\n", n);
D
dapan1121 已提交
694 695 696
    }
  }

D
dapan1121 已提交
697 698
  atomic_add_fetch_32(&qwtTestQuitThreadNum, 1);

D
dapan1121 已提交
699 700
  return NULL;
}
D
dapan1121 已提交
701

D
dapan1121 已提交
702
void *queryQueueThread(void *param) {
H
Hongze Cheng 已提交
703
  void    *mockPointer = (void *)0x1;
D
dapan1121 已提交
704
  SRpcMsg *queryRpc = NULL;
H
Hongze Cheng 已提交
705
  void    *mgmt = param;
D
dapan1121 已提交
706

D
dapan1121 已提交
707
  while (true) {
D
dapan1121 已提交
708 709
    tsem_wait(&qwtTestQuerySem);

D
dapan1121 已提交
710
    if (qwtTestStop && qwtTestQueryQueueNum <= 0 && qwtTestCaseFinished) {
D
dapan1121 已提交
711 712 713
      break;
    }

D
dapan1121 已提交
714 715 716 717 718
    taosWLockLatch(&qwtTestQueryQueueLock);
    if (qwtTestQueryQueueNum <= 0 || qwtTestQueryQueueRIdx == qwtTestQueryQueueWIdx) {
      printf("query queue is empty\n");
      assert(0);
    }
H
Hongze Cheng 已提交
719

D
dapan1121 已提交
720
    queryRpc = qwtTestQueryQueue[qwtTestQueryQueueRIdx++];
H
Hongze Cheng 已提交
721

D
dapan1121 已提交
722 723 724
    if (qwtTestQueryQueueRIdx >= qwtTestQueryQueueSize) {
      qwtTestQueryQueueRIdx = 0;
    }
H
Hongze Cheng 已提交
725

D
dapan1121 已提交
726 727 728
    qwtTestQueryQueueNum--;
    taosWUnLockLatch(&qwtTestQueryQueueLock);

D
dapan1121 已提交
729
    if (qwtTestEnableSleep && qwtTestReqMaxDelayUsec > 0) {
wafwerar's avatar
wafwerar 已提交
730
      int32_t delay = taosRand() % qwtTestReqMaxDelayUsec;
D
dapan1121 已提交
731 732

      if (delay) {
wafwerar's avatar
wafwerar 已提交
733
        taosUsleep(delay);
D
dapan1121 已提交
734 735
      }
    }
H
Hongze Cheng 已提交
736

D
dapan1121 已提交
737
    if (TDMT_SCH_QUERY == queryRpc->msgType) {
D
dapan1121 已提交
738
      qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0);
D
dapan1121 已提交
739
    } else if (TDMT_SCH_QUERY_CONTINUE == queryRpc->msgType) {
D
dapan1121 已提交
740
      qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0);
D
dapan1121 已提交
741 742 743 744
    } else {
      printf("unknown msg in query queue, type:%d\n", queryRpc->msgType);
      assert(0);
    }
D
dapan1121 已提交
745

wafwerar's avatar
wafwerar 已提交
746
    taosMemoryFree(queryRpc);
D
dapan1121 已提交
747

D
dapan1121 已提交
748
    if (qwtTestStop && qwtTestQueryQueueNum <= 0 && qwtTestCaseFinished) {
D
dapan1121 已提交
749 750
      break;
    }
D
dapan1121 已提交
751
  }
D
dapan1121 已提交
752

D
dapan1121 已提交
753 754
  atomic_add_fetch_32(&qwtTestQuitThreadNum, 1);

D
dapan 已提交
755
  return NULL;
D
dapan1121 已提交
756 757 758
}

void *fetchQueueThread(void *param) {
H
Hongze Cheng 已提交
759
  void    *mockPointer = (void *)0x1;
D
dapan1121 已提交
760
  SRpcMsg *fetchRpc = NULL;
H
Hongze Cheng 已提交
761
  void    *mgmt = param;
D
dapan1121 已提交
762

D
dapan1121 已提交
763
  while (true) {
D
dapan1121 已提交
764 765
    tsem_wait(&qwtTestFetchSem);

D
dapan1121 已提交
766 767
    if (qwtTestStop && qwtTestFetchQueueNum <= 0 && qwtTestCaseFinished) {
      break;
H
Hongze Cheng 已提交
768
    }
D
dapan1121 已提交
769

D
dapan1121 已提交
770 771 772 773 774
    taosWLockLatch(&qwtTestFetchQueueLock);
    if (qwtTestFetchQueueNum <= 0 || qwtTestFetchQueueRIdx == qwtTestFetchQueueWIdx) {
      printf("Fetch queue is empty\n");
      assert(0);
    }
H
Hongze Cheng 已提交
775

D
dapan1121 已提交
776
    fetchRpc = qwtTestFetchQueue[qwtTestFetchQueueRIdx++];
H
Hongze Cheng 已提交
777

D
dapan1121 已提交
778 779 780
    if (qwtTestFetchQueueRIdx >= qwtTestFetchQueueSize) {
      qwtTestFetchQueueRIdx = 0;
    }
H
Hongze Cheng 已提交
781

D
dapan1121 已提交
782 783 784
    qwtTestFetchQueueNum--;
    taosWUnLockLatch(&qwtTestFetchQueueLock);

D
dapan1121 已提交
785
    if (qwtTestEnableSleep && qwtTestReqMaxDelayUsec > 0) {
wafwerar's avatar
wafwerar 已提交
786
      int32_t delay = taosRand() % qwtTestReqMaxDelayUsec;
D
dapan1121 已提交
787 788

      if (delay) {
wafwerar's avatar
wafwerar 已提交
789
        taosUsleep(delay);
D
dapan1121 已提交
790 791 792
      }
    }

D
dapan1121 已提交
793
    switch (fetchRpc->msgType) {
D
dapan1121 已提交
794
      case TDMT_SCH_FETCH:
D
dapan1121 已提交
795
      case TDMT_SCH_MERGE_FETCH:
D
dapan1121 已提交
796
        qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0);
D
dapan1121 已提交
797
        break;
D
dapan1121 已提交
798
      case TDMT_SCH_CANCEL_TASK:
D
dapan1121 已提交
799
        qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0);
D
dapan1121 已提交
800
        break;
D
dapan1121 已提交
801
      case TDMT_SCH_DROP_TASK:
D
dapan1121 已提交
802
        qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0);
D
dapan1121 已提交
803
        break;
D
dapan1121 已提交
804 805 806
      default:
        printf("unknown msg type:%d in fetch queue", fetchRpc->msgType);
        assert(0);
D
dapan1121 已提交
807
        break;
D
dapan1121 已提交
808
    }
D
dapan1121 已提交
809

wafwerar's avatar
wafwerar 已提交
810
    taosMemoryFree(fetchRpc);
D
dapan1121 已提交
811

D
dapan1121 已提交
812
    if (qwtTestStop && qwtTestFetchQueueNum <= 0 && qwtTestCaseFinished) {
D
dapan1121 已提交
813
      break;
H
Hongze Cheng 已提交
814
    }
D
dapan1121 已提交
815
  }
D
dapan1121 已提交
816

D
dapan1121 已提交
817 818
  atomic_add_fetch_32(&qwtTestQuitThreadNum, 1);

D
dapan 已提交
819
  return NULL;
D
dapan1121 已提交
820 821
}

H
Hongze Cheng 已提交
822
}  // namespace
D
dapan1121 已提交
823

D
dapan1121 已提交
824
TEST(seqTest, normalCase) {
H
Hongze Cheng 已提交
825
  void   *mgmt = NULL;
D
dapan 已提交
826
  int32_t code = 0;
H
Hongze Cheng 已提交
827
  void   *mockPointer = (void *)0x1;
D
dapan 已提交
828 829 830
  SRpcMsg queryRpc = {0};
  SRpcMsg fetchRpc = {0};
  SRpcMsg dropRpc = {0};
D
dapan1121 已提交
831 832

  qwtInitLogFile();
D
dapan 已提交
833

D
dapan1121 已提交
834 835 836
  qwtBuildQueryReqMsg(&queryRpc);
  qwtBuildFetchReqMsg(&qwtfetchMsg, &fetchRpc);
  qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc);
H
Hongze Cheng 已提交
837

D
dapan 已提交
838
  stubSetStringToPlan();
D
dapan1121 已提交
839
  stubSetRpcSendResponse();
D
dapan1121 已提交
840 841 842 843 844 845 846 847 848
  stubSetExecTask();
  stubSetCreateExecTask();
  stubSetAsyncKillTask();
  stubSetDestroyTask();
  stubSetDestroyDataSinker();
  stubSetGetDataLength();
  stubSetEndPut();
  stubSetPutDataBlock();
  stubSetGetDataBlock();
H
Hongze Cheng 已提交
849

X
Xiaoyu Wang 已提交
850
  SMsgCb msgCb = {0};
851
  msgCb.mgmt = (void *)mockPointer;
S
Shengliang Guan 已提交
852
  msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
D
dapan1121 已提交
853
  code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
D
dapan 已提交
854 855
  ASSERT_EQ(code, 0);

D
dapan1121 已提交
856
  code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
D
dapan 已提交
857 858
  ASSERT_EQ(code, 0);

H
Hongze Cheng 已提交
859 860
  // code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
  // ASSERT_EQ(code, 0);
D
dapan 已提交
861

D
dapan1121 已提交
862
  code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0);
D
dapan 已提交
863 864
  ASSERT_EQ(code, 0);

D
dapan1121 已提交
865
  code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
D
dapan1121 已提交
866 867 868 869 870 871
  ASSERT_EQ(code, 0);

  qWorkerDestroy(&mgmt);
}

TEST(seqTest, cancelFirst) {
H
Hongze Cheng 已提交
872
  void   *mgmt = NULL;
D
dapan1121 已提交
873
  int32_t code = 0;
H
Hongze Cheng 已提交
874
  void   *mockPointer = (void *)0x1;
D
dapan1121 已提交
875 876
  SRpcMsg queryRpc = {0};
  SRpcMsg dropRpc = {0};
D
dapan1121 已提交
877 878

  qwtInitLogFile();
H
Hongze Cheng 已提交
879

D
dapan1121 已提交
880 881
  qwtBuildQueryReqMsg(&queryRpc);
  qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc);
D
dapan1121 已提交
882 883 884

  stubSetStringToPlan();
  stubSetRpcSendResponse();
S
Shengliang Guan 已提交
885

X
Xiaoyu Wang 已提交
886
  SMsgCb msgCb = {0};
887
  msgCb.mgmt = (void *)mockPointer;
S
Shengliang Guan 已提交
888
  msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
D
dapan1121 已提交
889
  code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
D
dapan1121 已提交
890 891
  ASSERT_EQ(code, 0);

D
dapan1121 已提交
892
  code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
D
dapan 已提交
893
  ASSERT_EQ(code, 0);
D
dapan1121 已提交
894

D
dapan1121 已提交
895
  code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
D
dapan1121 已提交
896
  ASSERT_TRUE(0 != code);
D
dapan1121 已提交
897 898 899 900 901

  qWorkerDestroy(&mgmt);
}

TEST(seqTest, randCase) {
H
Hongze Cheng 已提交
902 903 904 905 906 907 908 909 910 911 912
  void              *mgmt = NULL;
  int32_t            code = 0;
  void              *mockPointer = (void *)0x1;
  SRpcMsg            queryRpc = {0};
  SRpcMsg            readyRpc = {0};
  SRpcMsg            fetchRpc = {0};
  SRpcMsg            dropRpc = {0};
  SRpcMsg            statusRpc = {0};
  SResReadyReq       readyMsg = {0};
  SResFetchReq       fetchMsg = {0};
  STaskDropReq       dropMsg = {0};
S
Shengliang Guan 已提交
913
  SSchTasksStatusReq statusMsg = {0};
D
dapan1121 已提交
914 915

  qwtInitLogFile();
H
Hongze Cheng 已提交
916

D
dapan1121 已提交
917 918
  stubSetStringToPlan();
  stubSetRpcSendResponse();
D
dapan1121 已提交
919
  stubSetCreateExecTask();
D
dapan1121 已提交
920

wafwerar's avatar
wafwerar 已提交
921
  taosSeedRand(taosGetTimestampSec());
S
Shengliang Guan 已提交
922

X
Xiaoyu Wang 已提交
923
  SMsgCb msgCb = {0};
924
  msgCb.mgmt = (void *)mockPointer;
S
Shengliang Guan 已提交
925
  msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
D
dapan1121 已提交
926
  code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
D
dapan1121 已提交
927 928 929 930 931
  ASSERT_EQ(code, 0);

  int32_t t = 0;
  int32_t maxr = 10001;
  while (true) {
wafwerar's avatar
wafwerar 已提交
932
    int32_t r = taosRand() % maxr;
H
Hongze Cheng 已提交
933 934 935

    if (r >= 0 && r < maxr / 5) {
      printf("Query,%d\n", t++);
D
dapan1121 已提交
936
      qwtBuildQueryReqMsg(&queryRpc);
D
dapan1121 已提交
937
      code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
H
Hongze Cheng 已提交
938 939 940 941 942 943 944 945
    } else if (r >= maxr / 5 && r < maxr * 2 / 5) {
      // printf("Ready,%d\n", t++);
      // qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
      // code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
      // if (qwtTestEnableSleep) {
      //   taosUsleep(1);
      // }
    } else if (r >= maxr * 2 / 5 && r < maxr * 3 / 5) {
D
dapan1121 已提交
946
      printf("Fetch,%d\n", t++);
D
dapan1121 已提交
947
      qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
D
dapan1121 已提交
948
      code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0);
D
dapan1121 已提交
949
      if (qwtTestEnableSleep) {
wafwerar's avatar
wafwerar 已提交
950
        taosUsleep(1);
D
dapan1121 已提交
951
      }
H
Hongze Cheng 已提交
952
    } else if (r >= maxr * 3 / 5 && r < maxr * 4 / 5) {
D
dapan1121 已提交
953
      printf("Drop,%d\n", t++);
D
dapan1121 已提交
954
      qwtBuildDropReqMsg(&dropMsg, &dropRpc);
D
dapan1121 已提交
955
      code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
D
dapan1121 已提交
956
      if (qwtTestEnableSleep) {
wafwerar's avatar
wafwerar 已提交
957
        taosUsleep(1);
D
dapan1121 已提交
958
      }
H
Hongze Cheng 已提交
959
    } else if (r >= maxr * 4 / 5 && r < maxr - 1) {
D
dapan1121 已提交
960
      printf("Status,%d\n", t++);
D
dapan1121 已提交
961
      if (qwtTestEnableSleep) {
wafwerar's avatar
wafwerar 已提交
962
        taosUsleep(1);
H
Hongze Cheng 已提交
963
      }
D
dapan1121 已提交
964 965 966 967 968 969 970 971 972 973
    } else {
      printf("QUIT RAND NOW");
      break;
    }
  }

  qWorkerDestroy(&mgmt);
}

TEST(seqTest, multithreadRand) {
H
Hongze Cheng 已提交
974
  void   *mgmt = NULL;
D
dapan1121 已提交
975
  int32_t code = 0;
H
Hongze Cheng 已提交
976
  void   *mockPointer = (void *)0x1;
D
dapan1121 已提交
977 978

  qwtInitLogFile();
H
Hongze Cheng 已提交
979

D
dapan1121 已提交
980 981
  stubSetStringToPlan();
  stubSetRpcSendResponse();
D
dapan1121 已提交
982 983 984 985 986 987 988 989 990
  stubSetExecTask();
  stubSetCreateExecTask();
  stubSetAsyncKillTask();
  stubSetDestroyTask();
  stubSetDestroyDataSinker();
  stubSetGetDataLength();
  stubSetEndPut();
  stubSetPutDataBlock();
  stubSetGetDataBlock();
D
dapan1121 已提交
991

wafwerar's avatar
wafwerar 已提交
992
  taosSeedRand(taosGetTimestampSec());
S
Shengliang Guan 已提交
993

X
Xiaoyu Wang 已提交
994
  SMsgCb msgCb = {0};
995
  msgCb.mgmt = (void *)mockPointer;
S
Shengliang Guan 已提交
996
  msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
D
dapan1121 已提交
997
  code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
D
dapan1121 已提交
998 999
  ASSERT_EQ(code, 0);

wafwerar's avatar
wafwerar 已提交
1000 1001
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan1121 已提交
1002

H
Hongze Cheng 已提交
1003
  TdThread t1, t2, t3, t4, t5, t6;
wafwerar's avatar
wafwerar 已提交
1004
  taosThreadCreate(&(t1), &thattr, queryThread, mgmt);
H
Hongze Cheng 已提交
1005
  // taosThreadCreate(&(t2), &thattr, readyThread, NULL);
wafwerar's avatar
wafwerar 已提交
1006 1007
  taosThreadCreate(&(t3), &thattr, fetchThread, NULL);
  taosThreadCreate(&(t4), &thattr, dropThread, NULL);
X
Xiaoyu Wang 已提交
1008
  taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt);
D
dapan1121 已提交
1009

D
dapan1121 已提交
1010 1011
  while (true) {
    if (qwtTestDeadLoop) {
wafwerar's avatar
wafwerar 已提交
1012
      taosSsleep(1);
D
dapan1121 已提交
1013
    } else {
wafwerar's avatar
wafwerar 已提交
1014
      taosSsleep(qwtTestMTRunSec);
D
dapan1121 已提交
1015 1016 1017
      break;
    }
  }
H
Hongze Cheng 已提交
1018

D
dapan1121 已提交
1019
  qwtTestStop = true;
wafwerar's avatar
wafwerar 已提交
1020
  taosSsleep(3);
D
dapan1121 已提交
1021 1022 1023 1024 1025 1026 1027 1028 1029

  qwtTestQueryQueueNum = 0;
  qwtTestQueryQueueRIdx = 0;
  qwtTestQueryQueueWIdx = 0;
  qwtTestQueryQueueLock = 0;
  qwtTestFetchQueueNum = 0;
  qwtTestFetchQueueRIdx = 0;
  qwtTestFetchQueueWIdx = 0;
  qwtTestFetchQueueLock = 0;
H
Hongze Cheng 已提交
1030

D
dapan1121 已提交
1031 1032 1033
  qWorkerDestroy(&mgmt);
}

D
dapan1121 已提交
1034
TEST(rcTest, shortExecshortDelay) {
H
Hongze Cheng 已提交
1035
  void   *mgmt = NULL;
D
dapan1121 已提交
1036
  int32_t code = 0;
H
Hongze Cheng 已提交
1037
  void   *mockPointer = (void *)0x1;
D
dapan1121 已提交
1038 1039

  qwtInitLogFile();
H
Hongze Cheng 已提交
1040

D
dapan1121 已提交
1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052
  stubSetStringToPlan();
  stubSetRpcSendResponse();
  stubSetExecTask();
  stubSetCreateExecTask();
  stubSetAsyncKillTask();
  stubSetDestroyTask();
  stubSetDestroyDataSinker();
  stubSetGetDataLength();
  stubSetEndPut();
  stubSetPutDataBlock();
  stubSetGetDataBlock();

wafwerar's avatar
wafwerar 已提交
1053
  taosSeedRand(taosGetTimestampSec());
D
dapan1121 已提交
1054 1055 1056
  qwtTestStop = false;
  qwtTestQuitThreadNum = 0;

S
Shengliang Guan 已提交
1057
  SMsgCb msgCb = {0};
1058
  msgCb.mgmt = (void *)mockPointer;
S
Shengliang Guan 已提交
1059
  msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
D
dapan1121 已提交
1060
  code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
D
dapan1121 已提交
1061 1062 1063 1064 1065 1066 1067 1068
  ASSERT_EQ(code, 0);

  qwtTestMaxExecTaskUsec = 0;
  qwtTestReqMaxDelayUsec = 0;

  tsem_init(&qwtTestQuerySem, 0, 0);
  tsem_init(&qwtTestFetchSem, 0, 0);

wafwerar's avatar
wafwerar 已提交
1069 1070
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan1121 已提交
1071

H
Hongze Cheng 已提交
1072
  TdThread t1, t2, t3, t4, t5;
wafwerar's avatar
wafwerar 已提交
1073 1074 1075
  taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
  taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
  taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
D
dapan1121 已提交
1076 1077 1078

  while (true) {
    if (qwtTestDeadLoop) {
wafwerar's avatar
wafwerar 已提交
1079
      taosSsleep(1);
D
dapan1121 已提交
1080
    } else {
wafwerar's avatar
wafwerar 已提交
1081
      taosSsleep(qwtTestMTRunSec);
D
dapan1121 已提交
1082 1083 1084
      break;
    }
  }
H
Hongze Cheng 已提交
1085

D
dapan1121 已提交
1086 1087 1088 1089 1090 1091
  qwtTestStop = true;

  while (true) {
    if (qwtTestQuitThreadNum == 3) {
      break;
    }
H
Hongze Cheng 已提交
1092

wafwerar's avatar
wafwerar 已提交
1093
    taosSsleep(1);
D
dapan1121 已提交
1094 1095

    if (qwtTestCaseFinished) {
H
Hongze Cheng 已提交
1096
      if (qwtTestQuitThreadNum < 3) {
D
dapan1121 已提交
1097 1098 1099
        tsem_post(&qwtTestQuerySem);
        tsem_post(&qwtTestFetchSem);

wafwerar's avatar
wafwerar 已提交
1100
        taosUsleep(10);
D
dapan1121 已提交
1101 1102
      }
    }
D
dapan1121 已提交
1103 1104 1105 1106 1107 1108 1109 1110 1111 1112
  }

  qwtTestQueryQueueNum = 0;
  qwtTestQueryQueueRIdx = 0;
  qwtTestQueryQueueWIdx = 0;
  qwtTestQueryQueueLock = 0;
  qwtTestFetchQueueNum = 0;
  qwtTestFetchQueueRIdx = 0;
  qwtTestFetchQueueWIdx = 0;
  qwtTestFetchQueueLock = 0;
H
Hongze Cheng 已提交
1113 1114

  qWorkerDestroy(&mgmt);
D
dapan1121 已提交
1115 1116 1117
}

TEST(rcTest, longExecshortDelay) {
H
Hongze Cheng 已提交
1118
  void   *mgmt = NULL;
D
dapan1121 已提交
1119
  int32_t code = 0;
H
Hongze Cheng 已提交
1120
  void   *mockPointer = (void *)0x1;
D
dapan1121 已提交
1121 1122

  qwtInitLogFile();
H
Hongze Cheng 已提交
1123

D
dapan1121 已提交
1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135
  stubSetStringToPlan();
  stubSetRpcSendResponse();
  stubSetExecTask();
  stubSetCreateExecTask();
  stubSetAsyncKillTask();
  stubSetDestroyTask();
  stubSetDestroyDataSinker();
  stubSetGetDataLength();
  stubSetEndPut();
  stubSetPutDataBlock();
  stubSetGetDataBlock();

wafwerar's avatar
wafwerar 已提交
1136
  taosSeedRand(taosGetTimestampSec());
D
dapan1121 已提交
1137 1138 1139
  qwtTestStop = false;
  qwtTestQuitThreadNum = 0;

S
Shengliang Guan 已提交
1140
  SMsgCb msgCb = {0};
1141
  msgCb.mgmt = (void *)mockPointer;
S
Shengliang Guan 已提交
1142
  msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
D
dapan1121 已提交
1143
  code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
D
dapan1121 已提交
1144 1145 1146 1147 1148 1149 1150 1151
  ASSERT_EQ(code, 0);

  qwtTestMaxExecTaskUsec = 1000000;
  qwtTestReqMaxDelayUsec = 0;

  tsem_init(&qwtTestQuerySem, 0, 0);
  tsem_init(&qwtTestFetchSem, 0, 0);

wafwerar's avatar
wafwerar 已提交
1152 1153
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan1121 已提交
1154

H
Hongze Cheng 已提交
1155
  TdThread t1, t2, t3, t4, t5;
wafwerar's avatar
wafwerar 已提交
1156 1157 1158
  taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
  taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
  taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
D
dapan1121 已提交
1159 1160 1161

  while (true) {
    if (qwtTestDeadLoop) {
wafwerar's avatar
wafwerar 已提交
1162
      taosSsleep(1);
D
dapan1121 已提交
1163
    } else {
wafwerar's avatar
wafwerar 已提交
1164
      taosSsleep(qwtTestMTRunSec);
D
dapan1121 已提交
1165 1166 1167
      break;
    }
  }
H
Hongze Cheng 已提交
1168

D
dapan1121 已提交
1169 1170 1171 1172 1173 1174
  qwtTestStop = true;

  while (true) {
    if (qwtTestQuitThreadNum == 3) {
      break;
    }
H
Hongze Cheng 已提交
1175

wafwerar's avatar
wafwerar 已提交
1176
    taosSsleep(1);
D
dapan1121 已提交
1177 1178

    if (qwtTestCaseFinished) {
H
Hongze Cheng 已提交
1179
      if (qwtTestQuitThreadNum < 3) {
D
dapan1121 已提交
1180 1181
        tsem_post(&qwtTestQuerySem);
        tsem_post(&qwtTestFetchSem);
H
Hongze Cheng 已提交
1182

wafwerar's avatar
wafwerar 已提交
1183
        taosUsleep(10);
D
dapan1121 已提交
1184 1185
      }
    }
D
dapan1121 已提交
1186 1187 1188 1189 1190 1191 1192 1193 1194 1195
  }

  qwtTestQueryQueueNum = 0;
  qwtTestQueryQueueRIdx = 0;
  qwtTestQueryQueueWIdx = 0;
  qwtTestQueryQueueLock = 0;
  qwtTestFetchQueueNum = 0;
  qwtTestFetchQueueRIdx = 0;
  qwtTestFetchQueueWIdx = 0;
  qwtTestFetchQueueLock = 0;
H
Hongze Cheng 已提交
1196

D
dapan1121 已提交
1197 1198 1199 1200
  qWorkerDestroy(&mgmt);
}

TEST(rcTest, shortExeclongDelay) {
H
Hongze Cheng 已提交
1201
  void   *mgmt = NULL;
D
dapan1121 已提交
1202
  int32_t code = 0;
H
Hongze Cheng 已提交
1203
  void   *mockPointer = (void *)0x1;
D
dapan1121 已提交
1204 1205

  qwtInitLogFile();
H
Hongze Cheng 已提交
1206

D
dapan1121 已提交
1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218
  stubSetStringToPlan();
  stubSetRpcSendResponse();
  stubSetExecTask();
  stubSetCreateExecTask();
  stubSetAsyncKillTask();
  stubSetDestroyTask();
  stubSetDestroyDataSinker();
  stubSetGetDataLength();
  stubSetEndPut();
  stubSetPutDataBlock();
  stubSetGetDataBlock();

wafwerar's avatar
wafwerar 已提交
1219
  taosSeedRand(taosGetTimestampSec());
D
dapan1121 已提交
1220 1221 1222
  qwtTestStop = false;
  qwtTestQuitThreadNum = 0;

S
Shengliang Guan 已提交
1223
  SMsgCb msgCb = {0};
1224
  msgCb.mgmt = (void *)mockPointer;
S
Shengliang Guan 已提交
1225
  msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
D
dapan1121 已提交
1226
  code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
D
dapan1121 已提交
1227 1228 1229 1230 1231 1232 1233 1234
  ASSERT_EQ(code, 0);

  qwtTestMaxExecTaskUsec = 0;
  qwtTestReqMaxDelayUsec = 1000000;

  tsem_init(&qwtTestQuerySem, 0, 0);
  tsem_init(&qwtTestFetchSem, 0, 0);

wafwerar's avatar
wafwerar 已提交
1235 1236
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan1121 已提交
1237

H
Hongze Cheng 已提交
1238
  TdThread t1, t2, t3, t4, t5;
wafwerar's avatar
wafwerar 已提交
1239 1240 1241
  taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
  taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
  taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
D
dapan1121 已提交
1242 1243 1244

  while (true) {
    if (qwtTestDeadLoop) {
wafwerar's avatar
wafwerar 已提交
1245
      taosSsleep(1);
D
dapan1121 已提交
1246
    } else {
wafwerar's avatar
wafwerar 已提交
1247
      taosSsleep(qwtTestMTRunSec);
D
dapan1121 已提交
1248 1249 1250 1251
      break;
    }
  }

H
Hongze Cheng 已提交
1252
  qwtTestStop = true;
D
dapan1121 已提交
1253 1254 1255 1256 1257

  while (true) {
    if (qwtTestQuitThreadNum == 3) {
      break;
    }
H
Hongze Cheng 已提交
1258

wafwerar's avatar
wafwerar 已提交
1259
    taosSsleep(1);
D
dapan1121 已提交
1260 1261

    if (qwtTestCaseFinished) {
H
Hongze Cheng 已提交
1262
      if (qwtTestQuitThreadNum < 3) {
D
dapan1121 已提交
1263 1264
        tsem_post(&qwtTestQuerySem);
        tsem_post(&qwtTestFetchSem);
H
Hongze Cheng 已提交
1265

wafwerar's avatar
wafwerar 已提交
1266
        taosUsleep(10);
D
dapan1121 已提交
1267 1268
      }
    }
D
dapan1121 已提交
1269 1270 1271 1272 1273 1274 1275 1276 1277 1278
  }

  qwtTestQueryQueueNum = 0;
  qwtTestQueryQueueRIdx = 0;
  qwtTestQueryQueueWIdx = 0;
  qwtTestQueryQueueLock = 0;
  qwtTestFetchQueueNum = 0;
  qwtTestFetchQueueRIdx = 0;
  qwtTestFetchQueueWIdx = 0;
  qwtTestFetchQueueLock = 0;
H
Hongze Cheng 已提交
1279

D
dapan1121 已提交
1280 1281 1282 1283
  qWorkerDestroy(&mgmt);
}

TEST(rcTest, dropTest) {
H
Hongze Cheng 已提交
1284
  void   *mgmt = NULL;
D
dapan1121 已提交
1285
  int32_t code = 0;
H
Hongze Cheng 已提交
1286
  void   *mockPointer = (void *)0x1;
D
dapan1121 已提交
1287 1288

  qwtInitLogFile();
H
Hongze Cheng 已提交
1289

D
dapan1121 已提交
1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301
  stubSetStringToPlan();
  stubSetRpcSendResponse();
  stubSetExecTask();
  stubSetCreateExecTask();
  stubSetAsyncKillTask();
  stubSetDestroyTask();
  stubSetDestroyDataSinker();
  stubSetGetDataLength();
  stubSetEndPut();
  stubSetPutDataBlock();
  stubSetGetDataBlock();

wafwerar's avatar
wafwerar 已提交
1302
  taosSeedRand(taosGetTimestampSec());
S
Shengliang Guan 已提交
1303

X
Xiaoyu Wang 已提交
1304
  SMsgCb msgCb = {0};
1305
  msgCb.mgmt = (void *)mockPointer;
S
Shengliang Guan 已提交
1306
  msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
D
dapan1121 已提交
1307
  code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
D
dapan1121 已提交
1308 1309
  ASSERT_EQ(code, 0);

D
dapan1121 已提交
1310 1311 1312
  tsem_init(&qwtTestQuerySem, 0, 0);
  tsem_init(&qwtTestFetchSem, 0, 0);

wafwerar's avatar
wafwerar 已提交
1313 1314
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
D
dapan1121 已提交
1315

H
Hongze Cheng 已提交
1316
  TdThread t1, t2, t3, t4, t5;
X
Xiaoyu Wang 已提交
1317
  taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
wafwerar's avatar
wafwerar 已提交
1318 1319
  taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
  taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
D
dapan1121 已提交
1320 1321 1322

  while (true) {
    if (qwtTestDeadLoop) {
wafwerar's avatar
wafwerar 已提交
1323
      taosSsleep(1);
D
dapan1121 已提交
1324
    } else {
wafwerar's avatar
wafwerar 已提交
1325
      taosSsleep(qwtTestMTRunSec);
D
dapan1121 已提交
1326 1327 1328
      break;
    }
  }
H
Hongze Cheng 已提交
1329

D
dapan1121 已提交
1330
  qwtTestStop = true;
wafwerar's avatar
wafwerar 已提交
1331
  taosSsleep(3);
H
Hongze Cheng 已提交
1332

D
dapan1121 已提交
1333
  qWorkerDestroy(&mgmt);
D
dapan 已提交
1334 1335
}

H
Hongze Cheng 已提交
1336
int main(int argc, char **argv) {
wafwerar's avatar
wafwerar 已提交
1337
  taosSeedRand(taosGetTimestampSec());
D
dapan1121 已提交
1338 1339 1340 1341
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

D
dapan1121 已提交
1342
#pragma GCC diagnostic pop