提交 dc78233f 编写于 作者: H Haojun Liao

Merge branch 'enh/rocksRevert' of github.com:taosdata/tdengine into enh/rocksRevert

......@@ -57,9 +57,10 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,
int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key);
int32_t streamStateSessionClear_rocksdb(SStreamState* pState);
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key);
int32_t streamStateSessionClear_rocksdb(SStreamState* pState);
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key);
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
......@@ -89,5 +90,14 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen);
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen);
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key);
int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
void* streamDefaultIterCreate_rocksdb(SStreamState* pState);
int32_t streamDefaultIterValid_rocksdb(void* iter);
void streamDefaultIterSeek_rocksdb(void* iter, const char* key);
void streamDefaultIterNext_rocksdb(void* iter);
char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len);
char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif
\ No newline at end of file
......@@ -489,7 +489,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
return -1; \
code = -1; \
break; \
} \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
......@@ -515,7 +516,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
return -1; \
code = -1; \
break; \
} \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
......@@ -550,7 +552,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
return -1; \
code = -1; \
break; \
} \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
......@@ -646,7 +649,79 @@ int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) {
return code;
}
int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) {
void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt);
return pCur;
}
int32_t streamDefaultIterValid_rocksdb(void* iter) {
SStreamStateCur* pCur = iter;
bool val = rocksdb_iter_valid(pCur->iter);
return val ? 0 : -1;
}
void streamDefaultIterSeek_rocksdb(void* iter, const char* key) {
SStreamStateCur* pCur = iter;
rocksdb_iter_seek(pCur->iter, key, strlen(key));
}
void streamDefaultIterNext_rocksdb(void* iter) {
SStreamStateCur* pCur = iter;
rocksdb_iter_next(pCur->iter);
}
char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) {
SStreamStateCur* pCur = iter;
return (char*)rocksdb_iter_key(pCur->iter, (size_t*)len);
}
char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) {
SStreamStateCur* pCur = iter;
return (char*)rocksdb_iter_value(pCur->iter, (size_t*)len);
}
// typedef struct {
// char* start;
// char* end;
// void* result;
// } StreamFilterArg;
// typedef int (*streamfilterFunc)(StreamFilterArg* arg);
// int32_t streamDefaultIterFilter_rocksdb(SStreamState* pState, streamfilterFunc filterFunc, StreamFilterArg* arg) {
// int code = 0;
// char* err = NULL;
// rocksdb_snapshot_t* snapshot = NULL;
// rocksdb_readoptions_t* readopts = NULL;
// rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
// if (pIter == NULL) {
// return -1;
// }
// char* start = arg->start;
// char* end = arg->end;
// SArray* result = arg->result;
// rocksdb_iter_seek(pIter, start, strlen(start));
// while (rocksdb_iter_valid(pIter)) {
// const char* key = rocksdb_iter_key(pIter, NULL);
// if (end != NULL && strcmp(key, end) > 0) {
// break;
// }
// if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) {
// int64_t checkPoint = 0;
// // if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) {
// // taosArrayPush(result, &checkPoint);
// // }
// } else {
// break;
// }
// rocksdb_iter_next(pIter);
// }
// rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot);
// rocksdb_readoptions_destroy(readopts);
// rocksdb_iter_destroy(pIter);
// return code;
// }
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) {
int code = 0;
char* err = NULL;
......@@ -896,6 +971,24 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke
memset(*pVal, 0, size);
return 0;
}
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateGetCur_rocksdb");
int32_t code = 0;
const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX};
STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0);
char buf[128] = {0};
int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt);
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
rocksdb_iter_prev(pCur->iter);
STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey);
return pCur;
}
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
......
......@@ -327,7 +327,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
SListIter iter = {0};
tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD);
const int32_t BATCH_LIMIT = 128;
const int32_t BATCH_LIMIT = 256;
SListNode* pNode = NULL;
void* batch = streamStateCreateBatch();
......@@ -382,7 +382,7 @@ int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) {
const char* taskKey = "streamFileState";
return streamDefaultIter_rocksdb(pFileState->pFileStore, taskKey, NULL, list);
return streamDefaultIterGet_rocksdb(pFileState->pFileStore, taskKey, NULL, list);
}
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
......@@ -414,6 +414,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
sscanf(val, "%" PRId64 "", &ts);
taosMemoryFree(val);
if (ts < mark) {
// statekey winkey.ts < mark
forceRemoveCheckpoint(pFileState, i);
break;
} else {
......@@ -429,14 +430,11 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) {
int32_t len = 0;
SWinKey key = {.groupId = 0, .ts = 0};
SStreamStateCur* pCur = streamStateGetCur_rocksdb(pFileState->pFileStore, &key);
if (!pCur) {
return TSDB_CODE_FAILED;
}
code = streamStateSeekLast(pFileState->pFileStore, pCur);
if (code != TSDB_CODE_SUCCESS) {
return code;
SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore, &key);
if (pCur == NULL) {
return -1;
}
while (code == TSDB_CODE_SUCCESS) {
if (pFileState->curRowCount == pFileState->maxRowCount) {
break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册