executor.c 40.0 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14 15 16
 */

#include "executor.h"
H
Haojun Liao 已提交
17
#include "executorimpl.h"
18
#include "planner.h"
L
Liu Jicong 已提交
19
#include "tdatablock.h"
L
Liu Jicong 已提交
20
#include "tref.h"
21
#include "tudf.h"
L
Liu Jicong 已提交
22
#include "vnode.h"
23 24 25 26 27 28 29 30 31

static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
int32_t             exchangeObjRefPool = -1;

// Opens the reference pool for exchange operator objects and stores its id in exchangeObjRefPool.
// Run through taosThreadOnce() from qCreateExecTask().
// NOTE(review): 1024 is presumably the pool capacity and doDestroyExchangeOperatorInfo the per-object
// destructor -- confirm against the taosOpenRef() declaration.
static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); }
// Closes the exchange-object reference pool; registered as an atexit() handler.
//
// The handler may be registered (and therefore invoked) more than once, so it must be idempotent:
// the pool id is swapped back to -1, the initial "not opened" value of exchangeObjRefPool, and the
// pool is only closed when a non-negative id was actually taken over. Resetting the id to 0 (as
// before) passed 0 to taosCloseRef() on every later invocation and left a stale, non-sentinel id
// behind for any remaining reader of exchangeObjRefPool.
static void cleanupRefPool() {
  int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, -1);
  if (ref >= 0) {
    taosCloseRef(ref);
  }
}
32

L
Liu Jicong 已提交
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
/**
 * Descends a single-child operator chain to the stream scan operator and appends the supplied input
 * to that operator's block list (pInfo->pBlockLists). Every operator visited on the way, including
 * the scan operator itself, is reset to OP_NOT_OPENED.
 *
 * Unlike doSetStreamBlock() this variant does not assert that the block list is empty and silently
 * ignores an unknown input type.
 *
 * @param pOperator   operator to start from; must not be NULL
 * @param input       STREAM_INPUT__MERGED_SUBMIT: array of numOfBlocks pointers;
 *                    STREAM_INPUT__DATA_SUBMIT:   a single item, pushed as-is;
 *                    STREAM_INPUT__DATA_BLOCK:    array of numOfBlocks SSDataBlock
 * @param numOfBlocks number of items in input (unused for STREAM_INPUT__DATA_SUBMIT)
 * @param type        one of the STREAM_INPUT__* values above
 * @param id          task id string, used for logging only
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_APP_ERROR when no stream scan operator is reachable
 *         through a chain of operators that each have exactly one downstream
 */
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  ASSERT(pOperator != NULL);

  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      // the format string previously carried a stray PRIx64 suffix, which printed "lx" after the id
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_QRY_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_QRY_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetSMABlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  }

  pOperator->status = OP_NOT_OPENED;

  SStreamScanInfo* pInfo = pOperator->info;

  if (type == STREAM_INPUT__MERGED_SUBMIT) {
    // input is an array of pointers; each element is pushed individually
    for (size_t i = 0; i < numOfBlocks; i++) {
      SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
      taosArrayPush(pInfo->pBlockLists, &pReq);
    }
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
    taosArrayPush(pInfo->pBlockLists, &input);
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_BLOCK) {
    // input is a contiguous SSDataBlock array; the list stores the address of each element
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
      taosArrayPush(pInfo->pBlockLists, &pDataBlock);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
/**
 * Resets every operator above the stream scan operator to OP_NOT_OPENED by descending a
 * single-child operator chain. The stream scan operator itself is left untouched.
 *
 * @param pOperator operator to start from; must not be NULL
 * @param id        task id string, used for logging only
 * @return 0 once the stream scan operator is reached, or TSDB_CODE_QRY_APP_ERROR when an operator on
 *         the way has no downstream or more than one downstream
 */
static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
  ASSERT(pOperator != NULL);

  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    return 0;
  }

  if (pOperator->numOfDownstream == 0) {
    // the format string previously carried a stray PRIx64 suffix, which printed "lx" after the id
    qError("failed to find stream scan operator to set the input data block, %s", id);
    return TSDB_CODE_QRY_APP_ERROR;
  }

  if (pOperator->numOfDownstream > 1) {  // not handle this in join query
    qError("join not supported for stream block scan, %s", id);
    return TSDB_CODE_QRY_APP_ERROR;
  }

  pOperator->status = OP_NOT_OPENED;
  return doSetStreamOpOpen(pOperator->pDownstream[0], id);
}

