提交 ab98754c 编写于 作者: X Xiaoyu Wang

Merge remote-tracking branch 'origin/3.0' into enh/3.0_planner_optimize

......@@ -107,7 +107,7 @@ sudo yum config-manager --set-enabled Powertools
### macOS
```
sudo brew install argp-standalone pkgconfig
brew install argp-standalone pkgconfig
```
### 设置 golang 开发环境
......@@ -276,7 +276,7 @@ sudo make install
安装成功后,可以在应用程序中双击 TDengine 图标启动服务,或者在终端中启动 TDengine 服务:
```bash
launchctl start taosd
launchctl start com.tdengine.taosd
```
用户可以使用 TDengine CLI 来连接 TDengine 服务,在终端中,输入:
......
......@@ -108,7 +108,7 @@ sudo yum config-manager --set-enabled powertools
### macOS
```
sudo brew install argp-standalone pkgconfig
brew install argp-standalone pkgconfig
```
### Setup golang environment
......@@ -280,7 +280,7 @@ Installing from source code will also configure service management for TDengine.
To start the service after installation, double-click the /applications/TDengine to start the program, or in a terminal, use:
```bash
launchctl start taosd
launchctl start com.tdengine.taosd
```
Then users can use the TDengine CLI to connect the TDengine server. In a terminal, use:
......
......@@ -45,10 +45,19 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin
ADD_DEFINITIONS("-DDARWIN -Wno-tautological-pointer-compare")
MESSAGE("Current system processor is ${CMAKE_SYSTEM_PROCESSOR}.")
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "x86_64")
MESSAGE("Current system arch is 64")
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64")
MESSAGE("Current system arch is arm64")
SET(TD_DARWIN_64 TRUE)
SET(TD_DARWIN_ARM64 TRUE)
ADD_DEFINITIONS("-D_TD_DARWIN_64")
ADD_DEFINITIONS("-D_TD_DARWIN_ARM64")
ENDIF ()
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "x86_64")
MESSAGE("Current system arch is x86_64")
SET(TD_DARWIN_64 TRUE)
SET(TD_DARWIN_X64 TRUE)
ADD_DEFINITIONS("-D_TD_DARWIN_64")
ADD_DEFINITIONS("-D_TD_DARWIN_X64")
ENDIF ()
ADD_DEFINITIONS("-DHAVE_UNISTD_H")
......
......@@ -270,7 +270,7 @@ if(${JEMALLOC_ENABLED})
PREFIX "jemalloc"
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/jemalloc
BUILD_IN_SOURCE 1
CONFIGURE_COMMAND ./autogen.sh COMMAND ./configure --prefix=${CMAKE_BINARY_DIR}/build/ --disable-initial-exec-tls
CONFIGURE_COMMAND ./autogen.sh COMMAND ./configure --prefix=${CMAKE_BINARY_DIR}/build/ --disable-initial-exec-tls --with-malloc-conf='background_thread:true,metadata_thp:auto'
BUILD_COMMAND ${MAKE}
)
INCLUDE_DIRECTORIES(${CMAKE_BINARY_DIR}/build/include)
......
......@@ -189,13 +189,13 @@ After the installation is complete, run `C:\TDengine\taosd.exe` to start TDengin
<TabItem label="macOS" value="macos">
After the installation is complete, double-click the /applications/TDengine to start the program, or run `launchctl start taosd` to start TDengine Server.
After the installation is complete, double-click the /applications/TDengine to start the program, or run `launchctl start com.tdengine.taosd` to start TDengine Server.
The following `launchctl` commands can help you manage TDengine service:
- Start TDengine Server: `launchctl start taosd`
- Start TDengine Server: `launchctl start com.tdengine.taosd`
- Stop TDengine Server: `launchctl stop taosd`
- Stop TDengine Server: `launchctl stop com.tdengine.taosd`
- Check TDengine Server status: `launchctl list | grep taosd`
......
......@@ -675,7 +675,7 @@ To prevent system resource from being exhausted by multiple concurrent streams,
| Meaning | Whether to generate core file when server crashes |
| Value Range | 0: false, 1: true |
| Default Value | 1 |
| Note | The core file is generated under root directory `systemctl/launchctl start taosd` is used to start, or under the working directory if `taosd` is started directly on Linux/macOS Shell. |
| Note | The core file is generated under root directory `systemctl start taosd`/`launchctl start com.tdengine.taosd` is used to start, or under the working directory if `taosd` is started directly on Linux/macOS Shell. |
### udf
......
......@@ -181,7 +181,7 @@ To make full use of the characteristics of time-series data, TDengine splits the
For time-series data, there is generally a retention policy, which is determined by the system configuration parameter `keep`. Data files exceeding this set number of days will be automatically deleted by the system to free up storage space.
Given `duration` and `keep` parameters, the total number of data files in a vnode is: keep/duration. The total number of data files should not be too large or too small. 10 to 100 is appropriate. Based on this principle, reasonable `duration` can be set. In the current version, parameter `keep` can be modified, but parameter `duration` cannot be modified once it is set.
Given `duration` and `keep` parameters, the total number of data files in a vnode is: round up of (keep/duration+1). The total number of data files should not be too large or too small. 10 to 100 is appropriate. Based on this principle, reasonable `duration` can be set. In the current version, parameter `keep` can be modified, but parameter `duration` cannot be modified once it is set.
In each data file, the data of a table is stored in blocks. A table can have one or more data file blocks. In a file block, data is stored in columns, occupying a continuous storage space, thus greatly improving the reading speed. The size of file block is determined by the system parameter `maxRows` (the maximum number of records per block), and the default value is 4096. This value should not be too large or too small. If it is too large, data location for queries will take a longer time. If it is too small, the index of data block is too large, and the compression efficiency will be low with slower reading speed.
......
......@@ -188,13 +188,13 @@ Active: inactive (dead)
<TabItem label="macOS 系统" value="macos">
安装后,在应用程序目录下,双击 TDengine 图标来启动程序,也可以运行 `launchctl start taosd` 来启动 TDengine 服务进程。
安装后,在应用程序目录下,双击 TDengine 图标来启动程序,也可以运行 `launchctl start com.tdengine.taosd` 来启动 TDengine 服务进程。
如下 `launchctl` 命令可以帮助你管理 TDengine 服务:
- 启动服务进程:`launchctl start taosd`
- 启动服务进程:`launchctl start com.tdengine.taosd`
- 停止服务进程:`launchctl stop taosd`
- 停止服务进程:`launchctl stop com.tdengine.taosd`
- 查看服务状态:`launchctl list | grep taosd`
......
......@@ -230,7 +230,7 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst);
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst);
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen,
void* (*mallocFp)(int32_t));
void* (*mallocFp)(int64_t));
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize);
#define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE
......
......@@ -43,7 +43,7 @@ int32_t scalarGetOperatorParamNum(EOperatorType type);
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type);
int32_t vectorGetConvertType(int32_t type1, int32_t type2);
int32_t vectorConvertImpl(const SScalarParam *pIn, SScalarParam *pOut, int32_t *overflow);
int32_t vectorConvertSingleColImpl(const SScalarParam *pIn, SScalarParam *pOut, int32_t *overflow, int32_t startIndex, int32_t numOfRows);
/* Math functions */
int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
......
......@@ -123,9 +123,9 @@ void rpcCleanup();
void *rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *);
void rpcCloseImpl(void *);
void *rpcMallocCont(int32_t contLen);
void *rpcMallocCont(int64_t contLen);
void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int32_t contLen);
void *rpcReallocCont(void *ptr, int64_t contLen);
// Because taosd supports multi-process mode
// These functions should not be used on the server side
......
......@@ -29,12 +29,12 @@ extern "C" {
#define free FREE_FUNC_TAOS_FORBID
#endif
void *taosMemoryMalloc(int32_t size);
void *taosMemoryCalloc(int32_t num, int32_t size);
void *taosMemoryRealloc(void *ptr, int32_t size);
void *taosMemoryMalloc(int64_t size);
void *taosMemoryCalloc(int64_t num, int64_t size);
void *taosMemoryRealloc(void *ptr, int64_t size);
void *taosMemoryStrDup(const char *ptr);
void taosMemoryFree(void *ptr);
int32_t taosMemorySize(void *ptr);
int64_t taosMemorySize(void *ptr);
void taosPrintBackTrace();
#define taosMemoryFreeClear(ptr) \
......
......@@ -285,7 +285,7 @@ if [ "$osType" != "Darwin" ]; then
if [[ "$pagMode" == "full" ]]; then
if [ -d ${top_dir}/tools/taos-tools/packaging/rpm ]; then
cd ${top_dir}/tools/taos-tools/packaging/rpm
taostools_ver=$(git tag |grep -v taos | sort | tail -1)
taos_tools_ver=$(git tag |grep -v taos | sort | tail -1)
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
${csudo}./make-taos-tools-rpm.sh ${top_dir} \
......
......@@ -17,12 +17,12 @@ EOF
taosd_status=`Launchctl list | grep taosd | head -n 1 | awk '{print $1}'`
if [ "$taosd_status"x = "-"x ]; then
launchctl start taosd
launchctl start com.tdengine.taosd
showAlertMessage "Taosd is running!" "TDengine" "ok" "note"
else
choose_result=`showAlertMessage "Taosd is running!\nDo you want to close it?" "TDengine" "yes,cancel" "stop"`
if [ "$choose_result"x = "button returned:yes"x ]; then
launchctl stop taosd
launchctl stop com.tdengine.taosd
fi
fi
此差异已折叠。
......@@ -3,7 +3,7 @@
<plist version="1.0">
<dict>
<key>Label</key>
<string>taosd</string>
<string>com.tdengine.taosd</string>
<key>ProgramArguments</key>
<array>
<string>/usr/local/bin/taosd</string>
......
TDengine is a high-efficient, scalable, high-available distributed time-series database, which makes a lot of optimizations on inserting and querying data, which is far more efficient than normal regular databases. So TDengine can meet the high requirements of IOT and other areas on storing and querying a large amount of data.
To configure TDengine : edit /etc/taos/taos.cfg
To start service : launchctl start taosd
To start service : launchctl start com.tdengine.taosd
To access TDengine : use taos in shell
\ No newline at end of file
......@@ -615,7 +615,7 @@ function update_TDengine() {
if [ "$osType" != "Darwin" ]; then
echo -e "${GREEN_DARK}To start ${productName} ${NC}: ${serverName}${NC}"
else
echo -e "${GREEN_DARK}To start service ${NC}: launchctl start ${serverName}${NC}"
echo -e "${GREEN_DARK}To start service ${NC}: launchctl start com.tdengine.taosd${NC}"
fi
[ -f ${installDir}/bin/taosadapter ] && \
echo -e "${GREEN_DARK}To start Taos Adapter ${NC}: taosadapter &${NC}"
......@@ -666,7 +666,7 @@ function install_TDengine() {
if [ "$osType" != "Darwin" ]; then
echo -e "${GREEN_DARK}To start ${productName} ${NC}: ${serverName}${NC}"
else
echo -e "${GREEN_DARK}To start service ${NC}: launchctl start ${serverName}${NC}"
echo -e "${GREEN_DARK}To start service ${NC}: launchctl start com.tdengine.taosd${NC}"
fi
[ -f ${installDir}/bin/taosadapter ] && \
echo -e "${GREEN_DARK}To start Taos Adapter ${NC}: taosadapter &${NC}"
......
......@@ -18,6 +18,7 @@ if [ "$osType" != "Darwin" ]; then
script_dir=$(dirname $(readlink -f "$0"))
verNumber=""
lib_file_ext="so"
lib_file_ext_1="so.1"
bin_link_dir="/usr/bin"
lib_link_dir="/usr/lib"
......@@ -29,6 +30,7 @@ else
script_dir=${source_dir}/packaging/tools
verNumber=`ls tdengine/driver | grep -E "libtaos\.[0-9]\.[0-9]" | sed "s/libtaos.//g" | sed "s/.dylib//g" | head -n 1`
lib_file_ext="dylib"
lib_file_ext_1="1.dylib"
bin_link_dir="/usr/local/bin"
lib_link_dir="/usr/local/lib"
......@@ -134,14 +136,14 @@ function install_lib() {
[ -f ${lib_link_dir}/libtaosws.${lib_file_ext} ] && ${csudo}rm -f ${lib_link_dir}/libtaosws.${lib_file_ext} || :
[ -f ${lib64_link_dir}/libtaosws.${lib_file_ext} ] && ${csudo}rm -f ${lib64_link_dir}/libtaosws.${lib_file_ext} || :
${csudo}ln -s ${lib_dir}/libtaos.* ${lib_link_dir}/libtaos.so.1
${csudo}ln -s ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so
${csudo}ln -s ${lib_dir}/libtaos.* ${lib_link_dir}/libtaos.${lib_file_ext_1}
${csudo}ln -s ${lib_link_dir}/libtaos.${lib_file_ext_1} ${lib_link_dir}/libtaos.${lib_file_ext}
[ -f ${lib_dir}/libtaosws.${lib_file_ext} ] && ${csudo}ln -sf ${lib_dir}/libtaosws.${lib_file_ext} ${lib_link_dir}/libtaosws.${lib_file_ext} ||:
if [[ -d ${lib64_link_dir} && ! -e ${lib64_link_dir}/libtaos.so ]]; then
${csudo}ln -s ${lib_dir}/libtaos.* ${lib64_link_dir}/libtaos.so.1 || :
${csudo}ln -s ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so || :
if [[ -d ${lib64_link_dir} && ! -e ${lib64_link_dir}/libtaos.${lib_file_ext} ]]; then
${csudo}ln -s ${lib_dir}/libtaos.* ${lib64_link_dir}/libtaos.${lib_file_ext_1} || :
${csudo}ln -s ${lib64_link_dir}/libtaos.${lib_file_ext_1} ${lib64_link_dir}/libtaos.${lib_file_ext} || :
[ -f ${lib_dir}/libtaosws.${lib_file_ext} ] && ${csudo}ln -sf ${lib_dir}/libtaosws.${lib_file_ext} ${lib64_link_dir}/libtaosws.${lib_file_ext} || :
fi
......
......@@ -117,7 +117,7 @@ function clean_local_bin() {
function clean_lib() {
# Remove link
${csudo}rm -f ${lib_link_dir}/libtaos.* || :
[ -f ${lib_link_dir}/libtaosws.so ] && ${csudo}rm -f ${lib_link_dir}/libtaosws.so || :
[ -f ${lib_link_dir}/libtaosws.* ] && ${csudo}rm -f ${lib_link_dir}/libtaosws.* || :
${csudo}rm -f ${lib64_link_dir}/libtaos.* || :
[ -f ${lib64_link_dir}/libtaosws.* ] && ${csudo}rm -f ${lib64_link_dir}/libtaosws.* || :
......
......@@ -19,6 +19,13 @@ target_link_libraries(
PRIVATE os util common transport nodes parser command planner catalog scheduler function qcom
)
if(TD_DARWIN_ARM64)
target_link_libraries(
taos
PRIVATE "-arch x86_64"
)
endif()
if(TD_WINDOWS)
INCLUDE_DIRECTORIES(jni/windows)
INCLUDE_DIRECTORIES(jni/windows/win32)
......
......@@ -2477,11 +2477,12 @@ static void smlInsertCallback(void *param, void *res, int32_t code) {
} else {
pParam->request->body.resInfo.numOfRows += info->affectedRows;
}
// unlock
taosThreadSpinUnlock(&pParam->lock);
if (pParam->cnt == pParam->total) {
tsem_post(&pParam->sem);
}
taosThreadSpinUnlock(&pParam->lock);
// unlock
uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows);
info->cost.endTime = taosGetTimestampUs();
info->cost.code = code;
......
......@@ -429,7 +429,8 @@ typedef struct {
SDataFReader *pDataFReader;
TSDBROW row;
SMergeTree mergeTree;
SMergeTree mergeTree;
SMergeTree *pMergeTree;
} SFSLastNextRowIter;
static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
......@@ -444,11 +445,14 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
case SFSLASTNEXTROW_FILESET: {
SDFileSet *pFileSet = NULL;
_next_fileset:
if (state->pMergeTree != NULL) {
tMergeTreeClose(state->pMergeTree);
state->pMergeTree = NULL;
}
if (--state->iFileSet >= 0) {
pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
} else {
tMergeTreeClose(&state->mergeTree);
*ppRow = NULL;
return code;
}
......@@ -460,10 +464,10 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid,
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, pLoadInfo, true, NULL);
state->pMergeTree = &state->mergeTree;
bool hasVal = tMergeTreeNext(&state->mergeTree);
if (!hasVal) {
state->state = SFSLASTNEXTROW_FILESET;
tMergeTreeClose(&state->mergeTree);
goto _next_fileset;
}
state->state = SFSLASTNEXTROW_BLOCKROW;
......@@ -475,6 +479,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
if (!hasVal) {
state->state = SFSLASTNEXTROW_FILESET;
}
return code;
default:
ASSERT(0);
......@@ -486,6 +491,11 @@ _err:
tsdbDataFReaderClose(&state->pDataFReader);
state->pDataFReader = NULL;
}
if (state->pMergeTree != NULL) {
tMergeTreeClose(state->pMergeTree);
state->pMergeTree = NULL;
}
*ppRow = NULL;
return code;
......@@ -504,6 +514,11 @@ int32_t clearNextRowFromFSLast(void *iter) {
state->pDataFReader = NULL;
}
if (state->pMergeTree != NULL) {
tMergeTreeClose(state->pMergeTree);
state->pMergeTree = NULL;
}
return code;
}
......
......@@ -101,8 +101,8 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
int32_t code = 0;
SSttBlockLoadInfo* pInfo = pIter->pBlockLoadInfo;
if (pInfo->blockIndex[0] == pIter->iSttBlk) {
SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
if (pInfo->blockIndex[0] == pIter->iSttBlk) {
if (pInfo->currentLoadBlockIndex != 0) {
tsdbDebug("current load index is set to 0, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
......@@ -113,7 +113,7 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
if (pInfo->blockIndex[1] == pIter->iSttBlk) {
if (pInfo->currentLoadBlockIndex != 1) {
tsdbDebug("current load index is set to 1, block index:%d, file index:%d, due to uid:%"PRIu64", load data, %s",
tsdbDebug("current load index is set to 1, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
pInfo->currentLoadBlockIndex = 1;
}
......@@ -140,8 +140,10 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
pInfo->elapsedTime += el;
pInfo->loadBlocks += 1;
tsdbDebug("read last block, total load:%d, trigger by uid:%"PRIu64", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s",
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock, el, idStr);
tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64
", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s",
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock, el,
idStr);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
......@@ -336,7 +338,7 @@ _exit:
void tLDataIterClose(SLDataIter *pIter) { taosMemoryFree(pIter); }
void tLDataIterNextBlock(SLDataIter *pIter, const char* idStr) {
void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
int32_t step = pIter->backward ? -1 : 1;
int32_t oldIndex = pIter->iSttBlk;
......@@ -386,10 +388,10 @@ void tLDataIterNextBlock(SLDataIter *pIter, const char* idStr) {
if (index != -1) {
pIter->iSttBlk = index;
pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
tsdbDebug("try next last file block:%d from %d, trigger by uid:%"PRIu64", file index:%d, %s", pIter->iSttBlk, oldIndex, pIter->uid, pIter->iStt,
idStr);
tsdbDebug("try next last file block:%d from %d, trigger by uid:%" PRIu64 ", file index:%d, %s", pIter->iSttBlk,
oldIndex, pIter->uid, pIter->iStt, idStr);
} else {
tsdbDebug("no more last block qualified, uid:%"PRIu64", file index::%d, %s", pIter->uid, oldIndex, idStr);
tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index::%d, %s", pIter->uid, oldIndex, idStr);
}
}
......
......@@ -1645,7 +1645,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbTrace("fRow ptr:%p, %d, uid:%"PRIu64", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);
// only last block exists
if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
......
......@@ -638,7 +638,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_QNODE_LIST;
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
......@@ -692,7 +692,7 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_DNODE_LIST;
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
......@@ -743,7 +743,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildU
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_USE_DB;
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);
......@@ -795,7 +795,7 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_DB_CFG;
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName);
......@@ -850,7 +850,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_INDEX;
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get index from mnode, indexName:%s", indexName);
......@@ -905,7 +905,7 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* n
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(name, tbFName);
......@@ -962,7 +962,7 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const ch
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_RETRIEVE_FUNC;
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get udf info from mnode, funcName:%s", funcName);
......@@ -1017,7 +1017,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_USER_AUTH;
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get user auth from mnode, user:%s", user);
......@@ -1077,7 +1077,7 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char*
int32_t reqType = TDMT_MND_TABLE_META;
char tbFName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFName, "%s.%s", dbFName, tbName);
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName);
......@@ -1140,7 +1140,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
int32_t reqType = TDMT_VND_TABLE_META;
char tbFName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFName, "%s.%s", dbFName, pTableName->tname);
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
......@@ -1209,7 +1209,7 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
int32_t reqType = TDMT_VND_TABLE_CFG;
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFName);
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
......@@ -1274,7 +1274,7 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
int32_t reqType = TDMT_MND_TABLE_CFG;
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFName);
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
......@@ -1326,7 +1326,7 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** ou
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_SERVER_VERSION;
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
qDebug("try to get svr ver from mnode");
......
......@@ -1250,6 +1250,17 @@ void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
pExp->base.resSchema =
createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pOpNode->node.aliasName);
pExp->pExpr->_optrRoot.pRootNode = pNode;
} else if (type == QUERY_NODE_CASE_WHEN) {
pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
SCaseWhenNode* pCaseNode = (SCaseWhenNode*)pNode;
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
pExp->base.numOfParams = 1;
SDataType* pType = &pCaseNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale,
pType->precision, pCaseNode->node.aliasName);
pExp->pExpr->_optrRoot.pRootNode = pNode;
} else {
ASSERT(0);
}
......
......@@ -131,7 +131,7 @@ static int32_t valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) {
COPY_SCALAR_FIELD(placeholderNo);
COPY_SCALAR_FIELD(typeData);
COPY_SCALAR_FIELD(unit);
if (!pSrc->translate) {
if (!pSrc->translate || pSrc->isNull) {
return TSDB_CODE_SUCCESS;
}
switch (pSrc->node.resType.type) {
......
......@@ -3011,7 +3011,7 @@ static int32_t valueNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkValueIsNull, pNode->isNull);
}
if (TSDB_CODE_SUCCESS == code && pNode->translate) {
if (TSDB_CODE_SUCCESS == code && pNode->translate && !pNode->isNull) {
code = datumToJson(pNode, pJson);
}
......@@ -3161,7 +3161,7 @@ static int32_t jsonToValueNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkValueIsNull, &pNode->isNull);
}
if (TSDB_CODE_SUCCESS == code && pNode->translate) {
if (TSDB_CODE_SUCCESS == code && pNode->translate && !pNode->isNull) {
code = jsonToDatum(pJson, pNode);
}
......
......@@ -1608,7 +1608,7 @@ char* nodesGetStrValueFromNode(SValueNode* pNode) {
bool nodesIsExprNode(const SNode* pNode) {
ENodeType type = nodeType(pNode);
return (QUERY_NODE_COLUMN == type || QUERY_NODE_VALUE == type || QUERY_NODE_OPERATOR == type ||
QUERY_NODE_FUNCTION == type || QUERY_NODE_LOGIC_CONDITION == type);
QUERY_NODE_FUNCTION == type || QUERY_NODE_LOGIC_CONDITION == type || QUERY_NODE_CASE_WHEN == type);
}
bool nodesIsUnaryOp(const SOperatorNode* pOp) {
......
......@@ -66,7 +66,8 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
switch (nodeType(*pNode)) {
case QUERY_NODE_OPERATOR:
case QUERY_NODE_LOGIC_CONDITION:
case QUERY_NODE_FUNCTION: {
case QUERY_NODE_FUNCTION:
case QUERY_NODE_CASE_WHEN: {
SRewriteExprCxt* pCxt = (SRewriteExprCxt*)pContext;
SNode* pExpr;
int32_t index = 0;
......@@ -118,6 +119,17 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
static int32_t rewriteExprForSelect(SNode* pExpr, SSelectStmt* pSelect, ESqlClause clause) {
nodesWalkExpr(pExpr, doNameExpr, NULL);
SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = NULL};
cxt.errCode = nodesListMakeAppend(&cxt.pExprs, pExpr);
if (TSDB_CODE_SUCCESS == cxt.errCode) {
nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt);
nodesClearList(cxt.pExprs);
}
return cxt.errCode;
}
static int32_t rewriteExprsForSelect(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) {
nodesWalkExprs(pExprs, doNameExpr, NULL);
SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs};
......@@ -711,8 +723,13 @@ static int32_t createWindowLogicNodeByState(SLogicPlanContext* pCxt, SStateWindo
nodesDestroyNode((SNode*)pWindow);
return TSDB_CODE_OUT_OF_MEMORY;
}
return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
// rewrite the expression in subsequent clauses
int32_t code = rewriteExprForSelect(pWindow->pStateExpr, pSelect, SQL_CLAUSE_WINDOW);
if (TSDB_CODE_SUCCESS == code) {
code = createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
}
return code;
}
static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionWindowNode* pSession,
......
......@@ -765,7 +765,8 @@ static EDealRes doRewritePrecalcExprs(SNode** pNode, void* pContext) {
return collectAndRewrite(pCxt, pNode);
}
case QUERY_NODE_OPERATOR:
case QUERY_NODE_LOGIC_CONDITION: {
case QUERY_NODE_LOGIC_CONDITION:
case QUERY_NODE_CASE_WHEN: {
return collectAndRewrite(pCxt, pNode);
}
case QUERY_NODE_FUNCTION: {
......
......@@ -53,7 +53,8 @@ static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
}
case QUERY_NODE_OPERATOR:
case QUERY_NODE_LOGIC_CONDITION:
case QUERY_NODE_FUNCTION: {
case QUERY_NODE_FUNCTION:
case QUERY_NODE_CASE_WHEN: {
SExprNode* pExpr = (SExprNode*)pNode;
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
......
......@@ -24,8 +24,8 @@
#pragma GCC diagnostic ignored "-Wformat-truncation"
#endif
int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen,
void *(*mallocFp)(int32_t)) = {0};
int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen,
void *(*mallocFp)(int64_t)) = {0};
int32_t (*queryProcessMsgRsp[TDMT_MAX])(void *output, char *msg, int32_t msgSize) = {0};
int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
......@@ -67,8 +67,8 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen,
void *(*mallcFp)(int32_t)) {
int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen,
void *(*mallcFp)(int64_t)) {
SBuildTableInput *pInput = input;
if (NULL == input || NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
......@@ -91,7 +91,7 @@ int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int3
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int32_t)) {
int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
SBuildUseDBInput *pInput = input;
if (NULL == pInput || NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
......@@ -114,7 +114,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int32_t)) {
int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
......@@ -132,7 +132,7 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildDnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int32_t)) {
int32_t queryBuildDnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
......@@ -150,7 +150,7 @@ int32_t queryBuildDnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetSerVerMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int32_t)) {
int32_t queryBuildGetSerVerMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
......@@ -167,7 +167,7 @@ int32_t queryBuildGetSerVerMsg(void *input, char **msg, int32_t msgSize, int32_t
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int32_t)) {
int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
......@@ -185,7 +185,7 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int32_t)) {
int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
......@@ -203,8 +203,8 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen,
void *(*mallcFp)(int32_t)) {
int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen,
void *(*mallcFp)(int64_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
......@@ -227,7 +227,7 @@ int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int3
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int32_t)) {
int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
......@@ -245,7 +245,7 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int32_t)) {
int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
......@@ -263,7 +263,7 @@ int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int32_t)) {
int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
......
......@@ -86,7 +86,7 @@ typedef struct SScalarCtx {
} \
} while (0)
int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out, int32_t* overflow);
int32_t sclConvertValueToSclParam(SValueNode* pValueNode, SScalarParam* out, int32_t* overflow);
int32_t sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam);
int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode);
......@@ -95,6 +95,11 @@ int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode);
#define GET_PARAM_PRECISON(_c) ((_c)->columnData->info.precision)
void sclFreeParam(SScalarParam* param);
void doVectorCompare(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t startIndex, int32_t numOfRows,
int32_t _ord, int32_t optr);
void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t startIndex, int32_t numOfRows,
int32_t _ord, int32_t optr);
void vectorCompare(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord, int32_t optr);
#ifdef __cplusplus
}
......
......@@ -20,6 +20,15 @@
extern "C" {
#endif
typedef struct SSclVectorConvCtx {
const SScalarParam* pIn;
SScalarParam* pOut;
int32_t startIndex;
int32_t endIndex;
int16_t inType;
int16_t outType;
} SSclVectorConvCtx;
typedef double (*_getDoubleValue_fn_t)(void *src, int32_t index);
static FORCE_INLINE double getVectorDoubleValue_TINYINT(void *src, int32_t index) {
......
......@@ -1165,7 +1165,7 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode *tree, SArray *group) {
SValueNode *valueNode = (SValueNode *)cell->pNode;
if (valueNode->node.resType.type != type) {
int32_t overflow = 0;
code = doConvertDataType(valueNode, &out, &overflow);
code = sclConvertValueToSclParam(valueNode, &out, &overflow);
if (code) {
// fltError("convert from %d to %d failed", in.type, out.type);
FLT_ERR_RET(code);
......@@ -1973,7 +1973,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
}
// todo refactor the convert
int32_t code = doConvertDataType(var, &out, NULL);
int32_t code = sclConvertValueToSclParam(var, &out, NULL);
if (code != TSDB_CODE_SUCCESS) {
qError("convert value to type[%d] failed", type);
return TSDB_CODE_TSC_INVALID_OPERATION;
......@@ -3792,6 +3792,11 @@ EDealRes fltReviseRewriter(SNode **pNode, void *pContext) {
return DEAL_RES_CONTINUE;
}
if (QUERY_NODE_CASE_WHEN == nodeType(*pNode) || QUERY_NODE_WHEN_THEN == nodeType(*pNode)) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
}
if (QUERY_NODE_OPERATOR == nodeType(*pNode)) {
SOperatorNode *node = (SOperatorNode *)*pNode;
if (!FLT_IS_COMPARISON_OPERATOR(node->opType)) {
......
......@@ -61,7 +61,7 @@ int32_t sclCreateColumnInfoData(SDataType *pType, int32_t numOfRows, SScalarPara
return TSDB_CODE_SUCCESS;
}
int32_t doConvertDataType(SValueNode *pValueNode, SScalarParam *out, int32_t *overflow) {
int32_t sclConvertValueToSclParam(SValueNode* pValueNode, SScalarParam* out, int32_t* overflow) {
SScalarParam in = {.numOfRows = 1};
int32_t code = sclCreateColumnInfoData(&pValueNode->node.resType, 1, &in);
if (code != TSDB_CODE_SUCCESS) {
......@@ -71,12 +71,34 @@ int32_t doConvertDataType(SValueNode *pValueNode, SScalarParam *out, int32_t *ov
colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);
colInfoDataEnsureCapacity(out->columnData, 1);
code = vectorConvertImpl(&in, out, overflow);
code = vectorConvertSingleColImpl(&in, out, overflow, -1, -1);
sclFreeParam(&in);
return code;
}
int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockList) {
SSDataBlock* pb = taosArrayGetP(pBlockList, 0);
SScalarParam *pLeft = taosMemoryCalloc(1, sizeof(SScalarParam));
if (NULL == pLeft) {
sclError("calloc %d failed", (int32_t)sizeof(SScalarParam));
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pLeft->numOfRows = pb->info.rows;
if (pDst->numOfRows < pb->info.rows) {
colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows);
}
_bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(OP_TYPE_ASSIGN);
OperatorFn(pLeft, pSrc, pDst, TSDB_ORDER_ASC);
taosMemoryFree(pLeft);
return TSDB_CODE_SUCCESS;
}
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(type), true, false);
if (NULL == pObj) {
......@@ -110,7 +132,7 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
}
int32_t overflow = 0;
code = doConvertDataType(valueNode, &out, &overflow);
code = sclConvertValueToSclParam(valueNode, &out, &overflow);
if (code != TSDB_CODE_SUCCESS) {
// sclError("convert data from %d to %d failed", in.type, out.type);
SCL_ERR_JRET(code);
......@@ -178,7 +200,7 @@ void sclFreeRes(SHashObj *res) {
}
void sclFreeParam(SScalarParam *param) {
if (!param->colAlloced) {
if (NULL == param || !param->colAlloced) {
return;
}
......@@ -386,7 +408,8 @@ int32_t sclInitParam(SNode *node, SScalarParam *param, SScalarCtx *ctx, int32_t
}
case QUERY_NODE_FUNCTION:
case QUERY_NODE_OPERATOR:
case QUERY_NODE_LOGIC_CONDITION: {
case QUERY_NODE_LOGIC_CONDITION:
case QUERY_NODE_CASE_WHEN: {
SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, &node, POINTER_BYTES);
if (NULL == res) {
sclError("no result for node, type:%d, node:%p", nodeType(node), node);
......@@ -540,6 +563,135 @@ _return:
SCL_RET(code);
}
int32_t sclGetNodeRes(SNode* node, SScalarCtx *ctx, SScalarParam **res) {
if (NULL == node) {
return TSDB_CODE_SUCCESS;
}
int32_t rowNum = 0;
*res = taosMemoryCalloc(1, sizeof(**res));
if (NULL == *res) {
SCL_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCL_ERR_RET(sclInitParam(node, *res, ctx, &rowNum));
return TSDB_CODE_SUCCESS;
}
int32_t sclWalkCaseWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell* pCell, SScalarParam *pCase, SScalarParam *pElse, SScalarParam *pComp, SScalarParam *output, int32_t rowIdx, int32_t totalRows, bool *complete) {
SNode *node = NULL;
SWhenThenNode* pWhenThen = NULL;
SScalarParam *pWhen = NULL;
SScalarParam *pThen = NULL;
int32_t code = 0;
for (SListCell* cell = pCell; (NULL != cell ? (node = cell->pNode, true) : (node = NULL, false)); cell = cell->pNext) {
pWhenThen = (SWhenThenNode*)node;
SCL_ERR_RET(sclGetNodeRes(pWhenThen->pWhen, ctx, &pWhen));
SCL_ERR_RET(sclGetNodeRes(pWhenThen->pThen, ctx, &pThen));
vectorCompareImpl(pCase, pWhen, pComp, rowIdx, 1, TSDB_ORDER_ASC, OP_TYPE_EQUAL);
bool *equal = (bool*)colDataGetData(pComp->columnData, rowIdx);
if (*equal) {
colDataAppend(output->columnData, rowIdx, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)));
if (0 == rowIdx && 1 == pCase->numOfRows && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && totalRows > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
*complete = true;
}
goto _return;
}
}
if (pElse) {
colDataAppend(output->columnData, rowIdx, colDataGetData(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)));
if (0 == rowIdx && 1 == pCase->numOfRows && 1 == pElse->numOfRows && totalRows > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
*complete = true;
}
goto _return;
}
colDataAppend(output->columnData, rowIdx, NULL, true);
if (0 == rowIdx && 1 == pCase->numOfRows && totalRows > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
*complete = true;
}
_return:
sclFreeParam(pWhen);
sclFreeParam(pThen);
SCL_RET(code);
}
int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell* pCell, SScalarParam *pElse, SScalarParam *output,
int32_t rowIdx, int32_t totalRows, bool *complete, bool preSingle) {
SNode *node = NULL;
SWhenThenNode* pWhenThen = NULL;
SScalarParam *pWhen = NULL;
SScalarParam *pThen = NULL;
int32_t code = 0;
for (SListCell* cell = pCell; (NULL != cell ? (node = cell->pNode, true) : (node = NULL, false)); cell = cell->pNext) {
pWhenThen = (SWhenThenNode*)node;
pWhen = NULL;
pThen = NULL;
SCL_ERR_JRET(sclGetNodeRes(pWhenThen->pWhen, ctx, &pWhen));
SCL_ERR_JRET(sclGetNodeRes(pWhenThen->pThen, ctx, &pThen));
bool *whenValue = (bool*)colDataGetData(pWhen->columnData, (pWhen->numOfRows > 1 ? rowIdx : 0));
if (*whenValue) {
colDataAppend(output->columnData, rowIdx, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)));
if (preSingle && 0 == rowIdx && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && totalRows > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
*complete = true;
}
goto _return;
}
sclFreeParam(pWhen);
sclFreeParam(pThen);
}
if (pElse) {
colDataAppend(output->columnData, rowIdx, colDataGetData(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)));
if (preSingle && 0 == rowIdx && 1 == pElse->numOfRows && totalRows > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
*complete = true;
}
goto _return;
}
colDataAppend(output->columnData, rowIdx, NULL, true);
if (preSingle && 0 == rowIdx && totalRows > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
*complete = true;
}
_return:
sclFreeParam(pWhen);
sclFreeParam(pThen);
SCL_RET(code);
}
int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
SScalarParam *params = NULL;
int32_t rowNum = 0;
......@@ -698,6 +850,101 @@ _return:
SCL_RET(code);
}
int32_t sclExecCaseWhen(SCaseWhenNode *node, SScalarCtx *ctx, SScalarParam *output) {
int32_t code = 0;
SScalarParam *pCase = NULL;
SScalarParam *pElse = NULL;
SScalarParam *pWhen = NULL;
SScalarParam *pThen = NULL;
SScalarParam comp = {0};
int32_t rowNum = 1;
bool complete = false;
if (NULL == node->pWhenThenList || node->pWhenThenList->length <= 0) {
sclError("invalid whenThen list");
SCL_ERR_RET(TSDB_CODE_INVALID_PARA);
}
if (ctx->pBlockList) {
SSDataBlock* pb = taosArrayGetP(ctx->pBlockList, 0);
rowNum = pb->info.rows;
output->numOfRows = pb->info.rows;
}
SCL_ERR_JRET(sclCreateColumnInfoData(&node->node.resType, rowNum, output));
SCL_ERR_JRET(sclGetNodeRes(node->pCase, ctx, &pCase));
SCL_ERR_JRET(sclGetNodeRes(node->pElse, ctx, &pElse));
SDataType compType = {0};
compType.type = TSDB_DATA_TYPE_BOOL;
compType.bytes = tDataTypes[compType.type].bytes;
SCL_ERR_JRET(sclCreateColumnInfoData(&compType, rowNum, &comp));
SNode* tnode = NULL;
SWhenThenNode* pWhenThen = (SWhenThenNode*)node->pWhenThenList->pHead->pNode;
SCL_ERR_JRET(sclGetNodeRes(pWhenThen->pWhen, ctx, &pWhen));
SCL_ERR_JRET(sclGetNodeRes(pWhenThen->pThen, ctx, &pThen));
if (pCase) {
vectorCompare(pCase, pWhen, &comp, TSDB_ORDER_ASC, OP_TYPE_EQUAL);
for (int32_t i = 0; i < rowNum; ++i) {
bool *equal = (bool*)colDataGetData(comp.columnData, (comp.numOfRows > 1 ? i : 0));
if (*equal) {
colDataAppend(output->columnData, i, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)), colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)));
if (0 == i && 1 == pCase->numOfRows && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && rowNum > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
break;
}
} else {
SCL_ERR_JRET(sclWalkCaseWhenList(ctx, node->pWhenThenList, node->pWhenThenList->pHead->pNext, pCase, pElse, &comp, output, i, rowNum, &complete));
if (complete) {
break;
}
}
}
} else {
for (int32_t i = 0; i < rowNum; ++i) {
bool *whenValue = (bool*)colDataGetData(pWhen->columnData, (pWhen->numOfRows > 1 ? i : 0));
if (*whenValue) {
colDataAppend(output->columnData, i, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)), colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)));
if (0 == i && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && rowNum > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
break;
}
} else {
SCL_ERR_JRET(sclWalkWhenList(ctx, node->pWhenThenList, node->pWhenThenList->pHead->pNext, pElse, output, i, rowNum, &complete, (pWhen->numOfRows == 1 && pThen->numOfRows == 1)));
if (complete) {
break;
}
}
}
}
sclFreeParam(pCase);
sclFreeParam(pElse);
sclFreeParam(&comp);
sclFreeParam(pWhen);
sclFreeParam(pThen);
return TSDB_CODE_SUCCESS;
_return:
sclFreeParam(pCase);
sclFreeParam(pElse);
sclFreeParam(&comp);
sclFreeParam(pWhen);
sclFreeParam(pThen);
sclFreeParam(output);
SCL_RET(code);
}
EDealRes sclRewriteNullInOptr(SNode **pNode, SScalarCtx *ctx, EOperatorType opType) {
if (opType <= OP_TYPE_CALC_MAX) {
SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
......@@ -960,9 +1207,66 @@ EDealRes sclRewriteOperator(SNode **pNode, SScalarCtx *ctx) {
return DEAL_RES_CONTINUE;
}
EDealRes sclRewriteCaseWhen(SNode** pNode, SScalarCtx *ctx) {
SCaseWhenNode *node = (SCaseWhenNode *)*pNode;
if ((!SCL_IS_CONST_NODE(node->pCase)) || (!SCL_IS_CONST_NODE(node->pElse))) {
return DEAL_RES_CONTINUE;
}
SNode* tnode = NULL;
FOREACH(tnode, node->pWhenThenList) {
SWhenThenNode* pWhenThen = (SWhenThenNode*)tnode;
if (!SCL_IS_CONST_NODE(pWhenThen->pWhen) || !SCL_IS_CONST_NODE(pWhenThen->pThen)) {
return DEAL_RES_CONTINUE;
}
}
SScalarParam output = {0};
ctx->code = sclExecCaseWhen(node, ctx, &output);
if (ctx->code) {
return DEAL_RES_ERROR;
}
SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == res) {
sclError("make value node failed");
sclFreeParam(&output);
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
res->translate = true;
res->node.resType = node->node.resType;
if (colDataIsNull_s(output.columnData, 0)) {
res->isNull = true;
res->node.resType = node->node.resType;
} else {
int32_t type = output.columnData->info.type;
if (IS_VAR_DATA_TYPE(type)) { // todo refactor
res->datum.p = output.columnData->pData;
output.columnData->pData = NULL;
} else {
nodesSetValueNodeValue(res, output.columnData->pData);
}
}
nodesDestroyNode(*pNode);
*pNode = (SNode*)res;
sclFreeParam(&output);
return DEAL_RES_CONTINUE;
}
EDealRes sclConstantsRewriter(SNode **pNode, void *pContext) {
SScalarCtx *ctx = (SScalarCtx *)pContext;
if (QUERY_NODE_OPERATOR == nodeType(*pNode)) {
return sclRewriteOperator(pNode, ctx);
}
if (QUERY_NODE_FUNCTION == nodeType(*pNode)) {
return sclRewriteFunction(pNode, ctx);
}
......@@ -971,8 +1275,8 @@ EDealRes sclConstantsRewriter(SNode **pNode, void *pContext) {
return sclRewriteLogic(pNode, ctx);
}
if (QUERY_NODE_OPERATOR == nodeType(*pNode)) {
return sclRewriteOperator(pNode, ctx);
if (QUERY_NODE_CASE_WHEN == nodeType(*pNode)) {
return sclRewriteCaseWhen(pNode, ctx);
}
return DEAL_RES_CONTINUE;
......@@ -1082,13 +1386,36 @@ EDealRes sclWalkTarget(SNode *pNode, SScalarCtx *ctx) {
return DEAL_RES_CONTINUE;
}
EDealRes sclWalkCaseWhen(SNode* pNode, SScalarCtx *ctx) {
SCaseWhenNode *node = (SCaseWhenNode *)pNode;
SScalarParam output = {0};
ctx->code = sclExecCaseWhen(node, ctx, &output);
if (ctx->code) {
return DEAL_RES_ERROR;
}
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
return DEAL_RES_CONTINUE;
}
EDealRes sclCalcWalker(SNode *pNode, void *pContext) {
if (QUERY_NODE_VALUE == nodeType(pNode) || QUERY_NODE_NODE_LIST == nodeType(pNode) ||
QUERY_NODE_COLUMN == nodeType(pNode) || QUERY_NODE_LEFT_VALUE == nodeType(pNode)) {
if (QUERY_NODE_VALUE == nodeType(pNode) || QUERY_NODE_NODE_LIST == nodeType(pNode)
|| QUERY_NODE_COLUMN == nodeType(pNode) || QUERY_NODE_LEFT_VALUE == nodeType(pNode)
|| QUERY_NODE_WHEN_THEN == nodeType(pNode)) {
return DEAL_RES_CONTINUE;
}
SScalarCtx *ctx = (SScalarCtx *)pContext;
if (QUERY_NODE_OPERATOR == nodeType(pNode)) {
return sclWalkOperator(pNode, ctx);
}
if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
return sclWalkFunction(pNode, ctx);
}
......@@ -1097,38 +1424,19 @@ EDealRes sclCalcWalker(SNode *pNode, void *pContext) {
return sclWalkLogic(pNode, ctx);
}
if (QUERY_NODE_OPERATOR == nodeType(pNode)) {
return sclWalkOperator(pNode, ctx);
}
if (QUERY_NODE_TARGET == nodeType(pNode)) {
return sclWalkTarget(pNode, ctx);
}
if (QUERY_NODE_CASE_WHEN == nodeType(pNode)) {
return sclWalkCaseWhen(pNode, ctx);
}
sclError("invalid node type for scalar calculating, type:%d", nodeType(pNode));
ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
return DEAL_RES_ERROR;
}
int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockList) {
SSDataBlock *pb = taosArrayGetP(pBlockList, 0);
SScalarParam *pLeft = taosMemoryCalloc(1, sizeof(SScalarParam));
if (NULL == pLeft) {
sclError("calloc %d failed", (int32_t)sizeof(SScalarParam));
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pLeft->numOfRows = pb->info.rows;
colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows);
_bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(OP_TYPE_ASSIGN);
OperatorFn(pLeft, pSrc, pDst, TSDB_ORDER_ASC);
taosMemoryFree(pLeft);
return TSDB_CODE_SUCCESS;
}
int32_t sclCalcConstants(SNode *pNode, bool dual, SNode **pRes) {
if (NULL == pNode) {
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
......
此差异已折叠。
此差异已折叠。
......@@ -16,19 +16,17 @@
#include "query.h"
#include "schInt.h"
tsem_t schdRspSem;
tsem_t schdRspSem;
SSchDebug gSCHDebug = {0};
void schdExecCallback(SExecResult* pResult, void* param, int32_t code) {
if (code) {
pResult->code = code;
}
*(SExecResult*)param = *pResult;
taosMemoryFree(pResult);
tsem_post(&schdRspSem);
}
......@@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "catalog.h"
#include "query.h"
#include "schInt.h"
#include "tmsg.h"
#include "query.h"
#include "catalog.h"
#include "tref.h"
void schFreeFlowCtrl(SSchJob *pJob) {
......@@ -25,14 +25,14 @@ void schFreeFlowCtrl(SSchJob *pJob) {
}
SSchFlowControl *ctrl = NULL;
void *pIter = taosHashIterate(pJob->flowCtrl, NULL);
void *pIter = taosHashIterate(pJob->flowCtrl, NULL);
while (pIter) {
ctrl = (SSchFlowControl *)pIter;
if (ctrl->taskList) {
taosArrayDestroy(ctrl->taskList);
}
pIter = taosHashIterate(pJob->flowCtrl, pIter);
}
......@@ -59,7 +59,8 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
return TSDB_CODE_SUCCESS;
}
pJob->flowCtrl = taosHashInit(pJob->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
pJob->flowCtrl =
taosHashInit(pJob->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pJob->flowCtrl) {
SCH_JOB_ELOG("taosHashInit %d flowCtrl failed", pJob->taskNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -73,17 +74,17 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
}
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
SSchLevel *pLevel = pTask->level;
SSchLevel *pLevel = pTask->level;
SSchFlowControl *ctrl = NULL;
int32_t code = 0;
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
int32_t code = 0;
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp));
if (NULL == ctrl) {
SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_LOCK(SCH_WRITE, &ctrl->lock);
if (ctrl->execTaskNum <= 0) {
SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port);
......@@ -93,8 +94,8 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
--ctrl->execTaskNum;
ctrl->tableNumSum -= pTask->plan->execNodeStat.tableNum;
SCH_TASK_DLOG("task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
SCH_TASK_DLOG("task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
_return:
......@@ -104,11 +105,11 @@ _return:
}
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
SSchLevel *pLevel = pTask->level;
int32_t code = 0;
SSchLevel *pLevel = pTask->level;
int32_t code = 0;
SSchFlowControl *ctrl = NULL;
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
do {
ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp));
if (NULL == ctrl) {
......@@ -119,34 +120,34 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
if (HASH_NODE_EXIST(code)) {
continue;
}
SCH_TASK_ELOG("taosHashPut flowCtrl failed, size:%d", (int32_t)sizeof(nctrl));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, nctrl.tableNumSum, nctrl.execTaskNum);
SCH_TASK_DLOG("task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
ep->port, pTask->plan->execNodeStat.tableNum, nctrl.tableNumSum, nctrl.execTaskNum);
*enough = true;
return TSDB_CODE_SUCCESS;
}
SCH_LOCK(SCH_WRITE, &ctrl->lock);
if (0 == ctrl->execTaskNum) {
ctrl->tableNumSum = pTask->plan->execNodeStat.tableNum;
++ctrl->execTaskNum;
*enough = true;
break;
}
int32_t sum = pTask->plan->execNodeStat.tableNum + ctrl->tableNumSum;
if (sum <= schMgmt.cfg.maxNodeTableNum) {
ctrl->tableNumSum = sum;
++ctrl->execTaskNum;
*enough = true;
break;
}
......@@ -166,24 +167,25 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
*enough = false;
ctrl->sorted = false;
break;
} while (true);
_return:
SCH_TASK_DLOG("task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
((*enough)?"":"NOT"), ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
SCH_TASK_DLOG("task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
((*enough) ? "" : "NOT"), ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum,
ctrl->execTaskNum);
SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
SCH_RET(code);
}
int32_t schTaskTableNumCompare(const void* key1, const void* key2) {
int32_t schTaskTableNumCompare(const void *key1, const void *key2) {
SSchTask *pTask1 = *(SSchTask **)key1;
SSchTask *pTask2 = *(SSchTask **)key2;
if (pTask1->plan->execNodeStat.tableNum < pTask2->plan->execNodeStat.tableNum) {
return 1;
} else if (pTask1->plan->execNodeStat.tableNum > pTask2->plan->execNodeStat.tableNum) {
......@@ -193,22 +195,21 @@ int32_t schTaskTableNumCompare(const void* key1, const void* key2) {
}
}
int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
SCH_LOCK(SCH_WRITE, &ctrl->lock);
if (NULL == ctrl->taskList || taosArrayGetSize(ctrl->taskList) <= 0) {
SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
return TSDB_CODE_SUCCESS;
}
int32_t remainNum = schMgmt.cfg.maxNodeTableNum - ctrl->tableNumSum;
int32_t taskNum = taosArrayGetSize(ctrl->taskList);
int32_t code = 0;
int32_t remainNum = schMgmt.cfg.maxNodeTableNum - ctrl->tableNumSum;
int32_t taskNum = taosArrayGetSize(ctrl->taskList);
int32_t code = 0;
SSchTask *pTask = NULL;
if (taskNum > 1 && !ctrl->sorted) {
taosArraySort(ctrl->taskList, schTaskTableNumCompare); // desc order
taosArraySort(ctrl->taskList, schTaskTableNumCompare); // desc order
}
for (int32_t i = 0; i < taskNum; ++i) {
......@@ -216,36 +217,36 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) {
SCH_TASK_DLOG("task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
SCH_TASK_DLOG("task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
continue;
}
ctrl->tableNumSum += pTask->plan->execNodeStat.tableNum;
++ctrl->execTaskNum;
taosArrayRemove(ctrl->taskList, i);
SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
remainNum -= pTask->plan->execNodeStat.tableNum;
if (remainNum <= 0) {
SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d",
ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum);
SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn, ep->port,
ctrl->tableNumSum, ctrl->execTaskNum);
break;
}
if (i < (taskNum - 1)) {
SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList);
if (remainNum < pLastTask->plan->execNodeStat.tableNum) {
SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d, smallestInList:%d",
ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum);
SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d, smallestInList:%d",
ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum);
break;
}
}
......@@ -253,7 +254,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
--i;
--taskNum;
}
_return:
SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
......@@ -261,11 +262,10 @@ _return:
if (code) {
code = schProcessOnTaskFailure(pJob, pTask, code);
}
SCH_RET(code);
}
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) {
if (!SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
return TSDB_CODE_SUCCESS;
......@@ -274,17 +274,16 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
SSchFlowControl *ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp));
if (NULL == ctrl) {
SCH_TASK_ELOG("taosHashGet node from flowCtrl failed, fqdn:%s, port:%d", ep->fqdn, ep->port);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
int32_t code = schLaunchTasksInFlowCtrlListImpl(pJob, ctrl);;
SCH_ERR_RET(code);
return code; // to avoid compiler error
}
int32_t code = schLaunchTasksInFlowCtrlListImpl(pJob, ctrl);
;
SCH_ERR_RET(code);
return code; // to avoid compiler error
}
......@@ -52,9 +52,8 @@ _return:
bool schJobDone(SSchJob *pJob) {
int8_t status = SCH_GET_JOB_STATUS(pJob);
return (status == JOB_TASK_STATUS_FAIL || status == JOB_TASK_STATUS_DROP ||
status == JOB_TASK_STATUS_SUCC);
return (status == JOB_TASK_STATUS_FAIL || status == JOB_TASK_STATUS_DROP || status == JOB_TASK_STATUS_SUCC);
}
FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
......@@ -221,7 +220,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("parents info, the %d parent TID 0x%" PRIx64, n, (*parentTask)->taskId);
SCH_TASK_DLOG("parents info, the %d parent TID 0x%" PRIx64, n, (*parentTask)->taskId);
}
SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
......@@ -235,7 +234,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SSchTask* pTask = taosArrayGet(pLevel->subTasks, 0);
SSchTask *pTask = taosArrayGet(pLevel->subTasks, 0);
if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType) {
pJob->attr.needFetch = true;
}
......@@ -244,7 +243,6 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
return TSDB_CODE_SUCCESS;
}
int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) {
if (!SCH_IS_DATA_BIND_QRY_TASK(pTask)) {
return TSDB_CODE_SUCCESS;
......@@ -255,7 +253,6 @@ int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
int32_t code = 0;
pJob->queryId = pDag->queryId;
......@@ -365,7 +362,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));
_return:
if (planToTask) {
taosHashCleanup(planToTask);
}
......@@ -373,8 +370,7 @@ _return:
SCH_RET(code);
}
int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes) {
int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) {
pRes->code = atomic_load_32(&pJob->errCode);
pRes->numOfRows = pJob->resNumOfRows;
pRes->res = pJob->execRes.res;
......@@ -387,13 +383,13 @@ int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes) {
return TSDB_CODE_SUCCESS;
}
int32_t schDumpJobFetchRes(SSchJob* pJob, void** pData) {
int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
int32_t code = 0;
SCH_LOCK(SCH_WRITE, &pJob->resLock);
pJob->fetched = true;
if (pJob->fetchRes && ((SRetrieveTableRsp *)pJob->fetchRes)->completed) {
SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_SUCC, NULL));
}
......@@ -422,12 +418,12 @@ int32_t schDumpJobFetchRes(SSchJob* pJob, void** pData) {
_return:
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
return code;
}
int32_t schNotifyUserExecRes(SSchJob* pJob) {
SExecResult* pRes = taosMemoryCalloc(1, sizeof(SExecResult));
int32_t schNotifyUserExecRes(SSchJob *pJob) {
SExecResult *pRes = taosMemoryCalloc(1, sizeof(SExecResult));
if (pRes) {
schDumpJobExecRes(pJob, pRes);
}
......@@ -439,9 +435,9 @@ int32_t schNotifyUserExecRes(SSchJob* pJob) {
return TSDB_CODE_SUCCESS;
}
int32_t schNotifyUserFetchRes(SSchJob* pJob) {
void* pRes = NULL;
int32_t schNotifyUserFetchRes(SSchJob *pJob) {
void *pRes = NULL;
schDumpJobFetchRes(pJob, &pRes);
SCH_JOB_DLOG("sch start to invoke fetch cb, code: %s", tstrerror(pJob->errCode));
......@@ -453,17 +449,17 @@ int32_t schNotifyUserFetchRes(SSchJob* pJob) {
void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
if (SCH_OP_NULL == pJob->opStatus.op) {
SCH_JOB_DLOG("job not in any operation, no need to post job res, status:%s", jobTaskStatusStr(pJob->status));
goto _return;
}
if (op && pJob->opStatus.op != op) {
SCH_JOB_ELOG("job in operation %s mis-match with expected %s", schGetOpStr(pJob->opStatus.op), schGetOpStr(op));
goto _return;
}
if (SCH_JOB_IN_SYNC_OP(pJob)) {
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
tsem_post(&pJob->rspSem);
......@@ -487,7 +483,7 @@ _return:
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
schUpdateJobErrCode(pJob, errCode);
int32_t code = atomic_load_32(&pJob->errCode);
if (code) {
SCH_JOB_DLOG("job failed with error %s", tstrerror(code));
......@@ -507,9 +503,7 @@ int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode) {
return TSDB_CODE_SCH_IGNORE_ERROR;
}
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
SCH_RET(schProcessOnJobFailure(pJob, errCode));
}
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) { SCH_RET(schProcessOnJobFailure(pJob, errCode)); }
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
......@@ -520,8 +514,7 @@ int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
return TSDB_CODE_SCH_IGNORE_ERROR;
}
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
if (schChkCurrentOp(pJob, SCH_OP_FETCH, -1)) {
SCH_ERR_RET(schLaunchFetchTask(pJob));
} else {
......@@ -531,9 +524,7 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
return TSDB_CODE_SUCCESS;
}
void schProcessOnDataFetched(SSchJob *pJob) {
schPostJobRes(pJob, SCH_OP_FETCH);
}
void schProcessOnDataFetched(SSchJob *pJob) { schPostJobRes(pJob, SCH_OP_FETCH); }
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);
......@@ -548,14 +539,13 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs
return TSDB_CODE_SUCCESS;
}
int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
if (!SCH_IS_QUERY_JOB(pJob)) {
return TSDB_CODE_SUCCESS;
}
SSchLevel *pLevel = pTask->level;
int32_t doneNum = atomic_add_fetch_32(&pLevel->taskDoneNum, 1);
int32_t doneNum = atomic_add_fetch_32(&pLevel->taskDoneNum, 1);
if (doneNum == pLevel->taskNum) {
pJob->levelIdx--;
......@@ -566,7 +556,7 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
if (pTask->children && taosArrayGetSize(pTask->children) > 0) {
continue;
}
SCH_ERR_RET(schLaunchTask(pJob, pTask));
}
}
......@@ -577,11 +567,11 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) {
if (rsp->tbFName[0]) {
SCH_LOCK(SCH_WRITE, &pJob->resLock);
if (NULL == pJob->execRes.res) {
pJob->execRes.res = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
if (NULL == pJob->execRes.res) {
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
......@@ -610,7 +600,6 @@ int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t schLaunchJob(SSchJob *pJob) {
if (EXPLAIN_MODE_STATIC == pJob->attr.explainMode) {
SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->fetchRes));
......@@ -623,11 +612,10 @@ int32_t schLaunchJob(SSchJob *pJob) {
return TSDB_CODE_SUCCESS;
}
void schDropJobAllTasks(SSchJob *pJob) {
schDropTaskInHashList(pJob, pJob->execTasks);
// schDropTaskInHashList(pJob, pJob->succTasks);
// schDropTaskInHashList(pJob, pJob->failTasks);
// schDropTaskInHashList(pJob, pJob->succTasks);
// schDropTaskInHashList(pJob, pJob->failTasks);
}
void schFreeJobImpl(void *job) {
......@@ -659,10 +647,10 @@ void schFreeJobImpl(void *job) {
schFreeFlowCtrl(pJob);
taosHashCleanup(pJob->execTasks);
// taosHashCleanup(pJob->failTasks);
// taosHashCleanup(pJob->succTasks);
// taosHashCleanup(pJob->failTasks);
// taosHashCleanup(pJob->succTasks);
taosHashCleanup(pJob->taskList);
taosArrayDestroy(pJob->levels);
taosArrayDestroy(pJob->nodeList);
taosArrayDestroy(pJob->dataSrcTasks);
......@@ -688,19 +676,19 @@ void schFreeJobImpl(void *job) {
}
int32_t schJobFetchRows(SSchJob *pJob) {
int32_t code = 0;
int32_t code = 0;
if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
SCH_ERR_RET(schLaunchFetchTask(pJob));
if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
tsem_wait(&pJob->rspSem);
SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
}
} else {
if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
} else {
schPostJobRes(pJob, SCH_OP_FETCH);
}
......@@ -736,9 +724,9 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
} else {
pJob->nodeList = taosArrayDup(pReq->pNodeList);
}
pJob->taskList =
taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
pJob->taskList = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
HASH_ENTRY_LOCK);
if (NULL == pJob->taskList) {
SCH_JOB_ELOG("taosHashInit %d taskList failed", pReq->pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -750,8 +738,8 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
SCH_ERR_JRET(qExecExplainBegin(pReq->pDag, &pJob->explainCtx, pReq->startTs));
}
pJob->execTasks =
taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
pJob->execTasks = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
HASH_ENTRY_LOCK);
if (NULL == pJob->execTasks) {
SCH_JOB_ELOG("taosHashInit %d execTasks failed", pReq->pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -769,7 +757,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
*pJobId = pJob->refId;
SCH_JOB_DLOG("job refId:0x%" PRIx64" created", pJob->refId);
SCH_JOB_DLOG("job refId:0x%" PRIx64 " created", pJob->refId);
return TSDB_CODE_SUCCESS;
......@@ -782,31 +770,31 @@ _return:
} else {
taosRemoveRef(schMgmt.jobRef, pJob->refId);
}
SCH_RET(code);
}
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
int32_t code = 0;
qDebug("QID:0x%" PRIx64 " sch job refId 0x%"PRIx64 " started", pReq->pDag->queryId, pJob->refId);
qDebug("QID:0x%" PRIx64 " sch job refId 0x%" PRIx64 " started", pReq->pDag->queryId, pJob->refId);
SCH_ERR_RET(schLaunchJob(pJob));
if (pReq->syncReq) {
SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
tsem_wait(&pJob->rspSem);
}
SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
return TSDB_CODE_SUCCESS;
}
void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode) {
void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode) {
if (NULL == pReq || pReq->syncReq) {
return;
}
if (pReq->execFp) {
(*pReq->execFp)(NULL, pReq->cbParam, errCode);
} else if (pReq->fetchFp) {
......@@ -827,16 +815,17 @@ bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) {
return r;
}
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) {
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode) {
int32_t op = 0;
switch (type) {
case SCH_OP_EXEC:
if (pReq && pReq->syncReq) {
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
if (SCH_OP_NULL == op || op != type) {
SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status));
SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op),
jobTaskStatusStr(pJob->status));
}
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
schDumpJobExecRes(pJob, pReq->pExecRes);
......@@ -847,7 +836,8 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
if (SCH_OP_NULL == op || op != type) {
SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status));
SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op),
jobTaskStatusStr(pJob->status));
}
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
}
......@@ -866,10 +856,10 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int
SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode));
}
int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq) {
int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq) {
int32_t code = 0;
int8_t status = SCH_GET_JOB_STATUS(pJob);
int8_t status = SCH_GET_JOB_STATUS(pJob);
switch (type) {
case SCH_OP_EXEC:
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
......@@ -879,9 +869,9 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR);
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
}
SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));
pJob->opStatus.syncReq = pReq->syncReq;
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
break;
......@@ -893,16 +883,16 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR);
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
}
SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));
pJob->userRes.fetchRes = pReq->pFetchRes;
pJob->userRes.fetchFp = pReq->fetchFp;
pJob->userRes.cbParam = pReq->cbParam;
pJob->opStatus.syncReq = pReq->syncReq;
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
if (!SCH_JOB_NEED_FETCH(pJob)) {
SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
......@@ -912,7 +902,7 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
break;
case SCH_OP_GET_STATUS:
if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) {
......@@ -941,23 +931,23 @@ void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
if (errCode) {
schHandleJobFailure(pJob, errCode);
}
if (pJob) {
schReleaseJob(pJob->refId);
}
}
int32_t schProcessOnCbBegin(SSchJob** job, SSchTask** task, uint64_t qId, int64_t rId, uint64_t tId) {
int32_t schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_t rId, uint64_t tId) {
int32_t code = 0;
int8_t status = 0;
int8_t status = 0;
SSchTask *pTask = NULL;
SSchJob *pJob = schAcquireJob(rId);
SSchJob *pJob = schAcquireJob(rId);
if (NULL == pJob) {
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId);
SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST);
}
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
......@@ -980,9 +970,6 @@ _return:
if (pJob) {
schReleaseJob(rId);
}
SCH_RET(code);
}
......@@ -29,25 +29,25 @@ int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) {
case JOB_TASK_STATUS_INIT:
break;
case JOB_TASK_STATUS_EXEC:
SCH_ERR_JRET(schExecJob(pJob, (SSchedulerReq*)param));
SCH_ERR_JRET(schExecJob(pJob, (SSchedulerReq*)param));
break;
case JOB_TASK_STATUS_PART_SUCC:
SCH_ERR_JRET(schProcessOnJobPartialSuccess(pJob));
break;
case JOB_TASK_STATUS_SUCC:
break;
case JOB_TASK_STATUS_FAIL:
case JOB_TASK_STATUS_FAIL:
SCH_RET(schProcessOnJobFailure(pJob, (param ? *(int32_t*)param : 0)));
break;
case JOB_TASK_STATUS_DROP:
schProcessOnJobDropped(pJob, *(int32_t*)param);
if (taosRemoveRef(schMgmt.jobRef, pJob->refId)) {
SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, pJob->refId);
} else {
SCH_JOB_DLOG("job removed from jobRef list, refId:0x%" PRIx64, pJob->refId);
}
break;
break;
default: {
SCH_JOB_ELOG("unknown job status %d", status);
SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
......@@ -62,7 +62,7 @@ _return:
}
int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq) {
SSchJob *pJob = schAcquireJob(jobId);
SSchJob* pJob = schAcquireJob(jobId);
if (NULL == pJob) {
qWarn("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, jobId);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
......@@ -75,12 +75,12 @@ int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SS
int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) {
int32_t code = errCode;
if (NULL == pJob) {
schDirectPostJobRes(pReq, errCode);
SCH_RET(code);
}
schProcessOnOpEnd(pJob, type, pReq, errCode);
if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
......@@ -91,5 +91,3 @@ int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
return code;
}
......@@ -16,12 +16,12 @@
#include "catalog.h"
#include "command.h"
#include "query.h"
#include "qworker.h"
#include "schInt.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "qworker.h"
#include "tglobal.h"
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
schDeregisterTaskHb(pJob, pTask);
......@@ -94,7 +94,7 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
return TSDB_CODE_SUCCESS;
}
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
if (NULL == addr) {
SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
......@@ -162,14 +162,15 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v
if (dropExecNode) {
SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
}
schUpdateTaskExecNode(pJob, pTask, handle, execId);
if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId,
pTask->execId, pTask->waitRetry);
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
SCH_SET_TASK_HANDLE(pTask, handle);
return TSDB_CODE_SUCCESS;
......@@ -837,17 +838,18 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
goto _return;
}
SSchTask *pTask = NULL;
SSchJob *pJob = NULL;
SSchTask *pTask = NULL;
SSchJob *pJob = NULL;
for (int32_t i = 0; i < resNum; ++i) {
SExplainLocalRsp* localRsp = taosArrayGet(pExplainRes, i);
SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId);
pJob = schAcquireJob(localRsp->rId);
if (NULL == pJob) {
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId, localRsp->tId, localRsp->rId);
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId,
localRsp->tId, localRsp->rId);
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
}
......@@ -857,16 +859,17 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
schReleaseJob(pJob->refId);
SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
}
code = schGetTaskInJob(pJob, localRsp->tId, &pTask);
if (TSDB_CODE_SUCCESS == code) {
code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp);
}
schReleaseJob(pJob->refId);
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId, localRsp->tId, code);
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId,
localRsp->tId, code);
SCH_ERR_JRET(code);
......@@ -879,7 +882,7 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
_return:
for (int32_t i = 0; i < resNum; ++i) {
SExplainLocalRsp* localRsp = taosArrayGet(pExplainRes, i);
SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
tFreeSExplainRsp(&localRsp->rsp);
}
......@@ -890,7 +893,7 @@ _return:
int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
SSubplan *plan = pTask->plan;
int32_t code = 0;
int32_t code = 0;
if (NULL == pTask->msg) { // TODO add more detailed reason for failure
code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen);
......@@ -899,7 +902,7 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
pTask->msgLen);
SCH_ERR_RET(code);
} else if (tsQueryPlannerTrace) {
char *msg = NULL;
char *msg = NULL;
int32_t msgLen = 0;
qSubPlanToString(plan, &msg, &msgLen);
SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg);
......@@ -912,18 +915,18 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
if (SCH_IS_QUERY_JOB(pJob)) {
SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
}
SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
}
int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
//SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
// SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
if (NULL == schMgmt.queryMgmt) {
SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL));
}
SArray *explainRes = NULL;
SQWMsg qwMsg = {0};
SQWMsg qwMsg = {0};
qwMsg.msgInfo.taskType = TASK_TYPE_TEMP;
qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask);
......@@ -934,8 +937,9 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
if (SCH_IS_EXPLAIN_JOB(pJob)) {
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
}
SCH_ERR_RET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg, explainRes));
SCH_ERR_RET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
pTask->execId, &qwMsg, explainRes));
if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_RET(schHandleExplainRes(explainRes));
......@@ -958,17 +962,17 @@ int32_t schLaunchTaskImpl(void *param) {
if (pCtx->asyncLaunch) {
SCH_LOCK_TASK(pTask);
}
int8_t status = 0;
int32_t code = 0;
int8_t status = 0;
int32_t code = 0;
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
pTask->execId++;
pTask->retryTimes++;
pTask->waitRetry = false;
SCH_TASK_DLOG("start to launch %s task, execId %d, retry %d", SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE",
pTask->execId, pTask->retryTimes);
SCH_TASK_DLOG("start to launch %s task, execId %d, retry %d",
SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", pTask->execId, pTask->retryTimes);
SCH_LOG_TASK_START_TS(pTask);
......@@ -1086,19 +1090,20 @@ int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
}
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
void *pRsp = NULL;
void *pRsp = NULL;
SArray *explainRes = NULL;
if (SCH_IS_EXPLAIN_JOB(pJob)) {
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
}
SCH_ERR_RET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &pRsp, explainRes));
SCH_ERR_RET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
pTask->execId, &pRsp, explainRes));
if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_RET(schHandleExplainRes(explainRes));
}
SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS));
return TSDB_CODE_SUCCESS;
......
......@@ -21,21 +21,21 @@
#include "tref.h"
#include "trpc.h"
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) {
qDebug("sch acquire jobId:0x%"PRIx64, refId);
return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId);
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) {
qDebug("sch acquire jobId:0x%" PRIx64, refId);
return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId);
}
FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
if (0 == refId) {
return TSDB_CODE_SUCCESS;
}
qDebug("sch release jobId:0x%"PRIx64, refId);
return taosReleaseRef(schMgmt.jobRef, refId);
qDebug("sch release jobId:0x%" PRIx64, refId);
return taosReleaseRef(schMgmt.jobRef, refId);
}
char* schGetOpStr(SCH_OP_TYPE type) {
char *schGetOpStr(SCH_OP_TYPE type) {
switch (type) {
case SCH_OP_NULL:
return "NULL";
......@@ -53,28 +53,28 @@ char* schGetOpStr(SCH_OP_TYPE type) {
void schFreeHbTrans(SSchHbTrans *pTrans) {
rpcReleaseHandle(pTrans->trans.pHandle, TAOS_CONN_CLIENT);
schFreeRpcCtx(&pTrans->rpcCtx);
schFreeRpcCtx(&pTrans->rpcCtx);
}
void schCleanClusterHb(void* pTrans) {
void schCleanClusterHb(void *pTrans) {
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
SSchHbTrans *hb = taosHashIterate(schMgmt.hbConnections, NULL);
while (hb) {
if (hb->trans.pTrans == pTrans) {
SQueryNodeEpId* pEpId = taosHashGetKey(hb, NULL);
SQueryNodeEpId *pEpId = taosHashGetKey(hb, NULL);
schFreeHbTrans(hb);
taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
}
hb = taosHashIterate(schMgmt.hbConnections, hb);
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
}
int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId) {
int32_t code = 0;
int32_t code = 0;
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
......@@ -94,7 +94,6 @@ int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *ep
return TSDB_CODE_SUCCESS;
}
int32_t schAddHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
int32_t code = 0;
SSchHbTrans hb = {0};
......@@ -155,13 +154,13 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
if (!pTask->registerdHb) {
return;
}
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId;
SEp* pEp = SCH_GET_CUR_EP(addr);
SEp *pEp = SCH_GET_CUR_EP(addr);
strcpy(epId.ep.fqdn, pEp->fqdn);
epId.ep.port = pEp->port;
......@@ -180,24 +179,22 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
} else {
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
}
pTask->registerdHb = false;
}
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId;
SEp* pEp = SCH_GET_CUR_EP(addr);
SEp *pEp = SCH_GET_CUR_EP(addr);
strcpy(epId.ep.fqdn, pEp->fqdn);
epId.ep.port = pEp->port;
SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId));
pTask->registerdHb = true;
return TSDB_CODE_SUCCESS;
......@@ -226,7 +223,6 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
return TSDB_CODE_SUCCESS;
}
void schCloseJobRef(void) {
if (!atomic_load_8((int8_t *)&schMgmt.exit)) {
return;
......@@ -242,7 +238,7 @@ uint64_t schGenTaskId(void) { return atomic_add_fetch_64(&schMgmt.taskId, 1); }
uint64_t schGenUUID(void) {
static uint64_t hashId = 0;
static int32_t requestSerialId = 0;
static int32_t requestSerialId = 0;
if (hashId == 0) {
char uid[64] = {0};
......@@ -254,15 +250,14 @@ uint64_t schGenUUID(void) {
}
}
int64_t ts = taosGetTimestampMs();
uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&requestSerialId, 1);
int64_t ts = taosGetTimestampMs();
uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&requestSerialId, 1);
uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
return id;
}
void schFreeRpcCtxVal(const void *arg) {
if (NULL == arg) {
return;
......@@ -307,5 +302,3 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas
return TSDB_CODE_SUCCESS;
}
......@@ -14,10 +14,10 @@
*/
#include "query.h"
#include "qworker.h"
#include "schInt.h"
#include "tmsg.h"
#include "tref.h"
#include "qworker.h"
SSchedulerMgmt schMgmt = {
.jobRef = -1,
......@@ -35,7 +35,7 @@ int32_t schedulerInit() {
schMgmt.cfg.enableReSchedule = true;
qDebug("schedule policy init to %d", schMgmt.cfg.schPolicy);
schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl);
if (schMgmt.jobRef < 0) {
qError("init schduler jobRef failed, num:%u", schMgmt.cfg.maxJobNum);
......@@ -61,7 +61,7 @@ int32_t schedulerInit() {
int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJobId) {
qDebug("scheduler %s exec job start", pReq->syncReq ? "SYNC" : "ASYNC");
int32_t code = 0;
int32_t code = 0;
SSchJob *pJob = NULL;
SCH_ERR_JRET(schInitJob(pJobId, pReq));
......@@ -73,7 +73,7 @@ int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJobId) {
SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_EXEC, pReq));
_return:
SCH_RET(schHandleOpEndEvent(pJob, SCH_OP_EXEC, pReq, code));
}
......@@ -144,7 +144,7 @@ int32_t schedulerEnableReSchedule(bool enableResche) {
return TSDB_CODE_SUCCESS;
}
void schedulerFreeJob(int64_t* jobId, int32_t errCode) {
void schedulerFreeJob(int64_t *jobId, int32_t errCode) {
if (0 == *jobId) {
return;
}
......@@ -158,7 +158,7 @@ void schedulerFreeJob(int64_t* jobId, int32_t errCode) {
SCH_JOB_DLOG("start to free job 0x%" PRIx64 ", errCode:0x%x", *jobId, errCode);
schHandleJobDrop(pJob, errCode);
schReleaseJob(*jobId);
*jobId = 0;
}
......
......@@ -100,8 +100,8 @@ void rpcCloseImpl(void* arg) {
taosMemoryFree(pRpc);
}
void* rpcMallocCont(int32_t contLen) {
int32_t size = contLen + TRANS_MSG_OVERHEAD;
void* rpcMallocCont(int64_t contLen) {
int64_t size = contLen + TRANS_MSG_OVERHEAD;
char* start = taosMemoryCalloc(1, size);
if (start == NULL) {
tError("failed to malloc msg, size:%d", size);
......@@ -120,11 +120,11 @@ void rpcFreeCont(void* cont) {
tTrace("rpc free cont:%p", (char*)cont - TRANS_MSG_OVERHEAD);
}
void* rpcReallocCont(void* ptr, int32_t contLen) {
void* rpcReallocCont(void* ptr, int64_t contLen) {
if (ptr == NULL) return rpcMallocCont(contLen);
char* st = (char*)ptr - TRANS_MSG_OVERHEAD;
int32_t sz = contLen + TRANS_MSG_OVERHEAD;
int64_t sz = contLen + TRANS_MSG_OVERHEAD;
st = taosMemoryRealloc(st, sz);
if (st == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -228,7 +228,7 @@ void taosPrintBackTrace() {
void taosPrintBackTrace() { return; }
#endif
void *taosMemoryMalloc(int32_t size) {
void *taosMemoryMalloc(int64_t size) {
#ifdef USE_TD_MEMORY
void *tmp = malloc(size + sizeof(TdMemoryInfo));
if (tmp == NULL) return NULL;
......@@ -244,7 +244,7 @@ void *taosMemoryMalloc(int32_t size) {
#endif
}
void *taosMemoryCalloc(int32_t num, int32_t size) {
void *taosMemoryCalloc(int64_t num, int64_t size) {
#ifdef USE_TD_MEMORY
int32_t memorySize = num * size;
char *tmp = calloc(memorySize + sizeof(TdMemoryInfo), 1);
......@@ -261,7 +261,7 @@ void *taosMemoryCalloc(int32_t num, int32_t size) {
#endif
}
void *taosMemoryRealloc(void *ptr, int32_t size) {
void *taosMemoryRealloc(void *ptr, int64_t size) {
#ifdef USE_TD_MEMORY
if (ptr == NULL) return taosMemoryMalloc(size);
......@@ -318,7 +318,7 @@ void taosMemoryFree(void *ptr) {
#endif
}
int32_t taosMemorySize(void *ptr) {
int64_t taosMemorySize(void *ptr) {
if (ptr == NULL) return 0;
#ifdef USE_TD_MEMORY
......
......@@ -352,6 +352,7 @@
./test.sh -f tsim/scalar/in.sim
./test.sh -f tsim/scalar/scalar.sim
./test.sh -f tsim/scalar/filter.sim
./test.sh -f tsim/scalar/caseWhen.sim
# ---- alter ----
./test.sh -f tsim/alter/cached_schema_after_alter.sim
......
此差异已折叠。
......@@ -12,25 +12,7 @@ FORMAT_DIR_LIST=(
"${PRJ_ROOT_DIR}/source/os"
"${PRJ_ROOT_DIR}/source/util"
"${PRJ_ROOT_DIR}/source/common"
"${PRJ_ROOT_DIR}/source/libs/cache"
"${PRJ_ROOT_DIR}/source/libs/catalog"
"${PRJ_ROOT_DIR}/source/libs/command"
"${PRJ_ROOT_DIR}/source/libs/executor"
"${PRJ_ROOT_DIR}/source/libs/function"
"${PRJ_ROOT_DIR}/source/libs/index"
"${PRJ_ROOT_DIR}/source/libs/monitor"
"${PRJ_ROOT_DIR}/source/libs/nodes"
# "${PRJ_ROOT_DIR}/source/libs/parser"
"${PRJ_ROOT_DIR}/source/libs/planner"
"${PRJ_ROOT_DIR}/source/libs/qcom"
"${PRJ_ROOT_DIR}/source/libs/qworker"
"${PRJ_ROOT_DIR}/source/libs/scalar"
"${PRJ_ROOT_DIR}/source/libs/stream"
"${PRJ_ROOT_DIR}/source/libs/sync"
"${PRJ_ROOT_DIR}/source/libs/tdb"
"${PRJ_ROOT_DIR}/source/libs/tfs"
"${PRJ_ROOT_DIR}/source/libs/transport"
"${PRJ_ROOT_DIR}/source/libs/wal"
"${PRJ_ROOT_DIR}/source/libs"
"${PRJ_ROOT_DIR}/source/client/inc"
"${PRJ_ROOT_DIR}/source/client/src"
"${PRJ_ROOT_DIR}/source/client/test"
......@@ -45,7 +27,7 @@ EXCLUDE_FILE_LIST=(
)
for d in ${FORMAT_DIR_LIST[@]}; do
for f in $(find $d -type f -regex '.*\.\(cpp\|hpp\|c\|h\)'); do
for f in $(find $d -type f -not -name '*sql.c' -regex '.*\.\(cpp\|hpp\|c\|h\)'); do
${FORMAT_BIN} -i $f
done
done
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册