executor.c 42.9 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14 15 16
 */

#include "executor.h"
H
Haojun Liao 已提交
17
#include "executorimpl.h"
18
#include "planner.h"
L
Liu Jicong 已提交
19
#include "tdatablock.h"
L
Liu Jicong 已提交
20
#include "tref.h"
21
#include "tudf.h"
L
Liu Jicong 已提交
22
#include "vnode.h"
23 24 25 26 27 28 29 30

static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
int32_t             exchangeObjRefPool = -1;  // ref pool for exchange operator objects; opened by initRefPool()

/*
 * atexit() hook registered by initRefPool(): detach the exchange-object ref pool id (leaving 0 behind)
 * and close the pool.
 *
 * atomic_exchange_32 swaps the id out in a single atomic step. A compare-and-swap whose "expected" value
 * is a plain read of the same variable can fail silently if another thread writes in between.
 */
static void cleanupRefPool(void) {
  int32_t ref = atomic_exchange_32(&exchangeObjRefPool, 0);
  taosCloseRef(ref);
}
31

D
dapan1121 已提交
32 33 34 35 36
/*
 * One-time initializer, run via taosThreadOnce(&initPoolOnce, ...) from qCreateExecTask.
 * Opens the ref pool whose entries are destroyed with doDestroyExchangeOperatorInfo, and
 * registers cleanupRefPool to close it at process exit.
 */
static void initRefPool() {
  int32_t poolId = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
  exchangeObjRefPool = poolId;

  atexit(cleanupRefPool);
}

L
Liu Jicong 已提交
37 38 39 40
/*
 * Walk down a single-downstream operator chain to the stream scan operator and queue SMA input on it.
 * Every visited operator, including the scan itself, is reset to OP_NOT_OPENED.
 *
 * How `input` is read depends on `type`:
 *   STREAM_INPUT__MERGED_SUBMIT: array of numOfBlocks SPackedData, queued as DATA_SUBMIT
 *   STREAM_INPUT__DATA_SUBMIT:   a single submit payload (see the review note below)
 *   STREAM_INPUT__DATA_BLOCK:    array of numOfBlocks SSDataBlock, each wrapped in an SPackedData
 * Any other type is ignored.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_APP_ERROR when the chain ends or branches (join) before
 * reaching a stream scan operator.
 */
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // multiple downstreams (join) are not supported
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }
    pOperator->status = OP_NOT_OPENED;
    return doSetSMABlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  }

  pOperator->status = OP_NOT_OPENED;

  SStreamScanInfo* pInfo = pOperator->info;

  if (type == STREAM_INPUT__MERGED_SUBMIT) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
      taosArrayPush(pInfo->pBlockLists, pReq);
    }
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
    // NOTE(review): this pushes the address of the local pointer `input`, whereas the other branches here
    // (and doSetStreamBlock) push SPackedData records. If pBlockLists holds SPackedData elements this reads
    // past the pointer on the stack. Confirm what callers pass for this type before changing it.
    taosArrayPush(pInfo->pBlockLists, &input);
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_BLOCK) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
      SPackedData  tmp = {.pDataBlock = pDataBlock};
      taosArrayPush(pInfo->pBlockLists, &tmp);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
79
/*
 * Reset every operator above the stream scan operator to OP_NOT_OPENED so the next getNextFn() call
 * re-opens the chain. The stream scan operator itself is left untouched.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_APP_ERROR when the chain ends or branches (join) before
 * reaching a stream scan operator.
 */
static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // multiple downstreams (join) are not supported
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }
    pOperator->status = OP_NOT_OPENED;
    return doSetStreamOpOpen(pOperator->pDownstream[0], id);
  }

  return TSDB_CODE_SUCCESS;
}

96 97 98 99 100 101 102 103 104 105 106
/*
 * Drop the blocks buffered in the stream scan operator (doClearBufferedBlocks).
 * Only single-downstream chains are followed. An operator with zero or several downstreams that is
 * not a stream scan ends the walk without clearing anything.
 */
static void clearStreamBlock(SOperatorInfo* pOperator) {
  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pInfo = pOperator->info;
    doClearBufferedBlocks(pInfo);
    return;
  }

  // C forbids `return <expr>;` in a void function, so recurse as a plain statement
  if (pOperator->numOfDownstream == 1) {
    clearStreamBlock(pOperator->pDownstream[0]);
  }
}

5
54liuyao 已提交
107 108 109 110 111 112
/*
 * Clear the task's error code and discard any stream blocks still buffered in its stream scanner.
 */
void resetTaskInfo(qTaskInfo_t tinfo) {
  SExecTaskInfo* pExecTask = (SExecTaskInfo*)tinfo;

  pExecTask->code = TSDB_CODE_SUCCESS;
  clearStreamBlock(pExecTask->pRoot);
}

L
Liu Jicong 已提交
113
/*
 * Walk down a single-downstream operator chain to the stream scan operator and queue stream input on it.
 * Every visited operator, including the scan itself, is reset to OP_NOT_OPENED.
 *
 * How `input` is read depends on `type`:
 *   STREAM_INPUT__MERGED_SUBMIT: array of numOfBlocks SPackedData, queued as DATA_SUBMIT
 *   STREAM_INPUT__DATA_SUBMIT:   one SPackedData-sized element pushed directly (numOfBlocks must be 1)
 *   STREAM_INPUT__DATA_BLOCK:    array of numOfBlocks SSDataBlock, each wrapped in an SPackedData
 * Any other type trips ASSERT(0).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_APP_ERROR when the chain ends or branches (join) before
 * reaching a stream scan operator.
 */
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // multiple downstreams (join) are not supported
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }
    pOperator->status = OP_NOT_OPENED;
    return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  }

  pOperator->status = OP_NOT_OPENED;

  SStreamScanInfo* pInfo = pOperator->info;
  qDebug("stream set total blocks:%d, task id:%s", (int32_t)numOfBlocks, id);

  // nothing may still be queued from a previous call
  ASSERT(pInfo->validBlockIndex == 0);
  ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0);

  if (type == STREAM_INPUT__MERGED_SUBMIT) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
      taosArrayPush(pInfo->pBlockLists, pReq);
    }
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
    ASSERT(numOfBlocks == 1);
    taosArrayPush(pInfo->pBlockLists, input);
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_BLOCK) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
      SPackedData  tmp = {.pDataBlock = pDataBlock};
      taosArrayPush(pInfo->pBlockLists, &tmp);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}

