提交 84ec8a7f 编写于 作者: S Shengliang Guan

Merge branch 'main' into fix/TD-21353

......@@ -134,6 +134,13 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
}
}
static void vmSendResponse(SRpcMsg *pMsg) {
if (pMsg->info.handle) {
SRpcMsg rsp = {.info = pMsg->info, .code = terrno};
rpcSendResponse(&rsp);
}
}
static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
const STraceId *trace = &pMsg->info.traceId;
if (pMsg->contLen < sizeof(SMsgHead)) {
......@@ -152,7 +159,9 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
if (pVnode == NULL) {
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg,
terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen);
return terrno != 0 ? terrno : -1;
terrno = (terrno != 0) ? terrno : -1;
vmSendResponse(pMsg);
return terrno;
}
switch (qtype) {
......
......@@ -884,9 +884,9 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
db = taosHashIterate(pUser->writeDbs, NULL);
while (db != NULL) {
cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(userName, pUser->user, pShow->pMeta->pSchemas[cols].bytes);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)userName, false);
char privilege[20] = {0};
......@@ -909,9 +909,9 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
char *topic = taosHashIterate(pUser->topics, NULL);
while (topic != NULL) {
cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(userName, pUser->user, pShow->pMeta->pSchemas[cols].bytes);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)userName, false);
char privilege[20] = {0};
......
......@@ -182,7 +182,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
vError("vgId:%d, duplicate write request. version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version,
pVnode->state.applied);
terrno = TSDB_CODE_VND_DUP_REQUEST;
pRsp->info.handle = NULL;
return -1;
}
......
......@@ -5527,7 +5527,8 @@ static void getStreamQueryFirstProjectAliasName(SHashObj* pUserAliasSet, char* a
return;
}
static int32_t addWstartTsToCreateStreamQueryImpl(SSelectStmt* pSelect, SHashObj* pUserAliasSet) {
static int32_t addWstartTsToCreateStreamQueryImpl(STranslateContext* pCxt, SSelectStmt* pSelect,
SHashObj* pUserAliasSet) {
SNode* pProj = nodesListGetNode(pSelect->pProjectionList, 0);
if (NULL == pSelect->pWindow ||
(QUERY_NODE_FUNCTION == nodeType(pProj) && 0 == strcmp("_wstart", ((SFunctionNode*)pProj)->functionName))) {
......@@ -5539,7 +5540,10 @@ static int32_t addWstartTsToCreateStreamQueryImpl(SSelectStmt* pSelect, SHashObj
}
strcpy(pFunc->functionName, "_wstart");
getStreamQueryFirstProjectAliasName(pUserAliasSet, pFunc->node.aliasName, sizeof(pFunc->node.aliasName));
int32_t code = nodesListPushFront(pSelect->pProjectionList, (SNode*)pFunc);
int32_t code = getFuncInfo(pCxt, pFunc);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListPushFront(pSelect->pProjectionList, (SNode*)pFunc);
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode((SNode*)pFunc);
}
......@@ -5551,7 +5555,7 @@ static int32_t addWstartTsToCreateStreamQuery(STranslateContext* pCxt, SNode* pS
SHashObj* pUserAliasSet = NULL;
int32_t code = checkProjectAlias(pCxt, pSelect->pProjectionList, &pUserAliasSet);
if (TSDB_CODE_SUCCESS == code) {
code = addWstartTsToCreateStreamQueryImpl(pSelect, pUserAliasSet);
code = addWstartTsToCreateStreamQueryImpl(pCxt, pSelect, pUserAliasSet);
}
taosHashCleanup(pUserAliasSet);
return code;
......@@ -5664,12 +5668,12 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
pCxt->createStream = true;
int32_t code = addWstartTsToCreateStreamQuery(pCxt, pStmt->pQuery);
int32_t code = addSubtableInfoToCreateStreamQuery(pCxt, pStmt);
if (TSDB_CODE_SUCCESS == code) {
code = addSubtableInfoToCreateStreamQuery(pCxt, pStmt);
code = translateQuery(pCxt, pStmt->pQuery);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateQuery(pCxt, pStmt->pQuery);
code = addWstartTsToCreateStreamQuery(pCxt, pStmt->pQuery);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkStreamQuery(pCxt, pStmt);
......
......@@ -1252,6 +1252,9 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
// stop heartbeat timer
syncNodeStopHeartbeatTimer(pSyncNode);
// clean rsp
syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); }
......@@ -2667,16 +2670,12 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
}
int32_t code = syncNodeAppend(ths, pEntry);
if (code < 0) {
sNError(ths, "failed to append blocking msg");
}
return code;
} else {
syncEntryDestroy(pEntry);
pEntry = NULL;
return -1;
}
return -1;
}
int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册