executor.c 22.5 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14 15 16
 */

#include "executor.h"
17
#include "tref.h"
H
Haojun Liao 已提交
18
#include "executorimpl.h"
19
#include "planner.h"
L
Liu Jicong 已提交
20
#include "tdatablock.h"
L
Liu Jicong 已提交
21
#include "vnode.h"
22 23 24 25 26 27 28 29 30 31
#include "tudf.h"

// Once-guard handed to taosThreadOnce() together with initRefPool() in qCreateExecTask().
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
// Value returned by taosOpenRef() in initRefPool(); starts at -1 and is swapped to 0 by cleanupRefPool().
int32_t             exchangeObjRefPool = -1;

// Calls taosOpenRef(1024, doDestroyExchangeOperatorInfo) and stores the result in exchangeObjRefPool.
static void initRefPool() {
  int32_t poolId = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
  exchangeObjRefPool = poolId;
}
// Atomically swaps exchangeObjRefPool to 0 (compare-exchange against its current value) and passes
// the value returned by the swap to taosCloseRef().
static void cleanupRefPool() {
  const int32_t expected = exchangeObjRefPool;
  const int32_t poolId = atomic_val_compare_exchange_32(&exchangeObjRefPool, expected, 0);
  taosCloseRef(poolId);
}
32

33 34
/**
 * Descend from pOperator to the stream scan operator and install the given input on it.
 *
 * Every operator visited on the way down, and the stream scan itself, gets its status reset to
 * OP_NOT_OPENED. A non-scan operator must have exactly one downstream operator, otherwise the
 * call fails.
 *
 * @param pOperator   operator to start from; asserted non-NULL
 * @param input       STREAM_INPUT__MERGED_SUBMIT: array of numOfBlocks pointers, each pushed to the list;
 *                    STREAM_INPUT__DATA_SUBMIT: a single pointer, pushed as-is;
 *                    STREAM_INPUT__DATA_BLOCK: array of numOfBlocks SSDataBlock, each one copied
 * @param numOfBlocks number of entries in input
 * @param type        one of the three STREAM_INPUT__* values above; any other value hits ASSERT(0)
 * @param assignUid   stored in the scan operator's assignBlockUid field
 * @param id          task id string, only used in error logs
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_APP_ERROR when an operator on the path has zero or
 *         more than one downstream operator
 */
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, bool assignUid,
                                char* id) {
  ASSERT(pOperator != NULL);
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      // id is a string; the former trailing PRIx64 only appended a literal "lx"/"llx" to the message
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_QRY_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_QRY_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, assignUid, id);
  } else {
    pOperator->status = OP_NOT_OPENED;

    SStreamScanInfo* pInfo = pOperator->info;
    pInfo->assignBlockUid = assignUid;

    // TODO: if a block was set but not consumed,
    // prevent setting a different type of block
    pInfo->validBlockIndex = 0;
    taosArrayClear(pInfo->pBlockLists);

    if (type == STREAM_INPUT__MERGED_SUBMIT) {
      ASSERT(numOfBlocks > 1);
      for (size_t i = 0; i < numOfBlocks; i++) {
        SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
        taosArrayPush(pInfo->pBlockLists, &pReq);
      }
      // merged submits are recorded with the plain DATA_SUBMIT block type
      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
    } else if (type == STREAM_INPUT__DATA_SUBMIT) {
      ASSERT(numOfBlocks == 1);
      taosArrayPush(pInfo->pBlockLists, &input);
      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
    } else if (type == STREAM_INPUT__DATA_BLOCK) {
      for (size_t i = 0; i < numOfBlocks; ++i) {
        SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];

        // TODO optimize
        SSDataBlock* p = createOneDataBlock(pDataBlock, false);
        p->info = pDataBlock->info;

        // replace the column list of the new block with the entries of the source block
        taosArrayClear(p->pDataBlock);
        taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
        taosArrayPush(pInfo->pBlockLists, &p);
      }
      pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
    } else {
      ASSERT(0);
    }

    return TSDB_CODE_SUCCESS;
  }
}