162 163 164 165 166 167 168 169 170 171 172 173 174
/*
 * Propagate the task's id string to the tsdb reader used by the stream scan operator (if any).
 * Only the first downstream of each operator is followed. The walk stops quietly at a leaf that is not
 * a stream scan (e.g. a raw-scan root) instead of indexing an empty pDownstream array.
 */
void doSetTaskId(SOperatorInfo* pOperator) {
  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
    SStreamScanInfo* pStreamScanInfo = pOperator->info;
    if (pStreamScanInfo->pTableScanOp == NULL) {
      return;
    }

    STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info;
    if (pScanInfo->base.dataReader != NULL) {
      tsdbReaderSetId(pScanInfo->base.dataReader, pTaskInfo->id.str);
    }
  } else if (pOperator->numOfDownstream > 0) {
    doSetTaskId(pOperator->pDownstream[0]);
  }
}

H
Haojun Liao 已提交
175 176 177 178 179 180
/*
 * Re-identify a task: record queryId, rebuild the cached id string from (taskId, queryId) and push the
 * new string down to the stream scanner's tsdb reader. taskId itself is only used to build the string.
 */
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
  SExecTaskInfo* pExecTask = (SExecTaskInfo*)tinfo;

  pExecTask->id.queryId = queryId;

  // release the previous id string before installing the new one
  taosMemoryFreeClear(pExecTask->id.str);
  char* newIdStr = buildTaskId(taskId, queryId);
  pExecTask->id.str = newIdStr;

  doSetTaskId(pExecTask->pRoot);
}

L
Liu Jicong 已提交
186 187
/*
 * Mark the operators above the stream scanner as not opened (doSetStreamOpOpen) so the next execution
 * re-opens them. Returns TSDB_CODE_APP_ERROR for a NULL task, otherwise doSetStreamOpOpen's result.
 */
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL) {
    return TSDB_CODE_APP_ERROR;
  }

  int32_t code = doSetStreamOpOpen(pTaskInfo->pRoot, GET_TASKID(pTaskInfo));
  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the stream block successfully", GET_TASKID(pTaskInfo));
  } else {
    qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
  }

  return code;
}
202

L
Liu Jicong 已提交
203
/*
 * Feed numOfBlocks stream input items of the given type to the task's stream scanner (doSetStreamBlock).
 * A NULL or empty input is a successful no-op. Returns TSDB_CODE_APP_ERROR for a NULL task.
 */
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL) {
    return TSDB_CODE_APP_ERROR;
  }

  if (numOfBlocks == 0 || pBlocks == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the stream block successfully", GET_TASKID(pTaskInfo));
  } else {
    qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
  }

  return code;
}

L
Liu Jicong 已提交
224 225
/*
 * Feed numOfBlocks SMA input items of the given type to the task's stream scanner (doSetSMABlock).
 * A NULL or empty input is a successful no-op. Returns TSDB_CODE_APP_ERROR for a NULL task.
 */
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL) {
    return TSDB_CODE_APP_ERROR;
  }

  if (numOfBlocks == 0 || pBlocks == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = doSetSMABlock(pTaskInfo->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the sma block successfully", GET_TASKID(pTaskInfo));
  } else {
    qError("%s failed to set the sma block data", GET_TASKID(pTaskInfo));
  }

  return code;
}

245
/*
 * Create an executor task that runs in OPTR_EXEC_MODEL_QUEUE mode.
 *
 * msg == NULL: build a bare task whose only operator is a raw scan over pReaderHandle.
 *              numOfCols and pSchema are not touched in this case.
 * msg != NULL: msg is a serialized subplan. On success *numOfCols receives the number of output slots of
 *              the plan's root node and, when pSchema is non-NULL, *pSchema receives a copy of the task's
 *              query schema made by tCloneSSchemaWrapper.
 *
 * Returns the task handle, or NULL with terrno set on failure.
 */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema) {
  if (msg == NULL) {
    SExecTaskInfo* pRawTask = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
    if (pRawTask == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    setTaskStatus(pRawTask, TASK_NOT_COMPLETED);
    pRawTask->cost.created = taosGetTimestampUs();
    pRawTask->execModel = OPTR_EXEC_MODEL_QUEUE;

    pRawTask->pRoot = createRawScanOperatorInfo(pReaderHandle, pRawTask);
    if (pRawTask->pRoot == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      taosMemoryFree(pRawTask);
      return NULL;
    }

    return pRawTask;
  }

  struct SSubplan* pPlan = NULL;
  int32_t          code = qStringToSubplan(msg, &pPlan);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  qTaskInfo_t pTaskInfo = NULL;
  code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
  if (code != TSDB_CODE_SUCCESS) {
    nodesDestroyNode((SNode*)pPlan);
    qDestroyTask(pTaskInfo);
    terrno = code;
    return NULL;
  }

  // count the slots of the root node's output block that are flagged as output columns
  SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc;
  int32_t             outputCols = 0;
  SNode*              pSlot = NULL;
  FOREACH(pSlot, pDescNode->pSlots) {
    if (((SSlotDescNode*)pSlot)->output) {
      outputCols += 1;
    }
  }
  *numOfCols = outputCols;

  if (pSchema != NULL) {
    *pSchema = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw);
  }

  return pTaskInfo;
}

301
/*
 * Deserialize a subplan from msg and build an executor task for it in OPTR_EXEC_MODEL_STREAM mode.
 * Returns NULL for a NULL msg (terrno untouched) or on failure (terrno set to the error code).
 */
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId) {
  if (msg == NULL) {
    return NULL;
  }

  struct SSubplan* pPlan = NULL;
  int32_t          code = qStringToSubplan(msg, &pPlan);
  if (code == TSDB_CODE_SUCCESS) {
    qTaskInfo_t pTaskInfo = NULL;
    code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
    if (code == TSDB_CODE_SUCCESS) {
      return pTaskInfo;
    }

    // task creation failed: release the plan and whatever part of the task was built
    nodesDestroyNode((SNode*)pPlan);
    qDestroyTask(pTaskInfo);
  }

  terrno = code;
  return NULL;
}
324

