提交 0ca2d9e9 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/commit_txn

...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 4d02980 GIT_TAG c64858f
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
......
...@@ -194,6 +194,10 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -194,6 +194,10 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
SUseDbRsp usedbRsp = {0}; SUseDbRsp usedbRsp = {0};
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp); tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
if(strlen(usedbRsp.db) == 0){
return TSDB_CODE_MND_DB_NOT_EXIST;
}
SName name = {0}; SName name = {0};
tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB); tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB);
......
...@@ -603,22 +603,27 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { ...@@ -603,22 +603,27 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
return 0; return 0;
} }
if (mndAcquireRpc(pMsg->info.node) == 0) return 0; if (mndAcquireRpc(pMsg->info.node) == 0) return 0;
SMnode *pMnode = pMsg->info.node;
const char *role = syncGetMyRoleStr(pMnode->syncMgmt.sync);
bool restored = syncIsRestoreFinish(pMnode->syncMgmt.sync);
if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER || pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
pMsg->msgType == TDMT_MND_UPTIME_TIMER) { pMsg->msgType == TDMT_MND_UPTIME_TIMER) {
mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
pMnode->stopped, restored, role);
return -1; return -1;
} }
SEpSet epSet = {0}; const STraceId *trace = &pMsg->info.traceId;
SMnode *pMnode = pMsg->info.node; SEpSet epSet = {0};
mndGetMnodeEpSet(pMnode, &epSet); mndGetMnodeEpSet(pMnode, &epSet);
const STraceId *trace = &pMsg->info.traceId;
mDebug( mDebug(
"msg:%p, failed to check mnode state since %s, mnode restored:%d stopped:%d, sync restored:%d role:%s type:%s " "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
"numOfEps:%d inUse:%d", "role:%s, redirect numOfEps:%d inUse:%d",
pMsg, terrstr(), pMnode->restored, pMnode->stopped, syncIsRestoreFinish(pMnode->syncMgmt.sync), pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, restored, role, epSet.numOfEps,
syncGetMyRoleStr(pMnode->syncMgmt.sync), TMSG_INFO(pMsg->msgType), epSet.numOfEps, epSet.inUse); epSet.inUse);
if (epSet.numOfEps > 0) { if (epSet.numOfEps > 0) {
for (int32_t i = 0; i < epSet.numOfEps; ++i) { for (int32_t i = 0; i < epSet.numOfEps; ++i) {
......
...@@ -394,7 +394,6 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, ...@@ -394,7 +394,6 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndTransAppendNullLog(pTrans) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
...@@ -478,7 +477,6 @@ static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeO ...@@ -478,7 +477,6 @@ static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeO
static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL; void *pIter = NULL;
int32_t numOfReplicas = 0;
SDDropMnodeReq dropReq = {0}; SDDropMnodeReq dropReq = {0};
SEpSet dropEpSet = {0}; SEpSet dropEpSet = {0};
...@@ -505,9 +503,8 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode ...@@ -505,9 +503,8 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
if (pObj == NULL) return 0; if (pObj == NULL) return 0;
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) return -1;
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) return -1; if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) return -1;
if (mndTransAppendNullLog(pTrans) != 0) return -1; if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) return -1;
return 0; return 0;
} }
...@@ -715,7 +712,8 @@ static void mndReloadSyncConfig(SMnode *pMnode) { ...@@ -715,7 +712,8 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
SMnodeObj *pObj = NULL; SMnodeObj *pObj = NULL;
ESdbStatus objStatus = 0; ESdbStatus objStatus = 0;
void *pIter = NULL; void *pIter = NULL;
bool hasUpdatingMnode = false; int32_t updatingMnodes = 0;
int32_t readyMnodes = 0;
SSyncCfg cfg = {.myIndex = -1}; SSyncCfg cfg = {.myIndex = -1};
while (1) { while (1) {
...@@ -723,7 +721,11 @@ static void mndReloadSyncConfig(SMnode *pMnode) { ...@@ -723,7 +721,11 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
if (pIter == NULL) break; if (pIter == NULL) break;
if (objStatus == SDB_STATUS_CREATING || objStatus == SDB_STATUS_DROPPING) { if (objStatus == SDB_STATUS_CREATING || objStatus == SDB_STATUS_DROPPING) {
mInfo("vgId:1, has updating mnode:%d, status:%s", pObj->id, sdbStatusName(objStatus)); mInfo("vgId:1, has updating mnode:%d, status:%s", pObj->id, sdbStatusName(objStatus));
hasUpdatingMnode = true; updatingMnodes++;
}
if (objStatus == SDB_STATUS_READY) {
mInfo("vgId:1, has ready mnode:%d, status:%s", pObj->id, sdbStatusName(objStatus));
readyMnodes++;
} }
if (objStatus == SDB_STATUS_READY || objStatus == SDB_STATUS_CREATING) { if (objStatus == SDB_STATUS_READY || objStatus == SDB_STATUS_CREATING) {
...@@ -739,18 +741,24 @@ static void mndReloadSyncConfig(SMnode *pMnode) { ...@@ -739,18 +741,24 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
sdbReleaseLock(pSdb, pObj, false); sdbReleaseLock(pSdb, pObj, false);
} }
if (cfg.myIndex == -1) { if (readyMnodes <= 0 || updatingMnodes <= 0) {
mInfo("vgId:1, mnode not reload since selfIndex is -1"); mInfo("vgId:1, mnode sync not reconfig since readyMnodes:%d updatingMnodes:%d", readyMnodes, updatingMnodes);
return; return;
} }
if (!mndGetRestored(pMnode)) { if (cfg.myIndex == -1) {
mInfo("vgId:1, mnode not reload since restore not finished"); #if 1
mInfo("vgId:1, mnode sync not reconfig since selfIndex is -1");
#else
// cannot reconfig because the leader may fail to elect after reboot
mInfo("vgId:1, mnode sync not reconfig since selfIndex is -1, do sync stop oper");
syncStop(pMnode->syncMgmt.sync);
#endif
return; return;
} }
if (hasUpdatingMnode) { if (updatingMnodes > 0) {
mInfo("vgId:1, start to reload mnode sync, replica:%d myIndex:%d", cfg.replicaNum, cfg.myIndex); mInfo("vgId:1, mnode sync reconfig, replica:%d myIndex:%d", cfg.replicaNum, cfg.myIndex);
for (int32_t i = 0; i < cfg.replicaNum; ++i) { for (int32_t i = 0; i < cfg.replicaNum; ++i) {
SNodeInfo *pNode = &cfg.nodeInfo[i]; SNodeInfo *pNode = &cfg.nodeInfo[i];
mInfo("vgId:1, index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort); mInfo("vgId:1, index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
......
...@@ -403,6 +403,10 @@ const char *sdbStatusName(ESdbStatus status); ...@@ -403,6 +403,10 @@ const char *sdbStatusName(ESdbStatus status);
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper); void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
int32_t sdbGetIdFromRaw(SSdb *pSdb, SSdbRaw *pRaw); int32_t sdbGetIdFromRaw(SSdb *pSdb, SSdbRaw *pRaw);
void sdbWriteLock(SSdb *pSdb, int32_t type);
void sdbReadLock(SSdb *pSdb, int32_t type);
void sdbUnLock(SSdb *pSdb, int32_t type);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -181,3 +181,23 @@ void sdbGetCommitInfo(SSdb *pSdb, int64_t *index, int64_t *term, int64_t *config ...@@ -181,3 +181,23 @@ void sdbGetCommitInfo(SSdb *pSdb, int64_t *index, int64_t *term, int64_t *config
pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, *index, *term, *config); pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, *index, *term, *config);
#endif #endif
} }
void sdbWriteLock(SSdb *pSdb, int32_t type) {
TdThreadRwlock *pLock = &pSdb->locks[type];
// mTrace("sdb table:%d start write lock:%p", type, pLock);
taosThreadRwlockWrlock(pLock);
// mTrace("sdb table:%d stop write lock:%p", type, pLock);
}
void sdbReadLock(SSdb *pSdb, int32_t type) {
TdThreadRwlock *pLock = &pSdb->locks[type];
// mTrace("sdb table:%d start read lock:%p", type, pLock);
taosThreadRwlockRdlock(pLock);
// mTrace("sdb table:%d stop read lock:%p", type, pLock);
}
void sdbUnLock(SSdb *pSdb, int32_t type) {
TdThreadRwlock *pLock = &pSdb->locks[type];
// mTrace("sdb table:%d unlock:%p", type, pLock);
taosThreadRwlockUnlock(pLock);
}
...@@ -363,9 +363,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -363,9 +363,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
mInfo("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i)); mInfo("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
SHashObj *hash = pSdb->hashObjs[i]; SHashObj *hash = pSdb->hashObjs[i];
TdThreadRwlock *pLock = &pSdb->locks[i]; sdbWriteLock(pSdb, i);
taosThreadRwlockWrlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, NULL); SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) { while (ppRow != NULL) {
...@@ -410,7 +409,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -410,7 +409,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
ppRow = taosHashIterate(hash, ppRow); ppRow = taosHashIterate(hash, ppRow);
} }
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, i);
} }
if (code == 0) { if (code == 0) {
......
...@@ -133,12 +133,12 @@ static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) { ...@@ -133,12 +133,12 @@ static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) {
} }
static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; int32_t type = pRow->type;
taosThreadRwlockWrlock(pLock); sdbWriteLock(pSdb, type);
SSdbRow *pOldRow = taosHashGet(hash, pRow->pObj, keySize); SSdbRow *pOldRow = taosHashGet(hash, pRow->pObj, keySize);
if (pOldRow != NULL) { if (pOldRow != NULL) {
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE; terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE;
return terrno; return terrno;
...@@ -149,7 +149,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -149,7 +149,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
sdbPrintOper(pSdb, pRow, "insert"); sdbPrintOper(pSdb, pRow, "insert");
if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) { if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) {
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno; return terrno;
...@@ -164,12 +164,12 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -164,12 +164,12 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosHashRemove(hash, pRow->pObj, keySize); taosHashRemove(hash, pRow->pObj, keySize);
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
terrno = code; terrno = code;
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
return terrno; return terrno;
} }
} }
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) { if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) {
pSdb->maxId[pRow->type] = TMAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj)); pSdb->maxId[pRow->type] = TMAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
...@@ -183,26 +183,27 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -183,26 +183,27 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
} }
static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) { static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) {
TdThreadRwlock *pLock = &pSdb->locks[pNewRow->type]; int32_t type = pNewRow->type;
taosThreadRwlockWrlock(pLock); sdbWriteLock(pSdb, type);
SSdbRow **ppOldRow = taosHashGet(hash, pNewRow->pObj, keySize); SSdbRow **ppOldRow = taosHashGet(hash, pNewRow->pObj, keySize);
if (ppOldRow == NULL || *ppOldRow == NULL) { if (ppOldRow == NULL || *ppOldRow == NULL) {
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize); return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize);
} }
SSdbRow *pOldRow = *ppOldRow; SSdbRow *pOldRow = *ppOldRow;
pOldRow->status = pRaw->status; pOldRow->status = pRaw->status;
sdbPrintOper(pSdb, pOldRow, "update"); sdbPrintOper(pSdb, pOldRow, "update");
sdbUnLock(pSdb, type);
int32_t code = 0; int32_t code = 0;
SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type]; SdbUpdateFp updateFp = pSdb->updateFps[type];
if (updateFp != NULL) { if (updateFp != NULL) {
code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj); code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj);
} }
taosThreadRwlockUnlock(pLock); // sdbUnLock(pSdb, type);
sdbFreeRow(pSdb, pNewRow, false); sdbFreeRow(pSdb, pNewRow, false);
pSdb->tableVer[pOldRow->type]++; pSdb->tableVer[pOldRow->type]++;
...@@ -210,12 +211,12 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -210,12 +211,12 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
} }
static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; int32_t type = pRow->type;
taosThreadRwlockWrlock(pLock); sdbWriteLock(pSdb, type);
SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize); SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize);
if (ppOldRow == NULL || *ppOldRow == NULL) { if (ppOldRow == NULL || *ppOldRow == NULL) {
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
return terrno; return terrno;
...@@ -228,7 +229,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -228,7 +229,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosHashRemove(hash, pOldRow->pObj, keySize); taosHashRemove(hash, pOldRow->pObj, keySize);
pSdb->tableVer[pOldRow->type]++; pSdb->tableVer[pOldRow->type]++;
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
...@@ -282,12 +283,11 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { ...@@ -282,12 +283,11 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
void *pRet = NULL; void *pRet = NULL;
int32_t keySize = sdbGetkeySize(pSdb, type, pKey); int32_t keySize = sdbGetkeySize(pSdb, type, pKey);
TdThreadRwlock *pLock = &pSdb->locks[type]; sdbReadLock(pSdb, type);
taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashGet(hash, pKey, keySize); SSdbRow **ppRow = taosHashGet(hash, pKey, keySize);
if (ppRow == NULL || *ppRow == NULL) { if (ppRow == NULL || *ppRow == NULL) {
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
return NULL; return NULL;
} }
...@@ -310,13 +310,13 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { ...@@ -310,13 +310,13 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
break; break;
} }
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
return pRet; return pRet;
} }
static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) { static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; int32_t type = pRow->type;
taosThreadRwlockWrlock(pLock); sdbWriteLock(pSdb, type);
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
sdbPrintOper(pSdb, pRow, "check"); sdbPrintOper(pSdb, pRow, "check");
...@@ -324,7 +324,7 @@ static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) { ...@@ -324,7 +324,7 @@ static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
sdbFreeRow(pSdb, pRow, true); sdbFreeRow(pSdb, pRow, true);
} }
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
} }
void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) { void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) {
...@@ -333,9 +333,9 @@ void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) { ...@@ -333,9 +333,9 @@ void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) {
SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow)); SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
if (pRow->type >= SDB_MAX) return; if (pRow->type >= SDB_MAX) return;
TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; int32_t type = pRow->type;
if (lock) { if (lock) {
taosThreadRwlockWrlock(pLock); sdbWriteLock(pSdb, type);
} }
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
...@@ -345,7 +345,7 @@ void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) { ...@@ -345,7 +345,7 @@ void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) {
} }
if (lock) { if (lock) {
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
} }
} }
...@@ -357,8 +357,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { ...@@ -357,8 +357,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
SHashObj *hash = sdbGetHash(pSdb, type); SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return NULL; if (hash == NULL) return NULL;
TdThreadRwlock *pLock = &pSdb->locks[type]; sdbReadLock(pSdb, type);
taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, pIter); SSdbRow **ppRow = taosHashIterate(hash, pIter);
while (ppRow != NULL) { while (ppRow != NULL) {
...@@ -373,7 +372,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { ...@@ -373,7 +372,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
*ppObj = pRow->pObj; *ppObj = pRow->pObj;
break; break;
} }
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
return ppRow; return ppRow;
} }
...@@ -384,9 +383,8 @@ void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStat ...@@ -384,9 +383,8 @@ void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStat
SHashObj *hash = sdbGetHash(pSdb, type); SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return NULL; if (hash == NULL) return NULL;
TdThreadRwlock *pLock = &pSdb->locks[type];
if (lock) { if (lock) {
taosThreadRwlockRdlock(pLock); sdbReadLock(pSdb, type);
} }
SSdbRow **ppRow = taosHashIterate(hash, pIter); SSdbRow **ppRow = taosHashIterate(hash, pIter);
...@@ -404,7 +402,7 @@ void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStat ...@@ -404,7 +402,7 @@ void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStat
break; break;
} }
if (lock) { if (lock) {
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
} }
return ppRow; return ppRow;
...@@ -416,18 +414,17 @@ void sdbCancelFetch(SSdb *pSdb, void *pIter) { ...@@ -416,18 +414,17 @@ void sdbCancelFetch(SSdb *pSdb, void *pIter) {
SHashObj *hash = sdbGetHash(pSdb, pRow->type); SHashObj *hash = sdbGetHash(pSdb, pRow->type);
if (hash == NULL) return; if (hash == NULL) return;
TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; int32_t type = pRow->type;
taosThreadRwlockRdlock(pLock); sdbReadLock(pSdb, type);
taosHashCancelIterate(hash, pIter); taosHashCancelIterate(hash, pIter);
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
} }
void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3) { void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3) {
SHashObj *hash = sdbGetHash(pSdb, type); SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return; if (hash == NULL) return;
TdThreadRwlock *pLock = &pSdb->locks[type]; sdbReadLock(pSdb, type);
taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, NULL); SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) { while (ppRow != NULL) {
...@@ -443,17 +440,16 @@ void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2 ...@@ -443,17 +440,16 @@ void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2
ppRow = taosHashIterate(hash, ppRow); ppRow = taosHashIterate(hash, ppRow);
} }
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
} }
int32_t sdbGetSize(SSdb *pSdb, ESdbType type) { int32_t sdbGetSize(SSdb *pSdb, ESdbType type) {
SHashObj *hash = sdbGetHash(pSdb, type); SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return 0; if (hash == NULL) return 0;
TdThreadRwlock *pLock = &pSdb->locks[type]; sdbReadLock(pSdb, type);
taosThreadRwlockRdlock(pLock);
int32_t size = taosHashGetSize(hash); int32_t size = taosHashGetSize(hash);
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
return size; return size;
} }
...@@ -465,9 +461,7 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) { ...@@ -465,9 +461,7 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) {
if (pSdb->keyTypes[type] != SDB_KEY_INT32) return -1; if (pSdb->keyTypes[type] != SDB_KEY_INT32) return -1;
int32_t maxId = 0; int32_t maxId = 0;
sdbReadLock(pSdb, type);
TdThreadRwlock *pLock = &pSdb->locks[type];
taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, NULL); SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) { while (ppRow != NULL) {
...@@ -477,8 +471,7 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) { ...@@ -477,8 +471,7 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) {
ppRow = taosHashIterate(hash, ppRow); ppRow = taosHashIterate(hash, ppRow);
} }
taosThreadRwlockUnlock(pLock); sdbUnLock(pSdb, type);
maxId = TMAX(maxId, pSdb->maxId[type]); maxId = TMAX(maxId, pSdb->maxId[type]);
return maxId + 1; return maxId + 1;
} }
......
...@@ -1059,7 +1059,7 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { ...@@ -1059,7 +1059,7 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
if (param->val == NULL) { if (param->val == NULL) {
metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode)); metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
return -1; goto END;
} else { } else {
if (IS_VAR_DATA_TYPE(param->type)) { if (IS_VAR_DATA_TYPE(param->type)) {
tagData = varDataVal(param->val); tagData = varDataVal(param->val);
...@@ -1111,27 +1111,25 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { ...@@ -1111,27 +1111,25 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
break; break;
} }
} }
if (p->suid != pKey->suid) { if (p == NULL || p->suid != pKey->suid) {
break; break;
} }
first = false; first = false;
if (p != NULL) { int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type);
int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type); if (cmp == 0) {
if (cmp == 0) { // match
// match tb_uid_t tuid = 0;
tb_uid_t tuid = 0; if (IS_VAR_DATA_TYPE(pKey->type)) {
if (IS_VAR_DATA_TYPE(pKey->type)) { tuid = *(tb_uid_t *)(p->data + varDataTLen(p->data));
tuid = *(tb_uid_t *)(p->data + varDataTLen(p->data));
} else {
tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
}
taosArrayPush(pUids, &tuid);
} else if (cmp == 1) {
// not match but should continue to iter
} else { } else {
// not match and no more result tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
break;
} }
taosArrayPush(pUids, &tuid);
} else if (cmp == 1) {
// not match but should continue to iter
} else {
// not match and no more result
break;
} }
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur); valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
if (valid < 0) { if (valid < 0) {
......
...@@ -116,9 +116,10 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const ...@@ -116,9 +116,10 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
} }
} }
taosArrayDestroy(pTagVals);
indexJsonPut(pMeta->pTagIvtIdx, terms, tuid); indexJsonPut(pMeta->pTagIvtIdx, terms, tuid);
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
taosArrayDestroy(pTagVals);
#endif #endif
return 0; return 0;
} }
...@@ -159,6 +160,7 @@ int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSche ...@@ -159,6 +160,7 @@ int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSche
memcpy(val, (uint16_t *)&len, VARSTR_HEADER_SIZE); memcpy(val, (uint16_t *)&len, VARSTR_HEADER_SIZE);
type = TSDB_DATA_TYPE_VARCHAR; type = TSDB_DATA_TYPE_VARCHAR;
term = indexTermCreate(suid, DEL_VALUE, type, key, nKey, val, len); term = indexTermCreate(suid, DEL_VALUE, type, key, nKey, val, len);
taosMemoryFree(val);
} else if (pTagVal->nData == 0) { } else if (pTagVal->nData == 0) {
term = indexTermCreate(suid, DEL_VALUE, TSDB_DATA_TYPE_VARCHAR, key, nKey, pTagVal->pData, 0); term = indexTermCreate(suid, DEL_VALUE, TSDB_DATA_TYPE_VARCHAR, key, nKey, pTagVal->pData, 0);
} }
...@@ -177,6 +179,7 @@ int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSche ...@@ -177,6 +179,7 @@ int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSche
} }
indexJsonPut(pMeta->pTagIvtIdx, terms, tuid); indexJsonPut(pMeta->pTagIvtIdx, terms, tuid);
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
taosArrayDestroy(pTagVals);
#endif #endif
return 0; return 0;
} }
......
...@@ -138,7 +138,7 @@ void idxReleaseRef(int64_t ref); ...@@ -138,7 +138,7 @@ void idxReleaseRef(int64_t ref);
#define IDX_TYPE_ADD_EXTERN_TYPE(ty, exTy) \ #define IDX_TYPE_ADD_EXTERN_TYPE(ty, exTy) \
do { \ do { \
uint8_t oldTy = ty; \ uint8_t oldTy = ty; \
ty = (ty >> 4) | exTy; \ ty = ((ty >> 4) & 0xFF) | exTy; \
ty = (ty << 4) | oldTy; \ ty = (ty << 4) | oldTy; \
} while (0) } while (0)
......
...@@ -139,7 +139,7 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { ...@@ -139,7 +139,7 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
END: END:
if (idx != NULL) { if (idx != NULL) {
indexClose(idx); indexDestroy(idx);
} }
*index = NULL; *index = NULL;
return ret; return ret;
......
...@@ -538,7 +538,7 @@ int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid) { ...@@ -538,7 +538,7 @@ int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
idxCacheRef(pCache); idxCacheRef(pCache);
// encode data // encode data
CacheTerm* ct = taosMemoryCalloc(1, sizeof(CacheTerm)); CacheTerm* ct = taosMemoryCalloc(1, sizeof(CacheTerm));
if (cache == NULL) { if (ct == NULL) {
return -1; return -1;
} }
// set up key // set up key
...@@ -730,15 +730,17 @@ static int32_t idxCacheJsonTermCompare(const void* l, const void* r) { ...@@ -730,15 +730,17 @@ static int32_t idxCacheJsonTermCompare(const void* l, const void* r) {
return cmp; return cmp;
} }
static MemTable* idxInternalCacheCreate(int8_t type) { static MemTable* idxInternalCacheCreate(int8_t type) {
int ttype = IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : TSDB_DATA_TYPE_BINARY; // int ttype = IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY :
// TSDB_DATA_TYPE_BINARY;
int ttype = TSDB_DATA_TYPE_BINARY;
int32_t (*cmpFn)(const void* l, const void* r) = int32_t (*cmpFn)(const void* l, const void* r) =
IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? idxCacheJsonTermCompare : idxCacheTermCompare; IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? idxCacheJsonTermCompare : idxCacheTermCompare;
MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable)); MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable));
idxMemRef(tbl); idxMemRef(tbl);
if (ttype == TSDB_DATA_TYPE_BINARY || ttype == TSDB_DATA_TYPE_NCHAR) { // if (ttype == TSDB_DATA_TYPE_BINARY || ttype == TSDB_DATA_TYPE_NCHAR) {
tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, ttype, MAX_INDEX_KEY_LEN, cmpFn, SL_ALLOW_DUP_KEY, idxCacheTermGet); tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, ttype, MAX_INDEX_KEY_LEN, cmpFn, SL_ALLOW_DUP_KEY, idxCacheTermGet);
} //}
return tbl; return tbl;
} }
......
...@@ -427,6 +427,7 @@ int32_t idxConvertDataToStr(void* src, int8_t type, void** dst) { ...@@ -427,6 +427,7 @@ int32_t idxConvertDataToStr(void* src, int8_t type, void** dst) {
*dst = taosMemoryCalloc(1, bufSize + 1); *dst = taosMemoryCalloc(1, bufSize + 1);
idxInt2str(*(uint64_t*)src, *dst, 1); idxInt2str(*(uint64_t*)src, *dst, 1);
tlen = strlen(*dst); tlen = strlen(*dst);
break;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
*dst = taosMemoryCalloc(1, bufSize + 1); *dst = taosMemoryCalloc(1, bufSize + 1);
sprintf(*dst, "%.9lf", *(float*)src); sprintf(*dst, "%.9lf", *(float*)src);
......
...@@ -231,7 +231,9 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) { ...@@ -231,7 +231,9 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
SIF_ERR_RET(sifGetValueFromNode(node, &param->condValue)); SIF_ERR_RET(sifGetValueFromNode(node, &param->condValue));
param->colId = -1; param->colId = -1;
param->colValType = (uint8_t)(vn->node.resType.type); param->colValType = (uint8_t)(vn->node.resType.type);
memcpy(param->colName, vn->literal, strlen(vn->literal)); if (strlen(vn->literal) <= sizeof(param->colName)) {
memcpy(param->colName, vn->literal, strlen(vn->literal));
}
break; break;
} }
case QUERY_NODE_COLUMN: { case QUERY_NODE_COLUMN: {
...@@ -400,54 +402,52 @@ static FORCE_INLINE FilterFunc sifGetFilterFunc(EIndexQueryType type, bool *reve ...@@ -400,54 +402,52 @@ static FORCE_INLINE FilterFunc sifGetFilterFunc(EIndexQueryType type, bool *reve
static void sifSetFltParam(SIFParam *left, SIFParam *right, SDataTypeBuf *typedata, SMetaFltParam *param) { static void sifSetFltParam(SIFParam *left, SIFParam *right, SDataTypeBuf *typedata, SMetaFltParam *param) {
int8_t ltype = left->colValType, rtype = right->colValType; int8_t ltype = left->colValType, rtype = right->colValType;
if (ltype == TSDB_DATA_TYPE_FLOAT) { if (ltype == TSDB_DATA_TYPE_FLOAT) {
float f; float f = 0;
SIF_DATA_CONVERT(rtype, right->condValue, f); SIF_DATA_CONVERT(rtype, right->condValue, f);
typedata->f = f; typedata->f = f;
param->val = &typedata->f; param->val = &typedata->f;
} else if (ltype == TSDB_DATA_TYPE_DOUBLE) { } else if (ltype == TSDB_DATA_TYPE_DOUBLE) {
double d; double d = 0;
SIF_DATA_CONVERT(rtype, right->condValue, d); SIF_DATA_CONVERT(rtype, right->condValue, d);
typedata->d = d; typedata->d = d;
param->val = &typedata->d; param->val = &typedata->d;
} else if (ltype == TSDB_DATA_TYPE_BIGINT) { } else if (ltype == TSDB_DATA_TYPE_BIGINT) {
int64_t i64; int64_t i64 = 0;
SIF_DATA_CONVERT(rtype, right->condValue, i64); SIF_DATA_CONVERT(rtype, right->condValue, i64);
typedata->i64 = i64; typedata->i64 = i64;
param->val = &typedata->i64; param->val = &typedata->i64;
} else if (ltype == TSDB_DATA_TYPE_INT) { } else if (ltype == TSDB_DATA_TYPE_INT) {
int32_t i32; int32_t i32 = 0;
SIF_DATA_CONVERT(rtype, right->condValue, i32); SIF_DATA_CONVERT(rtype, right->condValue, i32);
typedata->i32 = i32; typedata->i32 = i32;
param->val = &typedata->i32; param->val = &typedata->i32;
} else if (ltype == TSDB_DATA_TYPE_SMALLINT) { } else if (ltype == TSDB_DATA_TYPE_SMALLINT) {
int16_t i16; int16_t i16 = 0;
SIF_DATA_CONVERT(rtype, right->condValue, i16); SIF_DATA_CONVERT(rtype, right->condValue, i16);
typedata->i16 = i16; typedata->i16 = i16;
param->val = &typedata->i16; param->val = &typedata->i16;
} else if (ltype == TSDB_DATA_TYPE_TINYINT) { } else if (ltype == TSDB_DATA_TYPE_TINYINT) {
int8_t i8; int8_t i8 = 0;
SIF_DATA_CONVERT(rtype, right->condValue, i8) SIF_DATA_CONVERT(rtype, right->condValue, i8)
typedata->i8 = i8; typedata->i8 = i8;
param->val = &typedata->i8; param->val = &typedata->i8;
} else if (ltype == TSDB_DATA_TYPE_UBIGINT) { } else if (ltype == TSDB_DATA_TYPE_UBIGINT) {
uint64_t u64; uint64_t u64 = 0;
SIF_DATA_CONVERT(rtype, right->condValue, u64); SIF_DATA_CONVERT(rtype, right->condValue, u64);
typedata->u64 = u64; typedata->u64 = u64;
param->val = &typedata->u64; param->val = &typedata->u64;
} else if (ltype == TSDB_DATA_TYPE_UINT) { } else if (ltype == TSDB_DATA_TYPE_UINT) {
uint32_t u32; uint32_t u32 = 0;
SIF_DATA_CONVERT(rtype, right->condValue, u32); SIF_DATA_CONVERT(rtype, right->condValue, u32);
typedata->u32 = u32; typedata->u32 = u32;
param->val = &typedata->u32; param->val = &typedata->u32;
} else if (ltype == TSDB_DATA_TYPE_USMALLINT) { } else if (ltype == TSDB_DATA_TYPE_USMALLINT) {
uint16_t u16; uint16_t u16 = 0;
SIF_DATA_CONVERT(rtype, right->condValue, u16); SIF_DATA_CONVERT(rtype, right->condValue, u16);
typedata->u16 = u16; typedata->u16 = u16;
param->val = &typedata->u16; param->val = &typedata->u16;
} else if (ltype == TSDB_DATA_TYPE_UTINYINT) { } else if (ltype == TSDB_DATA_TYPE_UTINYINT) {
uint8_t u8; uint8_t u8 = 0;
SIF_DATA_CONVERT(rtype, right->condValue, u8); SIF_DATA_CONVERT(rtype, right->condValue, u8);
typedata->u8 = u8; typedata->u8 = u8;
param->val = &typedata->u8; param->val = &typedata->u8;
...@@ -663,7 +663,7 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) { ...@@ -663,7 +663,7 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
// ugly code, refactor later // ugly code, refactor later
if (nParam > 1 && params[1].status == SFLT_NOT_INDEX) { if (nParam > 1 && params[1].status == SFLT_NOT_INDEX) {
output->status = SFLT_NOT_INDEX; output->status = SFLT_NOT_INDEX;
return code; goto _return;
} }
SIF_ERR_JRET(sifGetOperFn(node->opType, &operFn, &output->status)); SIF_ERR_JRET(sifGetOperFn(node->opType, &operFn, &output->status));
} }
......
...@@ -338,7 +338,7 @@ uint8_t fstStateCommInput(FstState* s, bool* null) { ...@@ -338,7 +338,7 @@ uint8_t fstStateCommInput(FstState* s, bool* null) {
return v; return v;
} }
// 0 indicate that common_input is None // 0 indicate that common_input is None
return v == 0 ? 0 : COMMON_INPUT(v); return COMMON_INPUT(v);
} }
// input_len // input_len
......
...@@ -72,7 +72,8 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of ...@@ -72,7 +72,8 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
if (offset >= ctx->file.size) return 0; if (offset >= ctx->file.size) return 0;
do { do {
char key[128] = {0}; char key[1024] = {0};
assert(strlen(ctx->file.buf) + 1 + 64 < sizeof(key));
idxGenLRUKey(key, ctx->file.buf, blkId); idxGenLRUKey(key, ctx->file.buf, blkId);
LRUHandle* h = taosLRUCacheLookup(ctx->lru, key, strlen(key)); LRUHandle* h = taosLRUCacheLookup(ctx->lru, key, strlen(key));
...@@ -99,6 +100,7 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of ...@@ -99,6 +100,7 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
assert(blk->nread <= kBlockSize); assert(blk->nread <= kBlockSize);
if (blk->nread < kBlockSize && blk->nread < len) { if (blk->nread < kBlockSize && blk->nread < len) {
taosMemoryFree(blk);
break; break;
} }
...@@ -150,7 +152,7 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int ...@@ -150,7 +152,7 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int
if (ctx->type == TFILE) { if (ctx->type == TFILE) {
// ugly code, refactor later // ugly code, refactor later
ctx->file.readOnly = readOnly; ctx->file.readOnly = readOnly;
memcpy(ctx->file.buf, path, strlen(path)); memcpy(ctx->file.buf, path, sizeof(ctx->file.buf));
if (readOnly == false) { if (readOnly == false) {
ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
taosFtruncateFile(ctx->file.pFile, 0); taosFtruncateFile(ctx->file.pFile, 0);
......
...@@ -506,7 +506,9 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c ...@@ -506,7 +506,9 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c
tfh.suid = suid; tfh.suid = suid;
tfh.version = version; tfh.version = version;
tfh.colType = colType; tfh.colType = colType;
memcpy(tfh.colName, colName, strlen(colName)); if (strlen(colName) <= sizeof(tfh.colName)) {
memcpy(tfh.colName, colName, strlen(colName));
}
return tfileWriterCreate(wcx, &tfh); return tfileWriterCreate(wcx, &tfh);
} }
...@@ -580,8 +582,14 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { ...@@ -580,8 +582,14 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
if (cap < ttsz) { if (cap < ttsz) {
cap = ttsz; cap = ttsz;
buf = (char*)taosMemoryRealloc(buf, cap); char* t = (char*)taosMemoryRealloc(buf, cap);
if (t == NULL) {
taosMemoryFree(buf);
return -1;
}
buf = t;
} }
char* p = buf; char* p = buf;
tfileSerialTableIdsToBuf(p, v->tableId); tfileSerialTableIdsToBuf(p, v->tableId);
tw->ctx->write(tw->ctx, buf, ttsz); tw->ctx->write(tw->ctx, buf, ttsz);
......
...@@ -124,8 +124,8 @@ static void optSetParentOrder(SLogicNode* pNode, EOrder order) { ...@@ -124,8 +124,8 @@ static void optSetParentOrder(SLogicNode* pNode, EOrder order) {
EDealRes scanPathOptHaveNormalColImpl(SNode* pNode, void* pContext) { EDealRes scanPathOptHaveNormalColImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) { if (QUERY_NODE_COLUMN == nodeType(pNode)) {
*((bool*)pContext) = *((bool*)pContext) =
(COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType); (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType);
return *((bool*)pContext) ? DEAL_RES_END : DEAL_RES_IGNORE_CHILD; return *((bool*)pContext) ? DEAL_RES_END : DEAL_RES_IGNORE_CHILD;
} }
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
...@@ -1520,11 +1520,15 @@ static bool partTagsHasIndefRowsSelectFunc(SNodeList* pFuncs) { ...@@ -1520,11 +1520,15 @@ static bool partTagsHasIndefRowsSelectFunc(SNodeList* pFuncs) {
return false; return false;
} }
static int32_t partTagsRewriteGroupTagsToFuncs(SNodeList* pGroupTags, SNodeList* pAggFuncs) { static int32_t partTagsRewriteGroupTagsToFuncs(SNodeList* pGroupTags, int32_t start, SNodeList* pAggFuncs) {
bool hasIndefRowsSelectFunc = partTagsHasIndefRowsSelectFunc(pAggFuncs); bool hasIndefRowsSelectFunc = partTagsHasIndefRowsSelectFunc(pAggFuncs);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t index = 0;
SNode* pNode = NULL; SNode* pNode = NULL;
FOREACH(pNode, pGroupTags) { FOREACH(pNode, pGroupTags) {
if (index++ < start) {
continue;
}
if (hasIndefRowsSelectFunc) { if (hasIndefRowsSelectFunc) {
code = nodesListStrictAppend(pAggFuncs, partTagsCreateWrapperFunc("_select_value", pNode)); code = nodesListStrictAppend(pAggFuncs, partTagsCreateWrapperFunc("_select_value", pNode));
} else { } else {
...@@ -1559,20 +1563,35 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub ...@@ -1559,20 +1563,35 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
} }
} else { } else {
SAggLogicNode* pAgg = (SAggLogicNode*)pNode; SAggLogicNode* pAgg = (SAggLogicNode*)pNode;
int32_t start = -1;
SNode* pGroupKey = NULL; SNode* pGroupKey = NULL;
FOREACH(pGroupKey, pAgg->pGroupKeys) { FOREACH(pGroupKey, pAgg->pGroupKeys) {
code = nodesListMakeStrictAppend( SNode* pGroupExpr = nodesListGetNode(((SGroupingSetNode*)pGroupKey)->pParameterList, 0);
&pScan->pGroupTags, nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pGroupKey)->pParameterList, 0))); if (NULL != pScan->pGroupTags) {
SNode* pGroupTag = NULL;
FOREACH(pGroupTag, pScan->pGroupTags) {
if (nodesEqualNode(pGroupTag, pGroupExpr)) {
continue;
}
}
}
if (start < 0) {
start = LIST_LENGTH(pScan->pGroupTags);
}
code = nodesListMakeStrictAppend(&pScan->pGroupTags, nodesCloneNode(pGroupExpr));
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
break; break;
} }
} }
NODES_DESTORY_LIST(pAgg->pGroupKeys); NODES_DESTORY_LIST(pAgg->pGroupKeys);
code = partTagsRewriteGroupTagsToFuncs(pScan->pGroupTags, pAgg->pAggFuncs); if (TSDB_CODE_SUCCESS == code && start >= 0) {
code = partTagsRewriteGroupTagsToFuncs(pScan->pGroupTags, start, pAgg->pAggFuncs);
}
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = partTagsOptRebuildTbanme(pScan->pGroupTags); code = partTagsOptRebuildTbanme(pScan->pGroupTags);
} }
pCxt->optimized = true;
return code; return code;
} }
......
...@@ -61,6 +61,8 @@ TEST_F(PlanPartitionByTest, withGroupBy) { ...@@ -61,6 +61,8 @@ TEST_F(PlanPartitionByTest, withGroupBy) {
run("SELECT COUNT(*) FROM t1 PARTITION BY c1 GROUP BY c2"); run("SELECT COUNT(*) FROM t1 PARTITION BY c1 GROUP BY c2");
run("SELECT TBNAME, c1 FROM st1 PARTITION BY TBNAME GROUP BY c1"); run("SELECT TBNAME, c1 FROM st1 PARTITION BY TBNAME GROUP BY c1");
run("SELECT COUNT(*) FROM t1 PARTITION BY TBNAME GROUP BY TBNAME");
} }
TEST_F(PlanPartitionByTest, withTimeLineFunc) { TEST_F(PlanPartitionByTest, withTimeLineFunc) {
......
...@@ -96,6 +96,19 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { ...@@ -96,6 +96,19 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
syncNodeCandidate2Leader(pSyncNode); syncNodeCandidate2Leader(pSyncNode);
pSyncNode->pVotesGranted->toLeader = true; pSyncNode->pVotesGranted->toLeader = true;
return ret; return ret;
}
if (pSyncNode->replicaNum == 1) {
// only myself, to leader
voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);
pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
syncNodeCandidate2Leader(pSyncNode);
pSyncNode->pVotesGranted->toLeader = true;
return ret;
} }
switch (pSyncNode->pRaftCfg->snapshotStrategy) { switch (pSyncNode->pRaftCfg->snapshotStrategy) {
......
...@@ -1891,8 +1891,26 @@ inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) { ...@@ -1891,8 +1891,26 @@ inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
return b1; return b1;
} }
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
if (pOldCfg->replicaNum != pNewCfg->replicaNum) return true;
if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
for (int32_t i = 0; i < pOldCfg->replicaNum; ++i) {
const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
}
return false;
}
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) { void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg; SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
sInfo("vgId:1, sync not reconfig since not changed");
return;
}
pSyncNode->pRaftCfg->cfg = *pNewConfig; pSyncNode->pRaftCfg->cfg = *pNewConfig;
pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex; pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;
...@@ -2264,7 +2282,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { ...@@ -2264,7 +2282,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted)); //ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));
syncNodeBecomeLeader(pSyncNode, "candidate to leader"); syncNodeBecomeLeader(pSyncNode, "candidate to leader");
syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode); syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode);
......
...@@ -146,7 +146,7 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const ...@@ -146,7 +146,7 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const
if (nread < 0) { if (nread < 0) {
uError("http-report recv error:%s", uv_err_name(nread)); uError("http-report recv error:%s", uv_err_name(nread));
} else { } else {
uTrace("http-report succ to recv %d bytes", nread); uTrace("http-report succ to recv %d bytes", (int32_t)nread);
} }
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
} }
......
...@@ -54,11 +54,7 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -54,11 +54,7 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->retry = pInit->rfp; pRpc->retry = pInit->rfp;
pRpc->startTimer = pInit->tfp; pRpc->startTimer = pInit->tfp;
if (pInit->connType == TAOS_CONN_SERVER) { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
} else {
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
}
uint32_t ip = 0; uint32_t ip = 0;
if (pInit->connType == TAOS_CONN_SERVER) { if (pInit->connType == TAOS_CONN_SERVER) {
...@@ -79,7 +75,7 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -79,7 +75,7 @@ void* rpcOpen(const SRpcInit* pInit) {
} }
pRpc->parent = pInit->parent; pRpc->parent = pInit->parent;
if (pInit->user) { if (pInit->user) {
memcpy(pRpc->user, pInit->user, strlen(pInit->user)); memcpy(pRpc->user, pInit->user, TSDB_UNI_LEN);
} }
int64_t refId = transAddExHandle(transGetInstMgt(), pRpc); int64_t refId = transAddExHandle(transGetInstMgt(), pRpc);
......
...@@ -267,11 +267,12 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { ...@@ -267,11 +267,12 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
#define EPSET_GET_SIZE(epSet) (epSet)->numOfEps #define EPSET_GET_SIZE(epSet) (epSet)->numOfEps
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn) #define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port) #define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
#define EPSET_FORWARD_INUSE(epSet) \ #define EPSET_FORWARD_INUSE(epSet) \
do { \ do { \
if ((epSet)->numOfEps != 0) { \ if ((epSet)->numOfEps != 0) { \
(epSet)->inUse = (++((epSet)->inUse)) % ((epSet)->numOfEps); \ ++((epSet)->inUse); \
} \ (epSet)->inUse = ((epSet)->inUse) % ((epSet)->numOfEps); \
} \
} while (0) } while (0)
#define EPSET_DEBUG_STR(epSet, tbuf) \ #define EPSET_DEBUG_STR(epSet, tbuf) \
...@@ -503,6 +504,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { ...@@ -503,6 +504,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
SConnList list = {0}; SConnList list = {0};
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
plist = taosHashGet((SHashObj*)pool, key, strlen(key)); plist = taosHashGet((SHashObj*)pool, key, strlen(key));
if (plist == NULL) return NULL;
QUEUE_INIT(&plist->conns); QUEUE_INIT(&plist->conns);
} }
...@@ -1157,7 +1159,7 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -1157,7 +1159,7 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
SCliObj* cli = taosMemoryCalloc(1, sizeof(SCliObj)); SCliObj* cli = taosMemoryCalloc(1, sizeof(SCliObj));
STrans* pTransInst = shandle; STrans* pTransInst = shandle;
memcpy(cli->label, label, strlen(label)); memcpy(cli->label, label, TSDB_LABEL_LEN);
cli->numOfThreads = numOfThreads; cli->numOfThreads = numOfThreads;
cli->pThreadObj = (SCliThrd**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrd*)); cli->pThreadObj = (SCliThrd**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrd*));
...@@ -1611,8 +1613,8 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { ...@@ -1611,8 +1613,8 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
SCvtAddr cvtAddr = {0}; SCvtAddr cvtAddr = {0};
if (ip != NULL && fqdn != NULL) { if (ip != NULL && fqdn != NULL) {
memcpy(cvtAddr.ip, ip, strlen(ip)); if (strlen(ip) <= sizeof(cvtAddr.ip)) memcpy(cvtAddr.ip, ip, strlen(ip));
memcpy(cvtAddr.fqdn, fqdn, strlen(fqdn)); if (strlen(fqdn) <= sizeof(cvtAddr.fqdn)) memcpy(cvtAddr.fqdn, fqdn, strlen(fqdn));
cvtAddr.cvt = true; cvtAddr.cvt = true;
} }
for (int i = 0; i < pTransInst->numOfThreads; i++) { for (int i = 0; i < pTransInst->numOfThreads; i++) {
......
...@@ -590,7 +590,9 @@ TdSocketPtr taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t ...@@ -590,7 +590,9 @@ TdSocketPtr taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t
taosCloseSocket(&pSocket); taosCloseSocket(&pSocket);
return NULL; return NULL;
} else { } else {
taosKeepTcpAlive(pSocket); if (taosKeepTcpAlive(pSocket) == -1) {
return NULL;
}
} }
return pSocket; return pSocket;
...@@ -1059,18 +1061,22 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) { ...@@ -1059,18 +1061,22 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) {
} }
#if defined(WINDOWS) #if defined(WINDOWS)
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&timeout, sizeof(timeout))) { if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&timeout, sizeof(timeout))) {
taosCloseSocketNoCheck1(fd);
return -1; return -1;
} }
#elif defined(_TD_DARWIN_64) #elif defined(_TD_DARWIN_64)
uint32_t conn_timeout_ms = timeout * 1000; uint32_t conn_timeout_ms = timeout * 1000;
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_CONNECTIONTIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) { if (0 != setsockopt(fd, IPPROTO_TCP, TCP_CONNECTIONTIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {
taosCloseSocketNoCheck1(fd);
return -1; return -1;
} }
#else // Linux like systems #else // Linux like systems
uint32_t conn_timeout_ms = timeout * 1000; uint32_t conn_timeout_ms = timeout * 1000;
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) { if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {
taosCloseSocketNoCheck1(fd);
return -1; return -1;
} }
#endif #endif
return (int)fd; return (int)fd;
} }
...@@ -38,6 +38,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab ...@@ -38,6 +38,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab
if (pSched->queue == NULL) { if (pSched->queue == NULL) {
uError("%s: no enough memory for queue", label); uError("%s: no enough memory for queue", label);
taosCleanUpScheduler(pSched); taosCleanUpScheduler(pSched);
taosMemoryFree(pSched);
return NULL; return NULL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册