L
Liu Jicong 已提交
98
#if 0
L
Liu Jicong 已提交
99 100 101 102 103
// Currently compiled out by the surrounding #if 0.
// Forwards to doSetStreamBlock() on the task root with no input and type STREAM_INPUT__TABLE_SCAN;
// a NULL task handle yields TSDB_CODE_QRY_APP_ERROR.
int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  if (tinfo == NULL) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  return doSetStreamBlock(pTask->pRoot, NULL, 0, STREAM_INPUT__TABLE_SCAN, 0, NULL);
}
L
Liu Jicong 已提交
106
#endif
L
Liu Jicong 已提交
107

108 109
// Single-item convenience wrapper: forwards to qSetMultiStreamInput() with a block count of 1.
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {
  const size_t numOfBlocks = 1;
  return qSetMultiStreamInput(tinfo, input, numOfBlocks, type, assignUid);
}

112
/**
 * Hand numOfBlocks input items to the stream scan operator of the given task.
 *
 * @return TSDB_CODE_QRY_APP_ERROR for a NULL task handle; TSDB_CODE_SUCCESS when there is nothing
 *         to set (NULL pBlocks or numOfBlocks == 0); otherwise the result of doSetStreamBlock().
 */
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type, bool assignUid) {
  if (tinfo == NULL) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  // no input at all is not an error
  if (pBlocks == NULL || numOfBlocks == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  void**         ppInput = (void**)pBlocks;

  int32_t code = doSetStreamBlock(pTask->pRoot, ppInput, numOfBlocks, type, assignUid, GET_TASKID(pTask));
  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the stream block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the stream block data", GET_TASKID(pTask));
  }

  return code;
}

H
Haojun Liao 已提交
134
/**
 * Build a queue-model execution task from a serialized subplan.
 *
 * @param msg       serialized subplan passed to qStringToSubplan(); NULL returns NULL immediately
 * @param readers   read handle forwarded to qCreateExecTask()
 * @param numOfCols out: number of slots in the plan's output block description flagged as output
 * @param pSchema   out: clone of the task's schemaInfo.qsw
 * @return the task handle, or NULL on failure (terrno carries the error code, except for NULL msg)
 */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
  if (msg == NULL) {
    // TODO create raw scan
    return NULL;
  }

  struct SSubplan* pSubplan = NULL;
  int32_t          code = qStringToSubplan(msg, &pSubplan);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  qTaskInfo_t pTaskInfo = NULL;
  code = qCreateExecTask(readers, 0, 0, pSubplan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
  if (code != TSDB_CODE_SUCCESS) {
    nodesDestroyNode((SNode*)pSubplan);
    qDestroyTask(pTaskInfo);
    terrno = code;
    return NULL;
  }

  // count the slots that are marked as output columns
  SDataBlockDescNode* pDesc = pSubplan->pNode->pOutputDataBlockDesc;
  int32_t             nOutput = 0;

  SNode* pNode;
  FOREACH(pNode, pDesc->pSlots) {
    if (((SSlotDescNode*)pNode)->output) {
      ++nOutput;
    }
  }
  *numOfCols = nOutput;

  SExecTaskInfo* pTask = (SExecTaskInfo*)pTaskInfo;
  *pSchema = tCloneSSchemaWrapper(pTask->schemaInfo.qsw);
  return pTaskInfo;
}

L
Liu Jicong 已提交
172
/**
 * Build a stream-model execution task from a serialized subplan.
 *
 * @param msg     serialized subplan passed to qStringToSubplan(); NULL returns NULL immediately
 * @param readers read handle forwarded to qCreateExecTask()
 * @return the task handle, or NULL on failure (terrno carries the error code, except for NULL msg)
 */
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
  if (msg == NULL) {
    return NULL;
  }

  struct SSubplan* pSubplan = NULL;
  int32_t          code = qStringToSubplan(msg, &pSubplan);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  qTaskInfo_t pTask = NULL;
  code = qCreateExecTask(readers, 0, 0, pSubplan, &pTask, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
  if (code == TSDB_CODE_SUCCESS) {
    return pTask;
  }

  // creation failed: release the plan and whatever task object was produced
  nodesDestroyNode((SNode*)pSubplan);
  qDestroyTask(pTask);
  terrno = code;
  return NULL;
}
197

198
static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) {
199 200 201 202 203 204
  SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));

  // let's discard the tables those are not created according to the queried super table.
  SMetaReader mr = {0};
  metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
  for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
205
    uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
