// qworkerTests.cpp -- gtest cases exercising the qworker message-processing entry points.
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <gtest/gtest.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic ignored "-Wwrite-strings"

#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"

#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
#include "tep.h"
#include "trpc.h"
D
dapan 已提交
31 32
#include "planner.h"
#include "qworker.h"
D
dapan1121 已提交
33 34
#include "stub.h"
#include "addr_any.h"
D
dapan1121 已提交
35 36
#include "executor.h"
#include "dataSinkMgt.h"
D
dapan1121 已提交
37 38 39 40


namespace {

D
dapan1121 已提交
41 42
#define qwtTestQueryQueueSize 1000
#define qwtTestFetchQueueSize 1000
D
dapan1121 已提交
43
#define qwtTestMaxExecTaskUsec 2
D
dapan1121 已提交
44

D
dapan1121 已提交
45
uint64_t qwtTestQueryId = 0;
D
dapan1121 已提交
46 47 48 49 50 51 52
bool qwtTestEnableSleep = true;
bool qwtTestStop = false;
bool qwtTestDeadLoop = true;
int32_t qwtTestMTRunSec = 10;
int32_t qwtTestPrintNum = 100000;
int32_t qwtTestCaseIdx = 0;
int32_t qwtTestCaseNum = 4;
D
dapan1121 已提交
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
bool qwtTestCaseFinished = false;
tsem_t qwtTestQuerySem;
tsem_t qwtTestFetchSem;

int32_t qwtTestQueryQueueRIdx = 0;
int32_t qwtTestQueryQueueWIdx = 0;
int32_t qwtTestQueryQueueNum = 0;
SRWLatch qwtTestQueryQueueLock = 0;
struct SRpcMsg *qwtTestQueryQueue[qwtTestQueryQueueSize] = {0};

int32_t qwtTestFetchQueueRIdx = 0;
int32_t qwtTestFetchQueueWIdx = 0;
int32_t qwtTestFetchQueueNum = 0;
SRWLatch qwtTestFetchQueueLock = 0;
struct SRpcMsg *qwtTestFetchQueue[qwtTestFetchQueueSize] = {0};


int32_t qwtTestSinkBlockNum = 0;
int32_t qwtTestSinkMaxBlockNum = 0;
bool qwtTestSinkQueryEnd = false;
SRWLatch qwtTestSinkLock = 0;
D
dapan1121 已提交
74
int32_t qwtTestSinkLastLen = 0;
D
dapan1121 已提交
75 76


D
dapan1121 已提交
77
SSubQueryMsg qwtqueryMsg = {0};
D
dapan1121 已提交
78 79 80 81 82 83
SRpcMsg qwtfetchRpc = {0};
SResFetchReq qwtfetchMsg = {0};
SRpcMsg qwtreadyRpc = {0};
SResReadyReq qwtreadyMsg = {0};
SRpcMsg qwtdropRpc = {0};
STaskDropReq qwtdropMsg = {0};  
D
dapan1121 已提交
84
SSchTasksStatusReq qwtstatusMsg = {0};
D
dapan1121 已提交
85

D
dapan1121 已提交
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102

// Configures synchronous logging with qDebugFlag = 159 and opens the log file
// "<tsLogDir>/taosdlog". A failure to open the log is reported on stdout only.
void qwtInitLogFile() {
  const char    *defaultLogFileNamePrefix = "taosdlog";
  const int32_t  maxLogFileNum = 10;

  tsAsyncLog = 0;
  qDebugFlag = 159;

  char temp[128] = {0};
  // Bounded formatting: tsLogDir is external, so never write past temp[].
  snprintf(temp, sizeof(temp), "%s/%s", tsLogDir, defaultLogFileNamePrefix);
  if (taosInitLog(temp, tsNumOfLogLines, maxLogFileNum) < 0) {
    printf("failed to open log file in directory:%s\n", tsLogDir);
  }
}

// Fills the shared qwtqueryMsg with a fresh query id (sId = taskId = 1, all in
// network byte order) and points queryRpc at it.
void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) {
  qwtqueryMsg.queryId = htobe64(atomic_add_fetch_64(&qwtTestQueryId, 1));
  qwtqueryMsg.sId = htobe64(1);
  qwtqueryMsg.taskId = htobe64(1);
  qwtqueryMsg.contentLen = htonl(100);

  // queryQueueThread() dispatches on msgType, so it must be set here.
  queryRpc->msgType = TDMT_VND_QUERY;
  queryRpc->pCont = &qwtqueryMsg;
  // NOTE(review): contLen advertises 100 bytes beyond sizeof(SSubQueryMsg),
  // but pCont points at a bare SSubQueryMsg -- confirm no consumer reads them.
  queryRpc->contLen = sizeof(SSubQueryMsg) + 100;
}

