提交 e14d3b7b 编写于 作者: sangshuduo's avatar sangshuduo

Merge branch 'master' into hotfix/sangshuduo/TD-5872-taosdemo-stmt-improve-for-master

......@@ -238,7 +238,7 @@ pipeline {
sh '''
cd ${WKC}/tests/examples/C#/taosdemo
mcs -out:taosdemo *.cs > /dev/null 2>&1
echo '' |./taosdemo
echo '' |./taosdemo -c /etc/taos
'''
sh '''
cd ${WKC}/tests/gotest
......@@ -256,13 +256,11 @@ pipeline {
steps {
pre_test()
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
timeout(time: 60, unit: 'MINUTES'){
sh '''
cd ${WKC}/tests/pytest
./crash_gen.sh -a -p -t 4 -s 2000
'''
}
timeout(time: 60, unit: 'MINUTES'){
sh '''
cd ${WKC}/tests/pytest
./crash_gen.sh -a -p -t 4 -s 2000
'''
}
timeout(time: 60, unit: 'MINUTES'){
// sh '''
......
......@@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "2.1.7.2")
SET(TD_VER_NUMBER "2.2.0.0")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
......
Subproject commit ceda5bf9fcd7836509ac97dcc0056b3f1dd48cc5
Subproject commit 11c1060d4f917dd799ae628b131db5d6a5ef6954
......@@ -102,6 +102,12 @@ elif echo $osinfo | grep -qwi "centos" ; then
elif echo $osinfo | grep -qwi "fedora" ; then
# echo "This is fedora system"
os_type=2
elif echo $osinfo | grep -qwi "Linx" ; then
# echo "This is Linx system"
os_type=1
service_mod=0
initd_mod=0
service_config_dir="/etc/systemd/system"
else
echo " osinfo: ${osinfo}"
echo " This is an officially unverified linux system,"
......
......@@ -20,44 +20,33 @@ fi
# Dynamic directory
if [ "$osType" != "Darwin" ]; then
data_dir="/var/lib/taos"
log_dir="/var/log/taos"
else
data_dir="/usr/local/var/lib/taos"
log_dir="/usr/local/var/log/taos"
fi
if [ "$osType" != "Darwin" ]; then
cfg_install_dir="/etc/taos"
else
cfg_install_dir="/usr/local/etc/taos"
fi
if [ "$osType" != "Darwin" ]; then
bin_link_dir="/usr/bin"
lib_link_dir="/usr/lib"
lib64_link_dir="/usr/lib64"
inc_link_dir="/usr/include"
install_main_dir="/usr/local/taos"
bin_dir="/usr/local/taos/bin"
else
data_dir="/usr/local/var/lib/taos"
log_dir="/usr/local/var/log/taos"
cfg_install_dir="/usr/local/etc/taos"
bin_link_dir="/usr/local/bin"
lib_link_dir="/usr/local/lib"
inc_link_dir="/usr/local/include"
fi
#install main path
if [ "$osType" != "Darwin" ]; then
install_main_dir="/usr/local/taos"
else
install_main_dir="/usr/local/Cellar/tdengine/${verNumber}"
fi
# old bin dir
if [ "$osType" != "Darwin" ]; then
bin_dir="/usr/local/taos/bin"
else
bin_dir="/usr/local/Cellar/tdengine/${verNumber}/bin"
bin_dir="/usr/local/Cellar/tdengine/${verNumber}/bin"
fi
service_config_dir="/etc/systemd/system"
......@@ -254,7 +243,10 @@ function install_lib() {
${csudo} ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so
fi
else
${csudo} cp -Rf ${binary_dir}/build/lib/libtaos.* ${install_main_dir}/driver && ${csudo} chmod 777 ${install_main_dir}/driver/*
${csudo} cp -Rf ${binary_dir}/build/lib/libtaos.${verNumber}.dylib ${install_main_dir}/driver && ${csudo} chmod 777 ${install_main_dir}/driver/*
${csudo} ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.1.dylib
${csudo} ln -sf ${lib_link_dir}/libtaos.1.dylib ${lib_link_dir}/libtaos.dylib
fi
install_jemalloc
......
name: tdengine
base: core18
version: '2.1.7.2'
version: '2.2.0.0'
icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT.
description: |
......@@ -72,7 +72,7 @@ parts:
- usr/bin/taosd
- usr/bin/taos
- usr/bin/taosdemo
- usr/lib/libtaos.so.2.1.7.2
- usr/lib/libtaos.so.2.2.0.0
- usr/lib/libtaos.so.1
- usr/lib/libtaos.so
......
......@@ -58,6 +58,7 @@ typedef struct SRetrieveSupport {
int32_t subqueryIndex; // index of current vnode in vnode list
struct SSqlObj *pParentSql;
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
uint32_t localBufferSize;
uint32_t numOfRetry; // record the number of retry times
} SRetrieveSupport;
......
......@@ -55,7 +55,7 @@ typedef struct STidTags {
#pragma pack(pop)
typedef struct SJoinSupporter {
SSqlObj* pObj; // parent SqlObj
int64_t pObj; // parent SqlObj
int32_t subqueryIndex; // index of sub query
SInterval interval;
SLimitVal limit; // limit info
......
......@@ -430,6 +430,7 @@ int32_t handleUserDefinedFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) {
const char *msg2 = "path is too long";
const char *msg3 = "invalid outputtype";
const char *msg4 = "invalid script";
const char *msg5 = "invalid dyn lib";
SSqlCmd *pCmd = &pSql->cmd;
switch (pInfo->type) {
......@@ -443,6 +444,10 @@ int32_t handleUserDefinedFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
createInfo->name.z[createInfo->name.n] = 0;
// funcname's naming rule is same to column
if (validateColumnName(createInfo->name.z) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
strdequote(createInfo->name.z);
......@@ -462,10 +467,16 @@ int32_t handleUserDefinedFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (ret) {
return ret;
}
//distinguish *.lua and *.so
//validate *.lua or .so
int32_t pathLen = (int32_t)strlen(createInfo->path.z);
if ((pathLen > 3) && (0 == strncmp(createInfo->path.z + pathLen - 3, "lua", 3)) && !isValidScript(buf, len)) {
if ((pathLen > 4) && (0 == strncmp(createInfo->path.z + pathLen - 4, ".lua", 4)) && !isValidScript(buf, len)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
} else if (pathLen > 3 && (0 == strncmp(createInfo->path.z + pathLen - 3, ".so", 3))) {
void *handle = taosLoadDll(createInfo->path.z);
taosCloseDll(handle);
if (handle == NULL) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
}
//TODO CHECK CODE
......
......@@ -2355,6 +2355,7 @@ int tscProcessRetrieveFuncRsp(SSqlObj* pSql) {
parQueryInfo->pUdfInfo = pQueryInfo->pUdfInfo; // assigned to parent sql obj.
pQueryInfo->pUdfInfo = NULL;
taosReleaseRef(tscObjRef, parent->self);
return TSDB_CODE_SUCCESS;
}
......@@ -3007,6 +3008,7 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool
// in case of child table, here only get the
if (pMeta->tableType == TSDB_CHILD_TABLE) {
int32_t code = tscCreateTableMetaFromSTableMeta(&pTableMetaInfo->pTableMeta, name, &pTableMetaInfo->tableMetaCapacity);
pMeta = pTableMetaInfo->pTableMeta;
if (code != TSDB_CODE_SUCCESS) {
return getTableMetaFromMnode(pSql, pTableMetaInfo, autocreate);
}
......
......@@ -386,7 +386,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) {
return NULL;
}
pSupporter->pObj = pSql;
pSupporter->pObj = pSql->self;
pSupporter->subqueryIndex = index;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
......@@ -1119,7 +1119,10 @@ bool emptyTagList(SArray* resList, int32_t size) {
static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
int64_t handle = pSupporter->pObj;
SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
if (pParentSql == NULL) return;
SSqlObj* pSql = (SSqlObj*)tres;
SSqlCmd* pCmd = &pSql->cmd;
......@@ -1133,16 +1136,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
// check for the error code firstly
......@@ -1154,15 +1156,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = numOfRows;
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
// keep the results in memory
......@@ -1177,11 +1179,11 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
pSupporter->pIdTagList = tmp;
......@@ -1193,7 +1195,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// query not completed, continue to retrieve tid + tag tuples
if (!pRes->completed) {
taos_fetch_rows_a(tres, tidTagRetrieveCallback, param);
return;
goto _return;
}
}
......@@ -1215,14 +1217,14 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// set the callback function
pSql->fp = tscJoinQueryCallback;
tscBuildAndSendRequest(pSql, NULL);
return;
goto _return;
}
// no data exists in next vnode, mark the <tid, tags> query completed
// only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets.
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
//tscDebug("0x%"PRIx64" tagRetrieve:%p,%d completed, total:%d", pParentSql->self, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub);
return;
goto _return;
}
SArray* resList = taosArrayInit(pParentSql->subState.numOfSub, sizeof(SArray *));
......@@ -1234,7 +1236,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscAsyncResultOnError(pParentSql);
taosArrayDestroy(resList);
return;
goto _return;
}
if (emptyTagList(resList, pParentSql->subState.numOfSub)) { // no results,return.
......@@ -1278,12 +1280,18 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
}
taosArrayDestroy(resList);
_return:
taosReleaseRef(tscObjRef, handle);
}
static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
int64_t handle = pSupporter->pObj;
SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
if (pParentSql == NULL) return;
SSqlObj* pSql = (SSqlObj*)tres;
SSqlCmd* pCmd = &pSql->cmd;
......@@ -1295,16 +1303,16 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code);
if (quitAllSubquery(pSql, pParentSql, pSupporter)){
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
// check for the error code firstly
......@@ -1315,15 +1323,15 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = numOfRows;
if (quitAllSubquery(pSql, pParentSql, pSupporter)){
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
if (numOfRows > 0) { // write the compressed timestamp to disk file
......@@ -1335,12 +1343,12 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
}
......@@ -1354,12 +1362,12 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
if (quitAllSubquery(pSql, pParentSql, pSupporter)){
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
if (pSupporter->pTSBuf == NULL) {
......@@ -1378,7 +1386,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pRes->row = pRes->numOfRows;
taos_fetch_rows_a(tres, tsCompRetrieveCallback, param);
return;
goto _return;
}
}
......@@ -1406,11 +1414,11 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// set the callback function
pSql->fp = tscJoinQueryCallback;
tscBuildAndSendRequest(pSql, NULL);
return;
goto _return;
}
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
return;
goto _return;
}
tscDebug("0x%"PRIx64" all subquery retrieve ts complete, do ts block intersect", pParentSql->self);
......@@ -1424,7 +1432,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// set no result command
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
return;
goto _return;
}
// launch the query the retrieve actual results from vnode along with the filtered timestamp
......@@ -1433,12 +1441,17 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
//update the vgroup that involved in real data query
tscLaunchRealSubqueries(pParentSql);
_return:
taosReleaseRef(tscObjRef, handle);
}
static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
int64_t handle = pSupporter->pObj;
SSqlObj* pParentSql = pSupporter->pObj;
SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
if (pParentSql == NULL) return;
SSqlObj* pSql = (SSqlObj*)tres;
SSqlCmd* pCmd = &pSql->cmd;
......@@ -1449,15 +1462,15 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
......@@ -1468,7 +1481,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
tscError("0x%"PRIx64" retrieve failed, index:%d, code:%s", pSql->self, pSupporter->subqueryIndex, tstrerror(numOfRows));
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
if (numOfRows >= 0) {
......@@ -1494,7 +1507,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
pSql->fp = tscJoinQueryCallback;
tscBuildAndSendRequest(pSql, NULL);
return;
goto _return;
} else {
tscDebug("0x%"PRIx64" no result in current subquery anymore", pSql->self);
}
......@@ -1502,7 +1515,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
//tscDebug("0x%"PRIx64" sub:0x%"PRIx64",%d completed, total:%d", pParentSql->self, pSql->self, pSupporter->subqueryIndex, pState->numOfSub);
return;
goto _return;
}
tscDebug("0x%"PRIx64" all %d secondary subqueries retrieval completed, code:%d", pSql->self, pState->numOfSub, pParentSql->res.code);
......@@ -1540,6 +1553,9 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
// data has retrieved to client, build the join results
tscBuildResFromSubqueries(pParentSql);
_return:
taosReleaseRef(tscObjRef, handle);
}
void tscFetchDatablockForSubquery(SSqlObj* pSql) {
......@@ -1787,7 +1803,10 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
SSqlObj* pSql = (SSqlObj*)tres;
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
int64_t handle = pSupporter->pObj;
SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
if (pParentSql == NULL) return;
// There is only one subquery and table for each subquery.
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
......@@ -1799,16 +1818,16 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, code, pParentSql->res.code);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
// TODO here retry is required, not directly returns to client
......@@ -1819,16 +1838,16 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
pParentSql->res.code = code;
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
goto _return;
}
tscAsyncResultOnError(pParentSql);
return;
goto _return;
}
// retrieve <tid, tag> tuples from vnode
......@@ -1836,7 +1855,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
pSql->fp = tidTagRetrieveCallback;
pSql->cmd.command = TSDB_SQL_FETCH;
tscBuildAndSendRequest(pSql, NULL);
return;
goto _return;
}
// retrieve ts_comp info from vnode
......@@ -1844,13 +1863,13 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
pSql->fp = tsCompRetrieveCallback;
pSql->cmd.command = TSDB_SQL_FETCH;
tscBuildAndSendRequest(pSql, NULL);
return;
goto _return;
}
// In case of consequence query from other vnode, do not wait for other query response here.
if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) {
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
return;
goto _return;
}
}
......@@ -1873,6 +1892,11 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
tscAsyncResultOnError(pParentSql);
}
}
_return:
taosReleaseRef(tscObjRef, handle);
}
/////////////////////////////////////////////////////////////////////////////////////////
......@@ -1971,9 +1995,9 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscDebug(
"%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), "
"0x%"PRIX64" subquery:0x%"PRIx64" tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), "
"exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, tagIndex:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscNumOfExprs(pNewQueryInfo),
pSql->self, pNew->self, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscNumOfExprs(pNewQueryInfo),
numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, colIndex.columnIndex, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name));
} else {
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
......@@ -2006,9 +2030,9 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscDebug(
"%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%u, transfer to ts_comp query to retrieve timestamps, "
"0x%"PRIX64" subquery:0x%"PRIx64" tableIndex:%d, vgroupIndex:%d, type:%u, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscNumOfExprs(pNewQueryInfo),
pSql->self, pNew->self, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscNumOfExprs(pNewQueryInfo),
numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name));
}
} else {
......@@ -2593,6 +2617,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs->pOrderDescriptor = pDesc;
trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
trs->localBufferSize = nBufferSize + sizeof(tFilePage);
if (trs->localBuffer == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
tfree(trs);
......@@ -2713,8 +2738,10 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32
memcpy(trsupport, oriTrs, sizeof(*trsupport));
const uint32_t nBufferSize = (1u << 16u); // 64KB
trsupport->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
// the buffer size should be the same as tscHandleMasterSTableQuery, which was used to initialize the SColumnModel
// the capacity member of SColumnModel will be used to save the trsupport->localBuffer in tscRetrieveFromDnodeCallBack
trsupport->localBuffer = (tFilePage *)calloc(1, oriTrs->localBufferSize);
if (trsupport->localBuffer == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for local buffer, reason:%s", pSql->self, strerror(errno));
tfree(trsupport);
......
......@@ -122,6 +122,10 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry
void taos_init_imp(void) {
char temp[128] = {0};
// In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
atexit(taos_cleanup);
errno = TSDB_CODE_SUCCESS;
srand(taosGetTimestampSec());
......@@ -197,10 +201,6 @@ void taos_init_imp(void) {
tscRefId = taosOpenRef(200, tscCloseTscObj);
// In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
atexit(taos_cleanup);
tscDebug("client is initialized successfully");
}
......
......@@ -1516,6 +1516,8 @@ void tscFreeSqlObj(SSqlObj* pSql) {
return;
}
int64_t sid = pSql->self;
tscDebug("0x%"PRIx64" start to free sqlObj", pSql->self);
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
......@@ -1547,6 +1549,8 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tfree(pCmd->payload);
pCmd->allocSize = 0;
tscDebug("0x%"PRIx64" addr:%p free completed", sid, pSql);
tsem_destroy(&pSql->rspSem);
memset(pSql, 0, sizeof(*pSql));
free(pSql);
......
......@@ -448,6 +448,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
int dcol = 0;
while (dcol < pCols->numOfCols) {
bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
......@@ -458,13 +459,14 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
STColumn *pRowCol = schemaColAt(pSchema, rcol);
if (pRowCol->colId == pDataCol->colId) {
void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
dcol++;
rcol++;
} else if (pRowCol->colId < pDataCol->colId) {
rcol++;
} else {
if(forceSetNull) {
if(forceSetNull || setCol) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
}
dcol++;
......@@ -482,6 +484,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
int nRowCols = kvRowNCols(row);
while (dcol < pCols->numOfCols) {
bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
......@@ -493,13 +496,14 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
if (colIdx->colId == pDataCol->colId) {
void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
++dcol;
++rcol;
} else if (colIdx->colId < pDataCol->colId) {
++rcol;
} else {
if (forceSetNull) {
if (forceSetNull || setCol) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
}
++dcol;
......@@ -533,7 +537,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
for (int i = 0; i < rowsToMerge; i++) {
for (int j = 0; j < source->numOfCols; j++) {
if (source->cols[j].len > 0) {
if (source->cols[j].len > 0 || target->cols[j].len > 0) {
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows,
target->maxPoints);
}
......@@ -577,7 +581,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
if (key1 < key2) {
for (int i = 0; i < src1->numOfCols; i++) {
ASSERT(target->cols[i].type == src1->cols[i].type);
if (src1->cols[i].len > 0) {
if (src1->cols[i].len > 0 || target->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints);
}
......@@ -595,6 +599,8 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
} else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints);
} else if(target->cols[i].len > 0) {
dataColSetNullAt(&target->cols[i], target->numOfRows);
}
}
target->numOfRows++;
......
......@@ -1581,7 +1581,6 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
assert(tsGlobalConfigNum <= TSDB_CFG_MAX_NUM);
#ifdef TD_TSZ
// lossy compress
cfg.option = "lossyColumns";
......@@ -1602,8 +1601,6 @@ static void doInitGlobalConfig(void) {
cfg.maxValue = MAX_FLOAT;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "dPrecision";
......@@ -1635,6 +1632,9 @@ static void doInitGlobalConfig(void) {
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM);
#else
assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5);
#endif
}
......
......@@ -586,6 +586,130 @@ public class TSDBPreparedStatementTest {
Assert.assertEquals(numOfRows, rows);
}
@Test
public void bindDataQueryTest() throws SQLException {
Statement stmt = conn.createStatement();
stmt.execute("drop table if exists weather_test");
stmt.execute("create table weather_test(ts timestamp, f1 nchar(10), f2 binary(10)) tags (t1 int, t2 binary(10))");
int numOfRows = 1;
TSDBPreparedStatement s = (TSDBPreparedStatement) conn.prepareStatement("insert into ? using weather_test tags(?,?) (ts, f2) values(?, ?)");
s.setTableName("w2");
s.setTagInt(0, 1);
s.setTagString(1, "test");
ArrayList<Long> ts = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
ts.add(System.currentTimeMillis() + i);
}
s.setTimestamp(0, ts);
ArrayList<String> s2 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
s2.add("test" + i % 4);
}
s.setString(1, s2, 10);
s.columnDataAddBatch();
s.columnDataExecuteBatch();
s.columnDataCloseBatch();
String sql = "select * from weather_test where t1 >= ? and t1 <= ?";
TSDBPreparedStatement s1 = (TSDBPreparedStatement) conn.prepareStatement(sql);
s1.setInt(1, 0);
s1.setInt(2, 10);
ResultSet rs = s1.executeQuery();
int rows = 0;
while (rs.next()) {
rows++;
}
Assert.assertEquals(numOfRows, rows);
}
@Test
public void setTagNullTest()throws SQLException {
Statement stmt = conn.createStatement();
stmt.execute("drop table if exists weather_test");
stmt.execute("create table weather_test(ts timestamp, c1 int) tags (t1 tinyint, t2 smallint, t3 int, t4 bigint, t5 float, t6 double, t7 bool, t8 binary(10), t9 nchar(10))");
int numOfRows = 1;
TSDBPreparedStatement s = (TSDBPreparedStatement) conn.prepareStatement("insert into ? using weather_test tags(?,?,?,?,?,?,?,?,?) values(?, ?)");
s.setTableName("w3");
s.setTagNull(0, TSDBConstants.TSDB_DATA_TYPE_TINYINT);
s.setTagNull(1, TSDBConstants.TSDB_DATA_TYPE_SMALLINT);
s.setTagNull(2, TSDBConstants.TSDB_DATA_TYPE_INT);
s.setTagNull(3, TSDBConstants.TSDB_DATA_TYPE_BIGINT);
s.setTagNull(4, TSDBConstants.TSDB_DATA_TYPE_FLOAT);
s.setTagNull(5, TSDBConstants.TSDB_DATA_TYPE_DOUBLE);
s.setTagNull(6, TSDBConstants.TSDB_DATA_TYPE_BOOL);
s.setTagNull(7, TSDBConstants.TSDB_DATA_TYPE_BINARY);
s.setTagNull(8, TSDBConstants.TSDB_DATA_TYPE_NCHAR);
ArrayList<Long> ts = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
ts.add(System.currentTimeMillis() + i);
}
s.setTimestamp(0, ts);
ArrayList<Integer> s2 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
s2.add(i);
}
s.setInt(1, s2);
s.columnDataAddBatch();
s.columnDataExecuteBatch();
s.columnDataCloseBatch();
}
private String stringGenerator(int length) {
String source = "abcdefghijklmnopqrstuvwxyz";
StringBuilder sb = new StringBuilder();
Random rand = new Random();
for(int i = 0; i < length; i++) {
sb.append(source.charAt(rand.nextInt(26)));
}
return sb.toString();
}
@Test(expected = SQLException.class)
public void setMaxTableNameTest()throws SQLException {
Statement stmt = conn.createStatement();
stmt.execute("drop table if exists weather_test");
stmt.execute("create table weather_test(ts timestamp, c1 int) tags (t1 int)");
TSDBPreparedStatement s = (TSDBPreparedStatement) conn.prepareStatement("insert into ? using weather_test tags(?) values(?, ?)");
String tbname = stringGenerator(193);
s.setTableName(tbname);
s.setTagInt(0, 1);
int numOfRows = 1;
ArrayList<Long> ts = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
ts.add(System.currentTimeMillis() + i);
}
s.setTimestamp(0, ts);
ArrayList<Integer> s2 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
s2.add(i);
}
s.setInt(1, s2);
s.columnDataAddBatch();
s.columnDataExecuteBatch();
s.columnDataCloseBatch();
}
@Test(expected = SQLException.class)
public void createTwoSameDbTest() throws SQLException {
// when
......
......@@ -98,6 +98,7 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
tstrncpy(g_password, (char *)(argv[i] + 2), SHELL_MAX_PASSWORD_LEN);
}
arguments->password = g_password;
arguments->is_use_passwd = true;
strcpy(argv[i], "");
argc -= 1;
}
......
......@@ -255,8 +255,12 @@ int32_t shellRunCommand(TAOS* con, char* command) {
}
if (c == '\\') {
esc = true;
continue;
if (quote != 0 && (*command == '_' || *command == '\\')) {
//DO nothing
} else {
esc = true;
continue;
}
}
if (quote == c) {
......
......@@ -712,6 +712,15 @@ void leakTest(){
}
#define DB_CNT 500
void test_same_float(int algo, bool lossy){
float ori = 123.456789123;
float floats [DB_CNT];
for(int i=0; i< DB_CNT; i++){
floats[i] = ori;
}
DoFloat(floats, DB_CNT, algo, lossy);
}
void test_same_double(int algo){
double ori = 3.1415926;
......@@ -721,7 +730,6 @@ void test_same_double(int algo){
}
DoDouble(doubles, DB_CNT, algo);
}
#ifdef TD_TSZ
......@@ -781,6 +789,10 @@ int main(int argc, char *argv[]) {
return 0;
}
if(strcmp(argv[1], "-samef") == 0) {
test_same_float(atoi(argv[2]), true);
return 0;
}
if(strcmp(argv[1], "-samed") == 0) {
test_same_double(atoi(argv[2]));
return 0;
......
......@@ -31,7 +31,6 @@ void taosCloseDll(void *handle) {
int taosSetConsoleEcho(bool on)
{
#if 0
#define ECHOFLAGS (ECHO | ECHOE | ECHOK | ECHONL)
int err;
struct termios term;
......@@ -52,7 +51,6 @@ int taosSetConsoleEcho(bool on)
return -1;
}
#endif
return 0;
}
......@@ -147,6 +147,7 @@ typedef struct HttpContext {
int32_t state;
uint8_t reqType;
uint8_t parsed;
uint8_t error;
char ipstr[22];
char user[TSDB_USER_LEN]; // parsed from auth token or login message
char pass[HTTP_PASSWORD_LEN];
......
......@@ -188,11 +188,12 @@ void httpCloseContextByApp(HttpContext *pContext) {
pContext->parsed = false;
bool keepAlive = true;
if (parser && parser->httpVersion == HTTP_VERSION_10 && parser->keepAlive != HTTP_KEEPALIVE_ENABLE) {
if (pContext->error == true) {
keepAlive = false;
} else if (parser && parser->httpVersion == HTTP_VERSION_10 && parser->keepAlive != HTTP_KEEPALIVE_ENABLE) {
keepAlive = false;
} else if (parser && parser->httpVersion != HTTP_VERSION_10 && parser->keepAlive == HTTP_KEEPALIVE_DISABLE) {
keepAlive = false;
} else {
}
if (keepAlive) {
......
......@@ -1157,6 +1157,10 @@ static int32_t httpParseChar(HttpParser *parser, const char c, int32_t *again) {
httpOnError(parser, HTTP_CODE_INTERNAL_SERVER_ERROR, TSDB_CODE_HTTP_PARSE_ERROR_STATE);
}
if (ok != 0) {
pContext->error = true;
}
return ok;
}
......
......@@ -3734,6 +3734,10 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
}
}
} else {
if (GET_RES_INFO(pCtx)->numOfRes > 0) {
return;
}
// no data generated yet
if (pCtx->size < 1) {
return;
......@@ -3763,11 +3767,15 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
if (pCtx->size > 1) {
ekey = GET_TS_DATA(pCtx, 1);
if ((ascQuery && ekey < pCtx->startTs) || ((!ascQuery) && ekey > pCtx->startTs)) {
setNull(pCtx->pOutput, pCtx->inputType, pCtx->inputBytes);
SET_VAL(pCtx, 1, 1);
return;
}
val = ((char*)pCtx->pInput) + pCtx->inputBytes;
} else {
setNull(pCtx->pOutput, pCtx->inputType, pCtx->inputBytes);
SET_VAL(pCtx, 1, 1);
return;
}
} else {
......@@ -3812,7 +3820,7 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, 1, 1);
}
static void interp_function(SQLFunctionCtx *pCtx) {
static void interp_function(SQLFunctionCtx *pCtx) {
// at this point, the value is existed, return directly
if (pCtx->size > 0) {
bool ascQuery = (pCtx->order == TSDB_ORDER_ASC);
......
......@@ -757,6 +757,16 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
}
}
static void unsetResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));
if (type == RESULT_ROW_START_INTERP) {
pResult->startInterp = false;
} else {
pResult->endInterp = false;
}
}
static bool resultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));
if (type == RESULT_ROW_START_INTERP) {
......@@ -1368,6 +1378,11 @@ static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SQLF
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
if (pos < 0 && !ascQuery) {
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP);
return true;
}
TSKEY curTs = tsCols[pos];
TSKEY lastTs = *(TSKEY *) pRuntimeEnv->prevRow[0];
......@@ -1603,6 +1618,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
}
int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1);
int32_t ostartPos = 0;
TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows);
STimeWindow win = getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr);
......@@ -1611,7 +1627,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
SResultRow* pResult = NULL;
int32_t forwardStep = 0;
int32_t ret = 0;
STimeWindow preWin = win;
//STimeWindow preWin = win;
while (1) {
// null data, failed to allocate more memory buffer
......@@ -1625,11 +1641,17 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
// window start(end) key interpolation
unsetResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
ostartPos = startPos;
if (!ascQuery) {
startPos += forwardStep * step;
}
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, ascQuery ? &win : &preWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
preWin = win;
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, ostartPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
int32_t prevEndPos = (!ascQuery) ? startPos - step : (forwardStep - 1) * step + startPos;
startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos);
if (startPos < 0) {
if ((ascQuery && win.skey <= pQueryAttr->window.ekey) || ((!ascQuery) && win.ekey >= pQueryAttr->window.ekey)) {
......@@ -1639,11 +1661,16 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
startPos = pSDataBlock->info.rows - 1;
if (ascQuery) {
startPos = pSDataBlock->info.rows - 1;
} else {
startPos = 0;
}
// window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, ascQuery ? &win : &preWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
forwardStep = 1;
unsetResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
setNotInterpoWindowKey(pInfo->pCtx, pQueryAttr->numOfOutput, RESULT_ROW_START_INTERP);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
}
break;
......@@ -3525,7 +3552,7 @@ void copyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBl
}
static void updateTableQueryInfoForReverseScan(STableQueryInfo *pTableQueryInfo) {
static void updateTableQueryInfoForReverseScan(STableQueryInfo *pTableQueryInfo, int64_t qId) {
if (pTableQueryInfo == NULL) {
return;
}
......@@ -3536,6 +3563,9 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo *pTableQueryInfo)
SWITCH_ORDER(pTableQueryInfo->cur.order);
pTableQueryInfo->cur.vgroupIndex = -1;
qDebug("0x%"PRIx64" update query window for reverse scan, %"PRId64" - %"PRId64", lastKey:%"PRId64, qId, pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey,
pTableQueryInfo->lastKey);
// set the index to be the end slot of result rows array
SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo;
if (pResultRowInfo->size > 0) {
......@@ -3556,7 +3586,7 @@ static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) {
size_t t = taosArrayGetSize(group);
for (int32_t j = 0; j < t; ++j) {
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
updateTableQueryInfoForReverseScan(pCheckInfo);
updateTableQueryInfoForReverseScan(pCheckInfo, GET_QID(pRuntimeEnv));
// update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide
// the start check timestamp of tsdbQueryHandle
......@@ -4096,7 +4126,7 @@ void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunction
* merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there
* is a previous result generated or not.
*/
void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) {
void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow* winx, int32_t tid) {
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
SResultRowInfo *pResultRowInfo = &pTableQueryInfo->resInfo;
......@@ -4105,9 +4135,14 @@ void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) {
return;
}
TSKEY key = QUERY_IS_ASC_QUERY(pQueryAttr)? winx->skey:winx->ekey;
qDebug("0x%"PRIx64" update query window, tid:%d, %"PRId64" - %"PRId64", old:%"PRId64" - %"PRId64, GET_QID(pRuntimeEnv), tid, key, pTableQueryInfo->win.ekey,
pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey);
pTableQueryInfo->win.skey = key;
STimeWindow win = {.skey = key, .ekey = pQueryAttr->window.ekey};
/**
* In handling the both ascending and descending order super table query, we need to find the first qualified
* timestamp of this table, and then set the first qualified start timestamp.
......@@ -5744,7 +5779,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
assert(*newgroup == false);
//assert(*newgroup == false);
*newgroup = prevVal;
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
......@@ -6029,7 +6064,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order);
setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey);
setIntervalQueryRange(pRuntimeEnv, &pBlock->info.window, pBlock->info.tid);
hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex);
}
......@@ -6084,7 +6119,8 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order);
setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey);
setIntervalQueryRange(pRuntimeEnv, &pBlock->info.window, pBlock->info.tid);
hashAllIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex);
}
......@@ -6102,9 +6138,6 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
return pIntervalInfo->pRes;
}
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current;
......
......@@ -239,6 +239,9 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
} else {
setNull(output, pCol->col.type, pCol->col.bytes);
}
if (!FILL_IS_ASC_FILL(pFillInfo)) {
memcpy(*prev + pCol->col.offset, output, pCol->col.bytes);
}
} else {
assignVal(output, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type);
}
......
......@@ -1263,13 +1263,11 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
while (true) {
key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
bool isRowDel = false;
SMemRow row = tsdbNextIterRow(pCommitIter->pIter);
if (row == NULL || memRowKey(row) > maxKey) {
key2 = INT64_MAX;
} else {
key2 = memRowKey(row);
isRowDel = memRowDeleted(row);
}
if (key1 == INT64_MAX && key2 == INT64_MAX) break;
......@@ -1284,36 +1282,33 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
pTarget->numOfRows++;
(*iter)++;
} else if (key1 > key2) {
if (!isRowDel) {
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL);
}
tdAppendMemRowToDataCol(row, pSchema, pTarget, true);
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL);
}
tdAppendMemRowToDataCol(row, pSchema, pTarget, true);
tSkipListIterNext(pCommitIter->pIter);
} else {
if (update) {
if (!isRowDel) {
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL);
}
tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE);
}
} else {
ASSERT(!isRowDel);
if (update != TD_ROW_OVERWRITE_UPDATE) {
//copy disk data
for (int i = 0; i < pDataCols->numOfCols; i++) {
//TODO: dataColAppendVal may fail
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints);
}
pTarget->numOfRows++;
if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++;
}
if (update != TD_ROW_DISCARD_UPDATE) {
//copy mem data
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL);
}
tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE);
}
(*iter)++;
tSkipListIterNext(pCommitIter->pIter);
......
......@@ -859,13 +859,25 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
}
return r1;
} else if (r1 < r2 && ASCENDING_TRAVERSE(order)) {
pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
return r1;
}
else {
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
return r2;
} else {
if (ASCENDING_TRAVERSE(order)) {
if (r1 < r2) {
pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
return r1;
} else {
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
return r2;
}
} else {
if (r1 < r2) {
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
return r2;
} else {
pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
return r1;
}
}
}
}
......@@ -930,7 +942,7 @@ static SMemRow getSMemRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order,
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
return rimem;
} else {
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
return rmem;
}
}
......@@ -1329,11 +1341,11 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SBlock* p
assert(cur->blockCompleted);
if (cur->rows == binfo.rows) {
tsdbDebug("%p whole file block qualified, brange:%"PRId64"-%"PRId64", rows:%d, lastKey:%"PRId64", %"PRIx64,
pQueryHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pQueryHandle->qId);
tsdbDebug("%p whole file block qualified, brange:%"PRId64"-%"PRId64", rows:%d, lastKey:%"PRId64", tid:%d, %"PRIx64,
pQueryHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, binfo.tid, pQueryHandle->qId);
} else {
tsdbDebug("%p create data block from remain file block, brange:%"PRId64"-%"PRId64", rows:%d, total:%d, lastKey:%"PRId64", %"PRIx64,
pQueryHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey, pQueryHandle->qId);
tsdbDebug("%p create data block from remain file block, brange:%"PRId64"-%"PRId64", rows:%d, total:%d, lastKey:%"PRId64", tid:%d, %"PRIx64,
pQueryHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey, binfo.tid, pQueryHandle->qId);
}
}
......
......@@ -20,7 +20,7 @@
extern "C" {
#endif
#define TSDB_CFG_MAX_NUM 116 // 110 + 6 with lossy option
#define TSDB_CFG_MAX_NUM 119
#define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41
......
......@@ -112,14 +112,15 @@ void taosArrayRemoveBatch(SArray *pArray, const int32_t* pData, int32_t numOfEle
i += 1;
}
assert(i == pData[numOfElems - 1] + 1);
assert(i == pData[numOfElems - 1] + 1 && i <= size);
int32_t dstIndex = pData[numOfElems - 1] - numOfElems + 1;
int32_t srcIndex = pData[numOfElems - 1] + 1;
char* dst = TARRAY_GET_ELEM(pArray, dstIndex);
char* src = TARRAY_GET_ELEM(pArray, srcIndex);
memmove(dst, src, pArray->elemSize * (pArray->size - numOfElems));
int32_t dstIndex = pData[numOfElems - 1] - numOfElems + 1;
if (pArray->size - srcIndex > 0) {
char* dst = TARRAY_GET_ELEM(pArray, dstIndex);
char* src = TARRAY_GET_ELEM(pArray, srcIndex);
memmove(dst, src, pArray->elemSize * (pArray->size - srcIndex));
}
pArray->size -= numOfElems;
}
......
......@@ -71,6 +71,8 @@ static pthread_once_t cacheThreadInit = PTHREAD_ONCE_INIT;
static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER;
static SArray* pCacheArrayList = NULL;
static bool stopRefreshWorker = false;
static bool refreshWorkerNormalStopped = false;
static bool refreshWorkerUnexpectedStopped = false;
static void doInitRefreshThread(void) {
pCacheArrayList = taosArrayInit(4, POINTER_BYTES);
......@@ -537,7 +539,10 @@ void taosCacheCleanup(SCacheObj *pCacheObj) {
pCacheObj->deleting = 1;
// wait for the refresh thread quit before destroying the cache object.
// But in the dll, the child thread will be killed before atexit takes effect.
while(atomic_load_8(&pCacheObj->deleting) != 0) {
if (refreshWorkerNormalStopped) break;
if (refreshWorkerUnexpectedStopped) return;
taosMsleep(50);
}
......@@ -676,6 +681,12 @@ static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t
taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup);
}
void taosCacheRefreshWorkerUnexpectedStopped(void) {
if(!refreshWorkerNormalStopped) {
refreshWorkerUnexpectedStopped=true;
}
}
void* taosCacheTimedRefresh(void *handle) {
assert(pCacheArrayList != NULL);
uDebug("cache refresh thread starts");
......@@ -684,6 +695,7 @@ void* taosCacheTimedRefresh(void *handle) {
const int32_t SLEEP_DURATION = 500; //500 ms
int64_t count = 0;
atexit(taosCacheRefreshWorkerUnexpectedStopped);
while(1) {
taosMsleep(SLEEP_DURATION);
......@@ -748,6 +760,7 @@ void* taosCacheTimedRefresh(void *handle) {
pCacheArrayList = NULL;
pthread_mutex_destroy(&guard);
refreshWorkerNormalStopped=true;
uDebug("cache refresh thread quits");
return NULL;
......@@ -762,6 +775,6 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp) {
doCacheRefresh(pCacheObj, now, fp);
}
void taosStopCacheRefreshWorker() {
stopRefreshWorker = false;
void taosStopCacheRefreshWorker(void) {
stopRefreshWorker = true;
}
\ No newline at end of file
......@@ -262,6 +262,7 @@ int patternMatch(const char *patterStr, const char *str, size_t size, const SPat
c1 = str[j++];
if (j <= size) {
if (c == '\\' && patterStr[i] == '_' && c1 == '_') { i++; continue; }
if (c == c1 || tolower(c) == tolower(c1) || (c == pInfo->matchOne && c1 != 0)) {
continue;
}
......
......@@ -442,7 +442,7 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
}
released = 1;
} else {
uTrace("rsetId:%d p:%p rid:%" PRId64 " is released", rsetId, pNode->p, rid);
uTrace("rsetId:%d p:%p rid:%" PRId64 " is released, count:%d", rsetId, pNode->p, rid, pNode->count);
}
} else {
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid);
......
......@@ -488,7 +488,7 @@ SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
return -1;
}
if (listen(sockFd, 10) < 0) {
if (listen(sockFd, 1024) < 0) {
uError("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(sockFd);
return -1;
......
......@@ -64,12 +64,15 @@ int32_t strRmquote(char *z, int32_t len){
int32_t j = 0;
for (uint32_t k = 1; k < len - 1; ++k) {
if (z[k] == '\\' || (z[k] == delim && z[k + 1] == delim)) {
if (z[k] == '\\' && z[k + 1] == '_') {
//match '_' self
} else {
z[j] = z[k + 1];
cnt++;
j++;
k++;
continue;
cnt++;
j++;
k++;
continue;
}
}
z[j] = z[k];
......
......@@ -289,6 +289,13 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
int64_t queuedSize = atomic_add_fetch_64(&pVnode->queuedWMsgSize, pWrite->walHead.len);
if (queued > MAX_QUEUED_MSG_NUM || queuedSize > MAX_QUEUED_MSG_SIZE) {
if (pWrite->qtype == TAOS_QTYPE_FWD) {
queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1);
queuedSize = atomic_sub_fetch_64(&pVnode->queuedWMsgSize, pWrite->walHead.len);
return -1;
}
int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3;
if (ms > 100) ms = 100;
vDebug("vgId:%d, too many msg:%d in vwqueue, flow control %dms", pVnode->vgId, queued, ms);
......
......@@ -8,22 +8,7 @@
#include <time.h>
#include <unistd.h>
int numThreads = 8;
int numSuperTables = 8;
int numChildTables = 4; // per thread, per super table
int numRowsPerChildTable = 2048;
void shuffle(char** lines, size_t n) {
if (n > 1) {
size_t i;
for (i = 0; i < n - 1; i++) {
size_t j = i + rand() / (RAND_MAX / (n - i) + 1);
char* t = lines[j];
lines[j] = lines[i];
lines[i] = t;
}
}
}
#define MAX_THREAD_LINE_BATCHES 1024
void printThreadId(pthread_t id, char* buf)
{
......@@ -38,10 +23,15 @@ static int64_t getTimeInUs() {
return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec;
}
typedef struct {
TAOS* taos;
typedef struct {
char** lines;
int numLines;
} SThreadLinesBatch;
typedef struct {
TAOS* taos;
int numBatches;
SThreadLinesBatch batches[MAX_THREAD_LINE_BATCHES];
int64_t costTime;
} SThreadInsertArgs;
......@@ -49,16 +39,95 @@ static void* insertLines(void* args) {
SThreadInsertArgs* insertArgs = (SThreadInsertArgs*) args;
char tidBuf[32] = {0};
printThreadId(pthread_self(), tidBuf);
printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf);
int64_t begin = getTimeInUs();
int32_t code = taos_insert_lines(insertArgs->taos, insertArgs->lines, insertArgs->numLines);
int64_t end = getTimeInUs();
insertArgs->costTime = end-begin;
printf("code: %d, %s. time used:%"PRId64", thread: 0x%s\n", code, tstrerror(code), end - begin, tidBuf);
for (int i = 0; i < insertArgs->numBatches; ++i) {
SThreadLinesBatch* batch = insertArgs->batches + i;
printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf);
int64_t begin = getTimeInUs();
int32_t code = taos_insert_lines(insertArgs->taos, batch->lines, batch->numLines);
int64_t end = getTimeInUs();
insertArgs->costTime += end - begin;
printf("code: %d, %s. time used:%"PRId64", thread: 0x%s\n", code, tstrerror(code), end - begin, tidBuf);
}
return NULL;
}
int32_t getLineTemplate(char* lineTemplate, int templateLen, int numFields) {
if (numFields <= 4) {
char* sample = "sta%d,t3=%di32 c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64 %lldms";
snprintf(lineTemplate, templateLen, "%s", sample);
return 0;
}
if (numFields <= 13) {
char* sample = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms";
snprintf(lineTemplate, templateLen, "%s", sample);
return 0;
}
char* lineFormatTable = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32 ";
snprintf(lineTemplate+strlen(lineTemplate), templateLen-strlen(lineTemplate), "%s", lineFormatTable);
int offset[] = {numFields*2/5, numFields*4/5, numFields};
for (int i = 0; i < offset[0]; ++i) {
snprintf(lineTemplate+strlen(lineTemplate), templateLen-strlen(lineTemplate), "c%d=%di32,", i, i);
}
for (int i=offset[0]+1; i < offset[1]; ++i) {
snprintf(lineTemplate+strlen(lineTemplate), templateLen-strlen(lineTemplate), "c%d=%d.43f64,", i, i);
}
for (int i = offset[1]+1; i < offset[2]; ++i) {
snprintf(lineTemplate+strlen(lineTemplate), templateLen-strlen(lineTemplate), "c%d=\"%d\",", i, i);
}
char* lineFormatTs = " %lldms";
snprintf(lineTemplate+strlen(lineTemplate)-1, templateLen-strlen(lineTemplate)+1, "%s", lineFormatTs);
return 0;
}
int main(int argc, char* argv[]) {
int numThreads = 8;
int numSuperTables = 1;
int numChildTables = 256;
int numRowsPerChildTable = 8192;
int numFields = 13;
int maxLinesPerBatch = 16384;
int opt;
while ((opt = getopt(argc, argv, "s:c:r:f:t:m:h")) != -1) {
switch (opt) {
case 's':
numSuperTables = atoi(optarg);
break;
case 'c':
numChildTables = atoi(optarg);
break;
case 'r':
numRowsPerChildTable = atoi(optarg);
break;
case 'f':
numFields = atoi(optarg);
break;
case 't':
numThreads = atoi(optarg);
break;
case 'm':
maxLinesPerBatch = atoi(optarg);
break;
case 'h':
fprintf(stderr, "Usage: %s -s supertable -c childtable -r rows -f fields -t threads -m maxlines_per_batch\n",
argv[0]);
exit(0);
default: /* '?' */
fprintf(stderr, "Usage: %s -s supertable -c childtable -r rows -f fields -t threads -m maxlines_per_batch\n",
argv[0]);
exit(-1);
}
}
TAOS_RES* result;
const char* host = "127.0.0.1";
const char* user = "root";
......@@ -71,6 +140,11 @@ int main(int argc, char* argv[]) {
exit(1);
}
if (numThreads * MAX_THREAD_LINE_BATCHES* maxLinesPerBatch < numSuperTables*numChildTables*numRowsPerChildTable) {
printf("too many rows to be handle by threads with %d batches", MAX_THREAD_LINE_BATCHES);
exit(2);
}
char* info = taos_get_server_info(taos);
printf("server info: %s\n", info);
info = taos_get_client_info(taos);
......@@ -78,7 +152,7 @@ int main(int argc, char* argv[]) {
result = taos_query(taos, "drop database if exists db;");
taos_free_result(result);
usleep(100000);
result = taos_query(taos, "create database db precision 'ms';");
result = taos_query(taos, "create database db precision 'us';");
taos_free_result(result);
usleep(100000);
......@@ -86,21 +160,24 @@ int main(int argc, char* argv[]) {
time_t ct = time(0);
int64_t ts = ct * 1000;
char* lineFormat = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms";
char* lineTemplate = calloc(65536, sizeof(char));
getLineTemplate(lineTemplate, 65535, numFields);
printf("setup supertables...");
{
char** linesStb = calloc(numSuperTables, sizeof(char*));
for (int i = 0; i < numSuperTables; i++) {
char* lineStb = calloc(512, 1);
snprintf(lineStb, 512, lineFormat, i,
numThreads * numSuperTables * numChildTables,
ts + numThreads * numSuperTables * numChildTables * numRowsPerChildTable);
char* lineStb = calloc(strlen(lineTemplate)+128, 1);
snprintf(lineStb, strlen(lineTemplate)+128, lineTemplate, i,
numSuperTables * numChildTables,
ts + numSuperTables * numChildTables * numRowsPerChildTable);
linesStb[i] = lineStb;
}
SThreadInsertArgs args = {0};
args.taos = taos;
args.lines = linesStb;
args.numLines = numSuperTables;
args.batches[0].lines = linesStb;
args.batches[0].numLines = numSuperTables;
insertLines(&args);
for (int i = 0; i < numSuperTables; ++i) {
free(linesStb[i]);
......@@ -109,67 +186,72 @@ int main(int argc, char* argv[]) {
}
printf("generate lines...\n");
char*** linesThread = calloc(numThreads, sizeof(char**));
pthread_t* tids = calloc(numThreads, sizeof(pthread_t));
SThreadInsertArgs* argsThread = calloc(numThreads, sizeof(SThreadInsertArgs));
for (int i = 0; i < numThreads; ++i) {
char** lines = calloc(numSuperTables * numChildTables * numRowsPerChildTable, sizeof(char*));
linesThread[i] = lines;
argsThread[i].taos = taos;
argsThread[i].numBatches = 0;
}
for (int t = 0; t < numThreads; ++t) {
int l = 0;
char** lines = linesThread[t];
for (int i = 0; i < numSuperTables; ++i) {
for (int j = 0; j < numChildTables; ++j) {
for (int k = 0; k < numRowsPerChildTable; ++k) {
int stIdx = i;
int ctIdx = t*numSuperTables*numChildTables + j;
char* line = calloc(512, 1);
snprintf(line, 512, lineFormat, stIdx, ctIdx, ts + 10 * l);
lines[l] = line;
++l;
}
}
}
int64_t totalLines = numSuperTables * numChildTables * numRowsPerChildTable;
int totalBatches = (int) ((totalLines) / maxLinesPerBatch);
if (totalLines % maxLinesPerBatch != 0) {
totalBatches += 1;
}
printf("shuffle lines...\n");
for (int t = 0; t < numThreads; ++t) {
shuffle(linesThread[t], numSuperTables * numChildTables * numRowsPerChildTable);
char*** allBatches = calloc(totalBatches, sizeof(char**));
for (int i = 0; i < totalBatches; ++i) {
allBatches[i] = calloc(maxLinesPerBatch, sizeof(char*));
int threadNo = i % numThreads;
int batchNo = i / numThreads;
argsThread[threadNo].batches[batchNo].lines = allBatches[i];
argsThread[threadNo].numBatches = batchNo + 1;
}
int l = 0;
for (int i = 0; i < numSuperTables; ++i) {
for (int j = 0; j < numChildTables; ++j) {
for (int k = 0; k < numRowsPerChildTable; ++k) {
int stIdx = i;
int ctIdx = numSuperTables*numChildTables + j;
char* line = calloc(strlen(lineTemplate)+128, 1);
snprintf(line, strlen(lineTemplate)+128, lineTemplate, stIdx, ctIdx, ts + l);
int batchNo = l / maxLinesPerBatch;
int lineNo = l % maxLinesPerBatch;
allBatches[batchNo][lineNo] = line;
argsThread[batchNo % numThreads].batches[batchNo/numThreads].numLines = lineNo + 1;
++l;
}
}
}
printf("begin multi-thread insertion...\n");
int64_t begin = taosGetTimestampUs();
pthread_t* tids = calloc(numThreads, sizeof(pthread_t));
SThreadInsertArgs* argsThread = calloc(numThreads, sizeof(SThreadInsertArgs));
for (int i=0; i < numThreads; ++i) {
argsThread[i].lines = linesThread[i];
argsThread[i].taos = taos;
argsThread[i].numLines = numSuperTables * numChildTables * numRowsPerChildTable;
pthread_create(tids+i, NULL, insertLines, argsThread+i);
}
for (int i = 0; i < numThreads; ++i) {
pthread_join(tids[i], NULL);
}
int64_t end = taosGetTimestampUs();
int totalLines = numThreads*numSuperTables*numChildTables*numRowsPerChildTable;
printf("TOTAL LINES: %d\n", totalLines);
size_t linesNum = numSuperTables*numChildTables*numRowsPerChildTable;
printf("TOTAL LINES: %zu\n", linesNum);
printf("THREADS: %d\n", numThreads);
int64_t sumTime = 0;
for (int i=0; i<numThreads; ++i) {
sumTime += argsThread[i].costTime;
}
printf("TIME: %d(ms)\n", (int)(end-begin)/1000);
double throughput = (double)(totalLines)/(double)(end-begin) * 1000000;
printf("THROUGHPUT:%d/s\n", (int)throughput);
for (int i = 0; i < totalBatches; ++i) {
free(allBatches[i]);
}
free(allBatches);
free(argsThread);
free(tids);
for (int i = 0; i < numThreads; ++i) {
free(linesThread[i]);
}
free(linesThread);
free(lineTemplate);
taos_close(taos);
return 0;
}
......@@ -17,5 +17,5 @@ go env -w GO111MODULE=on
go env -w GOPROXY=https://goproxy.io,direct
bash ./case001/case001.sh $severIp $serverPort
#bash ./case002/case002.sh $severIp $serverPort
bash ./case002/case002.sh $severIp $serverPort
#bash ./case003/case003.sh $severIp $serverPort
......@@ -18,7 +18,7 @@ import (
"database/sql"
"flag"
"fmt"
_ "github.com/taosdata/driver-go/taosSql"
_ "github.com/taosdata/driver-go/v2/taosSql"
"log"
"strconv"
"time"
......@@ -62,6 +62,7 @@ func main() {
url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/"
// open connect to taos server
fmt.Printf("url:%s",url)
db, err := sql.Open(taosDriverName, url)
if err != nil {
log.Fatalf("Open database error: %s\n", err)
......@@ -167,17 +168,18 @@ func insert_data(db *sql.DB, demot string) {
func select_data(db *sql.DB, demot string) {
st := time.Now().Nanosecond()
fmt.Println(demot)
rows, err := db.Query("select * from ? ", demot) // go text mode
fmt.Println("end query",err)
checkErr(err, "select db.Query")
fmt.Printf("%10s%s%8s %5s %9s%s %s %8s%s %7s%s %8s%s %4s%s %5s%s\n", " ", "ts", " ", "id", " ", "name", " ", "len", " ", "flag", " ", "notes", " ", "fv", " ", " ", "dv")
var affectd int
//decoder := mahonia.NewDecoder("gbk") // 把原来ANSI格式的文本文件里的字符,用gbk进行解码。
fmt.Println("start next")
for rows.Next() {
var ts string
var ts time.Time
var name string
var id int
var len int8
......@@ -187,6 +189,7 @@ func select_data(db *sql.DB, demot string) {
var dv float64
err = rows.Scan(&ts, &id, &name, &len, &flag, &notes, &fv, &dv)
fmt.Println("rows:",err)
checkErr(err, "select rows.Scan")
fmt.Printf("%s|\t", ts)
......
@echo off
echo ==== start run cases001.go
del go.*
go mod init demotest
go build
demotest.exe -h %1 -p %2
cd ..
package main
import (
"database/sql/driver"
"fmt"
"io"
"os"
"time"
taos "github.com/taosdata/driver-go/v2/af"
)
func Subscribe_check(topic taos.Subscriber, check int) bool {
count := 0
rows, err := topic.Consume()
defer func() { rows.Close(); time.Sleep(time.Second) }()
if err != nil {
fmt.Println(err)
os.Exit(3)
}
for {
values := make([]driver.Value, 2)
err := rows.Next(values)
if err == io.EOF {
break
} else if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(4)
}
count++
}
if count == check {
return false
} else {
return true
}
}
func main() {
ts := 1630461600000
db, err := taos.Open("127.0.0.1", "", "", "", 0)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
defer db.Close()
db.Exec("drop if exists database test")
db.Exec("create if not exists database test")
db.Exec("use test")
db.Exec("drop if exists database test")
db.Exec("create table test (ts timestamp ,level int)")
for i := 0; i < 10; i++ {
sqlcmd := fmt.Sprintf("insert into test values(%d,%d)", ts+i, i)
db.Exec(sqlcmd)
}
fmt.Println("consumption 01.")
topic, err := db.Subscribe(false, "test", "select ts, level from test", time.Second)
if Subscribe_check(topic, 10) {
os.Exit(3)
}
fmt.Println("consumption 02: no new rows inserted")
if Subscribe_check(topic, 0) {
os.Exit(3)
}
fmt.Println("consumption 03: after one new rows inserted")
sqlcmd := fmt.Sprintf("insert into test values(%d,%d)", ts+10, 10)
db.Exec(sqlcmd)
if Subscribe_check(topic, 1) {
os.Exit(3)
}
fmt.Println("consumption 04: keep progress and continue previous subscription")
topic.Unsubscribe(true)
topic, err = db.Subscribe(false, "test", "select ts, level from test", time.Second)
if Subscribe_check(topic, 0) {
os.Exit(3)
}
}
#!/bin/bash
echo "==== start run cases001.go"
set +e
#set -x
script_dir="$(dirname $(readlink -f $0))"
#echo "pwd: $script_dir, para0: $0"
#execName=$0
#execName=`echo ${execName##*/}`
#goName=`echo ${execName%.*}`
###### step 3: start build
cd $script_dir
rm -f go.*
go mod init demotest > /dev/null 2>&1
go mod tidy > /dev/null 2>&1
go build > /dev/null 2>&1
sleep 1s
./demotest -h $1 -p $2
......@@ -15,6 +15,7 @@ import sys
from util.log import *
from util.cases import *
from util.sql import *
from math import floor
class TDTestCase:
......@@ -27,23 +28,22 @@ class TDTestCase:
sql = "select server_version()"
ret = tdSql.query(sql)
version = tdSql.getData(0, 0)[0:3]
expectedVersion_dev = "2.0"
expectedVersion_master = "2.1"
if(version == expectedVersion_dev or version == expectedVersion_master):
tdLog.info("sql:%s, row:%d col:%d data:%s == expect" % (sql, 0, 0, version))
version = floor(float(tdSql.getData(0, 0)[0:3]))
expectedVersion = 2
if(version == expectedVersion):
tdLog.info("sql:%s, row:%d col:%d data:%d == expect" % (sql, 0, 0, version))
else:
tdLog.exit("sql:%s, row:%d col:%d data:%s != expect:%s or %s " % (sql, 0, 0, version, expectedVersion_dev, expectedVersion_master))
tdLog.exit("sql:%s, row:%d col:%d data:%d != expect:%d " % (sql, 0, 0, version, expectedVersion))
sql = "select client_version()"
ret = tdSql.query(sql)
version = tdSql.getData(0, 0)[0:3]
expectedVersion_dev = "2.0"
expectedVersion_master = "2.1"
if(version == expectedVersion_dev or version == expectedVersion_master):
tdLog.info("sql:%s, row:%d col:%d data:%s == expect" % (sql, 0, 0, version))
version = floor(float(tdSql.getData(0, 0)[0:3]))
expectedVersion = 2
if(version == expectedVersion):
tdLog.info("sql:%s, row:%d col:%d data:%d == expect" % (sql, 0, 0, version))
else:
tdLog.exit("sql:%s, row:%d col:%d data:%s != expect:%s or %s " % (sql, 0, 0, version, expectedVersion_dev, expectedVersion_master))
tdLog.exit("sql:%s, row:%d col:%d data:%d != expect:%d " % (sql, 0, 0, version, expectedVersion))
def stop(self):
......
......@@ -242,6 +242,7 @@ python3 ./test.py -f query/bug2143.py
python3 ./test.py -f query/sliding.py
python3 ./test.py -f query/unionAllTest.py
python3 ./test.py -f query/bug2281.py
python3 ./test.py -f query/udf.py
python3 ./test.py -f query/bug2119.py
python3 ./test.py -f query/isNullTest.py
python3 ./test.py -f query/queryWithTaosdKilled.py
......@@ -262,6 +263,7 @@ python3 ./test.py -f query/nestedQuery/queryInterval.py
python3 ./test.py -f query/queryStateWindow.py
# python3 ./test.py -f query/nestedQuery/queryWithOrderLimit.py
python3 ./test.py -f query/nestquery_last_row.py
python3 ./test.py -f query/nestedQuery/nestedQuery.py
python3 ./test.py -f query/queryCnameDisplay.py
python3 ./test.py -f query/operator_cost.py
python3 test.py -f query/nestedQuery/queryWithSpread.py
......@@ -302,6 +304,7 @@ python3 testMinTablesPerVnode.py
python3 queryCount.py
python3 ./test.py -f query/queryGroupbyWithInterval.py
python3 client/twoClients.py
python3 ./test.py -f query/query.py
python3 test.py -f query/queryInterval.py
python3 test.py -f query/queryFillTest.py
......@@ -392,6 +395,7 @@ python3 ./test.py -f alter/alterColMultiTimes.py
python3 ./test.py -f query/queryWildcardLength.py
python3 ./test.py -f query/queryTbnameUpperLower.py
python3 ./test.py -f query/queryGroupTbname.py
python3 ./test.py -f insert/verifyMemToDiskCrash.py
#======================p4-end===============
......
......@@ -1133,24 +1133,23 @@ class TDTestCase:
tdSql.checkRows(6)
def sStbDtbDdataAcMtInsertMultiThreadCheckCase(self):
"""
#! concurrency conflict
"""
"""
thread input same stb, different tb, different data, add col, mul tag
"""
self.cleanStb()
input_sql, stb_name = self.genFullTypeSql()
self.resCmp(input_sql, stb_name)
s_stb_d_tb_a_col_m_tag_list = self.genSqlList(stb_name=stb_name)[5]
# s_stb_d_tb_a_col_m_tag_list = self.genSqlList(stb_name=stb_name)[5]
s_stb_d_tb_a_col_m_tag_list = [(f'{stb_name},t0=F,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64 c0=t,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="ngxgzdzs",c8=L"ncharColValue",c9=7u64,c11=L"ncharColValue",c10=F 1626006833639000000ns', 'hpxbys'), \
(f'{stb_name},t0=True,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64 c0=T,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="vvfrdtty",c8=L"ncharColValue",c9=7u64,c11=L"ncharColValue",c10=True 1626006833639000000ns', 'hpxbys'), \
(f'{stb_name},t0=f,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64 c0=False,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="kzscucnt",c8=L"ncharColValue",c9=7u64,c11=L"ncharColValue",c10=f 1626006833639000000ns', 'hpxbys'), \
(f'{stb_name},t0=false,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64 c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="asegdbqk",c8=L"ncharColValue",c9=7u64,c11=L"ncharColValue",c10=false 1626006833639000000ns', 'hpxbys'), \
(f'{stb_name},t0=T,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64 c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="yvqnhgmn",c8=L"ncharColValue",c9=7u64,c11=L"ncharColValue",c10=T 1626006833639000000ns', 'hpxbys')]
self.multiThreadRun(self.genMultiThreadSeq(s_stb_d_tb_a_col_m_tag_list))
tdSql.query(f"show tables;")
tdSql.checkRows(3)
def sStbDtbDdataAtMcInsertMultiThreadCheckCase(self):
"""
#! concurrency conflict
"""
"""
thread input same stb, different tb, different data, add tag, mul col
"""
......@@ -1170,12 +1169,18 @@ class TDTestCase:
tb_name = self.getLongName(7, "letters")
input_sql, stb_name = self.genFullTypeSql(tb_name=tb_name)
self.resCmp(input_sql, stb_name)
s_stb_s_tb_d_ts_list = self.genSqlList(stb_name=stb_name, tb_name=tb_name)[7]
# s_stb_s_tb_d_ts_list = self.genSqlList(stb_name=stb_name, tb_name=tb_name)[7]
s_stb_s_tb_d_ts_list =[(f'{stb_name},id="{tb_name}",t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7="tgqkvsws",t8=L"ncharTagValue" c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="htvnnldm",c8=L"ncharColValue",c9=7u64 0', 'sfzqdz'), \
(f'{stb_name},id="{tb_name}",t0=f,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7="fvrhhqiy",t8=L"ncharTagValue" c0=False,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="gybqvhos",c8=L"ncharColValue",c9=7u64 0', 'sfzqdz'), \
(f'{stb_name},id="{tb_name}",t0=f,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7="vifkabhu",t8=L"ncharTagValue" c0=t,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="zlvxgquy",c8=L"ncharColValue",c9=7u64 0', 'sfzqdz'), \
(f'{stb_name},id="{tb_name}",t0=True,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7="lsyotcrn",t8=L"ncharTagValue" c0=False,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="oaupfgtz",c8=L"ncharColValue",c9=7u64 0', 'sfzqdz'), \
(f'{stb_name},id="{tb_name}",t0=T,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7="jrwamcgy",t8=L"ncharTagValue" c0=F,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="vgzadjsh",c8=L"ncharColValue",c9=7u64 0', 'sfzqdz')]
self.multiThreadRun(self.genMultiThreadSeq(s_stb_s_tb_d_ts_list))
tdSql.query(f"show tables;")
tdSql.checkRows(1)
tdSql.query(f"select * from {stb_name}")
tdSql.checkRows(6)
# ! Small probability bug ---> temporarily delete it
# tdSql.query(f"select * from {stb_name}")
# tdSql.checkRows(6)
def sStbStbDdataDtsAcMtInsertMultiThreadCheckCase(self):
"""
......@@ -1204,7 +1209,12 @@ class TDTestCase:
tb_name = self.getLongName(7, "letters")
input_sql, stb_name = self.genFullTypeSql(tb_name=tb_name)
self.resCmp(input_sql, stb_name)
s_stb_s_tb_d_ts_a_tag_m_col_list = self.genSqlList(stb_name=stb_name, tb_name=tb_name)[9]
# s_stb_s_tb_d_ts_a_tag_m_col_list = self.genSqlList(stb_name=stb_name, tb_name=tb_name)[9]
s_stb_s_tb_d_ts_a_tag_m_col_list = [(f'{stb_name},id="{tb_name}",t0=T,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7="xsajdfjc",t8=L"ncharTagValue",t11=127i8,t10=L"ncharTagValue" c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64 0', 'rgqcfb'), \
(f'{stb_name},id="{tb_name}",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7="qzeyolgt",t8=L"ncharTagValue",t11=127i8,t10=L"ncharTagValue" c0=True,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64 0', 'rgqcfb'), \
(f'{stb_name},id="{tb_name}",t0=False,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7="suxqziwh",t8=L"ncharTagValue",t11=127i8,t10=L"ncharTagValue" c0=False,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64 0', 'rgqcfb'), \
(f'{stb_name},id="{tb_name}",t0=false,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7="vapolpgr",t8=L"ncharTagValue",t11=127i8,t10=L"ncharTagValue" c0=t,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64 0', 'rgqcfb'), \
(f'{stb_name},id="{tb_name}",t0=F,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7="eustwpfl",t8=L"ncharTagValue",t11=127i8,t10=L"ncharTagValue" c0=t,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64 0', 'rgqcfb')]
self.multiThreadRun(self.genMultiThreadSeq(s_stb_s_tb_d_ts_a_tag_m_col_list))
tdSql.query(f"show tables;")
tdSql.checkRows(1)
......@@ -1230,16 +1240,18 @@ class TDTestCase:
tdSql.checkRows(6)
def sStbDtbDdataDtsAcMtInsertMultiThreadCheckCase(self):
"""
# ! concurrency conflict
"""
"""
thread input same stb, different tb, data, ts, add col, mul tag
"""
self.cleanStb()
input_sql, stb_name = self.genFullTypeSql()
self.resCmp(input_sql, stb_name)
s_stb_d_tb_d_ts_a_col_m_tag_list = self.genSqlList(stb_name=stb_name)[11]
# s_stb_d_tb_d_ts_a_col_m_tag_list = self.genSqlList(stb_name=stb_name)[11]
s_stb_d_tb_d_ts_a_col_m_tag_list = [(f'{stb_name},t0=True,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64 c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="eltflgpz",c8=L"ncharColValue",c9=7u64,c11=L"ncharColValue",c10=True 0', 'ynnlov'), \
(f'{stb_name},t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64 c0=False,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="ysznggwl",c8=L"ncharColValue",c9=7u64,c11=L"ncharColValue",c10=t 0', 'ynnlov'), \
(f'{stb_name},t0=f,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64 c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="nxwjucch",c8=L"ncharColValue",c9=7u64,c11=L"ncharColValue",c10=f 0', 'ynnlov'), \
(f'{stb_name},t0=F,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64 c0=T,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="fzseicnt",c8=L"ncharColValue",c9=7u64,c11=L"ncharColValue",c10=F 0', 'ynnlov'), \
(f'{stb_name},t0=False,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64 c0=F,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="zwgurhdp",c8=L"ncharColValue",c9=7u64,c11=L"ncharColValue",c10=False 0', 'ynnlov')]
self.multiThreadRun(self.genMultiThreadSeq(s_stb_d_tb_d_ts_a_col_m_tag_list))
tdSql.query(f"show tables;")
tdSql.checkRows(3)
......@@ -1285,7 +1297,7 @@ class TDTestCase:
self.tagColAddCheckCase()
self.tagMd5Check()
self.tagColBinaryMaxLengthCheckCase()
# self.tagColNcharMaxLengthCheckCase()
self.tagColNcharMaxLengthCheckCase()
self.batchInsertCheckCase()
self.multiInsertCheckCase(1000)
self.batchErrorInsertCheckCase()
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
from util.common import tdCom
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def checkTbMemDiskMerge(self):
tb_name = tdCom.getLongName(8, "letters")
tdSql.execute(
f'CREATE TABLE {tb_name} (ts timestamp, c1 int, c2 int)')
tdSql.execute(
f'insert into {tb_name} values ("2021-01-01 12:00:00.000", 1, 1)')
tdSql.execute(
f'insert into {tb_name} values ("2021-01-03 12:00:00.000", 3, 3)')
tdCom.restartTaosd()
tdSql.execute(
f'insert into {tb_name} values ("2021-01-02 12:00:00.000", Null, 2)')
tdSql.execute(
f'insert into {tb_name} values ("2021-01-04 12:00:00.000", Null, 4)')
query_sql = f'select * from {tb_name}'
res1 = tdSql.query(query_sql, True)
tdCom.restartTaosd()
res2 = tdSql.query(query_sql, True)
for i in range(4):
tdSql.checkEqual(res1[i], res2[i])
def checkStbMemDiskMerge(self):
stb_name = tdCom.getLongName(7, "letters")
tb_name = f'{stb_name}_sub'
tdSql.execute(
f'CREATE TABLE {stb_name} (ts timestamp, c1 int, c2 int) tags (t1 int)')
tdSql.execute(
f'CREATE TABLE {tb_name} using {stb_name} tags (1)')
tdSql.execute(
f'insert into {tb_name} values ("2021-01-01 12:00:00.000", 1, 1)')
tdSql.execute(
f'insert into {tb_name} values ("2021-01-03 12:00:00.000", 3, 3)')
tdCom.restartTaosd()
tdSql.execute(
f'insert into {tb_name} values ("2021-01-02 12:00:00.000", Null, 2)')
tdSql.execute(
f'insert into {tb_name} values ("2021-01-04 12:00:00.000", Null, 4)')
query_sql = f'select * from {stb_name}'
res1 = tdSql.query(query_sql, True)
tdCom.restartTaosd()
res2 = tdSql.query(query_sql, True)
for i in range(4):
tdSql.checkEqual(res1[i], res2[i])
def checkTbSuperSubBlockMerge(self):
tb_name = tdCom.getLongName(6, "letters")
tdSql.execute(
f'CREATE TABLE {tb_name} (ts timestamp, c1 int)')
start_ts = 1577808001000
for i in range(10):
tdSql.execute(
f'insert into {tb_name} values ({start_ts}, {i})')
start_ts += 1
tdCom.restartTaosd()
for i in range(10):
tdSql.execute(
f'insert into {tb_name} values ({start_ts}, Null)')
start_ts += 1
tdCom.restartTaosd()
for i in range(10):
new_ts = i + 10 + 10
tdSql.execute(
f'insert into {tb_name} values ({start_ts}, {new_ts})')
start_ts += 1
tdCom.restartTaosd()
tdSql.query(f'select * from {tb_name}')
def checkStbSuperSubBlockMerge(self):
stb_name = tdCom.getLongName(5, "letters")
tb_name = f'{stb_name}_sub'
tdSql.execute(
f'CREATE TABLE {stb_name} (ts timestamp, c1 int) tags (t1 int)')
tdSql.execute(
f'CREATE TABLE {tb_name} using {stb_name} tags (1)')
start_ts = 1577808001000
for i in range(10):
tdSql.execute(
f'insert into {tb_name} values ({start_ts}, {i})')
start_ts += 1
tdCom.restartTaosd()
for i in range(10):
tdSql.execute(
f'insert into {tb_name} values ({start_ts}, Null)')
start_ts += 1
tdCom.restartTaosd()
for i in range(10):
new_ts = i + 10 + 10
tdSql.execute(
f'insert into {tb_name} values ({start_ts}, {new_ts})')
start_ts += 1
tdCom.restartTaosd()
tdSql.query(f'select * from {stb_name}')
def run(self):
tdSql.prepare()
self.checkTbMemDiskMerge()
self.checkStbMemDiskMerge()
self.checkTbSuperSubBlockMerge()
self.checkStbSuperSubBlockMerge()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
此差异已折叠。
......@@ -13,10 +13,10 @@
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
from util.dnodes import tdDnodes
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
......@@ -26,6 +26,18 @@ class TDTestCase:
self.ts = 1538548685000
def bug_6387(self):
tdSql.execute("create database bug6387 ")
tdSql.execute("use bug6387 ")
tdSql.execute("create table test(ts timestamp, c1 int) tags(t1 int)")
for i in range(5000):
sql = "insert into t%d using test tags(1) values " % i
for j in range(21):
sql = sql + "(now+%ds,%d)" % (j ,j )
tdSql.execute(sql)
tdSql.query("select count(*) from test interval(1s) group by tbname")
tdSql.checkData(0,1,1)
def run(self):
tdSql.prepare()
......@@ -120,15 +132,21 @@ class TDTestCase:
tdSql.execute("insert into 'Tb0' using tb tags(1) values(now, 1)")
tdSql.query("select * from tb")
tdSql.checkRows(1)
tdSql.query("select * from tb0")
tdSql.checkRows(1)
# For jira:https://jira.taosdata.com:18080/browse/TD-6314
tdSql.execute("use db")
tdSql.execute("create stable stb_001(ts timestamp,v int) tags(c0 int)")
tdSql.execute("insert into stb1 using stb_001 tags(1) values(now,1)")
tdSql.query("select _block_dist() from stb_001")
tdSql.checkRows(1)
tdSql.query("select * from tb0")
tdSql.checkRows(1)
#For jira: https://jira.taosdata.com:18080/browse/TD-6387
tdLog.info("case for bug_6387")
self.bug_6387()
def stop(self):
tdSql.close()
......
......@@ -13,6 +13,8 @@
import sys
import taos
import string
import random
from util.log import *
from util.cases import *
from util.sql import *
......@@ -23,6 +25,11 @@ class TDTestCase:
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
def get_random_string(self, length):
letters = string.ascii_lowercase
result_str = ''.join(random.choice(letters) for i in range(length))
return result_str
def run(self):
tdSql.prepare()
......@@ -186,6 +193,20 @@ class TDTestCase:
tdSql.query("select t1.ts from t0,t1 where t0.ts = t1.ts")
tdSql.checkData(0,0,'2018-10-03 14:38:05.000000')
#TD-6425 join result more than 1MB
tdSql.execute("create database test_join")
tdSql.execute("use test_join")
ts = 1538548685000
tdSql.execute("create table stb(ts timestamp, c1 nchar(200)) tags(id int, loc binary(20))")
for i in range(2):
tdSql.execute("create table tb%d using stb tags(1, 'city%d')" % (i, i))
for j in range(1000):
tdSql.execute("insert into tb%d values(%d, '%s')" % (i, ts + j, self.get_random_string(200)))
tdSql.query("select tb0.c1, tb1.c1 from tb0, tb1 where tb0.ts = tb1.ts")
tdSql.checkRows(1000)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.ts = 1627750800000
self.numberOfRecords = 10000
def pre_stable(self):
os.system("gcc -g -O0 -fPIC -shared ../script/sh/abs_max.c -o /tmp/abs_max.so")
os.system("gcc -g -O0 -fPIC -shared ../script/sh/add_one.c -o /tmp/add_one.so")
tdSql.execute("create table stb(ts timestamp ,c1 int, c2 bigint) tags(t1 int)")
for i in range(50):
for j in range(200):
sql = "insert into t%d using stb tags(%d) values(%s,%d,%d)" % (i, i, self.ts + j, 1e2+j, 1e10+j)
tdSql.execute(sql)
for i in range(50):
for j in range(200):
sql = "insert into t%d using stb tags(%d) values(%s,%d,%d)" % (i, i, self.ts + j + 200 , -1e2-j, -j-1e10)
tdSql.execute(sql)
def test_udf_null(self):
tdLog.info("test missing parameters")
tdSql.error("create aggregate function as '/tmp/abs_maxw.so' outputtype bigint;")
tdSql.error("create aggregate function abs_max as '' outputtype bigint;")
tdSql.error("create aggregate function abs_max as outputtype bigint;")
tdSql.error("create aggregate function abs_max as '/tmp/abs_maxw.so' ;")
tdSql.error("create aggregate abs_max as '/tmp/abs_maxw.so' outputtype bigint;")
tdSql.execute("create aggregate function abs_max as '/tmp/abs_max.so' outputtype bigint;")
tdSql.error("select abs_max() from stb")
tdSql.error("select abs_max(c2) from ")
tdSql.execute("drop function abs_max")
def test_udf_format(self):
# tdSql.error("create aggregate function avg as '/tmp/abs_max.so' outputtype bigint;")
tdSql.error("create aggregate function .a as '/tmp/abs_max.so' outputtype bigint;")
tdSql.error("create aggregate function .11 as '/tmp/abs_max.so' outputtype bigint;")
tdSql.error("create aggregate function 1a as '/tmp/abs_max.so' outputtype bigint;")
tdSql.error("create aggregate function \"1+1\" as '/tmp/abs_max.so' outputtype bigint;")
# tdSql.error("create aggregate function [avg] as '/tmp/abs_max.so' outputtype bigint;")
tdSql.execute("create aggregate function abs_max as '/tmp/abs_max.so' outputtype bigint;")
# tdSql.error("create aggregate function abs_max2 as '/tmp/abs_max.so' outputtype bigint;")
tdSql.execute("drop function abs_max;")
tdSql.error("create aggregate function abs_max as '/tmp/add_onew.so' outputtype bigint;")
def test_udf_test(self):
tdSql.execute("create aggregate function abs_max as '/tmp/abs_max.so' outputtype bigint;")
tdSql.error("create aggregate function abs_max as '/tmp/add_onew.so' outputtype bigint;")
sql = 'select abs_max() from db.stb'
tdSql.error(sql)
sql = 'select abs_max(c2) from db.stb'
tdSql.query(sql)
tdSql.checkData(0,0,1410065607)
def run(self):
tdSql.prepare()
tdLog.info("==============step1")
self.pre_stable()
tdLog.info("==============step2")
self.test_udf_null()
tdLog.info("==============step3")
self.test_udf_format()
tdLog.info("==============step4")
self.test_udf_test()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
......@@ -14,6 +14,7 @@
import random
import string
from util.sql import tdSql
from util.dnodes import tdDnodes
class TDCom:
def init(self, conn, logSql):
......@@ -47,6 +48,11 @@ class TDCom:
chars = ''.join(random.choice(string.ascii_letters.lower() + string.digits) for i in range(len))
return chars
def restartTaosd(self, index=1, db_name="db"):
tdDnodes.stop(index)
tdDnodes.startWithoutSleep(index)
tdSql.execute(f"use {db_name}")
def close(self):
self.cursor.close()
......
......@@ -221,3 +221,5 @@ run general/stream/table_replica1_vnoden.sim
run general/stream/metrics_replica1_vnoden.sim
run general/db/show_create_db.sim
run general/db/show_create_table.sim
run general/parser/like.sim
run general/parser/interp_blocks.sim
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册