executorMain.c 11.6 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "dataSinkMgt.h"
L
Liu Jicong 已提交
17
#include "os.h"
H
Haojun Liao 已提交
18
#include "tmsg.h"
L
Liu Jicong 已提交
19
#include "tref.h"
S
slzhou 已提交
20
#include "tudf.h"
H
Haojun Liao 已提交
21 22

#include "executor.h"
S
slzhou 已提交
23 24
#include "executorimpl.h"
#include "query.h"
25 26

static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
L
Liu Jicong 已提交
27
int32_t             exchangeObjRefPool = -1;
28

L
Liu Jicong 已提交
29
static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); }
30 31 32 33
static void cleanupRefPool() {
  int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0);
  taosCloseRef(ref);
}
H
Haojun Liao 已提交
34

35
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
36
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) {
L
Liu Jicong 已提交
37
  assert(pSubplan != NULL);
H
Haojun Liao 已提交
38
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
H
Haojun Liao 已提交
39

40
  taosThreadOnce(&initPoolOnce, initRefPool);
41
  atexit(cleanupRefPool);
42
  int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
H
Haojun Liao 已提交
43 44 45
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
H
Haojun Liao 已提交
46

D
dapan1121 已提交
47
  SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
48
  code = dsDataSinkMgtInit(&cfg);
H
Haojun Liao 已提交
49
  if (code != TSDB_CODE_SUCCESS) {
50
    goto _error;
H
Haojun Liao 已提交
51
  }
L
Liu Jicong 已提交
52

L
Liu Jicong 已提交
53
  if (handle) {
D
dapan1121 已提交
54
    void* pSinkParam = NULL;
D
dapan1121 已提交
55
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle);
D
dapan1121 已提交
56 57 58
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
L
Liu Jicong 已提交
59

D
dapan1121 已提交
60
    code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
L
Liu Jicong 已提交
61
  }
62

L
Liu Jicong 已提交
63
_error:
H
Haojun Liao 已提交
64 65 66 67 68 69
  // if failed to add ref for all tables in this query, abort current query
  return code;
}

#ifdef TEST_IMPL
// wait moment
L
Liu Jicong 已提交
70 71 72
int waitMoment(SQInfo* pQInfo) {
  if (pQInfo->sql) {
    int   ms = 0;
H
Haojun Liao 已提交
73
    char* pcnt = strstr(pQInfo->sql, " count(*)");
L
Liu Jicong 已提交
74 75
    if (pcnt) return 0;

H
Haojun Liao 已提交
76
    char* pos = strstr(pQInfo->sql, " t_");
L
Liu Jicong 已提交
77
    if (pos) {
H
Haojun Liao 已提交
78 79
      pos += 3;
      ms = atoi(pos);
L
Liu Jicong 已提交
80 81
      while (*pos >= '0' && *pos <= '9') {
        pos++;
H
Haojun Liao 已提交
82 83
      }
      char unit_char = *pos;
L
Liu Jicong 已提交
84 85 86 87 88
      if (unit_char == 'h') {
        ms *= 3600 * 1000;
      } else if (unit_char == 'm') {
        ms *= 60 * 1000;
      } else if (unit_char == 's') {
H
Haojun Liao 已提交
89 90 91
        ms *= 1000;
      }
    }
L
Liu Jicong 已提交
92
    if (ms == 0) return 0;
H
Haojun Liao 已提交
93
    printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);
L
Liu Jicong 已提交
94 95

    if (ms < 1000) {
H
Haojun Liao 已提交
96 97 98
      taosMsleep(ms);
    } else {
      int used_ms = 0;
L
Liu Jicong 已提交
99
      while (used_ms < ms) {
H
Haojun Liao 已提交
100 101
        taosMsleep(1000);
        used_ms += 1000;
L
Liu Jicong 已提交
102
        if (isTaskKilled(pQInfo)) {
H
Haojun Liao 已提交
103 104 105 106 107 108 109 110 111 112
          printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
          break;
        }
      }
    }
  }
  return 1;
}
#endif

L
Liu Jicong 已提交
113
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
H
Haojun Liao 已提交
114 115
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();
H
Haojun Liao 已提交
116

