/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "executor.h"
H
Haojun Liao 已提交
17
#include "executorimpl.h"
18
#include "planner.h"
L
Liu Jicong 已提交
19
#include "tdatablock.h"
L
Liu Jicong 已提交
20
#include "tref.h"
21
#include "tudf.h"
L
Liu Jicong 已提交
22
#include "vnode.h"
23 24 25 26 27 28 29 30 31

// Once-control passed to taosThreadOnce() together with initRefPool() in qCreateExecTask().
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
// Reference-pool handle returned by taosOpenRef() in initRefPool(); starts at -1 and is
// swapped to 0 by cleanupRefPool().
int32_t             exchangeObjRefPool = -1;

/* Opens the reference pool (1024 entries, destructor doDestroyExchangeOperatorInfo) and stores its handle. */
static void initRefPool() {
  exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
}
/* Swaps the stored pool handle for 0 and closes the handle that was stored before the swap. */
static void cleanupRefPool() {
  taosCloseRef(atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0));
}
32

L
Liu Jicong 已提交
33
/*
 * Walk down the operator tree (single-downstream chains only) until the stream scan
 * operator is reached, then replace its pending input list with the supplied input.
 *
 * pOperator   - operator to start from; must not be NULL.
 * input       - meaning depends on `type`:
 *               STREAM_INPUT__MERGED_SUBMIT: array of `numOfBlocks` pointers, each pushed as-is;
 *               STREAM_INPUT__DATA_SUBMIT:   a single pointer, pushed as-is (numOfBlocks must be 1);
 *               STREAM_INPUT__DATA_BLOCK:    array of `numOfBlocks` SSDataBlock structs, each
 *                                            shallow-copied into a newly created block.
 * numOfBlocks - number of entries in `input`.
 * type        - one of the STREAM_INPUT__* values above; anything else trips ASSERT(0).
 * id          - task id string, used only in error logs.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_APP_ERROR when no stream scan operator can be
 * reached (no downstream, or more than one downstream).
 *
 * Every operator visited on the way, including the stream scan itself, has its status reset
 * to OP_NOT_OPENED.
 */
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  ASSERT(pOperator != NULL);
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_QRY_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_QRY_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  } else {
    pOperator->status = OP_NOT_OPENED;

    SStreamScanInfo* pInfo = pOperator->info;

    // TODO: if a block was set but not consumed,
    // prevent setting a different type of block
    pInfo->validBlockIndex = 0;
    taosArrayClear(pInfo->pBlockLists);

    if (type == STREAM_INPUT__MERGED_SUBMIT) {
      ASSERT(numOfBlocks > 1);
      for (size_t i = 0; i < numOfBlocks; i++) {
        SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
        taosArrayPush(pInfo->pBlockLists, &pReq);
      }
      // merged submits are consumed downstream as ordinary submit blocks
      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
    } else if (type == STREAM_INPUT__DATA_SUBMIT) {
      ASSERT(numOfBlocks == 1);
      taosArrayPush(pInfo->pBlockLists, &input);
      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
    } else if (type == STREAM_INPUT__DATA_BLOCK) {
      for (size_t i = 0; i < numOfBlocks; ++i) {
        SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];

        // TODO optimize
        // NOTE(review): the result of createOneDataBlock() is used without a NULL check.
        SSDataBlock* p = createOneDataBlock(pDataBlock, false);
        p->info = pDataBlock->info;

        // the new block's column array is refilled with the source block's column entries
        taosArrayClear(p->pDataBlock);
        taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
        taosArrayPush(pInfo->pBlockLists, &p);
      }
      pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
    } else {
      ASSERT(0);
    }

    return TSDB_CODE_SUCCESS;
  }
}

96 97 98 99 100 101 102 103 104
/*
 * Releases the blocks held in the input list of the stream scan operator that sits directly
 * below the task's root operator. Does nothing when the task, its root, or the root's
 * downstream is missing. Any other operator type, or any block type other than
 * STREAM_INPUT__DATA_BLOCK, trips ASSERT(0).
 */
