未验证 提交 ca3888c1 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #4791 from taosdata/feature/query

Feature/query
......@@ -20,9 +20,6 @@
extern "C" {
#endif
/*
* @date 2018/09/30
*/
#include "exception.h"
#include "os.h"
#include "qExtbuffer.h"
......@@ -216,7 +213,7 @@ STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
SQueryInfo *tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex);
SQueryInfo *tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex);
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache);
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo);
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta,
SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables);
......@@ -276,7 +273,7 @@ void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
bool hasMoreVnodesToTry(SSqlObj *pSql);
bool hasMoreClauseToTry(SSqlObj* pSql);
void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache);
void tscFreeQueryInfo(SSqlCmd* pCmd);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
......@@ -290,6 +287,14 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_
char* serializeTagData(STagData* pTagData, char* pMsg);
int32_t copyTagData(STagData* dst, const STagData* src);
STableMeta* createSuperTableMeta(STableMetaMsg* pChild);
uint32_t tscGetTableMetaSize(STableMeta* pTableMeta);
CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
uint32_t tscGetTableMetaMaxSize();
int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name);
STableMeta* tscTableMetaClone(STableMeta* pTableMeta);
void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size);
char* strdup_throw(const char* str);
......
......@@ -105,7 +105,10 @@ SSchema tscGetTbnameColumnSchema();
* @param size size of the table meta
* @return
*/
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size);
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg);
bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src);
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg);
#ifdef __cplusplus
}
......
......@@ -56,23 +56,28 @@ typedef struct STableComInfo {
int32_t rowSize;
} STableComInfo;
typedef struct SCorVgroupInfo {
int32_t version;
int8_t inUse;
int8_t numOfEps;
SEpAddr1 epAddr[TSDB_MAX_REPLICA];
} SCorVgroupInfo;
typedef struct SNewVgroupInfo {
int32_t vgId;
int8_t inUse;
int8_t numOfEps;
SEpAddrMsg ep[TSDB_MAX_REPLICA];
} SNewVgroupInfo;
typedef struct CChildTableMeta {
int32_t vgId;
STableId id;
uint8_t tableType;
char sTableName[TSDB_TABLE_FNAME_LEN];
} CChildTableMeta;
typedef struct STableMeta {
STableComInfo tableInfo;
int32_t vgId;
STableId id;
uint8_t tableType;
char sTableName[TSDB_TABLE_FNAME_LEN];
int16_t sversion;
int16_t tversion;
char sTableId[TSDB_TABLE_FNAME_LEN];
int32_t vgId;
SCorVgroupInfo corVgroupInfo;
STableId id;
// union {int64_t stableUid; SSchema* schema;};
STableComInfo tableInfo;
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
} STableMeta;
......@@ -171,7 +176,7 @@ typedef struct SParamInfo {
} SParamInfo;
typedef struct STableDataBlocks {
char tableId[TSDB_TABLE_FNAME_LEN];
char tableName[TSDB_TABLE_FNAME_LEN];
int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; // if current rows are ordered or not
int64_t vgId; // virtual group id
......@@ -249,7 +254,7 @@ typedef struct {
int8_t submitSchema; // submit block is built with table schema
STagData tagData; // NOTE: pTagData->data is used as a variant length array
STableMeta **pTableMetaList; // all involved tableMeta list of current insert sql statement.
char **pTableNameList; // all involved tableMeta list of current insert sql statement.
int32_t numOfTables;
SHashObj *pTableBlockHashList; // data block for each table
......@@ -386,7 +391,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet);
int tscProcessSql(SSqlObj *pSql);
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex);
void tscQueueAsyncRes(SSqlObj *pSql);
void tscAsyncResultOnError(SSqlObj *pSql);
void tscQueueAsyncError(void(*fp), void *param, int32_t code);
......@@ -400,7 +405,7 @@ void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscResetSqlCmdObj(SSqlCmd *pCmd, bool removeFromCache);
void tscResetSqlCmdObj(SSqlCmd *pCmd);
/**
* free query result of the sql object
......@@ -414,7 +419,6 @@ void tscFreeSqlResult(SSqlObj *pSql);
*/
void tscFreeSqlObj(SSqlObj *pSql);
void tscFreeRegisteredSqlObj(void *pSql);
void tscFreeTableMetaHelper(void *pTableMeta);
void tscCloseTscObj(void *pObj);
......@@ -479,7 +483,9 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
}
}
extern SCacheObj *tscMetaCache;
extern int32_t sentinel;
extern SHashObj *tscVgroupMap;
extern SHashObj *tscTableMetaInfo;
extern int tscObjRef;
extern void *tscTmr;
......
......@@ -18,7 +18,6 @@
#include "tnote.h"
#include "trpc.h"
#include "tcache.h"
#include "tscLog.h"
#include "tscSubquery.h"
#include "tscLocalMerge.h"
......@@ -57,7 +56,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return;
}
......@@ -71,7 +70,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return;
}
......@@ -166,7 +165,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
pRes->code = numOfRows;
}
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return;
}
......@@ -217,7 +216,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) {
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
pSql->param = param;
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return;
}
......@@ -280,7 +279,7 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
pSql->param = param;
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return;
}
......@@ -382,7 +381,7 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
}
void tscQueueAsyncRes(SSqlObj *pSql) {
void tscAsyncResultOnError(SSqlObj *pSql) {
if (pSql == NULL || pSql->signature != pSql) {
tscDebug("%p SqlObj is freed, not add into queue async res", pSql);
return;
......@@ -423,7 +422,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
// check if it is a sub-query of super table query first, if true, enter another routine
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY|TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
tscDebug("%p update table meta in local cache, continue to process sql and send the corresponding query", pSql);
tscDebug("%p update local table meta, continue to process sql and send the corresponding query", pSql);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
......@@ -440,7 +439,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
return;
} else { // continue to process normal async query
if (pCmd->parseFinished) {
tscDebug("%p update table meta in local cache, continue to process sql and send corresponding query", pSql);
tscDebug("%p update local table meta, continue to process sql and send corresponding query", pSql);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
......@@ -455,7 +454,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (pCmd->command == TSDB_SQL_SELECT) {
tscDebug("%p redo parse sql string and proceed", pSql);
pCmd->parseFinished = false;
tscResetSqlCmdObj(pCmd, false);
tscResetSqlCmdObj(pCmd);
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
......@@ -532,6 +531,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
_error:
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
}
}
......@@ -17,7 +17,6 @@
#include "taosmsg.h"
#include "taosdef.h"
#include "tcache.h"
#include "tname.h"
#include "tscLog.h"
#include "tscUtil.h"
......@@ -273,7 +272,7 @@ void tscSCreateCallBack(void *param, TAOS_RES *tres, int code) {
if (pRes->code != TSDB_CODE_SUCCESS) {
taos_free_result(pSql);
free(builder);
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -291,7 +290,7 @@ void tscSCreateCallBack(void *param, TAOS_RES *tres, int code) {
if (pRes->code == TSDB_CODE_SUCCESS) {
(*pParentSql->fp)(pParentSql->param, pParentSql, code);
} else {
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
}
}
}
......@@ -571,7 +570,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch
char fullName[TSDB_TABLE_FNAME_LEN * 2] = {0};
extractDBName(pTableMetaInfo->name, fullName);
extractTableName(pMeta->sTableId, param->sTableName);
extractTableName(pMeta->sTableName, param->sTableName);
snprintf(fullName + strlen(fullName), TSDB_TABLE_FNAME_LEN - strlen(fullName), ".%s", param->sTableName);
extractTableName(pTableMetaInfo->name, param->buf);
......@@ -901,7 +900,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
} else if (pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE) {
pRes->code = tscProcessShowCreateDatabase(pSql);
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
taosCacheEmpty(tscMetaCache);
taosHashEmpty(tscTableMetaInfo);
pRes->code = TSDB_CODE_SUCCESS;
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
pRes->code = tscProcessServerVer(pSql);
......@@ -925,7 +924,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
(*pSql->fp)(pSql->param, pSql, code);
} else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS){
} else {
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
}
return code;
}
......@@ -89,7 +89,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
pCtx->startOffset = 0;
pCtx->size = 1;
pCtx->hasNull = true;
pCtx->currentStage = SECONDARY_STAGE_MERGE;
pCtx->currentStage = MERGE_STAGE;
// for top/bottom function, the output of timestamp is the first column
int32_t functionId = pExpr->functionId;
......@@ -1067,7 +1067,7 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer,
pCtx->param[0].i64Key = pExpr->param[0].i64Key;
}
pCtx->currentStage = SECONDARY_STAGE_MERGE;
pCtx->currentStage = MERGE_STAGE;
if (needInit) {
aAggs[pCtx->functionId].init(pCtx);
......@@ -1080,7 +1080,7 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer,
continue;
}
aAggs[functionId].distSecondaryMergeFunc(&pLocalReducer->pCtx[j]);
aAggs[functionId].mergeFunc(&pLocalReducer->pCtx[j]);
}
}
......@@ -1647,7 +1647,7 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_
// calculate the result from several other columns
if (pSup->pArithExprInfo != NULL) {
arithSup.pArithExpr = pSup->pArithExprInfo;
tExprTreeCalcTraverse(arithSup.pArithExpr->pExpr, (int32_t) pOutput->num, pbuf + pOutput->num*offset, &arithSup, TSDB_ORDER_ASC, getArithmeticInputSrc);
arithmeticTreeTraverse(arithSup.pArithExpr->pExpr, (int32_t) pOutput->num, pbuf + pOutput->num*offset, &arithSup, TSDB_ORDER_ASC, getArithmeticInputSrc);
} else {
SSqlExpr* pExpr = pSup->pSqlExpr;
memcpy(pbuf + pOutput->num * offset, pExpr->offset * pOutput->num + pOutput->data, (size_t)(pExpr->resBytes * pOutput->num));
......
......@@ -1339,7 +1339,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
if (sqlstr == NULL || pSql->parseRetry >= 1 || ret != TSDB_CODE_TSC_INVALID_SQL) {
free(sqlstr);
} else {
tscResetSqlCmdObj(pCmd, true);
tscResetSqlCmdObj(pCmd);
free(pSql->sqlstr);
pSql->sqlstr = sqlstr;
pSql->parseRetry++;
......@@ -1351,7 +1351,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr);
ret = tscToSQLCmd(pSql, &SQLInfo);
if (ret == TSDB_CODE_TSC_INVALID_SQL && pSql->parseRetry == 0 && SQLInfo.type == TSDB_SQL_NULL) {
tscResetSqlCmdObj(pCmd, true);
tscResetSqlCmdObj(pCmd);
pSql->parseRetry++;
ret = tscToSQLCmd(pSql, &SQLInfo);
}
......@@ -1429,7 +1429,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
fclose(fp);
pParentSql->res.code = code;
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
} while (0);
}
......@@ -1451,7 +1451,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
int32_t count = 0;
int32_t maxRows = 0;
tfree(pCmd->pTableMetaList);
tfree(pCmd->pTableNameList);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
if (pCmd->pTableBlockHashList == NULL) {
......@@ -1500,7 +1500,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
code = doPackSendDataBlock(pSql, count, pTableDataBlock);
if (code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code;
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -1535,7 +1535,7 @@ void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql) {
tscError("%p failed to open file %s to load data from file, code:%s", pSql, pCmd->payload, tstrerror(pSql->res.code));
tfree(pSupporter);
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return;
}
......
......@@ -910,7 +910,7 @@ int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableNa
* that are corresponding to the old name for the new table name.
*/
if (strlen(oldName) > 0 && strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) {
tscClearTableMetaInfo(pTableMetaInfo, false);
tscClearTableMetaInfo(pTableMetaInfo);
}
return TSDB_CODE_SUCCESS;
......@@ -4019,6 +4019,7 @@ static int32_t setTableCondForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo,
if (pExpr->nSQLOptr == TK_LIKE) {
char* str = taosStringBuilderGetResult(sb, NULL);
pQueryInfo->tagCond.tbnameCond.cond = strdup(str);
pQueryInfo->tagCond.tbnameCond.len = (int32_t) strlen(str);
return TSDB_CODE_SUCCESS;
}
......@@ -4068,6 +4069,7 @@ static int32_t setTableCondForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo,
char* str = taosStringBuilderGetResult(&sb1, NULL);
pQueryInfo->tagCond.tbnameCond.cond = strdup(str);
pQueryInfo->tagCond.tbnameCond.len = (int32_t) strlen(str);
taosStringBuilderDestroy(&sb1);
tfree(segments);
......@@ -6381,6 +6383,41 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return TSDB_CODE_SUCCESS;
}
static int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
const char* msg3 = "start(end) time of query range required or time range too large";
if (pQueryInfo->interval.interval == 0) {
return TSDB_CODE_SUCCESS;
}
bool initialWindows = TSWINDOW_IS_EQUAL(pQueryInfo->window, TSWINDOW_INITIALIZER);
if (initialWindows) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
int64_t timeRange = ABS(pQueryInfo->window.skey - pQueryInfo->window.ekey);
int64_t intervalRange = 0;
if (pQueryInfo->interval.intervalUnit == 'n' || pQueryInfo->interval.intervalUnit == 'y') {
int64_t f = 1;
if (pQueryInfo->interval.intervalUnit == 'n') {
f = 30L * MILLISECOND_PER_DAY;
} else if (pQueryInfo->interval.intervalUnit == 'y') {
f = 365L * MILLISECOND_PER_DAY;
}
intervalRange = pQueryInfo->interval.interval * f;
} else {
intervalRange = pQueryInfo->interval.interval;
}
// number of result is not greater than 10,000,000
if ((timeRange == 0) || (timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
return TSDB_CODE_SUCCESS;
}
int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
assert(pQuerySql != NULL && (pQuerySql->from == NULL || taosArrayGetSize(pQuerySql->from) > 0));
......@@ -6576,31 +6613,21 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
tscFieldInfoUpdateOffset(pQueryInfo);
/*
* fill options are set at the end position, when all columns are set properly
* the columns may be increased due to group by operation
*/
if (pQuerySql->fillType != NULL) {
if (pQueryInfo->interval.interval == 0 && (!tscIsPointInterpQuery(pQueryInfo))) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
if (pQueryInfo->interval.interval > 0) {
bool initialWindows = TSWINDOW_IS_EQUAL(pQueryInfo->window, TSWINDOW_INITIALIZER);
if (initialWindows) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
int64_t timeRange = ABS(pQueryInfo->window.skey - pQueryInfo->window.ekey);
// number of result is not greater than 10,000,000
if ((timeRange == 0) || (timeRange / pQueryInfo->interval.interval) > MAX_INTERVAL_TIME_WINDOW) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
/*
* fill options are set at the end position, when all columns are set properly
* the columns may be increased due to group by operation
*/
if ((code = checkQueryRangeForFill(pCmd, pQueryInfo)) != TSDB_CODE_SUCCESS) {
return code;
}
int32_t ret = parseFillClause(pCmd, pQueryInfo, pQuerySql);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
if ((code = parseFillClause(pCmd, pQueryInfo, pQuerySql)) != TSDB_CODE_SUCCESS) {
return code;
}
}
......
......@@ -130,19 +130,8 @@ SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId) {
return NULL;
}
static void tscInitCorVgroupInfo(SCorVgroupInfo *corVgroupInfo, SVgroupMsg *pVgroupMsg) {
corVgroupInfo->version = 0;
corVgroupInfo->inUse = 0;
corVgroupInfo->numOfEps = pVgroupMsg->numOfEps;
for (int32_t i = 0; i < pVgroupMsg->numOfEps; i++) {
corVgroupInfo->epAddr[i].fqdn = strndup(pVgroupMsg->epAddr[i].fqdn, tListLen(pVgroupMsg->epAddr[0].fqdn));
corVgroupInfo->epAddr[i].port = pVgroupMsg->epAddr[i].port;
}
}
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) {
assert(pTableMetaMsg != NULL);
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg) {
assert(pTableMetaMsg != NULL && pTableMetaMsg->numOfColumns >= 2 && pTableMetaMsg->numOfTags >= 0);
int32_t schemaSize = (pTableMetaMsg->numOfColumns + pTableMetaMsg->numOfTags) * sizeof(SSchema);
STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + schemaSize);
......@@ -159,11 +148,9 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pTableMeta->id.tid = pTableMetaMsg->tid;
pTableMeta->id.uid = pTableMetaMsg->uid;
tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMetaMsg->vgroup);
pTableMeta->sversion = pTableMetaMsg->sversion;
pTableMeta->tversion = pTableMetaMsg->tversion;
tstrncpy(pTableMeta->sTableId, pTableMetaMsg->sTableId, TSDB_TABLE_FNAME_LEN);
tstrncpy(pTableMeta->sTableName, pTableMetaMsg->sTableName, TSDB_TABLE_FNAME_LEN);
memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize);
......@@ -172,13 +159,44 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
}
if (size != NULL) {
*size = sizeof(STableMeta) + schemaSize;
}
return pTableMeta;
}
bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src) {
assert(pExisted != NULL && src != NULL);
if (pExisted->numOfEps != src->numOfEps) {
return false;
}
for(int32_t i = 0; i < pExisted->numOfEps; ++i) {
if (pExisted->ep[i].port != src->epAddr[i].port) {
return false;
}
if (strncmp(pExisted->ep[i].fqdn, src->epAddr[i].fqdn, tListLen(pExisted->ep[i].fqdn)) != 0) {
return false;
}
}
return true;
}
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) {
assert(pVgroupMsg != NULL);
SNewVgroupInfo info = {0};
info.numOfEps = pVgroupMsg->numOfEps;
info.vgId = pVgroupMsg->vgId;
info.inUse = 0;
for(int32_t i = 0; i < pVgroupMsg->numOfEps; ++i) {
tstrncpy(info.ep[i].fqdn, pVgroupMsg->epAddr[i].fqdn, TSDB_FQDN_LEN);
info.ep[i].port = pVgroupMsg->epAddr[i].port;
}
return info;
}
// todo refactor
UNUSED_FUNC static FORCE_INLINE char* skipSegments(char* input, char delim, int32_t num) {
for (int32_t i = 0; i < num; ++i) {
......
此差异已折叠。
......@@ -709,7 +709,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
pSubObj->rpcRid = -1;
}
tscQueueAsyncRes(pSubObj);
tscAsyncResultOnError(pSubObj);
taosReleaseRef(tscObjRef, pSubObj->self);
}
......@@ -745,7 +745,7 @@ void taos_stop_query(TAOS_RES *res) {
pSql->rpcRid = -1;
}
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
}
}
......@@ -909,7 +909,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t tblListLen) {
// must before clean the sqlcmd object
tscResetSqlCmdObj(&pSql->cmd, false);
tscResetSqlCmdObj(&pSql->cmd);
SSqlCmd *pCmd = &pSql->cmd;
......
......@@ -167,7 +167,9 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
retryDelay);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), true);
char* name = pTableMetaInfo->name;
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
......@@ -269,9 +271,8 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
tscDebug("%p stream:%p, query on:%s, fetch result completed, fetched rows:%" PRId64, pSql, pStream, pTableMetaInfo->name,
pStream->numOfRes);
// release the metric/meter meta information reference, so data in cache can be updated
tfree(pTableMetaInfo->pTableMeta);
taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), false);
tscFreeSqlResult(pSql);
tfree(pSql->pSubs);
pSql->subState.numOfSub = 0;
......
......@@ -779,7 +779,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = numOfRows;
quitAllSubquery(pParentSql, pSupporter);
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -796,7 +796,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
quitAllSubquery(pParentSql, pSupporter);
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -845,7 +845,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
if (code != TSDB_CODE_SUCCESS) {
freeJoinSubqueryObj(pParentSql);
pParentSql->res.code = code;
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
taosArrayDestroy(s1);
taosArrayDestroy(s2);
......@@ -916,7 +916,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = numOfRows;
quitAllSubquery(pParentSql, pSupporter);
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -930,7 +930,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows);
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -1028,7 +1028,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
pParentSql->res.code = numOfRows;
tscError("%p retrieve failed, index:%d, code:%s", pSql, pSupporter->subqueryIndex, tstrerror(numOfRows));
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -1155,7 +1155,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
if (pSql->res.code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, 0);
} else {
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
}
return;
......@@ -1233,7 +1233,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
if (pSql->res.code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, 0);
} else {
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
}
return;
......@@ -1344,7 +1344,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code);
quitAllSubquery(pParentSql, pSupporter);
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -1357,7 +1357,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
pParentSql->res.code = code;
quitAllSubquery(pParentSql, pSupporter);
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -1403,7 +1403,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else {
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
}
}
}
......@@ -1612,7 +1612,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
_error:
pRes->code = code;
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
}
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
......@@ -1666,7 +1666,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, &pFinalModel, nBufferSize);
if (ret != 0) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
tfree(pMemoryBuf);
return ret;
}
......@@ -1680,7 +1680,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel,pState->numOfSub);
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return ret;
}
......@@ -1890,7 +1890,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
} else { // regular super table query
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
}
}
}
......@@ -1968,7 +1968,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else {
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
}
}
......@@ -2220,7 +2220,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
(*pParentObj->fp)(pParentObj->param, pParentObj, v);
} else {
if (!needRetryInsert(pParentObj, numOfSub)) {
tscQueueAsyncRes(pParentObj);
tscAsyncResultOnError(pParentObj);
return;
}
......@@ -2231,7 +2231,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
numOfFailed += 1;
// clean up tableMeta in cache
tscFreeQueryInfo(&pSql->cmd, true);
tscFreeQueryInfo(&pSql->cmd);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0);
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);
......@@ -2243,15 +2243,16 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
tscError("%p Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj,
pParentObj->res.numOfRows, numOfFailed, numOfSub);
tscDebug("%p cleanup %d tableMeta in cache", pParentObj, pParentObj->cmd.numOfTables);
tscDebug("%p cleanup %d tableMeta in hashTable", pParentObj, pParentObj->cmd.numOfTables);
for(int32_t i = 0; i < pParentObj->cmd.numOfTables; ++i) {
taosCacheRelease(tscMetaCache, (void**)&(pParentObj->cmd.pTableMetaList[i]), true);
char* name = pParentObj->cmd.pTableNameList[i];
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
}
pParentObj->cmd.parseFinished = false;
pParentObj->subState.numOfRemain = numOfFailed;
tscResetSqlCmdObj(&pParentObj->cmd, false);
tscResetSqlCmdObj(&pParentObj->cmd);
// in case of insert, redo parsing the sql string and build new submit data block for two reasons:
// 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.
......@@ -2264,7 +2265,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
if (code != TSDB_CODE_SUCCESS) {
pParentObj->res.code = code;
tscQueueAsyncRes(pParentObj);
tscAsyncResultOnError(pParentObj);
return;
}
......@@ -2288,7 +2289,7 @@ int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) {
int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);
if ((pRes->code = code)!= TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return code; // here the pSql may have been released already.
}
......@@ -2481,7 +2482,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
SSqlRes* pRes = &pSql->res;
if (pRes->code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return;
}
......@@ -2496,7 +2497,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
if (pRes->tsrow == NULL || pRes->buffer == NULL || pRes->length == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return;
}
......@@ -2508,7 +2509,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
if (pRes->code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
} else {
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
}
}
......
......@@ -31,15 +31,20 @@
#include "tlocale.h"
// global, not configurable
SCacheObj *tscMetaCache; // table meta cache
SHashObj *tscHashMap; // hash map to keep the global vgroup info
int tscObjRef = -1;
void *tscTmr;
void *tscQhandle;
void *tscCheckDiskUsageTmr;
int tscRefId = -1;
int tscNumOfObj = 0; // number of sqlObj in current process.
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
int32_t sentinel = TSC_VAR_NOT_RELEASE;
SHashObj *tscVgroupMap; // hash map to keep the global vgroup info
SHashObj *tscTableMetaInfo; // table meta info
int32_t tscObjRef = -1;
void *tscTmr;
void *tscQhandle;
int32_t tscRefId = -1;
int32_t tscNumOfObj = 0; // number of sqlObj in current process.
static void *tscCheckDiskUsageTmr;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) {
......@@ -129,11 +134,11 @@ void taos_init_imp(void) {
taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr);
}
int64_t refreshTime = 10; // 10 seconds by default
if (tscMetaCache == NULL) {
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta");
if (tscTableMetaInfo == NULL) {
tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj);
tscHashMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
tscTableMetaInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
tscDebug("TableMeta:%p", tscTableMetaInfo);
}
tscRefId = taosOpenRef(200, tscCloseTscObj);
......@@ -151,30 +156,38 @@ void taos_init() { pthread_once(&tscinit, taos_init_imp); }
void taos_cleanup(void) {
tscDebug("start to cleanup client environment");
void* m = tscMetaCache;
if (m != NULL && atomic_val_compare_exchange_ptr(&tscMetaCache, m, 0) == m) {
taosCacheCleanup(m);
if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) {
return;
}
int refId = atomic_exchange_32(&tscObjRef, -1);
if (refId != -1) {
taosCloseRef(refId);
}
taosHashCleanup(tscTableMetaInfo);
tscTableMetaInfo = NULL;
m = tscQhandle;
if (m != NULL && atomic_val_compare_exchange_ptr(&tscQhandle, m, 0) == m) {
taosCleanUpScheduler(m);
}
taosHashCleanup(tscVgroupMap);
tscVgroupMap = NULL;
int32_t id = tscObjRef;
tscObjRef = -1;
taosCloseRef(id);
void* p = tscQhandle;
tscQhandle = NULL;
taosCleanUpScheduler(p);
id = tscRefId;
tscRefId = -1;
taosCloseRef(id);
taosCloseRef(tscRefId);
taosCleanupKeywordsTable();
taosCloseLog();
if (tscEmbedded == 0) rpcCleanup();
m = tscTmr;
if (m != NULL && atomic_val_compare_exchange_ptr(&tscTmr, m, 0) == m) {
taosTmrCleanUp(m);
if (tscEmbedded == 0) {
rpcCleanup();
}
p = tscTmr;
tscTmr = NULL;
taosTmrCleanUp(p);
}
static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
......
......@@ -18,7 +18,6 @@
#include "os.h"
#include "qAst.h"
#include "taosmsg.h"
#include "tcache.h"
#include "tkey.h"
#include "tmd5.h"
#include "tscLocalMerge.h"
......@@ -31,7 +30,7 @@
#include "ttokendef.h"
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo);
SCond* tsGetSTableQueryCond(STagCond* pTagCond, uint64_t uid) {
if (pTagCond->pCond == NULL) {
......@@ -379,17 +378,16 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) {
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
}
void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache) {
void tscFreeQueryInfo(SSqlCmd* pCmd) {
if (pCmd == NULL || pCmd->numOfClause == 0) {
return;
}
for (int32_t i = 0; i < pCmd->numOfClause; ++i) {
char* addr = (char*)pCmd - offsetof(SSqlObj, cmd);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
freeQueryInfoImpl(pQueryInfo);
clearAllTableMetaInfo(pQueryInfo, (const char*)addr, removeFromCache);
clearAllTableMetaInfo(pQueryInfo);
tfree(pQueryInfo);
}
......@@ -397,7 +395,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache) {
tfree(pCmd->pQueryInfo);
}
void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) {
void tscResetSqlCmdObj(SSqlCmd* pCmd) {
pCmd->command = 0;
pCmd->numOfCols = 0;
pCmd->count = 0;
......@@ -407,17 +405,17 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) {
pCmd->autoCreated = 0;
for(int32_t i = 0; i < pCmd->numOfTables; ++i) {
if (pCmd->pTableMetaList && pCmd->pTableMetaList[i]) {
taosCacheRelease(tscMetaCache, (void**)&(pCmd->pTableMetaList[i]), false);
if (pCmd->pTableNameList && pCmd->pTableNameList[i]) {
tfree(pCmd->pTableNameList[i]);
}
}
pCmd->numOfTables = 0;
tfree(pCmd->pTableMetaList);
tfree(pCmd->pTableNameList);
pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
tscFreeQueryInfo(pCmd, removeFromCache);
tscFreeQueryInfo(pCmd);
}
void tscFreeSqlResult(SSqlObj* pSql) {
......@@ -468,17 +466,6 @@ void tscFreeRegisteredSqlObj(void *pSql) {
tscDebug("%p free SqlObj, total in tscObj:%d, total:%d", pSql, num, total);
}
void tscFreeTableMetaHelper(void *pTableMeta) {
STableMeta* p = (STableMeta*) pTableMeta;
int32_t numOfEps1 = p->corVgroupInfo.numOfEps;
assert(numOfEps1 >= 0 && numOfEps1 <= TSDB_MAX_REPLICA);
for(int32_t i = 0; i < numOfEps1; ++i) {
tfree(p->corVgroupInfo.epAddr[i].fqdn);
}
}
void tscFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) {
return;
......@@ -506,7 +493,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
pSql->self = 0;
tscFreeSqlResult(pSql);
tscResetSqlCmdObj(pCmd, false);
tscResetSqlCmdObj(pCmd);
tfree(pCmd->tagData.data);
pCmd->tagData.dataLen = 0;
......@@ -529,7 +516,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
// free the refcount for metermeta
if (pDataBlock->pTableMeta != NULL) {
taosCacheRelease(tscMetaCache, (void**)&(pDataBlock->pTableMeta), false);
tfree(pDataBlock->pTableMeta);
}
tfree(pDataBlock);
......@@ -600,15 +587,15 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
// set the correct table meta object, the table meta has been locked in pDataBlocks, so it must be in the cache
if (pTableMetaInfo->pTableMeta != pDataBlock->pTableMeta) {
tstrncpy(pTableMetaInfo->name, pDataBlock->tableId, sizeof(pTableMetaInfo->name));
tstrncpy(pTableMetaInfo->name, pDataBlock->tableName, sizeof(pTableMetaInfo->name));
if (pTableMetaInfo->pTableMeta != NULL) {
taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), false);
tfree(pTableMetaInfo->pTableMeta);
}
pTableMetaInfo->pTableMeta = taosCacheTransfer(tscMetaCache, (void**)&pDataBlock->pTableMeta);
pTableMetaInfo->pTableMeta = tscTableMetaClone(pDataBlock->pTableMeta);
} else {
assert(strncmp(pTableMetaInfo->name, pDataBlock->tableId, tListLen(pDataBlock->tableId)) == 0);
assert(strncmp(pTableMetaInfo->name, pDataBlock->tableName, tListLen(pDataBlock->tableName)) == 0);
}
/*
......@@ -671,14 +658,10 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
dataBuf->size = startOffset;
dataBuf->tsSource = -1;
tstrncpy(dataBuf->tableId, name, sizeof(dataBuf->tableId));
tstrncpy(dataBuf->tableName, name, sizeof(dataBuf->tableName));
/*
* The table meta may be released since the table meta cache are completed clean by other thread
* due to operation such as drop database. So here we add the reference count directly instead of invoke
* taosGetDataFromCache, which may return NULL value.
*/
dataBuf->pTableMeta = taosCacheAcquireByData(tscMetaCache, pTableMeta);
//Here we keep the tableMeta to avoid it to be remove by other threads.
dataBuf->pTableMeta = tscTableMetaClone(pTableMeta);
assert(initialSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
*dataBlocks = dataBuf;
......@@ -784,15 +767,15 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
return result;
}
static void extractTableMeta(SSqlCmd* pCmd) {
static void extractTableNameList(SSqlCmd* pCmd) {
pCmd->numOfTables = (int32_t) taosHashGetSize(pCmd->pTableBlockHashList);
pCmd->pTableMetaList = calloc(pCmd->numOfTables, POINTER_BYTES);
pCmd->pTableNameList = calloc(pCmd->numOfTables, POINTER_BYTES);
STableDataBlocks **p1 = taosHashIterate(pCmd->pTableBlockHashList, NULL);
int32_t i = 0;
while(p1) {
STableDataBlocks* pBlocks = *p1;
pCmd->pTableMetaList[i++] = taosCacheTransfer(tscMetaCache, (void**) &pBlocks->pTableMeta);
pCmd->pTableNameList[i++] = strndup(pBlocks->tableName, TSDB_TABLE_FNAME_LEN);
p1 = taosHashIterate(pCmd->pTableBlockHashList, p1);
}
......@@ -815,7 +798,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
STableDataBlocks* dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
INSERT_HEAD_SIZE, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
INSERT_HEAD_SIZE, 0, pOneTableBlock->tableName, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
if (ret != TSDB_CODE_SUCCESS) {
tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret);
taosHashCleanup(pVnodeDataBlockHashList);
......@@ -849,7 +832,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
tscSortRemoveDataBlockDupRows(pOneTableBlock);
char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
tscDebug("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId,
tscDebug("%p name:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableName,
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
......@@ -879,7 +862,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
pOneTableBlock = *p;
}
extractTableMeta(pCmd);
extractTableNameList(pCmd);
// free the table data blocks;
pCmd->pDataBlocks = pVnodeDataBlockList;
......@@ -900,6 +883,7 @@ void tscCloseTscObj(void *param) {
rpcClose(pObj->pDnodeConn);
pObj->pDnodeConn = NULL;
}
tfree(pObj->tscCorMgmtEpSet);
pthread_mutex_destroy(&pObj->mutex);
......@@ -1528,6 +1512,7 @@ int32_t tscTagCondCopy(STagCond* dest, const STagCond* src) {
}
dest->tbnameCond.uid = src->tbnameCond.uid;
dest->tbnameCond.len = src->tbnameCond.len;
memcpy(&dest->joinInfo, &src->joinInfo, sizeof(SJoinInfo));
dest->relType = src->relType;
......@@ -1823,14 +1808,12 @@ SArray* tscVgroupTableInfoClone(SArray* pVgroupTables) {
return pa;
}
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) {
tscDebug("%p unref %d tables in the tableMeta cache", address, pQueryInfo->numOfTables);
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo) {
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
tscClearTableMetaInfo(pTableMetaInfo, removeFromCache);
tscClearTableMetaInfo(pTableMetaInfo);
free(pTableMetaInfo);
}
......@@ -1884,14 +1867,12 @@ STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo* pQueryInfo) {
return tscAddTableMetaInfo(pQueryInfo, NULL, NULL, NULL, NULL, NULL);
}
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) {
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo) {
if (pTableMetaInfo == NULL) {
return;
}
if (pTableMetaInfo->pTableMeta != NULL) {
taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache);
}
tfree(pTableMetaInfo->pTableMeta);
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscColumnListDestroy(pTableMetaInfo->tagColList);
......@@ -2015,7 +1996,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
pNew->sqlstr = NULL;
pNew->sqlstr = strdup(pSql->sqlstr);
SSqlCmd* pnCmd = &pNew->cmd;
memcpy(pnCmd, pCmd, sizeof(SSqlCmd));
......@@ -2031,7 +2012,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pnCmd->numOfTables = 0;
pnCmd->parseFinished = 1;
pnCmd->pTableMetaList = NULL;
pnCmd->pTableNameList = NULL;
pnCmd->pTableBlockHashList = NULL;
if (tscAddSubqueryInfo(pnCmd) != TSDB_CODE_SUCCESS) {
......@@ -2113,8 +2094,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
char* name = pTableMetaInfo->name;
STableMetaInfo* pFinalInfo = NULL;
if (pPrevSql == NULL) { // get by name may failed due to the cache cleanup
STableMeta* pTableMeta = taosCacheAcquireByData(tscMetaCache, pTableMetaInfo->pTableMeta);
if (pPrevSql == NULL) {
STableMeta* pTableMeta = tscTableMetaClone(pTableMetaInfo->pTableMeta);
assert(pTableMeta != NULL);
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList,
......@@ -2122,15 +2103,15 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
} else { // transfer the ownership of pTableMeta to the newly create sql object.
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
STableMeta* pPrevTableMeta = taosCacheTransfer(tscMetaCache, (void**)&pPrevInfo->pTableMeta);
STableMeta* pPrevTableMeta = tscTableMetaClone(pPrevInfo->pTableMeta);
SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList;
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList,
pTableMetaInfo->pVgroupTables);
}
// this case cannot be happened
if (pFinalInfo->pTableMeta == NULL) {
tscError("%p new subquery failed since no tableMeta in cache, name:%s", pSql, name);
tscError("%p new subquery failed since no tableMeta, name:%s", pSql, name);
if (pPrevSql != NULL) { // pass the previous error to client
assert(pPrevSql->res.code != TSDB_CODE_SUCCESS);
......@@ -2557,6 +2538,7 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) {
for(int32_t j = 0; j < pVgroupInfo->numOfEps; ++j) {
tfree(pVgroupInfo->epAddr[j].fqdn);
}
for(int32_t j = pVgroupInfo->numOfEps; j < TSDB_MAX_REPLICA; j++) {
assert( pVgroupInfo->epAddr[j].fqdn == NULL );
}
......@@ -2610,3 +2592,87 @@ int32_t copyTagData(STagData* dst, const STagData* src) {
return 0;
}
STableMeta* createSuperTableMeta(STableMetaMsg* pChild) {
assert(pChild != NULL);
int32_t total = pChild->numOfColumns + pChild->numOfTags;
STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + sizeof(SSchema) * total);
pTableMeta->tableType = TSDB_SUPER_TABLE;
pTableMeta->tableInfo.numOfTags = pChild->numOfTags;
pTableMeta->tableInfo.numOfColumns = pChild->numOfColumns;
pTableMeta->tableInfo.precision = pChild->precision;
pTableMeta->id.tid = 0;
pTableMeta->id.uid = pChild->suid;
pTableMeta->tversion = pChild->tversion;
pTableMeta->sversion = pChild->sversion;
memcpy(pTableMeta->schema, pChild->schema, sizeof(SSchema) * total);
int32_t num = pTableMeta->tableInfo.numOfColumns;
for(int32_t i = 0; i < num; ++i) {
pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
}
return pTableMeta;
}
uint32_t tscGetTableMetaSize(STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
int32_t totalCols = pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags;
return sizeof(STableMeta) + totalCols * sizeof(SSchema);
}
CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
CChildTableMeta* cMeta = calloc(1, sizeof(CChildTableMeta));
cMeta->tableType = TSDB_CHILD_TABLE;
cMeta->vgId = pTableMeta->vgId;
cMeta->id = pTableMeta->id;
tstrncpy(cMeta->sTableName, pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN);
return cMeta;
}
int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name) {
assert(pChild != NULL);
uint32_t size = tscGetTableMetaMaxSize();
STableMeta* p = calloc(1, size);
taosHashGetClone(tscTableMetaInfo, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, p, -1);
if (p->id.uid > 0) { // tableMeta exists, build child table meta and return
pChild->sversion = p->sversion;
pChild->tversion = p->tversion;
memcpy(&pChild->tableInfo, &p->tableInfo, sizeof(STableInfo));
int32_t total = pChild->tableInfo.numOfColumns + pChild->tableInfo.numOfTags;
memcpy(pChild->schema, p->schema, sizeof(SSchema) *total);
tfree(p);
return TSDB_CODE_SUCCESS;
} else { // super table has been removed, current tableMeta is also expired. remove it here
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
tfree(p);
return -1;
}
}
uint32_t tscGetTableMetaMaxSize() {
return sizeof(STableMeta) + TSDB_MAX_COLUMNS * sizeof(SSchema);
}
STableMeta* tscTableMetaClone(STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
uint32_t size = tscGetTableMetaSize(pTableMeta);
STableMeta* p = calloc(1, size);
memcpy(p, pTableMeta, size);
return p;
}
......@@ -476,6 +476,7 @@ typedef struct {
int16_t numOfCols; // the number of columns will be load from vnode
SInterval interval;
uint16_t tagCondLen; // tag length in current query
uint32_t tbnameCondLen; // table name filter condition string length
int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx
......@@ -494,6 +495,7 @@ typedef struct {
int32_t tsNumOfBlocks; // ts comp block numbers
int32_t tsOrder; // ts comp block order
int32_t numOfTags; // number of tags columns involved
int32_t sqlstrLen; // sql query string
SColumnInfo colList[];
} SQueryTableMsg;
......@@ -725,7 +727,6 @@ typedef struct {
typedef struct STableMetaMsg {
int32_t contLen;
char tableId[TSDB_TABLE_FNAME_LEN]; // table id
char sTableId[TSDB_TABLE_FNAME_LEN];
uint8_t numOfTags;
uint8_t precision;
uint8_t tableType;
......@@ -735,6 +736,9 @@ typedef struct STableMetaMsg {
int32_t tid;
uint64_t uid;
SVgroupMsg vgroup;
char sTableName[TSDB_TABLE_FNAME_LEN];
uint64_t suid;
SSchema schema[];
} STableMetaMsg;
......
......@@ -2316,11 +2316,12 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
pMeta->precision = pDb->cfg.precision;
pMeta->tableType = pTable->info.type;
tstrncpy(pMeta->tableId, pTable->info.tableId, TSDB_TABLE_FNAME_LEN);
if (pTable->superTable != NULL) {
tstrncpy(pMeta->sTableId, pTable->superTable->info.tableId, TSDB_TABLE_FNAME_LEN);
}
if (pTable->info.type == TSDB_CHILD_TABLE && pTable->superTable != NULL) {
if (pTable->info.type == TSDB_CHILD_TABLE) {
assert(pTable->superTable != NULL);
tstrncpy(pMeta->sTableName, pTable->superTable->info.tableId, TSDB_TABLE_FNAME_LEN);
pMeta->suid = pTable->superTable->uid;
pMeta->sversion = htons(pTable->superTable->sversion);
pMeta->tversion = htons(pTable->superTable->tversion);
pMeta->numOfTags = (int8_t)pTable->superTable->numOfTags;
......
......@@ -23,7 +23,7 @@ extern "C" {
typedef void (*_bi_consumer_fn_t)(void *left, void *right, int32_t numOfLeft, int32_t numOfRight, void *output,
int32_t order);
_bi_consumer_fn_t tGetBiConsumerFn(int32_t leftType, int32_t rightType, int32_t optr);
_bi_consumer_fn_t getArithmeticOperatorFn(int32_t leftType, int32_t rightType, int32_t optr);
#ifdef __cplusplus
}
......
......@@ -74,9 +74,7 @@ typedef struct tExprNode {
};
} tExprNode;
void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param);
void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
void arithmeticTreeTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*cb)(void *, const char*, int32_t));
tExprNode* exprTreeFromBinary(const void* data, size_t size);
......@@ -87,6 +85,8 @@ void exprTreeToBinary(SBufferWriter* bw, tExprNode* pExprTree);
void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *));
void tExprTreeDestroy(tExprNode **pExprs, void (*fp)(void*));
bool exprTreeApplayFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param);
#ifdef __cplusplus
}
#endif
......
......@@ -190,7 +190,7 @@ typedef struct SQueryRuntimeEnv {
void* pSecQueryHandle; // another thread for
bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag
bool groupbyNormalCol; // denote if this is a groupby normal column query
bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation
bool queryWindowIdentical; // all query time windows are identical for all tables in one group
......@@ -204,6 +204,8 @@ typedef struct SQueryRuntimeEnv {
int32_t* rowCellInfoOffset;// offset value for each row result cell info
char** prevRow;
char** nextRow;
SArithmeticSupport *sasArray;
} SQueryRuntimeEnv;
enum {
......@@ -237,6 +239,7 @@ typedef struct SQInfo {
int32_t dataReady; // denote if query result is ready or not
void* rspContext; // response context
int64_t startExecTs; // start to exec timestamp
char* sql; // query sql string
} SQInfo;
#endif // TDENGINE_QUERYEXECUTOR_H
......@@ -112,11 +112,10 @@ extern "C" {
#define TOP_BOTTOM_QUERY_LIMIT 100
enum {
MASTER_SCAN = 0x0u,
REVERSE_SCAN = 0x1u,
REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan
FIRST_STAGE_MERGE = 0x10u,
SECONDARY_STAGE_MERGE = 0x20u,
MASTER_SCAN = 0x0u,
REVERSE_SCAN = 0x1u,
REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan
MERGE_STAGE = 0x20u,
};
#define QUERY_IS_STABLE_QUERY(type) (((type)&TSDB_QUERY_TYPE_STABLE_QUERY) != 0)
......@@ -191,8 +190,8 @@ typedef struct SQLFunctionCtx {
int64_t nStartQueryTimestamp; // timestamp range of current query when function is executed on a specific data block
int32_t numOfParams;
tVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param */
int64_t * ptsList; // corresponding timestamp array list
void * ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int64_t *ptsList; // corresponding timestamp array list
void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
SQLPreAggVal preAggVals;
tVariant tag;
......@@ -215,18 +214,12 @@ typedef struct SQLAggFuncElem {
void (*xFunction)(SQLFunctionCtx *pCtx); // blocks version function
void (*xFunctionF)(SQLFunctionCtx *pCtx, int32_t position); // single-row function version
// some sql function require scan data twice or more, e.g.,stddev
// some sql function require scan data twice or more, e.g.,stddev, percentile
void (*xNextStep)(SQLFunctionCtx *pCtx);
/*
* finalizer must be called after all xFunction has been executed to
* generated final result. Otherwise, the value in aOutputBuf is a intern result.
*/
// finalizer must be called after all xFunction has been executed to generated final result.
void (*xFinalize)(SQLFunctionCtx *pCtx);
void (*distMergeFunc)(SQLFunctionCtx *pCtx);
void (*distSecondaryMergeFunc)(SQLFunctionCtx *pCtx);
void (*mergeFunc)(SQLFunctionCtx *pCtx);
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId);
} SQLAggFuncElem;
......
......@@ -15,7 +15,7 @@
#include "os.h"
#include "qSyntaxtreefunction.h"
#include "qArithmeticOperator.h"
#include "taosdef.h"
#include "tutil.h"
......@@ -1234,7 +1234,7 @@ _bi_consumer_fn_t rem_function_arraylist[8][10] = {
////////////////////////////////////////////////////////////////////////////////////////////////////////////
_bi_consumer_fn_t tGetBiConsumerFn(int32_t leftType, int32_t rightType, int32_t optr) {
_bi_consumer_fn_t getArithmeticOperatorFn(int32_t leftType, int32_t rightType, int32_t optr) {
switch (optr) {
case TSDB_BINARY_OP_ADD:
return add_function_arraylist[leftType][rightType];
......
......@@ -16,29 +16,18 @@
#include "os.h"
#include "exception.h"
#include "qArithmeticOperator.h"
#include "qAst.h"
#include "qSyntaxtreefunction.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tarray.h"
#include "tbuffer.h"
#include "tcompare.h"
#include "tname.h"
#include "tschemautil.h"
#include "tsdb.h"
#include "tskiplist.h"
#include "tsqlfunction.h"
#include "tstoken.h"
#include "tschemautil.h"
typedef struct {
char* v;
int32_t optr;
} SEndPoint;
typedef struct {
SEndPoint* start;
SEndPoint* end;
} SQueryCond;
static uint8_t UNUSED_FUNC isQueryOnPrimaryKey(const char *primaryColumnName, const tExprNode *pLeft, const tExprNode *pRight) {
if (pLeft->nodeType == TSQL_NODE_COL) {
......@@ -53,323 +42,6 @@ static uint8_t UNUSED_FUNC isQueryOnPrimaryKey(const char *primaryColumnName, co
}
}
void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)) {
if (pNode == NULL) {
return;
}
if (pNode->nodeType == TSQL_NODE_EXPR) {
tExprTreeDestroy(&pNode, fp);
} else if (pNode->nodeType == TSQL_NODE_VALUE) {
tVariantDestroy(pNode->pVal);
} else if (pNode->nodeType == TSQL_NODE_COL) {
free(pNode->pSchema);
}
free(pNode);
}
void tExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
if (*pExpr == NULL) {
return;
}
if ((*pExpr)->nodeType == TSQL_NODE_EXPR) {
tExprTreeDestroy(&(*pExpr)->_node.pLeft, fp);
tExprTreeDestroy(&(*pExpr)->_node.pRight, fp);
if (fp != NULL) {
fp((*pExpr)->_node.info);
}
} else if ((*pExpr)->nodeType == TSQL_NODE_VALUE) {
tVariantDestroy((*pExpr)->pVal);
free((*pExpr)->pVal);
} else if ((*pExpr)->nodeType == TSQL_NODE_COL) {
free((*pExpr)->pSchema);
}
free(*pExpr);
*pExpr = NULL;
}
// todo check for malloc failure
static int32_t setQueryCond(tQueryInfo *queryColInfo, SQueryCond* pCond) {
int32_t optr = queryColInfo->optr;
if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL ||
optr == TSDB_RELATION_EQUAL || optr == TSDB_RELATION_NOT_EQUAL) {
pCond->start = calloc(1, sizeof(SEndPoint));
pCond->start->optr = queryColInfo->optr;
pCond->start->v = queryColInfo->q;
} else if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
pCond->end = calloc(1, sizeof(SEndPoint));
pCond->end->optr = queryColInfo->optr;
pCond->end->v = queryColInfo->q;
} else if (optr == TSDB_RELATION_IN || optr == TSDB_RELATION_LIKE) {
assert(0);
}
return TSDB_CODE_SUCCESS;
}
static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* result) {
SSkipListIterator* iter = NULL;
SQueryCond cond = {0};
if (setQueryCond(pQueryInfo, &cond) != TSDB_CODE_SUCCESS) {
//todo handle error
}
if (cond.start != NULL) {
iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_ASC);
} else {
iter = tSkipListCreateIterFromVal(pSkipList, (char*)(cond.end ? cond.end->v: NULL), pSkipList->type, TSDB_ORDER_DESC);
}
if (cond.start != NULL) {
int32_t optr = cond.start->optr;
if (optr == TSDB_RELATION_EQUAL) { // equals
while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
int32_t ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
if (ret != 0) {
break;
}
STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
} else if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL) { // greater equal
bool comp = true;
int32_t ret = 0;
while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
if (comp) {
ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
assert(ret >= 0);
}
if (ret == 0 && optr == TSDB_RELATION_GREATER) {
continue;
} else {
STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
comp = false;
}
}
} else if (optr == TSDB_RELATION_NOT_EQUAL) { // not equal
bool comp = true;
while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
if (comp) {
continue;
}
STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
tSkipListDestroyIter(iter);
comp = true;
iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_DESC);
while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
if (comp) {
continue;
}
STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
} else {
assert(0);
}
} else {
int32_t optr = cond.end ? cond.end->optr : TSDB_RELATION_INVALID;
if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
bool comp = true;
int32_t ret = 0;
while (tSkipListIterNext(iter)) {
SSkipListNode *pNode = tSkipListIterGet(iter);
if (comp) {
ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.end->v);
assert(ret <= 0);
}
if (ret == 0 && optr == TSDB_RELATION_LESS) {
continue;
} else {
STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
comp = false; // no need to compare anymore
}
}
} else {
assert(pQueryInfo->optr == TSDB_RELATION_ISNULL || pQueryInfo->optr == TSDB_RELATION_NOTNULL);
while (tSkipListIterNext(iter)) {
SSkipListNode *pNode = tSkipListIterGet(iter);
bool isnull = isNull(SL_GET_NODE_KEY(pSkipList, pNode), pQueryInfo->sch.type);
if ((pQueryInfo->optr == TSDB_RELATION_ISNULL && isnull) ||
(pQueryInfo->optr == TSDB_RELATION_NOTNULL && (!isnull))) {
STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
}
}
}
free(cond.start);
free(cond.end);
tSkipListDestroyIter(iter);
}
static bool filterItem(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param) {
tExprNode *pLeft = pExpr->_node.pLeft;
tExprNode *pRight = pExpr->_node.pRight;
//non-leaf nodes, recursively traverse the expression tree in the post-root order
if (pLeft->nodeType == TSQL_NODE_EXPR && pRight->nodeType == TSQL_NODE_EXPR) {
if (pExpr->_node.optr == TSDB_RELATION_OR) { // or
if (filterItem(pLeft, pItem, param)) {
return true;
}
// left child does not satisfy the query condition, try right child
return filterItem(pRight, pItem, param);
} else { // and
if (!filterItem(pLeft, pItem, param)) {
return false;
}
return filterItem(pRight, pItem, param);
}
}
// handle the leaf node
param->setupInfoFn(pExpr, param->pExtInfo);
return param->nodeFilterFn(pItem, pExpr->_node.info);
}
static void tSQLBinaryTraverseOnSkipList(tExprNode *pExpr, SArray *pResult, SSkipList *pSkipList, SExprTraverseSupp *param ) {
SSkipListIterator* iter = tSkipListCreateIter(pSkipList);
while (tSkipListIterNext(iter)) {
SSkipListNode *pNode = tSkipListIterGet(iter);
if (filterItem(pExpr, pNode, param)) {
taosArrayPush(pResult, &(SL_GET_NODE_DATA(pNode)));
}
}
tSkipListDestroyIter(iter);
}
static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* res, __result_filter_fn_t filterFp) {
SSkipListIterator* iter = tSkipListCreateIter(pSkipList);
while (tSkipListIterNext(iter)) {
bool addToResult = false;
SSkipListNode *pNode = tSkipListIterGet(iter);
char * pData = SL_GET_NODE_DATA(pNode);
tstr *name = (tstr*) tsdbGetTableName((void*) pData);
// todo speed up by using hash
if (pQueryInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
if (pQueryInfo->optr == TSDB_RELATION_IN) {
addToResult = pQueryInfo->compare(name, pQueryInfo->q);
} else if (pQueryInfo->optr == TSDB_RELATION_LIKE) {
addToResult = !pQueryInfo->compare(name, pQueryInfo->q);
}
} else {
addToResult = filterFp(pNode, pQueryInfo);
}
if (addToResult) {
STableKeyInfo info = {.pTable = (void*)pData, .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(res, &info);
}
}
tSkipListDestroyIter(iter);
}
// post-root order traverse syntax tree
void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param) {
if (pExpr == NULL) {
return;
}
tExprNode *pLeft = pExpr->_node.pLeft;
tExprNode *pRight = pExpr->_node.pRight;
// column project
if (pLeft->nodeType != TSQL_NODE_EXPR && pRight->nodeType != TSQL_NODE_EXPR) {
assert(pLeft->nodeType == TSQL_NODE_COL && (pRight->nodeType == TSQL_NODE_VALUE || pRight->nodeType == TSQL_NODE_DUMMY));
param->setupInfoFn(pExpr, param->pExtInfo);
tQueryInfo *pQueryInfo = pExpr->_node.info;
if (pQueryInfo->indexed && pQueryInfo->optr != TSDB_RELATION_LIKE) {
tQueryIndexColumn(pSkipList, pQueryInfo, result);
} else {
tQueryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn);
}
return;
}
// The value of hasPK is always 0.
uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK;
assert(weight == 0 && pSkipList != NULL && taosArrayGetSize(result) == 0);
//apply the hierarchical expression to every node in skiplist for find the qualified nodes
tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param);
#if 0
/*
* (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) is handled here
*
* first, we filter results based on the skiplist index, which is the initial filter stage,
* then, we conduct the secondary filter operation based on the result from the initial filter stage.
*/
assert(pExpr->_node.optr == TSDB_RELATION_AND);
tExprNode *pFirst = NULL;
tExprNode *pSecond = NULL;
if (pLeft->_node.hasPK == 1) {
pFirst = pLeft;
pSecond = pRight;
} else {
pFirst = pRight;
pSecond = pLeft;
}
assert(pFirst != pSecond && pFirst != NULL && pSecond != NULL);
// we filter the result based on the skiplist index in the first place
tExprTreeTraverse(pFirst, pSkipList, result, param);
/*
* recursively perform the filter operation based on the initial results,
* So, we do not set the skip list index as a parameter
*/
tExprTreeTraverse(pSecond, NULL, result, param);
#endif
}
static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) {
switch(type) {
case TSDB_DATA_TYPE_TINYINT: {
......@@ -430,7 +102,73 @@ static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOf
}
}
void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)) {
if (pNode == NULL) {
return;
}
if (pNode->nodeType == TSQL_NODE_EXPR) {
tExprTreeDestroy(&pNode, fp);
} else if (pNode->nodeType == TSQL_NODE_VALUE) {
tVariantDestroy(pNode->pVal);
} else if (pNode->nodeType == TSQL_NODE_COL) {
free(pNode->pSchema);
}
free(pNode);
}
void tExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
if (*pExpr == NULL) {
return;
}
if ((*pExpr)->nodeType == TSQL_NODE_EXPR) {
tExprTreeDestroy(&(*pExpr)->_node.pLeft, fp);
tExprTreeDestroy(&(*pExpr)->_node.pRight, fp);
if (fp != NULL) {
fp((*pExpr)->_node.info);
}
} else if ((*pExpr)->nodeType == TSQL_NODE_VALUE) {
tVariantDestroy((*pExpr)->pVal);
free((*pExpr)->pVal);
} else if ((*pExpr)->nodeType == TSQL_NODE_COL) {
free((*pExpr)->pSchema);
}
free(*pExpr);
*pExpr = NULL;
}
bool exprTreeApplayFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param) {
tExprNode *pLeft = pExpr->_node.pLeft;
tExprNode *pRight = pExpr->_node.pRight;
//non-leaf nodes, recursively traverse the expression tree in the post-root order
if (pLeft->nodeType == TSQL_NODE_EXPR && pRight->nodeType == TSQL_NODE_EXPR) {
if (pExpr->_node.optr == TSDB_RELATION_OR) { // or
if (exprTreeApplayFilter(pLeft, pItem, param)) {
return true;
}
// left child does not satisfy the query condition, try right child
return exprTreeApplayFilter(pRight, pItem, param);
} else { // and
if (!exprTreeApplayFilter(pLeft, pItem, param)) {
return false;
}
return exprTreeApplayFilter(pRight, pItem, param);
}
}
// handle the leaf node
param->setupInfoFn(pExpr, param->pExtInfo);
return param->nodeFilterFn(pItem, pExpr->_node.info);
}
void arithmeticTreeTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*getSourceDataBlock)(void *, const char*, int32_t)) {
if (pExprs == NULL) {
return;
......@@ -442,7 +180,7 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
/* the left output has result from the left child syntax tree */
char *pLeftOutput = (char*)malloc(sizeof(int64_t) * numOfRows);
if (pLeft->nodeType == TSQL_NODE_EXPR) {
tExprTreeCalcTraverse(pLeft, numOfRows, pLeftOutput, param, order, getSourceDataBlock);
arithmeticTreeTraverse(pLeft, numOfRows, pLeftOutput, param, order, getSourceDataBlock);
}
/* the right output has result from the right child syntax tree */
......@@ -450,7 +188,7 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
char *pdata = malloc(sizeof(int64_t) * numOfRows);
if (pRight->nodeType == TSQL_NODE_EXPR) {
tExprTreeCalcTraverse(pRight, numOfRows, pRightOutput, param, order, getSourceDataBlock);
arithmeticTreeTraverse(pRight, numOfRows, pRightOutput, param, order, getSourceDataBlock);
}
if (pLeft->nodeType == TSQL_NODE_EXPR) {
......@@ -459,11 +197,11 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
* exprLeft + exprRight
* the type of returned value of one expression is always double float precious
*/
_bi_consumer_fn_t fp = tGetBiConsumerFn(TSDB_DATA_TYPE_DOUBLE, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr);
_bi_consumer_fn_t fp = getArithmeticOperatorFn(TSDB_DATA_TYPE_DOUBLE, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr);
fp(pLeftOutput, pRightOutput, numOfRows, numOfRows, pOutput, TSDB_ORDER_ASC);
} else if (pRight->nodeType == TSQL_NODE_COL) { // exprLeft + columnRight
_bi_consumer_fn_t fp = tGetBiConsumerFn(TSDB_DATA_TYPE_DOUBLE, pRight->pSchema->type, pExprs->_node.optr);
_bi_consumer_fn_t fp = getArithmeticOperatorFn(TSDB_DATA_TYPE_DOUBLE, pRight->pSchema->type, pExprs->_node.optr);
// set input buffer
char *pInputData = getSourceDataBlock(param, pRight->pSchema->name, pRight->pSchema->colId);
......@@ -475,14 +213,14 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
}
} else if (pRight->nodeType == TSQL_NODE_VALUE) { // exprLeft + 12
_bi_consumer_fn_t fp = tGetBiConsumerFn(TSDB_DATA_TYPE_DOUBLE, pRight->pVal->nType, pExprs->_node.optr);
_bi_consumer_fn_t fp = getArithmeticOperatorFn(TSDB_DATA_TYPE_DOUBLE, pRight->pVal->nType, pExprs->_node.optr);
fp(pLeftOutput, &pRight->pVal->i64Key, numOfRows, 1, pOutput, TSDB_ORDER_ASC);
}
} else if (pLeft->nodeType == TSQL_NODE_COL) {
// column data specified on left-hand-side
char *pLeftInputData = getSourceDataBlock(param, pLeft->pSchema->name, pLeft->pSchema->colId);
if (pRight->nodeType == TSQL_NODE_EXPR) { // columnLeft + expr2
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pSchema->type, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr);
_bi_consumer_fn_t fp = getArithmeticOperatorFn(pLeft->pSchema->type, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr);
if (order == TSDB_ORDER_DESC) {
reverseCopy(pdata, pLeftInputData, pLeft->pSchema->type, numOfRows);
......@@ -494,12 +232,12 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
} else if (pRight->nodeType == TSQL_NODE_COL) { // columnLeft + columnRight
// column data specified on right-hand-side
char *pRightInputData = getSourceDataBlock(param, pRight->pSchema->name, pRight->pSchema->colId);
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pSchema->type, pRight->pSchema->type, pExprs->_node.optr);
_bi_consumer_fn_t fp = getArithmeticOperatorFn(pLeft->pSchema->type, pRight->pSchema->type, pExprs->_node.optr);
// both columns are descending order, do not reverse the source data
fp(pLeftInputData, pRightInputData, numOfRows, numOfRows, pOutput, order);
} else if (pRight->nodeType == TSQL_NODE_VALUE) { // columnLeft + 12
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pSchema->type, pRight->pVal->nType, pExprs->_node.optr);
_bi_consumer_fn_t fp = getArithmeticOperatorFn(pLeft->pSchema->type, pRight->pVal->nType, pExprs->_node.optr);
if (order == TSDB_ORDER_DESC) {
reverseCopy(pdata, pLeftInputData, pLeft->pSchema->type, numOfRows);
......@@ -511,13 +249,13 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
} else {
// column data specified on left-hand-side
if (pRight->nodeType == TSQL_NODE_EXPR) { // 12 + expr2
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pVal->nType, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr);
_bi_consumer_fn_t fp = getArithmeticOperatorFn(pLeft->pVal->nType, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr);
fp(&pLeft->pVal->i64Key, pRightOutput, 1, numOfRows, pOutput, TSDB_ORDER_ASC);
} else if (pRight->nodeType == TSQL_NODE_COL) { // 12 + columnRight
// column data specified on right-hand-side
char *pRightInputData = getSourceDataBlock(param, pRight->pSchema->name, pRight->pSchema->colId);
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pVal->nType, pRight->pSchema->type, pExprs->_node.optr);
_bi_consumer_fn_t fp = getArithmeticOperatorFn(pLeft->pVal->nType, pRight->pSchema->type, pExprs->_node.optr);
if (order == TSDB_ORDER_DESC) {
reverseCopy(pdata, pRightInputData, pRight->pSchema->type, numOfRows);
......@@ -527,7 +265,7 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
}
} else if (pRight->nodeType == TSQL_NODE_VALUE) { // 12 + 12
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pVal->nType, pRight->pVal->nType, pExprs->_node.optr);
_bi_consumer_fn_t fp = getArithmeticOperatorFn(pLeft->pVal->nType, pRight->pVal->nType, pExprs->_node.optr);
fp(&pLeft->pVal->i64Key, &pRight->pVal->i64Key, 1, 1, pOutput, TSDB_ORDER_ASC);
}
}
......
此差异已折叠。
......@@ -174,7 +174,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
}
assert((*pHisto)->elems[idx].val > val);
} else {
} else if ((*pHisto)->numOfElems > 0) {
assert((*pHisto)->elems[(*pHisto)->numOfEntries].val < val);
}
......
......@@ -2645,13 +2645,12 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
return pTableGroup;
}
static bool indexedNodeFilterFp(const void* pNode, void* param) {
static bool tableFilterFp(const void* pNode, void* param) {
tQueryInfo* pInfo = (tQueryInfo*) param;
STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
char* val = NULL;
char* val = NULL;
if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
val = (char*) TABLE_NAME(pTable);
} else {
......@@ -2706,15 +2705,17 @@ static bool indexedNodeFilterFp(const void* pNode, void* param) {
return true;
}
static void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param);
static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
// query according to the expression tree
SExprTraverseSupp supp = {
.nodeFilterFn = (__result_filter_fn_t) indexedNodeFilterFp,
.nodeFilterFn = (__result_filter_fn_t) tableFilterFp,
.setupInfoFn = filterPrepare,
.pExtInfo = pSTable->tagSchema,
};
tExprTreeTraverse(pExpr, pSTable->pIndex, pRes, &supp);
getTableListfromSkipList(pExpr, pSTable->pIndex, pRes, &supp);
tExprTreeDestroy(&pExpr, destroyHelper);
return TSDB_CODE_SUCCESS;
}
......@@ -2956,3 +2957,235 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
taosArrayDestroy(pGroupList->pGroupList);
pGroupList->numOfTables = 0;
}
static void applyFilterToSkipListNode(SSkipList *pSkipList, tExprNode *pExpr, SArray *pResult, SExprTraverseSupp *param) {
SSkipListIterator* iter = tSkipListCreateIter(pSkipList);
// Scan each node in the skiplist by using iterator
while (tSkipListIterNext(iter)) {
SSkipListNode *pNode = tSkipListIterGet(iter);
if (exprTreeApplayFilter(pExpr, pNode, param)) {
taosArrayPush(pResult, &(SL_GET_NODE_DATA(pNode)));
}
}
tSkipListDestroyIter(iter);
}
typedef struct {
char* v;
int32_t optr;
} SEndPoint;
typedef struct {
SEndPoint* start;
SEndPoint* end;
} SQueryCond;
// todo check for malloc failure
static int32_t setQueryCond(tQueryInfo *queryColInfo, SQueryCond* pCond) {
int32_t optr = queryColInfo->optr;
if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL ||
optr == TSDB_RELATION_EQUAL || optr == TSDB_RELATION_NOT_EQUAL) {
pCond->start = calloc(1, sizeof(SEndPoint));
pCond->start->optr = queryColInfo->optr;
pCond->start->v = queryColInfo->q;
} else if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
pCond->end = calloc(1, sizeof(SEndPoint));
pCond->end->optr = queryColInfo->optr;
pCond->end->v = queryColInfo->q;
} else if (optr == TSDB_RELATION_IN || optr == TSDB_RELATION_LIKE) {
assert(0);
}
return TSDB_CODE_SUCCESS;
}
static void queryIndexedColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* result) {
SSkipListIterator* iter = NULL;
SQueryCond cond = {0};
if (setQueryCond(pQueryInfo, &cond) != TSDB_CODE_SUCCESS) {
//todo handle error
}
if (cond.start != NULL) {
iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_ASC);
} else {
iter = tSkipListCreateIterFromVal(pSkipList, (char*)(cond.end ? cond.end->v: NULL), pSkipList->type, TSDB_ORDER_DESC);
}
if (cond.start != NULL) {
int32_t optr = cond.start->optr;
if (optr == TSDB_RELATION_EQUAL) { // equals
while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
int32_t ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
if (ret != 0) {
break;
}
STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
} else if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL) { // greater equal
bool comp = true;
int32_t ret = 0;
while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
if (comp) {
ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
assert(ret >= 0);
}
if (ret == 0 && optr == TSDB_RELATION_GREATER) {
continue;
} else {
STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
comp = false;
}
}
} else if (optr == TSDB_RELATION_NOT_EQUAL) { // not equal
bool comp = true;
while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
if (comp) {
continue;
}
STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
tSkipListDestroyIter(iter);
comp = true;
iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_DESC);
while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
if (comp) {
continue;
}
STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
} else {
assert(0);
}
} else {
int32_t optr = cond.end ? cond.end->optr : TSDB_RELATION_INVALID;
if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
bool comp = true;
int32_t ret = 0;
while (tSkipListIterNext(iter)) {
SSkipListNode *pNode = tSkipListIterGet(iter);
if (comp) {
ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.end->v);
assert(ret <= 0);
}
if (ret == 0 && optr == TSDB_RELATION_LESS) {
continue;
} else {
STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
comp = false; // no need to compare anymore
}
}
} else {
assert(pQueryInfo->optr == TSDB_RELATION_ISNULL || pQueryInfo->optr == TSDB_RELATION_NOTNULL);
while (tSkipListIterNext(iter)) {
SSkipListNode *pNode = tSkipListIterGet(iter);
bool isnull = isNull(SL_GET_NODE_KEY(pSkipList, pNode), pQueryInfo->sch.type);
if ((pQueryInfo->optr == TSDB_RELATION_ISNULL && isnull) ||
(pQueryInfo->optr == TSDB_RELATION_NOTNULL && (!isnull))) {
STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
}
}
}
free(cond.start);
free(cond.end);
tSkipListDestroyIter(iter);
}
static void queryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* res, __result_filter_fn_t filterFp) {
SSkipListIterator* iter = tSkipListCreateIter(pSkipList);
while (tSkipListIterNext(iter)) {
bool addToResult = false;
SSkipListNode *pNode = tSkipListIterGet(iter);
char *pData = SL_GET_NODE_DATA(pNode);
tstr *name = (tstr*) tsdbGetTableName((void*) pData);
// todo speed up by using hash
if (pQueryInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
if (pQueryInfo->optr == TSDB_RELATION_IN) {
addToResult = pQueryInfo->compare(name, pQueryInfo->q);
} else if (pQueryInfo->optr == TSDB_RELATION_LIKE) {
addToResult = !pQueryInfo->compare(name, pQueryInfo->q);
}
} else {
addToResult = filterFp(pNode, pQueryInfo);
}
if (addToResult) {
STableKeyInfo info = {.pTable = (void*)pData, .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(res, &info);
}
}
tSkipListDestroyIter(iter);
}
// Apply the filter expression to each node in the skiplist to acquire the qualified nodes in skip list
void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param) {
if (pExpr == NULL) {
return;
}
tExprNode *pLeft = pExpr->_node.pLeft;
tExprNode *pRight = pExpr->_node.pRight;
// column project
if (pLeft->nodeType != TSQL_NODE_EXPR && pRight->nodeType != TSQL_NODE_EXPR) {
assert(pLeft->nodeType == TSQL_NODE_COL && (pRight->nodeType == TSQL_NODE_VALUE || pRight->nodeType == TSQL_NODE_DUMMY));
param->setupInfoFn(pExpr, param->pExtInfo);
tQueryInfo *pQueryInfo = pExpr->_node.info;
if (pQueryInfo->indexed && pQueryInfo->optr != TSDB_RELATION_LIKE) {
queryIndexedColumn(pSkipList, pQueryInfo, result);
} else {
queryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn);
}
return;
}
// The value of hasPK is always 0.
uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK;
assert(weight == 0 && pSkipList != NULL && taosArrayGetSize(result) == 0);
//apply the hierarchical filter expression to every node in skiplist to find the qualified nodes
applyFilterToSkipListNode(pSkipList, pExpr, result, param);
}
......@@ -130,16 +130,14 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe
int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param);
void taosHashEmpty(SHashObj *pHashObj);
/**
* clean up hash table
* @param handle
*/
void taosHashCleanup(SHashObj *pHashObj);
/*
void *SHashMutableIterator* taosHashCreateIter(SHashObj *pHashObj, void *);
*/
/**
*
* @param pHashObj
......
......@@ -313,10 +313,10 @@ void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void
}
if (d != NULL) {
memcpy(d, GET_HASH_NODE_DATA(pNode), dsize);
} else {
data = GET_HASH_NODE_DATA(pNode);
memcpy(d, GET_HASH_NODE_DATA(pNode), pNode->dataLen);
}
data = GET_HASH_NODE_DATA(pNode);
}
if (pHashObj->type == HASH_ENTRY_LOCK) {
......@@ -472,38 +472,49 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi
return 0;
}
void taosHashCleanup(SHashObj *pHashObj) {
void taosHashEmpty(SHashObj *pHashObj) {
if (pHashObj == NULL) {
return;
}
uDebug("hash:%p cleanup hash table", pHashObj);
SHashNode *pNode, *pNext;
__wr_lock(&pHashObj->lock, pHashObj->type);
if (pHashObj->hashList) {
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
SHashEntry *pEntry = pHashObj->hashList[i];
if (pEntry->num == 0) {
assert(pEntry->next == 0);
continue;
}
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
SHashEntry *pEntry = pHashObj->hashList[i];
if (pEntry->num == 0) {
assert(pEntry->next == 0);
continue;
}
pNode = pEntry->next;
assert(pNode != NULL);
pNode = pEntry->next;
assert(pNode != NULL);
while (pNode) {
pNext = pNode->next;
FREE_HASH_NODE(pHashObj, pNode);
while (pNode) {
pNext = pNode->next;
FREE_HASH_NODE(pHashObj, pNode);
pNode = pNext;
}
pNode = pNext;
}
free(pHashObj->hashList);
pEntry->num = 0;
pEntry->next = NULL;
}
pHashObj->size = 0;
__wr_unlock(&pHashObj->lock, pHashObj->type);
}
void taosHashCleanup(SHashObj *pHashObj) {
if (pHashObj == NULL) {
return;
}
taosHashEmpty(pHashObj);
tfree(pHashObj->hashList);
// destroy mem block
size_t memBlock = taosArrayGetSize(pHashObj->pMemBlock);
......
......@@ -106,7 +106,7 @@ while $x < 5000
endw
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 3000
sleep 1000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
......
......@@ -244,3 +244,120 @@ if $data00 != -2.000000000 then
print expect -2.000000000, actual: $data00
return -1
endi
sql create table tm1 (ts timestamp, k int);
sql insert into tm1 values('2020-10-30 18:11:56.680', -1000);
sql insert into tm1 values('2020-11-19 18:11:45.773', NULL);
sql insert into tm1 values('2020-12-09 18:11:17.098', NULL);
sql insert into tm1 values('2020-12-20 18:11:49.412', 1);
sql insert into tm1 values('2020-12-23 18:11:50.412', 2);
sql insert into tm1 values('2020-12-28 18:11:52.412', 3);
print =====================> td-2610
sql select twa(k)from tm1 where ts>='2020-11-19 18:11:45.773' and ts<='2020-12-9 18:11:17.098'
if $rows != 0 then
return -1
endi
print =====================> td-2609
sql select apercentile(k, 50) from tm1 where ts>='2020-10-30 18:11:56.680' and ts<='2020-12-09 18:11:17.098'
if $rows != 1 then
return -1
endi
if $data00 != -1000.000000000 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 1000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
sleep 500
sql use m_func_db0
print =====================> td-2583
sql select min(k) from tm1 where ts>='2020-11-19 18:11:45.773' and ts<='2020-12-20 18:11:49.412'
if $rows != 1 then
return -1
endi
if $data00 != 1 then
print expect 1, actual: $data00
return -1
endi
print =====================> td-2601
sql select count(*) from tm1 where ts<='2020-6-1 00:00:00' and ts>='2020-1-1 00:00:00' interval(1n) fill(NULL)
if $rows != 0 then
return -1
endi
print =====================> td-2615
sql select last(ts) from tm1 interval(17a) limit 776 offset 3
if $rows != 3 then
return -1
endi
sql select last(ts) from tm1 interval(17a) limit 1000 offset 4
if $rows != 2 then
return -1
endi
sql select last(ts) from tm1 interval(17a) order by ts desc limit 1000 offset 0
if $rows != 6 then
return -1
endi
print ==================> td-2624
sql create table tm2(ts timestamp, k int, b binary(12));
sql insert into tm2 values('2011-01-02 18:42:45.326', -1,'abc');
sql insert into tm2 values('2020-07-30 17:44:06.283', 0, null);
sql insert into tm2 values('2020-07-30 17:44:19.578', 9999999, null);
sql insert into tm2 values('2020-07-30 17:46:06.417', NULL, null);
sql insert into tm2 values('2020-11-09 18:42:25.538', 0, null);
sql insert into tm2 values('2020-12-29 17:43:11.641', 0, null);
sql insert into tm2 values('2020-12-29 18:43:17.129', 0, null);
sql insert into tm2 values('2020-12-29 18:46:19.109', NULL, null);
sql insert into tm2 values('2021-01-03 18:40:40.065', 0, null);
sql select twa(k),first(ts) from tm2 where k <50 interval(17s);
if $rows != 6 then
return -1
endi
if $data00 != @11-01-02 18:42:42.000@ then
return -1
endi
if $data02 != @11-01-02 18:42:45.326@ then
return -1
endi
if $data10 != @20-07-30 17:43:59.000@ then
return -1
endi
if $data21 != 0.000000000 then
return -1
endi
sql select twa(k),first(ts) from tm2 where k <50 interval(17s) order by ts desc;
if $rows != 6 then
return -1
endi
sql select twa(k),first(ts),count(k),first(k) from tm2 interval(17s) limit 20 offset 0;
if $rows != 9 then
return -1
endi
if $data00 != @11-01-02 18:42:42.000@ then
return -1
endi
if $data10 != @20-07-30 17:43:59.000@ then
return -1
endi
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册