206 207 208

    int32_t code = metaGetTableEntryByUid(&mr, *id);
    if (code != TSDB_CODE_SUCCESS) {
209
      qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *id, tstrerror(terrno), idstr);
210 211 212
      continue;
    }

213
    // TODO handle ntb case
L
Liu Jicong 已提交
214
    if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pScanInfo->tableUid) {
215 216
      continue;
    }
217 218 219

    if (pScanInfo->pTagCond != NULL) {
      bool          qualified = false;
220
      STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid};
H
Haojun Liao 已提交
221
      code = isQualifiedTable(&info, pScanInfo->pTagCond, pScanInfo->readHandle.meta, &qualified);
222 223 224 225 226 227 228 229 230 231
      if (code != TSDB_CODE_SUCCESS) {
        qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr);
        continue;
      }

      if (!qualified) {
        continue;
      }
    }

232
    // handle multiple partition
233 234 235 236 237 238 239
    taosArrayPush(qa, id);
  }

  metaReaderClear(&mr);
  return qa;
}

240
/**
 * Add tables to, or remove tables from, the stream scan operator of a task.
 *
 * The operator tree is followed through pDownstream[0] until the stream scan operator is found.
 * When adding, tableIdList is first reduced by filterUnqualifiedTables(); the surviving uids are
 * registered with the tq reader and appended to the task's table list, with a group id computed
 * from pGroupTags when group tags are present. When removing, the list is passed straight to
 * tqReaderRemoveTbUidList().
 *
 * @param tinfo       task handle
 * @param tableIdList array of table uids
 * @param isAdd       true to add the tables, false to remove them
 * @return 0 on success; TSDB_CODE_OUT_OF_MEMORY if the key buffer cannot be allocated; otherwise
 *         the error code of the failing helper
 */
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  // traverse to the stream scanner node to add this table id
  SOperatorInfo* pInfo = pTaskInfo->pRoot;
  while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    pInfo = pInfo->pDownstream[0];
  }

  SStreamScanInfo* pScanInfo = pInfo->info;
  if (!isAdd) {  // remove the table id in current list
    qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
    return tqReaderRemoveTbUidList(pScanInfo->tqReader, tableIdList);
  }

  // add new table id
  int32_t code = 0;
  size_t  bufLen = 0;
  char*   keyBuf = NULL;
  SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo));

  qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
  code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  // todo refactor STableList
  bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
  if (bufLen > 0) {
    keyBuf = taosMemoryMalloc(bufLen);
    if (keyBuf == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) {
    uint64_t*     uid = taosArrayGet(qa, i);
    STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};

    if (bufLen > 0) {
      code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
                                   &keyInfo.groupId);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
  }

_end:
  // single exit so that qa and keyBuf are released on the error paths as well
  if (keyBuf != NULL) {
    taosMemoryFree(keyBuf);
  }

  taosArrayDestroy(qa);
  return code;
}
297

298
/**
 * Report the schema/tag versions and the db/table names recorded in the task's schemaInfo.
 *
 * When schemaInfo.sw is NULL nothing is written and TSDB_CODE_SUCCESS is returned. Otherwise
 * *sversion and *tversion are filled in, and dbName/tableName receive a copy of the stored name
 * or an empty string when no name is stored. The copies are unbounded strcpy() calls, so the
 * destination buffers must be large enough for the stored names.
 *
 * @return TSDB_CODE_SUCCESS when there is no schema, 0 otherwise
 */
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
                                    int32_t* tversion) {
  ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  if (pTask->schemaInfo.sw == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  *sversion = pTask->schemaInfo.sw->version;
  *tversion = pTask->schemaInfo.tversion;

  const char* srcDb = pTask->schemaInfo.dbname;
  if (srcDb) {
    strcpy(dbName, srcDb);
  } else {
    dbName[0] = 0;
  }

  const char* srcTable = pTask->schemaInfo.tablename;
  if (srcTable) {
    strcpy(tableName, srcTable);
  } else {
    tableName[0] = 0;
  }

  return 0;
}
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349

int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) {
  assert(pSubplan != NULL);
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;

  taosThreadOnce(&initPoolOnce, initRefPool);
  atexit(cleanupRefPool);

  int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
  code = dsDataSinkMgtInit(&cfg);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  if (handle) {
    void* pSinkParam = NULL;
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }

    code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
wmmhello's avatar
wmmhello 已提交
350 351 352
    if(code != TSDB_CODE_SUCCESS){
      taosMemoryFreeClear(pSinkParam);
    }
353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421
  }

  _error:
  // if failed to add ref for all tables in this query, abort current query
  return code;
}

