提交 bb0ce395 编写于 作者: L Liu Jicong

refactor(stream): add stream meta

上级 43026ac3
......@@ -476,8 +476,10 @@ typedef struct SStreamMeta {
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc);
void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
......
......@@ -117,10 +117,9 @@ typedef struct {
struct STQ {
SVnode* pVnode;
char* path;
SHashObj* pushMgr; // consumerId -> STqHandle*
SHashObj* handles; // subKey -> STqHandle
SHashObj* pStreamTasks; // taksId -> SStreamTask
SHashObj* pAlterInfo; // topic -> SAlterCheckInfo
SHashObj* pushMgr; // consumerId -> STqHandle*
SHashObj* handles; // subKey -> STqHandle
SHashObj* pAlterInfo; // topic -> SAlterCheckInfo
STqOffsetStore* pOffsetStore;
......@@ -129,9 +128,7 @@ struct STQ {
TTB* pAlterInfoStore;
TDB* pStreamStore;
TTB* pTaskDb;
TTB* pTaskState;
SStreamMeta* pStreamMeta;
};
typedef struct {
......@@ -188,6 +185,9 @@ static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ve
pOffsetVal->version = ver;
}
// tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask);
#ifdef __cplusplus
}
#endif
......
......@@ -62,8 +62,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
pTq->pAlterInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
......@@ -76,6 +74,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
ASSERT(0);
}
pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask);
if (pTq->pStreamMeta == NULL) {
ASSERT(0);
}
return pTq;
}
......@@ -83,18 +86,11 @@ void tqClose(STQ* pTq) {
if (pTq) {
tqOffsetClose(pTq->pOffsetStore);
taosHashCleanup(pTq->handles);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
tFreeSStreamTask(pTask);
}
taosHashCleanup(pTq->pStreamTasks);
taosHashCleanup(pTq->pushMgr);
taosHashCleanup(pTq->pAlterInfo);
taosMemoryFree(pTq->path);
tqMetaClose(pTq);
streamMetaClose(pTq->pStreamMeta);
taosMemoryFree(pTq);
}
}
......@@ -672,6 +668,9 @@ FAIL:
}
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
//
return streamMetaAddSerializedTask(pTq->pStreamMeta, msg, msgLen);
#if 0
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
......@@ -695,6 +694,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
FAIL:
if (pTask) taosMemoryFree(pTask);
return -1;
#endif
}
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
......@@ -710,7 +710,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
}
while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
......@@ -744,9 +744,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
//
SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
streamProcessRunReq(*ppTask);
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (pTask) {
streamProcessRunReq(pTask);
return 0;
} else {
return -1;
......@@ -762,14 +762,15 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req);
int32_t taskId = req.taskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
int32_t taskId = req.taskId;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (pTask) {
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessDispatchReq(*ppTask, &req, &rsp, exec);
streamProcessDispatchReq(pTask, &req, &rsp, exec);
return 0;
} else {
return -1;
......@@ -779,9 +780,9 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
streamProcessRecoverReq(*ppTask, pReq, pMsg);
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (pTask) {
streamProcessRecoverReq(pTask, pReq, pMsg);
return 0;
} else {
return -1;
......@@ -791,9 +792,9 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
streamProcessDispatchRsp(*ppTask, pRsp);
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (pTask) {
streamProcessDispatchRsp(pTask, pRsp);
return 0;
} else {
return -1;
......@@ -803,9 +804,10 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->rspTaskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
streamProcessRecoverRsp(*ppTask, pRsp);
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (pTask) {
streamProcessRecoverRsp(pTask, pRsp);
return 0;
} else {
return -1;
......@@ -815,18 +817,7 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
if (ppTask) {
SStreamTask* pTask = *ppTask;
taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
}
// todo
// clear queue
// push drop req into queue
// launch exec to free memory
// remove from hash
return 0;
return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
}
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
......@@ -837,18 +828,18 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req);
int32_t taskId = req.dstTaskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
int32_t taskId = req.dstTaskId;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (pTask) {
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessRetrieveReq(*ppTask, &req, &rsp);
streamProcessRetrieveReq(pTask, &req, &rsp);
return 0;
} else {
return -1;
}
return 0;
}
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
......@@ -871,16 +862,18 @@ void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
goto FAIL;
}
int32_t taskId = req.taskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
int32_t taskId = req.taskId;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (pTask) {
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessDispatchReq(*ppTask, &req, &rsp, false);
streamProcessDispatchReq(pTask, &req, &rsp, false);
return;
}
FAIL:
if (pMsg->info.handle == NULL) return;
SRpcMsg rsp = {
......
......@@ -215,7 +215,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
walApplyVer(pTq->pVnode->pWal, ver);
if (msgType == TDMT_VND_SUBMIT) {
if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0;
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
void* data = taosMemoryMalloc(msgLen);
if (data == NULL) {
......
......@@ -413,7 +413,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
}
while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
......
......@@ -36,8 +36,18 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err;
}
pMeta->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pMeta->pTasks == NULL) {
goto _err;
}
if (streamMetaBegin(pMeta) < 0) {
goto _err;
}
pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc;
return pMeta;
_err:
return NULL;
......@@ -48,6 +58,48 @@ void streamMetaClose(SStreamMeta* pMeta) {
tdbTbClose(pMeta->pTaskDb);
tdbTbClose(pMeta->pStateDb);
tdbClose(pMeta->db);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
tFreeSStreamTask(pTask);
}
taosHashCleanup(pMeta->pTasks);
taosMemoryFree(pMeta->path);
taosMemoryFree(pMeta);
}
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
ASSERT(0);
goto FAIL;
}
tDecoderClear(&decoder);
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
ASSERT(0);
goto FAIL;
}
taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn) < 0) {
ASSERT(0);
return -1;
}
return 0;
FAIL:
if (pTask) taosMemoryFree(pTask);
return -1;
}
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
......@@ -80,6 +132,16 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
return 0;
}
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
ASSERT((*ppTask)->taskId == taskId);
return *ppTask;
} else {
return NULL;
}
}
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册