// Fills readyMsg for the most recently issued query id (sId = taskId = 1, all
// in network byte order) and wraps it in readyRpc.
void qwtBuildReadyReqMsg(SResReadyReq *readyMsg, SRpcMsg *readyRpc) {
  readyMsg->sId = htobe64(1);
  readyMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
  readyMsg->taskId = htobe64(1);

  // fetchQueueThread() dispatches on msgType, so it must be set here.
  readyRpc->msgType = TDMT_VND_RES_READY;
  readyRpc->pCont = readyMsg;
  readyRpc->contLen = sizeof(SResReadyReq);
}

// Fills fetchMsg for the most recently issued query id (sId = taskId = 1, all
// in network byte order) and wraps it in fetchRpc.
void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
  fetchMsg->sId = htobe64(1);
  fetchMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
  fetchMsg->taskId = htobe64(1);

  // fetchQueueThread() dispatches on msgType, so it must be set here.
  fetchRpc->msgType = TDMT_VND_FETCH;
  fetchRpc->pCont = fetchMsg;
  fetchRpc->contLen = sizeof(SResFetchReq);
}

// Fills dropMsg for the most recently issued query id (sId = taskId = 1, all
// in network byte order) and wraps it in dropRpc.
void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
  dropMsg->sId = htobe64(1);
  dropMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
  dropMsg->taskId = htobe64(1);

  // fetchQueueThread() dispatches on msgType, so it must be set here.
  dropRpc->msgType = TDMT_VND_DROP_TASK;
  dropRpc->pCont = dropMsg;
  dropRpc->contLen = sizeof(STaskDropReq);
}

// Prepares a task-status request: statusMsg carries sId = 1 (network byte
// order) and statusRpc is set up as a TDMT_VND_TASKS_STATUS envelope around it.
void qwtBuildStatusReqMsg(SSchTasksStatusReq *statusMsg, SRpcMsg *statusRpc) {
  // Payload.
  statusMsg->sId = htobe64(1);

  // Envelope.
  statusRpc->msgType = TDMT_VND_TASKS_STATUS;
  statusRpc->contLen = sizeof(SSchTasksStatusReq);
  statusRpc->pCont = statusMsg;
}
D
dapan1121 已提交
141

D
dapan 已提交
142
int32_t qwtStringToPlan(const char* str, SSubplan** subplan) {
D
dapan 已提交
143
  *subplan = (SSubplan *)0x1;
D
dapan 已提交
144
  return 0;
D
dapan1121 已提交
145 146
}

D
dapan1121 已提交
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
// Appends pMsg to the fetch ring buffer under its latch and posts the fetch
// semaphore. Asserts if the write index catches up with the read index after
// the insert (queue full). `node` is unused. Always returns 0.
int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
  taosWLockLatch(&qwtTestFetchQueueLock);

  const int32_t slot = qwtTestFetchQueueWIdx;
  qwtTestFetchQueue[slot] = pMsg;

  // Advance the write cursor, wrapping at the end of the buffer.
  qwtTestFetchQueueWIdx = (slot + 1 >= qwtTestFetchQueueSize) ? 0 : slot + 1;
  ++qwtTestFetchQueueNum;

  if (qwtTestFetchQueueRIdx == qwtTestFetchQueueWIdx) {
    printf("Fetch queue is full");
    assert(0);
  }

  taosWUnLockLatch(&qwtTestFetchQueueLock);

  tsem_post(&qwtTestFetchSem);
  return 0;
}


D
dapan1121 已提交
168
int32_t qwtPutReqToQueue(void *node, struct SRpcMsg *pMsg) {
D
dapan1121 已提交
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
  taosWLockLatch(&qwtTestQueryQueueLock);
  qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = pMsg;
  if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) {
    qwtTestQueryQueueWIdx = 0;
  }
  
  qwtTestQueryQueueNum++;

  if (qwtTestQueryQueueWIdx == qwtTestQueryQueueRIdx) {
    printf("query queue is full");
    assert(0);
  }
  taosWUnLockLatch(&qwtTestQueryQueueLock);
  
  tsem_post(&qwtTestQuerySem);
  
D
dapan1121 已提交
185 186 187 188
  return 0;
}


D
dapan1121 已提交
189

D
dapan1121 已提交
190
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
D
dapan1121 已提交
191 192 193 194 195 196 197

  switch (pRsp->msgType) {
    case TDMT_VND_QUERY_RSP: {
      SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont;

      if (0 == pRsp->code) {
        qwtBuildReadyReqMsg(&qwtreadyMsg, &qwtreadyRpc);    
D
dapan 已提交
198
        qwtPutReqToFetchQueue((void *)0x1, &qwtreadyRpc);
D
dapan1121 已提交
199 200
      } else {
        qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
D
dapan 已提交
201
        qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
D
dapan1121 已提交
202 203 204 205 206 207 208 209 210
      }
      
      break;
    }
    case TDMT_VND_RES_READY_RSP: {
      SResReadyRsp *rsp = (SResReadyRsp *)pRsp->pCont;
      
      if (0 == pRsp->code) {
        qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc);
D
dapan 已提交
211
        qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc);
D
dapan1121 已提交
212 213
      } else {
        qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
D
dapan 已提交
214
        qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
D
dapan1121 已提交
215 216 217 218 219 220 221 222
      }
      break;
    }
    case TDMT_VND_FETCH_RSP: {
      SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)pRsp->pCont;
  
      if (0 == pRsp->code && 0 == rsp->completed) {
        qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc);