#ifdef TEST_IMPL
// Test-only helper (compiled under TEST_IMPL): sleeps for a duration encoded in the SQL text.
// The number following " t_" is taken as the amount, an optional h/m/s suffix scales it to
// milliseconds. Returns 0 when the SQL contains " count(*)" or no non-zero duration is found,
// 1 otherwise (including when pQInfo->sql is not set). Sleeps of one second or more are done in
// one-second steps and stop early once the task is reported as killed.
int waitMoment(SQInfo* pQInfo) {
  if (!pQInfo->sql) {
    return 1;
  }

  if (strstr(pQInfo->sql, " count(*)")) {
    return 0;
  }

  int   ms = 0;
  char* pos = strstr(pQInfo->sql, " t_");
  if (pos) {
    pos += 3;
    ms = atoi(pos);

    // skip the digits to reach the unit character
    while (*pos >= '0' && *pos <= '9') {
      pos++;
    }

    switch (*pos) {
      case 'h':
        ms *= 3600 * 1000;
        break;
      case 'm':
        ms *= 60 * 1000;
        break;
      case 's':
        ms *= 1000;
        break;
      default:
        break;
    }
  }

  if (ms == 0) {
    return 0;
  }
  printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);

  if (ms < 1000) {
    taosMsleep(ms);
    return 1;
  }

  for (int used_ms = 0; used_ms < ms;) {
    taosMsleep(1000);
    used_ms += 1000;
    if (isTaskKilled(pQInfo)) {
      printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
      break;
    }
  }

  return 1;
}
#endif

/**
 * Run one step of the task: fetch the next result block from the root operator.
 *
 * The calling thread first claims the task by swapping its thread id into pTaskInfo->owner; if
 * another owner is already recorded the call fails with TSDB_CODE_QRY_IN_EXEC. Ownership is
 * cleared again on every other exit path. A non-zero value returned through setjmp() on
 * pTaskInfo->env is recorded as the task's error code and returned.
 *
 * @param tinfo    task handle
 * @param pRes     out: next result block, NULL when none was produced
 * @param useconds out: accumulated elapsed time; only written when *pRes ends up NULL
 * @return the task's code field, or TSDB_CODE_SUCCESS when the task had already been killed
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        selfTid = taosGetSelfPthreadId();

  *pRes = NULL;

  // claim the task; a non-zero previous owner means it is already being executed
  int64_t prevOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, selfTid);
  if (prevOwner != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)prevOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampMs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t jmpCode = setjmp(pTaskInfo->env);
  if (jmpCode != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = jmpCode;
    cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);

    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t startUs = taosGetTimestampUs();

  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
  uint64_t el = (taosGetTimestampUs() - startUs);

  pTaskInfo->cost.elapsedTime += el;
  if (*pRes == NULL) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  int32_t current = 0;
  if (*pRes != NULL) {
    current = (*pRes)->info.rows;
  }
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

/**
 * Mark the task as killed and block until no thread owns it any more.
 *
 * @return TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS otherwise
 */
int32_t qKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  if (pTask == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qAsyncKillTask(qinfo);

  // Poll every 100 ms until the owner field is back to 0; qExecTask() resets it when it leaves.
  for (;;) {
    if (pTask->owner == 0) {
      break;
    }
    taosMsleep(100);
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Flag the task as killed via setTaskKilled() and return without waiting.
 *
 * @return TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS otherwise
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
  if (qinfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  qDebug("%s execTask async killed", GET_TASKID(pTask));
  setTaskKilled(pTask);

  return TSDB_CODE_SUCCESS;
}

/**
 * Log the completion of a task, print its cost summary and destroy it.
 *
 * A NULL handle is ignored. The handle may also belong to a task whose creation failed (the
 * create functions in this file pass it here on error), so the root operator is not assumed to
 * exist when the completion message is written.
 */
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
  if (pTaskInfo == NULL) {
    return;
  }

  // only report the row count when there is a root operator to read it from
  if (pTaskInfo->pRoot != NULL) {
    qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
  } else {
    qDebug("%s execTask completed", GET_TASKID(pTaskInfo));
  }

  queryCostStatis(pTaskInfo);  // print the query cost summary
  doDestroyTask(pTaskInfo);
}

// Collects explain information starting at the task's root operator by delegating to
// getOperatorExplainExecInfo(); the capacity counter starts at 0 and is local to this call.
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes) {
  int32_t        capacity = 0;
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pRoot = pTask->pRoot;

  return getOperatorExplainExecInfo(pRoot, pRes, &capacity, resNum);
}

