diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 505d1849d855184e622085cb48b13befe9a4f6f4..e5692d154c6f65cf73182e2900e226974d015094 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -84,7 +84,7 @@ static bool onlyQueryMetricTags(SSqlCmd* pCmd); static bool hasUnsupportFunctionsForMetricQuery(SSqlCmd* pCmd); static bool functionCompatibleCheck(SSqlCmd* pCmd); -static void setColumnOffsetValueInResultset(SSqlCmd* pCmd); +static void setColumnOffsetValueInResultset(SSqlCmd* pCmd); static int32_t setGroupByClause(SSqlCmd* pCmd, tVariantList* pList); static int32_t setIntervalClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql); @@ -97,7 +97,7 @@ static int32_t setFillPolicy(SSqlCmd* pCmd, SQuerySQL* pQuerySQL); static int32_t setOrderByClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql, SSchema* pSchema, int32_t numOfCols); static int32_t tsRewriteFieldNameIfNecessary(SSqlCmd* pCmd); -static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); +static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); static int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo); static int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd); static int32_t buildArithmeticExprString(tSQLExpr* pExpr, char** exprString, SColumnIdList* colIdList); @@ -106,10 +106,10 @@ static int32_t validateArithmeticSQLExpr(tSQLExpr* pExpr, SSchema* pSchema, int3 static int32_t validateDNodeConfig(tDCLSQL* pOptions); static int32_t validateColumnName(char* name); static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo); -static bool hasTimestampForPointInterpQuery(SSqlCmd* pCmd); -static void updateTagColumnIndex(SSqlCmd* pCmd); +static bool hasTimestampForPointInterpQuery(SSqlCmd* pCmd); +static void updateTagColumnIndex(SSqlCmd* pCmd); static int32_t setLimitOffsetValueInfo(SSqlObj* pSql, SQuerySQL* pQuerySql); -static void addRequiredTagColumn(SSqlCmd* pCmd, int32_t tagColIndex); +static void addRequiredTagColumn(SSqlCmd* pCmd, int32_t tagColIndex); static int32_t parseCreateDBOptions(SCreateDBInfo* pCreateDbSql, SSqlCmd* pCmd); static int32_t tscQueryOnlyMetricTags(SSqlCmd* pCmd, bool* queryOnMetricTags) { @@ -202,13 +202,13 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { pCmd->command = TSDB_SQL_USE_DB; SSQLToken* pToken = &pInfo->pDCLInfo->a[0]; - - if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) { - const char* msg1 = "invalid db name"; + + if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) { + const char* msg1 = "invalid db name"; setErrMsg(pCmd, msg1); return TSDB_CODE_INVALID_SQL; } - + if (pToken->n > TSDB_DB_NAME_LEN) { setErrMsg(pCmd, msg); return TSDB_CODE_INVALID_SQL; @@ -390,15 +390,15 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { case DESCRIBE_TABLE: { pCmd->command = TSDB_SQL_DESCRIBE_TABLE; - SSQLToken* pToken = &pInfo->pDCLInfo->a[0]; + SSQLToken* pToken = &pInfo->pDCLInfo->a[0]; const char* msg = "table name is too long"; - if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) { - const char* msg1 = "invalid table name"; + if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) { + const char* msg1 = "invalid table name"; setErrMsg(pCmd, msg1); return TSDB_CODE_INVALID_SQL; } - + if (pToken->n > TSDB_METER_NAME_LEN) { setErrMsg(pCmd, msg); return TSDB_CODE_INVALID_SQL; @@ -575,12 +575,12 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { // metric name, create table by using dst SSQLToken* pToken = &(pInfo->pCreateTableInfo->usingInfo.metricName); - if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) { + if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) { setErrMsg(pCmd, msg); return TSDB_CODE_INVALID_SQL; } - int32_t ret = setMeterID(pSql, pToken); + int32_t ret = setMeterID(pSql, pToken); if (ret != TSDB_CODE_SUCCESS) { return ret; } @@ -710,7 +710,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { } /* - * check if fill operation is available, the fill operation is parsed and executed during query execution, not here. + * check if fill operation is available, the fill operation is parsed and executed during query execution, not + * here. */ if (pQuerySql->fillType != NULL) { if (pCmd->nAggTimeInterval == 0) { @@ -745,13 +746,13 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { setErrMsg(pCmd, msg); return TSDB_CODE_INVALID_SQL; } - - if (tscValidateName(&(pQuerySql->from)) != TSDB_CODE_SUCCESS) { - const char* msg = "invalid table name"; + + if (tscValidateName(&(pQuerySql->from)) != TSDB_CODE_SUCCESS) { + const char* msg = "invalid table name"; setErrMsg(pCmd, msg); return TSDB_CODE_INVALID_SQL; } - + if (setMeterID(pSql, &pQuerySql->from) != TSDB_CODE_SUCCESS) { const char* msg = "table name too long"; setErrMsg(pCmd, msg); @@ -953,7 +954,7 @@ int32_t setIntervalClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql) { tscSqlExprInsert(pCmd, 0, TSDB_FUNC_TS, 0, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE); SColumnList ids = {.numOfCols = 1, .ids = {0}}; - int32_t ret = insertResultField(pCmd, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS].aName); + int32_t ret = insertResultField(pCmd, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS].aName); return ret; } @@ -984,11 +985,11 @@ int32_t setSlidingClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql) { return TSDB_CODE_SUCCESS; } -int32_t setMeterID(SSqlObj *pSql, SSQLToken *pzTableName) { - SSqlCmd *pCmd = &(pSql->cmd); - int32_t ret = TSDB_CODE_SUCCESS; +int32_t setMeterID(SSqlObj* pSql, SSQLToken* pzTableName) { + SSqlCmd* pCmd = &(pSql->cmd); + int32_t ret = TSDB_CODE_SUCCESS; - //clear array + // clear array memset(pCmd->name, 0, tListLen(pCmd->name)); const char* msg = "name too long"; @@ -1607,7 +1608,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SSchema* pSchema, int32_t int16_t type = 0; int16_t bytes = 0; - char columnName[TSDB_COL_NAME_LEN] = {0}; + char columnName[TSDB_COL_NAME_LEN] = {0}; const char* msg1 = "not support column types"; if (functionID == TSDB_FUNC_SPREAD) { @@ -2621,7 +2622,7 @@ static int32_t buildTagQueryCondString(SSqlCmd* pCmd, tSQLExpr* pExpr, char** qu const char* msg0 = "invalid table name list"; const char* msg1 = "like operation is not allowed on numeric tags"; const char* msg2 = "in and query condition cannot be mixed up"; - STagCond* pCond = &pCmd->tagCond; + STagCond* pCond = &pCmd->tagCond; if (pExpr->nSQLOptr == TK_IN && pRight->nSQLOptr == TK_SET) { /* table name array list, invoke another routine */ @@ -3048,7 +3049,7 @@ int tableNameCompar(const void* lhs, const void* rhs) { } static int32_t setMetersIDForMetricQuery(SSqlObj* pSql, char* tmpTagCondBuf) { - SSqlCmd* pCmd = &pSql->cmd; + SSqlCmd* pCmd = &pSql->cmd; const char* msg = "meter name too long"; pCmd->tagCond.allocSize = 4096; @@ -3071,7 +3072,9 @@ static int32_t setMetersIDForMetricQuery(SSqlObj* pSql, char* tmpTagCondBuf) { extractDBName(pCmd->name, db); SSQLToken tDB = { - .z = db, .n = strlen(db), .type = TK_STRING, + .z = db, + .n = strlen(db), + .type = TK_STRING, }; char* acc = getAccountId(pSql); @@ -3297,9 +3300,9 @@ int32_t setFillPolicy(SSqlCmd* pCmd, SQuerySQL* pQuerySQL) { tVariantListItem* pItem = &pFillToken->a[0]; const int32_t START_INTERPO_COL_IDX = 1; - const char* msg = "illegal value or data overflow"; - const char* msg1 = "value is expected"; - const char* msg2 = "invalid fill option"; + const char* msg = "illegal value or data overflow"; + const char* msg1 = "value is expected"; + const char* msg2 = "invalid fill option"; if (pItem->pVar.nType != TSDB_DATA_TYPE_BINARY) { setErrMsg(pCmd, msg2); @@ -3518,7 +3521,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { pCmd->command = TSDB_SQL_ALTER_TABLE; if (tscValidateName(&(pAlterSQL->name)) != TSDB_CODE_SUCCESS) { - const char* msg = "invalid table name"; + const char* msg = "invalid table name"; setErrMsg(pCmd, msg); return TSDB_CODE_INVALID_SQL; } @@ -3658,7 +3661,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { srcFound = true; } - //todo extract method + // todo extract method if ((!dstFound) && (strncasecmp(tagName, pDstItem->pVar.pz, nameLen) == 0 && (pDstItem->pVar.nLen == nameLen))) { dstFound = true; } @@ -3818,7 +3821,7 @@ int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd) { } int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd) { - bool isProjectionFunction = false; + bool isProjectionFunction = false; const char* msg = "column projection is not compatible with interval"; // multi-output set/ todo refactor @@ -3972,7 +3975,8 @@ int32_t setLimitOffsetValueInfo(SSqlObj* pSql, SQuerySQL* pQuerySql) { } /* - * Query results are empty. Therefore, the result is filled with 0 if count function is employed in selection clause. + * Query results are empty. Therefore, the result is filled with 0 if count function is employed in selection + * clause. * * The fill of empty result is required only when interval clause is absent. */ @@ -4037,7 +4041,7 @@ int32_t parseCreateDBOptions(SCreateDBInfo* pCreateDbSql, SSqlCmd* pCmd) { const char* msg0 = "invalid number of options"; const char* msg1 = "invalid time precision"; - SCreateDbMsg *pMsg = (SCreateDbMsg *) (pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead)); + SCreateDbMsg* pMsg = (SCreateDbMsg*)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead)); setCreateDBOption(pMsg, pCreateDbSql); if (pCreateDbSql->keep != NULL) { @@ -4063,7 +4067,7 @@ int32_t parseCreateDBOptions(SCreateDBInfo* pCreateDbSql, SSqlCmd* pCmd) { } } - SSQLToken *pToken = &pCreateDbSql->precision; + SSQLToken* pToken = &pCreateDbSql->precision; if (pToken->n > 0) { pToken->n = strdequote(pToken->z); @@ -4072,7 +4076,7 @@ int32_t parseCreateDBOptions(SCreateDBInfo* pCreateDbSql, SSqlCmd* pCmd) { // time precision for this db: million second pMsg->precision = TSDB_TIME_PRECISION_MILLI; } else if (strncmp(pToken->z, TSDB_TIME_PRECISION_MICRO_STR, pToken->n) == 0 && - strlen(TSDB_TIME_PRECISION_MICRO_STR) == pToken->n) { + strlen(TSDB_TIME_PRECISION_MICRO_STR) == pToken->n) { pMsg->precision = TSDB_TIME_PRECISION_MICRO; } else { setErrMsg(pCmd, msg1); diff --git a/src/inc/tutil.h b/src/inc/tutil.h index cd355717ba481b2ad7bb143a0b5a86c55142b3c2..41057d810d3f985e4ca161f6d9739e0c0bf947fb 100644 --- a/src/inc/tutil.h +++ b/src/inc/tutil.h @@ -242,6 +242,8 @@ int64_t __sync_add_and_fetch_64(int64_t *ptr, int64_t val); #define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap #define __sync_add_and_fetch_64 __sync_add_and_fetch #define __sync_add_and_fetch_32 __sync_add_and_fetch +ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size); +ssize_t twrite(int fd, void *buf, size_t n); #endif #ifdef __cplusplus diff --git a/src/kit/taosdump/taosdump.c b/src/kit/taosdump/taosdump.c index 5387beb1150dbb8f28bc46c437cab584e35c4136..a7874b7e5ad024cfd9f5dbdf6d8fadd4bc327117 100644 --- a/src/kit/taosdump/taosdump.c +++ b/src/kit/taosdump/taosdump.c @@ -626,7 +626,7 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp) { strncpy(tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX], fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes); strncpy(tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX], fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes); - write(fd, &tableRecord, sizeof(STableRecord)); + twrite(fd, &tableRecord, sizeof(STableRecord)); } taos_free_result(result); @@ -831,7 +831,7 @@ int32_t taosDumpMetric(char *metric, struct arguments *arguments, FILE *fp) { memset(&tableRecord, 0, sizeof(STableRecord)); strncpy(tableRecord.name, (char *)row[0], fields[0].bytes); strcpy(tableRecord.metric, metric); - write(fd, &tableRecord, sizeof(STableRecord)); + twrite(fd, &tableRecord, sizeof(STableRecord)); } taos_free_result(result); diff --git a/src/os/linux/inc/os.h b/src/os/linux/inc/os.h index cdfdfed36bcdee1af21fe79339015b8e19dd2614..f1b62d147c323f81006f68e1309dbb02b7af15f2 100644 --- a/src/os/linux/inc/os.h +++ b/src/os/linux/inc/os.h @@ -43,6 +43,7 @@ #include #include #include +#include bool taosCheckPthreadValid(pthread_t thread); diff --git a/src/os/linux/src/os.c b/src/os/linux/src/os.c index 785b1dd16e41618a1b9816ab40c18fa6c39bf5ea..e76f15e9ff4b3a3fae1379e321c71429e46e2762 100644 --- a/src/os/linux/src/os.c +++ b/src/os/linux/src/os.c @@ -251,3 +251,46 @@ int taosInitTimer(void *(*callback)(void *), int ms) { return 0; } + +ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size) { + size_t leftbytes = size; + ssize_t sentbytes; + + while (leftbytes > 0) { + // TODO : Think to check if file is larger than 1GB + if (leftbytes > 1000000000) leftbytes = 1000000000; + sentbytes = sendfile(dfd, sfd, offset, leftbytes); + if (sentbytes == -1) { + if (errno == EINTR) { + continue; + } + else { + return -1; + } + } + + leftbytes -= sentbytes; + } + + return size; +} + +ssize_t twrite(int fd, void *buf, size_t n) { + size_t nleft, nwritten; + + nleft = n; + + while (nleft > 0) { + nwritten = write(fd, buf, nleft); + if (nwritten < 0) { + if (errno == EINTR) { + continue; + } + return -1; + } + nleft -= nwritten; + buf += nwritten; + } + + return n; +} \ No newline at end of file diff --git a/src/sdb/src/sdbEngine.c b/src/sdb/src/sdbEngine.c index f92fc663c1f753a03b17d57809504219f5454b70..47d04b1168580bf1e79a4ba32917868e79691a8f 100644 --- a/src/sdb/src/sdbEngine.c +++ b/src/sdb/src/sdbEngine.c @@ -62,7 +62,7 @@ void sdbFinishCommit(void *handle) { off_t offset = lseek(pTable->fd, 0, SEEK_END); assert(offset == pTable->size); - write(pTable->fd, &sdbEcommit, sizeof(sdbEcommit)); + twrite(pTable->fd, &sdbEcommit, sizeof(sdbEcommit)); pTable->size += sizeof(sdbEcommit); } @@ -112,7 +112,7 @@ int sdbOpenSdbFile(SSdbTable *pTable) { tclose(pTable->fd); return -1; } - write(pTable->fd, &(pTable->header), size); + twrite(pTable->fd, &(pTable->header), size); pTable->size += size; sdbFinishCommit(pTable); } else { @@ -401,7 +401,7 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) { /* Update the disk content */ /* write(pTable->fd, &action, sizeof(action)); */ /* pTable->size += sizeof(action); */ - write(pTable->fd, rowHead, real_size); + twrite(pTable->fd, rowHead, real_size); pTable->size += real_size; sdbFinishCommit(pTable); @@ -497,7 +497,7 @@ int sdbDeleteRow(void *handle, void *row) { } /* write(pTable->fd, &action, sizeof(action)); */ /* pTable->size += sizeof(action); */ - write(pTable->fd, rowHead, total_size); + twrite(pTable->fd, rowHead, total_size); pTable->size += total_size; sdbFinishCommit(pTable); @@ -592,7 +592,7 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) { } /* write(pTable->fd, &action, sizeof(action)); */ /* pTable->size += sizeof(action); */ - write(pTable->fd, rowHead, real_size); + twrite(pTable->fd, rowHead, real_size); pMeta->id = pTable->id; pMeta->offset = pTable->size; @@ -675,7 +675,7 @@ int sdbBatchUpdateRow(void *handle, void *row, int rowSize) { taosCalcChecksumAppend(0, (uint8_t *)rowHead, sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM)); pMeta->rowSize = rowHead->rowSize; lseek(pTable->fd, pTable->size, SEEK_SET); - write(pTable->fd, rowHead, sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM)); + twrite(pTable->fd, rowHead, sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM)); pTable->size += (sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM)); sdbAddIntoUpdateList(pTable, SDB_TYPE_UPDATE, last_row); @@ -855,9 +855,9 @@ void sdbSaveSnapShot(void *handle) { memset(rowHead, 0, size); // Write the header - write(fd, &(pTable->header), sizeof(SSdbHeader)); + twrite(fd, &(pTable->header), sizeof(SSdbHeader)); size += sizeof(SSdbHeader); - write(fd, &sdbEcommit, sizeof(sdbEcommit)); + twrite(fd, &sdbEcommit, sizeof(sdbEcommit)); size += sizeof(sdbEcommit); while (1) { @@ -876,9 +876,9 @@ void sdbSaveSnapShot(void *handle) { /* write(fd, &action, sizeof(action)); */ /* size += sizeof(action); */ - write(fd, rowHead, real_size); + twrite(fd, rowHead, real_size); size += real_size; - write(fd, &sdbEcommit, sizeof(sdbEcommit)); + twrite(fd, &sdbEcommit, sizeof(sdbEcommit)); size += sizeof(sdbEcommit); numOfRows++; } diff --git a/src/system/src/vnodeFile.c b/src/system/src/vnodeFile.c index 98bc482d3a51fcbf31a437dd77f3995dede05ab5..69957a0b58f9778b51f6657ece94d2b459a1842f 100644 --- a/src/system/src/vnodeFile.c +++ b/src/system/src/vnodeFile.c @@ -17,7 +17,6 @@ #include #include #include -#include #include #include #include @@ -65,26 +64,7 @@ int vnodeRecoverCompHeader(int vnode, int fileId); int vnodeRecoverHeadFile(int vnode, int fileId); int vnodeRecoverDataFile(int vnode, int fileId); int vnodeForwardStartPosition(SQuery *pQuery, SCompBlock *pBlock, int32_t slotIdx, SVnodeObj *pVnode, SMeterObj *pObj); - -int64_t tsendfile(int dfd, int sfd, int64_t bytes) { - int64_t leftbytes = bytes; - off_t offset = 0; - int64_t sentbytes; - - while (leftbytes > 0) { - sentbytes = (leftbytes > 1000000000) ? 1000000000 : leftbytes; - sentbytes = sendfile(dfd, sfd, &offset, sentbytes); - if (sentbytes < 0) { - dError("send file failed,reason:%s", strerror(errno)); - return -1; - } - - leftbytes -= sentbytes; - // dTrace("sentbytes:%ld leftbytes:%ld", sentbytes, leftbytes); - } - - return bytes; -} +int vnodeCheckNewHeaderFile(int fd, SVnodeObj *pVnode); void vnodeGetHeadDataLname(char *headName, char *dataName, char *lastName, int vnode, int fileId) { if (headName != NULL) sprintf(headName, "%s/vnode%d/db/v%df%d.head", tsDirectory, vnode, vnode, fileId); @@ -172,7 +152,7 @@ int vnodeCreateEmptyCompFile(int vnode, int fileId) { taosCalcChecksumAppend(0, (uint8_t *)temp, size); lseek(tfd, TSDB_FILE_HEADER_LEN, SEEK_SET); - write(tfd, temp, size); + twrite(tfd, temp, size); free(temp); close(tfd); @@ -345,7 +325,7 @@ int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) { char *temp = malloc(size); memset(temp, 0, size); taosCalcChecksumAppend(0, (uint8_t *)temp, size); - write(pVnode->nfd, temp, size); + twrite(pVnode->nfd, temp, size); free(temp); pVnode->dfSize = lseek(pVnode->dfd, 0, SEEK_END); @@ -409,6 +389,11 @@ void vnodeCloseCommitFiles(SVnodeObj *pVnode) { int fileId; int ret; + // Check new if new header file is correct +#ifdef _CHECK_HEADER_FILE_ + assert(vnodeCheckNewHeaderFile(pVnode->nfd, pVnode) == 0); +#endif + close(pVnode->nfd); pVnode->nfd = 0; @@ -673,7 +658,7 @@ _again: pCompBlock->last = 0; pCompBlock->offset = lseek(pVnode->dfd, 0, SEEK_END); lseek(pVnode->lfd, pMeter->lastBlock.offset, SEEK_SET); - sendfile(pVnode->dfd, pVnode->lfd, NULL, pMeter->lastBlock.len); + tsendfile(pVnode->dfd, pVnode->lfd, NULL, pMeter->lastBlock.len); pVnode->dfSize = pCompBlock->offset + pMeter->lastBlock.len; headLen += sizeof(SCompBlock); @@ -779,7 +764,7 @@ _again: vnodeUpdateHeadFileHeader(pVnode->nfd, &headInfo); lseek(pVnode->nfd, TSDB_FILE_HEADER_LEN, SEEK_SET); taosCalcChecksumAppend(0, (uint8_t *)tmem, tmsize); - if (write(pVnode->nfd, tmem, tmsize) <= 0) { + if (twrite(pVnode->nfd, tmem, tmsize) <= 0) { dError("vid:%d sid:%d id:%s, failed to write:%s, error:%s", vnode, sid, pObj->meterId, pVnode->nfn, strerror(errno)); goto _over; @@ -802,7 +787,7 @@ _again: compInfo.delimiter = TSDB_VNODE_DELIMITER; taosCalcChecksumAppend(0, (uint8_t *)(&compInfo), sizeof(SCompInfo)); lseek(pVnode->nfd, pMeter->compInfoOffset, SEEK_SET); - if (write(pVnode->nfd, &compInfo, sizeof(compInfo)) <= 0) { + if (twrite(pVnode->nfd, &compInfo, sizeof(compInfo)) <= 0) { dError("vid:%d sid:%d id:%s, failed to write:%s, reason:%s", vnode, sid, pObj->meterId, pVnode->nfn, strerror(errno)); goto _over; @@ -815,10 +800,10 @@ _again: if (pMeter->changed) { int compBlockLen = pMeter->oldNumOfBlocks * sizeof(SCompBlock); read(pVnode->hfd, pOldCompBlocks, compBlockLen); - write(pVnode->nfd, pOldCompBlocks, compBlockLen); + twrite(pVnode->nfd, pOldCompBlocks, compBlockLen); chksum = taosCalcChecksum(0, pOldCompBlocks, compBlockLen); } else { - sendfile(pVnode->nfd, pVnode->hfd, NULL, pMeter->oldNumOfBlocks * sizeof(SCompBlock)); + tsendfile(pVnode->nfd, pVnode->hfd, NULL, pMeter->oldNumOfBlocks * sizeof(SCompBlock)); read(pVnode->hfd, &chksum, sizeof(TSCKSUM)); } } @@ -826,13 +811,13 @@ _again: if (pMeter->newNumOfBlocks) { chksum = taosCalcChecksum(chksum, (uint8_t *)(hmem + pMeter->tempHeadOffset), pMeter->newNumOfBlocks * sizeof(SCompBlock)); - if (write(pVnode->nfd, hmem + pMeter->tempHeadOffset, pMeter->newNumOfBlocks * sizeof(SCompBlock)) <= 0) { + if (twrite(pVnode->nfd, hmem + pMeter->tempHeadOffset, pMeter->newNumOfBlocks * sizeof(SCompBlock)) <= 0) { dError("vid:%d sid:%d id:%s, failed to write:%s, reason:%s", vnode, sid, pObj->meterId, pVnode->nfn, strerror(errno)); goto _over; } } - write(pVnode->nfd, &chksum, sizeof(TSCKSUM)); + twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM)); } tfree(pOldCompBlocks); @@ -1221,7 +1206,7 @@ int vnodeWriteBlockToFile(SMeterObj *pObj, SCompBlock *pCompBlock, SData *data[] // Write SField part taosCalcChecksumAppend(0, (uint8_t *)fields, size); - wlen = write(dfd, fields, size); + wlen = twrite(dfd, fields, size); if (wlen <= 0) { tfree(fields); dError("vid:%d sid:%d id:%s, failed to write block, wlen:%d reason:%s", pObj->vnode, pObj->sid, pObj->meterId, wlen, @@ -1236,9 +1221,9 @@ int vnodeWriteBlockToFile(SMeterObj *pObj, SCompBlock *pCompBlock, SData *data[] // Write data part for (int i = 0; i < pObj->numOfColumns; ++i) { if (pCfg->compression) { - wlen = write(dfd, cdata[i]->data, cdata[i]->len + sizeof(TSCKSUM)); + wlen = twrite(dfd, cdata[i]->data, cdata[i]->len + sizeof(TSCKSUM)); } else { - wlen = write(dfd, data[i]->data, data[i]->len + sizeof(TSCKSUM)); + wlen = twrite(dfd, data[i]->data, data[i]->len + sizeof(TSCKSUM)); } if (wlen <= 0) { @@ -1814,3 +1799,71 @@ int vnodeRecoverDataFile(int vnode, int fileId) { assert(0); return 0; } + +int vnodeCheckNewHeaderFile(int fd, SVnodeObj *pVnode) { + SCompHeader *pHeader = NULL; + SCompBlock *pBlocks = NULL; + int blockSize = 0; + SCompInfo compInfo; + int tmsize = 0; + + tmsize = sizeof(SCompHeader) * pVnode->cfg.maxSessions + sizeof(TSCKSUM); + + pHeader = (SCompHeader *)malloc(tmsize); + if (pHeader == NULL) return 0; + + lseek(fd, TSDB_FILE_HEADER_LEN, SEEK_SET); + if (read(fd, (void *)pHeader, tmsize) != tmsize) { + goto _broken_exit; + } + + if (!taosCheckChecksumWhole((uint8_t *)pHeader, tmsize)) { + goto _broken_exit; + } + + for (int sid = 0; sid < pVnode->cfg.maxSessions; sid++) { + if (pVnode->meterList == NULL) goto _correct_exit; + if (pVnode->meterList[sid] == NULL || pHeader[sid].compInfoOffset == 0) continue; + lseek(fd, pHeader[sid].compInfoOffset, SEEK_SET); + + if (read(fd, (void *)(&compInfo), sizeof(SCompInfo)) != sizeof(SCompInfo)) { + goto _broken_exit; + } + + if (!taosCheckChecksumWhole((uint8_t *)(&compInfo), sizeof(SCompInfo))) { + goto _broken_exit; + } + + if (compInfo.uid != ((SMeterObj *)pVnode->meterList[sid])->uid) continue; + + int expectedSize = sizeof(SCompBlock) * compInfo.numOfBlocks + sizeof(TSCKSUM); + if (blockSize < expectedSize) { + pBlocks = (SCompBlock *)realloc(pBlocks, expectedSize); + if (pBlocks == NULL) { + tfree(pHeader); + return 0; + } + + blockSize = expectedSize; + } + + if (read(fd, (void *)pBlocks, expectedSize) != expectedSize) { + goto _broken_exit; + } + if (!taosCheckChecksumWhole((uint8_t *)pBlocks, expectedSize)) { + goto _broken_exit; + } + } + +_correct_exit: + dTrace("vid: %d new header file %s is correct", pVnode->vnode, pVnode->nfn); + tfree(pBlocks); + tfree(pHeader); + return 0; + +_broken_exit: + dError("vid: %d new header file %s is broken", pVnode->vnode, pVnode->nfn); + tfree(pBlocks); + tfree(pHeader); + return -1; +} \ No newline at end of file diff --git a/src/system/src/vnodeImport.c b/src/system/src/vnodeImport.c index 7f6d1bcc2ed37b9e7a8d59333caa3be6799a48a7..0e868c5b2cd2e19fe40c529764ee1e0477f43618 100644 --- a/src/system/src/vnodeImport.c +++ b/src/system/src/vnodeImport.c @@ -14,7 +14,6 @@ */ #include -#include #include #include #include @@ -97,12 +96,12 @@ int vnodeCloseFileForImport(SMeterObj *pObj, SHeadInfo *pHinfo) { if (pHinfo->newBlocks == 0 || pHinfo->compInfoOffset == 0) return 0; - if (pHinfo->oldNumOfBlocks == 0) write(pVnode->nfd, &chksum, sizeof(TSCKSUM)); + if (pHinfo->oldNumOfBlocks == 0) twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM)); int leftSize = pHinfo->hfdSize - pHinfo->leftOffset; if (leftSize > 0) { lseek(pVnode->hfd, pHinfo->leftOffset, SEEK_SET); - sendfile(pVnode->nfd, pVnode->hfd, NULL, leftSize); + tsendfile(pVnode->nfd, pVnode->hfd, NULL, leftSize); } pHinfo->compInfo.numOfBlocks += pHinfo->newBlocks; @@ -117,7 +116,7 @@ int vnodeCloseFileForImport(SMeterObj *pObj, SHeadInfo *pHinfo) { lseek(pVnode->nfd, TSDB_FILE_HEADER_LEN, SEEK_SET); int tmsize = sizeof(SCompHeader) * pCfg->maxSessions + sizeof(TSCKSUM); taosCalcChecksumAppend(0, (uint8_t *)pHinfo->headList, tmsize); - write(pVnode->nfd, pHinfo->headList, tmsize); + twrite(pVnode->nfd, pHinfo->headList, tmsize); int size = pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock); char *buffer = malloc(size); @@ -131,11 +130,11 @@ int vnodeCloseFileForImport(SMeterObj *pObj, SHeadInfo *pHinfo) { taosCalcChecksumAppend(0, (uint8_t *)(&pHinfo->compInfo), sizeof(SCompInfo)); lseek(pVnode->nfd, pHinfo->compInfoOffset, SEEK_SET); - write(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo)); + twrite(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo)); chksum = taosCalcChecksum(0, (uint8_t *)buffer, size); lseek(pVnode->nfd, pHinfo->compInfoOffset + sizeof(SCompInfo) + size, SEEK_SET); - write(pVnode->nfd, &chksum, sizeof(TSCKSUM)); + twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM)); free(buffer); vnodeCloseCommitFiles(pVnode); @@ -161,11 +160,11 @@ int vnodeProcessLastBlock(SImportInfo *pImport, SHeadInfo *pHinfo, SData *data[] if (lastBlock.sversion != pObj->sversion) { lseek(pVnode->lfd, lastBlock.offset, SEEK_SET); lastBlock.offset = lseek(pVnode->dfd, 0, SEEK_END); - sendfile(pVnode->dfd, pVnode->lfd, NULL, lastBlock.len); + tsendfile(pVnode->dfd, pVnode->lfd, NULL, lastBlock.len); lastBlock.last = 0; lseek(pVnode->hfd, offset, SEEK_SET); - write(pVnode->hfd, &lastBlock, sizeof(SCompBlock)); + twrite(pVnode->hfd, &lastBlock, sizeof(SCompBlock)); } else { vnodeReadLastBlockToMem(pObj, &lastBlock, data); pHinfo->compInfo.numOfBlocks--; @@ -221,8 +220,8 @@ int vnodeOpenFileForImport(SImportInfo *pImport, char *payload, SHeadInfo *pHinf pHinfo->oldNumOfBlocks = pHinfo->compInfo.numOfBlocks; lseek(pVnode->hfd, 0, SEEK_SET); lseek(pVnode->nfd, 0, SEEK_SET); - sendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfoOffset); - write(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo)); + tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfoOffset); + twrite(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo)); if (pHinfo->headList[pObj->sid].compInfoOffset > 0) lseek(pVnode->hfd, sizeof(SCompInfo), SEEK_CUR); if (pVnode->commitFileId < pImport->fileId) { @@ -234,7 +233,7 @@ int vnodeOpenFileForImport(SImportInfo *pImport, char *payload, SHeadInfo *pHinf // copy all existing compBlockInfo lseek(pVnode->hfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET); if (pHinfo->compInfo.numOfBlocks > 0) - sendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock)); + tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock)); } else if (pVnode->commitFileId == pImport->fileId) { int slots = pImport->pos ? pImport->slot + 1 : pImport->slot; @@ -251,7 +250,7 @@ int vnodeOpenFileForImport(SImportInfo *pImport, char *payload, SHeadInfo *pHinf if (pImport->slot > 0) { lseek(pVnode->hfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET); - sendfile(pVnode->nfd, pVnode->hfd, NULL, pImport->slot * sizeof(SCompBlock)); + tsendfile(pVnode->nfd, pVnode->hfd, NULL, pImport->slot * sizeof(SCompBlock)); } if (pImport->slot < pHinfo->compInfo.numOfBlocks) @@ -448,7 +447,7 @@ int vnodeImportToFile(SImportInfo *pImport) { compBlock.last = headInfo.last; vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowsToWrite); - write(pVnode->nfd, &compBlock, sizeof(SCompBlock)); + twrite(pVnode->nfd, &compBlock, sizeof(SCompBlock)); rowsToWrite = 0; headInfo.newBlocks++; diff --git a/src/system/src/vnodeUtil.c b/src/system/src/vnodeUtil.c index f9d1966378df2934461329d515d9a6292258dbf2..f8d50b6b81681496ca1daa93bd3e89520ad832b5 100644 --- a/src/system/src/vnodeUtil.c +++ b/src/system/src/vnodeUtil.c @@ -64,16 +64,16 @@ void vnodeCreateFileHeaderFd(int fd) { sprintf(temp + sizeof(int16_t), "tsdb version: %s\n", version); /* *((int16_t *)(temp + TSDB_FILE_HEADER_LEN/8)) = vnodeFileVersion; */ lseek(fd, 0, SEEK_SET); - write(fd, temp, lineLen); + twrite(fd, temp, lineLen); // second line memset(temp, 0, lineLen); - write(fd, temp, lineLen); + twrite(fd, temp, lineLen); // the third/forth line is the dynamic info memset(temp, 0, lineLen); - write(fd, temp, lineLen); - write(fd, temp, lineLen); + twrite(fd, temp, lineLen); + twrite(fd, temp, lineLen); } void vnodeGetHeadFileHeaderInfo(int fd, SVnodeHeadInfo* pHeadInfo) { @@ -83,7 +83,7 @@ void vnodeGetHeadFileHeaderInfo(int fd, SVnodeHeadInfo* pHeadInfo) { void vnodeUpdateHeadFileHeader(int fd, SVnodeHeadInfo* pHeadInfo) { lseek(fd, TSDB_FILE_HEADER_LEN / 4, SEEK_SET); - write(fd, pHeadInfo, sizeof(SVnodeHeadInfo)); + twrite(fd, pHeadInfo, sizeof(SVnodeHeadInfo)); } void vnodeCreateFileHeader(FILE* fp) { diff --git a/src/util/src/tlog.c b/src/util/src/tlog.c index 87447b75d0395187a97852f259256da1cd62ab63..b27161cc01977cd40575430766420d104d65b627 100644 --- a/src/util/src/tlog.c +++ b/src/util/src/tlog.c @@ -294,11 +294,11 @@ int taosOpenLogFileWithMaxLines(char *fn, int maxLines, int maxFileNum) { lseek(logHandle->fd, 0, SEEK_END); sprintf(name, "==================================================\n"); - write(logHandle->fd, name, (uint32_t)strlen(name)); + twrite(logHandle->fd, name, (uint32_t)strlen(name)); sprintf(name, " new log file \n"); - write(logHandle->fd, name, (uint32_t)strlen(name)); + twrite(logHandle->fd, name, (uint32_t)strlen(name)); sprintf(name, "==================================================\n"); - write(logHandle->fd, name, (uint32_t)strlen(name)); + twrite(logHandle->fd, name, (uint32_t)strlen(name)); return 0; } @@ -355,7 +355,7 @@ void tprintf(const char *const flags, int dflag, const char *const format, ...) if (tsAsyncLog) { taosPushLogBuffer(logHandle, buffer, len); } else { - write(logHandle->fd, buffer, len); + twrite(logHandle->fd, buffer, len); } if (taosLogMaxLines > 0) { @@ -365,7 +365,7 @@ void tprintf(const char *const flags, int dflag, const char *const format, ...) } } - if (dflag & DEBUG_SCREEN) write(1, buffer, (unsigned int)len); + if (dflag & DEBUG_SCREEN) twrite(1, buffer, (unsigned int)len); } void taosDumpData(unsigned char *msg, int len) { @@ -378,7 +378,7 @@ void taosDumpData(unsigned char *msg, int len) { pos += 3; if (c >= 16) { temp[pos++] = '\n'; - write(logHandle->fd, temp, (unsigned int)pos); + twrite(logHandle->fd, temp, (unsigned int)pos); c = 0; pos = 0; } @@ -386,7 +386,7 @@ void taosDumpData(unsigned char *msg, int len) { temp[pos++] = '\n'; - write(logHandle->fd, temp, (unsigned int)pos); + twrite(logHandle->fd, temp, (unsigned int)pos); return; } @@ -430,7 +430,7 @@ void taosPrintLongString(const char *const flags, int dflag, const char *const f } } - if (dflag & DEBUG_SCREEN) write(1, buffer, (unsigned int)len); + if (dflag & DEBUG_SCREEN) twrite(1, buffer, (unsigned int)len); } void taosCloseLog() { taosCloseLogByFd(logHandle->fd); } @@ -559,7 +559,7 @@ void *taosAsyncOutputLog(void *param) { while (1) { log_size = taosPollLogBuffer(tLogBuff, tempBuffer, TSDB_DEFAULT_LOG_BUF_UNIT); if (log_size) { - write(tLogBuff->fd, tempBuffer, log_size); + twrite(tLogBuff->fd, tempBuffer, log_size); LOG_BUF_START(tLogBuff) = (LOG_BUF_START(tLogBuff) + log_size) % LOG_BUF_SIZE(tLogBuff); } else { break; diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index 019690bf37b0bc5dd526f57fe9f8c891980ad93e..710d56603f59b9065cf194516446d2284196a73d 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -19,6 +19,7 @@ #include #include #include +#include #include "os.h"