提交 b3eb1986 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-11274-3.0

...@@ -49,6 +49,8 @@ void osDefaultInit(); ...@@ -49,6 +49,8 @@ void osDefaultInit();
void osUpdate(); void osUpdate();
void osCleanup(); void osCleanup();
bool osLogSpaceAvailable(); bool osLogSpaceAvailable();
bool osDataSpaceAvailable();
bool osTempSpaceAvailable();
void osSetTimezone(const char *timezone); void osSetTimezone(const char *timezone);
void osSetSystemLocale(const char *inLocale, const char *inCharSet); void osSetSystemLocale(const char *inLocale, const char *inCharSet);
......
...@@ -452,7 +452,7 @@ static void taosSetClientLogCfg(SConfig *pCfg) { ...@@ -452,7 +452,7 @@ static void taosSetClientLogCfg(SConfig *pCfg) {
SConfigItem *pItem = cfgGetItem(pCfg, "logDir"); SConfigItem *pItem = cfgGetItem(pCfg, "logDir");
tstrncpy(tsLogDir, cfgGetItem(pCfg, "logDir")->str, PATH_MAX); tstrncpy(tsLogDir, cfgGetItem(pCfg, "logDir")->str, PATH_MAX);
taosExpandDir(tsLogDir, tsLogDir, PATH_MAX); taosExpandDir(tsLogDir, tsLogDir, PATH_MAX);
tsLogSpace.reserved = cfgGetItem(pCfg, "minimalLogDirGB")->fval; tsLogSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalLogDirGB")->fval) * 1024 * 1024 * 1024);
tsNumOfLogLines = cfgGetItem(pCfg, "numOfLogLines")->i32; tsNumOfLogLines = cfgGetItem(pCfg, "numOfLogLines")->i32;
tsAsyncLog = cfgGetItem(pCfg, "asyncLog")->bval; tsAsyncLog = cfgGetItem(pCfg, "asyncLog")->bval;
tsLogKeepDays = cfgGetItem(pCfg, "logKeepDays")->i32; tsLogKeepDays = cfgGetItem(pCfg, "logKeepDays")->i32;
...@@ -502,7 +502,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { ...@@ -502,7 +502,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX); tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX);
taosExpandDir(tsTempDir, tsTempDir, PATH_MAX); taosExpandDir(tsTempDir, tsTempDir, PATH_MAX);
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTmpDirGB")->fval; tsTempSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalTmpDirGB")->fval) * 1024 * 1024 * 1024);
if (taosMulMkDir(tsTempDir) != 0) { if (taosMulMkDir(tsTempDir) != 0) {
uError("failed to create tempDir:%s since %s", tsTempDir, terrstr()); uError("failed to create tempDir:%s since %s", tsTempDir, terrstr());
return -1; return -1;
...@@ -540,7 +540,7 @@ static void taosSetSystemCfg(SConfig *pCfg) { ...@@ -540,7 +540,7 @@ static void taosSetSystemCfg(SConfig *pCfg) {
} }
static int32_t taosSetServerCfg(SConfig *pCfg) { static int32_t taosSetServerCfg(SConfig *pCfg) {
tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval; tsDataSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalDataDirGB")->fval) * 1024 * 1024 * 1024);
tsNumOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32; tsNumOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32;
tsMaxShellConns = cfgGetItem(pCfg, "maxShellConns")->i32; tsMaxShellConns = cfgGetItem(pCfg, "maxShellConns")->i32;
tsStatusInterval = cfgGetItem(pCfg, "statusInterval")->i32; tsStatusInterval = cfgGetItem(pCfg, "statusInterval")->i32;
...@@ -739,15 +739,15 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { ...@@ -739,15 +739,15 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
} }
case 'i': { case 'i': {
if (strcasecmp("minimalTmpDirGB", name) == 0) { if (strcasecmp("minimalTmpDirGB", name) == 0) {
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTmpDirGB")->fval; tsTempSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalTmpDirGB")->fval) * 1024 * 1024 * 1024);
} else if (strcasecmp("minimalDataDirGB", name) == 0) { } else if (strcasecmp("minimalDataDirGB", name) == 0) {
tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval; tsDataSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalDataDirGB")->fval) * 1024 * 1024 * 1024);
} else if (strcasecmp("minSlidingTime", name) == 0) { } else if (strcasecmp("minSlidingTime", name) == 0) {
tsMinSlidingTime = cfgGetItem(pCfg, "minSlidingTime")->i32; tsMinSlidingTime = cfgGetItem(pCfg, "minSlidingTime")->i32;
} else if (strcasecmp("minIntervalTime", name) == 0) { } else if (strcasecmp("minIntervalTime", name) == 0) {
tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32; tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32;
} else if (strcasecmp("minimalLogDirGB", name) == 0) { } else if (strcasecmp("minimalLogDirGB", name) == 0) {
tsLogSpace.reserved = cfgGetItem(pCfg, "minimalLogDirGB")->fval; tsLogSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalLogDirGB")->fval) * 1024 * 1024 * 1024);
} }
break; break;
} }
......
...@@ -30,6 +30,12 @@ static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader); ...@@ -30,6 +30,12 @@ static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader);
* @return * @return
*/ */
STSBuf* tsBufCreate(bool autoDelete, int32_t order) { STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_TSC_NO_DISKSPACE;
// tscError("tmp file created failed since %s", terrstr());
return NULL;
}
STSBuf* pTSBuf = taosMemoryCalloc(1, sizeof(STSBuf)); STSBuf* pTSBuf = taosMemoryCalloc(1, sizeof(STSBuf));
if (pTSBuf == NULL) { if (pTSBuf == NULL) {
return NULL; return NULL;
......
...@@ -176,7 +176,11 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp ...@@ -176,7 +176,11 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
taosWriteQitem(pVnode->pFetchQ, pMsg); taosWriteQitem(pVnode->pFetchQ, pMsg);
break; break;
case WRITE_QUEUE: case WRITE_QUEUE:
if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) { if (!osDataSpaceAvailable()) {
terrno = TSDB_CODE_VND_NO_DISKSPACE;
code = terrno;
dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr());
} else if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) {
terrno = TSDB_CODE_VND_NO_WRITE_AUTH; terrno = TSDB_CODE_VND_NO_WRITE_AUTH;
code = terrno; code = terrno;
dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr()); dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr());
......
...@@ -49,8 +49,26 @@ static int32_t dmInitMonitor() { ...@@ -49,8 +49,26 @@ static int32_t dmInitMonitor() {
return 0; return 0;
} }
static bool dmCheckDiskSpace() {
osUpdate();
if (!osDataSpaceAvailable()) {
dError("free disk size: %f GB, too little, require %f GB at least at least , quit", (double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0);
return false;
}
if (!osLogSpaceAvailable()) {
dError("free disk size: %f GB, too little, require %f GB at least at least, quit", (double)tsLogSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsLogSpace.reserved / 1024.0 / 1024.0 / 1024.0);
return false;
}
if (!osTempSpaceAvailable()) {
dError("free disk size: %f GB, too little, require %f GB at least at least, quit", (double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0);
return false;
}
return true;
}
int32_t dmInit(int8_t rtype) { int32_t dmInit(int8_t rtype) {
dInfo("start to init dnode env"); dInfo("start to init dnode env");
if (!dmCheckDiskSpace()) return -1;
if (dmCheckRepeatInit(dmInstance()) != 0) return -1; if (dmCheckRepeatInit(dmInstance()) != 0) return -1;
if (dmInitSystem() != 0) return -1; if (dmInitSystem() != 0) return -1;
if (dmInitMonitor() != 0) return -1; if (dmInitMonitor() != 0) return -1;
......
...@@ -265,6 +265,7 @@ static void dmWatchNodes(SDnode *pDnode) { ...@@ -265,6 +265,7 @@ static void dmWatchNodes(SDnode *pDnode) {
} }
int32_t dmRunDnode(SDnode *pDnode) { int32_t dmRunDnode(SDnode *pDnode) {
int count = 0;
if (dmOpenNodes(pDnode) != 0) { if (dmOpenNodes(pDnode) != 0) {
dError("failed to open nodes since %s", terrstr()); dError("failed to open nodes since %s", terrstr());
return -1; return -1;
...@@ -274,7 +275,6 @@ int32_t dmRunDnode(SDnode *pDnode) { ...@@ -274,7 +275,6 @@ int32_t dmRunDnode(SDnode *pDnode) {
dError("failed to start nodes since %s", terrstr()); dError("failed to start nodes since %s", terrstr());
return -1; return -1;
} }
while (1) { while (1) {
if (pDnode->stop) { if (pDnode->stop) {
dInfo("TDengine is about to stop"); dInfo("TDengine is about to stop");
...@@ -285,6 +285,9 @@ int32_t dmRunDnode(SDnode *pDnode) { ...@@ -285,6 +285,9 @@ int32_t dmRunDnode(SDnode *pDnode) {
} }
dmWatchNodes(pDnode); dmWatchNodes(pDnode);
if (count == 0) osUpdate();
count %= 10;
count++;
taosMsleep(100); taosMsleep(100);
} }
} }
...@@ -149,7 +149,7 @@ int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWrit ...@@ -149,7 +149,7 @@ int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWrit
int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply, mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply,
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastApplyIndex); pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
return sdbStopWrite(pMnode->pSdb, pWriter, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, return sdbStopWrite(pMnode->pSdb, pWriter, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm,
pSnapshot->lastConfigIndex); pSnapshot->lastConfigIndex);
......
...@@ -630,7 +630,7 @@ static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool ...@@ -630,7 +630,7 @@ static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool
#ifdef USE_TSDB_SNAPSHOT #ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, pVnode->config.vgId, isApply, vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, pVnode->config.vgId, isApply,
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastApplyIndex); pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot); int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code); vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code);
......
...@@ -3398,7 +3398,12 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n ...@@ -3398,7 +3398,12 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
uint32_t defaultBufsz = 0; uint32_t defaultBufsz = 0;
getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz); getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, TD_TMP_DIR_PATH); if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
qError("Init stream agg supporter failed since %s", terrstr(terrno));
return terrno;
}
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -4662,7 +4667,12 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF ...@@ -4662,7 +4667,12 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
if (bufSize <= pageSize) { if (bufSize <= pageSize) {
bufSize = pageSize * 4; bufSize = pageSize * 4;
} }
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH); if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
qError("Init stream agg supporter failed since %s", terrstr(terrno));
return terrno;
}
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, tsTempDir);
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].pBuf = pSup->pResultBuf; pCtx[i].pBuf = pSup->pResultBuf;
} }
......
...@@ -764,7 +764,13 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition ...@@ -764,7 +764,13 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
uint32_t defaultBufsz = 0; uint32_t defaultBufsz = 0;
getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz); getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz);
int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, TD_TMP_DIR_PATH); if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
pTaskInfo->code = terrno;
qError("Create partition operator info failed since %s", terrstr(terrno));
goto _error;
}
int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, tsTempDir);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -247,7 +247,12 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_ ...@@ -247,7 +247,12 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_
return NULL; return NULL;
} }
int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, 0, TD_TMP_DIR_PATH); if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
printf("tHash Init failed since %s", terrstr(terrno));
return NULL;
}
int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, 0, tsTempDir);
if (code != 0) { if (code != 0) {
terrno = code; terrno = code;
return NULL; return NULL;
......
...@@ -159,7 +159,12 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { ...@@ -159,7 +159,12 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
int32_t start = 0; int32_t start = 0;
if (pHandle->pBuf == NULL) { if (pHandle->pBuf == NULL) {
int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "doAddToBuf", TD_TMP_DIR_PATH); if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
qError("Add to buf failed since %s", terrstr(terrno));
return terrno;
}
int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "doAddToBuf", tsTempDir);
dBufSetPrintInfo(pHandle->pBuf); dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -233,7 +238,13 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int ...@@ -233,7 +238,13 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
} else { } else {
// multi-pass internal merge sort is required // multi-pass internal merge sort is required
if (pHandle->pBuf == NULL) { if (pHandle->pBuf == NULL) {
code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "sortComparInit", TD_TMP_DIR_PATH); if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
code = terrno;
qError("Sort compare init failed since %s", terrstr(terrno));
return code;
}
code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "sortComparInit", tsTempDir);
dBufSetPrintInfo(pHandle->pBuf); dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "taoserror.h"
#include "tglobal.h" #include "tglobal.h"
#include "tcompare.h" #include "tcompare.h"
...@@ -257,7 +258,14 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, ...@@ -257,7 +258,14 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
resetSlotInfo(pBucket); resetSlotInfo(pBucket);
int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", TD_TMP_DIR_PATH); if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
// qError("MemBucket create disk based Buf failed since %s", terrstr(terrno));
tMemBucketDestroy(pBucket);
return NULL;
}
int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", tsTempDir);
if (ret != 0) { if (ret != 0) {
tMemBucketDestroy(pBucket); tMemBucketDestroy(pBucket);
return NULL; return NULL;
......
...@@ -412,22 +412,31 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -412,22 +412,31 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
udf->outputLen = pFuncInfo->outputLen; udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize; udf->bufSize = pFuncInfo->bufSize;
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
msgInfo->code = terrno;
fnError("udfd create shared library failed since %s", terrstr(terrno));
goto _return;
}
char path[PATH_MAX] = {0}; char path[PATH_MAX] = {0};
#ifdef WINDOWS #ifdef WINDOWS
snprintf(path, sizeof(path), "%s%s.dll", TD_TMP_DIR_PATH, pFuncInfo->name); snprintf(path, sizeof(path), "%s%s.dll", tsTempDir, pFuncInfo->name);
#else #else
snprintf(path, sizeof(path), "%s/lib%s.so", TD_TMP_DIR_PATH, pFuncInfo->name); snprintf(path, sizeof(path), "%s/lib%s.so", tsTempDir, pFuncInfo->name);
#endif #endif
TdFilePtr file = TdFilePtr file =
taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
if (file == NULL) { if (file == NULL) {
fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno)); fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno));
msgInfo->code = TSDB_CODE_FILE_CORRUPTED; msgInfo->code = TSDB_CODE_FILE_CORRUPTED;
goto _return;
} }
int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
if (count != pFuncInfo->codeSize) { if (count != pFuncInfo->codeSize) {
fnError("udfd write udf shared library failed"); fnError("udfd write udf shared library failed");
msgInfo->code = TSDB_CODE_FILE_CORRUPTED; msgInfo->code = TSDB_CODE_FILE_CORRUPTED;
goto _return;
} }
taosCloseFile(&file); taosCloseFile(&file);
strncpy(udf->path, path, strlen(path)); strncpy(udf->path, path, strlen(path));
......
...@@ -5845,11 +5845,16 @@ static int32_t createTagValFromExpr(STranslateContext* pCxt, SDataType targetDt, ...@@ -5845,11 +5845,16 @@ static int32_t createTagValFromExpr(STranslateContext* pCxt, SDataType targetDt,
} }
static int32_t createTagValFromVal(STranslateContext* pCxt, SDataType targetDt, SNode* pNode, SValueNode** pVal) { static int32_t createTagValFromVal(STranslateContext* pCxt, SDataType targetDt, SNode* pNode, SValueNode** pVal) {
*pVal = (SValueNode*)nodesCloneNode(pNode); SValueNode* pTempVal = (SValueNode*)nodesCloneNode(pNode);
if (NULL == *pVal) { if (NULL == pTempVal) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
return DEAL_RES_ERROR == translateValueImpl(pCxt, *pVal, targetDt, true) ? pCxt->errCode : TSDB_CODE_SUCCESS; if (DEAL_RES_ERROR == translateValueImpl(pCxt, pTempVal, targetDt, true)) {
nodesDestroyNode((SNode*)pTempVal);
return pCxt->errCode;
}
*pVal = pTempVal;
return TSDB_CODE_SUCCESS;
} }
static int32_t createTagVal(STranslateContext* pCxt, uint8_t precision, SSchema* pSchema, SNode* pNode, static int32_t createTagVal(STranslateContext* pCxt, uint8_t precision, SSchema* pSchema, SNode* pNode,
......
...@@ -573,7 +573,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -573,7 +573,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// fsync once // fsync once
SSyncLogStoreData* pData = ths->pLogStore->data; SSyncLogStoreData* pData = ths->pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
walFsync(pWal, true); walFsync(pWal, false);
// update match index // update match index
matchIndex = pMsg->prevLogIndex + pMsg->dataCount; matchIndex = pMsg->prevLogIndex + pMsg->dataCount;
...@@ -694,7 +694,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -694,7 +694,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// fsync once // fsync once
SSyncLogStoreData* pData = ths->pLogStore->data; SSyncLogStoreData* pData = ths->pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
walFsync(pWal, true); walFsync(pWal, false);
} }
// prepare response msg // prepare response msg
......
...@@ -206,7 +206,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -206,7 +206,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
SyncIndex index = 0; SyncIndex index = 0;
SWalSyncInfo syncMeta; SWalSyncInfo syncMeta = {0};
syncMeta.isWeek = pEntry->isWeak; syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum; syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term; syncMeta.term = pEntry->term;
...@@ -444,7 +444,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { ...@@ -444,7 +444,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
SyncIndex index = 0; SyncIndex index = 0;
SWalSyncInfo syncMeta; SWalSyncInfo syncMeta = {0};
syncMeta.isWeek = pEntry->isWeak; syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum; syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term; syncMeta.term = pEntry->term;
......
...@@ -132,8 +132,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { ...@@ -132,8 +132,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
if (preLogTerm == SYNC_TERM_INVALID) { if (preLogTerm == SYNC_TERM_INVALID) {
// SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
SyncIndex newNextIndex = nextIndex + 1; // SyncIndex newNextIndex = nextIndex + 1;
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex); syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID); syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64
...@@ -224,8 +225,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { ...@@ -224,8 +225,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
if (preLogTerm == SYNC_TERM_INVALID) { if (preLogTerm == SYNC_TERM_INVALID) {
// SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
SyncIndex newNextIndex = nextIndex + 1; // SyncIndex newNextIndex = nextIndex + 1;
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex); syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID); syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64
......
...@@ -105,6 +105,10 @@ void osCleanup() {} ...@@ -105,6 +105,10 @@ void osCleanup() {}
bool osLogSpaceAvailable() { return tsLogSpace.reserved <= tsLogSpace.size.avail; } bool osLogSpaceAvailable() { return tsLogSpace.reserved <= tsLogSpace.size.avail; }
bool osDataSpaceAvailable() { return tsDataSpace.reserved <= tsDataSpace.size.avail; }
bool osTempSpaceAvailable() { return tsTempSpace.reserved <= tsTempSpace.size.avail; }
void osSetTimezone(const char *timezone) { taosSetSystemTimezone(timezone, tsTimezoneStr, &tsDaylight, &tsTimezone); } void osSetTimezone(const char *timezone) { taosSetSystemTimezone(timezone, tsTimezoneStr, &tsDaylight, &tsTimezone); }
void osSetSystemLocale(const char *inLocale, const char *inCharSet) { void osSetSystemLocale(const char *inLocale, const char *inCharSet) {
......
...@@ -691,9 +691,16 @@ static void taosWriteLog(SLogBuff *pLogBuf) { ...@@ -691,9 +691,16 @@ static void taosWriteLog(SLogBuff *pLogBuf) {
static void *taosAsyncOutputLog(void *param) { static void *taosAsyncOutputLog(void *param) {
SLogBuff *pLogBuf = (SLogBuff *)param; SLogBuff *pLogBuf = (SLogBuff *)param;
setThreadName("log"); setThreadName("log");
int32_t count = 0;
while (1) { while (1) {
count += tsWriteInterval;
taosMsleep(tsWriteInterval); taosMsleep(tsWriteInterval);
if (count > 1000) {
osUpdate();
count = 0;
uError("Write log file failed, since log disk spase is not enough.\n %f GB, too little, require %f GB at least at least.", (double)tsLogSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsLogSpace.reserved / 1024.0 / 1024.0 / 1024.0);
if (!osLogSpaceAvailable()) pLogBuf->stop = 1;
}
// Polling the buffer // Polling the buffer
taosWriteLog(pLogBuf); taosWriteLog(pLogBuf);
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sql connect
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册