executor.c 41.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14 15 16
 */

#include "executor.h"
H
Haojun Liao 已提交
17
#include "executorimpl.h"
18
#include "planner.h"
L
Liu Jicong 已提交
19
#include "tdatablock.h"
L
Liu Jicong 已提交
20
#include "tref.h"
21
#include "tudf.h"
L
Liu Jicong 已提交
22
#include "vnode.h"
23 24 25 26 27 28 29 30

static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
int32_t             exchangeObjRefPool = -1;

static void cleanupRefPool() {
  int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0);
  taosCloseRef(ref);
}
31

D
dapan1121 已提交
32 33 34 35 36
static void initRefPool() { 
  exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo);   
  atexit(cleanupRefPool);
}

L
Liu Jicong 已提交
37 38 39 40
/*
 * Walks down the single-child operator chain from pOperator to the stream scan operator and hands
 * it the SMA input. Every operator visited, including the stream scan, is reset to OP_NOT_OPENED.
 *
 * How input/numOfBlocks are interpreted depends on type:
 *   STREAM_INPUT__MERGED_SUBMIT - array of numOfBlocks SPackedData, each appended to pBlockLists;
 *   STREAM_INPUT__DATA_SUBMIT   - appends from the address of the `input` pointer itself;
 *   STREAM_INPUT__DATA_BLOCK    - array of numOfBlocks SSDataBlock, each wrapped in an SPackedData.
 * Any other type is accepted without appending anything.
 *
 * Returns TSDB_CODE_APP_ERROR when the chain ends before a stream scan operator is found or passes
 * through an operator with more than one downstream (join); TSDB_CODE_SUCCESS otherwise.
 */
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetSMABlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  }

  pOperator->status = OP_NOT_OPENED;

  SStreamScanInfo* pInfo = pOperator->info;

  if (type == STREAM_INPUT__MERGED_SUBMIT) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
      taosArrayPush(pInfo->pBlockLists, pReq);
    }
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
    // NOTE(review): unlike doSetStreamBlock (which pushes `input`), this pushes `&input`, i.e. the
    // element is copied starting at the address of the local pointer. If pBlockLists holds
    // SPackedData (as the other branches suggest) this reads past `input` — confirm the intended
    // input layout against the SMA callers before relying on it.
    taosArrayPush(pInfo->pBlockLists, &input);
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_BLOCK) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
      SPackedData  tmp = {
           .pDataBlock = pDataBlock,
      };
      taosArrayPush(pInfo->pBlockLists, &tmp);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
79
/*
 * Walks down the single-child operator chain from pOperator towards the stream scan operator and
 * resets each operator passed on the way to OP_NOT_OPENED. The stream scan operator itself is not
 * modified.
 *
 * Returns TSDB_CODE_APP_ERROR when the chain ends without a stream scan operator or passes through
 * an operator with more than one downstream (join); TSDB_CODE_SUCCESS otherwise.
 */
static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    return TSDB_CODE_SUCCESS;
  }

  if (pOperator->numOfDownstream == 0) {
    qError("failed to find stream scan operator to set the input data block, %s", id);
    return TSDB_CODE_APP_ERROR;
  }

  if (pOperator->numOfDownstream > 1) {  // not handle this in join query
    qError("join not supported for stream block scan, %s", id);
    return TSDB_CODE_APP_ERROR;
  }

  pOperator->status = OP_NOT_OPENED;
  return doSetStreamOpOpen(pOperator->pDownstream[0], id);
}

L
Liu Jicong 已提交
96
/*
 * Walks down the single-child operator chain from pOperator to the stream scan operator and queues
 * the input on its pBlockLists. Every operator visited, including the stream scan, is reset to
 * OP_NOT_OPENED. The scanner is expected to be drained (validBlockIndex == 0, empty pBlockLists).
 *
 * How input/numOfBlocks are interpreted depends on type:
 *   STREAM_INPUT__MERGED_SUBMIT - array of numOfBlocks SPackedData, each appended;
 *   STREAM_INPUT__DATA_SUBMIT   - exactly one element read from `input` and appended;
 *   STREAM_INPUT__DATA_BLOCK    - array of numOfBlocks SSDataBlock, each wrapped in an SPackedData.
 * Any other type trips ASSERT(0).
 *
 * Returns TSDB_CODE_APP_ERROR when the chain ends before a stream scan operator is found or passes
 * through an operator with more than one downstream (join); TSDB_CODE_SUCCESS otherwise.
 */
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  }

  pOperator->status = OP_NOT_OPENED;

  SStreamScanInfo* pInfo = pOperator->info;
  qDebug("stream set total blocks:%d, task id:%s", (int32_t)numOfBlocks, id);

  ASSERT(pInfo->validBlockIndex == 0);
  ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0);

  if (type == STREAM_INPUT__MERGED_SUBMIT) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
      taosArrayPush(pInfo->pBlockLists, pReq);
    }
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
    ASSERT(numOfBlocks == 1);
    taosArrayPush(pInfo->pBlockLists, input);
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_BLOCK) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
      SPackedData  tmp = {
           .pDataBlock = pDataBlock,
      };
      taosArrayPush(pInfo->pBlockLists, &tmp);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
145 146
/*
 * Resets every operator between the task's root and its stream scan operator to OP_NOT_OPENED
 * (presumably so they are re-opened on the next execution).
 *
 * Returns TSDB_CODE_APP_ERROR for a NULL task or when no stream scan operator is reachable through
 * single-child links; TSDB_CODE_SUCCESS otherwise.
 */
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
  if (tinfo == NULL) {
    return TSDB_CODE_APP_ERROR;
  }

  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  int32_t code = doSetStreamOpOpen(pTaskInfo->pRoot, GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed to set the stream operators open", GET_TASKID(pTaskInfo));
  } else {
    qDebug("%s set the stream operators open successfully", GET_TASKID(pTaskInfo));
  }

  return code;
}
161

