提交 61622a73 编写于 作者: H Haojun Liao

[td-225] fix jdbc memory leaks.

上级 91275a73
......@@ -327,13 +327,12 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(
STscObj *pObj = pSql->pTscObj;
if (tscIsUpdateQuery(pSql)) {
// taos_free_result(pSql); // free result here
jniDebug("jobj:%p, conn:%p, no resultset, %p", jobj, pObj, (void *)tres);
return 0;
jniDebug("jobj:%p, conn:%p, update query, no resultset, %p", jobj, pObj, (void *)tres);
} else {
jniDebug("jobj:%p, conn:%p, get resultset, %p", jobj, pObj, (void *)tres);
return tres;
}
return tres;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_isUpdateQueryImp(JNIEnv *env, jobject jobj, jlong con,
......
......@@ -111,6 +111,8 @@ public class TSDBJNIConnector {
* @throws SQLException
*/
public long executeQuery(String sql) throws SQLException {
// close previous result set if the user forgets to invoke the
// free method to close previous result set.
if (!this.isResultsetClosed) {
freeResultSet(taosResultSetPointer);
}
......@@ -123,21 +125,20 @@ public class TSDBJNIConnector {
this.freeResultSet(pSql);
throw new SQLException(TSDBConstants.WrapErrMsg("Unsupported encoding"));
}
int code = this.getErrCode(pSql);
if (code != 0) {
affectedRows = -1;
String err_msg = this.getErrMsg(pSql);
String msg = this.getErrMsg(pSql);
this.freeResultSet(pSql);
throw new SQLException(TSDBConstants.WrapErrMsg(err_msg), "", code);
throw new SQLException(TSDBConstants.WrapErrMsg(msg), "", code);
}
// Try retrieving result set for the executed SQL using the current connection pointer. If the executed
// SQL is a DML/DDL which doesn't return a result set, then taosResultSetPointer should be 0L. Otherwise,
// taosResultSetPointer should be a non-zero value.
// Try retrieving result set for the executed SQL using the current connection pointer.
taosResultSetPointer = this.getResultSetImp(this.taos, pSql);
if (taosResultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
isResultsetClosed = false;
}
isResultsetClosed = (taosResultSetPointer == TSDBConstants.JNI_NULL_POINTER);
return pSql;
}
......@@ -171,6 +172,10 @@ public class TSDBJNIConnector {
private native long getResultSetImp(long connection, long pSql);
public boolean isUpdateQuery(long pSql) {
return isUpdateQueryImp(this.taos, pSql) == 1? true:false;
}
private native long isUpdateQueryImp(long connection, long pSql);
/**
......@@ -180,13 +185,14 @@ public class TSDBJNIConnector {
int res = TSDBConstants.JNI_SUCCESS;
if (result != taosResultSetPointer && taosResultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
throw new RuntimeException("Invalid result set pointer");
} else if (taosResultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
}
if (taosResultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
res = this.freeResultSetImp(this.taos, result);
isResultsetClosed = true; // reset resultSetPointer to 0 after freeResultSetImp() return
taosResultSetPointer = TSDBConstants.JNI_NULL_POINTER;
} else {
isResultsetClosed = true;
}
isResultsetClosed = true;
return res;
}
......
......@@ -51,6 +51,8 @@ public class TSDBStatement implements Statement {
if (isClosed) {
throw new SQLException("Invalid method call on a closed statement.");
}
// TODO make sure it is not a update query
pSql = this.connecter.executeQuery(sql);
long resultSetPointer = this.connecter.getResultSet();
......@@ -58,33 +60,40 @@ public class TSDBStatement implements Statement {
if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
this.connecter.freeResultSet(pSql);
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
} else if (resultSetPointer == TSDBConstants.JNI_NULL_POINTER) {
// create/insert/update/del/alter
}
// create/insert/update/delete/alter
if (resultSetPointer == TSDBConstants.JNI_NULL_POINTER) {
this.connecter.freeResultSet(pSql);
return null;
} else {
}
if (!this.connecter.isUpdateQuery(pSql)) {
return new TSDBResultSet(this.connecter, resultSetPointer);
} else {
this.connecter.freeResultSet(pSql);
return null;
}
}
public int executeUpdate(String sql) throws SQLException {
if (isClosed) {
throw new SQLException("Invalid method call on a closed statement.");
}
// TODO check if current query is update query
pSql = this.connecter.executeQuery(sql);
long resultSetPointer = this.connecter.getResultSet();
if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
this.connecter.freeResultSet(pSql);
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
} else if (resultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
this.connecter.freeResultSet();
throw new SQLException("The executed SQL is not a DML or a DDL");
} else {
int num = this.connecter.getAffectedRows(pSql);
this.connecter.freeResultSet(pSql);
return num;
}
int num = this.connecter.getAffectedRows(pSql);
this.connecter.freeResultSet(pSql);
return num;
}
public String getErrorMsg(long pSql) {
......
......@@ -33,6 +33,17 @@ struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
typedef struct SPosInfo {
int32_t pageId:20;
int32_t rowId:12;
} SPosInfo;
typedef struct SGroupResInfo {
int32_t groupId;
int32_t numOfDataPages;
SPosInfo pos;
} SGroupResInfo;
typedef struct SSqlGroupbyExpr {
int16_t tableIndex;
SArray* columnInfo; // SArray<SColIndex>, group by columns information
......@@ -41,11 +52,6 @@ typedef struct SSqlGroupbyExpr {
int16_t orderType; // order by type: asc/desc
} SSqlGroupbyExpr;
typedef struct SPosInfo {
int32_t pageId:20;
int32_t rowId:12;
} SPosInfo;
typedef struct SWindowResult {
SPosInfo pos; // Position of current result in disk-based output buffer
uint16_t numOfRows; // number of rows of current time window
......@@ -190,18 +196,15 @@ typedef struct SQInfo {
STableGroupInfo tableGroupInfo; // table id list < only includes the STable list>
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
SQueryRuntimeEnv runtimeEnv;
int32_t groupIndex;
int32_t offset; // offset in group result set of subgroup, todo refactor
SArray* arrTableIdInfo;
int32_t groupIndex;
/*
* the query is executed position on which meter of the whole list.
* when the index reaches the last one of the list, it means the query is completed.
* We later may refactor to remove this attribution by using another flag to denote
* whether a multimeter query is completed or not.
*/
int32_t tableIndex;
int32_t numOfGroupResultPages;
SGroupResInfo groupResInfo;
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
pthread_mutex_t lock; // used to synchronize the rsp/query threads
......
......@@ -155,9 +155,9 @@ static bool hasMainOutput(SQuery *pQuery);
static void buildTagQueryResult(SQInfo *pQInfo);
static int32_t setAdditionalInfo(SQInfo *pQInfo, void *pTable, STableQueryInfo *pTableQueryInfo);
static int32_t flushFromResultBuf(SQInfo *pQInfo);
static int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo);
bool doFilterData(SQuery *pQuery, int32_t elemPos) {
bool doFilterData(SQuery *pQuery, int32_t elemPos) {
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
......@@ -231,7 +231,7 @@ void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfRes) {
}
}
static int32_t getGroupResultId(int32_t groupIndex) {
static UNUSED_FUNC int32_t getGroupResultId(int32_t groupIndex) {
int32_t base = 20000000;
return base + (groupIndex * 10000);
}
......@@ -2686,11 +2686,12 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
break;
}
assert(pQInfo->numOfGroupResultPages == 0);
assert(pQInfo->groupResInfo.numOfDataPages == 0);
qDebug("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->groupIndex - 1);
}
if (pQInfo->groupIndex == numOfGroups && pQInfo->offset == pQInfo->numOfGroupResultPages) {
SGroupResInfo* info = &pQInfo->groupResInfo;
if (pQInfo->groupIndex == numOfGroups && info->pos.pageId == info->numOfDataPages) {
SET_STABLE_QUERY_OVER(pQInfo);
}
......@@ -2703,8 +2704,12 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
}
void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
if (pQInfo->offset == pQInfo->numOfGroupResultPages) {
pQInfo->numOfGroupResultPages = 0;
SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo;
// all results have been return to client, try next group
if (pGroupResInfo->pos.pageId == pGroupResInfo->numOfDataPages) {
pGroupResInfo->numOfDataPages = 0;
pGroupResInfo->pos.rowId = 0;
// current results of group has been sent to client, try next group
if (mergeIntoGroupResult(pQInfo) != TSDB_CODE_SUCCESS) {
......@@ -2713,7 +2718,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
// check if all results has been sent to client
int32_t numOfGroup = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo));
if (pQInfo->numOfGroupResultPages == 0 && pQInfo->groupIndex == numOfGroup) {
if (pGroupResInfo->numOfDataPages == 0 && pQInfo->groupIndex == numOfGroup) {
SET_STABLE_QUERY_OVER(pQInfo);
return;
}
......@@ -2722,30 +2727,50 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv;
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
int32_t id = getGroupResultId(pQInfo->groupIndex - 1);
SIDList list = getDataBufPagesIdList(pResultBuf, pQInfo->offset + id);
int32_t size = (int32_t)(taosArrayGetSize(list));
int32_t id = pQInfo->groupResInfo.groupId;
SIDList list = getDataBufPagesIdList(pResultBuf, id);
int32_t offset = 0;
for (int32_t j = 0; j < size; ++j) {
int32_t numOfCopiedRows = 0;
size_t size = taosArrayGetSize(list);
assert(size == pGroupResInfo->numOfDataPages);
bool done = false;
for (int32_t j = pGroupResInfo->pos.pageId; j < size; ++j) {
SPageInfo* pi = *(SPageInfo**) taosArrayGet(list, j);
tFilePage *pData = getResBufPage(pResultBuf, pi->pageId);
tFilePage* pData = getResBufPage(pResultBuf, pi->pageId);
assert(pData->num > 0 && pData->num <= pRuntimeEnv->numOfRowsPerPage && pGroupResInfo->pos.rowId < pData->num);
int32_t numOfRes = pData->num - pGroupResInfo->pos.rowId;
if (numOfRes > pQuery->rec.capacity - offset) {
numOfCopiedRows = pQuery->rec.capacity - offset;
pGroupResInfo->pos.rowId += numOfCopiedRows;
done = true;
} else {
numOfCopiedRows = pData->num;
pGroupResInfo->pos.pageId += 1;
pGroupResInfo->pos.rowId = 0;
}
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
char * pDest = pQuery->sdata[i]->data;
memcpy(pDest + offset * bytes, pData->data + pRuntimeEnv->offset[i] * pData->num, (size_t)(bytes * pData->num));
memcpy(pDest + offset * bytes, pData->data + pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage,
(size_t)bytes * numOfCopiedRows);
}
offset += (int32_t)pData->num;
offset += numOfCopiedRows;
if (done) {
break;
}
}
assert(pQuery->rec.rows == 0);
pQuery->rec.rows += offset;
pQInfo->offset += 1;
}
int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) {
......@@ -2791,23 +2816,38 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
// todo opt for the case of one table per group
int32_t numOfTables = 0;
SIDList pageList = NULL;
int32_t tid = -1;
for (int32_t i = 0; i < size; ++i) {
STableQueryInfo *item = taosArrayGetP(pGroup, i);
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, TSDB_TABLEID(item->pTable)->tid);
pageList = list;
tid = TSDB_TABLEID(item->pTable)->tid;
if (taosArrayGetSize(list) > 0 && item->windowResInfo.size > 0) {
pTableList[numOfTables] = item;
numOfTables += 1;
pTableList[numOfTables++] = item;
}
}
// there is no data in current group
if (numOfTables == 0) {
taosTFree(posList);
taosTFree(pTableList);
assert(pQInfo->numOfGroupResultPages == 0);
return 0;
} else if (numOfTables == 1) { // no need to merge results since only one table in each group
taosTFree(posList);
taosTFree(pTableList);
SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo;
pGroupResInfo->numOfDataPages = taosArrayGetSize(pageList);
pGroupResInfo->groupId = tid;
pGroupResInfo->pos.pageId = 0;
pGroupResInfo->pos.rowId = 0;
return pGroupResInfo->numOfDataPages;
}
SCompSupporter cs = {pTableList, posList, pQInfo};
......@@ -2824,6 +2864,8 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery, buf);
resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo);
pQInfo->groupResInfo.groupId = getGroupResultId(pQInfo->groupIndex);
// todo add windowRes iterator
int64_t lastTimestamp = -1;
int64_t startt = taosGetTimestampMs();
......@@ -2868,7 +2910,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
doMerge(pRuntimeEnv, ts, pWindowRes, true);
} else { // copy data to disk buffer
if (buffer[0]->num == pQuery->rec.capacity) {
if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) {
if (flushFromResultBuf(pRuntimeEnv, &pQInfo->groupResInfo) != TSDB_CODE_SUCCESS) {
return -1;
}
......@@ -2905,7 +2947,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
}
if (buffer[0]->num != 0) { // there are data in buffer
if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) {
if (flushFromResultBuf(pRuntimeEnv, &pQInfo->groupResInfo) != TSDB_CODE_SUCCESS) {
qError("QInfo:%p failed to flush data into temp file, abort query", pQInfo);
taosTFree(pTree);
......@@ -2929,16 +2971,14 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
taosTFree(posList);
taosTFree(pTree);
pQInfo->offset = 0;
taosTFree(pResultInfo);
taosTFree(buf);
return pQInfo->numOfGroupResultPages;
return pQInfo->groupResInfo.numOfDataPages;
}
int32_t flushFromResultBuf(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
......@@ -2946,32 +2986,32 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) {
int32_t pageId = -1;
int32_t capacity = pResultBuf->numOfRowsPerPage;
int32_t remain = (int32_t)pQuery->sdata[0]->num;
int32_t remain = (int32_t) pQuery->sdata[0]->num;
int32_t offset = 0;
while (remain > 0) {
int32_t r = remain;
if (r > capacity) {
r = capacity;
}
int32_t rows = (remain > capacity)? capacity:remain;
assert(rows > 0);
int32_t id = getGroupResultId(pQInfo->groupIndex) + pQInfo->numOfGroupResultPages;
tFilePage *buf = getNewDataBuf(pResultBuf, id, &pageId);
// get the output buffer page
tFilePage *buf = getNewDataBuf(pResultBuf, pGroupResInfo->groupId, &pageId);
buf->num = rows;
// pagewise copy to dest buffer
// pagewisely copy to dest buffer
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
buf->num = r;
memcpy(buf->data + pRuntimeEnv->offset[i] * buf->num, ((char *)pQuery->sdata[i]->data) + offset * bytes,
buf->num * bytes);
char* output = buf->data + pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage;
char* src = ((char *) pQuery->sdata[i]->data) + offset * bytes;
memcpy(output, src, buf->num * bytes);
}
offset += r;
remain -= r;
offset += rows;
remain -= rows;
pGroupResInfo->numOfDataPages += 1;
}
pQInfo->numOfGroupResultPages += 1;
return TSDB_CODE_SUCCESS;
}
......@@ -3773,27 +3813,27 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_
step = -1;
}
SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo;
for (int32_t i = startIdx; (i < totalSet) && (i >= 0); i += step) {
if (result[i].numOfRows == 0) {
pQInfo->offset = 0;
pQInfo->groupIndex += 1;
pGroupResInfo->pos.rowId = 0;
continue;
}
assert(pQInfo->offset <= 1);
int32_t numOfRowsToCopy = result[i].numOfRows - pQInfo->offset;
int32_t oldOffset = pQInfo->offset;
int32_t numOfRowsToCopy = result[i].numOfRows - pGroupResInfo->pos.rowId;
int32_t oldOffset = pGroupResInfo->pos.rowId;
/*
* current output space is not enough to keep all the result data of this group, only copy partial results
* to SQuery object's result buffer
* current output space is not enough to accommodate all data of this page, only partial results
* will be copied to SQuery object's result buffer
*/
if (numOfRowsToCopy > pQuery->rec.capacity - numOfResult) {
numOfRowsToCopy = (int32_t)pQuery->rec.capacity - numOfResult;
pQInfo->offset += numOfRowsToCopy;
numOfRowsToCopy = (int32_t) pQuery->rec.capacity - numOfResult;
pGroupResInfo->pos.rowId += numOfRowsToCopy;
} else {
pQInfo->offset = 0;
pGroupResInfo->pos.rowId = 0;
pQInfo->groupIndex += 1;
}
......@@ -4004,12 +4044,15 @@ static void queryCostStatis(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryCostInfo *pSummary = &pRuntimeEnv->summary;
// add the merge time
pSummary->elapsedTime += pSummary->firstStageMergeTime;
qDebug("QInfo:%p :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, "
"load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64,
pQInfo, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis,
pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows);
qDebug("QInfo:%p :cost summary: internal size:%"PRId64", numOfWin:%"PRId64, pQInfo, pSummary->internalSupSize,
qDebug("QInfo:%p :cost summary: internal size:%"PRId64"B, numOfWin:%"PRId64, pQInfo, pSummary->internalSupSize,
pSummary->numOfTimeWindows);
}
......
......@@ -2564,7 +2564,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
tsdbDestroyHelper(&pQueryHandle->rhelper);
SIOCostSummary* pCost = &pQueryHandle->cost;
tsdbDebug("%p :io-cost summary: statis-info:%"PRId64"us, datablock:%" PRId64"us, check data:%"PRId64"us, %p",
tsdbDebug("%p :io-cost summary: statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, %p",
pQueryHandle, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pQueryHandle->qinfo);
taosTFree(pQueryHandle);
......
......@@ -133,12 +133,18 @@ endi
if $data74 != -4.000000000 then
return -1
endi
## fill(value) + group by
sql select max(c1), max(c2), max(c3), max(c4), max(c5) from $stb where ts >= $ts0 and ts <= $tsu interval(5m) fill(value, -1, -2, -3, -4, -5, -6, -7, -8) group by t1
$val = $rowNum * 2
print $rowNum, $val
$val = $val - 1
$val = $val * $tbNum
if $rows != $val then
print ==================== $val
if $rows != 190 then
print expect 190, actual:$rows
return -1
endi
if $data06 != 0 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册