提交 0315e895 编写于 作者: dengyihao's avatar dengyihao

Avoid creating the same ID task multiple times

上级 64edab63
...@@ -719,7 +719,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t ...@@ -719,7 +719,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
qDebug("succ to open rocksdb cf"); qDebug("succ to open rocksdb cf");
} }
// close default cf // close default cf
rocksdb_column_family_handle_destroy(cfHandle[0]); if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) rocksdb_column_family_handle_destroy(cfHandle[0]);
rocksdb_options_destroy(cfOpts[0]); rocksdb_options_destroy(cfOpts[0]);
handle->db = db; handle->db = db;
......
...@@ -205,24 +205,25 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { ...@@ -205,24 +205,25 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
// add to the ready tasks hash map, not the restored tasks hash map // add to the ready tasks hash map, not the restored tasks hash map
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
tFreeStreamTask(pTask);
return -1;
}
if (streamMetaSaveTask(pMeta, pTask) < 0) {
tFreeStreamTask(pTask);
return -1;
}
if (streamMetaCommit(pMeta) < 0) {
tFreeStreamTask(pTask);
return -1;
}
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
if (p == NULL) { if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
tFreeStreamTask(pTask);
return -1;
}
if (streamMetaSaveTask(pMeta, pTask) < 0) {
tFreeStreamTask(pTask);
return -1;
}
if (streamMetaCommit(pMeta) < 0) {
tFreeStreamTask(pTask);
return -1;
}
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
} else {
return 0;
} }
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
...@@ -359,18 +360,19 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { ...@@ -359,18 +360,19 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
tDecodeStreamTask(&decoder, pTask); tDecodeStreamTask(&decoder, pTask);
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) { // remove duplicate
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
return -1;
}
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
if (p == NULL) { if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
return -1;
}
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
} else {
continue;
} }
if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) { if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) {
tdbFree(pKey); tdbFree(pKey);
tdbFree(pVal); tdbFree(pVal);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册