提交 852b5239 编写于 作者: G Ganlin Zhao

Merge branch '2.6' into fix/TS-2865

......@@ -105,12 +105,11 @@ As a high-performance, scalable and SQL supported time-series database, TDengine
## Comparison with other databases
- [Writing Performance Comparison of TDengine and InfluxDB ](https://tdengine.com/2022/02/23/4975.html)
- [Query Performance Comparison of TDengine and InfluxDB](https://tdengine.com/2022/02/24/5120.html)
- [TDengine vs InfluxDB、OpenTSDB、Cassandra、MySQL、ClickHouse](https://www.tdengine.com/downloads/TDengine_Testing_Report_en.pdf)
- [TDengine vs OpenTSDB](https://tdengine.com/2019/09/12/710.html)
- [TDengine vs Cassandra](https://tdengine.com/2019/09/12/708.html)
- [TDengine vs InfluxDB](https://tdengine.com/2019/09/12/706.html)
- [Writing Performance Comparison of TDengine and InfluxDB ](https://tdengine.com/performance-comparison-of-tdengine-and-influxdb/)
- [Query Performance Comparison of TDengine and InfluxDB](https://tdengine.com/query-performance-comparison-test-report-tdengine-vs-influxdb/)
- [TDengine vs OpenTSDB](https://tdengine.com/performance-tdengine-vs-opentsdb/)
- [TDengine vs Cassandra](https://tdengine.com/performance-tdengine-vs-cassandra/)
- [TDengine vs InfluxDB](https://tdengine.com/performance-tdengine-vs-influxdb/)
If you want to learn some basics about time-series databases, please check [here](https://tdengine.com/tsdb).
If you want to learn some basics about time-series databases, please check [here](https://tdengine.com/tsdb/).
......@@ -33,7 +33,7 @@ We recommend using the latest version of `taospy`, regardless of the version of
### Preparation
1. Install Python. Python >= 3.6 is recommended. If Python is not available on your system, refer to the [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) to install it.
1. Install Python. Python >= 3.6.2 is recommended. If Python is not available on your system, refer to the [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) to install it.
2. Install [pip](https://pypi.org/project/pip/). In most cases, the Python installer comes with the pip utility. If not, please refer to [pip documentation](https://pip.pypa.io/en/stable/installation/) to install it.
If you use a native connection, you will also need to [Install Client Driver](/reference/connector#Install-Client-Driver). The client install package includes the TDengine client dynamic link library (`libtaos.so` or `taos.dll`) and the TDengine CLI.
......
......@@ -76,7 +76,7 @@ Development: false
### Install from source code
```
git clone https://github.com:taosdata/kafka-connect-tdengine.git
git clone --branch master https://github.com:taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine
mvn clean package
unzip -d $CONFLUENT_HOME/share/java/ target/components/packages/taosdata-kafka-connect-tdengine-*.zip
......
......@@ -33,7 +33,7 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con
### 准备
1. 安装 Python。建议使用 Python >= 3.6。如果系统上还没有 Python 可参考 [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) 安装。
1. 安装 Python。建议使用 Python >= 3.6.2。如果系统上还没有 Python 可参考 [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) 安装。
2. 安装 [pip](https://pypi.org/project/pip/)。大部分情况下 Python 的安装包都自带了 pip 工具, 如果没有请参考 [pip docuemntation](https://pip.pypa.io/en/stable/installation/) 安装。
3. 如果使用原生连接,还需[安装客户端驱动](../#安装客户端驱动)。客户端软件包含了 TDengine 客户端动态链接库(libtaos.so 或 taos.dll) 和 TDengine CLI。
......
......@@ -78,7 +78,7 @@ Development: false
### 从源码安装
```
git clone https://github.com:taosdata/kafka-connect-tdengine.git
git clone --branch master https://github.com:taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine
mvn clean package
unzip -d $CONFLUENT_HOME/share/java/ target/components/packages/taosdata-kafka-connect-tdengine-*.zip
......
......@@ -406,6 +406,7 @@ typedef struct SSqlObj {
int64_t lastAlive;
void * pPrevContext;
bool enableBatch;
bool needUpdateMeta;
} SSqlObj;
typedef struct SSqlStream {
......
......@@ -259,13 +259,14 @@ static inline char *insertTags(char *sql, char *tags) {
// nest call
part2 = insertTags(sub_sql, tags);
free(sub_sql);
if (part2 == NULL) {
// unknown format, can not insert tags
tscError("TAGS insertTags sub select sql failed. subsql=%s sql=%s", sub_sql, sql);
free(sub_sql);
free(buf);
return NULL;
}
free(sub_sql);
// new string is part1 + part2 + part 3
strncpy(buf, p, part1_end - p + 1);
......
......@@ -1736,6 +1736,16 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
goto _error;
}
} else if (code != TSDB_CODE_SUCCESS) {
SSqlObj *rootObj = pSql->rootObj;
if (rootObj->res.code && rootObj->needUpdateMeta) {
rootObj->needUpdateMeta = false;
if (pSql->retry < pSql->maxRetry) {
tscRenewTableMeta(pSql);
return;
}
}
goto _error;
}
......
......@@ -532,6 +532,20 @@ bool shouldRewTableMeta(SSqlObj* pSql, SRpcMsg* rpcMsg) {
return true;
}
int tscHandleRenewTableMeta(SSqlObj *pSql) {
SSqlObj *rootObj = pSql->rootObj;
if (rootObj == pSql) {
return tscRenewTableMeta(pSql);
}
rootObj->res.code = pSql->res.code;
rootObj->needUpdateMeta = true;
return rootObj->res.code;
}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
......@@ -611,7 +625,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
}
pSql->retryReason = rpcMsg->code;
rpcMsg->code = tscRenewTableMeta(pSql);
rpcMsg->code = tscHandleRenewTableMeta(pSql);
// if there is an error occurring, proceed to the following error handling procedure.
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, handle);
......@@ -3425,7 +3439,8 @@ int tscRenewTableMeta(SSqlObj *pSql) {
SSqlObj *rootSql = pSql->rootObj;
tscFreeSubobj(rootSql);
tscResetSqlCmd(&rootSql->cmd, true, rootSql->self);
rootSql->res.code = 0;
code = getMultiTableMetaFromMnode(rootSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true);
taosArrayDestroyEx(&pNameList, freeElem);
taosArrayDestroyEx(&vgroupList, freeElem);
......
......@@ -1631,6 +1631,8 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
int64_t handle = pSupporter->pObj;
tscDebug("***enter joinRetrieveFinalResCallback");
SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
if (pParentSql == NULL) return;
......@@ -1639,7 +1641,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
......@@ -1682,10 +1684,16 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
}
if ((++pTableMetaInfo->vgroupIndex) < numOfVgroups) {
tscDebug("**clauseLimit:%" PRId64 " numOfClauseTotal:%" PRId64 " vgIdx:%d numOfVgroups:%d",
pParentSql->cmd.active->clauseLimit, pParentSql->res.numOfClauseTotal, pTableMetaInfo->vgroupIndex, numOfVgroups);
if ((++pTableMetaInfo->vgroupIndex) < numOfVgroups && (pParentSql->cmd.active->clauseLimit < 0 || pParentSql->cmd.active->clauseLimit > pParentSql->res.numOfClauseTotal)) {
tscDebug("0x%"PRIx64" no result in current vnode anymore, try next vnode, vgIndex:%d", pSql->self, pTableMetaInfo->vgroupIndex);
pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback;
pSql->cmd.active->clauseLimit = pParentSql->cmd.active->clauseLimit;
pSql->cmd.active->limit.limit = pParentSql->cmd.active->clauseLimit - pParentSql->res.numOfClauseTotal;
pSql->cmd.active->limit.offset = pSql->res.offset;
tscBuildAndSendRequest(pSql, NULL);
goto _return;
......@@ -1744,6 +1752,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
bool hasData = true;
bool reachLimit = false;
tscDebug("***enter tscFetchDatablockForSubquery");
{ pthread_mutex_lock(&pSql->subState.mutex);
assert(pSql->subState.numOfSub >= 1);
......@@ -1782,6 +1791,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
// has data remains in client side, and continue to return data to app
if (hasData) {
tscDebug("*hasData");
tscBuildResFromSubqueries(pSql);
return;
}
......@@ -1789,6 +1799,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
// If at least one subquery is completed in current vnode, try the next vnode in case of multi-vnode
// super table projection query.
if (reachLimit) {
tscDebug("*reachLimit");
pSql->res.completed = true;
freeJoinSubqueryObj(pSql);
......@@ -1847,8 +1858,11 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd);
tscDebug("**nonorderedPrj:%d resRow:%d numOfRows:%d com:%d numOfClauseTotal:%"PRId64 " clauseLimit:%" PRId64,
tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0), pSub->res.row, pSub->res.numOfRows,
pSub->res.completed, pSql->res.numOfClauseTotal, pSql->cmd.active->clauseLimit);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && pSub->res.row >= pSub->res.numOfRows &&
pSub->res.completed) {
pSub->res.completed && (pSql->cmd.active->clauseLimit < 0 || pSql->res.numOfClauseTotal < pSql->cmd.active->clauseLimit)) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
......@@ -1865,6 +1879,9 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
pTableMetaInfo->vgroupIndex);
pSub->cmd.command = TSDB_SQL_SELECT;
pSub->fp = tscJoinQueryCallback;
pSub->cmd.active->clauseLimit = pSql->cmd.active->clauseLimit;
pSub->cmd.active->limit.limit = pSql->cmd.active->clauseLimit - pSql->res.numOfClauseTotal;
pSub->cmd.active->limit.offset = pSub->res.offset;
tscBuildAndSendRequest(pSub, NULL);
tryNextVnode = true;
......@@ -2029,6 +2046,19 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
goto _return;
}
SSqlObj *rootObj = pSql->rootObj;
if (rootObj->needUpdateMeta) {
rootObj->needUpdateMeta = false;
if (pSql->retry < pSql->maxRetry) {
tscRenewTableMeta(pSql);
} else {
tscAsyncResultOnError(pParentSql);
}
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
goto _return;
}
......@@ -2132,6 +2162,8 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd);
assert(pNewQueryInfo != NULL);
pNewQueryInfo->clauseLimit = -1;
pSupporter->colList = pNewQueryInfo->colList;
pNewQueryInfo->colList = NULL;
......@@ -2579,6 +2611,16 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
SSqlObj* pSql = (SSqlObj*) tres;
int32_t c = taos_errno(pSql);
SSqlObj *rootObj = pSql->rootObj;
if (rootObj->res.code && rootObj->needUpdateMeta) {
rootObj->needUpdateMeta = false;
if (pSql->retry < pSql->maxRetry) {
tscRenewTableMeta(pSql);
return;
}
}
if (c != TSDB_CODE_SUCCESS) {
SSqlObj* parent = pSup->pParent;
......@@ -3086,6 +3128,16 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, pState->numOfSub);
tscFreeRetrieveSup(&pSql->param);
SSqlObj *rootObj = pSql->rootObj;
if (rootObj->needUpdateMeta) {
rootObj->needUpdateMeta = false;
if (pSql->retry < pSql->maxRetry) {
tscRenewTableMeta(pSql);
return;
}
}
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
......@@ -3187,7 +3239,18 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
tscFreeRetrieveSup(&pSql->param);
return;
}
SSqlObj *rootObj = pSql->rootObj;
if (rootObj->needUpdateMeta) {
rootObj->needUpdateMeta = false;
if (pSql->retry < pSql->maxRetry) {
tscRenewTableMeta(pSql);
tscFreeRetrieveSup(&pSql->param);
return;
}
}
// all sub-queries are returned, start to local merge process
pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;
......@@ -3330,7 +3393,21 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
return;
}
SColumnModel *pModelDesc = pDesc->pColumnModel;
if (pModelDesc == NULL) {
tscError("0x%"PRIx64" sub:0x%"PRIx64" column model has been freed", pParentSql->self, pSql->self);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_QRY_APP_ERROR);
return;
}
SColumnModel *pModelMemBuf = trsupport->pExtMemBuffer[idx]->pColumnModel;
if (pModelDesc->numOfCols != pModelMemBuf->numOfCols ||
pModelDesc->rowSize != pModelMemBuf->rowSize) {
tscError("0x%"PRIx64" sub:0x%"PRIx64 "extBuf column model is not consistent with descriptor column model", pParentSql->self, pSql->self);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_QRY_APP_ERROR);
return;
}
int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
if (ret != 0) { // set no disk space error info, and abort retry
......@@ -3803,6 +3880,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
pthread_mutex_unlock(&pSql->subState.mutex); }
if (numOfRes == 0) { // no result any more, free all subquery objects
tscDebug("*******query complete");
pSql->res.completed = true;
freeJoinSubqueryObj(pSql);
return;
......@@ -3870,7 +3948,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
pthread_mutex_unlock(&pSql->subState.mutex); }
pRes->numOfRows = numOfRes;
pRes->numOfClauseTotal += numOfRes;
//pRes->numOfClauseTotal += numOfRes;
int32_t finalRowSize = 0;
for(int32_t i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
......@@ -3887,6 +3965,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
void tscBuildResFromSubqueries(SSqlObj *pSql) {
SSqlRes* pRes = &pSql->res;
tscDebug("*enter tscBuildResFromSubqueries");
if (pRes->code != TSDB_CODE_SUCCESS) {
tscAsyncResultOnError(pSql);
return;
......
......@@ -4574,7 +4574,9 @@ int32_t tscErrorMsgWithCode(int32_t code, char* dstBuffer, const char* errMsg, c
bool tscHasReachLimitation(SQueryInfo* pQueryInfo, SSqlRes* pRes) {
assert(pQueryInfo != NULL && pQueryInfo->clauseLimit != 0);
return (pQueryInfo->clauseLimit > 0 && pRes->numOfClauseTotal >= pQueryInfo->clauseLimit);
bool reachLimit = (pQueryInfo->clauseLimit > 0 && pRes->numOfClauseTotal >= pQueryInfo->clauseLimit);
tscDebug("reachLimit:%d, limit:%" PRId64 " total:%" PRId64, reachLimit, pQueryInfo->clauseLimit, pRes->numOfClauseTotal);
return reachLimit;
}
char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; }
......
Subproject commit 0111c666ddb9fc05889592412ba70c2c3764febf
Subproject commit 41affde86d4ae7be340e31725650970326ac5b0d
......@@ -52,6 +52,7 @@ enum {
TS_JOIN_TS_EQUAL = 0,
TS_JOIN_TS_NOT_EQUALS = 1,
TS_JOIN_TAG_NOT_EQUALS = 2,
TS_JOIN_BLOCK_IGNORE = 3,
};
typedef enum SResultTsInterpType {
......@@ -3231,13 +3232,13 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv* pRuntimeEnv, TSKEY key, tVariant
if (key < elem.ts) {
return TS_JOIN_TS_NOT_EQUALS;
} else if (key > elem.ts) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_INCONSISTAN);
return TS_JOIN_BLOCK_IGNORE;
}
} else {
if (key > elem.ts) {
return TS_JOIN_TS_NOT_EQUALS;
} else if (key < elem.ts) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_INCONSISTAN);
return TS_JOIN_BLOCK_IGNORE;
}
}
......@@ -3422,6 +3423,9 @@ void filterColRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock
} else if (ret == TS_JOIN_TS_NOT_EQUALS) {
all = false;
continue;
} else if (ret == TS_JOIN_BLOCK_IGNORE) {
all = false;
break;
} else {
assert(ret == TS_JOIN_TS_EQUAL);
p[offset] = true;
......@@ -4551,9 +4555,9 @@ int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, tVariant* pTag,
if (!tsBufIsValidElem(&elem)) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR ||
pTag->nType == TSDB_DATA_TYPE_JSON) {
qError("QInfo:0x%" PRIx64 " failed to find tag:%s in ts_comp", GET_QID(pRuntimeEnv), pTag->pz);
qDebug("QInfo:0x%" PRIx64 " tag not found in:%s in ts_comp", GET_QID(pRuntimeEnv), pTag->pz);
} else {
qError("QInfo:0x%" PRIx64 " failed to find tag:%" PRId64 " in ts_comp", GET_QID(pRuntimeEnv), pTag->i64);
qDebug("QInfo:0x%" PRIx64 " tag not found in:%" PRId64 " in ts_comp", GET_QID(pRuntimeEnv), pTag->i64);
}
return -1;
......
......@@ -1254,6 +1254,7 @@ void tOrderDescDestroy(tOrderDescriptor *pDesc) {
}
destroyColumnModel(pDesc->pColumnModel);
pDesc->pColumnModel = NULL;
tfree(pDesc);
}
......
......@@ -50,6 +50,12 @@ mkdir -p /var/lib/taos/subscribe
cd $CONTAINER_TESTDIR/tests/$exec_dir
ulimit -c unlimited
#define taospy 2.7.3
pip3 list|grep taospy
pip3 uninstall taospy -y
pip3 install taospy==2.7.3
$TIMEOUT_CMD $cmd
RET=$?
......
......@@ -96,7 +96,6 @@ docker run \
-v $REPDIR/packaging:$CONTAINER_TESTDIR/packaging:ro \
-v $REPDIR/README.md:$CONTAINER_TESTDIR/README.md:ro \
-v $REPDIR/docs:$CONTAINER_TESTDIR/docs \
-v $REPDIR/src/connector/python/taos:/usr/local/lib/python3.8/site-packages/taos:ro \
-e LD_LIBRARY_PATH=/home/debug/build/lib:/home/debug/build/lib64 \
--rm --ulimit core=-1 taos_test:v1.0 $CONTAINER_TESTDIR/tests/parallel_test/run_case.sh -d "$exec_dir" -c "$cmd" $timeout_param
ret=$?
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册