L
Liu Jicong 已提交
162
/*
 * Feeds numOfBlocks input items of the given type to the task's stream scan operator
 * (see doSetStreamBlock for how each type is interpreted).
 *
 * Returns TSDB_CODE_APP_ERROR for a NULL task, TSDB_CODE_SUCCESS without doing anything when there
 * is no input, otherwise the result of doSetStreamBlock.
 */
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  if (tinfo == NULL) {
    return TSDB_CODE_APP_ERROR;
  }

  // nothing to hand over
  if (numOfBlocks == 0 || pBlocks == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  int32_t        rc = doSetStreamBlock(pTask->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTask));

  if (rc == TSDB_CODE_SUCCESS) {
    qDebug("%s set the stream block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the stream block data", GET_TASKID(pTask));
  }

  return rc;
}

L
Liu Jicong 已提交
183 184
/*
 * Feeds numOfBlocks SMA input items of the given type to the task's stream scan operator
 * (see doSetSMABlock for how each type is interpreted).
 *
 * Returns TSDB_CODE_APP_ERROR for a NULL task, TSDB_CODE_SUCCESS without doing anything when there
 * is no input, otherwise the result of doSetSMABlock.
 */
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  if (tinfo == NULL) {
    return TSDB_CODE_APP_ERROR;
  }

  // nothing to hand over
  if (numOfBlocks == 0 || pBlocks == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  int32_t        rc = doSetSMABlock(pTask->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTask));

  if (rc == TSDB_CODE_SUCCESS) {
    qDebug("%s set the sma block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the sma block data", GET_TASKID(pTask));
  }

  return rc;
}

H
Haojun Liao 已提交
204
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
L
Liu Jicong 已提交
205
  if (msg == NULL) {
L
Liu Jicong 已提交
206
    // create raw scan
207 208 209 210 211 212 213 214

    SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
    if (NULL == pTaskInfo) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);

215
    pTaskInfo->cost.created = taosGetTimestampUs();
216 217
    pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE;
    pTaskInfo->pRoot = createRawScanOperatorInfo(readers, pTaskInfo);
L
Liu Jicong 已提交
218
    if (NULL == pTaskInfo->pRoot) {
219 220 221 222 223
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      taosMemoryFree(pTaskInfo);
      return NULL;
    }
    return pTaskInfo;
L
Liu Jicong 已提交
224 225
  }

H
Haojun Liao 已提交
226 227
  struct SSubplan* pPlan = NULL;
  int32_t          code = qStringToSubplan(msg, &pPlan);
L
Liu Jicong 已提交
228 229 230 231 232 233
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  qTaskInfo_t pTaskInfo = NULL;
H
Haojun Liao 已提交
234
  code = qCreateExecTask(readers, 0, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
L
Liu Jicong 已提交
235
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
236 237
    nodesDestroyNode((SNode*)pPlan);
    qDestroyTask(pTaskInfo);
L
Liu Jicong 已提交
238 239 240 241
    terrno = code;
    return NULL;
  }

242
  // extract the number of output columns
H
Haojun Liao 已提交
243
  SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc;
wmmhello's avatar
wmmhello 已提交
244
  *numOfCols = 0;
245

L
Liu Jicong 已提交
246
  SNode* pNode;
247 248 249
  FOREACH(pNode, pDescNode->pSlots) {
    SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
    if (pSlotDesc->output) {
wmmhello's avatar
wmmhello 已提交
250
      ++(*numOfCols);
251 252 253
    }
  }

L
Liu Jicong 已提交
254 255 256
  if (pSchema) {
    *pSchema = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw);
  }
L
Liu Jicong 已提交
257 258 259
  return pTaskInfo;
}

L
Liu Jicong 已提交
260
/*
 * Parses msg into a subplan and creates a stream-mode execution task from it.
 *
 * Returns NULL for a NULL msg (terrno untouched). On a parse or creation failure it returns NULL
 * with terrno set to the error code; on a creation failure the plan and any partial task are
 * destroyed first.
 */
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
  if (msg == NULL) {
    return NULL;
  }

  struct SSubplan* pPlan = NULL;
  int32_t          code = qStringToSubplan(msg, &pPlan);
  if (code == TSDB_CODE_SUCCESS) {
    qTaskInfo_t pTaskInfo = NULL;
    code = qCreateExecTask(readers, 0, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
    if (code == TSDB_CODE_SUCCESS) {
      return pTaskInfo;
    }

    nodesDestroyNode((SNode*)pPlan);
    qDestroyTask(pTaskInfo);
  }

  terrno = code;
  return NULL;
}
285

286
static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) {
287
  SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
H
Haojun Liao 已提交
288 289 290 291
  int32_t numOfUids = taosArrayGetSize(tableIdList);
  if (numOfUids == 0) {
    return qa;
  }
292 293 294 295

  // let's discard the tables those are not created according to the queried super table.
  SMetaReader mr = {0};
  metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
H
Haojun Liao 已提交
296
  for (int32_t i = 0; i < numOfUids; ++i) {
297
    uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
298

299
    int32_t code = metaGetTableEntryByUid(&mr, *id);
300
    if (code != TSDB_CODE_SUCCESS) {
301
      qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *id, tstrerror(terrno), idstr);
302 303 304
      continue;
    }

M
Minglei Jin 已提交
305 306
    tDecoderClear(&mr.coder);

307
    // TODO handle ntb case
L
Liu Jicong 已提交
308
    if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pScanInfo->tableUid) {
309 310
      continue;
    }
311 312 313

    if (pScanInfo->pTagCond != NULL) {
      bool          qualified = false;
314
      STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid};
H
Haojun Liao 已提交
315
      code = isQualifiedTable(&info, pScanInfo->pTagCond, pScanInfo->readHandle.meta, &qualified);
316 317 318 319 320 321 322 323 324 325
      if (code != TSDB_CODE_SUCCESS) {
        qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr);
        continue;
      }

      if (!qualified) {
        continue;
      }
    }

