提交 6f75e03a 编写于 作者: L Liu Jicong

refactor(stream): remove unneccessary param in converting data

上级 b8a3654c
......@@ -275,12 +275,8 @@ typedef struct SStreamTask {
int32_t nodeId;
SEpSet epSet;
// used for task source and sink,
// while task agg should have processedVer for each child
int64_t recoverSnapVer;
int64_t startVer;
int64_t checkpointVer;
int64_t processedVer;
// children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
......
......@@ -388,6 +388,8 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
ASSERT(waitingRspNum >= 0);
taosMemoryFree(pParam);
if (waitingRspNum == 0) {
// if no more waiting rsp
if (pParamSet->async) {
......@@ -402,6 +404,8 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
tsem_post(&pParamSet->rspSem);
}
taosMemoryFree(pParamSet);
#if 0
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
......@@ -611,12 +615,12 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t
}
}
#if 0
if (!async) {
#if 0
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
}
#endif
}
return 0;
}
......
......@@ -128,19 +128,19 @@ typedef struct STsdbReader STsdbReader;
#define LASTROW_RETRIEVE_TYPE_ALL 0x1
#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2
int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid);
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader,
const char *idstr);
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid);
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader,
const char *idstr);
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader);
......
......@@ -80,7 +80,7 @@ int32_t vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode);
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode);
......@@ -98,6 +98,7 @@ void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
bool vnodeIsLeader(SVnode* pVnode);
bool vnodeIsRoleLeader(SVnode* pVnode);
#ifdef __cplusplus
}
......
......@@ -144,6 +144,7 @@ int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef);
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
// tq
int tqInit();
......@@ -169,10 +170,9 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq);
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
const char* stbFullName, SBatchDeleteReq* pDeleteReq);
// sma
int32_t smaInit();
......
......@@ -201,9 +201,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
}
SBatchDeleteReq deleteReq;
SSubmitReq *pSubmitReq =
tdBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid,
pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId, &deleteReq);
SSubmitReq *pSubmitReq = tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true,
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq);
if (!pSubmitReq) {
smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
......
......@@ -14,6 +14,7 @@
*/
#include "tq.h"
#include "vnd.h"
#if 0
void tqTmrRspFunc(void* param, void* tmrId) {
......@@ -212,9 +213,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
walApplyVer(pTq->pVnode->pWal, ver);
if (msgType == TDMT_VND_SUBMIT) {
if (vnodeIsRoleLeader(pTq->pVnode) && msgType == TDMT_VND_SUBMIT) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
void* data = taosMemoryMalloc(msgLen);
......
......@@ -25,8 +25,7 @@ int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
for (int32_t row = 0; row < totRow; row++) {
int64_t ts = *(int64_t*)colDataGetData(pTsCol, row);
/*int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);*/
int64_t groupId = 0;
int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
char* name = buildCtbNameByGroupId(stbFullName, groupId);
tqDebug("stream delete msg: groupId :%ld, name: %s", groupId, name);
SMetaReader mr = {0};
......@@ -49,8 +48,8 @@ int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
return 0;
}
SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb,
int64_t suid, const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq) {
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb,
int64_t suid, const char* stbFullName, SBatchDeleteReq* pDeleteReq) {
SSubmitReq* ret = NULL;
SArray* schemaReqs = NULL;
SArray* schemaReqSz = NULL;
......@@ -153,7 +152,7 @@ SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
// assign data
// TODO
ret = rpcMallocCont(cap);
ret->header.vgId = vgId;
ret->header.vgId = pVnode->config.vgId;
ret->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(sz);
......@@ -233,8 +232,8 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
ASSERT(pTask->tbSink.pTSchema);
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
SSubmitReq* pReq = tdBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, pVnode->config.vgId, &deleteReq);
SSubmitReq* pReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, &deleteReq);
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
......
......@@ -247,6 +247,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
walApplyVer(pVnode->pWal, version);
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
......
......@@ -728,6 +728,8 @@ void vnodeSyncStart(SVnode *pVnode) {
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
bool vnodeIsRoleLeader(SVnode *pVnode) { return syncGetMyRole(pVnode->sync) == TAOS_SYNC_STATE_LEADER; }
bool vnodeIsLeader(SVnode *pVnode) {
if (!syncIsReady(pVnode->sync)) {
vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册