提交 5374d824 编写于 作者: S slguan

fix issue #201 #319

上级 4bcb26a7
......@@ -84,7 +84,7 @@ static bool onlyQueryMetricTags(SSqlCmd* pCmd);
static bool hasUnsupportFunctionsForMetricQuery(SSqlCmd* pCmd);
static bool functionCompatibleCheck(SSqlCmd* pCmd);
static void setColumnOffsetValueInResultset(SSqlCmd* pCmd);
static void setColumnOffsetValueInResultset(SSqlCmd* pCmd);
static int32_t setGroupByClause(SSqlCmd* pCmd, tVariantList* pList);
static int32_t setIntervalClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql);
......@@ -97,7 +97,7 @@ static int32_t setFillPolicy(SSqlCmd* pCmd, SQuerySQL* pQuerySQL);
static int32_t setOrderByClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql, SSchema* pSchema, int32_t numOfCols);
static int32_t tsRewriteFieldNameIfNecessary(SSqlCmd* pCmd);
static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField);
static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField);
static int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo);
static int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd);
static int32_t buildArithmeticExprString(tSQLExpr* pExpr, char** exprString, SColumnIdList* colIdList);
......@@ -106,10 +106,10 @@ static int32_t validateArithmeticSQLExpr(tSQLExpr* pExpr, SSchema* pSchema, int3
static int32_t validateDNodeConfig(tDCLSQL* pOptions);
static int32_t validateColumnName(char* name);
static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo);
static bool hasTimestampForPointInterpQuery(SSqlCmd* pCmd);
static void updateTagColumnIndex(SSqlCmd* pCmd);
static bool hasTimestampForPointInterpQuery(SSqlCmd* pCmd);
static void updateTagColumnIndex(SSqlCmd* pCmd);
static int32_t setLimitOffsetValueInfo(SSqlObj* pSql, SQuerySQL* pQuerySql);
static void addRequiredTagColumn(SSqlCmd* pCmd, int32_t tagColIndex);
static void addRequiredTagColumn(SSqlCmd* pCmd, int32_t tagColIndex);
static int32_t parseCreateDBOptions(SCreateDBInfo* pCreateDbSql, SSqlCmd* pCmd);
static int32_t tscQueryOnlyMetricTags(SSqlCmd* pCmd, bool* queryOnMetricTags) {
......@@ -202,13 +202,13 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
pCmd->command = TSDB_SQL_USE_DB;
SSQLToken* pToken = &pInfo->pDCLInfo->a[0];
if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) {
const char* msg1 = "invalid db name";
if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) {
const char* msg1 = "invalid db name";
setErrMsg(pCmd, msg1);
return TSDB_CODE_INVALID_SQL;
}
if (pToken->n > TSDB_DB_NAME_LEN) {
setErrMsg(pCmd, msg);
return TSDB_CODE_INVALID_SQL;
......@@ -390,15 +390,15 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
case DESCRIBE_TABLE: {
pCmd->command = TSDB_SQL_DESCRIBE_TABLE;
SSQLToken* pToken = &pInfo->pDCLInfo->a[0];
SSQLToken* pToken = &pInfo->pDCLInfo->a[0];
const char* msg = "table name is too long";
if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) {
const char* msg1 = "invalid table name";
if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) {
const char* msg1 = "invalid table name";
setErrMsg(pCmd, msg1);
return TSDB_CODE_INVALID_SQL;
}
if (pToken->n > TSDB_METER_NAME_LEN) {
setErrMsg(pCmd, msg);
return TSDB_CODE_INVALID_SQL;
......@@ -575,12 +575,12 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
// metric name, create table by using dst
SSQLToken* pToken = &(pInfo->pCreateTableInfo->usingInfo.metricName);
if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) {
if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) {
setErrMsg(pCmd, msg);
return TSDB_CODE_INVALID_SQL;
}
int32_t ret = setMeterID(pSql, pToken);
int32_t ret = setMeterID(pSql, pToken);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
......@@ -710,7 +710,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
/*
* check if fill operation is available, the fill operation is parsed and executed during query execution, not here.
* check if fill operation is available, the fill operation is parsed and executed during query execution, not
* here.
*/
if (pQuerySql->fillType != NULL) {
if (pCmd->nAggTimeInterval == 0) {
......@@ -745,13 +746,13 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
setErrMsg(pCmd, msg);
return TSDB_CODE_INVALID_SQL;
}
if (tscValidateName(&(pQuerySql->from)) != TSDB_CODE_SUCCESS) {
const char* msg = "invalid table name";
if (tscValidateName(&(pQuerySql->from)) != TSDB_CODE_SUCCESS) {
const char* msg = "invalid table name";
setErrMsg(pCmd, msg);
return TSDB_CODE_INVALID_SQL;
}
if (setMeterID(pSql, &pQuerySql->from) != TSDB_CODE_SUCCESS) {
const char* msg = "table name too long";
setErrMsg(pCmd, msg);
......@@ -953,7 +954,7 @@ int32_t setIntervalClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql) {
tscSqlExprInsert(pCmd, 0, TSDB_FUNC_TS, 0, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE);
SColumnList ids = {.numOfCols = 1, .ids = {0}};
int32_t ret = insertResultField(pCmd, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS].aName);
int32_t ret = insertResultField(pCmd, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS].aName);
return ret;
}
......@@ -984,11 +985,11 @@ int32_t setSlidingClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql) {
return TSDB_CODE_SUCCESS;
}
int32_t setMeterID(SSqlObj *pSql, SSQLToken *pzTableName) {
SSqlCmd *pCmd = &(pSql->cmd);
int32_t ret = TSDB_CODE_SUCCESS;
int32_t setMeterID(SSqlObj* pSql, SSQLToken* pzTableName) {
SSqlCmd* pCmd = &(pSql->cmd);
int32_t ret = TSDB_CODE_SUCCESS;
//clear array
// clear array
memset(pCmd->name, 0, tListLen(pCmd->name));
const char* msg = "name too long";
......@@ -1607,7 +1608,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SSchema* pSchema, int32_t
int16_t type = 0;
int16_t bytes = 0;
char columnName[TSDB_COL_NAME_LEN] = {0};
char columnName[TSDB_COL_NAME_LEN] = {0};
const char* msg1 = "not support column types";
if (functionID == TSDB_FUNC_SPREAD) {
......@@ -2621,7 +2622,7 @@ static int32_t buildTagQueryCondString(SSqlCmd* pCmd, tSQLExpr* pExpr, char** qu
const char* msg0 = "invalid table name list";
const char* msg1 = "like operation is not allowed on numeric tags";
const char* msg2 = "in and query condition cannot be mixed up";
STagCond* pCond = &pCmd->tagCond;
STagCond* pCond = &pCmd->tagCond;
if (pExpr->nSQLOptr == TK_IN && pRight->nSQLOptr == TK_SET) {
/* table name array list, invoke another routine */
......@@ -3048,7 +3049,7 @@ int tableNameCompar(const void* lhs, const void* rhs) {
}
static int32_t setMetersIDForMetricQuery(SSqlObj* pSql, char* tmpTagCondBuf) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlCmd* pCmd = &pSql->cmd;
const char* msg = "meter name too long";
pCmd->tagCond.allocSize = 4096;
......@@ -3071,7 +3072,9 @@ static int32_t setMetersIDForMetricQuery(SSqlObj* pSql, char* tmpTagCondBuf) {
extractDBName(pCmd->name, db);
SSQLToken tDB = {
.z = db, .n = strlen(db), .type = TK_STRING,
.z = db,
.n = strlen(db),
.type = TK_STRING,
};
char* acc = getAccountId(pSql);
......@@ -3297,9 +3300,9 @@ int32_t setFillPolicy(SSqlCmd* pCmd, SQuerySQL* pQuerySQL) {
tVariantListItem* pItem = &pFillToken->a[0];
const int32_t START_INTERPO_COL_IDX = 1;
const char* msg = "illegal value or data overflow";
const char* msg1 = "value is expected";
const char* msg2 = "invalid fill option";
const char* msg = "illegal value or data overflow";
const char* msg1 = "value is expected";
const char* msg2 = "invalid fill option";
if (pItem->pVar.nType != TSDB_DATA_TYPE_BINARY) {
setErrMsg(pCmd, msg2);
......@@ -3518,7 +3521,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
pCmd->command = TSDB_SQL_ALTER_TABLE;
if (tscValidateName(&(pAlterSQL->name)) != TSDB_CODE_SUCCESS) {
const char* msg = "invalid table name";
const char* msg = "invalid table name";
setErrMsg(pCmd, msg);
return TSDB_CODE_INVALID_SQL;
}
......@@ -3658,7 +3661,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
srcFound = true;
}
//todo extract method
// todo extract method
if ((!dstFound) && (strncasecmp(tagName, pDstItem->pVar.pz, nameLen) == 0 && (pDstItem->pVar.nLen == nameLen))) {
dstFound = true;
}
......@@ -3818,7 +3821,7 @@ int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd) {
}
int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd) {
bool isProjectionFunction = false;
bool isProjectionFunction = false;
const char* msg = "column projection is not compatible with interval";
// multi-output set/ todo refactor
......@@ -3972,7 +3975,8 @@ int32_t setLimitOffsetValueInfo(SSqlObj* pSql, SQuerySQL* pQuerySql) {
}
/*
* Query results are empty. Therefore, the result is filled with 0 if count function is employed in selection clause.
* Query results are empty. Therefore, the result is filled with 0 if count function is employed in selection
* clause.
*
* The fill of empty result is required only when interval clause is absent.
*/
......@@ -4037,7 +4041,7 @@ int32_t parseCreateDBOptions(SCreateDBInfo* pCreateDbSql, SSqlCmd* pCmd) {
const char* msg0 = "invalid number of options";
const char* msg1 = "invalid time precision";
SCreateDbMsg *pMsg = (SCreateDbMsg *) (pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
SCreateDbMsg* pMsg = (SCreateDbMsg*)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
setCreateDBOption(pMsg, pCreateDbSql);
if (pCreateDbSql->keep != NULL) {
......@@ -4063,7 +4067,7 @@ int32_t parseCreateDBOptions(SCreateDBInfo* pCreateDbSql, SSqlCmd* pCmd) {
}
}
SSQLToken *pToken = &pCreateDbSql->precision;
SSQLToken* pToken = &pCreateDbSql->precision;
if (pToken->n > 0) {
pToken->n = strdequote(pToken->z);
......@@ -4072,7 +4076,7 @@ int32_t parseCreateDBOptions(SCreateDBInfo* pCreateDbSql, SSqlCmd* pCmd) {
// time precision for this db: million second
pMsg->precision = TSDB_TIME_PRECISION_MILLI;
} else if (strncmp(pToken->z, TSDB_TIME_PRECISION_MICRO_STR, pToken->n) == 0 &&
strlen(TSDB_TIME_PRECISION_MICRO_STR) == pToken->n) {
strlen(TSDB_TIME_PRECISION_MICRO_STR) == pToken->n) {
pMsg->precision = TSDB_TIME_PRECISION_MICRO;
} else {
setErrMsg(pCmd, msg1);
......
......@@ -242,6 +242,8 @@ int64_t __sync_add_and_fetch_64(int64_t *ptr, int64_t val);
#define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap
#define __sync_add_and_fetch_64 __sync_add_and_fetch
#define __sync_add_and_fetch_32 __sync_add_and_fetch
ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size);
ssize_t twrite(int fd, void *buf, size_t n);
#endif
#ifdef __cplusplus
......
......@@ -626,7 +626,7 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp) {
strncpy(tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX], fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes);
strncpy(tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX], fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes);
write(fd, &tableRecord, sizeof(STableRecord));
twrite(fd, &tableRecord, sizeof(STableRecord));
}
taos_free_result(result);
......@@ -831,7 +831,7 @@ int32_t taosDumpMetric(char *metric, struct arguments *arguments, FILE *fp) {
memset(&tableRecord, 0, sizeof(STableRecord));
strncpy(tableRecord.name, (char *)row[0], fields[0].bytes);
strcpy(tableRecord.metric, metric);
write(fd, &tableRecord, sizeof(STableRecord));
twrite(fd, &tableRecord, sizeof(STableRecord));
}
taos_free_result(result);
......
......@@ -43,6 +43,7 @@
#include <pthread.h>
#include <stdbool.h>
#include <limits.h>
#include <sys/sendfile.h>
bool taosCheckPthreadValid(pthread_t thread);
......
......@@ -251,3 +251,46 @@ int taosInitTimer(void *(*callback)(void *), int ms) {
return 0;
}
ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size) {
size_t leftbytes = size;
ssize_t sentbytes;
while (leftbytes > 0) {
// TODO : Think to check if file is larger than 1GB
if (leftbytes > 1000000000) leftbytes = 1000000000;
sentbytes = sendfile(dfd, sfd, offset, leftbytes);
if (sentbytes == -1) {
if (errno == EINTR) {
continue;
}
else {
return -1;
}
}
leftbytes -= sentbytes;
}
return size;
}
ssize_t twrite(int fd, void *buf, size_t n) {
size_t nleft, nwritten;
nleft = n;
while (nleft > 0) {
nwritten = write(fd, buf, nleft);
if (nwritten < 0) {
if (errno == EINTR) {
continue;
}
return -1;
}
nleft -= nwritten;
buf += nwritten;
}
return n;
}
\ No newline at end of file
......@@ -62,7 +62,7 @@ void sdbFinishCommit(void *handle) {
off_t offset = lseek(pTable->fd, 0, SEEK_END);
assert(offset == pTable->size);
write(pTable->fd, &sdbEcommit, sizeof(sdbEcommit));
twrite(pTable->fd, &sdbEcommit, sizeof(sdbEcommit));
pTable->size += sizeof(sdbEcommit);
}
......@@ -112,7 +112,7 @@ int sdbOpenSdbFile(SSdbTable *pTable) {
tclose(pTable->fd);
return -1;
}
write(pTable->fd, &(pTable->header), size);
twrite(pTable->fd, &(pTable->header), size);
pTable->size += size;
sdbFinishCommit(pTable);
} else {
......@@ -401,7 +401,7 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) {
/* Update the disk content */
/* write(pTable->fd, &action, sizeof(action)); */
/* pTable->size += sizeof(action); */
write(pTable->fd, rowHead, real_size);
twrite(pTable->fd, rowHead, real_size);
pTable->size += real_size;
sdbFinishCommit(pTable);
......@@ -497,7 +497,7 @@ int sdbDeleteRow(void *handle, void *row) {
}
/* write(pTable->fd, &action, sizeof(action)); */
/* pTable->size += sizeof(action); */
write(pTable->fd, rowHead, total_size);
twrite(pTable->fd, rowHead, total_size);
pTable->size += total_size;
sdbFinishCommit(pTable);
......@@ -592,7 +592,7 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
}
/* write(pTable->fd, &action, sizeof(action)); */
/* pTable->size += sizeof(action); */
write(pTable->fd, rowHead, real_size);
twrite(pTable->fd, rowHead, real_size);
pMeta->id = pTable->id;
pMeta->offset = pTable->size;
......@@ -675,7 +675,7 @@ int sdbBatchUpdateRow(void *handle, void *row, int rowSize) {
taosCalcChecksumAppend(0, (uint8_t *)rowHead, sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM));
pMeta->rowSize = rowHead->rowSize;
lseek(pTable->fd, pTable->size, SEEK_SET);
write(pTable->fd, rowHead, sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM));
twrite(pTable->fd, rowHead, sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM));
pTable->size += (sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM));
sdbAddIntoUpdateList(pTable, SDB_TYPE_UPDATE, last_row);
......@@ -855,9 +855,9 @@ void sdbSaveSnapShot(void *handle) {
memset(rowHead, 0, size);
// Write the header
write(fd, &(pTable->header), sizeof(SSdbHeader));
twrite(fd, &(pTable->header), sizeof(SSdbHeader));
size += sizeof(SSdbHeader);
write(fd, &sdbEcommit, sizeof(sdbEcommit));
twrite(fd, &sdbEcommit, sizeof(sdbEcommit));
size += sizeof(sdbEcommit);
while (1) {
......@@ -876,9 +876,9 @@ void sdbSaveSnapShot(void *handle) {
/* write(fd, &action, sizeof(action)); */
/* size += sizeof(action); */
write(fd, rowHead, real_size);
twrite(fd, rowHead, real_size);
size += real_size;
write(fd, &sdbEcommit, sizeof(sdbEcommit));
twrite(fd, &sdbEcommit, sizeof(sdbEcommit));
size += sizeof(sdbEcommit);
numOfRows++;
}
......
......@@ -17,7 +17,6 @@
#include <assert.h>
#include <fcntl.h>
#include <libgen.h>
#include <sys/sendfile.h>
#include <sys/stat.h>
#include <sys/stat.h>
#include <sys/time.h>
......@@ -65,26 +64,7 @@ int vnodeRecoverCompHeader(int vnode, int fileId);
int vnodeRecoverHeadFile(int vnode, int fileId);
int vnodeRecoverDataFile(int vnode, int fileId);
int vnodeForwardStartPosition(SQuery *pQuery, SCompBlock *pBlock, int32_t slotIdx, SVnodeObj *pVnode, SMeterObj *pObj);
int64_t tsendfile(int dfd, int sfd, int64_t bytes) {
int64_t leftbytes = bytes;
off_t offset = 0;
int64_t sentbytes;
while (leftbytes > 0) {
sentbytes = (leftbytes > 1000000000) ? 1000000000 : leftbytes;
sentbytes = sendfile(dfd, sfd, &offset, sentbytes);
if (sentbytes < 0) {
dError("send file failed,reason:%s", strerror(errno));
return -1;
}
leftbytes -= sentbytes;
// dTrace("sentbytes:%ld leftbytes:%ld", sentbytes, leftbytes);
}
return bytes;
}
int vnodeCheckNewHeaderFile(int fd, SVnodeObj *pVnode);
void vnodeGetHeadDataLname(char *headName, char *dataName, char *lastName, int vnode, int fileId) {
if (headName != NULL) sprintf(headName, "%s/vnode%d/db/v%df%d.head", tsDirectory, vnode, vnode, fileId);
......@@ -172,7 +152,7 @@ int vnodeCreateEmptyCompFile(int vnode, int fileId) {
taosCalcChecksumAppend(0, (uint8_t *)temp, size);
lseek(tfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
write(tfd, temp, size);
twrite(tfd, temp, size);
free(temp);
close(tfd);
......@@ -345,7 +325,7 @@ int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) {
char *temp = malloc(size);
memset(temp, 0, size);
taosCalcChecksumAppend(0, (uint8_t *)temp, size);
write(pVnode->nfd, temp, size);
twrite(pVnode->nfd, temp, size);
free(temp);
pVnode->dfSize = lseek(pVnode->dfd, 0, SEEK_END);
......@@ -409,6 +389,11 @@ void vnodeCloseCommitFiles(SVnodeObj *pVnode) {
int fileId;
int ret;
// Check new if new header file is correct
#ifdef _CHECK_HEADER_FILE_
assert(vnodeCheckNewHeaderFile(pVnode->nfd, pVnode) == 0);
#endif
close(pVnode->nfd);
pVnode->nfd = 0;
......@@ -673,7 +658,7 @@ _again:
pCompBlock->last = 0;
pCompBlock->offset = lseek(pVnode->dfd, 0, SEEK_END);
lseek(pVnode->lfd, pMeter->lastBlock.offset, SEEK_SET);
sendfile(pVnode->dfd, pVnode->lfd, NULL, pMeter->lastBlock.len);
tsendfile(pVnode->dfd, pVnode->lfd, NULL, pMeter->lastBlock.len);
pVnode->dfSize = pCompBlock->offset + pMeter->lastBlock.len;
headLen += sizeof(SCompBlock);
......@@ -779,7 +764,7 @@ _again:
vnodeUpdateHeadFileHeader(pVnode->nfd, &headInfo);
lseek(pVnode->nfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
taosCalcChecksumAppend(0, (uint8_t *)tmem, tmsize);
if (write(pVnode->nfd, tmem, tmsize) <= 0) {
if (twrite(pVnode->nfd, tmem, tmsize) <= 0) {
dError("vid:%d sid:%d id:%s, failed to write:%s, error:%s", vnode, sid, pObj->meterId, pVnode->nfn,
strerror(errno));
goto _over;
......@@ -802,7 +787,7 @@ _again:
compInfo.delimiter = TSDB_VNODE_DELIMITER;
taosCalcChecksumAppend(0, (uint8_t *)(&compInfo), sizeof(SCompInfo));
lseek(pVnode->nfd, pMeter->compInfoOffset, SEEK_SET);
if (write(pVnode->nfd, &compInfo, sizeof(compInfo)) <= 0) {
if (twrite(pVnode->nfd, &compInfo, sizeof(compInfo)) <= 0) {
dError("vid:%d sid:%d id:%s, failed to write:%s, reason:%s", vnode, sid, pObj->meterId, pVnode->nfn,
strerror(errno));
goto _over;
......@@ -815,10 +800,10 @@ _again:
if (pMeter->changed) {
int compBlockLen = pMeter->oldNumOfBlocks * sizeof(SCompBlock);
read(pVnode->hfd, pOldCompBlocks, compBlockLen);
write(pVnode->nfd, pOldCompBlocks, compBlockLen);
twrite(pVnode->nfd, pOldCompBlocks, compBlockLen);
chksum = taosCalcChecksum(0, pOldCompBlocks, compBlockLen);
} else {
sendfile(pVnode->nfd, pVnode->hfd, NULL, pMeter->oldNumOfBlocks * sizeof(SCompBlock));
tsendfile(pVnode->nfd, pVnode->hfd, NULL, pMeter->oldNumOfBlocks * sizeof(SCompBlock));
read(pVnode->hfd, &chksum, sizeof(TSCKSUM));
}
}
......@@ -826,13 +811,13 @@ _again:
if (pMeter->newNumOfBlocks) {
chksum = taosCalcChecksum(chksum, (uint8_t *)(hmem + pMeter->tempHeadOffset),
pMeter->newNumOfBlocks * sizeof(SCompBlock));
if (write(pVnode->nfd, hmem + pMeter->tempHeadOffset, pMeter->newNumOfBlocks * sizeof(SCompBlock)) <= 0) {
if (twrite(pVnode->nfd, hmem + pMeter->tempHeadOffset, pMeter->newNumOfBlocks * sizeof(SCompBlock)) <= 0) {
dError("vid:%d sid:%d id:%s, failed to write:%s, reason:%s", vnode, sid, pObj->meterId, pVnode->nfn,
strerror(errno));
goto _over;
}
}
write(pVnode->nfd, &chksum, sizeof(TSCKSUM));
twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM));
}
tfree(pOldCompBlocks);
......@@ -1221,7 +1206,7 @@ int vnodeWriteBlockToFile(SMeterObj *pObj, SCompBlock *pCompBlock, SData *data[]
// Write SField part
taosCalcChecksumAppend(0, (uint8_t *)fields, size);
wlen = write(dfd, fields, size);
wlen = twrite(dfd, fields, size);
if (wlen <= 0) {
tfree(fields);
dError("vid:%d sid:%d id:%s, failed to write block, wlen:%d reason:%s", pObj->vnode, pObj->sid, pObj->meterId, wlen,
......@@ -1236,9 +1221,9 @@ int vnodeWriteBlockToFile(SMeterObj *pObj, SCompBlock *pCompBlock, SData *data[]
// Write data part
for (int i = 0; i < pObj->numOfColumns; ++i) {
if (pCfg->compression) {
wlen = write(dfd, cdata[i]->data, cdata[i]->len + sizeof(TSCKSUM));
wlen = twrite(dfd, cdata[i]->data, cdata[i]->len + sizeof(TSCKSUM));
} else {
wlen = write(dfd, data[i]->data, data[i]->len + sizeof(TSCKSUM));
wlen = twrite(dfd, data[i]->data, data[i]->len + sizeof(TSCKSUM));
}
if (wlen <= 0) {
......@@ -1814,3 +1799,71 @@ int vnodeRecoverDataFile(int vnode, int fileId) {
assert(0);
return 0;
}
int vnodeCheckNewHeaderFile(int fd, SVnodeObj *pVnode) {
SCompHeader *pHeader = NULL;
SCompBlock *pBlocks = NULL;
int blockSize = 0;
SCompInfo compInfo;
int tmsize = 0;
tmsize = sizeof(SCompHeader) * pVnode->cfg.maxSessions + sizeof(TSCKSUM);
pHeader = (SCompHeader *)malloc(tmsize);
if (pHeader == NULL) return 0;
lseek(fd, TSDB_FILE_HEADER_LEN, SEEK_SET);
if (read(fd, (void *)pHeader, tmsize) != tmsize) {
goto _broken_exit;
}
if (!taosCheckChecksumWhole((uint8_t *)pHeader, tmsize)) {
goto _broken_exit;
}
for (int sid = 0; sid < pVnode->cfg.maxSessions; sid++) {
if (pVnode->meterList == NULL) goto _correct_exit;
if (pVnode->meterList[sid] == NULL || pHeader[sid].compInfoOffset == 0) continue;
lseek(fd, pHeader[sid].compInfoOffset, SEEK_SET);
if (read(fd, (void *)(&compInfo), sizeof(SCompInfo)) != sizeof(SCompInfo)) {
goto _broken_exit;
}
if (!taosCheckChecksumWhole((uint8_t *)(&compInfo), sizeof(SCompInfo))) {
goto _broken_exit;
}
if (compInfo.uid != ((SMeterObj *)pVnode->meterList[sid])->uid) continue;
int expectedSize = sizeof(SCompBlock) * compInfo.numOfBlocks + sizeof(TSCKSUM);
if (blockSize < expectedSize) {
pBlocks = (SCompBlock *)realloc(pBlocks, expectedSize);
if (pBlocks == NULL) {
tfree(pHeader);
return 0;
}
blockSize = expectedSize;
}
if (read(fd, (void *)pBlocks, expectedSize) != expectedSize) {
goto _broken_exit;
}
if (!taosCheckChecksumWhole((uint8_t *)pBlocks, expectedSize)) {
goto _broken_exit;
}
}
_correct_exit:
dTrace("vid: %d new header file %s is correct", pVnode->vnode, pVnode->nfn);
tfree(pBlocks);
tfree(pHeader);
return 0;
_broken_exit:
dError("vid: %d new header file %s is broken", pVnode->vnode, pVnode->nfn);
tfree(pBlocks);
tfree(pHeader);
return -1;
}
\ No newline at end of file
......@@ -14,7 +14,6 @@
*/
#include <arpa/inet.h>
#include <sys/sendfile.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
......@@ -97,12 +96,12 @@ int vnodeCloseFileForImport(SMeterObj *pObj, SHeadInfo *pHinfo) {
if (pHinfo->newBlocks == 0 || pHinfo->compInfoOffset == 0) return 0;
if (pHinfo->oldNumOfBlocks == 0) write(pVnode->nfd, &chksum, sizeof(TSCKSUM));
if (pHinfo->oldNumOfBlocks == 0) twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM));
int leftSize = pHinfo->hfdSize - pHinfo->leftOffset;
if (leftSize > 0) {
lseek(pVnode->hfd, pHinfo->leftOffset, SEEK_SET);
sendfile(pVnode->nfd, pVnode->hfd, NULL, leftSize);
tsendfile(pVnode->nfd, pVnode->hfd, NULL, leftSize);
}
pHinfo->compInfo.numOfBlocks += pHinfo->newBlocks;
......@@ -117,7 +116,7 @@ int vnodeCloseFileForImport(SMeterObj *pObj, SHeadInfo *pHinfo) {
lseek(pVnode->nfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
int tmsize = sizeof(SCompHeader) * pCfg->maxSessions + sizeof(TSCKSUM);
taosCalcChecksumAppend(0, (uint8_t *)pHinfo->headList, tmsize);
write(pVnode->nfd, pHinfo->headList, tmsize);
twrite(pVnode->nfd, pHinfo->headList, tmsize);
int size = pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock);
char *buffer = malloc(size);
......@@ -131,11 +130,11 @@ int vnodeCloseFileForImport(SMeterObj *pObj, SHeadInfo *pHinfo) {
taosCalcChecksumAppend(0, (uint8_t *)(&pHinfo->compInfo), sizeof(SCompInfo));
lseek(pVnode->nfd, pHinfo->compInfoOffset, SEEK_SET);
write(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo));
twrite(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo));
chksum = taosCalcChecksum(0, (uint8_t *)buffer, size);
lseek(pVnode->nfd, pHinfo->compInfoOffset + sizeof(SCompInfo) + size, SEEK_SET);
write(pVnode->nfd, &chksum, sizeof(TSCKSUM));
twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM));
free(buffer);
vnodeCloseCommitFiles(pVnode);
......@@ -161,11 +160,11 @@ int vnodeProcessLastBlock(SImportInfo *pImport, SHeadInfo *pHinfo, SData *data[]
if (lastBlock.sversion != pObj->sversion) {
lseek(pVnode->lfd, lastBlock.offset, SEEK_SET);
lastBlock.offset = lseek(pVnode->dfd, 0, SEEK_END);
sendfile(pVnode->dfd, pVnode->lfd, NULL, lastBlock.len);
tsendfile(pVnode->dfd, pVnode->lfd, NULL, lastBlock.len);
lastBlock.last = 0;
lseek(pVnode->hfd, offset, SEEK_SET);
write(pVnode->hfd, &lastBlock, sizeof(SCompBlock));
twrite(pVnode->hfd, &lastBlock, sizeof(SCompBlock));
} else {
vnodeReadLastBlockToMem(pObj, &lastBlock, data);
pHinfo->compInfo.numOfBlocks--;
......@@ -221,8 +220,8 @@ int vnodeOpenFileForImport(SImportInfo *pImport, char *payload, SHeadInfo *pHinf
pHinfo->oldNumOfBlocks = pHinfo->compInfo.numOfBlocks;
lseek(pVnode->hfd, 0, SEEK_SET);
lseek(pVnode->nfd, 0, SEEK_SET);
sendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfoOffset);
write(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo));
tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfoOffset);
twrite(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo));
if (pHinfo->headList[pObj->sid].compInfoOffset > 0) lseek(pVnode->hfd, sizeof(SCompInfo), SEEK_CUR);
if (pVnode->commitFileId < pImport->fileId) {
......@@ -234,7 +233,7 @@ int vnodeOpenFileForImport(SImportInfo *pImport, char *payload, SHeadInfo *pHinf
// copy all existing compBlockInfo
lseek(pVnode->hfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET);
if (pHinfo->compInfo.numOfBlocks > 0)
sendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock));
tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock));
} else if (pVnode->commitFileId == pImport->fileId) {
int slots = pImport->pos ? pImport->slot + 1 : pImport->slot;
......@@ -251,7 +250,7 @@ int vnodeOpenFileForImport(SImportInfo *pImport, char *payload, SHeadInfo *pHinf
if (pImport->slot > 0) {
lseek(pVnode->hfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET);
sendfile(pVnode->nfd, pVnode->hfd, NULL, pImport->slot * sizeof(SCompBlock));
tsendfile(pVnode->nfd, pVnode->hfd, NULL, pImport->slot * sizeof(SCompBlock));
}
if (pImport->slot < pHinfo->compInfo.numOfBlocks)
......@@ -448,7 +447,7 @@ int vnodeImportToFile(SImportInfo *pImport) {
compBlock.last = headInfo.last;
vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowsToWrite);
write(pVnode->nfd, &compBlock, sizeof(SCompBlock));
twrite(pVnode->nfd, &compBlock, sizeof(SCompBlock));
rowsToWrite = 0;
headInfo.newBlocks++;
......
......@@ -64,16 +64,16 @@ void vnodeCreateFileHeaderFd(int fd) {
sprintf(temp + sizeof(int16_t), "tsdb version: %s\n", version);
/* *((int16_t *)(temp + TSDB_FILE_HEADER_LEN/8)) = vnodeFileVersion; */
lseek(fd, 0, SEEK_SET);
write(fd, temp, lineLen);
twrite(fd, temp, lineLen);
// second line
memset(temp, 0, lineLen);
write(fd, temp, lineLen);
twrite(fd, temp, lineLen);
// the third/forth line is the dynamic info
memset(temp, 0, lineLen);
write(fd, temp, lineLen);
write(fd, temp, lineLen);
twrite(fd, temp, lineLen);
twrite(fd, temp, lineLen);
}
void vnodeGetHeadFileHeaderInfo(int fd, SVnodeHeadInfo* pHeadInfo) {
......@@ -83,7 +83,7 @@ void vnodeGetHeadFileHeaderInfo(int fd, SVnodeHeadInfo* pHeadInfo) {
void vnodeUpdateHeadFileHeader(int fd, SVnodeHeadInfo* pHeadInfo) {
lseek(fd, TSDB_FILE_HEADER_LEN / 4, SEEK_SET);
write(fd, pHeadInfo, sizeof(SVnodeHeadInfo));
twrite(fd, pHeadInfo, sizeof(SVnodeHeadInfo));
}
void vnodeCreateFileHeader(FILE* fp) {
......
......@@ -294,11 +294,11 @@ int taosOpenLogFileWithMaxLines(char *fn, int maxLines, int maxFileNum) {
lseek(logHandle->fd, 0, SEEK_END);
sprintf(name, "==================================================\n");
write(logHandle->fd, name, (uint32_t)strlen(name));
twrite(logHandle->fd, name, (uint32_t)strlen(name));
sprintf(name, " new log file \n");
write(logHandle->fd, name, (uint32_t)strlen(name));
twrite(logHandle->fd, name, (uint32_t)strlen(name));
sprintf(name, "==================================================\n");
write(logHandle->fd, name, (uint32_t)strlen(name));
twrite(logHandle->fd, name, (uint32_t)strlen(name));
return 0;
}
......@@ -355,7 +355,7 @@ void tprintf(const char *const flags, int dflag, const char *const format, ...)
if (tsAsyncLog) {
taosPushLogBuffer(logHandle, buffer, len);
} else {
write(logHandle->fd, buffer, len);
twrite(logHandle->fd, buffer, len);
}
if (taosLogMaxLines > 0) {
......@@ -365,7 +365,7 @@ void tprintf(const char *const flags, int dflag, const char *const format, ...)
}
}
if (dflag & DEBUG_SCREEN) write(1, buffer, (unsigned int)len);
if (dflag & DEBUG_SCREEN) twrite(1, buffer, (unsigned int)len);
}
void taosDumpData(unsigned char *msg, int len) {
......@@ -378,7 +378,7 @@ void taosDumpData(unsigned char *msg, int len) {
pos += 3;
if (c >= 16) {
temp[pos++] = '\n';
write(logHandle->fd, temp, (unsigned int)pos);
twrite(logHandle->fd, temp, (unsigned int)pos);
c = 0;
pos = 0;
}
......@@ -386,7 +386,7 @@ void taosDumpData(unsigned char *msg, int len) {
temp[pos++] = '\n';
write(logHandle->fd, temp, (unsigned int)pos);
twrite(logHandle->fd, temp, (unsigned int)pos);
return;
}
......@@ -430,7 +430,7 @@ void taosPrintLongString(const char *const flags, int dflag, const char *const f
}
}
if (dflag & DEBUG_SCREEN) write(1, buffer, (unsigned int)len);
if (dflag & DEBUG_SCREEN) twrite(1, buffer, (unsigned int)len);
}
void taosCloseLog() { taosCloseLogByFd(logHandle->fd); }
......@@ -559,7 +559,7 @@ void *taosAsyncOutputLog(void *param) {
while (1) {
log_size = taosPollLogBuffer(tLogBuff, tempBuffer, TSDB_DEFAULT_LOG_BUF_UNIT);
if (log_size) {
write(tLogBuff->fd, tempBuffer, log_size);
twrite(tLogBuff->fd, tempBuffer, log_size);
LOG_BUF_START(tLogBuff) = (LOG_BUF_START(tLogBuff) + log_size) % LOG_BUF_SIZE(tLogBuff);
} else {
break;
......
......@@ -19,6 +19,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <errno.h>
#include "os.h"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册