326
    // handle multiple partition
327 328 329 330 331 332 333
    taosArrayPush(qa, id);
  }

  metaReaderClear(&mr);
  return qa;
}

334
/*
 * Adds (isAdd == true) or removes (isAdd == false) the table uids in tableIdList to/from the stream
 * scan operator reached from the task's root through first-downstream links.
 *
 * When adding, the uids are first filtered by filterUnqualifiedTables(). Each surviving uid is then
 * added to the tq reader and to the task's table list. Its group id is:
 *   - the uid itself when the scanner groups by tbname;
 *   - a value derived from the group tags otherwise;
 *   - 0 when there are no group tags.
 *
 * Returns:
 *   - TSDB_CODE_APP_ERROR when no stream scan operator is reachable;
 *   - TSDB_CODE_OUT_OF_MEMORY when the tag key buffer cannot be allocated;
 *   - the error from the tq reader or group-id lookup;
 *   - TSDB_CODE_SUCCESS otherwise.
 */
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  if (isAdd) {
    qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
  }

  // traverse to the stream scanner node; stop instead of dereferencing a missing downstream
  SOperatorInfo* pInfo = pTaskInfo->pRoot;
  while (pInfo != NULL && pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    pInfo = (pInfo->numOfDownstream > 0) ? pInfo->pDownstream[0] : NULL;
  }

  if (pInfo == NULL) {
    qError("failed to find stream scan operator to update table list, %s", GET_TASKID(pTaskInfo));
    return TSDB_CODE_APP_ERROR;
  }

  SStreamScanInfo* pScanInfo = pInfo->info;

  if (!isAdd) {  // remove the table id in current list
    qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
    return tqReaderRemoveTbUidList(pScanInfo->tqReader, tableIdList);
  }

  // add new table id
  SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo));
  int32_t numOfQualifiedTables = taosArrayGetSize(qa);

  qDebug(" %d qualified child tables added into stream scanner", numOfQualifiedTables);

  int32_t code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(qa);
    return code;
  }

  bool   assignUid = false;
  size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
  char*  keyBuf = NULL;
  if (bufLen > 0) {
    assignUid = groupbyTbname(pScanInfo->pGroupTags);
    keyBuf = taosMemoryMalloc(bufLen);
    if (keyBuf == NULL) {
      taosArrayDestroy(qa);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;

  for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
    uint64_t*     uid = taosArrayGet(qa, i);
    STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};

    if (bufLen > 0) {
      if (assignUid) {
        keyInfo.groupId = keyInfo.uid;
      } else {
        code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
                                     &keyInfo.groupId);
        if (code != TSDB_CODE_SUCCESS) {
          break;  // release the buffers below and report the lookup failure
        }
      }
    }

    tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
  }

  if (keyBuf != NULL) {
    taosMemoryFree(keyBuf);
  }
  taosArrayDestroy(qa);
  return code;
}
421

422
/*
 * Reports the schema version, tag version, database name and table name recorded in the task's
 * schema info. When no schema wrapper (schemaInfo.sw) is present, nothing is written and
 * TSDB_CODE_SUCCESS is returned.
 *
 * Names that are not recorded are returned as empty strings.
 * NOTE(review): names are copied with strcpy, so callers must size dbName/tableName for the longest
 * possible name.
 */
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
                                    int32_t* tversion) {
  ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  if (pTask->schemaInfo.sw == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  *sversion = pTask->schemaInfo.sw->version;
  *tversion = pTask->schemaInfo.tversion;

  dbName[0] = 0;
  if (pTask->schemaInfo.dbname != NULL) {
    strcpy(dbName, pTask->schemaInfo.dbname);
  }

  tableName[0] = 0;
  if (pTask->schemaInfo.tablename != NULL) {
    strcpy(tableName, pTask->schemaInfo.tablename);
  }

  return TSDB_CODE_SUCCESS;
}
446 447

int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
D
dapan1121 已提交
448
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) {
449 450 451 452 453
  assert(pSubplan != NULL);
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;

  taosThreadOnce(&initPoolOnce, initRefPool);

454
  qDebug("start to create subplan task, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
455

456 457
  int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
  if (code != TSDB_CODE_SUCCESS) {
458
    qError("failed to createExecTaskInfoImpl, code: %s", tstrerror(code));
459 460 461
    goto _error;
  }

462
  SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
463 464
  code = dsDataSinkMgtInit(&cfg);
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
465
    qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
466 467 468 469 470 471 472
    goto _error;
  }

  if (handle) {
    void* pSinkParam = NULL;
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle);
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
473
      qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
474 475 476
      goto _error;
    }

H
Haojun Liao 已提交
477
    code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
L
Liu Jicong 已提交
478
    if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
479 480
      taosMemoryFreeClear(pSinkParam);
    }
481 482
  }

483
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
484

dengyihao's avatar
dengyihao 已提交
485
_error:
486 487 488 489
  // if failed to add ref for all tables in this query, abort current query
  return code;
}

H
Haojun Liao 已提交
490
static void freeBlock(void* param) {
491
  SSDataBlock* pBlock = *(SSDataBlock**)param;
H
Haojun Liao 已提交
492 493 494
  blockDataDestroy(pBlock);
}