L
Liu Jicong 已提交
93
/**
 * Descends a single-child operator chain to the stream scan operator and installs the supplied
 * input as that operator's block list (pInfo->pBlockLists). Every operator visited on the way,
 * including the scan operator itself, is reset to OP_NOT_OPENED.
 *
 * The scan operator is expected to have consumed all previous input: validBlockIndex must be 0 and
 * the block list must be empty (both asserted).
 *
 * @param pOperator   operator to start from; must not be NULL
 * @param input       STREAM_INPUT__MERGED_SUBMIT: array of numOfBlocks pointers;
 *                    STREAM_INPUT__DATA_SUBMIT:   a single item (numOfBlocks must be 1);
 *                    STREAM_INPUT__DATA_BLOCK:    array of numOfBlocks SSDataBlock
 * @param numOfBlocks number of items in input
 * @param type        one of the STREAM_INPUT__* values above; anything else trips an assertion
 * @param id          task id string, used for logging only
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_APP_ERROR when no stream scan operator is reachable
 *         through a chain of operators that each have exactly one downstream
 */
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  ASSERT(pOperator != NULL);

  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      // the format string previously carried a stray PRIx64 suffix, which printed "lx" after the id
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_QRY_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_QRY_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  }

  pOperator->status = OP_NOT_OPENED;

  SStreamScanInfo* pInfo = pOperator->info;

  ASSERT(pInfo->validBlockIndex == 0);
  ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0);

  if (type == STREAM_INPUT__MERGED_SUBMIT) {
    // ASSERT(numOfBlocks > 1);
    // input is an array of pointers; each element is pushed individually
    for (size_t i = 0; i < numOfBlocks; i++) {
      SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
      taosArrayPush(pInfo->pBlockLists, &pReq);
    }
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
    ASSERT(numOfBlocks == 1);
    taosArrayPush(pInfo->pBlockLists, &input);
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_BLOCK) {
    // input is a contiguous SSDataBlock array; the list stores the address of each element
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
      taosArrayPush(pInfo->pBlockLists, &pDataBlock);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
/**
 * Resets the operators above the stream scan operator of the given task to "not opened".
 *
 * @param tinfo task handle
 * @return TSDB_CODE_QRY_APP_ERROR for a NULL handle, otherwise the result of doSetStreamOpOpen()
 */
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  if (pTask == NULL) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  const int32_t rc = doSetStreamOpOpen(pTask->pRoot, GET_TASKID(pTask));
  if (rc == TSDB_CODE_SUCCESS) {
    qDebug("%s set the stream block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the stream block data", GET_TASKID(pTask));
  }

  return rc;
}
156

L
Liu Jicong 已提交
157
/**
 * Hands a batch of stream input items to the stream scan operator of the given task.
 *
 * @param tinfo       task handle
 * @param pBlocks     input items; layout depends on type (see doSetStreamBlock())
 * @param numOfBlocks number of items in pBlocks
 * @param type        STREAM_INPUT__* value describing pBlocks
 * @return TSDB_CODE_QRY_APP_ERROR for a NULL handle; TSDB_CODE_SUCCESS without doing anything when
 *         there is no input; otherwise the result of doSetStreamBlock()
 */
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  if (pTask == NULL) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  const bool nothingToSet = (pBlocks == NULL) || (numOfBlocks == 0);
  if (nothingToSet) {
    return TSDB_CODE_SUCCESS;
  }

  const int32_t rc = doSetStreamBlock(pTask->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTask));
  if (rc == TSDB_CODE_SUCCESS) {
    qDebug("%s set the stream block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the stream block data", GET_TASKID(pTask));
  }

  return rc;
}

L
Liu Jicong 已提交
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
/**
 * Hands a batch of SMA input items to the stream scan operator of the given task.
 *
 * @param tinfo       task handle
 * @param pBlocks     input items; layout depends on type (see doSetSMABlock())
 * @param numOfBlocks number of items in pBlocks
 * @param type        STREAM_INPUT__* value describing pBlocks
 * @return TSDB_CODE_QRY_APP_ERROR for a NULL handle; TSDB_CODE_SUCCESS without doing anything when
 *         there is no input; otherwise the result of doSetSMABlock()
 */
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  if (pTask == NULL) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  const bool nothingToSet = (pBlocks == NULL) || (numOfBlocks == 0);
  if (nothingToSet) {
    return TSDB_CODE_SUCCESS;
  }

  const int32_t rc = doSetSMABlock(pTask->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTask));
  if (rc == TSDB_CODE_SUCCESS) {
    qDebug("%s set the sma block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the sma block data", GET_TASKID(pTask));
  }

  return rc;
}

H
Haojun Liao 已提交
199
/**
 * Creates an execution task running in queue mode (OPTR_EXEC_MODEL_QUEUE).
 *
 * Without a plan (msg == NULL) a task consisting of a single raw scan operator is built; numOfCols
 * and pSchema are not touched in that case. Otherwise msg is parsed into a subplan, the task is
 * built from it, the number of output slots is counted into *numOfCols and, when pSchema is given,
 * a clone of the task's schema wrapper is stored in *pSchema.
 *
 * @param msg       serialized subplan, or NULL for a raw scan task
 * @param readers   read handle passed on to the operators
 * @param numOfCols out: number of output columns (plan-based tasks only)
 * @param pSchema   optional out: cloned schema wrapper (plan-based tasks only)
 * @return the task handle, or NULL on failure with terrno set
 */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
  if (msg == NULL) {
    // create raw scan
    SExecTaskInfo* pRawTask = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
    if (pRawTask == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    setTaskStatus(pRawTask, TASK_NOT_COMPLETED);
    pRawTask->cost.created = taosGetTimestampMs();
    pRawTask->execModel = OPTR_EXEC_MODEL_QUEUE;
    pRawTask->pRoot = createRawScanOperatorInfo(readers, pRawTask);
    if (pRawTask->pRoot != NULL) {
      return pRawTask;
    }

    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pRawTask);
    return NULL;
  }

  struct SSubplan* pSubplan = NULL;
  int32_t          rc = qStringToSubplan(msg, &pSubplan);
  if (rc != TSDB_CODE_SUCCESS) {
    terrno = rc;
    return NULL;
  }

  qTaskInfo_t hTask = NULL;
  rc = qCreateExecTask(readers, 0, 0, pSubplan, &hTask, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
  if (rc != TSDB_CODE_SUCCESS) {
    nodesDestroyNode((SNode*)pSubplan);
    qDestroyTask(hTask);
    terrno = rc;
    return NULL;
  }

  // extract the number of output columns
  SDataBlockDescNode* pOutputDesc = pSubplan->pNode->pOutputDataBlockDesc;
  int32_t             nOutput = 0;

  SNode* pSlotNode;
  FOREACH(pSlotNode, pOutputDesc->pSlots) {
    if (((SSlotDescNode*)pSlotNode)->output) {
      ++nOutput;
    }
  }
  *numOfCols = nOutput;

  if (pSchema) {
    *pSchema = tCloneSSchemaWrapper(((SExecTaskInfo*)hTask)->schemaInfo.qsw);
  }

  return hTask;
}