325
static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) {
326
  SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
H
Haojun Liao 已提交
327 328 329 330
  int32_t numOfUids = taosArrayGetSize(tableIdList);
  if (numOfUids == 0) {
    return qa;
  }
331 332 333 334

  // let's discard the tables those are not created according to the queried super table.
  SMetaReader mr = {0};
  metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
H
Haojun Liao 已提交
335
  for (int32_t i = 0; i < numOfUids; ++i) {
336
    uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
337

338
    int32_t code = metaGetTableEntryByUid(&mr, *id);
339
    if (code != TSDB_CODE_SUCCESS) {
340
      qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *id, tstrerror(terrno), idstr);
341 342 343
      continue;
    }

M
Minglei Jin 已提交
344 345
    tDecoderClear(&mr.coder);

346
    // TODO handle ntb case
L
Liu Jicong 已提交
347
    if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pScanInfo->tableUid) {
348 349
      continue;
    }
350 351 352

    if (pScanInfo->pTagCond != NULL) {
      bool          qualified = false;
353
      STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid};
H
Haojun Liao 已提交
354
      code = isQualifiedTable(&info, pScanInfo->pTagCond, pScanInfo->readHandle.meta, &qualified);
355 356 357 358 359 360 361 362 363 364
      if (code != TSDB_CODE_SUCCESS) {
        qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr);
        continue;
      }

      if (!qualified) {
        continue;
      }
    }

365
    // handle multiple partition
366 367 368 369 370 371 372
    taosArrayPush(qa, id);
  }

  metaReaderClear(&mr);
  return qa;
}

373
/*
 * Add tables to, or remove them from, the task's stream scanner.
 *
 * isAdd == true:  uids in tableIdList are filtered by filterUnqualifiedTables, registered with the tq
 *                 reader, and appended to the task's table list. Each table's group id is its uid when
 *                 groupbyTbname() reports so, is derived from the group-by tag values when group tags exist,
 *                 and is 0 otherwise. No duplicate-uid check is done before appending.
 * isAdd == false: uids in tableIdList are removed from the tq reader.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered. Returns TSDB_CODE_APP_ERROR when the operator
 * chain (first downstream of each operator) contains no stream scan operator.
 */
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  if (isAdd) {
    qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
  }

  // traverse to the stream scanner node; stop instead of indexing an empty pDownstream array
  SOperatorInfo* pInfo = pTaskInfo->pRoot;
  while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pInfo->numOfDownstream == 0) {
      qError("failed to find stream scan operator to update table list, %s", GET_TASKID(pTaskInfo));
      return TSDB_CODE_APP_ERROR;
    }
    pInfo = pInfo->pDownstream[0];
  }

  SStreamScanInfo* pScanInfo = pInfo->info;

  if (!isAdd) {  // remove the table ids from the current list
    qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
    return tqReaderRemoveTbUidList(pScanInfo->tqReader, tableIdList);
  }

  SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo));
  int32_t numOfQualifiedTables = taosArrayGetSize(qa);
  qDebug(" %d qualified child tables added into stream scanner", numOfQualifiedTables);

  int32_t code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(qa);
    return code;
  }

  // scratch buffer for building the group key from tag values; only needed when group tags exist
  bool   assignUid = false;
  size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
  char*  keyBuf = NULL;
  if (bufLen > 0) {
    assignUid = groupbyTbname(pScanInfo->pGroupTags);
    keyBuf = taosMemoryMalloc(bufLen);
    if (keyBuf == NULL) {
      taosArrayDestroy(qa);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;

  for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
    uint64_t*     uid = taosArrayGet(qa, i);
    STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};

    if (bufLen > 0) {
      if (assignUid) {
        keyInfo.groupId = keyInfo.uid;
      } else {
        code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
                                     &keyInfo.groupId);
        if (code != TSDB_CODE_SUCCESS) {
          taosMemoryFree(keyBuf);
          taosArrayDestroy(qa);
          return code;
        }
      }
    }

    tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
  }

  if (keyBuf != NULL) {
    taosMemoryFree(keyBuf);
  }

  taosArrayDestroy(qa);
  return code;
}
460

461
/*
 * Report the schema version, table version, db name and table name recorded in the task's schema info.
 * When no schema wrapper (schemaInfo.sw) is attached, nothing is written and success is returned.
 * A missing db or table name is reported as an empty string.
 *
 * NOTE(review): the names are copied with strcpy, so callers must supply buffers large enough for them;
 * buffer sizes are not known here.
 */
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
                                    int32_t* tversion) {
  ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
  SExecTaskInfo* pExecTask = (SExecTaskInfo*)tinfo;

  if (pExecTask->schemaInfo.sw == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  *sversion = pExecTask->schemaInfo.sw->version;
  *tversion = pExecTask->schemaInfo.tversion;

  if (!pExecTask->schemaInfo.dbname) {
    dbName[0] = 0;
  } else {
    strcpy(dbName, pExecTask->schemaInfo.dbname);
  }

  if (!pExecTask->schemaInfo.tablename) {
    tableName[0] = 0;
  } else {
    strcpy(tableName, pExecTask->schemaInfo.tablename);
  }

  return 0;
}
485 486

int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
D
dapan1121 已提交
487
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) {
488 489 490 491 492
  assert(pSubplan != NULL);
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;

  taosThreadOnce(&initPoolOnce, initRefPool);

493
  qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
494

495
  int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model);
496
  if (code != TSDB_CODE_SUCCESS) {
497
    qError("failed to createExecTaskInfo, code: %s", tstrerror(code));
498 499 500
    goto _error;
  }

501
  SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
502 503
  code = dsDataSinkMgtInit(&cfg);
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
504
    qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
505 506 507 508 509 510 511
    goto _error;
  }

  if (handle) {
    void* pSinkParam = NULL;
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle);
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
512
      qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
513 514 515
      goto _error;
    }

H
Haojun Liao 已提交
516
    code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
L
Liu Jicong 已提交
517
    if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
518 519
      taosMemoryFreeClear(pSinkParam);
    }
520 521
  }

522
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
523