void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  if (pTask == NULL || pTask->pRoot == NULL || pTask->pRoot->numOfDownstream <= 0) {
    return;
  }

  SOperatorInfo* pChild = pTask->pRoot->pDownstream[0];
  if (pChild->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    ASSERT(0);
    return;
  }

  SStreamScanInfo* pScan = pChild->info;
  if (pScan->blockType != STREAM_INPUT__DATA_BLOCK) {
    ASSERT(0);
    return;
  }

  // destroy each block's column array, then free the block itself
  for (int32_t idx = 0; idx < taosArrayGetSize(pScan->pBlockLists); ++idx) {
    SSDataBlock** ppBlock = (SSDataBlock**)taosArrayGet(pScan->pBlockLists, idx);
    SSDataBlock*  pBlock = *ppBlock;
    taosArrayDestroy(pBlock->pDataBlock);
    taosMemoryFreeClear(pBlock);
  }
}

L
Liu Jicong 已提交
119
/*
 * Hands `numOfBlocks` input entries of the given STREAM_INPUT__* `type` to the task's
 * stream scan operator via doSetStreamBlock().
 *
 * Returns TSDB_CODE_QRY_APP_ERROR for a NULL task, TSDB_CODE_SUCCESS without doing anything
 * when there is no input, otherwise the code returned by doSetStreamBlock().
 */
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  if (tinfo == NULL) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  bool nothingToSet = (pBlocks == NULL) || (numOfBlocks == 0);
  if (nothingToSet) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  int32_t rc = doSetStreamBlock(pTaskInfo->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
  if (rc == TSDB_CODE_SUCCESS) {
    qDebug("%s set the stream block successfully", GET_TASKID(pTaskInfo));
  } else {
    qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
  }

  return rc;
}

H
Haojun Liao 已提交
140
/*
 * Builds a queue-model (OPTR_EXEC_MODEL_QUEUE) execution task from a serialized subplan.
 *
 * msg       - serialized subplan, parsed with qStringToSubplan(); NULL yields NULL.
 * readers   - read handle forwarded to qCreateExecTask().
 * numOfCols - out: number of slots in the plan's output block descriptor flagged `output`.
 * pSchema   - out: clone of the task's schemaInfo.qsw.
 *
 * Returns the task handle, or NULL with terrno set when parsing or task creation fails.
 */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
  if (msg == NULL) {
    // TODO create raw scan
    return NULL;
  }

  struct SSubplan* pSubplan = NULL;
  int32_t          rc = qStringToSubplan(msg, &pSubplan);
  if (rc != TSDB_CODE_SUCCESS) {
    terrno = rc;
    return NULL;
  }

  qTaskInfo_t hTask = NULL;
  rc = qCreateExecTask(readers, 0, 0, pSubplan, &hTask, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
  if (rc != TSDB_CODE_SUCCESS) {
    nodesDestroyNode((SNode*)pSubplan);
    qDestroyTask(hTask);
    terrno = rc;
    return NULL;
  }

  // count the output columns declared by the plan's output block descriptor
  SDataBlockDescNode* pOutputDesc = pSubplan->pNode->pOutputDataBlockDesc;
  int32_t             outputCols = 0;

  SNode* pSlotNode;
  FOREACH(pSlotNode, pOutputDesc->pSlots) {
    if (((SSlotDescNode*)pSlotNode)->output) {
      ++outputCols;
    }
  }
  *numOfCols = outputCols;

  *pSchema = tCloneSSchemaWrapper(((SExecTaskInfo*)hTask)->schemaInfo.qsw);
  return hTask;
}

L
Liu Jicong 已提交
178
/*
 * Builds a stream-model (OPTR_EXEC_MODEL_STREAM) execution task from a serialized subplan.
 * Returns the task handle, or NULL when `msg` is NULL or when parsing / task creation fails
 * (terrno carries the error code in the failure cases).
 */
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
  if (msg == NULL) {
    return NULL;
  }

  struct SSubplan* pSubplan = NULL;
  int32_t          rc = qStringToSubplan(msg, &pSubplan);
  if (rc != TSDB_CODE_SUCCESS) {
    terrno = rc;
    return NULL;
  }

  qTaskInfo_t hTask = NULL;
  rc = qCreateExecTask(readers, 0, 0, pSubplan, &hTask, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
  if (rc == TSDB_CODE_SUCCESS) {
    return hTask;
  }

  // creation failed: release the plan and whatever part of the task was built
  nodesDestroyNode((SNode*)pSubplan);
  qDestroyTask(hTask);
  terrno = rc;
  return NULL;
}
203

204
static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) {
205 206 207 208 209 210
  SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));

  // let's discard the tables those are not created according to the queried super table.
  SMetaReader mr = {0};
  metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
  for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
