提交 cb6c09ec 编写于 作者: H Haojun Liao

[td-485]

上级 e401802f
......@@ -63,19 +63,21 @@ typedef struct SLocalReducer {
// char * pBufForInterpo; // intermediate buffer for interpolation
tFilePage * pTempBuffer;
struct SQLFunctionCtx *pCtx;
int32_t rowSize; // size of each intermediate result.
int32_t status; // denote it is in reduce process, in reduce process, it
bool hasPrevRow; // cannot be released
int32_t rowSize; // size of each intermediate result.
int32_t finalRowSize; // final result row size
int32_t status; // denote it is in reduce process, in reduce process, it
bool hasPrevRow; // cannot be released
bool hasUnprocessedRow;
tOrderDescriptor * pDesc;
SColumnModel * resColModel;
tExtMemBuffer ** pExtMemBuffer; // disk-based buffer
SFillInfo* pFillInfo; // interpolation support structure
SFillInfo* pFillInfo; // interpolation support structure
char * pFinalRes; // result data after interpo
tFilePage * discardData;
SResultInfo * pResInfo;
bool discard;
int32_t offset; // limit offset value
bool orderPrjOnSTable; // projection query on stable
} SLocalReducer;
typedef struct SSubqueryState {
......
......@@ -341,16 +341,6 @@ bool stableQueryFunctChanged(int32_t funcId) {
*/
void resetResultInfo(SResultInfo *pResInfo) { pResInfo->initialized = false; }
void initResultInfo(SResultInfo *pResInfo) {
pResInfo->initialized = true; // the this struct has been initialized flag
pResInfo->complete = false;
pResInfo->hasResult = false;
pResInfo->numOfRes = 0;
memset(pResInfo->interResultBuf, 0, (size_t)pResInfo->bufLen);
}
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable) {
assert(pResInfo->interResultBuf == NULL);
......@@ -387,9 +377,7 @@ static bool function_setup(SQLFunctionCtx *pCtx) {
*/
static void function_finalizer(SQLFunctionCtx *pCtx) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
if (pResInfo->hasResult != DATA_SET_FLAG) {
tscTrace("no result generated, result is set to NULL");
if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pCtx->aOutputBuf, pCtx->outputType);
} else {
......
......@@ -48,7 +48,7 @@ static int32_t tscToInteger(SSQLToken *pToken, int64_t *value, char **endPtr) {
int32_t radix = 10;
int32_t radixList[3] = {16, 8, 2};
int32_t radixList[3] = {16, 8, 2}; // the integer number with different radix: hex, oct, bin
if (pToken->type == TK_HEX || pToken->type == TK_OCT || pToken->type == TK_BIN) {
radix = radixList[pToken->type - TK_HEX];
}
......
......@@ -494,7 +494,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
tsem_init(&pSql->rspSem, 0, 0);
pSql->signature = pSql;
pSql->pTscObj = pObj;
//pSql->pTscObj->pSql = pSql;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
pStmt->pSql = pSql;
......@@ -515,7 +514,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
//doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
pSql->param = (void*)pSql;
pSql->param = (void*) pSql;
pSql->fp = waitForQueryRsp;
pSql->insertType = TSDB_QUERY_TYPE_STMT_INSERT;
......
......@@ -217,7 +217,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->numOfBuffer = numOfFlush;
pReducer->numOfVnode = numOfBuffer;
pReducer->pDesc = pDesc;
tscTrace("%p the number of merged leaves is: %d", pSql, pReducer->numOfBuffer);
......@@ -278,6 +278,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
param->groupOrderType = pQueryInfo->groupbyExpr.orderType;
pReducer->orderPrjOnSTable = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0);
pRes->code = tLoserTreeCreate(&pReducer->pLoserTree, pReducer->numOfBuffer, param, treeComparator);
if (pReducer->pLoserTree == NULL || pRes->code != 0) {
......@@ -309,10 +310,10 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->nResultBufSize = pMemBuffer[0]->pageSize * 16;
pReducer->pResultBuf = (tFilePage *)calloc(1, pReducer->nResultBufSize + sizeof(tFilePage));
int32_t finalRowLength = tscGetResRowLength(pQueryInfo->exprList);
pReducer->finalRowSize = tscGetResRowLength(pQueryInfo->exprList);
pReducer->resColModel = finalmodel;
pReducer->resColModel->capacity = pReducer->nResultBufSize / finalRowLength;
assert(finalRowLength <= pReducer->rowSize);
pReducer->resColModel->capacity = pReducer->nResultBufSize / pReducer->finalRowSize;
assert(pReducer->finalRowSize <= pReducer->rowSize);
pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity);
// pReducer->pBufForInterpo = calloc(1, pReducer->nResultBufSize);
......@@ -389,7 +390,7 @@ static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor
assert(pPage->num <= pDesc->pColumnModel->capacity);
// sort before flush to disk, the data must be consecutively put on tFilePage.
if (pDesc->orderIdx.numOfCols > 0) {
if (pDesc->orderInfo.numOfCols > 0) {
tColDataQSort(pDesc, pPage->num, 0, pPage->num - 1, pPage->data, orderType);
}
......@@ -590,12 +591,10 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage *tmpBuffer) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
int16_t functionId = tscSqlExprGet(pQueryInfo, 0)->functionId;
// disable merge procedure for column projection query
int16_t functionId = pReducer->pCtx[0].functionId;
assert(functionId != TSDB_FUNC_ARITHM);
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
if (pReducer->orderPrjOnSTable) {
return true;
}
......@@ -604,26 +603,33 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
}
tOrderDescriptor *pOrderDesc = pReducer->pDesc;
int32_t numOfCols = pOrderDesc->orderIdx.numOfCols;
SColumnOrderInfo* orderInfo = &pOrderDesc->orderInfo;
// no group by columns, all data belongs to one group
int32_t numOfCols = orderInfo->numOfCols;
if (numOfCols <= 0) {
return true;
}
if (pOrderDesc->orderIdx.pData[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) { //<= 0
// super table interval query
if (orderInfo->pData[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
/*
* super table interval query
* if the order columns is the primary timestamp, all result data belongs to one group
*/
assert(pQueryInfo->intervalTime > 0);
pOrderDesc->orderIdx.numOfCols -= 1;
if (numOfCols == 1) {
return true;
}
} else { // simple group by query
assert(pQueryInfo->intervalTime == 0);
}
// only one row exists
int32_t ret = compare_a(pOrderDesc, 1, 0, pPrev, 1, 0, tmpBuffer->data);
pOrderDesc->orderIdx.numOfCols = numOfCols;
return (ret == 0);
int32_t index = orderInfo->pData[0];
int32_t offset = (pOrderDesc->pColumnModel)->pFields[index].offset;
int32_t ret = memcmp(pPrev + offset, tmpBuffer->data + offset, pOrderDesc->pColumnModel->rowSize - offset);
return ret == 0;
}
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc,
......@@ -873,24 +879,24 @@ static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRe
* Note: pRes->pLocalReducer may be null, due to the fact that "tscDestroyLocalReducer" is called
* by "interuptHandler" function in shell
*/
static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) {
static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) {
SSqlCmd * pCmd = &pSql->cmd;
SSqlRes * pRes = &pSql->res;
tFilePage * pFinalDataPage = pLocalReducer->pResultBuf;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (pRes->pLocalReducer != pLocalReducer) {
/*
* Release the SSqlObj is called, and it is int destroying function invoked by other thread.
* However, the other thread will WAIT until current process fully completes.
* Since the flag of release struct is set by doLocalReduce function
*/
assert(pRes->pLocalReducer == NULL);
}
// if (pRes->pLocalReducer != pLocalReducer) {
// /*
// * Release the SSqlObj is called, and it is int destroying function invoked by other thread.
// * However, the other thread will WAIT until current process fully completes.
// * Since the flag of release struct is set by doLocalReduce function
// */
// assert(pRes->pLocalReducer == NULL);
// }
// no interval query, no fill operation
if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
// no interval query, no fill operation
pRes->data = pLocalReducer->pFinalRes;
pRes->numOfRows = pFinalDataPage->num;
pRes->numOfClauseTotal += pRes->numOfRows;
......@@ -929,9 +935,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo);
}
int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList);
memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * rowSize);
memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * pLocalReducer->finalRowSize);
pFinalDataPage->num = 0;
return;
}
......@@ -1037,16 +1041,13 @@ static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer)
static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, bool needInit) {
// the tag columns need to be set before all functions execution
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t j = 0; j < size; ++j) {
SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, j);
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[j];
tVariantAssign(&pCtx->param[0], &pExpr->param[0]);
// tags/tags_dummy function, the tag field of SQLFunctionCtx is from the input buffer
int32_t functionId = pExpr->functionId;
int32_t functionId = pCtx->functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS_DUMMY) {
tVariantDestroy(&pCtx->tag);
char* input = pCtx->aInputElemBuf;
......@@ -1057,17 +1058,20 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer,
} else {
tVariantCreateFromBinary(&pCtx->tag, input, pCtx->inputBytes, pCtx->inputType);
}
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, j);
pCtx->param[0].i64Key = pExpr->param[0].i64Key;
}
pCtx->currentStage = SECONDARY_STAGE_MERGE;
if (needInit) {
aAggs[pExpr->functionId].init(pCtx);
aAggs[pCtx->functionId].init(pCtx);
}
}
for (int32_t j = 0; j < size; ++j) {
int32_t functionId = tscSqlExprGet(pQueryInfo, j)->functionId;
int32_t functionId = pLocalReducer->pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
......@@ -1101,8 +1105,7 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx)
* ts, tag, tagprj function can not decide the output number of current query
* the number of output result is decided by main output
*/
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, j);
int32_t functionId = pExpr->functionId;
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ) {
continue;
}
......@@ -1136,15 +1139,13 @@ static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLo
char *buf = malloc((size_t)maxBufSize);
for (int32_t k = 0; k < size; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k);
if (pExpr->functionId != TSDB_FUNC_TAG) {
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[k];
if (pCtx->functionId != TSDB_FUNC_TAG) {
continue;
}
int32_t inc = numOfRes - 1; // tsdb_func_tag function only produce one row of result
memset(buf, 0, (size_t)maxBufSize);
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[k];
memcpy(buf, pCtx->aOutputBuf, (size_t)pCtx->outputBytes);
for (int32_t i = 0; i < inc; ++i) {
......@@ -1160,8 +1161,8 @@ int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t k = 0; k < size; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k);
aAggs[pExpr->functionId].xFinalize(&pLocalReducer->pCtx[k]);
SQLFunctionCtx* pCtx = &pLocalReducer->pCtx[k];
aAggs[pCtx->functionId].xFinalize(pCtx);
}
pLocalReducer->hasPrevRow = false;
......@@ -1182,13 +1183,13 @@ int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {
*/
bool needToMerge(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) {
int32_t ret = 0; // merge all result by default
int16_t functionId = tscSqlExprGet(pQueryInfo, 0)->functionId;
int16_t functionId = pLocalReducer->pCtx[0].functionId;
if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_ARITHM) { // column projection query
ret = 1; // disable merge procedure
} else {
tOrderDescriptor *pDesc = pLocalReducer->pDesc;
if (pDesc->orderIdx.numOfCols > 0) {
if (pDesc->orderInfo.numOfCols > 0) {
if (pDesc->tsOrder == TSDB_ORDER_ASC) { // asc
// todo refactor comparator
ret = compare_a(pLocalReducer->pDesc, 1, 0, pLocalReducer->prevRowOfInput, 1, 0, tmpBuffer->data);
......@@ -1274,7 +1275,7 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf);
}
doInterpolateResult(pSql, pLocalReducer, noMoreCurrentGroupRes);
doFillResult(pSql, pLocalReducer, noMoreCurrentGroupRes);
return true;
}
......@@ -1341,7 +1342,7 @@ static bool doBuildFilledResultForGroup(SSqlObj *pSql) {
// the first column must be the timestamp column
int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, ekey, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo
doInterpolateResult(pSql, pLocalReducer, false);
doFillResult(pSql, pLocalReducer, false);
}
return true;
......@@ -1374,7 +1375,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
pQueryInfo->slidingTimeUnit, tinfo.precision);
int32_t rows = taosGetNumOfResultWithFill(pFillInfo, 0, etime, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo
doInterpolateResult(pSql, pLocalReducer, true);
doFillResult(pSql, pLocalReducer, true);
}
}
......@@ -1408,13 +1409,11 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t k = 0; k < size; ++k) {
SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, k);
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[k];
pCtx->aOutputBuf += pCtx->outputBytes * numOfRes;
// set the correct output timestamp column position
if (pExpr->functionId == TSDB_FUNC_TOP || pExpr->functionId == TSDB_FUNC_BOTTOM) {
if (pCtx->functionId == TSDB_FUNC_TOP || pCtx->functionId == TSDB_FUNC_BOTTOM) {
pCtx->ptsOutputBuf = ((char *)pCtx->ptsOutputBuf + TSDB_KEYSIZE * numOfRes);
}
}
......
......@@ -2104,7 +2104,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
}
void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(pFieldInfo, columnIndex);
SFieldSupInfo* pInfo = taosArrayGet(pFieldInfo->pSupportInfo, columnIndex);//tscFieldInfoGetSupp(pFieldInfo, columnIndex);
assert(pInfo->pSqlExpr != NULL);
int32_t type = pInfo->pSqlExpr->resType;
......
......@@ -188,9 +188,10 @@ typedef void *TsdbPosT;
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param groupInfo tableId list in the form of set, seperated into different groups according to group by condition
* @param qinfo query info handle from query processor
* @return
*/
TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo);
TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void* qinfo);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
......@@ -202,11 +203,11 @@ TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STable
* @param groupInfo tableId list.
* @return
*/
TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo);
TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void* qinfo);
SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle);
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList);
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo);
/**
* move to next block if exists
......
......@@ -172,10 +172,11 @@ typedef struct SQueryRuntimeEnv {
STSBuf* pTSBuf;
STSCursor cur;
SQueryCostInfo summary;
bool stableQuery; // super table query or not
bool stableQuery; // super table query or not
void* pQueryHandle;
void* pSecQueryHandle; // another thread for
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
bool topBotQuery; // false;
} SQueryRuntimeEnv;
typedef struct SQInfo {
......
......@@ -31,7 +31,8 @@ void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
void closeAllTimeWindow(SWindowResInfo* pWindowResInfo);
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order);
SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot);
int32_t curTimeWindow(SWindowResInfo *pWindowResInfo);
#define curTimeWindow(_winres) ((_winres)->curIndex)
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot);
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo);
......
......@@ -28,7 +28,7 @@ extern "C" {
#include "tdataformat.h"
#include "talgo.h"
#define DEFAULT_PAGE_SIZE (1024L*56) // 16k larger than the SHistoInfo
#define DEFAULT_PAGE_SIZE (1024L*64) // 16k larger than the SHistoInfo
#define MAX_TMPFILE_PATH_LENGTH PATH_MAX
#define INITIAL_ALLOCATION_BUFFER_SIZE 64
......@@ -96,7 +96,7 @@ typedef struct SColumnOrderInfo {
typedef struct tOrderDescriptor {
SColumnModel * pColumnModel;
int32_t tsOrder; // timestamp order type if exists
SColumnOrderInfo orderIdx;
SColumnOrderInfo orderInfo;
} tOrderDescriptor;
typedef struct tExtMemBuffer {
......
......@@ -85,7 +85,7 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId);
* @param id
* @return
*/
tFilePage* getResultBufferPageById(SDiskbasedResultBuf* pResultBuf, int32_t id);
#define GET_RES_BUF_PAGE_BY_ID(buf, id) ((tFilePage*)((buf)->pBuf + DEFAULT_INTERN_BUF_PAGE_SIZE*(id)))
/**
* get the total buffer size in the format of disk file
......
......@@ -647,9 +647,9 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
}
////////////////////////////////////////kill statement///////////////////////////////////////
cmd ::= KILL CONNECTION IPTOKEN(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSQL(pInfo, TSDB_SQL_KILL_CONNECTION, &X);}
cmd ::= KILL STREAM IPTOKEN(X) COLON(Z) INTEGER(Y) COLON(K) INTEGER(F). {X.n += (Z.n + Y.n + K.n + F.n); setKillSQL(pInfo, TSDB_SQL_KILL_STREAM, &X);}
cmd ::= KILL QUERY IPTOKEN(X) COLON(Z) INTEGER(Y) COLON(K) INTEGER(F). {X.n += (Z.n + Y.n + K.n + F.n); setKillSQL(pInfo, TSDB_SQL_KILL_QUERY, &X);}
cmd ::= KILL CONNECTION INTEGER(Y). {setKillSQL(pInfo, TSDB_SQL_KILL_CONNECTION, &Y);}
cmd ::= KILL STREAM INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSQL(pInfo, TSDB_SQL_KILL_STREAM, &X);}
cmd ::= KILL QUERY INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSQL(pInfo, TSDB_SQL_KILL_QUERY, &X);}
%fallback ID ABORT AFTER ASC ATTACH BEFORE BEGIN CASCADE CLUSTER CONFLICT COPY DATABASE DEFERRED
DELIMITERS DESC DETACH EACH END EXPLAIN FAIL FOR GLOB IGNORE IMMEDIATE INITIALLY INSTEAD
......
......@@ -272,9 +272,18 @@ bool top_bot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, char *mi
bool stableQueryFunctChanged(int32_t funcId);
void resetResultInfo(SResultInfo *pResInfo);
void initResultInfo(SResultInfo *pResInfo);
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable);
static FORCE_INLINE void initResultInfo(SResultInfo *pResInfo) {
pResInfo->initialized = true; // the this struct has been initialized flag
pResInfo->complete = false;
pResInfo->hasResult = false;
pResInfo->numOfRes = 0;
memset(pResInfo->interResultBuf, 0, (size_t)pResInfo->bufLen);
}
#ifdef __cplusplus
}
#endif
......
此差异已折叠。
......@@ -206,11 +206,6 @@ bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) {
return (getWindowResult(pWindowResInfo, slot)->status.closed == true);
}
int32_t curTimeWindow(SWindowResInfo *pWindowResInfo) {
assert(pWindowResInfo->curIndex >= 0 && pWindowResInfo->curIndex < pWindowResInfo->size);
return pWindowResInfo->curIndex;
}
void closeTimeWindow(SWindowResInfo *pWindowResInfo, int32_t slot) {
getWindowResult(pWindowResInfo, slot)->status.closed = true;
}
......
......@@ -356,17 +356,15 @@ static FORCE_INLINE int32_t primaryKeyComparator(int64_t f1, int64_t f2, int32_t
static FORCE_INLINE int32_t columnValueAscendingComparator(char *f1, char *f2, int32_t type, int32_t bytes) {
switch (type) {
case TSDB_DATA_TYPE_INT: {
int32_t first = *(int32_t *)f1;
int32_t second = *(int32_t *)f2;
int32_t first = *(int32_t *) f1;
int32_t second = *(int32_t *) f2;
if (first == second) {
return 0;
}
return (first < second) ? -1 : 1;
};
case TSDB_DATA_TYPE_DOUBLE: {
//double first = *(double *)f1;
double first = GET_DOUBLE_VAL(f1);
//double second = *(double *)f2;
double first = GET_DOUBLE_VAL(f1);
double second = GET_DOUBLE_VAL(f2);
if (first == second) {
return 0;
......@@ -374,9 +372,7 @@ static FORCE_INLINE int32_t columnValueAscendingComparator(char *f1, char *f2, i
return (first < second) ? -1 : 1;
};
case TSDB_DATA_TYPE_FLOAT: {
//float first = *(float *)f1;
//float second = *(float *)f2;
float first = GET_FLOAT_VAL(f1);
float first = GET_FLOAT_VAL(f1);
float second = GET_FLOAT_VAL(f2);
if (first == second) {
return 0;
......@@ -439,9 +435,9 @@ int32_t compare_a(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1,
int32_t s2, char *data2) {
assert(numOfRows1 == numOfRows2);
int32_t cmpCnt = pDescriptor->orderIdx.numOfCols;
int32_t cmpCnt = pDescriptor->orderInfo.numOfCols;
for (int32_t i = 0; i < cmpCnt; ++i) {
int32_t colIdx = pDescriptor->orderIdx.pData[i];
int32_t colIdx = pDescriptor->orderInfo.pData[i];
char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx);
char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx);
......@@ -471,9 +467,9 @@ int32_t compare_d(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1,
int32_t s2, char *data2) {
assert(numOfRows1 == numOfRows2);
int32_t cmpCnt = pDescriptor->orderIdx.numOfCols;
int32_t cmpCnt = pDescriptor->orderInfo.numOfCols;
for (int32_t i = 0; i < cmpCnt; ++i) {
int32_t colIdx = pDescriptor->orderIdx.pData[i];
int32_t colIdx = pDescriptor->orderInfo.pData[i];
char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx);
char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx);
......@@ -563,13 +559,13 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
int32_t midIdx = ((end - start) >> 1) + start;
#if defined(_DEBUG_VIEW)
int32_t f = pDescriptor->orderIdx.pData[0];
int32_t f = pDescriptor->orderInfo.pData[0];
char *midx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, midIdx, f);
char *startx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, start, f);
char *endx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, end, f);
int32_t colIdx = pDescriptor->orderIdx.pData[0];
int32_t colIdx = pDescriptor->orderInfo.pData[0];
tSortDataPrint(pDescriptor->pColumnModel->pFields[colIdx].field.type, "before", startx, midx, endx);
#endif
......@@ -596,7 +592,7 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
}
static UNUSED_FUNC void tRowModelDisplay(tOrderDescriptor *pDescriptor, int32_t numOfRows, char *d, int32_t len) {
int32_t colIdx = pDescriptor->orderIdx.pData[0];
int32_t colIdx = pDescriptor->orderInfo.pData[0];
for (int32_t i = 0; i < len; ++i) {
char *startx = COLMODEL_GET_VAL(d, pDescriptor->pColumnModel, numOfRows, i, colIdx);
......@@ -1062,9 +1058,9 @@ tOrderDescriptor *tOrderDesCreate(const int32_t *orderColIdx, int32_t numOfOrder
desc->pColumnModel = pModel;
desc->tsOrder = tsOrderType;
desc->orderIdx.numOfCols = numOfOrderCols;
desc->orderInfo.numOfCols = numOfOrderCols;
for (int32_t i = 0; i < numOfOrderCols; ++i) {
desc->orderIdx.pData[i] = orderColIdx[i];
desc->orderInfo.pData[i] = orderColIdx[i];
}
return desc;
......
......@@ -50,12 +50,6 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t si
return TSDB_CODE_SUCCESS;
}
tFilePage* getResultBufferPageById(SDiskbasedResultBuf* pResultBuf, int32_t id) {
assert(id < pResultBuf->numOfPages && id >= 0);
return (tFilePage*)(pResultBuf->pBuf + DEFAULT_INTERN_BUF_PAGE_SIZE * id);
}
int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); }
int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
......@@ -169,7 +163,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
*pageId = (pResultBuf->allocateId++);
registerPageId(pResultBuf, groupId, *pageId);
tFilePage* page = getResultBufferPageById(pResultBuf, *pageId);
tFilePage* page = GET_RES_BUF_PAGE_BY_ID(pResultBuf, *pageId);
// clear memory for the new page
memset(page, 0, DEFAULT_INTERN_BUF_PAGE_SIZE);
......
......@@ -17,6 +17,7 @@
#include "tulog.h"
#include "talgo.h"
#include "tutil.h"
#include "ttime.h"
#include "tcompare.h"
#include "exception.h"
......@@ -71,6 +72,8 @@ typedef struct STableCheckInfo {
int32_t compSize;
int32_t numOfBlocks; // number of qualified data blocks not the original blocks
SDataCols* pDataCols;
int32_t chosen; // indicate which iterator should move forward
bool initBuf; // whether to initialize the in-memory skip list iterator or not
SSkipListIterator* iter; // mem buffer skip list iterator
SSkipListIterator* iiter; // imem buffer skip list iterator
......@@ -79,8 +82,6 @@ typedef struct STableCheckInfo {
typedef struct STableBlockInfo {
SCompBlock* compBlock;
STableCheckInfo* pTableCheckInfo;
// int32_t blockIndex;
// int32_t groupIdx; /* number of group is less than the total number of tables */
} STableBlockInfo;
typedef struct SBlockOrderSupporter {
......@@ -120,7 +121,7 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
SArray* sa);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
STsdbQueryHandle* pQueryHandle);
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
......@@ -134,7 +135,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
pCompBlockLoadInfo->fileId = -1;
}
TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList) {
TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) {
// todo 1. filter not exist table
// todo 2. add the reference count for each table that is involved in query
......@@ -147,6 +148,7 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
pQueryHandle->checkFiles = true;//ASCENDING_TRAVERSE(pQueryHandle->order);
pQueryHandle->activeIndex = 0; // current active table index
pQueryHandle->qinfo = qinfo;
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);
......@@ -201,8 +203,8 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
return (TsdbQueryHandleT) pQueryHandle;
}
TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList);
TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
pQueryHandle->order = TSDB_ORDER_DESC;
......@@ -227,8 +229,8 @@ SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle) {
return res;
}
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList);
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
// pQueryHandle->outputCapacity = 2; // only allowed two rows to be loaded
......@@ -303,6 +305,83 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
return true;
}
SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) {
SDataRow rmem = NULL, rimem = NULL;
if (pCheckInfo->iter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
if (node != NULL) {
rmem = SL_GET_NODE_DATA(node);
}
}
if (pCheckInfo->iiter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
if (node != NULL) {
rimem = SL_GET_NODE_DATA(node);
}
}
if (rmem != NULL && rimem != NULL) {
if (dataRowKey(rmem) < dataRowKey(rimem)) {
pCheckInfo->chosen = 0;
return rmem;
} else if (dataRowKey(rmem) == dataRowKey(rimem)) {
// data ts are duplicated, ignore the data in mem
tSkipListIterNext(pCheckInfo->iter);
pCheckInfo->chosen = 1;
return rimem;
} else {
pCheckInfo->chosen = 1;
return rimem;
}
}
if (rmem != NULL) {
pCheckInfo->chosen = 0;
return rmem;
}
if (rimem != NULL) {
pCheckInfo->chosen = 1;
return rimem;
}
return NULL;
}
bool moveToNextRow(STableCheckInfo* pCheckInfo) {
bool hasNext = false;
if (pCheckInfo->chosen == 0) {
if (pCheckInfo->iter != NULL) {
hasNext = tSkipListIterNext(pCheckInfo->iter);
}
if (hasNext) {
return hasNext;
}
if (pCheckInfo->iiter != NULL) {
return tSkipListIterGet(pCheckInfo->iiter) != NULL;
}
} else {
if (pCheckInfo->chosen == 1) {
if (pCheckInfo->iiter != NULL) {
hasNext = tSkipListIterNext(pCheckInfo->iiter);
}
if (hasNext) {
return hasNext;
}
if (pCheckInfo->iter != NULL) {
return tSkipListIterGet(pCheckInfo->iter) != NULL;
}
}
}
return hasNext;
}
static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
......@@ -312,31 +391,13 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
STable* pTable = pCheckInfo->pTableObj;
assert(pTable != NULL);
// no data in cache, abort
if (pTable->mem == NULL && pTable->imem == NULL) {
return false;
}
if (pCheckInfo->iter == NULL && pTable->mem) {
pCheckInfo->iter = tSkipListCreateIterFromVal(pTable->mem->pData, (const char*) &pCheckInfo->lastKey,
TSDB_DATA_TYPE_TIMESTAMP, pHandle->order);
if (pCheckInfo->iter == NULL) {
return false;
}
if (!tSkipListIterNext(pCheckInfo->iter)) { // buffer is empty
return false;
}
}
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
if (node == NULL) {
initTableMemIterator(pHandle, pCheckInfo);
SDataRow row = getSDataRowInTableMem(pCheckInfo);
if (row == NULL) {
return false;
}
SDataRow row = SL_GET_NODE_DATA(node);
pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer
tsdbTrace("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %p", pHandle,
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo);
......@@ -349,7 +410,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
int32_t step = ASCENDING_TRAVERSE(pHandle->order)? 1:-1;
STimeWindow* win = &pHandle->cur.win;
pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey,
pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey,
pHandle->outputCapacity, &win->skey, &win->ekey, pHandle); // todo refactor API
// update the last key value
......@@ -382,7 +443,7 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio
return fid;
}
static int32_t binarySearchForBlockImpl(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
int32_t firstSlot = 0;
int32_t lastSlot = numOfBlocks - 1;
......@@ -448,7 +509,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
TSKEY e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);
// discard the unqualified data block based on the query time window
int32_t start = binarySearchForBlockImpl(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
int32_t end = start;
if (s > pCompInfo->blocks[start].keyLast) {
......@@ -522,7 +583,9 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
bool blockLoaded = false;
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
int64_t st = taosGetTimestampUs();
if (pCheckInfo->pDataCols == NULL) {
STsdbMeta* pMeta = tsdbGetMeta(pRepo);
pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
......@@ -540,9 +603,15 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
blockLoaded = true;
}
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
assert(pCols->numOfRows != 0);
taosArrayDestroy(sa);
tfree(data);
int64_t et = taosGetTimestampUs() - st;
tsdbTrace("%p load file block into buffer, elapsed time:%"PRId64 " us", pQueryHandle, et);
return blockLoaded;
}
......@@ -600,7 +669,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
// do not load file block into buffer
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
cur->rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, binfo.window.skey - step,
cur->rows = tsdbReadRowsFromCache(pCheckInfo, binfo.window.skey - step,
pQueryHandle->outputCapacity, &cur->win.skey, &cur->win.ekey, pQueryHandle);
pQueryHandle->realNumOfRows = cur->rows;
......@@ -649,7 +718,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
return false;
}
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
......@@ -1155,7 +1224,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
*numOfAllocBlocks = numOfBlocks;
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
SBlockOrderSupporter sup = {0};
sup.numOfTables = numOfTables;
sup.numOfBlocksPerTable = calloc(1, sizeof(int32_t) * numOfTables);
......@@ -1192,15 +1261,26 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
pBlockInfo->compBlock = &pBlock[k];
pBlockInfo->pTableCheckInfo = pTableCheck;
// pBlockInfo->groupIdx = pTableCheckInfo[j]->groupIdx; // set the group index
// pBlockInfo->blockIndex = pTableCheckInfo[j]->start + k; // set the block index in original table
cnt++;
}
numOfQualTables++;
}
tsdbTrace("%p create data blocks info struct completed, %d blocks in %d tables", pQueryHandle, cnt, numOfQualTables);
assert(numOfBlocks == cnt);
// since there is only one table qualified, blocks are not sorted
if (numOfQualTables == 1) {
memcpy(pQueryHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks);
cleanBlockOrderSupporter(&sup, numOfQualTables);
tsdbTrace("%p create data blocks info struct completed for 1 table, %d blocks not sorted %p ", pQueryHandle, cnt,
pQueryHandle->qinfo);
return TSDB_CODE_SUCCESS;
}
tsdbTrace("%p create data blocks info struct completed, %d blocks in %d tables %p", pQueryHandle, cnt,
numOfQualTables, pQueryHandle->qinfo);
assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0
sup.numOfTables = numOfQualTables;
......@@ -1257,8 +1337,8 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) {
break;
}
tsdbTrace("%p %d blocks found in file for %d table(s), fid:%d", pQueryHandle, numOfBlocks,
numOfTables, pQueryHandle->pFileGroup->fileId);
tsdbTrace("%p %d blocks found in file for %d table(s), fid:%d, %p", pQueryHandle, numOfBlocks,
numOfTables, pQueryHandle->pFileGroup->fileId, pQueryHandle->qinfo);
assert(numOfBlocks >= 0);
if (numOfBlocks == 0) {
......@@ -1565,19 +1645,22 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
pQueryHandle->window = (STimeWindow) {info.lastKey, TSKEY_INITIAL_VAL};
}
static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
STsdbQueryHandle* pQueryHandle) {
int numOfRows = 0;
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
*skey = TSKEY_INITIAL_VAL;
int64_t st = taosGetTimestampUs();
STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj);
int32_t numOfTableCols = schemaNCols(pSchema);
do {
SSkipListNode* node = tSkipListIterGet(pIter);
if (node == NULL) {
SDataRow row = getSDataRowInTableMem(pCheckInfo);
if (row == NULL) {
break;
}
SDataRow row = SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row);
if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
......@@ -1590,49 +1673,69 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY
}
if (*skey == INT64_MIN) {
*skey = dataRowKey(row);
*skey = key;
}
*ekey = dataRowKey(row);
int32_t offset = -1;
*ekey = key;
char* pData = NULL;
STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pTable);
int32_t numOfTableCols = schemaNCols(pSchema);
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t i = 0, j = 0;
while(i < numOfCols && j < numOfTableCols) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (pSchema->columns[j].colId < pColInfo->info.colId) {
j++;
continue;
}
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes;
}
for(int32_t j = 0; j < numOfTableCols; ++j) {
if (pColInfo->info.colId == pSchema->columns[j].colId) {
offset = pSchema->columns[j].offset;
break;
if (pSchema->columns[j].colId == pColInfo->info.colId) {
void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
memcpy(pData, value, varDataTLen(value));
} else {
memcpy(pData, value, pColInfo->info.bytes);
}
j++;
i++;
} else { // pColInfo->info.colId < pSchema->columns[j].colId, it is a NULL data
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type);
} else {
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
}
i++;
}
assert(offset != -1); // todo handle error
void *value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + offset);
}
while (i < numOfCols) { // the remain columns are all null data
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes;
}
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
memcpy(pData, value, varDataTLen(value));
setVardataNull(pData, pColInfo->info.type);
} else {
memcpy(pData, value, pColInfo->info.bytes);
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
}
i++;
}
if (++numOfRows >= maxRowsToRead) {
tSkipListIterNext(pIter);
moveToNextRow(pCheckInfo);
break;
}
} while(tSkipListIterNext(pIter));
} while(moveToNextRow(pCheckInfo));
assert(numOfRows <= maxRowsToRead);
......@@ -1646,6 +1749,10 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY
}
}
int64_t elapsedTime = taosGetTimestampUs() - st;
tsdbTrace("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d", pQueryHandle,
elapsedTime, numOfRows, numOfCols);
return numOfRows;
}
......
......@@ -153,6 +153,8 @@ bool taosMbsToUcs4(char *mbs, size_t mbs_len, char *ucs4, int32_t ucs4_max_len,
int tasoUcs4Compare(void* f1_ucs4, void *f2_ucs4, int bytes);
void taosRandStr(char* str, int32_t size);
int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs);
bool taosValidateEncodec(const char *encodec);
......
......@@ -92,12 +92,14 @@ void* taosArrayGet(const SArray* pArray, size_t index) {
}
void* taosArrayGetP(const SArray* pArray, size_t index) {
void* ret = taosArrayGet(pArray, index);
if (ret == NULL) {
assert(index < pArray->size);
void* d = TARRAY_GET_ELEM(pArray, index);
if (d == NULL) {
return NULL;
}
return *(void**)ret;
return *(void**)d;
}
size_t taosArrayGetSize(const SArray* pArray) { return pArray->size; }
......
......@@ -27,8 +27,6 @@
#include "tulog.h"
#include "taoserror.h"
int32_t tmpFileSerialNum = 0;
int32_t strdequote(char *z) {
if (z == NULL) {
return 0;
......@@ -433,12 +431,24 @@ void getTmpfilePath(const char *fileNamePrefix, char *dstPath) {
#else
char *tmpDir = "/tmp/";
#endif
int64_t ts = taosGetTimestampUs();
strcpy(tmpPath, tmpDir);
strcat(tmpPath, tdengineTmpFileNamePrefix);
strcat(tmpPath, fileNamePrefix);
strcat(tmpPath, "-%d-%"PRIu64"-%u-%"PRIu64);
snprintf(dstPath, PATH_MAX, tmpPath, getpid(), taosGetPthreadId(), atomic_add_fetch_32(&tmpFileSerialNum, 1), ts);
strcat(tmpPath, "-%d-%s");
char rand[8] = {0};
taosRandStr(rand, tListLen(rand) - 1);
snprintf(dstPath, PATH_MAX, tmpPath, getpid(), rand);
}
void taosRandStr(char* str, int32_t size) {
const char* set = "abcdefghijklmnopqrstuvwxyz0123456789-_.";
int32_t len = 39;
for(int32_t i = 0; i < size; ++i) {
str[i] = set[rand()%len];
}
}
int tasoUcs4Compare(void* f1_ucs4, void *f2_ucs4, int bytes) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册