D
dapan 已提交
223
        qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc);
D
dapan1121 已提交
224 225 226 227
        return;
      }

      qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
D
dapan 已提交
228
      qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
D
dapan1121 已提交
229 230 231 232 233 234 235 236
      
      break;
    }
    case TDMT_VND_DROP_TASK: {
      STaskDropRsp *rsp = (STaskDropRsp *)pRsp->pCont;

      qwtTestCaseFinished = true;
      break;
D
dapan1121 已提交
237 238
    }
  }
D
dapan1121 已提交
239
  
D
dapan1121 已提交
240 241 242
  return;
}

D
dapan1121 已提交
243
int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
D
dapan1121 已提交
244 245 246 247 248
  int32_t idx = abs((++qwtTestCaseIdx) % qwtTestCaseNum);

  qwtTestSinkBlockNum = 0;
  qwtTestSinkMaxBlockNum = rand() % 100 + 1;
  qwtTestSinkQueryEnd = false;
D
dapan1121 已提交
249 250
  
  if (0 == idx) {
D
dapan1121 已提交
251 252
    *pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx;
    *handle = (DataSinkHandle)qwtTestCaseIdx+1;
D
dapan1121 已提交
253 254 255 256
  } else if (1 == idx) {
    *pTaskInfo = NULL;
    *handle = NULL;
  } else if (2 == idx) {
D
dapan1121 已提交
257
    *pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx;
D
dapan1121 已提交
258 259 260
    *handle = NULL;
  } else if (3 == idx) {
    *pTaskInfo = NULL;
D
dapan1121 已提交
261
    *handle = (DataSinkHandle)qwtTestCaseIdx;
D
dapan1121 已提交
262 263 264 265 266 267
  }
  
  return 0;
}

// Stub installed over qExecTask (see stubSetExecTask).
//   - NULL task: yields no block and *useconds = 0.
//   - otherwise, with probability 4/5 returns a calloc'ed SSDataBlock with a
//     random row count (0..999); *useconds is left untouched in that case.
//   - with probability 1/5 yields no block and a random *useconds (0..9).
// Each non-NULL call sleeps for less than qwtTestMaxExecTaskUsec microseconds.
// Always returns 0.
int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
  if (NULL == tinfo) {
    *pRes = NULL;
    *useconds = 0;
    return 0;
  }

  // Non-zero means "produce another block"; zero means "no more data".
  const int32_t endExec = rand() % 5;

  usleep(rand() % qwtTestMaxExecTaskUsec);

  if (endExec) {
    *pRes = (SSDataBlock*)calloc(1, sizeof(SSDataBlock));
    (*pRes)->info.rows = rand() % 1000;
  } else {
    *pRes = NULL;
    *useconds = rand() % 10;
  }

  return 0;
}

// Stub installed over qAsyncKillTask (see stubSetAsyncKillTask): does nothing
// and reports success.
int32_t qwtKillTask(qTaskInfo_t qinfo) {
  (void)qinfo;  // intentionally unused
  return 0;
}

// Stub installed over qDestroyTask (see stubSetDestroyTask): deliberately a
// no-op, since the task handles used by these tests are dummy values.
void qwtDestroyTask(qTaskInfo_t qHandle) {
  (void)qHandle;  // intentionally unused
}


// Stub installed over dsPutDataBlock (see stubSetPutDataBlock). Frees the
// input block's data, counts it as one more block held by the mock sink, and
// tells the producer whether it may keep producing.
// Asserts on any NULL argument. Always returns 0.
int32_t qwtPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) {
  if (NULL == handle || NULL == pInput || NULL == pContinue) {
    assert(0);
  }

  free((void *)pInput->pData);

  taosWLockLatch(&qwtTestSinkLock);

  qwtTestSinkBlockNum++;

  // Keep producing only while the sink is below its capacity. The original
  // had this inverted (continue == true once the sink was full).
  *pContinue = (qwtTestSinkBlockNum < qwtTestSinkMaxBlockNum);

  taosWUnLockLatch(&qwtTestSinkLock);

  return 0;
}

// Stub installed over dsEndPut (see stubSetEndPut): marks the mocked query as
// ended. Asserts on a NULL handle; `useconds` is ignored.
void qwtEndPut(DataSinkHandle handle, uint64_t useconds) {
  if (NULL == handle) {
    assert(0);
  }

  qwtTestSinkQueryEnd = true;
}