L
Liu Jicong 已提交
255
/**
 * Creates an execution task running in stream mode (OPTR_EXEC_MODEL_STREAM) from a serialized
 * subplan.
 *
 * @param msg     serialized subplan; NULL yields NULL
 * @param readers read handle passed on to the operators
 * @return the task handle, or NULL on failure (terrno is set when parsing or task creation fails)
 */
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
  if (msg == NULL) {
    return NULL;
  }

  struct SSubplan* pSubplan = NULL;
  int32_t          rc = qStringToSubplan(msg, &pSubplan);
  if (rc != TSDB_CODE_SUCCESS) {
    terrno = rc;
    return NULL;
  }

  qTaskInfo_t hTask = NULL;
  rc = qCreateExecTask(readers, 0, 0, pSubplan, &hTask, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
  if (rc == TSDB_CODE_SUCCESS) {
    return hTask;
  }

  // creation failed: release the plan and whatever part of the task was built
  nodesDestroyNode((SNode*)pSubplan);
  qDestroyTask(hTask);
  terrno = rc;
  return NULL;
}
280

281
static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) {
282 283 284 285 286 287
  SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));

  // let's discard the tables those are not created according to the queried super table.
  SMetaReader mr = {0};
  metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
  for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
288
    uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
289 290 291

    int32_t code = metaGetTableEntryByUid(&mr, *id);
    if (code != TSDB_CODE_SUCCESS) {
292
      qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *id, tstrerror(terrno), idstr);
293 294 295
      continue;
    }

M
Minglei Jin 已提交
296 297
    tDecoderClear(&mr.coder);

298
    // TODO handle ntb case
L
Liu Jicong 已提交
299
    if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pScanInfo->tableUid) {
300 301
      continue;
    }
302 303 304

    if (pScanInfo->pTagCond != NULL) {
      bool          qualified = false;
305
      STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid};
H
Haojun Liao 已提交
306
      code = isQualifiedTable(&info, pScanInfo->pTagCond, pScanInfo->readHandle.meta, &qualified);
307 308 309 310 311 312 313 314 315 316
      if (code != TSDB_CODE_SUCCESS) {
        qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr);
        continue;
      }

      if (!qualified) {
        continue;
      }
    }

317
    // handle multiple partition
318 319 320 321 322 323 324
    taosArrayPush(qa, id);
  }

  metaReaderClear(&mr);
  return qa;
}

325
/**
 * Adds tables to, or removes tables from, the stream scan operator of a task.
 *
 * When adding, the candidates are first reduced by filterUnqualifiedTables(); the remaining uids
 * are registered with the tq reader and appended to the task's table list together with their
 * group id (the uid itself when grouping by table name, otherwise the id derived from the group
 * tags, or 0 when there are no group tags). When removing, the uids are only dropped from the tq
 * reader.
 *
 * @param tinfo       task handle
 * @param tableIdList table uids to add or remove
 * @param isAdd       true to add, false to remove
 * @return 0 on success; TSDB_CODE_QRY_APP_ERROR when the operator chain contains no stream scan
 *         operator; TSDB_CODE_OUT_OF_MEMORY or the error of the failing helper otherwise
 */
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  if (isAdd) {
    qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
  }

  // traverse to the stream scanner node to add this table id
  SOperatorInfo* pInfo = pTaskInfo->pRoot;
  while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    // a leaf that is not a stream scan has no pDownstream[0] to follow
    if (pInfo->numOfDownstream == 0) {
      qError("failed to find stream scan operator to update the table id list, %s", GET_TASKID(pTaskInfo));
      return TSDB_CODE_QRY_APP_ERROR;
    }
    pInfo = pInfo->pDownstream[0];
  }

  SStreamScanInfo* pScanInfo = pInfo->info;

  if (!isAdd) {  // remove the table id in current list
    qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
    return tqReaderRemoveTbUidList(pScanInfo->tqReader, tableIdList);
  }

  // add new table id
  char*   keyBuf = NULL;
  SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo));
  int32_t numOfQualifiedTables = taosArrayGetSize(qa);

  qDebug(" %d qualified child tables added into stream scanner", numOfQualifiedTables);

  int32_t code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  // todo refactor STableList
  bool   assignUid = false;
  size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
  if (bufLen > 0) {
    assignUid = groupbyTbname(pScanInfo->pGroupTags);
    keyBuf = taosMemoryMalloc(bufLen);
    if (keyBuf == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;

  for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
    uint64_t*     uid = taosArrayGet(qa, i);
    STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};

    if (bufLen > 0) {
      if (assignUid) {
        keyInfo.groupId = keyInfo.uid;
      } else {
        code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
                                     &keyInfo.groupId);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }
    }

    tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
  }

_end:
  // single exit for the add path: release the scratch key buffer and the filtered uid list
  if (keyBuf != NULL) {
    taosMemoryFree(keyBuf);
  }
  taosArrayDestroy(qa);
  return code;
}
413