/**
 * Serialize the state of the task's operator tree with encodeOperator().
 *
 * If encoding succeeds but reports zero operators with a value, the output buffer is released
 * and *len is set to 0.
 *
 * @param tinfo   task handle
 * @param pOutput out: encoded buffer
 * @param len     out: length of the encoded buffer
 * @return TSDB_CODE_INVALID_PARA when the handle or its root operator is NULL, otherwise the
 *         result of encodeOperator()
 */
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
  SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL || pTaskInfo->pRoot == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t nOptrWithVal = 0;
  int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
  // comparison, not assignment: the former "nOptrWithVal = 0" made this branch unreachable
  if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
    taosMemoryFreeClear(*pOutput);
    *len = 0;
  }
  return code;
}

/**
 * Restore operator state from a buffer by handing it to decodeOperator() on the root operator.
 *
 * @return TSDB_CODE_INVALID_PARA when the handle or the input is NULL or len is 0, otherwise the
 *         result of decodeOperator()
 */
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
  SExecTaskInfo* pTask = (struct SExecTaskInfo*)tinfo;

  bool validArgs = (pTask != NULL) && (pInput != NULL) && (len != 0);
  if (!validArgs) {
    return TSDB_CODE_INVALID_PARA;
  }

  return decodeOperator(pTask->pRoot, pInput, len);
}

/**
 * Find the stream scan operator below the task root and return its info object.
 *
 * Each operator above the scan is asserted to have exactly one downstream operator.
 *
 * @param scanner out: the stream scan operator's info pointer
 * @return always 0
 */
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
  SOperatorInfo* pCur = ((SExecTaskInfo*)tinfo)->pRoot;

  for (;;) {
    uint8_t type = pCur->operatorType;
    if (type != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      // not there yet: step down the single child
      ASSERT(pCur->numOfDownstream == 1);
      pCur = pCur->pDownstream[0];
      continue;
    }

    *scanner = pCur->info;
    return 0;
  }
}

#if 0
// Currently compiled out by the surrounding #if 0.
// Writes pItem to the input queue of a stream-model task; always returns 0.
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  taosWriteQitem(pTask->streamInfo.inputQueue->queue, pItem);

  return 0;
}
#endif

/**
 * Record the version range to recover and move the task's recover step to
 * STREAM_RECOVER_STEP__PREPARE. The task is asserted to use the stream execution model.
 *
 * @return always 0
 */
int32_t qStreamPrepareRecover(qTaskInfo_t tinfo, int64_t startVer, int64_t endVer) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  pTask->streamInfo.recoverStartVer = startVer;
  pTask->streamInfo.recoverEndVer = endVer;
  pTask->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE;

  return 0;
}

void* qExtractReaderFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return (void*)pInfo->tqReader;
}

const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return pInfo->tqReader->pSchemaWrapper;
}

const STqOffset* qExtractStatusFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return &pInfo->offset;
}

// Returns streamInfo.metaBlk of a task; the task is asserted to use the queue execution model.
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);

  void* pMeta = pTask->streamInfo.metaBlk;
  return pMeta;
}

/**
 * Copy the task's streamInfo.lastStatus into *pOffset (sizeof(STqOffsetVal) bytes).
 * The task is asserted to use the queue execution model.
 *
 * @return always 0
 */
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);

  const void* pLast = &pTask->streamInfo.lastStatus;
  memcpy(pOffset, pLast, sizeof(STqOffsetVal));

  return 0;
}

