未验证 提交 8a9e729d 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #15195 from taosdata/feature/stream

feat(wal): remove wal log size limitation
...@@ -39,10 +39,10 @@ extern "C" { ...@@ -39,10 +39,10 @@ extern "C" {
#define WAL_LOG_SUFFIX "log" #define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx" #define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000 #define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalCkHead))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDULL #define WAL_MAGIC 0xFAFBFCFDULL
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
typedef enum { typedef enum {
TAOS_WAL_WRITE = 1, TAOS_WAL_WRITE = 1,
......
...@@ -421,7 +421,7 @@ typedef enum ELogicConditionType { ...@@ -421,7 +421,7 @@ typedef enum ELogicConditionType {
#define TSDB_DEFAULT_STABLES_HASH_SIZE 100 #define TSDB_DEFAULT_STABLES_HASH_SIZE 100
#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000 #define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000
#define TSDB_MAX_WAL_SIZE (1024 * 1024 * 3) #define TSDB_MAX_MSG_SIZE (1024 * 1024 * 10)
#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P #define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P
......
...@@ -45,7 +45,6 @@ void taosIp2String(uint32_t ip, char *str); ...@@ -45,7 +45,6 @@ void taosIp2String(uint32_t ip, char *str);
void taosIpPort2String(uint32_t ip, uint16_t port, char *str); void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen); void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen);
char *strDupUnquo(const char *src);
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) { static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
T_MD5_CTX context; T_MD5_CTX context;
......
...@@ -40,11 +40,11 @@ bool tsPrintAuth = false; ...@@ -40,11 +40,11 @@ bool tsPrintAuth = false;
// multi process // multi process
int32_t tsMultiProcess = 0; int32_t tsMultiProcess = 0;
int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 2 + 1024; int32_t tsMnodeShmSize = TSDB_MAX_MSG_SIZE * 2 + 1024;
int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 1024; int32_t tsVnodeShmSize = TSDB_MAX_MSG_SIZE * 10 + 1024;
int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024; int32_t tsQnodeShmSize = TSDB_MAX_MSG_SIZE * 4 + 1024;
int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024; int32_t tsSnodeShmSize = TSDB_MAX_MSG_SIZE * 4 + 1024;
int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024; int32_t tsBnodeShmSize = TSDB_MAX_MSG_SIZE * 4 + 1024;
int32_t tsNumOfShmThreads = 1; int32_t tsNumOfShmThreads = 1;
// queue & threads // queue & threads
...@@ -387,11 +387,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -387,11 +387,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1; if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1; if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mumOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "mumOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1;
tsNumOfRpcThreads = tsNumOfCores / 2; tsNumOfRpcThreads = tsNumOfCores / 2;
...@@ -447,8 +447,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -447,8 +447,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 1, 1024, 0) != 0) return -1;
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_WAL_SIZE * 10L, TSDB_MAX_WAL_SIZE * 10000L); tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, TSDB_MAX_MSG_SIZE * 10000L);
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_WAL_SIZE * 10L, INT64_MAX, 0) != 0) if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, 0) != 0)
return -1; return -1;
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
......
...@@ -5349,6 +5349,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) { ...@@ -5349,6 +5349,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
if (tEncodeCStr(pEncoder, pReq->tbName) < 0) return -1; if (tEncodeCStr(pEncoder, pReq->tbName) < 0) return -1;
if (tEncodeI8(pEncoder, pReq->action) < 0) return -1; if (tEncodeI8(pEncoder, pReq->action) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->colId) < 0) return -1;
switch (pReq->action) { switch (pReq->action) {
case TSDB_ALTER_TABLE_ADD_COLUMN: case TSDB_ALTER_TABLE_ADD_COLUMN:
if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1; if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1;
...@@ -5399,6 +5400,7 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) { ...@@ -5399,6 +5400,7 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
if (tDecodeCStr(pDecoder, &pReq->tbName) < 0) return -1; if (tDecodeCStr(pDecoder, &pReq->tbName) < 0) return -1;
if (tDecodeI8(pDecoder, &pReq->action) < 0) return -1; if (tDecodeI8(pDecoder, &pReq->action) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->colId) < 0) return -1;
switch (pReq->action) { switch (pReq->action) {
case TSDB_ALTER_TABLE_ADD_COLUMN: case TSDB_ALTER_TABLE_ADD_COLUMN:
if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1;
......
...@@ -231,7 +231,7 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { ...@@ -231,7 +231,7 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
mDebug("start to read sdb file:%s", file); mDebug("start to read sdb file:%s", file);
SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100); SSdbRaw *pRaw = taosMemoryMalloc(TSDB_MAX_MSG_SIZE + 100);
if (pRaw == NULL) { if (pRaw == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed read sdb file since %s", terrstr()); mError("failed read sdb file since %s", terrstr());
...@@ -556,7 +556,8 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *ter ...@@ -556,7 +556,8 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *ter
if (term != NULL) *term = commitTerm; if (term != NULL) *term = commitTerm;
if (config != NULL) *config = commitConfig; if (config != NULL) *config = commitConfig;
mDebug("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s", mDebug("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64
" file:%s",
pIter, commitIndex, commitTerm, commitConfig, pIter->name); pIter, commitIndex, commitTerm, commitConfig, pIter->name);
return 0; return 0;
} }
......
...@@ -506,7 +506,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -506,7 +506,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.initTqReader = true, .initTqReader = true,
.version = ver, .version = ver,
}; };
pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols); pHandle->execHandle.execCol.task[i] =
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols);
ASSERT(pHandle->execHandle.execCol.task[i]); ASSERT(pHandle->execHandle.execCol.task[i]);
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner); qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner);
...@@ -679,9 +680,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -679,9 +680,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
// //
SStreamTaskRunReq* pReq = pMsg->pCont; SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId; int32_t taskId = pReq->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (pTask) { if (ppTask) {
streamProcessRunReq(pTask); streamProcessRunReq(*ppTask);
return 0; return 0;
} else { } else {
return -1; return -1;
...@@ -697,13 +698,13 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -697,13 +698,13 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
tDecoderInit(&decoder, msgBody, msgLen); tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req); tDecodeStreamDispatchReq(&decoder, &req);
int32_t taskId = req.taskId; int32_t taskId = req.taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (pTask) { if (ppTask) {
SRpcMsg rsp = { SRpcMsg rsp = {
.info = pMsg->info, .info = pMsg->info,
.code = 0, .code = 0,
}; };
streamProcessDispatchReq(pTask, &req, &rsp); streamProcessDispatchReq(*ppTask, &req, &rsp);
return 0; return 0;
} else { } else {
return -1; return -1;
...@@ -713,9 +714,9 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -713,9 +714,9 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverReq* pReq = pMsg->pCont; SStreamTaskRecoverReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId; int32_t taskId = pReq->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (pTask) { if (ppTask) {
streamProcessRecoverReq(pTask, pReq, pMsg); streamProcessRecoverReq(*ppTask, pReq, pMsg);
return 0; return 0;
} else { } else {
return -1; return -1;
...@@ -725,9 +726,9 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -725,9 +726,9 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId; int32_t taskId = pRsp->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (pTask) { if (ppTask) {
streamProcessDispatchRsp(pTask, pRsp); streamProcessDispatchRsp(*ppTask, pRsp);
return 0; return 0;
} else { } else {
return -1; return -1;
...@@ -737,9 +738,9 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -737,9 +738,9 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp* pRsp = pMsg->pCont; SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->taskId; int32_t taskId = pRsp->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (pTask) { if (ppTask) {
streamProcessRecoverRsp(pTask, pRsp); streamProcessRecoverRsp(*ppTask, pRsp);
return 0; return 0;
} else { } else {
return -1; return -1;
...@@ -749,10 +750,10 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -749,10 +750,10 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
if (pTask) { if (ppTask) {
taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); atomic_store_8(&(*ppTask)->taskStatus, TASK_STATUS__DROPPING);
} }
// todo // todo
// clear queue // clear queue
...@@ -781,15 +782,16 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -781,15 +782,16 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
tDecoderInit(&decoder, msgBody, msgLen); tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req); tDecodeStreamRetrieveReq(&decoder, &req);
int32_t taskId = req.dstTaskId; int32_t taskId = req.dstTaskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { if (ppTask) {
return 0;
}
SRpcMsg rsp = { SRpcMsg rsp = {
.info = pMsg->info, .info = pMsg->info,
.code = 0, .code = 0,
}; };
streamProcessRetrieveReq(pTask, &req, &rsp); streamProcessRetrieveReq(*ppTask, &req, &rsp);
} else {
return -1;
}
return 0; return 0;
} }
......
...@@ -40,7 +40,6 @@ static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { ...@@ -40,7 +40,6 @@ static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
} }
static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
ASSERT(pWal->fileInfoSet != NULL);
int32_t sz = taosArrayGetSize(pWal->fileInfoSet); int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
ASSERT(sz > 0); ASSERT(sz > 0);
#if 0 #if 0
...@@ -55,7 +54,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { ...@@ -55,7 +54,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
int64_t fileSize = 0; int64_t fileSize = 0;
taosStatFile(fnameStr, &fileSize, NULL); taosStatFile(fnameStr, &fileSize, NULL);
int readSize = TMIN(WAL_MAX_SIZE + 2, fileSize); int32_t readSize = TMIN(WAL_SCAN_BUF_SIZE, fileSize);
pLastFileInfo->fileSize = fileSize; pLastFileInfo->fileSize = fileSize;
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ); TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ);
...@@ -73,7 +72,8 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { ...@@ -73,7 +72,8 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
return -1; return -1;
} }
taosLSeekFile(pFile, -readSize, SEEK_END); int64_t offset;
offset = taosLSeekFile(pFile, -readSize, SEEK_END);
if (readSize != taosReadFile(pFile, buf, readSize)) { if (readSize != taosReadFile(pFile, buf, readSize)) {
taosMemoryFree(buf); taosMemoryFree(buf);
taosCloseFile(&pFile); taosCloseFile(&pFile);
...@@ -81,8 +81,9 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { ...@@ -81,8 +81,9 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
return -1; return -1;
} }
char* haystack = buf;
char* found = NULL; char* found = NULL;
while (1) {
char* haystack = buf;
char* candidate; char* candidate;
while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) { while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) {
// read and validate // read and validate
...@@ -92,6 +93,17 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { ...@@ -92,6 +93,17 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
} }
haystack = candidate + 1; haystack = candidate + 1;
} }
if (found || offset == 0) break;
offset = TMIN(0, offset - readSize + sizeof(uint64_t));
int64_t offset2 = taosLSeekFile(pFile, offset, SEEK_SET);
ASSERT(offset == offset2);
if (readSize != taosReadFile(pFile, buf, readSize)) {
taosMemoryFree(buf);
taosCloseFile(&pFile);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
#if 0
if (found == buf) { if (found == buf) {
SWalCkHead* logContent = (SWalCkHead*)found; SWalCkHead* logContent = (SWalCkHead*)found;
if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) { if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) {
...@@ -102,10 +114,23 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { ...@@ -102,10 +114,23 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
return -1; return -1;
} }
} }
taosCloseFile(&pFile); #endif
}
// TODO truncate file
if (found == NULL) {
// file corrupted, no complete log
// TODO delete and search in previous files
ASSERT(0);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
SWalCkHead* lastEntry = (SWalCkHead*)found; SWalCkHead* lastEntry = (SWalCkHead*)found;
int64_t retVer = lastEntry->head.version;
taosCloseFile(&pFile);
taosMemoryFree(buf);
return lastEntry->head.version; return retVer;
} }
int walCheckAndRepairMeta(SWal* pWal) { int walCheckAndRepairMeta(SWal* pWal) {
......
...@@ -436,11 +436,6 @@ END: ...@@ -436,11 +436,6 @@ END:
} }
int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen) { int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen) {
if (bodyLen > TSDB_MAX_WAL_SIZE) {
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
return -1;
}
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
int64_t index = pWal->vers.lastVer + 1; int64_t index = pWal->vers.lastVer + 1;
...@@ -472,10 +467,6 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSync ...@@ -472,10 +467,6 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSync
int32_t bodyLen) { int32_t bodyLen) {
int32_t code = 0; int32_t code = 0;
if (bodyLen > TSDB_MAX_WAL_SIZE) {
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
return -1;
}
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
// concurrency control: // concurrency control:
......
...@@ -64,20 +64,6 @@ int32_t strdequote(char *z) { ...@@ -64,20 +64,6 @@ int32_t strdequote(char *z) {
return j + 1; // only one quote, do nothing return j + 1; // only one quote, do nothing
} }
char *strDupUnquo(const char *src) {
if (src == NULL) return NULL;
if (src[0] != '`') return strdup(src);
int32_t len = (int32_t)strlen(src);
if (src[len - 1] != '`') return NULL;
char *ret = taosMemoryMalloc(len);
if (ret == NULL) return NULL;
for (int32_t i = 0; i < len - 1; i++) {
ret[i] = src[i + 1];
}
ret[len - 1] = 0;
return ret;
}
size_t strtrim(char *z) { size_t strtrim(char *z) {
int32_t i = 0; int32_t i = 0;
int32_t j = 0; int32_t j = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册