提交 b00fb8c1 编写于 作者: dengyihao's avatar dengyihao

reduce client mem cost

上级 5a207951
...@@ -331,7 +331,7 @@ typedef struct STscObj { ...@@ -331,7 +331,7 @@ typedef struct STscObj {
char superAuth : 1; char superAuth : 1;
uint32_t connId; uint32_t connId;
uint64_t rid; // ref ID returned by taosAddRef uint64_t rid; // ref ID returned by taosAddRef
struct SSqlObj * pHb; int64_t hbrid;
struct SSqlObj * sqlList; struct SSqlObj * sqlList;
struct SSqlStream *streamList; struct SSqlStream *streamList;
void* pDnodeConn; void* pDnodeConn;
...@@ -371,7 +371,7 @@ typedef struct SSqlObj { ...@@ -371,7 +371,7 @@ typedef struct SSqlObj {
struct SSqlObj **pSubs; struct SSqlObj **pSubs;
struct SSqlObj *prev, *next; struct SSqlObj *prev, *next;
struct SSqlObj **self; int64_t self;
} SSqlObj; } SSqlObj;
typedef struct SSqlStream { typedef struct SSqlStream {
...@@ -504,7 +504,7 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField ...@@ -504,7 +504,7 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
} }
extern SCacheObj* tscMetaCache; extern SCacheObj* tscMetaCache;
extern SCacheObj* tscObjCache; extern int tscObjRef;
extern void * tscTmr; extern void * tscTmr;
extern void * tscQhandle; extern void * tscQhandle;
extern int tscKeepConn[]; extern int tscKeepConn[];
......
...@@ -814,8 +814,11 @@ static int32_t tscProcessClientVer(SSqlObj *pSql) { ...@@ -814,8 +814,11 @@ static int32_t tscProcessClientVer(SSqlObj *pSql) {
static int32_t tscProcessServStatus(SSqlObj *pSql) { static int32_t tscProcessServStatus(SSqlObj *pSql) {
STscObj* pObj = pSql->pTscObj; STscObj* pObj = pSql->pTscObj;
if (pObj->pHb != NULL) { SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid);
if (pObj->pHb->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { if (pHb != NULL) {
int32_t code = pHb->res.code;
taosReleaseRef(tscObjRef, pObj->hbrid);
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
return pSql->res.code; return pSql->res.code;
} }
......
...@@ -175,10 +175,10 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -175,10 +175,10 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId)); if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
} }
} else { } else {
tscDebug("%p heartbeat failed, code:%s", pObj->pHb, tstrerror(code)); tscDebug("%" PRId64 " heartbeat failed, code:%s", pObj->hbrid, tstrerror(code));
} }
if (pObj->pHb != NULL) { if (pObj->hbrid != 0) {
int32_t waitingDuring = tsShellActivityTimer * 500; int32_t waitingDuring = tsShellActivityTimer * 500;
tscDebug("%p send heartbeat in %dms", pSql, waitingDuring); tscDebug("%p send heartbeat in %dms", pSql, waitingDuring);
...@@ -193,21 +193,11 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -193,21 +193,11 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
STscObj *pObj = taosAcquireRef(tscRefId, rid); STscObj *pObj = taosAcquireRef(tscRefId, rid);
if (pObj == NULL) return; if (pObj == NULL) return;
SSqlObj* pHB = pObj->pHb; SSqlObj* pHB = taosAcquireRef(tscObjRef, pObj->hbrid);
assert(pHB->self == pObj->hbrid);
void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE));
if (p == NULL) {
tscWarn("%p HB object has been released already", pHB);
taosReleaseRef(tscRefId, pObj->rid);
return;
}
assert(*pHB->self == pHB);
pHB->retry = 0; pHB->retry = 0;
int32_t code = tscProcessSql(pHB); int32_t code = tscProcessSql(pHB);
taosCacheRelease(tscObjCache, (void**) &p, false); taosReleaseRef(tscObjRef, pObj->hbrid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
} }
...@@ -236,7 +226,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -236,7 +226,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.msgType = pSql->cmd.msgType, .msgType = pSql->cmd.msgType,
.pCont = pMsg, .pCont = pMsg,
.contLen = pSql->cmd.payloadLen, .contLen = pSql->cmd.payloadLen,
.ahandle = pSql, .ahandle = (void *)pSql->self,
.handle = NULL, .handle = NULL,
.code = 0 .code = 0
}; };
...@@ -251,26 +241,25 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -251,26 +241,25 @@ int tscSendMsgToServer(SSqlObj *pSql) {
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle; TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(TSDB_CACHE_PTR_TYPE)); SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
if (p == NULL) { if (pSql == NULL) {
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return; return;
} }
assert(pSql->self == handle);
SSqlObj* pSql = *p;
assert(pSql != NULL);
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
assert(*pSql->self == pSql); assert(pSql->self == handle);
pSql->rpcRid = -1; pSql->rpcRid = -1;
if (pObj->signature != pObj) { if (pObj->signature != pObj) {
tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature); tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
taosCacheRelease(tscObjCache, (void**) &p, true); taosRemoveRef(tscObjRef, pSql->self);
taosReleaseRef(tscObjRef, pSql->self);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return; return;
} }
...@@ -280,10 +269,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -280,10 +269,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature); pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
void** p1 = p; taosRemoveRef(tscObjRef, pSql->self);
taosCacheRelease(tscObjCache, (void**) &p1, false); taosReleaseRef(tscObjRef, pSql->self);
taosCacheRelease(tscObjCache, (void**) &p, true);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return; return;
} }
...@@ -326,7 +313,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -326,7 +313,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
// if there is an error occurring, proceed to the following error handling procedure. // if there is an error occurring, proceed to the following error handling procedure.
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosCacheRelease(tscObjCache, (void**) &p, false); taosReleaseRef(tscObjRef, pSql->self);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return; return;
} }
...@@ -334,7 +321,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -334,7 +321,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
} }
pRes->rspLen = 0; pRes->rspLen = 0;
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code)); tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
} else { } else {
...@@ -383,7 +370,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -383,7 +370,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen); tscDebug("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
} }
} }
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) { if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
} }
...@@ -394,11 +381,10 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -394,11 +381,10 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
(*pSql->fp)(pSql->param, pSql, rpcMsg->code); (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
} }
void** p1 = p; taosReleaseRef(tscObjRef, pSql->self);
taosCacheRelease(tscObjCache, (void**) &p1, false);
if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
taosCacheRelease(tscObjCache, (void **)&p, true); taosRemoveRef(tscObjRef, pSql->self);
tscDebug("%p sqlObj is automatically freed", pSql); tscDebug("%p sqlObj is automatically freed", pSql);
} }
...@@ -419,7 +405,7 @@ int doProcessSql(SSqlObj *pSql) { ...@@ -419,7 +405,7 @@ int doProcessSql(SSqlObj *pSql) {
pCmd->command == TSDB_SQL_STABLEVGROUP) { pCmd->command == TSDB_SQL_STABLEVGROUP) {
pRes->code = tscBuildMsg[pCmd->command](pSql, NULL); pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
} }
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
return pRes->code; return pRes->code;
...@@ -433,14 +419,14 @@ int doProcessSql(SSqlObj *pSql) { ...@@ -433,14 +419,14 @@ int doProcessSql(SSqlObj *pSql) {
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
return code; return code;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int tscProcessSql(SSqlObj *pSql) { int tscProcessSql(SSqlObj *pSql) {
char *name = NULL; char *name = NULL;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = NULL; STableMetaInfo *pTableMetaInfo = NULL;
uint32_t type = 0; uint32_t type = 0;
...@@ -465,7 +451,7 @@ int tscProcessSql(SSqlObj *pSql) { ...@@ -465,7 +451,7 @@ int tscProcessSql(SSqlObj *pSql) {
} else { // local handler } else { // local handler
return (*tscProcessMsgRsp[pCmd->command])(pSql); return (*tscProcessMsgRsp[pCmd->command])(pSql);
} }
return doProcessSql(pSql); return doProcessSql(pSql);
} }
...@@ -478,7 +464,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -478,7 +464,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// todo valid the vgroupId at the client side // todo valid the vgroupId at the client side
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
int32_t vgIndex = pTableMetaInfo->vgroupIndex; int32_t vgIndex = pTableMetaInfo->vgroupIndex;
if (pTableMetaInfo->pVgroupTables == NULL) { if (pTableMetaInfo->pVgroupTables == NULL) {
...@@ -513,9 +499,9 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -513,9 +499,9 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
char* pMsg = pSql->cmd.payload; char* pMsg = pSql->cmd.payload;
// NOTE: shell message size should not include SMsgDesc // NOTE: shell message size should not include SMsgDesc
int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc); int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
int32_t vgId = pTableMeta->vgroupInfo.vgId; int32_t vgId = pTableMeta->vgroupInfo.vgId;
...@@ -529,7 +515,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -529,7 +515,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pShellMsg->header.vgId = htonl(vgId); pShellMsg->header.vgId = htonl(vgId);
pShellMsg->header.contLen = htonl(size); // the length not includes the size of SMsgDesc pShellMsg->header.contLen = htonl(size); // the length not includes the size of SMsgDesc
pShellMsg->length = pShellMsg->header.contLen; pShellMsg->length = pShellMsg->header.contLen;
pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of tables to be inserted pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of tables to be inserted
// pSql->cmd.payloadLen is set during copying data into payload // pSql->cmd.payloadLen is set during copying data into payload
...@@ -570,7 +556,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { ...@@ -570,7 +556,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
} }
return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + tsBufSize + return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + tsBufSize +
tableSerialize + 4096; tableSerialize + 4096;
} }
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) { static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
...@@ -579,12 +565,12 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -579,12 +565,12 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) { if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
SVgroupInfo* pVgroupInfo = NULL; SVgroupInfo* pVgroupInfo = NULL;
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
int32_t index = pTableMetaInfo->vgroupIndex; int32_t index = pTableMetaInfo->vgroupIndex;
assert(index >= 0); assert(index >= 0);
if (pTableMetaInfo->vgroupList->numOfVgroups > 0) { if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
assert(index < pTableMetaInfo->vgroupList->numOfVgroups); assert(index < pTableMetaInfo->vgroupList->numOfVgroups);
pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index]; pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
...@@ -618,14 +604,14 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -618,14 +604,14 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
// set the vgroup info // set the vgroup info
tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo); tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo);
pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId); pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList); int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList);
pQueryMsg->numOfTables = htonl(numOfTables); // set the number of tables pQueryMsg->numOfTables = htonl(numOfTables); // set the number of tables
// serialize each table id info // serialize each table id info
for(int32_t i = 0; i < numOfTables; ++i) { for(int32_t i = 0; i < numOfTables; ++i) {
STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i); STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i);
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pItem->tid); pTableIdInfo->tid = htonl(pItem->tid);
pTableIdInfo->uid = htobe64(pItem->uid); pTableIdInfo->uid = htobe64(pItem->uid);
...@@ -633,10 +619,10 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -633,10 +619,10 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
pMsg += sizeof(STableIdInfo); pMsg += sizeof(STableIdInfo);
} }
} }
tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name, tscDebug("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
pTableMeta->id.tid, pTableMeta->id.uid); pTableMeta->id.tid, pTableMeta->id.uid);
return pMsg; return pMsg;
} }
...@@ -649,7 +635,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -649,7 +635,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tscError("%p failed to malloc for query msg", pSql); tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_TSC_INVALID_SQL; // todo add test for this return TSDB_CODE_TSC_INVALID_SQL; // todo add test for this
} }
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
...@@ -661,12 +647,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -661,12 +647,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
if (pQueryInfo->interval.interval < 0) { if (pQueryInfo->interval.interval < 0) {
tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql, (int64_t)pQueryInfo->interval.interval); tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql, (int64_t)pQueryInfo->interval.interval);
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) { if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols); tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
...@@ -675,7 +661,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -675,7 +661,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload; SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList); int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);
if (pQueryInfo->order.order == TSDB_ORDER_ASC) { if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey); pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey); pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
...@@ -700,7 +686,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -700,7 +686,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
pQueryMsg->queryType = htonl(pQueryInfo->type); pQueryMsg->queryType = htonl(pQueryInfo->type);
size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); pQueryMsg->numOfOutput = htons((int16_t)numOfOutput);
...@@ -708,7 +694,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -708,7 +694,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo); char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
SSchema *pSchema = tscGetTableSchema(pTableMeta); SSchema *pSchema = tscGetTableSchema(pTableMeta);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i); SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex]; SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
...@@ -717,7 +703,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -717,7 +703,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pColSchema->type > TSDB_DATA_TYPE_NCHAR) { pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex, pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
pColSchema->name); pColSchema->name);
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
...@@ -787,10 +773,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -787,10 +773,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr = (SSqlFuncMsg *)pMsg; pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
} }
// serialize the table info (sid, uid, tags) // serialize the table info (sid, uid, tags)
pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg); pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr; SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
if (pGroupbyExpr->numOfGroupCols > 0) { if (pGroupbyExpr->numOfGroupCols > 0) {
pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex); pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
...@@ -798,7 +784,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -798,7 +784,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) { for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j); SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);
*((int16_t *)pMsg) = pCol->colId; *((int16_t *)pMsg) = pCol->colId;
pMsg += sizeof(pCol->colId); pMsg += sizeof(pCol->colId);
...@@ -807,7 +793,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -807,7 +793,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
*((int16_t *)pMsg) += pCol->flag; *((int16_t *)pMsg) += pCol->flag;
pMsg += sizeof(pCol->flag); pMsg += sizeof(pCol->flag);
memcpy(pMsg, pCol->name, tListLen(pCol->name)); memcpy(pMsg, pCol->name, tListLen(pCol->name));
pMsg += tListLen(pCol->name); pMsg += tListLen(pCol->name);
} }
...@@ -819,14 +805,14 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -819,14 +805,14 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += sizeof(pQueryInfo->fillVal[0]); pMsg += sizeof(pQueryInfo->fillVal[0]);
} }
} }
if (numOfTags != 0) { if (numOfTags != 0) {
int32_t numOfColumns = tscGetNumOfColumns(pTableMeta); int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta); int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
int32_t total = numOfTagColumns + numOfColumns; int32_t total = numOfTagColumns + numOfColumns;
pSchema = tscGetTableTagSchema(pTableMeta); pSchema = tscGetTableTagSchema(pTableMeta);
for (int32_t i = 0; i < numOfTags; ++i) { for (int32_t i = 0; i < numOfTags; ++i) {
SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i); SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex]; SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
...@@ -834,19 +820,19 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -834,19 +820,19 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) || if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
(pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) { (pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s", tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns, pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns,
pCol->colIndex.columnIndex, pColSchema->name); pCol->colIndex.columnIndex, pColSchema->name);
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
SColumnInfo* pTagCol = (SColumnInfo*) pMsg; SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
pTagCol->colId = htons(pColSchema->colId); pTagCol->colId = htons(pColSchema->colId);
pTagCol->bytes = htons(pColSchema->bytes); pTagCol->bytes = htons(pColSchema->bytes);
pTagCol->type = htons(pColSchema->type); pTagCol->type = htons(pColSchema->type);
pTagCol->numOfFilters = 0; pTagCol->numOfFilters = 0;
pMsg += sizeof(SColumnInfo); pMsg += sizeof(SColumnInfo);
} }
} }
...@@ -854,16 +840,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -854,16 +840,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// serialize tag column query condition // serialize tag column query condition
if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) { if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
STagCond* pTagCond = &pQueryInfo->tagCond; STagCond* pTagCond = &pQueryInfo->tagCond;
SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid); SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
if (pCond != NULL && pCond->cond != NULL) { if (pCond != NULL && pCond->cond != NULL) {
pQueryMsg->tagCondLen = htons(pCond->len); pQueryMsg->tagCondLen = htons(pCond->len);
memcpy(pMsg, pCond->cond, pCond->len); memcpy(pMsg, pCond->cond, pCond->len);
pMsg += pCond->len; pMsg += pCond->len;
} }
} }
if (pQueryInfo->tagCond.tbnameCond.cond == NULL) { if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
*pMsg = 0; *pMsg = 0;
pMsg++; pMsg++;
...@@ -895,7 +881,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -895,7 +881,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tscDebug("%p msg built success,len:%d bytes", pSql, msgLen); tscDebug("%p msg built success,len:%d bytes", pSql, msgLen);
pCmd->payloadLen = msgLen; pCmd->payloadLen = msgLen;
pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY; pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
pQueryMsg->head.contLen = htonl(msgLen); pQueryMsg->head.contLen = htonl(msgLen);
assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize); assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
...@@ -926,7 +912,7 @@ int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -926,7 +912,7 @@ int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *)pCmd->payload; SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *)pCmd->payload;
strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n); strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE; pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1287,14 +1273,14 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1287,14 +1273,14 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo; SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
int size = tscEstimateAlterTableMsgLength(pCmd); int size = tscEstimateAlterTableMsgLength(pCmd);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
tscError("%p failed to malloc for alter table msg", pSql); tscError("%p failed to malloc for alter table msg", pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
SAlterTableMsg *pAlterTableMsg = (SAlterTableMsg *)pCmd->payload; SAlterTableMsg *pAlterTableMsg = (SAlterTableMsg *)pCmd->payload;
tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db); tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);
...@@ -1305,7 +1291,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1305,7 +1291,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSchema *pSchema = pAlterTableMsg->schema; SSchema *pSchema = pAlterTableMsg->schema;
for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) { for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pSchema->type = pField->type; pSchema->type = pField->type;
strcpy(pSchema->name, pField->name); strcpy(pSchema->name, pField->name);
pSchema->bytes = htons(pField->bytes); pSchema->bytes = htons(pField->bytes);
...@@ -1330,7 +1316,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1330,7 +1316,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL; pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload; SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
pCmd->payloadLen = htonl(pUpdateMsg->head.contLen); pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);
...@@ -1425,7 +1411,7 @@ int tscProcessDescribeTableRsp(SSqlObj *pSql) { ...@@ -1425,7 +1411,7 @@ int tscProcessDescribeTableRsp(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int32_t numOfRes = tinfo.numOfColumns + tinfo.numOfTags; int32_t numOfRes = tinfo.numOfColumns + tinfo.numOfTags;
return tscLocalResultCommonBuilder(pSql, numOfRes); return tscLocalResultCommonBuilder(pSql, numOfRes);
} }
...@@ -1557,7 +1543,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1557,7 +1543,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize); assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);
tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count, tscDebug("%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql, pCmd->count,
pCmd->payloadLen); pCmd->payloadLen);
return pCmd->payloadLen; return pCmd->payloadLen;
#endif #endif
...@@ -1593,7 +1579,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1593,7 +1579,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
char* pMsg = pCmd->payload; char* pMsg = pCmd->payload;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
...@@ -1668,7 +1654,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1668,7 +1654,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
pMetaMsg->sversion = htons(pMetaMsg->sversion); pMetaMsg->sversion = htons(pMetaMsg->sversion);
pMetaMsg->tversion = htons(pMetaMsg->tversion); pMetaMsg->tversion = htons(pMetaMsg->tversion);
pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId); pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
pMetaMsg->uid = htobe64(pMetaMsg->uid); pMetaMsg->uid = htobe64(pMetaMsg->uid);
pMetaMsg->contLen = htons(pMetaMsg->contLen); pMetaMsg->contLen = htons(pMetaMsg->contLen);
pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns); pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
...@@ -1676,7 +1662,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1676,7 +1662,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) && if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) &&
(pMetaMsg->tid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0)) { (pMetaMsg->tid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0)) {
tscError("invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s", pMetaMsg->vgroup.numOfEps, pMetaMsg->vgroup.vgId, tscError("invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s", pMetaMsg->vgroup.numOfEps, pMetaMsg->vgroup.vgId,
pMetaMsg->tid, pMetaMsg->tableId); pMetaMsg->tid, pMetaMsg->tableId);
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
...@@ -1711,7 +1697,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1711,7 +1697,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
size_t size = 0; size_t size = 0;
STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size); STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
// todo add one more function: taosAddDataIfNotExists(); // todo add one more function: taosAddDataIfNotExists();
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
assert(pTableMetaInfo->pTableMeta == NULL); assert(pTableMetaInfo->pTableMeta == NULL);
...@@ -1726,7 +1712,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1726,7 +1712,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name); tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
free(pTableMeta); free(pTableMeta);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1829,19 +1815,19 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { ...@@ -1829,19 +1815,19 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
// pMeta->index = 0; // pMeta->index = 0;
// (void)taosCachePut(tscMetaCache, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer); // (void)taosCachePut(tscMetaCache, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer);
// } // }
} }
pSql->res.code = TSDB_CODE_SUCCESS; pSql->res.code = TSDB_CODE_SUCCESS;
pSql->res.numOfTotal = i; pSql->res.numOfTotal = i;
tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal); tscDebug("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
#endif #endif
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int tscProcessSTableVgroupRsp(SSqlObj *pSql) { int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
// NOTE: the order of several table must be preserved. // NOTE: the order of several table must be preserved.
SSTableVgroupRspMsg *pStableVgroup = (SSTableVgroupRspMsg *)pRes->pRsp; SSTableVgroupRspMsg *pStableVgroup = (SSTableVgroupRspMsg *)pRes->pRsp;
pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables); pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
...@@ -1850,7 +1836,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { ...@@ -1850,7 +1836,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
// master sqlObj locates in param // master sqlObj locates in param
SSqlObj* parent = pSql->param; SSqlObj* parent = pSql->param;
assert(parent != NULL); assert(parent != NULL);
SSqlCmd* pCmd = &parent->cmd; SSqlCmd* pCmd = &parent->cmd;
for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) { for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i); STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
...@@ -1883,7 +1869,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { ...@@ -1883,7 +1869,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
pMsg += size; pMsg += size;
} }
return pSql->res.code; return pSql->res.code;
} }
...@@ -1928,7 +1914,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { ...@@ -1928,7 +1914,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
size_t size = 0; size_t size = 0;
STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size); STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
pTableMetaInfo->pTableMeta = taosCachePut(tscMetaCache, key, strlen(key), (char *)pTableMeta, size, pTableMetaInfo->pTableMeta = taosCachePut(tscMetaCache, key, strlen(key), (char *)pTableMeta, size,
tsTableMetaKeepTimer * 1000); tsTableMetaKeepTimer * 1000);
SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
...@@ -1936,33 +1922,33 @@ int tscProcessShowRsp(SSqlObj *pSql) { ...@@ -1936,33 +1922,33 @@ int tscProcessShowRsp(SSqlObj *pSql) {
if (pQueryInfo->colList == NULL) { if (pQueryInfo->colList == NULL) {
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
} }
SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo; SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
SColumnIndex index = {0}; SColumnIndex index = {0};
pSchema = pMetaMsg->schema; pSchema = pMetaMsg->schema;
for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) { for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) {
index.columnIndex = i; index.columnIndex = i;
tscColumnListInsert(pQueryInfo->colList, &index); tscColumnListInsert(pQueryInfo->colList, &index);
TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes); TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f); SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false); pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
} }
pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput; pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
tscFieldInfoUpdateOffset(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo);
tfree(pTableMeta); tfree(pTableMeta);
return 0; return 0;
} }
// TODO multithread problem // TODO multithread problem
static void createHBObj(STscObj* pObj) { static void createHBObj(STscObj* pObj) {
if (pObj->pHb != NULL) { if (pObj->hbrid != 0) {
return; return;
} }
...@@ -1992,7 +1978,7 @@ static void createHBObj(STscObj* pObj) { ...@@ -1992,7 +1978,7 @@ static void createHBObj(STscObj* pObj) {
registerSqlObj(pSql); registerSqlObj(pSql);
tscDebug("%p HB is allocated, pObj:%p", pSql, pObj); tscDebug("%p HB is allocated, pObj:%p", pSql, pObj);
pObj->pHb = pSql; pObj->hbrid = pSql->self;
} }
int tscProcessConnectRsp(SSqlObj *pSql) { int tscProcessConnectRsp(SSqlObj *pSql) {
...@@ -2007,7 +1993,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -2007,7 +1993,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
assert(len <= sizeof(pObj->db)); assert(len <= sizeof(pObj->db));
tstrncpy(pObj->db, temp, sizeof(pObj->db)); tstrncpy(pObj->db, temp, sizeof(pObj->db));
if (pConnect->epSet.numOfEps > 0) { if (pConnect->epSet.numOfEps > 0) {
tscEpSetHtons(&pConnect->epSet); tscEpSetHtons(&pConnect->epSet);
tscUpdateMgmtEpSet(&pConnect->epSet); tscUpdateMgmtEpSet(&pConnect->epSet);
...@@ -2125,18 +2111,18 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { ...@@ -2125,18 +2111,18 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
pRes->useconds = htobe64(pRetrieve->useconds); pRes->useconds = htobe64(pRetrieve->useconds);
pRes->completed = (pRetrieve->completed == 1); pRes->completed = (pRetrieve->completed == 1);
pRes->data = pRetrieve->data; pRes->data = pRetrieve->data;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) { if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
return pRes->code; return pRes->code;
} }
if (pSql->pSubscription != NULL) { if (pSql->pSubscription != NULL) {
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1); TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1); int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows; char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;
int32_t numOfTables = htonl(*(int32_t*)p); int32_t numOfTables = htonl(*(int32_t*)p);
...@@ -2209,16 +2195,16 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) { ...@@ -2209,16 +2195,16 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
if (pTableMetaInfo->pTableMeta != NULL) { if (pTableMetaInfo->pTableMeta != NULL) {
taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), false); taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), false);
} }
pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name)); pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
if (pTableMetaInfo->pTableMeta != NULL) { if (pTableMetaInfo->pTableMeta != NULL) {
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns, tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
tinfo.numOfTags, pTableMetaInfo->pTableMeta); tinfo.numOfTags, pTableMetaInfo->pTableMeta);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
return getTableMetaFromMgmt(pSql, pTableMetaInfo); return getTableMetaFromMgmt(pSql, pTableMetaInfo);
} }
...@@ -2242,7 +2228,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { ...@@ -2242,7 +2228,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pTableMetaInfo->pTableMeta) { if (pTableMetaInfo->pTableMeta) {
tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql, tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta); tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
} }
taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true); taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
...@@ -2257,7 +2243,7 @@ static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) { ...@@ -2257,7 +2243,7 @@ static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
return false; return false;
} }
} }
// all super tables vgroupinfo are retrieved, no need to retrieve vgroup info anymore // all super tables vgroupinfo are retrieved, no need to retrieve vgroup info anymore
return true; return true;
} }
...@@ -2265,7 +2251,7 @@ static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) { ...@@ -2265,7 +2251,7 @@ static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
int code = TSDB_CODE_RPC_NETWORK_UNAVAIL; int code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
if (allVgroupInfoRetrieved(pCmd, clauseIndex)) { if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2282,7 +2268,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { ...@@ -2282,7 +2268,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
tscFreeSqlObj(pNew); tscFreeSqlObj(pNew);
return code; return code;
} }
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i); STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
...@@ -2377,7 +2363,7 @@ void tscInitMsgsFp() { ...@@ -2377,7 +2363,7 @@ void tscInitMsgsFp() {
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp; tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp; tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
tscKeepConn[TSDB_SQL_SHOW] = 1; tscKeepConn[TSDB_SQL_SHOW] = 1;
tscKeepConn[TSDB_SQL_RETRIEVE] = 1; tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
......
...@@ -277,8 +277,8 @@ void taos_close(TAOS *taos) { ...@@ -277,8 +277,8 @@ void taos_close(TAOS *taos) {
pObj->signature = NULL; pObj->signature = NULL;
taosTmrStopA(&(pObj->pTimer)); taosTmrStopA(&(pObj->pTimer));
SSqlObj* pHb = pObj->pHb; SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid);
if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) { if (pHb != NULL) {
if (pHb->rpcRid > 0) { // wait for rsp from dnode if (pHb->rpcRid > 0) { // wait for rsp from dnode
rpcCancelRequest(pHb->rpcRid); rpcCancelRequest(pHb->rpcRid);
pHb->rpcRid = -1; pHb->rpcRid = -1;
...@@ -286,6 +286,7 @@ void taos_close(TAOS *taos) { ...@@ -286,6 +286,7 @@ void taos_close(TAOS *taos) {
tscDebug("%p HB is freed", pHb); tscDebug("%p HB is freed", pHb);
taos_free_result(pHb); taos_free_result(pHb);
taosReleaseRef(tscObjRef, pHb->self);
} }
int32_t ref = T_REF_DEC(pObj); int32_t ref = T_REF_DEC(pObj);
...@@ -645,8 +646,7 @@ void taos_free_result(TAOS_RES *res) { ...@@ -645,8 +646,7 @@ void taos_free_result(TAOS_RES *res) {
bool freeNow = tscKillQueryInDnode(pSql); bool freeNow = tscKillQueryInDnode(pSql);
if (freeNow) { if (freeNow) {
tscDebug("%p free sqlObj in cache", pSql); tscDebug("%p free sqlObj in cache", pSql);
SSqlObj** p = pSql->self; taosReleaseRef(tscObjRef, pSql->self);
taosCacheRelease(tscObjCache, (void**) &p, true);
} }
} }
...@@ -738,15 +738,8 @@ static void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -738,15 +738,8 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
if (pSub == NULL) { if (pSub == NULL) {
continue; continue;
} }
//SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid);
void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE)); SSqlObj *pSubObj = pSub;
if (p == NULL) {
continue;
}
SSqlObj* pSubObj = (SSqlObj*) (*p);
assert(pSubObj->self == (SSqlObj**) p);
pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
if (pSubObj->rpcRid > 0) { if (pSubObj->rpcRid > 0) {
rpcCancelRequest(pSubObj->rpcRid); rpcCancelRequest(pSubObj->rpcRid);
...@@ -754,7 +747,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -754,7 +747,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
} }
tscQueueAsyncRes(pSubObj); tscQueueAsyncRes(pSubObj);
taosCacheRelease(tscObjCache, (void**) &p, false); taosReleaseRef(tscObjRef, pSubObj->self);
} }
tscDebug("%p super table query cancelled", pSql); tscDebug("%p super table query cancelled", pSql);
......
...@@ -179,8 +179,9 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* ...@@ -179,8 +179,9 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
fail: fail:
tscError("tscCreateSubscription failed at line %d, reason: %s", line, tstrerror(code)); tscError("tscCreateSubscription failed at line %d, reason: %s", line, tstrerror(code));
if (pSql != NULL) { if (pSql != NULL) {
if (pSql->self != NULL) { if (pSql->self != 0) {
taos_free_result(pSql); //taos_free_result(pSql);
taosReleaseRef(tscObjRef, pSql->self);
} else { } else {
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
} }
......
...@@ -2199,6 +2199,7 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql) { ...@@ -2199,6 +2199,7 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql) {
STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index); STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index);
int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
if ((pRes->code = code)!= TSDB_CODE_SUCCESS) { if ((pRes->code = code)!= TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tcache.h" #include "tcache.h"
#include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "tsystem.h" #include "tsystem.h"
#include "ttimer.h" #include "ttimer.h"
...@@ -31,7 +32,7 @@ ...@@ -31,7 +32,7 @@
// global, not configurable // global, not configurable
SCacheObj* tscMetaCache; SCacheObj* tscMetaCache;
SCacheObj* tscObjCache; int tscObjRef = -1;
void * tscTmr; void * tscTmr;
void * tscQhandle; void * tscQhandle;
void * tscCheckDiskUsageTmr; void * tscCheckDiskUsageTmr;
...@@ -143,7 +144,12 @@ void taos_init_imp(void) { ...@@ -143,7 +144,12 @@ void taos_init_imp(void) {
int64_t refreshTime = 10; // 10 seconds by default int64_t refreshTime = 10; // 10 seconds by default
if (tscMetaCache == NULL) { if (tscMetaCache == NULL) {
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta"); tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta");
#ifndef SQLOBJ_USE_CACHE
tscObjRef = taosOpenRef(4096, tscFreeRegisteredSqlObj);
#else
tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj"); tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj");
#endif
} }
tscRefId = taosOpenRef(200, tscCloseTscObj); tscRefId = taosOpenRef(200, tscCloseTscObj);
...@@ -166,11 +172,11 @@ void taos_cleanup(void) { ...@@ -166,11 +172,11 @@ void taos_cleanup(void) {
taosCacheCleanup(m); taosCacheCleanup(m);
} }
m = tscObjCache; int refId = atomic_exchange_32(&tscObjRef, -1);
if (m != NULL && atomic_val_compare_exchange_ptr(&tscObjCache, m, 0) == m) { if (refId != -1) {
taosCacheCleanup(m); taosCloseRef(refId);
} }
m = tscQhandle; m = tscQhandle;
if (m != NULL && atomic_val_compare_exchange_ptr(&tscQhandle, m, 0) == m) { if (m != NULL && atomic_val_compare_exchange_ptr(&tscQhandle, m, 0) == m) {
taosCleanUpScheduler(m); taosCleanUpScheduler(m);
......
...@@ -364,18 +364,18 @@ static void tscFreeSubobj(SSqlObj* pSql) { ...@@ -364,18 +364,18 @@ static void tscFreeSubobj(SSqlObj* pSql) {
void tscFreeRegisteredSqlObj(void *pSql) { void tscFreeRegisteredSqlObj(void *pSql) {
assert(pSql != NULL); assert(pSql != NULL);
SSqlObj** p = (SSqlObj**)pSql; SSqlObj* p = *(SSqlObj**)pSql;
STscObj* pTscObj = (*p)->pTscObj; STscObj* pTscObj = p->pTscObj;
assert((*p)->self != 0 && (*p)->self == (p)); assert(p->self != 0);
tscFreeSqlObj(*p); tscFreeSqlObj(p);
int32_t ref = T_REF_DEC(pTscObj); int32_t ref = T_REF_DEC(pTscObj);
assert(ref >= 0); assert(ref >= 0);
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref); tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", p, pTscObj, ref);
if (ref == 0) { if (ref == 0) {
tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj); tscDebug("%p all sqlObj freed, free tscObj:%p", p, pTscObj);
taosRemoveRef(tscRefId, pTscObj->rid); taosRemoveRef(tscRefId, pTscObj->rid);
} }
} }
...@@ -750,6 +750,10 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { ...@@ -750,6 +750,10 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
dataBuf->size += (finalLen + sizeof(SSubmitBlk)); dataBuf->size += (finalLen + sizeof(SSubmitBlk));
assert(dataBuf->size <= dataBuf->nAllocSize); assert(dataBuf->size <= dataBuf->nAllocSize);
char* p = realloc(dataBuf->pData, dataBuf->size);
if (p != NULL) {
dataBuf->pData = p;
}
// the length does not include the SSubmitBlk structure // the length does not include the SSubmitBlk structure
pBlocks->dataLen = htonl(finalLen); pBlocks->dataLen = htonl(finalLen);
...@@ -1487,6 +1491,8 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo) { ...@@ -1487,6 +1491,8 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo) {
} }
} }
#ifndef SQLOBJ_USE_CACHE
#else
void tscSetFreeHeatBeat(STscObj* pObj) { void tscSetFreeHeatBeat(STscObj* pObj) {
if (pObj == NULL || pObj->signature != pObj || pObj->pHb == NULL) { if (pObj == NULL || pObj->signature != pObj || pObj->pHb == NULL) {
return; return;
...@@ -1499,6 +1505,7 @@ void tscSetFreeHeatBeat(STscObj* pObj) { ...@@ -1499,6 +1505,7 @@ void tscSetFreeHeatBeat(STscObj* pObj) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHeatBeat->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHeatBeat->cmd, 0);
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
} }
#endif
/* /*
* the following four kinds of SqlObj should not be freed * the following four kinds of SqlObj should not be freed
...@@ -1518,7 +1525,7 @@ bool tscShouldBeFreed(SSqlObj* pSql) { ...@@ -1518,7 +1525,7 @@ bool tscShouldBeFreed(SSqlObj* pSql) {
} }
STscObj* pTscObj = pSql->pTscObj; STscObj* pTscObj = pSql->pTscObj;
if (pSql->pStream != NULL || pTscObj->pHb == pSql || pSql->pSubscription != NULL) { if (pSql->pStream != NULL || pTscObj->hbrid == pSql->self || pSql->pSubscription != NULL) {
return false; return false;
} }
...@@ -1809,13 +1816,14 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { ...@@ -1809,13 +1816,14 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
} }
void registerSqlObj(SSqlObj* pSql) { void registerSqlObj(SSqlObj* pSql) {
int32_t DEFAULT_LIFE_TIME = 2 * 600 * 1000; // 1200 sec //int32_t DEFAULT_LIFE_TIME = 2 * 600 * 1000; // 1200 sec
int32_t ref = T_REF_INC(pSql->pTscObj); int32_t ref = T_REF_INC(pSql->pTscObj);
tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref); tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref);
TSDB_CACHE_PTR_TYPE p = (TSDB_CACHE_PTR_TYPE) pSql; //TSDB_CACHE_PTR_TYPE p = (TSDB_CACHE_PTR_TYPE) pSql;
pSql->self = taosCachePut(tscObjCache, &p, sizeof(TSDB_CACHE_PTR_TYPE), &p, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_LIFE_TIME); //pSql->self = taosCachePut(tscObjCache, &p, sizeof(TSDB_CACHE_PTR_TYPE), &p, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_LIFE_TIME);
pSql->self = taosAddRef(tscObjRef, pSql);
} }
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) { SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册