// Stub installed over dsGetDataLength (see stubSetGetDataLength). If the mock
// sink holds at least one block, consumes one and reports a random length in
// 1..100; otherwise reports 0. The reported length is remembered in
// qwtTestSinkLastLen for qwtGetDataBlock(). *pQueryEnd mirrors the flag set by
// qwtEndPut(). Asserts on a NULL handle and on concurrent entry.
void qwtGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) {
  static int32_t in = 0;

  // Detect concurrent callers. Incrementing and testing in one atomic step
  // closes the window the original check-then-increment left open.
  if (atomic_add_fetch_32(&in, 1) > 1) {
    assert(0);
  }

  if (NULL == handle) {
    assert(0);
  }

  taosWLockLatch(&qwtTestSinkLock);
  if (qwtTestSinkBlockNum > 0) {
    *pLen = rand() % 100 + 1;
    qwtTestSinkBlockNum--;
  } else {
    *pLen = 0;
  }
  qwtTestSinkLastLen = *pLen;
  taosWUnLockLatch(&qwtTestSinkLock);

  *pQueryEnd = qwtTestSinkQueryEnd;

  atomic_sub_fetch_32(&in, 1);
}

// Stub installed over dsGetDataBlock (see stubSetGetDataBlock). Fills pOutput
// according to the length last reported by qwtGetDataLength():
//   > 0 : random row count in 1..10 (pData is left untouched)
//   == 0: zero rows and pData = NULL
//   < 0 : assertion failure
// bufStatus reflects how full the mock sink is relative to its maximum.
// `handle` is not inspected. Always returns 0.
int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
  taosWLockLatch(&qwtTestSinkLock);

  if (qwtTestSinkLastLen >= 0) {
    if (qwtTestSinkLastLen > 0) {
      pOutput->numOfRows = rand() % 10 + 1;
    } else {
      pOutput->numOfRows = 0;
      pOutput->pData = NULL;
    }

    // Everything below is common to the "has data" and "empty" cases.
    pOutput->compressed = 1;
    pOutput->queryEnd = qwtTestSinkQueryEnd;
    if (qwtTestSinkBlockNum == 0) {
      pOutput->bufStatus = DS_BUF_EMPTY;
    } else if (qwtTestSinkBlockNum <= qwtTestSinkMaxBlockNum*0.5) {
      pOutput->bufStatus = DS_BUF_LOW;
    } else {
      pOutput->bufStatus = DS_BUF_FULL;
    }
    pOutput->useconds = rand() % 10 + 1;
    pOutput->precision = 1;
  } else {
    assert(0);
  }

  taosWUnLockLatch(&qwtTestSinkLock);

  return 0;
}

// Stub installed over dsDestroyDataSinker (see stubSetDestroyDataSinker):
// deliberately a no-op, since the sink handles used here are dummy values.
void qwtDestroyDataSinker(DataSinkHandle handle) {
  (void)handle;  // intentionally unused
}

D
dapan1121 已提交
397

D
dapan1121 已提交
398

D
dapan 已提交
399
void stubSetStringToPlan() {
D
dapan1121 已提交
400
  static Stub stub;
D
dapan 已提交
401
  stub.set(qStringToSubplan, qwtStringToPlan);
D
dapan1121 已提交
402
  {
D
dapan 已提交
403
    AddrAny any("libplanner.so");
D
dapan1121 已提交
404
    std::map<std::string,void*> result;
D
dapan 已提交
405
    any.get_global_func_addr_dynsym("^qStringToSubplan$", result);
D
dapan1121 已提交
406
    for (const auto& f : result) {
D
dapan 已提交
407
      stub.set(f.second, qwtStringToPlan);
D
dapan1121 已提交
408 409 410 411
    }
  }
}

D
dapan1121 已提交
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518
void stubSetExecTask() {
  static Stub stub;
  stub.set(qExecTask, qwtExecTask);
  {
    AddrAny any("libexecutor.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qExecTask$", result);
    for (const auto& f : result) {
      stub.set(f.second, qwtExecTask);
    }
  }
}



void stubSetCreateExecTask() {
  static Stub stub;
  stub.set(qCreateExecTask, qwtCreateExecTask);
  {
    AddrAny any("libexecutor.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qCreateExecTask$", result);
    for (const auto& f : result) {
      stub.set(f.second, qwtCreateExecTask);
    }
  }
}

void stubSetAsyncKillTask() {
  static Stub stub;
  stub.set(qAsyncKillTask, qwtKillTask);
  {
    AddrAny any("libexecutor.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qAsyncKillTask$", result);
    for (const auto& f : result) {
      stub.set(f.second, qwtKillTask);
    }
  }
}

void stubSetDestroyTask() {
  static Stub stub;
  stub.set(qDestroyTask, qwtDestroyTask);
  {
    AddrAny any("libexecutor.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^qDestroyTask$", result);
    for (const auto& f : result) {
      stub.set(f.second, qwtDestroyTask);
    }
  }
}


void stubSetDestroyDataSinker() {
  static Stub stub;
  stub.set(dsDestroyDataSinker, qwtDestroyDataSinker);
  {
    AddrAny any("libexecutor.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^dsDestroyDataSinker$", result);
    for (const auto& f : result) {
      stub.set(f.second, qwtDestroyDataSinker);
    }
  }
}

