提交 74d5f029 编写于 作者: dengyihao's avatar dengyihao

fix invalid free

上级 dfebe434
......@@ -163,6 +163,7 @@ typedef struct {
int64_t checkPointId;
int32_t taskId;
int64_t streamId;
int64_t streamBackendRid;
} SStreamState;
typedef struct SFunctionStateStore {
......
......@@ -344,7 +344,6 @@ typedef struct SStreamMeta {
SRWLatch lock;
int32_t walScanCounter;
void* streamBackend;
int32_t streamBackendId;
int64_t streamBackendRid;
SHashObj* pTaskBackendUnique;
} SStreamMeta;
......
......@@ -36,8 +36,9 @@ static SStreamGlobalEnv streamEnv;
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes);
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize,
SArray* pRes);
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data);
......@@ -53,6 +54,8 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
extern int32_t streamBackendId;
#ifdef __cplusplus
}
#endif
......
......@@ -16,7 +16,9 @@
#include "streamBackendRocksdb.h"
#include "executor.h"
#include "query.h"
#include "streamInc.h"
#include "tcommon.h"
#include "tref.h"
typedef struct SCompactFilteFactory {
void* status;
......@@ -79,7 +81,7 @@ const char* compareParKeyName(void* name);
const char* comparePartagKeyName(void* name);
void* streamBackendInit(const char* path) {
qDebug("init stream backend");
qDebug("start to init stream backend at %s", path);
SBackendHandle* pHandle = calloc(1, sizeof(SBackendHandle));
pHandle->list = tdListNew(sizeof(SCfComparator));
taosThreadMutexInit(&pHandle->mutex, NULL);
......@@ -129,6 +131,7 @@ void* streamBackendInit(const char* path) {
if (cfs != NULL) {
rocksdb_list_column_families_destroy(cfs, nCf);
}
qDebug("succ to init stream backend at %s, backend:%p", path, pHandle);
return (void*)pHandle;
_EXIT:
......@@ -141,6 +144,7 @@ _EXIT:
rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
tdListFree(pHandle->list);
free(pHandle);
qDebug("failed to init stream backend at %s", path);
return NULL;
}
void streamBackendCleanup(void* arg) {
......@@ -180,7 +184,7 @@ void streamBackendCleanup(void* arg) {
taosThreadMutexDestroy(&pHandle->cfMutex);
taosMemoryFree(pHandle);
qDebug("destroy stream backend backend:%p", pHandle);
return;
}
SListNode* streamBackendAddCompare(void* backend, void* arg) {
......@@ -803,7 +807,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
return 0;
}
int streamStateOpenBackend(void* backend, SStreamState* pState) {
qInfo("start to open backend, %p 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId);
qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId);
taosAcquireRef(streamBackendId, pState->streamBackendRid);
SBackendHandle* handle = backend;
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId);
......@@ -866,7 +871,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare);
// rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1);
qInfo("succ to open backend, %p, 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId);
qInfo("succ to open state %p on backend, %p, 0x%" PRIx64 "-%d", pState, handle, pState->streamId, pState->taskId);
return 0;
}
......@@ -882,8 +887,8 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
taosThreadMutexUnlock(&pHandle->cfMutex);
char* status[] = {"close", "drop"};
qInfo("start to %s backend, %p, 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pState->streamId,
pState->taskId);
qInfo("start to %s state %p on backend %p 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pHandle,
pState->streamId, pState->taskId);
if (pState->pTdbState->rocksdb == NULL) {
return;
}
......@@ -938,6 +943,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
taosThreadRwlockDestroy(&pState->pTdbState->rwLock);
pState->pTdbState->rocksdb = NULL;
taosReleaseRef(streamBackendId, pState->streamBackendRid);
}
void streamStateDestroyCompar(void* arg) {
SCfComparator* comp = (SCfComparator*)arg;
......
......@@ -20,7 +20,7 @@
#include "ttimer.h"
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
static int32_t streamBackendId = 0;
int32_t streamBackendId = 0;
static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); }
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
......@@ -79,7 +79,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->vgId = vgId;
pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc;
pMeta->streamBackendId = streamBackendId;
memset(streamPath, 0, len);
sprintf(streamPath, "%s/%s", pMeta->path, "state");
......
......@@ -106,7 +106,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
}
SStreamTask* pStreamTask = pTask;
char statePath[1024];
char statePath[1024];
if (!specPath) {
sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId);
} else {
......@@ -119,10 +119,10 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
#ifdef USE_ROCKSDB
SStreamMeta* pMeta = pStreamTask->pMeta;
taosAcquireRef(pMeta->streamBackendId, pMeta->streamBackendRid);
pState->streamBackendRid = pMeta->streamBackendRid;
int code = streamStateOpenBackend(pMeta->streamBackend, pState);
if (code == -1) {
taosReleaseRef(pMeta->streamBackendId, pMeta->streamBackendRid);
taosReleaseRef(streamBackendId, pMeta->streamBackendRid);
taosMemoryFree(pState);
pState = NULL;
}
......@@ -224,7 +224,7 @@ void streamStateClose(SStreamState* pState, bool remove) {
#ifdef USE_ROCKSDB
// streamStateCloseBackend(pState);
streamStateDestroy(pState, remove);
taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
//taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
#else
tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
......@@ -278,10 +278,10 @@ int32_t streamStateCommit(SStreamState* pState) {
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
memcpy(buf + len - rowSize, value, vLen);
return code;
......@@ -291,10 +291,10 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void*
}
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
*ppVal = buf + len - rowSize;
return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册