D
dapan1121 已提交
117
  *pRes = NULL;
H
Haojun Liao 已提交
118
  int64_t curOwner = 0;
119
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
L
Liu Jicong 已提交
120
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
121
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
D
dapan1121 已提交
122
    return pTaskInfo->code;
H
Haojun Liao 已提交
123 124
  }

H
Haojun Liao 已提交
125
  if (pTaskInfo->cost.start == 0) {
126
    pTaskInfo->cost.start = taosGetTimestampMs();
H
Haojun Liao 已提交
127 128
  }

129
  if (isTaskKilled(pTaskInfo)) {
H
Haojun Liao 已提交
130
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
D
dapan1121 已提交
131
    return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
132 133 134
  }

  // error occurs, record the error code and return to client
135
  int32_t ret = setjmp(pTaskInfo->env);
H
Haojun Liao 已提交
136
  if (ret != TSDB_CODE_SUCCESS) {
137
    pTaskInfo->code = ret;
138
    cleanUpUdfs();
139
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
D
dapan1121 已提交
140
    return pTaskInfo->code;
H
Haojun Liao 已提交
141 142
  }

H
Haojun Liao 已提交
143
  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
144

145
  int64_t st = taosGetTimestampUs();
D
dapan1121 已提交
146 147

  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
H
Haojun Liao 已提交
148
  uint64_t el = (taosGetTimestampUs() - st);
149

H
Haojun Liao 已提交
150
  pTaskInfo->cost.elapsedTime += el;
D
dapan1121 已提交
151 152 153 154
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

155 156
  cleanUpUdfs();

L
Liu Jicong 已提交
157
  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
158
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
H
Haojun Liao 已提交
159

H
Haojun Liao 已提交
160
  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
L
Liu Jicong 已提交
161
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);
H
Haojun Liao 已提交
162

163
  atomic_store_64(&pTaskInfo->owner, 0);
D
dapan1121 已提交
164
  return pTaskInfo->code;
H
Haojun Liao 已提交
165 166 167
}

int32_t qKillTask(qTaskInfo_t qinfo) {
L
Liu Jicong 已提交
168
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
H
Haojun Liao 已提交
169

H
Haojun Liao 已提交
170
  if (pTaskInfo == NULL) {
H
Haojun Liao 已提交
171 172 173
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

H
Haojun Liao 已提交
174
  qDebug("%s execTask killed", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
175
  setTaskKilled(pTaskInfo);
H
Haojun Liao 已提交
176 177 178

  // Wait for the query executing thread being stopped/
  // Once the query is stopped, the owner of qHandle will be cleared immediately.
H
Haojun Liao 已提交
179
  while (pTaskInfo->owner != 0) {
H
Haojun Liao 已提交
180 181 182 183 184 185
    taosMsleep(100);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
186
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
L
Liu Jicong 已提交
187
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
D
dapan1121 已提交
188

H
Haojun Liao 已提交
189
  if (pTaskInfo == NULL) {
D
dapan1121 已提交
190 191 192
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

H
Haojun Liao 已提交
193
  qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
194
  setTaskKilled(pTaskInfo);
D
dapan1121 已提交
195 196 197 198

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
199
void qDestroyTask(qTaskInfo_t qTaskHandle) {
L
Liu Jicong 已提交
200 201
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
  qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
H
Haojun Liao 已提交
202

L
Liu Jicong 已提交
203
  queryCostStatis(pTaskInfo);  // print the query cost summary
H
Haojun Liao 已提交
204
  doDestroyTask(pTaskInfo);
H
Haojun Liao 已提交
205
}
D
dapan1121 已提交
206

L
Liu Jicong 已提交
207 208 209
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int32_t        capacity = 0;
D
dapan1121 已提交
210

L
Liu Jicong 已提交
211
  return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum);
D
dapan1121 已提交
212 213
}

214 215
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
  SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
H
Haojun Liao 已提交
216 217 218 219
  if (pTaskInfo->pRoot == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

C
Cary Xu 已提交
220 221 222 223 224 225 226
  int32_t nOptrWithVal = 0;
  int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
  if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal = 0)) {
    taosMemoryFreeClear(*pOutput);
    *len = 0;
  }
  return code;
H
Haojun Liao 已提交
227 228
}

229
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
L
Liu Jicong 已提交
230
  SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
231

H
Haojun Liao 已提交
232 233 234 235 236 237 238
  if (pTaskInfo == NULL || pInput == NULL || len == 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  return decodeOperator(pTaskInfo->pRoot, pInput, len);
}

L
Liu Jicong 已提交
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  while (1) {
    uint8_t type = pOperator->operatorType;
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      *scanner = pOperator->info;
      return 0;
    } else {
      ASSERT(pOperator->numOfDownstream == 1);
      pOperator = pOperator->pDownstream[0];
    }
  }
}

void* qExtractReaderFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return (void*)pInfo->tqReader;
}

const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return pInfo->tqReader->pSchemaWrapper;
}

const STqOffset* qExtractStatusFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return &pInfo->offset;
}

L
Liu Jicong 已提交
270 271 272 273 274 275 276 277 278 279 280 281 282
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
  return pTaskInfo->streamInfo.metaBlk;
}

int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
  memcpy(pOffset, &pTaskInfo->streamInfo.lastStatus, sizeof(STqOffsetVal));
  return 0;
}

L
Liu Jicong 已提交
283
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
L
Liu Jicong 已提交
284 285 286 287 288 289 290 291 292 293 294 295
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
  pTaskInfo->streamInfo.prepareStatus = *pOffset;
  // TODO: optimize
  /*if (pTaskInfo->streamInfo.lastStatus.type != pOffset->type ||*/
  /*pTaskInfo->streamInfo.prepareStatus.version != pTaskInfo->streamInfo.lastStatus.version) {*/
  while (1) {
    uint8_t type = pOperator->operatorType;
    pOperator->status = OP_OPENED;
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      SStreamScanInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
      if (pOffset->type == TMQ_OFFSET__LOG) {
        if (tqSeekVer(pInfo->tqReader, pOffset->version) < 0) {
          return -1;
        }
        ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version);
      } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
        pInfo->blockType = STREAM_INPUT__TABLE_SCAN;
        int64_t uid = pOffset->uid;
        int64_t ts = pOffset->ts;

        if (uid == 0) {
          if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
            STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
            uid = pTableInfo->uid;
            ts = INT64_MIN;
          }
        }
        if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||
            pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          int32_t         tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
          bool            found = false;
          for (int32_t i = 0; i < tableSz; i++) {
            STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
            if (pTableInfo->uid == uid) {
              found = true;
              pTableScanInfo->currentTable = i;
            }
          }

          // TODO after dropping table, table may be not found
          ASSERT(found);

          tsdbSetTableId(pTableScanInfo->dataReader, uid);
H
Haojun Liao 已提交
330 331 332 333
          int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
          pTableScanInfo->cond.twindows.skey = ts + 1;
          tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
          pTableScanInfo->cond.twindows.skey = oldSkey;
L
Liu Jicong 已提交
334 335 336 337 338 339 340 341
          pTableScanInfo->scanTimes = 0;

          qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
                 pTableScanInfo->currentTable, tableSz);
        }

      } else {
        ASSERT(0);
L
Liu Jicong 已提交
342
      }
L
Liu Jicong 已提交
343 344 345 346 347 348 349 350 351 352
      return 0;
    } else {
      ASSERT(pOperator->numOfDownstream == 1);
      pOperator = pOperator->pDownstream[0];
    }
  }
  /*}*/
  return 0;
}

L
Liu Jicong 已提交
353
#if 0
L
Liu Jicong 已提交
354
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
L
Liu Jicong 已提交
355
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
D
dapan1121 已提交
356

357
  if (uid == 0) {
L
Liu Jicong 已提交
358 359 360 361 362
    if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
      STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
      uid = pTableInfo->uid;
      ts = INT64_MIN;
    }
363 364
  }

L
Liu Jicong 已提交
365
  return doPrepareScan(pTaskInfo->pRoot, uid, ts);
366 367
}

L
Liu Jicong 已提交
368 369
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
370

L
Liu Jicong 已提交
371 372
  return doGetScanStatus(pTaskInfo->pRoot, uid, ts);
}
L
Liu Jicong 已提交
373
#endif