dengyihao's avatar
dengyihao 已提交
524
_error:
525 526 527 528
  // if failed to add ref for all tables in this query, abort current query
  return code;
}

H
Haojun Liao 已提交
529
static void freeBlock(void* param) {
530
  SSDataBlock* pBlock = *(SSDataBlock**)param;
H
Haojun Liao 已提交
531 532 533
  blockDataDestroy(pBlock);
}

534
/*
 * Run the task and collect result blocks into pResList (cleared first).
 *
 * Blocks are pulled from the root operator until it returns NULL or at least maxRowsPerCall rows have been
 * gathered. Each returned block is a copy kept in pTaskInfo->pResultBlockList and reused across calls.
 * *hasMore is true when the loop stopped on the row threshold. *useconds receives the accumulated elapsed
 * time only when the root is exhausted. pLocal, when non-NULL, is copied into the task's localFetch.
 *
 * Ownership: the call claims pTaskInfo->owner with a CAS and returns TSDB_CODE_QRY_IN_EXEC if another
 * thread holds it. Ownership is released on every return path after the claim.
 * Errors raised via longjmp to pTaskInfo->env are recorded in pTaskInfo->code and returned.
 */
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal) {
  const int32_t maxRowsPerCall = 4096;  // stop pulling blocks once this many rows are collected

  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  if (pLocal) {
    memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
  }

  taosArrayClear(pResList);

  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();

    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);

    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int32_t      current = 0;
  SSDataBlock* pRes = NULL;

  int64_t st = taosGetTimestampUs();

  int32_t blockIndex = 0;
  while ((pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot)) != NULL) {
    // reuse a previously allocated copy when one exists at this index, otherwise allocate a new one
    SSDataBlock* p = NULL;
    if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
      SSDataBlock* p1 = createOneDataBlock(pRes, true);
      taosArrayPush(pTaskInfo->pResultBlockList, &p1);
      p = p1;
    } else {
      p = *(SSDataBlock**)taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
      copyDataBlock(p, pRes);
    }

    blockIndex += 1;

    current += p->info.rows;
    ASSERT(p->info.rows > 0);
    taosArrayPush(pResList, &p);

    if (current >= maxRowsPerCall) {
      break;
    }
  }

  *hasMore = (pRes != NULL);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
  qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRIu64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

H
Haojun Liao 已提交
621 622
/*
 * Destroy every cached result block in pTaskInfo->pResultBlockList and empty the list.
 * The list object itself is kept.
 */
void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SArray*        pBlocks = pTaskInfo->pResultBlockList;

  size_t count = taosArrayGetSize(pBlocks);
  for (size_t idx = 0; idx < count; ++idx) {
    SSDataBlock** ppBlock = taosArrayGet(pBlocks, idx);
    blockDataDestroy(*ppBlock);
  }

  taosArrayClear(pBlocks);
}

633 634 635 636 637 638 639 640 641 642 643 644 645
/*
 * Pull one result block from the task's root operator into *pRes (NULL when the task is exhausted).
 *
 * *useconds receives the accumulated elapsed time only when no block is returned.
 *
 * Ownership: the call claims pTaskInfo->owner with a CAS and returns TSDB_CODE_QRY_IN_EXEC if another
 * thread holds it. Ownership is released on every return path after the claim.
 * A killed task has its buffered stream blocks cleared and returns TSDB_CODE_SUCCESS with *pRes == NULL.
 * Errors raised via longjmp to pTaskInfo->env are recorded in pTaskInfo->code and returned.
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  if (isTaskKilled(pTaskInfo)) {
    clearStreamBlock(pTaskInfo->pRoot);
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();

  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  qDebug("%s task suspended, %d rows returned, total:%" PRIu64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

L
Liu Jicong 已提交
690
/*
 * Register an exchange operator's stop info (copied by value) so qStopTaskOperators can signal it later.
 * The list is guarded by stopInfo.lock. Always returns TSDB_CODE_SUCCESS.
 */
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  SArray* pStopList = pTaskInfo->stopInfo.pStopInfo;

  taosWLockLatch(&pTaskInfo->stopInfo.lock);
  taosArrayPush(pStopList, pInfo);
  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);

  return TSDB_CODE_SUCCESS;
}

/*
 * Three-way comparator for SExchangeOpStopInfo ordered by refId:
 * returns -1, 0 or 1 as lp's refId is less than, equal to, or greater than rp's.
 */
int32_t stopInfoComp(void const* lp, void const* rp) {
  const SExchangeOpStopInfo* pLeft = (const SExchangeOpStopInfo*)lp;
  const SExchangeOpStopInfo* pRight = (const SExchangeOpStopInfo*)rp;

  if (pLeft->refId == pRight->refId) {
    return 0;
  }
  return (pLeft->refId < pRight->refId) ? -1 : 1;
}

L
Liu Jicong 已提交
711
/*
 * Remove the stop-info entry whose refId matches pInfo->refId, if present. Guarded by stopInfo.lock.
 *
 * NOTE(review): taosArraySearchIdx is presumably a binary search, which assumes pStopInfo is ordered by
 * refId (entries are appended in registration order) — confirm.
 */
void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  SArray* pStopList = pTaskInfo->stopInfo.pStopInfo;

  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  int32_t pos = taosArraySearchIdx(pStopList, pInfo, stopInfoComp, TD_EQ);
  if (pos >= 0) {
    taosArrayRemove(pStopList, pos);
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}

/*
 * For every registered exchange operator, acquire it from the exchange ref pool and post its `ready`
 * semaphore. NOTE(review): presumably this unblocks a thread waiting on that semaphore; confirm.
 * Entries no longer present in the ref pool are skipped. Guarded by stopInfo.lock.
 */
void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  SArray* pStopList = pTaskInfo->stopInfo.pStopInfo;
  int32_t count = taosArrayGetSize(pStopList);
  for (int32_t idx = 0; idx < count; ++idx) {
    SExchangeOpStopInfo* pStop = taosArrayGet(pStopList, idx);
    SExchangeInfo*       pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId);
    if (pExchangeInfo == NULL) {
      continue;
    }

    tsem_post(&pExchangeInfo->ready);
    taosReleaseRef(exchangeObjRefPool, pStop->refId);
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}

