提交 a39fbb1f 编写于 作者: H Haojun Liao

[td-1319]

上级 6e3affb9
...@@ -334,6 +334,7 @@ typedef struct STscObj { ...@@ -334,6 +334,7 @@ typedef struct STscObj {
struct SSqlStream *streamList; struct SSqlStream *streamList;
void* pDnodeConn; void* pDnodeConn;
pthread_mutex_t mutex; pthread_mutex_t mutex;
T_REF_DECLARE();
} STscObj; } STscObj;
typedef struct SSqlObj { typedef struct SSqlObj {
...@@ -503,7 +504,7 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField ...@@ -503,7 +504,7 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
} }
} }
extern SCacheObj* tscCacheHandle; extern SCacheObj* tscMetaCache;
extern SCacheObj* tscObjCache; extern SCacheObj* tscObjCache;
extern void * tscTmr; extern void * tscTmr;
extern void * tscQhandle; extern void * tscQhandle;
......
...@@ -51,6 +51,11 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const ...@@ -51,6 +51,11 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql->fp = fp; pSql->fp = fp;
pSql->fetchFp = fp; pSql->fetchFp = fp;
uint64_t handle = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
pSql->sqlstr = calloc(1, sqlLen + 1); pSql->sqlstr = calloc(1, sqlLen + 1);
if (pSql->sqlstr == NULL) { if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql); tscError("%p failed to malloc sql string buffer", pSql);
...@@ -64,9 +69,6 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const ...@@ -64,9 +69,6 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
pCmd->curSql = pSql->sqlstr; pCmd->curSql = pSql->sqlstr;
uint64_t handle = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
int32_t code = tsParseSql(pSql, true); int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
......
...@@ -429,7 +429,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) { ...@@ -429,7 +429,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
pRes->qhandle = 0x1; pRes->qhandle = 0x1;
pRes->numOfRows = 0; pRes->numOfRows = 0;
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) { } else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
taosCacheEmpty(tscCacheHandle); taosCacheEmpty(tscMetaCache);
pRes->code = TSDB_CODE_SUCCESS; pRes->code = TSDB_CODE_SUCCESS;
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) { } else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
pRes->code = tscProcessServerVer(pSql); pRes->code = tscProcessServerVer(pSql);
......
...@@ -1181,13 +1181,14 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t ...@@ -1181,13 +1181,14 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
} END_TRY } END_TRY
len = tbufTell(&bw); len = tbufTell(&bw);
char* c = tbufGetData(&bw, true); char* c = tbufGetData(&bw, false);
// set the serialized binary string as the parameter of arithmetic expression // set the serialized binary string as the parameter of arithmetic expression
addExprParams(pExpr, c, TSDB_DATA_TYPE_BINARY, (int32_t)len, index.tableIndex); addExprParams(pExpr, c, TSDB_DATA_TYPE_BINARY, (int32_t)len, index.tableIndex);
insertResultField(pQueryInfo, exprIndex, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->aliasName, pExpr); insertResultField(pQueryInfo, exprIndex, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->aliasName, pExpr);
tbufCloseWriter(&bw);
taosArrayDestroy(colList); taosArrayDestroy(colList);
tExprTreeDestroy(&pNode, NULL); tExprTreeDestroy(&pNode, NULL);
} else { } else {
......
...@@ -373,17 +373,17 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -373,17 +373,17 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
} }
bool shouldFree = tscShouldBeFreed(pSql);; bool shouldFree = tscShouldBeFreed(pSql);
if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code; rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
(*pSql->fp)(pSql->param, pSql, rpcMsg->code); (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
} }
void** p1 = p; void** p1 = p;
taosCacheRelease(tscObjCache, (void**) &p, false); taosCacheRelease(tscObjCache, (void**) &p1, false);
if (shouldFree) { if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
taosCacheRelease(tscObjCache, (void **)&p1, true); taosCacheRelease(tscObjCache, (void **)&p, true);
tscDebug("%p sqlObj is automatically freed", pSql); tscDebug("%p sqlObj is automatically freed", pSql);
} }
...@@ -1718,7 +1718,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1718,7 +1718,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
assert(pTableMetaInfo->pTableMeta == NULL); assert(pTableMetaInfo->pTableMeta == NULL);
pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscMetaCache, pTableMetaInfo->name,
strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer * 1000); strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer * 1000);
// todo handle out of memory case // todo handle out of memory case
...@@ -1830,7 +1830,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { ...@@ -1830,7 +1830,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
// int32_t size = (int32_t)(rsp - ((char *)pMeta)); // Consistent with STableMeta in cache // int32_t size = (int32_t)(rsp - ((char *)pMeta)); // Consistent with STableMeta in cache
// //
// pMeta->index = 0; // pMeta->index = 0;
// (void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer); // (void)taosCachePut(tscMetaCache, pMeta->tableId, (char *)pMeta, size, tsTableMetaKeepTimer);
// } // }
} }
...@@ -1917,12 +1917,14 @@ int tscProcessShowRsp(SSqlObj *pSql) { ...@@ -1917,12 +1917,14 @@ int tscProcessShowRsp(SSqlObj *pSql) {
key[0] = pCmd->msgType + 'a'; key[0] = pCmd->msgType + 'a';
strcpy(key + 1, "showlist"); strcpy(key + 1, "showlist");
taosCacheRelease(tscCacheHandle, (void *)&(pTableMetaInfo->pTableMeta), false); if (pTableMetaInfo->pTableMeta != NULL) {
taosCacheRelease(tscMetaCache, (void *)&(pTableMetaInfo->pTableMeta), false);
}
size_t size = 0; size_t size = 0;
STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size); STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, strlen(key), (char *)pTableMeta, size, pTableMetaInfo->pTableMeta = taosCachePut(tscMetaCache, key, strlen(key), (char *)pTableMeta, size,
tsTableMetaKeepTimer * 1000); tsTableMetaKeepTimer * 1000);
SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
...@@ -1981,6 +1983,8 @@ static void createHBObj(STscObj* pObj) { ...@@ -1981,6 +1983,8 @@ static void createHBObj(STscObj* pObj) {
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->signature = pSql; pSql->signature = pSql;
pObj->pHb = pSql; pObj->pHb = pSql;
T_REF_INC(pObj);
tscAddSubqueryInfo(&pObj->pHb->cmd); tscAddSubqueryInfo(&pObj->pHb->cmd);
tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj); tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj);
...@@ -2025,14 +2029,14 @@ int tscProcessUseDbRsp(SSqlObj *pSql) { ...@@ -2025,14 +2029,14 @@ int tscProcessUseDbRsp(SSqlObj *pSql) {
int tscProcessDropDbRsp(SSqlObj *pSql) { int tscProcessDropDbRsp(SSqlObj *pSql) {
pSql->pTscObj->db[0] = 0; pSql->pTscObj->db[0] = 0;
taosCacheEmpty(tscCacheHandle); taosCacheEmpty(tscMetaCache);
return 0; return 0;
} }
int tscProcessDropTableRsp(SSqlObj *pSql) { int tscProcessDropTableRsp(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
STableMeta *pTableMeta = taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name)); STableMeta *pTableMeta = taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
if (pTableMeta == NULL) { /* not in cache, abort */ if (pTableMeta == NULL) { /* not in cache, abort */
return 0; return 0;
} }
...@@ -2045,10 +2049,10 @@ int tscProcessDropTableRsp(SSqlObj *pSql) { ...@@ -2045,10 +2049,10 @@ int tscProcessDropTableRsp(SSqlObj *pSql) {
* instead. * instead.
*/ */
tscDebug("%p force release table meta after drop table:%s", pSql, pTableMetaInfo->name); tscDebug("%p force release table meta after drop table:%s", pSql, pTableMetaInfo->name);
taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true); taosCacheRelease(tscMetaCache, (void **)&pTableMeta, true);
if (pTableMetaInfo->pTableMeta) { if (pTableMetaInfo->pTableMeta) {
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true); taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
} }
return 0; return 0;
...@@ -2057,21 +2061,21 @@ int tscProcessDropTableRsp(SSqlObj *pSql) { ...@@ -2057,21 +2061,21 @@ int tscProcessDropTableRsp(SSqlObj *pSql) {
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) { int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
STableMeta *pTableMeta = taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name)); STableMeta *pTableMeta = taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
if (pTableMeta == NULL) { /* not in cache, abort */ if (pTableMeta == NULL) { /* not in cache, abort */
return 0; return 0;
} }
tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, pTableMetaInfo->name); tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, pTableMetaInfo->name);
taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true); taosCacheRelease(tscMetaCache, (void **)&pTableMeta, true);
if (pTableMetaInfo->pTableMeta) { if (pTableMetaInfo->pTableMeta) {
bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true); taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
if (isSuperTable) { // if it is a super table, reset whole query cache if (isSuperTable) { // if it is a super table, reset whole query cache
tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name); tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
taosCacheEmpty(tscCacheHandle); taosCacheEmpty(tscMetaCache);
} }
} }
...@@ -2156,6 +2160,12 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf ...@@ -2156,6 +2160,12 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf
pNew->signature = pNew; pNew->signature = pNew;
pNew->cmd.command = TSDB_SQL_META; pNew->cmd.command = TSDB_SQL_META;
T_REF_INC(pNew->pTscObj);
// TODO add test case on x86 platform
uint64_t adr = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &adr, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2*60*1000);
tscAddSubqueryInfo(&pNew->cmd); tscAddSubqueryInfo(&pNew->cmd);
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0); SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
...@@ -2179,10 +2189,6 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf ...@@ -2179,10 +2189,6 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf
pNew->fp = tscTableMetaCallBack; pNew->fp = tscTableMetaCallBack;
pNew->param = pSql; pNew->param = pSql;
// TODO add test case on x86 platform
uint64_t adr = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &adr, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2*60*1000);
int32_t code = tscProcessSql(pNew); int32_t code = tscProcessSql(pNew);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify upper application that current process need to be terminated code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify upper application that current process need to be terminated
...@@ -2196,10 +2202,10 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) { ...@@ -2196,10 +2202,10 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
// If this STableMetaInfo owns a table meta, release it first // If this STableMetaInfo owns a table meta, release it first
if (pTableMetaInfo->pTableMeta != NULL) { if (pTableMetaInfo->pTableMeta != NULL) {
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false); taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), false);
} }
pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name)); pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
if (pTableMetaInfo->pTableMeta != NULL) { if (pTableMetaInfo->pTableMeta != NULL) {
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns, tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
...@@ -2234,7 +2240,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { ...@@ -2234,7 +2240,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta); tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
} }
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true); taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
return getTableMetaFromMgmt(pSql, pTableMetaInfo); return getTableMetaFromMgmt(pSql, pTableMetaInfo);
} }
...@@ -2264,7 +2270,8 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { ...@@ -2264,7 +2270,8 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
pNew->signature = pNew; pNew->signature = pNew;
pNew->cmd.command = TSDB_SQL_STABLEVGROUP; pNew->cmd.command = TSDB_SQL_STABLEVGROUP;
// TODO TEST IT
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0); SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
if (pNewQueryInfo == NULL) { if (pNewQueryInfo == NULL) {
tscFreeSqlObj(pNew); tscFreeSqlObj(pNew);
...@@ -2274,7 +2281,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { ...@@ -2274,7 +2281,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i); STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
STableMeta *pTableMeta = taosCacheAcquireByData(tscCacheHandle, pMInfo->pTableMeta); STableMeta *pTableMeta = taosCacheAcquireByData(tscMetaCache, pMInfo->pTableMeta);
tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList); tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList);
} }
...@@ -2284,6 +2291,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { ...@@ -2284,6 +2291,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
} }
pNewQueryInfo->numOfTables = pQueryInfo->numOfTables; pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
T_REF_INC(pNew->pTscObj);
uint64_t p = (uint64_t) pNew; uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000); pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000);
......
...@@ -102,6 +102,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -102,6 +102,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
pObj->signature = pObj; pObj->signature = pObj;
pObj->pDnodeConn = pDnodeConn; pObj->pDnodeConn = pDnodeConn;
T_REF_INIT_VAL(pObj, 1);
tstrncpy(pObj->user, user, sizeof(pObj->user)); tstrncpy(pObj->user, user, sizeof(pObj->user));
secretEncryptLen = MIN(secretEncryptLen, sizeof(pObj->pass)); secretEncryptLen = MIN(secretEncryptLen, sizeof(pObj->pass));
...@@ -155,6 +156,8 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -155,6 +156,8 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
*taos = pObj; *taos = pObj;
} }
T_REF_INC(pSql->pTscObj);
uint64_t key = (uint64_t) pSql; uint64_t key = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &key, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); pSql->self = taosCachePut(tscObjCache, &key, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg); tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
...@@ -261,6 +264,31 @@ void taos_close(TAOS *taos) { ...@@ -261,6 +264,31 @@ void taos_close(TAOS *taos) {
tscFreeSqlObj(pObj->pHb); tscFreeSqlObj(pObj->pHb);
} }
// free all sqlObjs created by using this connect before free the STscObj
while(1) {
pthread_mutex_lock(&pObj->mutex);
void* p = pObj->sqlList;
pthread_mutex_unlock(&pObj->mutex);
if (p == NULL) {
break;
}
tscDebug("%p waiting for sqlObj to be freed, %p", pObj, p);
taosMsleep(100);
// todo fix me!! two threads call taos_free_result will cause problem.
tscDebug("%p free :%p", pObj, p);
taos_free_result(p);
}
int32_t ref = T_REF_DEC(pObj);
assert(ref >= 0);
if (ref > 0) {
return;
}
tscCloseTscObj(pObj); tscCloseTscObj(pObj);
} }
...@@ -537,7 +565,7 @@ int taos_select_db(TAOS *taos, const char *db) { ...@@ -537,7 +565,7 @@ int taos_select_db(TAOS *taos, const char *db) {
} }
// send free message to vnode to free qhandle and corresponding resources in vnode // send free message to vnode to free qhandle and corresponding resources in vnode
static bool tscKillQueryInDnode(SSqlObj* pSql) { static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
...@@ -561,7 +589,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) { ...@@ -561,7 +589,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
cmd == TSDB_SQL_FETCH)) { cmd == TSDB_SQL_FETCH)) {
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
tscDebug("%p send msg to dnode to free qhandle ASAP before free sqlObj, command:%s, ", pSql, sqlCmd[pCmd->command]); tscDebug("%p send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql, sqlCmd[pCmd->command]);
tscProcessSql(pSql); tscProcessSql(pSql);
return false; return false;
...@@ -577,45 +605,15 @@ void taos_free_result(TAOS_RES *res) { ...@@ -577,45 +605,15 @@ void taos_free_result(TAOS_RES *res) {
return; return;
} }
assert(pSql->self != 0 && *pSql->self == pSql); // assert(pSql->self != 0 && *pSql->self == pSql);
bool freeNow = tscKillQueryInDnode(pSql); bool freeNow = tscKillQueryInDnode(pSql);
if (freeNow) { if (freeNow) {
tscDebug("%p free sqlObj in cache", pSql); tscDebug("%p free sqlObj in cache", pSql);
taosCacheRelease(tscObjCache, (void**) &pSql->self, true); SSqlObj** p = pSql->self;
} taosCacheRelease(tscObjCache, (void**) &p, true);
} }
}
//static void doFreeResult(TAOS_RES *res) {
// SSqlObj *pSql = (SSqlObj *)res;
//
// if (pSql == NULL || pSql->signature != pSql) {
// tscDebug("%p sqlObj has been freed", pSql);
// return;
// }
//
// // The semaphore can not be changed while freeing async sub query objects.
// SSqlRes *pRes = &pSql->res;
// if (pRes == NULL || pRes->qhandle == 0) {
// tscFreeSqlObj(pSql);
// tscDebug("%p SqlObj is freed by app, qhandle is null", pSql);
// return;
// }
//
// // set freeFlag to 1 in retrieve message if there are un-retrieved results data in node
// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
// if (pQueryInfo == NULL) {
// tscFreeSqlObj(pSql);
// tscDebug("%p SqlObj is freed by app", pSql);
// return;
// }
//
// pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
// if (!tscKillQueryInDnode(pSql)) {
// tscFreeSqlObj(pSql);
// tscDebug("%p sqlObj is freed by app", pSql);
// }
//}
int taos_errno(TAOS_RES *tres) { int taos_errno(TAOS_RES *tres) {
SSqlObj *pSql = (SSqlObj *) tres; SSqlObj *pSql = (SSqlObj *) tres;
......
...@@ -167,7 +167,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf ...@@ -167,7 +167,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
retryDelay); retryDelay);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), true); taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), true);
taosTFree(pTableMetaInfo->vgroupList); taosTFree(pTableMetaInfo->vgroupList);
tscSetRetryTimer(pStream, pStream->pSql, retryDelay); tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
...@@ -275,7 +275,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf ...@@ -275,7 +275,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
// release the metric/meter meta information reference, so data in cache can be updated // release the metric/meter meta information reference, so data in cache can be updated
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false); taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), false);
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
taosTFree(pSql->pSubs); taosTFree(pSql->pSubs);
pSql->numOfSubs = 0; pSql->numOfSubs = 0;
......
...@@ -30,7 +30,7 @@ ...@@ -30,7 +30,7 @@
#include "tlocale.h" #include "tlocale.h"
// global, not configurable // global, not configurable
SCacheObj* tscCacheHandle; SCacheObj* tscMetaCache;
SCacheObj* tscObjCache; SCacheObj* tscObjCache;
void * tscTmr; void * tscTmr;
void * tscQhandle; void * tscQhandle;
...@@ -145,9 +145,9 @@ void taos_init_imp(void) { ...@@ -145,9 +145,9 @@ void taos_init_imp(void) {
refreshTime = refreshTime > 10 ? 10 : refreshTime; refreshTime = refreshTime > 10 ? 10 : refreshTime;
refreshTime = refreshTime < 10 ? 10 : refreshTime; refreshTime = refreshTime < 10 ? 10 : refreshTime;
if (tscCacheHandle == NULL) { if (tscMetaCache == NULL) {
tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta"); tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta");
tscObjCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, refreshTime, false, tscFreeSqlObjInCache, "sqlObjHandle"); tscObjCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, refreshTime/2, false, tscFreeSqlObjInCache, "sqlObj");
} }
tscDebug("client is initialized successfully"); tscDebug("client is initialized successfully");
...@@ -156,9 +156,9 @@ void taos_init_imp(void) { ...@@ -156,9 +156,9 @@ void taos_init_imp(void) {
void taos_init() { pthread_once(&tscinit, taos_init_imp); } void taos_init() { pthread_once(&tscinit, taos_init_imp); }
void taos_cleanup() { void taos_cleanup() {
if (tscCacheHandle != NULL) { if (tscMetaCache != NULL) {
taosCacheCleanup(tscCacheHandle); taosCacheCleanup(tscMetaCache);
tscCacheHandle = NULL; tscMetaCache = NULL;
taosCacheCleanup(tscObjCache); taosCacheCleanup(tscObjCache);
tscObjCache = NULL; tscObjCache = NULL;
......
...@@ -344,8 +344,6 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) { ...@@ -344,8 +344,6 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
} }
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
STscObj* pObj = pSql->pTscObj;
int32_t cmd = pCmd->command; int32_t cmd = pCmd->command;
if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT || if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) { cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
...@@ -353,11 +351,11 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) { ...@@ -353,11 +351,11 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
} }
// pSql->sqlstr will be used by tscBuildQueryStreamDesc // pSql->sqlstr will be used by tscBuildQueryStreamDesc
if (pObj->signature == pObj) { // if (pObj->signature == pObj) {
//pthread_mutex_lock(&pObj->mutex); //pthread_mutex_lock(&pObj->mutex);
taosTFree(pSql->sqlstr); taosTFree(pSql->sqlstr);
//pthread_mutex_unlock(&pObj->mutex); //pthread_mutex_unlock(&pObj->mutex);
} // }
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
...@@ -384,42 +382,61 @@ static void tscFreeSubobj(SSqlObj* pSql) { ...@@ -384,42 +382,61 @@ static void tscFreeSubobj(SSqlObj* pSql) {
pSql->numOfSubs = 0; pSql->numOfSubs = 0;
} }
//static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) {
// SSqlCmd* pCmd = &pSql->cmd;
// SSqlRes* pRes = &pSql->res;
//
// if (pRes == NULL || pRes->qhandle == 0) {
// return true;
// }
//
// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
// if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
// return true;
// }
//
// STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// tscRemoveFromSqlList(pSql);
//
// int32_t cmd = pCmd->command;
// if (pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false && pSql->pStream == NULL && (pTableMetaInfo->pTableMeta != NULL) &&
// (cmd == TSDB_SQL_SELECT ||
// cmd == TSDB_SQL_SHOW ||
// cmd == TSDB_SQL_RETRIEVE ||
// cmd == TSDB_SQL_FETCH)) {
// pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
// pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
// tscDebug("%p send msg to dnode to free qhandle ASAP before free sqlObj, command:%s, ", pSql, sqlCmd[pCmd->command]);
//
// tscProcessSql(pSql);
// return false;
// }
//
// return true;
//}
/**
* The free operation will cause the pSql to be removed from hash table and free it in
* the function of processmsgfromserver is impossible in this case, since it will fail
* to retrieve pSqlObj in hashtable.
*
* @param pSql
*/
void tscFreeSqlObjInCache(void *pSql) { void tscFreeSqlObjInCache(void *pSql) {
assert(pSql != NULL); assert(pSql != NULL);
SSqlObj** p = (SSqlObj**) pSql; SSqlObj** p = (SSqlObj**)pSql;
assert((*p)->self != 0 && (*p)->self == (p));
tscFreeSqlObj(*p); tscFreeSqlObj(*p);
} }
static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
return false;
}
int32_t cmd = pCmd->command;
if (pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false && pSql->pStream == NULL && (pTableMetaInfo->pTableMeta != NULL) &&
(cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_SHOW || cmd == TSDB_SQL_RETRIEVE || cmd == TSDB_SQL_FETCH)) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
tscDebug("%p send msg to dnode to free qhandle ASAP, command:%s, ", pSql, sqlCmd[pCmd->command]);
tscProcessSql(pSql);
return true;
}
return false;
}
void tscFreeSqlObj(SSqlObj* pSql) { void tscFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
return; return;
} }
tscDebug("%p start to free sqlObj", pSql); tscDebug("%p start to free sqlObj", pSql);
STscObj* pTscObj = pSql->pTscObj;
tscFreeSubobj(pSql); tscFreeSubobj(pSql);
tscPartiallyFreeSqlObj(pSql); tscPartiallyFreeSqlObj(pSql);
...@@ -438,6 +455,13 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -438,6 +455,13 @@ void tscFreeSqlObj(SSqlObj* pSql) {
free(pSql); free(pSql);
tscDebug("%p free sqlObj completed", pSql); tscDebug("%p free sqlObj completed", pSql);
int32_t ref = T_REF_DEC(pTscObj);
assert(ref >= 0);
if (ref == 0) {
tscCloseTscObj(pTscObj);
}
} }
void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
...@@ -450,7 +474,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { ...@@ -450,7 +474,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
// free the refcount for metermeta // free the refcount for metermeta
if (pDataBlock->pTableMeta != NULL) { if (pDataBlock->pTableMeta != NULL) {
taosCacheRelease(tscCacheHandle, (void**)&(pDataBlock->pTableMeta), false); taosCacheRelease(tscMetaCache, (void**)&(pDataBlock->pTableMeta), false);
} }
taosTFree(pDataBlock); taosTFree(pDataBlock);
...@@ -509,10 +533,10 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { ...@@ -509,10 +533,10 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
tstrncpy(pTableMetaInfo->name, pDataBlock->tableId, sizeof(pTableMetaInfo->name)); tstrncpy(pTableMetaInfo->name, pDataBlock->tableId, sizeof(pTableMetaInfo->name));
if (pTableMetaInfo->pTableMeta != NULL) { if (pTableMetaInfo->pTableMeta != NULL) {
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false); taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), false);
} }
pTableMetaInfo->pTableMeta = taosCacheTransfer(tscCacheHandle, (void**)&pDataBlock->pTableMeta); pTableMetaInfo->pTableMeta = taosCacheTransfer(tscMetaCache, (void**)&pDataBlock->pTableMeta);
} else { } else {
assert(strncmp(pTableMetaInfo->name, pDataBlock->tableId, tListLen(pDataBlock->tableId)) == 0); assert(strncmp(pTableMetaInfo->name, pDataBlock->tableId, tListLen(pDataBlock->tableId)) == 0);
} }
...@@ -583,7 +607,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff ...@@ -583,7 +607,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
* due to operation such as drop database. So here we add the reference count directly instead of invoke * due to operation such as drop database. So here we add the reference count directly instead of invoke
* taosGetDataFromCache, which may return NULL value. * taosGetDataFromCache, which may return NULL value.
*/ */
dataBuf->pTableMeta = taosCacheAcquireByData(tscCacheHandle, pTableMeta); dataBuf->pTableMeta = taosCacheAcquireByData(tscMetaCache, pTableMeta);
assert(initialSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL); assert(initialSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
*dataBlocks = dataBuf; *dataBlocks = dataBuf;
...@@ -777,28 +801,10 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { ...@@ -777,28 +801,10 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
// TODO: all subqueries should be freed correctly before close this connection. // TODO: all subqueries should be freed correctly before close this connection.
void tscCloseTscObj(STscObj* pObj) { void tscCloseTscObj(STscObj* pObj) {
assert(pObj != NULL); assert(pObj != NULL);
pObj->signature = NULL; pObj->signature = NULL;
taosTmrStopA(&(pObj->pTimer)); taosTmrStopA(&(pObj->pTimer));
// wait for all sqlObjs created according to this connect closed
while(1) {
pthread_mutex_lock(&pObj->mutex);
void* p = pObj->sqlList;
pthread_mutex_unlock(&pObj->mutex);
if (p == NULL) {
break;
}
tscDebug("%p waiting for sqlObj to be freed, %p", pObj, p);
taosMsleep(100);
// todo fix me!! two threads call taos_free_result will cause problem.
tscDebug("%p free :%p", pObj, p);
taos_free_result(p);
}
if (pObj->pDnodeConn != NULL) { if (pObj->pDnodeConn != NULL) {
rpcClose(pObj->pDnodeConn); rpcClose(pObj->pDnodeConn);
pObj->pDnodeConn = NULL; pObj->pDnodeConn = NULL;
...@@ -1743,7 +1749,7 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) ...@@ -1743,7 +1749,7 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache)
} }
if (pTableMetaInfo->pTableMeta != NULL) { if (pTableMetaInfo->pTableMeta != NULL) {
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache); taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache);
} }
taosTFree(pTableMetaInfo->vgroupList); taosTFree(pTableMetaInfo->vgroupList);
...@@ -1769,6 +1775,8 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm ...@@ -1769,6 +1775,8 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
} }
pNew->pTscObj = pSql->pTscObj; pNew->pTscObj = pSql->pTscObj;
T_REF_INC(pNew->pTscObj);
pNew->signature = pNew; pNew->signature = pNew;
SSqlCmd* pCmd = &pNew->cmd; SSqlCmd* pCmd = &pNew->cmd;
...@@ -1800,6 +1808,8 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm ...@@ -1800,6 +1808,8 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL); tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL);
T_REF_INC(pNew->pTscObj);
uint64_t p = (uint64_t) pNew; uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000); pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000);
return pNew; return pNew;
...@@ -1890,6 +1900,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1890,6 +1900,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pNew->pTscObj = pSql->pTscObj; pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew; pNew->signature = pNew;
T_REF_INC(pNew->pTscObj);
pNew->sqlstr = strdup(pSql->sqlstr); pNew->sqlstr = strdup(pSql->sqlstr);
if (pNew->sqlstr == NULL) { if (pNew->sqlstr == NULL) {
...@@ -1994,14 +2005,14 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1994,14 +2005,14 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
STableMetaInfo* pFinalInfo = NULL; STableMetaInfo* pFinalInfo = NULL;
if (pPrevSql == NULL) { if (pPrevSql == NULL) {
STableMeta* pTableMeta = taosCacheAcquireByData(tscCacheHandle, pTableMetaInfo->pTableMeta); // get by name may failed due to the cache cleanup STableMeta* pTableMeta = taosCacheAcquireByData(tscMetaCache, pTableMetaInfo->pTableMeta); // get by name may failed due to the cache cleanup
assert(pTableMeta != NULL); assert(pTableMeta != NULL);
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->tagColList); pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->tagColList);
} else { // transfer the ownership of pTableMeta to the newly create sql object. } else { // transfer the ownership of pTableMeta to the newly create sql object.
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0); STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
STableMeta* pPrevTableMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta); STableMeta* pPrevTableMeta = taosCacheTransfer(tscMetaCache, (void**)&pPrevInfo->pTableMeta);
SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList; SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList;
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList); pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList);
...@@ -2041,6 +2052,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -2041,6 +2052,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
tscDebug("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex); tscDebug("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex);
} }
T_REF_INC(pNew->pTscObj);
uint64_t p = (uint64_t) pNew; uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 10); pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 10);
return pNew; return pNew;
...@@ -2154,6 +2167,7 @@ int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* s ...@@ -2154,6 +2167,7 @@ int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* s
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
} }
int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* sql) { int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* sql) {
const char* msgFormat1 = "invalid SQL: %s"; const char* msgFormat1 = "invalid SQL: %s";
const char* msgFormat2 = "invalid SQL: \'%s\' (%s)"; const char* msgFormat2 = "invalid SQL: \'%s\' (%s)";
......
...@@ -71,7 +71,7 @@ static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const ...@@ -71,7 +71,7 @@ static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const
* @param pCacheObj Cache object * @param pCacheObj Cache object
* @param pNode Cache slot object * @param pNode Cache slot object
*/ */
static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode); static void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode);
/** /**
* remove nodes in trash with refCount == 0 in cache * remove nodes in trash with refCount == 0 in cache
...@@ -80,7 +80,7 @@ static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode); ...@@ -80,7 +80,7 @@ static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode);
* @param force force model, if true, remove data in trash without check refcount. * @param force force model, if true, remove data in trash without check refcount.
* may cause corruption. So, forece model only applys before cache is closed * may cause corruption. So, forece model only applys before cache is closed
*/ */
static void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force); static void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force);
/** /**
* release node * release node
...@@ -222,7 +222,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v ...@@ -222,7 +222,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
taosTFree(p); taosTFree(p);
} else { } else {
taosAddToTrash(pCacheObj, p); taosAddToTrashcan(pCacheObj, p);
uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, p->data); uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, p->data);
} }
} }
...@@ -322,11 +322,11 @@ void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) { ...@@ -322,11 +322,11 @@ void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
} }
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
if (taosHashGetSize(pCacheObj->pHashTable) + pCacheObj->numOfElemsInTrash == 0) { if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) + pCacheObj->numOfElemsInTrash == 0) {
return; return;
} }
if (pCacheObj == NULL || (*data) == NULL) { if ((*data) == NULL) {
uError("cache:%s, NULL data to release", pCacheObj->name); uError("cache:%s, NULL data to release", pCacheObj->name);
return; return;
} }
...@@ -399,19 +399,19 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -399,19 +399,19 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
"others already", pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data); "others already", pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data);
assert(p->pTNodeHeader == NULL); assert(p->pTNodeHeader == NULL);
taosAddToTrash(pCacheObj, p); taosAddToTrashcan(pCacheObj, p);
} else { } else {
uDebug("cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d", pCacheObj->name, pNode->key, uDebug("cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d", pCacheObj->name, pNode->key,
pNode->data, ref); pNode->data, ref);
if (ref > 0) { if (ref > 0) {
assert(pNode->pTNodeHeader == NULL); assert(pNode->pTNodeHeader == NULL);
taosAddToTrash(pCacheObj, pNode); taosAddToTrashcan(pCacheObj, pNode);
} else { // ref == 0 } else { // ref == 0
atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size); atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size);
int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable); int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable);
uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, num:%d size:%" PRId64 "bytes", uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes",
pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->totalSize); pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->totalSize);
if (pCacheObj->freeFp) { if (pCacheObj->freeFp) {
...@@ -432,6 +432,26 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -432,6 +432,26 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
char* key = pNode->key; char* key = pNode->key;
char* p = pNode->data; char* p = pNode->data;
// int32_t ref = T_REF_VAL_GET(pNode);
//
// if (ref == 1 && inTrashcan) {
// // If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may be
// // destroyed by refresh worker if decrease ref count before removing it from linked-list.
// assert(pNode->pTNodeHeader->pData == pNode);
//
// __cache_wr_lock(pCacheObj);
// doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
// __cache_unlock(pCacheObj);
//
// ref = T_REF_DEC(pNode);
// assert(ref == 0);
//
// doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
// } else {
// ref = T_REF_DEC(pNode);
// assert(ref >= 0);
// }
int32_t ref = T_REF_DEC(pNode); int32_t ref = T_REF_DEC(pNode);
uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trashcan:%d", pCacheObj->name, key, p, ref, inTrashcan); uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trashcan:%d", pCacheObj->name, key, p, ref, inTrashcan);
} }
...@@ -452,7 +472,7 @@ static bool travHashTableEmptyFn(void* param, void* data) { ...@@ -452,7 +472,7 @@ static bool travHashTableEmptyFn(void* param, void* data) {
if (T_REF_VAL_GET(pNode) == 0) { if (T_REF_VAL_GET(pNode) == 0) {
taosCacheReleaseNode(pCacheObj, pNode); taosCacheReleaseNode(pCacheObj, pNode);
} else { // do add to trashcan } else { // do add to trashcan
taosAddToTrash(pCacheObj, pNode); taosAddToTrashcan(pCacheObj, pNode);
} }
// this node should be remove from hash table // this node should be remove from hash table
...@@ -463,7 +483,7 @@ void taosCacheEmpty(SCacheObj *pCacheObj) { ...@@ -463,7 +483,7 @@ void taosCacheEmpty(SCacheObj *pCacheObj) {
SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()}; SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup); taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup);
taosTrashCanEmpty(pCacheObj, false); taosTrashcanEmpty(pCacheObj, false);
} }
void taosCacheCleanup(SCacheObj *pCacheObj) { void taosCacheCleanup(SCacheObj *pCacheObj) {
...@@ -503,7 +523,7 @@ SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char * ...@@ -503,7 +523,7 @@ SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *
return pNewNode; return pNewNode;
} }
void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) { void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
if (pNode->inTrashcan) { /* node is already in trash */ if (pNode->inTrashcan) { /* node is already in trash */
assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode); assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
return; return;
...@@ -525,11 +545,11 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) { ...@@ -525,11 +545,11 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
pCacheObj->numOfElemsInTrash++; pCacheObj->numOfElemsInTrash++;
__cache_unlock(pCacheObj); __cache_unlock(pCacheObj);
uDebug("cache:%s key:%p, %p move to trash, numOfElem in trash:%d", pCacheObj->name, pNode->key, pNode->data, uDebug("cache:%s key:%p, %p move to trashcan, numOfElem in trashcan:%d", pCacheObj->name, pNode->key, pNode->data,
pCacheObj->numOfElemsInTrash); pCacheObj->numOfElemsInTrash);
} }
void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) { void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
__cache_wr_lock(pCacheObj); __cache_wr_lock(pCacheObj);
if (pCacheObj->numOfElemsInTrash == 0) { if (pCacheObj->numOfElemsInTrash == 0) {
...@@ -573,7 +593,7 @@ void doCleanupDataCache(SCacheObj *pCacheObj) { ...@@ -573,7 +593,7 @@ void doCleanupDataCache(SCacheObj *pCacheObj) {
// todo memory leak if there are object with refcount greater than 0 in hash table? // todo memory leak if there are object with refcount greater than 0 in hash table?
taosHashCleanup(pCacheObj->pHashTable); taosHashCleanup(pCacheObj->pHashTable);
taosTrashCanEmpty(pCacheObj, true); taosTrashcanEmpty(pCacheObj, true);
__cache_lock_destroy(pCacheObj); __cache_lock_destroy(pCacheObj);
...@@ -648,7 +668,7 @@ void* taosCacheTimedRefresh(void *handle) { ...@@ -648,7 +668,7 @@ void* taosCacheTimedRefresh(void *handle) {
doCacheRefresh(pCacheObj, now, NULL); doCacheRefresh(pCacheObj, now, NULL);
} }
taosTrashCanEmpty(pCacheObj, false); taosTrashcanEmpty(pCacheObj, false);
} }
return NULL; return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册