未验证 提交 6cfd7265 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #3658 from taosdata/feature/query

Feature/query
...@@ -258,6 +258,8 @@ void tscDoQuery(SSqlObj* pSql); ...@@ -258,6 +258,8 @@ void tscDoQuery(SSqlObj* pSql);
*/ */
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd); SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd);
void registerSqlObj(SSqlObj* pSql);
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql); SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql);
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex); void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex);
......
...@@ -107,7 +107,7 @@ typedef struct STableMeta { ...@@ -107,7 +107,7 @@ typedef struct STableMeta {
} STableMeta; } STableMeta;
typedef struct STableMetaInfo { typedef struct STableMetaInfo {
STableMeta * pTableMeta; // table meta, cached in client side and acquired by name STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
SVgroupsInfo *vgroupList; SVgroupsInfo *vgroupList;
SArray *pVgroupTables; // SArray<SVgroupTableInfo> SArray *pVgroupTables; // SArray<SVgroupTableInfo>
......
...@@ -51,10 +51,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const ...@@ -51,10 +51,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql->fp = fp; pSql->fp = fp;
pSql->fetchFp = fp; pSql->fetchFp = fp;
uint64_t handle = (uint64_t) pSql; registerSqlObj(pSql);
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
pSql->sqlstr = calloc(1, sqlLen + 1); pSql->sqlstr = calloc(1, sqlLen + 1);
if (pSql->sqlstr == NULL) { if (pSql->sqlstr == NULL) {
......
...@@ -546,9 +546,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { ...@@ -546,9 +546,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pSql->cmd.numOfParams = 0; pSql->cmd.numOfParams = 0;
pSql->cmd.batchSize = 0; pSql->cmd.batchSize = 0;
uint64_t handle = (uint64_t) pSql; registerSqlObj(pSql);
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
int32_t code = tsParseSql(pSql, true); int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
......
...@@ -128,6 +128,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { ...@@ -128,6 +128,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse); tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse);
taosCorEndWrite(&pVgroupInfo->version); taosCorEndWrite(&pVgroupInfo->version);
} }
void tscPrintMgmtEp() { void tscPrintMgmtEp() {
SRpcEpSet dump; SRpcEpSet dump;
tscDumpMgmtEpSet(&dump); tscDumpMgmtEpSet(&dump);
...@@ -745,7 +746,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -745,7 +746,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) { if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
/* column id is not valid according to the cached table meta, the table meta is expired */
tscError("%p table schema is not matched with parsed sql", pSql); tscError("%p table schema is not matched with parsed sql", pSql);
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
...@@ -1987,15 +1987,11 @@ static void createHBObj(STscObj* pObj) { ...@@ -1987,15 +1987,11 @@ static void createHBObj(STscObj* pObj) {
pSql->param = pObj; pSql->param = pObj;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->signature = pSql; pSql->signature = pSql;
pObj->pHb = pSql;
tscAddSubqueryInfo(&pObj->pHb->cmd);
int64_t ad = (int64_t) pSql; registerSqlObj(pSql);
pSql->self = taosCachePut(tscObjCache, &ad, sizeof(int64_t), &pSql, sizeof(int64_t), 2 * 60 * 1000); tscDebug("%p HB is allocated, pObj:%p", pSql, pObj);
T_REF_INC(pObj);
tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj); pObj->pHb = pSql;
} }
int tscProcessConnectRsp(SSqlObj *pSql) { int tscProcessConnectRsp(SSqlObj *pSql) {
...@@ -2170,11 +2166,7 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf ...@@ -2170,11 +2166,7 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf
pNew->signature = pNew; pNew->signature = pNew;
pNew->cmd.command = TSDB_SQL_META; pNew->cmd.command = TSDB_SQL_META;
T_REF_INC(pNew->pTscObj); registerSqlObj(pNew);
// TODO add test case on x86 platform
uint64_t adr = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &adr, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2*60*1000);
tscAddSubqueryInfo(&pNew->cmd); tscAddSubqueryInfo(&pNew->cmd);
...@@ -2301,10 +2293,8 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { ...@@ -2301,10 +2293,8 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
} }
pNewQueryInfo->numOfTables = pQueryInfo->numOfTables; pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
T_REF_INC(pNew->pTscObj); registerSqlObj(pNew);
uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000);
tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables); tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);
pNew->fp = tscTableMetaCallBack; pNew->fp = tscTableMetaCallBack;
......
...@@ -156,10 +156,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -156,10 +156,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
*taos = pObj; *taos = pObj;
} }
T_REF_INC(pSql->pTscObj); registerSqlObj(pSql);
uint64_t key = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &key, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg); tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
return pSql; return pSql;
...@@ -270,7 +267,7 @@ void taos_close(TAOS *taos) { ...@@ -270,7 +267,7 @@ void taos_close(TAOS *taos) {
pHb->pRpcCtx = NULL; pHb->pRpcCtx = NULL;
} }
tscDebug("%p, HB is freed", pHb); tscDebug("%p HB is freed", pHb);
taos_free_result(pHb); taos_free_result(pHb);
} }
...@@ -789,14 +786,17 @@ int taos_validate_sql(TAOS *taos, const char *sql) { ...@@ -789,14 +786,17 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
} }
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pSql->pTscObj = taos; pSql->pTscObj = taos;
pSql->signature = pSql; pSql->signature = pSql;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pRes->numOfTotal = 0; pRes->numOfTotal = 0;
pRes->numOfClauseTotal = 0; pRes->numOfClauseTotal = 0;
tscDebug("%p Valid SQL: %s pObj:%p", pSql, sql, pObj); tscDebug("%p Valid SQL: %s pObj:%p", pSql, sql, pObj);
int32_t sqlLen = (int32_t)strlen(sql); int32_t sqlLen = (int32_t)strlen(sql);
...@@ -832,11 +832,12 @@ int taos_validate_sql(TAOS *taos, const char *sql) { ...@@ -832,11 +832,12 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
tsem_wait(&pSql->rspSem); tsem_wait(&pSql->rspSem);
code = pSql->res.code; code = pSql->res.code;
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscDebug("%p Valid SQL result:%d, %s pObj:%p", pSql, code, taos_errstr(taos), pObj); tscDebug("%p Valid SQL result:%d, %s pObj:%p", pSql, code, taos_errstr(taos), pObj);
} }
taos_free_result(pSql);
taos_free_result(pSql);
return code; return code;
} }
...@@ -935,34 +936,32 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -935,34 +936,32 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pSql->pTscObj = taos; pSql->pTscObj = taos;
pSql->signature = pSql; pSql->signature = pSql;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
pRes->code = 0;
pRes->numOfTotal = 0; // the number of getting table meta from server pRes->numOfTotal = 0; // the number of getting table meta from server
pRes->numOfClauseTotal = 0; pRes->numOfClauseTotal = 0;
pRes->code = 0;
assert(pSql->fp == NULL); assert(pSql->fp == NULL);
tscDebug("%p tableNameList: %s pObj:%p", pSql, tableNameList, pObj); tscDebug("%p tableNameList: %s pObj:%p", pSql, tableNameList, pObj);
int32_t tblListLen = (int32_t)strlen(tableNameList); int32_t tblListLen = (int32_t)strlen(tableNameList);
if (tblListLen > MAX_TABLE_NAME_LENGTH) { if (tblListLen > MAX_TABLE_NAME_LENGTH) {
tscError("%p tableNameList too long, length:%d, maximum allowed:%d", pSql, tblListLen, MAX_TABLE_NAME_LENGTH); tscError("%p tableNameList too long, length:%d, maximum allowed:%d", pSql, tblListLen, MAX_TABLE_NAME_LENGTH);
pRes->code = TSDB_CODE_TSC_INVALID_SQL; tscFreeSqlObj(pSql);
taosTFree(pSql); return TSDB_CODE_TSC_INVALID_SQL;
return pRes->code;
} }
char *str = calloc(1, tblListLen + 1); char *str = calloc(1, tblListLen + 1);
if (str == NULL) { if (str == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("%p failed to malloc sql string buffer", pSql); tscError("%p failed to malloc sql string buffer", pSql);
taosTFree(pSql); tscFreeSqlObj(pSql);
return pRes->code; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
strtolower(str, tableNameList); strtolower(str, tableNameList);
pRes->code = (uint8_t)tscParseTblNameList(pSql, str, tblListLen); int32_t code = (uint8_t) tscParseTblNameList(pSql, str, tblListLen);
/* /*
* set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query. * set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query.
...@@ -972,17 +971,17 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -972,17 +971,17 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
pRes->qhandle = 0; pRes->qhandle = 0;
free(str); free(str);
if (pRes->code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
return pRes->code; return code;
} }
tscDoQuery(pSql); tscDoQuery(pSql);
tscDebug("%p load multi metermeta result:%d %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); tscDebug("%p load multi table meta result:%d %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
if (pRes->code != TSDB_CODE_SUCCESS) { if ((code = pRes->code) != TSDB_CODE_SUCCESS) {
tscPartiallyFreeSqlObj(pSql); tscFreeSqlObj(pSql);
} }
return pRes->code; return code;
} }
...@@ -510,9 +510,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { ...@@ -510,9 +510,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
return; return;
} }
uint64_t handle = (uint64_t) pSql; registerSqlObj(pSql);
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......
...@@ -105,6 +105,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* ...@@ -105,6 +105,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto fail; goto fail;
} }
tstrncpy(pSub->topic, topic, sizeof(pSub->topic)); tstrncpy(pSub->topic, topic, sizeof(pSub->topic));
pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress)); pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress));
if (pSub->progress == NULL) { if (pSub->progress == NULL) {
...@@ -119,6 +120,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* ...@@ -119,6 +120,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto fail; goto fail;
} }
pSql->signature = pSql; pSql->signature = pSql;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->pSubscription = pSub; pSql->pSubscription = pSub;
...@@ -142,6 +144,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* ...@@ -142,6 +144,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto fail; goto fail;
} }
strtolower(pSql->sqlstr, pSql->sqlstr); strtolower(pSql->sqlstr, pSql->sqlstr);
pRes->qhandle = 0; pRes->qhandle = 0;
pRes->numOfRows = 1; pRes->numOfRows = 1;
...@@ -152,15 +155,14 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* ...@@ -152,15 +155,14 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
goto fail; goto fail;
} }
uint64_t handle = (uint64_t) pSql; registerSqlObj(pSql);
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
code = tsParseSql(pSql, false); code = tsParseSql(pSql, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tsem_wait(&pSub->sem); tsem_wait(&pSub->sem);
code = pSql->res.code; code = pSql->res.code;
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
line = __LINE__; line = __LINE__;
goto fail; goto fail;
...@@ -182,8 +184,10 @@ fail: ...@@ -182,8 +184,10 @@ fail:
} else { } else {
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
} }
pSql = NULL; pSql = NULL;
} }
if (pSub != NULL) { if (pSub != NULL) {
taosArrayDestroy(pSub->progress); taosArrayDestroy(pSub->progress);
tsem_destroy(&pSub->sem); tsem_destroy(&pSub->sem);
......
...@@ -122,11 +122,8 @@ void taos_init_imp(void) { ...@@ -122,11 +122,8 @@ void taos_init_imp(void) {
tscInitMsgsFp(); tscInitMsgsFp();
int queueSize = tsMaxConnections*2; int queueSize = tsMaxConnections*2;
if (tscEmbedded == 0) { double factor = (tscEmbedded == 0)? 2.0:4.0;
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / 2.0); tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor);
} else {
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / 4.0);
}
if (tscNumOfThreads < 2) tscNumOfThreads = 2; if (tscNumOfThreads < 2) tscNumOfThreads = 2;
...@@ -141,10 +138,7 @@ void taos_init_imp(void) { ...@@ -141,10 +138,7 @@ void taos_init_imp(void) {
taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr); taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr);
} }
int64_t refreshTime = tsTableMetaKeepTimer; int64_t refreshTime = 10; // 10 seconds by default
refreshTime = refreshTime > 10 ? 10 : refreshTime;
refreshTime = refreshTime < 10 ? 10 : refreshTime;
if (tscMetaCache == NULL) { if (tscMetaCache == NULL) {
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta"); tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta");
tscObjCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, refreshTime/2, false, tscFreeSqlObjInCache, "sqlObj"); tscObjCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, refreshTime/2, false, tscFreeSqlObjInCache, "sqlObj");
......
...@@ -391,10 +391,21 @@ static UNUSED_FUNC void tscFreeSubobj(SSqlObj* pSql) { ...@@ -391,10 +391,21 @@ static UNUSED_FUNC void tscFreeSubobj(SSqlObj* pSql) {
*/ */
void tscFreeSqlObjInCache(void *pSql) { void tscFreeSqlObjInCache(void *pSql) {
assert(pSql != NULL); assert(pSql != NULL);
SSqlObj** p = (SSqlObj**)pSql; SSqlObj** p = (SSqlObj**)pSql;
STscObj* pTscObj = (*p)->pTscObj;
assert((*p)->self != 0 && (*p)->self == (p)); assert((*p)->self != 0 && (*p)->self == (p));
tscFreeSqlObj(*p); tscFreeSqlObj(*p);
int32_t ref = T_REF_DEC(pTscObj);
assert(ref >= 0);
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref);
if (ref == 0) {
tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj);
tscCloseTscObj(pTscObj);
}
} }
void tscFreeSqlObj(SSqlObj* pSql) { void tscFreeSqlObj(SSqlObj* pSql) {
...@@ -403,7 +414,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -403,7 +414,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
} }
tscDebug("%p start to free sqlObj", pSql); tscDebug("%p start to free sqlObj", pSql);
STscObj* pTscObj = pSql->pTscObj;
tscFreeSubobj(pSql); tscFreeSubobj(pSql);
tscPartiallyFreeSqlObj(pSql); tscPartiallyFreeSqlObj(pSql);
...@@ -421,14 +431,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -421,14 +431,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tsem_destroy(&pSql->rspSem); tsem_destroy(&pSql->rspSem);
free(pSql); free(pSql);
tscDebug("%p free sqlObj completed", pSql);
int32_t ref = T_REF_DEC(pTscObj);
assert(ref >= 0);
if (ref == 0) {
tscCloseTscObj(pTscObj);
}
} }
void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
...@@ -1780,6 +1782,16 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { ...@@ -1780,6 +1782,16 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
pRes->numOfRows = 0; pRes->numOfRows = 0;
} }
void registerSqlObj(SSqlObj* pSql) {
int32_t DEFAULT_LIFE_TIME = 2 * 600 * 1000; // 1200 sec
int32_t ref = T_REF_INC(pSql->pTscObj);
tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref);
uint64_t p = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &p, sizeof(uint64_t), DEFAULT_LIFE_TIME);
}
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) { SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) {
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
if (pNew == NULL) { if (pNew == NULL) {
...@@ -1808,8 +1820,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm ...@@ -1808,8 +1820,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
pNew->sqlstr = strdup(pSql->sqlstr); pNew->sqlstr = strdup(pSql->sqlstr);
if (pNew->sqlstr == NULL) { if (pNew->sqlstr == NULL) {
tscError("%p new subquery failed", pSql); tscError("%p new subquery failed", pSql);
tscFreeSqlObj(pNew);
free(pNew);
return NULL; return NULL;
} }
...@@ -1820,9 +1831,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm ...@@ -1820,9 +1831,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL); tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL);
T_REF_INC(pNew->pTscObj); registerSqlObj(pNew);
uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000);
return pNew; return pNew;
} }
...@@ -2060,10 +2069,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -2060,10 +2069,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
tscDebug("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex); tscDebug("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex);
} }
T_REF_INC(pNew->pTscObj); registerSqlObj(pNew);
uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 10);
return pNew; return pNew;
_error: _error:
......
...@@ -136,6 +136,7 @@ typedef struct SSkipListIterator { ...@@ -136,6 +136,7 @@ typedef struct SSkipListIterator {
SSkipListNode *cur; SSkipListNode *cur;
int32_t step; // the number of nodes that have been checked already int32_t step; // the number of nodes that have been checked already
int32_t order; // order of the iterator int32_t order; // order of the iterator
SSkipListNode *next; // next points to the true qualified node in skip list
} SSkipListIterator; } SSkipListIterator;
/** /**
......
...@@ -97,7 +97,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo ...@@ -97,7 +97,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable); int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable);
assert(size > 0); assert(size > 0);
uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, num:%d size:%" PRId64 "bytes", uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes",
pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize); pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize);
if (pCacheObj->freeFp) { if (pCacheObj->freeFp) {
......
...@@ -79,9 +79,12 @@ static SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t ...@@ -79,9 +79,12 @@ static SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t
// when order is TSDB_ORDER_ASC, return the last node with key less than val // when order is TSDB_ORDER_ASC, return the last node with key less than val
// when order is TSDB_ORDER_DESC, return the first node with key large than val // when order is TSDB_ORDER_DESC, return the first node with key large than val
static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_t order) { static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_t order, SSkipListNode** pCur) {
__compar_fn_t comparFn = pSkipList->comparFn; __compar_fn_t comparFn = pSkipList->comparFn;
SSkipListNode *pNode = NULL; SSkipListNode *pNode = NULL;
if (pCur != NULL) {
*pCur = NULL;
}
if (order == TSDB_ORDER_ASC) { if (order == TSDB_ORDER_ASC) {
pNode = pSkipList->pHead; pNode = pSkipList->pHead;
...@@ -93,6 +96,9 @@ static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_ ...@@ -93,6 +96,9 @@ static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_
pNode = p; pNode = p;
p = SL_GET_FORWARD_POINTER(p, i); p = SL_GET_FORWARD_POINTER(p, i);
} else { } else {
if (pCur != NULL) {
*pCur = p;
}
break; break;
} }
} }
...@@ -107,6 +113,9 @@ static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_ ...@@ -107,6 +113,9 @@ static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_
pNode = p; pNode = p;
p = SL_GET_BACKWARD_POINTER(p, i); p = SL_GET_BACKWARD_POINTER(p, i);
} else { } else {
if (pCur != NULL) {
*pCur = p;
}
break; break;
} }
} }
...@@ -295,7 +304,7 @@ SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey key) { ...@@ -295,7 +304,7 @@ SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey key) {
pthread_rwlock_wrlock(pSkipList->lock); pthread_rwlock_wrlock(pSkipList->lock);
} }
SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC); SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC, NULL);
while (1) { while (1) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0); SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0);
if (p == pSkipList->pTail) { if (p == pSkipList->pTail) {
...@@ -452,7 +461,7 @@ uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) { ...@@ -452,7 +461,7 @@ uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) {
pthread_rwlock_wrlock(pSkipList->lock); pthread_rwlock_wrlock(pSkipList->lock);
} }
SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC); SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC, NULL);
while (1) { while (1) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0); SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0);
if (p == pSkipList->pTail) { if (p == pSkipList->pTail) {
...@@ -545,7 +554,7 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char* ...@@ -545,7 +554,7 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char*
pthread_rwlock_rdlock(pSkipList->lock); pthread_rwlock_rdlock(pSkipList->lock);
} }
iter->cur = getPriorNode(pSkipList, val, order); iter->cur = getPriorNode(pSkipList, val, order, &iter->next);
if (pSkipList->lock) { if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock); pthread_rwlock_unlock(pSkipList->lock);
...@@ -567,8 +576,22 @@ bool tSkipListIterNext(SSkipListIterator *iter) { ...@@ -567,8 +576,22 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
if (iter->order == TSDB_ORDER_ASC) { // ascending order iterate if (iter->order == TSDB_ORDER_ASC) { // ascending order iterate
iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0); iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0);
// a new node is inserted into between iter->cur and iter->next, ignore it
if (iter->cur != iter->next && (iter->next != NULL)) {
iter->cur = iter->next;
}
iter->next = SL_GET_FORWARD_POINTER(iter->cur, 0);
} else { // descending order iterate } else { // descending order iterate
iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0); iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0);
// a new node is inserted into between iter->cur and iter->next, ignore it
if (iter->cur != iter->next && (iter->next != NULL)) {
iter->cur = iter->next;
}
iter->next = SL_GET_BACKWARD_POINTER(iter->cur, 0);
} }
if (pSkipList->lock) { if (pSkipList->lock) {
...@@ -715,8 +738,10 @@ SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order) ...@@ -715,8 +738,10 @@ SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order)
iter->order = order; iter->order = order;
if(order == TSDB_ORDER_ASC) { if(order == TSDB_ORDER_ASC) {
iter->cur = pSkipList->pHead; iter->cur = pSkipList->pHead;
iter->next = SL_GET_FORWARD_POINTER(iter->cur, 0);
} else { } else {
iter->cur = pSkipList->pTail; iter->cur = pSkipList->pTail;
iter->next = SL_GET_BACKWARD_POINTER(iter->cur, 0);
} }
return iter; return iter;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册