211
    uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
212 213 214

    int32_t code = metaGetTableEntryByUid(&mr, *id);
    if (code != TSDB_CODE_SUCCESS) {
215
      qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *id, tstrerror(terrno), idstr);
216 217 218
      continue;
    }

M
Minglei Jin 已提交
219 220
    tDecoderClear(&mr.coder);

221
    // TODO handle ntb case
L
Liu Jicong 已提交
222
    if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pScanInfo->tableUid) {
223 224
      continue;
    }
225 226 227

    if (pScanInfo->pTagCond != NULL) {
      bool          qualified = false;
228
      STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid};
H
Haojun Liao 已提交
229
      code = isQualifiedTable(&info, pScanInfo->pTagCond, pScanInfo->readHandle.meta, &qualified);
230 231 232 233 234 235 236 237 238 239
      if (code != TSDB_CODE_SUCCESS) {
        qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr);
        continue;
      }

      if (!qualified) {
        continue;
      }
    }

240
    // handle multiple partition
241 242 243 244 245 246 247
    taosArrayPush(qa, id);
  }

  metaReaderClear(&mr);
  return qa;
}

248
/*
 * Adds tables to, or removes tables from, the stream scan operator of a task.
 *
 * tinfo       - task handle; its operator tree is walked along the first downstream until the
 *               stream scan operator is found.
 * tableIdList - array of table uids.
 * isAdd       - true:  filter the uids with filterUnqualifiedTables(), register the survivors
 *                      with the tq reader, and append them to the task's table list and
 *                      uid -> groupId map;
 *               false: remove the uids from the tq reader.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the group-key buffer cannot be allocated,
 * or the error code of the failing helper. The filtered uid array and the key buffer are
 * released on every exit path.
 */
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  // traverse to the stream scanner node to add this table id
  SOperatorInfo* pInfo = pTaskInfo->pRoot;
  while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    pInfo = pInfo->pDownstream[0];
  }

  SStreamScanInfo* pScanInfo = pInfo->info;

  if (!isAdd) {  // remove the table id in current list
    qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
    return tqReaderRemoveTbUidList(pScanInfo->tqReader, tableIdList);
  }

  // add new table id
  int32_t code = 0;
  bool    assignUid = false;
  size_t  bufLen = 0;
  char*   keyBuf = NULL;
  SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo));

  qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
  code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  // todo refactor STableList
  bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
  if (bufLen > 0) {
    assignUid = groupbyTbname(pScanInfo->pGroupTags);
    keyBuf = taosMemoryMalloc(bufLen);
    if (keyBuf == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) {
    uint64_t*     uid = taosArrayGet(qa, i);
    STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};

    if (bufLen > 0) {
      if (assignUid) {
        keyInfo.groupId = keyInfo.uid;
      } else {
        code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
                                     &keyInfo.groupId);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }
    }

    taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
    if (pTaskInfo->tableqinfoList.map == NULL) {
      pTaskInfo->tableqinfoList.map =
          taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    }

    // the key is the uid value itself, so its length is sizeof(*uid), not the pointer size
    taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(*uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
  }

_end:
  if (keyBuf != NULL) {
    taosMemoryFree(keyBuf);
  }

  taosArrayDestroy(qa);
  return code;
}
317

318
/*
 * Copies the schema version, table version, database name and table name recorded in the
 * task's schemaInfo into the caller's outputs. When the task has no schema wrapper
 * (schemaInfo.sw == NULL) the outputs are left untouched. A missing name is reported as an
 * empty string.
 *
 * NOTE(review): names are copied with strcpy(); the caller's buffers are assumed to be large
 * enough - confirm against callers.
 */
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
                                    int32_t* tversion) {
  ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  if (pTaskInfo->schemaInfo.sw == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  *sversion = pTaskInfo->schemaInfo.sw->version;
  *tversion = pTaskInfo->schemaInfo.tversion;

  const char* srcDb = pTaskInfo->schemaInfo.dbname;
  if (srcDb != NULL) {
    strcpy(dbName, srcDb);
  } else {
    dbName[0] = 0;
  }

  const char* srcTable = pTaskInfo->schemaInfo.tablename;
  if (srcTable != NULL) {
    strcpy(tableName, srcTable);
  } else {
    tableName[0] = 0;
  }

  return 0;
}
342 343

int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
D
dapan1121 已提交
344
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) {
345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369
  assert(pSubplan != NULL);
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;

  taosThreadOnce(&initPoolOnce, initRefPool);
  atexit(cleanupRefPool);

  int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
  code = dsDataSinkMgtInit(&cfg);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  if (handle) {
    void* pSinkParam = NULL;
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }

    code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
L
Liu Jicong 已提交
370
    if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
371 372
      taosMemoryFreeClear(pSinkParam);
    }
