提交 68f310dd 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 4fd3064f
...@@ -23,12 +23,11 @@ extern "C" { ...@@ -23,12 +23,11 @@ extern "C" {
#ifndef _STREAM_STATE_H_ #ifndef _STREAM_STATE_H_
#define _STREAM_STATE_H_ #define _STREAM_STATE_H_
typedef struct SStreamTask SStreamTask;
typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2); typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2);
typedef struct STdbState { typedef struct STdbState {
SStreamTask* pOwner; struct SStreamTask* pOwner;
TDB* db; TDB* db;
TTB* pStateDb; TTB* pStateDb;
TTB* pFuncStateDb; TTB* pFuncStateDb;
...@@ -45,7 +44,7 @@ typedef struct { ...@@ -45,7 +44,7 @@ typedef struct {
int32_t number; int32_t number;
} SStreamState; } SStreamState;
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages); SStreamState* streamStateOpen(char* path, struct SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages);
void streamStateClose(SStreamState* pState); void streamStateClose(SStreamState* pState);
int32_t streamStateBegin(SStreamState* pState); int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState);
......
...@@ -295,7 +295,7 @@ typedef struct { ...@@ -295,7 +295,7 @@ typedef struct {
SEpSet epSet; SEpSet epSet;
} SStreamChildEpInfo; } SStreamChildEpInfo;
typedef struct SStreamTask { struct SStreamTask {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
int32_t totalLevel; int32_t totalLevel;
...@@ -362,8 +362,7 @@ typedef struct SStreamTask { ...@@ -362,8 +362,7 @@ typedef struct SStreamTask {
int64_t checkpointingId; int64_t checkpointingId;
int32_t checkpointAlignCnt; int32_t checkpointAlignCnt;
};
} SStreamTask;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
......
...@@ -57,7 +57,7 @@ target_sources( ...@@ -57,7 +57,7 @@ target_sources(
# tq # tq
"src/tq/tq.c" "src/tq/tq.c"
"src/tq/tqExec.c" "src/tq/tqScan.c"
"src/tq/tqMeta.c" "src/tq/tqMeta.c"
"src/tq/tqRead.c" "src/tq/tqRead.c"
"src/tq/tqOffset.c" "src/tq/tqOffset.c"
......
...@@ -473,6 +473,10 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -473,6 +473,10 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
code = tqScanData(pTq, pHandle, &dataRsp, pOffset); code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
if (code != TSDB_CODE_SUCCESS) {
taosWUnLockLatch(&pTq->lock);
return code;
}
// till now, all data has been transferred to consumer, new data needs to push client once arrived. // till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
......
...@@ -220,7 +220,7 @@ static void freeItem(void* param) { ...@@ -220,7 +220,7 @@ static void freeItem(void* param) {
static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) { static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
size_t numOfKeys = taosArrayGetSize(pCachedKeys); int32_t numOfKeys = (int32_t) taosArrayGetSize(pCachedKeys);
for (int32_t i = 0; i < numOfKeys; i++) { for (int32_t i = 0; i < numOfKeys; i++) {
SItem* pItem = taosArrayGet(pCachedKeys, i); SItem* pItem = taosArrayGet(pCachedKeys, i);
...@@ -248,13 +248,9 @@ static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int6 ...@@ -248,13 +248,9 @@ static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int6
qTaskInfo_t pTaskInfo = pExec->task; qTaskInfo_t pTaskInfo = pExec->task;
// prepare scan mem data // prepare scan mem data
SPackedData submit = { SPackedData submit = {.msgStr = pData, .msgLen = dataLen, .ver = ver};
.msgStr = pData,
.msgLen = dataLen,
.ver = ver,
};
if(qStreamSetScanMemData(pTaskInfo, submit) != 0){ if (qStreamSetScanMemData(pTaskInfo, submit) != 0) {
return; return;
} }
...@@ -287,7 +283,7 @@ static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int6 ...@@ -287,7 +283,7 @@ static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int6
} }
} }
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
int32_t len = msgLen - sizeof(SSubmitReq2Msg); int32_t len = msgLen - sizeof(SSubmitReq2Msg);
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
...@@ -341,6 +337,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -341,6 +337,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
} }
// push data for stream processing
if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) { if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) { if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) {
return 0; return 0;
...@@ -353,12 +350,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -353,12 +350,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
tqError("failed to copy data for stream since out of memory"); tqError("failed to copy data for stream since out of memory");
return -1; return -1;
} }
memcpy(data, pReq, len); memcpy(data, pReq, len);
SPackedData submit = { SPackedData submit = {.msgStr = data, .msgLen = len, .ver = ver};
.msgStr = data,
.msgLen = len,
.ver = ver,
};
tqDebug("tq copy write msg %p %d %" PRId64 " from %p", data, len, ver, pReq); tqDebug("tq copy write msg %p %d %" PRId64 " from %p", data, len, ver, pReq);
tqProcessSubmitReq(pTq, submit); tqProcessSubmitReq(pTq, submit);
......
...@@ -18,7 +18,9 @@ ...@@ -18,7 +18,9 @@
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) { int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen); void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) return -1; if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = 0; pRetrieve->useconds = 0;
...@@ -31,7 +33,8 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t ...@@ -31,7 +33,8 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
actualLen += sizeof(SRetrieveTableRsp); actualLen += sizeof(SRetrieveTableRsp);
taosArrayPush(pRsp->blockDataLen, &actualLen); taosArrayPush(pRsp->blockDataLen, &actualLen);
taosArrayPush(pRsp->blockData, &buf); taosArrayPush(pRsp->blockData, &buf);
return 0;
return TSDB_CODE_SUCCESS;
} }
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) { static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) {
...@@ -63,38 +66,39 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in ...@@ -63,38 +66,39 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
const int32_t MAX_ROWS_TO_RETURN = 4096; const int32_t MAX_ROWS_TO_RETURN = 4096;
int32_t vgId = TD_VID(pTq->pVnode);
int32_t code = 0;
int32_t totalRows = 0;
const STqExecHandle* pExec = &pHandle->execHandle; const STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->task; qTaskInfo_t task = pExec->task;
int32_t vgId = TD_VID(pTq->pVnode);
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId); tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId);
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
pRsp->rspOffset = *pOffset; pRsp->rspOffset = *pOffset;
return 0; return code;
} else { } else {
tqOffsetResetToLog(pOffset, pHandle->snapshotVer); tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId); tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId);
pRsp->rspOffset = *pOffset; pRsp->rspOffset = *pOffset;
return 0; return code;
} }
} }
} }
int32_t totalRows = 0;
while (1) { while (1) {
SSDataBlock* pDataBlock = NULL; SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0; uint64_t ts = 0;
tqDebug("vgId:%d, tmq task start to execute, consumer:0x%"PRIx64, vgId, pHandle->consumerId); tqDebug("vgId:%d, tmq task start to execute, consumer:0x%" PRIx64, vgId, pHandle->consumerId);
if (qExecTask(task, &pDataBlock, &ts) < 0) {
code = qExecTask(task, &pDataBlock, &ts);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d, task exec error since %s, consumer:0x%" PRIx64, vgId, terrstr(), tqError("vgId:%d, task exec error since %s, consumer:0x%" PRIx64, vgId, terrstr(),
pHandle->consumerId); pHandle->consumerId);
return -1; return code;
} }
// current scan should be stopped ASAP, since the re-balance occurs. // current scan should be stopped ASAP, since the re-balance occurs.
...@@ -102,7 +106,12 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs ...@@ -102,7 +106,12 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
break; break;
} }
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d, failed to add block to rsp msg", vgId);
return code;
}
pRsp->blockNum++; pRsp->blockNum++;
tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, rows:%d, total blocks:%d", vgId, pHandle->consumerId, tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, rows:%d, total blocks:%d", vgId, pHandle->consumerId,
...@@ -116,25 +125,25 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs ...@@ -116,25 +125,25 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
} }
} }
if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) { qStreamExtractOffset(task, &pRsp->rspOffset);
return -1;
}
if (pRsp->rspOffset.type == 0) { if (pRsp->rspOffset.type == 0) {
code = TSDB_CODE_INVALID_PARA;
tqError("vgId:%d, expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, vgId, pRsp->rspOffset.type, tqError("vgId:%d, expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, vgId, pRsp->rspOffset.type,
pRsp->rspOffset.ts, pRsp->rspOffset.uid, pRsp->rspOffset.version); pRsp->rspOffset.ts, pRsp->rspOffset.uid, pRsp->rspOffset.version);
return -1; return code;
} }
if (pRsp->withTbName || pRsp->withSchema) { if (pRsp->withTbName || pRsp->withSchema) {
code = TSDB_CODE_INVALID_PARA;
tqError("vgId:%d, get column should not with meta:%d,%d", vgId, pRsp->withTbName, pRsp->withSchema); tqError("vgId:%d, get column should not with meta:%d,%d", vgId, pRsp->withTbName, pRsp->withSchema);
return -1; return code;
} }
tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, rows:%d, total blocks:%d, rows:%d", vgId, pHandle->consumerId, tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, rows:%d, total blocks:%d, rows:%d", vgId, pHandle->consumerId,
pRsp->blockNum, totalRows); pRsp->blockNum, totalRows);
return 0; return code;
} }
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) { int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册