提交 1e8579e8 编写于 作者: H Haojun Liao

fix(stream): fix other cases.

上级 4c929973
......@@ -644,7 +644,7 @@ void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
......
......@@ -185,7 +185,7 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
return 0;
}
streamMetaUnregisterTask(pSnode->pMeta, pReq->taskId);
streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0;
}
......
......@@ -1242,7 +1242,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s fill-history task set status to be dropping", id);
streamMetaUnregisterTask(pMeta, pTask->id.taskId);
streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
streamMetaReleaseTask(pMeta, pTask);
return -1;
}
......@@ -1575,7 +1575,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL
return 0;
}
streamMetaUnregisterTask(pTq->pStreamMeta, pReq->taskId);
streamMetaUnregisterTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
......
......@@ -350,10 +350,9 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskResumeFromHalt(pStreamTask);
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
int32_t taskId = pTask->id.taskId;
// 5. free it and remove fill-history task from disk meta-store
streamMetaUnregisterTask(pMeta, taskId);
streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
// 6. save to disk
taosWLockLatch(&pMeta->lock);
......
......@@ -161,43 +161,6 @@ void streamMetaClose(SStreamMeta* pMeta) {
taosMemoryFree(pMeta);
}
#if 0
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg, int32_t msgLen) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeStreamTask(&decoder, pTask) < 0) {
tDecoderClear(&decoder);
goto FAIL;
}
tDecoderClear(&decoder);
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
ASSERT(0);
goto FAIL;
}
if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
goto FAIL;
}
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), msg, msgLen, pMeta->txn) < 0) {
taosHashRemove(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t));
ASSERT(0);
goto FAIL;
}
return 0;
FAIL:
if (pTask) tFreeStreamTask(pTask);
return -1;
}
#endif
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
void* buf = NULL;
int32_t len;
......@@ -241,7 +204,8 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
*pAdded = false;
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
tFreeStreamTask(pTask);
......@@ -263,7 +227,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
return 0;
}
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, POINTER_BYTES);
*pAdded = true;
return 0;
}
......@@ -315,12 +279,14 @@ static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamId* id) {
}
}
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
SStreamTask* pTask = NULL;
// pre-delete operation
taosWLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
int64_t keys[2] = {streamId, taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (ppTask) {
pTask = *ppTask;
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
......@@ -336,7 +302,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
while (1) {
taosRLockLatch(&pMeta->lock);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (ppTask) {
if ((*ppTask)->status.timerActive == 0) {
......@@ -355,7 +321,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
// let's do delete of stream task
taosWLockLatch(&pMeta->lock);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (ppTask) {
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
......@@ -472,7 +438,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
}
// do duplicate task check.
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) {
tdbFree(pKey);
......@@ -492,7 +459,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
continue;
}
if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) {
if (taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
......
......@@ -541,7 +541,9 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId);
taosWLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
int64_t keys[2] = {pInfo->streamId, pInfo->taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (ppTask) {
ASSERT((*ppTask)->status.timerActive == 1);
......@@ -596,8 +598,9 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t hTaskId = pTask->historyTaskId.taskId;
int64_t keys[2] = {pTask->historyTaskId.streamId, pTask->historyTaskId.taskId};
// Set the execute conditions, including the query time window and the version range
SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &hTaskId, sizeof(hTaskId));
SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (pHTask == NULL) {
qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
pMeta->vgId, hTaskId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册