diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 925a3f3c84609429ba986ac2aa91085a78d12c4f..e897a08270fe5389c101136fdd71b191d3d4b6cf 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -84,8 +84,7 @@ void taos_query_a(TAOS *taos, char *sqlstr, void (*fp)(void *, TAOS_RES *, int), pRes->qhandle = 0; pRes->numOfRows = 1; - strtolower(sqlstr, pSql->sqlstr); - pSql->sqlstr[sqlLen] = 0; + strtolower(pSql->sqlstr, sqlstr); tscTrace("%p Async SQL: %s, pObj:%p", pSql, pSql->sqlstr, pObj); int32_t code = tsParseSql(pSql, pObj->acctId, pObj->db, true); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index cf43aad94bcf49e18a4c61b9fe9aeebaec116087..320cafc34afe520e22a3f84cea734d154e679ab4 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1949,7 +1949,7 @@ int32_t getColumnIndexByName(SSQLToken* pToken, SSchema* pSchema, int32_t numOfC return -1; } - char* r = strnchr(pToken->z, '.', pToken->n); + char* r = strnchr(pToken->z, '.', pToken->n, false); if (r != NULL) { r += 1; @@ -3172,7 +3172,7 @@ int32_t getTimeRange(int64_t* stime, int64_t* etime, tSQLExpr* pRight, int32_t o bool parsed = false; if (pRight->val.nType == TSDB_DATA_TYPE_BINARY) { strdequote(pRight->val.pz); - char* seg = strnchr(pRight->val.pz, '-', pRight->val.nLen); + char* seg = strnchr(pRight->val.pz, '-', pRight->val.nLen, false); if (seg != NULL) { if (taosParseTime(pRight->val.pz, &val, pRight->val.nLen, TSDB_TIME_PRECISION_MICRO) == TSDB_CODE_SUCCESS) { parsed = true; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index df176314e5cb1371dd5465c8a24ae4e7b77e74ab..004508de27fc9d1cf1cd97e1784d90c01b10956b 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2807,8 +2807,7 @@ int tscGetMeterMetaEx(SSqlObj *pSql, char *meterId, bool createIfNotExists) { * successfully created the corresponding table. */ static void tscWaitingForCreateTable(SSqlCmd *pCmd) { - int32_t CREATE_METER_ON_DEMAND = 1; - if (pCmd->command == TSDB_SQL_INSERT && pCmd->defaultVal[0] == CREATE_METER_ON_DEMAND) { + if (pCmd->command == TSDB_SQL_INSERT) { taosMsleep(50); // todo: global config } } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index d55a765d3b0691cd55fe9ac04a5f5d31719290bf..0277fe25cd945953eeb2ff781a82c8c8c29fb6a3 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -88,7 +88,7 @@ TAOS *taos_connect_imp(char *ip, char *user, char *pass, char *db, int port, voi strcpy(tmp, db); strdequote(tmp); - strtolower(tmp, pObj->db); + strtolower(pObj->db, tmp); } pthread_mutex_init(&pObj->mutex, NULL); @@ -198,8 +198,7 @@ int taos_query(TAOS *taos, char *sqlstr) { return pRes->code; } - strtolower(sqlstr, pSql->sqlstr); - pSql->sqlstr[sqlLen] = 0; + strtolower(pSql->sqlstr, sqlstr); pRes->code = (uint8_t)tsParseSql(pSql, pObj->acctId, pObj->db, false); @@ -728,8 +727,7 @@ int taos_validate_sql(TAOS *taos, char *sql) { return pRes->code; } - strtolower(sql, pSql->sqlstr); - pSql->sqlstr[sqlLen] = 0; + strtolower(pSql->sqlstr, sql); pRes->code = (uint8_t)tsParseSql(pSql, pObj->acctId, pObj->db, false); int code = pRes->code; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 4c9c180f6a07f88b39113cbf02269c00bd06af8f..f86016e981072b596ab57ef92a993f9e3b39ed5a 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -898,7 +898,7 @@ int32_t tscValidateName(SSQLToken* pToken) { return TSDB_CODE_INVALID_SQL; } - char* sep = strnchrNoquote(pToken->z, TS_PATH_DELIMITER[0], pToken->n); + char* sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true); if (sep == NULL) { // single part if (pToken->type == TK_STRING) { pToken->n = strdequote(pToken->z); @@ -911,7 +911,7 @@ int32_t tscValidateName(SSQLToken* pToken) { if (len == pToken->n) { return validateQuoteToken(pToken); } else { - sep = strnchrNoquote(pToken->z, TS_PATH_DELIMITER[0], pToken->n); + sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true); if (sep == NULL) { return TSDB_CODE_INVALID_SQL; } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 2187bb4d878aa3536e26f99d7573016d7250ee02..8f5d1dc52a39e0b51757ac1ea9d04145dc0af5bc 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -129,6 +129,7 @@ extern "C" { #define TSDB_CODE_INVALID_QUERY_MSG 106 // failed to validate the sql expression msg by vnode #define TSDB_CODE_CACHE_BLOCK_TS_DISORDERED 107 // time stamp in cache block is disordered #define TSDB_CODE_FILE_BLOCK_TS_DISORDERED 108 // time stamp in file block is disordered +#define TSDB_CODE_INVALID_COMMIT_LOG 109 // invalid commit log may be caused by insufficient sotrage // message type #define TSDB_MSG_TYPE_REG 1 diff --git a/src/inc/tcache.h b/src/inc/tcache.h index 805e5a45af386f023016690fad7a871a9c7ac048..e8726321f5056b6e208b28c0f99907fe9c0f86d1 100644 --- a/src/inc/tcache.h +++ b/src/inc/tcache.h @@ -49,10 +49,10 @@ void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, i * if it is referenced by other object, it will be remain in cache * @param handle cache object * @param data not the key, actually referenced data - * @param isForce force model, reduce the ref count and move the data into + * @param remove force model, reduce the ref count and move the data into * pTrash */ -void taosRemoveDataFromCache(void *handle, void **data, bool isForce); +void taosRemoveDataFromCache(void *handle, void **data, bool remove); /** * update data in cache diff --git a/src/inc/tutil.h b/src/inc/tutil.h index 3b64cf66ea6bc6fd37d72097bc798b341f587a0c..cd355717ba481b2ad7bb143a0b5a86c55142b3c2 100644 --- a/src/inc/tutil.h +++ b/src/inc/tutil.h @@ -180,12 +180,11 @@ int32_t strdequote(char *src); void strtrim(char *src); -char *strnchr(char *haystack, char needle, int32_t len); -char *strnchrNoquote(char *haystack, char needle, int32_t len); +char *strnchr(char *haystack, char needle, int32_t len, bool skipquote); char **strsplit(char *src, const char *delim, int32_t *num); -void strtolower(char *src, char *dst); +void strtolower(char *dst, const char *src); int64_t strnatoi(char *num, int32_t len); diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c index 693784ab11770d6c3df6f7048907299483e28fb2..b380ead968f72f2dfcd1da06eb67f55a613bb852 100644 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -1217,6 +1217,7 @@ int taosReSendRspToPeer(SRpcConn *pConn) { void taosProcessTaosTimer(void *param, void *tmrId) { STaosHeader *pHeader = NULL; SRpcConn * pConn = (SRpcConn *)param; + int msgLen; if (pConn->signature != param) { tError("pConn Signature:0x%x, pConn:0x%x not matched", pConn->signature, param); @@ -1252,6 +1253,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) { if (pConn->pMsgNode && pConn->pMsgNode->msgLen > 0) { pHeader = (STaosHeader *)((char *)pConn->pMsgNode + sizeof(SMsgNode)); pHeader->destId = pConn->peerId; + msgLen = pConn->pMsgNode->msgLen; if (pConn->spi) { STaosDigest *pDigest = (STaosDigest *)(((char *)pHeader) + pConn->pMsgNode->msgLen - sizeof(STaosDigest)); pDigest->timeStamp = htonl(taosGetTimestampSec()); @@ -1279,8 +1281,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) { pthread_mutex_unlock(&pChann->mutex); if (pHeader) { - (*taosSendData[pServer->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, pConn->pMsgNode->msgLen, - pConn->chandle); + (*taosSendData[pServer->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, msgLen, pConn->chandle); taosTmrReset(taosProcessTaosTimer, tsRpcTimer, pConn, pChann->tmrCtrl, &pConn->pTimer); } } diff --git a/src/rpc/src/tstring.c b/src/rpc/src/tstring.c index d76f3055a42cc534315aa91e7dcbf041cfae90a6..3c52b0c8b4f7ef024cde3295f1c3932fecc08367 100644 --- a/src/rpc/src/tstring.c +++ b/src/rpc/src/tstring.c @@ -139,7 +139,7 @@ char *tsError[] = {"success", "unexpected response", "invalid response type", "no resource", - "invalid time stamp", // 15 + "server-client date time unsynced", // 15 "mismatched meter ID", "transcation not finished", "not online", @@ -232,5 +232,6 @@ char *tsError[] = {"success", "timestamp out of range", "invalid query message", "timestamp disordered in cache block", - "timestamp disordered in file block" + "timestamp disordered in file block", + "invalid commit log" }; diff --git a/src/system/src/vnodeCommit.c b/src/system/src/vnodeCommit.c index 64de3bee7fe500bb94813b55203096a995348ccf..4a63095bfdc394ed10ac0703845dcfe39ada03ba 100644 --- a/src/system/src/vnodeCommit.c +++ b/src/system/src/vnodeCommit.c @@ -48,9 +48,9 @@ int vnodeOpenCommitLog(int vnode, uint64_t firstV) { dTrace("vid:%d, logfd:%d, open file:%s success", vnode, pVnode->logFd, fileName); if (posix_fallocate64(pVnode->logFd, 0, pVnode->mappingSize) != 0) { - dError("vid:%d, logfd:%d, failed to alloc file size:%d", vnode, pVnode->logFd, pVnode->mappingSize); + dError("vid:%d, logfd:%d, failed to alloc file size:%d reason:%s", vnode, pVnode->logFd, pVnode->mappingSize, strerror(errno)); perror("fallocate failed"); - return -1; + goto _err_log_open; } struct stat statbuf; @@ -60,13 +60,13 @@ int vnodeOpenCommitLog(int vnode, uint64_t firstV) { if (length != pVnode->mappingSize) { dError("vid:%d, logfd:%d, alloc file size:%ld not equal to mapping size:%ld", vnode, pVnode->logFd, length, pVnode->mappingSize); - return -1; + goto _err_log_open; } pVnode->pMem = mmap(0, pVnode->mappingSize, PROT_WRITE | PROT_READ, MAP_SHARED, pVnode->logFd, 0); if (pVnode->pMem == MAP_FAILED) { dError("vid:%d, logfd:%d, failed to map file, reason:%s", vnode, pVnode->logFd, strerror(errno)); - return -1; + goto _err_log_open; } pVnode->pWrite = pVnode->pMem; @@ -74,6 +74,12 @@ int vnodeOpenCommitLog(int vnode, uint64_t firstV) { pVnode->pWrite += sizeof(firstV); return pVnode->logFd; + + _err_log_open: + close(pVnode->logFd); + remove(fileName); + pVnode->logFd = -1; + return -1; } int vnodeRenewCommitLog(int vnode) { @@ -244,9 +250,9 @@ int vnodeInitCommit(int vnode) { void vnodeCleanUpCommit(int vnode) { SVnodeObj *pVnode = vnodeList + vnode; - if (pVnode->logFd) tclose(pVnode->logFd); + if (VALIDFD(pVnode->logFd)) tclose(pVnode->logFd); - if (pVnode->cfg.commitLog && remove(pVnode->logFn) < 0) { + if (pVnode->cfg.commitLog && (pVnode->logFd > 0 && remove(pVnode->logFn) < 0)) { dError("vid:%d, failed to remove:%s", vnode, pVnode->logFn); taosLogError("vid:%d, failed to remove:%s", vnode, pVnode->logFn); } diff --git a/src/system/src/vnodeImport.c b/src/system/src/vnodeImport.c index 92ecaa7e0120bb36b9b21f574af95cef4003987b..7f6d1bcc2ed37b9e7a8d59333caa3be6799a48a7 100644 --- a/src/system/src/vnodeImport.c +++ b/src/system/src/vnodeImport.c @@ -881,6 +881,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi } if ( pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) { + if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG; code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion); if (code != 0) return code; } diff --git a/src/system/src/vnodeMeter.c b/src/system/src/vnodeMeter.c index 62b6b64e22407defa861e94a3bb25d87cfcca646..27e6e74952851364036e304b65b2f90e59b44e18 100644 --- a/src/system/src/vnodeMeter.c +++ b/src/system/src/vnodeMeter.c @@ -556,6 +556,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi // FIXME: Here should be after the comparison of sversions. if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) { + if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG; code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_INSERT, cont, contLen, sversion); if (code != 0) return code; } diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index ea34ab421b0296e5a6048a6081dbf1ca6b837119..317daa28b4672c6e9e2d60ed7c42eea43a45c05c 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -46,7 +46,8 @@ typedef struct _cache_node_t { char * key; /* null-terminated string */ struct _cache_node_t *prev; struct _cache_node_t *next; - uint64_t time; + uint64_t addTime; // the time when this element is added or updated into cache + uint64_t time; // end time when this element should be remove from cache uint64_t signature; /* @@ -78,7 +79,7 @@ typedef struct { * when the node in pTrash does not be referenced, it will be release at the expired time */ SDataNode *pTrash; - int numOfElemsInTrash; /* number of element in trash */ + int numOfElemsInTrash; // number of element in trash void *tmrCtrl; void *pTimer; @@ -87,7 +88,7 @@ typedef struct { _hashFunc hashFp; /* - * pthread_rwlock_t have bugs on the windows platform + * pthread_rwlock_t will block ops on the windows platform, when refresh is called. * so use pthread_mutex_t as an alternative */ #if defined LINUX @@ -125,7 +126,8 @@ static SDataNode *taosCreateHashNode(const char *key, uint32_t keyLen, const cha memcpy(pNewNode->data, pData, dataSize); - pNewNode->time = taosGetTimestampMs() + lifespan; + pNewNode->addTime = (uint64_t) taosGetTimestampMs(); + pNewNode->time = pNewNode->addTime + lifespan; pNewNode->key = pNewNode->data + dataSize; strcpy(pNewNode->key, key); @@ -146,7 +148,7 @@ static SDataNode *taosCreateHashNode(const char *key, uint32_t keyLen, const cha static FORCE_INLINE int taosHashKey(int maxSessions, char *key, uint32_t len) { uint32_t hash = MurmurHash3_32(key, len); - /* avoid the costly remainder operation */ + // avoid the costly remainder operation assert((maxSessions & (maxSessions - 1)) == 0); hash = hash & (maxSessions - 1); @@ -485,11 +487,12 @@ void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, i if (pOldNode == NULL) { // do add to cache pNode = taosAddToCacheImpl(pObj, key, keyLen, pData, dataSize, keepTime * 1000L); - pTrace("key:%s %p added into cache,slot:%d,expireTime:%lld,cache total:%d,size:%ldbytes,collision:%d", pNode->key, - pNode, pNode->hashVal, pNode->time, pObj->total, pObj->totalSize, pObj->statistics.numOfCollision); + pTrace("key:%s %p added into cache, slot:%d, addTime:%lld, expireTime:%lld, cache total:%d, " + "size:%lldbytes, collision:%d", pNode->key, pNode, pNode->hashVal, pNode->addTime, pNode->time, pObj->total, + pObj->totalSize, pObj->statistics.numOfCollision); } else { // old data exists, update the node pNode = taosUpdateCacheImpl(pObj, pOldNode, key, keyLen, pData, dataSize, keepTime * 1000L); - // pWarn("key:%s %p exist in cache,updated", key, pNode); + pTrace("key:%s %p exist in cache, updated", key, pNode); } #if defined LINUX @@ -507,7 +510,7 @@ void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, i * @param handle * @param data */ -void taosRemoveDataFromCache(void *handle, void **data, _Bool isForce) { +void taosRemoveDataFromCache(void *handle, void **data, bool remove) { SCacheObj *pObj = (SCacheObj *)handle; if (pObj == NULL || pObj->maxSessions == 0 || (*data) == NULL || (pObj->total + pObj->numOfElemsInTrash == 0)) return; @@ -532,7 +535,7 @@ void taosRemoveDataFromCache(void *handle, void **data, _Bool isForce) { *data = NULL; - if (isForce) { + if (remove) { #if defined LINUX pthread_rwlock_wrlock(&pObj->lock); #else @@ -540,6 +543,7 @@ void taosRemoveDataFromCache(void *handle, void **data, _Bool isForce) { #endif taosCacheMoveNodeToTrash(pObj, pNode); + #if defined LINUX pthread_rwlock_unlock(&pObj->lock); #else diff --git a/src/util/src/ttime.c b/src/util/src/ttime.c index 218eab93118ed1be9aa66781dcb726e2546f857d..0e749b5be9105e5e96ba223f11375b66f83c94e5 100644 --- a/src/util/src/ttime.c +++ b/src/util/src/ttime.c @@ -56,7 +56,7 @@ int64_t taosGetTimestamp(int32_t precision) { int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec) { /* parse datatime string in with tz */ - if (strnchr(timestr, 'T', len) != NULL) { + if (strnchr(timestr, 'T', len, false) != NULL) { return parseTimeWithTz(timestr, time, timePrec); } else { return parseLocaltime(timestr, time, timePrec); diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index ed195368eb68a46702a286778c2f54b50286afec..019690bf37b0bc5dd526f57fe9f8c891980ad93e 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -119,27 +119,18 @@ char **strsplit(char *z, const char *delim, int32_t *num) { return split; } -char *strnchr(char *haystack, char needle, int32_t len) { +char *strnchr(char *haystack, char needle, int32_t len, bool skipquote) { for (int32_t i = 0; i < len; ++i) { - if (haystack[i] == needle) { - return &haystack[i]; - } - } - return NULL; -} + // skip the needle in quote, jump to the end of quoted string + if (skipquote && (haystack[i] == '\'' || haystack[i] == '"')) { + char quote = haystack[i++]; + while(i < len && haystack[i++] != quote); + if (i >= len) { + return NULL; + } + } -char *strnchrNoquote(char *haystack, char needle, int32_t len) { - for (int32_t i = 0; i < len; ++i) { - if (haystack[i] == '\'' || haystack[i] == '"') { - char quote = haystack[i++]; - while(i < len && haystack[i] != quote){++i;} - - if (++i >= len) { - return NULL; - } - } - if (haystack[i] == needle) { return &haystack[i]; } @@ -148,8 +139,7 @@ char *strnchrNoquote(char *haystack, char needle, int32_t len) { return NULL; } - -void strtolower(char *z, char *dst) { +void strtolower(char *dst, const char *z) { int quote = 0; char *str = z; if (dst == NULL) { @@ -169,6 +159,8 @@ void strtolower(char *z, char *dst) { str++; } + + *dst = 0; } char *paGetToken(char *string, char **token, int32_t *tokenLen) {