未验证 提交 cc92bd13 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #8589 from taosdata/feature/query

Feature/query
...@@ -781,6 +781,16 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { ...@@ -781,6 +781,16 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
return isNull(((char*) pSql->res.urow[col]) + row * pInfo->field.bytes, pInfo->field.type); return isNull(((char*) pSql->res.urow[col]) + row * pInfo->field.bytes, pInfo->field.type);
} }
bool taos_is_update_query(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res;
if (pSql == NULL || pSql->signature != pSql) {
return false;
}
SSqlCmd* pCmd = &pSql->cmd;
return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || TSDB_SQL_RESET_CACHE == pCmd->command || TSDB_SQL_USE_DB == pCmd->command);
}
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) { int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
int len = 0; int len = 0;
...@@ -909,7 +919,6 @@ int taos_validate_sql(TAOS *taos, const char *sql) { ...@@ -909,7 +919,6 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
strtolower(pSql->sqlstr, sql); strtolower(pSql->sqlstr, sql);
// pCmd->curSql = NULL;
if (NULL != pCmd->insertParam.pTableBlockHashList) { if (NULL != pCmd->insertParam.pTableBlockHashList) {
taosHashCleanup(pCmd->insertParam.pTableBlockHashList); taosHashCleanup(pCmd->insertParam.pTableBlockHashList);
pCmd->insertParam.pTableBlockHashList = NULL; pCmd->insertParam.pTableBlockHashList = NULL;
...@@ -934,6 +943,17 @@ int taos_validate_sql(TAOS *taos, const char *sql) { ...@@ -934,6 +943,17 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
return code; return code;
} }
void taos_reset_current_db(TAOS *taos) {
STscObj* pObj = (STscObj*) taos;
if (pObj == NULL || pObj->signature != pObj) {
return;
}
pthread_mutex_lock(&pObj->mutex);
memset(pObj->db, 0, tListLen(pObj->db));
pthread_mutex_unlock(&pObj->mutex);
}
void loadMultiTableMetaCallback(void *param, TAOS_RES *res, int code) { void loadMultiTableMetaCallback(void *param, TAOS_RES *res, int code) {
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)param); SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)param);
if (pSql == NULL) { if (pSql == NULL) {
......
...@@ -175,11 +175,13 @@ DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); ...@@ -175,11 +175,13 @@ DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields); DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields);
DLL_EXPORT void taos_stop_query(TAOS_RES *res); DLL_EXPORT void taos_stop_query(TAOS_RES *res);
DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col); DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
DLL_EXPORT bool taos_is_update_query(TAOS_RES *res);
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
DLL_EXPORT int* taos_fetch_lengths(TAOS_RES *res); DLL_EXPORT int* taos_fetch_lengths(TAOS_RES *res);
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
DLL_EXPORT void taos_reset_current_db(TAOS *taos);
// TAOS_RES *taos_list_tables(TAOS *mysql, const char *wild); // TAOS_RES *taos_list_tables(TAOS *mysql, const char *wild);
// TAOS_RES *taos_list_dbs(TAOS *mysql, const char *wild); // TAOS_RES *taos_list_dbs(TAOS *mysql, const char *wild);
...@@ -192,7 +194,6 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres); ...@@ -192,7 +194,6 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres);
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, int code), void *param); DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, int code), void *param);
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param); DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);
//DLL_EXPORT void taos_fetch_row_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), void *param);
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code); typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code);
DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval); DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval);
......
...@@ -355,16 +355,16 @@ enum OPERATOR_TYPE_E { ...@@ -355,16 +355,16 @@ enum OPERATOR_TYPE_E {
typedef struct SOperatorInfo { typedef struct SOperatorInfo {
uint8_t operatorType; uint8_t operatorType;
bool blockingOptr; // block operator or not bool blockingOptr; // block operator or not
uint8_t status; // denote if current operator is completed uint8_t status; // denote if current operator is completed
int32_t numOfOutput; // number of columns of the current operator results int32_t numOfOutput; // number of columns of the current operator results
char *name; // name, used to show the query execution plan char *name; // name, used to show the query execution plan
void *info; // extension attribution void *info; // extension attribution
SExprInfo *pExpr; SExprInfo *pExpr;
SQueryRuntimeEnv *pRuntimeEnv; SQueryRuntimeEnv *pRuntimeEnv;
struct SOperatorInfo **upstream; // upstream pointer list struct SOperatorInfo **upstream; // upstream pointer list
int32_t numOfUpstream; // number of upstream. The value is always ONE expect for join operator int32_t numOfUpstream; // number of upstream. The value is always ONE expect for join operator
__operator_fn_t exec; __operator_fn_t exec;
__optr_cleanup_fn_t cleanup; __optr_cleanup_fn_t cleanup;
} SOperatorInfo; } SOperatorInfo;
......
...@@ -4231,7 +4231,7 @@ static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* ...@@ -4231,7 +4231,7 @@ static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo*
int32_t start = 0; int32_t start = 0;
int32_t step = -1; int32_t step = -1;
qDebug("QInfo:0x%"PRIx64" start to copy data from windowResInfo to output buf", GET_QID(pRuntimeEnv)); qDebug("QInfo:0x%"PRIx64" start to copy data from resultrowInfo to output buf", GET_QID(pRuntimeEnv));
assert(orderType == TSDB_ORDER_ASC || orderType == TSDB_ORDER_DESC); assert(orderType == TSDB_ORDER_ASC || orderType == TSDB_ORDER_DESC);
if (orderType == TSDB_ORDER_ASC) { if (orderType == TSDB_ORDER_ASC) {
...@@ -5220,7 +5220,6 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* ...@@ -5220,7 +5220,6 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
pInfo->reverseTimes = 0; pInfo->reverseTimes = 0;
pInfo->order = pRuntimeEnv->pQueryAttr->order.order; pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
pInfo->current = 0; pInfo->current = 0;
// pInfo->prevGroupId = -1;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableScanOperator"; pOperator->name = "TableScanOperator";
......
...@@ -448,7 +448,9 @@ int32_t tsDescOrder(const void* p1, const void* p2) { ...@@ -448,7 +448,9 @@ int32_t tsDescOrder(const void* p1, const void* p2) {
} }
} }
void orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) { void
orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) {
__compar_fn_t fn = NULL; __compar_fn_t fn = NULL;
if (pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_ASC) { if (pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_ASC) {
fn = tsAscOrder; fn = tsAscOrder;
......
...@@ -3452,9 +3452,12 @@ void filterPrepare(void* expr, void* param) { ...@@ -3452,9 +3452,12 @@ void filterPrepare(void* expr, void* param) {
int dummy = -1; int dummy = -1;
SHashObj *pObj = NULL; SHashObj *pObj = NULL;
if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) { if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
pObj = taosHashInit(256, taosGetDefaultHashFunction(pInfo->sch.type), true, false);
SArray *arr = (SArray *)(pCond->arr); SArray *arr = (SArray *)(pCond->arr);
for (size_t i = 0; i < taosArrayGetSize(arr); i++) {
size_t size = taosArrayGetSize(arr);
pObj = taosHashInit(size * 2, taosGetDefaultHashFunction(pInfo->sch.type), true, false);
for (size_t i = 0; i < size; i++) {
char* p = taosArrayGetP(arr, i); char* p = taosArrayGetP(arr, i);
strntolower_s(varDataVal(p), varDataVal(p), varDataLen(p)); strntolower_s(varDataVal(p), varDataVal(p), varDataLen(p));
taosHashPut(pObj, varDataVal(p), varDataLen(p), &dummy, sizeof(dummy)); taosHashPut(pObj, varDataVal(p), varDataLen(p), &dummy, sizeof(dummy));
...@@ -3462,12 +3465,14 @@ void filterPrepare(void* expr, void* param) { ...@@ -3462,12 +3465,14 @@ void filterPrepare(void* expr, void* param) {
} else { } else {
buildFilterSetFromBinary((void **)&pObj, pCond->pz, pCond->nLen); buildFilterSetFromBinary((void **)&pObj, pCond->pz, pCond->nLen);
} }
pInfo->q = (char *)pObj; pInfo->q = (char *)pObj;
} else if (pCond != NULL) { } else if (pCond != NULL) {
uint32_t size = pCond->nLen * TSDB_NCHAR_SIZE; uint32_t size = pCond->nLen * TSDB_NCHAR_SIZE;
if (size < (uint32_t)pSchema->bytes) { if (size < (uint32_t)pSchema->bytes) {
size = pSchema->bytes; size = pSchema->bytes;
} }
// to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(wchar_t) space. // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(wchar_t) space.
pInfo->q = calloc(1, size + TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE); pInfo->q = calloc(1, size + TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE);
tVariantDump(pCond, pInfo->q, pSchema->type, true); tVariantDump(pCond, pInfo->q, pSchema->type, true);
...@@ -3615,7 +3620,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC ...@@ -3615,7 +3620,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
return pTableGroup; return pTableGroup;
} }
int32_t tsdbQuerySTableByTagCond(STsdbRepo* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len, int32_t tsdbQuerySTableByTagCond(STsdbRepo* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
STableGroupInfo* pGroupInfo, SColIndex* pColIndex, int32_t numOfCols) { STableGroupInfo* pGroupInfo, SColIndex* pColIndex, int32_t numOfCols) {
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error; if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;
...@@ -3677,19 +3682,19 @@ int32_t tsdbQuerySTableByTagCond(STsdbRepo* tsdb, uint64_t uid, TSKEY skey, cons ...@@ -3677,19 +3682,19 @@ int32_t tsdbQuerySTableByTagCond(STsdbRepo* tsdb, uint64_t uid, TSKEY skey, cons
} END_TRY } END_TRY
void *filterInfo = NULL; void *filterInfo = NULL;
ret = filterInitFromTree(expr, &filterInfo, 0); ret = filterInitFromTree(expr, &filterInfo, 0);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
terrno = ret; terrno = ret;
goto _error; goto _error;
} }
tsdbQueryTableList(pTable, res, filterInfo); tsdbQueryTableList(pTable, res, filterInfo);
filterFreeInfo(filterInfo); filterFreeInfo(filterInfo);
tExprTreeDestroy(expr, NULL); tExprTreeDestroy(expr, NULL);
pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res); pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey); pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
...@@ -3876,7 +3881,7 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) { ...@@ -3876,7 +3881,7 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
static FORCE_INLINE int32_t tsdbGetTagDataFromId(void *param, int32_t id, void **data) { static FORCE_INLINE int32_t tsdbGetTagDataFromId(void *param, int32_t id, void **data) {
STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode *)param)); STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode *)param));
if (id == TSDB_TBNAME_COLUMN_INDEX) { if (id == TSDB_TBNAME_COLUMN_INDEX) {
*data = TABLE_NAME(pTable); *data = TABLE_NAME(pTable);
} else { } else {
...@@ -3909,7 +3914,7 @@ static void queryIndexedColumn(SSkipList* pSkipList, void* filterInfo, SArray* r ...@@ -3909,7 +3914,7 @@ static void queryIndexedColumn(SSkipList* pSkipList, void* filterInfo, SArray* r
iter = tSkipListCreateIterFromVal(pSkipList, startVal, pSkipList->type, TSDB_ORDER_DESC); iter = tSkipListCreateIterFromVal(pSkipList, startVal, pSkipList->type, TSDB_ORDER_DESC);
FILTER_CLR_FLAG(order, TSDB_ORDER_DESC); FILTER_CLR_FLAG(order, TSDB_ORDER_DESC);
} }
while (tSkipListIterNext(iter)) { while (tSkipListIterNext(iter)) {
SSkipListNode *pNode = tSkipListIterGet(iter); SSkipListNode *pNode = tSkipListIterGet(iter);
...@@ -3918,7 +3923,7 @@ static void queryIndexedColumn(SSkipList* pSkipList, void* filterInfo, SArray* r ...@@ -3918,7 +3923,7 @@ static void queryIndexedColumn(SSkipList* pSkipList, void* filterInfo, SArray* r
filterSetColFieldData(filterInfo, pNode, tsdbGetTagDataFromId); filterSetColFieldData(filterInfo, pNode, tsdbGetTagDataFromId);
all = filterExecute(filterInfo, 1, &addToResult, NULL, 0); all = filterExecute(filterInfo, 1, &addToResult, NULL, 0);
} }
char *pData = SL_GET_NODE_DATA(pNode); char *pData = SL_GET_NODE_DATA(pNode);
tsdbDebug("filter index column, table:%s, result:%d", ((STable *)pData)->name->data, all); tsdbDebug("filter index column, table:%s, result:%d", ((STable *)pData)->name->data, all);
...@@ -3950,7 +3955,7 @@ static void queryIndexlessColumn(SSkipList* pSkipList, void* filterInfo, SArray* ...@@ -3950,7 +3955,7 @@ static void queryIndexlessColumn(SSkipList* pSkipList, void* filterInfo, SArray*
SSkipListNode *pNode = tSkipListIterGet(iter); SSkipListNode *pNode = tSkipListIterGet(iter);
filterSetColFieldData(filterInfo, pNode, tsdbGetTagDataFromId); filterSetColFieldData(filterInfo, pNode, tsdbGetTagDataFromId);
char *pData = SL_GET_NODE_DATA(pNode); char *pData = SL_GET_NODE_DATA(pNode);
bool all = filterExecute(filterInfo, 1, &addToResult, NULL, 0); bool all = filterExecute(filterInfo, 1, &addToResult, NULL, 0);
...@@ -3958,7 +3963,7 @@ static void queryIndexlessColumn(SSkipList* pSkipList, void* filterInfo, SArray* ...@@ -3958,7 +3963,7 @@ static void queryIndexlessColumn(SSkipList* pSkipList, void* filterInfo, SArray*
if (all || (addToResult && *addToResult)) { if (all || (addToResult && *addToResult)) {
STableKeyInfo info = {.pTable = (void*)pData, .lastKey = TSKEY_INITIAL_VAL}; STableKeyInfo info = {.pTable = (void*)pData, .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(res, &info); taosArrayPush(res, &info);
} }
} }
tfree(addToResult); tfree(addToResult);
...@@ -3971,9 +3976,9 @@ static int32_t tsdbQueryTableList(STable* pTable, SArray* pRes, void* filterInfo ...@@ -3971,9 +3976,9 @@ static int32_t tsdbQueryTableList(STable* pTable, SArray* pRes, void* filterInfo
STSchema* pTSSchema = pTable->tagSchema; STSchema* pTSSchema = pTable->tagSchema;
bool indexQuery = false; bool indexQuery = false;
SSkipList *pSkipList = pTable->pIndex; SSkipList *pSkipList = pTable->pIndex;
filterIsIndexedColumnQuery(filterInfo, pTSSchema->columns->colId, &indexQuery); filterIsIndexedColumnQuery(filterInfo, pTSSchema->columns->colId, &indexQuery);
if (indexQuery) { if (indexQuery) {
queryIndexedColumn(pSkipList, filterInfo, pRes); queryIndexedColumn(pSkipList, filterInfo, pRes);
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册