“19b1d449de3a602ff1b9e7548cff3a955937fcc4”上不存在“source/dnode/mnode/src/mnodeOper.c”
提交 d16c70d0 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

Merge branch 'develop' of https://github.com/taosdata/TDengine into develop

...@@ -4,6 +4,9 @@ ...@@ -4,6 +4,9 @@
[submodule "src/connector/grafanaplugin"] [submodule "src/connector/grafanaplugin"]
path = src/connector/grafanaplugin path = src/connector/grafanaplugin
url = https://github.com/taosdata/grafanaplugin url = https://github.com/taosdata/grafanaplugin
[submodule "tests/examples/rust"]
path = tests/examples/rust
url = https://github.com/songtianyi/tdengine-rust-bindings.git
[submodule "src/connector/hivemq-tdengine-extension"] [submodule "src/connector/hivemq-tdengine-extension"]
path = src/connector/hivemq-tdengine-extension path = src/connector/hivemq-tdengine-extension
url = https://github.com/huskar-t/hivemq-tdengine-extension.git url = https://github.com/huskar-t/hivemq-tdengine-extension.git
\ No newline at end of file
...@@ -4,7 +4,7 @@ PROJECT(TDengine) ...@@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER) IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER}) SET(TD_VER_NUMBER ${VERNUMBER})
ELSE () ELSE ()
SET(TD_VER_NUMBER "2.0.6.0") SET(TD_VER_NUMBER "2.0.7.0")
ENDIF () ENDIF ()
IF (DEFINED VERCOMPATIBLE) IF (DEFINED VERCOMPATIBLE)
......
...@@ -48,7 +48,7 @@ cp ${compile_dir}/../packaging/deb/taosd ${pkg_dir}${install_home_pat ...@@ -48,7 +48,7 @@ cp ${compile_dir}/../packaging/deb/taosd ${pkg_dir}${install_home_pat
cp ${compile_dir}/../packaging/tools/post.sh ${pkg_dir}${install_home_path}/script cp ${compile_dir}/../packaging/tools/post.sh ${pkg_dir}${install_home_path}/script
cp ${compile_dir}/../packaging/tools/preun.sh ${pkg_dir}${install_home_path}/script cp ${compile_dir}/../packaging/tools/preun.sh ${pkg_dir}${install_home_path}/script
cp ${compile_dir}/build/bin/taosdemo ${pkg_dir}${install_home_path}/bin cp ${compile_dir}/build/bin/taosdemo ${pkg_dir}${install_home_path}/bin
#cp ${compile_dir}/build/bin/taosdump ${pkg_dir}${install_home_path}/bin cp ${compile_dir}/build/bin/taosdump ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/taosd ${pkg_dir}${install_home_path}/bin cp ${compile_dir}/build/bin/taosd ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/taos ${pkg_dir}${install_home_path}/bin cp ${compile_dir}/build/bin/taos ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_path}/driver cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_path}/driver
......
...@@ -58,7 +58,7 @@ cp %{_compiledir}/../packaging/tools/preun.sh %{buildroot}%{homepath}/scri ...@@ -58,7 +58,7 @@ cp %{_compiledir}/../packaging/tools/preun.sh %{buildroot}%{homepath}/scri
cp %{_compiledir}/build/bin/taos %{buildroot}%{homepath}/bin cp %{_compiledir}/build/bin/taos %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosd %{buildroot}%{homepath}/bin cp %{_compiledir}/build/bin/taosd %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosdemo %{buildroot}%{homepath}/bin cp %{_compiledir}/build/bin/taosdemo %{buildroot}%{homepath}/bin
#cp %{_compiledir}/build/bin/taosdump %{buildroot}%{homepath}/bin cp %{_compiledir}/build/bin/taosdump %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driver cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driver
cp %{_compiledir}/../src/inc/taos.h %{buildroot}%{homepath}/include cp %{_compiledir}/../src/inc/taos.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../src/inc/taoserror.h %{buildroot}%{homepath}/include cp %{_compiledir}/../src/inc/taoserror.h %{buildroot}%{homepath}/include
......
#!/bin/bash
#
log_dir=$1
result_file=$2
if [ ! -n "$1" ];then
echo "Pleas input the director of taosdlog."
echo "usage: ./get_client.sh <taosdlog directory> <result file>"
exit 1
else
log_dir=$1
fi
if [ ! -n "$2" ];then
result_file=clientInfo.txt
else
result_file=$2
fi
grep "new TCP connection" ${log_dir}/taosdlog.* | sed -e "s/0x.* from / /"|sed -e "s/,.*$//"|sed -e "s/:[0-9]*$//"|sort -r|uniq -f 2|sort -k 3 -r|uniq -f 2 > ${result_file}
...@@ -45,8 +45,7 @@ if [ "$osType" != "Darwin" ]; then ...@@ -45,8 +45,7 @@ if [ "$osType" != "Darwin" ]; then
strip ${build_dir}/bin/taos strip ${build_dir}/bin/taos
bin_files="${build_dir}/bin/taos ${script_dir}/remove_client.sh" bin_files="${build_dir}/bin/taos ${script_dir}/remove_client.sh"
else else
#bin_files="${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${script_dir}/remove_client.sh ${script_dir}/set_core.sh" bin_files="${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${script_dir}/remove_client.sh ${script_dir}/set_core.sh ${script_dir}/get_client.sh"
bin_files="${build_dir}/bin/taos ${build_dir}/bin/taosdemo ${script_dir}/remove_client.sh ${script_dir}/set_core.sh"
fi fi
lib_files="${build_dir}/lib/libtaos.so.${version}" lib_files="${build_dir}/lib/libtaos.so.${version}"
else else
......
...@@ -77,7 +77,9 @@ if [ "$osType" != "Darwin" ]; then ...@@ -77,7 +77,9 @@ if [ "$osType" != "Darwin" ]; then
cp ${build_dir}/bin/taos ${install_dir}/bin/power cp ${build_dir}/bin/taos ${install_dir}/bin/power
cp ${script_dir}/remove_power.sh ${install_dir}/bin cp ${script_dir}/remove_power.sh ${install_dir}/bin
cp ${build_dir}/bin/taosdemo ${install_dir}/bin/powerdemo cp ${build_dir}/bin/taosdemo ${install_dir}/bin/powerdemo
cp ${build_dir}/bin/taosdump ${install_dir}/bin/powerdump
cp ${script_dir}/set_core.sh ${install_dir}/bin cp ${script_dir}/set_core.sh ${install_dir}/bin
cp ${script_dir}/get_client.sh ${install_dir}/bin
fi fi
else else
cp ${bin_files} ${install_dir}/bin cp ${bin_files} ${install_dir}/bin
......
...@@ -36,8 +36,7 @@ if [ "$pagMode" == "lite" ]; then ...@@ -36,8 +36,7 @@ if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/taos strip ${build_dir}/bin/taos
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${script_dir}/remove.sh" bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${script_dir}/remove.sh"
else else
#bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${build_dir}/bin/tarbitrator ${script_dir}/remove.sh ${script_dir}/set_core.sh" bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${build_dir}/bin/tarbitrator ${script_dir}/remove.sh ${script_dir}/set_core.sh ${script_dir}/get_client.sh"
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${build_dir}/bin/taosdemo ${build_dir}/bin/tarbitrator ${script_dir}/remove.sh ${script_dir}/set_core.sh"
fi fi
lib_files="${build_dir}/lib/libtaos.so.${version}" lib_files="${build_dir}/lib/libtaos.so.${version}"
......
...@@ -77,8 +77,10 @@ else ...@@ -77,8 +77,10 @@ else
cp ${build_dir}/bin/taosd ${install_dir}/bin/powerd cp ${build_dir}/bin/taosd ${install_dir}/bin/powerd
cp ${script_dir}/remove_power.sh ${install_dir}/bin cp ${script_dir}/remove_power.sh ${install_dir}/bin
cp ${build_dir}/bin/taosdemo ${install_dir}/bin/powerdemo cp ${build_dir}/bin/taosdemo ${install_dir}/bin/powerdemo
cp ${build_dir}/bin/taosdump ${install_dir}/bin/powerdump
cp ${build_dir}/bin/tarbitrator ${install_dir}/bin cp ${build_dir}/bin/tarbitrator ${install_dir}/bin
cp ${script_dir}/set_core.sh ${install_dir}/bin cp ${script_dir}/set_core.sh ${install_dir}/bin
cp ${script_dir}/get_client.sh ${install_dir}/bin
fi fi
chmod a+x ${install_dir}/bin/* || : chmod a+x ${install_dir}/bin/* || :
......
name: tdengine name: tdengine
base: core18 base: core18
version: '2.0.6.0' version: '2.0.7.0'
icon: snap/gui/t-dengine.svg icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT. summary: an open-source big data platform designed and optimized for IoT.
description: | description: |
......
...@@ -62,6 +62,7 @@ typedef struct SLocalReducer { ...@@ -62,6 +62,7 @@ typedef struct SLocalReducer {
bool hasUnprocessedRow; bool hasUnprocessedRow;
tOrderDescriptor * pDesc; tOrderDescriptor * pDesc;
SColumnModel * resColModel; SColumnModel * resColModel;
SColumnModel* finalModel;
tExtMemBuffer ** pExtMemBuffer; // disk-based buffer tExtMemBuffer ** pExtMemBuffer; // disk-based buffer
SFillInfo* pFillInfo; // interpolation support structure SFillInfo* pFillInfo; // interpolation support structure
char* pFinalRes; // result data after interpo char* pFinalRes; // result data after interpo
...@@ -74,7 +75,8 @@ typedef struct SLocalReducer { ...@@ -74,7 +75,8 @@ typedef struct SLocalReducer {
typedef struct SRetrieveSupport { typedef struct SRetrieveSupport {
tExtMemBuffer ** pExtMemBuffer; // for build loser tree tExtMemBuffer ** pExtMemBuffer; // for build loser tree
tOrderDescriptor *pOrderDescriptor; tOrderDescriptor *pOrderDescriptor;
SColumnModel * pFinalColModel; // colModel for final result SColumnModel* pFinalColModel; // colModel for final result
SColumnModel* pFFColModel;
int32_t subqueryIndex; // index of current vnode in vnode list int32_t subqueryIndex; // index of current vnode in vnode list
SSqlObj * pParentSql; SSqlObj * pParentSql;
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
...@@ -96,7 +98,7 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF ...@@ -96,7 +98,7 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF
* create local reducer to launch the second-stage reduce process at client site * create local reducer to launch the second-stage reduce process at client site
*/ */
void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SColumnModel *finalModel, SSqlObj* pSql); SColumnModel *finalModel, SColumnModel *pFFModel, SSqlObj* pSql);
void tscDestroyLocalReducer(SSqlObj *pSql); void tscDestroyLocalReducer(SSqlObj *pSql);
......
...@@ -41,6 +41,8 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql); ...@@ -41,6 +41,8 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql);
void tscBuildResFromSubqueries(SSqlObj *pSql); void tscBuildResFromSubqueries(SSqlObj *pSql);
TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult); TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult);
char *getArithemicInputSrc(void *param, const char *name, int32_t colId);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -125,6 +125,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t ...@@ -125,6 +125,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t
*/ */
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo); bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo); bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex); bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
...@@ -158,7 +159,7 @@ SInternalField* tscFieldInfoGetInternalField(SFieldInfo* pFieldInfo, int32_t ind ...@@ -158,7 +159,7 @@ SInternalField* tscFieldInfoGetInternalField(SFieldInfo* pFieldInfo, int32_t ind
TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index); TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index);
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo); void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo); void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index); int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
void tscFieldInfoClear(SFieldInfo* pFieldInfo); void tscFieldInfoClear(SFieldInfo* pFieldInfo);
...@@ -167,15 +168,15 @@ static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQue ...@@ -167,15 +168,15 @@ static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQue
int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2); int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2);
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex); void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes);
int32_t tscGetResRowLength(SArray* pExprList); int32_t tscGetResRowLength(SArray* pExprList);
SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize, bool isTagCol); int16_t size, int16_t resColId, int16_t interSize, bool isTagCol);
SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize, bool isTagCol); int16_t size, int16_t resColId, int16_t interSize, bool isTagCol);
SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type, SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int16_t size); int16_t size);
......
...@@ -136,6 +136,7 @@ typedef struct SSqlExpr { ...@@ -136,6 +136,7 @@ typedef struct SSqlExpr {
int16_t numOfParams; // argument value of each function int16_t numOfParams; // argument value of each function
tVariant param[3]; // parameters are not more than 3 tVariant param[3]; // parameters are not more than 3
int32_t offset; // sub result column value of arithmetic expression. int32_t offset; // sub result column value of arithmetic expression.
int16_t resColId; // result column id
} SSqlExpr; } SSqlExpr;
typedef struct SColumnIndex { typedef struct SColumnIndex {
...@@ -251,6 +252,7 @@ typedef struct SQueryInfo { ...@@ -251,6 +252,7 @@ typedef struct SQueryInfo {
int64_t clauseLimit; // limit for current sub clause int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side int64_t prjOffset; // offset value in the original sql expression, only applied at client side
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int16_t resColumnId; // result column id
} SQueryInfo; } SQueryInfo;
typedef struct { typedef struct {
...@@ -462,17 +464,16 @@ int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* s ...@@ -462,17 +464,16 @@ int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* s
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) { static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex, int32_t offset) {
SInternalField* pInfo = (SInternalField*) TARRAY_GET_ELEM(pFieldInfo->internalField, columnIndex); SInternalField* pInfo = (SInternalField*) TARRAY_GET_ELEM(pFieldInfo->internalField, columnIndex);
assert(pInfo->pSqlExpr != NULL);
int32_t type = pInfo->pSqlExpr->resType; int32_t type = pInfo->field.type;
int32_t bytes = pInfo->pSqlExpr->resBytes; int32_t bytes = pInfo->field.bytes;
char* pData = pRes->data + (int32_t)(pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row); char* pData = pRes->data + (int32_t)(offset * pRes->numOfRows + bytes * pRes->row);
// user defined constant value output columns // user defined constant value output columns
if (TSDB_COL_IS_UD_COL(pInfo->pSqlExpr->colInfo.flag)) { if (pInfo->pSqlExpr != NULL && TSDB_COL_IS_UD_COL(pInfo->pSqlExpr->colInfo.flag)) {
if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) { if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
pData = pInfo->pSqlExpr->param[1].pz; pData = pInfo->pSqlExpr->param[1].pz;
pRes->length[columnIndex] = pInfo->pSqlExpr->param[1].nLen; pRes->length[columnIndex] = pInfo->pSqlExpr->param[1].nLen;
...@@ -517,6 +518,7 @@ extern SRpcCorEpSet tscMgmtEpSet; ...@@ -517,6 +518,7 @@ extern SRpcCorEpSet tscMgmtEpSet;
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables); void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
int16_t getNewResColId(SQueryInfo* pQueryInfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -351,7 +351,7 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { ...@@ -351,7 +351,7 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
SInternalField* pSup = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i); SInternalField* pSup = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
if (pSup->pSqlExpr != NULL) { if (pSup->pSqlExpr != NULL) {
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i); tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i, 0);
} else { } else {
// todo add // todo add
} }
......
...@@ -2695,17 +2695,18 @@ static void apercentile_func_second_merge(SQLFunctionCtx *pCtx) { ...@@ -2695,17 +2695,18 @@ static void apercentile_func_second_merge(SQLFunctionCtx *pCtx) {
} }
SAPercentileInfo *pOutput = getAPerctInfo(pCtx); SAPercentileInfo *pOutput = getAPerctInfo(pCtx);
SHistogramInfo * pHisto = pOutput->pHisto; SHistogramInfo *pHisto = pOutput->pHisto;
if (pHisto->numOfElems <= 0) { if (pHisto->numOfElems <= 0) {
memcpy(pHisto, pInput->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1)); memcpy(pHisto, pInput->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo)); pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
} else { } else {
//TODO(dengyihao): avoid memcpy
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo)); pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
SHistogramInfo *pRes = tHistogramMerge(pHisto, pInput->pHisto, MAX_HISTOGRAM_BIN); SHistogramInfo *pRes = tHistogramMerge(pHisto, pInput->pHisto, MAX_HISTOGRAM_BIN);
tHistogramDestroy(&pOutput->pHisto); memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN);
pOutput->pHisto = pRes; pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
tHistogramDestroy(&pRes);
} }
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
......
...@@ -162,7 +162,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, ...@@ -162,7 +162,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
(TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE, (TSDB_COL_NAME_LEN - 1), false); (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE, -1000, (TSDB_COL_NAME_LEN - 1), false);
rowLen += ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE); rowLen += ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE);
...@@ -172,7 +172,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, ...@@ -172,7 +172,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(typeColLength + VARSTR_HEADER_SIZE), pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(typeColLength + VARSTR_HEADER_SIZE),
typeColLength, false); -1000, typeColLength, false);
rowLen += typeColLength + VARSTR_HEADER_SIZE; rowLen += typeColLength + VARSTR_HEADER_SIZE;
...@@ -182,7 +182,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, ...@@ -182,7 +182,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t), pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t),
sizeof(int32_t), false); -1000, sizeof(int32_t), false);
rowLen += sizeof(int32_t); rowLen += sizeof(int32_t);
...@@ -192,7 +192,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, ...@@ -192,7 +192,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(noteColLength + VARSTR_HEADER_SIZE), pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(noteColLength + VARSTR_HEADER_SIZE),
noteColLength, false); -1000, noteColLength, false);
rowLen += noteColLength + VARSTR_HEADER_SIZE; rowLen += noteColLength + VARSTR_HEADER_SIZE;
return rowLen; return rowLen;
...@@ -407,8 +407,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const ...@@ -407,8 +407,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
} }
SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, f.bytes, -1000, f.bytes - VARSTR_HEADER_SIZE, false);
f.bytes, f.bytes - VARSTR_HEADER_SIZE, false);
rowLen += f.bytes; rowLen += f.bytes;
...@@ -422,7 +421,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const ...@@ -422,7 +421,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
(int16_t)(ddlLen + VARSTR_HEADER_SIZE), ddlLen, false); (int16_t)(ddlLen + VARSTR_HEADER_SIZE), -1000, ddlLen, false);
rowLen += ddlLen + VARSTR_HEADER_SIZE; rowLen += ddlLen + VARSTR_HEADER_SIZE;
...@@ -619,7 +618,11 @@ static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName, ...@@ -619,7 +618,11 @@ static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName,
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
uint8_t type = pSchema[i].type; uint8_t type = pSchema[i].type;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s(%d),", pSchema[i].name,tDataTypeDesc[pSchema[i].type].aName,pSchema->bytes); int32_t bytes = pSchema[i].bytes - VARSTR_HEADER_SIZE;
if (type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes/TSDB_NCHAR_SIZE;
}
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s(%d),", pSchema[i].name, tDataTypeDesc[pSchema[i].type].aName, bytes);
} else { } else {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s,", pSchema[i].name, tDataTypeDesc[pSchema[i].type].aName); snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s,", pSchema[i].name, tDataTypeDesc[pSchema[i].type].aName);
} }
...@@ -642,7 +645,11 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName, ...@@ -642,7 +645,11 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName,
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
uint8_t type = pSchema[i].type; uint8_t type = pSchema[i].type;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result),"%s %s(%d),", pSchema[i].name,tDataTypeDesc[pSchema[i].type].aName,pSchema->bytes); int32_t bytes = pSchema[i].bytes - VARSTR_HEADER_SIZE;
if (type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes/TSDB_NCHAR_SIZE;
}
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result),"%s %s(%d),", pSchema[i].name,tDataTypeDesc[pSchema[i].type].aName, bytes);
} else { } else {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s,", pSchema[i].name, tDataTypeDesc[type].aName); snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s,", pSchema[i].name, tDataTypeDesc[type].aName);
} }
...@@ -652,7 +659,11 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName, ...@@ -652,7 +659,11 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName,
for (int32_t i = numOfRows; i < totalRows; i++) { for (int32_t i = numOfRows; i < totalRows; i++) {
uint8_t type = pSchema[i].type; uint8_t type = pSchema[i].type;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s(%d),", pSchema[i].name,tDataTypeDesc[pSchema[i].type].aName,pSchema->bytes); int32_t bytes = pSchema[i].bytes - VARSTR_HEADER_SIZE;
if (type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes/TSDB_NCHAR_SIZE;
}
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s(%d),", pSchema[i].name,tDataTypeDesc[pSchema[i].type].aName, bytes);
} else { } else {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s,", pSchema[i].name, tDataTypeDesc[type].aName); snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s,", pSchema[i].name, tDataTypeDesc[type].aName);
} }
......
...@@ -13,14 +13,15 @@ ...@@ -13,14 +13,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tscLocalMerge.h"
#include "tscSubquery.h"
#include "os.h" #include "os.h"
#include "qAst.h"
#include "tlosertree.h" #include "tlosertree.h"
#include "tscLog.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "tsclient.h" #include "tsclient.h"
#include "tutil.h"
#include "tscLog.h"
#include "tscLocalMerge.h"
typedef struct SCompareParam { typedef struct SCompareParam {
SLocalDataSource **pLocalData; SLocalDataSource **pLocalData;
...@@ -29,6 +30,8 @@ typedef struct SCompareParam { ...@@ -29,6 +30,8 @@ typedef struct SCompareParam {
int32_t groupOrderType; int32_t groupOrderType;
} SCompareParam; } SCompareParam;
static void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize);
int32_t treeComparator(const void *pLeft, const void *pRight, void *param) { int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
int32_t pLeftIdx = *(int32_t *)pLeft; int32_t pLeftIdx = *(int32_t *)pLeft;
int32_t pRightIdx = *(int32_t *)pRight; int32_t pRightIdx = *(int32_t *)pRight;
...@@ -132,12 +135,15 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc ...@@ -132,12 +135,15 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
} }
static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) { static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) {
int32_t numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); int32_t numOfCols = (int32_t)tscNumOfFields(pQueryInfo);
int32_t offset = 0; int32_t offset = 0;
SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo)); SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo));
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); SInternalField* pIField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
if (pIField->pArithExprInfo == NULL) {
SSqlExpr* pExpr = pIField->pSqlExpr;
pFillCol[i].col.bytes = pExpr->resBytes; pFillCol[i].col.bytes = pExpr->resBytes;
pFillCol[i].col.type = (int8_t)pExpr->resType; pFillCol[i].col.type = (int8_t)pExpr->resType;
...@@ -146,14 +152,24 @@ static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) { ...@@ -146,14 +152,24 @@ static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) {
pFillCol[i].col.offset = offset; pFillCol[i].col.offset = offset;
pFillCol[i].functionId = pExpr->functionId; pFillCol[i].functionId = pExpr->functionId;
pFillCol[i].fillVal.i = pQueryInfo->fillVal[i]; pFillCol[i].fillVal.i = pQueryInfo->fillVal[i];
offset += pExpr->resBytes; } else {
pFillCol[i].col.bytes = pIField->field.bytes;
pFillCol[i].col.type = (int8_t)pIField->field.type;
pFillCol[i].col.colId = -100;
pFillCol[i].flag = TSDB_COL_NORMAL;
pFillCol[i].col.offset = offset;
pFillCol[i].functionId = -1;
pFillCol[i].fillVal.i = pQueryInfo->fillVal[i];
}
offset += pFillCol[i].col.bytes;
} }
return pFillCol; return pFillCol;
} }
void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SColumnModel *finalmodel, SSqlObj* pSql) { SColumnModel *finalmodel, SColumnModel *pFFModel, SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
...@@ -342,8 +358,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -342,8 +358,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
return; return;
} }
size_t numOfCols = tscSqlExprNumOfExprs(pQueryInfo);
pReducer->pTempBuffer->num = 0; pReducer->pTempBuffer->num = 0;
tscCreateResPointerInfo(pRes, pQueryInfo); tscCreateResPointerInfo(pRes, pQueryInfo);
...@@ -372,7 +386,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -372,7 +386,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (pQueryInfo->fillType != TSDB_FILL_NONE) { if (pQueryInfo->fillType != TSDB_FILL_NONE) {
SFillColInfo* pFillCol = createFillColInfo(pQueryInfo); SFillColInfo* pFillCol = createFillColInfo(pQueryInfo);
pReducer->pFillInfo = taosInitFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, pReducer->pFillInfo = taosInitFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
4096, (int32_t)numOfCols, pQueryInfo->interval.sliding, pQueryInfo->interval.slidingUnit, 4096, (int32_t)pQueryInfo->fieldsInfo.numOfOutput, pQueryInfo->interval.sliding, pQueryInfo->interval.slidingUnit,
tinfo.precision, pQueryInfo->fillType, pFillCol, pSql); tinfo.precision, pQueryInfo->fillType, pFillCol, pSql);
} }
} }
...@@ -491,7 +505,8 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { ...@@ -491,7 +505,8 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
pLocalReducer->pFillInfo = taosDestroyFillInfo(pLocalReducer->pFillInfo); pLocalReducer->pFillInfo = taosDestroyFillInfo(pLocalReducer->pFillInfo);
if (pLocalReducer->pCtx != NULL) { if (pLocalReducer->pCtx != NULL) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) {
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i]; SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i];
tVariantDestroy(&pCtx->tag); tVariantDestroy(&pCtx->tag);
...@@ -555,7 +570,8 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm ...@@ -555,7 +570,8 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
if (numOfGroupByCols > 0) { if (numOfGroupByCols > 0) {
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) { if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
int32_t startCols = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols; int32_t numOfInternalOutput = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
int32_t startCols = numOfInternalOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
// the last "pQueryInfo->groupbyExpr.numOfGroupCols" columns are order-by columns // the last "pQueryInfo->groupbyExpr.numOfGroupCols" columns are order-by columns
for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) { for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
...@@ -674,6 +690,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr ...@@ -674,6 +690,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
pSchema[i].bytes = pExpr->resBytes; pSchema[i].bytes = pExpr->resBytes;
pSchema[i].type = (int8_t)pExpr->resType; pSchema[i].type = (int8_t)pExpr->resType;
tstrncpy(pSchema[i].name, pExpr->aliasName, tListLen(pSchema[i].name));
rlen += pExpr->resBytes; rlen += pExpr->resBytes;
} }
...@@ -736,8 +754,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr ...@@ -736,8 +754,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
} }
*pFinalModel = createColumnModel(pSchema, (int32_t)size, capacity); *pFinalModel = createColumnModel(pSchema, (int32_t)size, capacity);
tfree(pSchema);
tfree(pSchema);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -966,10 +984,11 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO ...@@ -966,10 +984,11 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pFillInfo); savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pFillInfo);
} }
int32_t offset = 0;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
int16_t offset = getColumnModelOffset(pLocalReducer->resColModel, i);
memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i]->data, (size_t)(pField->bytes * pRes->numOfRows)); memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i]->data, (size_t)(pField->bytes * pRes->numOfRows));
offset += pField->bytes;
} }
pRes->numOfRowsGroup += pRes->numOfRows; pRes->numOfRowsGroup += pRes->numOfRows;
...@@ -1222,6 +1241,10 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur ...@@ -1222,6 +1241,10 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur
tColModelCompact(pModel, pResBuf, pModel->capacity); tColModelCompact(pModel, pResBuf, pModel->capacity);
if (tscIsSecondStageQuery(pQueryInfo)) {
doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalRowSize);
}
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
printf("final result before interpo:\n"); printf("final result before interpo:\n");
// tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->num, pResBuf->num); // tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->num, pResBuf->num);
...@@ -1588,3 +1611,44 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) ...@@ -1588,3 +1611,44 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen)
pRes->pLocalReducer->pResultBuf->num = numOfRes; pRes->pLocalReducer->pResultBuf->num = numOfRes;
pRes->data = pRes->pLocalReducer->pResultBuf->data; pRes->data = pRes->pLocalReducer->pResultBuf->data;
} }
void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) {
char* pbuf = calloc(1, pOutput->num * rowSize);
size_t size = tscNumOfFields(pQueryInfo);
SArithmeticSupport arithSup = {0};
// todo refactor
arithSup.offset = 0;
arithSup.numOfCols = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
arithSup.exprList = pQueryInfo->exprList;
arithSup.data = calloc(arithSup.numOfCols, POINTER_BYTES);
for(int32_t k = 0; k < arithSup.numOfCols; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
arithSup.data[k] = (pOutput->data + pOutput->num* pExpr->offset);
}
int32_t offset = 0;
for (int i = 0; i < size; ++i) {
SInternalField* pSup = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
// calculate the result from several other columns
if (pSup->pArithExprInfo != NULL) {
arithSup.pArithExpr = pSup->pArithExprInfo;
tExprTreeCalcTraverse(arithSup.pArithExpr->pExpr, (int32_t) pOutput->num, pbuf + pOutput->num*offset, &arithSup, TSDB_ORDER_ASC, getArithemicInputSrc);
} else {
SSqlExpr* pExpr = pSup->pSqlExpr;
memcpy(pbuf + pOutput->num * offset, pExpr->offset * pOutput->num + pOutput->data, pExpr->resBytes * pOutput->num);
}
offset += pSup->field.bytes;
}
assert(finalRowSize <= rowSize);
memcpy(pOutput->data, pbuf, pOutput->num * finalRowSize);
tfree(pbuf);
tfree(arithSup.data);
}
\ No newline at end of file
...@@ -1148,6 +1148,10 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1148,6 +1148,10 @@ int tsParseInsertSql(SSqlObj *pSql) {
index = 0; index = 0;
sToken = tStrGetToken(str, &index, false, 0, NULL); sToken = tStrGetToken(str, &index, false, 0, NULL);
if (sToken.type != TK_STRING && sToken.type != TK_ID) {
code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
goto _error;
}
str += index; str += index;
if (sToken.n == 0) { if (sToken.n == 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
......
...@@ -52,7 +52,8 @@ typedef struct SConvertFunc { ...@@ -52,7 +52,8 @@ typedef struct SConvertFunc {
int32_t originFuncId; int32_t originFuncId;
int32_t execFuncId; int32_t execFuncId;
} SConvertFunc; } SConvertFunc;
static SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t colIndex, int32_t tableIndex);
static SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t colIndex, int32_t tableIndex);
static int32_t setShowInfo(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t setShowInfo(SSqlObj* pSql, SSqlInfo* pInfo);
static char* getAccountId(SSqlObj* pSql); static char* getAccountId(SSqlObj* pSql);
...@@ -127,6 +128,10 @@ static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo); ...@@ -127,6 +128,10 @@ static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo);
static int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index); static int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index);
static int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pSqlExpr, SQueryInfo* pQueryInfo, SArray* pCols, int64_t *uid); static int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pSqlExpr, SQueryInfo* pQueryInfo, SArray* pCols, int64_t *uid);
int16_t getNewResColId(SQueryInfo* pQueryInfo) {
return pQueryInfo->resColumnId--;
}
static uint8_t convertOptr(SStrToken *pToken) { static uint8_t convertOptr(SStrToken *pToken) {
switch (pToken->type) { switch (pToken->type) {
case TK_LT: case TK_LT:
...@@ -1274,6 +1279,7 @@ static void tscInsertPrimaryTSSourceColumn(SQueryInfo* pQueryInfo, SColumnIndex* ...@@ -1274,6 +1279,7 @@ static void tscInsertPrimaryTSSourceColumn(SQueryInfo* pQueryInfo, SColumnIndex*
SColumnIndex tsCol = {.tableIndex = pIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex tsCol = {.tableIndex = pIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscColumnListInsert(pQueryInfo->colList, &tsCol); tscColumnListInsert(pQueryInfo->colList, &tsCol);
} }
static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t exprIndex, tSQLExprItem* pItem) { static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t exprIndex, tSQLExprItem* pItem) {
const char* msg1 = "invalid column name, illegal column type, or columns in arithmetic expression from two tables"; const char* msg1 = "invalid column name, illegal column type, or columns in arithmetic expression from two tables";
const char* msg2 = "invalid arithmetic expression in select clause"; const char* msg2 = "invalid arithmetic expression in select clause";
...@@ -1305,7 +1311,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t ...@@ -1305,7 +1311,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
SColumnIndex index = {.tableIndex = tableIndex}; SColumnIndex index = {.tableIndex = tableIndex};
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_ARITHM, &index, TSDB_DATA_TYPE_DOUBLE, sizeof(double), SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_ARITHM, &index, TSDB_DATA_TYPE_DOUBLE, sizeof(double),
sizeof(double), false); -1000, sizeof(double), false);
char* name = (pItem->aliasName != NULL)? pItem->aliasName:pItem->pNode->token.z; char* name = (pItem->aliasName != NULL)? pItem->aliasName:pItem->pNode->token.z;
size_t len = MIN(sizeof(pExpr->aliasName), pItem->pNode->token.n + 1); size_t len = MIN(sizeof(pExpr->aliasName), pItem->pNode->token.n + 1);
...@@ -1321,6 +1327,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t ...@@ -1321,6 +1327,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
// check for if there is a tag in the arithmetic express
size_t numOfNode = taosArrayGetSize(colList); size_t numOfNode = taosArrayGetSize(colList);
for(int32_t k = 0; k < numOfNode; ++k) { for(int32_t k = 0; k < numOfNode; ++k) {
SColIndex* pIndex = taosArrayGet(colList, k); SColIndex* pIndex = taosArrayGet(colList, k);
...@@ -1346,9 +1353,9 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t ...@@ -1346,9 +1353,9 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
char* c = tbufGetData(&bw, false); char* c = tbufGetData(&bw, false);
// set the serialized binary string as the parameter of arithmetic expression // set the serialized binary string as the parameter of arithmetic expression
addExprParams(pExpr, c, TSDB_DATA_TYPE_BINARY, (int32_t)len, index.tableIndex); addExprParams(pExpr, c, TSDB_DATA_TYPE_BINARY, (int32_t)len);
insertResultField(pQueryInfo, exprIndex, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->aliasName, pExpr); insertResultField(pQueryInfo, exprIndex, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->aliasName, pExpr);
// add ts column // add ts column
tscInsertPrimaryTSSourceColumn(pQueryInfo, &index); tscInsertPrimaryTSSourceColumn(pQueryInfo, &index);
...@@ -1380,6 +1387,10 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t ...@@ -1380,6 +1387,10 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
pArithExprInfo->interBytes = sizeof(double); pArithExprInfo->interBytes = sizeof(double);
pArithExprInfo->type = TSDB_DATA_TYPE_DOUBLE; pArithExprInfo->type = TSDB_DATA_TYPE_DOUBLE;
pArithExprInfo->base.functionId = TSDB_FUNC_ARITHM;
pArithExprInfo->base.numOfParams = 1;
pArithExprInfo->base.resColId = getNewResColId(pQueryInfo);
int32_t ret = exprTreeFromSqlExpr(pCmd, &pArithExprInfo->pExpr, pItem->pNode, pQueryInfo, NULL, &pArithExprInfo->uid); int32_t ret = exprTreeFromSqlExpr(pCmd, &pArithExprInfo->pExpr, pItem->pNode, pQueryInfo, NULL, &pArithExprInfo->uid);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
tExprTreeDestroy(&pArithExprInfo->pExpr, NULL); tExprTreeDestroy(&pArithExprInfo->pExpr, NULL);
...@@ -1388,14 +1399,30 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t ...@@ -1388,14 +1399,30 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
pInfo->pArithExprInfo = pArithExprInfo; pInfo->pArithExprInfo = pArithExprInfo;
} }
SBufferWriter bw = tbufInitWriter(NULL, false);
TRY(0) {
exprTreeToBinary(&bw, pInfo->pArithExprInfo->pExpr);
} CATCH(code) {
tbufCloseWriter(&bw);
UNUSED(code);
// TODO: other error handling
} END_TRY
SSqlFuncMsg* pFuncMsg = &pInfo->pArithExprInfo->base;
pFuncMsg->arg[0].argBytes = (int16_t) tbufTell(&bw);
pFuncMsg->arg[0].argValue.pz = tbufGetData(&bw, true);
pFuncMsg->arg[0].argType = TSDB_DATA_TYPE_BINARY;
// tbufCloseWriter(&bw); // TODO there is a memory leak
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumnIndex* pIndex, tSQLExprItem* pItem) { static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumnIndex* pIndex, tSQLExprItem* pItem) {
SSqlExpr* pExpr = doAddProjectCol(pQueryInfo, startPos, pIndex->columnIndex, pIndex->tableIndex); SSqlExpr* pExpr = doAddProjectCol(pQueryInfo, pIndex->columnIndex, pIndex->tableIndex);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pIndex->tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pIndex->tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
...@@ -1540,7 +1567,7 @@ int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnLi ...@@ -1540,7 +1567,7 @@ int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnLi
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t colIndex, int32_t tableIndex) { SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t colIndex, int32_t tableIndex) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
int32_t numOfCols = tscGetNumOfColumns(pTableMeta); int32_t numOfCols = tscGetNumOfColumns(pTableMeta);
...@@ -1552,20 +1579,22 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t c ...@@ -1552,20 +1579,22 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t c
if (functionId == TSDB_FUNC_TAGPRJ) { if (functionId == TSDB_FUNC_TAGPRJ) {
index.columnIndex = colIndex - tscGetNumOfColumns(pTableMeta); index.columnIndex = colIndex - tscGetNumOfColumns(pTableMeta);
tscColumnListInsert(pTableMetaInfo->tagColList, &index); tscColumnListInsert(pTableMetaInfo->tagColList, &index);
} else { } else {
index.columnIndex = colIndex; index.columnIndex = colIndex;
} }
return tscSqlExprAppend(pQueryInfo, functionId, &index, pSchema->type, pSchema->bytes, int16_t colId = getNewResColId(pQueryInfo);
pSchema->bytes, functionId == TSDB_FUNC_TAGPRJ); return tscSqlExprAppend(pQueryInfo, functionId, &index, pSchema->type, pSchema->bytes, colId, pSchema->bytes,
(functionId == TSDB_FUNC_TAGPRJ));
} }
SSqlExpr* tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SSqlExpr* tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
SColumnIndex* pIndex, SSchema* pColSchema, int16_t flag) { SColumnIndex* pIndex, SSchema* pColSchema, int16_t flag) {
int16_t colId = getNewResColId(pQueryInfo);
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, outputColIndex, functionId, pIndex, pColSchema->type, SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, outputColIndex, functionId, pIndex, pColSchema->type,
pColSchema->bytes, pColSchema->bytes, TSDB_COL_IS_TAG(flag)); pColSchema->bytes, colId, pColSchema->bytes, TSDB_COL_IS_TAG(flag));
tstrncpy(pExpr->aliasName, pColSchema->name, sizeof(pExpr->aliasName)); tstrncpy(pExpr->aliasName, pColSchema->name, sizeof(pExpr->aliasName));
SColumnList ids = getColumnList(1, pIndex->tableIndex, pIndex->columnIndex); SColumnList ids = getColumnList(1, pIndex->tableIndex, pIndex->columnIndex);
...@@ -1601,7 +1630,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum ...@@ -1601,7 +1630,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum
} }
for (int32_t j = 0; j < numOfTotalColumns; ++j) { for (int32_t j = 0; j < numOfTotalColumns; ++j) {
SSqlExpr* pExpr = doAddProjectCol(pQueryInfo, startPos + j, j, pIndex->tableIndex); SSqlExpr* pExpr = doAddProjectCol(pQueryInfo, j, pIndex->tableIndex);
tstrncpy(pExpr->aliasName, pSchema[j].name, sizeof(pExpr->aliasName)); tstrncpy(pExpr->aliasName, pSchema[j].name, sizeof(pExpr->aliasName));
pIndex->columnIndex = j; pIndex->columnIndex = j;
...@@ -1710,7 +1739,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS ...@@ -1710,7 +1739,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
bytes = pSchema->bytes; bytes = pSchema->bytes;
} }
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, bytes, false); SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, getNewResColId(pQueryInfo), bytes, false);
tstrncpy(pExpr->aliasName, name, tListLen(pExpr->aliasName)); tstrncpy(pExpr->aliasName, name, tListLen(pExpr->aliasName));
if (cvtFunc.originFuncId == TSDB_FUNC_LAST_ROW && cvtFunc.originFuncId != functionID) { if (cvtFunc.originFuncId == TSDB_FUNC_LAST_ROW && cvtFunc.originFuncId != functionID) {
...@@ -1804,7 +1833,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -1804,7 +1833,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize; int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize;
pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, false); pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pQueryInfo), size, false);
} else if (sqlOptr == TK_INTEGER) { // select count(1) from table1 } else if (sqlOptr == TK_INTEGER) { // select count(1) from table1
char buf[8] = {0}; char buf[8] = {0};
int64_t val = -1; int64_t val = -1;
...@@ -1816,7 +1845,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -1816,7 +1845,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (val == 1) { if (val == 1) {
index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize; int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize;
pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, false); pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pQueryInfo), size, false);
} else { } else {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
} }
...@@ -1836,12 +1865,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -1836,12 +1865,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
} }
int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize; int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize;
pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, isTag); pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pQueryInfo), size, isTag);
} }
} else { // count(*) is equalled to count(primary_timestamp_key) } else { // count(*) is equalled to count(primary_timestamp_key)
index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize; int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize;
pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, false); pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pQueryInfo), size, false);
} }
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
...@@ -1928,7 +1957,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -1928,7 +1957,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
colIndex += 1; colIndex += 1;
SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0}; SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0};
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE,
TSDB_KEYSIZE, false); getNewResColId(pQueryInfo), TSDB_KEYSIZE, false);
SColumnList ids = getColumnList(1, 0, 0); SColumnList ids = getColumnList(1, 0, 0);
insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].aName, pExpr); insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].aName, pExpr);
...@@ -1939,7 +1968,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -1939,7 +1968,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
} }
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, resultType, resultSize, resultSize, false); SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, resultType, resultSize, getNewResColId(pQueryInfo), resultSize, false);
if (optr == TK_LEASTSQUARES) { if (optr == TK_LEASTSQUARES) {
/* set the leastsquares parameters */ /* set the leastsquares parameters */
...@@ -1948,14 +1977,14 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -1948,14 +1977,14 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, DOUBLE_BYTES, 0); addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, DOUBLE_BYTES);
memset(val, 0, tListLen(val)); memset(val, 0, tListLen(val));
if (tVariantDump(&pParamElem[2].pNode->val, val, TSDB_DATA_TYPE_DOUBLE, true) < 0) { if (tVariantDump(&pParamElem[2].pNode->val, val, TSDB_DATA_TYPE_DOUBLE, true) < 0) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double), 0); addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
} }
SColumnList ids = {0}; SColumnList ids = {0};
...@@ -2180,8 +2209,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2180,8 +2209,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tscInsertPrimaryTSSourceColumn(pQueryInfo, &index); tscInsertPrimaryTSSourceColumn(pQueryInfo, &index);
colIndex += 1; // the first column is ts colIndex += 1; // the first column is ts
pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, resultSize, false); pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pQueryInfo), resultSize, false);
addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double), 0); addExprParams(pExpr, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
} else { } else {
tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true); tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true);
...@@ -2198,7 +2227,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2198,7 +2227,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
// todo REFACTOR // todo REFACTOR
// set the first column ts for top/bottom query // set the first column ts for top/bottom query
SColumnIndex index1 = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex index1 = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, getNewResColId(pQueryInfo),
TSDB_KEYSIZE, false); TSDB_KEYSIZE, false);
tstrncpy(pExpr->aliasName, aAggs[TSDB_FUNC_TS].aName, sizeof(pExpr->aliasName)); tstrncpy(pExpr->aliasName, aAggs[TSDB_FUNC_TS].aName, sizeof(pExpr->aliasName));
...@@ -2209,8 +2238,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2209,8 +2238,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
colIndex += 1; // the first column is ts colIndex += 1; // the first column is ts
pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, resultSize, false); pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pQueryInfo), resultSize, false);
addExprParams(pExpr, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), 0); addExprParams(pExpr, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t));
} }
memset(pExpr->aliasName, 0, tListLen(pExpr->aliasName)); memset(pExpr->aliasName, 0, tListLen(pExpr->aliasName));
...@@ -2694,7 +2723,7 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) { ...@@ -2694,7 +2723,7 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) {
} }
} }
tscFieldInfoUpdateOffsetForInterResult(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2922,7 +2951,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd) ...@@ -2922,7 +2951,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd)
void setColumnOffsetValueInResultset(SQueryInfo* pQueryInfo) { void setColumnOffsetValueInResultset(SQueryInfo* pQueryInfo) {
if (QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { if (QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
tscFieldInfoUpdateOffsetForInterResult(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo);
} else { } else {
tscFieldInfoUpdateOffset(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo);
} }
...@@ -4437,7 +4466,7 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery ...@@ -4437,7 +4466,7 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
size_t size = tscSqlExprNumOfExprs(pQueryInfo); size_t size = tscNumOfFields(pQueryInfo);
if (pQueryInfo->fillVal == NULL) { if (pQueryInfo->fillVal == NULL) {
pQueryInfo->fillVal = calloc(size, sizeof(int64_t)); pQueryInfo->fillVal = calloc(size, sizeof(int64_t));
...@@ -4451,12 +4480,8 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery ...@@ -4451,12 +4480,8 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
} else if (strncasecmp(pItem->pVar.pz, "null", 4) == 0 && pItem->pVar.nLen == 4) { } else if (strncasecmp(pItem->pVar.pz, "null", 4) == 0 && pItem->pVar.nLen == 4) {
pQueryInfo->fillType = TSDB_FILL_NULL; pQueryInfo->fillType = TSDB_FILL_NULL;
for (int32_t i = START_INTERPO_COL_IDX; i < size; ++i) { for (int32_t i = START_INTERPO_COL_IDX; i < size; ++i) {
TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) { setNull((char*)&pQueryInfo->fillVal[i], pField->type, pField->bytes);
setVardataNull((char*) &pQueryInfo->fillVal[i], pFields->type);
} else {
setNull((char*)&pQueryInfo->fillVal[i], pFields->type, pFields->bytes);
};
} }
} else if (strncasecmp(pItem->pVar.pz, "prev", 4) == 0 && pItem->pVar.nLen == 4) { } else if (strncasecmp(pItem->pVar.pz, "prev", 4) == 0 && pItem->pVar.nLen == 4) {
pQueryInfo->fillType = TSDB_FILL_PREV; pQueryInfo->fillType = TSDB_FILL_PREV;
...@@ -4487,15 +4512,15 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery ...@@ -4487,15 +4512,15 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
int32_t j = 1; int32_t j = 1;
for (int32_t i = startPos; i < numOfFillVal; ++i, ++j) { for (int32_t i = startPos; i < numOfFillVal; ++i, ++j) {
TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) { if (pField->type == TSDB_DATA_TYPE_BINARY || pField->type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull((char*) &pQueryInfo->fillVal[i], pFields->type); setVardataNull((char*) &pQueryInfo->fillVal[i], pField->type);
continue; continue;
} }
tVariant* p = taosArrayGet(pFillToken, j); tVariant* p = taosArrayGet(pFillToken, j);
int32_t ret = tVariantDump(p, (char*)&pQueryInfo->fillVal[i], pFields->type, true); int32_t ret = tVariantDump(p, (char*)&pQueryInfo->fillVal[i], pField->type, true);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
} }
...@@ -4505,12 +4530,12 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery ...@@ -4505,12 +4530,12 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
tVariantListItem* lastItem = taosArrayGetLast(pFillToken); tVariantListItem* lastItem = taosArrayGetLast(pFillToken);
for (int32_t i = numOfFillVal; i < size; ++i) { for (int32_t i = numOfFillVal; i < size; ++i) {
TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) { if (pField->type == TSDB_DATA_TYPE_BINARY || pField->type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull((char*) &pQueryInfo->fillVal[i], pFields->type); setVardataNull((char*) &pQueryInfo->fillVal[i], pField->type);
} else { } else {
tVariantDump(&lastItem->pVar, (char*)&pQueryInfo->fillVal[i], pFields->type, true); tVariantDump(&lastItem->pVar, (char*)&pQueryInfo->fillVal[i], pField->type, true);
} }
} }
} }
...@@ -5447,7 +5472,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau ...@@ -5447,7 +5472,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau
int16_t type = pTagSchema->type; int16_t type = pTagSchema->type;
int16_t bytes = pTagSchema->bytes; int16_t bytes = pTagSchema->bytes;
pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, bytes, true); pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, getNewResColId(pQueryInfo), bytes, true);
pExpr->colInfo.flag = TSDB_COL_TAG; pExpr->colInfo.flag = TSDB_COL_TAG;
// NOTE: tag column does not add to source column list // NOTE: tag column does not add to source column list
...@@ -5750,7 +5775,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo ...@@ -5750,7 +5775,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
if (TSDB_COL_IS_TAG(pColIndex->flag)) { if (TSDB_COL_IS_TAG(pColIndex->flag)) {
SColumnIndex index = {.tableIndex = pQueryInfo->groupbyExpr.tableIndex, .columnIndex = colIndex}; SColumnIndex index = {.tableIndex = pQueryInfo->groupbyExpr.tableIndex, .columnIndex = colIndex};
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, bytes, true); SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, getNewResColId(pQueryInfo), bytes, true);
memset(pExpr->aliasName, 0, sizeof(pExpr->aliasName)); memset(pExpr->aliasName, 0, sizeof(pExpr->aliasName));
tstrncpy(pExpr->aliasName, name, sizeof(pExpr->aliasName)); tstrncpy(pExpr->aliasName, name, sizeof(pExpr->aliasName));
...@@ -5913,7 +5938,7 @@ int32_t doLocalQueryProcess(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ ...@@ -5913,7 +5938,7 @@ int32_t doLocalQueryProcess(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ
SColumnIndex ind = {0}; SColumnIndex ind = {0};
SSqlExpr* pExpr1 = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG_DUMMY, &ind, TSDB_DATA_TYPE_INT, SSqlExpr* pExpr1 = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG_DUMMY, &ind, TSDB_DATA_TYPE_INT,
tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize, tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize, false); tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize, getNewResColId(pQueryInfo), tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize, false);
const char* name = (pExprList->a[0].aliasName != NULL)? pExprList->a[0].aliasName:functionsInfo[index].name; const char* name = (pExprList->a[0].aliasName != NULL)? pExprList->a[0].aliasName:functionsInfo[index].name;
tstrncpy(pExpr1->aliasName, name, tListLen(pExpr1->aliasName)); tstrncpy(pExpr1->aliasName, name, tListLen(pExpr1->aliasName));
...@@ -6585,6 +6610,7 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pS ...@@ -6585,6 +6610,7 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pS
if (strcmp((*pExpr)->pSchema->name, p1->aliasName) == 0) { if (strcmp((*pExpr)->pSchema->name, p1->aliasName) == 0) {
(*pExpr)->pSchema->type = (uint8_t)p1->resType; (*pExpr)->pSchema->type = (uint8_t)p1->resType;
(*pExpr)->pSchema->bytes = p1->resBytes; (*pExpr)->pSchema->bytes = p1->resBytes;
(*pExpr)->pSchema->colId = p1->resColId;
if (uid != NULL) { if (uid != NULL) {
*uid = p1->uid; *uid = p1->uid;
......
...@@ -241,11 +241,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -241,11 +241,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.code = 0 .code = 0
}; };
// NOTE: the rpc context should be acquired before sending data to server. rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid);
// Otherwise, the pSql object may have been released already during the response function, which is
// processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
// cause crash.
pSql->rpcRid = rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -702,7 +698,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -702,7 +698,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->queryType = htonl(pQueryInfo->type); pQueryMsg->queryType = htonl(pQueryInfo->type);
size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number
// set column list ids // set column list ids
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
...@@ -764,12 +760,15 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -764,12 +760,15 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
assert(pExpr->resColId < 0);
pSqlFuncExpr->colInfo.colId = htons(pExpr->colInfo.colId); pSqlFuncExpr->colInfo.colId = htons(pExpr->colInfo.colId);
pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex); pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag); pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag);
pSqlFuncExpr->functionId = htons(pExpr->functionId); pSqlFuncExpr->functionId = htons(pExpr->functionId);
pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams); pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
pSqlFuncExpr->resColId = htons(pExpr->resColId);
pMsg += sizeof(SSqlFuncMsg); pMsg += sizeof(SSqlFuncMsg);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
...@@ -788,6 +787,72 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -788,6 +787,72 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr = (SSqlFuncMsg *)pMsg; pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
} }
if(tscIsSecondStageQuery(pQueryInfo)) {
size_t output = tscNumOfFields(pQueryInfo);
pQueryMsg->secondStageOutput = htonl((int32_t) output);
SSqlFuncMsg *pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
for (int32_t i = 0; i < output; ++i) {
SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
SSqlExpr *pExpr = pField->pSqlExpr;
if (pExpr != NULL) {
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
tscError("%p table schema is not matched with parsed sql", pSql);
return TSDB_CODE_TSC_INVALID_SQL;
}
pSqlFuncExpr1->colInfo.colId = htons(pExpr->colInfo.colId);
pSqlFuncExpr1->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
pSqlFuncExpr1->colInfo.flag = htons(pExpr->colInfo.flag);
pSqlFuncExpr1->functionId = htons(pExpr->functionId);
pSqlFuncExpr1->numOfParams = htons(pExpr->numOfParams);
pMsg += sizeof(SSqlFuncMsg);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
// todo add log
pSqlFuncExpr1->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
pSqlFuncExpr1->arg[j].argBytes = htons(pExpr->param[j].nLen);
if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
pMsg += pExpr->param[j].nLen;
} else {
pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
}
}
pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
} else {
assert(pField->pArithExprInfo != NULL);
SExprInfo* pExprInfo = pField->pArithExprInfo;
pSqlFuncExpr1->colInfo.colId = htons(pExprInfo->base.colInfo.colId);
pSqlFuncExpr1->functionId = htons(pExprInfo->base.functionId);
pSqlFuncExpr1->numOfParams = htons(pExprInfo->base.numOfParams);
pMsg += sizeof(SSqlFuncMsg);
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
// todo add log
pSqlFuncExpr1->arg[j].argType = htons((uint16_t)pExprInfo->base.arg[j].argType);
pSqlFuncExpr1->arg[j].argBytes = htons(pExprInfo->base.arg[j].argBytes);
if (pExprInfo->base.arg[j].argType == TSDB_DATA_TYPE_BINARY) {
memcpy(pMsg, pExprInfo->base.arg[j].argValue.pz, pExprInfo->base.arg[j].argBytes);
pMsg += pExprInfo->base.arg[j].argBytes;
} else {
pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExprInfo->base.arg[j].argValue.i64);
}
}
pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
}
}
} else {
pQueryMsg->secondStageOutput = 0;
}
// serialize the table info (sid, uid, tags) // serialize the table info (sid, uid, tags)
pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg); pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
...@@ -814,7 +879,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -814,7 +879,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
if (pQueryInfo->fillType != TSDB_FILL_NONE) { if (pQueryInfo->fillType != TSDB_FILL_NONE) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
*((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]); *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]);
pMsg += sizeof(pQueryInfo->fillVal[0]); pMsg += sizeof(pQueryInfo->fillVal[0]);
} }
...@@ -1950,7 +2015,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { ...@@ -1950,7 +2015,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f); SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false); pTableSchema[i].type, pTableSchema[i].bytes, getNewResColId(pQueryInfo), pTableSchema[i].bytes, false);
} }
pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput; pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
......
...@@ -28,7 +28,6 @@ ...@@ -28,7 +28,6 @@
#include "tutil.h" #include "tutil.h"
#include "ttimer.h" #include "ttimer.h"
#include "tscProfile.h" #include "tscProfile.h"
#include "ttimer.h"
static bool validImpl(const char* str, size_t maxsize) { static bool validImpl(const char* str, size_t maxsize) {
if (str == NULL) { if (str == NULL) {
...@@ -482,7 +481,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) { ...@@ -482,7 +481,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
assert(0); assert(0);
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i); tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i, 0);
} }
*rows = pRes->tsrow; *rows = pRes->tsrow;
......
...@@ -1643,7 +1643,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -1643,7 +1643,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tExtMemBuffer ** pMemoryBuf = NULL; tExtMemBuffer ** pMemoryBuf = NULL;
tOrderDescriptor *pDesc = NULL; tOrderDescriptor *pDesc = NULL;
SColumnModel * pModel = NULL; SColumnModel *pModel = NULL;
pRes->qhandle = 0x1; // hack the qhandle check pRes->qhandle = 0x1; // hack the qhandle check
...@@ -1762,10 +1762,6 @@ static void tscFreeRetrieveSup(SSqlObj *pSql) { ...@@ -1762,10 +1762,6 @@ static void tscFreeRetrieveSup(SSqlObj *pSql) {
} }
tscDebug("%p start to free subquery supp obj:%p", pSql, trsupport); tscDebug("%p start to free subquery supp obj:%p", pSql, trsupport);
// int32_t index = trsupport->subqueryIndex;
// SSqlObj *pParentSql = trsupport->pParentSql;
// assert(pSql == pParentSql->pSubs[index]);
tfree(trsupport->localBuffer); tfree(trsupport->localBuffer);
tfree(trsupport); tfree(trsupport);
} }
...@@ -1956,7 +1952,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p ...@@ -1956,7 +1952,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0); SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);
tscClearInterpInfo(pPQueryInfo); tscClearInterpInfo(pPQueryInfo);
tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, pParentSql); tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, trsupport->pFFColModel, pParentSql);
tscDebug("%p build loser tree completed", pParentSql); tscDebug("%p build loser tree completed", pParentSql);
pParentSql->res.precision = pSql->res.precision; pParentSql->res.precision = pSql->res.precision;
...@@ -2418,7 +2414,7 @@ static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pF ...@@ -2418,7 +2414,7 @@ static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pF
} }
} }
static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) { char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
SArithmeticSupport *pSupport = (SArithmeticSupport *) param; SArithmeticSupport *pSupport = (SArithmeticSupport *) param;
int32_t index = -1; int32_t index = -1;
...@@ -2449,48 +2445,22 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) { ...@@ -2449,48 +2445,22 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
size_t size = tscNumOfFields(pQueryInfo); size_t size = tscNumOfFields(pQueryInfo);
int32_t offset = 0;
for (int i = 0; i < size; ++i) { for (int i = 0; i < size; ++i) {
SInternalField* pSup = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i, offset);
if (pSup->pSqlExpr != NULL) { TAOS_FIELD *pField = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
} offset += pField->bytes;
// primary key column cannot be null in interval query, no need to check // primary key column cannot be null in interval query, no need to check
if (i == 0 && pQueryInfo->interval.interval > 0) { if (i == 0 && pQueryInfo->interval.interval > 0) {
continue; continue;
} }
TAOS_FIELD *pField = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
if (pRes->tsrow[i] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) { if (pRes->tsrow[i] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) {
transferNcharData(pSql, i, pField); transferNcharData(pSql, i, pField);
} }
// calculate the result from several other columns
if (pSup->pArithExprInfo != NULL) {
if (pRes->pArithSup == NULL) {
pRes->pArithSup = (SArithmeticSupport*)calloc(1, sizeof(SArithmeticSupport));
}
pRes->pArithSup->offset = 0;
pRes->pArithSup->pArithExpr = pSup->pArithExprInfo;
pRes->pArithSup->numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
pRes->pArithSup->exprList = pQueryInfo->exprList;
pRes->pArithSup->data = calloc(pRes->pArithSup->numOfCols, POINTER_BYTES);
if (pRes->buffer[i] == NULL) {
TAOS_FIELD* field = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pRes->buffer[i] = malloc(field->bytes);
}
for(int32_t k = 0; k < pRes->pArithSup->numOfCols; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
pRes->pArithSup->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
}
tExprTreeCalcTraverse(pRes->pArithSup->pArithExpr->pExpr, 1, pRes->buffer[i], pRes->pArithSup,
TSDB_ORDER_ASC, getArithemicInputSrc);
pRes->tsrow[i] = (unsigned char*)pRes->buffer[i];
}
} }
pRes->row++; // index increase one-step pRes->row++; // index increase one-step
......
...@@ -105,6 +105,7 @@ void taos_init_imp(void) { ...@@ -105,6 +105,7 @@ void taos_init_imp(void) {
taosReadGlobalCfg(); taosReadGlobalCfg();
taosCheckGlobalCfg(); taosCheckGlobalCfg();
rpcInit();
tscDebug("starting to initialize TAOS client ..."); tscDebug("starting to initialize TAOS client ...");
tscDebug("Local End Point is:%s", tsLocalEp); tscDebug("Local End Point is:%s", tsLocalEp);
} }
...@@ -179,6 +180,7 @@ void taos_cleanup(void) { ...@@ -179,6 +180,7 @@ void taos_cleanup(void) {
taosCloseRef(tscRefId); taosCloseRef(tscRefId);
taosCleanupKeywordsTable(); taosCleanupKeywordsTable();
taosCloseLog(); taosCloseLog();
if (tscEmbedded == 0) rpcCleanup();
m = tscTmr; m = tscTmr;
if (m != NULL && atomic_val_compare_exchange_ptr(&tscTmr, m, 0) == m) { if (m != NULL && atomic_val_compare_exchange_ptr(&tscTmr, m, 0) == m) {
......
...@@ -219,6 +219,24 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) { ...@@ -219,6 +219,24 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) {
return true; return true;
} }
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo) {
size_t numOfOutput = tscNumOfFields(pQueryInfo);
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
if (numOfOutput == numOfExprs) {
return false;
}
for(int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExprInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i)->pArithExprInfo;
if (pExprInfo != NULL) {
return true;
}
}
return false;
}
bool tscIsTWAQuery(SQueryInfo* pQueryInfo) { bool tscIsTWAQuery(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) { for (int32_t i = 0; i < numOfExprs; ++i) {
...@@ -864,23 +882,6 @@ void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo) { ...@@ -864,23 +882,6 @@ void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo) {
} }
} }
void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) {
if (tscSqlExprNumOfExprs(pQueryInfo) == 0) {
return;
}
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->offset = 0;
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 1; i < numOfExprs; ++i) {
SSqlExpr* prev = taosArrayGetP(pQueryInfo->exprList, i - 1);
SSqlExpr* p = taosArrayGetP(pQueryInfo->exprList, i);
p->offset = prev->offset + prev->resBytes;
}
}
SInternalField* tscFieldInfoGetInternalField(SFieldInfo* pFieldInfo, int32_t index) { SInternalField* tscFieldInfoGetInternalField(SFieldInfo* pFieldInfo, int32_t index) {
assert(index < pFieldInfo->numOfOutput); assert(index < pFieldInfo->numOfOutput);
return TARRAY_GET_ELEM(pFieldInfo->internalField, index); return TARRAY_GET_ELEM(pFieldInfo->internalField, index);
...@@ -944,6 +945,14 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) { ...@@ -944,6 +945,14 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
if (pInfo->pArithExprInfo != NULL) { if (pInfo->pArithExprInfo != NULL) {
tExprTreeDestroy(&pInfo->pArithExprInfo->pExpr, NULL); tExprTreeDestroy(&pInfo->pArithExprInfo->pExpr, NULL);
SSqlFuncMsg* pFuncMsg = &pInfo->pArithExprInfo->base;
for(int32_t j = 0; j < pFuncMsg->numOfParams; ++j) {
if (pFuncMsg->arg[j].argType == TSDB_DATA_TYPE_BINARY) {
tfree(pFuncMsg->arg[j].argValue.pz);
}
}
tfree(pInfo->pArithExprInfo); tfree(pInfo->pArithExprInfo);
} }
} }
...@@ -955,7 +964,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) { ...@@ -955,7 +964,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
} }
static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize, int32_t colType) { int16_t size, int16_t resColId, int16_t interSize, int32_t colType) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex);
SSqlExpr* pExpr = calloc(1, sizeof(SSqlExpr)); SSqlExpr* pExpr = calloc(1, sizeof(SSqlExpr));
...@@ -988,6 +997,7 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol ...@@ -988,6 +997,7 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
pExpr->resType = type; pExpr->resType = type;
pExpr->resBytes = size; pExpr->resBytes = size;
pExpr->resColId = resColId;
pExpr->interBytes = interSize; pExpr->interBytes = interSize;
if (pTableMetaInfo->pTableMeta) { if (pTableMetaInfo->pTableMeta) {
...@@ -998,20 +1008,20 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol ...@@ -998,20 +1008,20 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
} }
SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize, bool isTagCol) { int16_t size, int16_t resColId, int16_t interSize, bool isTagCol) {
int32_t num = (int32_t)taosArrayGetSize(pQueryInfo->exprList); int32_t num = (int32_t)taosArrayGetSize(pQueryInfo->exprList);
if (index == num) { if (index == num) {
return tscSqlExprAppend(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol); return tscSqlExprAppend(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol);
} }
SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol); SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol);
taosArrayInsert(pQueryInfo->exprList, index, &pExpr); taosArrayInsert(pQueryInfo->exprList, index, &pExpr);
return pExpr; return pExpr;
} }
SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize, bool isTagCol) { int16_t size, int16_t resColId, int16_t interSize, bool isTagCol) {
SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol); SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol);
taosArrayPush(pQueryInfo->exprList, &pExpr); taosArrayPush(pQueryInfo->exprList, &pExpr);
return pExpr; return pExpr;
} }
...@@ -1039,16 +1049,14 @@ size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo) { ...@@ -1039,16 +1049,14 @@ size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo) {
return taosArrayGetSize(pQueryInfo->exprList); return taosArrayGetSize(pQueryInfo->exprList);
} }
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex) { void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes) {
if (pExpr == NULL || argument == NULL || bytes == 0) { assert (pExpr != NULL || argument != NULL || bytes != 0);
return;
}
// set parameter value // set parameter value
// transfer to tVariant from byte data/no ascii data // transfer to tVariant from byte data/no ascii data
tVariantCreateFromBinary(&pExpr->param[pExpr->numOfParams], argument, bytes, type); tVariantCreateFromBinary(&pExpr->param[pExpr->numOfParams], argument, bytes, type);
pExpr->numOfParams += 1; pExpr->numOfParams += 1;
assert(pExpr->numOfParams <= 3); assert(pExpr->numOfParams <= 3);
} }
...@@ -1601,6 +1609,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) { ...@@ -1601,6 +1609,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES); pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX; pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX;
pQueryInfo->resColumnId= -1000;
} }
int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) { int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) {
......
...@@ -197,7 +197,8 @@ public class TSDBConnection implements Connection { ...@@ -197,7 +197,8 @@ public class TSDBConnection implements Connection {
public SQLWarning getWarnings() throws SQLException { public SQLWarning getWarnings() throws SQLException {
//todo: implement getWarnings according to the warning messages returned from TDengine //todo: implement getWarnings according to the warning messages returned from TDengine
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG); return null;
// throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
public void clearWarnings() throws SQLException { public void clearWarnings() throws SQLException {
......
...@@ -24,8 +24,8 @@ int32_t dnodeInitVWrite(); ...@@ -24,8 +24,8 @@ int32_t dnodeInitVWrite();
void dnodeCleanupVWrite(); void dnodeCleanupVWrite();
void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg); void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg);
void * dnodeAllocVWriteQueue(void *pVnode); void * dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue); void dnodeFreeVWriteQueue(void *pWqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code); void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "tconfig.h" #include "tconfig.h"
#include "tglobal.h" #include "tglobal.h"
#include "twal.h" #include "twal.h"
#include "trpc.h"
#include "dnode.h" #include "dnode.h"
#include "dnodeInt.h" #include "dnodeInt.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
...@@ -54,6 +55,7 @@ typedef struct { ...@@ -54,6 +55,7 @@ typedef struct {
} SDnodeComponent; } SDnodeComponent;
static const SDnodeComponent tsDnodeComponents[] = { static const SDnodeComponent tsDnodeComponents[] = {
{"rpc", rpcInit, rpcCleanup},
{"storage", dnodeInitStorage, dnodeCleanupStorage}, {"storage", dnodeInitStorage, dnodeCleanupStorage},
{"dnodecfg", dnodeInitCfg, dnodeCleanupCfg}, {"dnodecfg", dnodeInitCfg, dnodeCleanupCfg},
{"dnodeeps", dnodeInitEps, dnodeCleanupEps}, {"dnodeeps", dnodeInitEps, dnodeCleanupEps},
......
...@@ -151,6 +151,13 @@ void dnodeCleanupClient() { ...@@ -151,6 +151,13 @@ void dnodeCleanupClient() {
} }
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (dnodeGetRunStatus() != TSDB_RUN_STATUS_RUNING) {
if (pMsg == NULL || pMsg->pCont == NULL) return;
dDebug("msg:%p is ignored since dnode not running", pMsg);
rpcFreeCont(pMsg->pCont);
return;
}
if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) { if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) {
dnodeUpdateEpSetForPeer(pEpSet); dnodeUpdateEpSetForPeer(pEpSet);
} }
...@@ -169,7 +176,7 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { ...@@ -169,7 +176,7 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
} }
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
rpcSendRequest(tsClientRpc, epSet, rpcMsg); rpcSendRequest(tsClientRpc, epSet, rpcMsg, NULL);
} }
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
......
...@@ -38,11 +38,11 @@ typedef struct { ...@@ -38,11 +38,11 @@ typedef struct {
} SVWriteWorkerPool; } SVWriteWorkerPool;
static SVWriteWorkerPool tsVWriteWP; static SVWriteWorkerPool tsVWriteWP;
static void *dnodeProcessVWriteQueue(void *param); static void *dnodeProcessVWriteQueue(void *pWorker);
int32_t dnodeInitVWrite() { int32_t dnodeInitVWrite() {
tsVWriteWP.max = tsNumOfCores; tsVWriteWP.max = tsNumOfCores;
tsVWriteWP.worker = (SVWriteWorker *)tcalloc(sizeof(SVWriteWorker), tsVWriteWP.max); tsVWriteWP.worker = tcalloc(sizeof(SVWriteWorker), tsVWriteWP.max);
if (tsVWriteWP.worker == NULL) return -1; if (tsVWriteWP.worker == NULL) return -1;
pthread_mutex_init(&tsVWriteWP.mutex, NULL); pthread_mutex_init(&tsVWriteWP.mutex, NULL);
...@@ -162,13 +162,13 @@ void *dnodeAllocVWriteQueue(void *pVnode) { ...@@ -162,13 +162,13 @@ void *dnodeAllocVWriteQueue(void *pVnode) {
return queue; return queue;
} }
void dnodeFreeVWriteQueue(void *wqueue) { void dnodeFreeVWriteQueue(void *pWqueue) {
taosCloseQueue(wqueue); taosCloseQueue(pWqueue);
} }
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) { void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) {
if (param == NULL) return; if (wparam == NULL) return;
SVWriteMsg *pWrite = param; SVWriteMsg *pWrite = wparam;
if (code < 0) pWrite->code = code; if (code < 0) pWrite->code = code;
int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1); int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);
...@@ -183,13 +183,11 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) { ...@@ -183,13 +183,11 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
taosFreeQitem(pWrite); vnodeFreeFromWQueue(pVnode, pWrite);
vnodeRelease(pVnode);
} }
static void *dnodeProcessVWriteQueue(void *param) { static void *dnodeProcessVWriteQueue(void *wparam) {
SVWriteWorker *pWorker = param; SVWriteWorker *pWorker = wparam;
SVWriteMsg * pWrite; SVWriteMsg * pWrite;
void * pVnode; void * pVnode;
int32_t numOfMsgs; int32_t numOfMsgs;
...@@ -232,8 +230,7 @@ static void *dnodeProcessVWriteQueue(void *param) { ...@@ -232,8 +230,7 @@ static void *dnodeProcessVWriteQueue(void *param) {
if (pWrite->rspRet.rsp) { if (pWrite->rspRet.rsp) {
rpcFreeCont(pWrite->rspRet.rsp); rpcFreeCont(pWrite->rspRet.rsp);
} }
taosFreeQitem(pWrite); vnodeFreeFromWQueue(pVnode, pWrite);
vnodeRelease(pVnode);
} }
} }
} }
......
...@@ -54,8 +54,8 @@ void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet ...@@ -54,8 +54,8 @@ void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid); void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
void *dnodeAllocVWriteQueue(void *pVnode); void *dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue); void dnodeFreeVWriteQueue(void *pWqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code); void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code);
void *dnodeAllocVReadQueue(void *pVnode); void *dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue); void dnodeFreeVReadQueue(void *rqueue);
......
...@@ -201,6 +201,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "Missing da ...@@ -201,6 +201,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "Missing da
TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "Out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "Out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected generic error in vnode") TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected generic error in vnode")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid version file") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid version file")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Vnode memory is full because commit failed")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied")
......
...@@ -392,6 +392,7 @@ typedef struct SColIndex { ...@@ -392,6 +392,7 @@ typedef struct SColIndex {
typedef struct SSqlFuncMsg { typedef struct SSqlFuncMsg {
int16_t functionId; int16_t functionId;
int16_t numOfParams; int16_t numOfParams;
int16_t resColId; // result column id, id of the current output column
SColIndex colInfo; SColIndex colInfo;
struct ArgElem { struct ArgElem {
...@@ -461,11 +462,6 @@ typedef struct STimeWindow { ...@@ -461,11 +462,6 @@ typedef struct STimeWindow {
TSKEY ekey; TSKEY ekey;
} STimeWindow; } STimeWindow;
/*
* the outputCols is equalled to or larger than numOfCols
* e.g., select min(colName), max(colName), avg(colName) from table
* the outputCols will be 3 while the numOfCols is 1.
*/
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
STimeWindow window; STimeWindow window;
...@@ -485,6 +481,7 @@ typedef struct { ...@@ -485,6 +481,7 @@ typedef struct {
int16_t tagNameRelType; // relation of tag criteria and tbname criteria int16_t tagNameRelType; // relation of tag criteria and tbname criteria
int16_t fillType; // interpolate type int16_t fillType; // interpolate type
uint64_t fillVal; // default value array list uint64_t fillVal; // default value array list
int32_t secondStageOutput;
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
int32_t tsLen; // total length of ts comp block int32_t tsLen; // total length of ts comp block
int32_t tsNumOfBlocks; // ts comp block numbers int32_t tsNumOfBlocks; // ts comp block numbers
......
...@@ -78,12 +78,14 @@ typedef struct SRpcInit { ...@@ -78,12 +78,14 @@ typedef struct SRpcInit {
int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
} SRpcInit; } SRpcInit;
int32_t rpcInit();
void rpcCleanup();
void *rpcOpen(const SRpcInit *pRpc); void *rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *); void rpcClose(void *);
void *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen); void *rpcReallocCont(void *ptr, int contLen);
int64_t rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg); void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
......
...@@ -46,7 +46,7 @@ extern "C" { ...@@ -46,7 +46,7 @@ extern "C" {
typedef struct { typedef struct {
void *appH; void *appH;
void *cqH; void *cqH;
int (*notifyStatus)(void *, int status); int (*notifyStatus)(void *, int status, int eno);
int (*eventCallBack)(void *); int (*eventCallBack)(void *);
void *(*cqCreateFunc)(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema); void *(*cqCreateFunc)(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema);
void (*cqDropFunc)(void *handle); void (*cqDropFunc)(void *handle);
...@@ -83,7 +83,7 @@ STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo); ...@@ -83,7 +83,7 @@ STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo);
int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg); int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg);
int32_t tsdbDropRepo(char *rootDir); int32_t tsdbDropRepo(char *rootDir);
TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH); TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH);
void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit); int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit);
int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg); int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg);
int tsdbGetState(TSDB_REPO_T *repo); int tsdbGetState(TSDB_REPO_T *repo);
......
...@@ -70,11 +70,12 @@ void* vnodeAcquire(int32_t vgId); // add refcount ...@@ -70,11 +70,12 @@ void* vnodeAcquire(int32_t vgId); // add refcount
void vnodeRelease(void *pVnode); // dec refCount void vnodeRelease(void *pVnode); // dec refCount
void* vnodeGetWal(void *pVnode); void* vnodeGetWal(void *pVnode);
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam); int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg);
int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam); void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite);
int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param); void vnodeBuildStatusMsg(void *pStatus);
void vnodeConfirmForward(void *param, uint64_t version, int32_t code); void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code);
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes); void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
int32_t vnodeInitResources(); int32_t vnodeInitResources();
......
...@@ -3,3 +3,4 @@ PROJECT(TDengine) ...@@ -3,3 +3,4 @@ PROJECT(TDengine)
ADD_SUBDIRECTORY(shell) ADD_SUBDIRECTORY(shell)
ADD_SUBDIRECTORY(taosdemo) ADD_SUBDIRECTORY(taosdemo)
ADD_SUBDIRECTORY(taosdump)
...@@ -46,7 +46,7 @@ static struct argp_option options[] = { ...@@ -46,7 +46,7 @@ static struct argp_option options[] = {
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."}, {"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
{"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."}, {"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."},
{"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."}, {"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."},
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is NULL, valid option: client | server."}, {"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is NULL, options: client|clients|server."},
{"endport", 'e', "ENDPORT", 0, "Net test end port, default is 6042."}, {"endport", 'e', "ENDPORT", 0, "Net test end port, default is 6042."},
{"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."}, {"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."},
{0}}; {0}};
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(. SRC)
IF (TD_LINUX)
ADD_EXECUTABLE(taosdump ${SRC})
IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(taosdump taos_static)
ELSE ()
TARGET_LINK_LIBRARIES(taosdump taos)
ENDIF ()
ENDIF ()
此差异已折叠。
taos1_6="/root/mnt/work/test/td1.6/build/bin/taos"
taosdump1_6="/root/mnt/work/test/td1.6/build/bin/taosdump"
taoscfg1_6="/root/mnt/work/test/td1.6/test/cfg"
taos2_0="/root/mnt/work/test/td2.0/build/bin/taos"
taosdump2_0="/root/mnt/work/test/td2.0/build/bin/taosdump"
taoscfg2_0="/root/mnt/work/test/td2.0/test/cfg"
data_dir="/root/mnt/work/test/td1.6/output"
table_list="/root/mnt/work/test/td1.6/tables"
DBNAME="test"
NTABLES=$(wc -l ${table_list} | awk '{print $1;}')
NTABLES_PER_DUMP=101
mkdir -p ${data_dir}
i=0
round=0
command="${taosdump1_6} -c ${taoscfg1_6} -o ${data_dir} -N 100 -T 20 ${DBNAME}"
while IFS= read -r line
do
i=$((i+1))
command="${command} ${line}"
if [[ "$i" -eq ${NTABLES_PER_DUMP} ]]; then
round=$((round+1))
echo "Starting round ${round} dump out..."
rm -f ${data_dir}/*
${command}
echo "Starting round ${round} dump in..."
${taosdump2_0} -c ${taoscfg2_0} -i ${data_dir}
# Reset variables
# command="${taosdump1_6} -c ${taoscfg1_6} -o ${data_dir} -N 100 ${DBNAME}"
command="${taosdump1_6} -c ${taoscfg1_6} -o ${data_dir} -N 100 -T 20 ${DBNAME}"
i=0
fi
done < "${table_list}"
if [[ ${i} -ne "0" ]]; then
round=$((round+1))
echo "Starting round ${round} dump out..."
rm -f ${data_dir}/*
${command}
echo "Starting round ${round} dump in..."
${taosdump2_0} -c ${taoscfg2_0} -i ${data_dir}
fi
...@@ -37,7 +37,7 @@ extern "C" { ...@@ -37,7 +37,7 @@ extern "C" {
#endif #endif
#ifndef TAOS_OS_DEF_EPOLL #ifndef TAOS_OS_DEF_EPOLL
#define TAOS_EPOLL_WAIT_TIME -1 #define TAOS_EPOLL_WAIT_TIME 500
#endif #endif
#ifdef TAOS_RANDOM_NETWORK_FAIL #ifdef TAOS_RANDOM_NETWORK_FAIL
......
...@@ -111,6 +111,9 @@ void taosUninitTimer() { ...@@ -111,6 +111,9 @@ void taosUninitTimer() {
pthread_sigmask(SIG_BLOCK, &set, NULL); pthread_sigmask(SIG_BLOCK, &set, NULL);
*/ */
void taosMsleep(int mseconds) { void taosMsleep(int mseconds) {
#if 1
usleep(mseconds * 1000);
#else
struct timeval timeout; struct timeval timeout;
int seconds, useconds; int seconds, useconds;
...@@ -126,7 +129,8 @@ void taosMsleep(int mseconds) { ...@@ -126,7 +129,8 @@ void taosMsleep(int mseconds) {
select(0, NULL, NULL, NULL, &timeout); select(0, NULL, NULL, NULL, &timeout);
/* pthread_sigmask(SIG_UNBLOCK, &set, NULL); */ /* pthread_sigmask(SIG_UNBLOCK, &set, NULL); */
#endif
} }
#endif #endif
\ No newline at end of file
...@@ -85,7 +85,7 @@ static void httpProcessHttpData(void *param) { ...@@ -85,7 +85,7 @@ static void httpProcessHttpData(void *param) {
while (1) { while (1) {
struct epoll_event events[HTTP_MAX_EVENTS]; struct epoll_event events[HTTP_MAX_EVENTS];
//-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1 //-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1
fdNum = epoll_wait(pThread->pollFd, events, HTTP_MAX_EVENTS, 1); fdNum = epoll_wait(pThread->pollFd, events, HTTP_MAX_EVENTS, TAOS_EPOLL_WAIT_TIME);
if (pThread->stop) { if (pThread->stop) {
httpDebug("%p, http thread get stop event, exiting...", pThread); httpDebug("%p, http thread get stop event, exiting...", pThread);
break; break;
......
...@@ -152,7 +152,10 @@ typedef struct SQuery { ...@@ -152,7 +152,10 @@ typedef struct SQuery {
SLimitVal limit; SLimitVal limit;
int32_t rowSize; int32_t rowSize;
SSqlGroupbyExpr* pGroupbyExpr; SSqlGroupbyExpr* pGroupbyExpr;
SExprInfo* pSelectExpr; SExprInfo* pExpr1;
SExprInfo* pExpr2;
int32_t numOfExpr2;
SColumnInfo* colList; SColumnInfo* colList;
SColumnInfo* tagColList; SColumnInfo* tagColList;
int32_t numOfFilterCols; int32_t numOfFilterCols;
......
...@@ -43,7 +43,8 @@ typedef struct SHistogramInfo { ...@@ -43,7 +43,8 @@ typedef struct SHistogramInfo {
int32_t numOfElems; int32_t numOfElems;
int32_t numOfEntries; int32_t numOfEntries;
int32_t maxEntries; int32_t maxEntries;
double min;
double max;
#if defined(USE_ARRAYLIST) #if defined(USE_ARRAYLIST)
SHistBin* elems; SHistBin* elems;
#else #else
...@@ -52,9 +53,6 @@ typedef struct SHistogramInfo { ...@@ -52,9 +53,6 @@ typedef struct SHistogramInfo {
int32_t maxIndex; int32_t maxIndex;
bool ordered; bool ordered;
#endif #endif
double min;
double max;
} SHistogramInfo; } SHistogramInfo;
SHistogramInfo* tHistogramCreate(int32_t numOfBins); SHistogramInfo* tHistogramCreate(int32_t numOfBins);
......
...@@ -48,7 +48,7 @@ static FORCE_INLINE SResultRow *getResultRow(SWindowResInfo *pWindowResInfo, int ...@@ -48,7 +48,7 @@ static FORCE_INLINE SResultRow *getResultRow(SWindowResInfo *pWindowResInfo, int
} }
#define curTimeWindowIndex(_winres) ((_winres)->curIndex) #define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pSelectExpr[1].base.arg->argValue.i64:1) #define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pExpr1[1].base.arg->argValue.i64:1)
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot); bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot);
...@@ -62,7 +62,7 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3 ...@@ -62,7 +62,7 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3
int32_t realRowId = (int32_t)(pResult->rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery)); int32_t realRowId = (int32_t)(pResult->rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery));
return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage + return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage +
pQuery->pSelectExpr[columnIndex].bytes * realRowId; pQuery->pExpr1[columnIndex].bytes * realRowId;
} }
bool isNull_filter(SColumnFilterElem *pFilter, char* minval, char* maxval); bool isNull_filter(SColumnFilterElem *pFilter, char* minval, char* maxval);
......
...@@ -128,7 +128,7 @@ typedef struct SArithmeticSupport { ...@@ -128,7 +128,7 @@ typedef struct SArithmeticSupport {
SExprInfo *pArithExpr; SExprInfo *pArithExpr;
int32_t numOfCols; int32_t numOfCols;
SColumnInfo *colList; SColumnInfo *colList;
SArray* exprList; // client side used void *exprList; // client side used
int32_t offset; int32_t offset;
char** data; char** data;
} SArithmeticSupport; } SArithmeticSupport;
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.3/apache-maven-3.6.3-bin.zip
wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Subproject commit f2ffd30521b8e8afbc9d25c75f8eeeb6a48bd030
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册