495
/*
 * Runs the task's root operator and collects result blocks into pResList until the operator is
 * exhausted or at least 4096 rows have been gathered. pResList is cleared on entry.
 *
 * The blocks placed in pResList are copies cached in pTaskInfo->pResultBlockList and reused
 * across calls, so they stay owned by the task (qCleanExecTaskBlockBuf destroys them).
 *
 * @param pLocal    if not NULL, copied into the task's localFetch settings before execution.
 * @param hasMore   on a normal run, set to true when the loop stopped while blocks were still produced.
 * @param useconds  set to the accumulated elapsed time only when the operator is exhausted.
 * @return TSDB_CODE_QRY_IN_EXEC if another thread currently owns the task, TSDB_CODE_SUCCESS if
 *         the task was already killed, otherwise pTaskInfo->code (which holds the longjmp code when
 *         execution is aborted).
 */
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  if (pLocal) {
    memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
  }

  taosArrayClear(pResList);

  // claim the task for this thread; concurrent execution of the same task is rejected
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();

    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);

    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int32_t      current = 0;
  SSDataBlock* pRes = NULL;

  int64_t st = taosGetTimestampUs();

  int32_t blockIndex = 0;
  while ((pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot)) != NULL) {
    SSDataBlock* p = NULL;
    if ((size_t)blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
      // first time this slot is used: keep a private copy of the result block
      SSDataBlock* p1 = createOneDataBlock(pRes, true);
      taosArrayPush(pTaskInfo->pResultBlockList, &p1);
      p = p1;
    } else {
      // reuse the cached block from a previous call
      p = *(SSDataBlock**)taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
      copyDataBlock(p, pRes);
    }

    blockIndex += 1;

    current += p->info.rows;
    ASSERT(p->info.rows > 0);
    taosArrayPush(pResList, &p);

    if (current >= 4096) {
      break;
    }
  }

  *hasMore = (pRes != NULL);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
  qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRIu64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

H
Haojun Liao 已提交
582 583
/*
 * Destroys every cached result block in the task's pResultBlockList and empties the list.
 */
void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  SArray*        pBlocks = pTask->pResultBlockList;

  for (size_t i = 0, n = taosArrayGetSize(pBlocks); i < n; ++i) {
    SSDataBlock* pBlock = *(SSDataBlock**)taosArrayGet(pBlocks, i);
    blockDataDestroy(pBlock);
  }

  taosArrayClear(pBlocks);
}

594 595 596 597 598 599 600 601 602 603 604 605 606
/*
 * Pulls a single result block from the task's root operator into *pRes (NULL when exhausted).
 *
 * @param useconds  set to the accumulated elapsed time only when the operator is exhausted.
 * @return TSDB_CODE_QRY_IN_EXEC if another thread currently owns the task, TSDB_CODE_SUCCESS if
 *         the task was already killed, otherwise pTaskInfo->code (which holds the longjmp code when
 *         execution is aborted).
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;

  // claim the task for this thread; concurrent execution of the same task is rejected
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();

  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  qDebug("%s task suspended, %d rows returned, total:%" PRIu64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

L
Liu Jicong 已提交
650
/*
 * Registers an exchange-operator stop entry on the task, under the stopInfo write latch.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  // the list is shared with qRemoveTaskStopInfo and qStopTaskOperators, which take the same latch
  taosWLockLatch(&pTaskInfo->stopInfo.lock);
  taosArrayPush(pTaskInfo->stopInfo.pStopInfo, pInfo);
  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);

  return TSDB_CODE_SUCCESS;
}

/* Orders two SExchangeOpStopInfo entries by refId: -1, 0 or 1 like a qsort comparator. */
int32_t stopInfoComp(void const* lp, void const* rp) {
  const SExchangeOpStopInfo* pLeft = (const SExchangeOpStopInfo*)lp;
  const SExchangeOpStopInfo* pRight = (const SExchangeOpStopInfo*)rp;

  return (pLeft->refId > pRight->refId) - (pLeft->refId < pRight->refId);
}

L
Liu Jicong 已提交
671
/*
 * Removes the stop entry whose refId matches pInfo->refId from the task, if present, under the
 * stopInfo write latch.
 * NOTE(review): the lookup uses taosArraySearchIdx with stopInfoComp; if that is a binary search it
 * relies on pStopInfo being ordered by refId, while qAppendTaskStopInfo simply appends — confirm.
 */
void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  SArray* pStopList = pTaskInfo->stopInfo.pStopInfo;
  int32_t pos = taosArraySearchIdx(pStopList, pInfo, stopInfoComp, TD_EQ);
  if (pos >= 0) {
    taosArrayRemove(pStopList, pos);
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}

/*
 * For every registered stop entry, looks the exchange operator up in the exchange reference pool
 * and posts its `ready` semaphore, releasing the reference afterwards. Holds the stopInfo write
 * latch for the whole walk.
 */
void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  SArray* pStopList = pTaskInfo->stopInfo.pStopInfo;
  int32_t total = taosArrayGetSize(pStopList);
  for (int32_t k = 0; k < total; ++k) {
    SExchangeOpStopInfo* pStop = taosArrayGet(pStopList, k);
    SExchangeInfo*       pExchange = taosAcquireRef(exchangeObjRefPool, pStop->refId);
    if (pExchange == NULL) {
      continue;  // not acquirable from the pool: nothing to wake
    }

    tsem_post(&pExchange->ready);
    taosReleaseRef(exchangeObjRefPool, pStop->refId);
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}

D
dapan1121 已提交
698
/*
 * Marks the task as killed with rspCode and wakes any exchange operators waiting on it.
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS otherwise.
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  if (pTask != NULL) {
    qDebug("%s execTask async killed", GET_TASKID(pTask));
    setTaskKilled(pTask, rspCode);
    qStopTaskOperators(pTask);
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_QRY_INVALID_QHANDLE;
}

714 715 716 717 718 719 720 721 722
/* True when a thread currently owns (is executing) the task; false for a NULL handle. */
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  return (pTask != NULL) && (atomic_load_64(&pTask->owner) != 0);
}

H
Haojun Liao 已提交
723 724
/*
 * Logs the task's cost summary at debug level: idle time (start - created), elapsed time and,
 * when a file-block load recorder is attached, its table-list/group-map timings and load counters.
 */
