未验证 提交 5119c4d8 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #1896 from taosdata/feature/query

Feature/query
...@@ -116,7 +116,7 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF ...@@ -116,7 +116,7 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF
* create local reducer to launch the second-stage reduce process at client site * create local reducer to launch the second-stage reduce process at client site
*/ */
void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SColumnModel *finalModel, SSqlCmd *pSqlCmd, SSqlRes *pRes); SColumnModel *finalModel, SSqlObj* pSql);
void tscDestroyLocalReducer(SSqlObj *pSql); void tscDestroyLocalReducer(SSqlObj *pSql);
......
...@@ -183,7 +183,7 @@ void tscSqlExprInfoDestroy(SArray* pExprInfo); ...@@ -183,7 +183,7 @@ void tscSqlExprInfoDestroy(SArray* pExprInfo);
SColumn* tscColumnClone(const SColumn* src); SColumn* tscColumnClone(const SColumn* src);
SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex); SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex);
void tscColumnListCopy(SArray* dst, const SArray* src, int16_t tableIndex); SArray* tscColumnListClone(const SArray* src, int16_t tableIndex);
void tscColumnListDestroy(SArray* pColList); void tscColumnListDestroy(SArray* pColList);
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters); SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters);
......
...@@ -2904,7 +2904,11 @@ static void leastsquares_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2904,7 +2904,11 @@ static void leastsquares_finalizer(SQLFunctionCtx *pCtx) {
param[1][2] /= param[1][1]; param[1][2] /= param[1][1];
sprintf(pCtx->aOutputBuf, "(%lf, %lf)", param[0][2], param[1][2]); int32_t maxOutputSize = TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE - VARSTR_HEADER_SIZE;
size_t n = snprintf(varDataVal(pCtx->aOutputBuf), maxOutputSize, "{slop:%.6lf, intercept:%.6lf}",
param[0][2], param[1][2]);
varDataSetLen(pCtx->aOutputBuf, n);
doFinalizer(pCtx); doFinalizer(pCtx);
} }
......
...@@ -1297,10 +1297,6 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t c ...@@ -1297,10 +1297,6 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t c
pSchema->bytes, functionId == TSDB_FUNC_TAGPRJ); pSchema->bytes, functionId == TSDB_FUNC_TAGPRJ);
} }
void addRequiredTagColumn(STableMetaInfo* pTableMetaInfo, SColumnIndex* index) {
}
static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumnIndex* pIndex, tSQLExprItem* pItem) { static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumnIndex* pIndex, tSQLExprItem* pItem) {
SSqlExpr* pExpr = doAddProjectCol(pQueryInfo, startPos, pIndex->columnIndex, pIndex->tableIndex); SSqlExpr* pExpr = doAddProjectCol(pQueryInfo, startPos, pIndex->columnIndex, pIndex->tableIndex);
...@@ -3796,6 +3792,8 @@ static int32_t getTagQueryCondExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr, ...@@ -3796,6 +3792,8 @@ static int32_t getTagQueryCondExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr,
tSQLExprDestroy(p1); tSQLExprDestroy(p1);
tExprTreeDestroy(&p, NULL); tExprTreeDestroy(&p, NULL);
taosArrayDestroy(colList);
} }
pCondExpr->pTagCond = NULL; pCondExpr->pTagCond = NULL;
......
...@@ -55,7 +55,7 @@ int32_t treeComparator(const void *pLeft, const void *pRight, void *param) { ...@@ -55,7 +55,7 @@ int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
} }
} }
static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pReducer, tOrderDescriptor *pDesc) { static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDescriptor *pDesc) {
/* /*
* the fields and offset attributes in pCmd and pModel may be different due to * the fields and offset attributes in pCmd and pModel may be different due to
* merge requirement. So, the final result in pRes structure is formatted in accordance with the pCmd object. * merge requirement. So, the final result in pRes structure is formatted in accordance with the pCmd object.
...@@ -96,13 +96,13 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu ...@@ -96,13 +96,13 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
pCtx->ptsOutputBuf = pReducer->pCtx[0].aOutputBuf; pCtx->ptsOutputBuf = pReducer->pCtx[0].aOutputBuf;
pCtx->param[2].i64Key = pQueryInfo->order.order; pCtx->param[2].i64Key = pQueryInfo->order.order;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
pCtx->param[1].i64Key = pQueryInfo->order.orderColId; pCtx->param[1].i64Key = pQueryInfo->order.orderColId;
} }
SResultInfo *pResInfo = &pReducer->pResInfo[i]; SResultInfo *pResInfo = &pReducer->pResInfo[i];
pResInfo->bufLen = pExpr->interBytes; pResInfo->bufLen = pExpr->interBytes;
pResInfo->interResultBuf = calloc(1, (size_t)pResInfo->bufLen); pResInfo->interResultBuf = calloc(1, (size_t) pResInfo->bufLen);
pCtx->resultInfo = &pReducer->pResInfo[i]; pCtx->resultInfo = &pReducer->pResInfo[i];
pCtx->resultInfo->superTableQ = true; pCtx->resultInfo->superTableQ = true;
...@@ -132,16 +132,15 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu ...@@ -132,16 +132,15 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu
} }
} }
/*
* todo release allocated memory process with async process
*/
void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SColumnModel *finalmodel, SSqlCmd *pCmd, SSqlRes *pRes) { SColumnModel *finalmodel, SSqlObj* pSql) {
// offset of cmd in SSqlObj structure SSqlCmd* pCmd = &pSql->cmd;
char *pSqlObjAddr = (char *)pCmd - offsetof(SSqlObj, cmd); SSqlRes* pRes = &pSql->res;
if (pMemBuffer == NULL) { if (pMemBuffer == NULL) {
tscError("%p pMemBuffer", pMemBuffer); tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscError("%p pMemBuffer is NULL", pMemBuffer);
pRes->code = TSDB_CODE_APP_ERROR; pRes->code = TSDB_CODE_APP_ERROR;
return; return;
} }
...@@ -149,7 +148,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -149,7 +148,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (pDesc->pColumnModel == NULL) { if (pDesc->pColumnModel == NULL) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscError("%p no local buffer or intermediate result format model", pSqlObjAddr); tscError("%p no local buffer or intermediate result format model", pSql);
pRes->code = TSDB_CODE_APP_ERROR; pRes->code = TSDB_CODE_APP_ERROR;
return; return;
} }
...@@ -158,7 +157,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -158,7 +157,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
for (int32_t i = 0; i < numOfBuffer; ++i) { for (int32_t i = 0; i < numOfBuffer; ++i) {
int32_t len = pMemBuffer[i]->fileMeta.flushoutData.nLength; int32_t len = pMemBuffer[i]->fileMeta.flushoutData.nLength;
if (len == 0) { if (len == 0) {
tscTrace("%p no data retrieved from orderOfVnode:%d", pSqlObjAddr, i + 1); tscTrace("%p no data retrieved from orderOfVnode:%d", pSql, i + 1);
continue; continue;
} }
...@@ -167,13 +166,13 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -167,13 +166,13 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (numOfFlush == 0 || numOfBuffer == 0) { if (numOfFlush == 0 || numOfBuffer == 0) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscTrace("%p retrieved no data", pSqlObjAddr); tscTrace("%p retrieved no data", pSql);
return; return;
} }
if (pDesc->pColumnModel->capacity >= pMemBuffer[0]->pageSize) { if (pDesc->pColumnModel->capacity >= pMemBuffer[0]->pageSize) {
tscError("%p Invalid value of buffer capacity %d and page size %d ", pSqlObjAddr, pDesc->pColumnModel->capacity, tscError("%p Invalid value of buffer capacity %d and page size %d ", pSql, pDesc->pColumnModel->capacity,
pMemBuffer[0]->pageSize); pMemBuffer[0]->pageSize);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
...@@ -181,10 +180,11 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -181,10 +180,11 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
return; return;
} }
size_t nReducerSize = sizeof(SLocalReducer) + sizeof(void *) * numOfFlush; size_t size = sizeof(SLocalReducer) + POINTER_BYTES * numOfFlush;
SLocalReducer *pReducer = (SLocalReducer *)calloc(1, nReducerSize);
SLocalReducer *pReducer = (SLocalReducer *) calloc(1, size);
if (pReducer == NULL) { if (pReducer == NULL) {
tscError("%p failed to create merge structure", pSqlObjAddr); tscError("%p failed to create local merge structure, out of memory", pSql);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
...@@ -199,48 +199,52 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -199,48 +199,52 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->numOfVnode = numOfBuffer; pReducer->numOfVnode = numOfBuffer;
pReducer->pDesc = pDesc; pReducer->pDesc = pDesc;
tscTrace("%p the number of merged leaves is: %d", pSqlObjAddr, pReducer->numOfBuffer); tscTrace("%p the number of merged leaves is: %d", pSql, pReducer->numOfBuffer);
int32_t idx = 0; int32_t idx = 0;
for (int32_t i = 0; i < numOfBuffer; ++i) { for (int32_t i = 0; i < numOfBuffer; ++i) {
int32_t numOfFlushoutInFile = pMemBuffer[i]->fileMeta.flushoutData.nLength; int32_t numOfFlushoutInFile = pMemBuffer[i]->fileMeta.flushoutData.nLength;
for (int32_t j = 0; j < numOfFlushoutInFile; ++j) { for (int32_t j = 0; j < numOfFlushoutInFile; ++j) {
SLocalDataSource *pDS = (SLocalDataSource *)malloc(sizeof(SLocalDataSource) + pMemBuffer[0]->pageSize); SLocalDataSource *ds = (SLocalDataSource *)malloc(sizeof(SLocalDataSource) + pMemBuffer[0]->pageSize);
if (pDS == NULL) { if (ds == NULL) {
tscError("%p failed to create merge structure", pSqlObjAddr); tscError("%p failed to create merge structure", pSql);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return; return;
} }
pReducer->pLocalDataSrc[idx] = pDS;
pReducer->pLocalDataSrc[idx] = ds;
pDS->pMemBuffer = pMemBuffer[i]; ds->pMemBuffer = pMemBuffer[i];
pDS->flushoutIdx = j; ds->flushoutIdx = j;
pDS->filePage.numOfElems = 0; ds->filePage.numOfElems = 0;
pDS->pageId = 0; ds->pageId = 0;
pDS->rowIdx = 0; ds->rowIdx = 0;
tscTrace("%p load data from disk into memory, orderOfVnode:%d, total:%d", pSqlObjAddr, i + 1, idx + 1); tscTrace("%p load data from disk into memory, orderOfVnode:%d, total:%d", pSql, i + 1, idx + 1);
tExtMemBufferLoadData(pMemBuffer[i], &(pDS->filePage), j, 0); tExtMemBufferLoadData(pMemBuffer[i], &(ds->filePage), j, 0);
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
printf("load data page into mem for build loser tree: %" PRIu64 " rows\n", pDS->filePage.numOfElems); printf("load data page into mem for build loser tree: %" PRIu64 " rows\n", ds->filePage.numOfElems);
SSrcColumnInfo colInfo[256] = {0}; SSrcColumnInfo colInfo[256] = {0};
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tscGetSrcColumnInfo(colInfo, pQueryInfo); tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pColumnModel, pDS->filePage.data, pDS->filePage.numOfElems, tColModelDisplayEx(pDesc->pColumnModel, ds->filePage.data, ds->filePage.numOfElems,
pMemBuffer[0]->numOfElemsPerPage, colInfo); pMemBuffer[0]->numOfElemsPerPage, colInfo);
#endif #endif
if (pDS->filePage.numOfElems == 0) { // no data in this flush
tscTrace("%p flush data is empty, ignore %d flush record", pSqlObjAddr, idx); if (ds->filePage.numOfElems == 0) { // no data in this flush, the index does not increase
tfree(pDS); tscTrace("%p flush data is empty, ignore %d flush record", pSql, idx);
tfree(ds);
continue; continue;
} }
idx += 1; idx += 1;
} }
} }
assert(idx >= pReducer->numOfBuffer);
// no data actually, no need to merge result.
if (idx == 0) { if (idx == 0) {
return; return;
} }
...@@ -262,9 +266,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -262,9 +266,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
// the input data format follows the old format, but output in a new format. // the input data format follows the old format, but output in a new format.
// so, all the input must be parsed as old format // so, all the input must be parsed as old format
size_t size = tscSqlExprNumOfExprs(pQueryInfo); pReducer->pCtx = (SQLFunctionCtx *)calloc(tscSqlExprNumOfExprs(pQueryInfo), sizeof(SQLFunctionCtx));
pReducer->pCtx = (SQLFunctionCtx *)calloc(size, sizeof(SQLFunctionCtx));
pReducer->rowSize = pMemBuffer[0]->nElemSize; pReducer->rowSize = pMemBuffer[0]->nElemSize;
tscRestoreSQLFuncForSTableQuery(pQueryInfo); tscRestoreSQLFuncForSTableQuery(pQueryInfo);
...@@ -313,7 +315,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -313,7 +315,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->pResInfo = calloc(size, sizeof(SResultInfo)); pReducer->pResInfo = calloc(size, sizeof(SResultInfo));
tscCreateResPointerInfo(pRes, pQueryInfo); tscCreateResPointerInfo(pRes, pQueryInfo);
tscInitSqlContext(pCmd, pRes, pReducer, pDesc); tscInitSqlContext(pCmd, pReducer, pDesc);
// we change the capacity of schema to denote that there is only one row in temp buffer // we change the capacity of schema to denote that there is only one row in temp buffer
pReducer->pDesc->pColumnModel->capacity = 1; pReducer->pDesc->pColumnModel->capacity = 1;
...@@ -428,8 +430,7 @@ int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePa ...@@ -428,8 +430,7 @@ int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePa
tColModelAppend(pModel, pPage, data, numOfRows - remain, numOfWriteElems, numOfRows); tColModelAppend(pModel, pPage, data, numOfRows - remain, numOfWriteElems, numOfRows);
if (pPage->numOfElems == pModel->capacity) { if (pPage->numOfElems == pModel->capacity) {
int32_t ret = tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType); if (tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType) != TSDB_CODE_SUCCESS) {
if (ret != 0) {
return -1; return -1;
} }
} else { } else {
......
...@@ -550,8 +550,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -550,8 +550,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
/* /*
* for meter query, simply return the size <= 1k * for table query, simply return the size <= 1k
* for metric query, estimate size according to meter tags
*/ */
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5; const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
...@@ -562,15 +561,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { ...@@ -562,15 +561,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
int32_t exprSize = sizeof(SSqlFuncMsg) * numOfExprs; int32_t exprSize = sizeof(SSqlFuncMsg) * numOfExprs;
//STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096;
// table query without tags values
//if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096;
//}
//int32_t size = 4096;
//return size;
} }
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) { static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
......
...@@ -1533,8 +1533,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p ...@@ -1533,8 +1533,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
tscClearInterpInfo(pPQueryInfo); tscClearInterpInfo(pPQueryInfo);
tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel, tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel, pPObj);
&pPObj->cmd, &pPObj->res);
tscTrace("%p build loser tree completed", pPObj); tscTrace("%p build loser tree completed", pPObj);
pPObj->res.precision = pSql->res.precision; pPObj->res.precision = pSql->res.precision;
......
...@@ -1209,18 +1209,18 @@ void tscColumnListCopy(SArray* dst, const SArray* src, int16_t tableIndex) { ...@@ -1209,18 +1209,18 @@ void tscColumnListCopy(SArray* dst, const SArray* src, int16_t tableIndex) {
} }
} }
void tscColumnListDestroy(SArray* pColumnBaseInfo) { void tscColumnListDestroy(SArray* pColumnList) {
if (pColumnBaseInfo == NULL) { if (pColumnList == NULL) {
return; return;
} }
size_t num = taosArrayGetSize(pColumnBaseInfo); size_t num = taosArrayGetSize(pColumnList);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SColumn* pCol = taosArrayGetP(pColumnBaseInfo, i); SColumn* pCol = taosArrayGetP(pColumnList, i);
tscColumnDestroy(pCol); tscColumnDestroy(pCol);
} }
taosArrayDestroy(pColumnBaseInfo); taosArrayDestroy(pColumnList);
} }
/* /*
...@@ -1572,7 +1572,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) { ...@@ -1572,7 +1572,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
assert(pQueryInfo->exprList == NULL); assert(pQueryInfo->exprList == NULL);
pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES); pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
} }
int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) { int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) {
...@@ -1666,8 +1666,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST ...@@ -1666,8 +1666,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
assert(pTableMetaInfo != NULL); assert(pTableMetaInfo != NULL);
if (name != NULL) { if (name != NULL) {
assert(strlen(name) <= TSDB_TABLE_ID_LEN); strncpy(pTableMetaInfo->name, name, TSDB_TABLE_ID_LEN);
strcpy(pTableMetaInfo->name, name);
} }
pTableMetaInfo->pTableMeta = pTableMeta; pTableMetaInfo->pTableMeta = pTableMeta;
...@@ -1678,10 +1677,9 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST ...@@ -1678,10 +1677,9 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
memcpy(pTableMetaInfo->vgroupList, vgroupList, size); memcpy(pTableMetaInfo->vgroupList, vgroupList, size);
} }
if (pTagCols == NULL) { pTableMetaInfo->tagColList = taosArrayInit(4, POINTER_BYTES);
pTableMetaInfo->tagColList = taosArrayInit(4, POINTER_BYTES); if (pTagCols != NULL) {
} else { tscColumnListCopy(pTableMetaInfo->tagColList, pTagCols, -1);
pTableMetaInfo->tagColList = taosArrayClone(pTagCols);
} }
pQueryInfo->numOfTables += 1; pQueryInfo->numOfTables += 1;
...@@ -1701,6 +1699,12 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) ...@@ -1701,6 +1699,12 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache)
tfree(pTableMetaInfo->vgroupList); tfree(pTableMetaInfo->vgroupList);
if (pTableMetaInfo->tagColList != NULL) { if (pTableMetaInfo->tagColList != NULL) {
size_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);
for(int32_t i = 0; i < numOfTags; ++i) { // todo do NOT use the allocated object
SColumn* pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
tfree(pCol);
}
taosArrayDestroy(pTableMetaInfo->tagColList); taosArrayDestroy(pTableMetaInfo->tagColList);
pTableMetaInfo->tagColList = NULL; pTableMetaInfo->tagColList = NULL;
} }
...@@ -1788,7 +1792,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1788,7 +1792,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
tscFreeSqlObj(pNew); tscFreeSqlObj(pNew);
return NULL; return NULL;
} }
tscColumnListCopy(pNewQueryInfo->colList, pQueryInfo->colList, (int16_t)tableIndex); tscColumnListCopy(pNewQueryInfo->colList, pQueryInfo->colList, (int16_t)tableIndex);
// set the correct query type // set the correct query type
...@@ -1847,7 +1851,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1847,7 +1851,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
if (pPrevSql == NULL) { if (pPrevSql == NULL) {
STableMeta* pTableMeta = taosCacheAcquireByName(tscCacheHandle, name); STableMeta* pTableMeta = taosCacheAcquireByName(tscCacheHandle, name);
// todo handle error
assert(pTableMeta != NULL);
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->tagColList); pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->tagColList);
} else { // transfer the ownership of pTableMeta to the newly create sql object. } else { // transfer the ownership of pTableMeta to the newly create sql object.
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0); STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
......
...@@ -23,4 +23,5 @@ void extractTableName(const char *tableId, char *name); ...@@ -23,4 +23,5 @@ void extractTableName(const char *tableId, char *name);
char* extractDBName(const char *tableId, char *name); char* extractDBName(const char *tableId, char *name);
#endif // TDENGINE_NAME_H #endif // TDENGINE_NAME_H
...@@ -32,6 +32,9 @@ extern "C" { ...@@ -32,6 +32,9 @@ extern "C" {
#define TSKEY int64_t #define TSKEY int64_t
#endif #endif
#define TSWINDOW_INITIALIZER {INT64_MIN, INT64_MAX};
#define TSKEY_INITIAL_VAL INT64_MIN
// ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR // ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR
typedef int32_t VarDataOffsetT; typedef int32_t VarDataOffsetT;
typedef int16_t VarDataLenT; typedef int16_t VarDataLenT;
...@@ -341,8 +344,6 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); ...@@ -341,8 +344,6 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_MAX_DBS 100 #define TSDB_MAX_DBS 100
#define TSDB_MAX_VGROUPS 1000 #define TSDB_MAX_VGROUPS 1000
#define TSDB_MAX_SUPER_TABLES 100 #define TSDB_MAX_SUPER_TABLES 100
#define TSDB_MAX_NORMAL_TABLES 1000
#define TSDB_MAX_CHILD_TABLES 100000
#define TSDB_PORT_DNODESHELL 0 #define TSDB_PORT_DNODESHELL 0
#define TSDB_PORT_DNODEDNODE 5 #define TSDB_PORT_DNODEDNODE 5
......
...@@ -627,7 +627,6 @@ typedef struct { ...@@ -627,7 +627,6 @@ typedef struct {
typedef struct STableMetaMsg { typedef struct STableMetaMsg {
int32_t contLen; int32_t contLen;
char tableId[TSDB_TABLE_ID_LEN + 1]; // table id char tableId[TSDB_TABLE_ID_LEN + 1]; // table id
char stableId[TSDB_TABLE_ID_LEN + 1]; // stable name if it is created according to super table
uint8_t numOfTags; uint8_t numOfTags;
uint8_t precision; uint8_t precision;
uint8_t tableType; uint8_t tableType;
......
...@@ -1203,8 +1203,10 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) { ...@@ -1203,8 +1203,10 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) {
static int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { static int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) {
int32_t numOfCols = pTable->numOfColumns + pTable->numOfTags; int32_t numOfCols = pTable->numOfColumns + pTable->numOfTags;
assert(numOfCols <= TSDB_MAX_COLUMNS);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
strncpy(pSchema->name, pTable->schema[i].name, TSDB_TABLE_ID_LEN); strncpy(pSchema->name, pTable->schema[i].name, TSDB_COL_NAME_LEN);
pSchema->type = pTable->schema[i].type; pSchema->type = pTable->schema[i].type;
pSchema->bytes = htons(pTable->schema[i].bytes); pSchema->bytes = htons(pTable->schema[i].bytes);
pSchema->colId = htons(pTable->schema[i].colId); pSchema->colId = htons(pTable->schema[i].colId);
...@@ -1675,7 +1677,6 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { ...@@ -1675,7 +1677,6 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
pMeta->numOfTags = (int8_t)pTable->superTable->numOfTags; pMeta->numOfTags = (int8_t)pTable->superTable->numOfTags;
pMeta->numOfColumns = htons((int16_t)pTable->superTable->numOfColumns); pMeta->numOfColumns = htons((int16_t)pTable->superTable->numOfColumns);
pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable->superTable); pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable->superTable);
strncpy(pMeta->stableId, pTable->superTable->info.tableId, tListLen(pMeta->stableId));
} else { } else {
pMeta->sversion = htons(pTable->sversion); pMeta->sversion = htons(pTable->sversion);
pMeta->numOfTags = 0; pMeta->numOfTags = 0;
......
...@@ -120,12 +120,6 @@ typedef struct tExtMemBuffer { ...@@ -120,12 +120,6 @@ typedef struct tExtMemBuffer {
EXT_BUFFER_FLUSH_MODEL flushModel; EXT_BUFFER_FLUSH_MODEL flushModel;
} tExtMemBuffer; } tExtMemBuffer;
//typedef struct tTagSchema {
// struct SSchema *pSchema;
// int32_t numOfCols;
// int32_t colOffset[];
//} tTagSchema;
/** /**
* *
* @param inMemSize * @param inMemSize
......
...@@ -507,7 +507,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t ...@@ -507,7 +507,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
w.ekey = pQuery->window.ekey; w.ekey = pQuery->window.ekey;
} }
assert(ts >= w.skey && ts <= w.ekey && w.skey != 0); assert(ts >= w.skey && ts <= w.ekey);
return w; return w;
} }
...@@ -624,7 +624,7 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, ...@@ -624,7 +624,7 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL); setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL);
} else { // set the current index to be the last unclosed window } else { // set the current index to be the last unclosed window
int32_t i = 0; int32_t i = 0;
int64_t skey = 0; int64_t skey = TSKEY_INITIAL_VAL;
for (i = 0; i < pWindowResInfo->size; ++i) { for (i = 0; i < pWindowResInfo->size; ++i) {
SWindowResult *pResult = &pWindowResInfo->pResult[i]; SWindowResult *pResult = &pWindowResInfo->pResult[i];
...@@ -642,7 +642,7 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, ...@@ -642,7 +642,7 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
} }
// all windows are closed, set the last one to be the skey // all windows are closed, set the last one to be the skey
if (skey == 0) { if (skey == TSKEY_INITIAL_VAL) {
assert(i == pWindowResInfo->size); assert(i == pWindowResInfo->size);
pWindowResInfo->curIndex = pWindowResInfo->size - 1; pWindowResInfo->curIndex = pWindowResInfo->size - 1;
} else { } else {
...@@ -660,7 +660,7 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, ...@@ -660,7 +660,7 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
qTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size, n); qTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size, n);
} }
assert(pWindowResInfo->prevSKey != 0); assert(pWindowResInfo->prevSKey != TSKEY_INITIAL_VAL);
} }
static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn, static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn,
...@@ -3080,6 +3080,9 @@ void disableFuncInReverseScan(SQInfo *pQInfo) { ...@@ -3080,6 +3080,9 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
int32_t functId = pQuery->pSelectExpr[j].base.functionId; int32_t functId = pQuery->pSelectExpr[j].base.functionId;
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[j]; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[j];
if (pCtx->resultInfo == NULL) {
continue; // resultInfo is NULL, means no data checked in previous scan
}
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) || if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) ||
((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) { ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) {
...@@ -3590,7 +3593,6 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) { ...@@ -3590,7 +3593,6 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
if (pTableQueryInfo->queryRangeSet) { if (pTableQueryInfo->queryRangeSet) {
pTableQueryInfo->lastKey = key; pTableQueryInfo->lastKey = key;
} else { } else {
// pQuery->window.skey = key;
pTableQueryInfo->win.skey = key; pTableQueryInfo->win.skey = key;
STimeWindow win = {.skey = key, .ekey = pQuery->window.ekey}; STimeWindow win = {.skey = key, .ekey = pQuery->window.ekey};
...@@ -3613,18 +3615,16 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) { ...@@ -3613,18 +3615,16 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
getAlignQueryTimeWindow(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &w); getAlignQueryTimeWindow(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &w);
pWindowResInfo->startTime = pTableQueryInfo->win.skey; // windowSKey may be 0 in case of 1970 timestamp pWindowResInfo->startTime = pTableQueryInfo->win.skey; // windowSKey may be 0 in case of 1970 timestamp
if (pWindowResInfo->prevSKey == 0) { if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
if (QUERY_IS_ASC_QUERY(pQuery)) { if (!QUERY_IS_ASC_QUERY(pQuery)) {
pWindowResInfo->prevSKey = w.skey;
} else {
assert(win.ekey == pQuery->window.skey); assert(win.ekey == pQuery->window.skey);
pWindowResInfo->prevSKey = w.skey;
} }
pWindowResInfo->prevSKey = w.skey;
} }
pTableQueryInfo->queryRangeSet = 1; pTableQueryInfo->queryRangeSet = 1;
pTableQueryInfo->lastKey = pTableQueryInfo->win.skey; pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
pTableQueryInfo->win.skey = pTableQueryInfo->win.skey;
} }
} }
...@@ -4057,10 +4057,11 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -4057,10 +4057,11 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) {
* pQuery->limit.offset times. Since hole exists, pQuery->intervalTime*pQuery->limit.offset value is * pQuery->limit.offset times. Since hole exists, pQuery->intervalTime*pQuery->limit.offset value is
* not valid. otherwise, we only forward pQuery->limit.offset number of points * not valid. otherwise, we only forward pQuery->limit.offset number of points
*/ */
assert(pRuntimeEnv->windowResInfo.prevSKey == 0); assert(pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL);
TSKEY skey1, ekey1; TSKEY skey1, ekey1;
STimeWindow w = {0}; STimeWindow w = TSWINDOW_INITIALIZER;
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
STableQueryInfo *pTableQueryInfo = pQuery->current; STableQueryInfo *pTableQueryInfo = pQuery->current;
...@@ -4730,7 +4731,7 @@ static void doRestoreContext(SQInfo *pQInfo) { ...@@ -4730,7 +4731,7 @@ static void doRestoreContext(SQInfo *pQInfo) {
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
if (pRuntimeEnv->pTSBuf != NULL) { if (pRuntimeEnv->pTSBuf != NULL) {
pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1; SWITCH_ORDER(pRuntimeEnv->pTSBuf->cur.order);
} }
switchCtxOrder(pRuntimeEnv); switchCtxOrder(pRuntimeEnv);
......
...@@ -1347,7 +1347,7 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { ...@@ -1347,7 +1347,7 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
int32_t type = 0; int32_t type = 0;
int32_t bytes = 0; int32_t bytes = 0;
if (colIndex == TSDB_TBNAME_COLUMN_INDEX) { if (colIndex == TSDB_TBNAME_COLUMN_INDEX) { // todo refactor extract method , to queryExecutor to generate tags values
f1 = (char*) pTable1->name; f1 = (char*) pTable1->name;
f2 = (char*) pTable2->name; f2 = (char*) pTable2->name;
type = TSDB_DATA_TYPE_BINARY; type = TSDB_DATA_TYPE_BINARY;
...@@ -1355,7 +1355,8 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { ...@@ -1355,7 +1355,8 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
} else { } else {
STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex); STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex);
bytes = pCol->bytes; bytes = pCol->bytes;
type = pCol->type;
f1 = tdGetRowDataOfCol(pTable1->tagVal, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); f1 = tdGetRowDataOfCol(pTable1->tagVal, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset);
f2 = tdGetRowDataOfCol(pTable2->tagVal, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); f2 = tdGetRowDataOfCol(pTable2->tagVal, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset);
} }
......
...@@ -94,7 +94,7 @@ size_t taosHashGetSize(const SHashObj *pHashObj); ...@@ -94,7 +94,7 @@ size_t taosHashGetSize(const SHashObj *pHashObj);
* @param size * @param size
* @return * @return
*/ */
int32_t taosHashPut(SHashObj *pHashObj, const char *key, size_t keyLen, void *data, size_t size); int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size);
/** /**
* return the payload data with the specified key * return the payload data with the specified key
...@@ -104,7 +104,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const char *key, size_t keyLen, void *da ...@@ -104,7 +104,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const char *key, size_t keyLen, void *da
* @param keyLen * @param keyLen
* @return * @return
*/ */
void *taosHashGet(SHashObj *pHashObj, const char *key, size_t keyLen); void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
/** /**
* remove item with the specified key * remove item with the specified key
...@@ -112,7 +112,7 @@ void *taosHashGet(SHashObj *pHashObj, const char *key, size_t keyLen); ...@@ -112,7 +112,7 @@ void *taosHashGet(SHashObj *pHashObj, const char *key, size_t keyLen);
* @param key * @param key
* @param keyLen * @param keyLen
*/ */
void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen); void taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen);
/** /**
* clean up hash table * clean up hash table
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
#include "hash.h" #include "hash.h"
#include "tulog.h" #include "tulog.h"
#include "ttime.h"
#include "tutil.h" #include "tutil.h"
static FORCE_INLINE void __wr_lock(void *lock) { static FORCE_INLINE void __wr_lock(void *lock) {
...@@ -90,154 +89,65 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { ...@@ -90,154 +89,65 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
/** /**
* inplace update node in hash table * inplace update node in hash table
* @param pHashObj hash table object * @param pHashObj hash table object
* @param pNode data node * @param pNode hash data node
*/ */
static void doUpdateHashTable(SHashObj *pHashObj, SHashNode *pNode) { static void doUpdateHashTable(SHashObj *pHashObj, SHashNode *pNode);
if (pNode->prev1) {
pNode->prev1->next = pNode;
}
if (pNode->next) {
(pNode->next)->prev = pNode;
}
}
/** /**
* get SHashNode from hashlist, nodes from trash are not included. * Get SHashNode from hashlist, nodes from trash are not included.
* @param pHashObj Cache objection * @param pHashObj Cache objection
* @param key key for hash * @param key key for hash
* @param keyLen key length * @param keyLen key length
* @param hashVal hash value by hash function
* @return * @return
*/ */
static SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const char *key, uint32_t keyLen, uint32_t *hashVal) { static SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const void *key, uint32_t keyLen, uint32_t *hashVal);
uint32_t hash = (*pHashObj->hashFp)(key, keyLen);
int32_t slot = HASH_INDEX(hash, pHashObj->capacity);
SHashEntry *pEntry = pHashObj->hashList[slot];
SHashNode *pNode = pEntry->next;
while (pNode) {
if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) {
break;
}
pNode = pNode->next;
}
if (pNode) {
assert(HASH_INDEX(pNode->hashVal, pHashObj->capacity) == slot);
}
// return the calculated hash value, to avoid calculating it again in other functions
if (hashVal != NULL) {
*hashVal = hash;
}
return pNode;
}
/** /**
* resize the hash list if the threshold is reached * Resize the hash list if the threshold is reached
* *
* @param pHashObj * @param pHashObj
*/ */
static void taosHashTableResize(SHashObj *pHashObj) { static void taosHashTableResize(SHashObj *pHashObj);
if (pHashObj->size < pHashObj->capacity * HASH_DEFAULT_LOAD_FACTOR) {
return;
}
// double the original capacity
SHashNode *pNode = NULL;
SHashNode *pNext = NULL;
int32_t newSize = pHashObj->capacity << 1u; /**
if (newSize > HASH_MAX_CAPACITY) { * @param key key of object for hash, usually a null-terminated string
// uTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", * @param keyLen length of key
// pHashObj->capacity, HASH_MAX_CAPACITY); * @param pData actually data. Requires a consecutive memory block, no pointer is allowed in pData.
return; * Pointer copy causes memory access error.
} * @param dsize size of data
* @return SHashNode
// int64_t st = taosGetTimestampUs(); */
static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal);
SHashEntry **pNewEntry = realloc(pHashObj->hashList, sizeof(SHashEntry *) * newSize);
if (pNewEntry == NULL) {
// uTrace("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity);
return;
}
pHashObj->hashList = pNewEntry;
for (int32_t i = pHashObj->capacity; i < newSize; ++i) {
pHashObj->hashList[i] = calloc(1, sizeof(SHashEntry));
}
pHashObj->capacity = newSize;
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
SHashEntry *pEntry = pHashObj->hashList[i];
pNode = pEntry->next;
if (pNode != NULL) {
assert(pNode->prev1 == pEntry && pEntry->num > 0);
}
while (pNode) {
int32_t j = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
if (j == i) { // this key resides in the same slot, no need to relocate it
pNode = pNode->next;
} else {
pNext = pNode->next;
// remove from current slot
assert(pNode->prev1 != NULL);
if (pNode->prev1 == pEntry) { // first node of the overflow linked list
pEntry->next = pNode->next;
} else {
pNode->prev->next = pNode->next;
}
pEntry->num--;
assert(pEntry->num >= 0);
if (pNode->next != NULL) {
(pNode->next)->prev = pNode->prev;
}
// added into new slot
pNode->next = NULL;
pNode->prev1 = NULL;
SHashEntry *pNewIndexEntry = pHashObj->hashList[j];
if (pNewIndexEntry->next != NULL) {
assert(pNewIndexEntry->next->prev1 == pNewIndexEntry);
pNewIndexEntry->next->prev = pNode;
}
pNode->next = pNewIndexEntry->next;
pNode->prev1 = pNewIndexEntry;
pNewIndexEntry->next = pNode;
pNewIndexEntry->num++;
// continue /**
pNode = pNext; * Update the hash node
} *
} * @param pNode hash node
} * @param key key for generate hash value
* @param keyLen key length
* @param pData actual data
* @param dsize size of actual data
* @return hash node
*/
static SHashNode *doUpdateHashNode(SHashNode *pNode, const void *key, size_t keyLen, const void *pData, size_t dsize);
// int64_t et = taosGetTimestampUs(); /**
// uTrace("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pHashObj->capacity, * insert the hash node at the front of the linked list
// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0); *
} * @param pHashObj
* @param pNode
*/
static void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode);
/** /**
* @param capacity maximum slots available for hash elements * Get the next element in hash table for iterator
* @param fn hash function * @param pIter
* @return * @return
*/ */
static SHashNode *getNextHashNode(SHashMutableIterator *pIter);
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) { SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) {
if (capacity == 0 || fn == NULL) { if (capacity == 0 || fn == NULL) {
return NULL; return NULL;
...@@ -285,79 +195,6 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) { ...@@ -285,79 +195,6 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) {
return pHashObj; return pHashObj;
} }
/**
* @param key key of object for hash, usually a null-terminated string
* @param keyLen length of key
* @param pData actually data. required a consecutive memory block, no pointer is allowed
* in pData. Pointer copy causes memory access error.
* @param size size of block
* @return SHashNode
*/
static SHashNode *doCreateHashNode(const char *key, size_t keyLen, const char *pData, size_t dataSize,
uint32_t hashVal) {
size_t totalSize = dataSize + sizeof(SHashNode) + keyLen + 1; // one extra byte for null
SHashNode *pNewNode = calloc(1, totalSize);
if (pNewNode == NULL) {
uError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
memcpy(pNewNode->data, pData, dataSize);
pNewNode->key = pNewNode->data + dataSize;
memcpy(pNewNode->key, key, keyLen);
pNewNode->keyLen = keyLen;
pNewNode->hashVal = hashVal;
return pNewNode;
}
static SHashNode *doUpdateHashNode(SHashNode *pNode, const char *key, size_t keyLen, const char *pData,
size_t dataSize) {
size_t size = dataSize + sizeof(SHashNode) + keyLen;
SHashNode *pNewNode = (SHashNode *)realloc(pNode, size);
if (pNewNode == NULL) {
return NULL;
}
memcpy(pNewNode->data, pData, dataSize);
pNewNode->key = pNewNode->data + dataSize;
assert(memcmp(pNewNode->key, key, keyLen) == 0 && keyLen == pNewNode->keyLen);
memcpy(pNewNode->key, key, keyLen);
return pNewNode;
}
/**
* insert the hash node at the front of the linked list
*
* @param pHashObj
* @param pNode
*/
static void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode) {
assert(pNode != NULL);
int32_t index = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
SHashEntry *pEntry = pHashObj->hashList[index];
pNode->next = pEntry->next;
if (pEntry->next) {
pEntry->next->prev = pNode;
}
pEntry->next = pNode;
pNode->prev1 = pEntry;
pEntry->num++;
pHashObj->size++;
}
size_t taosHashGetSize(const SHashObj *pHashObj) { size_t taosHashGetSize(const SHashObj *pHashObj) {
if (pHashObj == NULL) { if (pHashObj == NULL) {
return 0; return 0;
...@@ -366,12 +203,7 @@ size_t taosHashGetSize(const SHashObj *pHashObj) { ...@@ -366,12 +203,7 @@ size_t taosHashGetSize(const SHashObj *pHashObj) {
return pHashObj->size; return pHashObj->size;
} }
/** int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
* add data node into hash table
* @param pHashObj hash object
* @param pNode hash node
*/
int32_t taosHashPut(SHashObj *pHashObj, const char *key, size_t keyLen, void *data, size_t size) {
__wr_lock(pHashObj->lock); __wr_lock(pHashObj->lock);
uint32_t hashVal = 0; uint32_t hashVal = 0;
...@@ -402,7 +234,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const char *key, size_t keyLen, void *da ...@@ -402,7 +234,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const char *key, size_t keyLen, void *da
return 0; return 0;
} }
void *taosHashGet(SHashObj *pHashObj, const char *key, size_t keyLen) { void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
__rd_lock(pHashObj->lock); __rd_lock(pHashObj->lock);
uint32_t hashVal = 0; uint32_t hashVal = 0;
...@@ -419,12 +251,7 @@ void *taosHashGet(SHashObj *pHashObj, const char *key, size_t keyLen) { ...@@ -419,12 +251,7 @@ void *taosHashGet(SHashObj *pHashObj, const char *key, size_t keyLen) {
} }
} }
/** void taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
* remove node in hash list
* @param pHashObj
* @param pNode
*/
void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen) {
__wr_lock(pHashObj->lock); __wr_lock(pHashObj->lock);
uint32_t val = 0; uint32_t val = 0;
...@@ -518,23 +345,6 @@ SHashMutableIterator *taosHashCreateIter(SHashObj *pHashObj) { ...@@ -518,23 +345,6 @@ SHashMutableIterator *taosHashCreateIter(SHashObj *pHashObj) {
return pIter; return pIter;
} }
static SHashNode *getNextHashNode(SHashMutableIterator *pIter) {
assert(pIter != NULL);
pIter->entryIndex++;
while (pIter->entryIndex < pIter->pHashObj->capacity) {
SHashEntry *pEntry = pIter->pHashObj->hashList[pIter->entryIndex];
if (pEntry->next == NULL) {
pIter->entryIndex++;
continue;
}
return pEntry->next;
}
return NULL;
}
bool taosHashIterNext(SHashMutableIterator *pIter) { bool taosHashIterNext(SHashMutableIterator *pIter) {
if (pIter == NULL) { if (pIter == NULL) {
return false; return false;
...@@ -617,3 +427,205 @@ int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj) { ...@@ -617,3 +427,205 @@ int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj) {
return num; return num;
} }
void doUpdateHashTable(SHashObj *pHashObj, SHashNode *pNode) {
if (pNode->prev1) {
pNode->prev1->next = pNode;
}
if (pNode->next) {
(pNode->next)->prev = pNode;
}
}
SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const void *key, uint32_t keyLen, uint32_t *hashVal) {
uint32_t hash = (*pHashObj->hashFp)(key, keyLen);
int32_t slot = HASH_INDEX(hash, pHashObj->capacity);
SHashEntry *pEntry = pHashObj->hashList[slot];
SHashNode *pNode = pEntry->next;
while (pNode) {
if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) {
break;
}
pNode = pNode->next;
}
if (pNode) {
assert(HASH_INDEX(pNode->hashVal, pHashObj->capacity) == slot);
}
// return the calculated hash value, to avoid calculating it again in other functions
if (hashVal != NULL) {
*hashVal = hash;
}
return pNode;
}
void taosHashTableResize(SHashObj *pHashObj) {
if (pHashObj->size < pHashObj->capacity * HASH_DEFAULT_LOAD_FACTOR) {
return;
}
// double the original capacity
SHashNode *pNode = NULL;
SHashNode *pNext = NULL;
int32_t newSize = pHashObj->capacity << 1u;
if (newSize > HASH_MAX_CAPACITY) {
// uTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached",
// pHashObj->capacity, HASH_MAX_CAPACITY);
return;
}
// int64_t st = taosGetTimestampUs();
SHashEntry **pNewEntry = realloc(pHashObj->hashList, sizeof(SHashEntry *) * newSize);
if (pNewEntry == NULL) {
// uTrace("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity);
return;
}
pHashObj->hashList = pNewEntry;
for (int32_t i = pHashObj->capacity; i < newSize; ++i) {
pHashObj->hashList[i] = calloc(1, sizeof(SHashEntry));
}
pHashObj->capacity = newSize;
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
SHashEntry *pEntry = pHashObj->hashList[i];
pNode = pEntry->next;
if (pNode != NULL) {
assert(pNode->prev1 == pEntry && pEntry->num > 0);
}
while (pNode) {
int32_t j = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
if (j == i) { // this key resides in the same slot, no need to relocate it
pNode = pNode->next;
} else {
pNext = pNode->next;
// remove from current slot
assert(pNode->prev1 != NULL);
if (pNode->prev1 == pEntry) { // first node of the overflow linked list
pEntry->next = pNode->next;
} else {
pNode->prev->next = pNode->next;
}
pEntry->num--;
assert(pEntry->num >= 0);
if (pNode->next != NULL) {
(pNode->next)->prev = pNode->prev;
}
// added into new slot
pNode->next = NULL;
pNode->prev1 = NULL;
SHashEntry *pNewIndexEntry = pHashObj->hashList[j];
if (pNewIndexEntry->next != NULL) {
assert(pNewIndexEntry->next->prev1 == pNewIndexEntry);
pNewIndexEntry->next->prev = pNode;
}
pNode->next = pNewIndexEntry->next;
pNode->prev1 = pNewIndexEntry;
pNewIndexEntry->next = pNode;
pNewIndexEntry->num++;
// continue
pNode = pNext;
}
}
}
// int64_t et = taosGetTimestampUs();
// uTrace("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pHashObj->capacity,
// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
}
SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) {
size_t totalSize = dsize + sizeof(SHashNode) + keyLen + 1; // one extra byte for null
SHashNode *pNewNode = calloc(1, totalSize);
if (pNewNode == NULL) {
uError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
memcpy(pNewNode->data, pData, dsize);
pNewNode->key = pNewNode->data + dsize;
memcpy(pNewNode->key, key, keyLen);
pNewNode->keyLen = keyLen;
pNewNode->hashVal = hashVal;
return pNewNode;
}
SHashNode *doUpdateHashNode(SHashNode *pNode, const void *key, size_t keyLen, const void *pData, size_t dsize) {
size_t size = dsize + sizeof(SHashNode) + keyLen;
SHashNode *pNewNode = (SHashNode *)realloc(pNode, size);
if (pNewNode == NULL) {
return NULL;
}
memcpy(pNewNode->data, pData, dsize);
pNewNode->key = pNewNode->data + dsize;
assert(memcmp(pNewNode->key, key, keyLen) == 0 && keyLen == pNewNode->keyLen);
memcpy(pNewNode->key, key, keyLen);
return pNewNode;
}
void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode) {
assert(pNode != NULL);
int32_t index = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
SHashEntry *pEntry = pHashObj->hashList[index];
pNode->next = pEntry->next;
if (pEntry->next) {
pEntry->next->prev = pNode;
}
pEntry->next = pNode;
pNode->prev1 = pEntry;
pEntry->num++;
pHashObj->size++;
}
SHashNode *getNextHashNode(SHashMutableIterator *pIter) {
assert(pIter != NULL);
pIter->entryIndex++;
while (pIter->entryIndex < pIter->pHashObj->capacity) {
SHashEntry *pEntry = pIter->pHashObj->hashList[pIter->entryIndex];
if (pEntry->next == NULL) {
pIter->entryIndex++;
continue;
}
return pEntry->next;
}
return NULL;
}
...@@ -48,41 +48,41 @@ $tb = $tbPrefix . $i ...@@ -48,41 +48,41 @@ $tb = $tbPrefix . $i
sql select leastsquares(tbcol, 1, 1) from $tb sql select leastsquares(tbcol, 1, 1) from $tb
print ===> $data00 print ===> $data00
if $data00 != @(1.000000, 1.000000)@ then if $data00 != @{slop:1.000000, intercept:1.000000}@ then
return -1 return -1
endi endi
print =============== step3 print =============== step3
sql select leastsquares(tbcol, 1, 1) from $tb where ts < now + 4m sql select leastsquares(tbcol, 1, 1) from $tb where ts < now + 4m
print ===> $data00 print ===> $data00
if $data00 != @(1.000000, 1.000000)@ then if $data00 != @{slop:1.000000, intercept:1.000000}@ then
return -1 return -1
endi endi
print =============== step4 print =============== step4
sql select leastsquares(tbcol, 1, 1) as b from $tb sql select leastsquares(tbcol, 1, 1) as b from $tb
print ===> $data00 print ===> $data00
if $data00 != @(1.000000, 1.000000)@ then if $data00 != @{slop:1.000000, intercept:1.000000}@ then
return -1 return -1
endi endi
print =============== step5 print =============== step5
sql select leastsquares(tbcol, 1, 1) as b from $tb interval(1m) sql select leastsquares(tbcol, 1, 1) as b from $tb interval(1m)
print ===> $data01 print ===> $data01
if $data01 != @(1.000000, 1.000000)@ then if $data01 != @{slop:1.000000, intercept:1.000000}@ then
return -1 return -1
endi endi
sql select leastsquares(tbcol, 1, 1) as b from $tb interval(1d) sql select leastsquares(tbcol, 1, 1) as b from $tb interval(1d)
print ===> $data01 print ===> $data01
if $data01 != @(1.000000, 1.000000)@ then if $data01 != @{slop:1.000000, intercept:1.000000}@ then
return -1 return -1
endi endi
print =============== step6 print =============== step6
sql select leastsquares(tbcol, 1, 1) as b from $tb where ts < now + 4m interval(1m) sql select leastsquares(tbcol, 1, 1) as b from $tb where ts < now + 4m interval(1m)
print ===> $data01 print ===> $data01
if $data01 != @(1.000000, 1.000000)@ then if $data01 != @{slop:1.000000, intercept:1.000000}@ then
return -1 return -1
endi endi
print ===> $rows print ===> $rows
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册