D
dapan1121 已提交
738
/*
 * Mark the task killed with rspCode and signal its registered exchange operators, without waiting for
 * the executing thread. Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle.
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
  if (pTaskInfo != NULL) {
    qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));

    setTaskKilled(pTaskInfo, rspCode);
    qStopTaskOperators(pTaskInfo);
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_QRY_INVALID_QHANDLE;
}

752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768
/*
 * Mark the task killed with rspCode, then poll every 10 ms until no thread owns it.
 * qExecTask/qExecTaskOpt clear `owner` when they return. Afterwards pTaskInfo->code is set to rspCode.
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle.
 */
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
  setTaskKilled(pTaskInfo, rspCode);

  for (;;) {
    if (!qTaskIsExecuting(pTaskInfo)) {
      break;
    }
    taosMsleep(10);
  }

  pTaskInfo->code = rspCode;
  return TSDB_CODE_SUCCESS;
}

769 770 771 772 773 774 775 776 777
/*
 * True when some thread currently owns the task (pTaskInfo->owner is non-zero). False for a NULL handle.
 */
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
  return (pTaskInfo != NULL) && (atomic_load_64(&pTaskInfo->owner) != 0);
}

H
Haojun Liao 已提交
778 779
/*
 * Log the task's cost summary at debug level. The idle time is the gap between task creation and first
 * execution (both microsecond timestamps), printed in ms. File block load statistics are included when a
 * load recorder is attached.
 */
static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo* pCost = &pTaskInfo->cost;
  int64_t        idleTime = pCost->start - pCost->created;

  SFileBlockLoadRecorder* pRecorder = pCost->pRecoder;
  if (pRecorder == NULL) {
    qDebug("%s :cost summary: idle in queue:%.2f ms, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), idleTime / 1000.0,
           pCost->elapsedTime / 1000.0);
    return;
  }

  qDebug(
      "%s :cost summary: idle:%.2f ms, elapsed time:%.2f ms, extract tableList:%.2f ms, "
      "createGroupIdMap:%.2f ms, total blocks:%d, "
      "load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
      GET_TASKID(pTaskInfo), idleTime / 1000.0, pCost->elapsedTime / 1000.0, pCost->extractListTime,
      pCost->groupIdMapTime, pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks,
      pRecorder->totalRows, pRecorder->totalCheckedRows);
}

797 798 799 800 801 802 803 804
/*
 * Release a task handle.
 *
 * Logs completion (including the root operator's total row count when the task has a root
 * operator) and the cost summary, then frees the task via doDestroyTask().
 * A NULL handle is ignored.
 */
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
  if (pTaskInfo == NULL) {
    return;
  }

  // pRoot is not guaranteed to be set (other entry points in this file check it for NULL),
  // so only read the row count when a root operator exists.
  if (pTaskInfo->pRoot != NULL) {
    qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
  } else {
    qDebug("%s execTask completed", GET_TASKID(pTaskInfo));
  }

  printTaskExecCostInLog(pTaskInfo);  // print the query cost summary
  doDestroyTask(pTaskInfo);
}

H
Haojun Liao 已提交
809
/*
 * Collect EXPLAIN execution info for the task's operator tree, starting at its root operator,
 * into `pExecInfoList`. Returns whatever getOperatorExplainExecInfo() returns.
 */
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
  SOperatorInfo* pRootOp = ((SExecTaskInfo*)tinfo)->pRoot;
  return getOperatorExplainExecInfo(pRootOp, pExecInfoList);
}

/*
 * Serialize the task's operator state.
 *
 * Currently a stub: after validating the handle it returns success without writing
 * `*pOutput` or `*len`.
 *
 * Returns TSDB_CODE_INVALID_PARA when the handle is NULL or the task has no root operator.
 */
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  // Same handle validation as qDeserializeTaskStatus; previously a NULL handle was dereferenced.
  if (pTaskInfo == NULL || pTaskInfo->pRoot == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // TODO: operator state encoding (encodeOperator) is not implemented yet.
  return TSDB_CODE_SUCCESS;
}

/*
 * Restore the task's operator state from `pInput`.
 *
 * Currently a stub. It returns TSDB_CODE_INVALID_PARA when the handle or input is NULL or
 * `len` is 0, and 0 otherwise, without decoding anything.
 */
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
  bool badArgs = (tinfo == NULL) || (pInput == NULL) || (len == 0);
  if (badArgs) {
    return TSDB_CODE_INVALID_PARA;
  }

  // TODO: operator state decoding (decodeOperator) is not implemented yet.
  return 0;
}

/*
 * Find the stream-scan operator in the task's operator chain and hand back its `info`.
 *
 * The chain is walked from the root through `pDownstream[0]`; every non-scan operator must have
 * exactly one downstream.
 *
 * Returns 0 and sets `*scanner` on success. Returns -1 and sets `*scanner` to NULL when no
 * stream-scan operator is reachable. Previously the walk relied on an ASSERT only, so a leaf
 * without a scan operator would read past `pDownstream`.
 */
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  while (pOperator != NULL) {
    if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      *scanner = pOperator->info;
      return 0;
    }

    if (pOperator->numOfDownstream != 1) {
      qError("%s failed to find stream scan operator, operator type:%d has %d downstream", GET_TASKID(pTaskInfo),
             (int32_t)pOperator->operatorType, pOperator->numOfDownstream);
      *scanner = NULL;
      return -1;
    }

    pOperator = pOperator->pDownstream[0];
  }

  qError("%s failed to find stream scan operator, operator chain ends with NULL", GET_TASKID(pTaskInfo));
  *scanner = NULL;
  return -1;
}

// Disabled (compiled out by `#if 0`): would push `pItem` onto the task's stream input queue
// (streamInfo.inputQueue->queue) for a stream-mode task. Kept for reference only.
#if 0
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
  taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem);
  return 0;
}
#endif

865
/*
 * Enter recover step 1 for a stream-mode task.
 *
 * Sets the recover step to STREAM_RECOVER_STEP__PREPARE1 and records `ver` as the first
 * fill-history version. Always returns 0.
 */
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  pTask->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE1;
  pTask->streamInfo.fillHistoryVer1 = ver;
  return 0;
}