static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo*          pCost = &pTaskInfo->cost;
  SFileBlockLoadRecorder* pLoad = pCost->pRecoder;
  int64_t                 idleUs = pCost->start - pCost->created;

  if (pLoad == NULL) {
    qDebug("%s :cost summary: idle in queue:%.2f ms, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), idleUs / 1000.0,
           pCost->elapsedTime / 1000.0);
    return;
  }

  qDebug(
      "%s :cost summary: idle:%.2f ms, elapsed time:%.2f ms, extract tableList:%.2f ms, "
      "createGroupIdMap:%.2f ms, total blocks:%d, "
      "load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
      GET_TASKID(pTaskInfo), idleUs / 1000.0, pCost->elapsedTime / 1000.0, pCost->extractListTime,
      pCost->groupIdMapTime, pLoad->totalBlocks, pLoad->loadBlockStatis, pLoad->loadBlocks, pLoad->totalRows,
      pLoad->totalCheckedRows);
}

742 743 744 745 746 747 748 749
/*
 * Logs completion and cost summary for the task, then frees it via doDestroyTask().
 * A NULL handle is ignored.
 */
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
  if (pTaskInfo == NULL) {
    return;
  }

  // This is also called after a failed qCreateExecTask (see qCreateQueueExecTaskInfo /
  // qCreateStreamExecTaskInfo), where the root operator may not exist yet — don't dereference it blindly.
  int64_t numOfRows = (pTaskInfo->pRoot != NULL) ? pTaskInfo->pRoot->resultInfo.totalRows : 0;
  qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), numOfRows);

  printTaskExecCostInLog(pTaskInfo);  // print the query cost summary
  doDestroyTask(pTaskInfo);
}

H
Haojun Liao 已提交
754
/* Collects explain/exec info for the task's operator tree (starting at its root) into pExecInfoList. */
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tinfo)->pRoot;
  return getOperatorExplainExecInfo(pRoot, pExecInfoList);
}

/*
 * Serializes the task's operator state into *pOutput / *len.
 *
 * Operator encoding is currently disabled (the former implementation is kept below for reference),
 * so on success nothing is written to *pOutput or *len and 0 is returned.
 *
 * Returns TSDB_CODE_INVALID_PARA when tinfo is NULL or the task has no root operator.
 */
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
  SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL || pTaskInfo->pRoot == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Disabled implementation:
  //  int32_t nOptrWithVal = 0;
  //  int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
  //  if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
  //    taosMemoryFreeClear(*pOutput);
  //    *len = 0;
  //  }
  return 0;
}

/*
 * Restores the task's operator state from pInput. Decoding is currently disabled: after argument
 * validation (TSDB_CODE_INVALID_PARA for a NULL task, NULL input or zero length) it returns 0.
 */
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
  bool validArgs = (tinfo != NULL) && (pInput != NULL) && (len != 0);
  if (!validArgs) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Disabled implementation: return decodeOperator(pTaskInfo->pRoot, pInput, len);
  return 0;
}

/*
 * Follows the single-child operator chain from the task's root and stores the info of the first
 * stream scan operator in *scanner.
 *
 * Returns 0 on success. If the chain ends (an operator with no downstream) before a stream scan
 * operator is reached, *scanner is set to NULL and TSDB_CODE_APP_ERROR is returned instead of
 * dereferencing a missing downstream.
 */
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  while (pOperator != NULL) {
    uint16_t type = pOperator->operatorType;
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      *scanner = pOperator->info;
      return 0;
    }

    if (pOperator->numOfDownstream == 0) {
      break;
    }

    ASSERT(pOperator->numOfDownstream == 1);
    pOperator = pOperator->pDownstream[0];
  }

  qError("%s failed to find stream scan operator", GET_TASKID(pTaskInfo));
  *scanner = NULL;
  return TSDB_CODE_APP_ERROR;
}

/*
 * Disabled (compiled out by the #if 0 guard): would write pItem onto the
 * queue of a stream-mode task's streamInfo.inputQueue.
 */
#if 0
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  taosWriteQitem(pTask->streamInfo.inputQueue->queue, pItem);
  return 0;
}
#endif

810
/*
 * Enter step 1 of source recovery for a stream-mode task.
 * Records ver as streamInfo.fillHistoryVer1 and sets recoverStep to
 * STREAM_RECOVER_STEP__PREPARE1. Always returns 0.
 */
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  pTask->streamInfo.fillHistoryVer1 = ver;
  pTask->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE1;

  return 0;
}

/*
 * Enter step 2 of source recovery for a stream-mode task.
 * Records ver as streamInfo.fillHistoryVer2 and sets recoverStep to
 * STREAM_RECOVER_STEP__PREPARE2. Always returns 0.
 */
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  pTask->streamInfo.fillHistoryVer2 = ver;
  pTask->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE2;

  return 0;
}

/*
 * Leave source recovery: reset a stream-mode task's recoverStep to
 * STREAM_RECOVER_STEP__NONE. Always returns 0.
 */
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  pTask->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;

  return 0;
}

/*
 * Override the window-trigger parameters of a task's stream operators for
 * recovery.
 *
 * Walks the operator chain from the root along pDownstream[0]. For every
 * stream interval (plain/semi/final), session (plain/semi/final) or state
 * operator, it:
 *   - saves twAggSup.calTrigger and twAggSup.deleteMark into calTriggerSaved
 *     and deleteMarkSaved (which must still be 0);
 *   - forces STREAM_TRIGGER_AT_ONCE and a deleteMark of INT64_MAX.
 * Other operator types are left untouched.
 *
 * Returns 0 when the end of the chain is reached, and -1 if an operator
 * with more than one downstream is met.
 */
