提交 76b5cef3 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_refact

--- ---
title: Get Started title: Get Started
description: 'Install TDengine from Docker image, apt-get or package, and run TAOS CLI and taosBenchmark to experience the features' description: 'Install TDengine from Docker image, apt-get or package, and run TDengine CLI and taosBenchmark to experience the features'
--- ---
import Tabs from "@theme/Tabs"; import Tabs from "@theme/Tabs";
...@@ -120,7 +120,7 @@ select * from t; ...@@ -120,7 +120,7 @@ select * from t;
Query OK, 2 row(s) in set (0.003128s) Query OK, 2 row(s) in set (0.003128s)
``` ```
Besides executing SQL commands, system administrators can check running status, add/drop user accounts and manage the running instances. TAOS CLI with client driver can be installed and run on either Linux or Windows machines. For more details on CLI, please [check here](../reference/taos-shell/). Besides executing SQL commands, system administrators can check running status, add/drop user accounts and manage the running instances. TDengine CLI with client driver can be installed and run on either Linux or Windows machines. For more details on CLI, please [check here](../reference/taos-shell/).
## Experience the blazing fast speed ## Experience the blazing fast speed
......
...@@ -37,7 +37,7 @@ USE power; ...@@ -37,7 +37,7 @@ USE power;
## Create STable ## Create STable
In a time-series application, there may be multiple kinds of data collection points. For example, in the electrical power system there are meters, transformers, bus bars, switches, etc. For easy and efficient aggregation of multiple tables, one STable needs to be created for each kind of data collection point. For example, for the meters in [table 1](/tdinternal/arch#model_table1), the SQL statement below can be used to create the super table. In a time-series application, there may be multiple kinds of data collection points. For example, in the electrical power system there are meters, transformers, bus bars, switches, etc. For easy and efficient aggregation of multiple tables, one STable needs to be created for each kind of data collection point. For example, for the meters in [table 1](/concept/#model_table1), the SQL statement below can be used to create the super table.
```sql ```sql
CREATE STable meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int); CREATE STable meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);
......
...@@ -22,7 +22,7 @@ import CStmt from "./_c_stmt.mdx"; ...@@ -22,7 +22,7 @@ import CStmt from "./_c_stmt.mdx";
## Introduction ## Introduction
Application programs can execute `INSERT` statement through connectors to insert rows. The TAOS CLI can also be used to manually insert data. Application programs can execute `INSERT` statement through connectors to insert rows. The TDengine CLI can also be used to manually insert data.
### Insert Single Row ### Insert Single Row
......
--- ---
title: 立即开始 title: 立即开始
description: ' Docker,安装包或使用 apt-get 快速安装 TDengine, 通过命令行程序TAOS CLI和工具 taosdemo 快速体验 TDengine 功能' description: ' Docker,安装包或使用 apt-get 快速安装 TDengine, 通过命令行程序TDengine CLI和工具 taosdemo 快速体验 TDengine 功能'
--- ---
import Tabs from "@theme/Tabs"; import Tabs from "@theme/Tabs";
...@@ -122,7 +122,7 @@ select * from t; ...@@ -122,7 +122,7 @@ select * from t;
Query OK, 2 row(s) in set (0.003128s) Query OK, 2 row(s) in set (0.003128s)
``` ```
除执行 SQL 语句外,系统管理员还可以从 TDengine CLI 进行检查系统运行状态、添加删除用户账号等操作。TAOS CLI 连同应用驱动也可以独立安装在 Linux 或 Windows 机器上运行,更多细节请参考 [这里](../reference/taos-shell/) 除执行 SQL 语句外,系统管理员还可以从 TDengine CLI 进行检查系统运行状态、添加删除用户账号等操作。TDengine CLI 连同应用驱动也可以独立安装在 Linux 或 Windows 机器上运行,更多细节请参考 [这里](../reference/taos-shell/)
## 使用 taosBenchmark 体验写入速度 ## 使用 taosBenchmark 体验写入速度
......
...@@ -65,6 +65,12 @@ extern "C" { ...@@ -65,6 +65,12 @@ extern "C" {
(list) = NULL; \ (list) = NULL; \
} while (0) } while (0)
#define NODES_CLEAR_LIST(list) \
do { \
nodesClearList((list)); \
(list) = NULL; \
} while (0)
typedef enum ENodeType { typedef enum ENodeType {
// Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN, // Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN,
// VALUE, OPERATOR, FUNCTION and so on. // VALUE, OPERATOR, FUNCTION and so on.
......
...@@ -69,6 +69,7 @@ typedef struct SScanLogicNode { ...@@ -69,6 +69,7 @@ typedef struct SScanLogicNode {
int16_t tsColId; int16_t tsColId;
double filesFactor; double filesFactor;
SArray* pSmaIndexes; SArray* pSmaIndexes;
SNodeList* pPartTags;
} SScanLogicNode; } SScanLogicNode;
typedef struct SJoinLogicNode { typedef struct SJoinLogicNode {
...@@ -257,7 +258,7 @@ typedef struct STableScanPhysiNode { ...@@ -257,7 +258,7 @@ typedef struct STableScanPhysiNode {
double ratio; double ratio;
int32_t dataRequired; int32_t dataRequired;
SNodeList* pDynamicScanFuncs; SNodeList* pDynamicScanFuncs;
SNodeList* pPartitionKeys; SNodeList* pPartitionTags;
int64_t interval; int64_t interval;
int64_t offset; int64_t offset;
int64_t sliding; int64_t sliding;
......
...@@ -873,7 +873,8 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) ...@@ -873,7 +873,8 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
pAction->rawWritten = 0; pAction->rawWritten = 0;
pAction->msgSent = 0; pAction->msgSent = 0;
pAction->msgReceived = 0; pAction->msgReceived = 0;
if (pAction->errCode == TSDB_CODE_RPC_REDIRECT) { if (pAction->errCode == TSDB_CODE_RPC_REDIRECT || pAction->errCode == TSDB_CODE_SYN_NOT_IN_NEW_CONFIG ||
pAction->errCode == TSDB_CODE_SYN_INTERNAL_ERROR || pAction->errCode == TSDB_CODE_SYN_NOT_LEADER) {
pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps; pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps;
mDebug("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage), mDebug("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage),
action, pAction->epSet.inUse); action, pAction->epSet.inUse);
...@@ -1126,7 +1127,6 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) ...@@ -1126,7 +1127,6 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
pTrans->code = terrno; pTrans->code = terrno;
mError("trans:%d, %s:%d is executed and failed to sync to other mnodes since %s", pTrans->id, mError("trans:%d, %s:%d is executed and failed to sync to other mnodes since %s", pTrans->id,
mndTransStr(pAction->stage), pAction->id, terrstr()); mndTransStr(pAction->stage), pAction->id, terrstr());
break;
} }
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id); mDebug("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id);
...@@ -1134,8 +1134,6 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) ...@@ -1134,8 +1134,6 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
} else { } else {
terrno = code; terrno = code;
pTrans->code = code; pTrans->code = code;
mError("trans:%d, %s:%d failed to execute since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id,
terrstr());
break; break;
} }
} }
......
...@@ -72,7 +72,17 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -72,7 +72,17 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
return -1; return -1;
} }
return syncPropose(pVnode->sync, &rpcMsg, false); int32_t code = syncPropose(pVnode->sync, &rpcMsg, false);
if (code != 0) {
vDebug("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr());
if (syncLeaderTransfer(pVnode->sync) != 0) {
vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr());
} else {
vDebug("vgId:%d, transfer leader success, propose reconfig config again", TD_VID(pVnode));
}
}
return code;
} }
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
......
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tref.h"
#include "filter.h" #include "filter.h"
#include "function.h" #include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
...@@ -21,6 +20,7 @@ ...@@ -21,6 +20,7 @@
#include "querynodes.h" #include "querynodes.h"
#include "tfill.h" #include "tfill.h"
#include "tname.h" #include "tname.h"
#include "tref.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h" #include "tglobal.h"
...@@ -79,7 +79,7 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { ...@@ -79,7 +79,7 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
#define realloc u_realloc #define realloc u_realloc
#endif #endif
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) #define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; } int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; }
...@@ -2395,7 +2395,7 @@ typedef struct SFetchRspHandleWrapper { ...@@ -2395,7 +2395,7 @@ typedef struct SFetchRspHandleWrapper {
} SFetchRspHandleWrapper; } SFetchRspHandleWrapper;
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) { int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*) param; SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId); SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
if (pExchangeInfo == NULL) { if (pExchangeInfo == NULL) {
...@@ -2403,7 +2403,7 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) ...@@ -2403,7 +2403,7 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t index = pWrapper->sourceIndex; int32_t index = pWrapper->sourceIndex;
SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index); SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
...@@ -2411,9 +2411,9 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) ...@@ -2411,9 +2411,9 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code)
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp; SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
pRsp->numOfRows = htonl(pRsp->numOfRows); pRsp->numOfRows = htonl(pRsp->numOfRows);
pRsp->compLen = htonl(pRsp->compLen); pRsp->compLen = htonl(pRsp->compLen);
pRsp->numOfCols = htonl(pRsp->numOfCols); pRsp->numOfCols = htonl(pRsp->numOfCols);
pRsp->useconds = htobe64(pRsp->useconds); pRsp->useconds = htobe64(pRsp->useconds);
ASSERT(pRsp != NULL); ASSERT(pRsp != NULL);
qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows); qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows);
...@@ -2475,9 +2475,9 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf ...@@ -2475,9 +2475,9 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources); pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources);
pMsg->header.vgId = htonl(pSource->addr.nodeId); pMsg->header.vgId = htonl(pSource->addr.nodeId);
pMsg->sId = htobe64(pSource->schedId); pMsg->sId = htobe64(pSource->schedId);
pMsg->taskId = htobe64(pSource->taskId); pMsg->taskId = htobe64(pSource->taskId);
pMsg->queryId = htobe64(pTaskInfo->id.queryId); pMsg->queryId = htobe64(pTaskInfo->id.queryId);
// send the fetch remote task result reques // send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
...@@ -2489,14 +2489,14 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf ...@@ -2489,14 +2489,14 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
} }
SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper)); SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
pWrapper->exchangeId = pExchangeInfo->self; pWrapper->exchangeId = pExchangeInfo->self;
pWrapper->sourceIndex = sourceIndex; pWrapper->sourceIndex = sourceIndex;
pMsgSendInfo->param = pWrapper; pMsgSendInfo->param = pWrapper;
pMsgSendInfo->msgInfo.pData = pMsg; pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
pMsgSendInfo->msgType = TDMT_VND_FETCH; pMsgSendInfo->msgType = TDMT_VND_FETCH;
pMsgSendInfo->fp = loadRemoteDataCallback; pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0; int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo); int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
...@@ -2645,7 +2645,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx ...@@ -2645,7 +2645,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) { if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 qDebug("%s vgId:%d, taskId:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
", completed:%d try next %d/%"PRIzu, ", completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pDataInfo->totalRows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources); pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
...@@ -2663,8 +2663,9 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx ...@@ -2663,8 +2663,9 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
} }
if (pRsp->completed == 1) { if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 ", completed:%d try next %d/%" PRIzu, " index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64
", completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pRes->info.rows, pDataInfo->totalRows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pRes->info.rows, pDataInfo->totalRows,
pLoadInfo->totalRows, pLoadInfo->totalSize, completed + 1, i + 1, totalSources); pLoadInfo->totalRows, pLoadInfo->totalSize, completed + 1, i + 1, totalSources);
completed += 1; completed += 1;
...@@ -2718,7 +2719,7 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { ...@@ -2718,7 +2719,7 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
int64_t endTs = taosGetTimestampUs(); int64_t endTs = taosGetTimestampUs();
qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo), qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
totalSources, (endTs - startTs)/1000.0); totalSources, (endTs - startTs) / 1000.0);
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
pOperator->cost.openCost = taosGetTimestampUs() - startTs; pOperator->cost.openCost = taosGetTimestampUs() - startTs;
...@@ -2849,8 +2850,8 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const ...@@ -2849,8 +2850,8 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const
SSourceDataInfo dataInfo = {0}; SSourceDataInfo dataInfo = {0};
dataInfo.status = EX_SOURCE_DATA_NOT_READY; dataInfo.status = EX_SOURCE_DATA_NOT_READY;
dataInfo.taskId = id; dataInfo.taskId = id;
dataInfo.index = i; dataInfo.index = i;
SSourceDataInfo *pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
if (pDs == NULL) { if (pDs == NULL) {
taosArrayDestroy(pInfo->pSourceDataInfo); taosArrayDestroy(pInfo->pSourceDataInfo);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -2864,7 +2865,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* ...@@ -2864,7 +2865,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints); size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
if (numOfSources == 0) { if (numOfSources == 0) {
qError("%s invalid number: %d of sources in exchange operator", id, (int32_t) numOfSources); qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
...@@ -2898,16 +2899,16 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode ...@@ -2898,16 +2899,16 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
tsem_init(&pInfo->ready, 0, 0); tsem_init(&pInfo->ready, 0, 0);
pInfo->seqLoadData = false; pInfo->seqLoadData = false;
pInfo->pTransporter = pTransporter; pInfo->pTransporter = pTransporter;
pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc); pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
pOperator->name = "ExchangeOperator"; pOperator->name = "ExchangeOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfExprs = pInfo->pResult->info.numOfCols; pOperator->numOfExprs = pInfo->pResult->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
destroyExchangeOperatorInfo, NULL, NULL, NULL); destroyExchangeOperatorInfo, NULL, NULL, NULL);
...@@ -4066,7 +4067,7 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -4066,7 +4067,7 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
} }
void doDestroyExchangeOperatorInfo(void* param) { void doDestroyExchangeOperatorInfo(void* param) {
SExchangeInfo* pExInfo = (SExchangeInfo*) param; SExchangeInfo* pExInfo = (SExchangeInfo*)param;
taosArrayDestroy(pExInfo->pSources); taosArrayDestroy(pExInfo->pSources);
taosArrayDestroy(pExInfo->pSourceDataInfo); taosArrayDestroy(pExInfo->pSourceDataInfo);
...@@ -4554,8 +4555,8 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI ...@@ -4554,8 +4555,8 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey){ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey) {
if(groupKey == NULL) { if (groupKey == NULL) {
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
...@@ -4564,11 +4565,11 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, ...@@ -4564,11 +4565,11 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
int32_t keyLen = 0; int32_t keyLen = 0;
void *keyBuf = NULL; void* keyBuf = NULL;
int32_t numOfGroupCols = taosArrayGetSize(groupKey); int32_t numOfGroupCols = taosArrayGetSize(groupKey);
for (int32_t j = 0; j < numOfGroupCols; ++j) { for (int32_t j = 0; j < numOfGroupCols; ++j) {
SColumn* pCol = taosArrayGet(groupKey, j); SColumn* pCol = taosArrayGet(groupKey, j);
keyLen += pCol->bytes; // actual data + null_flag keyLen += pCol->bytes; // actual data + null_flag
} }
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
...@@ -4579,9 +4580,9 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, ...@@ -4579,9 +4580,9 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
for(int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++){ for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
STableKeyInfo *info = taosArrayGet(pTableListInfo->pTableList, i); STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0); metaReaderInit(&mr, pHandle->meta, 0);
metaGetTableEntryByUid(&mr, info->uid); metaGetTableEntryByUid(&mr, info->uid);
...@@ -4590,23 +4591,23 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, ...@@ -4590,23 +4591,23 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
for (int32_t j = 0; j < numOfGroupCols; ++j) { for (int32_t j = 0; j < numOfGroupCols; ++j) {
SColumn* pCol = taosArrayGet(groupKey, j); SColumn* pCol = taosArrayGet(groupKey, j);
if(strcmp(pCol->name, "tbname") == 0){ if (strcmp(pCol->name, "tbname") == 0) {
isNull[i] = 0; isNull[i] = 0;
memcpy(pStart, mr.me.name, strlen(mr.me.name)); memcpy(pStart, mr.me.name, strlen(mr.me.name));
pStart += strlen(mr.me.name); pStart += strlen(mr.me.name);
}else{ } else {
STagVal tagVal = {0}; STagVal tagVal = {0};
tagVal.cid = pCol->colId; tagVal.cid = pCol->colId;
const char* p = metaGetTableTagVal(&mr.me, pCol->type, &tagVal); const char* p = metaGetTableTagVal(&mr.me, pCol->type, &tagVal);
if(p == NULL){ if (p == NULL) {
isNull[j] = 1; isNull[j] = 1;
continue; continue;
} }
isNull[i] = 0; isNull[i] = 0;
if (pCol->type == TSDB_DATA_TYPE_JSON) { if (pCol->type == TSDB_DATA_TYPE_JSON) {
// int32_t dataLen = getJsonValueLen(pkey->pData); // int32_t dataLen = getJsonValueLen(pkey->pData);
// memcpy(pStart, (pkey->pData), dataLen); // memcpy(pStart, (pkey->pData), dataLen);
// pStart += dataLen; // pStart += dataLen;
} else if (IS_VAR_DATA_TYPE(pCol->type)) { } else if (IS_VAR_DATA_TYPE(pCol->type)) {
memcpy(pStart, tagVal.pData, tagVal.nData); memcpy(pStart, tagVal.pData, tagVal.nData);
pStart += tagVal.nData; pStart += tagVal.nData;
...@@ -4618,7 +4619,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, ...@@ -4618,7 +4619,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
} }
} }
int32_t len = (int32_t) (pStart - (char*)keyBuf); int32_t len = (int32_t)(pStart - (char*)keyBuf);
uint64_t* groupId = taosHashGet(pTableListInfo->map, keyBuf, len); uint64_t* groupId = taosHashGet(pTableListInfo->map, keyBuf, len);
if (groupId) { if (groupId) {
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), groupId, sizeof(uint64_t)); taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), groupId, sizeof(uint64_t));
...@@ -4653,7 +4654,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4653,7 +4654,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys); SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
taosArrayDestroy(groupKeys); taosArrayDestroy(groupKeys);
if (code) { if (code) {
...@@ -4661,7 +4662,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4661,7 +4662,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
STableScanInfo* pScanInfo = pOperator->info; STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
return pOperator; return pOperator;
...@@ -4671,11 +4672,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4671,11 +4672,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SArray* dataReaders = taosArrayInit(8, POINTER_BYTES); SArray* dataReaders = taosArrayInit(8, POINTER_BYTES);
createMultipleDataReaders(pTableScanNode, pHandle, pTableListInfo, dataReaders, queryId, taskId, pTagCond); createMultipleDataReaders(pTableScanNode, pHandle, pTableListInfo, dataReaders, queryId, taskId, pTagCond);
extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys); SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
taosArrayDestroy(groupKeys); taosArrayDestroy(groupKeys);
SOperatorInfo* pOperator = SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, dataReaders, pHandle, pTaskInfo);
createTableMergeScanOperatorInfo(pTableScanNode, dataReaders, pHandle, pTaskInfo);
STableScanInfo* pScanInfo = pOperator->info; STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
return pOperator; return pOperator;
...@@ -4700,16 +4700,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4700,16 +4700,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo)); qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
} }
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys); SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
taosArrayDestroy(groupKeys); taosArrayDestroy(groupKeys);
if (code){ if (code) {
tsdbCleanupReadHandle(pDataReader); tsdbCleanupReadHandle(pDataReader);
return NULL; return NULL;
} }
SOperatorInfo* pOperator = SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup);
createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup);
return pOperator; return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
...@@ -4718,7 +4717,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4718,7 +4717,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo, pScanPhyNode->node.pConditions); int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo,
pScanPhyNode->node.pConditions);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
...@@ -4796,7 +4796,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4796,7 +4796,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream); createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
SMergeIntervalPhysiNode * pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode; SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
...@@ -5034,7 +5034,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { ...@@ -5034,7 +5034,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
} }
SArray* extractPartitionColInfo(SNodeList* pNodeList) { SArray* extractPartitionColInfo(SNodeList* pNodeList) {
if(!pNodeList) return NULL; if (!pNodeList) return NULL;
size_t numOfCols = LIST_LENGTH(pNodeList); size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn)); SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
if (pList == NULL) { if (pList == NULL) {
...@@ -5139,7 +5139,7 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa ...@@ -5139,7 +5139,7 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa
SArray* res = taosArrayInit(8, sizeof(uint64_t)); SArray* res = taosArrayInit(8, sizeof(uint64_t));
code = doFilterTag(pTagCond, &metaArg, res); code = doFilterTag(pTagCond, &metaArg, res);
if (code == TSDB_CODE_INDEX_REBUILDING){ // todo if (code == TSDB_CODE_INDEX_REBUILDING) { // todo
// doFilter(); // doFilter();
} else if (code != TSDB_CODE_SUCCESS) { } else if (code != TSDB_CODE_SUCCESS) {
qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid); qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid);
...@@ -5499,4 +5499,3 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF ...@@ -5499,4 +5499,3 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
} }
return code; return code;
} }
...@@ -355,6 +355,7 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { ...@@ -355,6 +355,7 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(tsColId); COPY_SCALAR_FIELD(tsColId);
COPY_SCALAR_FIELD(filesFactor); COPY_SCALAR_FIELD(filesFactor);
CLONE_NODE_LIST_FIELD(pPartTags);
return (SNode*)pDst; return (SNode*)pDst;
} }
...@@ -495,7 +496,7 @@ static SNode* physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhys ...@@ -495,7 +496,7 @@ static SNode* physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhys
COPY_SCALAR_FIELD(ratio); COPY_SCALAR_FIELD(ratio);
COPY_SCALAR_FIELD(dataRequired); COPY_SCALAR_FIELD(dataRequired);
CLONE_NODE_LIST_FIELD(pDynamicScanFuncs); CLONE_NODE_LIST_FIELD(pDynamicScanFuncs);
CLONE_NODE_LIST_FIELD(pPartitionKeys); CLONE_NODE_LIST_FIELD(pPartitionTags);
COPY_SCALAR_FIELD(interval); COPY_SCALAR_FIELD(interval);
COPY_SCALAR_FIELD(offset); COPY_SCALAR_FIELD(offset);
COPY_SCALAR_FIELD(sliding); COPY_SCALAR_FIELD(sliding);
......
...@@ -515,6 +515,7 @@ static const char* jkScanLogicPlanScanPseudoCols = "ScanPseudoCols"; ...@@ -515,6 +515,7 @@ static const char* jkScanLogicPlanScanPseudoCols = "ScanPseudoCols";
static const char* jkScanLogicPlanTableId = "TableId"; static const char* jkScanLogicPlanTableId = "TableId";
static const char* jkScanLogicPlanTableType = "TableType"; static const char* jkScanLogicPlanTableType = "TableType";
static const char* jkScanLogicPlanTagCond = "TagCond"; static const char* jkScanLogicPlanTagCond = "TagCond";
static const char* jkScanLogicPlanPartTags = "PartTags";
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj; const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
...@@ -535,6 +536,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { ...@@ -535,6 +536,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkScanLogicPlanTagCond, nodeToJson, pNode->pTagCond); code = tjsonAddObject(pJson, jkScanLogicPlanTagCond, nodeToJson, pNode->pTagCond);
} }
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkScanLogicPlanPartTags, pNode->pPartTags);
}
return code; return code;
} }
...@@ -559,6 +563,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) { ...@@ -559,6 +563,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkScanLogicPlanTagCond, &pNode->pTagCond); code = jsonToNodeObject(pJson, jkScanLogicPlanTagCond, &pNode->pTagCond);
} }
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkScanLogicPlanPartTags, &pNode->pPartTags);
}
return code; return code;
} }
...@@ -1368,6 +1375,7 @@ static const char* jkTableScanPhysiPlanTriggerType = "triggerType"; ...@@ -1368,6 +1375,7 @@ static const char* jkTableScanPhysiPlanTriggerType = "triggerType";
static const char* jkTableScanPhysiPlanWatermark = "watermark"; static const char* jkTableScanPhysiPlanWatermark = "watermark";
static const char* jkTableScanPhysiPlanTsColId = "tsColId"; static const char* jkTableScanPhysiPlanTsColId = "tsColId";
static const char* jkTableScanPhysiPlanFilesFactor = "FilesFactor"; static const char* jkTableScanPhysiPlanFilesFactor = "FilesFactor";
static const char* jkTableScanPhysiPlanPartitionTags = "PartitionTags";
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
...@@ -1421,6 +1429,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { ...@@ -1421,6 +1429,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddDoubleToObject(pJson, jkTableScanPhysiPlanFilesFactor, pNode->filesFactor); code = tjsonAddDoubleToObject(pJson, jkTableScanPhysiPlanFilesFactor, pNode->filesFactor);
} }
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkTableScanPhysiPlanPartitionTags, pNode->pPartitionTags);
}
return code; return code;
} }
...@@ -1446,30 +1457,24 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { ...@@ -1446,30 +1457,24 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanDataRequired, pNode->dataRequired, code); tjsonGetNumberValue(pJson, jkTableScanPhysiPlanDataRequired, pNode->dataRequired, code);
;
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkTableScanPhysiPlanDynamicScanFuncs, &pNode->pDynamicScanFuncs); code = jsonToNodeList(pJson, jkTableScanPhysiPlanDynamicScanFuncs, &pNode->pDynamicScanFuncs);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanInterval, pNode->interval, code); tjsonGetNumberValue(pJson, jkTableScanPhysiPlanInterval, pNode->interval, code);
;
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanOffset, pNode->offset, code); tjsonGetNumberValue(pJson, jkTableScanPhysiPlanOffset, pNode->offset, code);
;
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSliding, pNode->sliding, code); tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSliding, pNode->sliding, code);
;
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanIntervalUnit, pNode->intervalUnit, code); tjsonGetNumberValue(pJson, jkTableScanPhysiPlanIntervalUnit, pNode->intervalUnit, code);
;
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSlidingUnit, pNode->slidingUnit, code); tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSlidingUnit, pNode->slidingUnit, code);
;
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanTriggerType, pNode->triggerType, code); tjsonGetNumberValue(pJson, jkTableScanPhysiPlanTriggerType, pNode->triggerType, code);
...@@ -1483,6 +1488,10 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { ...@@ -1483,6 +1488,10 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanFilesFactor, &pNode->filesFactor); code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanFilesFactor, &pNode->filesFactor);
} }
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkTableScanPhysiPlanPartitionTags, &pNode->pPartitionTags);
}
return code; return code;
} }
......
...@@ -109,9 +109,8 @@ static bool osdMayBeOptimized(SLogicNode* pNode) { ...@@ -109,9 +109,8 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
return false; return false;
} }
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent) || if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent) ||
(QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode->pParent) && (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode->pParent) && pNode->pParent->pParent &&
pNode->pParent->pParent && QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent->pParent))) {
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent->pParent)) ) {
return true; return true;
} }
return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys); return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys);
...@@ -222,9 +221,8 @@ static int32_t osdGetDataRequired(SNodeList* pFuncs) { ...@@ -222,9 +221,8 @@ static int32_t osdGetDataRequired(SNodeList* pFuncs) {
static void setScanWindowInfo(SScanLogicNode* pScan) { static void setScanWindowInfo(SScanLogicNode* pScan) {
SLogicNode* pParent = pScan->node.pParent; SLogicNode* pParent = pScan->node.pParent;
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pParent) && if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pParent) && pParent->pParent &&
pParent->pParent && QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent->pParent)) {
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent->pParent)) {
pParent = pParent->pParent; pParent = pParent->pParent;
} }
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent)) { if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent)) {
...@@ -1041,12 +1039,55 @@ static int32_t smaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) ...@@ -1041,12 +1039,55 @@ static int32_t smaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan)
return smaOptimizeImpl(pCxt, pLogicSubplan, pScan); return smaOptimizeImpl(pCxt, pLogicSubplan, pScan);
} }
static EDealRes partTagsOptHasColImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
if (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType) {
*(bool*)pContext = true;
return DEAL_RES_END;
}
}
return DEAL_RES_CONTINUE;
}
static bool partTagsOptHasCol(SNodeList* pPartKeys) {
bool hasCol = false;
nodesWalkExprs(pPartKeys, partTagsOptHasColImpl, &hasCol);
return hasCol;
}
static bool partTagsOptMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_PARTITION != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
return false;
}
return !partTagsOptHasCol(((SPartitionLogicNode*)pNode)->pPartitionKeys);
}
static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SPartitionLogicNode* pPart =
(SPartitionLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, partTagsOptMayBeOptimized);
if (NULL == pPart) {
return TSDB_CODE_SUCCESS;
}
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPart->node.pChildren, 0);
TSWAP(pPart->pPartitionKeys, pScan->pPartTags);
int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pPart, (SLogicNode*)pScan);
if (TSDB_CODE_SUCCESS == code) {
NODES_CLEAR_LIST(pPart->node.pChildren);
nodesDestroyNode((SNode*)pPart);
}
return code;
}
// clang-format off // clang-format off
static const SOptimizeRule optimizeRuleSet[] = { static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "OptimizeScanData", .optimizeFunc = osdOptimize}, {.pName = "OptimizeScanData", .optimizeFunc = osdOptimize},
{.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize}, {.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize},
{.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize}, {.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize},
{.pName = "SmaIndex", .optimizeFunc = smaOptimize} {.pName = "SmaIndex", .optimizeFunc = smaOptimize},
{.pName = "PartitionByTags", .optimizeFunc = partTagsOptimize}
}; };
// clang-format on // clang-format on
......
...@@ -500,7 +500,9 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp ...@@ -500,7 +500,9 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName); tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
pTableScan->dataRequired = pScanLogicNode->dataRequired; pTableScan->dataRequired = pScanLogicNode->dataRequired;
pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs); pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
if (NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) { pTableScan->pPartitionTags = nodesCloneList(pScanLogicNode->pPartTags);
if ((NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) ||
(NULL != pScanLogicNode->pPartTags && NULL == pTableScan->pPartitionTags)) {
nodesDestroyNode((SNode*)pTableScan); nodesDestroyNode((SNode*)pTableScan);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
......
...@@ -4,6 +4,11 @@ system sh/deploy.sh -n dnode2 -i 2 ...@@ -4,6 +4,11 @@ system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3 system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4 system sh/deploy.sh -n dnode4 -i 4
system sh/deploy.sh -n dnode5 -i 5 system sh/deploy.sh -n dnode5 -i 5
system sh/cfg.sh -n dnode1 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode2 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode3 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode4 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode5 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start system sh/exec.sh -n dnode2 -s start
...@@ -128,9 +133,9 @@ if $rows != 1 then ...@@ -128,9 +133,9 @@ if $rows != 1 then
endi endi
if $data(2)[4] == leader then if $data(2)[4] == leader then
$leaderExist = 1 $leaderExist = 1
$leaderVnode = 4 $leaderVnode = 2
$follower1 = 2 $follower1 = 3
$follower2 = 3 $follower2 = 4
endi endi
if $data(2)[6] == leader then if $data(2)[6] == leader then
$leaderExist = 1 $leaderExist = 1
...@@ -140,9 +145,9 @@ if $data(2)[6] == leader then ...@@ -140,9 +145,9 @@ if $data(2)[6] == leader then
endi endi
if $data(2)[8] == leader then if $data(2)[8] == leader then
$leaderExist = 1 $leaderExist = 1
$leaderVnode = 2 $leaderVnode = 4
$follower1 = 3 $follower1 = 2
$follower2 = 4 $follower2 = 3
endi endi
if $leaderExist != 1 then if $leaderExist != 1 then
goto step3 goto step3
...@@ -171,8 +176,6 @@ if $rows != 1 then ...@@ -171,8 +176,6 @@ if $rows != 1 then
return -1 return -1
endi endi
return
print =============== step33: move follower1 print =============== step33: move follower1
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5 print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5 sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
......
...@@ -250,9 +250,26 @@ class TDTestCase: ...@@ -250,9 +250,26 @@ class TDTestCase:
# wait db ready # wait db ready
while 1: while 1:
tdSql.query("show databases") tdSql.query("show databases")
if tdSql.getRows() == 4: if tdSql.getRows() == 4:
print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),) print ('==================================================')
break print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0))
index = 0
if tdSql.getData(0,0) == parameterDict['dbName']:
index = 0
elif tdSql.getData(1,0) == parameterDict['dbName']:
index = 1
elif tdSql.getData(2,0) == parameterDict['dbName']:
index = 2
elif tdSql.getData(3,0) == parameterDict['dbName']:
index = 3
else:
continue
if tdSql.getData(index,19) == 'ready':
print("******************** index: %d"%index)
break
continue
else: else:
time.sleep(1) time.sleep(1)
...@@ -378,8 +395,27 @@ class TDTestCase: ...@@ -378,8 +395,27 @@ class TDTestCase:
while 1: while 1:
tdSql.query("show databases") tdSql.query("show databases")
if tdSql.getRows() == 5: if tdSql.getRows() == 5:
print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),) print ('==================================================')
break print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),tdSql.getData(3,0),tdSql.getData(4,0))
index = 0
if tdSql.getData(0,0) == parameterDict['dbName']:
index = 0
elif tdSql.getData(1,0) == parameterDict['dbName']:
index = 1
elif tdSql.getData(2,0) == parameterDict['dbName']:
index = 2
elif tdSql.getData(3,0) == parameterDict['dbName']:
index = 3
elif tdSql.getData(4,0) == parameterDict['dbName']:
index = 4
else:
continue
if tdSql.getData(index,19) == 'ready':
print("******************** index: %d"%index)
break
continue
else: else:
time.sleep(1) time.sleep(1)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册