/*
 * Enter recover step 2 for a stream-mode task.
 *
 * Sets the recover step to STREAM_RECOVER_STEP__PREPARE2 and records `ver` as the second
 * fill-history version. Always returns 0.
 */
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  pTask->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE2;
  pTask->streamInfo.fillHistoryVer2 = ver;
  return 0;
}

/*
 * Leave recovery for a stream-mode task by resetting its recover step to
 * STREAM_RECOVER_STEP__NONE. Always returns 0.
 */
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  pTask->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
  return 0;
}

/*
 * Switch every stream window operator in the task's operator chain into recover mode.
 *
 * For interval, session and state window operators the current trigger, delete mark and
 * ignore-expired flag are saved into their `*Saved` fields. They are then overridden with
 * STREAM_TRIGGER_AT_ONCE, INT64_MAX and false respectively. qStreamRestoreParam() undoes this.
 *
 * The chain is followed through `pDownstream[0]`.
 * Returns 0 at the end of the chain, or -1 (after ASSERT) if an operator has more than one
 * downstream.
 */
int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  for (SOperatorInfo* pOperator = pTaskInfo->pRoot;; pOperator = pOperator->pDownstream[0]) {
    switch (pOperator->operatorType) {
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: {
        SStreamIntervalOperatorInfo* pIntervalInfo = pOperator->info;
        ASSERT(pIntervalInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
               pIntervalInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
        ASSERT(pIntervalInfo->twAggSup.calTriggerSaved == 0);
        ASSERT(pIntervalInfo->twAggSup.deleteMarkSaved == 0);

        qInfo("save stream param for interval: %d,  %" PRId64, pIntervalInfo->twAggSup.calTrigger,
              pIntervalInfo->twAggSup.deleteMark);

        // remember the user settings, then force immediate triggering with nothing expired
        pIntervalInfo->twAggSup.calTriggerSaved = pIntervalInfo->twAggSup.calTrigger;
        pIntervalInfo->twAggSup.deleteMarkSaved = pIntervalInfo->twAggSup.deleteMark;
        pIntervalInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
        pIntervalInfo->twAggSup.deleteMark = INT64_MAX;
        pIntervalInfo->ignoreExpiredDataSaved = pIntervalInfo->ignoreExpiredData;
        pIntervalInfo->ignoreExpiredData = false;
        break;
      }

      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: {
        SStreamSessionAggOperatorInfo* pSessionInfo = pOperator->info;
        ASSERT(pSessionInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
               pSessionInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
        ASSERT(pSessionInfo->twAggSup.calTriggerSaved == 0);
        ASSERT(pSessionInfo->twAggSup.deleteMarkSaved == 0);

        qInfo("save stream param for session: %d,  %" PRId64, pSessionInfo->twAggSup.calTrigger,
              pSessionInfo->twAggSup.deleteMark);

        pSessionInfo->twAggSup.calTriggerSaved = pSessionInfo->twAggSup.calTrigger;
        pSessionInfo->twAggSup.deleteMarkSaved = pSessionInfo->twAggSup.deleteMark;
        pSessionInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
        pSessionInfo->twAggSup.deleteMark = INT64_MAX;
        pSessionInfo->ignoreExpiredDataSaved = pSessionInfo->ignoreExpiredData;
        pSessionInfo->ignoreExpiredData = false;
        break;
      }

      case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
        SStreamStateAggOperatorInfo* pStateInfo = pOperator->info;
        ASSERT(pStateInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
               pStateInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
        ASSERT(pStateInfo->twAggSup.calTriggerSaved == 0);
        ASSERT(pStateInfo->twAggSup.deleteMarkSaved == 0);

        qInfo("save stream param for state: %d,  %" PRId64, pStateInfo->twAggSup.calTrigger,
              pStateInfo->twAggSup.deleteMark);

        pStateInfo->twAggSup.calTriggerSaved = pStateInfo->twAggSup.calTrigger;
        pStateInfo->twAggSup.deleteMarkSaved = pStateInfo->twAggSup.deleteMark;
        pStateInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
        pStateInfo->twAggSup.deleteMark = INT64_MAX;
        pStateInfo->ignoreExpiredDataSaved = pStateInfo->ignoreExpiredData;
        pStateInfo->ignoreExpiredData = false;
        break;
      }

      default:
        break;
    }

    // move to the single downstream operator, or stop at the end of the chain
    if (pOperator->numOfDownstream > 1) {
      qError("unexpected stream, multiple downstream");
      ASSERT(0);
      return -1;
    }
    if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
      return 0;
    }
  }
}

/*
 * Undo qStreamSetParamForRecover() for every stream window operator in the operator chain.
 *
 * For interval, session and state window operators, the trigger, delete mark and
 * ignore-expired flag are copied back from their `*Saved` fields.
 *
 * The chain is followed through `pDownstream[0]`.
 * Returns 0 at the end of the chain, or -1 if an operator has more than one downstream.
 */
int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  for (SOperatorInfo* pOperator = pTaskInfo->pRoot;; pOperator = pOperator->pDownstream[0]) {
    switch (pOperator->operatorType) {
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: {
        SStreamIntervalOperatorInfo* pIntervalInfo = pOperator->info;
        pIntervalInfo->twAggSup.calTrigger = pIntervalInfo->twAggSup.calTriggerSaved;
        pIntervalInfo->twAggSup.deleteMark = pIntervalInfo->twAggSup.deleteMarkSaved;
        pIntervalInfo->ignoreExpiredData = pIntervalInfo->ignoreExpiredDataSaved;
        qInfo("restore stream param for interval: %d,  %" PRId64, pIntervalInfo->twAggSup.calTrigger,
              pIntervalInfo->twAggSup.deleteMark);
        break;
      }

      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: {
        SStreamSessionAggOperatorInfo* pSessionInfo = pOperator->info;
        pSessionInfo->twAggSup.calTrigger = pSessionInfo->twAggSup.calTriggerSaved;
        pSessionInfo->twAggSup.deleteMark = pSessionInfo->twAggSup.deleteMarkSaved;
        pSessionInfo->ignoreExpiredData = pSessionInfo->ignoreExpiredDataSaved;
        qInfo("restore stream param for session: %d,  %" PRId64, pSessionInfo->twAggSup.calTrigger,
              pSessionInfo->twAggSup.deleteMark);
        break;
      }

      case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
        SStreamStateAggOperatorInfo* pStateInfo = pOperator->info;
        pStateInfo->twAggSup.calTrigger = pStateInfo->twAggSup.calTriggerSaved;
        pStateInfo->twAggSup.deleteMark = pStateInfo->twAggSup.deleteMarkSaved;
        pStateInfo->ignoreExpiredData = pStateInfo->ignoreExpiredDataSaved;
        qInfo("restore stream param for state: %d,  %" PRId64, pStateInfo->twAggSup.calTrigger,
              pStateInfo->twAggSup.deleteMark);
        break;
      }

      default:
        break;
    }

    // move to the single downstream operator, or stop at the end of the chain
    if (pOperator->numOfDownstream > 1) {
      qError("unexpected stream, multiple downstream");
      return -1;
    }
    if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
      return 0;
    }
  }
}
L
Liu Jicong 已提交
1003 1004 1005 1006
/* Report the task's streamInfo.recoverScanFinished flag as a bool. */
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
  const SExecTaskInfo* pTask = (const SExecTaskInfo*)tinfo;
  bool                 finished = pTask->streamInfo.recoverScanFinished;
  return finished;
}
1007 1008 1009 1010 1011 1012

void* qExtractReaderFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return (void*)pInfo->tqReader;
}

wmmhello's avatar
wmmhello 已提交
1013 1014 1015 1016 1017 1018 1019 1020
/* Return the schema currently stored in the task's stream info (read-only view). */
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.schema;
}

/* Return the table name currently stored in the task's stream info. */
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.tbName;
}

wmmhello's avatar
wmmhello 已提交
1023
/* Return a pointer to the meta response embedded in the task's stream info (not a copy). */
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
  return &((SExecTaskInfo*)tinfo)->streamInfo.metaRsp;
}

wmmhello's avatar
wmmhello 已提交
1028 1029 1030 1031 1032
/* Return the uid field of the offset last passed to qStreamPrepareScan (streamInfo.prepareStatus). */
int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo) {
  const SExecTaskInfo* pTask = (const SExecTaskInfo*)tinfo;
  return pTask->streamInfo.prepareStatus.uid;
}

1033 1034 1035 1036 1037 1038
/* Copy the task's last consumed offset (streamInfo.lastStatus) into *pOffset. Always returns 0. */
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
  const SExecTaskInfo* pTask = (const SExecTaskInfo*)tinfo;
  memcpy(pOffset, &pTask->streamInfo.lastStatus, sizeof(*pOffset));
  return 0;
}

H
Haojun Liao 已提交
1039
/*
 * Build a read condition for a TMQ snapshot scan of the single table described by `pMtInfo`.
 *
 * The condition is initialised as follows:
 *   - ascending order;
 *   - every column of the table's schema, with slot i mapped to column i;
 *   - time window TSWINDOW_INITIALIZER and type TIMEWINDOW_RANGE_CONTAINED;
 *   - start version -1 and end version `sContext->snapVersion`.
 *
 * On allocation failure both arrays are released, terrno is set to TSDB_CODE_OUT_OF_MEMORY and
 * that value is returned. Otherwise returns TSDB_CODE_SUCCESS.
 */
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
  memset(pCond, 0, sizeof(SQueryTableDataCond));
  pCond->order = TSDB_ORDER_ASC;
  pCond->numOfCols = pMtInfo->schema->nCols;

  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);

  bool allocated = (pCond->colList != NULL) && (pCond->pSlotList != NULL);
  if (!allocated) {
    taosMemoryFreeClear(pCond->colList);
    taosMemoryFreeClear(pCond->pSlotList);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  pCond->twindows = TSWINDOW_INITIALIZER;
  pCond->suid = pMtInfo->suid;
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
  pCond->startVersion = -1;
  pCond->endVersion = sContext->snapVersion;

  for (int32_t slot = 0; slot < pCond->numOfCols; ++slot) {
    SColumnInfo* pCol = pCond->colList + slot;
    pCol->colId = pMtInfo->schema->pSchema[slot].colId;
    pCol->type = pMtInfo->schema->pSchema[slot].type;
    pCol->bytes = pMtInfo->schema->pSchema[slot].bytes;

    pCond->pSlotList[slot] = slot;  // identity slot mapping
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1070
/*
 * Hand a submit block to a queue-mode task for its next scan.
 *
 * Accepted only when the task runs in OPTR_EXEC_MODEL_QUEUE and no submit block is pending
 * (submit.msgStr is NULL). Returns 0 on success and -1 otherwise.
 */
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pTaskInfo->streamInfo.submit.msgStr == NULL) {
    qDebug("set the submit block for future scan");
    pTaskInfo->streamInfo.submit = submit;
    return 0;
  }

  qError("qStreamSetScanMemData err:%d,%p", pTaskInfo->execModel, pTaskInfo->streamInfo.submit.msgStr);
  return -1;
}

