提交 68128bc2 编写于 作者: H Hongze Cheng

Merge branch 'main' of https://github.com/taosdata/TDengine into fix/td-21029

...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# taosadapter # taosadapter
ExternalProject_Add(taosadapter ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG 0dfad5b GIT_TAG 566540d
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
......
...@@ -211,6 +211,8 @@ int32_t catalogGetCachedSTableMeta(SCatalog* pCtg, const SName* pTableName, STab ...@@ -211,6 +211,8 @@ int32_t catalogGetCachedSTableMeta(SCatalog* pCtg, const SName* pTableName, STab
int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists); int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists);
int32_t catalogGetCachedTableVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta);
/** /**
* Force refresh DB's local cached vgroup info. * Force refresh DB's local cached vgroup info.
* @param pCtg (input, got with catalogGetHandle) * @param pCtg (input, got with catalogGetHandle)
......
...@@ -497,27 +497,27 @@ function install_service_on_systemd() { ...@@ -497,27 +497,27 @@ function install_service_on_systemd() {
taosd_service_config="${service_config_dir}/${serverName}.service" taosd_service_config="${service_config_dir}/${serverName}.service"
${csudo}bash -c "echo [Unit] >> ${taosd_service_config}" ${csudo}bash -c "echo '[Unit]' >> ${taosd_service_config}"
${csudo}bash -c "echo Description=${productName} server service >> ${taosd_service_config}" ${csudo}bash -c "echo 'Description=${productName} server service' >> ${taosd_service_config}"
${csudo}bash -c "echo After=network-online.target >> ${taosd_service_config}" ${csudo}bash -c "echo 'After=network-online.target' >> ${taosd_service_config}"
${csudo}bash -c "echo Wants=network-online.target >> ${taosd_service_config}" ${csudo}bash -c "echo 'Wants=network-online.target' >> ${taosd_service_config}"
${csudo}bash -c "echo >> ${taosd_service_config}" ${csudo}bash -c "echo >> ${taosd_service_config}"
${csudo}bash -c "echo [Service] >> ${taosd_service_config}" ${csudo}bash -c "echo '[Service]' >> ${taosd_service_config}"
${csudo}bash -c "echo Type=simple >> ${taosd_service_config}" ${csudo}bash -c "echo 'Type=simple' >> ${taosd_service_config}"
${csudo}bash -c "echo ExecStart=/usr/bin/${serverName} >> ${taosd_service_config}" ${csudo}bash -c "echo 'ExecStart=/usr/bin/${serverName}' >> ${taosd_service_config}"
${csudo}bash -c "echo ExecStartPre=${installDir}/bin/startPre.sh >> ${taosd_service_config}" ${csudo}bash -c "echo 'ExecStartPre=${installDir}/bin/startPre.sh' >> ${taosd_service_config}"
${csudo}bash -c "echo TimeoutStopSec=1000000s >> ${taosd_service_config}" ${csudo}bash -c "echo 'TimeoutStopSec=1000000s' >> ${taosd_service_config}"
${csudo}bash -c "echo LimitNOFILE=infinity >> ${taosd_service_config}" ${csudo}bash -c "echo 'LimitNOFILE=infinity' >> ${taosd_service_config}"
${csudo}bash -c "echo LimitNPROC=infinity >> ${taosd_service_config}" ${csudo}bash -c "echo 'LimitNPROC=infinity' >> ${taosd_service_config}"
${csudo}bash -c "echo LimitCORE=infinity >> ${taosd_service_config}" ${csudo}bash -c "echo 'LimitCORE=infinity' >> ${taosd_service_config}"
${csudo}bash -c "echo TimeoutStartSec=0 >> ${taosd_service_config}" ${csudo}bash -c "echo 'TimeoutStartSec=0' >> ${taosd_service_config}"
${csudo}bash -c "echo StandardOutput=null >> ${taosd_service_config}" ${csudo}bash -c "echo 'StandardOutput=null' >> ${taosd_service_config}"
${csudo}bash -c "echo Restart=always >> ${taosd_service_config}" ${csudo}bash -c "echo 'Restart=always' >> ${taosd_service_config}"
${csudo}bash -c "echo StartLimitBurst=3 >> ${taosd_service_config}" ${csudo}bash -c "echo 'StartLimitBurst=3' >> ${taosd_service_config}"
${csudo}bash -c "echo StartLimitInterval=60s >> ${taosd_service_config}" ${csudo}bash -c "echo 'StartLimitInterval=60s' >> ${taosd_service_config}"
${csudo}bash -c "echo >> ${taosd_service_config}" ${csudo}bash -c "echo >> ${taosd_service_config}"
${csudo}bash -c "echo [Install] >> ${taosd_service_config}" ${csudo}bash -c "echo '[Install]' >> ${taosd_service_config}"
${csudo}bash -c "echo WantedBy=multi-user.target >> ${taosd_service_config}" ${csudo}bash -c "echo 'WantedBy=multi-user.target' >> ${taosd_service_config}"
${csudo}systemctl enable ${serverName} ${csudo}systemctl enable ${serverName}
} }
......
...@@ -187,7 +187,7 @@ if [[ $productName == "TDengine" ]]; then ...@@ -187,7 +187,7 @@ if [[ $productName == "TDengine" ]]; then
git clone --depth 1 https://github.com/taosdata/taos-connector-dotnet ${install_dir}/connector/dotnet git clone --depth 1 https://github.com/taosdata/taos-connector-dotnet ${install_dir}/connector/dotnet
rm -rf ${install_dir}/connector/dotnet/.git ||: rm -rf ${install_dir}/connector/dotnet/.git ||:
# cp -r ${connector_dir}/nodejs ${install_dir}/connector # cp -r ${connector_dir}/nodejs ${install_dir}/connector
git clone --depth 1 https://github.com/taosdata/libtaos-rs ${install_dir}/connector/rust git clone --depth 1 https://github.com/taosdata/taos-connector-rust ${install_dir}/connector/rust
rm -rf ${install_dir}/connector/rust/.git ||: rm -rf ${install_dir}/connector/rust/.git ||:
fi fi
fi fi
......
...@@ -318,7 +318,7 @@ if [ "$verMode" == "cluster" ] || [ "$verMode" == "cloud" ]; then ...@@ -318,7 +318,7 @@ if [ "$verMode" == "cluster" ] || [ "$verMode" == "cloud" ]; then
git clone --depth 1 https://github.com/taosdata/taos-connector-dotnet ${install_dir}/connector/dotnet git clone --depth 1 https://github.com/taosdata/taos-connector-dotnet ${install_dir}/connector/dotnet
rm -rf ${install_dir}/connector/dotnet/.git ||: rm -rf ${install_dir}/connector/dotnet/.git ||:
git clone --depth 1 https://github.com/taosdata/libtaos-rs ${install_dir}/connector/rust git clone --depth 1 https://github.com/taosdata/taos-connector-rust ${install_dir}/connector/rust
rm -rf ${install_dir}/connector/rust/.git ||: rm -rf ${install_dir}/connector/rust/.git ||:
# cp -r ${connector_dir}/python ${install_dir}/connector # cp -r ${connector_dir}/python ${install_dir}/connector
......
...@@ -21,12 +21,12 @@ ...@@ -21,12 +21,12 @@
#include "os.h" #include "os.h"
#include "query.h" #include "query.h"
#include "scheduler.h" #include "scheduler.h"
#include "tdatablock.h"
#include "tglobal.h" #include "tglobal.h"
#include "tmsg.h" #include "tmsg.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "version.h" #include "version.h"
#include "tdatablock.h"
#define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0 #define TSC_VAR_RELEASED 0
...@@ -178,6 +178,8 @@ void taos_free_result(TAOS_RES *res) { ...@@ -178,6 +178,8 @@ void taos_free_result(TAOS_RES *res) {
return; return;
} }
tscDebug("taos free res %p", res);
if (TD_RES_QUERY(res)) { if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res; SRequestObj *pRequest = (SRequestObj *)res;
tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId); tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId);
...@@ -796,10 +798,11 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c ...@@ -796,10 +798,11 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c
SQuery *pQuery = pRequest->pQuery; SQuery *pQuery = pRequest->pQuery;
pRequest->metric.ctgEnd = taosGetTimestampUs(); pRequest->metric.ctgEnd = taosGetTimestampUs();
qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId, tstrerror(code)); qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId,
tstrerror(code));
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pWrapper->pCatalogReq->forceUpdate = false; //pWrapper->pCatalogReq->forceUpdate = false;
code = qContinueParseSql(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery); code = qContinueParseSql(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery);
} }
...@@ -880,6 +883,11 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { ...@@ -880,6 +883,11 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) { if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) {
code = pRequest->prevCode; code = pRequest->prevCode;
terrno = code;
pRequest->code = code;
tscDebug("call sync query cb with code: %s", tstrerror(code));
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
return;
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -930,6 +938,17 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { ...@@ -930,6 +938,17 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
pRequest->requestId); pRequest->requestId);
destorySqlCallbackWrapper(pWrapper); destorySqlCallbackWrapper(pWrapper);
qDestroyQuery(pRequest->pQuery);
pRequest->pQuery = NULL;
if (NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
pRequest->prevCode = code;
doAsyncQuery(pRequest, true);
return;
}
terrno = code; terrno = code;
pRequest->code = code; pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
......
...@@ -1215,6 +1215,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1215,6 +1215,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
tscDebug("consumer:%" PRId64 ", put poll res into mqueue %p", tmq->consumerId, pRspWrapper);
taosWriteQitem(tmq->mqueue, pRspWrapper); taosWriteQitem(tmq->mqueue, pRspWrapper);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
...@@ -1664,6 +1666,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1664,6 +1666,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} }
} }
tscDebug("consumer:%" PRId64 " handle rsp %p", tmq->consumerId, rspWrapper);
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
taosFreeQitem(rspWrapper); taosFreeQitem(rspWrapper);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
......
...@@ -1946,9 +1946,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) ...@@ -1946,9 +1946,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t len = 0; int32_t len = 0;
len += snprintf(dumpBuf + len, size - len, len += snprintf(dumpBuf + len, size - len,
"===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 "===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
"|rows:%d|version:%" PRIu64 "\n", "|rows:%d|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "\n",
flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version); pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version, pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey);
if (len >= size - 1) return dumpBuf; if (len >= size - 1) return dumpBuf;
for (int32_t j = 0; j < rows; j++) { for (int32_t j = 0; j < rows; j++) {
......
...@@ -507,7 +507,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup ...@@ -507,7 +507,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup
for (int32_t v = 0; v < pVgroup->replica; ++v) { for (int32_t v = 0; v < pVgroup->replica; ++v) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
SDnodeObj *pDnode = taosArrayGet(pArray, v); SDnodeObj *pDnode = taosArrayGet(pArray, v);
if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) { if (pDnode == NULL || pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
return -1; return -1;
} }
...@@ -891,7 +891,7 @@ static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgro ...@@ -891,7 +891,7 @@ static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgro
} }
if (used) continue; if (used) continue;
if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) { if (pDnode == NULL || pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
return -1; return -1;
} }
......
...@@ -953,6 +953,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -953,6 +953,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
code = tsdbReadDelIdx(pDelFReader, pDelIdxArray); code = tsdbReadDelIdx(pDelFReader, pDelIdxArray);
if (code) { if (code) {
taosArrayDestroy(pDelIdxArray);
tsdbDelFReaderClose(&pDelFReader); tsdbDelFReaderClose(&pDelFReader);
goto _err; goto _err;
} }
...@@ -961,6 +962,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -961,6 +962,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
code = getTableDelSkyline(pMem, pIMem, pDelFReader, delIdx, pIter->pSkyline); code = getTableDelSkyline(pMem, pIMem, pDelFReader, delIdx, pIter->pSkyline);
if (code) { if (code) {
taosArrayDestroy(pDelIdxArray);
tsdbDelFReaderClose(&pDelFReader); tsdbDelFReaderClose(&pDelFReader);
goto _err; goto _err;
} }
......
...@@ -798,6 +798,9 @@ SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch); ...@@ -798,6 +798,9 @@ SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch);
int32_t ctgdGetOneHandle(SCatalog **pHandle); int32_t ctgdGetOneHandle(SCatalog **pHandle);
int ctgVgInfoComp(const void* lp, const void* rp); int ctgVgInfoComp(const void* lp, const void* rp);
int32_t ctgMakeVgArray(SDBVgInfo* dbInfo); int32_t ctgMakeVgArray(SDBVgInfo* dbInfo);
int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb);
int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName);
void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache);
extern SCatalogMgmt gCtgMgmt; extern SCatalogMgmt gCtgMgmt;
extern SCtgDebug gCTGDebug; extern SCtgDebug gCTGDebug;
......
...@@ -551,6 +551,35 @@ _return: ...@@ -551,6 +551,35 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgGetCachedTbVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) {
int32_t code = 0;
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL;
CTG_ERR_RET(ctgAcquireVgMetaFromCache(pCtg, db, pTableName->tname, &dbCache, &tbCache));
if (NULL == dbCache || NULL == tbCache) {
*pTableMeta = NULL;
return TSDB_CODE_SUCCESS;
}
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, pVgroup));
SCtgTbMetaCtx ctx = {0};
ctx.pName = (SName*)pTableName;
ctx.flag = CTG_FLAG_UNKNOWN_STB;
CTG_ERR_JRET(ctgCopyTbMeta(pCtg, &ctx, &dbCache, &tbCache, pTableMeta, db));
_return:
ctgReleaseVgMetaToCache(pCtg, dbCache, tbCache);
CTG_RET(code);
}
int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName) { int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName) {
int32_t code = 0; int32_t code = 0;
...@@ -1118,6 +1147,13 @@ int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, ...@@ -1118,6 +1147,13 @@ int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName,
CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, NULL, pTableName, pVgroup, exists)); CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, NULL, pTableName, pVgroup, exists));
} }
int32_t catalogGetCachedTableVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) {
CTG_API_ENTER();
CTG_API_LEAVE(ctgGetCachedTbVgMeta(pCtg, pTableName, pVgroup, pTableMeta));
}
#if 0 #if 0
int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SCatalogReq* pReq, SMetaData* pRsp) { int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SCatalogReq* pReq, SMetaData* pRsp) {
CTG_API_ENTER(); CTG_API_ENTER();
......
...@@ -130,7 +130,7 @@ void ctgReleaseVgInfoToCache(SCatalog *pCtg, SCtgDBCache *dbCache) { ...@@ -130,7 +130,7 @@ void ctgReleaseVgInfoToCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
} }
void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) { void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
if (pCache) { if (pCache && dbCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock); CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache); taosHashRelease(dbCache->tbCache, pCache);
} }
...@@ -151,6 +151,18 @@ void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache ...@@ -151,6 +151,18 @@ void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache
} }
} }
void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
if (pCache && dbCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
}
if (dbCache) {
ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
}
int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) { int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache); ctgAcquireDBCache(pCtg, dbFName, &dbCache);
...@@ -226,6 +238,75 @@ _return: ...@@ -226,6 +238,75 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL;
bool vgInCache = false;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
ctgDebug("db %s not in cache", dbFName);
CTG_CACHE_STAT_INC(numOfVgMiss, 1);
goto _return;
}
ctgRLockVgInfo(pCtg, dbCache, &vgInCache);
if (!vgInCache) {
ctgDebug("vgInfo of db %s not in cache", dbFName);
CTG_CACHE_STAT_INC(numOfVgMiss, 1);
goto _return;
}
*pDb = dbCache;
CTG_CACHE_STAT_INC(numOfVgHit, 1);
ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);
tbCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
if (NULL == tbCache) {
ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
goto _return;
}
CTG_LOCK(CTG_READ, &tbCache->metaLock);
if (NULL == tbCache->pMeta) {
ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName);
CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
goto _return;
}
*pTb = tbCache;
ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);
CTG_CACHE_STAT_INC(numOfMetaHit, 1);
return TSDB_CODE_SUCCESS;
_return:
if (tbCache) {
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
taosHashRelease(dbCache->tbCache, tbCache);
}
if (vgInCache) {
ctgRUnlockVgInfo(dbCache);
}
if (dbCache) {
ctgReleaseDBCache(pCtg, dbCache);
}
*pDb = NULL;
*pTb = NULL;
return TSDB_CODE_SUCCESS;
}
/* /*
int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) { int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) {
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
...@@ -378,25 +459,9 @@ int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32 ...@@ -378,25 +459,9 @@ int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) { int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName) {
int32_t code = 0; SCtgDBCache *dbCache = *pDb;
SCtgDBCache *dbCache = NULL; SCtgTbCache *tbCache = *pTb;
SCtgTbCache *tbCache = NULL;
*pTableMeta = NULL;
char dbFName[TSDB_DB_FNAME_LEN] = {0};
if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
strcpy(dbFName, ctx->pName->dbname);
} else {
tNameGetFullDbName(ctx->pName, dbFName);
}
ctgAcquireTbMetaFromCache(pCtg, dbFName, ctx->pName->tname, &dbCache, &tbCache);
if (NULL == tbCache) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
return TSDB_CODE_SUCCESS;
}
STableMeta *tbMeta = tbCache->pMeta; STableMeta *tbMeta = tbCache->pMeta;
ctx->tbInfo.inCache = true; ctx->tbInfo.inCache = true;
ctx->tbInfo.dbId = dbCache->dbId; ctx->tbInfo.dbId = dbCache->dbId;
...@@ -407,13 +472,11 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta ** ...@@ -407,13 +472,11 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **
int32_t metaSize = CTG_META_SIZE(tbMeta); int32_t metaSize = CTG_META_SIZE(tbMeta);
*pTableMeta = taosMemoryCalloc(1, metaSize); *pTableMeta = taosMemoryCalloc(1, metaSize);
if (NULL == *pTableMeta) { if (NULL == *pTableMeta) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
memcpy(*pTableMeta, tbMeta, metaSize); memcpy(*pTableMeta, tbMeta, metaSize);
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName); ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -423,7 +486,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta ** ...@@ -423,7 +486,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **
int32_t metaSize = sizeof(SCTableMeta); int32_t metaSize = sizeof(SCTableMeta);
*pTableMeta = taosMemoryCalloc(1, metaSize); *pTableMeta = taosMemoryCalloc(1, metaSize);
if (NULL == *pTableMeta) { if (NULL == *pTableMeta) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
memcpy(*pTableMeta, tbMeta, metaSize); memcpy(*pTableMeta, tbMeta, metaSize);
...@@ -433,6 +496,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta ** ...@@ -433,6 +496,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **
if (tbCache) { if (tbCache) {
CTG_UNLOCK(CTG_READ, &tbCache->metaLock); CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
taosHashRelease(dbCache->tbCache, tbCache); taosHashRelease(dbCache->tbCache, tbCache);
*pTb = NULL;
} }
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname, ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname,
...@@ -440,28 +504,54 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta ** ...@@ -440,28 +504,54 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **
ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache); ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache);
if (NULL == tbCache) { if (NULL == tbCache) {
//ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
taosMemoryFreeClear(*pTableMeta); taosMemoryFreeClear(*pTableMeta);
*pDb = NULL;
ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid); ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
*pTb = tbCache;
STableMeta *stbMeta = tbCache->pMeta; STableMeta *stbMeta = tbCache->pMeta;
if (stbMeta->suid != ctx->tbInfo.suid) { if (stbMeta->suid != ctx->tbInfo.suid) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid); ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); taosMemoryFreeClear(*pTableMeta);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
metaSize = CTG_META_SIZE(stbMeta); metaSize = CTG_META_SIZE(stbMeta);
*pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize); *pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize);
if (NULL == *pTableMeta) { if (NULL == *pTableMeta) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta)); memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));
return TSDB_CODE_SUCCESS;
}
int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
int32_t code = 0;
SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL;
*pTableMeta = NULL;
char dbFName[TSDB_DB_FNAME_LEN] = {0};
if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
strcpy(dbFName, ctx->pName->dbname);
} else {
tNameGetFullDbName(ctx->pName, dbFName);
}
ctgAcquireTbMetaFromCache(pCtg, dbFName, ctx->pName->tname, &dbCache, &tbCache);
if (NULL == tbCache) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
return TSDB_CODE_SUCCESS;
}
CTG_ERR_JRET(ctgCopyTbMeta(pCtg, ctx, &dbCache, &tbCache, pTableMeta, dbFName));
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("Got tb %s meta from cache, dbFName:%s", ctx->pName->tname, dbFName); ctgDebug("Got tb %s meta from cache, dbFName:%s", ctx->pName->tname, dbFName);
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "ttime.h" #include "ttime.h"
#define IS_FINAL_OP(op) ((op)->isFinal) #define IS_FINAL_OP(op) ((op)->isFinal)
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
typedef struct SSessionAggOperatorInfo { typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
...@@ -56,6 +57,7 @@ typedef enum SResultTsInterpType { ...@@ -56,6 +57,7 @@ typedef enum SResultTsInterpType {
typedef struct SPullWindowInfo { typedef struct SPullWindowInfo {
STimeWindow window; STimeWindow window;
uint64_t groupId; uint64_t groupId;
STimeWindow calWin;
} SPullWindowInfo; } SPullWindowInfo;
typedef struct SOpenWindowInfo { typedef struct SOpenWindowInfo {
...@@ -793,17 +795,18 @@ int32_t comparePullWinKey(void* pKey, void* data, int32_t index) { ...@@ -793,17 +795,18 @@ int32_t comparePullWinKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data; SArray* res = (SArray*)data;
SPullWindowInfo* pos = taosArrayGet(res, index); SPullWindowInfo* pos = taosArrayGet(res, index);
SPullWindowInfo* pData = (SPullWindowInfo*)pKey; SPullWindowInfo* pData = (SPullWindowInfo*)pKey;
if (pData->window.skey == pos->window.skey) {
if (pData->groupId > pos->groupId) { if (pData->groupId > pos->groupId) {
return 1; return 1;
} else if (pData->groupId < pos->groupId) { } else if (pData->groupId < pos->groupId) {
return -1; return -1;
} }
return 0;
} else if (pData->window.skey > pos->window.skey) { if (pData->window.skey > pos->window.ekey) {
return 1; return 1;
} } else if (pData->window.ekey < pos->window.skey) {
return -1; return -1;
}
return 0;
} }
static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) { static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
...@@ -812,10 +815,16 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) { ...@@ -812,10 +815,16 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
if (index == -1) { if (index == -1) {
index = 0; index = 0;
} else { } else {
if (comparePullWinKey(pPullInfo, pPullWins, index) > 0) { int32_t code = comparePullWinKey(pPullInfo, pPullWins, index);
index++; if (code == 0) {
} else { SPullWindowInfo* pos = taosArrayGet(pPullWins ,index);
pos->window.skey = TMIN(pos->window.skey, pPullInfo->window.skey);
pos->window.ekey = TMAX(pos->window.ekey, pPullInfo->window.ekey);
pos->calWin.skey = TMIN(pos->calWin.skey, pPullInfo->calWin.skey);
pos->calWin.ekey = TMAX(pos->calWin.ekey, pPullInfo->calWin.ekey);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (code > 0 ){
index++;
} }
} }
if (taosArrayInsert(pPullWins, index, pPullInfo) == NULL) { if (taosArrayInsert(pPullWins, index, pPullInfo) == NULL) {
...@@ -2255,8 +2264,8 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB ...@@ -2255,8 +2264,8 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
colDataAppend(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false); colDataAppend(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
colDataAppend(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false); colDataAppend(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
colDataAppend(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false); colDataAppend(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false);
colDataAppend(pCalStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false); colDataAppend(pCalStartTs, pBlock->info.rows, (const char*)&pWin->calWin.skey, false);
colDataAppend(pCalEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false); colDataAppend(pCalEndTs, pBlock->info.rows, (const char*)&pWin->calWin.ekey, false);
pBlock->info.rows++; pBlock->info.rows++;
} }
if ((*pIndex) == size) { if ((*pIndex) == size) {
...@@ -2266,14 +2275,18 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB ...@@ -2266,14 +2275,18 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
} }
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) { void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval) {
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* tsData = (TSKEY*)pStartCol->pData; TSKEY* tsData = (TSKEY*)pStartCol->pData;
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* tsEndData = (TSKEY*)pEndCol->pData;
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* groupIdData = (uint64_t*)pGroupCol->pData; uint64_t* groupIdData = (uint64_t*)pGroupCol->pData;
int32_t chId = getChildIndex(pBlock); int32_t chId = getChildIndex(pBlock);
for (int32_t i = 0; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
SWinKey winRes = {.ts = tsData[i], .groupId = groupIdData[i]}; TSKEY winTs = tsData[i];
while (winTs < tsEndData[i]) {
SWinKey winRes = {.ts = winTs, .groupId = groupIdData[i]};
void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey)); void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey));
if (chIds) { if (chIds) {
SArray* chArray = *(SArray**)chIds; SArray* chArray = *(SArray**)chIds;
...@@ -2288,6 +2301,8 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) { ...@@ -2288,6 +2301,8 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
} }
} }
} }
winTs = taosTimeAdd(winTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
}
} }
} }
...@@ -2299,15 +2314,16 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) { ...@@ -2299,15 +2314,16 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) {
if (needDeleteWindowBuf(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) { if (needDeleteWindowBuf(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) {
void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey)); void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey));
if (!chIds) { if (!chIds) {
SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId}; SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
// add pull data request // add pull data request
savePullWindow(&pull, pInfo->pPullWins); if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
int32_t size1 = taosArrayGetSize(pInfo->pChildren); int32_t size1 = taosArrayGetSize(pInfo->pChildren);
addPullWindow(pInfo->pPullDataMap, winKey, size1); addPullWindow(pInfo->pPullDataMap, winKey, size1);
qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1); qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1);
} }
} }
} }
}
} }
static void clearFunctionContext(SExprSupp* pSup) { static void clearFunctionContext(SExprSupp* pSup) {
...@@ -2374,12 +2390,13 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p ...@@ -2374,12 +2390,13 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
}; };
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup) && !chIds) { if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup) && !chIds) {
SPullWindowInfo pull = {.window = nextWin, .groupId = groupId}; SPullWindowInfo pull = {.window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
// add pull data request // add pull data request
savePullWindow(&pull, pInfo->pPullWins); if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
int32_t size = taosArrayGetSize(pInfo->pChildren); int32_t size = taosArrayGetSize(pInfo->pChildren);
addPullWindow(pInfo->pPullDataMap, &winRes, size); addPullWindow(pInfo->pPullDataMap, &winRes, size);
qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size); qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size);
}
} else { } else {
int32_t index = -1; int32_t index = -1;
SArray* chArray = NULL; SArray* chArray = NULL;
...@@ -2560,7 +2577,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2560,7 +2577,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
continue; continue;
} else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) { } else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) {
processPullOver(pBlock, pInfo->pPullDataMap); processPullOver(pBlock, pInfo->pPullDataMap, &pInfo->interval);
continue; continue;
} }
...@@ -2638,6 +2655,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2638,6 +2655,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) {
if (pIntervalPhyNode->window.deleteMark <= 0) {
return DEAULT_DELETE_MARK;
}
int64_t deleteMark = TMAX(pIntervalPhyNode->window.deleteMark,pIntervalPhyNode->window.watermark);
deleteMark = TMAX(deleteMark, pIntervalPhyNode->interval);
return deleteMark;
}
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild) { SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
...@@ -2659,9 +2685,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2659,9 +2685,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
.calTrigger = pIntervalPhyNode->window.triggerType, .calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.minTs = INT64_MAX, .minTs = INT64_MAX,
// for test 315360000000 .deleteMark = getDeleteMark(pIntervalPhyNode),
.deleteMark = 1000LL * 60LL * 60LL * 24LL * 365LL * 10LL,
// .deleteMark = INT64_MAX,
.deleteMarkSaved = 0, .deleteMarkSaved = 0,
.calTriggerSaved = 0, .calTriggerSaved = 0,
}; };
...@@ -4805,7 +4829,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4805,7 +4829,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
.calTrigger = pIntervalPhyNode->window.triggerType, .calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.minTs = INT64_MAX, .minTs = INT64_MAX,
.deleteMark = INT64_MAX, .deleteMark = getDeleteMark(pIntervalPhyNode),
}; };
ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY); ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
......
...@@ -563,7 +563,8 @@ static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt ...@@ -563,7 +563,8 @@ static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt
static void buildCreateTbReq(SVnodeModifOpStmt* pStmt, STag* pTag, SArray* pTagName) { static void buildCreateTbReq(SVnodeModifOpStmt* pStmt, STag* pTag, SArray* pTagName) {
insBuildCreateTbReq(&pStmt->createTblReq, pStmt->targetTableName.tname, pTag, pStmt->pTableMeta->suid, insBuildCreateTbReq(&pStmt->createTblReq, pStmt->targetTableName.tname, pTag, pStmt->pTableMeta->suid,
pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL); pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags,
TSDB_DEFAULT_TABLE_TTL);
} }
static int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf) { static int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
...@@ -829,6 +830,44 @@ static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, boo ...@@ -829,6 +830,44 @@ static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, boo
return code; return code;
} }
static int32_t getTableMetaAndVgroupImpl(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool* pMissCache) {
SVgroupInfo vg;
int32_t code = catalogGetCachedTableVgMeta(pCxt->pCatalog, &pStmt->targetTableName, &vg, &pStmt->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
if (NULL != pStmt->pTableMeta) {
code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
}
*pMissCache = (NULL == pStmt->pTableMeta);
}
return code;
}
static int32_t getTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool* pMissCache) {
SParseContext* pComCxt = pCxt->pComCxt;
int32_t code = TSDB_CODE_SUCCESS;
if (pComCxt->async) {
code = getTableMetaAndVgroupImpl(pComCxt, pStmt, pMissCache);
} else {
code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, pMissCache);
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
}
}
return code;
}
static int32_t collectUseTable(const SName* pName, SHashObj* pTable) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
return taosHashPut(pTable, fullName, strlen(fullName), pName, sizeof(SName));
}
static int32_t collectUseDatabase(const SName* pName, SHashObj* pDbs) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pName, dbFName);
return taosHashPut(pDbs, dbFName, strlen(dbFName), dbFName, sizeof(dbFName));
}
static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
if (pCxt->forceUpdate) { if (pCxt->forceUpdate) {
pCxt->missCache = true; pCxt->missCache = true;
...@@ -836,12 +875,24 @@ static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt ...@@ -836,12 +875,24 @@ static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt
} }
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache); int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
#if 0
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache); code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache);
} }
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache); code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
} }
#else
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
code = getTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache);
}
#endif
if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) {
code = collectUseDatabase(&pStmt->targetTableName, pStmt->pDbFNameHashObj);
if (TSDB_CODE_SUCCESS == code) {
code = collectUseTable(&pStmt->targetTableName, pStmt->pTableNameHashObj);
}
}
return code; return code;
} }
...@@ -862,6 +913,12 @@ static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* ...@@ -862,6 +913,12 @@ static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt*
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
code = getTableVgroup(pCxt->pComCxt, pStmt, true, &pCxt->missCache); code = getTableVgroup(pCxt->pComCxt, pStmt, true, &pCxt->missCache);
} }
if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) {
code = collectUseDatabase(&pStmt->usingTableName, pStmt->pDbFNameHashObj);
if (TSDB_CODE_SUCCESS == code) {
code = collectUseTable(&pStmt->usingTableName, pStmt->pTableNameHashObj);
}
}
return code; return code;
} }
...@@ -934,10 +991,9 @@ static int32_t getTableDataBlocks(SInsertParseContext* pCxt, SVnodeModifOpStmt* ...@@ -934,10 +991,9 @@ static int32_t getTableDataBlocks(SInsertParseContext* pCxt, SVnodeModifOpStmt*
pStmt->pTableMeta->uid = 0; pStmt->pTableMeta->uid = 0;
} }
return insGetDataBlockFromList(pStmt->pTableBlockHashObj, &uid, sizeof(pStmt->pTableMeta->uid), return insGetDataBlockFromList(
TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), pStmt->pTableBlockHashObj, &uid, sizeof(pStmt->pTableMeta->uid), TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk),
getTableInfo(pStmt->pTableMeta).rowSize, pStmt->pTableMeta, pDataBuf, NULL, getTableInfo(pStmt->pTableMeta).rowSize, pStmt->pTableMeta, pDataBuf, NULL, &pStmt->createTblReq);
&pStmt->createTblReq);
} }
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&pStmt->targetTableName, tbFName); tNameExtractFullName(&pStmt->targetTableName, tbFName);
...@@ -1540,8 +1596,9 @@ static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) ...@@ -1540,8 +1596,9 @@ static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt)
memcpy(tags, &pCxt->tags, sizeof(pCxt->tags)); memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
SStmtCallback* pStmtCb = pCxt->pComCxt->pStmtCb; SStmtCallback* pStmtCb = pCxt->pComCxt->pStmtCb;
int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, &pStmt->targetTableName, pStmt->usingTableProcessing, int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, &pStmt->targetTableName,
pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj, pStmt->usingTableName.tname); pStmt->usingTableProcessing, pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj,
pStmt->usingTableName.tname);
memset(&pCxt->tags, 0, sizeof(pCxt->tags)); memset(&pCxt->tags, 0, sizeof(pCxt->tags));
pStmt->pVgroupsHashObj = NULL; pStmt->pVgroupsHashObj = NULL;
...@@ -1776,17 +1833,26 @@ static int32_t initInsertQuery(SInsertParseContext* pCxt, SCatalogReq* pCatalogR ...@@ -1776,17 +1833,26 @@ static int32_t initInsertQuery(SInsertParseContext* pCxt, SCatalogReq* pCatalogR
static int32_t setRefreshMate(SQuery* pQuery) { static int32_t setRefreshMate(SQuery* pQuery) {
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot; SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot;
if (taosHashGetSize(pStmt->pTableNameHashObj) > 0) {
taosArrayDestroy(pQuery->pTableList);
pQuery->pTableList = taosArrayInit(taosHashGetSize(pStmt->pTableNameHashObj), sizeof(SName));
SName* pTable = taosHashIterate(pStmt->pTableNameHashObj, NULL); SName* pTable = taosHashIterate(pStmt->pTableNameHashObj, NULL);
while (NULL != pTable) { while (NULL != pTable) {
taosArrayPush(pQuery->pTableList, pTable); taosArrayPush(pQuery->pTableList, pTable);
pTable = taosHashIterate(pStmt->pTableNameHashObj, pTable); pTable = taosHashIterate(pStmt->pTableNameHashObj, pTable);
} }
}
if (taosHashGetSize(pStmt->pDbFNameHashObj) > 0) {
taosArrayDestroy(pQuery->pDbList);
pQuery->pDbList = taosArrayInit(taosHashGetSize(pStmt->pDbFNameHashObj), TSDB_DB_FNAME_LEN);
char* pDb = taosHashIterate(pStmt->pDbFNameHashObj, NULL); char* pDb = taosHashIterate(pStmt->pDbFNameHashObj, NULL);
while (NULL != pDb) { while (NULL != pDb) {
taosArrayPush(pQuery->pDbList, pDb); taosArrayPush(pQuery->pDbList, pDb);
pDb = taosHashIterate(pStmt->pDbFNameHashObj, pDb); pDb = taosHashIterate(pStmt->pDbFNameHashObj, pDb);
} }
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1899,29 +1965,28 @@ static int32_t buildInsertCatalogReq(SInsertParseContext* pCxt, SVnodeModifOpStm ...@@ -1899,29 +1965,28 @@ static int32_t buildInsertCatalogReq(SInsertParseContext* pCxt, SVnodeModifOpStm
} }
static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCatalogReq* pCatalogReq) { static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCatalogReq* pCatalogReq) {
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot;
if (pCxt->missCache) { if (pCxt->missCache) {
parserDebug("0x%" PRIx64 " %d rows have been inserted before cache miss", pCxt->pComCxt->requestId, parserDebug("0x%" PRIx64 " %d rows of %d tables have been inserted before cache miss", pCxt->pComCxt->requestId,
((SVnodeModifOpStmt*)pQuery->pRoot)->totalRowsNum); pStmt->totalRowsNum, pStmt->totalTbNum);
pQuery->execStage = QUERY_EXEC_STAGE_PARSE; pQuery->execStage = QUERY_EXEC_STAGE_PARSE;
return buildInsertCatalogReq(pCxt, (SVnodeModifOpStmt*)pQuery->pRoot, pCatalogReq); return buildInsertCatalogReq(pCxt, pStmt, pCatalogReq);
} }
parserDebug("0x%" PRIx64 " %d rows have been inserted", pCxt->pComCxt->requestId, parserDebug("0x%" PRIx64 " %d rows of %d tables have been inserted", pCxt->pComCxt->requestId, pStmt->totalRowsNum,
((SVnodeModifOpStmt*)pQuery->pRoot)->totalRowsNum); pStmt->totalTbNum);
pQuery->execStage = QUERY_EXEC_STAGE_SCHEDULE; pQuery->execStage = QUERY_EXEC_STAGE_SCHEDULE;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq, const SMetaData* pMetaData) { int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq, const SMetaData* pMetaData) {
SInsertParseContext context = { SInsertParseContext context = {.pComCxt = pCxt,
.pComCxt = pCxt,
.msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen}, .msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen},
.missCache = false, .missCache = false,
.usingDuplicateTable = false, .usingDuplicateTable = false,
.forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false) .forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false)};
};
int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery); int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
......
...@@ -248,6 +248,13 @@ int32_t __catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableNam ...@@ -248,6 +248,13 @@ int32_t __catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableNam
return code; return code;
} }
int32_t __catalogGetCachedTableVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) {
int32_t code = g_mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta, true);
if (code) return code;
code = g_mockCatalogService->catalogGetTableHashVgroup(pTableName, pVgroup, true);
return code;
}
int32_t __catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, int32_t __catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
SArray** pVgList) { SArray** pVgList) {
return g_mockCatalogService->catalogGetTableDistVgInfo(pTableName, pVgList); return g_mockCatalogService->catalogGetTableDistVgInfo(pTableName, pVgList);
...@@ -316,6 +323,7 @@ void initMetaDataEnv() { ...@@ -316,6 +323,7 @@ void initMetaDataEnv() {
stub.set(catalogGetCachedSTableMeta, __catalogGetCachedTableMeta); stub.set(catalogGetCachedSTableMeta, __catalogGetCachedTableMeta);
stub.set(catalogGetTableHashVgroup, __catalogGetTableHashVgroup); stub.set(catalogGetTableHashVgroup, __catalogGetTableHashVgroup);
stub.set(catalogGetCachedTableHashVgroup, __catalogGetCachedTableHashVgroup); stub.set(catalogGetCachedTableHashVgroup, __catalogGetCachedTableHashVgroup);
stub.set(catalogGetCachedTableVgMeta, __catalogGetCachedTableVgMeta);
stub.set(catalogGetTableDistVgInfo, __catalogGetTableDistVgInfo); stub.set(catalogGetTableDistVgInfo, __catalogGetTableDistVgInfo);
stub.set(catalogGetDBVgVersion, __catalogGetDBVgVersion); stub.set(catalogGetDBVgVersion, __catalogGetDBVgVersion);
stub.set(catalogGetDBVgList, __catalogGetDBVgList); stub.set(catalogGetDBVgList, __catalogGetDBVgList);
......
...@@ -54,6 +54,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ...@@ -54,6 +54,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
/*ASSERT(false);*/ /*ASSERT(false);*/
qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId, qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId,
terrstr()); terrstr());
continue;
} }
if (output == NULL) { if (output == NULL) {
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
......
...@@ -720,14 +720,16 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage ...@@ -720,14 +720,16 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
pgno = TDB_PAGE_PGNO(pPage); pgno = TDB_PAGE_PGNO(pPage);
tdbTrace("tdbttl init pager:%p, pgno:%d, loadPage:%d, size:%d", pPager, pgno, loadPage, pPager->dbOrigSize); tdbTrace("tdb/pager:%p, pgno:%d, loadPage:%d, size:%d", pPager, pgno, loadPage, pPager->dbOrigSize);
if (loadPage && pgno <= pPager->dbOrigSize) { if (loadPage && pgno <= pPager->dbOrigSize) {
init = 1; init = 1;
nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * (pgno - 1)); nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * (pgno - 1));
tdbTrace("tdbttl pager:%p, pgno:%d, nRead:%" PRId64, pPager, pgno, nRead); tdbTrace("tdb/pager:%p, pgno:%d, nRead:%" PRId64, pPager, pgno, nRead);
if (nRead < pPage->pageSize) { if (nRead < pPage->pageSize) {
ASSERT(0); ASSERT(0);
tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32, pPager, pgno, nRead, pPage->pageSize);
TDB_UNLOCK_PAGE(pPage);
return -1; return -1;
} }
} else { } else {
......
...@@ -285,6 +285,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { ...@@ -285,6 +285,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
} }
destroyCmsg(msg); destroyCmsg(msg);
} }
memset(&conn->ctx, 0, sizeof(conn->ctx));
} }
bool cliMaySendCachedMsg(SCliConn* conn) { bool cliMaySendCachedMsg(SCliConn* conn) {
if (!transQueueEmpty(&conn->cliMsgs)) { if (!transQueueEmpty(&conn->cliMsgs)) {
...@@ -589,6 +590,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { ...@@ -589,6 +590,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
} }
static int32_t allocConnRef(SCliConn* conn, bool update) { static int32_t allocConnRef(SCliConn* conn, bool update) {
if (update) { if (update) {
transReleaseExHandle(transGetRefMgt(), conn->refId);
transRemoveExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1; conn->refId = -1;
} }
...@@ -697,6 +699,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { ...@@ -697,6 +699,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->q); QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q); QUEUE_INIT(&conn->q);
transReleaseExHandle(transGetRefMgt(), conn->refId);
transRemoveExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1; conn->refId = -1;
...@@ -731,6 +734,7 @@ static void cliDestroy(uv_handle_t* handle) { ...@@ -731,6 +734,7 @@ static void cliDestroy(uv_handle_t* handle) {
conn->timer = NULL; conn->timer = NULL;
} }
transReleaseExHandle(transGetRefMgt(), conn->refId);
transRemoveExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
taosMemoryFree(conn->ip); taosMemoryFree(conn->ip);
taosMemoryFree(conn->stream); taosMemoryFree(conn->stream);
......
...@@ -282,6 +282,9 @@ void transCtxCleanup(STransCtx* ctx) { ...@@ -282,6 +282,9 @@ void transCtxCleanup(STransCtx* ctx) {
} }
void transCtxMerge(STransCtx* dst, STransCtx* src) { void transCtxMerge(STransCtx* dst, STransCtx* src) {
if (src->args == NULL || src->freeFunc == NULL) {
return;
}
if (dst->args == NULL) { if (dst->args == NULL) {
dst->args = src->args; dst->args = src->args;
dst->brokenVal = src->brokenVal; dst->brokenVal = src->brokenVal;
......
...@@ -500,8 +500,9 @@ int32_t taosCloseDir(TdDirPtr *ppDir) { ...@@ -500,8 +500,9 @@ int32_t taosCloseDir(TdDirPtr *ppDir) {
void taosGetCwd(char *buf, int32_t len) { void taosGetCwd(char *buf, int32_t len) {
#if !defined(WINDOWS) #if !defined(WINDOWS)
(void)getcwd(buf, len - 1); char *unused __attribute__((unused));
unused = getcwd(buf, len - 1);
#else #else
strncpy(buf, "not implemented on windows", len -1); strncpy(buf, "not implemented on windows", len - 1);
#endif #endif
} }
...@@ -1037,4 +1037,4 @@ ...@@ -1037,4 +1037,4 @@
,,n,docs-examples-test,bash node.sh ,,n,docs-examples-test,bash node.sh
,,n,docs-examples-test,bash csharp.sh ,,n,docs-examples-test,bash csharp.sh
,,n,docs-examples-test,bash jdbc.sh ,,n,docs-examples-test,bash jdbc.sh
,,n,docs-examples-test,bash go.sh #,,n,docs-examples-test,bash go.sh
...@@ -4,11 +4,11 @@ system sh/deploy.sh -n dnode2 -i 2 ...@@ -4,11 +4,11 @@ system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3 system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4 system sh/deploy.sh -n dnode4 -i 4
system sh/deploy.sh -n dnode5 -i 5 system sh/deploy.sh -n dnode5 -i 5
system sh/cfg.sh -n dnode1 -c supportVnodes -v 4 system sh/cfg.sh -n dnode1 -c supportVnodes -v 5
system sh/cfg.sh -n dnode2 -c supportVnodes -v 4 system sh/cfg.sh -n dnode2 -c supportVnodes -v 5
system sh/cfg.sh -n dnode3 -c supportVnodes -v 4 system sh/cfg.sh -n dnode3 -c supportVnodes -v 5
system sh/cfg.sh -n dnode4 -c supportVnodes -v 4 system sh/cfg.sh -n dnode4 -c supportVnodes -v 5
system sh/cfg.sh -n dnode5 -c supportVnodes -v 4 system sh/cfg.sh -n dnode5 -c supportVnodes -v 5
print ========== step1 print ========== step1
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
......
...@@ -461,7 +461,7 @@ sql insert into t2 values(1648791213004,4,10,10,4.1); ...@@ -461,7 +461,7 @@ sql insert into t2 values(1648791213004,4,10,10,4.1);
$loop_count = 0 $loop_count = 0
loop2: loop2:
sleep 100 sleep 200
$loop_count = $loop_count + 1 $loop_count = $loop_count + 1
if $loop_count == 10 then if $loop_count == 10 then
...@@ -519,7 +519,7 @@ print step 6 ...@@ -519,7 +519,7 @@ print step 6
$loop_count = 0 $loop_count = 0
loop3: loop3:
# sleep 300 sleep 300
$loop_count = $loop_count + 1 $loop_count = $loop_count + 1
if $loop_count == 10 then if $loop_count == 10 then
...@@ -618,6 +618,60 @@ if $data41 != 2 then ...@@ -618,6 +618,60 @@ if $data41 != 2 then
goto loop4 goto loop4
endi endi
sql insert into t1 values(1648791343003,4,4,4,3.1);
sql insert into t1 values(1648791213004,4,5,5,4.1);
loop5:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt3;
# row 0
if $rows != 7 then
print =====rows=$rows
goto loop5
endi
if $data01 != 4 then
print =====data01=$data01
goto loop5
endi
if $data11 != 6 then
print =====data11=$data11
goto loop5
endi
if $data21 != 4 then
print =====data21=$data21
goto loop5
endi
if $data31 != 4 then
print =====data31=$data31
goto loop5
endi
if $data41 != 2 then
print =====data41=$data41
goto loop5
endi
if $data51 != 1 then
print =====data51=$data51
goto loop5
endi
if $data61 != 1 then
print =====data61=$data61
goto loop5
endi
$loop_all = $loop_all + 1 $loop_all = $loop_all + 1
print ============loop_all=$loop_all print ============loop_all=$loop_all
......
...@@ -3,10 +3,10 @@ system sh/deploy.sh -n dnode1 -i 1 ...@@ -3,10 +3,10 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2 system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3 system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4 system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c supportVnodes -v 4 system sh/cfg.sh -n dnode1 -c supportVnodes -v 5
system sh/cfg.sh -n dnode2 -c supportVnodes -v 4 system sh/cfg.sh -n dnode2 -c supportVnodes -v 5
system sh/cfg.sh -n dnode3 -c supportVnodes -v 4 system sh/cfg.sh -n dnode3 -c supportVnodes -v 5
system sh/cfg.sh -n dnode4 -c supportVnodes -v 4 system sh/cfg.sh -n dnode4 -c supportVnodes -v 5
$dbPrefix = br1_db $dbPrefix = br1_db
$tbPrefix = br1_tb $tbPrefix = br1_tb
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册