int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  for (SOperatorInfo* pOp = pTask->pRoot;;) {
    switch (pOp->operatorType) {
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: {
        SStreamIntervalOperatorInfo* pIntervalInfo = pOp->info;
        ASSERT(pIntervalInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
               pIntervalInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
        ASSERT(pIntervalInfo->twAggSup.calTriggerSaved == 0);
        ASSERT(pIntervalInfo->twAggSup.deleteMarkSaved == 0);

        qInfo("save stream param for interval: %d,  %" PRId64, pIntervalInfo->twAggSup.calTrigger,
              pIntervalInfo->twAggSup.deleteMark);

        pIntervalInfo->twAggSup.calTriggerSaved = pIntervalInfo->twAggSup.calTrigger;
        pIntervalInfo->twAggSup.deleteMarkSaved = pIntervalInfo->twAggSup.deleteMark;
        pIntervalInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
        pIntervalInfo->twAggSup.deleteMark = INT64_MAX;
        break;
      }

      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: {
        SStreamSessionAggOperatorInfo* pSessionInfo = pOp->info;
        ASSERT(pSessionInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
               pSessionInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
        ASSERT(pSessionInfo->twAggSup.calTriggerSaved == 0);
        ASSERT(pSessionInfo->twAggSup.deleteMarkSaved == 0);

        qInfo("save stream param for session: %d,  %" PRId64, pSessionInfo->twAggSup.calTrigger,
              pSessionInfo->twAggSup.deleteMark);

        pSessionInfo->twAggSup.calTriggerSaved = pSessionInfo->twAggSup.calTrigger;
        pSessionInfo->twAggSup.deleteMarkSaved = pSessionInfo->twAggSup.deleteMark;
        pSessionInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
        pSessionInfo->twAggSup.deleteMark = INT64_MAX;
        break;
      }

      case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
        SStreamStateAggOperatorInfo* pStateInfo = pOp->info;
        ASSERT(pStateInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
               pStateInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
        ASSERT(pStateInfo->twAggSup.calTriggerSaved == 0);
        ASSERT(pStateInfo->twAggSup.deleteMarkSaved == 0);

        qInfo("save stream param for state: %d,  %" PRId64, pStateInfo->twAggSup.calTrigger,
              pStateInfo->twAggSup.deleteMark);

        pStateInfo->twAggSup.calTriggerSaved = pStateInfo->twAggSup.calTrigger;
        pStateInfo->twAggSup.deleteMarkSaved = pStateInfo->twAggSup.deleteMark;
        pStateInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
        pStateInfo->twAggSup.deleteMark = INT64_MAX;
        break;
      }

      default:
        // operator types other than the stream window operators are left as-is
        break;
    }

    // descend to the single downstream; a branching chain is not expected here
    if (pOp->numOfDownstream > 1) {
      qError("unexpected stream, multiple downstream");
      ASSERT(0);
      return -1;
    }
    if (pOp->numOfDownstream != 1 || pOp->pDownstream[0] == NULL) {
      return 0;
    }
    pOp = pOp->pDownstream[0];
  }
}

/*
 * Undo the recovery overrides made by qStreamSetParamForRecover().
 *
 * Walks the operator chain from the root along pDownstream[0]. For every
 * stream interval (plain/semi/final), session (plain/semi/final) or state
 * operator, it copies twAggSup.calTriggerSaved / deleteMarkSaved back into
 * calTrigger / deleteMark. Other operator types are left untouched.
 *
 * NOTE(review): the saved fields are not cleared here, while
 * qStreamSetParamForRecover() ASSERTs that they are 0. A second set/restore
 * cycle on the same task would therefore trip that ASSERT, unless the saved
 * values happen to be 0. Confirm whether a task can be recovered twice.
 *
 * Returns 0 when the end of the chain is reached, and -1 if an operator
 * with more than one downstream is met.
 */
int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  for (SOperatorInfo* pOp = pTask->pRoot;;) {
    switch (pOp->operatorType) {
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: {
        SStreamIntervalOperatorInfo* pIntervalInfo = pOp->info;
        pIntervalInfo->twAggSup.calTrigger = pIntervalInfo->twAggSup.calTriggerSaved;
        pIntervalInfo->twAggSup.deleteMark = pIntervalInfo->twAggSup.deleteMarkSaved;
        qInfo("restore stream param for interval: %d,  %" PRId64, pIntervalInfo->twAggSup.calTrigger,
              pIntervalInfo->twAggSup.deleteMark);
        break;
      }

      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: {
        SStreamSessionAggOperatorInfo* pSessionInfo = pOp->info;
        pSessionInfo->twAggSup.calTrigger = pSessionInfo->twAggSup.calTriggerSaved;
        pSessionInfo->twAggSup.deleteMark = pSessionInfo->twAggSup.deleteMarkSaved;
        qInfo("restore stream param for session: %d,  %" PRId64, pSessionInfo->twAggSup.calTrigger,
              pSessionInfo->twAggSup.deleteMark);
        break;
      }

      case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
        SStreamStateAggOperatorInfo* pStateInfo = pOp->info;
        pStateInfo->twAggSup.calTrigger = pStateInfo->twAggSup.calTriggerSaved;
        pStateInfo->twAggSup.deleteMark = pStateInfo->twAggSup.deleteMarkSaved;
        qInfo("restore stream param for state: %d,  %" PRId64, pStateInfo->twAggSup.calTrigger,
              pStateInfo->twAggSup.deleteMark);
        break;
      }

      default:
        // operator types other than the stream window operators are left as-is
        break;
    }

    // descend to the single downstream; a branching chain is reported as an error
    if (pOp->numOfDownstream > 1) {
      qError("unexpected stream, multiple downstream");
      return -1;
    }
    if (pOp->numOfDownstream != 1 || pOp->pDownstream[0] == NULL) {
      return 0;
    }
    pOp = pOp->pDownstream[0];
  }
}
L
Liu Jicong 已提交
955 956 957 958
/* Return the task's streamInfo.recoverScanFinished flag. */
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
  const SExecTaskInfo* pTask = (const SExecTaskInfo*)tinfo;
  return pTask->streamInfo.recoverScanFinished;
}
959 960 961 962 963 964

void* qExtractReaderFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return (void*)pInfo->tqReader;
}