414
/**
 * Reports the schema/tag versions and the db/table names recorded in the task's schema info.
 *
 * When the task has no schema wrapper (schemaInfo.sw == NULL) none of the outputs is written.
 * A missing db or table name is reported as an empty string.
 *
 * @param tinfo     task handle; must not be NULL
 * @param dbName    out: db name, copied with strcpy() -- the caller provides a large enough buffer
 * @param tableName out: table name, copied with strcpy() -- the caller provides a large enough buffer
 * @param sversion  out: schema version
 * @param tversion  out: tag version
 * @return always success (TSDB_CODE_SUCCESS / 0)
 */
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
                                    int32_t* tversion) {
  ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  if (pTask->schemaInfo.sw == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  *sversion = pTask->schemaInfo.sw->version;
  *tversion = pTask->schemaInfo.tversion;

  const char* srcDb = pTask->schemaInfo.dbname;
  if (srcDb == NULL) {
    dbName[0] = 0;
  } else {
    strcpy(dbName, srcDb);
  }

  const char* srcTable = pTask->schemaInfo.tablename;
  if (srcTable == NULL) {
    tableName[0] = 0;
  } else {
    strcpy(tableName, srcTable);
  }

  return 0;
}
438 439

int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
D
dapan1121 已提交
440
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) {
441 442 443 444 445 446
  assert(pSubplan != NULL);
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;

  taosThreadOnce(&initPoolOnce, initRefPool);
  atexit(cleanupRefPool);

447
  qDebug("start to create subplan task, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
448

449 450
  int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
  if (code != TSDB_CODE_SUCCESS) {
451
    qError("failed to createExecTaskInfoImpl, code: %s", tstrerror(code));
452 453 454
    goto _error;
  }

455
  SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
456 457
  code = dsDataSinkMgtInit(&cfg);
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
458
    qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
459 460 461 462 463 464 465
    goto _error;
  }

  if (handle) {
    void* pSinkParam = NULL;
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle);
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
466
      qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
467 468 469
      goto _error;
    }

H
Haojun Liao 已提交
470
    code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
L
Liu Jicong 已提交
471
    if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
472 473
      taosMemoryFreeClear(pSinkParam);
    }
474 475
  }

476
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
477

dengyihao's avatar
dengyihao 已提交
478
_error:
479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527
  // if failed to add ref for all tables in this query, abort current query
  return code;
}

#ifdef TEST_IMPL
// wait moment
/*
 * Test-only helper (built under TEST_IMPL): sleeps for a duration encoded in the SQL text.
 *
 * The duration is the number following the first " t_" in the statement, optionally suffixed with
 * 'h', 'm' or 's' (no suffix means milliseconds). Statements containing " count(*)" never wait.
 * Sleeps of one second or more are done in one-second steps and stop early once the task is killed.
 *
 * Returns 0 when no wait was requested (a " count(*)" statement, or a duration of 0), 1 otherwise --
 * including the case where the task has no SQL text at all.
 */
int waitMoment(SQInfo* pQInfo) {
  if (!pQInfo->sql) {
    return 1;
  }

  if (strstr(pQInfo->sql, " count(*)") != NULL) {
    return 0;
  }

  int   ms = 0;
  char* cursor = strstr(pQInfo->sql, " t_");
  if (cursor != NULL) {
    cursor += 3;
    ms = atoi(cursor);

    // skip the digits to reach the optional unit character
    while (*cursor >= '0' && *cursor <= '9') {
      cursor++;
    }

    switch (*cursor) {
      case 'h':
        ms *= 3600 * 1000;
        break;
      case 'm':
        ms *= 60 * 1000;
        break;
      case 's':
        ms *= 1000;
        break;
      default:
        break;
    }
  }

  if (ms == 0) {
    return 0;
  }
  printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);

  if (ms < 1000) {
    taosMsleep(ms);
    return 1;
  }

  for (int slept = 0; slept < ms;) {
    taosMsleep(1000);
    slept += 1000;
    if (isTaskKilled(pQInfo)) {
      printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
      break;
    }
  }

  return 1;
}
#endif

H
Haojun Liao 已提交
528
static void freeBlock(void* param) {
529
  SSDataBlock* pBlock = *(SSDataBlock**)param;
H
Haojun Liao 已提交
530 531 532
  blockDataDestroy(pBlock);
}

533
/**
 * Runs a task and collects a batch of result blocks.
 *
 * pResList is emptied first (its blocks are destroyed). The root operator is then pulled repeatedly;
 * each returned block is copied with createOneDataBlock() and appended to pResList until the
 * operator is exhausted or at least 4096 rows have been gathered.
 *
 * Only one thread may execute a task at a time: ownership is taken by an atomic compare-and-swap on
 * pTaskInfo->owner and released on every exit path after it was acquired.
 *
 * @param tinfo    task handle
 * @param pResList array of SSDataBlock*; cleared, then filled with copies owned by the caller
 * @param useconds out: accumulated elapsed time; written only when the operator is exhausted
 * @param hasMore  out: true when the loop stopped on the row limit rather than on exhaustion;
 *                 not written on the early-exit and error paths
 * @param pLocal   optional local fetch info, copied into the task when not NULL
 * @return TSDB_CODE_QRY_IN_EXEC when another thread owns the task; TSDB_CODE_SUCCESS when the task
 *         was already killed; the longjmp code when an operator aborted; otherwise pTaskInfo->code
 */
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  if (pLocal) {
    memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
  }

  taosArrayClearEx(pResList, freeBlock);

  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampMs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();

    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);

    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int32_t      current = 0;
  SSDataBlock* pRes = NULL;

  int64_t st = taosGetTimestampUs();

  while ((pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot)) != NULL) {
    // the operator keeps ownership of pRes, so hand a copy to the caller
    SSDataBlock* p = createOneDataBlock(pRes, true);
    current += p->info.rows;
    ASSERT(p->info.rows > 0);
    taosArrayPush(pResList, &p);

    if (current >= 4096) {
      break;
    }
  }

  *hasMore = (pRes != NULL);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
  // total is unsigned, so it is printed with PRIu64 (was PRId64)
  qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRIu64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

