提交 086a3a4a 编写于 作者: H Haojun Liao

[td-428]

上级 e56cd51a
......@@ -24,7 +24,6 @@
#include "tscSubquery.h"
int tsParseInsertSql(SSqlObj *pSql);
int taos_query_imp(STscObj* pObj, SSqlObj* pSql);
////////////////////////////////////////////////////////////////////////////////
// functions for normal statement preparation
......
......@@ -41,7 +41,7 @@
#define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_TBNAME_COLUMN_INDEX))
#define TBNAME_LIST_SEP ","
typedef struct SColumnList {
typedef struct SColumnList { // todo refactor
int32_t num;
SColumnIndex ids[TSDB_MAX_COLUMNS];
} SColumnList;
......@@ -1517,12 +1517,14 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
// count tag is equalled to count(tbname)
if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) {
bool isTag = false;
if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta) || index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
index.columnIndex = TSDB_TBNAME_COLUMN_INDEX;
isTag = true;
}
int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize;
pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, false);
pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, isTag);
}
} else { // count(*) is equalled to count(primary_timestamp_key)
index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
......@@ -1543,10 +1545,13 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
tscColumnListInsert(pQueryInfo->colList, &(ids.ids[i]));
}
}
SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscColumnListInsert(pQueryInfo->colList, &tsCol);
// the time stamp may be always needed
if (index.tableIndex > 0 && index.tableIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) {
SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscColumnListInsert(pQueryInfo->colList, &tsCol);
}
return TSDB_CODE_SUCCESS;
}
case TK_SUM:
......
......@@ -605,7 +605,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
tscError("%p failed to malloc for query msg", pSql);
return -1;
return -1; // todo add test for this
}
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
......
......@@ -213,48 +213,6 @@ void taos_close(TAOS *taos) {
}
}
int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
pRes->numOfRows = 1;
pRes->numOfTotal = 0;
pRes->numOfClauseTotal = 0;
pCmd->curSql = NULL;
if (NULL != pCmd->pTableList) {
taosHashCleanup(pCmd->pTableList);
pCmd->pTableList = NULL;
}
tscDump("%p pObj:%p, SQL: %s", pSql, pObj, pSql->sqlstr);
pRes->code = (uint8_t)tsParseSql(pSql, false);
/*
* set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query.
* If qhandle is NOT set 0, the function of taos_free_result() will send message to server by calling tscProcessSql()
* to free connection, which may cause segment fault, when the parse phrase is not even successfully executed.
*/
pRes->qhandle = 0;
if (pRes->code == TSDB_CODE_SUCCESS) {
tscDoQuery(pSql);
}
if (pRes->code == TSDB_CODE_SUCCESS) {
tscTrace("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj);
} else {
tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj);
}
if (pRes->code != TSDB_CODE_SUCCESS) {
tscPartiallyFreeSqlObj(pSql);
}
return pRes->code;
}
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
assert(tres != NULL);
......
......@@ -79,8 +79,14 @@ bool tscQueryOnSTable(SSqlCmd* pCmd) {
bool tscQueryTags(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
int32_t functId = tscSqlExprGet(pQueryInfo, i)->functionId;
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
int32_t functId = pExpr->functionId;
// "select count(tbname)" query
if (functId == TSDB_FUNC_COUNT && pExpr->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
continue;
}
if (functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_TID_TAG) {
return false;
}
......@@ -208,13 +214,14 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
return true;
}
// not order by timestamp projection query on super table
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (!tscIsProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
return false;
}
// order by columnIndex exists, not a non-ordered projection query
return pQueryInfo->order.orderColId < 0;
return pQueryInfo->order.orderColId < 0 && pQueryInfo->order.orderColId != TSDB_TBNAME_COLUMN_INDEX;
}
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
......@@ -984,7 +991,6 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
pExpr->uid = pTableMetaInfo->pTableMeta->uid;
}
return pExpr;
}
......
......@@ -858,9 +858,9 @@ void shellGetGrantInfo(void *con) {
char sql[] = "show grants";
TAOS_RES* pSql = taos_query(con, sql);
int code = taos_errno(pSql);
result = taos_query(con, sql);
int code = taos_errno(result);
if (code != TSDB_CODE_SUCCESS) {
if (code == TSDB_CODE_OPS_NOT_SUPPORT) {
fprintf(stdout, "Server is Community Edition, version is %s\n\n", taos_get_server_info(con));
......
......@@ -16,21 +16,19 @@
#include "os.h"
#include "shell.h"
#include "tsclient.h"
#include "tutil.h"
TAOS_RES* con;
pthread_t pid;
// TODO: IMPLEMENT INTERRUPT HANDLER.
void interruptHandler(int signum) {
#ifdef LINUX
taos_stop_query(con);
if (con != NULL) {
taos_stop_query(result);
if (result != NULL) {
/*
* we need to free result in async model, in order to avoid free
* results while the master thread is waiting for server response.
*/
tscQueueAsyncFreeResult(con);
tscQueueAsyncFreeResult(result);
}
result = NULL;
......@@ -88,7 +86,7 @@ int main(int argc, char* argv[]) {
shellParseArgument(argc, argv, &args);
/* Initialize the shell */
con = shellInit(&args);
TAOS* con = shellInit(&args);
if (con == NULL) {
exit(EXIT_FAILURE);
}
......@@ -109,5 +107,4 @@ int main(int argc, char* argv[]) {
pthread_create(&pid, NULL, shellLoopQuery, con);
pthread_join(pid, NULL);
}
return 0;
}
......@@ -1590,8 +1590,11 @@ static bool needReverseScan(SQuery *pQuery) {
static bool onlyQueryTags(SQuery* pQuery) {
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TID_TAG) {
SExprInfo* pExprInfo = &pQuery->pSelectExpr[i];
int32_t functionId = pExprInfo->base.functionId;
if (functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TID_TAG &&
(!(functionId == TSDB_FUNC_COUNT && pExprInfo->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX))) {
return false;
}
}
......@@ -4885,6 +4888,10 @@ static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pE
int32_t j = 0;
if (TSDB_COL_IS_TAG(pExprMsg->colInfo.flag)) {
if (pExprMsg->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
return -1;
}
while(j < pQueryMsg->numOfTags) {
if (pExprMsg->colInfo.colId == pTagCols[j].colId) {
return j;
......@@ -4942,8 +4949,11 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx
return false;
} else if (numOfTotal == 0) {
for(int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
if ((pExprMsg[i]->functionId == TSDB_FUNC_TAGPRJ) ||
(pExprMsg[i]->functionId == TSDB_FUNC_TID_TAG && pExprMsg[i]->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX)) {
SSqlFuncMsg* pFuncMsg = pExprMsg[i];
if ((pFuncMsg->functionId == TSDB_FUNC_TAGPRJ) ||
(pFuncMsg->functionId == TSDB_FUNC_TID_TAG && pFuncMsg->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) ||
(pFuncMsg->functionId == TSDB_FUNC_COUNT && pFuncMsg->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX)) {
continue;
}
......@@ -5079,8 +5089,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
}
}
if (pExprMsg->functionId == TSDB_FUNC_TAG || pExprMsg->functionId == TSDB_FUNC_TAGPRJ ||
pExprMsg->functionId == TSDB_FUNC_TAG_DUMMY) {
int16_t functionId = pExprMsg->functionId;
if (functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG_DUMMY) {
if (pExprMsg->colInfo.flag != TSDB_COL_TAG) { // ignore the column index check for arithmetic expression.
return TSDB_CODE_INVALID_QUERY_MSG;
}
......@@ -5192,12 +5202,12 @@ static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTable
return TSDB_CODE_SUCCESS;
}
static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
SColumnInfo* pTagCols) {
*pExprInfo = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SExprInfo *pExprs = (SExprInfo *)calloc(1, sizeof(SExprInfo) * pQueryMsg->numOfOutput);
SExprInfo *pExprs = (SExprInfo *)calloc(pQueryMsg->numOfOutput, sizeof(SExprInfo));
if (pExprs == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
......@@ -5223,16 +5233,22 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo
type = TSDB_DATA_TYPE_DOUBLE;
bytes = tDataTypeDesc[type].nSize;
} else if (pExprs[i].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { // parse the normal column
} else if (pExprs[i].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX && pExprs[i].base.functionId == TSDB_FUNC_TAGPRJ) { // parse the normal column
type = TSDB_DATA_TYPE_BINARY;
bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
} else{
} else {
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
assert(j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags);
assert(j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags || j == TSDB_TBNAME_COLUMN_INDEX);
if (pExprs[i].base.colInfo.colId != TSDB_TBNAME_COLUMN_INDEX) {
SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j];
type = pCol->type;
bytes = pCol->bytes;
} else {
type = TSDB_DATA_TYPE_BINARY;
bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
}
SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j];
type = pCol->type;
bytes = pCol->bytes;
}
int32_t param = pExprs[i].base.arg[0].argValue.i64;
......@@ -5485,7 +5501,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
}
// set the output buffer capacity
pQuery->rec.capacity = 4096;
pQuery->rec.capacity = 2;
pQuery->rec.threshold = 4000;
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
......@@ -5824,7 +5840,7 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
}
SExprInfo *pExprs = NULL;
if ((code = createSqlFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) {
if ((code = createQFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) {
goto _over;
}
......@@ -5926,6 +5942,7 @@ void qTableQuery(qinfo_t qinfo) {
qTrace("QInfo:%p query task is launched", pQInfo);
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
assert(pQInfo->runtimeEnv.pQueryHandle == NULL);
buildTagQueryResult(pQInfo); // todo support the limit/offset
} else if (pQInfo->runtimeEnv.stableQuery) {
stableQueryImpl(pQInfo);
......@@ -6028,24 +6045,29 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
size_t num = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
assert(num == 0 || num == 1);
if (num == 0) {
size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
assert(numOfGroup == 0 || numOfGroup == 1);
if (numOfGroup == 0) {
return;
}
SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
num = taosArrayGetSize(pa);
size_t num = taosArrayGetSize(pa);
assert(num == pQInfo->groupInfo.numOfTables);
int32_t count = 0;
int32_t functionId = pQuery->pSelectExpr[0].base.functionId;
if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id
assert(pQuery->numOfOutput == 1);
SExprInfo* pExprInfo = &pQuery->pSelectExpr[0];
int32_t rsize = pExprInfo->bytes;
for(int32_t i = 0; i < num; ++i) {
count = 0;
while(pQInfo->tableIndex < num && count < pQuery->rec.capacity) {
int32_t i = pQInfo->tableIndex++;
SGroupItem *item = taosArrayGet(pa, i);
char *output = pQuery->sdata[0]->data + i * rsize;
......@@ -6085,30 +6107,38 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
}
}
}
count += 1;
}
pQInfo->tableIndex = pQInfo->groupInfo.numOfTables;
qTrace("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, num);
qTrace("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, count);
} else if (functionId == TSDB_FUNC_COUNT) {// handle the "count(tbname)" query
*(int64_t*) pQuery->sdata[0]->data = num;
count = 1;
pQInfo->tableIndex = num; //set query completed
qTrace("QInfo:%p create count(tbname) query, res:%d rows:1", pQInfo, count);
} else { // return only the tags|table name etc.
for(int32_t i = 0; i < num; ++i) {
count = 0;
while(pQInfo->tableIndex < num && count < pQuery->rec.capacity) {
int32_t i = pQInfo->tableIndex++;
SExprInfo* pExprInfo = pQuery->pSelectExpr;
SGroupItem* item = taosArrayGet(pa, i);
for(int32_t j = 0; j < pQuery->numOfOutput; ++j) {
// todo check the return value, refactor codes
if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
char* data = tsdbGetTableName(pQInfo->tsdb, &item->id);
char* dst = pQuery->sdata[j]->data + i * (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
char* dst = pQuery->sdata[j]->data + count * (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
memcpy(dst, data, varDataTLen(data));
} else {// todo refactor
int16_t type = pExprInfo[j].type;
int16_t bytes = pExprInfo[j].bytes;
char* data = tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, type, bytes);
char* dst = pQuery->sdata[j]->data + count * pExprInfo[j].bytes;
char* dst = pQuery->sdata[j]->data + i * pExprInfo[j].bytes;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
if (data == NULL) {
setVardataNull(dst, type);
......@@ -6124,13 +6154,13 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
}
}
}
count += 1;
}
pQInfo->tableIndex = pQInfo->groupInfo.numOfTables;
qTrace("QInfo:%p create tag values results completed, rows:%d", pQInfo, num);
qTrace("QInfo:%p create tag values results completed, rows:%d", pQInfo, count);
}
pQuery->rec.rows = num;
pQuery->rec.rows = count;
setQueryStatus(pQuery, QUERY_COMPLETED);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册