提交 9307ddac 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

Merge branch 'develop' into feature/race

......@@ -79,7 +79,14 @@ pipeline {
cmake .. > /dev/null
make > /dev/null
cd ${WKC}/tests/pytest
./crash_gen.sh -a -p -t 4 -s 2000
'''
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
./crash_gen.sh -a -p -t 4 -s 2000
'''
}
sh '''
date
cd ${WKC}/tests
./test-all.sh b2
......
......@@ -58,7 +58,7 @@ cp -r ${top_dir}/src/connector/grafanaplugin ${pkg_dir}${install_home_pat
cp -r ${top_dir}/src/connector/python ${pkg_dir}${install_home_path}/connector
cp -r ${top_dir}/src/connector/go ${pkg_dir}${install_home_path}/connector
cp -r ${top_dir}/src/connector/nodejs ${pkg_dir}${install_home_path}/connector
cp ${compile_dir}/build/lib/taos-jdbcdriver*dist.* ${pkg_dir}${install_home_path}/connector
cp ${compile_dir}/build/lib/taos-jdbcdriver*dist.* ${pkg_dir}${install_home_path}/connector ||:
cp -r ${compile_dir}/../packaging/deb/DEBIAN ${pkg_dir}/
chmod 755 ${pkg_dir}/DEBIAN/*
......
......@@ -156,9 +156,15 @@ build_time=$(date +"%F %R")
# get commint id from git
gitinfo=$(git rev-parse --verify HEAD)
enterprise_dir="${top_dir}/../enterprise"
cd ${enterprise_dir}
gitinfoOfInternal=$(git rev-parse --verify HEAD)
if [[ "$verMode" == "cluster" ]]; then
enterprise_dir="${top_dir}/../enterprise"
cd ${enterprise_dir}
gitinfoOfInternal=$(git rev-parse --verify HEAD)
else
gitinfoOfInternal=NULL
fi
cd ${curr_dir}
# 2. cmake executable file
......@@ -193,23 +199,35 @@ cd ${curr_dir}
# 3. Call the corresponding script for packaging
if [ "$osType" != "Darwin" ]; then
if [[ "$verMode" != "cluster" ]] && [[ "$cpuType" == "x64" ]] && [[ "$dbName" == "taos" ]]; then
echo "====do deb package for the ubuntu system===="
output_dir="${top_dir}/debs"
if [ -d ${output_dir} ]; then
${csudo} rm -rf ${output_dir}
ret='0'
command -v dpkg >/dev/null 2>&1 || { ret='1'; }
if [ "$ret" -eq 0 ]; then
echo "====do deb package for the ubuntu system===="
output_dir="${top_dir}/debs"
if [ -d ${output_dir} ]; then
${csudo} rm -rf ${output_dir}
fi
${csudo} mkdir -p ${output_dir}
cd ${script_dir}/deb
${csudo} ./makedeb.sh ${compile_dir} ${output_dir} ${verNumber} ${cpuType} ${osType} ${verMode} ${verType}
else
echo "==========dpkg command not exist, so not release deb package!!!"
fi
${csudo} mkdir -p ${output_dir}
cd ${script_dir}/deb
${csudo} ./makedeb.sh ${compile_dir} ${output_dir} ${verNumber} ${cpuType} ${osType} ${verMode} ${verType}
echo "====do rpm package for the centos system===="
output_dir="${top_dir}/rpms"
if [ -d ${output_dir} ]; then
${csudo} rm -rf ${output_dir}
ret='0'
command -v rpmbuild >/dev/null 2>&1 || { ret='1'; }
if [ "$ret" -eq 0 ]; then
echo "====do rpm package for the centos system===="
output_dir="${top_dir}/rpms"
if [ -d ${output_dir} ]; then
${csudo} rm -rf ${output_dir}
fi
${csudo} mkdir -p ${output_dir}
cd ${script_dir}/rpm
${csudo} ./makerpm.sh ${compile_dir} ${output_dir} ${verNumber} ${cpuType} ${osType} ${verMode} ${verType}
else
echo "==========rpmbuild command not exist, so not release rpm package!!!"
fi
${csudo} mkdir -p ${output_dir}
cd ${script_dir}/rpm
${csudo} ./makerpm.sh ${compile_dir} ${output_dir} ${verNumber} ${cpuType} ${osType} ${verMode} ${verType}
fi
echo "====do tar.gz package for all systems===="
......
......@@ -65,7 +65,7 @@ cp -r %{_compiledir}/../src/connector/grafanaplugin %{buildroot}%{homepath}/conn
cp -r %{_compiledir}/../src/connector/python %{buildroot}%{homepath}/connector
cp -r %{_compiledir}/../src/connector/go %{buildroot}%{homepath}/connector
cp -r %{_compiledir}/../src/connector/nodejs %{buildroot}%{homepath}/connector
cp %{_compiledir}/build/lib/taos-jdbcdriver*dist.* %{buildroot}%{homepath}/connector
cp %{_compiledir}/build/lib/taos-jdbcdriver*dist.* %{buildroot}%{homepath}/connector ||:
cp -r %{_compiledir}/../tests/examples/* %{buildroot}%{homepath}/examples
#Scripts executed before installation
......
......@@ -278,11 +278,11 @@ function install_service_on_sysvinit() {
# Install taosd service
if ((${os_type}==1)); then
${csudo} cp -f ${script_dir}/../deb/init.d/taosd ${install_main_dir}/init.d
${csudo} cp ${script_dir}/../deb/init.d/taosd ${service_config_dir} && ${csudo} chmod a+x ${service_config_dir}/taosd
${csudo} cp -f ${script_dir}/../deb/taosd ${install_main_dir}/init.d
${csudo} cp ${script_dir}/../deb/taosd ${service_config_dir} && ${csudo} chmod a+x ${service_config_dir}/taosd
elif ((${os_type}==2)); then
${csudo} cp -f ${script_dir}/../rpm/init.d/taosd ${install_main_dir}/init.d
${csudo} cp ${script_dir}/../rpm/init.d/taosd ${service_config_dir} && ${csudo} chmod a+x ${service_config_dir}/taosd
${csudo} cp -f ${script_dir}/../rpm/taosd ${install_main_dir}/init.d
${csudo} cp ${script_dir}/../rpm/taosd ${service_config_dir} && ${csudo} chmod a+x ${service_config_dir}/taosd
fi
#restart_config_str="taos:2345:respawn:${service_config_dir}/taosd start"
......
......@@ -110,7 +110,7 @@ mkdir -p ${install_dir}/connector
if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
if [ "$osType" != "Darwin" ]; then
cp ${build_dir}/lib/*.jar ${install_dir}/connector
cp ${build_dir}/lib/*.jar ${install_dir}/connector ||:
fi
cp -r ${connector_dir}/grafanaplugin ${install_dir}/connector/
cp -r ${connector_dir}/python ${install_dir}/connector/
......
......@@ -135,7 +135,7 @@ mkdir -p ${install_dir}/connector
if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
if [ "$osType" != "Darwin" ]; then
cp ${build_dir}/lib/*.jar ${install_dir}/connector
cp ${build_dir}/lib/*.jar ${install_dir}/connector ||:
fi
cp -r ${connector_dir}/grafanaplugin ${install_dir}/connector/
cp -r ${connector_dir}/python ${install_dir}/connector/
......
......@@ -124,7 +124,7 @@ cp ${lib_files} ${install_dir}/driver
connector_dir="${code_dir}/connector"
mkdir -p ${install_dir}/connector
if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
cp ${build_dir}/lib/*.jar ${install_dir}/connector
cp ${build_dir}/lib/*.jar ${install_dir}/connector ||:
cp -r ${connector_dir}/grafanaplugin ${install_dir}/connector/
cp -r ${connector_dir}/python ${install_dir}/connector/
cp -r ${connector_dir}/go ${install_dir}/connector
......
......@@ -156,7 +156,7 @@ cp ${lib_files} ${install_dir}/driver
connector_dir="${code_dir}/connector"
mkdir -p ${install_dir}/connector
if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
cp ${build_dir}/lib/*.jar ${install_dir}/connector
cp ${build_dir}/lib/*.jar ${install_dir}/connector ||:
cp -r ${connector_dir}/grafanaplugin ${install_dir}/connector/
cp -r ${connector_dir}/python ${install_dir}/connector/
cp -r ${connector_dir}/go ${install_dir}/connector
......
name: tdengine
base: core18
version: '2.0.5.1'
version: '2.0.6.0'
icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT.
description: |
......@@ -72,7 +72,7 @@ parts:
- usr/bin/taosd
- usr/bin/taos
- usr/bin/taosdemo
- usr/lib/libtaos.so.2.0.5.1
- usr/lib/libtaos.so.2.0.6.0
- usr/lib/libtaos.so.1
- usr/lib/libtaos.so
......
......@@ -82,6 +82,7 @@ typedef struct SJoinSupporter {
char* pIdTagList; // result of first stage tags
int32_t totalLen;
int32_t num;
SArray* pVgroupTables;
} SJoinSupporter;
typedef struct SVgroupTableInfo {
......@@ -215,7 +216,7 @@ SQueryInfo *tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex);
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache);
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta,
SVgroupsInfo* vgroupList, SArray* pTagCols);
SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables);
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo);
int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
......@@ -224,6 +225,8 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscFreeVgroupTableInfo(SArray* pVgroupTables);
SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables);
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
......
......@@ -2461,12 +2461,22 @@ static void percentile_function(SQLFunctionCtx *pCtx) {
// the first stage, only acquire the min/max value
if (pInfo->stage == 0) {
if (pCtx->preAggVals.isSet) {
if (GET_DOUBLE_VAL(&pInfo->minval) > pCtx->preAggVals.statis.min) {
SET_DOUBLE_VAL(&pInfo->minval, (double)pCtx->preAggVals.statis.min);
double tmin = 0.0, tmax = 0.0;
if (pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) {
tmin = (double)GET_INT64_VAL(&pCtx->preAggVals.statis.min);
tmax = (double)GET_INT64_VAL(&pCtx->preAggVals.statis.max);
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE || pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
tmin = GET_DOUBLE_VAL(&pCtx->preAggVals.statis.min);
tmax = GET_DOUBLE_VAL(&pCtx->preAggVals.statis.max);
} else {
assert(true);
}
if (GET_DOUBLE_VAL(&pInfo->minval) > tmin) {
SET_DOUBLE_VAL(&pInfo->minval, tmin);
}
if (GET_DOUBLE_VAL(&pInfo->maxval) < pCtx->preAggVals.statis.max) {
SET_DOUBLE_VAL(&pInfo->maxval, (double)pCtx->preAggVals.statis.max);
if (GET_DOUBLE_VAL(&pInfo->maxval) < tmax) {
SET_DOUBLE_VAL(&pInfo->maxval, tmax);
}
pInfo->numOfElems += (pCtx->size - pCtx->preAggVals.statis.numOfNull);
......@@ -4025,11 +4035,11 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) {
// primary ts must be existed, so no need to check its existance
if (pCtx->order == TSDB_ORDER_ASC) {
tsBufAppend(pTSbuf, 0, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE);
tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].i64Key, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE);
} else {
for (int32_t i = pCtx->size - 1; i >= 0; --i) {
char *d = GET_INPUT_CHAR_INDEX(pCtx, i);
tsBufAppend(pTSbuf, 0, &pCtx->tag, d, TSDB_KEYSIZE);
tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].i64Key, &pCtx->tag, d, (int32_t)TSDB_KEYSIZE);
}
}
......@@ -4048,7 +4058,7 @@ static void ts_comp_function_f(SQLFunctionCtx *pCtx, int32_t index) {
STSBuf *pTSbuf = pInfo->pTSBuf;
tsBufAppend(pTSbuf, 0, &pCtx->tag, pData, TSDB_KEYSIZE);
tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].i64Key, &pCtx->tag, pData, TSDB_KEYSIZE);
SET_VAL(pCtx, pCtx->size, 1);
pResInfo->hasResult = DATA_SET_FLAG;
......
......@@ -698,7 +698,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
pg *= 2;
}
size_t numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups;
size_t numOfSubs = pSql->subState.numOfSub;
assert(numOfSubs <= pTableMetaInfo->vgroupList->numOfVgroups);
for (int32_t i = 0; i < numOfSubs; ++i) {
(*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pg, pModel);
(*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL;
......
......@@ -877,22 +877,13 @@ static bool validateTableColumnInfo(tFieldList* pFieldList, SSqlCmd* pCmd) {
int32_t nLen = 0;
for (int32_t i = 0; i < pFieldList->nField; ++i) {
if (pFieldList->p[i].bytes == 0) {
TAOS_FIELD* pField = &pFieldList->p[i];
if (pField->bytes == 0) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
return false;
}
nLen += pFieldList->p[i].bytes;
}
// max row length must be less than TSDB_MAX_BYTES_PER_ROW
if (nLen > TSDB_MAX_BYTES_PER_ROW) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
return false;
}
// field name must be unique
for (int32_t i = 0; i < pFieldList->nField; ++i) {
TAOS_FIELD* pField = &pFieldList->p[i];
if (pField->type < TSDB_DATA_TYPE_BOOL || pField->type > TSDB_DATA_TYPE_NCHAR) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
return false;
......@@ -909,10 +900,19 @@ static bool validateTableColumnInfo(tFieldList* pFieldList, SSqlCmd* pCmd) {
return false;
}
// field name must be unique
if (has(pFieldList, i + 1, pFieldList->p[i].name) == true) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
return false;
}
nLen += pField->bytes;
}
// max row length must be less than TSDB_MAX_BYTES_PER_ROW
if (nLen > TSDB_MAX_BYTES_PER_ROW) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
return false;
}
return true;
......
......@@ -177,7 +177,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pVgroupInfo->epAddr[i].port = pEpMsg->port;
}
tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMeta->vgroupInfo);
tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, pVgroupInfo);
pTableMeta->sversion = pTableMetaMsg->sversion;
pTableMeta->tversion = pTableMetaMsg->tversion;
......
......@@ -484,14 +484,25 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
int32_t vgIndex = pTableMetaInfo->vgroupIndex;
SVgroupsInfo* pVgroupInfo = pTableMetaInfo->vgroupList;
assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
if (pTableMetaInfo->pVgroupTables == NULL) {
SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
} else {
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
assert(vgIndex >= 0 && vgIndex < numOfVgroups);
pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
}
} else {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId);
}
pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
......@@ -665,12 +676,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->limit = htobe64(pQueryInfo->limit.limit);
pQueryMsg->offset = htobe64(pQueryInfo->limit.offset);
pQueryMsg->numOfCols = htons((int16_t)taosArrayGetSize(pQueryInfo->colList));
pQueryMsg->interval.interval = htobe64(pQueryInfo->interval.interval);
pQueryMsg->interval.sliding = htobe64(pQueryInfo->interval.sliding);
pQueryMsg->interval.interval = htobe64(pQueryInfo->interval.interval);
pQueryMsg->interval.sliding = htobe64(pQueryInfo->interval.sliding);
pQueryMsg->interval.offset = htobe64(pQueryInfo->interval.offset);
pQueryMsg->interval.intervalUnit = pQueryInfo->interval.intervalUnit;
pQueryMsg->interval.slidingUnit = pQueryInfo->interval.slidingUnit;
pQueryMsg->interval.offsetUnit = pQueryInfo->interval.offsetUnit;
pQueryMsg->interval.slidingUnit = pQueryInfo->interval.slidingUnit;
pQueryMsg->interval.offsetUnit = pQueryInfo->interval.offsetUnit;
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
......@@ -853,7 +864,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t numOfBlocks = 0;
if (pQueryInfo->tsBuf != NULL) {
STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex);
int32_t vnodeId = htonl(pQueryMsg->head.vgId);
STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, vnodeId);
assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL); // this query should not be sent
// todo refactor
......@@ -2274,7 +2286,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
STableMeta *pTableMeta = taosCacheAcquireByData(tscMetaCache, pMInfo->pTableMeta);
tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList);
tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList, pMInfo->pVgroupTables);
}
if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
......
......@@ -23,7 +23,6 @@
#include "tscSubquery.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tscSubquery.h"
typedef struct SInsertSupporter {
SSqlObj* pSql;
......@@ -59,6 +58,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
pSubQueryInfo1->tsBuf = output1;
pSubQueryInfo2->tsBuf = output2;
TSKEY st = taosGetTimestampUs();
// no result generated, return directly
if (pSupporter1->pTSBuf == NULL || pSupporter2->pTSBuf == NULL) {
tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql);
......@@ -95,7 +96,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
tscInfo("%" PRId64 ", tags:%"PRId64" \t %" PRId64 ", tags:%"PRId64, elem1.ts, elem1.tag.i64Key, elem2.ts, elem2.tag.i64Key);
#endif
int32_t res = tVariantCompare(&elem1.tag, &elem2.tag);
int32_t res = tVariantCompare(elem1.tag, elem2.tag);
if (res == -1 || (res == 0 && tsCompare(order, elem1.ts, elem2.ts))) {
if (!tsBufNextPos(pSupporter1->pTSBuf)) {
break;
......@@ -122,8 +123,9 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
win->ekey = elem1.ts;
}
tsBufAppend(output1, elem1.vnode, &elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.vnode, &elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
} else {
pLimit->offset -= 1;
}
......@@ -158,9 +160,10 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
tsBufDestroy(pSupporter1->pTSBuf);
tsBufDestroy(pSupporter2->pTSBuf);
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks "
"intersecting, skey:%" PRId64 ", ekey:%" PRId64, pSql, numOfInput1, numOfInput2, output1->numOfTotal,
win->skey, win->ekey);
TSKEY et = taosGetTimestampUs();
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elasped time:%"PRId64" us", pSql, numOfInput1, numOfInput2, output1->numOfTotal,
output1->numOfVnodes, win->skey, win->ekey, tsBufGetNumOfVnodes(output1), et - st);
return output1->numOfTotal;
}
......@@ -216,6 +219,11 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
pSupporter->f = NULL;
}
if (pSupporter->pVgroupTables != NULL) {
taosArrayDestroy(pSupporter->pVgroupTables);
pSupporter->pVgroupTables = NULL;
}
taosTFree(pSupporter->pIdTagList);
tscTagCondRelease(&pSupporter->tagCond);
free(pSupporter);
......@@ -305,7 +313,6 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
// set the second stage sub query for join process
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE);
memcpy(&pQueryInfo->interval, &pSupporter->interval, sizeof(pQueryInfo->interval));
tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
......@@ -324,7 +331,9 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
tscFieldInfoUpdateOffset(pNewQueryInfo);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
pTableMetaInfo->pVgroupTables = pSupporter->pVgroupTables;
pSupporter->pVgroupTables = NULL;
/*
* When handling the projection query, the offset value will be modified for table-table join, which is changed
* during the timestamp intersection.
......@@ -356,10 +365,39 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
// set the tag column id for executor to extract correct tag value
pExpr->param[0].i64Key = colId;
pExpr->param[0] = (tVariant) {.i64Key = colId, .nType = TSDB_DATA_TYPE_BIGINT, .nLen = sizeof(int64_t)};
pExpr->numOfParams = 1;
}
int32_t num = 0;
int32_t *list = NULL;
tsBufGetVnodeIdList(pNewQueryInfo->tsBuf, &num, &list);
if (pTableMetaInfo->pVgroupTables != NULL) {
for(int32_t k = 0; k < taosArrayGetSize(pTableMetaInfo->pVgroupTables);) {
SVgroupTableInfo* p = taosArrayGet(pTableMetaInfo->pVgroupTables, k);
bool found = false;
for(int32_t f = 0; f < num; ++f) {
if (p->vgInfo.vgId == list[f]) {
found = true;
break;
}
}
if (!found) {
tscRemoveVgroupTableGroup(pTableMetaInfo->pVgroupTables, k);
} else {
k++;
}
}
assert(taosArrayGetSize(pTableMetaInfo->pVgroupTables) > 0);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
}
taosTFree(list);
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, taosArrayGetSize(pNewQueryInfo->exprList),
......@@ -418,6 +456,8 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
assert(pQueryInfo->window.skey <= win->skey && pQueryInfo->window.ekey >= win->ekey);
pQueryInfo->window = *win;
}
int32_t tscCompareTidTags(const void* p1, const void* p2) {
......@@ -474,10 +514,11 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
SSqlCmd* pCmd = &pSql->cmd;
tscClearSubqueryInfo(pCmd);
tscFreeSqlResult(pSql);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
assert(pQueryInfo->numOfTables == 1);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
tscInitQueryInfo(pQueryInfo);
TSDB_QUERY_CLEAR_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
......@@ -524,13 +565,7 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
tscProcessSql(pSql);
}
static bool checkForDuplicateTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1, SSqlObj* pPSqlObj) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);// todo: tags mismatch, tags not completed
SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, 0);
SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSqlObj* pPSqlObj) {
for(int32_t i = 1; i < p1->num; ++i) {
STidTags* prev = (STidTags*) varDataVal(p1->pIdTagList + (i - 1) * p1->tagSize);
STidTags* p = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
......@@ -564,7 +599,7 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
*s1 = taosArrayInit(p1->num, p1->tagSize - sizeof(int16_t));
*s2 = taosArrayInit(p2->num, p2->tagSize - sizeof(int16_t));
if (!(checkForDuplicateTagVal(pQueryInfo, p1, pParentSql) && checkForDuplicateTagVal(pQueryInfo, p2, pParentSql))) {
if (!(checkForDuplicateTagVal(pColSchema, p1, pParentSql) && checkForDuplicateTagVal(pColSchema, p2, pParentSql))) {
return TSDB_CODE_QRY_DUP_JOIN_KEY;
}
......@@ -708,6 +743,12 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0);
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2);
SSqlObj* psub1 = pParentSql->pSubs[0];
((SJoinSupporter*)psub1->param)->pVgroupTables = tscCloneVgroupTableInfo(pTableMetaInfo1->pVgroupTables);
SSqlObj* psub2 = pParentSql->pSubs[1];
((SJoinSupporter*)psub2->param)->pVgroupTables = tscCloneVgroupTableInfo(pTableMetaInfo2->pVgroupTables);
pParentSql->subState.numOfSub = 2;
pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub;
......@@ -766,9 +807,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pSupporter->pTSBuf = pBuf;
} else {
assert(pQueryInfo->numOfTables == 1); // for subquery, only one
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex);
tsBufMerge(pSupporter->pTSBuf, pBuf);
tsBufDestroy(pBuf);
}
......@@ -835,6 +874,8 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// launch the query the retrieve actual results from vnode along with the filtered timestamp
SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
updateQueryTimeRange(pPQueryInfo, &win);
//update the vgroup that involved in real data query
tscLaunchRealSubqueries(pParentSql);
}
......@@ -868,20 +909,27 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
assert(pQueryInfo->numOfTables == 1);
// for projection query, need to try next vnode if current vnode is exhausted
if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->vgroupList->numOfVgroups) {
pState->numOfRemain = 1;
pState->numOfSub = 1;
int32_t numOfVgroups = 0; // TODO refactor
if (pTableMetaInfo->pVgroupTables != NULL) {
numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
} else {
numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
}
if ((++pTableMetaInfo->vgroupIndex) < numOfVgroups) {
tscDebug("%p no result in current vnode anymore, try next vnode, vgIndex:%d", pSql, pTableMetaInfo->vgroupIndex);
pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql);
tscProcessSql(pSql);
return;
} else {
tscDebug("%p no result in current subquery anymore", pSql);
}
}
if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) {
tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pParentSql->subState.numOfRemain, pState->numOfSub);
if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) {
tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfSub);
return;
}
......@@ -895,60 +943,60 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
// update the records for each subquery in parent sql object.
for (int32_t i = 0; i < pState->numOfSub; ++i) {
if (pParentSql->pSubs[i] == NULL) {
tscDebug("%p %p sub:%d not retrieve data", pParentSql, NULL, i);
continue;
}
SSqlRes* pRes1 = &pParentSql->pSubs[i]->res;
pRes1->numOfClauseTotal += pRes1->numOfRows;
}
// data has retrieved to client, build the join results
tscBuildResFromSubqueries(pParentSql);
}
static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) {
int32_t notInvolved = 0;
SJoinSupporter* pSupporter = NULL;
SSubqueryState* pState = &pSql->subState;
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
if (pSql->pSubs[i] == NULL) {
notInvolved++;
if (pRes1->row > 0 && pRes1->numOfRows > 0) {
tscDebug("%p sub:%p index:%d numOfRows:%"PRId64" total:%"PRId64 " (not retrieve)", pParentSql, pParentSql->pSubs[i], i,
pRes1->numOfRows, pRes1->numOfTotal);
assert(pRes1->row < pRes1->numOfRows);
} else {
pSupporter = (SJoinSupporter*)pSql->pSubs[i]->param;
pRes1->numOfClauseTotal += pRes1->numOfRows;
tscDebug("%p sub:%p index:%d numOfRows:%"PRId64" total:%"PRId64, pParentSql, pParentSql->pSubs[i], i,
pRes1->numOfRows, pRes1->numOfTotal);
}
}
pState->numOfRemain = numOfFetch;
return pSupporter;
// data has retrieved to client, build the join results
tscBuildResFromSubqueries(pParentSql);
}
void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
assert(pSql->subState.numOfSub >= 1);
int32_t numOfFetch = 0;
bool hasData = true;
bool hasData = true;
bool reachLimit = false;
// if the subquery is NULL, it does not involved in the final result generation
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
// if the subquery is NULL, it does not involved in the final result generation
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
continue;
}
SSqlRes *pRes = &pSub->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);
if (!tscHasReachLimitation(pQueryInfo, pRes)) {
if (pRes->row >= pRes->numOfRows) {
// no data left in current result buffer
hasData = false;
// The current query is completed for the active vnode, try next vnode if exists
// If it is completed, no need to fetch anymore.
if (!pRes->completed) {
numOfFetch++;
}
}
} else { // has reach the limitation, no data anymore
if (pRes->row >= pRes->numOfRows) {
hasData = false;
reachLimit = true;
hasData = false;
break;
}
}
......@@ -958,29 +1006,102 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
if (hasData) {
tscBuildResFromSubqueries(pSql);
return;
} else if (numOfFetch <= 0) {
}
// If at least one subquery is completed in current vnode, try the next vnode in case of multi-vnode
// super table projection query.
if (reachLimit) {
pSql->res.completed = true;
freeJoinSubqueryObj(pSql);
if (pSql->res.code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, 0);
} else {
tscQueueAsyncRes(pSql);
}
return;
}
if (numOfFetch <= 0) {
bool tryNextVnode = false;
SSqlObj* pp = pSql->pSubs[0];
SQueryInfo* pi = tscGetQueryInfoDetail(&pp->cmd, 0);
// get the number of subquery that need to retrieve the next vnode.
if (tscNonOrderedProjectionQueryOnSTable(pi, 0)) {
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) {
pSql->subState.numOfRemain++;
}
}
}
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
continue;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && pSub->res.row >= pSub->res.numOfRows &&
pSub->res.completed) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
// for projection query, need to try next vnode if current vnode is exhausted
int32_t numOfVgroups = 0; // TODO refactor
if (pTableMetaInfo->pVgroupTables != NULL) {
numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
} else {
numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
}
if ((++pTableMetaInfo->vgroupIndex) < numOfVgroups) {
tscDebug("%p no result in current vnode anymore, try next vnode, vgIndex:%d", pSub,
pTableMetaInfo->vgroupIndex);
pSub->cmd.command = TSDB_SQL_SELECT;
pSub->fp = tscJoinQueryCallback;
tscProcessSql(pSub);
tryNextVnode = true;
} else {
tscDebug("%p no result in current subquery anymore", pSub);
}
}
}
if (tryNextVnode) {
return;
}
pSql->res.completed = true;
freeJoinSubqueryObj(pSql);
if (pSql->res.code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, 0);
} else {
tscQueueAsyncRes(pSql);
}
return;
}
// TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled
// retrieve data from current vnode.
tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch);
SJoinSupporter* pSupporter = tscUpdateSubqueryStatus(pSql, numOfFetch);
SJoinSupporter* pSupporter = NULL;
pSql->subState.numOfRemain = numOfFetch;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSql1 = pSql->pSubs[i];
if (pSql1 == NULL) {
continue;
}
SSqlRes* pRes1 = &pSql1->res;
SSqlCmd* pCmd1 = &pSql1->cmd;
......@@ -1122,7 +1243,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
* data instead of returning to its invoker
*/
if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; // reset the record value
// pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; // reset the record value
pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data
pSql->cmd.command = TSDB_SQL_FETCH;
......@@ -1386,7 +1507,13 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSubqueryState *pState = &pSql->subState;
pState->numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
pState->numOfSub = 0;
if (pTableMetaInfo->pVgroupTables == NULL) {
pState->numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
} else {
pState->numOfSub = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
}
assert(pState->numOfSub > 0);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
......@@ -2017,7 +2144,7 @@ static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t column
assert(pInfo->pSqlExpr != NULL);
*bytes = pInfo->pSqlExpr->resBytes;
char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows;
char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + pRes->row * (*bytes);
return pData;
}
......@@ -2029,11 +2156,13 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
int32_t numOfRes = INT32_MAX;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
if (pSql->pSubs[i] == NULL) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
continue;
}
numOfRes = (int32_t)(MIN(numOfRes, pSql->pSubs[i]->res.numOfRows));
int32_t remain = (int32_t)(pSub->res.numOfRows - pSub->res.row);
numOfRes = (int32_t)(MIN(numOfRes, remain));
}
if (numOfRes == 0) {
......@@ -2059,14 +2188,23 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for(int32_t i = 0; i < numOfExprs; ++i) {
SColumnIndex* pIndex = &pRes->pColumnIndex[i];
SSqlRes *pRes1 = &pSql->pSubs[pIndex->tableIndex]->res;
SSqlCmd *pCmd1 = &pSql->pSubs[pIndex->tableIndex]->cmd;
SSqlRes* pRes1 = &pSql->pSubs[pIndex->tableIndex]->res;
SSqlCmd* pCmd1 = &pSql->pSubs[pIndex->tableIndex]->cmd;
char* pData = getResultBlockPosition(pCmd1, pRes1, pIndex->columnIndex, &bytes);
memcpy(data, pData, bytes * numOfRes);
data += bytes * numOfRes;
pRes1->row = numOfRes;
}
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
continue;
}
pSub->res.row += numOfRes;
assert(pSub->res.row <= pSub->res.numOfRows);
}
pRes->numOfRows = numOfRes;
......@@ -2085,6 +2223,8 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
pRes->numOfCols = (int32_t)numOfExprs;
pRes->tsrow = calloc(numOfExprs, POINTER_BYTES);
pRes->buffer = calloc(numOfExprs, POINTER_BYTES);
pRes->length = calloc(numOfExprs, sizeof(int32_t));
......
......@@ -1121,6 +1121,8 @@ int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepco
}
*p1 = *pExpr;
memset(p1->param, 0, sizeof(tVariant) * tListLen(p1->param));
for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
tVariantAssign(&p1->param[j], &pExpr->param[j]);
}
......@@ -1678,19 +1680,62 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd) {
}
void tscFreeVgroupTableInfo(SArray* pVgroupTables) {
if (pVgroupTables != NULL) {
size_t num = taosArrayGetSize(pVgroupTables);
for (size_t i = 0; i < num; i++) {
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i);
if (pVgroupTables == NULL) {
return;
}
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
taosTFree(pInfo->vgInfo.epAddr[j].fqdn);
}
size_t num = taosArrayGetSize(pVgroupTables);
for (size_t i = 0; i < num; i++) {
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i);
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
taosTFree(pInfo->vgInfo.epAddr[j].fqdn);
}
taosArrayDestroy(pInfo->itemList);
}
taosArrayDestroy(pVgroupTables);
}
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) {
assert(pVgroupTable != NULL && index >= 0);
size_t size = taosArrayGetSize(pVgroupTable);
assert(size > index);
taosArrayDestroy(pInfo->itemList);
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTable, index);
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
taosTFree(pInfo->vgInfo.epAddr[j].fqdn);
}
taosArrayDestroy(pInfo->itemList);
taosArrayRemove(pVgroupTable, index);
}
SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables) {
if (pVgroupTables == NULL) {
return NULL;
}
size_t num = taosArrayGetSize(pVgroupTables);
SArray* pa = taosArrayInit(num, sizeof(SVgroupTableInfo));
SVgroupTableInfo info;
for (size_t i = 0; i < num; i++) {
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i);
memset(&info, 0, sizeof(SVgroupTableInfo));
info.vgInfo = pInfo->vgInfo;
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
info.vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn);
}
taosArrayDestroy(pVgroupTables);
info.itemList = taosArrayClone(pInfo->itemList);
taosArrayPush(pa, &info);
}
return pa;
}
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) {
......@@ -1708,7 +1753,7 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool rem
}
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta,
SVgroupsInfo* vgroupList, SArray* pTagCols) {
SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables) {
void* pAlloc = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES);
if (pAlloc == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -1742,13 +1787,15 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
if (pTagCols != NULL) {
tscColumnListCopy(pTableMetaInfo->tagColList, pTagCols, -1);
}
pTableMetaInfo->pVgroupTables = tscCloneVgroupTableInfo(pVgroupTables);
pQueryInfo->numOfTables += 1;
return pTableMetaInfo;
}
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo* pQueryInfo) {
return tscAddTableMetaInfo(pQueryInfo, NULL, NULL, NULL, NULL);
return tscAddTableMetaInfo(pQueryInfo, NULL, NULL, NULL, NULL, NULL);
}
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) {
......@@ -1822,7 +1869,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
assert(pSql->cmd.clauseIndex == 0);
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL);
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);
registerSqlObj(pNew);
return pNew;
......@@ -1987,14 +2034,16 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
STableMeta* pTableMeta = taosCacheAcquireByData(tscMetaCache, pTableMetaInfo->pTableMeta); // get by name may failed due to the cache cleanup
assert(pTableMeta != NULL);
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->tagColList);
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList,
pTableMetaInfo->tagColList, pTableMetaInfo->pVgroupTables);
} else { // transfer the ownership of pTableMeta to the newly create sql object.
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
STableMeta* pPrevTableMeta = taosCacheTransfer(tscMetaCache, (void**)&pPrevInfo->pTableMeta);
SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList;
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList);
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList,
pTableMetaInfo->pVgroupTables);
}
if (pFinalInfo->pTableMeta == NULL) {
......
......@@ -44,14 +44,17 @@ extern int32_t tsMaxShellConns;
extern int32_t tsShellActivityTimer;
extern uint32_t tsMaxTmrCtrl;
extern float tsNumOfThreadsPerCore;
extern float tsRatioOfQueryThreads;
extern float tsRatioOfQueryThreads; // todo remove it
extern int8_t tsDaylight;
extern char tsTimezone[];
extern char tsLocale[];
extern char tsCharset[]; // default encode string
extern char tsCharset[]; // default encode string
extern int32_t tsEnableCoreFile;
extern int32_t tsCompressMsgSize;
//query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer for each data node during query processing
// client
extern int32_t tsTableMetaKeepTimer;
extern int32_t tsMaxSQLStringLen;
......
......@@ -45,14 +45,14 @@ int32_t tsEnableTelemetryReporting = 1;
char tsEmail[TSDB_FQDN_LEN] = {0};
// common
int32_t tsRpcTimer = 1000;
int32_t tsRpcMaxTime = 600; // seconds;
int32_t tsMaxShellConns = 5000;
int32_t tsRpcTimer = 1000;
int32_t tsRpcMaxTime = 600; // seconds;
int32_t tsMaxShellConns = 5000;
int32_t tsMaxConnections = 5000;
int32_t tsShellActivityTimer = 3; // second
float tsNumOfThreadsPerCore = 1.0;
float tsRatioOfQueryThreads = 0.5;
int8_t tsDaylight = 0;
int32_t tsShellActivityTimer = 3; // second
float tsNumOfThreadsPerCore = 1.0f;
float tsRatioOfQueryThreads = 0.5f;
int8_t tsDaylight = 0;
char tsTimezone[TSDB_TIMEZONE_LEN] = {0};
char tsLocale[TSDB_LOCALE_LEN] = {0};
char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string
......@@ -99,6 +99,12 @@ float tsStreamComputDelayRatio = 0.1f;
int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
// the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default)
// 0 no query allowed, queries are disabled
// positive value (in MB)
int32_t tsQueryBufferSize = -1;
// db parameters
int32_t tsCacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE;
int32_t tsBlocksPerVnode = TSDB_DEFAULT_TOTAL_BLOCKS;
......@@ -676,7 +682,7 @@ static void doInitGlobalConfig(void) {
cfg.minValue = TSDB_MIN_CACHE_BLOCK_SIZE;
cfg.maxValue = TSDB_MAX_CACHE_BLOCK_SIZE;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_Mb;
cfg.unitType = TAOS_CFG_UTYPE_MB;
taosInitConfigOption(cfg);
cfg.option = "blocks";
......@@ -839,6 +845,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "queryBufferSize";
cfg.ptr = &tsQueryBufferSize;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = -1;
cfg.maxValue = 500000000000.0f;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_BYTE;
taosInitConfigOption(cfg);
// locale & charset
cfg.option = "timezone";
cfg.ptr = tsTimezone;
......
......@@ -144,21 +144,24 @@ void tVariantDestroy(tVariant *pVar) {
void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
if (pSrc == NULL || pDst == NULL) return;
*pDst = *pSrc;
pDst->nType = pSrc->nType;
if (pSrc->nType == TSDB_DATA_TYPE_BINARY || pSrc->nType == TSDB_DATA_TYPE_NCHAR) {
int32_t len = pSrc->nLen + 1;
if (pSrc->nType == TSDB_DATA_TYPE_NCHAR) {
len = len * TSDB_NCHAR_SIZE;
}
pDst->pz = calloc(1, len);
memcpy(pDst->pz, pSrc->pz, len);
int32_t len = pSrc->nLen + TSDB_NCHAR_SIZE;
char* p = realloc(pDst->pz, len);
assert(p);
memset(p, 0, len);
pDst->pz = p;
memcpy(pDst->pz, pSrc->pz, pSrc->nLen);
pDst->nLen = pSrc->nLen;
return;
}
// this is only for string array
if (pSrc->nType == TSDB_DATA_TYPE_ARRAY) {
if (pSrc->nType >= TSDB_DATA_TYPE_BOOL && pSrc->nType <= TSDB_DATA_TYPE_DOUBLE) {
pDst->i64Key = pSrc->i64Key;
} else if (pSrc->nType == TSDB_DATA_TYPE_ARRAY) { // this is only for string array
size_t num = taosArrayGetSize(pSrc->arr);
pDst->arr = taosArrayInit(num, sizeof(char*));
for(size_t i = 0; i < num; i++) {
......@@ -166,8 +169,6 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
char* n = strdup(p);
taosArrayPush(pDst->arr, &n);
}
return;
}
pDst->nLen = tDataTypeDesc[pDst->nType].nSize;
......
......@@ -78,7 +78,6 @@ int32_t qKillQuery(qinfo_t qinfo);
int32_t qQueryCompleted(qinfo_t qinfo);
/**
* destroy query info structure
* @param qHandle
......
......@@ -236,6 +236,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_READY, 0, 0x0707, "Query not
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_HAS_RSP, 0, 0x0708, "Query should response")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_IN_EXEC, 0, 0x0709, "Multiple retrieval of this query")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW, 0, 0x070A, "Too many time window in query")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, 0, 0x070B, "Query buffer limit has reached")
// grant
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 0x0800, "License expired")
......
......@@ -182,7 +182,7 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
// app name
pShow->bytes[cols] = TSDB_APPNAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "app_name");
strcpy(pSchema[cols].name, "program");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
......
......@@ -35,16 +35,9 @@ typedef struct STSList {
int32_t len;
} STSList;
typedef struct STSRawBlock {
int32_t vnode;
int64_t tag;
TSKEY* ts;
int32_t len;
} STSRawBlock;
typedef struct STSElem {
TSKEY ts;
tVariant tag;
tVariant* tag;
int32_t vnode;
} STSElem;
......@@ -84,6 +77,7 @@ typedef struct STSBuf {
char path[PATH_MAX];
uint32_t fileSize;
// todo use array
STSVnodeBlockInfoEx* pData;
uint32_t numOfAlloc;
uint32_t numOfVnodes;
......@@ -106,12 +100,12 @@ typedef struct STSBufFileHeader {
STSBuf* tsBufCreate(bool autoDelete, int32_t order);
STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete);
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder);
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder, int32_t vnodeId);
void* tsBufDestroy(STSBuf* pTSBuf);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len);
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx);
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf);
STSBuf* tsBufClone(STSBuf* pTSBuf);
......@@ -121,6 +115,7 @@ void tsBufFlush(STSBuf* pTSBuf);
void tsBufResetPos(STSBuf* pTSBuf);
STSElem tsBufGetElem(STSBuf* pTSBuf);
bool tsBufNextPos(STSBuf* pTSBuf);
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag);
......@@ -136,6 +131,10 @@ void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur);
*/
void tsBufDisplay(STSBuf* pTSBuf);
int32_t tsBufGetNumOfVnodes(STSBuf* pTSBuf);
void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId);
#ifdef __cplusplus
}
#endif
......
......@@ -184,7 +184,7 @@ static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInf
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
SDataStatis *pStatis, void *param, int32_t colIndex);
SDataStatis *pStatis, void *param, int32_t colIndex, int32_t vgId);
static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo);
......@@ -194,6 +194,8 @@ static void buildTagQueryResult(SQInfo *pQInfo);
static int32_t setAdditionalInfo(SQInfo *pQInfo, void *pTable, STableQueryInfo *pTableQueryInfo);
static int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo);
static int32_t checkForQueryBuf(size_t numOfTables);
static void releaseQueryBuf(size_t numOfTables);
bool doFilterData(SQuery *pQuery, int32_t elemPos) {
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
......@@ -1005,9 +1007,10 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k);
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k, pQInfo->vgId);
}
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
......@@ -1200,7 +1203,7 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
// compare tag first
if (tVariantCompare(&pCtx[0].tag, &elem.tag) != 0) {
if (tVariantCompare(&pCtx[0].tag, elem.tag) != 0) {
return TS_JOIN_TAG_NOT_EQUALS;
}
......@@ -1286,9 +1289,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
groupbyColumnData = getGroupbyColumnData(pQuery, &type, &bytes, pDataBlock);
}
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k);
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k, pQInfo->vgId);
}
// set the input column data
......@@ -1303,7 +1307,6 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
// from top to bottom in desc
// from bottom to top in asc order
if (pRuntimeEnv->pTSBuf != NULL) {
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pRuntimeEnv);
qDebug("QInfo:%p process data rows, numOfRows:%d, query order:%d, ts comp order:%d", pQInfo, pDataBlockInfo->rows,
pQuery->order.order, pRuntimeEnv->pTSBuf->cur.order);
}
......@@ -1409,6 +1412,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
item->lastKey = (QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey) + step;
}
if (pRuntimeEnv->pTSBuf != NULL) {
item->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf);
}
// todo refactor: extract method
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) {
......@@ -1469,7 +1476,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
}
void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
SDataStatis *pStatis, void *param, int32_t colIndex) {
SDataStatis *pStatis, void *param, int32_t colIndex, int32_t vgId) {
int32_t functionId = pQuery->pSelectExpr[colIndex].base.functionId;
int32_t colId = pQuery->pSelectExpr[colIndex].base.colInfo.colId;
......@@ -1542,6 +1549,9 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
}
}
}
} else if (functionId == TSDB_FUNC_TS_COMP) {
pCtx->param[0].i64Key = vgId;
pCtx->param[0].nType = TSDB_DATA_TYPE_BIGINT;
}
#if defined(_DEBUG_VIEW)
......@@ -2621,12 +2631,19 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
assert(pFuncMsg->numOfParams == 1);
int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64;
SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId);
int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64;
SColumnInfo *pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId);
doSetTagValueInParam(tsdb, pTable, tagColId, &pRuntimeEnv->pCtx[0].tag, pColInfo->type, pColInfo->bytes);
qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%"PRId64, pQInfo, pExprInfo->base.arg->argValue.i64,
pRuntimeEnv->pCtx[0].tag.i64Key)
int16_t tagType = pRuntimeEnv->pCtx[0].tag.nType;
if (tagType == TSDB_DATA_TYPE_BINARY || tagType == TSDB_DATA_TYPE_NCHAR) {
qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%s", pQInfo,
pExprInfo->base.arg->argValue.i64, pRuntimeEnv->pCtx[0].tag.pz);
} else {
qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64, pQInfo,
pExprInfo->base.arg->argValue.i64, pRuntimeEnv->pCtx[0].tag.i64Key);
}
}
}
}
......@@ -3860,14 +3877,40 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, void* pTable, STableQueryInfo *pTableQ
// both the master and supplement scan needs to set the correct ts comp start position
if (pRuntimeEnv->pTSBuf != NULL) {
tVariant* pTag = &pRuntimeEnv->pCtx[0].tag;
if (pTableQueryInfo->cur.vgroupIndex == -1) {
tVariantAssign(&pTableQueryInfo->tag, &pRuntimeEnv->pCtx[0].tag);
tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, &pTableQueryInfo->tag);
tVariantAssign(&pTableQueryInfo->tag, pTag);
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, &pTableQueryInfo->tag);
// failed to find data with the specified tag value and vnodeId
if (elem.vnode < 0) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
} else {
qError("QInfo:%p failed to find tag:%" PRId64 " in ts_comp", pQInfo, pTag->i64Key);
}
return false;
}
// keep the cursor info of current meter
pTableQueryInfo->cur = pRuntimeEnv->pTSBuf->cur;
pTableQueryInfo->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf);
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
} else {
qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64Key, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
}
} else {
tsBufSetCursor(pRuntimeEnv->pTSBuf, &pTableQueryInfo->cur);
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
} else {
qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64Key, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex);
}
}
}
......@@ -4763,15 +4806,62 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
}
if (pRuntimeEnv->pTSBuf != NULL) {
if (pRuntimeEnv->cur.vgroupIndex == -1) {
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, &pRuntimeEnv->pCtx[0].tag);
tVariant* pTag = &pRuntimeEnv->pCtx[0].tag;
// failed to find data with the specified tag value
if (pRuntimeEnv->cur.vgroupIndex == -1) {
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag);
// failed to find data with the specified tag value and vnodeId
if (elem.vnode < 0) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
} else {
qError("QInfo:%p failed to find tag:%"PRId64" in ts_comp", pQInfo, pTag->i64Key);
}
return false;
} else {
STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTSBuf);
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz,
cur.blockIndex, cur.tsIndex);
} else {
qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64Key,
cur.blockIndex, cur.tsIndex);
}
}
} else {
tsBufSetCursor(pRuntimeEnv->pTSBuf, &pRuntimeEnv->cur);
STSElem elem = tsBufGetElem(pRuntimeEnv->pTSBuf);
if (tVariantCompare(elem.tag, &pRuntimeEnv->pCtx[0].tag) != 0) {
STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag);
// failed to find data with the specified tag value and vnodeId
if (elem1.vnode < 0) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
} else {
qError("QInfo:%p failed to find tag:%"PRId64" in ts_comp", pQInfo, pTag->i64Key);
}
return false;
} else {
STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTSBuf);
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, cur.blockIndex, cur.tsIndex);
} else {
qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64Key, cur.blockIndex, cur.tsIndex);
}
}
} else {
tsBufSetCursor(pRuntimeEnv->pTSBuf, &pRuntimeEnv->cur);
STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTSBuf);
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qDebug("QInfo:%p continue scan ts_comp file, tag:%s blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, cur.blockIndex, cur.tsIndex);
} else {
qDebug("QInfo:%p continue scan ts_comp file, tag:%"PRId64" blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64Key, cur.blockIndex, cur.tsIndex);
}
}
}
}
......@@ -5027,6 +5117,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
break;
}
if (pRuntimeEnv->pTSBuf != NULL) {
pRuntimeEnv->cur = pRuntimeEnv->pTSBuf->cur;
}
} else {
// all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter
if (pQuery->rec.rows == 0) {
......@@ -6320,7 +6414,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
STSBuf *pTSBuf = NULL;
if (pQueryMsg->tsLen > 0) { // open new file to save the result
char *tsBlock = (char *) pQueryMsg + pQueryMsg->tsOffset;
pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder);
pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder, vgId);
tsBufResetPos(pTSBuf);
bool ret = tsBufNextPos(pTSBuf);
......@@ -6402,6 +6496,8 @@ static void freeQInfo(SQInfo *pQInfo) {
qDebug("QInfo:%p start to free QInfo", pQInfo);
releaseQueryBuf(pQInfo->tableqinfoGroupInfo.numOfTables);
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
......@@ -6636,6 +6732,11 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
assert(0);
}
code = checkForQueryBuf(tableGroupInfo.numOfTables);
if (code != TSDB_CODE_SUCCESS) { // not enough query buffer, abort
goto _over;
}
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &tableGroupInfo, pTagColumnInfo, isSTableQuery);
pExprs = NULL;
pGroupbyExpr = NULL;
......@@ -7037,6 +7138,48 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
static int64_t getQuerySupportBufSize(size_t numOfTables) {
size_t s1 = sizeof(STableQueryInfo);
size_t s2 = sizeof(SHashNode);
// size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb
return (int64_t)((s1 + s2) * 1.5 * numOfTables);
}
int32_t checkForQueryBuf(size_t numOfTables) {
int64_t t = getQuerySupportBufSize(numOfTables);
if (tsQueryBufferSize < 0) {
return TSDB_CODE_SUCCESS;
} else if (tsQueryBufferSize > 0) {
while(1) {
int64_t s = tsQueryBufferSize;
int64_t remain = s - t;
if (remain >= 0) {
if (atomic_val_compare_exchange_64(&tsQueryBufferSize, s, remain) == s) {
return TSDB_CODE_SUCCESS;
}
} else {
return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
}
}
}
// disable query processing if the value of tsQueryBufferSize is zero.
return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
}
void releaseQueryBuf(size_t numOfTables) {
if (tsQueryBufferSize <= 0) {
return;
}
int64_t t = getQuerySupportBufSize(numOfTables);
// restore value is not enough buffer available
atomic_add_fetch_64(&tsQueryBufferSize, t);
}
void* qGetResultRetrieveMsg(qinfo_t qinfo) {
SQInfo* pQInfo = (SQInfo*) qinfo;
assert(pQInfo != NULL);
......
......@@ -403,7 +403,7 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pDa
} else {
expandBuffer(ptsData, len);
}
tVariantAssign(&pTSBuf->block.tag, tag);
memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len);
......@@ -561,6 +561,19 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex
pCur->tsIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : pBlock->numOfElem - 1;
}
static int32_t doUpdateVnodeInfo(STSBuf* pTSBuf, int64_t offset, STSVnodeBlockInfo* pVInfo) {
if (offset < 0 || offset >= getDataStartOffset()) {
return -1;
}
if (fseek(pTSBuf->f, (int32_t)offset, SEEK_SET) != 0) {
return -1;
}
fwrite(pVInfo, sizeof(STSVnodeBlockInfo), 1, pTSBuf->f);
return 0;
}
STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId) {
int32_t j = tsBufFindVnodeIndexFromId(pTSBuf->pData, pTSBuf->numOfVnodes, vnodeId);
if (j == -1) {
......@@ -649,7 +662,7 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
return false;
}
int32_t blockIndex = pCur->order == TSDB_ORDER_ASC ? 0 : pBlockInfo->numOfBlocks - 1;
int32_t blockIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : (pBlockInfo->numOfBlocks - 1);
tsBufGetBlock(pTSBuf, pCur->vgroupIndex + step, blockIndex);
break;
......@@ -675,8 +688,7 @@ void tsBufResetPos(STSBuf* pTSBuf) {
}
STSElem tsBufGetElem(STSBuf* pTSBuf) {
STSElem elem1 = {.vnode = -1};
STSElem elem1 = {.vnode = -1};
if (pTSBuf == NULL) {
return elem1;
}
......@@ -690,7 +702,7 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) {
elem1.vnode = pTSBuf->pData[pCur->vgroupIndex].info.vnode;
elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE);
tVariantAssign(&elem1.tag, &pBlock->tag);
elem1.tag = &pBlock->tag;
return elem1;
}
......@@ -702,7 +714,7 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) {
* @param vnodeId
* @return
*/
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfVnodes <= 0) {
return 0;
}
......@@ -712,14 +724,13 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
}
// src can only have one vnode index
if (pSrcBuf->numOfVnodes > 1) {
return -1;
}
assert(pSrcBuf->numOfVnodes == 1);
// there are data in buffer, flush to disk first
tsBufFlush(pDestBuf);
// compared with the last vnode id
int32_t vnodeId = tsBufGetLastVnodeInfo((STSBuf*) pSrcBuf)->info.vnode;
if (vnodeId != tsBufGetLastVnodeInfo(pDestBuf)->info.vnode) {
int32_t oldSize = pDestBuf->numOfVnodes;
int32_t newSize = oldSize + pSrcBuf->numOfVnodes;
......@@ -791,14 +802,14 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
return 0;
}
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order) {
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order, int32_t vnodeId) {
STSBuf* pTSBuf = tsBufCreate(true, order);
STSVnodeBlockInfo* pBlockInfo = &(addOneVnodeInfo(pTSBuf, 0)->info);
pBlockInfo->numOfBlocks = numOfBlocks;
pBlockInfo->compLen = len;
pBlockInfo->offset = getDataStartOffset();
pBlockInfo->vnode = 0;
pBlockInfo->vnode = vnodeId;
// update prev vnode length info in file
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo);
......@@ -902,8 +913,8 @@ void tsBufDisplay(STSBuf* pTSBuf) {
while (tsBufNextPos(pTSBuf)) {
STSElem elem = tsBufGetElem(pTSBuf);
if (elem.tag.nType == TSDB_DATA_TYPE_BIGINT) {
printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, elem.tag.i64Key, elem.ts);
if (elem.tag->nType == TSDB_DATA_TYPE_BIGINT) {
printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, elem.tag->i64Key, elem.ts);
}
}
......@@ -915,19 +926,6 @@ static int32_t getDataStartOffset() {
return sizeof(STSBufFileHeader) + TS_COMP_FILE_VNODE_MAX * sizeof(STSVnodeBlockInfo);
}
static int32_t doUpdateVnodeInfo(STSBuf* pTSBuf, int64_t offset, STSVnodeBlockInfo* pVInfo) {
if (offset < 0 || offset >= getDataStartOffset()) {
return -1;
}
if (fseek(pTSBuf->f, (int32_t)offset, SEEK_SET) != 0) {
return -1;
}
fwrite(pVInfo, sizeof(STSVnodeBlockInfo), 1, pTSBuf->f);
return 0;
}
// update prev vnode length info in file
static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo) {
int32_t offset = sizeof(STSBufFileHeader) + index * sizeof(STSVnodeBlockInfo);
......@@ -969,3 +967,29 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
pTSBuf->fileSize += getDataStartOffset();
return pTSBuf;
}
int32_t tsBufGetNumOfVnodes(STSBuf* pTSBuf) {
if (pTSBuf == NULL) {
return 0;
}
return pTSBuf->numOfVnodes;
}
void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId) {
int32_t size = tsBufGetNumOfVnodes(pTSBuf);
if (num != NULL) {
*num = size;
}
*vnodeId = NULL;
if (size == 0) {
return;
}
(*vnodeId) = malloc(tsBufGetNumOfVnodes(pTSBuf) * sizeof(int32_t));
for(int32_t i = 0; i < size; ++i) {
(*vnodeId)[i] = pTSBuf->pData[i].info.vnode;
}
}
\ No newline at end of file
......@@ -304,7 +304,7 @@ void TSTraverse() {
int32_t totalOutput = 10;
while (1) {
STSElem elem = tsBufGetElem(pTSBuf);
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag.i64Key, elem.ts);
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts);
if (!tsBufNextPos(pTSBuf)) {
break;
......@@ -352,7 +352,7 @@ void TSTraverse() {
totalOutput = 10;
while (1) {
STSElem elem = tsBufGetElem(pTSBuf);
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag.i64Key, elem.ts);
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts);
if (!tsBufNextPos(pTSBuf)) {
break;
......@@ -416,8 +416,8 @@ void mergeDiffVnodeBufferTest() {
int64_t* list = createTsList(num, start, step);
t.i64Key = i;
tsBufAppend(pTSBuf1, 0, &t, (const char*)list, num * sizeof(int64_t));
tsBufAppend(pTSBuf2, 0, &t, (const char*)list, num * sizeof(int64_t));
tsBufAppend(pTSBuf1, 1, &t, (const char*)list, num * sizeof(int64_t));
tsBufAppend(pTSBuf2, 9, &t, (const char*)list, num * sizeof(int64_t));
free(list);
......@@ -426,7 +426,7 @@ void mergeDiffVnodeBufferTest() {
tsBufFlush(pTSBuf2);
tsBufMerge(pTSBuf1, pTSBuf2, 9);
tsBufMerge(pTSBuf1, pTSBuf2);
EXPECT_EQ(pTSBuf1->numOfVnodes, 2);
EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num);
......@@ -459,8 +459,6 @@ void mergeIdenticalVnodeBufferTest() {
start += step * num;
}
for (int32_t i = numOfTags; i < numOfTags * 2; ++i) {
int64_t* list = createTsList(num, start, step);
......@@ -473,7 +471,7 @@ void mergeIdenticalVnodeBufferTest() {
tsBufFlush(pTSBuf2);
tsBufMerge(pTSBuf1, pTSBuf2, 12);
tsBufMerge(pTSBuf1, pTSBuf2);
EXPECT_EQ(pTSBuf1->numOfVnodes, 1);
EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num);
......@@ -482,7 +480,7 @@ void mergeIdenticalVnodeBufferTest() {
STSElem elem = tsBufGetElem(pTSBuf1);
EXPECT_EQ(elem.vnode, 12);
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag.i64Key, elem.ts);
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts);
}
tsBufDestroy(pTSBuf1);
......
......@@ -53,7 +53,7 @@ enum {
TAOS_CFG_UTYPE_NONE,
TAOS_CFG_UTYPE_PERCENT,
TAOS_CFG_UTYPE_GB,
TAOS_CFG_UTYPE_Mb,
TAOS_CFG_UTYPE_MB,
TAOS_CFG_UTYPE_BYTE,
TAOS_CFG_UTYPE_SECOND,
TAOS_CFG_UTYPE_MS
......
......@@ -335,7 +335,7 @@ void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
}
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) + pCacheObj->numOfElemsInTrash == 0) {
if (pCacheObj == NULL) {
return;
}
......@@ -343,7 +343,12 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
uError("cache:%s, NULL data to release", pCacheObj->name);
return;
}
// The operation of removal from hash table and addition to trashcan is not an atomic operation,
// therefore the check for the empty of both the hash table and the trashcan has a race condition.
// It happens when there is only one object in the cache, and two threads which has referenced this object
// start to free the it simultaneously [TD-1569].
size_t offset = offsetof(SCacheDataNode, data);
SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
......
......@@ -250,11 +250,13 @@ int walWrite(void *handle, SWalHead *pHead) {
if (taosTWrite(pWal->fd, pHead, contLen) != contLen) {
wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
} else {
pWal->version = pHead->version;
}
ASSERT(contLen == pHead->len + sizeof(SWalHead));
return terrno;
return 0;
}
void walFsync(void *handle) {
......@@ -424,7 +426,7 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
// ASSERT(false);
ASSERT(false);
break;
}
......
......@@ -107,7 +107,7 @@ func main() {
fmt.Scanln()
url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/"
//url = fmt.Sprintf("%s:%s@/tcp(%s:%d)/%s?interpolateParams=true", configPara.user, configPara.password, configPara.hostName, configPara.serverPort, configPara.dbName)
//url = fmt.Sprintf("%s:%s@/tcp(%s:%d)/%s?interpolateParams=true", configPara.user, configPara.password, configPara.hostName, configPara.serverPort, configPara.dbName)
// open connect to taos server
//db, err := sql.Open(taosDriverName, url)
//if err != nil {
......@@ -115,6 +115,7 @@ func main() {
// os.Exit(1)
//}
//defer db.Close()
rand.Seed(time.Now().Unix())
createDatabase(configPara.dbName, configPara.supTblName)
fmt.Printf("======== create database success! ========\n\n")
......
......@@ -42,8 +42,8 @@ function executeQuery(sql){
var start = new Date().getTime();
var promise = cursor.query(sql, true);
var end = new Date().getTime();
printSql(sql, promise != null,(end - start));
promise.then(function(result){
printSql(sql, result != null,(end - start));
result.pretty();
});
}
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
import time
class ClusterTestcase:
## test case 32 ##
def run(self):
nodes = Nodes()
nodes.addConfigs("maxVgroupsPerDb", "10")
nodes.addConfigs("maxTablesPerVnode", "1000")
nodes.restartAllTaosd()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
ctest.createSTable(1)
ctest.run()
tdSql.init(ctest.conn.cursor(), False)
tdSql.execute("use %s" % ctest.dbName)
tdSql.query("show vgroups")
dnodes = []
for i in range(10):
dnodes.append(int(tdSql.getData(i, 4)))
s = set(dnodes)
if len(s) < 3:
tdLog.exit("cluster is not balanced")
tdLog.info("cluster is balanced")
nodes.removeConfigs("maxVgroupsPerDb", "10")
nodes.removeConfigs("maxTablesPerVnode", "1000")
nodes.restartAllTaosd()
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
ct = ClusterTestcase()
ct.run()
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
## test case 1, 33 ##
def run(self):
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
tdSql.init(ctest.conn.cursor(), False)
## Test case 1 ##
tdLog.info("Test case 1 repeat %d times" % ctest.repeat)
for i in range(ctest.repeat):
tdLog.info("Start Round %d" % (i + 1))
replica = random.randint(1,3)
ctest.createSTable(replica)
ctest.run()
tdLog.sleep(10)
tdSql.query("select count(*) from %s.%s" %(ctest.dbName, ctest.stbName))
tdSql.checkData(0, 0, ctest.numberOfRecords * ctest.numberOfTables)
tdLog.info("Round %d completed" % (i + 1))
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
ct = ClusterTestcase()
ct.run()
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
## test case 7, ##
def run(self):
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
tdSql.init(ctest.conn.cursor(), False)
tdSql.execute("use %s" % ctest.dbName)
tdSql.query("show vgroups")
for i in range(10):
tdSql.checkData(i, 5, "master")
tdSql.execute("alter database %s replica 2" % ctest.dbName)
tdLog.sleep(30)
tdSql.query("show vgroups")
for i in range(10):
tdSql.checkData(i, 5, "master")
tdSql.checkData(i, 7, "slave")
tdSql.execute("alter database %s replica 3" % ctest.dbName)
tdLog.sleep(30)
tdSql.query("show vgroups")
for i in range(10):
tdSql.checkData(i, 5, "master")
tdSql.checkData(i, 7, "slave")
tdSql.checkData(i, 9, "slave")
ct = ClusterTestcase()
ct.run()
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import os
import sys
sys.path.insert(0, os.getcwd())
from fabric import Connection
from util.sql import *
from util.log import *
import taos
import random
import threading
import logging
class Node:
def __init__(self, index, username, hostIP, hostName, password, homeDir):
self.index = index
self.username = username
self.hostIP = hostIP
self.hostName = hostName
self.homeDir = homeDir
self.conn = Connection("{}@{}".format(username, hostName), connect_kwargs={"password": "{}".format(password)})
def startTaosd(self):
try:
self.conn.run("sudo systemctl start taosd")
except Exception as e:
print("Start Taosd error for node %d " % self.index)
logging.exception(e)
def stopTaosd(self):
try:
self.conn.run("sudo systemctl stop taosd")
except Exception as e:
print("Stop Taosd error for node %d " % self.index)
logging.exception(e)
def restartTaosd(self):
try:
self.conn.run("sudo systemctl restart taosd")
except Exception as e:
print("Stop Taosd error for node %d " % self.index)
logging.exception(e)
def removeTaosd(self):
try:
self.conn.run("rmtaos")
except Exception as e:
print("remove taosd error for node %d " % self.index)
logging.exception(e)
def installTaosd(self, packagePath):
self.conn.put(packagePath, self.homeDir)
self.conn.cd(self.homeDir)
self.conn.run("tar -zxf $(basename '%s')" % packagePath)
with self.conn.cd("TDengine-enterprise-server"):
self.conn.run("yes|./install.sh")
def configTaosd(self, taosConfigKey, taosConfigValue):
self.conn.run("sudo echo '%s %s' >> %s" % (taosConfigKey, taosConfigValue, "/etc/taos/taos.cfg"))
def removeTaosConfig(self, taosConfigKey, taosConfigValue):
self.conn.run("sudo sed -in-place -e '/%s %s/d' %s" % (taosConfigKey, taosConfigValue, "/etc/taos/taos.cfg"))
def configHosts(self, ip, name):
self.conn.run("echo '%s %s' >> %s" % (ip, name, '/etc/hosts'))
def removeData(self):
try:
self.conn.run("sudo rm -rf /var/lib/taos/*")
except Exception as e:
print("remove taosd data error for node %d " % self.index)
logging.exception(e)
def removeLog(self):
try:
self.conn.run("sudo rm -rf /var/log/taos/*")
except Exception as e:
print("remove taosd error for node %d " % self.index)
logging.exception(e)
def removeDataForMnode(self):
try:
self.conn.run("sudo rm -rf /var/lib/taos/*")
except Exception as e:
print("remove taosd error for node %d " % self.index)
logging.exception(e)
def removeDataForVnode(self, id):
try:
self.conn.run("sudo rm -rf /var/lib/taos/vnode%d/*.data" % id)
except Exception as e:
print("remove taosd error for node %d " % self.index)
logging.exception(e)
class Nodes:
def __init__(self):
self.node1 = Node(1, 'ubuntu', '192.168.1.52', 'node1', 'tbase125!', '/home/ubuntu')
self.node2 = Node(2, 'ubuntu', '192.168.1.53', 'node2', 'tbase125!', '/home/ubuntu')
self.node3 = Node(3, 'ubuntu', '192.168.1.54', 'node3', 'tbase125!', '/home/ubuntu')
def stopAllTaosd(self):
self.node1.stopTaosd()
self.node2.stopTaosd()
self.node3.stopTaosd()
def startAllTaosd(self):
self.node1.startTaosd()
self.node2.startTaosd()
self.node3.startTaosd()
def restartAllTaosd(self):
self.node1.restartTaosd()
self.node2.restartTaosd()
self.node3.restartTaosd()
def addConfigs(self, configKey, configValue):
self.node1.configTaosd(configKey, configValue)
self.node2.configTaosd(configKey, configValue)
self.node3.configTaosd(configKey, configValue)
def removeConfigs(self, configKey, configValue):
self.node1.removeTaosConfig(configKey, configValue)
self.node2.removeTaosConfig(configKey, configValue)
self.node3.removeTaosConfig(configKey, configValue)
def removeAllDataFiles(self):
self.node1.removeData()
self.node2.removeData()
self.node3.removeData()
class ClusterTest:
def __init__(self, hostName):
self.host = hostName
self.user = "root"
self.password = "taosdata"
self.config = "/etc/taos"
self.dbName = "mytest"
self.stbName = "meters"
self.numberOfThreads = 20
self.numberOfTables = 10000
self.numberOfRecords = 1000
self.tbPrefix = "t"
self.ts = 1538548685000
self.repeat = 1
def connectDB(self):
self.conn = taos.connect(
host=self.host,
user=self.user,
password=self.password,
config=self.config)
def createSTable(self, replica):
cursor = self.conn.cursor()
tdLog.info("drop database if exists %s" % self.dbName)
cursor.execute("drop database if exists %s" % self.dbName)
tdLog.info("create database %s replica %d" % (self.dbName, replica))
cursor.execute("create database %s replica %d" % (self.dbName, replica))
tdLog.info("use %s" % self.dbName)
cursor.execute("use %s" % self.dbName)
tdLog.info("drop table if exists %s" % self.stbName)
cursor.execute("drop table if exists %s" % self.stbName)
tdLog.info("create table %s(ts timestamp, current float, voltage int, phase int) tags(id int)" % self.stbName)
cursor.execute("create table %s(ts timestamp, current float, voltage int, phase int) tags(id int)" % self.stbName)
cursor.close()
def insertData(self, threadID):
print("Thread %d: starting" % threadID)
cursor = self.conn.cursor()
tablesPerThread = int(self.numberOfTables / self.numberOfThreads)
baseTableID = tablesPerThread * threadID
for i in range (tablesPerThread):
cursor.execute("create table %s%d using %s tags(%d)" % (self.tbPrefix, baseTableID + i, self.stbName, baseTableID + i))
query = "insert into %s%d values" % (self.tbPrefix, baseTableID + i)
base = self.numberOfRecords * i
for j in range(self.numberOfRecords):
query += "(%d, %f, %d, %d)" % (self.ts + base + j, random.random(), random.randint(210, 230), random.randint(0, 10))
cursor.execute(query)
cursor.close()
print("Thread %d: finishing" % threadID)
def run(self):
threads = []
tdLog.info("Inserting data")
for i in range(self.numberOfThreads):
thread = threading.Thread(target=self.insertData, args=(i,))
threads.append(thread)
thread.start()
for i in range(self.numberOfThreads):
threads[i].join()
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
## test case 20, 21, 22 ##
def run(self):
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
ctest.createSTable(3)
ctest.run()
tdSql.init(ctest.conn.cursor(), False)
nodes.node2.stopTaosd()
tdSql.execute("use %s" % ctest.dbName)
tdSql.query("show vgroups")
vnodeID = tdSql.getData(0, 0)
nodes.node2.removeDataForVnode(vnodeID)
nodes.node2.startTaosd()
# Wait for vnode file to recover
for i in range(10):
tdSql.query("select count(*) from t0")
tdLog.sleep(10)
for i in range(10):
tdSql.query("select count(*) from t0")
tdSql.checkData(0, 0, 1000)
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
ct = ClusterTestcase()
ct.run()
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
##Cover test case 5 ##
def run(self):
# cluster environment set up
nodes = Nodes()
nodes.addConfigs("maxVgroupsPerDb", "10")
nodes.addConfigs("maxTablesPerVnode", "1000")
nodes.restartAllTaosd()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
ctest.createSTable(1)
ctest.run()
tdSql.init(ctest.conn.cursor(), False)
tdSql.execute("use %s" % ctest.dbName)
tdSql.error("create table tt1 using %s tags(1)" % ctest.stbName)
nodes.removeConfigs("maxVgroupsPerDb", "10")
nodes.removeConfigs("maxTablesPerVnode", "1000")
nodes.restartAllTaosd()
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
ct = ClusterTestcase()
ct.run()
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
## test case 7, 10 ##
def run(self):
# cluster environment set up
tdLog.info("Test case 7, 10")
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
tdSql.init(ctest.conn.cursor(), False)
nodes.node1.stopTaosd()
tdSql.query("show dnodes")
tdSql.checkRows(3)
tdSql.checkData(0, 4, "offline")
tdSql.checkData(1, 4, "ready")
tdSql.checkData(2, 4, "ready")
nodes.node1.startTaosd()
tdSql.checkRows(3)
tdSql.checkData(0, 4, "ready")
tdSql.checkData(1, 4, "ready")
tdSql.checkData(2, 4, "ready")
nodes.node2.stopTaosd()
tdSql.query("show dnodes")
tdSql.checkRows(3)
tdSql.checkData(0, 4, "ready")
tdSql.checkData(1, 4, "offline")
tdSql.checkData(2, 4, "ready")
nodes.node2.startTaosd()
tdSql.checkRows(3)
tdSql.checkData(0, 4, "ready")
tdSql.checkData(1, 4, "ready")
tdSql.checkData(2, 4, "ready")
nodes.node3.stopTaosd()
tdSql.query("show dnodes")
tdSql.checkRows(3)
tdSql.checkData(0, 4, "ready")
tdSql.checkData(1, 4, "ready")
tdSql.checkData(2, 4, "offline")
nodes.node3.startTaosd()
tdSql.checkRows(3)
tdSql.checkData(0, 4, "ready")
tdSql.checkData(1, 4, "ready")
tdSql.checkData(2, 4, "ready")
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
ct = ClusterTestcase()
ct.run()
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
## cover test case 6, 8, 9, 11 ##
def run(self):
# cluster environment set up
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
tdSql.init(ctest.conn.cursor(), False)
nodes.addConfigs("offlineThreshold", "10")
nodes.removeAllDataFiles()
nodes.restartAllTaosd()
nodes.node3.stopTaosd()
tdLog.sleep(10)
tdSql.query("show dnodes")
tdSql.checkRows(3)
tdSql.checkData(2, 4, "offline")
tdLog.sleep(60)
tdSql.checkRows(3)
tdSql.checkData(2, 4, "dropping")
tdLog.sleep(300)
tdSql.checkRows(2)
nodes.removeConfigs("offlineThreshold", "10")
nodes.restartAllTaosd()
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
ct = ClusterTestcase()
ct.run()
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
## test case 28, 29, 30, 31 ##
def run(self):
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
ctest.createSTable(3)
ctest.run()
tdSql.init(ctest.conn.cursor(), False)
tdSql.execute("use %s" % ctest.dbName)
nodes.node2.stopTaosd()
for i in range(100):
tdSql.execute("drop table t%d" % i)
nodes.node2.startTaosd()
tdSql.query("show tables")
tdSql.checkRows(9900)
nodes.node2.stopTaosd()
for i in range(10):
tdSql.execute("create table a%d using meters tags(2)" % i)
nodes.node2.startTaosd()
tdSql.query("show tables")
tdSql.checkRows(9910)
nodes.node2.stopTaosd()
tdSql.execute("alter table meters add col col6 int")
nodes.node2.startTaosd()
nodes.node2.stopTaosd()
tdSql.execute("drop database %s" % ctest.dbName)
nodes.node2.startTaosd()
tdSql.query("show databases")
tdSql.checkRows(0)
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
ct = ClusterTestcase()
ct.run()
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
import time
class ClusterTestcase:
## test case 32 ##
def run(self):
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
ctest.createSTable(1)
ctest.run()
tdSql.init(ctest.conn.cursor(), False)
tdSql.execute("use %s" % ctest.dbName)
totalTime = 0
for i in range(10):
startTime = time.time()
tdSql.query("select * from %s" % ctest.stbName)
totalTime += time.time() - startTime
print("replica 1: avarage query time for %d records: %f seconds" % (ctest.numberOfTables * ctest.numberOfRecords,totalTime / 10))
tdSql.execute("alter database %s replica 3" % ctest.dbName)
tdLog.sleep(60)
totalTime = 0
for i in range(10):
startTime = time.time()
tdSql.query("select * from %s" % ctest.stbName)
totalTime += time.time() - startTime
print("replica 3: avarage query time for %d records: %f seconds" % (ctest.numberOfTables * ctest.numberOfRecords,totalTime / 10))
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
ct = ClusterTestcase()
ct.run()
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
## test case 19 ##
def run(self):
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
tdSql.init(ctest.conn.cursor(), False)
tdSql.query("show databases")
count = tdSql.queryRows;
nodes.stopAllTaosd()
nodes.node1.startTaosd()
tdSql.error("show databases")
nodes.node2.startTaosd()
tdSql.error("show databases")
nodes.node3.startTaosd()
tdLog.sleep(10)
tdSql.query("show databases")
tdSql.checkRows(count)
ct = ClusterTestcase()
ct.run()
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
## test case 17, 18 ##
def run(self):
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
ctest.createSTable(1)
ctest.run()
tdSql.init(ctest.conn.cursor(), False)
tdSql.query("show databases")
count = tdSql.queryRows;
tdSql.execute("use %s" % ctest.dbName)
tdSql.execute("alter database %s replica 3" % ctest.dbName)
nodes.node2.stopTaosd()
nodes.node3.stopTaosd()
tdSql.error("show databases")
nodes.node2.startTaosd()
tdSql.error("show databases")
nodes.node3.startTaosd()
tdSql.query("show databases")
tdSql.checkRows(count)
ct = ClusterTestcase()
ct.run()
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
## test case 24, 25, 26, 27 ##
def run(self):
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
ctest.createSTable(1)
ctest.run()
tdSql.init(ctest.conn.cursor(), False)
tdSql.execute("use %s" % ctest.dbName)
tdSql.execute("alter database %s replica 3" % ctest.dbName)
for i in range(100):
tdSql.execute("drop table t%d" % i)
for i in range(100):
tdSql.execute("create table a%d using meters tags(1)" % i)
tdSql.execute("alter table meters add col col5 int")
tdSql.execute("alter table meters drop col col5 int")
tdSql.execute("drop database %s" % ctest.dbName)
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
ct = ClusterTestcase()
ct.run()
python3 basicTest.py
python3 bananceTest.py
python3 changeReplicaTest.py
python3 dataFileRecoveryTest.py
python3 fullDnodesTest.py
python3 killAndRestartDnodesTest.py
python3 offlineThresholdTest.py
python3 oneReplicaOfflineTest.py
python3 queryTimeTest.py
python3 stopAllDnodesTest.py
python3 stopTwoDnodesTest.py
python3 syncingTest.py
\ No newline at end of file
......@@ -1231,6 +1231,7 @@ class Task():
0x05, # TSDB_CODE_RPC_NOT_READY
0x0B, # Unable to establish connection, more details in TD-1648
0x200, # invalid SQL, TODO: re-examine with TD-934
0x20F, # query terminated, possibly due to vnoding being dropped, see TD-1776
0x217, # "db not selected", client side defined error code
# 0x218, # "Table does not exist" client side defined error code
0x360, # Table already exists
......
......@@ -24,6 +24,7 @@ python3 ./test.py -f table/alter_wal0.py
python3 ./test.py -f table/column_name.py
python3 ./test.py -f table/column_num.py
python3 ./test.py -f table/db_table.py
python3 ./test.py -f table/create_sensitive.py
#python3 ./test.py -f table/tablename-boundary.py
# tag
......@@ -186,7 +187,7 @@ python3 ./test.py -f functions/function_leastsquares.py -r 1
python3 ./test.py -f functions/function_max.py -r 1
python3 ./test.py -f functions/function_min.py -r 1
python3 ./test.py -f functions/function_operations.py -r 1
python3 ./test.py -f functions/function_percentile.py
python3 ./test.py -f functions/function_percentile.py -r 1
python3 ./test.py -f functions/function_spread.py -r 1
python3 ./test.py -f functions/function_stddev.py -r 1
python3 ./test.py -f functions/function_sum.py -r 1
......
......@@ -95,14 +95,16 @@ class TDTestCase:
tdSql.error(
"select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_t.id, stb_p.dscrption, stb_p.pressure from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id group by stb_t.id")
tdSql.error(
"select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_t.id, stb_p.dscrption, stb_p.pressure from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.name;")
tdSql.error(
"select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_t.id, stb_p.dscrption, stb_p.pressure from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.location = stb_t.name")
"select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_t.id, stb_p.dscrption, stb_p.pressure from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.name;")
tdSql.execute("alter table stb_t add tag pid int")
tdSql.execute("alter table tb_t1 set tag pid=2")
tdSql.execute("alter table tb_t2 set tag pid=1")
tdSql.query(
"select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_t.id, stb_p.dscrption, stb_p.pressure from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.location = stb_t.name")
tdSql.checkRows(0)
tdSql.query("select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_t.id, stb_p.dscrption, stb_p.pressure from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.pid")
tdSql.checkRows(3)
......
......@@ -11,6 +11,7 @@
# -*- coding: utf-8 -*-
import sys
import os
import taos
......@@ -32,17 +33,23 @@ class taosdemoQueryPerformace:
def query(self):
cursor = self.conn.cursor()
cursor.execute("use test")
cursor.execute("use test")
totalTime = 0
for i in range(100):
startTime = time.time()
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select count(*) from test.meters")
totalTime += time.time() - startTime
print("query time for: select count(*) from test.meters %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select avg(f1), max(f2), min(f3) from test.meters")
totalTime += time.time() - startTime
......@@ -50,6 +57,9 @@ class taosdemoQueryPerformace:
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select count(*) from test.meters where loc='beijing'")
totalTime += time.time() - startTime
......@@ -57,6 +67,9 @@ class taosdemoQueryPerformace:
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select avg(f1), max(f2), min(f3) from test.meters where areaid=10")
totalTime += time.time() - startTime
......@@ -64,6 +77,9 @@ class taosdemoQueryPerformace:
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select avg(f1), max(f2), min(f3) from test.t10 interval(10s)")
totalTime += time.time() - startTime
......@@ -71,11 +87,34 @@ class taosdemoQueryPerformace:
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select last_row(*) from meters")
totalTime += time.time() - startTime
print("query time for: select last_row(*) from meters %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select * from meters")
totalTime += time.time() - startTime
print("query time for: select * from meters %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select avg(f1), max(f2), min(f3) from meters where ts <= '2017-07-15 10:40:01.000' and ts <= '2017-07-15 14:00:40.000'")
totalTime += time.time() - startTime
print("query time for: select avg(f1), max(f2), min(f3) from meters where ts <= '2017-07-15 10:40:01.000' and ts <= '2017-07-15 14:00:40.000' %f seconds" % (totalTime / 100))
if __name__ == '__main__':
perftest = taosdemoQueryPerformace()
perftest.initConnection()
......
......@@ -96,6 +96,12 @@ class TDTestCase:
tdSql.query("select * from st order by ts desc")
self.checkColumnSorted(0, "desc")
print("======= step 2: verify order for special column =========")
tdSql.query("select tbcol1 from st order by ts desc")
tdSql.query("select tbcol6 from st order by ts desc")
for i in range(1, 10):
tdSql.error("select * from st order by tbcol%d" % i)
tdSql.error("select * from st order by tbcol%d asc" % i)
......
......@@ -26,7 +26,6 @@ class TDTestCase:
def run(self):
rowNum = 200
totalNum = 200
tdSql.prepare()
tdLog.info("=============== step1")
......@@ -42,7 +41,9 @@ class TDTestCase:
tdSql.execute("create table st as select count(*), count(tbcol), count(tbcol2) from mt interval(10s)")
tdLog.info("=============== step3")
start = time.time()
tdSql.waitedQuery("select * from st", 1, 120)
delay = int(time.time() - start) + 20
v = tdSql.getData(0, 3)
if v >= 51:
tdLog.exit("value is %d, which is larger than 51" % v)
......@@ -54,11 +55,18 @@ class TDTestCase:
tdSql.execute("insert into tb%d values(now + %ds, %d, %d)" % (i, j, j, j))
tdLog.info("=============== step5")
tdLog.sleep(40)
tdSql.waitedQuery("select * from st order by ts desc", 1, 120)
v = tdSql.getData(0, 3)
if v <= 51:
tdLog.exit("value is %d, which is smaller than 51" % v)
maxValue = 0
for i in range(delay):
time.sleep(1)
tdSql.query("select * from st order by ts desc")
v = tdSql.getData(0, 3)
if v > maxValue:
maxValue = v
if v > 51:
break
if maxValue <= 51:
tdLog.exit("value is %d, which is smaller than 51" % maxValue)
def stop(self):
tdSql.close()
......
# -*- coding: utf-8 -*-
import sys
import string
import random
import subprocess
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
tdLog.info('=============== step1')
tdLog.info('create table TestSensitiveT(ts timestamp, i int)')
tdSql.execute('create table TestSensitiveT(ts timestamp, i int)')
tdLog.info('create table TestSensitiveSt(ts timestamp,i int) tags(j int)')
tdSql.execute('create table TestSensitiveSt(ts timestamp,i int) tags(j int)')
tdLog.info('create table Abcde using TestSensitiveSt tags(1)')
tdSql.execute('create table AbcdeFgh using TestSensitiveSt tags(1)')
tdLog.info('=============== step2')
tdLog.info('test normal table ')
tdSql.error('create table testsensitivet(ts timestamp, i int)')
tdSql.error('create table testsensitivet(ts timestamp, j int)')
tdSql.error('create table testsensItivet(ts timestamp, j int)')
tdSql.error('create table TESTSENSITIVET(ts timestamp, i int)')
tdLog.info('=============== step3')
tdLog.info('test super table ')
tdSql.error('create table testsensitivest(ts timestamp,i int) tags(j int)')
tdSql.error('create table testsensitivest(ts timestamp,i int) tags(k int)')
tdSql.error('create table TESTSENSITIVEST(ts timestamp,i int) tags(j int)')
tdSql.error('create table Testsensitivest(ts timestamp,i int) tags(j int)')
tdLog.info('=============== step4')
tdLog.info('test subtable ')
tdSql.error('create table abcdefgh using TestSensitiveSt tags(1)')
tdSql.error('create table ABCDEFGH using TestSensitiveSt tags(1)')
tdSql.error('create table Abcdefgh using TestSensitiveSt tags(1)')
tdSql.error('create table abcdeFgh using TestSensitiveSt tags(1)')
tdSql.error('insert into table abcdefgh using TestSensitiveSt tags(1) values(now,1)')
tdSql.error('insert into table ABCDEFGH using TestSensitiveSt tags(1) values(now,1)')
tdSql.error('insert into table Abcdefgh using TestSensitiveSt tags(1) values(now,1)')
tdSql.error('insert into table abcdeFgH using TestSensitiveSt tags(1) values(now,1)')
tdSql.query('show tables')
tdLog.info('tdSql.checkRow(0)')
tdSql.checkRows(2)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
......@@ -27,7 +27,7 @@ $mt = $mtPrefix . $i
$tstart = 100000
sql drop database if exits $db -x step1
sql drop database if exists $db -x step1
step1:
sql create database if not exists $db keep 36500
sql use $db
......
......@@ -24,7 +24,7 @@ $mt = $mtPrefix . $i
$tstart = 100000
sql drop database if exits $db -x step1
sql drop database if exists $db -x step1
step1:
sql create database if not exists $db keep 36500
sql use $db
......
......@@ -22,7 +22,7 @@ $mt = $mtPrefix . $i
$tstart = 100000
sql drop database if exits $db -x step1
sql drop database if exists $db -x step1
step1:
sql create database if not exists $db keep 36500
sql use $db
......
......@@ -21,7 +21,7 @@ $mt = $mtPrefix . $i
$tstart = 100000
sql drop database if exits $db -x step1
sql drop database if exists $db -x step1
step1:
sql create database if not exists $db keep 36500
sql use $db
......
......@@ -26,7 +26,7 @@ $i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql drop database if exits $db -x step1
sql drop database if exists $db -x step1
step1:
sql create database if not exists $db maxtables 4 keep 36500
sql use $db
......
sleep 2000
run general/parser/alter.sim
sleep 2000
run general/parser/alter1.sim
sleep 2000
run general/parser/alter_stable.sim
sleep 2000
run general/parser/auto_create_tb.sim
sleep 2000
run general/parser/auto_create_tb_drop_tb.sim
sleep 2000
run general/parser/col_arithmetic_operation.sim
sleep 2000
run general/parser/columnValue.sim
sleep 2000
run general/parser/commit.sim
sleep 2000
run general/parser/create_db.sim
sleep 2000
run general/parser/create_mt.sim
sleep 2000
run general/parser/create_tb.sim
sleep 2000
run general/parser/dbtbnameValidate.sim
sleep 2000
run general/parser/fill.sim
sleep 2000
run general/parser/fill_stb.sim
sleep 2000
#run general/parser/fill_us.sim #
sleep 2000
run general/parser/first_last.sim
sleep 2000
run general/parser/import_commit1.sim
sleep 2000
run general/parser/import_commit2.sim
sleep 2000
run general/parser/import_commit3.sim
sleep 2000
#run general/parser/import_file.sim
sleep 2000
run general/parser/insert_tb.sim
sleep 2000
run general/parser/tags_dynamically_specifiy.sim
sleep 2000
run general/parser/interp.sim
sleep 2000
run general/parser/lastrow.sim
#sleep 2000
#run general/parser/alter.sim
#sleep 2000
#run general/parser/alter1.sim
#sleep 2000
#run general/parser/alter_stable.sim
#sleep 2000
#run general/parser/auto_create_tb.sim
#sleep 2000
#run general/parser/auto_create_tb_drop_tb.sim
#sleep 2000
#run general/parser/col_arithmetic_operation.sim
#sleep 2000
#run general/parser/columnValue.sim
#sleep 2000
#run general/parser/commit.sim
#sleep 2000
#run general/parser/create_db.sim
#sleep 2000
#run general/parser/create_mt.sim
#sleep 2000
#run general/parser/create_tb.sim
#sleep 2000
#run general/parser/dbtbnameValidate.sim
#sleep 2000
#run general/parser/fill.sim
#sleep 2000
#run general/parser/fill_stb.sim
#sleep 2000
##run general/parser/fill_us.sim #
#sleep 2000
#run general/parser/first_last.sim
#sleep 2000
#run general/parser/import_commit1.sim
#sleep 2000
#run general/parser/import_commit2.sim
#sleep 2000
#run general/parser/import_commit3.sim
#sleep 2000
##run general/parser/import_file.sim
#sleep 2000
#run general/parser/insert_tb.sim
#sleep 2000
#run general/parser/tags_dynamically_specifiy.sim
#sleep 2000
#run general/parser/interp.sim
#sleep 2000
#run general/parser/lastrow.sim
sleep 2000
run general/parser/limit.sim
sleep 2000
......
......@@ -27,7 +27,7 @@ $j = 1
$mt1 = $mtPrefix . $j
sql drop database if exits $db -x step1
sql drop database if exists $db -x step1
step1:
sql create database if not exists $db
sql use $db
......
......@@ -20,7 +20,7 @@ $i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql drop database if exits $db -x step1
sql drop database if exists $db -x step1
step1:
sql create database if not exists $db
sql use $db
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册