373 374
  }

L
Liu Jicong 已提交
375
_error:
376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424
  // if failed to add ref for all tables in this query, abort current query
  return code;
}

#ifdef TEST_IMPL
// wait moment
/*
 * Test-only delay hook. When the query's SQL text contains " t_<number><unit>" (unit h, m
 * or s; no unit leaves the number as milliseconds) and does not contain " count(*)", sleeps
 * for that long. Delays of one second or more are slept in one-second slices, checking
 * isTaskKilled() after each slice.
 *
 * Returns 0 when the SQL contains " count(*)" or yields no delay, otherwise 1 (also when the
 * query has no SQL text).
 */
int waitMoment(SQInfo* pQInfo) {
  if (!pQInfo->sql) {
    return 1;
  }

  if (strstr(pQInfo->sql, " count(*)")) {
    return 0;
  }

  int   delayMs = 0;
  char* cursor = strstr(pQInfo->sql, " t_");
  if (cursor) {
    cursor += 3;
    delayMs = atoi(cursor);
    // skip the digits to reach the unit character
    while (*cursor >= '0' && *cursor <= '9') {
      cursor++;
    }

    switch (*cursor) {
      case 'h':
        delayMs *= 3600 * 1000;
        break;
      case 'm':
        delayMs *= 60 * 1000;
        break;
      case 's':
        delayMs *= 1000;
        break;
      default:
        break;
    }
  }

  if (delayMs == 0) {
    return 0;
  }
  printf("test wait sleep %dms. sql=%s ...\n", delayMs, pQInfo->sql);

  if (delayMs < 1000) {
    taosMsleep(delayMs);
    return 1;
  }

  for (int sleptMs = 0; sleptMs < delayMs;) {
    taosMsleep(1000);
    sleptMs += 1000;
    if (isTaskKilled(pQInfo)) {
      printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
      break;
    }
  }

  return 1;
}
#endif

H
Haojun Liao 已提交
425 426 427 428 429
/* Element destructor for taosArrayClearEx(): `param` points at an SSDataBlock* slot. */
static void freeBlock(void* param) {
  SSDataBlock** ppBlock = (SSDataBlock**)param;
  blockDataDestroy(*ppBlock);
}

430
/*
 * Runs the task and collects copies of its result blocks into `pResList`.
 *
 * tinfo    - task handle.
 * pResList - array of SSDataBlock*; cleared (freeing previous blocks) on entry, then filled
 *            with copies of the blocks produced by the root operator. Collection stops when
 *            the operator returns NULL or once at least 4096 rows have been gathered.
 * useconds - out: accumulated elapsed time; written only when the operator returned NULL.
 *
 * Returns TSDB_CODE_QRY_IN_EXEC if another thread currently owns the task,
 * TSDB_CODE_SUCCESS if the task was already killed, the longjmp code if an operator aborted
 * through pTaskInfo->env, otherwise pTaskInfo->code.
 *
 * Ownership of the task is taken with a compare-and-swap on `owner` and released on every
 * exit path after it was acquired.
 */
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  taosArrayClearEx(pResList, freeBlock);

  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampMs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);

    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int32_t      current = 0;
  SSDataBlock* pRes = NULL;

  int64_t st = taosGetTimestampUs();

  while ((pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot)) != NULL) {
    // NOTE(review): the result of createOneDataBlock() is used without a NULL check.
    SSDataBlock* p = createOneDataBlock(pRes, true);
    current += p->info.rows;
    ASSERT(p->info.rows > 0);
    taosArrayPush(pResList, &p);

    if (current >= 4096) {
      break;
    }
  }

  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  // `total` is unsigned, so it is printed with PRIu64
  qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRIu64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555
