提交 5a56e396 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/vnode_compact

...@@ -243,8 +243,8 @@ sudo launchctl load -w /Library/LaunchDaemons/limit.maxfiles.plist ...@@ -243,8 +243,8 @@ sudo launchctl load -w /Library/LaunchDaemons/limit.maxfiles.plist
``` ```
launchctl limit maxfiles launchctl limit maxfiles
``` ```
### 19 建库时提示Out of dnode ### 20 建库时提示 Out of dnodes
该提示是创建db的vnode数量不够了,需要的vnode不能超过了dnode中vnode的上限。因为系统默认是一个dnode中有cpu核数两倍的vnode,也可以通过配置文件中的参数supportVnodes控制。 该提示是创建 db 的 vnode 数量不够了,需要的 vnode 不能超过了 dnode 中 vnode 的上限。因为系统默认是一个 dnode 中有 CPU 核数两倍的 vnode,也可以通过配置文件中的参数 supportVnodes 控制。
正常调大taos.cfg种这个supportVnodes参数即可。 正常调大 taos.cfg 中 supportVnodes 参数即可。
...@@ -562,6 +562,7 @@ typedef struct SQueryInserterNode { ...@@ -562,6 +562,7 @@ typedef struct SQueryInserterNode {
char tableName[TSDB_TABLE_NAME_LEN]; char tableName[TSDB_TABLE_NAME_LEN];
int32_t vgId; int32_t vgId;
SEpSet epSet; SEpSet epSet;
bool explain;
} SQueryInserterNode; } SQueryInserterNode;
typedef struct SDataDeleterNode { typedef struct SDataDeleterNode {
......
...@@ -311,7 +311,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -311,7 +311,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t srcVgId = req.srcVgId; int32_t srcVgId = req.srcVgId;
int32_t dstVgId = req.dstVgId; int32_t dstVgId = req.dstVgId;
dInfo("vgId:%d, start to alter vnode hashrange[%u, %u), dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd, dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
req.dstVgId); req.dstVgId);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, srcVgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, srcVgId);
...@@ -342,7 +342,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -342,7 +342,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
dInfo("vgId:%d, start to open vnode", dstVgId); dInfo("vgId:%d, open vnode", dstVgId);
SVnode *pImpl = vnodeOpen(dstPath, pMgmt->pTfs, pMgmt->msgCb); SVnode *pImpl = vnodeOpen(dstPath, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr()); dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
...@@ -376,7 +376,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -376,7 +376,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
int32_t vgId = alterReq.vgId; int32_t vgId = alterReq.vgId;
dInfo("vgId:%d, start to alter vnode, replica:%d selfIndex:%d strict:%d", vgId, alterReq.replica, alterReq.selfIndex, dInfo("vgId:%d, start to alter vnode replica:%d selfIndex:%d strict:%d", vgId, alterReq.replica, alterReq.selfIndex,
alterReq.strict); alterReq.strict);
for (int32_t i = 0; i < alterReq.replica; ++i) { for (int32_t i = 0; i < alterReq.replica; ++i) {
SReplica *pReplica = &alterReq.replicas[i]; SReplica *pReplica = &alterReq.replicas[i];
...@@ -423,7 +423,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -423,7 +423,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
dInfo("vgId:%d, start to open vnode", vgId); dInfo("vgId:%d, close vnode", vgId);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb); SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr()); dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
......
...@@ -57,7 +57,7 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -57,7 +57,7 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
if (IsReq(pMsg)) { if (IsReq(pMsg)) {
if (code != 0) { if (code != 0) {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
dGError("msg:%p, failed to process since %s", pMsg, terrstr(code)); dGError("msg:%p, failed to process since %s, type:%s", pMsg, terrstr(code), TMSG_INFO(pMsg->msgType));
} }
vmSendRsp(pMsg, code); vmSendRsp(pMsg, code);
} }
......
...@@ -669,9 +669,9 @@ _OVER: ...@@ -669,9 +669,9 @@ _OVER:
mGDebug( mGDebug(
"msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d " "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
"role:%s, redirect numOfEps:%d inUse:%d", "role:%s, redirect numOfEps:%d inUse:%d, type:%s",
pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, state.restored, pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, state.restored,
syncStr(state.restored), epSet.numOfEps, epSet.inUse); syncStr(state.restored), epSet.numOfEps, epSet.inUse, TMSG_INFO(pMsg->msgType));
if (epSet.numOfEps <= 0) return -1; if (epSet.numOfEps <= 0) return -1;
......
...@@ -428,7 +428,7 @@ static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, SVgObj *pVgroup, int ...@@ -428,7 +428,7 @@ static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, SVgObj *pVgroup, int
.hashEnd = pVgroup->hashEnd, .hashEnd = pVgroup->hashEnd,
}; };
mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, begin:%u, end:%u", pVgroup->vgId, dstVgId, mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", pVgroup->vgId, dstVgId,
pVgroup->hashBegin, pVgroup->hashEnd); pVgroup->hashBegin, pVgroup->hashEnd);
int32_t contLen = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &alterReq); int32_t contLen = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &alterReq);
if (contLen < 0) { if (contLen < 0) {
...@@ -1901,12 +1901,6 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj ...@@ -1901,12 +1901,6 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
} }
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
mInfo("vgId:%d, vgroup info after adjust replica, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg1.vgId,
newVg1.replica, newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
for (int32_t i = 0; i < newVg1.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
}
SVgObj newVg2 = {0}; SVgObj newVg2 = {0};
memcpy(&newVg2, &newVg1, sizeof(SVgObj)); memcpy(&newVg2, &newVg1, sizeof(SVgObj));
newVg1.replica = 1; newVg1.replica = 1;
...@@ -1918,13 +1912,13 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj ...@@ -1918,13 +1912,13 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
memcpy(&newVg2.vnodeGid[0], &newVg2.vnodeGid[1], sizeof(SVnodeGid)); memcpy(&newVg2.vnodeGid[0], &newVg2.vnodeGid[1], sizeof(SVnodeGid));
memset(&newVg2.vnodeGid[1], 0, sizeof(SVnodeGid)); memset(&newVg2.vnodeGid[1], 0, sizeof(SVnodeGid));
mInfo("vgId:%d, vgroup info after adjust hash, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg1.vgId, mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg1.vgId, newVg1.replica,
newVg1.replica, newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId); newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
for (int32_t i = 0; i < newVg1.replica; ++i) { for (int32_t i = 0; i < newVg1.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId); mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
} }
mInfo("vgId:%d, vgroup info after adjust hash, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg2.vgId, mInfo("vgId:%d, vgroup info after split, replica:%d hashrange:[%u, %u] vnode:0 dnode:%d", newVg2.vgId, newVg2.replica,
newVg2.replica, newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId); newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId);
for (int32_t i = 0; i < newVg1.replica; ++i) { for (int32_t i = 0; i < newVg1.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId); mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
} }
......
...@@ -171,8 +171,8 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod ...@@ -171,8 +171,8 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
return -1; return -1;
} }
vInfo("vgId:%d, start to alter hashrange from [%u, %u) to [%u, %u)", pReq->srcVgId, info.config.hashBegin, vInfo("vgId:%d, alter hashrange from [%u, %u] to [%u, %u]", pReq->srcVgId, info.config.hashBegin, info.config.hashEnd,
info.config.hashEnd, pReq->hashBegin, pReq->hashEnd); pReq->hashBegin, pReq->hashEnd);
info.config.vgId = pReq->dstVgId; info.config.vgId = pReq->dstVgId;
info.config.hashBegin = pReq->hashBegin; info.config.hashBegin = pReq->hashBegin;
info.config.hashEnd = pReq->hashEnd; info.config.hashEnd = pReq->hashEnd;
...@@ -204,7 +204,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod ...@@ -204,7 +204,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
return -1; return -1;
} }
vInfo("vgId:%d, start to rename %s to %s", pReq->dstVgId, srcPath, dstPath); vInfo("vgId:%d, rename %s to %s", pReq->dstVgId, srcPath, dstPath);
ret = vnodeRenameVgroupId(srcPath, dstPath, pReq->srcVgId, pReq->dstVgId, pTfs); ret = vnodeRenameVgroupId(srcPath, dstPath, pReq->srcVgId, pReq->dstVgId, pTfs);
if (ret < 0) { if (ret < 0) {
vError("vgId:%d, failed to rename vnode from %s to %s since %s", pReq->dstVgId, srcPath, dstPath, vError("vgId:%d, failed to rename vnode from %s to %s since %s", pReq->dstVgId, srcPath, dstPath,
......
...@@ -219,7 +219,7 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) ...@@ -219,7 +219,7 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle); isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);
if (!pVnode->restored) { if (!pVnode->restored) {
vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg); vGError("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_SYN_RESTORING; terrno = TSDB_CODE_SYN_RESTORING;
vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING); vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
...@@ -228,7 +228,7 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) ...@@ -228,7 +228,7 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
} }
if (pMsgArr == NULL || pIsWeakArr == NULL) { if (pMsgArr == NULL || pIsWeakArr == NULL) {
vGError("vgId:%d, msg:%p failed to process since out of memory", vgId, pMsg); vGError("vgId:%d, msg:%p failed to process since out of memory, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
vnodeHandleProposeError(pVnode, pMsg, terrno); vnodeHandleProposeError(pVnode, pMsg, terrno);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
...@@ -281,7 +281,7 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) ...@@ -281,7 +281,7 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
vnodeIsMsgBlock(pMsg->msgType), msg, numOfMsgs, pMsg->info.handle); vnodeIsMsgBlock(pMsg->msgType), msg, numOfMsgs, pMsg->info.handle);
if (!pVnode->restored) { if (!pVnode->restored) {
vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg); vGError("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING); vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
...@@ -635,7 +635,7 @@ int32_t vnodeSyncStart(SVnode *pVnode) { ...@@ -635,7 +635,7 @@ int32_t vnodeSyncStart(SVnode *pVnode) {
} }
void vnodeSyncPreClose(SVnode *pVnode) { void vnodeSyncPreClose(SVnode *pVnode) {
vInfo("vgId:%d, pre close sync", pVnode->config.vgId); vInfo("vgId:%d, sync pre close", pVnode->config.vgId);
syncLeaderTransfer(pVnode->sync); syncLeaderTransfer(pVnode->sync);
syncPreStop(pVnode->sync); syncPreStop(pVnode->sync);
...@@ -649,7 +649,7 @@ void vnodeSyncPreClose(SVnode *pVnode) { ...@@ -649,7 +649,7 @@ void vnodeSyncPreClose(SVnode *pVnode) {
} }
void vnodeSyncPostClose(SVnode *pVnode) { void vnodeSyncPostClose(SVnode *pVnode) {
vInfo("vgId:%d, post close sync", pVnode->config.vgId); vInfo("vgId:%d, sync post close", pVnode->config.vgId);
syncPostStop(pVnode->sync); syncPostStop(pVnode->sync);
} }
......
...@@ -46,6 +46,7 @@ typedef struct SDataInserterHandle { ...@@ -46,6 +46,7 @@ typedef struct SDataInserterHandle {
uint64_t cachedSize; uint64_t cachedSize;
TdThreadMutex mutex; TdThreadMutex mutex;
tsem_t ready; tsem_t ready;
bool explain;
} SDataInserterHandle; } SDataInserterHandle;
typedef struct SSubmitRspParam { typedef struct SSubmitRspParam {
...@@ -333,26 +334,28 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32 ...@@ -333,26 +334,28 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle; SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
taosArrayPush(pInserter->pDataBlocks, &pInput->pData); if (!pInserter->explain) {
void* pMsg = NULL; taosArrayPush(pInserter->pDataBlocks, &pInput->pData);
int32_t msgLen = 0; void* pMsg = NULL;
int32_t code = dataBlocksToSubmitReq(pInserter, &pMsg, &msgLen); int32_t msgLen = 0;
if (code) { int32_t code = dataBlocksToSubmitReq(pInserter, &pMsg, &msgLen);
return code; if (code) {
} return code;
}
taosArrayClear(pInserter->pDataBlocks); taosArrayClear(pInserter->pDataBlocks);
code = sendSubmitRequest(pInserter, pMsg, msgLen, pInserter->pParam->readHandle->pMsgCb->clientRpc, code = sendSubmitRequest(pInserter, pMsg, msgLen, pInserter->pParam->readHandle->pMsgCb->clientRpc,
&pInserter->pNode->epSet); &pInserter->pNode->epSet);
if (code) { if (code) {
return code; return code;
} }
tsem_wait(&pInserter->ready); tsem_wait(&pInserter->ready);
if (pInserter->submitRes.code) { if (pInserter->submitRes.code) {
return pInserter->submitRes.code; return pInserter->submitRes.code;
}
} }
*pContinue = true; *pContinue = true;
...@@ -412,6 +415,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat ...@@ -412,6 +415,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
inserter->pParam = pParam; inserter->pParam = pParam;
inserter->status = DS_BUF_EMPTY; inserter->status = DS_BUF_EMPTY;
inserter->queryEnd = false; inserter->queryEnd = false;
inserter->explain = pInserterNode->explain;
int64_t suid = 0; int64_t suid = 0;
int32_t code = int32_t code =
......
...@@ -178,7 +178,7 @@ const char* nodesNodeName(ENodeType type) { ...@@ -178,7 +178,7 @@ const char* nodesNodeName(ENodeType type) {
case QUERY_NODE_SHOW_DB_ALIVE_STMT: case QUERY_NODE_SHOW_DB_ALIVE_STMT:
return "ShowDbAliveStmt"; return "ShowDbAliveStmt";
case QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT: case QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT:
return "ShowClusterAliveStmt"; return "ShowClusterAliveStmt";
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT: case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
return "RedistributeVgroupStmt"; return "RedistributeVgroupStmt";
case QUERY_NODE_SPLIT_VGROUP_STMT: case QUERY_NODE_SPLIT_VGROUP_STMT:
...@@ -2597,6 +2597,7 @@ static const char* jkQueryInsertPhysiPlanTableType = "TableType"; ...@@ -2597,6 +2597,7 @@ static const char* jkQueryInsertPhysiPlanTableType = "TableType";
static const char* jkQueryInsertPhysiPlanTableFName = "TableFName"; static const char* jkQueryInsertPhysiPlanTableFName = "TableFName";
static const char* jkQueryInsertPhysiPlanVgId = "VgId"; static const char* jkQueryInsertPhysiPlanVgId = "VgId";
static const char* jkQueryInsertPhysiPlanEpSet = "EpSet"; static const char* jkQueryInsertPhysiPlanEpSet = "EpSet";
static const char* jkQueryInsertPhysiPlanExplain = "Explain";
static int32_t physiQueryInsertNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiQueryInsertNodeToJson(const void* pObj, SJson* pJson) {
const SQueryInserterNode* pNode = (const SQueryInserterNode*)pObj; const SQueryInserterNode* pNode = (const SQueryInserterNode*)pObj;
...@@ -2623,6 +2624,9 @@ static int32_t physiQueryInsertNodeToJson(const void* pObj, SJson* pJson) { ...@@ -2623,6 +2624,9 @@ static int32_t physiQueryInsertNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkQueryInsertPhysiPlanEpSet, epSetToJson, &pNode->epSet); code = tjsonAddObject(pJson, jkQueryInsertPhysiPlanEpSet, epSetToJson, &pNode->epSet);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkQueryInsertPhysiPlanExplain, pNode->explain);
}
return code; return code;
} }
...@@ -2652,6 +2656,9 @@ static int32_t jsonToPhysiQueryInsertNode(const SJson* pJson, void* pObj) { ...@@ -2652,6 +2656,9 @@ static int32_t jsonToPhysiQueryInsertNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkQueryInsertPhysiPlanEpSet, jsonToEpSet, &pNode->epSet); code = tjsonToObject(pJson, jkQueryInsertPhysiPlanEpSet, jsonToEpSet, &pNode->epSet);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkQueryInsertPhysiPlanExplain, &pNode->explain);
}
return code; return code;
} }
......
...@@ -3244,7 +3244,8 @@ enum { ...@@ -3244,7 +3244,8 @@ enum {
PHY_QUERY_INSERT_CODE_TABLE_TYPE, PHY_QUERY_INSERT_CODE_TABLE_TYPE,
PHY_QUERY_INSERT_CODE_TABLE_NAME, PHY_QUERY_INSERT_CODE_TABLE_NAME,
PHY_QUERY_INSERT_CODE_VG_ID, PHY_QUERY_INSERT_CODE_VG_ID,
PHY_QUERY_INSERT_CODE_EP_SET PHY_QUERY_INSERT_CODE_EP_SET,
PHY_QUERY_INSERT_CODE_EXPLAIN
}; };
static int32_t physiQueryInsertNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t physiQueryInsertNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
...@@ -3272,6 +3273,9 @@ static int32_t physiQueryInsertNodeToMsg(const void* pObj, STlvEncoder* pEncoder ...@@ -3272,6 +3273,9 @@ static int32_t physiQueryInsertNodeToMsg(const void* pObj, STlvEncoder* pEncoder
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_QUERY_INSERT_CODE_EP_SET, epSetToMsg, &pNode->epSet); code = tlvEncodeObj(pEncoder, PHY_QUERY_INSERT_CODE_EP_SET, epSetToMsg, &pNode->epSet);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_QUERY_INSERT_CODE_EXPLAIN, pNode->explain);
}
return code; return code;
} }
...@@ -3307,6 +3311,9 @@ static int32_t msgToPhysiQueryInsertNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -3307,6 +3311,9 @@ static int32_t msgToPhysiQueryInsertNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_QUERY_INSERT_CODE_EP_SET: case PHY_QUERY_INSERT_CODE_EP_SET:
code = tlvDecodeObjFromTlv(pTlv, msgToEpSet, &pNode->epSet); code = tlvDecodeObjFromTlv(pTlv, msgToEpSet, &pNode->epSet);
break; break;
case PHY_QUERY_INSERT_CODE_EXPLAIN:
code = tlvDecodeBool(pTlv, &pNode->explain);
break;
default: default:
break; break;
} }
......
...@@ -1718,6 +1718,7 @@ static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNod ...@@ -1718,6 +1718,7 @@ static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNod
strcpy(pInserter->tableName, pModify->tableName); strcpy(pInserter->tableName, pModify->tableName);
pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId; pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId;
pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet; pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet;
pInserter->explain = (QUERY_NODE_EXPLAIN_STMT == nodeType(pCxt->pPlanCxt->pAstRoot) ? true : false);
vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode); vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode);
int32_t code = setListSlotId(pCxt, pSubplan->pNode->pOutputDataBlockDesc->dataBlockId, -1, pModify->pInsertCols, int32_t code = setListSlotId(pCxt, pSubplan->pNode->pOutputDataBlockDesc->dataBlockId, -1, pModify->pInsertCols,
......
...@@ -537,7 +537,9 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs ...@@ -537,7 +537,9 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
schProcessOnDataFetched(pJob); if (!SCH_IS_INSERT_JOB(pJob)) {
schProcessOnDataFetched(pJob);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -682,7 +684,7 @@ void schFreeJobImpl(void *job) { ...@@ -682,7 +684,7 @@ void schFreeJobImpl(void *job) {
int32_t schJobFetchRows(SSchJob *pJob) { int32_t schJobFetchRows(SSchJob *pJob) {
int32_t code = 0; int32_t code = 0;
if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) { if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC) && !(SCH_IS_EXPLAIN_JOB(pJob) && SCH_IS_INSERT_JOB(pJob))) {
SCH_ERR_RET(schLaunchFetchTask(pJob)); SCH_ERR_RET(schLaunchFetchTask(pJob));
if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) { if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
......
...@@ -341,6 +341,14 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa ...@@ -341,6 +341,14 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
if (taosArrayGetSize(pTask->parents) == 0 && SCH_IS_EXPLAIN_JOB(pJob) && SCH_IS_INSERT_JOB(pJob)) {
SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
if (pRsp) {
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
}
}
SQueryTableRsp rsp = {0}; SQueryTableRsp rsp = {0};
if (tDeserializeSQueryTableRsp(msg, msgSize, &rsp) < 0) { if (tDeserializeSQueryTableRsp(msg, msgSize, &rsp) < 0) {
SCH_TASK_ELOG("tDeserializeSQueryTableRsp failed, msgSize:%d", msgSize); SCH_TASK_ELOG("tDeserializeSQueryTableRsp failed, msgSize:%d", msgSize);
......
...@@ -295,7 +295,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) { ...@@ -295,7 +295,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) {
for (int32_t id = 0; id < pTier->ndisk; id++) { for (int32_t id = 0; id < pTier->ndisk; id++) {
STfsDisk *pDisk = pTier->disks[id]; STfsDisk *pDisk = pTier->disks[id];
snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname); snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
uInfo("tfs remove dir : path:%s aname:%s rname:[%s]", pDisk->path, aname, rname); uInfo("tfs remove dir:%s aname:%s rname:[%s]", pDisk->path, aname, rname);
taosRemoveDir(aname); taosRemoveDir(aname);
} }
} }
......
...@@ -29,43 +29,62 @@ sql insert into tb4 values (now, 4, "Bitmap Heap Scan on tenk1 t1 (cost=5.07..2 ...@@ -29,43 +29,62 @@ sql insert into tb4 values (now, 4, "Bitmap Heap Scan on tenk1 t1 (cost=5.07..2
#sql create table tb4 using st2 tags(4); #sql create table tb4 using st2 tags(4);
#sql insert into tb4 values (now, 4, "Bitmap Heap Scan on tenk1 t1 (cost=5.07..229.20 rows=101 width=244) (actual time=0.080..0.526 rows=100 loops=1)"); #sql insert into tb4 values (now, 4, "Bitmap Heap Scan on tenk1 t1 (cost=5.07..229.20 rows=101 width=244) (actual time=0.080..0.526 rows=100 loops=1)");
# for explain insert into select
sql create table t1 (ts timestamp, f1 int, f2 binary(200), t1 int);
print ======== step2 print ======== step2
sql explain select * from st1 where -2; sql explain select * from st1 where -2;
sql explain insert into t1 select * from st1 where -2;
sql explain select ts from tb1; sql explain select ts from tb1;
sql explain insert into t1(ts) select ts from tb1;
sql explain select * from st1; sql explain select * from st1;
sql explain insert into t1 select * from st1;
sql explain select * from st1 order by ts; sql explain select * from st1 order by ts;
sql explain insert into t1 select * from st1 order by ts;
sql explain select * from information_schema.ins_stables; sql explain select * from information_schema.ins_stables;
sql explain select count(*),sum(f1) from tb1; sql explain select count(*),sum(f1) from tb1;
sql explain select count(*),sum(f1) from st1; sql explain select count(*),sum(f1) from st1;
sql explain select count(*),sum(f1) from st1 group by f1; sql explain select count(*),sum(f1) from st1 group by f1;
#sql explain select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev); #sql explain select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
sql explain insert into t1(ts, t1) select _wstart, count(*) from st1 interval(10s);
print ======== step3 print ======== step3
sql explain verbose true select * from st1 where -2; sql explain verbose true select * from st1 where -2;
sql explain verbose true insert into t1 select * from st1 where -2;
sql explain verbose true select ts from tb1 where f1 > 0; sql explain verbose true select ts from tb1 where f1 > 0;
sql explain verbose true insert into t1(ts) select ts from tb1 where f1 > 0;
sql explain verbose true select * from st1 where f1 > 0 and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00'; sql explain verbose true select * from st1 where f1 > 0 and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00';
sql explain verbose true insert into t1 select * from st1 where f1 > 0 and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00';
sql explain verbose true select count(*) from st1 partition by tbname slimit 1 soffset 2 limit 2 offset 1; sql explain verbose true select count(*) from st1 partition by tbname slimit 1 soffset 2 limit 2 offset 1;
sql explain verbose true select * from information_schema.ins_stables where db_name='db2'; sql explain verbose true select * from information_schema.ins_stables where db_name='db2';
sql explain verbose true select st1.f1 from st1 join st2 on st1.ts=st2.ts and st1.f1 > 0; sql explain verbose true select st1.f1 from st1 join st2 on st1.ts=st2.ts and st1.f1 > 0;
sql explain verbose true insert into t1(ts) select st1.f1 from st1 join st2 on st1.ts=st2.ts and st1.f1 > 0;
sql explain verbose true insert into t1(ts, t1) select _wstart, count(*) from st1 interval(10s);
print ======== step4 print ======== step4
sql explain analyze select ts from st1 where -2; sql explain analyze select ts from st1 where -2;
sql explain analyze insert into t1(ts) select ts from st1 where -2;
sql explain analyze select ts from tb1; sql explain analyze select ts from tb1;
sql explain analyze insert into t1(ts) select ts from tb1;
sql explain analyze select ts from st1; sql explain analyze select ts from st1;
sql explain analyze select ts from st1; sql explain analyze insert into t1(ts) select ts from st1;
sql explain analyze select ts from st1 order by ts; sql explain analyze select ts from st1 order by ts;
sql explain analyze insert into t1(ts) select ts from st1 order by ts;
sql explain analyze select * from information_schema.ins_stables; sql explain analyze select * from information_schema.ins_stables;
sql explain analyze select count(*),sum(f1) from tb1; sql explain analyze select count(*),sum(f1) from tb1;
sql explain analyze select count(*),sum(f1) from st1; sql explain analyze select count(*),sum(f1) from st1;
sql explain analyze select count(*),sum(f1) from st1 group by f1; sql explain analyze select count(*),sum(f1) from st1 group by f1;
sql explain analyze insert into t1(ts, t1) select _wstart, count(*) from st1 interval(10s);
print ======== step5 print ======== step5
sql explain analyze verbose true select ts from st1 where -2; sql explain analyze verbose true select ts from st1 where -2;
sql explain analyze verbose true insert into t1(ts) select ts from st1 where -2;
sql explain analyze verbose true select ts from tb1; sql explain analyze verbose true select ts from tb1;
sql explain analyze verbose true insert into t1(ts) select ts from tb1;
sql explain analyze verbose true select ts from st1; sql explain analyze verbose true select ts from st1;
sql explain analyze verbose true select ts from st1; sql explain analyze verbose true insert into t1(ts) select ts from st1;
sql explain analyze verbose true select ts from st1 order by ts; sql explain analyze verbose true select ts from st1 order by ts;
sql explain analyze verbose true insert into t1(ts) select ts from st1 order by ts;
sql explain analyze verbose true select * from information_schema.ins_stables; sql explain analyze verbose true select * from information_schema.ins_stables;
sql explain analyze verbose true select count(*),sum(f1) from tb1; sql explain analyze verbose true select count(*),sum(f1) from tb1;
sql explain analyze verbose true select count(*),sum(f1) from st1; sql explain analyze verbose true select count(*),sum(f1) from st1;
......
...@@ -183,6 +183,7 @@ class TDTestCase: ...@@ -183,6 +183,7 @@ class TDTestCase:
tdLog.info("restart taosd to ensure that the data falls into the disk") tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql.query("flush database %s"%(paraDict['dbName'])) tdSql.query("flush database %s"%(paraDict['dbName']))
time.sleep(10)
# update to half tables # update to half tables
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2) paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册