提交 bfcdbf76 编写于 作者: H Haojun Liao

[td-225] fix bugs in interp query

上级 eae2cd3e
...@@ -318,9 +318,10 @@ typedef struct SSqlObj { ...@@ -318,9 +318,10 @@ typedef struct SSqlObj {
char freed : 4; char freed : 4;
char listed : 4; char listed : 4;
tsem_t rspSem; tsem_t rspSem;
pthread_mutex_t inUse; // make sure that one connection can only be utilized by one thread/process
SSqlCmd cmd; SSqlCmd cmd;
SSqlRes res; SSqlRes res;
uint8_t numOfSubs; uint16_t numOfSubs;
struct SSqlObj **pSubs; struct SSqlObj **pSubs;
struct SSqlObj * prev, *next; struct SSqlObj * prev, *next;
} SSqlObj; } SSqlObj;
......
...@@ -653,7 +653,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -653,7 +653,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->order = htons(pQueryInfo->order.order); pQueryMsg->order = htons(pQueryInfo->order.order);
pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId); pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId);
pQueryMsg->fillType = htons(pQueryInfo->fillType); pQueryMsg->fillType = htons(pQueryInfo->fillType);
pQueryMsg->limit = htobe64(pQueryInfo->limit.limit); pQueryMsg->limit = htobe64(pQueryInfo->limit.limit);
pQueryMsg->offset = htobe64(pQueryInfo->limit.offset); pQueryMsg->offset = htobe64(pQueryInfo->limit.offset);
pQueryMsg->numOfCols = htons(taosArrayGetSize(pQueryInfo->colList)); pQueryMsg->numOfCols = htons(taosArrayGetSize(pQueryInfo->colList));
...@@ -1845,17 +1845,6 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1845,17 +1845,6 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
size_t size = 0; size_t size = 0;
STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size); STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
#if 0
// if current table is created according to super table, get the table meta of super table
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
char id[TSDB_TABLE_ID_LEN + 1] = {0};
strncpy(id, pMetaMsg->stableId, TSDB_TABLE_ID_LEN);
// NOTE: if the table meta of super table is not cached at client side yet, the pSTable is NULL
pTableMeta->pSTable = taosCacheAcquireByName(tscCacheHandle, id);
}
#endif
// todo add one more function: taosAddDataIfNotExists(); // todo add one more function: taosAddDataIfNotExists();
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
...@@ -1978,7 +1967,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { ...@@ -1978,7 +1967,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
pSql->res.code = TSDB_CODE_SUCCESS; pSql->res.code = TSDB_CODE_SUCCESS;
pSql->res.numOfTotal = i; pSql->res.numOfTotal = i;
tscTrace("%p load multi-metermeta resp complete num:%d", pSql, pSql->res.numOfTotal); tscTrace("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal);
#endif #endif
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -123,6 +123,13 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -123,6 +123,13 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
tsem_init(&pSql->rspSem, 0, 0); tsem_init(&pSql->rspSem, 0, 0);
pthread_mutexattr_t mutexattr;
memset(&mutexattr, 0, sizeof(pthread_mutexattr_t));
pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
pthread_mutex_init(&pSql->inUse, &mutexattr);
pthread_mutexattr_destroy(&mutexattr);
pObj->pSql = pSql; pObj->pSql = pSql;
pObj->pDnodeConn = pDnodeConn; pObj->pDnodeConn = pDnodeConn;
...@@ -284,12 +291,23 @@ int taos_query(TAOS *taos, const char *sqlstr) { ...@@ -284,12 +291,23 @@ int taos_query(TAOS *taos, const char *sqlstr) {
} }
SSqlObj* pSql = pObj->pSql; SSqlObj* pSql = pObj->pSql;
SSqlCmd* pCmd = &pSql->cmd;
// now this TAOS_CONN object is in use by one thread
pthread_mutex_lock(&pSql->inUse);
size_t sqlLen = strlen(sqlstr); size_t sqlLen = strlen(sqlstr);
doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
// wait for the callback function to post the semaphore // wait for the callback function to post the semaphore
sem_wait(&pSql->rspSem); tsem_wait(&pSql->rspSem);
if (pCmd->command != TSDB_SQL_SELECT &&
pCmd->command != TSDB_SQL_SHOW &&
pCmd->command != TSDB_SQL_DESCRIBE_TABLE) {
pthread_mutex_unlock(&pSql->inUse);
}
return pSql->res.code; return pSql->res.code;
} }
...@@ -525,7 +543,7 @@ int taos_select_db(TAOS *taos, const char *db) { ...@@ -525,7 +543,7 @@ int taos_select_db(TAOS *taos, const char *db) {
return taos_query(taos, sql); return taos_query(taos, sql);
} }
void taos_free_result_imp(TAOS_RES *res, int keepCmd) { void taos_free_result(TAOS_RES *res) {
if (res == NULL) return; if (res == NULL) return;
SSqlObj *pSql = (SSqlObj *)res; SSqlObj *pSql = (SSqlObj *)res;
...@@ -536,26 +554,24 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { ...@@ -536,26 +554,24 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
if (pSql->signature != pSql) return; if (pSql->signature != pSql) return;
STscObj* pObj = pSql->pTscObj;
if (pRes == NULL || pRes->qhandle == 0) { if (pRes == NULL || pRes->qhandle == 0) {
/* Query rsp is not received from vnode, so the qhandle is NULL */ /* Query rsp is not received from vnode, so the qhandle is NULL */
tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp); tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp);
STscObj* pTscObj = pSql->pTscObj;
if (pTscObj->pSql != pSql) { // The semaphore can not be changed while freeing async sub query objects.
if (pObj->pSql != pSql) {
tscTrace("%p SqlObj is freed by app", pSql); tscTrace("%p SqlObj is freed by app", pSql);
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
} else { } else {
if (keepCmd) { tscPartiallyFreeSqlObj(pSql);
tscFreeSqlResult(pSql); pthread_mutex_unlock(&pSql->inUse); // now this TAOS_CONN can be used by other threads
} else {
tscPartiallyFreeSqlObj(pSql);
}
} }
return; return;
} }
// set freeFlag to 1 in retrieve message if there are un-retrieved results // set freeFlag to 1 in retrieve message if there are un-retrieved results data in node
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pQueryInfo == NULL) { if (pQueryInfo == NULL) {
tscPartiallyFreeSqlObj(pSql); tscPartiallyFreeSqlObj(pSql);
...@@ -600,19 +616,12 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { ...@@ -600,19 +616,12 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
tscTrace("%p sql result is freed by app", pSql); tscTrace("%p sql result is freed by app", pSql);
} else { } else {
if (keepCmd) { tscPartiallyFreeSqlObj(pSql);
tscFreeSqlResult(pSql); tscTrace("%p sql result is freed by app", pSql);
tscTrace("%p sql result is freed while sql command is kept", pSql);
} else {
tscPartiallyFreeSqlObj(pSql);
tscTrace("%p sql result is freed by app", pSql);
}
} }
} }
} }
void taos_free_result(TAOS_RES *res) { taos_free_result_imp(res, 0); }
// todo should not be used in async query // todo should not be used in async query
int taos_errno(TAOS *taos) { int taos_errno(TAOS *taos) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
......
...@@ -1767,11 +1767,12 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1767,11 +1767,12 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pNewQueryInfo->limit = pQueryInfo->limit; pNewQueryInfo->limit = pQueryInfo->limit;
pNewQueryInfo->slimit = pQueryInfo->slimit; pNewQueryInfo->slimit = pQueryInfo->slimit;
pNewQueryInfo->order = pQueryInfo->order; pNewQueryInfo->order = pQueryInfo->order;
pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit; pNewQueryInfo->tsBuf = NULL;
pNewQueryInfo->pTableMetaInfo = NULL; pNewQueryInfo->fillType = pQueryInfo->fillType;
pNewQueryInfo->fillVal = NULL; pNewQueryInfo->fillVal = NULL;
pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit;
pNewQueryInfo->numOfTables = 0; pNewQueryInfo->numOfTables = 0;
pNewQueryInfo->tsBuf = NULL; pNewQueryInfo->pTableMetaInfo = NULL;
pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr; pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
if (pQueryInfo->groupbyExpr.columnInfo != NULL) { if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
......
...@@ -4525,57 +4525,59 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4525,57 +4525,59 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
size_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); size_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
if (isPointInterpoQuery(pQuery)) { if (isPointInterpoQuery(pQuery) || isFirstLastRowQuery(pQuery)) {
resetCtxOutputBuf(pRuntimeEnv); resetCtxOutputBuf(pRuntimeEnv);
assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0);
while (pQInfo->groupIndex < numOfGroups) { while (pQInfo->groupIndex < numOfGroups) {
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex); SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex);
qTrace("QInfo:%p last_row query on group:%d, total group:%d, current group:%d", pQInfo, pQInfo->groupIndex,
numOfGroups);
STsdbQueryCond cond = {
.twindow = pQuery->window,
.colList = pQuery->colList,
.order = pQuery->order.order,
.numOfCols = pQuery->numOfCols,
};
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
SArray *tx = taosArrayClone(group);
taosArrayPush(g1, &tx);
STableGroupInfo gp = {.numOfTables = taosArrayGetSize(tx), .pGroupList = g1};
// include only current table
if (pRuntimeEnv->pQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
pRuntimeEnv->pQueryHandle = NULL;
}
if (isFirstLastRowQuery(pQuery)) { if (isFirstLastRowQuery(pQuery)) {
qTrace("QInfo:%p last_row query on group:%d, total group:%d, current group:%d", pQInfo, pQInfo->groupIndex,
numOfGroups);
STsdbQueryCond cond = {
.twindow = pQuery->window,
.colList = pQuery->colList,
.order = pQuery->order.order,
.numOfCols = pQuery->numOfCols,
};
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
SArray *tx = taosArrayClone(group);
taosArrayPush(g1, &tx);
STableGroupInfo gp = {.numOfTables = taosArrayGetSize(tx), .pGroupList = g1};
// include only current table
if (pRuntimeEnv->pQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
pRuntimeEnv->pQueryHandle = NULL;
}
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(pQInfo->tsdb, &cond, &gp); pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(pQInfo->tsdb, &cond, &gp);
} else {
initCtxOutputBuf(pRuntimeEnv); pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQInfo->tsdb, &cond, &gp);
setTagVal(pRuntimeEnv, (STableId*) taosArrayGet(tx, 0), pQInfo->tsdb); }
// here we simply set the first table as current table initCtxOutputBuf(pRuntimeEnv);
pQuery->current = ((SGroupItem*) taosArrayGet(group, 0))->info; setTagVal(pRuntimeEnv, (STableId*) taosArrayGet(tx, 0), pQInfo->tsdb);
scanAllDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
// here we simply set the first table as current table
int64_t numOfRes = getNumOfResult(pRuntimeEnv); pQuery->current = ((SGroupItem*) taosArrayGet(group, 0))->info;
if (numOfRes > 0) { scanAllDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
pQuery->rec.rows += numOfRes;
forwardCtxOutputBuf(pRuntimeEnv, numOfRes); int64_t numOfRes = getNumOfResult(pRuntimeEnv);
} if (numOfRes > 0) {
pQuery->rec.rows += numOfRes;
skipResults(pRuntimeEnv); forwardCtxOutputBuf(pRuntimeEnv, numOfRes);
pQInfo->groupIndex += 1;
// enable execution for next table, when handling the projection query
enableExecutionForNextTable(pRuntimeEnv);
} }
skipResults(pRuntimeEnv);
pQInfo->groupIndex += 1;
// enable execution for next table, when handling the projection query
enableExecutionForNextTable(pRuntimeEnv);
} }
} else { } else {
/* /*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册