void stubSetGetDataLength() {
  static Stub stub;
  stub.set(dsGetDataLength, qwtGetDataLength);
  {
    AddrAny any("libexecutor.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^dsGetDataLength$", result);
    for (const auto& f : result) {
      stub.set(f.second, qwtGetDataLength);
    }
  }
}

void stubSetEndPut() {
  static Stub stub;
  stub.set(dsEndPut, qwtEndPut);
  {
    AddrAny any("libexecutor.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^dsEndPut$", result);
    for (const auto& f : result) {
      stub.set(f.second, qwtEndPut);
    }
  }
}

void stubSetPutDataBlock() {
  static Stub stub;
  stub.set(dsPutDataBlock, qwtPutDataBlock);
  {
    AddrAny any("libexecutor.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^dsPutDataBlock$", result);
    for (const auto& f : result) {
      stub.set(f.second, qwtPutDataBlock);
    }
  }
}

D
dapan1121 已提交
519 520 521 522
void stubSetRpcSendResponse() {
  static Stub stub;
  stub.set(rpcSendResponse, qwtRpcSendResponse);
  {
D
dapan1121 已提交
523
    AddrAny any("libtransport.so");
D
dapan1121 已提交
524 525 526 527 528 529 530 531
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^rpcSendResponse$", result);
    for (const auto& f : result) {
      stub.set(f.second, qwtRpcSendResponse);
    }
  }
}

D
dapan1121 已提交
532 533 534 535 536 537 538 539 540 541 542 543 544 545
// Redirects dsGetDataBlock to qwtGetDataBlock, both through the address
// visible from this translation unit and through every matching dynamic
// symbol found in libexecutor.so.
void stubSetGetDataBlock() {
  static Stub stub;  // static so the patch outlives this call
  stub.set(dsGetDataBlock, qwtGetDataBlock);
  {
    // Every other ds* stub in this file resolves its symbol from
    // libexecutor.so; the original looked in libtransport.so here, which
    // appears to be a copy-paste slip from stubSetRpcSendResponse().
    AddrAny any("libexecutor.so");
    std::map<std::string,void*> result;
    any.get_global_func_addr_dynsym("^dsGetDataBlock$", result);
    for (const auto& f : result) {
      stub.set(f.second, qwtGetDataBlock);
    }
  }
}


D
dapan1121 已提交
546 547 548 549 550 551 552
void *queryThread(void *param) {
  SRpcMsg queryRpc = {0};
  int32_t code = 0;
  uint32_t n = 0;
  void *mockPointer = (void *)0x1;    
  void *mgmt = param;

D
dapan1121 已提交
553 554 555 556 557 558 559
  while (!qwtTestStop) {
    qwtBuildQueryReqMsg(&queryRpc);
    qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);    
    if (qwtTestEnableSleep) {
      usleep(rand()%5);
    }
    if (++n % qwtTestPrintNum == 0) {
D
dapan1121 已提交
560 561 562 563 564 565 566 567 568 569 570 571 572
      printf("query:%d\n", n);
    }
  }

  return NULL;
}

void *readyThread(void *param) {
  SRpcMsg readyRpc = {0};
  int32_t code = 0;
  uint32_t n = 0;  
  void *mockPointer = (void *)0x1;    
  void *mgmt = param;
S
Shengliang Guan 已提交
573
  SResReadyReq readyMsg = {0};
D
dapan1121 已提交
574

D
dapan1121 已提交
575 576
  while (!qwtTestStop) {
    qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
D
dapan1121 已提交
577
    code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
D
dapan1121 已提交
578 579 580 581
    if (qwtTestEnableSleep) {
      usleep(rand()%5);
    }
    if (++n % qwtTestPrintNum == 0) {
D
dapan1121 已提交
582 583 584 585 586 587 588 589 590 591 592 593 594
      printf("ready:%d\n", n);
    }    
  }

  return NULL;
}

void *fetchThread(void *param) {
  SRpcMsg fetchRpc = {0};
  int32_t code = 0;
  uint32_t n = 0;  
  void *mockPointer = (void *)0x1;    
  void *mgmt = param;
S
Shengliang Guan 已提交
595
  SResFetchReq fetchMsg = {0};
D
dapan1121 已提交
596

D
dapan1121 已提交
597 598
  while (!qwtTestStop) {
    qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
D
dapan1121 已提交
599
    code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
D
dapan1121 已提交
600 601 602 603
    if (qwtTestEnableSleep) {
      usleep(rand()%5);
    }
    if (++n % qwtTestPrintNum == 0) {
D
dapan1121 已提交
604 605 606 607 608 609 610 611 612 613 614 615 616
      printf("fetch:%d\n", n);
    }    
  }

  return NULL;
}

