提交 eac1ccb5 编写于 作者: X Xiaoyu Wang

merge origin/3.0

......@@ -18,6 +18,13 @@ IF(${TD_WINDOWS})
ON
)
MESSAGE("build iconv Win32")
option(
BUILD_WITH_ICONV
"If build iconv on Windows"
ON
)
ENDIF ()
IF(${TD_LINUX} MATCHES TRUE)
......
# iconv
ExternalProject_Add(iconv
GIT_REPOSITORY https://github.com/win-iconv/win-iconv.git
GIT_TAG v0.0.8
SOURCE_DIR "${CMAKE_CONTRIB_DIR}/iconv"
BINARY_DIR ""
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)
\ No newline at end of file
......@@ -83,6 +83,11 @@ if(${BUILD_WITH_NURAFT})
cat("${CMAKE_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_NURAFT})
# iconv
if(${BUILD_WITH_ICONV})
cat("${CMAKE_SUPPORT_DIR}/iconv_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_ICONV})
# download dependencies
configure_file(${CONTRIB_TMP_FILE} "${CMAKE_CONTRIB_DIR}/deps-download/CMakeLists.txt")
execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
......@@ -208,14 +213,10 @@ endif(${BUILD_WITH_TRAFT})
# LIBUV
if(${BUILD_WITH_UV})
if (NOT ${CMAKE_SYSTEM_NAME} MATCHES "Windows")
MESSAGE("Windows need set no-sign-compare")
add_compile_options(-Wno-sign-compare)
endif ()
if (${CMAKE_SYSTEM_NAME} MATCHES "Windows")
file(READ "libuv/include/uv.h" CONTENTS)
string(REGEX REPLACE "/([\r]*)\nstruct uv_tcp_s {" "/\\1\ntypedef BOOL (PASCAL *LPFN_CONNECTEX) (SOCKET s, const struct sockaddr* name, int namelen, PVOID lpSendBuffer, DWORD dwSendDataLength,LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped);\\1\nstruct uv_tcp_s {" CONTENTS_NEW "${CONTENTS}")
file(WRITE "libuv/include/uv.h" "${CONTENTS_NEW}")
if (${TD_WINDOWS})
file(READ "libuv/include/uv.h" CONTENTS)
string(REGEX REPLACE "/([\r]*)\nstruct uv_tcp_s {" "/\\1\ntypedef BOOL (PASCAL *LPFN_CONNECTEX) (SOCKET s, const struct sockaddr* name, int namelen, PVOID lpSendBuffer, DWORD dwSendDataLength,LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped);\\1\nstruct uv_tcp_s {" CONTENTS_NEW "${CONTENTS}")
file(WRITE "libuv/include/uv.h" "${CONTENTS_NEW}")
endif ()
add_subdirectory(libuv)
endif(${BUILD_WITH_UV})
......@@ -249,10 +250,14 @@ endif(${BUILD_WITH_SQLITE})
# pthread
if(${BUILD_PTHREAD})
ADD_DEFINITIONS("-DPTW32_STATIC_LIB")
add_subdirectory(pthread-win32)
add_definitions(-DPTW32_STATIC_LIB)
add_subdirectory(pthread)
endif(${BUILD_PTHREAD})
# iconv
if(${BUILD_WITH_ICONV})
add_subdirectory(iconv)
endif(${BUILD_WITH_ICONV})
# ================================================================================================
# Build test
......
......@@ -198,6 +198,11 @@ typedef struct {
};
} SMsgHead;
typedef struct {
int32_t workerType;
int32_t streamTaskId;
} SStreamExecMsgHead;
// Submit message for one table
typedef struct SSubmitBlk {
int64_t uid; // table unique id
......@@ -1892,9 +1897,9 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW)
return buf;
}
typedef struct {
int8_t version; // for compatibility(default 0)
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
int8_t version; // for compatibility(default 0)
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
char indexName[TSDB_INDEX_NAME_LEN];
char timezone[TD_TIMEZONE_LEN]; // sma data expired if timezone changes.
int32_t exprLen;
......@@ -1902,7 +1907,7 @@ typedef struct {
int64_t indexUid;
tb_uid_t tableUid; // super/child/common table uid
int64_t interval;
int64_t offset; // use unit by precision of DB
int64_t offset; // use unit by precision of DB
int64_t sliding;
char* expr; // sma expression
char* tagsFilter;
......@@ -2048,27 +2053,19 @@ static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) {
buf = taosDecodeFixedI64(buf, &pSma->sliding);
if (pSma->exprLen > 0) {
pSma->expr = (char*)calloc(pSma->exprLen, 1);
if (pSma->expr != NULL) {
buf = taosDecodeStringTo(buf, pSma->expr);
} else {
if ((buf = taosDecodeString(buf, &pSma->expr)) == NULL) {
tdDestroyTSma(pSma);
return NULL;
}
} else {
pSma->expr = NULL;
}
if (pSma->tagsFilterLen > 0) {
pSma->tagsFilter = (char*)calloc(pSma->tagsFilterLen, 1);
if (pSma->tagsFilter != NULL) {
buf = taosDecodeStringTo(buf, pSma->tagsFilter);
} else {
if ((buf = taosDecodeString(buf, &pSma->tagsFilter)) == NULL) {
tdDestroyTSma(pSma);
return NULL;
}
} else {
pSma->tagsFilter = NULL;
}
......@@ -2311,7 +2308,7 @@ typedef struct {
} SStreamTaskDeployRsp;
typedef struct {
SMsgHead head;
SStreamExecMsgHead head;
// TODO: other info needed by task
} SStreamTaskExecReq;
......
......@@ -31,7 +31,7 @@ typedef struct SVariant {
uint64_t u;
double d;
char *pz;
wchar_t *wpz;
TdUcs4 *ucs4;
SArray *arr; // only for 'in' query to hold value list, not value for a field
};
} SVariant;
......
......@@ -22,15 +22,6 @@ extern "C" {
#include "osSocket.h"
#if defined(WINDOWS)
typedef int32_t FileFd;
typedef int32_t SocketFd;
#else
typedef int32_t FileFd;
typedef int32_t SocketFd;
#endif
int64_t taosRead(FileFd fd, void *buf, int64_t count);
// If the error is in a third-party library, place this header file under the third-party library header file.
#ifndef ALLOW_FORBID_FUNC
#define open OPEN_FUNC_TAOS_FORBID
......@@ -42,6 +33,7 @@ int64_t taosRead(FileFd fd, void *buf, int64_t count);
#define close CLOSE_FUNC_TAOS_FORBID
#define fclose FCLOSE_FUNC_TAOS_FORBID
#define fsync FSYNC_FUNC_TAOS_FORBID
#define getline GETLINE_FUNC_TAOS_FORBID
// #define fflush FFLUSH_FUNC_TAOS_FORBID
#endif
......@@ -49,15 +41,6 @@ int64_t taosRead(FileFd fd, void *buf, int64_t count);
#define PATH_MAX 256
#endif
typedef int32_t FileFd;
typedef struct TdFile {
pthread_rwlock_t rwlock;
int refId;
FileFd fd;
FILE *fp;
} * TdFilePtr, TdFile;
typedef struct TdFile *TdFilePtr;
#define TD_FILE_CTEATE 0x0001
......@@ -95,10 +78,6 @@ int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset)
int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count);
void taosFprintfFile(TdFilePtr pFile, const char *format, ...);
#if defined(WINDOWS)
#define __restrict__
#endif // WINDOWS
int64_t taosGetLineFile(TdFilePtr pFile, char ** __restrict__ ptrBuf);
int32_t taosEOFFile(TdFilePtr pFile);
......@@ -111,15 +90,7 @@ int32_t taosRemoveFile(const char *path);
void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath);
#if defined(_TD_DARWIN_64)
typedef int32_t SocketFd;
int64_t taosSendFile(SocketFd fdDst, FileFd pFileSrc, int64_t *offset, int64_t size);
int64_t taosFSendFile(FILE *pFileOut, FILE *pFileIn, int64_t *offset, int64_t size);
#else
int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size);
int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size);
#endif
void *taosMmapReadOnlyFile(TdFilePtr pFile, int64_t length);
bool taosValidFile(TdFilePtr pFile);
......
......@@ -20,16 +20,28 @@
extern "C" {
#endif
typedef wchar_t TdWchar;
typedef int32_t TdUcs4;
// If the error is in a third-party library, place this header file under the third-party library header file.
#ifndef ALLOW_FORBID_FUNC
#define iconv_open ICONV_OPEN_FUNC_TAOS_FORBID
#define iconv_close ICONV_CLOSE_FUNC_TAOS_FORBID
#define iconv ICONV_FUNC_TAOS_FORBID
#define wcwidth WCWIDTH_FUNC_TAOS_FORBID
#define wcswidth WCSWIDTH_FUNC_TAOS_FORBID
#define mbtowc MBTOWC_FUNC_TAOS_FORBID
#define mbstowcs MBSTOWCS_FUNC_TAOS_FORBID
#define wctomb WCTOMB_FUNC_TAOS_FORBID
#define wcstombs WCSTOMBS_FUNC_TAOS_FORBID
#define wcsncpy WCSNCPY_FUNC_TAOS_FORBID
#define wchar_t WCHAR_T_FUNC_TAOS_FORBID
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#define tstrdup(str) _strdup(str)
#define tstrndup(str, size) _strndup(str, size)
int32_t tgetline(char **lineptr, size_t *n, FILE *stream);
int32_t twcslen(const wchar_t *wcs);
#else
#define tstrdup(str) strdup(str)
#define tstrndup(str, size) strndup(str, size)
#define tgetline(lineptr, n, stream) getline(lineptr, n, stream)
#define twcslen wcslen
#endif
#define tstrncpy(dst, src, size) \
......@@ -38,14 +50,22 @@ extern "C" {
(dst)[(size)-1] = 0; \
} while (0)
int32_t taosUcs4len(TdUcs4 *ucs4);
int64_t taosStr2int64(const char *str);
// USE_LIBICONV
int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs);
bool taosMbsToUcs4(const char *mbs, size_t mbs_len, char *ucs4, int32_t ucs4_max_len, int32_t *len);
int32_t tasoUcs4Compare(void *f1_ucs4, void *f2_ucs4, int32_t bytes, int8_t ncharSize);
int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs);
bool taosMbsToUcs4(const char *mbs, size_t mbs_len, TdUcs4 *ucs4, int32_t ucs4_max_len, int32_t *len);
int32_t tasoUcs4Compare(TdUcs4 *f1_ucs4, TdUcs4 *f2_ucs4, int32_t bytes);
TdUcs4* tasoUcs4Copy(TdUcs4 *target_ucs4, TdUcs4 *source_ucs4, int32_t len_ucs4);
bool taosValidateEncodec(const char *encodec);
int32_t taosWcharWidth(TdWchar wchar);
int32_t taosWcharsWidth(TdWchar *pWchar, int32_t size);
int32_t taosMbToWchar(TdWchar *pWchar, const char *pStr, int32_t size);
int32_t taosMbsToWchars(TdWchar *pWchars, const char *pStrs, int32_t size);
int32_t taosWcharToMb(char *pStr, TdWchar wchar);
int32_t taosWcharsToMbs(char *pStrs, TdWchar *pWchars, int32_t size);
#ifdef __cplusplus
}
#endif
......
......@@ -46,7 +46,7 @@ typedef struct SPatternCompareInfo {
int32_t patternMatch(const char *pattern, const char *str, size_t size, const SPatternCompareInfo *pInfo);
int32_t WCSPatternMatch(const wchar_t *pattern, const wchar_t *str, size_t size, const SPatternCompareInfo *pInfo);
int32_t WCSPatternMatch(const TdUcs4 *pattern, const TdUcs4 *str, size_t size, const SPatternCompareInfo *pInfo);
int32_t taosArrayCompareString(const void *a, const void *b);
......
......@@ -41,7 +41,7 @@ extern const int32_t TYPE_BYTES[15];
#define DOUBLE_BYTES sizeof(double)
#define POINTER_BYTES sizeof(void *) // 8 by default assert(sizeof(ptrdiff_t) == sizseof(void*)
#define TSDB_KEYSIZE sizeof(TSKEY)
#define TSDB_NCHAR_SIZE sizeof(int32_t)
#define TSDB_NCHAR_SIZE sizeof(TdUcs4)
// NULL definition
#define TSDB_DATA_BOOL_NULL 0x02
......@@ -448,6 +448,11 @@ typedef struct {
#define SND_UNIQUE_THREAD_NUM 2
#define SND_SHARED_THREAD_NUM 2
enum {
SND_WORKER_TYPE__SHARED = 1,
SND_WORKER_TYPE__UNIQUE,
};
#ifdef __cplusplus
}
#endif
......
......@@ -199,8 +199,8 @@ void taosVariantCreateFromBinary(SVariant *pVar, const char *pz, size_t len, uin
case TSDB_DATA_TYPE_NCHAR: { // here we get the nchar length from raw binary bits length
size_t lenInwchar = len / TSDB_NCHAR_SIZE;
pVar->wpz = calloc(1, (lenInwchar + 1) * TSDB_NCHAR_SIZE);
memcpy(pVar->wpz, pz, lenInwchar * TSDB_NCHAR_SIZE);
pVar->ucs4 = calloc(1, (lenInwchar + 1) * TSDB_NCHAR_SIZE);
memcpy(pVar->ucs4, pz, lenInwchar * TSDB_NCHAR_SIZE);
pVar->nLen = (int32_t)len;
break;
......@@ -343,7 +343,7 @@ int32_t taosVariantToString(SVariant *pVar, char *dst) {
case TSDB_DATA_TYPE_NCHAR: {
dst[0] = '\'';
taosUcs4ToMbs(pVar->wpz, (twcslen(pVar->wpz) + 1) * TSDB_NCHAR_SIZE, dst + 1);
taosUcs4ToMbs(pVar->ucs4, (taosUcs4len(pVar->ucs4) + 1) * TSDB_NCHAR_SIZE, dst + 1);
int32_t len = (int32_t)strlen(dst);
dst[len] = '\'';
dst[len + 1] = 0;
......@@ -384,7 +384,7 @@ static FORCE_INLINE int32_t convertToBoolImpl(char *pStr, int32_t len) {
}
}
static FORCE_INLINE int32_t wcsconvertToBoolImpl(wchar_t *pstr, int32_t len) {
static FORCE_INLINE int32_t wcsconvertToBoolImpl(TdUcs4 *pstr, int32_t len) {
if ((wcsncasecmp(pstr, L"true", len) == 0) && (len == 4)) {
return TSDB_TRUE;
} else if (wcsncasecmp(pstr, L"false", len) == 0 && (len == 5)) {
......@@ -412,11 +412,11 @@ static int32_t toBinary(SVariant *pVariant, char **pDest, int32_t *pDestSize) {
pBuf = realloc(pBuf, newSize + 1);
}
taosUcs4ToMbs(pVariant->wpz, (int32_t)newSize, pBuf);
free(pVariant->wpz);
taosUcs4ToMbs(pVariant->ucs4, (int32_t)newSize, pBuf);
free(pVariant->ucs4);
pBuf[newSize] = 0;
} else {
taosUcs4ToMbs(pVariant->wpz, (int32_t)newSize, *pDest);
taosUcs4ToMbs(pVariant->ucs4, (int32_t)newSize, *pDest);
}
} else {
......@@ -460,8 +460,8 @@ static int32_t toNchar(SVariant *pVariant, char **pDest, int32_t *pDestSize) {
}
if (*pDest == pVariant->pz) {
wchar_t *pWStr = calloc(1, (nLen + 1) * TSDB_NCHAR_SIZE);
bool ret = taosMbsToUcs4(pDst, nLen, (char *)pWStr, (nLen + 1) * TSDB_NCHAR_SIZE, NULL);
TdUcs4 *pWStr = calloc(1, (nLen + 1) * TSDB_NCHAR_SIZE);
bool ret = taosMbsToUcs4(pDst, nLen, pWStr, (nLen + 1) * TSDB_NCHAR_SIZE, NULL);
if (!ret) {
tfree(pWStr);
return -1;
......@@ -469,21 +469,21 @@ static int32_t toNchar(SVariant *pVariant, char **pDest, int32_t *pDestSize) {
// free the binary buffer in the first place
if (pVariant->nType == TSDB_DATA_TYPE_BINARY) {
free(pVariant->wpz);
free(pVariant->ucs4);
}
pVariant->wpz = pWStr;
*pDestSize = twcslen(pVariant->wpz);
pVariant->ucs4 = pWStr;
*pDestSize = taosUcs4len(pVariant->ucs4);
// shrink the allocate memory, no need to check here.
char *tmp = realloc(pVariant->wpz, (*pDestSize + 1) * TSDB_NCHAR_SIZE);
char *tmp = realloc(pVariant->ucs4, (*pDestSize + 1) * TSDB_NCHAR_SIZE);
assert(tmp != NULL);
pVariant->wpz = (wchar_t *)tmp;
pVariant->ucs4 = (TdUcs4 *)tmp;
} else {
int32_t output = 0;
bool ret = taosMbsToUcs4(pDst, nLen, *pDest, (nLen + 1) * TSDB_NCHAR_SIZE, &output);
bool ret = taosMbsToUcs4(pDst, nLen, (TdUcs4*)*pDest, (nLen + 1) * TSDB_NCHAR_SIZE, &output);
if (!ret) {
return -1;
}
......@@ -554,7 +554,7 @@ static FORCE_INLINE int32_t convertToInteger(SVariant *pVariant, int64_t *result
*result = res;
} else if (pVariant->nType == TSDB_DATA_TYPE_NCHAR) {
errno = 0;
wchar_t *endPtr = NULL;
TdUcs4 *endPtr = NULL;
SToken token = {0};
token.n = tGetToken(pVariant->pz, &token.type);
......@@ -564,7 +564,7 @@ static FORCE_INLINE int32_t convertToInteger(SVariant *pVariant, int64_t *result
}
if (token.type == TK_FLOAT) {
double v = wcstod(pVariant->wpz, &endPtr);
double v = wcstod(pVariant->ucs4, &endPtr);
if (releaseVariantPtr) {
free(pVariant->pz);
pVariant->nLen = 0;
......@@ -583,7 +583,7 @@ static FORCE_INLINE int32_t convertToInteger(SVariant *pVariant, int64_t *result
setNull((char *)result, type, tDataTypes[type].bytes);
return 0;
} else {
int64_t val = wcstoll(pVariant->wpz, &endPtr, 10);
int64_t val = wcstoll(pVariant->ucs4, &endPtr, 10);
if (releaseVariantPtr) {
free(pVariant->pz);
pVariant->nLen = 0;
......@@ -649,7 +649,7 @@ static int32_t convertToBool(SVariant *pVariant, int64_t *pDest) {
*pDest = ret;
} else if (pVariant->nType == TSDB_DATA_TYPE_NCHAR) {
int32_t ret = 0;
if ((ret = wcsconvertToBoolImpl(pVariant->wpz, pVariant->nLen)) < 0) {
if ((ret = wcsconvertToBoolImpl(pVariant->ucs4, pVariant->nLen)) < 0) {
return ret;
}
*pDest = ret;
......@@ -899,7 +899,7 @@ int32_t tVariantDumpEx(SVariant *pVariant, char *payload, int16_t type, bool inc
return -1;
}
} else {
wcsncpy((wchar_t *)payload, pVariant->wpz, pVariant->nLen);
tasoUcs4Copy((TdUcs4*)payload, pVariant->ucs4, pVariant->nLen);
}
}
} else {
......@@ -913,7 +913,7 @@ int32_t tVariantDumpEx(SVariant *pVariant, char *payload, int16_t type, bool inc
return -1;
}
} else {
memcpy(p, pVariant->wpz, pVariant->nLen);
memcpy(p, pVariant->ucs4, pVariant->nLen);
newlen = pVariant->nLen;
}
......@@ -979,7 +979,7 @@ int32_t taosVariantTypeSetType(SVariant *pVariant, char type) {
pVariant->d = v;
} else if (pVariant->nType == TSDB_DATA_TYPE_NCHAR) {
errno = 0;
double v = wcstod(pVariant->wpz, NULL);
double v = wcstod(pVariant->ucs4, NULL);
if ((errno == ERANGE && v == -1) || (isinf(v) || isnan(v))) {
free(pVariant->pz);
return -1;
......
......@@ -24,12 +24,17 @@ extern "C" {
int32_t dndInitSnode(SDnode *pDnode);
void dndCleanupSnode(SDnode *pDnode);
void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
// void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessSnodeExecMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_SNODE_H_*/
\ No newline at end of file
#endif /*_TD_DND_SNODE_H_*/
......@@ -382,6 +382,12 @@ static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) {
taosFreeQitem(pMsg);
}
static FORCE_INLINE int32_t dndGetSWTypeFromMsg(SRpcMsg *pMsg) {
SStreamExecMsgHead *pHead = pMsg->pCont;
pHead->workerType = htonl(pHead->workerType);
return pHead->workerType;
}
static FORCE_INLINE int32_t dndGetSWIdFromMsg(SRpcMsg *pMsg) {
SMsgHead *pHead = pMsg->pCont;
pHead->streamTaskId = htonl(pHead->streamTaskId);
......@@ -450,6 +456,18 @@ void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndWriteSnodeMsgToMgmtWorker(pDnode, pMsg);
}
void dndProcessSnodeExecMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SSnode *pSnode = dndAcquireSnode(pDnode);
if (pSnode != NULL) {
int32_t workerType = dndGetSWTypeFromMsg(pMsg);
if (workerType == SND_WORKER_TYPE__SHARED) {
dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg);
} else {
dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg);
}
}
}
void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg);
}
......
......@@ -23,10 +23,11 @@
#include "dndTransport.h"
#include "dndMgmt.h"
#include "dndMnode.h"
#include "dndSnode.h"
#include "dndVnodes.h"
#define INTERNAL_USER "_dnd"
#define INTERNAL_CKEY "_key"
#define INTERNAL_USER "_dnd"
#define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd"
static void dndInitMsgFp(STransMgmt *pMgmt) {
......@@ -153,10 +154,14 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg;
// Requests handled by SNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_DEPLOY)] = dndProcessSnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_EXEC)] = dndProcessSnodeExecMsg;
}
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
SDnode * pDnode = parent;
SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->tmgmt;
tmsg_t msgType = pRsp->msgType;
......@@ -219,7 +224,7 @@ static void dndCleanupClient(SDnode *pDnode) {
}
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
SDnode * pDnode = param;
SDnode *pDnode = param;
STransMgmt *pMgmt = &pDnode->tmgmt;
tmsg_t msgType = pReq->msgType;
......@@ -313,7 +318,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
SAuthReq authReq = {0};
tstrncpy(authReq.user, user, TSDB_USER_LEN);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
void * pReq = rpcMallocCont(contLen);
void *pReq = rpcMallocCont(contLen);
tSerializeSAuthReq(pReq, contLen, &authReq);
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
......
......@@ -26,8 +26,9 @@ typedef struct SDBFile SDBFile;
typedef DB_ENV* TDBEnv;
struct SDBFile {
DB* pDB;
char* path;
int32_t fid;
DB* pDB;
char* path;
};
int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile* pDBF);
......
......@@ -884,7 +884,7 @@ const char *metaSmaCursorNext(SMSmaCursor *pCur) {
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
STSmaWrapper *pSW = NULL;
pSW = calloc(sizeof(*pSW), 1);
pSW = calloc(1, sizeof(*pSW));
if (pSW == NULL) {
return NULL;
}
......
......@@ -3386,7 +3386,7 @@ void filterPrepare(void* expr, void* param) {
if (size < (uint32_t)pSchema->bytes) {
size = pSchema->bytes;
}
// to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(wchar_t) space.
// to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(TdUcs4) space.
pInfo->q = calloc(1, size + TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE);
tVariantDump(pCond, pInfo->q, pSchema->type, true);
}
......
......@@ -156,10 +156,6 @@ static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) {
return TSDB_CODE_FAILED;
}
if (*pEnv) {
return TSDB_CODE_SUCCESS;
}
if (*pEnv == NULL) {
if ((*pEnv = tsdbNewSmaEnv(pTsdb, path)) == NULL) {
return TSDB_CODE_FAILED;
......@@ -260,10 +256,15 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) {
int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
if (pSmaStat) {
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
SSmaStatItem *item = taosHashIterate(pSmaStat->smaStatItems, NULL);
void *item = taosHashIterate(pSmaStat->smaStatItems, NULL);
while (item != NULL) {
tfree(item->pSma);
taosHashCleanup(item->expiredWindows);
SSmaStatItem *pItem = *(SSmaStatItem **)item;
if (pItem != NULL) {
tdDestroyTSma(pItem->pSma);
tfree(pItem->pSma);
taosHashCleanup(pItem->expiredWindows);
tfree(pItem);
}
item = taosHashIterate(pSmaStat->smaStatItems, item);
}
taosHashCleanup(pSmaStat->smaStatItems);
......@@ -292,9 +293,10 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
// init sma env
tsdbLockRepo(pTsdb);
if (pTsdb->pTSmaEnv == NULL) {
pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&pTsdb->pTSmaEnv) : atomic_load_ptr(&pTsdb->pRSmaEnv);
if (pEnv == NULL) {
char rname[TSDB_FILENAME_LEN] = {0};
char aname[TSDB_FILENAME_LEN * 2 + 32] = {0}; // TODO: make TMPNAME_LEN public as TSDB_FILENAME_LEN?
char aname[TSDB_FILENAME_LEN] = {0}; // use TSDB_FILENAME_LEN currently
SDiskID did = {0};
tfsAllocDisk(pTsdb->pTfs, TFS_PRIMARY_LEVEL, &did);
......@@ -315,11 +317,8 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
return TSDB_CODE_FAILED;
}
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
atomic_store_ptr(&pTsdb->pTSmaEnv, pEnv);
} else {
atomic_store_ptr(&pTsdb->pRSmaEnv, pEnv);
}
(smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&pTsdb->pTSmaEnv, pEnv)
: atomic_store_ptr(&pTsdb->pRSmaEnv, pEnv);
}
tsdbUnlockRepo(pTsdb);
......@@ -359,8 +358,10 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) {
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv);
TASSERT(pEnv != NULL && pStat != NULL && pItemsHash != NULL);
tsdbRefSmaStat(pTsdb, pStat);
SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
if (pItem == NULL) {
pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state
if (pItem == NULL) {
......@@ -421,9 +422,9 @@ static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t ind
tsdbRefSmaStat(pTsdb, pStat);
if (pStat && pStat->smaStatItems) {
pItem = *(SSmaStatItem **)taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid));
pItem = taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid));
}
if (pItem != NULL) {
if ((pItem != NULL) && ((pItem = *(SSmaStatItem **)pItem) != NULL)) {
// pItem resides in hash buffer all the time unless drop sma index
// TODO: multithread protect
if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) {
......@@ -494,7 +495,7 @@ static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) {
* @brief Insert TSma data blocks to DB File build by B+Tree
*
* @param pSmaH
* @param smaKey
* @param smaKey tableUid-colId-skeyOfWindow(8-2-8)
* @param keyLen
* @param pData
* @param dataLen
......@@ -502,12 +503,11 @@ static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) {
*/
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen) {
SDBFile *pDBFile = &pSmaH->dFile;
// TODO: insert sma data blocks into B+Tree
tsdbDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d",
REPO_ID(pSmaH->pTsdb), pDBFile->path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
*(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen);
// TODO: insert sma data blocks into B+Tree(TDB)
if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen) != 0) {
return TSDB_CODE_FAILED;
}
......@@ -564,34 +564,34 @@ static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit
return interval / 1e3;
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // nano second
return interval / 1e6;
} else {
} else { // ms
return interval;
}
break;
case TSDB_TIME_PRECISION_MICRO:
if (TIME_UNIT_MICROSECOND == intervalUnit) { // us
return interval;
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // nano second
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // ns
return interval / 1e3;
} else {
} else { // ms
return interval * 1e3;
}
break;
case TSDB_TIME_PRECISION_NANO:
if (TIME_UNIT_MICROSECOND == intervalUnit) {
if (TIME_UNIT_MICROSECOND == intervalUnit) { // us
return interval * 1e3;
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // nano second
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // ns
return interval;
} else {
} else { // ms
return interval * 1e6;
}
break;
default: // ms
if (TIME_UNIT_MICROSECOND == intervalUnit) { // us
return interval / 1e3;
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // nano second
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // ns
return interval / 1e6;
} else {
} else { // ms
return interval;
}
break;
......@@ -663,9 +663,13 @@ static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) {
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) {
STsdb *pTsdb = pSmaH->pTsdb;
ASSERT(pSmaH->dFile.path == NULL && pSmaH->dFile.pDB == NULL);
pSmaH->dFile.fid = fid;
char tSmaFile[TSDB_FILENAME_LEN] = {0};
snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pTsdb), fid);
pSmaH->dFile.path = strdup(tSmaFile);
return TSDB_CODE_SUCCESS;
}
......@@ -705,7 +709,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
STsdbCfg * pCfg = REPO_CFG(pTsdb);
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
if (!pTsdb->pTSmaEnv) {
if (!atomic_load_ptr(&pTsdb->pTSmaEnv)) {
terrno = TSDB_CODE_INVALID_PTR;
tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
return terrno;
......@@ -883,15 +887,15 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) {
static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey,
int32_t nMaxResult) {
if (!pTsdb->pTSmaEnv) {
if (!atomic_load_ptr(&pTsdb->pTSmaEnv)) {
terrno = TSDB_CODE_INVALID_PTR;
tsdbWarn("vgId:%d getTSmaDataImpl failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
return TSDB_CODE_FAILED;
}
tsdbRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));
SSmaStatItem *pItem = *(SSmaStatItem **)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid));
if (pItem == NULL) {
SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid));
if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL)) {
// Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if
// it's NULL.
tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));
......
......@@ -144,6 +144,9 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return -1;
}
// record current timezone of server side
tstrncpy(vCreateSmaReq.tSma.timezone, tsTimezone, TD_TIMEZONE_LEN);
if (metaCreateTSma(pVnode->pMeta, &vCreateSmaReq) < 0) {
// TODO: handle error
tdDestroyTSma(&vCreateSmaReq.tSma);
......
......@@ -49,7 +49,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
STSmaWrapper tSmaWrapper = {.number = 1, .tSma = &tSma};
uint32_t bufLen = tEncodeTSmaWrapper(NULL, &tSmaWrapper);
void *buf = calloc(bufLen, 1);
void *buf = calloc(1, bufLen);
ASSERT_NE(buf, nullptr);
STSmaWrapper *pSW = (STSmaWrapper *)buf;
......@@ -84,6 +84,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
}
// resource release
tfree(pSW);
tdDestroyTSma(&tSma);
tdDestroyTSmaWrapper(&dstTSmaWrapper);
}
......@@ -113,7 +114,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
tSma.tableUid = tbUid;
tSma.exprLen = strlen(expr);
tSma.expr = (char *)calloc(tSma.exprLen + 1, 1);
tSma.expr = (char *)calloc(1, tSma.exprLen + 1);
ASSERT_NE(tSma.expr, nullptr);
tstrncpy(tSma.expr, expr, tSma.exprLen + 1);
......@@ -251,12 +252,12 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
tSma.tableUid = tbUid;
tSma.exprLen = strlen(expr);
tSma.expr = (char *)calloc(tSma.exprLen + 1, 1);
tSma.expr = (char *)calloc(1, tSma.exprLen + 1);
ASSERT_NE(tSma.expr, nullptr);
tstrncpy(tSma.expr, expr, tSma.exprLen + 1);
tSma.tagsFilterLen = strlen(tagsFilter);
tSma.tagsFilter = (char *)calloc(tSma.tagsFilterLen + 1, 1);
tSma.tagsFilter = (char *)calloc(1, tSma.tagsFilterLen + 1);
ASSERT_NE(tSma.tagsFilter, nullptr);
tstrncpy(tSma.tagsFilter, tagsFilter, tSma.tagsFilterLen + 1);
......@@ -273,20 +274,20 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
// step 2: insert data
STSmaDataWrapper *pSmaData = NULL;
STsdb tsdb = {0};
STsdbCfg * pCfg = &tsdb.config;
tsdb.pMeta = pMeta;
tsdb.vgId = 2;
tsdb.config.daysPerFile = 10; // default days is 10
tsdb.config.keep1 = 30;
tsdb.config.keep2 = 90;
tsdb.config.keep = 365;
tsdb.config.precision = TSDB_TIME_PRECISION_MILLI;
tsdb.config.update = TD_ROW_OVERWRITE_UPDATE;
tsdb.config.compression = TWO_STAGE_COMP;
switch (tsdb.config.precision) {
STsdb * pTsdb = (STsdb *)calloc(1, sizeof(STsdb));
STsdbCfg * pCfg = &pTsdb->config;
pTsdb->pMeta = pMeta;
pTsdb->vgId = 2;
pTsdb->config.daysPerFile = 10; // default days is 10
pTsdb->config.keep1 = 30;
pTsdb->config.keep2 = 90;
pTsdb->config.keep = 365;
pTsdb->config.precision = TSDB_TIME_PRECISION_MILLI;
pTsdb->config.update = TD_ROW_OVERWRITE_UPDATE;
pTsdb->config.compression = TWO_STAGE_COMP;
switch (pTsdb->config.precision) {
case TSDB_TIME_PRECISION_MILLI:
skey1 *= 1e3;
break;
......@@ -304,12 +305,12 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
SDiskCfg pDisks = {.level = 0, .primary = 1};
strncpy(pDisks.dir, "/var/lib/taos", TSDB_FILENAME_LEN);
int32_t numOfDisks = 1;
tsdb.pTfs = tfsOpen(&pDisks, numOfDisks);
ASSERT_NE(tsdb.pTfs, nullptr);
pTsdb->pTfs = tfsOpen(&pDisks, numOfDisks);
ASSERT_NE(pTsdb->pTfs, nullptr);
char *msg = (char *)calloc(1, 100);
ASSERT_NE(msg, nullptr);
ASSERT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0);
ASSERT_EQ(tsdbUpdateSmaWindow(pTsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0);
// init
int32_t allocCnt = 0;
......@@ -367,13 +368,13 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
ASSERT_GE(bufSize, pSmaData->dataLen);
// execute
ASSERT_EQ(tsdbInsertTSmaData(&tsdb, (char *)pSmaData), TSDB_CODE_SUCCESS);
ASSERT_EQ(tsdbInsertTSmaData(pTsdb, (char *)pSmaData), TSDB_CODE_SUCCESS);
// step 3: query
uint32_t checkDataCnt = 0;
for (int32_t t = 0; t < numOfTables; ++t) {
for (col_id_t c = 0; c < numOfCols; ++c) {
ASSERT_EQ(tsdbGetTSmaData(&tsdb, NULL, indexUid1, interval1, intervalUnit1, tbUid + t,
ASSERT_EQ(tsdbGetTSmaData(pTsdb, NULL, indexUid1, interval1, intervalUnit1, tbUid + t,
c + PRIMARYKEY_TIMESTAMP_COL_ID, skey1, 1),
TSDB_CODE_SUCCESS);
++checkDataCnt;
......@@ -383,9 +384,12 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
printf("%s:%d The sma data check count for insert and query is %" PRIu32 "\n", __FILE__, __LINE__, checkDataCnt);
// release data
tfree(msg);
taosTZfree(buf);
// release meta
tdDestroyTSma(&tSma);
tfsClose(pTsdb->pTfs);
tsdbClose(pTsdb);
metaClose(pMeta);
}
#endif
......
......@@ -622,7 +622,7 @@ static FORCE_INLINE int32_t MemRowAppend(const void* value, int32_t len, void* p
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t output = 0;
const char* rowEnd = tdRowEnd(rb->pBuf);
if (!taosMbsToUcs4(value, len, (char*)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
varDataSetLen(rowEnd, output);
......@@ -725,7 +725,7 @@ static int32_t KvRowAppend(const void *value, int32_t len, void *param) {
} else if (TSDB_DATA_TYPE_NCHAR == type) {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t output = 0;
if (!taosMbsToUcs4(value, len, varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
......
......@@ -13,12 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mockCatalogService.h"
#include <iomanip>
#include <iostream>
#include <map>
#include "tdatablock.h"
#include "mockCatalogService.h"
#include "tname.h"
#include "ttypes.h"
......
......@@ -392,12 +392,12 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
return TSDB_CODE_OUT_OF_MEMORY;
}
SValueNode* pIntervalNode = (SValueNode*)((SRawExprNode*)(pInterval->pInterval))->pNode;
pWindow->winType = WINDOW_TYPE_INTERVAL;
pWindow->interval = pIntervalNode->datum.i;
pWindow->interval = ((SValueNode*)pInterval->pInterval)->datum.i;
pWindow->intervalUnit = ((SValueNode*)pInterval->pInterval)->unit;
pWindow->offset = (NULL != pInterval->pOffset ? ((SValueNode*)pInterval->pOffset)->datum.i : 0);
pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : pWindow->interval);
pWindow->slidingUnit = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit);
int32_t code = TSDB_CODE_SUCCESS;
......
......@@ -1813,7 +1813,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
if(type == TSDB_DATA_TYPE_NCHAR &&
(unit->compare.optr == OP_TYPE_MATCH || unit->compare.optr == OP_TYPE_NMATCH)){
char newValData[TSDB_REGEX_STRING_DEFAULT_LEN * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE] = {0};
int32_t len = taosUcs4ToMbs(varDataVal(fi->data), varDataLen(fi->data), varDataVal(newValData));
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(fi->data), varDataLen(fi->data), varDataVal(newValData));
if (len < 0){
qError("filterInitValFieldData taosUcs4ToMbs error 1");
return TSDB_CODE_QRY_APP_ERROR;
......@@ -2992,7 +2992,7 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa
if(info->cunits[uidx].dataType == TSDB_DATA_TYPE_NCHAR && (info->cunits[uidx].optr == OP_TYPE_MATCH || info->cunits[uidx].optr == OP_TYPE_NMATCH)){
char *newColData = calloc(info->cunits[uidx].dataSize * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE, 1);
int32_t len = taosUcs4ToMbs(varDataVal(colData), varDataLen(colData), varDataVal(newColData));
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(colData), varDataLen(colData), varDataVal(newColData));
if (len < 0){
qError("castConvert1 taosUcs4ToMbs error");
}else{
......@@ -3052,7 +3052,7 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg
} else {
if(cunit->dataType == TSDB_DATA_TYPE_NCHAR && (cunit->optr == OP_TYPE_MATCH || cunit->optr == OP_TYPE_NMATCH)){
char *newColData = calloc(cunit->dataSize * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE, 1);
int32_t len = taosUcs4ToMbs(varDataVal(colData), varDataLen(colData), varDataVal(newColData));
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(colData), varDataLen(colData), varDataVal(newColData));
if (len < 0){
qError("castConvert1 taosUcs4ToMbs error");
}else{
......@@ -3433,7 +3433,7 @@ int32_t filterConverNcharColumns(SFilterInfo* info, int32_t rows, bool *gotNchar
varDataCopy(dst, src);
continue;
}
bool ret = taosMbsToUcs4(varDataVal(src), varDataLen(src), varDataVal(dst), bufSize, &len);
bool ret = taosMbsToUcs4(varDataVal(src), varDataLen(src), (TdUcs4*)varDataVal(dst), bufSize, &len);
if(!ret) {
qError("filterConverNcharColumns taosMbsToUcs4 error");
return TSDB_CODE_FAILED;
......
......@@ -316,7 +316,7 @@ int32_t vectorConvertFromVarData(SScalarParam* pIn, SScalarParam* pOut, int32_t
tmp = realloc(tmp, bufSize);
}
int len = taosUcs4ToMbs(varDataVal(pIn->data), varDataLen(pIn->data), tmp);
int len = taosUcs4ToMbs((TdUcs4*)varDataVal(pIn->data), varDataLen(pIn->data), tmp);
if (len < 0){
sclError("castConvert taosUcs4ToMbs error 1");
tfree(tmp);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_COMMIT_H
#define _TD_LIBS_SYNC_COMMIT_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "taosdef.h"
// \* Leader i advances its commitIndex.
// \* This is done as a separate step from handling AppendEntries responses,
// \* in part to minimize atomic regions, and in part so that leaders of
// \* single-server clusters are able to mark entries committed.
// AdvanceCommitIndex(i) ==
// /\ state[i] = Leader
// /\ LET \* The set of servers that agree up through index.
// Agree(index) == {i} \cup {k \in Server :
// matchIndex[i][k] >= index}
// \* The maximum indexes for which a quorum agrees
// agreeIndexes == {index \in 1..Len(log[i]) :
// Agree(index) \in Quorum}
// \* New value for commitIndex'[i]
// newCommitIndex ==
// IF /\ agreeIndexes /= {}
// /\ log[i][Max(agreeIndexes)].term = currentTerm[i]
// THEN
// Max(agreeIndexes)
// ELSE
// commitIndex[i]
// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
//
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode);
bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index);
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index);
#ifdef __cplusplus
}
#endif
#endif /*_TD_LIBS_SYNC_COMMIT_H*/
......@@ -236,7 +236,6 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
// raft vote --------------
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId);
void syncNodeVoteForSelf(SSyncNode* pSyncNode);
void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode);
// for debug --------------
void syncNodePrint(SSyncNode* pObj);
......
......@@ -25,6 +25,46 @@ extern "C" {
#include <stdlib.h>
#include "taosdef.h"
// TLA+ Spec
// Receive(m) ==
// LET i == m.mdest
// j == m.msource
// IN \* Any RPC with a newer term causes the recipient to advance
// \* its term first. Responses with stale terms are ignored.
// \/ UpdateTerm(i, j, m)
// \/ /\ m.mtype = RequestVoteRequest
// /\ HandleRequestVoteRequest(i, j, m)
// \/ /\ m.mtype = RequestVoteResponse
// /\ \/ DropStaleResponse(i, j, m)
// \/ HandleRequestVoteResponse(i, j, m)
// \/ /\ m.mtype = AppendEntriesRequest
// /\ HandleAppendEntriesRequest(i, j, m)
// \/ /\ m.mtype = AppendEntriesResponse
// /\ \/ DropStaleResponse(i, j, m)
// \/ HandleAppendEntriesResponse(i, j, m)
// DuplicateMessage(m) ==
// /\ Send(m)
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// DropMessage(m) ==
// /\ Discard(m)
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// Next == /\ \/ \E i \in Server : Restart(i)
// \/ \E i \in Server : Timeout(i)
// \/ \E i,j \in Server : RequestVote(i, j)
// \/ \E i \in Server : BecomeLeader(i)
// \/ \E i \in Server, v \in Value : ClientRequest(i, v)
// \/ \E i \in Server : AdvanceCommitIndex(i)
// \/ \E i,j \in Server : AppendEntries(i, j)
// \/ \E m \in DOMAIN messages : Receive(m)
// \/ \E m \in DOMAIN messages : DuplicateMessage(m)
// \/ \E m \in DOMAIN messages : DropMessage(m)
// \* History variable that tracks every log ever:
// /\ allLogs' = allLogs \cup {log[i] : i \in Server}
//
#ifdef __cplusplus
}
#endif
......
......@@ -94,6 +94,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
}
assert(pMsg->term <= ths->pRaftStore->currentTerm);
// reset elect timer
if (pMsg->term == ths->pRaftStore->currentTerm) {
ths->leaderCache = pMsg->srcId;
syncNodeResetElectTimer(ths);
......@@ -135,38 +136,117 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
syncNodeBecomeFollower(ths);
// need ret?
// ret or reply?
return ret;
}
// accept request
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
bool matchSuccess = false;
bool preMatch = false;
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID &&
ths->pLogStore->getLastIndex(ths->pLogStore) == SYNC_INDEX_INVALID) {
matchSuccess = true;
preMatch = true;
}
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
SSyncRaftEntry* pPreEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex);
assert(pPreEntry != NULL);
if (pMsg->prevLogTerm == pPreEntry->term) {
matchSuccess = true;
preMatch = true;
}
syncEntryDestory(pPreEntry);
}
if (matchSuccess) {
// delete conflict entries
if (pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore)) {
SyncIndex fromIndex = pMsg->prevLogIndex + 1;
ths->pLogStore->truncate(ths->pLogStore, fromIndex);
}
if (preMatch) {
// must has preIndex in local log
assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));
bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore);
bool hasAppendEntries = pMsg->dataLen > 0;
if (hasExtraEntries && hasAppendEntries) {
// conflict
bool conflict = false;
SyncIndex extraIndex = pMsg->prevLogIndex + 1;
SSyncRaftEntry* pExtraEntry = logStoreGetEntry(ths->pLogStore, extraIndex);
assert(pExtraEntry != NULL);
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
assert(extraIndex == pAppendEntry->index);
if (pExtraEntry->term == pAppendEntry->term) {
conflict = true;
}
if (conflict) {
// roll back
SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
SyncIndex delEnd = extraIndex;
// notice! reverse roll back!
for (SyncIndex index = delEnd; index >= delBegin; --index) {
if (ths->pFsm->FpRollBackCb != NULL) {
SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index);
assert(pRollBackEntry != NULL);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0);
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pRollBackEntry);
}
}
// delete confict entries
ths->pLogStore->truncate(ths->pLogStore, extraIndex);
// append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0);
}
}
rpcFreeCont(rpcMsg.pCont);
}
// free memory
syncEntryDestory(pExtraEntry);
syncEntryDestory(pAppendEntry);
// append one entry
if (pMsg->dataLen > 0) {
SSyncRaftEntry* pEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
syncEntryDestory(pEntry);
} else if (hasExtraEntries && !hasAppendEntries) {
// do nothing
} else if (!hasExtraEntries && hasAppendEntries) {
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
// append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0);
}
}
rpcFreeCont(rpcMsg.pCont);
// free memory
syncEntryDestory(pAppendEntry);
} else if (!hasExtraEntries && !hasAppendEntries) {
// do nothing
} else {
assert(0);
}
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
......@@ -175,7 +255,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = true;
if (pMsg->dataLen > 0) {
if (hasAppendEntries) {
pReply->matchIndex = pMsg->prevLogIndex + 1;
} else {
pReply->matchIndex = pMsg->prevLogIndex;
......@@ -201,11 +281,38 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
syncAppendEntriesReplyDestroy(pReply);
}
// maybe update commit index from leader
if (pMsg->commitIndex > ths->commitIndex) {
// has commit entry in local
if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
// commit
SyncIndex beginIndex = ths->commitIndex + 1;
SyncIndex endIndex = pMsg->commitIndex;
// update commit index
ths->commitIndex = pMsg->commitIndex;
// call back Wal
ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
// execute fsm
if (ths->pFsm != NULL) {
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
if (i != SYNC_INDEX_INVALID) {
SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, i);
assert(pEntry != NULL);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm->FpCommitCb != NULL) {
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
}
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
}
}
}
}
}
}
......
......@@ -14,6 +14,7 @@
*/
#include "syncAppendEntriesReply.h"
#include "syncCommit.h"
#include "syncIndexMgr.h"
#include "syncInt.h"
#include "syncRaftLog.h"
......@@ -59,7 +60,7 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
// maybe commit
syncNodeMaybeAdvanceCommitIndex(ths);
syncMaybeAdvanceCommitIndex(ths);
} else {
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
......
......@@ -13,9 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncCommit.h"
#include "syncIndexMgr.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
// \* Leader i advances its commitIndex.
// \* This is done as a separate step from handling AppendEntries responses,
......@@ -40,30 +42,80 @@
// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
//
void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex);
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex);
// update commit index
SyncIndex newCommitIndex = pSyncNode->commitIndex;
for (SyncIndex index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); index > pSyncNode->commitIndex;
++index) {
if (syncAgree(pSyncNode, index)) {
// term
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index);
assert(pEntry != NULL);
if (pSyncNode->pFsm != NULL) {
SyncIndex beginIndex = SYNC_INDEX_INVALID;
SyncIndex endIndex = SYNC_INDEX_INVALID;
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
if (i != SYNC_INDEX_INVALID) {
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i);
assert(pEntry != NULL);
// cannot commit, even if quorum agree. need check term!
if (pEntry->term == pSyncNode->pRaftStore->currentTerm) {
// update commit index
newCommitIndex = index;
break;
}
}
}
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (newCommitIndex > pSyncNode->commitIndex) {
SyncIndex beginIndex = pSyncNode->commitIndex + 1;
SyncIndex endIndex = newCommitIndex;
if (pSyncNode->pFsm->FpCommitCb != NULL) {
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
}
// update commit index
pSyncNode->commitIndex = newCommitIndex;
// call back Wal
pSyncNode->pLogStore->updateCommitIndex(pSyncNode->pLogStore, pSyncNode->commitIndex);
// execute fsm
if (pSyncNode->pFsm != NULL) {
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
if (i != SYNC_INDEX_INVALID) {
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i);
assert(pEntry != NULL);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (pSyncNode->pFsm->FpCommitCb != NULL) {
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
}
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
}
}
}
}
}
bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) {
SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, pRaftId);
// b for debug
bool b = false;
if (matchIndex >= index) {
b = true;
}
return b;
}
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
int agreeCount = 0;
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
if (syncAgreeIndex(pSyncNode, &(pSyncNode->replicasId[i]), index)) {
++agreeCount;
}
if (agreeCount >= pSyncNode->quorum) {
return true;
}
}
return false;
}
\ No newline at end of file
......@@ -17,6 +17,7 @@
#include "sync.h"
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
#include "syncCommit.h"
#include "syncElection.h"
#include "syncEnv.h"
#include "syncIndexMgr.h"
......@@ -150,6 +151,30 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
// init life cycle
// TLA+ Spec
// InitHistoryVars == /\ elections = {}
// /\ allLogs = {}
// /\ voterLog = [i \in Server |-> [j \in {} |-> <<>>]]
// InitServerVars == /\ currentTerm = [i \in Server |-> 1]
// /\ state = [i \in Server |-> Follower]
// /\ votedFor = [i \in Server |-> Nil]
// InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
// /\ votesGranted = [i \in Server |-> {}]
// \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
// \* leader does not send itself messages. It's still easier to include these
// \* in the functions.
// InitLeaderVars == /\ nextIndex = [i \in Server |-> [j \in Server |-> 1]]
// /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
// InitLogVars == /\ log = [i \in Server |-> << >>]
// /\ commitIndex = [i \in Server |-> 0]
// Init == /\ messages = [m \in {} |-> 0]
// /\ InitHistoryVars
// /\ InitServerVars
// /\ InitCandidateVars
// /\ InitLeaderVars
// /\ InitLogVars
//
// init TLA+ server vars
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
......@@ -727,6 +752,16 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
return ret;
}
// TLA+ Spec
// ClientRequest(i, v) ==
// /\ state[i] = Leader
// /\ LET entry == [term |-> currentTerm[i],
// value |-> v]
// newLog == Append(log[i], entry)
// IN log' = [log EXCEPT ![i] = newLog]
// /\ UNCHANGED <<messages, serverVars, candidateVars,
// leaderVars, commitIndex>>
//
static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
int32_t ret = 0;
syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg);
......@@ -740,7 +775,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
// only myself, maybe commit
syncNodeMaybeAdvanceCommitIndex(ths);
syncMaybeAdvanceCommitIndex(ths);
// start replicate right now!
syncNodeReplicate(ths);
......
......@@ -14,3 +14,43 @@
*/
#include "syncOnMessage.h"
// TLA+ Spec
// Receive(m) ==
// LET i == m.mdest
// j == m.msource
// IN \* Any RPC with a newer term causes the recipient to advance
// \* its term first. Responses with stale terms are ignored.
// \/ UpdateTerm(i, j, m)
// \/ /\ m.mtype = RequestVoteRequest
// /\ HandleRequestVoteRequest(i, j, m)
// \/ /\ m.mtype = RequestVoteResponse
// /\ \/ DropStaleResponse(i, j, m)
// \/ HandleRequestVoteResponse(i, j, m)
// \/ /\ m.mtype = AppendEntriesRequest
// /\ HandleAppendEntriesRequest(i, j, m)
// \/ /\ m.mtype = AppendEntriesResponse
// /\ \/ DropStaleResponse(i, j, m)
// \/ HandleAppendEntriesResponse(i, j, m)
// DuplicateMessage(m) ==
// /\ Send(m)
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// DropMessage(m) ==
// /\ Discard(m)
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// Next == /\ \/ \E i \in Server : Restart(i)
// \/ \E i \in Server : Timeout(i)
// \/ \E i,j \in Server : RequestVote(i, j)
// \/ \E i \in Server : BecomeLeader(i)
// \/ \E i \in Server, v \in Value : ClientRequest(i, v)
// \/ \E i \in Server : AdvanceCommitIndex(i)
// \/ \E i,j \in Server : AppendEntries(i, j)
// \/ \E m \in DOMAIN messages : Receive(m)
// \/ \E m \in DOMAIN messages : DuplicateMessage(m)
// \/ \E m \in DOMAIN messages : DropMessage(m)
// \* History variable that tracks every log ever:
// /\ allLogs' = allLogs \cup {log[i] : i \in Server}
//
\ No newline at end of file
......@@ -162,7 +162,7 @@ int main(int argc, char **argv) {
SyncClientRequest *pMsg1 = step1(pMsg0);
syncClientRequestPrint2((char *)"==step1==", pMsg1);
for (int i = 0; i < 5; ++i) {
for (int i = 0; i < 10; ++i) {
SyncClientRequest *pSyncClientRequest = pMsg1;
SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
......
......@@ -204,7 +204,8 @@ void tfsDirname(const STfsFile *pFile, char *dest) {
void tfsAbsoluteName(STfs *pTfs, SDiskID diskId, const char *rname, char *aname) {
STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId);
snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
snprintf(aname, TSDB_FILENAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
}
int32_t tfsRemoveFile(const STfsFile *pFile) { return taosRemoveFile(pFile->aname); }
......
......@@ -5,9 +5,14 @@ target_include_directories(
PUBLIC "${CMAKE_SOURCE_DIR}/include/os"
PUBLIC "${CMAKE_SOURCE_DIR}/include"
PUBLIC "${CMAKE_SOURCE_DIR}/include/util"
PUBLIC "${CMAKE_SOURCE_DIR}/contrib/pthread-win32"
PUBLIC "${CMAKE_SOURCE_DIR}/contrib/pthread"
PUBLIC "${CMAKE_SOURCE_DIR}/contrib/gnuregex"
)
# iconv
find_path(IconvApiIncludes iconv.h PATHS)
if(NOT IconvApiIncludes)
add_definitions(-DDISALLOW_NCHAR_WITHOUT_ICONV)
endif ()
target_link_libraries(
os pthread dl rt m
)
......@@ -46,6 +46,23 @@ extern int openU(const char *, int, ...); /* MsvcLibX UTF-8 version of open */
#define O_TEXT LINUX_FILE_NO_TEXT_OPTION
#endif
#if defined(WINDOWS)
typedef int32_t FileFd;
typedef int32_t SocketFd;
#else
typedef int32_t FileFd;
typedef int32_t SocketFd;
#endif
typedef int32_t FileFd;
typedef struct TdFile {
pthread_rwlock_t rwlock;
int refId;
FileFd fd;
FILE *fp;
} * TdFilePtr, TdFile;
#define FILE_WITH_LOCK 1
void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath) {
......
......@@ -81,7 +81,7 @@ void taosSetSystemLocale(const char *inLocale, const char *inCharSet) {
}
if (!taosValidateEncodec(inCharSet)) {
printf("Invalid charset:%s, please set the valid charset in config file", inCharSet);
printf("Invalid charset:%s, please set the valid charset in config file\n", inCharSet);
exit(-1);
}
}
......
此差异已折叠。
......@@ -208,7 +208,7 @@ int32_t compareLenPrefixedWStr(const void *pLeft, const void *pRight) {
if (len1 != len2) {
return len1 > len2 ? 1 : -1;
} else {
int32_t ret = memcmp((wchar_t *)pLeft, (wchar_t *)pRight, len1);
int32_t ret = memcmp((TdUcs4 *)pLeft, (TdUcs4 *)pRight, len1);
if (ret == 0) {
return 0;
} else {
......@@ -295,10 +295,10 @@ int32_t patternMatch(const char *patterStr, const char *str, size_t size, const
return (str[j] == 0 || j >= size) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH;
}
int32_t WCSPatternMatch(const wchar_t *patterStr, const wchar_t *str, size_t size, const SPatternCompareInfo *pInfo) {
wchar_t c, c1;
wchar_t matchOne = L'_'; // "_"
wchar_t matchAll = L'%'; // "%"
int32_t WCSPatternMatch(const TdUcs4 *patterStr, const TdUcs4 *str, size_t size, const SPatternCompareInfo *pInfo) {
TdUcs4 c, c1;
TdUcs4 matchOne = L'_'; // "_"
TdUcs4 matchAll = L'%'; // "%"
int32_t i = 0;
int32_t j = 0;
......@@ -315,7 +315,7 @@ int32_t WCSPatternMatch(const wchar_t *patterStr, const wchar_t *str, size_t siz
return TSDB_PATTERN_MATCH;
}
wchar_t accept[3] = {towupper(c), towlower(c), 0};
TdUcs4 accept[3] = {towupper(c), towlower(c), 0};
while (1) {
size_t n = wcscspn(str, accept);
......@@ -424,10 +424,10 @@ int32_t compareWStrPatternMatch(const void *pLeft, const void *pRight) {
assert(varDataLen(pRight) <= TSDB_MAX_FIELD_LEN * TSDB_NCHAR_SIZE);
wchar_t *pattern = calloc(varDataLen(pRight) + 1, sizeof(wchar_t));
char *pattern = calloc(varDataLen(pRight) + TSDB_NCHAR_SIZE, 1);
memcpy(pattern, varDataVal(pRight), varDataLen(pRight));
int32_t ret = WCSPatternMatch(pattern, (const wchar_t *)varDataVal(pLeft), varDataLen(pLeft) / TSDB_NCHAR_SIZE, &pInfo);
int32_t ret = WCSPatternMatch((TdUcs4*)pattern, (TdUcs4*)varDataVal(pLeft), varDataLen(pLeft) / TSDB_NCHAR_SIZE, &pInfo);
free(pattern);
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
......@@ -647,7 +647,7 @@ int32_t doCompare(const char *f1, const char *f2, int32_t type, size_t size) {
if (t1->len != t2->len) {
return t1->len > t2->len ? 1 : -1;
}
int32_t ret = memcmp((wchar_t *)t1, (wchar_t *)t2, t2->len);
int32_t ret = memcmp((TdUcs4 *)t1, (TdUcs4 *)t2, t2->len);
if (ret == 0) {
return ret;
}
......
......@@ -18,4 +18,7 @@
# ---- insert
./test.sh -f tsim/insert/basic0.sim
# ---- query
./test.sh -f tsim/query/interval.sim
#======================b1-end===============
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database d1
sql show databases
if $rows != 1 then
return -1
endi
print $data00 $data01 $data02
sql use d1
print =============== create super table, include all type
sql create table if not exists stb (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(16), c9 nchar(16), c10 timestamp, c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned) tags (t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 binary(16), t9 nchar(16), t10 timestamp, t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)
sql create stable if not exists stb_1 (ts timestamp, i int) tags (j int)
sql create table stb_2 (ts timestamp, i int) tags (j int)
sql create stable stb_3 (ts timestamp, i int) tags (j int)
sql show stables
if $rows != 4 then
return -1
endi
print =============== create child table
sql create table c1 using stb tags(true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
sql create table c2 using stb tags(false, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 2', 'child tbl 2', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
sql show tables
if $rows != 2 then
return -1
endi
print =============== insert data, mode1: one row one table in sql
print =============== insert data, mode1: mulit rows one table in sql
print =============== insert data, mode1: one rows mulit table in sql
print =============== insert data, mode1: mulit rows mulit table in sql
sql insert into c1 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
sql insert into c1 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+2s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
print =============== query data
sql select * from c1
if $rows != 4 then
return -1
endi
if $data01 != true then
return -1
endi
if $data02 != -1 then
return -1
endi
if $data03 != -2 then
return -1
endi
print =============== query data from st
sql select * from st
if $rows != 4 then
return -1
endi
print =============== stop and restart taosd
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
sleep 2000
print =============== query data
sql select * from c1
if $rows != 4 then
return -1
endi
if $data01 != true then
return -1
endi
if $data02 != -1 then
return -1
endi
if $data03 != -2 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -47,161 +47,137 @@ $i = 1
$tb = $tbPrefix . $i
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb interval(1m)
print ===> $rows
if $rows < $rowNum then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data05 != 1 then
return -1
endi
print =============== step3
$cc = 4 * 60000
$ms = 1601481600000 + $cc
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms interval(1m)
print ===> $rows
if $rows > 10 then
return -1
endi
if $rows < 3 then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data05 != 1 then
return -1
endi
print =============== step4
$cc = 40 * 60000
$ms = 1601481600000 + $cc
$cc = 1 * 60000
$ms2 = 1601481600000 - $cc
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m)
print ===> $rows
if $rows < 18 then
return -1
endi
if $rows > 22 then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data05 != 1 then
return -1
endi
print =============== step5
$cc = 40 * 60000
$ms = 1601481600000 + $cc
$cc = 1 * 60000
$ms2 = 1601481600000 - $cc
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m) fill(value,0)
print ===> $rows
if $rows < 30 then
return -1
endi
if $rows > 50 then
return -1
endi
if $data21 != 1 then
return -1
endi
if $data25 != 1 then
return -1
endi
print =============== step6
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt interval(1m)
print ===> $rows
if $rows < 18 then
return -1
endi
if $rows > 22 then
return -1
endi
if $data11 > 15 then
return -1
endi
if $data11 < 5 then
return -1
endi
print =============== step7
$cc = 4 * 60000
$ms = 1601481600000 + $cc
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms interval(1m)
print ===> $rows
if $rows < 3 then
return -1
endi
if $rows > 7 then
return -1
endi
if $data11 > 15 then
return -1
endi
if $data11 < 5 then
return -1
endi
print =============== step8
$cc = 40 * 60000
$ms1 = 1601481600000 + $cc
$cc = 1 * 60000
$ms2 = 1601481600000 - $cc
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m)
print ===> $rows
if $rows < 18 then
return -1
endi
if $rows > 22 then
return -1
endi
if $data11 > 15 then
return -1
endi
if $data11 < 5 then
return -1
endi
print =============== step9
$cc = 40 * 60000
$ms1 = 1601481600000 + $cc
$cc = 1 * 60000
$ms2 = 1601481600000 - $cc
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m) fill(value, 0)
if $rows < 30 then
return -1
endi
if $rows > 50 then
return -1
endi
if $data11 > 15 then
return -1
endi
if $data11 < 5 then
return -1
endi
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb interval(1m)
print ===> $rows $data01 $data05
if $rows != $rowNum then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data04 != 1 then
return -1
endi
#print =============== step3
#$cc = 4 * 60000
#$ms = 1601481600000 + $cc
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms interval(1m)
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms interval(1m)
#print ===> $rows $data01 $data05
#if $rows != 5 then
# return -1
#endi
#if $data00 != 1 then
# return -1
#endi
#if $data04 != 1 then
# return -1
#endi
#print =============== step4
#$cc = 40 * 60000
#$ms = 1601481600000 + $cc
#$cc = 1 * 60000
#$ms2 = 1601481600000 - $cc
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m)
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m)
#print ===> $rows $data01 $data05
#if $rows != 20 then
# return -1
#endi
#if $data00 != 1 then
# return -1
#endi
#if $data04 != 1 then
# return -1
#endi
#print =============== step5
#$cc = 40 * 60000
#$ms = 1601481600000 + $cc
#$cc = 1 * 60000
#$ms2 = 1601481600000 - $cc
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m) fill(value,0)
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m) fill(value,0)
#print ===> $rows $data21 $data25
#if $rows != 42 then
# return -1
#endi
#if $data20 != 1 then
# return -1
#endi
#if $data24 != 1 then
# return -1
#endi
#print =============== step6
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt interval(1m)
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt interval(1m)
#print ===> $rows $data11
#if $rows != 20 then
# return -1
#endi
#if $data11 != 10 then
# return -1
#endi
#print =============== step7
#$cc = 4 * 60000
#$ms = 1601481600000 + $cc
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms interval(1m)
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms interval(1m)
#print ===> $rows $data11
#if $rows != 5 then
# return -1
#endi
#if $data11 != 10 then
# return -1
#endi
#print =============== step8
#$cc = 40 * 60000
#$ms1 = 1601481600000 + $cc
#
#$cc = 1 * 60000
#$ms2 = 1601481600000 - $cc
#
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m)
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m)
#print ===> $rows $data11
#if $rows != 20 then
# return -1
#endi
#if $data11 != 10 then
# return -1
#endi
#
#print =============== step9
#$cc = 40 * 60000
#$ms1 = 1601481600000 + $cc
#
#$cc = 1 * 60000
#$ms2 = 1601481600000 - $cc
#
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m) fill(value, 0)
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m) fill(value, 0)
#print ===> $rows $data11
#if $rows != 42 then
# return -1
#endi
#if $data11 != 10 then
# return -1
#endi
print =============== clear
sql drop database $db
sql show databases
if $rows != 0 then
return -1
endi
#sql drop database $db
#sql show databases
#if $rows != 0 then
# return -1
#endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -28,7 +28,6 @@
int indicator = 1;
struct termios oldtio;
extern int wcwidth(wchar_t c);
void insertChar(Command *cmd, char *c, int size);
......@@ -426,7 +425,7 @@ void showOnScreen(Command *cmd) {
w.ws_row = 30;
}
wchar_t wc;
TdWchar wc;
int size = 0;
// Print out the command.
......@@ -441,11 +440,11 @@ void showOnScreen(Command *cmd) {
int remain_column = w.ws_col;
/* size = cmd->commandSize + prompt_size; */
for (char *str = total_string; size < cmd->commandSize + prompt_size;) {
int ret = mbtowc(&wc, str, MB_CUR_MAX);
int ret = taosMbToWchar(&wc, str, MB_CUR_MAX);
if (ret < 0) break;
size += ret;
/* assert(size >= 0); */
int width = wcwidth(wc);
int width = taosWcharWidth(wc);
if (remain_column > width) {
printf("%lc", wc);
remain_column -= width;
......
......@@ -21,8 +21,6 @@
#include <regex.h>
extern int wcwidth(wchar_t c);
extern int wcswidth(const wchar_t *s, size_t n);
typedef struct {
char widthInString;
char widthOnScreen;
......@@ -43,7 +41,7 @@ int countPrefixOnes(unsigned char c) {
void getPrevCharSize(const char *str, int pos, int *size, int *width) {
assert(pos > 0);
wchar_t wc;
TdWchar wc;
*size = 0;
*width = 0;
......@@ -53,25 +51,25 @@ void getPrevCharSize(const char *str, int pos, int *size, int *width) {
if (str[pos] > 0 || countPrefixOnes((unsigned char )str[pos]) > 1) break;
}
int rc = mbtowc(&wc, str + pos, MB_CUR_MAX);
int rc = taosMbToWchar(&wc, str + pos, MB_CUR_MAX);
assert(rc == *size);
*width = wcwidth(wc);
*width = taosWcharWidth(wc);
}
void getNextCharSize(const char *str, int pos, int *size, int *width) {
assert(pos >= 0);
wchar_t wc;
*size = mbtowc(&wc, str + pos, MB_CUR_MAX);
*width = wcwidth(wc);
TdWchar wc;
*size = taosMbToWchar(&wc, str + pos, MB_CUR_MAX);
*width = taosWcharWidth(wc);
}
void insertChar(Command *cmd, char *c, int size) {
assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
wchar_t wc;
if (mbtowc(&wc, c, size) < 0) return;
TdWchar wc;
if (taosMbToWchar(&wc, c, size) < 0) return;
clearScreen(cmd->endOffset + prompt_size, cmd->screenOffset + prompt_size);
/* update the buffer */
......@@ -81,8 +79,8 @@ void insertChar(Command *cmd, char *c, int size) {
/* update the values */
cmd->commandSize += size;
cmd->cursorOffset += size;
cmd->screenOffset += wcwidth(wc);
cmd->endOffset += wcwidth(wc);
cmd->screenOffset += taosWcharWidth(wc);
cmd->endOffset += taosWcharWidth(wc);
showOnScreen(cmd);
}
......@@ -249,10 +247,10 @@ int isReadyGo(Command *cmd) {
}
void getMbSizeInfo(const char *str, int *size, int *width) {
wchar_t *wc = (wchar_t *)calloc(sizeof(wchar_t), MAX_COMMAND_SIZE);
TdWchar *wc = (TdWchar *)calloc(sizeof(TdWchar), MAX_COMMAND_SIZE);
*size = strlen(str);
mbstowcs(wc, str, MAX_COMMAND_SIZE);
*width = wcswidth(wc, MAX_COMMAND_SIZE);
taosMbsToWchars(wc, str, MAX_COMMAND_SIZE);
*width = taosWcharsWidth(wc, MAX_COMMAND_SIZE);
free(wc);
}
......
......@@ -560,12 +560,12 @@ static int dumpResultToFile(const char *fname, TAOS_RES *tres) {
}
static void shellPrintNChar(const char *str, int length, int width) {
wchar_t tail[3];
TdWchar tail[3];
int pos = 0, cols = 0, totalCols = 0, tailLen = 0;
while (pos < length) {
wchar_t wc;
int bytes = mbtowc(&wc, str + pos, MB_CUR_MAX);
TdWchar wc;
int bytes = taosMbToWchar(&wc, str + pos, MB_CUR_MAX);
if (bytes == 0) {
break;
}
......@@ -577,7 +577,7 @@ static void shellPrintNChar(const char *str, int length, int width) {
#ifdef WINDOWS
int w = bytes;
#else
int w = wcwidth(wc);
int w = taosWcharWidth(wc);
#endif
if (w <= 0) {
continue;
......
......@@ -31,7 +31,6 @@
int indicator = 1;
struct termios oldtio;
extern int wcwidth(wchar_t c);
void insertChar(Command *cmd, char *c, int size);
const char *argp_program_version = version;
const char *argp_program_bug_address = "<support@taosdata.com>";
......@@ -456,7 +455,7 @@ void showOnScreen(Command *cmd) {
w.ws_row = 30;
}
wchar_t wc;
TdWchar wc;
int size = 0;
// Print out the command.
......@@ -471,11 +470,11 @@ void showOnScreen(Command *cmd) {
int remain_column = w.ws_col;
/* size = cmd->commandSize + prompt_size; */
for (char *str = total_string; size < cmd->commandSize + prompt_size;) {
int ret = mbtowc(&wc, str, MB_CUR_MAX);
int ret = taosMbToWchar(&wc, str, MB_CUR_MAX);
if (ret < 0) break;
size += ret;
/* assert(size >= 0); */
int width = wcwidth(wc);
int width = taosWcharWidth(wc);
if (remain_column > width) {
printf("%lc", wc);
remain_column -= width;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册