wmmhello's avatar
wmmhello 已提交
965 966 967 968 969 970 971 972
/* Return the schema currently stored in the task's streamInfo (not a copy). */
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.schema;
}

/* Return the table name currently stored in the task's streamInfo (not a copy). */
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.tbName;
}

wmmhello's avatar
wmmhello 已提交
975
/*
 * Return a pointer to the meta response embedded in a queue-mode task's
 * streamInfo. The pointer refers to task-owned storage.
 */
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);

  SMqMetaRsp* pMetaRsp = &pTask->streamInfo.metaRsp;
  return pMetaRsp;
}

wmmhello's avatar
wmmhello 已提交
981 982 983 984 985 986
/* Return the uid of the offset recorded by the last qStreamPrepareScan() call on a queue-mode task. */
int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);

  const STqOffsetVal* pPrepared = &pTask->streamInfo.prepareStatus;
  return pPrepared->uid;
}

987 988 989 990 991 992 993
/*
 * Copy a queue-mode task's streamInfo.lastStatus offset into *pOffset.
 * Always returns 0.
 */
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);

  const STqOffsetVal* pLast = &pTask->streamInfo.lastStatus;
  memcpy(pOffset, pLast, sizeof(*pOffset));
  return 0;
}

H
Haojun Liao 已提交
994
/*
 * Build the table-scan condition used for a TMQ snapshot read of one table.
 *
 * pCond is zeroed first. It is then filled with:
 *   - ascending order and TSWINDOW_INITIALIZER as the time window;
 *   - pMtInfo->suid and TIMEWINDOW_RANGE_CONTAINED;
 *   - versions -1 .. sContext->snapVersion;
 *   - one SColumnInfo per column of pMtInfo->schema, with slot i mapped to
 *     schema column i.
 *
 * On success pCond owns the newly allocated colList and pSlotList. Returns
 * TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (also stored in terrno) if
 * either array cannot be allocated; in that case both arrays are released
 * and left NULL.
 */
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
  memset(pCond, 0, sizeof(*pCond));

  pCond->order = TSDB_ORDER_ASC;
  pCond->numOfCols = pMtInfo->schema->nCols;

  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(*pCond->colList));
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
  const bool allocFailed = (pCond->colList == NULL) || (pCond->pSlotList == NULL);
  if (allocFailed) {
    taosMemoryFreeClear(pCond->colList);
    taosMemoryFreeClear(pCond->pSlotList);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  pCond->twindows = TSWINDOW_INITIALIZER;
  pCond->suid = pMtInfo->suid;
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
  pCond->startVersion = -1;
  pCond->endVersion = sContext->snapVersion;

  for (int32_t col = 0; col < pCond->numOfCols; ++col) {
    SColumnInfo* pDst = pCond->colList + col;
    pDst->type = pMtInfo->schema->pSchema[col].type;
    pDst->bytes = pMtInfo->schema->pSchema[col].bytes;
    pDst->colId = pMtInfo->schema->pSchema[col].colId;

    pCond->pSlotList[col] = col;  // identity mapping: output slot i <- schema column i
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1025
/*
 * Disabled (compiled out by the #if 0 guard): would attach pReq and scanVer
 * to a queue-mode task's streamInfo. The enabled function that hands submit
 * data to a task is qStreamSetScanMemData().
 */
#if 0
int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t scanVer) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);
  ASSERT(pTask->streamInfo.pReq == NULL);

  pTask->streamInfo.pReq = pReq;
  pTask->streamInfo.scanVer = scanVer;
  return 0;
}
#endif
L
Liu Jicong 已提交
1035

L
Liu Jicong 已提交
1036
/*
 * Store a packed submit message in a queue-mode task's streamInfo for a
 * later scan. The task must not already hold a pending submit
 * (streamInfo.submit.msgStr must be NULL). Always returns 0.
 */
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT((pTask->execModel == OPTR_EXEC_MODEL_QUEUE) && (pTask->streamInfo.submit.msgStr == NULL));

  qDebug("set the submit block for future scan");
  pTask->streamInfo.submit = submit;

  return 0;
}

1045
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
1046 1047 1048 1049
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
  pTaskInfo->streamInfo.prepareStatus = *pOffset;
1050
  pTaskInfo->streamInfo.returned = 0;
1051

1052 1053 1054
  if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
    return 0;
  }
1055

1056 1057
  if (subType == TOPIC_SUB_TYPE__COLUMN) {
    pOperator->status = OP_OPENED;
1058

1059
    // TODO add more check
1060
    if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
1061 1062 1063
      ASSERT(pOperator->numOfDownstream == 1);
      pOperator = pOperator->pDownstream[0];
    }
L
Liu Jicong 已提交
1064

1065 1066 1067
    SStreamScanInfo* pInfo = pOperator->info;
    if (pOffset->type == TMQ_OFFSET__LOG) {
      STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1068 1069
      tsdbReaderClose(pTSInfo->base.dataReader);
      pTSInfo->base.dataReader = NULL;
H
Haojun Liao 已提交
1070
      // let's seek to the next version in wal file
1071 1072 1073 1074
      if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
        return -1;
      }
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
1075 1076
      // iterate all tables from tableInfoList, and retrieve rows from each table one-by-one
      // those data are from the snapshot in tsdb, besides the data in the wal file.
1077 1078 1079 1080
      int64_t uid = pOffset->uid;
      int64_t ts = pOffset->ts;

      if (uid == 0) {
H
Haojun Liao 已提交
1081 1082
        if (tableListGetSize(pTaskInfo->pTableInfoList) != 0) {
          STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
1083 1084 1085
          uid = pTableInfo->uid;
          ts = INT64_MIN;
        } else {
L
Liu Jicong 已提交
1086 1087
          return -1;
        }
1088
      }
H
Haojun Liao 已提交
1089

1090 1091 1092
      /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
      /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
      STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1093
      int32_t         numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
H
Haojun Liao 已提交
1094 1095

