未验证 提交 dfcee117 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20673 from taosdata/fix/liaohj

refactor: do some internal refactor.
......@@ -23,12 +23,11 @@ extern "C" {
#ifndef _STREAM_STATE_H_
#define _STREAM_STATE_H_
typedef struct SStreamTask SStreamTask;
typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2);
typedef struct STdbState {
SStreamTask* pOwner;
struct SStreamTask* pOwner;
TDB* db;
TTB* pStateDb;
TTB* pFuncStateDb;
......@@ -45,7 +44,7 @@ typedef struct {
int32_t number;
} SStreamState;
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages);
SStreamState* streamStateOpen(char* path, struct SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages);
void streamStateClose(SStreamState* pState);
int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState);
......
......@@ -295,7 +295,7 @@ typedef struct {
SEpSet epSet;
} SStreamChildEpInfo;
typedef struct SStreamTask {
struct SStreamTask {
int64_t streamId;
int32_t taskId;
int32_t totalLevel;
......@@ -362,8 +362,7 @@ typedef struct SStreamTask {
int64_t checkpointingId;
int32_t checkpointAlignCnt;
} SStreamTask;
};
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
......
......@@ -57,7 +57,7 @@ target_sources(
# tq
"src/tq/tq.c"
"src/tq/tqExec.c"
"src/tq/tqScan.c"
"src/tq/tqMeta.c"
"src/tq/tqRead.c"
"src/tq/tqOffset.c"
......
......@@ -193,7 +193,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode);
void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type);
int tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
int tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
......
......@@ -223,19 +223,6 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
#if 0
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
A(!pRsp->withSchema);
A(taosArrayGetSize(pRsp->blockSchema) == 0);
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
A(pRsp->rspOffset.version > pRsp->reqOffset.version);
}
#endif
SMqRspHead* pHeader = &pPushEntry->pDataRsp->head;
doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType);
......@@ -486,6 +473,10 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
if (code != TSDB_CODE_SUCCESS) {
taosWUnLockLatch(&pTq->lock);
return code;
}
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
......@@ -905,7 +896,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
atomic_store_32(&pHandle->epoch, -1);
// remove if it has been register in the push manager, and return one empty block to consumer
tqRemovePushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
tqUnregisterPushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_add_fetch_32(&pHandle->epoch, 1);
......
......@@ -206,77 +206,61 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
}
#endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
int32_t len = msgLen - sizeof(SSubmitReq2Msg);
int32_t vgId = TD_VID(pTq->pVnode);
typedef struct {
void* pKey;
int64_t keyLen;
} SItem;
if (msgType == TDMT_VND_SUBMIT) {
// lock push mgr to avoid potential msg lost
taosWLockLatch(&pTq->lock);
int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
if (numOfRegisteredPush > 0) {
tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d",
vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
static void recordPushedEntry(SArray* cachedKey, void* pIter);
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
static void freeItem(void* param) {
SItem* p = (SItem*) param;
taosMemoryFree(p->pKey);
}
void* data = taosMemoryMalloc(len);
if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to copy data for stream since out of memory");
taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree);
taosArrayDestroy(cachedKeyLens);
static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfKeys = (int32_t) taosArrayGetSize(pCachedKeys);
// unlock
taosWUnLockLatch(&pTq->lock);
return -1;
for (int32_t i = 0; i < numOfKeys; i++) {
SItem* pItem = taosArrayGet(pCachedKeys, i);
if (taosHashRemove(pTq->pPushMgr, pItem->pKey, pItem->keyLen) != 0) {
tqError("vgId:%d, tq push hash remove key error, key: %s", vgId, (char*) pItem->pKey);
}
}
memcpy(data, pReq, len);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pPushMgr, pIter);
if (pIter == NULL) {
break;
if (numOfKeys > 0) {
tqDebug("vgId:%d, pushed %d items and remain:%d", vgId, numOfKeys, (int32_t)taosHashGetSize(pTq->pPushMgr));
}
}
static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int64_t ver, int32_t vgId, char* pData,
int32_t dataLen, SArray* pCachedKey) {
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
if (pHandle == NULL) {
tqDebug("vgId:%d, cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey);
continue;
}
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
if (pRsp->reqOffset.version >= ver) {
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId,
pRsp->reqOffset.version, ver);
continue;
return;
}
STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->task;
qTaskInfo_t pTaskInfo = pExec->task;
// prepare scan mem data
SPackedData submit = {
.msgStr = data,
.msgLen = len,
.ver = ver,
};
if(qStreamSetScanMemData(task, submit) != 0){
continue;
SPackedData submit = {.msgStr = pData, .msgLen = dataLen, .ver = ver};
if (qStreamSetScanMemData(pTaskInfo, submit) != 0) {
return;
}
// here start to scan submit block to extract the subscribed data
int32_t totalRows = 0;
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
if (qExecTask(pTaskInfo, &pDataBlock, &ts) < 0) {
tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr());
}
......@@ -286,45 +270,79 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
pRsp->blockNum++;
totalRows += pDataBlock->info.rows;
}
tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d", vgId, pPushEntry->subKey, pRsp->blockNum);
tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d, rows:%d", vgId, pPushEntry->subKey, pRsp->blockNum,
totalRows);
if (pRsp->blockNum > 0) {
// set offset
tqOffsetResetToLog(&pRsp->rspOffset, ver);
tqPushDataRsp(pTq, pPushEntry);
recordPushedEntry(pCachedKey, pIter);
}
}
// remove from hash
size_t kLen;
void* key = taosHashGetKey(pIter, &kLen);
void* keyCopy = taosMemoryCalloc(1, kLen + 1);
memcpy(keyCopy, key, kLen);
int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
int32_t len = msgLen - sizeof(SSubmitReq2Msg);
int32_t vgId = TD_VID(pTq->pVnode);
taosArrayPush(cachedKeys, &keyCopy);
taosArrayPush(cachedKeyLens, &kLen);
if (msgType == TDMT_VND_SUBMIT) {
// lock push mgr to avoid potential msg lost
taosWLockLatch(&pTq->lock);
tqPushDataRsp(pTq, pPushEntry);
int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
if (numOfRegisteredPush > 0) {
tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d",
vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
void* data = taosMemoryMalloc(len);
if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to copy data for stream since out of memory, vgId:%d", vgId);
taosWUnLockLatch(&pTq->lock);
return -1;
}
memcpy(data, pReq, len);
SArray* cachedKey = taosArrayInit(0, sizeof(SItem));
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pPushMgr, pIter);
if (pIter == NULL) {
break;
}
// delete entry
for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) {
void* key = taosArrayGetP(cachedKeys, i);
size_t kLen = *(size_t*)taosArrayGet(cachedKeyLens, i);
if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) {
tqError("vgId:%d, tq push hash remove key error, key: %s", pTq->pVnode->config.vgId, (char*)key);
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
if (pHandle == NULL) {
tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId, pPushEntry->subKey);
continue;
}
STqExecHandle* pExec = &pHandle->execHandle;
doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey);
}
taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree);
taosArrayDestroy(cachedKeyLens);
doRemovePushedEntry(cachedKey, pTq);
taosArrayDestroyEx(cachedKey, freeItem);
taosMemoryFree(data);
}
// unlock
taosWUnLockLatch(&pTq->lock);
}
// push data for stream processing
if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) {
return 0;
}
if (msgType == TDMT_VND_SUBMIT) {
void* data = taosMemoryMalloc(len);
if (data == NULL) {
......@@ -332,12 +350,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
tqError("failed to copy data for stream since out of memory");
return -1;
}
memcpy(data, pReq, len);
SPackedData submit = {
.msgStr = data,
.msgLen = len,
.ver = ver,
};
SPackedData submit = {.msgStr = data, .msgLen = len, .ver = ver};
tqDebug("tq copy write msg %p %d %" PRId64 " from %p", data, len, ver, pReq);
tqProcessSubmitReq(pTq, submit);
......@@ -351,6 +366,13 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
return 0;
}
void recordPushedEntry(SArray* cachedKey, void* pIter) {
size_t kLen = 0;
void* key = taosHashGetKey(pIter, &kLen);
SItem item = {.pKey = strndup(key, kLen), .keyLen = kLen};
taosArrayPush(cachedKey, &item);
}
int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg,
SMqDataRsp* pDataRsp, int32_t type) {
uint64_t consumerId = pRequest->consumerId;
......@@ -388,7 +410,7 @@ int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest,
return 0;
}
int32_t tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) {
int32_t tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) {
int32_t vgId = TD_VID(pTq->pVnode);
STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen);
......
......@@ -18,7 +18,9 @@
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) return -1;
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = 0;
......@@ -31,7 +33,8 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
actualLen += sizeof(SRetrieveTableRsp);
taosArrayPush(pRsp->blockDataLen, &actualLen);
taosArrayPush(pRsp->blockData, &buf);
return 0;
return TSDB_CODE_SUCCESS;
}
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) {
......@@ -62,72 +65,85 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in
}
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
const STqExecHandle* pExec = &pHandle->execHandle;
const int32_t MAX_ROWS_TO_RETURN = 4096;
int32_t vgId = TD_VID(pTq->pVnode);
int32_t code = 0;
int32_t totalRows = 0;
const STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->task;
int32_t vgId = TD_VID(pTq->pVnode);
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId);
if (pOffset->type == TMQ_OFFSET__LOG) {
pRsp->rspOffset = *pOffset;
return 0;
return code;
} else {
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId);
pRsp->rspOffset = *pOffset;
return 0;
return code;
}
}
}
int32_t rowCnt = 0;
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
tqDebug("vgId:%d, tmq task start to execute, consumer:0x%"PRIx64, vgId, pHandle->consumerId);
if (qExecTask(task, &pDataBlock, &ts) < 0) {
tqDebug("vgId:%d, tmq task start to execute, consumer:0x%" PRIx64, vgId, pHandle->consumerId);
code = qExecTask(task, &pDataBlock, &ts);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d, task exec error since %s, consumer:0x%" PRIx64, vgId, terrstr(),
pHandle->consumerId);
return -1;
return code;
}
tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task executed, get %p", pHandle->consumerId, vgId, pDataBlock);
// current scan should be stopped asap, since the rebalance occurs.
// current scan should be stopped ASAP, since the re-balance occurs.
if (pDataBlock == NULL) {
break;
}
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d, failed to add block to rsp msg", vgId);
return code;
}
pRsp->blockNum++;
tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, rows:%d, total blocks:%d", vgId, pHandle->consumerId,
pDataBlock->info.rows, pRsp->blockNum);
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
rowCnt += pDataBlock->info.rows;
if (rowCnt >= 4096) {
totalRows += pDataBlock->info.rows;
if (totalRows >= MAX_ROWS_TO_RETURN) {
break;
}
}
}
if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) {
return -1;
}
qStreamExtractOffset(task, &pRsp->rspOffset);
if (pRsp->rspOffset.type == 0) {
code = TSDB_CODE_INVALID_PARA;
tqError("vgId:%d, expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, vgId, pRsp->rspOffset.type,
pRsp->rspOffset.ts, pRsp->rspOffset.uid, pRsp->rspOffset.version);
return -1;
return code;
}
if (pRsp->withTbName || pRsp->withSchema) {
code = TSDB_CODE_INVALID_PARA;
tqError("vgId:%d, get column should not with meta:%d,%d", vgId, pRsp->withTbName, pRsp->withSchema);
return -1;
return code;
}
return 0;
tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, total blocks:%d, rows:%d", vgId, pHandle->consumerId,
pRsp->blockNum, totalRows);
return code;
}
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册