提交 afd01bb9 编写于 作者: H Hongze Cheng

Merge branch 'main' of https://github.com/taosdata/TDengine into feat/TS-2846

...@@ -124,7 +124,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) ...@@ -124,7 +124,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
vmFreeQueue(pMgmt, pVnode); vmFreeQueue(pMgmt, pVnode);
if (commitAndRemoveWal) { if (commitAndRemoveWal) {
dInfo("vgId:%d, commit data", pVnode->vgId); dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
vnodeSyncCommit(pVnode->pImpl); vnodeSyncCommit(pVnode->pImpl);
vnodeBegin(pVnode->pImpl); vnodeBegin(pVnode->pImpl);
dInfo("vgId:%d, commit data finished", pVnode->vgId); dInfo("vgId:%d, commit data finished", pVnode->vgId);
......
...@@ -1664,7 +1664,7 @@ _err: ...@@ -1664,7 +1664,7 @@ _err:
} }
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SDropIndexReq req = {0}; SDropIndexReq req = {0};
pRsp->msgType = TDMT_VND_CREATE_INDEX_RSP; pRsp->msgType = TDMT_VND_DROP_INDEX_RSP;
pRsp->code = TSDB_CODE_SUCCESS; pRsp->code = TSDB_CODE_SUCCESS;
pRsp->pCont = NULL; pRsp->pCont = NULL;
pRsp->contLen = 0; pRsp->contLen = 0;
...@@ -1673,6 +1673,7 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *p ...@@ -1673,6 +1673,7 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *p
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
if (metaDropIndexFromSTable(pVnode->pMeta, version, &req) < 0) { if (metaDropIndexFromSTable(pVnode->pMeta, version, &req) < 0) {
pRsp->code = terrno; pRsp->code = terrno;
return -1; return -1;
......
...@@ -1462,7 +1462,7 @@ SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) { ...@@ -1462,7 +1462,7 @@ SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) {
} }
void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc) { void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc) {
if (NULL == pTarget || NULL == pPos || NULL == pSrc) { if (NULL == pTarget || NULL == pPos || NULL == pSrc || NULL == pSrc->pHead) {
return; return;
} }
......
...@@ -2935,6 +2935,9 @@ static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect ...@@ -2935,6 +2935,9 @@ static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translateFillValues(pCxt, pSelect); code = translateFillValues(pCxt, pSelect);
} }
if (NULL == pSelect->pProjectionList || 0 >= pSelect->pProjectionList->length) {
code = TSDB_CODE_PAR_INVALID_SELECTED_EXPR;
}
return code; return code;
} }
......
...@@ -59,6 +59,8 @@ int32_t sclCreateColumnInfoData(SDataType *pType, int32_t numOfRows, SScalarPara ...@@ -59,6 +59,8 @@ int32_t sclCreateColumnInfoData(SDataType *pType, int32_t numOfRows, SScalarPara
pParam->columnData = pColumnData; pParam->columnData = pColumnData;
pParam->colAlloced = true; pParam->colAlloced = true;
pParam->numOfRows = numOfRows;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -740,6 +742,10 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp ...@@ -740,6 +742,10 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
SCL_ERR_JRET(code); SCL_ERR_JRET(code);
} }
if (rowNum == 0) {
goto _return;
}
code = (*ffpSet.process)(params, paramNum, output); code = (*ffpSet.process)(params, paramNum, output);
if (code) { if (code) {
sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
......
...@@ -1082,21 +1082,15 @@ void syncNodePreClose(SSyncNode* pSyncNode) { ...@@ -1082,21 +1082,15 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->pFsm != NULL); ASSERT(pSyncNode->pFsm != NULL);
ASSERT(pSyncNode->pFsm->FpApplyQueueItems != NULL); ASSERT(pSyncNode->pFsm->FpApplyQueueItems != NULL);
while (1) {
int32_t aqItems = pSyncNode->pFsm->FpApplyQueueItems(pSyncNode->pFsm);
sTrace("vgId:%d, pre close, %d items in apply queue", pSyncNode->vgId, aqItems);
if (aqItems == 0 || aqItems == -1) {
break;
}
taosMsleep(20);
}
// stop elect timer // stop elect timer
syncNodeStopElectTimer(pSyncNode); syncNodeStopElectTimer(pSyncNode);
// stop heartbeat timer // stop heartbeat timer
syncNodeStopHeartbeatTimer(pSyncNode); syncNodeStopHeartbeatTimer(pSyncNode);
// stop ping timer
syncNodeStopPingTimer(pSyncNode);
// clean rsp // clean rsp
syncRespCleanRsp(pSyncNode->pSyncRespMgr); syncRespCleanRsp(pSyncNode->pSyncRespMgr);
} }
...@@ -1120,10 +1114,11 @@ void syncNodeClose(SSyncNode* pSyncNode) { ...@@ -1120,10 +1114,11 @@ void syncNodeClose(SSyncNode* pSyncNode) {
if (pSyncNode == NULL) return; if (pSyncNode == NULL) return;
sNInfo(pSyncNode, "sync close, node:%p", pSyncNode); sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);
syncRespCleanRsp(pSyncNode->pSyncRespMgr);
syncNodeStopPingTimer(pSyncNode); syncNodeStopPingTimer(pSyncNode);
syncNodeStopElectTimer(pSyncNode); syncNodeStopElectTimer(pSyncNode);
syncNodeStopHeartbeatTimer(pSyncNode); syncNodeStopHeartbeatTimer(pSyncNode);
syncNodeLogReplMgrDestroy(pSyncNode); syncNodeLogReplMgrDestroy(pSyncNode);
syncRespMgrDestroy(pSyncNode->pSyncRespMgr); syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
......
...@@ -425,21 +425,6 @@ void cliHandleResp(SCliConn* conn) { ...@@ -425,21 +425,6 @@ void cliHandleResp(SCliConn* conn) {
tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
} }
// if (TMSG_INFO(pHead->msgType - 1) != 0) {
// char buf[128] = {0};
// sprintf(buf, "%s", TMSG_INFO(pHead->msgType - 1));
// int* count = taosHashGet(pThrd->msgCount, TMSG_INFO(pHead->msgType - 1), strlen(TMSG_INFO(pHead->msgType - 1)));
// if (NULL == 0) {
// int localCount = 1;
// taosHashPut(pThrd->msgCount, TMSG_INFO(pHead->msgType - 1), strlen(TMSG_INFO(pHead->msgType - 1)), &localCount,
// sizeof(localCount));
// } else {
// int localCount = *count - 1;
// taosHashPut(pThrd->msgCount, TMSG_INFO(pHead->msgType - 1), strlen(TMSG_INFO(pHead->msgType - 1)), &localCount,
// sizeof(localCount));
// }
// }
STraceId* trace = &transMsg.info.traceId; STraceId* trace = &transMsg.info.traceId;
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn, tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code)); TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code));
...@@ -1118,19 +1103,6 @@ void cliSend(SCliConn* pConn) { ...@@ -1118,19 +1103,6 @@ void cliSend(SCliConn* pConn) {
msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
} }
// if (tmsgIsValid(pHead->msgType)) {
// char buf[128] = {0};
// sprintf(buf, "%s", TMSG_INFO(pHead->msgType));
// int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
// if (NULL == 0) {
// int localCount = 1;
// taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
// } else {
// int localCount = *count + 1;
// taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
// }
// }
tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn, tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen); TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen);
...@@ -1525,16 +1497,19 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -1525,16 +1497,19 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
destroyCmsg(pMsg); destroyCmsg(pMsg);
return; return;
} }
if (tmsgIsValid(pMsg->msg.msgType)) {
char buf[128] = {0}; if (rpcDebugFlag & DEBUG_TRACE) {
sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType)); if (tmsgIsValid(pMsg->msg.msgType)) {
int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); char buf[128] = {0};
if (NULL == 0) { sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType));
int localCount = 1; int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); if (NULL == 0) {
} else { int localCount = 1;
int localCount = *count + 1; taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); } else {
int localCount = *count + 1;
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
}
} }
} }
...@@ -1782,18 +1757,20 @@ static void cliAsyncCb(uv_async_t* handle) { ...@@ -1782,18 +1757,20 @@ static void cliAsyncCb(uv_async_t* handle) {
QUEUE_MOVE(&item->qmsg, &wq); QUEUE_MOVE(&item->qmsg, &wq);
taosThreadMutexUnlock(&item->mtx); taosThreadMutexUnlock(&item->mtx);
void* pIter = taosHashIterate(pThrd->msgCount, NULL); if (rpcDebugFlag & DEBUG_TRACE) {
while (pIter != NULL) { void* pIter = taosHashIterate(pThrd->msgCount, NULL);
int* count = pIter; while (pIter != NULL) {
size_t len = 0; int* count = pIter;
char* key = taosHashGetKey(pIter, &len); size_t len = 0;
if (*count != 0) { char* key = taosHashGetKey(pIter, &len);
tDebug("key: %s count: %d", key, *count); if (*count != 0) {
} tDebug("key: %s count: %d", key, *count);
}
pIter = taosHashIterate(pThrd->msgCount, pIter); pIter = taosHashIterate(pThrd->msgCount, pIter);
}
tDebug("all conn count: %d", pThrd->newConnCount);
} }
tDebug("all conn count: %d", pThrd->newConnCount);
int8_t supportBatch = pTransInst->supportBatch; int8_t supportBatch = pTransInst->supportBatch;
if (supportBatch == 0) { if (supportBatch == 0) {
...@@ -2379,17 +2356,18 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -2379,17 +2356,18 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
} }
} }
if (rpcDebugFlag & DEBUG_TRACE) {
if (tmsgIsValid(pResp->msgType - 1)) { if (tmsgIsValid(pResp->msgType - 1)) {
char buf[128] = {0}; char buf[128] = {0};
sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1)); sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1));
int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
if (NULL == 0) { if (NULL == 0) {
int localCount = 0; int localCount = 0;
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
} else { } else {
int localCount = *count - 1; int localCount = *count - 1;
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
}
} }
} }
if (pCtx->pSem != NULL) { if (pCtx->pSem != NULL) {
......
...@@ -211,8 +211,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_BUFSIZE, "Invalid func bufSize" ...@@ -211,8 +211,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_BUFSIZE, "Invalid func bufSize"
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_COMMENT, "Invalid func comment") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_COMMENT, "Invalid func comment")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retrieve msg") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retrieve msg")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST, "Tag index already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST, "index already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_INDEX_NOT_EXIST, "Tag index not exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_INDEX_NOT_EXIST, "index not exist")
// mnode-db // mnode-db
...@@ -301,9 +301,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STREAMS, "Too many streams") ...@@ -301,9 +301,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STREAMS, "Too many streams")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TARGET_TABLE, "Cannot write the same stable as other stream") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TARGET_TABLE, "Cannot write the same stable as other stream")
// mnode-sma // mnode-sma
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "index already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "SMA does not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "index not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SMA_OPTION, "Invalid sma option") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SMA_OPTION, "Invalid sma index option")
// dnode // dnode
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_OFFLINE, "Dnode is offline") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_OFFLINE, "Dnode is offline")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册