/*
 * Runs the task for one step and returns the next result block of its root operator.
 *
 * tinfo    - task handle.
 * pRes     - out: the block returned by the root operator, or NULL; reset to NULL on entry.
 * useconds - out: accumulated elapsed time; written only when no block was returned.
 *
 * Returns TSDB_CODE_QRY_IN_EXEC if another thread currently owns the task,
 * TSDB_CODE_SUCCESS if the task was already killed, the longjmp code if an operator aborted
 * through pTaskInfo->env, otherwise pTaskInfo->code.
 *
 * Ownership of the task is taken with a compare-and-swap on `owner` and released on every
 * exit path after it was acquired.
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampMs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);

    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();

  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  // `total` is unsigned, so it is printed with PRIu64
  qDebug("%s task suspended, %d rows returned, total:%" PRIu64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

556 557 558 559 560 561
/*
 * Marks the task as killed and blocks until no thread owns it any more, polling the owner
 * field every 100 ms. Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, otherwise
 * TSDB_CODE_SUCCESS.
 */
int32_t qKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  if (pTask == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qAsyncKillTask(qinfo);

  // Wait for the executing thread to stop; it clears the owner field when it leaves the task.
  for (;;) {
    if (pTask->owner == 0) {
      break;
    }
    taosMsleep(100);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Marks the task as killed without waiting for the executing thread. Returns
 * TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, otherwise TSDB_CODE_SUCCESS.
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
  if (pTaskInfo != NULL) {
    qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
    setTaskKilled(pTaskInfo);
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_QRY_INVALID_QHANDLE;
}

/*
 * Logs the task's completion, prints its cost summary and destroys it. A NULL handle is
 * ignored.
 *
 * The row count is logged only when the task has a root operator: this function is also
 * called on the task-creation failure paths in this file, where the operator tree may not
 * have been built.
 */
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
  if (pTaskInfo == NULL) {
    return;
  }

  if (pTaskInfo->pRoot != NULL) {
    qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
  } else {
    qDebug("%s execTask completed, no operator tree was built", GET_TASKID(pTaskInfo));
  }

  queryCostStatis(pTaskInfo);  // print the query cost summary
  doDestroyTask(pTaskInfo);
}

H
Haojun Liao 已提交
597
/* Forwards to getOperatorExplainExecInfo() starting at the task's root operator. */
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
  SOperatorInfo* pRootOperator = ((SExecTaskInfo*)tinfo)->pRoot;
  return getOperatorExplainExecInfo(pRootOperator, pExecInfoList);
}

/*
 * Serializes the state of the task's operator tree with encodeOperator().
 *
 * tinfo   - task handle.
 * pOutput - out: encoded buffer produced by encodeOperator().
 * len     - out: length of the encoded buffer.
 *
 * Returns TSDB_CODE_INVALID_PARA when the task or its root operator is missing, otherwise
 * the code from encodeOperator(). When encoding succeeds but no operator contributed a
 * value, the output buffer is freed and *len is set to 0.
 */
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
  SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL || pTaskInfo->pRoot == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t nOptrWithVal = 0;
  int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
  // comparison, not assignment: drop the buffer only when nothing was actually encoded
  if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
    taosMemoryFreeClear(*pOutput);
    *len = 0;
  }
  return code;
}

/*
 * Restores operator state from `pInput` (`len` bytes) with decodeOperator(). Returns
 * TSDB_CODE_INVALID_PARA when the task or input is NULL or the length is zero.
 */
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
  SExecTaskInfo* pTask = (struct SExecTaskInfo*)tinfo;

  bool hasInput = (pInput != NULL) && (len != 0);
  if (pTask == NULL || !hasInput) {
    return TSDB_CODE_INVALID_PARA;
  }

  return decodeOperator(pTask->pRoot, pInput, len);
}

/*
 * Walks down the task's operator tree, which is asserted to be a single-downstream chain,
 * until the stream scan operator is found, and stores that operator's info in *scanner.
 * Always returns 0.
 *
 * The operator type is compared directly rather than through a uint8_t copy, so node type
 * values that do not fit in 8 bits cannot be misidentified.
 */
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  while (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    ASSERT(pOperator->numOfDownstream == 1);
    pOperator = pOperator->pDownstream[0];
  }

  *scanner = pOperator->info;
  return 0;
}

#if 0
// Disabled by the surrounding #if 0. Would write pItem to the task's stream input queue
// after asserting the task uses the stream execution model; always returns 0.
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
  taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem);
  return 0;
}
#endif

/*
 * Records the version range [startVer, endVer] to recover and moves the task's recover step
 * to STREAM_RECOVER_STEP__PREPARE. The task must use the stream execution model. Always
 * returns 0.
 */