void *dropThread(void *param) {
  SRpcMsg dropRpc = {0};
  int32_t code = 0;
  uint32_t n = 0;  
  void *mockPointer = (void *)0x1;    
  void *mgmt = param;
S
Shengliang Guan 已提交
617
  STaskDropReq dropMsg = {0};  
D
dapan1121 已提交
618

D
dapan1121 已提交
619 620
  while (!qwtTestStop) {
    qwtBuildDropReqMsg(&dropMsg, &dropRpc);
D
dapan1121 已提交
621
    code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
D
dapan1121 已提交
622 623 624 625
    if (qwtTestEnableSleep) {
      usleep(rand()%5);
    }
    if (++n % qwtTestPrintNum == 0) {
D
dapan1121 已提交
626 627 628 629 630 631 632 633 634 635 636 637 638
      printf("drop:%d\n", n);
    }    
  }

  return NULL;
}

void *statusThread(void *param) {
  SRpcMsg statusRpc = {0};
  int32_t code = 0;
  uint32_t n = 0;  
  void *mockPointer = (void *)0x1;    
  void *mgmt = param;
S
Shengliang Guan 已提交
639
  SSchTasksStatusReq statusMsg = {0};
D
dapan1121 已提交
640

D
dapan1121 已提交
641 642
  while (!qwtTestStop) {
    qwtBuildStatusReqMsg(&statusMsg, &statusRpc);
D
dapan1121 已提交
643
    code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
D
dapan1121 已提交
644 645 646 647
    if (qwtTestEnableSleep) {
      usleep(rand()%5);
    }
    if (++n % qwtTestPrintNum == 0) {
D
dapan1121 已提交
648 649 650 651 652 653 654 655
      printf("status:%d\n", n);
    }    
  }

  return NULL;
}


D
dapan1121 已提交
656
void *clientThread(void *param) {
D
dapan1121 已提交
657 658 659
  int32_t code = 0;
  uint32_t n = 0;
  void *mgmt = param;
D
dapan1121 已提交
660 661 662 663
  void *mockPointer = (void *)0x1;    
  SRpcMsg queryRpc = {0};

  sleep(1);
D
dapan1121 已提交
664 665

  while (!qwtTestStop) {
D
dapan1121 已提交
666 667
    qwtTestCaseFinished = false;
    
D
dapan1121 已提交
668
    qwtBuildQueryReqMsg(&queryRpc);
D
dapan 已提交
669
    qwtPutReqToQueue((void *)0x1, &queryRpc);
D
dapan1121 已提交
670 671 672 673 674

    while (!qwtTestCaseFinished) {
      usleep(1);
    }
    
D
dapan1121 已提交
675 676 677
    if (qwtTestEnableSleep) {
      usleep(rand()%5);
    }
D
dapan1121 已提交
678
    
D
dapan1121 已提交
679 680 681 682 683 684 685
    if (++n % qwtTestPrintNum == 0) {
      printf("query:%d\n", n);
    }
  }

  return NULL;
}
D
dapan1121 已提交
686

D
dapan1121 已提交
687
void *queryQueueThread(void *param) {
D
dapan1121 已提交
688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710
  void *mockPointer = (void *)0x1;   
  SRpcMsg *queryRpc = NULL;
  void *mgmt = param;

  while (!qwtTestStop) {
    tsem_wait(&qwtTestQuerySem);

    taosWLockLatch(&qwtTestQueryQueueLock);
    if (qwtTestQueryQueueNum <= 0 || qwtTestQueryQueueRIdx == qwtTestQueryQueueWIdx) {
      printf("query queue is empty\n");
      assert(0);
    }
    
    queryRpc = qwtTestQueryQueue[qwtTestQueryQueueRIdx++];
    
    if (qwtTestQueryQueueRIdx >= qwtTestQueryQueueSize) {
      qwtTestQueryQueueRIdx = 0;
    }
    
    qwtTestQueryQueueNum--;
    taosWUnLockLatch(&qwtTestQueryQueueLock);

    if (TDMT_VND_QUERY == queryRpc->msgType) {
D
dapan 已提交
711
      qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc);
D
dapan1121 已提交
712
    } else if (TDMT_VND_QUERY_CONTINUE == queryRpc->msgType) {
D
dapan 已提交
713
      qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc);
D
dapan1121 已提交
714 715 716 717 718
    } else {
      printf("unknown msg in query queue, type:%d\n", queryRpc->msgType);
      assert(0);
    }
  }
D
dapan1121 已提交
719

D
dapan 已提交
720
  return NULL;
D
dapan1121 已提交
721 722 723
}

