提交 d7c30d6e 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/row_optimize2

......@@ -101,6 +101,7 @@ tests/examples/JDBC/JDBCDemo/.settings/
source/libs/parser/inc/sql.*
tests/script/tmqResult.txt
tests/tmqResult.txt
tests/script/jenkins/basic.txt
# Emacs
# -*- mode: gitignore; -*-
......
......@@ -1629,7 +1629,6 @@ int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq);
int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq);
void tFreeSSubQueryMsg(SSubQueryMsg *pReq);
typedef struct {
SMsgHead header;
uint64_t sId;
......@@ -1667,6 +1666,10 @@ typedef struct {
int32_t execId;
} SResFetchReq;
int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq);
int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq);
typedef struct {
SMsgHead header;
uint64_t sId;
......@@ -2948,6 +2951,10 @@ typedef struct {
STqOffsetVal reqOffset;
} SMqPollReq;
int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq);
int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq);
typedef struct {
int32_t vgId;
int64_t offset;
......
......@@ -1461,12 +1461,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
return code;
}
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
if (pReq == NULL) {
return NULL;
}
void tmqBuildConsumeReqImpl(SMqPollReq *pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
/*strcpy(pReq->topic, pTopic->topicName);*/
/*strcpy(pReq->cgroup, tmq->groupId);*/
......@@ -1485,9 +1480,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
pReq->useSnapshot = tmq->useSnapshot;
pReq->head.vgId = htonl(pVg->vgId);
pReq->head.contLen = htonl(sizeof(SMqPollReq));
return pReq;
pReq->head.vgId = pVg->vgId;
}
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
......@@ -1559,15 +1552,32 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
#endif
}
atomic_store_32(&pVg->vgSkipCnt, 0);
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
if (pReq == NULL) {
SMqPollReq req = {0};
tmqBuildConsumeReqImpl(&req, tmq, timeout, pTopic, pVg);
int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
if (msgSize < 0) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
tsem_post(&tmq->rspSem);
return -1;
}
char *msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
tsem_post(&tmq->rspSem);
return -1;
}
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
taosMemoryFree(msg);
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
tsem_post(&tmq->rspSem);
return -1;
}
SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
if (pParam == NULL) {
taosMemoryFree(pReq);
taosMemoryFree(msg);
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
tsem_post(&tmq->rspSem);
return -1;
......@@ -1581,7 +1591,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
taosMemoryFree(pReq);
taosMemoryFree(msg);
taosMemoryFree(pParam);
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
tsem_post(&tmq->rspSem);
......@@ -1589,11 +1599,11 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
}
sendInfo->msgInfo = (SDataBuf){
.pData = pReq,
.len = sizeof(SMqPollReq),
.pData = msg,
.len = msgSize,
.handle = NULL,
};
sendInfo->requestId = pReq->reqId;
sendInfo->requestId = req.reqId;
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqPollCb;
......@@ -1605,7 +1615,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
char offsetFormatBuf[80];
tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);
tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
/*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++;
......
......@@ -4723,6 +4723,139 @@ void tFreeSSubQueryMsg(SSubQueryMsg *pReq) {
taosMemoryFreeClear(pReq->msg);
}
int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
buf = (char *)buf + headLen;
bufLen -= headLen;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeU64(&encoder, pReq->sId) < 0) return -1;
if (tEncodeU64(&encoder, pReq->queryId) < 0) return -1;
if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->execId) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
if (buf != NULL) {
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
pHead->vgId = htonl(pReq->header.vgId);
pHead->contLen = htonl(tlen + headLen);
}
return tlen + headLen;
}
int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
SMsgHead *pHead = buf;
pHead->vgId = pReq->header.vgId;
pHead->contLen = pReq->header.contLen;
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->sId) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSTqOffsetVal(SEncoder *pEncoder, STqOffsetVal *pOffset) {
if (tEncodeI8(pEncoder, pOffset->type) < 0) return -1;
if (tEncodeI64(pEncoder, pOffset->uid) < 0) return -1;
if (tEncodeI64(pEncoder, pOffset->ts) < 0) return -1;
return 0;
}
int32_t tDerializeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffset) {
if (tDecodeI8(pDecoder, &pOffset->type) < 0) return -1;
if (tDecodeI64(pDecoder, &pOffset->uid) < 0) return -1;
if (tDecodeI64(pDecoder, &pOffset->ts) < 0) return -1;
return 0;
}
int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
buf = (char *)buf + headLen;
bufLen -= headLen;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->subKey) < 0) return -1;
if (tEncodeI8(&encoder, pReq->withTbName) < 0) return -1;
if (tEncodeI8(&encoder, pReq->useSnapshot) < 0) return -1;
if (tEncodeI32(&encoder, pReq->epoch) < 0) return -1;
if (tEncodeU64(&encoder, pReq->reqId) < 0) return -1;
if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1;
if (tEncodeI64(&encoder, pReq->timeout) < 0) return -1;
if (tSerializeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
if (buf != NULL) {
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
pHead->vgId = htonl(pReq->head.vgId);
pHead->contLen = htonl(tlen + headLen);
}
return tlen + headLen;
}
int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
SMsgHead *pHead = buf;
pHead->vgId = pReq->head.vgId;
pHead->contLen = pReq->head.contLen;
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->subKey) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->withTbName) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->useSnapshot) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->reqId) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->timeout) < 0) return -1;
if (tDerializeSTqOffsetVal(&decoder, &pReq->reqOffset) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
......
......@@ -458,20 +458,26 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
}
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int32_t reqEpoch = pReq->epoch;
SMqPollReq req = {0};
int32_t code = 0;
STqOffsetVal reqOffset = pReq->reqOffset;
STqOffsetVal fetchOffsetNew;
SWalCkHead* pCkHead = NULL;
if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
return -1;
}
int64_t consumerId = req.consumerId;
int32_t reqEpoch = req.epoch;
STqOffsetVal reqOffset = req.reqOffset;
// 1.find handle
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
/*ASSERT(pHandle);*/
if (pHandle == NULL) {
tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId,
TD_VID(pTq->pVnode), pReq->subKey);
TD_VID(pTq->pVnode), req.subKey);
return -1;
}
......@@ -479,7 +485,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (pHandle->consumerId != consumerId) {
tqError("tmq poll: consumer handle mismatch for consumer:%" PRId64
", in vgId:%d, subkey %s, handle consumer id %" PRId64,
consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId);
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
return -1;
}
......@@ -493,13 +499,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
char buf[80];
tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
// 2.reset offset if needed
if (reqOffset.type > 0) {
fetchOffsetNew = reqOffset;
} else {
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey);
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
if (pOffset != NULL) {
fetchOffsetNew = pOffset->val;
char formatBuf[80];
......@@ -508,7 +514,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
TD_VID(pTq->pVnode), formatBuf);
} else {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
if (pReq->useSnapshot) {
if (req.useSnapshot) {
if (pHandle->fetchMeta) {
tqOffsetResetToMeta(&fetchOffsetNew, 0);
} else {
......@@ -520,21 +526,21 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
code = -1;
}
tDeleteSMqDataRsp(&dataRsp);
return code;
} else {
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pReq);
tqInitTaosxRsp(&taosxRsp, &req);
tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
code = -1;
}
tDeleteSTaosxRsp(&taosxRsp);
......@@ -543,7 +549,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64
" in vg %d, subkey %s, reset none failed",
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), req.subKey);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
return -1;
}
......@@ -552,7 +558,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
// lock
taosWLockLatch(&pTq->pushLock);
tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);
......@@ -580,7 +586,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
#endif
taosWUnLockLatch(&pTq->pushLock);
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
code = -1;
}
......@@ -599,13 +605,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pReq);
tqInitTaosxRsp(&taosxRsp, &req);
if (fetchOffsetNew.type != TMQ_OFFSET__LOG) {
tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew);
if (metaRsp.metaRspLen > 0) {
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) {
code = -1;
}
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64
......@@ -618,7 +624,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
}
if (taosxRsp.blockNum > 0) {
if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
code = -1;
}
tDeleteSTaosxRsp(&taosxRsp);
......@@ -648,13 +654,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (consumerEpoch > reqEpoch) {
tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d",
consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
break;
}
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
code = -1;
}
tDeleteSTaosxRsp(&taosxRsp);
......@@ -665,7 +671,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SWalCont* pHead = &pCkHead->head;
tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
pReq->epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);
req.epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);
if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
......@@ -674,7 +680,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
}
if (taosxRsp.blockNum > 0 /* threshold */) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
code = -1;
}
tDeleteSTaosxRsp(&taosxRsp);
......@@ -692,7 +698,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body;
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) {
code = -1;
taosMemoryFreeClear(pCkHead);
tDeleteSTaosxRsp(&taosxRsp);
......
......@@ -416,27 +416,42 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
loadRemoteDataCallback(pWrapper, &pBuf, code);
taosMemoryFree(pWrapper);
} else {
SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq));
if (NULL == pMsg) {
SResFetchReq req = {0};
req.header.vgId = pSource->addr.nodeId;
req.sId = pSource->schedId;
req.taskId = pSource->taskId;
req.queryId = pTaskInfo->id.queryId;
req.execId = pSource->execId;
int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req);
if (msgSize < 0) {
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
taosMemoryFree(pWrapper);
return pTaskInfo->code;
}
void* msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
taosMemoryFree(pWrapper);
return pTaskInfo->code;
}
if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) {
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
taosMemoryFree(pWrapper);
taosMemoryFree(msg);
return pTaskInfo->code;
}
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId,
pSource->execId, sourceIndex, totalSources);
pMsg->header.vgId = htonl(pSource->addr.nodeId);
pMsg->sId = htobe64(pSource->schedId);
pMsg->taskId = htobe64(pSource->taskId);
pMsg->queryId = htobe64(pTaskInfo->id.queryId);
pMsg->execId = htonl(pSource->execId);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
taosMemoryFreeClear(pMsg);
taosMemoryFreeClear(msg);
taosMemoryFree(pWrapper);
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
......@@ -445,8 +460,8 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
pMsgSendInfo->param = pWrapper;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
pMsgSendInfo->msgInfo.pData = msg;
pMsgSendInfo->msgInfo.len = msgSize;
pMsgSendInfo->msgType = pSource->fetchMsgType;
pMsgSendInfo->fp = loadRemoteDataCallback;
......
......@@ -5672,12 +5672,12 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0;
int32_t startOffset = pCtx->offset;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
char* data = colDataGetData(pInputCol, i);
if (colDataIsNull_s(pInputCol, i)) {
continue;
}
numOfElems++;
char* data = colDataGetData(pInputCol, i);
doModeAdd(pInfo, i, pCtx, data);
if (sizeof(SModeInfo) + pInfo->numOfPoints * (sizeof(SModeItem) + pInfo->colBytes) >= MODE_MAX_RESULT_SIZE) {
......
......@@ -172,8 +172,8 @@ static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModifO
}
// pStmt->pSql -> field1_name, ...)
static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, SParsedDataColInfo* pColList,
SSchema* pSchema) {
static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, bool isTags,
SParsedDataColInfo* pColList, SSchema* pSchema) {
col_id_t nCols = pColList->numOfCols;
pColList->numOfBound = 0;
......@@ -227,6 +227,10 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, S
}
}
if (!isTags && pColList->cols[0].valStat == VAL_STAT_NONE) {
return buildInvalidOperationMsg(&pCxt->msg, "primary timestamp column can not be null");
}
pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;
if (!isOrdered) {
......@@ -525,7 +529,7 @@ static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifOpStmt
}
pStmt->pSql += index;
return parseBoundColumns(pCxt, &pStmt->pSql, &pCxt->tags, pTagsSchema);
return parseBoundColumns(pCxt, &pStmt->pSql, true, &pCxt->tags, pTagsSchema);
}
static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, SSchema* pTagSchema, SToken* pToken,
......@@ -792,6 +796,8 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, bool isSt
*pMissCache = true;
} else if (isStb && TSDB_SUPER_TABLE != (*pTableMeta)->tableType) {
code = buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
} else if (!isStb && TSDB_SUPER_TABLE == (*pTableMeta)->tableType) {
code = buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported");
}
}
return code;
......@@ -935,11 +941,12 @@ static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifOpS
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z);
}
// pStmt->pSql -> field1_name, ...)
return parseBoundColumns(pCxt, &pStmt->pSql, &pDataBuf->boundColumnInfo, getTableColumnSchema(pStmt->pTableMeta));
return parseBoundColumns(pCxt, &pStmt->pSql, false, &pDataBuf->boundColumnInfo,
getTableColumnSchema(pStmt->pTableMeta));
}
if (NULL != pStmt->pBoundCols) {
return parseBoundColumns(pCxt, &pStmt->pBoundCols, &pDataBuf->boundColumnInfo,
return parseBoundColumns(pCxt, &pStmt->pBoundCols, false, &pDataBuf->boundColumnInfo,
getTableColumnSchema(pStmt->pTableMeta));
}
......@@ -1571,16 +1578,16 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt, SVnodeModifOpStmt* pSt
static void destroySubTableHashElem(void* p) { taosMemoryFree(*(STableMeta**)p); }
static int32_t createVnodeModifOpStmt(SParseContext* pCxt, bool reentry, SNode** pOutput) {
static int32_t createVnodeModifOpStmt(SInsertParseContext* pCxt, bool reentry, SNode** pOutput) {
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
if (NULL == pStmt) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pCxt->pStmtCb) {
if (pCxt->pComCxt->pStmtCb) {
TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
}
pStmt->pSql = pCxt->pSql;
pStmt->pSql = pCxt->pComCxt->pSql;
pStmt->freeHashFunc = insDestroyBlockHashmap;
pStmt->freeArrayFunc = insDestroyBlockArrayList;
......@@ -1604,7 +1611,7 @@ static int32_t createVnodeModifOpStmt(SParseContext* pCxt, bool reentry, SNode**
return TSDB_CODE_SUCCESS;
}
static int32_t createInsertQuery(SParseContext* pCxt, SQuery** pOutput) {
static int32_t createInsertQuery(SInsertParseContext* pCxt, SQuery** pOutput) {
SQuery* pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
if (NULL == pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -1667,11 +1674,15 @@ static int32_t getTableVgroupFromMetaData(const SArray* pTables, SVnodeModifOpSt
sizeof(SVgroupInfo));
}
static int32_t getTableSchemaFromMetaData(const SMetaData* pMetaData, SVnodeModifOpStmt* pStmt, bool isStb) {
static int32_t getTableSchemaFromMetaData(SInsertParseContext* pCxt, const SMetaData* pMetaData,
SVnodeModifOpStmt* pStmt, bool isStb) {
int32_t code = checkAuthFromMetaData(pMetaData->pUser);
if (TSDB_CODE_SUCCESS == code) {
code = getTableMetaFromMetaData(pMetaData->pTableMeta, &pStmt->pTableMeta);
}
if (TSDB_CODE_SUCCESS == code && !isStb && TSDB_SUPER_TABLE == pStmt->pTableMeta->tableType) {
code = buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported");
}
if (TSDB_CODE_SUCCESS == code) {
code = getTableVgroupFromMetaData(pMetaData->pTableHash, pStmt, isStb);
}
......@@ -1696,24 +1707,25 @@ static void clearCatalogReq(SCatalogReq* pCatalogReq) {
pCatalogReq->pUser = NULL;
}
static int32_t setVnodeModifOpStmt(SParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
static int32_t setVnodeModifOpStmt(SInsertParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
SVnodeModifOpStmt* pStmt) {
clearCatalogReq(pCatalogReq);
if (pStmt->usingTableProcessing) {
return getTableSchemaFromMetaData(pMetaData, pStmt, true);
return getTableSchemaFromMetaData(pCxt, pMetaData, pStmt, true);
}
return getTableSchemaFromMetaData(pMetaData, pStmt, false);
return getTableSchemaFromMetaData(pCxt, pMetaData, pStmt, false);
}
static int32_t resetVnodeModifOpStmt(SParseContext* pCxt, SQuery* pQuery) {
static int32_t resetVnodeModifOpStmt(SInsertParseContext* pCxt, SQuery* pQuery) {
nodesDestroyNode(pQuery->pRoot);
int32_t code = createVnodeModifOpStmt(pCxt, true, &pQuery->pRoot);
if (TSDB_CODE_SUCCESS == code) {
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot;
(*pCxt->pStmtCb->getExecInfoFn)(pCxt->pStmtCb->pStmt, &pStmt->pVgroupsHashObj, &pStmt->pTableBlockHashObj);
(*pCxt->pComCxt->pStmtCb->getExecInfoFn)(pCxt->pComCxt->pStmtCb->pStmt, &pStmt->pVgroupsHashObj,
&pStmt->pTableBlockHashObj);
if (NULL == pStmt->pVgroupsHashObj) {
pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
}
......@@ -1729,13 +1741,13 @@ static int32_t resetVnodeModifOpStmt(SParseContext* pCxt, SQuery* pQuery) {
return code;
}
static int32_t initInsertQuery(SParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
static int32_t initInsertQuery(SInsertParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
SQuery** pQuery) {
if (NULL == *pQuery) {
return createInsertQuery(pCxt, pQuery);
}
if (NULL != pCxt->pStmtCb) {
if (NULL != pCxt->pComCxt->pStmtCb) {
return resetVnodeModifOpStmt(pCxt, *pQuery);
}
......@@ -1896,7 +1908,7 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal
.usingDuplicateTable = false,
};
int32_t code = initInsertQuery(pCxt, pCatalogReq, pMetaData, pQuery);
int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = parseInsertSqlImpl(&context, (SVnodeModifOpStmt*)(*pQuery)->pRoot);
}
......
......@@ -499,27 +499,22 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
return TSDB_CODE_QRY_INVALID_INPUT;
}
SResFetchReq *msg = pMsg->pCont;
SResFetchReq req = {0};
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1);
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
if (tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
QW_ELOG("tDeserializeSResFetchReq %d failed", pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
msg->execId = ntohl(msg->execId);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
uint64_t sId = req.sId;
uint64_t qId = req.queryId;
uint64_t tId = req.taskId;
int64_t rId = 0;
int32_t eId = msg->execId;
int32_t eId = req.execId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info, .msgType = pMsg->msgType};
......
......@@ -1083,22 +1083,29 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
}
case TDMT_SCH_FETCH:
case TDMT_SCH_MERGE_FETCH: {
msgSize = sizeof(SResFetchReq);
SResFetchReq req = {0};
req.header.vgId = addr->nodeId;
req.sId = schMgmt.sId;
req.queryId = pJob->queryId;
req.taskId = pTask->taskId;
req.execId = pTask->execId;
msgSize = tSerializeSResFetchReq(NULL, 0, &req);
if (msgSize < 0) {
SCH_TASK_ELOG("tSerializeSResFetchReq get size, msgSize:%d", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResFetchReq *pMsg = msg;
pMsg->header.vgId = htonl(addr->nodeId);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
pMsg->execId = htonl(pTask->execId);
if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) {
SCH_TASK_ELOG("tSerializeSResFetchReq %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
break;
}
case TDMT_SCH_DROP_TASK: {
......
......@@ -171,7 +171,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
pSyncNode->pLogStore->syncLogUpdateCommitIndex(pSyncNode->pLogStore, pSyncNode->commitIndex);
// execute fsm
if (pSyncNode->pFsm != NULL) {
if (pSyncNode != NULL && pSyncNode->pFsm != NULL) {
int32_t code = syncNodeDoCommit(pSyncNode, beginIndex, endIndex, pSyncNode->state);
if (code != 0) {
sNError(pSyncNode, "advance commit index error, do commit begin:%" PRId64 ", end:%" PRId64, beginIndex,
......
......@@ -64,7 +64,7 @@ SSyncRaftEntry* syncEntryBuildFromRpcMsg(const SRpcMsg* pMsg, SyncTerm term, Syn
}
SSyncRaftEntry* syncEntryBuildFromAppendEntries(const SyncAppendEntries* pMsg) {
SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen);
SSyncRaftEntry* pEntry = syncEntryBuild((int32_t)(pMsg->dataLen));
if (pEntry == NULL) return NULL;
memcpy(pEntry, pMsg->data, pMsg->dataLen);
......@@ -91,15 +91,14 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId)
void syncEntryDestory(SSyncRaftEntry* pEntry) {
if (pEntry != NULL) {
taosMemoryFree(pEntry);
sTrace("free entry: %p", pEntry);
taosMemoryFree(pEntry);
}
}
void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
pRpcMsg->msgType = pEntry->originalRpcType;
pRpcMsg->contLen = pEntry->dataLen;
pRpcMsg->contLen = (int32_t)(pEntry->dataLen);
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);
}
......@@ -339,7 +338,8 @@ int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index,
SSyncRaftEntry* pEntry = NULL;
int32_t code = raftEntryCacheGetEntryP(pCache, index, &pEntry);
if (code == 1) {
*ppEntry = taosMemoryMalloc((int64_t)(pEntry->bytes));
int32_t bytes = (int32_t)pEntry->bytes;
*ppEntry = taosMemoryMalloc((int64_t)bytes);
memcpy(*ppEntry, pEntry, pEntry->bytes);
(*ppEntry)->rid = -1;
} else {
......
......@@ -91,7 +91,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
if (code == 0) {
ASSERT(pEntry != NULL);
code = syncBuildAppendEntries(&rpcMsg, pEntry->bytes, pSyncNode->vgId);
code = syncBuildAppendEntries(&rpcMsg, (int32_t)(pEntry->bytes), pSyncNode->vgId);
ASSERT(code == 0);
pMsg = rpcMsg.pCont;
......
......@@ -229,7 +229,10 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
int32_t writeLen = vsnprintf(eventLog, sizeof(eventLog), format, argpointer);
va_end(argpointer);
int32_t aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm);
int32_t aqItems = 0;
if (pNode != NULL && pNode->pFsm != NULL && pNode->pFsm->FpApplyQueueItems != NULL) {
aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm);
}
// restore error code
terrno = errCode;
......
......@@ -445,9 +445,11 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
if (pCtx == NULL || pCtx->pSem == NULL) {
if (transMsg.info.ahandle == NULL) {
if (pMsg == NULL || REQUEST_NO_RESP(&pMsg->msg) || pMsg->type == Release) destroyCmsg(pMsg);
once = true;
continue;
if (pMsg == NULL || REQUEST_NO_RESP(&pMsg->msg) || pMsg->type == Release) {
destroyCmsg(pMsg);
once = true;
continue;
}
}
}
......@@ -1217,6 +1219,7 @@ static FORCE_INLINE void destroyCmsg(void* arg) {
if (pMsg == NULL) {
return;
}
transDestroyConnCtx(pMsg->ctx);
destroyUserdata(&pMsg->msg);
taosMemoryFree(pMsg);
......
此差异已折叠。
......@@ -11,6 +11,9 @@ set -e
VALGRIND=0
LOG_BK_DIR=/data/valgrind_log_backup # 192.168.0.203
SIM_FILES=./jenkins/basic.txt
cases_task_file=../parallel_test/cases.task
cat $cases_task_file | grep "./test.sh " | awk -F, '{print $5}' > $SIM_FILES
while getopts "v:r:f:" arg
do
......@@ -21,9 +24,9 @@ do
r)
LOG_BK_DIR=$(echo $OPTARG)
;;
f)
SIM_FILES=$(echo $OPTARG)
;;
#f)
# SIM_FILES=$(echo $OPTARG)
# ;;
?) #unknow option
echo "unkonw argument"
exit 1
......
......@@ -21,15 +21,24 @@ LOG_DIR=$TAOS_DIR/sim/asan
error_num=`cat ${LOG_DIR}/*.asan | grep "ERROR" | wc -l`
memory_leak=`cat ${LOG_DIR}/*.asan | grep "Direct leak" | wc -l`
indirect_leak=`cat ${LOG_DIR}/*.asan | grep "Indirect leak" | wc -l`
python_error=`cat ${LOG_DIR}/*.info | grep -w "stack" | wc -l`
# ignore
# TD-20368
# /root/TDengine/contrib/zlib/trees.c:873:5: runtime error: null pointer passed as argument 2, which is declared to never be null
# TD-20494 TD-20452
# /root/TDengine/source/libs/scalar/src/sclfunc.c:735:11: runtime error: 4.75783e+11 is outside the range of representable values of type 'signed char'
# /root/TDengine/source/libs/scalar/src/sclfunc.c:790:11: runtime error: 3.4e+38 is outside the range of representable values of type 'long int'
# /root/TDengine/source/libs/scalar/src/sclfunc.c:772:11: runtime error: 3.52344e+09 is outside the range of representable values of type 'int'
# /root/TDengine/source/libs/scalar/src/sclfunc.c:753:11: runtime error: 4.75783e+11 is outside the range of representable values of type 'short int'
runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type" | wc -l`
python_error=`cat ${LOG_DIR}/*.info | grep -w "stack" | wc -l`
# TD-20569
# /root/TDengine/source/libs/function/src/builtinsimpl.c:856:29: runtime error: signed integer overflow: 9223372036854775806 + 9223372036854775805 cannot be represented in type 'long int'
# /root/TDengine/source/libs/scalar/src/sclvector.c:1075:66: runtime error: signed integer overflow: 9223372034707292160 + 1668838476672 cannot be represented in type 'long int'
runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type"| grep -v "builtinsimpl.c.*signed integer overflow"| grep -v "sclvector.c.*signed integer overflow" | wc -l`
echo -e "\033[44;32;1m"asan error_num: $error_num"\033[0m"
echo -e "\033[44;32;1m"asan memory_leak: $memory_leak"\033[0m"
......
......@@ -26,3 +26,17 @@ while [ -n "$PID" ]; do
fi
PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'`
done
PID=`ps -ef|grep -w taos | grep -v grep | awk '{print $2}'`
while [ -n "$PID" ]; do
echo kill -9 $PID
#pkill -9 taosd
kill -9 $PID
echo "Killing taosd processes"
if [ "$OS_TYPE" != "Darwin" ]; then
fuser -k -n tcp 6030
else
lsof -nti:6030 | xargs kill -9
fi
PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'`
done
......@@ -230,9 +230,11 @@ class TDTestCase:
tdLog.exit('taos -n client fail!')
finally:
if platform.system().lower() == 'windows':
os.system('ps -a | grep taos | awk \'{print $2}\' | xargs kill -9')
tdLog.info("ps -a | grep taos | awk \'{print $2}\' | xargs kill -9")
# os.system('ps -a | grep taos | awk \'{print $2}\' | xargs kill -9')
else:
os.system('pkill -9 taos')
tdLog.info("pkill -9 taos")
# os.system('pkill -9 taos')
def stop(self):
tdSql.close()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册