int32_t qStreamPrepareRecover(qTaskInfo_t tinfo, int64_t startVer, int64_t endVer) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  pTask->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE;
  pTask->streamInfo.recoverStartVer = startVer;
  pTask->streamInfo.recoverEndVer = endVer;
  return 0;
}

void* qExtractReaderFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return (void*)pInfo->tqReader;
}

const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return pInfo->tqReader->pSchemaWrapper;
}

/* Returns the task's streamInfo.metaBlk; the task must use the queue execution model. */
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);

  return pTask->streamInfo.metaBlk;
}

/*
 * Copies the task's last stream status into *pOffset. The task must use the queue execution
 * model. Always returns 0.
 */
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);

  const STqOffsetVal* pLast = &pTask->streamInfo.lastStatus;
  memcpy(pOffset, pLast, sizeof(STqOffsetVal));
  return 0;
}

/*
 * Positions a queue-model task's stream scan at the given offset.
 *
 * tinfo   - task handle; must use OPTR_EXEC_MODEL_QUEUE.
 * pOffset - target offset. It is always stored as the task's prepareStatus. When it equals
 *           the task's lastStatus nothing else is done.
 *           TMQ_OFFSET__LOG:           the table scan's data reader is closed and the tq
 *                                      reader is sought to version + 1.
 *           TMQ_OFFSET__SNAPSHOT_DATA: the table scan is pointed at table `uid` (the first
 *                                      table of the task when uid is 0) and its reader is
 *                                      reset to start just after `ts`.
 *           Any other type trips ASSERT(0).
 *
 * Returns 0 on success, -1 when the seek fails or when uid is 0 and the task has no tables.
 *
 * NOTE(review): only one level is descended below the root when looking for the stream scan
 * operator, and the operator reached by that descent does not get its status set - the
 * original control flow is preserved here.
 */
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
  pTaskInfo->streamInfo.prepareStatus = *pOffset;

  if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
    return 0;
  }

  pOperator->status = OP_OPENED;
  // TODO add more check
  // compare the operator type directly; a uint8_t copy could truncate the node type value
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    ASSERT(pOperator->numOfDownstream == 1);
    pOperator = pOperator->pDownstream[0];
  }

  SStreamScanInfo* pInfo = pOperator->info;
  if (pOffset->type == TMQ_OFFSET__LOG) {
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    tsdbReaderClose(pTSInfo->dataReader);
    pTSInfo->dataReader = NULL;
#if 0
    if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
        pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
      qError("prepare scan ver %" PRId64 " actual ver %" PRId64 ", last %" PRId64, pOffset->version,
             pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
      ASSERT(0);
    }
#endif
    if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
      return -1;
    }
    ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1);
  } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
    int64_t uid = pOffset->uid;
    int64_t ts = pOffset->ts;

    if (uid == 0) {
      // no table given: start from the first table of the task, before any timestamp
      if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
        STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
        uid = pTableInfo->uid;
        ts = INT64_MIN;
      } else {
        return -1;
      }
    }

    STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
    int32_t         tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);

#ifndef NDEBUG

    qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid,
           pTableScanInfo->currentTable, pInfo->pTableScanOp->resultInfo.totalRows);
    // NOTE(review): this counter reset only happens in builds without NDEBUG.
    pInfo->pTableScanOp->resultInfo.totalRows = 0;
#endif

    bool found = false;
    for (int32_t i = 0; i < tableSz; i++) {
      STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
      if (pTableInfo->uid == uid) {
        found = true;
        pTableScanInfo->currentTable = i;
        break;
      }
    }

    // TODO after dropping table, table may be not found
    ASSERT(found);

    if (pTableScanInfo->dataReader == NULL) {
      if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond,
                         pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 ||
          pTableScanInfo->dataReader == NULL) {
        ASSERT(0);
      }
    }

    tsdbSetTableId(pTableScanInfo->dataReader, uid);
    // temporarily move the window start just past `ts` for the reset, then restore it
    int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
    pTableScanInfo->cond.twindows.skey = ts + 1;
    tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
    pTableScanInfo->cond.twindows.skey = oldSkey;
    pTableScanInfo->scanTimes = 0;

    qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
           ts, pTableScanInfo->currentTable, tableSz);
  } else {
    ASSERT(0);
  }

  return 0;
}