提交 ef092c1c 编写于 作者: dengyihao's avatar dengyihao

enh: add queue limit

上级 e1249aaf
...@@ -15,6 +15,10 @@ ...@@ -15,6 +15,10 @@
#include "tq.h" #include "tq.h"
// 0: not init
// 1: already inited
// 2: wait to be inited or cleaup
int32_t tqInit() { int32_t tqInit() {
int8_t old; int8_t old;
while (1) { while (1) {
...@@ -275,8 +279,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con ...@@ -275,8 +279,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
char buf2[80] = {0}; char buf2[80] = {0};
tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 80, &pRsp->rspOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset);
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, req:%s, rsp:%s", tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, req:%s, rsp:%s", TD_VID(pTq->pVnode),
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
return 0; return 0;
} }
...@@ -497,7 +501,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -497,7 +501,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
// update epoch if need // update epoch if need
int32_t savedEpoch = atomic_load_32(&pHandle->epoch); int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
while (savedEpoch < reqEpoch) { while (savedEpoch < reqEpoch) {
tqDebug("tmq poll: consumer:0x%"PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch); tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
reqEpoch);
savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch); savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch);
} }
...@@ -602,7 +607,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -602,7 +607,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
code = -1; code = -1;
} }
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
", ts:%" PRId64 "",
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts); dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
...@@ -612,7 +618,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -612,7 +618,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
// for taosx // for taosx
SMqMetaRsp metaRsp = {0}; SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0}; STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, &req); tqInitTaosxRsp(&taosxRsp, &req);
if (fetchOffsetNew.type != TMQ_OFFSET__LOG) { if (fetchOffsetNew.type != TMQ_OFFSET__LOG) {
...@@ -887,14 +893,15 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -887,14 +893,15 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
} }
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
tqDebug("try to persist handle %s consumer:0x%" PRIx64" , old consumer:0x%"PRIx64, req.subKey, pHandle->consumerId, tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
oldConsumerId); pHandle->consumerId, oldConsumerId);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
return -1; return -1;
} }
} else { } else {
// TODO handle qmsg and exec modification // TODO handle qmsg and exec modification
tqInfo("update the consumer info, old consumer id:0x%"PRIx64", new Id:0x%"PRIx64, pHandle->consumerId, req.newConsumerId); tqInfo("update the consumer info, old consumer id:0x%" PRIx64 ", new Id:0x%" PRIx64, pHandle->consumerId,
req.newConsumerId);
atomic_store_32(&pHandle->epoch, -1); atomic_store_32(&pHandle->epoch, -1);
atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_add_fetch_32(&pHandle->epoch, 1); atomic_add_fetch_32(&pHandle->epoch, 1);
...@@ -983,9 +990,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -983,9 +990,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->tbSink.vnode = pTq->pVnode; pTask->tbSink.vnode = pTq->pVnode;
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2; pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;
int32_t version = 1; int32_t version = 1;
SMetaInfo info = {0}; SMetaInfo info = {0};
int32_t code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL); int32_t code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
version = info.skmVer; version = info.skmVer;
} }
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "ttimer.h" #include "ttimer.h"
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
int code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
if (pMeta == NULL) { if (pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -33,7 +34,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -33,7 +34,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
} }
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
taosMulModeMkDir(streamPath, 0755); code = taosMulModeMkDir(streamPath, 0755) != 0;
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
taosMemoryFree(streamPath);
goto _err;
}
taosMemoryFree(streamPath); taosMemoryFree(streamPath);
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册