diff --git a/source/libs/index/inc/indexFst.h b/source/libs/index/inc/indexFst.h index 4c5bca864a0be6b4926965fc1695a8e61d88feaa..ddeaee06ea024da687c44846c7d63fb0183bbbd0 100644 --- a/source/libs/index/inc/indexFst.h +++ b/source/libs/index/inc/indexFst.h @@ -290,21 +290,21 @@ bool fstVerify(Fst* fst); // refactor this function bool fstBuilderNodeCompileTo(FstBuilderNode* b, IdxFstFile* wrt, CompiledAddr lastAddr, CompiledAddr startAddr); -typedef struct StreamState { +typedef struct FstStreamState { FstNode* node; uint64_t trans; FstOutput out; void* autState; -} StreamState; +} FstStreamState; -void streamStateDestroy(void* s); +void fstStreamStateDestroy(void* s); typedef struct FStmSt { Fst* fst; FAutoCtx* aut; SArray* inp; FstOutput emptyOutput; - SArray* stack; // + SArray* stack; // FstBoundWithData* endAt; } FStmSt; @@ -317,14 +317,14 @@ typedef struct FStmStRslt { FStmStRslt* swsResultCreate(FstSlice* data, FstOutput fOut, void* state); void swsResultDestroy(FStmStRslt* result); -typedef void* (*StreamCallback)(void*); +typedef void* (*streamCallback__fn)(void*); FStmSt* stmStCreate(Fst* fst, FAutoCtx* automation, FstBoundWithData* min, FstBoundWithData* max); void stmStDestroy(FStmSt* sws); bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min); -FStmStRslt* stmStNextWith(FStmSt* sws, StreamCallback callback); +FStmStRslt* stmStNextWith(FStmSt* sws, streamCallback__fn callback); FStmBuilder* stmBuilderCreate(Fst* fst, FAutoCtx* aut); diff --git a/source/libs/index/src/indexFst.c b/source/libs/index/src/indexFst.c index aed2ec3ee321011d9777b437ae9453296b6bbc62..bbffcfa6c17aa628b0afc79ba96746d764add523 100644 --- a/source/libs/index/src/indexFst.c +++ b/source/libs/index/src/indexFst.c @@ -1165,7 +1165,7 @@ FStmSt* stmStCreate(Fst* fst, FAutoCtx* automation, FstBoundWithData* min, FstBo sws->emptyOutput.null = true; sws->emptyOutput.out = 0; - sws->stack = (SArray*)taosArrayInit(256, sizeof(StreamState)); + sws->stack = (SArray*)taosArrayInit(256, sizeof(FstStreamState)); sws->endAt = max; stmStSeekMin(sws, min); @@ -1177,7 +1177,7 @@ void stmStDestroy(FStmSt* sws) { } taosArrayDestroy(sws->inp); - taosArrayDestroyEx(sws->stack, streamStateDestroy); + taosArrayDestroyEx(sws->stack, fstStreamStateDestroy); taosMemoryFree(sws); } @@ -1188,10 +1188,10 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) { if (fstBoundWithDataIsIncluded(min)) { sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null)); } - StreamState s = {.node = fstGetRoot(sws->fst), - .trans = 0, - .out = {.null = false, .out = 0}, - .autState = automFuncs[aut->type].start(aut)}; // auto.start callback + FstStreamState s = {.node = fstGetRoot(sws->fst), + .trans = 0, + .out = {.null = false, .out = 0}, + .autState = automFuncs[aut->type].start(aut)}; // auto.start callback taosArrayPush(sws->stack, &s); return true; } @@ -1223,7 +1223,7 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) { autState = automFuncs[aut->type].accept(aut, preState, b); taosArrayPush(sws->inp, &b); - StreamState s = {.node = node, .trans = res + 1, .out = {.null = false, .out = out}, .autState = preState}; + FstStreamState s = {.node = node, .trans = res + 1, .out = {.null = false, .out = out}, .autState = preState}; node = NULL; taosArrayPush(sws->stack, &s); @@ -1244,7 +1244,7 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) { } } - StreamState s = {.node = node, .trans = i, .out = {.null = false, .out = out}, .autState = autState}; + FstStreamState s = {.node = node, .trans = i, .out = {.null = false, .out = out}, .autState = autState}; taosArrayPush(sws->stack, &s); taosMemoryFree(trans); return true; @@ -1255,7 +1255,7 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) { uint32_t sz = taosArrayGetSize(sws->stack); if (sz != 0) { - StreamState* s = taosArrayGet(sws->stack, sz - 1); + FstStreamState* s = taosArrayGet(sws->stack, sz - 1); if (inclusize) { s->trans -= 1; taosArrayPop(sws->inp); @@ -1264,7 +1264,7 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) { uint64_t trans = s->trans; FstTransition trn; fstNodeGetTransitionAt(n, trans - 1, &trn); - StreamState s = { + FstStreamState s = { .node = fstGetNode(sws->fst, trn.addr), .trans = 0, .out = {.null = false, .out = out}, .autState = autState}; taosArrayPush(sws->stack, &s); return true; @@ -1274,14 +1274,14 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) { return false; } -FStmStRslt* stmStNextWith(FStmSt* sws, StreamCallback callback) { +FStmStRslt* stmStNextWith(FStmSt* sws, streamCallback__fn callback) { FAutoCtx* aut = sws->aut; FstOutput output = sws->emptyOutput; if (output.null == false) { FstSlice emptySlice = fstSliceCreate(NULL, 0); if (fstBoundWithDataExceededBy(sws->endAt, &emptySlice)) { - taosArrayDestroyEx(sws->stack, streamStateDestroy); - sws->stack = (SArray*)taosArrayInit(256, sizeof(StreamState)); + taosArrayDestroyEx(sws->stack, fstStreamStateDestroy); + sws->stack = (SArray*)taosArrayInit(256, sizeof(FstStreamState)); return NULL; } void* start = automFuncs[aut->type].start(aut); @@ -1292,12 +1292,12 @@ FStmStRslt* stmStNextWith(FStmSt* sws, StreamCallback callback) { } SArray* nodes = taosArrayInit(8, sizeof(FstNode*)); while (taosArrayGetSize(sws->stack) > 0) { - StreamState* p = (StreamState*)taosArrayPop(sws->stack); + FstStreamState* p = (FstStreamState*)taosArrayPop(sws->stack); if (p->trans >= FST_NODE_LEN(p->node) || !automFuncs[aut->type].canMatch(aut, p->autState)) { if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) { taosArrayPop(sws->inp); } - streamStateDestroy(p); + fstStreamStateDestroy(p); continue; } FstTransition trn; @@ -1318,10 +1318,10 @@ FStmStRslt* stmStNextWith(FStmSt* sws, StreamCallback callback) { isMatch = automFuncs[aut->type].isMatch(aut, eofState); } } - StreamState s1 = {.node = p->node, .trans = p->trans + 1, .out = p->out, .autState = p->autState}; + FstStreamState s1 = {.node = p->node, .trans = p->trans + 1, .out = p->out, .autState = p->autState}; taosArrayPush(sws->stack, &s1); - StreamState s2 = {.node = nextNode, .trans = 0, .out = {.null = false, .out = out}, .autState = nextState}; + FstStreamState s2 = {.node = nextNode, .trans = 0, .out = {.null = false, .out = out}, .autState = nextState}; taosArrayPush(sws->stack, &s2); int32_t isz = taosArrayGetSize(sws->inp); @@ -1331,8 +1331,8 @@ FStmStRslt* stmStNextWith(FStmSt* sws, StreamCallback callback) { } FstSlice slice = fstSliceCreate(buf, isz); if (fstBoundWithDataExceededBy(sws->endAt, &slice)) { - taosArrayDestroyEx(sws->stack, streamStateDestroy); - sws->stack = (SArray*)taosArrayInit(256, sizeof(StreamState)); + taosArrayDestroyEx(sws->stack, fstStreamStateDestroy); + sws->stack = (SArray*)taosArrayInit(256, sizeof(FstStreamState)); taosMemoryFreeClear(buf); fstSliceDestroy(&slice); taosArrayDestroy(nodes); @@ -1375,11 +1375,11 @@ void swsResultDestroy(FStmStRslt* result) { taosMemoryFree(result); } -void streamStateDestroy(void* s) { +void fstStreamStateDestroy(void* s) { if (NULL == s) { return; } - StreamState* ss = (StreamState*)s; + FstStreamState* ss = (FstStreamState*)s; fstNodeDestroy(ss->node); } diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index fc943d4d76f983aed8c36efd72f851e9b8f3baa4..fd5bde90ba21b13f2bb3d2cbbeb7b9778b8929f4 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -73,6 +73,7 @@ typedef struct TdEpoll { EpollFd fd; } * TdEpollPtr, TdEpoll; +#if 0 int32_t taosSendto(TdSocketPtr pSocket, void *buf, int len, unsigned int flags, const struct sockaddr *dest_addr, int addrlen) { if (pSocket == NULL || pSocket->fd < 0) { @@ -84,6 +85,7 @@ int32_t taosSendto(TdSocketPtr pSocket, void *buf, int len, unsigned int flags, return sendto(pSocket->fd, buf, len, flags, dest_addr, addrlen); #endif } + int32_t taosWriteSocket(TdSocketPtr pSocket, void *buf, int len) { if (pSocket == NULL || pSocket->fd < 0) { return -1; @@ -114,6 +116,8 @@ int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t } return recvfrom(pSocket->fd, buf, len, flags, destAddr, addrLen); } +#endif // endif 0 + int32_t taosCloseSocketNoCheck1(SocketFd fd) { #ifdef WINDOWS return closesocket(fd); @@ -121,6 +125,7 @@ int32_t taosCloseSocketNoCheck1(SocketFd fd) { return close(fd); #endif } + int32_t taosCloseSocket(TdSocketPtr *ppSocket) { int32_t code; if (ppSocket == NULL || *ppSocket == NULL || (*ppSocket)->fd < 0) { @@ -131,6 +136,8 @@ int32_t taosCloseSocket(TdSocketPtr *ppSocket) { taosMemoryFree(*ppSocket); return code; } + +#if 0 int32_t taosCloseSocketServer(TdSocketServerPtr *ppSocketServer) { int32_t code; if (ppSocketServer == NULL || *ppSocketServer == NULL || (*ppSocketServer)->fd < 0) { @@ -216,20 +223,6 @@ int32_t taosShutDownSocketServerRDWR(TdSocketServerPtr pSocketServer) { #endif } -void taosWinSocketInit() { -#ifdef WINDOWS - static char flag = 0; - if (flag == 0) { - WORD wVersionRequested; - WSADATA wsaData; - wVersionRequested = MAKEWORD(1, 1); - if (WSAStartup(wVersionRequested, &wsaData) == 0) { - flag = 1; - } - } -#else -#endif -} int32_t taosSetNonblocking(TdSocketPtr pSocket, int32_t on) { if (pSocket == NULL || pSocket->fd < 0) { return -1; @@ -262,6 +255,8 @@ int32_t taosSetNonblocking(TdSocketPtr pSocket, int32_t on) { #endif return 0; } +#endif // endif 0 + int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t optlen) { if (pSocket == NULL || pSocket->fd < 0) { return -1; @@ -296,6 +291,8 @@ int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void return setsockopt(pSocket->fd, level, optname, optval, (int)optlen); #endif } + +#if 0 int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t *optlen) { if (pSocket == NULL || pSocket->fd < 0) { return -1; @@ -307,6 +304,9 @@ int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void return getsockopt(pSocket->fd, level, optname, optval, (int *)optlen); #endif } + +#endif + uint32_t taosInetAddr(const char *ipAddr) { #ifdef WINDOWS uint32_t value; @@ -330,6 +330,7 @@ const char *taosInetNtoa(struct in_addr ipInt, char *dstStr, int32_t len) { #define TCP_CONN_TIMEOUT 3000 // conn timeout +#if 0 int32_t taosWriteMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) { if (pSocket == NULL || pSocket->fd < 0) { return -1; @@ -726,6 +727,7 @@ int taosValidIp(uint32_t ip) { #endif return 0; } +#endif // endif 0 bool taosValidIpAndPort(uint32_t ip, uint16_t port) { struct sockaddr_in serverAdd; @@ -774,6 +776,8 @@ bool taosValidIpAndPort(uint32_t ip, uint16_t port) { return true; // return 0 == taosValidIp(ip) ? true : false; } + +#if 0 TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { struct sockaddr_in serverAdd; SocketFd fd; @@ -888,6 +892,36 @@ int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len return len; } +// Function converting an IP address string to an uint32_t. +uint32_t ip2uint(const char *const ip_addr) { + char ip_addr_cpy[20]; + char ip[5]; + + tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy)); + + char *s_start, *s_end; + s_start = ip_addr_cpy; + s_end = ip_addr_cpy; + + int32_t k; + + for (k = 0; *s_start != '\0'; s_start = s_end) { + for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) { + } + if (*s_end == '.') { + *s_end = '\0'; + s_end++; + } + ip[k++] = (char)atoi(s_start); + } + + ip[k] = '\0'; + + return *((uint32_t *)ip); +} + +#endif // endif 0 + void taosBlockSIGPIPE() { #ifdef WINDOWS // assert(0); @@ -991,34 +1025,6 @@ int32_t taosGetFqdn(char *fqdn) { return 0; } -// Function converting an IP address string to an uint32_t. -uint32_t ip2uint(const char *const ip_addr) { - char ip_addr_cpy[20]; - char ip[5]; - - tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy)); - - char *s_start, *s_end; - s_start = ip_addr_cpy; - s_end = ip_addr_cpy; - - int32_t k; - - for (k = 0; *s_start != '\0'; s_start = s_end) { - for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) { - } - if (*s_end == '.') { - *s_end = '\0'; - s_end++; - } - ip[k++] = (char)atoi(s_start); - } - - ip[k] = '\0'; - - return *((uint32_t *)ip); -} - void tinet_ntoa(char *ipstr, uint32_t ip) { sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24); } @@ -1039,12 +1045,14 @@ void taosSetMaskSIGPIPE() { #endif } +#if 0 int32_t taosGetSocketName(TdSocketPtr pSocket, struct sockaddr *destAddr, int *addrLen) { if (pSocket == NULL || pSocket->fd < 0) { return -1; } return getsockname(pSocket->fd, destAddr, addrLen); } +#endif // endif 0 /* * Set TCP connection timeout per-socket level. @@ -1080,3 +1088,18 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) { return (int)fd; } + +void taosWinSocketInit() { +#ifdef WINDOWS + static char flag = 0; + if (flag == 0) { + WORD wVersionRequested; + WSADATA wsaData; + wVersionRequested = MAKEWORD(1, 1); + if (WSAStartup(wVersionRequested, &wsaData) == 0) { + flag = 1; + } + } +#else +#endif +}