// Thread body: consumer of the fetch ring buffer. Waits on the fetch
// semaphore, pops one message under the queue latch and dispatches it by
// msgType to the matching qWorkerProcess*Msg() handler. Asserts if woken with
// an empty queue or on an unrecognized message type. `param` is passed
// through as the qworker management handle. Returns NULL.
void *fetchQueueThread(void *param) {
  void *mockPointer = (void *)0x1;
  SRpcMsg *fetchRpc = NULL;
  void *mgmt = param;

  while (!qwtTestStop) {
    tsem_wait(&qwtTestFetchSem);

    taosWLockLatch(&qwtTestFetchQueueLock);
    if (qwtTestFetchQueueNum <= 0 || qwtTestFetchQueueRIdx == qwtTestFetchQueueWIdx) {
      printf("Fetch queue is empty\n");
      assert(0);
    }

    fetchRpc = qwtTestFetchQueue[qwtTestFetchQueueRIdx++];

    if (qwtTestFetchQueueRIdx >= qwtTestFetchQueueSize) {
      qwtTestFetchQueueRIdx = 0;  // wrap around
    }

    qwtTestFetchQueueNum--;
    taosWUnLockLatch(&qwtTestFetchQueueLock);

    // Each case must end in `break`: without it every message fell through
    // all subsequent handlers and finally hit the assert in `default`.
    switch (fetchRpc->msgType) {
      case TDMT_VND_FETCH:
        qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc);
        break;
      case TDMT_VND_RES_READY:
        qWorkerProcessReadyMsg(mockPointer, mgmt, fetchRpc);
        break;
      case TDMT_VND_TASKS_STATUS:
        qWorkerProcessStatusMsg(mockPointer, mgmt, fetchRpc);
        break;
      case TDMT_VND_CANCEL_TASK:
        qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc);
        break;
      case TDMT_VND_DROP_TASK:
        qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc);
        break;
      default:
        printf("unknown msg type:%d in fetch queue", fetchRpc->msgType);
        assert(0);
        break;
    }
  }

  return NULL;
}

D
dapan1121 已提交
766

D
dapan1121 已提交
767 768 769 770

}


D
dapan1121 已提交
771
// Happy path, single-threaded: query -> ready -> fetch -> drop, with a status
// request before and after every step. Every call is expected to return 0.
TEST(seqTest, normalCase) {
  void *mgmt = NULL;
  int32_t code = 0;
  void *mockPointer = (void *)0x1;
  SRpcMsg queryRpc = {0};
  SRpcMsg readyRpc = {0};
  SRpcMsg fetchRpc = {0};
  SRpcMsg dropRpc = {0};
  SRpcMsg statusRpc = {0};

  qwtInitLogFile();

  qwtBuildQueryReqMsg(&queryRpc);
  qwtBuildReadyReqMsg(&qwtreadyMsg, &readyRpc);
  qwtBuildFetchReqMsg(&qwtfetchMsg, &fetchRpc);
  qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc);

  // Replace planner / executor / data-sink / transport entry points with the
  // mocks defined above.
  stubSetStringToPlan();
  stubSetRpcSendResponse();
  stubSetExecTask();
  stubSetCreateExecTask();
  stubSetAsyncKillTask();
  stubSetDestroyTask();
  stubSetDestroyDataSinker();
  stubSetGetDataLength();
  stubSetEndPut();
  stubSetPutDataBlock();
  stubSetGetDataBlock();

  code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
  ASSERT_EQ(code, 0);

  qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
  code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
  ASSERT_EQ(code, 0);

  code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
  ASSERT_EQ(code, 0);

  qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
  code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
  ASSERT_EQ(code, 0);

  code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
  ASSERT_EQ(code, 0);

  qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
  code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
  ASSERT_EQ(code, 0);

  code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
  ASSERT_EQ(code, 0);

  qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
  code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
  ASSERT_EQ(code, 0);

  code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
  ASSERT_EQ(code, 0);

  qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
  code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
  ASSERT_EQ(code, 0);

  qWorkerDestroy(&mgmt);
}

// Drop arrives before the query: the drop and all status requests must
// succeed, while the subsequent query for the same task must be rejected
// with a non-zero code.
TEST(seqTest, cancelFirst) {
  void *mgmt = NULL;
  int32_t code = 0;
  void *mockPointer = (void *)0x1;
  SRpcMsg queryRpc = {0};
  SRpcMsg dropRpc = {0};
  SRpcMsg statusRpc = {0};

  qwtInitLogFile();

  qwtBuildQueryReqMsg(&queryRpc);
  qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc);
  qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);

  stubSetStringToPlan();
  stubSetRpcSendResponse();

  code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
  ASSERT_EQ(code, 0);

  qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
  code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
  ASSERT_EQ(code, 0);

  code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
  ASSERT_EQ(code, 0);

  qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
  code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
  ASSERT_EQ(code, 0);

  // The task was dropped above, so the query is expected to fail.
  code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
  ASSERT_TRUE(0 != code);

  qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
  code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
  ASSERT_EQ(code, 0);

  qWorkerDestroy(&mgmt);
}

