diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 20dbc2114d7e6145c2d84852f70c527e5e2aff20..f11378f84ce216af5c6e8a5f8e73bc4dc46290c0 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -111,7 +111,12 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType)); terrno = TSDB_CODE_INVALID_MSG_LEN; goto _OVER; - } + } /* else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) && + (!IsReq(pRpc)) && (pRpc->pCont == NULL)) { + dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code)); + terrno = pRpc->code; + goto _OVER; + }*/ if (pHandle->defaultNtype == NODE_END) { dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType)); diff --git a/source/libs/index/inc/indexFstFile.h b/source/libs/index/inc/indexFstFile.h index 0ddffe7df0c0857b3c5c9ce2aaae856d708d2283..d15141f79af3112a55fa5fa09be1b8caa39f602e 100644 --- a/source/libs/index/inc/indexFstFile.h +++ b/source/libs/index/inc/indexFstFile.h @@ -44,6 +44,11 @@ typedef struct IFileCtx { bool readOnly; char buf[256]; int64_t size; + + char* wBuf; + int32_t wBufOffset; + int32_t wBufCap; + #ifdef USE_MMAP char* ptr; #endif diff --git a/source/libs/index/src/indexFstFile.c b/source/libs/index/src/indexFstFile.c index 5538584754a92f8e11ef0deecbfb28aee12ef8a9..4620af8694e538a3525adaed11a1277ec14cd0a9 100644 --- a/source/libs/index/src/indexFstFile.c +++ b/source/libs/index/src/indexFstFile.c @@ -38,14 +38,41 @@ static FORCE_INLINE void idxGenLRUKey(char* buf, const char* path, int32_t block return; } static FORCE_INLINE int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) { + int tlen = len; if (ctx->type == TFILE) { - int nwr = taosWriteFile(ctx->file.pFile, buf, len); - assert(nwr == len); + int32_t cap = ctx->file.wBufCap; + if (len + ctx->file.wBufOffset >= cap) { + int32_t nw = cap - ctx->file.wBufOffset; + memcpy(ctx->file.wBuf + ctx->file.wBufOffset, buf, nw); + taosWriteFile(ctx->file.pFile, ctx->file.wBuf, cap); + + memset(ctx->file.wBuf, 0, cap); + ctx->file.wBufOffset = 0; + + len -= nw; + buf += nw; + + nw = (len / cap) * cap; + if (nw != 0) { + taosWriteFile(ctx->file.pFile, buf, nw); + } + + len -= nw; + buf += nw; + if (len != 0) { + memcpy(ctx->file.wBuf, buf, len); + } + ctx->file.wBufOffset += len; + } else { + memcpy(ctx->file.wBuf + ctx->file.wBufOffset, buf, len); + ctx->file.wBufOffset += len; + } + } else { memcpy(ctx->mem.buf + ctx->offset, buf, len); } - ctx->offset += len; - return len; + ctx->offset += tlen; + return tlen; } static FORCE_INLINE int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) { int nRead = 0; @@ -127,14 +154,22 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of } static FORCE_INLINE int idxFileCtxGetSize(IFileCtx* ctx) { if (ctx->type == TFILE) { - int64_t file_size = 0; - taosStatFile(ctx->file.buf, &file_size, NULL); - return (int)file_size; + if (ctx->file.readOnly == false) { + return ctx->offset; + } else { + int64_t file_size = 0; + taosStatFile(ctx->file.buf, &file_size, NULL); + return (int)file_size; + } } return 0; } static FORCE_INLINE int idxFileCtxDoFlush(IFileCtx* ctx) { if (ctx->type == TFILE) { + if (ctx->file.wBufOffset > 0) { + int32_t nw = taosWriteFile(ctx->file.pFile, ctx->file.wBuf, ctx->file.wBufOffset); + ctx->file.wBufOffset = 0; + } taosFsyncFile(ctx->file.pFile); } else { // do nothing @@ -157,10 +192,15 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); taosFtruncateFile(ctx->file.pFile, 0); taosStatFile(path, &ctx->file.size, NULL); + + ctx->file.wBufOffset = 0; + ctx->file.wBufCap = kBlockSize * 4; + ctx->file.wBuf = taosMemoryCalloc(1, ctx->file.wBufCap); } else { ctx->file.pFile = taosOpenFile(path, TD_FILE_READ); - taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL); + ctx->file.wBufOffset = 0; + #ifdef USE_MMAP ctx->file.ptr = (char*)tfMmapReadOnly(ctx->file.pFile, ctx->file.size); #endif @@ -195,17 +235,18 @@ void idxFileCtxDestroy(IFileCtx* ctx, bool remove) { if (ctx->type == TMEMORY) { taosMemoryFree(ctx->mem.buf); } else { + if (ctx->file.wBufOffset > 0) { + int32_t nw = taosWriteFile(ctx->file.pFile, ctx->file.wBuf, ctx->file.wBufOffset); + ctx->file.wBufOffset = 0; + } ctx->flush(ctx); + taosMemoryFreeClear(ctx->file.wBuf); taosCloseFile(&ctx->file.pFile); if (ctx->file.readOnly) { #ifdef USE_MMAP munmap(ctx->file.ptr, ctx->file.size); #endif } - if (ctx->file.readOnly == false) { - int64_t file_size = 0; - taosStatFile(ctx->file.buf, &file_size, NULL); - } if (remove) { unlink(ctx->file.buf); } diff --git a/source/os/src/osEnv.c b/source/os/src/osEnv.c index c4627ffb7559a045ffae9a32e9a01a87a0e32de9..7f0e6d1dee22972cc633c78421a3ee1e6b297e36 100644 --- a/source/os/src/osEnv.c +++ b/source/os/src/osEnv.c @@ -37,11 +37,11 @@ float tsNumOfCores = 0; int64_t tsTotalMemoryKB = 0; char *tsProcPath = NULL; -char tsSIMDBuiltins = 0; -char tsSSE42Enable = 0; -char tsAVXEnable = 0; -char tsAVX2Enable = 0; -char tsFMAEnable = 0; +char tsSIMDBuiltins = 0; +char tsSSE42Enable = 0; +char tsAVXEnable = 0; +char tsAVX2Enable = 0; +char tsFMAEnable = 0; void osDefaultInit() { taosSeedRand(taosSafeRand()); diff --git a/source/os/src/osString.c b/source/os/src/osString.c index f03778de2f8cf9020e9540c581c30f9ca44947b6..5419da1c0ce49fe8e61cbf030ad540918835f465 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -116,39 +116,36 @@ TdUcs4 *tasoUcs4Copy(TdUcs4 *target_ucs4, TdUcs4 *source_ucs4, int32_t len_ucs4) return memcpy(target_ucs4, source_ucs4, len_ucs4 * sizeof(TdUcs4)); } -int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs) { -#ifdef DISALLOW_NCHAR_WITHOUT_ICONV - printf("Nchar cannot be read and written without iconv, please install iconv library and recompile TDengine.\n"); - return -1; -#else - iconv_t cd = iconv_open(tsCharset, DEFAULT_UNICODE_ENCODEC); - size_t ucs4_input_len = ucs4_max_len; - size_t outLen = ucs4_max_len; - if (iconv(cd, (char **)&ucs4, &ucs4_input_len, &mbs, &outLen) == -1) { - iconv_close(cd); - return -1; - } - - iconv_close(cd); - return (int32_t)(ucs4_max_len - outLen); -#endif -} - typedef struct { iconv_t conv; int8_t inUse; } SConv; -SConv *gConv = NULL; -int32_t convUsed = 0; -int32_t gConvMaxNum = 0; +typedef enum { M2C = 0, C2M } ConvType; + +// 0: Mbs --> Ucs4 +// 1: Ucs4--> Mbs +SConv *gConv[2] = {NULL, NULL}; +int32_t convUsed[2] = {0, 0}; +int32_t gConvMaxNum[2] = {0, 0}; int32_t taosConvInit(void) { - gConvMaxNum = 512; - gConv = taosMemoryCalloc(gConvMaxNum, sizeof(SConv)); - for (int32_t i = 0; i < gConvMaxNum; ++i) { - gConv[i].conv = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); - if ((iconv_t)-1 == gConv[i].conv || (iconv_t)0 == gConv[i].conv) { + int8_t M2C = 0; + gConvMaxNum[M2C] = 512; + gConvMaxNum[1 - M2C] = 512; + + gConv[M2C] = taosMemoryCalloc(gConvMaxNum[M2C], sizeof(SConv)); + gConv[1 - M2C] = taosMemoryCalloc(gConvMaxNum[1 - M2C], sizeof(SConv)); + + for (int32_t i = 0; i < gConvMaxNum[M2C]; ++i) { + gConv[M2C][i].conv = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); + if ((iconv_t)-1 == gConv[M2C][i].conv || (iconv_t)0 == gConv[M2C][i].conv) { + return -1; + } + } + for (int32_t i = 0; i < gConvMaxNum[1 - M2C]; ++i) { + gConv[1 - M2C][i].conv = iconv_open(tsCharset, DEFAULT_UNICODE_ENCODEC); + if ((iconv_t)-1 == gConv[1 - M2C][i].conv || (iconv_t)0 == gConv[1 - M2C][i].conv) { return -1; } } @@ -157,23 +154,33 @@ int32_t taosConvInit(void) { } void taosConvDestroy() { - for (int32_t i = 0; i < gConvMaxNum; ++i) { - iconv_close(gConv[i].conv); + int8_t M2C = 0; + for (int32_t i = 0; i < gConvMaxNum[M2C]; ++i) { + iconv_close(gConv[M2C][i].conv); + } + for (int32_t i = 0; i < gConvMaxNum[1 - M2C]; ++i) { + iconv_close(gConv[1 - M2C][i].conv); } - taosMemoryFreeClear(gConv); - gConvMaxNum = -1; + taosMemoryFreeClear(gConv[M2C]); + taosMemoryFreeClear(gConv[1 - M2C]); + gConvMaxNum[M2C] = -1; + gConvMaxNum[1 - M2C] = -1; } -iconv_t taosAcquireConv(int32_t *idx) { - if (gConvMaxNum <= 0) { +iconv_t taosAcquireConv(int32_t *idx, ConvType type) { + if (gConvMaxNum[type] <= 0) { *idx = -1; - return iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); + if (type == M2C) { + return iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); + } else { + return iconv_open(tsCharset, DEFAULT_UNICODE_ENCODEC); + } } while (true) { - int32_t used = atomic_add_fetch_32(&convUsed, 1); - if (used > gConvMaxNum) { - used = atomic_sub_fetch_32(&convUsed, 1); + int32_t used = atomic_add_fetch_32(&convUsed[type], 1); + if (used > gConvMaxNum[type]) { + used = atomic_sub_fetch_32(&convUsed[type], 1); sched_yield(); continue; } @@ -181,31 +188,31 @@ iconv_t taosAcquireConv(int32_t *idx) { break; } - int32_t startId = taosGetSelfPthreadId() % gConvMaxNum; + int32_t startId = taosGetSelfPthreadId() % gConvMaxNum[type]; while (true) { - if (gConv[startId].inUse) { - startId = (startId + 1) % gConvMaxNum; + if (gConv[type][startId].inUse) { + startId = (startId + 1) % gConvMaxNum[type]; continue; } - int8_t old = atomic_val_compare_exchange_8(&gConv[startId].inUse, 0, 1); + int8_t old = atomic_val_compare_exchange_8(&gConv[type][startId].inUse, 0, 1); if (0 == old) { break; } } *idx = startId; - return gConv[startId].conv; + return gConv[type][startId].conv; } -void taosReleaseConv(int32_t idx, iconv_t conv) { +void taosReleaseConv(int32_t idx, iconv_t conv, ConvType type) { if (idx < 0) { iconv_close(conv); return; } - atomic_store_8(&gConv[idx].inUse, 0); - atomic_sub_fetch_32(&convUsed, 1); + atomic_store_8(&gConv[type][idx].inUse, 0); + atomic_sub_fetch_32(&convUsed[type], 1); } bool taosMbsToUcs4(const char *mbs, size_t mbsLength, TdUcs4 *ucs4, int32_t ucs4_max_len, int32_t *len) { @@ -216,15 +223,15 @@ bool taosMbsToUcs4(const char *mbs, size_t mbsLength, TdUcs4 *ucs4, int32_t ucs4 memset(ucs4, 0, ucs4_max_len); int32_t idx = -1; - iconv_t conv = taosAcquireConv(&idx); + iconv_t conv = taosAcquireConv(&idx, M2C); size_t ucs4_input_len = mbsLength; size_t outLeft = ucs4_max_len; if (iconv(conv, (char **)&mbs, &ucs4_input_len, (char **)&ucs4, &outLeft) == -1) { - taosReleaseConv(idx, conv); + taosReleaseConv(idx, conv, M2C); return false; } - taosReleaseConv(idx, conv); + taosReleaseConv(idx, conv, M2C); if (len != NULL) { *len = (int32_t)(ucs4_max_len - outLeft); if (*len < 0) { @@ -236,6 +243,24 @@ bool taosMbsToUcs4(const char *mbs, size_t mbsLength, TdUcs4 *ucs4, int32_t ucs4 #endif } +int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs) { +#ifdef DISALLOW_NCHAR_WITHOUT_ICONV + printf("Nchar cannot be read and written without iconv, please install iconv library and recompile TDengine.\n"); + return -1; +#else + + int32_t idx = -1; + iconv_t conv = taosAcquireConv(&idx, C2M); + size_t ucs4_input_len = ucs4_max_len; + size_t outLen = ucs4_max_len; + if (iconv(conv, (char **)&ucs4, &ucs4_input_len, &mbs, &outLen) == -1) { + taosReleaseConv(idx, conv, C2M); + return -1; + } + taosReleaseConv(idx, conv, C2M); + return (int32_t)(ucs4_max_len - outLen); +#endif +} bool taosValidateEncodec(const char *encodec) { #ifdef DISALLOW_NCHAR_WITHOUT_ICONV printf("Nchar cannot be read and written without iconv, please install iconv library and recompile TDengine.\n");