#ifndef NDEBUG
wmmhello's avatar
wmmhello 已提交
1096 1097
      qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid,
             pTableScanInfo->currentTable, pInfo->pTableScanOp->resultInfo.totalRows);
1098
      pInfo->pTableScanOp->resultInfo.totalRows = 0;
H
Haojun Liao 已提交
1099 1100
#endif

1101
      bool found = false;
1102
      for (int32_t i = 0; i < numOfTables; i++) {
H
Haojun Liao 已提交
1103
        STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i);
1104 1105 1106 1107
        if (pTableInfo->uid == uid) {
          found = true;
          pTableScanInfo->currentTable = i;
          break;
L
Liu Jicong 已提交
1108
        }
1109
      }
1110

L
Liu Jicong 已提交
1111
      // TODO after dropping table, table may not found
1112
      ASSERT(found);
1113

H
Haojun Liao 已提交
1114
      if (pTableScanInfo->base.dataReader == NULL) {
H
Haojun Liao 已提交
1115
        STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
dengyihao's avatar
dengyihao 已提交
1116
        int32_t        num = tableListGetSize(pTaskInfo->pTableInfoList);
1117

H
Haojun Liao 已提交
1118
        if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num,
H
Haojun Liao 已提交
1119
                           pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL) < 0 ||
H
Haojun Liao 已提交
1120
            pTableScanInfo->base.dataReader == NULL) {
1121
          ASSERT(0);
L
Liu Jicong 已提交
1122
        }
1123
      }
H
Haojun Liao 已提交
1124

1125
      STableKeyInfo tki = {.uid = uid};
H
Haojun Liao 已提交
1126 1127 1128 1129 1130
      tsdbSetTableList(pTableScanInfo->base.dataReader, &tki, 1);
      int64_t oldSkey = pTableScanInfo->base.cond.twindows.skey;
      pTableScanInfo->base.cond.twindows.skey = ts + 1;
      tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
      pTableScanInfo->base.cond.twindows.skey = oldSkey;
1131
      pTableScanInfo->scanTimes = 0;
1132

wmmhello's avatar
wmmhello 已提交
1133
      qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
1134
             ts, pTableScanInfo->currentTable, numOfTables);
1135 1136
    } else {
      ASSERT(0);
1137
    }
L
Liu Jicong 已提交
1138
  } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
1139
    SStreamRawScanInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
1140 1141 1142
    SSnapContext*       sContext = pInfo->sContext;
    if (setForSnapShot(sContext, pOffset->uid) != 0) {
      qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid);
1143 1144 1145 1146 1147 1148
      return -1;
    }

    SMetaTableInfo mtInfo = getUidfromSnapShot(sContext);
    tsdbReaderClose(pInfo->dataReader);
    pInfo->dataReader = NULL;
H
Haojun Liao 已提交
1149

1150
    cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
H
Haojun Liao 已提交
1151 1152 1153 1154 1155
    tableListClear(pTaskInfo->pTableInfoList);

    if (mtInfo.uid == 0) {
      return 0;  // no data
    }
1156

H
Haojun Liao 已提交
1157
    initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
1158
    pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
1159

dengyihao's avatar
dengyihao 已提交
1160
    if (pTaskInfo->pTableInfoList == NULL) {
H
Haojun Liao 已提交
1161 1162 1163 1164
      pTaskInfo->pTableInfoList = tableListCreate();
    }

    tableListAddTableInfo(pTaskInfo->pTableInfoList, mtInfo.uid, 0);
1165

H
Haojun Liao 已提交
1166
    STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
dengyihao's avatar
dengyihao 已提交
1167
    int32_t        size = tableListGetSize(pTaskInfo->pTableInfoList);
H
Haojun Liao 已提交
1168
    ASSERT(size == 1);
1169

H
Haojun Liao 已提交
1170
    tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL);
1171

1172
    cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
L
Liu Jicong 已提交
1173 1174 1175 1176
    strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
    tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
    pTaskInfo->streamInfo.schema = mtInfo.schema;

1177
    qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts);
L
Liu Jicong 已提交
1178
  } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
wmmhello's avatar
wmmhello 已提交
1179
    SStreamRawScanInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
1180 1181
    SSnapContext*       sContext = pInfo->sContext;
    if (setForSnapShot(sContext, pOffset->uid) != 0) {
H
Haojun Liao 已提交
1182
      qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
wmmhello's avatar
wmmhello 已提交
1183 1184
      return -1;
    }
1185
    qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts);
L
Liu Jicong 已提交
1186
  } else if (pOffset->type == TMQ_OFFSET__LOG) {
wmmhello's avatar
wmmhello 已提交
1187 1188 1189
    SStreamRawScanInfo* pInfo = pOperator->info;
    tsdbReaderClose(pInfo->dataReader);
    pInfo->dataReader = NULL;
wmmhello's avatar
wmmhello 已提交
1190
    qDebug("tmqsnap qStreamPrepareScan snapshot log");
1191 1192 1193
  }
  return 0;
}
H
Haojun Liao 已提交
1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213

/*
 * Dispatch an RPC response to the callback stored in its send info.
 *
 * pMsg->info.ahandle must point to the SMsgSendInfo of the request. The
 * response payload is copied into a newly allocated buffer, which is passed
 * to pSendInfo->fp together with pMsg->code. This function does not free
 * the copy, so presumably the callback takes ownership of buf.pData.
 *
 * If the copy cannot be allocated, the callback receives an empty buffer
 * (len 0, pData NULL) and TSDB_CODE_OUT_OF_MEMORY as the code. Previously it
 * was told contLen bytes were available behind a NULL pointer.
 *
 * The RPC payload and the send info are released afterwards. parent and
 * pEpSet are unused.
 */
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  assert(pMsg->info.ahandle != NULL);

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      // never advertise a payload length for a buffer that does not exist
      buf.len = 0;
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
L
Liu Jicong 已提交
1215