未验证 提交 8915418d 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #14429 from taosdata/feature/stream

refactor(wal)
...@@ -61,45 +61,23 @@ extern "C" { ...@@ -61,45 +61,23 @@ extern "C" {
} \ } \
} }
#define WAL_HEAD_VER 0 #define WAL_PROTO_VER 0
#define WAL_NOSUFFIX_LEN 20 #define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1) #define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
#define WAL_LOG_SUFFIX "log" #define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx" #define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000 #define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead)) #define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalCkHead))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDULL #define WAL_MAGIC 0xFAFBFCFDULL
#pragma pack(push, 1)
typedef enum { typedef enum {
TAOS_WAL_NOLOG = 0, TAOS_WAL_NOLOG = 0,
TAOS_WAL_WRITE = 1, TAOS_WAL_WRITE = 1,
TAOS_WAL_FSYNC = 2, TAOS_WAL_FSYNC = 2,
} EWalType; } EWalType;
// used by sync module
typedef struct {
int8_t isWeek;
uint64_t seqNum;
uint64_t term;
} SSyncLogMeta;
typedef struct SWalReadHead {
int8_t headVer;
int8_t reserved;
int16_t msgType;
int32_t bodyLen;
int64_t ingestTs; // not implemented
int64_t version;
// sync meta
SSyncLogMeta syncMeta;
char body[];
} SWalReadHead;
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int32_t fsyncPeriod; // millisecond int32_t fsyncPeriod; // millisecond
...@@ -110,13 +88,6 @@ typedef struct { ...@@ -110,13 +88,6 @@ typedef struct {
EWalType level; // wal level EWalType level; // wal level
} SWalCfg; } SWalCfg;
typedef struct {
uint64_t magic;
uint32_t cksumHead;
uint32_t cksumBody;
SWalReadHead head;
} SWalHead;
typedef struct SWalVer { typedef struct SWalVer {
int64_t firstVer; int64_t firstVer;
int64_t verInSnapshotting; int64_t verInSnapshotting;
...@@ -125,6 +96,35 @@ typedef struct SWalVer { ...@@ -125,6 +96,35 @@ typedef struct SWalVer {
int64_t lastVer; int64_t lastVer;
} SWalVer; } SWalVer;
#pragma pack(push, 1)
// used by sync module
typedef struct {
int8_t isWeek;
uint64_t seqNum;
uint64_t term;
} SSyncLogMeta;
typedef struct {
int8_t protoVer;
int64_t version;
int16_t msgType;
int32_t bodyLen;
int64_t ingestTs; // not implemented
// sync meta
SSyncLogMeta syncMeta;
char body[];
} SWalCont;
typedef struct {
uint64_t magic;
uint32_t cksumHead;
uint32_t cksumBody;
SWalCont head;
} SWalCkHead;
#pragma pack(pop)
typedef struct SWal { typedef struct SWal {
// cfg // cfg
SWalCfg cfg; SWalCfg cfg;
...@@ -134,7 +134,7 @@ typedef struct SWal { ...@@ -134,7 +134,7 @@ typedef struct SWal {
TdFilePtr pWriteLogTFile; TdFilePtr pWriteLogTFile;
TdFilePtr pWriteIdxTFile; TdFilePtr pWriteIdxTFile;
int32_t writeCur; int32_t writeCur;
SArray *fileInfoSet; SArray *fileInfoSet; // SArray<SWalFileInfo>
// status // status
int64_t totSize; int64_t totSize;
int64_t lastRollSeq; int64_t lastRollSeq;
...@@ -146,7 +146,7 @@ typedef struct SWal { ...@@ -146,7 +146,7 @@ typedef struct SWal {
// path // path
char path[WAL_PATH_LEN]; char path[WAL_PATH_LEN];
// reusable write head // reusable write head
SWalHead writeHead; SWalCkHead writeHead;
} SWal; // WAL HANDLE } SWal; // WAL HANDLE
typedef struct SWalReadHandle { typedef struct SWalReadHandle {
...@@ -158,11 +158,8 @@ typedef struct SWalReadHandle { ...@@ -158,11 +158,8 @@ typedef struct SWalReadHandle {
int64_t capacity; int64_t capacity;
int64_t status; // if cursor valid int64_t status; // if cursor valid
TdThreadMutex mutex; TdThreadMutex mutex;
SWalHead *pHead; SWalCkHead *pHead;
} SWalReadHandle; } SWalReadHandle;
#pragma pack(pop)
// typedef int32_t (*FWalWrite)(void *ahandle, void *pHead);
// module initialization // module initialization
int32_t walInit(); int32_t walInit();
...@@ -174,9 +171,9 @@ int32_t walAlter(SWal *, SWalCfg *pCfg); ...@@ -174,9 +171,9 @@ int32_t walAlter(SWal *, SWalCfg *pCfg);
void walClose(SWal *); void walClose(SWal *);
// write // write
int64_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body, int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body,
int32_t bodyLen); int32_t bodyLen);
int64_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen); int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen);
void walFsync(SWal *, bool force); void walFsync(SWal *, bool force);
// apis for lifecycle management // apis for lifecycle management
...@@ -196,9 +193,9 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver); ...@@ -196,9 +193,9 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
// only for tq usage // only for tq usage
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity); void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity);
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead); int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead); int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead); int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead);
typedef struct { typedef struct {
int64_t refId; int64_t refId;
......
...@@ -25,17 +25,17 @@ extern "C" { ...@@ -25,17 +25,17 @@ extern "C" {
// If the error is in a third-party library, place this header file under the third-party library header file. // If the error is in a third-party library, place this header file under the third-party library header file.
// When you want to use this feature, you should find or add the same function in the following sectio // When you want to use this feature, you should find or add the same function in the following sectio
#ifndef ALLOW_FORBID_FUNC #ifndef ALLOW_FORBID_FUNC
#define open OPEN_FUNC_TAOS_FORBID #define open OPEN_FUNC_TAOS_FORBID
#define fopen FOPEN_FUNC_TAOS_FORBID #define fopen FOPEN_FUNC_TAOS_FORBID
#define access ACCESS_FUNC_TAOS_FORBID #define access ACCESS_FUNC_TAOS_FORBID
#define stat STAT_FUNC_TAOS_FORBID #define stat STAT_FUNC_TAOS_FORBID
#define lstat LSTAT_FUNC_TAOS_FORBID #define lstat LSTAT_FUNC_TAOS_FORBID
#define fstat FSTAT_FUNC_TAOS_FORBID #define fstat FSTAT_FUNC_TAOS_FORBID
#define close CLOSE_FUNC_TAOS_FORBID #define close CLOSE_FUNC_TAOS_FORBID
#define fclose FCLOSE_FUNC_TAOS_FORBID #define fclose FCLOSE_FUNC_TAOS_FORBID
#define fsync FSYNC_FUNC_TAOS_FORBID #define fsync FSYNC_FUNC_TAOS_FORBID
#define getline GETLINE_FUNC_TAOS_FORBID #define getline GETLINE_FUNC_TAOS_FORBID
// #define fflush FFLUSH_FUNC_TAOS_FORBID // #define fflush FFLUSH_FUNC_TAOS_FORBID
#endif #endif
#ifndef PATH_MAX #ifndef PATH_MAX
...@@ -43,54 +43,54 @@ extern "C" { ...@@ -43,54 +43,54 @@ extern "C" {
#endif #endif
typedef struct TdFile *TdFilePtr; typedef struct TdFile *TdFilePtr;
#define TD_FILE_CREATE 0x0001 #define TD_FILE_CREATE 0x0001
#define TD_FILE_WRITE 0x0002 #define TD_FILE_WRITE 0x0002
#define TD_FILE_READ 0x0004 #define TD_FILE_READ 0x0004
#define TD_FILE_TRUNC 0x0008 #define TD_FILE_TRUNC 0x0008
#define TD_FILE_APPEND 0x0010 #define TD_FILE_APPEND 0x0010
#define TD_FILE_TEXT 0x0020 #define TD_FILE_TEXT 0x0020
#define TD_FILE_AUTO_DEL 0x0040 #define TD_FILE_AUTO_DEL 0x0040
#define TD_FILE_EXCL 0x0080 #define TD_FILE_EXCL 0x0080
#define TD_FILE_STREAM 0x0100 // Only support taosFprintfFile, taosGetLineFile, taosEOFFile #define TD_FILE_STREAM 0x0100 // Only support taosFprintfFile, taosGetLineFile, taosEOFFile
TdFilePtr taosOpenFile(const char *path,int32_t tdFileOptions); TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions);
#define TD_FILE_ACCESS_EXIST_OK 0x1 #define TD_FILE_ACCESS_EXIST_OK 0x1
#define TD_FILE_ACCESS_READ_OK 0x2 #define TD_FILE_ACCESS_READ_OK 0x2
#define TD_FILE_ACCESS_WRITE_OK 0x4 #define TD_FILE_ACCESS_WRITE_OK 0x4
bool taosCheckAccessFile(const char *pathname, int mode); bool taosCheckAccessFile(const char *pathname, int mode);
int32_t taosLockFile(TdFilePtr pFile); int32_t taosLockFile(TdFilePtr pFile);
int32_t taosUnLockFile(TdFilePtr pFile); int32_t taosUnLockFile(TdFilePtr pFile);
int32_t taosUmaskFile(int32_t maskVal); int32_t taosUmaskFile(int32_t maskVal);
int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime); int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime);
int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno); int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno);
int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime); int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime);
bool taosCheckExistFile(const char *pathname); bool taosCheckExistFile(const char *pathname);
int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence); int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence);
int32_t taosFtruncateFile(TdFilePtr pFile, int64_t length); int32_t taosFtruncateFile(TdFilePtr pFile, int64_t length);
int32_t taosFsyncFile(TdFilePtr pFile); int32_t taosFsyncFile(TdFilePtr pFile);
int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count); int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count);
int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset); int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset);
int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count); int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count);
void taosFprintfFile(TdFilePtr pFile, const char *format, ...); void taosFprintfFile(TdFilePtr pFile, const char *format, ...);
int64_t taosGetLineFile(TdFilePtr pFile, char ** __restrict ptrBuf); int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf);
int64_t taosGetsFile(TdFilePtr pFile, int32_t maxSize, char *__restrict buf); int64_t taosGetsFile(TdFilePtr pFile, int32_t maxSize, char *__restrict buf);
int32_t taosEOFFile(TdFilePtr pFile); int32_t taosEOFFile(TdFilePtr pFile);
int64_t taosCloseFile(TdFilePtr *ppFile); int32_t taosCloseFile(TdFilePtr *ppFile);
int32_t taosRenameFile(const char *oldName, const char *newName); int32_t taosRenameFile(const char *oldName, const char *newName);
int64_t taosCopyFile(const char *from, const char *to); int64_t taosCopyFile(const char *from, const char *to);
int32_t taosRemoveFile(const char *path); int32_t taosRemoveFile(const char *path);
void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath); void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath);
int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size); int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size);
......
...@@ -139,7 +139,7 @@ void* taosArrayGetLast(const SArray* pArray); ...@@ -139,7 +139,7 @@ void* taosArrayGetLast(const SArray* pArray);
* @param pArray * @param pArray
* @return * @return
*/ */
size_t taosArrayGetSize(const SArray* pArray); int32_t taosArrayGetSize(const SArray* pArray);
/** /**
* set the size of array * set the size of array
......
...@@ -44,6 +44,8 @@ uint32_t ip2uint(const char *const ip_addr); ...@@ -44,6 +44,8 @@ uint32_t ip2uint(const char *const ip_addr);
void taosIp2String(uint32_t ip, char *str); void taosIp2String(uint32_t ip, char *str);
void taosIpPort2String(uint32_t ip, uint16_t port, char *str); void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen);
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) { static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
T_MD5_CTX context; T_MD5_CTX context;
tMD5Init(&context); tMD5Init(&context);
...@@ -59,10 +61,10 @@ static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *tar ...@@ -59,10 +61,10 @@ static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *tar
tMD5Final(&context); tMD5Final(&context);
char buf[TSDB_PASSWORD_LEN + 1]; char buf[TSDB_PASSWORD_LEN + 1];
sprintf(buf, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], sprintf(buf, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], context.digest[1],
context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6],
context.digest[6], context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11],
context.digest[11], context.digest[12], context.digest[13], context.digest[14], context.digest[15]); context.digest[12], context.digest[13], context.digest[14], context.digest[15]);
memcpy(target, buf, TSDB_PASSWORD_LEN); memcpy(target, buf, TSDB_PASSWORD_LEN);
} }
......
...@@ -157,7 +157,7 @@ typedef struct { ...@@ -157,7 +157,7 @@ typedef struct {
static STqMgmt tqMgmt = {0}; static STqMgmt tqMgmt = {0};
// tqRead // tqRead
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead** pHeadWithCkSum); int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
// tqExec // tqExec
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId); int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId);
...@@ -178,6 +178,7 @@ STqOffsetStore* tqOffsetOpen(); ...@@ -178,6 +178,7 @@ STqOffsetStore* tqOffsetOpen();
void tqOffsetClose(STqOffsetStore*); void tqOffsetClose(STqOffsetStore*);
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey); STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset); int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset);
int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetSnapshot(STqOffsetStore* pStore); int32_t tqOffsetSnapshot(STqOffsetStore* pStore);
// tqSink // tqSink
......
...@@ -271,8 +271,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -271,8 +271,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey); STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey);
if (pOffset != NULL) { if (pOffset != NULL) {
fetchOffsetNew = pOffset->val; fetchOffsetNew = pOffset->val;
char formatBuf[50]; char formatBuf[80];
tFormatOffset(formatBuf, 50, &fetchOffsetNew); tFormatOffset(formatBuf, 80, &fetchOffsetNew);
tqDebug("tmq poll: consumer %ld, offset reset to %s", consumerId, formatBuf); tqDebug("tmq poll: consumer %ld, offset reset to %s", consumerId, formatBuf);
} else { } else {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
...@@ -302,9 +302,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -302,9 +302,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
if (fetchOffsetNew.type == TMQ_OFFSET__LOG) { if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
int64_t fetchVer = fetchOffsetNew.version + 1; int64_t fetchVer = fetchOffsetNew.version + 1;
SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048); SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pHeadWithCkSum == NULL) { if (pCkHead == NULL) {
return -1; return -1;
} }
...@@ -318,7 +318,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -318,7 +318,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
break; break;
} }
if (tqFetchLog(pTq, pHandle, &fetchVer, &pHeadWithCkSum) < 0) { if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
// TODO add push mgr // TODO add push mgr
tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer); tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer);
...@@ -329,7 +329,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -329,7 +329,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
goto OVER; goto OVER;
} }
SWalReadHead* pHead = &pHeadWithCkSum->head; SWalCont* pHead = &pCkHead->head;
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchVer, pHead->msgType); TD_VID(pTq->pVnode), fetchVer, pHead->msgType);
...@@ -373,9 +373,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -373,9 +373,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} }
} }
taosMemoryFree(pHeadWithCkSum); taosMemoryFree(pCkHead);
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) { } else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqInfo("retrieve using snapshot req offset: uid %ld ts %ld", dataRsp.reqOffset.uid, dataRsp.reqOffset.ts); tqInfo("retrieve using snapshot req offset: uid %ld ts %ld, actual offset: uid %ld ts %ld", dataRsp.reqOffset.uid,
dataRsp.reqOffset.ts, fetchOffsetNew.uid, fetchOffsetNew.ts);
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) { if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
ASSERT(0); ASSERT(0);
} }
...@@ -522,7 +523,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -522,7 +523,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
break; break;
} }
SWalReadHead* pHead = &pHeadWithCkSum->head; SWalCont* pHead = &pHeadWithCkSum->head;
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType); TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
...@@ -597,6 +598,8 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -597,6 +598,8 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
int32_t code = taosHashRemove(pTq->handles, pReq->subKey, strlen(pReq->subKey)); int32_t code = taosHashRemove(pTq->handles, pReq->subKey, strlen(pReq->subKey));
ASSERT(code == 0); ASSERT(code == 0);
tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) { if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
ASSERT(0); ASSERT(0);
} }
......
...@@ -97,6 +97,10 @@ int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) { ...@@ -97,6 +97,10 @@ int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset)); return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
} }
int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) {
return taosHashRemove(pStore->pHash, subscribeKey, strlen(subscribeKey));
}
int32_t tqOffsetSnapshot(STqOffsetStore* pStore) { int32_t tqOffsetSnapshot(STqOffsetStore* pStore) {
// open file // open file
// TODO file name should be with a version // TODO file name should be with a version
......
...@@ -15,13 +15,13 @@ ...@@ -15,13 +15,13 @@
#include "tq.h" #include "tq.h"
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead** ppHeadWithCkSum) { int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
int32_t code = 0; int32_t code = 0;
taosThreadMutexLock(&pHandle->pWalReader->mutex); taosThreadMutexLock(&pHandle->pWalReader->mutex);
int64_t offset = *fetchOffset; int64_t offset = *fetchOffset;
while (1) { while (1) {
if (walFetchHead(pHandle->pWalReader, offset, *ppHeadWithCkSum) < 0) { if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", pHandle->consumerId, tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", pHandle->consumerId,
pHandle->epoch, TD_VID(pTq->pVnode), offset); pHandle->epoch, TD_VID(pTq->pVnode), offset);
*fetchOffset = offset - 1; *fetchOffset = offset - 1;
...@@ -29,8 +29,8 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead* ...@@ -29,8 +29,8 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead*
goto END; goto END;
} }
if ((*ppHeadWithCkSum)->head.msgType == TDMT_VND_SUBMIT) { if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
code = walFetchBody(pHandle->pWalReader, ppHeadWithCkSum); code = walFetchBody(pHandle->pWalReader, ppCkHead);
if (code < 0) { if (code < 0) {
ASSERT(0); ASSERT(0);
...@@ -43,9 +43,9 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead* ...@@ -43,9 +43,9 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead*
goto END; goto END;
} else { } else {
if (pHandle->fetchMeta) { if (pHandle->fetchMeta) {
SWalReadHead* pHead = &((*ppHeadWithCkSum)->head); SWalCont* pHead = &((*ppCkHead)->head);
if (IS_META_MSG(pHead->msgType)) { if (IS_META_MSG(pHead->msgType)) {
code = walFetchBody(pHandle->pWalReader, ppHeadWithCkSum); code = walFetchBody(pHandle->pWalReader, ppCkHead);
if (code < 0) { if (code < 0) {
ASSERT(0); ASSERT(0);
...@@ -58,7 +58,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead* ...@@ -58,7 +58,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead*
goto END; goto END;
} }
} }
code = walSkipFetchBody(pHandle->pWalReader, *ppHeadWithCkSum); code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);
if (code < 0) { if (code < 0) {
ASSERT(0); ASSERT(0);
*fetchOffset = offset; *fetchOffset = offset;
......
...@@ -287,6 +287,7 @@ typedef struct STableScanInfo { ...@@ -287,6 +287,7 @@ typedef struct STableScanInfo {
} lastStatus; } lastStatus;
int8_t scanMode; int8_t scanMode;
int8_t noTable;
} STableScanInfo; } STableScanInfo;
typedef struct STagScanInfo { typedef struct STagScanInfo {
......
...@@ -235,9 +235,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) { ...@@ -235,9 +235,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (uid == 0) { if (uid == 0) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
uid = pTableInfo->uid; STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
ts = INT64_MIN; uid = pTableInfo->uid;
ts = INT64_MIN;
}
} }
return doPrepareScan(pTaskInfo->pRoot, uid, ts); return doPrepareScan(pTaskInfo->pRoot, uid, ts);
......
...@@ -2829,21 +2829,28 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { ...@@ -2829,21 +2829,28 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
pScanInfo->blockType = STREAM_INPUT__DATA_SCAN; pScanInfo->blockType = STREAM_INPUT__DATA_SCAN;
STableScanInfo* pInfo = pScanInfo->pSnapshotReadOp->info; STableScanInfo* pInfo = pScanInfo->pSnapshotReadOp->info;
if (uid == 0) {
pInfo->noTable = 1;
return TSDB_CODE_SUCCESS;
}
/*if (pSnapShotScanInfo->dataReader == NULL) {*/ /*if (pSnapShotScanInfo->dataReader == NULL) {*/
/*pSnapShotScanInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0);*/ /*pSnapShotScanInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0);*/
/*pSnapShotScanInfo->scanMode = TABLE_SCAN__TABLE_ORDER;*/ /*pSnapShotScanInfo->scanMode = TABLE_SCAN__TABLE_ORDER;*/
/*}*/ /*}*/
pInfo->noTable = 0;
if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) { if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) {
tsdbSetTableId(pInfo->dataReader, uid); tsdbSetTableId(pInfo->dataReader, uid);
int64_t oldSkey = pInfo->cond.twindows[0].skey; int64_t oldSkey = pInfo->cond.twindows[0].skey;
pInfo->cond.twindows[0].skey = ts; pInfo->cond.twindows[0].skey = ts + 1;
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0); tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
pInfo->cond.twindows[0].skey = oldSkey; pInfo->cond.twindows[0].skey = oldSkey;
pInfo->scanTimes = 0; pInfo->scanTimes = 0;
pInfo->curTWinIdx = 0; pInfo->curTWinIdx = 0;
} }
return TSDB_CODE_SUCCESS;
} else { } else {
if (pOperator->numOfDownstream == 1) { if (pOperator->numOfDownstream == 1) {
...@@ -2856,8 +2863,6 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { ...@@ -2856,8 +2863,6 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
} }
return TSDB_CODE_SUCCESS;
} }
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) { int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
......
...@@ -518,6 +518,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -518,6 +518,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// if scan table by table // if scan table by table
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
if (pInfo->noTable) return NULL;
while (1) { while (1) {
SSDataBlock* result = doTableScanGroup(pOperator); SSDataBlock* result = doTableScanGroup(pOperator);
if (result) { if (result) {
......
...@@ -88,3 +88,12 @@ SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit) { ...@@ -88,3 +88,12 @@ SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit) {
memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit)); memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit));
return pSubmitClone; return pSubmitClone;
} }
void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) {
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
taosMemoryFree(pDataSubmit->data);
taosMemoryFree(pDataSubmit->dataRef);
}
}
...@@ -63,7 +63,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ...@@ -63,7 +63,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
continue; continue;
} }
// TODO: do we need free memory?
SSDataBlock block = {0}; SSDataBlock block = {0};
assignOneDataBlock(&block, output); assignOneDataBlock(&block, output);
block.info.childId = pTask->selfChildId; block.info.childId = pTask->selfChildId;
......
...@@ -41,12 +41,3 @@ void streamQueueClose(SStreamQueue* queue) { ...@@ -41,12 +41,3 @@ void streamQueueClose(SStreamQueue* queue) {
return; return;
} }
} }
void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) {
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
taosMemoryFree(pDataSubmit->data);
taosMemoryFree(pDataSubmit->dataRef);
}
}
...@@ -27,7 +27,7 @@ extern "C" { ...@@ -27,7 +27,7 @@ extern "C" {
#endif #endif
// meta section begin // meta section begin
typedef struct WalFileInfo { typedef struct {
int64_t firstVer; int64_t firstVer;
int64_t lastVer; int64_t lastVer;
int64_t createTs; int64_t createTs;
...@@ -98,20 +98,20 @@ static inline int walBuildIdxName(SWal* pWal, int64_t fileFirstVer, char* buf) { ...@@ -98,20 +98,20 @@ static inline int walBuildIdxName(SWal* pWal, int64_t fileFirstVer, char* buf) {
return sprintf(buf, "%s/%020" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer); return sprintf(buf, "%s/%020" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer);
} }
static inline int walValidHeadCksum(SWalHead* pHead) { static inline int walValidHeadCksum(SWalCkHead* pHead) {
return taosCheckChecksum((uint8_t*)&pHead->head, sizeof(SWalReadHead), pHead->cksumHead); return taosCheckChecksum((uint8_t*)&pHead->head, sizeof(SWalCont), pHead->cksumHead);
} }
static inline int walValidBodyCksum(SWalHead* pHead) { static inline int walValidBodyCksum(SWalCkHead* pHead) {
return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.bodyLen, pHead->cksumBody); return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.bodyLen, pHead->cksumBody);
} }
static inline int walValidCksum(SWalHead* pHead, void* body, int64_t bodyLen) { static inline int walValidCksum(SWalCkHead* pHead, void* body, int64_t bodyLen) {
return walValidHeadCksum(pHead) && walValidBodyCksum(pHead); return walValidHeadCksum(pHead) && walValidBodyCksum(pHead);
} }
static inline uint32_t walCalcHeadCksum(SWalHead* pHead) { static inline uint32_t walCalcHeadCksum(SWalCkHead* pHead) {
return taosCalcChecksum(0, (uint8_t*)&pHead->head, sizeof(SWalReadHead)); return taosCalcChecksum(0, (uint8_t*)&pHead->head, sizeof(SWalCont));
} }
static inline uint32_t walCalcBodyCksum(const void* body, uint32_t len) { static inline uint32_t walCalcBodyCksum(const void* body, uint32_t len) {
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include "cJSON.h" #include "cJSON.h"
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tref.h" #include "tutil.h"
#include "walInt.h" #include "walInt.h"
bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) { bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) {
...@@ -37,26 +37,9 @@ static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { ...@@ -37,26 +37,9 @@ static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
} }
void* tmemmem(char* haystack, int hlen, char* needle, int nlen) {
char* limit;
if (nlen == 0 || hlen < nlen) {
return NULL;
}
limit = haystack + hlen - nlen + 1;
while ((haystack = (char*)memchr(haystack, needle[0], limit - haystack)) != NULL) {
if (memcmp(haystack, needle, nlen) == 0) {
return haystack;
}
haystack++;
}
return NULL;
}
static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
ASSERT(pWal->fileInfoSet != NULL); ASSERT(pWal->fileInfoSet != NULL);
int sz = taosArrayGetSize(pWal->fileInfoSet); int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
ASSERT(sz > 0); ASSERT(sz > 0);
#if 0 #if 0
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
...@@ -101,14 +84,14 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { ...@@ -101,14 +84,14 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
char* candidate; char* candidate;
while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) { while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) {
// read and validate // read and validate
SWalHead* logContent = (SWalHead*)candidate; SWalCkHead* logContent = (SWalCkHead*)candidate;
if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) { if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) {
found = candidate; found = candidate;
} }
haystack = candidate + 1; haystack = candidate + 1;
} }
if (found == buf) { if (found == buf) {
SWalHead* logContent = (SWalHead*)found; SWalCkHead* logContent = (SWalCkHead*)found;
if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) { if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) {
// file has to be deleted // file has to be deleted
taosMemoryFree(buf); taosMemoryFree(buf);
...@@ -118,7 +101,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { ...@@ -118,7 +101,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
} }
} }
taosCloseFile(&pFile); taosCloseFile(&pFile);
SWalHead* lastEntry = (SWalHead*)found; SWalCkHead* lastEntry = (SWalCkHead*)found;
return lastEntry->head.version; return lastEntry->head.version;
} }
......
...@@ -117,8 +117,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -117,8 +117,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->lastRollSeq = -1; pWal->lastRollSeq = -1;
// init write buffer // init write buffer
memset(&pWal->writeHead, 0, sizeof(SWalHead)); memset(&pWal->writeHead, 0, sizeof(SWalCkHead));
pWal->writeHead.head.headVer = WAL_HEAD_VER; pWal->writeHead.head.protoVer = WAL_PROTO_VER;
pWal->writeHead.magic = WAL_MAGIC; pWal->writeHead.magic = WAL_MAGIC;
if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) { if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) {
......
...@@ -33,7 +33,7 @@ SWalReadHandle *walOpenReadHandle(SWal *pWal) { ...@@ -33,7 +33,7 @@ SWalReadHandle *walOpenReadHandle(SWal *pWal) {
taosThreadMutexInit(&pRead->mutex, NULL); taosThreadMutexInit(&pRead->mutex, NULL);
pRead->pHead = taosMemoryMalloc(sizeof(SWalHead)); pRead->pHead = taosMemoryMalloc(sizeof(SWalCkHead));
if (pRead->pHead == NULL) { if (pRead->pHead == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
taosMemoryFree(pRead); taosMemoryFree(pRead);
...@@ -155,7 +155,7 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { ...@@ -155,7 +155,7 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capacity = capacity; } void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capacity = capacity; }
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) { int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) {
int64_t code; int64_t code;
// TODO: valid ver // TODO: valid ver
...@@ -170,8 +170,8 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) { ...@@ -170,8 +170,8 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) {
ASSERT(taosValidFile(pRead->pReadLogTFile) == true); ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalHead)); code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalCkHead));
if (code != sizeof(SWalHead)) { if (code != sizeof(SWalCkHead)) {
return -1; return -1;
} }
...@@ -186,7 +186,7 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) { ...@@ -186,7 +186,7 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) {
return 0; return 0;
} }
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead) { int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead) {
int64_t code; int64_t code;
ASSERT(pRead->curVersion == pHead->head.version); ASSERT(pRead->curVersion == pHead->head.version);
...@@ -203,12 +203,12 @@ int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead) { ...@@ -203,12 +203,12 @@ int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead) {
return 0; return 0;
} }
int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead) { int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) {
SWalReadHead *pReadHead = &((*ppHead)->head); SWalCont *pReadHead = &((*ppHead)->head);
int64_t ver = pReadHead->version; int64_t ver = pReadHead->version;
if (pRead->capacity < pReadHead->bodyLen) { if (pRead->capacity < pReadHead->bodyLen) {
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead) + pReadHead->bodyLen); void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
if (ptr == NULL) { if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1; return -1;
...@@ -241,18 +241,18 @@ int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead) { ...@@ -241,18 +241,18 @@ int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead) {
return 0; return 0;
} }
int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead) { int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalCont **ppHead) {
taosThreadMutexLock(&pRead->mutex); taosThreadMutexLock(&pRead->mutex);
if (walReadWithHandle(pRead, ver) < 0) { if (walReadWithHandle(pRead, ver) < 0) {
taosThreadMutexUnlock(&pRead->mutex); taosThreadMutexUnlock(&pRead->mutex);
return -1; return -1;
} }
*ppHead = taosMemoryMalloc(sizeof(SWalReadHead) + pRead->pHead->head.bodyLen); *ppHead = taosMemoryMalloc(sizeof(SWalCont) + pRead->pHead->head.bodyLen);
if (*ppHead == NULL) { if (*ppHead == NULL) {
taosThreadMutexUnlock(&pRead->mutex); taosThreadMutexUnlock(&pRead->mutex);
return -1; return -1;
} }
memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalReadHead) + pRead->pHead->head.bodyLen); memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalCont) + pRead->pHead->head.bodyLen);
taosThreadMutexUnlock(&pRead->mutex); taosThreadMutexUnlock(&pRead->mutex);
return 0; return 0;
} }
...@@ -282,8 +282,8 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { ...@@ -282,8 +282,8 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
ASSERT(taosValidFile(pRead->pReadLogTFile) == true); ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead)); code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalCkHead));
if (code != sizeof(SWalHead)) { if (code != sizeof(SWalCkHead)) {
if (code < 0) if (code < 0)
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
else { else {
...@@ -301,7 +301,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { ...@@ -301,7 +301,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
} }
if (pRead->capacity < pRead->pHead->head.bodyLen) { if (pRead->capacity < pRead->pHead->head.bodyLen) {
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.bodyLen); void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pRead->pHead->head.bodyLen);
if (ptr == NULL) { if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1; return -1;
......
...@@ -142,10 +142,10 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -142,10 +142,10 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
return -1; return -1;
} }
// validate offset // validate offset
SWalHead head; SWalCkHead head;
ASSERT(taosValidFile(pLogTFile)); ASSERT(taosValidFile(pLogTFile));
int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalHead)); int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead));
if (size != sizeof(SWalHead)) { if (size != sizeof(SWalCkHead)) {
return -1; return -1;
} }
code = walValidHeadCksum(&head); code = walValidHeadCksum(&head);
...@@ -261,7 +261,7 @@ int32_t walEndSnapshot(SWal *pWal) { ...@@ -261,7 +261,7 @@ int32_t walEndSnapshot(SWal *pWal) {
} }
int walRoll(SWal *pWal) { int walRoll(SWal *pWal) {
int code = 0; int32_t code = 0;
if (pWal->pWriteIdxTFile != NULL) { if (pWal->pWriteIdxTFile != NULL) {
code = taosCloseFile(&pWal->pWriteIdxTFile); code = taosCloseFile(&pWal->pWriteIdxTFile);
if (code != 0) { if (code != 0) {
...@@ -321,12 +321,13 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { ...@@ -321,12 +321,13 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
return 0; return 0;
} }
int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body, int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body,
int32_t bodyLen) { int32_t bodyLen) {
int code = 0; int32_t code = 0;
// no wal // no wal
if (pWal->cfg.level == TAOS_WAL_NOLOG) return 0; if (pWal->cfg.level == TAOS_WAL_NOLOG) return 0;
if (bodyLen > TSDB_MAX_WAL_SIZE) { if (bodyLen > TSDB_MAX_WAL_SIZE) {
terrno = TSDB_CODE_WAL_SIZE_LIMIT; terrno = TSDB_CODE_WAL_SIZE_LIMIT;
return -1; return -1;
...@@ -356,6 +357,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog ...@@ -356,6 +357,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
terrno = TSDB_CODE_WAL_INVALID_VER; terrno = TSDB_CODE_WAL_INVALID_VER;
return -1; return -1;
} }
/*if (!tfValid(pWal->pWriteLogTFile)) return -1;*/ /*if (!tfValid(pWal->pWriteLogTFile)) return -1;*/
ASSERT(pWal->writeCur >= 0); ASSERT(pWal->writeCur >= 0);
...@@ -380,7 +382,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog ...@@ -380,7 +382,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead); pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen); pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) { if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
// TODO ftruncate // TODO ftruncate
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
...@@ -405,19 +407,19 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog ...@@ -405,19 +407,19 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
// set status // set status
if (pWal->vers.firstVer == -1) pWal->vers.firstVer = index; if (pWal->vers.firstVer == -1) pWal->vers.firstVer = index;
pWal->vers.lastVer = index; pWal->vers.lastVer = index;
pWal->totSize += sizeof(SWalHead) + bodyLen; pWal->totSize += sizeof(SWalCkHead) + bodyLen;
if (walGetCurFileInfo(pWal)->firstVer == -1) { if (walGetCurFileInfo(pWal)->firstVer == -1) {
walGetCurFileInfo(pWal)->firstVer = index; walGetCurFileInfo(pWal)->firstVer = index;
} }
walGetCurFileInfo(pWal)->lastVer = index; walGetCurFileInfo(pWal)->lastVer = index;
walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen; walGetCurFileInfo(pWal)->fileSize += sizeof(SWalCkHead) + bodyLen;
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return 0; return 0;
} }
int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) { int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) {
SSyncLogMeta syncMeta = { SSyncLogMeta syncMeta = {
.isWeek = -1, .isWeek = -1,
.seqNum = UINT64_MAX, .seqNum = UINT64_MAX,
...@@ -435,27 +437,3 @@ void walFsync(SWal *pWal, bool forceFsync) { ...@@ -435,27 +437,3 @@ void walFsync(SWal *pWal, bool forceFsync) {
} }
} }
} }
/*static int walValidateOffset(SWal* pWal, int64_t ver) {*/
/*int code = 0;*/
/*SWalHead *pHead = NULL;*/
/*code = (int)walRead(pWal, &pHead, ver);*/
/*if(pHead->head.version != ver) {*/
/*return -1;*/
/*}*/
/*return 0;*/
/*}*/
/*static int64_t walGetOffset(SWal* pWal, int64_t ver) {*/
/*int code = walSeekVer(pWal, ver);*/
/*if(code != 0) {*/
/*return -1;*/
/*}*/
/*code = walValidateOffset(pWal, ver);*/
/*if(code != 0) {*/
/*return -1;*/
/*}*/
/*return 0;*/
/*}*/
...@@ -148,7 +148,7 @@ TEST_F(WalCleanEnv, createNew) { ...@@ -148,7 +148,7 @@ TEST_F(WalCleanEnv, createNew) {
walRollFileInfo(pWal); walRollFileInfo(pWal);
ASSERT(pWal->fileInfoSet != NULL); ASSERT(pWal->fileInfoSet != NULL);
ASSERT_EQ(pWal->fileInfoSet->size, 1); ASSERT_EQ(pWal->fileInfoSet->size, 1);
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
ASSERT_EQ(pInfo->firstVer, 0); ASSERT_EQ(pInfo->firstVer, 0);
ASSERT_EQ(pInfo->lastVer, -1); ASSERT_EQ(pInfo->lastVer, -1);
ASSERT_EQ(pInfo->closeTs, -1); ASSERT_EQ(pInfo->closeTs, -1);
......
...@@ -300,16 +300,14 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { ...@@ -300,16 +300,14 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
return pFile; return pFile;
} }
int64_t taosCloseFile(TdFilePtr *ppFile) { int32_t taosCloseFile(TdFilePtr *ppFile) {
int32_t code = 0;
if (ppFile == NULL || *ppFile == NULL) { if (ppFile == NULL || *ppFile == NULL) {
return 0; return 0;
} }
#if FILE_WITH_LOCK #if FILE_WITH_LOCK
taosThreadRwlockWrlock(&((*ppFile)->rwlock)); taosThreadRwlockWrlock(&((*ppFile)->rwlock));
#endif #endif
if (ppFile == NULL || *ppFile == NULL) {
return 0;
}
if ((*ppFile)->fp != NULL) { if ((*ppFile)->fp != NULL) {
fflush((*ppFile)->fp); fflush((*ppFile)->fp);
fclose((*ppFile)->fp); fclose((*ppFile)->fp);
...@@ -320,9 +318,10 @@ int64_t taosCloseFile(TdFilePtr *ppFile) { ...@@ -320,9 +318,10 @@ int64_t taosCloseFile(TdFilePtr *ppFile) {
HANDLE h = (HANDLE)_get_osfhandle((*ppFile)->fd); HANDLE h = (HANDLE)_get_osfhandle((*ppFile)->fd);
!FlushFileBuffers(h); !FlushFileBuffers(h);
#else #else
fsync((*ppFile)->fd); // warning: never fsync silently in base lib
/*fsync((*ppFile)->fd);*/
#endif #endif
close((*ppFile)->fd); code = close((*ppFile)->fd);
(*ppFile)->fd = -1; (*ppFile)->fd = -1;
} }
(*ppFile)->refId = 0; (*ppFile)->refId = 0;
...@@ -332,7 +331,7 @@ int64_t taosCloseFile(TdFilePtr *ppFile) { ...@@ -332,7 +331,7 @@ int64_t taosCloseFile(TdFilePtr *ppFile) {
#endif #endif
taosMemoryFree(*ppFile); taosMemoryFree(*ppFile);
*ppFile = NULL; *ppFile = NULL;
return 0; return code;
} }
int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) { int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) {
...@@ -560,6 +559,8 @@ int32_t taosFsyncFile(TdFilePtr pFile) { ...@@ -560,6 +559,8 @@ int32_t taosFsyncFile(TdFilePtr pFile) {
return 0; return 0;
} }
// this implementation is WRONG
// fflush is not a replacement of fsync
if (pFile->fp != NULL) return fflush(pFile->fp); if (pFile->fp != NULL) return fflush(pFile->fp);
if (pFile->fd >= 0) { if (pFile->fd >= 0) {
#ifdef WINDOWS #ifdef WINDOWS
......
...@@ -206,11 +206,11 @@ void* taosArrayGetP(const SArray* pArray, size_t index) { ...@@ -206,11 +206,11 @@ void* taosArrayGetP(const SArray* pArray, size_t index) {
void* taosArrayGetLast(const SArray* pArray) { return TARRAY_GET_ELEM(pArray, pArray->size - 1); } void* taosArrayGetLast(const SArray* pArray) { return TARRAY_GET_ELEM(pArray, pArray->size - 1); }
size_t taosArrayGetSize(const SArray* pArray) { int32_t taosArrayGetSize(const SArray* pArray) {
if (pArray == NULL) { if (pArray == NULL) {
return 0; return 0;
} }
return pArray->size; return (int32_t)pArray->size;
} }
void taosArraySetSize(SArray* pArray, size_t size) { void taosArraySetSize(SArray* pArray, size_t size) {
......
...@@ -16,6 +16,23 @@ ...@@ -16,6 +16,23 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tutil.h" #include "tutil.h"
void *tmemmem(const char *haystack, int32_t hlen, const char *needle, int32_t nlen) {
const char *limit;
if (nlen == 0 || hlen < nlen) {
return NULL;
}
limit = haystack + hlen - nlen + 1;
while ((haystack = (char *)memchr(haystack, needle[0], limit - haystack)) != NULL) {
if (memcmp(haystack, needle, nlen) == 0) {
return (void *)haystack;
}
haystack++;
}
return NULL;
}
int32_t strdequote(char *z) { int32_t strdequote(char *z) {
if (z == NULL) { if (z == NULL) {
return 0; return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册