提交 2b9dc8c7 编写于 作者: wafwerar's avatar wafwerar

[TD-13062]<fix>: file system add stream.

上级 fb6fa6ad
......@@ -16,12 +16,12 @@
#ifndef _TD_OS_FILE_H_
#define _TD_OS_FILE_H_
#include "osSocket.h"
#ifdef __cplusplus
extern "C" {
#endif
#include "osSocket.h"
#ifndef ALLOW_FORBID_FUNC
#define open OPEN_FUNC_TAOS_FORBID
#define fopen FOPEN_FUNC_TAOS_FORBID
......@@ -43,6 +43,7 @@ typedef struct TdFile *TdFilePtr;
#define TD_FILE_TEXT 0x0020
#define TD_FILE_AUTO_DEL 0x0040
#define TD_FILE_EXCL 0x0080
#define TD_FILE_STREAM 0x0100 // Only support taosFprintfFile, taosGetLineFile, taosGetLineFile, taosEOFFile
int32_t taosLockFile(TdFilePtr pFile);
int32_t taosUnLockFile(TdFilePtr pFile);
......
......@@ -1090,14 +1090,14 @@ int32_t dumpFileBlockByGroupId(STSBuf* pTSBuf, int32_t groupIndex, void* buf, in
*numOfBlocks = 0;
if (taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET) != 0) {
int32_t code = TAOS_SYSTEM_ERROR(taosEOFFile(pTSBuf->pFile));
int32_t code = TAOS_SYSTEM_ERROR(taosGetErrorFile(pTSBuf->pFile));
// qError("%p: fseek failed: %s", pSql, tstrerror(code));
return code;
}
size_t s = taosReadFile(pTSBuf->pFile, buf, pBlockInfo->compLen);
if (s != pBlockInfo->compLen) {
int32_t code = TAOS_SYSTEM_ERROR(taosEOFFile(pTSBuf->pFile));
int32_t code = TAOS_SYSTEM_ERROR(taosGetErrorFile(pTSBuf->pFile));
// tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
return code;
}
......
......@@ -63,7 +63,7 @@ static void mndAddCpuInfo(SMnode* pMnode, SBufferWriter* bw) {
int32_t done = 0;
// FILE* fp = fopen("/proc/cpuinfo", "r");
TdFilePtr pFile = taosOpenFile("/proc/cpuinfo", TD_FILE_READ);
TdFilePtr pFile = taosOpenFile("/proc/cpuinfo", TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
return;
}
......@@ -93,7 +93,7 @@ static void mndAddOsInfo(SMnode* pMnode, SBufferWriter* bw) {
size_t size = 0;
// FILE* fp = fopen("/etc/os-release", "r");
TdFilePtr pFile = taosOpenFile("/etc/os-release", TD_FILE_READ);
TdFilePtr pFile = taosOpenFile("/etc/os-release", TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
return;
}
......@@ -120,7 +120,7 @@ static void mndAddMemoryInfo(SMnode* pMnode, SBufferWriter* bw) {
size_t size = 0;
// FILE* fp = fopen("/proc/meminfo", "r");
TdFilePtr pFile = taosOpenFile("/proc/meminfo", TD_FILE_READ);
TdFilePtr pFile = taosOpenFile("/proc/meminfo", TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
return;
}
......
......@@ -187,47 +187,63 @@ int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime) {
void autoDelFileListAdd(const char *path) { return; }
TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
printf("%s(%d) %s path=%s tdFileOptions=%d\n", __FILE__, __LINE__,__func__,path,tdFileOptions);
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return NULL;
#else
int access = O_BINARY;
char *mode = NULL;
access |= (tdFileOptions & TD_FILE_CTEATE) ? O_CREAT : 0;
if ((tdFileOptions & TD_FILE_WRITE) && (tdFileOptions & TD_FILE_READ)) {
access |= O_RDWR;
mode = (tdFileOptions & TD_FILE_TEXT) ? "rt+" : "rb+";
} else if (tdFileOptions & TD_FILE_WRITE) {
access |= O_WRONLY;
mode = (tdFileOptions & TD_FILE_TEXT) ? "wt" : "wb";
} else if (tdFileOptions & TD_FILE_READ) {
access |= O_RDONLY;
mode = (tdFileOptions & TD_FILE_TEXT) ? "rt" : "rb";
}
access |= (tdFileOptions & TD_FILE_TRUNC) ? O_TRUNC : 0;
access |= (tdFileOptions & TD_FILE_APPEND) ? O_APPEND : 0;
access |= (tdFileOptions & TD_FILE_TEXT) ? O_TEXT : 0;
access |= (tdFileOptions & TD_FILE_EXCL) ? O_EXCL : 0;
int fd = -1;
FILE *fp = NULL;
if (tdFileOptions & TD_FILE_STREAM) {
char *mode = NULL;
if (tdFileOptions & TD_FILE_APPEND) {
mode = (tdFileOptions & TD_FILE_TEXT) ? "at+" : "ab+";
}else if (tdFileOptions & TD_FILE_TRUNC) {
mode = (tdFileOptions & TD_FILE_TEXT) ? "wt+" : "wb+";
}else {
mode = (tdFileOptions & TD_FILE_TEXT) ? "rt+" : "rb+";
}
assert(!(tdFileOptions & TD_FILE_EXCL));
fp = fopen(path, mode);
if (fp == NULL) {
printf("%s(%d) %s\n", __FILE__, __LINE__,__func__);
return NULL;
}
} else {
int access = O_BINARY;
access |= (tdFileOptions & TD_FILE_CTEATE) ? O_CREAT : 0;
if ((tdFileOptions & TD_FILE_WRITE) && (tdFileOptions & TD_FILE_READ)) {
access |= O_RDWR;
} else if (tdFileOptions & TD_FILE_WRITE) {
access |= O_WRONLY;
} else if (tdFileOptions & TD_FILE_READ) {
access |= O_RDONLY;
}
access |= (tdFileOptions & TD_FILE_TRUNC) ? O_TRUNC : 0;
access |= (tdFileOptions & TD_FILE_APPEND) ? O_APPEND : 0;
access |= (tdFileOptions & TD_FILE_TEXT) ? O_TEXT : 0;
access |= (tdFileOptions & TD_FILE_EXCL) ? O_EXCL : 0;
fd = open(path, access, S_IRWXU | S_IRWXG | S_IRWXO);
if (fd == -1) {
printf("%s(%d) %s access=%d\n", __FILE__, __LINE__,__func__,access);
return NULL;
}
}
if (tdFileOptions & TD_FILE_AUTO_DEL) {
autoDelFileListAdd(path);
}
int fd = open(path, access, S_IRWXU | S_IRWXG | S_IRWXO);
if (fd == -1) {
return NULL;
}
FILE *fp = fdopen(fd, mode);
if (fp == NULL) {
close(fd);
return NULL;
}
TdFilePtr pFile = (TdFilePtr)malloc(sizeof(TdFile));
if (pFile == NULL) {
close(fd);
fclose(fp);
if (fd >= 0) close(fd);
if (fp != NULL) fclose(fp);
printf("%s(%d) %s\n", __FILE__, __LINE__,__func__);
return NULL;
}
pFile->fd = fd;
pFile->fp = fp;
pFile->refId = 0;
printf("%s(%d) %s\n", __FILE__, __LINE__,__func__);
return pFile;
#endif
}
......@@ -239,11 +255,16 @@ int64_t taosCloseFile(TdFilePtr *ppFile) {
if (ppFile == NULL || *ppFile == NULL || (*ppFile)->fd == -1) {
return 0;
}
fflush((*ppFile)->fp);
fsync((*ppFile)->fd);
close((*ppFile)->fd);
(*ppFile)->fd = -1;
(*ppFile)->fp = NULL;
if ((*ppFile)->fp != NULL) {
fflush((*ppFile)->fp);
fclose((*ppFile)->fp);
(*ppFile)->fp = NULL;
}
if ((*ppFile)->fd >= 0) {
fsync((*ppFile)->fd);
close((*ppFile)->fd);
(*ppFile)->fd = -1;
}
(*ppFile)->refId = 0;
free(*ppFile);
*ppFile = NULL;
......@@ -255,6 +276,7 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) {
if (pFile == NULL) {
return 0;
}
assert(pFile->fd >= 0);
int64_t leftbytes = count;
int64_t readbytes;
char *tbuf = (char *)buf;
......@@ -282,10 +304,16 @@ int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset)
if (pFile == NULL) {
return 0;
}
assert(pFile->fd >= 0);
return pread(pFile->fd, buf, count, offset);
}
int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
if (pFile == NULL) {
return 0;
}
assert(pFile->fd >= 0);
int64_t nleft = count;
int64_t nwritten = 0;
char *tbuf = (char *)buf;
......@@ -296,21 +324,20 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
if (errno == EINTR) {
continue;
}
fflush(pFile->fp);
fsync(pFile->fd);
return -1;
}
nleft -= nwritten;
tbuf += nwritten;
}
fflush(pFile->fp);
fsync(pFile->fd);
return count;
}
int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) {
if (pFile == NULL) return -1;
if (pFile == NULL) {
return 0;
}
assert(pFile->fd >= 0);
return (int64_t)lseek(pFile->fd, (long)offset, whence);
}
......@@ -318,6 +345,11 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
if (pFile == NULL) {
return 0;
}
assert(pFile->fd >= 0);
struct stat fileStat;
int32_t code = fstat(pFile->fd, &fileStat);
if (code < 0) {
......@@ -340,6 +372,11 @@ int32_t taosLockFile(TdFilePtr pFile) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
if (pFile == NULL) {
return 0;
}
assert(pFile->fd >= 0);
return (int32_t)flock(pFile->fd, LOCK_EX | LOCK_NB);
#endif
}
......@@ -348,6 +385,11 @@ int32_t taosUnLockFile(TdFilePtr pFile) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
if (pFile == NULL) {
return 0;
}
assert(pFile->fd >= 0);
return (int32_t)flock(pFile->fd, LOCK_UN | LOCK_NB);
#endif
}
......@@ -403,6 +445,11 @@ int32_t taosFtruncateFile(TdFilePtr pFile, int64_t l_size) {
return 0;
#else
if (pFile == NULL) {
return 0;
}
assert(pFile->fd >= 0);
return ftruncate(pFile->fd, l_size);
#endif
}
......@@ -419,7 +466,14 @@ int32_t taosFsyncFile(TdFilePtr pFile) {
return FlushFileBuffers(h);
#else
return fflush(pFile->fp);
if (pFile == NULL) {
return 0;
}
if (pFile->fp != NULL) return fflush(pFile->fp);
if (pFile->fp >= 0) return fsync(pFile->fd);
return 0;
#endif
}
......@@ -543,6 +597,11 @@ int64_t taosSendFile(SocketFd dfd, FileFd sfd, int64_t *offset, int64_t count) {
#else
int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size) {
if (pFileSrc == NULL) {
return 0;
}
assert(pFileSrc->fd >= 0);
int64_t leftbytes = size;
int64_t sentbytes;
......@@ -565,12 +624,22 @@ int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_
}
int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size) {
if (pFileOut == NULL || pFileIn == NULL) {
return 0;
}
assert(pFileOut->fd >= 0);
return taosSendFile(pFileOut->fd, pFileIn, offset, size);
}
#endif
void taosFprintfFile(TdFilePtr pFile, const char *format, ...) {
if (pFile == NULL) {
return;
}
assert(pFile->fp != NULL);
char buffer[MAX_FPRINTFLINE_BUFFER_SIZE] = {0};
va_list ap;
va_start(ap, format);
......@@ -580,7 +649,10 @@ void taosFprintfFile(TdFilePtr pFile, const char *format, ...) {
}
void *taosMmapReadOnlyFile(TdFilePtr pFile, int64_t length) {
if (pFile == NULL) return NULL;
if (pFile == NULL) {
return NULL;
}
assert(pFile->fd >= 0);
void *ptr = mmap(NULL, length, PROT_READ, MAP_SHARED, pFile->fd, 0);
return ptr;
......@@ -598,7 +670,19 @@ int32_t taosUmaskFile(int32_t maskVal) {
int32_t taosGetErrorFile(TdFilePtr pFile) { return errno; }
int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict__ ptrBuf) {
if (pFile == NULL) {
return -1;
}
assert(pFile->fp != NULL);
size_t len = 0;
return getline(ptrBuf, &len, pFile->fp);
}
int32_t taosEOFFile(TdFilePtr pFile) { return feof(pFile->fp); }
\ No newline at end of file
int32_t taosEOFFile(TdFilePtr pFile) {
if (pFile == NULL) {
return 0;
}
assert(pFile->fp != NULL);
return feof(pFile->fp);
}
\ No newline at end of file
......@@ -404,7 +404,7 @@ bool taosGetSysMemory(float *memoryUsedMB) {
bool taosGetProcMemory(float *memoryUsedMB) {
// FILE *fp = fopen(tsProcMemFile, "r");
TdFilePtr pFile = taosOpenFile(tsProcMemFile, TD_FILE_READ);
TdFilePtr pFile = taosOpenFile(tsProcMemFile, TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
//printf("open file:%s failed", tsProcMemFile);
return false;
......@@ -440,7 +440,7 @@ bool taosGetProcMemory(float *memoryUsedMB) {
static bool taosGetSysCpuInfo(SysCpuInfo *cpuInfo) {
// FILE *fp = fopen(tsSysCpuFile, "r");
TdFilePtr pFile = taosOpenFile(tsSysCpuFile, TD_FILE_READ);
TdFilePtr pFile = taosOpenFile(tsSysCpuFile, TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
//printf("open file:%s failed", tsSysCpuFile);
return false;
......@@ -465,7 +465,7 @@ static bool taosGetSysCpuInfo(SysCpuInfo *cpuInfo) {
static bool taosGetProcCpuInfo(ProcCpuInfo *cpuInfo) {
// FILE *fp = fopen(tsProcCpuFile, "r");
TdFilePtr pFile = taosOpenFile(tsProcCpuFile, TD_FILE_READ);
TdFilePtr pFile = taosOpenFile(tsProcCpuFile, TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
//printf("open file:%s failed", tsProcCpuFile);
return false;
......@@ -550,7 +550,7 @@ int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) {
bool taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
*bytes = 0;
// FILE *fp = fopen(tsSysNetFile, "r");
TdFilePtr pFile = taosOpenFile(tsSysNetFile, TD_FILE_READ);
TdFilePtr pFile = taosOpenFile(tsSysNetFile, TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
//printf("open file:%s failed", tsSysNetFile);
return false;
......@@ -636,7 +636,7 @@ bool taosGetBandSpeed(float *bandSpeedKb) {
bool taosReadProcIO(int64_t *rchars, int64_t *wchars) {
// FILE *fp = fopen(tsProcIOFile, "r");
TdFilePtr pFile = taosOpenFile(tsProcIOFile, TD_FILE_READ);
TdFilePtr pFile = taosOpenFile(tsProcIOFile, TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
//printf("open file:%s failed", tsProcIOFile);
return false;
......
......@@ -609,7 +609,7 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
ssize_t _bytes = 0;
// FILE *fp = fopen(filepath, "r");
TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_READ);
TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
......
......@@ -171,7 +171,7 @@ static void shellSourceFile(TAOS *con, char *fptr) {
*/
// FILE *f = fopen(fname, "r");
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
fprintf(stderr, "ERROR: failed to open file %s\n", fname);
wordfree(&full_path);
......
......@@ -517,7 +517,7 @@ static int dumpResultToFile(const char *fname, TAOS_RES *tres) {
}
// FILE *fp = fopen(full_path.we_wordv[0], "w");
TdFilePtr pFile = taosOpenFile(full_path.we_wordv[0], TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
TdFilePtr pFile = taosOpenFile(full_path.we_wordv[0], TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
if (pFile == NULL) {
fprintf(stderr, "ERROR: failed to open file: %s\n", full_path.we_wordv[0]);
wordfree(&full_path);
......@@ -904,7 +904,7 @@ void read_history() {
get_history_path(f_history);
// FILE *f = fopen(f_history, "r");
TdFilePtr pFile = taosOpenFile(f_history, TD_FILE_READ);
TdFilePtr pFile = taosOpenFile(f_history, TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
#ifndef WINDOWS
if (errno != ENOENT) {
......@@ -934,7 +934,7 @@ void write_history() {
get_history_path(f_history);
// FILE *f = fopen(f_history, "w");
TdFilePtr pFile = taosOpenFile(f_history, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
TdFilePtr pFile = taosOpenFile(f_history, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
if (pFile == NULL) {
#ifndef WINDOWS
fprintf(stderr, "Failed to open file %s for write, reason:%s\n", f_history, strerror(errno));
......@@ -991,7 +991,7 @@ void source_file(TAOS *con, char *fptr) {
*/
// FILE *f = fopen(fname, "r");
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
fprintf(stderr, "ERROR: failed to open file %s\n", fname);
wordfree(&full_path);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册