1082
/*
 * Position the task's scan operator at `pOffset` before the next TMQ poll.
 *
 * The offset is recorded in streamInfo.prepareStatus and `returned` is reset. Nothing else is
 * done if the offset equals the last consumed one (streamInfo.lastStatus).
 *
 * Column subscriptions (subType == TOPIC_SUB_TYPE__COLUMN):
 *   - LOG offset: close the tsdb reader and seek the tq reader to version + 1.
 *   - SNAPSHOT_DATA offset: locate the table in the task's table list, (re)open the tsdb reader
 *     if needed, and reset it to start after offset ts.
 * Raw subscriptions:
 *   - SNAPSHOT_DATA offset: set up the snapshot context and open a reader for the current table.
 *   - SNAPSHOT_META offset: set up the snapshot context.
 *   - LOG offset: close the raw reader.
 *
 * Returns 0 on success and -1 on failure.
 */
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
  pTaskInfo->streamInfo.prepareStatus = *pOffset;
  pTaskInfo->streamInfo.returned = 0;

  if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
    return 0;
  }

  if (subType == TOPIC_SUB_TYPE__COLUMN) {
    pOperator->status = OP_OPENED;

    // TODO add more check
    if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      if (pOperator->numOfDownstream != 1) {
        qError("pOperator->numOfDownstream != 1:%d", pOperator->numOfDownstream);
        return -1;
      }
      pOperator = pOperator->pDownstream[0];
    }

    SStreamScanInfo* pInfo = pOperator->info;
    if (pOffset->type == TMQ_OFFSET__LOG) {
      STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
      tsdbReaderClose(pTSInfo->base.dataReader);
      pTSInfo->base.dataReader = NULL;

      // let's seek to the next version in wal file
      if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) {
        qError("tqSeekVer failed ver:%" PRId64, pOffset->version + 1);
        return -1;
      }
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      // iterate all tables from tableInfoList, and retrieve rows from each table one-by-one
      // those data are from the snapshot in tsdb, besides the data in the wal file.
      int64_t uid = pOffset->uid;
      int64_t ts = pOffset->ts;

      if (uid == 0) {
        if (tableListGetSize(pTaskInfo->pTableInfoList) != 0) {
          STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
          uid = pTableInfo->uid;
          ts = INT64_MIN;
        } else {
          qError("uid == 0 and tablelist size is 0");
          return -1;
        }
      }

      STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
      int32_t         numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);

      // fixed format: the old "% "PRId64 printed the row count with a stray space flag and no separator
      qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 ", %" PRId64 " rows returned", uid, ts,
             pInfo->pTableScanOp->resultInfo.totalRows);
      pInfo->pTableScanOp->resultInfo.totalRows = 0;

      bool found = false;
      for (int32_t i = 0; i < numOfTables; i++) {
        STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i);
        if (pTableInfo->uid == uid) {
          found = true;
          pTableScanInfo->currentTable = i;
          break;
        }
      }

      // TODO after dropping table, table may not found
      if (!found) {
        qError("uid not found in tablelist %" PRId64, uid);
        return -1;
      }

      if (pTableScanInfo->base.dataReader == NULL) {
        STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
        int32_t        num = tableListGetSize(pTaskInfo->pTableInfoList);

        if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num,
                           pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL) < 0 ||
            pTableScanInfo->base.dataReader == NULL) {
          qError("tsdbReaderOpen failed. uid:%" PRIi64, pOffset->uid);
          return -1;
        }
      }

      // restrict the reader to the located table and start strictly after the offset timestamp
      STableKeyInfo tki = {.uid = uid};
      tsdbSetTableList(pTableScanInfo->base.dataReader, &tki, 1);
      int64_t oldSkey = pTableScanInfo->base.cond.twindows.skey;
      pTableScanInfo->base.cond.twindows.skey = ts + 1;
      tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
      pTableScanInfo->base.cond.twindows.skey = oldSkey;
      pTableScanInfo->scanTimes = 0;

      qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d",
             uid, ts, pTableScanInfo->currentTable, numOfTables);
    } else {
      qError("invalid pOffset->type:%d", pOffset->type);
      return -1;
    }
  } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SStreamRawScanInfo* pInfo = pOperator->info;
    SSnapContext*       sContext = pInfo->sContext;
    if (setForSnapShot(sContext, pOffset->uid) != 0) {
      qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid);
      return -1;
    }

    SMetaTableInfo mtInfo = getUidfromSnapShot(sContext);
    tsdbReaderClose(pInfo->dataReader);
    pInfo->dataReader = NULL;

    cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
    // the list may not exist yet (it is created lazily below), so only clear an existing one
    if (pTaskInfo->pTableInfoList != NULL) {
      tableListClear(pTaskInfo->pTableInfoList);
    }

    if (mtInfo.uid == 0) {
      return 0;  // no data
    }

    // on failure the cond's column list would be NULL, so the reader below must not be opened
    if (initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo) != TSDB_CODE_SUCCESS) {
      qError("tmqsnap failed to init query cond, uid:%" PRId64, mtInfo.uid);
      tDeleteSSchemaWrapper(mtInfo.schema);  // ownership would otherwise pass to streamInfo.schema
      return -1;
    }
    pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;

    if (pTaskInfo->pTableInfoList == NULL) {
      pTaskInfo->pTableInfoList = tableListCreate();
    }

    tableListAddTableInfo(pTaskInfo->pTableInfoList, mtInfo.uid, 0);

    STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
    int32_t        size = tableListGetSize(pTaskInfo->pTableInfoList);

    int32_t code =
        tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL);
    cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
    if (code < 0) {
      // previously ignored, which silently dropped this table's snapshot data
      qError("tmqsnap tsdbReaderOpen failed. uid:%" PRId64, mtInfo.uid);
      tDeleteSSchemaWrapper(mtInfo.schema);
      return -1;
    }

    strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
    tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
    pTaskInfo->streamInfo.schema = mtInfo.schema;

    qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts);
  } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
    SStreamRawScanInfo* pInfo = pOperator->info;
    SSnapContext*       sContext = pInfo->sContext;
    if (setForSnapShot(sContext, pOffset->uid) != 0) {
      qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
      return -1;
    }
    qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts);
  } else if (pOffset->type == TMQ_OFFSET__LOG) {
    SStreamRawScanInfo* pInfo = pOperator->info;
    tsdbReaderClose(pInfo->dataReader);
    pInfo->dataReader = NULL;
    qDebug("tmqsnap qStreamPrepareScan snapshot log");
  }
  return 0;
}
H
Haojun Liao 已提交
1236 1237 1238

/*
 * Dispatch an RPC response to the callback stored in the message's `ahandle` (an SMsgSendInfo).
 *
 * The response body is copied into a freshly allocated buffer for the callback. The original
 * body is then released with rpcFreeCont() and the send info is destroyed.
 *
 * If the copy buffer cannot be allocated:
 *   - the callback is invoked with TSDB_CODE_OUT_OF_MEMORY;
 *   - it receives an empty buffer (len 0, NULL data);
 *   - terrno is also set to TSDB_CODE_OUT_OF_MEMORY.
 */
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  if (pSendInfo == NULL) {
    qError("pMsg->info.ahandle is NULL");
    rpcFreeCont(pMsg->pCont);  // this function owns the body; don't leak it on the early exit
    return;
  }

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
      buf.len = 0;  // keep the length consistent with the missing payload
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
L
Liu Jicong 已提交
1260