未验证 提交 ff7794c0 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #3353 from taosdata/feature/query

Feature/query
...@@ -30,7 +30,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code); ...@@ -30,7 +30,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index); SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
int32_t tscHandleMasterJoinQuery(SSqlObj* pSql); void tscHandleMasterJoinQuery(SSqlObj* pSql);
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql); int32_t tscHandleMasterSTableQuery(SSqlObj *pSql);
......
...@@ -187,7 +187,7 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi ...@@ -187,7 +187,7 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi
size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo); size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo);
SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index); SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index);
void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy); int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
void tscSqlExprInfoDestroy(SArray* pExprInfo); void tscSqlExprInfoDestroy(SArray* pExprInfo);
SColumn* tscColumnClone(const SColumn* src); SColumn* tscColumnClone(const SColumn* src);
...@@ -205,7 +205,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t ...@@ -205,7 +205,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t
SCond* tsGetSTableQueryCond(STagCond* pCond, uint64_t uid); SCond* tsGetSTableQueryCond(STagCond* pCond, uint64_t uid);
void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw); void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw);
void tscTagCondCopy(STagCond* dest, const STagCond* src); int32_t tscTagCondCopy(STagCond* dest, const STagCond* src);
void tscTagCondRelease(STagCond* pCond); void tscTagCondRelease(STagCond* pCond);
void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo); void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo);
......
...@@ -50,7 +50,8 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const ...@@ -50,7 +50,8 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql->sqlstr = calloc(1, sqlLen + 1); pSql->sqlstr = calloc(1, sqlLen + 1);
if (pSql->sqlstr == NULL) { if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql); tscError("%p failed to malloc sql string buffer", pSql);
tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY); pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscQueueAsyncRes(pSql);
return; return;
} }
...@@ -94,7 +95,6 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa ...@@ -94,7 +95,6 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) { if (pSql == NULL) {
tscError("failed to malloc sqlObj"); tscError("failed to malloc sqlObj");
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY); tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY);
return; return;
} }
...@@ -191,7 +191,7 @@ void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRo ...@@ -191,7 +191,7 @@ void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRo
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchSingleRowProxy); tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchSingleRowProxy);
} }
void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), void *param) { void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) {
SSqlObj *pSql = (SSqlObj *)taosa; SSqlObj *pSql = (SSqlObj *)taosa;
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
tscError("sql object is NULL"); tscError("sql object is NULL");
...@@ -209,6 +209,8 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi ...@@ -209,6 +209,8 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
if (pRes->qhandle == 0) { if (pRes->qhandle == 0) {
tscError("qhandle is NULL"); tscError("qhandle is NULL");
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE; pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
pSql->param = param;
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
return; return;
} }
...@@ -269,7 +271,10 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), ...@@ -269,7 +271,10 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
if (pRes->qhandle == 0) { if (pRes->qhandle == 0) {
tscError("qhandle is NULL"); tscError("qhandle is NULL");
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_QHANDLE); pSql->param = param;
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
tscQueueAsyncRes(pSql);
return; return;
} }
...@@ -352,36 +357,17 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { ...@@ -352,36 +357,17 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
void tscProcessAsyncRes(SSchedMsg *pMsg) { void tscProcessAsyncRes(SSchedMsg *pMsg) {
SSqlObj *pSql = (SSqlObj *)pMsg->ahandle; SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
// SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
// void *taosres = pSql;
// pCmd may be released, so cache pCmd->command
// int cmd = pCmd->command;
// int code = pRes->code;
// in case of async insert, restore the user specified callback function
// bool shouldFree = tscShouldBeFreed(pSql);
// if (pCmd->command == TSDB_SQL_INSERT) {
// assert(pSql->fp != NULL);
assert(pSql->fp != NULL && pSql->fetchFp != NULL); assert(pSql->fp != NULL && pSql->fetchFp != NULL);
// }
// if (pSql->fp) {
pSql->fp = pSql->fetchFp; pSql->fp = pSql->fetchFp;
(*pSql->fp)(pSql->param, pSql, pRes->code); (*pSql->fp)(pSql->param, pSql, pRes->code);
// }
// if (shouldFree) {
// tscDebug("%p sqlObj is automatically freed in async res", pSql);
// tscFreeSqlObj(pSql);
// }
} }
// this function will be executed by queue task threads, so the terrno is not valid
static void tscProcessAsyncError(SSchedMsg *pMsg) { static void tscProcessAsyncError(SSchedMsg *pMsg) {
void (*fp)() = pMsg->ahandle; void (*fp)() = pMsg->ahandle;
terrno = *(int32_t*) pMsg->msg;
(*fp)(pMsg->thandle, NULL, *(int32_t*)pMsg->msg); (*fp)(pMsg->thandle, NULL, *(int32_t*)pMsg->msg);
} }
......
...@@ -2425,24 +2425,14 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2425,24 +2425,14 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) {
/////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////
static bool percentile_function_setup(SQLFunctionCtx *pCtx) { static bool percentile_function_setup(SQLFunctionCtx *pCtx) {
const int32_t MAX_AVAILABLE_BUFFER_SIZE = 1 << 20; // 1MB
const int32_t NUMOFCOLS = 1;
if (!function_setup(pCtx)) { if (!function_setup(pCtx)) {
return false; return false;
} }
SResultInfo *pResInfo = GET_RES_INFO(pCtx); SResultInfo *pResInfo = GET_RES_INFO(pCtx);
SSchema field[1] = { { (uint8_t)pCtx->inputType, "dummyCol", 0, pCtx->inputBytes } };
SColumnModel *pModel = createColumnModel(field, 1, 1000);
int32_t orderIdx = 0;
// tOrderDesc object
tOrderDescriptor *pDesc = tOrderDesCreate(&orderIdx, NUMOFCOLS, pModel, TSDB_ORDER_DESC);
((SPercentileInfo *)(pResInfo->interResultBuf))->pMemBucket = ((SPercentileInfo *)(pResInfo->interResultBuf))->pMemBucket =
tMemBucketCreate(1024, MAX_AVAILABLE_BUFFER_SIZE, pCtx->inputBytes, pCtx->inputType, pDesc); tMemBucketCreate(pCtx->inputBytes, pCtx->inputType);
return true; return true;
} }
...@@ -2488,15 +2478,13 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2488,15 +2478,13 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx); SResultInfo *pResInfo = GET_RES_INFO(pCtx);
tMemBucket * pMemBucket = ((SPercentileInfo *)pResInfo->interResultBuf)->pMemBucket; tMemBucket * pMemBucket = ((SPercentileInfo *)pResInfo->interResultBuf)->pMemBucket;
if (pMemBucket->numOfElems > 0) { // check for null if (pMemBucket->total > 0) { // check for null
*(double *)pCtx->aOutputBuf = getPercentile(pMemBucket, v); *(double *)pCtx->aOutputBuf = getPercentile(pMemBucket, v);
} else { } else {
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
} }
tOrderDescDestroy(pMemBucket->pOrderDesc);
tMemBucketDestroy(pMemBucket); tMemBucketDestroy(pMemBucket);
doFinalizer(pCtx); doFinalizer(pCtx);
} }
......
...@@ -274,7 +274,7 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) { ...@@ -274,7 +274,7 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
return tscSetValueToResObj(pSql, rowLen); return tscSetValueToResObj(pSql, rowLen);
} }
static void tscProcessCurrentUser(SSqlObj *pSql) { static int32_t tscProcessCurrentUser(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
...@@ -282,14 +282,20 @@ static void tscProcessCurrentUser(SSqlObj *pSql) { ...@@ -282,14 +282,20 @@ static void tscProcessCurrentUser(SSqlObj *pSql) {
pExpr->resType = TSDB_DATA_TYPE_BINARY; pExpr->resType = TSDB_DATA_TYPE_BINARY;
char* vx = calloc(1, pExpr->resBytes); char* vx = calloc(1, pExpr->resBytes);
if (vx == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
size_t size = sizeof(pSql->pTscObj->user); size_t size = sizeof(pSql->pTscObj->user);
STR_WITH_MAXSIZE_TO_VARSTR(vx, pSql->pTscObj->user, size); STR_WITH_MAXSIZE_TO_VARSTR(vx, pSql->pTscObj->user, size);
tscSetLocalQueryResult(pSql, vx, pExpr->aliasName, pExpr->resType, pExpr->resBytes); tscSetLocalQueryResult(pSql, vx, pExpr->aliasName, pExpr->resType, pExpr->resBytes);
free(vx); free(vx);
return TSDB_CODE_SUCCESS;
} }
static void tscProcessCurrentDB(SSqlObj *pSql) { static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
char db[TSDB_DB_NAME_LEN] = {0}; char db[TSDB_DB_NAME_LEN] = {0};
extractDBName(pSql->pTscObj->db, db); extractDBName(pSql->pTscObj->db, db);
...@@ -302,6 +308,10 @@ static void tscProcessCurrentDB(SSqlObj *pSql) { ...@@ -302,6 +308,10 @@ static void tscProcessCurrentDB(SSqlObj *pSql) {
pExpr->resBytes = TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE; pExpr->resBytes = TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE;
char* vx = calloc(1, pExpr->resBytes); char* vx = calloc(1, pExpr->resBytes);
if (vx == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
if (t == 0) { if (t == 0) {
setVardataNull(vx, TSDB_DATA_TYPE_BINARY); setVardataNull(vx, TSDB_DATA_TYPE_BINARY);
} else { } else {
...@@ -310,9 +320,11 @@ static void tscProcessCurrentDB(SSqlObj *pSql) { ...@@ -310,9 +320,11 @@ static void tscProcessCurrentDB(SSqlObj *pSql) {
tscSetLocalQueryResult(pSql, vx, pExpr->aliasName, pExpr->resType, pExpr->resBytes); tscSetLocalQueryResult(pSql, vx, pExpr->aliasName, pExpr->resType, pExpr->resBytes);
free(vx); free(vx);
return TSDB_CODE_SUCCESS;
} }
static void tscProcessServerVer(SSqlObj *pSql) { static int32_t tscProcessServerVer(SSqlObj *pSql) {
const char* v = pSql->pTscObj->sversion; const char* v = pSql->pTscObj->sversion;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
...@@ -323,13 +335,18 @@ static void tscProcessServerVer(SSqlObj *pSql) { ...@@ -323,13 +335,18 @@ static void tscProcessServerVer(SSqlObj *pSql) {
pExpr->resBytes = (int16_t)(t + VARSTR_HEADER_SIZE); pExpr->resBytes = (int16_t)(t + VARSTR_HEADER_SIZE);
char* vx = calloc(1, pExpr->resBytes); char* vx = calloc(1, pExpr->resBytes);
if (vx == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
STR_WITH_SIZE_TO_VARSTR(vx, v, (VarDataLenT)t); STR_WITH_SIZE_TO_VARSTR(vx, v, (VarDataLenT)t);
tscSetLocalQueryResult(pSql, vx, pExpr->aliasName, pExpr->resType, pExpr->resBytes); tscSetLocalQueryResult(pSql, vx, pExpr->aliasName, pExpr->resType, pExpr->resBytes);
taosTFree(vx); free(vx);
return TSDB_CODE_SUCCESS;
} }
static void tscProcessClientVer(SSqlObj *pSql) { static int32_t tscProcessClientVer(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
...@@ -339,23 +356,28 @@ static void tscProcessClientVer(SSqlObj *pSql) { ...@@ -339,23 +356,28 @@ static void tscProcessClientVer(SSqlObj *pSql) {
pExpr->resBytes = (int16_t)(t + VARSTR_HEADER_SIZE); pExpr->resBytes = (int16_t)(t + VARSTR_HEADER_SIZE);
char* v = calloc(1, pExpr->resBytes); char* v = calloc(1, pExpr->resBytes);
if (v == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
STR_WITH_SIZE_TO_VARSTR(v, version, (VarDataLenT)t); STR_WITH_SIZE_TO_VARSTR(v, version, (VarDataLenT)t);
tscSetLocalQueryResult(pSql, v, pExpr->aliasName, pExpr->resType, pExpr->resBytes); tscSetLocalQueryResult(pSql, v, pExpr->aliasName, pExpr->resType, pExpr->resBytes);
taosTFree(v); free(v);
return TSDB_CODE_SUCCESS;
} }
static void tscProcessServStatus(SSqlObj *pSql) { static int32_t tscProcessServStatus(SSqlObj *pSql) {
STscObj* pObj = pSql->pTscObj; STscObj* pObj = pSql->pTscObj;
if (pObj->pHb != NULL) { if (pObj->pHb != NULL) {
if (pObj->pHb->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { if (pObj->pHb->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
return; return pSql->res.code;
} }
} else { } else {
if (pSql->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { if (pSql->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
return; return pSql->res.code;
} }
} }
...@@ -364,6 +386,7 @@ static void tscProcessServStatus(SSqlObj *pSql) { ...@@ -364,6 +386,7 @@ static void tscProcessServStatus(SSqlObj *pSql) {
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
int32_t val = 1; int32_t val = 1;
tscSetLocalQueryResult(pSql, (char*) &val, pExpr->aliasName, TSDB_DATA_TYPE_INT, sizeof(int32_t)); tscSetLocalQueryResult(pSql, (char*) &val, pExpr->aliasName, TSDB_DATA_TYPE_INT, sizeof(int32_t));
return TSDB_CODE_SUCCESS;
} }
void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, int16_t type, size_t valueLength) { void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, int16_t type, size_t valueLength) {
...@@ -393,37 +416,39 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa ...@@ -393,37 +416,39 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
int tscProcessLocalCmd(SSqlObj *pSql) { int tscProcessLocalCmd(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
if (pCmd->command == TSDB_SQL_CFG_LOCAL) { if (pCmd->command == TSDB_SQL_CFG_LOCAL) {
pSql->res.code = (uint8_t)taosCfgDynamicOptions(pCmd->payload); pRes->code = (uint8_t)taosCfgDynamicOptions(pCmd->payload);
} else if (pCmd->command == TSDB_SQL_DESCRIBE_TABLE) { } else if (pCmd->command == TSDB_SQL_DESCRIBE_TABLE) {
pSql->res.code = (uint8_t)tscProcessDescribeTable(pSql); pRes->code = (uint8_t)tscProcessDescribeTable(pSql);
} else if (pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { } else if (pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
/* /*
* set the qhandle to be 1 in order to pass the qhandle check, and to call partial release function to * set the qhandle to be 1 in order to pass the qhandle check, and to call partial release function to
* free allocated resources and remove the SqlObj from sql query linked list * free allocated resources and remove the SqlObj from sql query linked list
*/ */
pSql->res.qhandle = 0x1; pRes->qhandle = 0x1;
pSql->res.numOfRows = 0; pRes->numOfRows = 0;
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) { } else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
taosCacheEmpty(tscCacheHandle); taosCacheEmpty(tscCacheHandle);
pRes->code = TSDB_CODE_SUCCESS;
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) { } else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
tscProcessServerVer(pSql); pRes->code = tscProcessServerVer(pSql);
} else if (pCmd->command == TSDB_SQL_CLI_VERSION) { } else if (pCmd->command == TSDB_SQL_CLI_VERSION) {
tscProcessClientVer(pSql); pRes->code = tscProcessClientVer(pSql);
} else if (pCmd->command == TSDB_SQL_CURRENT_USER) { } else if (pCmd->command == TSDB_SQL_CURRENT_USER) {
tscProcessCurrentUser(pSql); pRes->code = tscProcessCurrentUser(pSql);
} else if (pCmd->command == TSDB_SQL_CURRENT_DB) { } else if (pCmd->command == TSDB_SQL_CURRENT_DB) {
tscProcessCurrentDB(pSql); pRes->code = tscProcessCurrentDB(pSql);
} else if (pCmd->command == TSDB_SQL_SERV_STATUS) { } else if (pCmd->command == TSDB_SQL_SERV_STATUS) {
tscProcessServStatus(pSql); pRes->code = tscProcessServStatus(pSql);
} else { } else {
pSql->res.code = TSDB_CODE_TSC_INVALID_SQL; pRes->code = TSDB_CODE_TSC_INVALID_SQL;
tscError("%p not support command:%d", pSql, pCmd->command); tscError("%p not support command:%d", pSql, pCmd->command);
} }
// keep the code in local variable in order to avoid invalid read in case of async query // keep the code in local variable in order to avoid invalid read in case of async query
int32_t code = pSql->res.code; int32_t code = pRes->code;
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, code); (*pSql->fp)(pSql->param, pSql, code);
} else { } else {
......
...@@ -67,8 +67,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc ...@@ -67,8 +67,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
SQLFunctionCtx *pCtx = &pReducer->pCtx[i]; SQLFunctionCtx *pCtx = &pReducer->pCtx[i];
SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, i); SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, i);
pCtx->aOutputBuf = pCtx->aOutputBuf = pReducer->pResultBuf->data + pExpr->offset * pReducer->resColModel->capacity;
pReducer->pResultBuf->data + pExpr->offset * pReducer->resColModel->capacity;
pCtx->order = pQueryInfo->order.order; pCtx->order = pQueryInfo->order.order;
pCtx->functionId = pExpr->functionId; pCtx->functionId = pExpr->functionId;
...@@ -160,7 +159,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -160,7 +159,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (pMemBuffer == NULL) { if (pMemBuffer == NULL) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscError("%p pMemBuffer is NULL", pMemBuffer); tscError("%p pMemBuffer is NULL", pMemBuffer);
pRes->code = TSDB_CODE_TSC_APP_ERROR; pRes->code = TSDB_CODE_TSC_APP_ERROR;
return; return;
...@@ -168,7 +166,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -168,7 +166,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (pDesc->pColumnModel == NULL) { if (pDesc->pColumnModel == NULL) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscError("%p no local buffer or intermediate result format model", pSql); tscError("%p no local buffer or intermediate result format model", pSql);
pRes->code = TSDB_CODE_TSC_APP_ERROR; pRes->code = TSDB_CODE_TSC_APP_ERROR;
return; return;
...@@ -188,7 +185,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -188,7 +185,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (numOfFlush == 0 || numOfBuffer == 0) { if (numOfFlush == 0 || numOfBuffer == 0) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscDebug("%p retrieved no data", pSql); tscDebug("%p retrieved no data", pSql);
return; return;
} }
...@@ -279,6 +275,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -279,6 +275,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
taosTFree(pReducer); taosTFree(pReducer);
return; return;
} }
param->pLocalData = pReducer->pLocalDataSrc; param->pLocalData = pReducer->pLocalDataSrc;
param->pDesc = pReducer->pDesc; param->pDesc = pReducer->pDesc;
param->num = pReducer->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage; param->num = pReducer->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage;
......
...@@ -5827,22 +5827,34 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -5827,22 +5827,34 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < pList->nExpr; ++i) { for (int32_t i = 0; i < pList->nExpr; ++i) {
SSchema* pSchema = pTagSchema + i; SSchema* pSchema = &pTagSchema[i];
char tagVal[TSDB_MAX_TAGS_LEN];
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) { if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
// validate the length of binary if (pList->a[i].pVar.nLen > pSchema->bytes) {
if (pList->a[i].pVar.nLen + VARSTR_HEADER_SIZE > pSchema->bytes) {
tdDestroyKVRowBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
} }
} }
char tagVal[TSDB_MAX_TAGS_LEN];
ret = tVariantDump(&(pList->a[i].pVar), tagVal, pSchema->type, true); ret = tVariantDump(&(pList->a[i].pVar), tagVal, pSchema->type, true);
// check again after the convert since it may be converted from binary to nchar.
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
int16_t len = varDataTLen(tagVal);
if (len > pSchema->bytes) {
tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
}
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
} }
tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal); tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
} }
......
...@@ -226,17 +226,13 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -226,17 +226,13 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.handle = &pSql->pRpcCtx, .handle = &pSql->pRpcCtx,
.code = 0 .code = 0
}; };
// NOTE: the rpc context should be acquired before sending data to server. // NOTE: the rpc context should be acquired before sending data to server.
// Otherwise, the pSql object may have been released already during the response function, which is // Otherwise, the pSql object may have been released already during the response function, which is
// processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
// cause crash. // cause crash.
if (pObj != NULL && pObj->signature == pObj) { rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); return TSDB_CODE_SUCCESS;
return TSDB_CODE_SUCCESS;
} else {
//pObj->signature has been reset by other thread, ignore concurrency problem
return TSDB_CODE_TSC_CONN_KILLED;
}
} }
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
...@@ -1496,8 +1492,7 @@ int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1496,8 +1492,7 @@ int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char *tmpData = NULL; char *tmpData = NULL;
uint32_t len = pSql->cmd.payloadLen; uint32_t len = pSql->cmd.payloadLen;
if (len > 0) { if (len > 0) {
tmpData = calloc(1, len); if ((tmpData = calloc(1, len)) == NULL) {
if (NULL == tmpData) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
...@@ -1542,8 +1537,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1542,8 +1537,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// copy payload content to temp buff // copy payload content to temp buff
char *tmpData = 0; char *tmpData = 0;
if (pCmd->payloadLen > 0) { if (pCmd->payloadLen > 0) {
tmpData = calloc(1, pCmd->payloadLen + 1); if ((tmpData = calloc(1, pCmd->payloadLen + 1)) == NULL) return -1;
if (NULL == tmpData) return -1;
memcpy(tmpData, pCmd->payload, pCmd->payloadLen); memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
} }
......
...@@ -574,8 +574,9 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar ...@@ -574,8 +574,9 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
SSchema* pColSchema = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId); SSchema* pColSchema = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
*s1 = taosArrayInit(p1->num, p1->tagSize); // int16_t for padding
*s2 = taosArrayInit(p2->num, p2->tagSize); *s1 = taosArrayInit(p1->num, p1->tagSize - sizeof(int16_t));
*s2 = taosArrayInit(p2->num, p2->tagSize - sizeof(int16_t));
if (!(checkForDuplicateTagVal(pQueryInfo, p1, pParentSql) && checkForDuplicateTagVal(pQueryInfo, p2, pParentSql))) { if (!(checkForDuplicateTagVal(pQueryInfo, p1, pParentSql) && checkForDuplicateTagVal(pQueryInfo, p2, pParentSql))) {
return TSDB_CODE_QRY_DUP_JOIN_KEY; return TSDB_CODE_QRY_DUP_JOIN_KEY;
...@@ -1043,6 +1044,10 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { ...@@ -1043,6 +1044,10 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
int32_t numOfExprs = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); int32_t numOfExprs = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * numOfExprs); pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * numOfExprs);
if (pRes->pColumnIndex == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return;
}
for (int32_t i = 0; i < numOfExprs; ++i) { for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
...@@ -1157,7 +1162,8 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code); ...@@ -1157,7 +1162,8 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj); static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) { // TODO
int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
SSqlCmd * pCmd = &pSql->cmd; SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
...@@ -1203,7 +1209,9 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -1203,7 +1209,9 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
// this data needs to be transfer to support struct // this data needs to be transfer to support struct
memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo)); memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo));
tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);//pNewQueryInfo->tagCond; if (tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond) != 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pNew->cmd.numOfCols = 0; pNew->cmd.numOfCols = 0;
pNewQueryInfo->intervalTime = 0; pNewQueryInfo->intervalTime = 0;
...@@ -1300,52 +1308,75 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -1300,52 +1308,75 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
} }
return tscProcessSql(pNew); return TSDB_CODE_SUCCESS;
} }
int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) { void tscHandleMasterJoinQuery(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0); assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0);
int32_t code = TSDB_CODE_SUCCESS;
// todo add test // todo add test
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
if (pState == NULL) { if (pState == NULL) {
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pSql->res.code; goto _error;
} }
pState->numOfTotal = pQueryInfo->numOfTables; pState->numOfTotal = pQueryInfo->numOfTables;
pState->numOfRemain = pState->numOfTotal; pState->numOfRemain = pState->numOfTotal;
bool hasEmptySub = false;
tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables); tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i); SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);
if (pSupporter == NULL) { // failed to create support struct, abort current query if (pSupporter == NULL) { // failed to create support struct, abort current query
tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i); tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
pState->numOfRemain = i; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error;
if (0 == i) {
taosTFree(pState);
}
return pSql->res.code;
} }
int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter); code = tscCreateJoinSubquery(pSql, i, pSupporter);
if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query
tscDestroyJoinSupporter(pSupporter); tscDestroyJoinSupporter(pSupporter);
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error;
if (0 == i) { }
taosTFree(pState);
} SSqlObj* pSub = pSql->pSubs[i];
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSub->cmd, 0, 0);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) && (pTableMetaInfo->vgroupList->numOfVgroups == 0)) {
hasEmptySub = true;
break; break;
} }
} }
pSql->cmd.command = (pSql->numOfSubs <= 0)? TSDB_SQL_RETRIEVE_EMPTY_RESULT:TSDB_SQL_TABLE_JOIN_RETRIEVE; if (hasEmptySub) { // at least one subquery is empty, do nothing and return
freeJoinSubqueryObj(pSql);
return TSDB_CODE_SUCCESS; pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
(*pSql->fp)(pSql->param, pSql, 0);
} else {
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) {
pState->numOfRemain = i - 1; // the already sent reques will continue and do not go to the error process routine
break;
}
}
pSql->cmd.command = TSDB_SQL_TABLE_JOIN_RETRIEVE;
}
return;
_error:
pRes->code = code;
tscQueueAsyncRes(pSql);
} }
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) { static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) {
...@@ -1384,7 +1415,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -1384,7 +1415,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
const uint32_t nBufferSize = (1u << 16); // 64KB const uint32_t nBufferSize = (1u << 16); // 64KB
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
pSql->numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups; pSql->numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups;
...@@ -1399,9 +1430,20 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -1399,9 +1430,20 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
} }
pSql->pSubs = calloc(pSql->numOfSubs, POINTER_BYTES); pSql->pSubs = calloc(pSql->numOfSubs, POINTER_BYTES);
tscDebug("%p retrieved query data from %d vnode(s)", pSql, pSql->numOfSubs); tscDebug("%p retrieved query data from %d vnode(s)", pSql, pSql->numOfSubs);
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
if (pSql->pSubs == NULL || pState == NULL) {
taosTFree(pState);
taosTFree(pSql->pSubs);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->numOfSubs);
tscQueueAsyncRes(pSql);
return ret;
}
pState->numOfTotal = pSql->numOfSubs; pState->numOfTotal = pSql->numOfSubs;
pState->numOfRemain = pSql->numOfSubs; pState->numOfRemain = pSql->numOfSubs;
...@@ -2033,8 +2075,21 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { ...@@ -2033,8 +2075,21 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
numOfRes = (int32_t)(MIN(numOfRes, pSql->pSubs[i]->res.numOfRows)); numOfRes = (int32_t)(MIN(numOfRes, pSql->pSubs[i]->res.numOfRows));
} }
if (numOfRes == 0) {
return;
}
int32_t totalSize = tscGetResRowLength(pQueryInfo->exprList); int32_t totalSize = tscGetResRowLength(pQueryInfo->exprList);
pRes->pRsp = realloc(pRes->pRsp, numOfRes * totalSize);
assert(numOfRes * totalSize > 0);
char* tmp = realloc(pRes->pRsp, numOfRes * totalSize);
if (tmp == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return;
} else {
pRes->pRsp = tmp;
}
pRes->data = pRes->pRsp; pRes->data = pRes->pRsp;
char* data = pRes->data; char* data = pRes->data;
...@@ -2073,6 +2128,12 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) { ...@@ -2073,6 +2128,12 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
pRes->buffer = calloc(numOfExprs, POINTER_BYTES); pRes->buffer = calloc(numOfExprs, POINTER_BYTES);
pRes->length = calloc(numOfExprs, sizeof(int32_t)); pRes->length = calloc(numOfExprs, sizeof(int32_t));
if (pRes->tsrow == NULL || pRes->buffer == NULL || pRes->length == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscQueueAsyncRes(pSql);
return;
}
tscRestoreSQLFuncForSTableQuery(pQueryInfo); tscRestoreSQLFuncForSTableQuery(pQueryInfo);
} }
......
...@@ -254,15 +254,12 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) { ...@@ -254,15 +254,12 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
pRes->numOfCols = numOfOutput; pRes->numOfCols = numOfOutput;
pRes->tsrow = calloc(numOfOutput, POINTER_BYTES); pRes->tsrow = calloc(numOfOutput, POINTER_BYTES);
pRes->length = calloc(numOfOutput, sizeof(int32_t)); // todo refactor pRes->length = calloc(numOfOutput, sizeof(int32_t));
pRes->buffer = calloc(numOfOutput, POINTER_BYTES); pRes->buffer = calloc(numOfOutput, POINTER_BYTES);
// not enough memory // not enough memory
if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) { if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) {
taosTFree(pRes->tsrow); taosTFree(pRes->tsrow);
taosTFree(pRes->buffer);
taosTFree(pRes->length);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pRes->code; return pRes->code;
} }
...@@ -281,13 +278,14 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) { ...@@ -281,13 +278,14 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) {
} }
taosTFree(pRes->pRsp); taosTFree(pRes->pRsp);
taosTFree(pRes->tsrow); taosTFree(pRes->tsrow);
taosTFree(pRes->length); taosTFree(pRes->length);
taosTFree(pRes->buffer);
taosTFree(pRes->pGroupRec); taosTFree(pRes->pGroupRec);
taosTFree(pRes->pColumnIndex); taosTFree(pRes->pColumnIndex);
taosTFree(pRes->buffer);
if (pRes->pArithSup != NULL) { if (pRes->pArithSup != NULL) {
taosTFree(pRes->pArithSup->data); taosTFree(pRes->pArithSup->data);
taosTFree(pRes->pArithSup); taosTFree(pRes->pArithSup);
...@@ -1052,7 +1050,7 @@ void tscSqlExprInfoDestroy(SArray* pExprInfo) { ...@@ -1052,7 +1050,7 @@ void tscSqlExprInfoDestroy(SArray* pExprInfo) {
taosArrayDestroy(pExprInfo); taosArrayDestroy(pExprInfo);
} }
void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) { int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) {
assert(src != NULL && dst != NULL); assert(src != NULL && dst != NULL);
size_t size = taosArrayGetSize(src); size_t size = taosArrayGetSize(src);
...@@ -1064,7 +1062,7 @@ void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) ...@@ -1064,7 +1062,7 @@ void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy)
if (deepcopy) { if (deepcopy) {
SSqlExpr* p1 = calloc(1, sizeof(SSqlExpr)); SSqlExpr* p1 = calloc(1, sizeof(SSqlExpr));
if (p1 == NULL) { if (p1 == NULL) {
assert(0); return -1;
} }
*p1 = *pExpr; *p1 = *pExpr;
...@@ -1078,6 +1076,8 @@ void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) ...@@ -1078,6 +1076,8 @@ void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy)
} }
} }
} }
return 0;
} }
SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) { SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) {
...@@ -1324,11 +1324,14 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t ...@@ -1324,11 +1324,14 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t
return false; return false;
} }
void tscTagCondCopy(STagCond* dest, const STagCond* src) { int32_t tscTagCondCopy(STagCond* dest, const STagCond* src) {
memset(dest, 0, sizeof(STagCond)); memset(dest, 0, sizeof(STagCond));
if (src->tbnameCond.cond != NULL) { if (src->tbnameCond.cond != NULL) {
dest->tbnameCond.cond = strdup(src->tbnameCond.cond); dest->tbnameCond.cond = strdup(src->tbnameCond.cond);
if (dest->tbnameCond.cond == NULL) {
return -1;
}
} }
dest->tbnameCond.uid = src->tbnameCond.uid; dest->tbnameCond.uid = src->tbnameCond.uid;
...@@ -1337,7 +1340,7 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) { ...@@ -1337,7 +1340,7 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) {
dest->relType = src->relType; dest->relType = src->relType;
if (src->pCond == NULL) { if (src->pCond == NULL) {
return; return 0;
} }
size_t s = taosArrayGetSize(src->pCond); size_t s = taosArrayGetSize(src->pCond);
...@@ -1354,7 +1357,7 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) { ...@@ -1354,7 +1357,7 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) {
assert(pCond->cond != NULL); assert(pCond->cond != NULL);
c.cond = malloc(c.len); c.cond = malloc(c.len);
if (c.cond == NULL) { if (c.cond == NULL) {
assert(0); return -1;
} }
memcpy(c.cond, pCond->cond, c.len); memcpy(c.cond, pCond->cond, c.len);
...@@ -1362,6 +1365,8 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) { ...@@ -1362,6 +1365,8 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) {
taosArrayPush(dest->pCond, &c); taosArrayPush(dest->pCond, &c);
} }
return 0;
} }
void tscTagCondRelease(STagCond* pTagCond) { void tscTagCondRelease(STagCond* pTagCond) {
...@@ -1855,7 +1860,10 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1855,7 +1860,10 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
} }
} }
tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond); if (tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond) != 0) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
if (pQueryInfo->fillType != TSDB_FILL_NONE) { if (pQueryInfo->fillType != TSDB_FILL_NONE) {
pNewQueryInfo->fillVal = malloc(pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t)); pNewQueryInfo->fillVal = malloc(pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t));
...@@ -1884,7 +1892,10 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1884,7 +1892,10 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
} }
uint64_t uid = pTableMetaInfo->pTableMeta->id.uid; uint64_t uid = pTableMetaInfo->pTableMeta->id.uid;
tscSqlExprCopy(pNewQueryInfo->exprList, pQueryInfo->exprList, uid, true); if (tscSqlExprCopy(pNewQueryInfo->exprList, pQueryInfo->exprList, uid, true) != 0) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
doSetSqlExprAndResultFieldInfo(pQueryInfo, pNewQueryInfo, uid); doSetSqlExprAndResultFieldInfo(pQueryInfo, pNewQueryInfo, uid);
......
...@@ -189,7 +189,10 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in ...@@ -189,7 +189,10 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in
start *= 1000L; start *= 1000L;
} }
} else { } else {
start = ((start - intervalTime) / slidingTime + 1) * slidingTime; int64_t delta = startTime - intervalTime;
int32_t factor = delta > 0? 1:-1;
start = (delta / slidingTime + factor) * slidingTime;
if (timeUnit == 'd' || timeUnit == 'w') { if (timeUnit == 'd' || timeUnit == 'w') {
/* /*
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
#define TDENGINE_QPERCENTILE_H #define TDENGINE_QPERCENTILE_H
#include "qExtbuffer.h" #include "qExtbuffer.h"
#include "qResultbuf.h"
#include "qTsbuf.h"
typedef struct MinMaxEntry { typedef struct MinMaxEntry {
union { union {
...@@ -31,47 +33,43 @@ typedef struct MinMaxEntry { ...@@ -31,47 +33,43 @@ typedef struct MinMaxEntry {
}; };
} MinMaxEntry; } MinMaxEntry;
typedef struct tMemBucketSegment { typedef struct {
int32_t numOfSlots; int32_t size;
MinMaxEntry * pBoundingEntries; int32_t pageId;
tExtMemBuffer **pBuffer; tFilePage *data;
} tMemBucketSegment; } SSlotInfo;
typedef struct tMemBucketSlot {
SSlotInfo info;
MinMaxEntry range;
} tMemBucketSlot;
struct tMemBucket;
typedef int32_t (*__perc_hash_func_t)(struct tMemBucket *pBucket, const void *value);
typedef struct tMemBucket { typedef struct tMemBucket {
int16_t numOfSegs; int16_t numOfSlots;
int16_t nTotalSlots; int16_t type;
int16_t nSlotsOfSeg; int16_t bytes;
int16_t dataType; int32_t total;
int32_t elemPerPage; // number of elements for each object
int16_t nElemSize; int32_t maxCapacity; // maximum allowed number of elements that can be sort directly to get the result
int32_t numOfElems; int32_t bufPageSize; // disk page size
MinMaxEntry range; // value range
int32_t nTotalBufferSize; int32_t times; // count that has been checked for deciding the correct data value buckets.
int32_t maxElemsCapacity; __compar_fn_t comparFn;
int32_t pageSize; tMemBucketSlot *pSlots;
int16_t numOfTotalPages; SDiskbasedResultBuf *pBuffer;
int16_t numOfAvailPages; /* remain available buffer pages */ __perc_hash_func_t hashFunc;
tMemBucketSegment *pSegs;
tOrderDescriptor * pOrderDesc;
MinMaxEntry nRange;
void (*HashFunc)(struct tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx);
} tMemBucket; } tMemBucket;
tMemBucket *tMemBucketCreate(int32_t totalSlots, int32_t nBufferSize, int16_t nElemSize, int16_t dataType, tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType);
tOrderDescriptor *pDesc);
void tMemBucketDestroy(tMemBucket *pBucket); void tMemBucketDestroy(tMemBucket *pBucket);
void tMemBucketPut(tMemBucket *pBucket, void *data, int32_t numOfRows); void tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size);
double getPercentile(tMemBucket *pMemBucket, double percent); double getPercentile(tMemBucket *pMemBucket, double percent);
void tBucketIntHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx);
void tBucketDoubleHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx);
#endif // TDENGINE_QPERCENTILE_H #endif // TDENGINE_QPERCENTILE_H
...@@ -1964,6 +1964,15 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, bool stableQuery) { ...@@ -1964,6 +1964,15 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, bool stableQuery) {
return; return;
} }
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
pQuery->order.order = TSDB_ORDER_ASC;
if (pQuery->window.skey > pQuery->window.ekey) {
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
}
return;
}
if (isPointInterpoQuery(pQuery) && pQuery->intervalTime == 0) { if (isPointInterpoQuery(pQuery) && pQuery->intervalTime == 0) {
if (!QUERY_IS_ASC_QUERY(pQuery)) { if (!QUERY_IS_ASC_QUERY(pQuery)) {
qDebug(msg, GET_QINFO_ADDR(pQuery), "interp", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey, qDebug(msg, GET_QINFO_ADDR(pQuery), "interp", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey,
...@@ -2125,35 +2134,36 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat ...@@ -2125,35 +2134,36 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat
return false; return false;
} }
#define PT_IN_WINDOW(_p, _w) ((_p) > (_w).skey && (_p) < (_w).ekey)
static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) { static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) {
STimeWindow w = {0}; STimeWindow w = {0};
TSKEY sk = MIN(pQuery->window.skey, pQuery->window.ekey); TSKEY sk = MIN(pQuery->window.skey, pQuery->window.ekey);
TSKEY ek = MAX(pQuery->window.skey, pQuery->window.ekey); TSKEY ek = MAX(pQuery->window.skey, pQuery->window.ekey);
if (QUERY_IS_ASC_QUERY(pQuery)) { if (QUERY_IS_ASC_QUERY(pQuery)) {
getAlignQueryTimeWindow(pQuery, pBlockInfo->window.skey, sk, ek, &w); getAlignQueryTimeWindow(pQuery, pBlockInfo->window.skey, sk, ek, &w);
assert(w.ekey >= pBlockInfo->window.skey);
if (PT_IN_WINDOW(w.ekey, pBlockInfo->window)) { if (w.ekey < pBlockInfo->window.ekey) {
return true; return true;
} }
while(1) { while(1) {
GET_NEXT_TIMEWINDOW(pQuery, &w); GET_NEXT_TIMEWINDOW(pQuery, &w);
if (w.skey > pBlockInfo->window.skey) { if (w.skey > pBlockInfo->window.ekey) {
break; break;
} }
if (PT_IN_WINDOW(w.skey, pBlockInfo->window) || PT_IN_WINDOW(w.ekey, pBlockInfo->window)) { assert(w.ekey > pBlockInfo->window.ekey);
if (w.skey <= pBlockInfo->window.ekey && w.skey > pBlockInfo->window.skey) {
return true; return true;
} }
} }
} else { } else {
getAlignQueryTimeWindow(pQuery, pBlockInfo->window.ekey, sk, ek, &w); getAlignQueryTimeWindow(pQuery, pBlockInfo->window.ekey, sk, ek, &w);
if (PT_IN_WINDOW(w.skey, pBlockInfo->window)) { assert(w.skey <= pBlockInfo->window.ekey);
if (w.skey > pBlockInfo->window.skey) {
return true; return true;
} }
...@@ -2163,7 +2173,8 @@ static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) { ...@@ -2163,7 +2173,8 @@ static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) {
break; break;
} }
if (PT_IN_WINDOW(w.skey, pBlockInfo->window) || PT_IN_WINDOW(w.ekey, pBlockInfo->window)) { assert(w.skey < pBlockInfo->window.skey);
if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) {
return true; return true;
} }
} }
...@@ -4440,7 +4451,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo ...@@ -4440,7 +4451,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
// NOTE: pTableCheckInfo need to update the query time range and the lastKey info // NOTE: pTableCheckInfo need to update the query time range and the lastKey info
// TODO fixme // TODO fixme
changeExecuteScanOrder(pQInfo, false); changeExecuteScanOrder(pQInfo, isSTableQuery);
code = setupQueryHandle(tsdb, pQInfo, isSTableQuery); code = setupQueryHandle(tsdb, pQInfo, isSTableQuery);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -6149,6 +6160,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -6149,6 +6160,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
goto _cleanup; goto _cleanup;
} }
// NOTE: pTableCheckInfo need to update the query time range and the lastKey info
// changeExecuteScanOrder(pQInfo, stableQuery);
int32_t index = 0; int32_t index = 0;
for(int32_t i = 0; i < numOfGroups; ++i) { for(int32_t i = 0; i < numOfGroups; ++i) {
......
...@@ -14,310 +14,291 @@ ...@@ -14,310 +14,291 @@
*/ */
#include "qPercentile.h" #include "qPercentile.h"
#include "qResultbuf.h"
#include "os.h" #include "os.h"
#include "queryLog.h" #include "queryLog.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h"
#include "tulog.h" #include "tulog.h"
#include "tcompare.h"
tExtMemBuffer *releaseBucketsExceptFor(tMemBucket *pMemBucket, int16_t segIdx, int16_t slotIdx) { #define DEFAULT_NUM_OF_SLOT 1024
tExtMemBuffer *pBuffer = NULL;
int32_t getGroupId(int32_t numOfSlots, int32_t slotIndex, int32_t times) {
for (int32_t i = 0; i < pMemBucket->numOfSegs; ++i) { return (times * numOfSlots) + slotIndex;
tMemBucketSegment *pSeg = &pMemBucket->pSegs[i]; }
for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { static tFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) {
if (i == segIdx && j == slotIdx) { tFilePage *buffer = (tFilePage *)calloc(1, pMemBucket->bytes * pMemBucket->pSlots[slotIdx].info.size + sizeof(tFilePage));
pBuffer = pSeg->pBuffer[j];
} else { int32_t groupId = getGroupId(pMemBucket->numOfSlots, slotIdx, pMemBucket->times);
if (pSeg->pBuffer && pSeg->pBuffer[j]) { SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId);
pSeg->pBuffer[j] = destoryExtMemBuffer(pSeg->pBuffer[j]);
} int32_t offset = 0;
} for(int32_t i = 0; i < list->size; ++i) {
} SPageInfo* pgInfo = *(SPageInfo**) taosArrayGet(list, i);
tFilePage* pg = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId);
memcpy(buffer->data + offset, pg->data, pg->num * pMemBucket->bytes);
offset += pg->num * pMemBucket->bytes;
} }
return pBuffer; qsort(buffer->data, pMemBucket->pSlots[slotIdx].info.size, pMemBucket->bytes, pMemBucket->comparFn);
return buffer;
} }
static tFilePage *loadIntoBucketFromDisk(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx, static void resetBoundingBox(MinMaxEntry* range, int32_t type) {
tOrderDescriptor *pDesc) { switch (type) {
// release all data in other slots case TSDB_DATA_TYPE_BIGINT: {
tExtMemBuffer *pMemBuffer = pMemBucket->pSegs[segIdx].pBuffer[slotIdx]; range->i64MaxVal = INT64_MIN;
tFilePage * buffer = (tFilePage *)calloc(1, pMemBuffer->nElemSize * pMemBuffer->numOfTotalElems + sizeof(tFilePage)); range->i64MinVal = INT64_MAX;
int32_t oldCapacity = pDesc->pColumnModel->capacity; break;
pDesc->pColumnModel->capacity = pMemBuffer->numOfTotalElems; };
case TSDB_DATA_TYPE_INT:
if (!tExtMemBufferIsAllDataInMem(pMemBuffer)) { case TSDB_DATA_TYPE_SMALLINT:
pMemBuffer = releaseBucketsExceptFor(pMemBucket, segIdx, slotIdx); case TSDB_DATA_TYPE_TINYINT: {
assert(pMemBuffer->numOfTotalElems > 0); range->iMaxVal = INT32_MIN;
range->iMinVal = INT32_MAX;
// load data in disk to memory break;
tFilePage *pPage = (tFilePage *)calloc(1, pMemBuffer->pageSize); };
case TSDB_DATA_TYPE_DOUBLE:
for (uint32_t i = 0; i < pMemBuffer->fileMeta.flushoutData.nLength; ++i) { case TSDB_DATA_TYPE_FLOAT: {
tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[i]; range->dMaxVal = -DBL_MAX;
range->dMinVal = DBL_MAX;
int32_t ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET); break;
UNUSED(ret);
for (uint32_t j = 0; j < pFlushInfo->numOfPages; ++j) {
ret = (int32_t)fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file);
UNUSED(ret);
assert(pPage->num > 0);
tColModelAppend(pDesc->pColumnModel, buffer, pPage->data, 0, (int32_t)pPage->num, (int32_t)pPage->num);
printf("id: %d count: %" PRIu64 "\n", j, buffer->num);
}
} }
taosTFree(pPage);
assert(buffer->num == pMemBuffer->fileMeta.numOfElemsInFile);
} }
}
// load data in pMemBuffer to buffer
tFilePagesItem *pListItem = pMemBuffer->pHead; static void resetPosInfo(SSlotInfo* pInfo) {
while (pListItem != NULL) { pInfo->size = 0;
tColModelAppend(pDesc->pColumnModel, buffer, pListItem->item.data, 0, (int32_t)pListItem->item.num, pInfo->pageId = -1;
(int32_t)pListItem->item.num); pInfo->data = NULL;
pListItem = pListItem->pNext;
}
tColDataQSort(pDesc, (int32_t)buffer->num, 0, (int32_t)buffer->num - 1, buffer->data, TSDB_ORDER_ASC);
pDesc->pColumnModel->capacity = oldCapacity; // restore value
return buffer;
} }
double findOnlyResult(tMemBucket *pMemBucket) { double findOnlyResult(tMemBucket *pMemBucket) {
assert(pMemBucket->numOfElems == 1); assert(pMemBucket->total == 1);
for (int32_t i = 0; i < pMemBucket->numOfSegs; ++i) { for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) {
tMemBucketSegment *pSeg = &pMemBucket->pSegs[i]; tMemBucketSlot *pSlot = &pMemBucket->pSlots[i];
if (pSeg->pBuffer) { if (pSlot->info.size == 0) {
for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { continue;
tExtMemBuffer *pBuffer = pSeg->pBuffer[j]; }
if (pBuffer) {
assert(pBuffer->numOfTotalElems == 1); int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times);
tFilePage *pPage = &pBuffer->pHead->item; SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId);
if (pBuffer->numOfElemsInBuffer == 1) { assert(list->size == 1);
switch (pMemBucket->dataType) {
case TSDB_DATA_TYPE_INT: SPageInfo* pgInfo = (SPageInfo*) taosArrayGetP(list, 0);
return *(int32_t *)pPage->data; tFilePage* pPage = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId);
case TSDB_DATA_TYPE_SMALLINT: assert(pPage->num == 1);
return *(int16_t *)pPage->data;
case TSDB_DATA_TYPE_TINYINT: switch (pMemBucket->type) {
return *(int8_t *)pPage->data; case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT: return *(int32_t *)pPage->data;
return (double)(*(int64_t *)pPage->data); case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_DOUBLE: { return *(int16_t *)pPage->data;
double dv = GET_DOUBLE_VAL(pPage->data); case TSDB_DATA_TYPE_TINYINT:
//return *(double *)pPage->data; return *(int8_t *)pPage->data;
return dv; case TSDB_DATA_TYPE_BIGINT:
} return (double)(*(int64_t *)pPage->data);
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_DOUBLE: {
float fv = GET_FLOAT_VAL(pPage->data); double dv = GET_DOUBLE_VAL(pPage->data);
//return *(float *)pPage->data; return dv;
return fv; }
} case TSDB_DATA_TYPE_FLOAT: {
default: float fv = GET_FLOAT_VAL(pPage->data);
return 0; return fv;
}
}
}
} }
default:
return 0;
} }
} }
return 0; return 0;
} }
void tBucketBigIntHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx) { int32_t tBucketBigIntHash(tMemBucket *pBucket, const void *value) {
int64_t v = *(int64_t *)value; int64_t v = *(int64_t *)value;
int32_t index = -1;
int32_t halfSlot = pBucket->numOfSlots >> 1;
// int32_t bits = 32;//bitsOfNumber(pBucket->numOfSlots) - 1;
if (pBucket->nRange.i64MaxVal == INT64_MIN) { if (pBucket->range.i64MaxVal == INT64_MIN) {
if (v >= 0) { if (v >= 0) {
*segIdx = ((v >> (64 - 9)) >> 6) + 8; index = (v >> (64 - 9)) + halfSlot;
*slotIdx = (v >> (64 - 9)) & 0x3F;
} else { // v<0 } else { // v<0
*segIdx = ((-v) >> (64 - 9)) >> 6; index = ((-v) >> (64 - 9));
*slotIdx = ((-v) >> (64 - 9)) & 0x3F; index = -index + (halfSlot - 1);
*segIdx = 7 - (*segIdx);
} }
return index;
} else { } else {
// todo hash for bigint and float and double // todo hash for bigint and float and double
int64_t span = pBucket->nRange.i64MaxVal - pBucket->nRange.i64MinVal; int64_t span = pBucket->range.i64MaxVal - pBucket->range.i64MinVal;
if (span < pBucket->nTotalSlots) { if (span < pBucket->numOfSlots) {
int32_t delta = (int32_t)(v - pBucket->nRange.i64MinVal); int32_t delta = (int32_t)(v - pBucket->range.i64MinVal);
*segIdx = delta / pBucket->nSlotsOfSeg; index = delta % pBucket->numOfSlots;
*slotIdx = delta % pBucket->nSlotsOfSeg;
} else { } else {
double x = (double)span / pBucket->nTotalSlots; double slotSpan = (double)span / pBucket->numOfSlots;
double posx = (v - pBucket->nRange.i64MinVal) / x; index = (v - pBucket->range.i64MinVal) / slotSpan;
if (v == pBucket->nRange.i64MaxVal) { if (v == pBucket->range.i64MaxVal) {
posx -= 1; index -= 1;
} }
*segIdx = ((int32_t)posx) / pBucket->nSlotsOfSeg;
*slotIdx = ((int32_t)posx) % pBucket->nSlotsOfSeg;
} }
return index;
} }
} }
// todo refactor to more generic // todo refactor to more generic
void tBucketIntHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx) { int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) {
int32_t v = *(int32_t *)value; int32_t v = *(int32_t *)value;
int32_t index = -1;
if (pBucket->nRange.iMaxVal == INT32_MIN) { if (pBucket->range.iMaxVal == INT32_MIN) {
/* /*
* taking negative integer into consideration, * taking negative integer into consideration,
* there is only half of pBucket->segs available for non-negative integer * there is only half of pBucket->segs available for non-negative integer
*/ */
// int32_t numOfSlots = pBucket->nTotalSlots>>1; int32_t halfSlot = pBucket->numOfSlots >> 1;
// int32_t bits = bitsOfNumber(numOfSlots)-1; int32_t bits = 32;//bitsOfNumber(pBucket->numOfSlots) - 1;
if (v >= 0) { if (v >= 0) {
*segIdx = ((v >> (32 - 9)) >> 6) + 8; index = (v >> (bits - 9)) + halfSlot;
*slotIdx = (v >> (32 - 9)) & 0x3F; } else { // v < 0
} else { // v<0 index = ((-v) >> (32 - 9));
*segIdx = ((-v) >> (32 - 9)) >> 6; index = -index + (halfSlot - 1);
*slotIdx = ((-v) >> (32 - 9)) & 0x3F;
*segIdx = 7 - (*segIdx);
} }
return index;
} else { } else {
// divide a range of [iMinVal, iMaxVal] into 1024 buckets // divide a range of [iMinVal, iMaxVal] into 1024 buckets
int32_t span = pBucket->nRange.iMaxVal - pBucket->nRange.iMinVal; int32_t span = pBucket->range.iMaxVal - pBucket->range.iMinVal;
if (span < pBucket->nTotalSlots) { if (span < pBucket->numOfSlots) {
int32_t delta = v - pBucket->nRange.iMinVal; int32_t delta = v - pBucket->range.iMinVal;
*segIdx = delta / pBucket->nSlotsOfSeg; index = (delta % pBucket->numOfSlots);
*slotIdx = delta % pBucket->nSlotsOfSeg;
} else { } else {
double x = (double)span / pBucket->nTotalSlots; double slotSpan = (double)span / pBucket->numOfSlots;
double posx = (v - pBucket->nRange.iMinVal) / x; index = (v - pBucket->range.iMinVal) / slotSpan;
if (v == pBucket->nRange.iMaxVal) { if (v == pBucket->range.iMaxVal) {
posx -= 1; index -= 1;
} }
*segIdx = ((int32_t)posx) / pBucket->nSlotsOfSeg;
*slotIdx = ((int32_t)posx) % pBucket->nSlotsOfSeg;
} }
return index;
} }
} }
void tBucketDoubleHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx) { int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) {
// double v = *(double *)value;
double v = GET_DOUBLE_VAL(value); double v = GET_DOUBLE_VAL(value);
int32_t index = -1;
if (pBucket->nRange.dMinVal == DBL_MAX) { if (pBucket->range.dMinVal == DBL_MAX) {
/* /*
* taking negative integer into consideration, * taking negative integer into consideration,
* there is only half of pBucket->segs available for non-negative integer * there is only half of pBucket->segs available for non-negative integer
*/ */
double x = DBL_MAX / (pBucket->nTotalSlots >> 1); double x = DBL_MAX / (pBucket->numOfSlots >> 1);
double posx = (v + DBL_MAX) / x; double posx = (v + DBL_MAX) / x;
*segIdx = ((int32_t)posx) / pBucket->nSlotsOfSeg; return ((int32_t)posx) % pBucket->numOfSlots;
*slotIdx = ((int32_t)posx) % pBucket->nSlotsOfSeg;
} else { } else {
// divide a range of [dMinVal, dMaxVal] into 1024 buckets // divide a range of [dMinVal, dMaxVal] into 1024 buckets
double span = pBucket->nRange.dMaxVal - pBucket->nRange.dMinVal; double span = pBucket->range.dMaxVal - pBucket->range.dMinVal;
if (span < pBucket->nTotalSlots) { if (span < pBucket->numOfSlots) {
int32_t delta = (int32_t)(v - pBucket->nRange.dMinVal); int32_t delta = (int32_t)(v - pBucket->range.dMinVal);
*segIdx = delta / pBucket->nSlotsOfSeg; index = (delta % pBucket->numOfSlots);
*slotIdx = delta % pBucket->nSlotsOfSeg;
} else { } else {
double x = span / pBucket->nTotalSlots; double slotSpan = span / pBucket->numOfSlots;
double posx = (v - pBucket->nRange.dMinVal) / x; index = (v - pBucket->range.dMinVal) / slotSpan;
if (v == pBucket->nRange.dMaxVal) { if (v == pBucket->range.dMaxVal) {
posx -= 1; index -= 1;
} }
*segIdx = ((int32_t)posx) / pBucket->nSlotsOfSeg;
*slotIdx = ((int32_t)posx) % pBucket->nSlotsOfSeg;
} }
if (*segIdx < 0 || *segIdx > 16 || *slotIdx < 0 || *slotIdx > 64) { if (index < 0 || index > pBucket->numOfSlots) {
uError("error in hash process. segment is: %d, slot id is: %d\n", *segIdx, *slotIdx); uError("error in hash process. slot id: %d", index);
} }
return index;
} }
} }
tMemBucket *tMemBucketCreate(int32_t totalSlots, int32_t nBufferSize, int16_t nElemSize, int16_t dataType, static __perc_hash_func_t getHashFunc(int32_t type) {
tOrderDescriptor *pDesc) { switch (type) {
tMemBucket *pBucket = (tMemBucket *)malloc(sizeof(tMemBucket));
pBucket->nTotalSlots = totalSlots;
pBucket->nSlotsOfSeg = 1 << 6; // 64 Segments, 16 slots each seg.
pBucket->dataType = dataType;
pBucket->nElemSize = nElemSize;
pBucket->pageSize = DEFAULT_PAGE_SIZE;
pBucket->numOfElems = 0;
pBucket->numOfSegs = pBucket->nTotalSlots / pBucket->nSlotsOfSeg;
pBucket->nTotalBufferSize = nBufferSize;
pBucket->maxElemsCapacity = pBucket->nTotalBufferSize / pBucket->nElemSize;
pBucket->numOfTotalPages = pBucket->nTotalBufferSize / pBucket->pageSize;
pBucket->numOfAvailPages = pBucket->numOfTotalPages;
pBucket->pSegs = NULL;
pBucket->pOrderDesc = pDesc;
switch (pBucket->dataType) {
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TINYINT: { case TSDB_DATA_TYPE_TINYINT: {
pBucket->nRange.iMinVal = INT32_MAX; return tBucketIntHash;
pBucket->nRange.iMaxVal = INT32_MIN;
pBucket->HashFunc = tBucketIntHash;
break;
}; };
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
pBucket->nRange.dMinVal = DBL_MAX; return tBucketDoubleHash;
pBucket->nRange.dMaxVal = -DBL_MAX;
pBucket->HashFunc = tBucketDoubleHash;
break;
}; };
case TSDB_DATA_TYPE_BIGINT: { case TSDB_DATA_TYPE_BIGINT: {
pBucket->nRange.i64MinVal = INT64_MAX; return tBucketBigIntHash;
pBucket->nRange.i64MaxVal = INT64_MIN;
pBucket->HashFunc = tBucketBigIntHash;
break;
}; };
default: { default: {
uError("MemBucket:%p,not support data type %d,failed", pBucket, pBucket->dataType);
taosTFree(pBucket);
return NULL; return NULL;
} }
} }
}
int32_t numOfCols = pDesc->pColumnModel->numOfCols; static void resetSlotInfo(tMemBucket* pBucket) {
if (numOfCols != 1) { for (int32_t i = 0; i < pBucket->numOfSlots; ++i) {
uError("MemBucket:%p,only consecutive data is allowed,invalid numOfCols:%d", pBucket, numOfCols); tMemBucketSlot* pSlot = &pBucket->pSlots[i];
taosTFree(pBucket);
return NULL; resetBoundingBox(&pSlot->range, pBucket->type);
resetPosInfo(&pSlot->info);
} }
}
SSchema* pSchema = getColumnModelSchema(pDesc->pColumnModel, 0); tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType) {
if (pSchema->type != dataType) { tMemBucket *pBucket = (tMemBucket *)calloc(1, sizeof(tMemBucket));
uError("MemBucket:%p,data type is not consistent,%d in schema, %d in param", pBucket, pSchema->type, dataType); if (pBucket == NULL) {
taosTFree(pBucket);
return NULL; return NULL;
} }
if (pBucket->numOfTotalPages < pBucket->nTotalSlots) { pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT;
uWarn("MemBucket:%p,total buffer pages %d are not enough for all slots", pBucket, pBucket->numOfTotalPages); pBucket->bufPageSize = DEFAULT_PAGE_SIZE * 4; // 4k per page
}
pBucket->pSegs = (tMemBucketSegment *)malloc(pBucket->numOfSegs * sizeof(tMemBucketSegment)); pBucket->type = dataType;
pBucket->bytes = nElemSize;
pBucket->total = 0;
pBucket->times = 1;
for (int32_t i = 0; i < pBucket->numOfSegs; ++i) { pBucket->maxCapacity = 200000;
pBucket->pSegs[i].numOfSlots = pBucket->nSlotsOfSeg;
pBucket->pSegs[i].pBuffer = NULL; pBucket->elemPerPage = (pBucket->bufPageSize - sizeof(tFilePage))/pBucket->bytes;
pBucket->pSegs[i].pBoundingEntries = NULL; pBucket->comparFn = getKeyComparFunc(pBucket->type);
resetBoundingBox(&pBucket->range, pBucket->type);
pBucket->hashFunc = getHashFunc(pBucket->type);
if (pBucket->hashFunc == NULL) {
uError("MemBucket:%p, not support data type %d, failed", pBucket, pBucket->type);
free(pBucket);
return NULL;
} }
uDebug("MemBucket:%p,created,buffer size:%ld,elem size:%d", pBucket, pBucket->numOfTotalPages * DEFAULT_PAGE_SIZE, pBucket->pSlots = (tMemBucketSlot *)calloc(pBucket->numOfSlots, sizeof(tMemBucketSlot));
pBucket->nElemSize); if (pBucket->pSlots == NULL) {
free(pBucket);
return NULL;
}
resetSlotInfo(pBucket);
int32_t ret = createDiskbasedResultBuffer(&pBucket->pBuffer, pBucket->bytes, pBucket->bufPageSize, pBucket->bufPageSize * 512, NULL);
if (ret != TSDB_CODE_SUCCESS) {
tMemBucketDestroy(pBucket);
return NULL;
}
uDebug("MemBucket:%p, elem size:%d", pBucket, pBucket->bytes);
return pBucket; return pBucket;
} }
...@@ -326,81 +307,11 @@ void tMemBucketDestroy(tMemBucket *pBucket) { ...@@ -326,81 +307,11 @@ void tMemBucketDestroy(tMemBucket *pBucket) {
return; return;
} }
if (pBucket->pSegs) { destroyResultBuf(pBucket->pBuffer);
for (int32_t i = 0; i < pBucket->numOfSegs; ++i) { taosTFree(pBucket->pSlots);
tMemBucketSegment *pSeg = &(pBucket->pSegs[i]);
taosTFree(pSeg->pBoundingEntries);
if (pSeg->pBuffer == NULL || pSeg->numOfSlots == 0) {
continue;
}
for (int32_t j = 0; j < pSeg->numOfSlots; ++j) {
if (pSeg->pBuffer[j] != NULL) {
pSeg->pBuffer[j] = destoryExtMemBuffer(pSeg->pBuffer[j]);
}
}
taosTFree(pSeg->pBuffer);
}
}
taosTFree(pBucket->pSegs);
taosTFree(pBucket); taosTFree(pBucket);
} }
/*
* find the slots which accounts for largest proportion of total in-memory buffer
*/
static void tBucketGetMaxMemSlot(tMemBucket *pBucket, int16_t *segIdx, int16_t *slotIdx) {
*segIdx = -1;
*slotIdx = -1;
int32_t val = 0;
for (int32_t k = 0; k < pBucket->numOfSegs; ++k) {
tMemBucketSegment *pSeg = &pBucket->pSegs[k];
for (int32_t i = 0; i < pSeg->numOfSlots; ++i) {
if (pSeg->pBuffer == NULL || pSeg->pBuffer[i] == NULL) {
continue;
}
if (val < pSeg->pBuffer[i]->numOfInMemPages) {
val = pSeg->pBuffer[i]->numOfInMemPages;
*segIdx = k;
*slotIdx = i;
}
}
}
}
static void resetBoundingBox(tMemBucketSegment *pSeg, int32_t type) {
switch (type) {
case TSDB_DATA_TYPE_BIGINT: {
for (int32_t i = 0; i < pSeg->numOfSlots; ++i) {
pSeg->pBoundingEntries[i].i64MaxVal = INT64_MIN;
pSeg->pBoundingEntries[i].i64MinVal = INT64_MAX;
}
break;
};
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TINYINT: {
for (int32_t i = 0; i < pSeg->numOfSlots; ++i) {
pSeg->pBoundingEntries[i].iMaxVal = INT32_MIN;
pSeg->pBoundingEntries[i].iMinVal = INT32_MAX;
}
break;
};
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_FLOAT: {
for (int32_t i = 0; i < pSeg->numOfSlots; ++i) {
pSeg->pBoundingEntries[i].dMaxVal = -DBL_MAX;
pSeg->pBoundingEntries[i].dMinVal = DBL_MAX;
}
break;
}
}
}
void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) { void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) {
switch (dataType) { switch (dataType) {
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
...@@ -461,7 +372,6 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) { ...@@ -461,7 +372,6 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) {
break; break;
}; };
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
// double val = *(float *)data;
double val = GET_FLOAT_VAL(data); double val = GET_FLOAT_VAL(data);
if (r->dMinVal > val) { if (r->dMinVal > val) {
...@@ -478,116 +388,45 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) { ...@@ -478,116 +388,45 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) {
} }
/* /*
* in memory bucket, we only accept the simple data consecutive put in a row/column * in memory bucket, we only accept data array list
* no column-model in this case.
*/ */
void tMemBucketPut(tMemBucket *pBucket, void *data, int32_t numOfRows) { void tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pBucket->numOfElems += numOfRows; assert(pBucket != NULL && data != NULL && size > 0);
int16_t segIdx = 0, slotIdx = 0; pBucket->total += size;
for (int32_t i = 0; i < numOfRows; ++i) {
char *d = (char *)data + i * tDataTypeDesc[pBucket->dataType].nSize;
switch (pBucket->dataType) {
case TSDB_DATA_TYPE_SMALLINT: {
int32_t val = *(int16_t *)d;
(pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx);
break;
}
case TSDB_DATA_TYPE_TINYINT: {
int32_t val = *(int8_t *)d;
(pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx);
break;
}
case TSDB_DATA_TYPE_INT: {
int32_t val = *(int32_t *)d;
(pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t val = *(int64_t *)d;
(pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
// double val = *(double *)d;
double val = GET_DOUBLE_VAL(d);
(pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
// double val = *(float *)d;
double val = GET_FLOAT_VAL(d);
(pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx);
break;
}
}
tMemBucketSegment *pSeg = &pBucket->pSegs[segIdx]; int32_t bytes = pBucket->bytes;
if (pSeg->pBoundingEntries == NULL) {
pSeg->pBoundingEntries = (MinMaxEntry *)malloc(sizeof(MinMaxEntry) * pBucket->nSlotsOfSeg);
resetBoundingBox(pSeg, pBucket->dataType);
}
if (pSeg->pBuffer == NULL) { for (int32_t i = 0; i < size; ++i) {
pSeg->pBuffer = (tExtMemBuffer **)calloc(pBucket->nSlotsOfSeg, sizeof(void *)); char *d = (char *) data + i * bytes;
}
if (pSeg->pBuffer[slotIdx] == NULL) { int32_t slotIdx = (pBucket->hashFunc)(pBucket, d);
pSeg->pBuffer[slotIdx] = createExtMemBuffer(pBucket->numOfTotalPages * pBucket->pageSize, pBucket->nElemSize, assert(slotIdx >= 0);
pBucket->pageSize, pBucket->pOrderDesc->pColumnModel);
pSeg->pBuffer[slotIdx]->flushModel = SINGLE_APPEND_MODEL;
pBucket->pOrderDesc->pColumnModel->capacity = pSeg->pBuffer[slotIdx]->numOfElemsPerPage;
}
tMemBucketUpdateBoundingBox(&pSeg->pBoundingEntries[slotIdx], d, pBucket->dataType); tMemBucketSlot *pSlot = &pBucket->pSlots[slotIdx];
tMemBucketUpdateBoundingBox(&pSlot->range, d, pBucket->type);
// ensure available memory pages to allocate // ensure available memory pages to allocate
int16_t cseg = 0, cslot = 0; int32_t groupId = getGroupId(pBucket->numOfSlots, slotIdx, pBucket->times);
if (pBucket->numOfAvailPages == 0) { int32_t pageId = -1;
uDebug("MemBucket:%p,max avail size:%d, no avail memory pages,", pBucket, pBucket->numOfTotalPages);
tBucketGetMaxMemSlot(pBucket, &cseg, &cslot);
if (cseg == -1 || cslot == -1) {
uError("MemBucket:%p,failed to find appropriated avail buffer", pBucket);
return;
}
if (cseg != segIdx || cslot != slotIdx) {
pBucket->numOfAvailPages += pBucket->pSegs[cseg].pBuffer[cslot]->numOfInMemPages;
int32_t avail = pBucket->pSegs[cseg].pBuffer[cslot]->numOfInMemPages; if (pSlot->info.data == NULL || pSlot->info.data->num >= pBucket->elemPerPage) {
UNUSED(avail); if (pSlot->info.data != NULL) {
tExtMemBufferFlush(pBucket->pSegs[cseg].pBuffer[cslot]); assert(pSlot->info.data->num >= pBucket->elemPerPage && pSlot->info.size > 0);
uDebug("MemBucket:%p,seg:%d,slot:%d flushed to disk,new avail pages:%d", pBucket, cseg, cslot, // keep the pointer in memory
pBucket->numOfAvailPages); releaseResBufPage(pBucket->pBuffer, pSlot->info.data);
} else { pSlot->info.data = NULL;
uDebug("MemBucket:%p,failed to choose slot to flush to disk seg:%d,slot:%d", pBucket, cseg, cslot);
} }
}
int16_t consumedPgs = pSeg->pBuffer[slotIdx]->numOfInMemPages;
int16_t newPgs = tExtMemBufferPut(pSeg->pBuffer[slotIdx], d, 1); pSlot->info.data = getNewDataBuf(pBucket->pBuffer, groupId, &pageId);
/* pSlot->info.pageId = pageId;
* trigger 1. page re-allocation, to reduce the available pages }
* 2. page flushout, to increase the available pages
*/
pBucket->numOfAvailPages += (consumedPgs - newPgs);
}
}
void releaseBucket(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) { memcpy(pSlot->info.data->data + pSlot->info.data->num * pBucket->bytes, d, pBucket->bytes);
if (segIdx < 0 || segIdx > pMemBucket->numOfSegs || slotIdx < 0) {
return;
}
tMemBucketSegment *pSeg = &pMemBucket->pSegs[segIdx]; pSlot->info.data->num += 1;
if (slotIdx < 0 || slotIdx >= pSeg->numOfSlots || pSeg->pBuffer[slotIdx] == NULL) { pSlot->info.size += 1;
return;
} }
pSeg->pBuffer[slotIdx] = destoryExtMemBuffer(pSeg->pBuffer[slotIdx]);
} }
//////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////
...@@ -595,54 +434,49 @@ static void findMaxMinValue(tMemBucket *pMemBucket, double *maxVal, double *minV ...@@ -595,54 +434,49 @@ static void findMaxMinValue(tMemBucket *pMemBucket, double *maxVal, double *minV
*minVal = DBL_MAX; *minVal = DBL_MAX;
*maxVal = -DBL_MAX; *maxVal = -DBL_MAX;
for (int32_t i = 0; i < pMemBucket->numOfSegs; ++i) { for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) {
tMemBucketSegment *pSeg = &pMemBucket->pSegs[i]; tMemBucketSlot *pSlot = &pMemBucket->pSlots[i];
if (pSeg->pBuffer == NULL) { if (pSlot->info.size == 0) {
continue; continue;
} }
switch (pMemBucket->dataType) {
switch (pMemBucket->type) {
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TINYINT: { case TSDB_DATA_TYPE_TINYINT: {
for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { double minv = pSlot->range.iMinVal;
double minv = pSeg->pBoundingEntries[j].iMinVal; double maxv = pSlot->range.iMaxVal;
double maxv = pSeg->pBoundingEntries[j].iMaxVal;
if (*minVal > minv) { if (*minVal > minv) {
*minVal = minv; *minVal = minv;
} }
if (*maxVal < maxv) { if (*maxVal < maxv) {
*maxVal = maxv; *maxVal = maxv;
}
} }
break; break;
} }
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { double minv = pSlot->range.dMinVal;
double minv = pSeg->pBoundingEntries[j].dMinVal; double maxv = pSlot->range.dMaxVal;
double maxv = pSeg->pBoundingEntries[j].dMaxVal;
if (*minVal > minv) { if (*minVal > minv) {
*minVal = minv; *minVal = minv;
} }
if (*maxVal < maxv) { if (*maxVal < maxv) {
*maxVal = maxv; *maxVal = maxv;
}
} }
break; break;
} }
case TSDB_DATA_TYPE_BIGINT: { case TSDB_DATA_TYPE_BIGINT: {
for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { double minv = (double)pSlot->range.i64MinVal;
double minv = (double)pSeg->pBoundingEntries[j].i64MinVal; double maxv = (double)pSlot->range.i64MaxVal;
double maxv = (double)pSeg->pBoundingEntries[j].i64MaxVal;
if (*minVal > minv) { if (*minVal > minv) {
*minVal = minv; *minVal = minv;
} }
if (*maxVal < maxv) { if (*maxVal < maxv) {
*maxVal = maxv; *maxVal = maxv;
}
} }
break; break;
} }
...@@ -650,20 +484,6 @@ static void findMaxMinValue(tMemBucket *pMemBucket, double *maxVal, double *minV ...@@ -650,20 +484,6 @@ static void findMaxMinValue(tMemBucket *pMemBucket, double *maxVal, double *minV
} }
} }
static MinMaxEntry getMinMaxEntryOfNearestSlotInNextSegment(tMemBucket *pMemBucket, int32_t segIdx) {
int32_t i = segIdx + 1;
while (i < pMemBucket->numOfSegs && pMemBucket->pSegs[i].numOfSlots == 0) ++i;
tMemBucketSegment *pSeg = &pMemBucket->pSegs[i];
assert(pMemBucket->numOfSegs > i && pMemBucket->pSegs[i].pBuffer != NULL);
i = 0;
while (i < pMemBucket->nSlotsOfSeg && pSeg->pBuffer[i] == NULL) ++i;
assert(i < pMemBucket->nSlotsOfSeg);
return pSeg->pBoundingEntries[i];
}
/* /*
* *
* now, we need to find the minimum value of the next slot for * now, we need to find the minimum value of the next slot for
...@@ -671,262 +491,198 @@ static MinMaxEntry getMinMaxEntryOfNearestSlotInNextSegment(tMemBucket *pMemBuck ...@@ -671,262 +491,198 @@ static MinMaxEntry getMinMaxEntryOfNearestSlotInNextSegment(tMemBucket *pMemBuck
* j is the last slot of current segment, we need to get the first * j is the last slot of current segment, we need to get the first
* slot of the next segment. * slot of the next segment.
*/ */
static MinMaxEntry getMinMaxEntryOfNextSlotWithData(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) { static MinMaxEntry getMinMaxEntryOfNextSlotWithData(tMemBucket *pMemBucket, int32_t slotIdx) {
tMemBucketSegment *pSeg = &pMemBucket->pSegs[segIdx];
MinMaxEntry next;
if (slotIdx == pSeg->numOfSlots - 1) { // find next segment with data
return getMinMaxEntryOfNearestSlotInNextSegment(pMemBucket, segIdx);
} else {
int32_t j = slotIdx + 1; int32_t j = slotIdx + 1;
for (; j < pMemBucket->nSlotsOfSeg && pMemBucket->pSegs[segIdx].pBuffer[j] == 0; ++j) { while (j < pMemBucket->numOfSlots && (pMemBucket->pSlots[j].info.size == 0)) {
++j;
}
assert(j < pMemBucket->numOfSlots);
return pMemBucket->pSlots[j].range;
}
static bool isIdenticalData(tMemBucket *pMemBucket, int32_t index);
char *getFirstElemOfMemBuffer(tMemBucketSlot *pSeg, int32_t slotIdx, tFilePage *pPage);
static double getIdenticalDataVal(tMemBucket* pMemBucket, int32_t slotIndex) {
assert(isIdenticalData(pMemBucket, slotIndex));
tMemBucketSlot *pSlot = &pMemBucket->pSlots[slotIndex];
double finalResult = 0.0;
switch (pMemBucket->type) {
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_INT: {
finalResult = pSlot->range.iMinVal;
break;
}
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: {
finalResult = pSlot->range.dMinVal;
break;
}; };
if (j == pMemBucket->nSlotsOfSeg) { // current slot has no available case TSDB_DATA_TYPE_BIGINT: {
// slot,try next segment finalResult = pSlot->range.i64MinVal;
return getMinMaxEntryOfNearestSlotInNextSegment(pMemBucket, segIdx); break;
} else {
next = pSeg->pBoundingEntries[slotIdx + 1];
assert(pSeg->pBuffer[slotIdx + 1] != NULL);
} }
} }
return next; return finalResult;
} }
bool isIdenticalData(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx);
char *getFirstElemOfMemBuffer(tMemBucketSegment *pSeg, int32_t slotIdx, tFilePage *pPage);
double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) { double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) {
int32_t num = 0; int32_t num = 0;
for (int32_t i = 0; i < pMemBucket->numOfSegs; ++i) { for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) {
tMemBucketSegment *pSeg = &pMemBucket->pSegs[i]; tMemBucketSlot *pSlot = &pMemBucket->pSlots[i];
for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { if (pSlot->info.size == 0) {
if (pSeg->pBuffer == NULL || pSeg->pBuffer[j] == NULL) { continue;
continue; }
}
// required value in current slot // required value in current slot
if (num < (count + 1) && num + pSeg->pBuffer[j]->numOfTotalElems >= (count + 1)) { if (num < (count + 1) && num + pSlot->info.size >= (count + 1)) {
if (pSeg->pBuffer[j]->numOfTotalElems + num == (count + 1)) { if (pSlot->info.size + num == (count + 1)) {
/* /*
* now, we need to find the minimum value of the next slot for interpolating the percentile value * now, we need to find the minimum value of the next slot for interpolating the percentile value
* j is the last slot of current segment, we need to get the first slot of the next segment. * j is the last slot of current segment, we need to get the first slot of the next segment.
* */
*/ MinMaxEntry next = getMinMaxEntryOfNextSlotWithData(pMemBucket, i);
MinMaxEntry next = getMinMaxEntryOfNextSlotWithData(pMemBucket, i, j);
double maxOfThisSlot = 0;
double maxOfThisSlot = 0; double minOfNextSlot = 0;
double minOfNextSlot = 0; switch (pMemBucket->type) {
switch (pMemBucket->dataType) { case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_TINYINT: {
case TSDB_DATA_TYPE_TINYINT: { maxOfThisSlot = pSlot->range.iMaxVal;
maxOfThisSlot = pSeg->pBoundingEntries[j].iMaxVal; minOfNextSlot = next.iMinVal;
minOfNextSlot = next.iMinVal; break;
break;
};
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: {
maxOfThisSlot = pSeg->pBoundingEntries[j].dMaxVal;
minOfNextSlot = next.dMinVal;
break;
};
case TSDB_DATA_TYPE_BIGINT: {
maxOfThisSlot = (double)pSeg->pBoundingEntries[j].i64MaxVal;
minOfNextSlot = (double)next.i64MinVal;
break;
}
}; };
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: {
maxOfThisSlot = pSlot->range.dMaxVal;
minOfNextSlot = next.dMinVal;
break;
};
case TSDB_DATA_TYPE_BIGINT: {
maxOfThisSlot = (double)pSlot->range.i64MaxVal;
minOfNextSlot = (double)next.i64MinVal;
break;
}
};
assert(minOfNextSlot > maxOfThisSlot); assert(minOfNextSlot > maxOfThisSlot);
double val = (1 - fraction) * maxOfThisSlot + fraction * minOfNextSlot; double val = (1 - fraction) * maxOfThisSlot + fraction * minOfNextSlot;
return val; return val;
} }
if (pSeg->pBuffer[j]->numOfTotalElems <= pMemBucket->maxElemsCapacity) {
// data in buffer and file are merged together to be processed.
tFilePage *buffer = loadIntoBucketFromDisk(pMemBucket, i, j, pMemBucket->pOrderDesc);
int32_t currentIdx = count - num;
char * thisVal = buffer->data + pMemBucket->nElemSize * currentIdx;
char * nextVal = thisVal + pMemBucket->nElemSize;
double td = 1.0, nd = 1.0;
switch (pMemBucket->dataType) {
case TSDB_DATA_TYPE_SMALLINT: {
td = *(int16_t *)thisVal;
nd = *(int16_t *)nextVal;
break;
}
case TSDB_DATA_TYPE_TINYINT: {
td = *(int8_t *)thisVal;
nd = *(int8_t *)nextVal;
break;
}
case TSDB_DATA_TYPE_INT: {
td = *(int32_t *)thisVal;
nd = *(int32_t *)nextVal;
break;
};
case TSDB_DATA_TYPE_FLOAT: {
// td = *(float *)thisVal;
// nd = *(float *)nextVal;
td = GET_FLOAT_VAL(thisVal);
nd = GET_FLOAT_VAL(nextVal);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
// td = *(double *)thisVal;
td = GET_DOUBLE_VAL(thisVal);
// nd = *(double *)nextVal;
nd = GET_DOUBLE_VAL(nextVal);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
td = (double)*(int64_t *)thisVal;
nd = (double)*(int64_t *)nextVal;
break;
}
}
double val = (1 - fraction) * td + fraction * nd;
taosTFree(buffer);
return val;
} else { // incur a second round bucket split
if (isIdenticalData(pMemBucket, i, j)) {
tExtMemBuffer *pMemBuffer = pSeg->pBuffer[j];
tFilePage *pPage = (tFilePage *)malloc(pMemBuffer->pageSize);
char *thisVal = getFirstElemOfMemBuffer(pSeg, j, pPage);
double finalResult = 0.0;
switch (pMemBucket->dataType) {
case TSDB_DATA_TYPE_SMALLINT: {
finalResult = *(int16_t *)thisVal;
break;
}
case TSDB_DATA_TYPE_TINYINT: {
finalResult = *(int8_t *)thisVal;
break;
}
case TSDB_DATA_TYPE_INT: {
finalResult = *(int32_t *)thisVal;
break;
};
case TSDB_DATA_TYPE_FLOAT: {
// finalResult = *(float *)thisVal;
finalResult = GET_FLOAT_VAL(thisVal);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
// finalResult = *(double *)thisVal;
finalResult = GET_DOUBLE_VAL(thisVal);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
finalResult = (double)(*(int64_t *)thisVal);
break;
}
}
free(pPage);
return finalResult;
}
uDebug("MemBucket:%p,start second round bucketing", pMemBucket); if (pSlot->info.size <= pMemBucket->maxCapacity) {
// data in buffer and file are merged together to be processed.
tFilePage *buffer = loadDataFromFilePage(pMemBucket, i);
int32_t currentIdx = count - num;
if (pSeg->pBuffer[j]->numOfElemsInBuffer != 0) { char *thisVal = buffer->data + pMemBucket->bytes * currentIdx;
uDebug("MemBucket:%p,flush %d pages to disk, clear status", pMemBucket, pSeg->pBuffer[j]->numOfInMemPages); char *nextVal = thisVal + pMemBucket->bytes;
pMemBucket->numOfAvailPages += pSeg->pBuffer[j]->numOfInMemPages; double td = 1.0, nd = 1.0;
tExtMemBufferFlush(pSeg->pBuffer[j]); switch (pMemBucket->type) {
case TSDB_DATA_TYPE_SMALLINT: {
td = *(int16_t *)thisVal;
nd = *(int16_t *)nextVal;
break;
} }
case TSDB_DATA_TYPE_TINYINT: {
tExtMemBuffer *pMemBuffer = pSeg->pBuffer[j]; td = *(int8_t *)thisVal;
pSeg->pBuffer[j] = NULL; nd = *(int8_t *)nextVal;
break;
// release all
for (int32_t tt = 0; tt < pMemBucket->numOfSegs; ++tt) {
tMemBucketSegment *pSeg = &pMemBucket->pSegs[tt];
for (int32_t ttx = 0; ttx < pSeg->numOfSlots; ++ttx) {
if (pSeg->pBuffer && pSeg->pBuffer[ttx]) {
pSeg->pBuffer[ttx] = destoryExtMemBuffer(pSeg->pBuffer[ttx]);
}
}
} }
case TSDB_DATA_TYPE_INT: {
pMemBucket->nRange.i64MaxVal = pSeg->pBoundingEntries->i64MaxVal; td = *(int32_t *)thisVal;
pMemBucket->nRange.i64MinVal = pSeg->pBoundingEntries->i64MinVal; nd = *(int32_t *)nextVal;
pMemBucket->numOfElems = 0; break;
};
for (int32_t tt = 0; tt < pMemBucket->numOfSegs; ++tt) { case TSDB_DATA_TYPE_FLOAT: {
tMemBucketSegment *pSeg = &pMemBucket->pSegs[tt]; td = GET_FLOAT_VAL(thisVal);
for (int32_t ttx = 0; ttx < pSeg->numOfSlots; ++ttx) { nd = GET_FLOAT_VAL(nextVal);
if (pSeg->pBoundingEntries) { break;
resetBoundingBox(pSeg, pMemBucket->dataType); }
} case TSDB_DATA_TYPE_DOUBLE: {
} td = GET_DOUBLE_VAL(thisVal);
nd = GET_DOUBLE_VAL(nextVal);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
td = (double)*(int64_t *)thisVal;
nd = (double)*(int64_t *)nextVal;
break;
} }
}
tFilePage *pPage = (tFilePage *)malloc(pMemBuffer->pageSize); double val = (1 - fraction) * td + fraction * nd;
taosTFree(buffer);
tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[0]; return val;
assert(pFlushInfo->numOfPages == pMemBuffer->fileMeta.nFileSize); } else { // incur a second round bucket split
if (isIdenticalData(pMemBucket, i)) {
return getIdenticalDataVal(pMemBucket, i);
}
int32_t ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET); // try next round
UNUSED(ret); pMemBucket->times += 1;
uDebug("MemBucket:%p, start next round data bucketing, time:%d", pMemBucket, pMemBucket->times);
for (uint32_t jx = 0; jx < pFlushInfo->numOfPages; ++jx) { pMemBucket->range = pSlot->range;
size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file); pMemBucket->total = 0;
if (sz != pMemBuffer->pageSize) {
uError("MemBucket:%p, read tmp file %s failed", pMemBucket, pMemBuffer->path);
} else {
tMemBucketPut(pMemBucket, pPage->data, (int32_t)pPage->num);
}
}
fclose(pMemBuffer->file); resetSlotInfo(pMemBucket);
if (unlink(pMemBuffer->path) != 0) {
uError("MemBucket:%p, remove tmp file %s failed", pMemBucket, pMemBuffer->path);
}
taosTFree(pMemBuffer);
taosTFree(pPage);
return getPercentileImpl(pMemBucket, count - num, fraction); int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times - 1);
} SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId);
} else { assert(list->size > 0);
num += pSeg->pBuffer[j]->numOfTotalElems;
for (int32_t f = 0; f < list->size; ++f) {
SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f);
tFilePage *pg = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId);
tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num);
releaseResBufPageInfo(pMemBucket->pBuffer, pgInfo);
}
return getPercentileImpl(pMemBucket, count - num, fraction);
} }
} else {
num += pSlot->info.size;
} }
} }
return 0; return 0;
} }
double getPercentile(tMemBucket *pMemBucket, double percent) { double getPercentile(tMemBucket *pMemBucket, double percent) {
if (pMemBucket->numOfElems == 0) { if (pMemBucket->total == 0) {
return 0.0; return 0.0;
} }
if (pMemBucket->numOfElems == 1) { // return the only element // if only one elements exists, return it
if (pMemBucket->total == 1) {
return findOnlyResult(pMemBucket); return findOnlyResult(pMemBucket);
} }
percent = fabs(percent); percent = fabs(percent);
// validate the parameters // find the min/max value, no need to scan all data in bucket
if (fabs(percent - 100.0) < DBL_EPSILON || (percent < DBL_EPSILON)) { if (fabs(percent - 100.0) < DBL_EPSILON || (percent < DBL_EPSILON)) {
double minx = 0, maxx = 0; double minx = 0, maxx = 0;
/*
* find the min/max value, no need to scan all data in bucket
*/
findMaxMinValue(pMemBucket, &maxx, &minx); findMaxMinValue(pMemBucket, &maxx, &minx);
return fabs(percent - 100) < DBL_EPSILON ? maxx : minx; return fabs(percent - 100) < DBL_EPSILON ? maxx : minx;
} }
double percentVal = (percent * (pMemBucket->numOfElems - 1)) / ((double)100.0); double percentVal = (percent * (pMemBucket->total - 1)) / ((double)100.0);
int32_t orderIdx = (int32_t)percentVal; int32_t orderIdx = (int32_t)percentVal;
// do put data by using buckets // do put data by using buckets
...@@ -934,19 +690,18 @@ double getPercentile(tMemBucket *pMemBucket, double percent) { ...@@ -934,19 +690,18 @@ double getPercentile(tMemBucket *pMemBucket, double percent) {
} }
/* /*
* check if data in one slot are all identical * check if data in one slot are all identical only need to compare with the bounding box
* only need to compare with the bounding box
*/ */
bool isIdenticalData(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) { bool isIdenticalData(tMemBucket *pMemBucket, int32_t index) {
tMemBucketSegment *pSeg = &pMemBucket->pSegs[segIdx]; tMemBucketSlot *pSeg = &pMemBucket->pSlots[index];
if (pMemBucket->dataType == TSDB_DATA_TYPE_INT || pMemBucket->dataType == TSDB_DATA_TYPE_BIGINT || if (pMemBucket->type == TSDB_DATA_TYPE_INT || pMemBucket->type == TSDB_DATA_TYPE_BIGINT ||
pMemBucket->dataType == TSDB_DATA_TYPE_SMALLINT || pMemBucket->dataType == TSDB_DATA_TYPE_TINYINT) { pMemBucket->type == TSDB_DATA_TYPE_SMALLINT || pMemBucket->type == TSDB_DATA_TYPE_TINYINT) {
return pSeg->pBoundingEntries[slotIdx].i64MinVal == pSeg->pBoundingEntries[slotIdx].i64MaxVal; return pSeg->range.i64MinVal == pSeg->range.i64MaxVal;
} }
if (pMemBucket->dataType == TSDB_DATA_TYPE_FLOAT || pMemBucket->dataType == TSDB_DATA_TYPE_DOUBLE) { if (pMemBucket->type == TSDB_DATA_TYPE_FLOAT || pMemBucket->type == TSDB_DATA_TYPE_DOUBLE) {
return fabs(pSeg->pBoundingEntries[slotIdx].dMaxVal - pSeg->pBoundingEntries[slotIdx].dMinVal) < DBL_EPSILON; return fabs(pSeg->range.dMaxVal - pSeg->range.dMinVal) < DBL_EPSILON;
} }
return false; return false;
...@@ -956,24 +711,24 @@ bool isIdenticalData(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) { ...@@ -956,24 +711,24 @@ bool isIdenticalData(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) {
* get the first element of one slot into memory. * get the first element of one slot into memory.
* if no data of current slot in memory, load it from disk * if no data of current slot in memory, load it from disk
*/ */
char *getFirstElemOfMemBuffer(tMemBucketSegment *pSeg, int32_t slotIdx, tFilePage *pPage) { char *getFirstElemOfMemBuffer(tMemBucketSlot *pSeg, int32_t slotIdx, tFilePage *pPage) {
tExtMemBuffer *pMemBuffer = pSeg->pBuffer[slotIdx]; // STSBuf *pMemBuffer = pSeg->pBuffer[slotIdx];
char * thisVal = NULL; char *thisVal = NULL;
if (pSeg->pBuffer[slotIdx]->numOfElemsInBuffer != 0) { // if (pSeg->pBuffer[slotIdx]->numOfTotal != 0) {
thisVal = pSeg->pBuffer[slotIdx]->pHead->item.data; //// thisVal = pSeg->pBuffer[slotIdx]->pHead->item.data;
} else { // } else {
/* // /*
* no data in memory, load one page into memory // * no data in memory, load one page into memory
*/ // */
tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[0]; // tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[0];
assert(pFlushInfo->numOfPages == pMemBuffer->fileMeta.nFileSize); // assert(pFlushInfo->numOfPages == pMemBuffer->fileMeta.nFileSize);
int32_t ret; // int32_t ret;
ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET); // ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET);
UNUSED(ret); // UNUSED(ret);
size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file); // size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file);
UNUSED(sz); // UNUSED(sz);
thisVal = pPage->data; // thisVal = pPage->data;
} // }
return thisVal; return thisVal;
} }
...@@ -272,6 +272,7 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen ...@@ -272,6 +272,7 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
void* pData = (ptNode != NULL)? (*ptNode)->data:NULL; void* pData = (ptNode != NULL)? (*ptNode)->data:NULL;
assert((int64_t)pData != 0x40); assert((int64_t)pData != 0x40);
if (pData != NULL) { if (pData != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, T_REF_VAL_GET(*ptNode)); uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, T_REF_VAL_GET(*ptNode));
...@@ -498,7 +499,7 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) { ...@@ -498,7 +499,7 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
STrashElem *pElem = calloc(1, sizeof(STrashElem)); STrashElem *pElem = calloc(1, sizeof(STrashElem));
pElem->pData = pNode; pElem->pData = pNode;
pElem->prev = NULL; pElem->prev = NULL;
pNode->inTrashCan = true; pNode->inTrashCan = true;
pNode->pTNodeHeader = pElem; pNode->pTNodeHeader = pElem;
......
...@@ -482,15 +482,31 @@ sql insert into um2 using m2 tags(9) values(1000001, 10)(2000000, 20); ...@@ -482,15 +482,31 @@ sql insert into um2 using m2 tags(9) values(1000001, 10)(2000000, 20);
sql_error select count(*) from m1,m2 where m1.a=m2.a and m1.ts=m2.ts; sql_error select count(*) from m1,m2 where m1.a=m2.a and m1.ts=m2.ts;
#empty table join test, add for no result join test print ====> empty table/empty super-table join test, add for no result join test
sql create database ux1; sql create database ux1;
sql use ux1; sql use ux1;
sql create table m1(ts timestamp, k int) tags(a binary(12), b int); sql create table m1(ts timestamp, k int) tags(a binary(12), b int);
sql create table tm0 using m1 tags('abc', 1); sql create table tm0 using m1 tags('abc', 1);
sql create table m2(ts timestamp, k int) tags(a int, b binary(12)); sql create table m2(ts timestamp, k int) tags(a int, b binary(12));
sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a;
if $rows != 0 then
return -1
endi
sql create table tm2 using m2 tags(2, 'abc'); sql create table tm2 using m2 tags(2, 'abc');
sql select count(*) from tm0, tm2 where tm0.ts=tm2.ts; sql select count(*) from tm0, tm2 where tm0.ts=tm2.ts;
sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a if $rows != 0 then
return -1
endi
sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a;
if $rows != 0 then
return -1
endi
sql drop table tm2;
sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a;
sql drop database ux1; sql drop database ux1;
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册