提交 9e9cc345 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/master' into hotfix/tmq

...@@ -6000,6 +6000,12 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -6000,6 +6000,12 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
verbosePrint("%s() LN%d: tid=%d seq=%"PRId64" tableName=%s\n", verbosePrint("%s() LN%d: tid=%d seq=%"PRId64" tableName=%s\n",
__func__, __LINE__, __func__, __LINE__,
pThreadInfo->threadID, tableSeq, tableName); pThreadInfo->threadID, tableSeq, tableName);
if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n",
pThreadInfo->threadID, __func__, __LINE__);
free(pThreadInfo->buffer);
return NULL;
}
int64_t remainderBufLen = maxSqlLen; int64_t remainderBufLen = maxSqlLen;
char *pstr = pThreadInfo->buffer; char *pstr = pThreadInfo->buffer;
......
...@@ -6720,6 +6720,10 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p ...@@ -6720,6 +6720,10 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p
if (pQueryMsg->tsLen > 0) { // open new file to save the result if (pQueryMsg->tsLen > 0) { // open new file to save the result
char *tsBlock = (char *) pQueryMsg + pQueryMsg->tsOffset; char *tsBlock = (char *) pQueryMsg + pQueryMsg->tsOffset;
pTsBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder, vgId); pTsBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder, vgId);
if (pTsBuf == NULL) {
code = TSDB_CODE_QRY_NO_DISKSPACE;
goto _error;
}
tsBufResetPos(pTsBuf); tsBufResetPos(pTsBuf);
bool ret = tsBufNextPos(pTsBuf); bool ret = tsBufNextPos(pTsBuf);
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#include "taoserror.h" #include "taoserror.h"
#include "tscompression.h" #include "tscompression.h"
#include "tutil.h" #include "tutil.h"
#include "queryLog.h"
static int32_t getDataStartOffset(); static int32_t getDataStartOffset();
static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo); static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo);
...@@ -633,10 +634,15 @@ int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) { ...@@ -633,10 +634,15 @@ int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
int32_t r = fseek(pTSBuf->f, 0, SEEK_SET); int32_t r = fseek(pTSBuf->f, 0, SEEK_SET);
if (r != 0) { if (r != 0) {
qError("fseek failed, errno:%d", errno);
return -1; return -1;
} }
fwrite(pHeader, sizeof(STSBufFileHeader), 1, pTSBuf->f); size_t ws = fwrite(pHeader, sizeof(STSBufFileHeader), 1, pTSBuf->f);
if (ws != 1) {
qError("ts update header fwrite failed, size:%d, expected size:%d", (int32_t)ws, (int32_t)sizeof(STSBufFileHeader));
return -1;
}
return 0; return 0;
} }
...@@ -856,9 +862,17 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_ ...@@ -856,9 +862,17 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, pBlockInfo); TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, pBlockInfo);
int32_t ret = fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET); int32_t ret = fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET);
UNUSED(ret); if (ret == -1) {
qError("fseek failed, errno:%d", errno);
tsBufDestroy(pTSBuf);
return NULL;
}
size_t sz = fwrite((void*)pData, 1, len, pTSBuf->f); size_t sz = fwrite((void*)pData, 1, len, pTSBuf->f);
UNUSED(sz); if (sz != len) {
qError("ts data fwrite failed, write size:%d, expected size:%d", (int32_t)sz, len);
tsBufDestroy(pTSBuf);
return NULL;
}
pTSBuf->fileSize += len; pTSBuf->fileSize += len;
pTSBuf->tsOrder = order; pTSBuf->tsOrder = order;
...@@ -866,9 +880,16 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_ ...@@ -866,9 +880,16 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
STSBufFileHeader header = { STSBufFileHeader header = {
.magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder}; .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
STSBufUpdateHeader(pTSBuf, &header); if (STSBufUpdateHeader(pTSBuf, &header) < 0) {
tsBufDestroy(pTSBuf);
return NULL;
}
fsync(fileno(pTSBuf->f)); if (fsync(fileno(pTSBuf->f)) == -1) {
qError("fsync failed, errno:%d", errno);
tsBufDestroy(pTSBuf);
return NULL;
}
return pTSBuf; return pTSBuf;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册