提交 52d09b41 编写于 作者: L Liu Jicong

refactor(wal)

上级 7a655895
......@@ -61,45 +61,23 @@ extern "C" {
} \
}
#define WAL_HEAD_VER 0
#define WAL_PROTO_VER 0
#define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead))
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalCkHead))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDULL
#pragma pack(push, 1)
typedef enum {
TAOS_WAL_NOLOG = 0,
TAOS_WAL_WRITE = 1,
TAOS_WAL_FSYNC = 2,
} EWalType;
// used by sync module
typedef struct {
int8_t isWeek;
uint64_t seqNum;
uint64_t term;
} SSyncLogMeta;
typedef struct SWalReadHead {
int8_t headVer;
int8_t reserved;
int16_t msgType;
int32_t bodyLen;
int64_t ingestTs; // not implemented
int64_t version;
// sync meta
SSyncLogMeta syncMeta;
char body[];
} SWalReadHead;
typedef struct {
int32_t vgId;
int32_t fsyncPeriod; // millisecond
......@@ -110,13 +88,6 @@ typedef struct {
EWalType level; // wal level
} SWalCfg;
typedef struct {
uint64_t magic;
uint32_t cksumHead;
uint32_t cksumBody;
SWalReadHead head;
} SWalHead;
typedef struct SWalVer {
int64_t firstVer;
int64_t verInSnapshotting;
......@@ -125,6 +96,35 @@ typedef struct SWalVer {
int64_t lastVer;
} SWalVer;
#pragma pack(push, 1)
// used by sync module
typedef struct {
int8_t isWeek;
uint64_t seqNum;
uint64_t term;
} SSyncLogMeta;
typedef struct {
int8_t protoVer;
int64_t version;
int16_t msgType;
int32_t bodyLen;
int64_t ingestTs; // not implemented
// sync meta
SSyncLogMeta syncMeta;
char body[];
} SWalCont;
typedef struct {
uint64_t magic;
uint32_t cksumHead;
uint32_t cksumBody;
SWalCont head;
} SWalCkHead;
#pragma pack(pop)
typedef struct SWal {
// cfg
SWalCfg cfg;
......@@ -134,7 +134,7 @@ typedef struct SWal {
TdFilePtr pWriteLogTFile;
TdFilePtr pWriteIdxTFile;
int32_t writeCur;
SArray *fileInfoSet;
SArray *fileInfoSet; // SArray<SWalFileInfo>
// status
int64_t totSize;
int64_t lastRollSeq;
......@@ -146,7 +146,7 @@ typedef struct SWal {
// path
char path[WAL_PATH_LEN];
// reusable write head
SWalHead writeHead;
SWalCkHead writeHead;
} SWal; // WAL HANDLE
typedef struct SWalReadHandle {
......@@ -158,11 +158,8 @@ typedef struct SWalReadHandle {
int64_t capacity;
int64_t status; // if cursor valid
TdThreadMutex mutex;
SWalHead *pHead;
SWalCkHead *pHead;
} SWalReadHandle;
#pragma pack(pop)
// typedef int32_t (*FWalWrite)(void *ahandle, void *pHead);
// module initialization
int32_t walInit();
......@@ -174,9 +171,9 @@ int32_t walAlter(SWal *, SWalCfg *pCfg);
void walClose(SWal *);
// write
int64_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body,
int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body,
int32_t bodyLen);
int64_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen);
int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen);
void walFsync(SWal *, bool force);
// apis for lifecycle management
......@@ -196,9 +193,9 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
// only for tq usage
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity);
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead);
int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead);
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead);
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead);
typedef struct {
int64_t refId;
......
......@@ -25,17 +25,17 @@ extern "C" {
// If the error is in a third-party library, place this header file under the third-party library header file.
// When you want to use this feature, you should find or add the same function in the following sectio
#ifndef ALLOW_FORBID_FUNC
#define open OPEN_FUNC_TAOS_FORBID
#define fopen FOPEN_FUNC_TAOS_FORBID
#define access ACCESS_FUNC_TAOS_FORBID
#define stat STAT_FUNC_TAOS_FORBID
#define lstat LSTAT_FUNC_TAOS_FORBID
#define fstat FSTAT_FUNC_TAOS_FORBID
#define close CLOSE_FUNC_TAOS_FORBID
#define fclose FCLOSE_FUNC_TAOS_FORBID
#define fsync FSYNC_FUNC_TAOS_FORBID
#define getline GETLINE_FUNC_TAOS_FORBID
// #define fflush FFLUSH_FUNC_TAOS_FORBID
#define open OPEN_FUNC_TAOS_FORBID
#define fopen FOPEN_FUNC_TAOS_FORBID
#define access ACCESS_FUNC_TAOS_FORBID
#define stat STAT_FUNC_TAOS_FORBID
#define lstat LSTAT_FUNC_TAOS_FORBID
#define fstat FSTAT_FUNC_TAOS_FORBID
#define close CLOSE_FUNC_TAOS_FORBID
#define fclose FCLOSE_FUNC_TAOS_FORBID
#define fsync FSYNC_FUNC_TAOS_FORBID
#define getline GETLINE_FUNC_TAOS_FORBID
// #define fflush FFLUSH_FUNC_TAOS_FORBID
#endif
#ifndef PATH_MAX
......@@ -43,54 +43,54 @@ extern "C" {
#endif
typedef struct TdFile *TdFilePtr;
#define TD_FILE_CREATE 0x0001
#define TD_FILE_WRITE 0x0002
#define TD_FILE_READ 0x0004
#define TD_FILE_TRUNC 0x0008
#define TD_FILE_APPEND 0x0010
#define TD_FILE_TEXT 0x0020
#define TD_FILE_AUTO_DEL 0x0040
#define TD_FILE_EXCL 0x0080
#define TD_FILE_STREAM 0x0100 // Only support taosFprintfFile, taosGetLineFile, taosEOFFile
TdFilePtr taosOpenFile(const char *path,int32_t tdFileOptions);
#define TD_FILE_CREATE 0x0001
#define TD_FILE_WRITE 0x0002
#define TD_FILE_READ 0x0004
#define TD_FILE_TRUNC 0x0008
#define TD_FILE_APPEND 0x0010
#define TD_FILE_TEXT 0x0020
#define TD_FILE_AUTO_DEL 0x0040
#define TD_FILE_EXCL 0x0080
#define TD_FILE_STREAM 0x0100 // Only support taosFprintfFile, taosGetLineFile, taosEOFFile
TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions);
#define TD_FILE_ACCESS_EXIST_OK 0x1
#define TD_FILE_ACCESS_READ_OK 0x2
#define TD_FILE_ACCESS_WRITE_OK 0x4
bool taosCheckAccessFile(const char *pathname, int mode);
bool taosCheckAccessFile(const char *pathname, int mode);
int32_t taosLockFile(TdFilePtr pFile);
int32_t taosUnLockFile(TdFilePtr pFile);
int32_t taosUmaskFile(int32_t maskVal);
int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime);
int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno);
int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime);
bool taosCheckExistFile(const char *pathname);
int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence);
int32_t taosFtruncateFile(TdFilePtr pFile, int64_t length);
int32_t taosFsyncFile(TdFilePtr pFile);
int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count);
int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset);
int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count);
void taosFprintfFile(TdFilePtr pFile, const char *format, ...);
int64_t taosGetLineFile(TdFilePtr pFile, char ** __restrict ptrBuf);
int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf);
int64_t taosGetsFile(TdFilePtr pFile, int32_t maxSize, char *__restrict buf);
int32_t taosEOFFile(TdFilePtr pFile);
int64_t taosCloseFile(TdFilePtr *ppFile);
int32_t taosCloseFile(TdFilePtr *ppFile);
int32_t taosRenameFile(const char *oldName, const char *newName);
int64_t taosCopyFile(const char *from, const char *to);
int32_t taosRemoveFile(const char *path);
void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath);
void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath);
int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size);
......
......@@ -139,7 +139,7 @@ void* taosArrayGetLast(const SArray* pArray);
* @param pArray
* @return
*/
size_t taosArrayGetSize(const SArray* pArray);
int32_t taosArrayGetSize(const SArray* pArray);
/**
* set the size of array
......
......@@ -44,6 +44,8 @@ uint32_t ip2uint(const char *const ip_addr);
void taosIp2String(uint32_t ip, char *str);
void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen);
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
T_MD5_CTX context;
tMD5Init(&context);
......@@ -59,10 +61,10 @@ static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *tar
tMD5Final(&context);
char buf[TSDB_PASSWORD_LEN + 1];
sprintf(buf, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0],
context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5],
context.digest[6], context.digest[7], context.digest[8], context.digest[9], context.digest[10],
context.digest[11], context.digest[12], context.digest[13], context.digest[14], context.digest[15]);
sprintf(buf, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], context.digest[1],
context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6],
context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11],
context.digest[12], context.digest[13], context.digest[14], context.digest[15]);
memcpy(target, buf, TSDB_PASSWORD_LEN);
}
......
......@@ -157,7 +157,7 @@ typedef struct {
static STqMgmt tqMgmt = {0};
// tqRead
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead** pHeadWithCkSum);
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
// tqExec
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId);
......@@ -178,6 +178,7 @@ STqOffsetStore* tqOffsetOpen();
void tqOffsetClose(STqOffsetStore*);
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset);
int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetSnapshot(STqOffsetStore* pStore);
// tqSink
......
......@@ -271,8 +271,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey);
if (pOffset != NULL) {
fetchOffsetNew = pOffset->val;
char formatBuf[50];
tFormatOffset(formatBuf, 50, &fetchOffsetNew);
char formatBuf[80];
tFormatOffset(formatBuf, 80, &fetchOffsetNew);
tqDebug("tmq poll: consumer %ld, offset reset to %s", consumerId, formatBuf);
} else {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
......@@ -302,9 +302,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
int64_t fetchVer = fetchOffsetNew.version + 1;
SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048);
if (pHeadWithCkSum == NULL) {
int64_t fetchVer = fetchOffsetNew.version + 1;
SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
return -1;
}
......@@ -318,7 +318,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
break;
}
if (tqFetchLog(pTq, pHandle, &fetchVer, &pHeadWithCkSum) < 0) {
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
// TODO add push mgr
tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer);
......@@ -329,7 +329,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
goto OVER;
}
SWalReadHead* pHead = &pHeadWithCkSum->head;
SWalCont* pHead = &pCkHead->head;
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchVer, pHead->msgType);
......@@ -373,9 +373,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
}
taosMemoryFree(pHeadWithCkSum);
taosMemoryFree(pCkHead);
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqInfo("retrieve using snapshot req offset: uid %ld ts %ld", dataRsp.reqOffset.uid, dataRsp.reqOffset.ts);
tqInfo("retrieve using snapshot req offset: uid %ld ts %ld, actual offset: uid %ld ts %ld", dataRsp.reqOffset.uid,
dataRsp.reqOffset.ts, fetchOffsetNew.uid, fetchOffsetNew.ts);
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
ASSERT(0);
}
......@@ -522,7 +523,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
break;
}
SWalReadHead* pHead = &pHeadWithCkSum->head;
SWalCont* pHead = &pHeadWithCkSum->head;
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
......@@ -597,6 +598,8 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
int32_t code = taosHashRemove(pTq->handles, pReq->subKey, strlen(pReq->subKey));
ASSERT(code == 0);
tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
ASSERT(0);
}
......
......@@ -97,6 +97,10 @@ int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
}
int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) {
return taosHashRemove(pStore->pHash, subscribeKey, strlen(subscribeKey));
}
int32_t tqOffsetSnapshot(STqOffsetStore* pStore) {
// open file
// TODO file name should be with a version
......
......@@ -15,13 +15,13 @@
#include "tq.h"
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead** ppHeadWithCkSum) {
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
int32_t code = 0;
taosThreadMutexLock(&pHandle->pWalReader->mutex);
int64_t offset = *fetchOffset;
while (1) {
if (walFetchHead(pHandle->pWalReader, offset, *ppHeadWithCkSum) < 0) {
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", pHandle->consumerId,
pHandle->epoch, TD_VID(pTq->pVnode), offset);
*fetchOffset = offset - 1;
......@@ -29,8 +29,8 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead*
goto END;
}
if ((*ppHeadWithCkSum)->head.msgType == TDMT_VND_SUBMIT) {
code = walFetchBody(pHandle->pWalReader, ppHeadWithCkSum);
if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
code = walFetchBody(pHandle->pWalReader, ppCkHead);
if (code < 0) {
ASSERT(0);
......@@ -43,9 +43,9 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead*
goto END;
} else {
if (pHandle->fetchMeta) {
SWalReadHead* pHead = &((*ppHeadWithCkSum)->head);
SWalCont* pHead = &((*ppCkHead)->head);
if (IS_META_MSG(pHead->msgType)) {
code = walFetchBody(pHandle->pWalReader, ppHeadWithCkSum);
code = walFetchBody(pHandle->pWalReader, ppCkHead);
if (code < 0) {
ASSERT(0);
......@@ -58,7 +58,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead*
goto END;
}
}
code = walSkipFetchBody(pHandle->pWalReader, *ppHeadWithCkSum);
code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);
if (code < 0) {
ASSERT(0);
*fetchOffset = offset;
......
......@@ -287,6 +287,7 @@ typedef struct STableScanInfo {
} lastStatus;
int8_t scanMode;
int8_t noTable;
} STableScanInfo;
typedef struct STagScanInfo {
......
......@@ -235,9 +235,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (uid == 0) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
uid = pTableInfo->uid;
ts = INT64_MIN;
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
uid = pTableInfo->uid;
ts = INT64_MIN;
}
}
return doPrepareScan(pTaskInfo->pRoot, uid, ts);
......
......@@ -2828,21 +2828,28 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
pScanInfo->blockType = STREAM_INPUT__DATA_SCAN;
STableScanInfo* pInfo = pScanInfo->pSnapshotReadOp->info;
if (uid == 0) {
pInfo->noTable = 1;
return TSDB_CODE_SUCCESS;
}
/*if (pSnapShotScanInfo->dataReader == NULL) {*/
/*pSnapShotScanInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0);*/
/*pSnapShotScanInfo->scanMode = TABLE_SCAN__TABLE_ORDER;*/
/*}*/
pInfo->noTable = 0;
if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) {
tsdbSetTableId(pInfo->dataReader, uid);
int64_t oldSkey = pInfo->cond.twindows[0].skey;
pInfo->cond.twindows[0].skey = ts;
pInfo->cond.twindows[0].skey = ts + 1;
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
pInfo->cond.twindows[0].skey = oldSkey;
pInfo->scanTimes = 0;
pInfo->curTWinIdx = 0;
}
return TSDB_CODE_SUCCESS;
} else {
if (pOperator->numOfDownstream == 1) {
......@@ -2855,8 +2862,6 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
return TSDB_CODE_QRY_APP_ERROR;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
......
......@@ -518,6 +518,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// if scan table by table
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
if (pInfo->noTable) return NULL;
while (1) {
SSDataBlock* result = doTableScanGroup(pOperator);
if (result) {
......
......@@ -88,3 +88,12 @@ SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit) {
memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit));
return pSubmitClone;
}
void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) {
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
taosMemoryFree(pDataSubmit->data);
taosMemoryFree(pDataSubmit->dataRef);
}
}
......@@ -63,7 +63,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
continue;
}
// TODO: do we need free memory?
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
block.info.childId = pTask->selfChildId;
......
......@@ -41,12 +41,3 @@ void streamQueueClose(SStreamQueue* queue) {
return;
}
}
void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) {
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
taosMemoryFree(pDataSubmit->data);
taosMemoryFree(pDataSubmit->dataRef);
}
}
......@@ -27,7 +27,7 @@ extern "C" {
#endif
// meta section begin
typedef struct WalFileInfo {
typedef struct {
int64_t firstVer;
int64_t lastVer;
int64_t createTs;
......@@ -98,20 +98,20 @@ static inline int walBuildIdxName(SWal* pWal, int64_t fileFirstVer, char* buf) {
return sprintf(buf, "%s/%020" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer);
}
static inline int walValidHeadCksum(SWalHead* pHead) {
return taosCheckChecksum((uint8_t*)&pHead->head, sizeof(SWalReadHead), pHead->cksumHead);
static inline int walValidHeadCksum(SWalCkHead* pHead) {
return taosCheckChecksum((uint8_t*)&pHead->head, sizeof(SWalCont), pHead->cksumHead);
}
static inline int walValidBodyCksum(SWalHead* pHead) {
static inline int walValidBodyCksum(SWalCkHead* pHead) {
return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.bodyLen, pHead->cksumBody);
}
static inline int walValidCksum(SWalHead* pHead, void* body, int64_t bodyLen) {
static inline int walValidCksum(SWalCkHead* pHead, void* body, int64_t bodyLen) {
return walValidHeadCksum(pHead) && walValidBodyCksum(pHead);
}
static inline uint32_t walCalcHeadCksum(SWalHead* pHead) {
return taosCalcChecksum(0, (uint8_t*)&pHead->head, sizeof(SWalReadHead));
static inline uint32_t walCalcHeadCksum(SWalCkHead* pHead) {
return taosCalcChecksum(0, (uint8_t*)&pHead->head, sizeof(SWalCont));
}
static inline uint32_t walCalcBodyCksum(const void* body, uint32_t len) {
......
......@@ -16,7 +16,7 @@
#include "cJSON.h"
#include "os.h"
#include "taoserror.h"
#include "tref.h"
#include "tutil.h"
#include "walInt.h"
bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) {
......@@ -37,26 +37,9 @@ static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
}
void* tmemmem(char* haystack, int hlen, char* needle, int nlen) {
char* limit;
if (nlen == 0 || hlen < nlen) {
return NULL;
}
limit = haystack + hlen - nlen + 1;
while ((haystack = (char*)memchr(haystack, needle[0], limit - haystack)) != NULL) {
if (memcmp(haystack, needle, nlen) == 0) {
return haystack;
}
haystack++;
}
return NULL;
}
static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
ASSERT(pWal->fileInfoSet != NULL);
int sz = taosArrayGetSize(pWal->fileInfoSet);
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
ASSERT(sz > 0);
#if 0
for (int i = 0; i < sz; i++) {
......@@ -101,14 +84,14 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
char* candidate;
while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) {
// read and validate
SWalHead* logContent = (SWalHead*)candidate;
SWalCkHead* logContent = (SWalCkHead*)candidate;
if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) {
found = candidate;
}
haystack = candidate + 1;
}
if (found == buf) {
SWalHead* logContent = (SWalHead*)found;
SWalCkHead* logContent = (SWalCkHead*)found;
if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) {
// file has to be deleted
taosMemoryFree(buf);
......@@ -118,7 +101,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
}
}
taosCloseFile(&pFile);
SWalHead* lastEntry = (SWalHead*)found;
SWalCkHead* lastEntry = (SWalCkHead*)found;
return lastEntry->head.version;
}
......
......@@ -117,8 +117,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->lastRollSeq = -1;
// init write buffer
memset(&pWal->writeHead, 0, sizeof(SWalHead));
pWal->writeHead.head.headVer = WAL_HEAD_VER;
memset(&pWal->writeHead, 0, sizeof(SWalCkHead));
pWal->writeHead.head.protoVer = WAL_PROTO_VER;
pWal->writeHead.magic = WAL_MAGIC;
if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) {
......
......@@ -33,7 +33,7 @@ SWalReadHandle *walOpenReadHandle(SWal *pWal) {
taosThreadMutexInit(&pRead->mutex, NULL);
pRead->pHead = taosMemoryMalloc(sizeof(SWalHead));
pRead->pHead = taosMemoryMalloc(sizeof(SWalCkHead));
if (pRead->pHead == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
taosMemoryFree(pRead);
......@@ -155,7 +155,7 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capacity = capacity; }
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) {
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) {
int64_t code;
// TODO: valid ver
......@@ -170,8 +170,8 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) {
ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalHead));
if (code != sizeof(SWalHead)) {
code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalCkHead));
if (code != sizeof(SWalCkHead)) {
return -1;
}
......@@ -186,7 +186,7 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) {
return 0;
}
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead) {
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead) {
int64_t code;
ASSERT(pRead->curVersion == pHead->head.version);
......@@ -203,12 +203,12 @@ int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead) {
return 0;
}
int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead) {
SWalReadHead *pReadHead = &((*ppHead)->head);
int64_t ver = pReadHead->version;
int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) {
SWalCont *pReadHead = &((*ppHead)->head);
int64_t ver = pReadHead->version;
if (pRead->capacity < pReadHead->bodyLen) {
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead) + pReadHead->bodyLen);
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1;
......@@ -241,18 +241,18 @@ int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead) {
return 0;
}
int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead) {
int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalCont **ppHead) {
taosThreadMutexLock(&pRead->mutex);
if (walReadWithHandle(pRead, ver) < 0) {
taosThreadMutexUnlock(&pRead->mutex);
return -1;
}
*ppHead = taosMemoryMalloc(sizeof(SWalReadHead) + pRead->pHead->head.bodyLen);
*ppHead = taosMemoryMalloc(sizeof(SWalCont) + pRead->pHead->head.bodyLen);
if (*ppHead == NULL) {
taosThreadMutexUnlock(&pRead->mutex);
return -1;
}
memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalReadHead) + pRead->pHead->head.bodyLen);
memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalCont) + pRead->pHead->head.bodyLen);
taosThreadMutexUnlock(&pRead->mutex);
return 0;
}
......@@ -282,8 +282,8 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead));
if (code != sizeof(SWalHead)) {
code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalCkHead));
if (code != sizeof(SWalCkHead)) {
if (code < 0)
terrno = TAOS_SYSTEM_ERROR(errno);
else {
......@@ -301,7 +301,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
}
if (pRead->capacity < pRead->pHead->head.bodyLen) {
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.bodyLen);
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pRead->pHead->head.bodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1;
......
......@@ -142,10 +142,10 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
return -1;
}
// validate offset
SWalHead head;
SWalCkHead head;
ASSERT(taosValidFile(pLogTFile));
int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalHead));
if (size != sizeof(SWalHead)) {
int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead));
if (size != sizeof(SWalCkHead)) {
return -1;
}
code = walValidHeadCksum(&head);
......@@ -261,7 +261,7 @@ int32_t walEndSnapshot(SWal *pWal) {
}
int walRoll(SWal *pWal) {
int code = 0;
int32_t code = 0;
if (pWal->pWriteIdxTFile != NULL) {
code = taosCloseFile(&pWal->pWriteIdxTFile);
if (code != 0) {
......@@ -321,12 +321,13 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
return 0;
}
int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body,
int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body,
int32_t bodyLen) {
int code = 0;
int32_t code = 0;
// no wal
if (pWal->cfg.level == TAOS_WAL_NOLOG) return 0;
if (bodyLen > TSDB_MAX_WAL_SIZE) {
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
return -1;
......@@ -356,6 +357,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
terrno = TSDB_CODE_WAL_INVALID_VER;
return -1;
}
/*if (!tfValid(pWal->pWriteLogTFile)) return -1;*/
ASSERT(pWal->writeCur >= 0);
......@@ -380,7 +382,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
// TODO ftruncate
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
......@@ -405,19 +407,19 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
// set status
if (pWal->vers.firstVer == -1) pWal->vers.firstVer = index;
pWal->vers.lastVer = index;
pWal->totSize += sizeof(SWalHead) + bodyLen;
pWal->totSize += sizeof(SWalCkHead) + bodyLen;
if (walGetCurFileInfo(pWal)->firstVer == -1) {
walGetCurFileInfo(pWal)->firstVer = index;
}
walGetCurFileInfo(pWal)->lastVer = index;
walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen;
walGetCurFileInfo(pWal)->fileSize += sizeof(SWalCkHead) + bodyLen;
taosThreadMutexUnlock(&pWal->mutex);
return 0;
}
int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) {
int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) {
SSyncLogMeta syncMeta = {
.isWeek = -1,
.seqNum = UINT64_MAX,
......@@ -435,27 +437,3 @@ void walFsync(SWal *pWal, bool forceFsync) {
}
}
}
/*static int walValidateOffset(SWal* pWal, int64_t ver) {*/
/*int code = 0;*/
/*SWalHead *pHead = NULL;*/
/*code = (int)walRead(pWal, &pHead, ver);*/
/*if(pHead->head.version != ver) {*/
/*return -1;*/
/*}*/
/*return 0;*/
/*}*/
/*static int64_t walGetOffset(SWal* pWal, int64_t ver) {*/
/*int code = walSeekVer(pWal, ver);*/
/*if(code != 0) {*/
/*return -1;*/
/*}*/
/*code = walValidateOffset(pWal, ver);*/
/*if(code != 0) {*/
/*return -1;*/
/*}*/
/*return 0;*/
/*}*/
......@@ -148,7 +148,7 @@ TEST_F(WalCleanEnv, createNew) {
walRollFileInfo(pWal);
ASSERT(pWal->fileInfoSet != NULL);
ASSERT_EQ(pWal->fileInfoSet->size, 1);
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
ASSERT_EQ(pInfo->firstVer, 0);
ASSERT_EQ(pInfo->lastVer, -1);
ASSERT_EQ(pInfo->closeTs, -1);
......
......@@ -300,16 +300,14 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
return pFile;
}
int64_t taosCloseFile(TdFilePtr *ppFile) {
int32_t taosCloseFile(TdFilePtr *ppFile) {
int32_t code = 0;
if (ppFile == NULL || *ppFile == NULL) {
return 0;
}
#if FILE_WITH_LOCK
taosThreadRwlockWrlock(&((*ppFile)->rwlock));
#endif
if (ppFile == NULL || *ppFile == NULL) {
return 0;
}
if ((*ppFile)->fp != NULL) {
fflush((*ppFile)->fp);
fclose((*ppFile)->fp);
......@@ -320,9 +318,10 @@ int64_t taosCloseFile(TdFilePtr *ppFile) {
HANDLE h = (HANDLE)_get_osfhandle((*ppFile)->fd);
!FlushFileBuffers(h);
#else
fsync((*ppFile)->fd);
// warning: never fsync silently in base lib
/*fsync((*ppFile)->fd);*/
#endif
close((*ppFile)->fd);
code = close((*ppFile)->fd);
(*ppFile)->fd = -1;
}
(*ppFile)->refId = 0;
......@@ -332,7 +331,7 @@ int64_t taosCloseFile(TdFilePtr *ppFile) {
#endif
taosMemoryFree(*ppFile);
*ppFile = NULL;
return 0;
return code;
}
int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) {
......@@ -560,6 +559,8 @@ int32_t taosFsyncFile(TdFilePtr pFile) {
return 0;
}
// this implementation is WRONG
// fflush is not a replacement of fsync
if (pFile->fp != NULL) return fflush(pFile->fp);
if (pFile->fd >= 0) {
#ifdef WINDOWS
......
......@@ -206,11 +206,11 @@ void* taosArrayGetP(const SArray* pArray, size_t index) {
void* taosArrayGetLast(const SArray* pArray) { return TARRAY_GET_ELEM(pArray, pArray->size - 1); }
size_t taosArrayGetSize(const SArray* pArray) {
int32_t taosArrayGetSize(const SArray* pArray) {
if (pArray == NULL) {
return 0;
}
return pArray->size;
return (int32_t)pArray->size;
}
void taosArraySetSize(SArray* pArray, size_t size) {
......
......@@ -16,6 +16,23 @@
#define _DEFAULT_SOURCE
#include "tutil.h"
void *tmemmem(const char *haystack, int32_t hlen, const char *needle, int32_t nlen) {
const char *limit;
if (nlen == 0 || hlen < nlen) {
return NULL;
}
limit = haystack + hlen - nlen + 1;
while ((haystack = (char *)memchr(haystack, needle[0], limit - haystack)) != NULL) {
if (memcmp(haystack, needle, nlen) == 0) {
return (void *)haystack;
}
haystack++;
}
return NULL;
}
int32_t strdequote(char *z) {
if (z == NULL) {
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册