querytask.h 4.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef TDENGINE_QUERYTASK_H
#define TDENGINE_QUERYTASK_H

#ifdef __cplusplus
extern "C" {
#endif

#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)

25 26 27 28 29 30 31 32 33 34 35
enum {
  // when this task starts to execute, this status will set
      TASK_NOT_COMPLETED = 0x1u,

  /* Task is over
   * 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
   * 2. when all data within queried time window, it is also denoted as query_completed
   */
      TASK_COMPLETED = 0x2u,
};

36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
typedef struct STaskIdInfo {
  uint64_t queryId;  // this is also a request id
  uint64_t subplanId;
  uint64_t templateId;
  char*    str;
  int32_t  vgId;
} STaskIdInfo;

typedef struct STaskCostInfo {
  int64_t                 created;
  int64_t                 start;
  uint64_t                elapsedTime;
  double                  extractListTime;
  double                  groupIdMapTime;
  SFileBlockLoadRecorder* pRecoder;
} STaskCostInfo;

typedef struct STaskStopInfo {
  SRWLatch lock;
  SArray*  pStopInfo;
} STaskStopInfo;

58
typedef struct {
59 60 61 62 63 64
  STqOffsetVal         currentOffset;  // for tmq
  SMqMetaRsp           metaRsp;        // for tmq fetching meta
  int64_t              snapshotVer;
  SSchemaWrapper*      schema;
  char                 tbName[TSDB_TABLE_NAME_LEN];  // this is the current scan table: todo refactor
  int8_t               recoverStep;
L
liuyao 已提交
65 66
  bool                 recoverStep1Finished;
  bool                 recoverStep2Finished;
67 68
  int8_t               recoverScanFinished;
  SQueryTableDataCond  tableCond;
69 70
  SVersionRange        fillHistoryVer;
  STimeWindow          fillHistoryWindow;
71 72 73
  SStreamState*        pState;
  int64_t              dataVersion;
  int64_t              checkPointId;
74 75
} SStreamTaskInfo;

76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
struct SExecTaskInfo {
  STaskIdInfo           id;
  uint32_t              status;
  STimeWindow           window;
  STaskCostInfo         cost;
  int64_t               owner;  // if it is in execution
  int32_t               code;
  int32_t               qbufQuota;  // total available buffer (in KB) during execution query
  int64_t               version;    // used for stream to record wal version, why not move to sschemainfo
  SStreamTaskInfo       streamInfo;
  SSchemaInfo           schemaInfo;
  const char*           sql;        // query sql string
  jmp_buf               env;        // jump to this position when error happens.
  EOPTR_EXEC_MODEL      execModel;  // operator execution model [batch model|stream model]
  SSubplan*             pSubplan;
  struct SOperatorInfo* pRoot;
  SLocalFetch           localFetch;
  SArray*               pResultBlockList;  // result block list
  STaskStopInfo         stopInfo;
  SRWLatch              lock;  // secure the access of STableListInfo
96
  SStorageAPI           storageAPI;
97 98 99
};

void           buildTaskId(uint64_t taskId, uint64_t queryId, char* dst);
100
SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI);
101 102 103 104 105 106 107
void           doDestroyTask(SExecTaskInfo* pTaskInfo);
bool           isTaskKilled(SExecTaskInfo* pTaskInfo);
void           setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
void           setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t        createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
                                  int32_t vgId, char* sql, EOPTR_EXEC_MODEL model);
int32_t        qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
108
SArray*        getTableListInfo(const SExecTaskInfo* pTaskInfo);
109 110 111 112 113 114

#ifdef __cplusplus
}
#endif

#endif  // TDENGINE_QUERYTASK_H