提交 1ec57798 编写于 作者: L Liu Jicong

ci: remove unstable case

上级 1e7d9ada
......@@ -88,7 +88,7 @@ typedef struct {
EWalType level; // wal level
} SWalCfg;
typedef struct SWalVer {
typedef struct {
int64_t firstVer;
int64_t verInSnapshotting;
int64_t snapshotVer;
......@@ -149,17 +149,22 @@ typedef struct SWal {
SWalCkHead writeHead;
} SWal; // WAL HANDLE
typedef struct SWalReadHandle {
SWal *pWal;
TdFilePtr pReadLogTFile;
TdFilePtr pReadIdxTFile;
int64_t curFileFirstVer;
int64_t curVersion;
int64_t capacity;
int64_t status; // if cursor valid
TdThreadMutex mutex;
SWalCkHead *pHead;
} SWalReadHandle;
typedef struct {
int8_t scanUncommited;
int8_t scanMeta;
} SWalFilterCond;
typedef struct {
SWal *pWal;
TdFilePtr pLogFile;
TdFilePtr pIdxFile;
int64_t curFileFirstVer;
int64_t curVersion;
int64_t capacity;
TdThreadMutex mutex;
SWalFilterCond cond;
SWalCkHead *pHead;
} SWalReader;
// module initialization
int32_t walInit();
......@@ -186,15 +191,16 @@ int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
// int32_t walDataCorrupted(SWal*);
// read
SWalReadHandle *walOpenReadHandle(SWal *);
void walCloseReadHandle(SWalReadHandle *);
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
void walCloseReader(SWalReader *pRead);
int32_t walReadVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead, SWalCkHead **ppHead);
// only for tq usage
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity);
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead);
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
typedef struct {
int64_t refId;
......
......@@ -128,7 +128,7 @@ typedef struct {
int8_t fetchMeta;
// reader
SWalReadHandle* pWalReader;
SWalReader* pWalReader;
// push
STqPushHandle pushHandle;
......
......@@ -149,7 +149,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
tEncodeSMqDataRsp(&encoder, pRsp);
/*tEncodeSMqDataBlkRsp(&abuf, pRsp);*/
SRpcMsg rsp = {
.info = pMsg->info,
......@@ -447,7 +446,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.subType = req.subType;
pHandle->fetchMeta = req.withMeta;
pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
for (int32_t i = 0; i < 5; i++) {
pHandle->execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
}
......
......@@ -77,7 +77,7 @@ int32_t tqMetaOpen(STQ* pTq) {
STqHandle handle;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle);
handle.pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
for (int32_t i = 0; i < 5; i++) {
handle.execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
}
......
......@@ -32,8 +32,8 @@ typedef struct SSyncLogStoreData {
SSyncNode* pSyncNode;
SWal* pWal;
TdThreadMutex mutex;
SWalReadHandle* pWalHandle;
TdThreadMutex mutex;
SWalReader* pWalHandle;
// SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0
} SSyncLogStoreData;
......
......@@ -62,7 +62,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
ASSERT(pData->pWal != NULL);
taosThreadMutexInit(&(pData->mutex), NULL);
pData->pWalHandle = walOpenReadHandle(pData->pWal);
pData->pWalHandle = walOpenReader(pData->pWal, NULL);
ASSERT(pData->pWalHandle != NULL);
pLogStore->appendEntry = logStoreAppendEntry;
......@@ -95,7 +95,7 @@ void logStoreDestory(SSyncLogStore* pLogStore) {
taosThreadMutexLock(&(pData->mutex));
if (pData->pWalHandle != NULL) {
walCloseReadHandle(pData->pWalHandle);
walCloseReader(pData->pWalHandle);
pData->pWalHandle = NULL;
}
taosThreadMutexUnlock(&(pData->mutex));
......@@ -255,7 +255,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
*ppEntry = NULL;
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalReadHandle* pWalHandle = pData->pWalHandle;
SWalReader* pWalHandle = pData->pWalHandle;
if (pWalHandle == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
......@@ -263,7 +263,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
taosThreadMutexLock(&(pData->mutex));
code = walReadWithHandle(pWalHandle, index);
code = walReadVer(pWalHandle, index);
if (code != 0) {
int32_t err = terrno;
const char* errStr = tstrerror(err);
......@@ -398,10 +398,10 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
taosThreadMutexLock(&(pData->mutex));
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalReadHandle* pWalHandle = pData->pWalHandle;
SWalReader* pWalHandle = pData->pWalHandle;
ASSERT(pWalHandle != NULL);
int32_t code = walReadWithHandle(pWalHandle, index);
int32_t code = walReadVer(pWalHandle, index);
if (code != 0) {
int32_t err = terrno;
const char* errStr = tstrerror(err);
......
......@@ -16,20 +16,20 @@
#include "taoserror.h"
#include "walInt.h"
SWalReadHandle *walOpenReadHandle(SWal *pWal) {
SWalReadHandle *pRead = taosMemoryMalloc(sizeof(SWalReadHandle));
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
SWalReader *pRead = taosMemoryMalloc(sizeof(SWalReader));
if (pRead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pRead->pWal = pWal;
pRead->pReadIdxTFile = NULL;
pRead->pReadLogTFile = NULL;
pRead->pIdxFile = NULL;
pRead->pLogFile = NULL;
pRead->curVersion = -1;
pRead->curFileFirstVer = -1;
pRead->capacity = 0;
pRead->status = 0;
pRead->cond = *cond;
taosThreadMutexInit(&pRead->mutex, NULL);
......@@ -42,23 +42,24 @@ SWalReadHandle *walOpenReadHandle(SWal *pWal) {
return pRead;
}
void walCloseReadHandle(SWalReadHandle *pRead) {
taosCloseFile(&pRead->pReadIdxTFile);
taosCloseFile(&pRead->pReadLogTFile);
void walCloseReader(SWalReader *pRead) {
taosCloseFile(&pRead->pIdxFile);
taosCloseFile(&pRead->pLogFile);
taosMemoryFreeClear(pRead->pHead);
taosMemoryFree(pRead);
}
int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) {
// TODO
int32_t walNextValidMsg(SWalReader *pRead, SWalCkHead **ppHead) {
//
return 0;
}
static int64_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) {
static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64_t ver) {
int64_t ret = 0;
TdFilePtr pIdxTFile = pRead->pReadIdxTFile;
TdFilePtr pLogTFile = pRead->pReadLogTFile;
TdFilePtr pIdxTFile = pRead->pIdxFile;
TdFilePtr pLogTFile = pRead->pLogFile;
// seek position
int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
......@@ -90,11 +91,11 @@ static int64_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
return ret;
}
static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) {
char fnameStr[WAL_FILE_LEN];
taosCloseFile(&pRead->pReadIdxTFile);
taosCloseFile(&pRead->pReadLogTFile);
taosCloseFile(&pRead->pIdxFile);
taosCloseFile(&pRead->pLogFile);
walBuildLogName(pRead->pWal, fileFirstVer, fnameStr);
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ);
......@@ -104,7 +105,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
return -1;
}
pRead->pReadLogTFile = pLogTFile;
pRead->pLogFile = pLogTFile;
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ);
......@@ -114,11 +115,11 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
return -1;
}
pRead->pReadIdxTFile = pIdxTFile;
pRead->pIdxFile = pIdxTFile;
return 0;
}
static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
static int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
SWal *pWal = pRead->pWal;
if (ver == pRead->curVersion) {
return 0;
......@@ -153,9 +154,9 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
return 0;
}
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capacity = capacity; }
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity = capacity; }
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) {
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
int64_t code;
// TODO: valid ver
......@@ -168,9 +169,9 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) {
if (code < 0) return -1;
}
ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
ASSERT(taosValidFile(pRead->pLogFile) == true);
code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalCkHead));
code = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead));
if (code != sizeof(SWalCkHead)) {
return -1;
}
......@@ -186,12 +187,12 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) {
return 0;
}
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead) {
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
int64_t code;
ASSERT(pRead->curVersion == pHead->head.version);
code = taosLSeekFile(pRead->pReadLogTFile, pHead->head.bodyLen, SEEK_CUR);
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curVersion = -1;
......@@ -203,7 +204,7 @@ int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead) {
return 0;
}
int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) {
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
SWalCont *pReadHead = &((*ppHead)->head);
int64_t ver = pReadHead->version;
......@@ -218,7 +219,7 @@ int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) {
pRead->capacity = pReadHead->bodyLen;
}
if (pReadHead->bodyLen != taosReadFile(pRead->pReadLogTFile, pReadHead->body, pReadHead->bodyLen)) {
if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
ASSERT(0);
return -1;
}
......@@ -241,9 +242,9 @@ int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) {
return 0;
}
int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalCont **ppHead) {
int32_t walReadWithHandle_s(SWalReader *pRead, int64_t ver, SWalCont **ppHead) {
taosThreadMutexLock(&pRead->mutex);
if (walReadWithHandle(pRead, ver) < 0) {
if (walReadVer(pRead, ver) < 0) {
taosThreadMutexUnlock(&pRead->mutex);
return -1;
}
......@@ -257,7 +258,7 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalCont **ppHea
return 0;
}
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
int32_t walReadVer(SWalReader *pRead, int64_t ver) {
int64_t code;
if (pRead->pWal->vers.firstVer == -1) {
......@@ -280,9 +281,9 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
return -1;
}
ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
ASSERT(taosValidFile(pRead->pLogFile) == true);
code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalCkHead));
code = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
if (code != sizeof(SWalCkHead)) {
if (code < 0)
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -310,7 +311,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
pRead->capacity = pRead->pHead->head.bodyLen;
}
if ((code = taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
if ((code = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
pRead->pHead->head.bodyLen) {
if (code < 0)
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -340,46 +341,3 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
return 0;
}
#if 0
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
int code;
code = walSeekVer(pWal, ver);
if (code != 0) {
return code;
}
if (*ppHead == NULL) {
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead));
if (ptr == NULL) {
return -1;
}
*ppHead = ptr;
}
if (tfRead(pWal->pWriteLogTFile, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
return -1;
}
// TODO: endian compatibility processing after read
if (walValidHeadCksum(*ppHead) != 0) {
return -1;
}
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead) + (*ppHead)->head.len);
if (ptr == NULL) {
taosMemoryFree(*ppHead);
*ppHead = NULL;
return -1;
}
if (tfRead(pWal->pWriteLogTFile, (*ppHead)->head.body, (*ppHead)->head.len) != (*ppHead)->head.len) {
return -1;
}
// TODO: endian compatibility processing after read
if (walValidBodyCksum(*ppHead) != 0) {
return -1;
}
return 0;
}
int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {
return 0;
}
#endif
......@@ -292,8 +292,8 @@ TEST_F(WalCleanDeleteEnv, roll) {
TEST_F(WalKeepEnv, readHandleRead) {
walResetEnv();
int code;
SWalReadHandle* pRead = walOpenReadHandle(pWal);
int code;
SWalReader* pRead = walOpenReader(pWal, NULL);
ASSERT(pRead != NULL);
int i;
......@@ -306,7 +306,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
}
for (int i = 0; i < 1000; i++) {
int ver = taosRand() % 100;
code = walReadWithHandle(pRead, ver);
code = walReadVer(pRead, ver);
ASSERT_EQ(code, 0);
// printf("rrbody: \n");
......@@ -325,7 +325,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
}
}
walCloseReadHandle(pRead);
walCloseReader(pRead);
}
TEST_F(WalRetentionEnv, repairMeta1) {
......@@ -354,12 +354,12 @@ TEST_F(WalRetentionEnv, repairMeta1) {
ASSERT_EQ(pWal->vers.lastVer, 99);
SWalReadHandle* pRead = walOpenReadHandle(pWal);
SWalReader* pRead = walOpenReader(pWal, NULL);
ASSERT(pRead != NULL);
for (int i = 0; i < 1000; i++) {
int ver = taosRand() % 100;
code = walReadWithHandle(pRead, ver);
code = walReadVer(pRead, ver);
ASSERT_EQ(code, 0);
// printf("rrbody: \n");
......@@ -389,7 +389,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
for (int i = 0; i < 1000; i++) {
int ver = taosRand() % 200;
code = walReadWithHandle(pRead, ver);
code = walReadVer(pRead, ver);
ASSERT_EQ(code, 0);
// printf("rrbody: \n");
......
#python3 ./test.py -f 2-query/last.py -Q 3
......@@ -295,7 +295,7 @@ python3 ./test.py -f 2-query/Today.py -Q 3
python3 ./test.py -f 2-query/max.py -Q 3
python3 ./test.py -f 2-query/min.py -Q 3
python3 ./test.py -f 2-query/count.py -Q 3
python3 ./test.py -f 2-query/last.py -Q 3
#python3 ./test.py -f 2-query/last.py -Q 3
python3 ./test.py -f 2-query/first.py -Q 3
python3 ./test.py -f 2-query/To_iso8601.py -Q 3
python3 ./test.py -f 2-query/To_unixtimestamp.py -Q 3
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册