/**
 * Runs a task for one step and returns the next result block of its root operator.
 *
 * Only one thread may execute a task at a time: ownership is taken by an atomic compare-and-swap on
 * pTaskInfo->owner and released on every exit path after it was acquired.
 *
 * @param tinfo    task handle
 * @param pRes     out: the block returned by the root operator (not copied), or NULL when the
 *                 operator is exhausted or the call ended early
 * @param useconds out: accumulated elapsed time; written only when the operator returned NULL
 * @return TSDB_CODE_QRY_IN_EXEC when another thread owns the task; TSDB_CODE_SUCCESS when the task
 *         was already killed; the longjmp code when an operator aborted; otherwise pTaskInfo->code
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampMs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();

  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  // total is unsigned, so it is printed with PRIu64 (was PRId64)
  qDebug("%s task suspended, %d rows returned, total:%" PRIu64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

L
Liu Jicong 已提交
664
// Appends *pInfo to the task's stop-info list while holding the list's write latch.
// Always returns TSDB_CODE_SUCCESS; the result of taosArrayPush() is not examined.
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);
  {
    taosArrayPush(pTaskInfo->stopInfo.pStopInfo, pInfo);
  }
  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);

  return TSDB_CODE_SUCCESS;
}

int32_t stopInfoComp(void const* lp, void const* rp) {
  SExchangeOpStopInfo* key = (SExchangeOpStopInfo*)lp;
  SExchangeOpStopInfo* pInfo = (SExchangeOpStopInfo*)rp;

  if (key->refId < pInfo->refId) {
    return -1;
  } else if (key->refId > pInfo->refId) {
    return 1;
  }

  return 0;
}

L
Liu Jicong 已提交
685
// Removes the stop-info entry with the same refId as *pInfo from the task's list, if it is present.
// The lookup and the removal both happen under the list's write latch.
void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  const int32_t pos = taosArraySearchIdx(pTaskInfo->stopInfo.pStopInfo, pInfo, stopInfoComp, TD_EQ);
  if (pos >= 0) {
    taosArrayRemove(pTaskInfo->stopInfo.pStopInfo, pos);
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}

// Posts the "ready" semaphore of every exchange operator registered in the task's stop-info list.
// Entries whose reference can no longer be acquired from exchangeObjRefPool are skipped.
// The whole pass runs under the list's write latch.
void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  const int32_t nEntries = taosArrayGetSize(pTaskInfo->stopInfo.pStopInfo);
  for (int32_t k = 0; k < nEntries; ++k) {
    SExchangeOpStopInfo* pEntry = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, k);

    SExchangeInfo* pExchange = taosAcquireRef(exchangeObjRefPool, pEntry->refId);
    if (pExchange == NULL) {
      continue;
    }

    tsem_post(&pExchange->ready);
    taosReleaseRef(exchangeObjRefPool, pEntry->refId);
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}

712 713 714 715 716 717 718 719
/**
 * Marks a task as killed and posts the semaphores of its registered exchange operators.
 *
 * @param qinfo task handle
 * @return TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS otherwise
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
  if (qinfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  qDebug("%s execTask async killed", GET_TASKID(pTask));

  setTaskKilled(pTask);
  qStopTaskOperators(pTask);

  return TSDB_CODE_SUCCESS;
}

/**
 * Logs the final row count and the cost summary of a task, then destroys it.
 *
 * A NULL handle is ignored. The task may have no root operator -- this function is called on the
 * failure paths of task creation -- so the row count is only logged when pRoot exists.
 *
 * @param qTaskHandle task handle, may be NULL
 */
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
  if (pTaskInfo == NULL) {
    return;
  }

  if (pTaskInfo->pRoot != NULL) {
    qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
  } else {
    qDebug("%s execTask completed", GET_TASKID(pTaskInfo));
  }

  queryCostStatis(pTaskInfo);  // print the query cost summary
  doDestroyTask(pTaskInfo);
}

H
Haojun Liao 已提交
740
// Collects the explain execution info of the task's operator tree into pExecInfoList by delegating
// to getOperatorExplainExecInfo() on the root operator; returns that function's result.
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
  SOperatorInfo* pRootOp = ((SExecTaskInfo*)tinfo)->pRoot;
  return getOperatorExplainExecInfo(pRootOp, pExecInfoList);
}

/**
 * Serializes the state of the task's operator tree via encodeOperator().
 *
 * When encoding succeeds but no operator contributed a value, the output buffer is released and
 * *len is set to 0.
 *
 * @param tinfo   task handle
 * @param pOutput out: encoded buffer
 * @param len     out: length of the encoded buffer
 * @return TSDB_CODE_INVALID_PARA when the handle is NULL or the task has no root operator,
 *         otherwise the result of encodeOperator()
 */
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
  SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
  // reject a NULL handle as well, matching qDeserializeTaskStatus()
  if (pTaskInfo == NULL || pTaskInfo->pRoot == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t nOptrWithVal = 0;
  int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
  if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
    taosMemoryFreeClear(*pOutput);
    *len = 0;
  }
  return code;
}

/**
 * Restores the state of the task's operator tree from a buffer via decodeOperator().
 *
 * @param tinfo  task handle
 * @param pInput encoded buffer
 * @param len    length of pInput
 * @return TSDB_CODE_INVALID_PARA when the handle or the input is NULL, the input is empty, or the
 *         task has no root operator; otherwise the result of decodeOperator()
 */
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
  SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;

  if (pTaskInfo == NULL || pInput == NULL || len == 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  // a task without a root operator has nothing to decode into; qSerializeTaskStatus() rejects it too
  if (pTaskInfo->pRoot == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  return decodeOperator(pTaskInfo->pRoot, pInput, len);
}

/**
 * Finds the stream scan operator of a task and returns its operator info.
 *
 * The operator chain is followed from the root through pDownstream[0]; every operator above the
 * stream scan must have exactly one downstream (asserted).
 *
 * @param tinfo   task handle
 * @param scanner out: the info object of the stream scan operator
 * @return always 0
 */
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
  SOperatorInfo* pCur = ((SExecTaskInfo*)tinfo)->pRoot;

  for (;;) {
    const uint16_t opType = pCur->operatorType;
    if (opType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      break;
    }

    ASSERT(pCur->numOfDownstream == 1);
    pCur = pCur->pDownstream[0];
  }

  *scanner = pCur->info;
  return 0;
}

