diff --git a/cmake/cmake.options b/cmake/cmake.options index 1a1a5b5d785007692fcc5982558024f4bf05b893..946eb5d2583096dab9faecc2e8b222c5c24377f5 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -18,6 +18,13 @@ IF(${TD_WINDOWS}) ON ) + MESSAGE("build iconv Win32") + option( + BUILD_WITH_ICONV + "If build iconv on Windows" + ON + ) + ENDIF () IF(${TD_LINUX} MATCHES TRUE) diff --git a/cmake/iconv_CMakeLists.txt.in b/cmake/iconv_CMakeLists.txt.in new file mode 100644 index 0000000000000000000000000000000000000000..31dfd829fcbc8a5cc6a4b28752eda76280b3c791 --- /dev/null +++ b/cmake/iconv_CMakeLists.txt.in @@ -0,0 +1,12 @@ + +# iconv +ExternalProject_Add(iconv + GIT_REPOSITORY https://github.com/win-iconv/win-iconv.git + GIT_TAG v0.0.8 + SOURCE_DIR "${CMAKE_CONTRIB_DIR}/iconv" + BINARY_DIR "" + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "" + TEST_COMMAND "" + ) \ No newline at end of file diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index f87f660ab350eb64ea272ef085e8c14ccfe56eae..9cf68b87f9764b4670cf7091c2f7d83bbb347063 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -83,6 +83,11 @@ if(${BUILD_WITH_NURAFT}) cat("${CMAKE_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) endif(${BUILD_WITH_NURAFT}) +# iconv +if(${BUILD_WITH_ICONV}) + cat("${CMAKE_SUPPORT_DIR}/iconv_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) +endif(${BUILD_WITH_ICONV}) + # download dependencies configure_file(${CONTRIB_TMP_FILE} "${CMAKE_CONTRIB_DIR}/deps-download/CMakeLists.txt") execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . @@ -208,14 +213,10 @@ endif(${BUILD_WITH_TRAFT}) # LIBUV if(${BUILD_WITH_UV}) - if (NOT ${CMAKE_SYSTEM_NAME} MATCHES "Windows") - MESSAGE("Windows need set no-sign-compare") - add_compile_options(-Wno-sign-compare) - endif () - if (${CMAKE_SYSTEM_NAME} MATCHES "Windows") - file(READ "libuv/include/uv.h" CONTENTS) - string(REGEX REPLACE "/([\r]*)\nstruct uv_tcp_s {" "/\\1\ntypedef BOOL (PASCAL *LPFN_CONNECTEX) (SOCKET s, const struct sockaddr* name, int namelen, PVOID lpSendBuffer, DWORD dwSendDataLength,LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped);\\1\nstruct uv_tcp_s {" CONTENTS_NEW "${CONTENTS}") - file(WRITE "libuv/include/uv.h" "${CONTENTS_NEW}") + if (${TD_WINDOWS}) + file(READ "libuv/include/uv.h" CONTENTS) + string(REGEX REPLACE "/([\r]*)\nstruct uv_tcp_s {" "/\\1\ntypedef BOOL (PASCAL *LPFN_CONNECTEX) (SOCKET s, const struct sockaddr* name, int namelen, PVOID lpSendBuffer, DWORD dwSendDataLength,LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped);\\1\nstruct uv_tcp_s {" CONTENTS_NEW "${CONTENTS}") + file(WRITE "libuv/include/uv.h" "${CONTENTS_NEW}") endif () add_subdirectory(libuv) endif(${BUILD_WITH_UV}) @@ -249,10 +250,14 @@ endif(${BUILD_WITH_SQLITE}) # pthread if(${BUILD_PTHREAD}) - ADD_DEFINITIONS("-DPTW32_STATIC_LIB") - add_subdirectory(pthread-win32) + add_definitions(-DPTW32_STATIC_LIB) + add_subdirectory(pthread) endif(${BUILD_PTHREAD}) +# iconv +if(${BUILD_WITH_ICONV}) + add_subdirectory(iconv) +endif(${BUILD_WITH_ICONV}) # ================================================================================================ # Build test diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 32271ed15793ed3055e332160b53da30e46b9876..bd4cc52837e0d90f7adf1c7078eae2ccd70bc8bd 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -198,6 +198,11 @@ typedef struct { }; } SMsgHead; +typedef struct { + int32_t workerType; + int32_t streamTaskId; +} SStreamExecMsgHead; + // Submit message for one table typedef struct SSubmitBlk { int64_t uid; // table unique id @@ -1892,9 +1897,9 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) return buf; } typedef struct { - int8_t version; // for compatibility(default 0) - int8_t intervalUnit; // MACRO: TIME_UNIT_XXX - int8_t slidingUnit; // MACRO: TIME_UNIT_XXX + int8_t version; // for compatibility(default 0) + int8_t intervalUnit; // MACRO: TIME_UNIT_XXX + int8_t slidingUnit; // MACRO: TIME_UNIT_XXX char indexName[TSDB_INDEX_NAME_LEN]; char timezone[TD_TIMEZONE_LEN]; // sma data expired if timezone changes. int32_t exprLen; @@ -1902,7 +1907,7 @@ typedef struct { int64_t indexUid; tb_uid_t tableUid; // super/child/common table uid int64_t interval; - int64_t offset; // use unit by precision of DB + int64_t offset; // use unit by precision of DB int64_t sliding; char* expr; // sma expression char* tagsFilter; @@ -2048,27 +2053,19 @@ static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) { buf = taosDecodeFixedI64(buf, &pSma->sliding); if (pSma->exprLen > 0) { - pSma->expr = (char*)calloc(pSma->exprLen, 1); - if (pSma->expr != NULL) { - buf = taosDecodeStringTo(buf, pSma->expr); - } else { + if ((buf = taosDecodeString(buf, &pSma->expr)) == NULL) { tdDestroyTSma(pSma); return NULL; } - } else { pSma->expr = NULL; } if (pSma->tagsFilterLen > 0) { - pSma->tagsFilter = (char*)calloc(pSma->tagsFilterLen, 1); - if (pSma->tagsFilter != NULL) { - buf = taosDecodeStringTo(buf, pSma->tagsFilter); - } else { + if ((buf = taosDecodeString(buf, &pSma->tagsFilter)) == NULL) { tdDestroyTSma(pSma); return NULL; } - } else { pSma->tagsFilter = NULL; } @@ -2311,7 +2308,7 @@ typedef struct { } SStreamTaskDeployRsp; typedef struct { - SMsgHead head; + SStreamExecMsgHead head; // TODO: other info needed by task } SStreamTaskExecReq; diff --git a/include/common/tvariant.h b/include/common/tvariant.h index 995015fe6301b950f2ae415aef791296e6621d28..63f305ab2da00915555dbe63ffbdaefb4d2af73c 100644 --- a/include/common/tvariant.h +++ b/include/common/tvariant.h @@ -31,7 +31,7 @@ typedef struct SVariant { uint64_t u; double d; char *pz; - wchar_t *wpz; + TdUcs4 *ucs4; SArray *arr; // only for 'in' query to hold value list, not value for a field }; } SVariant; diff --git a/include/os/osFile.h b/include/os/osFile.h index 209cecedf8946ae51da4cebdbd98dd73a017aedf..508a52267901994da5bb1d8468b7bf48992da357 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -22,15 +22,6 @@ extern "C" { #include "osSocket.h" -#if defined(WINDOWS) -typedef int32_t FileFd; -typedef int32_t SocketFd; -#else -typedef int32_t FileFd; -typedef int32_t SocketFd; -#endif - -int64_t taosRead(FileFd fd, void *buf, int64_t count); // If the error is in a third-party library, place this header file under the third-party library header file. #ifndef ALLOW_FORBID_FUNC #define open OPEN_FUNC_TAOS_FORBID @@ -42,6 +33,7 @@ int64_t taosRead(FileFd fd, void *buf, int64_t count); #define close CLOSE_FUNC_TAOS_FORBID #define fclose FCLOSE_FUNC_TAOS_FORBID #define fsync FSYNC_FUNC_TAOS_FORBID + #define getline GETLINE_FUNC_TAOS_FORBID // #define fflush FFLUSH_FUNC_TAOS_FORBID #endif @@ -49,15 +41,6 @@ int64_t taosRead(FileFd fd, void *buf, int64_t count); #define PATH_MAX 256 #endif -typedef int32_t FileFd; - -typedef struct TdFile { - pthread_rwlock_t rwlock; - int refId; - FileFd fd; - FILE *fp; -} * TdFilePtr, TdFile; - typedef struct TdFile *TdFilePtr; #define TD_FILE_CTEATE 0x0001 @@ -95,10 +78,6 @@ int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset) int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count); void taosFprintfFile(TdFilePtr pFile, const char *format, ...); -#if defined(WINDOWS) -#define __restrict__ -#endif // WINDOWS - int64_t taosGetLineFile(TdFilePtr pFile, char ** __restrict__ ptrBuf); int32_t taosEOFFile(TdFilePtr pFile); @@ -111,15 +90,7 @@ int32_t taosRemoveFile(const char *path); void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath); -#if defined(_TD_DARWIN_64) -typedef int32_t SocketFd; - -int64_t taosSendFile(SocketFd fdDst, FileFd pFileSrc, int64_t *offset, int64_t size); -int64_t taosFSendFile(FILE *pFileOut, FILE *pFileIn, int64_t *offset, int64_t size); -#else -int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size); int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size); -#endif void *taosMmapReadOnlyFile(TdFilePtr pFile, int64_t length); bool taosValidFile(TdFilePtr pFile); diff --git a/include/os/osString.h b/include/os/osString.h index 88160dd69e419450d1ec481a83a28065df99186d..9c6d523ab2f239fd8c66cca9a68de5fff76c317f 100644 --- a/include/os/osString.h +++ b/include/os/osString.h @@ -20,16 +20,28 @@ extern "C" { #endif +typedef wchar_t TdWchar; +typedef int32_t TdUcs4; + +// If the error is in a third-party library, place this header file under the third-party library header file. +#ifndef ALLOW_FORBID_FUNC + #define iconv_open ICONV_OPEN_FUNC_TAOS_FORBID + #define iconv_close ICONV_CLOSE_FUNC_TAOS_FORBID + #define iconv ICONV_FUNC_TAOS_FORBID + #define wcwidth WCWIDTH_FUNC_TAOS_FORBID + #define wcswidth WCSWIDTH_FUNC_TAOS_FORBID + #define mbtowc MBTOWC_FUNC_TAOS_FORBID + #define mbstowcs MBSTOWCS_FUNC_TAOS_FORBID + #define wctomb WCTOMB_FUNC_TAOS_FORBID + #define wcstombs WCSTOMBS_FUNC_TAOS_FORBID + #define wcsncpy WCSNCPY_FUNC_TAOS_FORBID + #define wchar_t WCHAR_T_FUNC_TAOS_FORBID +#endif + #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #define tstrdup(str) _strdup(str) - #define tstrndup(str, size) _strndup(str, size) - int32_t tgetline(char **lineptr, size_t *n, FILE *stream); - int32_t twcslen(const wchar_t *wcs); #else #define tstrdup(str) strdup(str) - #define tstrndup(str, size) strndup(str, size) - #define tgetline(lineptr, n, stream) getline(lineptr, n, stream) - #define twcslen wcslen #endif #define tstrncpy(dst, src, size) \ @@ -38,14 +50,22 @@ extern "C" { (dst)[(size)-1] = 0; \ } while (0) +int32_t taosUcs4len(TdUcs4 *ucs4); int64_t taosStr2int64(const char *str); -// USE_LIBICONV -int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs); -bool taosMbsToUcs4(const char *mbs, size_t mbs_len, char *ucs4, int32_t ucs4_max_len, int32_t *len); -int32_t tasoUcs4Compare(void *f1_ucs4, void *f2_ucs4, int32_t bytes, int8_t ncharSize); +int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs); +bool taosMbsToUcs4(const char *mbs, size_t mbs_len, TdUcs4 *ucs4, int32_t ucs4_max_len, int32_t *len); +int32_t tasoUcs4Compare(TdUcs4 *f1_ucs4, TdUcs4 *f2_ucs4, int32_t bytes); +TdUcs4* tasoUcs4Copy(TdUcs4 *target_ucs4, TdUcs4 *source_ucs4, int32_t len_ucs4); bool taosValidateEncodec(const char *encodec); +int32_t taosWcharWidth(TdWchar wchar); +int32_t taosWcharsWidth(TdWchar *pWchar, int32_t size); +int32_t taosMbToWchar(TdWchar *pWchar, const char *pStr, int32_t size); +int32_t taosMbsToWchars(TdWchar *pWchars, const char *pStrs, int32_t size); +int32_t taosWcharToMb(char *pStr, TdWchar wchar); +int32_t taosWcharsToMbs(char *pStrs, TdWchar *pWchars, int32_t size); + #ifdef __cplusplus } #endif diff --git a/include/util/tcompare.h b/include/util/tcompare.h index 4c80eeb4f601034852acebd52b2e18b74c52d424..cc9e8ae4641138be528830e17467dab7897f0166 100644 --- a/include/util/tcompare.h +++ b/include/util/tcompare.h @@ -46,7 +46,7 @@ typedef struct SPatternCompareInfo { int32_t patternMatch(const char *pattern, const char *str, size_t size, const SPatternCompareInfo *pInfo); -int32_t WCSPatternMatch(const wchar_t *pattern, const wchar_t *str, size_t size, const SPatternCompareInfo *pInfo); +int32_t WCSPatternMatch(const TdUcs4 *pattern, const TdUcs4 *str, size_t size, const SPatternCompareInfo *pInfo); int32_t taosArrayCompareString(const void *a, const void *b); diff --git a/include/util/tdef.h b/include/util/tdef.h index 41a61ceb55bbb2edb72315297cb005ce55394e96..47fc61947386f5c73b2c80afb999a4d57981a1df 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -41,7 +41,7 @@ extern const int32_t TYPE_BYTES[15]; #define DOUBLE_BYTES sizeof(double) #define POINTER_BYTES sizeof(void *) // 8 by default assert(sizeof(ptrdiff_t) == sizseof(void*) #define TSDB_KEYSIZE sizeof(TSKEY) -#define TSDB_NCHAR_SIZE sizeof(int32_t) +#define TSDB_NCHAR_SIZE sizeof(TdUcs4) // NULL definition #define TSDB_DATA_BOOL_NULL 0x02 @@ -448,6 +448,11 @@ typedef struct { #define SND_UNIQUE_THREAD_NUM 2 #define SND_SHARED_THREAD_NUM 2 +enum { + SND_WORKER_TYPE__SHARED = 1, + SND_WORKER_TYPE__UNIQUE, +}; + #ifdef __cplusplus } #endif diff --git a/source/common/src/tvariant.c b/source/common/src/tvariant.c index af6152d3f437ba673d9f7969101c6631ffa4eb7c..c6aa1cb81d7a7fe1c48f90f37218a14e63f7cf07 100644 --- a/source/common/src/tvariant.c +++ b/source/common/src/tvariant.c @@ -199,8 +199,8 @@ void taosVariantCreateFromBinary(SVariant *pVar, const char *pz, size_t len, uin case TSDB_DATA_TYPE_NCHAR: { // here we get the nchar length from raw binary bits length size_t lenInwchar = len / TSDB_NCHAR_SIZE; - pVar->wpz = calloc(1, (lenInwchar + 1) * TSDB_NCHAR_SIZE); - memcpy(pVar->wpz, pz, lenInwchar * TSDB_NCHAR_SIZE); + pVar->ucs4 = calloc(1, (lenInwchar + 1) * TSDB_NCHAR_SIZE); + memcpy(pVar->ucs4, pz, lenInwchar * TSDB_NCHAR_SIZE); pVar->nLen = (int32_t)len; break; @@ -343,7 +343,7 @@ int32_t taosVariantToString(SVariant *pVar, char *dst) { case TSDB_DATA_TYPE_NCHAR: { dst[0] = '\''; - taosUcs4ToMbs(pVar->wpz, (twcslen(pVar->wpz) + 1) * TSDB_NCHAR_SIZE, dst + 1); + taosUcs4ToMbs(pVar->ucs4, (taosUcs4len(pVar->ucs4) + 1) * TSDB_NCHAR_SIZE, dst + 1); int32_t len = (int32_t)strlen(dst); dst[len] = '\''; dst[len + 1] = 0; @@ -384,7 +384,7 @@ static FORCE_INLINE int32_t convertToBoolImpl(char *pStr, int32_t len) { } } -static FORCE_INLINE int32_t wcsconvertToBoolImpl(wchar_t *pstr, int32_t len) { +static FORCE_INLINE int32_t wcsconvertToBoolImpl(TdUcs4 *pstr, int32_t len) { if ((wcsncasecmp(pstr, L"true", len) == 0) && (len == 4)) { return TSDB_TRUE; } else if (wcsncasecmp(pstr, L"false", len) == 0 && (len == 5)) { @@ -412,11 +412,11 @@ static int32_t toBinary(SVariant *pVariant, char **pDest, int32_t *pDestSize) { pBuf = realloc(pBuf, newSize + 1); } - taosUcs4ToMbs(pVariant->wpz, (int32_t)newSize, pBuf); - free(pVariant->wpz); + taosUcs4ToMbs(pVariant->ucs4, (int32_t)newSize, pBuf); + free(pVariant->ucs4); pBuf[newSize] = 0; } else { - taosUcs4ToMbs(pVariant->wpz, (int32_t)newSize, *pDest); + taosUcs4ToMbs(pVariant->ucs4, (int32_t)newSize, *pDest); } } else { @@ -460,8 +460,8 @@ static int32_t toNchar(SVariant *pVariant, char **pDest, int32_t *pDestSize) { } if (*pDest == pVariant->pz) { - wchar_t *pWStr = calloc(1, (nLen + 1) * TSDB_NCHAR_SIZE); - bool ret = taosMbsToUcs4(pDst, nLen, (char *)pWStr, (nLen + 1) * TSDB_NCHAR_SIZE, NULL); + TdUcs4 *pWStr = calloc(1, (nLen + 1) * TSDB_NCHAR_SIZE); + bool ret = taosMbsToUcs4(pDst, nLen, pWStr, (nLen + 1) * TSDB_NCHAR_SIZE, NULL); if (!ret) { tfree(pWStr); return -1; @@ -469,21 +469,21 @@ static int32_t toNchar(SVariant *pVariant, char **pDest, int32_t *pDestSize) { // free the binary buffer in the first place if (pVariant->nType == TSDB_DATA_TYPE_BINARY) { - free(pVariant->wpz); + free(pVariant->ucs4); } - pVariant->wpz = pWStr; - *pDestSize = twcslen(pVariant->wpz); + pVariant->ucs4 = pWStr; + *pDestSize = taosUcs4len(pVariant->ucs4); // shrink the allocate memory, no need to check here. - char *tmp = realloc(pVariant->wpz, (*pDestSize + 1) * TSDB_NCHAR_SIZE); + char *tmp = realloc(pVariant->ucs4, (*pDestSize + 1) * TSDB_NCHAR_SIZE); assert(tmp != NULL); - pVariant->wpz = (wchar_t *)tmp; + pVariant->ucs4 = (TdUcs4 *)tmp; } else { int32_t output = 0; - bool ret = taosMbsToUcs4(pDst, nLen, *pDest, (nLen + 1) * TSDB_NCHAR_SIZE, &output); + bool ret = taosMbsToUcs4(pDst, nLen, (TdUcs4*)*pDest, (nLen + 1) * TSDB_NCHAR_SIZE, &output); if (!ret) { return -1; } @@ -554,7 +554,7 @@ static FORCE_INLINE int32_t convertToInteger(SVariant *pVariant, int64_t *result *result = res; } else if (pVariant->nType == TSDB_DATA_TYPE_NCHAR) { errno = 0; - wchar_t *endPtr = NULL; + TdUcs4 *endPtr = NULL; SToken token = {0}; token.n = tGetToken(pVariant->pz, &token.type); @@ -564,7 +564,7 @@ static FORCE_INLINE int32_t convertToInteger(SVariant *pVariant, int64_t *result } if (token.type == TK_FLOAT) { - double v = wcstod(pVariant->wpz, &endPtr); + double v = wcstod(pVariant->ucs4, &endPtr); if (releaseVariantPtr) { free(pVariant->pz); pVariant->nLen = 0; @@ -583,7 +583,7 @@ static FORCE_INLINE int32_t convertToInteger(SVariant *pVariant, int64_t *result setNull((char *)result, type, tDataTypes[type].bytes); return 0; } else { - int64_t val = wcstoll(pVariant->wpz, &endPtr, 10); + int64_t val = wcstoll(pVariant->ucs4, &endPtr, 10); if (releaseVariantPtr) { free(pVariant->pz); pVariant->nLen = 0; @@ -649,7 +649,7 @@ static int32_t convertToBool(SVariant *pVariant, int64_t *pDest) { *pDest = ret; } else if (pVariant->nType == TSDB_DATA_TYPE_NCHAR) { int32_t ret = 0; - if ((ret = wcsconvertToBoolImpl(pVariant->wpz, pVariant->nLen)) < 0) { + if ((ret = wcsconvertToBoolImpl(pVariant->ucs4, pVariant->nLen)) < 0) { return ret; } *pDest = ret; @@ -899,7 +899,7 @@ int32_t tVariantDumpEx(SVariant *pVariant, char *payload, int16_t type, bool inc return -1; } } else { - wcsncpy((wchar_t *)payload, pVariant->wpz, pVariant->nLen); + tasoUcs4Copy((TdUcs4*)payload, pVariant->ucs4, pVariant->nLen); } } } else { @@ -913,7 +913,7 @@ int32_t tVariantDumpEx(SVariant *pVariant, char *payload, int16_t type, bool inc return -1; } } else { - memcpy(p, pVariant->wpz, pVariant->nLen); + memcpy(p, pVariant->ucs4, pVariant->nLen); newlen = pVariant->nLen; } @@ -979,7 +979,7 @@ int32_t taosVariantTypeSetType(SVariant *pVariant, char type) { pVariant->d = v; } else if (pVariant->nType == TSDB_DATA_TYPE_NCHAR) { errno = 0; - double v = wcstod(pVariant->wpz, NULL); + double v = wcstod(pVariant->ucs4, NULL); if ((errno == ERANGE && v == -1) || (isinf(v) || isnan(v))) { free(pVariant->pz); return -1; diff --git a/source/dnode/mgmt/impl/inc/dndSnode.h b/source/dnode/mgmt/impl/inc/dndSnode.h index b21e9191e8fbedc064f7b9823cec1e68acb70d90..f72d2a137a304d5e9058e84d140f077a15a42c9f 100644 --- a/source/dnode/mgmt/impl/inc/dndSnode.h +++ b/source/dnode/mgmt/impl/inc/dndSnode.h @@ -24,12 +24,17 @@ extern "C" { int32_t dndInitSnode(SDnode *pDnode); void dndCleanupSnode(SDnode *pDnode); -void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +// void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +void dndProcessSnodeExecMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); + #ifdef __cplusplus } #endif -#endif /*_TD_DND_SNODE_H_*/ \ No newline at end of file +#endif /*_TD_DND_SNODE_H_*/ diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index 8667952f2c71a5febe171e0eae6af4e61ad725f4..ea06c8c7519e48b881faa0f2e5aca8e64072dad3 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -382,6 +382,12 @@ static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } +static FORCE_INLINE int32_t dndGetSWTypeFromMsg(SRpcMsg *pMsg) { + SStreamExecMsgHead *pHead = pMsg->pCont; + pHead->workerType = htonl(pHead->workerType); + return pHead->workerType; +} + static FORCE_INLINE int32_t dndGetSWIdFromMsg(SRpcMsg *pMsg) { SMsgHead *pHead = pMsg->pCont; pHead->streamTaskId = htonl(pHead->streamTaskId); @@ -450,6 +456,18 @@ void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { dndWriteSnodeMsgToMgmtWorker(pDnode, pMsg); } +void dndProcessSnodeExecMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { + SSnode *pSnode = dndAcquireSnode(pDnode); + if (pSnode != NULL) { + int32_t workerType = dndGetSWTypeFromMsg(pMsg); + if (workerType == SND_WORKER_TYPE__SHARED) { + dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg); + } else { + dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg); + } + } +} + void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg); } diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 15db36477d27f845bf4ca3bb1d03d9050c911f38..617b6c0fc374678d29f16c5f74e1876c44aa4d01 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -23,10 +23,11 @@ #include "dndTransport.h" #include "dndMgmt.h" #include "dndMnode.h" +#include "dndSnode.h" #include "dndVnodes.h" -#define INTERNAL_USER "_dnd" -#define INTERNAL_CKEY "_key" +#define INTERNAL_USER "_dnd" +#define INTERNAL_CKEY "_key" #define INTERNAL_SECRET "_pwd" static void dndInitMsgFp(STransMgmt *pMgmt) { @@ -153,10 +154,14 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg; + + // Requests handled by SNODE + pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_DEPLOY)] = dndProcessSnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_EXEC)] = dndProcessSnodeExecMsg; } static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { - SDnode * pDnode = parent; + SDnode *pDnode = parent; STransMgmt *pMgmt = &pDnode->tmgmt; tmsg_t msgType = pRsp->msgType; @@ -219,7 +224,7 @@ static void dndCleanupClient(SDnode *pDnode) { } static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { - SDnode * pDnode = param; + SDnode *pDnode = param; STransMgmt *pMgmt = &pDnode->tmgmt; tmsg_t msgType = pReq->msgType; @@ -313,7 +318,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char SAuthReq authReq = {0}; tstrncpy(authReq.user, user, TSDB_USER_LEN); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); - void * pReq = rpcMallocCont(contLen); + void *pReq = rpcMallocCont(contLen); tSerializeSAuthReq(pReq, contLen, &authReq); SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; diff --git a/source/dnode/vnode/src/inc/tsdbDBDef.h b/source/dnode/vnode/src/inc/tsdbDBDef.h index 2e37b0ba45132a92e4de5b7e0a21e680564256bc..ca9b60049eeda2c6788e2675e9d6f2d69d2cf939 100644 --- a/source/dnode/vnode/src/inc/tsdbDBDef.h +++ b/source/dnode/vnode/src/inc/tsdbDBDef.h @@ -26,8 +26,9 @@ typedef struct SDBFile SDBFile; typedef DB_ENV* TDBEnv; struct SDBFile { - DB* pDB; - char* path; + int32_t fid; + DB* pDB; + char* path; }; int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile* pDBF); diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index df2d9604d28dfff0a86ff330196a7a4ae1d12197..99a2b272ed36313dfb3c940d08e35a834b3914bc 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -884,7 +884,7 @@ const char *metaSmaCursorNext(SMSmaCursor *pCur) { STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) { STSmaWrapper *pSW = NULL; - pSW = calloc(sizeof(*pSW), 1); + pSW = calloc(1, sizeof(*pSW)); if (pSW == NULL) { return NULL; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 97c52f44ebbbb56181717ab4a308d0efda9ed8f9..6b2857c4110cb6121a7be585dd383873a300cc54 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3386,7 +3386,7 @@ void filterPrepare(void* expr, void* param) { if (size < (uint32_t)pSchema->bytes) { size = pSchema->bytes; } - // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(wchar_t) space. + // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(TdUcs4) space. pInfo->q = calloc(1, size + TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE); tVariantDump(pCond, pInfo->q, pSchema->type, true); } diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 42bfebc77bbcfc5ff0765af252093a226691437f..02a0b587d5793d64444fe70d6662019ce75dc07f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -156,10 +156,6 @@ static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) { return TSDB_CODE_FAILED; } - if (*pEnv) { - return TSDB_CODE_SUCCESS; - } - if (*pEnv == NULL) { if ((*pEnv = tsdbNewSmaEnv(pTsdb, path)) == NULL) { return TSDB_CODE_FAILED; @@ -260,10 +256,15 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) { int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) { if (pSmaStat) { // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. - SSmaStatItem *item = taosHashIterate(pSmaStat->smaStatItems, NULL); + void *item = taosHashIterate(pSmaStat->smaStatItems, NULL); while (item != NULL) { - tfree(item->pSma); - taosHashCleanup(item->expiredWindows); + SSmaStatItem *pItem = *(SSmaStatItem **)item; + if (pItem != NULL) { + tdDestroyTSma(pItem->pSma); + tfree(pItem->pSma); + taosHashCleanup(pItem->expiredWindows); + tfree(pItem); + } item = taosHashIterate(pSmaStat->smaStatItems, item); } taosHashCleanup(pSmaStat->smaStatItems); @@ -292,9 +293,10 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { // init sma env tsdbLockRepo(pTsdb); - if (pTsdb->pTSmaEnv == NULL) { + pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&pTsdb->pTSmaEnv) : atomic_load_ptr(&pTsdb->pRSmaEnv); + if (pEnv == NULL) { char rname[TSDB_FILENAME_LEN] = {0}; - char aname[TSDB_FILENAME_LEN * 2 + 32] = {0}; // TODO: make TMPNAME_LEN public as TSDB_FILENAME_LEN? + char aname[TSDB_FILENAME_LEN] = {0}; // use TSDB_FILENAME_LEN currently SDiskID did = {0}; tfsAllocDisk(pTsdb->pTfs, TFS_PRIMARY_LEVEL, &did); @@ -315,11 +317,8 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { return TSDB_CODE_FAILED; } - if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { - atomic_store_ptr(&pTsdb->pTSmaEnv, pEnv); - } else { - atomic_store_ptr(&pTsdb->pRSmaEnv, pEnv); - } + (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&pTsdb->pTSmaEnv, pEnv) + : atomic_store_ptr(&pTsdb->pRSmaEnv, pEnv); } tsdbUnlockRepo(pTsdb); @@ -359,8 +358,10 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) { SSmaStat *pStat = SMA_ENV_STAT(pEnv); SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv); + TASSERT(pEnv != NULL && pStat != NULL && pItemsHash != NULL); + tsdbRefSmaStat(pTsdb, pStat); - SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); + SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); if (pItem == NULL) { pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state if (pItem == NULL) { @@ -421,9 +422,9 @@ static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t ind tsdbRefSmaStat(pTsdb, pStat); if (pStat && pStat->smaStatItems) { - pItem = *(SSmaStatItem **)taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid)); + pItem = taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid)); } - if (pItem != NULL) { + if ((pItem != NULL) && ((pItem = *(SSmaStatItem **)pItem) != NULL)) { // pItem resides in hash buffer all the time unless drop sma index // TODO: multithread protect if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) { @@ -494,7 +495,7 @@ static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) { * @brief Insert TSma data blocks to DB File build by B+Tree * * @param pSmaH - * @param smaKey + * @param smaKey tableUid-colId-skeyOfWindow(8-2-8) * @param keyLen * @param pData * @param dataLen @@ -502,12 +503,11 @@ static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) { */ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen) { SDBFile *pDBFile = &pSmaH->dFile; - - // TODO: insert sma data blocks into B+Tree tsdbDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d", REPO_ID(pSmaH->pTsdb), pDBFile->path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen); + // TODO: insert sma data blocks into B+Tree(TDB) if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen) != 0) { return TSDB_CODE_FAILED; } @@ -564,34 +564,34 @@ static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit return interval / 1e3; } else if (TIME_UNIT_NANOSECOND == intervalUnit) { // nano second return interval / 1e6; - } else { + } else { // ms return interval; } break; case TSDB_TIME_PRECISION_MICRO: if (TIME_UNIT_MICROSECOND == intervalUnit) { // us return interval; - } else if (TIME_UNIT_NANOSECOND == intervalUnit) { // nano second + } else if (TIME_UNIT_NANOSECOND == intervalUnit) { // ns return interval / 1e3; - } else { + } else { // ms return interval * 1e3; } break; case TSDB_TIME_PRECISION_NANO: - if (TIME_UNIT_MICROSECOND == intervalUnit) { + if (TIME_UNIT_MICROSECOND == intervalUnit) { // us return interval * 1e3; - } else if (TIME_UNIT_NANOSECOND == intervalUnit) { // nano second + } else if (TIME_UNIT_NANOSECOND == intervalUnit) { // ns return interval; - } else { + } else { // ms return interval * 1e6; } break; default: // ms if (TIME_UNIT_MICROSECOND == intervalUnit) { // us return interval / 1e3; - } else if (TIME_UNIT_NANOSECOND == intervalUnit) { // nano second + } else if (TIME_UNIT_NANOSECOND == intervalUnit) { // ns return interval / 1e6; - } else { + } else { // ms return interval; } break; @@ -663,9 +663,13 @@ static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) { static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) { STsdb *pTsdb = pSmaH->pTsdb; ASSERT(pSmaH->dFile.path == NULL && pSmaH->dFile.pDB == NULL); + + pSmaH->dFile.fid = fid; + char tSmaFile[TSDB_FILENAME_LEN] = {0}; snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pTsdb), fid); pSmaH->dFile.path = strdup(tSmaFile); + return TSDB_CODE_SUCCESS; } @@ -705,7 +709,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { STsdbCfg * pCfg = REPO_CFG(pTsdb); STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; - if (!pTsdb->pTSmaEnv) { + if (!atomic_load_ptr(&pTsdb->pTSmaEnv)) { terrno = TSDB_CODE_INVALID_PTR; tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb)); return terrno; @@ -883,15 +887,15 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) { static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult) { - if (!pTsdb->pTSmaEnv) { + if (!atomic_load_ptr(&pTsdb->pTSmaEnv)) { terrno = TSDB_CODE_INVALID_PTR; tsdbWarn("vgId:%d getTSmaDataImpl failed since pTSmaEnv is NULL", REPO_ID(pTsdb)); return TSDB_CODE_FAILED; } tsdbRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv)); - SSmaStatItem *pItem = *(SSmaStatItem **)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid)); - if (pItem == NULL) { + SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid)); + if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL)) { // Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if // it's NULL. tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv)); diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 218b53c2abe42a8d0f197fc11db395339d68f9a5..8285020e14a336c08d51287b544d2714e0029b18 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -144,6 +144,9 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { return -1; } + // record current timezone of server side + tstrncpy(vCreateSmaReq.tSma.timezone, tsTimezone, TD_TIMEZONE_LEN); + if (metaCreateTSma(pVnode->pMeta, &vCreateSmaReq) < 0) { // TODO: handle error tdDestroyTSma(&vCreateSmaReq.tSma); diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 86958da406ebb32eb57fe86d1ad76ef84a889168..5a87c180b67ec371a30c969bd0931fdc166b6a44 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -49,7 +49,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) { STSmaWrapper tSmaWrapper = {.number = 1, .tSma = &tSma}; uint32_t bufLen = tEncodeTSmaWrapper(NULL, &tSmaWrapper); - void *buf = calloc(bufLen, 1); + void *buf = calloc(1, bufLen); ASSERT_NE(buf, nullptr); STSmaWrapper *pSW = (STSmaWrapper *)buf; @@ -84,6 +84,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) { } // resource release + tfree(pSW); tdDestroyTSma(&tSma); tdDestroyTSmaWrapper(&dstTSmaWrapper); } @@ -113,7 +114,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { tSma.tableUid = tbUid; tSma.exprLen = strlen(expr); - tSma.expr = (char *)calloc(tSma.exprLen + 1, 1); + tSma.expr = (char *)calloc(1, tSma.exprLen + 1); ASSERT_NE(tSma.expr, nullptr); tstrncpy(tSma.expr, expr, tSma.exprLen + 1); @@ -251,12 +252,12 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { tSma.tableUid = tbUid; tSma.exprLen = strlen(expr); - tSma.expr = (char *)calloc(tSma.exprLen + 1, 1); + tSma.expr = (char *)calloc(1, tSma.exprLen + 1); ASSERT_NE(tSma.expr, nullptr); tstrncpy(tSma.expr, expr, tSma.exprLen + 1); tSma.tagsFilterLen = strlen(tagsFilter); - tSma.tagsFilter = (char *)calloc(tSma.tagsFilterLen + 1, 1); + tSma.tagsFilter = (char *)calloc(1, tSma.tagsFilterLen + 1); ASSERT_NE(tSma.tagsFilter, nullptr); tstrncpy(tSma.tagsFilter, tagsFilter, tSma.tagsFilterLen + 1); @@ -273,20 +274,20 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { // step 2: insert data STSmaDataWrapper *pSmaData = NULL; - STsdb tsdb = {0}; - STsdbCfg * pCfg = &tsdb.config; - - tsdb.pMeta = pMeta; - tsdb.vgId = 2; - tsdb.config.daysPerFile = 10; // default days is 10 - tsdb.config.keep1 = 30; - tsdb.config.keep2 = 90; - tsdb.config.keep = 365; - tsdb.config.precision = TSDB_TIME_PRECISION_MILLI; - tsdb.config.update = TD_ROW_OVERWRITE_UPDATE; - tsdb.config.compression = TWO_STAGE_COMP; - - switch (tsdb.config.precision) { + STsdb * pTsdb = (STsdb *)calloc(1, sizeof(STsdb)); + STsdbCfg * pCfg = &pTsdb->config; + + pTsdb->pMeta = pMeta; + pTsdb->vgId = 2; + pTsdb->config.daysPerFile = 10; // default days is 10 + pTsdb->config.keep1 = 30; + pTsdb->config.keep2 = 90; + pTsdb->config.keep = 365; + pTsdb->config.precision = TSDB_TIME_PRECISION_MILLI; + pTsdb->config.update = TD_ROW_OVERWRITE_UPDATE; + pTsdb->config.compression = TWO_STAGE_COMP; + + switch (pTsdb->config.precision) { case TSDB_TIME_PRECISION_MILLI: skey1 *= 1e3; break; @@ -304,12 +305,12 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { SDiskCfg pDisks = {.level = 0, .primary = 1}; strncpy(pDisks.dir, "/var/lib/taos", TSDB_FILENAME_LEN); int32_t numOfDisks = 1; - tsdb.pTfs = tfsOpen(&pDisks, numOfDisks); - ASSERT_NE(tsdb.pTfs, nullptr); + pTsdb->pTfs = tfsOpen(&pDisks, numOfDisks); + ASSERT_NE(pTsdb->pTfs, nullptr); char *msg = (char *)calloc(1, 100); ASSERT_NE(msg, nullptr); - ASSERT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0); + ASSERT_EQ(tsdbUpdateSmaWindow(pTsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0); // init int32_t allocCnt = 0; @@ -367,13 +368,13 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { ASSERT_GE(bufSize, pSmaData->dataLen); // execute - ASSERT_EQ(tsdbInsertTSmaData(&tsdb, (char *)pSmaData), TSDB_CODE_SUCCESS); + ASSERT_EQ(tsdbInsertTSmaData(pTsdb, (char *)pSmaData), TSDB_CODE_SUCCESS); // step 3: query uint32_t checkDataCnt = 0; for (int32_t t = 0; t < numOfTables; ++t) { for (col_id_t c = 0; c < numOfCols; ++c) { - ASSERT_EQ(tsdbGetTSmaData(&tsdb, NULL, indexUid1, interval1, intervalUnit1, tbUid + t, + ASSERT_EQ(tsdbGetTSmaData(pTsdb, NULL, indexUid1, interval1, intervalUnit1, tbUid + t, c + PRIMARYKEY_TIMESTAMP_COL_ID, skey1, 1), TSDB_CODE_SUCCESS); ++checkDataCnt; @@ -383,9 +384,12 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { printf("%s:%d The sma data check count for insert and query is %" PRIu32 "\n", __FILE__, __LINE__, checkDataCnt); // release data + tfree(msg); taosTZfree(buf); // release meta tdDestroyTSma(&tSma); + tfsClose(pTsdb->pTfs); + tsdbClose(pTsdb); metaClose(pMeta); } #endif diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 6db3abb9d602203d57ea4f279669895df7623b18..a37820634f9c8cc80d5cce61047d502facd0a8d2 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -622,7 +622,7 @@ static FORCE_INLINE int32_t MemRowAppend(const void* value, int32_t len, void* p // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' int32_t output = 0; const char* rowEnd = tdRowEnd(rb->pBuf); - if (!taosMbsToUcs4(value, len, (char*)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) { + if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) { return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; } varDataSetLen(rowEnd, output); @@ -725,7 +725,7 @@ static int32_t KvRowAppend(const void *value, int32_t len, void *param) { } else if (TSDB_DATA_TYPE_NCHAR == type) { // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' int32_t output = 0; - if (!taosMbsToUcs4(value, len, varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) { + if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) { return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; } diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index 65d3f51dde3e1aaf7cba570f60b9f3f10bf5641c..1f333e49e7726f90567d437731c27b878e449046 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -13,12 +13,11 @@ * along with this program. If not, see . */ -#include "mockCatalogService.h" - #include #include #include #include "tdatablock.h" +#include "mockCatalogService.h" #include "tname.h" #include "ttypes.h" diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 93615b05652f9dd060f999e6d824e8738a2f469c..cbc3921711cd15b224b37ed68c823978ab89e21e 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -392,12 +392,12 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva return TSDB_CODE_OUT_OF_MEMORY; } - SValueNode* pIntervalNode = (SValueNode*)((SRawExprNode*)(pInterval->pInterval))->pNode; - pWindow->winType = WINDOW_TYPE_INTERVAL; - pWindow->interval = pIntervalNode->datum.i; + pWindow->interval = ((SValueNode*)pInterval->pInterval)->datum.i; + pWindow->intervalUnit = ((SValueNode*)pInterval->pInterval)->unit; pWindow->offset = (NULL != pInterval->pOffset ? ((SValueNode*)pInterval->pOffset)->datum.i : 0); pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : pWindow->interval); + pWindow->slidingUnit = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit); int32_t code = TSDB_CODE_SUCCESS; diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 33ec8ab6efb19437c290fba480678a5c6aabc08f..58b5c8340a2152d3335d411be2d2cd58f0fd0caf 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -1813,7 +1813,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) { if(type == TSDB_DATA_TYPE_NCHAR && (unit->compare.optr == OP_TYPE_MATCH || unit->compare.optr == OP_TYPE_NMATCH)){ char newValData[TSDB_REGEX_STRING_DEFAULT_LEN * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE] = {0}; - int32_t len = taosUcs4ToMbs(varDataVal(fi->data), varDataLen(fi->data), varDataVal(newValData)); + int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(fi->data), varDataLen(fi->data), varDataVal(newValData)); if (len < 0){ qError("filterInitValFieldData taosUcs4ToMbs error 1"); return TSDB_CODE_QRY_APP_ERROR; @@ -2992,7 +2992,7 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa if(info->cunits[uidx].dataType == TSDB_DATA_TYPE_NCHAR && (info->cunits[uidx].optr == OP_TYPE_MATCH || info->cunits[uidx].optr == OP_TYPE_NMATCH)){ char *newColData = calloc(info->cunits[uidx].dataSize * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE, 1); - int32_t len = taosUcs4ToMbs(varDataVal(colData), varDataLen(colData), varDataVal(newColData)); + int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(colData), varDataLen(colData), varDataVal(newColData)); if (len < 0){ qError("castConvert1 taosUcs4ToMbs error"); }else{ @@ -3052,7 +3052,7 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg } else { if(cunit->dataType == TSDB_DATA_TYPE_NCHAR && (cunit->optr == OP_TYPE_MATCH || cunit->optr == OP_TYPE_NMATCH)){ char *newColData = calloc(cunit->dataSize * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE, 1); - int32_t len = taosUcs4ToMbs(varDataVal(colData), varDataLen(colData), varDataVal(newColData)); + int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(colData), varDataLen(colData), varDataVal(newColData)); if (len < 0){ qError("castConvert1 taosUcs4ToMbs error"); }else{ @@ -3433,7 +3433,7 @@ int32_t filterConverNcharColumns(SFilterInfo* info, int32_t rows, bool *gotNchar varDataCopy(dst, src); continue; } - bool ret = taosMbsToUcs4(varDataVal(src), varDataLen(src), varDataVal(dst), bufSize, &len); + bool ret = taosMbsToUcs4(varDataVal(src), varDataLen(src), (TdUcs4*)varDataVal(dst), bufSize, &len); if(!ret) { qError("filterConverNcharColumns taosMbsToUcs4 error"); return TSDB_CODE_FAILED; diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index 0fe7bf6e369fc3c9e9b2d8658ccb45c513b255f3..17ac9b19fd5574b61beb25ec3198dace2992e5fe 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -316,7 +316,7 @@ int32_t vectorConvertFromVarData(SScalarParam* pIn, SScalarParam* pOut, int32_t tmp = realloc(tmp, bufSize); } - int len = taosUcs4ToMbs(varDataVal(pIn->data), varDataLen(pIn->data), tmp); + int len = taosUcs4ToMbs((TdUcs4*)varDataVal(pIn->data), varDataLen(pIn->data), tmp); if (len < 0){ sclError("castConvert taosUcs4ToMbs error 1"); tfree(tmp); diff --git a/source/libs/sync/inc/syncCommit.h b/source/libs/sync/inc/syncCommit.h new file mode 100644 index 0000000000000000000000000000000000000000..c76236d5bfd3ca3050c7dbe3900a5644c4388911 --- /dev/null +++ b/source/libs/sync/inc/syncCommit.h @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_COMMIT_H +#define _TD_LIBS_SYNC_COMMIT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include "syncInt.h" +#include "taosdef.h" + +// \* Leader i advances its commitIndex. +// \* This is done as a separate step from handling AppendEntries responses, +// \* in part to minimize atomic regions, and in part so that leaders of +// \* single-server clusters are able to mark entries committed. +// AdvanceCommitIndex(i) == +// /\ state[i] = Leader +// /\ LET \* The set of servers that agree up through index. +// Agree(index) == {i} \cup {k \in Server : +// matchIndex[i][k] >= index} +// \* The maximum indexes for which a quorum agrees +// agreeIndexes == {index \in 1..Len(log[i]) : +// Agree(index) \in Quorum} +// \* New value for commitIndex'[i] +// newCommitIndex == +// IF /\ agreeIndexes /= {} +// /\ log[i][Max(agreeIndexes)].term = currentTerm[i] +// THEN +// Max(agreeIndexes) +// ELSE +// commitIndex[i] +// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex] +// /\ UNCHANGED <> +// +void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode); +bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index); +bool syncAgree(SSyncNode* pSyncNode, SyncIndex index); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_LIBS_SYNC_COMMIT_H*/ diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 5a9af83827b6d952ea36165d9a97524708c1c985..8e36424f192e03fc72b6878f184ce075ecbdd0bc 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -236,7 +236,6 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode); // raft vote -------------- void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId); void syncNodeVoteForSelf(SSyncNode* pSyncNode); -void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode); // for debug -------------- void syncNodePrint(SSyncNode* pObj); diff --git a/source/libs/sync/inc/syncOnMessage.h b/source/libs/sync/inc/syncOnMessage.h index 7cb186a8121800134fad6d1896870e85cbe503b1..2f8856e652a84332a9455fb1f6bc26bf8a975e89 100644 --- a/source/libs/sync/inc/syncOnMessage.h +++ b/source/libs/sync/inc/syncOnMessage.h @@ -25,6 +25,46 @@ extern "C" { #include #include "taosdef.h" +// TLA+ Spec +// Receive(m) == +// LET i == m.mdest +// j == m.msource +// IN \* Any RPC with a newer term causes the recipient to advance +// \* its term first. Responses with stale terms are ignored. +// \/ UpdateTerm(i, j, m) +// \/ /\ m.mtype = RequestVoteRequest +// /\ HandleRequestVoteRequest(i, j, m) +// \/ /\ m.mtype = RequestVoteResponse +// /\ \/ DropStaleResponse(i, j, m) +// \/ HandleRequestVoteResponse(i, j, m) +// \/ /\ m.mtype = AppendEntriesRequest +// /\ HandleAppendEntriesRequest(i, j, m) +// \/ /\ m.mtype = AppendEntriesResponse +// /\ \/ DropStaleResponse(i, j, m) +// \/ HandleAppendEntriesResponse(i, j, m) + +// DuplicateMessage(m) == +// /\ Send(m) +// /\ UNCHANGED <> + +// DropMessage(m) == +// /\ Discard(m) +// /\ UNCHANGED <> + +// Next == /\ \/ \E i \in Server : Restart(i) +// \/ \E i \in Server : Timeout(i) +// \/ \E i,j \in Server : RequestVote(i, j) +// \/ \E i \in Server : BecomeLeader(i) +// \/ \E i \in Server, v \in Value : ClientRequest(i, v) +// \/ \E i \in Server : AdvanceCommitIndex(i) +// \/ \E i,j \in Server : AppendEntries(i, j) +// \/ \E m \in DOMAIN messages : Receive(m) +// \/ \E m \in DOMAIN messages : DuplicateMessage(m) +// \/ \E m \in DOMAIN messages : DropMessage(m) +// \* History variable that tracks every log ever: +// /\ allLogs' = allLogs \cup {log[i] : i \in Server} +// + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 7013d281e3f04a8614663c97ea8990359bac8482..270180e3477b4c6359feb07a918de56ac097fcef 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -94,6 +94,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { } assert(pMsg->term <= ths->pRaftStore->currentTerm); + // reset elect timer if (pMsg->term == ths->pRaftStore->currentTerm) { ths->leaderCache = pMsg->srcId; syncNodeResetElectTimer(ths); @@ -135,38 +136,117 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { syncNodeBecomeFollower(ths); - // need ret? + // ret or reply? return ret; } // accept request if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) { - bool matchSuccess = false; + bool preMatch = false; if (pMsg->prevLogIndex == SYNC_INDEX_INVALID && ths->pLogStore->getLastIndex(ths->pLogStore) == SYNC_INDEX_INVALID) { - matchSuccess = true; + preMatch = true; } if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { SSyncRaftEntry* pPreEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex); assert(pPreEntry != NULL); if (pMsg->prevLogTerm == pPreEntry->term) { - matchSuccess = true; + preMatch = true; } syncEntryDestory(pPreEntry); } - if (matchSuccess) { - // delete conflict entries - if (pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore)) { - SyncIndex fromIndex = pMsg->prevLogIndex + 1; - ths->pLogStore->truncate(ths->pLogStore, fromIndex); - } + if (preMatch) { + // must has preIndex in local log + assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)); + + bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore); + bool hasAppendEntries = pMsg->dataLen > 0; + + if (hasExtraEntries && hasAppendEntries) { + // conflict + bool conflict = false; + + SyncIndex extraIndex = pMsg->prevLogIndex + 1; + SSyncRaftEntry* pExtraEntry = logStoreGetEntry(ths->pLogStore, extraIndex); + assert(pExtraEntry != NULL); + + SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + assert(pAppendEntry != NULL); + + assert(extraIndex == pAppendEntry->index); + if (pExtraEntry->term == pAppendEntry->term) { + conflict = true; + } + + if (conflict) { + // roll back + SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore); + SyncIndex delEnd = extraIndex; + + // notice! reverse roll back! + for (SyncIndex index = delEnd; index >= delBegin; --index) { + if (ths->pFsm->FpRollBackCb != NULL) { + SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); + assert(pRollBackEntry != NULL); + + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); + ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0); + rpcFreeCont(rpcMsg.pCont); + syncEntryDestory(pRollBackEntry); + } + } + + // delete confict entries + ths->pLogStore->truncate(ths->pLogStore, extraIndex); + + // append new entries + ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); + + // pre commit + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); + if (ths->pFsm != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL) { + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0); + } + } + rpcFreeCont(rpcMsg.pCont); + } + + // free memory + syncEntryDestory(pExtraEntry); + syncEntryDestory(pAppendEntry); - // append one entry - if (pMsg->dataLen > 0) { - SSyncRaftEntry* pEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); - ths->pLogStore->appendEntry(ths->pLogStore, pEntry); - syncEntryDestory(pEntry); + } else if (hasExtraEntries && !hasAppendEntries) { + // do nothing + + } else if (!hasExtraEntries && hasAppendEntries) { + SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + assert(pAppendEntry != NULL); + + // append new entries + ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); + + // pre commit + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); + if (ths->pFsm != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL) { + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0); + } + } + rpcFreeCont(rpcMsg.pCont); + + // free memory + syncEntryDestory(pAppendEntry); + + } else if (!hasExtraEntries && !hasAppendEntries) { + // do nothing + + } else { + assert(0); } SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); @@ -175,7 +255,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { pReply->term = ths->pRaftStore->currentTerm; pReply->success = true; - if (pMsg->dataLen > 0) { + if (hasAppendEntries) { pReply->matchIndex = pMsg->prevLogIndex + 1; } else { pReply->matchIndex = pMsg->prevLogIndex; @@ -201,11 +281,38 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { syncAppendEntriesReplyDestroy(pReply); } + // maybe update commit index from leader if (pMsg->commitIndex > ths->commitIndex) { + // has commit entry in local if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { - // commit + SyncIndex beginIndex = ths->commitIndex + 1; + SyncIndex endIndex = pMsg->commitIndex; + + // update commit index ths->commitIndex = pMsg->commitIndex; + + // call back Wal ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); + + // execute fsm + if (ths->pFsm != NULL) { + for (SyncIndex i = beginIndex; i <= endIndex; ++i) { + if (i != SYNC_INDEX_INVALID) { + SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, i); + assert(pEntry != NULL); + + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); + + if (ths->pFsm->FpCommitCb != NULL) { + ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + } + + rpcFreeCont(rpcMsg.pCont); + syncEntryDestory(pEntry); + } + } + } } } } diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 93539db9386dd061fa680a83fc1d2c955f933326..9db9a3e8ac532790dbcffff40f80b5950adc983c 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -14,6 +14,7 @@ */ #include "syncAppendEntriesReply.h" +#include "syncCommit.h" #include "syncIndexMgr.h" #include "syncInt.h" #include "syncRaftLog.h" @@ -59,7 +60,7 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); // maybe commit - syncNodeMaybeAdvanceCommitIndex(ths); + syncMaybeAdvanceCommitIndex(ths); } else { SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index bc2a39aa89cb01b682567aaade00295e63911c23..c75d23d96d560c4c342631585d1903051bcd7c8c 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -13,9 +13,11 @@ * along with this program. If not, see . */ +#include "syncCommit.h" #include "syncIndexMgr.h" #include "syncInt.h" #include "syncRaftLog.h" +#include "syncRaftStore.h" // \* Leader i advances its commitIndex. // \* This is done as a separate step from handling AppendEntries responses, @@ -40,30 +42,80 @@ // IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex] // /\ UNCHANGED <> // -void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { +void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex); syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex); // update commit index + SyncIndex newCommitIndex = pSyncNode->commitIndex; + for (SyncIndex index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); index > pSyncNode->commitIndex; + ++index) { + if (syncAgree(pSyncNode, index)) { + // term + SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index); + assert(pEntry != NULL); - if (pSyncNode->pFsm != NULL) { - SyncIndex beginIndex = SYNC_INDEX_INVALID; - SyncIndex endIndex = SYNC_INDEX_INVALID; - for (SyncIndex i = beginIndex; i <= endIndex; ++i) { - if (i != SYNC_INDEX_INVALID) { - SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i); - assert(pEntry != NULL); + // cannot commit, even if quorum agree. need check term! + if (pEntry->term == pSyncNode->pRaftStore->currentTerm) { + // update commit index + newCommitIndex = index; + break; + } + } + } - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pEntry, &rpcMsg); + if (newCommitIndex > pSyncNode->commitIndex) { + SyncIndex beginIndex = pSyncNode->commitIndex + 1; + SyncIndex endIndex = newCommitIndex; - if (pSyncNode->pFsm->FpCommitCb != NULL) { - pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); - } + // update commit index + pSyncNode->commitIndex = newCommitIndex; + + // call back Wal + pSyncNode->pLogStore->updateCommitIndex(pSyncNode->pLogStore, pSyncNode->commitIndex); + + // execute fsm + if (pSyncNode->pFsm != NULL) { + for (SyncIndex i = beginIndex; i <= endIndex; ++i) { + if (i != SYNC_INDEX_INVALID) { + SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i); + assert(pEntry != NULL); + + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); + + if (pSyncNode->pFsm->FpCommitCb != NULL) { + pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + } - rpcFreeCont(rpcMsg.pCont); - syncEntryDestory(pEntry); + rpcFreeCont(rpcMsg.pCont); + syncEntryDestory(pEntry); + } } } } +} + +bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) { + SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, pRaftId); + + // b for debug + bool b = false; + if (matchIndex >= index) { + b = true; + } + return b; +} + +bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { + int agreeCount = 0; + for (int i = 0; i < pSyncNode->replicaNum; ++i) { + if (syncAgreeIndex(pSyncNode, &(pSyncNode->replicasId[i]), index)) { + ++agreeCount; + } + if (agreeCount >= pSyncNode->quorum) { + return true; + } + } + return false; } \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 1f3e709a27e7001e71f064b75d718dfa8f0b0bc9..6c2ef0c85b92f8d2a5ad4a9e9860976ddd5eea90 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -17,6 +17,7 @@ #include "sync.h" #include "syncAppendEntries.h" #include "syncAppendEntriesReply.h" +#include "syncCommit.h" #include "syncElection.h" #include "syncEnv.h" #include "syncIndexMgr.h" @@ -150,6 +151,30 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { // init life cycle + // TLA+ Spec + // InitHistoryVars == /\ elections = {} + // /\ allLogs = {} + // /\ voterLog = [i \in Server |-> [j \in {} |-> <<>>]] + // InitServerVars == /\ currentTerm = [i \in Server |-> 1] + // /\ state = [i \in Server |-> Follower] + // /\ votedFor = [i \in Server |-> Nil] + // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}] + // /\ votesGranted = [i \in Server |-> {}] + // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the + // \* leader does not send itself messages. It's still easier to include these + // \* in the functions. + // InitLeaderVars == /\ nextIndex = [i \in Server |-> [j \in Server |-> 1]] + // /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]] + // InitLogVars == /\ log = [i \in Server |-> << >>] + // /\ commitIndex = [i \in Server |-> 0] + // Init == /\ messages = [m \in {} |-> 0] + // /\ InitHistoryVars + // /\ InitServerVars + // /\ InitCandidateVars + // /\ InitLeaderVars + // /\ InitLogVars + // + // init TLA+ server vars pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath); @@ -727,6 +752,16 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { return ret; } +// TLA+ Spec +// ClientRequest(i, v) == +// /\ state[i] = Leader +// /\ LET entry == [term |-> currentTerm[i], +// value |-> v] +// newLog == Append(log[i], entry) +// IN log' = [log EXCEPT ![i] = newLog] +// /\ UNCHANGED <> +// static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { int32_t ret = 0; syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg); @@ -740,7 +775,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg ths->pLogStore->appendEntry(ths->pLogStore, pEntry); // only myself, maybe commit - syncNodeMaybeAdvanceCommitIndex(ths); + syncMaybeAdvanceCommitIndex(ths); // start replicate right now! syncNodeReplicate(ths); diff --git a/source/libs/sync/src/syncOnMessage.c b/source/libs/sync/src/syncOnMessage.c index 19a97ee1566d32ed0fc8786c4d75542c2c435f30..ce8bed9cd39c44df9b090ae931cba063d1dda53c 100644 --- a/source/libs/sync/src/syncOnMessage.c +++ b/source/libs/sync/src/syncOnMessage.c @@ -14,3 +14,43 @@ */ #include "syncOnMessage.h" + +// TLA+ Spec +// Receive(m) == +// LET i == m.mdest +// j == m.msource +// IN \* Any RPC with a newer term causes the recipient to advance +// \* its term first. Responses with stale terms are ignored. +// \/ UpdateTerm(i, j, m) +// \/ /\ m.mtype = RequestVoteRequest +// /\ HandleRequestVoteRequest(i, j, m) +// \/ /\ m.mtype = RequestVoteResponse +// /\ \/ DropStaleResponse(i, j, m) +// \/ HandleRequestVoteResponse(i, j, m) +// \/ /\ m.mtype = AppendEntriesRequest +// /\ HandleAppendEntriesRequest(i, j, m) +// \/ /\ m.mtype = AppendEntriesResponse +// /\ \/ DropStaleResponse(i, j, m) +// \/ HandleAppendEntriesResponse(i, j, m) + +// DuplicateMessage(m) == +// /\ Send(m) +// /\ UNCHANGED <> + +// DropMessage(m) == +// /\ Discard(m) +// /\ UNCHANGED <> + +// Next == /\ \/ \E i \in Server : Restart(i) +// \/ \E i \in Server : Timeout(i) +// \/ \E i,j \in Server : RequestVote(i, j) +// \/ \E i \in Server : BecomeLeader(i) +// \/ \E i \in Server, v \in Value : ClientRequest(i, v) +// \/ \E i \in Server : AdvanceCommitIndex(i) +// \/ \E i,j \in Server : AppendEntries(i, j) +// \/ \E m \in DOMAIN messages : Receive(m) +// \/ \E m \in DOMAIN messages : DuplicateMessage(m) +// \/ \E m \in DOMAIN messages : DropMessage(m) +// \* History variable that tracks every log ever: +// /\ allLogs' = allLogs \cup {log[i] : i \in Server} +// \ No newline at end of file diff --git a/source/libs/sync/test/syncWriteTest.cpp b/source/libs/sync/test/syncWriteTest.cpp index d4c2dde6d06b57322198313b460b8a714833eccd..2598abbddd86ac6813afd287246202d5439cb9c8 100644 --- a/source/libs/sync/test/syncWriteTest.cpp +++ b/source/libs/sync/test/syncWriteTest.cpp @@ -162,7 +162,7 @@ int main(int argc, char **argv) { SyncClientRequest *pMsg1 = step1(pMsg0); syncClientRequestPrint2((char *)"==step1==", pMsg1); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 10; ++i) { SyncClientRequest *pSyncClientRequest = pMsg1; SRpcMsg rpcMsg; syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index 97b8912b60f9bc3e39c1be03cb14a35adfe0340c..aee7376491c9b7c395456ce7d1bc20559b70c969 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -204,7 +204,8 @@ void tfsDirname(const STfsFile *pFile, char *dest) { void tfsAbsoluteName(STfs *pTfs, SDiskID diskId, const char *rname, char *aname) { STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId); - snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname); + + snprintf(aname, TSDB_FILENAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname); } int32_t tfsRemoveFile(const STfsFile *pFile) { return taosRemoveFile(pFile->aname); } diff --git a/source/os/CMakeLists.txt b/source/os/CMakeLists.txt index ed240015d4dcbff2ddb0c44cbeee7fc751f95429..eea390391126cd86725e39e7c0e756243780ab7f 100644 --- a/source/os/CMakeLists.txt +++ b/source/os/CMakeLists.txt @@ -5,9 +5,14 @@ target_include_directories( PUBLIC "${CMAKE_SOURCE_DIR}/include/os" PUBLIC "${CMAKE_SOURCE_DIR}/include" PUBLIC "${CMAKE_SOURCE_DIR}/include/util" - PUBLIC "${CMAKE_SOURCE_DIR}/contrib/pthread-win32" + PUBLIC "${CMAKE_SOURCE_DIR}/contrib/pthread" PUBLIC "${CMAKE_SOURCE_DIR}/contrib/gnuregex" ) +# iconv +find_path(IconvApiIncludes iconv.h PATHS) +if(NOT IconvApiIncludes) + add_definitions(-DDISALLOW_NCHAR_WITHOUT_ICONV) +endif () target_link_libraries( os pthread dl rt m ) diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index bcbd95e16028fc7efe20297ec265061580484760..5e859de5d602aef6077ed666575f9bb3b6ac1db8 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -46,6 +46,23 @@ extern int openU(const char *, int, ...); /* MsvcLibX UTF-8 version of open */ #define O_TEXT LINUX_FILE_NO_TEXT_OPTION #endif +#if defined(WINDOWS) +typedef int32_t FileFd; +typedef int32_t SocketFd; +#else +typedef int32_t FileFd; +typedef int32_t SocketFd; +#endif + +typedef int32_t FileFd; + +typedef struct TdFile { + pthread_rwlock_t rwlock; + int refId; + FileFd fd; + FILE *fp; +} * TdFilePtr, TdFile; + #define FILE_WITH_LOCK 1 void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath) { diff --git a/source/os/src/osLocale.c b/source/os/src/osLocale.c index e9d6ed7c5424aff687175391942171f57c43c5f8..5f12f9cd3d5a2a75cd4080d1f35cdb892ea8e0b8 100644 --- a/source/os/src/osLocale.c +++ b/source/os/src/osLocale.c @@ -81,7 +81,7 @@ void taosSetSystemLocale(const char *inLocale, const char *inCharSet) { } if (!taosValidateEncodec(inCharSet)) { - printf("Invalid charset:%s, please set the valid charset in config file", inCharSet); + printf("Invalid charset:%s, please set the valid charset in config file\n", inCharSet); exit(-1); } } diff --git a/source/os/src/osString.c b/source/os/src/osString.c index 1052d108af43599ae2fb6706388a2ce9b4b2e115..07108ce34f81b0d02900878444ccb2ef9ebfca29 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -13,22 +13,77 @@ * along with this program. If not, see . */ +#define ALLOW_FORBID_FUNC #define _DEFAULT_SOURCE #include "os.h" -#include "tdef.h" -#include -#include +// #include "tdef.h" +// #include +// #include +#ifndef DISALLOW_NCHAR_WITHOUT_ICONV +#include "iconv.h" +#endif + +extern int wcwidth(wchar_t c); +extern int wcswidth(const wchar_t *s, size_t n); int64_t taosStr2int64(const char *str) { char *endptr = NULL; return strtoll(str, &endptr, 10); } -#ifdef USE_LIBICONV -#include "iconv.h" +bool taosCheckNcharValid(void) { +#ifdef DISALLOW_NCHAR_WITHOUT_ICONV + return false; +#else + return true; +#endif +} + +int32_t tasoUcs4Compare(TdUcs4 *f1_ucs4, TdUcs4 *f2_ucs4, int32_t bytes) { + for (int32_t i = 0; i < bytes; i += sizeof(TdUcs4)) { + int32_t f1 = *(int32_t *)((char *)f1_ucs4 + i); + int32_t f2 = *(int32_t *)((char *)f2_ucs4 + i); + + if ((f1 == 0 && f2 != 0) || (f1 != 0 && f2 == 0)) { + return f1 - f2; + } else if (f1 == 0 && f2 == 0) { + return 0; + } + + if (f1 != f2) { + return f1 - f2; + } + } + + return 0; + +#if 0 + int32_t ucs4_max_len = bytes + 4; + char *f1_mbs = calloc(bytes, 1); + char *f2_mbs = calloc(bytes, 1); + if (taosUcs4ToMbs(f1_ucs4, ucs4_max_len, f1_mbs) < 0) { + return -1; + } + if (taosUcs4ToMbs(f2_ucs4, ucs4_max_len, f2_mbs) < 0) { + return -1; + } + int32_t ret = strcmp(f1_mbs, f2_mbs); + free(f1_mbs); + free(f2_mbs); + return ret; +#endif +} -int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs) { + +TdUcs4* tasoUcs4Copy(TdUcs4 *target_ucs4, TdUcs4 *source_ucs4, int32_t len_ucs4) { + memcpy(target_ucs4, source_ucs4, len_ucs4*sizeof(TdUcs4)); +} + +int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs) { +#ifdef DISALLOW_NCHAR_WITHOUT_ICONV + return -1; +#else iconv_t cd = iconv_open(tsCharset, DEFAULT_UNICODE_ENCODEC); size_t ucs4_input_len = ucs4_max_len; size_t outLen = ucs4_max_len; @@ -39,14 +94,18 @@ int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs) { iconv_close(cd); return (int32_t)(ucs4_max_len - outLen); +#endif } -bool taosMbsToUcs4(char *mbs, size_t mbsLength, char *ucs4, int32_t ucs4_max_len, int32_t *len) { +bool taosMbsToUcs4(const char *mbs, size_t mbsLength, TdUcs4 *ucs4, int32_t ucs4_max_len, int32_t *len) { +#ifdef DISALLOW_NCHAR_WITHOUT_ICONV + return -1; +#else memset(ucs4, 0, ucs4_max_len); iconv_t cd = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); size_t ucs4_input_len = mbsLength; size_t outLeft = ucs4_max_len; - if (iconv(cd, &mbs, &ucs4_input_len, &ucs4, &outLeft) == -1) { + if (iconv(cd, (char**)&mbs, &ucs4_input_len, (char**)&ucs4, &outLeft) == -1) { iconv_close(cd); return false; } @@ -60,9 +119,13 @@ bool taosMbsToUcs4(char *mbs, size_t mbsLength, char *ucs4, int32_t ucs4_max_len } return true; +#endif } bool taosValidateEncodec(const char *encodec) { +#ifdef DISALLOW_NCHAR_WITHOUT_ICONV + return false; +#else iconv_t cd = iconv_open(encodec, DEFAULT_UNICODE_ENCODEC); if (cd == (iconv_t)(-1)) { return false; @@ -70,214 +133,11 @@ bool taosValidateEncodec(const char *encodec) { iconv_close(cd); return true; -} - -#else - -int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs) { - mbstate_t state = {0}; - int32_t len = (int32_t)wcsnrtombs(NULL, (const wchar_t **)&ucs4, ucs4_max_len / 4, 0, &state); - if (len < 0) { - return -1; - } - - memset(&state, 0, sizeof(state)); - len = wcsnrtombs(mbs, (const wchar_t **)&ucs4, ucs4_max_len / 4, (size_t)len, &state); - if (len < 0) { - return -1; - } - - return len; -} - -bool taosMbsToUcs4(const char *mbs, size_t mbsLength, char *ucs4, int32_t ucs4_max_len, int32_t *len) { - memset(ucs4, 0, ucs4_max_len); - mbstate_t state = {0}; - int32_t retlen = mbsnrtowcs((wchar_t *)ucs4, (const char **)&mbs, mbsLength, ucs4_max_len / 4, &state); - *len = retlen; - - return retlen >= 0; -} - -bool taosValidateEncodec(const char *encodec) { - return true; -} - -#endif - -#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) - -/* - * windows implementation - */ - -#ifdef HAVE_CONFIG_H -#include -#endif - -#include -#include -#include -#include -#include - -#if STDC_HEADERS -#include -#else -char *malloc(), *realloc(); #endif - -/* Always add at least this many bytes when extending the buffer. */ -#define MIN_CHUNK 64 - -/* Read up to (and including) a TERMINATOR from STREAM into *LINEPTR -+ OFFSET (and null-terminate it). *LINEPTR is a pointer returned from -malloc (or NULL), pointing to *N characters of space. It is realloc'd -as necessary. Return the number of characters read (not including the -null terminator), or -1 on error or EOF. On a -1 return, the caller -should check feof(), if not then errno has been set to indicate -the error. */ - -int32_t getstr(char **lineptr, size_t *n, FILE *stream, char terminator, int32_t offset) { - int32_t nchars_avail; /* Allocated but unused chars in *LINEPTR. */ - char * read_pos; /* Where we're reading into *LINEPTR. */ - int32_t ret; - - if (!lineptr || !n || !stream) { - errno = EINVAL; - return -1; - } - - if (!*lineptr) { - *n = MIN_CHUNK; - *lineptr = malloc(*n); - if (!*lineptr) { - errno = ENOMEM; - return -1; - } - } - - nchars_avail = (int32_t)(*n - offset); - read_pos = *lineptr + offset; - - for (;;) { - int32_t save_errno; - register int32_t c = getc(stream); - - save_errno = errno; - - /* We always want at least one char left in the buffer, since we - always (unless we get an error while reading the first char) - NUL-terminate the line buffer. */ - - assert((*lineptr + *n) == (read_pos + nchars_avail)); - if (nchars_avail < 2) { - if (*n > MIN_CHUNK) - *n *= 2; - else - *n += MIN_CHUNK; - - nchars_avail = (int32_t)(*n + *lineptr - read_pos); - char* lineptr1 = realloc(*lineptr, *n); - if (!lineptr1) { - errno = ENOMEM; - return -1; - } - *lineptr = lineptr1; - - read_pos = *n - nchars_avail + *lineptr; - assert((*lineptr + *n) == (read_pos + nchars_avail)); - } - - if (ferror(stream)) { - /* Might like to return partial line, but there is no - place for us to store errno. And we don't want to just - lose errno. */ - errno = save_errno; - return -1; - } - - if (c == EOF) { - /* Return partial line, if any. */ - if (read_pos == *lineptr) - return -1; - else - break; - } - - *read_pos++ = c; - nchars_avail--; - - if (c == terminator) /* Return the line. */ - break; - } - - /* Done - NUL terminate and return the number of chars read. */ - *read_pos = '\0'; - - ret = (int32_t)(read_pos - (*lineptr + offset)); - return ret; -} - -int32_t tgetline(char **lineptr, size_t *n, FILE *stream) { return getstr(lineptr, n, stream, '\n', 0); } - - -/* - * Get next token from string *stringp, where tokens are possibly-empty - * strings separated by characters from delim. - * - * Writes NULs into the string at *stringp to end tokens. - * delim need not remain constant from call to call. - * On return, *stringp points past the last NUL written (if there might - * be further tokens), or is NULL (if there are definitely no moretokens). - * - * If *stringp is NULL, strsep returns NULL. - */ -char *strsep(char **stringp, const char *delim) { - char * s; - const char *spanp; - int32_t c, sc; - char *tok; - if ((s = *stringp) == NULL) - return (NULL); - for (tok = s;;) { - c = *s++; - spanp = delim; - do { - if ((sc = *spanp++) == c) { - if (c == 0) - s = NULL; - else - s[-1] = 0; - *stringp = s; - return (tok); - } - } while (sc != 0); - } - /* NOTREACHED */ -} - -char *getpass(const char *prefix) { - static char passwd[TSDB_PASSWORD_LEN] = {0}; - memset(passwd, 0, TSDB_PASSWORD_LEN); - //printf("%s", prefix); - - int32_t index = 0; - char ch; - while (index < TSDB_PASSWORD_LEN) { - ch = getch(); - if (ch == '\n' || ch == '\r') { - break; - } else { - passwd[index++] = ch; - } - } - - return passwd; } -int32_t twcslen(const wchar_t *wcs) { - int32_t *wstr = (int32_t *)wcs; +int32_t taosUcs4len(TdUcs4 *ucs4) { + TdUcs4 *wstr = (TdUcs4 *)ucs4; if (NULL == wstr) { return 0; } @@ -292,73 +152,353 @@ int32_t twcslen(const wchar_t *wcs) { return n; } -int32_t tasoUcs4Compare(void *f1_ucs4, void *f2_ucs4, int32_t bytes) { - for (int32_t i = 0; i < bytes; i += TSDB_NCHAR_SIZE) { - int32_t f1 = *(int32_t *)((char *)f1_ucs4 + i); - int32_t f2 = *(int32_t *)((char *)f2_ucs4 + i); - if ((f1 == 0 && f2 != 0) || (f1 != 0 && f2 == 0)) { - return f1 - f2; - } else if (f1 == 0 && f2 == 0) { - return 0; - } +int32_t taosWcharWidth(TdWchar wchar) { return wcwidth(wchar); } - if (f1 != f2) { - return f1 - f2; - } - } +int32_t taosWcharsWidth(TdWchar *pWchar, int32_t size) { return wcswidth(pWchar, size); } - return 0; +int32_t taosMbToWchar(TdWchar *pWchar, const char *pStr, int32_t size) { return mbtowc(pWchar, pStr, size); } -#if 0 - int32_t ucs4_max_len = bytes + 4; - char *f1_mbs = calloc(bytes, 1); - char *f2_mbs = calloc(bytes, 1); - if (taosUcs4ToMbs(f1_ucs4, ucs4_max_len, f1_mbs) < 0) { - return -1; - } - if (taosUcs4ToMbs(f2_ucs4, ucs4_max_len, f2_mbs) < 0) { - return -1; - } - int32_t ret = strcmp(f1_mbs, f2_mbs); - free(f1_mbs); - free(f2_mbs); - return ret; -#endif -} - -/* Copy memory to memory until the specified number of bytes -has been copied, return pointer to following byte. -Overlap is NOT handled correctly. */ -void *mempcpy(void *dest, const void *src, size_t len) { - return (char*)memcpy(dest, src, len) + len; -} - -/* Copy SRC to DEST, returning the address of the terminating '\0' in DEST. */ -char *stpcpy (char *dest, const char *src) { - size_t len = strlen (src); - return (char*)memcpy(dest, src, len + 1) + len; -} - -/* Copy no more than N characters of SRC to DEST, returning the address of - the terminating '\0' in DEST, if any, or else DEST + N. */ -char *stpncpy (char *dest, const char *src, size_t n) { - size_t size = strnlen (src, n); - memcpy (dest, src, size); - dest += size; - if (size == n) - return dest; - return memset (dest, '\0', n - size); -} - -#else +int32_t taosMbsToWchars(TdWchar *pWchars, const char *pStrs, int32_t size) { return mbstowcs(pWchars, pStrs, size); } -/* - * linux and darwin implementation - */ +int32_t taosWcharToMb(char *pStr, TdWchar wchar) { return wctomb(pStr, wchar); } -int32_t tasoUcs4Compare(void *f1_ucs4, void *f2_ucs4, int32_t bytes, int8_t ncharSize) { - return wcsncmp((wchar_t *)f1_ucs4, (wchar_t *)f2_ucs4, bytes / ncharSize); -} - -#endif +int32_t taosWcharsToMbs(char *pStrs, TdWchar *pWchars, int32_t size) { return wcstombs(pStrs, pWchars, size); } + +// #ifdef USE_LIBICONV +// #include "iconv.h" + +// int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs) { +// iconv_t cd = iconv_open(tsCharset, DEFAULT_UNICODE_ENCODEC); +// size_t ucs4_input_len = ucs4_max_len; +// size_t outLen = ucs4_max_len; +// if (iconv(cd, (char **)&ucs4, &ucs4_input_len, &mbs, &outLen) == -1) { +// iconv_close(cd); +// return -1; +// } + +// iconv_close(cd); +// return (int32_t)(ucs4_max_len - outLen); +// } + +// bool taosMbsToUcs4(char *mbs, size_t mbsLength, char *ucs4, int32_t ucs4_max_len, int32_t *len) { +// memset(ucs4, 0, ucs4_max_len); +// iconv_t cd = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); +// size_t ucs4_input_len = mbsLength; +// size_t outLeft = ucs4_max_len; +// if (iconv(cd, &mbs, &ucs4_input_len, &ucs4, &outLeft) == -1) { +// iconv_close(cd); +// return false; +// } + +// iconv_close(cd); +// if (len != NULL) { +// *len = (int32_t)(ucs4_max_len - outLeft); +// if (*len < 0) { +// return false; +// } +// } + +// return true; +// } + +// bool taosValidateEncodec(const char *encodec) { +// iconv_t cd = iconv_open(encodec, DEFAULT_UNICODE_ENCODEC); +// if (cd == (iconv_t)(-1)) { +// return false; +// } + +// iconv_close(cd); +// return true; +// } + +// #else + +// int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs) { +// mbstate_t state = {0}; +// int32_t len = (int32_t)wcsnrtombs(NULL, (const wchar_t **)&ucs4, ucs4_max_len / 4, 0, &state); +// if (len < 0) { +// return -1; +// } + +// memset(&state, 0, sizeof(state)); +// len = wcsnrtombs(mbs, (const wchar_t **)&ucs4, ucs4_max_len / 4, (size_t)len, &state); +// if (len < 0) { +// return -1; +// } + +// return len; +// } + +// bool taosMbsToUcs4(const char *mbs, size_t mbsLength, char *ucs4, int32_t ucs4_max_len, int32_t *len) { +// memset(ucs4, 0, ucs4_max_len); +// mbstate_t state = {0}; +// int32_t retlen = mbsnrtowcs((wchar_t *)ucs4, (const char **)&mbs, mbsLength, ucs4_max_len / 4, &state); +// *len = retlen; + +// return retlen >= 0; +// } + +// bool taosValidateEncodec(const char *encodec) { +// return true; +// } + +// #endif + +// #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + +// /* +// * windows implementation +// */ + +// #ifdef HAVE_CONFIG_H +// #include +// #endif + +// #include +// #include +// #include +// #include +// #include + +// #if STDC_HEADERS +// #include +// #else +// char *malloc(), *realloc(); +// #endif + +// /* Always add at least this many bytes when extending the buffer. */ +// #define MIN_CHUNK 64 + +// /* Read up to (and including) a TERMINATOR from STREAM into *LINEPTR +// + OFFSET (and null-terminate it). *LINEPTR is a pointer returned from +// malloc (or NULL), pointing to *N characters of space. It is realloc'd +// as necessary. Return the number of characters read (not including the +// null terminator), or -1 on error or EOF. On a -1 return, the caller +// should check feof(), if not then errno has been set to indicate +// the error. */ + +// int32_t getstr(char **lineptr, size_t *n, FILE *stream, char terminator, int32_t offset) { +// int32_t nchars_avail; /* Allocated but unused chars in *LINEPTR. */ +// char * read_pos; /* Where we're reading into *LINEPTR. */ +// int32_t ret; + +// if (!lineptr || !n || !stream) { +// errno = EINVAL; +// return -1; +// } + +// if (!*lineptr) { +// *n = MIN_CHUNK; +// *lineptr = malloc(*n); +// if (!*lineptr) { +// errno = ENOMEM; +// return -1; +// } +// } + +// nchars_avail = (int32_t)(*n - offset); +// read_pos = *lineptr + offset; + +// for (;;) { +// int32_t save_errno; +// register int32_t c = getc(stream); + +// save_errno = errno; + +// /* We always want at least one char left in the buffer, since we +// always (unless we get an error while reading the first char) +// NUL-terminate the line buffer. */ + +// assert((*lineptr + *n) == (read_pos + nchars_avail)); +// if (nchars_avail < 2) { +// if (*n > MIN_CHUNK) +// *n *= 2; +// else +// *n += MIN_CHUNK; + +// nchars_avail = (int32_t)(*n + *lineptr - read_pos); +// char* lineptr1 = realloc(*lineptr, *n); +// if (!lineptr1) { +// errno = ENOMEM; +// return -1; +// } +// *lineptr = lineptr1; + +// read_pos = *n - nchars_avail + *lineptr; +// assert((*lineptr + *n) == (read_pos + nchars_avail)); +// } + +// if (ferror(stream)) { +// /* Might like to return partial line, but there is no +// place for us to store errno. And we don't want to just +// lose errno. */ +// errno = save_errno; +// return -1; +// } + +// if (c == EOF) { +// /* Return partial line, if any. */ +// if (read_pos == *lineptr) +// return -1; +// else +// break; +// } + +// *read_pos++ = c; +// nchars_avail--; + +// if (c == terminator) /* Return the line. */ +// break; +// } + +// /* Done - NUL terminate and return the number of chars read. */ +// *read_pos = '\0'; + +// ret = (int32_t)(read_pos - (*lineptr + offset)); +// return ret; +// } + +// int32_t tgetline(char **lineptr, size_t *n, FILE *stream) { return getstr(lineptr, n, stream, '\n', 0); } + + +// /* +// * Get next token from string *stringp, where tokens are possibly-empty +// * strings separated by characters from delim. +// * +// * Writes NULs into the string at *stringp to end tokens. +// * delim need not remain constant from call to call. +// * On return, *stringp points past the last NUL written (if there might +// * be further tokens), or is NULL (if there are definitely no moretokens). +// * +// * If *stringp is NULL, strsep returns NULL. +// */ +// char *strsep(char **stringp, const char *delim) { +// char * s; +// const char *spanp; +// int32_t c, sc; +// char *tok; +// if ((s = *stringp) == NULL) +// return (NULL); +// for (tok = s;;) { +// c = *s++; +// spanp = delim; +// do { +// if ((sc = *spanp++) == c) { +// if (c == 0) +// s = NULL; +// else +// s[-1] = 0; +// *stringp = s; +// return (tok); +// } +// } while (sc != 0); +// } +// /* NOTREACHED */ +// } + +// char *getpass(const char *prefix) { +// static char passwd[TSDB_PASSWORD_LEN] = {0}; +// memset(passwd, 0, TSDB_PASSWORD_LEN); +// //printf("%s", prefix); + +// int32_t index = 0; +// char ch; +// while (index < TSDB_PASSWORD_LEN) { +// ch = getch(); +// if (ch == '\n' || ch == '\r') { +// break; +// } else { +// passwd[index++] = ch; +// } +// } + +// return passwd; +// } + +// int32_t twcslen(const wchar_t *wcs) { +// int32_t *wstr = (int32_t *)wcs; +// if (NULL == wstr) { +// return 0; +// } + +// int32_t n = 0; +// while (1) { +// if (0 == *wstr++) { +// break; +// } +// n++; +// } + +// return n; +// } +// int32_t tasoUcs4Compare(void *f1_ucs4, void *f2_ucs4, int32_t bytes) { +// for (int32_t i = 0; i < bytes; i += TSDB_NCHAR_SIZE) { +// int32_t f1 = *(int32_t *)((char *)f1_ucs4 + i); +// int32_t f2 = *(int32_t *)((char *)f2_ucs4 + i); + +// if ((f1 == 0 && f2 != 0) || (f1 != 0 && f2 == 0)) { +// return f1 - f2; +// } else if (f1 == 0 && f2 == 0) { +// return 0; +// } + +// if (f1 != f2) { +// return f1 - f2; +// } +// } + +// return 0; + +// #if 0 +// int32_t ucs4_max_len = bytes + 4; +// char *f1_mbs = calloc(bytes, 1); +// char *f2_mbs = calloc(bytes, 1); +// if (taosUcs4ToMbs(f1_ucs4, ucs4_max_len, f1_mbs) < 0) { +// return -1; +// } +// if (taosUcs4ToMbs(f2_ucs4, ucs4_max_len, f2_mbs) < 0) { +// return -1; +// } +// int32_t ret = strcmp(f1_mbs, f2_mbs); +// free(f1_mbs); +// free(f2_mbs); +// return ret; +// #endif +// } + +// /* Copy memory to memory until the specified number of bytes +// has been copied, return pointer to following byte. +// Overlap is NOT handled correctly. */ +// void *mempcpy(void *dest, const void *src, size_t len) { +// return (char*)memcpy(dest, src, len) + len; +// } + +// /* Copy SRC to DEST, returning the address of the terminating '\0' in DEST. */ +// char *stpcpy (char *dest, const char *src) { +// size_t len = strlen (src); +// return (char*)memcpy(dest, src, len + 1) + len; +// } + +// /* Copy no more than N characters of SRC to DEST, returning the address of +// the terminating '\0' in DEST, if any, or else DEST + N. */ +// char *stpncpy (char *dest, const char *src, size_t n) { +// size_t size = strnlen (src, n); +// memcpy (dest, src, size); +// dest += size; +// if (size == n) +// return dest; +// return memset (dest, '\0', n - size); +// } + +// #else + +// /* +// * linux and darwin implementation +// */ + +// int32_t tasoUcs4Compare(void *f1_ucs4, void *f2_ucs4, int32_t bytes, int8_t ncharSize) { +// return wcsncmp((wchar_t *)f1_ucs4, (wchar_t *)f2_ucs4, bytes / ncharSize); +// } + +// #endif diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index 5e3cf1247c6941064a336cb14c5f51c2c6e4013e..ff7d2cf733c6af1043957d83d0dc24a8073db571 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -208,7 +208,7 @@ int32_t compareLenPrefixedWStr(const void *pLeft, const void *pRight) { if (len1 != len2) { return len1 > len2 ? 1 : -1; } else { - int32_t ret = memcmp((wchar_t *)pLeft, (wchar_t *)pRight, len1); + int32_t ret = memcmp((TdUcs4 *)pLeft, (TdUcs4 *)pRight, len1); if (ret == 0) { return 0; } else { @@ -295,10 +295,10 @@ int32_t patternMatch(const char *patterStr, const char *str, size_t size, const return (str[j] == 0 || j >= size) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH; } -int32_t WCSPatternMatch(const wchar_t *patterStr, const wchar_t *str, size_t size, const SPatternCompareInfo *pInfo) { - wchar_t c, c1; - wchar_t matchOne = L'_'; // "_" - wchar_t matchAll = L'%'; // "%" +int32_t WCSPatternMatch(const TdUcs4 *patterStr, const TdUcs4 *str, size_t size, const SPatternCompareInfo *pInfo) { + TdUcs4 c, c1; + TdUcs4 matchOne = L'_'; // "_" + TdUcs4 matchAll = L'%'; // "%" int32_t i = 0; int32_t j = 0; @@ -315,7 +315,7 @@ int32_t WCSPatternMatch(const wchar_t *patterStr, const wchar_t *str, size_t siz return TSDB_PATTERN_MATCH; } - wchar_t accept[3] = {towupper(c), towlower(c), 0}; + TdUcs4 accept[3] = {towupper(c), towlower(c), 0}; while (1) { size_t n = wcscspn(str, accept); @@ -424,10 +424,10 @@ int32_t compareWStrPatternMatch(const void *pLeft, const void *pRight) { assert(varDataLen(pRight) <= TSDB_MAX_FIELD_LEN * TSDB_NCHAR_SIZE); - wchar_t *pattern = calloc(varDataLen(pRight) + 1, sizeof(wchar_t)); + char *pattern = calloc(varDataLen(pRight) + TSDB_NCHAR_SIZE, 1); memcpy(pattern, varDataVal(pRight), varDataLen(pRight)); - int32_t ret = WCSPatternMatch(pattern, (const wchar_t *)varDataVal(pLeft), varDataLen(pLeft) / TSDB_NCHAR_SIZE, &pInfo); + int32_t ret = WCSPatternMatch((TdUcs4*)pattern, (TdUcs4*)varDataVal(pLeft), varDataLen(pLeft) / TSDB_NCHAR_SIZE, &pInfo); free(pattern); return (ret == TSDB_PATTERN_MATCH) ? 0 : 1; @@ -647,7 +647,7 @@ int32_t doCompare(const char *f1, const char *f2, int32_t type, size_t size) { if (t1->len != t2->len) { return t1->len > t2->len ? 1 : -1; } - int32_t ret = memcmp((wchar_t *)t1, (wchar_t *)t2, t2->len); + int32_t ret = memcmp((TdUcs4 *)t1, (TdUcs4 *)t2, t2->len); if (ret == 0) { return ret; } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index cafca76761bcf4924b0f997ef5f7de570c147c65..0c2b4fe1f4dad86661e70b06282b3cd8e9db86a9 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -18,4 +18,7 @@ # ---- insert ./test.sh -f tsim/insert/basic0.sim + +# ---- query +./test.sh -f tsim/query/interval.sim #======================b1-end=============== diff --git a/tests/script/tsim/insert/basic1.sim b/tests/script/tsim/insert/basic1.sim new file mode 100644 index 0000000000000000000000000000000000000000..3fc635532ac409000988808249d0310b7096c5e7 --- /dev/null +++ b/tests/script/tsim/insert/basic1.sim @@ -0,0 +1,94 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print =============== create database +sql create database d1 +sql show databases +if $rows != 1 then + return -1 +endi + +print $data00 $data01 $data02 + +sql use d1 + +print =============== create super table, include all type +sql create table if not exists stb (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(16), c9 nchar(16), c10 timestamp, c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned) tags (t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 binary(16), t9 nchar(16), t10 timestamp, t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned) + +sql create stable if not exists stb_1 (ts timestamp, i int) tags (j int) +sql create table stb_2 (ts timestamp, i int) tags (j int) +sql create stable stb_3 (ts timestamp, i int) tags (j int) + +sql show stables +if $rows != 4 then + return -1 +endi + +print =============== create child table +sql create table c1 using stb tags(true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) +sql create table c2 using stb tags(false, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 2', 'child tbl 2', '2022-02-25 18:00:00.000', 10, 20, 30, 40) + +sql show tables +if $rows != 2 then + return -1 +endi + + +print =============== insert data, mode1: one row one table in sql +print =============== insert data, mode1: mulit rows one table in sql +print =============== insert data, mode1: one rows mulit table in sql +print =============== insert data, mode1: mulit rows mulit table in sql +sql insert into c1 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) +sql insert into c1 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+2s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) + +print =============== query data +sql select * from c1 +if $rows != 4 then + return -1 +endi + +if $data01 != true then + return -1 +endi + +if $data02 != -1 then + return -1 +endi + +if $data03 != -2 then + return -1 +endi + +print =============== query data from st +sql select * from st +if $rows != 4 then + return -1 +endi + +print =============== stop and restart taosd +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s start + +sleep 2000 +print =============== query data +sql select * from c1 +if $rows != 4 then + return -1 +endi + +if $data01 != true then + return -1 +endi + +if $data02 != -1 then + return -1 +endi + +if $data03 != -2 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/query/interval.sim b/tests/script/tsim/query/interval.sim index 35e7c938d87834f0795b57124d81f245519aef26..47be71bdc6f54a81c94066c1cd62eb393f030b5c 100644 --- a/tests/script/tsim/query/interval.sim +++ b/tests/script/tsim/query/interval.sim @@ -47,161 +47,137 @@ $i = 1 $tb = $tbPrefix . $i sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb interval(1m) -print ===> $rows -if $rows < $rowNum then - return -1 -endi -if $data01 != 1 then - return -1 -endi -if $data05 != 1 then - return -1 -endi - -print =============== step3 -$cc = 4 * 60000 -$ms = 1601481600000 + $cc -sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms interval(1m) -print ===> $rows -if $rows > 10 then - return -1 -endi -if $rows < 3 then - return -1 -endi -if $data01 != 1 then - return -1 -endi -if $data05 != 1 then - return -1 -endi - -print =============== step4 -$cc = 40 * 60000 -$ms = 1601481600000 + $cc - -$cc = 1 * 60000 -$ms2 = 1601481600000 - $cc - -sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m) -print ===> $rows -if $rows < 18 then - return -1 -endi -if $rows > 22 then - return -1 -endi -if $data01 != 1 then - return -1 -endi -if $data05 != 1 then - return -1 -endi - -print =============== step5 -$cc = 40 * 60000 -$ms = 1601481600000 + $cc - -$cc = 1 * 60000 -$ms2 = 1601481600000 - $cc - -sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m) fill(value,0) -print ===> $rows -if $rows < 30 then - return -1 -endi -if $rows > 50 then - return -1 -endi -if $data21 != 1 then - return -1 -endi -if $data25 != 1 then - return -1 -endi - -print =============== step6 -sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt interval(1m) -print ===> $rows -if $rows < 18 then - return -1 -endi -if $rows > 22 then - return -1 -endi -if $data11 > 15 then - return -1 -endi -if $data11 < 5 then - return -1 -endi - -print =============== step7 -$cc = 4 * 60000 -$ms = 1601481600000 + $cc -sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms interval(1m) -print ===> $rows -if $rows < 3 then - return -1 -endi -if $rows > 7 then - return -1 -endi -if $data11 > 15 then - return -1 -endi -if $data11 < 5 then - return -1 -endi - -print =============== step8 -$cc = 40 * 60000 -$ms1 = 1601481600000 + $cc - -$cc = 1 * 60000 -$ms2 = 1601481600000 - $cc - -sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m) -print ===> $rows -if $rows < 18 then - return -1 -endi -if $rows > 22 then - return -1 -endi -if $data11 > 15 then - return -1 -endi -if $data11 < 5 then - return -1 -endi - -print =============== step9 -$cc = 40 * 60000 -$ms1 = 1601481600000 + $cc - -$cc = 1 * 60000 -$ms2 = 1601481600000 - $cc - -sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m) fill(value, 0) -if $rows < 30 then - return -1 -endi -if $rows > 50 then - return -1 -endi -if $data11 > 15 then - return -1 -endi -if $data11 < 5 then - return -1 -endi +print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb interval(1m) +print ===> $rows $data01 $data05 +if $rows != $rowNum then + return -1 +endi +if $data00 != 1 then + return -1 +endi +if $data04 != 1 then + return -1 +endi + +#print =============== step3 +#$cc = 4 * 60000 +#$ms = 1601481600000 + $cc +#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms interval(1m) +#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms interval(1m) +#print ===> $rows $data01 $data05 +#if $rows != 5 then +# return -1 +#endi +#if $data00 != 1 then +# return -1 +#endi +#if $data04 != 1 then +# return -1 +#endi + +#print =============== step4 +#$cc = 40 * 60000 +#$ms = 1601481600000 + $cc + +#$cc = 1 * 60000 +#$ms2 = 1601481600000 - $cc + +#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m) +#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m) +#print ===> $rows $data01 $data05 +#if $rows != 20 then +# return -1 +#endi +#if $data00 != 1 then +# return -1 +#endi +#if $data04 != 1 then +# return -1 +#endi + +#print =============== step5 +#$cc = 40 * 60000 +#$ms = 1601481600000 + $cc + +#$cc = 1 * 60000 +#$ms2 = 1601481600000 - $cc + +#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m) fill(value,0) +#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m) fill(value,0) +#print ===> $rows $data21 $data25 +#if $rows != 42 then +# return -1 +#endi +#if $data20 != 1 then +# return -1 +#endi +#if $data24 != 1 then +# return -1 +#endi + +#print =============== step6 +#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt interval(1m) +#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt interval(1m) +#print ===> $rows $data11 +#if $rows != 20 then +# return -1 +#endi +#if $data11 != 10 then +# return -1 +#endi + +#print =============== step7 +#$cc = 4 * 60000 +#$ms = 1601481600000 + $cc +#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms interval(1m) +#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms interval(1m) +#print ===> $rows $data11 +#if $rows != 5 then +# return -1 +#endi +#if $data11 != 10 then +# return -1 +#endi + +#print =============== step8 +#$cc = 40 * 60000 +#$ms1 = 1601481600000 + $cc +# +#$cc = 1 * 60000 +#$ms2 = 1601481600000 - $cc +# +#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m) +#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m) +#print ===> $rows $data11 +#if $rows != 20 then +# return -1 +#endi +#if $data11 != 10 then +# return -1 +#endi +# +#print =============== step9 +#$cc = 40 * 60000 +#$ms1 = 1601481600000 + $cc +# +#$cc = 1 * 60000 +#$ms2 = 1601481600000 - $cc +# +#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m) fill(value, 0) +#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m) fill(value, 0) +#print ===> $rows $data11 +#if $rows != 42 then +# return -1 +#endi +#if $data11 != 10 then +# return -1 +#endi print =============== clear -sql drop database $db -sql show databases -if $rows != 0 then - return -1 -endi +#sql drop database $db +#sql show databases +#if $rows != 0 then +# return -1 +#endi -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +#system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tools/shell/src/backup/shellDarwin.c b/tools/shell/src/backup/shellDarwin.c index e4cf09358bc1b0b6e4777577373288c577477efc..d7a976d52cc6179726cf80645b28f5bb30bc6fac 100644 --- a/tools/shell/src/backup/shellDarwin.c +++ b/tools/shell/src/backup/shellDarwin.c @@ -28,7 +28,6 @@ int indicator = 1; struct termios oldtio; -extern int wcwidth(wchar_t c); void insertChar(Command *cmd, char *c, int size); @@ -426,7 +425,7 @@ void showOnScreen(Command *cmd) { w.ws_row = 30; } - wchar_t wc; + TdWchar wc; int size = 0; // Print out the command. @@ -441,11 +440,11 @@ void showOnScreen(Command *cmd) { int remain_column = w.ws_col; /* size = cmd->commandSize + prompt_size; */ for (char *str = total_string; size < cmd->commandSize + prompt_size;) { - int ret = mbtowc(&wc, str, MB_CUR_MAX); + int ret = taosMbToWchar(&wc, str, MB_CUR_MAX); if (ret < 0) break; size += ret; /* assert(size >= 0); */ - int width = wcwidth(wc); + int width = taosWcharWidth(wc); if (remain_column > width) { printf("%lc", wc); remain_column -= width; diff --git a/tools/shell/src/shellCommand.c b/tools/shell/src/shellCommand.c index cf0ceded38307e5bd138a77c19203e882f0662e4..fd993998b823411c666194d9f2bb9c6390baa2c4 100644 --- a/tools/shell/src/shellCommand.c +++ b/tools/shell/src/shellCommand.c @@ -21,8 +21,6 @@ #include -extern int wcwidth(wchar_t c); -extern int wcswidth(const wchar_t *s, size_t n); typedef struct { char widthInString; char widthOnScreen; @@ -43,7 +41,7 @@ int countPrefixOnes(unsigned char c) { void getPrevCharSize(const char *str, int pos, int *size, int *width) { assert(pos > 0); - wchar_t wc; + TdWchar wc; *size = 0; *width = 0; @@ -53,25 +51,25 @@ void getPrevCharSize(const char *str, int pos, int *size, int *width) { if (str[pos] > 0 || countPrefixOnes((unsigned char )str[pos]) > 1) break; } - int rc = mbtowc(&wc, str + pos, MB_CUR_MAX); + int rc = taosMbToWchar(&wc, str + pos, MB_CUR_MAX); assert(rc == *size); - *width = wcwidth(wc); + *width = taosWcharWidth(wc); } void getNextCharSize(const char *str, int pos, int *size, int *width) { assert(pos >= 0); - wchar_t wc; - *size = mbtowc(&wc, str + pos, MB_CUR_MAX); - *width = wcwidth(wc); + TdWchar wc; + *size = taosMbToWchar(&wc, str + pos, MB_CUR_MAX); + *width = taosWcharWidth(wc); } void insertChar(Command *cmd, char *c, int size) { assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset); - wchar_t wc; - if (mbtowc(&wc, c, size) < 0) return; + TdWchar wc; + if (taosMbToWchar(&wc, c, size) < 0) return; clearScreen(cmd->endOffset + prompt_size, cmd->screenOffset + prompt_size); /* update the buffer */ @@ -81,8 +79,8 @@ void insertChar(Command *cmd, char *c, int size) { /* update the values */ cmd->commandSize += size; cmd->cursorOffset += size; - cmd->screenOffset += wcwidth(wc); - cmd->endOffset += wcwidth(wc); + cmd->screenOffset += taosWcharWidth(wc); + cmd->endOffset += taosWcharWidth(wc); showOnScreen(cmd); } @@ -249,10 +247,10 @@ int isReadyGo(Command *cmd) { } void getMbSizeInfo(const char *str, int *size, int *width) { - wchar_t *wc = (wchar_t *)calloc(sizeof(wchar_t), MAX_COMMAND_SIZE); + TdWchar *wc = (TdWchar *)calloc(sizeof(TdWchar), MAX_COMMAND_SIZE); *size = strlen(str); - mbstowcs(wc, str, MAX_COMMAND_SIZE); - *width = wcswidth(wc, MAX_COMMAND_SIZE); + taosMbsToWchars(wc, str, MAX_COMMAND_SIZE); + *width = taosWcharsWidth(wc, MAX_COMMAND_SIZE); free(wc); } diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 5a989937d8cfd41050bca0a077947a6feda9514b..1b35afb57d231c07181499fa43c66cbad137e72c 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -560,12 +560,12 @@ static int dumpResultToFile(const char *fname, TAOS_RES *tres) { } static void shellPrintNChar(const char *str, int length, int width) { - wchar_t tail[3]; + TdWchar tail[3]; int pos = 0, cols = 0, totalCols = 0, tailLen = 0; while (pos < length) { - wchar_t wc; - int bytes = mbtowc(&wc, str + pos, MB_CUR_MAX); + TdWchar wc; + int bytes = taosMbToWchar(&wc, str + pos, MB_CUR_MAX); if (bytes == 0) { break; } @@ -577,7 +577,7 @@ static void shellPrintNChar(const char *str, int length, int width) { #ifdef WINDOWS int w = bytes; #else - int w = wcwidth(wc); + int w = taosWcharWidth(wc); #endif if (w <= 0) { continue; diff --git a/tools/shell/src/shellLinux.c b/tools/shell/src/shellLinux.c index cc497688d1bfcea14aba2e6869984962b305b075..6da05f28dfbf1c74ea6fa4aeb111053c89e80160 100644 --- a/tools/shell/src/shellLinux.c +++ b/tools/shell/src/shellLinux.c @@ -31,7 +31,6 @@ int indicator = 1; struct termios oldtio; -extern int wcwidth(wchar_t c); void insertChar(Command *cmd, char *c, int size); const char *argp_program_version = version; const char *argp_program_bug_address = ""; @@ -456,7 +455,7 @@ void showOnScreen(Command *cmd) { w.ws_row = 30; } - wchar_t wc; + TdWchar wc; int size = 0; // Print out the command. @@ -471,11 +470,11 @@ void showOnScreen(Command *cmd) { int remain_column = w.ws_col; /* size = cmd->commandSize + prompt_size; */ for (char *str = total_string; size < cmd->commandSize + prompt_size;) { - int ret = mbtowc(&wc, str, MB_CUR_MAX); + int ret = taosMbToWchar(&wc, str, MB_CUR_MAX); if (ret < 0) break; size += ret; /* assert(size >= 0); */ - int width = wcwidth(wc); + int width = taosWcharWidth(wc); if (remain_column > width) { printf("%lc", wc); remain_column -= width;