// Single-threaded fuzz: issues query / ready / fetch / drop / status requests
// in random order (roughly one fifth of the draws each) until the single
// terminating value (r == maxr - 1) is drawn. Only status requests are
// required to succeed; the other return codes are not checked.
TEST(seqTest, randCase) {
  void *mgmt = NULL;
  int32_t code = 0;
  void *mockPointer = (void *)0x1;
  SRpcMsg queryRpc = {0};
  SRpcMsg readyRpc = {0};
  SRpcMsg fetchRpc = {0};
  SRpcMsg dropRpc = {0};
  SRpcMsg statusRpc = {0};
  SResReadyReq readyMsg = {0};
  SResFetchReq fetchMsg = {0};
  STaskDropReq dropMsg = {0};
  SSchTasksStatusReq statusMsg = {0};

  qwtInitLogFile();

  stubSetStringToPlan();
  stubSetRpcSendResponse();
  stubSetCreateExecTask();

  srand(time(NULL));

  code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
  ASSERT_EQ(code, 0);

  int32_t t = 0;
  int32_t maxr = 10001;
  while (true) {
    int32_t r = rand() % maxr;

    if (r >= 0 && r < maxr/5) {
      printf("Query,%d\n", t++);
      qwtBuildQueryReqMsg(&queryRpc);
      code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
    } else if (r >= maxr/5 && r < maxr * 2/5) {
      printf("Ready,%d\n", t++);
      qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
      code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
    } else if (r >= maxr * 2/5 && r < maxr* 3/5) {
      printf("Fetch,%d\n", t++);
      qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
      code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
    } else if (r >= maxr * 3/5 && r < maxr * 4/5) {
      printf("Drop,%d\n", t++);
      qwtBuildDropReqMsg(&dropMsg, &dropRpc);
      code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
    } else if (r >= maxr * 4/5 && r < maxr-1) {
      printf("Status,%d\n", t++);
      qwtBuildStatusReqMsg(&statusMsg, &statusRpc);
      code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
      ASSERT_EQ(code, 0);
    } else {
      printf("QUIT RAND NOW");
      break;
    }
  }

  qWorkerDestroy(&mgmt);
}

// Multi-threaded fuzz: five threads hammer the qworker entry points
// concurrently. With qwtTestDeadLoop == true the test never leaves its wait
// loop; otherwise it runs for qwtTestMTRunSec seconds, raises qwtTestStop and
// gives the threads three seconds to wind down (they are not joined).
TEST(seqTest, multithreadRand) {
  void *mgmt = NULL;
  int32_t code = 0;
  void *mockPointer = (void *)0x1;

  qwtInitLogFile();

  stubSetStringToPlan();
  stubSetRpcSendResponse();

  srand(time(NULL));

  code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
  ASSERT_EQ(code, 0);

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);

  // NOTE(review): only queryThread receives the real mgmt handle; the other
  // four threads are started with NULL -- confirm this is intentional.
  pthread_t t1,t2,t3,t4,t5;
  pthread_create(&(t1), &thattr, queryThread, mgmt);
  pthread_create(&(t2), &thattr, readyThread, NULL);
  pthread_create(&(t3), &thattr, fetchThread, NULL);
  pthread_create(&(t4), &thattr, dropThread, NULL);
  pthread_create(&(t5), &thattr, statusThread, NULL);

  // The attribute object is no longer needed once the threads exist.
  pthread_attr_destroy(&thattr);

  while (true) {
    if (qwtTestDeadLoop) {
      sleep(1);
    } else {
      sleep(qwtTestMTRunSec);
      break;
    }
  }

  qwtTestStop = true;
  sleep(3);

  qWorkerDestroy(&mgmt);
}

// Queue-driven multi-threaded test: a client thread enqueues queries, and two
// consumer threads drain the query and fetch queues into the qworker, with
// the mocked rpcSendResponse feeding follow-up requests back into the fetch
// queue. With qwtTestDeadLoop == true the test never leaves its wait loop;
// otherwise it runs for qwtTestMTRunSec seconds, raises qwtTestStop and waits
// three seconds (the threads are not joined).
TEST(rcTest, multithread) {
  void *mgmt = NULL;
  int32_t code = 0;
  void *mockPointer = (void *)0x1;

  qwtInitLogFile();

  stubSetStringToPlan();
  stubSetRpcSendResponse();
  stubSetExecTask();
  stubSetCreateExecTask();
  stubSetAsyncKillTask();
  stubSetDestroyTask();
  stubSetDestroyDataSinker();
  stubSetGetDataLength();
  stubSetEndPut();
  stubSetPutDataBlock();
  stubSetGetDataBlock();

  srand(time(NULL));

  code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
  ASSERT_EQ(code, 0);

  // Both semaphores start at 0: consumers block until something is queued.
  tsem_init(&qwtTestQuerySem, 0, 0);
  tsem_init(&qwtTestFetchSem, 0, 0);

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);

  pthread_t t1,t2,t3;
  pthread_create(&(t1), &thattr, clientThread, mgmt);
  pthread_create(&(t2), &thattr, queryQueueThread, mgmt);
  pthread_create(&(t3), &thattr, fetchQueueThread, mgmt);

  // The attribute object is no longer needed once the threads exist.
  pthread_attr_destroy(&thattr);

  while (true) {
    if (qwtTestDeadLoop) {
      sleep(1);
    } else {
      sleep(qwtTestMTRunSec);
      break;
    }
  }

  qwtTestStop = true;
  sleep(3);

  qWorkerDestroy(&mgmt);
}


D
dapan1121 已提交
1030

D
dapan1121 已提交
1031
int main(int argc, char** argv) {
D
dapan1121 已提交
1032
  srand(time(NULL));
D
dapan1121 已提交
1033 1034 1035 1036 1037 1038
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}