提交 f4ef8dc7 编写于 作者: S slzhou

Merge branch 'main' of github.com:taosdata/TDengine into szhou/cenc

...@@ -65,6 +65,7 @@ typedef struct STaosQnode { ...@@ -65,6 +65,7 @@ typedef struct STaosQnode {
STaosQnode *next; STaosQnode *next;
STaosQueue *queue; STaosQueue *queue;
int64_t timestamp; int64_t timestamp;
int64_t dataSize;
int32_t size; int32_t size;
int8_t itype; int8_t itype;
int8_t reserved[3]; int8_t reserved[3];
...@@ -103,7 +104,7 @@ typedef struct STaosQall { ...@@ -103,7 +104,7 @@ typedef struct STaosQall {
STaosQueue *taosOpenQueue(); STaosQueue *taosOpenQueue();
void taosCloseQueue(STaosQueue *queue); void taosCloseQueue(STaosQueue *queue);
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp); void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
void *taosAllocateQitem(int32_t size, EQItype itype); void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize);
void taosFreeQitem(void *pItem); void taosFreeQitem(void *pItem);
void taosWriteQitem(STaosQueue *queue, void *pItem); void taosWriteQitem(STaosQueue *queue, void *pItem);
int32_t taosReadQitem(STaosQueue *queue, void **ppItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
......
...@@ -190,8 +190,9 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, ...@@ -190,8 +190,9 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
(*pRequest)->body.param = param; (*pRequest)->body.param = param;
STscObj* pTscObj = (*pRequest)->pTscObj; STscObj* pTscObj = (*pRequest)->pTscObj;
if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self, int32_t err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
sizeof((*pRequest)->self))) { sizeof((*pRequest)->self));
if (err) {
tscError("%" PRId64 " failed to add to request container, reqId:0x%" PRIx64 ", conn:%" PRId64 ", %s", tscError("%" PRId64 " failed to add to request container, reqId:0x%" PRIx64 ", conn:%" PRId64 ", %s",
(*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql); (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
......
...@@ -691,7 +691,7 @@ void tmqAssignAskEpTask(void* param, void* tmrId) { ...@@ -691,7 +691,7 @@ void tmqAssignAskEpTask(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param; int64_t refId = *(int64_t*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq != NULL) { if (tmq != NULL) {
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM); int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
*pTaskType = TMQ_DELAYED_TASK__ASK_EP; *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
taosWriteQitem(tmq->delayedTask, pTaskType); taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
...@@ -703,7 +703,7 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) { ...@@ -703,7 +703,7 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param; int64_t refId = *(int64_t*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq != NULL) { if (tmq != NULL) {
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM); int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
*pTaskType = TMQ_DELAYED_TASK__COMMIT; *pTaskType = TMQ_DELAYED_TASK__COMMIT;
taosWriteQitem(tmq->delayedTask, pTaskType); taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
...@@ -715,7 +715,7 @@ void tmqAssignDelayedReportTask(void* param, void* tmrId) { ...@@ -715,7 +715,7 @@ void tmqAssignDelayedReportTask(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param; int64_t refId = *(int64_t*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq != NULL) { if (tmq != NULL) {
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM); int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
*pTaskType = TMQ_DELAYED_TASK__REPORT; *pTaskType = TMQ_DELAYED_TASK__REPORT;
taosWriteQitem(tmq->delayedTask, pTaskType); taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
...@@ -1171,7 +1171,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1171,7 +1171,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
goto CREATE_MSG_FAIL; goto CREATE_MSG_FAIL;
} }
if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
if (pRspWrapper == NULL) { if (pRspWrapper == NULL) {
tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch); tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
goto CREATE_MSG_FAIL; goto CREATE_MSG_FAIL;
...@@ -1204,7 +1204,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1204,7 +1204,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
// handle meta rsp // handle meta rsp
int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType; int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
if (pRspWrapper == NULL) { if (pRspWrapper == NULL) {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
...@@ -1394,7 +1394,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1394,7 +1394,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
tmqUpdateEp(tmq, head->epoch, &rsp); tmqUpdateEp(tmq, head->epoch, &rsp);
tDeleteSMqAskEpRsp(&rsp); tDeleteSMqAskEpRsp(&rsp);
} else { } else {
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM); SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
if (pWrapper == NULL) { if (pWrapper == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1; code = -1;
......
...@@ -159,12 +159,12 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { ...@@ -159,12 +159,12 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
} }
if (pWorker == NULL) return -1; if (pWorker == NULL) return -1;
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
if (pMsg == NULL) return -1; if (pMsg == NULL) return -1;
memcpy(pMsg, pRpc, sizeof(SRpcMsg)); memcpy(pMsg, pRpc, sizeof(SRpcMsg));
pRpc->pCont = NULL; pRpc->pCont = NULL;
dTrace("msg:%p, is created and will put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType), pRpc->contLen);
int32_t code = mmPutMsgToWorker(pMgmt, pWorker, pMsg); int32_t code = mmPutMsgToWorker(pMgmt, pWorker, pMsg);
if (code != 0) { if (code != 0) {
dTrace("msg:%p, is freed", pMsg); dTrace("msg:%p, is freed", pMsg);
......
...@@ -58,19 +58,19 @@ int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -58,19 +58,19 @@ int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
if (pMsg == NULL) return -1; if (pMsg == NULL) return -1;
memcpy(pMsg, pRpc, sizeof(SRpcMsg)); memcpy(pMsg, pRpc, sizeof(SRpcMsg));
pRpc->pCont = NULL; pRpc->pCont = NULL;
switch (qtype) { switch (qtype) {
case QUERY_QUEUE: case QUERY_QUEUE:
dTrace("msg:%p, is created and will put into qnode-query queue", pMsg); dTrace("msg:%p, is created and will put into qnode-query queue, len:%d", pMsg, pRpc->contLen);
taosWriteQitem(pMgmt->queryWorker.queue, pMsg); taosWriteQitem(pMgmt->queryWorker.queue, pMsg);
return 0; return 0;
case READ_QUEUE: case READ_QUEUE:
case FETCH_QUEUE: case FETCH_QUEUE:
dTrace("msg:%p, is created and will put into qnode-fetch queue", pMsg); dTrace("msg:%p, is created and will put into qnode-fetch queue, len:%d", pMsg, pRpc->contLen);
taosWriteQitem(pMgmt->fetchWorker.queue, pMsg); taosWriteQitem(pMgmt->fetchWorker.queue, pMsg);
return 0; return 0;
default: default:
......
...@@ -130,7 +130,7 @@ void smStopWorker(SSnodeMgmt *pMgmt) { ...@@ -130,7 +130,7 @@ void smStopWorker(SSnodeMgmt *pMgmt) {
} }
int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
if (pMsg == NULL) { if (pMsg == NULL) {
rpcFreeCont(pRpc->pCont); rpcFreeCont(pRpc->pCont);
pRpc->pCont = NULL; pRpc->pCont = NULL;
...@@ -139,8 +139,8 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { ...@@ -139,8 +139,8 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SSnode *pSnode = pMgmt->pSnode; SSnode *pSnode = pMgmt->pSnode;
if (pSnode == NULL) { if (pSnode == NULL) {
dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d", pMsg, terrstr(), dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d len:%d", pMsg, terrstr(),
TMSG_INFO(pMsg->msgType), qtype); TMSG_INFO(pMsg->msgType), qtype, pRpc->contLen);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont); rpcFreeCont(pRpc->pCont);
pRpc->pCont = NULL; pRpc->pCont = NULL;
......
...@@ -233,7 +233,7 @@ int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -233,7 +233,7 @@ int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
if (pMsg == NULL) { if (pMsg == NULL) {
rpcFreeCont(pRpc->pCont); rpcFreeCont(pRpc->pCont);
pRpc->pCont = NULL; pRpc->pCont = NULL;
...@@ -241,7 +241,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { ...@@ -241,7 +241,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
} }
SMsgHead *pHead = pRpc->pCont; SMsgHead *pHead = pRpc->pCont;
dTrace("vgId:%d, msg:%p is created, type:%s", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType)); dTrace("vgId:%d, msg:%p is created, type:%s len:%d", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType), pRpc->contLen);
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
......
...@@ -141,11 +141,11 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -141,11 +141,11 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
} }
pRpc->info.wrapper = pWrapper; pRpc->info.wrapper = pWrapper;
pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
if (pMsg == NULL) goto _OVER; if (pMsg == NULL) goto _OVER;
memcpy(pMsg, pRpc, sizeof(SRpcMsg)); memcpy(pMsg, pRpc, sizeof(SRpcMsg));
dGTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle); dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle, pRpc->contLen);
code = dmProcessNodeMsg(pWrapper, pMsg); code = dmProcessNodeMsg(pWrapper, pMsg);
......
...@@ -769,7 +769,7 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p ...@@ -769,7 +769,7 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
} }
int32_t numOfQueries = taosArrayGetSize(pConn->pQueries); int32_t numOfQueries = taosArrayGetSize(pConn->pQueries);
for (int32_t i = 0; i < numOfQueries; ++i) { for (int32_t i = 0; i < numOfQueries && numOfRows < rows; ++i) {
SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i); SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i);
cols = 0; cols = 0;
......
...@@ -763,7 +763,7 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu ...@@ -763,7 +763,7 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu
tb_uid_t suid) { tb_uid_t suid) {
const SSubmitReq *pReq = (const SSubmitReq *)pMsg; const SSubmitReq *pReq = (const SSubmitReq *)pMsg;
void *qItem = taosAllocateQitem(pReq->header.contLen, DEF_QITEM); void *qItem = taosAllocateQitem(pReq->header.contLen, DEF_QITEM, 0);
if (!qItem) { if (!qItem) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
......
...@@ -1271,7 +1271,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { ...@@ -1271,7 +1271,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver); qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);
if (!failed) { if (!failed) {
SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM); SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK; pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK;
pRefBlock->pBlock = pDelBlock; pRefBlock->pBlock = pDelBlock;
pRefBlock->dataRef = pRef; pRefBlock->dataRef = pRef;
...@@ -1303,7 +1303,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { ...@@ -1303,7 +1303,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
} }
#if 0 #if 0
SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
pStreamBlock->type = STREAM_INPUT__DATA_BLOCK; pStreamBlock->type = STREAM_INPUT__DATA_BLOCK;
pStreamBlock->blocks = taosArrayInit(0, sizeof(SSDataBlock)); pStreamBlock->blocks = taosArrayInit(0, sizeof(SSDataBlock));
SSDataBlock block = {0}; SSDataBlock block = {0};
......
...@@ -121,7 +121,7 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { ...@@ -121,7 +121,7 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
return &pInfo->blockData[1]; return &pInfo->blockData[1];
} }
if (pIter->pSttBlk == NULL) { if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) {
return NULL; return NULL;
} }
......
...@@ -2790,7 +2790,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2790,7 +2790,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
while (1) { while (1) {
// load the last data block of current table // load the last data block of current table
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
if (!hasVal) { if (!hasVal) {
bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
if (!hasNexTable) { if (!hasNexTable) {
...@@ -3823,6 +3824,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL ...@@ -3823,6 +3824,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
} }
// NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here. // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
// no valid error code set in metaGetTbTSchema, so let's set the error code here.
// we should proceed in case of tmq processing.
if (pCond->suid != 0) { if (pCond->suid != 0) {
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1); pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
if (pReader->pSchema == NULL) { if (pReader->pSchema == NULL) {
...@@ -3840,7 +3843,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL ...@@ -3840,7 +3843,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo); updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo);
} }
STsdbReader* p = (pReader->innerReader[0] != NULL)? pReader->innerReader[0]:pReader; STsdbReader* p = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader;
pReader->status.pTableMap = createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, numOfTables); pReader->status.pTableMap = createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, numOfTables);
if (pReader->status.pTableMap == NULL) { if (pReader->status.pTableMap == NULL) {
tsdbReaderClose(p); tsdbReaderClose(p);
...@@ -3888,10 +3891,11 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL ...@@ -3888,10 +3891,11 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
return code; return code;
_err: _err:
tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr); tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
tsdbReaderClose(pReader);
return code; return code;
} }
void tsdbReaderClose(STsdbReader* pReader) { void tsdbReaderClose(STsdbReader* pReader) {
if (pReader == NULL) { if (pReader == NULL) {
......
...@@ -133,7 +133,7 @@ static int32_t getStatus(SDataDeleterHandle* pDeleter) { ...@@ -133,7 +133,7 @@ static int32_t getStatus(SDataDeleterHandle* pDeleter) {
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
SDataDeleterBuf* pBuf = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM); SDataDeleterBuf* pBuf = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM, 0);
if (NULL == pBuf) { if (NULL == pBuf) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
......
...@@ -126,7 +126,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) { ...@@ -126,7 +126,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM); SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM, 0);
if (NULL == pBuf) { if (NULL == pBuf) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
......
...@@ -91,11 +91,15 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo ...@@ -91,11 +91,15 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, i); SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, i);
if (!IS_MATHABLE_TYPE(pColInfoData->info.type)) {
continue;
}
// null value is represented by using key = INT64_MIN for now. // null value is represented by using key = INT64_MIN for now.
// TODO: optimize to ignore null values for linear interpolation. // TODO: optimize to ignore null values for linear interpolation.
if (!pLinearInfo->isStartSet) { if (!pLinearInfo->isStartSet) {
if (!colDataIsNull_s(pColInfoData, rowIndex)) { if (!colDataIsNull_s(pColInfoData, rowIndex)) {
ASSERT(IS_MATHABLE_TYPE(pColInfoData->info.type));
pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex); pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
char* p = colDataGetData(pColInfoData, rowIndex); char* p = colDataGetData(pColInfoData, rowIndex);
......
...@@ -56,7 +56,7 @@ void streamSchedByTimer(void* param, void* tmrId) { ...@@ -56,7 +56,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
} }
if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) { if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) {
SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM); SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
if (trigger == NULL) return; if (trigger == NULL) return;
trigger->type = STREAM_INPUT__GET_RES; trigger->type = STREAM_INPUT__GET_RES;
trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
...@@ -112,7 +112,7 @@ int32_t streamSchedExec(SStreamTask* pTask) { ...@@ -112,7 +112,7 @@ int32_t streamSchedExec(SStreamTask* pTask) {
} }
int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
int8_t status; int8_t status;
// enqueue // enqueue
...@@ -150,7 +150,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR ...@@ -150,7 +150,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR
} }
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
int8_t status = TASK_INPUT_STATUS__NORMAL; int8_t status = TASK_INPUT_STATUS__NORMAL;
// enqueue // enqueue
......
...@@ -67,7 +67,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock ...@@ -67,7 +67,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
} }
SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) { SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) {
SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, 0);
if (pDataSubmit == NULL) return NULL; if (pDataSubmit == NULL) return NULL;
pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t)); pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
if (pDataSubmit->dataRef == NULL) goto FAIL; if (pDataSubmit->dataRef == NULL) goto FAIL;
...@@ -81,7 +81,7 @@ FAIL: ...@@ -81,7 +81,7 @@ FAIL:
} }
SStreamMergedSubmit* streamMergedSubmitNew() { SStreamMergedSubmit* streamMergedSubmitNew() {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM); SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0);
if (pMerged == NULL) return NULL; if (pMerged == NULL) return NULL;
pMerged->reqs = taosArrayInit(0, sizeof(void*)); pMerged->reqs = taosArrayInit(0, sizeof(void*));
pMerged->dataRefs = taosArrayInit(0, sizeof(void*)); pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
...@@ -107,7 +107,7 @@ static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) ...@@ -107,7 +107,7 @@ static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit)
} }
SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit) { SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit) {
SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, 0);
if (pSubmitClone == NULL) { if (pSubmitClone == NULL) {
return NULL; return NULL;
} }
......
...@@ -127,7 +127,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -127,7 +127,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
break; break;
} }
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) { if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -235,7 +235,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -235,7 +235,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
qDebug("stream task %d exec end", pTask->taskId); qDebug("stream task %d exec end", pTask->taskId);
if (taosArrayGetSize(pRes) != 0) { if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) { if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
streamFreeQitem(input); streamFreeQitem(input);
......
...@@ -97,7 +97,7 @@ int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { ...@@ -97,7 +97,7 @@ int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
syncRpcMsgLog2(logBuf, pMsg); syncRpcMsgLog2(logBuf, pMsg);
SRpcMsg *pTemp; SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0);
memcpy(pTemp, pMsg, sizeof(SRpcMsg)); memcpy(pTemp, pMsg, sizeof(SRpcMsg));
STaosQueue *pMsgQ = gSyncIO->pMsgQ; STaosQueue *pMsgQ = gSyncIO->pMsgQ;
...@@ -381,7 +381,7 @@ static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -381,7 +381,7 @@ static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
syncRpcMsgLog2((char *)"==syncIOProcessRequest==", pMsg); syncRpcMsgLog2((char *)"==syncIOProcessRequest==", pMsg);
SSyncIO *io = pParent; SSyncIO *io = pParent;
SRpcMsg *pTemp; SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0);
memcpy(pTemp, pMsg, sizeof(SRpcMsg)); memcpy(pTemp, pMsg, sizeof(SRpcMsg));
taosWriteQitem(io->pMsgQ, pTemp); taosWriteQitem(io->pMsgQ, pTemp);
} }
...@@ -441,7 +441,7 @@ static void syncIOTickQ(void *param, void *tmrId) { ...@@ -441,7 +441,7 @@ static void syncIOTickQ(void *param, void *tmrId) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pMsg, &rpcMsg); syncPingReply2RpcMsg(pMsg, &rpcMsg);
SRpcMsg *pTemp; SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0);
memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg)); memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg));
syncRpcMsgLog2((char *)"==syncIOTickQ==", &rpcMsg); syncRpcMsgLog2((char *)"==syncIOTickQ==", &rpcMsg);
taosWriteQitem(io->pMsgQ, pTemp); taosWriteQitem(io->pMsgQ, pTemp);
......
...@@ -128,7 +128,7 @@ void *processShellMsg(void *arg) { ...@@ -128,7 +128,7 @@ void *processShellMsg(void *arg) {
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SRpcMsg *pTemp; SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0);
memcpy(pTemp, pMsg, sizeof(SRpcMsg)); memcpy(pTemp, pMsg, sizeof(SRpcMsg));
int32_t idx = balance % multiQ->numOfThread; int32_t idx = balance % multiQ->numOfThread;
......
...@@ -109,20 +109,24 @@ int64_t taosQueueMemorySize(STaosQueue *queue) { ...@@ -109,20 +109,24 @@ int64_t taosQueueMemorySize(STaosQueue *queue) {
return memOfItems; return memOfItems;
} }
void *taosAllocateQitem(int32_t size, EQItype itype) { void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize) {
STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size); STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
if (pNode == NULL) { if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pNode->dataSize = dataSize;
pNode->size = size; pNode->size = size;
pNode->itype = itype; pNode->itype = itype;
pNode->timestamp = taosGetTimestampUs(); pNode->timestamp = taosGetTimestampUs();
if (itype == RPC_QITEM) { if (itype == RPC_QITEM) {
int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size); int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize);
if (alloced > tsRpcQueueMemoryAllowed) { if (alloced > tsRpcQueueMemoryAllowed) {
uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
tsRpcQueueMemoryUsed);
atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize);
taosMemoryFree(pNode); taosMemoryFree(pNode);
terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE; terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE;
return NULL; return NULL;
...@@ -139,8 +143,8 @@ void taosFreeQitem(void *pItem) { ...@@ -139,8 +143,8 @@ void taosFreeQitem(void *pItem) {
if (pItem == NULL) return; if (pItem == NULL) return;
STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode)); STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
if (pNode->itype > 0) { if (pNode->itype == RPC_QITEM) {
int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size); int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size + pNode->dataSize);
uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced); uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
} else { } else {
uTrace("item:%p, node:%p is freed", pItem, pNode); uTrace("item:%p, node:%p is freed", pItem, pNode);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册