提交 0bdccd26 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 d29f835a
......@@ -635,13 +635,15 @@ void streamMetaInit();
void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta);
// save to b-tree meta store
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); // todo remove it
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
......
......@@ -160,7 +160,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
// 2.save task
taosWLockLatch(&pSnode->pMeta->lock);
code = streamMetaAddDeployedTask(pSnode->pMeta, -1, pTask);
code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask);
if (code < 0) {
taosWUnLockLatch(&pSnode->pMeta->lock);
return -1;
......@@ -179,7 +179,17 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->taskId);
if (pTask == NULL) {
qError("vgId:%d failed to acquire s-task:0x%x when dropping it", pSnode->pMeta->vgId, pReq->taskId);
return 0;
}
streamMetaUnregisterTask(pSnode->pMeta, pReq->taskId);
streamMetaRemoveTask(pSnode->pMeta, pReq->taskId);
streamMetaReleaseTask(pSnode->pMeta, pTask);
streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0;
}
......
......@@ -1041,7 +1041,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
// 2.save task, use the newest commit version as the initial start version of stream task.
int32_t taskId = 0;
taosWLockLatch(&pStreamMeta->lock);
code = streamMetaAddDeployedTask(pStreamMeta, sversion, pTask);
code = streamMetaRegisterTask(pStreamMeta, sversion, pTask);
taskId = pTask->id.taskId;
int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
......@@ -1468,8 +1468,17 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d failed to acquire s-task:0x%x when dropping it", pTq->pStreamMeta->vgId, pReq->taskId);
return 0;
}
streamMetaUnregisterTask(pTq->pStreamMeta, pReq->taskId);
streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
......
......@@ -101,7 +101,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
}
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
if (pTask->triggerParam != 0) {
if (pTask->triggerParam != 0 && pTask->info.fillHistory == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
ASSERT(ref == 2 && pTask->schedTimer == NULL);
......
......@@ -409,12 +409,15 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
int32_t taskId = pTask->id.taskId;
pTask->status.taskStatus = TASK_STATUS__DROPPING;
streamMetaRemoveTask(pMeta, pTask->id.taskId);
// free it and remove it from disk meta-store
streamMetaUnregisterTask(pMeta, pTask->id.taskId);
streamMetaRemoveTask(pMeta, taskId);
// save to disk
taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pTask);
streamMetaSaveTask(pMeta, pStreamTask);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
......@@ -499,6 +502,10 @@ int32_t streamExecForAll(SStreamTask* pTask) {
while (1) {
int32_t batchSize = 0;
SStreamQueueItem* pInput = NULL;
if (streamTaskShouldStop(&pTask->status)) {
qDebug("s-task:%s stream task stopped, abort", id);
break;
}
// merge multiple input data if possible in the input queue.
qDebug("s-task:%s start to extract data block from inputQ", id);
......
......@@ -217,6 +217,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
tEncoderClear(&encoder);
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) {
qError("s-task:0x%x save to disk failed, code:%s", pTask->id.idStr, tstrerror(terrno));
return -1;
}
......@@ -224,8 +225,22 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
return 0;
}
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
taosWLockLatch(&pMeta->lock);
int32_t code = tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(taskId), pMeta->txn);
taosWUnLockLatch(&pMeta->lock);
if (code != 0) {
qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, taskId, tstrerror(terrno));
} else {
qDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, taskId);
}
return code;
}
// add to the ready tasks hash map, not the restored tasks hash map
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
......@@ -242,6 +257,7 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
tFreeStreamTask(pTask);
return -1;
}
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
} else {
return 0;
......@@ -281,6 +297,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
qTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
} else if (ref == 0) {
ASSERT(streamTaskShouldStop(&pTask->status));
qTrace("s-task:%s all refs are gone, free it");
tFreeStreamTask(pTask);
} else if (ref < 0) {
qError("task ref is invalid, ref:%d, %s", ref, pTask->id.idStr);
......@@ -297,7 +314,7 @@ static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, int32_t taskId)
}
}
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask* pTask = NULL;
// pre-delete operation
......@@ -309,7 +326,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
} else {
qDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
taosWUnLockLatch(&pMeta->lock);
return;
return 0;
}
taosWUnLockLatch(&pMeta->lock);
......@@ -339,9 +356,8 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
ASSERT(pTask->status.timerActive == 0);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
......@@ -351,15 +367,13 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
// remove the ref by timer
if (pTask->triggerParam != 0) {
taosTmrStop(pTask->schedTimer);
streamMetaReleaseTask(pMeta, pTask);
}
streamMetaReleaseTask(pMeta, pTask);
} else {
qDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId);
}
taosWUnLockLatch(&pMeta->lock);
return 0;
}
int32_t streamMetaBegin(SStreamMeta* pMeta) {
......@@ -404,7 +418,9 @@ int32_t streamMetaAbort(SStreamMeta* pMeta) {
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno));
return -1;
}
......@@ -413,6 +429,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
void* pVal = NULL;
int32_t vLen = 0;
SDecoder decoder;
SArray* pRecycleList = taosArrayInit(4, sizeof(int32_t));
tdbTbcMoveToFirst(pCur);
......@@ -422,6 +439,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
taosArrayDestroy(pRecycleList);
return -1;
}
......@@ -429,16 +447,29 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
tDecodeStreamTask(&decoder, pTask);
tDecoderClear(&decoder);
// remove duplicate
if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
int32_t taskId = pTask->id.taskId;
tFreeStreamTask(pTask);
taosArrayPush(pRecycleList, &taskId);
int32_t total = taosArrayGetSize(pRecycleList);
qDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
continue;
}
// do duplicate task check.
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
taosMemoryFree(pTask);
tFreeStreamTask(pTask);
taosArrayDestroy(pRecycleList);
return -1;
}
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
} else {
tdbFree(pKey);
......@@ -452,7 +483,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
taosMemoryFree(pTask);
tFreeStreamTask(pTask);
taosArrayDestroy(pRecycleList);
return -1;
}
......@@ -462,8 +494,18 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
tdbFree(pKey);
tdbFree(pVal);
if (tdbTbcClose(pCur) < 0) {
taosArrayDestroy(pRecycleList);
return -1;
}
if (taosArrayGetSize(pRecycleList) > 0) {
for(int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
int32_t taskId = *(int32_t*) taosArrayGet(pRecycleList, i);
streamMetaRemoveTask(pMeta, taskId);
}
}
qDebug("vgId:%d load %d task from disk", pMeta->vgId, taosArrayGetSize(pMeta->pTaskList));
taosArrayDestroy(pRecycleList);
return 0;
}
......@@ -765,7 +765,13 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
} else {
SHistDataRange* pRange = &pTask->dataRange;
int64_t ekey = pRange->window.ekey + 1;
int64_t ekey = 0;
if (pRange->window.ekey < INT64_MAX) {
ekey = pRange->window.ekey + 1;
} else {
ekey = pRange->window.ekey;
}
int64_t ver = pRange->range.minVer;
pRange->window.skey = ekey;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册