提交 564bd505 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/develop' into feature/TD-1843

...@@ -14,10 +14,12 @@ pipeline { ...@@ -14,10 +14,12 @@ pipeline {
sh ''' sh '''
date date
cd ${WKC} cd ${WKC}
git reset --hard
git checkout develop git checkout develop
git pull git pull
git submodule update git submodule update
cd ${WK} cd ${WK}
git reset --hard
git checkout develop git checkout develop
git pull git pull
export TZ=Asia/Harbin export TZ=Asia/Harbin
...@@ -39,11 +41,13 @@ pipeline { ...@@ -39,11 +41,13 @@ pipeline {
steps { steps {
sh ''' sh '''
cd ${WKC} cd ${WKC}
git reset --hard
git checkout develop git checkout develop
git pull git pull
git submodule update git submodule update
cd ${WK} cd ${WK}
git reset --hard
git checkout develop git checkout develop
git pull git pull
export TZ=Asia/Harbin export TZ=Asia/Harbin
...@@ -65,11 +69,13 @@ pipeline { ...@@ -65,11 +69,13 @@ pipeline {
steps { steps {
sh ''' sh '''
cd ${WKC} cd ${WKC}
git reset --hard
git checkout develop git checkout develop
git pull git pull
git submodule update git submodule update
cd ${WK} cd ${WK}
git reset --hard
git checkout develop git checkout develop
git pull git pull
export TZ=Asia/Harbin export TZ=Asia/Harbin
...@@ -108,11 +114,13 @@ pipeline { ...@@ -108,11 +114,13 @@ pipeline {
steps { steps {
sh ''' sh '''
cd ${WKC} cd ${WKC}
git reset --hard
git checkout develop git checkout develop
git pull git pull
git submodule update git submodule update
cd ${WK} cd ${WK}
git reset --hard
git checkout develop git checkout develop
git pull git pull
export TZ=Asia/Harbin export TZ=Asia/Harbin
......
...@@ -333,7 +333,7 @@ typedef struct STscObj { ...@@ -333,7 +333,7 @@ typedef struct STscObj {
char superAuth : 1; char superAuth : 1;
uint32_t connId; uint32_t connId;
uint64_t rid; // ref ID returned by taosAddRef uint64_t rid; // ref ID returned by taosAddRef
struct SSqlObj * pHb; int64_t hbrid;
struct SSqlObj * sqlList; struct SSqlObj * sqlList;
struct SSqlStream *streamList; struct SSqlStream *streamList;
void* pDnodeConn; void* pDnodeConn;
...@@ -373,7 +373,7 @@ typedef struct SSqlObj { ...@@ -373,7 +373,7 @@ typedef struct SSqlObj {
struct SSqlObj **pSubs; struct SSqlObj **pSubs;
struct SSqlObj *prev, *next; struct SSqlObj *prev, *next;
struct SSqlObj **self; int64_t self;
} SSqlObj; } SSqlObj;
typedef struct SSqlStream { typedef struct SSqlStream {
...@@ -507,7 +507,7 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField ...@@ -507,7 +507,7 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
} }
extern SCacheObj* tscMetaCache; extern SCacheObj* tscMetaCache;
extern SCacheObj* tscObjCache; extern int tscObjRef;
extern void * tscTmr; extern void * tscTmr;
extern void * tscQhandle; extern void * tscQhandle;
extern int tscKeepConn[]; extern int tscKeepConn[];
......
...@@ -825,8 +825,11 @@ static int32_t tscProcessClientVer(SSqlObj *pSql) { ...@@ -825,8 +825,11 @@ static int32_t tscProcessClientVer(SSqlObj *pSql) {
static int32_t tscProcessServStatus(SSqlObj *pSql) { static int32_t tscProcessServStatus(SSqlObj *pSql) {
STscObj* pObj = pSql->pTscObj; STscObj* pObj = pSql->pTscObj;
if (pObj->pHb != NULL) { SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid);
if (pObj->pHb->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { if (pHb != NULL) {
int32_t code = pHb->res.code;
taosReleaseRef(tscObjRef, pObj->hbrid);
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
return pSql->res.code; return pSql->res.code;
} }
......
...@@ -175,10 +175,10 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -175,10 +175,10 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId)); if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
} }
} else { } else {
tscDebug("%p heartbeat failed, code:%s", pObj->pHb, tstrerror(code)); tscDebug("%" PRId64 " heartbeat failed, code:%s", pObj->hbrid, tstrerror(code));
} }
if (pObj->pHb != NULL) { if (pObj->hbrid != 0) {
int32_t waitingDuring = tsShellActivityTimer * 500; int32_t waitingDuring = tsShellActivityTimer * 500;
tscDebug("%p send heartbeat in %dms", pSql, waitingDuring); tscDebug("%p send heartbeat in %dms", pSql, waitingDuring);
...@@ -193,20 +193,12 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -193,20 +193,12 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
STscObj *pObj = taosAcquireRef(tscRefId, rid); STscObj *pObj = taosAcquireRef(tscRefId, rid);
if (pObj == NULL) return; if (pObj == NULL) return;
SSqlObj* pHB = pObj->pHb; SSqlObj* pHB = taosAcquireRef(tscObjRef, pObj->hbrid);
assert(pHB->self == pObj->hbrid);
void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE));
if (p == NULL) {
tscWarn("%p HB object has been released already", pHB);
taosReleaseRef(tscRefId, pObj->rid);
return;
}
assert(*pHB->self == pHB);
pHB->retry = 0; pHB->retry = 0;
int32_t code = tscProcessSql(pHB); int32_t code = tscProcessSql(pHB);
taosCacheRelease(tscObjCache, (void**) &p, false); taosReleaseRef(tscObjRef, pObj->hbrid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
...@@ -236,7 +228,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -236,7 +228,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.msgType = pSql->cmd.msgType, .msgType = pSql->cmd.msgType,
.pCont = pMsg, .pCont = pMsg,
.contLen = pSql->cmd.payloadLen, .contLen = pSql->cmd.payloadLen,
.ahandle = pSql, .ahandle = (void*)pSql->self,
.handle = NULL, .handle = NULL,
.code = 0 .code = 0
}; };
...@@ -247,26 +239,24 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -247,26 +239,24 @@ int tscSendMsgToServer(SSqlObj *pSql) {
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle; TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(TSDB_CACHE_PTR_TYPE)); SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
if (p == NULL) { if (pSql == NULL) {
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return; return;
} }
assert(pSql->self == handle);
SSqlObj* pSql = *p;
assert(pSql != NULL);
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
assert(*pSql->self == pSql);
pSql->rpcRid = -1; pSql->rpcRid = -1;
if (pObj->signature != pObj) { if (pObj->signature != pObj) {
tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature); tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
taosCacheRelease(tscObjCache, (void**) &p, true); taosRemoveRef(tscObjRef, pSql->self);
taosReleaseRef(tscObjRef, pSql->self);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return; return;
} }
...@@ -276,10 +266,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -276,10 +266,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature); pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
void** p1 = p; taosRemoveRef(tscObjRef, pSql->self);
taosCacheRelease(tscObjCache, (void**) &p1, false); taosReleaseRef(tscObjRef, pSql->self);
taosCacheRelease(tscObjCache, (void**) &p, true);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return; return;
} }
...@@ -322,7 +310,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -322,7 +310,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
// if there is an error occurring, proceed to the following error handling procedure. // if there is an error occurring, proceed to the following error handling procedure.
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosCacheRelease(tscObjCache, (void**) &p, false); taosReleaseRef(tscObjRef, pSql->self);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return; return;
} }
...@@ -390,11 +378,10 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -390,11 +378,10 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
(*pSql->fp)(pSql->param, pSql, rpcMsg->code); (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
} }
void** p1 = p; taosReleaseRef(tscObjRef, pSql->self);
taosCacheRelease(tscObjCache, (void**) &p1, false);
if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
taosCacheRelease(tscObjCache, (void **)&p, true); taosRemoveRef(tscObjRef, pSql->self);
tscDebug("%p sqlObj is automatically freed", pSql); tscDebug("%p sqlObj is automatically freed", pSql);
} }
...@@ -2020,7 +2007,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { ...@@ -2020,7 +2007,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
// TODO multithread problem // TODO multithread problem
static void createHBObj(STscObj* pObj) { static void createHBObj(STscObj* pObj) {
if (pObj->pHb != NULL) { if (pObj->hbrid != 0) {
return; return;
} }
...@@ -2052,7 +2039,7 @@ static void createHBObj(STscObj* pObj) { ...@@ -2052,7 +2039,7 @@ static void createHBObj(STscObj* pObj) {
registerSqlObj(pSql); registerSqlObj(pSql);
tscDebug("%p HB is allocated, pObj:%p", pSql, pObj); tscDebug("%p HB is allocated, pObj:%p", pSql, pObj);
pObj->pHb = pSql; pObj->hbrid = pSql->self;
} }
int tscProcessConnectRsp(SSqlObj *pSql) { int tscProcessConnectRsp(SSqlObj *pSql) {
......
...@@ -276,8 +276,8 @@ void taos_close(TAOS *taos) { ...@@ -276,8 +276,8 @@ void taos_close(TAOS *taos) {
pObj->signature = NULL; pObj->signature = NULL;
taosTmrStopA(&(pObj->pTimer)); taosTmrStopA(&(pObj->pTimer));
SSqlObj* pHb = pObj->pHb; SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid);
if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) { if (pHb != NULL) {
if (pHb->rpcRid > 0) { // wait for rsp from dnode if (pHb->rpcRid > 0) { // wait for rsp from dnode
rpcCancelRequest(pHb->rpcRid); rpcCancelRequest(pHb->rpcRid);
pHb->rpcRid = -1; pHb->rpcRid = -1;
...@@ -285,6 +285,7 @@ void taos_close(TAOS *taos) { ...@@ -285,6 +285,7 @@ void taos_close(TAOS *taos) {
tscDebug("%p HB is freed", pHb); tscDebug("%p HB is freed", pHb);
taos_free_result(pHb); taos_free_result(pHb);
taosReleaseRef(tscObjRef, pHb->self);
} }
int32_t ref = T_REF_DEC(pObj); int32_t ref = T_REF_DEC(pObj);
...@@ -606,8 +607,7 @@ void taos_free_result(TAOS_RES *res) { ...@@ -606,8 +607,7 @@ void taos_free_result(TAOS_RES *res) {
bool freeNow = tscKillQueryInDnode(pSql); bool freeNow = tscKillQueryInDnode(pSql);
if (freeNow) { if (freeNow) {
tscDebug("%p free sqlObj in cache", pSql); tscDebug("%p free sqlObj in cache", pSql);
SSqlObj** p = pSql->self; taosReleaseRef(tscObjRef, pSql->self);
taosCacheRelease(tscObjCache, (void**) &p, true);
} }
} }
...@@ -700,13 +700,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -700,13 +700,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
continue; continue;
} }
void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE)); SSqlObj* pSubObj = pSub;
if (p == NULL) {
continue;
}
SSqlObj* pSubObj = (SSqlObj*) (*p);
assert(pSubObj->self == (SSqlObj**) p);
pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
if (pSubObj->rpcRid > 0) { if (pSubObj->rpcRid > 0) {
...@@ -715,7 +709,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -715,7 +709,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
} }
tscQueueAsyncRes(pSubObj); tscQueueAsyncRes(pSubObj);
taosCacheRelease(tscObjCache, (void**) &p, false); taosReleaseRef(tscObjRef, pSubObj->self);
} }
tscDebug("%p super table query cancelled", pSql); tscDebug("%p super table query cancelled", pSql);
......
...@@ -179,8 +179,8 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* ...@@ -179,8 +179,8 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
fail: fail:
tscError("tscCreateSubscription failed at line %d, reason: %s", line, tstrerror(code)); tscError("tscCreateSubscription failed at line %d, reason: %s", line, tstrerror(code));
if (pSql != NULL) { if (pSql != NULL) {
if (pSql->self != NULL) { if (pSql->self != 0) {
taos_free_result(pSql); taosReleaseRef(tscObjRef, pSql->self);
} else { } else {
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
} }
......
...@@ -2198,6 +2198,9 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql) { ...@@ -2198,6 +2198,9 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql) {
STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index); STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index);
int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);
// free the data block created from insert sql string
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
if ((pRes->code = code)!= TSDB_CODE_SUCCESS) { if ((pRes->code = code)!= TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
return code; // here the pSql may have been released already. return code; // here the pSql may have been released already.
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tcache.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "tsystem.h" #include "tsystem.h"
#include "ttimer.h" #include "ttimer.h"
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
// global, not configurable // global, not configurable
SCacheObj* tscMetaCache; SCacheObj* tscMetaCache;
SCacheObj* tscObjCache; int tscObjRef = -1;
void * tscTmr; void * tscTmr;
void * tscQhandle; void * tscQhandle;
void * tscCheckDiskUsageTmr; void * tscCheckDiskUsageTmr;
...@@ -144,7 +144,7 @@ void taos_init_imp(void) { ...@@ -144,7 +144,7 @@ void taos_init_imp(void) {
int64_t refreshTime = 10; // 10 seconds by default int64_t refreshTime = 10; // 10 seconds by default
if (tscMetaCache == NULL) { if (tscMetaCache == NULL) {
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta"); tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta");
tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj"); tscObjRef = taosOpenRef(4096, tscFreeRegisteredSqlObj);
} }
tscRefId = taosOpenRef(200, tscCloseTscObj); tscRefId = taosOpenRef(200, tscCloseTscObj);
...@@ -167,9 +167,9 @@ void taos_cleanup(void) { ...@@ -167,9 +167,9 @@ void taos_cleanup(void) {
taosCacheCleanup(m); taosCacheCleanup(m);
} }
m = tscObjCache; int refId = atomic_exchange_32(&tscObjRef, -1);
if (m != NULL && atomic_val_compare_exchange_ptr(&tscObjCache, m, 0) == m) { if (refId != -1) {
taosCacheCleanup(m); taosCloseRef(refId);
} }
m = tscQhandle; m = tscQhandle;
......
...@@ -447,20 +447,18 @@ static void tscFreeSubobj(SSqlObj* pSql) { ...@@ -447,20 +447,18 @@ static void tscFreeSubobj(SSqlObj* pSql) {
void tscFreeRegisteredSqlObj(void *pSql) { void tscFreeRegisteredSqlObj(void *pSql) {
assert(pSql != NULL); assert(pSql != NULL);
SSqlObj** p = (SSqlObj**)pSql; SSqlObj* p = *(SSqlObj**)pSql;
STscObj* pTscObj = (*p)->pTscObj; STscObj* pTscObj = p->pTscObj;
assert((*p)->self != 0 && (*p)->self == (p)); assert(p->self != 0);
tscFreeSqlObj(p);
SSqlObj* ptr = *p;
tscFreeSqlObj(*p);
int32_t ref = T_REF_DEC(pTscObj); int32_t ref = T_REF_DEC(pTscObj);
assert(ref >= 0); assert(ref >= 0);
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", ptr, pTscObj, ref); tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", p, pTscObj, ref);
if (ref == 0) { if (ref == 0) {
tscDebug("%p all sqlObj freed, free tscObj:%p", ptr, pTscObj); tscDebug("%p all sqlObj freed, free tscObj:%p", p, pTscObj);
taosRemoveRef(tscRefId, pTscObj->rid); taosRemoveRef(tscRefId, pTscObj->rid);
} }
} }
...@@ -840,7 +838,6 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { ...@@ -840,7 +838,6 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
// the length does not include the SSubmitBlk structure // the length does not include the SSubmitBlk structure
pBlocks->dataLen = htonl(finalLen); pBlocks->dataLen = htonl(finalLen);
dataBuf->numOfTables += 1; dataBuf->numOfTables += 1;
} }
...@@ -1565,19 +1562,6 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo) { ...@@ -1565,19 +1562,6 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo) {
} }
} }
void tscSetFreeHeatBeat(STscObj* pObj) {
if (pObj == NULL || pObj->signature != pObj || pObj->pHb == NULL) {
return;
}
SSqlObj* pHeatBeat = pObj->pHb;
assert(pHeatBeat == pHeatBeat->signature);
// to denote the heart-beat timer close connection and free all allocated resources
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHeatBeat->cmd, 0);
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
}
/* /*
* the following four kinds of SqlObj should not be freed * the following four kinds of SqlObj should not be freed
* 1. SqlObj for stream computing * 1. SqlObj for stream computing
...@@ -1596,7 +1580,7 @@ bool tscShouldBeFreed(SSqlObj* pSql) { ...@@ -1596,7 +1580,7 @@ bool tscShouldBeFreed(SSqlObj* pSql) {
} }
STscObj* pTscObj = pSql->pTscObj; STscObj* pTscObj = pSql->pTscObj;
if (pSql->pStream != NULL || pTscObj->pHb == pSql || pSql->pSubscription != NULL) { if (pSql->pStream != NULL || pTscObj->hbrid == pSql->self || pSql->pSubscription != NULL) {
return false; return false;
} }
...@@ -1888,13 +1872,10 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { ...@@ -1888,13 +1872,10 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
} }
void registerSqlObj(SSqlObj* pSql) { void registerSqlObj(SSqlObj* pSql) {
int32_t DEFAULT_LIFE_TIME = 2 * 600 * 1000; // 1200 sec
int32_t ref = T_REF_INC(pSql->pTscObj); int32_t ref = T_REF_INC(pSql->pTscObj);
tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref); tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref);
TSDB_CACHE_PTR_TYPE p = (TSDB_CACHE_PTR_TYPE) pSql; pSql->self = taosAddRef(tscObjRef, pSql);
pSql->self = taosCachePut(tscObjCache, &p, sizeof(TSDB_CACHE_PTR_TYPE), &p, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_LIFE_TIME);
} }
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) { SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) {
......
...@@ -40,15 +40,14 @@ ...@@ -40,15 +40,14 @@
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int32_t master;
int32_t num; // number of continuous streams
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN]; char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_NAME_LEN]; char db[TSDB_DB_NAME_LEN];
FCqWrite cqWrite; FCqWrite cqWrite;
void *ahandle;
int32_t num; // number of continuous streams
struct SCqObj *pHead; struct SCqObj *pHead;
void *dbConn; void *dbConn;
int32_t master;
void *tmrCtrl; void *tmrCtrl;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SCqContext; } SCqContext;
...@@ -90,7 +89,6 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { ...@@ -90,7 +89,6 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
tstrncpy(pContext->db, db, sizeof(pContext->db)); tstrncpy(pContext->db, db, sizeof(pContext->db));
pContext->vgId = pCfg->vgId; pContext->vgId = pCfg->vgId;
pContext->cqWrite = pCfg->cqWrite; pContext->cqWrite = pCfg->cqWrite;
pContext->ahandle = ahandle;
tscEmbedded = 1; tscEmbedded = 1;
pthread_mutex_init(&pContext->mutex, NULL); pthread_mutex_init(&pContext->mutex, NULL);
...@@ -342,7 +340,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { ...@@ -342,7 +340,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
pHead->version = 0; pHead->version = 0;
// write into vnode write queue // write into vnode write queue
pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ, NULL); pContext->cqWrite(pContext->vgId, pHead, TAOS_QTYPE_CQ, NULL);
free(buffer); free(buffer);
} }
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
int64_t ver = 0; int64_t ver = 0;
void *pCq = NULL; void *pCq = NULL;
int writeToQueue(void *pVnode, void *data, int type, void *pMsg) { int writeToQueue(int32_t vgId, void *data, int type, void *pMsg) {
return 0; return 0;
} }
......
...@@ -257,7 +257,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf ...@@ -257,7 +257,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
#define TSDB_MAX_SQL_SHOW_LEN 512 #define TSDB_MAX_SQL_SHOW_LEN 512
#define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 8mb #define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024U) // sql length should be less than 1mb
#define TSDB_APPNAME_LEN TSDB_UNI_LEN #define TSDB_APPNAME_LEN TSDB_UNI_LEN
......
...@@ -21,7 +21,7 @@ extern "C" { ...@@ -21,7 +21,7 @@ extern "C" {
#include "tdataformat.h" #include "tdataformat.h"
typedef int32_t (*FCqWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg); typedef int32_t (*FCqWrite)(int32_t vgId, void *pHead, int32_t qtype, void *pMsg);
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
......
...@@ -630,8 +630,16 @@ static void rpcReleaseConn(SRpcConn *pConn) { ...@@ -630,8 +630,16 @@ static void rpcReleaseConn(SRpcConn *pConn) {
} else { } else {
// if there is an outgoing message, free it // if there is an outgoing message, free it
if (pConn->outType && pConn->pReqMsg) { if (pConn->outType && pConn->pReqMsg) {
if (pConn->pContext) pConn->pContext->pConn = NULL; SRpcReqContext *pContext = pConn->pContext;
taosRemoveRef(tsRpcRefId, pConn->pContext->rid); if (pContext->pRsp) {
// for synchronous API, post semaphore to unblock app
pContext->pRsp->code = TSDB_CODE_RPC_APP_ERROR;
pContext->pRsp->pCont = NULL;
pContext->pRsp->contLen = 0;
tsem_post(pContext->pSem);
}
pContext->pConn = NULL;
taosRemoveRef(tsRpcRefId, pContext->rid);
} }
} }
......
...@@ -917,6 +917,8 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int ...@@ -917,6 +917,8 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst); ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst);
} }
ASSERT((blkIdx == pIdx->numOfBlocks -1) || (!pCompBlock->last));
tsdbDebug("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, tsdbDebug("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
blkIdx); blkIdx);
...@@ -1042,6 +1044,8 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int ...@@ -1042,6 +1044,8 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast; pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast;
pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last; pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last;
ASSERT((blkIdx == pIdx->numOfBlocks-1) || (!pCompBlock->last));
tsdbDebug("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, tsdbDebug("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
blkIdx); blkIdx);
...@@ -1622,11 +1626,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1622,11 +1626,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
pCfg->update); pCfg->update);
if (pDataCols->numOfRows == 0) break; if (pDataCols->numOfRows == 0) break;
if (tblkIdx == pIdx->numOfBlocks - 1) { if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1;
if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1;
} else {
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1;
}
if (round == 0) { if (round == 0) {
if (oBlock.last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; if (oBlock.last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
......
...@@ -560,37 +560,6 @@ void taosTmrCleanUp(void* handle) { ...@@ -560,37 +560,6 @@ void taosTmrCleanUp(void* handle) {
tmrDebug("%s timer controller is cleaned up.", ctrl->label); tmrDebug("%s timer controller is cleaned up.", ctrl->label);
ctrl->label[0] = 0; ctrl->label[0] = 0;
// cancel all timers of this controller
for (size_t i = 0; i < timerMap.size; i++) {
timer_list_t* list = timerMap.slots + i;
lockTimerList(list);
tmr_obj_t* t = list->timers;
tmr_obj_t* prev = NULL;
while (t != NULL) {
tmr_obj_t* next = t->mnext;
if (t->ctrl != ctrl) {
prev = t;
t = next;
continue;
}
uint8_t state = atomic_val_compare_exchange_8(&t->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
if (state == TIMER_STATE_WAITING) {
removeFromWheel(t);
}
timerDecRef(t);
if (prev == NULL) {
list->timers = next;
} else {
prev->mnext = next;
}
t = next;
}
unlockTimerList(list);
}
pthread_mutex_lock(&tmrCtrlMutex); pthread_mutex_lock(&tmrCtrlMutex);
ctrl->next = unusedTmrCtrl; ctrl->next = unusedTmrCtrl;
numOfTmrCtrl--; numOfTmrCtrl--;
......
...@@ -272,7 +272,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -272,7 +272,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
strcpy(cqCfg.pass, tsInternalPass); strcpy(cqCfg.pass, tsInternalPass);
strcpy(cqCfg.db, pVnode->db); strcpy(cqCfg.db, pVnode->db);
cqCfg.vgId = vnode; cqCfg.vgId = vnode;
cqCfg.cqWrite = vnodeWriteToWQueue; cqCfg.cqWrite = vnodeWriteToCache;
pVnode->cq = cqOpen(pVnode, &cqCfg); pVnode->cq = cqOpen(pVnode, &cqCfg);
if (pVnode->cq == NULL) { if (pVnode->cq == NULL) {
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
......
...@@ -18,7 +18,7 @@ import time ...@@ -18,7 +18,7 @@ import time
import argparse import argparse
class RestfulInsert: class RestfulInsert:
def __init__(self, host, startTimestamp, dbname, threads, tables, records, batchSize, tbNamePerfix, outOfOrder): def __init__(self, host, startTimestamp, dbname, threads, tables, records, batchSize, tbNamePerfix, outOfOrder,tablePerbatch):
self.header = {'Authorization': 'Basic cm9vdDp0YW9zZGF0YQ=='} self.header = {'Authorization': 'Basic cm9vdDp0YW9zZGF0YQ=='}
self.url = "http://%s:6041/rest/sql" % host self.url = "http://%s:6041/rest/sql" % host
self.ts = startTimestamp self.ts = startTimestamp
...@@ -29,12 +29,15 @@ class RestfulInsert: ...@@ -29,12 +29,15 @@ class RestfulInsert:
self.batchSize = batchSize self.batchSize = batchSize
self.tableNamePerfix = tbNamePerfix self.tableNamePerfix = tbNamePerfix
self.outOfOrder = outOfOrder self.outOfOrder = outOfOrder
self.tablePerbatch = tablePerbatch
def createTable(self, threadID): def createTable(self, threadID):
tablesPerThread = int (self.numOfTables / self.numOfThreads) tablesPerThread = int (self.numOfTables / self.numOfThreads)
print("create table %d to %d" % (tablesPerThread * threadID, tablesPerThread * (threadID + 1) - 1)) loop = tablesPerThread if threadID != self.numOfThreads - 1 else self.numOfTables - tablesPerThread * threadID
for i in range(tablesPerThread): print("create table %d to %d" % (tablesPerThread * threadID, tablesPerThread * threadID + loop - 1))
for i in range(loop):
tableID = threadID * tablesPerThread tableID = threadID * tablesPerThread
if tableID + i >= self.numOfTables : break
name = 'beijing' if tableID % 2 == 0 else 'shanghai' name = 'beijing' if tableID % 2 == 0 else 'shanghai'
data = "create table if not exists %s.%s%d using %s.meters tags(%d, '%s')" % (self.dbname, self.tableNamePerfix, tableID + i, self.dbname, tableID + i, name) data = "create table if not exists %s.%s%d using %s.meters tags(%d, '%s')" % (self.dbname, self.tableNamePerfix, tableID + i, self.dbname, tableID + i, name)
response = requests.post(self.url, data, headers = self.header) response = requests.post(self.url, data, headers = self.header)
...@@ -55,6 +58,58 @@ class RestfulInsert: ...@@ -55,6 +58,58 @@ class RestfulInsert:
response = requests.post(self.url, data, headers = self.header) response = requests.post(self.url, data, headers = self.header)
if response.status_code != 200: if response.status_code != 200:
print(response.content) print(response.content)
def insertnData(self, threadID):
print("thread %d started" % threadID)
tablesPerThread = int (self.numOfTables / self.numOfThreads)
loop = int(self.recordsPerTable / self.batchSize)
if self.tablePerbatch == 1 :
for i in range(tablesPerThread+1):
tableID = i + threadID * tablesPerThread
if tableID >= self.numOfTables: return
start = self.ts
start1=time.time()
for k in range(loop):
data = "insert into %s.%s%d values" % (self.dbname, self.tableNamePerfix, tableID)
values = []
bloop = self.batchSize if k != loop - 1 else self.recordsPerTable - self.batchSize * k
for l in range(bloop):
values.append("(%d, %d, %d, %d)" % (start + k * self.batchSize + l, random.randint(1, 100), random.randint(1, 100), random.randint(1, 100)))
if len(data) > 1048576 :
print ('batch size is larger than 1M')
exit(-1)
if self.outOfOrder :
random.shuffle(values)
data+=''.join(values)
response = requests.post(self.url, data, headers = self.header)
if response.status_code != 200:
print(response.content)
print('----------------',loop,time.time()-start1)
else:
for i in range(0,tablesPerThread+self.tablePerbatch,self.tablePerbatch):
for k in range(loop):
data = "insert into "
for j in range(self.tablePerbatch):
tableID = i + threadID * tablesPerThread+j
if tableID >= self.numOfTables: return
start = self.ts
data += "%s.%s%d values" % (self.dbname, self.tableNamePerfix, tableID)
values = []
bloop = self.batchSize if k != loop - 1 else self.recordsPerTable - self.batchSize * k
for l in range(bloop):
values.append("(%d, %d, %d, %d)" % (start + k * self.batchSize + l, random.randint(1, 100), random.randint(1, 100), random.randint(1, 100)))
if self.outOfOrder :
random.shuffle(values)
data+=''.join(values)
print('------------------',len(data))
if len(data) > 1024*1024 :
print ('batch size is larger than 1M')
exit(-1)
response = requests.post(self.url, data, headers = self.header)
if response.status_code != 200:
print(response.content)
def insertUnlimitedData(self, threadID): def insertUnlimitedData(self, threadID):
print("thread %d started" % threadID) print("thread %d started" % threadID)
...@@ -85,7 +140,7 @@ class RestfulInsert: ...@@ -85,7 +140,7 @@ class RestfulInsert:
if response.status_code != 200: if response.status_code != 200:
print(response.content) print(response.content)
def run(self): def run(self):
data = "create database if not exists %s" % self.dbname data = "create database if not exists %s" % self.dbname
requests.post(self.url, data, headers = self.header) requests.post(self.url, data, headers = self.header)
data = "create table if not exists %s.meters(ts timestamp, f1 int, f2 int, f3 int) tags(id int, loc nchar(20))" % self.dbname data = "create table if not exists %s.meters(ts timestamp, f1 int, f2 int, f3 int) tags(id int, loc nchar(20))" % self.dbname
...@@ -178,7 +233,18 @@ parser.add_argument( ...@@ -178,7 +233,18 @@ parser.add_argument(
'--out-of-order', '--out-of-order',
action='store_true', action='store_true',
help='The order of test data (default: False)') help='The order of test data (default: False)')
parser.add_argument(
'-b',
'--table-per-batch',
action='store',
default=1,
type=int,
help='the table per batch (default: 1)')
args = parser.parse_args() args = parser.parse_args()
ri = RestfulInsert(args.host_name, args.start_timestamp, args.db_name, args.number_of_threads, args.number_of_tables, args.number_of_records, args.batch_size, args.table_name_prefix, args.out_of_order) ri = RestfulInsert(
args.host_name, args.start_timestamp, args.db_name, args.number_of_threads, args.number_of_tables,
args.number_of_records, args.batch_size, args.table_name_prefix, args.out_of_order, args.table_per_batch)
ri.run() ri.run()
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
print("==========step1")
print("create table && insert data")
tdSql.execute("create table mt0 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool,c8 binary(20),c9 nchar(20))")
insertRows = 1000
t0 = 1604298064000
tdLog.info("insert %d rows" % (insertRows))
for i in range(insertRows):
ret = tdSql.execute(
"insert into mt0 values (%d , %d,%d,%d,%d,%d,%d,%d,'%s','%s')" %
(t0+i,i%100,i/2,i%41,i%100,i%100,i*1.0,i%2,'taos'+str(i%100),'涛思'+str(i%100)))
print("==========step2")
print("test last with group by normal_col ")
tdSql.query('select last(c1) from mt0 group by c3')
tdSql.checkData(0,0,84)
tdSql.checkData(0,1,85)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
...@@ -38,12 +38,12 @@ class TDTestCase: ...@@ -38,12 +38,12 @@ class TDTestCase:
print("test col*1*1 desc ") print("test col*1*1 desc ")
tdSql.query('select c1,c1*1*1,c2*1*1,c3*1*1,c4*1*1,c5*1*1,c6*1*1 from mt0 order by ts desc limit 2') tdSql.query('select c1,c1*1*1,c2*1*1,c3*1*1,c4*1*1,c5*1*1,c6*1*1 from mt0 order by ts desc limit 2')
tdSql.checkData(0,0,99) tdSql.checkData(0,0,99)
tdSql.checkData(0,1,0.0) tdSql.checkData(0,1,99.0)
tdSql.checkData(0,2,0.0) tdSql.checkData(0,2,499.0)
tdSql.checkData(0,3,0.0) tdSql.checkData(0,3,99.0)
tdSql.checkData(0,4,0.0) tdSql.checkData(0,4,99.0)
tdSql.checkData(0,5,0.0) tdSql.checkData(0,5,99.0)
tdSql.checkData(0,6,0.0) tdSql.checkData(0,6,999.0)
def stop(self): def stop(self):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册