#if 0
// Disabled code (inside #if 0, never compiled): writes pItem to the input queue of a stream-mode
// task and returns 0.
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
  taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem);
  return 0;
}
#endif

795
/*
 * Put a stream task into recover step PREPARE1.
 *
 * Stores `ver` in streamInfo.fillHistoryVer1 and sets streamInfo.recoverStep to
 * STREAM_RECOVER_STEP__PREPARE1. The task is asserted to use the stream exec
 * model. Always returns 0.
 */
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  pTask->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE1;
  pTask->streamInfo.fillHistoryVer1 = ver;
  return 0;
}

/*
 * Put a stream task into recover step PREPARE2.
 *
 * Stores `ver` in streamInfo.fillHistoryVer2 and sets streamInfo.recoverStep to
 * STREAM_RECOVER_STEP__PREPARE2. The task is asserted to use the stream exec
 * model. Always returns 0.
 */
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  pTask->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE2;
  pTask->streamInfo.fillHistoryVer2 = ver;
  return 0;
}

/*
 * Mark recovery of a stream task as finished by resetting
 * streamInfo.recoverStep to STREAM_RECOVER_STEP__NONE.
 * The task is asserted to use the stream exec model. Always returns 0.
 */
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);
  pTask->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;

  return 0;
}

/*
 * Switch the window operators of a stream task to recover-time parameters.
 *
 * Starting at the root, the operator chain is followed through pDownstream[0].
 * For every stream interval / session / state operator the current
 * twAggSup.calTrigger and twAggSup.deleteMark are copied into the matching
 * *Saved fields, then calTrigger is forced to STREAM_TRIGGER_AT_ONCE and
 * deleteMark to INT64_MAX.
 *
 * Returns 0 once the chain ends (no single non-NULL downstream is left) and -1
 * when an operator has more than one downstream.
 */
int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pCur = pTask->pRoot;

  for (;;) {
    switch (pCur->operatorType) {
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: {
        SStreamIntervalOperatorInfo* pInfo = pCur->info;
        ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
               pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
        // nothing may have been saved yet
        ASSERT(pInfo->twAggSup.calTriggerSaved == 0);
        ASSERT(pInfo->twAggSup.deleteMarkSaved == 0);

        qInfo("save stream param for interval: %d,  %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);

        pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger;
        pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
        pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
        pInfo->twAggSup.deleteMark = INT64_MAX;
        break;
      }

      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: {
        SStreamSessionAggOperatorInfo* pInfo = pCur->info;
        ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
               pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
        ASSERT(pInfo->twAggSup.calTriggerSaved == 0);
        ASSERT(pInfo->twAggSup.deleteMarkSaved == 0);

        qInfo("save stream param for session: %d,  %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);

        pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger;
        pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
        pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
        pInfo->twAggSup.deleteMark = INT64_MAX;
        break;
      }

      case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
        SStreamStateAggOperatorInfo* pInfo = pCur->info;
        ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
               pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
        ASSERT(pInfo->twAggSup.calTriggerSaved == 0);
        ASSERT(pInfo->twAggSup.deleteMarkSaved == 0);

        qInfo("save stream param for state: %d,  %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);

        pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger;
        pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
        pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
        pInfo->twAggSup.deleteMark = INT64_MAX;
        break;
      }

      default:
        break;
    }

    // step to the next operator, or stop
    if (pCur->numOfDownstream > 1) {
      qError("unexpected stream, multiple downstream");
      ASSERT(0);
      return -1;
    }
    if (pCur->numOfDownstream != 1 || pCur->pDownstream[0] == NULL) {
      return 0;
    }
    pCur = pCur->pDownstream[0];
  }

  return 0;
}

/*
 * Undo qStreamSetParamForRecover(): put the saved trigger settings back.
 *
 * Starting at the root, the operator chain is followed through pDownstream[0].
 * For every stream interval / session / state operator, twAggSup.calTrigger and
 * twAggSup.deleteMark are reloaded from their *Saved counterparts. Before the
 * restore the operator is asserted to carry the recover-time values
 * (STREAM_TRIGGER_AT_ONCE / INT64_MAX).
 *
 * Returns 0 once the chain ends (no single non-NULL downstream is left) and -1
 * when an operator has more than one downstream.
 */
int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pCur = pTask->pRoot;

  for (;;) {
    switch (pCur->operatorType) {
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: {
        SStreamIntervalOperatorInfo* pInfo = pCur->info;
        ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
        ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);

        pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
        pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
        ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
               pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
        qInfo("restore stream param for interval: %d,  %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
        break;
      }

      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: {
        SStreamSessionAggOperatorInfo* pInfo = pCur->info;
        ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
        ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);

        pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
        pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
        ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
               pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
        qInfo("restore stream param for session: %d,  %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
        break;
      }

      case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
        SStreamStateAggOperatorInfo* pInfo = pCur->info;
        ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
        ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);

        pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
        pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
        ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
               pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
        qInfo("restore stream param for state: %d,  %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
        break;
      }

      default:
        break;
    }

    // step to the next operator, or stop
    if (pCur->numOfDownstream > 1) {
      qError("unexpected stream, multiple downstream");
      ASSERT(0);
      return -1;
    }
    if (pCur->numOfDownstream != 1 || pCur->pDownstream[0] == NULL) {
      return 0;
    }
    pCur = pCur->pDownstream[0];
  }

  return 0;
}

void* qExtractReaderFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return (void*)pInfo->tqReader;
}

wmmhello's avatar
wmmhello 已提交
946 947 948 949 950 951 952 953
/*
 * Return the schema pointer currently stored in the task's streamInfo.
 * The pointer is returned as-is; nothing is copied here.
 */
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.schema;
}

/*
 * Return the table name currently stored in the task's streamInfo.
 * The returned pointer refers to storage inside the task; nothing is copied here.
 */
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.tbName;
}

wmmhello's avatar
wmmhello 已提交
956
/*
 * Return the address of the meta response embedded in the task's streamInfo.
 * The task is asserted to use the queue exec model.
 */
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);

  return &pTask->streamInfo.metaRsp;
}

wmmhello's avatar
wmmhello 已提交
962 963 964 965 966 967
/*
 * Return the uid recorded in the task's streamInfo.prepareStatus.
 * The task is asserted to use the queue exec model.
 */
int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);

  return pTask->streamInfo.prepareStatus.uid;
}

968 969 970 971 972 973 974
/*
 * Copy the task's streamInfo.lastStatus into *pOffset.
 * The task is asserted to use the queue exec model. Always returns 0.
 */
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);

  // byte-wise copy of one STqOffsetVal
  memcpy(pOffset, &pTask->streamInfo.lastStatus, sizeof(*pOffset));

  return 0;
}

H
Haojun Liao 已提交
975
/*
 * Fill a query condition that reads every column of the table described by
 * pMtInfo over the full time range.
 *
 * pCond    - out: zeroed first, then populated; colList is allocated here.
 * sContext - supplies snapVersion, which becomes the end version.
 * pMtInfo  - supplies suid and the schema (column count, type, bytes, colId).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY (also stored in
 * terrno) when the column list cannot be allocated.
 */
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
  memset(pCond, 0, sizeof(*pCond));

  pCond->order = TSDB_ORDER_ASC;
  pCond->numOfCols = pMtInfo->schema->nCols;

  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
  if (NULL == pCond->colList) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return terrno;
  }

  // remaining members of twindows are already zero from the memset above
  pCond->twindows.skey = INT64_MIN;
  pCond->twindows.ekey = INT64_MAX;

  pCond->suid = pMtInfo->suid;
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
  pCond->startVersion = -1;
  pCond->endVersion = sContext->snapVersion;

  // mirror every schema column into the condition's column list
  int32_t col = 0;
  while (col < pCond->numOfCols) {
    pCond->colList[col].type = pMtInfo->schema->pSchema[col].type;
    pCond->colList[col].bytes = pMtInfo->schema->pSchema[col].bytes;
    pCond->colList[col].colId = pMtInfo->schema->pSchema[col].colId;
    ++col;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1000 1001 1002 1003 1004 1005 1006 1007
/*
 * Attach a submit request to the task's streamInfo.
 *
 * The task is asserted to use the queue exec model and to have no request
 * attached yet. Only the pointer is stored. Always returns 0.
 */
int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);
  ASSERT(pTask->streamInfo.pReq == NULL);

  pTask->streamInfo.pReq = pReq;

  return 0;
}

1008
/*
 * Position a queue-model task at the given offset before scanning.
 *
 * tinfo   - task handle, cast to SExecTaskInfo*; asserted to use the queue exec model.
 * pOffset - requested offset; copied into streamInfo.prepareStatus.
 * subType - subscription type. TOPIC_SUB_TYPE__COLUMN uses the stream scan
 *           operator; any other value treats the root operator's info as
 *           SStreamRawScanInfo.
 *
 * Behaviour:
 *  - If pOffset equals streamInfo.lastStatus nothing else is done.
 *  - Column subscription, TMQ_OFFSET__LOG: the table scan reader is closed and the
 *    tq reader is seeked to version + 1.
 *  - Column subscription, TMQ_OFFSET__SNAPSHOT_DATA: the table with the requested
 *    uid (or the first table when uid is 0) becomes the current table and the data
 *    reader is reset to start right after `ts`.
 *  - Raw scan, TMQ_OFFSET__SNAPSHOT_DATA: the snapshot context is moved to the uid,
 *    the data reader is reopened on the table the snapshot yields, and the task's
 *    tbName/schema are replaced.
 *  - Raw scan, TMQ_OFFSET__SNAPSHOT_META: the snapshot context is moved to the uid.
 *  - Raw scan, TMQ_OFFSET__LOG: the data reader is closed.
 *
 * Returns 0 on success and -1 on failure (seek failure, empty table list,
 * snapshot positioning failure, query-condition allocation failure, or failure
 * to open a data reader).
 */
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
  pTaskInfo->streamInfo.prepareStatus = *pOffset;
  pTaskInfo->streamInfo.returned = 0;
  if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
    return 0;
  }

  if (subType == TOPIC_SUB_TYPE__COLUMN) {
    uint16_t type = pOperator->operatorType;
    pOperator->status = OP_OPENED;
    // TODO add more check
    // only a single level below the root is inspected for the stream scan operator
    if (type != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      ASSERT(pOperator->numOfDownstream == 1);
      pOperator = pOperator->pDownstream[0];
    }

    SStreamScanInfo* pInfo = pOperator->info;
    if (pOffset->type == TMQ_OFFSET__LOG) {
      STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
      tsdbReaderClose(pTSInfo->base.dataReader);
      pTSInfo->base.dataReader = NULL;

      if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
        return -1;
      }
      ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1);
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      int64_t uid = pOffset->uid;
      int64_t ts = pOffset->ts;

      // uid 0: start from the first table of the task, from the earliest timestamp
      if (uid == 0) {
        if (tableListGetSize(pTaskInfo->pTableInfoList) != 0) {
          STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
          uid = pTableInfo->uid;
          ts = INT64_MIN;
        } else {
          return -1;
        }
      }

      STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
      int32_t         numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);

      // NOTE: the row counter is only reset when NDEBUG is not defined
