提交 89d1c6f6 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/dnode3

......@@ -62,6 +62,12 @@ typedef struct SConstantItem {
SVariant value;
} SConstantItem;
typedef struct {
uint32_t numOfTables;
SArray *pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
typedef struct SSDataBlock {
SColumnDataAgg *pBlockAgg;
SArray *pDataBlock; // SArray<SColumnInfoData>
......
......@@ -986,6 +986,14 @@ typedef struct {
char msg[];
} SSubQueryMsg;
typedef struct {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
} SSinkDataReq;
typedef struct {
SMsgHead header;
uint64_t sId;
......
......@@ -18,6 +18,7 @@
#include "mallocator.h"
#include "meta.h"
#include "common.h"
#ifdef __cplusplus
extern "C" {
......@@ -39,6 +40,10 @@ typedef struct STable {
STSchema *pSchema;
} STable;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
......@@ -58,6 +63,22 @@ typedef struct STsdbCfg {
int8_t compression;
} STsdbCfg;
// query condition to build multi-table data block iterator
typedef struct STsdbQueryCond {
STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo *colList;
bool loadExternalRows; // load external rows or not
int32_t type; // data block load type:
} STsdbQueryCond;
typedef struct {
void *pTable;
TSKEY lastKey;
uint64_t uid;
} STableKeyInfo;
// STsdb
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta);
void tsdbClose(STsdb *);
......@@ -70,6 +91,119 @@ int tsdbCommit(STsdb *pTsdb);
int tsdbOptionsInit(STsdbCfg *);
void tsdbOptionsClear(STsdbCfg *);
typedef void* tsdbReadHandleT;
/**
* Get the data block iterator, starting from position according to the query condition
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfoGroup table object list in the form of set, grouped into different sets according to the
* group by condition
* @param qinfo query info handle from query processor
* @return
*/
tsdbReadHandleT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
void *pRef);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
* Note that only one data block with only row will be returned while invoking retrieve data block function for
* all tables in this group.
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfo table list.
* @return
*/
//tsdbReadHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
// SMemRef *pRef);
tsdbReadHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, void* pMemRef);
bool isTsdbCacheLastRow(tsdbReadHandleT* pTsdbReadHandle);
/**
* get num of rows in mem table
*
* @param pHandle
* @return row size
*/
int64_t tsdbGetNumOfRowsInMemTable(tsdbReadHandleT* pHandle);
/**
* move to next block if exists
*
* @param pTsdbReadHandle
* @return
*/
bool tsdbNextDataBlock(tsdbReadHandleT pTsdbReadHandle);
/**
* Get current data block information
*
* @param pTsdbReadHandle
* @param pBlockInfo
* @return
*/
void tsdbRetrieveDataBlockInfo(tsdbReadHandleT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
/**
*
* Get the pre-calculated information w.r.t. current data block.
*
* In case of data block in cache, the pBlockStatis will always be NULL.
* If a block is not completed loaded from disk, the pBlockStatis will be NULL.
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* @return
*/
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReadHandleT *pTsdbReadHandle, SDataStatis **pBlockStatis);
/**
*
* The query condition with primary timestamp is passed to iterator during its constructor function,
* the returned data block must be satisfied with the time window condition in any cases,
* which means the SData data block is not actually the completed disk data blocks.
*
* @param pTsdbReadHandle query handle
* @param pColumnIdList required data columns id list
* @return
*/
SArray *tsdbRetrieveDataBlock(tsdbReadHandleT *pTsdbReadHandle, SArray *pColumnIdList);
/**
* destroy the created table group list, which is generated by tag query
* @param pGroupList
*/
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
/**
* create the table group result including only one table, used to handle the normal table query
*
* @param tsdb tsdbHandle
* @param uid table uid
* @param pGroupInfo the generated result
* @return
*/
int32_t tsdbGetOneTableGroup(STsdb *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
/**
*
* @param tsdb
* @param pTableIdList
* @param pGroupInfo
* @return
*/
int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
/**
* clean up the query handle
* @param queryHandle
*/
void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle);
#ifdef __cplusplus
}
#endif
......
......@@ -21,36 +21,38 @@ extern "C" {
#endif
#include "os.h"
#include "executorimpl.h"
#include "thash.h"
#include "executor.h"
#define DS_CAPACITY_ENOUGH 1
#define DS_CAPACITY_FULL 2
#define DS_NEED_SCHEDULE 3
#define DS_END 4
#define DS_IN_PROCESS 5
#define DS_BUF_LOW 1
#define DS_BUF_FULL 2
#define DS_BUF_EMPTY 3
struct SDataSink;
struct SSDataBlock;
typedef struct SDataSinkMgtCfg {
uint32_t maxDataBlockNum;
uint32_t maxDataBlockNum; // todo: this should be numOfRows?
uint32_t maxDataBlockNumPerQuery;
} SDataSinkMgtCfg;
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg);
typedef void* DataSinkHandle;
typedef struct SInputData {
const SSDataBlock* pData;
const struct SSDataBlock* pData;
SHashObj* pTableRetrieveTsMap;
} SInputData;
typedef struct SOutPutData {
typedef struct SOutputData {
int32_t numOfRows;
int8_t compressed;
char* pData;
} SOutPutData;
bool queryEnd;
bool needSchedule;
int32_t bufStatus;
int64_t useconds;
int8_t precision;
} SOutputData;
/**
* Create a subplan's datasinker handle for all later operations.
......@@ -66,16 +68,16 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH
* @param pRes
* @return error code
*/
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus);
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue);
void dsEndPut(DataSinkHandle handle);
void dsEndPut(DataSinkHandle handle, int64_t useconds);
/**
* Get the length of the data returned by the next call to dsGetDataBlock.
* @param handle
* @return data length
* @param pLen data length
*/
int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus);
void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd);
/**
* Get data, the caller needs to allocate data memory.
......@@ -84,7 +86,7 @@ int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus);
* @param pStatus output
* @return error code
*/
int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus);
int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput);
/**
* After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue.
......
......@@ -21,24 +21,30 @@ extern "C" {
#endif
typedef void* qTaskInfo_t;
typedef void* DataSinkHandle;
struct SSubplan;
/**
* create the qinfo object according to QueryTableMsg
* @param tsdb
* @param pQueryTableMsg
* @param pTaskInfo
* @return
*/
int32_t qCreateTask(void* tsdb, int32_t vgId, void* pQueryTableMsg, qTaskInfo_t* pTaskInfo, uint64_t qId);
/**
* Create the exec task object according to task json
* @param tsdb
* @param vgId
* @param pTaskInfoMsg
* @param pTaskInfo
* @param qId
* @return
*/
int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo);
/**
* the main query execution function, including query on both table and multiple tables,
* The main task execution function, including query on both table and multiple tables,
* which are decided according to the tag or table name query conditions
*
* @param qinfo
* @param tinfo
* @param handle
* @return
*/
bool qExecTask(qTaskInfo_t qinfo, uint64_t *qId);
int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle);
/**
* Retrieve the produced results information, if current query is not paused or completed,
......@@ -60,7 +66,7 @@ int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspCo
* @param contLen payload length
* @return
*/
int32_t qDumpRetrieveResult(qTaskInfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec);
//int32_t qDumpRetrieveResult(qTaskInfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec);
/**
* return the transporter context (RPC)
......@@ -81,7 +87,7 @@ int32_t qKillTask(qTaskInfo_t qinfo);
* @param qinfo
* @return
*/
int32_t qIsQueryCompleted(qTaskInfo_t qinfo);
int32_t qIsTaskCompleted(qTaskInfo_t qinfo);
/**
* destroy query info structure
......@@ -113,7 +119,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
* @param numOfIndex
* @return
*/
int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGroupInfo groupInfo, SColIndex* groupByIndex, int32_t numOfIndex);
//int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGroupInfo groupInfo, SColIndex* groupByIndex, int32_t numOfIndex);
/**
* Update the table id list of a given query.
......
......@@ -20,8 +20,10 @@
extern "C" {
#endif
#include "planner.h"
#include "catalog.h"
#include "planner.h"
struct SSchJob;
typedef struct SSchedulerCfg {
uint32_t maxJobNum;
......@@ -65,7 +67,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes);
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes);
/**
* Process the query job, generated according to the query physical plan.
......@@ -73,7 +75,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob);
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob);
/**
* Fetch query result from the remote query executor
......@@ -81,7 +83,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
* @param data
* @return
*/
int32_t scheduleFetchRows(void *pJob, void **data);
int32_t scheduleFetchRows(struct SSchJob *pJob, void **data);
/**
......@@ -89,7 +91,7 @@ int32_t scheduleFetchRows(void *pJob, void **data);
* @param pJob
* @return
*/
int32_t scheduleCancelJob(void *pJob);
//int32_t scheduleCancelJob(void *pJob);
/**
* Free the query job
......
......@@ -113,6 +113,7 @@ typedef struct SRequestSendRecvBody {
tsem_t rspSem; // not used now
void* fp;
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
struct SSchJob *pQueryJob; // query job, created according to sql query DAG.
SDataBuf requestMsg;
SReqResultInfo resInfo;
} SRequestSendRecvBody;
......@@ -129,7 +130,7 @@ typedef struct SRequestObj {
char *msgBuf;
void *pInfo; // sql parse info, generated by parser module
int32_t code;
uint64_t affectedRows;
uint64_t affectedRows; // todo remove it
SQueryExecMetric metric;
SRequestSendRecvBody body;
} SRequestObj;
......@@ -161,12 +162,13 @@ int taos_options_imp(TSDB_OPTION option, const char *str);
void* openTransporter(const char *user, const char *auth, int32_t numOfThreads);
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
void initMsgHandleFp();
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port);
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
void *doFetchRow(SRequestObj* pRequest);
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
#ifdef __cplusplus
......
#include "../../libs/scheduler/inc/schedulerInt.h"
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
#include "tdef.h"
#include "tep.h"
#include "tglobal.h"
......@@ -8,9 +12,6 @@
#include "tnote.h"
#include "tpagedfile.h"
#include "tref.h"
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
#define CHECK_CODE_GOTO(expr, lable) \
do { \
......@@ -57,6 +58,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
}
static STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
static void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema);
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
if (taos_init() != TSDB_CODE_SUCCESS) {
......@@ -197,19 +199,49 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) {
pRequest->type = pQueryNode->type;
return qCreateQueryDag(pQueryNode, pDag, pRequest->requestId);
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
int32_t code = qCreateQueryDag(pQueryNode, pDag, pRequest->requestId);
if (code != 0) {
return code;
}
if (pQueryNode->type == TSDB_SQL_SELECT) {
SArray* pa = taosArrayGetP((*pDag)->pSubplans, 0);
SSubplan* pPlan = taosArrayGetP(pa, 0);
SDataBlockSchema* pDataBlockSchema = &(pPlan->pDataSink->schema);
setResSchemaInfo(pResInfo, pDataBlockSchema);
pRequest->type = TDMT_VND_QUERY;
}
return code;
}
void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema) {
assert(pDataBlockSchema != NULL && pDataBlockSchema->numOfCols > 0);
pResInfo->numOfCols = pDataBlockSchema->numOfCols;
pResInfo->fields = calloc(pDataBlockSchema->numOfCols, sizeof(pDataBlockSchema->pSchema[0]));
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
SSchema* pSchema = &pDataBlockSchema->pSchema[i];
pResInfo->fields[i].bytes = pSchema->bytes;
pResInfo->fields[i].type = pSchema->type;
tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
}
}
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, pJob, &res);
int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
if (code != TSDB_CODE_SUCCESS) {
// handle error and retry
} else {
if (*pJob != NULL) {
scheduleFreeJob(*pJob);
if (pRequest->body.pQueryJob != NULL) {
scheduleFreeJob(pRequest->body.pQueryJob);
}
}
......@@ -217,7 +249,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
return res.code;
}
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL /*todo appInfo.xxx*/, pDag, pJob);
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob);
}
TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) {
......@@ -294,7 +326,6 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
SRequestObj *pRequest = NULL;
SQueryNode *pQuery = NULL;
SQueryDag *pDag = NULL;
void *pJob = NULL;
terrno = TSDB_CODE_SUCCESS;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
......@@ -304,9 +335,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
} else {
CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return);
CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return);
CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag), _return);
pRequest->code = terrno;
return pRequest;
}
_return:
......@@ -315,6 +345,7 @@ _return:
if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
pRequest->code = terrno;
}
return pRequest;
}
......@@ -513,7 +544,17 @@ void* doFetchRow(SRequestObj* pRequest) {
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
if (pRequest->type == TDMT_MND_SHOW) {
if (pRequest->type == TDMT_VND_QUERY) {
pRequest->type = TDMT_VND_FETCH;
scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData);
pResultInfo->current = 0;
if (pResultInfo->numOfRows <= pResultInfo->current) {
return NULL;
}
goto _return;
} else if (pRequest->type == TDMT_MND_SHOW) {
pRequest->type = TDMT_MND_SHOW_RETRIEVE;
} else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
......@@ -556,6 +597,8 @@ void* doFetchRow(SRequestObj* pRequest) {
}
}
_return:
for(int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
......
此差异已折叠。
......@@ -23,7 +23,7 @@ int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery);
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vTrace("query message is processed");
return qWorkerProcessQueryMsg(pVnode, pVnode->pQuery, pMsg);
return qWorkerProcessQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
}
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
......
......@@ -13,6 +13,7 @@ else(0)
"src/tsdbReadImpl.c"
"src/tsdbFile.c"
"src/tsdbFS.c"
"src/tsdbRead.c"
)
endif(0)
......
......@@ -1253,7 +1253,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
pBlock->keyFirst = dataColsKeyFirst(pDataCols);
pBlock->keyLast = dataColsKeyLast(pDataCols);
tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64
tsdbDebug("vgId:%d uid:%"PRId64" a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast);
......
aux_source_directory(src EXECUTOR_SRC)
add_library(executor ${EXECUTOR_SRC})
#add_library(executor ${EXECUTOR_SRC})
#target_link_libraries(
# executor
# PRIVATE os util common function parser planner qcom tsdb
#)
add_library(executor STATIC ${EXECUTOR_SRC})
#set_target_properties(executor PROPERTIES
# IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libexecutor.a"
# INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/executor"
# )
target_link_libraries(executor
PRIVATE os util common function parser planner qcom tsdb
)
target_include_directories(
executor
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/executor"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
executor
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/executor"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
executor
PRIVATE os util common function parser planner qcom
)
\ No newline at end of file
#if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
#endif(${BUILD_TEST})
\ No newline at end of file
......@@ -31,10 +31,10 @@ typedef struct SDataSinkManager {
pthread_mutex_t mutex;
} SDataSinkManager;
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus);
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle);
typedef int32_t (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pStatus);
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus);
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue);
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, int64_t useconds);
typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd);
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput);
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
typedef struct SDataSinkHandle {
......
......@@ -38,7 +38,7 @@
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES)
#define GET_QID(_r) (((SQInfo*)((_r)->qinfo))->qId)
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.queryId)
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
......@@ -157,6 +157,6 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, struct STaskRuntimeEnv *pRuntimeEnv, int32_t* offset);
int32_t initUdfInfo(struct SUdfInfo* pUdfInfo);
//int32_t initUdfInfo(struct SUdfInfo* pUdfInfo);
#endif // TDENGINE_QUERYUTIL_H
......@@ -20,6 +20,7 @@
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
......
......@@ -20,24 +20,19 @@
#include "ttszip.h"
#include "tvariant.h"
#include "thash.h"
#include "dataSinkMgt.h"
#include "executil.h"
#include "planner.h"
#include "taosdef.h"
#include "tarray.h"
#include "tfilter.h"
#include "thash.h"
#include "tlockfree.h"
#include "tpagedfile.h"
#include "planner.h"
#include "executor.h"
struct SColumnFilterElem;
typedef struct {
uint32_t numOfTables;
SArray *pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
......@@ -51,19 +46,19 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData? 1 : 0)
enum {
// when query starts to execute, this status will set
QUERY_NOT_COMPLETED = 0x1u,
// when this task starts to execute, this status will set
TASK_NOT_COMPLETED = 0x1u,
/* query is over
/* Task is over
* 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
* 2. when all data within queried time window, it is also denoted as query_completed
*/
QUERY_COMPLETED = 0x2u,
TASK_COMPLETED = 0x2u,
/* when the result is not completed return to client, this status will be
* usually used in case of interval query with interpolation option
*/
QUERY_OVER = 0x4u,
TASK_OVER = 0x4u,
};
typedef struct SResultRowCell {
......@@ -129,6 +124,7 @@ typedef struct {
} SOperatorProfResult;
typedef struct STaskCostInfo {
int64_t created;
int64_t start;
int64_t end;
......@@ -246,13 +242,14 @@ typedef struct STaskIdInfo {
uint64_t taskId; // this is a subplan id
} STaskIdInfo;
typedef struct STaskInfo {
typedef struct SExecTaskInfo {
STaskIdInfo id;
char *content;
uint32_t status;
STimeWindow window;
STaskCostInfo cost;
int64_t owner; // if it is in execution
int32_t code;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
pthread_mutex_t lock; // used to synchronize the rsp/query threads
......@@ -260,8 +257,10 @@ typedef struct STaskInfo {
// int32_t dataReady; // denote if query result is ready or not
// void* rspContext; // response context
char *sql; // query sql string
jmp_buf env;
} STaskInfo;
jmp_buf env; //
DataSinkHandle dsHandle;
struct SOperatorInfo *pRoot;
} SExecTaskInfo;
typedef struct STaskRuntimeEnv {
jmp_buf env;
......@@ -269,7 +268,7 @@ typedef struct STaskRuntimeEnv {
uint32_t status; // query status
void* qinfo;
uint8_t scanFlag; // denotes reversed scan of data or not
void* pQueryHandle;
void* pTsdbReadHandle;
int32_t prevGroupId; // previous executed group id
bool enableGroupData;
......@@ -314,8 +313,8 @@ typedef struct SOperatorInfo {
char *name; // name, used to show the query execution plan
void *info; // extension attribution
SExprInfo *pExpr;
STaskRuntimeEnv *pRuntimeEnv;
STaskInfo *pTaskInfo;
STaskRuntimeEnv *pRuntimeEnv; // todo remove it
SExecTaskInfo *pTaskInfo;
struct SOperatorInfo **pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
......@@ -376,7 +375,7 @@ typedef struct STaskParam {
} STaskParam;
typedef struct STableScanInfo {
void *pQueryHandle;
void *pTsdbReadHandle;
int32_t numOfBlocks;
int32_t numOfSkipped;
int32_t numOfBlockStatis;
......@@ -544,7 +543,7 @@ typedef struct SOrderOperatorInfo {
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime);
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
......@@ -572,11 +571,11 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal);
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
SSDataBlock* doSLimit(void* param, bool* newgroup);
//SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
//SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
//SSDataBlock* doSLimit(void* param, bool* newgroup);
int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId);
//int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId);
void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock);
bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p);
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
......@@ -617,14 +616,14 @@ STableQueryInfo* createTmpTableQueryInfo(STimeWindow win);
int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg);
bool isQueryKilled(SQInfo *pQInfo);
bool isTaskKilled(SExecTaskInfo *pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables);
bool checkNeedToCompressQueryCol(SQInfo *pQInfo);
bool doBuildResCheck(SQInfo* pQInfo);
void setQueryStatus(STaskRuntimeEnv *pRuntimeEnv, int8_t status);
bool onlyQueryTags(STaskAttr* pQueryAttr);
void destroyUdfInfo(struct SUdfInfo* pUdfInfo);
//void destroyUdfInfo(struct SUdfInfo* pUdfInfo);
bool isValidQInfo(void *param);
......@@ -644,5 +643,7 @@ void freeQueryAttr(STaskAttr *pQuery);
int32_t getMaximumIdleDurationSec();
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type);
void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status);
int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle);
#endif // TDENGINE_EXECUTORIMPL_H
......@@ -19,6 +19,7 @@
#include "tcompression.h"
#include "tglobal.h"
#include "tqueue.h"
#include "executorimpl.h"
#define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp))
......@@ -42,6 +43,8 @@ typedef struct SDataDispatchHandle {
STaosQueue* pDataBlocks;
SDataDispatchBuf nextOutput;
int32_t status;
bool queryEnd;
int64_t useconds;
pthread_mutex_t mutex;
} SDataDispatchHandle;
......@@ -106,10 +109,14 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
pEntry->compressed = (int8_t)needCompress(pInput->pData, &(pHandle->schema));
pEntry->numOfRows = pInput->pData->info.rows;
pEntry->dataLen = 0;
pBuf->useSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap);
copyData(pInput, &pHandle->schema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
pBuf->useSize += (pEntry->compressed ? pEntry->dataLen : pHandle->schema.resultRowSize * pInput->pData->info.rows);
if (0 == pEntry->compressed) {
pEntry->dataLen = pHandle->schema.resultRowSize * pInput->pData->info.rows;
}
pBuf->useSize += pEntry->dataLen;
// todo completed
}
......@@ -124,7 +131,9 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
pthread_mutex_lock(&pDispatcher->mutex);
int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_CAPACITY_ENOUGH : DS_CAPACITY_FULL;
int32_t blockNums = taosQueueSize(pDispatcher->pDataBlocks);
int32_t status = (0 == blockNums ? DS_BUF_EMPTY :
(blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
pDispatcher->status = status;
pthread_mutex_unlock(&pDispatcher->mutex);
return status;
......@@ -137,7 +146,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
return status;
}
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus) {
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf));
if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) {
......@@ -145,38 +154,46 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
}
toDataCacheEntry(pDispatcher, pInput, pBuf);
taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
*pStatus = updateStatus(pDispatcher);
*pContinue = (DS_BUF_LOW == updateStatus(pDispatcher) ? true : false);
return TSDB_CODE_SUCCESS;
}
static void endPut(struct SDataSinkHandle* pHandle) {
static void endPut(struct SDataSinkHandle* pHandle, int64_t useconds) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
pthread_mutex_lock(&pDispatcher->mutex);
pDispatcher->status = DS_END;
pDispatcher->queryEnd = true;
pDispatcher->useconds = useconds;
pthread_mutex_unlock(&pDispatcher->mutex);
}
static int32_t getDataLength(SDataSinkHandle* pHandle, int32_t* pStatus) {
static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
*pStatus = getStatus(pDispatcher) ? DS_END : DS_IN_PROCESS;
return 0;
*pQueryEnd = pDispatcher->queryEnd;
*pLen = 0;
return;
}
SDataDispatchBuf* pBuf = NULL;
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
taosFreeQitem(pBuf);
return ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
*pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
}
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus) {
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
pOutput->numOfRows = pEntry->numOfRows;
pOutput->compressed = pEntry->compressed;
tfree(pDispatcher->nextOutput.pData); // todo persistent
*pStatus = updateStatus(pDispatcher);
pOutput->bufStatus = updateStatus(pDispatcher);
pthread_mutex_lock(&pDispatcher->mutex);
pOutput->queryEnd = pDispatcher->queryEnd;
pOutput->needSchedule = false;
pOutput->useconds = pDispatcher->useconds;
pOutput->precision = pDispatcher->schema.precision;
pthread_mutex_unlock(&pDispatcher->mutex);
return TSDB_CODE_SUCCESS;
}
......@@ -200,12 +217,14 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataS
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
dispatcher->sink.fPut = putDataBlock;
dispatcher->sink.fEndPut = endPut;
dispatcher->sink.fGetLen = getDataLength;
dispatcher->sink.fGetData = getDataBlock;
dispatcher->sink.fDestroy = destroyDataSinker;
dispatcher->pManager = pManager;
dispatcher->schema = pDataSink->schema;
dispatcher->status = DS_CAPACITY_ENOUGH;
dispatcher->status = DS_BUF_EMPTY;
dispatcher->queryEnd = false;
dispatcher->pDataBlocks = taosOpenQueue();
pthread_mutex_init(&dispatcher->mutex, NULL);
if (NULL == dispatcher->pDataBlocks) {
......
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tarray.h"
#include "dataSinkMgt.h"
#include "dataSinkInt.h"
#include "planner.h"
......@@ -31,24 +32,24 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH
return TSDB_CODE_FAILED;
}
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus) {
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fPut(pHandleImpl, pInput, pStatus);
return pHandleImpl->fPut(pHandleImpl, pInput, pContinue);
}
void dsEndPut(DataSinkHandle handle) {
void dsEndPut(DataSinkHandle handle, int64_t useconds) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fEndPut(pHandleImpl);
return pHandleImpl->fEndPut(pHandleImpl, useconds);
}
int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus) {
void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fGetLen(pHandleImpl, pStatus);
pHandleImpl->fGetLen(pHandleImpl, pLen, pQueryEnd);
}
int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus) {
int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fGetData(pHandleImpl, pOutput, pStatus);
return pHandleImpl->fGetData(pHandleImpl, pOutput);
}
void dsScheduleProcess(void* ahandle, void* pItem) {
......
......@@ -547,7 +547,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
pTableQueryInfoList = malloc(POINTER_BYTES * size);
if (pTableQueryInfoList == NULL || posList == NULL || pGroupResInfo->pRows == NULL || pGroupResInfo->pRows == NULL) {
// qError("QInfo:%"PRIu64" failed alloc memory", GET_QID(pRuntimeEnv));
// qError("QInfo:%"PRIu64" failed alloc memory", GET_TASKID(pRuntimeEnv));
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end;
}
......@@ -619,7 +619,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
int64_t endt = taosGetTimestampMs();
// qDebug("QInfo:%"PRIx64" result merge completed for group:%d, elapsed time:%" PRId64 " ms", GET_QID(pRuntimeEnv),
// qDebug("QInfo:%"PRIx64" result merge completed for group:%d, elapsed time:%" PRId64 " ms", GET_TASKID(pRuntimeEnv),
// pGroupResInfo->currentGroup, endt - startt);
_end:
......@@ -641,13 +641,13 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, STaskRuntimeEnv* pRun
break;
}
// qDebug("QInfo:%"PRIu64" no result in group %d, continue", GET_QID(pRuntimeEnv), pGroupResInfo->currentGroup);
// qDebug("QInfo:%"PRIu64" no result in group %d, continue", GET_TASKID(pRuntimeEnv), pGroupResInfo->currentGroup);
cleanupGroupResInfo(pGroupResInfo);
incNextGroup(pGroupResInfo);
}
// int64_t elapsedTime = taosGetTimestampUs() - st;
// qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_QID(pRuntimeEnv),
// qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_TASKID(pRuntimeEnv),
// pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime);
return TSDB_CODE_SUCCESS;
......
......@@ -14,10 +14,12 @@
*/
#include "os.h"
#include "tarray.h"
#include "dataSinkMgt.h"
#include "exception.h"
#include "tcache.h"
#include "tglobal.h"
#include "tmsg.h"
#include "exception.h"
#include "thash.h"
#include "executorimpl.h"
......@@ -66,152 +68,24 @@ void freeParam(STaskParam *param) {
tfree(param->prevResult);
}
// todo parse json to get the operator tree.
int32_t qCreateTask(void* tsdb, int32_t vgId, void* pQueryMsg, qTaskInfo_t* pTaskInfo, uint64_t taskId) {
assert(pQueryMsg != NULL && tsdb != NULL);
int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo) {
assert(tsdb != NULL && pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
int32_t code = TSDB_CODE_SUCCESS;
#if 0
STaskParam param = {0};
code = convertQueryMsg(pQueryMsg, &param);
int32_t code = doCreateExecTaskInfo(pSubplan, pTask, tsdb);
if (code != TSDB_CODE_SUCCESS) {
goto _over;
}
if (pQueryMsg->numOfTables <= 0) {
qError("Invalid number of tables to query, numOfTables:%d", pQueryMsg->numOfTables);
code = TSDB_CODE_QRY_INVALID_MSG;
goto _over;
}
if (param.pTableIdList == NULL || taosArrayGetSize(param.pTableIdList) == 0) {
qError("qmsg:%p, SQueryTableReq wrong format", pQueryMsg);
code = TSDB_CODE_QRY_INVALID_MSG;
goto _over;
}
SQueriedTableInfo info = { .numOfTags = pQueryMsg->numOfTags, .numOfCols = pQueryMsg->numOfCols, .colList = pQueryMsg->tableCols};
if ((code = createQueryFunc(&info, pQueryMsg->numOfOutput, &param.pExprs, param.pExpr, param.pTagColumnInfo,
pQueryMsg->queryType, pQueryMsg, param.pUdfInfo)) != TSDB_CODE_SUCCESS) {
goto _over;
}
if (param.pSecExpr != NULL) {
if ((code = createIndirectQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, &param.pSecExprs, param.pSecExpr, param.pExprs, param.pUdfInfo)) != TSDB_CODE_SUCCESS) {
goto _over;
}
}
if (param.colCond != NULL) {
if ((code = createQueryFilter(param.colCond, pQueryMsg->colCondLen, &param.pFilters)) != TSDB_CODE_SUCCESS) {
goto _over;
}
}
param.pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, param.pGroupColIndex, &code);
if ((param.pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) {
goto _over;
}
bool isSTableQuery = false;
STableGroupInfo tableGroupInfo = {0};
int64_t st = taosGetTimestampUs();
if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_TABLE_QUERY)) {
STableIdInfo *id = taosArrayGet(param.pTableIdList, 0);
qDebug("qmsg:%p query normal table, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid);
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, pQueryMsg->window.skey, &tableGroupInfo)) != TSDB_CODE_SUCCESS) {
goto _over;
}
} else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_STABLE_QUERY)) {
isSTableQuery = true;
// also note there's possibility that only one table in the super table
if (!TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY)) {
STableIdInfo *id = taosArrayGet(param.pTableIdList, 0);
// group by normal column, do not pass the group by condition to tsdb to group table into different group
int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols;
if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(param.pGroupColIndex->flag)) {
numOfGroupByCols = 0;
}
qDebug("qmsg:%p query stable, uid:%"PRIu64", tid:%d", pQueryMsg, id->uid, id->tid);
code = tsdbQuerySTableByTagCond(tsdb, id->uid, pQueryMsg->window.skey, param.tagCond, pQueryMsg->tagCondLen,
pQueryMsg->tagNameRelType, param.tbnameCond, &tableGroupInfo, param.pGroupColIndex, numOfGroupByCols);
if (code != TSDB_CODE_SUCCESS) {
qError("qmsg:%p failed to query stable, reason: %s", pQueryMsg, tstrerror(code));
goto _over;
}
} else {
code = tsdbGetTableGroupFromIdList(tsdb, param.pTableIdList, &tableGroupInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _over;
}
qDebug("qmsg:%p query on %u tables in one group from client", pQueryMsg, tableGroupInfo.numOfTables);
}
int64_t el = taosGetTimestampUs() - st;
qDebug("qmsg:%p tag filter completed, numOfTables:%u, elapsed time:%"PRId64"us", pQueryMsg, tableGroupInfo.numOfTables, el);
} else {
assert(0);
goto _error;
}
code = checkForQueryBuf(tableGroupInfo.numOfTables);
if (code != TSDB_CODE_SUCCESS) { // not enough query buffer, abort
goto _over;
}
assert(pQueryMsg->stableQuery == isSTableQuery);
(*pTaskInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo,
param.pTagColumnInfo, param.pFilters, vgId, param.sql, qId, param.pUdfInfo);
param.sql = NULL;
param.pExprs = NULL;
param.pSecExprs = NULL;
param.pGroupbyExpr = NULL;
param.pTagColumnInfo = NULL;
param.pFilters = NULL;
if ((*pTaskInfo) == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _over;
}
param.pUdfInfo = NULL;
code = initQInfo(&pQueryMsg->tsBuf, tsdb, NULL, *pTaskInfo, &param, (char*)pQueryMsg, pQueryMsg->prevResultLen, NULL);
_over:
if (param.pGroupbyExpr != NULL) {
taosArrayDestroy(param.pGroupbyExpr->columnInfo);
}
tfree(param.colCond);
destroyUdfInfo(param.pUdfInfo);
taosArrayDestroy(param.pTableIdList);
param.pTableIdList = NULL;
freeParam(&param);
for (int32_t i = 0; i < pQueryMsg->numOfCols; i++) {
SColumnInfo* column = pQueryMsg->tableCols + i;
freeColumnFilterInfo(column->flist.filterInfo, column->flist.numOfFilters);
}
filterFreeInfo(param.pFilters);
//pTaskInfo already freed in initQInfo, but *pTaskInfo may not pointer to null;
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
code = dsDataSinkMgtInit(&cfg);
if (code != TSDB_CODE_SUCCESS) {
*pTaskInfo = NULL;
goto _error;
}
#endif
code = dsCreateDataSinker(pSubplan->pDataSink, &(*pTask)->dsHandle);
_error:
// if failed to add ref for all tables in this query, abort current query
return code;
}
......@@ -250,7 +124,7 @@ int waitMoment(SQInfo* pQInfo){
while(used_ms < ms) {
taosMsleep(1000);
used_ms += 1000;
if(isQueryKilled(pQInfo)){
if(isTaskKilled(pQInfo)){
printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
break;
}
......@@ -261,68 +135,80 @@ int waitMoment(SQInfo* pQInfo){
}
#endif
bool qExecTask(qTaskInfo_t qinfo, uint64_t *qId) {
SQInfo *pQInfo = (SQInfo *)qinfo;
assert(pQInfo && pQInfo->signature == pQInfo);
int64_t threadId = taosGetSelfPthreadId();
int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId();
int64_t curOwner = 0;
if ((curOwner = atomic_val_compare_exchange_64(&pQInfo->owner, 0, threadId)) != 0) {
qError("QInfo:0x%"PRIx64"-%p qhandle is now executed by thread:%p", pQInfo->qId, pQInfo, (void*) curOwner);
pQInfo->code = TSDB_CODE_QRY_IN_EXEC;
return false;
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
qError("QInfo:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
(void*)curOwner);
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
return pTaskInfo->code;
}
*qId = pQInfo->qId;
if(pQInfo->startExecTs == 0)
pQInfo->startExecTs = taosGetTimestampMs();
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:0x%"PRIx64" it is already killed, abort", pQInfo->qId);
return doBuildResCheck(pQInfo);
if (pTaskInfo->cost.start == 0) {
pTaskInfo->cost.start = taosGetTimestampMs();
}
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:0x%"PRIx64" no table exists for query, abort", pQInfo->qId);
// setTaskStatus(pRuntimeEnv, QUERY_COMPLETED);
return doBuildResCheck(pQInfo);
if (isTaskKilled(pTaskInfo)) {
qDebug("QInfo:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo));
return pTaskInfo->code;
}
// STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv;
// if (pTaskInfo->tableqinfoGroupInfo.numOfTables == 0) {
// qDebug("QInfo:0x%"PRIx64" no table exists for query, abort", GET_TASKID(pTaskInfo));
// setTaskStatus(pTaskInfo, TASK_COMPLETED);
// return doBuildResCheck(pTaskInfo);
// }
// error occurs, record the error code and return to client
int32_t ret = setjmp(pQInfo->runtimeEnv.env);
int32_t ret = setjmp(pTaskInfo->env);
if (ret != TSDB_CODE_SUCCESS) {
publishQueryAbortEvent(pQInfo, ret);
pQInfo->code = ret;
qDebug("QInfo:0x%"PRIx64" query abort due to error/cancel occurs, code:%s", pQInfo->qId, tstrerror(pQInfo->code));
return doBuildResCheck(pQInfo);
publishQueryAbortEvent(pTaskInfo, ret);
pTaskInfo->code = ret;
qDebug("QInfo:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
return pTaskInfo->code;
}
qDebug("QInfo:0x%"PRIx64" query task is launched", pQInfo->qId);
qDebug("QInfo:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo));
bool newgroup = false;
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
int64_t st = 0;
int64_t st = taosGetTimestampUs();
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup);
pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st);
#ifdef TEST_IMPL
waitMoment(pQInfo);
#endif
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_AFTER_OPERATOR_EXEC);
pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv);
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:0x%"PRIx64" query is killed", pQInfo->qId);
} else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) {
qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pQInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables,
pRuntimeEnv->resultInfo.total);
} else {
qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows", pQInfo->qId,
GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total);
}
*handle = pTaskInfo->dsHandle;
while(1) {
st = taosGetTimestampUs();
SSDataBlock* pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
return doBuildResCheck(pQInfo);
pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st);
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pRes == NULL) { // no results generated yet, abort
dsEndPut(pTaskInfo->dsHandle, pTaskInfo->cost.elapsedTime);
return pTaskInfo->code;
}
bool qcontinue = false;
SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL};
pTaskInfo->code = dsPutDataBlock(pTaskInfo->dsHandle, &inputData, &qcontinue);
if (isTaskKilled(pTaskInfo)) {
qDebug("QInfo:0x%" PRIx64 " task is killed", GET_TASKID(pTaskInfo));
// } else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) {
// qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pTaskInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables,
// pRuntimeEnv->resultInfo.total);
}
if (!qcontinue) {
qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d", GET_TASKID(pTaskInfo),
0, 0L, 0);
return pTaskInfo->code;
}
}
}
int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) {
......@@ -398,13 +284,13 @@ int32_t qKillTask(qTaskInfo_t qinfo) {
}
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo;
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
if (pTaskInfo == NULL /*|| !isValidQInfo(pTaskInfo)*/) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
return isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQInfo->runtimeEnv.status, QUERY_OVER);
return isTaskKilled(pTaskInfo) || Q_STATUS_EQUAL(pTaskInfo->status, TASK_OVER);
}
void qDestroyTask(qTaskInfo_t qHandle) {
......
MESSAGE(STATUS "build parser unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(executorTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
executorTest
PUBLIC os util common transport gtest taos qcom executor function planner
)
TARGET_INCLUDE_DIRECTORIES(
executorTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/executor/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/executor/inc"
)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <executorimpl.h>
#include <gtest/gtest.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
#include "tep.h"
#include "trpc.h"
#include "stub.h"
#include "executor.h"
/**
{
"Id": {
"QueryId": 1.3108161807422521e+19,
"TemplateId": 0,
"SubplanId": 0
},
"Node": {
"Name": "TableScan",
"Targets": [{
"Base": {
"Schema": {
"Type": 9,
"ColId": 5000,
"Bytes": 8
},
"Columns": [{
"TableId": 1,
"Flag": 0,
"Info": {
"ColId": 1,
"Type": 9,
"Bytes": 8
}
}],
"InterBytes": 0
},
"Expr": {
"Type": 4,
"Column": {
"Type": 9,
"ColId": 1,
"Bytes": 8
}
}
}, {
"Base": {
"Schema": {
"Type": 4,
"ColId": 5001,
"Bytes": 4
},
"Columns": [{
"TableId": 1,
"Flag": 0,
"Info": {
"ColId": 2,
"Type": 4,
"Bytes": 4
}
}],
"InterBytes": 0
},
"Expr": {
"Type": 4,
"Column": {
"Type": 4,
"ColId": 2,
"Bytes": 4
}
}
}],
"InputSchema": [{
"Type": 9,
"ColId": 5000,
"Bytes": 8
}, {
"Type": 4,
"ColId": 5001,
"Bytes": 4
}],
"TableScan": {
"TableId": 1,
"TableType": 2,
"Flag": 0,
"Window": {
"StartKey": -9.2233720368547758e+18,
"EndKey": 9.2233720368547758e+18
}
}
},
"DataSink": {
"Name": "Dispatch",
"Dispatch": {
}
}
}
*/
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
TEST(testCase, build_executor_tree_Test) {
const char* msg = "{\n"
"\t\"Id\":\t{\n"
"\t\t\"QueryId\":\t1.3108161807422521e+19,\n"
"\t\t\"TemplateId\":\t0,\n"
"\t\t\"SubplanId\":\t0\n"
"\t},\n"
"\t\"Node\":\t{\n"
"\t\t\"Name\":\t\"TableScan\",\n"
"\t\t\"Targets\":\t[{\n"
"\t\t\t\t\"Base\":\t{\n"
"\t\t\t\t\t\"Schema\":\t{\n"
"\t\t\t\t\t\t\"Type\":\t9,\n"
"\t\t\t\t\t\t\"ColId\":\t5000,\n"
"\t\t\t\t\t\t\"Bytes\":\t8\n"
"\t\t\t\t\t},\n"
"\t\t\t\t\t\"Columns\":\t[{\n"
"\t\t\t\t\t\t\t\"TableId\":\t1,\n"
"\t\t\t\t\t\t\t\"Flag\":\t0,\n"
"\t\t\t\t\t\t\t\"Info\":\t{\n"
"\t\t\t\t\t\t\t\t\"ColId\":\t1,\n"
"\t\t\t\t\t\t\t\t\"Type\":\t9,\n"
"\t\t\t\t\t\t\t\t\"Bytes\":\t8\n"
"\t\t\t\t\t\t\t}\n"
"\t\t\t\t\t\t}],\n"
"\t\t\t\t\t\"InterBytes\":\t0\n"
"\t\t\t\t},\n"
"\t\t\t\t\"Expr\":\t{\n"
"\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\"Column\":\t{\n"
"\t\t\t\t\t\t\"Type\":\t9,\n"
"\t\t\t\t\t\t\"ColId\":\t1,\n"
"\t\t\t\t\t\t\"Bytes\":\t8\n"
"\t\t\t\t\t}\n"
"\t\t\t\t}\n"
"\t\t\t}, {\n"
"\t\t\t\t\"Base\":\t{\n"
"\t\t\t\t\t\"Schema\":\t{\n"
"\t\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\t\"ColId\":\t5001,\n"
"\t\t\t\t\t\t\"Bytes\":\t4\n"
"\t\t\t\t\t},\n"
"\t\t\t\t\t\"Columns\":\t[{\n"
"\t\t\t\t\t\t\t\"TableId\":\t1,\n"
"\t\t\t\t\t\t\t\"Flag\":\t0,\n"
"\t\t\t\t\t\t\t\"Info\":\t{\n"
"\t\t\t\t\t\t\t\t\"ColId\":\t2,\n"
"\t\t\t\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\t\t\t\"Bytes\":\t4\n"
"\t\t\t\t\t\t\t}\n"
"\t\t\t\t\t\t}],\n"
"\t\t\t\t\t\"InterBytes\":\t0\n"
"\t\t\t\t},\n"
"\t\t\t\t\"Expr\":\t{\n"
"\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\"Column\":\t{\n"
"\t\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\t\"ColId\":\t2,\n"
"\t\t\t\t\t\t\"Bytes\":\t4\n"
"\t\t\t\t\t}\n"
"\t\t\t\t}\n"
"\t\t\t}],\n"
"\t\t\"InputSchema\":\t[{\n"
"\t\t\t\t\"Type\":\t9,\n"
"\t\t\t\t\"ColId\":\t5000,\n"
"\t\t\t\t\"Bytes\":\t8\n"
"\t\t\t}, {\n"
"\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\"ColId\":\t5001,\n"
"\t\t\t\t\"Bytes\":\t4\n"
"\t\t\t}],\n"
"\t\t\"TableScan\":\t{\n"
"\t\t\t\"TableId\":\t1,\n"
"\t\t\t\"TableType\":\t2,\n"
"\t\t\t\"Flag\":\t0,\n"
"\t\t\t\"Window\":\t{\n"
"\t\t\t\t\"StartKey\":\t-9.2233720368547758e+18,\n"
"\t\t\t\t\"EndKey\":\t9.2233720368547758e+18\n"
"\t\t\t}\n"
"\t\t}\n"
"\t},\n"
"\t\"DataSink\":\t{\n"
"\t\t\"Name\":\t\"Dispatch\",\n"
"\t\t\"Dispatch\":\t{\n"
"\t\t}\n"
"\t}\n"
"}";
SExecTaskInfo* pTaskInfo = nullptr;
int32_t code = qCreateExecTask((void*) 1, 2, NULL, (void**) &pTaskInfo);
}
\ No newline at end of file
......@@ -70,6 +70,9 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
int32_t code = qParserValidateSqlNode(&pCxt->ctx, &info, pQueryInfo, pCxt->pMsg, pCxt->msgLen);
if (code == TSDB_CODE_SUCCESS) {
*pQuery = (SQueryNode*)pQueryInfo;
} else {
terrno = code;
return code;
}
}
......
......@@ -62,7 +62,7 @@ typedef struct SQueryPlanNode {
SSchema *pSchema; // the schema of the input SSDatablock
int32_t numOfCols; // number of input columns
SArray *pExpr; // the query functions or sql aggregations
int32_t numOfExpr; // number of result columns, which is also the number of pExprs
int32_t numOfExpr; // number of result columns, which is also the number of pExprs
void *pExtInfo; // additional information
// children operator to generated result for current node to process
// in case of join, multiple prev nodes exist.
......
......@@ -88,16 +88,20 @@ static bool copySchema(SDataBlockSchema* dst, const SDataBlockSchema* src) {
}
static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
dataBlockSchema->numOfCols = pPlanNode->numOfCols;
dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfCols);
dataBlockSchema->numOfCols = pPlanNode->numOfExpr;
dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfExpr);
if (NULL == dataBlockSchema->pSchema) {
return false;
}
memcpy(dataBlockSchema->pSchema, pPlanNode->pSchema, sizeof(SSlotSchema) * pPlanNode->numOfCols);
dataBlockSchema->resultRowSize = 0;
for (int32_t i = 0; i < dataBlockSchema->numOfCols; ++i) {
for (int32_t i = 0; i < pPlanNode->numOfExpr; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pPlanNode->pExpr, i);
memcpy(&dataBlockSchema->pSchema[i], &pExprInfo->base.resSchema, sizeof(SSlotSchema));
dataBlockSchema->resultRowSize += dataBlockSchema->pSchema[i].bytes;
}
return true;
}
......@@ -284,7 +288,6 @@ static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTabl
return createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
}
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
......@@ -303,8 +306,6 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
case QNODE_TABLESCAN:
node = createTableScanNode(pCxt, pPlanNode);
break;
case QNODE_PROJECT:
// node = create
case QNODE_MODIFY:
// Insert is not an operator in a physical plan.
break;
......
......@@ -154,9 +154,13 @@ static bool fromRawArrayWithAlloc(const cJSON* json, const char* name, FFromJson
return fromItem(jArray, func, *array, itemSize, *size);
}
static bool fromRawArray(const cJSON* json, const char* name, FFromJson func, void* array, int32_t itemSize, int32_t* size) {
static bool fromRawArray(const cJSON* json, const char* name, FFromJson func, void** array, int32_t itemSize, int32_t* size) {
const cJSON* jArray = getArray(json, name, size);
return fromItem(jArray, func, array, itemSize, *size);
if (*array == NULL) {
*array = calloc(*size, itemSize);
}
return fromItem(jArray, func, *array, itemSize, *size);
}
static char* getString(const cJSON* json, const char* name) {
......@@ -218,7 +222,8 @@ static bool dataBlockSchemaFromJson(const cJSON* json, void* obj) {
SDataBlockSchema* schema = (SDataBlockSchema*)obj;
schema->resultRowSize = getNumber(json, jkDataBlockSchemaResultRowSize);
schema->precision = getNumber(json, jkDataBlockSchemaPrecision);
return fromRawArray(json, jkDataBlockSchemaSlotSchema, schemaFromJson, schema->pSchema, sizeof(SSlotSchema), &schema->numOfCols);
return fromRawArray(json, jkDataBlockSchemaSlotSchema, schemaFromJson, (void**) &(schema->pSchema), sizeof(SSlotSchema), &schema->numOfCols);
}
static const char* jkColumnFilterInfoLowerRelOptr = "LowerRelOptr";
......@@ -920,6 +925,8 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
}
*str = cJSON_Print(json);
printf("%s\n", *str);
*len = strlen(*str) + 1;
return TSDB_CODE_SUCCESS;
}
......
......@@ -57,34 +57,33 @@ void qDestroyQueryDag(struct SQueryDag* pDag) {
}
int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, uint64_t requestId) {
SQueryPlanNode* logicPlan;
int32_t code = createQueryPlan(pNode, &logicPlan);
SQueryPlanNode* pLogicPlan;
int32_t code = createQueryPlan(pNode, &pLogicPlan);
if (TSDB_CODE_SUCCESS != code) {
destroyQueryPlan(logicPlan);
destroyQueryPlan(pLogicPlan);
return code;
}
//
if (logicPlan->info.type != QNODE_MODIFY) {
// char* str = NULL;
// queryPlanToString(logicPlan, &str);
// printf("%s\n", str);
if (pLogicPlan->info.type != QNODE_MODIFY) {
char* str = NULL;
queryPlanToString(pLogicPlan, &str);
printf("%s\n", str);
}
code = optimizeQueryPlan(logicPlan);
code = optimizeQueryPlan(pLogicPlan);
if (TSDB_CODE_SUCCESS != code) {
destroyQueryPlan(logicPlan);
destroyQueryPlan(pLogicPlan);
return code;
}
code = createDag(logicPlan, NULL, pDag, requestId);
code = createDag(pLogicPlan, NULL, pDag, requestId);
if (TSDB_CODE_SUCCESS != code) {
destroyQueryPlan(logicPlan);
destroyQueryPlan(pLogicPlan);
qDestroyQueryDag(*pDag);
return code;
}
destroyQueryPlan(logicPlan);
destroyQueryPlan(pLogicPlan);
return TSDB_CODE_SUCCESS;
}
......
......@@ -62,6 +62,9 @@ protected:
}
SQueryDag* dag = nullptr;
uint64_t requestId = 20;
SSchema *schema = NULL;
uint32_t numOfOutput = 0;
code = qCreateQueryDag(query, &dag, requestId);
dag_.reset(dag);
return code;
......
aux_source_directory(src QWORKER_SRC)
add_library(qworker ${QWORKER_SRC})
#add_library(qworker ${QWORKER_SRC})
#target_include_directories(
# qworker
# PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/qworker"
# PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
#)
#
#target_link_libraries(
# qworker
# PRIVATE os util transport planner qcom executor
#)
add_library(qworker STATIC ${QWORKER_SRC})
target_include_directories(
qworker
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/qworker"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
qworker
PRIVATE os util transport planner qcom
)
#set_target_properties(qworker PROPERTIES
# IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libqworker.a"
# INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/qworker"
# )
target_link_libraries(qworker
PRIVATE os util transport planner qcom executor
)
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
......
......@@ -67,15 +67,17 @@ typedef struct SQWTaskStatus {
bool drop;
} SQWTaskStatus;
typedef struct SQWorkerResCache {
SRWLatch lock;
void *data;
} SQWorkerResCache;
typedef struct SQWorkerTaskHandlesCache {
SRWLatch lock;
bool needRsp;
qTaskInfo_t taskHandle;
DataSinkHandle sinkHandle;
} SQWorkerTaskHandlesCache;
typedef struct SQWSchStatus {
int32_t lastAccessTs; // timestamp in second
SRWLatch tasksLock;
SHashObj *tasksHash; // key:queryId+taskId, value: SQWorkerTaskStatus
SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus
} SQWSchStatus;
// Qnode/Vnode level task management
......@@ -83,7 +85,7 @@ typedef struct SQWorkerMgmt {
SQWorkerCfg cfg;
SRWLatch schLock;
SRWLatch resLock;
SHashObj *schHash; //key: schedulerId, value: SQWorkerSchStatus
SHashObj *schHash; //key: schedulerId, value: SQWSchStatus
SHashObj *resHash; //key: queryId+taskId, value: SQWorkerResCache
} SQWorkerMgmt;
......
#include "qworker.h"
#include "tname.h"
#include <common.h>
#include "executor.h"
#include "planner.h"
#include "query.h"
#include "qworkerInt.h"
#include "tmsg.h"
#include "tname.h"
#include "dataSinkMgt.h"
int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) {
int32_t code = 0;
......@@ -89,15 +92,16 @@ int32_t qwUpdateTaskInfo(SQWTaskStatus *task, int8_t type, void *data) {
return TSDB_CODE_SUCCESS;
}
int32_t qwAddTaskResCache(SQWorkerMgmt *mgmt, uint64_t qId, uint64_t tId, void *data) {
int32_t qwAddTaskHandlesToCache(SQWorkerMgmt *mgmt, uint64_t qId, uint64_t tId, qTaskInfo_t taskHandle, DataSinkHandle sinkHandle) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
SQWorkerResCache resCache = {0};
resCache.data = data;
SQWorkerTaskHandlesCache resCache = {0};
resCache.taskHandle = taskHandle;
resCache.sinkHandle = sinkHandle;
QW_LOCK(QW_WRITE, &mgmt->resLock);
if (0 != taosHashPut(mgmt->resHash, id, sizeof(id), &resCache, sizeof(SQWorkerResCache))) {
if (0 != taosHashPut(mgmt->resHash, id, sizeof(id), &resCache, sizeof(SQWorkerTaskHandlesCache))) {
QW_UNLOCK(QW_WRITE, &mgmt->resLock);
qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", qId, tId);
return TSDB_CODE_QRY_APP_ERROR;
......@@ -246,13 +250,13 @@ static int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
QW_RET(code);
}
static FORCE_INLINE int32_t qwAcquireTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWorkerResCache **res) {
static FORCE_INLINE int32_t qwAcquireTaskHandles(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWorkerTaskHandlesCache **handles) {
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
QW_LOCK(rwType, &mgmt->resLock);
*res = taosHashGet(mgmt->resHash, id, sizeof(id));
if (NULL == (*res)) {
*handles = taosHashGet(mgmt->resHash, id, sizeof(id));
if (NULL == (*handles)) {
QW_UNLOCK(rwType, &mgmt->resLock);
return TSDB_CODE_QRY_RES_CACHE_NOT_EXIST;
}
......@@ -602,19 +606,36 @@ int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) {
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, void *data) {
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
int32_t qwInitFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
int32_t msgSize = sizeof(SRetrieveTableRsp) + length;
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(msgSize);
if (NULL == pRsp) {
qError("rpcMallocCont %d failed", msgSize);
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
//TODO fill msg
pRsp->completed = true;
*rsp = pRsp;
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
if (NULL == pRsp) {
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
dataLength = 0;
}
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = 0,
.contLen = sizeof(*pRsp) + dataLength,
.code = code,
};
rpcSendResponse(&rpcRsp);
......@@ -847,62 +868,6 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI
return TSDB_CODE_SUCCESS;
}
int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
int32_t needRsp = true;
void *data = NULL;
QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
QW_LOCK(QW_READ, &task->lock);
if (task->cancel || task->drop) {
qError("task is already cancelled or dropped");
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
if (task->status != JOB_TASK_STATUS_EXECUTING && task->status != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
qError("invalid status %d for fetch", task->status);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
if (QW_GOT_RES_DATA(res->data)) {
data = res->data;
if (QW_LOW_RES_DATA(res->data)) {
if (task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
//TODO add query back to queue
}
}
} else {
if (task->status != JOB_TASK_STATUS_EXECUTING) {
qError("invalid status %d for fetch without res", task->status);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
//TODO SET FLAG FOR QUERY TO SEND RSP WHEN RES READY
needRsp = false;
}
_return:
if (task) {
QW_UNLOCK(QW_READ, &task->lock);
qwReleaseTask(QW_READ, sch);
}
if (sch) {
qwReleaseScheduler(QW_READ, mgmt);
}
if (needRsp) {
qwBuildAndSendFetchRsp(pMsg, res->data);
}
QW_RET(code);
}
int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status, int32_t errCode) {
SQWSchStatus *sch = NULL;
......@@ -956,6 +921,113 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
return TSDB_CODE_SUCCESS;
}
int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
int32_t needRsp = true;
void *data = NULL;
int32_t sinkStatus = 0;
int32_t dataLength = 0;
SRetrieveTableRsp *rsp = NULL;
bool queryEnd = false;
SQWorkerTaskHandlesCache *handles = NULL;
QW_ERR_JRET(qwAcquireTaskHandles(QW_READ, mgmt, queryId, taskId, &handles));
QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
QW_LOCK(QW_READ, &task->lock);
if (task->cancel || task->drop) {
qError("task is already cancelled or dropped");
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
if (task->status != JOB_TASK_STATUS_EXECUTING && task->status != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
qError("invalid status %d for fetch", task->status);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
dsGetDataLength(handles->sinkHandle, &dataLength, &queryEnd);
if (dataLength > 0) {
SOutputData output = {0};
QW_ERR_JRET(qwInitFetchRsp(dataLength, &rsp));
output.pData = rsp->data;
code = dsGetDataBlock(handles->sinkHandle, &output);
if (code) {
qError("dsGetDataBlock failed, code:%x", code);
QW_ERR_JRET(code);
}
rsp->useconds = htobe64(output.useconds);
rsp->completed = 0;
rsp->precision = output.precision;
rsp->compressed = output.compressed;
rsp->compLen = htonl(dataLength);
rsp->numOfRows = htonl(output.numOfRows);
if (DS_BUF_EMPTY == output.bufStatus && output.queryEnd) {
rsp->completed = 1;
}
if (output.needSchedule) {
//TODO
}
if ((!output.queryEnd) && DS_BUF_LOW == output.bufStatus) {
//TODO
//UPDATE STATUS TO EXECUTING
}
} else {
if (dataLength < 0) {
qError("invalid length from dsGetDataLength, length:%d", dataLength);
QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (queryEnd) {
QW_ERR_JRET(qwQueryPostProcess(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_SUCCEED, code));
} else {
if (task->status != JOB_TASK_STATUS_EXECUTING) {
qError("invalid status %d for fetch without res", task->status);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
QW_LOCK(QW_WRITE, &handles->lock);
handles->needRsp = true;
QW_UNLOCK(QW_WRITE, &handles->lock);
needRsp = false;
}
}
_return:
if (task) {
QW_UNLOCK(QW_READ, &task->lock);
qwReleaseTask(QW_READ, sch);
}
if (sch) {
qwReleaseScheduler(QW_READ, mgmt);
}
if (needRsp) {
qwBuildAndSendFetchRsp(pMsg, rsp, dataLength, code);
}
if (handles) {
qwReleaseTaskResCache(QW_READ, mgmt);
}
QW_RET(code);
}
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) {
SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt));
if (NULL == mgmt) {
......@@ -1008,10 +1080,9 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
msg->taskId = htobe64(msg->taskId);
msg->contentLen = ntohl(msg->contentLen);
bool queryDone = false;
bool queryRsped = false;
bool needStop = false;
SSubplan *plan = NULL;
struct SSubplan *plan = NULL;
QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &needStop));
if (needStop) {
......@@ -1025,11 +1096,10 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_JRET(code);
}
//TODO call executer to init subquery
code = 0; // return error directly
//TODO call executer to init subquery
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(node, 0, (struct SSubplan *)plan, &pTaskInfo);
if (code) {
qError("qCreateExecTask failed, code:%x", code);
QW_ERR_JRET(code);
} else {
QW_ERR_JRET(qwAddTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING, QW_EXIST_RET_ERR, NULL, NULL));
......@@ -1038,18 +1108,15 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS));
queryRsped = true;
//TODO call executer to execute subquery
code = 0;
void *data = NULL;
queryDone = false;
//TODO call executer to execute subquery
DataSinkHandle sinkHandle = NULL;
code = qExecTask(pTaskInfo, &sinkHandle);
if (code) {
qError("qExecTask failed, code:%x", code);
QW_ERR_JRET(code);
} else {
QW_ERR_JRET(qwAddTaskResCache(qWorkerMgmt, msg->queryId, msg->taskId, data));
QW_ERR_JRET(qwAddTaskHandlesToCache(qWorkerMgmt, msg->queryId, msg->taskId, pTaskInfo, sinkHandle));
QW_ERR_JRET(qwUpdateTaskStatus(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED));
}
......@@ -1064,8 +1131,6 @@ _return:
int8_t status = 0;
if (TSDB_CODE_SUCCESS != code) {
status = JOB_TASK_STATUS_FAILED;
} else if (queryDone) {
status = JOB_TASK_STATUS_SUCCEED;
} else {
status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
}
......@@ -1075,6 +1140,49 @@ _return:
QW_RET(code);
}
int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0;
int8_t status = 0;
bool queryDone = false;
uint64_t sId, qId, tId;
//TODO call executer to continue execute subquery
code = 0;
void *data = NULL;
queryDone = false;
//TODO call executer to continue execute subquery
if (TSDB_CODE_SUCCESS != code) {
status = JOB_TASK_STATUS_FAILED;
} else if (queryDone) {
status = JOB_TASK_STATUS_SUCCEED;
} else {
status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
}
code = qwQueryPostProcess(qWorkerMgmt, sId, qId, tId, status, code);
QW_RET(code);
}
int32_t qWorkerProcessSinkDataMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SSinkDataReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid sink data msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
//TODO
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
......@@ -1137,17 +1245,10 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->sId));
void *data = NULL;
SQWorkerResCache *res = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireTaskResCache(QW_READ, qWorkerMgmt, msg->queryId, msg->taskId, &res));
QW_ERR_JRET(qwHandleFetch(res, qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg));
_return:
QW_ERR_RET(qwHandleFetch(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg));
qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
QW_RET(code);
}
......@@ -1220,31 +1321,6 @@ int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg)
QW_ERR_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq));
}
int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0;
int8_t status = 0;
bool queryDone = false;
uint64_t sId, qId, tId;
//TODO call executer to continue execute subquery
code = 0;
void *data = NULL;
queryDone = false;
//TODO call executer to continue execute subquery
if (TSDB_CODE_SUCCESS != code) {
status = JOB_TASK_STATUS_FAILED;
} else if (queryDone) {
status = JOB_TASK_STATUS_SUCCEED;
} else {
status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
}
code = qwQueryPostProcess(qWorkerMgmt, sId, qId, tId, status, code);
QW_RET(code);
}
void qWorkerDestroy(void **qWorkerMgmt) {
if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
return;
......
......@@ -58,7 +58,6 @@ typedef struct SSchLevel {
SArray *subTasks; // Element is SQueryTask
} SSchLevel;
typedef struct SSchTask {
uint64_t taskId; // task id
SRWLatch lock; // task lock
......
......@@ -68,7 +68,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) {
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING && SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%d", SCH_GET_TASK_STATUS(pTask), msgType);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
......@@ -87,68 +87,72 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
int32_t code = 0;
int8_t oriStatus = SCH_GET_JOB_STATUS(pJob);
int8_t oriStatus = 0;
/*
if (oriStatus == newStatus) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
switch (oriStatus) {
case JOB_TASK_STATUS_NULL:
if (newStatus != JOB_TASK_STATUS_EXECUTING
&& newStatus != JOB_TASK_STATUS_FAILED
&& newStatus != JOB_TASK_STATUS_NOT_START) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_NOT_START:
if (newStatus != JOB_TASK_STATUS_CANCELLED) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_EXECUTING:
if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED
&& newStatus != JOB_TASK_STATUS_FAILED
&& newStatus != JOB_TASK_STATUS_CANCELLING
&& newStatus != JOB_TASK_STATUS_CANCELLED
&& newStatus != JOB_TASK_STATUS_DROPPING) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_PARTIAL_SUCCEED:
if (newStatus != JOB_TASK_STATUS_EXECUTING
&& newStatus != JOB_TASK_STATUS_SUCCEED
&& newStatus != JOB_TASK_STATUS_CANCELLED) {
while (true) {
oriStatus = SCH_GET_JOB_STATUS(pJob);
if (oriStatus == newStatus) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
switch (oriStatus) {
case JOB_TASK_STATUS_NULL:
if (newStatus != JOB_TASK_STATUS_NOT_START) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_NOT_START:
if (newStatus != JOB_TASK_STATUS_EXECUTING) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_EXECUTING:
if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED
&& newStatus != JOB_TASK_STATUS_FAILED
&& newStatus != JOB_TASK_STATUS_CANCELLING
&& newStatus != JOB_TASK_STATUS_CANCELLED
&& newStatus != JOB_TASK_STATUS_DROPPING) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_PARTIAL_SUCCEED:
if (newStatus != JOB_TASK_STATUS_FAILED
&& newStatus != JOB_TASK_STATUS_SUCCEED
&& newStatus != JOB_TASK_STATUS_DROPPING) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_SUCCEED:
case JOB_TASK_STATUS_FAILED:
case JOB_TASK_STATUS_CANCELLING:
if (newStatus != JOB_TASK_STATUS_DROPPING) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_CANCELLED:
case JOB_TASK_STATUS_DROPPING:
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_SUCCEED:
case JOB_TASK_STATUS_FAILED:
case JOB_TASK_STATUS_CANCELLING:
if (newStatus != JOB_TASK_STATUS_CANCELLED) {
break;
default:
SCH_JOB_ELOG("invalid job status:%d", oriStatus);
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_CANCELLED:
case JOB_TASK_STATUS_DROPPING:
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
break;
default:
qError("invalid task status:%d", oriStatus);
return TSDB_CODE_QRY_APP_ERROR;
}
*/
}
SCH_SET_JOB_STATUS(pJob, newStatus);
if (oriStatus != atomic_val_compare_exchange_8(&pJob->status, oriStatus, newStatus)) {
continue;
}
SCH_JOB_DLOG("status updated from %d to %d", oriStatus, newStatus);
SCH_JOB_DLOG("job status updated from %d to %d", oriStatus, newStatus);
break;
}
return TSDB_CODE_SUCCESS;
......@@ -265,7 +269,6 @@ int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *ad
int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
int32_t code = 0;
pJob->queryId = pDag->queryId;
if (pDag->numOfSubplans <= 0) {
......@@ -363,7 +366,6 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));
_return:
if (planToTask) {
taosHashCleanup(planToTask);
}
......@@ -495,6 +497,33 @@ int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
return TSDB_CODE_SUCCESS;
}
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask));
}
int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
if (0 != code) {
if (HASH_NODE_EXIST(code)) {
*moved = true;
SCH_TASK_ELOG("task already in execTask list, status:%d", SCH_GET_TASK_STATUS(pTask));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
*moved = true;
SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
return TSDB_CODE_SUCCESS;
}
int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) {
// TODO set retry or not based on task type/errCode/retry times/job status/available eps...
// TODO if needRetry, set task retry info
......@@ -507,6 +536,28 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b
}
// Note: no more error processing, handled in function internal
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
// if already FAILED, no more processing
SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_FAILED));
if (errCode) {
atomic_store_32(&pJob->errCode, errCode);
}
if (atomic_load_8(&pJob->userFetch) || ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule)) {
tsem_post(&pJob->rspSem);
}
SCH_ERR_RET(atomic_load_32(&pJob->errCode));
assert(0);
}
// Note: no more error processing, handled in function internal
int32_t schFetchFromRemote(SSchJob *pJob) {
int32_t code = 0;
......@@ -515,7 +566,13 @@ int32_t schFetchFromRemote(SSchJob *pJob) {
return TSDB_CODE_SUCCESS;
}
if (atomic_load_ptr(&pJob->res))
void *res = atomic_load_ptr(&pJob->res);
if (res) {
atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
SCH_JOB_DLOG("res already fetched, res:%p", res);
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH));
......@@ -525,25 +582,9 @@ _return:
atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
return code;
}
schProcessOnJobFailure(pJob, code);
// Note: no more error processing, handled in function internal
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_FAILED));
if (errCode) {
atomic_store_32(&pJob->errCode, errCode);
}
if (atomic_load_8(&pJob->userFetch) || ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule)) {
tsem_post(&pJob->rspSem);
}
SCH_ERR_RET(atomic_load_32(&pJob->errCode));
assert(0);
return code;
}
......@@ -643,7 +684,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(code);
}
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);
SCH_ERR_JRET(schRecordTaskSucceedNode(pTask));
......@@ -675,6 +716,11 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
}
pJob->fetchTask = pTask;
code = schMoveTaskToExecList(pJob, pTask, &moved);
if (code && moved) {
SCH_ERR_RET(code);
}
SCH_ERR_RET(schProcessOnJobPartialSuccess(pJob));
......@@ -813,7 +859,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId));
if (NULL == job || NULL == (*job)) {
qError("QID:%"PRIx64" taosHashGet queryId not exist", pParam->queryId);
qError("QID:%"PRIx64" taosHashGet queryId not exist, may be dropped", pParam->queryId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
......@@ -822,7 +868,10 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
atomic_add_fetch_32(&pJob->ref, 1);
int32_t s = taosHashGetSize(pJob->execTasks);
assert(s != 0);
if (s <= 0) {
qError("QID:%"PRIx64",TID:%"PRIx64" no task in execTask list", pParam->queryId, pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId));
if (NULL == task || NULL == (*task)) {
......@@ -1147,7 +1196,7 @@ void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) {
int32_t size = (int32_t)taosArrayGetSize(pTask->execAddrs);
if (size <= 0) {
SCH_TASK_DLOG("empty exec address, status:%d", SCH_GET_TASK_STATUS(pTask));
SCH_TASK_DLOG("task has no exec address, no need to drop it, status:%d", SCH_GET_TASK_STATUS(pTask));
return;
}
......@@ -1157,6 +1206,8 @@ void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) {
schBuildAndSendMsg(pJob, pTask, addr, TDMT_VND_DROP_TASK);
}
SCH_TASK_DLOG("task has %d exec address", size);
}
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
......@@ -1178,7 +1229,7 @@ void schDropJobAllTasks(SSchJob *pJob) {
schDropTaskInHashList(pJob, pJob->failTasks);
}
int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** job, bool syncSchedule) {
int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** job, bool syncSchedule) {
if (nodeList && taosArrayGetSize(nodeList) <= 0) {
qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId);
}
......@@ -1291,14 +1342,14 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
return TSDB_CODE_SUCCESS;
}
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes) {
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchJob *job = NULL;
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, (void **)&job, true));
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, &job, true));
*pJob = job;
......@@ -1308,14 +1359,14 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void
return TSDB_CODE_SUCCESS;
}
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob) {
if (NULL == transport || /*NULL == nodeList || */NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchJob *job = NULL;
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, (void **)&job, false));
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, &job, false));
*pJob = job;
......@@ -1323,14 +1374,18 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
}
int32_t scheduleFetchRows(void *job, void **data) {
if (NULL == job || NULL == data) {
int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
if (NULL == pJob || NULL == pData) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchJob *pJob = job;
int32_t code = 0;
int8_t status = SCH_GET_JOB_STATUS(pJob);
if (status == JOB_TASK_STATUS_DROPPING) {
SCH_JOB_ELOG("job is dropping, status:%d", status);
return TSDB_CODE_SCH_STATUS_ERROR;
}
atomic_add_fetch_32(&pJob->ref, 1);
if (!SCH_JOB_NEED_FETCH(&pJob->attr)) {
......@@ -1345,14 +1400,12 @@ int32_t scheduleFetchRows(void *job, void **data) {
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
int8_t status = SCH_GET_JOB_STATUS(pJob);
if (status == JOB_TASK_STATUS_FAILED) {
*data = atomic_load_ptr(&pJob->res);
*pData = atomic_load_ptr(&pJob->res);
atomic_store_ptr(&pJob->res, NULL);
SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
} else if (status == JOB_TASK_STATUS_SUCCEED) {
*data = atomic_load_ptr(&pJob->res);
*pData = atomic_load_ptr(&pJob->res);
atomic_store_ptr(&pJob->res, NULL);
goto _return;
} else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
......@@ -1365,6 +1418,7 @@ int32_t scheduleFetchRows(void *job, void **data) {
if (status == JOB_TASK_STATUS_FAILED) {
code = atomic_load_32(&pJob->errCode);
SCH_ERR_JRET(code);
}
if (pJob->res && ((SRetrieveTableRsp *)pJob->res)->completed) {
......@@ -1372,9 +1426,9 @@ int32_t scheduleFetchRows(void *job, void **data) {
}
while (true) {
*data = atomic_load_ptr(&pJob->res);
*pData = atomic_load_ptr(&pJob->res);
if (*data != atomic_val_compare_exchange_ptr(&pJob->res, *data, NULL)) {
if (*pData != atomic_val_compare_exchange_ptr(&pJob->res, *pData, NULL)) {
continue;
}
......@@ -1414,6 +1468,10 @@ void scheduleFreeJob(void *job) {
return;
}
schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING);
SCH_JOB_DLOG("job removed from list, no further ref, ref:%d", atomic_load_32(&pJob->ref));
while (true) {
int32_t ref = atomic_load_32(&pJob->ref);
if (0 == ref) {
......@@ -1425,6 +1483,8 @@ void scheduleFreeJob(void *job) {
}
}
SCH_JOB_DLOG("job no ref now, status:%d", SCH_GET_JOB_STATUS(pJob));
if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
schCancelJob(pJob);
}
......@@ -1446,21 +1506,21 @@ void scheduleFreeJob(void *job) {
taosArrayDestroy(pLevel->subTasks);
}
taosHashCleanup(pJob->execTasks);
taosHashCleanup(pJob->failTasks);
taosHashCleanup(pJob->succTasks);
taosArrayDestroy(pJob->levels);
taosHashCleanup(pJob->execTasks);
taosHashCleanup(pJob->failTasks);
taosHashCleanup(pJob->succTasks);
tfree(pJob->res);
tfree(pJob);
}
taosArrayDestroy(pJob->levels);
tfree(pJob->res);
void schedulerDestroy(void) {
if (schMgmt.jobs) {
taosHashCleanup(schMgmt.jobs); //TODO
schMgmt.jobs = NULL;
}
tfree(pJob);
}
void schedulerDestroy(void) {
if (schMgmt.jobs) {
taosHashCleanup(schMgmt.jobs); //TODO
schMgmt.jobs = NULL;
}
}
......@@ -38,52 +38,73 @@ namespace {
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
void schtInitLogFile() {
const char *defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10;
tsAsyncLog = 0;
qDebugFlag = 159;
char temp[128] = {0};
sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix);
if (taosInitLog(temp, tsNumOfLogLines, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
}
}
void schtBuildQueryDag(SQueryDag *dag) {
uint64_t qId = 0x0000000000000001;
dag->queryId = qId;
dag->numOfSubplans = 2;
dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES);
SArray *scan = taosArrayInit(1, sizeof(SSubplan));
SArray *merge = taosArrayInit(1, sizeof(SSubplan));
SArray *scan = taosArrayInit(1, POINTER_BYTES);
SArray *merge = taosArrayInit(1, POINTER_BYTES);
SSubplan scanPlan = {0};
SSubplan mergePlan = {0};
scanPlan.id.queryId = qId;
scanPlan.id.templateId = 0x0000000000000002;
scanPlan.id.subplanId = 0x0000000000000003;
scanPlan.type = QUERY_TYPE_SCAN;
scanPlan.execNode.numOfEps = 1;
scanPlan.execNode.nodeId = 1;
scanPlan.execNode.inUse = 0;
scanPlan.execNode.epAddr[0].port = 6030;
strcpy(scanPlan.execNode.epAddr[0].fqdn, "ep0");
scanPlan.pChildren = NULL;
scanPlan.level = 1;
scanPlan.pParents = taosArrayInit(1, POINTER_BYTES);
scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
mergePlan.id.queryId = qId;
mergePlan.id.templateId = 0x4444444444;
mergePlan.id.subplanId = 0x5555555555;
mergePlan.type = QUERY_TYPE_MERGE;
mergePlan.level = 0;
mergePlan.execNode.numOfEps = 0;
mergePlan.pChildren = taosArrayInit(1, POINTER_BYTES);
mergePlan.pParents = NULL;
mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
SSubplan *scanPlan = (SSubplan *)calloc(1, sizeof(SSubplan));
SSubplan *mergePlan = (SSubplan *)calloc(1, sizeof(SSubplan));
scanPlan->id.queryId = qId;
scanPlan->id.templateId = 0x0000000000000002;
scanPlan->id.subplanId = 0x0000000000000003;
scanPlan->type = QUERY_TYPE_SCAN;
scanPlan->execNode.numOfEps = 1;
scanPlan->execNode.nodeId = 1;
scanPlan->execNode.inUse = 0;
scanPlan->execNode.epAddr[0].port = 6030;
strcpy(scanPlan->execNode.epAddr[0].fqdn, "ep0");
scanPlan->pChildren = NULL;
scanPlan->level = 1;
scanPlan->pParents = taosArrayInit(1, POINTER_BYTES);
scanPlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
mergePlan->id.queryId = qId;
mergePlan->id.templateId = 0x4444444444;
mergePlan->id.subplanId = 0x5555555555;
mergePlan->type = QUERY_TYPE_MERGE;
mergePlan->level = 0;
mergePlan->execNode.numOfEps = 0;
mergePlan->pChildren = taosArrayInit(1, POINTER_BYTES);
mergePlan->pParents = NULL;
mergePlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);
taosArrayPush(mergePointer->pChildren, &scanPointer);
taosArrayPush(scanPointer->pParents, &mergePointer);
taosArrayPush(mergePlan->pChildren, &scanPlan);
taosArrayPush(scanPlan->pParents, &mergePlan);
taosArrayPush(dag->pSubplans, &merge);
taosArrayPush(dag->pSubplans, &scan);
}
void schtFreeQueryDag(SQueryDag *dag) {
}
void schtBuildInsertDag(SQueryDag *dag) {
uint64_t qId = 0x0000000000000002;
......@@ -138,8 +159,8 @@ int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
return 0;
}
int32_t schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
return 0;
void schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
}
......@@ -196,7 +217,7 @@ void *schtSendRsp(void *param) {
return NULL;
}
void *pInsertJob = NULL;
struct SSchJob *pInsertJob = NULL;
}
......@@ -207,8 +228,11 @@ TEST(queryTest, normalCase) {
char *dbname = "1.db1";
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
void *pJob = NULL;
SSchJob *pJob = NULL;
SQueryDag dag = {0};
schtInitLogFile();
SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));
SEpAddr qnodeAddr = {0};
......@@ -295,6 +319,8 @@ TEST(queryTest, normalCase) {
ASSERT_EQ(data, (void*)NULL);
scheduleFreeJob(pJob);
schtFreeQueryDag(&dag);
}
......@@ -308,6 +334,9 @@ TEST(insertTest, normalCase) {
SVgroupInfo vgInfo = {0};
SQueryDag dag = {0};
uint64_t numOfRows = 0;
schtInitLogFile();
SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));
SEpAddr qnodeAddr = {0};
......@@ -336,7 +365,11 @@ TEST(insertTest, normalCase) {
scheduleFreeJob(pInsertJob);
}
TEST(multiThread, forceFree) {
schtInitLogFile();
}
int main(int argc, char** argv) {
......
......@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "cJSON.h"
#include "os.h"
#include "taoserror.h"
......@@ -33,6 +32,24 @@ static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
}
void* tmemmem(char* haystack, int hlen, char* needle, int nlen) {
char* limit;
if (nlen == 0 || hlen < nlen) {
return false;
}
limit = haystack + hlen - nlen + 1;
while ((haystack = (char*)memchr(
haystack, needle[0], limit - haystack)) != NULL) {
if (memcmp(haystack, needle, nlen) == 0) {
return haystack;
}
haystack++;
}
return NULL;
}
static inline int64_t walScanLogGetLastVer(SWal* pWal) {
ASSERT(pWal->fileInfoSet != NULL);
int sz = taosArrayGetSize(pWal->fileInfoSet);
......@@ -47,7 +64,8 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) {
struct stat statbuf;
stat(fnameStr, &statbuf);
int readSize = MIN(WAL_MAX_SIZE, statbuf.st_size);
int readSize = MIN(WAL_MAX_SIZE + 2, statbuf.st_size);
pLastFileInfo->fileSize = statbuf.st_size;
FileFd fd = taosOpenFileRead(fnameStr);
if (fd < 0) {
......@@ -64,6 +82,7 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) {
return -1;
}
taosLSeekFile(fd, -readSize, SEEK_END);
if (readSize != taosReadFile(fd, buf, readSize)) {
free(buf);
taosCloseFile(fd);
......@@ -71,21 +90,25 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) {
return -1;
}
char* found = strstr(buf, (const char*)&magic);
if (found == NULL) {
ASSERT(false);
// file has to be deleted
free(buf);
taosCloseFile(fd);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
char *another;
while((another = strstr(found + 1, (const char*)&magic)) != NULL) {
char* haystack = buf;
char* found = NULL;
char *candidate;
while((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) {
// read and validate
SWalHead *logContent = (SWalHead*)another;
SWalHead *logContent = (SWalHead*)candidate;
if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) {
found = another;
found = candidate;
}
haystack = candidate + 1;
}
if (found == buf) {
SWalHead *logContent = (SWalHead*)found;
if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) {
// file has to be deleted
free(buf);
taosCloseFile(fd);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
}
taosCloseFile(fd);
......@@ -120,7 +143,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
SWalFileInfo fileInfo;
memset(&fileInfo, -1, sizeof(SWalFileInfo));
sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer);
FileFd fd = taosOpenFileRead(ent->d_name);
//get lastVer
//get size
taosArrayPush(pLogInfoArray, &fileInfo);
......@@ -137,28 +159,25 @@ int walCheckAndRepairMeta(SWal* pWal) {
}
int newSz = taosArrayGetSize(pLogInfoArray);
// case 1. meta file not exist / cannot be parsed
if (pWal->fileInfoSet == NULL && newSz != 0) {
// recover fileInfo set
pWal->fileInfoSet = pLogInfoArray;
if (newSz != 0) {
// recover meta version
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pLogInfoArray, 0))->firstVer;
pWal->writeCur = newSz - 1;
}
// recover file size
} else if (oldSz < newSz) {
if (oldSz < newSz) {
for (int i = oldSz; i < newSz; i++) {
SWalFileInfo *pFileInfo = taosArrayGet(pLogInfoArray, i);
taosArrayPush(pWal->fileInfoSet, pFileInfo);
}
pWal->writeCur = newSz - 1;
}
if (pWal->fileInfoSet && taosArrayGetSize(pWal->fileInfoSet) != 0) {
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pLogInfoArray, 0))->firstVer;
pWal->vers.lastVer = walScanLogGetLastVer(pWal);
((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = pWal->vers.lastVer;
ASSERT(pWal->vers.lastVer != -1);
}
int code = walSaveMeta(pWal);
if (code < 0) {
taosArrayDestroy(pLogInfoArray);
return -1;
}
}
// case 2. versions in meta not match log
// or some log not included in meta
// (e.g. program killed)
......@@ -182,14 +201,11 @@ int walCheckAndRepairMeta(SWal* pWal) {
}
#endif
int code = walSaveMeta(pWal);
if (code < 0) {
return -1;
}
// get last version of this file
//
// rebuild meta
taosArrayDestroy(pLogInfoArray);
return 0;
}
......@@ -397,6 +413,10 @@ int walLoadMeta(SWal* pWal) {
}
memset(buf, 0, size + 5);
FileFd fd = taosOpenFileRead(fnameStr);
if (fd < 0) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
if (taosReadFile(fd, buf, size) != size) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(fd);
......
......@@ -106,6 +106,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
// init write buffer
memset(&pWal->writeHead, 0, sizeof(SWalHead));
pWal->writeHead.head.headVer = WAL_HEAD_VER;
pWal->writeHead.magic = WAL_MAGIC;
if (pthread_mutex_init(&pWal->mutex, NULL) < 0) {
taosArrayDestroy(pWal->fileInfoSet);
......@@ -121,7 +122,9 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
return NULL;
}
if (walLoadMeta(pWal) < 0 && walCheckAndRepairMeta(pWal) < 0) {
walLoadMeta(pWal);
if (walCheckAndRepairMeta(pWal) < 0) {
taosRemoveRef(tsWal.refSetId, pWal->refId);
pthread_mutex_destroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet);
......@@ -130,6 +133,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
}
if (walCheckAndRepairIdx(pWal) < 0) {
}
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,
......
......@@ -340,7 +340,10 @@ TEST_F(WalRetentionEnv, repairMeta1) {
char buf[100];
sprintf(buf, "%s/meta-ver%d", pathName, 0);
remove(buf);
sprintf(buf, "%s/meta-ver%d", pathName, 1);
remove(buf);
SetUp();
//getchar();
ASSERT_EQ(pWal->vers.lastVer, 99);
......@@ -377,4 +380,26 @@ TEST_F(WalRetentionEnv, repairMeta1) {
ASSERT_EQ(code, 0);
}
for (int i = 0; i < 1000; i++) {
int ver = rand() % 200;
code = walReadWithHandle(pRead, ver);
ASSERT_EQ(code, 0);
// printf("rrbody: \n");
// for(int i = 0; i < pRead->pHead->head.len; i++) {
// printf("%d ", pRead->pHead->head.body[i]);
//}
// printf("\n");
ASSERT_EQ(pRead->pHead->head.version, ver);
ASSERT_EQ(pRead->curVersion, ver + 1);
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, ver);
int len = strlen(newStr);
ASSERT_EQ(pRead->pHead->head.len, len);
for (int j = 0; j < len; j++) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
}
}
}
......@@ -1087,7 +1087,7 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
return 0;
}
return -1;
return 0;
}
char *taosGetCmdlineByPID(int pid) {
......
......@@ -274,7 +274,7 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef);
bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle);
bool isTsdbCacheLastRow(TsdbQueryHandleT* pTsdbReadHandle);
/**
......@@ -308,19 +308,19 @@ int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle);
/**
* move to next block if exists
*
* @param pQueryHandle
* @param pTsdbReadHandle
* @return
*/
bool tsdbNextDataBlock(TsdbQueryHandleT pQueryHandle);
bool tsdbNextDataBlock(TsdbQueryHandleT pTsdbReadHandle);
/**
* Get current data block information
*
* @param pQueryHandle
* @param pTsdbReadHandle
* @param pBlockInfo
* @return
*/
void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo *pBlockInfo);
void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
/**
*
......@@ -332,7 +332,7 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo *p
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* @return
*/
int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataStatis **pBlockStatis);
int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pTsdbReadHandle, SDataStatis **pBlockStatis);
/**
*
......@@ -340,11 +340,11 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataSta
* the returned data block must be satisfied with the time window condition in any cases,
* which means the SData data block is not actually the completed disk data blocks.
*
* @param pQueryHandle query handle
* @param pTsdbReadHandle query handle
* @param pColumnIdList required data columns id list
* @return
*/
SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdList);
SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pTsdbReadHandle, SArray *pColumnIdList);
/**
* Get the qualified table id for a super table according to the tag query expression.
......
......@@ -284,7 +284,7 @@ typedef struct SQueryRuntimeEnv {
uint32_t status; // query status
void* qinfo;
uint8_t scanFlag; // denotes reversed scan of data or not
void* pQueryHandle;
void* pTsdbReadHandle;
int32_t prevGroupId; // previous executed group id
bool enableGroupData;
......@@ -418,7 +418,7 @@ typedef struct SQueryParam {
} SQueryParam;
typedef struct STableScanInfo {
void *pQueryHandle;
void *pTsdbReadHandle;
int32_t numOfBlocks;
int32_t numOfSkipped;
int32_t numOfBlockStatis;
......
此差异已折叠。
......@@ -85,7 +85,6 @@ if $data02 != 2 then
return -1
endi
return
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
system sh/exec.sh -n dnode1 -s start
......@@ -104,4 +103,4 @@ if $rows != 2 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册