提交 eed1e2f9 编写于 作者: D dapan

Merge remote-tracking branch 'origin/feature/2.0_wxy_temp' into feature/qnode2

......@@ -48,7 +48,7 @@ typedef struct SOutputData {
int8_t compressed;
char* pData;
bool queryEnd;
bool needSchedule;
int32_t scheduleJobNo;
int32_t bufStatus;
int64_t useconds;
int8_t precision;
......
......@@ -119,6 +119,25 @@ typedef struct SExchangePhyNode {
SArray *pSrcEndPoints; // SEpAddr, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhyNode;
typedef enum EAggAlgo {
AGG_ALGO_PLAIN = 1, // simple agg across all input rows
AGG_ALGO_SORTED, // grouped agg, input must be sorted
AGG_ALGO_HASHED // grouped agg, use internal hashtable
} EAggAlgo;
typedef enum EAggSplit {
AGG_SPLIT_PRE = 1, // first level agg, maybe don't need calculate the final result
AGG_SPLIT_FINAL // second level agg, must calculate the final result
} EAggSplit;
typedef struct SAggPhyNode {
SPhyNode node;
EAggAlgo aggAlgo; // algorithm used by agg operator
EAggSplit aggSplit; // distributed splitting mode
SArray *pExprs; // SExprInfo list, these are expression list of group_by_clause and parameter expression of aggregate function
SArray *pGroupByList; // SColIndex list, but these must be column node
} SAggPhyNode;
typedef struct SSubplanId {
uint64_t queryId;
uint64_t templateId;
......
......@@ -30,7 +30,7 @@ OP_ENUM_MACRO(TagScan)
OP_ENUM_MACRO(SystemTableScan)
OP_ENUM_MACRO(Aggregate)
OP_ENUM_MACRO(Project)
OP_ENUM_MACRO(Groupby)
// OP_ENUM_MACRO(Groupby)
OP_ENUM_MACRO(Limit)
OP_ENUM_MACRO(SLimit)
OP_ENUM_MACRO(TimeWindow)
......
......@@ -55,7 +55,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessDataSinkMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
......
......@@ -293,7 +293,7 @@ int32_t dndInit(const SDnodeEnvCfg *pCfg) {
if (vnodeInit(&vnodeOpt) != 0) {
dError("failed to init vnode since %s", terrstr());
dndCleanup();
return NULL;
return -1;
}
memcpy(&dndEnv.cfg, pCfg, sizeof(SDnodeEnvCfg));
......
......@@ -37,36 +37,18 @@ typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
typedef struct SVnodeCfg {
int32_t vgId;
SDnode *pDnode;
/** vnode buffer pool options */
struct {
/** write buffer size */
uint64_t wsize;
uint64_t ssize;
uint64_t lsize;
/** use heap allocator or arena allocator */
bool isHeapAllocator;
};
/** time to live of tables in this vnode */
uint32_t ttl;
/** data to keep in this vnode */
uint32_t keep;
/** if TS data is eventually consistency */
bool isWeak;
/** TSDB config */
STsdbCfg tsdbCfg;
/** META config */
SMetaCfg metaCfg;
/** TQ config */
STqCfg tqCfg;
/** WAL config */
SWalCfg walCfg;
} SVnodeCfg;
......
......@@ -30,12 +30,9 @@
#include "vnodeBufferPool.h"
#include "vnodeCfg.h"
#include "vnodeCommit.h"
#include "vnodeFS.h"
#include "vnodeMemAllocator.h"
#include "vnodeQuery.h"
#include "vnodeRequest.h"
#include "vnodeStateMgr.h"
#include "vnodeSync.h"
#ifdef __cplusplus
extern "C" {
......@@ -73,8 +70,6 @@ struct SVnode {
STsdb* pTsdb;
STQ* pTq;
SWal* pWal;
SVnodeSync* pSync;
SVnodeFS* pFs;
tsem_t canCommit;
SQHandle* pQuery;
SDnode* pDnode;
......@@ -84,6 +79,16 @@ int vnodeScheduleTask(SVnodeTask* task);
int32_t vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq);
// For Log
extern int32_t vDebugFlag;
#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0)
#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0)
#define vWarn(...) do { if (vDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} while(0)
#define vInfo(...) do { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} while(0)
#define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_MAF_H_
#define _TD_VNODE_MAF_H_
#include "vnode.h"
#ifdef __cplusplus
extern "C" {
#endif
int vnodeOpenMAF(SVnode *pVnode);
void vnodeCloseMAF(SVnode *pVnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_MAF_H_*/
\ No newline at end of file
......@@ -19,13 +19,12 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
#include "qworker.h"
#include "vnode.h"
typedef struct SQWorkerMgmt SQHandle;
int vnodeQueryOpen(SVnode *pVnode);
#ifdef __cplusplus
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_READ_H_
#define _TD_VNODE_READ_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
void vnodeProcessReadMsg(SVnode *pVnode, SVnodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_READ_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_REQUEST_H_
#define _TD_VNODE_REQUEST_H_
#include "vnode.h"
#ifdef __cplusplus
extern "C" {
#endif
// SVDropTbReq
// int vnodeBuildDropTableReq(void **buf, const SVDropTbReq *pReq);
// void *vnodeParseDropTableReq(void *buf, SVDropTbReq *pReq);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_REQUEST_H_*/
\ No newline at end of file
set(META_DB_IMPL_LIST "BDB" "SQLITE")
set(META_DB_IMPL "BDB" CACHE STRING "Use BDB as the default META implementation")
set_property(CACHE META_DB_IMPL PROPERTY STRINGS ${META_DB_IMPL_LIST})
if(META_DB_IMPL IN_LIST META_DB_IMPL_LIST)
message(STATUS "META DB Impl: ${META_DB_IMPL}==============")
else()
message(FATAL_ERROR "Invalid META DB IMPL: ${META_DB_IMPL}==============")
endif()
aux_source_directory(src META_SRC)
if(${META_DB_IMPL} STREQUAL "BDB")
list(REMOVE_ITEM META_SRC "src/metaSQLiteImpl.c")
elseif(${META_DB_IMPL} STREQUAL "SQLITE")
list(REMOVE_ITEM META_SRC "src/metaBDBImpl.c")
endif()
add_library(meta STATIC ${META_SRC})
target_include_directories(
meta
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/vnode/meta"
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/index"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
meta
PUBLIC common
PUBLIC index
)
if(${META_DB_IMPL} STREQUAL "BDB")
target_link_libraries(
meta
PUBLIC bdb
)
elseif(${META_DB_IMPL} STREQUAL "SQLITE")
target_link_libraries(
meta
PUBLIC sqlite
)
endif()
if(${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
aux_source_directory(src TQ_SRC)
add_library(tq ${TQ_SRC})
target_include_directories(
tq
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/vnode/tq"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
tq
PUBLIC wal
PUBLIC os
PUBLIC util
PUBLIC common
PUBLIC transport
)
if(${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
aux_source_directory(src TSDB_SRC)
if(0)
add_library(tsdb ${TSDB_SRC})
else(0)
add_library(tsdb STATIC "")
target_sources(tsdb
PRIVATE
"src/tsdbCommit.c"
"src/tsdbMain.c"
"src/tsdbMemTable.c"
"src/tsdbOptions.c"
"src/tsdbWrite.c"
"src/tsdbReadImpl.c"
"src/tsdbFile.c"
"src/tsdbFS.c"
"src/tsdbRead.c"
)
endif(0)
target_include_directories(
tsdb
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/vnode/tsdb"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
tsdb
PUBLIC os
PUBLIC util
PUBLIC common
PUBLIC tkv
PUBLIC tfs
PUBLIC meta
)
\ No newline at end of file
aux_source_directory(src VNODE_SRC)
add_library(vnode STATIC ${VNODE_SRC})
target_include_directories(
vnode
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/vnode"
private "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
vnode
PUBLIC os
PUBLIC transport
PUBLIC meta
PUBLIC tq
PUBLIC tsdb
PUBLIC wal
PUBLIC sync
PUBLIC cjson
PUBLIC qworker
)
# test
if(${BUILD_TEST})
# add_subdirectory(test)
endif(${BUILD_TEST})
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeDef.h"
\ No newline at end of file
......@@ -41,7 +41,7 @@ int vnodeInit(const SVnodeOpt *pOption) {
for (uint16_t i = 0; i < pOption->nthreads; i++) {
pthread_create(&(vnodeMgr.threads[i]), NULL, loop, NULL);
pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread");
// pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread");
}
} else {
// TODO: if no commit thread is set, then another mechanism should be
......
......@@ -28,7 +28,7 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
case TDMT_VND_QUERY:
return qWorkerProcessQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
case TDMT_VND_QUERY_CONTINUE:
return qWorkerProcessQueryContinueMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
return qWorkerProcessCQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
case TDMT_VND_SCHEDULE_DATA_SINK:
return qWorkerProcessDataSinkMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
default:
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeDef.h"
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeDef.h"
#if 0
static int vnodeBuildCreateTableReq(void **buf, const SVCreateTableReq *pReq);
static void *vnodeParseCreateTableReq(void *buf, SVCreateTableReq *pReq);
int vnodeBuildReq(void **buf, const SVnodeReq *pReq, tmsg_t type) {
int tsize = 0;
tsize += taosEncodeFixedU64(buf, pReq->ver);
switch (type) {
case TDMT_VND_CREATE_STB:
tsize += vnodeBuildCreateTableReq(buf, &(pReq->ctReq));
break;
case TDMT_VND_SUBMIT:
/* code */
break;
default:
break;
}
/* TODO */
return tsize;
}
void *vnodeParseReq(void *buf, SVnodeReq *pReq, tmsg_t type) {
buf = taosDecodeFixedU64(buf, &(pReq->ver));
switch (type) {
case TDMT_VND_CREATE_STB:
buf = vnodeParseCreateTableReq(buf, &(pReq->ctReq));
break;
default:
break;
}
// TODO
return buf;
}
static int vnodeBuildCreateTableReq(void **buf, const SVCreateTableReq *pReq) {
int tsize = 0;
tsize += taosEncodeString(buf, pReq->name);
tsize += taosEncodeFixedU32(buf, pReq->ttl);
tsize += taosEncodeFixedU32(buf, pReq->keep);
tsize += taosEncodeFixedU8(buf, pReq->type);
switch (pReq->type) {
case META_SUPER_TABLE:
tsize += taosEncodeFixedU64(buf, pReq->stbCfg.suid);
tsize += tdEncodeSchema(buf, pReq->stbCfg.pSchema);
tsize += tdEncodeSchema(buf, pReq->stbCfg.pTagSchema);
break;
case META_CHILD_TABLE:
tsize += taosEncodeFixedU64(buf, pReq->ctbCfg.suid);
tsize += tdEncodeKVRow(buf, pReq->ctbCfg.pTag);
break;
case META_NORMAL_TABLE:
tsize += tdEncodeSchema(buf, pReq->ntbCfg.pSchema);
break;
default:
break;
}
return tsize;
}
static void *vnodeParseCreateTableReq(void *buf, SVCreateTableReq *pReq) {
buf = taosDecodeString(buf, &(pReq->name));
buf = taosDecodeFixedU32(buf, &(pReq->ttl));
buf = taosDecodeFixedU32(buf, &(pReq->keep));
buf = taosDecodeFixedU8(buf, &(pReq->type));
switch (pReq->type) {
case META_SUPER_TABLE:
buf = taosDecodeFixedU64(buf, &(pReq->stbCfg.suid));
buf = tdDecodeSchema(buf, &(pReq->stbCfg.pSchema));
buf = tdDecodeSchema(buf, &(pReq->stbCfg.pTagSchema));
break;
case META_CHILD_TABLE:
buf = taosDecodeFixedU64(buf, &(pReq->ctbCfg.suid));
buf = tdDecodeKVRow(buf, &(pReq->ctbCfg.pTag));
break;
case META_NORMAL_TABLE:
buf = tdDecodeSchema(buf, &(pReq->ntbCfg.pSchema));
break;
default:
break;
}
return buf;
}
int vnodeBuildDropTableReq(void **buf, const SVDropTbReq *pReq) {
// TODO
return 0;
}
void *vnodeParseDropTableReq(void *buf, SVDropTbReq *pReq) {
// TODO
}
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
......@@ -83,6 +83,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
// TODO: handle error
}
vTrace("vgId:%d process create table %s", pVnode->vgId, pCreateTbReq->name);
if (pCreateTbReq->type == TD_SUPER_TABLE) {
free(pCreateTbReq->stbCfg.pSchema);
free(pCreateTbReq->stbCfg.pTagSchema);
......
add_subdirectory(transport)
add_subdirectory(sync)
add_subdirectory(tkv)
add_subdirectory(tdb)
add_subdirectory(index)
add_subdirectory(wal)
add_subdirectory(parser)
......
......@@ -182,6 +182,12 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
if (NULL == pDispatcher->nextOutput.pData) {
assert(pDispatcher->queryEnd);
pOutput->useconds = pDispatcher->useconds;
pOutput->precision = pDispatcher->schema.precision;
return TSDB_CODE_SUCCESS;
}
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
pOutput->numOfRows = pEntry->numOfRows;
......@@ -190,7 +196,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput->bufStatus = updateStatus(pDispatcher);
pthread_mutex_lock(&pDispatcher->mutex);
pOutput->queryEnd = pDispatcher->queryEnd;
pOutput->needSchedule = false;
pOutput->scheduleJobNo = 0;
pOutput->useconds = pDispatcher->useconds;
pOutput->precision = pDispatcher->schema.precision;
pthread_mutex_unlock(&pDispatcher->mutex);
......
......@@ -189,7 +189,6 @@ static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableI
return (SPhyNode*)node;
}
static bool isSystemTable(SQueryTableInfo* pTable) {
// todo
return false;
......@@ -295,13 +294,31 @@ static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTabl
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
if (needMultiNodeScan(pTable)) {
return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable));
}
return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
}
static SPhyNode* createSingleTableAgg(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SAggPhyNode* node = (SAggPhyNode*)initPhyNode(pPlanNode, OP_Aggregate, sizeof(SAggPhyNode));
SGroupbyExpr* pGroupBy = (SGroupbyExpr*)pPlanNode->pExtInfo;
node->aggAlgo = AGG_ALGO_PLAIN;
node->aggSplit = AGG_SPLIT_FINAL;
if (NULL != pGroupBy) {
node->aggAlgo = AGG_ALGO_HASHED;
node->pGroupByList = validPointer(taosArrayDup(pGroupBy->columnInfo));
}
return (SPhyNode*)node;
}
static SPhyNode* createAggNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
// if (needMultiNodeAgg(pPlanNode)) {
// }
return createSingleTableAgg(pCxt, pPlanNode);
}
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SPhyNode* node = NULL;
switch (pPlanNode->info.type) {
......@@ -311,6 +328,10 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
case QNODE_TABLESCAN:
node = createTableScanNode(pCxt, pPlanNode);
break;
case QNODE_AGGREGATE:
case QNODE_GROUPBY:
node = createAggNode(pCxt, pPlanNode);
break;
case QNODE_MODIFY:
// Insert is not an operator in a physical plan.
break;
......
......@@ -20,6 +20,19 @@
typedef bool (*FToJson)(const void* obj, cJSON* json);
typedef bool (*FFromJson)(const cJSON* json, void* obj);
static char* getString(const cJSON* json, const char* name) {
char* p = cJSON_GetStringValue(cJSON_GetObjectItem(json, name));
return strdup(p);
}
static void copyString(const cJSON* json, const char* name, char* dst) {
strcpy(dst, cJSON_GetStringValue(cJSON_GetObjectItem(json, name)));
}
static int64_t getNumber(const cJSON* json, const char* name) {
return cJSON_GetNumberValue(cJSON_GetObjectItem(json, name));
}
static bool addObject(cJSON* json, const char* name, FToJson func, const void* obj) {
if (NULL == obj) {
return true;
......@@ -62,6 +75,39 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f
return func(jObj, *obj);
}
static const char* jkPnodeType = "Type";
static int32_t getPnodeTypeSize(cJSON* json) {
switch (getNumber(json, jkPnodeType)) {
case OP_TableScan:
case OP_DataBlocksOptScan:
case OP_TableSeqScan:
return sizeof(STableScanPhyNode);
case OP_TagScan:
return sizeof(STagScanPhyNode);
case OP_SystemTableScan:
return sizeof(SSystemTableScanPhyNode);
case OP_Aggregate:
return sizeof(SAggPhyNode);
case OP_Exchange:
return sizeof(SExchangePhyNode);
default:
break;
};
return -1;
}
static bool fromPnode(const cJSON* json, const char* name, FFromJson func, void** obj) {
cJSON* jObj = cJSON_GetObjectItem(json, name);
if (NULL == jObj) {
return true;
}
*obj = calloc(1, getPnodeTypeSize(jObj));
if (NULL == *obj) {
return false;
}
return func(jObj, *obj);
}
static bool addTarray(cJSON* json, const char* name, FToJson func, const SArray* array, bool isPoint) {
size_t size = (NULL == array) ? 0 : taosArrayGetSize(array);
if (size > 0) {
......@@ -154,26 +200,9 @@ static bool fromRawArrayWithAlloc(const cJSON* json, const char* name, FFromJson
return fromItem(jArray, func, *array, itemSize, *size);
}
static bool fromRawArray(const cJSON* json, const char* name, FFromJson func, void** array, int32_t itemSize, int32_t* size) {
static bool fromRawArray(const cJSON* json, const char* name, FFromJson func, void* array, int32_t itemSize, int32_t* size) {
const cJSON* jArray = getArray(json, name, size);
if (*array == NULL) {
*array = calloc(*size, itemSize);
}
return fromItem(jArray, func, *array, itemSize, *size);
}
static char* getString(const cJSON* json, const char* name) {
char* p = cJSON_GetStringValue(cJSON_GetObjectItem(json, name));
return strdup(p);
}
static void copyString(const cJSON* json, const char* name, char* dst) {
strcpy(dst, cJSON_GetStringValue(cJSON_GetObjectItem(json, name)));
}
static int64_t getNumber(const cJSON* json, const char* name) {
return cJSON_GetNumberValue(cJSON_GetObjectItem(json, name));
return fromItem(jArray, func, array, itemSize, *size);
}
static const char* jkSchemaType = "Type";
......@@ -221,7 +250,7 @@ static bool dataBlockSchemaFromJson(const cJSON* json, void* obj) {
schema->resultRowSize = getNumber(json, jkDataBlockSchemaResultRowSize);
schema->precision = getNumber(json, jkDataBlockSchemaPrecision);
return fromRawArray(json, jkDataBlockSchemaSlotSchema, schemaFromJson, (void**) &(schema->pSchema), sizeof(SSlotSchema), &schema->numOfCols);
return fromRawArrayWithAlloc(json, jkDataBlockSchemaSlotSchema, schemaFromJson, (void**)&(schema->pSchema), sizeof(SSlotSchema), &schema->numOfCols);
}
static const char* jkColumnFilterInfoLowerRelOptr = "LowerRelOptr";
......@@ -534,19 +563,15 @@ static const char* jkScanNodeTableCount = "Count";
static bool scanNodeToJson(const void* obj, cJSON* json) {
const SScanPhyNode* scan = (const SScanPhyNode*)obj;
bool res = cJSON_AddNumberToObject(json, jkScanNodeTableId, scan->uid);
if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableType, scan->tableType);
}
if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, scan->order);
}
if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, scan->count);
}
return res;
}
......@@ -559,6 +584,66 @@ static bool scanNodeFromJson(const cJSON* json, void* obj) {
return true;
}
static const char* jkColIndexColId = "ColId";
static const char* jkColIndexColIndex = "ColIndex";
static const char* jkColIndexFlag = "Flag";
static const char* jkColIndexName = "Name";
static bool colIndexToJson(const void* obj, cJSON* json) {
const SColIndex* col = (const SColIndex*)obj;
bool res = cJSON_AddNumberToObject(json, jkColIndexColId, col->colId);
if (res) {
res = cJSON_AddNumberToObject(json, jkColIndexColIndex, col->colIndex);
}
if (res) {
res = cJSON_AddNumberToObject(json, jkColIndexFlag, col->flag);
}
if (res) {
res = cJSON_AddStringToObject(json, jkColIndexName, col->name);
}
return res;
}
static bool colIndexFromJson(const cJSON* json, void* obj) {
SColIndex* col = (SColIndex*)obj;
col->colId = getNumber(json, jkColIndexColId);
col->colIndex = getNumber(json, jkColIndexColIndex);
col->flag = getNumber(json, jkColIndexFlag);
copyString(json, jkColIndexName, col->name);
return true;
}
static const char* jkAggNodeAggAlgo = "AggAlgo";
static const char* jkAggNodeAggSplit = "AggSplit";
static const char* jkAggNodeExprs = "Exprs";
static const char* jkAggNodeGroupByList = "GroupByList";
static bool aggNodeToJson(const void* obj, cJSON* json) {
const SAggPhyNode* agg = (const SAggPhyNode*)obj;
bool res = cJSON_AddNumberToObject(json, jkAggNodeAggAlgo, agg->aggAlgo);
if (res) {
res = cJSON_AddNumberToObject(json, jkAggNodeAggSplit, agg->aggSplit);
}
if (res) {
res = addArray(json, jkAggNodeExprs, exprInfoToJson, agg->pExprs);
}
if (res) {
res = addArray(json, jkAggNodeGroupByList, colIndexToJson, agg->pGroupByList);
}
return res;
}
static bool aggNodeFromJson(const cJSON* json, void* obj) {
SAggPhyNode* agg = (SAggPhyNode*)obj;
agg->aggAlgo = getNumber(json, jkAggNodeAggAlgo);
agg->aggSplit = getNumber(json, jkAggNodeAggSplit);
bool res = fromArray(json, jkAggNodeExprs, exprInfoFromJson, &agg->pExprs, sizeof(SExprInfo));
if (res) {
res = fromArray(json, jkAggNodeGroupByList, colIndexFromJson, &agg->pGroupByList, sizeof(SExprInfo));
}
return res;
}
static const char* jkTableScanNodeFlag = "Flag";
static const char* jkTableScanNodeWindow = "Window";
static const char* jkTableScanNodeTagsConditions = "TagsConditions";
......@@ -667,10 +752,10 @@ static bool specificPhyNodeToJson(const void* obj, cJSON* json) {
case OP_SystemTableScan:
return scanNodeToJson(obj, json);
case OP_Aggregate:
break; // todo
return aggNodeToJson(obj, json);
case OP_Project:
return true;
case OP_Groupby:
// case OP_Groupby:
case OP_Limit:
case OP_SLimit:
case OP_TimeWindow:
......@@ -708,7 +793,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) {
break; // todo
case OP_Project:
return true;
case OP_Groupby:
// case OP_Groupby:
case OP_Limit:
case OP_SLimit:
case OP_TimeWindow:
......@@ -735,12 +820,15 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) {
static const char* jkPnodeName = "Name";
static const char* jkPnodeTargets = "Targets";
static const char* jkPnodeConditions = "Conditions";
static const char* jkPnodeSchema = "InputSchema";
static const char* jkPnodeSchema = "TargetSchema";
static const char* jkPnodeChildren = "Children";
// The 'pParent' field do not need to be serialized.
static bool phyNodeToJson(const void* obj, cJSON* jNode) {
const SPhyNode* phyNode = (const SPhyNode*)obj;
bool res = cJSON_AddStringToObject(jNode, jkPnodeName, phyNode->info.name);
bool res = cJSON_AddNumberToObject(jNode, jkPnodeType, phyNode->info.type);
if (res) {
res = cJSON_AddStringToObject(jNode, jkPnodeName, phyNode->info.name);
}
if (res) {
res = addArray(jNode, jkPnodeTargets, exprInfoToJson, phyNode->pTargets);
}
......@@ -762,8 +850,8 @@ static bool phyNodeToJson(const void* obj, cJSON* jNode) {
static bool phyNodeFromJson(const cJSON* json, void* obj) {
SPhyNode* node = (SPhyNode*) obj;
node->info.name = getString(json, jkPnodeName);
node->info.type = opNameToOpType(node->info.name);
node->info.type = getNumber(json, jkPnodeType);
node->info.name = opTypeToOpName(node->info.type);
bool res = fromArray(json, jkPnodeTargets, exprInfoFromJson, &node->pTargets, sizeof(SExprInfo));
if (res) {
......@@ -910,8 +998,7 @@ static SSubplan* subplanFromJson(const cJSON* json) {
}
bool res = fromObject(json, jkSubplanId, subplanIdFromJson, &subplan->id, true);
if (res) {
size_t size = MAX(sizeof(SPhyNode), sizeof(SScanPhyNode));
res = fromObjectWithAlloc(json, jkSubplanNode, phyNodeFromJson, (void**)&subplan->pNode, size, false);
res = fromPnode(json, jkSubplanNode, phyNodeFromJson, (void**)&subplan->pNode);
}
if (res) {
res = fromObjectWithAlloc(json, jkSubplanDataSink, dataSinkFromJson, (void**)&subplan->pDataSink, sizeof(SDataSink), false);
......
......@@ -65,9 +65,9 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
}
if (pLogicPlan->info.type != QNODE_MODIFY) {
// char* str = NULL;
// queryPlanToString(pLogicPlan, &str);
// printf("%s\n", str);
char* str = NULL;
queryPlanToString(pLogicPlan, &str);
printf("%s\n", str);
}
code = optimizeQueryPlan(pLogicPlan);
......
......@@ -30,6 +30,21 @@ void* myCalloc(size_t nmemb, size_t size) {
class PhyPlanTest : public Test {
protected:
void pushAgg(int32_t aggOp) {
unique_ptr<SQueryPlanNode> agg((SQueryPlanNode*)myCalloc(1, sizeof(SQueryPlanNode)));
agg->info.type = aggOp;
agg->pExpr = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
unique_ptr<SExprInfo> expr((SExprInfo*)myCalloc(1, sizeof(SExprInfo)));
expr->base.resSchema.type = TSDB_DATA_TYPE_INT;
expr->base.resSchema.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
expr->pExpr = (tExprNode*)myCalloc(1, sizeof(tExprNode));
expr->pExpr->nodeType = TEXPR_FUNCTION_NODE;
strcpy(expr->pExpr->_function.functionName, "Count");
SExprInfo* item = expr.release();
taosArrayPush(agg->pExpr, &item);
pushNode(agg.release());
}
void pushScan(const string& db, const string& table, int32_t scanOp) {
shared_ptr<MockTableMeta> meta = mockCatalogService->getTableMeta(db, table);
EXPECT_TRUE(meta);
......@@ -95,10 +110,11 @@ protected:
private:
void pushNode(SQueryPlanNode* node) {
if (logicPlan_) {
// todo
} else {
logicPlan_.reset(node);
node->pChildren = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
SQueryPlanNode* child = logicPlan_.release();
taosArrayPush(node->pChildren, &child);
}
logicPlan_.reset(node);
}
void copySchemaMeta(STableMeta** dst, const STableMeta* src) {
......@@ -174,6 +190,16 @@ TEST_F(PhyPlanTest, superTableScanTest) {
// todo check
}
// select count(*) from table
TEST_F(PhyPlanTest, simpleAggTest) {
pushScan("test", "t1", QNODE_TABLESCAN);
pushAgg(QNODE_AGGREGATE);
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
explain();
SQueryDag* dag = result();
// todo check
}
// insert into t values(...)
TEST_F(PhyPlanTest, insertTest) {
ASSERT_EQ(run("test", "insert into t1 values (now, 1, \"beijing\")"), TSDB_CODE_SUCCESS);
......
......@@ -42,8 +42,7 @@ enum {
QW_EVENT_READY,
QW_EVENT_FETCH,
QW_EVENT_DROP,
QW_EVENT_SCH_SINK,
QW_EVENT_SCH_QUERY,
QW_EVENT_CQUERY,
QW_EVENT_MAX,
};
......@@ -138,10 +137,10 @@ typedef struct SQWorkerMgmt {
#define QW_IDS() sId, qId, tId
#define QW_FPARAMS() mgmt, QW_IDS()
#define QW_IS_EVENT_RECEIVED(ctx, event) ((ctx)->events[event] == QW_EVENT_RECEIVED)
#define QW_IS_EVENT_PROCESSED(ctx, event) ((ctx)->events[event] == QW_EVENT_PROCESSED)
#define QW_SET_EVENT_RECEIVED(ctx, event) ((ctx)->events[event] = QW_EVENT_RECEIVED)
#define QW_SET_EVENT_PROCESSED(ctx, event) ((ctx)->events[event] = QW_EVENT_PROCESSED)
#define QW_IS_EVENT_RECEIVED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_RECEIVED)
#define QW_IS_EVENT_PROCESSED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_PROCESSED)
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
#define QW_IN_EXECUTOR(ctx) ((ctx)->phase == QW_PHASE_PRE_QUERY || (ctx)->phase == QW_PHASE_PRE_CQUERY || (ctx)->phase == QW_PHASE_PRE_FETCH || (ctx)->phase == QW_PHASE_PRE_SINK)
......
......@@ -269,7 +269,19 @@ int32_t qwAddTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tI
QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), 0, 0, NULL));
}
int32_t qwGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
if (NULL == (*ctx)) {
QW_TASK_ELOG("ctx not in ctxHash, id:%s", id);
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
}
return TSDB_CODE_SUCCESS;
}
int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0};
......@@ -422,8 +434,11 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId,
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
}
if (ctx->taskHandle && QW_IN_EXECUTOR(ctx)) {
QW_ERR_JRET(qKillTask(ctx->taskHandle));
if (QW_IN_EXECUTOR(ctx)) {
if (ctx->taskHandle) {
QW_ERR_JRET(qKillTask(ctx->taskHandle));
}
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING));
} else if (ctx->phase > 0) {
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
......@@ -450,44 +465,6 @@ _return:
QW_RET(code);
}
int32_t qwScheduleDataSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *ctx, SRpcMsg *pMsg) {
if (atomic_load_8(&handles->sinkScheduled)) {
qDebug("data sink already scheduled");
return TSDB_CODE_SUCCESS;
}
SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq));
if (NULL == req) {
qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
req->header.vgId = mgmt->nodeId;
req->sId = sId;
req->queryId = qId;
req->taskId = tId;
SRpcMsg pNewMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_SCHEDULE_DATA_SINK,
.pCont = req,
.contLen = sizeof(SSinkDataReq),
.code = 0,
};
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) {
qError("put data sink schedule msg to queue failed, code:%x", code);
rpcFreeCont(req);
QW_ERR_RET(code);
}
qDebug("put data sink schedule msg to query queue");
return TSDB_CODE_SUCCESS;
}
int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
......@@ -572,10 +549,8 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
switch (phase) {
case QW_PHASE_PRE_QUERY: {
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
ctx->phase = phase;
assert(!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL));
......@@ -653,6 +628,8 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
locked = true;
ctx->phase = phase;
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled", NULL);
output->needStop = true;
......@@ -685,6 +662,32 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
}
break;
}
case QW_PHASE_POST_FETCH: {
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled", NULL);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task is dropping", NULL);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPING;
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task is cancelling", NULL);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLING;
}
break;
}
}
......@@ -783,6 +786,93 @@ _return:
QW_RET(rspCode);
}
int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) {
int32_t code = 0;
bool queryRsped = false;
bool needStop = false;
struct SSubplan *plan = NULL;
int32_t rspCode = 0;
SQWPhaseInput input = {0};
SQWPhaseOutput output = {0};
SQWTaskCtx *ctx = NULL;
void *rsp = NULL;
int32_t dataLen = 0;
QW_ERR_JRET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, &output));
needStop = output.needStop;
code = output.rspCode;
if (needStop) {
QW_TASK_DLOG("task need stop", NULL);
QW_ERR_JRET(code);
}
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
qTaskInfo_t taskHandle = ctx->taskHandle;
DataSinkHandle sinkHandle = ctx->sinkHandle;
code = qExecTask(taskHandle, &sinkHandle);
if (code) {
QW_TASK_ELOG("qExecTask failed, code:%x", code);
QW_ERR_JRET(code);
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CQUERY);
SOutputData sOutput = {0};
QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
if (NULL == rsp) {
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
}
// Note: schedule data sink firstly and will schedule query after it's done
if (sOutput.scheduleJobNo) {
if (sOutput.scheduleJobNo > ctx.sinkId) {
QW_TASK_DLOG("sink need schedule, scheduleJobNo:%d", sOutput.scheduleJobNo);
ctx.sinkId = sOutput.scheduleJobNo;
QW_ERR_JRET(qwBuildAndSendSchSinkMsg(QW_FPARAMS(), qwMsg->connection));
}
} else if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", sOutput.bufStatus);
if (!QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CQUERY)) {
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_CQUERY);
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
QW_ERR_RET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), qwMsg->connection));
}
}
if (rsp) {
qwBuildFetchRsp(rsp, &sOutput, dataLen);
}
}
_return:
qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, &output);
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
if (code) {
qwFreeFetchRsp(rsp);
rsp = NULL;
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code);
} else if (rsp) {
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code);
}
}
QW_RET(rspCode);
}
int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) {
......@@ -811,24 +901,33 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
QW_ERR_JRET(code);
}
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
SOutputData sOutput = {0};
QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
if (NULL == rsp) {
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
}
// Note: schedule data sink firstly and will schedule query after it's done
if (sOutput.needSchedule) {
QW_TASK_DLOG("sink need schedule, queryEnd:%d", sOutput.queryEnd);
if (sOutput.needSchedule > ctx.sinkId) {
QW_ERR_RET(qwScheduleDataSink(ctx, mgmt, sId, qId, tId, pMsg));
if (sOutput.scheduleJobNo) {
if (sOutput.scheduleJobNo > ctx.sinkId) {
QW_TASK_DLOG("sink need schedule, scheduleJobNo:%d", sOutput.scheduleJobNo);
ctx.sinkId = sOutput.scheduleJobNo;
QW_ERR_JRET(qwBuildAndSendSchSinkMsg(QW_FPARAMS(), qwMsg->connection));
}
} else if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", output.bufStatus);
QW_ERR_RET(qwScheduleQuery(ctx, mgmt, sId, qId, tId, pMsg));
QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", sOutput.bufStatus);
if (!QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CQUERY)) {
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_CQUERY);
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
QW_ERR_RET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), qwMsg->connection));
}
}
if (rsp) {
......@@ -837,13 +936,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
_return:
if (locked) {
QW_UNLOCK(QW_WRITE, &ctx->lock);
}
if (ctx) {
qwReleaseTaskCtx(QW_READ, mgmt);
}
qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, &output);
if (code) {
qwFreeFetchRsp(rsp);
......@@ -852,6 +945,7 @@ _return:
} else if (rsp) {
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code);
}
QW_RET(code);
}
......@@ -901,14 +995,14 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER;
}
mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == mgmt->schHash) {
tfree(mgmt);
qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
mgmt->ctxHash = taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
mgmt->ctxHash = taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == mgmt->ctxHash) {
taosHashCleanup(mgmt->schHash);
mgmt->schHash = NULL;
......@@ -1104,47 +1198,3 @@ _return:
}
int32_t qwScheduleQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *handles, SRpcMsg *pMsg) {
if (atomic_load_8(&handles->queryScheduled)) {
QW_SCH_TASK_ELOG("query already scheduled, queryScheduled:%d", handles->queryScheduled);
return TSDB_CODE_SUCCESS;
}
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
if (NULL == req) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
req->header.vgId = mgmt->nodeId;
req->sId = sId;
req->queryId = qId;
req->taskId = tId;
SRpcMsg pNewMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_QUERY_CONTINUE,
.pCont = req,
.contLen = sizeof(SQueryContinueReq),
.code = 0,
};
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) {
QW_SCH_TASK_ELOG("put query continue msg to queue failed, code:%x", code);
rpcFreeCont(req);
QW_ERR_RET(code);
}
handles->queryScheduled = true;
QW_SCH_TASK_DLOG("put query continue msg to query queue, vgId:%d", mgmt->nodeId);
return TSDB_CODE_SUCCESS;
}
......@@ -226,6 +226,42 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendSchSinkMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq));
if (NULL == req) {
qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
req->header.vgId = mgmt->nodeId;
req->sId = sId;
req->queryId = qId;
req->taskId = tId;
SRpcMsg pNewMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_SCHEDULE_DATA_SINK,
.pCont = req,
.contLen = sizeof(SSinkDataReq),
.code = 0,
};
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) {
qError("put data sink schedule msg to queue failed, code:%x", code);
rpcFreeCont(req);
QW_ERR_RET(code);
}
qDebug("put data sink schedule msg to query queue");
return TSDB_CODE_SUCCESS;
}
int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
......@@ -268,14 +304,8 @@ _return:
}
int32_t qwScheduleQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *handles, SRpcMsg *pMsg) {
if (atomic_load_8(&handles->queryScheduled)) {
QW_SCH_TASK_ELOG("query already scheduled, queryScheduled:%d", handles->queryScheduled);
return TSDB_CODE_SUCCESS;
}
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
if (NULL == req) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
......@@ -303,8 +333,6 @@ int32_t qwScheduleQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
QW_ERR_RET(code);
}
handles->queryScheduled = true;
QW_SCH_TASK_DLOG("put query continue msg to query queue, vgId:%d", mgmt->nodeId);
return TSDB_CODE_SUCCESS;
......@@ -338,74 +366,31 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg));
}
int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0;
int8_t status = 0;
bool queryDone = false;
SQueryContinueReq *req = (SQueryContinueReq *)pMsg->pCont;
SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
bool needStop = false;
SQWTaskCtx *handles = NULL;
QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles));
QW_LOCK(QW_WRITE, &handles->lock);
qTaskInfo_t taskHandle = handles->taskHandle;
DataSinkHandle sinkHandle = handles->sinkHandle;
QW_UNLOCK(QW_WRITE, &handles->lock);
qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
QW_ERR_JRET(qwCheckAndProcessTaskDrop(qWorkerMgmt, req->sId, req->queryId, req->taskId, &needStop));
if (needStop) {
qWarn("task need stop");
QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles));
QW_LOCK(QW_WRITE, &handles->lock);
if (handles->needRsp) {
qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_QRY_TASK_CANCELLED);
handles->needRsp = false;
}
QW_UNLOCK(QW_WRITE, &handles->lock);
qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED);
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ELOG("invalid cquery msg, contLen:%d", pMsg->contLen);
QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
DataSinkHandle newHandle = NULL;
code = qExecTask(taskHandle, &newHandle);
if (code) {
qError("qExecTask failed, code:%x", code);
QW_ERR_JRET(code);
}
if (sinkHandle != newHandle) {
qError("data sink mis-match");
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
msg->contentLen = ntohl(msg->contentLen);
_return:
QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles));
QW_LOCK(QW_WRITE, &handles->lock);
if (handles->needRsp) {
code = qwBuildAndSendQueryRsp(pMsg, code);
handles->needRsp = false;
}
handles->queryScheduled = false;
QW_UNLOCK(QW_WRITE, &handles->lock);
qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
if (TSDB_CODE_SUCCESS != code) {
status = JOB_TASK_STATUS_FAILED;
} else {
status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
}
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
code = qwQueryPostProcess(qWorkerMgmt, req->sId, req->queryId, req->taskId, status, code);
QW_RET(code);
QW_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));
}
......
aux_source_directory(src TKV_SRC)
add_library(tkv STATIC ${TKV_SRC})
aux_source_directory(src TDB_SRC)
add_library(tdb STATIC ${TDB_SRC})
# target_include_directories(
# tkv
# PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/tkv"
# PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
# )
target_include_directories(
tkv
tdb
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc"
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/src/inc"
)
target_link_libraries(
tkv
tdb
PUBLIC os
PUBLIC util
)
\ No newline at end of file
)
if(${BUILD_TEST})
# add_subdirectory(test)
endif(${BUILD_TEST})
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TKV_H_
#define _TD_TKV_H_
#ifndef _TD_TDB_H_
#define _TD_TDB_H_
#include "os.h"
......@@ -22,18 +22,29 @@
extern "C" {
#endif
typedef enum {
TDB_BTREE = 0,
TDB_HASH,
TDB_HEAP,
} tdb_db_t;
// Forward declaration
typedef struct TDB TDB;
typedef struct TDB_ENV TDB_ENV;
typedef struct TDB TDB;
typedef struct TDB_CURSOR TDB_CURSOR;
// SKey
typedef struct {
void * bdata;
void* bdata;
uint32_t size;
} TDB_KEY, TDB_VALUE;
// TDB Operations
int tdbCreateDB(TDB** dbpp);
int tdbOpenDB(TDB* dbp, tdb_db_t type, uint32_t flags);
int tdbCloseDB(TDB* dbp, uint32_t flags);
#ifdef __cplusplus
}
#endif
#endif /*_TD_TKV_H_*/
\ No newline at end of file
#endif /*_TD_TDB_H_*/
\ No newline at end of file
......@@ -13,21 +13,21 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_SYNC_H_
#define _TD_VNODE_SYNC_H_
#ifndef _TD_TDB_BTREE_H_
#define _TD_TDB_BTREE_H_
// #include "sync.h"
#include "tkvDef.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
/* data */
} SVnodeSync;
pgid_t root; // root page number
} TDB_BTREE;
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_SYNC_H_*/
\ No newline at end of file
#endif /*_TD_TDB_BTREE_H_*/
\ No newline at end of file
......@@ -13,35 +13,27 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_FS_H_
#define _TD_VNODE_FS_H_
#ifndef _TD_TDB_BUF_POOL_H_
#define _TD_TDB_BUF_POOL_H_
#include "vnode.h"
#include "tdbPage.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
} SDir;
typedef struct STdbBufPool STdbBufPool;
typedef struct {
} SFile;
typedef struct SFS {
void *pImpl;
int (*startEdit)(struct SFS *);
int (*endEdit)(struct SFS *);
} SFS;
typedef struct {
} SVnodeFS;
int vnodeOpenFS(SVnode *pVnode);
void vnodeCloseFS(SVnode *pVnode);
int tbpOpen(STdbBufPool **ppTkvBufPool);
int tbpClose(STdbBufPool *pTkvBufPool);
STdbPage *tbpNewPage(STdbBufPool *pTkvBufPool);
int tbpDelPage(STdbBufPool *pTkvBufPool);
STdbPage *tbpFetchPage(STdbBufPool *pTkvBufPool, pgid_t pgid);
int tbpUnpinPage(STdbBufPool *pTkvBufPool, pgid_t pgid);
void tbpFlushPages(STdbBufPool *pTkvBufPool);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_FS_H_*/
\ No newline at end of file
#endif /*_TD_TDB_BUF_POOL_H_*/
\ No newline at end of file
......@@ -13,19 +13,28 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TKV_ENV_H_
#define _TD_TKV_ENV_H_
#ifndef _TD_TDB_DB_H_
#define _TD_TDB_DB_H_
#include "tdbBtree.h"
#include "tdbHash.h"
#ifdef __cplusplus
extern "C" {
#endif
struct TDB_ENV {
char *homeDir;
struct TDB {
pgsize_t pageSize;
tdb_db_t type;
union {
TDB_BTREE btree;
TDB_HASH hash;
} dbam; // Different access methods
};
#ifdef __cplusplus
}
#endif
#endif /*_TD_TKV_ENV_H_*/
\ No newline at end of file
#endif /*_TD_TDB_DB_H_*/
\ No newline at end of file
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TKV_DEF_H_
#define _TD_TKV_DEF_H_
#ifndef _TD_TDB_DEF_H_
#define _TD_TDB_DEF_H_
#include "os.h"
......@@ -39,4 +39,4 @@ typedef int32_t pgsize_t;
}
#endif
#endif /*_TD_TKV_DEF_H_*/
\ No newline at end of file
#endif /*_TD_TDB_DEF_H_*/
\ No newline at end of file
......@@ -22,7 +22,7 @@ extern "C" {
#include "os.h"
#include "tkvDef.h"
#include "tdbDef.h"
typedef struct STkvDiskMgr STkvDiskMgr;
......
......@@ -13,19 +13,21 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TKV_DB_H_
#define _TD_TKV_DB_H_
#ifndef _TD_TKV_HAHS_H_
#define _TD_TKV_HAHS_H_
#include "tdbDef.h"
#ifdef __cplusplus
extern "C" {
#endif
struct TDB {
typedef struct {
// TODO
};
} TDB_HASH;
#ifdef __cplusplus
}
#endif
#endif /*_TD_TKV_DB_H_*/
\ No newline at end of file
#endif /*_TD_TKV_HAHS_H_*/
\ No newline at end of file
......@@ -17,30 +17,24 @@
#define _TD_TKV_PAGE_H_
#include "os.h"
#include "tkvDef.h"
#include "tdbDef.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct STkvPage {
typedef struct {
pgid_t pgid;
int32_t pinCount;
bool idDirty;
char* pData;
} STkvPage;
} STdbPage;
typedef struct {
uint16_t dbver;
uint16_t pgsize;
uint32_t cksm;
} STkvPgHdr;
// typedef struct {
// SPgHdr chdr;
// uint16_t used; // number of used slots
// uint16_t loffset; // the offset of the starting location of the last slot used
// } SSlottedPgHdr;
} STdbPgHdr;
#ifdef __cplusplus
}
......
......@@ -16,17 +16,17 @@
#include "thash.h"
#include "tlist.h"
#include "tkvBufPool.h"
#include "tkvDiskMgr.h"
#include "tkvPage.h"
#include "tdbBufPool.h"
#include "tdbDiskMgr.h"
#include "tdbPage.h"
struct SFrameIdWrapper {
TD_SLIST_NODE(SFrameIdWrapper);
frame_id_t id;
};
struct STkvBufPool {
STkvPage* pages;
struct STdbBufPool {
STdbPage* pages;
STkvDiskMgr* pDiskMgr;
SHashObj* pgTb; // page_id_t --> frame_id_t
TD_SLIST(SFrameIdWrapper) freeList;
......
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tkvDiskMgr.h"
#include "tdbDiskMgr.h"
struct STkvDiskMgr {
char * fname;
......
# tdbTest
add_executable(tdbTest "tdbTest.cpp")
target_link_libraries(tdbTest tdb gtest gtest_main)
\ No newline at end of file
#include "gtest/gtest.h"
#include "tdb.h"
TEST(tdb_api_test, tdb_create_open_close_db_test) {
int ret;
TDB *dbp;
tdbCreateDB(&dbp);
tdbOpenDB(dbp, TDB_BTREE, 0);
tdbCloseDB(dbp, 0);
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TKV_BUF_POOL_H_
#define _TD_TKV_BUF_POOL_H_
#include "tkvPage.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct STkvBufPool STkvBufPool;
int tbpOpen(STkvBufPool **ppTkvBufPool);
int tbpClose(STkvBufPool *pTkvBufPool);
STkvPage *tbpNewPage(STkvBufPool *pTkvBufPool);
int tbpDelPage(STkvBufPool *pTkvBufPool);
STkvPage *tbpFetchPage(STkvBufPool *pTkvBufPool, pgid_t pgid);
int tbpUnpinPage(STkvBufPool *pTkvBufPool, pgid_t pgid);
void tbpFlushPages(STkvBufPool *pTkvBufPool);
#ifdef __cplusplus
}
#endif
#endif /*_TD_TKV_BUF_POOL_H_*/
\ No newline at end of file
#include "gtest/gtest.h"
#include "iostream"
#include "tDiskMgr.h"
TEST(tDiskMgrTest, simple_test) {
// TODO
std::cout << "This is in tDiskMgrTest::simple_test" << std::endl;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册