/**
 * Position the stream scan operator of a queue-model task at the given offset.
 *
 * The offset is always stored in streamInfo.prepareStatus. If it equals streamInfo.lastStatus
 * nothing else happens. Otherwise the operator chain is followed down to the stream scan (every
 * operator on the way is set to OP_OPENED) and, depending on the offset type:
 *  - TMQ_OFFSET__LOG: the table scan's data reader is closed and the tq reader is moved to
 *    version + 1;
 *  - TMQ_OFFSET__SNAPSHOT_DATA: the table scan is pointed at the table with the given uid (the
 *    first table of the task's list when uid is 0) and the reader is reset to start at ts + 1.
 * Any other offset type hits ASSERT(0).
 *
 * @param tinfo   task handle; asserted to use OPTR_EXEC_MODEL_QUEUE
 * @param pOffset offset to prepare for
 * @return 0 on success; -1 when seeking the tq reader fails, or when uid is 0 and the table list
 *         is empty
 */
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
  pTaskInfo->streamInfo.prepareStatus = *pOffset;
  if (!tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
    while (1) {
      uint8_t type = pOperator->operatorType;
      pOperator->status = OP_OPENED;
      if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
        SStreamScanInfo* pInfo = pOperator->info;
        if (pOffset->type == TMQ_OFFSET__LOG) {
          // switching to the log: drop the table scan's data reader first
          STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
          tsdbReaderClose(pTSInfo->dataReader);
          pTSInfo->dataReader = NULL;
#if 0
          if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
              pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
            qError("prepare scan ver %ld actual ver %ld, last %ld", pOffset->version,
                   pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
            ASSERT(0);
          }
#endif
          if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
            return -1;
          }
          ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1);
        } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
          int64_t uid = pOffset->uid;
          int64_t ts = pOffset->ts;

          // uid 0: start from the first table of the list, before any timestamp
          if (uid == 0) {
            if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
              STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
              uid = pTableInfo->uid;
              ts = INT64_MIN;
            } else {
              return -1;
            }
          }

          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          int32_t         tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);

#ifndef NDEBUG
          // debug builds only: report and reset the per-table row counter
          // (64-bit values are printed with PRId64; "%ld" is wrong where long is 32 bits wide)
          qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid,
                 pTableScanInfo->currentTable, (int64_t)pInfo->pTableScanOp->resultInfo.totalRows);
          pInfo->pTableScanOp->resultInfo.totalRows = 0;
#endif

          // locate the table in the task's list and make it the current one
          bool found = false;
          for (int32_t i = 0; i < tableSz; i++) {
            STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
            if (pTableInfo->uid == uid) {
              found = true;
              pTableScanInfo->currentTable = i;
              break;
            }
          }

          // TODO after dropping table, table may be not found
          ASSERT(found);

          if (pTableScanInfo->dataReader == NULL) {
            if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond,
                               pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 ||
                pTableScanInfo->dataReader == NULL) {
              ASSERT(0);
            }
          }

          tsdbSetTableId(pTableScanInfo->dataReader, uid);
          // reset the reader with a start key of ts + 1, then restore the original start key
          int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
          pTableScanInfo->cond.twindows.skey = ts + 1;
          tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
          pTableScanInfo->cond.twindows.skey = oldSkey;
          pTableScanInfo->scanTimes = 0;

          qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64
                 ", table cur set to %d , all table num %d",
                 uid, ts, pTableScanInfo->currentTable, tableSz);
        } else {
          ASSERT(0);
        }
        return 0;
      } else {
        ASSERT(pOperator->numOfDownstream == 1);
        pOperator = pOperator->pDownstream[0];
      }
    }
  }
  return 0;
}

H
Haojun Liao 已提交
696

697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718
#if 0
// Currently compiled out by the surrounding #if 0.
// Forwards to doPrepareScan() on the task root. A uid of 0 is replaced by the uid of the first
// table in the task's table list (with ts set to INT64_MIN) when that list is not empty.
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  if (uid == 0 && taosArrayGetSize(pTask->tableqinfoList.pTableList) != 0) {
    STableKeyInfo* pFirst = taosArrayGet(pTask->tableqinfoList.pTableList, 0);
    uid = pFirst->uid;
    ts = INT64_MIN;
  }

  return doPrepareScan(pTask->pRoot, uid, ts);
}

// Currently compiled out by the surrounding #if 0.
// Forwards to doGetScanStatus() on the task root and returns its result.
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tinfo)->pRoot;
  return doGetScanStatus(pRoot, uid, ts);
}
#endif