#ifndef NDEBUG
      qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid,
             pTableScanInfo->currentTable, pInfo->pTableScanOp->resultInfo.totalRows);
      pInfo->pTableScanOp->resultInfo.totalRows = 0;
#endif

      bool found = false;
      for (int32_t i = 0; i < numOfTables; i++) {
        STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i);
        if (pTableInfo->uid == uid) {
          found = true;
          pTableScanInfo->currentTable = i;
          break;
        }
      }

      // TODO after dropping table, table may not found
      ASSERT(found);

      if (pTableScanInfo->base.dataReader == NULL) {
        STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
        int32_t        num = tableListGetSize(pTaskInfo->pTableInfoList);

        if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num,
                           &pTableScanInfo->base.dataReader, NULL) < 0 ||
            pTableScanInfo->base.dataReader == NULL) {
          // bail out explicitly so a build without ASSERT does not go on to use a NULL reader
          qError("tsdbReaderOpen failed. uid:%" PRId64, uid);
          ASSERT(0);
          return -1;
        }
      }

      STableKeyInfo tki = {.uid = uid};
      tsdbSetTableList(pTableScanInfo->base.dataReader, &tki, 1);

      // temporarily move the window start to just after `ts` for the reset, then put it back
      int64_t oldSkey = pTableScanInfo->base.cond.twindows.skey;
      pTableScanInfo->base.cond.twindows.skey = ts + 1;
      tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
      pTableScanInfo->base.cond.twindows.skey = oldSkey;
      pTableScanInfo->scanTimes = 0;

      qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
             ts, pTableScanInfo->currentTable, numOfTables);
    } else {
      ASSERT(0);
    }
  } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SStreamRawScanInfo* pInfo = pOperator->info;
    SSnapContext*       sContext = pInfo->sContext;
    if (setForSnapShot(sContext, pOffset->uid) != 0) {
      qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid);
      return -1;
    }

    SMetaTableInfo mtInfo = getUidfromSnapShot(sContext);
    tsdbReaderClose(pInfo->dataReader);
    pInfo->dataReader = NULL;

    cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
    tableListClear(pTaskInfo->pTableInfoList);

    if (mtInfo.uid == 0) {
      return 0;  // no data
    }

    if (initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo) != TSDB_CODE_SUCCESS) {
      qError("failed to init query cond for snapshot scan. uid:%" PRId64, mtInfo.uid);
      // NOTE(review): mtInfo.schema is assumed to be owned by this function (the success
      // path hands it to streamInfo.schema, which is later released with
      // tDeleteSSchemaWrapper) -- confirm against getUidfromSnapShot().
      tDeleteSSchemaWrapper(mtInfo.schema);
      return -1;
    }
    pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;

    if (pTaskInfo->pTableInfoList == NULL) {
      pTaskInfo->pTableInfoList = tableListCreate();
    }

    tableListAddTableInfo(pTaskInfo->pTableInfoList, mtInfo.uid, 0);

    STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
    int32_t        size = tableListGetSize(pTaskInfo->pTableInfoList);
    ASSERT(size == 1);

    int32_t openCode =
        tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, &pInfo->dataReader, NULL);

    // bookkeeping is done regardless of the open result so that the condition is released
    // and the schema taken from the snapshot always ends up owned by streamInfo
    cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
    strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
    tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
    pTaskInfo->streamInfo.schema = mtInfo.schema;

    if (openCode < 0 || pInfo->dataReader == NULL) {
      qError("tsdbReaderOpen failed. uid:%" PRId64, mtInfo.uid);
      return -1;
    }

    qDebug("tmqsnap qStreamPrepareScan snapshot data uid %" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts);
  } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
    SStreamRawScanInfo* pInfo = pOperator->info;
    SSnapContext*       sContext = pInfo->sContext;
    if (setForSnapShot(sContext, pOffset->uid) != 0) {
      qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
      return -1;
    }
    qDebug("tmqsnap qStreamPrepareScan snapshot meta uid %" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts);
  } else if (pOffset->type == TMQ_OFFSET__LOG) {
    SStreamRawScanInfo* pInfo = pOperator->info;
    tsdbReaderClose(pInfo->dataReader);
    pInfo->dataReader = NULL;
    qDebug("tmqsnap qStreamPrepareScan snapshot log");
  }
  return 0;
}
H
Haojun Liao 已提交
1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182

/*
 * RPC response handler: forwards a response to the callback stored in the
 * send-info attached to the message.
 *
 * The message content (if any) is duplicated into a freshly allocated buffer that
 * is handed to the callback together with pMsg->code. When the duplicate cannot
 * be allocated, terrno and pMsg->code are set to TSDB_CODE_OUT_OF_MEMORY and the
 * callback receives a NULL pData (len still carries the original content length).
 * Afterwards the original content is released with rpcFreeCont() and the
 * send-info with destroySendMsgInfo().
 *
 * `parent` and `pEpSet` are not used by this function.
 */
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  assert(pMsg->info.ahandle != NULL);

  SDataBuf rspBuf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    rspBuf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (rspBuf.pData != NULL) {
      memcpy(rspBuf.pData, pMsg->pCont, pMsg->contLen);
    } else {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pSendInfo->fp(pSendInfo->param, &rspBuf, pMsg->code);

  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}