executor.h 6.5 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_EXECUTOR_H_
#define _TD_EXECUTOR_H_

#ifdef __cplusplus
extern "C" {
#endif

D
dapan1121 已提交
23
#include "query.h"
L
Liu Jicong 已提交
24
#include "tcommon.h"
25
#include "tmsgcb.h"
26
#include "storageapi.h"
D
dapan1121 已提交
27

H
Haojun Liao 已提交
28
// Opaque handle to an exec task. Returned by qCreateStreamExecTaskInfo() /
// qCreateQueueExecTaskInfo() and passed as the first argument to most q* functions in this header.
typedef void* qTaskInfo_t;
H
Haojun Liao 已提交
29
// Opaque data sink handle; qCreateExecTask() takes a DataSinkHandle* as its `handle` argument.
typedef void* DataSinkHandle;
30

S
Shengliang 已提交
31
struct SRpcMsg;
H
Haojun Liao 已提交
32 33
struct SSubplan;

L
Liu Jicong 已提交
34
// Callback type stored in SLocalFetch.fp.
// NOTE(review): the parameters are unnamed here; their meaning has to be taken from the
// implementation that installs the callback — consider naming them in this typedef.
typedef int32_t (*localFetchFp)(void*, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*);
D
dapan1121 已提交
35 36

// Local-fetch context; qExecTaskOpt() receives a pointer to one as its `pLocal` argument.
// `fp` is a localFetchFp callback; the remaining fields are opaque at this level.
typedef struct {
L
Liu Jicong 已提交
37
  void*        handle;
D
dapan1121 已提交
38
  bool         localExec;
D
dapan1121 已提交
39
  localFetchFp fp;
L
Liu Jicong 已提交
40
  SArray*      explainRes;
D
dapan1121 已提交
41 42
} SLocalFetch;

43
// Read handle bundle passed to the task-creation functions in this header
// (qCreateStreamExecTaskInfo, qCreateQueueExecTaskInfo, qCreateExecTask).
// Most members are untyped pointers; their concrete types are not visible from this header.
typedef struct {
L
Liu Jicong 已提交
44
  void*   tqReader;
45 46 47 48
  void*   config;
  void*   vnode;
  void*   mnd;
  SMsgCb* pMsgCb;
49
  int64_t version;
50 51
  bool    initMetaReader;
  bool    initTableReader;
L
Liu Jicong 已提交
52
  bool    initTqReader;
5
54liuyao 已提交
53
  int32_t numOfVgroups;
54
  void*   sContext;  // SSnapContext*
55

56
  void*   pStateBackend;
57
  struct SStorageAPI api;
L
liuyao 已提交
58

L
liuyao 已提交
59 60
  int8_t        fillHistory;
  STimeWindow   winRange;
H
Haojun Liao 已提交
61
} SReadHandle;
62

L
Liu Jicong 已提交
63
// in queue mode, data streams are separated by msg
64
// Execution model selector; qCreateExecTask() takes one as its `model` argument.
// NOTE(review): the values look like bit flags, but OPTR_EXEC_MODEL_QUEUE (0x3) equals
// BATCH | STREAM, so they presumably must be compared with ==, not tested with & — confirm
// against the callers.
typedef enum {
65
  OPTR_EXEC_MODEL_BATCH = 0x1,
66
  OPTR_EXEC_MODEL_STREAM = 0x2,
L
Liu Jicong 已提交
67
  OPTR_EXEC_MODEL_QUEUE = 0x3,
68 69
} EOPTR_EXEC_MODEL;

L
Liu Jicong 已提交
70
/**
 * Create the exec task for stream mode
 * @param msg      opaque message describing the task
 * @param readers  read handle (SReadHandle) for the task
 * @param vgId
 * @return         the created task handle
 */
77
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId);
78

L
Liu Jicong 已提交
79
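/**
 * Create the exec task for queue mode
 * @param msg            opaque message describing the task
 * @param pReaderHandle  read handle (SReadHandle) for the task
 * @param vgId
 * @param numOfCols      pointer to a column count (presumably an output; confirm)
 * @param id
 * @return               the created task handle
 */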
X
Xiaoyu Wang 已提交
85 86
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
                                     uint64_t id);
L
Liu Jicong 已提交
87

wmmhello's avatar
wmmhello 已提交
88
int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList, void* pTaskInfo);
89

H
Haojun Liao 已提交
90 91 92 93 94 95 96
/**
 * Set the task id, usually used by message queue process
 * @param tinfo    task handle
 * @param taskId   task id to set
 * @param queryId  query id to set
 */
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
L
Liu Jicong 已提交
97

wmmhello's avatar
wmmhello 已提交
98
//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code);
99

L
Liu Jicong 已提交
100
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
101

102
// todo refactor
L
liuyao 已提交
103
void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId);
104

H
Haojun Liao 已提交
105 106 107 108 109 110 111 112
/**
 * Set multiple input data blocks for the stream scan.
 * @param tinfo
 * @param pBlocks
 * @param numOfBlocks
 * @param type
 * @return
 */
L
Liu Jicong 已提交
113
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type);
H
Haojun Liao 已提交
114

L
Liu Jicong 已提交
115 116 117 118 119 120 121 122 123 124
/**
 * Set block for sma
 * @param tinfo        task handle
 * @param pBlocks      input blocks (opaque pointer)
 * @param numOfBlocks  block count for pBlocks
 * @param type         input type code
 * @return
 */
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type);

125 126 127 128 129 130 131 132
/**
 * Update the table id list, add or remove.
 *
 * @param tinfo
 * @param tableIdList
 * @param isAdd
 * @return
 */
133
int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd);
134

L
Liu Jicong 已提交
135 136 137 138 139 140 141 142 143 144
/**
 * Create the exec task object from the given sub-plan
 * @param readHandle
 * @param vgId
 * @param taskId
 * @param pPlan
 * @param pTaskInfo
 * @param handle
 * @param sql
 * @param model
 * @return
 */
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan,
D
dapan1121 已提交
145
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model);
H
Haojun Liao 已提交
146

147 148 149 150 151 152 153
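/**
 * Get the schema versions of the table queried by the task
 * (description inferred from the signature; confirm against the implementation)
 * @param tinfo
 * @param dbName
 * @param tableName
 * @param sversion
 * @param tversion
 * @return
 */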
154
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
L
Liu Jicong 已提交
155
                                    int32_t* tversion);
156

H
Haojun Liao 已提交
157
/**
 * The main task execution function, including query on both table and multiple tables,
 * which are decided according to the tag or table name query conditions
 *
 * @param tinfo
 * @param pResList
 * @param useconds
 * @param hasMore
 * @param pLocal
 * @return
 */
165
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal);
H
Haojun Liao 已提交
166

167
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds);
H
Haojun Liao 已提交
168

H
Haojun Liao 已提交
169 170
void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo);

D
dapan1121 已提交
171 172
/**
 * kill the ongoing query asynchronously
 * @param tinfo  qhandle
 * @param rspCode
 * @return
 */
D
dapan1121 已提交
176
int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode);
177

178
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode);
D
dapan1121 已提交
179

180 181
bool qTaskIsExecuting(qTaskInfo_t qinfo);

H
Haojun Liao 已提交
182 183 184 185
/**
 * destroy query info structure
 * @param tinfo
 */
H
Haojun Liao 已提交
186
void qDestroyTask(qTaskInfo_t tinfo);
H
Haojun Liao 已提交
187

D
dapan1121 已提交
188
// NOTE(review): `struct SEpSet` is not forward-declared in this header, unlike `struct SRpcMsg`
// and `struct SSubplan`. Confirm one of the included headers declares it before this point;
// otherwise the tag is declared with function-prototype scope and callers get an
// incompatible-pointer diagnostic.
void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
S
Shengliang 已提交
189

190
// NOTE(review): the commented-out trailing parameters look like leftovers from an earlier
// signature; pExecInfoList is the only list argument now — consider deleting the dead text.
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/);
D
dapan1121 已提交
191

192 193 194 195
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);

int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len);

196 197 198
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
// NOTE(review): takes a non-const SInterval*, while the neighbouring getNextTimeWindow() and
// getAlignQueryTimeWindow() take const SInterval*. Confirm whether pInterval is modified here;
// if not, it should presumably be const as well.
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key);
199

200
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
L
Liu Jicong 已提交
201

202
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
L
Liu Jicong 已提交
203

204 205
void qStreamSetOpen(qTaskInfo_t tinfo);

206
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
L
Liu Jicong 已提交
207

wmmhello's avatar
wmmhello 已提交
208
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
L
Liu Jicong 已提交
209

wmmhello's avatar
wmmhello 已提交
210 211 212 213
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo);

const char* qExtractTbnameFromTask(qTaskInfo_t tinfo);

L
Liu Jicong 已提交
214 215
void* qExtractReaderFromStreamScanner(void* scanner);

L
Liu Jicong 已提交
216 217
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);

218
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo);
L
liuyao 已提交
219 220
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
221
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
222
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
L
Liu Jicong 已提交
223
bool    qStreamRecoverScanFinished(qTaskInfo_t tinfo);
224
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo);
5
54liuyao 已提交
225
void    resetTaskInfo(qTaskInfo_t tinfo);
H
Haojun Liao 已提交
226

L
liuyao 已提交
227 228 229
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo);
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo);

H
Hongze Cheng 已提交
230 231 232 233
#ifdef __cplusplus
}
#endif

234
#endif /*_TD_EXECUTOR_H_*/